import copy
import logging
import os
import warnings
from functools import partial
import dask
from distributed import LocalCluster, Nanny, Worker
from distributed.worker_memory import parse_memory_limit
from .device_host_file import DeviceHostFile
from .initialize import initialize
from .plugins import CPUAffinity, CUDFSetup, PreImport, RMMSetup
from .proxify_host_file import ProxifyHostFile
from .utils import (
cuda_visible_devices,
get_cpu_affinity,
get_ucx_config,
nvml_device_index,
parse_cuda_visible_device,
parse_device_memory_limit,
)
class LoggedWorker(Worker):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def start(self):
await super().start()
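        # Record this worker's address on the data (spilling) object so that
        # spill log messages can be attributed to this worker.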
self.data.set_address(self.address)
class LoggedNanny(Nanny):
def __init__(self, *args, **kwargs):
super().__init__(*args, worker_class=LoggedWorker, **kwargs)
class LocalCUDACluster(LocalCluster):
"""A variant of ``dask.distributed.LocalCluster`` that uses one GPU per process.

    This assigns a different ``CUDA_VISIBLE_DEVICES`` environment variable to each
    Dask worker process.

    For machines with a complex architecture mapping CPUs, GPUs, and network
    hardware, such as NVIDIA DGX-1 and DGX-2, this class creates a local cluster
    that tries to respect this hardware as much as possible.

    Each worker process is automatically assigned the correct CPU cores and
    network interface cards to maximize performance. If UCX and UCX-Py are
    available, InfiniBand and NVLink connections can be used to optimize data
    transfer performance.

    Parameters
    ----------
CUDA_VISIBLE_DEVICES : str, list of int, or None, default None
GPUs to restrict activity to. Can be a string (like ``"0,1,2,3"``), list (like
``[0, 1, 2, 3]``), or ``None`` to use all available GPUs.
    n_workers : int or None, default None
        Number of workers. Can be an integer or ``None`` to fall back on the GPUs
        specified by ``CUDA_VISIBLE_DEVICES``. The value of ``n_workers`` must be
        less than or equal to the number of GPUs specified in
        ``CUDA_VISIBLE_DEVICES`` when the latter is specified; if smaller, only
        the first ``n_workers`` GPUs will be used.
threads_per_worker : int, default 1
Number of threads to be used for each Dask worker process.
memory_limit : int, float, str, or None, default "auto"
Size of the host LRU cache, which is used to determine when the worker
starts spilling to disk (not available if JIT-Unspill is enabled). Can be an
integer (bytes), float (fraction of total system memory), string (like ``"5GB"``
or ``"5000M"``), or ``"auto"``, 0, or ``None`` for no memory management.
device_memory_limit : int, float, str, or None, default 0.8
Size of the CUDA device LRU cache, which is used to determine when the worker
starts spilling to host memory. Can be an integer (bytes), float (fraction of
total device memory), string (like ``"5GB"`` or ``"5000M"``), or ``"auto"``, 0,
or ``None`` to disable spilling to host (i.e. allow full device memory usage).
enable_cudf_spill : bool, default False
Enable automatic cuDF spilling.

        .. warning::
            This should NOT be used together with JIT-Unspill.
cudf_spill_stats : int, default 0
Set the cuDF spilling statistics level. This option has no effect if
``enable_cudf_spill=False``.
local_directory : str or None, default None
Path on local machine to store temporary files. Can be a string (like
``"path/to/files"``) or ``None`` to fall back on the value of
``dask.temporary-directory`` in the local Dask configuration, using the current
working directory if this is not set.
    shared_filesystem : bool or None, default None
        Whether the ``local_directory`` above is shared between all workers or
        not. If ``None``, the "jit-unspill-shared-fs" config value is used, which
        defaults to ``True``. Note that in all other cases this option defaults to
        ``False``, but on a local cluster it defaults to ``True``, since we assume
        all workers use the same filesystem.
protocol : str or None, default None
Protocol to use for communication. Can be a string (like ``"tcp"`` or
``"ucx"``), or ``None`` to automatically choose the correct protocol.
    enable_tcp_over_ucx : bool or None, default None
Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink
are not supported or disabled.
    enable_infiniband : bool or None, default None
Set environment variables to enable UCX over InfiniBand, requires
``protocol="ucx"`` and implies ``enable_tcp_over_ucx=True`` when ``True``.
    enable_nvlink : bool or None, default None
Set environment variables to enable UCX over NVLink, requires ``protocol="ucx"``
and implies ``enable_tcp_over_ucx=True`` when ``True``.
    enable_rdmacm : bool or None, default None
Set environment variables to enable UCX RDMA connection manager support,
requires ``protocol="ucx"`` and ``enable_infiniband=True``.
    rmm_pool_size : int, float, str, or None, default None
RMM pool size to initialize each worker with. Can be an integer (bytes), float
(fraction of total device memory), string (like ``"5GB"`` or ``"5000M"``), or
``None`` to disable RMM pools.

        .. note::
            This size is a per-worker configuration, and not cluster-wide.
    rmm_maximum_pool_size : int, float, str, or None, default None
        When ``rmm_pool_size`` is set, this argument indicates the maximum pool
        size. Can be an integer (bytes), float (fraction of total device memory),
        string (like ``"5GB"`` or ``"5000M"``), or ``None``. By default, the total
        available memory on the GPU is used. ``rmm_pool_size`` must be specified
        to use an RMM pool and to set the maximum pool size.

        .. note::
            When paired with ``rmm_async=True`` the maximum size cannot be
            guaranteed due to fragmentation.

        .. note::
            This size is a per-worker configuration, and not cluster-wide.
rmm_managed_memory : bool, default False
Initialize each worker with RMM and set it to use managed memory. If disabled,
RMM may still be used by specifying ``rmm_pool_size``.

        .. warning::
            Managed memory is currently incompatible with NVLink. Trying to
            enable both will result in an exception.
    rmm_async : bool, default False
Initialize each worker with RMM and set it to use RMM's asynchronous allocator.
See ``rmm.mr.CudaAsyncMemoryResource`` for more info.

        .. warning::
            The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is
            also incompatible with RMM pools and managed memory; trying to enable
            the asynchronous allocator together with either will result in an
            exception.
    rmm_release_threshold : int, float, str, or None, default None
        When ``rmm_async=True`` and the pool size grows beyond this value, unused
        memory held by the pool will be released at the next synchronization
        point. Can be an integer (bytes), float (fraction of total device memory),
        string (like ``"5GB"`` or ``"5000M"``), or ``None``. By default, this
        feature is disabled.

        .. note::
            This size is a per-worker configuration, and not cluster-wide.
rmm_log_directory : str or None, default None
Directory to write per-worker RMM log files to. The client and scheduler are not
logged here. Can be a string (like ``"/path/to/logs/"``) or ``None`` to
disable logging.

        .. note::
            Logging will only be enabled if ``rmm_pool_size`` is specified or
            ``rmm_managed_memory=True``.
rmm_track_allocations : bool, default False
If True, wraps the memory resource used by each worker with a
``rmm.mr.TrackingResourceAdaptor``, which tracks the amount of
memory allocated.

        .. note::
            This option enables additional diagnostics to be collected and
            reported by the Dask dashboard. However, there is significant overhead
            associated with this and it should only be used for debugging and
            memory profiling.
jit_unspill : bool or None, default None
Enable just-in-time unspilling. Can be a boolean or ``None`` to fall back on
the value of ``dask.jit-unspill`` in the local Dask configuration, disabling
unspilling if this is not set.

        .. note::
            This is experimental and doesn't support memory spilling to disk. See
            ``proxy_object.ProxyObject`` and ``proxify_host_file.ProxifyHostFile``
            for more info.
    log_spilling : bool, default False
        Enable logging of spilling operations directly to ``distributed.Worker``
        with an ``INFO`` log level.
    pre_import : str, list, or None, default None
        Pre-import libraries as a Worker plugin to prevent long import times
        bleeding through later Dask operations. Can be a string of comma-separated
        names, such as ``"cudf,rmm"``, or a list of strings, such as
        ``["cudf", "rmm"]``.

    Examples
    --------
>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
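
    A cluster restricted to two GPUs, communicating over UCX with NVLink and an
    RMM pool on each worker (the values below are illustrative only):

    >>> cluster = LocalCUDACluster(
    ...     CUDA_VISIBLE_DEVICES="0,1",
    ...     protocol="ucx",
    ...     enable_nvlink=True,
    ...     rmm_pool_size="24GB",
    ... )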

    Raises
    ------
    TypeError
        If InfiniBand or NVLink are enabled and ``protocol`` is not ``"ucx"`` or
        ``"ucxx"``.
ValueError
If RMM pool, RMM managed memory or RMM async allocator are requested but RMM
cannot be imported.
If RMM managed memory and asynchronous allocator are both enabled.
If RMM maximum pool size is set but RMM pool size is not.
If RMM maximum pool size is set but RMM async allocator is used.
If RMM release threshold is set but the RMM async allocator is not being used.

    See Also
    --------
LocalCluster
"""
def __init__(
self,
CUDA_VISIBLE_DEVICES=None,
n_workers=None,
threads_per_worker=1,
memory_limit="auto",
device_memory_limit=0.8,
enable_cudf_spill=False,
cudf_spill_stats=0,
data=None,
local_directory=None,
shared_filesystem=None,
protocol=None,
enable_tcp_over_ucx=None,
enable_infiniband=None,
enable_nvlink=None,
enable_rdmacm=None,
rmm_pool_size=None,
rmm_maximum_pool_size=None,
rmm_managed_memory=False,
rmm_async=False,
rmm_release_threshold=None,
rmm_log_directory=None,
rmm_track_allocations=False,
jit_unspill=None,
log_spilling=False,
worker_class=None,
pre_import=None,
**kwargs,
):
# Required by RAPIDS libraries (e.g., cuDF) to ensure no context
# initialization happens before we can set CUDA_VISIBLE_DEVICES
os.environ["RAPIDS_NO_INITIALIZE"] = "True"
if enable_cudf_spill:
import cudf
# cuDF spilling must be enabled in the client/scheduler process too.
cudf.set_option("spill", enable_cudf_spill)
cudf.set_option("spill_stats", cudf_spill_stats)
if threads_per_worker < 1:
raise ValueError("threads_per_worker must be higher than 0.")
if CUDA_VISIBLE_DEVICES is None:
CUDA_VISIBLE_DEVICES = cuda_visible_devices(0)
if isinstance(CUDA_VISIBLE_DEVICES, str):
CUDA_VISIBLE_DEVICES = CUDA_VISIBLE_DEVICES.split(",")
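        # Each entry may be an integer index or a GPU UUID string; parse all
        # entries into a canonical form.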
CUDA_VISIBLE_DEVICES = list(
map(parse_cuda_visible_device, CUDA_VISIBLE_DEVICES)
)
if n_workers is None:
n_workers = len(CUDA_VISIBLE_DEVICES)
if n_workers < 1:
raise ValueError("Number of workers cannot be less than 1.")
# Set nthreads=1 when parsing mem_limit since it only depends on n_workers
logger = logging.getLogger(__name__)
self.memory_limit = parse_memory_limit(
memory_limit=memory_limit,
nthreads=1,
total_cores=n_workers,
logger=logger,
)
self.device_memory_limit = parse_device_memory_limit(
device_memory_limit, device_index=nvml_device_index(0, CUDA_VISIBLE_DEVICES)
)
self.enable_cudf_spill = enable_cudf_spill
self.cudf_spill_stats = cudf_spill_stats
self.rmm_pool_size = rmm_pool_size
self.rmm_maximum_pool_size = rmm_maximum_pool_size
self.rmm_managed_memory = rmm_managed_memory
self.rmm_async = rmm_async
self.rmm_release_threshold = rmm_release_threshold
if rmm_pool_size is not None or rmm_managed_memory or rmm_async:
try:
import rmm # noqa F401
except ImportError:
                raise ValueError(
                    "RMM pool, managed memory or async allocator requested but "
                    "module 'rmm' is not available. For installation "
                    "instructions, please see https://github.com/rapidsai/rmm"
                )  # pragma: no cover
else:
if enable_nvlink:
warnings.warn(
"When using NVLink we recommend setting a "
"`rmm_pool_size`. Please see: "
"https://docs.rapids.ai/api/dask-cuda/nightly/ucx/ "
"for more details"
)
self.rmm_log_directory = rmm_log_directory
self.rmm_track_allocations = rmm_track_allocations
if not kwargs.pop("processes", True):
raise ValueError(
"Processes are necessary in order to use multiple GPUs with Dask"
)
if shared_filesystem is None:
            # Note: on a local cluster we assume all workers share a filesystem
shared_filesystem = dask.config.get("jit-unspill-shared-fs", default=True)
if jit_unspill is None:
jit_unspill = dask.config.get("jit-unspill", default=False)
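        # Choose the worker's data store: a plain dict when no memory limits are
        # set, ProxifyHostFile when JIT-Unspill is enabled, and DeviceHostFile
        # (device-to-host spilling) otherwise.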
if data is None:
if device_memory_limit is None and memory_limit is None:
data = {}
elif jit_unspill:
if enable_cudf_spill:
warnings.warn(
"Enabling cuDF spilling and JIT-Unspill together is not "
"safe, consider disabling JIT-Unspill."
)
data = (
ProxifyHostFile,
{
"device_memory_limit": self.device_memory_limit,
"memory_limit": self.memory_limit,
"shared_filesystem": shared_filesystem,
},
)
else:
data = (
DeviceHostFile,
{
"device_memory_limit": self.device_memory_limit,
"memory_limit": self.memory_limit,
"log_spilling": log_spilling,
},
)
if enable_tcp_over_ucx or enable_infiniband or enable_nvlink:
if protocol is None:
protocol = "ucx"
elif protocol not in ["ucx", "ucxx"]:
raise TypeError(
"Enabling InfiniBand or NVLink requires protocol='ucx' or "
"protocol='ucxx'"
)
self.host = kwargs.get("host", None)
initialize(
create_cuda_context=False,
enable_tcp_over_ucx=enable_tcp_over_ucx,
enable_nvlink=enable_nvlink,
enable_infiniband=enable_infiniband,
enable_rdmacm=enable_rdmacm,
)
if worker_class is not None:
if log_spilling is True:
                raise ValueError(
                    "Cannot enable `log_spilling` when `worker_class` is "
                    "specified. If logging is needed, ensure `worker_class` is a "
                    "subclass of `dask_cuda.local_cuda_cluster.LoggedNanny` or a "
                    "subclass of `dask_cuda.local_cuda_cluster.LoggedWorker`, and "
                    "specify `log_spilling=False`."
                )
if not issubclass(worker_class, Nanny):
worker_class = partial(Nanny, worker_class=worker_class)
self.pre_import = pre_import
super().__init__(
n_workers=0,
threads_per_worker=threads_per_worker,
memory_limit=self.memory_limit,
processes=True,
data=data,
local_directory=local_directory,
protocol=protocol,
worker_class=worker_class,
config={
"distributed.comm.ucx": get_ucx_config(
enable_tcp_over_ucx=enable_tcp_over_ucx,
enable_nvlink=enable_nvlink,
enable_infiniband=enable_infiniband,
enable_rdmacm=enable_rdmacm,
)
},
**kwargs,
)
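        # Preload dask_cuda.initialize on each worker so a CUDA context is
        # created up front and the chosen comm protocol is configured
        # consistently across workers.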
self.new_spec["options"]["preload"] = self.new_spec["options"].get(
"preload", []
) + ["dask_cuda.initialize"]
self.new_spec["options"]["preload_argv"] = self.new_spec["options"].get(
"preload_argv", []
) + ["--create-cuda-context", "--protocol", protocol]
self.cuda_visible_devices = CUDA_VISIBLE_DEVICES
self.scale(n_workers)
self.sync(self._correct_state)
def new_worker_spec(self):
try:
name = min(set(self.cuda_visible_devices) - set(self.worker_spec))
except Exception:
raise ValueError(
"Can not scale beyond visible devices", self.cuda_visible_devices
)
spec = copy.deepcopy(self.new_spec)
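        # Rotate the visible devices so this worker's assigned GPU comes first;
        # the worker then treats device index 0 as its own GPU.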
worker_count = self.cuda_visible_devices.index(name)
visible_devices = cuda_visible_devices(worker_count, self.cuda_visible_devices)
spec["options"].update(
{
"env": {
"CUDA_VISIBLE_DEVICES": visible_devices,
},
"plugins": {
CPUAffinity(
get_cpu_affinity(nvml_device_index(0, visible_devices))
),
RMMSetup(
initial_pool_size=self.rmm_pool_size,
maximum_pool_size=self.rmm_maximum_pool_size,
managed_memory=self.rmm_managed_memory,
async_alloc=self.rmm_async,
release_threshold=self.rmm_release_threshold,
log_directory=self.rmm_log_directory,
track_allocations=self.rmm_track_allocations,
),
PreImport(self.pre_import),
CUDFSetup(self.enable_cudf_spill, self.cudf_spill_stats),
},
}
)
return {name: spec}