没有合适的资源?快使用搜索试试~ 我知道了~
大数据之flink教程-TableAPI和SQL.docx
需积分: 15 10 下载量 23 浏览量
2020-11-23
20:51:15
上传
评论 1
收藏 483KB DOCX 举报
温馨提示
试读
50页
大数据之flink教程-TableAPI和SQL.docx
资源推荐
资源详情
资源评论
Table API 和 Flink SQL
第一章 整体介绍
1.1 什么是 Table API 和 Flink SQL
本身是批流统一的处理框架,所以 和 ,就是批流统一的上层处理
。
目前功能尚未完善,处于活跃的开发阶段。
是一套内嵌在 和 语言中的查询 ,它允许我们以非常直观的方式,
组合来自一些关系运算符的查询(比如 、 和 )。而对于 ,就是直接
可以在代码中写 ,来实现一些查询()操作。 的 支持,基于实现了
标准的 ( 开源 解析工具)。
无论输入是批输入还是流式输入,在这两套 中,指定的查询都具有相同的语义,
得到相同的结果。
1.2 需要引入的依赖
和 需要引入的依赖有两个: 和 。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
!"": 计划器,是 最主要的部分,提供了运行时环境和
生成程序执行计划的 ;
!"""" : 桥接器,主要负责 和 #$%#
的连接支持,按照语言分 和 。
这里的两个依赖,是 #& 环境下运行需要添加的;如果是生产环境, 目录下默认已
经有了 ,就只需要有 就可以了。
当然,如果想使用用户自定义函数,或是跟 ' 做连接,需要有一个 ,这
个包含在 !""$$ 里。
1.3 两种 planner(old & blink)的区别
()批流统一:* 将批处理作业,视为流式处理的特殊情况。所以, 不支持表和
# 之间的转换,批处理作业将不转换为 # 应用程序,而是跟流处理一样,转换
为 #$ 程序来处理。
+) 因 为 批 流 统 一 , * 也 不 支 持 * , 而 使 用 有 界 的
$ 代替。
,)* 只支持全新的目录,不支持已弃用的 &- 。
.)旧 和 * 的 实现不兼容。旧的 会把
&- 下推到 中,而 则会把 &- 下推。
/)基于字符串的键值配置选项仅适用于 *。
0) 在两个 中的实现不同。
1)* 会将多个 优化在一个 #2 中(仅在 &$ 上受支持,而
在 $&$ 上不受支持)。而旧 的优化总是将每一个 放在一个
新的 #2 中,其中所有 #2 彼此独立。
3)旧的 不支持目录统计,而 * 支持。
第二章 API 调用
2.1 基本程序结构
和 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么
几步:首先创建执行环境,然后定义 、4$ 和 。
具体操作流程如下:
val tableEnv = ... //
创建表的执行环境
//
创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable")
//
注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable")
//
通过
Table API
查询算子,得到一张结果表
val result = tableEnv.from("inputTable").select(...)
//
通过
SQL
查询语句,得到一张结果表
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")
//
将结果表写入输出表中
result.insertInto("outputTable")
2.2 创建表环境
创建表环境最简单的方式,就是基于流处理执行环境,调 方法直接创建:
val tableEnv = StreamTableEnvironment.create(env)
表环境(&$)是 ! 中集成 5 的核心概念。它负责6
注册
在内部 中注册表
执行 查询
注册用户自定义函数
将 #$或 #转换为表
保存对 &-7&$或 $&-7&$的引用
在创建 & 的时候,可以多传入一个 &$8 或者 参数,
可以用来配置 &$ 的一些特性。
比如,配置老版本的流式查询("$ "):
val settings = EnvironmentSettings.newInstance()
.useOldPlanner() //
使用老版本
planner
.inStreamingMode() //
流处理模式
.build()
val tableEnv = StreamTableEnvironment.create(env, settings)
基于老版本的批处理环境("*"):
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = BatchTableEnvironment.create(batchEnv)
基于 版本的流处理环境(*"$ "):
val bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
基于 版本的批处理环境(*"*"):
val bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
2.3 在 Catalog 中注册表
2.3.1 表(Table)的概念
&$ 可以注册目录 ,并可以基于 注册表。它会维护一个
" 表之间的 $。
表 ( ) 是 由 一 个 “ 标 识 符 ” 来 指 定 的 , 由 , 部 分 组 成 : 名 、 数 据 库
()名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。
表可以是常规的(,表),或者虚拟的(9:,视图)。常规表()一般可
以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 #$
转换而来。视图可以从现有的表中创建,通常是 或者 查询的一个结果。
2.3.2 连接到文件系统(Csv 格式)
连接外部系统在 中注册表,直接调用 &);<就可以,里面参数要传
剩余49页未读,继续阅读
资源评论
YuBx
- 粉丝: 25
- 资源: 26
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功