Filtrazione dei Dati Parquet con Pandas

Quando si tratta di filtrare dati da file Parquet utilizzando pandas, ci sono diverse strategie che possono essere adottate. Sebbene sia ampiamente riconosciuto che la partizionamento dei dati può migliorare significativamente l’efficienza delle operazioni di filtraggio, ci sono ulteriori metodi per ottimizzare le prestazioni di interrogazione dei dati archiviati nei file Parquet. Il partizionamento è solo una delle opzioni.

Filtrare per Campi Partizionati

Come precedentemente menzionato, questo approccio non solo è il più familiare ma anche tipicamente il più influente in termini di ottimizzazione delle prestazioni. Il motivo è semplice. Quando si utilizzano partizioni, diventa possibile escludere in modo selettivo la necessità di leggere interi file o addirittura interi directory di file (conosciuto come “predicate pushdown”), con conseguente miglioramento sostanziale e drammatico delle prestazioni.

Python

 

import pandas as pd
import time
from faker import Faker
 
fake = Faker()
 
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
 
print(f"Creating fake data")
data = {
    'id': range(NUM_OF_RECORDS),  # Generate IDs from 1 to 100
    'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
    'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
    'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
    'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
    'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
 
df = pd.DataFrame(data)
 
# scrivere senza partizioni
df.to_parquet(path=NON_PARTITIONED_PATH)
 
# scrivere dati partizionati
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
 
# leggere dati non partizionati
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time  # 37 sec
 
# leggere dati partizionati
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec

Il miglioramento temporale (insieme a una minore utilizzo di memoria e CPU) è notevole, passando da 37 secondi a solo 0,20 secondi.

Filtrare per Campi Non Partizionati

Nell’esempio sopra, abbiamo osservato come il filtraggio basato su un campo partizionato possa migliorare il recupero dei dati. Tuttavia, ci sono scenari in cui i dati non possono essere efficacemente partizionati dal campo specifico che desideriamo filtrare. Inoltre, in alcuni casi, è necessario filtrare in base a più campi. Ciò significa che tutti i file di input saranno aperti, il che può essere dannoso per le prestazioni.

Grazie a Dio, Parquet offre una soluzione intelligente per mitigare questo problema. I file Parquet sono suddivisi in gruppi di righe. All’interno di ogni gruppo di righe, Parquet memorizza i metadati. Questi metadati includono i valori minimi e massimi per ogni campo.

Quando si scrivono file Parquet con Pandas, è possibile selezionare il numero di record in ogni gruppo di controllo.

Utilizzando Pandas per leggere file Parquet con filtri, la libreria Pandas sfrutta questi metadati Parquet per filtrare in modo efficiente i dati caricati in memoria. Se il campo desiderato si trova al di fuori dell’intervallo min/max di un gruppo di righe, l’intero gruppo di righe viene evitato con grazia.

Python

 

df = pd.DataFrame(data)

# scrivere dati non partizionati, specificando la dimensione del gruppo di righe
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)

# leggere dati non partizionati e filtrare solo per gruppi di righe
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])

La visualizzazione dei metadati all’interno dei file Parquet può essere fatta utilizzando PyArrow.

Shell

 

>>> import pyarrow.parquet as pq

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 6
  num_rows: 1000000
  num_row_groups: 10
  format_version: 2.6
  serialized_size: 9325

>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
  file_offset: 1675616
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 100000
  path_in_schema: state
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x115283590>
      has_min_max: True
      min: Alabama
      max: Wyoming
      null_count: 0
      distinct_count: 0
      num_values: 100000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 1599792
  data_page_offset: 1600354
  total_compressed_size: 75824
  total_uncompressed_size: 75891

Si noti che il numero di gruppi di righe è menzionato nei metadati dell’intero file, e i valori minimi e massimi sono menzionati all’interno della sezione delle statistiche di ogni colonna per ogni gruppo di righe.

Tuttavia, esiste un metodo per sfruttare ulteriormente questa funzionalità di Parquet per risultati ancora più ottimizzati: ordinamento.

Filtrare per campi ordinati

Come menzionato nella sezione precedente, parte dei metadati memorizzati da Parquet include i valori minimi e massimi per ogni campo all’interno di ogni gruppo di righe. Quando i dati sono ordinati in base al campo che intendiamo filtrare, Pandas ha maggiori probabilità di saltare più gruppi di righe.

Ad esempio, consideriamo un dataset che include un elenco di record, con uno dei campi che rappresenta ‘stato’. Se i record non sono ordinati, c’è una buona possibilità che ogni stato appaia in gran parte dei gruppi di righe. Ad esempio, guarda i metadati nella sezione precedente. Puoi vedere che il primo gruppo di righe contiene da solo tutti gli stati da ‘Alabama’ a ‘Wyoming’.

Tuttavia, se ordiniamo i dati in base al campo ‘stato’, c’è una significativa probabilità di saltare molti gruppi di righe.

Python

 

df = pd.DataFrame(data)

# ordinando i dati in base a 'stato'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)

Ora, diamo un’altra occhiata ai metadati e vediamo come sono cambiati.

PowerShell

 

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'


>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'


>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'

Come puoi vedere, dopo aver ordinato per stato, i valori minimi e massimi sono influenzati di conseguenza; ogni gruppo di righe contiene parte degli stati anziché tutti gli stati. Ciò significa che la lettura con filtri dovrebbe essere molto più veloce ora.

Ora, vediamo come influisce sulle prestazioni della lettura dei dati. Il codice per leggere i dati non è cambiato.

Python

 

# lettura dei dati non partizionati e filtraggio per gruppi di righe, l'input è ordinato per stato
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds

Sorprendentemente, le prestazioni qui sono quasi buone come quelle dell’uso dei partizionamenti.

Questo principio si applica sia ai dati suddivisi che non suddivisi. Possiamo utilizzare entrambi i metodi contemporaneamente. Se a volte vogliamo filtrare i dati in base al campo A e altre volte in base al campo B, allora la partizione in base al campo A e l’ordinamento in base al campo B potrebbe essere una buona opzione.
In altri casi, ad esempio, dove il campo che vogliamo filtrare è un campo con alta cardinalità, potremmo partizionare in base a una hash del valore (bucketing) e ordinare i dati al suo interno in base al valore effettivo del campo in questo modo godremo dei vantaggi di entrambi i metodi – partizionamento e gruppi di righe.

Lettura di un Sottoinsieme delle Colonne

Sebbene meno comunemente utilizzato, un altro metodo per ottenere risultati migliori durante il recupero dei dati consiste nel selezionare solo i campi specifici che sono essenziali per il tuo compito. Questa strategia può occasionalmente apportare miglioramenti in termini di prestazioni. Questo è dovuto alla natura del formato Parquet. Parquet è implementato in un formato tabulare, il che significa che memorizza i dati colonna per colonna all’interno di ogni gruppo di righe. La lettura di solo alcune delle colonne significa che le altre colonne verranno saltate.

Python

 

start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds

Non sorprende che il miglioramento delle prestazioni sia notevole.

Conclusione

Mentre il partizionamento dei dati è tipicamente l’approccio ottimale, non è sempre possibile. Ordinare i dati può portare a significativi miglioramenti. Potremmo saltare più gruppi di righe in questo modo. Inoltre, se possibile, selezionare solo le colonne necessarie è sempre una buona scelta.

Questo post ti ha aiutato a comprendere come sfruttare il potere di Parquet e Pandas per prestazioni migliori.

Ecco uno script contenente tutti gli esempi menzionati in precedenza, completo di confronti temporali.

Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas