没有合适的资源?快使用搜索试试~ 我知道了~
细说Flink CEP_152.pdf
需积分: 9 11 下载量 109 浏览量
2020-03-24
16:08:24
上传
评论
收藏 942KB PDF 举报
温馨提示
试读
16页
本文由浅入深(从相关背景、API 使用,到具体实现)介绍 Flink 当前的 CEP 模块。 读者需要对 Flink DataStream API 有一定了解,内容比较全面,请据所需选读。
资源推荐
资源详情
资源评论
细说Flink CEP
本文由浅入深(从相关背景、!"# 使用,到具体实现)介绍 $%&'( 当前的 )*" 模块。
读者需要对 $%&'(+,-.-/.01-2+!"# 有一定了解,内容比较全面,请据所需选读。+
背景介绍
什么是CEP?
全称 Complex(event(processing,专指解决3从一系列具体源事件实时地推导(或
提取、感知)出所需的更复杂更抽象的事件4的一类技术。+
别看 )*" 解决的问题就一句话,其实应用场景极其广泛:量化交易(如通过实时
的股票交易数据判断一些价格转折点及时作出买卖操作)、监控系统(根据监测
数据实时检测复杂的异常)、物联网传感器数据分析(此类应用场景很多,工业
制造、医疗、智慧城市等均需要 )*")等。总之,在有非常多源数据(事件)以
流的形式产生时,需要对其分析、实时感知并作出响应的场景,都属于 )*" 的应
用场景。+
)*" 有一些重要特征:+
• 实时性:)*"是需要实时感知并作出响应的,至于实时具体是多长时间,
这个根据场景可能是毫秒级,也可能是秒级,分钟级的在一些监控场景
也可能。一般从充足的源事件输入到输出结果事件的延迟是毫秒级。+
• 复合性:+
输入事件可以有一个或多个事件源,一般是从多个(甚至很多)序列事
件进行推导。+
• 与数据库相反的处理逻辑:数据库是先存储数据,再查询需要的结果,
即先有数据,再运行查询;而)*"是先定义好查询,再用实时数据去匹配
查询,即先有查询,再运行数据。+
目前 )*" 的技术产品非常多,比如 *5610、/&778&、!9:01+/.01-2+!'-%;.&<5。+
)*" 领域有一项核心技术就是基于事件流的模式匹配("-..10'+2-.<8&'=+>?10+
1?1'.+5.01-25),后文也主要针对此项技术讨论。事件流一般是指时间序列数据,
事件按照发生时间有序(也有时候无序)输入到事件流。多个连续或非连续的事
件构成一个事件序列,如果一个事件序列满足预定义的 "-..10',就会成为一个
输出事件。+
就像字符串的正则表达式一样,需要先定义好模式("-..10'),编译后再去对字
符串(或字符流)进行匹配。如果把一个字符作为一个事件,"-..10' 定义为
@ABCDE@-B9D,那么事件流FG+-+H+I+C+J+KKKF会得到输出流FG-+CJ+KKKF。(注:6-..1' 中
还可以定义事件衔接规则,允许不连续L5(&6M.&%%M'1N.O还会输出FHCJF,若
5(&6M.&%%M-';,则还有FGJ+GHJ+GCJ+GHCJ+HJF)+
Flink CEP是什么情况?
$%&'(+)*" 与 P-J%1+!"# 均是建立在 ,-.-/.01-2+!"# 之上,其实 P-J%1+!"# 也可以做
一些简单的数据筛选,但对于判断序列数据间关系比较无力。目前 $%&'(+)*" 没
有类 /QR 的支持,只提供了一些编程 !"#,此模块仍在继续增强中。+
$%&'(+)*" 模块代码再 S%&'(B%&J0-0&15 下,使用其 !"# 时除了依赖 S%&'(B5.01-2&'=BT-?-
外,还需要单独添加 S%&'(B<16 依赖。+
$%&'(+)*" 其实就是在 ,-.-/.01-2 上对流数据提供了模式匹配功能,从输入流检
测出满足预定义 "-..1' 的数据序列作为输出。其 !"# 及详细实现将在后文介绍。+
Flink CEP的使用示例及API介绍
股票监测示例
问题描述:股票经常有这样一种情况,单位时间成交量维持高位且价格逐步上升,
到某点时成交量会骤降,我们据此定义以下 "-..10' 来监测此类情况:+
1. ##FROM#### stock_stream_source ###### ##### ####/*#指定输入源#*/###
2. PATTERN#SEQ(Stock+#a[#],#Stock#b)#########/*#定义要选取的事件序列,Stock+#是Kl
eene#plus,表示一到多个满足a 条件的Stock集合#*/###
3. WHERE### strict_contiguity(a[ #],#b)#{## #/*#序列需要满足的条件放在{}中,strict_c
ontiguity是一种事件衔接规则,稍后解释#*/# ##
4. #### ####[symbol]##########################/*#按股票代码keyBY#*/## #
5. ##and## #a[1].volume#>#1000################/*#从成交量大于1000手时开始匹配#*/###
6. ##and## #a[i].price#>#avg(a[..i-1].price)##/*#下一次的价格高于之前的平均价格,循
环匹配#*/###
7. ##and## #b.volume#<#80%*a[a.LEN].volume#}##/*#成交量骤降时结束并成功匹配#*/###
8. WITHIN##1# hour ###### ##### ##### ##### ##### ##/*#定义1小时的时间窗口,1小时内未成功
的匹配时将timeout掉#*/# ##
9. RETURN##a[1].symbol,#a[1].price,#a[1].timeStamp,#b.price,#b.timeStamp#/*#定
义输出结果格式#*/### #
#
/.><( 代表单位时间内某个股票的成交信息L一个 *?1'.O,包含属性:5;2J>%+ 股票
代码,60&<1+ 股票价格,?>%:21+ 成交量,.&21/.-26+ 成交时间。+
我们再定义输出结果结构为 /1%%">&'.,包含属性:5;2J>%U+5.-0."0&<1U+
5.-0.P&21/.-26U+1'7"0&<1U+1'7P&21/.-26 和输出结果对应。+
以上代码是用 /!/*E语言表达,S%&'( 并不支持,下面我们用 S%&'(+)*"+!"# 实现:+
1. //第一步:构造Patten## #
2. Pattern<Stock,#?>#pattern1#=#Pattern.<Stock>begin("a[1]").where(###
3. #### new#IterativeCondition<Stock>()#{###
4. #### ####@Override###
5. #### ####public#boolean#filter(###
6. #### ##### #######Stock#stock,#Context<Stock>#context)#throws#Exception#{###
7. #### ##### ###return#stock.getVolume()#>#1000;###
8. #### ####} ###
9. #### });###
10. Pattern<Stock,#?>#pattern2#=#pattern1.next("a[i]").where(###
11. #### new#IterativeCondition<Stock>()#{###
12. #### ####@Override###
13. #### ####public#boolean#filter(Stock#stock,#Context<Stock>#context)#throws#Ex
ception#{###
14. #### ##### ###Stock#startStock#=#context.getEventsForPattern("a[1]").iterator(
).next();###
15. #### ##### ###int#totalVolume#=#startStock.getVolume();###
16. #### ##### ###double#totalMoney#=#startStock.getPrice()#*#totalVolume;###
17. #### ##### ###for#(Stock#s#:#context.getEventsForPattern("a[i]"))#{###
18. #### ##### #######totalVolume#+=#s.getVolume();###
19. #### ##### #######totalMoney#+=#s.getPrice()#*#s.getVolume();###
20. #### ##### ###}###
21. #### ##### ###return#stock.getPrice()#>#totalMoney#/#totalVolume;###
22. #### ####} ###
23. #### }).oneOrMore().consec utive();###
24. Pattern<Stock,#?>#pattern3#=#pattern2.next("b").where(###
25. #### new#IterativeCondition<Stock>()#{###
26. #### ####@Override###
27. #### ####public#boolean#filter(Stock#stock,#Context<Stock>#context)#throws#Ex
ception#{###
28. #### ##### ###int#lastVolume#=#0;###
29. #### ##### ###long#lastTS#=#0;###
30. #### ##### ###for#(Stock#s#:#context.getEventsForPattern("a[i]"))#{###
31. #### ##### #######if#(s.getTimeStamp()#>#lastTS)#{###
32. #### ##### ###########lastTS#=#s.getTimeStamp();###
33. #### ##### ###########lastVolume#=#s.getVolume();###
34. #### ##### #######}###
35. #### ##### ###}###
36. #### ##### ###return#lastVolume#>#0#&&#stock.getVolume()#<#lastVolume#*#0.8;###
37. #### ####} ###
38. #### #}).within(Time.hours(1));###
39. ###
40. //第二步:创建PatternStream,假设已有DataStreamSource<Stock> #stockStreamSource
###
41. PatternStream<Stock>#patternStream#=#CEP.pattern(###
剩余15页未读,继续阅读
资源评论
微夜白
- 粉丝: 3
- 资源: 1
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功