
How to create a Batch of data from an in-memory Spark or Pandas dataframe or path

This guide will help you load the following as Batches for use in creating Expectations:

  1. Pandas DataFrames
  2. Spark DataFrames

What used to be called a “Batch” in the old API was replaced with Validator. A Validator knows how to Validate a particular Batch of data on a particular Execution Engine against a particular Expectation Suite. In interactive mode, the Validator can store and update an Expectation Suite while conducting Data Discovery or Exploratory Data Analysis.

Prerequisites: This how-to guide assumes you have:
  1. Load or create a Data Context

    The context referenced below can be loaded from disk or configured in code.

    First, import the necessary packages and modules.

    import pyspark
    from ruamel import yaml

    import great_expectations as gx
    from great_expectations import DataContext
    from great_expectations.core import ExpectationSuite
    from great_expectations.core.batch import RuntimeBatchRequest
    from great_expectations.data_context.util import file_relative_path
    from great_expectations.validator.validator import Validator

    Load an on-disk Data Context (i.e., from a great_expectations.yml configuration) via the get_context() function:

    context: DataContext = gx.get_context()

    If you are working in an environment without easy access to a local filesystem (e.g. AWS Spark EMR, Databricks, etc.), load an in-code Data Context using these instructions: How to instantiate a Data Context without a yml file

  2. Obtain an Expectation Suite

    If you have not already created an Expectation Suite, you can do so now.

    context.create_expectation_suite(
        expectation_suite_name="insert_your_expectation_suite_name_here"
    )

    The Expectation Suite can then be loaded into memory by using get_expectation_suite().

    suite: ExpectationSuite = context.get_expectation_suite(
        expectation_suite_name="insert_your_expectation_suite_name_here"
    )
  3. Construct a RuntimeBatchRequest

    We will create a RuntimeBatchRequest and pass it our Spark DataFrame or path via the runtime_parameters argument, under either the batch_data or the path key. The batch_identifiers argument is required and must be a non-empty dictionary containing all of the Batch Identifiers specified in your Runtime Data Connector configuration.

    If you are providing a filesystem path instead of a materialized DataFrame, you may use either an absolute or a relative path (relative to the current working directory). Under the hood, Great Expectations will instantiate a Spark DataFrame using the appropriate spark.read.* method, which is inferred from the file extension. If your file names do not have extensions, you can specify the reader method explicitly via the reader_method key of the batch_spec_passthrough argument. Any Spark reader options (e.g., delimiter or header) that are required to read your data properly can also be specified with the batch_spec_passthrough argument, in a dictionary nested under a key named reader_options.

    Here is an example Datasource configuration in YAML.

    datasource_yaml = """
    name: my_spark_datasource
    class_name: Datasource
    module_name: great_expectations.datasource
    execution_engine:
        module_name: great_expectations.execution_engine
        class_name: SparkDFExecutionEngine
    data_connectors:
        my_runtime_data_connector:
            class_name: RuntimeDataConnector
            batch_identifiers:
                - some_key_maybe_pipeline_stage
                - some_other_key_maybe_airflow_run_id
    """

    Save the configuration into your Data Context by using the add_datasource() method.

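    from ruamel.yaml import YAML

    # Parse the YAML string into a plain dict and register the Datasource
    context.add_datasource(**YAML(typ="safe").load(datasource_yaml))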

    If you have a file in the following location:

    path_to_file: str = "some_path.csv"

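    Then the file can be read as a Spark DataFrame (treating the first row as a header) using:

    from pyspark.sql import SparkSession

    spark_session = SparkSession.builder.getOrCreate()
    df: pyspark.sql.dataframe.DataFrame = spark_session.read.csv(path_to_file, header=True)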

    Here is a Runtime Batch Request using an in-memory DataFrame:

    runtime_batch_request = RuntimeBatchRequest(
        datasource_name="my_spark_datasource",
        data_connector_name="my_runtime_data_connector",
        data_asset_name="insert_your_data_asset_name_here",
        runtime_parameters={"batch_data": df},
        batch_identifiers={
            "some_key_maybe_pipeline_stage": "ingestion step 1",
            "some_other_key_maybe_airflow_run_id": "run 18",
        },
    )

    Here is a Runtime Batch Request using a path:

    runtime_batch_request = RuntimeBatchRequest(
        datasource_name="my_spark_datasource",
        data_connector_name="my_runtime_data_connector",
        data_asset_name="insert_your_data_asset_name_here",
        runtime_parameters={"path": path_to_file},
        batch_identifiers={
            "some_key_maybe_pipeline_stage": "ingestion step 1",
            "some_other_key_maybe_airflow_run_id": "run 18",
        },
    )
    Best Practice

    Though not strictly required, we recommend that you make every Data Asset Name unique. Choosing a unique Data Asset Name makes it easier to navigate quickly through Data Docs and ensures your logical Data Assets are not confused with any particular view of them provided by an Execution Engine.
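The Datasource configuration and the batch_identifiers rule from this step can also be expressed in plain Python. The sketch below is a minimal illustration under stated assumptions: datasource_config is a dict equivalent of the YAML above (we assume it could be passed as context.add_datasource(**datasource_config), the same way the parsed YAML is), and check_batch_identifiers is a hypothetical helper, not part of Great Expectations, that only mirrors the rule as this guide states it: a non-empty dictionary containing every configured Batch Identifier.

```python
from typing import Dict, List

# Python-dict equivalent of the YAML Datasource configuration above.
# Assumption: add_datasource() accepts these keys as keyword arguments,
# just as it does when the YAML string is parsed into a dict.
datasource_config: Dict = {
    "name": "my_spark_datasource",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine",
    },
    "data_connectors": {
        "my_runtime_data_connector": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_airflow_run_id",
            ],
        },
    },
}


def check_batch_identifiers(
    config: Dict, data_connector_name: str, batch_identifiers: Dict[str, str]
) -> None:
    """Hypothetical pre-flight check (not a Great Expectations API).

    Mirrors the rule stated in this guide: batch_identifiers must be a
    non-empty dict containing every key configured on the Runtime Data
    Connector. Raises ValueError otherwise.
    """
    if not batch_identifiers:
        raise ValueError("batch_identifiers must be a non-empty dictionary")
    configured: List[str] = config["data_connectors"][data_connector_name][
        "batch_identifiers"
    ]
    missing = [key for key in configured if key not in batch_identifiers]
    if missing:
        raise ValueError(f"missing batch identifiers: {missing}")


# Passes silently: both configured keys are supplied.
check_batch_identifiers(
    datasource_config,
    "my_runtime_data_connector",
    {
        "some_key_maybe_pipeline_stage": "ingestion step 1",
        "some_other_key_maybe_airflow_run_id": "run 18",
    },
)
```

Running a check like this before building the RuntimeBatchRequest surfaces a missing key early; Great Expectations may apply its own, possibly stricter, validation when the request is processed.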

  4. Construct a Validator

    my_validator: Validator = context.get_validator(
        batch_request=runtime_batch_request,
        expectation_suite=suite,  # OR
        # expectation_suite_name=suite_name
    )

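    Alternatively, instead of constructing a RuntimeBatchRequest in step 3, you may pass the same instantiation arguments, along with the Expectation Suite (or its name), directly to the get_validator() method. The Datasource must still be added to your Data Context as shown in step 3.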

    my_validator: Validator = context.get_validator(
        datasource_name="my_spark_datasource",
        data_connector_name="my_runtime_data_connector",
        data_asset_name="insert_your_data_asset_name_here",
        runtime_parameters={"path": path_to_file},
        batch_identifiers={
            "some_key_maybe_pipeline_stage": "ingestion step 1",
            "some_other_key_maybe_airflow_run_id": "run 18",
        },
        batch_spec_passthrough={
            "reader_method": "csv",
            "reader_options": {"delimiter": ",", "header": True},
        },
        expectation_suite=suite,  # OR
        # expectation_suite_name=suite_name
    )
  5. Check your data

    You can check that the first few lines of your Batch are what you expect by running:

    my_validator.head()

    Now that you have a Validator, you can use it to create Expectations or validate the data.

Additional Notes

To view the full scripts used on this page, see them on GitHub: