Flink 基础-状态(第八章)

wiki

与之前的数据无关的数据,就是无状态,比如 map 等;需要其他数据才能得出结果的就是有状态,比如 sum。

状态可以认为是一个本地变量,保存着之前的结果,可以被任务的业务数据访问。

状态分类

1. 算子状态(Operatior State)

仅限于算子任务。

  • 列表状态(list state):将状态表示为一组数据列表
  • 联合列表状态(union list state):也是列表状态的,区别在于使用算法不用
  • 广播状态(broadcast state):如果算子有多项任务,而它的每项任务状态有都相同,这种情况最适合广播状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class MyMapFunction implements MapFunction<String, Integer>, ListCheckpointed<Integer> {

@override
public Integer map(String str) throws Exception {
...
}

@override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
...
}

@override
public Integer restoreState(List<Integer> state) throws Exception {
...
}
}

2. 键控状态(Keyed State)

根据输入数据流中定义的键来维护和访问。flink 为每个 key 维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务。

  • 值状态(Value State):表示为单个值
  • 列表状态(List State):表示为一组数据的列表
  • 映射状态(Map State):表示为一组K-V对
  • 聚合状态(Reducing State & Aggregating State):表示为一个用于聚合操作的列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static class MyMapFunction extends RichMapFunction<String, Integer> {

// 值状态
private ValueState<Integer> keyState;
// 列表状态
private ListState<Integer> listState;
// 映射状态
private MapState<String, Integer> mapState;
// 聚合状态
private ReducingState<Integer> reducingState;

@override
public void open(Configuration parameters) throws Exception {
keyState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("keyName", Integer.class));
listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("listName", Integer.class));
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("mapName", String.class, Integer.class));
reducingState = getRuntimeContext()
.getReducingState(new ReducingStateDescriptor<Integer>("reducingName",new MyReducFunction<Integer>(), Integer.class));
...
}

@override
public Integer map(String str) throws Exception {
Integer i = keyState.value();
Iterable<Integer> ls = listState.get();
Integer mi = mapState.get("...");
reducingState.add(str); //此时会调用第二个参数 MyReducFunction<Integer>()
...
}

}

状态后端(State Backends)

1. 概念

状态的存储、访问和维护,由一个可插入的组件决定,这个组件就是状态后端。其主要负责两件事:

  • 本地状态管理
  • 检车点(checkpoint)状态持久化

2. 分类

  • MemoryStateBackend
    • 内存级,会将键控状态作为内存中的对象进行管理,将之存储在 TaskManager 的 JVM 上,将 checkpoint 存储在 JobManager 的内存中。
    • 特点:快速、低延迟、但不稳定
  • FsStateBackend
    • 本地状态也存储在 TaskManager 的 JVM 上,将 checkpoint 存储在远程文件系统中。
    • 保证了本地访问速度,也有更好的容错性
  • RocksDBStateBackend
    • 将所有状态序列化后,存入本地的 RocksDB 中存储。
    • 容量大,但是访问速度没有在内存中快

3. 使用

3.1. 配置文件

  • state.backend: filesystem|jobmanager|rocksdb|<class-name-of-factory>
  • state.checkpoint.dir
  • state.savepoint.dir
  • state.backend.incremental: false|true

3.2. 代码中使用

1
2
3
4
5
StreamExecutionEnvironment env = ...;
env.setStateBackend(new MemoryStateBackend(...));
env.setStateBackend(new FsStateBackend(...));
// 需要引入 flink-statebackend-rocksdb_X.XX
env.setStateBackend(new RocksDBStateBackend(...));