Spark & Databricks#
This tutorial demonstrates how deepchecks can be used on the Databricks ML platform with Spark. We will build a logistic regression model on top of the Adult dataset, a sample dataset that is automatically available in every Databricks workspace.
Loading the Dataset#
We first define the dataset schema and then load it as a Spark DataFrame.
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""
dataset = spark.read.csv("/databricks-datasets/adult/adult.data", schema=schema)
# Split the data into train/test sets
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
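As a quick, optional sanity check, you can peek at a few rows and confirm the split sizes using the notebook's built-in display function:

# Optional: inspect a few rows and the size of each split
display(dataset.limit(5))
print(trainDF.count(), testDF.count())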
Defining a deepchecks Dataset#
We first convert the Spark DataFrame to a pandas DataFrame that deepchecks can work with.
Note
Conversion to a pandas DataFrame loads the data into the driver's memory. If you have a large dataset, it is recommended to sample the data first. Sampling is generally acceptable here, since most checks are performed on a subset of the data anyway.
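For example, a minimal sketch of sampling before the conversion (the 10% fraction is illustrative, not a recommendation for this dataset):

# Illustrative only: pull ~10% of the rows into driver memory instead of all of them
pd_train_sample = trainDF.sample(fraction=0.1, seed=42).toPandas()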
from deepchecks.tabular import Dataset

pd_train = trainDF.toPandas()
pd_test = testDF.toPandas()

cat_features = ['workclass', 'education', 'marital_status', 'occupation',
                'relationship', 'race', 'sex', 'native_country']
ds_train = Dataset(pd_train, label='income', cat_features=cat_features)
ds_test = Dataset(pd_test, label='income', cat_features=cat_features)
Running the Integrity Suite#
One of deepchecks’ use-cases is validating the integrity of a dataset, even without a model. To do so, we run the data integrity suite on a single dataset.
from deepchecks.tabular.suites import data_integrity
# Validate the training set
train_res = data_integrity().run(ds_train)
Displaying the Results#
We will use the Databricks platform's built-in displayHTML function to view the results in HTML format.
from io import StringIO
buff = StringIO()
train_res.save_as_html(buff)
displayHTML(buff.getvalue())
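If you plan to render several reports this way, the buffering pattern can be wrapped in a small helper (a convenience sketch, not a deepchecks or Databricks API):

def show_result(result):
    """Render a deepchecks result inline in a Databricks notebook."""
    buff = StringIO()
    result.save_as_html(buff)
    displayHTML(buff.getvalue())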
Building a Logistic Regression Model#
After validating that our data is clean and ready to be used in a model, we build a logistic regression model that classifies whether a person's income is above or below 50K. First, we preprocess the features: the categorical features are one-hot encoded, and the label is transformed to 0/1.
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
categoricalCols = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex"]
# The following two lines define estimators; once fit, they produce transformers that we will later apply to the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols])
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols])
# The label column ("income") is also a string value - it has two possible values, "<=50K" and ">50K".
# Convert it to a numeric value using StringIndexer.
labelToIndex = StringIndexer(inputCol="income", outputCol="label")
stringIndexerModel = stringIndexer.fit(trainDF)

# Assemble the model inputs: the numeric columns plus the one-hot encoded binary vector columns.
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
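To see what the indexer learned, you can apply the fitted stringIndexerModel and inspect one of the new index columns (optional):

# Optional: the fitted indexer adds a "<col>Index" column per categorical column
display(stringIndexerModel.transform(trainDF).select("workclass", "workclassIndex").limit(5))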
Training the Model#
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
lr = LogisticRegression(featuresCol="features", labelCol="label")
# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])
# Fit the pipeline model.
pipelineModel = pipeline.fit(trainDF)
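The fitted pipeline can now transform the held-out test set; a quick look at its output (optional) shows the numeric prediction alongside the per-class probability vector:

# Apply the full pipeline to the test set and inspect a few predictions
predDF = pipelineModel.transform(testDF)
display(predDF.select("income", "prediction", "probability").limit(5))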
Writing a Model Wrapper#
We will write a wrapper for our model that implements the API deepchecks requires, as described in the
Working with Models and Predictions guide. For a classification problem, the wrapper generally needs to
implement two functions that deepchecks will call: predict and predict_proba. In addition, it is possible
to specify the model's feature importances. Read more about feature importance handling in the
Feature Importance guide.
import numpy as np
import pyspark
from pyspark.ml.feature import IndexToString

class PySparkModelWrapper:
    def __init__(self, model: pyspark.ml.pipeline.PipelineModel, label_map):
        self.model = model
        # Maps the numeric "prediction" column back to the original string labels
        self.idx_to_string = IndexToString(inputCol="prediction", outputCol="predictedValue")
        self.idx_to_string.setLabels(label_map)

    def predict(self, X: np.ndarray) -> np.ndarray:
        df = spark.createDataFrame(X)
        predictions = self.model.transform(df).select('prediction')
        preds = self.idx_to_string.transform(predictions).select('predictedValue').collect()
        return np.array(preds).reshape(-1)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        df = spark.createDataFrame(X)
        # The "probability" column holds a vector of per-class probabilities
        rows = self.model.transform(df).select('probability').collect()
        return np.array([row['probability'].toArray() for row in rows])

    @property
    def feature_importances_(self):
        # 14 features, all weighted equally (see the note below)
        return np.array([1 / 14] * 14)
Note
The wrapper here treats all features as equally important. This is not a valid assumption for real models, but is done here for simplicity.
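Before handing the wrapper to a suite, a quick smoke test (a sketch reusing the pandas test split from earlier) confirms both methods return sensibly shaped results:

# Smoke test: run the wrapper on a handful of rows, without the label column,
# mimicking how deepchecks will call it
wrapped = PySparkModelWrapper(pipelineModel, pipelineModel.stages[2].labels)
sample = pd_test.drop(columns=['income']).head(5)
print(wrapped.predict(sample))        # five string labels, e.g. '<=50K' / '>50K'
print(wrapped.predict_proba(sample))  # array of shape (5, 2)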
Evaluating the Model Using Deepchecks Suites#
We will run two suites: the model_evaluation suite, which tests model performance and overfitting, and the
train_test_validation suite, which validates the correctness of the train-test split, including integrity,
distribution and leakage checks.
from deepchecks.tabular.suites import model_evaluation, train_test_validation

# Reuse a single wrapper instance for both suites. stages[2] is the fitted
# labelToIndex StringIndexer, whose labels map indices back to income values.
model = PySparkModelWrapper(pipelineModel, pipelineModel.stages[2].labels)

eval_suite = model_evaluation()
model_evaluation_res = eval_suite.run(ds_train, ds_test, model)

train_test_suite = train_test_validation()
train_test_res = train_test_suite.run(ds_train, ds_test, model)
Displaying the Results#
from io import StringIO

# Render the model evaluation report
buff = StringIO()
model_evaluation_res.save_as_html(buff)
displayHTML(buff.getvalue())

# Render the train-test validation report
buff = StringIO()
train_test_res.save_as_html(buff)
displayHTML(buff.getvalue())
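The reports can also be saved as standalone HTML files, since save_as_html accepts a file path as well as a buffer; the DBFS path below is illustrative:

# Illustrative path: persist a report to DBFS so it can be downloaded later
model_evaluation_res.save_as_html('/dbfs/tmp/model_evaluation_report.html')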