Parquet-gegevens filteren met Pandas

Bij het filteren van gegevens uit Parquet-bestanden met behulp van pandas, kunnen verschillende strategieën worden toegepast. Hoewel het breed erkend is dat het partitioneren van gegevens een significante verbetering kan betekenen voor de efficiëntie van filteerbewerkingen, zijn er aanvullende methoden om de prestaties van het opvragen van gegevens die zijn opgeslagen in Parquet-bestanden te optimaliseren. Partitioneren is slechts één van de mogelijkheden.

Filteren op Gepartitioneerde Velden

Zoals eerder genoemd, is deze aanpak niet alleen de meest bekende, maar ook meestal de meest impactvolle in termen van prestatieoptimalisatie. De reden hiervoor is eenvoudig. Wanneer partities worden gebruikt, kan het nodig zijn om gehele bestanden of zelfs volledige directories van bestanden te selectief uitsluiten (ook bekend als predicate pushdown), wat resulteert in een aanzienlijke en dramatische verbetering van de prestaties.

Python

 

import pandas as pd
import time
from faker import Faker
 
fake = Faker()
 
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
 
print(f"Creating fake data")
data = {
    'id': range(NUM_OF_RECORDS),  # Generate IDs from 1 to 100
    'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
    'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
    'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
    'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
    'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
 
df = pd.DataFrame(data)
 
# schrijven zonder partities
df.to_parquet(path=NON_PARTITIONED_PATH)
 
# schrijven van gepartitioneerde gegevens
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
 
# lezen van niet-gepartitioneerde gegevens
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time  # 37 sec
 
# lezen van gepartitioneerde gegevens
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec

De tijdsverbetering (samen met verminderde geheugen- en CPU-gebruik) is aanzienlijk, afnemend van 37 seconden tot slechts 0,20 seconden.

Filteren op Niet-Gepartitioneerde Velden

In het bovenstaande voorbeeld zagen we hoe filteren op basis van een gepartitioneerd veld de gegevensopvraag kan verbeteren. Er zijn echter scenario’s waarin gegevens niet effectief kunnen worden gepartitioneerd door het specifieke veld dat we willen filteren. Bovendien is filteren in sommige gevallen vereist op basis van meerdere velden. Dit betekent dat alle invoerbestanden worden geopend, wat schadelijk kan zijn voor de prestaties.

Gelukkig biedt Parquet een slimme oplossing om dit probleem te verlichten. Parquet-bestanden worden gesplitst in rijgroepen. Binnen elke rijgroep slaat Parquet metadata op. Deze metadata bevat de minimale en maximale waarden voor elk veld.

Bij het schrijven van Parquet-bestanden met Pandas, kunt u kiezen hoeveel records er in elke controlegroep zullen zijn.

Bij het gebruik van Pandas om Parquet-bestanden te lezen met filters, maakt de Pandas-bibliotheek gebruik van deze Parquet-metadata om gegevens efficiënt te filteren die in het geheugen worden geladen. Als het gewenste veld buiten het min/max-bereik van een rijgroep valt, wordt die hele rijgroep gracieus overgeslagen.

Python

 

df = pd.DataFrame(data)

# het schrijven van niet-gedeelde gegevens, het specificeren van de grootte van de rijgroep
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)

# het lezen van niet-gedeelde gegevens en filteren op rijgroepen alleen
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])

Het bekijken van de metadata in Parquet-bestanden kan worden gedaan met PyArrow.

Shell

 

>>> import pyarrow.parquet as pq

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 6
  num_rows: 1000000
  num_row_groups: 10
  format_version: 2.6
  serialized_size: 9325

>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
  file_offset: 1675616
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 100000
  path_in_schema: state
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x115283590>
      has_min_max: True
      min: Alabama
      max: Wyoming
      null_count: 0
      distinct_count: 0
      num_values: 100000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 1599792
  data_page_offset: 1600354
  total_compressed_size: 75824
  total_uncompressed_size: 75891

Merk op dat het aantal rijgroepen in de metadata van het hele bestand wordt genoemd, en de minimale en maximale waarden worden genoemd in de statistieken sectie van elke kolom voor elke rijgroep.

Er is echter een methode om dit Parquet-kenmerk verder te benutten voor nog meer geoptimaliseerde resultaten: sorteren.

Filteren op gesorteerde velden

Zoals eerder vermeld, bevat een deel van de metadata die door Parquet wordt opgeslagen de minimale en maximale waarden voor elk veld binnen elke rijgroep. Wanneer de gegevens worden gesorteerd op basis van het veld waar we door willen filteren, heeft Pandas een grotere kans om meer rijgroepen over te slaan.

Bijvoorbeeld, laten we een dataset bekijken die een lijst van records bevat, waarbij één van de velden de ‘staat’ vertegenwoordigt. Als de records ongesorteerd zijn, is er een grote kans dat elke staat in de meeste rijgroepen voorkomt. Bekijk bijvoorbeeld de metadaten in de vorige sectie. Je kunt zien dat de eerste rijgroep alleen al alle staten bevat van ‘Alabama’ tot ‘Wyoming’.

Echter, als we de gegevens sorteren op basis van het ‘staat’ veld, is er een aanzienlijke kans dat we veel rijgroepen kunnen overslaan.

Python

 

df = pd.DataFrame(data)

# gegevens sorteren op basis van 'staat'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)

Laten we nu nogmaals naar de metadaten kijken en zien hoe het is veranderd.

PowerShell

 

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'


>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'


>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'

Zoals je kunt zien, na het sorteren op staat, zijn de min-max waarden dienovereenkomstig beïnvloed; elke rijgroep bevat nu een deel van de staten in plaats van alle staten. Dit betekent dat het lezen met filters nu veel sneller zou moeten zijn.

Laten we nu eens kijken hoe dit de prestaties van het lezen van de gegevens beïnvloedt. De code voor het lezen van de gegevens is niet veranderd.

Python

 

# niet-gedeelde gegevens lezen en filteren op basis van rijgroepen, de invoer is gesorteerd op staat
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds

Verbazingwekkend genoeg is de prestatie hier bijna even goed als bij het gebruik van delen.

Deze principe is van toepassing op zowel gepartitioneerde als niet-gepartitioneerde gegevens. We kunnen beide methoden tegelijkertijd gebruiken. Als we soms gegevens willen filteren op basis van veld A en andere keren op basis van veld B, dan kan partities maken op veld A en sorteren op veld B een goede optie zijn.
In andere gevallen, bijvoorbeeld waar het veld waar we door willen filteren een veld met hoge kardinaliteit is, kunnen we partities maken op basis van een hash van de waarde (bucketing) en de gegevens erin sorteren op de werkelijke waarde van het veld. Op deze manier kunnen we de voordelen van zowel partities maken als rijgroepen genieten.

Lezen van een deel van de kolommen

Hoewel minder vaak gebruikt, is een andere methode voor het bereiken van betere resultaten tijdens gegevensopvraging het selecteren van alleen de specifieke velden die essentieel zijn voor uw taak. Deze strategie kan af en toe verbeteringen in prestaties opleveren. Dit komt door de aard van het Parquet formaat. Parquet is geïmplementeerd in een kolomformaat, wat betekent dat het de gegevens kolom voor kolom binnen elke rijgroep opslaat. Het lezen van slechts enkele kolommen betekent dat de andere kolommen worden overgeslagen.

Python

 

start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds

Verrassend genoeg is de prestatieverbetering groot.

Conclusie

Hoewel gegevens partitioneren meestal de optimale aanpak is, is het niet altijd mogelijk. Het sorteren van gegevens kan leiden tot aanzienlijke verbeteringen. We kunnen meer rijgroepen overslaan door dit te doen. Bovendien is, indien mogelijk, het selecteren van alleen de noodzakelijke kolommen altijd een goede keuze.

Deze post heeft u geholpen om te begrijpen hoe u de kracht van Parquet en pandas kunt gebruiken voor betere prestaties.

Hier is een script dat alle eerder genoemde voorbeelden bevat, compleet met tijdvergelijkingen.

Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas