Reading the Spark Source Code

An RDD can contain multiple partitions, and each partition is a fragment of the dataset. RDDs can depend on one another:

Narrow dependency: a one-to-one relationship, in which a partition of the parent RDD is used by at most one partition of the child RDD.
Wide dependency: a one-to-many relationship, in which several child RDD partitions depend on the same parent RDD partition.

[Figure: the RDD space. Native data (Scala collection datasets, Scala scalar types, Hadoop data) is brought into a SparkContext by input operators; transformation operators (map, union, join, groupBy), cache operators and action operators then link RDDs into stages, alongside broadcast variables and accumulators.]

The source of org.apache.spark.rdd.RDD defines several important methods:

1) getPartitions

    /**
     * Implemented by subclasses to return the set of partitions in this RDD. This method will only
     * be called once, so it is safe to implement a time-consuming computation in it.
     */
    protected def getPartitions: Array[Partition]

Implemented by subclasses to return the partitions of this RDD. The method is called only once, so a time-consuming computation is safe here. It returns the partitions as an array.

2) getDependencies

    /**
     * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
     * be called once, so it is safe to implement a time-consuming computation in it.
     */
    protected def getDependencies: Seq[Dependency[_]] = deps

Implemented by subclasses to describe how this RDD depends on its parent RDDs. It is likewise called only once, and returns the dependencies as a Seq.

3) compute

    /**
     * :: DeveloperApi ::
     * Implemented by subclasses to compute a given partition.
     */
    @DeveloperApi
    def compute(split: Partition, context: TaskContext): Iterator[T]

Implemented by subclasses to compute a given partition; every concrete RDD has its own compute function.

4) getPreferredLocations

    /**
     * Optionally overridden by subclasses to specify placement preferences.
     */
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil

Returns the preferred locations of a partition; this is where the placement policy lives.

RDD transformations and actions

RDD operations fall into two groups:

Transformations:
    map(f: T => U)                  : RDD[T] => RDD[U]
    filter(f: T => Bool)            : RDD[T] => RDD[T]
    flatMap(f: T => Seq[U])         : RDD[T] => RDD[U]
    sample(fraction: Float)         : RDD[T] => RDD[T]              (deterministic sampling)
    groupByKey()                    : RDD[(K, V)] => RDD[(K, Seq[V])]
    reduceByKey(f: (V, V) => V)     : RDD[(K, V)] => RDD[(K, V)]
    union()                         : (RDD[T]; RDD[T]) => RDD[T]
    join()                          : (RDD[(K, V)]; RDD[(K, W)]) => RDD[(K, (V, W))]
    cogroup()                       : (RDD[(K, V)]; RDD[(K, W)]) => RDD[(K, (Seq[V], Seq[W]))]
    crossProduct()                  : (RDD[T]; RDD[U]) => RDD[(T, U)]
    mapValues(f: V => W)            : RDD[(K, V)] => RDD[(K, W)]    (preserves partitioning)
    sort(c: Comparator[K])          : RDD[(K, V)] => RDD[(K, V)]
    partitionBy(p: Partitioner[K])  : RDD[(K, V)] => RDD[(K, V)]

Actions:
    count()                         : RDD[T] => Long
    collect()                       : RDD[T] => Seq[T]
    reduce(f: (T, T) => T)          : RDD[T] => T
    lookup(k: K)                    : RDD[(K, V)] => Seq[V]          (on hash/range partitioned RDDs)
    save(path: String)              : outputs the RDD to a storage system, e.g. HDFS

Let us look at the transformations first:

    // Transformations (return a new RDD)

    /**
     * Return a new RDD by applying a function to all elements of this RDD.
     */
    def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

    /**
     * Return a new RDD by first applying a function to all elements of this
     * RDD, and then flattening the results.
     */
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
      new FlatMappedRDD(this, sc.clean(f))

    /**
     * Return a new RDD containing only the elements that satisfy a predicate.
     */
    def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

map

    def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

map returns a MappedRDD, which extends RDD and overrides two methods: getPartitions and compute.

The first, getPartitions, takes the first parent RDD and returns its partition array:

    override def getPartitions: Array[Partition] = firstParent[T].partitions

The second, compute, walks the partition and applies the map function f to every element:

    override def compute(split: Partition, context: TaskContext) =
      firstParent[T].iterator(split, context).map(f)

filter

    def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

filter is a filtering operation, for example rdd1.filter(_ > 1).
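To make getPartitions and compute concrete, here is a minimal, self-contained sketch of a map-like RDD written outside the Spark code base. It is not Spark's own MappedRDD (whose firstParent helper is package-private to org.apache.spark), so it keeps a direct reference to its parent instead; the class name, object name and local[2] master are illustrative assumptions only.

    import scala.reflect.ClassTag

    import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // A map-like RDD: a one-to-one (narrow) dependency on `prev`, so it reuses
    // the parent's partitions and maps over each partition's iterator lazily.
    class MyMappedRDD[T: ClassTag, U: ClassTag](prev: RDD[T], f: T => U)
      extends RDD[U](prev) {

      // Same partitioning as the parent RDD.
      override def getPartitions: Array[Partition] = prev.partitions

      // Pull the parent's iterator for this split and apply f element by element.
      override def compute(split: Partition, context: TaskContext): Iterator[U] =
        prev.iterator(split, context).map(f)
    }

    object MyMappedRDDDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("MyMappedRDD").setMaster("local[2]"))
        val doubled = new MyMappedRDD(sc.parallelize(1 to 10, 2), (x: Int) => x * 2)
        println(doubled.collect().mkString(", "))   // 2, 4, 6, ..., 20
        sc.stop()
      }
    }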
union

    /**
     * Return the union of this RDD and another one. Any identical elements will appear multiple
     * times (use `.distinct()` to eliminate them).
     */
    def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))

union combines several RDDs into a new RDD. UnionRDD overrides five RDD methods: getPartitions, getDependencies, compute, getPreferredLocations and clearDependencies.

getPartitions and getDependencies show that each parent RDD is wired in through a RangeDependency, a narrow dependency that maps a contiguous range of the union's partitions back to one parent:

    override def getDependencies: Seq[Dependency[_]] = {
      val deps = new ArrayBuffer[Dependency[_]]
      var pos = 0
      for (rdd <- rdds) {
        deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)
        pos += rdd.partitions.size
      }
      deps
    }

groupBy

    /**
     * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
     * mapping to that key.
     *
     * Note: This operation may be very expensive. If you are grouping in order to perform an
     * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
     * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
     */
    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
      groupBy[K](f, defaultPartitioner(this))

groupBy groups the elements according to the given function and, again, produces a new RDD.

Action: count

    /**
     * Return the number of elements in the RDD.
     */
    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

Following the code, runJob calls dagScheduler.runJob. The DAGScheduler submits the job to the job scheduler and returns a JobWaiter object, which can be used to block until the job finishes or to cancel it.

Task scheduling for RDDs

[Figure: RDD Objects build the operator DAG (e.g. rdd1.join(rdd2).groupBy(...).filter(...)); the DAGScheduler splits the graph into stages of tasks and submits each stage as it becomes ready; the TaskScheduler launches each TaskSet via the cluster manager and retries failed or straggling tasks; Worker threads execute the tasks and the block manager stores and serves blocks.]

From this picture, the RDD objects produce a DAG, which then enters the DAGScheduler phase:

1. The DAGScheduler is the stage-oriented, high-level scheduler. It splits the DAG into many tasks, and one group of tasks is a stage in the figure.
2. Every shuffle produces a new stage. The DAGScheduler keeps track of which RDDs have been materialized, and to obtain the best set of tasks it first looks for tasks that can run on local data.
3. The DAGScheduler also monitors tasks that fail because of lost shuffle output and resubmits them when necessary.

After the DAGScheduler has divided the work into stages, it submits the tasks to the TaskScheduler in units of TaskSets:

1. A TaskScheduler serves exactly one SparkContext.
2. When it receives a TaskSet, it submits the tasks to Executors on the Worker nodes to run; failed tasks are monitored and restarted by the TaskScheduler. An Executor runs tasks in multiple threads, each thread handling one task.

Next, let us trace through one of the examples that ships with Spark, org.apache.spark.examples.SparkPi:

    def main(args: Array[String]) {
      // Set the application name (shown in the Web UI)
      val conf = new SparkConf().setAppName("Spark Pi")
      // Instantiate a SparkContext
      val spark = new SparkContext(conf)
      // Number of slices from the first argument, default 2
      val slices = if (args.length > 0) args(0).toInt else 2
      val n = 100000 * slices
      val count = spark.parallelize(1 to n, slices).map { i =>
        val x = random * 2 - 1
        val y = random * 2 - 1
        if (x * x + y * y < 1) 1 else 0
      }.reduce(_ + _)
      println("Pi is roughly " + 4.0 * count / n)
      spark.stop()
    }

parallelize in this code is a lazy parallelization. Tracing into its source:

    /**
     * Distribute a local Scala collection to form an RDD.
     *
     * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
     * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
     * modified collection. Pass a copy of the argument to avoid this.
     */
    def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
      new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
    }

The example then calls map on the resulting RDD, which, as described above, is a transformation that produces a new RDD, and finally reduce, an action.
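The @note above is easy to observe. Here is a small sketch, assuming a local[2] master and a made-up object name: because parallelize only captures the collection, and the slicing into partitions happens when the first job runs, an element appended after the parallelize call but before the first action still appears in the result.

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.{SparkConf, SparkContext}

    object ParallelizeLazinessDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("parallelize-laziness").setMaster("local[2]"))

        val data = ArrayBuffer(1, 2, 3)
        val rdd = sc.parallelize(data, 2)   // no job runs yet; `data` is only captured

        data += 4                           // mutate before the first action

        // collect() is the first action, so the slicing sees the mutated buffer.
        println(rdd.collect().mkString(", "))   // expected: 1, 2, 3, 4

        sc.stop()
      }
    }

Passing a copy (for example data.toList) instead would freeze the contents at the call site, as the note suggests.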
Now a word-count example in the shell:

    scala> val rdd = sc.textFile("hdfs://192.168.0.245:8020/test/README.md")
    14/12/18 01:12:26 INFO storage.MemoryStore: ensureFreeSpace(82180) called with curMem=331133, maxMem=280248975
    14/12/18 01:12:26 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 80.3 KB, free 266.9 MB)
    rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.0.245:8020/test/README.md MappedRDD[7] at textFile at <console>:12

    scala> rdd.toDebugString
    14/12/18 01:12:29 INFO mapred.FileInputFormat: Total input paths to process : 1
    res: String =
    (1) hdfs://192.168.0.245:8020/test/README.md MappedRDD[7] at textFile at <console>:12
        hdfs://192.168.0.245:8020/test/README.md HadoopRDD[6] at textFile at <console>:12

sc.textFile reads the data from HDFS, and toDebugString shows that underneath it is a HadoopRDD.

    scala> val result = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect
    14/12/18 01:14:51 INFO spark.SparkContext: Starting job: collect at <console>:14
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Registering RDD 9 (map at <console>:14)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:14) with 1 output partitions (allowLocal=false)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at <console>:14)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Submitting Stage 1 (MappedRDD[9] at map at <console>:14), which has no missing parents
    14/12/18 01:14:51 INFO storage.MemoryStore: ensureFreeSpace(3440) called with curMem=413313, maxMem=280248975
    14/12/18 01:14:51 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 3.4 KB, free 266.9 MB)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[9] at map at <console>:14)
    14/12/18 01:14:51 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
    14/12/18 01:14:51 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, localhost, ANY, 1185 bytes)
    14/12/18 01:14:51 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 0)
    14/12/18 01:14:51 INFO rdd.HadoopRDD: Input split: hdfs://192.168.0.245:8020/test/README.md:0+4811
    14/12/18 01:14:51 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
    14/12/18 01:14:51 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    14/12/18 01:14:51 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
    14/12/18 01:14:51 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
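For comparison with the shell session, here is the same word count as a standalone application. This is a sketch only, assuming Spark 1.3 or later (so the pair-RDD functions need no extra import), with the HDFS path taken from the session above and the object name and local[2] master made up. The reduceByKey step introduces a ShuffleDependency, which is why the log above shows the job split into Stage 1 (the flatMap/map side) feeding Stage 0 (the reduce side that collect waits on).

    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[2]"))

        val counts = sc.textFile("hdfs://192.168.0.245:8020/test/README.md")
          .flatMap(_.split(" "))      // narrow dependency: stays in the first stage
          .map(word => (word, 1))     // narrow dependency: stays in the first stage
          .reduceByKey(_ + _)         // wide dependency: shuffle boundary, new stage

        counts.collect().foreach { case (word, n) => println(s"$word\t$n") }
        sc.stop()
      }
    }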
