סינון נתוני פארקט עם פנדס

כשזה מגיע לסינון נתונים מקבצי Parquet באמצעות pandas, יש לשים לב שישנם מספר שיטות אפשריות. בעוד שמהודא שחלוקת הנתונים יכולה לשפר באופן משמעותי את היעילות של פעולות הסינון, ישנם שיטות נוספות לייעל את הביצועים של שאילתת הנתונים המאוחסנים בקבצי Parquet. חלוקת הנתונים היא רק אחת האפשרויות.

סינון לפי שדות מחולקים

כפי שכבר הוזכר, שיטה זו היא לא רק הנפוצה ביותר אלא גם היעילה ביותר מבחינת שיפור הביצועים. ההגיון מאחורי זה הוא פשוט. כאשר משתמשים בחלוקות, ניתן לבדוק בספציפיות את הצורך בקריאת קבצים שלמים או אפילו ספריות שלמות של קבצים (המכונה גם דחיפת פרדיקט), ובכך להשיג שיפור משמעותי ומרגש.

Python

 

import pandas as pd
import time
from faker import Faker
 
fake = Faker()
 
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
 
print(f"Creating fake data")
data = {
    'id': range(NUM_OF_RECORDS),  # Generate IDs from 1 to 100
    'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
    'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
    'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
    'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
    'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
 
df = pd.DataFrame(data)
 
# כתיבה ללא חלוקות
df.to_parquet(path=NON_PARTITIONED_PATH)
 
# כתיבת נתונים מחולקים
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
 
# קריאה ללא חלוקות
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time  # 37 sec
 
# קריאת נתונים מחולקים
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec

שיפור הזמן (יחד עם הפחתת השימוש בזיכרון ובמשאבי CPU) הוא משמעותי, ירידה מ-37 שניות לרק 0.20 שניות.

סינון לפי שדות לא מחולקים

בדוגמה לעיל, ראינו איך סינון מבוסס על שדה מחולק יכול לשפר את שחזור הנתונים. עם זאת, ישנם מצבים בהם לא ניתן לחלק את הנתונים ביעילות לפי השדה הספציפי שברצוננו לסנן. בנוסף, במקרים מסוימים, יש צורך בסינון המבוסס על מספר שדות. זה אומר שכל הקבצים הקלט ייפתחו, מה שיכול לפגוע בביצועים.

תודה לכם, Parquet מציעה פתרון חכם למניעת הבעיה זו. קבצי Parquet מחולקים לקבוצות שורות. בתוך כל קבוצת שורה, Parquet מאחסן מפתחות. מדדים אלו כוללים את הערכים המינימליים והמקסימליים עבור כל שדה.

כשאתה כותב קבצי Parquet עם Pandas, אפשר לבחור מה יהיה מספר הרשומות בכל קבוצת שליטה.

כשמשתמשים ב-Pandas לקריאת קבצי Parquet עם מסננים, בקריית Pandas משתמשת במדדים אלו של Parquet כדי לסנן ביעילות את המידע שמועבר לזיכרון. אם השדה הרצוי נמצא מחוץ לטווח המינימום/מקסימום של קבוצת שורה, כולה קבוצת השורה מדורגת בהנחה.

Python

 

df = pd.DataFrame(data)

# כתיבת מידע שאינו מחולק, וצויין את גודל קבוצת השורות
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)

# קריאת מידע שאינו מחולק ומסננים רק על פי קבוצות שורות
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])

צפייה במדדים בתוך קבצי Parquet ניתן לעשות באמצעות PyArrow.

Shell

 

>>> import pyarrow.parquet as pq

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 6
  num_rows: 1000000
  num_row_groups: 10
  format_version: 2.6
  serialized_size: 9325

>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
  file_offset: 1675616
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 100000
  path_in_schema: state
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x115283590>
      has_min_max: True
      min: Alabama
      max: Wyoming
      null_count: 0
      distinct_count: 0
      num_values: 100000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 1599792
  data_page_offset: 1600354
  total_compressed_size: 75824
  total_uncompressed_size: 75891

שים לב שמספר קבוצות השורות מוזכר במדדים של הקובץ כולו, והערכים המינימליים והמקסימליים מוזכרים בסעיף הסטטיסטיקות של כל עמודה עבור כל קבוצת שורה.

עם זאת, יש דרך לנצל עוד יותר את תכונת Parquet זו לתוצאות מושלמות אף יותר: מיון.

מסננים לפי שדות ממוינים

כפי שצוין בסעיף הקודם, חלק מהמדדים שמאוחסנים על ידי Parquet כוללים את הערכים המינימליים והמקסימליים עבור כל שדה בתוך כל קבוצת שורה. כשהנתונים ממוינים על פי השדה שאנו רוצים לסנן, ל-Pandas יש סבירות גדולה יותר לדלג על קבוצות שורות רבות יותר.

לדוגמה, בואו ניקח מערך נתונים המכיל רשימה של רשומות, כאשר אחד השדות מייצג את "המדינה". אם הרשומות אינן ממוינות, יש סיכוי טוב שכל מדינה מופיעה ברוב קבוצות השורות. לדוגמה, הסתכלו על המטא-נתונים בסעיף הקודם. אפשר לראות שקבוצת השורה הראשונה לבדה מכילה את כל המדינות מ"אלבמה" עד "וויומינג".

עם זאת, אם נמיין את הנתונים על פי השדה "מדינה", יש סבירות ניכרת לדלות רבה מקבוצות שורות.

Python

 

df = pd.DataFrame(data)

# מיון הנתונים על פי "מדינה"
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)

עכשיו, בואו נבחן שוב את המטא-נתונים ונראה איך הם השתנו.

PowerShell

 

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'


>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'


>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'

כפי שאפשר לראות, לאחר מיון לפי מדינה, ערכי המינימום והמקסימום משתנים בהתאם; כל קבוצת שורות מחזיקה חלק מהמדינות במקום את כל המדינות. זה אומר שקריאה עם מסננים צריכה להיות הרבה יותר מהירה כעת.

עכשיו, בואו נראה איך זה משפיע על ביצועי הקריאה לנתונים. הקוד לקריאת הנתונים לא השתנה.

Python

 

# קריאת נתונים לא מפולטים ומסננים לפי קבוצות שורות, הקלט ממוין לפי מדינה
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds

באופן מדהים, הביצועים כאן דומים מאוד לשימוש בחלוקות.

העיקרון זה חל על נתונים מחולקים ולא מחולקים. אנו יכולים להשתמש בשתי השיטות בו זמנית. אם לפעמים אנו רוצים לסנן את הנתונים על פי השדה A ובפעמים אחרות על פי השדה B, אז חלוקה על פי השדה A ומיון על פי השדה B יכול להיות אפשרות טובה.
במקרים אחרים, למשל, כאשר השדה שאנו רוצים לסנן על פיי הוא שדה עם קיבולת גבוהה, נוכל לחלק על פי כמה חשיש של הערך (באגירה) ולמיין את הנתונים בתוכו לפי הערך האמיתי של השדה בדרך זו ניהנה מיתרונות שני השיטות – חלוקה וקבוצות שורות.

קריאת תת קבוצה של העמודות

אם כי פחות נפוצות, שיטה נוספת להשגת תוצאות טובות יותר במהלך שליפת הנתונים כוללת בחירה רק בשדות הספציפיים שחיוניים למשימתך. אסטרטגיה זו יכולה להביא לעיכובים בביצועים. זה בגלל הטבע של פורמט Parquet. Parquet מיושם בפורמט טורי, מה שאומר שהוא מאחסן את הנתונים עמודה אחר עמודה בתוך כל קבוצת השורות. קריאה רק מספר מסוים של עמודות אומרת שהעמודות האחרות ידלגו.

Python

 

start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds

לא מפתיע, שיפור הביצועים גדול.

סיכום

בעוד שחלוקת הנתונים היא בדרך כלל הגישה האופטימלית, זה לא תמיד אפשרי. מיון הנתונים יכול להביא לשיפורים משמעותיים. אנו עשויים לדלג על יותר קבוצות שורות זה. כמו כן, אם אפשר, בחירה רק בעמודות הנחוצות היא תמיד אפשרות טובה.

פוסט זה עזר לך להבין איך לנצל את הכוח של Parquet ו-pandas לשיפור ביצועים.

הנה תסריט המכיל את כל הדוגמאות שהוזכרו קודם לכן, וכולל גם השוואות זמן.

Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas