package com.gobrs.async.core;
import com.gobrs.async.core.callback.ErrorCallback;
import com.gobrs.async.core.common.domain.AnyConditionResult;
import com.gobrs.async.core.common.domain.AsyncParam;
import com.gobrs.async.core.common.domain.TaskResult;
import com.gobrs.async.core.common.domain.TaskStatus;
import com.gobrs.async.core.common.enums.ExpState;
import com.gobrs.async.core.common.enums.ResultState;
import com.gobrs.async.core.common.enums.TaskEnum;
import com.gobrs.async.core.common.exception.GobrsForceStopException;
import com.gobrs.async.core.common.exception.ManualStopException;
import com.gobrs.async.core.config.ConfigManager;
import com.gobrs.async.core.log.LogWrapper;
import com.gobrs.async.core.log.TraceUtil;
import com.gobrs.async.core.task.AsyncTask;
import com.gobrs.async.core.task.TaskUtil;
import com.gobrs.async.core.timer.GobrsFutureTask;
import com.gobrs.async.core.timer.GobrsTimer;
import com.gobrs.async.core.timer.TimerListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.lang.ref.Reference;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static com.gobrs.async.core.common.def.DefaultConfig.*;
import static com.gobrs.async.core.common.enums.InterruptEnum.INTERRUPTED;
import static com.gobrs.async.core.common.enums.InterruptEnum.INTERRUPTTING;
import static com.gobrs.async.core.task.ReUsing.reusing;
import static com.gobrs.async.core.timer.GobrsFutureTask.STOP_STAMP;
/**
* The type Task actuator.
*
* @param <Param> the type parameter
* @param <Result> the type parameter
*/
@Slf4j
public class TaskActuator<Param, Result> implements Callable, Cloneable {
/**
* The Support.
*/
public TaskSupport support;
/**
* Tasks to be performed
*/
public final AsyncTask<Param, Result> task;
/**
* Upstream dependent quantity
*/
public volatile int upstreamDepdends;
/**
* depend com.gobrs.async.com.gobrs.async.test.task
*/
public final List<AsyncTask<?, ?>> subTasks;
/**
* The Param.
*/
public AsyncParam<Param> param;
private Lock lock;
private Map<AsyncTask<?, ?>, List<AsyncTask<?, ?>>> upwardTasksMap;
/**
* Instantiates a new Task actuator.
*
* @param asyncTask the async com.gobrs.async.com.gobrs.async.test.task
* @param depends the depends
* @param subTasks the sub tasks
*/
TaskActuator(AsyncTask<Param, Result> asyncTask, int depends, List<AsyncTask<?, ?>> subTasks) {
this.task = asyncTask;
this.upstreamDepdends = depends > 1 & task.isAny() ? 1 : depends;
this.subTasks = subTasks;
}
/**
* Instantiates a new Task actuator.
*
* @param asyncTask the async com.gobrs.async.com.gobrs.async.test.task
* @param depends the depends
* @param subTasks the sub tasks
* @param upwardTasksMap the upward tasks map
*/
TaskActuator(AsyncTask<Param, Result> asyncTask, int depends, List<AsyncTask<?, ?>> subTasks, Map<AsyncTask<?, ?>, List<AsyncTask<?, ?>>> upwardTasksMap) {
this.task = asyncTask;
this.upstreamDepdends = depends > 1 & task.isAny() ? 1 : depends;
this.subTasks = subTasks;
this.upwardTasksMap = upwardTasksMap;
}
/**
* Initialize the object cloned from prototype.
*
* @param support the support
* @param param the param
*/
void init(TaskSupport support, AsyncParam param) {
this.support = support;
this.param = param;
}
@Override
public Object call() throws Exception {
Param parameter = getParameter(task);
preparation();
TaskLoader taskLoader = support.getTaskLoader();
/**
* If the conditions are not met
* no execution is performed
*/
Result result = null;
try {
/**
* 判断任务是否有必要执行
* 1、necessary 返回true
* 2、如果具备执行结果 则无需执行
*/
if (executeNecessary(parameter, task)) {
task.prepare(parameter);
/**
* Unified front intercept
* 统一前置处理
*/
taskLoader.preInterceptor(parameter, task.getName());
/**
* Perform a com.gobrs.async.com.gobrs.async.test.task
* 执行核心任务处理
*/
result = task.taskAdapter(parameter, support);
/**
* Setting Task Results
* 设置任务结果
*/
result(result);
stopAsync0(parameter, support);
/**
* 状态改变
*/
change();
/**
* 数量统计
*/
statisticsOptimalCount();
/**
* Post-processing of tasks
* 后置任务
*/
taskLoader.postInterceptor(result, task.getName());
/**
* Success com.gobrs.async.callback
* 执行成功回调
*/
task.onSuccess(support);
}
noRepeat(taskLoader, result);
} catch (Exception e) {
try {
exceptionProcess(parameter, taskLoader, e);
} catch (Exception exception) {
if (log.isErrorEnabled()) {
log.error("<{}> [{}] exceptionProcess error {} ", TraceUtil.get(), task.getName(), e);
}
taskLoader.stopSingleTaskLine(subTasks);
}
} finally {
clear();
stopOrRelease(parameter, taskLoader);
}
return result;
}
private void result(Result result) {
if (TaskEnum.METHOD.getType().equals(task.getType())) {
support.getResultMap().put(task.getName(), buildSuccessResult(result));
} else {
support.getResultMap().put(task.getClass(), buildSuccessResult(result));
}
}
private void stopAsync0(Object parameter, TaskSupport support) {
AtomicInteger diagnose = support.getTaskLoader().getINTERRUPTFLAG();
if (diagnose.get() == INTERRUPTTING.getState() && diagnose.compareAndSet(INTERRUPTTING.getState(), INTERRUPTED.getState())) {
ErrorCallback<Object> errorCallback = new ErrorCallback<Object>(() -> parameter, null, support, task);
LogWrapper logWrapper = support.getLogWrapper();
if (Objects.nonNull(logWrapper)) {
logWrapper.setStopTaskName(task.getName());
}
support.taskLoader.setExpCode(new AtomicInteger(ExpState.STOP_ASYNC.getCode()));
support.getResultMap().put(task.getName(), buildErrorResult(null, new ManualStopException("Manually executing stopAsync Exception")));
support.getTaskLoader().isRunning().set(false);
support.getTaskLoader().errorInterrupted(errorCallback);
}
}
/**
* 停止任务 或 释放资源
*
* @param parameter
* @param taskLoader
*/
private void stopOrRelease(Object parameter, TaskLoader<Param, Result> taskLoader) throws Exception {
if (task.getTimeoutInMilliseconds() > TASK_TIME_OUT) {
futureStopRelease(parameter, taskLoader);
}
if (task.isExclusive()) {
releaseFutureTasks();
}
}
/**
* 根据中断位强制释放资源 针对开发者使用死循环等问题fix
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
该项目是采用Java编写的gobrs-async多线程并发编程框架的源码,总计包含445个文件,涵盖了242个Java源文件、47个Markdown文档、36个PNG图片、36个Vue组件、20个JavaScript文件、14个XML配置文件、13个Stylus样式表、5个YAML配置文件、4个JPG图片、3个TypeScript文件。该框架旨在为企业提供高效的多线程并发解决方案,尤其适用于电商等高并发场景,能够快速应对复杂的并发需求。如果您在多线程开发上遇到挑战,不妨尝试使用gobrs-async,体验其带来的便利!
资源推荐
资源详情
资源评论
收起资源包目录
基于Java的多线程并发编程框架gobrs-async设计源码 (443个子文件)
index.css 741B
spring.factories 188B
spring.factories 128B
.gitignore 15B
ssr.html 481B
dev.html 247B
favicon.ico 289KB
TaskActuator.java 25KB
TaskLoader.java 18KB
ExtensionLoader.java 14KB
GobrsFutureTask.java 14KB
TaskTrigger.java 11KB
ThreadPoolBuilder.java 10KB
RuleParseEngine.java 10KB
AsyncTask.java 10KB
MethodComponentScanner.java 7KB
GobrsAsync.java 6KB
MethodTaskAdapter.java 6KB
GobrsAutoConfiguration.java 6KB
IdWorker.java 5KB
JsonUtil.java 5KB
GobrsAsyncThreadPoolFactory.java 4KB
AnnotationUtil.java 4KB
Optimal.java 4KB
GobrsPropertyAutoConfiguration.java 4KB
GobrsAsyncProperties.java 4KB
CaseFuture.java 4KB
BeanHolder.java 3KB
ConfigManager.java 3KB
CaseMethodTaskOne.java 3KB
GobrsTimer.java 3KB
GobrsConfig.java 3KB
RulePostProcessor.java 3KB
CaseAnyCondition.java 3KB
TaskFlow.java 3KB
SystemClock.java 3KB
GobrsService.java 3KB
CServiceCondition.java 3KB
GobrsPrint.java 3KB
ErrorCallback.java 3KB
TaskReceive.java 2KB
LogCreator.java 2KB
TaskSupport.java 2KB
ParalCountDownLatch.java 2KB
CaseMethodTask.java 2KB
ScheduledExecutor.java 2KB
GobrsThreadPoolConfiguration.java 2KB
GobrsController.java 2KB
CService.java 2KB
CaseRetryTaskB.java 2KB
Hippo4jAutoConfiguration.java 2KB
TaskUtil.java 2KB
TimerListener.java 2KB
FutureTaskC.java 2KB
GobrsTask.java 2KB
ThreadPool.java 2KB
InterruptTaskC.java 2KB
GobbrsPropertySourceFactory.java 2KB
TaskStatus.java 2KB
RuleThermalLoad.java 2KB
GobrsMethodTaskConfiguration.java 2KB
DefaultConfig.java 2KB
ExtensionLoaderTest.java 2KB
CaseParam.java 1KB
GobrsTaskMethodEnum.java 1KB
AServiceCondition.java 1KB
BeanProxy.java 1KB
MethodConfig.java 1KB
BService.java 1KB
UUIDGenerator.java 1KB
SpiExtensionFactory.java 1KB
FutureResult.java 1KB
Task.java 1KB
TaskResult.java 1KB
BServiceCondition.java 1KB
EService.java 1KB
DService.java 1KB
FService.java 1KB
GService.java 1KB
HService.java 1KB
Invoke.java 1KB
CaseTimeoutTaskC.java 1KB
Realize.java 1KB
CaseGeneral.java 1KB
InterruptEnum.java 1KB
ProxyUtil.java 1KB
GobrsAsyncException.java 1KB
AService.java 1KB
ExpState.java 1KB
AsyncTaskTimeoutException.java 1KB
ExtensionFactory.java 1KB
AbstractEngine.java 1KB
CaseFive.java 1KB
GobrsForceStopException.java 1KB
TaskEnum.java 1KB
InterruptTaskA.java 1KB
CaseTimeoutTaskA.java 1KB
CaseOptional.java 1KB
CaseTimeoutTaskB.java 1KB
FutureTaskA.java 1KB
共 443 条
- 1
- 2
- 3
- 4
- 5
资源评论
xyq2024
- 粉丝: 2498
- 资源: 5467
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功