package flink;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import scala.collection.script.Start;
import java.util.Properties;
// NOTE(review): this copy of the file is truncated — the active MySqlSource
// builder chain at the bottom never reaches .build(), and the closing braces
// of main() and the class are missing. Also, the scala.collection.script.Start
// import above is unused and can be removed. Restore from the original source
// before compiling.
public class FlinkCDC2 {
public static void main(String[] args) throws Exception {
// Identifier reused as the checkpoint directory name (and, per its name,
// presumably as the Kafka group id / job name elsewhere — TODO confirm
// against the full file).
String ckAndGroupIdAndJobName = "sap_sapgateway10";
// Configuration conf = new Configuration();
// conf.setInteger("rest.port",33334);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//1. Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
//2. Flink-CDC stores the binlog read position as state in the checkpoint; to
//   resume where it left off, the job must be restarted from a Checkpoint or
//   Savepoint.
//2.1 Enable checkpointing. NOTE(review): the original comment said "every 10
//    minutes", but 10*6000L ms = 60 s, i.e. every 1 minute — likely a typo
//    for 10*60000L. Code left unchanged; verify the intended interval.
env.enableCheckpointing(10*6000L, CheckpointingMode.EXACTLY_ONCE);//interval measured start-to-start
// Checkpoint attempts are aborted after 10 minutes
env.getCheckpointConfig().setCheckpointTimeout(10*60000L);
//2.3 Retain the last completed checkpoint when the job is cancelled
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5*60000L);//gap measured end-to-start
//2.4 Restart strategy when recovering from a checkpoint: up to 20 attempts,
//    3 seconds apart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 3000L));
//2.5 State backend (heap-based; durable storage is configured below)
env.setStateBackend(new HashMapStateBackend());
// 3. Checkpoint storage path on HDFS
// env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop103:8020/ck/" + ckAndGroupIdAndJobName);
env.getCheckpointConfig().setCheckpointStorage("hdfs://mycluster:8020/ck/" + ckAndGroupIdAndJobName);
//2.6 User name used when accessing HDFS. NOTE(review): this is set AFTER the
//    setCheckpointStorage call above — presumably the filesystem is only
//    touched lazily at execution time so it still takes effect, but confirm;
//    conventionally it is set first.
System.setProperty("HADOOP_USER_NAME", "sarah");
//3. Create the Flink-MySQL-CDC source
// Custom date/time converter wiring for Debezium; the converter class name is
// resolved at runtime, so "flink.MySqlDateTimeConverter" must be on the
// classpath — TODO confirm it exists in this project.
Properties properties = new Properties();
properties.setProperty("converters", "dateConverters");
properties.setProperty("dateConverters.type", "flink.MySqlDateTimeConverter");
//4. JDBC connection properties
Properties properties2 = new Properties();
properties2.setProperty("useSSL","false");
properties2.setProperty("characterEncoding","utf8");
// Build the mysqlSource
// MySqlSource mysqlCdcSource =
// MySqlSource.<String>builder()
// .hostname("hadoop102")
// .port(3306)
// .username("root")
// .password("mivbAs7Awc")
// .databaseList("student")
// .tableList("student.table_name","student.table_name2")//optional; if omitted, every table of the databases above is captured. Note: entries must use the "db.table" form
// .serverTimeZone("Asia/Shanghai")
// .debeziumProperties(properties)
// .scanNewlyAddedTableEnabled(true)
// .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
// .build();
// //Build the mysqlSource
// MySqlSource mysqlCdcSource =
// MySqlSource.<String>builder()
// .hostname("172.16.11.154")
// .port(3306)
// .username("root")
// .password("mivbAs7Awc")
// .databaseList("easypm")
// .tableList(
// "easypm.ads_confirmed_requirement",
// "easypm.ads_group_result_stat",
// "easypm.ads_resouces",
// "easypm.ads_requirement_to_be_confirmed",
// "easypm.ads_hdb_quota"
// )//optional; if omitted, every table of the databases above is captured. Note: entries must use the "db.table" form
//// .databaseList("student")
//// .tableList()
// .serverTimeZone("Asia/Shanghai")
// .debeziumProperties(properties)
// .scanNewlyAddedTableEnabled(true)
// .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
// .build();
// MySqlSource mysqlCdcSource = MySqlSource.<String>builder()
// .hostname("172.20.2.124")
// .port(3306)
// .username("sapgateway")
// .password("e5MyOF9WWUxXQO80")
// .databaseList("sap_gateway")
// .jdbcProperties(properties2)
// .tableList(
// "sap_gateway.EKET"
// ,"sap_gateway.EKKO",
// "sap_gateway.EKPO",
// "sap_gateway.LFA1",
// "sap_gateway.LIKP",
// "sap_gateway.LIPS",
// "sap_gateway.MAKT",
// "sap_gateway.MARA",
// "sap_gateway.MARC",
// "sap_gateway.MARD",
// "sap_gateway.MKPF",
// "sap_gateway.MLGN",
// "sap_gateway.MSEG",
// "sap_gateway.T001L",
// "sap_gateway.T001W",
// "sap_gateway.T001W_BIW",
// "sap_gateway.T300T"
// ,"sap_gateway.T305T",
// "sap_gateway.VBAK",
// "sap_gateway.VBAP",
// "sap_gateway.VBEP",
// "sap_gateway.VBRK",
// "sap_gateway.VBRP",
// "sap_gateway.ZTMM040",
// "sap_gateway.ZTMM090",
// "sap_gateway.ZTMM093",
// "sap_gateway.ZTWM006",
// "sap_gateway.ZTWM026",
// "sap_gateway.ZTWM048",
// "sap_gateway.ZTWM049",
// "sap_gateway.ZTWM050"
// )
// .startupOptions(StartupOptions.initial())
//// .tableList("sap_gateway.T300T")
// .debeziumProperties(properties)
// .scanNewlyAddedTableEnabled(true)
// .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
// .build();
// //Build the mysqlSource
// NOTE(review): database credentials are hard-coded in plaintext here and in
// the commented variants above — move them to configuration/secret storage.
MySqlSource mysqlCdcSource =
MySqlSource.<String>builder()
.hostname("172.16.11.154")
.port(3306)
.username("root")
.password("mivbAs7Awc")
// .databaseList("easypm")
// .tableList(
// "easypm.ads_confirmed_requirement",
// "easypm.ads_group_result_stat",
// "easypm.ads_resouces",
// "easypm.ads_requirement_to_be_confirmed",
// "easypm.ads_hdb_quota"
// )//optional
// NOTE(review): file is cut off here — the rest of this builder chain
// (.build() etc.), the job wiring, and the closing braces are missing.
// ---------------------------------------------------------------------------
// NOTE(review): the text that followed here is CSDN download-page metadata
// that was accidentally captured along with the source — it is not Java code.
// Preserved verbatim for provenance:
//   "flinkcdc code / 需积分: 0 / 199 浏览量 / 2023-08-15 16:00:28 / 上传 /
//    评论 / 收藏 8KB ZIP 举报 / m0_37759590 / 粉丝: 249 / 资源: 17"
// (translation: "flinkcdc code / points required: 0 / 199 views / uploaded
//  2023-08-15 16:00:28 / comments / bookmark, 8KB ZIP, report /
//  uploader m0_37759590 / followers: 249 / resources: 17")
// ---------------------------------------------------------------------------