Apache Flink es un motor de procesamiento de datos en tiempo real. La mayoría de las aplicaciones de procesamiento de flujos son ‘stateful’. Esto significa que el estado se almacena y se utiliza para un procesamiento posterior. En Apache Flink, el estado se gestiona a través de un almacén de estado configurado. Flink admite dos almacenes de estado en producción. Uno es el HashMapStateBackend
, y el otro es el EmbeddedRocksDBStateBackend
.
Para evitar la pérdida de datos y lograr la tolerancia a fallos, Flink puede persistir instantáneas del estado en un almacenamiento duradero. Flink puede configurarse para almacenar en una ubicación duradera tanto el estado completo como el delta desde la última instantánea. El primero se llama punto de control completo, y el segundo se conoce como punto de control incremental.
En este artículo, vamos a comparar HashMapStateBackend
con punto de control completo y EmbeddedRocksDBStateBackend
con el uso de puntos de control incrementales. Este artículo asume que la audiencia tiene conocimientos prácticos o teóricos de Apache Flink.
Resumen del Almacén de Estado de Flink
Para entender el almacén de estado de Flink, es importante conocer la diferencia entre estado en vuelo y instantáneas de estado.
El estado en vuelo se conoce como estado de trabajo de Flink y se almacena localmente. Según la configuración del almacén de estado, puede estar en memoria heap o fuera de la memoria heap, con la posibilidad de desbordamiento al disco local.
Por otro lado, los state snapshots (punto de control o punto de guardado) se almacenan en una ubicación remota duradera. Estos snapshots se utilizan para reconstruir el estado del trabajo Flink en caso de un fallo del trabajo.
El estado en vuelo puede perderse si el trabajo falla. Esto no afecta a la recuperación del trabajo si el punto de control está habilitado en el trabajo. Cuando el punto de control está configurado, el estado se recupera desde el almacenamiento duradero en el momento de la recuperación.
Qué backend de estado se debe seleccionar para la producción depende de los requisitos de la aplicación en cuanto a rendimiento, latencia y escalabilidad.
Hay dos backends de estado que Apache Flink admite en producción.
1. HashMapStateBackend
Es un backend de estado ligero en Flink para gestionar el Estado Clave y el Estado del Operador durante el procesamiento de flujo. El estado se almacena en el montón de Java utilizando una estructura de datos HashMap. Dado que se almacena en memoria, la principal limitación aquí es que el tamaño máximo del estado está limitado al tamaño del montón de Java. No hay serialización involucrada al escribir en el estado o leer desde el estado. Por lo tanto, esto es adecuado para aplicaciones de baja latencia, alto rendimiento y estado no tan grande.
2. EmbeddedRocksDBStateBackend
Este estado backend almacena los datos en curso en la base de datos RocksDB en memoria. Por defecto, RocksDB almacena los datos en el disco local del administrador de tareas. Los datos se serializan y almacenan en memoria fuera del heap y luego se transfieren a un disco local conectado al administrador de tareas. El formato de serialización depende del serializador de tipo configurado en la aplicación.
Con este estado backend, la cantidad de estado que se puede almacenar está limitada únicamente por el espacio en disco conectado al administrador de tareas. Si la aplicación tiene un estado enorme que no cabe en la memoria heap, este es el estado backend adecuado. Dado que implica serialización, la aplicación tendrá una latencia más alta y un rendimiento más bajo en comparación con HashMapStateBackend
.
Resumen del Estado de Instantánea
La instantánea representa el estado global del trabajo de Flink. Esto incluye un puntero a cada fuente de datos y el estado de todos los operadores con estado de Flink después de procesar hasta esos punteros desde las fuentes. El checkpointing en Apache Flink es un mecanismo para lograr tolerancia a fallos al guardar periódicamente el estado en un almacenamiento remoto duradero.
En caso de fallo del trabajo, Flink recupera el estado almacenado del almacenamiento remoto duradero y comienza a procesar los datos de transmisión desde donde lo dejó. Flink utiliza el snapshotting asincrónico de barreras. Es una variante del algoritmo Chandy-Lamport.
Flink admite dos tipos de checkpointing.
1. Checkpointing Completo
El checkpointing completo es donde se captura y almacena todo el estado del trabajo de Flink en un almacenamiento remoto duradero. En caso de falla del trabajo, este se recupera del estado almacenado previamente. El requisito de espacio de almacenamiento y el tiempo requerido para realizar el checkpointing dependen completamente del estado de la aplicación. El checkpointing completo funciona tanto con HashMapStateBackend
como con RocksDBStateBackend
.
2. Checkpointing Incremental
El checkpointing incremental es un enfoque optimizado. En lugar de tomar una instantánea de todo el estado, Flink guarda solo los ‘deltas’ realizados al estado desde el último checkpoint. Esto reduce la sobrecarga de red y, por lo tanto, el tiempo requerido para el checkpointing. En este caso, el checkpointing se realiza de manera completamente asíncrona.
Solo RocksDBStateBackend
admite el checkpointing incremental. Flink aprovecha el mecanismo interno de RocksDB para esto. Aunque el checkpointing toma menos tiempo que el checkpointing completo, en caso de falla del trabajo, el tiempo de recuperación depende de muchos factores. Si la red es un cuello de botella, el tiempo de recuperación puede ser mayor que el de recuperación del checkpointing completo.
Análisis
Detalles de la tubería: Apache Beam pipeline ejecutándose en el motor Flink con una ventana fija de 10 minutos y el checkpoint está configurado para ejecutarse cada 3 minutos. El tipo de serialización configurado es AVRO.
- Tipo de clúster: “m5dn.4xlarge”
- Almacenamiento final de checkpoint: S3
- Número de claves únicas: 2K
Input rate 10k/s (~ 7.8 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | No: de TM |
Paralelismo | Asignación de Heap por TM |
Uso de Heap por TM |
Uso de Memoria de Pod por TM |
Uso de CPU por TM |
Checkpoint Tamaño |
Checkpoint Duración |
Memoria Administrada por Flink |
HashMapState Con Checkpoint Completo |
1 | 1 | 10GB | 8.5GB | 11.1GB | 1.2 | 4GB | 50 seg | 0 |
RocksDBState Con Incremental Checkpoint con AVRO |
1 | 1 | 3GB | 1.82GB | 4.63GB | 1.5 | 207MB | 3 seg | 3GB |
Input rate 20k/s (~15.6 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | No: de TM |
Paralelismo | Asignación de Heap por TM |
Uso de Heap por TM |
Uso de Pod por TM |
Uso de CPU por TM |
Checkpoint Tamaño |
Checkpoint Duración |
Memoria Administrada por Flink |
HashMapState Con Checkpoint Completo |
2 | 2 | 10GB | 8.69GB | 11.2GB | 1.3 | 8.39GB | 50 sec | 0 |
RocksDBState Con Incremental Punto de control con AVRO |
2 | 2 | 3GB | 1.87GB | 4.71GB | 1.4 | 404MB | 3 seg | 3GB |
input rate 30k/s (~23.5 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | No: de TM |
Paralelismo | Asignación de montón por TM |
Uso de montón por TM |
Uso de Pod por TM |
Uso de CPU por TM |
Tamaño del Punto de Control | Duración del Punto de Control | Memoria Gestionada por Flink |
HashMapState Con Punto de control completo |
3 | 3 | 10GB | 9.74GB | 11.2GB | 1.2 | 11.8GB | 65 seg | 0 |
RocksDBState Con Incremental Punto de control con AVRO |
3 | 3 | 3GB | 1.72GB | 3.75GB | 1.4 | 576MB | 5 seg | 3GB |
Como se puede ver en el experimento anterior, la duración del punto de control disminuye con el punto de control incremental. Esto puede ayudar mucho con el rendimiento de la aplicación.
Resumen
A continuación se muestra el resumen del experimento.
HashMapStateBackend con Punto de Control Completo | RocksDBStateBackend con Checkpoint Incremental | |
Latencia de la Aplicación | Baja latencia porque los datos se almacenan como Objetos Java en el Montón. La lectura y escritura no implican ninguna serialización. | Dado que la serialización está involucrada en cada lectura o escritura de la aplicación, la latencia será mayor. |
Escala | Menos escalable para trabajos con un estado grande | Altamente escalable para trabajos con un estado grande y estados que cambian lentamente |
Tolerancia a Fallos | Altamente tolerante a fallos | Altamente tolerante a fallos |
Duración del Checkpoint | La duración del checkpoint es alta porque se está tomando una instantánea de todo el conjunto de datos cada vez. | La duración del checkpoint es menor porque solo se guarda el delta desde el último checkpoint. |
Complejidad de la Recuperación | La recuperación es fácil porque solo se tiene que cargar una instantánea. | La recuperación es compleja porque RocksDB tiene que construir el estado a partir de múltiples checkpoints y mucho depende de la velocidad de la red. |
Requisito de Almacenamiento | Soportado por HashMapStateBackend y RocksDBStatebackend. | Soportado solo por RocksDBStatebackend. |
Instantánea del Estado | Guarda todo el estado en cada checkpoint. | Guarda solo el delta desde el último exitoso. |
Tamaño del Montón | Dado que el estado se almacena en el Montón antes del checkpoint, el requisito de Montón es alto y se esperan más ciclos de GC. | Los estados se almacenan fuera de la memoria y posiblemente en el disco local, lo que significa menos espacio en el Heap y menos ciclos de GC. |
Tamaño del Backend de Estado | Limitado al máximo heap asignado a una JVM. | El tamaño del backend de estado de RocksDB no está limitado por el límite de Heap de la JVM, sino solo por el espacio en disco disponible. |
Impacto en el Rendimiento | Mayor impacto en el procesamiento porque es una instantánea completa. | Menor impacto en el procesamiento porque solo es el delta que se instantánea. |
CPU | El uso de CPU es solo para procesamiento y GC. No se involucra serialización del backend de estado. | El uso de CPU es mayor en comparación con el Checkpoint Completo para la misma tasa de datos de entrada.
La utilización de CPU se puede optimizar aplicando un mecanismo de serialización adecuado. Experimentamos con Avro y obtuvimos resultados mucho mejores en comparación con Kryo. |
Mejor Caso de Uso | Bueno para un tamaño de backend de estado más pequeño y estado que cambia con frecuencia. | Bueno para un backend de estado mayor y actualización lenta del estado. |
Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint