package com.shangjack.operatorstate;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {
// 非正常数据
private List<Tuple2<String, Long>> bufferedData;
// checkPointedState
private transient ListState<Tuple2<String, Long>> checkPointedState;
// 需要监控的阈值
private Long threshold;
// 次数
private Integer numberOfTimes;
ThresholdWarning(Long threshold, Integer numberOfTimes) {
this.threshold = threshold;
this.numberOfTimes = numberOfTimes;
this.bufferedData = new ArrayList<>();
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 注意这里获取的是OperatorStateStore
checkPointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("abnormalData",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
})));
// 如果发生重启,则需要从快照中将状态进行恢复
if (context.isRestored()) {
for (Tuple2<String, Long> element : checkPointedState.get()) {
bufferedData.add(element);
}
}
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {
Long inputValue = value.f1;
// 超过阈值则进行记录
if (inputValue >= threshold) {
bufferedData.add(value);
}
// 超过指定次数则输出报警信息
if (bufferedData.size() >= numberOfTimes) {
// 顺便输出状态实例的hashcode
out.collect(Tuple2.of(checkPointedState.hashCode() + "阈值警报!", bufferedData));
bufferedData.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 在进行快照时,将数据存储到checkPointedState
checkPointedState.clear();
for (Tuple2<String, Long> element : bufferedData) {
checkPointedState.add(element);
}
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
Flink状态管理和检查点机制项目代码.rar
共62个文件
xml:55个
java:5个
properties:1个
需积分: 0 0 下载量 50 浏览量
2023-10-19
10:53:03
上传
评论
收藏 28KB RAR 举报
温馨提示
相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用: 具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State: 算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。 键控状态 (Keyed State) :是一种特殊的算子状态,即状态是根据 key 值进行区分的,Flink 会为每类键值维护一个状态实例。 为了使 Flink 的状态具有良好的容错性,Flink 提供了检查点机制 (CheckPoints) 。通过检查点机制,Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态
资源推荐
资源详情
资源评论
收起资源包目录
Flink状态管理和检查点机制项目代码.rar (62个子文件)
flink-state-management
flink-state-management.iml 6KB
pom.xml 7KB
src
main
resources
log4j.properties 1KB
java
com
shangjack
operatorstate
ThresholdWarning.java 3KB
OperatorStateJob.java 1KB
keyedstate
ThresholdWarning.java 2KB
ThresholdWarningWithTTL.java 2KB
KeyedStateJob.java 1KB
.idea
codeStyles
codeStyleConfig.xml 153B
sbt.xml 246B
libraries
Maven__commons_collections_commons_collections_3_2_2.xml 616B
Maven__org_apache_commons_commons_lang3_3_3_2.xml 570B
Maven__com_twitter_chill_2_11_0_7_6.xml 521B
Maven__com_typesafe_akka_akka_actor_2_11_2_5_21.xml 587B
Maven__commons_io_commons_io_2_4.xml 503B
Maven__org_apache_commons_commons_math3_3_5.xml 556B
Maven__org_apache_flink_flink_shaded_netty_4_1_32_Final_7_0.xml 674B
Maven__com_typesafe_config_1_3_3.xml 497B
Maven__org_apache_flink_flink_java_1_9_0.xml 541B
Maven__org_apache_flink_flink_hadoop_fs_1_9_0.xml 576B
Maven__com_google_code_findbugs_jsr305_1_3_9.xml 545B
Maven__org_apache_flink_flink_statebackend_rocksdb_2_11_1_9_0.xml 688B
Maven__commons_cli_commons_cli_1_3_1.xml 528B
Maven__org_apache_flink_flink_shaded_asm_6_6_2_1_7_0.xml 625B
Maven__org_slf4j_slf4j_api_1_7_15.xml 513B
Maven__org_apache_flink_flink_streaming_java_2_11_1_9_0.xml 646B
Maven__org_apache_flink_flink_annotations_1_9_0.xml 590B
Maven__org_scala_lang_modules_scala_parser_combinators_2_11_1_1_1.xml 698B
Maven__com_esotericsoftware_kryo_kryo_2_24_0.xml 542B
Maven__org_slf4j_slf4j_log4j12_1_7_7.xml 534B
Maven__org_apache_flink_flink_metrics_core_1_9_0.xml 597B
Maven__org_apache_commons_commons_compress_1_18.xml 584B
Maven__org_apache_flink_flink_clients_2_11_1_9_0.xml 597B
Maven__org_apache_flink_flink_shaded_jackson_2_9_8_7_0.xml 639B
Maven__com_typesafe_akka_akka_protobuf_2_11_2_5_21.xml 608B
Maven__org_clapper_grizzled_slf4j_2_11_1_3_2.xml 584B
Maven__org_xerial_snappy_snappy_java_1_1_4.xml 552B
Maven__com_typesafe_ssl_config_core_2_11_0_3_7.xml 595B
Maven__org_objenesis_objenesis_2_1.xml 508B
Maven__org_apache_flink_force_shading_1_9_0.xml 562B
Maven__com_esotericsoftware_minlog_minlog_1_2.xml 543B
Maven__org_apache_flink_flink_core_1_9_0.xml 541B
Maven__org_apache_flink_flink_queryable_state_client_java_1_9_0.xml 702B
Maven__com_twitter_chill_java_0_7_6.xml 521B
Maven__log4j_log4j_1_2_17.xml 469B
Maven__org_apache_flink_flink_shaded_guava_18_0_7_0.xml 618B
Maven__com_github_scopt_scopt_2_11_3_5_0.xml 541B
Maven__com_typesafe_akka_akka_stream_2_11_2_5_21.xml 594B
Maven__org_scala_lang_modules_scala_java8_compat_2_11_0_7_0.xml 656B
Maven__org_apache_flink_flink_runtime_2_11_1_9_0.xml 597B
Maven__org_javassist_javassist_3_19_0_GA.xml 550B
Maven__org_reactivestreams_reactive_streams_1_0_2.xml 595B
Maven__org_scala_lang_scala_library_2_11_12.xml 568B
Maven__org_apache_flink_flink_optimizer_2_11_1_9_0.xml 611B
Maven__com_data_artisans_frocksdbjni_5_17_2_artisans_1_0.xml 650B
Maven__com_typesafe_akka_akka_slf4j_2_11_2_5_21.xml 587B
vcs.xml 197B
workspace.xml 4KB
misc.xml 394B
compiler.xml 671B
modules.xml 291B
encodings.xml 267B
共 62 条
- 1
资源评论
shangjg3
- 粉丝: 1049
- 资源: 101
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 论文(最终)_20240430235101.pdf
- 基于python编写的Keras深度学习框架开发,利用卷积神经网络CNN,快速识别图片并进行分类
- 最全空间计量实证方法(空间杜宾模型和检验以及结果解释文档).txt
- 5uonly.apk
- 蓝桥杯Python组的历年真题
- 2023-04-06-项目笔记 - 第一百十九阶段 - 4.4.2.117全局变量的作用域-117 -2024.04.30
- 2023-04-06-项目笔记 - 第一百十九阶段 - 4.4.2.117全局变量的作用域-117 -2024.04.30
- 前端开发技术实验报告:内含4四实验&实验报告
- Highlight Plus v20.0.1
- 林周瑜-论文.docx
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功