Apache Flink 是一个实时数据流处理引擎。大多数流处理应用程序都是“有状态”的。这意味着状态被存储并用于进一步处理。在 Apache Flink 中,状态通过配置的状态后端进行管理。Flink 在生产中支持两种状态后端。一种是 HashMapStateBackend
,另一种是 EmbeddedRocksDBStateBackend
。
为了防止数据丢失并实现容错性,Flink 可以将状态的快照持久化到可靠的存储中。Flink 可以配置为将整个状态快照到可靠位置,或者自上次快照以来的增量。前者称为全量检查点,后者称为增量检查点。
在本文中,我们将比较带有全量检查点的 HashMapStateBackend
和带有增量检查点的 EmbeddedRocksDBStateBackend
。本文假定读者对Apache Flink要么有工作知识,要么有理论知识。
Flink 状态后端概述
要理解 Flink 状态后端,重要的是要了解在途状态和状态快照之间的区别。
在途状态被称为 Flink 的工作状态,并存储在本地。根据状态后端的配置,它可以是堆内存或堆外内存,可能会溢出到本地磁盘。
另一方面,状态快照(检查点或保存点)存储在持久的远程位置。这些快照用于在作业失败时重建 Flink 作业状态。
如果作业失败,正在处理的状态可能会丢失。如果作业中启用了检查点,这不会影响作业恢复。当配置了检查点时,状态会在恢复时从持久存储中检索出来。
生产环境中选择哪种状态后端取决于应用程序对吞吐量、延迟和可扩展性的要求。
Apache Flink 在生产环境中支持两种状态后端。
1. HashMapStateBackend
它是 Flink 中用于管理键控状态和操作状态的轻量级状态后端。状态存储在 Java 堆中,使用 HashMap 数据结构。由于存储在内存中,主要约束在于最大状态大小受限于 Java 堆大小。在写入状态或从状态中读取时不涉及序列化。因此,这适用于低延迟、高吞吐量和状态不是很大的应用程序。
2. EmbeddedRocksDBStateBackend
这个状态后端将正在传输的数据存储在内存中的RocksDB数据库中。默认情况下,RocksDB将数据存储在任务管理器的本地磁盘中。数据被序列化并存储在堆外内存中,并溢出到附加到任务管理器的本地磁盘。序列化格式取决于应用程序中配置的类型序列化程序。
使用这个状态后端,可以存储的状态量仅受限于连接到任务管理器的磁盘空间。如果应用程序具有巨大的状态并且无法包含在堆内存中,则这是正确的状态后端。由于涉及序列化,与HashMapStateBackend
相比,应用程序的延迟会更高,吞吐量会更低。
快照状态概述
快照表示Flink作业的全局状态。这包括每个数据源的指针以及从这些指针开始处理后所有Flink有状态操作符的状态。Apache Flink中的检查点是通过定期将状态保存到持久远程存储来实现容错性的机制。
在作业失败的情况下,Flink从持久远程存储中检索存储的状态,并从之前停止的地方开始处理流数据。Flink使用异步屏障快照。这是Chandy-Lamport算法的一种变体。
Flink支持两种类型的检查点。
1. 完整检查点
完整的检查点是指将整个 Flink 作业状态捕获并存储在持久的远程存储中。在作业失败的情况下,作业会从先前存储的状态中恢复。存储空间需求和执行检查点所需的时间完全取决于应用程序状态。完整检查点适用于 HashMapStateBackend
和 RocksDBStateBackend
。
2. 增量检查点
增量检查点是一种优化的方法。Flink 不是对整个状态进行快照,而是仅保存自上次检查点以来对状态所做的“增量”。这降低了网络开销,从而减少了检查点所需的时间。在这种情况下,检查点完全异步进行。
只有 RocksDBStateBackend
支持增量检查点。Flink 利用 RocksDB 的内部机制实现此功能。尽管检查点所需的时间比完整检查点少,但在作业失败的情况下,恢复时间取决于许多因素。如果网络是瓶颈,恢复时间可能高于从完整检查点恢复。
分析
管道细节:Apache Beam 管道在 Flink 引擎上运行,固定窗口为 10 分钟,检查点配置为每 3 分钟运行一次。配置的序列化类型为 AVRO。
- 集群类型:”m5dn.4xlarge”
- 最终检查点存储:S3
- 唯一键的数量:2K
Input rate 10k/s (~ 7.8 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
键入 | 编号: 的TM |
并行性 | 堆分配 每个TM |
堆使用率 每个TM |
Pod内存使用情况 每个TM |
CPU使用率 每个TM |
检查点 大小 |
检查点 持续时间 |
Flink托管 内存 |
HashMapState 使用完整检查点 |
1 | 1 | 10GB | 8.5GB | 11.1GB | 1.2 | 4GB | 50秒 | 0 |
RocksDBState 使用带有增量检查点的AVRO |
1 | 1 | 3GB | 1.82GB | 4.63GB | 1.5 | 207MB | 3秒 | 3GB |
Input rate 20k/s (~15.6 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
键入 | 编号: 的TM |
并行性 | 堆分配 每个TM |
堆使用率 每个TM |
Pod使用率 每个TM |
CPU使用率 每个TM |
检查点 大小 |
检查点 持续时间 |
Flink托管 内存 |
HashMapState 使用完整检查点 |
2 | 2 | 10GB | 8.69GB | 11.2GB | 1.3 | 8.39GB | 50秒 | 0 |
RocksDBState 带增量 使用AVRO进行检查点 |
2 | 2 | 3GB | 1.87GB | 4.71GB | 1.4 | 404MB | 3秒 | 3GB |
input rate 30k/s (~23.5 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
类型 | 不: 的TM数量 |
并行度 | 堆分配 每个TM |
堆使用情况 每个TM |
Pod使用情况 每个TM |
CPU使用情况 每个TM |
检查点 大小 |
检查点 持续时间 |
Flink管理 内存 |
HashMapState 带完整检查点 |
3 | 3 | 10GB | 9.74GB | 11.2GB | 1.2 | 11.8GB | 65秒 | 0 |
RocksDBState 带增量 使用AVRO进行检查点 |
3 | 3 | 3GB | 1.72GB | 3.75GB | 1.4 | 576MB | 5秒 | 3GB |
如上实验结果可见,增量检查点可以缩短检查点持续时间,这有助于提升应用程序性能。
总结
以下是实验的总结。
HashMapStateBackend带完整检查点 | 具有增量检查点的RocksDBStateBackend | |
应用延迟 | 低延迟,因为数据存储为Java对象在堆中。读写不涉及任何序列化。 | 由于每次读取或写入应用都涉及序列化,因此延迟会更高。 |
可伸缩性 | 对具有大状态的作业来说可扩展性较低 | 对具有大状态和状态变化缓慢的作业来说可扩展性极高 |
容错性 | 高容错性 | 高容错性 |
检查点持续时间 | 检查点持续时间较长,因为每次都为整个数据集进行快照。 | 检查点持续时间较短,因为只保存自上次检查点以来的增量。 |
恢复复杂性 | 恢复很容易,因为只需加载一个快照。 | 恢复复杂,因为RocksDB必须从多个检查点构建状态,而且很大程度上取决于网络速度。 |
存储需求 | HashMapStateBackend和RocksDBStatebackend均支持。 | 仅RocksDBStatebackend支持。 |
状态快照 | 每次检查点都保存整个状态。 | 仅保存自上次成功检查点以来的增量。 |
堆大小 | 由于状态在检查点之前存储在堆中,因此堆要求很高,预计会有更多的GC循环。 | 状态存储在堆外并可能存储在本地磁盘上,因此占用堆空间较少,GC 循环也较少。 |
状态后端大小 | 受限于分配给 JVM 的最大堆大小。 | RocksDB 状态后端大小不受 JVM 堆限制,仅受可用磁盘空间限制。 |
性能影响 | 处理性能影响较大,因为它是完整快照。 | 处理性能影响较小,因为只快照增量。 |
CPU | CPU 使用仅用于处理和 GC。不涉及状态后端序列化。 | 与完整检查点相比,CPU 使用率较高,数据输入速率相同。
通过应用适当的序列化机制可以优化 CPU 利用率。我们尝试了 Avro,与 Kryo 相比获得了更好的结果。 |
最佳用例 | 适用于较小的状态后端大小和频繁更改的状态。 | 适用于较大的状态后端和缓慢更新状态。 |
Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint