Apache Flink: Полная точка сохранения против Инкрементальной точки сохранения

Apache Flink — это движок обработки данных в реальном времени. Большинство приложений для обработки потоков являются «состоянием». Это означает, что состояние сохраняется и используется для дальнейшей обработки. В Apache Flink состояние управляется через настроенный бэкенд состояния. Flink поддерживает два бэкенда состояния в продакшене. Один из них — это HashMapStateBackend, а другой — EmbeddedRocksDBStateBackend.

Чтобы предотвратить потерю данных и достичь отказоустойчивости, Flink может сохранять снимки состояния в надежное хранилище. Flink можно настроить так, чтобы он снимал либо все состояние в надежное место, либо дельту с момента последнего снимка. Первое называется полным контрольным снимком, а второе известно как инкрементальный контрольный снимок.

В этой статье мы собираемся сравнить HashMapStateBackend с полным контрольным снимком и EmbeddedRocksDBStateBackend с инкрементальным контрольным снимком. Эта статья предполагает, что аудитория имеет либо практические, либо теоретические знания о Apache Flink.

Обзор бэкенда состояния Flink

Чтобы понять бэкенд состояния Flink, важно знать разницу между состоянием в процессе обработки и снимками состояния.

Состояние в процессе обработки известно как рабочее состояние Flink и хранится локально. В зависимости от конфигурации бэкенда состояния, оно находится либо в управляемой памяти, либо в памяти вне кучи, с возможным сбросом на локальный диск.

С другой стороны, снимки состояния (точки контроля или точки сохранения) хранятся в надежном удаленном месте. Эти снимки используются для восстановления состояния задачи Flink в случае сбоя задачи.

Состояние в полете может быть потеряно, если задача завершится неудачно. Это не влияет на восстановление задачи, если точка контроля включена в работу задачи. Когда настроена точка контроля, состояние извлекается из надежного хранилища во время восстановления.

Какое хранилище состояния выбрать для продакшена зависит от требований приложения к пропускной способности, задержке и масштабируемости.

В продакшене Apache Flink поддерживает два хранилища состояния.

1. HashMapStateBackend

Это легкое хранилище состояния в Flink для управления состоянием ключа и состоянием оператора во время обработки потока. Состояние хранится в Java Heap с использованием структуры данных HashMap. Поскольку оно хранится в памяти, основным ограничением здесь является то, что максимальный размер состояния ограничен размером Java Heap. При записи в состояние или чтении из состояния сериализация не выполняется. Поэтому это подходит для приложений с низкой задержкой, высокой пропускной способностью и не слишком большим объемом состояния.

2. EmbeddedRocksDBStateBackend

Этот бэкенд состояния хранит данные в процессе обработки в базе данных RocksDB в памяти. По умолчанию RocksDB хранит данные на локальном диске менеджера задач. Данные сериализуются и хранятся в памяти вне кучи, а затем сбрасываются на локальный диск, подключенный к менеджеру задач. Формат сериализации зависит от типа сериализатора, настроенного в приложении.

С этим бэкендом состояния количество хранимого состояния ограничивается только дисковым пространством, подключенным к менеджеру задач. Если приложение имеет большое состояние и не может быть помещено в кучу, это правильный бэкенд состояния. Поскольку используется сериализация, приложение будет иметь более высокую задержку и меньшую пропускную способность по сравнению с HashMapStateBackend.

Обзор состояния снимка

Снимок представляет глобальное состояние задания Flink. Это состоит из указателя на каждый источник данных и состояния всех операторов с состоянием Flink после обработки данных до этих указателей из источников. К checkpointing в Apache Flink — это механизм достижения отказоустойчивости путем периодического сохранения состояния в надежное удаленное хранилище.

В случае сбоя задания Flink восстанавливает сохраненное состояние из надежного удаленного хранилища и начинает обрабатывать потоковые данные с того места, где остановился. Flink использует асинхронное создание снимков барьеров. Это вариант алгоритма Чанди-Лампорта.

Flink поддерживает два типа создания контрольных точек.

1. Полное создание контрольных точек

Полное чекпоинтинг — это когда всё состояние задания Flink захватывается и сохраняется в надежном удаленном хранилище. В случае сбоя задания оно восстанавливается из ранее сохраненного состояния. Требования к дисковому пространству и время, затрачиваемое на чекпоинтинг, полностью зависят от состояния приложения. Полное чекпоинтинг работает как с HashMapStateBackend, так и с RocksDBStateBackend.

2. Инкрементный чекпоинтинг

Инкрементный чекпоинтинг — это оптимизированный подход. Вместо того, чтобы создавать снимок всего состояния, Flink сохраняет только «дельты», внесенные в состояние с момента последнего чекпоинта. Это уменьшает сетевые накладные расходы и, таким образом, время, необходимое для чекпоинтинга. В этом случае чекпоинтинг происходит полностью асинхронно.

Только RocksDBStateBackend поддерживает инкрементный чекпоинтинг. Flink использует внутренний механизм RocksDB для этого. Несмотря на то что чекпоинтинг занимает меньше времени, чем полное чекпоинтинг, в случае сбоя задания время восстановления зависит от многих факторов. Если сеть является узким местом, время восстановления может быть больше, чем время восстановления из полного чекпоинтинга.

Анализ

Детали пайплайна: Apache Beam пайплайн, работающий на движке Flink с фиксированным окном в 10 минут, а чекпоинт настроен на выполнение каждые 3 минуты. Настроенный тип сериализации — AVRO.

  • Тип кластера: “m5dn.4xlarge”
  • Хранилище для финального чекпоинта: S3
  • Количество уникальных ключей: 2K
Input rate 10k/s (~ 7.8 MB/s)
Тип №:
TM
Параллелизм Выделение памяти в куче
на TM
Использование кучи
на TM
Использование памяти пода
на TM
Использование ЦП
на TM
Контрольная точка
Размер
Контрольная точка
Длительность
Управляемая Flink
Память
HashMapState
С Полной контрольной точкой
1 1 10 ГБ 8.5 ГБ 11.1 ГБ 1.2 4 ГБ 50 сек 0
RocksDBState
С Инкрементальной
Контрольной точкой с AVRO
1 1 3 ГБ 1.82 ГБ 4.63 ГБ 1.5 207 МБ 3 сек 3 ГБ
Input rate 20k/s (~15.6 MB/s)
Тип №:
TM
Параллелизм Выделение памяти в куче
на TM
Использование кучи
на TM
Использование пода
на TM
Использование ЦП
на TM
Контрольная точка
Размер
Контрольная точка
Длительность
Управляемая Flink
Память
HashMapState
С полной контрольной точкой
2 2 10 ГБ 8.69 ГБ 11.2 ГБ 1.3 8.39 ГБ 50 сек 0
RocksDBState
С инкрементальным
контрольным снимком с AVRO
2 2 3ГБ 1.87ГБ 4.71ГБ 1.4 404МБ 3 сек 3ГБ

input rate 30k/s (~23.5 MB/s)
Тип  Нет:
ТМ
Параллелизм Выделение кучи
на ТМ
Использование кучи
на ТМ
Использование пода
на ТМ
Использование ЦП
на ТМ
Контрольный снимок
Размер
Контрольный снимок
Продолжительность
Управляемая Flink
Память
HashMapState
С Полным контрольным снимком
3 3 10ГБ 9.74ГБ 11.2ГБ 1.2 11.8ГБ 65 сек 0
RocksDBState
С инкрементальным
 контрольным снимком с AVRO
3 3 3ГБ 1.72ГБ 3.75ГБ 1.4 576МБ 5 сек 3ГБ

Как видно из приведенного выше эксперимента, продолжительность контрольного снимка уменьшается при инкрементальном контрольном снимке. Это может значительно помочь в производительности приложения.  

Резюме

Ниже приведено резюме эксперимента.

HashMapStateBackend с полным контрольным снимком RocksDBStateBackend с Инкрементальным Чекпоинтом
Задержка Приложения Низкая задержка, потому что данные хранятся как Java-объекты в куче. Чтение и запись не включают сериализацию. Поскольку сериализация включается при каждом чтении или записи приложения, задержка будет выше.
Масштабируемость Менее масштабируем для задач с большим состоянием Высокомасштабируем для задач с большим состоянием и медленно изменяющимися состояниями
Отказоустойчивость Высокоустойчив Высокоустойчив
Продолжительность Чекпоинта Продолжительность чекпоинта высока, потому что снимается снимок для всего набора данных каждый раз. Продолжительность чекпоинта меньше, потому что сохраняется только дельта с момента последнего чекпоинта.
Сложность Восстановления Восстановление просто, потому что нужно загрузить только один снимок. Восстановление сложно, потому что RocksDB должен построить состояние из нескольких чекпоинтов, и многое зависит от скорости сети.
Требование к Хранилищу Поддерживается как HashMapStateBackend, так и RocksDBStateBackend. Поддерживается только RocksDBStateBackend.
Снимок Состояния Сохраняет всё состояние на каждом чекпоинте. Сохраняет только дельту с момента последнего успешного.
Размер Кучи Поскольку состояние хранится в куче перед чекпоинтингом, требования к куче высоки, и ожидается больше циклов GC. Состояния хранятся вне кучи и, возможно, на локальном диске, таким образом, используется меньше места кучи и меньше циклов сборки мусора.
Размер бэкэнда состояния Ограничен максимальным объемом кучи, выделенной для JVM. Размер бэкэнда состояния RocksDB не ограничивается пределом кучи JVM, а только доступным диском.
Влияние на производительность Большее влияние на обработку, потому что это полный снимок. Меньшее влияние на обработку, потому что снимается только дельта.
CPU Использование ЦП только для обработки и сборки мусора. Сериализация бэкэнда состояния не участвует. Использование ЦП выше по сравнению с полным контрольным пунктом для того же объема входных данных.

Использование ЦП может быть оптимизировано путем применения правильного механизма сериализации. Мы экспериментировали с Avro и получили гораздо лучшие результаты по сравнению с Kryo

Лучший сценарий использования Хорошо подходит для небольшого размера бэкэнда состояния и часто изменяющегося состояния. Хорошо подходит для большего бэкэнда состояния и медленного обновления состояния.

Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint