// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.demo.flink.cdc;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.oracle.OracleSource;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import java.util.Properties;
/**
*Realize the consumption of oracle log data through flink cdc,
* and then import oracle data into doris table data in real time
* through the flink doris connector RowData data stream;
*/
public class FlinkOracleCdcDemo {
private static final String DATABASE_NAME = "xxx";
private static final String HOST_NAME = "127.0.0.1";
private static final int PORT = 1521;
private static final String SCHEMA_NAME = "xxx";
private static final String TABLE_NAME = "schema_name.table_name";
private static final String USER_NAME = "xxx";
private static final String PASSWORD = "xxx";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Properties props = new Properties();
props.setProperty("debezium.database.tablename.case.insensitive","false");
props.setProperty("debezium.log.mining.strategy","online_catalog");
props.setProperty("debezium.log.mining.continuous.mine","true");
SourceFunction<JSONObject> build = OracleSource.<JSONObject>builder()
.database(DATABASE_NAME)
.hostname(HOST_NAME)
.port(PORT)
.username(USER_NAME)
.password(PASSWORD)
.schemaList(SCHEMA_NAME)
.tableList(TABLE_NAME)
.debeziumProperties(props)
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
DataStreamSource<JSONObject> dataStreamSource = env.addSource(build);
SingleOutputStreamOperator<RowData> map = dataStreamSource.map(new MapFunction<JSONObject, RowData>() {
@Override
public RowData map(JSONObject jsonObject) throws Exception {
GenericRowData genericRowData = new GenericRowData(4);
genericRowData.setField(0, Integer.valueOf(jsonObject.getInteger("id")));
genericRowData.setField(1, StringData.fromString(jsonObject.getString("name")));
genericRowData.setField(2, StringData.fromString(jsonObject.getString("description")));
genericRowData.setField(3, Double.valueOf(jsonObject.getDoubleValue("weight")));
return genericRowData;
}
});
String[] fields = {"id", "name", "description","weight"};
LogicalType[] types={new IntType(),new VarCharType(),new VarCharType(), new DoubleType()};
map.addSink(
DorisSink.sink(
fields,
types,
DorisReadOptions.builder().build(),
DorisExecutionOptions.builder()
.setBatchSize(3)
.setMaxRetries(3)
.build(),
DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("db_name.table_name")
.setUsername("root")
.setPassword("").build()
));
env.execute("flink oracle cdc");
}
}
Flink实时同步Oracle数据到Doris
176 浏览量
2023-11-30
10:35:04
上传
评论
收藏 9KB RAR 举报
shangjg3
- 粉丝: 1097
- 资源: 101
最新资源
- 毕业设计求职招聘系统-SpringBoot+vue前后端源码+数据库sql.zip
- 利用Rosweb实现ros与网页交互(1)
- 基于卷积神经网络的医学病理图像识别源码+数据集+详细资料合集.zip
- 基于树莓派的自动驾驶小车,利用树莓派和tensorflow实现小车在赛道的自动驾驶
- 前端面试必备 - Vue篇.rar
- LM324_datasheet.pdf
- 全新Storm+Core+API管理系统源码
- 基于RP2040的电子沙漏,使用RP2040游戏机开发板,灯板是74HC595驱动的8*8LED矩阵
- 基于SSM和VUE的商店POS积分管理系统(免费提供全套java开源项目源码+论文)
- 基于SpringBoot的“在线动漫信息平台”的设计与实现.rar
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