Apache Flink: Checkpoint Completo vs Checkpoint Incremental

O Apache Flink é um mecanismo de processamento de fluxo de dados em tempo real. A maioria das aplicações de processamento de fluxo são ‘com estado’. Isso significa que o estado é armazenado e usado para processamento adicional. No Apache Flink, o estado é gerenciado por meio de um estado de back-end configurado. O Flink suporta dois back-ends de estado em produção. Um é o HashMapStateBackend, e o outro é o EmbeddedRocksDBStateBackend

Para evitar a perda de dados e alcançar tolerância a falhas, o Flink pode persistir snapshots do estado em um armazenamento durável. O Flink pode ser configurado para capturar instantâneos do estado inteiro em um local durável ou o delta desde o último instantâneo. O primeiro é chamado de checkpoint completo, e o último é conhecido como checkpoint incremental. 

Neste artigo, vamos comparar o HashMapStateBackend com checkpoint completo e o EmbeddedRocksDBStateBackend com checkpoint incremental. Este artigo pressupõe que a audiência tenha conhecimento prático ou teórico do Apache Flink

Visão geral do Back-end de Estado do Flink

Para entender o back-end de estado do Flink, é importante saber a diferença entre estado em voo e instantâneos de estado

O estado em voo é conhecido como estado de trabalho do Flink e é armazenado localmente. Com base na configuração do back-end de estado, ele está em memória heap ou fora da heap, com possibilidade de overflow para o disco local.

Por outro lado, os state snapshots (ponto de verificação ou ponto de salvamento) são armazenados em um local remoto durável. Esses snapshots são usados para reconstruir o estado do trabalho do Flink em caso de falha do trabalho. 

O estado em voo pode ser perdido se o trabalho falhar. Isso não afeta a recuperação do trabalho se o ponto de verificação estiver habilitado no trabalho. Quando o ponto de verificação está configurado, o estado é recuperado do armazenamento durável no momento da recuperação. 

Qual backend de estado deve ser selecionado para a produção depende dos requisitos da aplicação em termos de throughput, latência e escalabilidade.

Existem dois backends de estado que o Apache Flink suporta em produção.

1. HashMapStateBackend

É um backend de estado leve no Flink para gerenciar o Keyed State e o Operator State durante o processamento de stream. O estado é armazenado no Java Heap usando uma estrutura de dados HashMap. Como é armazenado em memória, a principal restrição aqui é que o tamanho máximo do estado é limitado ao tamanho do Java Heap. Não há serialização envolvida na escrita ou leitura do estado. Portanto, isso é adequado para aplicações de baixa latência, alto throughput e estado não tão grande. 

2. EmbeddedRocksDBStateBackend

Este estado de backend armazena os dados em trânsito no banco de dados RocksDB em memória. Por padrão, o RocksDB armazena os dados no disco local do gerenciador de tarefas. Os dados são serializados e armazenados em uma memória off-heap e depois transferidos para um disco local conectado ao gerenciador de tarefas. O formato de serialização depende do serializador de tipo configurado na aplicação.

Com este estado de backend, a quantidade de estado que pode ser armazenada é limitada apenas pelo espaço em disco conectado ao gerenciador de tarefas. Se a aplicação tiver um estado enorme e não puder ser contido na memória heap, este é o estado de backend correto. Como a serialização está envolvida, a aplicação terá maior latência e menor taxa de transferência em comparação com o HashMapStateBackend.

Visão geral do Snapshot do Estado

O snapshot representa o estado global do Job do Flink. Isso consiste em um ponteiro para cada fonte de dados e o estado de todos os operadores stateful do Flink após o processamento até esses ponteiros das fontes. O checkpoint no Apache Flink é um mecanismo para alcançar tolerância a falhas salvando periodicamente o estado em um armazenamento remoto durável.

No caso de falha no trabalho, o Flink recupera o estado armazenado do armazenamento remoto durável e começa a processar os dados de streaming de onde parou. O Flink usa o snapshotting de barreira assíncrona. É uma variante do algoritmo Chandy-Lamport.

O Flink suporta dois tipos de checkpointing.

1. Checkpointing Completo

O checkpointing completo é onde todo o estado do trabalho Flink é capturado e armazenado em um armazenamento remoto durável. Em caso de falha do trabalho, o trabalho se recupera a partir do estado armazenado anteriormente. O espaço de armazenamento necessário e o tempo levado para fazer o checkpointing dependem inteiramente do estado da aplicação. O checkpointing completo funciona tanto com HashMapStateBackend quanto com RocksDBStateBackend.

2. Checkpointing Incremental

O checkpointing incremental é uma abordagem otimizada. Em vez de capturar todo o estado, o Flink salva apenas os ‘deltas’ feitos no estado desde o último checkpoint. Isso reduz a sobrecarga de rede e, assim, o tempo necessário para o checkpointing. O checkpointing ocorre totalmente de forma assíncrona neste caso.

Somente RocksDBStateBackend suporta o checkpointing incremental. O Flink aproveita o mecanismo interno do RocksDB para isso. Embora o checkpointing leve menos tempo do que o checkpointing completo, em caso de falha do trabalho, o tempo de recuperação depende de muitos fatores. Se a rede for um gargalo, o tempo de recuperação pode ser maior do que a recuperação do checkpointing completo.

