Multi-Node Multi-GPU XGBoost example on Azure using dask-cloudprovider#

Dask Cloud Provider is a native cloud integration library for Dask. It helps manage Dask clusters on different cloud platforms. In this notebook, we will look at how we can use this package to set up an Azure cluster and run a multi-node multi-GPU (MNMG) example with RAPIDS. RAPIDS provides a suite of libraries to accelerate data science pipelines entirely on the GPU. As we will see in this notebook, this can be scaled to multiple nodes with Dask.

For the purposes of this demo, we will use a part of the NYC Taxi Dataset (only files from the 2014 calendar year will be used here). The goal is to predict the fare amount of a taxi trip given its pickup/dropoff times and coordinates. We will download the data from Azure Open Datasets, where the dataset is publicly hosted by Microsoft.

Note

In this notebook, we will explore two possible ways to use dask-cloudprovider to run our workloads on Azure VM clusters:

  1. Option 1: Using an Azure Marketplace image made available for free from NVIDIA. The RAPIDS container will be subsequently downloaded once the VMs start up.

  2. Option 2: Using packer to create a custom VM image to be used in the cluster. This image will include the RAPIDS container, and having the container already inside the image should speed up the process of provisioning the cluster.

You can use either Option 1 or Option 2.

Step 0: Set up Azure credentials and CLI#

Before running the notebook, run the following commands in a terminal to set up the Azure CLI:

curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
az login

Then, follow the instructions on the prompt to finish setting up the account. If you are running the notebook from inside a Docker container, you can remove sudo.

!az login
A web browser has been opened at https://login.microsoftonline.com/organizations/oauth2/v2.0/authorize. Please continue the login in the web browser. If no web browser is available or if the web browser fails to open, use device code flow with `az login --use-device-code`.
[
  {
    "cloudName": "AzureCloud",
    "homeTenantId": "43083d15-7273-40c1-b7db-39efd9ccc17a",
    "id": "fc4f4a6b-4041-4b1c-8249-854d68edcf62",
    "isDefault": true,
    "managedByTenants": [
      {
        "tenantId": "2f4a9838-26b7-47ee-be60-ccc1fdec5953"
      }
    ],
    "name": "NV-AI-Infra",
    "state": "Enabled",
    "tenantId": "43083d15-7273-40c1-b7db-39efd9ccc17a",
    "user": {
      "name": "skirui@nvidia.com",
      "type": "user"
    }
  }
]
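If your account has access to more than one subscription, you may want to confirm that the intended subscription is the default before creating any resources. A minimal sketch using standard Azure CLI commands (the subscription name below is a placeholder):

! az account show --output table
! az account set --subscription "<your subscription name or id>"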

Step 1: Import necessary packages#

# Install the following libraries at the beginning, if not already present.
# adlfs is needed to read from Azure Data Lake.
! pip install adlfs
! pip install "dask-cloudprovider[azure]" --upgrade
from dask.distributed import Client, wait, get_worker
from dask_cloudprovider.azure import AzureVMCluster
import dask_cudf
from dask_ml.model_selection import train_test_split
from cuml.dask.common import utils as dask_utils
from cuml.metrics import mean_squared_error
from cuml import ForestInference
import cudf
import xgboost as xgb
from datetime import datetime
from dateutil import parser
import numpy as np
from timeit import default_timer as timer
import dask
import json

Step 2: Set up the Azure VM Cluster#

We will now set up a Dask cluster on Azure Virtual Machines using AzureVMCluster from Dask Cloud Provider, following these instructions.

To do this, you will first need to set up a Resource Group, a Virtual Network and a Security Group on Azure. Learn more about how you can set this up. Note that you can also set it up using the Azure portal.

Once you have set it up, you can now plug in the names of the entities you have created in the cell below.

We need to pass in the docker argument docker_args = '--shm-size=256m' to allow a larger shared memory, which is needed to successfully run multiple docker containers in the same VM. This matters when each VM has more than one worker; even if that is not your case, there is no harm in having a larger shared memory. Finally, note that we use the RAPIDS docker image for the VMs and dask_cuda.CUDAWorker as the worker class. This runs the worker docker image with GPU capabilities instead of CPU.

location = "West US 2"
resource_group = "rapidsai-deployment"
vnet = "rapidsai-deployment-vnet"
security_group = "rapidsaiclouddeploymenttest-nsg"
vm_size = "Standard_NC12s_v3"  # or choose a different GPU enabled VM type

docker_image = "nvcr.io/nvidia/rapidsai/base:24.04-cuda11.8-py3.10"
docker_args = "--shm-size=256m"
worker_class = "dask_cuda.CUDAWorker"

Option 1: Use an Azure Marketplace VM image#

In this method, we can use an Azure Marketplace VM image provided by NVIDIA for free. These VM images contain all the necessary dependencies and NVIDIA drivers preinstalled. NVIDIA makes these images available as an out-of-the-box solution to reduce cluster setup time for data scientists. Conveniently, dask-cloudprovider makes it simple to pass in the information of a Marketplace VM, and it will use the selected VM image instead of a vanilla image.

We will use the following image: NVIDIA GPU-Optimized Image for AI and HPC.

Note

Please make sure you have dask-cloudprovider version 2021.6.0 or above. Marketplace VMs in Azure are not supported in older versions.
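You can quickly confirm the installed version from the notebook. A minimal sketch (assuming the package exposes __version__, which recent releases do):

import dask_cloudprovider

print(dask_cloudprovider.__version__)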

Set up Marketplace VM information and clear default dask config.#

dask.config.set(
    {
        "logging.distributed": "info",
        "cloudprovider.azure.azurevm.marketplace_plan": {
            "publisher": "nvidia",
            "name": "ngc-base-version-23_03_0",
            "product": "ngc_azure_17_11",
            "version": "23.03.0",
        },
    }
)
vm_image = ""
config = dask.config.get("cloudprovider.azure.azurevm", {})
config
{'vnet': None,
 'security_group': None,
 'public_ingress': True,
 'vm_size': 'Standard_DS1_v2',
 'disk_size': 50,
 'scheduler_vm_size': None,
 'docker_image': 'daskdev/dask:latest',
 'vm_image': {'publisher': 'Canonical',
  'offer': 'UbuntuServer',
  'sku': '18.04-LTS',
  'version': 'latest'},
 'bootstrap': True,
 'auto_shutdown': True,
 'marketplace_plan': {'publisher': 'nvidia',
  'name': 'ngc-base-version-23_03_0',
  'product': 'ngc_azure_17_11',
  'version': '23.03.0'}}

If you have not done so already, you must accept the Azure Marketplace image terms so that the image can be used to create VMs.

! az vm image terms accept --urn "nvidia:ngc_azure_17_11:ngc-base-version-23_03_0:23.03.0" --verbose
{
  "accepted": true,
  "id": "/subscriptions/fc4f4a6b-4041-4b1c-8249-854d68edcf62/providers/Microsoft.MarketplaceOrdering/offerTypes/Microsoft.MarketplaceOrdering/offertypes/publishers/nvidia/offers/ngc_azure_17_11/plans/ngc-base-version-23_03_0/agreements/current",
  "licenseTextLink": "https://mpcprodsa.blob.core.windows.net/legalterms/3E5ED_legalterms_NVIDIA%253a24NGC%253a5FAZURE%253a5F17%253a5F11%253a24NGC%253a2DBASE%253a2DVERSION%253a2D23%253a5F03%253a5F0%253a24KJVKRIWKTRQ3CIEPNL6YTG4AVORBHHPZCDQDVWX7JPPDEF6UM7R4XO76VDRHXCNTQYATKLGYYW3KA7DSIKTYXBZ3HJ2FMWYCINEY4WQ.txt",
  "marketplaceTermsLink": "https://mpcprodsa.blob.core.windows.net/marketplaceterms/3EDEF_marketplaceterms_VIRTUALMACHINE%253a24AAK2OAIZEAWW5H4MSP5KSTVB6NDKKRTUBAU23BRFTWN4YC2MQLJUB5ZEYUOUJBVF3YK34CIVPZL2HWYASPGDUY5O2FWEGRBYOXWZE5Y.txt",
  "name": "ngc-base-version-23_03_0",
  "plan": "ngc-base-version-23_03_0",
  "privacyPolicyLink": "https://www.nvidia.com/en-us/about-nvidia/privacy-policy/",
  "product": "ngc_azure_17_11",
  "publisher": "nvidia",
  "retrieveDatetime": "2023-10-02T08:17:40.3203275Z",
  "signature": "SWCKS7PPTL3XIBGBE2IZCMF43KBRDLSIZ7XLXXTLI6SXDCPCXY53BAISH6DNIELVV63GPZ44AOMMMZ6RV2AL5ARNM6XWHXRJ4HDNTJI",
  "systemData": {
    "createdAt": "2023-10-02T08:17:43.219827+00:00",
    "createdBy": "fc4f4a6b-4041-4b1c-8249-854d68edcf62",
    "createdByType": "ManagedIdentity",
    "lastModifiedAt": "2023-10-02T08:17:43.219827+00:00",
    "lastModifiedBy": "fc4f4a6b-4041-4b1c-8249-854d68edcf62",
    "lastModifiedByType": "ManagedIdentity"
  },
  "type": "Microsoft.MarketplaceOrdering/offertypes"
}
Command ran in 7.879 seconds (init: 0.159, invoke: 7.720)

Now that you have set up the necessary configurations to use the NVIDIA VM image, directly move to Step 2.1 to start the AzureVMCluster.

Option 2: Set up an Azure Customized VM#

If you already have a customized VM image and you know its resource ID, jump to Step f. of Option 2.

In general, if we use a generic image to create a cluster, we have to wait until the new VMs are fully provisioned with all dependencies. The provisioning step does several things, such as setting the VM up with the required libraries, setting up Docker, installing the NVIDIA drivers, and pulling and decompressing the RAPIDS container. This usually takes around 10-15 minutes depending on the cloud provider. If you want to fire up a cluster quickly, setting up a VM from a generic image every time may not be optimal.

Further, as detailed in Option 1, we can also choose to use a Marketplace VM image from NVIDIA. However, we would still have to download and decompress the RAPIDS container, so the setup time to start the workers and the scheduler would still be around 8-10 minutes.

Luckily, we can improve on this. We can make our own customized VM image bundled with all the necessary packages, drivers, containers and dependencies. This way, firing up the cluster from the customized VM image takes minimal time.

In this example, we will be using a tool called packer to create our customized virtual machine image. Packer automates the process of building and customizing VMs across all major cloud providers.

Now, to create a customized VM image, follow steps a. to f.

a. Install packer#

Follow the getting started guide to download the necessary binary according to your platform and install it.

b. Authenticate packer with Azure#

There are several ways to authenticate packer to work with Azure (details provided here). However, since we already installed the Azure CLI (az) at the beginning of the notebook, authenticating packer with the az CLI is the easiest option. We will let packer use the Azure credentials from the az CLI, so you do not have to do anything further in this step.

c. Generate the cloud init script for customizing the VM image#

packer can use a cloud-init script to initialize a VM. The cloud init script contains the set of commands that will set up the environment of our customized VM. We will pass this as an external file to the packer command via a configuration script.

The cloud-init file cloud_init.yaml.j2 is present in the configs folder. In case you want to add or modify any configuration, edit cloud_init.yaml.j2 before proceeding to the next steps.

d. Write packer configuration to a configuration file#

We now need to provide packer with a build file containing the platform-related and cloud-init configurations. packer will use this to create the customized VM image.

In this example, we are creating a single custom VM image that will be accessible only to the user. We will use an Ubuntu Server 18.04 base image and customize it. Later on, we will instantiate all our VMs from this customized VM image.

If you are curious about what else you can configure, take a look at all the available Azure build parameters for packer.

Note

Our resource group already exists in this example. Hence we simply pass in our resource group name in the required parameters managed_image_resource_group_name and build_resource_group_name.

packer_config = {
  "builders": [{
    "type": "azure-arm",
    "use_azure_cli_auth": True,
    "managed_image_resource_group_name": resource_group,
    "managed_image_name": <the name of the customized VM image>,
    "custom_data_file": "./configs/cloud_init.yaml.j2",
    "os_type": "Linux",
    "image_publisher": "Canonical",
    "image_offer": "UbuntuServer",
    "image_sku": "18.04-LTS",
    "azure_tags": {
        "dept": "RAPIDS-CSP",
        "task": "RAPIDS Custom Image deployment"
    },
    "build_resource_group_name": resource_group,
    "vm_size": vm_size
  }],
  "provisioners": [{
    "inline": [
      "echo 'Waiting for cloud-init'; while [ ! -f /var/lib/cloud/instance/boot-finished ]; do sleep 1; done; echo 'Done'",
    ],
    "type": "shell"
  }]
}

with open("packer_config.json", "w") as fh:
    fh.write(json.dumps(packer_config))

e. Run packer build and create the image#

# # Uncomment the following line and run to create the custom image
# ! packer build packer_config.json

This will take around 15 minutes. Grab a coffee or watch an episode of your favourite TV show and come back. But remember, you will only have to do this once, unless you want to update the packages in the VM. That means you can create this custom image once and then reuse it hundreds of times.

While packer is building the image, you will see an output similar to what is shown below.

$ packer build packer_config.json
azure-arm: output will be in this color.

==> azure-arm: Running builder ...
==> azure-arm: Getting tokens using Azure CLI
==> azure-arm: Getting tokens using Azure CLI
    azure-arm: Creating Azure Resource Manager (ARM) client ...
==> azure-arm: Using existing resource group ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> Location          : <some chosen location>
==> azure-arm: Validating deployment template ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> DeploymentName    : 'pkrdp04rrahxkg9'
==> azure-arm: Deploying deployment template ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> DeploymentName    : 'pkrdp04rrahxkg9'
==> azure-arm:
==> azure-arm: Getting the VM's IP address ...
==> azure-arm:  -> ResourceGroupName   : <your resource group>
==> azure-arm:  -> PublicIPAddressName : 'pkrip04rrahxkg9'
==> azure-arm:  -> NicName             : 'pkrni04rrahxkg9'
==> azure-arm:  -> Network Connection  : 'PublicEndpoint'
==> azure-arm:  -> IP Address          : '40.77.62.118'
==> azure-arm: Waiting for SSH to become available...
==> azure-arm: Connected to SSH!
==> azure-arm: Provisioning with shell script: /tmp/packer-shell614221056
    azure-arm: Waiting for cloud-init
    azure-arm: Done
==> azure-arm: Querying the machine's properties ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> ComputeName       : 'pkrvm04rrahxkg9'
==> azure-arm:  -> Managed OS Disk   : '/subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/disks/pkros04rrahxkg9'
==> azure-arm: Querying the machine's additional disks properties ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> ComputeName       : 'pkrvm04rrahxkg9'
==> azure-arm: Powering off machine ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> ComputeName       : 'pkrvm04rrahxkg9'
==> azure-arm: Capturing image ...
==> azure-arm:  -> Compute ResourceGroupName : <your resource group>
==> azure-arm:  -> Compute Name              : 'pkrvm04rrahxkg9'
==> azure-arm:  -> Compute Location          : <some chosen location>
==> azure-arm:  -> Image ResourceGroupName   : <your resource group>
==> azure-arm:  -> Image Name                : <your chosen custom image name>
==> azure-arm:  -> Image Location            : <some chosen location>
==> azure-arm: 
==> azure-arm: Deleting individual resources ...
==> azure-arm: Adding to deletion queue -> Microsoft.Compute/virtualMachines : 'pkrvm04rrahxkg9'
==> azure-arm: Adding to deletion queue -> Microsoft.Network/networkInterfaces : 'pkrni04rrahxkg9'
==> azure-arm: Adding to deletion queue -> Microsoft.Network/publicIPAddresses : 'pkrip04rrahxkg9'
==> azure-arm: Adding to deletion queue -> Microsoft.Network/virtualNetworks : 'pkrvn04rrahxkg9'
==> azure-arm: Attempting deletion -> Microsoft.Network/networkInterfaces : 'pkrni04rrahxkg9'
==> azure-arm: Waiting for deletion of all resources...
==> azure-arm: Attempting deletion -> Microsoft.Network/publicIPAddresses : 'pkrip04rrahxkg9'
==> azure-arm: Attempting deletion -> Microsoft.Compute/virtualMachines : 'pkrvm04rrahxkg9'
==> azure-arm: Attempting deletion -> Microsoft.Network/virtualNetworks : 'pkrvn04rrahxkg9'
    .
    .
    .
    .
    .
    .
    .
    .
==> azure-arm:  Deleting -> Microsoft.Compute/disks : '/subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/disks/pkros04rrahxkg9'
==> azure-arm: Removing the created Deployment object: 'pkrdp04rrahxkg9'
==> azure-arm: 
==> azure-arm: The resource group was not created by Packer, not deleting ...
Build 'azure-arm' finished after 16 minutes 22 seconds.

==> Wait completed after 16 minutes 22 seconds

==> Builds finished. The artifacts of successful builds are:
--> azure-arm: Azure.ResourceManagement.VMImage:

OSType: Linux
ManagedImageResourceGroupName: <your resource group>
ManagedImageName: <your chosen custom image name>
ManagedImageId: /subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/images/<your chosen custom image name>
ManagedImageLocation: <some chosen location>

When packer finishes, at the bottom of the output, you will see something similar to the following:

ManagedImageResourceGroupName: <your resource group>
ManagedImageName: <your chosen custom image name>
ManagedImageId: /subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/images/<your chosen custom image name>
ManagedImageLocation: <some chosen location>

Make note of the ManagedImageId. This is the resource ID of the custom image we will use.

As shown above, the ManagedImageId will look something like: /subscriptions/12345/resourceGroups/myown-rg/providers/Microsoft.Compute/images/myCustomImage
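If you come back to this notebook later and only remember the image name, you can recover the resource ID with the Azure CLI. A minimal sketch (resource group and image name are placeholders):

! az image show --resource-group <your resource group> --name <your chosen custom image name> --query id --output tsv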

f. Set up customized VM information and clear default dask config#

Once you have the custom VM resource ID, you should reset the default VM image information in dask.config. The default image value loaded in dask.config is a basic Ubuntu Server 18.04 LTS image (the same base image that you customized above). If you do not reset it, dask will try to use that default image instead of your custom-made one.

ManagedImageId = <value from the output above>  # or the resource ID of the customized VM image from a previous run
dask.config.set({"cloudprovider.azure.azurevm.vm_image": {}})
config = dask.config.get("cloudprovider.azure.azurevm", {})
print(config)
vm_image = {"id": ManagedImageId}
print(vm_image)

Step 2.1: Start the VM Cluster in Azure#

Here, if you have used Option 1, i.e., the NVIDIA VM image, pass an empty string for vm_image information.

For Option 2, pass the vm_image information that you got from the output of packer run as a parameter to AzureVMCluster.

Also turn off bootstrapping of the VM by passing bootstrap=False. This turns off installation of the dependencies while the VM is instantiated, since we already have them in our image in either case.

Note

The rest of the notebook should be the same irrespective of whether you chose Option 1 or Option 2.

Note

The number of actual workers in our cluster is not always equal to the number of VMs spawned, i.e. the value of \(n\_workers\) passed in. If the number of GPUs in the chosen vm_size is \(G\) and the number of VMs spawned is \(n\_workers\), then the number of actual workers is \(W = n\_workers \times G\). For example, for Standard_NC12s_v3 VMs, which have 2 V100 GPUs per VM, \(n\_workers=2\) gives \(W = 2 \times 2 = 4\).
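The small sketch below just restates this arithmetic in Python; gpus_per_vm is an assumption based on the VM size chosen above:

# Expected number of Dask workers: one dask_cuda.CUDAWorker per GPU per VM.
n_vms = 2  # the value passed as n_workers to AzureVMCluster
gpus_per_vm = 2  # Standard_NC12s_v3 has 2 V100 GPUs
expected_workers = n_vms * gpus_per_vm
print(expected_workers)  # 4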

%%time

cluster = AzureVMCluster(
    location=location,
    resource_group=resource_group,
    vnet=vnet,
    security_group=security_group,
    vm_image=vm_image,
    vm_size=vm_size,
    disk_size=200,
    docker_image=docker_image,
    worker_class=worker_class,
    n_workers=2,
    security=True,
    docker_args=docker_args,
    debug=False,
    bootstrap=False,  # This is to prevent the cloud init jinja2 script from running in the custom VM.
)
Creating scheduler instance
Assigned public IP
Network interface ready
Using Marketplace VM image with a Plan
Creating VM
Created VM dask-92c5978e-scheduler
Waiting for scheduler to run at 4.155.2.188:8786
Scheduler is running
/home/skirui/anaconda3/envs/rapids-23.08/lib/python3.10/contextlib.py:142: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources. Hang tight! 
  next(self.gen)
Creating worker instance
Creating worker instance
Network interface ready
Using Marketplace VM image with a Plan
Creating VM
Network interface ready
Using Marketplace VM image with a Plan
Creating VM
Created VM dask-92c5978e-worker-54f8d057
Created VM dask-92c5978e-worker-9f9a9c9b
CPU times: user 1.22 s, sys: 189 ms, total: 1.41 s
Wall time: 6min 58s
client = Client(cluster)
client

Client

Client-b8982284-60fe-11ee-a1e9-80e82cd32958

Connection method: Cluster object Cluster type: dask_cloudprovider.AzureVMCluster
Dashboard: http://4.155.2.188:8787/status

Cluster Info

%%time
client.wait_for_workers(2)
CPU times: user 0 ns, sys: 6.1 ms, total: 6.1 ms
Wall time: 29 ms
# Uncomment if you only have the scheduler with n_workers=0 and want to scale the workers separately.
# %%time
# client.cluster.scale(n_workers)

Wait until all the workers are up. The call above blocks until the requested number of Dask workers have registered with the scheduler.

Before we start the training process, let us take a quick look at the details of the GPUs in the worker VMs that we will be using.

import pprint

pp = pprint.PrettyPrinter()

pp.pprint(
    client.scheduler_info()
)  # will show some information of the GPUs of the workers
{'address': 'tls://10.5.0.42:8786',
 'id': 'Scheduler-3bae5a4d-29d1-4317-bbfc-931e97a077fb',
 'services': {'dashboard': 8787},
 'started': 1696235012.5914223,
 'type': 'Scheduler',
 'workers': {'tls://10.5.0.43:36201': {'gpu': {'memory-total': 17179869184,
                                               'name': 'Tesla V100-PCIE-16GB'},
                                       'host': '10.5.0.43',
                                       'id': 'dask-92c5978e-worker-54f8d057-1',
                                       'last_seen': 1696235778.2340653,
                                       'local_directory': '/tmp/dask-scratch-space/worker-6bghw_yx',
                                       'memory_limit': 118225670144,
                                       'metrics': {'bandwidth': {'total': 100000000,
                                                                 'types': {},
                                                                 'workers': {}},
                                                   'cpu': 4.0,
                                                   'digests_total_since_heartbeat': {'latency': 0.004627227783203125,
                                                                                     'tick-duration': 0.5006744861602783},
                                                   'event_loop_interval': 0.019985613822937013,
                                                   'gpu': {'memory-used': 598867968,
                                                           'utilization': 0},
                                                   'gpu_memory_used': 598867968,
                                                   'gpu_utilization': 0,
                                                   'host_disk_io': {'read_bps': 0.0,
                                                                    'write_bps': 0.0},
                                                   'host_net_io': {'read_bps': 612.42422993883,
                                                                   'write_bps': 3346.3180145677247},
                                                   'managed_bytes': 0,
                                                   'memory': 623116288,
                                                   'num_fds': 86,
                                                   'rmm': {'rmm-total': 0,
                                                           'rmm-used': 0},
                                                   'spilled_bytes': {'disk': 0,
                                                                     'memory': 0},
                                                   'task_counts': {},
                                                   'time': 1696235777.730071,
                                                   'transfer': {'incoming_bytes': 0,
                                                                'incoming_count': 0,
                                                                'incoming_count_total': 0,
                                                                'outgoing_bytes': 0,
                                                                'outgoing_count': 0,
                                                                'outgoing_count_total': 0}},
                                       'name': 'dask-92c5978e-worker-54f8d057-1',
                                       'nanny': 'tls://10.5.0.43:42265',
                                       'nthreads': 1,
                                       'resources': {},
                                       'services': {'dashboard': 44817},
                                       'status': 'running',
                                       'type': 'Worker'},
             'tls://10.5.0.43:38107': {'gpu': {'memory-total': 17179869184,
                                               'name': 'Tesla V100-PCIE-16GB'},
                                       'host': '10.5.0.43',
                                       'id': 'dask-92c5978e-worker-54f8d057-0',
                                       'last_seen': 1696235778.2329032,
                                       'local_directory': '/tmp/dask-scratch-space/worker-ix8y4_eg',
                                       'memory_limit': 118225670144,
                                       'metrics': {'bandwidth': {'total': 100000000,
                                                                 'types': {},
                                                                 'workers': {}},
                                                   'cpu': 2.0,
                                                   'digests_total_since_heartbeat': {'latency': 0.004603147506713867,
                                                                                     'tick-duration': 0.4996976852416992},
                                                   'event_loop_interval': 0.019999494552612306,
                                                   'gpu': {'memory-used': 598867968,
                                                           'utilization': 0},
                                                   'gpu_memory_used': 598867968,
                                                   'gpu_utilization': 0,
                                                   'host_disk_io': {'read_bps': 0.0,
                                                                    'write_bps': 0.0},
                                                   'host_net_io': {'read_bps': 611.5250712835996,
                                                                   'write_bps': 3341.404964660714},
                                                   'managed_bytes': 0,
                                                   'memory': 623882240,
                                                   'num_fds': 86,
                                                   'rmm': {'rmm-total': 0,
                                                           'rmm-used': 0},
                                                   'spilled_bytes': {'disk': 0,
                                                                     'memory': 0},
                                                   'task_counts': {},
                                                   'time': 1696235777.729443,
                                                   'transfer': {'incoming_bytes': 0,
                                                                'incoming_count': 0,
                                                                'incoming_count_total': 0,
                                                                'outgoing_bytes': 0,
                                                                'outgoing_count': 0,
                                                                'outgoing_count_total': 0}},
                                       'name': 'dask-92c5978e-worker-54f8d057-0',
                                       'nanny': 'tls://10.5.0.43:33657',
                                       'nthreads': 1,
                                       'resources': {},
                                       'services': {'dashboard': 45421},
                                       'status': 'running',
                                       'type': 'Worker'},
             'tls://10.5.0.44:34087': {'gpu': {'memory-total': 17179869184,
                                               'name': 'Tesla V100-PCIE-16GB'},
                                       'host': '10.5.0.44',
                                       'id': 'dask-92c5978e-worker-9f9a9c9b-1',
                                       'last_seen': 1696235778.5268767,
                                       'local_directory': '/tmp/dask-scratch-space/worker-1d7vbddw',
                                       'memory_limit': 118225670144,
                                       'metrics': {'bandwidth': {'total': 100000000,
                                                                 'types': {},
                                                                 'workers': {}},
                                                   'cpu': 0.0,
                                                   'digests_total_since_heartbeat': {'latency': 0.004075765609741211,
                                                                                     'tick-duration': 0.4998819828033447},
                                                   'event_loop_interval': 0.02001068115234375,
                                                   'gpu': {'memory-used': 598867968,
                                                           'utilization': 0},
                                                   'gpu_memory_used': 598867968,
                                                   'gpu_utilization': 0,
                                                   'host_disk_io': {'read_bps': 0.0,
                                                                    'write_bps': 12597732.652975753},
                                                   'host_net_io': {'read_bps': 612.7208378808626,
                                                                   'write_bps': 3347.938695871903},
                                                   'managed_bytes': 0,
                                                   'memory': 624406528,
                                                   'num_fds': 86,
                                                   'rmm': {'rmm-total': 0,
                                                           'rmm-used': 0},
                                                   'spilled_bytes': {'disk': 0,
                                                                     'memory': 0},
                                                   'task_counts': {},
                                                   'time': 1696235778.023989,
                                                   'transfer': {'incoming_bytes': 0,
                                                                'incoming_count': 0,
                                                                'incoming_count_total': 0,
                                                                'outgoing_bytes': 0,
                                                                'outgoing_count': 0,
                                                                'outgoing_count_total': 0}},
                                       'name': 'dask-92c5978e-worker-9f9a9c9b-1',
                                       'nanny': 'tls://10.5.0.44:37979',
                                       'nthreads': 1,
                                       'resources': {},
                                       'services': {'dashboard': 36073},
                                       'status': 'running',
                                       'type': 'Worker'},
             'tls://10.5.0.44:37791': {'gpu': {'memory-total': 17179869184,
                                               'name': 'Tesla V100-PCIE-16GB'},
                                       'host': '10.5.0.44',
                                       'id': 'dask-92c5978e-worker-9f9a9c9b-0',
                                       'last_seen': 1696235778.528408,
                                       'local_directory': '/tmp/dask-scratch-space/worker-7y8g_hu7',
                                       'memory_limit': 118225670144,
                                       'metrics': {'bandwidth': {'total': 100000000,
                                                                 'types': {},
                                                                 'workers': {}},
                                                   'cpu': 0.0,
                                                   'digests_total_since_heartbeat': {'latency': 0.003975629806518555,
                                                                                     'tick-duration': 0.4994323253631592},
                                                   'event_loop_interval': 0.020001530647277832,
                                                   'gpu': {'memory-used': 598867968,
                                                           'utilization': 0},
                                                   'gpu_memory_used': 598867968,
                                                   'gpu_utilization': 0,
                                                   'host_disk_io': {'read_bps': 0.0,
                                                                    'write_bps': 12589746.67130889},
                                                   'host_net_io': {'read_bps': 612.3324205749067,
                                                                   'write_bps': 3345.8163634027583},
                                                   'managed_bytes': 0,
                                                   'memory': 623104000,
                                                   'num_fds': 86,
                                                   'rmm': {'rmm-total': 0,
                                                           'rmm-used': 0},
                                                   'spilled_bytes': {'disk': 0,
                                                                     'memory': 0},
                                                   'task_counts': {},
                                                   'time': 1696235778.0250378,
                                                   'transfer': {'incoming_bytes': 0,
                                                                'incoming_count': 0,
                                                                'incoming_count_total': 0,
                                                                'outgoing_bytes': 0,
                                                                'outgoing_count': 0,
                                                                'outgoing_count_total': 0}},
                                       'name': 'dask-92c5978e-worker-9f9a9c9b-0',
                                       'nanny': 'tls://10.5.0.44:36779',
                                       'nthreads': 1,
                                       'resources': {},
                                       'services': {'dashboard': 32965},
                                       'status': 'running',
                                       'type': 'Worker'}}}

Step 3: Data Setup, Cleanup and Enhancement#

Step 3.a: Set up the workers for reading parquet files from Azure Data Lake endpoints#

We will now enable all the workers to read the parquet files directly from the Azure Data Lake endpoints. This requires the adlfs python library on the workers. We use the PipInstall worker plugin, which installs the package on all the workers.

from dask.distributed import PipInstall

client.register_worker_plugin(PipInstall(packages=["adlfs"]))
{'tls://10.5.0.43:36201': {'status': 'OK'},
 'tls://10.5.0.43:38107': {'status': 'OK'},
 'tls://10.5.0.44:34087': {'status': 'OK'},
 'tls://10.5.0.44:37791': {'status': 'OK'}}

Step 3.b: Data Cleanup, Enhancement and Persisting Scripts#

The data needs to be cleaned up first. We remove some columns that we are not interested in. We also define the datatype each of the columns needs to be read as.

We also add some new features to our dataframe via some custom functions, namely:

  1. Haversine distance: This is used for calculating the total trip distance.

  2. Day of the week: This can be useful information for determining the fare cost.

The add_features function combines the two to produce a new dataframe with the added features.

Note

In the function persist_train_infer_split, we also persist the test dataset on the workers. If X_infer, i.e. the test dataset, is small enough, we can call compute() on it to bring it to the local machine and then run predict on it. But in general, if X_infer is large, it may not fit in the GPU(s) of the local machine. Moreover, moving around a large amount of data adds to the prediction latency. It is therefore better to persist the test dataset on the dask workers, call the predict functionality on the individual workers, and finally collect the prediction results from them.

Adding features functions#

import math
from math import cos, sin, asin, sqrt, pi


def haversine_distance_kernel(
    pickup_latitude_r,
    pickup_longitude_r,
    dropoff_latitude_r,
    dropoff_longitude_r,
    h_distance,
    radius,
):
    for i, (x_1, y_1, x_2, y_2) in enumerate(
        zip(
            pickup_latitude_r,
            pickup_longitude_r,
            dropoff_latitude_r,
            dropoff_longitude_r,
        )
    ):
        x_1 = pi / 180 * x_1
        y_1 = pi / 180 * y_1
        x_2 = pi / 180 * x_2
        y_2 = pi / 180 * y_2

        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat / 2) ** 2 + cos(x_1) * cos(x_2) * sin(dlon / 2) ** 2

        c = 2 * asin(sqrt(a))
        # radius = 6371 # Radius of earth in kilometers # currently passed as input arguments

        h_distance[i] = c * radius


def day_of_the_week_kernel(day, month, year, day_of_week):
    for i, (d_1, m_1, y_1) in enumerate(zip(day, month, year)):
        if month[i] < 3:
            shift = month[i]
        else:
            shift = 0
        Y = year[i] - (month[i] < 3)
        y = Y - 2000
        c = 20
        d = day[i]
        m = month[i] + shift + 1
        day_of_week[i] = (d + math.floor(m * 2.6) + y + (y // 4) + (c // 4) - 2 * c) % 7


def add_features(df):
    df["hour"] = df["tpepPickupDateTime"].dt.hour
    df["year"] = df["tpepPickupDateTime"].dt.year
    df["month"] = df["tpepPickupDateTime"].dt.month
    df["day"] = df["tpepPickupDateTime"].dt.day
    df["diff"] = (
        df["tpepDropoffDateTime"] - df["tpepPickupDateTime"]
    ).dt.seconds  # convert difference between pickup and dropoff into seconds

    df["pickup_latitude_r"] = df["startLat"] // 0.01 * 0.01
    df["pickup_longitude_r"] = df["startLon"] // 0.01 * 0.01
    df["dropoff_latitude_r"] = df["endLat"] // 0.01 * 0.01
    df["dropoff_longitude_r"] = df["endLon"] // 0.01 * 0.01

    df = df.drop("tpepDropoffDateTime", axis=1)
    df = df.drop("tpepPickupDateTime", axis=1)

    df = df.apply_rows(
        haversine_distance_kernel,
        incols=[
            "pickup_latitude_r",
            "pickup_longitude_r",
            "dropoff_latitude_r",
            "dropoff_longitude_r",
        ],
        outcols=dict(h_distance=np.float32),
        kwargs=dict(radius=6371),
    )

    df = df.apply_rows(
        day_of_the_week_kernel,
        incols=["day", "month", "year"],
        outcols=dict(day_of_week=np.float32),
        kwargs=dict(),
    )

    df["is_weekend"] = df["day_of_week"] < 2
    return df

Functions for cleaning and persisting the data in the workers.

def persist_train_infer_split(
    client,
    df,
    response_dtype,
    response_id,
    infer_frac=1.0,
    random_state=42,
    shuffle=True,
):
    workers = client.has_what().keys()
    X, y = df.drop([response_id], axis=1), df[response_id].astype("float32")
    infer_frac = max(0, min(infer_frac, 1.0))
    X_train, X_infer, y_train, y_infer = train_test_split(
        X, y, shuffle=True, random_state=random_state, test_size=infer_frac
    )

    with dask.annotate(workers=set(workers)):
        X_train, y_train = client.persist(collections=[X_train, y_train])

    if infer_frac != 1.0:
        with dask.annotate(workers=set(workers)):
            X_infer, y_infer = client.persist(collections=[X_infer, y_infer])

        wait([X_train, y_train, X_infer, y_infer])
    else:
        X_infer = X_train
        y_infer = y_train

        wait([X_train, y_train])

    return X_train, y_train, X_infer, y_infer


def clean(df_part, must_haves):
    """
    This function performs the various clean up tasks for the data
    and returns the cleaned dataframe.
    """
    # iterate through columns in this df partition
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            df_part = df_part.drop(col, axis=1)
            continue

        # fixes datetime error found by Ty Mckercher and fixed by Paul Mahler
        if df_part[col].dtype == "object" and col in [
            "tpepPickupDateTime",
            "tpepDropoffDateTime",
        ]:
            df_part[col] = df_part[col].astype("datetime64[ms]")
            continue

        # if column was read as a string, recast as float
        if df_part[col].dtype == "object":
            df_part[col] = df_part[col].str.fillna("-1")
            df_part[col] = df_part[col].astype("float32")
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if "int" in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype("int32")
            if "float" in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype("float32")
            df_part[col] = df_part[col].fillna(-1)

    return df_part


def taxi_data_loader(
    client,
    adlsaccount,
    adlspath,
    response_dtype=np.float32,
    infer_frac=1.0,
    random_state=0,
):
    # create a list of columns & dtypes the df must have
    must_haves = {
        "tpepPickupDateTime": "datetime64[ms]",
        "tpepDropoffDateTime": "datetime64[ms]",
        "passengerCount": "int32",
        "tripDistance": "float32",
        "startLon": "float32",
        "startLat": "float32",
        "rateCodeId": "int32",
        "endLon": "float32",
        "endLat": "float32",
        "fareAmount": "float32",
    }

    workers = client.has_what().keys()
    response_id = "fareAmount"
    storage_options = {"account_name": adlsaccount}
    taxi_data = dask_cudf.read_parquet(
        adlspath,
        storage_options=storage_options,
        chunksize=25e6,
        npartitions=len(workers),
    )
    taxi_data = clean(taxi_data, must_haves)
    taxi_data = taxi_data.map_partitions(add_features)
    # Drop NaN values and convert to float32
    taxi_data = taxi_data.dropna()
    fields = [
        "passengerCount",
        "tripDistance",
        "startLon",
        "startLat",
        "rateCodeId",
        "endLon",
        "endLat",
        "fareAmount",
        "diff",
        "h_distance",
        "day_of_week",
        "is_weekend",
    ]
    taxi_data = taxi_data.astype("float32")
    taxi_data = taxi_data[fields]
    taxi_data = taxi_data.reset_index()

    return persist_train_infer_split(
        client, taxi_data, response_dtype, response_id, infer_frac, random_state
    )

Step 3.c: Get the split data and persist across workers#

We will make use of the data from November and December 2014 for the purposes of the demo.
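Before loading the data, you can inspect which files the glob below will match. A minimal sketch using adlfs directly (this assumes anonymous read access to the public azureopendatastorage account, which is how Azure Open Datasets is hosted):

import adlfs

fs = adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
print(len(fs.glob("nyctlc/yellow/puYear=2014/puMonth=1*/*.parquet")))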

tic = timer()
X_train, y_train, X_infer, y_infer = taxi_data_loader(
    client,
    adlsaccount="azureopendatastorage",
    adlspath="az://nyctlc/yellow/puYear=2014/puMonth=1*/*.parquet",
    infer_frac=0.1,
    random_state=42,
)
toc = timer()
print(f"Wall clock time taken for ETL and persisting : {toc-tic} s")
/home/skirui/anaconda3/envs/rapids-23.08/lib/python3.10/site-packages/dask/dataframe/io/parquet/core.py:411: FutureWarning: The `chunksize` argument is deprecated, and will be removed in a future release. Setting the `blocksize` argument instead. Please see documentation on the `blocksize` argument for more information.
  warnings.warn(
Wall clock time taken for ETL and persisting : 83.1002215759363 s
X_train.shape[0].compute()
48817562

The size of our training dataset is around 49 million rows. Let's look at the data locally to see what we're dealing with. We see that there are columns for pickup and dropoff latitude and longitude, passenger count, trip distance, day of week, etc. This is the information we'll use to estimate the trip fare amount.

X_train.head()
index passengerCount tripDistance startLon startLat rateCodeId endLon endLat diff h_distance day_of_week is_weekend
300446 300446 1.0 4.00 -73.984955 40.768543 1.0 -74.008789 40.719330 1324.0 5.809536e+00 6.0 0.0
163817 163817 3.0 1.93 -74.008179 40.722198 1.0 -73.992989 40.739151 840.0 1.395489e+00 2.0 0.0
236958 236958 5.0 1.10 -73.987595 40.775360 1.0 -73.976494 40.785755 360.0 1.394768e+00 5.0 0.0
73461 73461 1.0 0.76 -73.994698 40.725929 1.0 -73.994698 40.725929 180.0 1.005159e-13 5.0 0.0
294464 294464 1.0 0.60 -73.974342 40.748165 1.0 -73.982536 40.750767 229.0 1.395336e+00 6.0 0.0
X_infer
Dask DataFrame Structure:
index passengerCount tripDistance startLon startLat rateCodeId endLon endLat diff h_distance day_of_week is_weekend
npartitions=84
int64 float32 float32 float32 float32 float32 float32 float32 float32 float32 float32 float32
... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: split, 1 graph layer

Step 4: Train an XGBoost Model#

We are now ready to train an XGBoost model on the data and then predict the fare for each trip.

Step 4.a: Set training Parameters#

In this training example, we will use RMSE as the evaluation metric. It is also worth noting that performing HPO would lead to a more optimal set of hyperparameters.

Refer to the notebook HPO-RAPIDS in this repository for how to perform HPO on Azure.

params = {
    "learning_rate": 0.15,
    "max_depth": 8,
    "objective": "reg:squarederror",
    "subsample": 0.7,
    "colsample_bytree": 0.7,
    "min_child_weight": 1,
    "gamma": 1,
    "silent": True,
    "verbose_eval": True,
    "booster": "gbtree",  # 'gblinear' not implemented in dask
    "debug_synchronize": True,
    "eval_metric": "rmse",
    "tree_method": "gpu_hist",
    "num_boost_rounds": 100,
}

Step 4.b: Train XGBoost Model#

Since the data is already persisted on the dask workers in the cluster, the next steps should not take much time.

data_train = xgb.dask.DaskDMatrix(client, X_train, y_train)
tic = timer()
xgboost_output = xgb.dask.train(
    client, params, data_train, num_boost_round=params["num_boost_rounds"]
)
xgb_gpu_model = xgboost_output["booster"]
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")
Wall clock time taken for this cell : 9.483002611901611 s
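If you want to monitor the eval_metric during boosting, xgb.dask.train also accepts an evals list, just like the single-node xgb.train API. A hedged sketch that reuses the held-out split purely as an illustrative validation set:

# Optional: monitor RMSE on a validation set while training.
# data_valid = xgb.dask.DaskDMatrix(client, X_infer, y_infer)
# xgboost_output = xgb.dask.train(
#     client,
#     params,
#     data_train,
#     num_boost_round=params["num_boost_rounds"],
#     evals=[(data_valid, "validation")],
# )
# print(xgboost_output["history"])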

Step 4.c: Save the trained model to disk locally#

xgb_gpu_model
<xgboost.core.Booster at 0x7fcee8055bd0>
model_filename = "trained-model_nyctaxi.xgb"
xgb_gpu_model.save_model(model_filename)
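If you need the model again in a fresh session, it can be loaded back with the standard XGBoost API. A minimal sketch:

# Reload the saved model into a new Booster object.
# reloaded_model = xgb.Booster()
# reloaded_model.load_model(model_filename)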

Step 5: Predict & Score using vanilla XGBoost Predict#

Here we will use the predict and inplace_predict methods provided by the xgboost.dask library out of the box. Later, we will also use the Forest Inference Library (FIL) to perform prediction.

_y_test = y_infer.compute()
wait(_y_test)
DoneAndNotDoneFutures(done=set(), not_done=set())
d_test = xgb.dask.DaskDMatrix(client, X_infer)
tic = timer()
y_pred = xgb.dask.predict(client, xgb_gpu_model, d_test)
y_pred = y_pred.compute()
wait(y_pred)
toc = timer()
print(f"Wall clock time taken for xgb.dask.predict : {toc-tic} s")
Wall clock time taken for xgb.dask.predict : 1.5550181320868433 s

Inference with the inplace predict method of dask XGBoost#

tic = timer()
y_pred = xgb.dask.inplace_predict(client, xgb_gpu_model, X_infer)
y_pred = y_pred.compute()
wait(y_pred)
toc = timer()
print(f"Wall clock time taken for inplace inference : {toc-tic} s")
Wall clock time taken for inplace inference : 1.8849179210374132 s
tic = timer()
print("Calculating MSE")
score = mean_squared_error(y_pred, _y_test)
print("Workflow Complete - RMSE: ", np.sqrt(score))
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")
Calculating MSE
Workflow Complete - RMSE:  2.2968235
Wall clock time taken for this cell : 0.009336891933344305 s

Step 6: Predict & Score using FIL or Forest Inference Library#

Forest Inference Library (FIL) provides GPU-accelerated inference capabilities for tree models. We will import the FIL functionality from the cuML library.

It accepts a trained tree model in Treelite format (currently LightGBM, XGBoost and scikit-learn GBDT and random forest models are supported). In general, using FIL allows for faster inference when using a large number of workers, and the latency benefits become more pronounced as the dataset grows.

Step 6.a: Predict using compute on a single worker in case the test dataset is small.#

As noted in Step 3.b, if the test dataset is huge, it makes sense to call predict individually on the dask workers instead of bringing the entire test dataset to the local machine.
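For completeness, the small-dataset path mentioned in the heading would look roughly like the sketch below: bring X_infer to the local GPU and run FIL there. This is only an illustration and assumes the local machine has a GPU with enough memory for the test set:

# Local (single-machine) FIL prediction sketch for small test sets.
# local_fil_model = ForestInference.load(model_filename, model_type="xgboost")
# X_infer_local = X_infer.compute()  # pull the dask_cudf frame into a local cudf DataFrame
# y_pred_local = local_fil_model.predict(X_infer_local)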

To perform prediction individually on the dask workers, each dask worker needs to load the XGBoost model using FIL. However, the dask workers are remote and do not have access to the locally saved model, so we need to send the locally saved XGBoost model to the dask workers.

Persist the local model in the remote dask workers#

# the code below will read the locally saved xgboost model
# in binary format and write a copy of it to all dask workers
def read_model(path):
    """Read model file into memory."""
    with open(path, "rb") as fh:
        return fh.read()


def write_model(path, data):
    """Write model file to disk."""
    with open(path, "wb") as fh:
        fh.write(data)
    return path


model_data = read_model("trained-model_nyctaxi.xgb")

# Tell all the workers to write the model to disk
client.run(write_model, "/tmp/model.dat", model_data)


# this code reads the binary file in worker directory
# and loads the model via FIL for prediction
def predict_model(input_df):
    import xgboost as xgb
    from cuml import ForestInference

    # load xgboost model using FIL and make prediction
    fm = ForestInference.load("/tmp/model.dat", model_type="xgboost")
    print(fm)
    pred = fm.predict(input_df)

    return pred

Inference with distributed predict with FIL#

tic = timer()
predictions = X_infer.map_partitions(
    predict_model, meta="float"
)  # runs predict_model on each partition, on the worker holding that partition
y_pred = predictions.compute()
wait(y_pred)
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")
/home/skirui/anaconda3/envs/rapids-23.08/lib/python3.10/site-packages/dask/dataframe/core.py:7047: FutureWarning: Meta is not valid, `map_partitions` and `map_overlap` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(
Wall clock time taken for this cell : 5.638823717948981 s
rows_csv = X_infer.iloc[:, 0].shape[0].compute()
print(
    f"It took {toc-tic} seconds to predict on {rows_csv} rows using FIL distributed across the workers"
)
It took 5.638823717948981 seconds to predict on 5426301 rows using FIL distributed across the workers
tic = timer()
score = mean_squared_error(y_pred, _y_test)
toc = timer()
print("Final - RMSE: ", np.sqrt(score))
Final - RMSE:  2.2968235

Step 7: Clean up#

client.close()
cluster.close()
Terminated VM dask-92c5978e-worker-54f8d057
Terminated VM dask-92c5978e-worker-9f9a9c9b
Removed disks for VM dask-92c5978e-worker-54f8d057
Removed disks for VM dask-92c5978e-worker-9f9a9c9b
Deleted network interface
Deleted network interface
Terminated VM dask-92c5978e-scheduler
Removed disks for VM dask-92c5978e-scheduler
Deleted network interface
Unassigned public IP