在本文中,我将介绍一个基于Flask的Web应用程序的开发过程,该应用程序与SQL Server数据库进行交互以分析人口数据。该应用程序允许用户查询人口范围,按州获取县,并检索特定人口范围内的州。我还将讨论如何集成Redis来缓存查询结果,以提高性能。
为什么选择Flask、SQL Server和Redis?
Flask是一个轻量灵活的Python Web框架,非常适合构建中小型Web应用程序。它提供了创建RESTful API、渲染动态HTML模板和与数据库交互所需的工具。另一方面,SQL Server是一个强大的关系型数据库管理系统(RDBMS),在企业应用中被广泛使用。将Flask与SQL Server结合使用,可以构建一个强大的用于数据分析和可视化的应用程序。
为了进一步提升性能,我们将集成Redis,一个内存数据存储,用于缓存频繁访问的查询结果。这样可以减轻数据库负载,加快重复查询的响应时间。
应用程序概述
我们的Flask应用程序执行以下任务:
- 查询人口范围。用户可以指定年份和人口范围,以获取落入该范围内的州的计数。
- 按州获取县。用户可以输入州代码以检索县的列表。
- 按人口范围检索州。用户可以指定人口范围和年份,以获取该范围内的州列表。
- 注意。要进行测试,请随意在数据库中创建自己的架构,并根据以下使用SQL查询共享的API插入所需的示例数据。此外,在这里使用的HTML页面可以是基本的表设计,从Flask应用程序代码中获取返回的数据并显示结果。
让我们深入实施细节。
设置Flask应用程序
1. 先决条件
开始之前,请确保通过您的终端根目录安装了以下内容(与MacOS兼容的命令):
- Python 3.x
- Flask(
pip install flask
) - SQLAlchemy(
pip install sqlalchemy
) - PyODBC(
pip install pyodbc
) - Redis(
pip install redis
)
2. 数据库连接
我们使用SQLAlchemy来连接到SQL Server数据库。以下是连接的配置方式:
from sqlalchemy import create_engine
import urllib
# SQL Server connection string
params = urllib.parse.quote_plus(
"Driver={ODBC Driver 17 for SQL Server};"
"Server=tcp:username.database.windows.net,1433;"
"Database=population;"
"Uid=user@username;"
"Pwd={azure@123};"
"Encrypt=yes;"
"TrustServerCertificate=no;"
"Connection Timeout=30;"
)
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
这个连接字符串使用了 SQL Server 的 ODBC 驱动程序,并包括了加密和超时参数。
3. Redis 配置
Redis 用于缓存查询结果。以下是如何设置 Redis 连接:
import redis
# Redis connection
redis_client = redis.StrictRedis(
host='username.redis.cache.windows.net',
port=6380,
db=0,
password='encryptedpasswordstring',
ssl=True
)
4. 实现应用程序路由
主页路由
主页路由呈现应用程序的主页面:
route('/') .
def index():
return render_template('index.html')
使用 Redis 缓存的人口范围查询
该路由处理人口范围的查询。它首先检查结果是否在 Redis 中缓存。如果没有,则查询数据库并将结果缓存以供将来使用:
route('/population-range', methods=['GET', 'POST']) .
def population_range():
if request.method == 'POST': # input params defined for this api
year = request.form['yr1']
range1_start = request.form['r1']
range1_end = request.form['r2']
range2_start = request.form['r3']
range2_end = request.form['r4']
range3_start = request.form['r5']
range3_end = request.form['r6']
# Map year to column name
year_map = {
'2010': 'ten',
'2011': 'eleven',
'2012': 'twelve',
'2013': 'thirteen',
'2014': 'fourteen',
'2015': 'fifteen',
'2016': 'sixteen',
'2017': 'seventeen',
'2018': 'eighteen'
}
year_column = year_map.get(year, 'ten') # Default to 'ten' if year not found
# Build cache key
cache_key = f"population_range_{year_column}_{range1_start}_{range1_end}_{range2_start}_{range2_end}_{range3_start}_{range3_end}"
# Check if result is cached
cached_result = redis_client.get(cache_key)
if cached_result:
result = eval(cached_result) # Deserialize cached result
time_taken = 0 # No database query, so time taken is negligible
cache_status = "Cache Hit"
else:
# Build SQL query
query = f"""
SELECT
SUM(CASE WHEN {year_column} BETWEEN '{range1_start}' AND '{range1_end}' THEN 1 ELSE 0 END) AS range1_count,
SUM(CASE WHEN {year_column} BETWEEN '{range2_start}' AND '{range2_end}' THEN 1 ELSE 0 END) AS range2_count,
SUM(CASE WHEN {year_column} BETWEEN '{range3_start}' AND '{range3_end}' THEN 1 ELSE 0 END) AS range3_count
FROM popul
"""
print(query) # For debugging
# Execute query and measure time
start_time = time()
result = engine.execute(query).fetchall()
end_time = time()
time_taken = end_time - start_time
cache_status = "Cache Miss"
# Cache the result
redis_client.set(cache_key, str(result), ex=3600) # Cache for 1 hour
return render_template('display.html', data1=result, t1=time_taken, cache_status=cache_status)
return render_template('index.html')
使用 Redis 缓存按州代码获取县
该路由检索给定州代码的县。它还使用 Redis 缓存结果:
route('/counties-by-state', methods=['GET', 'POST']) .
def counties_by_state():
if request.method == 'POST':
state_code = request.form['state_code']
# Build cache key
cache_key = f"counties_by_state_{state_code}"
# Check if result is cached
cached_result = redis_client.get(cache_key)
if cached_result:
result = eval(cached_result) # Deserialize cached result
time_taken = 0 # No database query, so time taken is negligible
cache_status = "Cache Hit"
else:
# Build SQL query
query = f"""
SELECT county
FROM dbo.county
WHERE state = (SELECT state FROM codes WHERE code = '{state_code}')
"""
print(query) # For debugging
# Execute query and measure time
start_time = time()
result = engine.execute(query).fetchall()
end_time = time()
time_taken = end_time - start_time
cache_status = "Cache Miss"
# Cache the result
redis_client.set(cache_key, str(result), ex=3600) # Cache for 1 hour
return render_template('counties.html', data=result, time_taken=time_taken, cache_status=cache_status)
return render_template('index.html')
使用 Redis 缓存按人口范围获取州
该路由获取指定人口范围内的州并缓存结果:
route('/states-by-population', methods=['GET', 'POST']) .
def states_by_population():
if request.method == 'POST':
year = request.form['year']
population_start = request.form['population_start']
population_end = request.form['population_end']
# Map year to column name
year_map = {
'2010': 'ten',
'2011': 'eleven',
'2012': 'twelve',
'2013': 'thirteen',
'2014': 'fourteen',
'2015': 'fifteen',
'2016': 'sixteen',
'2017': 'seventeen',
'2018': 'eighteen'
}
year_column = year_map.get(year, 'ten') # Default to 'ten' if year not found
# Build cache key
cache_key = f"states_by_population_{year_column}_{population_start}_{population_end}"
# Check if result is cached
cached_result = redis_client.get(cache_key)
if cached_result:
result = eval(cached_result) # Deserialize cached result
time_taken = 0 # No database query, so time taken is negligible
cache_status = "Cache Hit"
else:
# Build SQL query
query = f"""
SELECT state
FROM popul
WHERE {year_column} BETWEEN '{population_start}' AND '{population_end}'
"""
print(query) # For debugging
# Execute query and measure time
start_time = time()
result = engine.execute(query).fetchall()
end_time = time()
time_taken = end_time - start_time
cache_status = "Cache Miss"
# Cache the result
redis_client.set(cache_key, str(result), ex=3600) # Cache for 1 hour
return render_template('states.html', data=result, time_taken=time_taken, cache_status=cache_status)
return render_template('index.html')
性能比较:SQL Server vs. Redis
Query Type | Redis Fetch Time | SQL Execution Time |
---|---|---|
人口范围查询(已缓存) | 0.002 秒 | 0.000 秒 |
人口范围查询(新的) | 0.002 秒 | 1.342 秒 |
关键收获:Redis 将执行时间从 ~1.3 秒减少到 ~0.002 秒,使查询快了 650 倍!
Redis如何提升性能
Redis是一个内存数据存储,作为应用程序和数据库之间的缓存层。以下是它在我们应用程序中的工作方式:
- 缓存键。针对每个查询基于其参数生成唯一的键。
- 缓存检查。在执行数据库查询之前,应用程序会检查结果是否已经缓存在Redis中。
- 缓存命中。如果结果在Redis中找到,则立即返回,避免进行数据库查询。
- 缓存未命中。如果未找到结果,则执行查询,并将结果缓存在Redis中以供将来使用。
- 缓存过期。缓存的结果设置为在指定时间后过期(例如,1小时),以确保数据的新鲜性。
通过缓存频繁访问的查询结果,Redis显著减少了对数据库的负载,并提高了重复查询的响应时间。
结论
在本文中,我们构建了一个与SQL Server数据库交互以分析人口数据的Flask应用程序。我们集成了Redis来缓存查询结果,提高性能并减少数据库负载。通过遵循最佳实践,您可以扩展此应用程序以处理更复杂的查询,并将其扩展到生产环境中使用。
链接:此完整应用程序的源代码可在GitHub上找到。
Source:
https://dzone.com/articles/build-data-analytics-platform-flask-sql-redis