UCX Integration
===============
Communication can be a major bottleneck in distributed systems.
Dask-CUDA addresses this by supporting integration with `UCX `_, an optimized communication framework that provides high-performance networking and supports a variety of transport methods, including `NVLink `_ and `InfiniBand `_ for systems with specialized hardware, and TCP for systems without it.
This integration is enabled through `UCX-Py `_, an interface that provides Python bindings for UCX.
Hardware requirements
---------------------
To use UCX with NVLink or InfiniBand, relevant GPUs must be connected with NVLink bridges or NVIDIA Mellanox InfiniBand Adapters, respectively.
NVIDIA provides comparison charts for both `NVLink bridges `_ and `InfiniBand adapters `_.
Software requirements
---------------------
UCX integration requires an environment with both UCX and UCX-Py installed; see `UCX-Py Installation `_ for detailed instructions on this process.
When using UCX, each NVLink or InfiniBand memory buffer must be mapped between every unique pair of processes it is transferred across; this can be quite costly, potentially in the range of hundreds of milliseconds per mapping.
For this reason, it is strongly recommended to use `RAPIDS Memory Manager (RMM) `_ to allocate a memory pool, which requires only a single mapping operation that all subsequent transfers can rely upon.
A memory pool also prevents the Dask scheduler from deserializing CUDA data, which would otherwise cause a crash.
.. warning::

   Dask-CUDA must create worker CUDA contexts during cluster initialization, and properly ordering that task is critical for correct UCX configuration.
   If a CUDA context already exists for this process at the time of cluster initialization, unexpected behavior can occur.
   To avoid this, it is advised to initialize any UCX-enabled clusters before doing operations that would result in a CUDA context being created.
   Depending on the library, even an import can force CUDA context creation.
   For some RAPIDS libraries (e.g. cuDF), setting ``RAPIDS_NO_INITIALIZE=1`` at runtime will delay or disable their CUDA context creation, allowing for improved compatibility with UCX-enabled clusters and preventing runtime warnings.
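As a minimal sketch of the environment-variable approach above, the variable can be set from Python, provided this happens before any RAPIDS library is imported (the commented import stands in for cuDF or a similar library):

.. code-block:: python

   import os

   # Must be set before cuDF (or a similar RAPIDS library) is imported;
   # the import itself may otherwise create a CUDA context.
   os.environ["RAPIDS_NO_INITIALIZE"] = "1"

   # import cudf  # now safe: CUDA context creation is delayed

Alternatively, the variable can be exported in the shell before starting the Python process, which avoids any ordering concerns within the script itself.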
Configuration
-------------
Automatic
~~~~~~~~~
Beginning with Dask-CUDA 22.02 and assuming UCX >= 1.11.1, specifying UCX transports is now optional.
A local cluster can now be started with ``LocalCUDACluster(protocol="ucx")``, implying automatic UCX transport selection (``UCX_TLS=all``). Starting a cluster separately -- scheduler, workers, and client as different processes -- is also possible: create the Dask scheduler with ``dask scheduler --protocol="ucx"``, and connecting a ``dask cuda worker`` to that scheduler will likewise imply automatic UCX transport selection; this requires the Dask scheduler and client to be started with ``DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True``. See `Enabling UCX communication `_ for more detailed examples of UCX usage with automatic configuration.
Configuring transports manually is still possible, please refer to the subsection below.
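The separate startup described above can be sketched as shell commands; the scheduler address and port here are placeholders, and the environment variable must be visible to both the scheduler and the client processes:

.. code-block:: shell

   # Scheduler: UCX protocol with automatic transport selection
   DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True \
       dask scheduler --protocol="ucx"

   # Workers: connect to the scheduler over UCX
   dask cuda worker ucx://<scheduler-address>:8786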
Manual
~~~~~~
In addition to installing UCX and UCX-Py on your system, manual configuration requires several options to be specified within your Dask configuration to enable the integration.
Typically, these will affect ``UCX_TLS`` and ``UCX_SOCKADDR_TLS_PRIORITY``, environment variables used by UCX to decide what transport methods to use and which to prioritize, respectively.
However, some will affect related libraries, such as RMM:
- ``distributed.comm.ucx.cuda_copy: true`` -- **required.**

  Adds ``cuda_copy`` to ``UCX_TLS``, enabling CUDA transfers over UCX.

- ``distributed.comm.ucx.tcp: true`` -- **required.**

  Adds ``tcp`` to ``UCX_TLS``, enabling TCP transfers over UCX; this is required for very small transfers, which are inefficient over NVLink and InfiniBand.

- ``distributed.comm.ucx.nvlink: true`` -- **required for NVLink.**

  Adds ``cuda_ipc`` to ``UCX_TLS``, enabling NVLink transfers over UCX; affects intra-node communication only.

- ``distributed.comm.ucx.infiniband: true`` -- **required for InfiniBand.**

  Adds ``rc`` to ``UCX_TLS``, enabling InfiniBand transfers over UCX.

  For optimal performance with UCX 1.11 and above, it is recommended to also set the environment variables ``UCX_MAX_RNDV_RAILS=1`` and ``UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda``; see documentation `here `_ and `here `_ for more details on those variables.

- ``distributed.comm.ucx.rdmacm: true`` -- **recommended for InfiniBand.**

  Replaces ``sockcm`` with ``rdmacm`` in ``UCX_SOCKADDR_TLS_PRIORITY``, enabling remote direct memory access (RDMA) for InfiniBand transfers. This is recommended by UCX for use with InfiniBand, and will not work if InfiniBand is disabled.

- ``distributed.rmm.pool-size: `` -- **recommended.**

  Allocates an RMM pool of the specified size for the process; the size can be provided as an integer number of bytes or in human-readable format, e.g. ``"4GB"``. It is recommended to set the pool size to at least the minimum amount of memory used by the process; if possible, one can map all GPU memory to a single pool, to be utilized for the lifetime of the process.
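Taken together, the options above can be collected in a Dask configuration file (for example under ``~/.config/dask/``). The following is a sketch for a system with both NVLink and InfiniBand; the pool size shown is an arbitrary example and should be adjusted to your GPUs:

.. code-block:: yaml

   distributed:
     comm:
       ucx:
         cuda_copy: true   # required
         tcp: true         # required
         nvlink: true      # requires NVLink hardware
         infiniband: true  # requires InfiniBand hardware
         rdmacm: true      # recommended with InfiniBand
     rmm:
       pool-size: 24GB     # illustrative value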
.. note::

   These options can be used with mainline Dask.distributed.
   However, some features are exclusive to Dask-CUDA, such as the automatic detection of InfiniBand interfaces.
   See `Dask-CUDA -- Motivation `_ for more details on the benefits of using Dask-CUDA.
Usage
-----
See `Enabling UCX communication `_ for examples of UCX usage with different supported transports.
Running in a fork-starved environment
-------------------------------------
Many high-performance networking stacks do not support the user
application calling ``fork()`` after the network substrate is
initialized. Symptoms include jobs randomly hanging or crashing,
especially when using a large number of workers. To mitigate this
when using Dask-CUDA's UCX integration, processes launched via
multiprocessing should use the `"forkserver"
`_
start method. When launching workers using `dask cuda worker `_, this can be
achieved by passing ``--multiprocessing-method forkserver`` as an
argument. In user code, the method can be controlled with the
``distributed.worker.multiprocessing-method`` configuration key in
``dask``. In addition, one must take care to manually ensure that the
forkserver is running before launching any jobs. A run script should
therefore do something like the following:
.. code-block:: python

   import dask

   if __name__ == "__main__":
       import multiprocessing.forkserver as f

       f.ensure_running()

       with dask.config.set(
           {"distributed.worker.multiprocessing-method": "forkserver"}
       ):
           run_analysis(...)
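When launching workers from the command line instead, the same setting is passed as the flag mentioned above; the scheduler address here is a placeholder:

.. code-block:: shell

   dask cuda worker ucx://<scheduler-address>:8786 \
       --multiprocessing-method forkserver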
.. note::

   In addition to this, at present one must also set
   ``PTXCOMPILER_CHECK_NUMBA_CODEGEN_PATCH_NEEDED=0`` in the
   environment to avoid a subprocess call from `ptxcompiler `_.
.. note::

   To confirm that no bad fork calls are occurring, start jobs with
   ``UCX_IB_FORK_INIT=n``. UCX will produce the warning ``UCX WARN IB:
   ibv_fork_init() was disabled or failed, yet a fork() has been
   issued.`` if the application calls ``fork()``.