使用Pandas進行Parquet數據過濾

談到使用pandas從Parquet檔案中過濾數據,有多種策略可供採用。雖然普遍認為數據分區能大幅提升過濾操作的效率,但還有其他方法能優化查詢存儲在Parquet檔案中的數據的性能。分區僅是其中一種選擇。

透過分區字段進行過濾

如前所述,此方法不僅最為人熟知,而且在性能優化方面通常最具影響力。其背後的原理相當直接。當使用分區時,可以有選擇地排除讀取整個檔案甚至整個目錄檔案的需求(即,謂詞下推),從而大幅提升性能。

Python

 

import pandas as pd
import time
from faker import Faker
 
fake = Faker()
 
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
 
print(f"Creating fake data")
data = {
    'id': range(NUM_OF_RECORDS),  # Generate IDs from 1 to 100
    'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
    'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
    'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
    'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
    'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
 
df = pd.DataFrame(data)
 
# 未分區寫入
df.to_parquet(path=NON_PARTITIONED_PATH)
 
# 分區數據寫入
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
 
# 讀取未分區數據
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time  # 37 sec
 
# 讀取分區數據
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec

時間上的改善(以及減少的記憶體和CPU使用)相當顯著,從37秒降至僅0.20秒。

透過非分區字段進行過濾

在上例中,我們看到了基於分區字段過濾如何增強數據檢索。然而,有些情況下,我們希望過濾的特定字段無法有效分區。此外,某些情況下需要基於多個字段進行過濾。這意味著所有輸入檔案都將被打開,這可能對性能造成損害。

幸運的是,Parquet提供了一個巧妙的解決方案來緩解這個問題。Parquet文件被分割成行組。在每個行組內,Parquet存儲元數據。這些元數據包括每個字段的最小和最大值。

在使用Pandas寫入Parquet文件時,您可以選擇每個控制組中將包含多少記錄。

當使用Pandas讀取帶有過濾器的Parquet文件時,Pandas庫利用Parquet元數據高效地將數據過濾到內存中。如果所需的字段超出了某個行組的最小/最大範圍,則整個行組會被優雅地跳過。

Python

 

df = pd.DataFrame(data)

# 寫入非分區數據,指定行組大小
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)

# 讀取非分區數據並僅通過行組進行過濾
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])

使用PyArrow可以查看Parquet文件內的元數據。

Shell

 

>>> import pyarrow.parquet as pq

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 6
  num_rows: 1000000
  num_row_groups: 10
  format_version: 2.6
  serialized_size: 9325

>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
  file_offset: 1675616
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 100000
  path_in_schema: state
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x115283590>
      has_min_max: True
      min: Alabama
      max: Wyoming
      null_count: 0
      distinct_count: 0
      num_values: 100000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 1599792
  data_page_offset: 1600354
  total_compressed_size: 75824
  total_uncompressed_size: 75891

注意,整個文件的元數據中提到了行組的數量,而每個行組的每個列的統計信息部分中提到了最小和最大值。

然而,有一種方法可以進一步利用Parquet的這一特性,以實現更優化的結果:排序

按排序字段過濾

如前一節所述,Parquet存儲的元數據的一部分包括每個行組內每個字段的最小和最大值。當數據根據我們打算過濾的字段進行排序時,Pandas更有可能跳過更多的行組。

舉例來說,假設有一個數據集包含一系列記錄,其中一個欄位代表’州’。若這些記錄未經排序,各州很可能會出現在大多數的行組中。例如,參考前一節的元數據,您會發現僅第一個行組就包含了從’阿拉巴馬’到’懷俄明’的所有州。

然而,若我們依據’州’欄位對數據進行排序,則有很大機會能跳過許多行組。

Python

 

df = pd.DataFrame(data)

# 依據'州'排序數據
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)

現在,讓我們再次查看元數據,觀察其變化。

PowerShell

 

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'


>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'


>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'

如您所見,經過按州排序後,最小至最大值受到相應影響;每個行組現在只包含部分州而非全部州。這意味著使用篩選器進行讀取應該會快得多。

接著,讓我們看看這如何影響數據讀取的效能。讀取數據的代碼並未改變。

Python

 

# 讀取未分區的數據並按行組篩選,輸入已按州排序
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds

令人驚訝的是,此處的效能幾乎與使用分區相當。

此原則適用於分區與非分區數據。我們可以同時採用兩種方法。若我們有時依據欄位A過濾數據,有時則依據欄位B,那麼按欄位A進行分區並按欄位B排序可能是一個好選擇。
在其他情況下,例如,當我們想要依據的是一個高基數的欄位時,我們可以通過該值的某種哈希(桶式)進行分區,並在內部按該欄位的實際值排序。這樣一來,我們就能同時享受到分區與列組的優勢。

讀取列的子集

雖然較少使用,但另一種在數據檢索時達到更好效果的方法是僅選擇對任務至關重要的特定欄位。此策略有時能帶來性能上的提升。這是由於Parquet格式的特性。Parquet以列式格式實現,意味著它在每個列組內按列存儲數據。只讀取部分列意味著其他列將被跳過。

Python

 

start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds

不出所料,性能提升顯著。

結論

雖然數據分區通常是最佳方法,但並非總是可行。排序數據能帶來顯著改進。我們可能因此跳過更多列組。此外,若可行,僅選擇必要的列始終是個好選擇。

本文幫助您理解如何利用Parquet和Pandas的力量以提升性能。

這裡有一個腳本,包含了先前提到的所有範例,並附有時間對比。

Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas