C++#

RapidsMPF exposes a full C++ API for building high-performance distributed GPU workloads without a Python runtime. The C++ layer is the foundation on which the Python bindings are built.

The C++ API reference is available at docs.rapids.ai/api/librapidsmpf/stable (nightly builds of the documentation are also published).

Coverage#

The C++ API provides access to all core RapidsMPF subsystems:

  • Communicator — MPI and UCXX backends for inter-process communication.

  • Shuffler — Out-of-core, distributed table shuffle service.

  • Streaming Engine — Asynchronous multi-GPU pipeline with Channels, Actors, and Messages.

  • Memory — BufferResource, spilling, pinned memory, and packed data utilities.

  • Config — Configuration options and environment-variable parsing (a short sketch follows this list).
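
As a small taste of the API, the sketch below illustrates the Config and Memory entries above: it parses configuration options from environment variables and wraps the current RMM device resource in a BufferResource. It reuses only calls that also appear in the complete shuffle example later on this page; includes are omitted here for brevity.

// Minimal sketch, not a complete program; see the full example below for context.
// Parse configuration options from the process environment.
rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

// Wrap the current RMM device memory resource in a BufferResource, which
// RapidsMPF components such as the Shuffler use for host and device buffers.
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();
rapidsmpf::BufferResource br{mr};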

Table Shuffle Service#

See Shuffle Architecture for an in-depth explanation of the shuffle design.

The following is a complete MPI program that uses the RapidsMPF shuffler:

#include <mpi.h>
#include <unistd.h>

#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

#include <cudf/hashing.hpp>
#include <cudf/table/table.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/resource_ref.hpp>

#include <rapidsmpf/communicator/mpi.hpp>
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/integrations/cudf/partition.hpp>
#include <rapidsmpf/memory/packed_data.hpp>
#include <rapidsmpf/shuffler/shuffler.hpp>
#include <rapidsmpf/statistics.hpp>

#include "../benchmarks/utils/random_data.hpp"

// An example of how to use the shuffler.
int main(int argc, char** argv) {
    // In this example we use the MPI backend. For convenience, rapidsmpf provides an
    // optional MPI-init function that initializes MPI with thread support.
    rapidsmpf::mpi::init(&argc, &argv);

    // Initialize configuration options from environment variables.
    rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

    // First, we have to create a Communicator, which we will use throughout the
    // example. Note that if you want to run multiple shuffles concurrently, each shuffle
    // should use its own Communicator backed by its own MPI communicator.
    std::shared_ptr<rapidsmpf::Communicator> comm =
        std::make_shared<rapidsmpf::MPI>(MPI_COMM_WORLD, options);

    // Create a statistics instance for the shuffler that tracks useful information.
    auto stats = std::make_shared<rapidsmpf::Statistics>();

    // Next, we create a progress thread on which the shuffler's event loop executes. A
    // single progress thread may be shared by multiple shufflers simultaneously.
    std::shared_ptr<rapidsmpf::ProgressThread> progress_thread =
        std::make_shared<rapidsmpf::ProgressThread>(comm->logger(), stats);

    // The Communicator provides a logger.
    auto& log = comm->logger();

    // We will use the same stream, memory, and buffer resource throughout the example.
    rmm::cuda_stream_view stream = cudf::get_default_stream();
    rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();
    rapidsmpf::BufferResource br{mr};

    // As input data, we use a helper function from the benchmark suite. It creates a
    // random cudf table with 2 columns and 100 rows. In this example, each MPI rank
    // creates its own local input. Here we only have one input per rank, but each rank
    // could contribute any number of inputs.
    cudf::table local_input = random_table(2, 100, 0, 10, stream, mr);

    // The total number of inputs equals the number of ranks, in this case.
    auto const total_num_partitions =
        static_cast<rapidsmpf::shuffler::PartID>(comm->nranks());

    // We create a new shuffler instance, which represents a single shuffle. It takes a
    // Communicator, a ProgressThread, an operation ID, the total number of partitions,
    // a BufferResource, and an "owner function" that maps partitions to their
    // destination ranks. All ranks must use the same owner function; in this example we
    // use the included round-robin owner function.
    rapidsmpf::shuffler::Shuffler shuffler(
        comm,
        progress_thread,
        0,  // op_id
        total_num_partitions,
        &br,
        rapidsmpf::shuffler::Shuffler::round_robin  // partition owner
    );

    // It is our responsibility to partition and pack (serialize) the input to the
    // shuffle; the shuffler only handles raw host and device buffers. However, it does
    // provide a convenience function that hash-partitions a cudf table and packs each
    // partition. The result is a mapping from `PartID`, a globally unique partition
    // identifier, to the corresponding packed partition.
    std::unordered_map<rapidsmpf::shuffler::PartID, rapidsmpf::PackedData> packed_inputs =
        rapidsmpf::partition_and_pack(
            local_input,
            {0},  // columns_to_hash
            static_cast<int>(total_num_partitions),
            cudf::hash_id::HASH_MURMUR3,
            cudf::DEFAULT_HASH_SEED,
            stream,
            &br
        );

    // Now, we can insert the packed partitions into the shuffler. This operation is
    // non-blocking, and we can keep inserting new input partitions. E.g., a pipeline
    // could read, hash-partition, pack, and insert one Parquet file at a time while the
    // distributed shuffle is processed underneath.
    shuffler.insert(std::move(packed_inputs));

    // When we are finished inserting to a specific partition, we tell the shuffler.
    // Again, this is non-blocking and should be done as soon as we know that we have no
    // more inputs for a specific partition. In this case, we are finished with all
    // partitions.
    for (rapidsmpf::shuffler::PartID i = 0; i < total_num_partitions; ++i) {
        shuffler.insert_finished(i);
    }

    // Vector to hold the local results of the shuffle operation.
    std::vector<std::unique_ptr<cudf::table>> local_outputs;

    // Wait for and process the shuffle results for each partition.
    while (!shuffler.finished()) {
        // Block until a partition is ready and retrieve its partition ID.
        rapidsmpf::shuffler::PartID finished_partition = shuffler.wait_any();

        // Extract the finished partition's data from the Shuffler.
        auto packed_chunks = shuffler.extract(finished_partition);

        // Unpack (deserialize) and concatenate the chunks into a single table using a
        // convenience function.
        local_outputs.push_back(
            rapidsmpf::unpack_and_concat(
                rapidsmpf::unspill_partitions(
                    std::move(packed_chunks), &br, rapidsmpf::AllowOverbooking::YES
                ),
                stream,
                &br
            )
        );
    }
    // At this point, `local_outputs` contains the local result of the shuffle.
    // Let's log the result.
    log.print("Finished shuffle with ", local_outputs.size(), " local output partitions");

    // Log the statistics report.
    log.print(stats->report());

    // Shut down the Shuffler explicitly, or let it go out of scope for cleanup.
    shuffler.shutdown();

    // Finalize the execution, `RAPIDSMPF_MPI` is a convenience macro that
    // checks for MPI errors.
    RAPIDSMPF_MPI(MPI_Finalize());
}
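
The example is an ordinary MPI program. After building it against MPI, cudf, and RapidsMPF (the exact build setup depends on your environment), it can be started with a standard MPI launcher; the binary name below is just a placeholder:

# Launch the example with 2 ranks (binary name is hypothetical)
mpirun -np 2 ./example_shuffle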

rrun — Distributed Launcher#

RapidsMPF includes rrun, a lightweight launcher that eliminates the MPI dependency for multi-GPU workloads. See Streaming execution for more on the programming model.

Build rrun#

cd cpp/build
cmake --build . --target rrun

Single-Node Launch#

# Launch 2 ranks on the local node
./tools/rrun -n 2 ./benchmarks/bench_comm -C ucxx -O all-to-all

# With verbose output and specific GPUs
./tools/rrun -v -n 4 -g 0,1,2,3 ./benchmarks/bench_comm -C ucxx