GPU optimization for the Dask scheduler on Kubernetes#
An optimization users can make when deploying Dask clusters is to place the scheduler on a node with a less powerful GPU to reduce overall cost. A previous guide explains why the scheduler needs access to the same environment as the workers: there are a few edge cases where the scheduler serializes data and unpickles high-level graphs.
Warning
This guide outlines our current advice on scheduler hardware requirements, but this may be subject to change.
However, when working with nodes that have multiple GPUs, placing the scheduler on one of these nodes would be a waste of resources. This guide walks through creating a Kubernetes cluster on GKE, adding a nodepool with a less powerful NVIDIA Tesla T4 GPU, and placing the scheduler on that node using Kubernetes node affinity.
Prerequisites#
First you’ll need to have the gcloud CLI tool installed along with kubectl, helm, etc. for managing Kubernetes.
Ensure you are logged into the gcloud CLI.
$ gcloud init
Create the Kubernetes cluster#
Now we can launch a GPU enabled GKE cluster.
$ gcloud container clusters create rapids-gpu \
--accelerator type=nvidia-tesla-a100,count=2 --machine-type a2-highgpu-2g \
--zone us-central1-c --release-channel stable
With this command, you’ve launched a GKE cluster called rapids-gpu. You’ve specified that it should use nodes of type a2-highgpu-2g, each with two A100 GPUs.
Create the dedicated nodepool for the scheduler#
Now create a new nodepool on this GPU cluster.
$ gcloud container node-pools create scheduler-pool --cluster rapids-gpu \
--accelerator type=nvidia-tesla-t4,count=1 --machine-type n1-standard-2 \
--num-nodes 1 --node-labels dedicated=scheduler --zone us-central1-c
With this command, you’ve created an additional nodepool called scheduler-pool with 1 node. You’ve also specified that it should use a node of type n1-standard-2, with one T4 GPU.
We also add the Kubernetes label dedicated=scheduler to the node in this nodepool, which will be used to place the scheduler onto this node.
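To confirm that the label landed on the new node, one option is to inspect the output of `kubectl get nodes -o json`. The following is a minimal sketch, assuming that output has been captured as a string. The helper name `nodes_with_label` and the sample node names are hypothetical and only serve as illustration.

```python
import json


def nodes_with_label(node_list_json: str, key: str, value: str) -> list[str]:
    """Return the names of nodes whose labels contain key=value.

    Expects the JSON document printed by `kubectl get nodes -o json`.
    """
    node_list = json.loads(node_list_json)
    matches = []
    for node in node_list.get("items", []):
        metadata = node.get("metadata", {})
        if metadata.get("labels", {}).get(key) == value:
            matches.append(metadata.get("name"))
    return matches


# Hypothetical, heavily trimmed sample of `kubectl get nodes -o json` output.
sample = json.dumps(
    {
        "items": [
            {"metadata": {"name": "gpu-node-a", "labels": {"pool": "default"}}},
            {"metadata": {"name": "scheduler-node", "labels": {"dedicated": "scheduler"}}},
        ]
    }
)

print(nodes_with_label(sample, "dedicated", "scheduler"))
```

For a quick interactive check, `kubectl get nodes -l dedicated=scheduler` lists the same nodes without any Python.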
Install drivers#
Next, install the NVIDIA drivers onto each node.
$ kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded-latest.yaml
daemonset.apps/nvidia-driver-installer created
Verify that the NVIDIA drivers are successfully installed.
$ kubectl get po -A --watch | grep nvidia
kube-system nvidia-driver-installer-6zwcn 1/1 Running 0 8m47s
kube-system nvidia-driver-installer-8zmmn 1/1 Running 0 8m47s
kube-system nvidia-driver-installer-mjkb8 1/1 Running 0 8m47s
kube-system nvidia-gpu-device-plugin-5ffkm 1/1 Running 0 13m
kube-system nvidia-gpu-device-plugin-d599s 1/1 Running 0 13m
kube-system nvidia-gpu-device-plugin-jrgjh 1/1 Running 0 13m
After your drivers are installed, you are ready to test your cluster.
Let’s create a sample pod that uses some GPU compute to make sure that everything is working as expected.
cat << EOF | kubectl create -f -
apiVersion: v1
kind: Pod
metadata:
  name: cuda-vectoradd
spec:
  restartPolicy: OnFailure
  containers:
    - name: cuda-vectoradd
      image: "nvidia/samples:vectoradd-cuda11.6.0-ubuntu18.04"
      resources:
        limits:
          nvidia.com/gpu: 1
EOF
$ kubectl logs pod/cuda-vectoradd
[Vector addition of 50000 elements]
Copy input data from the host memory to the CUDA device
CUDA kernel launch with 196 blocks of 256 threads
Copy output data from the CUDA device to the host memory
Test PASSED
Done
If you see Test PASSED in the output, you can be confident that your Kubernetes cluster has GPU compute set up correctly.
Next, clean up that pod.
$ kubectl delete pod cuda-vectoradd
pod "cuda-vectoradd" deleted
Installing Dask operator with Helm#
The operator has a Helm chart which can be used to manage its installation. The chart is published in the Dask Helm repository, and can be installed via:
$ helm repo add dask https://helm.dask.org
"dask" has been added to your repositories
$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "dask" chart repository
Update Complete. ⎈Happy Helming!⎈
$ helm install --create-namespace -n dask-operator --generate-name dask/dask-kubernetes-operator
NAME: dask-kubernetes-operator-1666875935
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.
Then you should be able to list your Dask clusters via kubectl.
$ kubectl get daskclusters
No resources found in default namespace.
We can also check the operator pod is running:
$ kubectl get pods -A -l app.kubernetes.io/name=dask-kubernetes-operator
NAMESPACE NAME READY STATUS RESTARTS AGE
dask-operator dask-kubernetes-operator-775b8bbbd5-zdrf7 1/1 Running 0 74s
Configuring a RAPIDS DaskCluster#
To configure the DaskCluster resource to run RAPIDS you need to set a few things:
The container image must contain RAPIDS; the official RAPIDS container images are a good choice for this.
The Dask workers must be configured with one or more NVIDIA GPU resources.
The worker command must be set to dask-cuda-worker.
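The requirements above can be turned into a quick sanity check on a manifest before applying it. This is a minimal sketch, assuming the manifest has already been loaded into a Python dict with the same shape as a DaskCluster resource. The function name `check_rapids_worker_spec` is hypothetical, and the check can only confirm that an image is set, not that the image actually contains RAPIDS.

```python
def check_rapids_worker_spec(cluster: dict) -> list[str]:
    """Return a list of problems found in the worker section of a DaskCluster dict."""
    problems = []
    containers = cluster["spec"]["worker"]["spec"]["containers"]
    for container in containers:
        name = container.get("name", "<unnamed>")
        # An image must be set; whether it ships RAPIDS cannot be verified here.
        if not container.get("image"):
            problems.append(f"{name}: no container image set")
        # Workers need at least one NVIDIA GPU resource.
        limits = container.get("resources", {}).get("limits", {})
        if int(limits.get("nvidia.com/gpu", 0)) < 1:
            problems.append(f"{name}: no NVIDIA GPU requested")
        # The worker command must be dask-cuda-worker.
        args = container.get("args") or []
        if "dask-cuda-worker" not in args:
            problems.append(f"{name}: worker command is not dask-cuda-worker")
    return problems


# Trimmed-down dict with the same shape as the worker section of a DaskCluster.
manifest = {
    "spec": {
        "worker": {
            "spec": {
                "containers": [
                    {
                        "name": "worker",
                        "image": "nvcr.io/nvidia/rapidsai/base:24.10-cuda12.5-py3.12",
                        "args": ["dask-cuda-worker", "--name", "$(DASK_WORKER_NAME)"],
                        "resources": {"limits": {"nvidia.com/gpu": "1"}},
                    }
                ]
            }
        }
    }
}

print(check_rapids_worker_spec(manifest) or "worker spec looks RAPIDS-ready")
```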
Creating a RAPIDS DaskCluster using kubectl#
Here is an example resource manifest for launching a RAPIDS Dask cluster with the scheduler optimization:
# rapids-dask-cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: rapids-dask-cluster
  labels:
    dask.org/cluster-name: rapids-dask-cluster
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "nvcr.io/nvidia/rapidsai/base:24.10-cuda12.5-py3.12"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-cuda-worker
            - --name
            - $(DASK_WORKER_NAME)
          resources:
            limits:
              nvidia.com/gpu: "1"
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "nvcr.io/nvidia/rapidsai/base:24.10-cuda12.5-py3.12"
          imagePullPolicy: "IfNotPresent"
          env:
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
              protocol: TCP
            - name: http-dashboard
              containerPort: 8787
              protocol: TCP
          readinessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 15
            periodSeconds: 20
          resources:
            limits:
              nvidia.com/gpu: "1"
      affinity:
        nodeAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              preference:
                matchExpressions:
                  - key: dedicated
                    operator: In
                    values:
                      - scheduler
    service:
      type: ClusterIP
      selector:
        dask.org/cluster-name: rapids-dask-cluster
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          protocol: TCP
          port: 8786
          targetPort: "tcp-comm"
        - name: http-dashboard
          protocol: TCP
          port: 8787
          targetPort: "http-dashboard"
You can create this cluster with kubectl.
$ kubectl apply -f rapids-dask-cluster.yaml
Manifest breakdown#
Most of this manifest is explained in the Dask Operator documentation in the tools section of the RAPIDS documentation.
The only addition made to the example from the above documentation page is the following section in the scheduler configuration
# ...
affinity:
  nodeAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        preference:
          matchExpressions:
            - key: dedicated
              operator: In
              values:
                - scheduler
# ...
For the Dask scheduler pod we set a node affinity using the label previously applied to the dedicated node. Node affinity in Kubernetes allows you to constrain which nodes your Pod can be scheduled on, based on node labels. Here it is a soft requirement, because we use the preferredDuringSchedulingIgnoredDuringExecution type of node affinity: the Kubernetes scheduler tries to find a node that meets the rule, and if no matching node is available it still schedules the pod on any available node. This ensures that the Dask cluster still starts even if the T4 node is unavailable.
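The soft nature of this rule can be illustrated with a toy model. The sketch below is a deliberately simplified stand-in and not the actual kube-scheduler algorithm, which combines the affinity score with several other scoring criteria. The function name `pick_node` and the node names are hypothetical.

```python
def pick_node(nodes: dict[str, dict[str, str]], preferences: list[dict]) -> str:
    """Pick the node with the highest total preference weight.

    `nodes` maps node name -> labels. Each preference carries "weight",
    "key" and "values", mirroring one preferred node affinity entry with
    a single `In` match expression.
    """
    if not nodes:
        raise ValueError("no schedulable nodes")

    def score(labels: dict[str, str]) -> int:
        return sum(
            pref["weight"]
            for pref in preferences
            if labels.get(pref["key"]) in pref["values"]
        )

    # max() still returns a node when every score is 0, which models the
    # fallback to any available node when the preferred node is missing.
    return max(nodes, key=lambda name: score(nodes[name]))


prefs = [{"weight": 100, "key": "dedicated", "values": ["scheduler"]}]

# The labelled T4 node exists, so it wins.
print(pick_node({"a100-node": {}, "t4-node": {"dedicated": "scheduler"}}, prefs))

# The T4 node is gone; the pod still lands on the remaining node.
print(pick_node({"a100-node": {}}, prefs))
```

With requiredDuringSchedulingIgnoredDuringExecution instead, the second case would leave the scheduler pod unschedulable until a matching node appears.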
Accessing your Dask cluster#
Once you have created your DaskCluster resource we can use kubectl to check the status of all the other resources it created for us.
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME READY STATUS RESTARTS AGE
pod/rapids-dask-cluster-default-worker-group-worker-0c202b85fd 1/1 Running 0 4m13s
pod/rapids-dask-cluster-default-worker-group-worker-ff5d376714 1/1 Running 0 4m13s
pod/rapids-dask-cluster-scheduler 1/1 Running 0 4m14s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/rapids-dask-cluster-service ClusterIP 10.96.223.217 <none> 8786/TCP,8787/TCP 4m13s
Here you can see our scheduler pod and two worker pods along with the scheduler service.
If you have a Python session running within the Kubernetes cluster (like the example one on the Kubernetes page) you should be able to connect a Dask distributed client directly.
from dask.distributed import Client
client = Client("rapids-dask-cluster-scheduler:8786")
Alternatively, if you are outside of the Kubernetes cluster you can change the Service to use LoadBalancer or NodePort, or use kubectl to port forward the connection locally.
$ kubectl port-forward svc/rapids-dask-cluster-service 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
from dask.distributed import Client
client = Client("localhost:8786")
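If the client hangs or fails to connect, it can help to first confirm that the forwarded port is actually reachable. The following is a small sketch using only the Python standard library; the helper name `port_is_open` is hypothetical and it only tests TCP reachability, not that a Dask scheduler is answering.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


# 8786 is the scheduler comm port forwarded by the kubectl command above.
if port_is_open("localhost", 8786):
    print("scheduler port is reachable")
else:
    print("nothing is listening on localhost:8786; is the port forward running?")
```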
Example using KubeCluster#
In addition to creating clusters via kubectl you can also do so from Python with dask_kubernetes.operator.KubeCluster. This class implements the Dask Cluster Manager interface and under the hood creates and manages the DaskCluster resource for you. You can also generate a spec with make_cluster_spec(), which KubeCluster uses internally, and then modify it with your custom options. We will use this to add node affinity to the scheduler.
from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(
    name="rapids-dask-cluster",
    image="nvcr.io/nvidia/rapidsai/base:24.10-cuda12.5-py3.12",
    n_workers=2,
    resources={"limits": {"nvidia.com/gpu": "1"}},
    worker_command="dask-cuda-worker",
)
To add the node affinity to the scheduler, you can create a custom dictionary specifying the type of node affinity and the label of the node.
affinity_config = {
    "nodeAffinity": {
        "preferredDuringSchedulingIgnoredDuringExecution": [
            {
                "weight": 100,
                "preference": {
                    "matchExpressions": [
                        {"key": "dedicated", "operator": "In", "values": ["scheduler"]}
                    ]
                },
            }
        ]
    }
}
Now you can add this configuration to the spec created in the previous step, and create the Dask cluster using this custom spec.
spec["spec"]["scheduler"]["spec"]["affinity"] = affinity_config
cluster = KubeCluster(custom_cluster_spec=spec)
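If several clusters need the same treatment, the assignment can be wrapped in a small helper. This is a sketch under the assumption that the spec is a plain nested dict shaped like the one used above; the helper name `with_scheduler_affinity` is hypothetical, and a stand-in dict replaces the real `make_cluster_spec()` output so the example runs without a Kubernetes cluster.

```python
import copy


def with_scheduler_affinity(spec: dict, key: str, value: str, weight: int = 100) -> dict:
    """Return a copy of a DaskCluster spec with a soft scheduler node affinity added."""
    new_spec = copy.deepcopy(spec)  # leave the caller's spec untouched
    new_spec["spec"]["scheduler"]["spec"]["affinity"] = {
        "nodeAffinity": {
            "preferredDuringSchedulingIgnoredDuringExecution": [
                {
                    "weight": weight,
                    "preference": {
                        "matchExpressions": [
                            {"key": key, "operator": "In", "values": [value]}
                        ]
                    },
                }
            ]
        }
    }
    return new_spec


# Stand-in for the dict returned by make_cluster_spec(); only the keys used here.
base_spec = {"spec": {"scheduler": {"spec": {"containers": []}}}}
custom_spec = with_scheduler_affinity(base_spec, "dedicated", "scheduler")
print(custom_spec["spec"]["scheduler"]["spec"]["affinity"])
```

The returned dict could then be passed to `KubeCluster(custom_cluster_spec=custom_spec)` in the same way as the spec above.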
If we check with kubectl we can see the above Python generated the same DaskCluster resource as the kubectl example above.
$ kubectl get daskclusters
NAME AGE
rapids-dask-cluster 3m28s
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME READY STATUS RESTARTS AGE
pod/rapids-dask-cluster-default-worker-group-worker-07d674589a 1/1 Running 0 3m30s
pod/rapids-dask-cluster-default-worker-group-worker-a55ed88265 1/1 Running 0 3m30s
pod/rapids-dask-cluster-scheduler 1/1 Running 0 3m30s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/rapids-dask-cluster-service ClusterIP 10.96.200.202 <none> 8786/TCP,8787/TCP 3m30s
With this cluster object in Python we can also connect a client to it directly without needing to know the address as Dask will discover that for us. It also automatically sets up port forwarding if you are outside of the Kubernetes cluster.
from dask.distributed import Client
client = Client(cluster)
This object can also be used to scale the workers up and down.
cluster.scale(5)
And to manually close the cluster.
cluster.close()
Note
By default the KubeCluster class registers an exit hook so that the cluster is deleted automatically when the Python process exits. You can disable this by setting KubeCluster(..., shutdown_on_close=False) when launching the cluster.
This is useful if you have a multi-stage pipeline made up of multiple Python processes and you want your Dask cluster to persist between them.
You can also connect a KubeCluster object to your existing cluster with cluster = KubeCluster.from_name(name="rapids-dask-cluster") if you wish to use the cluster or manually call cluster.close() in the future.