Visualisieren Sie Echtzeitdaten mit Python, Dash und RisingWave

Echtzeitdaten sind für Unternehmen wichtig, um schnell Entscheidungen zu treffen. Die visuelle Darstellung dieser Daten kann die Entscheidungsfindung sogar noch beschleunigen. Wir können visuelle Darstellungen von Daten mit verschiedenen Daten-Apps oder Dashboards erstellen. Dash ist eine quelloffene Python-Bibliothek, die eine breite Palette integrierter Komponenten für die Erstellung interaktiver Diagramme, Graphen, Tabellen und anderer UI-Elemente bietet. RisingWave ist eine SQL-basierte Streaming-Datenbank für die Echtzeitverarbeitung von Daten. In diesem Artikel wird erklärt, wie man Python, Dash und RisingWave verwendet, um Visualisierungen von Echtzeitdaten zu erstellen.

So Visualisieren Sie Daten in Echtzeit

Wir wissen, dass Echtzeitdaten Daten sind, die sofort generiert und verarbeitet werden, während sie von verschiedenen Datenquellen erfasst werden. Quellen können typische Datenbanken wie Postgres oder MySQL und Message-Broker wie Kafka sein. Eine Echtzeitdatenvisualisierung umfasst einige Schritte: Zuerst ingestieren wir, dann verarbeiten, und schließlich zeigen wir diese Daten in einem Dashboard an.

Im Fall von Bestell- und Lieferungsdaten können diese Daten in Echtzeit visualisiert werden, um wertvolle Einblicke in die Leistung eines Restaurants oder einer Lieferdienstleistung zu gewinnen. Zum Beispiel können wir Echtzeitdaten nutzen, um zu überwachen, wie lange es dauert, bis Bestellungen geliefert werden, Engpässe im Lieferprozess zu identifizieren und Veränderungen im Bestellvolumen über die Zeit zu verfolgen. Bei Daten, die sich ständig ändern, kann es schwierig sein, den Überblick über alles zu behalten und Muster oder Trends zu erkennen. Mit kostenlosen Tools wie Dash und RisingWave können wir interaktive Visualisierungen erstellen, die es uns ermöglichen, diese ständig wechselnden Daten zu erkunden und zu analysieren.

Wenn es um die Arbeit mit Daten geht, ist die erste Programmiersprache, an die man denken könnte, Python, weil es eine Vielzahl von Bibliotheken hat. Dash ist eine davon, die es uns ermöglicht, eine Daten-App mit einer reichhaltigen und anpassbaren Benutzeroberfläche mit nur Python-Code zu erstellen. Dash basiert auf Flask, Plotly.js und React.js, die beliebte Tools für die Webentwicklung sind, sodass man kein HTML, CSS oder andere JavaScript-Frameworks kennen muss.

Mit RisingWave können wir Datenströme ausverschiedenen Quellenverarbeiten, Materialized Views erstellen, die für komplexe Abfragen optimiert sind, undEchtzeitdaten mithilfe von SQL abfragen. Da RisingWave mit PostgreSQL kompatibel ist, können wir den psycopg2 (PostgreSQL-Clientbibliothek in Python) Treiber verwenden, um eine Verbindung zu RisingWave herzustellen und Abfrageoperationen durchzuführen. Siehe im nächsten Abschnitt.

Visualisierung von Bestelllieferungsdaten Demo

In dem Demo-Tutorial werden wir das folgende GitHub-Repository mit RisingWave-Demos nutzen, in dem angenommen wird, dass alle notwendigen Einrichtungen mithilfe von Docker Compose erfolgt sind. Andere Möglichkeiten, RisingWave auszuführen, finden Sie auf der offiziellen Website. Wir verfügen über einen Kafka-Topic namens delivery_orders, der Ereignisse für jede auf einer Essenslieferungs-Website getätigte Bestellung enthält. Jedes Ereignis enthält Informationen über die Bestellung, wie zum Beispiel die Bestell-ID, die Restaurant-ID und den Lieferstatus. Der Workload-Generator (Python-Skript namens Datagen) simuliert die kontinuierliche Erzeugung von zufälligen Mock-Daten und streamt sie in Kafka-Themen. In der Realität können diese Mock-Daten durch Daten ersetzt werden, die von Ihrer Web-App oder Backend-Service kommen.

