# Flink CDC Connectors
Flink CDC Connectors is a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC).
Flink CDC Connectors integrates Debezium as the engine for capturing data changes, so it can fully leverage Debezium's capabilities. See [Debezium](https://github.com/debezium/debezium) to learn more about it.
This README is a brief walkthrough of the core features of Flink CDC Connectors. For full documentation, see the [Documentation](https://github.com/ververica/flink-cdc-connectors/wiki).
## Supported (Tested) Connectors
| Database | Version |
| --- | --- |
| MySQL | Database: 5.7, 8.0.x <br/>JDBC Driver: 8.0.16 |
| PostgreSQL | Database: 9.6, 10, 11, 12 <br/>JDBC Driver: 42.2.12|
## Features
1. Supports reading a database snapshot and then continuing to read binlogs with **exactly-once processing**, even when failures happen.
2. CDC connectors for the DataStream API: users can consume changes on multiple databases and tables in a single job without deploying Debezium and Kafka.
3. CDC connectors for the Table/SQL API: users can use SQL DDL to create a CDC source that monitors changes on a single table.
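Conceptually, exactly-once processing in Flink comes from storing a source's read position and the operator's results in the same checkpoint, and replaying from that position after a failure. The Java sketch below is a deliberately simplified, standard-library-only model of that idea, not the connector's implementation: the snapshot-plus-binlog stream is reduced to a list of numeric deltas, and `ExactlyOnceSketch`, `Checkpoint`, `run`, and the simulated crash are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class ExactlyOnceSketch {

    /** Hypothetical checkpoint: the result so far AND the log position to resume from. */
    static final class Checkpoint {
        final long sum;
        final int nextOffset;

        Checkpoint(long sum, int nextOffset) {
            this.sum = sum;
            this.nextOffset = nextOffset;
        }

        @Override
        public String toString() {
            return "sum=" + sum + ", nextOffset=" + nextOffset;
        }
    }

    /**
     * Reads `log` (snapshot rows followed by binlog changes, modeled as numeric deltas)
     * from `start`, taking a checkpoint every `interval` events. If crashAfter >= 0, a
     * failure is simulated after that many events and the last completed checkpoint is returned.
     */
    static Checkpoint run(List<Long> log, Checkpoint start, int interval, int crashAfter) {
        long sum = start.sum;
        int offset = start.nextOffset;
        Checkpoint last = start;
        int processed = 0;
        while (offset < log.size()) {
            if (crashAfter >= 0 && processed == crashAfter) {
                return last; // work done after `last` is discarded and replayed on recovery
            }
            sum += log.get(offset);
            offset++;
            processed++;
            if (processed % interval == 0) {
                last = new Checkpoint(sum, offset); // result and position are saved together
            }
        }
        return new Checkpoint(sum, offset);
    }

    public static void main(String[] args) {
        List<Long> log = Arrays.asList(5L, 7L, -2L, 10L, 1L);
        Checkpoint restored = run(log, new Checkpoint(0, 0), 2, 3); // fails after 3 events
        Checkpoint done = run(log, restored, 2, -1);                // resumes from the checkpoint
        System.out.println("restored: " + restored + " / final: " + done);
    }
}
```

Because the sum and the offset are restored together, the event processed after the last checkpoint is replayed rather than lost or counted twice, and the final sum (21) equals the total of all deltas.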
## Usage for Table/SQL API
Setting up a Flink cluster with the provided connectors takes a few steps:
1. Set up a Flink cluster with version 1.11+ and Java 8+ installed.
2. Download the connector SQL jars from the [Download](https://github.com/ververica/flink-cdc-connectors/wiki/Downloads) page (or [build them yourself](#building-from-source)).
3. Put the downloaded jars under `FLINK_HOME/lib/`.
4. Restart the Flink cluster.

The following example shows how to create a MySQL CDC source in the [Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html) and execute queries on it.
```sql
-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'inventory',
  'table-name' = 'products'
);

-- read snapshot and binlog data from MySQL, transform it, and show the result in the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;
```
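Roughly speaking, the `mysql-cdc` source emits the snapshot rows as inserts and the subsequent binlog events as updates and deletes, and the SQL client keeps the query result current by applying them. The Java sketch below models that materialization for the query above using only the standard library. It is an illustration, not Flink code: the `Kind` enum merely mirrors the names of Flink's `RowKind` values, `ChangelogViewSketch` and `Change` are hypothetical, the sample rows are made up, and keying rows by `id` is an assumption because the DDL declares no primary key.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class ChangelogViewSketch {

    /** Mirrors the names of Flink's RowKind values; a stand-in for illustration only. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One change to a row of `products` (only the columns this sketch needs). */
    static final class Change {
        final Kind kind;
        final int id;
        final String name;

        Change(Kind kind, int id, String name) {
            this.kind = kind;
            this.id = id;
            this.name = name;
        }
    }

    // Current result of `SELECT id, UPPER(name) ...`, keyed by `id` (assumed unique).
    private final Map<Integer, String> view = new LinkedHashMap<>();

    void apply(Change c) {
        switch (c.kind) {
            case INSERT:
            case UPDATE_AFTER:
                view.put(c.id, c.name.toUpperCase(Locale.ROOT)); // add the new row version
                break;
            case UPDATE_BEFORE:
            case DELETE:
                view.remove(c.id); // retract the old row version
                break;
        }
    }

    Map<Integer, String> view() {
        return view;
    }

    public static void main(String[] args) {
        ChangelogViewSketch sketch = new ChangelogViewSketch();
        // Snapshot phase: existing rows arrive as inserts.
        sketch.apply(new Change(Kind.INSERT, 101, "scooter"));
        sketch.apply(new Change(Kind.INSERT, 102, "car battery"));
        // Binlog phase: an UPDATE arrives as a retraction plus the new version, then a DELETE.
        sketch.apply(new Change(Kind.UPDATE_BEFORE, 101, "scooter"));
        sketch.apply(new Change(Kind.UPDATE_AFTER, 101, "e-scooter"));
        sketch.apply(new Change(Kind.DELETE, 102, "car battery"));
        System.out.println(sketch.view()); // {101=E-SCOOTER}
    }
}
```

Splitting an update into a retraction and a new version lets a downstream consumer handle every change as either "add this row" or "remove this row".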
## Usage for DataStream API
Include the following Maven dependency (available through Maven Central):
```xml
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.0.0</version>
</dependency>
```
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("inventory") // monitor all tables under inventory database
            .username("flinkuser")
            .password("flinkpw")
            .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
            .addSource(sourceFunction)
            .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}
```
## Building from source
Prerequisites:
- git
- Maven
- At least Java 8
```bash
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests
```
Flink CDC Connectors is now available in your local `.m2` repository.
## License
The code in this repository is licensed under the [Apache License 2.0](https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE).
## Contributing
Flink CDC Connectors welcomes anyone who wants to help out in any way, whether by reporting problems, helping with documentation, or contributing code changes to fix bugs, add tests, or implement new features. You can report problems and request features in the [GitHub Issues](https://github.com/ververica/flink-cdc-connectors/issues).