Skip to main content
Version: 0.14.13

How to connect to data on a filesystem using Spark

This guide will help you connect to your data stored on a filesystem using Spark. This will allow you to ValidateThe act of applying an Expectation Suite to a Batch. and explore your data.

Prerequisites: This how-to guide assumes you have:
  • Completed the Getting Started Tutorial
  • Have a working installation of Great Expectations
  • Have access to a working Spark installation
  • Have access to data on a filesystem

Steps

1. Choose how to run the code in this guide

Get an environment to run the code in this guide. Please choose an option below.

If you use the Great Expectations CLICommand Line Interface, run this command to automatically generate a pre-configured Jupyter Notebook. Then you can follow along in the YAML-based workflow below:

great_expectations datasource new

2. 💡 Instantiate your project's DataContext

Import these necessary packages and modules.

from ruamel import yaml

import great_expectations as ge
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest

Please proceed only after you have instantiated your DataContext.

3. Configure your Datasource

Using this example configuration, add in your path to a directory that contains some of your data:

datasource_yaml = rf"""
name: my_filesystem_datasource
class_name: Datasource
execution_engine:
class_name: SparkDFExecutionEngine
data_connectors:
default_runtime_data_connector_name:
class_name: RuntimeDataConnector
batch_identifiers:
- default_identifier_name
default_inferred_data_connector_name:
class_name: InferredAssetFilesystemDataConnector
base_directory: <YOUR_PATH>
default_regex:
group_names:
- data_asset_name
pattern: (.*)\.csv
"""

Run this code to test your configuration.

context.test_yaml_config(datasource_yaml)

If you specified a path containing CSV files you will see them listed as Available data_asset_names in the output of test_yaml_config().

Feel free to adjust your configuration and re-run test_yaml_config() as needed.

4. Save the Datasource configuration to your DataContext

Save the configuration into your Data ContextThe primary entry point for a Great Expectations deployment, with configurations and methods for all supporting components. by using the add_datasource() function.

context.add_datasource(**yaml.load(datasource_yaml))

5. Test your new Datasource

Verify your new DatasourceProvides a standard API for accessing and interacting with data from a wide variety of source systems. by loading data from it into a ValidatorUsed to run an Expectation Suite against data. using a Batch RequestProvided to a Datasource in order to create a Batch..

Add the path to your CSV in the path key under runtime_parameters in your BatchRequest.

batch_request = RuntimeBatchRequest(
datasource_name="my_filesystem_datasource",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="<YOUR_MEANGINGFUL_NAME>", # this can be anything that identifies this data_asset for you
runtime_parameters={"path": "<PATH_TO_YOUR_DATA_HERE>"}, # Add your path here.
batch_identifiers={"default_identifier_name": "default_identifier"},
)

Then load data into the Validator.

context.create_expectation_suite(
expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name="test_suite"
)
print(validator.head())

🚀🚀 Congratulations! 🚀🚀 You successfully connected Great Expectations with your data.

Additional Notes

How to read-in multiple CSVs as a single Spark Dataframe

More advanced configuration for reading in CSV files through the SparkDFExecutionEngine is possible through the batch_spec_passthrough parameter. batch_spec_passthrough allows for reader-methods to be directly specified, and backend-specific reader_options to be passed through to the actual reader-method, in this case spark.read.csv(). The following example shows how batch_spec_passthrough parameters can be added to the BatchRequest. However, the same parameters can be added to the Datasource configuration at the DataConnector level.

If you have a directory with 3 CSV files with each file having 10,000 lines each:

  taxi_data_files/yellow_tripdata_sample_2019-1.csv
taxi_data_files/yellow_tripdata_sample_2019-2.csv
taxi_data_files/yellow_tripdata_sample_2019-3.csv

You could write a BatchRequest that reads in the entire folder as a single Spark Dataframe by specifying the reader_method to be csv, header to be set to True in the reader_options.

batch_request = RuntimeBatchRequest(
datasource_name="my_filesystem_datasource",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="example_data_asset",
runtime_parameters={"path": "taxi_data_files"},
batch_identifiers={"default_identifier_name": 1234567890},
batch_spec_passthrough={"reader_method": "csv", "reader_options": {"header": True}},
)

Once that step is complete, then we can confirm that our Validator contains a BatchA selection of records from a Data Asset. with the expected 30,000 lines.

context.create_expectation_suite(
expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name="test_suite"
)

print(validator.head())
print(validator.active_batch.data.dataframe.count()) # should be 30,000

If you are working with nonstandard CSVs, read one of these guides:

To view the full scripts used in this page, see them on GitHub:

Next Steps

Now that you've connected to your data, you'll want to work on these core skills: