Apache Flink: Volledige Controlepunt vs Incrementele Controlepunt

Apache Flink is een real-time gegevensstroomverwerkingsengine. De meeste gegevensstroomverwerkingsapplicaties zijn ‘stateful’. Dit betekent dat de status wordt opgeslagen en gebruikt voor verdere verwerking. In Apache Flink wordt de status beheerd via een geconfigureerde statusbackend. Flink ondersteunt twee statusback-ends in productie. Een daarvan is de HashMapStateBackend, en de andere is de EmbeddedRocksDBStateBackend

Om gegevensverlies te voorkomen en fouttolerantie te bereiken, kan Flink momentopnamen van de status opslaan in een duurzame opslag. Flink kan geconfigureerd worden om ofwel de volledige status naar een duurzame locatie te kopiëren ofwel de delta sinds de laatste momentopname. De eerste wordt een volledige controlepunt genoemd, en de laatste staat bekend als de incrementele controlepunt. 

In dit artikel gaan we HashMapStateBackend vergelijken met volledige controlepunten en EmbeddedRocksDBStateBackend met incrementele controlepunten. Dit artikel gaat ervan uit dat het publiek ofwel praktische kennis of theoretische kennis heeft van Apache Flink

Overzicht van Flink State Backend

Om de Flink-statusbackend te begrijpen, is het belangrijk om het verschil te kennen tussen in-flight state en statusmomentopnamen

De in-flight state staat bekend als Flinks werkende status en wordt lokaal opgeslagen. Afhankelijk van de configuratie van de statusback-end, bevindt deze zich ofwel in het heap-geheugen ofwel in het off-heap-geheugen, met mogelijk een overloop naar de lokale schijf.

Aan de andere kant worden staatssnapshots (checkpoint of opslagpunt) opgeslagen op een duurzame externe locatie. Deze snapshots worden gebruikt om de Flink-jobstatus te reconstrueren in geval van een jobfout.

De in-flight status kan verloren gaan als de job mislukt. Dit heeft geen impact op het herstel van de job als het checkpoint is ingeschakeld in de job. Wanneer het checkpoint is geconfigureerd, wordt de status opgehaald uit duurzame opslag op het moment van herstel.

Welke statusbackend moet worden geselecteerd voor de productie, hangt af van de vereisten van de toepassing voor doorvoer, latentie en schaalbaarheid.

Er zijn twee statusbackends die Apache Flink ondersteunt in productie.

1. HashMapStateBackend

Het is een lichtgewicht statusbackend in Flink voor het beheren van de Keyed State en Operator State tijdens de streamverwerking. De status wordt opgeslagen in de Java Heap met behulp van een HashMap-datastructuur. Aangezien het in het geheugen is opgeslagen, is de belangrijkste beperking hier dat de maximale statusgrootte beperkt is tot de Java Heap-grootte. Er is geen serialisatie betrokken bij het schrijven naar de status of lezen van de status. Dit is dus geschikt voor toepassingen met lage latentie, hoge doorvoer en niet al te grote status.

2. EmbeddedRocksDBStateBackend

Deze state backend slaat de in-flight data op in de in-memory RocksDB database. Standaard slaat RocksDB de data op in de lokale schijf van de taakbeheerder. De data wordt geserialiseerd en opgeslagen in een off-heap geheugen en overgezet naar een lokale schijf die is aangesloten op de taakbeheerder. Het serialisatieformaat is afhankelijk van de type serializer geconfigureerd in de applicatie. 

Met deze state backend is de hoeveelheid state die kan worden opgeslagen alleen beperkt door de schijfruimte die is aangesloten op de taakbeheerder. Als de applicatie een grote state heeft en niet in het heap-geheugen kan worden opgeslagen, is dit de juiste state backend. Omdat er serialisatie plaatsvindt, zal de applicatie een hogere latentie hebben en lagere doorvoer in vergelijking met HashMapStateBackend.

Overzicht van Snapshot State

De snapshot vertegenwoordigt de globale state van de Flink Job. Dit bestaat uit een pointer naar elke data source en de state van alle stateful operators van Flink na de verwerking tot die pointers van de bronnen. Checkpointing in Apache Flink is een mechanisme om fouttolerantie te bereiken door periodiek de state op te slaan naar duurzame externe opslag. 

In geval van jobfout haalt Flink de opgeslagen state op uit duurzame externe opslag en begint met het verwerken van de streaming data vanaf waar het was gebleven. Flink maakt gebruik van asynchrone barrière-snapshotting. Het is een variant van het Chandy-Lamport-algoritme. 

Flink ondersteunt twee soorten checkpointing.

1. Volledige Checkpointing

Volledige checkpointing is wanneer de volledige staat van de Flink-taak wordt vastgelegd en opgeslagen in duurzame externe opslag. In geval van een taakfout herstelt de taak vanuit de eerder opgeslagen staat. De opslagruimtevereiste en de tijd die nodig is om te checkpointen zijn volledig afhankelijk van de toepassingsstaat. Het volledige checkpointen werkt met zowel HashMapStateBackend als RocksDBStateBackend.

2. Incrementeel Checkpointen

Incrementeel checkpointen is een geoptimaliseerde aanpak. In plaats van de volledige staat vast te leggen, slaat Flink alleen de ‘delta’s’ op die sinds de laatste checkpoint aan de staat zijn toegevoegd. Dit vermindert de netwerkoverhead en dus de tijd die nodig is voor het checkpointen. Het checkpointen gebeurt volledig asynchroon in dit geval. 

