在一个孤立系统里,如果没有外力做功,其总混乱度(即熵)会不断增大
RDD函数传递和依赖关系
RDD中的函数传递
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工
作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是
需要序列化的。
跨进程通信,对象传递,序列化
传递一个方法
直接运行程序会发现报错:没有初始化.因为rdd.filter(isMatch)用到了对象this的方法isMatch,所以对象
this需要序列化,才能把对象从driver发送到executor。【其实就是对象在进程间传输需要序列化】
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object SerDemo {
def main(args: Array[String]): Unit = {
valconf:SparkConf=new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello
zhutian", "zhutian", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
//需求: 在 RDD 中查找出来包含 query 子字符串的元素
// query 为需要查找的子字符串
class Searcher(val query: String){
// 判断 s 中是否包括子字符串 query
def isMatch(s : String) ={
s.contains(query)
}
// 过滤出包含 query字符串的字符串组成的新的 RDD
def getMatchedRDD1(rdd: RDD[String]) ={
rdd.filter(isMatch) //
}
// 过滤出包含 query字符串的字符串组成的新的 RDD
def getMatchedRDD2(rdd: RDD[String]) ={
rdd.filter(_.contains(query))
}
}
评论0