<script src="https://cdn.bootcss.com/pangu/3.3.0/pangu.min.js">
</script>
<script>
pangu.spacingPage();
</script>
# Spark Streaming学习笔记
# 目录
* [Spark Streaming 基本工作原理](#一spark-streaming-基本工作原理)
* [DStream](#11-dstream)
* [Kafka 的 Receiver 和 Direct 方式](#12-kafka-的-receiver-和-direct-方式)
* [基于 Receiver 的方式](#121-基于-receiver-的方式)
* [基于 Direct 方式](#122-基于-direct-方式)
* [transformation](#13-transformation)
* [updateStateByKey](#131-updatestatebykey)
* [transform](#132-transform)
* [window](#133-window)
* [output 和 foreachRDD](#14-output和foreachrdd)
* [与 Spark SQL 结合](#15-与-spark-sql-结合)
* [缓存与持久化](#二缓存与持久化)
* [Checkpoint](#三checkpoint)
* [何时开启 Checkpoint 机制](#31-何时开启-checkpoint-机制)
* [如何配置 Checkpoint](#32-如何配置-checkpoint)
* [源码分析](#四源码分析)
* [Structured Streaming](#五structured-streaming)
* [编程模型](#51-编程模型)
* [event-time 和 late-data process](#511-event-time-和-late-data-process)
* [流式 DataSet 和 DataFrame](#52-流式-dataset-和-dataframe)
* [创建流式 DataSet 和 DataFrame](#521-创建流式-dataset-和-dataframe)
* [对流式 DataSet 和 DataFrame 进行操作](#522-对流式-dataset-和-dataframe-进行操作)
* [join 操作](#523-join-操作)
* [不支持的操作](#524-不支持的操作)
* [starting streaming query](#53-starting-streaming-query)
* [managing streaming query](#54-managing-streaming-query)
* [checkpoint](#55-checkpoint)
* [源码及架构分析](#56-源码及架构分析)
## 一、Spark Streaming 基本工作原理
Spark Streaming 内部的基本工作原理如下:接收实时输入数据流,然后将数据拆分成多个 batch,比如每收集1秒的数据封装为一个batch,然后将每个batch交给Spark的计算引擎进行处理,最后会生产出一个结果数据流,其中的数据,也是由一个一个的batch所组成的。
<div align=center>
<img src="./pic/spark_streaming_data_flow.png" width="70%" height="50%" />
</div></br>
### 1.1 DStream
Spark Streaming 提供了一种高级的抽象,叫做 DStream,英文全称为 Discretized Stream,中文翻译为“离散流”,它代表了一个持续不断的数据流。DStream可以通过输入数据源来创建,比如Kafka、Flume和Kinesis;也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。
DStream的内部,其实一系列持续不断产生的RDD。RDD是Spark Core的核心抽象,即,不可变的,分布式的数据集。DStream中的每个RDD都包含了一个时间段内的数据。
<div align=center>
<img src="./pic/DStream.png" width="70%" height="50%" />
</div></br>
对DStream应用的算子,比如map,其实在底层会被翻译为对DStream中每个RDD的操作。比如对一个DStream执行一个map操作,会产生一个新的DStream。但是,在底层,其实其原理为,对输入DStream中每个时间段的RDD,都应用一遍map操作,然后生成的新的RDD,即作为新的DStream中的那个时间段的一个RDD。底层的RDD的transformation操作,其实,还是由Spark Core的计算引擎来实现的。Spark Streaming对Spark Core进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次的API。
<div align=center>
<img src="./pic/DStream操作.png" width="70%" height="50%" />
</div></br>
Spark Streaming的具体工作原理如下:
<div align=center>
<img src="./pic/Spark Streaming基本工作原理.png" />
</div></br>
### 1.2 Kafka的Receiver和Direct方式
#### 1.2.1 基于Receiver的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
> 需要注意的点:
1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。
代码:[KafkaReceiverWordCount](./KafkaReceiverWordCount.scala)
#### 1.2.2 基于Direct方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
优点:
* 简化并行读取:
如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
* 高性能:
如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
* 一次且仅一次的事务机制:
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
基于direct的方式,使用Kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
代码:[KafkaDirectWordCount](./KafkaDirectWordCount.scala)
### 1.3 transformation
对于基础的操作就不一一列出来来,参考[官网文档](http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams)
#### 1.3.1 updateStateByKey
updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。
1. 首先,要定义一个state,可以是任意的数据类型;
2. 其次,要定义state更新函数——指定一个函数如何使用之前的state和新值来更新state。
对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除。
> tip:
1、对
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
Spark 学习之路,包含 Spark Core,Spark SQL,Spark Streaming,Spark mllib 学习笔记 * [spark core学习笔记及代码 * [spark sql学习笔记及代码 * [spark streaming学习笔记及代码 Spark 消息通信 ### Spark 启动消息通信 Spark 启动过程中主要是进行 Master 和 Worker 之间的通信,其消息发送关系如下,首先由 worker 节点向 Master 发送注册消息,然后 Master 处理完毕后,返回注册成功或失败的消息。 # 作业执行源码分析 当我们的代码执行到了 action(行动)操作之后就会触发作业运行。在 Spark 调度中最重要的是 DAGScheduler 和 TaskScheduler 两个调度器,其中,DAGScheduler 负责任务的逻辑调度, 将作业拆分为不同阶段的具有依赖关系的任务集。TaskScheduler 则负责具体任务的调度执行...
资源推荐
资源详情
资源评论
收起资源包目录
Spark 学习之路,包含 Spark Core,Spark SQL,Spark Streaming,Spark mllib 学 (1475个子文件)
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_SUCCESS 0B
_SUCCESS 0B
_SUCCESS 0B
_SUCCESS 0B
_SUCCESS 0B
ua.base 1.71MB
ub.base 1.71MB
u1.base 1.51MB
u2.base 1.51MB
u3.base 1.51MB
u4.base 1.51MB
u5.base 1.51MB
checkpoint-1497929575000.bk 4KB
checkpoint-1497929585000.bk 4KB
checkpoint-1497929565000.bk 4KB
checkpoint-1497929560000.bk 4KB
checkpoint-1497929560000 4KB
checkpoint-1497929565000 4KB
checkpoint-1497929570000 4KB
checkpoint-1497929575000 4KB
checkpoint-1497929580000 4KB
checkpoint-1497929585000 4KB
.part-r-00000-20775331-2c3a-4295-8fa4-06acc56640f1.snappy.parquet.crc 84B
.checkpoint-1497929575000.crc 40B
.checkpoint-1497929565000.bk.crc 40B
.checkpoint-1497929560000.crc 40B
.checkpoint-1497929575000.bk.crc 40B
.checkpoint-1497929585000.bk.crc 40B
.checkpoint-1497929570000.crc 40B
.checkpoint-1497929565000.crc 40B
.checkpoint-1497929580000.crc 40B
.checkpoint-1497929560000.bk.crc 40B
.checkpoint-1497929585000.crc 40B
.part-r-00001-88630958-4f7b-4149-ad2a-6054fb942054.snappy.parquet.crc 16B
.part-r-00000-858667dd-2347-43ef-a1d9-4d21d9096651.snappy.parquet.crc 16B
.part-r-00000-88630958-4f7b-4149-ad2a-6054fb942054.snappy.parquet.crc 16B
.part-r-00000-72c92ea5-ab4c-4601-a2e4-21fd0229a550.snappy.parquet.crc 16B
.part-00000.crc 12B
.part-00001.crc 12B
.part-00000.crc 12B
.part-00001.crc 12B
.part-00000.crc 12B
.part-00001.crc 12B
.part-00000.crc 12B
.part-00001.crc 12B
.part-00000.crc 12B
.part-00001.crc 12B
.part-r-00000-6ae09ea4-2dd0-4e8e-9be0-481249d553bc.json.crc 12B
.part-r-00001-858667dd-2347-43ef-a1d9-4d21d9096651.snappy.parquet.crc 12B
.part-00000.crc 12B
.student_infos.txt.crc 12B
.student_scores.txt.crc 12B
name.csv 123B
name.csv 123B
logmirror.ctrl 48B
log.ctrl 48B
log1.dat 1024KB
c230.dat 312KB
c20.dat 104KB
c180.dat 72KB
c90.dat 72KB
ca1.dat 72KB
c8c1.dat 64KB
ca50.dat 64KB
c570.dat 64KB
c490.dat 64KB
cae1.dat 64KB
c8a0.dat 64KB
c670.dat 64KB
cb21.dat 64KB
c510.dat 64KB
cb71.dat 64KB
c530.dat 64KB
cba1.dat 64KB
c5b0.dat 64KB
c9c0.dat 64KB
c6f0.dat 64KB
ca30.dat 64KB
c630.dat 64KB
c610.dat 64KB
c5f0.dat 64KB
c9b1.dat 64KB
c4b0.dat 64KB
c9e0.dat 64KB
c41.dat 36KB
c51.dat 28KB
c31.dat 24KB
cf0.dat 20KB
c161.dat 20KB
c251.dat 20KB
c191.dat 16KB
c101.dat 16KB
c60.dat 16KB
c121.dat 16KB
c71.dat 16KB
共 1475 条
- 1
- 2
- 3
- 4
- 5
- 6
- 15
资源评论
程序媛小y
- 粉丝: 5615
- 资源: 187
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功