How to Use Great Expectations in AWS Glue

This guide demonstrates how to set up, initialize, and run validations against your data in an AWS Glue Spark job. It covers the case of a RuntimeDataConnector and uses S3 as the metadata store.

0. Prerequisites

  • Configure great_expectations.yaml and upload it to your S3 bucket, or generate it dynamically from code:
config_version: 3.0
datasources:
  spark_s3:
    module_name: great_expectations.datasource
    class_name: Datasource
    execution_engine:
      module_name: great_expectations.execution_engine
      class_name: SparkDFExecutionEngine
    data_connectors:
      default_inferred_data_connector_name:
        class_name: InferredAssetS3DataConnector
        bucket: bucket_name
        prefix: data_folder_prefix/
        default_regex:
          pattern: (.*)
          group_names:
            - data_asset_name
      default_runtime_data_connector_name:
        batch_identifiers:
          - runtime_batch_identifier_name
        module_name: great_expectations.datasource.data_connector
        class_name: RuntimeDataConnector

config_variables_file_path: great_expectations/uncommitted/config_variables.yml

plugins_directory: great_expectations/plugins/

stores:
  expectations_S3_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'path_from_root/great_expectations/expectations/'

  validations_S3_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'path_from_root/great_expectations/uncommitted/validations/'

  evaluation_parameter_store:
    class_name: EvaluationParameterStore

  checkpoint_S3_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'path_from_root/great_expectations/checkpoints/'

expectations_store_name: expectations_S3_store
validations_store_name: validations_S3_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_S3_store

data_docs_sites:
  s3_site:
    class_name: SiteBuilder
    show_how_to_buttons: false
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: bucket_name
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
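
If you prefer to generate the configuration from code rather than upload a file, the repetitive `stores` section is a natural place to start. The following is a minimal sketch, not the only way to do it: the helper names `s3_store` and `build_stores` are hypothetical, and the bucket and prefix values are the placeholders used in the configuration above.

```python
from typing import Any, Dict


def s3_store(class_name: str, bucket: str, prefix: str) -> Dict[str, Any]:
    """Return one store entry backed by TupleS3StoreBackend."""
    return {
        "class_name": class_name,
        "store_backend": {
            "class_name": "TupleS3StoreBackend",
            "bucket": bucket,
            "prefix": prefix,
        },
    }


def build_stores(bucket: str, root: str) -> Dict[str, Any]:
    """Build the `stores` section for a single bucket.

    `root` is a key prefix such as "path_from_root/great_expectations".
    """
    root = root.strip("/")
    return {
        "expectations_S3_store": s3_store(
            "ExpectationsStore", bucket, f"{root}/expectations/"
        ),
        "validations_S3_store": s3_store(
            "ValidationsStore", bucket, f"{root}/uncommitted/validations/"
        ),
        # This store has no S3 backend in the configuration above.
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
        "checkpoint_S3_store": s3_store(
            "CheckpointStore", bucket, f"{root}/checkpoints/"
        ),
    }


stores = build_stores("bucket_name", "path_from_root/great_expectations")
print(stores["expectations_S3_store"]["store_backend"]["prefix"])
```

The resulting dictionary has the same shape as the `stores` mapping in the YAML, so it can be used wherever the parsed file would supply that section.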

1. Install Great Expectations​

Add the following parameter to your AWS Glue Spark job parameters so that the great_expectations module is installed when the job starts. This requires Glue version 2.0 or later.

  --additional-python-modules great_expectations
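
If the job is defined from code instead of the console, the same parameter goes into the job's default arguments. The sketch below only builds that mapping; the helper name `glue_default_arguments` is hypothetical, and the commented boto3 call uses placeholder values for the job name, role, and script location.

```python
from typing import Dict, List


def glue_default_arguments(modules: List[str]) -> Dict[str, str]:
    """Build the arguments mapping that asks Glue to install extra Python modules."""
    # Glue takes a single comma-separated string for this parameter.
    return {"--additional-python-modules": ",".join(modules)}


args = glue_default_arguments(["great_expectations"])
print(args)

# Hypothetical usage with boto3 (all names below are placeholders):
# import boto3
# boto3.client("glue").create_job(
#     Name="gx-validation-job",
#     Role="MyGlueServiceRole",
#     Command={"Name": "glueetl", "ScriptLocation": "s3://bucket_name/scripts/job.py"},
#     GlueVersion="3.0",
#     DefaultArguments=args,
# )
```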

Then import the necessary libraries:

import boto3
import yaml
from awsglue.context import GlueContext
from pyspark.context import SparkContext

import great_expectations as gx
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    S3StoreBackendDefaults,
)

2. Set up Great Expectations​

Here we initialize the Spark and Glue contexts and read great_expectations.yaml from S3:

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
s3_client = boto3.client("s3")
response = s3_client.get_object(
    Bucket="bucket", Key="bucket/great_expectations/great_expectations.yml"
)
config_file = yaml.safe_load(response["Body"])
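
Because the next step indexes the parsed configuration by key, it can help to check up front that the expected top-level keys are present. This is an optional sketch: the `REQUIRED_KEYS` list and the `missing_keys` helper are assumptions made for illustration, not part of Great Expectations.

```python
from typing import Any, Iterable, List, Mapping

# Top-level keys that the example configuration in this guide defines.
REQUIRED_KEYS = (
    "config_version",
    "datasources",
    "stores",
    "expectations_store_name",
    "validations_store_name",
    "evaluation_parameter_store_name",
    "checkpoint_store_name",
    "data_docs_sites",
)


def missing_keys(
    config: Mapping[str, Any], required: Iterable[str] = REQUIRED_KEYS
) -> List[str]:
    """Return the required keys that are absent from the parsed configuration."""
    return [key for key in required if key not in config]


# Stand-in for the dictionary returned by yaml.safe_load.
example_config = {"config_version": 3.0, "datasources": {}, "stores": {}}
print(missing_keys(example_config))
```

In the Glue job you would call `missing_keys(config_file)` and stop early with a clear error if the list is not empty.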

3. Connect to your data​

# Build the Data Context configuration from the parsed YAML.
# validation_operators and anonymous_usage_statistics are not defined in the
# example great_expectations.yaml above, so read them with .get() to avoid a KeyError.
config = DataContextConfig(
    config_version=config_file["config_version"],
    datasources=config_file["datasources"],
    expectations_store_name=config_file["expectations_store_name"],
    validations_store_name=config_file["validations_store_name"],
    evaluation_parameter_store_name=config_file["evaluation_parameter_store_name"],
    plugins_directory="/great_expectations/plugins",
    validation_operators=config_file.get("validation_operators"),
    stores=config_file["stores"],
    data_docs_sites=config_file["data_docs_sites"],
    config_variables_file_path=config_file["config_variables_file_path"],
    anonymous_usage_statistics=config_file.get("anonymous_usage_statistics"),
    checkpoint_store_name=config_file["checkpoint_store_name"],
    store_backend_defaults=S3StoreBackendDefaults(
        default_bucket_name=config_file["data_docs_sites"]["s3_site"]["store_backend"][
            "bucket"
        ]
    ),
)
context_gx = BaseDataContext(project_config=config)

4. Create Expectations​

expectation_suite_name = "suite_name"
suite = context_gx.create_expectation_suite(expectation_suite_name)
batch_request = RuntimeBatchRequest(
    datasource_name="spark_s3",
    data_asset_name="datafile_name",
    batch_identifiers={"runtime_batch_identifier_name": "default_identifier"},
    # A RuntimeBatchRequest must target the RuntimeDataConnector from the configuration.
    data_connector_name="default_runtime_data_connector_name",
    runtime_parameters={"path": "s3a://bucket_name/path_to_file.format"},
)
validator = context_gx.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
print(validator.head())
# Add an example expectation.
validator.expect_column_values_to_not_be_null(column="passenger_count")
validator.save_expectation_suite(discard_failed_expectations=False)
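
A runtime batch request points at its data through `runtime_parameters`, either as a file path or as an in-memory object passed under `batch_data`, such as a Spark DataFrame already loaded in the job. The sketch below builds the keyword arguments for either case. The helper `runtime_batch_request_kwargs` is hypothetical, and the datasource, connector, and identifier names are the ones from the example configuration.

```python
from typing import Any, Dict, Optional


def runtime_batch_request_kwargs(
    data_asset_name: str,
    path: Optional[str] = None,
    batch_data: Any = None,
) -> Dict[str, Any]:
    """Return keyword arguments for RuntimeBatchRequest with exactly one data source."""
    if (path is None) == (batch_data is None):
        raise ValueError("Pass exactly one of `path` or `batch_data`.")
    if path is not None:
        runtime_parameters = {"path": path}
    else:
        runtime_parameters = {"batch_data": batch_data}
    return {
        "datasource_name": "spark_s3",
        "data_connector_name": "default_runtime_data_connector_name",
        "data_asset_name": data_asset_name,
        "batch_identifiers": {"runtime_batch_identifier_name": "default_identifier"},
        "runtime_parameters": runtime_parameters,
    }


kwargs = runtime_batch_request_kwargs(
    "datafile_name", path="s3a://bucket_name/path_to_file.format"
)
print(kwargs["runtime_parameters"])

# In the Glue job: batch_request = RuntimeBatchRequest(**kwargs)
```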

5. Validate your data​

checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
}

checkpoint = SimpleCheckpoint(
    f"_tmp_checkpoint_{expectation_suite_name}", context_gx, **checkpoint_config
)
results = checkpoint.run(result_format="SUMMARY", run_name="test")
validation_result_identifier = results.list_validation_result_identifiers()[0]
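
In a scheduled job you will usually want a failed validation to fail the job itself rather than pass silently. One possible approach is sketched below. It assumes the checkpoint result exposes a boolean `success` attribute; the exception class and helper name are hypothetical, and a stand-in object is used so the snippet runs without Great Expectations.

```python
from types import SimpleNamespace


class DataValidationError(RuntimeError):
    """Raised when a checkpoint run reports failed expectations."""


def raise_if_failed(results, suite_name: str) -> None:
    """Raise if the checkpoint result is not successful (assumes a `success` attribute)."""
    if not results.success:
        raise DataValidationError(f"Validation failed for suite '{suite_name}'")


# Stand-in for the object returned by checkpoint.run(...).
fake_results = SimpleNamespace(success=True)
raise_if_failed(fake_results, "suite_name")
print("validation passed")
```

In the Glue job, calling `raise_if_failed(results, expectation_suite_name)` after the checkpoint run would let the uncaught exception mark the job run as failed.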

6. Congratulations!​

Your Data Docs are now built on S3, and you can open index.html in the bucket.
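
To log where the Data Docs landed, you can derive the location of index.html from the site configuration. This is a small sketch under the assumption that the index is written at the root of the site's store backend, under its `prefix` if one is set; the helper name `data_docs_index_uri` is hypothetical.

```python
from typing import Any, Mapping


def data_docs_index_uri(site: Mapping[str, Any]) -> str:
    """Return the S3 URI of index.html for one entry of data_docs_sites."""
    backend = site["store_backend"]
    prefix = backend.get("prefix", "").strip("/")
    key = f"{prefix}/index.html" if prefix else "index.html"
    return f"s3://{backend['bucket']}/{key}"


# Same shape as data_docs_sites["s3_site"] in the example configuration.
s3_site = {
    "class_name": "SiteBuilder",
    "store_backend": {"class_name": "TupleS3StoreBackend", "bucket": "bucket_name"},
}
print(data_docs_index_uri(s3_site))
```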

This documentation has been contributed by Bogdan Volodarskiy from Provectus