Step 3: Pipeline integration

This tutorial covers integrating Great Expectations (GE) into a data pipeline.

We will continue the example we used in the previous section, where we created an expectation suite for the data asset npidata. That defined what we expect a valid batch of this data to look like.

Once our pipeline is deployed, it will process new batches of this data asset as they arrive. We will use Great Expectations to validate each batch and ensure it matches our expectations for the relevant component of our data application. Later in this document we will also cover validation operators, which allow you to provide more complex logic to accompany validation.

Just before calling the method that does the computation on a new batch, we call Great Expectations’ validate method. If the file does not pass validation, we can decide what to do: stop the pipeline, log a warning, send a notification, or perform some other custom action.

A Great Expectations DataContext describes data assets using a three-part namespace consisting of datasource_name, generator_name, and generator_asset.

To run validation for a data_asset, we need two additional elements:

  • a batch to validate; in our case it is a file loaded into a Pandas DataFrame

  • an expectation_suite to validate against

[Figure: the data asset namespace]

Get a DataContext object

A DataContext represents a Great Expectations project. It organizes storage and access for expectation suites, datasources, notification settings, and data fixtures. The DataContext is configured via a yml file stored in a directory called great_expectations. The configuration file and managed expectation suites should be stored in version control.

Obtaining a DataContext object gets us access to these resources after the object reads its configuration file:

import great_expectations as ge
context = ge.data_context.DataContext()

The DataContext constructor takes an optional parameter specifying the path to the great_expectations.yml file; by default, it searches the current directory and locations relative to known Great Expectations project file paths.

To read more, see: DataContexts.

Set a Run Id

A run_id links together validations of different data assets, making it possible to track “runs” of a pipeline and follow data assets as they are transformed, joined, annotated, enriched, or evaluated. The run id can be any string; by default, Great Expectations will use an ISO 8601-formatted UTC datetime string.

The Great Expectations DataContext object uses the run id to determine when Evaluation Parameters should be linked between data assets. After each validation completes, the DataContext identifies and stores any validation results that are referenced by other data asset expectation suites managed by the context. Then, when a batch of data is validated against one of those expectation suites, with the same run id, the context will automatically insert the relevant parameter from the validation result. For example, if a batch of the node_2 data_asset expects the number of unique values in its id column to equal the number of distinct values in the id column from node_1, we need to provide the same run_id to ensure that parameter can be passed between validations.

See DataContext Evaluation Parameter Store for more information.
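To make the role of the run id concrete, here is a minimal toy model of the idea, not Great Expectations' actual store. The class ToyEvaluationParameterStore, the parameter name "node_1.id.distinct_count", and the sample values are all hypothetical; the only point illustrated is that a value recorded during one validation can be looked up by a later validation that shares the same run id.

```python
from collections import defaultdict


class ToyEvaluationParameterStore:
    """Toy stand-in (hypothetical): keeps observed values per run_id."""

    def __init__(self):
        # Maps run_id -> {parameter_name: observed_value}
        self._params = defaultdict(dict)

    def store(self, run_id, name, value):
        self._params[run_id][name] = value

    def get(self, run_id, name):
        # Raises KeyError if nothing was recorded under this run_id.
        return self._params[run_id][name]


store = ToyEvaluationParameterStore()
run_id = "2019-09-08T120000Z"

# Validating node_1 records the observed number of distinct ids.
store.store(run_id, "node_1.id.distinct_count", 3)

# Validating node_2 with the same run_id can read that value back.
node_2_ids = ["a", "b", "c"]
expected = store.get(run_id, "node_1.id.distinct_count")
node_2_ok = len(set(node_2_ids)) == expected
```

If node_2 were validated under a different run id, the lookup in this toy model would fail, which is why both validations need to share one.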

The default run_id generated by Great Expectations is built using the following code:

import datetime
run_id = datetime.datetime.utcnow().isoformat().replace(":", "") + "Z"
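In a pipeline, it can help to wrap that expression in a small helper so that the id is created once and reused for every validation in the run. The function name make_run_id and its optional now argument are assumptions made for this sketch; they are not part of Great Expectations.

```python
import datetime


def make_run_id(now=None):
    """Return a run id in the default format shown above (hypothetical helper)."""
    if now is None:
        # Naive UTC timestamp, equivalent to datetime.datetime.utcnow().
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return now.isoformat().replace(":", "") + "Z"


# Generate the id once per pipeline run and pass it to every validation.
run_id = make_run_id()

# A fixed timestamp makes the format easy to see.
example = make_run_id(datetime.datetime(2019, 9, 8, 12, 30, 45))
print(example)  # 2019-09-08T123045Z
```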

Choose a Data Asset and Expectation Suite

We called our data asset npidata and created an expectation suite called warning. When there is no ambiguity, the DataContext will be able to infer the fully-normalized name for our data asset (data__dir/default/npidata) from the generator_asset name that we provided, but we can also specify it completely. For our validation, we will link all of the parameters and supply them to great_expectations.

Usually, we do not need to obtain an expectation suite without an associated batch of data (see the next section), but if we do wish to obtain the expectation suite directly, the context makes that easy:

# This command will normalize the shortened data asset name to the full name "data__dir/default/npidata"
my_suite = context.get_expectation_suite("npidata", "warning")
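The name normalization described above can be pictured with the following sketch. It is only an illustration of the idea under simple assumptions, not the DataContext's real implementation: the function normalize_data_asset_name, the list of known assets, and the second asset name "claims" are hypothetical.

```python
def normalize_data_asset_name(name, known_assets):
    """Resolve a short name against (datasource, generator, generator_asset) tuples."""
    parts = tuple(name.split("/"))
    if len(parts) == 3:
        return parts  # already fully qualified
    # Compare the given parts with the tail of each known three-part name.
    matches = [asset for asset in known_assets if asset[-len(parts):] == parts]
    if len(matches) != 1:
        raise ValueError(
            "Cannot resolve {!r}: {} candidate(s)".format(name, len(matches))
        )
    return matches[0]


# Hypothetical assets known to the project.
known = [
    ("data__dir", "default", "npidata"),
    ("data__dir", "default", "claims"),
]

full_name = "/".join(normalize_data_asset_name("npidata", known))
print(full_name)  # data__dir/default/npidata
```

In this sketch, a short name that matches no asset or more than one asset raises an error, which corresponds to the ambiguous case where the full name must be given explicitly.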

Obtain a Batch to Validate

You can use the same approach to obtain a batch of data as we used in the create expectations notebook.

Further, for many integrations, it is possible to provide a reference to an existing batch of data, such as a Pandas or Spark DataFrame. In such cases, the InMemoryGenerator provides a straightforward mechanism to create a Great Expectations data batch from existing objects; that leaves management of information about the specific batch to your existing pipeline runner.

We can also use a partition_id to help a generator identify a particular batch of data and build all the required batch_kwargs from a short name:

batch_kwargs = context.build_batch_kwargs("npidata", "npidata_pfile_20190902-20190908")

Finally, we can explicitly build batch kwargs, which can be particularly useful for sqlalchemy datasources:

my_explicit_batch_kwargs = {
  "query": "select id, date, procedure_code from normalized_codes where date < '2019-07-01'"
}
batch = context.get_batch("my_source/my_generator/my_asset", "my_expectations", my_explicit_batch_kwargs)

Validate

Validation evaluates our expectations against the given batch and produces a report that describes observed values and any places where expectations are not met. To validate the batch of data, call the validate() method on the batch obtained from the DataContext:

validation_result = batch.validate(run_id=run_id)

Review Validation Results

As part of an integrated pipeline, we may take specific actions based on the result of the validation run. See Validation Results for more information about the validation_result object. A common pattern is to stop or issue a warning in the code:

if validation_result["success"]:
  logger.info("This file meets all expectations from a valid batch of {0:s}".format(str(data_asset_name)))
else:
  logger.warning("This file is not a valid batch of {0:s}".format(str(data_asset_name)))
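The snippet above only logs. If the pipeline should stop on a failed batch, one possible shape is sketched below. It assumes only that the result exposes a "success" key, as in the snippet above; the function handle_validation_result, the stop_on_failure flag, and the ValidationFailedError exception are hypothetical names introduced for illustration.

```python
import logging

logger = logging.getLogger(__name__)


class ValidationFailedError(Exception):
    """Raised to stop the pipeline when a batch fails validation (hypothetical)."""


def handle_validation_result(validation_result, data_asset_name, stop_on_failure=True):
    """Return True on success; on failure either raise or log a warning."""
    if validation_result["success"]:
        logger.info("This file is a valid batch of %s", data_asset_name)
        return True
    if stop_on_failure:
        raise ValidationFailedError(
            "This file is not a valid batch of {}".format(data_asset_name)
        )
    logger.warning("This file is not a valid batch of %s; continuing", data_asset_name)
    return False
```

Raising an exception lets the pipeline runner decide how to react (retry, skip the batch, or abort), while stop_on_failure=False keeps the warn-and-continue behavior.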

Validation Operators

Validation Operators and Actions make it possible to define collections of tasks that should be performed together after a validation. For example, we might store results (either on a local filesystem or in S3), send a Slack notification, and update data documentation. The default configuration performs each of those actions. See the Validation Operators And Actions Introduction for more information.

Below is the default configuration of a validation operator named perform_action_list_operator:

perform_action_list_operator:
  class_name: ActionListValidationOperator
  action_list:
    - name: store_validation_result
      action:
        class_name: StoreAction
    - name: store_evaluation_params
      action:
        class_name: ExtractAndStoreEvaluationParamsAction
    - name: update_data_docs
      action:
        class_name: UpdateDataDocsAction
    - name: send_slack_notification_on_validation_result
      action:
        class_name: SlackNotificationAction
        # put the actual webhook URL in the uncommitted/config_variables.yml file
        slack_webhook: ${validation_notification_slack_webhook}
        notify_on: all # possible values: "all", "failure", "success"
        renderer:
          module_name: great_expectations.render.renderer.slack_renderer
          class_name: SlackRenderer
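Conceptually, an action list is an ordered sequence of named steps that each receive the validation result. The sketch below is a toy model of that pattern in plain Python, not the ActionListValidationOperator implementation; run_action_list and the three stand-in action functions are hypothetical.

```python
def run_action_list(validation_result, actions):
    """Call each (name, action) pair in order and collect what each returns."""
    outcomes = {}
    for name, action in actions:
        outcomes[name] = action(validation_result)
    return outcomes


stored = []  # stands in for a validations store


def store_result(result):
    stored.append(result)
    return "stored"


def update_docs(result):
    return "docs updated"


def notify(result):
    return "notified: " + ("success" if result["success"] else "failure")


actions = [
    ("store_validation_result", store_result),
    ("update_data_docs", update_docs),
    ("send_notification", notify),
]

outcomes = run_action_list({"success": False}, actions)
```

Keeping the steps in a list mirrors the YAML above: adding, removing, or reordering an action is a configuration change and does not require a change to the code that runs the validation.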

Save Validation Results

The DataContext object provides a configurable validations_store where GE can store validation_result objects for subsequent evaluation and review. By default, the DataContext stores results in the great_expectations/uncommitted/validations directory. To specify a different directory or use a remote store such as S3, edit the stores section of the DataContext configuration object:

stores:
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: FixedLengthTupleS3Backend
      bucket: my_bucket
      prefix: my_prefix

Validation results will be stored according to the same hierarchical namespace used to refer to data assets elsewhere in the context, and will have the run_id prepended: base_location/run_id/datasource_name/generator_name/generator_asset/expectation_suite_name.json.
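As an illustration of that layout, the following sketch assembles a result location from its parts. The helper validation_result_key is hypothetical and only mirrors the pattern stated above; the argument values are the example names used in this tutorial plus an example run id.

```python
import posixpath


def validation_result_key(base_location, run_id, datasource_name,
                          generator_name, generator_asset, expectation_suite_name):
    """Join the namespace parts into a storage location (hypothetical helper)."""
    return posixpath.join(
        base_location,
        run_id,
        datasource_name,
        generator_name,
        generator_asset,
        expectation_suite_name + ".json",
    )


key = validation_result_key(
    "my_prefix", "2019-09-08T123045Z", "data__dir", "default", "npidata", "warning"
)
print(key)  # my_prefix/2019-09-08T123045Z/data__dir/default/npidata/warning.json
```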

Removing the validations_store section from the configuration object will disable automatically storing validation_result objects.

Send a Slack Notification

The last action in the action list of the Validation Operator above sends a notification to a Slack webhook based on the validation result, using the configured renderer to format the message.

- name: send_slack_notification_on_validation_result
  action:
    class_name: SlackNotificationAction
    # put the actual webhook URL in the uncommitted/config_variables.yml file
    slack_webhook: ${validation_notification_slack_webhook}
    notify_on: all # possible values: "all", "failure", "success"
    renderer:
      module_name: great_expectations.render.renderer.slack_renderer
      class_name: SlackRenderer

GE includes a Slack-based notification in the base package. To enable Slack notifications for validation results, specify the Slack webhook URL in the uncommitted/config_variables.yml file:

validation_notification_slack_webhook: https://slack.com/your_webhook_url
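The notify_on setting can be read as a simple filter on the validation outcome. The sketch below shows one way that decision and a minimal message body might look. It is a simplified illustration, not the SlackNotificationAction or SlackRenderer code: should_notify and build_payload are hypothetical helpers, and the payload uses only the plain "text" field that Slack incoming webhooks accept.

```python
import json


def should_notify(notify_on, success):
    """Decide whether to send a message for this outcome."""
    if notify_on == "all":
        return True
    if notify_on == "success":
        return success
    if notify_on == "failure":
        return not success
    raise ValueError("notify_on must be 'all', 'failure', or 'success'")


def build_payload(validation_result, data_asset_name):
    """Build a minimal JSON body for a Slack incoming webhook."""
    status = "Success" if validation_result["success"] else "Failed"
    text = "Validation of {}: {}".format(data_asset_name, status)
    return json.dumps({"text": text})


result = {"success": False}
if should_notify("failure", result["success"]):
    payload = build_payload(result, "npidata")
    # Sending is left out of this sketch; the body would be POSTed to the webhook URL.
```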
