הצגת נתונים בזמן אמת עם פייתון, דאש, ו-RisingWave

נתונים בזמן אמת חשובים עבור עסקים לקבלת החלטות מהירות. ראיית נתונים זה באופן ויזואלי יכולה לעזור לקבל החלטות אפילו מהר יותר. אנו יכולים ליצור ייצוגים ויזואליים של נתונים באמצעות יישומים נתונים שונים או לוחות מחויכים. Dash היא ספריית Python פתוחת המקור המספקת מגוון רחב של רכיבים מובנים ליצירת גרפים אינטראקטיביים, גרפים, טבלאות ואלמנטים UI אחרים. RisingWave הוא מסד נתונים זרימה מבוסס SQL לעיבוד נתונים בזמן אמת. מאמר זה יסביר כיצד להשתמש Python, Dash, ו-RisingWave ליצירת ויזואליזציות של נתונים בזמן אמת.

איך להמחיש נתונים בזמן אמת

אנו יודעים שנתונים בזמן אמת הם נתונים שנוצרים ונעבדים מיידית, כפי שנאספים ממקורות נתונים שונים. מקורות יכולים להיות מסדי נתונים טיפוסיים כמו Postgres או MySQL, ובאמבטי מסרים כמו Kafka. ויזואליזציה של נתונים בזמן אמת כוללת מספר שלבים: קודם כל, אנו מזרימים, לאחר מכן מעבדים, ולבסוףמציגים את הנתונים בלוח מחויכים.

במקרה של נתוני משלוח הזמנות, המראה של נתונים אלו בזמן אמת יכול לספק תובנות חשובות לגבי ביצועים של מסעדה או שירות משלוח. לדוגמה, אנו יכולים להשתמש בנתונים בזמן אמת כדי לנטר את הזמן שלוקח למשלוחים להגיע, לזהות צווארי בקבוק בתהליך המשלוח ולעקוב אחר שינויים בנפח הזמנות לאורך זמן. כשמדובר בנתונים שמשתנים כל הזמן, יכול להיות קשה לעקוב אחר הכל שקורה ולזהות דפוסים או מגמות. באמצעות כלי חינמיים כמו Dash ו-RisingWave, אנו יכולים ליצור הדגמות אינטראקטיביות המאפשרות לנו לחקור ולנתח את הנתונים המשתנים הללו ברציפות.

כשמדובר בעבודה עם נתונים, השפה הקודית הראשונה שאולי תחשוב עליה היא פייתון, משום שיש לה מגוון של ספריות. Dash הוא אחד מהם שמאפשר לנו ליצור אפליקציה נתונים עם ממשק משתמש עשיר ומתוחכם באמצעות קוד פייתון בלבד. Dash מבוסס על Flask, Plotly.js ו-React.js, כלי פיתוח אינטרנט פופולריים כך שאינך זקוק לידע ב-HTML, CSS או רכיבים גאגא של JavaScript.

באמצעות RisingWave, אנו יכולים לצרוך זרמי נתונים ממקורות שונים, ליצור נוף ממוחשה שמיועדת לשאילתות מורכבות, ולשאול נתונים בזמן אמת באמצעות SQL. כש-RisingWave תואם ברשת את PostgreSQL, אנו יכולים להשתמש בכלי הנהג psycopg2 (ספריית לקוח PostgreSQL בפייתון) כדי להתחבר ל-RisingWave ולבצע פעולות שאילתה. ראה בסעיף הבא.

דגמה להמחשת נתוני משלוח מוצרים

בדרכת הדגמה, נשתמש במדיניות הבאה אגף הביניים של GitHub עם הדגמות של RisingWave שם אנו מניחים שכל הדברים הנדרשים מותאמים באמצעות Docker Compose. אפשר לבדוק דרכים נוספות ללרוץ RisingWave באתר הרשמי. יש לנו נושא קפקא בשם delivery_orders המכיל אירועים עבור כל מנה שממוקמת באתר משלוח אוכל. כל אירוע כולל מידע על המנה, כגון מזהה הזמנה, מזהה מסעדה, וסטטוס המשלוח. יצרן העבודה (תסריט Python הנקרא Datagen) מדמה את יצירת נתונים מותחנים אקראיים ברציפות ומזרימים אותם לנושאי קפקא. במציאות, ניתן להחליף את הנתונים המותחנים הללו בנתונים שמגיעים מאפליקצית האינטרנט שלך או משירות הביניים.

לפני שאתה מתחיל

כדי להשלים את הדרכה זו, אתה זקוק לאלה:

  • הקפד להתקין Docker וDocker Compose בסביבתך.
  • וודא שהמסוף האינטראקטיבי PostgreSQL, psql, מותקן בסביבתך. להוראות מפורטות, ראה Download PostgreSQL.
  • הורד והתקן Python 3 עבור המערכת האפליקציה שלך. הפקודה pip תותקן אוטומטית.

הדגמא שבדקתי על מערכת ההפעלה Windows, דפרסט דוקר ו-Python 3.10.11 גרסת התקנה.

שלב 1: הקמת קלאסטר הדגמא של RisingWave

ראשית, העתק את רשומות הדוגמא RisingWave לסביבה המקומית שלך.

git clone <https://github.com/risingwavelabs/risingwave.git>

לאחר מכן, גלול לתיקייה integration_tests/delivery והתחל את קלאסטר הדגמא מקובץ ה-docker compose.

cd risingwave/integration_tests/delivery
docker compose up -d

ודא שכל המיכונים פועלים!

שלב 2: התקנת ספריות Dash ו-Psycopg2

להתקין את Dash, באפשרותך גם להתייעץ במדריך התקנת Dash באתר. בעיקרון, אנו צריכים להתקין שני ספריות (Dash עצמו ו-Pandas) על ידי הרצת הפקודה pip install הבאה:

# This also brings along the Plotly graphing library.
# Plotly is known for its interactive charts
# Plotly Express requires Pandas to be installed too.
pip install dash pandas

כדאי גם להתקין psycopg2 כדי לתקשר עם מסד הנתונים הנע של RisingWave:

pip install psycopg2-binary

שלב 3: יצירת מקור נתונים

כדי לשמור נתונים בזמן אמת עם RisingWave, קודם כל עליך להגדיר מקור נתונים. בפרויקט הדגמה, יש להגדיר את Kafka כמקור הנתונים. אנו עומדים ליצור קובץ חדש בשם create-a-source.py, באותו ספרייה integration_tests/delivery עם תכנית Python שבה אנו מתחברים ל-RisingWave ויוצרים טבלה לצריכת נתונים ולשמירתם מנושאי Kafka delivery_orders. אפשר פשוט להעתיק ולהדביק את הקוד הבא לתוך הקובץ החדש.

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566") # Connect to RisingWave.

conn.autocommit = True # Set queries to be automatically committed.

with conn.cursor() as cur:
    cur.execute("""
CREATE TABLE delivery_orders_source (
    order_id BIGINT,
    restaurant_id BIGINT,
    order_state VARCHAR,
    order_timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'delivery_orders',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;""") # Execute the query.

conn.close() # Close the connection.

לאחר שתוצרת הקובץ, תרוץ את python create-a-source.py וזה ייצור את טבלת המקור ב-RisingWave.

שלב 4: יצירת ממשק חומרני

ואז, אנו יוצרים ממשק חזותי חדש ממשק חזותי בדומה לאופן שבו יצרנו את הטבלה. אנו יוצרים קובץ חדש בשם create-a-materialized-view.py ומריצים שאילתת SQL באמצעות ספריית psycopg2. ניתן גם לשלב את שתי הצעדים האחרונים למסר פייתון אחד.

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""CREATE MATERIALIZED VIEW restaurant_orders_view AS
SELECT
    window_start,
    restaurant_id,
    COUNT(*) AS total_order
FROM
    HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
        order_state = 'CREATED'
GROUP BY
    restaurant_id,
    window_start;""")

conn.close()

לעיל, השאילתה המחשבת את מספר ההזמנות הכולל שנוצרו ממסעדה ספציפית ב-15 הדקות האחרונות בזמן אמת ומטמין את התוצאה בממשק חזותי. אם מתרחש שינוי בנתונים או שמגיעים נושאים חדשים של Kafka, RisingWave מחדש מיידית ומעדכן את תוצאת הממשק חזותי. ברגע שבניתם את מקור הנתונים, ממשק חזותי, תוכלו להתחיל לספוג נתונים ולהציג אותם באמצעות Dash.

צעד 5: בניית אפליקציית Dash

כעת נבנה את אפליקציית Dash שלנו לשאילתה והצגת התוכן של הממשק חזותי שיש לנו ב-RisingWave. תוכלו לעקוב אחר הדרכה Dash תוך 20 דקות כדי להבין את המרכיבים הבסיסיים של Dash. הקוד הדוגמא שלנו מציג נתוני מסעדות בפורמטים טבלה וגרף. ראה את הקוד הפייתון למטה בdash-example.py:

import psycopg2
import pandas as pd
import dash
from dash import dash_table
from dash import dcc
import dash_html_components as html
import plotly.express as px

# Connect to the PostgreSQL database
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")

# Retrieve data from the materialized view using pandas
df = pd.read_sql_query("SELECT window_start, restaurant_id, total_order FROM restaurant_orders_view;", conn)

# Create a Dash application
app = dash.Dash(__name__)

# Define layout
app.layout = html.Div(children=[
    html.H1("Restaurant Orders Table"),
    dash_table.DataTable(id="restaurant_orders_table", columns=[{"name": i, "id": i} for i in df.columns], data=df.to_dict("records"), page_size=10),
    html.H1("Restaurant Orders Graph"),
    dcc.Graph(id="restaurant_orders_graph", figure=px.bar(df, x="window_start", y="total_order", color="restaurant_id", barmode="group"))
])

# Run the application
if __name__ == '__main__':
    app.run_server(debug=True)

קטע הקוד הזה משיג את הנתונים מהנציג restaurant_orders_view המוגבר באמצעות פנדה ומציג אותם בטבלה של דאש באמצעות dash_table.DataTable ובתרשים עמודתי באמצעות dcc.Graph. הטבלה והתרשים העמודתי כוללים עמודות המתאימות לעמודות בנציג המוגבר ('window_start', 'total_order', ו- 'restaurant_id’) ושורות המתאימות לנתונים בנציג המוגבר.

שלב 6: צפייה בתוצאות

אפשר להריץ את היישום על ידי הרצת תוכנית dash-example.py לעיל ונavigating ל- http://localhost:8050/ בדפדפן האינטרנט שלך (תקבל מסר במסוף המסמך ללכת לקישור זה).

סיכום

בסך הכל, דאש היא כלי עוצמתי ליצירת ראייה בנתונים ניתוחית שדורשת UI מורכב ויכולות ויזואליזציה של נתונים, כולם באמצעות הפשטות ואלגנטיות של שפת התכנות פייתון. כשאנו משתמשים בו יחד עם מסד הנתונים הנדגל של RisingWave, אנו זוכים בתובנות לנתונים בזמן אמת שיכולות לעזור לנו לקבל החלטות מדויקות יותר ולפעול לייעלת הביצועים.

משאבים קשורים

Source:
https://dzone.com/articles/visualize-real-time-data-with-python-dash-and-risi