/*
* Copyright 2014 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.executor;
import static java.util.Objects.requireNonNull;
import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.alert.Alerter;
import azkaban.event.EventHandler;
import azkaban.executor.selector.ExecutorComparator;
import azkaban.executor.selector.ExecutorFilter;
import azkaban.executor.selector.ExecutorSelector;
import azkaban.flow.FlowUtils;
import azkaban.metrics.CommonMetrics;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.utils.AuthenticationUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.Thread.State;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
/**
* Executor manager used to manage the client side job.
*/
@Singleton
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
// Job type name "spark". NOTE(review): not referenced in the visible part of this file — confirm
// where it is used.
private static final String SPARK_JOB_TYPE = "spark";
// Literal "${application.id}" token. NOTE(review): presumably a placeholder that is replaced with
// a real application ID — confirm against the code that uses it.
private static final String APPLICATION_ID = "${application.id}";
// The regex to look for while fetching application ID from the Hadoop/Spark job log
private static final Pattern APPLICATION_ID_PATTERN = Pattern
.compile("application_\\d+_\\d+");
// The regex to look for while validating the content from RM job link
private static final Pattern FAILED_TO_READ_APPLICATION_PATTERN = Pattern
.compile("Failed to read the application");
// Second literal message pattern. NOTE(review): presumably also matched against the RM job link
// content, like the pattern above — confirm.
private static final Pattern INVALID_APPLICATION_ID_PATTERN = Pattern
.compile("Invalid Application ID");
// Passed by the constructor as the default for ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW.
// (The identifier is spelled "ONCURRENT"; it is kept as-is because other code refers to it.)
private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
// 12 weeks, in milliseconds. The int prefix 3 * 4 * 7 * 24 * 60 * 60 = 7,257,600 fits in an int;
// the trailing 1000L widens the product to long. Passed by the constructor as the default for
// the "execution.logs.retention.ms" property.
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
* 24 * 60 * 60 * 1000L;
// Ten minutes. NOTE(review): usage is outside the visible part of this file — confirm.
private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
private static final Logger logger = Logger.getLogger(ExecutorManager.class);
// Collaborators handed to the constructor and stored unchanged.
private final AlerterHolder alerterHolder;
private final Props azkProps;
private final CommonMetrics commonMetrics;
private final ExecutorLoader executorLoader;
// Created in the constructor with the configured log retention; started by start().
private final CleanerThread cleanerThread;
// Concurrent map from an Integer key to a (reference, flow) pair.
// NOTE(review): the key is presumably the execution id — confirm.
private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
new ConcurrentHashMap<>();
// Created in the constructor; started by start().
private final ExecutingManagerUpdaterThread executingManager;
private final ExecutorApiGateway apiGateway;
// Read once in the constructor from ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW.
private final int maxConcurrentRunsOneFlow;
// Package-private and non-final. Created in the constructor with the capacity read from
// ConfigurationKeys.WEBSERVER_QUEUE_SIZE.
QueuedExecutions queuedFlows;
// Package-private and non-final. Set in the constructor from the "cache.directory" property
// (default "cache").
File cacheDir;
// Held as an immutable set behind a volatile reference to ensure thread safety.
private volatile ImmutableSet<Executor> activeExecutors = null;
// Only assigned by setupMultiExecutorMode(), and only started by start() when
// isMultiExecutorMode() is true; otherwise it stays null.
private QueueProcessorThread queueProcessor;
private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
// Both start at -1. NOTE(review): presumably timestamps of the last cleaner / updater thread
// check — confirm against the threads that write them.
private long lastCleanerThreadCheckTime = -1;
private long lastThreadCheckTime = -1;
private String updaterStage = "not started";
// Executor selector settings, populated by setupMultiExecutorMode() from azkaban properties.
private List<String> filterList;
private Map<String, Integer> comparatorWeightsMap;
private long lastSuccessfulExecutorInfoRefresh;
// Fixed-size thread pool created by setupMultiExecutorMode().
private ExecutorService executorInforRefresherService;
/**
 * Creates the manager from its injected collaborators and runs the startup sequence.
 *
 * <p>After storing the collaborators, the constructor sets up executors, loads running flows,
 * creates the execution queue, loads queued flows, resolves the cache directory, creates the
 * updater thread, optionally configures multi-executor mode, and finally creates the cleaner
 * thread. The threads created here are not started; that is done by {@link #start()}.
 *
 * @param azkProps server properties that all configuration values are read from
 * @param loader executor loader stored for later use
 * @param alerterHolder alerter holder stored for later use
 * @param commonMetrics metrics collaborator stored for later use
 * @param apiGateway executor API gateway stored for later use
 * @throws ExecutorManagerException declared by this constructor; NOTE(review): presumably raised
 *     by the executor setup or flow loading steps — confirm
 */
