SparkContext的初始化的初始化
SparkContext的初始化步骤如下:
1 创建创建Spark执行环境执行环境SparkEnv
1.2 什么是什么是SparkEnv??
SparkEnv是Spark的执行环境对象,其中包括许多与Executor执行相关的对象,所以SparkEnv存在于需要创建Executor的进
程中。那么需要创建Executor的进程有哪些呢?
在local模式下,Driver进程中会创建Executor。
在local-cluster模式或Standalone模式下,Worker中的CoarseGrainedExecutorBackend进程中也会创建Executor。
综上,SparkEnv存在于Driver或CoarseGrainedExecutorBackend进程中。
1.3 SparkEnv的构造步骤的构造步骤
创建SparkEnv,主要使用SparkEnv的createDriverEnv。
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv{...}
其中conf是SparkConf的复制,idLocal标识是否是单机模式,listenerBus采用监听器模式维护各类事件的处理。
SparkEnv.createDriverEnv方法最终调用SparkEnv.create方法创建SparkEnv。
在SparkEnv.create方法的,SparkEnv的构造步骤如下:
创建安全管理器SecurityManager;
//step1:创建安全管理器
val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
securityManager.initializeAuth()
}
创建基于Netty的分布式消息系统ActorSystem;(Spark1.6之前使用的Akka)
Netty、Akka待学习,内容转。
val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)
2 创建创建RDD清理器清理器metadataCleaner
3 创建并初始化创建并初始化Spark UI
4 Hadoop相关配置及相关配置及Executor环境变量的配置环境变量的配置
5 创建任务调度创建任务调度TaskScheduler
TaskScheduler负责提交任务,请求集群管理器调度任务。
5.1 createTaskScheduler
创建TaskScheduler的源代码源代码为SparkContext.createTaskScheduler,如下所示。该方法会根据master的配置匹配部署模式,
每种部署模式中都会创建两个类(TaskSchedulerImpl、SchedulerBackend)的实例,只是TaskSchedulerImpl都相
同,SchedulerBackend不同。
/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
master match {
case "local" =>
评论10