没有合适的资源?快使用搜索试试~ 我知道了~
Flink入门:读取Kafka实时数据流,实现WordCount
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
5星 · 超过95%的资源 24 下载量 125 浏览量
2021-01-07
11:35:51
上传
评论
收藏 437KB PDF 举报
温馨提示
本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上。通过本文你可以了解如何编写和运行Flink程序。 代码拆解 首先要设置Flink的执行环境: // 创建Flink执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 设置Kafka相关参数,连接对应的服务器和端口号,读取名为Shakespeare的Topic中的数据源,将数据源命名为stream: // Kafka参数 Properties propert
资源推荐
资源详情
资源评论
Flink入门:读取入门:读取Kafka实时数据流,实现实时数据流,实现WordCount
本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上。通过本文你可以了解如何编写和运行Flink程序。
代码拆解代码拆解
首先要设置Flink的执行环境:
// 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
设置Kafka相关参数,连接对应的服务器和端口号,读取名为Shakespeare的Topic中的数据源,将数据源命名为stream:
// Kafka参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");
String inputTopic = "Shakespeare";
String outputTopic = "WordCount";
// Source
FlinkKafkaConsumer consumer =
new FlinkKafkaConsumer(inputTopic, new SimpleStringSchema(), properties);
DataStream stream = env.addSource(consumer);
使用Flink算子处理这个数据流:
// Transformations
// 使用Flink算子对输入流的文本进行操作
// 按空格切词、计数、分区、设置时间窗口、聚合
DataStream<Tuple2> wordCount = stream
.flatMap((String line, Collector<Tuple2> collector) -> {
String[] tokens = line.split("\s");
// 输出结果 (word, 1)
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2(token, 1));
}
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
这里使用的是Flink提供的DataStream级别的API,主要包括转换、分组、窗口和聚合等操作。
将数据流打印:
// Sink
wordCount.print();
最后执行这个程序:
// execute
env.execute("kafka streaming word count");
env.execute 是启动Flink作业所必需的,只有在execute()被调用时,之前调用的各个操作才会在提交到集群上或本地计算机上执行。
完整代码如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class WordCountKafkaInStdOut {
public static void main(String[] args) throws Exception {
// 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka参数
Properties properties = new Properties();
资源评论
- 无用书生~2023-01-09内容与描述一致,超赞的资源,值得借鉴的内容很多,支持!
- qq_532285372023-05-12感谢大佬分享的资源,对我启发很大,给了我新的灵感。
- m0_654996492024-08-20资源很受用,资源主总结的很全面,内容与描述一致,解决了我当下的问题。
- 2301_768318172023-08-03感谢资源主分享的资源解决了我当下的问题,非常有用的资源。
weixin_38659527
- 粉丝: 6
- 资源: 871
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- Keil C51 插件 检测所有if语句
- 各种排序算法java实现的源代码.zip
- 金山PDF教育版编辑器
- 基于springboot+element的校园服务平台源代码项目包含全套技术资料.zip
- 自动化应用驱动的容器弹性管理平台解决方案
- 各种排序算法 Python 实现的源代码
- BlurAdmin 是一款使用 AngularJs + Bootstrap实现的单页管理端模版,视觉冲击极强的管理后台,各种动画效果
- 基于JSP+Servlet的网上书店系统源代码项目包含全套技术资料.zip
- GGJGJGJGGDGGDGG
- 基于SpringBoot的毕业设计选题系统源代码项目包含全套技术资料.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功