Great Expectations (GX) is an open-source, Python-based data validation framework. You can test your data by expressing what you “expect” from it as simple declarative statements in JSON or YAML, then run validations using those Expectation Suites against SQL data, filesystem data, or a pandas DataFrame. The Great Expectations Airflow Provider gives users a convenient method for running validations directly from their DAGs.
This tutorial shows how to use the GreatExpectationsOperator in an Airflow DAG, leveraging automatic creation of a default Checkpoint and connecting via an Airflow connection.
This tutorial takes approximately 20 minutes to complete.
To get the most out of this tutorial, make sure you have an understanding of:
great_expectations package.
To use GX with Airflow, install the Great Expectations Airflow Provider in your Astro project.
Create a new Astro project:
Add the GX Airflow provider to your Astro project requirements.txt file.
Add the following GX dependency to your Astro project packages.txt file.
The Great Expectations Airflow Provider requires a GX project to be present in your Airflow environment. The easiest way to create a GX project is by using the great_expectations package and following the steps below. If you cannot install the GX package locally, you can copy the great_expectations folder from this GitHub repository into your Astro project include folder instead and continue this tutorial at Step 3.
In your include folder, create a new file called gx_init_script.py. Then, copy and paste the following code into the file:
Go to your include folder and run the gx_init_script.py file to instantiate a Data Context at include/great_expectations.
Create a new file in your include/great_expectations/expectations folder called strawberry_suite.json. Then, copy and paste the following code into the file:
This JSON file defines an Expectation Suite containing one expectation of the type expect_table_row_count_to_be_between. This expectation will check that the number of rows in a table is between 2 and 1000.
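Because the suite is plain JSON, it can also be generated programmatically. The following is a minimal sketch using only the Python standard library. The field names (expectation_suite_name, expectations, expectation_type, kwargs, meta) follow the suite layout used by pre-1.0 GX releases and are an assumption here, so compare them with a suite written by your installed GX version. The row_count_in_bounds helper is hypothetical and only illustrates what the expectation checks.

```python
import json


def build_strawberry_suite() -> dict:
    """Return a suite dict with one row-count expectation (assumed layout)."""
    return {
        "expectation_suite_name": "strawberry_suite",
        "expectations": [
            {
                "expectation_type": "expect_table_row_count_to_be_between",
                # Bounds from the tutorial: between 2 and 1000 rows.
                "kwargs": {"min_value": 2, "max_value": 1000},
                "meta": {},
            }
        ],
        "meta": {},
    }


def row_count_in_bounds(row_count: int, suite: dict) -> bool:
    """Plain-Python illustration of the check; bounds are treated as inclusive."""
    kwargs = suite["expectations"][0]["kwargs"]
    return kwargs["min_value"] <= row_count <= kwargs["max_value"]


suite = build_strawberry_suite()
suite_json = json.dumps(suite, indent=4)
print(suite_json)
```

The printed text could be saved as strawberry_suite.json in the expectations folder, provided the layout matches what your GX version expects.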
You should now have the following file structure in your include folder:
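A quick way to confirm the layout is a small check script. The sketch below looks only for the two files named in this tutorial (great_expectations.yml and strawberry_suite.json) and ignores everything else in the GX project. The helper name is hypothetical, and the script assumes it is run from the Astro project root.

```python
from pathlib import Path

# Paths taken from the tutorial text; other files in the project are not checked.
EXPECTED_FILES = (
    "great_expectations/great_expectations.yml",
    "great_expectations/expectations/strawberry_suite.json",
)


def missing_gx_files(include_dir: str) -> list[str]:
    """Return the expected GX files that do not exist under include_dir."""
    root = Path(include_dir)
    return [rel for rel in EXPECTED_FILES if not (root / rel).is_file()]


if __name__ == "__main__":
    missing = missing_gx_files("include")
    print("GX project looks complete" if not missing else f"Missing: {missing}")
```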
Run astro dev start to start your Astro project.
The easiest way to use GX with Airflow is to let the GreatExpectationsOperator create a default Checkpoint and Datasource based on an Airflow connection. To set up a connection to a Postgres database, complete the following steps:
In the Airflow UI at localhost:8080, go to Admin -> Connections and click +.
Create a new connection named postgres_default using the following information:
Connection Id: postgres_default.
Connection Type: Postgres.
Host: <your postgres host>.
Login: <your postgres username>.
Password: <your postgres password>.
Schema: postgres. Note that this is the name of the PostgreSQL database, not of the schema in the database. See the PostgreSQL provider documentation.
Port: <your postgres port>.
Click Save.
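As an alternative to the UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a connection URI. The sketch below builds such a URI with the standard library. The host, username, and password are placeholder values, the default port 5432 is the usual PostgreSQL default rather than something this tutorial specifies, and the helper name is hypothetical.

```python
from urllib.parse import quote


def postgres_conn_uri(host: str, login: str, password: str,
                      database: str = "postgres", port: int = 5432) -> str:
    """Build an Airflow-style connection URI for a Postgres database."""
    # Percent-encode credentials so characters like "@" or "/" stay unambiguous.
    user = quote(login, safe="")
    secret = quote(password, safe="")
    return f"postgres://{user}:{secret}@{host}:{port}/{database}"


# Placeholder values; replace them with your own connection details.
uri = postgres_conn_uri("my-host.example.com", "my_user", "p@ss/word")
print(f"AIRFLOW_CONN_POSTGRES_DEFAULT={uri}")
```

The printed line could be placed in the environment of your Airflow deployment. Connections defined this way are typically not listed in the Airflow UI.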
Create a new file in your dags folder called gx_tutorial.py.
Copy and paste the following DAG code into the file:
This DAG will create a table in your Postgres database, run a GX validation on the table, and then drop the table.
The data in the table is validated using the GreatExpectationsOperator (GXO). The operator automatically creates a default Checkpoint and Datasource based on the postgres_default connection and runs the Expectations defined in the strawberry_suite.json file on the strawberries table. Instead of using the schema parameter of the GXO, you can provide the schema name as part of the data_asset_name parameter in the form my_schema_name.my_table_name.
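The two ways of naming the schema can be compared side by side. This is a minimal sketch that only builds the keyword arguments as a plain dictionary, so it runs without Airflow installed. In a DAG file the dictionary would be unpacked into GreatExpectationsOperator(**kwargs). The task_id value, the public schema used in the usage lines, the relative data context path, and the expectation_suite_name parameter are assumptions that the text above does not spell out.

```python
def gx_operator_kwargs(table, schema=None, use_schema_param=True):
    """Keyword arguments for a GreatExpectationsOperator task (sketch)."""
    kwargs = {
        "task_id": f"gx_validate_{table}",             # hypothetical task name
        "conn_id": "postgres_default",                 # Airflow connection from the tutorial
        "data_context_root_dir": "include/great_expectations",  # adjust to your project
        "expectation_suite_name": "strawberry_suite",  # assumed parameter name
        "return_json_dict": True,                      # push a JSON-friendly dict to XCom
    }
    if schema and use_schema_param:
        # Option 1: pass the schema separately.
        kwargs["schema"] = schema
        kwargs["data_asset_name"] = table
    elif schema:
        # Option 2: fold the schema into the asset name.
        kwargs["data_asset_name"] = f"{schema}.{table}"
    else:
        kwargs["data_asset_name"] = table
    return kwargs


print(gx_operator_kwargs("strawberries", schema="public"))
print(gx_operator_kwargs("strawberries", schema="public", use_schema_param=False))
```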
Open Airflow at http://localhost:8080/. Run the DAG manually by clicking the play button.
By default, the GreatExpectationsOperator pushes a CheckpointResult object to XCom. You can instead return a JSON-serializable dictionary by setting the return_json_dict parameter to True.
If you do not want to use this built-in serialization, you can either enable XCom pickling by setting the environment variable AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True, or use a custom serialization method in a custom XCom backend.
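To illustrate why serialization matters here, the sketch below uses a hypothetical stand-in class rather than the real CheckpointResult. A plain dictionary passes through json.dumps, while an arbitrary Python object does not and would need pickling or a custom backend.

```python
import json


class FakeCheckpointResult:
    """Hypothetical stand-in for a rich result object; not the GX class."""

    def __init__(self, success: bool):
        self.success = success

    def to_json_dict(self) -> dict:
        # Reduce the object to JSON-friendly built-in types.
        return {"success": self.success}


def is_json_serializable(value) -> bool:
    """Return True if json.dumps accepts the value."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


result = FakeCheckpointResult(success=True)
print(is_json_serializable(result))                 # the object itself
print(is_json_serializable(result.to_json_dict()))  # its dictionary form
```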
The GreatExpectationsOperator is a versatile operator allowing you to integrate GX into your Airflow environment. This tutorial shows the simplest way of using the operator by letting it create a default Checkpoint and Datasource based on an Airflow connection. The GreatExpectationsOperator also allows you to pass in a CheckpointConfig object using the checkpoint_config parameter or a checkpoint_kwargs dictionary. You can also customize the Execution Engines and pass DataContextConfig objects by configuring Operator parameters.
For more examples, check out the Get Improved Data Quality Checks in Airflow with the Updated Great Expectations Operator blog post and the Airflow data quality demo repository.
The GreatExpectationsOperator is highly customizable so that expert GX users can plug in their own custom objects. This section explains some of the most commonly used parameters. Refer to the GX documentation for in-depth explanations of GX concepts.
When using the GreatExpectationsOperator, you must pass in one of the following parameters:
data_context_root_dir (str): The path to your GX project directory. This is the directory containing your great_expectations.yml file.
data_context_config (DataContextConfig): To use an in-memory Data Context, a DataContextConfig must be defined and passed to the operator, see also this example DataContextConfig.
Next, the operator will determine the Checkpoint and Datasource used to run your GX validations:
If no Checkpoint is specified, the operator creates a default Checkpoint and Datasource from the conn_id you pass in and runs the given Expectation Suite. This is the simplest way to use the operator and what is shown in this tutorial.
To run a Checkpoint that already exists in your CheckpointStore, pass its name to the checkpoint_name parameter. The CheckpointStore is often in the great_expectations/checkpoints/ path, so that checkpoint_name = "strawberries.pass.chk" would reference the file great_expectations/checkpoints/strawberries/pass/chk.yml.
To define a Checkpoint in code, pass a CheckpointConfig object to checkpoint_config or a dictionary of your checkpoint config to checkpoint_kwargs. checkpoint_kwargs can also be used to specify additional overwriting configurations. See the example CheckpointConfig.
The Datasource can also be a pandas DataFrame, as shown in Running GX validations on pandas DataFrames. Depending on how you define your Datasource, the data_asset_name parameter has to be adjusted:
If a pandas DataFrame is passed, the data_asset_name parameter can be any name that will help you identify the DataFrame.
If a conn_id is supplied, the data_asset_name must be the name of the table the Expectation Suite runs on.
By default, a Great Expectations task runs validations and raises an AirflowException if any of the tests fail. To override this behavior and continue running the pipeline even if tests fail, set the fail_task_on_validation_failure flag to False.
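The effect of the fail_task_on_validation_failure flag can be pictured with the following sketch. It is not the provider's code: ValidationFailed is a hypothetical stand-in for AirflowException so that the example runs without Airflow, and the validation result is reduced to a dictionary with a success key.

```python
class ValidationFailed(Exception):
    """Hypothetical stand-in for AirflowException."""


def handle_validation_result(result: dict,
                             fail_task_on_validation_failure: bool = True) -> dict:
    """Raise on a failed validation unless the flag is set to False."""
    if not result["success"] and fail_task_on_validation_failure:
        raise ValidationFailed("Validation with Great Expectations failed.")
    # Either the validation passed or failures are tolerated: return the result
    # so that downstream tasks can inspect it.
    return result


# With the flag set to False, a failed validation no longer stops the pipeline.
print(handle_validation_result({"success": False}, fail_task_on_validation_failure=False))
```

With the flag set to False, downstream tasks still receive the result and can decide how to react to failed expectations.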
In Astronomer or any environment with OpenLineage configured, the GreatExpectationsOperator will automatically add the OpenLineage action to its default action list when a Checkpoint is not specified to the operator. To turn off this feature, set the use_open_lineage parameter to False.
For a full list of parameters, see the Great Expectations provider repository. For more information about the parameters and examples, see the README in the provider repository.
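The checkpoint_name convention mentioned in this section, where dots in the name map to subfolders of the CheckpointStore, can be sketched with pathlib. The helper name is hypothetical, and the great_expectations/checkpoints base path is only the common location named above, not a guarantee for every project.

```python
from pathlib import PurePosixPath


def checkpoint_file(checkpoint_name: str,
                    store_dir: str = "great_expectations/checkpoints") -> PurePosixPath:
    """Map a dotted checkpoint name to its YAML file in the CheckpointStore."""
    parts = checkpoint_name.split(".")
    # Leading parts become folders; the last part becomes the .yml file name.
    return PurePosixPath(store_dir, *parts[:-1], parts[-1] + ".yml")


print(checkpoint_file("strawberries.pass.chk"))
```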
The GreatExpectationsOperator can also be used to run validations on CSV files by passing them in as a pandas DataFrame. This pattern is useful to test pipelines locally with small amounts of data. Note that the execution_engine parameter needs to be adjusted.
The GreatExpectationsOperator can run a Checkpoint on a dataset stored in any backend compatible with GX. This includes BigQuery, MSSQL, MySQL, PostgreSQL, Redshift, Snowflake, SQLite, and Athena, among others. To point the GreatExpectationsOperator at an external dataset, set up an Airflow connection to the Datasource and set the conn_id parameter. Connections will still work if you have your connection configured in both Airflow and Great Expectations, as long as the correct Datasource is specified in the Checkpoint passed to the operator.