Flink 基础-窗口(第四章)

概念

将无界流切割为有界流,将切割的流数据放到有限大小的桶(bucket)中分析。

一旦应属于该窗口的第一个元素到达,就会创建一个窗口。并且当时间或者数量到达指定数量后,该窗口将被完全删除。

分类

1.滚动窗口(Tumbing Windows)

  • 将数据依据固定的长度进行切分
  • 时间对齐,窗口长度固定,没有重叠

2. 滑动窗口(Sliding Windows)

  • 滑动窗口由固定的窗口长度和滑动间隔构成
  • 窗口长度固定,可以叠加
  • 滚动窗口是一种窗口长度与滑动间隔相同的滑动窗口

3. 会话窗口(Session Windows)

  • 由一系列事件组合一个指定时间长度的时间间隙组成,也就是一段时间没有接到新数据就会关闭
  • 特点:时间无对齐

4. 全局窗口(Global Windows)

  • 可以设置移除器(evictor)和触发器(trigger),否则不会执行任何计算

API

1. 窗口分配器

1.1. 概念

窗口分配器,windows() 方法必须放在keyby方法后面,最后必须使用窗口聚合函数。

可以通过扩展WindowAssigner类来实现自定义窗口分配器。

1.2. 常用创建方法

1.2.1. 滚动窗口

直接声明窗口类型:

  • .window(TumblingEventTimeWindows.of(Time.seconds(5))):event-time 窗口
  • .window(TumblingProcessingTimeWindows.of(Time.seconds(5))):processing-time 窗口
  • .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))):滚动窗口分配器还采用一个可选 offset 参数,该参数可用于更改窗口的对齐方式。

间接声明窗口类型:

  • .timeWindow(Time.seconds(15)):滚动时间窗口,即时间到达 15s 时触发输出,默认使用的是 processing time
  • .countWindow(5):滚动计数窗口,即数据量达到 5 时触发输出
1.2.2. 滑动窗口

直接声明窗口类型:

  • .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))):event-time 窗口
  • .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))):processing-time 窗口

间接声明窗口类型:

  • .timeWindow(Time.seconds(15), Time.seconds(5)):滑动时间窗口,即时间到达 15s 后滑动 5s,默认使用的是 processing time
  • .countWindow(10, 5)滑动计数窗口,数据量达到 10 后滑动 5
1.2.3. 会话窗口

静态间隙可以直接设置,动态间隙通过实现SessionWindowTimeGapExtractor接口指定。

  • .window(EventTimeSessionWindows.withGap(Time.minutes(10))):静态间隙的 event-time 窗口
  • .window(EventTimeSessionWindows.withDynamicGap((element) -> { })):动态间隙的 event-time 窗口
  • .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))):静态间隙的 processing-time 窗口
  • .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { })):动态间隙的 processing-time 窗口
1.2.4. 全局窗口
  • .window(GlobalWindows.create()):全局窗口

2. 窗口函数

2.1. 概念

定义了对窗口中数据做的计算操作。

2.2. 分类

  • 增量聚合函数
    • 每条数据都会触发计算,只保持一个简单状态,比如 sum,只会保存求和的结果,每次更新这个和
    • ReduceFunctionAggrengateFunction
  • 全窗口函数:
    • 先收集窗口数据,等到需要计算时遍历所有数据计算,这种函数适用场景较多
    • ProcessWindowFunctionWindowFunction
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
StreamExecutionEnvironment env = ...;
DataStream<String> source = env...;

// 增量聚合函数
DateStream<String> sink1 =
source.keyBy(...)
.timeWindow(Time.seconds(5))
// 聚合函数一
.reduce(new ReduceFunction<String>() {
@override
public String reduce(Strng s1, String s2) throw Exception {
return ...;
}
})
// 聚合函数二,参数ACC是累加器
.aggregate(new AggrengateFunction<IN, ACC, OUT>() {

@override
public ACC createAccumulator() {
// 返回累加器初始值
return ...;
}

@override
public ACC add(IN value, ACC accumulator) {
return ...;
}

@override
public OUT getResult(ACC accumulator) {
return ...;
}

@override
public ACC merge(ACC a, ACC b) {
return ...;
}
});

// 全窗口函数
DateStream<String> sink2 =
source.keyBy(...)
.timeWindow(Time.seconds(5))
// 函数一,参数 KEY 是分组,W是当前窗口
.apply(new WindowFunction(IN, OUT, KEY, W) {
@override
public void apply(KEY key, TimeWindow window, Iterable<IN> in, Collector<OUT> out) throw exception {
...
}
});

2.3. 其他 API

  • .trigger():触发器,定义 window 什么时候关闭,触发计算并输出结果
  • .evictor():移除器,定义移除某些数据的逻辑
  • .allowedLateness():允许处理迟到的数据
  • .sideOutputLateData():将迟到的数据放入侧输出流
  • .getSideOutput():获取侧输出流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
StreamExecutionEnvironment env = ...;
DataStream<String> source = env...;

// 定义侧输出流便签
OutputTag<IN> outputTag = new OutputTag<IN>("sideOutputName");

DateStream<String> sink =
source.keyBy(...)
.timeWindow(Time.seconds(5))
// 函数一
.trigger(new Trigger(...))
// 函数二
.evictor(new Evictor(...))
// 函数三,输出结果后,不关闭窗口,延迟 10s 后关闭,此时仍会在当前窗口进行统计,来一个处理一次
.allowedLateness(Time.second(10))
// 函数四,可以将窗口关闭后的数据放到侧输出流里
.sideOutputLateData(outputTag);

// 处理侧输出流
DateStream<String> side = source.getSideOutput(outputTag);

注意

  1. 开窗函数的起始窗口位置,是采用当前时间戳与偏移量的差值与窗口大小取模后的值,作为起始时间点,即
1
2
3
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize
}