How to connect to data on a filesystem using Spark

This guide will help you connect to your data stored on a filesystem using Spark. This will allow you to validate and explore your data.

Prerequisites: This how-to guide assumes you have:
  • Completed the Getting Started Tutorial
  • A working installation of Great Expectations
  • Access to a working Spark installation
  • Access to data on a filesystem

Steps

1. Choose how to run the code in this guide

Get an environment to run the code in this guide. Please choose an option below.

If you use the Great Expectations CLI, run the following command to generate a pre-configured Jupyter Notebook automatically. You can then follow along with the YAML-based workflow in the remaining steps:

great_expectations --v3-api datasource new

2. 💡 Instantiate your project's DataContext

Import these necessary packages and modules.

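from ruamel import yaml

import great_expectations as ge
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest

# Load the project's DataContext; the later steps refer to it as `context`.
context = ge.get_context()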

Please proceed only after you have instantiated your DataContext.

3. Configure your Datasource

Using this example configuration, add in your path to a directory that contains some of your data:

datasource_yaml = fr"""
name: my_filesystem_datasource
class_name: Datasource
execution_engine:
    class_name: SparkDFExecutionEngine
data_connectors:
    default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
            - default_identifier_name
    default_inferred_data_connector_name:
        class_name: InferredAssetFilesystemDataConnector
        base_directory: <YOUR_PATH>
        default_regex:
            group_names:
                - data_asset_name
            pattern: (.*)\.csv
"""

Run this code to test your configuration.

context.test_yaml_config(datasource_yaml)

If you specified a path containing CSV files, they will be listed as Available data_asset_names in the output of test_yaml_config().

Feel free to adjust your configuration and re-run test_yaml_config() as needed.
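To see why the CSV files show up under those names, the sketch below applies the default_regex pattern from the configuration to a hypothetical directory listing, using only Python's re module. It is an illustration of the pattern's behaviour, not Great Expectations' own matching code; the helper infer_asset_names and the example file names are assumptions made for this example.

```python
import re

# Pattern and group name copied from the default_regex section of the configuration.
PATTERN = re.compile(r"(.*)\.csv")
GROUP_NAMES = ["data_asset_name"]


def infer_asset_names(filenames):
    """Return the sorted, de-duplicated names captured by the pattern."""
    names = set()
    for filename in filenames:
        match = PATTERN.match(filename)
        if match is None:
            continue  # files that do not match the pattern are skipped
        captured = dict(zip(GROUP_NAMES, match.groups()))
        names.add(captured["data_asset_name"])
    return sorted(names)


# Hypothetical directory listing, for illustration only.
example_files = [
    "yellow_trip_data_sample_2019-1.csv",
    "yellow_trip_data_sample_2019-2.csv",
    "notes.txt",
]
print(infer_asset_names(example_files))
```

Under these assumptions, each CSV file becomes its own data asset named after the file without the .csv extension, while notes.txt is ignored.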

4. Save the Datasource configuration to your DataContext

Save the configuration into your DataContext by using the add_datasource() function.

context.add_datasource(**yaml.load(datasource_yaml))
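It may help to see what the ** unpacking in that call passes along. The sketch below writes out, by hand, the dictionary that the YAML from step 3 describes, and uses a hypothetical stand-in function (show_keyword_arguments, not part of Great Expectations) to show which top-level keys become keyword arguments.

```python
# The dictionary described by the YAML in step 3, written out by hand.
datasource_config = {
    "name": "my_filesystem_datasource",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "SparkDFExecutionEngine"},
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["default_identifier_name"],
        },
        "default_inferred_data_connector_name": {
            "class_name": "InferredAssetFilesystemDataConnector",
            "base_directory": "<YOUR_PATH>",
            "default_regex": {
                "group_names": ["data_asset_name"],
                "pattern": r"(.*)\.csv",
            },
        },
    },
}


def show_keyword_arguments(**kwargs):
    """Hypothetical stand-in: report which keyword arguments were received."""
    return sorted(kwargs)


# `**` spreads the top-level keys of the dictionary as keyword arguments.
print(show_keyword_arguments(**datasource_config))
```

Only the top-level keys are spread; the nested execution_engine and data_connectors sections are passed through as dictionaries.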

5. Test your new Datasource

Verify your new Datasource by loading data from it into a Validator using a BatchRequest.

Add the path to your CSV in the path key under runtime_parameters in your BatchRequest.

batch_request = RuntimeBatchRequest(
    datasource_name="my_filesystem_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="<YOUR_MEANINGFUL_NAME>",  # this can be anything that identifies this data_asset for you
    runtime_parameters={"path": "<PATH_TO_YOUR_DATA_HERE>"},  # Add your path here.
    batch_identifiers={"default_identifier_name": "default_identifier"},
)
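A mistyped path is an easy mistake at this step, so one option is to check the path before building the request. The following is a minimal sketch under that assumption: runtime_request_kwargs is a hypothetical helper (not a Great Expectations function) that verifies the path exists and returns the same keyword arguments used above.

```python
import tempfile
from pathlib import Path


def runtime_request_kwargs(csv_path, data_asset_name):
    """Build keyword arguments for a RuntimeBatchRequest after checking the path."""
    path = Path(csv_path)
    if not path.exists():
        raise FileNotFoundError(f"No file or directory at {path}")
    return {
        "datasource_name": "my_filesystem_datasource",
        "data_connector_name": "default_runtime_data_connector_name",
        "data_asset_name": data_asset_name,
        "runtime_parameters": {"path": str(path)},
        "batch_identifiers": {"default_identifier_name": "default_identifier"},
    }


# Demonstrate with a throwaway CSV so the sketch runs anywhere.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.csv"
    sample.write_text("id,value\n1,a\n")
    kwargs = runtime_request_kwargs(sample, "my_sample_asset")
    print(kwargs["runtime_parameters"])
```

With Great Expectations imported as in step 2, the result could then be passed on as RuntimeBatchRequest(**kwargs).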

Then load data into the Validator.

context.create_expectation_suite(
    expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_suite"
)
print(validator.head())

🚀🚀 Congratulations! 🚀🚀 You successfully connected Great Expectations with your data.

Additional Notes

How to read in multiple CSVs as a single Spark DataFrame

The batch_spec_passthrough parameter enables more advanced configuration for reading CSV files through the SparkDFExecutionEngine. It lets you specify the reader method directly and pass backend-specific reader_options through to that reader method, in this case spark.read.csv(). The following example adds batch_spec_passthrough parameters to the BatchRequest; the same parameters can also be added to the Datasource configuration at the DataConnector level.

Suppose you have a directory containing 3 CSV files, each with 10,000 lines:

  taxi_data_files/yellow_trip_data_sample_2019-1.csv
  taxi_data_files/yellow_trip_data_sample_2019-2.csv
  taxi_data_files/yellow_trip_data_sample_2019-3.csv

You can write a BatchRequest that reads the entire folder as a single Spark DataFrame by setting reader_method to csv and setting header to True in reader_options.

batch_request = RuntimeBatchRequest(
    datasource_name="my_filesystem_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="example_data_asset",
    runtime_parameters={"path": "taxi_data_files"},
    batch_identifiers={"default_identifier_name": 1234567890},
    batch_spec_passthrough={"reader_method": "csv", "reader_options": {"header": True}},
)

Once that step is complete, we can confirm that our Validator contains a batch with the expected 30,000 lines.

context.create_expectation_suite(
    expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_suite"
)

print(validator.head())
print(validator.active_batch.data.dataframe.count())  # should be 30,000
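To cross-check the expected row count without Spark, the sketch below counts rows across every CSV file in a directory using only the standard library. It assumes that the header option causes the first line of each file to be treated as a header rather than data; count_data_rows, the column names, and the tiny sample files are hypothetical stand-ins for taxi_data_files.

```python
import csv
import tempfile
from pathlib import Path


def count_data_rows(directory, header=True):
    """Count CSV rows across every .csv file in `directory`.

    When `header` is True, the first row of each file is treated as a
    header and excluded from the count.
    """
    total = 0
    for csv_file in sorted(Path(directory).glob("*.csv")):
        with csv_file.open(newline="") as handle:
            rows = sum(1 for _ in csv.reader(handle))
        total += max(rows - 1, 0) if header else rows
    return total


# Tiny stand-in for taxi_data_files: 3 files, each with a header and 4 data rows.
with tempfile.TemporaryDirectory() as tmp:
    for month in (1, 2, 3):
        target = Path(tmp) / f"yellow_trip_data_sample_2019-{month}.csv"
        with target.open("w", newline="") as handle:
            writer = csv.writer(handle)
            writer.writerow(["vendor_id", "fare_amount"])  # hypothetical columns
            writer.writerows([[1, 10.0 + i] for i in range(4)])
    print(count_data_rows(tmp, header=True))   # 12
    print(count_data_rows(tmp, header=False))  # 15
```

The difference between the two counts is one row per file, which is the discrepancy to look for if the Spark count is slightly higher than expected.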

If you are working with nonstandard CSVs, read one of these guides:

To view the full scripts used in this page, see them on GitHub:

Next Steps

Now that you've connected to your data, you'll want to work on these core skills: