# Pulsar Flink Connector
The Pulsar Flink connector implements elastic data processing using [Apache Pulsar](https://pulsar.apache.org) and [Apache Flink](https://flink.apache.org).
For the Chinese version of this document, see [here](doc/README_CN.md).
# Prerequisites
- Java 8 or higher version
- Flink 1.9.0 or higher version
- Pulsar 2.4.0 or higher version
# Basic information
This section describes basic information about the Pulsar Flink connector.
## Client
Currently, the following Flink versions are supported.
- Flink 1.9 - 1.10: they are maintained in the [`flink-1.9` branch](https://github.com/streamnative/pulsar-flink/tree/flink-1.9).
- Flink 1.11: it is maintained in the [`flink-1.11` branch](https://github.com/streamnative/pulsar-flink/tree/flink-1.11).
- Flink 1.12: it is maintained in the [`master` branch](https://github.com/streamnative/pulsar-flink/tree/master).
> **Note**
> Because Flink's API has changed significantly between versions, new features are developed mainly in the `master` branch, while the other branches receive bug fixes.
The JAR package is located in the [Bintray Maven repository of StreamNative](https://dl.bintray.com/streamnative/maven).
For projects using SBT, Maven, or Gradle, you can set the following parameters for your project.
- `FLINK_VERSION`: currently, versions `1.9`, `1.11`, and `1.12` are available.
- `SCALA_BINARY_VERSION`: this parameter defines the Scala version used by Flink. Versions `2.11` and `2.12` are available.
- `PULSAR_FLINK_VERSION`: the version of the Pulsar Flink connector. Usually, a master release uses a three-part version (such as `2.7.0`) and a branch release uses a four-part version (such as `2.7.0.1`).
The following template shows how to declare the dependency in projects using SBT, Maven, or Gradle.
```
groupId = io.streamnative.connectors
artifactId = pulsar-flink-connector-{{SCALA_BINARY_VERSION}}-{{FLINK_VERSION}}
version = {{PULSAR_FLINK_VERSION}}
```
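As a sanity check on the naming scheme, the hypothetical Java helper below (class and method names are illustrative, not part of the connector) expands the coordinate template for a given set of parameters and distinguishes a master release from a branch release by counting version parts. It assumes the `artifactId` pattern exactly as shown in the template above; check the repository for the artifact names actually published for your release.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical helper: expands the coordinate template and classifies a
 * connector version as a master release (three parts, e.g. 2.7.0) or a
 * branch release (four parts, e.g. 2.7.0.1).
 */
public class ConnectorCoordinates {

    // Three or four dot-separated numeric parts.
    private static final Pattern VERSION = Pattern.compile("\\d+(\\.\\d+){2,3}");

    /** Builds "groupId:artifactId:version" following the template. */
    static String coordinate(String scalaBinaryVersion, String flinkVersion, String connectorVersion) {
        if (!VERSION.matcher(connectorVersion).matches()) {
            throw new IllegalArgumentException("Unexpected connector version: " + connectorVersion);
        }
        String artifactId = "pulsar-flink-connector-" + scalaBinaryVersion + "-" + flinkVersion;
        return "io.streamnative.connectors:" + artifactId + ":" + connectorVersion;
    }

    /** Four numeric parts indicate a branch release; three indicate a master release. */
    static boolean isBranchRelease(String connectorVersion) {
        return connectorVersion.split("\\.").length == 4;
    }

    public static void main(String[] args) {
        // prints io.streamnative.connectors:pulsar-flink-connector-2.11-1.12:2.7.0
        System.out.println(coordinate("2.11", "1.12", "2.7.0"));
        // prints true
        System.out.println(isBranchRelease("2.7.0.1"));
    }
}
```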
## Maven projects
For Maven projects, you can add the repository configuration to your `pom.xml`, as shown below.
```xml
<repositories>
  <repository>
    <id>central</id>
    <layout>default</layout>
    <url>https://repo1.maven.org/maven2</url>
  </repository>
  <repository>
    <id>bintray-streamnative-maven</id>
    <name>bintray</name>
    <url>https://dl.bintray.com/streamnative/maven</url>
  </repository>
</repositories>
```
For Maven projects, you can use the following [Maven Shade plugin](https://maven.apache.org/plugins/maven-shade-plugin/) definition template to build an application JAR package that contains all the dependencies required by the client library and the Pulsar Flink connector.
```xml
<plugin>
  <!-- Shade all the dependencies to avoid conflicts -->
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>${maven-shade-plugin.version}</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
        <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
        <minimizeJar>false</minimizeJar>
        <artifactSet>
          <includes>
            <include>io.streamnative.connectors:*</include>
            <!-- more libs to include here -->
          </includes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
          <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```
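After `mvn package`, it can be worth confirming that the shade configuration did what it was meant to. Signature files copied from a signed dependency no longer match the repackaged contents, and the JVM typically rejects such a JAR with a `SecurityException`; that is why the filter above excludes them. The hypothetical Java utility below (class and method names are illustrative, not part of the connector) opens the shaded JAR without signature verification, lists any top-level `META-INF/*.SF`, `*.DSA`, or `*.RSA` files that survived the filter, and lists the `META-INF/services/` entries that `ServicesResourceTransformer` is responsible for merging. It only inspects entry names; it does not validate their contents.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical post-build check for a shaded application JAR. */
public class ShadedJarCheck {

    private static final String META_INF = "META-INF/";

    /** Returns top-level META-INF signature files (.SF, .DSA, .RSA) left in the JAR. */
    static List<String> signatureFiles(JarFile jar) {
        List<String> found = new ArrayList<>();
        for (JarEntry entry : Collections.list(jar.entries())) {
            String name = entry.getName();
            boolean topLevel = name.startsWith(META_INF) && name.indexOf('/', META_INF.length()) < 0;
            if (topLevel && (name.endsWith(".SF") || name.endsWith(".DSA") || name.endsWith(".RSA"))) {
                found.add(name);
            }
        }
        return found;
    }

    /** Returns the service provider files under META-INF/services/. */
    static List<String> serviceFiles(JarFile jar) {
        List<String> found = new ArrayList<>();
        for (JarEntry entry : Collections.list(jar.entries())) {
            String name = entry.getName();
            if (name.startsWith(META_INF + "services/") && !entry.isDirectory()) {
                found.add(name);
            }
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        // verify = false: read entry names without checking (possibly stale) signatures.
        try (JarFile jar = new JarFile(args[0], false)) {
            List<String> signatures = signatureFiles(jar);
            if (!signatures.isEmpty()) {
                System.out.println("Signature files still present: " + signatures);
            }
            System.out.println("Service files: " + serviceFiles(jar));
        }
    }
}
```

Pass the path of the shaded JAR as the first argument. An empty signature list together with the expected service files (for example, Flink's `org.apache.flink.table.factories.Factory`) suggests the filters and transformers were applied.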
## Gradle projects
For Gradle projects, you can add the repository configuration to your `build.gradle`, as shown below.
```groovy
repositories {
    maven {
        url 'https://dl.bintray.com/streamnative/maven'
    }
}
```
For Gradle projects, you can use the following [Shadow](https://imperceptiblethoughts.com/shadow/) plugin definition template to build an application JAR package that contains all the dependencies required by the client library and the Pulsar Flink connector.
```groovy
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:6.0.0'
    }
}
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'java'
```
# Build Pulsar Flink connector
To build the Pulsar Flink connector for reading data from Pulsar or writing the results to Pulsar, follow these steps.
1. Check out the source code.
```bash
git clone https://github.com/streamnative/pulsar-flink.git
cd pulsar-flink
```
2. Install Docker.
The Pulsar Flink connector uses [Testcontainers](https://www.testcontainers.org/) for integration tests, so Docker must be installed to run them. For installation instructions (macOS), see [here](https://docs.docker.com/docker-for-mac/install/).
3. Set the Java version.
Modify `java.version` and `java.binary.version` in `pom.xml`.
> **Note**
> Ensure that the Java version you set here is the same as the Java version you use to run the Pulsar Flink connector.
4. Build the project.
```bash
mvn clean install -DskipTests
```
5. Run the tests.
```bash
mvn clean install
```
After the Pulsar Flink connector is installed, a JAR package that contains all the dependencies is generated in both the local Maven repository and the `target` directory.
> **Note**
> If you use IntelliJ IDEA to debug this project, you might encounter an error about the `org.apache.pulsar.shade.org.bookkeeper.ledger` package. To fix it, run `mvn clean install -DskipTests` to install the JAR package to the local repository, ignore the `managed-ledger-shaded` Maven module in the project, and then click **Refresh**.
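To check the Java version requirement from step 3 before building, a small check like the hypothetical one below (class and method names are illustrative) can be compared against the `java.version` value in `pom.xml`. It reads the standard `java.specification.version` system property, which is `1.8` on Java 8 and `9`, `11`, and so on from Java 9 onward, and normalizes both forms to a major version number.

```java
/**
 * Hypothetical helper: reports the running JVM's major Java version and,
 * optionally, compares it with an expected version such as the java.version
 * value from pom.xml.
 */
public class JavaVersionCheck {

    /** Normalizes "1.8" (legacy scheme) to 8 and "11" (scheme since Java 9) to 11. */
    static int majorVersion(String specVersion) {
        String[] parts = specVersion.split("\\.");
        return parts[0].equals("1") ? Integer.parseInt(parts[1]) : Integer.parseInt(parts[0]);
    }

    public static void main(String[] args) {
        int running = majorVersion(System.getProperty("java.specification.version"));
        System.out.println("Running JVM major version: " + running);
        if (args.length > 0) {
            int expected = majorVersion(args[0]);
            if (running != expected) {
                System.out.println("Mismatch: pom.xml expects Java " + expected);
                System.exit(1);
            }
        }
    }
}
```

For example, `java JavaVersionCheck 1.8` exits with status 1 when the JVM on the path is not Java 8.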
# Deploy Pulsar Flink connector
This section describes how to deploy the Pulsar Flink connector.
## Client library
For any Flink application, use the `./bin/flink run` command to submit and start your application.
If you have already built a JAR package with dependencies using the shade plugin described above, you can use the `--classpath` option to add your JAR package.
> **Note**
> The path must be a URL with a protocol (such as `file://`), and it must be accessible on all nodes.
**Example**
```
./bin/flink run -c com.example.entry.point.ClassName file://path/to/jars/your_fat_jar.jar
```
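Because `--classpath` expects a protocol-qualified location, a bare filesystem path needs converting first. The hypothetical helper below (names are illustrative, not part of Flink or the connector) keeps any location that already carries a scheme (such as `file://` or `hdfs://`) and turns a plain path into an absolute `file:` URI with `java.nio.file.Path.toUri()`. An absolute local path becomes `file:///...` with three slashes, because the host part of the URI is empty. The sketch assumes POSIX-style paths.

```java
import java.nio.file.Paths;
import java.util.regex.Pattern;

/** Hypothetical helper: produces protocol-qualified classpath entries. */
public class ClasspathUrl {

    // A URI scheme followed by "://", e.g. file://, hdfs://, http://
    private static final Pattern HAS_SCHEME = Pattern.compile("^[A-Za-z][A-Za-z0-9+.-]*://.*");

    static String toClasspathUrl(String location) {
        if (HAS_SCHEME.matcher(location).matches()) {
            return location; // already protocol-qualified
        }
        // Resolve relative paths against the working directory, then build a file: URI.
        return Paths.get(location).toAbsolutePath().toUri().toString();
    }

    public static void main(String[] args) {
        for (String location : args) {
            System.out.println(toClasspathUrl(location));
        }
    }
}
```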
## Scala REPL
The Scala REPL is an interactive shell for evaluating Scala expressions. Use the `bin/start-scala-shell.sh` command to start it with the Pulsar Flink connector, and use the `--addclasspath` option to add the `pulsar-flink-connector-{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar` package.
**Example**
```
./bin/start-scala-shell.sh remote <hostname> <portnumber> \
  --addclasspath pulsar-flink-connector-{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar
```
For more information on submitting applications through the CLI, see [Command-Line Interface](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html).
## SQL client
The [SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sqlClient.html) lets you write SQL queries to manipulate data in Pulsar. Use the `--jar` option to add the `pulsar-flink-connector-{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar` package.
**Example**
```
./bin/sql-client.sh embedded --jar pulsar-flink-connector-{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar
```
> **Note**
> If you put the JAR