Фильтрация данных Parquet с помощью Pandas

При работе с фильтрацией данных из файлов Parquet с использованием pandas можно применять несколько стратегий. Хотя широко признано, что разбиение данных может значительно улучшить эффективность операций фильтрации, существуют дополнительные методы оптимизации производительности запросов к данным, хранящимся в файлах Parquet. Разбиение — это всего лишь один из вариантов.

Фильтрация по разделенным полям

Как уже упоминалось, этот подход не только наиболее знакомый, но и обычно оказывает наиболее значительное влияние на оптимизацию производительности. Причина этого проста. При использовании разбиения становится возможным избирательно исключать необходимость чтения целых файлов или даже целых директорий файлов (так называемое “упреждающее применение предикатов”), что приводит к значительному и драматическому улучшению производительности.

Python

 

import pandas as pd
import time
from faker import Faker
 
fake = Faker()
 
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
 
print(f"Creating fake data")
data = {
    'id': range(NUM_OF_RECORDS),  # Generate IDs from 1 to 100
    'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
    'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
    'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
    'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
    'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
 
df = pd.DataFrame(data)
 
# запись без разбиения
df.to_parquet(path=NON_PARTITIONED_PATH)
 
# запись разбитых данных
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
 
# чтение не разбитых
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time  # 37 sec
 
# чтение разбитых данных
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec

Улучшение времени (вместе с уменьшением использования памяти и CPU) значительное, уменьшение с 37 секунд до всего 0,20 секунд.

Фильтрация по неразделенным полям

В приведенном выше примере мы видели, как фильтрация на основе разделенного поля может улучшить извлечение данных. Однако в некоторых сценариях данные нельзя эффективно разбить по конкретному полю, по которому мы хотим фильтровать. Более того, в некоторых случаях требуется фильтрация на основе нескольких полей. Это означает, что все входные файлы будут открыты, что может негативно сказаться на производительности.

К счастью, Parquet предлагает умный способ смягчения этой проблемы. Файлы Parquet делятся на группы строк. В каждой группе строк Parquet хранит метаданные. Эти метаданные включают минимальные и максимальные значения для каждого поля.

При записи файлов Parquet с помощью Pandas вы можете выбрать, сколько записей будет в каждой контрольной группе.

При использовании Pandas для чтения файлов Parquet с фильтрами, библиотека Pandas использует эти метаданные Parquet для эффективного фильтрования данных, загружаемых в память. Если требуемое поле выходит за пределы диапазона min/max группы строк, вся эта группа строк грациозно пропускается.

Python

 

df = pd.DataFrame(data)

# запись не разбитых на разделы данных, указывая размер группы строк
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)

# чтение не разбитых на разделы данных и фильтрация только по группам строк
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])

Просмотр метаданных внутри файлов Parquet можно выполнить с помощью PyArrow.

Shell

 

>>> import pyarrow.parquet as pq

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 6
  num_rows: 1000000
  num_row_groups: 10
  format_version: 2.6
  serialized_size: 9325

>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
  file_offset: 1675616
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 100000
  path_in_schema: state
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x115283590>
      has_min_max: True
      min: Alabama
      max: Wyoming
      null_count: 0
      distinct_count: 0
      num_values: 100000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 1599792
  data_page_offset: 1600354
  total_compressed_size: 75824
  total_uncompressed_size: 75891

Обратите внимание, что количество групп строк указано в метаданных всего файла, а минимальные и максимальные значения указаны в разделе статистики каждого столбца для каждой группы строк.

Однако есть способ еще больше использовать эту функцию Parquet для еще более оптимизированных результатов: сортировка.

Фильтрация по отсортированным полям

Как упоминалось в предыдущем разделе, часть метаданных, хранящихся в Parquet, включает минимальные и максимальные значения для каждого поля в каждой группе строк. Когда данные сортируются по полю, по которому мы намереваемся фильтровать, у Pandas больше шансов пропускать больше групп строк.

Например, рассмотрим набор данных, который включает список записей, где одно из полей представляет собой ‘государство’. Если записи не отсортированы, существует большая вероятность того, что каждое государство появляется в большинстве групп строк. Например, посмотрите на метаданные в предыдущем разделе. Вы можете заметить, что первая группа строк сама по себе содержит все государства от ‘Алабама’ до ‘Вайоминг’.

Однако, если мы отсортируем данные по полю ‘государство’, существует значительная вероятность пропуска многих групп строк.

Python

 

df = pd.DataFrame(data)

# сортировка данных по 'государству'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)

Теперь давайте еще раз взглянем на метаданные и посмотрим, как они изменились.

PowerShell

 

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'


>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'


>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'

Как видите, после сортировки по государству, минимальные и максимальные значения изменились соответственно; каждая группа строк теперь содержит часть государств, а не все государства. Это означает, что чтение с фильтрами теперь должно быть намного быстрее.

Теперь давайте посмотрим, как это влияет на производительность чтения данных. Код для чтения данных не изменился.

Python

 

# чтение не разбитых на части данных и фильтрация по группам строк, входные данные отсортированы по государству
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds

Удивительно, но производительность здесь почти такая же хорошая, как и при использовании разбиения на части.

Этот принцип применим как к разделенным, так и к неразделенным данным. Мы можем использовать оба метода одновременно. Если иногда нам нужно фильтровать данные по полю A, а в другие разы — по полю B, то разбиение по полю A и сортировка по полю B может быть хорошим вариантом.
В других случаях, например, когда поле, по которому мы хотим фильтровать, является полем с высокой кардинальностью, мы можем разбивать по некоторой хеш-функции значения (bucketing) и сортировать данные внутри него по фактическому значению поля. Таким образом, мы будем пользоваться преимуществами обоих методов — разбиения и групп строк.

Чтение подмножества столбцов

Хотя менее распространенный, другой метод для достижения лучших результатов при извлечении данных заключается в выборе только конкретных полей, которые важны для вашей задачи. Эта стратегия иногда может улучшить производительность. Это связано с природой формата Parquet. Parquet реализован в столбчатом формате, что означает, что он хранит данные по столбцам внутри каждой группы строк. Чтение только некоторых столбцов означает, что другие столбцы будут пропущены.

Python

 

start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds

Неудивительно, что улучшение производительности значительно.

Заключение

Хотя разбиение данных обычно является оптимальным подходом, это не всегда возможно. Сортировка данных может привести к значительным улучшениям. Мы можем пропускать больше групп строк. Кроме того, если это возможно, выбор только необходимых столбцов всегда является хорошим выбором.

Этот пост помог вам понять, как использовать мощь Parquet и pandas для лучшей производительности.

Вот скрипт, содержащий все упомянутые ранее примеры, включая сравнения по времени.

Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas