Apache Flink: フルチェックポイント vs インクリメンタルチェックポイント

Apache Flinkはリアルタイムのデータストリーム処理エンジンです。ほとんどのストリーム処理アプリケーションは「状態を持つ」です。これは、状態が保存されて将来の処理に使用されることを意味します。Apache Flinkでは、状態は構成された状態バックエンドを介して管理されます。Flinkは本番環境で2つの状態バックエンドをサポートしています。1つはHashMapStateBackendであり、もう1つはEmbeddedRocksDBStateBackendです。

データの損失を防ぎ耐障害性を実現するために、Flinkは状態のスナップショットを耐久性のあるストレージに永続化できます。Flinkは、全体の状態を耐久性のある場所にスナップショットするか、前回のスナップショット以降の差分をスナップショットするように構成できます。前者はフルチェックポイントと呼ばれ、後者はインクリメンタルチェックポイントとして知られています。

この記事では、HashMapStateBackendとフルチェックポイントを比較し、EmbeddedRocksDBStateBackendとインクリメンタルチェックポイントを比較します。この記事では、聴衆がApache Flinkの実務知識または理論知識を持っていることを前提としています。

Flink State Backendの概要

Flinkの状態バックエンドを理解するには、in-flight state(飛行中の状態)状態スナップショットの違いを知ることが重要です。

飛行中の状態はFlinkの作業状態として知られ、ローカルに保存されます。状態バックエンドの構成に基づいて、ヒープメモリまたはオフヒープメモリになり、必要に応じてローカルディスクにオーバーフローする可能性があります。

一方、ステートスナップショット(チェックポイントまたはセーブポイント)は耐久性のあるリモート位置に保存されます。これらのスナップショットは、ジョブの障害が発生した場合にFlinkジョブの状態を再構築するために使用されます。

ジョブが失敗した場合、途中の状態は失われる可能性があります。ただし、ジョブでチェックポイントが有効になっている場合、ジョブの復旧には影響しません。チェックポイントが構成されている場合、状態は復旧時に耐久性のあるストレージから取得されます。

本番環境で選択すべきステートバックエンドは、スループット、レイテンシ、およびスケーラビリティの要件に依存します。

Apache Flinkが本番環境でサポートするステートバックエンドは2つあります。

1. HashMapStateBackend

FlinkのKeyed StateとOperator Stateを処理するための軽量なステートバックエンドです。状態はHashMapデータ構造を使用してJavaヒープに格納されます。メモリに保存されるため、ここでの主な制約は最大状態サイズがJavaヒープサイズに制限されることです。状態への書き込みや状態からの読み取りにはシリアライゼーションは関与しません。そのため、これは低レイテンシ、高スループット、そしてそれほど大規模でない状態のアプリケーションに適しています。

2. EmbeddedRocksDBStateBackend

この状態バックエンドは、インフライトデータをインメモリのRocksDBデータベースに保存します。デフォルトでは、RocksDBはデータをタスクマネージャーのローカルディスクに保存します。データはシリアル化され、オフヒープメモリに保存され、タスクマネージャーに接続されたローカルディスクにスパイルされます。シリアル化フォーマットは、アプリケーションで構成された型のシリアライザに依存します。

この状態バックエンドでは、保存できる状態の量は、タスクマネージャーに接続されたディスクスペースのみによって制限されます。アプリケーションに巨大な状態があり、ヒープメモリに収まらない場合、これが適した状態バックエンドです。シリアル化が関係しているため、HashMapStateBackendに比べて、アプリケーションのレイテンシーが高く、スループットが低くなります。

スナップショット状態の概要

スナップショットは、Flink Jobのグローバル状態を表します。これには、各データソースへのポインタと、それらのポインタからソースまでの処理後のすべてのFlinkの状態フルオペレータの状態が含まれます。Apache Flinkのチェックポイントは、定期的に状態を耐久性のあるリモートストレージに保存することで、障害耐性を実現するメカニズムです。

ジョブの障害の場合、Flinkは耐久性のあるリモートストレージから保存された状態を取得し、ストリーミングデータを前回終了した地点から処理を再開します。Flinkは非同期バリアスナップショットを使用しています。これはChandy-Lamportアルゴリズムの変種です。

Flinkは2種類のチェックポイントをサポートしています。

1. フルチェックポイント

フルチェックポイントは、Flinkジョブの全状態がキャプチャされ、耐久性のあるリモートストレージに保存される方法です。ジョブの障害が発生した場合、ジョブは以前に保存された状態から復旧します。ストレージスペースの要件やチェックポイントの実行時間は、アプリケーションの状態に完全に依存します。フルチェックポイントは、HashMapStateBackendおよびRocksDBStateBackendの両方で動作します。

2. インクリメンタルチェックポイント

インクリメンタルチェックポイントは、最適化されたアプローチです。全体の状態をスナップショットする代わりに、Flinkは最後のチェックポイント以降に状態に加えられた「デルタ」のみを保存します。これにより、ネットワークオーバーヘッドとしたがって、チェックポイントにかかる時間が削減されます。この場合、チェックポイントは完全に非同期で実行されます。

