Multi-GPU with cuGraph¶
cuGraph supports multi-GPU leveraging Dask. Dask is a flexible library for parallel computing in Python which makes scaling out your workflow smooth and simple. cuGraph also uses other Dask-based RAPIDS projects such as dask-cuda. The maximum graph size is currently limited to 2 Billion vertices (to be waived in the next versions).
Distributed graph analytics¶
The current solution is able to scale across multiple GPUs on multiple machines. Distributing the graph and computation lets you analyze datasets far larger than a single GPU’s memory.
With cuGraph and Dask, whether you’re using a single NVIDIA GPU or multiple nodes, your RAPIDS workflow will run smoothly, intelligently distributing the workload across the available resources.
If your graph comfortably fits in memory on a single GPU, you would want to use the single-GPU version of cuGraph. If you want to distribute your workflow across multiple GPUs and have more data than you can fit in memory on a single GPU, you would want to use cuGraph’s multi-GPU features.
Distributed Graph Algorithms¶
-
cugraph.dask.link_analysis.pagerank.
pagerank
(input_graph, alpha=0.85, personalization=None, max_iter=100, tol=1e-05, nstart=None)[source]¶ Find the PageRank values for each vertex in a graph using multiple GPUs. cuGraph computes an approximation of the Pagerank using the power method. The input graph must contain edge list as dask-cudf dataframe with one partition per GPU.
- Parameters
- graphcugraph.DiGraph
cuGraph graph descriptor, should contain the connectivity information as dask cudf edge list dataframe(edge weights are not used for this algorithm). Undirected Graph not currently supported.
- alphafloat
The damping factor alpha represents the probability to follow an outgoing edge, standard value is 0.85. Thus, 1.0-alpha is the probability to “teleport” to a random vertex. Alpha should be greater than 0.0 and strictly lower than 1.0.
- personalizationcudf.Dataframe
GPU Dataframe containing the personalization information. Currently not supported. personalization[‘vertex’] : cudf.Series
Subset of vertices of graph for personalization
- personalization[‘values’]cudf.Series
Personalization values for vertices
- max_iterint
The maximum number of iterations before an answer is returned. If this value is lower or equal to 0 cuGraph will use the default value, which is 30.
- tolerancefloat
Set the tolerance the approximation, this parameter should be a small magnitude value. The lower the tolerance the better the approximation. If this value is 0.0f, cuGraph will use the default value which is 1.0E-5. Setting too small a tolerance can lead to non-convergence due to numerical roundoff. Usually values between 0.01 and 0.00001 are acceptable.
- nstartnot supported
initial guess for pagerank
- Returns
- ——-
- PageRankdask_cudf.DataFrame
GPU data frame containing two dask_cudf.Series of size V: the vertex identifiers and the corresponding PageRank values.
- ddf[‘vertex’]dask_cudf.Series
Contains the vertex identifiers
- ddf[‘pagerank’]dask_cudf.Series
Contains the PageRank score
Examples
>>> import cugraph.dask as dcg >>> Comms.initialize(p2p=True) >>> chunksize = dcg.get_chunksize(input_data_path) >>> ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize, delimiter=' ', names=['src', 'dst', 'value'], dtype=['int32', 'int32', 'float32']) >>> dg = cugraph.DiGraph() >>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst', edge_attr='value') >>> pr = dcg.pagerank(dg) >>> Comms.destroy()
-
cugraph.dask.traversal.bfs.
bfs
(graph, start, return_distances=False)[source]¶ Find the distances and predecessors for a breadth first traversal of a graph. The input graph must contain edge list as dask-cudf dataframe with one partition per GPU.
- Parameters
- graphcugraph.DiGraph
cuGraph graph descriptor, should contain the connectivity information as dask cudf edge list dataframe(edge weights are not used for this algorithm). Undirected Graph not currently supported.
- startInteger
Specify starting vertex for breadth-first search; this function iterates over edges in the component reachable from this node.
- return_distancesbool, optional, default=False
Indicates if distances should be returned
- Returns
- dfdask_cudf.DataFrame
df[‘vertex’] gives the vertex id
df[‘distance’] gives the path distance from the starting vertex (Only if return_distances is True)
df[‘predecessor’] gives the vertex it was reached from in the traversal
Examples
>>> import cugraph.dask as dcg >>> Comms.initialize(p2p=True) >>> chunksize = dcg.get_chunksize(input_data_path) >>> ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize, delimiter=' ', names=['src', 'dst', 'value'], dtype=['int32', 'int32', 'float32']) >>> dg = cugraph.DiGraph() >>> dg.from_dask_cudf_edgelist(ddf, 'src', 'dst') >>> df = dcg.bfs(dg, 0) >>> Comms.destroy()
Helper functions¶
-
cugraph.comms.comms.
initialize
(comms=None, p2p=False, prows=None, pcols=None, partition_type=1)[source]¶ Initialize a communicator for multi-node/multi-gpu communications. It is expected to be called right after client initialization for running multi-GPU algorithms (this wraps raft comms that manages underlying NCCL and UCX comms handles across the workers of a Dask cluster).
It is recommended to also call destroy() when the comms are no longer needed so the underlying resources can be cleaned up.
- Parameters
- commsraft Comms
A pre-initialized raft communicator. If provided, this is used for mnmg communications. If not provided, default comms are initialized as per client information.
- p2pbool
Initialize UCX endpoints if True. Default is False.
- prowsint
Specifies the number of rows when performing a 2D partitioning of the input graph. If specified, this must be a factor of the total number of parallel processes. When specified with pcols, prows*pcols should be equal to the total number of parallel processes.
- pcolsint
Specifies the number of columns when performing a 2D partitioning of the input graph. If specified, this must be a factor of the total number of parallel processes. When specified with prows, prows*pcols should be equal to the total number of parallel processes.
- partition_typeint
Valid values are currently 1 or any int other than 1. A value of 1 (the default) represents a partitioning resulting in prows*pcols partitions. A non-1 value currently results in a partitioning of p*pcols partitions, where p is the number of GPUs.
Consolidation¶
cuGraph can transparently interpret the Dask cuDF Dataframe as a regular Dataframe when loading the edge list. This is particularly helpful for workflows extracting a single GPU sized edge list from a distributed dataset. From there any existing single GPU feature will just work on this input.
For instance, consolidation allows leveraging Dask cuDF CSV reader to load file(s) on multiple GPUs and consolidate this input to a single GPU graph. Reading is often the time and memory bottleneck, with this feature users can call the Multi-GPU version of the reader without changing anything else.
Batch Processing¶
cuGraph can leverage multi GPUs to increase processing speed for graphs that fit on a single GPU, providing faster analytics on such graphs. You will be able to use the Graph the same way as you used to in a Single GPU environment, but analytics that support batch processing will automatically use the GPUs available to the dask client. For example, Betweenness Centrality scores can be slow to obtain depending on the number of vertices used in the approximation. Thank to Multi GPUs Batch Processing, you can create Single GPU graph as you would regularly do it using cuDF CSV reader, enable Batch analytics on it, and obtain scores much faster as each GPU will handle a sub-set of the sources. In order to use Batch Analytics you need to set up a Dask Cluster and Client in addition to the cuGraph communicator, then you can simply call enable_batch() on you graph, and algorithms supporting batch processing will use multiple GPUs.
Algorithms supporting Batch Processing¶
-
cugraph.centrality.
betweenness_centrality
(G, k=None, normalized=True, weight=None, endpoints=False, seed=None, result_dtype=<class 'numpy.float64'>)[source] Compute the betweenness centrality for all vertices of the graph G. Betweenness centrality is a measure of the number of shortest paths that pass through a vertex. A vertex with a high betweenness centrality score has more paths passing through it and is therefore believed to be more important. Rather than doing an all-pair shortest path, a sample of k starting vertices can be used.
CuGraph does not currently support the ‘endpoints’ and ‘weight’ parameters as seen in the corresponding networkX call.
- Parameters
- GcuGraph.Graph or networkx.Graph
The graph can be either directed (DiGraph) or undirected (Graph). Weights in the graph are ignored, the current implementation uses BFS traversals. Use weight parameter if weights need to be considered (currently not supported)
- kint or list or None, optional, default=None
If k is not None, use k node samples to estimate betweenness. Higher values give better approximation If k is a list, use the content of the list for estimation: the list should contain vertices identifiers. If k is None (the default), all the vertices are used to estimate betweenness. Vertices obtained through sampling or defined as a list will be used as sources for traversals inside the algorithm.
- normalizedbool, optional
Default is True. If true, the betweenness values are normalized by 2 / ((n - 1) * (n - 2)) for Graphs (undirected), and 1 / ((n - 1) * (n - 2)) for DiGraphs (directed graphs) where n is the number of nodes in G. Normalization will ensure that values are in [0, 1], this normalization scales for the highest possible value where one node is crossed by every single shortest path.
- weightcudf.DataFrame, optional, default=None
Specifies the weights to be used for each edge. Should contain a mapping between edges and weights. (Not Supported)
- endpointsbool, optional, default=False
If true, include the endpoints in the shortest path counts. (Not Supported)
- seedoptional
if k is specified and k is an integer, use seed to initialize the random number generator. Using None as seed relies on random.seed() behavior: using current system time If k is either None or list: seed parameter is ignored
- result_dtypenp.float32 or np.float64, optional, default=np.float64
Indicate the data type of the betweenness centrality scores
- Returns
- dfcudf.DataFrame or Dictionary if using NetworkX
GPU data frame containing two cudf.Series of size V: the vertex identifiers and the corresponding betweenness centrality values. Please note that the resulting the ‘vertex’ column might not be in ascending order. The Dictionary conatains the same two columns
- df[‘vertex’]cudf.Series
Contains the vertex identifiers
- df[‘betweenness_centrality’]cudf.Series
Contains the betweenness centrality of vertices
Examples
>>> gdf = cudf.read_csv('datasets/karate.csv', delimiter=' ', >>> dtype=['int32', 'int32', 'float32'], header=None) >>> G = cugraph.Graph() >>> G.from_cudf_edgelist(gdf, source='0', destination='1') >>> bc = cugraph.betweenness_centrality(G)
-
cugraph.centrality.
edge_betweenness_centrality
(G, k=None, normalized=True, weight=None, seed=None, result_dtype=<class 'numpy.float64'>)[source] Compute the edge betweenness centrality for all edges of the graph G. Betweenness centrality is a measure of the number of shortest paths that pass over an edge. An edge with a high betweenness centrality score has more paths passing over it and is therefore believed to be more important. Rather than doing an all-pair shortest path, a sample of k starting vertices can be used.
CuGraph does not currently support the ‘weight’ parameter as seen in the corresponding networkX call.
- Parameters
- GcuGraph.Graph or networkx.Graph
The graph can be either directed (DiGraph) or undirected (Graph). Weights in the graph are ignored, the current implementation uses BFS traversals. Use weight parameter if weights need to be considered (currently not supported)
- kint or list or None, optional, default=None
If k is not None, use k node samples to estimate betweenness. Higher values give better approximation If k is a list, use the content of the list for estimation: the list should contain vertices identifiers. Vertices obtained through sampling or defined as a list will be used as sources for traversals inside the algorithm.
- normalizedbool, optional
Default is True. If true, the betweenness values are normalized by 2 / (n * (n - 1)) for Graphs (undirected), and 1 / (n * (n - 1)) for DiGraphs (directed graphs) where n is the number of nodes in G. Normalization will ensure that values are in [0, 1], this normalization scales for the highest possible value where one edge is crossed by every single shortest path.
- weightcudf.DataFrame, optional, default=None
Specifies the weights to be used for each edge. Should contain a mapping between edges and weights. (Not Supported)
- seedoptional
if k is specified and k is an integer, use seed to initialize the random number generator. Using None as seed relies on random.seed() behavior: using current system time If k is either None or list: seed parameter is ignored
- result_dtypenp.float32 or np.float64, optional, default=np.float64
Indicate the data type of the betweenness centrality scores Using double automatically switch implementation to “default”
- Returns
- dfcudf.DataFrame or Dictionary if using NetworkX
GPU data frame containing three cudf.Series of size E: the vertex identifiers of the sources, the vertex identifies of the destinations and the corresponding betweenness centrality values. Please note that the resulting the ‘src’, ‘dst’ column might not be in ascending order.
- df[‘src’]cudf.Series
Contains the vertex identifiers of the source of each edge
- df[‘dst’]cudf.Series
Contains the vertex identifiers of the destination of each edge
- df[‘edge_betweenness_centrality’]cudf.Series
Contains the betweenness centrality of edges
When using undirected graphs, ‘src’ and ‘dst’ only contains elements such that ‘src’ < ‘dst’, which might differ from networkx and user’s input. Namely edge (1 -> 0) is transformed into (0 -> 1) but contains the betweenness centrality of edge (1 -> 0).
Examples
>>> gdf = cudf.read_csv('datasets/karate.csv', delimiter=' ', >>> dtype=['int32', 'int32', 'float32'], header=None) >>> G = cugraph.Graph() >>> G.from_cudf_edgelist(gdf, source='0', destination='1') >>> ebc = cugraph.edge_betweenness_centrality(G)