При работе с фильтрацией данных из файлов Parquet с использованием pandas можно применять несколько стратегий. Хотя широко признано, что разбиение данных может значительно улучшить эффективность операций фильтрации, существуют дополнительные методы оптимизации производительности запросов к данным, хранящимся в файлах Parquet. Разбиение — это всего лишь один из вариантов.
Фильтрация по разделенным полям
Как уже упоминалось, этот подход не только наиболее знакомый, но и обычно оказывает наиболее значительное влияние на оптимизацию производительности. Причина этого проста. При использовании разбиения становится возможным избирательно исключать необходимость чтения целых файлов или даже целых директорий файлов (так называемое “упреждающее применение предикатов”), что приводит к значительному и драматическому улучшению производительности.
import pandas as pd
import time
from faker import Faker
fake = Faker()
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
print(f"Creating fake data")
data = {
'id': range(NUM_OF_RECORDS), # Generate IDs from 1 to 100
'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
df = pd.DataFrame(data)
# запись без разбиения
df.to_parquet(path=NON_PARTITIONED_PATH)
# запись разбитых данных
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
# чтение не разбитых
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time # 37 sec
# чтение разбитых данных
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec
Улучшение времени (вместе с уменьшением использования памяти и CPU) значительное, уменьшение с 37 секунд до всего 0,20 секунд.
Фильтрация по неразделенным полям
В приведенном выше примере мы видели, как фильтрация на основе разделенного поля может улучшить извлечение данных. Однако в некоторых сценариях данные нельзя эффективно разбить по конкретному полю, по которому мы хотим фильтровать. Более того, в некоторых случаях требуется фильтрация на основе нескольких полей. Это означает, что все входные файлы будут открыты, что может негативно сказаться на производительности.
К счастью, Parquet предлагает умный способ смягчения этой проблемы. Файлы Parquet делятся на группы строк. В каждой группе строк Parquet хранит метаданные. Эти метаданные включают минимальные и максимальные значения для каждого поля.
При записи файлов Parquet с помощью Pandas вы можете выбрать, сколько записей будет в каждой контрольной группе.
При использовании Pandas для чтения файлов Parquet с фильтрами, библиотека Pandas использует эти метаданные Parquet для эффективного фильтрования данных, загружаемых в память. Если требуемое поле выходит за пределы диапазона min/max группы строк, вся эта группа строк грациозно пропускается.
df = pd.DataFrame(data)
# запись не разбитых на разделы данных, указывая размер группы строк
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)
# чтение не разбитых на разделы данных и фильтрация только по группам строк
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
Просмотр метаданных внутри файлов Parquet можно выполнить с помощью PyArrow
.
>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 6
num_rows: 1000000
num_row_groups: 10
format_version: 2.6
serialized_size: 9325
>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
file_offset: 1675616
file_path:
physical_type: BYTE_ARRAY
num_values: 100000
path_in_schema: state
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x115283590>
has_min_max: True
min: Alabama
max: Wyoming
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY
encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 1599792
data_page_offset: 1600354
total_compressed_size: 75824
total_uncompressed_size: 75891
Обратите внимание, что количество групп строк указано в метаданных всего файла, а минимальные и максимальные значения указаны в разделе статистики каждого столбца для каждой группы строк.
Однако есть способ еще больше использовать эту функцию Parquet для еще более оптимизированных результатов: сортировка.
Фильтрация по отсортированным полям
Как упоминалось в предыдущем разделе, часть метаданных, хранящихся в Parquet, включает минимальные и максимальные значения для каждого поля в каждой группе строк. Когда данные сортируются по полю, по которому мы намереваемся фильтровать, у Pandas больше шансов пропускать больше групп строк.
Например, рассмотрим набор данных, который включает список записей, где одно из полей представляет собой ‘государство’. Если записи не отсортированы, существует большая вероятность того, что каждое государство появляется в большинстве групп строк. Например, посмотрите на метаданные в предыдущем разделе. Вы можете заметить, что первая группа строк сама по себе содержит все государства от ‘Алабама’ до ‘Вайоминг’.
Однако, если мы отсортируем данные по полю ‘государство’, существует значительная вероятность пропуска многих групп строк.
df = pd.DataFrame(data)
# сортировка данных по 'государству'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)
Теперь давайте еще раз взглянем на метаданные и посмотрим, как они изменились.
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'
Как видите, после сортировки по государству, минимальные и максимальные значения изменились соответственно; каждая группа строк теперь содержит часть государств, а не все государства. Это означает, что чтение с фильтрами теперь должно быть намного быстрее.
Теперь давайте посмотрим, как это влияет на производительность чтения данных. Код для чтения данных не изменился.
# чтение не разбитых на части данных и фильтрация по группам строк, входные данные отсортированы по государству
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds
Удивительно, но производительность здесь почти такая же хорошая, как и при использовании разбиения на части.
Этот принцип применим как к разделенным, так и к неразделенным данным. Мы можем использовать оба метода одновременно. Если иногда нам нужно фильтровать данные по полю A, а в другие разы — по полю B, то разбиение по полю A и сортировка по полю B может быть хорошим вариантом.
В других случаях, например, когда поле, по которому мы хотим фильтровать, является полем с высокой кардинальностью, мы можем разбивать по некоторой хеш-функции значения (bucketing) и сортировать данные внутри него по фактическому значению поля. Таким образом, мы будем пользоваться преимуществами обоих методов — разбиения и групп строк.
Чтение подмножества столбцов
Хотя менее распространенный, другой метод для достижения лучших результатов при извлечении данных заключается в выборе только конкретных полей, которые важны для вашей задачи. Эта стратегия иногда может улучшить производительность. Это связано с природой формата Parquet. Parquet реализован в столбчатом формате, что означает, что он хранит данные по столбцам внутри каждой группы строк. Чтение только некоторых столбцов означает, что другие столбцы будут пропущены.
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds
Неудивительно, что улучшение производительности значительно.
Заключение
Хотя разбиение данных обычно является оптимальным подходом, это не всегда возможно. Сортировка данных может привести к значительным улучшениям. Мы можем пропускать больше групп строк. Кроме того, если это возможно, выбор только необходимых столбцов всегда является хорошим выбором.
Этот пост помог вам понять, как использовать мощь Parquet и pandas для лучшей производительности.
Вот скрипт, содержащий все упомянутые ранее примеры, включая сравнения по времени.
Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas