Ao filtrar dados de arquivos Parquet usando o pandas, diversas estratégias podem ser adotadas. Embora seja amplamente reconhecido que a partição de dados pode melhorar significativamente a eficiência das operações de filtragem, existem outras técnicas para otimizar o desempenho de consultas a dados armazenados em arquivos Parquet. A partição é apenas uma das opções.
Filtragem por Campos Particionados
Como mencionado anteriormente, essa abordagem não apenas é a mais familiar, mas também geralmente a mais impactante em termos de otimização de desempenho. A lógica por trás disso é simples. Ao usar partições, é possível excluir seletivamente a necessidade de ler arquivos inteiros ou até mesmo diretórios inteiros de arquivos (também conhecido como “empurrar a predicação”), resultando em uma melhoria substancial e dramática no desempenho.
import pandas as pd
import time
from faker import Faker
fake = Faker()
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
print(f"Creating fake data")
data = {
'id': range(NUM_OF_RECORDS), # Generate IDs from 1 to 100
'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
df = pd.DataFrame(data)
# Escrevendo sem partições
df.to_parquet(path=NON_PARTITIONED_PATH)
# Escrevendo dados particionados
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
# Lendo não particionado
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time # 37 sec
# Lendo dados particionados
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec
A melhoria no tempo (juntamente com a redução no uso de memória e CPU) é substancial, diminuindo de 37 segundos para apenas 0,20 segundos.
Filtragem por Campos Não Particionados
No exemplo acima, observamos como a filtragem baseada em um campo particionado pode melhorar a recuperação de dados. No entanto, existem cenários em que os dados não podem ser efetivamente particionados pelo campo específico que desejamos filtrar. Além disso, em alguns casos, a filtragem é necessária com base em vários campos. Isso significa que todos os arquivos de entrada serão abertos, o que pode ser prejudicial ao desempenho.
Graças a Deus, o Parquet oferece uma solução inteligente para mitigar esse problema. Os arquivos Parquet são divididos em grupos de linhas. Dentro de cada grupo de linhas, o Parquet armazena metadados. Esses metadados incluem os valores mínimo e máximo para cada campo.
Ao gravar arquivos Parquet com o Pandas, você pode selecionar quantos registros haverá em cada grupo de controle.
Ao usar o Pandas para ler arquivos Parquet com filtros, a biblioteca Pandas utiliza esses metadados do Parquet para filtrar eficientemente os dados carregados na memória. Se o campo desejado estiver fora do intervalo min/max de um grupo de linhas, todo esse grupo de linhas é pulado graciosamente.
df = pd.DataFrame(data)
# escrevendo dados não particionados, especificando o tamanho do grupo de linhas
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)
# lendo dados não particionados e filtrando apenas por grupos de linhas
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
Visualizar os metadados dentro dos arquivos Parquet pode ser feito usando PyArrow
.
>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 6
num_rows: 1000000
num_row_groups: 10
format_version: 2.6
serialized_size: 9325
>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
file_offset: 1675616
file_path:
physical_type: BYTE_ARRAY
num_values: 100000
path_in_schema: state
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x115283590>
has_min_max: True
min: Alabama
max: Wyoming
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY
encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 1599792
data_page_offset: 1600354
total_compressed_size: 75824
total_uncompressed_size: 75891
Observe que o número de grupos de linhas é mencionado nos metadados do arquivo inteiro, e os valores mínimo e máximo são mencionados dentro da seção de estatísticas de cada coluna para cada grupo de linhas.
No entanto, há um método para aproveitar ainda mais essa característica do Parquet para resultados ainda mais otimizados: ordenando.
Filtrando por Campos Ordenados
Como mencionado na seção anterior, parte dos metadados armazenados pelo Parquet inclui os valores mínimo e máximo para cada campo em cada grupo de linhas. Quando os dados são ordenados com base no campo pelo qual pretendemos filtrar, o Pandas tem maior probabilidade de pular mais grupos de linhas.
Por exemplo, consideremos um conjunto de dados que inclui uma lista de registros, com um dos campos representando ‘estado’. Se os registros não estiverem ordenados, há uma boa chance de que cada estado apareça em grande parte dos grupos de linhas. Por exemplo, observe os metadados na seção anterior. Você pode ver que o 1º grupo de linhas por si só contém todos os estados de ‘Alabama’ a ‘Wyoming’.
No entanto, se ordenarmos os dados com base no campo ‘estado’, há uma probabilidade significativa de ignorarmos muitos grupos de linhas.
df = pd.DataFrame(data)
# ordenando os dados com base em 'estado'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)
Agora, vamos olhar novamente para os metadados e ver como eles mudaram.
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'
Como você pode ver, após a ordenação por estado, os valores mínimo-máximo são afetados de acordo; cada grupo de linhas contém parte dos estados em vez de todos os estados. Isso significa que a leitura com filtros deve ser muito mais rápida agora.
Agora, vamos ver como isso afeta o desempenho da leitura dos dados. O código para ler os dados não foi alterado.
# lendo dados não particionados e filtrando por grupos de linhas, os dados de entrada são ordenados por estado
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds
Surpreendentemente, o desempenho aqui é quase tão bom quanto ao usar partições.
Este princípio se aplica tanto a dados particionados quanto a não particionados. Podemos usar ambos os métodos ao mesmo tempo. Se às vezes quisermos filtrar os dados com base no campo A e outras vezes com base no campo B, então particionar por campo A e classificar por campo B pode ser uma boa opção.
Em outros casos, por exemplo, onde o campo pelo qual queremos filtrar é um campo com alta cardinalidade, poderíamos particionar por algum hash do valor (bucketing) e classificar os dados dentro dele pelo valor real do campo, desta forma aproveitaremos as vantagens de ambos os métodos — partição e grupos de linhas.
Lendo um Subconjunto das Colunas
Embora menos comumente usado, outra maneira de obter melhores resultados durante a recuperação de dados envolve selecionar apenas os campos específicos essenciais para sua tarefa. Essa estratégia pode ocasionalmente trazer melhorias de desempenho. Isso ocorre devido à natureza do formato Parquet. O Parquet é implementado em um formato columnar, o que significa que armazena os dados coluna por coluna dentro de cada grupo de linhas. Ler apenas algumas das colunas significa que as outras colunas serão ignoradas.
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds
Não é de surpreender que a melhoria no desempenho seja grande.
Conclusão
Embora a partição de dados seja geralmente a abordagem ideal, nem sempre é possível. Classificar os dados pode levar a melhorias significativas. Podemos pular mais grupos de linhas com isso. Além disso, se viável, selecionar apenas as colunas necessárias é sempre uma boa escolha.
Este post ajudou você a entender como aproveitar o poder do parquet e pandas para um melhor desempenho.
Aqui está um script contendo todos os exemplos mencionados anteriormente, completo com comparações de tempo.
Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas