Multi-GPU with cuGraph

cuGraph supports multi-GPU execution by leveraging Dask, a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple. cuGraph also uses other Dask-based RAPIDS projects such as dask-cuda. The maximum graph size is currently limited to 2 billion vertices (a limit that will be lifted in future versions).

Distributed graph analytics

The current solution is able to scale across multiple GPUs on multiple machines. Distributing the graph and computation lets you analyze datasets far larger than a single GPU’s memory.

With cuGraph and Dask, whether you’re using a single NVIDIA GPU or multiple nodes, your RAPIDS workflow will run smoothly, intelligently distributing the workload across the available resources.

If your graph comfortably fits in memory on a single GPU, use the single-GPU version of cuGraph. If your data is larger than a single GPU’s memory and you want to distribute the workflow across multiple GPUs, use cuGraph’s multi-GPU features.

Helper functions

cugraph.comms.comms.initialize(comms=None, p2p=False, prows=None, pcols=None, partition_type=1)

Initialize a communicator for multi-node/multi-GPU communications. It is expected to be called right after client initialization when running multi-GPU algorithms (this wraps the RAFT comms that manage the underlying NCCL and UCX handles across the workers of a Dask cluster).

It is recommended to also call destroy() when the comms are no longer needed so the underlying resources can be cleaned up.

Parameters
comms : raft Comms

A pre-initialized RAFT communicator. If provided, this is used for multi-node/multi-GPU (MNMG) communications. If not provided, default comms are initialized from the client information.

p2p : bool

Initialize UCX endpoints if True. Default is False.

prows : int

Specifies the number of rows when performing a 2D partitioning of the input graph. If specified, this must be a factor of the total number of parallel processes. When specified with pcols, prows*pcols should be equal to the total number of parallel processes.

pcols : int

Specifies the number of columns when performing a 2D partitioning of the input graph. If specified, this must be a factor of the total number of parallel processes. When specified with prows, prows*pcols should be equal to the total number of parallel processes.

partition_type : int

Valid values are currently 1 or any integer other than 1. A value of 1 (the default) produces a partitioning of prows*pcols partitions. Any other value currently produces a partitioning of p*pcols partitions, where p is the number of GPUs.
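For illustration, on a cluster with eight parallel processes one could request a 2 x 4 partitioning, as in the hedged sketch below; per the constraints above, prows * pcols must equal the total number of parallel processes.

from cugraph.comms import comms as Comms

# Assumes a Dask client is already running with eight workers (one per GPU).
Comms.initialize(p2p=True, prows=2, pcols=4)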

cugraph.comms.comms.destroy()

Shuts down initialized comms and cleans up resources.
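A minimal sketch of the expected lifecycle, assuming a single machine whose GPUs are managed by dask-cuda’s LocalCUDACluster: create the client first, initialize the comms right after, and destroy them once the multi-GPU work is done.

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from cugraph.comms import comms as Comms

cluster = LocalCUDACluster()  # one worker per local GPU
client = Client(cluster)
Comms.initialize(p2p=True)    # right after client creation

# ... run multi-GPU cuGraph algorithms here ...

Comms.destroy()               # release the underlying NCCL/UCX resources
client.close()
cluster.close()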

cugraph.dask.common.read_utils.get_chunksize(input_path)

Calculate the appropriate chunksize for dask_cudf.read_csv to get a number of partitions equal to the number of GPUs.

Examples

>>> from cugraph.dask.common.read_utils import get_chunksize
>>> chunksize = get_chunksize('edge_list.csv')
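The returned chunksize is typically passed straight to dask_cudf.read_csv so that the file is split into one partition per GPU. A hedged sketch follows; 'edge_list.csv' and the column layout are placeholders, and the chunksize keyword shown here follows older dask_cudf releases (newer versions may name it blocksize).

import dask_cudf
from cugraph.dask.common.read_utils import get_chunksize

chunksize = get_chunksize('edge_list.csv')
ddf = dask_cudf.read_csv('edge_list.csv', chunksize=chunksize,
                         delimiter=' ', names=['src', 'dst'],
                         dtype=['int32', 'int32'])
# ddf now has roughly one partition per GPU in the cluster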

Consolidation

cuGraph can transparently interpret a Dask cuDF DataFrame as a regular DataFrame when loading the edge list. This is particularly helpful for workflows that extract a single-GPU-sized edge list from a distributed dataset: from there, any existing single-GPU feature will just work on this input.

For instance, consolidation makes it possible to use the Dask cuDF CSV reader to load file(s) on multiple GPUs and then consolidate that input into a single-GPU graph. Reading is often the time and memory bottleneck; with this feature, users can call the multi-GPU version of the reader without changing anything else.
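Under the description above, a consolidation workflow might look like the following hedged sketch (the file name and column names are placeholders): the edge list is read in parallel by the Dask cuDF reader, then handed to the single-GPU edge-list loader, which interprets the distributed DataFrame as a regular one.

import dask_cudf
import cugraph

# Multi-GPU read of the input file(s)
ddf = dask_cudf.read_csv('edge_list.csv', delimiter=' ',
                         names=['src', 'dst'], dtype=['int32', 'int32'])

# Consolidate into a single-GPU graph
G = cugraph.Graph()
G.from_cudf_edgelist(ddf, source='src', destination='dst')

# Any existing single-GPU algorithm now works on G, for example:
pr = cugraph.pagerank(G)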

Batch Processing

cuGraph can leverage multiple GPUs to increase processing speed for graphs that fit on a single GPU, providing faster analytics on such graphs. You can use the Graph the same way as in a single-GPU environment, and analytics that support batch processing will automatically use the GPUs available to the Dask client. For example, Betweenness Centrality scores can be slow to obtain depending on the number of vertices used in the approximation. Thanks to multi-GPU batch processing, you can create a single-GPU graph as you normally would with the cuDF CSV reader, enable batch analytics on it, and obtain scores much faster, as each GPU handles a subset of the sources.

To use batch analytics, set up a Dask cluster and client in addition to the cuGraph communicator, then simply call enable_batch() on your graph; algorithms supporting batch processing will then use multiple GPUs, as sketched below.
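A hedged sketch putting these steps together, assuming a single machine with multiple GPUs; the CSV path and column layout follow the karate example used further below.

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from cugraph.comms import comms as Comms
import cudf
import cugraph

# Dask cluster and client, plus the cuGraph communicator
cluster = LocalCUDACluster()
client = Client(cluster)
Comms.initialize()

# Build a regular single-GPU graph with the cuDF CSV reader
gdf = cudf.read_csv('datasets/karate.csv', delimiter=' ',
                    dtype=['int32', 'int32', 'float32'], header=None)
G = cugraph.Graph()
G.from_cudf_edgelist(gdf, source='0', destination='1')

# Batch-aware analytics now spread their work across the available GPUs
G.enable_batch()
bc = cugraph.betweenness_centrality(G, k=4)  # each GPU traverses from a subset of the 4 sampled sources

Comms.destroy()
client.close()
cluster.close()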

Algorithms supporting Batch Processing

cugraph.centrality.betweenness_centrality(G, k=None, normalized=True, weight=None, endpoints=False, seed=None, result_dtype=<class 'numpy.float64'>)

Compute the betweenness centrality for all vertices of the graph G. Betweenness centrality is a measure of the number of shortest paths that pass through a vertex. A vertex with a high betweenness centrality score has more paths passing through it and is therefore believed to be more important.

To improve performance, rather than doing an all-pairs shortest path, a sample of k starting vertices can be used.

cuGraph does not currently support the ‘endpoints’ and ‘weight’ parameters as seen in the corresponding NetworkX call.

Parameters
G : cuGraph.Graph or networkx.Graph

The graph can be either directed (DiGraph) or undirected (Graph). Weights in the graph are ignored; the current implementation uses BFS traversals. Use the weight parameter if weights need to be considered (currently not supported).

k : int or list or None, optional, default=None

If k is not None, use k node samples to estimate betweenness. Higher values give a better approximation. If k is a list, use the content of the list for estimation: the list should contain vertex identifiers. If k is None (the default), all the vertices are used to estimate betweenness. Vertices obtained through sampling or defined as a list will be used as sources for traversals inside the algorithm.

normalized : bool, optional

Default is True. If True, the betweenness values are normalized by 2 / ((n - 1) * (n - 2)) for Graphs (undirected) and by 1 / ((n - 1) * (n - 2)) for DiGraphs (directed graphs), where n is the number of nodes in G. Normalization ensures that values are in [0, 1]; it scales by the highest possible value, where one node is crossed by every single shortest path.

weight : cudf.DataFrame, optional, default=None

Specifies the weights to be used for each edge. Should contain a mapping between edges and weights. (Not Supported)

endpoints : bool, optional, default=False

If true, include the endpoints in the shortest path counts. (Not Supported)

seed : optional

If k is specified and is an integer, use seed to initialize the random number generator; using None as the seed relies on random.seed() behavior (the current system time). If k is either None or a list, the seed parameter is ignored.

result_dtype : np.float32 or np.float64, optional, default=np.float64

Indicates the data type of the betweenness centrality scores.

Returns
df : cudf.DataFrame or dict if using NetworkX

GPU data frame containing two cudf.Series of size V: the vertex identifiers and the corresponding betweenness centrality values. Please note that the resulting ‘vertex’ column might not be in ascending order. The dictionary contains the same two columns.

df['vertex'] : cudf.Series

Contains the vertex identifiers

df['betweenness_centrality'] : cudf.Series

Contains the betweenness centrality of vertices

Examples

>>> import cudf
>>> import cugraph
>>> gdf = cudf.read_csv('datasets/karate.csv', delimiter=' ',
...                     dtype=['int32', 'int32', 'float32'], header=None)
>>> G = cugraph.Graph()
>>> G.from_cudf_edgelist(gdf, source='0', destination='1')
>>> bc = cugraph.betweenness_centrality(G)

cugraph.centrality.edge_betweenness_centrality(G, k=None, normalized=True, weight=None, seed=None, result_dtype=<class 'numpy.float64'>)

Compute the edge betweenness centrality for all edges of the graph G. Betweenness centrality is a measure of the number of shortest paths that pass over an edge. An edge with a high betweenness centrality score has more paths passing over it and is therefore believed to be more important.

To improve performance, rather than doing an all-pairs shortest path, a sample of k starting vertices can be used.

cuGraph does not currently support the ‘weight’ parameter as seen in the corresponding NetworkX call.

Parameters
G : cuGraph.Graph or networkx.Graph

The graph can be either directed (DiGraph) or undirected (Graph). Weights in the graph are ignored; the current implementation uses BFS traversals. Use the weight parameter if weights need to be considered (currently not supported).

k : int or list or None, optional, default=None

If k is not None, use k node samples to estimate betweenness. Higher values give a better approximation. If k is a list, use the content of the list for estimation: the list should contain vertex identifiers. Vertices obtained through sampling or defined as a list will be used as sources for traversals inside the algorithm.

normalized : bool, optional

Default is True. If True, the betweenness values are normalized by 2 / (n * (n - 1)) for Graphs (undirected) and by 1 / (n * (n - 1)) for DiGraphs (directed graphs), where n is the number of nodes in G. Normalization ensures that values are in [0, 1]; it scales by the highest possible value, where one edge is crossed by every single shortest path.

weight : cudf.DataFrame, optional, default=None

Specifies the weights to be used for each edge. Should contain a mapping between edges and weights. (Not Supported)

seed : optional

If k is specified and is an integer, use seed to initialize the random number generator; using None as the seed relies on random.seed() behavior (the current system time). If k is either None or a list, the seed parameter is ignored.

result_dtype : np.float32 or np.float64, optional, default=np.float64

Indicates the data type of the betweenness centrality scores. Using double automatically switches the implementation to “default”.

Returns
df : cudf.DataFrame or dict if using NetworkX

GPU data frame containing three cudf.Series of size E: the vertex identifiers of the sources, the vertex identifiers of the destinations, and the corresponding betweenness centrality values. Please note that the resulting ‘src’ and ‘dst’ columns might not be in ascending order.

df['src'] : cudf.Series

Contains the vertex identifiers of the source of each edge

df['dst'] : cudf.Series

Contains the vertex identifiers of the destination of each edge

df['edge_betweenness_centrality'] : cudf.Series

Contains the betweenness centrality of edges

When using undirected graphs, ‘src’ and ‘dst’ only contain pairs such that ‘src’ < ‘dst’, which might differ from NetworkX and the user’s input. Namely, edge (1 -> 0) is transformed into (0 -> 1) but contains the betweenness centrality of edge (1 -> 0).

Examples

>>> import cudf
>>> import cugraph
>>> gdf = cudf.read_csv('datasets/karate.csv', delimiter=' ',
...                     dtype=['int32', 'int32', 'float32'], header=None)
>>> G = cugraph.Graph()
>>> G.from_cudf_edgelist(gdf, source='0', destination='1')
>>> ebc = cugraph.edge_betweenness_centrality(G)