Apache Flink: Vollständiges Checkpoint vs. Inkrementelles Checkpoint

Apache Flink ist eine Echtzeit-Datenstromverarbeitungs-Engine. Die meisten Stream-Verarbeitungsanwendungen sind „zustandsbehaftet“. Das bedeutet, dass der Zustand gespeichert und für die weitere Verarbeitung verwendet wird. In Apache Flink wird der Zustand über ein konfiguriertes State Backend verwaltet. Flink unterstützt in der Produktion zwei State Backends. Eines ist das HashMapStateBackend, und das andere ist das EmbeddedRocksDBStateBackend

Um Datenverlust zu verhindern und Fehlertoleranz zu erreichen, kann Flink Snapshots des Zustands in einem dauerhaften Speicher persistieren. Flink kann so konfiguriert werden, dass entweder der gesamte Zustand an einem dauerhaften Ort oder das Delta seit dem letzten Snapshot gespeichert wird. Letzteres wird als vollständiger Checkpoint bezeichnet, während ersteres als inkrementeller Checkpoint bekannt ist. 

In diesem Artikel werden wir HashMapStateBackend mit vollem Checkpoint und EmbeddedRocksDBStateBackend mit inkrementellem Checkpointing vergleichen. Dieser Artikel geht davon aus, dass das Publikum entweder praktische Kenntnisse oder theoretische Kenntnisse über Apache Flink hat. 

Überblick über das Flink State Backend

Um das Flink State Backend zu verstehen, ist es wichtig, den Unterschied zwischen In-Flight-Zustand und Zustands-Snapshots zu kennen. 

Der In-Flight-Zustand ist als Flinks Arbeitszustand bekannt und wird lokal gespeichert. Je nach Konfiguration des State Backends befindet er sich entweder im Heap-Speicher oder im Off-Heap-Speicher, mit einer möglichen Auslagerung auf die lokale Festplatte.

Auf der anderen Seite werden Zustandsschnappschüsse (Checkpoint oder Speicherpunkt) an einem dauerhaften entfernten Ort gespeichert. Diese Schnappschüsse werden verwendet, um den Flink-Jobzustand im Falle eines Jobfehlers wiederherzustellen. 

Der im Flug befindliche Zustand kann verloren gehen, wenn der Job fehlschlägt. Dies hat keinen Einfluss auf die Jobwiederherstellung, wenn der Checkpoint im Job aktiviert ist. Wenn der Checkpoint konfiguriert ist, wird der Zustand zum Zeitpunkt der Wiederherstellung aus dem dauerhaften Speicher abgerufen. 

Welcher Zustandsspeicher für die Produktion ausgewählt werden soll, hängt von den Anforderungen der Anwendung an Durchsatz, Latenzzeit und Skalierbarkeit ab.

Es gibt zwei Zustandsspeicher, die von Apache Flink in der Produktion unterstützt werden.

1. HashMapStateBackend

Es handelt sich um einen leichten Zustandsspeicher in Flink zur Verwaltung des Keyed-State und Operator-State während der Streamverarbeitung. Der Zustand wird im Java-Heap mithilfe einer HashMap-Datenstruktur gespeichert. Da er im Speicher gespeichert ist, ist die Hauptbeschränkung hier, dass die maximale Zustandsgröße auf die Größe des Java-Heaps begrenzt ist. Es gibt keine Serialisierung beim Schreiben in den Zustand oder Lesen aus dem Zustand. Daher eignet sich dies für Anwendungen mit geringer Latenz, hoher Durchsatzrate und nicht allzu großem Zustand. 

2. EmbeddedRocksDBStateBackend

Dieser Zustands-Backend speichert die im Flug befindlichen Daten in der im Arbeitsspeicher befindlichen RocksDB-Datenbank. Standardmäßig speichert RocksDB die Daten auf der lokalen Festplatte des Task-Managers. Die Daten werden serialisiert und in einem Off-Heap-Speicher gespeichert und auf eine lokale Festplatte übertragen, die mit dem Task-Manager verbunden ist. Das Serialisierungsformat hängt vom im Anwendungscode konfigurierten Typ-Serializer ab. 

Mit diesem Zustands-Backend ist die Menge an speicherbarem Zustand nur durch den Festplattenspeicher begrenzt, der dem Task-Manager zugeordnet ist. Wenn die Anwendung einen großen Zustand hat, der nicht im Heap-Speicher enthalten werden kann, ist dies das richtige Zustands-Backend. Da Serialisierung involviert ist, wird die Anwendung im Vergleich zum HashMapStateBackend eine höhere Latenz und eine niedrigere Durchsatzrate haben.

Überblick über den Snapshot-Zustand

Der Snapshot repräsentiert den globalen Zustand des Flink-Jobs. Dies umfasst einen Zeiger auf jeden Datenquellen und den Zustand aller Flink-Zustandsoperatoren nach der Verarbeitung bis zu diesen Zeigern von den Quellen. Das Checkpointing in Apache Flink ist ein Mechanismus zur Erreichung der Ausfallsicherheit durch periodisches Speichern des Zustands in dauerhaftem Remote-Speicher. 

Im Falle eines Jobfehlers ruft Flink den gespeicherten Zustand aus dem dauerhaften Remote-Speicher ab und beginnt mit der Verarbeitung der Streaming-Daten, wo es aufgehört hat. Flink verwendet asynchrones Barrieren-Snapshotting. Es handelt sich um eine Variante des Chandy-Lamport-Algorithmus. 

Flink unterstützt zwei Arten des Checkpointings.

1. Vollständiges Checkpointing

Vollständiges Checkpointing ist, wenn der gesamte Zustand des Flink-Jobs erfasst und in speicherhaltigem Remote-Speicher gespeichert wird. Im Falle eines Jobfehlers wird der Job aus dem zuvor gespeicherten Zustand wiederhergestellt. Die benötigte Speicherkapazität und die für das Checkpointing benötigte Zeit hängen vollständig vom Anwendungszustand ab. Das vollständige Checkpointing funktioniert sowohl mit HashMapStateBackend als auch mit RocksDBStateBackend.

2. Inkrementelles Checkpointing

Inkrementelles Checkpointing ist ein optimierter Ansatz. Anstatt den gesamten Zustand zu snapshotten, speichert Flink nur die seit dem letzten Checkpoint vorgenommenen „Deltas“. Dies reduziert die Netzwerkbelastung und damit die für das Checkpointing benötigte Zeit. In diesem Fall erfolgt das Checkpointing vollständig asynchron.

Nur RocksDBStateBackend unterstützt das inkrementelle Checkpointing. Flink nutzt den internen Mechanismus von RocksDB dafür. Obwohl das Checkpointing weniger Zeit in Anspruch nimmt als das vollständige Checkpointing, hängt die Wiederherstellungszeit im Falle eines Jobfehlers von vielen Faktoren ab. Wenn das Netzwerk ein Engpass ist, kann die Wiederherstellungszeit höher sein als die Wiederherstellung aus dem vollständigen Checkpointing.

Analyse

Pipeline-Details: Apache Beam-Pipeline, die auf der Flink-Engine mit einem festen Fenster von 10 Minuten läuft und das Checkpointing so konfiguriert ist, dass es alle 3 Minuten ausgeführt wird. Der konfigurierte Serialisierungstyp ist AVRO.

  • Cluster-Typ: „m5dn.4xlarge“
  • Endgültiger Checkpoint-Speicher: S3
  • Anzahl der eindeutigen Schlüssel: 2K
Input rate 10k/s (~ 7.8 MB/s)
Typ Nr:
von TM
Parallelismus Heap-Zuweisung
pro TM
Heap-Nutzung
pro TM
Pod-Speichernutzung
pro TM
CPU-Nutzung
pro TM
Checkpoint
Größe
Checkpoint
Dauer
Flink verwalteter
Speicher
HashMapState
Mit vollem Checkpoint
1 1 10GB 8,5GB 11,1GB 1,2 4GB 50 Sek 0
RocksDBState
Mit inkrementellem
Checkpoint mit AVRO
1 1 3GB 1,82GB 4,63GB 1,5 207MB 3 Sek 3GB
Input rate 20k/s (~15.6 MB/s)
Typ Nr:
von TM
Parallelismus Heap-Zuweisung
pro TM
Heap-Nutzung
pro TM
Pod-Nutzung
pro TM
CPU-Nutzung
pro TM
Checkpoint
Größe
Checkpoint
Dauer
Flink verwalteter
Speicher
HashMapState
Mit vollem Checkpoint
2 2 10GB 8,69GB 11,2GB 1,3 8,39GB 50 Sekunden 0
RocksDBState
Mit inkrementellem
Checkpoint mit AVRO
2 2 3GB 1,87GB 4,71GB 1,4 404MB 3 Sekunden 3GB

input rate 30k/s (~23.5 MB/s)
Typ  :
von TM
Parallelität Heap-Zuweisung
pro TM
Heap-Nutzung
pro TM
Pod-Nutzung
pro TM
CPU-Nutzung
pro TM
Checkpoint
Größe
Checkpoint
Dauer
Flink Managed
Speicher
HashMapState
Mit Vollständigem Checkpoint
3 3 10GB 9,74GB 11,2GB 1,2 11,8GB 65 Sekunden 0
RocksDBState
Mit Inkrementellem
 Checkpoint mit AVRO
3 3 3GB 1,72GB 3,75GB 1,4 576MB 5 Sekunden 3GB

Wie aus dem obigen Experiment ersichtlich ist, verringert sich die Checkpoint-Dauer mit inkrementellem Checkpointing. Dies kann die Anwendungsleistung durchaus verbessern.  

Zusammenfassung

Hier ist die Zusammenfassung des Experiments.

HashMapStateBackend mit vollem Checkpoint RocksDBStateBackend mit inkrementellem Checkpoint
Anwendungs-Latenz Niedrige Latenz, da Daten als Java-Objekte im Heap gespeichert werden. Lesen und Schreiben erfordern keine Serialisierung. Da bei jedem Lesen oder Schreiben Serialisierung erforderlich ist, ist die Latenz höher.
Skalierbarkeit Weniger skalierbar für Jobs mit großem Zustand Sehr skalierbar für Jobs mit großem Zustand und langsam ändernden Zuständen
Fehlertoleranz Sehr fehlertolerant Sehr fehlertolerant
Checkpoint-Dauer Die Checkpoint-Dauer ist hoch, da jedes Mal ein Snapshot des gesamten Datensatzes erstellt wird. Die Checkpoint-Dauer ist gering, da nur die Differenz seit dem letzten Checkpoint gespeichert wird.
Wiederherstellungskomplexität Die Wiederherstellung ist einfach, da nur ein Snapshot geladen werden muss. Die Wiederherstellung ist komplex, da RocksDB den Zustand aus mehreren Checkpoints aufbauen muss und vieles von der Netzwerkgeschwindigkeit abhängt.
Speicheranforderung Unterstützt von HashMapStateBackend und RocksDBStateBackend. Nur von RocksDBStateBackend unterstützt.
Zustandssnapshot Speichert bei jedem Checkpoint den gesamten Zustand. Speichert nur die Differenz seit dem letzten erfolgreichen.
Heap-Größe Da der Zustand vor dem Checkpoint im Heap gespeichert wird, sind die Heap-Anforderungen hoch, und es sind mehr GC-Zyklen zu erwarten. Zustände werden im Off-Heap gespeichert und möglicherweise auf der lokalen Festplatte, daher weniger Heap-Speicher und weniger GC-Zyklen.
Zustands-Backend-Größe Begrenzt auf den maximalen Heap, der einem JVM zugewiesen ist. Die Größe des RocksDB-Zustands-Backends wird nicht durch das JVM-Heap-Limit begrenzt, sondern nur durch den verfügbaren Festplattenspeicher.
Leistungsauswirkung Größerer Einfluss auf die Verarbeitung, da es sich um einen vollständigen Schnappschuss handelt. Geringerer Einfluss auf die Verarbeitung, da nur der Delta gespeichert wird.
CPU Die CPU-Auslastung erfolgt nur für die Verarbeitung und GC. Es ist keine Zustands-Backend-Serialisierung beteiligt. Die CPU-Auslastung ist im Vergleich zum vollständigen Checkpoint bei derselben Eingabedatenrate höher.

Die CPU-Auslastung kann durch Anwendung eines geeigneten Serialisierungsmechanismus optimiert werden. Wir haben mit Avro experimentiert und viel bessere Ergebnisse im Vergleich zu Kryo erhalten.

Bestes Anwendungsszenario Geeignet für kleinere Zustands-Backend-Größe und häufig wechselnden Zustand. Geeignet für größere Zustands-Backend-Größe und langsam aktualisierende Zustände.

Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint