Visualizza Dati in Tempo Reale con Python, Dash e RisingWave

I dati in tempo reale sono importanti per le aziende per prendere decisioni rapide. Visualizzare questi dati può aiutare a prendere decisioni ancora più velocemente. Possiamo creare rappresentazioni visive dei dati utilizzando vari app dati o dashboard. Dash è una libreria Python open-source che offre una vasta gamma di componenti predefiniti per creare grafici interattivi, tabelle, grafici e altri elementi UI. RisingWave è un database streaming basato su SQL per il trattamento dei dati in tempo reale. Questo articolo spiegherà come utilizzare Python, Dash e RisingWave per creare visualizzazioni dei dati in tempo reale.

Come Visualizzare i Dati in Tempo Reale

Sappiamo che i dati in tempo reale sono dati generati e elaborati immediatamente, man mano che vengono raccolti da diversi sorgenti dati. Le fonti possono essere database tipici come Postgres o MySQL, e broker di messaggi come Kafka. Una visualizzazione dei dati in tempo reale consiste in pochi passaggi: prima inglobiamo, poi processiamo, e infine mostriamo questi dati in un dashboard.

Nel caso di dati di consegna degli ordini, visualizzare questi dati in tempo reale può fornire informazioni preziose sulla performance di un ristorante o di un servizio di consegna. Ad esempio, possiamo utilizzare i dati in tempo reale per monitorare quanto tempo impiegano gli ordini a essere consegnati, identificare i colli di bottiglia nel processo di consegna e tracciare i cambiamenti nel volume degli ordini nel tempo. Quando si tratta di dati che cambiano costantemente, può essere difficile tenere traccia di tutto ciò che sta accadendo e identificare pattern o tendenze. Utilizzando strumenti gratuiti come Dash e RisingWave, possiamo creare visualizzazioni interattive che ci permettono di esplorare e analizzare questi dati in continua evoluzione.

Quando si tratta di lavorare con i dati, il primo linguaggio di programmazione a cui potresti pensare è Python, poiché dispone di una gamma di librerie. Dash è una di queste che ci permette di creare un’app dati con un’interfaccia utente ricca e personalizzabile utilizzando solo codice Python. Dash è costruito sulla base di Flask, Plotly.js e React.js, che sono strumenti di sviluppo web popolari, quindi non è necessario conoscere HTML, CSS o altri framework JavaScript.

Con RisingWave, possiamo consumare flussi di dati da varie fonti, creare viste materializzate ottimizzate per query complesse e eseguire query su dati in tempo reale utilizzando SQL. Poiché RisingWave è compatibile a livello di rete con PostgreSQL, possiamo utilizzare il driver psycopg2 (libreria client PostgreSQL in Python) per connetterci a RisingWave e eseguire operazioni di query. Vedere nella sezione successiva.

Visualizza Dati Spedizione Ordini Demo

Nel tutorial dimostrativo, utilizzeremo il seguente repository GitHub con demo di RisingWave dove assumiamo che tutte le cose necessarie siano configurate utilizzando Docker Compose. Puoi verificare altri modi per eseguire RisingWave sul sito ufficiale. Abbiamo un topic Kafka denominato delivery_orders che contiene eventi per ogni ordine effettuato su un sito di consegna di cibo. Ogni evento include informazioni sull’ordine, come il ID ordine, ID ristorante, e stato consegna. Il generatore di carico (script Python chiamato Datagen) simula la generazione di dati mock casuali in modo continuo e li trasmette in topic Kafka. In realtà, questi dati mock possono essere sostituiti con dati provenienti dal tuo web app o servizio backend.

Prima di Iniziare

Per completare questo tutorial, ti occorrono i seguenti elementi:

  • Assicurati di avere Docker e Docker Compose installati nel tuo ambiente.
  • Assicurati che il terminale interattivo PostgreSQL, psql, sia installato nel tuo ambiente. Per istruzioni dettagliate, consulta Scarica PostgreSQL.
  • Scarica e installa Python 3 per il tuo sistema operativo. Il comando pip verrà installato automaticamente.

Il demo che ho testato su OS Windows, Docker Desktop e versione Python 3.10.11 installata.

Passo 1: Configurazione del cluster dimostrativo di RisingWave

In primo luogo, clona il repository campione RisingWave nel tuo ambiente locale.

git clone <https://github.com/risingwavelabs/risingwave.git>

Quindi, accedi alla directory integration_tests/delivery e avvia il cluster dimostrativo dal file docker compose.

cd risingwave/integration_tests/delivery
docker compose up -d

Assicurati che tutti i container siano in funzione!

Passo 2: Installare le librerie Dash e Psycopg2

Per installare Dash, puoi anche fare riferimento alla guida Installazione di Dash sul sito web. Fondamentalmente, dobbiamo installare due librerie (Dash stesso e Pandas) eseguendo il comando pip install seguente:

# This also brings along the Plotly graphing library.
# Plotly is known for its interactive charts
# Plotly Express requires Pandas to be installed too.
pip install dash pandas

Dovremmo anche installare psycopg2 per interagire con il database di streaming RisingWave:

pip install psycopg2-binary

Passo 3: Creare una Fonte Dati

Per inserire dati in tempo reale con RisingWave, devi prima impostare una fonte dati. Nel progetto dimostrativo, Kafka dovrebbe essere definito come fonte dati. Stiamo per creare un nuovo file chiamato create-a-source.py, nella stessa directory integration_tests/delivery con lo script Python dove ci connettiamo a RisingWave e creiamo una tabella per consumare e memorizzare i temi Kafka delivery_orders. Puoi semplicemente copiare e incollare il codice seguente nel nuovo file.

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566") # Connect to RisingWave.

conn.autocommit = True # Set queries to be automatically committed.

with conn.cursor() as cur:
    cur.execute("""
CREATE TABLE delivery_orders_source (
    order_id BIGINT,
    restaurant_id BIGINT,
    order_state VARCHAR,
    order_timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'delivery_orders',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;""") # Execute the query.

conn.close() # Close the connection.

Dopo aver creato il file, esegui python create-a-source.py e creerà la tabella sorgente in RisingWave.

Passo 4: Creare una Vista Materializzata

Successivo, creiamo una nuova vista materializzata simile a come abbiamo creato la tabella. Creiamo un nuovo file chiamato create-a-materialized-view.py e eseguiamo una query SQL utilizzando la libreria psycopg2. È anche possibile unire gli ultimi due passaggi descritti sopra in un unico file di script Python.

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""CREATE MATERIALIZED VIEW restaurant_orders_view AS
SELECT
    window_start,
    restaurant_id,
    COUNT(*) AS total_order
FROM
    HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
        order_state = 'CREATED'
GROUP BY
    restaurant_id,
    window_start;""")

conn.close()

Sopra, la query SQL calcola il numero totale di ordini creati da un ristorante specifico negli ultimi 15 minuti in tempo reale e memorizza il risultato nella vista materializzata. Se si verificano modifiche ai dati o arrivano nuovi topic Kafka, RisingWave incrementa automaticamente e aggiorna il risultato della vista materializzata. Una volta configurato la sorgente dati, la vista materializzata, è possibile iniziare a inserire dati e visualizzare questi dati utilizzando Dash.

Passo 5: Creazione di un’App Dash

Ora costruiamo la nostra app Dash per interrogare e visualizzare il contenuto della vista materializzata che abbiamo in RisingWave. È possibile seguire la guida Dash in 20 minuti per comprendere i blocchi costitutivi di base di Dash. Il nostro esempio di codice dell’applicazione mostra i dati degli ordini dei ristoranti in formato tabella e grafico. Vedi il codice Python seguente in dash-example.py:

import psycopg2
import pandas as pd
import dash
from dash import dash_table
from dash import dcc
import dash_html_components as html
import plotly.express as px

# Connect to the PostgreSQL database
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")

# Retrieve data from the materialized view using pandas
df = pd.read_sql_query("SELECT window_start, restaurant_id, total_order FROM restaurant_orders_view;", conn)

# Create a Dash application
app = dash.Dash(__name__)

# Define layout
app.layout = html.Div(children=[
    html.H1("Restaurant Orders Table"),
    dash_table.DataTable(id="restaurant_orders_table", columns=[{"name": i, "id": i} for i in df.columns], data=df.to_dict("records"), page_size=10),
    html.H1("Restaurant Orders Graph"),
    dcc.Graph(id="restaurant_orders_graph", figure=px.bar(df, x="window_start", y="total_order", color="restaurant_id", barmode="group"))
])

# Run the application
if __name__ == '__main__':
    app.run_server(debug=True)

Questo frammento di codice recupera i dati dalla vista materializzata restaurant_orders_view utilizzando pandas e li visualizza in una tabella Dash tramite dash_table.DataTable e un grafico a barre tramite dcc.Graph. La tabella e il grafico a barre hanno colonne che corrispondono alle colonne della vista materializzata (‘window_start’, ‘total_order’, e ‘restaurant_id’) e righe che corrispondono ai dati della vista materializzata.

Passo 6: Visualizza i Risultati

Puoi eseguire l’applicazione eseguendo lo script dash-example.py e navigando a http://localhost:8050/ nel tuo browser web (riceverai un messaggio nel terminale che ti suggerirà di andare a questo link).

Riepilogo

In generale, Dash è uno strumento potente per creare visualizzazioni analitiche dei dati che richiedono interfacce utente complesse e capacità di visualizzazione dei dati, utilizzando tutto ciò con la semplicità e l’eleganza del linguaggio di programmazione Python. Quando lo usiamo insieme a RisingWave streaming database, acquisiamo informazioni sui dati in tempo reale che possono aiutarci a prendere decisioni più informate e ad agire per ottimizzare le prestazioni.

Risorse Correlate

Source:
https://dzone.com/articles/visualize-real-time-data-with-python-dash-and-risi