Attention
The vector search and clustering algorithms in RAFT are being migrated to a new library dedicated to vector search called cuVS. We will continue to support the vector search algorithms in RAFT during this move, but will no longer update them after the RAPIDS 24.06 (June) release. We plan to complete the migration by the RAPIDS 24.10 (October) release, and the algorithms will be removed from RAFT altogether in the 24.12 (December) release.
Multi-node Multi-GPU#
RAFT contains C++ infrastructure for abstracting the communications layer when writing applications that scale on multiple nodes and across multiple GPUs. This infrastructure assumes OPG (one-process-per-GPU) architectures, where multiple physical parallel units (processes, ranks, or workers) might be executing code concurrently, but each parallel unit communicates with only a single GPU and is the only process communicating with that GPU.
The comms layer in RAFT is intended to provide a facade API for barrier synchronous collective communications, allowing users to write algorithms using a single abstraction layer and deploy in many different types of systems. Currently, RAFT communications code has been deployed in MPI, Dask, and Spark clusters.
Common Types#
#include <raft/core/comms.hpp>
namespace raft::comms
-
enum class datatype_t#
Values:
-
enumerator CHAR#
-
enumerator UINT8#
-
enumerator INT32#
-
enumerator UINT32#
-
enumerator INT64#
-
enumerator UINT64#
-
enumerator FLOAT32#
-
enumerator FLOAT64#
-
enum class status_t#
The resulting status of distributed stream synchronization
Values:
-
enumerator SUCCESS#
-
enumerator ERROR#
-
enumerator ABORT#
-
typedef unsigned int request_t#
-
template<typename value_t>
datatype_t get_type()#
-
template<>
datatype_t get_type<char>()#
-
template<>
datatype_t get_type<uint8_t>()#
-
template<>
datatype_t get_type<int>()#
-
template<>
datatype_t get_type<uint32_t>()#
-
template<>
datatype_t get_type<int64_t>()#
-
template<>
datatype_t get_type<uint64_t>()#
-
template<>
datatype_t get_type<float>()#
-
template<>
datatype_t get_type<double>()#
Comms Interface#
-
class comms_t#
- #include <comms.hpp>
Public Functions
-
inline virtual ~comms_t()#
Virtual Destructor to enable polymorphism
-
inline int get_size() const#
Returns the size of the communicator clique
-
inline int get_rank() const#
Returns the local rank
-
inline std::unique_ptr<comms_iface> comm_split(int color, int key) const#
Splits the current communicator clique into sub-cliques matching the given color and key
- Parameters:
color – ranks w/ the same color are placed in the same communicator
key – controls rank assignment
-
inline void barrier() const#
Performs a collective barrier synchronization
-
inline status_t sync_stream(cudaStream_t stream) const#
Some collective communications implementations (e.g. NCCL) might use asynchronous collectives that are explicitly synchronized. It’s important to always synchronize using this method, rather than cudaStreamSynchronize(), to allow failures to propagate and to prevent the potential for deadlocks.
- Parameters:
stream – the cuda stream to sync collective operations on
-
template<typename value_t>
inline void isend(const value_t *buf, size_t size, int dest, int tag, request_t *request) const#
Performs an asynchronous point-to-point send
- Template Parameters:
value_t – the type of data to send
- Parameters:
buf – pointer to array of data to send
size – number of elements in buf
dest – destination rank
tag – a tag to use for the receiver to filter
request – pointer to hold returned request_t object. This will be used in waitall() to synchronize until the message is delivered (or fails).
-
template<typename value_t>
inline void irecv(value_t *buf, size_t size, int source, int tag, request_t *request) const#
Performs an asynchronous point-to-point receive
- Template Parameters:
value_t – the type of data to be received
- Parameters:
buf – pointer to (initialized) array that will hold received data
size – number of elements in buf
source – source rank
tag – a tag to use for message filtering
request – pointer to hold returned request_t object. This will be used in waitall() to synchronize until the message is delivered (or fails).
-
inline void waitall(int count, request_t array_of_requests[]) const#
Synchronize on an array of request_t objects returned from isend/irecv
- Parameters:
count – number of requests to synchronize on
array_of_requests – an array of request_t objects returned from isend/irecv
-
template<typename value_t>
inline void allreduce(const value_t *sendbuff, value_t *recvbuff, size_t count, op_t op, cudaStream_t stream) const#
Perform an allreduce collective
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – data to reduce
recvbuff – buffer to hold the reduced result
count – number of elements in sendbuff
op – reduction operation to perform
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void bcast(value_t *buff, size_t count, int root, cudaStream_t stream) const#
Broadcast data from one rank to the rest
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
buff – buffer to send
count – number of elements in buff
root – the rank initiating the broadcast
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void bcast(const value_t *sendbuff, value_t *recvbuff, size_t count, int root, cudaStream_t stream) const#
Broadcast data from one rank to the rest
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to broadcast (only used in root)
recvbuff – buffer to receive broadcasted data
count – number of elements in sendbuff
root – the rank initiating the broadcast
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void reduce(const value_t *sendbuff, value_t *recvbuff, size_t count, op_t op, int root, cudaStream_t stream) const#
Reduce data from many ranks down to a single rank
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to reduce
recvbuff – buffer containing reduced data (only needs to be initialized on root)
count – number of elements in sendbuff
op – reduction operation to perform
root – rank to store the results
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void allgather(const value_t *sendbuff, value_t *recvbuff, size_t sendcount, cudaStream_t stream) const#
Gathers data from each rank onto all ranks
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to gather
recvbuff – buffer containing gathered data from all ranks
sendcount – number of elements in send buffer
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void allgatherv(const value_t *sendbuf, value_t *recvbuf, const size_t *recvcounts, const size_t *displs, cudaStream_t stream) const#
Gathers data from all ranks and delivers the combined data to all ranks
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuf – buffer containing data to send
recvbuf – buffer containing data to receive
recvcounts – pointer to an array (of length num_ranks) containing the number of elements that are to be received from each rank
displs – pointer to an array (of length num_ranks) to specify the displacement (relative to recvbuf) at which to place the incoming data from each rank
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void gather(const value_t *sendbuff, value_t *recvbuff, size_t sendcount, int root, cudaStream_t stream) const#
Gathers data from each rank onto the root rank
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to gather
recvbuff – buffer containing gathered data from all ranks
sendcount – number of elements in send buffer
root – rank to store the results
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void gatherv(const value_t *sendbuf, value_t *recvbuf, size_t sendcount, const size_t *recvcounts, const size_t *displs, int root, cudaStream_t stream) const#
Gathers data from all ranks and delivers the combined data to the root rank
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuf – buffer containing data to send
recvbuf – buffer containing data to receive
sendcount – number of elements in send buffer
recvcounts – pointer to an array (of length num_ranks) containing the number of elements that are to be received from each rank
displs – pointer to an array (of length num_ranks) to specify the displacement (relative to recvbuf) at which to place the incoming data from each rank
root – rank to store the results
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void reducescatter(const value_t *sendbuff, value_t *recvbuff, size_t recvcount, op_t op, cudaStream_t stream) const#
Reduces data from all ranks then scatters the result across ranks
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to send (size recvcount * num_ranks)
recvbuff – buffer containing received data
recvcount – number of items to receive
op – reduction operation to perform
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void device_send(const value_t *buf, size_t size, int dest, cudaStream_t stream) const#
Performs a point-to-point send.
If a thread is sending & receiving at the same time, use device_sendrecv to avoid deadlock.
- Template Parameters:
value_t – the type of data to send
- Parameters:
buf – pointer to array of data to send
size – number of elements in buf
dest – destination rank
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void device_recv(value_t *buf, size_t size, int source, cudaStream_t stream) const#
Performs a point-to-point receive.
If a thread is sending & receiving at the same time, use device_sendrecv to avoid deadlock.
- Template Parameters:
value_t – the type of data to be received
- Parameters:
buf – pointer to (initialized) array that will hold received data
size – number of elements in buf
source – source rank
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void device_sendrecv(const value_t *sendbuf, size_t sendsize, int dest, value_t *recvbuf, size_t recvsize, int source, cudaStream_t stream) const#
Performs a point-to-point send/receive
- Template Parameters:
value_t – the type of data to be sent & received
- Parameters:
sendbuf – pointer to array of data to send
sendsize – number of elements in sendbuf
dest – destination rank
recvbuf – pointer to (initialized) array that will hold received data
recvsize – number of elements in recvbuf
source – source rank
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void device_multicast_sendrecv(const value_t *sendbuf, std::vector<size_t> const &sendsizes, std::vector<size_t> const &sendoffsets, std::vector<int> const &dests, value_t *recvbuf, std::vector<size_t> const &recvsizes, std::vector<size_t> const &recvoffsets, std::vector<int> const &sources, cudaStream_t stream) const#
Performs a multicast send/receive
- Template Parameters:
value_t – the type of data to be sent & received
- Parameters:
sendbuf – pointer to array of data to send
sendsizes – numbers of elements to send
sendoffsets – offsets in a number of elements from sendbuf
dests – destination ranks
recvbuf – pointer to (initialized) array that will hold received data
recvsizes – numbers of elements to recv
recvoffsets – offsets in a number of elements from recvbuf
sources – source ranks
stream – CUDA stream to synchronize operation
-
inline void group_start() const#
Multiple collectives & device send/receive operations placed between group_start() and group_end() are merged into one big operation. Internally, this function is a wrapper for ncclGroupStart().
-
inline void group_end() const#
Multiple collectives & device send/receive operations placed between group_start() and group_end() are merged into one big operation. Internally, this function is a wrapper for ncclGroupEnd().
MPI Comms#
-
inline void initialize_mpi_comms(resources *handle, MPI_Comm comm)#
Given a properly initialized MPI_Comm, construct an instance of RAFT’s MPI Communicator and inject it into the given RAFT handle instance
#include <raft/comms/mpi_comms.hpp>
#include <raft/core/device_mdarray.hpp>

MPI_Comm mpi_comm;
raft::resources handle;

initialize_mpi_comms(&handle, mpi_comm);
...
const auto& comm = resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
comm.allgather((gather_data.data_handle())[comm.get_rank()],
               gather_data.data_handle(),
               1,
               resource::get_cuda_stream(handle));

comm.sync_stream(resource::get_cuda_stream(handle));
- Parameters:
handle – raft handle for managing expensive resources
comm – an initialized MPI communicator
NCCL+UCX Comms#
-
void build_comms_nccl_only(raft::resources *handle, ncclComm_t nccl_comm, int num_ranks, int rank)#
Factory function to construct a RAFT NCCL communicator and inject it into a RAFT handle.
#include <raft/comms/std_comms.hpp>
#include <raft/core/device_mdarray.hpp>

ncclComm_t nccl_comm;
raft::resources handle;

build_comms_nccl_only(&handle, nccl_comm, 5, 0);
...
const auto& comm = resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
comm.allgather((gather_data.data_handle())[comm.get_rank()],
               gather_data.data_handle(),
               1,
               resource::get_cuda_stream(handle));

comm.sync_stream(resource::get_cuda_stream(handle));
- Parameters:
handle – raft::resources for injecting the comms
nccl_comm – initialized NCCL communicator to use for collectives
num_ranks – number of ranks in communicator clique
rank – rank of local instance
-
void build_comms_nccl_ucx(resources *handle, ncclComm_t nccl_comm, bool is_ucxx, void *ucp_worker, void *eps, int num_ranks, int rank)#
Factory function to construct a RAFT NCCL+UCX communicator and inject it into a RAFT handle.
#include <raft/comms/std_comms.hpp>
#include <raft/core/device_mdarray.hpp>

ncclComm_t nccl_comm;
raft::resources handle;
ucp_worker_h ucp_worker;
ucp_ep_h *ucp_endpoints_arr;

build_comms_nccl_ucx(&handle, nccl_comm, false, &ucp_worker, ucp_endpoints_arr, 5, 0);
...
const auto& comm = resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
comm.allgather((gather_data.data_handle())[comm.get_rank()],
               gather_data.data_handle(),
               1,
               resource::get_cuda_stream(handle));

comm.sync_stream(resource::get_cuda_stream(handle));
- Parameters:
handle – raft::resources for injecting the comms
nccl_comm – initialized NCCL communicator to use for collectives
is_ucxx – whether ucp_worker and eps objects are UCXX (true) or pure UCX (false).
ucp_worker – ucp_worker_h instance of the local process. Note: This is purposefully left as void* so that the ucp_worker_h doesn’t need to be exposed through the cython layer
eps – array of ucp_ep_h instances. Note: This is purposefully left as void* so that the ucp_ep_h doesn’t need to be exposed through the cython layer.
num_ranks – number of ranks in communicator clique
rank – rank of local instance