package com.alibaba.datax.core.job;
import com.alibaba.datax.common.constant.PluginType;
import com.webank.wedatasphere.exchangis.datax.common.constant.TransportType;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.AbstractJobPlugin;
import com.alibaba.datax.common.plugin.JobPluginCollector;
import com.alibaba.datax.common.plugin.PluginProcessorLoader;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.statistics.PerfTrace;
import com.alibaba.datax.common.statistics.VMInfo;
import com.alibaba.datax.common.util.Configuration;
import com.webank.wedatasphere.exchangis.datax.common.GsonUtil;
import com.alibaba.datax.common.util.StrUtil;
import com.alibaba.datax.core.AbstractContainer;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.container.util.HookInvoker;
import com.alibaba.datax.core.container.util.JobAssignUtil;
import com.webank.wedatasphere.exchangis.datax.core.job.meta.MetaSchema;
import com.alibaba.datax.core.job.scheduler.AbstractScheduler;
import com.alibaba.datax.core.job.scheduler.processinner.StandAloneScheduler;
import com.webank.wedatasphere.exchangis.datax.core.processor.loader.JavaSrcUtils;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.container.communicator.AbstractContainerCommunicator;
import com.alibaba.datax.core.statistics.container.communicator.job.StandAloneJobContainerCommunicator;
import com.alibaba.datax.core.statistics.plugin.DefaultJobPluginCollector;
import com.alibaba.datax.core.util.ClassUtil;
import com.alibaba.datax.core.util.ErrorRecordChecker;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.HttpClientUtil;
import com.alibaba.datax.core.util.container.ClassLoaderSwapper;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.ExecuteMode;
import com.alibaba.fastjson.JSON;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFileFilter;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
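/**
* Created by jingxing on 14-8-24.
* <p>
* A job instance runs inside the jobContainer, which is the master of all tasks: it is responsible for
* initialization, splitting, scheduling, running, recycling, monitoring and reporting,
* but it does not perform the actual data synchronization itself.
*/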
public class JobContainer extends AbstractContainer {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory
.getLogger(JobContainer.class);
// Obtained from ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper().
// NOTE(review): presumably used to switch the thread's class loader around plugin calls — confirm against usages.
private ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper
.newCurrentThreadClassLoaderSwapper();
// Identifier of the job; it is included in the completion log message of start().
private long jobId;
// Name of the reader plugin.
private String readerPluginName;
// Names of the writer plugins (an array, matching the jobWriters list below).
private String[] writerPluginNames;
/**
* Job-level reader and writer instances held by the jobContainer.
*/
private Reader.Job jobReader;
private List<Writer.Job> jobWriters = new ArrayList<>();
// Wall-clock time (System.currentTimeMillis()) captured at the beginning of start().
private long startTimeStamp;
// Wall-clock time (System.currentTimeMillis()) captured in the finally block of start().
private long endTimeStamp;
// Used by start() as the timestamp of the baseline Communication when building the failure report.
// NOTE(review): presumably marks the start of the actual data transfer — confirm where it is assigned.
private long startTransferTimeStamp;
// NOTE(review): presumably marks the end of the actual data transfer — confirm where it is assigned.
private long endTransferTimeStamp;
// NOTE(review): looks like the number of channels the job needs — confirm where it is computed.
private int needChannelNumber;
// Defaults to 1; start() overwrites it with the result of split() and only schedules when it is > 0.
private int totalStage = 1;
// Built from the job configuration in the constructor.
private ErrorRecordChecker errorLimit;
// Parsed in the constructor from CoreConstant.DATAX_JOB_SETTING_TRANSPORT_TYPE (default "record").
private TransportType transportType;
// NOTE(review): presumably the scheduler that runs the task groups — confirm where it is assigned.
private AbstractScheduler taskGroupScheduler;
public JobContainer(Configuration configuration) {
super(configuration);
//Adjust configuration
adjustJobConfiguration(this.configuration);
this.transportType = TransportType.valueOf(
this.configuration.getString(CoreConstant.DATAX_JOB_SETTING_TRANSPORT_TYPE, "record").toUpperCase());
errorLimit = new ErrorRecordChecker(configuration);
}
/**
* All of the jobContainer's main work is done inside start(): init, prepare, split, scheduler,
* post, as well as destroy and statistics.
*/
@Override
public void start() {
LOG.info("DataX jobContainer starts job.");
boolean hasException = false;
boolean isDryRun = false;
try {
this.startTimeStamp = System.currentTimeMillis();
isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
if (isDryRun) {
LOG.info("jobContainer starts to do preCheck ...");
this.preCheck();
} else {
// userConf = configuration.clone();
LOG.debug("jobContainer starts to do preHandle ...");
this.preHandle();
LOG.debug("jobContainer starts to do init ...");
this.init();
if(configuration.getBool(CoreConstant.DATAX_JOB_SETTING_SYNCMETA, false)){
LOG.info("jobContainer starts to do syncMetaData ...");
this.syncMetaData();
}
LOG.info("jobContainer starts to do prepare ...");
this.prepare();
LOG.info("jobContainer starts to do split ...");
this.totalStage = this.split();
if(this.totalStage > 0) {
LOG.info("jobContainer starts to do schedule ...");
this.schedule();
}
LOG.debug("jobContainer starts to do post ...");
this.post();
LOG.debug("jobContainer starts to do postHandle ...");
this.postHandle();
LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
this.invokeHooks();
}
} catch (Throwable e) {
LOG.error("Exception when job run", e);
hasException = true;
if (e instanceof OutOfMemoryError) {
try {
this.destroy();
}catch(Exception e1){
//ignore
}
System.gc();
}
if (super.getContainerCommunicator() == null) {
// 由于 containerCollector 是在 scheduler() 中初始化的,所以当在 scheduler() 之前出现异常时,需要在此处对 containerCollector 进行初始化
AbstractContainerCommunicator tempContainerCollector;
// standalone
tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
super.setContainerCommunicator(tempContainerCollector);
}
Communication communication = super.getContainerCommunicator().collect();
// 汇报前的状态,不需要手动进行设置
// communication.setState(State.FAILED);
communication.setThrowable(e);
communication.setTimestamp(this.endTimeStamp);
Communication tempComm = new Communication();
tempComm.setTimestamp(this.startTransferTimeStamp);
Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
super.getContainerCommunicator().report(reportCommunication);
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
} finally {
if (!isDryRun) {
this.destroy();
this.endTimeStamp = System.currentTimeMillis();
if (!hasException) {
//最后打印cpu的平均消耗,GC的统计
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
vmInfo.getDelta(fal
评论0