在本文中,我將介紹一個基於Flask的Web應用程序的開發過程,該應用程序與SQL Server數據庫交互,用於分析人口數據。該應用程序允許用戶查詢人口範圍,按州提取縣市,並檢索特定人口範圍內的州。我還將討論如何集成Redis來緩存查詢結果以提高性能。
為什麼選擇Flask、SQL Server和Redis?
Flask是一個輕量級靈活的Python Web框架,非常適合構建中小型Web應用程序。它提供了創建RESTful API、呈現動態HTML模板和與數據庫交互所需的工具。另一方面,SQL Server是一個強大的關係型數據庫管理系統(RDBMS),廣泛應用於企業應用程序中。將Flask與SQL Server結合使用,可以為數據分析和可視化構建一個強大的應用程序。
為了進一步提高性能,我們將集成Redis,一個內存數據存儲庫,用於緩存經常訪問的查詢結果。這樣可以減輕數據庫負擔,加快重複查詢的響應時間。
應用程序概述
我們的Flask應用程序執行以下任務:
- 查詢人口範圍。用戶可以指定一個年份和人口範圍,以獲得在該範圍內的州數量。
- 按州查詢縣。用戶可以輸入州代碼以檢索縣的列表。
- 按人口範圍檢索州。用戶可以指定人口範圍和年份,以獲得該範圍內的州列表。
- 注意。要進行測試,請隨意在數據庫中創建自己的架構並根據以下使用SQL查詢共享的API插入所需的示例數據。此外,這裡使用的HTML頁面可以是基本的表設計,從Flask應用代碼中獲取返回的數據並顯示結果。
讓我們深入了解實現細節。
設置Flask應用程序
1. 先決條件
開始之前,請確保您已通過終端機 root 安裝了以下軟件(與 MacOS 兼容的命令):
- Python 3.x
- Flask(
pip install flask
) - SQLAlchemy(
pip install sqlalchemy
) - PyODBC(
pip install pyodbc
) - Redis(
pip install redis
)
2. 數據庫連接
我們使用 SQLAlchemy 來連接到 SQL Server 數據庫。以下是連接的配置方式:
from sqlalchemy import create_engine
import urllib
# SQL Server connection string
params = urllib.parse.quote_plus(
"Driver={ODBC Driver 17 for SQL Server};"
"Server=tcp:username.database.windows.net,1433;"
"Database=population;"
"Uid=user@username;"
"Pwd={azure@123};"
"Encrypt=yes;"
"TrustServerCertificate=no;"
"Connection Timeout=30;"
)
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
這個連接字串使用了 SQL Server 的 ODBC 驅動程序,並包括了加密和超時的參數。
3. Redis 配置
Redis 用於緩存查詢結果。以下是設置 Redis 連接的方法:
import redis
# Redis connection
redis_client = redis.StrictRedis(
host='username.redis.cache.windows.net',
port=6380,
db=0,
password='encryptedpasswordstring',
ssl=True
)
4. 實現應用程序路由
首頁路由
首頁路由渲染應用程序的主頁:
route('/') .
def index():
return render_template('index.html')
使用 Redis 緩存的人口範圍查詢
這個路由處理人口範圍的查詢。它首先檢查結果是否在 Redis 中緩存。如果沒有,則查詢數據庫並將結果緩存供將來使用:
route('/population-range', methods=['GET', 'POST']) .
def population_range():
if request.method == 'POST': # input params defined for this api
year = request.form['yr1']
range1_start = request.form['r1']
range1_end = request.form['r2']
range2_start = request.form['r3']
range2_end = request.form['r4']
range3_start = request.form['r5']
range3_end = request.form['r6']
# Map year to column name
year_map = {
'2010': 'ten',
'2011': 'eleven',
'2012': 'twelve',
'2013': 'thirteen',
'2014': 'fourteen',
'2015': 'fifteen',
'2016': 'sixteen',
'2017': 'seventeen',
'2018': 'eighteen'
}
year_column = year_map.get(year, 'ten') # Default to 'ten' if year not found
# Build cache key
cache_key = f"population_range_{year_column}_{range1_start}_{range1_end}_{range2_start}_{range2_end}_{range3_start}_{range3_end}"
# Check if result is cached
cached_result = redis_client.get(cache_key)
if cached_result:
result = eval(cached_result) # Deserialize cached result
time_taken = 0 # No database query, so time taken is negligible
cache_status = "Cache Hit"
else:
# Build SQL query
query = f"""
SELECT
SUM(CASE WHEN {year_column} BETWEEN '{range1_start}' AND '{range1_end}' THEN 1 ELSE 0 END) AS range1_count,
SUM(CASE WHEN {year_column} BETWEEN '{range2_start}' AND '{range2_end}' THEN 1 ELSE 0 END) AS range2_count,
SUM(CASE WHEN {year_column} BETWEEN '{range3_start}' AND '{range3_end}' THEN 1 ELSE 0 END) AS range3_count
FROM popul
"""
print(query) # For debugging
# Execute query and measure time
start_time = time()
result = engine.execute(query).fetchall()
end_time = time()
time_taken = end_time - start_time
cache_status = "Cache Miss"
# Cache the result
redis_client.set(cache_key, str(result), ex=3600) # Cache for 1 hour
return render_template('display.html', data1=result, t1=time_taken, cache_status=cache_status)
return render_template('index.html')
使用 Redis 緩存獲取州內郡的路由
這個路由為指定州代碼檢索郡。它同樣使用 Redis 來緩存結果:
route('/counties-by-state', methods=['GET', 'POST']) .
def counties_by_state():
if request.method == 'POST':
state_code = request.form['state_code']
# Build cache key
cache_key = f"counties_by_state_{state_code}"
# Check if result is cached
cached_result = redis_client.get(cache_key)
if cached_result:
result = eval(cached_result) # Deserialize cached result
time_taken = 0 # No database query, so time taken is negligible
cache_status = "Cache Hit"
else:
# Build SQL query
query = f"""
SELECT county
FROM dbo.county
WHERE state = (SELECT state FROM codes WHERE code = '{state_code}')
"""
print(query) # For debugging
# Execute query and measure time
start_time = time()
result = engine.execute(query).fetchall()
end_time = time()
time_taken = end_time - start_time
cache_status = "Cache Miss"
# Cache the result
redis_client.set(cache_key, str(result), ex=3600) # Cache for 1 hour
return render_template('counties.html', data=result, time_taken=time_taken, cache_status=cache_status)
return render_template('index.html')
使用 Redis 緩存按人口範圍檢索州的路由
這個路由按指定人口範圍檢索州並將結果緩存:
route('/states-by-population', methods=['GET', 'POST']) .
def states_by_population():
if request.method == 'POST':
year = request.form['year']
population_start = request.form['population_start']
population_end = request.form['population_end']
# Map year to column name
year_map = {
'2010': 'ten',
'2011': 'eleven',
'2012': 'twelve',
'2013': 'thirteen',
'2014': 'fourteen',
'2015': 'fifteen',
'2016': 'sixteen',
'2017': 'seventeen',
'2018': 'eighteen'
}
year_column = year_map.get(year, 'ten') # Default to 'ten' if year not found
# Build cache key
cache_key = f"states_by_population_{year_column}_{population_start}_{population_end}"
# Check if result is cached
cached_result = redis_client.get(cache_key)
if cached_result:
result = eval(cached_result) # Deserialize cached result
time_taken = 0 # No database query, so time taken is negligible
cache_status = "Cache Hit"
else:
# Build SQL query
query = f"""
SELECT state
FROM popul
WHERE {year_column} BETWEEN '{population_start}' AND '{population_end}'
"""
print(query) # For debugging
# Execute query and measure time
start_time = time()
result = engine.execute(query).fetchall()
end_time = time()
time_taken = end_time - start_time
cache_status = "Cache Miss"
# Cache the result
redis_client.set(cache_key, str(result), ex=3600) # Cache for 1 hour
return render_template('states.html', data=result, time_taken=time_taken, cache_status=cache_status)
return render_template('index.html')
性能比較:SQL Server vs. Redis
Query Type | Redis Fetch Time | SQL Execution Time |
---|---|---|
人口範圍查詢(已緩存) | 0.002 秒 | 0.000 秒 |
人口範圍查詢(新查詢) | 0.002 秒 | 1.342 秒 |
要點:Redis 將執行時間從 約 1.3 秒降至約 0.002 秒,使查詢速度提高了 650 倍!
Redis 如何提升性能
Redis 是一個內存數據存儲庫,作為應用程序與數據庫之間的緩存層。以下是它在我們應用程序中的工作方式:
- 緩存鍵。為每個查詢生成一個基於其參數的唯一鍵。
- 緩存檢查。在執行數據庫查詢之前,應用程序檢查結果是否已經在 Redis 中緩存。
- 緩存命中。如果在 Redis 中找到結果,則立即返回,避免數據庫查詢。
- 緩存未命中。如果未找到結果,則執行查詢,並將結果緩存到 Redis 以供將來使用。
- 緩存過期。緩存的結果設置為在指定時間後過期(例如,1 小時),以確保數據新鮮度。
通過緩存頻繁訪問的查詢結果,Redis 顯著減輕了數據庫負載,並改善了重複查詢的響應時間。
結論
在本文中,我們構建了一個與 SQL Server 數據庫交互以分析人口數據的 Flask 應用程序。我們集成了 Redis 來緩存查詢結果,從而提高性能並減輕數據庫負載。通過遵循最佳實踐,您可以擴展此應用程序以處理更複雜的查詢並為生產使用進行擴展。
鏈接:此完整應用程序的源代碼可在 GitHub 上找到。
Source:
https://dzone.com/articles/build-data-analytics-platform-flask-sql-redis