Great Expectations 教程:使用 Python 驗證數據

數據質量和一致性就像房屋的基礎一樣——沒有堅固的基礎,頂部構建的一切都有倒塌的風險。這就是數據驗證發揮重要作用的地方。數據驗證幫助您確保您的數據準確、一致且可靠。

Great Expectations是一個開源數據驗證工具,讓您能夠及早識別數據問題,確保您的數據符合所需的質量標準。

在本指南中,我們將帶您了解如何使用Great Expectations進行數據驗證,並通過實際的端到端示例幫助您入門!

什麼是Great Expectations?

Great Expectations(GX)是一個開源框架,已經在現代數據管道中管理和自動化數據驗證中變得流行起來。

這個基於Python的框架旨在幫助數據團隊確保其數據的質量和一致性。用戶可以定義“期望”——描述有效數據應該是什麼樣子的規則或測試,並自動驗證數據是否符合這些標準。

Great Expectations的一些好處包括:

  • 自動數據驗證 –  Great Expectations自動化了數據驗證過程,減少手動工作量,並最小化錯誤的風險。它確保數據始終符合預定標準。
  • 與數據管道的集成 – 它可以輕鬆集成各種數據來源和平台,包括SQL數據庫、雲存儲和ETL工具,可以在管道的不同階段進行數據驗證。
  • 清晰、可操作的驗證結果 – 這個工具提供透明的驗證結果,可以輕鬆發現數據質量問題並迅速解決。
  • 資料文件 – Great Expectations 可以生成詳細、易於訪問的資料驗證流程文件,幫助團隊對質量標準進行對齊,並為將來的使用提供參考。
  • 可擴展性和靈活性 – 作為一個開源工具,Great Expectations 高度可定制,可以根據您的資料驗證需求進行擴展,提供靈活性以適應各種用例,而無需高昂成本。

現在,讓我們來看一個端到端的例子!

設置 Great Expectations

在本教程中,您將學習如何使用GX Core,Great Expectations 的開源版本,來驗證 Pandas DataFrame。我們將逐步介紹設置上下文、註冊 Pandas 數據源、定義期望並驗證數據批次。

注意: 我們建議您跟隨DataLab notebook,但您也可以創建自己的 Python 腳本。

先決條件

  • 已安裝 Python 3.9 到 3.12。
  • 為了避免衝突,強烈建議您在虛擬環境中安裝Great Expectations(免責聲明:虛擬環境的設置超出本文範圍)。
  • 示例數據集。

註: 如果使用提供的DataLab筆記本,這些先決條件已經滿足。請隨意跳過它們。

使用以下命令通过pip安装GX:

pip install great_expectations

此命令将安装核心包及所有必要的依赖项。

Great Expectations需要一个数据上下文来管理配置。我们使用临时数据上下文以避免持久化配置。

import great_expectations as gx # 获取临时数据上下文 context = gx.get_context() assert type(context).__name__ == "EphemeralDataContext"

创建您的第一个数据验证套件

现在GX已经设置好了,让我们创建一个数据验证套件。

数据源将Great Expectations连接到您的数据,而数据资产代表数据的特定子集(例如表、DataFrame或文件)。

在這種情況下,我們將準備好一切來連接到名為inventory_parts_df的 DataFrame。提供的 DataLab 中有樣本數據集,一旦運行 SQL 塊,它就會被創建:

如果您不使用 DataLab,請使用示例數據創建您自己的 DataFrame。

現在,創建您的數據源和資產:

# 添加一個 Pandas 數據源 data_source = context.data_sources.add_pandas(name="inventory_parts") # 將數據資產添加到數據源 data_asset = data_source.add_dataframe_asset(name="inventory_parts_asset")

批次定義用於識別和組織您的數據以進行驗證。在這裡,我們添加一個涵蓋整個 DataFrame 的批次定義:

# 定義批次定義名稱 batch_definition_name = "inventory_parts_batch" # 添加批次定義 batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name) assert batch_definition.name == batch_definition_name

批次是與批次定義相關聯的數據集合。要驗證數據,您需要檢索並將批次鏈接到您的 DataFrame,在這個例子中是 inventory_parts_df

# 定義批次參數 batch_parameters = {"dataframe": inventory_parts_df} # 檢索批次 batch = batch_definition.get_batch(batch_parameters=batch_parameters)

期望是用於驗證數據的規則。在這個例子中,我們將定義以下簡單的期望:

  1. 確保 inventory_id 值非空。
  2. 確保 part_num 值唯一。
創建一個期望套件 expectation_suite_name = "inventory_parts_suite" suite = gx.ExpectationSuite(name=expectation_suite_name) 添加期望 suite.add_expectation( gx.expectations.ExpectColumnValuesToNotBeNull(column="inventory_id") ) suite.add_expectation( gx.expectations.ExpectColumnValuesToBeUnique(column="part_num") ) 將期望套件添加到上下文中 context.suites.add(suite)

您可以在期望庫中探索所有可用的期望。我們鼓勵您添加更多!

定義期望後,GX會輸出期望套件配置:

{ "name": "inventory_parts_suite", "id": "b2de0b69-0869-4163-8dde-6c09884483f7", "expectations": [ { "type": "expect_column_values_to_not_be_null", "kwargs": { "column": "inventory_id" }, "meta": {}, "id": "53d6c42a-d190-412f-a113-783b706531f4" }, { "type": "expect_column_values_to_be_unique", "kwargs": { "column": "part_num" }, "meta": {}, "id": "362a2bdc-616d-4b3a-b7f0-c73808caee78" } ], "meta": { "great_expectations_version": "1.2.4" }, "notes": null }

套件包括以下詳細信息:

  1. 套件名稱和ID:一個獨特的名稱(inventory_parts_suite)和標識符,用於跟踪和管理套件。
  2. 期望:每個規則都指定:
    • 檢查類型(例如,確保列沒有空值或唯一條目)。
    • 參數,如正在驗證的列。
    • 元數據和每個期望的唯一ID,以便更容易跟蹤和定制化。
  3. 元數據:Great Expectations的版本信息,確保與該工具的兼容性。
  4. 註: 用於添加關於套件的描述性評論的佔位符(可選)。

這種結構化的輸出既充當文檔,又充當可重複使用的配置,用於驗證您的數據集,從而清晰定義、可追踪並準備好供將來使用。

5. 驗證數據

最後,根據定義的期望驗證批次並評估結果。

# 根據套件驗證數據 validation_results = batch.validate(suite) # 評估結果 print(validation_results)

運行驗證後,Great Expectations將提供詳細報告,指出數據集是否符合定義的期望:

{ "success": false, "results": [ { "success": true, "expectation_config": { "type": "expect_column_values_to_not_be_null", "kwargs": { "batch_id": "inventory_parts-inventory_parts_asset", "column": "inventory_id" }, "meta": {}, "id": "53d6c42a-d190-412f-a113-783b706531f4" }, "result": { "element_count": 580069, "unexpected_count": 0, "unexpected_percent": 0.0, "partial_unexpected_list": [], "partial_unexpected_counts": [], "partial_unexpected_index_list": [] }, "meta": {}, "exception_info": { "raised_exception": false, "exception_traceback": null, "exception_message": null } }, { "success": false, "expectation_config": { "type": "expect_column_values_to_be_unique", "kwargs": { "batch_id": "inventory_parts-inventory_parts_asset", "column": "part_num" }, "meta": {}, "id": "362a2bdc-616d-4b3a-b7f0-c73808caee78" }, "result": { "element_count": 580069, "unexpected_count": 568352, "unexpected_percent": 97.98006788847535, "partial_unexpected_list": [ "48379c01", "paddle", "11816pr0005", "2343", "3003", "30176", "3020", "3022", "3023", "30357", "3039", "3062b", "3068b", "3069b", "3069b", "33291", "33291", "3795", "3941", "3960" ], "missing_count": 0, "missing_percent": 0.0, "unexpected_percent_total": 97.98006788847535, "unexpected_percent_nonmissing": 97.98006788847535, "partial_unexpected_counts": [ { "value": "3069b", "count": 2 }, { "value": "33291", "count": 2 }, { "value": "11816pr0005", "count": 1 }, { "value": "2343", "count": 1 }, { "value": "3003", "count": 1 }, { "value": "30176", "count": 1 }, { "value": "3020", "count": 1 }, { "value": "3022", "count": 1 }, { "value": "3023", "count": 1 }, { "value": "30357", "count": 1 }, { "value": "3039", "count": 1 }, { "value": "3062b", "count": 1 }, { "value": "3068b", "count": 1 }, { "value": "3795", "count": 1 }, { "value": "3941", "count": 1 }, { "value": "3960", "count": 1 }, { "value": "48379c01", "count": 1 }, { "value": "paddle", "count": 1 } ], "partial_unexpected_index_list": [ 0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21 ] }, "meta": {}, "exception_info": { "raised_exception": false, "exception_traceback": null, "exception_message": null } } ], "suite_name": "inventory_parts_suite", "suite_parameters": {}, "statistics": { "evaluated_expectations": 2, "successful_expectations": 1, "unsuccessful_expectations": 1, "success_percent": 50.0 }, "meta": { "great_expectations_version": "1.2.4", "batch_spec": { "batch_data": "PandasDataFrame" }, "batch_markers": { "ge_load_time": "20241129T122532.416424Z", "pandas_data_fingerprint": "84a1e1939091fcf54324910def3b89cd" }, "active_batch_definition": { "datasource_name": "inventory_parts", "data_connector_name": "fluent", "data_asset_name": "inventory_parts_asset", "batch_identifiers": { "dataframe": "<DATAFRAME>" } } }, "id": null }

此報告詳細說明了您的數據質量,突出了成功和失敗之處。以下是對結果的簡要解釋:

整體驗證:驗證結果部分成功:50%的期望通過,50%失敗。失敗的期望表示存在需要關注的數據質量問題。在這種情況下,一列未滿足定義的規則。

期望 1:inventory_id 不應該有缺失值

  • 結果: 通過
  • 解釋: inventory_id列中的每個值均存在,沒有空值或缺失項。這表示該列的數據完整性良好。

期望2: part_num應該具有唯一值

  • 結果:失敗
  • 解釋: part_num 欄包含了97.98% 的重複值,這意味著只有少數值是獨一無二的。
  • 亮點:
    • 例如重複值包括 “3069b” 和 “33291”。
    • 該工具還顯示這些重複值出現的頻率和它們的行位置,使得更容易找到並解決問題。

當然,這只是一個樣本數據集,我們特意包含了一個通過和一個未通過的期望,這樣您就可以看到驗證結果。

就是這樣!您已成功運行端到端的數據驗證。

將Great Expectations集成到數據管道中

在生產環境中,驗證必須直接嵌入到工作流程中,以持續監控每個階段的數據質量。

在本部分中,我們將討論如何將Great Expectations集成到您的數據管道中。

這些都是示例,以便讓您了解,可能需要其他配置。請查閱每個工具的文檔以獲取最新的語法!

與ETL工具整合

將Great Expectations與流行的ETL工具如Apache AirflowPrefect集成相對直接。將驗證步驟直接嵌入ETL流程將使您能夠在數據影響下游分析之前實時捕捉並解決數據問題。

讓我們通過一個簡單的示例來介紹如何將Great Expectations與Prefect集成,以便作為自動化ETL工作流程的一部分運行數據驗證:

from prefect import task, Flow import great_expectations as ge # 定義一個任務來運行Great Expectations驗證 @task def validate_data(): context = ge.data_context.DataContext() batch_kwargs = {"path": "path/to/your/datafile.csv", "datasource": "your_datasource"} batch = context.get_batch(batch_kwargs, suite_name="your_expectation_suite") results = context.run_validation_operator("action_list_operator", assets_to_validate=[batch]) # 檢查驗證結果,如果驗證失敗則發出警報 if not results["success"]: raise ValueError("Data validation failed!") # 定義您的ETL流程 with Flow("ETL_with_GE_Validation") as flow: validation = validate_data() # 執行流程 flow.run()

在這個示例中,我們定義了一個Prefect流程,其中包括運行Great Expectations驗證的任務。

validate_data() 任務加載 Great Expectations 上下文,檢索數據批次,並應用期望套件。

如果數據不符合驗證標準,該任務將發出警報,停止工作流程並防止下游錯誤。

持續數據驗證

您可以使用各種工具安排驗證任務,例如在基於 Unix 的系統上使用 cron 任務或像 Apache Airflow 這樣的管理服務。在這個例子中,我們將演示如何使用 Airflow 安排驗證運行,它非常適合協調數據管道。

以下是如何設置 Airflow DAG (有向無環圖) 以每天運行 Great Expectations 驗證:

from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime import great_expectations as ge 定義DAG並將排程設置為每天運行 default_args = { 'owner': 'airflow', 'start_date': datetime(2024, 1, 1), 'retries': 1, } dag = DAG( 'great_expectations_validation', default_args=default_args, schedule_interval='@daily', 每天運行一次 ) 定義運行驗證的函數 def run_validation(): context = ge.data_context.DataContext() batch = context.get_batch(batch_kwargs, suite_name="your_expectation_suite") results = context.run_validation_operator("action_list_operator", assets_to_validate=[batch]) return results 在Airflow中設置任務 validation_task = PythonOperator( task_id='run_great_expectations_validation', python_callable=run_validation, dag=dag, ) 在DAG中設置任務 validation_task

在這個示例中,我們定義了一個DAG,安排每天運行一次驗證(@daily)。

run_validation()函數通過加載Great Expectations上下文並運行對數據定義的期望套件來執行驗證。

使用Great Expectations進行數據驗證的最佳實踐

始終遵循最佳實踐以獲得可擴展性和效率,在使用Great Expectations進行數據驗證時也不例外。

從小開始並逐步迭代

從基礎數據質量檢查開始,逐步擴展。最好一開始專注於基本期望,這有助於避免使過程過於複雜化,從而實現更順暢的整合和更容易的故障排除。隨著對數據集的理解提升,可以添加更複雜的驗證。

跨團隊合作

數據質量不僅是技術問題。跨業務團隊合作,明確定義期望並確保實施的驗證與基礎業務邏輯和目標一致。這種跨功能方法確保數據能夠發揮其預期作用,並滿足所有利益相關者的要求。

盡可能自動化

在可行的情況下自動化流程,將數據驗證整合到數據管道中。整合自動化驗證檢查可實現對數據質量進行持續監控,無需手動干預,從而顯著提高效率。

結論

做得好!你已經學會如何在 Great Expectations 中配置和驗證數據。這些技術將有助於在你的工作流程中維持高數據質量和透明度。

為了繼續提升你的技能,請查看這些資源:

Source:
https://www.datacamp.com/tutorial/great-expectations-tutorial