Nodes#

Nodes are coroutine-based asynchronous relational operators that read from zero-or-more Channels and write to zero-or-more Channels within a Network.

C++

// sum a column
rapidsmpf::streaming::Node accumulator(
    std::shared_ptr<rapidsmpf::Channel> ch_out,
    std::shared_ptr<rapidsmpf::Channel> ch_in)
{
    int64_t total = 0;
    while (true) {
        // continuously read until channel is empty
        auto msg = co_await ch_in->recv();
        if (!msg) {
            break;
        }

        auto column = ... // get column from data buffer in message

        total += column->sum<int64_t>();
    }

    // Send the accumulated result downstream as a message
    co_await ch_out->send(total));
}

Python

async def accumulator(ch_out, ch_in, msg):
    """Sum Column"""

    total = 0
    # continuously read until channel is empty
    while (msg := await ch_in is not None:)
        col = ... # get column from data buffer in message
        total += sum(col)

    # Send the accumulated result downstream as a message
    send(total, ch_out)

examples of nodes in C++ and Python

Node Types#

Nodes fall into two categories:

  • Local Nodes: These include operations like filtering, projection, or column-wise transforms. They operate exclusively on local data and preserve CSP semantics.

  • Collective Nodes: These include operations like shuffle, join, groupby aggregations, etc. which require access to distributed data.

In the case of a collective nodes, remote communication is handled internally. For example, a shuffle node may need to access all partitions of a table, both local and remote, but this coordination and data exchange happens inside the CSP-process itself. As a reminder “Channels” are an abstraction and not used to serialize and pass data between workers

This hybrid model, which combines a SPMD-style distribution model and a local CSP-style streaming model, offers several advantages:

  • It enables clear process composition and streaming semantics for local operations.

  • It allows collective coordination to be localized inside CSP-processes, avoiding the need for global synchronization or a complete global task graph.

  • It makes inter-worker parallelism explicit through SPMD-style communication.

For examples of communication nodes and collective operations please read the shuffle architecture page