Channels#

Channels are asynchronous messaging queues used to move Messages between Actors in the rapidsmpf streaming Network.

As buffers move through the network, the channels (arrows) move from empty (dashed line) to full (solid line).
┌─────────────────────────────────────────────────────────────────────────┐
│                          STREAMING NETWORK                              │
│                                                                         │
│  ┌──────────┐        ┌───────────┐         ┌──────────┐                 │
│  │  Actor 1 │ ──ch1─>│  Actor 2  │ ──ch2─> │  Actor 3 │                 │
│  │(Producer)│        │(Transform)│         │(Consumer)│                 │
│  └──────────┘        └───────────┘         └──────────┘                 │
│       │                    │                     │                      │
│    Message              Message               Message                   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

fig: example streaming network with 3 Actors and 2 Channels

Components:

- Actor: a coroutine that processes Messages
- Channel: an async queue connecting Actors
- Message: a GPU Buffer paired with a CUDA Stream

In the above network, moving data in and out of Channels on a single GPU is relatively cheap, nearly free. This strategy of using Channels to move Messages/Buffers is a core rapidsmpf technique for overlapping scans, compute, Spilling, and communication.
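The three-actor pipeline above can be sketched with plain asyncio, using bounded queues as stand-ins for rapidsmpf Channels. This is an illustrative model only, not the rapidsmpf API; the `producer`, `transform`, and `consumer` names are hypothetical:

```python
import asyncio

async def producer(ch_out):
    # Source actor: emits chunks into its output channel.
    for i in range(3):
        await ch_out.put(i)
    await ch_out.put(None)  # sentinel: end of stream

async def transform(ch_in, ch_out):
    # Middle actor: reads from one channel, writes to the next.
    while (msg := await ch_in.get()) is not None:
        await ch_out.put(msg * 2)
    await ch_out.put(None)

async def consumer(ch_in, results):
    # Sink actor: drains its input channel.
    while (msg := await ch_in.get()) is not None:
        results.append(msg)

async def main():
    # maxsize=1 mimics the single-slot Channel described below.
    ch1, ch2 = asyncio.Queue(maxsize=1), asyncio.Queue(maxsize=1)
    results = []
    await asyncio.gather(producer(ch1), transform(ch1, ch2), consumer(ch2, results))
    return results

print(asyncio.run(main()))  # [0, 2, 4]
```

All three actors run concurrently on one event loop; the bounded queues are what let downstream stages pace upstream ones.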

Backpressure#

Channels provide asynchronous communication with backpressure. Backpressure is built in by limiting the number of buffers in a Channel to a single slot.

Producer Side:                    Consumer Side:
┌──────────────┐                 ┌──────────────┐
│   Producer   │                 │   Consumer   │
│              │                 │              │
│ send(msg) ───┼────> Channel ───┼───> receive()│
│   (async)    │     [buffer]    │    (async)   │
└──────────────┘                 └──────────────┘
      │                                  │
      │ If channel is full,              │
      │ suspends (backpressure)          │
      ▼                                  ▼
   Resumes when                      Operates when
   space available                   data available

Key Properties:

- Non-blocking: coroutines suspend, not threads
- Backpressure: slow consumers throttle producers
- Type-safe: Messages are type-erased but validated

A Channel is "full" when its internal ring buffer (`coro::ring_buffer<Message, 1> rb_;`) has reached capacity.
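The effect of a single-slot buffer can be observed with an `asyncio.Queue(maxsize=1)` standing in for the ring buffer (an illustrative sketch, not rapidsmpf code):

```python
import asyncio

async def demo():
    ch = asyncio.Queue(maxsize=1)  # single slot, like ring_buffer<Message, 1>
    events = []

    async def producer():
        for i in range(2):
            # The second put suspends until the consumer frees the slot.
            await ch.put(i)
            events.append(f"sent {i}")

    async def consumer():
        await asyncio.sleep(0)  # let the producer run first
        for _ in range(2):
            msg = await ch.get()
            events.append(f"recv {msg}")

    await asyncio.gather(producer(), consumer())
    # "sent 1" can only appear after "recv 0" has freed the slot.
    return events

print(asyncio.run(demo()))
```

The producer is suspended, not blocked: the thread stays free to run other coroutines while the slot is occupied.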

Additional backpressure control can be applied with a throttling system: semaphores that cap the maximum number of threads/concurrent operations.

Python:

```python
import asyncio

# Allow at most 4 concurrent send operations.
throttle = asyncio.Semaphore(4)

async with throttle:
    msg = Message(chunk)
    await ch_out.send(msg)
```
C++:

```cpp
// Wrap the channel so producers must hold one of 4 tickets to write.
auto throttle = std::make_shared<ThrottlingAdaptor>(ch, 4);

std::vector<Actor> producers;
constexpr int n_producer{100};
for (int i = 0; i < n_producer; i++) {
    producers.push_back(producer(ctx, throttle, i));
}
```

Internally, when a throttle is in use, an Actor that writes into a Channel must first acquire a ticket granting permission to write. The write/send then returns a receipt that grants permission to release the ticket. The consumer of a throttled channel reads Messages as usual, so the throttle is localised to the producer actors.
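The ticket/receipt idea can be sketched with a semaphore wrapped around a channel. `ThrottledChannel`, `acquire_ticket`, `send`, and `receive` below are hypothetical names for illustration, not the rapidsmpf API:

```python
import asyncio

class ThrottledChannel:
    """Sketch of the ticket/receipt pattern (illustrative only)."""

    def __init__(self, channel, max_tickets):
        self._channel = channel                        # underlying async queue
        self._tickets = asyncio.Semaphore(max_tickets)

    async def acquire_ticket(self):
        # Permission to produce; bounds in-flight producer work.
        await self._tickets.acquire()

    async def send(self, msg):
        # Write requires a previously acquired ticket; releasing it
        # here plays the role of redeeming the receipt.
        await self._channel.put(msg)
        self._tickets.release()

    async def receive(self):
        # Consumers read without tickets: the throttle is producer-side only.
        return await self._channel.get()
```

Because `receive` never touches the semaphore, an arbitrary number of consumers can drain the channel while producers remain bounded.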

More simply, a throttling adaptor limits the number of Messages producers can write into a Channel concurrently. This pattern is valuable for producer Actors where we want bounded concurrency, particularly actors that might suspend before sending into a channel. It helps minimize over-production of long-lived memory in operations such as reads/scans and shuffles.

e.g. a source actor that reads files. A ThrottlingAdaptor allows the Actor to delay reading a file until it has acquired a ticket to send a Message to the Channel. In comparison, with a non-throttled channel the actor suspends during send, by which time the file has already been loaded into memory unnecessarily.
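A sketch of such a source actor, assuming hypothetical names (`throttled_source`, a plain semaphore for the tickets, and a string stand-in for file contents):

```python
import asyncio

async def throttled_source(ch, tickets, paths, log):
    # Hypothetical source actor: the ticket is acquired *before* the
    # read, so no file sits in memory while waiting for channel space.
    for path in paths:
        await tickets.acquire()
        log.append(f"read {path}")            # stand-in for an expensive read
        await ch.put(f"<contents of {path}>")
        tickets.release()                      # ticket released once the send lands

async def demo_source():
    ch = asyncio.Queue()          # unbounded here; the semaphore does the limiting
    tickets = asyncio.Semaphore(2)
    log = []
    await throttled_source(ch, tickets, ["a.pq", "b.pq"], log)
    return log, ch.qsize()
```

The key ordering is acquire-then-read-then-send; an unthrottled actor would read first and only discover afterwards, inside `send`, that it has to wait.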