Apache Flink — это движок обработки данных в реальном времени. Большинство приложений для обработки потоков являются «состоянием». Это означает, что состояние сохраняется и используется для дальнейшей обработки. В Apache Flink состояние управляется через настроенный бэкенд состояния. Flink поддерживает два бэкенда состояния в продакшене. Один из них — это HashMapStateBackend
, а другой — EmbeddedRocksDBStateBackend
.
Чтобы предотвратить потерю данных и достичь отказоустойчивости, Flink может сохранять снимки состояния в надежное хранилище. Flink можно настроить так, чтобы он снимал либо все состояние в надежное место, либо дельту с момента последнего снимка. Первое называется полным контрольным снимком, а второе известно как инкрементальный контрольный снимок.
В этой статье мы собираемся сравнить HashMapStateBackend
с полным контрольным снимком и EmbeddedRocksDBStateBackend
с инкрементальным контрольным снимком. Эта статья предполагает, что аудитория имеет либо практические, либо теоретические знания о Apache Flink.
Обзор бэкенда состояния Flink
Чтобы понять бэкенд состояния Flink, важно знать разницу между состоянием в процессе обработки и снимками состояния.
Состояние в процессе обработки известно как рабочее состояние Flink и хранится локально. В зависимости от конфигурации бэкенда состояния, оно находится либо в управляемой памяти, либо в памяти вне кучи, с возможным сбросом на локальный диск.
С другой стороны, снимки состояния (точки контроля или точки сохранения) хранятся в надежном удаленном месте. Эти снимки используются для восстановления состояния задачи Flink в случае сбоя задачи.
Состояние в полете может быть потеряно, если задача завершится неудачно. Это не влияет на восстановление задачи, если точка контроля включена в работу задачи. Когда настроена точка контроля, состояние извлекается из надежного хранилища во время восстановления.
Какое хранилище состояния выбрать для продакшена зависит от требований приложения к пропускной способности, задержке и масштабируемости.
В продакшене Apache Flink поддерживает два хранилища состояния.
1. HashMapStateBackend
Это легкое хранилище состояния в Flink для управления состоянием ключа и состоянием оператора во время обработки потока. Состояние хранится в Java Heap с использованием структуры данных HashMap. Поскольку оно хранится в памяти, основным ограничением здесь является то, что максимальный размер состояния ограничен размером Java Heap. При записи в состояние или чтении из состояния сериализация не выполняется. Поэтому это подходит для приложений с низкой задержкой, высокой пропускной способностью и не слишком большим объемом состояния.
2. EmbeddedRocksDBStateBackend
Этот бэкенд состояния хранит данные в процессе обработки в базе данных RocksDB в памяти. По умолчанию RocksDB хранит данные на локальном диске менеджера задач. Данные сериализуются и хранятся в памяти вне кучи, а затем сбрасываются на локальный диск, подключенный к менеджеру задач. Формат сериализации зависит от типа сериализатора, настроенного в приложении.
С этим бэкендом состояния количество хранимого состояния ограничивается только дисковым пространством, подключенным к менеджеру задач. Если приложение имеет большое состояние и не может быть помещено в кучу, это правильный бэкенд состояния. Поскольку используется сериализация, приложение будет иметь более высокую задержку и меньшую пропускную способность по сравнению с HashMapStateBackend
.
Обзор состояния снимка
Снимок представляет глобальное состояние задания Flink. Это состоит из указателя на каждый источник данных и состояния всех операторов с состоянием Flink после обработки данных до этих указателей из источников. К checkpointing в Apache Flink — это механизм достижения отказоустойчивости путем периодического сохранения состояния в надежное удаленное хранилище.
В случае сбоя задания Flink восстанавливает сохраненное состояние из надежного удаленного хранилища и начинает обрабатывать потоковые данные с того места, где остановился. Flink использует асинхронное создание снимков барьеров. Это вариант алгоритма Чанди-Лампорта.
Flink поддерживает два типа создания контрольных точек.
1. Полное создание контрольных точек
Полное чекпоинтинг — это когда всё состояние задания Flink захватывается и сохраняется в надежном удаленном хранилище. В случае сбоя задания оно восстанавливается из ранее сохраненного состояния. Требования к дисковому пространству и время, затрачиваемое на чекпоинтинг, полностью зависят от состояния приложения. Полное чекпоинтинг работает как с HashMapStateBackend
, так и с RocksDBStateBackend
.
2. Инкрементный чекпоинтинг
Инкрементный чекпоинтинг — это оптимизированный подход. Вместо того, чтобы создавать снимок всего состояния, Flink сохраняет только «дельты», внесенные в состояние с момента последнего чекпоинта. Это уменьшает сетевые накладные расходы и, таким образом, время, необходимое для чекпоинтинга. В этом случае чекпоинтинг происходит полностью асинхронно.
Только RocksDBStateBackend
поддерживает инкрементный чекпоинтинг. Flink использует внутренний механизм RocksDB для этого. Несмотря на то что чекпоинтинг занимает меньше времени, чем полное чекпоинтинг, в случае сбоя задания время восстановления зависит от многих факторов. Если сеть является узким местом, время восстановления может быть больше, чем время восстановления из полного чекпоинтинга.
Анализ
Детали пайплайна: Apache Beam пайплайн, работающий на движке Flink с фиксированным окном в 10 минут, а чекпоинт настроен на выполнение каждые 3 минуты. Настроенный тип сериализации — AVRO.
- Тип кластера: “m5dn.4xlarge”
- Хранилище для финального чекпоинта: S3
- Количество уникальных ключей: 2K
Input rate 10k/s (~ 7.8 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Тип | №: TM |
Параллелизм | Выделение памяти в куче на TM |
Использование кучи на TM |
Использование памяти пода на TM |
Использование ЦП на TM |
Контрольная точка Размер |
Контрольная точка Длительность |
Управляемая Flink Память |
HashMapState С Полной контрольной точкой |
1 | 1 | 10 ГБ | 8.5 ГБ | 11.1 ГБ | 1.2 | 4 ГБ | 50 сек | 0 |
RocksDBState С Инкрементальной Контрольной точкой с AVRO |
1 | 1 | 3 ГБ | 1.82 ГБ | 4.63 ГБ | 1.5 | 207 МБ | 3 сек | 3 ГБ |
Input rate 20k/s (~15.6 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Тип | №: TM |
Параллелизм | Выделение памяти в куче на TM |
Использование кучи на TM |
Использование пода на TM |
Использование ЦП на TM |
Контрольная точка Размер |
Контрольная точка Длительность |
Управляемая Flink Память |
HashMapState С полной контрольной точкой |
2 | 2 | 10 ГБ | 8.69 ГБ | 11.2 ГБ | 1.3 | 8.39 ГБ | 50 сек | 0 |
RocksDBState С инкрементальным контрольным снимком с AVRO |
2 | 2 | 3ГБ | 1.87ГБ | 4.71ГБ | 1.4 | 404МБ | 3 сек | 3ГБ |
input rate 30k/s (~23.5 MB/s) | |||||||||
---|---|---|---|---|---|---|---|---|---|
Тип | Нет: ТМ |
Параллелизм | Выделение кучи на ТМ |
Использование кучи на ТМ |
Использование пода на ТМ |
Использование ЦП на ТМ |
Контрольный снимок Размер |
Контрольный снимок Продолжительность |
Управляемая Flink Память |
HashMapState С Полным контрольным снимком |
3 | 3 | 10ГБ | 9.74ГБ | 11.2ГБ | 1.2 | 11.8ГБ | 65 сек | 0 |
RocksDBState С инкрементальным контрольным снимком с AVRO |
3 | 3 | 3ГБ | 1.72ГБ | 3.75ГБ | 1.4 | 576МБ | 5 сек | 3ГБ |
Как видно из приведенного выше эксперимента, продолжительность контрольного снимка уменьшается при инкрементальном контрольном снимке. Это может значительно помочь в производительности приложения.
Резюме
Ниже приведено резюме эксперимента.
HashMapStateBackend с полным контрольным снимком | RocksDBStateBackend с Инкрементальным Чекпоинтом | |
Задержка Приложения | Низкая задержка, потому что данные хранятся как Java-объекты в куче. Чтение и запись не включают сериализацию. | Поскольку сериализация включается при каждом чтении или записи приложения, задержка будет выше. |
Масштабируемость | Менее масштабируем для задач с большим состоянием | Высокомасштабируем для задач с большим состоянием и медленно изменяющимися состояниями |
Отказоустойчивость | Высокоустойчив | Высокоустойчив |
Продолжительность Чекпоинта | Продолжительность чекпоинта высока, потому что снимается снимок для всего набора данных каждый раз. | Продолжительность чекпоинта меньше, потому что сохраняется только дельта с момента последнего чекпоинта. |
Сложность Восстановления | Восстановление просто, потому что нужно загрузить только один снимок. | Восстановление сложно, потому что RocksDB должен построить состояние из нескольких чекпоинтов, и многое зависит от скорости сети. |
Требование к Хранилищу | Поддерживается как HashMapStateBackend, так и RocksDBStateBackend. | Поддерживается только RocksDBStateBackend. |
Снимок Состояния | Сохраняет всё состояние на каждом чекпоинте. | Сохраняет только дельту с момента последнего успешного. |
Размер Кучи | Поскольку состояние хранится в куче перед чекпоинтингом, требования к куче высоки, и ожидается больше циклов GC. | Состояния хранятся вне кучи и, возможно, на локальном диске, таким образом, используется меньше места кучи и меньше циклов сборки мусора. |
Размер бэкэнда состояния | Ограничен максимальным объемом кучи, выделенной для JVM. | Размер бэкэнда состояния RocksDB не ограничивается пределом кучи JVM, а только доступным диском. |
Влияние на производительность | Большее влияние на обработку, потому что это полный снимок. | Меньшее влияние на обработку, потому что снимается только дельта. |
CPU | Использование ЦП только для обработки и сборки мусора. Сериализация бэкэнда состояния не участвует. | Использование ЦП выше по сравнению с полным контрольным пунктом для того же объема входных данных.
Использование ЦП может быть оптимизировано путем применения правильного механизма сериализации. Мы экспериментировали с Avro и получили гораздо лучшие результаты по сравнению с Kryo |
Лучший сценарий использования | Хорошо подходит для небольшого размера бэкэнда состояния и часто изменяющегося состояния. | Хорошо подходит для большего бэкэнда состояния и медленного обновления состояния. |
Source:
https://dzone.com/articles/apache-flink-full-checkpoint-vs-incremental-checkpoint