Apache Flink: Punto de Control Completo vs Punto de Control Incremental

Apache Flink es un motor de procesamiento de datos en tiempo real. La mayoría de las aplicaciones de procesamiento de flujos son ‘stateful’. Esto significa que el estado se almacena y se utiliza para un procesamiento posterior. En Apache Flink, el estado se gestiona a través de un almacén de estado configurado. Flink admite dos almacenes de estado en producción. Uno es el HashMapStateBackend, y el otro es el EmbeddedRocksDBStateBackend

Para evitar la pérdida de datos y lograr la tolerancia a fallos, Flink puede persistir instantáneas del estado en un almacenamiento duradero. Flink puede configurarse para almacenar en una ubicación duradera tanto el estado completo como el delta desde la última instantánea. El primero se llama punto de control completo, y el segundo se conoce como punto de control incremental. 

En este artículo, vamos a comparar HashMapStateBackend con punto de control completo y EmbeddedRocksDBStateBackend con el uso de puntos de control incrementales. Este artículo asume que la audiencia tiene conocimientos prácticos o teóricos de Apache Flink

Resumen del Almacén de Estado de Flink

Para entender el almacén de estado de Flink, es importante conocer la diferencia entre estado en vuelo y instantáneas de estado

El estado en vuelo se conoce como estado de trabajo de Flink y se almacena localmente. Según la configuración del almacén de estado, puede estar en memoria heap o fuera de la memoria heap, con la posibilidad de desbordamiento al disco local.

Por otro lado, los state snapshots (punto de control o punto de guardado) se almacenan en una ubicación remota duradera. Estos snapshots se utilizan para reconstruir el estado del trabajo Flink en caso de un fallo del trabajo.

El estado en vuelo puede perderse si el trabajo falla. Esto no afecta a la recuperación del trabajo si el punto de control está habilitado en el trabajo. Cuando el punto de control está configurado, el estado se recupera desde el almacenamiento duradero en el momento de la recuperación.

Qué backend de estado se debe seleccionar para la producción depende de los requisitos de la aplicación en cuanto a rendimiento, latencia y escalabilidad.

Hay dos backends de estado que Apache Flink admite en producción.

1. HashMapStateBackend

Es un backend de estado ligero en Flink para gestionar el Estado Clave y el Estado del Operador durante el procesamiento de flujo. El estado se almacena en el montón de Java utilizando una estructura de datos HashMap. Dado que se almacena en memoria, la principal limitación aquí es que el tamaño máximo del estado está limitado al tamaño del montón de Java. No hay serialización involucrada al escribir en el estado o leer desde el estado. Por lo tanto, esto es adecuado para aplicaciones de baja latencia, alto rendimiento y estado no tan grande.

2. EmbeddedRocksDBStateBackend

Este estado backend almacena los datos en curso en la base de datos RocksDB en memoria. Por defecto, RocksDB almacena los datos en el disco local del administrador de tareas. Los datos se serializan y almacenan en memoria fuera del heap y luego se transfieren a un disco local conectado al administrador de tareas. El formato de serialización depende del serializador de tipo configurado en la aplicación. 

Con este estado backend, la cantidad de estado que se puede almacenar está limitada únicamente por el espacio en disco conectado al administrador de tareas. Si la aplicación tiene un estado enorme que no cabe en la memoria heap, este es el estado backend adecuado. Dado que implica serialización, la aplicación tendrá una latencia más alta y un rendimiento más bajo en comparación con HashMapStateBackend.

Resumen del Estado de Instantánea

La instantánea representa el estado global del trabajo de Flink. Esto incluye un puntero a cada fuente de datos y el estado de todos los operadores con estado de Flink después de procesar hasta esos punteros desde las fuentes. El checkpointing en Apache Flink es un mecanismo para lograr tolerancia a fallos al guardar periódicamente el estado en un almacenamiento remoto duradero. 

En caso de fallo del trabajo, Flink recupera el estado almacenado del almacenamiento remoto duradero y comienza a procesar los datos de transmisión desde donde lo dejó. Flink utiliza el snapshotting asincrónico de barreras. Es una variante del algoritmo Chandy-Lamport. 

Flink admite dos tipos de checkpointing.

1. Checkpointing Completo

El checkpointing completo es donde se captura y almacena todo el estado del trabajo de Flink en un almacenamiento remoto duradero. En caso de falla del trabajo, este se recupera del estado almacenado previamente. El requisito de espacio de almacenamiento y el tiempo requerido para realizar el checkpointing dependen completamente del estado de la aplicación. El checkpointing completo funciona tanto con HashMapStateBackend como con RocksDBStateBackend.

2. Checkpointing Incremental

El checkpointing incremental es un enfoque optimizado. En lugar de tomar una instantánea de todo el estado, Flink guarda solo los ‘deltas’ realizados al estado desde el último checkpoint. Esto reduce la sobrecarga de red y, por lo tanto, el tiempo requerido para el checkpointing. En este caso, el checkpointing se realiza de manera completamente asíncrona.

Solo RocksDBStateBackend admite el checkpointing incremental. Flink aprovecha el mecanismo interno de RocksDB para esto. Aunque el checkpointing toma menos tiempo que el checkpointing completo, en caso de falla del trabajo, el tiempo de recuperación depende de muchos factores. Si la red es un cuello de botella, el tiempo de recuperación puede ser mayor que el de recuperación del checkpointing completo.

Análisis

Detalles de la tubería: Apache Beam pipeline ejecutándose en el motor Flink con una ventana fija de 10 minutos y el checkpoint está configurado para ejecutarse cada 3 minutos. El tipo de serialización configurado es AVRO.

  • Tipo de clúster: “m5dn.4xlarge”
  • Almacenamiento final de checkpoint: S3
  • Número de claves únicas: 2K
Input rate 10k/s (~ 7.8 MB/s)
Tipo No:
de TM
Paralelismo Asignación de Heap
por TM
Uso de Heap
por TM
Uso de Memoria de Pod
por TM
Uso de CPU
por TM
Checkpoint
Tamaño
Checkpoint
Duración
Memoria Administrada por Flink
HashMapState
Con Checkpoint Completo
1 1 10GB 8.5GB 11.1GB 1.2 4GB 50 seg 0
RocksDBState
Con Incremental
 Checkpoint con AVRO
1 1 3GB 1.82GB 4.63GB 1.5 207MB 3 seg 3GB
Input rate 20k/s (~15.6 MB/s)
Tipo No:
de TM
Paralelismo Asignación de Heap
por TM
Uso de Heap
por TM
Uso de Pod
por TM
Uso de CPU
por TM
Checkpoint
Tamaño
Checkpoint
Duración
Memoria Administrada por Flink
HashMapState
Con Checkpoint Completo
2 2 10GB 8.69GB 11.2GB 1.3 8.39GB 50 sec 0
RocksDBState
Con Incremental
Punto de control con AVRO
2 2 3GB 1.87GB 4.71GB 1.4 404MB 3 seg 3GB

input rate 30k/s (~23.5 MB/s)
Tipo  No:
de TM
Paralelismo Asignación de montón
por TM
Uso de montón
por TM
Uso de Pod
por TM
Uso de CPU
por TM
Tamaño del Punto de Control Duración del Punto de Control Memoria Gestionada por Flink
HashMapState
Con Punto de control completo
3 3 10GB 9.74GB 11.2GB 1.2 11.8GB 65 seg 0
RocksDBState
Con Incremental
 Punto de control con AVRO
3 3 3GB 1.72GB 3.75GB 1.4 576MB 5 seg 3GB

Como se puede ver en el experimento anterior, la duración del punto de control disminuye con el punto de control incremental. Esto puede ayudar mucho con el rendimiento de la aplicación.  

Resumen

A continuación se muestra el resumen del experimento.

HashMapStateBackend con Punto de Control Completo RocksDBStateBackend con Checkpoint Incremental
Latencia de la Aplicación Baja latencia porque los datos se almacenan como Objetos Java en el Montón. La lectura y escritura no implican ninguna serialización. Dado que la serialización está involucrada en cada lectura o escritura de la aplicación, la latencia será mayor.
Escala Menos escalable para trabajos con un estado grande Altamente escalable para trabajos con un estado grande y estados que cambian lentamente
Tolerancia a Fallos Altamente tolerante a fallos Altamente tolerante a fallos
Duración del Checkpoint La duración del checkpoint es alta porque se está tomando una instantánea de todo el conjunto de datos cada vez. La duración del checkpoint es menor porque solo se guarda el delta desde el último checkpoint.
Complejidad de la Recuperación La recuperación es fácil porque solo se tiene que cargar una instantánea. La recuperación es compleja porque RocksDB tiene que construir el estado a partir de múltiples checkpoints y mucho depende de la velocidad de la red.
Requisito de Almacenamiento Soportado por HashMapStateBackend y RocksDBStatebackend. Soportado solo por RocksDBStatebackend.
Instantánea del Estado Guarda todo el estado en cada checkpoint. Guarda solo el delta desde el último exitoso.
Tamaño del Montón Dado que el estado se almacena en el Montón antes del checkpoint, el requisito de Montón es alto y se esperan más ciclos de GC. Los estados se almacenan fuera de la memoria y posiblemente en el disco local, lo que significa menos espacio en el Heap y menos ciclos de GC.
Tamaño del Backend de Estado Limitado al máximo heap asignado a una JVM. El tamaño del backend de estado de RocksDB no está limitado por el límite de Heap de la JVM, sino solo por el espacio en disco disponible.
Impacto en el Rendimiento Mayor impacto en el procesamiento porque es una instantánea completa. Menor impacto en el procesamiento porque solo es el delta que se instantánea.
CPU El uso de CPU es solo para procesamiento y GC. No se involucra serialización del backend de estado. El uso de CPU es mayor en comparación con el Checkpoint Completo para la misma tasa de datos de entrada.

La utilización de CPU se puede optimizar aplicando un mecanismo de serialización adecuado. Experimentamos con Avro y obtuvimos resultados mucho mejores en comparación con Kryo.

Mejor Caso de Uso Bueno para un tamaño de backend de estado más pequeño y estado que cambia con frecuencia. Bueno para un backend de estado mayor y actualización lenta del estado.

Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint