没有合适的资源?快使用搜索试试~ 我知道了~
SparkContext的初始化步骤如下: 1 创建Spark执行环境SparkEnv 1.2 什么是SparkEnv? SparkEnv是Spark的执行环境对象,其中包括许多与Executor执行相关的对象,所以SparkEnv存在于需要创建Executor的进程中。那么需要创建Executor的进程有哪些呢? 在local模式下,Driver进程中会创建Executor。 在local-cluster模式或Standalone模式下,Worker中的CoarseGrainedExecutorBackend进程中也会创建Executor。 综上,SparkEnv存在于Driver或Coar
资源详情
资源评论
资源推荐
SparkContext的初始化的初始化
SparkContext的初始化步骤如下:
1 创建创建Spark执行环境执行环境SparkEnv
1.2 什么是什么是SparkEnv??
SparkEnv是Spark的执行环境对象,其中包括许多与Executor执行相关的对象,所以SparkEnv存在于需要创建Executor的进
程中。那么需要创建Executor的进程有哪些呢?
在local模式下,Driver进程中会创建Executor。
在local-cluster模式或Standalone模式下,Worker中的CoarseGrainedExecutorBackend进程中也会创建Executor。
综上,SparkEnv存在于Driver或CoarseGrainedExecutorBackend进程中。
1.3 SparkEnv的构造步骤的构造步骤
创建SparkEnv,主要使用SparkEnv的createDriverEnv。
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv{...}
其中conf是SparkConf的复制,idLocal标识是否是单机模式,listenerBus采用监听器模式维护各类事件的处理。
SparkEnv.createDriverEnv方法最终调用SparkEnv.create方法创建SparkEnv。
在SparkEnv.create方法的,SparkEnv的构造步骤如下:
创建安全管理器SecurityManager;
//step1:创建安全管理器
val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
securityManager.initializeAuth()
}
创建基于Netty的分布式消息系统ActorSystem;(Spark1.6之前使用的Akka)
Netty、Akka待学习,内容转。
val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)
2 创建创建RDD清理器清理器metadataCleaner
3 创建并初始化创建并初始化Spark UI
4 Hadoop相关配置及相关配置及Executor环境变量的配置环境变量的配置
5 创建任务调度创建任务调度TaskScheduler
TaskScheduler负责提交任务,请求集群管理器调度任务。
5.1 createTaskScheduler
创建TaskScheduler的源代码源代码为SparkContext.createTaskScheduler,如下所示。该方法会根据master的配置匹配部署模式,
每种部署模式中都会创建两个类(TaskSchedulerImpl、SchedulerBackend)的实例,只是TaskSchedulerImpl都相
同,SchedulerBackend不同。
/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
master match {
case "local" =>
weixin_38674050
- 粉丝: 5
- 资源: 981
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论10