即時數據對企業快速決策至關重要。透過視覺化呈現這些數據,能夠加速決策過程。我們可以利用各種數據應用程式或儀表板來創建數據的視覺化表示。Dash是一個開源的Python庫,提供了豐富的內建元件,用於創建互動式圖表、圖形、表格及其他UI元素。RisingWave是一個基於SQL的串流資料庫,專門用於即時數據處理。本文將說明如何使用Python、Dash和RisingWave來進行即時數據的視覺化。
如何即時視覺化數據
我們知道即時數據是指從不同數據源收集後立即產生並處理的數據。這些數據源可以是典型的資料庫,如Postgres或MySQL,以及消息代理如Kafka。即時數據視覺化包含幾個步驟:首先,我們收集,接著處理,最後在儀表板上展示這些數據。
在訂單配送數據的情況下,即時視覺化這些數據能夠為餐廳或配送服務的績效提供寶貴的洞察。例如,我們可以利用即時數據監控訂單的配送時間,識別配送過程中的瓶頸,以及追蹤訂單量的時間變化。面對持續變化的數據,要跟蹤所有發生的事情並識別模式或趨勢可能很困難。使用如Dash和RisingWave這樣的免費工具,我們可以創建互動式視覺化,使我們能夠探索和分析這種不斷變化的數據。
談到數據工作,首先想到的程式語言可能是Python,因為它擁有多種庫。Dash就是其中之一,它允許我們僅用Python代碼創建具有豐富且可定制用戶界面的數據應用。Dash建立在Flask、Plotly.js和React.js之上,這些都是流行的網頁開發工具,因此您無需了解HTML、CSS或其他JavaScript框架。
透過RisingWave,我們可以從多種來源消費數據流,創建針對複雜查詢優化的物化視圖,並使用SQL查詢即時數據。由於RisingWave與PostgreSQL線上兼容,我們可以使用psycopg2
(Python中的PostgreSQL客戶端庫)驅動程序連接到RisingWave並進行查詢操作。詳見下一部分。

視覺化訂單配送數據示範
在本示範教程中,我們將利用以下GitHub 儲存庫中的 RisingWave 示範,假設所有必要設置已使用 Docker Compose 完成。您可以在官方網站上查看其他運行 RisingWave的方法。我們有一個名為delivery_orders
的 Kafka 主題,其中包含食品配送網站上每筆訂單的事件。每個事件包括訂單信息,如訂單 ID
、餐廳 ID
和配送狀態
。工作負載生成器(名為 Datagen 的 Python 腳本)模擬連續生成隨機模擬數據,並將其流式傳輸到 Kafka 主題。在實際應用中,此模擬數據可以替換為來自您的 Web 應用或後端服務的數據。
開始之前
要完成本教程,您需要以下內容:
- 確保您的環境中已安裝Docker和Docker Compose。
- 確保您的環境中已安裝PostgreSQL的互動式終端機psql。詳細安裝指南請參閱下載PostgreSQL。
- 根據您的操作系統下載並安裝Python 3,
pip
命令將會自動安裝。
我測試的示範是在Windows操作系統上,安裝了Docker桌面版及Python 3.10.11版本。
第一步:設置RisingWave示範集群
首先,將RisingWave示範倉庫克隆到您的本地環境。
git clone <https://github.com/risingwavelabs/risingwave.git>
接著,進入integration_tests/delivery
目錄,並從docker compose文件啟動示範集群。
cd risingwave/integration_tests/delivery
docker compose up -d
確保所有容器均已啟動並運行!
第二步:安裝Dash和Psycopg2庫
要安裝Dash,您也可以參考網站上的Dash安裝指南。基本上,我們需要通過運行以下pip install
命令來安裝兩個庫(Dash本身和Pandas):
# This also brings along the Plotly graphing library.
# Plotly is known for its interactive charts
# Plotly Express requires Pandas to be installed too.
pip install dash pandas
我們還應該安裝psycopg2
以與RisingWave流數據庫進行交互:
pip install psycopg2-binary
步驟3:創建數據源
要使用RisingWave攝取實時數據,您首先需要設置一個數據源。在演示項目中,應將Kafka定義為數據源。我們將創建一個名為create-a-source.py
的新文件,該文件位於與Python腳本相同的integration_tests/delivery
目錄中,我們在其中連接到RisingWave,並創建一個表以消費和持久化delivery_orders
Kafka主題。您只需將以下代碼複製並粘貼到新文件中即可。
import psycopg2
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566") # Connect to RisingWave.
conn.autocommit = True # Set queries to be automatically committed.
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE delivery_orders_source (
order_id BIGINT,
restaurant_id BIGINT,
order_state VARCHAR,
order_timestamp TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'delivery_orders',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) ROW FORMAT JSON;""") # Execute the query.
conn.close() # Close the connection.
創建文件後,運行python create-a-source.py
,它將在RisingWave中創建源表。
步驟4:創建物化視圖
接著,我們如同建立表格般創建一個新的具化視圖。我們建立一個名為create-a-materialized-view.py
的新檔案,並使用psycopg2
庫執行SQL查詢。上述最後兩個步驟亦可合併為一個Python腳本檔案。
import psycopg2
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("""CREATE MATERIALIZED VIEW restaurant_orders_view AS
SELECT
window_start,
restaurant_id,
COUNT(*) AS total_order
FROM
HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
order_state = 'CREATED'
GROUP BY
restaurant_id,
window_start;""")
conn.close()
上述SQL查詢實時計算特定餐廳在過去15分鐘內創建的訂單總數,並將結果緩存於具化視圖中。若數據發生變動或新的Kafka主題到達,RisingWave將自動增量更新具化視圖的結果。一旦設置好數據源和具化視圖,即可開始數據攝入並使用Dash進行可視化。
第五步:構建Dash應用
現在我們來構建Dash應用,以查詢並可視化我們在RisingWave中的具化視圖內容。您可以參考20分鐘學Dash教程了解Dash的基本構建塊。我們的示例應用程式代碼以表格和圖表形式展示餐廳訂單數據。請參閱以下dash-example.py
中的Python代碼:
import psycopg2
import pandas as pd
import dash
from dash import dash_table
from dash import dcc
import dash_html_components as html
import plotly.express as px
# Connect to the PostgreSQL database
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
# Retrieve data from the materialized view using pandas
df = pd.read_sql_query("SELECT window_start, restaurant_id, total_order FROM restaurant_orders_view;", conn)
# Create a Dash application
app = dash.Dash(__name__)
# Define layout
app.layout = html.Div(children=[
html.H1("Restaurant Orders Table"),
dash_table.DataTable(id="restaurant_orders_table", columns=[{"name": i, "id": i} for i in df.columns], data=df.to_dict("records"), page_size=10),
html.H1("Restaurant Orders Graph"),
dcc.Graph(id="restaurant_orders_graph", figure=px.bar(df, x="window_start", y="total_order", color="restaurant_id", barmode="group"))
])
# Run the application
if __name__ == '__main__':
app.run_server(debug=True)
此段程式碼片段從restaurant_orders_view
實體化檢視中提取數據,使用pandas,並透過dash_table.DataTable
在Dash表格中展示,以及使用dcc.Graph
繪製條形圖。表格與條形圖的欄位對應於實體化檢視中的欄位(’window_start’, ‘total_order’, 和 ‘restaurant_id’),而行則對應於實體化檢視中的數據。
步驟6:查看結果
您可以透過運行上述dash-example.py
腳本來啟動應用程序,並在您的網頁瀏覽器中導航至http://localhost:8050/
(終端機將顯示一條消息,指示您前往此連結)。
總結
總體而言,Dash是一個強大的工具,用於創建需要複雜UI和數據可視化能力的数据分析视图,全部使用Python編程語言的簡單性和優雅性。當我們將其與RisingWave流式數據庫結合使用時,我們可以獲得對實時數據的洞察,這有助於我們做出更明智的決策並採取行動以優化性能。
相關資源
Source:
https://dzone.com/articles/visualize-real-time-data-with-python-dash-and-risi