Flink 基础-watermark(第五章)

时间语义

  • Event Time:事件创建时间
  • Ingestion Time:数据进入 Flink 的时间
  • Processing Time:执行操作算子的本地时间,即机器时间

watermark

1. 概念

  • 当使用Event Time作为处理时间时,此时会根据数据的时间戳来处理数据,由于网络、分布式等原因,会产生乱序数据
  • watermark就是衡量Event Time进展的机制,一般结合window来实现,处理乱序时间时,可以设定延迟触发关闭窗口时间,可以说window的执行是watermark触发的
  • watermark是一条特殊的时间记录,并且需要单调递增,与数据时间戳相关

2. 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 创建流式环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置流式处理时间语义,设置而事件时间
env.setStresmTimeCharacteristic(TimeCharacteristic.EventTime);

// source
DataStream<String> input = env...;

// sink
DataStream<String> output = input.map(...)
// 声明方式一,乱序数据
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtrator<String>(Time.Seconds(5)) {
@override
public long extractTimestamp(String element) {
// 返回数据中的时间戳
return LocalDateTime.parse(element, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
})
// 声明方式二,某些情况下,数据本来就是升序时间时,可以使用这种方式
.assignTimestampsAndWatermarks(new AscendingTimestampExtrator<String>() {
@override
public long extractAscendingTimestamp(String element) {
// 返回数据中的时间戳
return LocalDateTime.parse(element, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
})

;
env.execute();

3. 自定义 watermark

可以继承自一下两种,两者都继承自TimestampAssigner

  • AssignerWithPeriodicWatermarks

    • 周期性(一定时间间隔或者达到一定的记录条数)的生成 watermark:系统会周期性的将 watermark 插入到流中
    • 默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置
    • 升序和前面乱序的处理 BoundedOutOfOrdernessTimestampExtractor,都是基于周期性 watermark 的。
  • AssignerWithPunctuatedWatermarks

    • 没有时间周期规律,可打断的生成 watermark
    • 数据流中每一个递增的时间都会产生一个 watermark