Apache Flink è un motore per l’elaborazione in tempo reale di flussi di dati. La maggior parte delle applicazioni di elaborazione dei flussi sono “stateful”. Ciò significa che lo stato viene memorizzato e utilizzato per ulteriori elaborazioni. In Apache Flink, lo stato è gestito tramite un backend di stato configurato. Flink supporta due backend di stato in produzione. Uno è il HashMapStateBackend
, e l’altro è il EmbeddedRocksDBStateBackend
.
Per prevenire la perdita di dati e ottenere tolleranza ai guasti, Flink può persistere istantanee dello stato in uno storage durevole. Flink può essere configurato per catturare un’istantanea dell’intero stato in una posizione durevole o del delta dall’ultima istantanea. La prima è chiamata full checkpoint, e l’ultima è nota come incremental checkpoint.
In questo articolo, confronteremo HashMapStateBackend
con full checkpoint e EmbeddedRocksDBStateBackend
con incremental checkpointing. Questo articolo presuppone che il pubblico abbia una conoscenza pratica o teorica di Apache Flink.
Panoramica del Backend di Stato di Flink
Per comprendere il backend di stato di Flink, è importante conoscere la differenza tra in-flight state e state snapshots.
L’ in-flight state è conosciuto come lo stato di lavoro di Flink ed è memorizzato localmente. In base alla configurazione del backend di stato, è memorizzato in memoria heap o off-heap, con una possibile fuoriuscita su disco locale.
D’altra parte, gli snapshot di stato (checkpoint o punto di salvataggio) sono memorizzati in una posizione remota durevole. Questi snapshot sono utilizzati per ricostruire lo stato del lavoro di Flink in caso di fallimento del lavoro.
Lo stato in volo può essere perso se il lavoro fallisce. Questo non influisce sul recupero del lavoro se il checkpoint è abilitato nel lavoro. Quando il checkpoint è configurato, lo stato viene recuperato dallo storage durevole al momento del recupero.
Quale backend di stato selezionare per la produzione dipende dai requisiti dell’applicazione in termini di throughput, latenza e scalabilità.
Ci sono due backend di stato che Apache Flink supporta in produzione.
1. HashMapStateBackend
È un backend di stato leggero in Flink per gestire lo Stato Keyed e lo Stato dell’Operatore durante l’elaborazione del flusso. Lo stato è memorizzato nella Java Heap utilizzando una struttura dati HashMap. Poiché è memorizzato in memoria, il principale vincolo qui è che la dimensione massima dello stato è limitata alla dimensione della Java Heap. Non c’è serializzazione coinvolta nella scrittura o nella lettura dello stato. Quindi, questo è adatto per applicazioni a bassa latenza, ad alto throughput e con stati non troppo grandi.
2. EmbeddedRocksDBStateBackend
Questo backend di stato memorizza i dati in volo nel database RocksDB in memoria. Per impostazione predefinita, RocksDB memorizza i dati nel disco locale del task manager. I dati vengono serializzati e memorizzati in una memoria off-heap e trasferiti su un disco locale collegato al task manager. Il formato di serializzazione dipende dal tipo di serializer configurato nell’applicazione.
Con questo backend di stato, la quantità di stato che può essere memorizzata è limitata solo dallo spazio su disco collegato al task manager. Se l’applicazione ha uno stato enorme e non può essere contenuta nella memoria heap, questo è il backend di stato giusto. Poiché è coinvolta la serializzazione, l’applicazione avrà una latenza più elevata e una minore capacità di elaborazione rispetto a HashMapStateBackend
.
Panoramica dello Stato di Snapshot
Lo snapshot rappresenta lo stato globale del Job Flink. Questo consiste in un puntatore a ciascuna fonte di dati e nello stato di tutti gli operatori statali di Flink dopo aver elaborato fino a quei puntatori dalle fonti. Il checkpointing in Apache Flink è un meccanismo per ottenere tolleranza ai guasti salvando periodicamente lo stato in uno storage remoto durevole.
In caso di guasto del job, Flink recupera lo stato memorizzato dallo storage remoto durevole e inizia a elaborare i dati in streaming da dove si era fermato. Flink utilizza snapshotting asincrono a barriera. È una variante dell’algoritmo di Chandy-Lamport.
Flink supporta due tipi di checkpointing.
1. Checkpointing Completo
Il checkpointing completo è dove l’intero stato del lavoro Flink viene catturato e memorizzato in uno storage remoto durevole. In caso di errore del lavoro, il lavoro si riprende dallo stato memorizzato in precedenza. I requisiti di spazio di archiviazione e il tempo necessario per effettuare il checkpointing dipendono interamente dallo stato dell’applicazione. Il checkpointing completo funziona sia con HashMapStateBackend
che con RocksDBStateBackend
.
2. Checkpointing Incrementale
Il checkpointing incrementale è un approccio ottimizzato. Invece di acquisire un’istantanea dell’intero stato, Flink salva solo i ‘delta’ apportati allo stato dall’ultimo checkpoint. Questo riduce il sovraccarico di rete e, quindi, il tempo necessario per il checkpointing. In questo caso, il checkpointing avviene completamente in modo asincrono.
Solo RocksDBStateBackend
supporta il checkpointing incrementale. Flink sfrutta il meccanismo interno di RocksDB per questo. Anche se il checkpointing richiede meno tempo rispetto al checkpointing completo, in caso di errore del lavoro, il tempo di recupero dipende da molti fattori. Se la rete è un collo di bottiglia, il tempo di recupero può essere superiore a quello del recupero dal checkpointing completo.
Analisi
Dettagli della pipeline: Pipeline Apache Beam in esecuzione sul motore Flink con una finestra fissa di 10 minuti e il checkpoint è configurato per essere eseguito ogni 3 minuti. Il tipo di serializzazione configurato è AVRO.
- Tipo di cluster: “m5dn.4xlarge”
- Storage finale del checkpoint: S3
- Numero di chiavi uniche: 2K
Input rate 10k/s (~ 7.8 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | No: di TM |
Parallelismo | Allocazione Heap per TM |
Utilizzo Heap per TM |
Utilizzo Memoria Pod per TM |
Utilizzo CPU per TM |
Checkpoint Dimensione |
Checkpoint Durata |
Flink Gestito Memoria |
HashMapState Con Checkpoint Completo |
1 | 1 | 10GB | 8.5GB | 11.1GB | 1.2 | 4GB | 50 sec | 0 |
RocksDBState Con Incrementale Checkpoint con AVRO |
1 | 1 | 3GB | 1.82GB | 4.63GB | 1.5 | 207MB | 3 sec | 3GB |
Input rate 20k/s (~15.6 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | No: di TM |
Parallelismo | Allocazione Heap per TM |
Utilizzo Heap per TM |
Utilizzo Pod per TM |
Utilizzo CPU per TM |
Checkpoint Dimensione |
Checkpoint Durata |
Flink Gestito Memoria |
HashMapState Con Checkpoint Completo |
2 | 2 | 10GB | 8.69GB | 11.2GB | 1.3 | 8.39GB | 50 sec | 0 |
RocksDBState Con Incrementale Checkpoint con AVRO |
2 | 2 | 3GB | 1.87GB | 4.71GB | 1.4 | 404MB | 3 sec | 3GB |
input rate 30k/s (~23.5 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | Numero: di TM |
Parallelismo | Allocazione Heap per TM |
Utilizzo Heap per TM |
Utilizzo Pod per TM |
Utilizzo CPU per TM |
Checkpoint Dimensione |
Checkpoint Durata |
Memoria Gestita da Flink |
HashMapState Con Full Checkpoint |
3 | 3 | 10GB | 9.74GB | 11.2GB | 1.2 | 11.8GB | 65 sec | 0 |
RocksDBState Con Incrementale Checkpoint con AVRO |
3 | 3 | 3GB | 1.72GB | 3.75GB | 1.4 | 576MB | 5 sec | 3GB |
Come puoi vedere dall’esperimento sopra, la durata del checkpoint diminuisce con il checkpointing incrementale. Questo può aiutare molto le prestazioni dell’applicazione.
Riepilogo
Di seguito è riportato il riepilogo dell’esperimento.
HashMapStateBackend con Full Checkpoint | RocksDBStateBackend con Checkpoint Incrementale | |
Latenza dell’Applicazione | Bassa latenza perché i dati sono memorizzati come Oggetti Java nell’Heap. La lettura e la scrittura non comportano alcuna serializzazione. | Poiché la serializzazione è coinvolta in ogni operazione di lettura o scrittura, la latenza sarà più alta. |
Scalabilità | Meno scalabile per lavori con stato di grandi dimensioni | Altamente scalabile per lavori con stato di grandi dimensioni e stati che cambiano lentamente |
Tolleranza ai Guasti | Altamente tollerante ai guasti | Altamente tollerante ai guasti |
Durata del Checkpoint | La durata del checkpoint è alta perché viene eseguita la snapshotting per l’intero set di dati ogni volta. | La durata del checkpointing è minore perché viene salvato solo il delta dall’ultimo checkpoint. |
Complessità di Recupero | Il recupero è semplice perché deve essere caricata solo una snapshot. | Il recupero è complesso perché RocksDB deve ricostruire lo stato da più checkpoint e molto dipende dalla velocità della rete. |
Requisiti di Archiviazione | Supportato sia da HashMapStateBackend che da RocksDBStatebackend. | Supportato solo da RocksDBStatebackend. |
Snapshot dello Stato | Salva l’intero stato ad ogni checkpoint. | Salva solo il delta dall’ultimo riuscito. |
Dimensione dell’Heap | Poiché lo stato è memorizzato nell’Heap prima del checkpointing, il requisito dell’Heap è alto e ci si aspetta più cicli di GC. | Gli stati sono memorizzati al di fuori dell’heap e possibilmente sul disco locale, riducendo così lo spazio dell’heap e il numero di cicli di GC. |
Dimensione del Backend di Stato | Limitata al massimo heap allocato a una JVM. | La dimensione del backend di stato RocksDB non è limitata dal limite dell’heap della JVM, ma solo dallo spazio disco disponibile. |
Impatto sulle Prestazioni | Maggiore impatto sul processamento perché si tratta di uno snapshot completo. | Minore impatto sul processamento perché si tratta solo del delta che viene snapshotato. |
CPU | Utilizzo della CPU solo per il processamento e il GC. Non è coinvolta alcuna serializzazione del backend di stato. | L’utilizzo della CPU è maggiore rispetto al Full Checkpoint per la stessa velocità di dati in ingresso.
L’utilizzo della CPU può essere ottimizzato applicando un appropriato meccanismo di serializzazione. Abbiamo sperimentato con Avro e abbiamo ottenuto risultati molto migliori rispetto a Kryo |
Caso d’uso migliore | Buono per dimensioni ridotte del backend di stato e stati in cambiamento frequente. | Buono per un backend di stato più grande e aggiornamenti lenti dello stato. |
Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint