wiki
与之前的数据无关的数据,就是无状态,比如 map 等;需要其他数据才能得出结果的就是有状态,比如 sum。
状态可以认为是一个本地变量,保存着之前的结果,可以被任务的业务数据访问。
状态分类
1. 算子状态(Operatior State)
仅限于算子任务。
- 列表状态(list state):将状态表示为一组数据列表
- 联合列表状态(union list state):也是列表状态的,区别在于使用算法不用
- 广播状态(broadcast state):如果算子有多项任务,而它的每项任务状态有都相同,这种情况最适合广播状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static class MyMapFunction implements MapFunction<String, Integer>, ListCheckpointed<Integer> { @override public Integer map(String str) throws Exception { ... } @override public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { ... } @override public Integer restoreState(List<Integer> state) throws Exception { ... } }
|
2. 键控状态(Keyed State)
根据输入数据流中定义的键来维护和访问。flink 为每个 key 维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务。
- 值状态(Value State):表示为单个值
- 列表状态(List State):表示为一组数据的列表
- 映射状态(Map State):表示为一组K-V对
- 聚合状态(Reducing State & Aggregating State):表示为一个用于聚合操作的列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public static class MyMapFunction extends RichMapFunction<String, Integer> { private ValueState<Integer> keyState; private ListState<Integer> listState; private MapState<String, Integer> mapState; private ReducingState<Integer> reducingState; @override public void open(Configuration parameters) throws Exception { keyState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("keyName", Integer.class)); listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("listName", Integer.class)); mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("mapName", String.class, Integer.class)); reducingState = getRuntimeContext() .getReducingState(new ReducingStateDescriptor<Integer>("reducingName",new MyReducFunction<Integer>(), Integer.class)); ... } @override public Integer map(String str) throws Exception { Integer i = keyState.value(); Iterable<Integer> ls = listState.get(); Integer mi = mapState.get("..."); reducingState.add(str); ... } }
|
状态后端(State Backends)
1. 概念
状态的存储、访问和维护,由一个可插入的组件决定,这个组件就是状态后端。其主要负责两件事:
- 本地状态管理
- 检车点(checkpoint)状态持久化
2. 分类
MemoryStateBackend
- 内存级,会将键控状态作为内存中的对象进行管理,将之存储在 TaskManager 的 JVM 上,将 checkpoint 存储在 JobManager 的内存中。
- 特点:快速、低延迟、但不稳定
FsStateBackend
- 本地状态也存储在 TaskManager 的 JVM 上,将 checkpoint 存储在远程文件系统中。
- 保证了本地访问速度,也有更好的容错性
RocksDBStateBackend
- 将所有状态序列化后,存入本地的 RocksDB 中存储。
- 容量大,但是访问速度没有在内存中快
3. 使用
3.1. 配置文件
state.backend: filesystem|jobmanager|rocksdb|<class-name-of-factory>
state.checkpoint.dir
state.savepoint.dir
state.backend.incremental: false|true
3.2. 代码中使用
1 2 3 4 5
| StreamExecutionEnvironment env = ...; env.setStateBackend(new MemoryStateBackend(...)); env.setStateBackend(new FsStateBackend(...));
env.setStateBackend(new RocksDBStateBackend(...));
|