在使用了flink的状态管理之后,因为此时所有的state的读写都只是在task本地的内存中进行,也就是state数据此时只存储在内存中。假设当任务出现故障之后,这些在内存中的state数据也会丢失,就无法恢复了。所以需要一种机制来保障这些state数据的不丢失,这也就是容错机制。flink通过checkpoint来实现。flink开启了checkpoint之后,会定时将状态数据的快照持久存储到指定的statebackend。
flink定期对整个job任务进行快照,将快照产生的备份数据保存到指定的statebacked中。当出现故障时,将job 的状态恢复到最近的一个快照点。Flink 的容错机制的核心部分是生成分布式数据流和operator状态一致的快照。这些快照充当checkpoint(检查点), 系统可以早发生故障时将其回滚。分布式快照是由 Chandy-Lamport 算法实现的。
每个checkpoint由checkpoint ID和timestamp来唯一标识,其中checkpoint ID可以是standalone(基于内存,保存在jobmanager内存中)的,也可能是基于ZK的。
1、持续的数据源,比如消息队列或者文件系统上的文件等
2、状态数据的持久化存储,比如采用分布式文件系统存储状态数据
默认情况下,flink是禁用了checkpoint的。下面看看程序中开启checkpoint以及相关checkpoint工作参数的配置。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/* -------------启用checkpoint */
//只指定两个checkpoint的时间间隔,单位是毫秒
env.enableCheckpointing(1000);
//指定checkpoint时间间隔,并指定checkpoint的模式,是exactly-once(刚好一次)还是AT_LEAST_ONCE(至少一次)。大多数情况下是exactly-once(默认就是这个模式),少数情况下,如果要求超低延迟的处理情况,才会设置AT_LEAST_ONCE
env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE)
/* -------------设置checkpoint 模式,和上面的类似 */
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
/* -------------设置checkpoint上一个的结束点到下一个开始点之间的最短时间。
因为checkpoint触发时,需要一定时间去完成整个checkpoint的过程,
如果checkpoint的完成时间过程,导致前后两个checkpoint间的时间间隔过短,这是不合适的,没有必要。
1、这里的时间间隔,指的是上一个checkpoint完成的时间点,到下一个checkpoint开始的时间点的间隔,如果过短,会导致频繁checkpoint,影响性能。假设这个间隔为T
2、而上面设置的checkpoint时间间隔,指的是前一个checkpoint的开始时间到下一个checkpoint的开始时间。所以是始终大于1中的时间间隔的。假设这个间隔为 N
如果T小于这里设置的值,那么无论N设置多少,下一个checkpoint的开始时间必须是500ms之后。如果T大于这里设置的值,那么正常按照N设置的间隔来触发下一个checkpoint,这里设置的间隔无关了。
*/
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
/* -------------设置checkpoint完成的超时时间 */
env.getCheckpointConfig().setCheckpointTimeout(60000);
/* -------------设置checkpoint的最大并行度 */
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
/* 开启checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state
DELETE_ON_CANCELLATION:在job canceled的时候会自动删除外部的状态数据,但是如果是FAILED的状态则会保留;
RETAIN_ON_CANCELLATION:在job canceled的时候会保留状态数据*/
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
/* 当有更近的保存点时,优先采用savepoint来恢复成检查点*/
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
在conf/flink-conf.yaml中可以配置的参数
参数 | 默认值 | 作用 |
---|---|---|
state.backend | (无) | 用于存储状态的检查点数据的后端。有三种backend,如:jobmanager(MemoryStateBackend,默认是这个),filesystem(FsStateBackend),rocksdb(RocksDBStateBackend)。 |
state.backend.async | true | 是否异步快照 |
state.checkpoints.dir | 无 | checkpoint的目录,例如:hdfs://192.168.1.1/checkpoint |
state.backend.incremental | false | 是否选择增量检查点。即每次快照都只存储上一个检查点的差异数据,而不是完整的数据。可能某些后端不支持这种方式 |
state.checkpoints.num-retained | 1 | 要保留的已完成检查点的最大数量。 |
state.savepoints.dir | 无 | 保存点默认目录 |
checkpoint:
1、检查点的主要目的是在job意外失败时提供恢复机制。
2、Checkpoint的生命周期由Flink管理,即Flink创建,拥有和发布Checkpoint - 无需用户交互。
3、作为一种恢复和定期触发的方法,Checkpoint主要的设计目标是:创建checkpoint,是轻量级的和尽可能快地恢复
savepoint:
1、Savepoints由用户创建,拥有和删除。
2、他们一般是有计划的进行手动备份和恢复。而checkpoint的恢复只会发生在故障时
3、例如,在Flink版本需要更新的时候,或者更改你的流处理逻辑,更改并行性等等。
在这种情况下,我们往往需要关闭一下流,这就需要我们将流中的状态进行存储,后面重新部署job的时候进行会用来恢复。
4、从概念上讲,Savepoints的生成和恢复成本可能更高,并且更多地关注可移植性和对前面提到的作业更改的支持
命令用法:
flink savepoint jobid target_dir
例子:
保存状态数据到指定目录:
flink savepoint xxxxxxxx(哈希码) hdfs://ronnie01:8020/data/flink/savepoint
重启和恢复数据流(也可用于从checkpoint恢复数据流):
flink run -s hdfs://ronnie01:8020/data/flink/savepoint/savepoint-xxxxx-xxxxxxxxx -c com.ronnie.flink.stream.test.CheckPointTest flink-test.jar
-s 指定savepoint/checkpoint目录的存储目录
-c 指定运行的主类的全类名
checkpoint数据可存储方式有不同,flink支持三种:
MemoryStateBackend(内存状态)
FsStateBackend(文件状态)
RocksDBStateBackend(RocksDB状态)
1、概念
MemoryStateBackend将State作为Java对象保存(在堆内存),存储着key/value状态、window运算符、触发器等的哈希表。在Checkpoint时,State Backend将对State进行快照,并将其作为checkpoint发送到JobManager机器上,JobManager将这个State数据存储在Java堆内存。MemoryStateBackend默认使用异步快照,来避免阻塞管道。如果需要修改,可以在MemoryStateBackend的构造函数将布尔值改为false(仅用于调试)。
2、注意点
异步快照方式时,operator操作符在做快照的同时也会处理新流入的数据,默认异步方式
同步快照方式:operator操作符在做快照的时候,不会处理新流入的数据,同步快照会增加数据处理的延迟度。
3、局限性
单次状态大小最大默认被限制为5MB,这个值可以通过构造函数来更改。
无论单次状态大小最大被限制为多少,都不可用大过akka的frame大小。
聚合的状态都会写入jobmanager的内存
4、适用场景
本地开发和调试
状态比较少的作业
1、概念
FsStateBackend将正在运行的数据保存在TaskManager的内存中。在checkpoint时,它将State的快照写入文件系统对应的目录下的文件中。最小元数据存储在JobManager的内存中(如果是高可用模式下,元数据存储在checkpoint中)。FsStateBackend默认使用异步快照,来避免阻塞处理的管道。如果需要禁用,在FsStateBackend构造方法中将布尔值设为false
2、适用场景
状态比较大, 窗口比较长, 大的 KV 状态
需要做 HA 的场景
1、概念
此种方式kv state需要由rockdb数据库来管理,这是和内存或file backend最大的不同,即状态数据是直接写入到rockdb的,不像前面两种,只有在checkpoint的时候才会将数据保存到指定的backend。RocksDBStateBackend使用RocksDB数据库保存数据,这个数据库保存在TaskManager的数据目录中。注意的是:RocksDB,它是一个高性能的Key-Value数据库。数据会放到先内存当中,在一定条件下触发写到磁盘文件上。
在 checkpoint时, 整个 RocksDB数据库的数据会快照一份, 然后存到配置的文件系统中(一般是 hdfs)。同时, Apache Flink将一些最小的元数据存储在 JobManager 的内存或者 Zookeeper 中(对于高可用性情况)。RocksD始终配置为执行异步快照
2、适用场景
RocksDBStateBackend适用于非常大的状态,长窗口、大键值状态的高可用作业。
RocksDBStateBackend是目前唯一可用于支持有状态流处理应用程序的增量检查点
方式1:
直接在 conf/flink-conf.yaml 中指定 state.backend 就是默认程序的backend。
jobmanager(MemoryStateBackend,默认是这个)
filesystem(FsStateBackend)
rocksdb(RocksDBStateBackend)
方式2:
在程序中指定自己想使用的backend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置状态后端
env.setStateBackend(xxx);
三种类型的实现类:
// 默认使用内存的方式存储状态值, 单位快照的状态上限为10MB, 使用同步方式进行快照。单个状态大小可以设置,单位是byte
env.setStateBackend(new MemeoryStateBackend(10*1024*1024, false));
// 使用 FsStateBackend的方式进行存储, 并且是同步方式进行快照
env.setStateBackend(new FsStateBackend("hdfs://namenode....", false));
// 使用 RocksDBStateBackend方式存储, 并采用增量的快照方式进行存储。后面的true表示增量
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode....", true));
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。