使用Pandas进行Parquet数据过滤

在利用pandas从Parquet文件中筛选数据时,有多种策略可供选择。尽管普遍认为数据分区能显著提升筛选操作的效率,但还有其他方法可以优化查询Parquet文件中存储数据的表现。分区仅仅是其中之一。

通过分区字段筛选

如前所述,这种方法不仅最为人熟知,而且在性能优化方面通常效果最为显著。其背后的原理很简单:采用分区后,可以有选择地排除读取整个文件甚至整个目录文件的需求(即谓词下推),从而在性能上实现大幅且显著的提升。

Python

 

import pandas as pd
import time
from faker import Faker
 
fake = Faker()
 
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
 
print(f"Creating fake data")
data = {
    'id': range(NUM_OF_RECORDS),  # Generate IDs from 1 to 100
    'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
    'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
    'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
    'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
    'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
 
df = pd.DataFrame(data)
 
# 未分区写入
df.to_parquet(path=NON_PARTITIONED_PATH)
 
# 分区数据写入
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
 
# 读取未分区数据
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time  # 37 sec
 
# 读取分区数据
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec

时间改进(以及内存和CPU使用量的减少)非常显著,从37秒降至仅0.20秒。

通过非分区字段筛选

在上面的例子中,我们看到了基于分区字段进行筛选如何提升数据检索效率。然而,存在一些情况,数据不能有效地按照我们希望筛选的特定字段进行分区。此外,在某些情况下,需要基于多个字段进行筛选。这意味着所有输入文件都将被打开,这可能对性能造成损害。

幸运的是,Parquet提供了一个巧妙的解决方案来缓解这个问题。Parquet文件被分割成行组。在每个行组内,Parquet存储元数据。这些元数据包括每个字段的最小和最大值。

使用Pandas写入Parquet文件时,您可以选择每个控制组中记录的数量。

当使用Pandas读取带有过滤器的Parquet文件时,Pandas库利用这些Parquet元数据来高效地过滤加载到内存中的数据。如果所需字段超出了某个行组的最小/最大范围,则整个行组会被优雅地跳过。

Python

 

df = pd.DataFrame(data)

# 写入非分区数据,指定行组大小
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)

# 读取非分区数据并仅通过行组过滤
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])

可以使用PyArrow查看Parquet文件内部的元数据。

Shell

 

>>> import pyarrow.parquet as pq

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 6
  num_rows: 1000000
  num_row_groups: 10
  format_version: 2.6
  serialized_size: 9325

>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
  file_offset: 1675616
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 100000
  path_in_schema: state
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x115283590>
      has_min_max: True
      min: Alabama
      max: Wyoming
      null_count: 0
      distinct_count: 0
      num_values: 100000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 1599792
  data_page_offset: 1600354
  total_compressed_size: 75824
  total_uncompressed_size: 75891

请注意,整个文件的元数据中提到了行组数量,而每个列的每个行组的统计信息部分中提到了最小和最大值。

然而,还有一种方法可以进一步利用Parquet的这一特性,以获得更优化的结果:排序

通过排序字段进行过滤

如前所述,Parquet存储的元数据包括每个行组内每个字段的最小和最大值。当数据根据我们打算过滤的字段进行排序时,Pandas更有可能跳过更多的行组。

例如,假设有一组数据集,其中包含一系列记录,其中一个字段表示‘州’。如果这些记录未排序,那么每个州很可能会出现在大部分行组中。例如,回顾上一节的元数据,你会发现仅第一个行组就包含了从‘阿拉巴马’到‘怀俄明’的所有州。

然而,如果我们根据‘州’字段对数据进行排序,那么跳过许多行组的可能性将显著增加。

Python

 

df = pd.DataFrame(data)

# 根据'州'字段排序数据
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)

现在,让我们再次查看元数据,看看它有何变化。

PowerShell

 

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'


>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'


>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'

如你所见,在按州排序后,最小-最大值相应受到影响;每个行组现在只包含部分州而非全部州。这意味着使用过滤器进行读取现在应该快得多。

接下来,我们来看看这如何影响数据读取的性能。读取数据的代码并未改变。

Python

 

# 读取未分区数据并按行组过滤,输入已按州排序
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds

令人惊讶的是,这里的性能几乎与使用分区相当。

此原则适用于分区数据和非分区数据。我们可以同时采用这两种方法。如果我们有时想根据字段A过滤数据,而其他时候根据字段B过滤,那么按字段A分区并按字段B排序可能是一个好选择。
在其他情况下,例如,当我们想要过滤的字段是一个高基数字段时,我们可以通过该值的某种哈希(分桶)进行分区,并在其中按字段的实际值对数据进行排序。这样,我们将同时享受分区和行组的优势。

读取部分列

虽然不常用,但另一种在数据检索时获得更好结果的方法是仅选择任务所需的关键字段。这种策略有时能提升性能,这是因为Parquet格式的特性。Parquet采用列式存储,意味着在每个行组中按列存储数据。只读取某些列意味着其他列将被跳过。

Python

 

start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds

不出所料,性能提升显著。

结论

虽然数据分区通常是最佳方法,但并非总是可行。对数据进行排序可以带来显著改进。我们可能因此跳过更多行组。此外,如果可行,仅选择必要的列始终是一个好选择。

本文帮助你理解如何利用Parquet和Pandas提升性能。

这是一份脚本,包含了前面提到的所有示例,并附有时间对比。

Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas