Parquet-Datenfilterung mit Pandas

Bei der Filterung von Daten aus Parquet-Dateien mithilfe von pandas können verschiedene Strategien angewendet werden. Während es weitgehend anerkannt ist, dass die Partitionierung von Daten die Effizienz von Filteroperationen erheblich steigern kann, gibt es zusätzliche Methoden, um die Leistungsfähigkeit von Abfragen zu optimieren, die Daten in Parquet-Dateien speichern. Die Partitionierung ist nur eine der Optionen.

Filterung nach Partitionsfeldern

Wie bereits erwähnt, ist dieser Ansatz nicht nur der am häufigsten bekannte, sondern auch in der Regel der am stärksten in Bezug auf Leistungsoptimierung. Der Grundgedanke dahinter ist einfach. Wenn Partitions verwendet werden, wird es möglich, das Lesen ganzer Dateien oder sogar ganzer Verzeichnisse von Dateien (auch bekannt als Prädikatunterdrückung) selektiv auszuschließen, was zu einer erheblichen und dramatischen Leistungssteigerung führt.

Python

 

import pandas as pd
import time
from faker import Faker
 
fake = Faker()
 
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
 
print(f"Creating fake data")
data = {
    'id': range(NUM_OF_RECORDS),  # Generate IDs from 1 to 100
    'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
    'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
    'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
    'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
    'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
 
df = pd.DataFrame(data)
 
# Schreiben ohne Partitionen
df.to_parquet(path=NON_PARTITIONED_PATH)
 
# Schreiben partitionierter Daten
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
 
# Lesen nicht partitionierter Daten
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time  # 37 sec
 
# Lesen partitionierter Daten
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec

Die Zeitverbesserung (zusammen mit verringertem Speicher- und CPU-Verbrauch) ist beträchtlich, von 37 Sekunden auf nur 0,20 Sekunden reduziert.

Filterung nach nicht partitionierten Feldern

Im obigen Beispiel haben wir gesehen, wie die Filterung basierend auf einem partitionierten Feld die Datenabrufe verbessern kann. Es gibt jedoch Szenarien, in denen Daten nicht effektiv nach dem spezifischen Feld, das wir filtern möchten, partitioniert werden können. Darüber hinaus ist in einigen Fällen eine Filterung erforderlich, die auf mehreren Feldern basiert. Dies bedeutet, dass alle Eingabedateien geöffnet werden, was sich negativ auf die Leistung auswirken kann.

Glücklicherweise bietet Parquet eine clevere Lösung, um dieses Problem zu mindern. Parquet-Dateien werden in Zeilengruppen unterteilt. Innerhalb jeder Zeilengruppe speichert Parquet Metadaten. Diese Metadaten enthalten die minimalen und maximalen Werte für jedes Feld.

Beim Schreiben von Parquet-Dateien mit Pandas können Sie auswählen, wie viele Datensätze in jeder Steuergruppe enthalten sein sollen.

Beim Lesen von Parquet-Dateien mit Filtern nutzt die Pandas-Bibliothek diese Parquet-Metadaten, um Daten effizient in den Arbeitsspeicher zu filtern. Fällt das gewünschte Feld außerhalb des Min/Max-Bereichs einer Zeilengruppe, wird diese ganze Zeilengruppe elegant übersprungen.

Python

 

df = pd.DataFrame(data)

# Schreiben von nicht partitionierten Daten und Festlegen der Größe der Zeilengruppe
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)

# Lesen von nicht partitionierten Daten und Filtern nur nach Zeilengruppen
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])

Die Metadaten in Parquet-Dateien können mit PyArrow angesehen werden.

Shell

 

>>> import pyarrow.parquet as pq

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 6
  num_rows: 1000000
  num_row_groups: 10
  format_version: 2.6
  serialized_size: 9325

>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
  file_offset: 1675616
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 100000
  path_in_schema: state
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x115283590>
      has_min_max: True
      min: Alabama
      max: Wyoming
      null_count: 0
      distinct_count: 0
      num_values: 100000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 1599792
  data_page_offset: 1600354
  total_compressed_size: 75824
  total_uncompressed_size: 75891

Beachten Sie, dass die Anzahl der Zeilengruppen im Metadaten der gesamten Datei erwähnt wird, und die minimalen und maximalen Werte sind im Statistikbereich jeder Spalte für jede Zeilengruppe aufgeführt.

Es gibt jedoch eine Methode, um dieses Parquet-Feature für noch optimalere Ergebnisse zu nutzen: Sortieren.

Filtern nach Sortierten Feldern

Wie im vorherigen Abschnitt erwähnt, beinhalten die von Parquet gespeicherten Metadaten die minimalen und maximalen Werte für jedes Feld in jeder Zeilengruppe. Wenn die Daten nach dem Feld sortiert sind, das wir filtern möchten, besteht eine höhere Wahrscheinlichkeit, dass Pandas mehr Zeilengruppen überspringt.

Zum Beispiel betrachten wir einen Datensatz, der eine Liste von Einträgen enthält, wobei eines der Felder die ‚Bundesstaaten‘ darstellt. Wenn die Einträge nicht sortiert sind, besteht eine hohe Wahrscheinlichkeit, dass jeder Bundesstaat in den meisten Zeilengruppen auftritt. Schauen Sie sich beispielsweise die Metadaten aus dem vorherigen Abschnitt an. Sie können sehen, dass die erste Zeilengruppe alle Bundesstaaten von ‚Alabama‘ bis ‚Wyoming‘ enthält.

Wenn wir jedoch die Daten basierend auf dem ‚Bundesstaat‘ sortieren, besteht eine erhebliche Wahrscheinlichkeit, viele Zeilengruppen zu überspringen.

Python

 

df = pd.DataFrame(data)

# Sortieren der Daten basierend auf 'Bundesstaat'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)

Schauen wir uns nun erneut die Metadaten an und betrachten, wie sich diese verändert haben.

PowerShell

 

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'


>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'


>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'

Wie Sie sehen können, hat das Sortieren nach Bundesstaat die Min-Max-Werte entsprechend beeinflusst; jede Zeilengruppe enthält nun einen Teil der Bundesstaaten anstatt aller Bundesstaaten. Dies bedeutet, dass das Lesen mit Filtern nun viel schneller sein sollte.

Nun wollen wir untersuchen, wie sich dies auf die Leistung des Datenlesevorgangs auswirkt. Der Code zum Lesen der Daten hat sich nicht verändert.

Python

 

# Lesen nicht partitionierter Daten und Filtern nach Zeilengruppen, der Eingabedatensatz ist nach Bundesstaat sortiert
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds

Erstaunlicherweise ist die Leistung hier fast so gut wie bei der Verwendung von Partitionen.

Dieser Grundsatz gilt sowohl für partitionierte als auch für nicht partitionierte Daten. Wir können beide Methoden gleichzeitig verwenden. Wenn wir manchmal die Daten basierend auf Feld A und zu anderen Zeiten basierend auf Feld B filtern möchten, könnte die Partitionierung nach Feld A und das Sortieren nach Feld B eine gute Option sein.
In anderen Fällen, zum Beispiel wenn das Feld, nach dem wir filtern möchten, ein Feld mit hoher Kardinalität ist, könnten wir nach einem Hash des Werts partitionieren (Bucketing) und die Daten darin nach dem tatsächlichen Wert des Felds sortieren. Auf diese Weise können wir die Vorteile beider Methoden — Partitionierung und Zeilengruppen — genießen.

Lesen eines Teils der Spalten

Obwohl seltener verwendet, ist eine weitere Methode zur Erzielung besserer Ergebnisse bei der Datenabfrage die Auswahl nur der spezifischen Felder, die für Ihre Aufgabe unerlässlich sind. Diese Strategie kann gelegentlich zu Verbesserungen bei der Leistung führen. Grund dafür ist die Art und Weise, wie das Parquet-Format implementiert ist. Parquet ist in einer spaltenbasierten Form implementiert, was bedeutet, dass es die Daten in jeder Zeilengruppe spaltenweise speichert. Das Lesen nur einiger Spalten bedeutet, dass die anderen Spalten übersprungen werden.

Python

 

start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds

Es ist keine Überraschung, dass die Leistungssteigerung beträchtlich ist.

Schlussfolgerung

Während die Partitionierung von Daten in der Regel der optimale Ansatz ist, ist dies nicht immer möglich. Das Sortieren der Daten kann zu signifikanten Verbesserungen führen. Wir können mehr Zeilengruppen überspringen. Darüber hinaus ist die Auswahl nur der notwendigen Spalten immer eine gute Wahl, wenn dies möglich ist.

Dieser Beitrag hat Ihnen geholfen, zu verstehen, wie Sie die Kraft von Parquet und Pandas zur Verbesserung der Leistung nutzen können.

Hier ist ein Skript, das alle zuvor erwähnten Beispiele enthält, komplett mit Zeitvergleichen.

Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas