Flink 基础-SQL&Table(第六章)

wiki

  • Table API :内嵌在 Java 和 Scala 中的查询 API,
  • Flink SQL :支持基于实现了 SQL 标准的 Apache Calcite

2. 基本使用

2.1. 使用 DataStream 转换 source

  1. 创建环境

    1
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. 接入 source

    1
    DataStream<T> source = ...;
  3. 转换成 POJO

    1
    DataStream<T> dataStream = source.map...;
  4. 引入依赖,创建表环境

    1
    2
    3
    4
    5
    /*
    * 必要:flink-table-planner_2.12 或者 flink-table-planner-blink_2.12
    * 非必:flink-table-api-java-bridge_2.12
    */
    StreamTableEnviroment tableEnv = StreamTableEnviroment.create(env);
  5. 表操作

    1
    2
    3
    4
    5
    6
    7
    8
    // 使用 Table API 查询,可以在第二个参数指定别名
    Table table = tableEnv.fromDataStream(source);
    Table tableApiRes = table.select("id", "name").where("id = '1'");

    // 使用 SQL 查询
    tableEnv.createTemporaryView("tableName", source);
    String sql = "SELECT id, name FROM table WHERE id = '1'";
    Table tableSqlRes = tableEnv.sqlQuery(sql);
  6. 打印结果

    1
    2
    tableEnv.toAppendStream(tableApiRes, Row.class).print("table api result");
    tableEnv.toAppendStream(tableSqlRes, Row.class).print("table sql result");

2.2. 直接将 source 读取为表

  1. 创建表的执行环境

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // 流环境
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    // 批环境
    ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

    // 可选构建配置:流批、planner
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
    //.inStreamMode()
    //.inBatchMode()
    //.useOldPlanner()
    //.useBlinkPlanner()
    .build();

    // 基于流的表环境,老版和新版主要在 planner 选择上有差异
    StreamTableEnviroment tableEnv = StreamTableEnviroment.create(streamEnv, settings);

    // 基于批的表环境,老版,其中参数中传入的是批环境
    BatchTableEnviroment tableEnv = BatchTableEnviroment.create(batchEnv);

    // 基于批的表环境,新版,其中参数传入的是配置,并使用 useOldPlanner()
    TableEnviroment tableEnv = TableEnviroment.create(settings);
  2. 创建表,读取数据

    1
    tableEnv.connect(...).createTemporaryTable("sourceTable");
  3. 注册表,输出结果

    1
    tableEnv.connect(...).createTemporaryTable("sinkTable");
  4. 表操作

    1
    2
    3
    4
    5
    // 使用 Table API 查询
    Table tableApiRes = tableEnv.from("sourceTable").select(...);

    // 使用 SQL 查询
    Table tableSqlRes = tableEnv.sqlQuery("SELECT ...");
  5. 写入结果表

    1
    2
    tableApiRes.insertInto("sinkTable");
    tableSqlRes.insertInto("sinkTable");

3. 表

1. 概念

  • TableEnviroment可以注册目录Catalog,并可以基于Catalog去注册表
  • Table是由一个标识符(identifier)来指定,其由三部分组成
    • Catalog
    • 数据库(database)名
    • 对象名
  • 表分为常规表(Table)和虚拟表(View)
    • 常规表:Table,用来描述外部数据,比如文件、数据库、消息队列,也可以从DataStream转换来
    • 虚拟表:View,通常是 Table API 或 SQL 的结果集

2. 创建表

大致流程如下,需要引入必要的格式化包

1
2
3
4
tableEnv.connect(...)                     // 定义表的数据来源,和外部表建立连接
.withFormat(...) // 定义数据格式化方法,比如切分解析
.withSchema(...) // 定义表结构
.createTemporaryTable("MyTable"); // 创建临时表

2.1. 读取文件中数据

1
2
3
4
5
6
7
8
9
String filePath = "/home/user/table.txt"
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new CSV())
.withSchema(new Schema().field("id", DataTypes.BIGINT())
.field("name", DataTypes.STRING())
)
.createTemporaryTable("sourceTable");

Table table = tableEnv.from("sourceTable");

2.2. 读取 kafka 中数据

1
2
3
4
5
6
7
8
9
10
11
12
tableEnv.connect(new kafka().version("0.11")
.topic("glett-test-source")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.server", "localhost:9092")
)
.withFormat(new CSV()) // csv,avro等
.withSchema(new Schema().field("id", DataTypes.BIGINT())
.field("name", DataTypes.STRING())
)
.createTemporaryTable("sourceTable");

Table table = tableEnv.from("sourceTable");

3. 查询表

  • 使用Table APITable tableRes = table.select("id", "name").where("id = '1'");
  • 使用SQLTable tableRes = tableEnv.sqlQuery("SELECT ...");

4. 输出表

4.1. 输出表数据到文件

1
2
3
4
5
6
7
8
9
10
String filePath = "/home/user/out.txt"
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new CSV())
.withSchema(new Schema().field("id", DataTypes.BIGINT())
.field("name", DataTypes.STRING())
)
.createTemporaryTable("sourceTable");

//由于数据进行了聚合,有数据的更新,即删除和插入,因此无法直接写入
//tableRes.insertInto("sinkTable");

4.2. 输出表数据到 kafka

1
2
3
4
5
6
7
8
9
10
11
12
tableEnv.connect(new kafka().version("0.11")
.topic("glett-test-sink")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.server", "localhost:9092")
)
.withFormat(new CSV()) // csv,avro等
.withSchema(new Schema().field("id", DataTypes.BIGINT())
.field("name", DataTypes.STRING())
)
.createTemporaryTable("sourceTable");

tableRes.insertInto("sinkTable");

4. 表更新模式

对于流式查询,需要声明如何在表和外部连接器之间执行转换,与外部数据系统交换的消息类型,由更新模式(Update Mode)指定

4.1. 追加(append)模式

仅插入,和外部连接器只交换插入数据

4.2. 撤回(retract)模式

表和外部连接有添加和撤回消息。返回的信息会增加一个标志位,true表示插入,false表示删除

  • 插入操作编码为 Add 消息
  • 删除操作编码为 Retract 消息
  • 更新操作编码为上一条的 Retract 消息和下一条的 Add 消息

4.3. 更新插入(upsert)模式

  • 更新和插入编码为 upsert 消息
  • 删除操作编码为 Delete 消息