没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
第一部分 Spark RDD API 参考示例
1、aggregate
原型
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp:
(U, U) => U): U
含义
aggregate 是一个聚合函数,一个 RDD 分区后,产生多个 Partition,在
aggregate 中需要指定两个处理函数,第一个函数用于对每个分区内部处理,
第二个函数用于分区之间的处理。aggregate 的初始值作用于前后两个部
分,
分区是严格按照顺序的,顺序不同 aggregate 结果就不同。
示例
//分区顺序严重 aggregate 的结果,分区特点,先平均分配,多的依次放到序号大的分区
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
//使用 parallelize 产生 2 个分区,可以使用 mapPartitionWithIndex 查看分区
//分区结果是(1,2,3)和(4,5,6)
z.aggregate(0)(math.max,_+_)
res0: Int = 9
特别注意:初始值 0 是加在分区的左边
//第一个参数 0 表示将 0 放入到每个分区中,然后每个分区进行计算。每个分区之间汇总
时,再次将 0 放入其中计算
//math.max 表示对每个分区内部取最大值,第一个分区有(1,2,3),再加上 0,所以
是(0,1,2,3)取最大值
//_+_表示每个分区之间进行相加
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
x.aggregate(5)(math.max,_+_)
res1: Int = 25
//分析三个分区为(1,2,3),(4,5,6),(7,8,9)其中 aggregate 的初始值为 5
//每次分区计算数据为(5,1,2,3),(5,4,5,6),(5,7,8,9),分别取最大值,得到(5,6,9)
//分区之间使用 _+_ 计算时,再次将 5,放入其中,所以变成(5,5,6,9)
//最后将其相加变成 5+5+6+9=25
//字符串类型处理
val z = sc.parallelize(List("a","b","c","d","e","f"),2)
z.aggregate("x")(_ + _, _+_)
res2: String = xxdefxabc
//第一个分区集合中有("x","a","b","c") 分区内部的聚合结果为"xabc"
//由于是并行的聚合,所以"xabc"和"xdef"谁在前面和后面并不知道
//分区之间进行聚合时,集合变成了("x","xabc","xdef")
//两个高级的示例
val z = sc.parallelize(List("12","23","345","4567"),2)
z.aggregate("")((x,y) => math.max(x.length, y.length).toString, _+_)
res3: String = 42
//分为两个区("12","23")和("345","4567")
//由于需要使用两个函数,所以这里定义了一个闭包函数,集合中的每两个元素,取出最
大的长度。
//每个分区进行比较时集合为("","12","34")、("","345","4567")
//每个分区聚合之后的结果为("2","4")
//分区之间进行聚合时,集合为("","2","4"),再次聚合结构可能是"24"也可能是"42"
val z = sc.parallelize(List("12","23","345","4567"),2)
z.aggregate("asd")((x,y) => math.min(x.length, y.length).toString, _+_)
res4: String = asd11
//分区后的结果为("12","23") ("345","456")
//分区内部应用初始值之后的结果为("asd","12","34") ("asd","345","456")
//每个分区中有三个元素,每个分区内部需要多次聚合,分为("asd","12")和("34")
//("asd","12")聚合结果是"2", 再与("34")聚合结果是"1"
//所以("asd","12","34") 聚合结果是"1"
//同理("asd","345","456") 聚合结果是"1"
//两个分区之间再次聚合,将初始值应用到其中,得到"asd11"
val z = sc.parallelize(List("12","23","345",""),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString,_+_)
res5: String = 10
//分区后的结果为("12","23") ("345","")
//分区内部应用初始值之后的结果为("","12","23") ("","345","")
//有三个元素,需要多次聚合("","12") ("23")
//("","12")聚合结果是"0" 再与("23")聚合,得到的结果是"1"
//("","345")聚合结果是"0" 再与("")聚合,得到的结果是"0"
//最后两次结果相加,得到的是"10"
特别注意:为了说明分区顺序的重要性,请看下面的例子
val z = sc.parallelize(List("12","23","","345"),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString,_+_)
res6: String = 11
//分区后的结果为("12","23") ("","345")
//分区内部应用初始值之后的结果为("","12","23") ("","","345")
//有三个元素,需要多次聚合("","12") ("23")
//("","12")聚合结果是"0" 再与("23")聚合,得到的结果是"1"
//("","")聚合结果是"0" 再与("345")聚合,得到的结果是"1"
//最后两次结果相加,得到的是"11"
2、aggregateByKey[Pair]
原型
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U)
⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
含义
aggregateByKey 也是一个聚合函数,一个 RDD 分区后,产生多个 Partition,
在 aggregateByKey 中需要指定两个处理函数,第一个函数用于对每个分区内
部处理,第二个函数用于分区之间的处理。aggregateByKey 的初始值只作用
于每个分区内部,不影响分区之间聚合,
分区是严格按照顺序的,顺序不同
aggregateByKey 结果就不同。
示例
val z = sc.parallelize(List( ("cat",2), ("cat", 5), ("pig", 4),("cat",
12), ("dog", 12)), 2)
z.aggregateByKey(0)(math.max(_, _), _ + _).collect
res1: Array((dog,12), (pig,4), (cat,17))
//分区之后的结果是(("cat",2),("cat",5))
(("pig",4),("cat",12),("dog",12))
//应用 aggregate 的初始值后,第二个分区为:
//([("pig",0),
("pig","4")],[("cat",0),("cat",12)],[("dog",0),("dog",12)])
//使用 math.max 函数后,结果为(("pig","4"),("cat",12),("dog",12))
//如果使用 math.min 函数后,结果就为(("pig",0),("cat",0),("dog",0))
//使用函数_+_聚合时,不会应用初始值
z.aggregateByKey(100)(math.max(_, _), _ + _).collect
res2: Array((dog,100), (pig,100), (cat,200))
3、cartesian
原型
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
含义
cartesian 笛卡尔积,两个 RDD 中的元素两两组合
示例
val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
//结果是两者由小到大的顺序排列
res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10),
(2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10),
(4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))
4、checkpoint
原型
def checkpoint()
含义
checkpoint 检查点机制,假设你在迭代 1000 次的计算中在第 999 次失败了,
然后你没有 checkpoint,你只能重新开始恢复了,如果恰好你在第 998 次迭代
的时候你做了一个 checkpoint,那么你只需要恢复第 998 次产生的 rdd,然后再
执行 2 次迭代完成总共 1000 的迭代,这样效率就很高,比较适用于迭代计算非
常复杂的情况,也就是恢复计算代价非常高的情况,适当进行 checkpoint 会有
很大的好处。
示例
sc.setCheckpointDir("hdfs://192.168.10.71:9000/wc")
//检查点目录必须存在
val a = sc.parallelize(1 to 5)
a.checkpoint
//将其结果检查点更新
a.collect
res1: Long = 5
5、coalesce, repartition
原型
def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T]
def repartition ( numPartitions : Int ): RDD [T]
剩余107页未读,继续阅读
资源评论
starryeyed
- 粉丝: 1
- 资源: 18
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功