Parquetファイルからデータをフィルタリングする際にpandasを使用する場合、いくつかの戦略が考えられます。データのパーティショニングがフィルタリング操作の効率性を大幅に向上させることは広く認識されていますが、Parquetファイルに格納されたデータのクエリのパフォーマンスを最適化する追加の方法も存在します。パーティショニングはその一つのオプションです。
パーティショニングされたフィールドによるフィルタリング
前述の通り、このアプローチは最も一般的であるだけでなく、パフォーマンス最適化の観点からも最も影響力があります。この理由は明快です。パーティションが使用されると、全体のファイルやさらにはファイルの全体のディレクトリ(いわゆる述語プッシュダウン)を読み取る必要がなくなり、パフォーマンスの大幅かつ劇的な改善が可能になります。
import pandas as pd
import time
from faker import Faker
fake = Faker()
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
print(f"Creating fake data")
data = {
'id': range(NUM_OF_RECORDS), # Generate IDs from 1 to 100
'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
df = pd.DataFrame(data)
# パーティションなしでの書き込み
df.to_parquet(path=NON_PARTITIONED_PATH)
# パーティショニングされたデータの書き込み
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
# パーティションなしでの読み込み
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time # 37 sec
# パーティショニングされたデータの読み込み
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec
時間の改善(メモリとCPU使用量の削減とともに)は著しいもので、37秒からわずか0.20秒に減少します。
パーティションされていないフィールドによるフィルタリング
上記の例では、パーティションされたフィールドに基づくフィルタリングがデータ取得をどのように改善するかを観察しました。しかし、データが特定のフィルタリングしたいフィールドによって効果的にパーティション化できないシナリオもあります。さらに、場合によっては複数のフィールドに基づいてフィルタリングが必要になることもあります。これは、すべての入力ファイルを開くことを意味し、パフォーマンスに悪影響を及ぼす可能性があります。
ありがたいことに、Parquetはこの問題を緩和するための巧妙な解決策を提供しています。Parquetファイルは行グループに分割されており、各行グループ内でParquetはメタデータを保存します。このメタデータには、各フィールドの最小値と最大値が含まれています。
Pandasを使用してParquetファイルを書き込む際、各コントロールグループに含まれるレコード数を選択できます。
Pandasを使用してフィルタリングされたParquetファイルを読み込む場合、PandasライブラリはこのParquetメタデータを活用してメモリに効率的にデータをフィルタリングします。目的のフィールドが行グループのmin/max範囲外にある場合、その行グループ全体が上手くスキップされます。
df = pd.DataFrame(data)
# 非パーティション化されたデータを書き込み、行グループのサイズを指定
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)
# 非パーティション化されたデータを読み込み、行グループのみでフィルタリング
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
Parquetファイル内のメタデータを表示するには、PyArrow
を使用できます。
>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 6
num_rows: 1000000
num_row_groups: 10
format_version: 2.6
serialized_size: 9325
>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
file_offset: 1675616
file_path:
physical_type: BYTE_ARRAY
num_values: 100000
path_in_schema: state
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x115283590>
has_min_max: True
min: Alabama
max: Wyoming
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY
encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 1599792
data_page_offset: 1600354
total_compressed_size: 75824
total_uncompressed_size: 75891
ファイル全体のメタデータには行グループの数が記載されており、各列の各行グループの統計情報セクション内に最小値と最大値が記載されていることに注意してください。
しかし、より最適化された結果を得るためにこのParquet機能をさらに活用する方法があります:ソート。
ソートされたフィールドによるフィルタリング
前のセクションで述べたように、Parquetによって保存されるメタデータの一部には、各行グループ内の各フィールドの最小値と最大値が含まれています。データがフィルタリングするフィールドに基づいてソートされている場合、Pandasはより多くの行グループをスキップする可能性が高くなります。
例えば、’state’ というフィールドを含むレコードのリストが含まれているデータセットを考えてみましょう。レコードがソートされていない場合、ほとんどの行グループに各州が現れる可能性が高いです。例えば、前のセクションのメタデータを見てください。1つ目の行グループだけで、’Alabama’ から ‘Wyoming’ までのすべての州を含んでいることがわかります。
しかし、データを ‘state’ フィールドでソートすると、多くの行グループをスキップする可能性が大幅に高まります。
df = pd.DataFrame(data)
# 'state' でデータをソート
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)
それでは、メタデータがどのように変化したかを再び見てみましょう。
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'
州でソートした後、最小値と最大値がそれに応じて影響を受けていることがわかります。各行グループがすべての州ではなく、一部の州を保持しています。これは、フィルタリングによる読み込みが今やはるかに速くなることを意味します。
それでは、データの読み込みのパフォーマンスにどのように影響するか見てみましょう。データを読み込むコードは変更されていません。
# 州でソートされた非パーティション化データを行グループでフィルタリングして読み込む
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds
驚くべきことに、ここでのパフォーマンスはパーティショニングを使用するのとほぼ同じです。
この原則は、パーティション分割されたデータと非パーティション分割されたデータの両方に適用されます。私たちは同時に両方の方法を使用することができます。時にはフィールドAに基づいてデータをフィルタリングしたいときもあれば、フィールドBに基づいてデータをフィルタリングしたいときもあるなら、フィールドAでパーティション分割し、フィールドBでソートすることが良い選択肢かもしれません。
他のケースでは、たとえば、フィルタリングに使用したいフィールドが高い基数を持つフィールドである場合、値のハッシュでパーティション分割(バケティング)し、その中でフィールドの実際の値でデータをソートすることができます。この方法で、パーティション分割と行グループの両方の利点を享受できます。
列のサブセットを読み取る
あまり一般的ではありませんが、データ取得時により良い結果を得るための別の方法は、タスクに不可欠な特定のフィールドのみを選択することです。この戦略は時折、パフォーマンスの改善をもたらすことがあります。これはParquet形式の性質によるものです。Parquetは列指向の形式で実装されており、各行グループ内でデータを列単位で保存しています。いくつかの列のみを読み取ることで、他の列はスキップされます。
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds
その結果、パフォーマンスの改善は大きくなります。
結論
データのパーティショニングは一般的に最適なアプローチですが、常に可能であるとは限りません。データをソートすることで、大きな改善が見込めます。これにより、より多くの行グループをスキップできます。さらに、可能であれば、必要な列のみを選択することが常に良い選択です。
この記事が、Parquetとpandasの力を活用してパフォーマンスを向上させる方法を理解するのに役立ったことを願っています。
ここにスクリプトがあります、すべての前述の例を含み、時間比較も完全に揃っています。
Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas