实时数据对企业快速决策至关重要。通过视觉呈现这些数据,决策过程可以更加迅速。我们可以利用各种数据应用或仪表盘创建数据的视觉表示。Dash是一个开源的Python库,提供了广泛的预置组件,用于创建交互式图表、图形、表格及其他用户界面元素。RisingWave是一种基于SQL的流式数据库,专为实时数据处理设计。本文将阐述如何运用Python、Dash和RisingWave来实现实时数据的视觉化展示。
如何实时可视化数据
我们了解到,实时数据是指从不同数据源收集到后立即生成并处理的数据。这些数据源可以是如Postgres或MySQL这样的典型数据库,或是像Kafka这样的消息代理。实时数据可视化包含几个步骤:首先,我们采集数据,接着处理,最后在仪表盘上展示这些数据。
在订单配送数据方面,实时可视化这些数据能为餐厅或配送服务的性能提供宝贵洞察。例如,我们可以利用实时数据监控订单配送所需时间,识别配送流程中的瓶颈,并追踪订单量的时间变化。处理不断变化的数据时,跟踪所有动态并识别模式或趋势可能颇具挑战。借助Dash和RisingWave等免费工具,我们能创建交互式可视化,以便探索和分析这种持续变化的数据。
谈到数据处理,首先想到的编程语言可能是Python,因为它拥有丰富的库。Dash便是其中之一,它让我们仅用Python代码就能构建出丰富且可定制的用户界面数据应用。Dash基于Flask、Plotly.js和React.js这些流行的Web开发工具构建,因此无需掌握HTML、CSS或其他JavaScript框架知识。
通过RisingWave,我们可以从多种来源获取数据流,创建针对复杂查询优化的物化视图,并使用SQL查询实时数据。由于RisingWave与PostgreSQL兼容,我们可以使用psycopg2
(Python中的PostgreSQL客户端库)驱动程序连接到RisingWave并执行查询操作。详情见下一部分。

