1
第三课 通 过 Tunnel 命 令 收 集 海 量 数
据
Tunnel 的工作原理
作为数据通道,MaxComputer Tunnel 主要包括客户端 Client(集成在 MaxComputer SDK
中)和服务端 Tunnel Server,它们以 RESTful API 形式通信。用户通过调用 SDK 接口,
实现数据上传和下载。因此,理解 Tunnel 客户端和服务端数据如何传输以及交互,有助
于我们更好地使用 Tunnel。
数据如何传输
由于 Tunnel 是结构化数据传输服务,客户端和服务端之间首先需要协定数据格式,保证
两端对数据理解一致,最普遍的做法是通过序列化方式。上传数据时,客户端对结构化数
据进行序列化(生成二进制流),服务端接受到数据后,(对二进制流)执行反序列化,
还原出结构化数据,写到 MaxComputer 表中。下载数据类似,服务端读取 MaxComputer
表的结构化数据,执行序列化后发送给客户端,客户端接受后,执行反序列化。
Tunnel 采用了 Google Protobuf 作为其序列化的机制。每次序列化时,除了原始数据外,
还包含校验值(Checksum)和记录数,这样服务端接受反序列化后,可以先校验数据的
正确性。Tunnel 的数据传输协议以后很可能会公开,鼓励用户开发自己的 SDK 和工具,
在阿里内部,已经有团队开发了 Node.js 版的 SDK。
Tunnel 提 供 了 在 传 输 过 程 中 对 数 据 进 行 压 缩 , 这 是 一 个 可 选 项 , 可 以 通 过 函 数
UploadSession.openRecordWriter(long blockId, Boolean compress)的第二个参数指定。
值得一提的是,这个压缩是指网络传输过程中的压缩,而不是存储上的压缩。也就是说,
在上传时,如果执行压缩,客户端会对序列化后的数据先压缩,再发送给服务端,服务端
接受后,会对数据进行解压缩并反序列化,然后写到 MaxComputer 表(或分区)中。用
户只需要设定 compress 选项,压缩操作的实现对用户是透明的。
2
什么情况下采取压缩传输?实质上,这是个 CPU 和带宽之前的权衡。压缩消耗 CPU,节
省带宽,它本身有额外的开支,不一定会是传输变快。对于轻量级的应用,如简单的文本
传输(比如日志),压缩往往可以带来很大的性能提升;而对于计算密集型应用,比如图
片处理,压缩带来的额外开销可能会严重影响性能,不建议采用。
此外,Tunnel 还支持 HTTPS 加密传输,确保数据安全,和 dship 一样,在 odps.conf 文
件中设置 endpoint=https://service.odps.aliyun.com/api 就会以加密方式传输。同样,开
启加密传输的速度可能会比不开启加密慢 50%。
客户端和服务端如何交互
在上传数据时,客户端和服务端的交互过程可以简单地概括为三部曲:“创建上传 - 上
传数据 - 结束上传”,从 SDK 接口角度,大致如下:
1. 客户端执行 createUploadSession,服务端执行用户鉴权和权限检查,完成一些初始
化工作如创建临时目录、写 Meta 等,返回 UploadSession 对象(唯一 SessionId)。
2. 客户端打开 RecordWriter,上传指定 Block 数据;服务端通过 SessionId 获取之前创
建的临时目录,把数据反序列化后写入临时目录。
3. 客户端临时执行 commit,结束 UploadSession,服务端把数据从临时目录 Move 到结
果表(分区)所在的目录,修改 Meta。
下载数据类似不在赘述。
如何实现高并发
从客户端角度,顾名思义,SessionId 相同的 UploadSession 对象对于一个 Session,一
个 RecordWriter 对象即一个 Request。在一个 Session 中,可以(多进程或多线程)创
建多个 RecordWriter,实现并发上传不同 Block 的数据。但是,在很多情况下,并发方式
对性能的提升影响很小,这是因为单机数据传输的瓶颈往往在于网卡,这种情况下单线程
和多线程传输性能区别不大。在分布式环境中,交换机的带宽一般远大于网卡,如果可以
在多台机器上执行数据传输,才是真正实现并发。如果从阿里云 ECS 服务器上同步数据
到 MaxComputer,可以有非常高的带宽,并发就很有效。
在上传时,SessionId 可以通过 getId()方法获取。为了实现高并发上传,常见的做法是获
取 到 SessionId 之 后 , 启 动 多 个 进 程 , 每 个 进 程 调 用 getUploadSession() 函 数 获 取
3
UploadSession 实例,这些 UploadSession 实例对应同一个 Session,每个进程上传请求
再 分 别 上 传 不 同 的 Block 的 数 据 ( 即 调 用 getRecordWriter(blockid) 时 传 递 不 同 的
blockid)。通过这种方式,可以实现分布式高并发上传数据。
同理,在下载时,可以对 getRecoreCount()函数获取到的总记录数进行均匀切分,通过
多进程方式(每台机器一个进程),每个进程只下载一个区间范围的数据,各个区间之间
连 续 。 通 过 getId() 方 法 获 取 SessionId , 然 后 调 用 getDownloadSession() 函 数 获 取
DownloadSession 实例,每个实例下载一个区间范围的数据,实现并发下载。
Tunnel 导入数据
Tunnel 命令导入数据
准备数据
假设我们准备本地文件 wc_example.txt,内容如下:
I LOVE CHINA!
MY NAME IS MAGGIE.I LIVE IN SHANGHAI!I LIKE PLAYING BASKETBALL!
这里我们把该数据文件保存在 D:\odps\odps\bin 目录下。
创建 MaxCompute 表
我 们 需 要 把 上 面 的 数 据 导 入 到 MaxCompute 的 一 张 表 中 , 所 以 现 在 需 要 通 过
MaxCompute 客户端在项目中创建一张表:
CREATE TABLE wc_in (word string);
执行 tunnel 命令
输入表创建成功后,可以在 MaxCompute 客户端输入 tunnel 命令进行数据的导入,文
件中的一行时作为一条记录导入进去的,如下:
tunnel upload D:\odps\odps\bin\wc_example.txt wc_in;
4
执行成功后,查看表 wc_in 的记录,如下:
odps@ $odps_project>select * from wc_in;
ID = 20210918110501864g5z9c6
Log view:
http://webconsole.odps.aliyun-inc.com:8080/logview/?h=http://service-corp.odps.a
liyun-inc.com/api&p=odps_public_dev&i=20210918
QWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL29kcHNfcHVibGljX2Rldi9pbnN0
YW5jZXMvMjAxNTA5MTgxMTA1MDE4NjRnNXo5YzYiXX1dLC
+------+
| word |
+------+
| I LOVE CHINA! |
| MY NAME IS MAGGIE.I LIVE IN SHANGHAI!I LIKE PLAYING BASKETBALL! |
+------+
注意:当表中含有多个列时,可以通过
“-fd”
参数指定列分隔符;
Tunnel SDK 导入数据
关于如何利用 tunnel SDK 进行上传数据,下面也将通过场景介绍。场景描述:上传数据
到 MaxCompute , 其 中 , 项 目 空 间 为 ”cxbigdata202_03_dev” , 表 名
为”tunnel_sample_test”,分区为”pt=20220228,dt=shanghai”。
1. 创建表,添加分区:
CREATE TABLE IF NOT EXISTS tunnel_sample_test(
id STRING,
name STRING)
PARTITIONED BY (pt STRING, dt STRING); --创建表
ALTER TABLE tunnel_sample_test
ADD IF NOT EXISTS PARTITION (pt='20220228',dt='shanghai'); --添加分区
2. 创建 UploadSample 的工程目录结构,如下:
|---pom.xml
|---src
|---main
|---java
|---com
|---aliyun
|---odps
|---tunnel
|---example
|---UploadSample.java
UploadSample : tunnel 源文件
pom.xml : maven 工程文件
5
3. 编写 UploadSample 程序,程序如下:
package com.aliyun.odps.tunnel.example;
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
public class UploadSample {
private static String accessId = "####";
private static String accessKey = "####";
private static String tunnelUrl = "https://dt.odps.aliyun.com";
private static String odpsUrl = "https://service.odps.aliyun.com/api";
private static String project = "cxbigdata201_02_dev";
private static String table = "tunnel_sample_test";
private static String partition = "pt=20220228,dt=shanghai";
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
try {
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(tunnelUrl);
PartitionSpec partitionSpec = new PartitionSpec(partition);
UploadSession uploadSession = tunnel.createUploadSession(project,
table, partitionSpec);
System.out.println("Session Status is : "
+ uploadSession.getStatus().toString());
TableSchema schema = uploadSession.getSchema();
RecordWriter recordWriter = uploadSession.openRecordWriter(0);
Record record = uploadSession.newRecord();
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
switch (column.getType()) {
case BIGINT:
record.setBigint(i, 1L);
break;
case BOOLEAN:
record.setBoolean(i, true);
break;
case DATETIME:
record.setDatetime(i, new Date());
break;
case DOUBLE:
record.setDouble(i, 0.0);
break;
case STRING:
record.setString(i, "sample");
break;
评论0