@Inject
public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
    final AlerterHolder alerterHolder,
    final CommonMetrics commonMetrics,
    final ExecutorApiGateway apiGateway) throws ExecutorManagerException {
  // Plain field wiring; these assignments do not depend on each other.
  this.azkProps = azkProps;
  this.executorLoader = loader;
  this.apiGateway = apiGateway;
  this.alerterHolder = alerterHolder;
  this.commonMetrics = commonMetrics;

  // The order of the following steps is kept exactly as it was.
  setupExecutors();
  loadRunningFlows();

  final long queueCapacity =
      azkProps.getLong(Constants.ConfigurationKeys.WEBSERVER_QUEUE_SIZE, 100000);
  this.queuedFlows = new QueuedExecutions(queueCapacity);

  // The default threshold is set to 30 for now, in case some users are affected. We may
  // decrease this number in future, to better prevent DDos attacks.
  final int concurrentRunLimit = azkProps.getInt(
      Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW,
      DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
  this.maxConcurrentRunsOneFlow = concurrentRunLimit;

  loadQueuedFlows();

  final String cacheDirPath = azkProps.getString("cache.directory", "cache");
  this.cacheDir = new File(cacheDirPath);

  this.executingManager = new ExecutingManagerUpdaterThread();

  if (isMultiExecutorMode()) {
    setupMultiExecutorMode();
  }

  this.cleanerThread = new CleanerThread(
      azkProps.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS));
}
/**
 * Starts the background threads owned by this manager.
 *
 * <p>The updater thread and the cleaner thread are always started. The queue processor is
 * started only when {@code isMultiExecutorMode()} returns true.
 */
public void start() {
  this.executingManager.start();
  this.cleanerThread.start();

  if (!isMultiExecutorMode()) {
    return;
  }
  this.queueProcessor.start();
}
/**
 * Extracts the application ID, without its {@code "application_"} prefix, from log text.
 *
 * <p>The first match of {@code APPLICATION_ID_PATTERN} is used, so a log containing
 * {@code "application_123_456"} yields {@code "123_456"}. The result is logged at info level,
 * including when nothing was found.
 *
 * @param logData log text to scan; a null value is treated as "no application ID present"
 * @return the ID with the {@code "application_"} prefix removed, or {@code null} when
 *     {@code logData} is null or contains no match
 */
private String findApplicationIdFromLog(final String logData) {
  // Every match of APPLICATION_ID_PATTERN starts with this literal prefix.
  final String prefix = "application_";
  String appId = null;
  if (logData != null) {
    final Matcher matcher = APPLICATION_ID_PATTERN.matcher(logData);
    if (matcher.find()) {
      // Strip the prefix by its length rather than a hard-coded offset.
      appId = matcher.group().substring(prefix.length());
    }
  }
  // logger is a static field, so it is referenced without "this".
  logger.info("Application ID is " + appId);
  return appId;
}
private void setupMultiExecutorMode() {
// initialize hard filters for executor selector from azkaban.properties
final String filters = this.azkProps
.getString(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_FILTERS, "");
if (filters != null) {
this.filterList = Arrays.asList(StringUtils.split(filters, ","));
}
// initialize comparator feature weights for executor selector from
// azkaban.properties
final Map<String, String> compListStrings = this.azkProps
.getMapByPrefix(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
if (compListStrings != null) {
this.comparatorWeightsMap = new TreeMap<>();
for (final Map.Entry<String, String> entry : compListStrings.entrySet()) {
this.comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
}
}
this.executorInforRefresherService =
Executors.newFixedThreadPool(this.azkProps.getInt(
Constants.ConfigurationKeys.EXECUTORINFO_REFRESH_MAX_THREADS, 5));
// configure queue processor
this.queueProcessor =
new QueueProcessorThread(
this.azkProps.getBoolean(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, true),
this.azkProps.getLong(Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),
this.azkProps.getInt(
Co
评论0