O Apache Flink é um mecanismo de processamento de fluxo de dados em tempo real. A maioria das aplicações de processamento de fluxo são ‘com estado’. Isso significa que o estado é armazenado e usado para processamento adicional. No Apache Flink, o estado é gerenciado por meio de um estado de back-end configurado. O Flink suporta dois back-ends de estado em produção. Um é o HashMapStateBackend
, e o outro é o EmbeddedRocksDBStateBackend
.
Para evitar a perda de dados e alcançar tolerância a falhas, o Flink pode persistir snapshots do estado em um armazenamento durável. O Flink pode ser configurado para capturar instantâneos do estado inteiro em um local durável ou o delta desde o último instantâneo. O primeiro é chamado de checkpoint completo, e o último é conhecido como checkpoint incremental.
Neste artigo, vamos comparar o HashMapStateBackend
com checkpoint completo e o EmbeddedRocksDBStateBackend
com checkpoint incremental. Este artigo pressupõe que a audiência tenha conhecimento prático ou teórico do Apache Flink.
Visão geral do Back-end de Estado do Flink
Para entender o back-end de estado do Flink, é importante saber a diferença entre estado em voo e instantâneos de estado.
O estado em voo é conhecido como estado de trabalho do Flink e é armazenado localmente. Com base na configuração do back-end de estado, ele está em memória heap ou fora da heap, com possibilidade de overflow para o disco local.
Por outro lado, os state snapshots (ponto de verificação ou ponto de salvamento) são armazenados em um local remoto durável. Esses snapshots são usados para reconstruir o estado do trabalho do Flink em caso de falha do trabalho.
O estado em voo pode ser perdido se o trabalho falhar. Isso não afeta a recuperação do trabalho se o ponto de verificação estiver habilitado no trabalho. Quando o ponto de verificação está configurado, o estado é recuperado do armazenamento durável no momento da recuperação.
Qual backend de estado deve ser selecionado para a produção depende dos requisitos da aplicação em termos de throughput, latência e escalabilidade.
Existem dois backends de estado que o Apache Flink suporta em produção.
1. HashMapStateBackend
É um backend de estado leve no Flink para gerenciar o Keyed State e o Operator State durante o processamento de stream. O estado é armazenado no Java Heap usando uma estrutura de dados HashMap. Como é armazenado em memória, a principal restrição aqui é que o tamanho máximo do estado é limitado ao tamanho do Java Heap. Não há serialização envolvida na escrita ou leitura do estado. Portanto, isso é adequado para aplicações de baixa latência, alto throughput e estado não tão grande.
2. EmbeddedRocksDBStateBackend
Este estado de backend armazena os dados em trânsito no banco de dados RocksDB em memória. Por padrão, o RocksDB armazena os dados no disco local do gerenciador de tarefas. Os dados são serializados e armazenados em uma memória off-heap e depois transferidos para um disco local conectado ao gerenciador de tarefas. O formato de serialização depende do serializador de tipo configurado na aplicação.
Com este estado de backend, a quantidade de estado que pode ser armazenada é limitada apenas pelo espaço em disco conectado ao gerenciador de tarefas. Se a aplicação tiver um estado enorme e não puder ser contido na memória heap, este é o estado de backend correto. Como a serialização está envolvida, a aplicação terá maior latência e menor taxa de transferência em comparação com o HashMapStateBackend
.
Visão geral do Snapshot do Estado
O snapshot representa o estado global do Job do Flink. Isso consiste em um ponteiro para cada fonte de dados e o estado de todos os operadores stateful do Flink após o processamento até esses ponteiros das fontes. O checkpoint no Apache Flink é um mecanismo para alcançar tolerância a falhas salvando periodicamente o estado em um armazenamento remoto durável.
No caso de falha no trabalho, o Flink recupera o estado armazenado do armazenamento remoto durável e começa a processar os dados de streaming de onde parou. O Flink usa o snapshotting de barreira assíncrona. É uma variante do algoritmo Chandy-Lamport.
O Flink suporta dois tipos de checkpointing.
1. Checkpointing Completo
O checkpointing completo é onde todo o estado do trabalho Flink é capturado e armazenado em um armazenamento remoto durável. Em caso de falha do trabalho, o trabalho se recupera a partir do estado armazenado anteriormente. O espaço de armazenamento necessário e o tempo levado para fazer o checkpointing dependem inteiramente do estado da aplicação. O checkpointing completo funciona tanto com HashMapStateBackend
quanto com RocksDBStateBackend
.
2. Checkpointing Incremental
O checkpointing incremental é uma abordagem otimizada. Em vez de capturar todo o estado, o Flink salva apenas os ‘deltas’ feitos no estado desde o último checkpoint. Isso reduz a sobrecarga de rede e, assim, o tempo necessário para o checkpointing. O checkpointing ocorre totalmente de forma assíncrona neste caso.
Somente RocksDBStateBackend
suporta o checkpointing incremental. O Flink aproveita o mecanismo interno do RocksDB para isso. Embora o checkpointing leve menos tempo do que o checkpointing completo, em caso de falha do trabalho, o tempo de recuperação depende de muitos fatores. Se a rede for um gargalo, o tempo de recuperação pode ser maior do que a recuperação do checkpointing completo.
Análise
Detalhes do pipeline: Apache Beam pipeline rodando no mecanismo Flink com uma janela fixa de 10 minutos e o checkpoint está configurado para ser executado a cada 3 minutos. O tipo de serialização configurado é AVRO.
- Tipo de cluster: “m5dn.4xlarge”
- Armazenamento final do checkpoint: S3
- Número de chaves únicas: 2K
Input rate 10k/s (~ 7.8 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | Nº: de TM |
Paralelismo | Aloação de Heap por TM |
Uso de Heap por TM |
Uso de Memória do Pod por TM |
Uso de CPU por TM |
Ponto de Verificação Tamanho |
Ponto de Verificação Duração |
Memória Gerenciada pelo Flink |
HashMapState Com Ponto de Verificação Completo |
1 | 1 | 10GB | 8.5GB | 11.1GB | 1.2 | 4GB | 50 seg | 0 |
RocksDBState Com Ponto de Verificação Incremental com AVRO |
1 | 1 | 3GB | 1.82GB | 4.63GB | 1.5 | 207MB | 3 seg | 3GB |
Input rate 20k/s (~15.6 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | Nº: de TM |
Paralelismo | Aloação de Heap por TM |
Uso de Heap por TM |
Uso do Pod por TM |
Uso de CPU por TM |
Ponto de Verificação Tamanho |
Ponto de Verificação Duração |
Memória Gerenciada pelo Flink |
HashMapState Com Ponto de Verificação Completo |
2 | 2 | 10GB | 8.69GB | 11.2GB | 1.3 | 8.39GB | 50 seg | 0 |
RocksDBState Com Checkpoint Incremental com AVRO |
2 | 2 | 3GB | 1.87GB | 4.71GB | 1.4 | 404MB | 3 seg | 3GB |
input rate 30k/s (~23.5 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Tipo | Nº: de TM |
Paralelismo | Aloação de Heap por TM |
Uso de Heap por TM |
Uso de Pod por TM |
Uso de CPU por TM |
Checkpoint Tamanho |
Checkpoint Duração |
Gerenciado pelo Flink Memória |
HashMapState Com Checkpoint Completo |
3 | 3 | 10GB | 9.74GB | 11.2GB | 1.2 | 11.8GB | 65 seg | 0 |
RocksDBState Com Checkpoint Incremental com AVRO |
3 | 3 | 3GB | 1.72GB | 3.75GB | 1.4 | 576MB | 5 seg | 3GB |
Como você pode ver pelo experimento acima, a duração do checkpoint diminui com o checkpoint incremental. Isso pode ajudar muito no desempenho da aplicação.
Resumo
Abaixo está o resumo do experimento.
HashMapStateBackend com Checkpoint Completo | RocksDBStateBackend com Checkpoint Incremental | |
Latência da Aplicação | Baixa latência porque os dados são armazenados como Objetos Java na Heap. A leitura e escrita não envolvem qualquer serialização. | Já que a serialização está envolvida em cada leitura ou escrita, a latência será maior. |
Escala | Menos escalável para trabalhos com grande estado | Altamente escalável para trabalhos com grande estado e estados que mudam lentamente |
Tolerância a Falhas | Altamente tolerante a falhas | Altamente tolerante a falhas |
Duração do Checkpoint | A duração do checkpoint é alta porque a criação de snapshots ocorre para todo o conjunto de dados a cada vez. | A duração do checkpoint é menor porque apenas o delta desde o último checkpoint é salvo. |
Complexidade da Recuperação | A recuperação é fácil porque apenas um snapshot precisa ser carregado. | A recuperação é complexa porque o RocksDB precisa construir o estado a partir de múltiplos checkpoints e muito depende da velocidade da rede. |
Requisito de Armazenamento | Compatível com HashMapStateBackend e RocksDBStatebackend. | Compatível apenas com RocksDBStatebackend. |
Snapshot do Estado | Salva o estado inteiro em cada checkpoint. | Salva apenas o delta desde o último com sucesso. |
Tamanho da Heap | Como o estado é armazenado na Heap antes do checkpoint, o requisito de Heap é alto, e espera-se mais ciclos de GC. | Os estados são armazenados fora do heap e possivelmente no disco local, assim ocupando menos espaço de heap e gerando menos ciclos de GC. |
Tamanho do State Backend | Limitado ao máximo de heap alocado para uma JVM. | O tamanho do state backend do RocksDB não é limitado pelo limite de heap da JVM, apenas pelo espaço em disco disponível. |
Impacto de Desempenho | Maior impacto no processamento, pois é um snapshot completo. | Menor impacto no processamento, pois é apenas o delta que é snapshotted. |
CPU | O uso da CPU é apenas para processamento e GC. Não há serialização do back-end de estado envolvida. | O uso da CPU é maior em comparação com o Checkpoint Completo para a mesma taxa de dados de entrada.
A utilização da CPU pode ser otimizada aplicando um mecanismo de serialização adequado. Experimentamos com Avro e obtivemos resultados muito melhores em comparação com o Kryo |
Melhor Caso de Uso | Bom para tamanhos menores de state backend e estados que mudam frequentemente. | Bom para state backend maior e estados que são atualizados lentamente. |
Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint