Apache Flink ist eine Echtzeit-Datenstromverarbeitungs-Engine. Die meisten Stream-Verarbeitungsanwendungen sind „zustandsbehaftet“. Das bedeutet, dass der Zustand gespeichert und für die weitere Verarbeitung verwendet wird. In Apache Flink wird der Zustand über ein konfiguriertes State Backend verwaltet. Flink unterstützt in der Produktion zwei State Backends. Eines ist das HashMapStateBackend
, und das andere ist das EmbeddedRocksDBStateBackend
.
Um Datenverlust zu verhindern und Fehlertoleranz zu erreichen, kann Flink Snapshots des Zustands in einem dauerhaften Speicher persistieren. Flink kann so konfiguriert werden, dass entweder der gesamte Zustand an einem dauerhaften Ort oder das Delta seit dem letzten Snapshot gespeichert wird. Letzteres wird als vollständiger Checkpoint bezeichnet, während ersteres als inkrementeller Checkpoint bekannt ist.
In diesem Artikel werden wir HashMapStateBackend
mit vollem Checkpoint und EmbeddedRocksDBStateBackend
mit inkrementellem Checkpointing vergleichen. Dieser Artikel geht davon aus, dass das Publikum entweder praktische Kenntnisse oder theoretische Kenntnisse über Apache Flink hat.
Überblick über das Flink State Backend
Um das Flink State Backend zu verstehen, ist es wichtig, den Unterschied zwischen In-Flight-Zustand und Zustands-Snapshots zu kennen.
Der In-Flight-Zustand ist als Flinks Arbeitszustand bekannt und wird lokal gespeichert. Je nach Konfiguration des State Backends befindet er sich entweder im Heap-Speicher oder im Off-Heap-Speicher, mit einer möglichen Auslagerung auf die lokale Festplatte.
Auf der anderen Seite werden Zustandsschnappschüsse (Checkpoint oder Speicherpunkt) an einem dauerhaften entfernten Ort gespeichert. Diese Schnappschüsse werden verwendet, um den Flink-Jobzustand im Falle eines Jobfehlers wiederherzustellen.
Der im Flug befindliche Zustand kann verloren gehen, wenn der Job fehlschlägt. Dies hat keinen Einfluss auf die Jobwiederherstellung, wenn der Checkpoint im Job aktiviert ist. Wenn der Checkpoint konfiguriert ist, wird der Zustand zum Zeitpunkt der Wiederherstellung aus dem dauerhaften Speicher abgerufen.
Welcher Zustandsspeicher für die Produktion ausgewählt werden soll, hängt von den Anforderungen der Anwendung an Durchsatz, Latenzzeit und Skalierbarkeit ab.
Es gibt zwei Zustandsspeicher, die von Apache Flink in der Produktion unterstützt werden.
1. HashMapStateBackend
Es handelt sich um einen leichten Zustandsspeicher in Flink zur Verwaltung des Keyed-State und Operator-State während der Streamverarbeitung. Der Zustand wird im Java-Heap mithilfe einer HashMap-Datenstruktur gespeichert. Da er im Speicher gespeichert ist, ist die Hauptbeschränkung hier, dass die maximale Zustandsgröße auf die Größe des Java-Heaps begrenzt ist. Es gibt keine Serialisierung beim Schreiben in den Zustand oder Lesen aus dem Zustand. Daher eignet sich dies für Anwendungen mit geringer Latenz, hoher Durchsatzrate und nicht allzu großem Zustand.
2. EmbeddedRocksDBStateBackend
Dieser Zustands-Backend speichert die im Flug befindlichen Daten in der im Arbeitsspeicher befindlichen RocksDB-Datenbank. Standardmäßig speichert RocksDB die Daten auf der lokalen Festplatte des Task-Managers. Die Daten werden serialisiert und in einem Off-Heap-Speicher gespeichert und auf eine lokale Festplatte übertragen, die mit dem Task-Manager verbunden ist. Das Serialisierungsformat hängt vom im Anwendungscode konfigurierten Typ-Serializer ab.
Mit diesem Zustands-Backend ist die Menge an speicherbarem Zustand nur durch den Festplattenspeicher begrenzt, der dem Task-Manager zugeordnet ist. Wenn die Anwendung einen großen Zustand hat, der nicht im Heap-Speicher enthalten werden kann, ist dies das richtige Zustands-Backend. Da Serialisierung involviert ist, wird die Anwendung im Vergleich zum HashMapStateBackend
eine höhere Latenz und eine niedrigere Durchsatzrate haben.
Überblick über den Snapshot-Zustand
Der Snapshot repräsentiert den globalen Zustand des Flink-Jobs. Dies umfasst einen Zeiger auf jeden Datenquellen und den Zustand aller Flink-Zustandsoperatoren nach der Verarbeitung bis zu diesen Zeigern von den Quellen. Das Checkpointing in Apache Flink ist ein Mechanismus zur Erreichung der Ausfallsicherheit durch periodisches Speichern des Zustands in dauerhaftem Remote-Speicher.
Im Falle eines Jobfehlers ruft Flink den gespeicherten Zustand aus dem dauerhaften Remote-Speicher ab und beginnt mit der Verarbeitung der Streaming-Daten, wo es aufgehört hat. Flink verwendet asynchrones Barrieren-Snapshotting. Es handelt sich um eine Variante des Chandy-Lamport-Algorithmus.
Flink unterstützt zwei Arten des Checkpointings.
1. Vollständiges Checkpointing
Vollständiges Checkpointing ist, wenn der gesamte Zustand des Flink-Jobs erfasst und in speicherhaltigem Remote-Speicher gespeichert wird. Im Falle eines Jobfehlers wird der Job aus dem zuvor gespeicherten Zustand wiederhergestellt. Die benötigte Speicherkapazität und die für das Checkpointing benötigte Zeit hängen vollständig vom Anwendungszustand ab. Das vollständige Checkpointing funktioniert sowohl mit HashMapStateBackend
als auch mit RocksDBStateBackend
.
2. Inkrementelles Checkpointing
Inkrementelles Checkpointing ist ein optimierter Ansatz. Anstatt den gesamten Zustand zu snapshotten, speichert Flink nur die seit dem letzten Checkpoint vorgenommenen „Deltas“. Dies reduziert die Netzwerkbelastung und damit die für das Checkpointing benötigte Zeit. In diesem Fall erfolgt das Checkpointing vollständig asynchron.
Nur RocksDBStateBackend
unterstützt das inkrementelle Checkpointing. Flink nutzt den internen Mechanismus von RocksDB dafür. Obwohl das Checkpointing weniger Zeit in Anspruch nimmt als das vollständige Checkpointing, hängt die Wiederherstellungszeit im Falle eines Jobfehlers von vielen Faktoren ab. Wenn das Netzwerk ein Engpass ist, kann die Wiederherstellungszeit höher sein als die Wiederherstellung aus dem vollständigen Checkpointing.
Analyse
Pipeline-Details: Apache Beam-Pipeline, die auf der Flink-Engine mit einem festen Fenster von 10 Minuten läuft und das Checkpointing so konfiguriert ist, dass es alle 3 Minuten ausgeführt wird. Der konfigurierte Serialisierungstyp ist AVRO.
- Cluster-Typ: „m5dn.4xlarge“
- Endgültiger Checkpoint-Speicher: S3
- Anzahl der eindeutigen Schlüssel: 2K
Input rate 10k/s (~ 7.8 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Typ | Nr: von TM |
Parallelismus | Heap-Zuweisung pro TM |
Heap-Nutzung pro TM |
Pod-Speichernutzung pro TM |
CPU-Nutzung pro TM |
Checkpoint Größe |
Checkpoint Dauer |
Flink verwalteter Speicher |
HashMapState Mit vollem Checkpoint |
1 | 1 | 10GB | 8,5GB | 11,1GB | 1,2 | 4GB | 50 Sek | 0 |
RocksDBState Mit inkrementellem Checkpoint mit AVRO |
1 | 1 | 3GB | 1,82GB | 4,63GB | 1,5 | 207MB | 3 Sek | 3GB |
Input rate 20k/s (~15.6 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Typ | Nr: von TM |
Parallelismus | Heap-Zuweisung pro TM |
Heap-Nutzung pro TM |
Pod-Nutzung pro TM |
CPU-Nutzung pro TM |
Checkpoint Größe |
Checkpoint Dauer |
Flink verwalteter Speicher |
HashMapState Mit vollem Checkpoint |
2 | 2 | 10GB | 8,69GB | 11,2GB | 1,3 | 8,39GB | 50 Sekunden | 0 |
RocksDBState Mit inkrementellem Checkpoint mit AVRO |
2 | 2 | 3GB | 1,87GB | 4,71GB | 1,4 | 404MB | 3 Sekunden | 3GB |
input rate 30k/s (~23.5 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Typ | : von TM |
Parallelität | Heap-Zuweisung pro TM |
Heap-Nutzung pro TM |
Pod-Nutzung pro TM |
CPU-Nutzung pro TM |
Checkpoint Größe |
Checkpoint Dauer |
Flink Managed Speicher |
HashMapState Mit Vollständigem Checkpoint |
3 | 3 | 10GB | 9,74GB | 11,2GB | 1,2 | 11,8GB | 65 Sekunden | 0 |
RocksDBState Mit Inkrementellem Checkpoint mit AVRO |
3 | 3 | 3GB | 1,72GB | 3,75GB | 1,4 | 576MB | 5 Sekunden | 3GB |
Wie aus dem obigen Experiment ersichtlich ist, verringert sich die Checkpoint-Dauer mit inkrementellem Checkpointing. Dies kann die Anwendungsleistung durchaus verbessern.
Zusammenfassung
Hier ist die Zusammenfassung des Experiments.
HashMapStateBackend mit vollem Checkpoint | RocksDBStateBackend mit inkrementellem Checkpoint | |
Anwendungs-Latenz | Niedrige Latenz, da Daten als Java-Objekte im Heap gespeichert werden. Lesen und Schreiben erfordern keine Serialisierung. | Da bei jedem Lesen oder Schreiben Serialisierung erforderlich ist, ist die Latenz höher. |
Skalierbarkeit | Weniger skalierbar für Jobs mit großem Zustand | Sehr skalierbar für Jobs mit großem Zustand und langsam ändernden Zuständen |
Fehlertoleranz | Sehr fehlertolerant | Sehr fehlertolerant |
Checkpoint-Dauer | Die Checkpoint-Dauer ist hoch, da jedes Mal ein Snapshot des gesamten Datensatzes erstellt wird. | Die Checkpoint-Dauer ist gering, da nur die Differenz seit dem letzten Checkpoint gespeichert wird. |
Wiederherstellungskomplexität | Die Wiederherstellung ist einfach, da nur ein Snapshot geladen werden muss. | Die Wiederherstellung ist komplex, da RocksDB den Zustand aus mehreren Checkpoints aufbauen muss und vieles von der Netzwerkgeschwindigkeit abhängt. |
Speicheranforderung | Unterstützt von HashMapStateBackend und RocksDBStateBackend. | Nur von RocksDBStateBackend unterstützt. |
Zustandssnapshot | Speichert bei jedem Checkpoint den gesamten Zustand. | Speichert nur die Differenz seit dem letzten erfolgreichen. |
Heap-Größe | Da der Zustand vor dem Checkpoint im Heap gespeichert wird, sind die Heap-Anforderungen hoch, und es sind mehr GC-Zyklen zu erwarten. | Zustände werden im Off-Heap gespeichert und möglicherweise auf der lokalen Festplatte, daher weniger Heap-Speicher und weniger GC-Zyklen. |
Zustands-Backend-Größe | Begrenzt auf den maximalen Heap, der einem JVM zugewiesen ist. | Die Größe des RocksDB-Zustands-Backends wird nicht durch das JVM-Heap-Limit begrenzt, sondern nur durch den verfügbaren Festplattenspeicher. |
Leistungsauswirkung | Größerer Einfluss auf die Verarbeitung, da es sich um einen vollständigen Schnappschuss handelt. | Geringerer Einfluss auf die Verarbeitung, da nur der Delta gespeichert wird. |
CPU | Die CPU-Auslastung erfolgt nur für die Verarbeitung und GC. Es ist keine Zustands-Backend-Serialisierung beteiligt. | Die CPU-Auslastung ist im Vergleich zum vollständigen Checkpoint bei derselben Eingabedatenrate höher.
Die CPU-Auslastung kann durch Anwendung eines geeigneten Serialisierungsmechanismus optimiert werden. Wir haben mit Avro experimentiert und viel bessere Ergebnisse im Vergleich zu Kryo erhalten. |
Bestes Anwendungsszenario | Geeignet für kleinere Zustands-Backend-Größe und häufig wechselnden Zustand. | Geeignet für größere Zustands-Backend-Größe und langsam aktualisierende Zustände. |
Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint