## TableStore增量数据导出通道:TableStoreStreamReader
### 快速介绍
TableStoreStreamReader插件主要用于TableStore的增量数据导出,增量数据可以看作操作日志,除了数据本身外还附有操作信息。
与全量导出插件不同,增量导出插件只有多版本模式,同时不支持指定列。这是与增量导出的原理有关的,导出的格式下面有详细介绍。
使用插件前必须确保表上已经开启Stream功能,可以在建表的时候指定开启,或者使用SDK的UpdateTable接口开启。
开启Stream的方法:
SyncClient client = new SyncClient("", "", "", "");
1. 建表的时候开启:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量数据保留24小时
client.createTable(createTableRequest);
2. 如果建表时未开启,可以通过UpdateTable开启:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);
### 实现原理
首先用户使用SDK的UpdateTable功能,指定开启Stream并设置过期时间,即开启了增量功能。
开启后,TableStore服务端就会将用户的操作日志额外保存起来,
每个分区有一个有序的操作日志队列,每条操作日志会在一定时间后被垃圾回收,这个时间即用户指定的过期时间。
TableStore的SDK提供了几个Stream相关的API用于将这部分操作日志读取出来,增量插件也是通过TableStore SDK的接口获取到增量数据的,并将
增量数据转化为多个6元组的形式(pk, colName, version, colValue, opType, sequenceInfo)导入到ODPS中。
### Reader的配置模版:
"reader": {
"name" : "otsstreamreader",
"parameter" : {
"endpoint" : "",
"accessId" : "",
"accessKey" : "",
"instanceName" : "",
//dataTable即需要导出数据的表。
"dataTable" : "",
//statusTable是Reader用于保存状态的表,若该表不存在,Reader会自动创建该表。
//一次离线导出任务完成后,用户不应删除该表,该表中记录的状态可用于下次导出任务中。
"statusTable" : "TableStoreStreamReaderStatusTable",
//增量数据的时间范围(左闭右开)的左边界。
"startTimestampMillis" : "",
//增量数据的时间范围(左闭右开)的右边界。
"endTimestampMillis" : "",
//采云间调度只支持天级别,所以提供该配置,作用与startTimestampMillis和endTimestampMillis类似。
"date": "",
//是否导出时序信息。
"isExportSequenceInfo": true,
//从TableStore中读增量数据时,每次请求的最大重试次数,默认为30。
"maxRetries" : 30
}
}
### 参数说明
| 名称 | 说明 | 类型 | 必选 |
| ---- | ---- | ---- | ---- |
| endpoint | TableStoreServer的Endpoint地址。| String | 是 |
| accessId | 用于访问TableStore服务的accessId。| String | 是 |
| accessKey | 用于访问TableStore服务的accessKey。 | String | 是 |
| instanceName | TableStore的实例名称。 | String | 是 |
| dataTable | 需要导出增量数据的表的名称。该表需要开启Stream,可以在建表时开启,或者使用UpdateTable接口开启。 | String | 是 |
| statusTable | Reader插件用于记录状态的表的名称,这些状态可用于减少对非目标范围内的数据的扫描,从而加快导出速度。<br> 1. 用户不需要创建该表,只需要给出一个表名。Reader插件会尝试在用户的instance下创建该表,若该表不存在即创建新表,若该表已存在,会判断该表的Meta是否与期望一致,若不一致会抛出异常。<br> 2. 在一次导出完成之后,用户不应删除该表,该表的状态可用于下次导出任务。<br> 3. 该表会开启TTL,数据自动过期,因此可认为其数据量很小。<br> 4. 针对同一个instance下的多个不同的dataTable的Reader配置,可以使用同一个statusTable,记录的状态信息互不影响。 <br> 综上,用户配置一个类似TableStoreStreamReaderStatusTable之类的名称即可,注意不要与业务相关的表重名。| String | 是 |
| startTimestampMillis | 增量数据的时间范围(左闭右开)的左边界,单位毫秒。 <br> 1. Reader插件会从statusTable中找对应startTimestampMillis的位点,从该点开始读取开始导出数据。 <br> 2. 若statusTable中找不到对应的位点,则从系统保留的增量数据的第一条开始读取,并跳过写入时间小于startTimestampMillis的数据。| Long | 否 |
| endTimestampMillis | 增量数据的时间范围(左闭右开)的右边界,单位毫秒。<br> 1. Reader插件从startTimestampMillis位置开始导出数据后,当遇到第一条时间戳大于等于endTimestampMillis的数据时,结束导出数据,导出完成。 <br> 2. 当读取完当前全部的增量数据时,结束读取,即使未达到endTimestampMillis。 | Long | 否 |
| date | 日期格式为yyyyMMdd,如20151111,表示导出该日的数据。<br> 若没有指定date,则必须指定startTimestampMillis和endTimestampMillis,反之也成立。 | String | 否 |
| isExportSequenceInfo | 是否导出时序信息,时序信息包含了数据的写入时间等。默认该值为false,即不导出。 | Boolean | 否 |
| maxRetries | 从TableStore中读增量数据时,每次请求的最大重试次数,默认为30,重试之间有间隔,30次重试总时间约为5分钟,一般无需更改。| Int | 否 |
### 导出的数据格式
首先,在TableStore多版本模型下,表中的数据组织为“行-列-版本”三级的模式,
一行可以有任意列,列名也并非固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。
用户可以通过TableStore的API进行一系列读写操作,
TableStore通过记录用户最近对表的一系列写操作(或称为数据更改操作)来实现记录增量数据的目的,
所以也可以把增量数据看作一批操作记录。
TableStore有三类数据更改操作:PutRow、UpdateRow、DeleteRow。
+ PutRow的语义是写入一行,若该行已存在即覆盖该行。
+ UpdateRow的语义是更新一行,对原行其他数据不做更改,
更新可能包括新增或覆盖(若对应列的对应版本已存在)一些列值、删除某一列的全部版本、删除某一列的某个版本。
+ DeleteRow的语义是删除一行。
TableStore会根据每种操作生成对应的增量数据记录,Reader插件会读出这些记录,并导出成Datax的数据格式。
同时,由于TableStore具有动态列、多版本的特性,所以Reader插件导出的一行不对应TableStore中的一行,而是对应TableStore中的一列的一个版本。
即<B>TableStore中的一行可能会导出很多行,每行包含主键值、该列的列名、该列下该版本的时间戳(版本号)、该版本的值、操作类型</B>。若设置isExportSequenceInfo为true,还会包括时序信息。
转换为Datax的数据格式后,我们定义了四种操作类型,分别为:
+ U(UPDATE): 写入一列的一个版本
+ DO(DELETE_ONE_VERSION): 删除某一列的某个版本
+ DA(DELETE_ALL_VERSION): 删除某一列的全部版本,此时需要根据主键和列名,将对应列的全部版本删除
+ DR(DELETE_ROW): 删除某一行,此时需要根据主键,将该行数据全部删除
举例如下,假设该表有两个主键列,主键列名分别为pkName1, pkName2�
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
【项目资源】: 包含前端、后端、移动开发、操作系统、人工智能、物联网、信息化管理、数据库、硬件开发、大数据、课程资源、音视频、网站开发等各种技术项目的源码。 包括STM32、ESP8266、PHP、QT、Linux、iOS、C++、Java、python、web、C#、EDA、proteus、RTOS等项目的源码。 【项目质量】: 所有源码都经过严格测试,可以直接运行。 功能在确认正常工作后才上传。 【适用人群】: 适用于希望学习不同技术领域的小白或进阶学习者。 可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【附加价值】: 项目具有较高的学习借鉴价值,也可直接拿来修改复刻。 对于有一定基础或热衷于研究的人来说,可以在这些基础代码上进行修改和扩展,实现其他功能。 【沟通交流】: 有任何使用上的问题,欢迎随时与博主沟通,博主会及时解答。 鼓励下载和使用,并欢迎大家互相学习,共同进步。
资源推荐
资源详情
资源评论
收起资源包目录
基于阿里Datax改版web datax ,支持管理平台与restful风格API.zip (761个子文件)
3_1562312097720 85KB
.gitignore 2KB
.gitignore 320B
.gitkeep 0B
.gitkeep 0B
.gitkeep 0B
.gitkeep 0B
.gitkeep 0B
.gitkeep 0B
JobContainer.java 43KB
DBUtil.java 35KB
DFSUtil.java 31KB
Configuration.java 31KB
AdsInsertProxy.java 30KB
UnstructuredStorageReaderUtil.java 29KB
OdpsUtil.java 28KB
PerfTrace.java 28KB
HdfsHelper.java 27KB
CommonRdbmsWriter.java 27KB
TaskGroupContainer.java 26KB
OssWriter.java 24KB
Hbase11xHelper.java 22KB
ESWriter.java 21KB
Hbase094xHelper.java 20KB
AdsWriter.java 20KB
HBase20SQLReaderHelper.java 19KB
SingleTableSplitUtil.java 19KB
OdpsReader.java 19KB
MongoDBWriter.java 18KB
HdfsWriter.java 18KB
VMInfo.java 17KB
SecretUtil.java 17KB
OdpsUtil.java 17KB
AdsHelper.java 16KB
CheckpointTimeTracker.java 16KB
OTSStreamReaderSlaveProxy.java 15KB
TxtFileWriter.java 15KB
CommonRdbmsReader.java 15KB
UnstructuredStorageWriterUtil.java 15KB
Hbase11xHelper.java 15KB
HBase20xSQLWriterTask.java 15KB
OdpsWriter.java 15KB
OriginalConfPretreatmentUtil.java 14KB
HbaseSQLWriterTask.java 14KB
StandardFtpHelperImpl.java 14KB
TxtFileReader.java 14KB
OssReader.java 14KB
StreamReader.java 14KB
OcsWriter.java 13KB
HbaseSQLHelper.java 13KB
ColumnDataType.java 13KB
HdfsReader.java 13KB
FtpWriter.java 13KB
Hbase094xHelper.java 13KB
SftpHelperImpl.java 13KB
ServletUtils.java 13KB
RangeSplit.java 13KB
OTSStreamReaderConfig.java 12KB
CommunicationTool.java 12KB
ReaderProxy.java 12KB
RecordProcessor.java 11KB
ThinClientPTable.java 10KB
SftpHelper.java 10KB
FilterTransformer.java 10KB
RdbmsException.java 10KB
MongoDBReader.java 10KB
ParamChecker.java 10KB
AdsClientProxy.java 10KB
RetryUtil.java 9KB
OriginalConfPretreatmentUtil.java 9KB
StreamWriter.java 9KB
OtsReaderMasterProxy.java 9KB
StandardFtpHelper.java 9KB
WriterUtil.java 9KB
Channel.java 9KB
Engine.java 9KB
HbaseSQLWriterConfig.java 9KB
FtpReader.java 9KB
ConfigParser.java 9KB
CoreConstant.java 8KB
JobAssignUtil.java 8KB
Communication.java 8KB
AdsUtil.java 8KB
ESClient.java 8KB
OpenTSDBReader.java 8KB
ReaderModelParser.java 8KB
OTSStreamReaderChecker.java 8KB
OdpsWriterProxy.java 8KB
HbaseSQLReaderTask.java 8KB
ReaderSplitUtil.java 8KB
AdsInsertUtil.java 7KB
RangeSplitUtil.java 7KB
LoadUtil.java 7KB
TransformerRegistry.java 7KB
SubCommonRdbmsReader.java 7KB
CollectionSplitUtil.java 7KB
SubCommonRdbmsWriter.java 7KB
IDataxJobServiceImpl.java 7KB
StreamJob.java 7KB
PerfRecord.java 7KB
共 761 条
- 1
- 2
- 3
- 4
- 5
- 6
- 8
资源评论
妄北y
- 粉丝: 1w+
- 资源: 1万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功