Add Spark support for Custom Expectations
This guide will help you implement native Spark support for your Custom Expectation (an extension of the `Expectation` class, developed outside of the Great Expectations library).
Prerequisites
Great Expectations supports a number of Execution Engines (systems capable of processing data to compute Metrics), including a Spark Execution Engine. These Execution Engines provide the computing resources used to calculate the Metrics (computed attributes of data, such as the mean of a column) defined in the Metric class of your Custom Expectation.
If you decide to contribute your Expectation (a verifiable assertion about data), its entry in the Expectations Gallery will reflect the Execution Engines that it supports.
We will add Spark support for the Custom Expectations implemented in our guides on how to create Custom Column Aggregate Expectations and how to create Custom Column Map Expectations.
Specify your backends
To avoid surprises and help clearly define your Custom Expectation, it can be helpful to determine beforehand what backends you plan to support, and test them along the way.
Within the examples defined inside your Expectation class, the optional only_for and suppress_test_for keys specify which backends to use for testing. If a backend is not specified, Great Expectations attempts testing on all supported backends. Add entries corresponding to the functionality you want to add:
examples = [
    {
        "data": {"x": [1, 2, 3, 4, 5], "y": [0, -1, -2, 4, None]},
        "only_for": ["pandas", "spark", "sqlite", "postgresql"],
        "tests": [
            {
                "title": "basic_positive_test",
                "exact_match_out": False,
                "include_in_gallery": True,
                "in": {
                    "column": "x",
                    "min_value": 4,
                    "strict_min": True,
                    "max_value": 5,
                    "strict_max": False,
                },
                "out": {"success": True},
            },
            {
                "title": "basic_negative_test",
                "exact_match_out": False,
                "include_in_gallery": True,
                "in": {
                    "column": "y",
                    "min_value": -2,
                    "strict_min": False,
                    "max_value": 3,
                    "strict_max": True,
                },
                "out": {"success": False},
            },
        ],
    }
]
The optional only_for and suppress_test_for keys may be specified at the top level (next to data and tests) or within specific tests (next to title, etc.).
Allowed backends include: "bigquery", "mssql", "mysql", "pandas", "postgresql", "redshift", "snowflake", "spark", "sqlite", and "trino".
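For example, to run one test case against Spark alone, you can attach only_for to that test inside the tests list. A minimal sketch (the test title and values here are hypothetical):

{
    "title": "spark_only_positive_test",  # hypothetical test case
    "only_for": ["spark"],  # restrict this test case to the Spark backend
    "exact_match_out": False,
    "in": {"column": "x", "min_value": 1, "max_value": 5},
    "out": {"success": True},
},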
Implement the Spark logic for your Custom Expectation
Great Expectations provides a variety of ways to implement an Expectation in Spark. Two of the most common include:
- Defining a partial function that takes a Spark DataFrame column as input
- Directly executing queries on Spark DataFrames to determine the value of your Expectation's metric

Partial Function
Great Expectations allows for much of the PySpark DataFrame logic to be abstracted away by specifying metric behavior as a partial function.
To do this, we use one of the @column_*_partial decorators:
- @column_aggregate_partial for Column Aggregate Expectations
- @column_condition_partial for Column Map Expectations
- @column_pair_condition_partial for Column Pair Map Expectations
- @multicolumn_condition_partial for Multicolumn Map Expectations
These decorators expect an appropriate engine argument. In this case, we'll pass our SparkDFExecutionEngine.
The decorated method takes in a Spark Column object and returns either a pyspark.sql.functions function or a pyspark.sql.Column method call, which Great Expectations uses to generate the appropriate query against your Spark DataFrame.
For our Custom Column Aggregate Expectation ExpectColumnMaxToBeBetweenCustom, we're going to leverage PySpark's max SQL Function and the @column_aggregate_partial decorator.
@column_aggregate_partial(engine=SparkDFExecutionEngine)
def _spark(cls, column, _table, _column_name, **kwargs):
    """Spark Max Implementation"""
    return F.max(column)
If we need a built-in function from pyspark.sql.functions, usually aliased to F, the import logic in
from great_expectations.compatibility.pyspark import functions as F
from great_expectations.compatibility import pyspark
allows us to access these functions even when PySpark is not installed.
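This compatibility layer follows the common optional-dependency pattern. A rough illustration of the general technique (a sketch only, not Great Expectations' actual internals):

try:
    import pyspark
    from pyspark.sql import functions as F
except ImportError:
    # PySpark is optional: fall back to placeholders so this module can
    # still be imported when Spark support isn't needed.
    pyspark = None
    F = None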
Applying Python Functions
F.udf allows us to use a Python function as a Spark User Defined Function for Column Map Expectations,
giving us the ability to define custom functions and apply them to our data. Here is an example of F.udf applied to ExpectColumnValuesToEqualThree:
@column_condition_partial(engine=SparkDFExecutionEngine)
def _spark(cls, column, **kwargs):
    def is_equal_to_three(val):
        return val == 3

    success_udf = F.udf(is_equal_to_three, pyspark.types.BooleanType())
    return success_udf(column)
For more on F.udf and the functionality it provides, see the Apache Spark UDF documentation.
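Note that Python UDFs serialize every value into and out of Python, which can be slow on large DataFrames. When the logic can be expressed with native Column operations, returning a boolean Column directly is usually faster; a minimal sketch of the same check without a UDF:

@column_condition_partial(engine=SparkDFExecutionEngine)
def _spark(cls, column, **kwargs):
    # Native Column comparison: yields a boolean Column without
    # round-tripping each value through a Python UDF.
    return column == 3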
Query Execution
The most direct way of implementing a metric is to compute its value by constructing or directly executing queries, using objects provided by the @metric_* decorators:
- @metric_value for Column Aggregate Expectations
  - Expects an appropriate engine, metric_fn_type, and domain_type
- @metric_partial for all Map Expectations
  - Expects an appropriate engine, partial_fn_type, and domain_type
Our engine will reflect the backend we're implementing (SparkDFExecutionEngine), while our fn_type and domain_type are unique to the type of Expectation we're implementing.
These decorators enable a higher-complexity workflow, allowing you to explicitly structure your queries and make intermediate queries to your database. While this approach can result in extra roundtrips to your database, it can also unlock advanced functionality for your Custom Expectations.
For our Custom Column Map Expectation ExpectColumnValuesToEqualThree, we're going to use the @metric_partial decorator,
specifying the type of partial function we're computing (MAP_CONDITION_FN) and the domain over which we're computing it (COLUMN):
@metric_partial(
    engine=SparkDFExecutionEngine,
    partial_fn_type=MetricPartialFunctionTypes.MAP_CONDITION_FN,
    domain_type=MetricDomainTypes.COLUMN,
)
def _spark(
    cls,
    execution_engine: SparkDFExecutionEngine,
    metric_domain_kwargs,
    metric_value_kwargs,
    metrics,
    runtime_configuration,
):
The decorated method takes in a valid Execution Engine and relevant kwargs,
and will return a tuple of:
- A pyspark.sql.column.Column defining the query to be executed
- compute_domain_kwargs
- accessor_domain_kwargs
These will be used to execute our query and compute the results of our metric.
To do this, we need to access our Compute Domain directly:
(
    selectable,
    compute_domain_kwargs,
    accessor_domain_kwargs,
) = execution_engine.get_compute_domain(
    metric_domain_kwargs, MetricDomainTypes.COLUMN
)
column_name = accessor_domain_kwargs["column"]
column = F.col(column_name)
This allows us to build and return a query to be executed, providing the result of our metric:
query = F.when(column == 3, F.lit(False)).otherwise(F.lit(True))
return (query, compute_domain_kwargs, accessor_domain_kwargs)
Because in Spark we are implementing the window function directly, we have to return the unexpected condition: False when column == 3, otherwise True.
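Putting these pieces together, a minimal sketch of the full metric provider for this approach might look like the following (the import paths are assumptions and may vary across Great Expectations versions):

from great_expectations.compatibility.pyspark import functions as F
from great_expectations.core.metric_domain_types import MetricDomainTypes
from great_expectations.core.metric_function_types import MetricPartialFunctionTypes
from great_expectations.execution_engine import SparkDFExecutionEngine
from great_expectations.expectations.metrics import ColumnMapMetricProvider
from great_expectations.expectations.metrics.metric_provider import metric_partial


class ColumnValuesEqualThree(ColumnMapMetricProvider):
    condition_metric_name = "column_values.equal_three"

    @metric_partial(
        engine=SparkDFExecutionEngine,
        partial_fn_type=MetricPartialFunctionTypes.MAP_CONDITION_FN,
        domain_type=MetricDomainTypes.COLUMN,
    )
    def _spark(
        cls,
        execution_engine: SparkDFExecutionEngine,
        metric_domain_kwargs,
        metric_value_kwargs,
        metrics,
        runtime_configuration,
    ):
        # Resolve the domain this metric computes over.
        (
            selectable,
            compute_domain_kwargs,
            accessor_domain_kwargs,
        ) = execution_engine.get_compute_domain(
            metric_domain_kwargs, MetricDomainTypes.COLUMN
        )
        column_name = accessor_domain_kwargs["column"]
        column = F.col(column_name)
        # Return the *unexpected* condition: True where the value is not 3.
        query = F.when(column == 3, F.lit(False)).otherwise(F.lit(True))
        return (query, compute_domain_kwargs, accessor_domain_kwargs)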
Verify your implementation
If you now run your file, print_diagnostic_checklist() will attempt to execute your example cases using this new backend.
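The Custom Expectation templates typically invoke this from a __main__ guard at the bottom of the Expectation's file; for example, using the class from this guide:

if __name__ == "__main__":
    ExpectColumnMaxToBeBetweenCustom().print_diagnostic_checklist()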
If your implementation is correctly defined, and the rest of the core logic in your Custom Expectation is already complete, you will see the following in your Diagnostic Checklist:
✔ Has at least one positive and negative example case, and all test cases pass
If you've already implemented the Pandas backend covered in our How-To guides for creating Custom Expectations and the SQLAlchemy backend covered in our guide on how to add SQLAlchemy support for Custom Expectations, you should see the following in your Diagnostic Checklist:
✔ Has core logic that passes tests for all applicable Execution Engines and SQL dialects
Congratulations!
🎉 You've successfully implemented Spark support for a Custom Expectation! 🎉
Contribution (Optional)
This guide will leave you with core functionality sufficient for contribution to Great Expectations at an Experimental level.
If you're interested in having your contribution accepted at a Beta level, your Custom Expectation will need to support SQLAlchemy, Spark, and Pandas.
For full acceptance into the Great Expectations codebase at a Production level, we require that your Custom Expectation meets our code standards, including test coverage and style. If you believe your Custom Expectation is otherwise ready for contribution at a Production level, please submit a Pull Request, and we will work with you to ensure your Custom Expectation meets these standards.
For more information on our code standards and contribution, see our guide on Levels of Maturity for Expectations.
To view the full scripts used in this page, see them on GitHub: