Apache Flink: Checkpoint Completo vs Checkpoint Incremental

O Apache Flink é um mecanismo de processamento de fluxo de dados em tempo real. A maioria das aplicações de processamento de fluxo são ‘stateful’. Isso significa que o estado é armazenado e usado para processamento adicional. No Apache Flink, o estado é gerenciado por meio de um back-end de estado configurado. Flink suporta dois back-ends de estado em produção. Um é o HashMapStateBackend, e o outro é o EmbeddedRocksDBStateBackend.

Para evitar a perda de dados e alcançar tolerância a falhas, o Flink pode persistir snapshots do estado em um armazenamento durável. O Flink pode ser configurado para fazer um snapshot de todo o estado em um local durável ou do delta desde o último snapshot. O primeiro é chamado de checkpoint completo, e o último é conhecido como checkpoint incremental.

Neste artigo, vamos comparar o HashMapStateBackend com checkpoint completo e o EmbeddedRocksDBStateBackend com checkpoint incremental. Este artigo pressupõe que a audiência tenha conhecimento prático ou teórico do Apache Flink.

Visão geral do Back-end de Estado do Flink

Para entender o back-end de estado do Flink, é importante saber a diferença entre estado em andamento e snapshots de estado.

O estado em andamento é conhecido como estado de trabalho do Flink e é armazenado localmente. Com base na configuração do back-end de estado, ele está na memória heap ou fora da heap, com possibilidade de transbordamento para o disco local.

Por outro lado, instantâneos de estado (checkpoint ou ponto de salvamento) são armazenados em um local remoto durável. Esses instantâneos são usados para reconstruir o estado do trabalho Flink em caso de falha do trabalho.

O estado em andamento pode ser perdido se o trabalho falhar. Isso não afeta a recuperação do trabalho se o checkpoint estiver habilitado no trabalho. Quando o checkpoint é configurado, o estado é recuperado do armazenamento durável no momento da recuperação.

Qual back-end de estado deve ser selecionado para a produção depende dos requisitos da aplicação para throughput, latência e escalabilidade.

Existem dois back-ends de estado que o Apache Flink suporta em produção.

1. HashMapStateBackend

É um back-end de estado leve no Flink para gerenciar o Estado Chaveado e o Estado do Operador durante o processamento de stream. O estado é armazenado no Heap Java usando uma estrutura de dados HashMap. Como é armazenado na memória, a principal restrição aqui é que o tamanho máximo do estado é limitado ao tamanho do Heap Java. Não há serialização envolvida na escrita ou leitura do estado. Portanto, isso é adequado para baixa latência, alto throughput e aplicações de estado não tão grandes.

2. EmbeddedRocksDBStateBackend

Este estado backend armazena os dados em trânsito no banco de dados RocksDB em memória. Por padrão, o RocksDB armazena os dados no disco local do gerenciador de tarefas. Os dados são serializados e armazenados em memória fora do heap e são transferidos para um disco local ligado ao gerenciador de tarefas. O formato de serialização depende do serializador de tipo configurado na aplicação. 

Com este estado backend, a quantidade de estado que pode ser armazenada é limitada apenas pelo espaço em disco ligado ao gerenciador de tarefas. Se a aplicação tiver um estado enorme e não puder ser contido na memória heap, este é o estado backend certo. Como a serialização está envolvida, a aplicação terá latência mais alta e throughput menor em comparação com HashMapStateBackend.

Visão geral do Snapshot State

O snapshot representa o estado global do Job Flink. Isso consiste em um ponteiro para cada fonte de dados e o estado de todos os operadores stateful do Flink após o processamento até esses ponteiros das fontes. O checkpointing no Apache Flink é um mecanismo para alcançar tolerância a falhas ao salvar periodicamente o estado em armazenamento remoto durável. 

No caso de falha do job, o Flink recupera o estado armazenado do armazenamento remoto durável e começa a processar os dados de streaming de onde parou. O Flink usa o snapshotting de barreira assíncrono. É uma variante do algoritmo Chandy-Lamport. 

O Flink suporta dois tipos de checkpointing.

1. Checkpointing Completo

O checkpointing completo é quando todo o estado do trabalho Flink é capturado e armazenado em armazenamento remoto durável. Em caso de falha do trabalho, o trabalho se recupera do estado armazenado anteriormente. O requisito de espaço de armazenamento e o tempo necessário para fazer o checkpoint são inteiramente dependentes do estado da aplicação. O checkpoint completo funciona com ambos HashMapStateBackend e RocksDBStateBackend.

2. Checkpointing Incremental

O checkpointing incremental é uma abordagem otimizada. Em vez de capturar o estado inteiro, o Flink salva apenas os ‘deltas’ feitos no estado desde o último checkpoint. Isso reduz a sobrecarga de rede e, portanto, o tempo necessário para o checkpoint. O checkpointing acontece totalmente de forma assíncrona neste caso. 

Apenas RocksDBStateBackend suporta o checkpointing incremental. O Flink aproveita o mecanismo interno do RocksDB para isso. Mesmo que o checkpointing leve menos tempo do que o checkpointing completo, em caso de falha do trabalho, o tempo de recuperação depende de muitos fatores. Se a rede for um gargalo, o tempo de recuperação pode ser maior do que a recuperação do checkpoint completo.

Análise

Detalhes do pipeline: pipeline Apache Beam sendo executado no motor Flink com uma janela fixa de 10 minutos e o checkpoint configurado para ser executado a cada 3 minutos. O tipo de serialização configurado é AVRO.

  • Tipo de cluster: “m5dn.4xlarge”
  • Armazenamento final do checkpoint: S3
  • Número de chaves únicas: 2K
Input rate 10k/s (~ 7.8 MB/s)
Digite Não:
de TM
Paralelismo Alocação de Heap
por TM
Uso de Heap
por TM
Uso de Memória do Pod
por TM
Uso da CPU
por TM
Ponto de Verificação
Tamanho
Duração do Ponto de Verificação
Duração
Memória Gerenciada do Flink
Memória
Estado do HashMap
Com Ponto de Verificação Completo
1 1 10GB 8,5GB 11,1GB 1,2 4GB 50 seg 0
Estado do RocksDB
Com Ponto de Verificação Incremental
com AVRO
1 1 3GB 1,82GB 4,63GB 1,5 207MB 3 seg 3GB
Input rate 20k/s (~15.6 MB/s)
Digite Não:
de TM
Paralelismo Alocação de Heap
por TM
Uso de Heap
por TM
Uso de Pod
por TM
Uso da CPU
por TM
Ponto de Verificação
Tamanho
Duração do Ponto de Verificação
Duração
Memória Gerenciada do Flink
Memória
Estado do HashMap
Com Ponto de Verificação Completo
2 2 10GB 8,69GB 11,2GB 1,3 8,39GB 50 seg 0
RocksDBState
Com Incremental
Checkpoint com AVRO
2 2 3GB 1,87GB 4,71GB 1,4 404MB 3 seg 3GB

input rate 30k/s (~23.5 MB/s)
Tipo  Não:
de TM
Paralelismo Alocação de Heap
por TM
Uso de Heap
por TM
Uso de Pod
por TM
Uso de CPU
por TM
Checkpoint
Tamanho
Duração do Checkpoint Memória Gerenciada pelo Flink
HashMapState
Com Checkpoint Completo
3 3 10GB 9,74GB 11,2GB 1,2 11,8GB 65 seg 0
RocksDBState
Com Incremental
 Checkpoint com AVRO
3 3 3GB 1,72GB 3,75GB 1,4 576MB 5 seg 3GB

Como pode ser visto no experimento acima, a duração do checkpoint diminui com o checkpoint incremental. Isso pode contribuir significativamente para o desempenho da aplicação.  

Resumo

Abaixo está o resumo do experimento.

HashMapStateBackend com Checkpoint Completo RocksDBStateBackend com Checkpoint Incremental
Latência da Aplicação Latência baixa porque os dados são armazenados como Objetos Java no Heap. A leitura e escrita não envolvem nenhuma serialização. Já que a serialização está envolvida em cada leitura ou escrita, a latência será maior.
Escala Menos escalável para trabalhos com estado grande Altamente escalável para trabalhos com estado grande e estados que mudam lentamente
Tolerância a Falhas Altamente tolerante a falhas Altamente tolerante a falhas
Duração do Checkpoint A duração do checkpoint é alta porque a captura instantânea está ocorrendo para todo o conjunto de dados a cada vez. A duração do checkpoint é menor porque apenas o delta desde o último checkpoint é salvo.
Complexidade da Recuperação A recuperação é fácil porque apenas um snapshot precisa ser carregado. A recuperação é complexa porque o RocksDB precisa construir o estado a partir de múltiplos checkpoints e muito depende da velocidade da rede.
Requisito de Armazenamento Compatível com HashMapStateBackend e RocksDBStatebackend Compatível apenas com RocksDBStatebackend
Snapshot do Estado Salva todo o estado em cada checkpoint Salva apenas o delta desde o último com sucesso
Tamanho do Heap Já que o estado é armazenado no Heap antes do checkpoint, o requisito de Heap é alto e mais ciclos de GC são esperados. Os estados são armazenados fora do heap e possivelmente no disco local, assim ocupando menos espaço de heap e menos ciclos de GC.
Tamanho do State Backend Limitado ao máximo de heap alocado para um JVM. O tamanho do state backend do RocksDB não é limitado pelo limite de heap da JVM, mas apenas pelo espaço em disco disponível.
Impacto no Desempenho Maior impacto no processamento porque é um snapshot completo. Menor impacto no processamento porque é apenas o delta que é snapshotado.
CPU O uso da CPU é apenas para processamento e GC. A serialização do state backend não está envolvida. O uso da CPU é maior em comparação com o Checkpoint completo para a mesma taxa de dados de entrada.

A utilização da CPU pode ser otimizada aplicando um mecanismo de serialização adequado. Experimentamos com Avro e obtivemos resultados muito melhores em comparação com Kryo

Melhor Caso de Uso Bom para um tamanho de state backend menor e estados que mudam com frequência. Bom para um state backend maior e estados que são atualizados lentamente.

Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint