数据质量和一致性就像一栋房子的基础一样重要——没有坚实的基础,顶部建造的一切都有倒塌的风险。这就是数据验证发挥重要作用的地方。数据验证帮助您确保数据准确、一致和可靠。
Great Expectations是一个开源数据验证工具,可帮助您及早识别数据问题,并确保您的数据符合所需的质量标准。
在本指南中,我们将指导您如何使用Great Expectations进行数据验证,提供一个实际的端到端示例,帮助您入门!
什么是Great Expectations?
Great Expectations(GX)是一个开源框架,已经在现代数据管道中管理和自动化数据验证方面变得流行。
其基于Python的框架旨在帮助数据团队确保其数据的质量和一致性。用户可以定义“期望” —描述有效数据应该如何看起来的规则或测试 —自动验证数据是否符合这些标准。
Great Expectations的一些优点包括:
- 自动化数据验证 – Great Expectations自动化了数据验证过程,减少了手动工作量,并最大限度地减少了错误的风险。它确保数据始终符合预定义的标准。
- 与数据管道集成 – 它可以轻松与各种数据来源和平台集成,包括SQL数据库、云存储和ETL工具,允许在管道的不同阶段进行数据验证。
- 清晰、可操作的验证结果 – 该工具提供透明的验证结果,便于发现数据质量问题并快速解决。
- 数据文档– Great Expectations可以生成详细、易于访问的数据验证过程文档,帮助团队对质量标准达成一致,并为未来使用提供参考。
- 可扩展性与灵活性– 作为开源工具,Great Expectations具有高度定制化的特点,可以根据数据验证需求进行扩展,提供灵活性以适应各种使用情况,而无需高成本。
现在,让我们看一个端到端的示例!
设置Great Expectations
在本教程中,您将学习如何使用GX Core,Great Expectations的开源版本,来验证Pandas DataFrame。我们将逐步介绍如何设置上下文、注册Pandas数据源、定义期望,并验证数据批次。
注意: 我们建议您跟着DataLab笔记本一起学习,但您也可以创建自己的Python脚本。
1. 安装Great Expectations
先决条件
- 已安装Python 3.9至3.12。
- 为避免冲突,强烈建议您在虚拟环境中安装Great Expectations(免责声明:虚拟环境的设置超出本文范围)。
- 一个示例数据集。
注意: 如果使用提供的DataLab笔记本,这些先决条件已经得到满足。可以直接跳过它们。
使用以下命令通过pip安装GX:
pip install great_expectations
此命令将安装核心包和所有必需的依赖项。
2. 初始化数据上下文
Great Expectations需要一个数据上下文来管理配置。我们使用一个临时数据上下文来避免持久化配置。
import great_expectations as gx # 获取临时数据上下文 context = gx.get_context() assert type(context).__name__ == "EphemeralDataContext"
创建您的第一个数据验证套件
现在GX已设置好,让我们创建一个数据验证套件。
1. 连接到数据源并创建数据资产
数据源将Great Expectations连接到您的数据,而数据资产代表数据的特定子集(例如表、DataFrame或文件)。
在这种情况下,我们将准备好连接到名为inventory_parts_df
的 DataFrame。提供了示例数据集,可以在提供的 DataLab 中找到,并在运行 SQL 代码块后创建:
如果您没有使用 DataLab,请使用示例数据创建自己的 DataFrame。
现在,创建您的数据源和资产:
# 添加一个 Pandas 数据源 data_source = context.data_sources.add_pandas(name="inventory_parts") # 将数据资产添加到数据源 data_asset = data_source.add_dataframe_asset(name="inventory_parts_asset")
2. 添加批定义
批定义用于识别和组织您的数据以进行验证。在这里,我们添加一个覆盖整个 DataFrame 的批定义:
#定义批处理定义名称 batch_definition_name = "inventory_parts_batch" #添加批处理定义 batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name) assert batch_definition.name == batch_definition_name
3. 检索批处理
批处理是与批处理定义关联的数据集合。要验证数据,您需要检索并将批处理链接到您的 DataFrame,在本例中为 inventory_parts_df
:
#定义批处理参数 batch_parameters = {"dataframe": inventory_parts_df} #检索批处理 batch = batch_definition.get_batch(batch_parameters=batch_parameters)
4. 创建套件并定义期望
期望是用于验证数据的规则。在这个例子中,我们将定义以下简单期望:
- 确保
inventory_id
的值不为空。 - 确保
part_num
的值是唯一的。
# 创建期望套件 expectation_suite_name = "inventory_parts_suite" suite = gx.ExpectationSuite(name=expectation_suite_name) # 添加期望 suite.add_expectation( gx.expectations.ExpectColumnValuesToNotBeNull(column="inventory_id") ) suite.add_expectation( gx.expectations.ExpectColumnValuesToBeUnique(column="part_num") ) # 将期望套件添加到上下文中 context.suites.add(suite)
您可以在期望画廊中浏览所有可用的期望。我们鼓励您再添加一些!
定义期望后,GX 输出期望套件配置:
{ "name": "inventory_parts_suite", "id": "b2de0b69-0869-4163-8dde-6c09884483f7", "expectations": [ { "type": "expect_column_values_to_not_be_null", "kwargs": { "column": "inventory_id" }, "meta": {}, "id": "53d6c42a-d190-412f-a113-783b706531f4" }, { "type": "expect_column_values_to_be_unique", "kwargs": { "column": "part_num" }, "meta": {}, "id": "362a2bdc-616d-4b3a-b7f0-c73808caee78" } ], "meta": { "great_expectations_version": "1.2.4" }, "notes": null }
该套件包括以下详细信息:
- 套件名称和 ID:一个唯一的名称(
inventory_parts_suite
)和标识符,以跟踪和管理该套件。 - 期望: 每个规则指定:
- 检查的类型(例如,确保列没有空值或唯一条目)。
- 参数,如被验证的列。
- 元数据和每个期望的唯一ID,便于跟踪和定制。
- 元数据:Great Expectations的版本信息,确保与该工具兼容。
- 注意: 用于添加关于套件的描述性评论(可选)。
这种结构化输出既作为文档,又作为可重复使用的配置,用于验证数据集,以便清晰定义您的期望,可追踪,并准备好供将来使用。
5. 验证数据
最后,根据定义的期望验证批处理并评估结果。
# 根据套件验证数据 validation_results = batch.validate(suite) # 评估结果 print(validation_results)
运行验证后,Great Expectations 将提供详细报告,指出数据集是否符合定义的期望:
{ "success": false, "results": [ { "success": true, "expectation_config": { "type": "expect_column_values_to_not_be_null", "kwargs": { "batch_id": "inventory_parts-inventory_parts_asset", "column": "inventory_id" }, "meta": {}, "id": "53d6c42a-d190-412f-a113-783b706531f4" }, "result": { "element_count": 580069, "unexpected_count": 0, "unexpected_percent": 0.0, "partial_unexpected_list": [], "partial_unexpected_counts": [], "partial_unexpected_index_list": [] }, "meta": {}, "exception_info": { "raised_exception": false, "exception_traceback": null, "exception_message": null } }, { "success": false, "expectation_config": { "type": "expect_column_values_to_be_unique", "kwargs": { "batch_id": "inventory_parts-inventory_parts_asset", "column": "part_num" }, "meta": {}, "id": "362a2bdc-616d-4b3a-b7f0-c73808caee78" }, "result": { "element_count": 580069, "unexpected_count": 568352, "unexpected_percent": 97.98006788847535, "partial_unexpected_list": [ "48379c01", "paddle", "11816pr0005", "2343", "3003", "30176", "3020", "3022", "3023", "30357", "3039", "3062b", "3068b", "3069b", "3069b", "33291", "33291", "3795", "3941", "3960" ], "missing_count": 0, "missing_percent": 0.0, "unexpected_percent_total": 97.98006788847535, "unexpected_percent_nonmissing": 97.98006788847535, "partial_unexpected_counts": [ { "value": "3069b", "count": 2 }, { "value": "33291", "count": 2 }, { "value": "11816pr0005", "count": 1 }, { "value": "2343", "count": 1 }, { "value": "3003", "count": 1 }, { "value": "30176", "count": 1 }, { "value": "3020", "count": 1 }, { "value": "3022", "count": 1 }, { "value": "3023", "count": 1 }, { "value": "30357", "count": 1 }, { "value": "3039", "count": 1 }, { "value": "3062b", "count": 1 }, { "value": "3068b", "count": 1 }, { "value": "3795", "count": 1 }, { "value": "3941", "count": 1 }, { "value": "3960", "count": 1 }, { "value": "48379c01", "count": 1 }, { "value": "paddle", "count": 1 } ], "partial_unexpected_index_list": [ 0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21 ] }, "meta": {}, "exception_info": { "raised_exception": false, "exception_traceback": null, "exception_message": null } } ], "suite_name": "inventory_parts_suite", "suite_parameters": {}, "statistics": { "evaluated_expectations": 2, "successful_expectations": 1, "unsuccessful_expectations": 1, "success_percent": 50.0 }, "meta": { "great_expectations_version": "1.2.4", "batch_spec": { "batch_data": "PandasDataFrame" }, "batch_markers": { "ge_load_time": "20241129T122532.416424Z", "pandas_data_fingerprint": "84a1e1939091fcf54324910def3b89cd" }, "active_batch_definition": { "datasource_name": "inventory_parts", "data_connector_name": "fluent", "data_asset_name": "inventory_parts_asset", "batch_identifiers": { "dataframe": "<DATAFRAME>" } } }, "id": null }
这份报告详细介绍了您的数据质量,突出了成功和失败之处。以下是结果的简要说明:
总体验证:验证结果部分成功:50% 的期望通过,50% 失败。失败的期望表示存在需要关注的数据质量问题。在这种情况下,一个列未符合定义的规则。
期望 1:inventory_id
不应缺少数值
- 结果:通过
- 解释:`inventory_id` 列中的每个数值都存在,没有空值或缺失条目。这表明该列具有良好的数据完整性。
期望 2: part_num
应具有唯一值
- 结果:失败
- 解释:
part_num
列包含97.98%的重复值,这意味着只有少数值是唯一的。 - 重点:
- 示例重复值包括“3069b”和“33291”。
- 该工具还显示这些重复值出现的频率及其行位置,使定位和修复问题变得更容易。
当然,这只是一个样本数据集,我们特意包含了一个通过和一个失败的预期,以便您可以看到验证结果。
就是这样!您已成功运行端到端数据验证。
将Great Expectations集成到数据管道中
在生产环境中,验证必须直接嵌入到工作流程中,以持续监视每个阶段的数据质量。
在本节中,我们将讨论如何将Great Expectations集成到您的数据管道中。
这些都是示例,供您参考,并且可能需要额外的配置。查阅每个工具的文档以获取最新的语法!
与ETL工具集成
将Great Expectations与流行的ETL工具(如Apache Airflow或Prefect)集成相对比较简单。直接嵌入验证步骤到ETL流程中,可以让您在数据影响下游分析之前实时捕获并处理数据问题。
让我们通过一个简单的示例来演示如何将Great Expectations与Prefect集成,作为自动化ETL工作流程的一部分运行数据验证:
from prefect import task, Flow import great_expectations as ge # 定义一个任务来运行Great Expectations验证 @task def validate_data(): context = ge.data_context.DataContext() batch_kwargs = {"path": "path/to/your/datafile.csv", "datasource": "your_datasource"} batch = context.get_batch(batch_kwargs, suite_name="your_expectation_suite") results = context.run_validation_operator("action_list_operator", assets_to_validate=[batch]) # 检查验证结果,如果验证失败则触发警报 if not results["success"]: raise ValueError("Data validation failed!") # 定义您的ETL流程 with Flow("ETL_with_GE_Validation") as flow: validation = validate_data() # 执行流程 flow.run()
在这个示例中,我们定义了一个Prefect流程,其中包含一个用于运行Great Expectations验证的任务。
validate_data()任务加载Great Expectations上下文,检索数据批次,并应用期望套件。
如果数据不符合验证标准,任务将引发警报,停止工作流程并防止下游错误。
持续数据验证
您可以使用各种工具安排验证作业,例如Unix系统上的cron作业或诸如Apache Airflow之类的托管服务。在这个示例中,我们将演示如何使用Airflow调度验证运行,Airflow非常适合编排数据管道。
以下是如何设置Airflow DAG(有向无环图),以便每天运行Great Expectations验证:
from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime import great_expectations as ge 定义DAG并设置每日运行计划 default_args = { 'owner': 'airflow', 'start_date': datetime(2024, 1, 1), 'retries': 1, } dag = DAG( 'great_expectations_validation', default_args=default_args, schedule_interval='@daily', # 每天运行一次 ) 定义运行验证的函数 def run_validation(): context = ge.data_context.DataContext() batch = context.get_batch(batch_kwargs, suite_name="your_expectation_suite") results = context.run_validation_operator("action_list_operator", assets_to_validate=[batch]) return results 在Airflow中设置任务 validation_task = PythonOperator( task_id='run_great_expectations_validation', python_callable=run_validation, dag=dag, ) 在DAG中设置任务 validation_task
在这个例子中,我们定义了一个DAG,安排每天运行一次验证(@daily
)。
run_validation()
函数通过加载Great Expectations上下文并运行定义的期望套件来执行验证。
使用Great Expectations进行数据验证的最佳实践
遵循最佳实践始终是扩展性和效率的最佳选择,对于使用Great Expectations进行数据验证也是如此。
从小处开始,循序渐进
从基础数据质量检查开始,并逐渐扩展。最好最初专注于基本期望,因为这有助于避免过度复杂化流程,使整合更顺畅,故障排除更容易。随着对数据集的理解提高,可以添加更复杂的验证。
跨团队合作
数据质量不仅是技术问题。跨业务团队合作定义期望,并确保实施的验证与基础业务逻辑和目标一致。这种跨职能的方法确保数据发挥预期作用,满足所有利益相关者的要求。
尽可能自动化
在可能的情况下自动化流程,将数据验证集成到数据管道中。集成自动化验证检查可实现对数据质量的持续监控,无需手动干预,显著提高效率。
结论
做得好!您已经学会了如何在Great Expectations中配置和验证数据。这些技术将有助于在工作流程中保持高数据质量和透明度。
要继续提升您的技能,请查阅以下资源:
- Python中的ETL和ELT:学习如何有效地转换和移动数据。
- 数据质量简介:探索数据质量管理的基本原理。
- Python中的数据清洗:掌握数据清洗技术,确保准确性和一致性。
- 数据质量维度速查表:一份关于数据质量维度的实用指南。
Source:
https://www.datacamp.com/tutorial/great-expectations-tutorial