# Flink-ClickHouse-Sink
[![Build Status](https://travis-ci.com/ivi-ru/flink-clickhouse-sink.svg?branch=master)](https://travis-ci.com/ivi-ru/flink-clickhouse-sink)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/ru.ivi.opensource/flink-clickhouse-sink/badge.svg)](https://maven-badges.herokuapp.com/maven-central/ru.ivi.opensource/flink-clickhouse-sink/)
## Description
[Flink](https://github.com/apache/flink) sink for [ClickHouse](https://github.com/yandex/ClickHouse) database.
Powered by [Async Http Client](https://github.com/AsyncHttpClient/async-http-client).
A high-performance library for loading data into ClickHouse.
Data is flushed to ClickHouse by either of two triggers:
_by timeout_ and _by buffer size_.
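Here is a minimal sketch of the two triggers. The buffer flushes when it holds `max-buffer-size` records, or when a periodic check finds that the timeout has elapsed since the last flush. The class name `SizeOrTimeoutBuffer`, the injectable clock and the flush callback are hypothetical. This is not the library's own buffer class, and its exact timeout semantics may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical illustration of "flush by buffer size" and "flush by timeout";
// not the library's actual buffer implementation.
final class SizeOrTimeoutBuffer {
    private final int maxBufferSize;               // size trigger
    private final long timeoutMillis;              // timeout trigger
    private final LongSupplier clock;              // e.g. System::currentTimeMillis
    private final Consumer<List<String>> flusher;  // receives each completed batch
    private final List<String> records = new ArrayList<>();
    private long lastFlushMillis;

    SizeOrTimeoutBuffer(int maxBufferSize, long timeoutMillis,
                        LongSupplier clock, Consumer<List<String>> flusher) {
        this.maxBufferSize = maxBufferSize;
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
        this.flusher = flusher;
        this.lastFlushMillis = clock.getAsLong();
    }

    // Called for every incoming record: flushes as soon as the buffer is full.
    synchronized void put(String record) {
        records.add(record);
        if (records.size() >= maxBufferSize) {
            flush();
        }
    }

    // Called periodically by a scheduler: flushes pending records once the timeout has elapsed.
    synchronized void checkTimeout() {
        if (!records.isEmpty() && clock.getAsLong() - lastFlushMillis >= timeoutMillis) {
            flush();
        }
    }

    // Hands a copy of the pending records to the flusher and starts a new batch.
    private void flush() {
        flusher.accept(new ArrayList<>(records));
        records.clear();
        lastFlushMillis = clock.getAsLong();
    }
}
```

Injecting the clock keeps the sketch deterministic. A real sink would pass `System::currentTimeMillis` and call `checkTimeout()` from a scheduled thread.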
##### Version map
|flink |flink-clickhouse-sink |
|:-------:|:--------------------:|
|1.3.* |1.0.0 |
|1.9.* |1.3.1 |
### Install
##### Maven Central
```xml
<dependency>
    <groupId>ru.ivi.opensource</groupId>
    <artifactId>flink-clickhouse-sink</artifactId>
    <version>1.3.1</version>
</dependency>
```
## Usage
### Properties
The flink-clickhouse-sink uses two groups of configuration properties:
a common part, and a part for each sink in your operator chain.
**The common part** (set once, as global job parameters):
- `clickhouse.sink.num-writers` - number of writers that build and send requests,
- `clickhouse.sink.queue-max-capacity` - maximum capacity (in batches) of the request queue,
- `clickhouse.sink.timeout-sec` - timeout for loading data,
- `clickhouse.sink.retries` - maximum number of retries,
- `clickhouse.sink.failed-records-path` - path for failed records,
- `clickhouse.sink.ignoring-clickhouse-sending-exception-enabled` - required boolean parameter that controls whether a ClickHouse sending exception is raised in the main thread (`false`) or not (`true`):
  - if `true`, an exception while sending to ClickHouse is ignored and the failed data is automatically written to disk;
  - if `false`, the sending exception is thrown in the "main" thread (the thread that called `ClickHouseSink::invoke`), and the data is also written to disk.
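The sketch below shows how `retries`, `failed-records-path` and `ignoring-clickhouse-sending-exception-enabled` could interact. It has several assumptions. `FailurePolicySketch` and its `Sender` interface are hypothetical. "Retries" is taken to mean one initial attempt plus up to `retries` further attempts. The exception is thrown directly from `deliver`, whereas the library reports it in the thread that called `ClickHouseSink::invoke`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical sketch of the failure handling described above; not the library's writer.
final class FailurePolicySketch {
    // Stand-in for one insert attempt against ClickHouse; throws on failure.
    interface Sender {
        void send(List<String> batch) throws Exception;
    }

    private final Sender sender;
    private final int retries;                     // clickhouse.sink.retries
    private final Path failedRecordsFile;          // a file under clickhouse.sink.failed-records-path
    private final boolean ignoreSendingException;  // ...ignoring-clickhouse-sending-exception-enabled

    FailurePolicySketch(Sender sender, int retries, Path failedRecordsFile,
                        boolean ignoreSendingException) {
        this.sender = sender;
        this.retries = retries;
        this.failedRecordsFile = failedRecordsFile;
        this.ignoreSendingException = ignoreSendingException;
    }

    void deliver(List<String> batch) {
        Exception lastError = null;
        // Assumption: one initial attempt plus up to `retries` further attempts.
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                sender.send(batch);
                return; // delivered, nothing else to do
            } catch (Exception e) {
                lastError = e;
            }
        }
        // Every attempt failed: in both modes the records are appended to disk.
        try {
            Files.write(failedRecordsFile, batch, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException io) {
            throw new UncheckedIOException(io);
        }
        // Only when ignoring is disabled does the failure reach the caller.
        if (!ignoreSendingException) {
            throw new IllegalStateException("ClickHouse insert failed after retries", lastError);
        }
    }
}
```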
**The sink part** (set for each sink in the chain):
- `clickhouse.sink.target-table` - target table in ClickHouse,
- `clickhouse.sink.max-buffer-size` - maximum buffer size; reaching it triggers a flush.
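One plausible way for these properties to fit together is sketched below. Full sink buffers become batches that wait in a bounded queue of `queue-max-capacity` entries, and `num-writers` threads drain that queue and send each batch. `WriterPoolSketch` and its `send` callback are hypothetical; this is not the library's writer.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: a bounded batch queue drained by a fixed number of writer threads.
final class WriterPoolSketch implements AutoCloseable {
    private final BlockingQueue<List<String>> queue;
    private final ExecutorService writers;
    private volatile boolean running = true;

    WriterPoolSketch(int numWriters, int queueMaxCapacity, Consumer<List<String>> send) {
        this.queue = new ArrayBlockingQueue<>(queueMaxCapacity);   // queue-max-capacity
        this.writers = Executors.newFixedThreadPool(numWriters);   // num-writers
        for (int i = 0; i < numWriters; i++) {
            writers.submit(() -> {
                // Keep draining until closed and every queued batch has been sent.
                while (running || !queue.isEmpty()) {
                    try {
                        List<String> batch = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (batch != null) {
                            send.accept(batch);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }
    }

    // Blocks the caller while the queue is full, which gives natural back-pressure.
    void submit(List<String> batch) throws InterruptedException {
        queue.put(batch);
    }

    // Stops accepting work, lets writers finish the queued batches, then waits for them.
    @Override
    public void close() throws InterruptedException {
        running = false;
        writers.shutdown();
        writers.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

A small queue bounds memory but stalls the producer when ClickHouse is slow. More writers raise throughput at the cost of more concurrent inserts.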
### In code
The main thing: the sink works with events as strings that are already in ClickHouse insert format (a row of values, similar to CSV).
You have to convert each event to this format yourself, just as you would for a regular insert into the database.
For example, suppose you have this event POJO:
```java
class A {
    public final String str;
    public final int integer;

    public A(String str, int i) {
        this.str = str;
        this.integer = i;
    }
}
```
Convert this POJO like this:
```java
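// Converts an A into a ClickHouse row literal, e.g. ('text', 42)
public static String convertToCsv(A a) {
    StringBuilder builder = new StringBuilder();
    builder.append("(");
    // add a.str as a quoted string literal, escaping backslashes and single quotes
    builder.append("'");
    builder.append(a.str.replace("\\", "\\\\").replace("'", "\\'"));
    builder.append("', ");
    // add a.integer
    builder.append(a.integer);
    builder.append(")");
    return builder.toString();
}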
```
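Each converted record is a self-contained row literal, so a batch of them can be sent as a single `INSERT ... VALUES` statement. The helper below illustrates that idea. `InsertQuerySketch.buildInsert` is hypothetical, and the exact statement the library builds is an assumption here.

```java
import java.util.List;

// Hypothetical helper: joins row literals such as ('text', 42) into one INSERT statement.
final class InsertQuerySketch {
    static String buildInsert(String targetTable, List<String> rows) {
        if (rows.isEmpty()) {
            throw new IllegalArgumentException("nothing to insert");
        }
        // Row literals are separated by commas after VALUES.
        return "INSERT INTO " + targetTable + " VALUES " + String.join(", ", rows);
    }
}
```

For example, `buildInsert("your_table", List.of("('a', 1)", "('b', 2)"))` yields `INSERT INTO your_table VALUES ('a', 1), ('b', 2)`.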
Then add the records to the sink.
First, set the global parameters on the Flink environment:
```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;

StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
Map<String, String> globalParameters = new HashMap<>();
// ClickHouse cluster properties
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, ...);
// sink common
globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, ...);
globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, ...);
globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, ...);
globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, ...);
globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, ...);
globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, ...);
// set global parameters
ParameterTool parameters = ParameterTool.fromMap(globalParameters);
environment.getConfig().setGlobalJobParameters(parameters);
```
And add your sink like this:
```java
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;

// create converter
public class YourEventConverter {
    public static String toClickHouseInsertFormat(YourEvent yourEvent) {
        String chFormat = ...;
        // ...
        return chFormat;
    }
}
// create props for sink
Properties props = new Properties();
props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "your_table");
props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
// build chain
DataStream<YourEvent> dataStream = ...;
dataStream.map(YourEventConverter::toClickHouseInsertFormat)
        .name("convert YourEvent to ClickHouse table format")
        .addSink(new ClickHouseSink(props))
        .name("your_table ClickHouse sink");
```
## Roadmap
- [ ] reading files from "failed-records-path"
- [ ] migrate to Gradle