API

Cluster

class dask_cuda.LocalCUDACluster(CUDA_VISIBLE_DEVICES=None, n_workers=None, threads_per_worker=1, memory_limit='auto', device_memory_limit=0.8, enable_cudf_spill=False, cudf_spill_stats=0, data=None, local_directory=None, shared_filesystem=None, protocol=None, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, rmm_pool_size=None, rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_allocator_external_lib_list=None, rmm_release_threshold=None, rmm_log_directory=None, rmm_track_allocations=False, jit_unspill=None, log_spilling=False, worker_class=None, pre_import=None, **kwargs)[source]

A variant of dask.distributed.LocalCluster that uses one GPU per process.

This assigns a different CUDA_VISIBLE_DEVICES environment variable to each Dask worker process.

For machines with a complex architecture mapping CPUs, GPUs, and network hardware, such as NVIDIA DGX-1 and DGX-2, this class creates a local cluster that tries to respect this hardware as much as possible.

Each worker process is automatically assigned the correct CPU cores and network interface cards to maximize performance. If UCX and UCX-Py are available, InfiniBand and NVLink connections can be used to optimize data transfer performance.

Parameters:
CUDA_VISIBLE_DEVICESstr, list of int, or None, default None

GPUs to restrict activity to. Can be a string (like "0,1,2,3"), list (like [0, 1, 2, 3]), or None to use all available GPUs.

n_workersint or None, default None

Number of workers. Can be an integer or None to fall back on the GPUs specified by CUDA_VISIBLE_DEVICES. The value of n_workers must be smaller or equal to the number of GPUs specified in CUDA_VISIBLE_DEVICES when the latter is specified, and if smaller, only the first n_workers GPUs will be used.

threads_per_workerint, default 1

Number of threads to be used for each Dask worker process.

memory_limitint, float, str, or None, default “auto”

Size of the host LRU cache, which is used to determine when the worker starts spilling to disk (not available if JIT-Unspill is enabled). Can be an integer (bytes), float (fraction of total system memory), string (like "5GB" or "5000M"), or "auto", 0, or None for no memory management.

device_memory_limitint, float, str, or None, default 0.8

Size of the CUDA device LRU cache, which is used to determine when the worker starts spilling to host memory. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M"), or "auto", 0, or None to disable spilling to host (i.e. allow full device memory usage).

enable_cudf_spillbool, default False

Enable automatic cuDF spilling.

Warning

This should NOT be used together with JIT-Unspill.

cudf_spill_statsint, default 0

Set the cuDF spilling statistics level. This option has no effect if enable_cudf_spill=False.

local_directorystr or None, default None

Path on local machine to store temporary files. Can be a string (like "path/to/files") or None to fall back on the value of dask.temporary-directory in the local Dask configuration, using the current working directory if this is not set.

shared_filesystem: bool or None, default None

Whether the local_directory above is shared between all workers or not. If None, the “jit-unspill-shared-fs” config value are used, which defaults to True. Notice, in all other cases this option defaults to False, but on a local cluster it defaults to True – we assume all workers use the same filesystem.

protocolstr or None, default None

Protocol to use for communication. Can be a string (like "tcp" or "ucx"), or None to automatically choose the correct protocol.

enable_tcp_over_ucxbool, default None

Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.

enable_infinibandbool, default None

Set environment variables to enable UCX over InfiniBand, requires protocol="ucx" and implies enable_tcp_over_ucx=True when True.

enable_nvlinkbool, default None

Set environment variables to enable UCX over NVLink, requires protocol="ucx" and implies enable_tcp_over_ucx=True when True.

enable_rdmacmbool, default None

Set environment variables to enable UCX RDMA connection manager support, requires protocol="ucx" and enable_infiniband=True.

rmm_pool_sizeint, str or None, default None

RMM pool size to initialize each worker with. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M"), or None to disable RMM pools.

Note

This size is a per-worker configuration, and not cluster-wide.

rmm_maximum_pool_sizeint, str or None, default None

When rmm_pool_size is set, this argument indicates the maximum pool size. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M") or None. By default, the total available memory on the GPU is used. rmm_pool_size must be specified to use RMM pool and to set the maximum pool size.

Note

When paired with –enable-rmm-async the maximum size cannot be guaranteed due to fragmentation.

Note

This size is a per-worker configuration, and not cluster-wide.

rmm_managed_memorybool, default False

Initialize each worker with RMM and set it to use managed memory. If disabled, RMM may still be used by specifying rmm_pool_size.

Warning

Managed memory is currently incompatible with NVLink. Trying to enable both will result in an exception.

rmm_async: bool, default False

Initialize each worker with RMM and set it to use RMM’s asynchronous allocator. See rmm.mr.CudaAsyncMemoryResource for more info.

Warning

The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory. Trying to enable both will result in an exception.

rmm_allocator_external_lib_list: str, list or None, default None

List of external libraries for which to set RMM as the allocator. Supported options are: ["torch", "cupy"]. Can be a comma-separated string (like "torch,cupy") or a list of strings (like ["torch", "cupy"]). If None, no external libraries will use RMM as their allocator.

rmm_release_threshold: int, str or None, default None

When rmm.async is True and the pool size grows beyond this value, unused memory held by the pool will be released at the next synchronization point. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M") or None. By default, this feature is disabled.

Note

This size is a per-worker configuration, and not cluster-wide.

rmm_log_directorystr or None, default None

Directory to write per-worker RMM log files to. The client and scheduler are not logged here. Can be a string (like "/path/to/logs/") or None to disable logging.

Note

Logging will only be enabled if rmm_pool_size is specified or rmm_managed_memory=True.

rmm_track_allocationsbool, default False

If True, wraps the memory resource used by each worker with a rmm.mr.TrackingResourceAdaptor, which tracks the amount of memory allocated.

Note

This option enables additional diagnostics to be collected and reported by the Dask dashboard. However, there is significant overhead associated with this and it should only be used for debugging and memory profiling.

jit_unspillbool or None, default None

Enable just-in-time unspilling. Can be a boolean or None to fall back on the value of dask.jit-unspill in the local Dask configuration, disabling unspilling if this is not set.

Note

This is experimental and doesn’t support memory spilling to disk. See proxy_object.ProxyObject and proxify_host_file.ProxifyHostFile for more info.

log_spillingbool, default True

Enable logging of spilling operations directly to distributed.Worker with an INFO log level.

pre_importstr, list or None, default None

Pre-import libraries as a Worker plugin to prevent long import times bleeding through later Dask operations. Should be a list of comma-separated names, such as “cudf,rmm” or a list of strings such as [“cudf”, “rmm”].

Raises:
TypeError

If InfiniBand or NVLink are enabled and protocol!="ucx".

ValueError

If RMM pool, RMM managed memory or RMM async allocator are requested but RMM cannot be imported. If RMM managed memory and asynchronous allocator are both enabled. If RMM maximum pool size is set but RMM pool size is not. If RMM maximum pool size is set but RMM async allocator is used. If RMM release threshold is set but the RMM async allocator is not being used.

See also

LocalCluster

Examples

>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
new_worker_spec()[source]

Return name and spec for the next worker

Returns:
d: dict mapping names to worker specs

See also

scale

CLI

Worker

dask cuda

Launch a distributed worker with GPUs attached to an existing scheduler.

A scheduler can be specified either through a URI passed through the SCHEDULER argument or a scheduler file passed through the --scheduler-file option.

See https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html#dask-cuda-worker for info.

dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...

Options

--host <host>

IP address of serving host; should be visible to the scheduler and other workers. Can be a string (like "127.0.0.1") or None to fall back on the address of the interface specified by --interface or the default interface.

--nthreads <nthreads>

Number of threads to be used for each Dask worker process.

Default:

1

--name <name>

A unique name for the worker. Can be a string (like "worker-1") or None for a nameless worker.

--memory-limit <memory_limit>

Size of the host LRU cache, which is used to determine when the worker starts spilling to disk (not available if JIT-Unspill is enabled). Can be an integer (bytes), float (fraction of total system memory), string (like "5GB" or "5000M"), or "auto", 0, or None for no memory management.

Default:

'auto'

--device-memory-limit <device_memory_limit>

Size of the CUDA device LRU cache, which is used to determine when the worker starts spilling to host memory. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M"), or "auto" or 0 to disable spilling to host (i.e. allow full device memory usage).

Default:

'0.8'

--enable-cudf-spill, --disable-cudf-spill

Enable automatic cuDF spilling. WARNING: This should NOT be used with JIT-Unspill.

Default:

False

--cudf-spill-stats <cudf_spill_stats>

Set the cuDF spilling statistics level. This option has no effect if –enable-cudf-spill is not specified.

--rmm-pool-size <rmm_pool_size>

RMM pool size to initialize each worker with. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M"), or None to disable RMM pools.

Note

This size is a per-worker configuration, and not cluster-wide.

--rmm-maximum-pool-size <rmm_maximum_pool_size>

When --rmm-pool-size is specified, this argument indicates the maximum pool size. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M") or None. By default, the total available memory on the GPU is used. rmm_pool_size must be specified to use RMM pool and to set the maximum pool size.

Note

When paired with –enable-rmm-async the maximum size cannot be guaranteed due to fragmentation.

Note

This size is a per-worker configuration, and not cluster-wide.

--rmm-managed-memory, --no-rmm-managed-memory

Initialize each worker with RMM and set it to use managed memory. If disabled, RMM may still be used by specifying --rmm-pool-size.

Warning

Managed memory is currently incompatible with NVLink. Trying to enable both will result in failure.

Default:

False

--rmm-async, --no-rmm-async

Initialize each worker with RMM and set it to use RMM’s asynchronous allocator. See rmm.mr.CudaAsyncMemoryResource for more info.

Warning

The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory, trying to enable both will result in failure.

Default:

False

--set-rmm-allocator-for-libs <rmm_allocator_external_lib_list>

Set RMM as the allocator for external libraries. Provide a comma-separated list of libraries to set, e.g., “torch,cupy”.

Options:

cupy | torch

--rmm-release-threshold <rmm_release_threshold>

When rmm.async is True and the pool size grows beyond this value, unused memory held by the pool will be released at the next synchronization point. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M") or None. By default, this feature is disabled.

Note

This size is a per-worker configuration, and not cluster-wide.

--rmm-log-directory <rmm_log_directory>

Directory to write per-worker RMM log files to. The client and scheduler are not logged here. Can be a string (like "/path/to/logs/") or None to disable logging.

Note

Logging will only be enabled if --rmm-pool-size or --rmm-managed-memory are specified.

--rmm-track-allocations, --no-rmm-track-allocations

Track memory allocations made by RMM. If True, wraps the memory resource of each worker with a rmm.mr.TrackingResourceAdaptor that allows querying the amount of memory allocated by RMM.

Default:

False

--pid-file <pid_file>

File to write the process PID.

--resources <resources>

Resources for task constraints like "GPU=2 MEM=10e9".

--dashboard, --no-dashboard

Launch the dashboard.

Default:

True

--dashboard-address <dashboard_address>

Relative address to serve the dashboard (if enabled).

Default:

':0'

--local-directory <local_directory>

Path on local machine to store temporary files. Can be a string (like "path/to/files") or None to fall back on the value of dask.temporary-directory in the local Dask configuration, using the current working directory if this is not set.

--shared-filesystem, --no-shared-filesystem

If –shared-filesystem is specified, inform JIT-Unspill that local_directory is a shared filesystem available for all workers, whereas –no-shared-filesystem informs it may not assume it’s a shared filesystem. If neither is specified, JIT-Unspill will decide based on the Dask config value specified by “jit-unspill-shared-fs”. Notice, a shared filesystem must support the os.link() operation.

--scheduler-file <scheduler_file>

Filename to JSON encoded scheduler information. To be used in conjunction with the equivalent dask scheduler option.

--protocol <protocol>

Protocol like tcp, tls, or ucx

--interface <interface>

External interface used to connect to the scheduler. Usually an ethernet interface is used for connection, and not an InfiniBand interface (if one is available). Can be a string (like "eth0" for NVLink or "ib0" for InfiniBand) or None to fall back on the default interface.

--preload <preload>

Module that should be loaded by each worker process like "foo.bar" or "/path/to/foo.py".

--death-timeout <death_timeout>

Seconds to wait for a scheduler before closing

--dashboard-prefix <dashboard_prefix>

Prefix for the dashboard. Can be a string (like …) or None for no prefix.

--tls-ca-file <tls_ca_file>

CA certificate(s) file for TLS (in PEM format). Can be a string (like "path/to/certs"), or None for no certificate(s).

--tls-cert <tls_cert>

Certificate file for TLS (in PEM format). Can be a string (like "path/to/certs"), or None for no certificate(s).

--tls-key <tls_key>

Private key file for TLS (in PEM format). Can be a string (like "path/to/certs"), or None for no private key.

--enable-tcp-over-ucx, --disable-tcp-over-ucx

Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.

--enable-infiniband, --disable-infiniband

Set environment variables to enable UCX over InfiniBand, implies --enable-tcp-over-ucx when enabled.

Set environment variables to enable UCX over NVLink, implies --enable-tcp-over-ucx when enabled.

--enable-rdmacm, --disable-rdmacm

Set environment variables to enable UCX RDMA connection manager support, requires --enable-infiniband.

--enable-jit-unspill, --disable-jit-unspill

Enable just-in-time unspilling. Can be a boolean or None to fall back on the value of dask.jit-unspill in the local Dask configuration, disabling unspilling if this is not set.

Note

This is experimental and doesn’t support memory spilling to disk. See proxy_object.ProxyObject and proxify_host_file.ProxifyHostFile for more info.

--worker-class <worker_class>

Use a different class than Distributed’s default (distributed.Worker) to spawn distributed.Nanny.

--pre-import <pre_import>

Pre-import libraries as a Worker plugin to prevent long import times bleeding through later Dask operations. Should be a list of comma-separated names, such as “cudf,rmm”.

--multiprocessing-method <multiprocessing_method>

Method used to start new processes with multiprocessing

Options:

spawn | fork | forkserver

Arguments

SCHEDULER

Optional argument

PRELOAD_ARGV

Optional argument(s)

Cluster configuration

dask cuda

Query an existing GPU cluster’s configuration.

A cluster can be specified either through a URI passed through the SCHEDULER argument or a scheduler file passed through the --scheduler-file option.

dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...

Options

--scheduler-file <scheduler_file>

Filename to JSON encoded scheduler information. To be used in conjunction with the equivalent dask scheduler option.

--tls-ca-file <tls_ca_file>

CA certificate(s) file for TLS (in PEM format). Can be a string (like "path/to/certs"), or None for no certificate(s).

--tls-cert <tls_cert>

Certificate file for TLS (in PEM format). Can be a string (like "path/to/certs"), or None for no certificate(s).

--tls-key <tls_key>

Private key file for TLS (in PEM format). Can be a string (like "path/to/certs"), or None for no private key.

Arguments

SCHEDULER

Optional argument

PRELOAD_ARGV

Optional argument(s)

Client initialization

dask_cuda.initialize.initialize(create_cuda_context=True, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, protocol='ucx')[source]

Create CUDA context and initialize UCX-Py, depending on user parameters.

Sometimes it is convenient to initialize the CUDA context, particularly before starting up Dask worker processes which create a variety of threads.

To ensure UCX works correctly, it is important to ensure it is initialized with the correct options. This is especially important for the client, which cannot be configured to use UCX with arguments like LocalCUDACluster and dask cuda worker. This function will ensure that they are provided a UCX configuration based on the flags and options passed by the user.

This function can also be used within a worker preload script for UCX configuration of mainline Dask.distributed. https://docs.dask.org/en/latest/setup/custom-startup.html

You can add it to your global config with the following YAML:

distributed:
  worker:
    preload:
      - dask_cuda.initialize

See https://docs.dask.org/en/latest/configuration.html for more information about Dask configuration.

Parameters:
create_cuda_contextbool, default True

Create CUDA context on initialization.

enable_tcp_over_ucxbool, default None

Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.

enable_infinibandbool, default None

Set environment variables to enable UCX over InfiniBand, implies enable_tcp_over_ucx=True when True.

enable_nvlinkbool, default None

Set environment variables to enable UCX over NVLink, implies enable_tcp_over_ucx=True when True.

enable_rdmacmbool, default None

Set environment variables to enable UCX RDMA connection manager support, requires enable_infiniband=True.

Explicit-comms

class dask_cuda.explicit_comms.comms.CommsContext(client: Client | None = None)[source]

Communication handler for explicit communication

Parameters:
client: Client, optional

Specify client to use for communication. If None, use the default client.

run(coroutine, *args, workers=None, lock_workers=False)[source]

Run a coroutine on multiple workers

The coroutine is given the worker’s state dict as the first argument and *args as the following arguments.

Parameters:
coroutine: coroutine

The function to run on each worker

*args:

Arguments for coroutine

workers: list, optional

List of workers. Default is all workers

lock_workers: bool, optional

Use distributed.MultiLock to get exclusive access to the workers. Use this flag to support parallel runs.

Returns:
ret: list

List of the output from each worker

stage_keys(name: str, keys: Iterable[Hashable]) Dict[int, set][source]

Staging keys on workers under the given name

In an explicit-comms task, use pop_staging_area(…, name) to access the staged keys and the associated data.

Parameters:
name: str

Name for the staging area

keys: iterable

The keys to stage

Returns:
dict

dict that maps each worker-rank to the workers set of staged keys

Notes

In the context of explicit-comms, staging is the act of duplicating the responsibility of Dask keys. When staging a key, the worker owning the key (as assigned by the Dask scheduler) save a reference to the key and the associated data to its local staging area. From this point on, if the scheduler cancels the key, the worker (and the task running on the worker) now has exclusive access to the key and the associated data. This way, staging makes it possible for long running explicit-comms tasks to free input data ASAP.

submit(worker, coroutine, *args, wait=False)[source]

Run a coroutine on a single worker

The coroutine is given the worker’s state dict as the first argument and *args as the following arguments.

Parameters:
worker: str

Worker to run the coroutine

coroutine: coroutine

The function to run on the worker

*args:

Arguments for coroutine

wait: boolean, optional

If True, waits for the coroutine to finished before returning.

Returns:
ret: object or Future

If wait=True, the result of coroutine If wait=False, Future that can be waited on later.

dask_cuda.explicit_comms.dataframe.shuffle.shuffle(df: DataFrame, column_names: List[str], npartitions: int | None = None, ignore_index: bool = False, batchsize: int | None = None) DataFrame[source]

Order divisions of DataFrame so that all values within column(s) align

This enacts a task-based shuffle using explicit-comms. It requires a full dataset read, serialization and shuffle. This is expensive. If possible you should avoid shuffles.

This does not preserve a meaningful index/partitioning scheme. This is not deterministic if done in parallel.

Requires an activate client.

Parameters:
df: dask.dataframe.DataFrame

Dataframe to shuffle

column_names: list of strings

List of column names on which we want to split.

npartitions: int or None

The desired number of output partitions. If None, the number of output partitions equals df.npartitions

ignore_index: bool

Ignore index during shuffle. If True, performance may improve, but index values will not be preserved.

batchsize: int

A shuffle consist of multiple rounds where each worker partitions and then all-to-all communicates a number of its dataframe partitions. The batch size is the number of partitions each worker will handle in each round. If -1, each worker will handle all its partitions in a single round and all techniques to reduce memory usage are disabled, which might be faster when memory pressure isn’t an issue. If None, the value of DASK_EXPLICIT_COMMS_BATCHSIZE is used or 1 if not set thus by default, we prioritize robustness over performance.

Returns:
df: dask.dataframe.DataFrame

Shuffled dataframe