没有合适的资源?快使用搜索试试~ 我知道了~
这一章想讲一下Spark的缓存是如何实现的。这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。它调用SparkContext去缓存这个RDD,追杀下去。它居然是用一个HashMap来存的,具体看这个map的类型是TimeStampedWeakValueHashMap[Int,RDD[_]]类型。把存进去的值都隐式转换成WeakReference,然后加到一个内部的一个ConcurrentHashMap里面。这里貌似也没干啥,这是有个鸟蛋用。。大神莫喷,知道干啥用的人希望告诉我一下。现在并没有保存,等到真正运行Task运行的时候才会去缓存起来。入口在Task的runTask方
资源推荐
资源详情
资源评论
Spark源码系列(五)分布式缓存源码系列(五)分布式缓存
这一章想讲一下Spark的缓存是如何实现的。这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。
def persist(newLevel: StorageLevel): this.type = {
// StorageLevel不能随意更改
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")
}
sc.persistRDD(this)
// Register the RDD with the ContextCleaner for automatic GC-based cleanup
// 注册清理方法
sc.cleaner.foreach(_.registerRDDForCleanup(this))
storageLevel = newLevel
this
}
它调用SparkContext去缓存这个RDD,追杀下去。
private[spark] def persistRDD(rdd: RDD[_]) {
persistentRdds(rdd.id) = rdd
}
它居然是用一个HashMap来存的,具体看这个map的类型是TimeStampedWeakValueHashMap[Int, RDD[_]]类型。把存进去
的值都隐式转换成WeakReference,然后加到一个内部的一个ConcurrentHashMap里面。这里貌似也没干啥,这是有个鸟蛋
用。。大神莫喷,知道干啥用的人希望告诉我一下。
CacheManager
现在并没有保存,等到真正运行Task运行的时候才会去缓存起来。入口在Task的runTask方法里面,具体的我们可以看
ResultTask,它调用了RDD的iterator方法。
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
一旦设置了StorageLevel,就要从SparkEnv的cacheManager取数据。
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, split.index)
blockManager.get(key) match {
case Some(values) =>
// 已经有了,直接返回就可以了
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
// loading包含这个key表示已经有人在加载了,等到loading被释放了,就可以去blockManager里
面取到了
loading.synchronized {
if (loading.contains(key)) {
while (loading.contains(key)) {
try {
loading.wait()
} catch {
case e: Exception =>
logWarning(s"Got an exception while waiting for another thread to load $key", e)
}
}
// 别人成功拿到了,我们直接取结果就是了,如果别人取失败了,我们再来取一次
blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
loading.add(key)
}
} else {
loading.add(key)
}
}
try {
// 通过rdd自身的compute方法去计算得到结果,回去看看RDD那文章,自己看看源码就清楚了
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// 如果是本地运行的,就没必要缓存了,直接返回即可
if (context.runningLocally) {
return computedValues
}
// 跟踪blocks的更新状态
var updatedBlocks = Seq[(BlockId, BlockStatus)]()
val returnValue: Iterator[T] = {
if (storageLevel.useDisk && !storageLevel.useMemory) {
/* 这是RDD采用DISK_ONLY的情况,直接扔给blockManager
* 然后把结果直接返回,它不需要把结果一下子全部加载进内存
* 这同样适用于MEMORY_ONLY_SER,但是我们需要在启用它之前确认blocks没被block store给
丢弃 */
updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(values) =>
values.asInstanceOf[Iterator[T]]
case None =>
throw new Exception("Block manager failed to return persisted valued")
}
} else {
// 先存到一个ArrayBuffer,然后一次返回,在blockManager里也存一份
val elements = new ArrayBuffer[Any]
elements ++= computedValues
updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
}
}
// 更新task的监控参数
val metrics = context.taskMetrics
metrics.updatedBlocks = Some(updatedBlocks)
new InterruptibleIterator(context, returnValue)
} finally {
// 改完了,释放锁
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
1、如果blockManager当中有,直接从blockManager当中取。
2、如果blockManager没有,就先用RDD的compute函数得到出来一个Iterable接口。
3、如果StorageLevel是只保存在硬盘的话,就把值存在blockManager当中,然后从blockManager当中取出一个Iterable接
口,这样的好处是不会一次把数据全部加载进内存。
4、如果StorageLevel是需要使用内存的情况,就把结果添加到一个ArrayBuffer当中一次返回,另外在blockManager存上一
份,下次直接从blockManager取。
对StorageLevel说明一下吧,贴一下它的源码。
剩余6页未读,继续阅读
资源评论
weixin_38735790
- 粉丝: 4
- 资源: 899
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功