## dlink-connector-pulsar
> Overview:
> Based on: https://gitee.com/apache/flink/tree/release-1.14/flink-connectors
* Flink officially supports flink-connector-pulsar since version 1.14 (Flink SQL is not yet supported upstream).
* Before that version, a Flink Pulsar connector was implemented independently here. This Flink SQL implementation aligns with the official flink-connector-pulsar for better compatibility and optimal performance.
* It incorporates production experience to avoid common pitfalls.
* Versions used here: Pulsar 2.8.2, Flink 1.14.3.
* The Pulsar connector is widely used; it plays an important role in message-queue scenarios and in Flink SQL development.
## ★ Pulsar SQL Connector Details
### Dependencies
To use the Pulsar connector, the following dependency is required, both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
* Maven dependency
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
```
### How to create a Pulsar table
```sql
CREATE TABLE source_pulsar_n (
  requestId VARCHAR,
  `timestamp` BIGINT,
  `date` VARCHAR,
  appId VARCHAR,
  appName VARCHAR,
  forwardTimeMs VARCHAR,
  processingTimeMs INT,
  errCode VARCHAR,
  userIp VARCHAR,
  createTime BIGINT,
  b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
) WITH (
  'connector' = 'pulsar',
  'connector.version' = 'universal',
  'connector.topic' = 'persistent://dlink/dev/context.pulsar',
  'connector.service-url' = 'pulsar://pulsar-dlink-n.stream.com:6650',
  'connector.subscription-name' = 'tmp_print_detail',
  'connector.subscription-type' = 'Shared',
  'connector.subscription-initial-position' = 'Latest',
  'update-mode' = 'append',
  'format' = 'json',
  'format.derive-schema' = 'true'
);
```
### Data Type Mapping
Pulsar stores message keys and values as raw bytes, so the table schema and data types are not derived from Pulsar itself. Pulsar messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by the specific format. Please refer to the Formats pages for more details.
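For example, with the `json` format it is the declared Flink SQL column types that drive how each JSON field is parsed. A minimal sketch (the table name, topic, and field names below are illustrative, not part of the connector):

```sql
-- Illustrative only: the declared column types, not Pulsar, determine the mapping.
-- A JSON message such as {"id": 1, "price": 9.99, "ts": 1690000000000}
-- is parsed field-by-field according to the types declared here.
CREATE TABLE demo_pulsar (
  id INT,               -- JSON number  -> INT
  price DECIMAL(10, 2), -- JSON number  -> DECIMAL(10, 2)
  ts BIGINT             -- JSON number  -> BIGINT
) WITH (
  'connector' = 'pulsar',
  'connector.version' = 'universal',
  'connector.topic' = 'persistent://dlink/dev/demo',
  'connector.service-url' = 'pulsar://localhost:6650',
  'connector.subscription-name' = 'demo_sub',
  'connector.subscription-type' = 'Shared',
  'connector.subscription-initial-position' = 'Latest',
  'format' = 'json'
);
```

Switching `'format'` to `csv` or `avro` would change how the same bytes are decoded, while the column types stay the contract on the SQL side.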
### Connector Options
| Option | Required | Default | Type | Description |
| --------------------------------------- | -------- | ------- | ------ | ------------------------------------------------------------ |
| connector | required | (none) | String | Specify the connector to use; for Pulsar use `'pulsar'`. |
| connector.version | required | (none) | String | The connector version; set to `'universal'`. |
| connector.topic | required | (none) | String | Topic name(s) to read from when the table is used as a source, or to write to when used as a sink. |
| connector.service-url | required | (none) | String | The Pulsar broker service URL, e.g. `pulsar://host:6650`. |
| connector.subscription-name | required | (none) | String | The Pulsar subscription name. |
| connector.subscription-type | required | (none) | String | The Pulsar subscription type: `Shared`, `Exclusive`, `Key_Shared`, or `Failover`. |
| connector.subscription-initial-position | required | (none) | String | The initial position: `Earliest`, `Latest`, or `Timestamp`. |
| update-mode | optional | (none) | String | `append` or `upsert`. |
| format | optional | (none) | String | The format used to deserialize/serialize messages, e.g. `json`, `csv`. |
| format.derive-schema | optional | (none) | String | `true` or `false`. |
## 🚀 Quick Start
```shell
git clone https://github.com/DataLinkDC/dlink.git
cd dlink-connector/dlink-connector-pulsar-1.14
mvn clean install -DskipTests -Dflink.version=$version
```
## 🎉 Features
* Key and Value Formats
Both the key and value parts of a Pulsar record can be serialized to and deserialized from raw bytes using one of the configured formats.
* Value Format
Since a key is optional in Pulsar records, a table can read and write records with a configured value format but without a key format. The `'format'` option is a synonym for `'value.format'`. All format options are prefixed with the format identifier.
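As a sketch of the value-only case described above (the table name and fields are illustrative; whether this connector forwards format-prefixed options such as `json.ignore-parse-errors` is an assumption based on standard Flink format behavior):

```sql
-- Value-only table: 'format' configures the value format, no key format is set.
CREATE TABLE pulsar_value_only (
  user_id VARCHAR,
  amount DOUBLE
) WITH (
  'connector' = 'pulsar',
  'connector.version' = 'universal',
  'connector.topic' = 'persistent://dlink/dev/demo',
  'connector.service-url' = 'pulsar://localhost:6650',
  'connector.subscription-name' = 'demo_sub',
  'connector.subscription-type' = 'Shared',
  'connector.subscription-initial-position' = 'Latest',
  'format' = 'json',                   -- synonym for 'value.format'
  'json.ignore-parse-errors' = 'true'  -- format option prefixed with the format identifier
);
```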
## 👻 Usage
```sql
-- Pulsar multi-cluster setup:
-- two clusters here, n and b
-- declare the sources
CREATE TABLE source_pulsar_n(
requestId VARCHAR,
`timestamp` BIGINT,
`date` VARCHAR,
appId VARCHAR,
appName VARCHAR,
forwardTimeMs VARCHAR,
processingTimeMs INT,
errCode VARCHAR,
userIp VARCHAR,
createTime BIGINT,
b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
) WITH (
'connector' = 'pulsar',
'connector.version' = 'universal',
'connector.topic' = 'persistent://dlink/dev/context.pulsar',
'connector.service-url' = 'pulsar://pulsar-dlink-n.stream.com:6650',
'connector.subscription-name' = 'tmp_print_detail',
'connector.subscription-type' = 'Shared',
'connector.subscription-initial-position' = 'Latest',
'update-mode' = 'append',
'format' = 'json',
'format.derive-schema' = 'true'
);
CREATE TABLE source_pulsar_b(
requestId VARCHAR,
`timestamp` BIGINT,
`date` VARCHAR,
appId VARCHAR,
appName VARCHAR,
forwardTimeMs VARCHAR,
processingTimeMs INT,
errCode VARCHAR,
userIp VARCHAR,
createTime BIGINT,
b_create_im_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
) WITH (
'connector' = 'pulsar',
'connector.version' = 'universal',
'connector.topic' = 'persistent://dlink/dev/context.pulsar',
'connector.service-url' = 'pulsar://pulsar-dlink-b.stream.com:6650',
'connector.subscription-name' = 'tmp_print_detail',
'connector.subscription-type' = 'Shared',
'connector.subscription-initial-position' = 'Latest',
'update-mode' = 'append',
'format' = 'json',
'format.derive-schema' = 'true'
);
-- union the sources
create view pulsar_source_all AS
select
requestId ,
`timestamp`,
`date`,
appId,
appName,
forwardTimeMs,
  processingTimeMs,
errCode,
userIp,
b_create_time
from source_pulsar_n
union all
select
requestId ,
`timestamp`,
`date`,
appId,
appName,
forwardTimeMs,
  processingTimeMs,
errCode,
userIp,
b_create_time
from source_pulsar_b;
-- create the sink
create table sink_pulsar_result (
  requestId VARCHAR,
  `timestamp` BIGINT,
  `date` VARCHAR,
  appId VARCHAR,
  appName VARCHAR,
  forwardTimeMs VARCHAR,
  processingTimeMs INT,
  errCode VARCHAR,
  userIp VARCHAR,
  b_create_time TIMESTAMP(3)
) with (
  'connector' = 'print'
);
-- job logic:
-- print the detail records of the Pulsar topics
insert into sink_pulsar_result
select
requestId ,
`timestamp`,
`date`,
appId,
appName,
forwardTimeMs,
  processingTimeMs,
errCode,
userIp,
b_create_time
from pulsar_source_all;
```
### Comparison with Kafka
| Aspect | Kafka | Pulsar |
| ---------------- | ----------------- | ------- |
| Conceptual model | producer – topic – consumer group – consumer | producer – topic – subscription – consumer |
| Consumption mode | Mainly stream-oriented: each partition is consumed exclusively, with no shared (queue) consumption mode | Provides a unified model: subscription types (Exclusive, Failover, Shared, Key_Shared) cover both streaming (exclusive) and queueing (shared) consumption |