wiki
1. Flink SQL 与 Table API 区别
- Table API :内嵌在 Java 和 Scala 中的查询 API,
- Flink SQL :支持基于实现了 SQL 标准的 Apache Calcite
2. 基本使用
2.1. 使用 DataStream 转换 source
创建环境
1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
接入 source
1
DataStream<T> source = ...;
转换成 POJO
1
DataStream<T> dataStream = source.map...;
引入依赖,创建表环境
1
2
3
4
5/*
* 必要:flink-table-planner_2.12 或者 flink-table-planner-blink_2.12
* 非必:flink-table-api-java-bridge_2.12
*/
StreamTableEnviroment tableEnv = StreamTableEnviroment.create(env);表操作
1
2
3
4
5
6
7
8// 使用 Table API 查询,可以在第二个参数指定别名
Table table = tableEnv.fromDataStream(source);
Table tableApiRes = table.select("id", "name").where("id = '1'");
// 使用 SQL 查询
tableEnv.createTemporaryView("tableName", source);
String sql = "SELECT id, name FROM table WHERE id = '1'";
Table tableSqlRes = tableEnv.sqlQuery(sql);打印结果
1
2tableEnv.toAppendStream(tableApiRes, Row.class).print("table api result");
tableEnv.toAppendStream(tableSqlRes, Row.class).print("table sql result");
2.2. 直接将 source 读取为表
创建表的执行环境
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// 流环境
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 批环境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 可选构建配置:流批、planner
EnvironmentSettings settings = EnvironmentSettings.newInstance()
//.inStreamMode()
//.inBatchMode()
//.useOldPlanner()
//.useBlinkPlanner()
.build();
// 基于流的表环境,老版和新版主要在 planner 选择上有差异
StreamTableEnviroment tableEnv = StreamTableEnviroment.create(streamEnv, settings);
// 基于批的表环境,老版,其中参数中传入的是批环境
BatchTableEnviroment tableEnv = BatchTableEnviroment.create(batchEnv);
// 基于批的表环境,新版,其中参数传入的是配置,并使用 useOldPlanner()
TableEnviroment tableEnv = TableEnviroment.create(settings);创建表,读取数据
1
tableEnv.connect(...).createTemporaryTable("sourceTable");
注册表,输出结果
1
tableEnv.connect(...).createTemporaryTable("sinkTable");
表操作
1
2
3
4
5// 使用 Table API 查询
Table tableApiRes = tableEnv.from("sourceTable").select(...);
// 使用 SQL 查询
Table tableSqlRes = tableEnv.sqlQuery("SELECT ...");写入结果表
1
2tableApiRes.insertInto("sinkTable");
tableSqlRes.insertInto("sinkTable");
3. 表
1. 概念
TableEnviroment
可以注册目录Catalog
,并可以基于Catalog
去注册表- 表
Table
是由一个标识符(identifier
)来指定,其由三部分组成Catalog
名- 数据库(
database
)名 - 对象名
- 表分为常规表(
Table
)和虚拟表(View
)- 常规表:
Table
,用来描述外部数据,比如文件、数据库、消息队列,也可以从DataStream
转换来 - 虚拟表:
View
,通常是 Table API 或 SQL 的结果集
- 常规表:
2. 创建表
大致流程如下,需要引入必要的格式化包
1 | tableEnv.connect(...) // 定义表的数据来源,和外部表建立连接 |
2.1. 读取文件中数据
1 | String filePath = "/home/user/table.txt" |
2.2. 读取 kafka 中数据
1 | tableEnv.connect(new kafka().version("0.11") |
3. 查询表
- 使用
Table API
:Table tableRes = table.select("id", "name").where("id = '1'");
- 使用
SQL
:Table tableRes = tableEnv.sqlQuery("SELECT ...");
4. 输出表
4.1. 输出表数据到文件
1 | String filePath = "/home/user/out.txt" |
4.2. 输出表数据到 kafka
1 | tableEnv.connect(new kafka().version("0.11") |
4. 表更新模式
对于流式查询,需要声明如何在表和外部连接器之间执行转换,与外部数据系统交换的消息类型,由更新模式(Update Mode
)指定
4.1. 追加(append)模式
仅插入,和外部连接器只交换插入数据
4.2. 撤回(retract)模式
表和外部连接有添加和撤回消息。返回的信息会增加一个标志位,true
表示插入,false
表示删除
- 插入操作编码为
Add
消息 - 删除操作编码为
Retract
消息 - 更新操作编码为上一条的
Retract
消息和下一条的Add
消息
4.3. 更新插入(upsert)模式
- 更新和插入编码为
upsert
消息 - 删除操作编码为
Delete
消息