## TableStore增量数据导出通道:TableStoreStreamReader
### 快速介绍
TableStoreStreamReader插件主要用于TableStore的增量数据导出,增量数据可以看作操作日志,除了数据本身外还附有操作信息。
与全量导出插件不同,增量导出插件只有多版本模式,同时不支持指定列。这是与增量导出的原理有关的,导出的格式下面有详细介绍。
使用插件前必须确保表上已经开启Stream功能,可以在建表的时候指定开启,或者使用SDK的UpdateTable接口开启。
开启Stream的方法:
SyncClient client = new SyncClient("", "", "", "");
1. 建表的时候开启:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量数据保留24小时
client.createTable(createTableRequest);
2. 如果建表时未开启,可以通过UpdateTable开启:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);
### 实现原理
首先用户使用SDK的UpdateTable功能,指定开启Stream并设置过期时间,即开启了增量功能。
开启后,TableStore服务端就会将用户的操作日志额外保存起来,
每个分区有一个有序的操作日志队列,每条操作日志会在一定时间后被垃圾回收,这个时间即用户指定的过期时间。
TableStore的SDK提供了几个Stream相关的API用于将这部分操作日志读取出来,增量插件也是通过TableStore SDK的接口获取到增量数据的,并将
增量数据转化为多个6元组的形式(pk, colName, version, colValue, opType, sequenceInfo)导入到ODPS中。
### Reader的配置模版:
"reader": {
"name" : "otsstreamreader",
"parameter" : {
"endpoint" : "",
"accessId" : "",
"accessKey" : "",
"instanceName" : "",
//dataTable即需要导出数据的表。
"dataTable" : "",
//statusTable是Reader用于保存状态的表,若该表不存在,Reader会自动创建该表。
//一次离线导出任务完成后,用户不应删除该表,该表中记录的状态可用于下次导出任务中。
"statusTable" : "TableStoreStreamReaderStatusTable",
//增量数据的时间范围(左闭右开)的左边界。
"startTimestampMillis" : "",
//增量数据的时间范围(左闭右开)的右边界。
"endTimestampMillis" : "",
//采云间调度只支持天级别,所以提供该配置,作用与startTimestampMillis和endTimestampMillis类似。
"date": "",
//是否导出时序信息。
"isExportSequenceInfo": true,
//从TableStore中读增量数据时,每次请求的最大重试次数,默认为30。
"maxRetries" : 30
}
}
### 参数说明
| 名称 | 说明 | 类型 | 必选 |
| ---- | ---- | ---- | ---- |
| endpoint | TableStoreServer的Endpoint地址。| String | 是 |
| accessId | 用于访问TableStore服务的accessId。| String | 是 |
| accessKey | 用于访问TableStore服务的accessKey。 | String | 是 |
| instanceName | TableStore的实例名称。 | String | 是 |
| dataTable | 需要导出增量数据的表的名称。该表需要开启Stream,可以在建表时开启,或者使用UpdateTable接口开启。 | String | 是 |
| statusTable | Reader插件用于记录状态的表的名称,这些状态可用于减少对非目标范围内的数据的扫描,从而加快导出速度。<br> 1. 用户不需要创建该表,只需要给出一个表名。Reader插件会尝试在用户的instance下创建该表,若该表不存在即创建新表,若该表已存在,会判断该表的Meta是否与期望一致,若不一致会抛出异常。<br> 2. 在一次导出完成之后,用户不应删除该表,该表的状态可用于下次导出任务。<br> 3. 该表会开启TTL,数据自动过期,因此可认为其数据量很小。<br> 4. 针对同一个instance下的多个不同的dataTable的Reader配置,可以使用同一个statusTable,记录的状态信息互不影响。 <br> 综上,用户配置一个类似TableStoreStreamReaderStatusTable之类的名称即可,注意不要与业务相关的表重名。| String | 是 |
| startTimestampMillis | 增量数据的时间范围(左闭右开)的左边界,单位毫秒。 <br> 1. Reader插件会从statusTable中找对应startTimestampMillis的位点,从该点开始读取开始导出数据。 <br> 2. 若statusTable中找不到对应的位点,则从系统保留的增量数据的第一条开始读取,并跳过写入时间小于startTimestampMillis的数据。| Long | 否 |
| endTimestampMillis | 增量数据的时间范围(左闭右开)的右边界,单位毫秒。<br> 1. Reader插件从startTimestampMillis位置开始导出数据后,当遇到第一条时间戳大于等于endTimestampMillis的数据时,结束导出数据,导出完成。 <br> 2. 当读取完当前全部的增量数据时,结束读取,即使未达到endTimestampMillis。 | Long | 否 |
| date | 日期格式为yyyyMMdd,如20151111,表示导出该日的数据。<br> 若没有指定date,则必须指定startTimestampMillis和endTimestampMillis,反之也成立。 | String | 否 |
| isExportSequenceInfo | 是否导出时序信息,时序信息包含了数据的写入时间等。默认该值为false,即不导出。 | Boolean | 否 |
| maxRetries | 从TableStore中读增量数据时,每次请求的最大重试次数,默认为30,重试之间有间隔,30次重试总时间约为5分钟,一般无需更改。| Int | 否 |
### 导出的数据格式
首先,在TableStore多版本模型下,表中的数据组织为“行-列-版本”三级的模式,
一行可以有任意列,列名也并非固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。
用户可以通过TableStore的API进行一系列读写操作,
TableStore通过记录用户最近对表的一系列写操作(或称为数据更改操作)来实现记录增量数据的目的,
所以也可以把增量数据看作一批操作记录。
TableStore有三类数据更改操作:PutRow、UpdateRow、DeleteRow。
+ PutRow的语义是写入一行,若该行已存在即覆盖该行。
+ UpdateRow的语义是更新一行,对原行其他数据不做更改,
更新可能包括新增或覆盖(若对应列的对应版本已存在)一些列值、删除某一列的全部版本、删除某一列的某个版本。
+ DeleteRow的语义是删除一行。
TableStore会根据每种操作生成对应的增量数据记录,Reader插件会读出这些记录,并导出成Datax的数据格式。
同时,由于TableStore具有动态列、多版本的特性,所以Reader插件导出的一行不对应TableStore中的一行,而是对应TableStore中的一列的一个版本。
即<B>TableStore中的一行可能会导出很多行,每行包含主键值、该列的列名、该列下该版本的时间戳(版本号)、该版本的值、操作类型</B>。若设置isExportSequenceInfo为true,还会包括时序信息。
转换为Datax的数据格式后,我们定义了四种操作类型,分别为:
+ U(UPDATE): 写入一列的一个版本
+ DO(DELETE_ONE_VERSION): 删除某一列的某个版本
+ DA(DELETE_ALL_VERSION): 删除某一列的全部版本,此时需要根据主键和列名,将对应列的全部版本删除
+ DR(DELETE_ROW): 删除某一行,此时需要根据主键,将该行数据全部删除
举例如下,假设该表有两个主键列,主键列名分别为pkName1, pkName2�
没有合适的资源?快使用搜索试试~ 我知道了~
Datax 开源修改版,增加 greenplum sqlserver2000 tbase Amazon s3 插件
共826个文件
java:526个
xml:107个
json:102个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
5星 · 超过95%的资源 1 下载量 137 浏览量
2023-04-15
16:47:25
上传
评论
收藏 27.99MB ZIP 举报
温馨提示
Datax 开源修改版,增加 greenplum sqlserver2000 tbase Amazon s3 插件
资源推荐
资源详情
资源评论
收起资源包目录
Datax 开源修改版,增加 greenplum sqlserver2000 tbase Amazon s3 插件 (826个子文件)
.gitkeep 0B
.gitkeep 0B
.gitkeep 0B
.gitkeep 0B
.gitkeep 0B
.gitkeep 0B
ojdbc8-12.2.0.1.jar 3.85MB
db2jcc4.jar 3.37MB
db2jcc4.jar 3.37MB
bcprov-jdk15on-1.52.jar 2.77MB
bcprov-jdk15on-1.52.jar 2.77MB
ojdbc6-11.2.0.3.jar 2.59MB
ojdbc6-11.2.0.3.jar 2.59MB
guava-16.0.jar 2.12MB
jconn3-1.0.0-SNAPSHOT.jar 983KB
jconn3-1.0.0-SNAPSHOT.jar 983KB
Dm7JdbcDriver16.jar 800KB
Dm7JdbcDriver16.jar 800KB
edb-jdbc16.jar 652KB
edb-jdbc16.jar 652KB
sqlserjdbc.jar 525KB
sqlserjdbc.jar 525KB
sqlserjdbc2000.jar 521KB
FakePluginer.jar 19KB
FakePluginer.jar 19KB
oracle_oss_transformer-1.0-SNAPSHOT.jar 4KB
oracle_oss_transformer-1.0-SNAPSHOT.jar 4KB
JobContainer.java 39KB
DBUtil.java 33KB
DFSUtil.java 32KB
Configuration.java 31KB
AdsInsertProxy.java 30KB
HdfsHelper.java 30KB
OdpsUtil.java 28KB
PerfTrace.java 28KB
UnstructuredStorageReaderUtil.java 27KB
FilterTransformerTest.java 26KB
CommonRdbmsWriter.java 26KB
TaskGroupContainer.java 25KB
OssWriter.java 24KB
ConfigurationTest.java 24KB
Hbase11xHelper.java 22KB
S3Writer.java 22KB
ESWriter.java 21KB
Hbase094xHelper.java 20KB
HbaseUtil.java 19KB
AdsWriter.java 19KB
HdfsWriter.java 19KB
OdpsReader.java 19KB
SingleTableSplitUtil.java 18KB
MongoDBWriter.java 18KB
SecretUtil.java 17KB
VMInfo.java 17KB
OdpsUtil.java 17KB
AdsHelper.java 17KB
CheckpointTimeTracker.java 16KB
JobContainerTest.java 16KB
CommonRdbmsReader.java 16KB
OTSStreamReaderSlaveProxy.java 15KB
TxtFileWriter.java 15KB
Hbase11xHelper.java 15KB
OdpsWriter.java 15KB
UnstructuredStorageWriterUtil.java 14KB
StandardFtpHelperImpl.java 14KB
TxtFileReader.java 14KB
OssReader.java 14KB
StreamReader.java 14KB
HbaseSQLWriterTask.java 14KB
OcsWriter.java 13KB
ColumnDataType.java 13KB
OriginalConfPretreatmentUtil.java 13KB
HdfsReader.java 13KB
FtpWriter.java 13KB
Hbase094xHelper.java 13KB
SftpHelperImpl.java 13KB
RangeSplit.java 13KB
OTSStreamReaderConfig.java 12KB
CommunicationTool.java 12KB
ReaderProxy.java 12KB
TaskGroupContainerTest.java 11KB
RecordProcessor.java 11KB
HBaseWriter.java 11KB
RetryUtilTest.java 10KB
SftpHelper.java 10KB
FilterTransformer.java 10KB
RdbmsException.java 10KB
MongoDBReader.java 10KB
ParamChecker.java 10KB
StreamWriter.java 9KB
OtsReaderMasterProxy.java 9KB
StandardFtpHelper.java 9KB
RangeSplitUtil.java 9KB
RecordExchangerTest.java 9KB
OriginalConfPretreatmentUtil.java 9KB
Channel.java 9KB
RetryUtil.java 9KB
FtpReader.java 9KB
WriterUtil.java 9KB
Engine.java 9KB
HbaseSQLHelper.java 9KB
共 826 条
- 1
- 2
- 3
- 4
- 5
- 6
- 9
资源评论
- cpoc85992023-11-01这个资源值得下载,资源内容详细全面,与描述一致,受益匪浅。
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6651
下载权益
C知道特权
VIP文章
课程特权
开通VIP
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- oracle 触发器语法及 for each row 详细说明
- 微信小程序-豆瓣图书源码
- Fragment - Unknow animation name objectAnimator 问题解决及源码分析
- 词向量-中文文本相似度计算-采用text2vec词向量工具进行计算对比.zip
- 521293804316625base(1).apk
- ARP IP地址分类及特殊IP
- 【MySQL补丁】vcredist-x64xz MySQL必备插件
- 基于SG3525芯片PWM控制推挽隔离DCDC电源模块AD09设计硬件(原理图+PCB)工程文件.zip
- Word_20240428_092324.docx
- cloudcc_v1.1.10-release_sign.apk
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功