Airflow#

Note

Download the full code example from the following link

Apache Airflow is an open-source workflow management system. It is commonly used to automate data processing, data science, and data engineering pipelines.

This tutorial demonstrates how deepchecks can be used with Apache Airflow. We will run a simple Airflow DAG that will evaluate the Adult dataset from the UCI Machine Learning Repository. The DAG will run the single_dataset_integrity() and the model_evaluation() suites on the Adult data and a pre-trained model.

The DAG for this tutorial
from datetime import datetime, timedelta
import os

from airflow import DAG
from airflow.operators.python import PythonOperator
import joblib
import pandas as pd

from deepchecks.tabular.datasets.classification import adult

dir_path = "suite_results"
# For demo only. Replace that with a S3/GCS other than local filesystem
data_path = os.path.join(os.getcwd(), "data")

Defining the Data & Model Loading Tasks#

def load_adult_dataset(**context):
    df_train, df_test = adult.load_data(data_format='Dataframe')

    try:
        os.mkdir(data_path)
    except OSError:
        print("Creation of the directory {} failed".format(dir_path))

    with open(os.path.join(data_path, "adult_train.csv"), "w") as f:
        df_train.to_csv(f, index=False)
        context["ti"].xcom_push(key="train_path", value=os.path.join(data_path, "adult_train.csv"))
    with open(os.path.join(data_path, "adult_test.csv"), "w") as f:
        df_test.to_csv(f, index=False)
        context["ti"].xcom_push(key="test_path", value=os.path.join(data_path, "adult_test.csv"))


def load_adult_model(**context):
    from deepchecks.tabular.datasets.classification.adult import load_fitted_model

    model = load_fitted_model()
    with open(os.path.join(data_path, "adult_model.joblib"), "wb") as f:
        joblib.dump(model, f)

    context["ti"].xcom_push(key="adult_model", value=os.path.join(data_path, "adult_model.joblib"))

Note

The dataset and the model are saved in the local filesystem for simplicity. For most use-cases, it is recommended to save the data and the model in a S3/GCS/other intermediate storage.

Defining the Integrity Report Task#

The single_dataset_integrity() suite will be used to evaluate the train and production datasets. It will check for integrity issues and will save the output html reports to the suite_results directory.

def dataset_integrity_step(**context):
    from deepchecks.tabular.suites import data_integrity
    from deepchecks.tabular.datasets.classification.adult import _CAT_FEATURES, _target
    from deepchecks.tabular import Dataset

    adult_train = pd.read_csv(context.get("ti").xcom_pull(key="train_path"))
    adult_test = pd.read_csv(context.get("ti").xcom_pull(key="test_path"))

    ds_train = Dataset(adult_train, label=_target, cat_features=_CAT_FEATURES)
    ds_test = Dataset(adult_test, label=_target, cat_features=_CAT_FEATURES)

    train_results = data_integrity().run(ds_train)
    test_results = data_integrity().run(ds_test)

    try:
        os.mkdir('suite_results')
    except OSError:
        print("Creation of the directory {} failed".format(dir_path))

    run_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    train_results.save_as_html(os.path.join(dir_path, f'train_integrity_{run_time}.html'))
    test_results.save_as_html(os.path.join(dir_path, f'test_integrity_{run_time}.html'))

Defining the Model Evaluation Task#

The model_evaluation() suite will be used to evaluate the model itself. It will check for model performance and overfit issues and will save the report to the suite_results directory.

def model_evaluation_step(**context):
    from deepchecks.tabular.suites import model_evaluation
    from deepchecks.tabular.datasets.classification.adult import _CAT_FEATURES, _target
    from deepchecks.tabular import Dataset

    adult_model = joblib.load(context.get("ti").xcom_pull(key="adult_model"))
    adult_train = pd.read_csv(context.get("ti").xcom_pull(key="train_path"))
    adult_test = pd.read_csv(context.get("ti").xcom_pull(key="test_path"))
    ds_train = Dataset(adult_train, label=_target, cat_features=_CAT_FEATURES)
    ds_test = Dataset(adult_test, label=_target, cat_features=_CAT_FEATURES)

    evaluation_results = model_evaluation().run(ds_train, ds_test, adult_model)

    run_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    evaluation_results.save_as_html(os.path.join(dir_path, f'model_evaluation_{run_time}.html'))

Creating the DAG#

After we have defined all the tasks, we can create the DAG using Airflow syntax. We will define a DAG that will run every day.

with DAG(
        dag_id="deepchecks_airflow_integration",
        schedule_interval="@daily",
        default_args={
            "owner": "airflow",
            "retries": 1,
            "retry_delay": timedelta(minutes=5),
            "start_date": datetime(2021, 1, 1),
        },
        catchup=False,
) as dag:
    load_adult_dataset = PythonOperator(
        task_id="load_adult_dataset",
        python_callable=load_adult_dataset
    )

    integrity_report = PythonOperator(
        task_id="integrity_report",
        python_callable=dataset_integrity_step
    )

    load_adult_model = PythonOperator(
        task_id="load_adult_model",
        python_callable=load_adult_model
    )

    evaluation_report = PythonOperator(
        task_id="evaluation_report",
        python_callable=model_evaluation_step
    )

load_adult_dataset >> integrity_report
load_adult_dataset >> load_adult_model >> evaluation_report

And that’s it! In order to run the dag, make sure you place the file in your DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags.

The DAG is scheduled to run daily, but the scheduling can be configured using the schedule_interval property. The DAG can also be manually triggered for a single run by using the following command:

airflow dags backfill deepchecks_airflow_integration --start-date <some date in YYYY-MM-DD format>