この記事では、SQL Serverデータベースと連携し、人口データを分析するFlaskベースのウェブアプリケーションの開発について説明します。このアプリケーションでは、ユーザーが人口範囲をクエリし、州ごとに郡を取得し、特定の人口範囲内の州を取得できるようになっています。また、パフォーマンスを向上させるために、クエリ結果をキャッシュするためにRedisを統合する方法についても説明します。
なぜFlask、SQL Server、そしてRedisなのか?
Flaskは、小規模から中規模のウェブアプリケーションを構築するのに最適な軽量で柔軟なPythonウェブフレームワークです。RESTful APIを作成し、動的HTMLテンプレートをレンダリングし、データベースと対話するために必要なツールを提供します。一方で、SQL Serverは、企業アプリケーションで広く使用されている堅牢なリレーショナルデータベース管理システム(RDBMS)です。FlaskとSQL Serverを組み合わせることで、データ分析と視覚化のための強力なアプリケーションを構築できます。
さらにパフォーマンスを向上させるために、頻繁にアクセスされるクエリ結果をキャッシュするためにインメモリデータストアのRedisを統合します。これにより、データベースへの負荷が軽減され、繰り返しクエリに対する応答時間が短縮されます。
アプリケーション概要
私たちのFlaskアプリケーションは、以下のタスクを実行します:
- クエリ人口範囲。ユーザーは年と人口範囲を指定して、その範囲内の州の数を取得できます。
- 州ごとの郡を取得。ユーザーは州コードを入力して郡のリストを取得できます。
- 人口範囲ごとの州を取得。ユーザーは人口範囲と年を指定して、その範囲内の州のリストを取得できます。
- 注意。テストする場合は、データベースに独自のスキーマを作成し、以下のAPIを使用して共有されたSQLクエリに基づいて必要に応じてサンプルデータを挿入してください。また、ここで使用されるHTMLページは、Flaskアプリコードから返されたデータを取得し、結果を表示するための基本的なテーブル設計です。
実装の詳細に入りましょう。
Flaskアプリケーションの設定
1. 前提条件
開始する前に、ターミナルルートを介して次のものがインストールされていることを確認してください(MacOSと互換性のあるコマンド):
- Python 3.x
- Flask(
pip install flask
) - SQLAlchemy(
pip install sqlalchemy
) - PyODBC(
pip install pyodbc
) - Redis(
pip install redis
)
2. データベース接続
SQL Serverデータベースに接続するためにSQLAlchemyを使用します。接続の構成方法は次のとおりです:
from sqlalchemy import create_engine
import urllib
# SQL Server connection string
params = urllib.parse.quote_plus(
"Driver={ODBC Driver 17 for SQL Server};"
"Server=tcp:username.database.windows.net,1433;"
"Database=population;"
"Uid=user@username;"
"Pwd={azure@123};"
"Encrypt=yes;"
"TrustServerCertificate=no;"
"Connection Timeout=30;"
)
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
この接続文字列は、SQL ServerのODBCドライバーを使用し、暗号化とタイムアウトのパラメーターを含んでいます。
3. Redisの設定
Redisはクエリ結果をキャッシュするために使用されます。Redis接続の設定方法は次のとおりです:
import redis
# Redis connection
redis_client = redis.StrictRedis(
host='username.redis.cache.windows.net',
port=6380,
db=0,
password='encryptedpasswordstring',
ssl=True
)
4. アプリケーションルートの実装
ホームページのルート
ホームページのルートは、アプリケーションのメインページをレンダリングします:
route('/') .
def index():
return render_template('index.html')
Redisキャッシュを使用した人口範囲クエリ
このルートは人口範囲のクエリを処理します。最初にRedisで結果がキャッシュされているかどうかをチェックします。キャッシュされていない場合は、データベースからクエリを実行し、結果を将来の使用のためにキャッシュします:
route('/population-range', methods=['GET', 'POST']) .
def population_range():
if request.method == 'POST': # input params defined for this api
year = request.form['yr1']
range1_start = request.form['r1']
range1_end = request.form['r2']
range2_start = request.form['r3']
range2_end = request.form['r4']
range3_start = request.form['r5']
range3_end = request.form['r6']
# Map year to column name
year_map = {
'2010': 'ten',
'2011': 'eleven',
'2012': 'twelve',
'2013': 'thirteen',
'2014': 'fourteen',
'2015': 'fifteen',
'2016': 'sixteen',
'2017': 'seventeen',
'2018': 'eighteen'
}
year_column = year_map.get(year, 'ten') # Default to 'ten' if year not found
# Build cache key
cache_key = f"population_range_{year_column}_{range1_start}_{range1_end}_{range2_start}_{range2_end}_{range3_start}_{range3_end}"
# Check if result is cached
cached_result = redis_client.get(cache_key)
if cached_result:
result = eval(cached_result) # Deserialize cached result
time_taken = 0 # No database query, so time taken is negligible
cache_status = "Cache Hit"
else:
# Build SQL query
query = f"""
SELECT
SUM(CASE WHEN {year_column} BETWEEN '{range1_start}' AND '{range1_end}' THEN 1 ELSE 0 END) AS range1_count,
SUM(CASE WHEN {year_column} BETWEEN '{range2_start}' AND '{range2_end}' THEN 1 ELSE 0 END) AS range2_count,
SUM(CASE WHEN {year_column} BETWEEN '{range3_start}' AND '{range3_end}' THEN 1 ELSE 0 END) AS range3_count
FROM popul
"""
print(query) # For debugging
# Execute query and measure time
start_time = time()
result = engine.execute(query).fetchall()
end_time = time()
time_taken = end_time - start_time
cache_status = "Cache Miss"
# Cache the result
redis_client.set(cache_key, str(result), ex=3600) # Cache for 1 hour
return render_template('display.html', data1=result, t1=time_taken, cache_status=cache_status)
return render_template('index.html')
Redisキャッシュを使用した州コードによる郡の取得
このルートは指定された州コードに対する郡を取得します。結果をキャッシュするためにRedisも使用します:
route('/counties-by-state', methods=['GET', 'POST']) .
def counties_by_state():
if request.method == 'POST':
state_code = request.form['state_code']
# Build cache key
cache_key = f"counties_by_state_{state_code}"
# Check if result is cached
cached_result = redis_client.get(cache_key)
if cached_result:
result = eval(cached_result) # Deserialize cached result
time_taken = 0 # No database query, so time taken is negligible
cache_status = "Cache Hit"
else:
# Build SQL query
query = f"""
SELECT county
FROM dbo.county
WHERE state = (SELECT state FROM codes WHERE code = '{state_code}')
"""
print(query) # For debugging
# Execute query and measure time
start_time = time()
result = engine.execute(query).fetchall()
end_time = time()
time_taken = end_time - start_time
cache_status = "Cache Miss"
# Cache the result
redis_client.set(cache_key, str(result), ex=3600) # Cache for 1 hour
return render_template('counties.html', data=result, time_taken=time_taken, cache_status=cache_status)
return render_template('index.html')
Redisキャッシュを使用した人口範囲による州の取得
このルートは指定された人口範囲内の州を取得し、結果をキャッシュします:
route('/states-by-population', methods=['GET', 'POST']) .
def states_by_population():
if request.method == 'POST':
year = request.form['year']
population_start = request.form['population_start']
population_end = request.form['population_end']
# Map year to column name
year_map = {
'2010': 'ten',
'2011': 'eleven',
'2012': 'twelve',
'2013': 'thirteen',
'2014': 'fourteen',
'2015': 'fifteen',
'2016': 'sixteen',
'2017': 'seventeen',
'2018': 'eighteen'
}
year_column = year_map.get(year, 'ten') # Default to 'ten' if year not found
# Build cache key
cache_key = f"states_by_population_{year_column}_{population_start}_{population_end}"
# Check if result is cached
cached_result = redis_client.get(cache_key)
if cached_result:
result = eval(cached_result) # Deserialize cached result
time_taken = 0 # No database query, so time taken is negligible
cache_status = "Cache Hit"
else:
# Build SQL query
query = f"""
SELECT state
FROM popul
WHERE {year_column} BETWEEN '{population_start}' AND '{population_end}'
"""
print(query) # For debugging
# Execute query and measure time
start_time = time()
result = engine.execute(query).fetchall()
end_time = time()
time_taken = end_time - start_time
cache_status = "Cache Miss"
# Cache the result
redis_client.set(cache_key, str(result), ex=3600) # Cache for 1 hour
return render_template('states.html', data=result, time_taken=time_taken, cache_status=cache_status)
return render_template('index.html')
パフォーマンス比較:SQL Server vs. Redis
Query Type | Redis Fetch Time | SQL Execution Time |
---|---|---|
人口範囲クエリ(キャッシュ済み) | 0.002秒 | 0.000秒 |
人口範囲クエリ(新規) | 0.002秒 | 1.342秒 |
要点: Redisは実行時間を約1.3秒から約0.002秒に短縮し、クエリを650倍高速化します!
Redisがパフォーマンスを向上させる方法
Redisは、アプリケーションとデータベースの間にキャッシング層として機能するインメモリデータストアです。私たちのアプリケーションでの動作は次のとおりです:
- キャッシュキー。各クエリのパラメータに基づいて、一意のキーが生成されます。
- キャッシュチェック。データベースクエリを実行する前に、アプリケーションは結果がすでにRedisにキャッシュされているかを確認します。
- キャッシュヒット。結果がRedisに見つかれば、すぐに返され、データベースクエリを回避します。
- キャッシュミス。結果が見つからない場合、クエリが実行され、その結果は将来の使用のためにRedisにキャッシュされます。
- キャッシュの有効期限。キャッシュされた結果は、データの新鮮さを確保するために指定された時間(例:1時間)後に期限切れになります。
頻繁にアクセスされるクエリ結果をキャッシュすることにより、Redisはデータベースへの負荷を大幅に軽減し、繰り返しのクエリに対する応答時間を改善します。
結論
この記事では、人口データを分析するためにSQL Serverデータベースと対話するFlaskアプリケーションを構築しました。クエリ結果をキャッシュするためにRedisを統合し、パフォーマンスを向上させ、データベースの負荷を軽減しました。ベストプラクティスに従うことで、このアプリケーションを拡張してより複雑なクエリを処理し、製品使用のためにスケールアップすることができます。
リンク:この完全なアプリケーションのソースコードはGitHubで見つけることができます。
Source:
https://dzone.com/articles/build-data-analytics-platform-flask-sql-redis