Bij het filteren van gegevens uit Parquet-bestanden met behulp van pandas, kunnen verschillende strategieën worden toegepast. Hoewel het breed erkend is dat het partitioneren van gegevens een significante verbetering kan betekenen voor de efficiëntie van filteerbewerkingen, zijn er aanvullende methoden om de prestaties van het opvragen van gegevens die zijn opgeslagen in Parquet-bestanden te optimaliseren. Partitioneren is slechts één van de mogelijkheden.
Filteren op Gepartitioneerde Velden
Zoals eerder genoemd, is deze aanpak niet alleen de meest bekende, maar ook meestal de meest impactvolle in termen van prestatieoptimalisatie. De reden hiervoor is eenvoudig. Wanneer partities worden gebruikt, kan het nodig zijn om gehele bestanden of zelfs volledige directories van bestanden te selectief uitsluiten (ook bekend als predicate pushdown), wat resulteert in een aanzienlijke en dramatische verbetering van de prestaties.
import pandas as pd
import time
from faker import Faker
fake = Faker()
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
print(f"Creating fake data")
data = {
'id': range(NUM_OF_RECORDS), # Generate IDs from 1 to 100
'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
df = pd.DataFrame(data)
# schrijven zonder partities
df.to_parquet(path=NON_PARTITIONED_PATH)
# schrijven van gepartitioneerde gegevens
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
# lezen van niet-gepartitioneerde gegevens
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time # 37 sec
# lezen van gepartitioneerde gegevens
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec
De tijdsverbetering (samen met verminderde geheugen- en CPU-gebruik) is aanzienlijk, afnemend van 37 seconden tot slechts 0,20 seconden.
Filteren op Niet-Gepartitioneerde Velden
In het bovenstaande voorbeeld zagen we hoe filteren op basis van een gepartitioneerd veld de gegevensopvraag kan verbeteren. Er zijn echter scenario’s waarin gegevens niet effectief kunnen worden gepartitioneerd door het specifieke veld dat we willen filteren. Bovendien is filteren in sommige gevallen vereist op basis van meerdere velden. Dit betekent dat alle invoerbestanden worden geopend, wat schadelijk kan zijn voor de prestaties.
Gelukkig biedt Parquet een slimme oplossing om dit probleem te verlichten. Parquet-bestanden worden gesplitst in rijgroepen. Binnen elke rijgroep slaat Parquet metadata op. Deze metadata bevat de minimale en maximale waarden voor elk veld.
Bij het schrijven van Parquet-bestanden met Pandas, kunt u kiezen hoeveel records er in elke controlegroep zullen zijn.
Bij het gebruik van Pandas om Parquet-bestanden te lezen met filters, maakt de Pandas-bibliotheek gebruik van deze Parquet-metadata om gegevens efficiënt te filteren die in het geheugen worden geladen. Als het gewenste veld buiten het min/max-bereik van een rijgroep valt, wordt die hele rijgroep gracieus overgeslagen.
df = pd.DataFrame(data)
# het schrijven van niet-gedeelde gegevens, het specificeren van de grootte van de rijgroep
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)
# het lezen van niet-gedeelde gegevens en filteren op rijgroepen alleen
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
Het bekijken van de metadata in Parquet-bestanden kan worden gedaan met PyArrow
.
>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 6
num_rows: 1000000
num_row_groups: 10
format_version: 2.6
serialized_size: 9325
>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
file_offset: 1675616
file_path:
physical_type: BYTE_ARRAY
num_values: 100000
path_in_schema: state
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x115283590>
has_min_max: True
min: Alabama
max: Wyoming
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY
encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 1599792
data_page_offset: 1600354
total_compressed_size: 75824
total_uncompressed_size: 75891
Merk op dat het aantal rijgroepen in de metadata van het hele bestand wordt genoemd, en de minimale en maximale waarden worden genoemd in de statistieken sectie van elke kolom voor elke rijgroep.
Er is echter een methode om dit Parquet-kenmerk verder te benutten voor nog meer geoptimaliseerde resultaten: sorteren.
Filteren op gesorteerde velden
Zoals eerder vermeld, bevat een deel van de metadata die door Parquet wordt opgeslagen de minimale en maximale waarden voor elk veld binnen elke rijgroep. Wanneer de gegevens worden gesorteerd op basis van het veld waar we door willen filteren, heeft Pandas een grotere kans om meer rijgroepen over te slaan.
Bijvoorbeeld, laten we een dataset bekijken die een lijst van records bevat, waarbij één van de velden de ‘staat’ vertegenwoordigt. Als de records ongesorteerd zijn, is er een grote kans dat elke staat in de meeste rijgroepen voorkomt. Bekijk bijvoorbeeld de metadaten in de vorige sectie. Je kunt zien dat de eerste rijgroep alleen al alle staten bevat van ‘Alabama’ tot ‘Wyoming’.
Echter, als we de gegevens sorteren op basis van het ‘staat’ veld, is er een aanzienlijke kans dat we veel rijgroepen kunnen overslaan.
df = pd.DataFrame(data)
# gegevens sorteren op basis van 'staat'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)
Laten we nu nogmaals naar de metadaten kijken en zien hoe het is veranderd.
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'
Zoals je kunt zien, na het sorteren op staat, zijn de min-max waarden dienovereenkomstig beïnvloed; elke rijgroep bevat nu een deel van de staten in plaats van alle staten. Dit betekent dat het lezen met filters nu veel sneller zou moeten zijn.
Laten we nu eens kijken hoe dit de prestaties van het lezen van de gegevens beïnvloedt. De code voor het lezen van de gegevens is niet veranderd.
# niet-gedeelde gegevens lezen en filteren op basis van rijgroepen, de invoer is gesorteerd op staat
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds
Verbazingwekkend genoeg is de prestatie hier bijna even goed als bij het gebruik van delen.
Deze principe is van toepassing op zowel gepartitioneerde als niet-gepartitioneerde gegevens. We kunnen beide methoden tegelijkertijd gebruiken. Als we soms gegevens willen filteren op basis van veld A en andere keren op basis van veld B, dan kan partities maken op veld A en sorteren op veld B een goede optie zijn.
In andere gevallen, bijvoorbeeld waar het veld waar we door willen filteren een veld met hoge kardinaliteit is, kunnen we partities maken op basis van een hash van de waarde (bucketing) en de gegevens erin sorteren op de werkelijke waarde van het veld. Op deze manier kunnen we de voordelen van zowel partities maken als rijgroepen genieten.
Lezen van een deel van de kolommen
Hoewel minder vaak gebruikt, is een andere methode voor het bereiken van betere resultaten tijdens gegevensopvraging het selecteren van alleen de specifieke velden die essentieel zijn voor uw taak. Deze strategie kan af en toe verbeteringen in prestaties opleveren. Dit komt door de aard van het Parquet formaat. Parquet is geïmplementeerd in een kolomformaat, wat betekent dat het de gegevens kolom voor kolom binnen elke rijgroep opslaat. Het lezen van slechts enkele kolommen betekent dat de andere kolommen worden overgeslagen.
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds
Verrassend genoeg is de prestatieverbetering groot.
Conclusie
Hoewel gegevens partitioneren meestal de optimale aanpak is, is het niet altijd mogelijk. Het sorteren van gegevens kan leiden tot aanzienlijke verbeteringen. We kunnen meer rijgroepen overslaan door dit te doen. Bovendien is, indien mogelijk, het selecteren van alleen de noodzakelijke kolommen altijd een goede keuze.
Deze post heeft u geholpen om te begrijpen hoe u de kracht van Parquet en pandas kunt gebruiken voor betere prestaties.
Hier is een script dat alle eerder genoemde voorbeelden bevat, compleet met tijdvergelijkingen.
Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas