How to Use Great Expectations in Flyte

This piece of documentation was authored by Samhita Alla and the Flyte team

This guide will help you run Great Expectations in Flyte

The power of data validation in Great Expectations can be integrated with Flyte to validate the data moving in and out of the pipeline entities you may have defined in Flyte. This helps establish stricter boundaries around your data, ensuring that everything works as expected and that bad data no longer crashes your pipelines unexpectedly!

Flyte is a structured programming and distributed processing platform that enables highly concurrent, scalable, and maintainable workflows for Machine Learning and Data Processing. It is a fabric that connects disparate computation backends using a type-safe data dependency graph. It records all changes to a pipeline, making it possible to rewind time. It also stores a history of all executions and provides an intuitive UI, CLI, and REST/gRPC API to interact with the computation.

Key Benefits of Flyte

  • Kubernetes-Native Workflow Automation Platform
  • Ergonomic SDKs in Python, Java & Scala
  • Versioned & Auditable
  • Reproducible Pipelines
  • Strong Data Typing

See all of the features offered by Flyte in the README.

Here’s a simple Flyte code snippet using the Python Flytekit API that defines a Flyte task to compute the total pay, which returns the output as a pandas DataFrame:

import pandas as pd
from flytekit import Resources, task


@task(limits=Resources(cpu="2", mem="150Mi"))
def total_pay(hourly_pay: float, hours_worked: int, df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(total_pay=hourly_pay * hours_worked)
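
Calling such a task from a workflow is equally terse. Here’s a minimal usage sketch (the workflow name pay_wf is ours, added for illustration, and tasks must be called with keyword arguments inside a workflow):

from flytekit import workflow


@workflow
def pay_wf(hourly_pay: float, hours_worked: int, df: pd.DataFrame) -> pd.DataFrame:
    # Flyte wires the task's typed inputs and outputs into a dependency graph.
    return total_pay(hourly_pay=hourly_pay, hours_worked=hours_worked, df=df)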

The Idea

Flyte helps automate how we architect our ML or data pipelines, which means data is an integral part of Flyte, and lots of data points flow in and out of the pipelines. Keeping an eye on data all the time isn’t a feasible solution. What we need is an automated mechanism that validates our data thoroughly.

Flyte does have a native type system that enforces the correctness of data; however, that alone doesn’t suffice in performing comprehensive data validation. This is where Great Expectations can help Flyte take its data validation system to the next level!
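
If you haven’t used Great Expectations before, here’s what a validation looks like on its own, outside Flyte. This is a minimal sketch using the legacy from_pandas dataset API; the columns are made up for illustration:

import great_expectations as ge
import pandas as pd

# Wrap a DataFrame so expectation methods become available on it.
df = ge.from_pandas(
    pd.DataFrame({"passenger_count": [1, 2, None], "fare_amount": [7.5, 12.0, 3.25]})
)

# Each expectation returns a result object with a `success` flag.
result = df.expect_column_values_to_not_be_null("passenger_count")
print(result.success)  # False, because of the null value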

If you haven’t worked with Flyte before, jump right into the getting started tutorial.

Background

As mentioned earlier, Flyte has a native type system. This includes support for FlyteFile and FlyteSchema.

  • FlyteFile: FlyteFile is Flyte’s automatic persistence object. It points to files in remote storage, and Flyte will transparently materialize them in every task execution.
  • FlyteSchema: FlyteSchema supports tabular data. Examples include pandas DataFrame, Spark DataFrame, etc.

The integration has been written such that both FlyteFile and FlyteSchema are inherently supported. So you can validate FlyteFile and FlyteSchema using Great Expectations within any Flyte pipeline!

Flyte types have been added because Great Expectations allows a non-string (such as a Pandas/Spark DataFrame) to be passed when using a RuntimeDataConnector, but not when using an InferredAssetFilesystemDataConnector or a ConfiguredAssetFilesystemDataConnector. With Flyte types in the mix, the latter connectors can also be fed a Pandas/Spark DataFrame or a remote URI as the dataset.
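
For context, the datasource used in the examples below might be registered along these lines. This is a hedged sketch of the 0.13-style Great Expectations API; the names data and data_flytetype_data_connector match the configs used later, while the base_directory and regex are assumptions you’d adapt to your project:

from great_expectations.data_context import DataContext

# Load the project at context_root_dir (created by `great_expectations init`).
context = DataContext(context_root_dir="great_expectations")

context.add_datasource(
    name="data",
    class_name="Datasource",
    execution_engine={"class_name": "PandasExecutionEngine"},
    data_connectors={
        "data_flytetype_data_connector": {
            "class_name": "InferredAssetFilesystemDataConnector",
            # Should match the directory of local_file_path used in the tasks below.
            "base_directory": "/tmp",
            "default_regex": {"group_names": ["data_asset_name"], "pattern": "(.*)"},
        },
    },
)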

Ways to Define the Integration

To use the plugin, run the following commands:

pip install flytekit
pip install flytekitplugins-great_expectations

An integration in Flyte can be a task or a type.

  • Flyte Task: A Flyte task defines the task prototype that one could use within a task or a workflow. Example: Papermill
  • Flyte Type: A Flyte type is a custom data type that one could attach to the data flowing through Flyte pipelines. Example: Pandera

Great Expectations in Flyte is available both as a task and a type.

Task Example

Great Expectations can be written as a Flyte task. Before running the code examples, make sure to have the Great Expectations configuration and expectation suites set up. Here’s an example using FlyteFile:

import pandas as pd
from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.file import CSVFile
from flytekitplugins.great_expectations import GreatExpectationsTask

file_task_object = GreatExpectationsTask(
    name="great_expectations_task_flytefile",
    datasource_name="data",
    inputs=kwtypes(dataset=CSVFile),
    expectation_suite_name="test.demo",
    data_connector_name="data_flytetype_data_connector",
    local_file_path="/tmp",
    context_root_dir="great_expectations",
)


@task(limits=Resources(mem="500Mi"))
def file_task(
    dataset: CSVFile,
) -> int:
    file_task_object(dataset=dataset)
    return len(pd.read_csv(dataset))


@workflow
def file_wf(
    dataset: CSVFile = "https://raw.githubusercontent.com/superconductive/ge_tutorials/main/data/yellow_tripdata_sample_2019-01.csv",
) -> int:
    return file_task(dataset=dataset)
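
As with any Flyte workflow, you can smoke-test this locally before registering it. A minimal sketch, which relies on the default dataset URL from the workflow signature:

if __name__ == "__main__":
    print(f"Validated dataset rows: {file_wf()}")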

Note: Additional Batch Request parameters can be given using BatchRequestConfig.

In the code, we define a file_task_object that encapsulates the Great Expectations configuration. Later, we initialize a Flyte task that performs data validation and returns the length of the DataFrame once the validation succeeds.

CSVFile, a variant of FlyteFile, accepts a remote dataset and downloads it to the user-given local_file_path. The plugin then fetches the file from that specific path and performs the validation. If the validation fails, the plugin shows the data columns and the associated expectations.

Figure: Validation failure message on the Flyte console

Note: the directory of local_file_path and the base_directory in the Great Expectations config should be the same.

Type Example

Great Expectations validation can be encapsulated in Flyte’s type-system. Here’s an example using FlyteSchema:

import pandas as pd
from flytekit import Resources, task, workflow
from flytekit.types.schema import FlyteSchema
from flytekitplugins.great_expectations import (
    BatchRequestConfig,
    GreatExpectationsFlyteConfig,
    GreatExpectationsType,
)


@task(limits=Resources(mem="500Mi"))
def schema_task(
    dataframe: GreatExpectationsType[
        FlyteSchema,
        GreatExpectationsFlyteConfig(
            datasource_name="data",
            expectation_suite_name="test.demo",
            data_connector_name="data_flytetype_data_connector",
            batch_request_config=BatchRequestConfig(data_connector_query={"limit": 10}),
            local_file_path="/tmp/test.parquet",
            context_root_dir="great_expectations",
        ),
    ]
) -> int:
    return dataframe.shape[0]


@task
def to_df(dataset: str) -> pd.DataFrame:
    return pd.read_csv(dataset)


@workflow
def schema_wf() -> int:
    return schema_task(
        dataframe=to_df(
            dataset="https://raw.githubusercontent.com/superconductive/ge_tutorials/main/data/yellow_tripdata_sample_2019-01.csv"
        )
    )

In the code, we define a schema_task that accepts a dataframe argument annotated with GreatExpectationsType. The first type parameter denotes the data type, which in our case is FlyteSchema, and the second represents the Great Expectations configuration.

When tabular data is passed in, GreatExpectationsType writes it to a Parquet file at the user-given local_file_path. That path is then used to perform the data validation.

But Wait, There’s More

Great Expectations’ RuntimeBatchRequest can be used in Flyte just like a simple BatchRequest; make sure the data connector is set up correctly. The plugin then automatically checks the type of batch request and instantiates it, as sketched below. Check the RuntimeBatchRequest usage in the task and type examples.
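
For instance, a task object wired to a runtime data connector might look like the following. This is a hedged sketch based on the Flyte examples; the names my_pandas_datasource and my_runtime_data_connector, and the batch identifiers, are assumptions that must match your Great Expectations config:

from flytekit import kwtypes
from flytekit.types.schema import FlyteSchema
from flytekitplugins.great_expectations import BatchRequestConfig, GreatExpectationsTask

runtime_task_object = GreatExpectationsTask(
    name="great_expectations_task_runtime",
    datasource_name="my_pandas_datasource",
    inputs=kwtypes(dataframe=FlyteSchema),
    expectation_suite_name="test.demo",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="validate_pandas_data",  # required when the batch request is built at runtime
    task_config=BatchRequestConfig(
        runtime_parameters={},  # the plugin injects the dataframe here at execution time
        batch_identifiers={"pipeline_stage": "validation"},
    ),
)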

In the future, we plan to integrate Great Expectations data docs with Flyte UI. This can enhance the visualization of errors and capture the key characteristics of the dataset.

Here’s Where to Go Next

Refer to the fully worked task and type examples in the Flyte documentation to understand how well Great Expectations integrates with Flyte. To run the code examples in Flyte sandbox, refer to the setting up the environment section.

Let us know if you have any questions or suggestions and feel free to jump into the Great Expectations Slack as well as Flyte's Slack!