Multi-node multi-GPU example on AWS using dask-cloudprovider#

Dask Cloud Provider is a native cloud integration for Dask that helps you manage Dask clusters on different cloud platforms. In this notebook, we will use the package to set up an AWS cluster and run a multi-node multi-GPU (MNMG) example with RAPIDS. RAPIDS provides a suite of libraries to accelerate data science pipelines entirely on the GPU, and, as we will see in this notebook, that work can be scaled out to multiple nodes with Dask.

Create your cluster#

Note

First follow the full instructions on launching a multi-node GPU cluster with Dask Cloud Provider.

Once you have a cluster object up and running, head back here and continue.

from dask_cloudprovider.aws import EC2Cluster

cluster = EC2Cluster(...)
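The exact keyword arguments depend on how you followed that guide. As a rough illustration only (every value below is an assumption, not a requirement), a GPU cluster definition might look like this:

# Illustrative values only -- substitute the settings from your own setup
cluster = EC2Cluster(
    region="us-east-1",  # assumed AWS region
    instance_type="g4dn.xlarge",  # any GPU instance type
    docker_image="rapidsai/rapidsai:latest",  # placeholder RAPIDS image tag
    worker_class="dask_cuda.CUDAWorker",  # one Dask-CUDA worker per GPU
    n_workers=2,  # number of GPU workers to launch
)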

Client set up#

Now we can create a Dask Client with the cluster we just defined.

from dask.distributed import Client

client = Client(cluster)
client
Optionally, we can wait for all workers to be up and running.

We do so by adding:

# n_workers is the number of GPUs your cluster will have
client.wait_for_workers(n_workers)  
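As an optional sanity check (not part of the original workflow), you can ask every worker to report the GPU it sees. This sketch assumes pynvml is available in the RAPIDS image running on the workers:

def check_gpu():
    # report the first GPU visible to this worker
    import pynvml

    pynvml.nvmlInit()
    return pynvml.nvmlDeviceGetName(pynvml.nvmlDeviceGetHandleByIndex(0))


client.run(check_gpu)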

Machine Learning Workflow#

Once the workers are available, we can run the rest of our workflow:

  • read and clean the data

  • add features

  • split into training and validation sets

  • fit a Random Forest model

  • predict on the validation set

  • compute RMSE

Let’s import the rest of our dependencies.

import dask_cudf
import numpy as np
from cuml.dask.common import utils as dask_utils
from cuml.dask.ensemble import RandomForestRegressor
from cuml.metrics import mean_squared_error
from dask_ml.model_selection import train_test_split

1. Read and Clean Data#

The data needs to be cleaned up before it can be used in a meaningful way. We verify that each column has an appropriate datatype so the data is ready for computation with cuML.

# create a list of all columns & dtypes the df must have for reading
col_dtype = {
    "VendorID": "int32",
    "tpep_pickup_datetime": "datetime64[ms]",
    "tpep_dropoff_datetime": "datetime64[ms]",
    "passenger_count": "int32",
    "trip_distance": "float32",
    "pickup_longitude": "float32",
    "pickup_latitude": "float32",
    "RatecodeID": "int32",
    "store_and_fwd_flag": "int32",
    "dropoff_longitude": "float32",
    "dropoff_latitude": "float32",
    "payment_type": "int32",
    "fare_amount": "float32",
    "extra": "float32",
    "mta_tax": "float32",
    "tip_amount": "float32",
    "total_amount": "float32",
    "tolls_amount": "float32",
    "improvement_surcharge": "float32",
}
taxi_df = dask_cudf.read_csv(
    "https://storage.googleapis.com/anaconda-public-data/nyc-taxi/csv/2016/yellow_tripdata_2016-02.csv",
    dtype=col_dtype,
)
# Dictionary of required columns and their datatypes
must_haves = {
    "pickup_datetime": "datetime64[ms]",
    "dropoff_datetime": "datetime64[ms]",
    "passenger_count": "int32",
    "trip_distance": "float32",
    "pickup_longitude": "float32",
    "pickup_latitude": "float32",
    "rate_code": "int32",
    "dropoff_longitude": "float32",
    "dropoff_latitude": "float32",
    "fare_amount": "float32",
}
def clean(ddf, must_haves):
    # strip extraneous spaces from column names and convert them to lowercase
    tmp = {col: col.strip().lower() for col in list(ddf.columns)}
    ddf = ddf.rename(columns=tmp)

    ddf = ddf.rename(
        columns={
            "tpep_pickup_datetime": "pickup_datetime",
            "tpep_dropoff_datetime": "dropoff_datetime",
            "ratecodeid": "rate_code",
        }
    )

    ddf["pickup_datetime"] = ddf["pickup_datetime"].astype("datetime64[ms]")
    ddf["dropoff_datetime"] = ddf["dropoff_datetime"].astype("datetime64[ms]")

    for col in ddf.columns:
        if col not in must_haves:
            ddf = ddf.drop(columns=col)
            continue
        if ddf[col].dtype == "object":
            # drop object (string) columns, which otherwise raise
            # "could not convert arg to str" errors downstream
            ddf = ddf.drop(columns=col)
        else:
            # downcast 64-bit columns to 32-bit types;
            # GPUs such as the Tesla T4 are faster on 32-bit ops
            if "int" in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype("int32")
            if "float" in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype("float32")
            ddf[col] = ddf[col].fillna(-1)

    return ddf
taxi_df = taxi_df.map_partitions(clean, must_haves, meta=must_haves)
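To confirm the cleaning behaved as expected, you can optionally inspect the resulting schema. Checking .dtypes only reads the frame's metadata, so it is cheap:

taxi_df.dtypes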

2. Add Features#

We’ll add new features to the dataframe:

  1. Split the pickup datetime into year, month, day, hour, and day_of_week columns, and compute the time difference between pickup and dropoff.

  2. Haversine distance between the pick-up and drop-off coordinates (the formula is given just after this list).
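For reference, the haversine distance between two points \((\varphi_1, \lambda_1)\) and \((\varphi_2, \lambda_2)\), with latitude and longitude in radians, on a sphere of radius \(r\) is

\[
d = 2r \arcsin\!\left(\sqrt{\sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right)}\right)
\]

cuspatial.haversine_distance evaluates this on the GPU in the helper function defined below.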

## add features

taxi_df["hour"] = taxi_df["pickup_datetime"].dt.hour.astype("int32")
taxi_df["year"] = taxi_df["pickup_datetime"].dt.year.astype("int32")
taxi_df["month"] = taxi_df["pickup_datetime"].dt.month.astype("int32")
taxi_df["day"] = taxi_df["pickup_datetime"].dt.day.astype("int32")
taxi_df["day_of_week"] = taxi_df["pickup_datetime"].dt.weekday.astype("int32")
taxi_df["is_weekend"] = (taxi_df["day_of_week"] >= 5).astype("int32")

# calculate the trip duration in seconds: cast the millisecond timestamps
# to int64 before subtracting to avoid 32-bit overflow
taxi_df["diff"] = taxi_df["dropoff_datetime"].astype("int64") - taxi_df[
    "pickup_datetime"
].astype("int64")
taxi_df["diff"] = (taxi_df["diff"] / 1000).astype("int32")

# round coordinates down to a 0.01-degree grid to create coarse location features
taxi_df["pickup_latitude_r"] = taxi_df["pickup_latitude"] // 0.01 * 0.01
taxi_df["pickup_longitude_r"] = taxi_df["pickup_longitude"] // 0.01 * 0.01
taxi_df["dropoff_latitude_r"] = taxi_df["dropoff_latitude"] // 0.01 * 0.01
taxi_df["dropoff_longitude_r"] = taxi_df["dropoff_longitude"] // 0.01 * 0.01

taxi_df = taxi_df.drop("pickup_datetime", axis=1)
taxi_df = taxi_df.drop("dropoff_datetime", axis=1)


def haversine_dist(df):
    import cuspatial

    # cuspatial expects interleaved (longitude, latitude) point coordinates
    pickup = cuspatial.GeoSeries.from_points_xy(
        df[["pickup_longitude", "pickup_latitude"]].interleave_columns()
    )
    dropoff = cuspatial.GeoSeries.from_points_xy(
        df[["dropoff_longitude", "dropoff_latitude"]].interleave_columns()
    )
    df["h_distance"] = cuspatial.haversine_distance(pickup, dropoff)
    df["h_distance"] = df["h_distance"].astype("float32")
    return df


taxi_df = taxi_df.map_partitions(haversine_dist)

3. Split Data#

# Split into training and validation sets
X, y = taxi_df.drop(["fare_amount"], axis=1).astype("float32"), taxi_df[
    "fare_amount"
].astype("float32")
X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True)
workers = client.has_what().keys()
X_train, X_test, y_train, y_test = dask_utils.persist_across_workers(
    client, [X_train, X_test, y_train, y_test], workers=workers
)

4. Create and fit a Random Forest Model#

# create cuml.dask RF regressor
cu_dask_rf = RandomForestRegressor(ignore_empty_partitions=True)
# fit RF model
cu_dask_rf = cu_dask_rf.fit(X_train, y_train)
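The model above uses cuML's default hyperparameters. cuml.dask.ensemble.RandomForestRegressor accepts the usual random forest parameters if you want to tune it; the values below are purely illustrative, not recommendations:

# illustrative hyperparameters only -- tune for your own data
cu_dask_rf = RandomForestRegressor(
    n_estimators=100,  # total number of trees built across the workers
    max_depth=16,  # maximum depth of each tree
    ignore_empty_partitions=True,
)
cu_dask_rf = cu_dask_rf.fit(X_train, y_train)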

5. Predict on validation set#

# predict on validation set
y_pred = cu_dask_rf.predict(X_test)

6. Compute RMSE#

# compute RMSE
score = mean_squared_error(y_pred.compute().to_numpy(), y_test.compute().to_numpy())
print("Workflow Complete - RMSE: ", np.sqrt(score))

Resource Cleanup#

# Clean up resources
client.close()
cluster.close()

Learn More#