Dask Multi-GPU Guide#

This guide demonstrates how to use cuML estimators in multi-GPU and multi-node contexts using Dask. While cuML’s single-GPU implementations are highly optimized, distributed computing with Dask enables you to:

  • Scale beyond single GPU memory: Process datasets larger than what fits on a single GPU

  • Accelerate training: Distribute computation across multiple GPUs for faster model training

  • Handle production workloads: Deploy models that serve high-throughput prediction requests

cuML’s Dask integration uses a One Process Per GPU (OPG) architecture, where each Dask worker manages a single GPU. This design maximizes GPU utilization and simplifies memory management across the cluster.

Setup and Configuration#

Installing Dependencies#

To use cuML with Dask, you need to install additional dependencies. If you haven’t already, install them using conda or pip:

# Install with Conda:
conda install rapids-dask-dependency dask-cudf raft-dask

# Or install with pip (replace cu13 with your CUDA version):
pip install cuml-cu13[dask]

Setting Up a CUDA Cluster#

For single-node, multi-GPU execution, use LocalCUDACluster from dask-cuda. This automatically creates one worker per available GPU. For detailed information on configuring local CUDA clusters, including advanced networking options (UCX, InfiniBand) and multi-node cluster setup, see the RAPIDS dask-cuda documentation.

[1]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Create a local cluster with one worker per GPU
cluster = LocalCUDACluster()
client = Client(cluster)

# Display cluster information
print(f"Cluster dashboard available at: {client.dashboard_link}")
Cluster dashboard available at: http://127.0.0.1:8787/status

Example 1: K-Means Clustering#

K-Means is one of the most commonly used clustering algorithms. cuML’s distributed implementation parallelizes the fit operation for each iteration, sharing only the centroids between iterations.

[2]:
from cuml.dask.cluster import KMeans
from cuml.dask.datasets import make_blobs
from cuml.metrics import adjusted_rand_score

# Get number of workers for data partitioning
n_workers = len(client.scheduler_info()['workers'])

# Generate distributed synthetic data
X, y = make_blobs(
    n_samples=10000,
    n_features=20,
    centers=5,
    cluster_std=0.5,
    random_state=42,
    n_parts=n_workers * 2  # Multiple partitions per worker
)

print(f"Generated data with {len(X.to_delayed())} partitions")
print(f"Data type: {type(X)}")

Generated data with 2 partitions
Data type: <class 'dask.array.core.Array'>

Now we can train a distributed K-Means model. The API is nearly identical to the single-GPU version:

[3]:
# Train distributed K-Means
kmeans = KMeans(n_clusters=5, random_state=42)
kmeans.fit(X)

# Make predictions
labels = kmeans.predict(X)

# Evaluate clustering quality
score = adjusted_rand_score(y.compute(), labels.compute())
print(f"Adjusted Rand Score: {score:.4f}")

# View cluster centers
print(f"\nCluster centers shape: {kmeans.cluster_centers_.shape}")
Adjusted Rand Score: 1.0000

Cluster centers shape: (5, 20)

Convert into a single-GPU model#

We can use the distributed model for inference directly (as shown above) or convert it back into a single-GPU version based on needs.

[4]:
# Extract single-GPU model from distributed model
combined_model = kmeans.get_combined_model()

combined_model.predict(X)

[4]:
array([3, 1, 0, ..., 1, 3, 3], shape=(10000,), dtype=int32)

Model Serialization#

Distributed models cannot be pickled directly. For pickling we need to first extract the single-GPU version as shown above and can then serialize as usual:

[5]:
import pickle


# Save the model
with open("kmeans_model.pkl", "wb") as f:
    pickle.dump(combined_model, f, protocol=5)

# Load and use the model
with open("kmeans_model.pkl", "rb") as f:
    loaded_model = pickle.load(f)

print(f"Loaded model type: {type(loaded_model)}")
print(f"Model has {loaded_model.n_clusters} clusters")

Loaded model type: <class 'cuml.cluster.kmeans_mg.KMeansMG'>
Model has 5 clusters

The loaded model is a single-GPU estimator that can be used for inference. For distributed inference across a Dask cluster, consider using Dask-ML’s ParallelPostFit meta-estimator.

Example 2: Random Forest Classification#

Random Forest is an ensemble learning method that builds multiple decision trees. cuML’s distributed implementation uses embarrassingly parallel training: for a forest with N trees trained by W workers, each worker builds N/W trees.

[6]:
from cuml.dask.ensemble import RandomForestClassifier
from cuml.dask.datasets import make_classification
from cuml.metrics import accuracy_score

# Generate classification dataset
n_samples = 5000
n_features = 30
n_classes = 3

X, y = make_classification(
    n_samples=n_samples,
    n_features=n_features,
    n_informative=int(n_features * 0.7),
    n_redundant=int(n_features * 0.2),
    n_classes=n_classes,
    random_state=42,
    n_parts=n_workers * 2
)

print(f"Generated classification dataset:")
print(f"  Samples: {n_samples}")
print(f"  Features: {n_features}")
print(f"  Classes: {n_classes}")
print(f"  Partitions: {len(X.to_delayed())}")

Generated classification dataset:
  Samples: 5000
  Features: 30
  Classes: 3
  Partitions: 2

Data Distribution Best Practices#

For Random Forest, data distribution is critical for model accuracy:

  • Option 1: Well-shuffled data: Distribute shuffled data evenly across workers (used above)

  • Option 2: Replicated data: If memory allows, replicate the entire dataset to all workers for training that most closely matches single-GPU behavior

Both approaches ensure each worker sees a representative sample of the data distribution.

[7]:
# Train distributed Random Forest
rf = RandomForestClassifier(
    n_estimators=100,
    max_depth=16,
    n_bins=32,
    random_state=42
)

rf.fit(X, y)

# Make predictions
predictions = rf.predict(X)

# Evaluate accuracy
accuracy = accuracy_score(y.compute(), predictions.compute())
print(f"Training accuracy: {accuracy:.4f}")

/opt/conda/envs/docs/lib/python3.13/site-packages/distributed/client.py:3370: UserWarning: Sending large graph of size 11.03 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
  warnings.warn(
Training accuracy: 1.0000

Performance Notes#

Random Forest training performance is heavily influenced by:

  • max_depth: Lower values significantly speed up training but may reduce accuracy. Start with 12-16 for balanced performance.

  • n_estimators: More trees improve accuracy but increase training time linearly. The work is distributed across workers.

  • n_bins: Controls histogram granularity for split finding. Lower values (8-32) are faster but may miss optimal splits.

Example 3: Linear Regression#

Linear models in cuML support both Dask DataFrame and Dask Array inputs. This example demonstrates a distributed linear regression workflow.

[8]:
from cuml.dask.linear_model import LinearRegression
from cuml.dask.datasets import make_regression
from cuml.metrics import r2_score

# Generate regression dataset
X, y = make_regression(
    n_samples=10000,
    n_features=50,
    n_informative=40,
    random_state=42,
    n_parts=n_workers * 2
)

print(f"Generated regression dataset with {X.shape[0]:,} samples")

Generated regression dataset with 10,000 samples
[9]:
# Train distributed Linear Regression
lr = LinearRegression()
lr.fit(X, y)

# Make predictions
predictions = lr.predict(X)

# Evaluate model
r2 = r2_score(y.compute(), predictions.compute())
print(f"R² score: {r2:.4f}")
print(f"Number of coefficients: {len(lr.coef_)}")

R² score: 1.0000
Number of coefficients: 50

Using Dask DataFrames#

You can also use Dask cuDF DataFrames as input, which is useful when loading data from files:

[10]:
import dask_cudf
import cudf

# Convert Dask Array to Dask DataFrame
# In practice, you'd often load data directly: dask_cudf.read_csv("data.csv")
X_computed = X.compute()
y_computed = y.compute()

# Create cuDF DataFrame
df = cudf.DataFrame(X_computed)
target = cudf.Series(y_computed)

# Convert to Dask DataFrame
ddf = dask_cudf.from_cudf(df, npartitions=n_workers * 2)
dtarget = dask_cudf.from_cudf(target, npartitions=n_workers * 2)

# Train with Dask DataFrame input
lr_df = LinearRegression()
lr_df.fit(ddf, dtarget)

print(f"Model trained successfully with Dask DataFrame input")
print(f"Predictions type: {type(lr_df.predict(ddf))}")

Model trained successfully with Dask DataFrame input
Predictions type: <class 'dask_cudf._expr.collection.Series'>

Available Dask-Enabled Estimators#

cuML provides Dask implementations for many popular algorithms including clustering (KMeans, DBSCAN), ensemble methods (Random Forest), linear models (LinearRegression, Logistic Regression, Ridge, Lasso), decomposition (PCA, TruncatedSVD), nearest neighbors, and more. All are available in the cuml.dask module.

For a comprehensive list of supported algorithms and their documentation, see the Multi-Node, Multi-GPU Algorithms section of the API reference.

Performance Considerations and Best Practices#

When to Use Multi-GPU#

Use distributed computing with Dask when:

  • Dataset exceeds single GPU memory: Your data doesn’t fit on a single GPU

  • Training time is critical: Multiple GPUs can accelerate model training

  • Scaling to production: You need to handle high-throughput inference workloads

Single-GPU implementations are often sufficient and simpler for:

  • Small to medium datasets: Data that comfortably fits in single GPU memory (typically < 80% of GPU RAM)

  • Rapid prototyping: Simpler setup and debugging

  • Latency-sensitive inference: Single-GPU inference has lower overhead

Data Partitioning Strategy#

The n_parts parameter controls how data is distributed across workers:

# Rule of thumb: 2-4 partitions per worker
n_parts = n_workers * 2  # Good starting point

# More partitions: Better load balancing, more overhead
# Fewer partitions: Lower overhead, potential load imbalance

Network Optimization#

For multi-node clusters, high-performance networking options are available:

  • NVLink: For multi-GPU communication on the same node

  • InfiniBand: For fast inter-node communication

  • UCX protocol: Unified communication framework for optimal performance

For detailed configuration examples and setup instructions, see the RAPIDS dask-cuda documentation.

Memory Management#

  • RMM pool size: Pre-allocate GPU memory to reduce allocation overhead

  • Worker memory limit: Set limits to prevent out-of-memory errors

  • Partition size: Keep partitions small enough to fit comfortably in GPU memory

Performance Profiling#

Monitor your distributed computation:

  1. Dask Dashboard: Access at client.dashboard_link to visualize task execution

  2. NVIDIA tools: Use nvidia-smi to monitor GPU utilization

  3. RAPIDS Memory Manager: Enable RMM logging for memory profiling

Input Data Types#

cuML’s Dask estimators accept multiple input formats:

  • Dask Array: Use dask.array with CuPy backend for array operations

  • Dask DataFrame: Use dask_cudf.DataFrame for structured data

Choose based on your workflow:

# Dask Array - good for numerical operations
import dask.array as da
X = da.random.random((10000, 50), chunks=(1000, 50))

# Dask DataFrame - good for mixed types and data loading
import dask_cudf
df = dask_cudf.read_csv("data.csv")

Clean Up#

Optionally close your Dask client and cluster when finished to free up resources. Both the client and cluster will be shut down automatically when the notebook kernel process is shut down.

[11]:
# Close the client and cluster
client.close()
cluster.close()

print("Cluster shut down successfully")

Cluster shut down successfully

Additional Resources#