Al abordar la filtración de datos desde archivos Parquet utilizando pandas, se pueden emplear varias estrategias. Si bien es ampliamente reconocido que la particionamiento de datos puede mejorar significativamente la eficiencia de las operaciones de filtrado, existen métodos adicionales para optimizar el rendimiento de la consulta de datos almacenados en archivos Parquet. El particionamiento es solo una de las opciones.
Filtrado por Campos Particionados
Como se mencionó anteriormente, este enfoque no solo es el más familiar sino también típicamente el más impactante en términos de optimización del rendimiento. La lógica detrás de esto es sencilla. Al utilizar particiones, es posible excluir selectivamente la necesidad de leer archivos completos o incluso directorios completos de archivos (también conocido como “empuje de predicado”), lo que resulta en una mejora sustancial y dramática en el rendimiento.
import pandas as pd
import time
from faker import Faker
fake = Faker()
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
print(f"Creating fake data")
data = {
'id': range(NUM_OF_RECORDS), # Generate IDs from 1 to 100
'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
df = pd.DataFrame(data)
# Escribir sin particiones
df.to_parquet(path=NON_PARTITIONED_PATH)
# Escribir datos particionados
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
# Leer sin particiones
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time # 37 sec
# Leer datos particionados
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec
La mejora en el tiempo (junto con el uso reducido de memoria y CPU) es sustancial, disminuyendo de 37 segundos a solo 0.20 segundos.
Filtrado por Campos No Particionados
En el ejemplo anterior, observamos cómo el filtrado basado en un campo particionado puede mejorar la recuperación de datos. Sin embargo, hay escenarios en los que los datos no pueden ser efectivamente particionados por el campo específico que deseamos filtrar. Además, en algunos casos, se requiere filtrar basándose en múltiples campos. Esto significa que todos los archivos de entrada se abrirán, lo que puede ser perjudicial para el rendimiento.
Agradecidamente, Parquet ofrece una solución inteligente para mitigar este problema. Los archivos Parquet se dividen en grupos de filas. Dentro de cada grupo de filas, Parquet almacena metadatos. Estos metadatos incluyen los valores mínimo y máximo para cada campo.
Al escribir archivos Parquet con Pandas, puedes seleccionar cuál será el número de registros en cada grupo de control.
Al utilizar Pandas para leer archivos Parquet con filtros, la biblioteca Pandas aprovecha estos metadatos de Parquet para filtrar de manera eficiente los datos cargados en la memoria. Si el campo deseado está fuera del rango min/max de un grupo de filas, todo ese grupo de filas se omite de manera elegante.
df = pd.DataFrame(data)
# escribiendo datos no particionados, especificando el tamaño del grupo de filas
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)
# leyendo datos no particionados y filtrando solo por grupos de filas
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
Ver los metadatos dentro de los archivos Parquet se puede hacer utilizando PyArrow
.
>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 6
num_rows: 1000000
num_row_groups: 10
format_version: 2.6
serialized_size: 9325
>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
file_offset: 1675616
file_path:
physical_type: BYTE_ARRAY
num_values: 100000
path_in_schema: state
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x115283590>
has_min_max: True
min: Alabama
max: Wyoming
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY
encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 1599792
data_page_offset: 1600354
total_compressed_size: 75824
total_uncompressed_size: 75891
Tenga en cuenta que el número de grupos de filas se menciona en los metadatos del archivo completo, y los valores mínimo y máximo se mencionan dentro de la sección de estadísticas de cada columna para cada grupo de filas.
Sin embargo, existe un método para aprovechar aún más esta función de Parquet para obtener resultados aún más optimizados: ordenación.
Filtrado por Campos Ordenados
Como se mencionó en la sección anterior, parte de los metadatos almacenados por Parquet incluye los valores mínimo y máximo para cada campo dentro de cada grupo de filas. Cuando los datos están ordenados según el campo por el que pretendemos filtrar, Pandas tiene mayor probabilidad de saltar más grupos de filas.
Por ejemplo, consideremos un conjunto de datos que incluye una lista de registros, con uno de los campos representando ‘estado’. Si los registros no están ordenados, es probable que cada estado aparezca en la mayoría de los grupos de filas. Por ejemplo, observe la metadatos en la sección anterior. Puedes ver que el primer grupo de filas solo contiene todos los estados desde ‘Alabama’ hasta ‘Wyoming’.
Sin embargo, si ordenamos los datos en función del campo ‘estado’, hay una probabilidad significativa de saltarnos muchos grupos de filas.
df = pd.DataFrame(data)
# ordenando los datos en función de 'estado'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)
Ahora, volvamos a ver la metadatos y veamos cómo ha cambiado.
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'
Como puedes ver, después de ordenar por estado, los valores mínimo-máximo se ven afectados en consecuencia; cada grupo de filas contiene una parte de los estados en lugar de todos los estados. Esto significa que la lectura con filtros debería ser mucho más rápida ahora.
Ahora, veamos cómo afecta esto al rendimiento de la lectura de los datos. El código para leer los datos no ha cambiado.
# lectura de datos no particionados y filtrado por grupos de filas, los datos están ordenados por estado
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds
Sorprendentemente, el rendimiento aquí es casi tan bueno como al utilizar particiones.
Este principio se aplica tanto a datos particionados como no particionados. Podemos utilizar ambos métodos al mismo tiempo. Si a veces queremos filtrar los datos en función del campo A y otras veces en función del campo B, entonces particionar por el campo A y ordenar por el campo B podría ser una buena opción.
En otros casos, por ejemplo, donde el campo por el que queremos filtrar es un campo con alta cardinalidad, podríamos particionar por algún hash del valor (agrupamiento) y ordenar los datos dentro de él por el valor real del campo de esta manera disfrutaremos de las ventajas de ambos métodos: particionamiento y grupos de filas.
Leer un Subconjunto de las Columnas
Aunque se usa con menos frecuencia, otro método para lograr mejores resultados durante la recuperación de datos implica seleccionar solo los campos específicos que son esenciales para su tarea. Esta estrategia a veces puede mejorar el rendimiento. Esto se debe a la naturaleza del formato Parquet. Parquet está implementado en un formato columnar, lo que significa que almacena los datos columna por columna dentro de cada grupo de filas. Leer solo algunas de las columnas significa que se omitirán las otras columnas.
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds
No es de extrañar que la mejora en el rendimiento sea grande.
Conclusión
Si bien la partición de datos suele ser el enfoque óptimo, no siempre es posible. Ordenar los datos puede llevar a mejoras significativas. Podemos omitir más grupos de filas con esto. Además, si es posible, seleccionar solo las columnas necesarias siempre es una buena opción.
Este post te ayudó a comprender cómo aprovechar el poder de Parquet y pandas para un mejor rendimiento.
Aquí hay un script que contiene todos los ejemplos mencionados anteriormente, incluidas las comparaciones de tiempo.
Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas