通过上述处理,一份可能造成后续数据倾斜的测试数据即以准备好。接
下来,使用 Spark 读取该测试数据,并通过 groupByKey(12)对 id 分组处理,
且 Shu e 并行度为 12。代码如下
scala> val sourceRdd = sc.textFile("hdfs://master01:9000/test_data/p*")
sourceRdd: org.apache.spark.rdd.RDD[String] = hdfs://master01:9000/test_data/p*
MapPartitionsRDD[1] at textFile at <console>:24
scala> val kvRdd = sourceRdd.map(x =>{ val parm=x.split(" ");
(parm(0).trim().toInt,parm(1).trim()) })
kvRdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[2] at map at
<console>:26
scala> kvRdd.groupByKey(12).count
res0: Long = 150000
scala> :quit
本次实验所使用集群节点数为 3,每个节点可被 Yarn 使用的 CPU 核数为
3,内存为 2GB。在 Spark-shell 中进行提交
GroupBy Stage 的 Task 状态如下图所示,Task 8 处理的记录数为 90 万,
远大于(9 倍于)其它 11 个 Task 处理的 10 万记录。而 Task 8 所耗费的时间
为 1 秒,远高于其它 11 个 Task 的平均时间。整个 Stage 的时间也为 1 秒,该
时间主要由最慢的 Task 8 决定。数据之间处理的比例最大为 105 倍。
评论0
最新资源