Multi-node Multi-GPU

RAFT contains C++ infrastructure for abstracting the communications layer when writing applications that scale across multiple nodes and multiple GPUs. This infrastructure assumes a one-process-per-GPU (OPG) architecture: multiple parallel units (processes, ranks, or workers) may execute code concurrently, but each parallel unit communicates with exactly one GPU and is the only process communicating with that GPU.

The comms layer in RAFT is intended to provide a facade API for barrier-synchronous collective communications, allowing users to write algorithms against a single abstraction layer and deploy them on many different types of systems. Currently, RAFT communications code has been deployed in MPI, Dask, and Spark clusters.

Common Types

#include <raft/core/comms.hpp>

namespace raft::comms

enum class datatype_t

Values:

enumerator CHAR
enumerator UINT8
enumerator INT32
enumerator UINT32
enumerator INT64
enumerator UINT64
enumerator FLOAT32
enumerator FLOAT64

enum class op_t

Values:

enumerator SUM
enumerator PROD
enumerator MIN
enumerator MAX

enum class status_t

The resulting status of distributed stream synchronization

Values:

enumerator SUCCESS
enumerator ERROR
enumerator ABORT

typedef unsigned int request_t

template<typename value_t>
constexpr datatype_t get_type()
template<>
constexpr datatype_t get_type<char>()
template<>
constexpr datatype_t get_type<uint8_t>()
template<>
constexpr datatype_t get_type<int>()
template<>
constexpr datatype_t get_type<uint32_t>()
template<>
constexpr datatype_t get_type<int64_t>()
template<>
constexpr datatype_t get_type<uint64_t>()
template<>
constexpr datatype_t get_type<float>()
template<>
constexpr datatype_t get_type<double>()
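
The get_type() specializations listed above map a C++ element type to a datatype_t tag at compile time. The following is a minimal host-only sketch of that pattern, not RAFT's header: the enum is a local stand-in so the snippet compiles without RAFT, leaving the primary template undefined is an assumption of this sketch, and size_in_bytes() is a hypothetical helper added only to show how a type-erased backend might use the tag.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Local stand-in for raft::comms::datatype_t so the sketch compiles without RAFT.
enum class datatype_t { CHAR, UINT8, INT32, UINT32, INT64, UINT64, FLOAT32, FLOAT64 };

// Primary template: declared only. Assumption of this sketch: an unsupported
// value_t has no definition to call, so misuse is caught at build time.
template <typename value_t>
constexpr datatype_t get_type();

// One explicit specialization per supported element type, as in the listing.
template <> constexpr datatype_t get_type<char>()          { return datatype_t::CHAR; }
template <> constexpr datatype_t get_type<std::uint8_t>()  { return datatype_t::UINT8; }
template <> constexpr datatype_t get_type<int>()           { return datatype_t::INT32; }
template <> constexpr datatype_t get_type<std::uint32_t>() { return datatype_t::UINT32; }
template <> constexpr datatype_t get_type<std::int64_t>()  { return datatype_t::INT64; }
template <> constexpr datatype_t get_type<std::uint64_t>() { return datatype_t::UINT64; }
template <> constexpr datatype_t get_type<float>()         { return datatype_t::FLOAT32; }
template <> constexpr datatype_t get_type<double>()        { return datatype_t::FLOAT64; }

// Hypothetical helper (not part of RAFT): recover the element size from a tag.
constexpr std::size_t size_in_bytes(datatype_t t) {
  switch (t) {
    case datatype_t::CHAR:
    case datatype_t::UINT8: return 1;
    case datatype_t::INT32:
    case datatype_t::UINT32:
    case datatype_t::FLOAT32: return 4;
    case datatype_t::INT64:
    case datatype_t::UINT64:
    case datatype_t::FLOAT64: return 8;
  }
  return 0;  // unreachable for valid tags
}
```

Because every function here is constexpr, the mapping can be checked with static_assert, for example that get_type<float>() yields datatype_t::FLOAT32.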

Comms Interface

class comms_t
#include <raft/core/comms.hpp>

Public Functions

inline virtual ~comms_t()

Virtual Destructor to enable polymorphism

inline int get_size() const

Returns the size of the communicator clique

inline int get_rank() const

Returns the local rank

inline std::unique_ptr<comms_iface> comm_split(int color, int key) const

Splits the current communicator clique into sub-cliques matching the given color and key

Parameters:
  • color – ranks with the same color are placed in the same communicator

  • key – controls rank assignment
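
The effect of color and key can be illustrated with a small host-only simulation. This is a sketch under stated assumptions, not RAFT code: it assumes MPI_Comm_split-style semantics (ranks sharing a color form one sub-clique, new ranks are ordered by key, and ties are broken by the parent rank), and split_args, split_result, and simulate_split are hypothetical names introduced for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <vector>

// The (color, key) pair one parent rank would pass to comm_split.
struct split_args {
  int color;
  int key;
};

// What that rank would observe in its new sub-communicator.
struct split_result {
  int sub_size;
  int sub_rank;
};

// args[r] holds the arguments of parent rank r; the result is indexed the same way.
std::vector<split_result> simulate_split(const std::vector<split_args>& args) {
  std::map<int, std::vector<int>> groups;  // color -> parent ranks, in ascending order
  for (int r = 0; r < static_cast<int>(args.size()); ++r) {
    groups[args[r].color].push_back(r);
  }

  std::vector<split_result> out(args.size());
  for (auto& entry : groups) {
    std::vector<int>& members = entry.second;
    // Order by key; stable_sort keeps the parent-rank order for equal keys (assumed tie-break).
    std::stable_sort(members.begin(), members.end(),
                     [&args](int a, int b) { return args[a].key < args[b].key; });
    const int n = static_cast<int>(members.size());
    for (int i = 0; i < n; ++i) {
      out[members[i]] = split_result{n, i};
    }
  }
  return out;
}
```

For example, with four parent ranks passing colors 0, 1, 0, 1, ranks 0 and 2 end up in one sub-clique and ranks 1 and 3 in another; within each sub-clique the key decides which member becomes rank 0.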

inline void barrier() const

Performs a collective barrier synchronization

inline status_t sync_stream(cudaStream_t stream) const

Some collective communications implementations (e.g. NCCL) use asynchronous collectives that must be explicitly synchronized. Always synchronize with this method rather than cudaStreamSynchronize(): it allows failures to propagate and so avoids potential deadlocks.

Parameters:

stream – the CUDA stream to sync collective operations on
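
Since sync_stream() reports failures through its status_t return value, callers need to inspect that value rather than discard it. The sketch below shows one way to do so. It is illustrative only: the enum is a local stand-in so the snippet compiles without RAFT, and status_name() and require_success() are hypothetical helpers, not part of the RAFT API.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Local stand-in for raft::comms::status_t so the sketch compiles without RAFT.
enum class status_t { SUCCESS, ERROR, ABORT };

// Hypothetical helper: human-readable name for a status value.
inline const char* status_name(status_t s) {
  switch (s) {
    case status_t::SUCCESS: return "SUCCESS";
    case status_t::ERROR:   return "ERROR";
    case status_t::ABORT:   return "ABORT";
  }
  assert(false && "unknown status_t value");
  return "UNKNOWN";
}

// Hypothetical helper: turn any non-SUCCESS status into an exception so that a
// failed synchronization cannot be silently ignored.
// Intended call shape (assuming a comms_t named comm and a cudaStream_t named stream):
//   require_success(comm.sync_stream(stream));
inline void require_success(status_t s) {
  if (s != status_t::SUCCESS) {
    throw std::runtime_error(std::string("sync_stream returned ") + status_name(s));
  }
}
```

Whether to throw, retry, or tear down the communicator on ERROR or ABORT is an application decision; the sketch only demonstrates that the return value is checked.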

template<typename value_t>
inline void isend(const value_t *buf, size_t size, int dest, int tag, request_t *request) const

Performs an asynchronous point-to-point send

Template Parameters:

value_t – the type of data to send

Parameters:
  • buf – pointer to array of data to send

  • size – number of elements in buf

  • dest – destination rank

  • tag – a tag to use for the receiver to filter

  • request – pointer to hold returned request_t object. This will be used in waitall() to synchronize until the message is delivered (or fails).

template<typename value_t>
inline void irecv(value_t *buf, size_t size, int source, int tag, request_t *request) const

Performs an asynchronous point-to-point receive

Template Parameters:

value_t – the type of data to be received

Parameters:
  • buf – pointer to (initialized) array that will hold received data

  • size – number of elements in buf

  • source – source rank

  • tag – a tag to use for message filtering

  • request – pointer to hold returned request_t object. This will be used in waitall() to synchronize until the message is delivered (or fails).

inline void waitall(int count, request_t array_of_requests[]) const

Synchronize on an array of request_t objects returned from isend/irecv

Parameters:
  • count – number of requests to synchronize on

  • array_of_requests – an array of request_t objects returned from isend/irecv

template<typename value_t>
inline void allreduce(const value_t *sendbuff, value_t *recvbuff, size_t count, op_t op, cudaStream_t stream) const

Perform an allreduce collective

Template Parameters:

value_t – datatype of underlying buffers

Parameters:
  • sendbuff – data to reduce

  • recvbuff – buffer to hold the reduced result

  • count – number of elements in sendbuff

  • op – reduction operation to perform

  • stream – CUDA stream to synchronize operation
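
The semantics of allreduce with each op_t value can be sketched on the host. This is a simulation for illustration, not RAFT code: op_t is a local stand-in, apply_op() and simulate_allreduce() are hypothetical names, and the per-rank device buffers are modeled as plain std::vector objects.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Local stand-in for raft::comms::op_t so the sketch compiles without RAFT.
enum class op_t { SUM, PROD, MIN, MAX };

// Combine two elements according to the reduction operation.
template <typename value_t>
value_t apply_op(op_t op, value_t a, value_t b) {
  switch (op) {
    case op_t::SUM:  return a + b;
    case op_t::PROD: return a * b;
    case op_t::MIN:  return std::min(a, b);
    case op_t::MAX:  return std::max(a, b);
  }
  return a;  // unreachable for valid op_t values
}

// sendbuffs[r] models rank r's sendbuff (all of length count).
// The returned vector is what every rank's recvbuff holds afterwards.
template <typename value_t>
std::vector<value_t> simulate_allreduce(const std::vector<std::vector<value_t>>& sendbuffs,
                                        op_t op) {
  assert(!sendbuffs.empty());
  std::vector<value_t> result = sendbuffs[0];
  for (std::size_t r = 1; r < sendbuffs.size(); ++r) {
    assert(sendbuffs[r].size() == result.size());
    for (std::size_t i = 0; i < result.size(); ++i) {
      result[i] = apply_op(op, result[i], sendbuffs[r][i]);
    }
  }
  return result;
}
```

The reduction is element-wise across ranks: element i of the result depends only on element i of each rank's sendbuff.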

template<typename value_t>
inline void bcast(value_t *buff, size_t count, int root, cudaStream_t stream) const

Broadcast data from one rank to the rest

Template Parameters:

value_t – datatype of underlying buffers

Parameters:
  • buff – buffer to send

  • count – number of elements in buff

  • root – the rank initiating the broadcast

  • stream – CUDA stream to synchronize operation

template<typename value_t>
inline void bcast(const value_t *sendbuff, value_t *recvbuff, size_t count, int root, cudaStream_t stream) const

Broadcast data from one rank to the rest

Template Parameters:

value_t – datatype of underlying buffers

Parameters:
  • sendbuff – buffer containing data to broadcast (only used in root)

  • recvbuff – buffer to receive broadcasted data

  • count – number of elements in sendbuff

  • root – the rank initiating the broadcast

  • stream – CUDA stream to synchronize operation

template<typename value_t>
inline void reduce(const value_t *sendbuff, value_t *recvbuff, size_t count, op_t op, int root, cudaStream_t stream) const

Reduce data from many ranks down to a single rank

Template Parameters:

value_t – datatype of underlying buffers

Parameters:
  • sendbuff – buffer containing data to reduce

  • recvbuff – buffer containing reduced data (only needs to be initialized on root)

  • count – number of elements in sendbuff

  • op – reduction operation to perform

  • root – rank to store the results

  • stream – CUDA stream to synchronize operation

template<typename value_t>
inline void allgather(const value_t *sendbuff, value_t *recvbuff, size_t sendcount, cudaStream_t stream) const

Gathers data from each rank onto all ranks

Template Parameters:

value_t – datatype of underlying buffers

Parameters:
  • sendbuff – buffer containing data to gather

  • recvbuff – buffer containing gathered data from all ranks

  • sendcount – number of elements in send buffer

  • stream – CUDA stream to synchronize operation

template<typename value_t>
inline void allgatherv(const value_t *sendbuf, value_t *recvbuf, const size_t *recvcounts, const size_t *displs, cudaStream_t stream) const

Gathers data from all ranks and delivers the combined data to all ranks

Template Parameters:

value_t – datatype of underlying buffers

Parameters:
  • sendbuf – buffer containing data to send

  • recvbuf – buffer containing data to receive

  • recvcounts – pointer to an array (of length num_ranks) containing the number of elements that are to be received from each rank

  • displs – pointer to an array (of length num_ranks) specifying the displacement (relative to recvbuf) at which to place the incoming data from each rank

  • stream – CUDA stream to synchronize operation
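
How recvcounts and displs determine the layout of recvbuf can be shown with a host-only simulation. This is a sketch for illustration, not RAFT code: packed_displs() and simulate_allgatherv() are hypothetical names, device buffers are modeled as std::vector objects, and positions of recvbuf that no rank writes to are zero-filled here purely so the result is deterministic.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Displacements for a gap-free layout: the exclusive prefix sum of recvcounts.
inline std::vector<std::size_t> packed_displs(const std::vector<std::size_t>& recvcounts) {
  std::vector<std::size_t> displs(recvcounts.size(), 0);
  for (std::size_t r = 1; r < recvcounts.size(); ++r) {
    displs[r] = displs[r - 1] + recvcounts[r - 1];
  }
  return displs;
}

// sendbufs[r] models rank r's sendbuf. The returned vector models the recvbuf
// that every rank holds afterwards.
template <typename value_t>
std::vector<value_t> simulate_allgatherv(const std::vector<std::vector<value_t>>& sendbufs,
                                         const std::vector<std::size_t>& recvcounts,
                                         const std::vector<std::size_t>& displs,
                                         std::size_t recvbuf_len) {
  assert(sendbufs.size() == recvcounts.size() && sendbufs.size() == displs.size());
  std::vector<value_t> recvbuf(recvbuf_len, value_t{});
  for (std::size_t r = 0; r < sendbufs.size(); ++r) {
    assert(sendbufs[r].size() == recvcounts[r]);
    assert(displs[r] + recvcounts[r] <= recvbuf_len);
    // Rank r's contribution lands at offset displs[r] of recvbuf.
    std::copy(sendbufs[r].begin(), sendbufs[r].end(),
              recvbuf.begin() + static_cast<std::ptrdiff_t>(displs[r]));
  }
  return recvbuf;
}
```

With recvcounts of 1, 2, and 3, packed_displs() yields displacements 0, 1, and 3, so the contributions sit back to back. Larger displacements leave gaps between the contributions.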

template<typename value_t>
inline void gather(const value_t *sendbuff, value_t *recvbuff, size_t sendcount, int root, cudaStream_t stream) const

Gathers data from each rank onto the root rank

Template Parameters:

value_t – datatype of underlying buffers

Parameters:
  • sendbuff – buffer containing data to gather

  • recvbuff – buffer that receives the gathered data from all ranks (only used on root)

  • sendcount – number of elements in send buffer

  • root – rank to store the results

  • stream – CUDA stream to synchronize operation

template<typename value_t>
inline void gatherv(const value_t *sendbuf, value_t *recvbuf, size_t sendcount, const size_t *recvcounts, const size_t *displs, int root, cudaStream_t stream) const

Gathers variable-sized data from all ranks and delivers the combined data to the root rank

Template Parameters:

value_t – datatype of underlying buffers

Parameters:
  • sendbuf – buffer containing data to send

  • recvbuf – buffer containing data to receive

  • sendcount – number of elements in send buffer

  • recvcounts – pointer to an array (of length num_ranks) containing the number of elements that are to be received from each rank

  • displs – pointer to an array (of length num_ranks) specifying the displacement (relative to recvbuf) at which to place the incoming data from each rank

  • root – rank to store the results

  • stream – CUDA stream to synchronize operation

template<typename value_t>
inline void reducescatter(const value_t *sendbuff, value_t *recvbuff, size_t recvcount, op_t op, cudaStream_t stream) const

Reduces data from all ranks then scatters the result across ranks

Template Parameters:

value_t – datatype of underlying buffers

Parameters:
  • sendbuff – buffer containing data to send (size recvcount * num_ranks)

  • recvbuff – buffer containing received data

  • recvcount – number of items to receive

  • op – reduction operation to perform

  • stream – CUDA stream to synchronize operation
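
The relationship between the send buffer of size recvcount * num_ranks and the per-rank result can be sketched on the host. This simulation is illustrative, not RAFT code: simulate_reducescatter_sum() is a hypothetical name, only the SUM operation is modeled, and it assumes the NCCL-style convention that rank r receives the r-th block of recvcount reduced elements.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// sendbuffs[r] models rank r's sendbuff (length recvcount * num_ranks).
// The result's entry r models rank r's recvbuff (length recvcount).
template <typename value_t>
std::vector<std::vector<value_t>> simulate_reducescatter_sum(
    const std::vector<std::vector<value_t>>& sendbuffs, std::size_t recvcount) {
  const std::size_t num_ranks = sendbuffs.size();
  std::vector<std::vector<value_t>> recvbuffs(num_ranks,
                                              std::vector<value_t>(recvcount, value_t{}));
  for (const auto& sendbuff : sendbuffs) {
    assert(sendbuff.size() == recvcount * num_ranks);
    for (std::size_t r = 0; r < num_ranks; ++r) {
      for (std::size_t i = 0; i < recvcount; ++i) {
        // Block r of every sendbuff is summed into rank r's recvbuff (assumed convention).
        recvbuffs[r][i] += sendbuff[r * recvcount + i];
      }
    }
  }
  return recvbuffs;
}
```

Conceptually this is an element-wise reduction over all ranks followed by handing each rank one contiguous slice of the reduced array.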

template<typename value_t>
inline void device_send(const value_t *buf, size_t size, int dest, cudaStream_t stream) const

Performs a point-to-point send

If a thread is sending and receiving at the same time, use device_sendrecv to avoid deadlock.

Template Parameters:

value_t – the type of data to send

Parameters:
  • buf – pointer to array of data to send

  • size – number of elements in buf

  • dest – destination rank

  • stream – CUDA stream to synchronize operation

template<typename value_t>
inline void device_recv(value_t *buf, size_t size, int source, cudaStream_t stream) const

Performs a point-to-point receive

If a thread is sending and receiving at the same time, use device_sendrecv to avoid deadlock.

Template Parameters:

value_t – the type of data to be received

Parameters:
  • buf – pointer to (initialized) array that will hold received data

  • size – number of elements in buf

  • source – source rank

  • stream – CUDA stream to synchronize operation

template<typename value_t>
inline void device_sendrecv(const value_t *sendbuf, size_t sendsize, int dest, value_t *recvbuf, size_t recvsize, int source, cudaStream_t stream) const

Performs a point-to-point send/receive

Template Parameters:

value_t – the type of data to be sent & received

Parameters:
  • sendbuf – pointer to array of data to send

  • sendsize – number of elements in sendbuf

  • dest – destination rank

  • recvbuf – pointer to (initialized) array that will hold received data

  • recvsize – number of elements in recvbuf

  • source – source rank

  • stream – CUDA stream to synchronize operation
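
A ring shift, where every rank sends to its right neighbor while receiving from its left neighbor, is a typical case in which each rank sends and receives at the same time. The host-only sketch below computes the dest and source arguments for such an exchange and simulates one step. It is illustrative only: ring_peers, ring_neighbors(), and simulate_ring_shift() are hypothetical names, and each rank's buffer is modeled as a single value.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Peers of one rank in a ring exchange.
struct ring_peers {
  int dest;    // rank this rank sends to
  int source;  // rank this rank receives from
};

// Right neighbor is the destination, left neighbor is the source (with wrap-around).
inline ring_peers ring_neighbors(int rank, int size) {
  assert(size > 0 && rank >= 0 && rank < size);
  return ring_peers{(rank + 1) % size, (rank + size - 1) % size};
}

// sendvals[r] models the data rank r sends; the result's entry r is what rank r receives.
// On a real communicator each rank would issue one call of the documented shape
//   comm.device_sendrecv(sendbuf, n, p.dest, recvbuf, n, p.source, stream);
template <typename value_t>
std::vector<value_t> simulate_ring_shift(const std::vector<value_t>& sendvals) {
  const int size = static_cast<int>(sendvals.size());
  std::vector<value_t> recvvals(sendvals.size());
  for (int rank = 0; rank < size; ++rank) {
    const ring_peers p = ring_neighbors(rank, size);
    // What this rank sends ends up in the receive buffer of its destination.
    recvvals[static_cast<std::size_t>(p.dest)] = sendvals[static_cast<std::size_t>(rank)];
  }
  return recvvals;
}
```

Issuing the send and the receive as one combined operation means no rank waits on a send that its peer has not yet posted a receive for.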

template<typename value_t>
inline void device_multicast_sendrecv(const value_t *sendbuf, std::vector<size_t> const &sendsizes, std::vector<size_t> const &sendoffsets, std::vector<int> const &dests, value_t *recvbuf, std::vector<size_t> const &recvsizes, std::vector<size_t> const &recvoffsets, std::vector<int> const &sources, cudaStream_t stream) const

Performs a multicast send/receive

Template Parameters:

value_t – the type of data to be sent & received

Parameters:
  • sendbuf – pointer to array of data to send

  • sendsizes – numbers of elements to send

  • sendoffsets – offsets in a number of elements from sendbuf

  • dests – destination ranks

  • recvbuf – pointer to (initialized) array that will hold received data

  • recvsizes – numbers of elements to recv

  • recvoffsets – offsets in a number of elements from recvbuf

  • sources – source ranks

  • stream – CUDA stream to synchronize operation

inline void group_start() const

Multiple collectives & device send/receive operations placed between group_start() and group_end() are merged into one big operation. Internally, this function is a wrapper for ncclGroupStart().

inline void group_end() const

Multiple collectives & device send/receive operations placed between group_start() and group_end() are merged into one big operation. Internally, this function is a wrapper for ncclGroupEnd().
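
Because every group_start() must be matched by a group_end(), one option is to pair the two calls with a scope guard. The following is a sketch of that idea, not something RAFT provides: group_guard is a hypothetical helper templated on any type with const group_start() and group_end() members, and mock_comms is a host-only stand-in used so the snippet runs without RAFT or NCCL.

```cpp
#include <cassert>

// Hypothetical RAII helper: opens a group on construction, closes it on scope exit.
template <typename comms_like>
class group_guard {
 public:
  explicit group_guard(const comms_like& comm) : comm_(comm) { comm_.group_start(); }
  ~group_guard() { comm_.group_end(); }

  // Non-copyable so the group is closed exactly once.
  group_guard(const group_guard&) = delete;
  group_guard& operator=(const group_guard&) = delete;

 private:
  const comms_like& comm_;
};

// Host-only stand-in that counts calls instead of talking to NCCL.
struct mock_comms {
  mutable int starts      = 0;
  mutable int ends        = 0;
  mutable int open_groups = 0;

  void group_start() const {
    ++starts;
    ++open_groups;
  }
  void group_end() const {
    assert(open_groups > 0);  // group_end without a matching group_start
    ++ends;
    --open_groups;
  }
};
```

One trade-off of this design is that group_end() runs in a destructor, so any error it reports cannot be propagated as an exception from there; code that needs to react to such errors may prefer explicit calls.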

MPI Comms

inline void initialize_mpi_comms(resources *handle, MPI_Comm comm)

Given a properly initialized MPI_Comm, construct an instance of RAFT’s MPI Communicator and inject it into the given RAFT handle instance

#include <raft/comms/mpi_comms.hpp>
#include <raft/core/device_mdarray.hpp>
#include <raft/core/resource/comms.hpp>
#include <raft/core/resource/cuda_stream.hpp>

MPI_Comm mpi_comm;  // must refer to an initialized MPI communicator
raft::resources handle;

raft::comms::initialize_mpi_comms(&handle, mpi_comm);
...
const auto& comm = raft::resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
// Each rank contributes the single element stored at its own rank offset.
comm.allgather(gather_data.data_handle() + comm.get_rank(),
               gather_data.data_handle(),
               1,
               raft::resource::get_cuda_stream(handle));

comm.sync_stream(raft::resource::get_cuda_stream(handle));

Parameters:
  • handle – raft handle for managing expensive resources

  • comm – an initialized MPI communicator

NCCL+UCX Comms

void build_comms_nccl_only(resources *handle, ncclComm_t nccl_comm, int num_ranks, int rank)

Factory function to construct a RAFT NCCL communicator and inject it into a RAFT handle.

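#include <raft/comms/std_comms.hpp>
#include <raft/core/device_mdarray.hpp>
#include <raft/core/resource/comms.hpp>
#include <raft/core/resource/cuda_stream.hpp>

ncclComm_t nccl_comm;  // must refer to an initialized NCCL communicator
raft::resources handle;

// 5 ranks in the clique; this process is rank 0.
raft::comms::build_comms_nccl_only(&handle, nccl_comm, 5, 0);
...
const auto& comm = raft::resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
// Each rank contributes the single element stored at its own rank offset.
comm.allgather(gather_data.data_handle() + comm.get_rank(),
               gather_data.data_handle(),
               1,
               raft::resource::get_cuda_stream(handle));

comm.sync_stream(raft::resource::get_cuda_stream(handle));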

Parameters:
  • handle – raft::resources for injecting the comms

  • nccl_comm – initialized NCCL communicator to use for collectives

  • num_ranks – number of ranks in communicator clique

  • rank – rank of local instance

void build_comms_nccl_ucx(resources *handle, ncclComm_t nccl_comm, void *ucp_worker, void *eps, int num_ranks, int rank)

Factory function to construct a RAFT NCCL+UCX communicator and inject it into a RAFT handle.

#include <raft/comms/std_comms.hpp>
#include <raft/core/device_mdarray.hpp>
#include <raft/core/resource/comms.hpp>
#include <raft/core/resource/cuda_stream.hpp>

ncclComm_t nccl_comm;  // must refer to an initialized NCCL communicator
raft::resources handle;
ucp_worker_h ucp_worker;
ucp_ep_h *ucp_endpoints_arr;

// 5 ranks in the clique; this process is rank 0.
raft::comms::build_comms_nccl_ucx(&handle, nccl_comm, &ucp_worker, ucp_endpoints_arr, 5, 0);
...
const auto& comm = raft::resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
// Each rank contributes the single element stored at its own rank offset.
comm.allgather(gather_data.data_handle() + comm.get_rank(),
               gather_data.data_handle(),
               1,
               raft::resource::get_cuda_stream(handle));

comm.sync_stream(raft::resource::get_cuda_stream(handle));

Parameters:
  • handle – raft::resources for injecting the comms

  • nccl_comm – initialized NCCL communicator to use for collectives

  • ucp_worker – UCP worker of the local process. Note: This is purposefully left as void* so that ucp_worker_h doesn’t need to be exposed through the Cython layer.

  • eps – array of ucp_ep_h instances. Note: This is purposefully left as void* so that ucp_ep_h doesn’t need to be exposed through the Cython layer.

  • num_ranks – number of ranks in communicator clique

  • rank – rank of local instance