Apache Flink: Checkpoint completo vs Checkpoint incrementale

Apache Flink è un motore per l’elaborazione in tempo reale di flussi di dati. La maggior parte delle applicazioni di elaborazione dei flussi sono “stateful”. Ciò significa che lo stato viene memorizzato e utilizzato per ulteriori elaborazioni. In Apache Flink, lo stato è gestito tramite un backend di stato configurato. Flink supporta due backend di stato in produzione. Uno è il HashMapStateBackend, e l’altro è il EmbeddedRocksDBStateBackend.

Per prevenire la perdita di dati e ottenere tolleranza ai guasti, Flink può persistere istantanee dello stato in uno storage durevole. Flink può essere configurato per catturare un’istantanea dell’intero stato in una posizione durevole o del delta dall’ultima istantanea. La prima è chiamata full checkpoint, e l’ultima è nota come incremental checkpoint.

In questo articolo, confronteremo HashMapStateBackend con full checkpoint e EmbeddedRocksDBStateBackend con incremental checkpointing. Questo articolo presuppone che il pubblico abbia una conoscenza pratica o teorica di Apache Flink.

Panoramica del Backend di Stato di Flink

Per comprendere il backend di stato di Flink, è importante conoscere la differenza tra in-flight state e state snapshots.

L’ in-flight state è conosciuto come lo stato di lavoro di Flink ed è memorizzato localmente. In base alla configurazione del backend di stato, è memorizzato in memoria heap o off-heap, con una possibile fuoriuscita su disco locale.

D’altra parte, gli snapshot di stato (checkpoint o punto di salvataggio) sono memorizzati in una posizione remota durevole. Questi snapshot sono utilizzati per ricostruire lo stato del lavoro di Flink in caso di fallimento del lavoro.

Lo stato in volo può essere perso se il lavoro fallisce. Questo non influisce sul recupero del lavoro se il checkpoint è abilitato nel lavoro. Quando il checkpoint è configurato, lo stato viene recuperato dallo storage durevole al momento del recupero.

Quale backend di stato selezionare per la produzione dipende dai requisiti dell’applicazione in termini di throughput, latenza e scalabilità.

Ci sono due backend di stato che Apache Flink supporta in produzione.

1. HashMapStateBackend

È un backend di stato leggero in Flink per gestire lo Stato Keyed e lo Stato dell’Operatore durante l’elaborazione del flusso. Lo stato è memorizzato nella Java Heap utilizzando una struttura dati HashMap. Poiché è memorizzato in memoria, il principale vincolo qui è che la dimensione massima dello stato è limitata alla dimensione della Java Heap. Non c’è serializzazione coinvolta nella scrittura o nella lettura dello stato. Quindi, questo è adatto per applicazioni a bassa latenza, ad alto throughput e con stati non troppo grandi.

2. EmbeddedRocksDBStateBackend

Questo backend di stato memorizza i dati in volo nel database RocksDB in memoria. Per impostazione predefinita, RocksDB memorizza i dati nel disco locale del task manager. I dati vengono serializzati e memorizzati in una memoria off-heap e trasferiti su un disco locale collegato al task manager. Il formato di serializzazione dipende dal tipo di serializer configurato nell’applicazione.

Con questo backend di stato, la quantità di stato che può essere memorizzata è limitata solo dallo spazio su disco collegato al task manager. Se l’applicazione ha uno stato enorme e non può essere contenuta nella memoria heap, questo è il backend di stato giusto. Poiché è coinvolta la serializzazione, l’applicazione avrà una latenza più elevata e una minore capacità di elaborazione rispetto a HashMapStateBackend.

Panoramica dello Stato di Snapshot

Lo snapshot rappresenta lo stato globale del Job Flink. Questo consiste in un puntatore a ciascuna fonte di dati e nello stato di tutti gli operatori statali di Flink dopo aver elaborato fino a quei puntatori dalle fonti. Il checkpointing in Apache Flink è un meccanismo per ottenere tolleranza ai guasti salvando periodicamente lo stato in uno storage remoto durevole.

In caso di guasto del job, Flink recupera lo stato memorizzato dallo storage remoto durevole e inizia a elaborare i dati in streaming da dove si era fermato. Flink utilizza snapshotting asincrono a barriera. È una variante dell’algoritmo di Chandy-Lamport.

Flink supporta due tipi di checkpointing.

1. Checkpointing Completo

Il checkpointing completo è dove l’intero stato del lavoro Flink viene catturato e memorizzato in uno storage remoto durevole. In caso di errore del lavoro, il lavoro si riprende dallo stato memorizzato in precedenza. I requisiti di spazio di archiviazione e il tempo necessario per effettuare il checkpointing dipendono interamente dallo stato dell’applicazione. Il checkpointing completo funziona sia con HashMapStateBackend che con RocksDBStateBackend.

2. Checkpointing Incrementale

Il checkpointing incrementale è un approccio ottimizzato. Invece di acquisire un’istantanea dell’intero stato, Flink salva solo i ‘delta’ apportati allo stato dall’ultimo checkpoint. Questo riduce il sovraccarico di rete e, quindi, il tempo necessario per il checkpointing. In questo caso, il checkpointing avviene completamente in modo asincrono.

Solo RocksDBStateBackend supporta il checkpointing incrementale. Flink sfrutta il meccanismo interno di RocksDB per questo. Anche se il checkpointing richiede meno tempo rispetto al checkpointing completo, in caso di errore del lavoro, il tempo di recupero dipende da molti fattori. Se la rete è un collo di bottiglia, il tempo di recupero può essere superiore a quello del recupero dal checkpointing completo.

Analisi

Dettagli della pipeline: Pipeline Apache Beam in esecuzione sul motore Flink con una finestra fissa di 10 minuti e il checkpoint è configurato per essere eseguito ogni 3 minuti. Il tipo di serializzazione configurato è AVRO.

  • Tipo di cluster: “m5dn.4xlarge”
  • Storage finale del checkpoint: S3
  • Numero di chiavi uniche: 2K
Input rate 10k/s (~ 7.8 MB/s)
Tipo No:
di TM
Parallelismo Allocazione Heap
per TM
Utilizzo Heap
per TM
Utilizzo Memoria Pod
per TM
Utilizzo CPU
per TM
Checkpoint
Dimensione
Checkpoint
Durata
Flink Gestito
Memoria
HashMapState
Con Checkpoint Completo
1 1 10GB 8.5GB 11.1GB 1.2 4GB 50 sec 0
RocksDBState
Con Incrementale
 Checkpoint con AVRO
1 1 3GB 1.82GB 4.63GB 1.5 207MB 3 sec 3GB
Input rate 20k/s (~15.6 MB/s)
Tipo No:
di TM
Parallelismo Allocazione Heap
per TM
Utilizzo Heap
per TM
Utilizzo Pod
per TM
Utilizzo CPU
per TM
Checkpoint
Dimensione
Checkpoint
Durata
Flink Gestito
Memoria
HashMapState
Con Checkpoint Completo
2 2 10GB 8.69GB 11.2GB 1.3 8.39GB 50 sec 0
RocksDBState
Con Incrementale
Checkpoint con AVRO
2 2 3GB 1.87GB 4.71GB 1.4 404MB 3 sec 3GB

input rate 30k/s (~23.5 MB/s)
Tipo  Numero:
di TM
Parallelismo Allocazione Heap
per TM
Utilizzo Heap
per TM
Utilizzo Pod
per TM
Utilizzo CPU
per TM
Checkpoint
Dimensione
Checkpoint
Durata
Memoria Gestita da Flink
HashMapState
Con Full Checkpoint
3 3 10GB 9.74GB 11.2GB 1.2 11.8GB 65 sec 0
RocksDBState
Con Incrementale
 Checkpoint con AVRO
3 3 3GB 1.72GB 3.75GB 1.4 576MB 5 sec 3GB

Come puoi vedere dall’esperimento sopra, la durata del checkpoint diminuisce con il checkpointing incrementale. Questo può aiutare molto le prestazioni dell’applicazione.  

Riepilogo

Di seguito è riportato il riepilogo dell’esperimento.

HashMapStateBackend con Full Checkpoint RocksDBStateBackend con Checkpoint Incrementale
Latenza dell’Applicazione Bassa latenza perché i dati sono memorizzati come Oggetti Java nell’Heap. La lettura e la scrittura non comportano alcuna serializzazione. Poiché la serializzazione è coinvolta in ogni operazione di lettura o scrittura, la latenza sarà più alta.
Scalabilità Meno scalabile per lavori con stato di grandi dimensioni Altamente scalabile per lavori con stato di grandi dimensioni e stati che cambiano lentamente
Tolleranza ai Guasti Altamente tollerante ai guasti Altamente tollerante ai guasti
Durata del Checkpoint La durata del checkpoint è alta perché viene eseguita la snapshotting per l’intero set di dati ogni volta. La durata del checkpointing è minore perché viene salvato solo il delta dall’ultimo checkpoint.
Complessità di Recupero Il recupero è semplice perché deve essere caricata solo una snapshot. Il recupero è complesso perché RocksDB deve ricostruire lo stato da più checkpoint e molto dipende dalla velocità della rete.
Requisiti di Archiviazione Supportato sia da HashMapStateBackend che da RocksDBStatebackend. Supportato solo da RocksDBStatebackend.
Snapshot dello Stato Salva l’intero stato ad ogni checkpoint. Salva solo il delta dall’ultimo riuscito.
Dimensione dell’Heap Poiché lo stato è memorizzato nell’Heap prima del checkpointing, il requisito dell’Heap è alto e ci si aspetta più cicli di GC. Gli stati sono memorizzati al di fuori dell’heap e possibilmente sul disco locale, riducendo così lo spazio dell’heap e il numero di cicli di GC.
Dimensione del Backend di Stato Limitata al massimo heap allocato a una JVM. La dimensione del backend di stato RocksDB non è limitata dal limite dell’heap della JVM, ma solo dallo spazio disco disponibile.
Impatto sulle Prestazioni Maggiore impatto sul processamento perché si tratta di uno snapshot completo. Minore impatto sul processamento perché si tratta solo del delta che viene snapshotato.
CPU Utilizzo della CPU solo per il processamento e il GC. Non è coinvolta alcuna serializzazione del backend di stato. L’utilizzo della CPU è maggiore rispetto al Full Checkpoint per la stessa velocità di dati in ingresso.

L’utilizzo della CPU può essere ottimizzato applicando un appropriato meccanismo di serializzazione. Abbiamo sperimentato con Avro e abbiamo ottenuto risultati molto migliori rispetto a Kryo

Caso d’uso migliore Buono per dimensioni ridotte del backend di stato e stati in cambiamento frequente. Buono per un backend di stato più grande e aggiornamenti lenti dello stato.

Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint