Apache Flink est un moteur de traitement de flux de données en temps réel. La plupart des applications de traitement de flux sont « avec état ». Cela signifie que l’état est stocké et utilisé pour un traitement ultérieur. Dans Apache Flink, l’état est géré à travers un backend d’état configuré. Flink prend en charge deux backends d’état en production. L’un est le HashMapStateBackend
, et l’autre est le EmbeddedRocksDBStateBackend
.
Pour éviter la perte de données et atteindre la tolérance aux pannes, Flink peut persister des instantanés de l’état dans un stockage durable. Flink peut être configuré pour prendre un instantané soit de l’intégralité de l’état dans un emplacement durable, soit du delta depuis le dernier instantané. Le premier est appelé point de contrôle complet, et le dernier est connu sous le nom de point de contrôle incrémentiel.
Dans cet article, nous allons comparer HashMapStateBackend
avec point de contrôle complet et EmbeddedRocksDBStateBackend
avec point de contrôle incrémentiel. Cet article suppose que le public a soit des connaissances pratiques, soit des connaissances théoriques de Apache Flink.
Vue d’ensemble du backend d’état de Flink
Pour comprendre le backend d’état de Flink, il est important de connaître la différence entre l’état en vol et les instantanés d’état.
L’ état en vol est connu comme l’état de travail de Flink et est stocké localement. En fonction de la configuration du backend d’état, il est soit dans la mémoire de tas, soit dans la mémoire hors tas, avec un possible débordement sur le disque local.
D’autre part, les instantanés d’état (point de contrôle ou point de sauvegarde) sont stockés dans un emplacement distant durable. Ces instantanés sont utilisés pour reconstruire l’état du job Flink en cas de défaillance du job.
L’état en cours peut être perdu si le job échoue. Cela n’impacte pas la récupération du job si le point de contrôle est activé dans le job. Lorsque le point de contrôle est configuré, l’état est récupéré à partir du stockage durable au moment de la récupération.
Le choix de l’arrière-plan d’état à sélectionner pour la production dépend des exigences de l’application en matière de débit, de latence et de scalabilité.
Il existe deux arrière-plans d’état que Apache Flink supporte en production.
1. HashMapStateBackend
C’est un arrière-plan d’état léger dans Flink pour gérer l’état clé et l’état d’opérateur pendant le traitement de flux. L’état est stocké dans la mémoire Java à l’aide d’une structure de données HashMap. Étant donné qu’il est stocké en mémoire, la principale contrainte ici est que la taille maximale de l’état est limitée à la taille de la mémoire Java. Il n’y a pas de sérialisation impliquée dans l’écriture ou la lecture de l’état. Donc, cela convient aux applications à faible latence, à haut débit et à état pas trop volumineux.
2. EmbeddedRocksDBStateBackend
Ce backend d’état stocke les données en cours dans la base de données en mémoire RocksDB. Par défaut, RocksDB stocke les données sur le disque local du gestionnaire de tâches. Les données sont sérialisées et stockées dans une mémoire hors tas et débordent sur un disque local attaché au gestionnaire de tâches. Le format de sérialisation dépend du sérialiseur de type configuré dans l’application.
Avec ce backend d’état, la quantité d’état pouvant être stockée est uniquement limitée par l’espace disque attaché au gestionnaire de tâches. Si l’application a un état énorme et ne peut pas être contenu dans la mémoire tas, c’est le bon backend d’état. Étant donné que la sérialisation est impliquée, l’application va avoir une latence plus élevée et un débit plus faible par rapport à HashMapStateBackend
.
Vue d’ensemble de l’état de snapshot
Le snapshot représente l’état global du travail Flink. Cela consiste en un pointeur vers chaque source de données et l’état de tous les opérateurs à état de Flink après traitement jusqu’à ces pointeurs à partir des sources. Le point de contrôle dans Apache Flink est un mécanisme pour atteindre la tolérance aux pannes en sauvegardant périodiquement l’état dans un stockage distant durable.
En cas d’échec de l’emploi, Flink récupère l’état stocké depuis le stockage distant durable et commence à traiter les données de streaming à partir de l’endroit où il s’était arrêté. Flink utilise la capture d’instantané de barrière asynchrone. C’est une variante de l’algorithme de Chandy-Lamport.
Flink prend en charge deux types de point de contrôle.
1. Point de contrôle complet
Le checkpointing complet est celui où l’état entier du travail Flink est capturé et stocké dans un stockage distant durable. En cas de défaillance du travail, celui-ci se rétablit à partir de l’état précédemment stocké. L’espace de stockage requis et le temps nécessaire pour effectuer le checkpointing dépendent entièrement de l’état de l’application. Le checkpointing complet fonctionne avec à la fois HashMapStateBackend
et RocksDBStateBackend
.
2. Checkpointing Incrémental
Le checkpointing incrémental est une approche optimisée. Au lieu de prendre un instantané de l’état entier, Flink ne sauvegarde que les « deltas » apportés à l’état depuis le dernier checkpoint. Cela réduit la surcharge réseau et, par conséquent, le temps nécessaire pour le checkpointing. Le checkpointing se fait entièrement de manière asynchrone dans ce cas.
Seul RocksDBStateBackend
prend en charge le checkpointing incrémental. Flink tire parti du mécanisme interne de RocksDB pour cela. Même si le checkpointing prend moins de temps que le checkpointing complet, en cas de défaillance du travail, le temps de récupération dépend de nombreux facteurs. Si le réseau est un goulot d’étranglement, le temps de récupération peut être supérieur à celui de la récupération après un checkpointing complet.
Analyse
Détails du pipeline : Pipeline Apache Beam fonctionnant sur le moteur Flink avec une fenêtre fixe de 10 minutes et le checkpoint est configuré pour s’exécuter toutes les 3 minutes. Le type de sérialisation configuré est AVRO.
- Type de cluster : « m5dn.4xlarge »
- Stockage final des checkpoints : S3
- Nombre de clés uniques : 2K
Input rate 10k/s (~ 7.8 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tapez | Non : de TM |
Parallélisme | Allocation de Tas par TM |
Utilisation du Tas par TM |
Utilisation de la Mémoire du Pod par TM |
Utilisation du CPU par TM |
Point de Contrôle Taille |
Point de Contrôle Durée |
Mémoire Gérée par Flink |
État HashMap Avec Point de Contrôle Complet |
1 | 1 | 10 Go | 8.5 Go | 11.1 Go | 1.2 | 4 Go | 50 sec | 0 |
État RocksDB Avec Point de Contrôle Incrémental avec AVRO |
1 | 1 | 3 Go | 1.82 Go | 4.63 Go | 1.5 | 207 Mo | 3 sec | 3 Go |
Input rate 20k/s (~15.6 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Type | Non : de TM |
Parallélisme | Allocation de Tas par TM |
Utilisation du Tas par TM |
Utilisation du Pod par TM |
Utilisation du CPU par TM |
Point de Contrôle Taille |
Point de Contrôle Durée |
Mémoire Gérée par Flink |
État HashMap Avec Point de Contrôle Complet |
2 | 2 | 10 Go | 8.69 Go | 11.2 Go | 1.3 | 8.39 Go | 50 sec | 0 |
RocksDBState Avec un Checkpoint Incrémental avec AVRO |
2 | 2 | 3GB | 1.87GB | 4.71GB | 1.4 | 404MB | 3 sec | 3GB |
input rate 30k/s (~23.5 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Type | Non: de TM |
Parallélisme | Allocation de Tas par TM |
Utilisation de Tas par TM |
Utilisation de Pod par TM |
Utilisation du CPU par TM |
Checkpoint Taille |
Checkpoint Durée |
Mémoire Gérée par Flink |
HashMapState Avec Checkpoint Complet |
3 | 3 | 10GB | 9.74GB | 11.2GB | 1.2 | 11.8GB | 65 sec | 0 |
RocksDBState Avec Checkpoint Incrémental avec AVRO |
3 | 3 | 3GB | 1.72GB | 3.75GB | 1.4 | 576MB | 5 sec | 3GB |
Comme vous pouvez le voir dans l’expérience ci-dessus, la durée du checkpoint diminue avec le checkpointing incrémental. Cela peut très bien aider à la performance de l’application.
Résumé
Voici le résumé de l’expérience.
HashMapStateBackend avec Checkpoint Complet | RocksDBStateBackend avec Point de Contrôle Incrémental | |
Latence de l’Application | Faible latence car les données sont stockées sous forme d’objets Java dans le tas. La lecture et l’écriture n’impliquent aucune sérialisation. | Étant donné que la sérialisation est impliquée dans chaque lecture ou écriture, la latence sera plus élevée. |
Scalabilité | Moins évolutif pour les tâches avec un état large | Hautement évolutif pour les tâches avec un état large et des états changeants lentement |
Tolérance aux pannes | Hautement tolérant aux pannes | Hautement tolérant aux pannes |
Durée du point de contrôle | La durée du point de contrôle est élevée car un instantané est pris pour l’ensemble du jeu de données à chaque fois. | La durée de pointage est réduite car seul le delta depuis le dernier point de contrôle est sauvegardé. |
Complexité de récupération | La récupération est facile car seul un instantané doit être chargé. | La récupération est complexe car RocksDB doit reconstruire l’état à partir de plusieurs points de contrôle et beaucoup dépend de la vitesse du réseau. |
Exigence de stockage | Supporté à la fois par HashMapStateBackend et RocksDBStatebackend. | Supporté uniquement par RocksDBStatebackend. |
Instantané d’état | Enregistre l’ensemble de l’état à chaque point de contrôle. | Enregistre uniquement le delta depuis le dernier réussi. |
Taille du tas | Étant donné que l’état est stocké dans le tas avant le pointage, l’exigence de tas est élevée et plus de cycles de GC sont à prévoir. | Les états sont stockés hors tas et éventuellement sur le disque local, réduisant ainsi l’espace de tas et diminuant les cycles de GC. |
Taille du backend d’état | Limitée à la taille maximale du tas alloué à une JVM. | La taille du backend d’état RocksDB n’est pas limitée par la limite du tas JVM mais seulement par l’espace disque disponible. |
Impact sur les performances | Impact plus élevé sur le traitement parce qu’il s’agit d’un instantané complet. | Impact moindre sur le traitement car seul le delta est instantané. |
CPU | L’utilisation du CPU est uniquement pour le traitement et le GC. Aucune sérialisation du backend d’état n’est impliquée. | L’utilisation du CPU est plus élevée par rapport à un point de contrôle complet pour le même taux de données d’entrée.
L’utilisation du CPU peut être optimisée en appliquant un mécanisme de sérialisation approprié. Nous avons expérimenté avec Avro et obtenu de bien meilleurs résultats par rapport à Kryo |
Meilleur cas d’utilisation | Bon pour une taille de backend d’état plus petite et un état changeant fréquemment. | Bon pour un backend d’état plus important et une mise à jour lente de l’état. |
Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint