Streaming execution#

In addition to communication primitives, rapidsmpf provides building blocks for constructing and executing streaming pipelines for use in data processing engines. The communication primitives can be used without the streaming execution framework, and the execution framework does not necessarily require the rapidsmpf communication primitives.

The goal is to enable pipelined “out of core” execution for tabular data processing workloads where one is processing data that does not all fit in GPU memory at once.

The term streaming is somewhat overloaded. In rapidsmpf, we mean execution on fixed size input data that we process piece by piece because it does not all fit in GPU memory at once, or we want to leverage multi-GPU parallelism and task launch pipelining.

This contrasts with “streaming analytics” or “event stream processing” where online queries are run on continuously arriving data.

Concepts#

The abstract framework we use is broadly that of Hoare’s Communicating Sequential Processes. Actors in the network are long-lived coroutines that read from zero or more Channels and write to zero or more Channels. In this sense, the programming model is relatively close to the actor model.

The communication channels are bounded capacity, multi-producer multi-consumer queues. An Actor processing data from an input Channel pulls data as necessary until the channel is empty, and can optionally signal that it needs no more data (thus shutting the producer down).
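The bounded-queue and shutdown semantics described above can be sketched in plain Python. This is an illustrative model only: the `Channel` class and its `send`/`recv`/`shutdown` methods are hypothetical names for this sketch, not the actual rapidsmpf API.

```python
import asyncio


class Channel:
    """Illustrative bounded multi-producer, multi-consumer channel.

    A bounded queue gives backpressure: a producer awaiting ``send`` on a
    full channel is suspended until a consumer drains an item. A consumer
    that needs no more data calls ``shutdown`` to stop producers.
    """

    def __init__(self, capacity: int):
        self._queue = asyncio.Queue(maxsize=capacity)  # bounded => backpressure
        self._shutdown = asyncio.Event()

    async def send(self, msg) -> bool:
        # Returns False once a consumer has signalled shutdown, so the
        # producer knows to stop generating data.
        if self._shutdown.is_set():
            return False
        await self._queue.put(msg)  # suspends while the channel is full
        return True

    async def recv(self):
        return await self._queue.get()

    def shutdown(self):
        # Consumer-side signal: "I need no more data."
        self._shutdown.set()
```

The key design point mirrored here is that capacity bounds are what keep an out-of-core pipeline from buffering an entire dataset between two actors.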

Communication between actors in the same process occurs through Channels. In contrast, communication between processes uses the lower-level rapidsmpf communication primitives. Forward progress of the network is therefore a local property, as long as each actor internally obeys its logically collective semantics.

The recommended usage to target multiple GPUs is to have one process per GPU, tied together by a rapidsmpf Communicator.

Building Actor networks from query plans#

Actor networks are designed to be lowered from a higher-level, application-specific intermediate representation, though they can also be written by hand. For example, one can convert logical plans from query engines such as Polars or DuckDB into a physical plan to be executed by rapidsmpf.

A typical approach is to define one Actor in the network for each physical operation in the query plan. Parallelism is obtained by using a multi-threaded executor to run the resulting concurrent actors.

For use with data processing engines, we provide a number of utility actors that layer a streaming (out of core) execution model over the GPU-accelerated libcudf library.

+------+     +--------+     +--------+     +------+
| Scan | --> | Select | --> | Filter | --> | Sink |
+------+     +--------+     +--------+     +------+

A typical rapidsmpf Network of Actors

Once constructed, the Network of Actors and their connecting Channels remains in place for the duration of the workflow. Each actor continuously awaits new data, activating as soon as inputs are ready and forwarding results downstream via the channels to the next actor(s) in the network.
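The Scan → Select → Filter → Sink network pictured above can be sketched with plain asyncio coroutines standing in for Actors and bounded queues standing in for Channels. All names below are hypothetical illustrations, and a real rapidsmpf network would operate on libcudf tables rather than the Python dict-of-lists "chunks" used here.

```python
import asyncio


async def scan(out: asyncio.Queue, chunks):
    # Source actor: emit fixed-size chunks, then a None sentinel to close.
    for chunk in chunks:
        await out.put(chunk)
    await out.put(None)


async def select(inp: asyncio.Queue, out: asyncio.Queue, columns):
    # Project each chunk down to the requested columns.
    while (chunk := await inp.get()) is not None:
        await out.put({c: chunk[c] for c in columns})
    await out.put(None)


async def filter_actor(inp: asyncio.Queue, out: asyncio.Queue, predicate):
    # Keep only rows of column "x" satisfying the predicate.
    while (chunk := await inp.get()) is not None:
        mask = [predicate(v) for v in chunk["x"]]
        await out.put(
            {c: [v for v, keep in zip(vals, mask) if keep]
             for c, vals in chunk.items()}
        )
    await out.put(None)


async def sink(inp: asyncio.Queue, results: list):
    # Terminal actor: collect finished chunks.
    while (chunk := await inp.get()) is not None:
        results.append(chunk)


async def run_pipeline(chunks):
    # Bounded queues between actors provide backpressure, so at most a few
    # chunks are in flight at once -- the essence of out-of-core execution.
    a, b, c = (asyncio.Queue(maxsize=2) for _ in range(3))
    results = []
    await asyncio.gather(
        scan(a, chunks),
        select(a, b, ["x"]),
        filter_actor(b, c, lambda v: v > 0),
        sink(c, results),
    )
    return results
```

All four actors run concurrently for the lifetime of the workflow, each activating as soon as its input queue has data, which matches the long-lived network behaviour described above.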

Key Concepts#

The streaming engine is built around these core concepts (see the Glossary for complete definitions):

  • Network - A set of Actors connected by Channels

  • Actor - A coroutine-based asynchronous operator (e.g. read, filter, select, join)

  • Channel - Asynchronous messaging queues with backpressure

  • Message - A type-erased container for data payloads

  • Communicator - The collective group of processes cooperating to produce the result

  • Context - Provides access to resources (BufferResource, etc.)

  • Buffer - Raw memory allocations with attached CUDA streams