Ao lidar com a filtragem de dados a partir de arquivos Parquet usando o pandas, diversas estratégias podem ser adotadas. Embora seja amplamente reconhecido que a partição de dados pode melhorar significativamente a eficiência das operações de filtragem, existem outras técnicas para otimizar o desempenho de consultas em dados armazenados em arquivos Parquet. A partição é apenas uma das opções.
Filtragem por Campos Particionados
Como mencionado anteriormente, essa abordagem não apenas é a mais familiar, mas também geralmente é a mais impactante em termos de otimização de desempenho. A lógica por trás disso é simples. Ao usar partições, é possível excluir seletivamente a necessidade de ler arquivos inteiros ou mesmo diretórios inteiros de arquivos (também conhecido como “empurrão de predicado”), resultando em uma melhoria substancial e dramática no desempenho.
import pandas as pd
import time
from faker import Faker
fake = Faker()
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
print(f"Creating fake data")
data = {
'id': range(NUM_OF_RECORDS), # Generate IDs from 1 to 100
'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
df = pd.DataFrame(data)
# Escrevendo sem partições
df.to_parquet(path=NON_PARTITIONED_PATH)
# Escrevendo dados particionados
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
# Lendo sem partição
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time # 37 sec
# Lendo dados particionados
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec
A melhoria no tempo (juntamente com a redução no uso de memória e CPU) é substancial, diminuindo de 37 segundos para apenas 0,20 segundos.
Filtragem por Campos Não Particionados
No exemplo acima, observamos como a filtragem baseada em um campo particionado pode melhorar a recuperação de dados. No entanto, existem cenários em que os dados não podem ser efetivamente particionados pelo campo específico que desejamos filtrar. Além disso, em alguns casos, a filtragem é necessária com base em múltiplos campos. Isso significa que todos os arquivos de entrada serão abertos, o que pode prejudicar o desempenho.
Graças a Deus, o Parquet oferece uma solução inteligente para mitigar esse problema. Os arquivos Parquet são divididos em grupos de linhas. Dentro de cada grupo de linhas, o Parquet armazena metadados. Esses metadados incluem os valores mínimo e máximo para cada campo.
Ao escrever arquivos Parquet com o Pandas, você pode selecionar quantos registros haverá em cada grupo de controle.
Ao usar o Pandas para ler arquivos Parquet com filtros, a biblioteca Pandas aproveita esses metadados do Parquet para filtrar eficientemente os dados carregados na memória. Se o campo desejado estiver fora do intervalo min/max de um grupo de linhas, todo esse grupo de linhas é pulado graciosamente.
df = pd.DataFrame(data)
# escrevendo dados não particionados, especificando o tamanho do grupo de linhas
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)
# lendo dados não particionados e filtrando apenas por grupos de linhas
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
Visualizar os metadados dentro dos arquivos Parquet pode ser feito usando PyArrow
.
>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 6
num_rows: 1000000
num_row_groups: 10
format_version: 2.6
serialized_size: 9325
>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
file_offset: 1675616
file_path:
physical_type: BYTE_ARRAY
num_values: 100000
path_in_schema: state
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x115283590>
has_min_max: True
min: Alabama
max: Wyoming
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY
encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 1599792
data_page_offset: 1600354
total_compressed_size: 75824
total_uncompressed_size: 75891
Observe que o número de grupos de linhas é mencionado nos metadados do arquivo inteiro, e os valores mínimo e máximo são mencionados dentro da seção de estatísticas de cada coluna para cada grupo de linhas.
No entanto, há um método para aproveitar ainda mais esse recurso do Parquet para resultados ainda mais otimizados: classificação.
Filtragem por Campos Ordenados
Como mencionado na seção anterior, parte dos metadados armazenados pelo Parquet inclui os valores mínimo e máximo para cada campo em cada grupo de linhas. Quando os dados são ordenados com base no campo pelo qual pretendemos filtrar, o Pandas tem maior probabilidade de pular mais grupos de linhas.
Por exemplo, consideremos um conjunto de dados que inclui uma lista de registros, com um dos campos representando ‘estado’. Se os registros não estiverem ordenados, há uma boa chance de que cada estado apareça em a maioria dos grupos de linhas. Por exemplo, observe os metadados na seção anterior. Você pode ver que o 1º grupo de linhas sozinho contém todos os estados de ‘Alabama’ a ‘Wyoming’.
No entanto, se ordenarmos os dados com base no campo ‘estado’, há uma probabilidade significativa de pular muitos grupos de linhas.
df = pd.DataFrame(data)
# ordenando os dados com base em 'estado'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)
Agora, vamos olhar novamente para os metadados e ver como eles mudaram.
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'
Como você pode ver, após a ordenação por estado, os valores mínimo-máximo são afetados conforme necessário; cada grupo de linhas contém parte dos estados em vez de todos os estados. Isso significa que a leitura com filtros deve ser muito mais rápida agora.
Agora, vamos ver como isso afeta o desempenho da leitura dos dados. O código para ler os dados não foi alterado.
# lendo dados não particionados e filtrando por grupos de linhas, os dados de entrada são ordenados por estado
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds
Surpreendentemente, o desempenho aqui é quase tão bom quanto usar partições.
Este princípio se aplica tanto a dados particionados quanto a não particionados. Podemos usar ambos os métodos ao mesmo tempo. Se às vezes quisermos filtrar os dados com base no campo A e outras vezes com base no campo B, então particionar por campo A e classificar por campo B pode ser uma boa opção.
Em outros casos, por exemplo, onde o campo pelo qual queremos filtrar é um campo com alta cardinalidade, poderíamos particionar por alguma hash do valor (bucketing) e classificar os dados dentro dele pelo valor real do campo. Dessa forma, aproveitaremos as vantagens de ambos os métodos — partição e grupos de linhas.
Lendo um Subconjunto das Colunas
Embora seja menos comum, outra maneira de obter melhores resultados durante a recuperação de dados envolve selecionar apenas os campos específicos essenciais para sua tarefa. Essa estratégia pode ocasionalmente gerar melhorias de desempenho. Isso ocorre devido à natureza do formato Parquet. O Parquet é implementado em um formato columnar, o que significa que armazena os dados coluna por coluna dentro de cada grupo de linhas. Ler apenas algumas das colunas significa que as outras colunas serão puladas.
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds
Não é surpreendente que a melhoria no desempenho seja grande.
Conclusão
Embora a partição de dados seja geralmente a abordagem ideal, nem sempre é possível. Classificar os dados pode levar a melhorias significativas. Podemos pular mais grupos de linhas com isso. Além disso, se possível, selecionar apenas as colunas necessárias é sempre uma boa escolha.
Este post ajudou você a entender como aproveitar o poder do parquet e pandas para um melhor desempenho.
Aqui está um script contendo todos os exemplos mencionados anteriormente, completo com comparações de tempo.
Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas