# Flink动态业务规则的实现
[TOC]
## 一、 需求
在不停止flink任务的情况下,在外部通过配置,来达到修改flink输出特征逻辑的变化、新增和删除。
## 二、 监控配置变化
### 2.1 FlinkCDC的实现
直接使用 `flink-connector-mysql-cdc`
### 2.2 Apollo的实现
自定义apollo source,没隔1s输出期间内监控到的apollo配置的变化。
```java
import com.alibaba.fastjson.JSON;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.model.ConfigChange;
import flink_dynamic_rules.entity.ApolloEvent;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
public class ApolloSource extends RichSourceFunction<String> {
private Config config;
private ApolloEvent event = null;
private boolean flag = true;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 一般修改一个配置间隔时间几秒,就是说几秒之内会只监听到一个事件
// 如果考虑高并发的情况,把事件放进队列中,在run中取出队列中所有的事件
ConfigChangeListener changeListener = changeEvent -> {
for (String key : changeEvent.changedKeys()) {
ConfigChange change = changeEvent.getChange(key);
event = new ApolloEvent(change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType().toString());
}
};
config = ConfigService.getAppConfig();
config.addChangeListener(changeListener);
}
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
while (flag) {
if (event != null) {
sourceContext.collect(JSON.toJSONString(event));
event = null;
}
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
}
```
## 三、 根据配置的变化修改逻辑
### 3.1 简单的逻辑
逻辑变化不大,只是一些阈值或者系数发生变化的特征。
把这些阈值和系数看作维表,与数据流连接做特征的计算。
**数据流:**
| session_id | score1 | score2 | score3 |
| ---------- | ------ | ------ | ------ |
| | | | |
**配置流:**
| a | b | c | d |
| ---- | ---- | ---- | ---- |
| 3 | 2 | 100 | xxx |
**特征计算逻辑:**
```sql
select
session_id,
score1*a + score2*b as feature1,
if(score3>c,1,0) as feature2
from connect_stream;
```
### 3.2 复杂的逻辑
对于计算逻辑比较复杂的,需要用udf计算的特征。上面的方法就不能实现了。可以通过动态加载类,再通过反射执行类的方法,来实现特征的输出。
**具体的实现步骤: **flink_dynamic_rules.app.Demo2
1. 获取apolo/flink cdc配置流
2. 获取数据流
3. 广播配置流
4. 数据量与配置流连接 connectDS
5. connectDS 的BroadcastProcessFunction
- 处理广播流
把广播流的数据(类名,jar包路径)添加进mapState中
- 处理数据流
遍历mapState中的数据,通过类加载器加载,再通过反射执行方法
6. 输出 sink
没有合适的资源?快使用搜索试试~ 我知道了~
flink动态业务规则
共54个文件
java:27个
xml:19个
properties:2个
需积分: 9 3 下载量 67 浏览量
2022-08-18
16:52:40
上传
评论
收藏 48KB ZIP 举报
温馨提示
在不停止flink任务的情况下,通过修改规则配置文件/重写类中的方法,实现线上输出特征的修改、新增和删除。
资源详情
资源评论
资源推荐
收起资源包目录
flink-dynamic-rules.zip (54个子文件)
flink-apollo
pom.xml 3KB
flink-apollo.iml 81B
src
test
java
main
resources
META-INF
app.properties 55B
log4j2.xml 134B
java
com
example
flink_apollo
jaizai
Demo.java 3KB
app
Demo.java 563B
Demo2.java 5KB
anno
Demo2.java 428B
Test.java 136B
MyAnno.java 337B
entity
ApolloEvent.java 1KB
MyData.java 1KB
ApiUtil.java 136B
source
MySource.java 782B
ApolloSource.java 2KB
.idea
codeStyles
codeStyleConfig.xml 153B
uiDesigner.xml 9KB
$PRODUCT_WORKSPACE_FILE$ 489B
misc.xml 526B
.name 11B
compiler.xml 545B
workspace.xml 6KB
flink-dynamic-rules
pom.xml 938B
flink-jobs
pom.xml 4KB
Readme.md 4KB
src
test
java
main
resources
META-INF
app.properties 55B
log4j2.xml 134B
lib
anno-1.0-SNAPSHOT.jar 2KB
java
flink_dynamic_rules
jaizai
Demo.java 3KB
app
ApolloDemo.java 596B
Demo2.java 6KB
Demo1.java 8KB
entity
ApolloEvent.java 1KB
MyRule.java 1KB
MyCdcData.java 2KB
MyData.java 1KB
source
MySource.java 776B
MySource1.java 995B
ApolloSource.java 2KB
.idea
codeStyles
Project.xml 269B
codeStyleConfig.xml 153B
uiDesigner.xml 9KB
ZeppelinRemoteNotebooks
runConfigurations.xml 346B
misc.xml 480B
jarRepositories.xml 871B
compiler.xml 584B
workspace.xml 5KB
.gitignore 237B
flink-methods
pom.xml 862B
src
test
java
main
resources
java
com
lqh
methods
version2
Udf2.java 205B
Udf1.java 217B
Udf3.java 225B
version1
Udf2.java 204B
Udf1.java 217B
共 54 条
- 1
重生之我在异世界打工
- 粉丝: 178
- 资源: 2
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0