Training XGBoost with Dask RAPIDS in Databricks#
This notebook shows how to deploy a Dask RAPIDS workflow in Databricks. We will focus on the HIGGS dataset, a moderately sized classification problem from the UCI Machine Learning repository.
In the following sections, we begin by loading the dataset from Delta Lake and preprocessing it with Dask, then train an XGBoost model with various configurations and explore techniques for optimizing inference.
Launch multi-node Dask Cluster#
This workflow example can be run on GPUs, and you don't need a GPU locally since Databricks can provide one for you. Dask then enables users to easily distribute or scale up computation tasks within a single GPU or across multiple GPUs.
Dask recently introduced dask-databricks (available via conda and pip). With this CLI tool, the dask databricks run --cuda command launches a Dask scheduler on the driver node and CUDA workers on the remaining nodes.
At a high level, this section breaks down into the following steps:
1. Create a new init script that installs RAPIDS and runs dask-databricks.
2. Create a new multi-node cluster that uses the init script.
3. Once the cluster is running, upload this notebook to Databricks and continue running these cells there.
See Documentation
For more detailed information on launching Dask-RAPIDS in Databricks, see the documentation.
Import packages#
Once your cluster has launched, start by importing all necessary libraries and dependencies.
import os
from typing import Tuple
import dask_cudf
import dask_databricks
import dask_deltatable as ddt
import numpy as np
import xgboost as xgb
from dask_ml.model_selection import train_test_split
from distributed import wait
from xgboost import dask as dxgb
Connect to Dask Client#
Connect to the client (and optionally Dashboard) to submit tasks.
client = dask_databricks.get_client()
client
Client: Client-23114b4f-b7aa-11ee-87d9-9a67d50005f3
Connection method: Cluster object | Cluster type: dask_databricks.DatabricksCluster
Dashboard: https://dbc-dp-8721196619973675.cloud.databricks.com/driver-proxy/o/8721196619973675/1031-230718-l2ubf858/8087/status
DatabricksCluster 1031-230718-l2ubf858 | Scheduler: tcp://10.59.146.44:8786 | Workers: 2 | Total threads: 2 | Total memory: 30.65 GiB
Each worker: 1 thread, 15.33 GiB memory, Tesla T4 GPU (15.00 GiB GPU memory)
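As an optional sanity check (not part of the original workflow), you can confirm that each Dask worker sees a GPU. The sketch below assumes pynvml is available on the workers, which is typically the case in RAPIDS/dask-cuda environments.
def gpu_name():
    # Runs on each worker and returns the name of the GPU visible to it
    import pynvml

    pynvml.nvmlInit()
    return pynvml.nvmlDeviceGetName(pynvml.nvmlDeviceGetHandleByIndex(0))


# Execute the check on every worker; the result is keyed by worker address
client.run(gpu_name)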
Download dataset#
First we download the dataset to Databricks File Storage (DBFS). Alternatively, you could use cloud storage (S3, Google Cloud Storage, Azure Data Lake); refer to the docs for more information.
import subprocess
# Define the directory and file paths
directory_path = "/dbfs/databricks/rapids"
file_path = f"{directory_path}/HIGGS.csv.gz"
# Check if directory already exists
if not os.path.exists(directory_path):
os.makedirs(directory_path)
# Check if the file already exists
if not os.path.exists(file_path):
# If not, download dataset to the directory
data_url = (
"https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz"
)
download_command = f"curl {data_url} --output {file_path}"
subprocess.run(download_command, shell=True)
# decompress the csv file
decompress_command = f"gunzip {file_path}"
subprocess.run(decompress_command, shell=True)
Next we load the data onto the GPUs. Because the data is loaded multiple times during parameter tuning, we convert the original CSV file into Parquet format for better performance. This is easily done with Delta Lake, as shown in the next steps.
Integrating Dask and Delta Lake#
Delta Lake is an optimized storage layer within the Databricks lakehouse that provides a foundational platform for storing data and tables. This open-source software extends Parquet data files by incorporating a file-based transaction log to support ACID transactions and scalable metadata handling.
Delta Lake is the default storage format for all operations on Databricks; unless otherwise specified, all tables on Databricks are Delta tables. Check out the tutorial for examples of basic Delta Lake operations.
Let’s explore step-by-step how we can leverage Delta Lake tables with Dask to accelerate data pre-processing with RAPIDS.
Read from Delta table with Dask#
With Dask's dask-deltatable, we can write the .csv file into a Delta table using Spark, then read and parallelize it with Dask.
delta_table_name = "higgs_delta_table"
# Check if the Delta table already exists
if spark.catalog.tableExists(delta_table_name):
# If it exists, print a message
print(f"The Delta table '{delta_table_name}' already exists.")
else:
    # If not, load the CSV file into a Spark DataFrame, then
    # write the Spark DataFrame into a Delta table
data = spark.read.csv(file_path, header=True, inferSchema=True)
data.write.saveAsTable(delta_table_name)
print(f"The Delta table '{delta_table_name}' has been created.")
The Delta table 'higgs_delta_table' already exists.
display(spark.sql("DESCRIBE DETAIL higgs_delta_table"))
format | id | name | description | location | createdAt | lastModified | partitionColumns | numFiles | sizeInBytes | properties | minReaderVersion | minWriterVersion | tableFeatures | statistics |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
delta | 90cdac79-5500-4a20-914b-47f86b616275 | spark_catalog.default.higgs_delta_table | null | dbfs:/user/hive/warehouse/higgs_delta_table | 2024-01-09T15:01:35.629+0000 | 2024-01-09T15:04:37.000+0000 | List() | 60 | 906326187 | Map() | 1 | 2 | List(appendOnly, invariants) | Map() |
Calling dask_deltatable.read_deltalake() returns a Dask DataFrame. However, our objective is to use GPU acceleration for the entire ML pipeline, including data processing, model training, and inference. For this reason, we read the Dask DataFrame into a cuDF-backed Dask DataFrame using dask_cudf.from_dask_dataframe().
Note that these operations automatically leverage the Dask client we created, ensuring an optimal performance boost through parallelism with Dask.
# Read the Delta Lake into a Dask DataFrame using `dask-deltatable`
df = ddt.read_deltalake("/dbfs/user/hive/warehouse/higgs_delta_table")
# Convert Dask DataFrame to Dask cuDF for GPU acceleration
ddf = dask_cudf.from_dask_dataframe(df)
ddf.head()
1.000000000000000000e+00 | 8.692932128906250000e-01 | -6.350818276405334473e-01 | 2.256902605295181274e-01 | 3.274700641632080078e-01 | -6.899932026863098145e-01 | 7.542022466659545898e-01 | -2.485731393098831177e-01 | -1.092063903808593750e+00 | 0.000000000000000000e+009 | ... | -1.045456994324922562e-02 | -4.576716944575309753e-02 | 3.101961374282836914e+00 | 1.353760004043579102e+00 | 9.795631170272827148e-01 | 9.780761599540710449e-01 | 9.200048446655273438e-01 | 7.216574549674987793e-01 | 9.887509346008300781e-01 | 8.766783475875854492e-01 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1.0 | 0.907542 | 0.329147 | 0.359412 | 1.497970 | -0.313010 | 1.095531 | -0.557525 | -1.588230 | 2.173076 | ... | -1.138930 | -0.000819 | 0.000000 | 0.302220 | 0.833048 | 0.985700 | 0.978098 | 0.779732 | 0.992356 | 0.798343 |
1 | 1.0 | 0.798835 | 1.470639 | -1.635975 | 0.453773 | 0.425629 | 1.104875 | 1.282322 | 1.381664 | 0.000000 | ... | 1.128848 | 0.900461 | 0.000000 | 0.909753 | 1.108330 | 0.985692 | 0.951331 | 0.803252 | 0.865924 | 0.780118 |
2 | 0.0 | 1.344385 | -0.876626 | 0.935913 | 1.992050 | 0.882454 | 1.786066 | -1.646778 | -0.942383 | 0.000000 | ... | -0.678379 | -1.360356 | 0.000000 | 0.946652 | 1.028704 | 0.998656 | 0.728281 | 0.869200 | 1.026736 | 0.957904 |
3 | 1.0 | 1.105009 | 0.321356 | 1.522401 | 0.882808 | -1.205349 | 0.681466 | -1.070464 | -0.921871 | 0.000000 | ... | -0.373566 | 0.113041 | 0.000000 | 0.755856 | 1.361057 | 0.986610 | 0.838085 | 1.133295 | 0.872245 | 0.808487 |
4 | 0.0 | 1.595839 | -0.607811 | 0.007075 | 1.818450 | -0.111906 | 0.847550 | -0.566437 | 1.581239 | 2.173076 | ... | -0.654227 | -1.274345 | 3.101961 | 0.823761 | 0.938191 | 0.971758 | 0.789176 | 0.430553 | 0.961357 | 0.957818 |
5 rows × 29 columns
colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
ddf.columns = colnames
ddf.head()
label | feature-01 | feature-02 | feature-03 | feature-04 | feature-05 | feature-06 | feature-07 | feature-08 | feature-09 | ... | feature-19 | feature-20 | feature-21 | feature-22 | feature-23 | feature-24 | feature-25 | feature-26 | feature-27 | feature-28 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1.0 | 0.907542 | 0.329147 | 0.359412 | 1.497970 | -0.313010 | 1.095531 | -0.557525 | -1.588230 | 2.173076 | ... | -1.138930 | -0.000819 | 0.000000 | 0.302220 | 0.833048 | 0.985700 | 0.978098 | 0.779732 | 0.992356 | 0.798343 |
1 | 1.0 | 0.798835 | 1.470639 | -1.635975 | 0.453773 | 0.425629 | 1.104875 | 1.282322 | 1.381664 | 0.000000 | ... | 1.128848 | 0.900461 | 0.000000 | 0.909753 | 1.108330 | 0.985692 | 0.951331 | 0.803252 | 0.865924 | 0.780118 |
2 | 0.0 | 1.344385 | -0.876626 | 0.935913 | 1.992050 | 0.882454 | 1.786066 | -1.646778 | -0.942383 | 0.000000 | ... | -0.678379 | -1.360356 | 0.000000 | 0.946652 | 1.028704 | 0.998656 | 0.728281 | 0.869200 | 1.026736 | 0.957904 |
3 | 1.0 | 1.105009 | 0.321356 | 1.522401 | 0.882808 | -1.205349 | 0.681466 | -1.070464 | -0.921871 | 0.000000 | ... | -0.373566 | 0.113041 | 0.000000 | 0.755856 | 1.361057 | 0.986610 | 0.838085 | 1.133295 | 0.872245 | 0.808487 |
4 | 0.0 | 1.595839 | -0.607811 | 0.007075 | 1.818450 | -0.111906 | 0.847550 | -0.566437 | 1.581239 | 2.173076 | ... | -0.654227 | -1.274345 | 3.101961 | 0.823761 | 0.938191 | 0.971758 | 0.789176 | 0.430553 | 0.961357 | 0.957818 |
5 rows × 29 columns
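As a quick optional check (not in the original notebook), you can inspect how the GPU DataFrame is partitioned and typed before splitting it:
# Number of partitions spread across the Dask workers
print(ddf.npartitions)

# Column dtypes as seen by cuDF
print(ddf.dtypes)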
Split data#
In the preceding step, we used dask-cudf to load data from the Delta table; now we use the train_test_split() function from dask-ml to split the dataset.
Most of the time, the GPU backend of Dask works seamlessly with utilities in dask-ml, and we can accelerate the entire ML pipeline as follows:
def load_higgs(
    ddf,
) -> Tuple[
    dask_cudf.core.DataFrame,
    dask_cudf.core.Series,
    dask_cudf.core.DataFrame,
    dask_cudf.core.Series,
]:
    # Separate the target column from the feature columns
    y = ddf["label"]
    X = ddf[ddf.columns.difference(["label"])]

    # Split into training and validation sets
    X_train, X_valid, y_train, y_valid = train_test_split(
        X, y, test_size=0.33, random_state=42
    )

    # Persist the splits in GPU memory and wait until they are fully materialized
    X_train, X_valid, y_train, y_valid = client.persist(
        [X_train, X_valid, y_train, y_valid]
    )
    wait([X_train, X_valid, y_train, y_valid])

    return X_train, X_valid, y_train, y_valid
X_train, X_valid, y_train, y_valid = load_higgs(ddf)
/databricks/python/lib/python3.10/site-packages/dask_ml/model_selection/_split.py:462: FutureWarning: The default value for 'shuffle' must be specified when splitting DataFrames. In the future DataFrames will automatically be shuffled within blocks prior to splitting. Specify 'shuffle=True' to adopt the future behavior now, or 'shuffle=False' to retain the previous behavior.
warnings.warn(
X_train.head()
feature-01 | feature-02 | feature-03 | feature-04 | feature-05 | feature-06 | feature-07 | feature-08 | feature-09 | feature-10 | ... | feature-19 | feature-20 | feature-21 | feature-22 | feature-23 | feature-24 | feature-25 | feature-26 | feature-27 | feature-28 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0.907542 | 0.329147 | 0.359412 | 1.497970 | -0.313010 | 1.095531 | -0.557525 | -1.588230 | 2.173076 | 0.812581 | ... | -1.138930 | -0.000819 | 0.000000 | 0.302220 | 0.833048 | 0.985700 | 0.978098 | 0.779732 | 0.992356 | 0.798343 |
1 | 0.798835 | 1.470639 | -1.635975 | 0.453773 | 0.425629 | 1.104875 | 1.282322 | 1.381664 | 0.000000 | 0.851737 | ... | 1.128848 | 0.900461 | 0.000000 | 0.909753 | 1.108330 | 0.985692 | 0.951331 | 0.803252 | 0.865924 | 0.780118 |
3 | 1.105009 | 0.321356 | 1.522401 | 0.882808 | -1.205349 | 0.681466 | -1.070464 | -0.921871 | 0.000000 | 0.800872 | ... | -0.373566 | 0.113041 | 0.000000 | 0.755856 | 1.361057 | 0.986610 | 0.838085 | 1.133295 | 0.872245 | 0.808487 |
10 | 0.739357 | -0.178290 | 0.829934 | 0.504539 | -0.130217 | 0.961051 | -0.355518 | -1.717399 | 2.173076 | 0.620956 | ... | 0.774065 | 0.398820 | 3.101961 | 0.944536 | 1.026261 | 0.982197 | 0.542115 | 1.250979 | 0.830045 | 0.761308 |
11 | 1.384098 | 0.116822 | -1.179879 | 0.762913 | -0.079782 | 1.019863 | 0.877318 | 1.276887 | 2.173076 | 0.331252 | ... | 0.846521 | 0.504809 | 3.101961 | 0.959325 | 0.807376 | 1.191814 | 1.221210 | 0.861141 | 0.929341 | 0.838302 |
5 rows × 28 columns
y_train.head()
Out[14]: 0 1.0
1 1.0
3 1.0
10 0.0
11 1.0
Name: label, dtype: float64
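Optionally (not in the original notebook), check the label distribution of the training split to confirm the two classes are reasonably balanced:
# Count how many rows of each class ended up in the training split
print(y_train.value_counts().compute())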
Model training#
There are two things to notice here. First, we specify the number of rounds used to trigger early stopping: XGBoost stops training once the validation metric fails to improve for that many consecutive rounds.
Second, we use a data type called DaskDeviceQuantileDMatrix for training but DaskDMatrix for validation. DaskDeviceQuantileDMatrix is a drop-in replacement for DaskDMatrix for GPU-based training inputs that avoids extra data copies. (Newer XGBoost releases rename this class to DaskQuantileDMatrix, which is why a FutureWarning appears in the output below.)
def fit_model_es(client, X, y, X_valid, y_valid) -> dxgb.Booster:
early_stopping_rounds = 5
Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)
Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
# train the model
booster = dxgb.train(
client,
{
"objective": "binary:logistic",
"eval_metric": "error",
"tree_method": "gpu_hist",
},
Xy,
evals=[(Xy_valid, "Valid")],
num_boost_round=1000,
early_stopping_rounds=early_stopping_rounds,
)["booster"]
return booster
booster = fit_model_es(client, X=X_train, y=y_train, X_valid=X_valid, y_valid=y_valid)
booster
/databricks/python/lib/python3.10/site-packages/xgboost/dask.py:703: FutureWarning: Please use `DaskQuantileDMatrix` instead.
warnings.warn("Please use `DaskQuantileDMatrix` instead.", FutureWarning)
Out[16]: <xgboost.core.Booster at 0x7f7c5702c4c0>
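Because early stopping was used, the returned booster records the round with the best validation score. A small, hedged way to inspect this with standard XGBoost Booster APIs:
# Boosting round at which the best validation error was observed
print(booster.best_iteration)

# Early-stopping metadata stored on the booster as string attributes
print(booster.attributes())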
Train with Customized objective and evaluation metric#
In the example below, the XGBoost model is trained using a custom logistic-regression-based objective function (logit) and a custom evaluation metric (error), along with early stopping.
Note that the objective function returns both the gradient and the Hessian, which XGBoost uses to optimize the model. Also, the metric_name parameter needs to be specified in our callback; it tells XGBoost that the custom error function should be used when evaluating the early stopping criteria.
def fit_model_customized_objective(client, X, y, X_valid, y_valid) -> dxgb.Booster:
def logit(predt: np.ndarray, Xy: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
predt = 1.0 / (1.0 + np.exp(-predt))
labels = Xy.get_label()
grad = predt - labels
hess = predt * (1.0 - predt)
return grad, hess
def error(predt: np.ndarray, Xy: xgb.DMatrix) -> Tuple[str, float]:
label = Xy.get_label()
r = np.zeros(predt.shape)
predt = 1.0 / (1.0 + np.exp(-predt))
gt = predt > 0.5
r[gt] = 1 - label[gt]
le = predt <= 0.5
r[le] = label[le]
return "CustomErr", float(np.average(r))
# Use early stopping with custom objective and metric.
early_stopping_rounds = 5
# Specify the metric we want to use for early stopping.
es = xgb.callback.EarlyStopping(
rounds=early_stopping_rounds, save_best=True, metric_name="CustomErr"
)
Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)
Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
booster = dxgb.train(
client,
{"eval_metric": "error", "tree_method": "gpu_hist"},
Xy,
evals=[(Xy_valid, "Valid")],
num_boost_round=1000,
obj=logit, # pass the custom objective
feval=error, # pass the custom metric
callbacks=[es],
)["booster"]
return booster
booster_custom = fit_model_customized_objective(
client, X=X_train, y=y_train, X_valid=X_valid, y_valid=y_valid
)
booster_custom
/databricks/python/lib/python3.10/site-packages/xgboost/dask.py:703: FutureWarning: Please use `DaskQuantileDMatrix` instead.
warnings.warn("Please use `DaskQuantileDMatrix` instead.", FutureWarning)
Out[18]: <xgboost.core.Booster at 0x7f7c5702cd30>
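Since the EarlyStopping callback was created with save_best=True, the returned booster should already be trimmed to the best iteration; a quick hedged way to confirm is to look at how many boosting rounds it actually contains:
# Number of boosting rounds kept in the best model
print(booster_custom.num_boosted_rounds())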
Running inference#
After some tuning, we arrive at the final model for performing inference on new data.
def predict(client, model, X):
predt = dxgb.predict(client, model, X)
return predt
preds = predict(client, booster, X_train)
preds.head()
Out[20]: 0 0.843650
1 0.975618
3 0.378462
10 0.293985
11 0.966303
Name: 0, dtype: float32
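To get a rough feel for model quality, you can threshold the predicted probabilities and compare them with the labels of the same split. This is an illustrative sketch, not part of the original notebook, and assumes the predictions remain index-aligned with y_train:
# Turn predicted probabilities into hard class labels at a 0.5 threshold
pred_labels = (preds > 0.5).astype("int32")

# Fraction of rows where the prediction matches the true label
accuracy = (pred_labels == y_train.astype("int32")).mean().compute()
print(f"Training accuracy: {accuracy:.4f}")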
Clean up#
When finished, be sure to destroy your cluster to avoid incurring extra costs for idle resources.
Note: if you forget to destroy the cluster manually, Databricks clusters will automatically time out after a period of inactivity (specified during cluster creation).
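Before tearing the cluster down, you may want to persist the trained model so it outlives the cluster; the DBFS path below is illustrative.
# Save the trained booster to DBFS (path is an example, adjust as needed)
booster.save_model("/dbfs/databricks/rapids/xgboost_higgs.json")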
client.close()