Análise

Detalhes do pipeline: Apache Beam pipeline rodando no mecanismo Flink com uma janela fixa de 10 minutos e o checkpoint está configurado para ser executado a cada 3 minutos. O tipo de serialização configurado é AVRO.

  • Tipo de cluster: “m5dn.4xlarge”
  • Armazenamento final do checkpoint: S3
  • Número de chaves únicas: 2K
Input rate 10k/s (~ 7.8 MB/s)
Tipo Nº:
de TM
Paralelismo Aloação de Heap
por TM
Uso de Heap
por TM
Uso de Memória do Pod
por TM
Uso de CPU
por TM
Ponto de Verificação
Tamanho
Ponto de Verificação
Duração
Memória Gerenciada pelo Flink
HashMapState
Com Ponto de Verificação Completo
1 1 10GB 8.5GB 11.1GB 1.2 4GB 50 seg 0
RocksDBState
Com Ponto de Verificação Incremental
 com AVRO
1 1 3GB 1.82GB 4.63GB 1.5 207MB 3 seg 3GB
Input rate 20k/s (~15.6 MB/s)
Tipo Nº:
de TM
Paralelismo Aloação de Heap
por TM
Uso de Heap
por TM
Uso do Pod
por TM
Uso de CPU
por TM
Ponto de Verificação
Tamanho
Ponto de Verificação
Duração
Memória Gerenciada pelo Flink
HashMapState
Com Ponto de Verificação Completo
2 2 10GB 8.69GB 11.2GB 1.3 8.39GB 50 seg 0
RocksDBState
Com Checkpoint Incremental
com AVRO
2 2 3GB 1.87GB 4.71GB 1.4 404MB 3 seg 3GB

input rate 30k/s (~23.5 MB/s)
Tipo Nº:
de TM
Paralelismo Aloação de Heap
por TM
Uso de Heap
por TM
Uso de Pod
por TM
Uso de CPU
por TM
Checkpoint
Tamanho
Checkpoint
Duração
Gerenciado pelo Flink
Memória
HashMapState
Com Checkpoint Completo
3 3 10GB 9.74GB 11.2GB 1.2 11.8GB 65 seg 0
RocksDBState
Com Checkpoint Incremental
 com AVRO
3 3 3GB 1.72GB 3.75GB 1.4 576MB 5 seg 3GB

Como você pode ver pelo experimento acima, a duração do checkpoint diminui com o checkpoint incremental. Isso pode ajudar muito no desempenho da aplicação.

Resumo

Abaixo está o resumo do experimento.

HashMapStateBackend com Checkpoint Completo RocksDBStateBackend com Checkpoint Incremental
Latência da Aplicação Baixa latência porque os dados são armazenados como Objetos Java na Heap. A leitura e escrita não envolvem qualquer serialização. Já que a serialização está envolvida em cada leitura ou escrita, a latência será maior.
Escala Menos escalável para trabalhos com grande estado Altamente escalável para trabalhos com grande estado e estados que mudam lentamente
Tolerância a Falhas Altamente tolerante a falhas Altamente tolerante a falhas
Duração do Checkpoint A duração do checkpoint é alta porque a criação de snapshots ocorre para todo o conjunto de dados a cada vez. A duração do checkpoint é menor porque apenas o delta desde o último checkpoint é salvo.
Complexidade da Recuperação A recuperação é fácil porque apenas um snapshot precisa ser carregado. A recuperação é complexa porque o RocksDB precisa construir o estado a partir de múltiplos checkpoints e muito depende da velocidade da rede.
Requisito de Armazenamento Compatível com HashMapStateBackend e RocksDBStatebackend. Compatível apenas com RocksDBStatebackend.
Snapshot do Estado Salva o estado inteiro em cada checkpoint. Salva apenas o delta desde o último com sucesso.
Tamanho da Heap Como o estado é armazenado na Heap antes do checkpoint, o requisito de Heap é alto, e espera-se mais ciclos de GC. Os estados são armazenados fora do heap e possivelmente no disco local, assim ocupando menos espaço de heap e gerando menos ciclos de GC.
Tamanho do State Backend Limitado ao máximo de heap alocado para uma JVM. O tamanho do state backend do RocksDB não é limitado pelo limite de heap da JVM, apenas pelo espaço em disco disponível.
Impacto de Desempenho Maior impacto no processamento, pois é um snapshot completo. Menor impacto no processamento, pois é apenas o delta que é snapshotted.
CPU O uso da CPU é apenas para processamento e GC. Não há serialização do back-end de estado envolvida. O uso da CPU é maior em comparação com o Checkpoint Completo para a mesma taxa de dados de entrada.

A utilização da CPU pode ser otimizada aplicando um mecanismo de serialização adequado. Experimentamos com Avro e obtivemos resultados muito melhores em comparação com o Kryo

Melhor Caso de Uso Bom para tamanhos menores de state backend e estados que mudam frequentemente. Bom para state backend maior e estados que são atualizados lentamente.

Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint