リアルタイムデータは、企業が迅速な意思決定を行うために重要です。このデータを視覚的に確認することで、さらに迅速な意思決定が可能になります。様々なデータアプリやダッシュボードを使用して、データの視覚表現を作成できます。Dashは、インタラクティブなチャート、グラフ、テーブル、その他のUI要素を作成するための幅広い組み込みコンポーネントを提供するオープンソースのPythonライブラリです。RisingWaveは、リアルタイムデータ処理のためのSQLベースのストリーミングデータベースです。この記事では、Python、Dash、RisingWaveを使用してリアルタイムデータの可視化を行う方法について説明します。
リアルタイムデータを可視化する方法
リアルタイムデータは、異なるデータソースから収集されたデータをすぐに生成および処理するデータであることを知っています。ソースはPostgresやMySQLのような典型的なデータベースや、Kafkaのようなメッセージブローカーである可能性があります。リアルタイムデータの可視化は、いくつかのステップで構成されています:まず、取り込み、次に処理、そして最後にダッシュボードでこのデータを表示します。
注文配送データの場合、このデータをリアルタイムで可視化することで、レストランや配送サービスのパフォーマンスに関する貴重な洞察を得ることができます。例えば、リアルタイムデータを使用して、注文がどれだけ早く配送されるかを監視し、配送プロセスのボトルネックを特定し、注文量の時間的な変化を追跡することができます。データが絶えず変化している場合、何が起こっているかをすべて追跡し、パターンやトレンドを特定することが難しくなることがあります。DashやRisingWaveなどのフリーツールを使用すると、この絶え間ない変化するデータを探索し分析できるインタラクティブな可視化を作成できます。
データを扱う際に最初に思い浮かべるプログラミング言語がPythonかもしれませんが、ライブラリの幅広いものがあります。Dashはその一つであり、Pythonコードのみを使用して、豊富でカスタマイズ可能なユーザーインターフェースを持つデータアプリを作成できます。Dashは、Flask、Plotly.js、React.jsといった人気のあるウェブ開発ツールの上に構築されているため、HTML、CSS、その他のJavaScriptフレームワークを知る必要はありません。
RisingWaveを使用することで、さまざまなソースからのデータストリームを消費し、複雑なクエリに最適化されたマテリアライズドビューを作成し、SQLを使用してリアルタイムデータをクエリすることができます。RisingWaveはPostgreSQLとワイヤ互換性があるため、psycopg2
(PythonのPostgreSQLクライアントライブラリ)ドライバを使用してRisingWaveに接続し、クエリ操作を行うことができます。次のセクションで見てみましょう。

注文配送データデモの視覚化
デモチュートリアルでは、以下のGitHubリポジトリを利用してRisingWaveデモを活用します。ここでは、Docker Composeを使用して必要な設定がすべて完了していると仮定します。RisingWaveを実行する他の方法については、公式ウェブサイトを確認できます。Kafkaトピックとしてdelivery_orders
があり、これには食品配送ウェブサイトで発生した各注文のイベントが含まれています。各イベントには、注文ID
、レストランID
、配送状態
などの注文に関する情報が含まれています。ワークロードジェネレータ(Datagenと呼ばれるPythonスクリプト)は、ランダムなモックデータの生成をシミュレートし、それらをKafkaトピックに連続的にストリーミングします。実際には、このモックデータは、ウェブアプリやバックエンドサービスから来るデータに置き換えることができます。
始める前に
このチュートリアルを完了するためには、以下が必要です。
- 環境にDockerとDocker Composeがインストールされていることを確認してください。
- PostgreSQLの対話型ターミナルであるpsqlが環境にインストールされていることを確認してください。詳細な手順については、PostgreSQLのダウンロードを参照してください。
- お使いのOS用にPython 3をダウンロードしてインストールしてください。
pip
コマンドは自動的にインストールされます。
Windows OS、Docker Desktop、およびPython 3.10.11バージョンがインストールされた環境でテストしたデモです。
ステップ1: RisingWaveのデモクラスタの設定
まず、RisingWaveのサンプルリポジトリをローカル環境にクローンしてください。
git clone <https://github.com/risingwavelabs/risingwave.git>
次に、integration_tests/delivery
ディレクトリに移動し、docker composeファイルからデモクラスタを起動してください。
cd risingwave/integration_tests/delivery
docker compose up -d
すべてのコンテナが起動していることを確認してください!
ステップ2: DashとPsycopg2ライブラリのインストール
ダッシュのインストールについては、ウェブサイト上のダッシュのインストールガイドも参照ください。基本的には、以下のpip install
コマンドを実行して、2つのライブラリ(ダッシュ自体とパンダ)をインストールする必要があります:
# This also brings along the Plotly graphing library.
# Plotly is known for its interactive charts
# Plotly Express requires Pandas to be installed too.
pip install dash pandas
また、リジングウェーブストリーミングデータベースと対話するために、psycopg2
もインストールするべきです:
pip install psycopg2-binary
ステップ3: データソースの作成
リジングウェーブでリアルタイムデータを取り込むためには、最初にデータソースを設定する必要があります。デモプロジェクトでは、Kafkaをデータソースとして定義するべきです。新しいファイルcreate-a-source.py
を作成し、リジングウェーブに接続して、delivery_orders
Kafkaトピックを消費し永続化するテーブルを作成するPythonスクリプトがあるintegration_tests/delivery
ディレクトリと同じ場所に配置します。以下のコードを新しいファイルにコピー&ペーストするだけです。
import psycopg2
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566") # Connect to RisingWave.
conn.autocommit = True # Set queries to be automatically committed.
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE delivery_orders_source (
order_id BIGINT,
restaurant_id BIGINT,
order_state VARCHAR,
order_timestamp TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'delivery_orders',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) ROW FORMAT JSON;""") # Execute the query.
conn.close() # Close the connection.
ファイルを作成した後、python create-a-source.py
を実行すると、リジングウェーブにソーステーブルが作成されます。
ステップ4: マテリアライズドビューの作成
次に、テーブルを作成したのと同様に新しいマテリアライズドビューを作成します。新しいファイルをcreate-a-materialized-view.py
として作成し、psycopg2
ライブラリを使用してSQLクエリを実行します。上記の最後の2つのステップを1つのPythonスクリプトファイルに統合することも可能です。
import psycopg2
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("""CREATE MATERIALIZED VIEW restaurant_orders_view AS
SELECT
window_start,
restaurant_id,
COUNT(*) AS total_order
FROM
HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
order_state = 'CREATED'
GROUP BY
restaurant_id,
window_start;""")
conn.close()
上記のSQLクエリは、特定のレストランから過去15分間に作成された注文の総数をリアルタイムで計算し、マテリアライズドビューに結果をキャッシュします。データが変更されたり、新しいKafkaトピックが到着した場合、RisingWaveは自動的にマテリアライズドビューの結果を増分して更新します。データソース、マテリアライズドビューを設定したら、データの取り込みを開始し、Dashを使用してこのデータを可視化できます。
ステップ5: Dashアプリの構築
次に、RisingWaveにあるマテリアライズドビューの内容を照会して可視化するためにDashアプリを構築します。Dashの基本構成要素を理解するために、チュートリアルDash in 20 minsに従ってください。私たちの例のアプリケーションコードは、テーブル形式とグラフ形式の両方でレストランの注文データを表示します。以下のPythonコードをdash-example.py
で参照してください。
import psycopg2
import pandas as pd
import dash
from dash import dash_table
from dash import dcc
import dash_html_components as html
import plotly.express as px
# Connect to the PostgreSQL database
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
# Retrieve data from the materialized view using pandas
df = pd.read_sql_query("SELECT window_start, restaurant_id, total_order FROM restaurant_orders_view;", conn)
# Create a Dash application
app = dash.Dash(__name__)
# Define layout
app.layout = html.Div(children=[
html.H1("Restaurant Orders Table"),
dash_table.DataTable(id="restaurant_orders_table", columns=[{"name": i, "id": i} for i in df.columns], data=df.to_dict("records"), page_size=10),
html.H1("Restaurant Orders Graph"),
dcc.Graph(id="restaurant_orders_graph", figure=px.bar(df, x="window_start", y="total_order", color="restaurant_id", barmode="group"))
])
# Run the application
if __name__ == '__main__':
app.run_server(debug=True)
このコードスニペットは、pandasを使用してrestaurant_orders_view
マテリアライズドビューからデータを取得し、dash_table.DataTable
を使用してDashテーブルに表示し、dcc.Graph
を使用して棒グラフを表示します。テーブルと棒グラフには、マテリアライズドビューの列(’window_start’、’total_order’、および’restaurant_id’)に対応する列があり、マテリアライズドビューのデータに対応する行があります。
ステップ6: 結果の確認
上記のdash-example.py
スクリプトを実行してアプリケーションを起動し、ウェブブラウザでhttp://localhost:8050/
にアクセスすることで結果を確認できます(ターミナルにこのリンクにアクセスするように指示されます)。
まとめ
全体として、Dashは複雑なUIとデータ可視化機能を必要とするデータ分析ビューを作成するための強力なツールであり、Pythonプログラミング言語のシンプルさとエレガンスを活かしています。RisingWaveストリーミングデータベースと共に使用することで、リアルタイムデータの洞察を得ることができ、より情報に基づいた意思決定やパフォーマンス最適化のための行動を取ることができます。
関連リソース
Source:
https://dzone.com/articles/visualize-real-time-data-with-python-dash-and-risi