# Purpose
This article introduces the new interfaces and the new code structure built around the newly designed Connector API in
Apache SeaTunnel. It helps developers quickly understand the API and the translation-layer improvements, and it guides
contributors on how to use the new API to develop new connectors. See
this [issue](https://github.com/apache/seatunnel/issues/1608) for details.
## **Code Structure**
In order to separate from the old code, we have defined new modules for execution flow. This facilitates parallel
development at the current stage, and reduces the difficulty of merging.
### **Engineering Structure**
- ../`seatunnel-connectors-v2` connector-v2 code implementation
- ../`seatunnel-translation` translation layer for the connector-v2
- ../`seatunnel-transform-v2` transform-v2 connector implementation
- ../seatunnel-e2e/`seatunnel-connector-v2-e2e` connector-v2 e2e code
- ../seatunnel-examples/`seatunnel-flink-connector-v2-example` connector-v2 example running on a local Flink instance
- ../seatunnel-examples/`seatunnel-spark-connector-v2-example` connector-v2 example running on a local Spark instance
### **Example**
We have prepared two locally executable example programs for the new version in `seatunnel-examples`. One
is `seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`,
which runs on the Flink engine. The other
is `seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`,
which runs on the Spark engine. This is also the debugging method commonly used during local connector development.
You can debug these examples, which will help you better understand the running logic of the program. The
configuration files used in the examples are saved in the `resources/examples` folder. If you want to add examples for your
own connectors, follow the steps below.
1. Add the groupId, artifactId and version of the connector to be tested as a dependency in
`seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` (or in
`seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` if you want to run it on the Spark engine).
2. Find the dependencies in your connector's pom file whose scope is `test` or `provided`, add them to the
`seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` (or
`seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`) file, and change their scope to `compile`.
3. Add the task configuration file under resources/examples.
4. Configure the file in the `SeaTunnelApiExample` main method.
5. Just run the main method.
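For step 1, the dependency block added to the example module's pom.xml looks like the following sketch; the artifactId and version here are placeholders for your own connector, not actual values from the repository:

```xml
<!-- Hypothetical connector dependency; replace artifactId/version with your connector's coordinates. -->
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-your-connector</artifactId>
    <version>${project.version}</version>
</dependency>
```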
### **Create a new SeaTunnel v2 connector**
1. Create a new module under the `seatunnel-connectors-v2` directory and name it connector-{connector name}.
2. The pom file can refer to the pom file of an existing connector; also add the new sub-module to the pom file of the parent module.
3. Create two packages corresponding to source and sink:
   `package org.apache.seatunnel.connectors.seatunnel.{connector name}.source`
   `package org.apache.seatunnel.connectors.seatunnel.{connector name}.sink`
4. Add the connector info to the `plugin-mapping.properties` file in the SeaTunnel root path.
5. Add the connector dependency to `seatunnel-dist/pom.xml`, so the connector jar can be found in the binary package.
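For step 4, entries in `plugin-mapping.properties` map a plugin name to its connector artifact. The sketch below uses a hypothetical connector name; check the existing entries in the file for the exact key format before adding yours:

```properties
# Hypothetical entries; "MyConnector" and "connector-myconnector" are placeholders.
seatunnel.source.MyConnector = connector-myconnector
seatunnel.sink.MyConnector = connector-myconnector
```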
### **Startup Class**
Aside from the old startup classes, we have created two new startup modules,
namely ``seatunnel-core/seatunnel-flink-starter`` and ``seatunnel-core/seatunnel-spark-starter``. There you can find out
how a configuration file is parsed into an executable Flink/Spark job.
### **SeaTunnel API**
A new ``seatunnel-api`` (not ``seatunnel-apis``) module has been created to store the new interfaces defined by the
SeaTunnel API. By implementing these interfaces, developers can complete the SeaTunnel Connector that supports multiple
engines.
### **Translation Layer**
We realize the conversion between the SeaTunnel API and each engine's API by adapting the interfaces of the different
engines, thus achieving the effect of translation and letting a SeaTunnel Connector run on multiple different engines.
The corresponding code lives in ``seatunnel-translation``; this module contains the translation-layer
implementations. If you are interested, you can view the code and help us improve it.
## **API introduction**
The API design of the current version of SeaTunnel draws on the design concept of Flink.
### **Source**
#### **SeaTunnelSource.java**
- The Source of SeaTunnel adopts a unified stream-batch design. ``getBoundedness`` determines whether the
current Source is a streaming Source or a batch Source, so you can make a Source behave as either stream or batch
through dynamic configuration (refer to the default method).
- ``getRowTypeInfo`` returns the schema of the data. The connector can choose to hard-code a fixed schema,
or allow the user to customize the schema through the config file. The latter is recommended.
- SeaTunnelSource is a class executed on the driver side, through which objects such as SourceReader, SplitEnumerator
and serializers are obtained.
- Currently, the data type supported by SeaTunnelSource must be SeaTunnelRow.
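The stream-batch decision described above can be illustrated with a toy sketch. This is not the real SeaTunnel interface (the actual `getBoundedness` signature lives in `seatunnel-api`); it only shows how a single method driven by user configuration can make one source work in both modes:

```java
// Toy illustration of the getBoundedness idea; ConfigurableSource and the
// "streaming" flag are hypothetical, not part of the real SeaTunnel API.
public class BoundednessExample {
    enum Boundedness { BOUNDED, UNBOUNDED }

    // A source whose mode is chosen by user configuration, mirroring the
    // dynamic-configuration approach the article recommends.
    static class ConfigurableSource {
        private final boolean streaming;

        ConfigurableSource(boolean streaming) {
            this.streaming = streaming;
        }

        Boundedness getBoundedness() {
            return streaming ? Boundedness.UNBOUNDED : Boundedness.BOUNDED;
        }
    }

    public static void main(String[] args) {
        System.out.println(new ConfigurableSource(true).getBoundedness());  // UNBOUNDED
        System.out.println(new ConfigurableSource(false).getBoundedness()); // BOUNDED
    }
}
```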
#### **SourceSplitEnumerator.java**
This enumerator manages the data-read splits (SourceSplit); different splits may be assigned to different
SourceReaders to read data. It contains several key methods:
- ``run``: Used to spawn SourceSplits and call ``SourceSplitEnumerator.Context.assignSplit`` to distribute the
splits to the SourceReaders.
- ``addSplitsBack``: The SourceSplitEnumerator is required to redistribute Splits when they cannot be processed
normally, or when they must be reassigned after a restart caused by a SourceReader exception.
- ``registerReader``: Processes SourceReaders that register after ``run`` has already started. If there are still
SourceSplits that have not been distributed, they can be distributed to these new readers (yes, you need to maintain
your SourceSplit distribution in the SourceSplitEnumerator most of the time).
- ``handleSplitRequest``: If some Readers actively request SourceSplits from the SourceSplitEnumerator, this method can
call ``SourceSplitEnumerator.Context.assignSplit`` to send splits to the corresponding Readers.
- ``snapshotState``: Used in stream processing to periodically return the current state that needs to be saved. If
state is restored, ``SeaTunnelSource.restoreEnumerator`` will be called to construct a SourceSplitEnumerator and
restore the saved state to it.
- ``notifyCheckpointComplete``: Used for subsequent processing after the state is successfully saved, such as storing
the state or marks in third-party storage.
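The bookkeeping these methods imply (tracking pending splits, remembering which reader holds which split, re-queuing on failure) can be sketched in plain Java. This is a simplified illustration, not the real `SourceSplitEnumerator` interface; the class and method names below are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy sketch of split bookkeeping: pending splits are handed out on request
// and re-queued when a reader fails, mirroring addSplitsBack/handleSplitRequest.
public class SplitAssignmentSketch {
    private final Deque<String> pendingSplits = new ArrayDeque<>();
    private final Map<Integer, List<String>> assignment = new HashMap<>();

    void addSplits(Collection<String> splits) {
        pendingSplits.addAll(splits);
    }

    // Called when a reader registers or requests a split
    // (registerReader / handleSplitRequest in the real interface).
    Optional<String> assignTo(int readerId) {
        String split = pendingSplits.poll();
        if (split == null) {
            return Optional.empty();
        }
        assignment.computeIfAbsent(readerId, k -> new ArrayList<>()).add(split);
        return Optional.of(split);
    }

    // Called when a reader fails and its splits must be redistributed
    // (addSplitsBack in the real interface).
    void addSplitsBack(int readerId) {
        List<String> back = assignment.remove(readerId);
        if (back != null) {
            pendingSplits.addAll(back);
        }
    }

    public static void main(String[] args) {
        SplitAssignmentSketch enumerator = new SplitAssignmentSketch();
        enumerator.addSplits(Arrays.asList("split-0"));
        System.out.println(enumerator.assignTo(1)); // Optional[split-0]
        enumerator.addSplitsBack(1);                // reader 1 failed: split-0 is pending again
        System.out.println(enumerator.assignTo(2)); // Optional[split-0]
    }
}
```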
#### **SourceSplit.java**
The interface used to save splits. Different splits need to define different splitIds. You can implement this interface
to save the data a split needs, such as Kafka's partition and topic, HBase's column family and other
information, which the SourceReader uses to determine which part of the total data it should read.
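As a sketch of the idea, a Kafka-style split might carry a topic and a partition and derive its unique splitId from them. The class below is hypothetical and does not implement the real `SourceSplit` interface; it only illustrates the "unique id plus location data" pattern:

```java
// Hypothetical Kafka-style split: a unique splitId plus whatever the reader
// needs to locate its share of the data (here, topic and partition).
public class KafkaStyleSplit {
    private final String topic;
    private final int partition;

    public KafkaStyleSplit(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    // splitId must be unique per split, so topic + partition is a natural key.
    public String splitId() {
        return topic + "-" + partition;
    }

    public static void main(String[] args) {
        System.out.println(new KafkaStyleSplit("orders", 3).splitId()); // orders-3
    }
}
```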
#### **SourceReader.java**
The interface that directly interacts with the data source; the action of reading data from the data source is
completed by implementing this interface.
- ``pollNext``: The core of the Reader. Through this interface, data is read from the data source and
returned to SeaTunnel. Whenever you are ready to pass data to SeaTunnel, call
the ``Collector.collect`` method on the parameter; it can be called any number of times to complete a large
amount of data reading.
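The ``pollNext``/``Collector.collect`` contract can be sketched with simplified interfaces. This is not the real SeaTunnel `SourceReader` API (the `Collector` interface and reader class below are minimal stand-ins); it shows that one ``pollNext`` call may emit any number of records via the collector:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy sketch of the pollNext contract with a simplified Collector interface.
public class PollNextSketch {
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical reader backed by an in-memory list instead of a real source.
    static class ListBackedReader {
        private final Iterator<String> data;

        ListBackedReader(List<String> rows) {
            this.data = rows.iterator();
        }

        // One pollNext call may emit many records before returning.
        void pollNext(Collector<String> output) {
            while (data.hasNext()) {
                output.collect(data.next());
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        new ListBackedReader(Arrays.asList("r1", "r2")).pollNext(out::add);
        System.out.println(out); // [r1, r2]
    }
}
```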