Визуализируйте данные в реальном времени с помощью Python, Dash и RisingWave

Реальные данные важны для бизнеса, чтобы принимать быстрые решения. Визуальное представление этих данных может помочь принимать решения еще быстрее. Мы можем создавать визуальные представления данных с использованием различных приложений или панелей мониторинга данных. Dash – это открытый исходный код библиотеки Python, которая предоставляет широкий спектр встроенных компонентов для создания интерактивных графиков, диаграмм, таблиц и других элементов пользовательского интерфейса. RisingWave – это основанная на SQL потоковая база данных для обработки реальных данных. В этой статье будет объяснено, как использовать Python, Dash и RisingWave для создания визуализаций реальных данных.

Как визуализировать данные в реальном времени

Мы знаем, что реальные данные – это данные, которые генерируются и обрабатываются немедленно, так как они собираются из различных источников данных. Источниками могут быть типичные базы данных, такие как Postgres или MySQL, и брокеры сообщений, такие как Kafka. Визуализация реальных данных состоит из нескольких шагов: сначала мы подаем, затем обрабатываем, и, наконец, показываем эти данные на панели мониторинга.

В случае с данными доставки заказов, визуализация этих данных в реальном времени может предоставить ценные сведения о производительности ресторана или службы доставки. Например, мы можем использовать реальные данные для мониторинга того, как долго занимает доставка заказов, выявления узких мест в процессе доставки и отслеживания изменений объема заказов со временем. При работе с постоянно меняющимися данными может быть сложно отслеживать все, что происходит, и выявлять закономерности или тенденции. Использование бесплатных инструментов, таких как Dash и RisingWave, позволяет создавать интерактивные визуализации, которые позволяют нам исследовать и анализировать постоянно меняющиеся данные.

Когда дело доходит до работы с данными, первым языком программирования, который может прийти на ум, является Python, поскольку он обладает широким спектром библиотек. Dash – одна из них, которая позволяет нам создавать приложения для работы с данными с богатым и настраиваемым пользовательским интерфейсом, используя только код Python. Dash построен на основе Flask, Plotly.js и React.js, которые являются популярными инструментами веб-разработки, поэтому вам не нужно знать HTML, CSS или другие фреймворки JavaScript.

С помощью RisingWave мы можем потреблять потоки данных из различных источников, создавать материализованные представления, оптимизированные для сложных запросов, и запрашивать реальные данные с помощью SQL. Поскольку RisingWave совместим по шине с PostgreSQL, мы можем использовать драйвер psycopg2 (библиотека клиента PostgreSQL на Python) для подключения к RisingWave и выполнения операций запроса. См. в следующем разделе.

Визуализация данных доставки заказов – демо

В учебнике-демо мы воспользуемся следующим репозиторием на GitHub с демонстрациями RisingWave, где мы предполагаем, что все необходимые настройки выполнены с помощью Docker Compose. Вы можете ознакомиться с другими способами запуска RisingWave на официальном сайте. У нас есть топик Kafka с именем delivery_orders, который содержит события для каждого размещенного заказа на сайте доставки еды. Каждое событие включает информацию о заказе, такую как ID заказа, ID ресторана и статус доставки. Генератор нагрузки (скрипт на Python под названием Datagen) имитирует непрерывное создание случайных фейковых данных и передачу их в топики Kafka. На практике эти фейковые данные можно заменить данными, поступающими от вашего веб-приложения или сервиса бэкенда.

До начала работы

Для завершения этого урока вам потребуется:

  • Убедитесь, что у вас установлены Docker и Docker Compose в вашей среде.
  • Убедитесь, что интерактивная терминал PostgreSQL psql установлен в вашей среде. Подробные инструкции см. в Download PostgreSQL.
  • Скачайте и установите Python 3 для вашей ОС. Команда pip будет установлена автоматически.

Демонстрация, которую я тестировал на ОС Windows, Docker Desktop, и Python 3.10.11 версия установлена.

Шаг 1: Настройка демонстрационного кластера RisingWave

Сначала клонируйте примерный репозиторий RisingWave в вашу локальную среду.

git clone <https://github.com/risingwavelabs/risingwave.git>

Затем перейдите в каталог integration_tests/delivery и запустите демонстрационный кластер из docker compose файла.

cd risingwave/integration_tests/delivery
docker compose up -d

Убедитесь, что все контейнеры работают!

Шаг 2: Установите библиотеки Dash и Psycopg2

Для установки Dash можно также обратиться к руководству по установке Dash на сайте. В основном, нам нужно установить две библиотеки (саму Dash и Pandas) с помощью следующей команды pip install:

# This also brings along the Plotly graphing library.
# Plotly is known for its interactive charts
# Plotly Express requires Pandas to be installed too.
pip install dash pandas

