package com.tclgf.etl.flink.config;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.tclgf.etl.flink.connection.PostgresqlJdbcConn;
import com.tclgf.etl.flink.module.FlinkProperties;
import com.tclgf.etl.flink.module.Message;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
/*
* @Description:将mysql-cdc数据反序列化成pg的sql语句
* @author: zhenchao.yu
* @date:2024-01-12
* */
@Slf4j
public class Mysql2PgSqlDebeziumDeserializationSchema implements DebeziumDeserializationSchema {
private FlinkProperties flinkProperties;
public Mysql2PgSqlDebeziumDeserializationSchema(FlinkProperties flinkProperties) {
this.flinkProperties = flinkProperties;
}
@Override
public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
Message message = new Message();
//获取数据本身
Struct struct = (Struct) sourceRecord.value();
if (struct == null) {
return;
}
try {
//大部分都是dml操作
Struct after = struct.getStruct("after");
Struct before = struct.getStruct("before");
Struct source = struct.getStruct("source");
if (! (after == null && before == null)) {
//dml操作类型
message.setChangeType("dml");
//获取操作类型
Envelope.Operation source_operation = Envelope.operationFor(sourceRecord);
String source_operation_type = source_operation.toString().toLowerCase();
message.setType(source_operation_type);
//获取主键
HashMap<String,String> priKeyMap = new HashMap<>();
Struct key = (Struct) sourceRecord.key();
Schema schema;
if (key != null) {
schema = key.schema();
for (Field field : schema.fields()) {
String priStr = "\"-1\""; //如果存在主键为空字符串的,则置为-1
Object priData = key.get(field);
if (priData != null && !("".equals(priData.toString()))) {
priStr = "$flink_cdc_column_value$"+priData.toString()+"$flink_cdc_column_value$";
}
priKeyMap.put(field.name().toLowerCase(),priStr);
}
}
message.setPriKey(priKeyMap);
//获取数据库和表
if (source != null) {
String source_database = null;
String source_table = null;
Object dbData = source.get("db");
if (dbData != null && !("".equals(dbData.toString()))) {
source_database = dbData.toString();
}
Object tableData = source.get("table");
if (tableData != null && !("".equals(tableData.toString()))) {
source_table = tableData.toString();
}
message.setSourceDatabase(source_database);
message.setSourceTable(source_table);
}
//目标数据库
message.setTargetDatabase(flinkProperties.getSink_dbname());
String s = flinkProperties.getMysqlMappingPg().get(message.getSourceDatabase()+"."+message.getSourceTable());
String[] schemaTable = s.split("\\.");
message.setTargetSchema(schemaTable[0]);
//目标表
message.setTargetTable(schemaTable[1]);
//before和after合并,优先after,after有的不替换,after没有的用null替换
if (after != null) {
HashMap<String,String> hashMap = new HashMap<>();
schema = after.schema();
for (Field field : schema.fields()) {
String strings = "null";
Object data = after.get(field);
if (data != null && !("".equals(data.toString()))) {
strings = "$flink_cdc_column_value$"+data.toString()+"$flink_cdc_column_value$";
}
hashMap.put(field.name().toLowerCase(),strings);
}
if (before != null) {
schema = before.schema();
for (Field field : schema.fields()) {
if (! hashMap.containsKey(field.name())) {
hashMap.put(field.name().toLowerCase(),"null");
}
}
}
message.setData(hashMap);
}
}
} catch (Throwable throwable) {
//异常可能是非dml操作造成的
//所以,在这里进行ddl操作
Object historyRecord = struct.get("historyRecord");
if (historyRecord == null) {
return;
}
JSONObject jsonObject = (JSONObject) JSON.parse(historyRecord.toString());
Boolean isDdl = jsonObject.containsKey("ddl");
if (isDdl) {
//判断ddl类型
//先获取到执行的ddl语句
String ddlSql = jsonObject.getString("ddl").toLowerCase().replaceAll("\r\n", " ").replaceAll("\t", " ").replaceAll(" + ", " ");
//只处理增加字段
//这里会进来异常数据 ALTER TABLE tbl_route_info ADD INDEX idx_bussiness_no(bussinesNo) COMMENT '业务单据索引'
if ((ddlSql.contains("alter table") && (ddlSql.contains("add") || ddlSql.contains("add column") || ddlSql.contains("modify")))) {
//拿到json数组
JSONArray tableChanges = jsonObject.getJSONArray("tableChanges");
if (tableChanges == null || tableChanges.size() == 0) {
//先打印告警日志
log.warn(sourceRecord.toString());
//不做任何处理
return;
}
message.setChangeType("ddl");
//拿到json
JSONObject changeInfo = (JSONObject) JSON.parse(tableChanges.get(0).toString().toLowerCase());
//拿到数据库和表名
String schemaTable = changeInfo.getString("id");
//.分隔数据库和表名
String[] schemaTableArr = schemaTable.split("\\.");
//去除双引号后写进message
message.setSourceDatabase(schemaTableArr[0].replaceAll("\"",""));
message.setSourceTable(schemaTableArr[1].replaceAll("\"",""));
HashMap<String,String> hashMap = new HashMap<>();
JSONObject tableInfo = (JSONObject)JSON.parse(changeInfo.getString("table"));
JSONArray columnsInfo = tableInfo.getJSONArray("columns");
没有合适的资源?快使用搜索试试~ 我知道了~
mysql实时同步pg
共33个文件
xml:9个
class:7个
java:7个
需积分: 0 0 下载量 132 浏览量
2024-01-17
18:30:16
上传
评论
收藏 41.65MB ZIP 举报
温馨提示
Flink CDC是一种基于Flink的Change Data Capture(CDC)工具,用于实现数据库之间的实时数据同步。在这个场景中,我们将使用Flink CDC来实现MySQL到PostgreSQL的实时数据同步。 首先,我们需要配置源数据库和目标数据库的信息。在MySQL端,我们需要指定要同步的数据库和表,以及MySQL的连接信息。在PostgreSQL端,我们需要指定要同步的目标数据库和表,以及PostgreSQL的连接信息。 接下来,我们需要运行MysqlCDC中的main函数来启动Flink CDC作业。这个作业将会从MySQL源数据库中捕获变更数据,并将其转发到PostgreSQL目标数据库中。 在这个过程中,Flink CDC会使用datastream方式来处理数据。它会将MySQL中的数据变更转换为Flink的DataStream,并通过自定义的sink将数据写入到PostgreSQL中。 此外,Flink CDC还支持DDL变更,这意味着当MySQL中的表结构发生变化时,Flink CDC能够自动适应这些变化,并将其同步到PostgreSQL中。 另
资源推荐
资源详情
资源评论
收起资源包目录
flinkcdcmysql2pg.zip (33个子文件)
flinkcdcmysql2pg
pom.xml 12KB
src
test
java
main
resources
application.properties 460B
java
com
tclgf
etl
flink
sink
PostgresqlMergeSink.java 3KB
module
Message.java 9KB
FlinkProperties.java 5KB
connection
PostgresqlJdbcConn.java 1KB
test
Test.java 4KB
starter
config
MySqlDateTimeConverter.java 4KB
Mysql2PgSqlDebeziumDeserializationSchema.java 11KB
.idea
jarRepositories.xml 1KB
ZeppelinRemoteNotebooks
uiDesigner.xml 9KB
workspace.xml 7KB
misc.xml 541B
inspectionProfiles
Project_Default.xml 411B
compiler.xml 549B
.gitignore 237B
encodings.xml 390B
target
flinkcdcmysql2pg-1.0-SNAPSHOT.jar 23.39MB
classes
application.properties 460B
com
tclgf
etl
flink
sink
PostgresqlMergeSink.class 3KB
module
FlinkProperties.class 11KB
Message.class 12KB
connection
PostgresqlJdbcConn.class 5KB
test
Test.class 6KB
config
MySqlDateTimeConverter.class 6KB
Mysql2PgSqlDebeziumDeserializationSchema.class 10KB
original-flinkcdcmysql2pg-1.0-SNAPSHOT.jar 27KB
dependency-reduced-pom.xml 12KB
flinkcdcmysql2pg-1.0-SNAPSHOT-shaded.jar 23.39MB
maven-status
maven-compiler-plugin
compile
default-compile
createdFiles.lst 0B
inputFiles.lst 706B
testCompile
default-testCompile
inputFiles.lst 0B
maven-archiver
pom.properties 122B
generated-sources
annotations
共 33 条
- 1
资源评论
sqlboy-yuzhenc
- 粉丝: 142
- 资源: 3
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功