How to create a Batch of data from an in-memory Spark or Pandas dataframe or path
This guide will help you load the following as Batches for use in creating Expectations:
- Pandas DataFrames
- Spark DataFrames
What used to be called a “Batch” in the old API has been replaced by the Validator. A Validator knows how to validate a particular Batch of data on a particular Execution Engine against a particular Expectation Suite. In interactive mode, the Validator can store and update an Expectation Suite while conducting Data Discovery or Exploratory Data Analysis.
The Spark workflow is covered first, followed by the Pandas workflow.
Spark DataFrame
Prerequisites: This how-to guide assumes you have:
- Completed the Getting Started Tutorial
- A working installation of Great Expectations
- Set up a working deployment of Great Expectations
- Configured and loaded a Data Context
- Configured a Spark Datasource
- Identified an in-memory Spark DataFrame that you would like to validate, OR
- Identified a filesystem or S3 path to a file containing the data you would like to validate.
Load or create a Data Context
The context referenced below can be loaded from disk or configured in code. First, import the necessary packages and modules.

import pyspark
from ruamel import yaml

import great_expectations as gx
from great_expectations import DataContext
from great_expectations.core import ExpectationSuite
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context.util import file_relative_path
from great_expectations.validator.validator import Validator

Load an on-disk Data Context (i.e. from a great_expectations.yml configuration) via the get_context() command:

context: DataContext = gx.get_context()
If you are working in an environment without easy access to a local filesystem (e.g. AWS Spark EMR, Databricks, etc.), load an in-code Data Context using these instructions: How to instantiate a Data Context without a yml file
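For reference, here is a minimal sketch of such an in-code Data Context, assuming purely in-memory stores are acceptable for your environment (swap in S3, DBFS, or other store backends as needed; the class names shown are from the 0.15.x API):

from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)

# Ephemeral Data Context: all stores (Expectations, Validations, etc.) live in memory.
project_config = DataContextConfig(
    store_backend_defaults=InMemoryStoreBackendDefaults()
)
context = BaseDataContext(project_config=project_config)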
Obtain an Expectation Suite
If you have not already created an Expectation Suite, you can do so now.
context.add_expectation_suite(
    expectation_suite_name="insert_your_expectation_suite_name_here"
)

The Expectation Suite can then be loaded into memory by using get_expectation_suite():

suite: ExpectationSuite = context.get_expectation_suite(
    expectation_suite_name="insert_your_expectation_suite_name_here"
)

Construct a RuntimeBatchRequest
We will create a RuntimeBatchRequest and pass it our Spark DataFrame or path via the runtime_parameters argument, under either the batch_data or path key. The batch_identifiers argument is required and must be a non-empty dictionary containing all of the Batch Identifiers specified in your Runtime Data Connector configuration.

If you are providing a filesystem path instead of a materialized DataFrame, you may use either an absolute path or a path relative to the current working directory. Under the hood, Great Expectations will instantiate a Spark DataFrame using the appropriate spark.read.* method, which will be inferred from the file extension. If your file names do not have extensions, you can specify the appropriate reader method explicitly via the batch_spec_passthrough argument. Any Spark reader options (e.g. delimiter or header) that are required to properly read your data can also be specified with the batch_spec_passthrough argument, in a dictionary nested under a key named reader_options.

Here is an example Datasource configuration in YAML.
datasource_yaml = f"""
name: my_spark_datasource
class_name: Datasource
module_name: great_expectations.datasource
execution_engine:
    module_name: great_expectations.execution_engine
    class_name: SparkDFExecutionEngine
data_connectors:
    my_runtime_data_connector:
        class_name: RuntimeDataConnector
        batch_identifiers:
            - some_key_maybe_pipeline_stage
            - some_other_key_maybe_airflow_run_id
"""

Save the configuration into your Data Context by using the add_datasource() function:

context.add_datasource(**yaml.load(datasource_yaml))
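If you want to confirm the configuration is well formed before relying on it, you can also run it through test_yaml_config(), which attempts to instantiate the Datasource and prints what it finds:

# Sanity-check the Datasource YAML; reports the Execution Engine and Data Connectors it can build.
context.test_yaml_config(yaml_config=datasource_yaml)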
If you have a file in the following location:

path_to_file: str = "some_path.csv"

Then the file can be read as a Spark DataFrame using:

df: pyspark.sql.dataframe.DataFrame = spark_session.read.csv(path_to_file)
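The snippet above assumes an active spark_session. If you do not already have one available (for example, outside Databricks or EMR), a minimal sketch for creating a local session looks like this; the application name is arbitrary:

from pyspark.sql import SparkSession

# Create (or reuse) a local SparkSession to read the file into a DataFrame.
spark_session = SparkSession.builder.appName("gx_runtime_batch_example").getOrCreate()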
Here is a Runtime Batch Request using an in-memory DataFrame:

runtime_batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="insert_your_data_asset_name_here",
    runtime_parameters={"batch_data": df},
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "ingestion step 1",
        "some_other_key_maybe_airflow_run_id": "run 18",
    },
)

Here is a Runtime Batch Request using a path:

runtime_batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="insert_your_data_asset_name_here",
    runtime_parameters={"path": path_to_file},
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "ingestion step 1",
        "some_other_key_maybe_airflow_run_id": "run 18",
    },
)

Best Practice: Though not strictly required, we recommend that you make every Data Asset Name unique. Choosing a unique Data Asset Name makes it easier to navigate quickly through Data Docs and ensures your logical Data Assets are not confused with any particular view of them provided by an Execution Engine.
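For path-based requests, batch_spec_passthrough can also be supplied on the Batch Request itself rather than on get_validator(). Here is a sketch of the path-based request above with explicit reader settings; the delimiter and header values are assumptions for a typical comma-delimited file with a header row:

runtime_batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="insert_your_data_asset_name_here",
    runtime_parameters={"path": path_to_file},
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "ingestion step 1",
        "some_other_key_maybe_airflow_run_id": "run 18",
    },
    # Assumed reader settings for a headered CSV; adjust for your file format.
    batch_spec_passthrough={
        "reader_method": "csv",
        "reader_options": {"delimiter": ",", "header": True},
    },
)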
Construct a Validator
my_validator: Validator = context.get_validator(
    batch_request=runtime_batch_request,
    expectation_suite=suite,  # OR
    # expectation_suite_name=suite_name
)

Alternatively, you may skip the separate Runtime Batch Request and pass the same instantiation arguments, along with the Expectation Suite (or its name), directly to the get_validator method:

my_validator: Validator = context.get_validator(
    datasource_name="my_spark_datasource",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="insert_your_data_asset_name_here",
    runtime_parameters={"path": path_to_file},
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "ingestion step 1",
        "some_other_key_maybe_airflow_run_id": "run 18",
    },
    batch_spec_passthrough={
        "reader_method": "csv",
        "reader_options": {"delimiter": ",", "header": True},
    },
    expectation_suite=suite,  # OR
    # expectation_suite_name=suite_name
)

Check your data
You can check that the first few lines of your Batch are what you expect by running:
my_validator.head()
Now that you have a Validator, you can use it to create Expectations or validate the data.
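For example, here is a minimal sketch of adding an Expectation interactively and persisting it; the column name is an assumption about your data:

# Hypothetical column; replace with one that exists in your DataFrame.
my_validator.expect_column_values_to_not_be_null(column="some_column")

# Persist the Expectations accumulated on the Validator back into the suite.
my_validator.save_expectation_suite(discard_failed_expectations=False)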
Pandas DataFrame
Prerequisites: This how-to guide assumes you have:
- Completed the Getting Started Tutorial
- A working installation of Great Expectations
- Set up a working deployment of Great Expectations
- Configured and loaded a Data Context
- Configured a Pandas/filesystem Datasource
- Identified a Pandas DataFrame that you would like to use as the data to validate.
Load or create a Data Context
The context referenced below can be loaded from disk or configured in code. First, import the necessary packages and modules.

import pandas as pd
from ruamel import yaml

import great_expectations as gx
from great_expectations import DataContext
from great_expectations.core import ExpectationSuite
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.validator.validator import Validator

Load an on-disk Data Context (i.e. from a great_expectations.yml configuration) via the get_context() command:

context: DataContext = gx.get_context()
If you are working in an environment without easy access to a local filesystem (e.g. AWS Spark EMR, Databricks, etc.), load an in-code Data Context using these instructions: How to instantiate a Data Context without a yml file
Obtain an Expectation Suite
If you have not already created an Expectation Suite, you can do so now.

context.add_expectation_suite(
    expectation_suite_name="insert_your_expectation_suite_name_here"
)

The Expectation Suite can then be loaded into memory by using get_expectation_suite():

suite: ExpectationSuite = context.get_expectation_suite(
    expectation_suite_name="insert_your_expectation_suite_name_here"
)

Construct a Runtime Batch Request
We will create a RuntimeBatchRequest and pass it our DataFrame or path via the runtime_parameters argument, under either the batch_data or path key. The batch_identifiers argument is required and must be a non-empty dictionary containing all of the Batch Identifiers specified in your Runtime Data Connector configuration.

If you are providing a filesystem path instead of a materialized DataFrame, you may use either an absolute path or a path relative to the current working directory. Under the hood, Great Expectations will instantiate a Pandas DataFrame using the appropriate pandas.read_* method, which will be inferred from the file extension. If your file names do not have extensions, you can specify the appropriate reader method explicitly via the batch_spec_passthrough argument. Any Pandas reader options (e.g. sep or header) that are required to properly read your data can also be specified with the batch_spec_passthrough argument, in a dictionary nested under a key named reader_options.

Here is an example Datasource configuration in YAML.
datasource_yaml = f"""
name: my_pandas_datasource
class_name: Datasource
module_name: great_expectations.datasource
execution_engine:
    module_name: great_expectations.execution_engine
    class_name: PandasExecutionEngine
data_connectors:
    my_runtime_data_connector:
        class_name: RuntimeDataConnector
        batch_identifiers:
            - some_key_maybe_pipeline_stage
            - some_other_key_maybe_airflow_run_id
"""

Save the configuration into your Data Context by using the add_datasource() function:

context.add_datasource(**yaml.load(datasource_yaml))
If you have a file in the following location:

path_to_file: str = "some_path.csv"

Then the file can be read as a Pandas DataFrame using:

df: pd.DataFrame = pd.read_csv(path_to_file)
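If your data already lives in memory rather than in a file, any existing Pandas DataFrame can be passed as batch_data. A minimal sketch with made-up columns and values:

# Hypothetical in-memory data; any Pandas DataFrame works as batch_data.
df: pd.DataFrame = pd.DataFrame(
    {
        "user_id": [1, 2, 3],
        "signup_date": ["2023-01-01", "2023-01-02", "2023-01-03"],
    }
)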
Here is a Runtime Batch Request using an in-memory DataFrame:
runtime_batch_request = RuntimeBatchRequest(
    datasource_name="my_pandas_datasource",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="insert_your_data_asset_name_here",
    runtime_parameters={"batch_data": df},
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "ingestion step 1",
        "some_other_key_maybe_airflow_run_id": "run 18",
    },
)

Here is a Runtime Batch Request using a path:

runtime_batch_request = RuntimeBatchRequest(
    datasource_name="my_pandas_datasource",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="insert_your_data_asset_name_here",
    runtime_parameters={"path": path_to_file},
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "ingestion step 1",
        "some_other_key_maybe_airflow_run_id": "run 18",
    },
    batch_spec_passthrough={
        "reader_method": "read_csv",
        "reader_options": {"sep": ",", "header": 0},
    },
)

Best Practice: Though not strictly required, we recommend that you make every Data Asset Name unique. Choosing a unique Data Asset Name makes it easier to navigate quickly through Data Docs and ensures your logical Data Assets are not confused with any particular view of them provided by an Execution Engine.
Construct a Validator
my_validator: Validator = context.get_validator(
    batch_request=runtime_batch_request,
    expectation_suite=suite,  # OR
    # expectation_suite_name=suite_name
)

Alternatively, you may skip the separate Runtime Batch Request and pass the same instantiation arguments, along with the Expectation Suite (or its name), directly to the get_validator method:

my_validator: Validator = context.get_validator(
    datasource_name="my_pandas_datasource",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="insert_your_data_asset_name_here",
    runtime_parameters={"path": path_to_file},
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "ingestion step 1",
        "some_other_key_maybe_airflow_run_id": "run 18",
    },
    batch_spec_passthrough={
        "reader_method": "read_csv",
        "reader_options": {"sep": ",", "header": 0},
    },
    expectation_suite=suite,  # OR
    # expectation_suite_name=suite_name
)

Check your data
You can check that the first few lines of your Batch are what you expect by running:
my_validator.head()
Now that you have a Validator, you can use it to create Expectations or validate the data.
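For example, to run the loaded Expectation Suite against this Batch and inspect the overall result, a minimal sketch:

# Evaluate every Expectation in the suite against the current Batch.
validation_result = my_validator.validate()
print(validation_result.success)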
Additional Notes
To view the full scripts used in this page, see them on GitHub.