Bevor Sie beginnen

Um dieses Tutorial abzuschließen, benötigen Sie Folgendes:

  • Stellen Sie sicher, dass Docker und Docker Compose in Ihrer Umgebung installiert sind.
  • Stellen Sie sicher, dass die interaktive PostgreSQL-Konsole, psql, in Ihrer Umgebung installiert ist. Für detaillierte Anweisungen siehe Download PostgreSQL.
  • Laden Sie Python 3 für Ihr Betriebssystem herunter und installieren Sie es. pip wird automatisch installiert.

Das Demo, das ich auf Windows-Betriebssystem, Docker Desktop und Python 3.10.11-Version installiert habe, getestet habe.

Schritt 1: Einrichten des RisingWave-Demo-Clusters

Klonen Sie zuerst das RisingWave-Beispielrepository in Ihre lokale Umgebung.

git clone <https://github.com/risingwavelabs/risingwave.git>

Dann navigieren Sie zum Verzeichnis integration_tests/delivery und starten Sie den Demo-Cluster aus der docker-Compose-Datei.

cd risingwave/integration_tests/delivery
docker compose up -d

Stellen Sie sicher, dass alle Container ausgeführt werden!

Schritt 2: Installieren der Dash- und Psycopg2-Bibliotheken

Um Dash zu installieren, können Sie sich auch an der Dash Installation Anleitung auf der Website orientieren. Grundsätzlich müssen wir zwei Bibliotheken (Dash selbst und Pandas) durch Ausführen des folgenden pip install Befehls installieren:

# This also brings along the Plotly graphing library.
# Plotly is known for its interactive charts
# Plotly Express requires Pandas to be installed too.
pip install dash pandas

Wir sollten auch psycopg2 installieren, um mit der RisingWave-Streaming-Datenbank zu interagieren:

pip install psycopg2-binary

Schritt 3: Erstellen einer Datenquelle

Um Echtzeitdaten mit RisingWave zu verarbeiten, müssen Sie zunächst eine Datenquelle einrichten. Im Demo-Projekt sollte Kafka als Datenquelle definiert werden. Wir werden eine neue Datei namens create-a-source.py erstellen, dieselbe integration_tests/delivery-Verzeichnis mit Python-Skript, in dem wir eine Verbindung zur RisingWave herstellen und eine Tabelle erstellen, um delivery_orders-Kafka-Themen zu konsumieren und zu speichern. Sie können einfach den unten stehenden Code in die neue Datei kopieren und einfügen.

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566") # Connect to RisingWave.

conn.autocommit = True # Set queries to be automatically committed.

with conn.cursor() as cur:
    cur.execute("""
CREATE TABLE delivery_orders_source (
    order_id BIGINT,
    restaurant_id BIGINT,
    order_state VARCHAR,
    order_timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'delivery_orders',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;""") # Execute the query.

conn.close() # Close the connection.

Nachdem Sie die Datei erstellt haben, führen Sie python create-a-source.py aus und es wird die Quelltabelle in der RisingWave erstellen.

Schritt 4: Erstellen einer Materialized View

Als Nächstes erstellen wir eine neue Materialized View, ähnlich wie wir eine Tabelle erstellt haben. Wir erstellen eine neue Datei namens create-a-materialized-view.py und führen eine SQL-Abfrage unter Verwendung der psycopg2-Bibliothek aus. Es ist auch möglich, die obigen letzten beiden Schritte in einer Python-Skriptdatei zusammenzuführen.

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""CREATE MATERIALIZED VIEW restaurant_orders_view AS
SELECT
    window_start,
    restaurant_id,
    COUNT(*) AS total_order
FROM
    HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
        order_state = 'CREATED'
GROUP BY
    restaurant_id,
    window_start;""")

conn.close()

Oben berechnet die SQL-Abfrage die Anzahl der insgesamt erstellten Bestellungen von einem bestimmten Restaurant innerhalb der letzten 15 Minuten in Echtzeit und speichert das Ergebnis in der Materialized View. Fällt eine Datenänderung ein oder kommen neue Kafka-Themen an, erhöht RisingWave automatisch und aktualisiert das Ergebnis der Materialized View. Sobald Sie die Datenquelle und die Materialized View eingerichtet haben, können Sie mit dem Einlesen von Daten beginnen und diese Daten mithilfe von Dash visualisieren.

Schritt 5: Erstellen einer Dash-App

Jetzt erstellen wir unsere Dash-App, um den Inhalt der Materialized View in RisingWave abzufragen und zu visualisieren. Sie können das Tutorial Dash in 20 Minuten verfolgen, um die grundlegenden Bausteine von Dash zu verstehen. Unser Beispiel-Anwendungscode zeigt Restaurant-Bestelldaten sowohl in Tabellen- als auch in Graphenformaten. Sehen Sie sich den folgenden Python-Code in dash-example.py an:

import psycopg2
import pandas as pd
import dash
from dash import dash_table
from dash import dcc
import dash_html_components as html
import plotly.express as px

# Connect to the PostgreSQL database
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")

# Retrieve data from the materialized view using pandas
df = pd.read_sql_query("SELECT window_start, restaurant_id, total_order FROM restaurant_orders_view;", conn)

# Create a Dash application
app = dash.Dash(__name__)

# Define layout
app.layout = html.Div(children=[
    html.H1("Restaurant Orders Table"),
    dash_table.DataTable(id="restaurant_orders_table", columns=[{"name": i, "id": i} for i in df.columns], data=df.to_dict("records"), page_size=10),
    html.H1("Restaurant Orders Graph"),
    dcc.Graph(id="restaurant_orders_graph", figure=px.bar(df, x="window_start", y="total_order", color="restaurant_id", barmode="group"))
])

# Run the application
if __name__ == '__main__':
    app.run_server(debug=True)

Dieser Codeausschnitt extrahiert Daten aus der restaurant_orders_view Materialisierungssicht mithilfe von pandas und zeigt sie in einer Dash-Tabelle unter Verwendung von dash_table.DataTable und einem Balkendiagramm unter Verwendung von dcc.Graph. Die Tabelle und das Balkendiagramm verfügen über Spalten, die den Spalten in der Materialisierungssicht (‚window_start‘, ‚total_order‘ und ‚restaurant_id‘) entsprechen, und über Zeilen, die den Daten in der Materialisierungssicht entsprechen.

Schritt 6: Ergebnisse anzeigen

Sie können die Anwendung starten, indem Sie das obige dash-example.py Skript ausführen und zu http://localhost:8050/ in Ihrem Webbrowser navigieren (Sie erhalten eine Nachricht im Terminal, die Sie auf diese URL verweist).

Zusammenfassung

Insgesamt ist Dash ein leistungsfähiges Werkzeug zum Erstellen von datenanalytischen Ansichten, die komplexe Benutzeroberflächen und Datenvisualisierungskapazitäten erfordern, und zwar unter Verwendung der Einfachheit und Eleganz der Python-Programmiersprache. Wenn wir es zusammen mit dem RisingWave Streaming-Datenbank verwenden, erhalten wir Einblicke in Echtzeitdaten, die uns dabei helfen, fundierter Entscheidungen zu treffen und Maßnahmen zur Optimierung der Leistung zu ergreifen.

Verwandte Ressourcen

Source:
https://dzone.com/articles/visualize-real-time-data-with-python-dash-and-risi