Flink笔记笔记(二十四二十四)::Flink 数据数据 保存保存 Redis(自定义自定义Redis Sink)
本文主要来介绍本文主要来介绍 Flink 读取读取 Kafka数据,并实时下沉数据,并实时下沉(Sink)数据到数据到 Redis 的过程的过程 。
通过如下链接:Flink官方文档,我们知道数据保存到 Redis 的容错机制是 at least once。所以我们通过幂等操作,使用新数据覆盖旧数据的方式,以此来
实现 exactly-once 。
1.代码部分代码部分
1.1 config.properties配置文件配置文件
bootstrap.server=192.168.204.210:9092,192.168.204.211:9092,192.168.204.212:9092
group.id=testGroup
auto.offset.reset=earliest
enable.auto.commit=false
topics=words
redis.host=192.168.204.210
redis.port=6379
redis.password=123456
redis.timeout=5000
redis.db=0
1.2 RedisUtils工具类工具类
/**
* TODO FlinkUtils工具类(持续更新编写)
*
* @author liuzebiao
* @Date 2020-2-18 9:11
*/
public class FlinkUtils {
private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 返回 Flink流式环境
* @return
*/
public static StreamExecutionEnvironment getEnv(){
return env;
}
/**
* Flink 从 Kafka 中读取数据(满足Exactly-Once)
* @param parameters
* @param clazz
* @param
* @return
* @throws IllegalAccessException
* @throws InstantiationException
*/
public static DataStream createKafkaStream(ParameterTool parameters, Class clazz) throws IllegalAccessException, InstantiationException {
//设置全局参数
env.getConfig().setGlobalJobParameters(parameters);
//1.只有开启了CheckPointing,才会有重启策略
//设置Checkpoint模式(与Kafka整合,一定要设置Checkpoint模式为Exactly_Once)
env.enableCheckpointing(parameters.getLong("checkpoint.interval",5000L),CheckpointingMode.EXACTLY_ONCE);
//2.默认的重启策略是:固定延迟无限重启
//此处设置重启策略为:出现异常重启3次,隔5秒一次(你也可以在flink-conf.yaml配置文件中写死。此处配置会覆盖配置文件中的)
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(20)));
//系统异常退出或人为 Cancel 掉,不删除checkpoint数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
/**2.Source:读取 Kafka 中的消息**/
//Kafka props
Properties properties = new Properties();
//指定Kafka的Broker地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, parameters.getRequired("bootstrap.server"));
//指定组ID
properties.put(ConsumerConfig.GROUP_ID_CONFIG, parameters.getRequired("group.id"));
//如果没有记录偏移量,第一次从最开始消费
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, parameters.get("auto.offset.reset","earliest"));
//Kafka的消费者,不自动提交偏移量
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, parameters.get("enable.auto.commit","false"));
String topics = parameters.getRequired("topics");
List topicList = Arrays.asList(topics.split(","));
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer(topicList, clazz.newInstance(), properties);
return env.addSource(kafkaConsumer);
}
}
1.3 Flink 整合整合 Kafka
Flink整合 Kafka 实现 Exactly Once,本文不再过多介绍,你可以参考:Flink 整合 Kafka (实现 Exactly-Once)
1.4 自定义自定义 Redis Sink
评论0
最新资源