Deploying Great Expectations with Google Cloud Composer (Hosted Airflow)

This guide will help you deploy Great Expectations within an Airflow pipeline running on Google Cloud Composer.

Prerequisites: This how-to guide assumes you have already:

Steps

Note: These steps largely follow the Deploying Great Expectations with Airflow documentation, with some additions specific to Google Cloud Composer.

  1. Set up your Composer environment

    Create a Composer environment by following the instructions in the Composer documentation. Great Expectations >=0.12.1 currently supports Airflow >=1.10.6.

  2. Create Expectations

    Create Expectations using our guide to Creating and Editing Expectations.

    You can store your Expectations anywhere that is accessible to the Cloud Composer environment. One simple pattern is to use a folder in the bucket provided by the Composer environment. You can manually push updated expectation JSON files from your version-controlled repository with gsutil (as in the code snippet below) or through the GCS UI. Alternatively, you can automate this step with Google Cloud Build or any other automation tool.

    Read more about setting up expectation stores in GCS here.

    # copy expectation suites to bucket
    # where COMPOSER_GCS_BUCKET is an environment variable with the name of your bucket
    gsutil cp -r expectations/ gs://${COMPOSER_GCS_BUCKET}/great_expectations/
    
  3. Create your Data Context

    Since we'd prefer not to host a Data Context as a .yml file on the Airflow container filesystem, we instead instantiate it in a Python file that is either part of your DAG or imported by your DAG at runtime. Follow the guide How to instantiate a Data Context without a yml file and see the example below.

    Note: You may want to reference our Configuring metadata stores and Configuring Data Docs how-to guides. All of the stores in the example below are configured to use GCS; however, you can use whichever store suits your infrastructure.

    Important: If your Composer workflow includes spinning up/tearing down Composer environments and deleting the associated GCS bucket, you need to configure a separate bucket to persist your Great Expectations assets.

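    from great_expectations.data_context import BaseDataContext
    from great_expectations.data_context.types.base import DataContextConfig

    project_config = DataContextConfig(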
        config_version=2,
        plugins_directory=None,
        config_variables_file_path=None,
        datasources={
            "my_pandas_datasource": { # This is an example for a Pandas Datasource
                "data_asset_type": {
                    "class_name": "PandasDataset",
                    "module_name": "great_expectations.dataset",
                },
                "class_name": "PandasDatasource",
                "module_name": "great_expectations.datasource",
                "batch_kwargs_generators": {
                    # TODO: Enter these here or later in your validations
                },
            },
            "my_bigquery_datasource": { # This is an example for a BigQuery Datasource with a credentials file
                "data_asset_type": {
                    "module_name": "great_expectations.dataset",
                    "class_name": "SqlAlchemyDataset"
                },
                "class_name": "SqlAlchemyDatasource",
                "module_name": "great_expectations.datasource",
                "credentials": {
                    "url": "bigquery://REPLACE/ME/credentials.json" # TODO: replace with your value
                },
            }
        },
        stores={
            "expectations_GCS_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleGCSStoreBackend",
                    "project": "REPLACE ME",  # TODO: replace with your value
                    "bucket": "REPLACE ME",  # TODO: replace with your value
                    "prefix": "REPLACE ME",  # TODO: replace with your value
                },
            },
            "validations_GCS_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleGCSStoreBackend",
                    "project": "REPLACE ME",  # TODO: replace with your value
                    "bucket": "REPLACE ME",  # TODO: replace with your value
                    "prefix": "REPLACE ME",  # TODO: replace with your value
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
        },
        expectations_store_name="expectations_GCS_store",
        validations_store_name="validations_GCS_store",
        evaluation_parameter_store_name="evaluation_parameter_store",
        data_docs_sites={
            "gs_site": {
                "class_name": "SiteBuilder",
                "store_backend": {
                    "class_name": "TupleGCSStoreBackend",
                    "project": "REPLACE ME",  # TODO: replace with your value
                    "bucket": "REPLACE ME",  # TODO: replace with your value
                    "prefix": "REPLACE ME",  # TODO: replace with your value
                },
                "site_index_builder": {
                    "class_name": "DefaultSiteIndexBuilder",
                    "show_cta_footer": True,
                },
                "show_how_to_buttons": True,
            }
        },
        validation_operators={
            "action_list_operator": {
                "class_name": "ActionListValidationOperator",
                "action_list": [
                    {
                        "name": "store_validation_result",
                        "action": {"class_name": "StoreValidationResultAction"},
                    },
                    {
                        "name": "store_evaluation_params",
                        "action": {"class_name": "StoreEvaluationParametersAction"},
                    },
                    {
                        "name": "update_data_docs",
                        "action": {"class_name": "UpdateDataDocsAction"},
                    },
                ],
            }
        },
        anonymous_usage_statistics={
            "enabled": True
        },
    )
    
    context = BaseDataContext(project_config=project_config)
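
    The three GCS store backends in the configuration above differ only in their prefix, so they can be generated by a small helper. The following is a minimal sketch, not part of Great Expectations: the function names gcs_store_backend and gcs_stores, the environment variable names GE_GCP_PROJECT and GE_GCS_BUCKET, and the prefix layout under great_expectations/ are all assumptions chosen for illustration.

```python
import os


def gcs_store_backend(project, bucket, prefix):
    """Return a TupleGCSStoreBackend config dict for one store or site."""
    return {
        "class_name": "TupleGCSStoreBackend",
        "project": project,
        "bucket": bucket,
        "prefix": prefix,
    }


def gcs_stores(project, bucket, root_prefix="great_expectations"):
    """Build a `stores` mapping like the one above, one prefix per store."""
    return {
        "expectations_GCS_store": {
            "class_name": "ExpectationsStore",
            "store_backend": gcs_store_backend(
                project, bucket, f"{root_prefix}/expectations"
            ),
        },
        "validations_GCS_store": {
            "class_name": "ValidationsStore",
            "store_backend": gcs_store_backend(
                project, bucket, f"{root_prefix}/validations"
            ),
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
    }


# GE_GCP_PROJECT and GE_GCS_BUCKET are hypothetical variable names; the
# fallbacks keep the sketch runnable when they are not set.
stores = gcs_stores(
    project=os.environ.get("GE_GCP_PROJECT", "REPLACE ME"),
    bucket=os.environ.get("GE_GCS_BUCKET", "REPLACE ME"),
)
```

    The resulting dictionary could be passed as stores=stores to DataContextConfig, and gcs_store_backend could be reused with its own prefix for the Data Docs site. Reading the project and bucket from the environment keeps one DAG file usable across several Composer environments.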
    
  4. Create a DAG with Validations

    To create a DAG to run Validations with Great Expectations, follow the instructions for Running a Validation using a PythonOperator. Note that we will use the Data Context instantiated in the previous step in place of reading from the filesystem as in the linked example.

    You can raise an AirflowException if your Validation fails (as in the Running a Validation using a PythonOperator example). The failure then appears in the logs and in the Airflow UI, as in the image below:

Airflow pipeline with Validations passing and failing.
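
    As a rough illustration of the pattern described in this step, the sketch below validates one batch with the action_list_operator defined in the Data Context from step 3 and raises an AirflowException when validation fails. The function name validate_data and its parameters are assumptions for illustration; the fallback exception class exists only so the sketch can run outside an Airflow environment.

```python
try:
    from airflow.exceptions import AirflowException
except ImportError:
    # Fallback so the sketch can be exercised outside an Airflow environment.
    class AirflowException(Exception):
        pass


def validate_data(context, batch_kwargs, expectation_suite_name):
    """Validate one batch and fail the Airflow task if validation fails."""
    # Load the batch described by batch_kwargs against the named suite.
    batch = context.get_batch(batch_kwargs, expectation_suite_name)
    results = context.run_validation_operator(
        "action_list_operator",  # operator name from the Data Context config
        assets_to_validate=[batch],
    )
    if not results["success"]:
        # Raising marks the task as failed in the Airflow logs and UI.
        raise AirflowException(
            f"Validation failed for suite '{expectation_suite_name}'"
        )
```

    In a DAG, a function like this would typically be called from the python_callable of a PythonOperator, with a thin wrapper that supplies the Data Context, the batch kwargs, and the suite name.
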
  5. Upload your Expectations and DAG

    Upload your Expectations to your Expectation Store (as configured in your Data Context). If your Expectation Store is in your GCS bucket, you can use gsutil to upload the JSON files, as long as you keep the same directory structure. Alternatively, you can automate the upload with Google Cloud Build, GitHub Actions, or another CI tool.

    Upload your DAG files to the GCS bucket dags/ folder assigned to your Composer environment.
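
    If you prefer to script the upload in Python rather than with gsutil, the sketch below shows one way to preserve the directory structure. It is an illustration under stated assumptions: plan_uploads and upload are hypothetical helper names, the bucket name and prefixes are placeholders, and the upload step assumes the google-cloud-storage package is installed and credentials are already configured.

```python
from pathlib import Path


def plan_uploads(local_dir, gcs_prefix):
    """Map each file under local_dir to an object name under gcs_prefix,
    preserving the relative directory structure."""
    root = Path(local_dir)
    prefix = gcs_prefix.strip("/")
    plan = []
    for path in sorted(p for p in root.rglob("*") if p.is_file()):
        relative = path.relative_to(root).as_posix()
        object_name = f"{prefix}/{relative}" if prefix else relative
        plan.append((path, object_name))
    return plan


def upload(plan, bucket_name):
    """Upload planned files; requires the google-cloud-storage package."""
    from google.cloud import storage  # imported lazily, only needed here

    bucket = storage.Client().bucket(bucket_name)
    for local_path, object_name in plan:
        bucket.blob(object_name).upload_from_filename(str(local_path))
```

    For example, upload(plan_uploads("expectations", "great_expectations/expectations"), bucket_name) would mirror a local expectations/ folder, and upload(plan_uploads("dags", "dags"), bucket_name) would place DAG files in the dags/ folder, where bucket_name is the name of your Composer bucket. Separating the planning step from the upload makes it easy to print and review the object names before anything is written.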

  6. Monitor your deployment

    You can now monitor your deployment like any other Airflow environment, either through the Airflow UI (linked from your cloud platform environments page) or by submitting commands from Google Cloud Shell. If you used AirflowExceptions to handle failing Validations as in step 4, the failures will show up in your logs and in the Airflow UI.
