10 Minutes to cuDF and Dask-cuDF#
Modeled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask-cuDF, geared mainly for new users.
What are these Libraries?#
cuDF is a Python GPU DataFrame library (built on the Apache Arrow columnar memory format) for loading, joining, aggregating, filtering, and otherwise manipulating tabular data using a DataFrame style API.
Dask is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple. On the CPU, Dask uses Pandas to execute operations in parallel on DataFrame partitions.
Dask-cuDF extends Dask where necessary to allow its DataFrame partitions to be processed by cuDF GPU DataFrames as opposed to Pandas DataFrames. For instance, when you call dask_cudf.read_csv(…), your cluster’s GPUs do the work of parsing the CSV file(s) with underlying cudf.read_csv().
When to use cuDF and Dask-cuDF#
If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask-cuDF.
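As a rough sketch of that decision (the file paths here are hypothetical), a dataset that fits comfortably on one GPU can be read with cuDF directly, while data spread across many large files can be read into a partitioned, multi-GPU DataFrame with Dask-cuDF:
import cudf
import dask_cudf

# Fits comfortably in a single GPU's memory: use cuDF directly.
gdf = cudf.read_csv('single_file.csv')

# Larger than one GPU's memory, or spread across many files: use Dask-cuDF.
ddf = dask_cudf.read_csv('many_files_*.csv')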
import os
import cupy as cp
import pandas as pd
import cudf
import dask_cudf
cp.random.seed(12)
#### Portions of this were borrowed and adapted from the
#### cuDF cheatsheet, existing cuDF documentation,
#### and 10 Minutes to Pandas.
Object Creation#
Creating a cudf.Series and a dask_cudf.Series.
s = cudf.Series([1,2,3,None,4])
s
0 1
1 2
2 3
3 <NA>
4 4
dtype: int64
ds = dask_cudf.from_cudf(s, npartitions=2)
ds.compute()
0 1
1 2
2 3
3 <NA>
4 4
dtype: int64
Creating a cudf.DataFrame and a dask_cudf.DataFrame by specifying values for each column.
df = cudf.DataFrame({'a': list(range(20)),
                     'b': list(reversed(range(20))),
                     'c': list(range(20))})
df
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
2 | 2 | 17 | 2 |
3 | 3 | 16 | 3 |
4 | 4 | 15 | 4 |
5 | 5 | 14 | 5 |
6 | 6 | 13 | 6 |
7 | 7 | 12 | 7 |
8 | 8 | 11 | 8 |
9 | 9 | 10 | 9 |
10 | 10 | 9 | 10 |
11 | 11 | 8 | 11 |
12 | 12 | 7 | 12 |
13 | 13 | 6 | 13 |
14 | 14 | 5 | 14 |
15 | 15 | 4 | 15 |
16 | 16 | 3 | 16 |
17 | 17 | 2 | 17 |
18 | 18 | 1 | 18 |
19 | 19 | 0 | 19 |
ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf.compute()
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
2 | 2 | 17 | 2 |
3 | 3 | 16 | 3 |
4 | 4 | 15 | 4 |
5 | 5 | 14 | 5 |
6 | 6 | 13 | 6 |
7 | 7 | 12 | 7 |
8 | 8 | 11 | 8 |
9 | 9 | 10 | 9 |
10 | 10 | 9 | 10 |
11 | 11 | 8 | 11 |
12 | 12 | 7 | 12 |
13 | 13 | 6 | 13 |
14 | 14 | 5 | 14 |
15 | 15 | 4 | 15 |
16 | 16 | 3 | 16 |
17 | 17 | 2 | 17 |
18 | 18 | 1 | 18 |
19 | 19 | 0 | 19 |
Creating a cudf.DataFrame from a pandas DataFrame, and a dask_cudf.DataFrame from a cudf.DataFrame.
Note that best practice for using Dask-cuDF is to read data directly into a dask_cudf.DataFrame with something like read_csv (discussed below).
pdf = pd.DataFrame({'a': [0, 1, 2, 3],'b': [0.1, 0.2, None, 0.3]})
gdf = cudf.DataFrame.from_pandas(pdf)
gdf
a | b | |
---|---|---|
0 | 0 | 0.1 |
1 | 1 | 0.2 |
2 | 2 | <NA> |
3 | 3 | 0.3 |
dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)
dask_gdf.compute()
a | b | |
---|---|---|
0 | 0 | 0.1 |
1 | 1 | 0.2 |
2 | 2 | <NA> |
3 | 3 | 0.3 |
Viewing Data#
Viewing the top rows of a GPU dataframe.
df.head(2)
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
ddf.head(2)
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
Sorting by values.
df.sort_values(by='b')
a | b | c | |
---|---|---|---|
19 | 19 | 0 | 19 |
18 | 18 | 1 | 18 |
17 | 17 | 2 | 17 |
16 | 16 | 3 | 16 |
15 | 15 | 4 | 15 |
14 | 14 | 5 | 14 |
13 | 13 | 6 | 13 |
12 | 12 | 7 | 12 |
11 | 11 | 8 | 11 |
10 | 10 | 9 | 10 |
9 | 9 | 10 | 9 |
8 | 8 | 11 | 8 |
7 | 7 | 12 | 7 |
6 | 6 | 13 | 6 |
5 | 5 | 14 | 5 |
4 | 4 | 15 | 4 |
3 | 3 | 16 | 3 |
2 | 2 | 17 | 2 |
1 | 1 | 18 | 1 |
0 | 0 | 19 | 0 |
ddf.sort_values(by='b').compute()
a | b | c | |
---|---|---|---|
19 | 19 | 0 | 19 |
18 | 18 | 1 | 18 |
17 | 17 | 2 | 17 |
16 | 16 | 3 | 16 |
15 | 15 | 4 | 15 |
14 | 14 | 5 | 14 |
13 | 13 | 6 | 13 |
12 | 12 | 7 | 12 |
11 | 11 | 8 | 11 |
10 | 10 | 9 | 10 |
9 | 9 | 10 | 9 |
8 | 8 | 11 | 8 |
7 | 7 | 12 | 7 |
6 | 6 | 13 | 6 |
5 | 5 | 14 | 5 |
4 | 4 | 15 | 4 |
3 | 3 | 16 | 3 |
2 | 2 | 17 | 2 |
1 | 1 | 18 | 1 |
0 | 0 | 19 | 0 |
Selection#
Getting#
Selecting a single column, which initially yields a cudf.Series or dask_cudf.Series. Calling compute results in a cudf.Series (equivalent to df.a).
df['a']
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
Name: a, dtype: int64
ddf['a'].compute()
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
Name: a, dtype: int64
Selection by Label#
Selecting rows from index 2 to index 5 from columns ‘a’ and ‘b’.
df.loc[2:5, ['a', 'b']]
a | b | |
---|---|---|
2 | 2 | 17 |
3 | 3 | 16 |
4 | 4 | 15 |
5 | 5 | 14 |
ddf.loc[2:5, ['a', 'b']].compute()
a | b | |
---|---|---|
2 | 2 | 17 |
3 | 3 | 16 |
4 | 4 | 15 |
5 | 5 | 14 |
Selection by Position#
Selecting via integers and integer slices, like numpy/pandas. Note that this functionality is not available for Dask-cuDF DataFrames.
df.iloc[0]
a 0
b 19
c 0
Name: 0, dtype: int64
df.iloc[0:3, 0:2]
a | b | |
---|---|---|
0 | 0 | 19 |
1 | 1 | 18 |
2 | 2 | 17 |
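If positional indexing is needed with Dask-cuDF, one workaround (a sketch, not the only option) is to collect the data back into a single cudf.DataFrame first and then use iloc:
ddf.compute().iloc[0:3, 0:2]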
You can also select elements of a DataFrame or Series with direct index access.
df[3:5]
a | b | c | |
---|---|---|---|
3 | 3 | 16 | 3 |
4 | 4 | 15 | 4 |
s[3:5]
3 <NA>
4 4
dtype: int64
Boolean Indexing#
Selecting rows in a DataFrame or Series by direct Boolean indexing.
df[df.b > 15]
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
2 | 2 | 17 | 2 |
3 | 3 | 16 | 3 |
ddf[ddf.b > 15].compute()
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
2 | 2 | 17 | 2 |
3 | 3 | 16 | 3 |
Selecting values from a DataFrame where a Boolean condition is met, via the query API.
df.query("b == 3")
a | b | c | |
---|---|---|---|
16 | 16 | 3 | 16 |
ddf.query("b == 3").compute()
a | b | c | |
---|---|---|---|
16 | 16 | 3 | 16 |
You can also pass local variables to Dask-cuDF queries, via the local_dict keyword. With standard cuDF, you may either use the local_dict keyword or directly pass the variable via the @ keyword. Supported logical operators include >, <, >=, <=, ==, and !=.
cudf_comparator = 3
df.query("b == @cudf_comparator")
a | b | c | |
---|---|---|---|
16 | 16 | 3 | 16 |
dask_cudf_comparator = 3
ddf.query("b == @val", local_dict={'val':dask_cudf_comparator}).compute()
a | b | c | |
---|---|---|---|
16 | 16 | 3 | 16 |
Using the isin method for filtering.
df[df.a.isin([0, 5])]
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
5 | 5 | 14 | 5 |
MultiIndex#
cuDF supports hierarchical indexing of DataFrames using MultiIndex. Grouping hierarchically (see Grouping below) automatically produces a DataFrame with a MultiIndex.
arrays = [['a', 'a', 'b', 'b'], [1, 2, 3, 4]]
tuples = list(zip(*arrays))
idx = cudf.MultiIndex.from_tuples(tuples)
idx
MultiIndex([('a', 1),
('a', 2),
('b', 3),
('b', 4)],
)
This index can back either axis of a DataFrame.
gdf1 = cudf.DataFrame({'first': cp.random.rand(4), 'second': cp.random.rand(4)})
gdf1.index = idx
gdf1
first | second | ||
---|---|---|---|
a | 1 | 0.082654 | 0.967955 |
2 | 0.399417 | 0.441425 | |
b | 3 | 0.784297 | 0.793582 |
4 | 0.070303 | 0.271711 |
gdf2 = cudf.DataFrame({'first': cp.random.rand(4), 'second': cp.random.rand(4)}).T
gdf2.columns = idx
gdf2
a | b | |||
---|---|---|---|---|
1 | 2 | 3 | 4 | |
first | 0.343382 | 0.003700 | 0.20043 | 0.581614 |
second | 0.907812 | 0.101512 | 0.24179 | 0.224180 |
Accessing values of a DataFrame with a MultiIndex. Note that slicing is not yet supported.
gdf1.loc[('b', 3)]
first 0.784297
second 0.793582
Name: ('b', 3), dtype: float64
Missing Data#
Missing data can be replaced by using the fillna method.
s.fillna(999)
0 1
1 2
2 3
3 999
4 4
dtype: int64
ds.fillna(999).compute()
0 1
1 2
2 3
3 999
4 4
dtype: int64
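Missing values can also be dropped rather than filled; a brief sketch using the pandas-style dropna method (outputs omitted):
s.dropna()
ds.dropna().compute()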
Operations#
Stats#
Calculating descriptive statistics for a Series.
s.mean(), s.var()
(2.5, 1.666666666666666)
ds.mean().compute(), ds.var().compute()
(2.5, 1.6666666666666667)
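Several summary statistics can also be computed at once with the pandas-style describe method; a brief sketch (outputs omitted):
df.describe()
ddf.describe().compute()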
Applymap#
Applying functions to a Series. Note that applying user-defined functions directly with Dask-cuDF is not yet implemented. For now, you can use map_partitions to apply a function to each partition of the distributed dataframe.
def add_ten(num):
    return num + 10
df['a'].apply(add_ten)
0 10
1 11
2 12
3 13
4 14
5 15
6 16
7 17
8 18
9 19
10 20
11 21
12 22
13 23
14 24
15 25
16 26
17 27
18 28
19 29
Name: a, dtype: int64
ddf['a'].map_partitions(add_ten).compute()
0 10
1 11
2 12
3 13
4 14
5 15
6 16
7 17
8 18
9 19
10 20
11 21
12 22
13 23
14 24
15 25
16 26
17 27
18 28
19 29
Name: a, dtype: int64
Histogramming#
Counting the number of occurrences of each unique value of a variable.
df.a.value_counts()
15 1
6 1
1 1
14 1
2 1
5 1
11 1
7 1
17 1
13 1
8 1
16 1
0 1
10 1
4 1
9 1
19 1
18 1
3 1
12 1
Name: a, dtype: int32
ddf.a.value_counts().compute()
15 1
6 1
1 1
14 1
2 1
5 1
11 1
7 1
17 1
13 1
8 1
16 1
0 1
10 1
4 1
9 1
19 1
18 1
3 1
12 1
Name: a, dtype: int64
String Methods#
Like pandas, cuDF provides string processing methods in the str attribute of Series. Full documentation of string methods is a work in progress. Please see the cuDF API documentation for more information.
s = cudf.Series(['A', 'B', 'C', 'Aaba', 'Baca', None, 'CABA', 'dog', 'cat'])
s.str.lower()
0 a
1 b
2 c
3 aaba
4 baca
5 <NA>
6 caba
7 dog
8 cat
dtype: object
ds = dask_cudf.from_cudf(s, npartitions=2)
ds.str.lower().compute()
0 a
1 b
2 c
3 aaba
4 baca
5 <NA>
6 caba
7 dog
8 cat
dtype: object
Concat#
Concatenating Series and DataFrames row-wise.
s = cudf.Series([1, 2, 3, None, 5])
cudf.concat([s, s])
0 1
1 2
2 3
3 <NA>
4 5
0 1
1 2
2 3
3 <NA>
4 5
dtype: int64
ds2 = dask_cudf.from_cudf(s, npartitions=2)
dask_cudf.concat([ds2, ds2]).compute()
0 1
1 2
2 3
3 <NA>
4 5
0 1
1 2
2 3
3 <NA>
4 5
dtype: int64
Join#
Performing SQL style merges. Note that the dataframe order is not maintained, but may be restored post-merge by sorting by the index.
df_a = cudf.DataFrame()
df_a['key'] = ['a', 'b', 'c', 'd', 'e']
df_a['vals_a'] = [float(i + 10) for i in range(5)]
df_b = cudf.DataFrame()
df_b['key'] = ['a', 'c', 'e']
df_b['vals_b'] = [float(i+100) for i in range(3)]
merged = df_a.merge(df_b, on=['key'], how='left')
merged
key | vals_a | vals_b | |
---|---|---|---|
0 | a | 10.0 | 100.0 |
1 | c | 12.0 | 101.0 |
2 | e | 14.0 | 102.0 |
3 | b | 11.0 | <NA> |
4 | d | 13.0 | <NA> |
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)
merged = ddf_a.merge(ddf_b, on=['key'], how='left').compute()
merged
key | vals_a | vals_b | |
---|---|---|---|
0 | c | 12.0 | 101.0 |
1 | e | 14.0 | 102.0 |
2 | b | 11.0 | <NA> |
3 | d | 13.0 | <NA> |
0 | a | 10.0 | 100.0 |
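As noted above, the merge output is not returned in the original row order. A minimal sketch of imposing a deterministic order afterwards, here by sorting on the join key:
merged.sort_values(by='key')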
Grouping#
Like pandas, cuDF and Dask-cuDF support the Split-Apply-Combine groupby paradigm.
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]
ddf = dask_cudf.from_cudf(df, npartitions=2)
Grouping and then applying the sum function to the grouped data.
df.groupby('agg_col1').sum()
a | b | c | agg_col2 | |
---|---|---|---|---|
agg_col1 | ||||
1 | 90 | 100 | 90 | 4 |
0 | 100 | 90 | 100 | 3 |
ddf.groupby('agg_col1').sum().compute()
a | b | c | agg_col2 | |
---|---|---|---|---|
agg_col1 | ||||
1 | 90 | 100 | 90 | 4 |
0 | 100 | 90 | 100 | 3 |
Grouping hierarchically, then applying the sum function to the grouped data.
df.groupby(['agg_col1', 'agg_col2']).sum()
a | b | c | ||
---|---|---|---|---|
agg_col1 | agg_col2 | |||
1 | 0 | 54 | 60 | 54 |
0 | 0 | 73 | 60 | 73 |
1 | 1 | 36 | 40 | 36 |
0 | 1 | 27 | 30 | 27 |
ddf.groupby(['agg_col1', 'agg_col2']).sum().compute()
a | b | c | ||
---|---|---|---|---|
agg_col1 | agg_col2 | |||
1 | 1 | 36 | 40 | 36 |
0 | 0 | 73 | 60 | 73 |
1 | 0 | 54 | 60 | 54 |
0 | 1 | 27 | 30 | 27 |
Grouping and applying statistical functions to specific columns, using agg.
df.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'})
a | b | c | |
---|---|---|---|
agg_col1 | |||
1 | 18 | 10.0 | 90 |
0 | 19 | 9.0 | 100 |
ddf.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'}).compute()
a | b | c | |
---|---|---|---|
agg_col1 | |||
1 | 18 | 10.0 | 90 |
0 | 19 | 9.0 | 100 |
Transpose#
Transposing a dataframe, using either the transpose method or the T property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask-cuDF.
sample = cudf.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
sample
a | b | |
---|---|---|
0 | 1 | 4 |
1 | 2 | 5 |
2 | 3 | 6 |
sample.transpose()
0 | 1 | 2 | |
---|---|---|---|
a | 1 | 2 | 3 |
b | 4 | 5 | 6 |
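The T property mentioned above is equivalent; a brief sketch (the output matches the transpose call):
sample.T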
Time Series#
DataFrames support datetime typed columns, which allow users to interact with and filter data based on specific timestamps.
import datetime as dt
date_df = cudf.DataFrame()
date_df['date'] = pd.date_range('11/20/2018', periods=72, freq='D')
date_df['value'] = cp.random.sample(len(date_df))
search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')
date_df.query('date <= @search_date')
date | value | |
---|---|---|
0 | 2018-11-20 | 0.986051 |
1 | 2018-11-21 | 0.232034 |
2 | 2018-11-22 | 0.397617 |
3 | 2018-11-23 | 0.103839 |
date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
date_ddf.query('date <= @search_date', local_dict={'search_date':search_date}).compute()
date | value | |
---|---|---|
0 | 2018-11-20 | 0.986051 |
1 | 2018-11-21 | 0.232034 |
2 | 2018-11-22 | 0.397617 |
3 | 2018-11-23 | 0.103839 |
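Datetime columns also expose pandas-style components through the dt accessor; a brief sketch (outputs omitted):
date_df['date'].dt.day
date_ddf['date'].dt.day.compute()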
Categoricals#
DataFrames support categorical columns.
gdf = cudf.DataFrame({"id": [1, 2, 3, 4, 5, 6], "grade":['a', 'b', 'b', 'a', 'a', 'e']})
gdf['grade'] = gdf['grade'].astype('category')
gdf
id | grade | |
---|---|---|
0 | 1 | a |
1 | 2 | b |
2 | 3 | b |
3 | 4 | a |
4 | 5 | a |
5 | 6 | e |
dgdf = dask_cudf.from_cudf(gdf, npartitions=2)
dgdf.compute()
id | grade | |
---|---|---|
0 | 1 | a |
1 | 2 | b |
2 | 3 | b |
3 | 4 | a |
4 | 5 | a |
5 | 6 | e |
Accessing the categories of a column. Note that this is currently not supported in Dask-cuDF.
gdf.grade.cat.categories
StringIndex(['a' 'b' 'e'], dtype='object')
Accessing the underlying code values of each categorical observation.
gdf.grade.cat.codes
0 0
1 1
2 1
3 0
4 0
5 2
dtype: uint8
dgdf.grade.cat.codes.compute()
0 0
1 1
2 1
3 0
4 0
5 2
dtype: uint8
Converting Data Representation#
Pandas#
Converting a cuDF or Dask-cuDF DataFrame to a pandas DataFrame.
df.head().to_pandas()
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
ddf.compute().head().to_pandas()
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
Numpy#
Converting a cuDF or Dask-cuDF DataFrame to a numpy ndarray.
df.to_numpy()
array([[ 0, 19, 0, 1, 1],
[ 1, 18, 1, 0, 0],
[ 2, 17, 2, 1, 0],
[ 3, 16, 3, 0, 1],
[ 4, 15, 4, 1, 0],
[ 5, 14, 5, 0, 0],
[ 6, 13, 6, 1, 1],
[ 7, 12, 7, 0, 0],
[ 8, 11, 8, 1, 0],
[ 9, 10, 9, 0, 1],
[10, 9, 10, 1, 0],
[11, 8, 11, 0, 0],
[12, 7, 12, 1, 1],
[13, 6, 13, 0, 0],
[14, 5, 14, 1, 0],
[15, 4, 15, 0, 1],
[16, 3, 16, 1, 0],
[17, 2, 17, 0, 0],
[18, 1, 18, 1, 1],
[19, 0, 19, 0, 0]])
ddf.compute().to_numpy()
array([[ 0, 19, 0, 1, 1],
[ 1, 18, 1, 0, 0],
[ 2, 17, 2, 1, 0],
[ 3, 16, 3, 0, 1],
[ 4, 15, 4, 1, 0],
[ 5, 14, 5, 0, 0],
[ 6, 13, 6, 1, 1],
[ 7, 12, 7, 0, 0],
[ 8, 11, 8, 1, 0],
[ 9, 10, 9, 0, 1],
[10, 9, 10, 1, 0],
[11, 8, 11, 0, 0],
[12, 7, 12, 1, 1],
[13, 6, 13, 0, 0],
[14, 5, 14, 1, 0],
[15, 4, 15, 0, 1],
[16, 3, 16, 1, 0],
[17, 2, 17, 0, 0],
[18, 1, 18, 1, 1],
[19, 0, 19, 0, 0]])
Converting a cuDF or Dask-cuDF Series to a numpy ndarray.
df['a'].to_numpy()
array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19])
ddf['a'].compute().to_numpy()
array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19])
Arrow#
Converting a cuDF or Dask-cuDF DataFrame to a PyArrow Table.
df.to_arrow()
pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64
----
a: [[0,1,2,3,4,...,15,16,17,18,19]]
b: [[19,18,17,16,15,...,4,3,2,1,0]]
c: [[0,1,2,3,4,...,15,16,17,18,19]]
agg_col1: [[1,0,1,0,1,...,0,1,0,1,0]]
agg_col2: [[1,0,0,1,0,...,1,0,0,1,0]]
ddf.compute().to_arrow()
pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64
----
a: [[0,1,2,3,4,...,15,16,17,18,19]]
b: [[19,18,17,16,15,...,4,3,2,1,0]]
c: [[0,1,2,3,4,...,15,16,17,18,19]]
agg_col1: [[1,0,1,0,1,...,0,1,0,1,0]]
agg_col2: [[1,0,0,1,0,...,1,0,0,1,0]]
Getting Data In/Out#
CSV#
Writing to a CSV file.
if not os.path.exists('example_output'):
    os.mkdir('example_output')
df.to_csv('example_output/foo.csv', index=False)
ddf.compute().to_csv('example_output/foo_dask.csv', index=False)
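A Dask-cuDF DataFrame can also write one CSV file per partition without first collecting everything onto a single GPU; a sketch, assuming a separate output directory so the per-partition files don't interfere with the wildcard read below (the '*' is replaced by the partition number):
os.makedirs('example_output/dask_csv', exist_ok=True)
ddf.to_csv('example_output/dask_csv/part-*.csv')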
Reading from a CSV file.
df = cudf.read_csv('example_output/foo.csv')
df
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
5 | 5 | 14 | 5 | 0 | 0 |
6 | 6 | 13 | 6 | 1 | 1 |
7 | 7 | 12 | 7 | 0 | 0 |
8 | 8 | 11 | 8 | 1 | 0 |
9 | 9 | 10 | 9 | 0 | 1 |
10 | 10 | 9 | 10 | 1 | 0 |
11 | 11 | 8 | 11 | 0 | 0 |
12 | 12 | 7 | 12 | 1 | 1 |
13 | 13 | 6 | 13 | 0 | 0 |
14 | 14 | 5 | 14 | 1 | 0 |
15 | 15 | 4 | 15 | 0 | 1 |
16 | 16 | 3 | 16 | 1 | 0 |
17 | 17 | 2 | 17 | 0 | 0 |
18 | 18 | 1 | 18 | 1 | 1 |
19 | 19 | 0 | 19 | 0 | 0 |
ddf = dask_cudf.read_csv('example_output/foo_dask.csv')
ddf.compute()
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
5 | 5 | 14 | 5 | 0 | 0 |
6 | 6 | 13 | 6 | 1 | 1 |
7 | 7 | 12 | 7 | 0 | 0 |
8 | 8 | 11 | 8 | 1 | 0 |
9 | 9 | 10 | 9 | 0 | 1 |
10 | 10 | 9 | 10 | 1 | 0 |
11 | 11 | 8 | 11 | 0 | 0 |
12 | 12 | 7 | 12 | 1 | 1 |
13 | 13 | 6 | 13 | 0 | 0 |
14 | 14 | 5 | 14 | 1 | 0 |
15 | 15 | 4 | 15 | 0 | 1 |
16 | 16 | 3 | 16 | 1 | 0 |
17 | 17 | 2 | 17 | 0 | 0 |
18 | 18 | 1 | 18 | 1 | 1 |
19 | 19 | 0 | 19 | 0 | 0 |
Reading all CSV files in a directory into a single dask_cudf.DataFrame, using the star wildcard.
ddf = dask_cudf.read_csv('example_output/*.csv')
ddf.compute()
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
5 | 5 | 14 | 5 | 0 | 0 |
6 | 6 | 13 | 6 | 1 | 1 |
7 | 7 | 12 | 7 | 0 | 0 |
8 | 8 | 11 | 8 | 1 | 0 |
9 | 9 | 10 | 9 | 0 | 1 |
10 | 10 | 9 | 10 | 1 | 0 |
11 | 11 | 8 | 11 | 0 | 0 |
12 | 12 | 7 | 12 | 1 | 1 |
13 | 13 | 6 | 13 | 0 | 0 |
14 | 14 | 5 | 14 | 1 | 0 |
15 | 15 | 4 | 15 | 0 | 1 |
16 | 16 | 3 | 16 | 1 | 0 |
17 | 17 | 2 | 17 | 0 | 0 |
18 | 18 | 1 | 18 | 1 | 1 |
19 | 19 | 0 | 19 | 0 | 0 |
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
5 | 5 | 14 | 5 | 0 | 0 |
6 | 6 | 13 | 6 | 1 | 1 |
7 | 7 | 12 | 7 | 0 | 0 |
8 | 8 | 11 | 8 | 1 | 0 |
9 | 9 | 10 | 9 | 0 | 1 |
10 | 10 | 9 | 10 | 1 | 0 |
11 | 11 | 8 | 11 | 0 | 0 |
12 | 12 | 7 | 12 | 1 | 1 |
13 | 13 | 6 | 13 | 0 | 0 |
14 | 14 | 5 | 14 | 1 | 0 |
15 | 15 | 4 | 15 | 0 | 1 |
16 | 16 | 3 | 16 | 1 | 0 |
17 | 17 | 2 | 17 | 0 | 0 |
18 | 18 | 1 | 18 | 1 | 1 |
19 | 19 | 0 | 19 | 0 | 0 |
Parquet#
Writing to parquet files with a GPU-accelerated parquet writer.
df.to_parquet('example_output/temp_parquet')
Reading parquet files with a GPU-accelerated parquet reader.
df = cudf.read_parquet('example_output/temp_parquet')
df
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
5 | 5 | 14 | 5 | 0 | 0 |
6 | 6 | 13 | 6 | 1 | 1 |
7 | 7 | 12 | 7 | 0 | 0 |
8 | 8 | 11 | 8 | 1 | 0 |
9 | 9 | 10 | 9 | 0 | 1 |
10 | 10 | 9 | 10 | 1 | 0 |
11 | 11 | 8 | 11 | 0 | 0 |
12 | 12 | 7 | 12 | 1 | 1 |
13 | 13 | 6 | 13 | 0 | 0 |
14 | 14 | 5 | 14 | 1 | 0 |
15 | 15 | 4 | 15 | 0 | 1 |
16 | 16 | 3 | 16 | 1 | 0 |
17 | 17 | 2 | 17 | 0 | 0 |
18 | 18 | 1 | 18 | 1 | 1 |
19 | 19 | 0 | 19 | 0 | 0 |
Writing to parquet files from a dask_cudf.DataFrame, using PyArrow under the hood.
ddf.to_parquet('example_output/ddf_parquet_files')
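The partitioned Parquet output can be read back with dask_cudf.read_parquet; a brief sketch:
ddf = dask_cudf.read_parquet('example_output/ddf_parquet_files')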
ORC#
Reading ORC files.
from pathlib import Path
cudf_root = Path(".").absolute().parents[3]
orc_file = Path("python/cudf/cudf/tests/data/orc/TestOrcFile.test1.orc")
file_path = cudf_root / orc_file
df2 = cudf.read_orc(file_path)
df2
boolean1 | byte1 | short1 | int1 | long1 | float1 | double1 | bytes1 | string1 | middle | list | map | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | False | 1 | 1024 | 65536 | 9223372036854775807 | 1.0 | -15.0 | hi | {'list': [{'int1': 1, 'string1': 'bye'}, {'int... | [{'int1': 3, 'string1': 'good'}, {'int1': 4, '... | [] | |
1 | True | 100 | 2048 | 65536 | 9223372036854775807 | 2.0 | -5.0 | bye | {'list': [{'int1': 1, 'string1': 'bye'}, {'int... | [{'int1': 100000000, 'string1': 'cat'}, {'int1... | [{'key': 'chani', 'value': {'int1': 5, 'string... |
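cuDF can also write ORC files via the to_orc method; a brief sketch writing the simple frame from earlier to a hypothetical output path:
df.to_orc('example_output/temp_orc')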
Dask Performance Tips#
Like Apache Spark, Dask operations are lazy. Instead of being executed at that moment, most operations are added to a task graph and the actual evaluation is delayed until the result is needed.
Sometimes, though, we want to force the execution of operations. Calling persist on a Dask collection fully computes it (or actively computes it in the background), persisting the result into memory. When we're using distributed systems, we may want to wait until persist is finished before beginning any downstream operations. We can enforce this contract by using wait: wrapping an operation with wait ensures it doesn't begin executing until all necessary upstream operations have finished.
The snippets below provide basic examples, using LocalCUDACluster to create one Dask worker per GPU on the local machine. For more detailed information about persist and wait, please see the Dask documentation for persist and wait. wait relies on the concept of Futures, which is beyond the scope of this tutorial; for more information on Futures, see the Dask Futures documentation. For more information about multi-GPU clusters, please see the dask-cuda library (documentation is in progress).
First, we set up a GPU cluster. With our client set up, Dask-cuDF computation will be distributed across the GPUs in the cluster.
import time
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster()
client = Client(cluster)
Persisting Data#
Next, we create our Dask-cuDF DataFrame and apply a transformation, storing the result as a new column.
nrows = 10000000
df2 = cudf.DataFrame({'a': cp.arange(nrows), 'b': cp.arange(nrows)})
ddf2 = dask_cudf.from_cudf(df2, npartitions=5)
ddf2['c'] = ddf2['a'] + 5
ddf2
a | b | c | |
---|---|---|---|
npartitions=5 | |||
0 | int64 | int64 | int64 |
2000000 | ... | ... | ... |
... | ... | ... | ... |
8000000 | ... | ... | ... |
9999999 | ... | ... | ... |
!nvidia-smi
Sat Dec 10 10:40:04 2022
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.29.05 Driver Version: 495.29.05 CUDA Version: 11.5 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|===============================+======================+======================|
| 0 Tesla V100-PCIE... On | 00000000:02:00.0 Off | 0 |
| N/A 25C P0 32W / 250W | 1172MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 1 Tesla V100-PCIE... On | 00000000:05:00.0 Off | 0 |
| N/A 26C P0 33W / 250W | 307MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 2 Tesla V100-PCIE... On | 00000000:06:00.0 Off | 0 |
| N/A 27C P0 33W / 250W | 307MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 3 Tesla V100-PCIE... On | 00000000:81:00.0 Off | 0 |
| N/A 24C P0 33W / 250W | 307MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 4 Tesla V100-PCIE... On | 00000000:85:00.0 Off | 0 |
| N/A 24C P0 33W / 250W | 307MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=============================================================================|
+-----------------------------------------------------------------------------+
Because Dask is lazy, the computation has not yet occurred. We can see that there are twenty tasks in the task graph and we've used about 800 MB of memory. We can force computation by using persist. By forcing execution, the result is now explicitly in memory and our task graph only contains one task per partition (the baseline).
ddf2 = ddf2.persist()
ddf2
a | b | c | |
---|---|---|---|
npartitions=5 | |||
0 | int64 | int64 | int64 |
2000000 | ... | ... | ... |
... | ... | ... | ... |
8000000 | ... | ... | ... |
9999999 | ... | ... | ... |
# Sleep to ensure the persist finishes and shows in the memory usage
!sleep 5; nvidia-smi
Sat Dec 10 10:40:11 2022
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.29.05 Driver Version: 495.29.05 CUDA Version: 11.5 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|===============================+======================+======================|
| 0 Tesla V100-PCIE... On | 00000000:02:00.0 Off | 0 |
| N/A 26C P0 32W / 250W | 1582MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 1 Tesla V100-PCIE... On | 00000000:05:00.0 Off | 0 |
| N/A 27C P0 33W / 250W | 717MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 2 Tesla V100-PCIE... On | 00000000:06:00.0 Off | 0 |
| N/A 28C P0 33W / 250W | 717MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 3 Tesla V100-PCIE... On | 00000000:81:00.0 Off | 0 |
| N/A 24C P0 34W / 250W | 717MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 4 Tesla V100-PCIE... On | 00000000:85:00.0 Off | 0 |
| N/A 25C P0 33W / 250W | 717MiB / 32510MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=============================================================================|
+-----------------------------------------------------------------------------+
Because we forced computation, we now have a larger object in distributed GPU memory.
Wait#
Depending on our workflow or distributed computing setup, we may want to wait until all upstream tasks have finished before proceeding with a specific function. This section shows an example of this behavior, adapted from the Dask documentation.
First, we create a new Dask DataFrame and define a function that we’ll map to every partition in the dataframe.
import random
nrows = 10000000
df1 = cudf.DataFrame({'a': cp.arange(nrows), 'b': cp.arange(nrows)})
ddf1 = dask_cudf.from_cudf(df1, npartitions=100)
def func(df):
    time.sleep(random.randint(1, 10))
    return (df + 5) * 3 - 11
This function will do a basic transformation of every column in the dataframe, but the time spent in the function will vary due to the time.sleep statement randomly adding 1-10 seconds of time. We'll run this on every partition of our dataframe using map_partitions, which adds the task to our task-graph, and store the result. We can then call persist to force execution.
results_ddf = ddf2.map_partitions(func)
results_ddf = results_ddf.persist()
However, some partitions will be done much sooner than others. If we had downstream processes that should wait for all partitions to be completed, we can enforce that behavior using wait.
wait(results_ddf)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-be8d4e73ff1e36fde22fcb0c38753bf0', 1)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-be8d4e73ff1e36fde22fcb0c38753bf0', 4)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-be8d4e73ff1e36fde22fcb0c38753bf0', 2)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-be8d4e73ff1e36fde22fcb0c38753bf0', 3)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-be8d4e73ff1e36fde22fcb0c38753bf0', 0)>}, not_done=set())
With wait, we can safely proceed with the rest of our workflow.
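When the workflow is complete, the client and cluster can be shut down explicitly; a minimal sketch:
client.close()
cluster.close()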