### Spark Streaming
- 数据类型:
- 静态数据
- [ ] 特点:批量处理
- [ ] 传统数据处理流程:
1. 数据采集存储
2. 用户主动发出查询获取结果
- 流数据
- [ ] 特点:
1. 数据快速持续到达,潜在大小也许无穷
2. 数据来源众多,格式复杂
3. 数据量大,一旦处理完,要么丢弃,要么归档处理
4. 注重数据整体价值,不过分关注个别数据
5. 数据顺序颠倒或不完整,系统无法控制将要处理的新到达数据顺序
- [ ] 实时计算
- [ ] 数据价值随时间流逝降低
- [ ] 要求:高性能,实时性,分布性
- [ ] 过程:
1. 数据实时采集
2. 数据实时计算
3. 主动发起查询
- Strom
- 特点:
- [ ] 可毫秒级实时计算
- [ ] 高度容错
- [ ] 实时流计算
- Spark Streaming
- 特点:
- [ ] 模仿流计算小批量处理
- [ ] 可实现秒级响应,变相实现高效流计算
- [ ] 更高效容错处理
- [ ] 兼容批量实时
- 数据抽象
- [ ] Spark Core -->数据抽象:RDD
- [ ] Spark SQL -->数据抽象:DataFrame
- [ ] Spark Streaming -->数据抽象:DStream
- DStream:<br>
即为一系列RDD集合
- [ ] DStream操作:<br>
1. 在Spark Streaming中会有组件Receiver,作为一个长期运行的task跑在Execution上
2. 每个receiver(包括套接字流,文件流,kafka读取数据流)单独接受一个数据源Input DStream
3. Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据
- [ ] 编写Spark Streaming程序
1. 通过创建DStream来定义输入源
2. 通过对DStream应用转换操作和输出操作定义流计算
3. 用streamingContext.start()来开始接受数据和处理流程
4. 通过streamingContext.awaitTermination()等待处理结束(手动结束或因错误结束)
5. 通过streamingContext.stop()手动结束流计算
- [ ] 运行Spark Streaming首先生成StreamingContext对象--> 主入口
1. pyspark环境下
```python
from pyspark.streaming import StreamingContext
StreamingContext(sc, 1)
```
2. python下
```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 1)
```
- 基本数据源获取SparkStreaming
- 文件流
- [ ] 创建文件
```
# shell 下运行
cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
# 将该github中5.Spark Streaming下的file.txt放置到node-1的/usr/local/spark/mycode/streaming中
# file.txt是取自https://github.com/fighting41love/funNLP/blob/master/data/%E6%95%8F%E6%84%9F%E8%AF%8D%E5%BA%93/%E8%89%B2%E6%83%85%E8%AF%8D%E5%BA%93.txt该目录下的色情词库
```
```python
vim WriteFile.py
# 在WriteFile.py写入如下代码,不断生成文本文件看作流源
import numpy as np
import time
import os
with open('/usr/local/spark/mycode/streaming/file.txt','r',encoding='utf-8') as f:
lines = f.readlines()
index = np.random.randint(low=0,high=len(lines)-10)
newlines = lines[index:index+10]
w_length = np.random.randint(low=5,high=15)
file_index = 1
write_base = '/usr/local/spark/mycode/streaming/logfile'
if not os.path.exists(write_base):
os.makedirs(write_base)
while file_index < 1000:
with open('%s/%d.txt'%(write_base,file_index),'w') as f:
for _ in range(w_length):
f.write(newlines[np.random.randint(low=0,high=len(newlines))].strip())
f.write(' ')
file_index += 1
index = np.random.randint(low=0,high=len(lines)-10)
newlines = lines[index:index+10]
w_length = np.random.randint(low=5,high=15)
time.sleep(5)
```
- [ ] 写入pyspark
开启hadoop,spark环境,具体操作可参考前几节
```python
# 运行pyspark,在pyspark写入如下代码
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)
lines = ssc. \
... textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
```
```
# 新打开个终端窗口来生成文件流
cd /usr/local/spark/mycode/streaming/
python WriteFile.py
# 在pyspark窗口就可以看到词频统计
```
- [ ] 采用独立应用程序方式
```python
cd /usr/local/spark/mycode/streaming/
vim FileStreaming.py
# FileStreaming.py里面写入如下代码
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 10)
lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
```
```
# WriteFile.py别kill掉提交FileStreaming.py
cd /usr/local/spark/mycode/streaming/
/usr/local/spark/bin/spark-submit FileStreaming.py
# 在窗口可以看到词频统计
```
- 套接字流<br>
socket一直处于阻塞状态,等待响应
- [ ] 使用nc程序
```python
cd /usr/local/spark/mycode/streaming/
vim NetworkWordCount.py
# 在NetworkWordCount.py内写入如下代码
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)
# socketTextStream-->定义套接字流,接入参数为网址和端口号
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.strip().split(" ")) \
.map(lambda word: (word, 1))
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
Learning_Spark【程序员VIP专用】.zip (34个子文件)
Learning_Spark【程序员VIP专用】
6.Structure Streaming
ReadMe.md 16KB
2.RDD
ReadMe.md 16KB
1.安装环境
img
19.png 44KB
10.png 49KB
9.png 73KB
3.png 89KB
12.png 101KB
15.png 118KB
1.png 96KB
11.png 87KB
13.png 98KB
6.png 48KB
5.png 93KB
4.png 78KB
8.png 87KB
21.png 180KB
17.png 23KB
16.png 120KB
7.png 92KB
20.png 233KB
18.png 61KB
2.png 93KB
14.png 126KB
22.png 39KB
ReadMe.md 15KB
8.总结
.DS_Store 6KB
ReadMe.md 3KB
3.HBase
img
1.png 40KB
ReadMe.md 9KB
4.Spark SQL
ReadMe.md 11KB
ReadMe.md 889B
7.Spark MLlib
ReadMe.md 11KB
5.Spark Streaming
file.txt 10KB
ReadMe.md 37KB
共 34 条
- 1
资源评论
想念@思恋
- 粉丝: 861
- 资源: 103
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功