O Apache Flink é um mecanismo de processamento de fluxo de dados em tempo real. A maioria das aplicações de processamento de fluxo são ‘stateful’. Isso significa que o estado é armazenado e usado para processamento adicional. No Apache Flink, o estado é gerenciado por meio de um back-end de estado configurado. Flink suporta dois back-ends de estado em produção. Um é o HashMapStateBackend
, e o outro é o EmbeddedRocksDBStateBackend
.
Para evitar a perda de dados e alcançar tolerância a falhas, o Flink pode persistir snapshots do estado em um armazenamento durável. O Flink pode ser configurado para fazer um snapshot de todo o estado em um local durável ou do delta desde o último snapshot. O primeiro é chamado de checkpoint completo, e o último é conhecido como checkpoint incremental.
Neste artigo, vamos comparar o HashMapStateBackend
com checkpoint completo e o EmbeddedRocksDBStateBackend
com checkpoint incremental. Este artigo pressupõe que a audiência tenha conhecimento prático ou teórico do Apache Flink.
Visão geral do Back-end de Estado do Flink
Para entender o back-end de estado do Flink, é importante saber a diferença entre estado em andamento e snapshots de estado.
O estado em andamento é conhecido como estado de trabalho do Flink e é armazenado localmente. Com base na configuração do back-end de estado, ele está na memória heap ou fora da heap, com possibilidade de transbordamento para o disco local.
Por outro lado, instantâneos de estado (checkpoint ou ponto de salvamento) são armazenados em um local remoto durável. Esses instantâneos são usados para reconstruir o estado do trabalho Flink em caso de falha do trabalho.
O estado em andamento pode ser perdido se o trabalho falhar. Isso não afeta a recuperação do trabalho se o checkpoint estiver habilitado no trabalho. Quando o checkpoint é configurado, o estado é recuperado do armazenamento durável no momento da recuperação.
Qual back-end de estado deve ser selecionado para a produção depende dos requisitos da aplicação para throughput, latência e escalabilidade.
Existem dois back-ends de estado que o Apache Flink suporta em produção.
1. HashMapStateBackend
É um back-end de estado leve no Flink para gerenciar o Estado Chaveado e o Estado do Operador durante o processamento de stream. O estado é armazenado no Heap Java usando uma estrutura de dados HashMap. Como é armazenado na memória, a principal restrição aqui é que o tamanho máximo do estado é limitado ao tamanho do Heap Java. Não há serialização envolvida na escrita ou leitura do estado. Portanto, isso é adequado para baixa latência, alto throughput e aplicações de estado não tão grandes.
2. EmbeddedRocksDBStateBackend
Este estado backend armazena os dados em trânsito no banco de dados RocksDB em memória. Por padrão, o RocksDB armazena os dados no disco local do gerenciador de tarefas. Os dados são serializados e armazenados em memória fora do heap e são transferidos para um disco local ligado ao gerenciador de tarefas. O formato de serialização depende do serializador de tipo configurado na aplicação.
Com este estado backend, a quantidade de estado que pode ser armazenada é limitada apenas pelo espaço em disco ligado ao gerenciador de tarefas. Se a aplicação tiver um estado enorme e não puder ser contido na memória heap, este é o estado backend certo. Como a serialização está envolvida, a aplicação terá latência mais alta e throughput menor em comparação com HashMapStateBackend
.
Visão geral do Snapshot State
O snapshot representa o estado global do Job Flink. Isso consiste em um ponteiro para cada fonte de dados e o estado de todos os operadores stateful do Flink após o processamento até esses ponteiros das fontes. O checkpointing no Apache Flink é um mecanismo para alcançar tolerância a falhas ao salvar periodicamente o estado em armazenamento remoto durável.
No caso de falha do job, o Flink recupera o estado armazenado do armazenamento remoto durável e começa a processar os dados de streaming de onde parou. O Flink usa o snapshotting de barreira assíncrono. É uma variante do algoritmo Chandy-Lamport.
O Flink suporta dois tipos de checkpointing.
1. Checkpointing Completo
O checkpointing completo é quando todo o estado do trabalho Flink é capturado e armazenado em armazenamento remoto durável. Em caso de falha do trabalho, o trabalho se recupera do estado armazenado anteriormente. O requisito de espaço de armazenamento e o tempo necessário para fazer o checkpoint são inteiramente dependentes do estado da aplicação. O checkpoint completo funciona com ambos HashMapStateBackend
e RocksDBStateBackend
.
2. Checkpointing Incremental
O checkpointing incremental é uma abordagem otimizada. Em vez de capturar o estado inteiro, o Flink salva apenas os ‘deltas’ feitos no estado desde o último checkpoint. Isso reduz a sobrecarga de rede e, portanto, o tempo necessário para o checkpoint. O checkpointing acontece totalmente de forma assíncrona neste caso.
Apenas RocksDBStateBackend
suporta o checkpointing incremental. O Flink aproveita o mecanismo interno do RocksDB para isso. Mesmo que o checkpointing leve menos tempo do que o checkpointing completo, em caso de falha do trabalho, o tempo de recuperação depende de muitos fatores. Se a rede for um gargalo, o tempo de recuperação pode ser maior do que a recuperação do checkpoint completo.
Análise
Detalhes do pipeline: pipeline Apache Beam sendo executado no motor Flink com uma janela fixa de 10 minutos e o checkpoint configurado para ser executado a cada 3 minutos. O tipo de serialização configurado é AVRO.
- Tipo de cluster: “m5dn.4xlarge”
- Armazenamento final do checkpoint: S3
- Número de chaves únicas: 2K
Input rate 10k/s (~ 7.8 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Digite | Não: de TM |
Paralelismo | Alocação de Heap por TM |
Uso de Heap por TM |
Uso de Memória do Pod por TM |
Uso da CPU por TM |
Ponto de Verificação Tamanho |
Duração do Ponto de Verificação Duração |
Memória Gerenciada do Flink Memória |
Estado do HashMap Com Ponto de Verificação Completo |
1 | 1 | 10GB | 8,5GB | 11,1GB | 1,2 | 4GB | 50 seg | 0 |
Estado do RocksDB Com Ponto de Verificação Incremental com AVRO |
1 | 1 | 3GB | 1,82GB | 4,63GB | 1,5 | 207MB | 3 seg | 3GB |
Input rate 20k/s (~15.6 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Digite | Não: de TM |
Paralelismo | Alocação de Heap por TM |
Uso de Heap por TM |
Uso de Pod por TM |
Uso da CPU por TM |
Ponto de Verificação Tamanho |
Duração do Ponto de Verificação Duração |
Memória Gerenciada do Flink Memória |
Estado do HashMap Com Ponto de Verificação Completo |
2 | 2 | 10GB | 8,69GB | 11,2GB | 1,3 | 8,39GB | 50 seg | 0 |
RocksDBState Com Incremental Checkpoint com AVRO |
2 | 2 | 3GB | 1,87GB | 4,71GB | 1,4 | 404MB | 3 seg | 3GB |
input rate 30k/s (~23.5 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | Não: de TM |
Paralelismo | Alocação de Heap por TM |
Uso de Heap por TM |
Uso de Pod por TM |
Uso de CPU por TM |
Checkpoint Tamanho |
Duração do Checkpoint | Memória Gerenciada pelo Flink |
HashMapState Com Checkpoint Completo |
3 | 3 | 10GB | 9,74GB | 11,2GB | 1,2 | 11,8GB | 65 seg | 0 |
RocksDBState Com Incremental Checkpoint com AVRO |
3 | 3 | 3GB | 1,72GB | 3,75GB | 1,4 | 576MB | 5 seg | 3GB |
Como pode ser visto no experimento acima, a duração do checkpoint diminui com o checkpoint incremental. Isso pode contribuir significativamente para o desempenho da aplicação.
Resumo
Abaixo está o resumo do experimento.
HashMapStateBackend com Checkpoint Completo | RocksDBStateBackend com Checkpoint Incremental | |
Latência da Aplicação | Latência baixa porque os dados são armazenados como Objetos Java no Heap. A leitura e escrita não envolvem nenhuma serialização. | Já que a serialização está envolvida em cada leitura ou escrita, a latência será maior. |
Escala | Menos escalável para trabalhos com estado grande | Altamente escalável para trabalhos com estado grande e estados que mudam lentamente |
Tolerância a Falhas | Altamente tolerante a falhas | Altamente tolerante a falhas |
Duração do Checkpoint | A duração do checkpoint é alta porque a captura instantânea está ocorrendo para todo o conjunto de dados a cada vez. | A duração do checkpoint é menor porque apenas o delta desde o último checkpoint é salvo. |
Complexidade da Recuperação | A recuperação é fácil porque apenas um snapshot precisa ser carregado. | A recuperação é complexa porque o RocksDB precisa construir o estado a partir de múltiplos checkpoints e muito depende da velocidade da rede. |
Requisito de Armazenamento | Compatível com HashMapStateBackend e RocksDBStatebackend | Compatível apenas com RocksDBStatebackend |
Snapshot do Estado | Salva todo o estado em cada checkpoint | Salva apenas o delta desde o último com sucesso |
Tamanho do Heap | Já que o estado é armazenado no Heap antes do checkpoint, o requisito de Heap é alto e mais ciclos de GC são esperados. | Os estados são armazenados fora do heap e possivelmente no disco local, assim ocupando menos espaço de heap e menos ciclos de GC. |
Tamanho do State Backend | Limitado ao máximo de heap alocado para um JVM. | O tamanho do state backend do RocksDB não é limitado pelo limite de heap da JVM, mas apenas pelo espaço em disco disponível. |
Impacto no Desempenho | Maior impacto no processamento porque é um snapshot completo. | Menor impacto no processamento porque é apenas o delta que é snapshotado. |
CPU | O uso da CPU é apenas para processamento e GC. A serialização do state backend não está envolvida. | O uso da CPU é maior em comparação com o Checkpoint completo para a mesma taxa de dados de entrada.
A utilização da CPU pode ser otimizada aplicando um mecanismo de serialização adequado. Experimentamos com Avro e obtivemos resultados muito melhores em comparação com Kryo |
Melhor Caso de Uso | Bom para um tamanho de state backend menor e estados que mudam com frequência. | Bom para um state backend maior e estados que são atualizados lentamente. |
Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint