概念
将无界流切割为有界流,将切割的流数据放到有限大小的桶(bucket)中分析。
一旦应属于该窗口的第一个元素到达,就会创建一个窗口。并且当时间或者数量到达指定数量后,该窗口将被完全删除。
分类
1.滚动窗口(Tumbing Windows)
- 将数据依据固定的长度进行切分
- 时间对齐,窗口长度固定,没有重叠
2. 滑动窗口(Sliding Windows)
- 滑动窗口由固定的窗口长度和滑动间隔构成
- 窗口长度固定,可以叠加
- 滚动窗口是一种窗口长度与滑动间隔相同的滑动窗口
3. 会话窗口(Session Windows)
- 由一系列事件组合一个指定时间长度的时间间隙组成,也就是一段时间没有接到新数据就会关闭
- 特点:时间无对齐
4. 全局窗口(Global Windows)
- 可以设置移除器(
evictor
)和触发器(trigger
),否则不会执行任何计算
API
1. 窗口分配器
1.1. 概念
窗口分配器,windows()
方法必须放在keyby
方法后面,最后必须使用窗口聚合函数。
可以通过扩展WindowAssigner
类来实现自定义窗口分配器。
1.2. 常用创建方法
1.2.1. 滚动窗口
直接声明窗口类型:
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
:event-time 窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
:processing-time 窗口.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
:滚动窗口分配器还采用一个可选 offset 参数,该参数可用于更改窗口的对齐方式。
间接声明窗口类型:
.timeWindow(Time.seconds(15))
:滚动时间窗口,即时间到达 15s 时触发输出,默认使用的是 processing time.countWindow(5)
:滚动计数窗口,即数据量达到 5 时触发输出
1.2.2. 滑动窗口
直接声明窗口类型:
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
:event-time 窗口.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
:processing-time 窗口
间接声明窗口类型:
.timeWindow(Time.seconds(15), Time.seconds(5))
:滑动时间窗口,即时间到达 15s 后滑动 5s,默认使用的是 processing time.countWindow(10, 5)
滑动计数窗口,数据量达到 10 后滑动 5
1.2.3. 会话窗口
静态间隙可以直接设置,动态间隙通过实现SessionWindowTimeGapExtractor
接口指定。
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
:静态间隙的 event-time 窗口.window(EventTimeSessionWindows.withDynamicGap((element) -> { }))
:动态间隙的 event-time 窗口.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
:静态间隙的 processing-time 窗口.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { }))
:动态间隙的 processing-time 窗口
1.2.4. 全局窗口
.window(GlobalWindows.create())
:全局窗口
2. 窗口函数
2.1. 概念
定义了对窗口中数据做的计算操作。
2.2. 分类
- 增量聚合函数
- 每条数据都会触发计算,只保持一个简单状态,比如 sum,只会保存求和的结果,每次更新这个和
ReduceFunction
,AggrengateFunction
- 全窗口函数:
- 先收集窗口数据,等到需要计算时遍历所有数据计算,这种函数适用场景较多
ProcessWindowFunction
,WindowFunction
1 | StreamExecutionEnvironment env = ...; |
2.3. 其他 API
.trigger()
:触发器,定义 window 什么时候关闭,触发计算并输出结果.evictor()
:移除器,定义移除某些数据的逻辑.allowedLateness()
:允许处理迟到的数据.sideOutputLateData()
:将迟到的数据放入侧输出流.getSideOutput()
:获取侧输出流
1 | StreamExecutionEnvironment env = ...; |
注意
- 开窗函数的起始窗口位置,是采用当前时间戳与偏移量的差值与窗口大小取模后的值,作为起始时间点,即
1 | public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { |