API
Cluster
- class dask_cuda.LocalCUDACluster(CUDA_VISIBLE_DEVICES=None, n_workers=None, threads_per_worker=1, memory_limit='auto', device_memory_limit=0.8, enable_cudf_spill=False, cudf_spill_stats=0, data=None, local_directory=None, shared_filesystem=None, protocol=None, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, rmm_pool_size=None, rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_allocator_external_lib_list=None, rmm_release_threshold=None, rmm_log_directory=None, rmm_track_allocations=False, jit_unspill=None, log_spilling=False, worker_class=None, pre_import=None, **kwargs)[source]
A variant of dask.distributed.LocalCluster that uses one GPU per process.
This assigns a different CUDA_VISIBLE_DEVICES environment variable to each Dask worker process.
For machines with a complex architecture mapping CPUs, GPUs, and network hardware, such as NVIDIA DGX-1 and DGX-2, this class creates a local cluster that tries to respect this hardware as much as possible.
Each worker process is automatically assigned the correct CPU cores and network interface cards to maximize performance. If UCX and UCX-Py are available, InfiniBand and NVLink connections can be used to optimize data transfer performance.
- Parameters:
- CUDA_VISIBLE_DEVICES : str, list of int, or None, default None
GPUs to restrict activity to. Can be a string (like "0,1,2,3"), a list (like [0, 1, 2, 3]), or None to use all available GPUs.
- n_workers : int or None, default None
Number of workers. Can be an integer or None to fall back on the GPUs specified by CUDA_VISIBLE_DEVICES. When the latter is specified, n_workers must be smaller than or equal to the number of GPUs it lists, and if smaller, only the first n_workers GPUs will be used.
- threads_per_worker : int, default 1
Number of threads to be used for each Dask worker process.
- memory_limit : int, float, str, or None, default "auto"
Size of the host LRU cache, which is used to determine when the worker starts spilling to disk (not available if JIT-Unspill is enabled). Can be an integer (bytes), a float (fraction of total system memory), a string (like "5GB" or "5000M"), or "auto", 0, or None for no memory management.
- device_memory_limit : int, float, str, or None, default 0.8
Size of the CUDA device LRU cache, which is used to determine when the worker starts spilling to host memory. Can be an integer (bytes), a float (fraction of total device memory), a string (like "5GB" or "5000M"), or "auto", 0, or None to disable spilling to host (i.e. allow full device memory usage).
- enable_cudf_spill : bool, default False
Enable automatic cuDF spilling.
Warning
This should NOT be used together with JIT-Unspill.
- cudf_spill_stats : int, default 0
Set the cuDF spilling statistics level. This option has no effect if enable_cudf_spill=False.
- local_directory : str or None, default None
Path on local machine to store temporary files. Can be a string (like "path/to/files") or None to fall back on the value of dask.temporary-directory in the local Dask configuration, using the current working directory if this is not set.
- shared_filesystem : bool or None, default None
Whether the local_directory above is shared between all workers or not. If None, the "jit-unspill-shared-fs" config value is used, which defaults to True. Note that in all other cases this option defaults to False, but on a local cluster it defaults to True, since we assume all workers use the same filesystem.
- protocol : str or None, default None
Protocol to use for communication. Can be a string (like "tcp" or "ucx") or None to automatically choose the correct protocol.
- enable_tcp_over_ucx : bool, default None
Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.
- enable_infiniband : bool, default None
Set environment variables to enable UCX over InfiniBand. Requires protocol="ucx" and implies enable_tcp_over_ucx=True when True.
- enable_nvlink : bool, default None
Set environment variables to enable UCX over NVLink. Requires protocol="ucx" and implies enable_tcp_over_ucx=True when True.
- enable_rdmacm : bool, default None
Set environment variables to enable UCX RDMA connection manager support. Requires protocol="ucx" and enable_infiniband=True.
- rmm_pool_size : int, str or None, default None
RMM pool size to initialize each worker with. Can be an integer (bytes), a float (fraction of total device memory), a string (like "5GB" or "5000M"), or None to disable RMM pools.
Note
This size is a per-worker configuration, and not cluster-wide.
- rmm_maximum_pool_size : int, str or None, default None
When rmm_pool_size is set, this argument indicates the maximum pool size. Can be an integer (bytes), a float (fraction of total device memory), a string (like "5GB" or "5000M"), or None. By default, the total available memory on the GPU is used. rmm_pool_size must be specified to use an RMM pool and to set the maximum pool size.
Note
When paired with rmm_async=True, the maximum size cannot be guaranteed due to fragmentation.
Note
This size is a per-worker configuration, and not cluster-wide.
- rmm_managed_memory : bool, default False
Initialize each worker with RMM and set it to use managed memory. If disabled, RMM may still be used by specifying rmm_pool_size.
Warning
Managed memory is currently incompatible with NVLink. Trying to enable both will result in an exception.
- rmm_async : bool, default False
Initialize each worker with RMM and set it to use RMM's asynchronous allocator. See rmm.mr.CudaAsyncMemoryResource for more info.
Warning
The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory. Trying to enable both will result in an exception.
- rmm_allocator_external_lib_list : str, list or None, default None
List of external libraries for which to set RMM as the allocator. Supported options are: ["torch", "cupy"]. Can be a comma-separated string (like "torch,cupy") or a list of strings (like ["torch", "cupy"]). If None, no external libraries will use RMM as their allocator.
- rmm_release_threshold : int, str or None, default None
When rmm_async=True and the pool size grows beyond this value, unused memory held by the pool will be released at the next synchronization point. Can be an integer (bytes), a float (fraction of total device memory), a string (like "5GB" or "5000M"), or None. By default, this feature is disabled.
Note
This size is a per-worker configuration, and not cluster-wide.
- rmm_log_directory : str or None, default None
Directory to write per-worker RMM log files to. The client and scheduler are not logged here. Can be a string (like "/path/to/logs/") or None to disable logging.
Note
Logging will only be enabled if rmm_pool_size is specified or rmm_managed_memory=True.
- rmm_track_allocations : bool, default False
If True, wraps the memory resource used by each worker with a rmm.mr.TrackingResourceAdaptor, which tracks the amount of memory allocated.
Note
This option enables additional diagnostics to be collected and reported by the Dask dashboard. However, there is significant overhead associated with this and it should only be used for debugging and memory profiling.
- jit_unspill : bool or None, default None
Enable just-in-time unspilling. Can be a boolean or None to fall back on the value of dask.jit-unspill in the local Dask configuration, disabling unspilling if this is not set.
Note
This is experimental and doesn't support memory spilling to disk. See proxy_object.ProxyObject and proxify_host_file.ProxifyHostFile for more info.
- log_spilling : bool, default True
Enable logging of spilling operations directly to distributed.Worker with an INFO log level.
- pre_import : str, list or None, default None
Pre-import libraries as a Worker plugin to prevent long import times bleeding through later Dask operations. Can be a comma-separated string of names, such as "cudf,rmm", or a list of strings, such as ["cudf", "rmm"].
- Raises:
- TypeError
If InfiniBand or NVLink are enabled and protocol!="ucx".
- ValueError
If an RMM pool, RMM managed memory, or the RMM async allocator is requested but RMM cannot be imported; if RMM managed memory and the asynchronous allocator are both enabled; if the RMM maximum pool size is set but the RMM pool size is not; if the RMM maximum pool size is set but the RMM async allocator is used; or if the RMM release threshold is set but the RMM async allocator is not being used.
See also
LocalCluster
Examples
>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
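For reference, a slightly more involved sketch is shown below. It only uses parameters documented above; the specific values (GPU selection, spill threshold, and RMM pool size) are illustrative assumptions, not recommended settings.
>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client
>>> # Restrict the cluster to two GPUs, spill device memory to host at 80% usage,
>>> # and give each worker a small RMM pool; all values are illustrative.
>>> cluster = LocalCUDACluster(
...     CUDA_VISIBLE_DEVICES="0,1",
...     device_memory_limit=0.8,
...     rmm_pool_size="1GB",
... )
>>> client = Client(cluster)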
CLI
Worker
dask cuda worker
Launch a distributed worker with GPUs attached to an existing scheduler.
A scheduler can be specified either through a URI passed through the SCHEDULER argument or a scheduler file passed through the --scheduler-file option.
See https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html#dask-cuda-worker for info.
dask cuda worker [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...
Options
- --host <host>
IP address of serving host; should be visible to the scheduler and other workers. Can be a string (like "127.0.0.1") or None to fall back on the address of the interface specified by --interface or the default interface.
- --nthreads <nthreads>
Number of threads to be used for each Dask worker process.
- Default:
1
- --name <name>
A unique name for the worker. Can be a string (like "worker-1") or None for a nameless worker.
- --memory-limit <memory_limit>
Size of the host LRU cache, which is used to determine when the worker starts spilling to disk (not available if JIT-Unspill is enabled). Can be an integer (bytes), a float (fraction of total system memory), a string (like "5GB" or "5000M"), or "auto", 0, or None for no memory management.
- Default:
'auto'
- --device-memory-limit <device_memory_limit>
Size of the CUDA device LRU cache, which is used to determine when the worker starts spilling to host memory. Can be an integer (bytes), a float (fraction of total device memory), a string (like "5GB" or "5000M"), or "auto" or 0 to disable spilling to host (i.e. allow full device memory usage).
- Default:
'0.8'
- --enable-cudf-spill, --disable-cudf-spill
Enable automatic cuDF spilling. WARNING: This should NOT be used with JIT-Unspill.
- Default:
False
- --cudf-spill-stats <cudf_spill_stats>
Set the cuDF spilling statistics level. This option has no effect if --enable-cudf-spill is not specified.
- --rmm-pool-size <rmm_pool_size>
RMM pool size to initialize each worker with. Can be an integer (bytes), a float (fraction of total device memory), a string (like "5GB" or "5000M"), or None to disable RMM pools.
Note
This size is a per-worker configuration, and not cluster-wide.
- --rmm-maximum-pool-size <rmm_maximum_pool_size>
When --rmm-pool-size is specified, this argument indicates the maximum pool size. Can be an integer (bytes), a float (fraction of total device memory), a string (like "5GB" or "5000M"), or None. By default, the total available memory on the GPU is used. --rmm-pool-size must be specified to use an RMM pool and to set the maximum pool size.
Note
When paired with --rmm-async, the maximum size cannot be guaranteed due to fragmentation.
Note
This size is a per-worker configuration, and not cluster-wide.
- --rmm-managed-memory, --no-rmm-managed-memory
Initialize each worker with RMM and set it to use managed memory. If disabled, RMM may still be used by specifying --rmm-pool-size.
Warning
Managed memory is currently incompatible with NVLink. Trying to enable both will result in failure.
- Default:
False
- --rmm-async, --no-rmm-async
Initialize each worker with RMM and set it to use RMM's asynchronous allocator. See rmm.mr.CudaAsyncMemoryResource for more info.
Warning
The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory; trying to enable both will result in failure.
- Default:
False
- --set-rmm-allocator-for-libs <rmm_allocator_external_lib_list>
Set RMM as the allocator for external libraries. Provide a comma-separated list of libraries to set, e.g., "torch,cupy".
- Options:
cupy | torch
- --rmm-release-threshold <rmm_release_threshold>
When --rmm-async is enabled and the pool size grows beyond this value, unused memory held by the pool will be released at the next synchronization point. Can be an integer (bytes), a float (fraction of total device memory), a string (like "5GB" or "5000M"), or None. By default, this feature is disabled.
Note
This size is a per-worker configuration, and not cluster-wide.
- --rmm-log-directory <rmm_log_directory>
Directory to write per-worker RMM log files to. The client and scheduler are not logged here. Can be a string (like "/path/to/logs/") or None to disable logging.
Note
Logging will only be enabled if --rmm-pool-size or --rmm-managed-memory are specified.
- --rmm-track-allocations, --no-rmm-track-allocations
Track memory allocations made by RMM. If True, wraps the memory resource of each worker with a rmm.mr.TrackingResourceAdaptor that allows querying the amount of memory allocated by RMM.
- Default:
False
- --pid-file <pid_file>
File to write the process PID.
- --resources <resources>
Resources for task constraints like "GPU=2 MEM=10e9".
- --dashboard, --no-dashboard
Launch the dashboard.
- Default:
True
- --dashboard-address <dashboard_address>
Relative address to serve the dashboard (if enabled).
- Default:
':0'
- --local-directory <local_directory>
Path on local machine to store temporary files. Can be a string (like "path/to/files") or None to fall back on the value of dask.temporary-directory in the local Dask configuration, using the current working directory if this is not set.
- --shared-filesystem, --no-shared-filesystem
If --shared-filesystem is specified, inform JIT-Unspill that local_directory is a shared filesystem available to all workers, whereas --no-shared-filesystem informs it that it may not assume it is a shared filesystem. If neither is specified, JIT-Unspill will decide based on the Dask config value specified by "jit-unspill-shared-fs". Note that a shared filesystem must support the os.link() operation.
- --scheduler-file <scheduler_file>
Filename of JSON-encoded scheduler information. To be used in conjunction with the equivalent dask scheduler option.
- --protocol <protocol>
Protocol like tcp, tls, or ucx.
- --interface <interface>
External interface used to connect to the scheduler. Usually an Ethernet interface is used for connection, and not an InfiniBand interface (if one is available). Can be a string (like "eth0" for NVLink or "ib0" for InfiniBand) or None to fall back on the default interface.
- --preload <preload>
Module that should be loaded by each worker process, like "foo.bar" or "/path/to/foo.py".
- --death-timeout <death_timeout>
Seconds to wait for a scheduler before closing.
- --dashboard-prefix <dashboard_prefix>
Prefix for the dashboard. Can be a string (like …) or None for no prefix.
- --tls-ca-file <tls_ca_file>
CA certificate(s) file for TLS (in PEM format). Can be a string (like "path/to/certs") or None for no certificate(s).
- --tls-cert <tls_cert>
Certificate file for TLS (in PEM format). Can be a string (like "path/to/certs") or None for no certificate(s).
- --tls-key <tls_key>
Private key file for TLS (in PEM format). Can be a string (like "path/to/certs") or None for no private key.
- --enable-tcp-over-ucx, --disable-tcp-over-ucx
Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.
- --enable-infiniband, --disable-infiniband
Set environment variables to enable UCX over InfiniBand, implies --enable-tcp-over-ucx when enabled.
- --enable-nvlink, --disable-nvlink
Set environment variables to enable UCX over NVLink, implies --enable-tcp-over-ucx when enabled.
- --enable-rdmacm, --disable-rdmacm
Set environment variables to enable UCX RDMA connection manager support, requires --enable-infiniband.
- --enable-jit-unspill, --disable-jit-unspill
Enable just-in-time unspilling. Can be a boolean or None to fall back on the value of dask.jit-unspill in the local Dask configuration, disabling unspilling if this is not set.
Note
This is experimental and doesn't support memory spilling to disk. See proxy_object.ProxyObject and proxify_host_file.ProxifyHostFile for more info.
- --worker-class <worker_class>
Use a different class than Distributed's default (distributed.Worker) to spawn distributed.Nanny.
- --pre-import <pre_import>
Pre-import libraries as a Worker plugin to prevent long import times bleeding through later Dask operations. Should be a comma-separated list of names, such as "cudf,rmm".
- --multiprocessing-method <multiprocessing_method>
Method used to start new processes with multiprocessing.
- Options:
spawn | fork | forkserver
Arguments
- SCHEDULER
Optional argument
- PRELOAD_ARGV
Optional argument(s)
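Once workers have been launched this way, a client can attach through the same scheduler file; a minimal sketch follows, where the file path is purely illustrative.
>>> from dask.distributed import Client
>>> # Connect to the scheduler the workers registered with; the path is illustrative.
>>> client = Client(scheduler_file="/path/to/scheduler.json")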
Cluster configuration
dask cuda config
Query an existing GPU cluster's configuration.
A cluster can be specified either through a URI passed through the SCHEDULER argument or a scheduler file passed through the --scheduler-file option.
dask cuda config [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...
Options
- --scheduler-file <scheduler_file>
Filename of JSON-encoded scheduler information. To be used in conjunction with the equivalent dask scheduler option.
- --tls-ca-file <tls_ca_file>
CA certificate(s) file for TLS (in PEM format). Can be a string (like "path/to/certs") or None for no certificate(s).
- --tls-cert <tls_cert>
Certificate file for TLS (in PEM format). Can be a string (like "path/to/certs") or None for no certificate(s).
- --tls-key <tls_key>
Private key file for TLS (in PEM format). Can be a string (like "path/to/certs") or None for no private key.
Arguments
- SCHEDULER
Optional argument
- PRELOAD_ARGV
Optional argument(s)
Client initialization
- dask_cuda.initialize.initialize(create_cuda_context=True, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, protocol='ucx')[source]
Create CUDA context and initialize UCX-Py, depending on user parameters.
Sometimes it is convenient to initialize the CUDA context, particularly before starting up Dask worker processes which create a variety of threads.
To ensure UCX works correctly, it is important to ensure it is initialized with the correct options. This is especially important for the client, which cannot be configured to use UCX with arguments like LocalCUDACluster and dask cuda worker. This function will ensure that they are provided a UCX configuration based on the flags and options passed by the user.
This function can also be used within a worker preload script for UCX configuration of mainline Dask.distributed (see https://docs.dask.org/en/latest/setup/custom-startup.html).
You can add it to your global config with the following YAML:
distributed:
  worker:
    preload:
      - dask_cuda.initialize
See https://docs.dask.org/en/latest/configuration.html for more information about Dask configuration.
- Parameters:
- create_cuda_context : bool, default True
Create CUDA context on initialization.
- enable_tcp_over_ucx : bool, default None
Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.
- enable_infiniband : bool, default None
Set environment variables to enable UCX over InfiniBand, implies enable_tcp_over_ucx=True when True.
- enable_nvlink : bool, default None
Set environment variables to enable UCX over NVLink, implies enable_tcp_over_ucx=True when True.
- enable_rdmacm : bool, default None
Set environment variables to enable UCX RDMA connection manager support, requires enable_infiniband=True.
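For reference, a minimal client-side sketch is shown below; the flag combination (NVLink plus TCP over UCX) and the scheduler address are illustrative assumptions and must match how the cluster was actually started.
>>> from dask.distributed import Client
>>> from dask_cuda.initialize import initialize
>>> # Configure UCX for the client process before connecting; flags mirror the cluster's.
>>> initialize(enable_tcp_over_ucx=True, enable_nvlink=True)
>>> client = Client("ucx://10.0.0.1:8786")  # hypothetical scheduler address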
Explicit-comms
- class dask_cuda.explicit_comms.comms.CommsContext(client: Client | None = None)[source]
Communication handler for explicit communication
- Parameters:
- client: Client, optional
Specify client to use for communication. If None, use the default client.
- run(coroutine, *args, workers=None, lock_workers=False)[source]
Run a coroutine on multiple workers
The coroutine is given the worker's state dict as the first argument and *args as the following arguments.
- Parameters:
- coroutine: coroutine
The function to run on each worker
- *args:
Arguments for coroutine
- workers: list, optional
List of workers. Default is all workers.
- lock_workers: bool, optional
Use distributed.MultiLock to get exclusive access to the workers. Use this flag to support parallel runs.
- Returns:
- ret: list
List of the output from each worker
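As a rough sketch of the calling convention (not taken from this page's examples), the snippet below runs a trivial coroutine on every worker; the assumption that the worker state dict carries a "rank" entry is mine and may differ between versions.
>>> from dask.distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> from dask_cuda.explicit_comms.comms import CommsContext
>>> async def report(session_state, greeting):
...     # The first argument is the worker's state dict; "rank" is assumed to exist here.
...     return f"{greeting} from rank {session_state.get('rank')}"
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
>>> comms = CommsContext(client=client)
>>> results = comms.run(report, "hello")  # one result per worker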
- stage_keys(name: str, keys: Iterable[Hashable]) Dict[int, set] [source]
Stage keys on workers under the given name.
In an explicit-comms task, use pop_staging_area(…, name) to access the staged keys and the associated data.
- Parameters:
- name: str
Name for the staging area
- keys: iterable
The keys to stage
- Returns:
- dict
dict that maps each worker rank to the worker's set of staged keys
Notes
In the context of explicit-comms, staging is the act of duplicating the responsibility of Dask keys. When staging a key, the worker owning the key (as assigned by the Dask scheduler) saves a reference to the key and the associated data in its local staging area. From this point on, if the scheduler cancels the key, the worker (and the task running on the worker) still has exclusive access to the key and the associated data. This way, staging makes it possible for long-running explicit-comms tasks to free input data as soon as possible.
- submit(worker, coroutine, *args, wait=False)[source]
Run a coroutine on a single worker
The coroutine is given the worker's state dict as the first argument and *args as the following arguments.
- Parameters:
- worker: str
Worker to run the coroutine
- coroutine: coroutine
The function to run on the worker
- *args:
Arguments for coroutine
- wait: boolean, optional
If True, waits for the coroutine to finish before returning.
- Returns:
- ret: object or Future
If wait=True, the result of the coroutine; if wait=False, a Future that can be waited on later.
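Continuing the sketch above, submit targets a single worker; looking up the worker address through the client's scheduler info is an illustrative choice, not a requirement.
>>> # Reusing `client`, `comms`, and `report` from the run() sketch above.
>>> worker_address = list(client.scheduler_info()["workers"])[0]
>>> comms.submit(worker_address, report, "hi", wait=True)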
- dask_cuda.explicit_comms.dataframe.shuffle.shuffle(df: DataFrame, column_names: List[str], npartitions: int | None = None, ignore_index: bool = False, batchsize: int | None = None) DataFrame [source]
Order divisions of DataFrame so that all values within column(s) align
This enacts a task-based shuffle using explicit-comms. It requires a full dataset read, serialization and shuffle. This is expensive. If possible you should avoid shuffles.
This does not preserve a meaningful index/partitioning scheme. This is not deterministic if done in parallel.
Requires an active client.
- Parameters:
- df: dask.dataframe.DataFrame
Dataframe to shuffle
- column_names: list of strings
List of column names on which we want to split.
- npartitions: int or None
The desired number of output partitions. If None, the number of output partitions equals df.npartitions
- ignore_index: bool
Ignore index during shuffle. If True, performance may improve, but index values will not be preserved.
- batchsize: int
A shuffle consists of multiple rounds where each worker partitions and then all-to-all communicates a number of its dataframe partitions. The batch size is the number of partitions each worker will handle in each round. If -1, each worker will handle all its partitions in a single round and all techniques to reduce memory usage are disabled, which might be faster when memory pressure isn't an issue. If None, the value of DASK_EXPLICIT_COMMS_BATCHSIZE is used, or 1 if it is not set; thus, by default, we prioritize robustness over performance.
- Returns:
- df: dask.dataframe.DataFrame
Shuffled dataframe
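To make the calling convention concrete, a small sketch follows. It uses a pandas-backed Dask DataFrame for brevity (an assumption made here; cuDF-backed frames are the more typical target for dask-cuda), and the column name and partition counts are illustrative.
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> from dask.distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> from dask_cuda.explicit_comms.dataframe.shuffle import shuffle
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)  # an active client is required
>>> # Build a small example dataframe and shuffle it on the "key" column.
>>> pdf = pd.DataFrame({"key": [1, 2, 3, 4] * 25, "value": range(100)})
>>> ddf = dd.from_pandas(pdf, npartitions=4)
>>> shuffled = shuffle(ddf, column_names=["key"], npartitions=4)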