Implementation Principles of Common Spark Operators
Contents
1. take(num: Int)
2. first()
3. sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
4. count()
5. countApprox(timeout: Long, confidence: Double = 0.95)
6. countApproxDistinct(relativeSD: Double = 0.05)
7. collect()
8. toLocalIterator
9. takeOrdered(num: Int)
10. aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)
11. fold(zeroValue: T)
12. treeAggregate
13. reduce(f: (T, T) => T)
14. max()
15. min()
16. treeReduce(f: (T, T) => T)
17. map[U: ClassTag](f: T => U)
18. mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false)
19. mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)
20. flatMap[U: ClassTag](f: T => TraversableOnce[U])
21. filter(f: T => Boolean)
22. combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)
23. distinct()
24. groupByKey(partitioner: Partitioner)
25. aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)
26. coalesce(numPartitions: Int, shuffle: Boolean = false)
27. repartition(numPartitions: Int)
28. sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong)
29. takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong)
30. randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong)
31. union(other: RDD[T])
32. ++(other: RDD[T])
33. intersection(other: RDD[T])
34. glom
35. cartesian[U: ClassTag]
36. zip[U: ClassTag](other: RDD[U])
37. zipPartitions
38. zipWithIndex()
39. zipWithUniqueId
40. foreach(f: T => Unit)
41. foreachPartition(f: Iterator[T] => Unit)
42. subtract(other: RDD[T], p: Partitioner)
43. keyBy[K](f: T => K)
1. take(num: Int)
Returns the first num records of the RDD.
def take(num: Int): Array[T] = withScope {
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.size == 0) {
          // Nothing collected so far: quadruple the scan range.
          numPartsToTry = partsScanned * 4
        } else {
          // Some records collected but not yet num of them: grow the range to
          // Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1),
          // capped at 4x the number of partitions already scanned.
          // the left side of max is >= 1 whenever partsScanned >= 2
          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }
      val left = num - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += numPartsToTry
    }
    buf.toArray
  }
}
First, note how sc.runJob is invoked here:
/**
 * Run a job on a given set of partitions of an RDD, but take a function of type
 * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int],
    allowLocal: Boolean): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
}
Here `partitions: Seq[Int]` denotes the set of partitions to compute: it may name a single partition or several.
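As an illustration, here is a minimal sketch of driving runJob on selected partitions, assuming a local SparkContext named sc and the Spark 1.x signature quoted above (the allowLocal parameter was removed in later Spark versions):

import org.apache.spark.{SparkConf, SparkContext}

object RunJobOnSomePartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("runJob-demo"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    // Compute only partitions 0 and 2; partitions 1 and 3 are never touched.
    val sums = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 2), allowLocal = true)
    println(sums.mkString(", "))  // one partial sum per requested partition
    sc.stop()
  }
}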
Now consider the first iteration of the loop: partsScanned is 0 and numPartsToTry is 1, so only the first partition is computed. If that single computation already yields the required num records, the loop terminates; otherwise the next iteration widens the range of partitions to compute, quite possibly scanning several partitions at once.
[Figure: iterative partition scanning in take]
Because take avoids computing the full RDD, its execution time is relatively short.
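To make the growth policy concrete, here is a minimal sketch in plain Scala (no Spark required) that replays the numPartsToTry estimation from the loop above; the partition sizes are made-up example numbers:

object TakeScanSimulation {
  def main(args: Array[String]): Unit = {
    val num = 100                                  // records requested, an arbitrary example value
    val partSizes = Array(0, 0, 5, 5, 40, 40, 40)  // hypothetical record count per partition
    var collected = 0
    var partsScanned = 0
    while (collected < num && partsScanned < partSizes.length) {
      var numPartsToTry = 1
      if (partsScanned > 0) {
        if (collected == 0) {
          numPartsToTry = partsScanned * 4         // nothing found yet: quadruple
        } else {
          // interpolate, overestimate by 50%, cap at 4x the partitions already scanned
          numPartsToTry = Math.max((1.5 * num * partsScanned / collected).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }
      val upTo = math.min(partsScanned + numPartsToTry, partSizes.length)
      collected = math.min(collected + partSizes.slice(partsScanned, upTo).sum, num)
      println(s"scanned partitions [$partsScanned, $upTo), collected so far: $collected")
      partsScanned += numPartsToTry
    }
  }
}

Running it shows the range growing from 1 partition, to 4 (nothing found), to an interpolated estimate once some records have been collected.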
2. first()
Returns the first element of the RDD.
/**
 * Return the first element in this RDD.
 */
def first(): T = withScope {
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}
So first() is simply implemented via take; see the analysis of take above for the details of its flow.
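A minimal hedged usage sketch, assuming a SparkContext named sc is already running:

val rdd = sc.parallelize(Seq(3, 1, 2))
println(rdd.first())  // 3: the head of the first non-empty partition, not the minimum
// On an empty RDD, take(1) yields an empty array, so first() throws
// UnsupportedOperationException("empty collection").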
3. sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
sortByKey simply builds a ShuffledRDD from the parent RDD, using the range partitioner RangePartitioner as its partitioning function. Execution proceeds as follows: each partition of the parent RDD is split by the RangePartitioner into buckets destined for the output partitions, and each partition of the ShuffledRDD then pulls the bucket that corresponds to it. What really matters in sortByKey is how RangePartitioner works: how it keeps the record counts of the ShuffledRDD's partitions roughly equal, i.e. how each partition's boundary is chosen (a usage sketch follows the quoted source below):
class RangePartitioner[K : Ordering : ClassTag, V](
    @transient partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      // numItems is the total number of elements in the rdd; sketched has type
      // Array[(Int, Int, Array[K])] and records, per parent partition: its index, its
      // element count, and the sample drawn from it.
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
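The quoted source breaks off at this point. To see the effect of the range partitioning from the outside, here is a minimal hedged usage sketch (assuming a local SparkContext; the object and app names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object SortByKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("sortByKey-demo"))
    val pairs = sc.parallelize(Seq(("d", 4), ("a", 1), ("c", 3), ("b", 2)), numSlices = 2)
    val sorted = pairs.sortByKey(ascending = true, numPartitions = 2)
    // With a RangePartitioner, partition 0 holds the smaller key range and partition 1
    // the larger one, so concatenating the partitions in order yields a globally sorted result.
    sorted.glom().collect().zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i: ${part.mkString(", ")}")
    }
    sc.stop()
  }
}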