כשזה מגיע לסינון נתונים מקבצי Parquet באמצעות pandas, יש לשים לב שישנם מספר שיטות אפשריות. בעוד שמהודא שחלוקת הנתונים יכולה לשפר באופן משמעותי את היעילות של פעולות הסינון, ישנם שיטות נוספות לייעל את הביצועים של שאילתת הנתונים המאוחסנים בקבצי Parquet. חלוקת הנתונים היא רק אחת האפשרויות.
סינון לפי שדות מחולקים
כפי שכבר הוזכר, שיטה זו היא לא רק הנפוצה ביותר אלא גם היעילה ביותר מבחינת שיפור הביצועים. ההגיון מאחורי זה הוא פשוט. כאשר משתמשים בחלוקות, ניתן לבדוק בספציפיות את הצורך בקריאת קבצים שלמים או אפילו ספריות שלמות של קבצים (המכונה גם דחיפת פרדיקט), ובכך להשיג שיפור משמעותי ומרגש.
import pandas as pd
import time
from faker import Faker
fake = Faker()
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH_PREFIX=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
print(f"Creating fake data")
data = {
'id': range(NUM_OF_RECORDS), # Generate IDs from 1 to 100
'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
df = pd.DataFrame(data)
# כתיבה ללא חלוקות
df.to_parquet(path=NON_PARTITIONED_PATH)
# כתיבת נתונים מחולקים
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
# קריאה ללא חלוקות
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time # 37 sec
# קריאת נתונים מחולקים
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec
שיפור הזמן (יחד עם הפחתת השימוש בזיכרון ובמשאבי CPU) הוא משמעותי, ירידה מ-37 שניות לרק 0.20 שניות.
סינון לפי שדות לא מחולקים
בדוגמה לעיל, ראינו איך סינון מבוסס על שדה מחולק יכול לשפר את שחזור הנתונים. עם זאת, ישנם מצבים בהם לא ניתן לחלק את הנתונים ביעילות לפי השדה הספציפי שברצוננו לסנן. בנוסף, במקרים מסוימים, יש צורך בסינון המבוסס על מספר שדות. זה אומר שכל הקבצים הקלט ייפתחו, מה שיכול לפגוע בביצועים.
תודה לכם, Parquet מציעה פתרון חכם למניעת הבעיה זו. קבצי Parquet מחולקים לקבוצות שורות. בתוך כל קבוצת שורה, Parquet מאחסן מפתחות. מדדים אלו כוללים את הערכים המינימליים והמקסימליים עבור כל שדה.
כשאתה כותב קבצי Parquet עם Pandas, אפשר לבחור מה יהיה מספר הרשומות בכל קבוצת שליטה.
כשמשתמשים ב-Pandas לקריאת קבצי Parquet עם מסננים, בקריית Pandas משתמשת במדדים אלו של Parquet כדי לסנן ביעילות את המידע שמועבר לזיכרון. אם השדה הרצוי נמצא מחוץ לטווח המינימום/מקסימום של קבוצת שורה, כולה קבוצת השורה מדורגת בהנחה.
df = pd.DataFrame(data)
# כתיבת מידע שאינו מחולק, וצויין את גודל קבוצת השורות
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)
# קריאת מידע שאינו מחולק ומסננים רק על פי קבוצות שורות
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
צפייה במדדים בתוך קבצי Parquet ניתן לעשות באמצעות PyArrow
.
>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 6
num_rows: 1000000
num_row_groups: 10
format_version: 2.6
serialized_size: 9325
>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
file_offset: 1675616
file_path:
physical_type: BYTE_ARRAY
num_values: 100000
path_in_schema: state
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x115283590>
has_min_max: True
min: Alabama
max: Wyoming
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY
encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 1599792
data_page_offset: 1600354
total_compressed_size: 75824
total_uncompressed_size: 75891
שים לב שמספר קבוצות השורות מוזכר במדדים של הקובץ כולו, והערכים המינימליים והמקסימליים מוזכרים בסעיף הסטטיסטיקות של כל עמודה עבור כל קבוצת שורה.
עם זאת, יש דרך לנצל עוד יותר את תכונת Parquet זו לתוצאות מושלמות אף יותר: מיון.
מסננים לפי שדות ממוינים
כפי שצוין בסעיף הקודם, חלק מהמדדים שמאוחסנים על ידי Parquet כוללים את הערכים המינימליים והמקסימליים עבור כל שדה בתוך כל קבוצת שורה. כשהנתונים ממוינים על פי השדה שאנו רוצים לסנן, ל-Pandas יש סבירות גדולה יותר לדלג על קבוצות שורות רבות יותר.
לדוגמה, בואו ניקח מערך נתונים המכיל רשימה של רשומות, כאשר אחד השדות מייצג את "המדינה". אם הרשומות אינן ממוינות, יש סיכוי טוב שכל מדינה מופיעה ברוב קבוצות השורות. לדוגמה, הסתכלו על המטא-נתונים בסעיף הקודם. אפשר לראות שקבוצת השורה הראשונה לבדה מכילה את כל המדינות מ"אלבמה" עד "וויומינג".
עם זאת, אם נמיין את הנתונים על פי השדה "מדינה", יש סבירות ניכרת לדלות רבה מקבוצות שורות.
df = pd.DataFrame(data)
# מיון הנתונים על פי "מדינה"
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)
עכשיו, בואו נבחן שוב את המטא-נתונים ונראה איך הם השתנו.
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'
כפי שאפשר לראות, לאחר מיון לפי מדינה, ערכי המינימום והמקסימום משתנים בהתאם; כל קבוצת שורות מחזיקה חלק מהמדינות במקום את כל המדינות. זה אומר שקריאה עם מסננים צריכה להיות הרבה יותר מהירה כעת.
עכשיו, בואו נראה איך זה משפיע על ביצועי הקריאה לנתונים. הקוד לקריאת הנתונים לא השתנה.
# קריאת נתונים לא מפולטים ומסננים לפי קבוצות שורות, הקלט ממוין לפי מדינה
start_time = time.time()
df = pd.read_parquet(path=DATASET_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds
באופן מדהים, הביצועים כאן דומים מאוד לשימוש בחלוקות.
העיקרון זה חל על נתונים מחולקים ולא מחולקים. אנו יכולים להשתמש בשתי השיטות בו זמנית. אם לפעמים אנו רוצים לסנן את הנתונים על פי השדה A ובפעמים אחרות על פי השדה B, אז חלוקה על פי השדה A ומיון על פי השדה B יכול להיות אפשרות טובה.
במקרים אחרים, למשל, כאשר השדה שאנו רוצים לסנן על פיי הוא שדה עם קיבולת גבוהה, נוכל לחלק על פי כמה חשיש של הערך (באגירה) ולמיין את הנתונים בתוכו לפי הערך האמיתי של השדה בדרך זו ניהנה מיתרונות שני השיטות – חלוקה וקבוצות שורות.
קריאת תת קבוצה של העמודות
אם כי פחות נפוצות, שיטה נוספת להשגת תוצאות טובות יותר במהלך שליפת הנתונים כוללת בחירה רק בשדות הספציפיים שחיוניים למשימתך. אסטרטגיה זו יכולה להביא לעיכובים בביצועים. זה בגלל הטבע של פורמט Parquet. Parquet מיושם בפורמט טורי, מה שאומר שהוא מאחסן את הנתונים עמודה אחר עמודה בתוך כל קבוצת השורות. קריאה רק מספר מסוים של עמודות אומרת שהעמודות האחרות ידלגו.
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds
לא מפתיע, שיפור הביצועים גדול.
סיכום
בעוד שחלוקת הנתונים היא בדרך כלל הגישה האופטימלית, זה לא תמיד אפשרי. מיון הנתונים יכול להביא לשיפורים משמעותיים. אנו עשויים לדלג על יותר קבוצות שורות זה. כמו כן, אם אפשר, בחירה רק בעמודות הנחוצות היא תמיד אפשרות טובה.
פוסט זה עזר לך להבין איך לנצל את הכוח של Parquet ו-pandas לשיפור ביצועים.
הנה תסריט המכיל את כל הדוגמאות שהוזכרו קודם לכן, וכולל גם השוואות זמן.
Source:
https://dzone.com/articles/parquet-data-filtering-with-pandas