# Scaling up hyperparameter optimization with multi-GPU workload on Kubernetes

Choosing an optimal set of hyperparameters is a daunting task, especially for algorithms like XGBoost that have many hyperparameters to tune. In this notebook, we will speed up hyperparameter optimization by running multiple training jobs in parallel on a Kubernetes cluster. We handle larger data sets by splitting the data into multiple GPU devices.

## Prerequisites
Please follow instructions in [Dask Operator: Installation](../../tools/kubernetes/dask-operator) to install the Dask operator on top of a GPU-enabled Kubernetes cluster. (For the purpose of this example, you may ignore other sections of the linked document.

### Optional: Kubeflow
Kubeflow gives you a nice notebook environment to run this notebook within the k8s cluster. Install Kubeflow by following instructions in [Installing Kubeflow](https://www.kubeflow.org/docs/started/installing-kubeflow/). You may choose any method; we tested this example after installing Kubeflow from manifests.

## Install extra Python modules
We'll need a few extra Python modules.

In [1]:
!pip install dask_kubernetes optuna

Collecting dask_kubernetes
  Downloading dask_kubernetes-2024.5.0-py3-none-any.whl.metadata (4.2 kB)
Collecting optuna
  Downloading optuna-3.6.1-py3-none-any.whl.metadata (17 kB)
Collecting kopf>=1.35.3 (from dask_kubernetes)
  Downloading kopf-1.37.2-py3-none-any.whl.metadata (9.7 kB)
Collecting kr8s==0.14.* (from dask_kubernetes)
  Downloading kr8s-0.14.4-py3-none-any.whl.metadata (6.7 kB)
Collecting kubernetes-asyncio>=12.0.1 (from dask_kubernetes)
  Downloading kubernetes_asyncio-29.0.0-py3-none-any.whl.metadata (1.3 kB)
Collecting kubernetes>=12.0.1 (from dask_kubernetes)
  Downloading kubernetes-29.0.0-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting pykube-ng>=22.9.0 (from dask_kubernetes)
  Downloading pykube_ng-23.6.0-py3-none-any.whl.metadata (8.0 kB)
Collecting asyncache>=0.3.1 (from kr8s==0.14.*->dask_kubernetes)
  Downloading asyncache-0.3.1-py3-none-any.whl.metadata (2.0 kB)
Collecting cryptography>=35 (from kr8s==0.14.*->dask_kubernetes)
  Downloading cryptography-42.0

## Import Python modules

In [10]:
import threading
import warnings

import cupy as cp
import cuspatial
import dask_cudf
import optuna
from cuml.dask.common import utils as dask_utils
from dask.distributed import Client, wait
from dask_kubernetes.operator import KubeCluster
from dask_ml.metrics import mean_squared_error
from dask_ml.model_selection import KFold
from xgboost import dask as dxgb

## Set up multiple Dask clusters

To run multi-GPU training jobs in parallel, we will create multiple Dask clusters each controlling its share of GPUs. It's best to think of each Dask cluster as a portion of the compute resource of the Kubernetes cluster.

Fill in the following variables:

In [29]:
# Number of nodes in the Kubernetes cluster.
# Each node is assumed to have a single NVIDIA GPU attached
n_nodes = 7

# Number of worker nodes to be assigned to each Dask cluster
n_worker_per_dask_cluster = 2

# Number of nodes to be assigned to each Dask cluster
# 1 is added since the Dask cluster's scheduler process needs to be mapped to its own node
n_node_per_dask_cluster = n_worker_per_dask_cluster + 1

# Number of Dask clusters to be created
# Subtract 1 to account for the notebook pod (it requires its own node)
n_clusters = (n_nodes - 1) // n_node_per_dask_cluster

print(f"{n_clusters=}")
if n_clusters == 0:
    raise ValueError(
        "No cluster can be created. Reduce `n_worker_per_dask_cluster` or create more compute nodes"
    )
print(f"{n_worker_per_dask_cluster=}")
print(f"{n_node_per_dask_cluster=}")

n_node_active = n_clusters * n_node_per_dask_cluster + 1
if n_node_active != n_nodes:
    n_idle = n_nodes - n_node_active
    warnings.warn(f"{n_idle} node(s) will not be used", stacklevel=2)

n_clusters=2
n_worker_per_dask_cluster=2
n_node_per_dask_cluster=3


Once we've determined the number of Dask clusters and their size, we are now ready to launch them:

In [30]:
# Choose the same RAPIDS image you used for launching the notebook session
rapids_image = ""

In [33]:
clusters = []
for i in range(n_clusters):
    print(f"Launching cluster {i}...")
    clusters.append(
        KubeCluster(
            name=f"rapids-dask{i}",
            image=rapids_image,
            worker_command="dask-cuda-worker",
            n_workers=2,
            resources={"limits": {"nvidia.com/gpu": "1"}},
            env={"EXTRA_PIP_PACKAGES": "optuna"},
        )
    )

Output()

Launching cluster 0...


Output()

Launching cluster 1...


## Set up Hyperparameter Optimization Task with NYC Taxi data

Anaconda has graciously made some of the NYC Taxi dataset available in a public Google Cloud Storage bucket. We'll use our Cluster of GPUs to process it and train a model that predicts the fare amount. We'll use our Dask clusters to process it and train a model that predicts the fare amount.

In [34]:
col_dtype = {
    "VendorID": "int32",
    "tpep_pickup_datetime": "datetime64[ms]",
    "tpep_dropoff_datetime": "datetime64[ms]",
    "passenger_count": "int32",
    "trip_distance": "float32",
    "pickup_longitude": "float32",
    "pickup_latitude": "float32",
    "RatecodeID": "int32",
    "store_and_fwd_flag": "int32",
    "dropoff_longitude": "float32",
    "dropoff_latitude": "float32",
    "payment_type": "int32",
    "fare_amount": "float32",
    "extra": "float32",
    "mta_tax": "float32",
    "tip_amount": "float32",
    "total_amount": "float32",
    "tolls_amount": "float32",
    "improvement_surcharge": "float32",
}


must_haves = {
    "pickup_datetime": "datetime64[ms]",
    "dropoff_datetime": "datetime64[ms]",
    "passenger_count": "int32",
    "trip_distance": "float32",
    "pickup_longitude": "float32",
    "pickup_latitude": "float32",
    "rate_code": "int32",
    "dropoff_longitude": "float32",
    "dropoff_latitude": "float32",
    "fare_amount": "float32",
}


def compute_haversine_distance(df):
    pickup = cuspatial.GeoSeries.from_points_xy(
        df[["pickup_longitude", "pickup_latitude"]].interleave_columns()
    )
    dropoff = cuspatial.GeoSeries.from_points_xy(
        df[["dropoff_longitude", "dropoff_latitude"]].interleave_columns()
    )
    df["haversine_distance"] = cuspatial.haversine_distance(pickup, dropoff)
    df["haversine_distance"] = df["haversine_distance"].astype("float32")
    return df


def clean(ddf, must_haves):
    # replace the extraneous spaces in column names and lower the font type
    tmp = {col: col.strip().lower() for col in list(ddf.columns)}
    ddf = ddf.rename(columns=tmp)

    ddf = ddf.rename(
        columns={
            "tpep_pickup_datetime": "pickup_datetime",
            "tpep_dropoff_datetime": "dropoff_datetime",
            "ratecodeid": "rate_code",
        }
    )

    ddf["pickup_datetime"] = ddf["pickup_datetime"].astype("datetime64[ms]")
    ddf["dropoff_datetime"] = ddf["dropoff_datetime"].astype("datetime64[ms]")

    for col in ddf.columns:
        if col not in must_haves:
            ddf = ddf.drop(columns=col)
            continue
        if ddf[col].dtype == "object":
            # Fixing error: could not convert arg to str
            ddf = ddf.drop(columns=col)
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if "int" in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype("int32")
            if "float" in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype("float32")
            ddf[col] = ddf[col].fillna(-1)

    return ddf


def prepare_data(client):
    taxi_df = dask_cudf.read_csv(
        "https://storage.googleapis.com/anaconda-public-data/nyc-taxi/csv/2016/yellow_tripdata_2016-02.csv",
        dtype=col_dtype,
    )
    taxi_df = taxi_df.map_partitions(clean, must_haves, meta=must_haves)

    ## add features
    taxi_df["hour"] = taxi_df["pickup_datetime"].dt.hour.astype("int32")
    taxi_df["year"] = taxi_df["pickup_datetime"].dt.year.astype("int32")
    taxi_df["month"] = taxi_df["pickup_datetime"].dt.month.astype("int32")
    taxi_df["day"] = taxi_df["pickup_datetime"].dt.day.astype("int32")
    taxi_df["day_of_week"] = taxi_df["pickup_datetime"].dt.weekday.astype("int32")
    taxi_df["is_weekend"] = (taxi_df["day_of_week"] >= 5).astype("int32")

    # calculate the time difference between dropoff and pickup.
    taxi_df["diff"] = taxi_df["dropoff_datetime"].astype("int32") - taxi_df[
        "pickup_datetime"
    ].astype("int32")
    taxi_df["diff"] = (taxi_df["diff"] / 1000).astype("int32")

    taxi_df["pickup_latitude_r"] = taxi_df["pickup_latitude"] // 0.01 * 0.01
    taxi_df["pickup_longitude_r"] = taxi_df["pickup_longitude"] // 0.01 * 0.01
    taxi_df["dropoff_latitude_r"] = taxi_df["dropoff_latitude"] // 0.01 * 0.01
    taxi_df["dropoff_longitude_r"] = taxi_df["dropoff_longitude"] // 0.01 * 0.01

    taxi_df = taxi_df.drop("pickup_datetime", axis=1)
    taxi_df = taxi_df.drop("dropoff_datetime", axis=1)

    taxi_df = taxi_df.map_partitions(compute_haversine_distance)

    X = (
        taxi_df.drop(["fare_amount"], axis=1)
        .astype("float32")
        .to_dask_array(lengths=True)
    )
    y = taxi_df["fare_amount"].astype("float32").to_dask_array(lengths=True)

    X._meta = cp.asarray(X._meta)
    y._meta = cp.asarray(y._meta)

    X, y = dask_utils.persist_across_workers(client, [X, y])
    return X, y


def train_model(params):
    cluster = get_cluster(threading.get_ident())

    default_params = {
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "verbosity": 0,
        "tree_method": "hist",
        "device": "cuda",
    }
    params = dict(default_params, **params)

    with Client(cluster) as client:
        X, y = prepare_data(client)
        wait([X, y])

        scores = []
        kfold = KFold(n_splits=5, shuffle=False)
        for train_index, test_index in kfold.split(X, y):
            dtrain = dxgb.DaskQuantileDMatrix(client, X[train_index, :], y[train_index])
            dtest = dxgb.DaskQuantileDMatrix(client, X[test_index, :], y[test_index])
            model = dxgb.train(
                client,
                params,
                dtrain,
                num_boost_round=10,
                verbose_eval=False,
            )
            y_test_pred = dxgb.predict(client, model, dtest).to_backend("cupy")
            rmse_score = mean_squared_error(y[test_index], y_test_pred, squared=False)
            scores.append(rmse_score)
        return sum(scores) / len(scores)


def objective(trial):
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 2, 4),
        "learning_rate": trial.suggest_float("learning_rate", 0.5, 0.7),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1),
        "colsample_bynode": trial.suggest_float("colsample_bynode", 0.5, 1),
        "colsample_bylevel": trial.suggest_float("colsample_bylevel", 0.5, 1),
        "reg_lambda": trial.suggest_float("reg_lambda", 0, 1),
        "max_depth": trial.suggest_int("max_depth", 1, 6),
        "max_leaves": trial.suggest_int("max_leaves", 0, 2),
        "max_cat_to_onehot": trial.suggest_int("max_cat_to_onehot", 1, 10),
    }
    return train_model(params)

To kick off multiple training jobs in parallel, we will launch multiple threads, so that each thread controls a Dask cluster.
One important utility function is `get_cluster`, which returns the Dask cluster that's mapped to a given thread.

In [35]:
# Map each thread's integer ID to a sequential number (0, 1, 2 ...)
thread_id_map: dict[int, KubeCluster] = {}
thread_id_map_lock = threading.Lock()


def get_cluster(thread_id: int) -> KubeCluster:
    with thread_id_map_lock:
        try:
            return clusters[thread_id_map[thread_id]]
        except KeyError:
            seq_id = len(thread_id_map)
            thread_id_map[thread_id] = seq_id
            return clusters[seq_id]

Now we are ready to start hyperparameter optimization.

In [36]:
n_trials = (
    10  # set to a low number so that the demo finishes quickly. Feel free to adjust
)
study = optuna.create_study(direction="minimize")

[I 2024-05-09 07:53:00,718] A new study created in memory with name: no-name-da830427-bce3-4e42-98e6-c98c0c3da0d7


In [37]:
# With n_jobs parameter, Optuna will launch [n_clusters] threads internally
# Each thread will deploy a training job to a Dask cluster
study.optimize(objective, n_trials=n_trials, n_jobs=n_clusters)

[I 2024-05-09 07:54:10,229] Trial 1 finished with value: 59.449462890625 and parameters: {'n_estimators': 4, 'learning_rate': 0.6399993857892183, 'colsample_bytree': 0.7020623988319513, 'colsample_bynode': 0.777468318546648, 'colsample_bylevel': 0.7890749134903386, 'reg_lambda': 0.4464953694744921, 'max_depth': 3, 'max_leaves': 0, 'max_cat_to_onehot': 9}. Best is trial 1 with value: 59.449462890625.
[I 2024-05-09 07:54:19,507] Trial 0 finished with value: 57.77985763549805 and parameters: {'n_estimators': 4, 'learning_rate': 0.674087333032356, 'colsample_bytree': 0.557642421113256, 'colsample_bynode': 0.9719449711676733, 'colsample_bylevel': 0.6984302171973646, 'reg_lambda': 0.7201514298169174, 'max_depth': 4, 'max_leaves': 1, 'max_cat_to_onehot': 4}. Best is trial 0 with value: 57.77985763549805.
[I 2024-05-09 07:54:59,524] Trial 2 finished with value: 57.77985763549805 and parameters: {'n_estimators': 2, 'learning_rate': 0.6894880267544121, 'colsample_bytree': 0.8171662437182604, 'co