/*
*
* * Copyright (c) 2020-2024, Lykan (jiashuomeng@gmail.com).
* * <p>
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* * <p>
* * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package cn.kstry.framework.core.engine;
import cn.kstry.framework.core.bpmn.*;
import cn.kstry.framework.core.bpmn.enums.BpmnTypeEnum;
import cn.kstry.framework.core.bpmn.enums.IterateStrategyEnum;
import cn.kstry.framework.core.bpmn.extend.ElementIterable;
import cn.kstry.framework.core.bpmn.impl.BasicElementIterable;
import cn.kstry.framework.core.bus.ContextStoryBus;
import cn.kstry.framework.core.bus.IterDataItem;
import cn.kstry.framework.core.bus.StoryBus;
import cn.kstry.framework.core.component.hook.AsyncFlowHook;
import cn.kstry.framework.core.component.hook.Hook;
import cn.kstry.framework.core.component.limiter.RateLimiterConfig;
import cn.kstry.framework.core.component.limiter.strategy.DemotionFailAcquireStrategy;
import cn.kstry.framework.core.component.limiter.strategy.FailAcquireStrategy;
import cn.kstry.framework.core.container.component.InvokeProperties;
import cn.kstry.framework.core.container.component.MethodWrapper;
import cn.kstry.framework.core.container.component.TaskServiceDef;
import cn.kstry.framework.core.engine.future.FlowTaskSubscriber;
import cn.kstry.framework.core.engine.future.RetryFlowTaskSubscriber;
import cn.kstry.framework.core.engine.interceptor.SubProcessInterceptorRepository;
import cn.kstry.framework.core.engine.interceptor.TaskInterceptorRepository;
import cn.kstry.framework.core.engine.thread.FragmentTask;
import cn.kstry.framework.core.engine.thread.InvokeMethodThreadLocal;
import cn.kstry.framework.core.engine.thread.MonoFlowTask;
import cn.kstry.framework.core.exception.BusinessException;
import cn.kstry.framework.core.exception.ExceptionEnum;
import cn.kstry.framework.core.exception.KstryException;
import cn.kstry.framework.core.monitor.DemotionInfo;
import cn.kstry.framework.core.monitor.MonitorTracking;
import cn.kstry.framework.core.role.Role;
import cn.kstry.framework.core.util.AssertUtil;
import cn.kstry.framework.core.util.ExceptionUtil;
import cn.kstry.framework.core.util.GlobalUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.lang.reflect.Array;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
* 流程任务执行核心
*
* @author lykan
*/
public abstract class FlowTaskCore<T> extends BasicTaskCore<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(FlowTaskCore.class);
public FlowTaskCore(StoryEngineModule engineModule, FlowRegister flowRegister, Role role, StoryBus storyBus) {
super(engineModule, flowRegister, storyBus, role, GlobalUtil.getTaskName(flowRegister.getStartElement(), flowRegister.getRequestId()));
}
protected void doExe(Role role, StoryBus storyBus, FlowRegister flowRegister) {
ContextStoryBus csd;
Optional<FlowElement> next;
for (csd = new ContextStoryBus(storyBus), next = flowRegister.nextElement(csd); next.isPresent(); csd = new ContextStoryBus(storyBus), next = flowRegister.nextElement(csd)) {
FlowElement flowElement = next.get();
if (!doInvoke(role, storyBus, flowRegister, flowElement)) {
break;
}
Optional<AsyncFlowHook<List<FlowElement>>> asyncFlowHook = flowRegister.predictNextElement(csd, flowElement);
if (asyncFlowHook.isPresent() && BooleanUtils.isTrue(asyncFlowHook.get().openAsync())) {
submitAsyncTask(role, storyBus, flowRegister, asyncFlowHook.get());
}
}
}
private boolean doInvoke(Role role, StoryBus storyBus, FlowRegister flowRegister, FlowElement flowElement) {
if (flowElement.getElementType() == BpmnTypeEnum.SUB_PROCESS) {
SubProcess subProcess = GlobalUtil.transferNotEmpty(flowElement, SubProcess.class);
try {
InvokeMethodThreadLocal.setCycleTimes(flowRegister.getCycleTimes());
if (!subProcess.getConditionExpression().map(e -> e.condition(storyBus)).orElse(true)) {
return true;
}
} finally {
InvokeMethodThreadLocal.clearCycleTimes();
}
SubProcessInterceptorRepository subInterceptorRepository = engineModule.getSubInterceptorRepository();
if (!subInterceptorRepository.postBeforeProcessor(storyBus, subProcess.getStartEvent().getId(), flowRegister.getStoryId())) {
return true;
}
subProcessTaskHandler(role, storyBus, flowRegister, subProcess);
return false;
}
if (flowElement.getElementType() != BpmnTypeEnum.SERVICE_TASK) {
return true;
}
Object result;
MethodWrapper methodWrapper;
TaskServiceDef taskServiceDef;
ServiceTask serviceTask = (ServiceTask) flowElement;
try {
InvokeMethodThreadLocal.whenServiceInvoke(flowRegister, null, serviceTask, storyBus.getBusinessId());
if (!serviceTask.getConditionExpression().map(e -> e.condition(storyBus)).orElse(true)) {
flowRegister.getMonitorTracking().expressionTracking(storyBus.getScopeDataOperator(), serviceTask, false);
flowRegister.getMonitorTracking().finishTaskTracking(storyBus.getScopeDataOperator(), flowElement, null);
return true;
}
Optional<TaskServiceDef> taskServiceDefOptional = engineModule.getTaskContainer().getTaskServiceDef(serviceTask.getTaskComponent(), serviceTask.getTaskService(), role);
if (!taskServiceDefOptional.isPresent() && serviceTask.allowAbsent()) {
flowRegister.getMonitorTracking().finishTaskTracking(storyBus.getScopeDataOperator(), flowElement, null);
return true;
}
taskServiceDef = taskServiceDefOptional.orElseThrow(() -> ExceptionUtil.buildException(null,
ExceptionEnum.TASK_SERVICE_MATCH_ERROR, ExceptionEnum.TASK_SERVICE_MATCH_ERROR.getDesc() + GlobalUtil.format(" service task identity: {}", serviceTask.identity())));
methodWrapper = taskServiceDef.getMethodWrapper();
flowRegister.getMonitorTracking().getServiceNodeTracking(flowElement).ifPresent(nodeTracking -> {
nodeTracking.setThreadId(Thread.currentThread().getName());
nodeTracking.setMethodName(methodWrapper.getMethod().getName());
nodeTracking.setTargetName(taskServiceDef.getTaskComponentTarget().getTarget().getClass().getName());
nodeTracking.setAbility(Optional.ofNullable(methodWrapper.getAbility()).filter(StringUtils::isNotBlank).orElse(null));
});
} finally {
I
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
该项目是一个基于Java开发的Kstry业务架构可视化轻量级框架设计源码,包含798个文件,涵盖494个Java源文件、89个SVG图形文件、69个JavaScript文件、28个BPMN流程定义文件、28个Vue组件文件、24个PNG图片文件、16个TypeScript文件、15个JSON配置文件、10个SCSS样式文件和6个YAML配置文件。Kstry以其可视化、业务隔离和轻量级特性成为业务架构的首选工具,适用于流程编排、并发编程、规则判断、微服务整合、动态微服务判定、数据字典和平台能力建设等多种场景。更多详情请访问项目主页:http://kstry.cn
资源推荐
资源详情
资源评论
收起资源包目录
基于Java的Kstry业务架构可视化轻量级框架设计源码 (826个子文件)
test_flow_01.bpmn 112KB
test_flow_04.bpmn 86KB
component-demo-process.bpmn 70KB
subprocess_test_001.bpmn 60KB
test_flow_03.bpmn 40KB
mono_flow_test.bpmn 39KB
test_flow_02.bpmn 32KB
async_method_test.bpmn 30KB
test_flow_02_sub.bpmn 28KB
goods2.bpmn 28KB
student-score-query-process.bpmn 26KB
goods.bpmn 24KB
test_role_01.bpmn 23KB
test_iterator_01.bpmn 17KB
student-score-query-process.bpmn 16KB
http-action-process.bpmn 14KB
test_bus_01.bpmn 12KB
test_iterator_02.bpmn 8KB
mono_result_test.bpmn 7KB
load_bpmn_005.bpmn 6KB
load_bpmn_006.bpmn 6KB
load_bpmn_007.bpmn 5KB
test_diagram_01.bpmn 4KB
load_bpmn_00302.bpmn 3KB
load_bpmn_00301.bpmn 3KB
load_bpmn_00401.bpmn 3KB
load_bpmn_00402.bpmn 3KB
test_kv_001.bpmn 3KB
load_bpmn_002.bpmn 55B
.browserslistrc 30B
nginx.conf 925B
.gitignore 680B
.gitignore 420B
.gitignore 231B
index.html 810B
favicon.ico 956B
logo.ico 956B
FlowTaskCore.java 42KB
JsonProcessModelTransfer.java 40KB
CamundaProcessModelTransfer.java 38KB
LambdaServiceSupport.java 23KB
OperatorStoryBus.java 22KB
FlowDemoCase2Test.java 19KB
TaskParamParser.java 19KB
StoryEngine.java 19KB
MonitorTracking.java 17KB
FlowRegister.java 16KB
MethodWrapper.java 16KB
BasicTaskCore.java 16KB
FlowCase01Test.java 16KB
VerifyFlowPostProcessor.java 15KB
ElementParserUtil.java 15KB
Exp.java 15KB
BasicStoryBus.java 15KB
KstryContextResolver.java 13KB
JsonSerializeIterator.java 13KB
DiagramCase01Test.java 12KB
TaskServiceExecutor.java 12KB
TaskComponentRepository.java 11KB
PeekStrategyRepository.java 11KB
AdminTaskFuture.java 11KB
LambdaUtil.java 11KB
Case1BpmnDiagramConfiguration.java 10KB
ExceptionEnum.java 10KB
ProcessLink.java 10KB
IteratorSubProcessCaseTest.java 10KB
BpmnDiagramLink.java 10KB
PropertyUtil.java 10KB
HttpActionService.java 9KB
MonoFlowTest.java 9KB
AssertUtil.java 9KB
ProxyUtil.java 9KB
ProcessConfig.java 9KB
ServiceTaskImpl.java 8KB
DynamicBpmnDiagram.java 8KB
StartEventFactory.java 8KB
AsyncTaskUtil.java 8KB
TypeConverterProcessor.java 7KB
MonoSuccessService.java 7KB
NodeTracking.java 7KB
ComponentImportSelector.java 7KB
SpringDynamicComponent.java 7KB
StudentScoreService.java 7KB
RoleCaseTest.java 7KB
StoryEngineModule.java 7KB
FlowDemoCaseTest.java 7KB
StudentScoreService.java 7KB
AsyncMethodTest.java 7KB
SubProcessTest.java 7KB
RearrangeFlowPostProcessor.java 7KB
ElementPropertyUtil.java 7KB
TaskParamWrapper.java 7KB
DynamicBpmnSubDiagram.java 7KB
FlowCase04Test.java 7KB
StudentController.java 7KB
StoryRequest.java 7KB
PermissionUtil.java 6KB
LocalSingleNodeRateLimiter.java 6KB
MarkIndexPostProcessor.java 6KB
FlowTaskSubscriber.java 6KB
共 826 条
- 1
- 2
- 3
- 4
- 5
- 6
- 9
资源评论
lly202406
- 粉丝: 3011
- 资源: 5529
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功