Training XGBoost with Dask RAPIDS in Databricks#

This notebook shows how to deploy a Dask RAPIDS workflow in Databricks. We will focus on the HIGGS dataset, a moderately sized classification problem from the UCI Machine Learning repository.

In the following sections, we begin by loading the dataset from Delta Lake and preprocessing it with Dask. We then train an XGBoost model with various configurations and explore techniques for optimizing inference.

Launch multi-node Dask Cluster#

This workflow example can be run on GPUs, and you don’t even need a GPU locally since Databricks can provide one for you. Dask, in turn, lets users distribute or scale up computation within a single GPU or across multiple GPUs.

Dask recently introduced dask-databricks (available via conda and pip). With this CLI tool, the dask databricks run --cuda command launches a Dask scheduler on the driver node and CUDA workers on the remaining nodes.

From a high level, we could break down this section into the following steps:

  • Create a new init script that installs RAPIDS and runs dask-databricks

  • Create a new multi-node cluster that uses the init script

  • Once the cluster is running, upload this notebook to Databricks and continue running the cells there

See Documentation

For more detailed information on launching Dask-RAPIDS in Databricks see the documentation.

Import packages#

Once your cluster has launched, start by importing all necessary libraries and dependencies.

import os
from time import time
from typing import Tuple

import pandas as pd
import numpy as np
import cupy
import cudf
import dask
import dask.dataframe as dd
import dask_cudf
import dask_databricks
import dask_deltatable as ddt
import xgboost as xgb
from xgboost import dask as dxgb
from dask_ml.model_selection import train_test_split
from distributed import wait

Connect to Dask Client#

Connect to the client (and optionally Dashboard) to submit tasks.

client = dask_databricks.get_client()
client

Client

Client-23114b4f-b7aa-11ee-87d9-9a67d50005f3

Connection method: Cluster object Cluster type: dask_databricks.DatabricksCluster
Dashboard: https://dbc-dp-8721196619973675.cloud.databricks.com/driver-proxy/o/8721196619973675/1031-230718-l2ubf858/8087/status


Download dataset#

First, we download the dataset to the Databricks File System (DBFS). Alternatively, you could use cloud storage (S3, Google Cloud, Azure Data Lake). Refer to the docs for more information.

import subprocess

# Define the directory and file paths
directory_path = "/dbfs/databricks/rapids"
file_path = f"{directory_path}/HIGGS.csv.gz"
# gunzip replaces the archive with the decompressed file at this path
csv_path = f"{directory_path}/HIGGS.csv"

# Create the directory if it does not already exist
os.makedirs(directory_path, exist_ok=True)

# Download only if neither the archive nor the decompressed CSV is present
if not os.path.exists(file_path) and not os.path.exists(csv_path):
    download_command = f"curl https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz --output {file_path}"
    subprocess.run(download_command, shell=True, check=True)

    # Decompress the CSV file (this removes the .gz archive)
    decompress_command = f"gunzip {file_path}"
    subprocess.run(decompress_command, shell=True, check=True)
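
The cell above checks whether files exist before doing any work. As a rough illustration of the same check-then-decompress idea using only the Python standard library, the sketch below decompresses a gzip archive once and skips the step on later runs. The helper name decompress_once and the tiny sample.csv.gz file are hypothetical and stand in for the real HIGGS archive.

```python
import gzip
import os
import shutil
import tempfile


def decompress_once(gz_path: str, out_path: str) -> bool:
    """Decompress gz_path to out_path unless out_path already exists.

    Returns True if decompression ran, False if it was skipped.
    """
    if os.path.exists(out_path):
        return False
    # Stream the archive in chunks so large files are not loaded into memory
    with gzip.open(gz_path, "rb") as src, open(out_path, "wb") as dst:
        shutil.copyfileobj(src, dst)
    return True


# Demonstration on a tiny stand-in archive (not the real HIGGS file)
workdir = tempfile.mkdtemp()
gz_path = os.path.join(workdir, "sample.csv.gz")
csv_path = os.path.join(workdir, "sample.csv")
with gzip.open(gz_path, "wt") as f:
    f.write("1.0,0.5\n0.0,0.7\n")

first_run = decompress_once(gz_path, csv_path)   # does the work
second_run = decompress_once(gz_path, csv_path)  # skipped, output exists
print(first_run, second_run)
```

Unlike gunzip, this sketch leaves the original archive in place, so the existence check has to look at the decompressed file.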

Next, we load the data onto the GPUs. Because the data is loaded multiple times during parameter tuning, we convert the original CSV file into Parquet format for better performance. This is easily done with Delta Lake, as shown in the next steps.

Integrating Dask and Delta Lake#

Delta Lake is an optimized storage layer within the Databricks lakehouse that provides a foundational platform for storing data and tables. This open-source software extends Parquet data files by incorporating a file-based transaction log to support ACID transactions and scalable metadata handling.

Delta Lake is the default storage format for all operations on Databricks, i.e., unless otherwise specified, all tables on Databricks are Delta tables. Check out the tutorial for examples of basic Delta Lake operations.
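
To make the idea of a file-based transaction log more concrete, here is a toy model in plain Python. It is a simplified sketch and not Delta Lake's actual implementation: each commit is a JSON-lines file named by its version number, and replaying the "add" and "remove" actions in order yields the set of data files that currently make up the table. The function names commit and live_files and the part-*.parquet file names are hypothetical.

```python
import json
import os
import tempfile


def commit(log_dir: str, version: int, actions: list) -> None:
    """Write one commit as a JSON-lines file named by its version number."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    with open(path, "w") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")


def live_files(log_dir: str) -> set:
    """Replay commits in version order to find the current data files."""
    files = set()
    for name in sorted(os.listdir(log_dir)):
        with open(os.path.join(log_dir, name)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return files


log_dir = tempfile.mkdtemp()
# Version 0 creates the table with two Parquet files
commit(log_dir, 0, [
    {"add": {"path": "part-0.parquet"}},
    {"add": {"path": "part-1.parquet"}},
])
# Version 1 rewrites one file: the old file is removed and a new one added
commit(log_dir, 1, [
    {"remove": {"path": "part-0.parquet"}},
    {"add": {"path": "part-2.parquet"}},
])
current = live_files(log_dir)
print(sorted(current))
```

Because readers only trust files listed by the log, a commit either becomes visible as a whole or not at all, which is the property the Parquet files alone cannot provide.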

Let’s explore step-by-step how we can leverage Delta Lake tables with Dask to accelerate data pre-processing with RAPIDS.

Read from Delta table with Dask#

With Dask’s dask-deltatable, we can write the .csv file into a Delta table using Spark, then read it back and parallelize the work with Dask.

delta_table_name = "higgs_delta_table"

# Check if the Delta table already exists
if spark.catalog.tableExists(delta_table_name):
    # If it exists, print a message
    print(f"The Delta table '{delta_table_name}' already exists.")
else:
    # If not, Load csv file into a Spark dataframe then
    # Write the spark dataframe into delta table
    data = spark.read.csv(file_path, header=True, inferSchema=True)
    data.write.saveAsTable(delta_table_name)
    print(f"The Delta table '{delta_table_name}' has been created.")

The Delta table 'higgs_delta_table' already exists.

display(spark.sql("DESCRIBE DETAIL higgs_delta_table"))

format            delta
id                90cdac79-5500-4a20-914b-47f86b616275
name              spark_catalog.default.higgs_delta_table
description       null
location          dbfs:/user/hive/warehouse/higgs_delta_table
createdAt         2024-01-09T15:01:35.629+0000
lastModified      2024-01-09T15:04:37.000+0000
partitionColumns  List()
numFiles          60
sizeInBytes       906326187
properties        Map()
minReaderVersion  1
minWriterVersion  2
tableFeatures     List(appendOnly, invariants)
statistics        Map()

Calling dask_deltatable.read_deltalake() returns a Dask DataFrame. However, our objective is to use GPU acceleration for the entire ML pipeline, including data processing, model training and inference. For this reason, we convert the Dask DataFrame into a Dask cuDF DataFrame using dask_cudf.from_dask_dataframe().

Note that these operations automatically use the Dask client we created, so the work runs in parallel across the workers in the cluster.

# Read the Delta Lake into a Dask DataFrame using `dask-deltatable`
df = ddt.read_deltalake("/dbfs/user/hive/warehouse/higgs_delta_table")

# Convert Dask DataFrame to Dask cuDF for GPU acceleration
ddf = dask_cudf.from_dask_dataframe(df)

ddf.head()
1.000000000000000000e+00 8.692932128906250000e-01 -6.350818276405334473e-01 2.256902605295181274e-01 3.274700641632080078e-01 -6.899932026863098145e-01 7.542022466659545898e-01 -2.485731393098831177e-01 -1.092063903808593750e+00 0.000000000000000000e+009 ... -1.045456994324922562e-02 -4.576716944575309753e-02 3.101961374282836914e+00 1.353760004043579102e+00 9.795631170272827148e-01 9.780761599540710449e-01 9.200048446655273438e-01 7.216574549674987793e-01 9.887509346008300781e-01 8.766783475875854492e-01
0 1.0 0.907542 0.329147 0.359412 1.497970 -0.313010 1.095531 -0.557525 -1.588230 2.173076 ... -1.138930 -0.000819 0.000000 0.302220 0.833048 0.985700 0.978098 0.779732 0.992356 0.798343
1 1.0 0.798835 1.470639 -1.635975 0.453773 0.425629 1.104875 1.282322 1.381664 0.000000 ... 1.128848 0.900461 0.000000 0.909753 1.108330 0.985692 0.951331 0.803252 0.865924 0.780118
2 0.0 1.344385 -0.876626 0.935913 1.992050 0.882454 1.786066 -1.646778 -0.942383 0.000000 ... -0.678379 -1.360356 0.000000 0.946652 1.028704 0.998656 0.728281 0.869200 1.026736 0.957904
3 1.0 1.105009 0.321356 1.522401 0.882808 -1.205349 0.681466 -1.070464 -0.921871 0.000000 ... -0.373566 0.113041 0.000000 0.755856 1.361057 0.986610 0.838085 1.133295 0.872245 0.808487
4 0.0 1.595839 -0.607811 0.007075 1.818450 -0.111906 0.847550 -0.566437 1.581239 2.173076 ... -0.654227 -1.274345 3.101961 0.823761 0.938191 0.971758 0.789176 0.430553 0.961357 0.957818

5 rows × 29 columns

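# HIGGS.csv has no header row, so the first record was read as column names above;
# assign descriptive names instead
colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]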
ddf.columns = colnames
ddf.head()
label feature-01 feature-02 feature-03 feature-04 feature-05 feature-06 feature-07 feature-08 feature-09 ... feature-19 feature-20 feature-21 feature-22 feature-23 feature-24 feature-25 feature-26 feature-27 feature-28
0 1.0 0.907542 0.329147 0.359412 1.497970 -0.313010 1.095531 -0.557525 -1.588230 2.173076 ... -1.138930 -0.000819 0.000000 0.302220 0.833048 0.985700 0.978098 0.779732 0.992356 0.798343
1 1.0 0.798835 1.470639 -1.635975 0.453773 0.425629 1.104875 1.282322 1.381664 0.000000 ... 1.128848 0.900461 0.000000 0.909753 1.108330 0.985692 0.951331 0.803252 0.865924 0.780118
2 0.0 1.344385 -0.876626 0.935913 1.992050 0.882454 1.786066 -1.646778 -0.942383 0.000000 ... -0.678379 -1.360356 0.000000 0.946652 1.028704 0.998656 0.728281 0.869200 1.026736 0.957904
3 1.0 1.105009 0.321356 1.522401 0.882808 -1.205349 0.681466 -1.070464 -0.921871 0.000000 ... -0.373566 0.113041 0.000000 0.755856 1.361057 0.986610 0.838085 1.133295 0.872245 0.808487
4 0.0 1.595839 -0.607811 0.007075 1.818450 -0.111906 0.847550 -0.566437 1.581239 2.173076 ... -0.654227 -1.274345 3.101961 0.823761 0.938191 0.971758 0.789176 0.430553 0.961357 0.957818

5 rows × 29 columns
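
As a quick sanity check that needs no cluster, the naming pattern used above can be evaluated on its own. The snippet below only rebuilds the list of names; it confirms that one label column plus 28 zero-padded feature names gives the 29 columns shown in the output.

```python
# Same expression as in the notebook cell: "label" followed by feature-01 .. feature-28
colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]

print(len(colnames))
print(colnames[:3], colnames[-1])
```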

Split data#

In the preceding step, we used dask-cudf to load data from the Delta table. Now we use the train_test_split() function from dask-ml to split the dataset.

Most of the time, the GPU backend of Dask works seamlessly with the utilities in dask-ml, so we can accelerate the entire ML pipeline like this:

def load_higgs(
    ddf,
) -> Tuple[
    dask_cudf.core.DataFrame,
    dask_cudf.core.Series,
    dask_cudf.core.DataFrame,
    dask_cudf.core.Series,
]:
    y = ddf["label"]
    X = ddf[ddf.columns.difference(["label"])]

    X_train, X_valid, y_train, y_valid = train_test_split(
        X, y, test_size=0.33, random_state=42
    )
    X_train, X_valid, y_train, y_valid = client.persist(
        [X_train, X_valid, y_train, y_valid]
    )
    wait([X_train, X_valid, y_train, y_valid])

    return X_train, X_valid, y_train, y_valid
X_train, X_valid, y_train, y_valid = load_higgs(ddf)
/databricks/python/lib/python3.10/site-packages/dask_ml/model_selection/_split.py:462: FutureWarning: The default value for 'shuffle' must be specified when splitting DataFrames. In the future DataFrames will automatically be shuffled within blocks prior to splitting. Specify 'shuffle=True' to adopt the future behavior now, or 'shuffle=False' to retain the previous behavior.
  warnings.warn(
X_train.head()
feature-01 feature-02 feature-03 feature-04 feature-05 feature-06 feature-07 feature-08 feature-09 feature-10 ... feature-19 feature-20 feature-21 feature-22 feature-23 feature-24 feature-25 feature-26 feature-27 feature-28
0 0.907542 0.329147 0.359412 1.497970 -0.313010 1.095531 -0.557525 -1.588230 2.173076 0.812581 ... -1.138930 -0.000819 0.000000 0.302220 0.833048 0.985700 0.978098 0.779732 0.992356 0.798343
1 0.798835 1.470639 -1.635975 0.453773 0.425629 1.104875 1.282322 1.381664 0.000000 0.851737 ... 1.128848 0.900461 0.000000 0.909753 1.108330 0.985692 0.951331 0.803252 0.865924 0.780118
3 1.105009 0.321356 1.522401 0.882808 -1.205349 0.681466 -1.070464 -0.921871 0.000000 0.800872 ... -0.373566 0.113041 0.000000 0.755856 1.361057 0.986610 0.838085 1.133295 0.872245 0.808487
10 0.739357 -0.178290 0.829934 0.504539 -0.130217 0.961051 -0.355518 -1.717399 2.173076 0.620956 ... 0.774065 0.398820 3.101961 0.944536 1.026261 0.982197 0.542115 1.250979 0.830045 0.761308
11 1.384098 0.116822 -1.179879 0.762913 -0.079782 1.019863 0.877318 1.276887 2.173076 0.331252 ... 0.846521 0.504809 3.101961 0.959325 0.807376 1.191814 1.221210 0.861141 0.929341 0.838302

5 rows × 28 columns

y_train.head()
Out[14]: 0     1.0
1     1.0
3     1.0
10    0.0
11    1.0
Name: label, dtype: float64
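
The training index in the output above skips some rows (0, 1, 3, 10, 11) while keeping the original order. One way to picture a split with that property is a per-row random mask. The NumPy sketch below is an illustration only, under the assumption that each row is assigned independently; it is not a description of how dask-ml is implemented, and the function name random_mask_split is hypothetical.

```python
import numpy as np


def random_mask_split(n_rows: int, test_size: float, seed: int):
    """Assign each row to train or validation with an independent random draw."""
    rng = np.random.default_rng(seed)
    is_valid = rng.random(n_rows) < test_size
    index = np.arange(n_rows)
    # Boolean indexing keeps the surviving rows in their original order
    return index[~is_valid], index[is_valid]


train_idx, valid_idx = random_mask_split(n_rows=1000, test_size=0.33, seed=42)
print(len(train_idx), len(valid_idx))
```

With this kind of split the validation fraction is only approximately test_size, since each row is drawn independently.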

Model training#

There are two things to notice here. Firstly, we specify the number of rounds that triggers early stopping. XGBoost stops training once the validation metric has failed to improve for X consecutive rounds, where X is the number of rounds specified for early stopping.

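Secondly, we use a data type called DaskDeviceQuantileDMatrix for training but DaskDMatrix for validation. DaskDeviceQuantileDMatrix is a drop-in replacement for DaskDMatrix for GPU-based training inputs that avoids extra data copies. As the FutureWarning in the output below indicates, newer XGBoost releases deprecate it in favor of DaskQuantileDMatrix.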

def fit_model_es(client, X, y, X_valid, y_valid) -> dxgb.Booster:
    early_stopping_rounds = 5
    Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)
    Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
    # train the model
    booster = dxgb.train(
        client,
        {
            "objective": "binary:logistic",
            "eval_metric": "error",
            "tree_method": "gpu_hist",
        },
        Xy,
        evals=[(Xy_valid, "Valid")],
        num_boost_round=1000,
        early_stopping_rounds=early_stopping_rounds,
    )["booster"]
    return booster
booster = fit_model_es(client, X=X_train, y=y_train, X_valid=X_valid, y_valid=y_valid)
booster
/databricks/python/lib/python3.10/site-packages/xgboost/dask.py:703: FutureWarning: Please use `DaskQuantileDMatrix` instead.
  warnings.warn("Please use `DaskQuantileDMatrix` instead.", FutureWarning)
Out[16]: <xgboost.core.Booster at 0x7f7c5702c4c0>
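
The early stopping rule used in fit_model_es can be sketched in plain Python. This is a simplified model of the behavior described above, not XGBoost's own code: it assumes a metric where lower is better (such as error), counts only strict improvements, and stops once the best score is a given number of rounds old. The function name early_stop_round and the score values are made up for illustration.

```python
def early_stop_round(scores, rounds):
    """Return (last_round_run, best_round) for per-round validation scores.

    Lower scores are better. Training stops at the first round where the
    best score has not improved for `rounds` consecutive rounds.
    """
    best = float("inf")
    best_round = 0
    for i, score in enumerate(scores):
        if score < best:
            best, best_round = score, i
        elif i - best_round >= rounds:
            return i, best_round
    # Ran out of rounds before the stopping rule triggered
    return len(scores) - 1, best_round


# Hypothetical validation errors; the best value before stopping is at round 2
validation_error = [0.40, 0.35, 0.33, 0.34, 0.33, 0.35, 0.36, 0.34, 0.30]
stopped = early_stop_round(validation_error, rounds=5)
not_stopped = early_stop_round([0.5, 0.4, 0.3], rounds=5)
print(stopped, not_stopped)
```

In the first call the improvement at the final position is never seen, because training has already stopped. This is the usual trade-off of early stopping: a small patience value saves time but can miss late gains.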

Train with Customized objective and evaluation metric#

In the example below the XGBoost model is trained using a custom logistic regression-based objective function (logit) and a custom evaluation metric (error) along with early stopping.

Note that the objective function returns both the gradient and the hessian, which XGBoost uses to optimize the model. Also, the metric_name argument of the EarlyStopping callback must match the name returned by the custom metric (CustomErr here), so that XGBoost uses the custom error function to evaluate the early stopping criterion.

def fit_model_customized_objective(client, X, y, X_valid, y_valid) -> dxgb.Booster:
    def logit(predt: np.ndarray, Xy: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
        predt = 1.0 / (1.0 + np.exp(-predt))
        labels = Xy.get_label()
        grad = predt - labels
        hess = predt * (1.0 - predt)
        return grad, hess

    def error(predt: np.ndarray, Xy: xgb.DMatrix) -> Tuple[str, float]:
        label = Xy.get_label()
        r = np.zeros(predt.shape)
        predt = 1.0 / (1.0 + np.exp(-predt))
        gt = predt > 0.5
        r[gt] = 1 - label[gt]
        le = predt <= 0.5
        r[le] = label[le]
        return "CustomErr", float(np.average(r))

    # Use early stopping with custom objective and metric.
    early_stopping_rounds = 5
    # Specify the metric we want to use for early stopping.
    es = xgb.callback.EarlyStopping(
        rounds=early_stopping_rounds, save_best=True, metric_name="CustomErr"
    )

    Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)
    Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
    booster = dxgb.train(
        client,
        {"eval_metric": "error", "tree_method": "gpu_hist"},
        Xy,
        evals=[(Xy_valid, "Valid")],
        num_boost_round=1000,
        obj=logit,  # pass the custom objective
        feval=error,  # pass the custom metric
        callbacks=[es],
    )["booster"]
    return booster
booster_custom = fit_model_customized_objective(
    client, X=X_train, y=y_train, X_valid=X_valid, y_valid=y_valid
)
booster_custom
/databricks/python/lib/python3.10/site-packages/xgboost/dask.py:703: FutureWarning: Please use `DaskQuantileDMatrix` instead.
  warnings.warn("Please use `DaskQuantileDMatrix` instead.", FutureWarning)
Out[18]: <xgboost.core.Booster at 0x7f7c5702cd30>
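
The gradient and hessian returned by logit are the first and second derivatives of the logistic loss with respect to the raw prediction (the margin): with p = 1 / (1 + exp(-margin)), the gradient is p - label and the hessian is p * (1 - p). The NumPy sketch below checks these formulas against finite differences on a few made-up values. It runs on the CPU and does not involve XGBoost or Dask; the helper names are hypothetical.

```python
import numpy as np


def sigmoid(margin):
    return 1.0 / (1.0 + np.exp(-margin))


def logloss(margin, label):
    """Per-row logistic loss as a function of the raw margin."""
    p = sigmoid(margin)
    return -(label * np.log(p) + (1.0 - label) * np.log(1.0 - p))


def logit_grad_hess(margin, label):
    """Analytic gradient and hessian, as in the custom objective above."""
    p = sigmoid(margin)
    return p - label, p * (1.0 - p)


# Made-up margins and labels for the check
margin = np.array([-2.0, -0.5, 0.0, 1.5])
label = np.array([0.0, 1.0, 1.0, 0.0])

eps = 1e-4
# Central differences approximate the first and second derivatives
num_grad = (logloss(margin + eps, label) - logloss(margin - eps, label)) / (2 * eps)
num_hess = (
    logloss(margin + eps, label)
    - 2 * logloss(margin, label)
    + logloss(margin - eps, label)
) / eps**2

grad, hess = logit_grad_hess(margin, label)
grad_ok = np.allclose(grad, num_grad, atol=1e-6)
hess_ok = np.allclose(hess, num_hess, atol=1e-4)
print(grad_ok, hess_ok)
```

A check like this is a cheap way to validate any custom objective before passing it to a long training run.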

Running inference#

After some tuning, we arrive at the final model for performing inference on new data.

def predict(client, model, X):
    predt = dxgb.predict(client, model, X)
    return predt
preds = predict(client, booster, X_train)
preds.head()
Out[20]: 0     0.843650
1     0.975618
3     0.378462
10    0.293985
11    0.966303
Name: 0, dtype: float32
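
The predictions above are probabilities. To turn them into class labels, we can apply a threshold, 0.5 here, which matches the cut-off in the custom error metric. The snippet below does this in plain Python for the five rows printed above, using the labels shown earlier in y_train.head(). Five rows are far too few for a meaningful metric, so this only illustrates the calculation.

```python
# Values copied from the preds.head() and y_train.head() outputs above
probabilities = [0.843650, 0.975618, 0.378462, 0.293985, 0.966303]
labels = [1.0, 1.0, 1.0, 0.0, 1.0]

# Classify as positive when the predicted probability exceeds 0.5
predicted = [1.0 if p > 0.5 else 0.0 for p in probabilities]

# Fraction of rows where the predicted class differs from the label
error_rate = sum(p != y for p, y in zip(predicted, labels)) / len(labels)
print(predicted, error_rate)
```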

Clean up#

When finished, be sure to destroy your cluster to avoid incurring extra costs for idle resources.

Note: If you forget to destroy the cluster manually, Databricks clusters automatically time out after a period of inactivity (specified during cluster creation).

client.close()