Quando si tratta di filtrare dati da file Parquet utilizzando pandas, ci sono diverse strategie che possono essere adottate. Sebbene sia ampiamente riconosciuto che la partizionamento dei dati può migliorare significativamente l’efficienza delle operazioni di filtraggio, ci sono ulteriori metodi per ottimizzare le prestazioni di interrogazione dei dati archiviati nei file Parquet. Il partizionamento è solo una delle opzioni.
Filtrare per Campi Partizionati
Come precedentemente menzionato, questo approccio non solo è il più familiare ma anche tipicamente il più influente in termini di ottimizzazione delle prestazioni. Il motivo è semplice. Quando si utilizzano partizioni, diventa possibile escludere in modo selettivo la necessità di leggere interi file o addirittura interi directory di file (conosciuto come “predicate pushdown”), con conseguente miglioramento sostanziale e drammatico delle prestazioni.
import pandas as pd
import time
from faker import Faker
fake = Faker()
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
print(f"Creating fake data")
data = {
'id': range(NUM_OF_RECORDS), # Generate IDs from 1 to 100
'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
df = pd.DataFrame(data)
# scrivere senza partizioni
df.to_parquet(path=NON_PARTITIONED_PATH)
# scrivere dati partizionati
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
# leggere dati non partizionati
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time # 37 sec
# leggere dati partizionati
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec
Il miglioramento temporale (insieme a una minore utilizzo di memoria e CPU) è notevole, passando da 37 secondi a solo 0,20 secondi.
Filtrare per Campi Non Partizionati
Nell’esempio sopra, abbiamo osservato come il filtraggio basato su un campo partizionato possa migliorare il recupero dei dati. Tuttavia, ci sono scenari in cui i dati non possono essere efficacemente partizionati dal campo specifico che desideriamo filtrare. Inoltre, in alcuni casi, è necessario filtrare in base a più campi. Ciò significa che tutti i file di input saranno aperti, il che può essere dannoso per le prestazioni.
Grazie a Dio, Parquet offre una soluzione intelligente per mitigare questo problema. I file Parquet sono suddivisi in gruppi di righe. All’interno di ogni gruppo di righe, Parquet memorizza i metadati. Questi metadati includono i valori minimi e massimi per ogni campo.
Quando si scrivono file Parquet con Pandas, è possibile selezionare il numero di record in ogni gruppo di controllo.
Utilizzando Pandas per leggere file Parquet con filtri, la libreria Pandas sfrutta questi metadati Parquet per filtrare in modo efficiente i dati caricati in memoria. Se il campo desiderato si trova al di fuori dell’intervallo min/max di un gruppo di righe, l’intero gruppo di righe viene evitato con grazia.
df = pd.DataFrame(data)
# scrivere dati non partizionati, specificando la dimensione del gruppo di righe
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)
# leggere dati non partizionati e filtrare solo per gruppi di righe
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
La visualizzazione dei metadati all’interno dei file Parquet può essere fatta utilizzando PyArrow
.
>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 6
num_rows: 1000000
num_row_groups: 10
format_version: 2.6
serialized_size: 9325
>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
file_offset: 1675616
file_path:
physical_type: BYTE_ARRAY
num_values: 100000
path_in_schema: state
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x115283590>
has_min_max: True
min: Alabama
max: Wyoming
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY
encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 1599792
data_page_offset: 1600354
total_compressed_size: 75824
total_uncompressed_size: 75891
Si noti che il numero di gruppi di righe è menzionato nei metadati dell’intero file, e i valori minimi e massimi sono menzionati all’interno della sezione delle statistiche di ogni colonna per ogni gruppo di righe.
Tuttavia, esiste un metodo per sfruttare ulteriormente questa funzionalità di Parquet per risultati ancora più ottimizzati: ordinamento.
Filtrare per campi ordinati
Come menzionato nella sezione precedente, parte dei metadati memorizzati da Parquet include i valori minimi e massimi per ogni campo all’interno di ogni gruppo di righe. Quando i dati sono ordinati in base al campo che intendiamo filtrare, Pandas ha maggiori probabilità di saltare più gruppi di righe.
Ad esempio, consideriamo un dataset che include un elenco di record, con uno dei campi che rappresenta ‘stato’. Se i record non sono ordinati, c’è una buona possibilità che ogni stato appaia in gran parte dei gruppi di righe. Ad esempio, guarda i metadati nella sezione precedente. Puoi vedere che il primo gruppo di righe contiene da solo tutti gli stati da ‘Alabama’ a ‘Wyoming’.
Tuttavia, se ordiniamo i dati in base al campo ‘stato’, c’è una significativa probabilità di saltare molti gruppi di righe.
df = pd.DataFrame(data)
# ordinando i dati in base a 'stato'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)
Ora, diamo un’altra occhiata ai metadati e vediamo come sono cambiati.
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'
Come puoi vedere, dopo aver ordinato per stato, i valori minimi e massimi sono influenzati di conseguenza; ogni gruppo di righe contiene parte degli stati anziché tutti gli stati. Ciò significa che la lettura con filtri dovrebbe essere molto più veloce ora.
Ora, vediamo come influisce sulle prestazioni della lettura dei dati. Il codice per leggere i dati non è cambiato.
# lettura dei dati non partizionati e filtraggio per gruppi di righe, l'input è ordinato per stato
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds
Sorprendentemente, le prestazioni qui sono quasi buone come quelle dell’uso dei partizionamenti.
Questo principio si applica sia ai dati suddivisi che non suddivisi. Possiamo utilizzare entrambi i metodi contemporaneamente. Se a volte vogliamo filtrare i dati in base al campo A e altre volte in base al campo B, allora la partizione in base al campo A e l’ordinamento in base al campo B potrebbe essere una buona opzione.
In altri casi, ad esempio, dove il campo che vogliamo filtrare è un campo con alta cardinalità, potremmo partizionare in base a una hash del valore (bucketing) e ordinare i dati al suo interno in base al valore effettivo del campo in questo modo godremo dei vantaggi di entrambi i metodi – partizionamento e gruppi di righe.
Lettura di un Sottoinsieme delle Colonne
Sebbene meno comunemente utilizzato, un altro metodo per ottenere risultati migliori durante il recupero dei dati consiste nel selezionare solo i campi specifici che sono essenziali per il tuo compito. Questa strategia può occasionalmente apportare miglioramenti in termini di prestazioni. Questo è dovuto alla natura del formato Parquet. Parquet è implementato in un formato tabulare, il che significa che memorizza i dati colonna per colonna all’interno di ogni gruppo di righe. La lettura di solo alcune delle colonne significa che le altre colonne verranno saltate.
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds
Non sorprende che il miglioramento delle prestazioni sia notevole.
Conclusione
Mentre il partizionamento dei dati è tipicamente l’approccio ottimale, non è sempre possibile. Ordinare i dati può portare a significativi miglioramenti. Potremmo saltare più gruppi di righe in questo modo. Inoltre, se possibile, selezionare solo le colonne necessarie è sempre una buona scelta.
Questo post ti ha aiutato a comprendere come sfruttare il potere di Parquet e Pandas per prestazioni migliori.
Ecco uno script contenente tutti gli esempi menzionati in precedenza, completo di confronti temporali.
Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas