// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.demo.flink.analyze;
import org.apache.doris.demo.flink.analyze.User;
import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * Example job that reads a Doris table through the Flink Doris connector, converts every
 * record into a {@link User}, keys the stream by user name and applies {@code sum("age")}
 * to each key, printing the result.
 *
 * <p>The connection settings below (FE address, credentials, table identifier) are the
 * placeholder values shipped with the example and must be edited before the job can run
 * against a real cluster.
 */
public class FlinkDorisConnectorAnalyze {

    /**
     * Builds and executes the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Connector options; "IP:8030" is a placeholder for the Doris FE address.
        Properties properties = new Properties();
        properties.setProperty("fenodes", "IP:8030");
        properties.setProperty("username", "root");
        properties.setProperty("password", "");
        properties.setProperty("table.identifier", "test1.doris_test_source_2");
        DorisStreamOptions options = new DorisStreamOptions(properties);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // The source is intentionally left raw: the element type declared by
        // DorisSourceFunction is not visible from this file, so no type argument is guessed.
        @SuppressWarnings({"rawtypes", "unchecked"})
        DataStreamSource dataStreamSource =
                env.addSource(new DorisSourceFunction(options, new User()));

        // From here on the stream is typed, so keyBy/sum/print are checked by the compiler.
        @SuppressWarnings("unchecked")
        SingleOutputStreamOperator<User> users = dataStreamSource.map(new RowToUserMapper());

        users.keyBy(new UserNameKeySelector())
                .sum("age")
                .print();

        env.execute("Flink doris connector analyze test");
    }

    /**
     * Converts one source record into a {@link User}.
     *
     * <p>A record that is a {@link List} is read positionally: index 0 is the name, 1 the age,
     * 2 the price and 3 the sale. Any {@code List} implementation is accepted, not only
     * {@code ArrayList}. A record that is not a list yields a {@code User} on which no setter
     * has been called.
     */
    private static final class RowToUserMapper implements MapFunction<Object, User> {

        private static final long serialVersionUID = 1L;

        /**
         * @param obj the record emitted by the source
         * @return the populated user, or an untouched {@code new User()} for non-list records
         * @throws Exception if a column is missing, {@code null}, or the age is not an integer
         */
        @Override
        public User map(Object obj) throws Exception {
            User user = new User();
            if (obj instanceof List<?>) {
                // Cast once instead of once per column.
                List<?> row = (List<?>) obj;
                user.setName(row.get(0).toString());
                user.setAge(Integer.parseInt(row.get(1).toString()));
                user.setPrice(row.get(2).toString());
                user.setSale(row.get(3).toString());
            }
            return user;
        }
    }

    /** Keys the stream by {@link User#getName()}. */
    private static final class UserNameKeySelector implements KeySelector<User, String> {

        private static final long serialVersionUID = 1L;

        /**
         * @param user the element to extract the key from
         * @return the user's name
         */
        @Override
        public String getKey(User user) throws Exception {
            return user.getName();
        }
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
flink-doris-connnector-analyze.rar (7个子文件)
flink-doris-connnector-analyze
pom.xml 6KB
src
test
java
main
resources
java
org
apache
doris
demo
flink
analyze
DorisSource.java 3KB
FlinkDorisConnectorAnalyze.java 3KB
FlinkJdbcConnectorAnalyze.java 3KB
instructions.md 2KB
User.java 2KB
flink-doris-connnector-analyze.iml 30KB
共 7 条
- 1
资源评论
shangjg3
- 粉丝: 1146
- 资源: 101
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 实验二:IP协议分析.zip
- 驱动代码驱动代码驱动代码驱动代码
- SVID_20240523_141155_1.mp4
- Code for the complete guide to tkinter tutorial
- 关于百货中心供应链管理系统.zip
- SimpleFolderIcon-master 修改Unity的Project下的文件夹图标
- A python Tkinter widget to display tile based maps
- A pure Python library for adding tables to a Tkinter application
- Vector资源文件.zip
- MobaXterm-Installer
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功