11、Flink实战:容错机制(一)State,CheckPoint及重启策略简介

Flink的流计算是带状态的计算,为例更好的容错,引入了State和CheckPoint。

一、简介

1、 State一般指一个具体的Task/Operator的状态,State数据默认保持在Java的堆内存中;

2、 CheckPoint(可以理解为CheckPoint是把State数据持久化存储了),则表示了一个FlinkJob在一个特定时刻的一份全局状态快照(可以设置快照生成周期),即包含了所有Task/Operator的状态;

# 此处的Task是Flink中执行的基本单位,也有的地方叫SubTask;Operator是算子(Transformation)。

3、 Flink中有以下两种基本类型的State:KeyedState和OperatorState;

3、 1KeyedState是跟特定的Key绑定的状态,每一个Key都对应一个State;

3、 2OperatorState是全局的状态,与具体的key无关,整个Operator只对应一个State;

二、Keyed State

stream.keyby()会返回一个KeyedStream对象,与之对应的就是Keyed State。Flink针对Keyed State提供了以下可以保存State的数据结构。

1、 ValueState:类型为T的单值状态,可通过update方法更新状态值,通过value()方法获取值;

2、 ListState:Key上的状态值为一个列表,可以通过add方法往列表中添加值,也可以通过get()方法返回一个Iterable来遍历状态值;

3、 ReducingState:每次调用add方法添加值的时候,会调用用户传入的reduceFunction,最后合并到一个单一的状态值;

4、 MapState<K,V>:状态值是一个Map,用户通过put或putAll方法添加元素;

注:以上所述的State对象,仅仅用于与状态进行交互(更新,删除,清空等),而真正的状态值可能存在于内存、磁盘或其他分布式存储系统中,相当于我们只是持有了这个状态的句柄。

三、Operator State

Flink 针对Operator State 提供了以下可以保存State的数据结构 ListState

四、CheckPoint

CheckPoint是Flink实现容错的核心功能,它能根据配置周期性地基于Stream中各个Operator/Task的状态来生成快照,从而将这些状态数据定期持久化存储下来。Flink程序一旦意外崩溃,重启时可以有选择地从持久化的快照中进行恢复。

CheckPoint发挥作用有两个前提:

(1)需要持久化的source,它需要支持在一定时间内重放事件,典型的是消息队列(如Kafka, RabbitMQ)或文件系统。

(2)需要有用于State的持久化存储介质,如HDFS。

五、StateBackend

默认情况下,State会保存在TaskManager的内存中,CheckPoint会存储在JobManager的内存中。State和CheckPoint的存储位置取决于StateBackend的配置,Flink一共提供了3种StateBackend。

1MemoryStateBackend: State数据保存在Java堆内存中,执行CheckPoint的时候,会把State的快照数据保存到JobManager的内存中,基于内存的StateBackend不建议在生产环境中使用。

2FsStateBackend:State数据保存在TaskManager的内存中,执行CheckPoint的时候,把State的快照数据保存到配置的文件系统中,如果HDFS。

3RocksDBStateBackend: 在本地文件系统中维护状态,State会直接写入本地的RocksDB中。同时它需要配置一个远端的FileSystem URI(一般是HDFS),在进行CheckPoint的时候,会把本地的数据直接复制到远端的FileSystem中。程序重启的时候,会直接从远端的Filesystem中恢复数据到本地。RocksDB克服了State受内存限制的缺点,同时又能持久化到远端文件系统中,推荐在生成中使用。

六、重启策略

默认重启策略通过配置文件flink-conf.yaml中的Restart-strategy参数指定。常用策略有以下三种:

1、 固定间隔(Fixeddelay)如果启用了CheckPoint,但没有配置重启策略,则使用固定间隔策略;

2、 失败率(Failurerate):达到一定的失败率后就不重启了;

3、 无重启(Norestart);

七、SavePoint

SavePoint可以生成全局、一致性的快照,也可以保存数据源、Offset、Operator操作状态等信息,还可以从应用在过去任意做了SavePoint的时刻开始继续执行。

1、 SavePoint和CheckPoint的区别:;

(1)CheckPoint是应用定时触发,用于保存状态,它会过期,在内部应用失败重启的时候使用。

(2)SavePoint是用户手动执行,是指向CheckPoint的指针,它不会过期,可在升级的情况下使用。

2、 开启SavePoint;

(1)在flink-conf.yaml中配置SavePoint的存在位置

state.savepoint.dir:savepointPath

(2)触发一个SavePoint

直接触发:bin/flink savepoint jobid [targetDirectory] [-yid yarnAppid]

调用canel的时候触发:bin/flink cancel -s [targetDirectory] jobid [-yid yarnAppid]

(3)从指定的SavePoint启动Job

bin/flink run -s savepointPath [runArgs] 或者从 web界面中指定。

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: