Skip to main content

How to run a Checkpoint in Airflow

This guide will help you run a Great Expectations checkpoint in Apache Airflow, which allows you to trigger validation of a data asset using an Expectation Suite directly within an Airflow DAG.

Prerequisites: This how-to guide assumes you have:

Using checkpoints is the most straightforward way to trigger a validation run from within Airflow.

The following sections describe two alternative approaches to accomplishing this:

  1. Running a checkpoint with a BashOperator
  2. Running the checkpoint script output with a PythonOperator

Steps#

Option 1: Running a checkpoint with a BashOperator#

You can use a simple BashOperator in Airflow to trigger the checkpoint run. The following snippet shows an Airflow task for an Airflow DAG named dag that triggers the run of a checkpoint we named my_checkpoint:

validation_task = BashOperator(    task_id='validation_task',    bash_command='great_expectations --v3-api checkpoint run my_checkpoint',    dag=dag)

Option 2: Running the checkpoint script output with a PythonOperator#

Another option is to use the output of the great_expectations --v3-api checkpoint script command and paste it into a method that is called from a PythonOperator in the DAG. This gives you more fine-grained control over how to respond to Validation Results:

  1. Run great_expectations --v3-api checkpoint script

    great_expectations --v3-api checkpoint script my_checkpoint
    ...
    A Python script was created that runs the checkpoint named: `my_checkpoint`- The script is located in `great_expectations/uncommitted/my_checkpoint.py`- The script can be run with `python great_expectations/uncommitted/my_checkpoint.py`
  2. Navigate to the generated Python script and copy the content

  3. Create a method in your Airflow DAG file and call it from a PythonOperator:

    def run_checkpoint():    # paste content from the checkpoint script here
    task_run_checkpoint = PythonOperator(    task_id='run_checkpoint',    python_callable=run_checkpoint,    dag=dag,)

Additional Resources#

Please see How to configure a New Checkpoint using "test_yaml_config" for additional Checkpoint configuration and DataContext.run_checkpoint() examples.