# Flink Kudu Connector
This connector provides a source (`KuduInputFormat`), a sink/output
(`KuduSink` and `KuduOutputFormat`, respectively),
as well as a table source (`KuduTableSource`), an upsert table sink (`KuduTableSink`), and a catalog (`KuduCatalog`),
to allow reading from and writing to [Kudu](https://kudu.apache.org/).
To use this connector, add the following dependency to your project:
```xml
<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-kudu_2.11</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>
```
*Version Compatibility*: This module is compatible with Apache Kudu *1.11.1* (last stable version) and Apache Flink 1.10+.
Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html).
## Installing Kudu
Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html).
Optionally, you can use the Docker images provided in the `dockers` folder.
## SQL and Table API
The Kudu connector is fully integrated with the Flink Table and SQL APIs. Once we configure the Kudu catalog (see next section)
we can start querying or inserting into existing Kudu tables using the Flink SQL or Table API.
For more information about the possible queries, please check the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/).
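For example, once a Kudu table is available in the current catalog (here a hypothetical `TestTable` with columns `first`, `second`, and `third`, as in the DDL examples later in this document), it can be queried and written to like any other table:

```sql
INSERT INTO TestTable VALUES ('f1', 's1', 2);
SELECT first, third FROM TestTable WHERE third > 1;
```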
### Kudu Catalog
The connector comes with a catalog implementation to handle metadata about your Kudu setup and perform table management.
By using the Kudu catalog, you can access all the tables already created in Kudu from Flink SQL queries. The Kudu catalog only
allows users to create or access existing Kudu tables. Tables using other data sources must be defined in other catalogs such as
in-memory catalog or Hive catalog.
When using the SQL CLI you can easily add the Kudu catalog to your environment yaml file:
```yaml
catalogs:
  - name: kudu
    type: kudu
    kudu.masters: <host>:7051
```
Once the SQL CLI is started you can simply switch to the Kudu catalog by calling `USE CATALOG kudu;`
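For example, a quick sanity check in the CLI (these are standard Flink SQL statements, nothing connector-specific):

```sql
USE CATALOG kudu;
SHOW TABLES;
```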
You can also create and use the KuduCatalog directly in the Table environment:
```java
String KUDU_MASTERS = "host1:port1,host2:port2";
KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");
```
### DDL operations using SQL
It is possible to manipulate Kudu tables using SQL DDL.
When not using the Kudu catalog, the following additional properties must be specified in the `WITH` clause:
* `'connector.type'='kudu'`
* `'kudu.masters'='host1:port1,host2:port2,...'`: comma-delimited list of Kudu masters
* `'kudu.table'='...'`: The table's name within the Kudu database.
If you have registered and are using the Kudu catalog, these properties are handled automatically.
To create a table, the additional properties `kudu.primary-key-columns` and `kudu.hash-columns` must be specified
as comma-delimited lists. Optionally, you can set the `kudu.replicas` property (defaults to 1).
Other properties, such as range partitioning, cannot be configured here - for more flexibility, please use
`catalog.createTable` as described in [this](#Creating-a-KuduTable-directly-with-KuduCatalog) section or create the table directly in Kudu.
The `NOT NULL` constraint can be added to any of the column definitions.
By setting a column as a primary key, it will automatically be created with the `NOT NULL` constraint.
Hash columns must be a subset of primary key columns.
Kudu catalog:
```sql
CREATE TABLE TestTable (
first STRING,
second STRING,
third INT NOT NULL
) WITH (
'kudu.hash-columns' = 'first',
'kudu.primary-key-columns' = 'first,second'
)
```
Other catalogs:
```sql
CREATE TABLE TestTable (
first STRING,
second STRING,
third INT NOT NULL
) WITH (
'connector.type' = 'kudu',
'kudu.masters' = '...',
'kudu.table' = 'TestTable',
'kudu.hash-columns' = 'first',
'kudu.primary-key-columns' = 'first,second'
)
```
Renaming a table:
```sql
ALTER TABLE TestTable RENAME TO TestTableRen
```
Dropping a table:
```sql
DROP TABLE TestTableRen
```
#### Creating a KuduTable directly with KuduCatalog
The `KuduCatalog` also exposes a simple `createTable` method that requires only a `KuduTableInfo` object, in which the table configuration,
including schema, partitioning, replication, etc., can be specified.
To build the `KuduTableInfo`, use the `createTableIfNotExists` method, which takes a `ColumnSchemasFactory` and
a `CreateTableOptionsFactory` parameter. These implement, respectively, `getColumnSchemas()`,
returning a list of Kudu [ColumnSchema](https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html) objects,
and `getCreateTableOptions()`, returning a
[CreateTableOptions](https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html) object.
This example shows the creation of a table called `ExampleTable` with two columns,
`first` being a primary key; and configuration of replicas and hash partitioning.
```java
KuduTableInfo tableInfo = KuduTableInfo
.forTable("ExampleTable")
.createTableIfNotExists(
() ->
Lists.newArrayList(
new ColumnSchema
.ColumnSchemaBuilder("first", Type.INT32)
.key(true)
.build(),
new ColumnSchema
.ColumnSchemaBuilder("second", Type.STRING)
.build()
),
() -> new CreateTableOptions()
.setNumReplicas(1)
.addHashPartitions(Lists.newArrayList("first"), 2));
catalog.createTable(tableInfo, false);
```
The example uses lambda expressions to implement the functional interfaces.
Read more about Kudu schema design in the [Kudu docs](https://kudu.apache.org/docs/schema_design.html).
### Supported data types
| Flink/SQL | Kudu |
|----------------------|:-----------------------:|
| `STRING` | STRING |
| `BOOLEAN` | BOOL |
| `TINYINT` | INT8 |
| `SMALLINT` | INT16 |
| `INT` | INT32 |
| `BIGINT` | INT64 |
| `FLOAT` | FLOAT |
| `DOUBLE` | DOUBLE |
| `BYTES` | BINARY |
| `TIMESTAMP(3)` | UNIXTIME_MICROS |
Note:
* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java conversion class is `java.sql.Timestamp`
* `BINARY` and `VARBINARY` are not yet supported - use `BYTES`, which is a `VARBINARY(2147483647)`
* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a `VARCHAR(2147483647)`
* `DECIMAL` types are not yet supported
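For illustration, here is a hypothetical table declaration (using the Kudu catalog, so only the partitioning properties are needed) that exercises several of the mappings above; the table and column names are made up:

```sql
CREATE TABLE SensorReadings (
  sensor_id INT NOT NULL,      -- stored as INT32
  reading_time TIMESTAMP(3),   -- stored as UNIXTIME_MICROS
  temperature DOUBLE,          -- stored as DOUBLE
  payload BYTES                -- stored as BINARY
) WITH (
  'kudu.hash-columns' = 'sensor_id',
  'kudu.primary-key-columns' = 'sensor_id'
)
```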
### Known limitations
* Data type limitations (see above).
* SQL Create table: primary keys can only be set by the `kudu.primary-key-columns` property, using the
`PRIMARY KEY` constraint is not yet possible.
* SQL Create table: range partitioning is not supported.
* When getting a table through the Catalog, NOT NULL and PRIMARY KEY constraints are ignored. All columns
are described as being nullable, and not being primary keys.
* Kudu tables cannot be altered through the catalog other than simple renaming.
## DataStream API
It is also possible to use the Kudu connector directly from the DataStream API; however, we
encourage all users to explore the Table API, as it provides a lot of useful tooling when working
with Kudu data.
### Reading tables into DataStreams
There are two main ways of reading a Kudu table into a DataStream:
1. Using the `KuduCatalog` and the Table API
2. Using the `KuduRowInputFormat` directly
Using the `KuduCatalog` and Table API is the recommended approach, as the catalog automatically supplies the table schema and connector configuration.
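For the second approach, a minimal sketch of using `KuduRowInputFormat` directly could look like the following. This is an illustration only: the `KuduReaderConfig` builder and the exact constructor signatures are assumptions based on the connector's classes, and `ExampleTable` is a hypothetical existing table.

```java
import org.apache.flink.connectors.kudu.batch.KuduRowInputFormat;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadKuduTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Point the reader at the Kudu cluster and an existing table
        // (assumed builder API; adjust to the connector version in use).
        KuduReaderConfig readerConfig = KuduReaderConfig.Builder
            .setMasters("host1:port1,host2:port2")
            .build();
        KuduTableInfo tableInfo = KuduTableInfo.forTable("ExampleTable");

        // Create a DataStream of Rows from the input format and print it.
        env.createInput(new KuduRowInputFormat(readerConfig, tableInfo)).print();
        env.execute("Read Kudu table");
    }
}
```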