Clink
Java
Java
共203个文件
java: 169
xml: 11
gitignore: 7
png: 3
md: 2
SourceFactory: 2
properties: 2
LICENSE: 1
xsd: 1
DataTypeFactory: 1
Flink流批一体数据处理快速集成开发框架。不仅能够快速构建基于Java的Flink流批一体应用程序,实现异构数据库实时同步和ETL,还可以让Flink SQL变得极其简单,玩转Flink。
简介
Clink 为基于 Java 的 Flink 应用程序提供快速集成的能力,可通过 Clink 快速构建 Flink 流批一体应用程序,实现异构数据库实时同步和 ETL。Clink 提供了数据源管理模块,通过 Clink 运行 Flink SQL 会变得极其简单。使用 clink-clients 可以实现基于 Java API 启动 Clink 应用程序,还可以将 flink 任务实现通过 XML 配置文件来管理。一个典型的Clink部署架构如下:
当然,如果您选择仅使用Flink CDC,那么以上的Debezium和Kafka就不需要了。总体而言,Clink是一个集成开发框架,它能够帮助用户更好地使用Flink及Flink的周边生态(包括但不限于Flink CDC、FlinkX),尤其是Flink SQL和Flink CDC。
Flink版本
Clink对Flink特定版本依赖较弱,已知在1.13+环境下运行良好,用户可根据需要自行选择Flink的发行版本。
开始使用
以Maven项目为例
pom.xml添加依赖(Flink等其他相关依赖此处省略),${clink.version}为版本号,可定义属性或直接使用版本号替换
<!-- https://mvnrepository.com/artifact/cn.tenmg/clink-clients -->
<dependency>
<groupId>cn.tenmg</groupId>
<artifactId>clink-clients</artifactId>
<version>${clink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/cn.tenmg/clink-core -->
<dependency>
<groupId>cn.tenmg</groupId>
<artifactId>clink-core</artifactId>
<version>${clink.version}</version>
</dependency>
配置文件clink.properties
clink.properties用于配置Clink应用运行的数据源以及其他特性等。
# REST 配置,用“,”分隔不同的地址使用“:”分隔地址和端口号,端口号可省略默认为8081
rest.addresses=192.168.100.11,192.168.100.12,192.168.100.13
# 或者也允许使用 rest.address
# rest.address=192.168.100.11,192.168.100.12,192.168.100.13
# 只重试一次(默认值为20),以避免某些节点挂了后重试时间过长
rest.retry.max-attempts=1
# Clink客户端提交执行的默认类名,它不是必需的,默认为 cn.tenmg.clink.ClinkPortal,也可以实现和配置自己的类或者在 jar 中指定主类
# clink.default.class=cn.tenmg.clink.ClinkPortal
#Flink Table API配置
#空值处理配置
table.exec.sink.not-null-enforcer=drop
#Clink数据同步类型转换配置(将BIGINT表示的时间减去8小时得到北京时间,并转为TIMESTAMP)
data.sync.columns.convert=BIGINT,TIMESTAMP:TO_TIMESTAMP(FROM_UNIXTIME(#columnName/1000 - 8*60*60, 'yyyy-MM-dd HH:mm:ss'))
#FlinkSQL数据源配置
#配置名称为kafka的数据源
datasource.kafka.connector=kafka
datasource.kafka.properties.bootstrap.servers=192.168.100.24:9092,192.168.100.25:9092,192.168.100.26:9092
datasource.kafka.properties.group.id=Clink
datasource.kafka.scan.startup.mode=earliest-offset
datasource.kafka.format=debezium-json
datasource.kafka.debezium-json.schema-include=false
#配置名称为bidb的数据源
datasource.bidb.connector=jdbc
datasource.bidb.driver=com.mysql.jdbc.Driver
datasource.bidb.url=jdbc:mysql://192.168.100.66:3306/bidb?useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
datasource.bidb.username=your_name
datasource.bidb.password=your_password
datasource.starrocks.jdbc-url=jdbc:mysql://192.168.10.140:9030
datasource.starrocks.load-url=192.168.10.140:8030
datasource.starrocks.connector=starrocks
datasource.starrocks.username=your_name
datasource.starrocks.password=your_password
datasource.starrocks.database-name=your_db
datasource.starrocks.sink.properties.column_separator=\\x01
datasource.starrocks.sink.properties.row_delimiter=\\x02
# the flushing time interval, range: [1000ms, 3600000ms].
datasource.starrocks.sink.buffer-flush.interval-ms=10000
# max retry times of the stream load request, range: [0, 10].
datasource.starrocks.sink.max-retries=3
编写应用入口类(此步骤非必须,可直接使用cn.tenmg.clink.ClinkPortal)
public class ClinkPortal {
/**
* 服务基础包名
*/
private static final String basePackage = "cn.tenmg.clink.quickstart.service";
public static void main(String... args) throws Exception {
ClinkRunner runner = new ClinkRunner() {
@SuppressWarnings("unchecked")
@Override
protected StreamService getStreamService(String serviceName) {// 根据类名获取流服务实例
StreamService streamService = null;
try {
Class<StreamService> streamServiceClass = (Class<StreamService>) Class
.forName(basePackage + "." + serviceName);
streamService = streamServiceClass.newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
return streamService;
}
};
runner.run(args);
}
}
运行、监控、取消和停止应用
(1) 提交作业
调用XMLConfigLoader的load方法加载XML配置文件并提交给客户端执行:
Clink clink = XMLConfigLoader.getInstance().load(ClassUtils.getDefaultClassLoader().getResourceAsStream("clink.xml"));
StandaloneRestClusterClient client = new StandaloneRestClusterClient();
JobID jobId = client.submit(clink);
System.out.println("Flink job launched: " + jobId.toHexString());// 启动clink作业
或
Clink clink = XMLConfigLoader.getInstance()
.load("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\r\n" +
"<clink xmlns=\"http://www.10mg.cn/schema/clink\"\r\n" +
" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\r\n" +
" xsi:schemaLocation=\"http://www.10mg.cn/schema/clink http://www.10mg.cn/schema/clink.xsd\"\r\n" +
" jar=\"/opt/clink/clink-quickstart-1.1.4.jar\" serviceName=\"HelloWorldService\">\r\n" +
"</clink>");
StandaloneRestClusterClient client = new StandaloneRestClusterClient();
JobID jobId = client.submit(clink);
System.out.println("Flink job launched: " + jobId.toHexString());// 启动clink作业
(2) 监控状态
JobID jobId = JobID.fromHexString(hexString);
JobStatus jobStatus = client.getJobStatus(jobId);// 获取作业状态
System.out.println("Job status: " + jobStatus);
(3) 高级功能
//ClusterClient clusterClient = client.getClusterClient(customConf);// 使用自定义配置获取ClusterClient
ClusterClient clusterClient = client.getClusterClient();
// Use clusterClient to do something
(4) 停止作业
System.out.println("Flink job of jobId: " + jobId.toHexString() + " stopped, savepoint path: " + client.stop(jobId));// 停止clink作业
配置手册
XML
使用clink-clients可实现使用XML配置文件来管理Clink任务,这样开发Clink任务会显得非常简单;同时,用户自定义的Clink服务也可以被更轻松得集成到其他系统中。另外,XML文件具有良好的可读性,并且在IDE环境下能够对配置进行自动提示,方便用户更高效地完成任务的配置。
<clink>
<clink>是Clink任务XML配置文件的根节点,需注意必须配置正确的命名空间,通常结构如下:
<clink xmlns="http://www.10mg.cn/schema/clink"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.10mg.cn/schema/clink http://www.10mg.cn/schema/clink.xsd">
</clink>
相关属性及说明:
属性
类型
必需
说明
jar
String
否
运行的JAR包。可通过配置文件的clink.default.jar配置指定默认运行的JAR包。
class
String
否
运行的主类。可通过配置文件的clink.default.
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
项目概述: 本项目是基于Java语言开发的Clink Flink流批一体开发框架,旨在为数据处理提供快速集成的开发解决方案。框架包含203个文件,其中主要构成如下: - Java源文件:169个,构成了框架的核心功能; - 配置文件:11个XML文件,用于配置异构数据库同步和ETL任务; - 忽略文件:7个.gitignore文件,用于版本控制; - 图像资源:3个PNG图片,用于文档或演示; - 阅读材料:2个Markdown文件,提供项目指南和说明; - 工厂类:2个SourceFactory和1个DataTypeFactory,简化数据源和类型的配置; - 属性配置:2个properties文件,定义运行时参数; - 许可证:1个LICENSE文件,声明版权和许可信息; - XSD文件:1个,用于定义框架配置的结构。 功能特点: - 快速构建:支持通过Java快速搭建Flink流批一体应用程序; - 数据处理:实现异构数据库间的实时同步及ETL操作; - 简化Flink SQL:使Flink SQL的使用变得简单直观,易于操作和调试。 本框架不仅提高了开发效率,还让Flink流处理和批处理变得更加灵活和高效,是数据工程师和开发者的强大工具。
资源推荐
资源详情
资源评论
收起资源包目录
基于Java的Clink Flink流批一体开发框架源码 (202个子文件)
cn.tenmg.clink.jdbc.DatabaseSwitcher 208B
cn.tenmg.clink.datasource.DataSourceConverter 59B
cn.tenmg.clink.datasource.DataSourceFilter 109B
cn.tenmg.clink.data.type.DataTypeFactory 1KB
.gitignore 242B
.gitignore 59B
.gitignore 59B
.gitignore 59B
.gitignore 59B
.gitignore 59B
.gitignore 58B
AbstractDataSyncJobGenerator.java 24KB
MultiTableDebeziumDeserializationSchema.java 17KB
MultiTableDebeziumDeserializationSchema.java 17KB
MultiTablesDataSyncJobGenerator.java 16KB
SQLUtils.java 15KB
ClinkContext.java 14KB
SQLServerCdcSourceFactory.java 14KB
StandaloneRestClusterClient.java 13KB
MySqlCdcSourceFactory.java 13KB
SingleTableDataSyncJobGenerator.java 9KB
ConfigurationUtils.java 7KB
AbstractClinkClient.java 7KB
Clink.java 7KB
CreateTableOperator.java 6KB
ReadOnlySQLExecuter.java 6KB
JdbcOperator.java 6KB
RowDataTypeFactory.java 4KB
DataSync.java 4KB
DataSync.java 4KB
JDBCUtils.java 3KB
CreateTable.java 3KB
IntervalDataTypeFactory.java 3KB
MetaDataGetter.java 3KB
BasicClinkRunner.java 3KB
AbstractJDBCMetaDataGetter.java 3KB
JSONUtils.java 3KB
DataSyncOperator.java 3KB
CreateTable.java 3KB
MapDataTypeFactory.java 2KB
ClinkClient.java 2KB
ComplexColumn.java 2KB
FlinkSQLTest.java 2KB
Column.java 2KB
MetaDataGetterFactory.java 2KB
AccurateDataTypeFactory.java 2KB
DataTypeUtils.java 2KB
Arguments.java 2KB
SimpleColumn.java 2KB
AbstractOperator.java 2KB
XMLConfigLoader.java 2KB
StandaloneRestClusterClientTest.java 2KB
JDBCDataSourceConverter.java 2KB
DataSourceFilterUtils.java 2KB
StarrocksMetaDataGetter.java 2KB
NacosConfigurationLoader.java 2KB
Column.java 2KB
AbstractSqlOperator.java 2KB
NacosConfigurationLoader.java 2KB
SqlQuery.java 2KB
OperatorUtils.java 2KB
AbstractDataSourceFilter.java 2KB
ClinkClientsUtils.java 2KB
ParallelParsingTest.java 2KB
Jdbc.java 2KB
ClassUtils.java 2KB
Db2DatabaseSwitcher.java 1KB
BshOperator.java 1KB
OracleDatabaseSwitcher.java 1KB
ExecuteSqlOperator.java 1KB
SQLServerDatabaseSwithcer.java 1KB
AbstractConfigurationLoader.java 1KB
Option.java 1KB
AbstractConfigurationLoader.java 1KB
BasicOperate.java 1KB
NestedDataTypeFactory.java 1KB
PopularDatabaseSwitcher.java 1KB
SqlQueryOperator.java 1KB
ExecuteSql.java 1KB
GetSQLExecuter.java 1KB
PrecisionDataTypeFactory.java 1KB
Bsh.java 1KB
FlinkSQLParamsParser.java 1KB
Operate.java 1KB
SelectSQLExecuter.java 1KB
Var.java 1KB
ExecuteSql.java 1KB
StreamTableEnvironmentUtils.java 1016B
ClinkRunner.java 973B
PropertiesFileConfigurationLoader.java 971B
BigIntegerResultGetter.java 952B
Param.java 944B
Column.java 943B
DataSyncJobGenerator.java 911B
PropertiesFileConfigurationLoader.java 900B
Jdbc.java 876B
Bsh.java 855B
SqlQuery.java 847B
Column.java 846B
SourceFactory.java 840B
共 202 条
- 1
- 2
- 3
资源评论
沐知全栈开发
- 粉丝: 5798
- 资源: 5226
下载权益
C知道特权
VIP文章
课程特权
开通VIP
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功