{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Overview of User Defined Functions with cuDF" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Like many tabular data processing APIs, cuDF provides a range of composable, DataFrame style operators. While out of the box functions are flexible and useful, it is sometimes necessary to write custom code, or user-defined functions (UDFs), that can be applied to rows, columns, and other groupings of the cells making up the DataFrame.\n", "\n", "In conjunction with the broader GPU PyData ecosystem, cuDF provides interfaces to run UDFs on a variety of data structures. Currently, we can only execute UDFs on numeric and Boolean typed data (support for strings is being planned). This guide covers writing and executing UDFs on the following data structures:\n", "\n", "- Series\n", "- DataFrame\n", "- Rolling Windows Series\n", "- Groupby DataFrames\n", "- CuPy NDArrays\n", "- Numba DeviceNDArrays\n", "\n", "It also demonstrates cuDF's default null handling behavior, and how to write UDFs that can interact with null values in a limited fashion." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Overview\n", "\n", "When cuDF executes a UDF, it gets just-in-time (JIT) compiled into a CUDA kernel (either explicitly or implicitly) and is run on the GPU. Exploring CUDA and GPU architecture in-depth is out of scope for this guide. At a high level:\n", "\n", "- Compute is spread across multiple \"blocks\", which have access to both global memory and their own block local memory\n", "- Within each block, many \"threads\" operate independently and simultaneously access their block-specific shared memory with low latency\n", "\n", "\n", "This guide covers APIs that automatically handle dividing columns into chunks and assigning them into different GPU blocks for parallel computation (see [apply_chunks](https://docs.rapids.ai/api/cudf/stable/api.html#cudf.core.dataframe.DataFrame.apply_chunks) or the [numba CUDA JIT API](https://numba.pydata.org/numba-doc/dev/cuda/index.html) if you need to control this yourself)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Series UDFs\n", "\n", "You can execute UDFs on Series in two ways:\n", "\n", "- Writing a standard Python function and using `applymap`\n", "- Writing a Numba kernel and using Numba's `forall` syntax\n", "\n", "Using `applymap` is simpler, but writing a Numba kernel offers the flexibility to build more complex functions (we'll be writing only simple kernels in this guide).\n", "\n", "Let's start by importing a few libraries and creating a DataFrame of several Series." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abc
0-0.691674TrueDan
10.480099FalseBob
2-0.473370TrueXavier
30.067479TrueAlice
4-0.970850FalseSarah
\n", "
" ], "text/plain": [ " a b c\n", "0 -0.691674 True Dan\n", "1 0.480099 False Bob\n", "2 -0.473370 True Xavier\n", "3 0.067479 True Alice\n", "4 -0.970850 False Sarah" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import numpy as np\n", "\n", "import cudf\n", "from cudf.datasets import randomdata \n", "\n", "df = randomdata(nrows=10, dtypes={'a':float, 'b':bool, 'c':str}, seed=12)\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we'll define a basic Python function and call it as a UDF with `applymap`." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "def udf(x):\n", " if x > 0:\n", " return x + 5\n", " else:\n", " return x - 5" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0 -5.691674\n", "1 5.480099\n", "2 -5.473370\n", "3 5.067479\n", "4 -5.970850\n", "5 5.837494\n", "6 5.801430\n", "7 -5.933157\n", "8 5.913899\n", "9 -5.725581\n", "Name: a, dtype: float64" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df['a'].applymap(udf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That's all there is to it. For more complex UDFs, though, we'd want to write an actual Numba kernel.\n", "\n", "For more complex logic (for instance, accessing values from multiple input columns or rows, you'll need to use a more complex API. There are several types. First we'll cover writing and running a Numba JITed CUDA kernel.\n", "\n", "The easiest way to write a Numba kernel is to use `cuda.grid(1)` to manage our thread indices, and then leverage Numba's `forall` method to configure the kernel for us. Below, define a basic multiplication kernel as an example and use `@cuda.jit` to compile it." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from numba import cuda\n", "\n", "@cuda.jit\n", "def multiply(in_col, out_col, multiplier):\n", " i = cuda.grid(1)\n", " if i < in_col.size: # boundary guard\n", " out_col[i] = in_col[i] * multiplier" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This kernel will take an input array, multiply it by a configurable value (supplied at runtime), and store the result in an output array. Notice that we wrapped our logic in an `if` statement. Because we can launch more threads than the size of our array, we need to make sure that we don't use threads with an index that would be out of bounds. Leaving this out can result in undefined behavior.\n", "\n", "To execute our kernel, we just need to pre-allocate an output array and leverage the `forall` method mentioned above. First, we create a Series of all `0.0` in our DataFrame, since we want `float64` output. Next, we run the kernel with `forall`. `forall` requires us to specify our desired number of tasks, so we'll supply in the length of our Series (which we store in `size`). The [__cuda_array_interface__](https://numba.pydata.org/numba-doc/dev/cuda/cuda_array_interface.html) is what allows us to directly call our Numba kernel on our Series." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "size = len(df['a'])\n", "df['e'] = 0.0\n", "multiply.forall(size)(df['a'], df['e'], 10.0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After calling our kernel, our DataFrame is now populated with the result." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abce
0-0.691674TrueDan-6.916743
10.480099FalseBob4.800994
2-0.473370TrueXavier-4.733700
30.067479TrueAlice0.674788
4-0.970850FalseSarah-9.708501
\n", "
" ], "text/plain": [ " a b c e\n", "0 -0.691674 True Dan -6.916743\n", "1 0.480099 False Bob 4.800994\n", "2 -0.473370 True Xavier -4.733700\n", "3 0.067479 True Alice 0.674788\n", "4 -0.970850 False Sarah -9.708501" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that, while we're operating on the Series `df['e']`, the kernel executes on the [DeviceNDArray](https://numba.pydata.org/numba-doc/dev/cuda/memory.html#device-arrays) \\\"underneath\\\" the Series. If you ever need to access the underlying DeviceNDArray of a Series, you can do so with `Series.data.mem`. We'll use this during an example in the Null Handling section of this guide." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DataFrame UDFs\n", "\n", "We could apply a UDF on a DataFrame like we did above with `forall`. We'd need to write a kernel that expects multiple inputs, and pass multiple Series as arguments when we execute our kernel. Because this is fairly common and can be difficult to manage, cuDF provides two APIs to streamline this: `apply_rows` and `apply_chunks`. Below, we walk through an example of using `apply_rows`. `apply_chunks` works in a similar way, but also offers more control over low-level kernel behavior.\n", "\n", "Now that we have two numeric columns in our DataFrame, let's write a kernel that uses both of them." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def conditional_add(x, y, out):\n", " for i, (a, e) in enumerate(zip(x, y)):\n", " if a > 0:\n", " out[i] = a + e\n", " else:\n", " out[i] = a" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that we need to `enumerate` through our `zipped` function arguments (which either match or are mapped to our input column names). We can pass this kernel to `apply_rows`. We'll need to specify a few arguments:\n", "- incols\n", " - A list of names of input columns that match the function arguments. Or, a dictionary mapping input column names to their corresponding function arguments such as `{'col1': 'arg1'}`.\n", "- outcols\n", " - A dictionary defining our output column names and their data types. These names must match our function arguments.\n", "- kwargs (optional)\n", " - We can optionally pass keyword arguments as a dictionary. Since we don't need any, we pass an empty one.\n", " \n", "While it looks like our function is looping sequentially through our columns, it actually executes in parallel in multiple threads on the GPU. This parallelism is the heart of GPU-accelerated computing. With that background, we're ready to use our UDF." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abceout
0-0.691674TrueDan-6.916743-0.691674
10.480099FalseBob4.8009945.281093
2-0.473370TrueXavier-4.733700-0.473370
30.067479TrueAlice0.6747880.742267
4-0.970850FalseSarah-9.708501-0.970850
\n", "
" ], "text/plain": [ " a b c e out\n", "0 -0.691674 True Dan -6.916743 -0.691674\n", "1 0.480099 False Bob 4.800994 5.281093\n", "2 -0.473370 True Xavier -4.733700 -0.473370\n", "3 0.067479 True Alice 0.674788 0.742267\n", "4 -0.970850 False Sarah -9.708501 -0.970850" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = df.apply_rows(conditional_add, \n", " incols={'a':'x', 'e':'y'},\n", " outcols={'out': np.float64},\n", " kwargs={}\n", " )\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As expected, we see our conditional addition worked. At this point, we've successfully executed UDFs on the core data structures of cuDF." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Rolling Window UDFs\n", "\n", "For time-series data, we may need to operate on a small \\\"window\\\" of our column at a time, processing each portion independently. We could slide (\\\"roll\\\") this window over the entire column to answer questions like \\\"What is the 3-day moving average of a stock price over the past year?\"\n", "\n", "We can apply more complex functions to rolling windows to `rolling` Series and DataFrames using `apply`. This example is adapted from cuDF's [API documentation](https://docs.rapids.ai/api/cudf/stable/api.html#cudf.core.dataframe.DataFrame.rolling). First, we'll create an example Series and then create a `rolling` object from the Series." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0 16.0\n", "1 25.0\n", "2 36.0\n", "3 49.0\n", "4 64.0\n", "5 81.0\n", "dtype: float64" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ser = cudf.Series([16, 25, 36, 49, 64, 81], dtype='float64')\n", "ser" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Rolling [window=3,min_periods=3,center=False]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rolling = ser.rolling(window=3, min_periods=3, center=False)\n", "rolling" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we'll define a function to use on our rolling windows. We created this one to highlight how you can include things like loops, mathematical functions, and conditionals. Rolling window UDFs do not yet support null values." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "import math\n", "\n", "def example_func(window):\n", " b = 0\n", " for a in window:\n", " b = max(b, math.sqrt(a))\n", " if b == 8:\n", " return 100 \n", " return b" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can execute the function by passing it to `apply`. With `window=3`, `min_periods=3`, and `center=False`, our first two values are `null`." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0 null\n", "1 null\n", "2 6.0\n", "3 7.0\n", "4 100.0\n", "5 9.0\n", "dtype: float64" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rolling.apply(example_func)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can apply this function to every column in a DataFrame, too." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ab
055.055.0
156.056.0
257.057.0
358.058.0
459.059.0
\n", "
" ], "text/plain": [ " a b\n", "0 55.0 55.0\n", "1 56.0 56.0\n", "2 57.0 57.0\n", "3 58.0 58.0\n", "4 59.0 59.0" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df2 = cudf.DataFrame()\n", "df2['a'] = np.arange(55, 65, dtype='float64')\n", "df2['b'] = np.arange(55, 65, dtype='float64')\n", "df2.head()" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ab
0nullnull
1nullnull
27.5498344357.549834435
37.6157731067.615773106
47.6811457487.681145748
57.7459666927.745966692
67.8102496767.810249676
77.8740078747.874007874
87.9372539337.937253933
9100.0100.0
\n", "
" ], "text/plain": [ " a b\n", "0 null null\n", "1 null null\n", "2 7.549834435 7.549834435\n", "3 7.615773106 7.615773106\n", "4 7.681145748 7.681145748\n", "5 7.745966692 7.745966692\n", "6 7.810249676 7.810249676\n", "7 7.874007874 7.874007874\n", "8 7.937253933 7.937253933\n", "9 100.0 100.0" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rolling = df2.rolling(window=3, min_periods=3, center=False)\n", "rolling.apply(example_func)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## GroupBy DataFrame UDFs\n", "\n", "We can also apply UDFs to grouped DataFrames using `apply_grouped`. This example is also drawn and adapted from the RAPIDS [API documentation](https://docs.rapids.ai/api/cudf/stable/api.html#cudf.core.groupby.groupby.GroupBy.apply_grouped).\n", "\n", "First, we'll group our DataFrame based on column `b`, which is either True or False. Note that we currently need to pass `method=\"cudf\"` to use UDFs with GroupBy objects." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abceout
0-0.691674TrueDan-6.916743-0.691674
10.480099FalseBob4.8009945.281093
2-0.473370TrueXavier-4.733700-0.473370
30.067479TrueAlice0.6747880.742267
4-0.970850FalseSarah-9.708501-0.970850
\n", "
" ], "text/plain": [ " a b c e out\n", "0 -0.691674 True Dan -6.916743 -0.691674\n", "1 0.480099 False Bob 4.800994 5.281093\n", "2 -0.473370 True Xavier -4.733700 -0.473370\n", "3 0.067479 True Alice 0.674788 0.742267\n", "4 -0.970850 False Sarah -9.708501 -0.970850" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/opt/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/dataframe.py:2559: UserWarning: as_index==True not supported due to the lack of multi-index with legacy groupby function. Use hash method for multi-index\n", " \"as_index==True not supported due to the lack of \"\n" ] } ], "source": [ "grouped = df.groupby(['b'], method=\"cudf\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we'll define a function to apply to each group independently. In this case, we'll take the rolling average of column `e`, and call that new column `rolling_avg_e`." ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "def rolling_avg(e, rolling_avg_e):\n", " win_size = 3\n", " for i in range(cuda.threadIdx.x, len(e), cuda.blockDim.x):\n", " if i < win_size - 1:\n", " # If there is not enough data to fill the window,\n", " # take the average to be NaN\n", " rolling_avg_e[i] = np.nan\n", " else:\n", " total = 0\n", " for j in range(i - win_size + 1, i + 1):\n", " total += e[j]\n", " rolling_avg_e[i] = total / win_size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can execute this with a very similar API to `apply_rows`. This time, though, it's going to execute independently for each group." ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abceoutrolling_avg_e
00.480099FalseBob4.8009945.281093NaN
1-0.970850FalseSarah-9.708501-0.970850NaN
20.801430FalseSarah8.0142978.8157271.035597
3-0.933157FalseQuinn-9.331571-0.933157-3.675258
4-0.691674TrueDan-6.916743-0.691674NaN
5-0.473370TrueXavier-4.733700-0.473370NaN
60.067479TrueAlice0.6747880.742267-3.658552
70.837494TrueWendy8.3749409.2124341.438676
80.913899TrueUrsula9.13898710.0528856.062905
9-0.725581TrueGeorge-7.255814-0.7255813.419371
\n", "
" ], "text/plain": [ " a b c e out rolling_avg_e\n", "0 0.480099 False Bob 4.800994 5.281093 NaN\n", "1 -0.970850 False Sarah -9.708501 -0.970850 NaN\n", "2 0.801430 False Sarah 8.014297 8.815727 1.035597\n", "3 -0.933157 False Quinn -9.331571 -0.933157 -3.675258\n", "4 -0.691674 True Dan -6.916743 -0.691674 NaN\n", "5 -0.473370 True Xavier -4.733700 -0.473370 NaN\n", "6 0.067479 True Alice 0.674788 0.742267 -3.658552\n", "7 0.837494 True Wendy 8.374940 9.212434 1.438676\n", "8 0.913899 True Ursula 9.138987 10.052885 6.062905\n", "9 -0.725581 True George -7.255814 -0.725581 3.419371" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "results = grouped.apply_grouped(rolling_avg,\n", " incols=['e'],\n", " outcols=dict(rolling_avg_e=np.float64))\n", "results" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice how, with a window size of three in the kernel, the first two values in each group for our output column are null." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Numba Kernels on CuPy Arrays\n", "\n", "We can also execute Numba kernels on CuPy NDArrays, again thanks to the `__cuda_array_interface__`. We can even run the same UDF on the Series and the CuPy array. First, we define a Series and then create a CuPy array from that Series." ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 1., 2., 3., 4., 10.])" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import cupy as cp\n", "\n", "s = cudf.Series([1.0, 2, 3, 4, 10])\n", "arr = cp.asarray(s)\n", "arr" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we define a UDF and execute it on our Series. We need to allocate a Series of the same size for our output, which we'll call `out`." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0 5\n", "1 10\n", "2 15\n", "3 20\n", "4 50\n", "dtype: int32" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from cudf.utils import cudautils\n", "\n", "@cuda.jit\n", "def multiply_by_5(x, out):\n", " i = cuda.grid(1)\n", " if i < x.size:\n", " out[i] = x[i] * 5\n", " \n", "out = cudf.Series(cudautils.zeros(len(s), dtype='int32'))\n", "multiply_by_5.forall(s.shape[0])(s, out)\n", "out" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we execute the same function on our array. We allocate an empty array `out` to store our results." ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 5., 10., 15., 20., 50.])" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "out = cp.empty_like(arr)\n", "multiply_by_5.forall(arr.size)(arr, out)\n", "out" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Null Handling in UDFs\n", "\n", "Above, we covered most basic usage of UDFs with cuDF.\n", "\n", "The remainder of the guide focuses on considerations for executing UDFs on DataFrames containing null values. If your UDFs will read or write any column containing nulls, **you should read this section carefully**.\n", "\n", "Writing UDFs that can handle null values is complicated by the fact that a separate bitmask is used to identify when a value is valid and when it's null. By default, DataFrame methods for applying UDFs like `apply_rows` will handle nulls pessimistically (all rows with a null value will be removed from the output if they are used in the kernel). Exploring how not handling not pessimistically can lead to undefined behavior is outside the scope of this guide. Suffice it to say, pessimistic null handling is the safe and consistent approach. You can see an example below." ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abc
09631005997
19771026null
2null10261019
31078null985
49799821011
\n", "
" ], "text/plain": [ " a b c\n", "0 963 1005 997\n", "1 977 1026 null\n", "2 null 1026 1019\n", "3 1078 null 985\n", "4 979 982 1011" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def gpu_add(a, b, out):\n", " for i, (x, y) in enumerate(zip(a, b)):\n", " out[i] = x + y\n", "\n", "df = randomdata(nrows=5, dtypes={'a':int, 'b':int, 'c':int}, seed=12)\n", "df.loc[2, 'a'] = None\n", "df.loc[3, 'b'] = None\n", "df.loc[1, 'c'] = None\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the dataframe above, there are three null values. Each column has a null in a different row. When we use our UDF with `apply_rows`, our output should have two nulls due to pessimistic null handling (because we're not using column `c`, the null value there does not matter to us)." ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abcout
096310059971968.0
19771026null2003.0
2null10261019null
31078null985null
497998210111961.0
\n", "
" ], "text/plain": [ " a b c out\n", "0 963 1005 997 1968.0\n", "1 977 1026 null 2003.0\n", "2 null 1026 1019 null\n", "3 1078 null 985 null\n", "4 979 982 1011 1961.0" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = df.apply_rows(gpu_add, \n", " incols=['a', 'b'],\n", " outcols={'out':np.float64},\n", " kwargs={})\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As expected, we end up with two nulls in our output. The null values from the columns we used propogated to our output, but the null from the column we ignored did not." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Operating on Null Values\n", "\n", "If you don't need to conditionally handle null values in your UDFs, feel free to skip these final two sections.\n", "\n", "As a developer or data scientist, you may sometimes need to write UDFs that operate on null values. This means you need to think about the null bitmask array when writing your UDF. As a note, cuDF allows you to turn off pessimistic null handling in `apply_rows`. Instead of doing this, if you need to operate on null values we recommend writing standard `Numba.cuda` kernels. To help you interact with null bitmasks from Python, cuDF provides the `mask_get` utility function. The following example illustrates how you can use `mask_get` in Numba kernels like we used earlier in this guide." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Standard Numba Kernels\n", "\n", "First, we import `mask_get` and create a DataFrame with some null values." ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ab
0-0.691674315True
10.480099393False
2nullTrue
30.067478787True
4nullFalse
\n", "
" ], "text/plain": [ " a b\n", "0 -0.691674315 True\n", "1 0.480099393 False\n", "2 null True\n", "3 0.067478787 True\n", "4 null False" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from cudf.utils.cudautils import mask_get\n", "\n", "df = randomdata(nrows=10, dtypes={'a':float, 'b':bool}, seed=12)\n", "df.loc[[2,4], 'a'] = None\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we'll define a simple kernel like before, with a couple of differences. This kernel needs access to the null bitmask, so we include a `validity_mask` argument. We also wrap our logic in a conditional based on the results of `mask_get`:\n", "- If the result of `mask_get` for that index **is** valid (there is a value), do the multiplication\n", "- If the result of `mask_get` for that index **is not** valid (it's null), set the output -999999" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "@cuda.jit\n", "def gpu_kernel_masked(in_col, validity_mask, out_col, multiplier):\n", " i = cuda.grid(1)\n", " if i < in_col.size:\n", " valid = mask_get(validity_mask, i)\n", " if valid:\n", " out_col[i] = in_col[i] * multiplier\n", " else:\n", " out_col[i] = -999999" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now grab the underlying DeviceArrays and execute our kernel like we did previously, except that this time we also pass in the DeviceArray of our column's null mask. Because Numba doesn't yet handle masked GPU arrays, we can't directly pass our `Series` here." ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abresult
0-0.691674315True-6.916743
10.480099393False4.800994
2nullTrue-999999.000000
30.067478787True0.674788
4nullFalse-999999.000000
\n", "
" ], "text/plain": [ " a b result\n", "0 -0.691674315 True -6.916743\n", "1 0.480099393 False 4.800994\n", "2 null True -999999.000000\n", "3 0.067478787 True 0.674788\n", "4 null False -999999.000000" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import rmm # RAPIDS Memory Manager\n", "\n", "a_dary = df.a._column.data.mem\n", "a_mask = df.a.nullmask.mem\n", "output_dary = rmm.device_array_like(a_dary)\n", "\n", "gpu_kernel_masked.forall(output_dary.size)(a_dary, a_mask, output_dary, 10)\n", "df['result'] = output_dary\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary\n", "\n", "This guide has covered a lot of content. At this point, you should hopefully feel comfortable writing UDFs (with or without null values) that operate on\n", "\n", "- Series\n", "- DataFrame\n", "- Rolling Windows\n", "- GroupBy DataFrames\n", "- CuPy NDArrays\n", "- Numba DeviceNDArrays\n", "\n", "\n", "For more information please see the [cuDF](https://docs.rapids.ai/api/cudf/nightly/), [Numba.cuda](https://numba.pydata.org/numba-doc/dev/cuda/index.html), and [CuPy](https://docs-cupy.chainer.org/en/stable/) documentation." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 4 }