Flink的流计算是带状态的计算,为例更好的容错,引入了State和CheckPoint。
一、简介
1、 State一般指一个具体的Task/Operator的状态,State数据默认保持在Java的堆内存中;
2、 CheckPoint(可以理解为CheckPoint是把State数据持久化存储了),则表示了一个FlinkJob在一个特定时刻的一份全局状态快照(可以设置快照生成周期),即包含了所有Task/Operator的状态;
# 此处的Task是Flink中执行的基本单位,也有的地方叫SubTask;Operator是算子(Transformation)。
3、 Flink中有以下两种基本类型的State:KeyedState和OperatorState;
3、 1KeyedState是跟特定的Key绑定的状态,每一个Key都对应一个State;
3、 2OperatorState是全局的状态,与具体的key无关,整个Operator只对应一个State;
二、Keyed State
stream.keyby()会返回一个KeyedStream对象,与之对应的就是Keyed State。Flink针对Keyed State提供了以下可以保存State的数据结构。
1、 ValueState
2、 ListState
3、 ReducingState
4、 MapState<K,V>:状态值是一个Map,用户通过put或putAll方法添加元素;
注:以上所述的State对象,仅仅用于与状态进行交互(更新,删除,清空等),而真正的状态值可能存在于内存、磁盘或其他分布式存储系统中,相当于我们只是持有了这个状态的句柄。
三、Operator State
Flink 针对Operator State 提供了以下可以保存State的数据结构 ListState
四、CheckPoint
CheckPoint是Flink实现容错的核心功能,它能根据配置周期性地基于Stream中各个Operator/Task的状态来生成快照,从而将这些状态数据定期持久化存储下来。Flink程序一旦意外崩溃,重启时可以有选择地从持久化的快照中进行恢复。
CheckPoint发挥作用有两个前提:
(1)需要持久化的source,它需要支持在一定时间内重放事件,典型的是消息队列(如Kafka, RabbitMQ)或文件系统。
(2)需要有用于State的持久化存储介质,如HDFS。
五、StateBackend
默认情况下,State会保存在TaskManager的内存中,CheckPoint会存储在JobManager的内存中。State和CheckPoint的存储位置取决于StateBackend的配置,Flink一共提供了3种StateBackend。
1MemoryStateBackend: State数据保存在Java堆内存中,执行CheckPoint的时候,会把State的快照数据保存到JobManager的内存中,基于内存的StateBackend不建议在生产环境中使用。
2FsStateBackend:State数据保存在TaskManager的内存中,执行CheckPoint的时候,把State的快照数据保存到配置的文件系统中,如果HDFS。
3RocksDBStateBackend: 在本地文件系统中维护状态,State会直接写入本地的RocksDB中。同时它需要配置一个远端的FileSystem URI(一般是HDFS),在进行CheckPoint的时候,会把本地的数据直接复制到远端的FileSystem中。程序重启的时候,会直接从远端的Filesystem中恢复数据到本地。RocksDB克服了State受内存限制的缺点,同时又能持久化到远端文件系统中,推荐在生成中使用。
六、重启策略
默认重启策略通过配置文件flink-conf.yaml中的Restart-strategy参数指定。常用策略有以下三种:
1、 固定间隔(Fixeddelay)如果启用了CheckPoint,但没有配置重启策略,则使用固定间隔策略;
2、 失败率(Failurerate):达到一定的失败率后就不重启了;
3、 无重启(Norestart);
七、SavePoint
SavePoint可以生成全局、一致性的快照,也可以保存数据源、Offset、Operator操作状态等信息,还可以从应用在过去任意做了SavePoint的时刻开始继续执行。
1、 SavePoint和CheckPoint的区别:;
(1)CheckPoint是应用定时触发,用于保存状态,它会过期,在内部应用失败重启的时候使用。
(2)SavePoint是用户手动执行,是指向CheckPoint的指针,它不会过期,可在升级的情况下使用。
2、 开启SavePoint;
(1)在flink-conf.yaml中配置SavePoint的存在位置
state.savepoint.dir:savepointPath
(2)触发一个SavePoint
直接触发:bin/flink savepoint jobid [targetDirectory] [-yid yarnAppid]
调用canel的时候触发:bin/flink cancel -s [targetDirectory] jobid [-yid yarnAppid]
(3)从指定的SavePoint启动Job
bin/flink run -s savepointPath [runArgs] 或者从 web界面中指定。
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: