## dlink-connector-pulsar
> Overview:
> Based on: https://gitee.com/apache/flink/tree/release-1.14/flink-connectors
* Flink officially supports flink-connector-pulsar since version 1.14 (Flink SQL is not yet supported upstream).
* Before that version, we implemented a Flink-Pulsar connector ourselves. This Flink SQL implementation aligns with the official flink-connector-pulsar for better compatibility and optimal performance.
* Incorporates production experience to avoid common pitfalls.
* Versions used here: Pulsar 2.8.2, Flink 1.14.3.
* The Pulsar connector is widely used; it plays an important role in message-queue scenarios and in Flink SQL development.
## ★ Pulsar SQL Connector Details
### Dependencies
In order to use the Pulsar connector, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
* Maven dependency
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-pulsar_2.11</artifactId>
  <version>1.14.3</version>
</dependency>
```
### How to create a Pulsar table
```sql
CREATE TABLE source_pulsar_n(
requestId VARCHAR,
`timestamp` BIGINT,
`date` VARCHAR,
appId VARCHAR,
appName VARCHAR,
forwardTimeMs VARCHAR,
processingTimeMs INT,
errCode VARCHAR,
userIp VARCHAR,
createTime BIGINT,
b_create_time AS TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
) WITH (
'connector' = 'pulsar',
'connector.version' = 'universal',
'connector.topic' = 'persistent://dlink/dev/context.pulsar',
'connector.service-url' = 'pulsar://pulsar-dlink-n.stream.com:6650',
'connector.subscription-name' = 'tmp_print_detail',
'connector.subscription-type' = 'Shared',
'connector.subscription-initial-position' = 'Latest',
'update-mode' = 'append',
'format' = 'json',
'format.derive-schema' = 'true'
);
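The computed column `b_create_time` converts the epoch-millisecond `createTime` field into a formatted timestamp. The same conversion can be sketched in Python (a minimal illustration; UTC is assumed here, whereas Flink's `FROM_UNIXTIME` uses the session time zone):

```python
from datetime import datetime, timezone

def b_create_time(create_time_ms: int) -> str:
    """Mirror FROM_UNIXTIME(createTime/1000, 'yyyy-MM-dd HH:mm:ss'):
    drop the millisecond remainder, then format the instant.
    UTC is assumed for a deterministic result."""
    seconds = create_time_ms // 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

print(b_create_time(1640995200000))  # 2022-01-01 00:00:00
```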
```
### Data Type Mapping
Pulsar stores message keys and values as raw bytes, so the connector itself imposes no schema or data types. Pulsar messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by the specific format. Please refer to the Formats pages for more details.
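A minimal sketch of what the `json` format does (not the connector's actual code): the raw message value, a byte array, is decoded, and each JSON field is matched by name to a declared table column. The field names follow the example table above; the payload itself is made up.

```python
import json

# Raw Pulsar message value as delivered: a JSON-encoded byte array.
raw_value = b'{"requestId": "r-1", "timestamp": 1640995200000, "appId": "app-42"}'

# The format decodes the bytes; fields map to columns by name.
row = json.loads(raw_value)
print(row["requestId"], row["appId"])  # r-1 app-42
```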
### Connector Options
| Option | Required | Default | Type | Description |
| --------------------------------------- | ----------------- | ------- | ------ | ------------------------------------------------------------ |
| connector | required | (none) | String | Specify what connector to use; for Pulsar use `'pulsar'`. |
| connector.version | required | (none) | String | Connector version; use `'universal'`. |
| connector.topic | required | (none) | String | Topic name(s) to read data from when the table is used as source, or to write to when used as sink. |
| connector.service-url | optional | (none) | String | The service URL of the Pulsar broker. |
| connector.subscription-name | required | (none) | String | The Pulsar subscription name. |
| connector.subscription-type | required | (none) | String | The Pulsar subscription type: `Shared`, `Exclusive`, `Key_Shared`, or `Failover`. |
| connector.subscription-initial-position | required | (none) | String | Initial position: `EARLIEST`, `LATEST`, or `TIMESTAMP`. |
| update-mode | optional | (none) | String | `append` or `upsert`. |
| format | optional | (none) | String | `json`, `csv`, ... |
| format.derive-schema | optional | (none) | String | `true` or `false`. |
## 🚀 Quick Start
```shell
git clone https://github.com/DataLinkDC/dlink.git
cd dlink-connector/dlink-connector-pulsar-1.14
mvn clean install -DskipTests -Dflink.version=$version
```
## 🎉 Features
* Key and Value Formats
Both the key and value part of a Pulsar record can be serialized to and deserialized from raw bytes using one of the given formats.
* Value Format
Since a key is optional in Pulsar records, records can be read and written with a configured value format but no key format. The `'format'` option is a synonym for `'value.format'`. All format options are prefixed with the format identifier.
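The value-format-only behavior can be sketched as follows (an illustration, not the connector's actual code): a record carries an optional key and a raw byte-array value; with only `'format' = 'json'` configured, the value is decoded by the format and the key, which may be absent, is ignored.

```python
import json
from typing import Optional

def decode_record(key: Optional[bytes], value: bytes) -> dict:
    """Value format only: decode the value bytes with the configured
    format; the key (possibly None) is intentionally not used."""
    return json.loads(value)

print(decode_record(None, b'{"errCode": "0"}'))  # {'errCode': '0'}
```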
## 👻 Usage
```sql
-- Pulsar multi-cluster setup:
-- two clusters here, n and b
-- declare the sources
CREATE TABLE source_pulsar_n(
requestId VARCHAR,
`timestamp` BIGINT,
`date` VARCHAR,
appId VARCHAR,
appName VARCHAR,
forwardTimeMs VARCHAR,
processingTimeMs INT,
errCode VARCHAR,
userIp VARCHAR,
createTime BIGINT,
b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
) WITH (
'connector' = 'pulsar',
'connector.version' = 'universal',
'connector.topic' = 'persistent://dlink/dev/context.pulsar',
'connector.service-url' = 'pulsar://pulsar-dlink-n.stream.com:6650',
'connector.subscription-name' = 'tmp_print_detail',
'connector.subscription-type' = 'Shared',
'connector.subscription-initial-position' = 'Latest',
'update-mode' = 'append',
'format' = 'json',
'format.derive-schema' = 'true'
);
CREATE TABLE source_pulsar_b(
requestId VARCHAR,
`timestamp` BIGINT,
`date` VARCHAR,
appId VARCHAR,
appName VARCHAR,
forwardTimeMs VARCHAR,
processingTimeMs INT,
errCode VARCHAR,
userIp VARCHAR,
createTime BIGINT,
b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
) WITH (
'connector' = 'pulsar',
'connector.version' = 'universal',
'connector.topic' = 'persistent://dlink/dev/context.pulsar',
'connector.service-url' = 'pulsar://pulsar-dlink-b.stream.com:6650',
'connector.subscription-name' = 'tmp_print_detail',
'connector.subscription-type' = 'Shared',
'connector.subscription-initial-position' = 'Latest',
'update-mode' = 'append',
'format' = 'json',
'format.derive-schema' = 'true'
);
-- merge the sources
create view pulsar_source_all AS
select
requestId,
`timestamp`,
`date`,
appId,
appName,
forwardTimeMs,
processingTimeMs,
errCode,
userIp,
b_create_time
from source_pulsar_n
union all
select
requestId,
`timestamp`,
`date`,
appId,
appName,
forwardTimeMs,
processingTimeMs,
errCode,
userIp,
b_create_time
from source_pulsar_b;
-- create the sink
create table sink_pulsar_result(
requestId VARCHAR,
`timestamp` BIGINT,
`date` VARCHAR,
appId VARCHAR,
appName VARCHAR,
forwardTimeMs VARCHAR,
processingTimeMs INT,
errCode VARCHAR,
userIp VARCHAR,
b_create_time TIMESTAMP(3)
) with (
'connector' = 'print'
);
-- run the job:
-- print the detail data of the pulsar topic
insert into sink_pulsar_result
select
requestId,
`timestamp`,
`date`,
appId,
appName,
forwardTimeMs,
processingTimeMs,
errCode,
userIp,
b_create_time
from pulsar_source_all;
```
### Comparison with Kafka
| Aspect | Kafka | Pulsar |
| --------------------------------------- | ----------------- | ------- |
| Model concepts | producer – topic – consumer group – consumer | producer – topic – subscription – consumer |
| Consumption mode | Mainly stream mode: each partition is consumed exclusively, with no shared (queue) consumption mode | Provides a unified consumption model covering both exclusive stream-style and shared queue-style subscriptions |