Actors

Actors are coroutine-based asynchronous relational operators that read from zero-or-more Channels and write to zero-or-more Channels within a Network.

C++

// sum the row counts of all incoming table chunks
rapidsmpf::streaming::Actor accumulator(
    std::shared_ptr<streaming::Context> ctx,
    std::shared_ptr<rapidsmpf::Channel> ch_in,
    std::shared_ptr<rapidsmpf::Channel> ch_out)
{
    co_await ctx->executor()->schedule();
    int64_t total = 0;
    while (true) {
        // read until the input channel is drained (signalled by an empty message)
        auto msg = co_await ch_in->receive();
        if (msg.empty()) {
            break;
        }
        auto seq = msg.sequence_number();
        auto chunk = co_await msg.release<rapidsmpf::streaming::TableChunk>()
                            .make_available(ctx);
        total += chunk.table_view().num_rows();

        // Forward the chunk downstream.
        co_await ch_out->send(
            streaming::to_message(seq, std::move(chunk))
        );
    }
}

Python

@define_actor()
async def accumulator(ctx, ch_in, ch_out):
    """Sum the row counts of all incoming table chunks."""

    total = 0
    msg: Message[TableChunk] | None
    # read until the input channel is drained (recv returns None)
    while (msg := await ch_in.recv(ctx)) is not None:
        # Convert the message into a TableChunk (releases the message).
        table = TableChunk.from_message(msg)
        total += table.table_view().num_rows()

        # Wrap and forward the chunk downstream.
        await ch_out.send(ctx, Message(msg.sequence_number, table))

    # Drain the output channel to close it gracefully.
    await ch_out.drain(ctx)

Examples of actors in C++ and Python
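To make the read/forward pattern above concrete without a GPU or the library installed, here is a minimal sketch that models the same three-stage network using only the Python standard library. It is an analogy, not the rapidsmpf API: `asyncio.Queue` stands in for a Channel, plain lists stand in for table chunks, and the `_DONE` sentinel, `producer`, `collector`, and `run_network` are hypothetical names introduced for illustration.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream sentinel (not part of rapidsmpf)


async def producer(ch_out, chunks):
    """Source actor: reads from zero channels and writes to one."""
    for seq, chunk in enumerate(chunks):
        await ch_out.put((seq, chunk))
    await ch_out.put(_DONE)


async def accumulator(ch_in, ch_out, totals):
    """Count the rows of every chunk and forward each chunk unchanged."""
    total = 0
    while (msg := await ch_in.get()) is not _DONE:
        seq, chunk = msg
        total += len(chunk)  # a list's length stands in for num_rows()
        await ch_out.put((seq, chunk))
    totals.append(total)
    await ch_out.put(_DONE)  # propagate end-of-stream downstream


async def collector(ch_in, received):
    """Sink actor: reads from one channel and writes to zero."""
    while (msg := await ch_in.get()) is not _DONE:
        received.append(msg)


async def run_network(chunks):
    # Bounded queues give backpressure: a fast producer waits for slow readers.
    ch1 = asyncio.Queue(maxsize=1)
    ch2 = asyncio.Queue(maxsize=1)
    totals, received = [], []
    await asyncio.gather(
        producer(ch1, chunks),
        accumulator(ch1, ch2, totals),
        collector(ch2, received),
    )
    return totals[0], received


def demo():
    return asyncio.run(run_network([[1, 2, 3], [4, 5], [6]]))
```

Calling `demo()` runs all three coroutines concurrently on one event loop. Each actor only knows about the channels it was handed, which is what lets actors be rearranged into different networks.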

Actor Types

Actors fall into two categories:

  • Local Actors: These include operations like filtering, projection, or column-wise transforms. They operate exclusively on local data and preserve CSP semantics.

  • Collective Actors: These include Collective Operations such as the Shuffler, joins, and groupby aggregations, which require access to distributed data.

In a collective Actor, remote communication is handled internally. For example, a shuffle actor may need to access all Partitions of a table, both local and remote, but this coordination and data exchange happens inside the CSP-process itself. As a reminder, Channels are an abstraction; they are not used to serialize and pass data between workers.

This hybrid model, which combines an SPMD-style distribution model and a local CSP-style streaming model, offers several advantages:

  • It enables clear process composition and streaming semantics for local operations.

  • It allows collective coordination to be localized inside CSP-processes, avoiding the need for global synchronization or a complete global task graph.

  • It makes inter-worker parallelism explicit through SPMD-style communication.
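The hybrid model can be sketched in a single process as follows. This is a toy simulation under stated assumptions, not the rapidsmpf implementation: every "rank" runs the same local pipeline (SPMD), the channels are `asyncio.Queue` objects that never cross ranks, and the hypothetical `FakeComm` class stands in for whatever communicator a real collective actor would use internally. Rows are plain integers, and a row with key `k` is owned by rank `k % nranks`.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream sentinel


class FakeComm:
    """Hypothetical stand-in for a communicator: one unbounded inbox per rank."""

    def __init__(self, nranks):
        self.nranks = nranks
        self.inboxes = [asyncio.Queue() for _ in range(nranks)]

    async def send(self, dest, payload):
        await self.inboxes[dest].put(payload)

    async def recv(self, rank):
        return await self.inboxes[rank].get()


async def source(ch_out, chunks):
    for chunk in chunks:
        await ch_out.put(chunk)
    await ch_out.put(_DONE)


async def shuffle_actor(rank, comm, ch_in, ch_out):
    """Collective actor: all cross-rank traffic stays inside this coroutine."""
    # Phase 1: route each local row to the rank that owns its key.
    while (rows := await ch_in.get()) is not _DONE:
        for key in rows:
            await comm.send(key % comm.nranks, key)
    # Tell every rank, including this one, that no more rows will come from here.
    for dest in range(comm.nranks):
        await comm.send(dest, _DONE)
    # Phase 2: collect rows until every rank has reported completion.
    finished, mine = 0, []
    while finished < comm.nranks:
        item = await comm.recv(rank)
        if item is _DONE:
            finished += 1
        else:
            mine.append(item)
    await ch_out.put(sorted(mine))
    await ch_out.put(_DONE)


async def sink(ch_in, out):
    while (rows := await ch_in.get()) is not _DONE:
        out.extend(rows)


async def run_rank(rank, comm, chunks):
    # Channels are local to this rank; only the shuffle actor touches comm.
    ch1, ch2 = asyncio.Queue(maxsize=1), asyncio.Queue(maxsize=1)
    out = []
    await asyncio.gather(
        source(ch1, chunks),
        shuffle_actor(rank, comm, ch1, ch2),
        sink(ch2, out),
    )
    return out


async def run_all(per_rank_chunks):
    comm = FakeComm(len(per_rank_chunks))
    return await asyncio.gather(
        *(run_rank(r, comm, c) for r, c in enumerate(per_rank_chunks))
    )


def demo_shuffle():
    # Two ranks, each starting with two local chunks of integer keys.
    return asyncio.run(run_all([[[0, 1, 2], [3]], [[4, 5], [6, 7]]]))
```

In this sketch the `source` and `sink` actors are unaware that a shuffle happened: they see only local channels, while the completion handshake between ranks is confined to `shuffle_actor`. No rank needs a global task graph; each simply runs the same pipeline.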

For examples of communication actors and collective operations, see the shuffle architecture page.