Также следует установить psycopg2 для взаимодействия с потоковым базой данных RisingWave:

pip install psycopg2-binary

Шаг 3: Создание источника данных

Для загрузки реальных данных с помощью RisingWave сначала нужно настроить источник данных. В демонстрационном проекте Kafka должен быть определен как источник данных. Мы собираемся создать новый файл с названием create-a-source.py, в том же каталоге integration_tests/delivery с Python-скриптом, где мы подключаемся к RisingWave и создаем таблицу для приема и сохранения тем Kafka delivery_orders. Вы можете просто скопировать и вставить нижеприведенный код в новый файл.

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566") # Connect to RisingWave.

conn.autocommit = True # Set queries to be automatically committed.

with conn.cursor() as cur:
    cur.execute("""
CREATE TABLE delivery_orders_source (
    order_id BIGINT,
    restaurant_id BIGINT,
    order_state VARCHAR,
    order_timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'delivery_orders',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;""") # Execute the query.

conn.close() # Close the connection.

После создания файла выполните python create-a-source.py, и он создаст таблицу источника в RisingWave.

Шаг 4: Создание материализованного представления

Далее мы создаем новую материализованное представление подобно тому, как мы создавали таблицу. Создаем новый файл с названием create-a-materialized-view.py и выполняем SQL-запрос с использованием библиотеки psycopg2. Также возможно объединить вышеупомянутые последние два шага в один файл скрипта Python.

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""CREATE MATERIALIZED VIEW restaurant_orders_view AS
SELECT
    window_start,
    restaurant_id,
    COUNT(*) AS total_order
FROM
    HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
        order_state = 'CREATED'
GROUP BY
    restaurant_id,
    window_start;""")

conn.close()

Выше SQL-запрос вычисляет количество созданных заказов от конкретного ресторана за последние 15 минут в режиме реального времени и кэширует результат в материализованном представлении. Если происходит изменение данных или приходят новые топики Kafka, RisingWave автоматически увеличивает и обновляет результат материализованного представления. После того как вы настроили источник данных, материализованное представление, вы можете начать впитывать данные и визуализировать их с помощью Dash.

Шаг 5: Создание приложения Dash

Теперь мы создаем наше приложение Dash для запроса и визуализации содержимого материализованного представления, которое у нас есть в RisingWave. Вы можете следовать туториалу Dash за 20 минут для понимания основных строительных блоков Dash. Наше демонстрационное приложение отображает данные о заказах ресторанов как в табличной, так и в графической формах. Смотрите ниже Python код в файле dash-example.py:

import psycopg2
import pandas as pd
import dash
from dash import dash_table
from dash import dcc
import dash_html_components as html
import plotly.express as px

# Connect to the PostgreSQL database
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")

# Retrieve data from the materialized view using pandas
df = pd.read_sql_query("SELECT window_start, restaurant_id, total_order FROM restaurant_orders_view;", conn)

# Create a Dash application
app = dash.Dash(__name__)

# Define layout
app.layout = html.Div(children=[
    html.H1("Restaurant Orders Table"),
    dash_table.DataTable(id="restaurant_orders_table", columns=[{"name": i, "id": i} for i in df.columns], data=df.to_dict("records"), page_size=10),
    html.H1("Restaurant Orders Graph"),
    dcc.Graph(id="restaurant_orders_graph", figure=px.bar(df, x="window_start", y="total_order", color="restaurant_id", barmode="group"))
])

# Run the application
if __name__ == '__main__':
    app.run_server(debug=True)

Данный фрагмент кода извлекает данные из материализованного представления `restaurant_orders_view` с использованием pandas и отображает их в таблице Dash с помощью `dash_table.DataTable` и столбчатой диаграммы с использованием `dcc.Graph`. Таблица и столбчатая диаграмма имеют столбцы, соответствующие столбцам в материализованном представлении (‘window_start’, ‘total_order’, и ‘restaurant_id’) и строки, соответствующие данным в материализованном представлении.

Шаг 6: Просмотр результатов

Вы можете запустить приложение, выполнив скрипт `dash-example.py` выше, и перейдя по адресу `http://localhost:8050/` в вашем веб-браузере (в терминале вы увидите сообщение, указывающее на переход по этой ссылке).

Итог

В целом, Dash – это мощный инструмент для создания аналитических представлений данных, требующих сложных пользовательских интерфейсов и возможностей визуализации данных, все с использованием простоты и элегантности языка программирования Python. Когда мы используем его вместе с потоковой базой данных RisingWave, мы получаем представление о реальных данных, которое может помочь нам принимать более обоснованные решения и предпринимать действия для оптимизации производительности.

Связанные ресурсы

Source:
https://dzone.com/articles/visualize-real-time-data-with-python-dash-and-risi