可视化订单配送数据演示
在本演示教程中,我们将利用以下GitHub仓库中的RisingWave示例,假设所有必要设置已通过Docker Compose完成。您可以在官方网站上查看其他运行RisingWave的方法。我们有一个名为delivery_orders
的Kafka主题,其中包含食品配送网站上每笔订单的事件。每个事件都包含有关订单的信息,如订单ID
、餐厅ID
和配送状态
。工作负载生成器(名为Datagen的Python脚本)模拟连续生成随机模拟数据,并将它们流式传输到Kafka主题中。实际上,这些模拟数据可以替换为您网络应用或后端服务传来的数据。
开始之前
要完成本教程,您需要以下条件:
- 确保您的环境中已安装Docker和Docker Compose。
- 确保您的环境中已安装PostgreSQL交互式终端psql。详细安装指南请参阅下载PostgreSQL。
- 下载并安装适用于您操作系统的Python 3。
pip
命令将自动随之安装。
我所测试的演示是在Windows操作系统上进行的,安装了Docker桌面和Python 3.10.11版本。
第一步:搭建RisingWave演示集群
首先,将RisingWave示例仓库克隆到本地环境。
git clone <https://github.com/risingwavelabs/risingwave.git>
接着,进入integration_tests/delivery
目录,并根据docker compose文件启动演示集群。
cd risingwave/integration_tests/delivery
docker compose up -d
确保所有容器均已成功运行!
第二步:安装Dash和Psycopg2库
要安装Dash,您还可以参考网站上的Dash安装指南。基本上,我们需要通过运行以下pip install
命令来安装两个库(Dash本身和Pandas):
# This also brings along the Plotly graphing library.
# Plotly is known for its interactive charts
# Plotly Express requires Pandas to be installed too.
pip install dash pandas
我们还应该安装psycopg2
以与RisingWave流数据库进行交互:
pip install psycopg2-binary
步骤3:创建数据源
要使用RisingWave摄取实时数据,首先需要设置一个数据源。在演示项目中,应将Kafka定义为数据源。我们将创建一个名为create-a-source.py
的新文件,与我们在integration_tests/delivery
目录中连接到RisingWave的Python脚本相同,并创建一个表以消费和持久化delivery_orders
Kafka主题。您只需将以下代码复制并粘贴到新文件中即可。
import psycopg2
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566") # Connect to RisingWave.
conn.autocommit = True # Set queries to be automatically committed.
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE delivery_orders_source (
order_id BIGINT,
restaurant_id BIGINT,
order_state VARCHAR,
order_timestamp TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'delivery_orders',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) ROW FORMAT JSON;""") # Execute the query.
conn.close() # Close the connection.
创建文件后,运行python create-a-source.py
,它将在RisingWave中创建源表。
步骤4:创建物化视图
接下来,我们创建一个类似于创建表的物化视图。我们创建一个名为create-a-materialized-view.py
的新文件,并使用psycopg2
库运行SQL查询。也可以将上述最后两个步骤合并为一个Python脚本文件。
import psycopg2
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("""CREATE MATERIALIZED VIEW restaurant_orders_view AS
SELECT
window_start,
restaurant_id,
COUNT(*) AS total_order
FROM
HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
order_state = 'CREATED'
GROUP BY
restaurant_id,
window_start;""")
conn.close()
上述SQL查询实时计算过去15分钟内从特定餐厅创建的订单总数,并将结果缓存在物化视图中。如果有任何数据变更或新的Kafka主题到达,RisingWave会自动递增并更新物化视图的结果。一旦设置好数据源和物化视图,就可以开始摄取数据,并使用Dash可视化这些数据。
步骤5:构建Dash应用
现在我们构建Dash应用,查询并可视化RisingWave中物化视图的内容。您可以参考20分钟学会Dash教程,了解Dash的基本构建块。我们的示例应用代码以表格和图形形式展示餐厅订单数据。请参见以下dash-example.py
中的Python代码:
import psycopg2
import pandas as pd
import dash
from dash import dash_table
from dash import dcc
import dash_html_components as html
import plotly.express as px
# Connect to the PostgreSQL database
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
# Retrieve data from the materialized view using pandas
df = pd.read_sql_query("SELECT window_start, restaurant_id, total_order FROM restaurant_orders_view;", conn)
# Create a Dash application
app = dash.Dash(__name__)
# Define layout
app.layout = html.Div(children=[
html.H1("Restaurant Orders Table"),
dash_table.DataTable(id="restaurant_orders_table", columns=[{"name": i, "id": i} for i in df.columns], data=df.to_dict("records"), page_size=10),
html.H1("Restaurant Orders Graph"),
dcc.Graph(id="restaurant_orders_graph", figure=px.bar(df, x="window_start", y="total_order", color="restaurant_id", barmode="group"))
])
# Run the application
if __name__ == '__main__':
app.run_server(debug=True)
此代码片段利用pandas从restaurant_orders_view
物化视图中提取数据,并通过dash_table.DataTable
在Dash表格中展示,同时使用dcc.Graph
呈现条形图。表格与条形图的列对应于物化视图中的列(’window_start’, ‘total_order’, 和 ‘restaurant_id’),而行则对应视图中的数据。
步骤6:查看结果
通过运行上述dash-example.py
脚本,您可以启动应用程序,并在网络浏览器中访问http://localhost:8050/
(终端会提示您前往此链接)。
总结
总体而言,Dash是一款强大的工具,适用于创建需要复杂用户界面和数据可视化能力的数据分析视图,这一切都得益于Python编程语言的简洁与优雅。当我们将其与RisingWave流数据库结合使用时,能够洞察实时数据,从而帮助我们做出更明智的决策并采取行动以优化性能。
相关资源
Source:
https://dzone.com/articles/visualize-real-time-data-with-python-dash-and-risi