多年來,JDBC和ODBC一直是數據庫互動的常用標準。如今,隨著我們凝視數據領域的廣闊天地,數據科學和數據湖分析的興起帶來了越來越大的數據集。相應地,我們需要越來越快的數據讀取和傳輸速度,因此我們開始尋找比JDBC和ODBC更優的解決方案。因此,我們在Apache Doris 2.1中引入了Arrow Flight SQL協議,為數據傳輸提供了數十倍的加速效果。
基於Arrow Flight SQL的高速數據傳輸
Apache Doris作為一個列式數據倉庫,其查詢結果以列式格式的數據塊形式排列。在2.1版本之前,這些數據塊必須先被序列化為行式格式的字節,然後才能通過MySQL客戶端或JDBC/ODBC驅動傳輸到目標客戶端。此外,如果目標客戶端是列式數據庫或如Pandas這樣的列式數據科學組件,則數據還需進行反序列化。序列化-反序列化過程成為數據傳輸的速度瓶頸。
Apache Doris 2.1 採用了基於 Arrow Flight SQL 的數據傳輸通道。(Apache Arrow 是一個專為跨系統及語言實現高效數據傳輸而設計的軟件開發平台,而 Arrow 格式旨在實現高性能且無損的數據交換。) 它允許透過 SQL 在多種主流編程語言中高速讀取 Doris 的大規模數據。對於同樣支持 Arrow 格式的目標客戶端,整個過程將免於序列化/反序列化,從而無性能損失。另一個優勢是,Arrow Flight 能充分利用多節點和多核心架構,實現並行數據傳輸,這是另一個高數據吞吐量的推動因素。
例如,若一個 Python 客戶端從 Apache Doris 讀取數據,Doris 首先會將列導向的 Blocks 轉換為 Arrow RecordBatch。接著在 Python 客戶端中,Arrow RecordBatch 將被轉換為 Pandas DataFrame。這兩次轉換都非常迅速,因為 Doris Blocks、Arrow RecordBatch 和 Pandas DataFrame 均為列導向。
此外,Arrow Flight SQL 提供了一個通用的 JDBC 驅動程序,以促進支持 Arrow Flight SQL 協議的數據庫之間的無縫通信。這釋放了 Doris 連接到更廣泛生態系統並應用於更多場景的潛力。
性能測試
“十倍提速”結論源自於我們的基準測試。我們嘗試使用PyMySQL、Pandas和Arrow Flight SQL從Doris讀取數據,並分別記錄了耗時。測試數據集為ClickBench。
各類數據類型的測試結果如下:
如圖所示,Arrow Flight SQL在所有數據類型上均超越PyMySQL和Pandas,性能提升範圍從20倍至數百倍不等。
使用方法
藉助Arrow Flight SQL支持,Apache Doris能夠利用Python ADBC驅動實現快速數據讀取。接下來,我將展示幾種常用的數據庫操作,包括DDL、DML、會話變量設置以及show
語句,這些操作均使用Python ADBC驅動(版本3.9及以上)。
1. 安裝庫
相關庫已發佈於PyPI,安裝步驟如下:
pip install adbc_driver_manager
pip install adbc_driver_flightsql
導入以下模塊/庫以與安裝的庫進行交互:
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
2. 連接至Doris
建立一個與Doris Arrow Flight SQL服務交互的客戶端。前置條件包括Doris前端(FE)主機、Arrow Flight端口以及登錄用戶名/密碼。
配置Doris前端(FE)與後端(BE)的參數:
- 在
fe/conf/fe.conf
中,將arrow_flight_sql_port
設置為可用端口,例如9090。 - 在
be/conf/be.conf
中,將arrow_flight_port
設置為可用端口,例如9091。
假設Arrow Flight SQL服務將在Doris實例的9090和9091端口上分別運行FE和BE,並且Doris的用戶名/密碼是”user”和”pass”,連接過程將如下:
conn = flight_sql.connect(uri="grpc://127.0.0.1:9090", db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "user",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "pass",
})
cursor = conn.cursor()
3. 創建表並檢索元數據
Pass the query to the cursor.execute()
function, which creates tables and retrieves metadata.
cursor.execute("DROP DATABASE IF EXISTS arrow_flight_sql FORCE;")
print(cursor.fetchallarrow().to_pandas())
cursor.execute("create database arrow_flight_sql;")
print(cursor.fetchallarrow().to_pandas())
cursor.execute("show databases;")
print(cursor.fetchallarrow().to_pandas())
cursor.execute("use arrow_flight_sql;")
print(cursor.fetchallarrow().to_pandas())
cursor.execute("""CREATE TABLE arrow_flight_sql_test
(
k0 INT,
k1 DOUBLE,
K2 varchar(32) NULL DEFAULT "" COMMENT "",
k3 DECIMAL(27,9) DEFAULT "0",
k4 BIGINT NULL DEFAULT '10',
k5 DATE,
)
DISTRIBUTED BY HASH(k5) BUCKETS 5
PROPERTIES("replication_num" = "1");""")
print(cursor.fetchallarrow().to_pandas())
cursor.execute("show create table arrow_flight_sql_test;")
print(cursor.fetchallarrow().to_pandas())
StatusResult
is 0, which means the query is executed successfully. (Such design is to ensure compatibility with JDBC.)
StatusResult
0 0
StatusResult
0 0
Database
0 __internal_schema
1 arrow_flight_sql
.. ...
507 udf_auth_db
[508 rows x 1 columns]
StatusResult
0 0
StatusResult
0 0
Table Create Table
0 arrow_flight_sql_test CREATE TABLE `arrow_flight_sql_test` (\n `k0`...
4. 導入數據
Execute an INSERT INTO statement to load test data into the table created:
cursor.execute("""INSERT INTO arrow_flight_sql_test VALUES
('0', 0.1, "ID", 0.0001, 9999999999, '2023-10-21'),
('1', 0.20, "ID_1", 1.00000001, 0, '2023-10-21'),
('2', 3.4, "ID_1", 3.1, 123456, '2023-10-22'),
('3', 4, "ID", 4, 4, '2023-10-22'),
('4', 122345.54321, "ID", 122345.54321, 5, '2023-10-22');""")
print(cursor.fetchallarrow().to_pandas())
StatusResult
0 0
5. 執行查詢
Perform queries on the above table, such as aggregation, sorting, and session variable setting.
cursor.execute("select * from arrow_flight_sql_test order by k0;")
print(cursor.fetchallarrow().to_pandas())
cursor.execute("set exec_mem_limit=2000;")
print(cursor.fetchallarrow().to_pandas())
cursor.execute("show variables like \"%exec_mem_limit%\";")
print(cursor.fetchallarrow().to_pandas())
cursor.execute("select k5, sum(k1), count(1), avg(k3) from arrow_flight_sql_test group by k5;")
print(cursor.fetchallarrow().to_pandas())
k0 k1 K2 k3 k4 k5
0 0 0.10000 ID 0.000100000 9999999999 2023-10-21
1 1 0.20000 ID_1 1.000000010 0 2023-10-21
2 2 3.40000 ID_1 3.100000000 123456 2023-10-22
3 3 4.00000 ID 4.000000000 4 2023-10-22
4 4 122345.54321 ID 122345.543210000 5 2023-10-22
[5 rows x 6 columns]
StatusResult
0 0
Variable_name Value Default_Value Changed
0 exec_mem_limit 2000 2147483648 1
k5 Nullable(Float64)_1 Int64_2 Nullable(Decimal(38, 9))_3
0 2023-10-22 122352.94321 3 40784.214403333
1 2023-10-21 0.30000 2 0.500050005
[2 rows x 5 columns]
6. 完成代碼
# Doris Arrow Flight SQL測試
# 步驟1,庫已發佈在PyPI上,可以輕鬆安裝。
# pip install adbc_driver_manager
# pip install adbc_driver_flightsql
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
# 步驟2,創建一個與Doris Arrow Flight SQL服務交互的客戶端。
# 修改fe/conf/fe.conf中的arrow_flight_sql_port為可用端口,例如9090。
# 修改be/conf/be.conf中的arrow_flight_port為可用端口,例如9091。
conn = flight_sql.connect(uri="grpc://127.0.0.1:9090", db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
})
cursor = conn.cursor()
# 通過SQL使用游標與Doris交互
def execute(sql):
print("\n### execute query: ###\n " + sql)
cursor.execute(sql)
print("### result: ###")
print(cursor.fetchallarrow().to_pandas())
# 步驟3,執行DDL語句,創建數據庫/表,顯示語句。
execute("DROP DATABASE IF EXISTS arrow_flight_sql FORCE;")
execute("show databases;")
execute("create database arrow_flight_sql;")
execute("show databases;")
execute("use arrow_flight_sql;")
execute("""CREATE TABLE arrow_flight_sql_test
(
k0 INT,
k1 DOUBLE,
K2 varchar(32) NULL DEFAULT "" COMMENT "",
k3 DECIMAL(27,9) DEFAULT "0",
k4 BIGINT NULL DEFAULT '10',
k5 DATE,
)
DISTRIBUTED BY HASH(k5) BUCKETS 5
PROPERTIES("replication_num" = "1");""")
execute("show create table arrow_flight_sql_test;")
# 步驟4,插入數據
execute("""INSERT INTO arrow_flight_sql_test VALUES
('0', 0.1, "ID", 0.0001, 9999999999, '2023-10-21'),
('1', 0.20, "ID_1", 1.00000001, 0, '2023-10-21'),
('2', 3.4, "ID_1", 3.1, 123456, '2023-10-22'),
('3', 4, "ID", 4, 4, '2023-10-22'),
('4', 122345.54321, "ID", 122345.54321, 5, '2023-10-22');""")
# 步驟5,執行查詢,聚合,排序,設置會話變量
execute("select * from arrow_flight_sql_test order by k0;")
execute("set exec_mem_limit=2000;")
execute("show variables like \"%exec_mem_limit%\";")
execute("select k5, sum(k1), count(1), avg(k3) from arrow_flight_sql_test group by k5;")
# 步驟6,關閉游標
cursor.close()
大規模數據傳輸示例
1. Python
In Python, after connecting to Doris using the ADBC Driver, you can use various ADBC APIs to load the Clickbench dataset from Doris into Python. Here’s how:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
import pandas
from datetime import datetime
my_uri = "grpc://0.0.0.0:`fe.conf_arrow_flight_port`"
my_db_kwargs = {
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}
sql = "select * from clickbench.hits limit 1000000;"
# ADBC Driver Manager的PEP 249 (DB-API 2.0) API包裝器。
def dbapi_adbc_execute_fetchallarrow():
conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
cursor = conn.cursor()
start_time = datetime.now()
cursor.execute(sql)
arrow_data = cursor.fetchallarrow()
dataframe = arrow_data.to_pandas()
print("\n##################\n dbapi_adbc_execute_fetchallarrow" + ", cost:" + str(datetime.now() - start_time) + ", bytes:" + str(arrow_data.nbytes) + ", len(arrow_data):" + str(len(arrow_data)))
print(dataframe.info(memory_usage='deep'))
print(dataframe)
# ADBC將數據讀入pandas dataframe,比先fetchallarrow再to_pandas更快。
def dbapi_adbc_execute_fetch_df():
conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
cursor = conn.cursor()
start_time = datetime.now()
cursor.execute(sql)
dataframe = cursor.fetch_df()
print("\n##################\n dbapi_adbc_execute_fetch_df" + ", cost:" + str(datetime.now() - start_time))
print(dataframe.info(memory_usage='deep'))
print(dataframe)
# 可以並行讀取多個分區。
def dbapi_adbc_execute_partitions():
conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
cursor = conn.cursor()
start_time = datetime.now()
partitions, schema = cursor.adbc_execute_partitions(sql)
cursor.adbc_read_partition(partitions[0])
arrow_data = cursor.fetchallarrow()
dataframe = arrow_data.to_pandas()
print("\n##################\n dbapi_adbc_execute_partitions" + ", cost:" + str(datetime.now() - start_time) + ", len(partitions):" + str(len(partitions)))
print(dataframe.info(memory_usage='deep'))
print(dataframe)
dbapi_adbc_execute_fetchallarrow()
dbapi_adbc_execute_fetch_df()
dbapi_adbc_execute_partitions()
##################
dbapi_adbc_execute_fetchallarrow, cost:0:00:03.548080, bytes:784372793, len(arrow_data):1000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Columns: 105 entries, CounterID to CLID
dtypes: int16(48), int32(19), int64(6), object(32)
memory usage: 2.4 GB
None
CounterID EventDate UserID EventTime WatchID JavaEnable Title GoodEvent ... UTMCampaign UTMContent UTMTerm FromTag HasGCLID RefererHash URLHash CLID
0 245620 2013-07-09 2178958239546411410 2013-07-09 19:30:27 8302242799508478680 1 OWAProfessionov — Мой Круг (СВАО Интернет-магазин 1 ... 0 -7861356476484644683 -2933046165847566158 0
999999 1095 2013-07-03 4224919145474070397 2013-07-03 14:36:17 6301487284302774604 0 @дневники Sinatra (ЛАДА, цена для деталли кто ... 1 ... 0 -296158784638538920 1335027772388499430 0
[1000000 rows x 105 columns]
##################
dbapi_adbc_execute_fetch_df, cost:0:00:03.611664
##################
dbapi_adbc_execute_partitions, cost:0:00:03.483436, len(partitions):1
##################
low_level_api_execute_query, cost:0:00:03.523598, stream.address:139992182177600, rows:-1, bytes:784322926, len(arrow_data):1000000
##################
low_level_api_execute_partitions, cost:0:00:03.738128streams.size:3, 1, -1
2. JDBC
此驅動程式的使用方式與MySQL協議的驅動程式類似。您只需將連接URL中的jdbc:mysql
替換為jdbc:arrow-flight-sql
。返回的結果將以JDBC ResultSet資料結構呈現。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver");
String DB_URL = "jdbc:arrow-flight-sql://0.0.0.0:9090?useServerPrepStmts=false"
+ "&cachePrepStmts=true&useSSL=false&useEncryption=false";
String USER = "root";
String PASS = "";
Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
Statement stmt = conn.createStatement();
ResultSet resultSet = stmt.executeQuery("show tables;");
while (resultSet.next()) {
String col1 = resultSet.getString(1);
System.out.println(col1);
}
resultSet.close();
stmt.close();
conn.close();
3. JAVA
Similar to that with Python, you can directly create an ADBC client with JAVA to read data from Doris. Firstly, you need to obtain the FlightInfo. Then, you connect to each endpoint to pull the data.
// 方法一
AdbcStatement stmt = connection.createStatement()
stmt.setSqlQuery("SELECT * FROM " + tableName)
// executeQuery,兩步驟:
// 1. 執行查詢並獲取返回的FlightInfo;
// 2. 創建FlightInfoReader以順序遍歷每個Endpoint;
QueryResult queryResult = stmt.executeQuery()
// 方法二
AdbcStatement stmt = connection.createStatement()
stmt.setSqlQuery("SELECT * FROM " + tableName)
// 執行查詢並解析FlightInfo中的每個Endpoint,並使用Location和Ticket來構造一個PartitionDescriptor
partitionResult = stmt.executePartitioned();
partitionResult.getPartitionDescriptors()
// 為每個PartitionDescriptor創建ArrowReader以讀取資料
ArrowReader reader = connection2.readPartition(partitionResult.getPartitionDescriptors().get(0).getDescriptor()))
4. Spark
For Spark users, apart from connecting to Flight SQL Server using JDBC and JAVA, you can apply the Spark-Flight-Connector, which enables Spark to act as a client for reading and writing data from/to a Flight SQL Server. This is made possible by the fast data conversion between the Arrow format and the Block in Apache Doris, which is 10 times faster than the conversion between CSV and Block. Moreover, the Arrow data format provides more comprehensive and robust support for complex data types such as Map and Array.
跟上趨勢列車
A number of enterprise users of Doris have tried loading data from Doris to Python, Spark, and Flink using Arrow Flight SQL and enjoyed much faster data reading speed. In the future, we plan to include support for Arrow Flight SQL in data writing, too. By then, most systems built with mainstream programming languages will be able to read and write data from/to Apache Doris by an ADBC client. That’s high-speed data interaction which opens up numerous possibilities. On our to-do list, we also envision leveraging Arrow Flight to implement parallel data reading by multiple backends and facilitate federated queries across Doris and Spark. Download Apache Doris 2.1 and get a taste of 100 times faster data transfer powered by Arrow Flight SQL. If you need assistance, come find us in the Apache Doris developer and user community.
Source:
https://dzone.com/articles/data-warehouse-for-data-science-adopting-arrow