Apache Flink is een real-time gegevensstroomverwerkingsengine. De meeste gegevensstroomverwerkingsapplicaties zijn ‘stateful’. Dit betekent dat de status wordt opgeslagen en gebruikt voor verdere verwerking. In Apache Flink wordt de status beheerd via een geconfigureerde statusbackend. Flink ondersteunt twee statusback-ends in productie. Een daarvan is de HashMapStateBackend
, en de andere is de EmbeddedRocksDBStateBackend
.
Om gegevensverlies te voorkomen en fouttolerantie te bereiken, kan Flink momentopnamen van de status opslaan in een duurzame opslag. Flink kan geconfigureerd worden om ofwel de volledige status naar een duurzame locatie te kopiëren ofwel de delta sinds de laatste momentopname. De eerste wordt een volledige controlepunt genoemd, en de laatste staat bekend als de incrementele controlepunt.
In dit artikel gaan we HashMapStateBackend
vergelijken met volledige controlepunten en EmbeddedRocksDBStateBackend
met incrementele controlepunten. Dit artikel gaat ervan uit dat het publiek ofwel praktische kennis of theoretische kennis heeft van Apache Flink.
Overzicht van Flink State Backend
Om de Flink-statusbackend te begrijpen, is het belangrijk om het verschil te kennen tussen in-flight state en statusmomentopnamen.
De in-flight state staat bekend als Flinks werkende status en wordt lokaal opgeslagen. Afhankelijk van de configuratie van de statusback-end, bevindt deze zich ofwel in het heap-geheugen ofwel in het off-heap-geheugen, met mogelijk een overloop naar de lokale schijf.
Aan de andere kant worden staatssnapshots (checkpoint of opslagpunt) opgeslagen op een duurzame externe locatie. Deze snapshots worden gebruikt om de Flink-jobstatus te reconstrueren in geval van een jobfout.
De in-flight status kan verloren gaan als de job mislukt. Dit heeft geen impact op het herstel van de job als het checkpoint is ingeschakeld in de job. Wanneer het checkpoint is geconfigureerd, wordt de status opgehaald uit duurzame opslag op het moment van herstel.
Welke statusbackend moet worden geselecteerd voor de productie, hangt af van de vereisten van de toepassing voor doorvoer, latentie en schaalbaarheid.
Er zijn twee statusbackends die Apache Flink ondersteunt in productie.
1. HashMapStateBackend
Het is een lichtgewicht statusbackend in Flink voor het beheren van de Keyed State en Operator State tijdens de streamverwerking. De status wordt opgeslagen in de Java Heap met behulp van een HashMap-datastructuur. Aangezien het in het geheugen is opgeslagen, is de belangrijkste beperking hier dat de maximale statusgrootte beperkt is tot de Java Heap-grootte. Er is geen serialisatie betrokken bij het schrijven naar de status of lezen van de status. Dit is dus geschikt voor toepassingen met lage latentie, hoge doorvoer en niet al te grote status.
2. EmbeddedRocksDBStateBackend
Deze state backend slaat de in-flight data op in de in-memory RocksDB database. Standaard slaat RocksDB de data op in de lokale schijf van de taakbeheerder. De data wordt geserialiseerd en opgeslagen in een off-heap geheugen en overgezet naar een lokale schijf die is aangesloten op de taakbeheerder. Het serialisatieformaat is afhankelijk van de type serializer geconfigureerd in de applicatie.
Met deze state backend is de hoeveelheid state die kan worden opgeslagen alleen beperkt door de schijfruimte die is aangesloten op de taakbeheerder. Als de applicatie een grote state heeft en niet in het heap-geheugen kan worden opgeslagen, is dit de juiste state backend. Omdat er serialisatie plaatsvindt, zal de applicatie een hogere latentie hebben en lagere doorvoer in vergelijking met HashMapStateBackend
.
Overzicht van Snapshot State
De snapshot vertegenwoordigt de globale state van de Flink Job. Dit bestaat uit een pointer naar elke data source en de state van alle stateful operators van Flink na de verwerking tot die pointers van de bronnen. Checkpointing in Apache Flink is een mechanisme om fouttolerantie te bereiken door periodiek de state op te slaan naar duurzame externe opslag.
In geval van jobfout haalt Flink de opgeslagen state op uit duurzame externe opslag en begint met het verwerken van de streaming data vanaf waar het was gebleven. Flink maakt gebruik van asynchrone barrière-snapshotting. Het is een variant van het Chandy-Lamport-algoritme.
Flink ondersteunt twee soorten checkpointing.
1. Volledige Checkpointing
Volledige checkpointing is wanneer de volledige staat van de Flink-taak wordt vastgelegd en opgeslagen in duurzame externe opslag. In geval van een taakfout herstelt de taak vanuit de eerder opgeslagen staat. De opslagruimtevereiste en de tijd die nodig is om te checkpointen zijn volledig afhankelijk van de toepassingsstaat. Het volledige checkpointen werkt met zowel HashMapStateBackend
als RocksDBStateBackend
.
2. Incrementeel Checkpointen
Incrementeel checkpointen is een geoptimaliseerde aanpak. In plaats van de volledige staat vast te leggen, slaat Flink alleen de ‘delta’s’ op die sinds de laatste checkpoint aan de staat zijn toegevoegd. Dit vermindert de netwerkoverhead en dus de tijd die nodig is voor het checkpointen. Het checkpointen gebeurt volledig asynchroon in dit geval.
Alleen RocksDBStateBackend
ondersteunt het incrementeel checkpointen. Flink maakt gebruik van RocksDB’s interne mechanisme hiervoor. Hoewel checkpointing minder tijd in beslag neemt dan volledig checkpointen, is de hersteltijd bij een taakfout afhankelijk van vele factoren. Als het netwerk een bottleneck is, kan de hersteltijd langer zijn dan bij het herstellen van volledige checkpointing.
Analyse
Pijplijndetails: Apache Beam pijplijn die draait op de Flink-engine met een vast venster van 10 minuten en de checkpoint is geconfigureerd om elke 3 minuten uit te voeren. Het geconfigureerde serialisatietype is AVRO.
- Clustertype: “m5dn.4xlarge”
- Definitieve checkpointopslag: S3
- Aantal unieke sleutels: 2K
Input rate 10k/s (~ 7.8 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Type | Nee: van TM |
Parallelisme | Heap-toewijzing per TM |
Heapgebruik per TM |
Pod-geheugengebruik per TM |
CPU-gebruik per TM |
Controlepunt Grootte |
Controlepunt Duur |
Flink Beheerd Geheugen |
HashMapState Met Volledig Controlepunt |
1 | 1 | 10GB | 8,5GB | 11,1GB | 1,2 | 4GB | 50 sec | 0 |
RocksDBState Met Incrementeel Controlepunt met AVRO |
1 | 1 | 3GB | 1,82GB | 4,63GB | 1,5 | 207MB | 3 sec | 3GB |
Input rate 20k/s (~15.6 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Type | Nee: van TM |
Parallelisme | Heap-toewijzing per TM |
Heapgebruik per TM |
Pod-gebruik per TM |
CPU-gebruik per TM |
Controlepunt Grootte |
Controlepunt Duur |
Flink Beheerd Geheugen |
HashMapState Met Volledig Controlepunt |
2 | 2 | 10GB | 8,69GB | 11,2GB | 1,3 | 8,39GB | 50 sec | 0 |
RocksDBState Met incrementele Controlepunt met AVRO |
2 | 2 | 3GB | 1,87GB | 4,71GB | 1,4 | 404MB | 3 sec | 3GB |
input rate 30k/s (~23.5 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Type | Nee: van TM |
Parallelisme | Heap-toewijzing per TM |
Heapgebruik per TM |
Podgebruik per TM |
CPU-gebruik per TM |
Controlepunt Grootte |
Controlepunt Duur |
Flink Beheerd Geheugen |
HashMapState Met volledig controlepunt |
3 | 3 | 10GB | 9,74GB | 11,2GB | 1,2 | 11,8GB | 65 sec | 0 |
RocksDBState Met incrementele Controlepunt met AVRO |
3 | 3 | 3GB | 1,72GB | 3,75GB | 1,4 | 576MB | 5 sec | 3GB |
Zoals u kunt zien uit het bovenstaande experiment, neemt de duur van het controlepunt af met incrementele controlepunt. Dit kan zeer goed helpen bij de prestaties van de toepassing.
Samenvatting
Hieronder vindt u de samenvatting van het experiment.
HashMapStateBackend met volledig controlepunt | RocksDBStateBackend met Incremental Checkpoint | |
Toepassingslatentie | Lage latentie omdat gegevens worden opgeslagen als Java-objecten in de heap. Lezen en schrijven vereisen geen serialisatie. | Aangezien bij elke lees- of schrijftoepassing serialisatie betrokken is, zal de latentie hoger zijn. |
Schaalbaarheid | Minder schaalbaar voor taken met grote toestand | Zeer schaalbaar voor taken met grote toestand en langzaam veranderende toestanden |
Fouttolerantie | Zeer fouttolerant | Zeer fouttolerant |
Checkpointhouder | Checkpointhouder is hoog omdat er elke keer een momentopname wordt gemaakt van de hele dataset. | Checkpointhouder is minder omdat alleen de delta sinds de laatste checkpoint wordt opgeslagen. |
Herstelcomplexiteit | Herstel is eenvoudig omdat slechts één momentopname moet worden geladen. | Herstel is complex omdat RocksDB de toestand moet opbouwen uit meerdere checkpoints en veel afhangt van de netwerksnelheid. |
Opslagvereiste | Ondersteund door zowel HashMapStateBackend als RocksDBStateBackend. | Alleen ondersteund door RocksDBStateBackend. |
Toestandssnapshot | Slaat de gehele toestand op bij elke checkpoint. | Slaat alleen de delta op sinds de laatste succesvolle. |
Heapgrootte | Aangezien de toestand wordt opgeslagen in de heap vóór het maken van een checkpoint, is de heapvereiste hoog en zijn meer GC-cycli te verwachten. | Statistieken worden opgeslagen in het off-heap en mogelijk op de lokale schijf, dus minder Heap-ruimte en minder GC-cycli. |
State Backend Grootte | Beperkt tot het maximaal toegewezen geheugen aan een JVM. | De grootte van de RocksDB state backend wordt niet beperkt door de JVM Heap-limiet maar alleen door de beschikbare schijfruimte. |
Impact op Prestaties | Grotere impact op verwerking omdat het een volledige snapshot is. | Minder impact op verwerking omdat alleen de delta wordt gesnapshot. |
CPU | CPU-gebruik is alleen voor verwerking en GC. Er is geen serialisatie van de state back-end bij betrokken. | CPU-gebruik is hoger vergeleken met een volledige checkpoint voor dezelfde invoergegevenssnelheid.
De CPU-utilisatie kan worden geoptimaliseerd door een juist serialisatiemechanisme toe te passen. We hebben geëxperimenteerd met Avro en kregen veel betere resultaten in vergelijking met Kryo |
Beste Gebruiksscenario | Goed voor kleinere state backend grootte en frequent veranderende staat. | Goed voor een grotere state backend en langzaam bijwerkende staat. |
Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint