UCX Integration

Communication can be a major bottleneck in distributed systems. Dask-CUDA addresses this by supporting integration with UCX, an optimized communication framework that provides high-performance networking and supports a variety of transport methods, including NVLink and InfiniBand for systems with specialized hardware, and TCP for systems without it. This integration is enabled through UCX-Py, an interface that provides Python bindings for UCX.

Hardware requirements

To use UCX with NVLink or InfiniBand, relevant GPUs must be connected with NVLink bridges or NVIDIA Mellanox InfiniBand Adapters, respectively. NVIDIA provides comparison charts for both NVLink bridges and InfiniBand adapters.

Software requirements

UCX integration requires an environment with both UCX and UCX-Py installed; see UCX-Py Installation for detailed instructions on this process.

When using UCX, each NVLink and InfiniBand memory buffer must be mapped between each unique pair of processes it is transferred across; this can be quite costly, potentially in the range of hundreds of milliseconds per mapping. For this reason, it is strongly recommended to use the RAPIDS Memory Manager (RMM) to allocate a memory pool that requires only a single mapping operation, which all subsequent transfers can then reuse. Using a memory pool also prevents the Dask scheduler from deserializing CUDA data, which would otherwise cause a crash.
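
As a concrete illustration, a pool can be requested directly when creating a Dask-CUDA cluster. The sketch below uses LocalCUDACluster's rmm_pool_size parameter; the "30GB" value is only illustrative and should be sized to your GPUs.

from dask_cuda import LocalCUDACluster

# Allocate a per-worker RMM pool up front so that NVLink/InfiniBand buffers
# are mapped once and reused for the lifetime of the process.
cluster = LocalCUDACluster(
    protocol="ucx",
    rmm_pool_size="30GB",  # illustrative; size to your GPU memory
)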

Warning

Dask-CUDA must create worker CUDA contexts during cluster initialization, and properly ordering that task is critical for correct UCX configuration. If a CUDA context already exists for this process at the time of cluster initialization, unexpected behavior can occur. To avoid this, it is advised to initialize any UCX-enabled clusters before doing operations that would result in a CUDA context being created. Depending on the library, even an import can force CUDA context creation.

For some RAPIDS libraries (e.g. cuDF), setting RAPIDS_NO_INITIALIZE=1 at runtime will delay or disable their CUDA context creation, allowing for improved compatibility with UCX-enabled clusters and preventing runtime warnings.
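
As a sketch, the variable can be exported in the shell before launching the script, or set from Python before the relevant library is imported:

import os

# Must be set before importing cuDF so that no CUDA context is created eagerly
os.environ["RAPIDS_NO_INITIALIZE"] = "1"

import cudf  # noqa: E402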

Configuration

Automatic

Beginning with Dask-CUDA 22.02, and assuming UCX >= 1.11.1, specifying UCX transports is optional.

A local cluster can be started with LocalCUDACluster(protocol="ucx"), which implies automatic UCX transport selection (UCX_TLS=all). Starting a cluster separately, with scheduler, workers, and client as different processes, is also possible: if the Dask scheduler is created with dask scheduler --protocol="ucx", connecting a dask cuda worker to it will likewise imply automatic UCX transport selection. This requires the Dask scheduler and client to be started with DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True. See Enabling UCX communication for detailed examples of UCX usage with automatic configuration.
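
A minimal sketch of the local case, assuming only that Dask-CUDA, UCX, and UCX-Py are installed:

from dask_cuda import LocalCUDACluster
from distributed import Client

if __name__ == "__main__":
    # protocol="ucx" selects UCX transports automatically (UCX_TLS=all)
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)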

Configuring transports manually is still possible, please refer to the subsection below.

Manual

For manual configuration, in addition to having UCX and UCX-Py installed on your system, several options must be specified within your Dask configuration to enable the integration (see the sketch after this list). Typically, these will affect UCX_TLS and UCX_SOCKADDR_TLS_PRIORITY, the environment variables UCX uses to decide which transport methods to use and which to prioritize, respectively. However, some options affect related libraries, such as RMM:

  • distributed.comm.ucx.cuda_copy: true (required).

    Adds cuda_copy to UCX_TLS, enabling CUDA transfers over UCX.

  • distributed.comm.ucx.tcp: true (required).

    Adds tcp to UCX_TLS, enabling TCP transfers over UCX; this is required for very small transfers, which are inefficient over NVLink and InfiniBand.

  • distributed.comm.ucx.nvlink: true (required for NVLink).

    Adds cuda_ipc to UCX_TLS, enabling NVLink transfers over UCX; affects intra-node communication only.

  • distributed.comm.ucx.infiniband: true (required for InfiniBand).

    Adds rc to UCX_TLS, enabling InfiniBand transfers over UCX.

    For optimal performance with UCX 1.11 and above, it is recommended to also set the environment variables UCX_MAX_RNDV_RAILS=1 and UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda; see the UCX documentation for more details on these variables.

  • distributed.comm.ucx.rdmacm: true (recommended for InfiniBand).

    Replaces sockcm with rdmacm in UCX_SOCKADDR_TLS_PRIORITY, enabling remote direct memory access (RDMA) for InfiniBand transfers. This is recommended by UCX for use with InfiniBand, and will not work if InfiniBand is disabled.

  • distributed.rmm.pool-size: <str|int> (recommended).

    Allocates an RMM pool of the specified size for the process; size can be provided with an integer number of bytes or in human readable format, e.g. "4GB". It is recommended to set the pool size to at least the minimum amount of memory used by the process; if possible, one can map all GPU memory to a single pool, to be utilized for the lifetime of the process.
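
A minimal sketch of setting these options from Python via the Dask configuration, assuming NVLink, InfiniBand, and RDMACM are all desired; the pool size is illustrative. Equivalently, the options can be set through DASK_* environment variables or a Dask configuration file.

import dask

# Manually enable UCX transports through the Dask configuration; enable only
# the transports your hardware actually supports.
dask.config.set(
    {
        "distributed.comm.ucx.cuda_copy": True,   # required
        "distributed.comm.ucx.tcp": True,         # required
        "distributed.comm.ucx.nvlink": True,      # required for NVLink
        "distributed.comm.ucx.infiniband": True,  # required for InfiniBand
        "distributed.comm.ucx.rdmacm": True,      # recommended for InfiniBand
        "distributed.rmm.pool-size": "4GB",       # recommended; illustrative size
    }
)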

Note

These options can be used with mainline Dask.distributed. However, some features are exclusive to Dask-CUDA, such as the automatic detection of InfiniBand interfaces. See Dask-CUDA – Motivation for more details on the benefits of using Dask-CUDA.

Usage

See Enabling UCX communication for examples of UCX usage with different supported transports.

Running in a fork-starved environment

Many high-performance networking stacks do not support the user application calling fork() after the network substrate is initialized. Symptoms include jobs randomly hanging or crashing, especially when using a large number of workers. To mitigate this when using Dask-CUDA's UCX integration, processes launched via multiprocessing should be started with the "forkserver" method. When launching workers with dask cuda worker, this can be achieved by passing --multiprocessing-method forkserver as an argument. In user code, the method can be controlled with the distributed.worker.multiprocessing-method configuration key in Dask. In addition, one must take care to manually ensure that the forkserver is running before launching any jobs. A run script should therefore do something like the following:

import dask

if __name__ == "__main__":
    # Start the forkserver before any other work, so that later process
    # launches fork from the forkserver rather than the (UCX-initialized)
    # main process.
    import multiprocessing.forkserver as f
    f.ensure_running()
    # Tell Dask to launch worker processes via the forkserver
    with dask.config.set(
        {"distributed.worker.multiprocessing-method": "forkserver"}
    ):
        run_analysis(...)

Note

In addition to this, at present one must also set PTXCOMPILER_CHECK_NUMBA_CODEGEN_PATCH_NEEDED=0 in the environment to avoid a subprocess call from ptxcompiler.

Note

To confirm that no bad fork calls are occurring, start jobs with UCX_IB_FORK_INIT=n. If the application calls fork(), UCX will produce the warning: UCX  WARN  IB: ibv_fork_init() was disabled or failed, yet a fork() has been issued.