package com.happy3w.task.composer;
import com.happy3w.toolkits.message.MessageRecorderException;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
public class TaskComposer implements TaskExecuteContext {
private final Map<Long, TaskStatusHolder> holderMap;
private final Map<String, TaskStatusHolder> outputHolderMap;
private final Map<String, Object> outputValues = new HashMap<>();
public TaskComposer(Map<Long, TaskStatusHolder> holderMap, Map<String, TaskStatusHolder> outputHolderMap) {
this.holderMap = holderMap;
this.outputHolderMap = outputHolderMap;
}
public TaskComposer resetContext() {
outputValues.clear();
for (TaskStatusHolder holder : outputHolderMap.values()) {
holder.setStatus(TaskStatus.waiting);
}
return this;
}
public TaskComposer withValues(Map<String, Object> values) {
this.outputValues.putAll(values);
return this;
}
public List<DependItem> dependItemList() {
return TaskStatusHolder.genDependGraphItems(holderMap.values());
}
public List<DependItem> dependItemListFor(String name) {
Stack<TaskStatusHolder> taskStack = new Stack<>();
taskStack.push(this.outputHolderMap.get(name));
Map<Long, TaskStatusHolder> taskToRun = new HashMap<>();
while (!taskStack.isEmpty()) {
TaskStatusHolder curTask = taskStack.pop();
if (taskToRun.containsKey(curTask.getId())) {
continue;
}
curTask.dependStream()
.filter(depend -> !taskToRun.containsKey(depend.getId()))
.forEach(taskStack::push);
}
return TaskStatusHolder.genDependGraphItems(taskToRun.values());
}
@Override
public synchronized Object getValue(String name) {
return outputValues.get(name);
}
@Override
public synchronized void setValue(String name, Object value) {
outputValues.put(name, value);
}
public static TaskComposer build(Collection<Task> tasks) {
long nextId = 0;
Map<String, TaskStatusHolder> outputTaskMap = new HashMap<>();
Map<Long, TaskStatusHolder> holderMap = new HashMap();
indexAllTask(tasks, nextId, outputTaskMap, holderMap);
completeDepends(outputTaskMap);
return new TaskComposer(holderMap, outputTaskMap);
}
private static void completeDepends(Map<String, TaskStatusHolder> outputHolderMap) {
for (TaskStatusHolder holder : outputHolderMap.values()) {
List<TaskDataDef> inputs = holder.getTask().getInputs();
if (inputs == null || inputs.isEmpty()) {
continue;
}
for (TaskDataDef input : inputs) {
TaskStatusHolder inputCreator = outputHolderMap.get(input.getName());
if (inputCreator != null) {
holder.addDepend(inputCreator);
}
}
}
}
private static void indexAllTask(Collection<Task> tasks, long nextId, Map<String, TaskStatusHolder> outputTaskMap, Map<Long, TaskStatusHolder> holderMap) {
for (Task task : tasks) {
List<TaskDataDef> outputs = task.getOutputs();
if (outputs == null || outputs.isEmpty()) {
throw new IllegalArgumentException("Task output should not be empty. task:" + task);
}
TaskStatusHolder holder = new TaskStatusHolder(nextId++, task);
holderMap.put(holder.getId(), holder);
for (TaskDataDef output : outputs) {
TaskStatusHolder existOutput = outputTaskMap.get(output.getName());
if (existOutput != null) {
String message = MessageFormat.format("Duplicate output:{0} in tasks:{1}, {2}",
output.getName(), existOutput.getTask(), task);
throw new IllegalArgumentException(message);
}
outputTaskMap.put(output.getName(), holder);
}
}
}
public TaskComposer withValue(String name, Object value) {
outputValues.put(name, value);
return this;
}
public TaskExecutor executorFor(String outputName) {
if (outputValues.containsKey(outputName)) {
return new TaskExecutor(Collections.EMPTY_LIST, this);
}
TaskStatusHolder outputHolder = outputHolderMap.get(outputName);
if (outputHolder == null) {
throw new IllegalArgumentException("Failed to execute for no task for output:" + outputName);
}
if (outputHolder.getStatus() == TaskStatus.finished) {
return new TaskExecutor(Collections.EMPTY_LIST, this);
}
Collection<TaskStatusHolder> taskHolderToRun = collectTasksToRun(outputHolder);
return new TaskExecutor(taskHolderToRun, this);
}
public <T> T evaluate(String outputName) {
return executorFor(outputName)
.getDataValue(outputName);
}
private Collection<TaskStatusHolder> collectTasksToRun(TaskStatusHolder outputHolder) {
Map<Long, TaskStatusHolder> existedTaskHolder = new HashMap<>();
Map<String, TaskDataDef> lostInputs = new HashMap<>();
Stack<TaskStatusHolder> holderStack = new Stack<>();
holderStack.push(outputHolder);
while (!holderStack.isEmpty()) {
TaskStatusHolder holder = holderStack.pop();
if (existedTaskHolder.containsKey(holder.getId())) {
continue;
}
holder.setStatus(TaskStatus.waiting);
checkInputExist(holder.getTask().getInputs(), lostInputs);
existedTaskHolder.put(holder.getId(), holder);
List<TaskStatusHolder> depends = holder.getDepends();
if (depends != null && !depends.isEmpty()) {
for (TaskStatusHolder dependHolder : holder.getDepends()) {
if (dependHolder.getStatus() != TaskStatus.finished) {
holderStack.push(dependHolder);
}
}
}
}
if (!lostInputs.isEmpty()) {
throw new MessageRecorderException("These inputs are required:" + lostInputs.values());
}
return existedTaskHolder.values();
}
private void checkInputExist(List<TaskDataDef> inputs, Map<String, TaskDataDef> lostInputs) {
if (inputs == null || inputs.isEmpty()) {
return;
}
for (TaskDataDef def : inputs) {
String inputName = def.getName();
if (outputHolderMap.containsKey(inputName)
|| lostInputs.containsKey(inputName)
|| outputValues.containsKey(inputName)) {
continue;
}
lostInputs.put(inputName, def);
}
}
}