Alleen RocksDBStateBackend ondersteunt het incrementeel checkpointen. Flink maakt gebruik van RocksDB’s interne mechanisme hiervoor. Hoewel checkpointing minder tijd in beslag neemt dan volledig checkpointen, is de hersteltijd bij een taakfout afhankelijk van vele factoren. Als het netwerk een bottleneck is, kan de hersteltijd langer zijn dan bij het herstellen van volledige checkpointing.

Analyse

Pijplijndetails: Apache Beam pijplijn die draait op de Flink-engine met een vast venster van 10 minuten en de checkpoint is geconfigureerd om elke 3 minuten uit te voeren. Het geconfigureerde serialisatietype is AVRO.

  • Clustertype: “m5dn.4xlarge”
  • Definitieve checkpointopslag: S3
  • Aantal unieke sleutels: 2K
Input rate 10k/s (~ 7.8 MB/s)
Type Nee:
van TM
Parallelisme Heap-toewijzing
per TM
Heapgebruik
per TM
Pod-geheugengebruik
per TM
CPU-gebruik
per TM
Controlepunt
Grootte
Controlepunt
Duur
Flink Beheerd
Geheugen
HashMapState
Met Volledig Controlepunt
1 1 10GB 8,5GB 11,1GB 1,2 4GB 50 sec 0
RocksDBState
Met Incrementeel
Controlepunt met AVRO
1 1 3GB 1,82GB 4,63GB 1,5 207MB 3 sec 3GB
Input rate 20k/s (~15.6 MB/s)
Type Nee:
van TM
Parallelisme Heap-toewijzing
per TM
Heapgebruik
per TM
Pod-gebruik
per TM
CPU-gebruik
per TM
Controlepunt
Grootte
Controlepunt
Duur
Flink Beheerd
Geheugen
HashMapState
Met Volledig Controlepunt
2 2 10GB 8,69GB 11,2GB 1,3 8,39GB 50 sec 0
RocksDBState
Met incrementele
Controlepunt met AVRO
2 2 3GB 1,87GB 4,71GB 1,4 404MB 3 sec 3GB

input rate 30k/s (~23.5 MB/s)
Type  Nee:
van TM
Parallelisme Heap-toewijzing
per TM
Heapgebruik
per TM
Podgebruik
per TM
CPU-gebruik
per TM
Controlepunt
Grootte
Controlepunt
Duur
Flink Beheerd
Geheugen
HashMapState
Met volledig controlepunt
3 3 10GB 9,74GB 11,2GB 1,2 11,8GB 65 sec 0
RocksDBState
Met incrementele
 Controlepunt met AVRO
3 3 3GB 1,72GB 3,75GB 1,4 576MB 5 sec 3GB

Zoals u kunt zien uit het bovenstaande experiment, neemt de duur van het controlepunt af met incrementele controlepunt. Dit kan zeer goed helpen bij de prestaties van de toepassing.  

Samenvatting

Hieronder vindt u de samenvatting van het experiment.

HashMapStateBackend met volledig controlepunt RocksDBStateBackend met Incremental Checkpoint
Toepassingslatentie Lage latentie omdat gegevens worden opgeslagen als Java-objecten in de heap. Lezen en schrijven vereisen geen serialisatie. Aangezien bij elke lees- of schrijftoepassing serialisatie betrokken is, zal de latentie hoger zijn.
Schaalbaarheid Minder schaalbaar voor taken met grote toestand Zeer schaalbaar voor taken met grote toestand en langzaam veranderende toestanden
Fouttolerantie Zeer fouttolerant Zeer fouttolerant
Checkpointhouder Checkpointhouder is hoog omdat er elke keer een momentopname wordt gemaakt van de hele dataset. Checkpointhouder is minder omdat alleen de delta sinds de laatste checkpoint wordt opgeslagen.
Herstelcomplexiteit Herstel is eenvoudig omdat slechts één momentopname moet worden geladen. Herstel is complex omdat RocksDB de toestand moet opbouwen uit meerdere checkpoints en veel afhangt van de netwerksnelheid.
Opslagvereiste Ondersteund door zowel HashMapStateBackend als RocksDBStateBackend. Alleen ondersteund door RocksDBStateBackend.
Toestandssnapshot Slaat de gehele toestand op bij elke checkpoint. Slaat alleen de delta op sinds de laatste succesvolle.
Heapgrootte Aangezien de toestand wordt opgeslagen in de heap vóór het maken van een checkpoint, is de heapvereiste hoog en zijn meer GC-cycli te verwachten. Statistieken worden opgeslagen in het off-heap en mogelijk op de lokale schijf, dus minder Heap-ruimte en minder GC-cycli.
State Backend Grootte Beperkt tot het maximaal toegewezen geheugen aan een JVM. De grootte van de RocksDB state backend wordt niet beperkt door de JVM Heap-limiet maar alleen door de beschikbare schijfruimte.
Impact op Prestaties Grotere impact op verwerking omdat het een volledige snapshot is. Minder impact op verwerking omdat alleen de delta wordt gesnapshot.
CPU CPU-gebruik is alleen voor verwerking en GC. Er is geen serialisatie van de state back-end bij betrokken. CPU-gebruik is hoger vergeleken met een volledige checkpoint voor dezelfde invoergegevenssnelheid.

De CPU-utilisatie kan worden geoptimaliseerd door een juist serialisatiemechanisme toe te passen. We hebben geëxperimenteerd met Avro en kregen veel betere resultaten in vergelijking met Kryo

Beste Gebruiksscenario Goed voor kleinere state backend grootte en frequent veranderende staat. Goed voor een grotere state backend en langzaam bijwerkende staat.

Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint