Connect GX Cloud and Airflow
In this quickstart, you'll learn how to use GX Cloud with Apache Airflow directly. You'll create a basic DAG that runs a Checkpoint in GX Cloud, and then trigger it through a local installation of an Airflow server.
For a more ergonomic interface and more flexibility, use the Great Expectations Airflow Provider maintained by Astronomer to validate data directly from a DAG.
Apache Airflow is an orchestration tool that allows you to schedule and monitor your data pipelines. For more information about Apache Airflow, see the Apache Airflow documentation.
Prerequisites
- You have a GX Cloud account.
- You have installed Apache Airflow and initialized the database (`airflow db init`); a minimal install sketch follows this list.
- You have connected GX Cloud to a Data Asset on a Data Source. (Note that this automatically creates the Checkpoint your DAG will run.)
- You have added Expectations.
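If you still need a local Airflow environment, the following is a minimal sketch of the install and database steps using pip in a virtual environment. It assumes a plain local setup; the Airflow documentation's recommended constraints file and exact commands may differ for your Airflow and Python versions.

```bash
# Create and activate an isolated environment for Airflow (assumed setup).
python -m venv airflow-venv
source airflow-venv/bin/activate

# Install Airflow plus the Great Expectations client that the DAG below imports.
# The Airflow docs recommend pinning with a constraints file; omitted here for brevity.
pip install apache-airflow great_expectations

# Initialize the Airflow metadata database.
airflow db init
```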
Run Airflow Standalone to create a fresh local Airflow environment
- The `airflow standalone` command initializes the database, creates a user, and starts all components:

  ```bash
  airflow standalone
  ```

  This command will eventually output a username and password for the Airflow UI like this:

  ```text
  standalone | Airflow is ready
  standalone | Login with username: admin password: Bpu6RgmPMMaDeeq5
  standalone | Airflow Standalone is for development purposes only. Do not use this in production!
  ```
- Access the Airflow UI: once the web server is running, open a web browser and go to http://localhost:8080 (the default) and sign in with the username and password from the previous step. To confirm from the terminal that the webserver is up, see the sketch after this list.
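Before opening a browser, you can optionally check that the webserver is responding from the terminal. A minimal sketch, assuming the default port 8080:

```bash
# Query the Airflow webserver's health endpoint (default port assumed).
# A healthy instance reports the metadatabase and scheduler status as JSON.
curl http://localhost:8080/health
```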
Create a DAG file for your GX Cloud Checkpoint
- Open a terminal, browse to the `airflow` folder in your home directory, and then run the following commands to create a new DAG named `gx_dag.py`:

  ```bash
  cd ~/airflow
  mkdir dags
  cd dags
  touch gx_dag.py
  ```
- Open the `gx_dag.py` DAG file and add the following code:

  ```python
  import os

  import pendulum

  import great_expectations as gx
  from airflow.decorators import dag, task


  @dag(
      schedule=None,
      start_date=pendulum.datetime(2024, 9, 1),
      catchup=False,
  )
  def gx_dag_with_deco():
      os.environ["NO_PROXY"] = "*"  # https://github.com/apache/airflow/discussions/24463
      print("Great Expectations DAG Started")

      @task
      def run_checkpoint():
          print("Running Checkpoint")

          # Replace <YOUR_ACCESS_TOKEN> and <YOUR_CLOUD_ORGANIZATION_ID> with your credentials.
          # You can also set GX_CLOUD_ACCESS_TOKEN and GX_CLOUD_ORGANIZATION_ID as environment variables.
          GX_CLOUD_ACCESS_TOKEN = "<YOUR_ACCESS_TOKEN>"
          GX_CLOUD_ORGANIZATION_ID = "<YOUR_CLOUD_ORGANIZATION_ID>"

          # Find the Checkpoint name in the GX Cloud UI:
          # - Go to the "Validations" tab.
          # - Next to the "Validate" button, click the code snippet icon.
          # - Click "Generate snippet".
          # - Copy the Checkpoint name from the code snippet and use it below.
          CHECKPOINT_NAME = "my_data_asset 123ABC - Default Checkpoint"

          context = gx.get_context(
              mode="cloud",
              cloud_organization_id=GX_CLOUD_ORGANIZATION_ID,
              cloud_access_token=GX_CLOUD_ACCESS_TOKEN,
          )
          checkpoint = context.checkpoints.get(CHECKPOINT_NAME)
          checkpoint.run()
          return f"Checkpoint ran: {CHECKPOINT_NAME}"

      run_checkpoint()


  run_this = gx_dag_with_deco()
  ```
- Save your changes and close the `gx_dag.py` DAG file. If you'd rather not hardcode credentials in the file, see the environment variable sketch after this list.
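As the comment in the DAG notes, the GX Cloud credentials can also come from environment variables, in which case you can typically call `gx.get_context(mode="cloud")` without passing them explicitly. A minimal sketch, assuming you start Airflow from the same shell and using placeholder values:

```bash
# Export GX Cloud credentials so Airflow tasks inherit them (placeholder values).
export GX_CLOUD_ACCESS_TOKEN="<YOUR_ACCESS_TOKEN>"
export GX_CLOUD_ORGANIZATION_ID="<YOUR_CLOUD_ORGANIZATION_ID>"

# Start (or restart) Airflow from this shell so the DAG's tasks see the variables.
airflow standalone
```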
Run the DAG (Manually)
- Restart the Airflow server to pick up the new DAG file.
- Sign in to Airflow using the username and password from the first standalone run.
- In the Actions column, click Trigger DAG for `gx_dag_with_deco` (the DAG ID defaults to the decorated function's name) and confirm your DAG runs as expected. A CLI alternative is sketched after this list.
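If you prefer the terminal, the Airflow CLI can trigger the same run. A minimal sketch, assuming the DAG ID `gx_dag_with_deco` from the file above:

```bash
# Unpause the DAG first; depending on your configuration, new DAGs may start paused.
airflow dags unpause gx_dag_with_deco

# Trigger a run, then list recent runs to check its state.
airflow dags trigger gx_dag_with_deco
airflow dags list-runs -d gx_dag_with_deco
```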
Clean up local Airflow environment
- Delete the local files and SQLite database:

  ```bash
  rm -rf ~/airflow
  ```