spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

所需积分/C币:50 2018-02-14 21:36:38 2.29MB PDF
收藏 收藏
举报

包括spara rdd api,dataframe action操作、查询操作、join操作,dataframe rdd dataset 相互转换以及spark sql。
//有二个元素,需要多次聚合(","12")("23") //(","12")聚合结果是"θ”再与("23")聚合,得到的结果是"1 //(","345")聚合结果是"”再与(")聚合,得到的结果是"e" //最后两次结果相加,得到的是"18" 特别注意:为了说明分区顺序的重要性,请看下面的例子 va1z=sc,para1le1ize(List("12","23","","345"),2) z, aggregate(")((x, y)=>math. min(x length, y length). tostring,+) res6: String =11 //分区后的结果为("12","23")(","345") /分区内部应用祕始值之后的结果为("","12","23")(",","345") //有三个元素,需要多次聚合("","12")("23") /(","12")聚合结果是"θ"再与("23")聚合,得到的结果是"1" //(",")聚合结果是"θ"冉与("345")聚合,待到的结果是"1" /最后两次结果相加,得到的是"11 2, aggregate ByKey[Pair] 原型 def aggregate ByKey[UI(zerovalue: U)(segop:(U, V)= U, combOp: (U, U) =U(implicit argO: ClassTag[U]): RDD[(K, U) 含义 aggregateByKey也是一个聚合函数,一个RDD分区后,产生多个 Partition, 在 aggregate Bykey中需要指定两个处理函数,第一个函数用于对每个分区内 部处理,第二个函数用于分区之间的处理。 aggregate Bykey的初始值只作用 于每个分区内部,不影响分区之间聚合,分区是严格按照版序的,顺序不同 aggregate6yKey结果就不同 示例 val z= Sc parallelize(list(("cat 2),(cat ,5),(pig", 4),(" cat 12),("dog",12)),2 z. aggregateBykey(o)(math. max(,) ) collect res1: Array((dog, 12),(pig, 4),(cat, 17)) //分区之后的结果是(("cat",2),("cat",5)) (("pig",4),("cat",12),("dog",12)) //应用 aggregate的初始值后,第二个分区为: /([("pig",), ("pig","4")],[("cat",e),("cat",12)],[("dog",),("dog",12)]) //使用math.max函数后,结果为("pig","4"),("cat",12),("dog",12) //如果使用math.min函数后,结果就为(("pig",0),("cat",e),("dog",θ)) //使用函数+聚合时,不会应用初始值 z. aggregateByKey (100)(math. max(,),_+). collect res2: Array ((dog, 100),(pig, 100),(cat, 200)) 3. cartesian 原型 def cartesian[U: ClassTagl(other: RDD[U]: RDD[(T, U) 含义 cartesian笛卡尔积,两个RDD中的元素两两组合 示例 val x= sc parallelize (list(1, 2,3,4,5)) val y = Sc parallelize(list(6,7,8,9, 10)) x cartesian(y). collect //结果是两者山小到大的顺序排列 res9: Array[(Int,Int)]= Array((1,6),(1,7),(1,8),(1,9),(1,10), (2,6),(2,7),(2,8),(2,9),(2,10),(3,6),(3,7),(3,8),(3,9),(3,10), (4,6),(5,6),(4,7),(5,7),(4,8),(5,8),(4,9),(4,10),(5,9),(519)) 4、 checkpoint 原型 def checkpoint( 含义 checkpoint检查点机制,假设你在迭代1000次的计算中在第99次失败了, 然后你没有 checkpoint,你只能重新开始恢复了,如果恰好你在第998次迭代 的时候你做了一个 checkpoint,那么你只需要恢复第998次产生的rdd,然后再 执行2次迭代完成总共1000的迭代,这样效率就很高,比较适用于迭代计算非 常复杂的情况,也就是恢复计算代价非常高的情况,适当进行 checkpoint会有 很大的好处 示例 sc. setcheckpointDir("hdfs: //192.168.10.71: 9000/wc") //检查点目录必须存在 val a=sc parallelize (1 to 5) a, checkpoint /将其结果检查点更新 a collect res1: Long 5. coalesce, repartition 原型 def coalesce( num Partitions Int, shuffle: Boolean= false RDD [T] def repartition( num Partitions: Int ) RDD [T 含义 repartition表示对已经分区的数据重新分区,是 coalesce的一个简单应用 示例 val y= sc parallelize(1 to 10, 5) y coalesce(2, false). collect res1: Array[Int]= Array(1,2,3,4,5,6,7,8,9,10) // coalesce分区中fa1se表示直接分区,不对其重新打乱顺序 val z= y coalesce(2, true) val x=y repartition (2) z collect res2: Array[Int]= Array(3,5,9,1,7,2,8,4,6,10) // coalesce分区中 false表示重新打乱顺序,再分区,和 repartition相等,默认是不 打乱顺序的 6, cogroup [Pair], groupWith [Pair] 原型 def cogroup[W](other: RDD[(K, W)]): RDD[(K,(Iterable[V], Iterable[wD) def groupWith[W](other: RDD[(K, W)I): RDD(K,(Iterable[V] Iterable[wi) 含义 cogroup超级分组,可以将3个 key-value类型的RDD进行分组 示例 //3个key- value类型的RDD分组聚合 val x= Sc parallelize(list((,"apple"),(2,banana"),(3,orange") (4,"kiwi")),2) val y sc parallelize (list((5,computer"),(1,"laptop"),(1 desktop"),(4,iPad")),2) val z=sc parallelize(list((3,"iphone"),(1,"xiaomi"),(4,"huawei")),2) x cogroup(y, z). collect res1 Array((4,(compactBuffer(kiwi,CompactBuffer(ipad), CompactBuffer(huawei) )),(2,(CompactBuffer(banana), CompactBuffer o, CompactBuffer o)) (1,( CompactBuffer(apple), CompactBuffer(laptop, desktop), CompactBuffer (xiaomi))) (3,(CompactBuffer (orange), CompactBuffer(, CompactBuffer (iphone))) (5,(CompactBuffer(, CompactBuffer (computer), CompactBuffer())) //分析:如果ⅹ中不存在这个元素,那么就会将其值置为空 7、 collect, toArray 原型 def collect[U: ClassTagl(f: PartialFunction[T, U]): RDD[U] def toArray0:Aray门 含义 collect将RDD数据转换成Scaa中的数组并返回 示例 val c= sc parallelize (list( " Gnu","Cat","Rat","Dog","Gnu","Rat") c collect resl: Array[string= Array(Gnu, Cat, Rat, Dog, Gnu, Rat 8、 collectAsMap 原型 def collectAsMapo: Map[K, V] 含义 collectAsMap和 collect非常类似,但是,是将数据转换成 key-value类型的 Map 示例 val a= sc parallelize(List(1, 2, 1, 3),1) val b=a.zip(a) //以List中的数字作为key,出现次数作为va1ue b, collectAsMap resl: scala collection. Map[Int, Int]= Map(2->2, 1 ->1, 3->3) 9、 combine ByKey[Pair] 原型 def combine ByKey[C](create Combiner: V=> C, mergeValue: (C, V)=> CI merge Combiners: (C, C)=>C): RDD[(K, C) 含义 combine ByKey高效的实现按照key来聚合,通过在每一个分区内部先聚合, 再在分区之间聚合。每个分区内部是通过迭代方式来聚合,效率非常高。 示例 yala= sc parallelize (list("dog","cat","gnu","salmon","rabbit","turkey","wolf o bear","bee"), 3) va1b=sc,para1le1ize(List(1,1,2,2,2,1,2,2,2),3) val c=b zip(a) //将数据分成三个区,使用数字当作key,字符串作为 value val d =c. combineBykey (list(,(X: List[String, y: String=>y::X, (x: List[stringl, y: List[String])=>x::: y) // combineBykey的三个参数的意思是 /第一个分区为((1,"dog"),(1,"cat"),(2,"gnu")) //List()每个分区中的key,每次取该分区中的一个key //第一次取1,初始时ⅹ中的List为空,找到第一个key为1的dog,将dog放入到x 中的List,然后找到cat //第二次取2,初始时ⅹ中的List为空,找到第一个key为2的gnu //第一个分区得到了((1,("cat","dog"))(2,("gnu") //同理,第二个分区得到了((2,(" salmon"," rabbit")),(1,(" turkey")) //同理,第二个分区得到了(2,("Wolf","bear","bee") //最后,每个分区之间进行合并,将相同的key的数据合并 d collect res1: Array[(Int, List[String])= Array((1, List(cat, dog, turkey)) (2,List(gnu, rabbit, salmon, bee, bear, wolf))) 10, context, spark Context 原型 def compute(split: Partition, context: Task Context): Iterator[T 含义 context返回创建RDD时的 Spark Context 示例 val C=sc parallelize(list( "Gnu","Cat","Rat",Dog"),2) c, context res1: org. apache, spark. SparkContext org. apache. spark. sparkContext(a74c08828 11、 count 原型 def count(: Long 含义 count返回RDD中存储的元素的个数 示例 val c= Sc parallelize (list( "Gnu","Cat","Rat",Dog"), 2) C, count res2: Long 4 12. countApproxDistinct 原型 def countApprox Distinct(relativeSD: Double=0.05): Long 含义 countApproxDistinct近似统计RDD中不重复的值的个数,可以通过 relativeS来确定统计精度,这种方式适合大规模,分布式的数据快速统计 示例 val z= sc parallelize(list(1, 3, 4, 1, 2, 3)) z. countApproxDistinct(0.01)

...展开详情
试读 108P spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql
立即下载 低至0.43元/次 身份认证VIP会员低至7折
    抢沙发
    一个资源只可评论一次,评论内容不能少于5个字
    关注 私信 TA的资源
    上传资源赚积分,得勋章
    最新推荐
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql 50积分/C币 立即下载
    1/108
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第1页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第2页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第3页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第4页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第5页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第6页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第7页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第8页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第9页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第10页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第11页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第12页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第13页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第14页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第15页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第16页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第17页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第18页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第19页
    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql第20页

    试读已结束,剩余88页未读...

    50积分/C币 立即下载 >