cugraph.dask.comms.comms.initialize
- cugraph.dask.comms.comms.initialize(comms=None, p2p=False, prows=None, pcols=None, partition_type=1)
Initialize a communicator for multi-node/multi-GPU communications. It is expected to be called right after client initialization when running multi-GPU algorithms; it wraps RAFT comms, which manages the underlying NCCL and UCX comms handles across the workers of the Dask cluster.
It is recommended to also call destroy() when the comms are no longer needed so the underlying resources can be cleaned up.
- Parameters:
- comms : raft Comms, optional (default=None)
A pre-initialized raft communicator. If provided, it is used for multi-node multi-GPU (MNMG) communications. If not provided, default comms are initialized based on the client information.
- p2p : bool, optional (default=False)
Initialize UCX endpoints if True.
- prows : int, optional (default=None)
Specifies the number of rows when performing a 2D partitioning of the input graph. If specified, this must be a factor of the total number of parallel processes. When specified together with pcols, prows*pcols should equal the total number of parallel processes (see the sketch after this parameter list).
- pcols : int, optional (default=None)
Specifies the number of columns when performing a 2D partitioning of the input graph. If specified, this must be a factor of the total number of parallel processes. When specified together with prows, prows*pcols should equal the total number of parallel processes.
- partition_type : int, optional (default=1)
A value of 1 (the default) results in a 2D partitioning of prows*pcols partitions. Any other int value currently results in a partitioning of p*pcols partitions, where p is the number of GPUs.
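As a minimal sketch of 2D partitioning, assuming a local machine exposing four GPUs (so that prows*pcols equals the four resulting workers; the n_workers value below is an assumption for illustration), initialization could look like:
>>> from dask.distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> import cugraph.dask.comms as Comms
>>> # Assumption: the machine exposes 4 GPUs, giving 4 workers.
>>> cluster = LocalCUDACluster(n_workers=4)
>>> client = Client(cluster)
>>> # prows * pcols must equal the number of parallel processes (4 here).
>>> Comms.initialize(p2p=True, prows=2, pcols=2, partition_type=1)
>>> # ... run 2D-partitioned MNMG work here ...
>>> Comms.destroy()
>>> client.close()
>>> cluster.close()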
Examples
>>> from dask.distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> import cugraph.dask.comms as Comms
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
>>> Comms.initialize(p2p=True)
>>> # DO WORK HERE
>>> # All done, clean up
>>> Comms.destroy()
>>> client.close()
>>> cluster.close()
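As a further sketch of the "DO WORK HERE" step, assuming an edge list stored at a hypothetical path 'edges.csv' with hypothetical 'src' and 'dst' columns, an MNMG algorithm such as cugraph.dask.pagerank can be run between initialize() and destroy():
>>> import dask_cudf
>>> import cugraph
>>> import cugraph.dask as dcg
>>> # Assumption: 'edges.csv' is a space-separated edge list with 'src' and
>>> # 'dst' columns; adjust the path, delimiter, and dtypes as needed.
>>> ddf = dask_cudf.read_csv('edges.csv', delimiter=' ',
...                          names=['src', 'dst'], dtype=['int32', 'int32'])
>>> G = cugraph.Graph(directed=True)
>>> G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
>>> # Run the distributed algorithm; the result is a dask_cudf DataFrame.
>>> pr = dcg.pagerank(G)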