RocksDBStateBackendのみがインクリメンタルチェックポイントをサポートしています。Flinkはこれに対してRocksDBの内部メカニズムを活用しています。フルチェックポイントよりもチェックポイントにかかる時間が短いものの、ジョブの障害発生時の復旧時間は多くの要因に依存します。ネットワークがボトルネックである場合、フルチェックポイントからの復旧時間よりも長くなる可能性があります。

分析

パイプラインの詳細: Apache BeamパイプラインがFlinkエンジンで実行され、固定ウィンドウが10分で、チェックポイントは3分ごとに実行されるように構成されています。シリアル化タイプはAVROに構成されています。

  • クラスタータイプ: “m5dn.4xlarge”
  • 最終チェックポイントストレージ: S3
  • ユニークキーの数: 2K
Input rate 10k/s (~ 7.8 MB/s)
タイプ  No:
TMの数
並列処理 ヒープ割り当て
TMごと
ヒープ使用量
TMごと
Podメモリ使用量
TMごと
CPU使用率
TMごと
チェックポイント
サイズ
チェックポイント
期間
Flink管理
メモリ
HashMapState
With 完全チェックポイント
1 1 10GB 8.5GB 11.1GB 1.2 4GB 50秒 0
RocksDBState
With 増分
 AVROを使用したチェックポイント
1 1 3GB 1.82GB 4.63GB 1.5 207MB 3秒 3GB
Input rate 20k/s (~15.6 MB/s)
タイプ  No:
TMの数
並列処理 ヒープ割り当て
TMごと
ヒープ使用量
TMごと
Pod使用量
TMごと
CPU使用率
TMごと
チェックポイント
サイズ
チェックポイント
期間
Flink管理
メモリ
HashMapState
With 完全チェックポイント
2 2 10GB 8.69GB 11.2GB 1.3 8.39GB 50秒 0
RocksDBState
インクリメンタル
AVROを使用したチェックポイント
2 2 3GB 1.87GB 4.71GB 1.4 404MB 3秒 3GB

input rate 30k/s (~23.5 MB/s)
タイプ No:
TMの数
並列性 ヒープ割り当て
TMごと
ヒープ使用量
TMごと
Pod使用量
TMごと
CPU使用量
TMごと
チェックポイント
サイズ
チェックポイント
期間
Flink管理
メモリ
HashMapState
フルチェックポイントと 
3 3 10GB 9.74GB 11.2GB 1.2 11.8GB 65秒 0
RocksDBState
インクリメンタル
 AVROを使用したチェックポイント
3 3 3GB 1.72GB 3.75GB 1.4 576MB 5秒 3GB

上記の実験からわかるように、チェックポイントの期間はインクリメンタルチェックポイントで短縮します。これはアプリケーションのパフォーマンスを大いに助けることができます。

まとめ

以下は実験の結果のまとめです。

フルチェックポイントを伴うHashMapStateBackend 増分チェックポイントを備えたRocksDBStateBackend
アプリケーションレイテンシー ヒープ内のJavaオブジェクトとしてデータが格納されているため、低レイテンシー。読み取りや書き込みはシリアル化を必要としません。 シリアル化がすべての読み取りまたは書き込みに関与するため、レイテンシーが高くなります。
スケーラビリティ 大規模な状態を持つジョブにはスケーラビリティが低い 大規模な状態やゆっくり変化する状態を持つジョブには高いスケーラビリティ
障害耐性 非常に障害耐性が高い 非常に障害耐性が高い
チェックポイントの期間 スナップショットが常に全データセットに対して行われるため、チェックポイントの期間が長い 直近のチェックポイント以降のデルタのみ保存されるため、チェックポイントの期間が短い
復旧の複雑さ 1つのスナップショットのみを読み込めばよいため、復旧が容易 RocksDBは複数のチェックポイントから状態を構築する必要があるため、復旧が複雑でネットワーク速度に多くが依存する
ストレージ要件 HashMapStateBackendとRocksDBStatebackendの両方でサポートされています RocksDBStatebackendのみでサポートされています
状態のスナップショット すべてのチェックポイントで完全な状態を保存 直近の成功したチェックポイント以降のデルタのみを保存
ヒープサイズ チェックポイント前に状態がヒープに保存されるため、ヒープ要件が高く、より多くのGCサイクルが予想されます。 状態はオフヒープおよびローカルディスクに保存されており、そのためヒープスペースが少なくGCサイクルも少なくなります。
状態バックエンドのサイズ JVMに割り当てられた最大ヒープに制限されます。 RocksDB状態バックエンドのサイズは、JVMヒープ制限ではなく利用可能なディスク容量のみによって制限されます。
パフォーマンスへの影響 フルスナップショットであるため、処理に高い影響を与えます。 デルタのみがスナップショットされるため、処理には少ない影響があります。
CPU CPU使用率は処理とGCのみです。状態バックエンドのシリアル化は関与しません。 フルチェックポイントと比較して、同じ入力データレートに対してCPU使用率が高くなります。

適切なシリアル化メカニズムを適用することでCPU利用率を最適化できます。Avroで実験を行い、Kryoと比較してはるかに良い結果を得ました。

最適な使用ケース 状態バックエンドのサイズが小さい場合や頻繁に変更される状態に適しています。 状態バックエンドが大きく、状態の更新がゆっくりの場合に適しています。

Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint