API

Cluster

class dask_cuda.LocalCUDACluster(CUDA_VISIBLE_DEVICES=None, n_workers=None, threads_per_worker=1, memory_limit='auto', device_memory_limit=0.8, data=None, local_directory=None, protocol=None, enable_tcp_over_ucx=False, enable_infiniband=False, enable_nvlink=False, enable_rdmacm=False, ucx_net_devices=None, rmm_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_log_directory=None, jit_unspill=None, log_spilling=False, worker_class=None, **kwargs)[source]

A variant of dask.distributed.LocalCluster that uses one GPU per process.

This assigns a different CUDA_VISIBLE_DEVICES environment variable to each Dask worker process.

For machines with a complex architecture mapping CPUs, GPUs, and network hardware, such as NVIDIA DGX-1 and DGX-2, this class creates a local cluster that tries to respect this hardware as much as possible.

Each worker process is automatically assigned the correct CPU cores and network interface cards to maximize performance. If UCX and UCX-Py are available, InfiniBand and NVLink connections can be used to optimize data transfer performance.

Parameters
CUDA_VISIBLE_DEVICES: str, list of int, or None, default None

GPUs to restrict activity to. Can be a string (like "0,1,2,3"), list (like [0, 1, 2, 3]), or None to use all available GPUs.

n_workers: int or None, default None

Number of workers. Can be an integer or None to fall back on the GPUs specified by CUDA_VISIBLE_DEVICES. Will override the value of CUDA_VISIBLE_DEVICES if specified.

threads_per_worker: int, default 1

Number of threads to be used for each Dask worker process.

memory_limit: int, float, str, or None, default "auto"

Bytes of memory per process that the worker can use. Can be an integer (bytes), float (fraction of total system memory), string (like "5GB" or "5000M"), or "auto", 0, or None for no memory management.

device_memory_limit: int, float, str, or None, default 0.8

Size of the CUDA device LRU cache, which is used to determine when the worker starts spilling to host memory. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M"), or "auto", 0, or None to disable spilling to host (i.e. allow full device memory usage).

local_directory: str or None, default None

Path on local machine to store temporary files. Can be a string (like "path/to/files") or None to fall back on the value of dask.temporary-directory in the local Dask configuration, using the current working directory if this is not set.

protocol: str or None, default None

Protocol to use for communication. Can be a string (like "tcp" or "ucx"), or None to automatically choose the correct protocol.

enable_tcp_over_ucx: bool, default False

Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.

enable_infiniband: bool, default False

Set environment variables to enable UCX over InfiniBand, requires protocol="ucx" and implies enable_tcp_over_ucx=True.

enable_nvlink: bool, default False

Set environment variables to enable UCX over NVLink, requires protocol="ucx" and implies enable_tcp_over_ucx=True.

enable_rdmacm: bool, default False

Set environment variables to enable UCX RDMA connection manager support, requires protocol="ucx" and enable_infiniband=True.

ucx_net_devices: str, callable, or None, default None

Interface(s) used by workers for UCX communication. Can be a string (like "eth0" for NVLink or "mlx5_0:1"/"ib0" for InfiniBand), a callable function that takes the index of the current GPU to return an interface name (like lambda dev: "mlx5_%d:1" % (dev // 2)), "auto" (requires enable_infiniband=True) to pick the optimal interface per-worker based on the system’s topology, or None to stay with the default value of "all" (use all available interfaces).

Warning

"auto" requires UCX-Py to be installed and compiled with hwloc support. Unexpected errors can occur when using "auto" if any interfaces are disconnected or improperly configured.

rmm_pool_size: int, str, or None, default None

RMM pool size to initialize each worker with. Can be an integer (bytes), string (like "5GB" or "5000M"), or None to disable RMM pools.

Note

This size is a per-worker configuration, and not cluster-wide.

rmm_managed_memory: bool, default False

Initialize each worker with RMM and set it to use managed memory. If disabled, RMM may still be used by specifying rmm_pool_size.

Warning

Managed memory is currently incompatible with NVLink. Trying to enable both will result in an exception.

rmm_async: bool, default False

Initialize each worker with RMM and set it to use RMM’s asynchronous allocator. See rmm.mr.CudaAsyncMemoryResource for more info.

Warning

The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory; trying to enable the asynchronous allocator together with either will result in an exception.

rmm_log_directory: str or None, default None

Directory to write per-worker RMM log files to. The client and scheduler are not logged here. Can be a string (like "/path/to/logs/") or None to disable logging.

Note

Logging will only be enabled if rmm_pool_size is specified or rmm_managed_memory=True.

jit_unspill: bool or None, default None

Enable just-in-time unspilling. Can be a boolean or None to fall back on the value of dask.jit-unspill in the local Dask configuration, disabling unspilling if this is not set.

Note

This is experimental and doesn’t support memory spilling to disk. See proxy_object.ProxyObject and proxify_host_file.ProxifyHostFile for more info.

log_spilling: bool, default False

Enable logging of spilling operations directly to distributed.Worker with an INFO log level.

Raises
TypeError

If InfiniBand or NVLink are enabled and protocol!="ucx".

ValueError

If ucx_net_devices="", if NVLink and RMM managed memory are both enabled, if RMM pools / managed memory and asynchronous allocator are both enabled, or if ucx_net_devices="auto" and:

  • UCX-Py is not installed or wasn’t compiled with hwloc support; or

  • enable_infiniband=False

See also

LocalCluster

Examples

>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
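
A further sketch (not part of the upstream example) combining several of the parameters documented above; the GPU selection and pool size are placeholders, and UCX-Py is assumed to be installed since protocol="ucx" is requested:

>>> cluster = LocalCUDACluster(
...     CUDA_VISIBLE_DEVICES="0,1",
...     rmm_pool_size="5GB",
...     protocol="ucx",
...     enable_tcp_over_ucx=True,
...     enable_nvlink=True,
... )
>>> client = Client(cluster)
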
new_worker_spec()[source]

Return name and spec for the next worker

Returns
d: dict mapping names to worker specs

See also

scale

Worker

dask-cuda-worker

dask-cuda-worker [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...

Options

--host <host>

IP address of serving host; should be visible to the scheduler and other workers. Can be a string (like "127.0.0.1") or None to fall back on the address of the interface specified by --interface or the default interface.

--nthreads <nthreads>

Number of threads to be used for each Dask worker process.

Default

1

--name <name>

A unique name for the worker. Can be a string (like "worker-1") or None for a nameless worker. If used with --nprocs, then the process number will be appended to the worker name, e.g. "worker-1-0", "worker-1-1", "worker-1-2".

--memory-limit <memory_limit>

Bytes of memory per process that the worker can use. Can be an integer (bytes), float (fraction of total system memory), string (like "5GB" or "5000M"), or "auto" or 0 for no memory management.

Default

auto

--device-memory-limit <device_memory_limit>

Size of the CUDA device LRU cache, which is used to determine when the worker starts spilling to host memory. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M"), or "auto" or 0 to disable spilling to host (i.e. allow full device memory usage).

Default

0.8

--rmm-pool-size <rmm_pool_size>

RMM pool size to initialize each worker with. Can be an integer (bytes), string (like "5GB" or "5000M"), or None to disable RMM pools.

Note

This size is a per-worker configuration, and not cluster-wide.

--rmm-managed-memory, --no-rmm-managed-memory

Initialize each worker with RMM and set it to use managed memory. If disabled, RMM may still be used by specifying --rmm-pool-size.

Warning

Managed memory is currently incompatible with NVLink. Trying to enable both will result in failure.

Default

False

--rmm-async, --no-rmm-async

Initialize each worker with RMM and set it to use RMM’s asynchronous allocator. See rmm.mr.CudaAsyncMemoryResource for more info.

Warning

The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory; trying to enable the asynchronous allocator together with either will result in failure.

Default

False

--rmm-log-directory <rmm_log_directory>

Directory to write per-worker RMM log files to. The client and scheduler are not logged here. Can be a string (like "/path/to/logs/") or None to disable logging.

Note

Logging will only be enabled if --rmm-pool-size or --rmm-managed-memory are specified.

--pid-file <pid_file>

File to write the process PID.

--resources <resources>

Resources for task constraints like "GPU=2 MEM=10e9". Resources are applied separately to each worker process (only relevant when starting multiple worker processes with --nprocs).

--dashboard, --no-dashboard

Launch the dashboard.

Default

True

--dashboard-address <dashboard_address>

Relative address to serve the dashboard (if enabled).

Default

:0

--local-directory <local_directory>

Path on local machine to store temporary files. Can be a string (like "path/to/files") or None to fall back on the value of dask.temporary-directory in the local Dask configuration, using the current working directory if this is not set.

--scheduler-file <scheduler_file>

Filename to JSON encoded scheduler information. To be used in conjunction with the equivalent dask-scheduler option.

--protocol <protocol>

Protocol like tcp, tls, or ucx.

--interface <interface>

External interface used to connect to the scheduler. Usually an ethernet interface is used for connection, and not an InfiniBand interface (if one is available). Can be a string (like "eth0" for NVLink or "ib0" for InfiniBand) or None to fall back on the default interface.

--preload <preload>

Module that should be loaded by each worker process like "foo.bar" or "/path/to/foo.py".

--dashboard-prefix <dashboard_prefix>

Prefix for the dashboard. Can be a string (like …) or None for no prefix.

--tls-ca-file <tls_ca_file>

CA certificate(s) file for TLS (in PEM format). Can be a string (like "path/to/certs"), or None for no certificate(s).

--tls-cert <tls_cert>

Certificate file for TLS (in PEM format). Can be a string (like "path/to/certs"), or None for no certificate(s).

--tls-key <tls_key>

Private key file for TLS (in PEM format). Can be a string (like "path/to/certs"), or None for no private key.

--enable-tcp-over-ucx, --disable-tcp-over-ucx

Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.

Default

False

--enable-infiniband, --disable-infiniband

Set environment variables to enable UCX over InfiniBand, implies --enable-tcp-over-ucx.

Default

False

--enable-nvlink, --disable-nvlink

Set environment variables to enable UCX over NVLink, implies --enable-tcp-over-ucx.

Default

False

--enable-rdmacm, --disable-rdmacm

Set environment variables to enable UCX RDMA connection manager support, requires --enable-infiniband.

Default

False

--net-devices <net_devices>

Interface(s) used by workers for UCX communication. Can be a string (like "eth0" for NVLink or "mlx5_0:1"/"ib0" for InfiniBand), "auto" (requires --enable-infiniband) to pick the optimal interface per-worker based on the system’s topology, or None to stay with the default value of "all" (use all available interfaces).

Warning

"auto" requires UCX-Py to be installed and compiled with hwloc support. Unexpected errors can occur when using "auto" if any interfaces are disconnected or improperly configured.

--enable-jit-unspill, --disable-jit-unspill

Enable just-in-time unspilling. Can be a boolean or None to fall back on the value of dask.jit-unspill in the local Dask configuration, disabling unspilling if this is not set.

Note

This is experimental and doesn’t support memory spilling to disk. See proxy_object.ProxyObject and proxify_host_file.ProxifyHostFile for more info.

--worker-class <worker_class>

Use a different class than Distributed’s default (distributed.Worker) to spawn distributed.Nanny.

Arguments

SCHEDULER

Optional argument

PRELOAD_ARGV

Optional argument(s)
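
For illustration, a hedged example invocation combining several of the options above; the scheduler address is a placeholder, and a scheduler is assumed to already be listening there over UCX:

dask-cuda-worker ucx://203.0.113.10:8786 --rmm-pool-size 5GB --enable-tcp-over-ucx --enable-nvlink

This would start one worker per visible GPU, each initialized with a 5 GB RMM pool and communicating over UCX with NVLink enabled.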

Client initialization

dask_cuda.initialize.initialize(create_cuda_context=True, enable_tcp_over_ucx=False, enable_infiniband=False, enable_nvlink=False, enable_rdmacm=False, net_devices='', cuda_device_index=None)[source]

Create CUDA context and initialize UCX-Py, depending on user parameters.

Sometimes it is convenient to initialize the CUDA context, particularly before starting up Dask worker processes which create a variety of threads.

To ensure UCX works correctly, it is important to ensure it is initialized with the correct options. This is especially important for the client, which cannot be configured to use UCX with arguments like LocalCUDACluster and dask-cuda-worker. This function will ensure that they are provided a UCX configuration based on the flags and options passed by the user.

This function can also be used within a worker preload script for UCX configuration of mainline Dask distributed; see https://docs.dask.org/en/latest/setup/custom-startup.html for details.

You can add it to your global config with the following YAML:

distributed:
  worker:
    preload:
      - dask_cuda.initialize

See https://docs.dask.org/en/latest/configuration.html for more information about Dask configuration.

Parameters
create_cuda_context: bool, default True

Create CUDA context on initialization.

enable_tcp_over_ucx: bool, default False

Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.

enable_infiniband: bool, default False

Set environment variables to enable UCX over InfiniBand, implies enable_tcp_over_ucx=True.

enable_nvlink: bool, default False

Set environment variables to enable UCX over NVLink, implies enable_tcp_over_ucx=True.

enable_rdmacm: bool, default False

Set environment variables to enable UCX RDMA connection manager support, requires enable_infiniband=True.

net_devices: str or callable, default ""

Interface(s) used by workers for UCX communication. Can be a string (like "eth0" for NVLink, "mlx5_0:1"/"ib0" for InfiniBand, or "" to use all available devices), or a callable function that takes the index of the current GPU to return an interface name (like lambda dev: "mlx5_%d:1" % (dev // 2)).

Note

If net_devices is callable, a GPU index must be supplied through cuda_device_index.

cuda_device_index: int or None, default None

Index of the current GPU, which must be specified for net_devices if it is callable. Can be an integer or None if net_devices is not callable.
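
As a brief client-side sketch (the scheduler address is a placeholder), initialize() can be called before creating the Client so that the client process receives a matching UCX configuration:

>>> from dask_cuda.initialize import initialize
>>> from dask.distributed import Client
>>> initialize(enable_tcp_over_ucx=True, enable_nvlink=True)
>>> client = Client("ucx://203.0.113.10:8786")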

Explicit-comms

class dask_cuda.explicit_comms.comms.CommsContext(client: Optional[distributed.client.Client] = None)[source]

Communication handler for explicit communication

Parameters
client: Client, optional

Specify client to use for communication. If None, use the default client.

run(coroutine, *args, workers=None, lock_workers=False)[source]

Run a coroutine on multiple workers

The coroutine is given the worker’s state dict as the first argument and *args as the following arguments.

Parameters
coroutine: coroutine

The function to run on each worker

*args:

Arguments for coroutine

workers: list, optional

List of workers. Default is all workers

lock_workers: bool, optional

Use distributed.MultiLock to get exclusive access to the workers. Use this flag to support parallel runs.

Returns
ret: list

List of the output from each worker

submit(worker, coroutine, *args, wait=False)[source]

Run a coroutine on a single worker

The coroutine is given the worker’s state dict as the first argument and *args as the following arguments.

Parameters
worker: str

Worker to run the coroutine

coroutine: coroutine

The function to run on the worker

*args:

Arguments for coroutine

wait: boolean, optional

If True, waits for the coroutine to finish before returning.

Returns
ret: object or Future

If wait=True, the result of the coroutine; if wait=False, a Future that can be waited on later.
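
A minimal sketch of running a coroutine on every worker of an existing cluster; the coroutine echo is hypothetical, and nothing beyond the documented behavior of the worker state dict is assumed:

>>> from dask_cuda.explicit_comms.comms import CommsContext
>>> async def echo(worker_state, msg):  # hypothetical coroutine
...     # worker_state is the per-worker state dict described above
...     return msg
>>> comms = CommsContext()  # falls back on the default client
>>> results = comms.run(echo, "ping")  # one result per worker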