package org.example.stacker_v2;
import org.example.stacker_v2.domain.*;
import org.example.stacker_v2.domain.StackerTask;
import org.example.stacker_v2.domain.StackerTaskResult;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/**
* 堆垛机管理类
*/
@Slf4j
public class StackerManager {
private Map<String/*stacker id*/, Stacker> stackerMap=new HashMap<>();
@Deprecated
private Map<String/*stacker id*/, LinkedBlockingQueue<StackerTask>> taskQueueMap =new HashMap<>();
@Deprecated
private Map<Long/*task id*/, Function<StackerTaskResult,Boolean>> taskCallbackMap=new HashMap<>();
private ScheduledExecutorService scheduledExecutorService= Executors.newSingleThreadScheduledExecutor();
private ReentrantLock healthCheckLock= new ReentrantLock();
private ReentrantLock pickLock=new ReentrantLock();
/**
* 健康检查线程池
*/
private ExecutorService heartbeatPool=Executors.newSingleThreadExecutor();
private ExecutorService taskPickPool=Executors.newSingleThreadExecutor();
private ExecutorService taskWorkerPool;
private Map<String/*stacker id*/,Thread> workerMap=new HashMap<>();
private StackerNotify stackerNotify;
private TaskPicker taskPicker;
public StackerManager(int stackerCount,StackerNotify stackerNotify){
// 匹配堆垛机的数量
taskWorkerPool =Executors.newFixedThreadPool(stackerCount);
this.stackerNotify=stackerNotify;
}
public void start(){
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
if(healthCheckLock.isLocked()){
return;
}
healthCheckLock.lock();
// 检查堆垛机状态
heartbeatPool.execute(new Runnable() {
@Override
public void run() {
try {
for (Stacker stacker: stackerMap.values()) {
stacker.checkHealth();
}
}finally {
healthCheckLock.unlock();
}
}
});
// 检查工作线程状态
for (Map.Entry<String,Thread> t : workerMap.entrySet()) {
if(!t.getValue().isAlive()){
rebuildWorker(stackerMap.get(t.getKey()));
}
}
taskPickPool.execute(new Runnable() {
@Override
public void run() {
if(pickLock.isLocked()){
return;
}
pickLock.lock();
try {
// 读取任务
List<StackerTask> taskList = taskPicker.take();
for (StackerTask t : taskList) {
asyncExecuteTask(t, result -> taskPicker.callback(result));
}
}finally {
pickLock.unlock();
}
}
});
}
}, 0, 500, TimeUnit.MILLISECONDS);
// 启动堆垛机,执行任务
for (Stacker stacker: stackerMap.values()) {
buildWorker(stacker);
}
}
private void buildWorker(Stacker stacker){
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
StackerInvoker stackerInvoker = invokerMap.get(stacker.getId());
stackerInvoker.execute();
}
}, "stacker-"+stacker.getId());
// 注册工作线程
workerMap.put(stacker.getId(),thread);
// 执行工作线程
taskWorkerPool.execute(thread);
}
/**
* 重建工作线程
* @param stacker
*/
private void rebuildWorker(Stacker stacker){
// 中止线程
workerMap.get(stacker.getId()).interrupt();
if(stacker.getStackerStatus().isHealth()){
buildWorker(stacker);
}else {
log.warn("堆垛机无法连接,销毁工作线程:{}",stacker.getId());
stackerNotify.notify(stacker.getId(), "无法连接堆垛机");
}
}
public void registerStacker(String id,String ip,int port){
stackerMap.put(id,new Stacker(id,ip,port));
}
/**
* 任务注册状态
* @param task
* @param callback
* @return
*/
public boolean asyncExecuteTask(StackerTask task, Function<StackerTaskResult,Boolean> callback){
Stacker stacker = stackerMap.get(task.getTunnel());
if(!stacker.getStackerStatus().isHealth()){
log.error("堆垛机{}异常,无法注册任务:{}",stacker.getId(),task.getTaskId());
return false;
}
taskQueueMap.get(task.getTunnel()).add(task);
taskCallbackMap.put(task.getTaskId(),callback);
// 异步放入成功
return true;
}
}