SPMD

SPMDEngine runs the streaming executor in SPMD mode: the same Python script runs once per GPU, and each process owns its local data fragment. Collective operations (shuffles, allgathers, joins) coordinate across processes to produce a globally consistent result.

On startup, SPMDEngine pins the process to the CPU cores and NUMA node closest to its GPU. Under rrun this binding is delegated to the launcher; outside rrun (single-process mode) SPMDEngine performs it itself. See HardwareBindingPolicy to override this behaviour.
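To check the CPU side of this binding from inside a running script, you can read the process's CPU affinity mask. The sketch below uses only the Python standard library (`os.sched_getaffinity`, which exists on Linux but not on every platform). It is not part of the SPMDEngine API, and it does not report NUMA memory placement. Call it inside the `with SPMDEngine()` block, so that any binding SPMDEngine performs itself has already happened.

```python
import os
from typing import Optional


def current_cpu_affinity() -> Optional[set[int]]:
    """Return the CPU cores this process may run on, or None if unsupported.

    os.sched_getaffinity is only available on some platforms (e.g. Linux),
    so the call is guarded with hasattr.
    """
    if not hasattr(os, "sched_getaffinity"):
        return None
    # pid 0 means "the calling process".
    return set(os.sched_getaffinity(0))


if __name__ == "__main__":
    cpus = current_cpu_affinity()
    if cpus is None:
        print("CPU affinity query is not supported on this platform")
    else:
        print(f"pid {os.getpid()} may run on {len(cpus)} cores: {sorted(cpus)}")
```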

Single-GPU setup

To use SPMDEngine on a single GPU, create the engine and run your Python script as normal. You still get the full streaming executor (partitioned inputs, spilling, scaling past device memory); you just don’t need any multi-process coordination:

# python my_script.py
import polars as pl
from cudf_polars.engine.spmd import SPMDEngine

with SPMDEngine() as engine:
    result = (
        pl.scan_parquet("/data/dataset/*.parquet")
          .filter(pl.col("amount") > 100)
          .group_by("customer_id")
          .agg(pl.col("amount").sum())
          .collect(engine=engine)
    )

With a single rank, the Query symmetry requirement and Collecting distributed results sections below do not apply: collect() returns the full result directly.

Multi-GPU with rrun

To run on more than one GPU, the same Python script must be launched collectively, and every process must know that it is participating in the cluster. This is the role of the rrun launcher: it starts one process per GPU, and SPMDEngine detects this and bootstraps a UCXX communicator across all ranks.

When the same script is launched without rrun, SPMDEngine falls back to a single-process, single-GPU communicator that requires no external communication library. This mode is useful for local development, unit tests, and single-GPU pipelines (see Single-GPU setup above).

# multi-GPU launch: rrun -n 4 python my_script.py
# single-GPU:       python my_script.py
import polars as pl
from cudf_polars.engine.spmd import SPMDEngine

with SPMDEngine() as engine:
    result = (
        pl.scan_parquet("/data/dataset/*.parquet")
          .filter(pl.col("amount") > 100)
          .group_by("customer_id")
          .agg(pl.col("amount").sum())
          .collect(engine=engine)
    )

File-based sources (scan_parquet, scan_csv, …) are automatically partitioned so that each rank reads a different file or row-group range. In-memory DataFrame objects are already rank-local, so each rank processes its own copy.
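The exact partitioning is internal to cudf-polars (as noted above, it can split by file or by row-group range), so what follows is only a mental model. `files_for_rank` is a hypothetical helper showing one simple scheme, round-robin over a sorted file list. Under this scheme every file is read by exactly one rank, and no rank needs to talk to the others to decide what to read:

```python
def files_for_rank(files: list[str], rank: int, nranks: int) -> list[str]:
    """Return the files a given rank would read under round-robin assignment.

    Hypothetical helper for illustration only: rank r gets files r,
    r + nranks, r + 2 * nranks, ... of the sorted file list.
    """
    if nranks < 1 or not 0 <= rank < nranks:
        raise ValueError(f"invalid rank={rank} for nranks={nranks}")
    # Sort first so that every rank agrees on the order, whatever order
    # the filesystem listing returned.
    return sorted(files)[rank::nranks]


files = [
    "part-3.parquet", "part-1.parquet", "part-0.parquet",
    "part-2.parquet", "part-4.parquet",
]
for rank in range(2):
    print(rank, files_for_rank(files, rank, nranks=2))
# 0 ['part-0.parquet', 'part-2.parquet', 'part-4.parquet']
# 1 ['part-1.parquet', 'part-3.parquet']
```

Sorting before slicing matters: if each rank used its own unsorted listing, two ranks could disagree on the order and read overlapping or missing files.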

Configuring SPMDEngine

For custom configuration, build a StreamingOptions instance and pass it to SPMDEngine.from_options():

import polars as pl
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.engine.spmd import SPMDEngine

opts = StreamingOptions(num_streaming_threads=8, fallback_mode="silent")

with SPMDEngine.from_options(opts) as engine:
    result = pl.scan_parquet("/data/dataset/*.parquet").collect(engine=engine)

See Configuration Options for the available fields.

SPMDEngine exposes a few properties that are useful in SPMD code:

  • engine.nranks / engine.rank: cluster size and local rank index.

  • engine.comm: the active rapidsmpf.communicator.Communicator.

  • engine.context: the active rapidsmpf.streaming.core.context.Context.

Query symmetry requirement

All ranks must execute the same sequence of queries in the same order. Collective operations are matched using internal operation IDs. If one rank executes a collective that another rank does not, the program will deadlock.

In practice:

  • Avoid rank-conditional collect() or sink*() calls.

  • Avoid branches that change the query graph.

  • Keep the client script deterministic.

import polars as pl
from cudf_polars.engine.spmd import SPMDEngine

# OK: every rank runs the same query in the same order.
with SPMDEngine() as engine:
    result = (
        pl.scan_parquet("/data/*.parquet")
          .group_by("customer_id")
          .agg(pl.col("amount").sum())
          .collect(engine=engine)
    )

# DEADLOCKS: rank 0 issues a group_by collective the other ranks never see.
with SPMDEngine() as engine:
    df = pl.scan_parquet("/data/*.parquet")
    if engine.rank == 0:        # don't do this
        df = df.group_by("customer_id").agg(pl.col("amount").sum())
    result = df.collect(engine=engine)
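The matching rule can be modelled with a toy function, assuming each rank's collectives can be listed in the order that rank issues them. `first_divergence` and the labels passed to it are hypothetical and only illustrate the rule; cudf-polars matches collectives by operation ID, not by label. The function returns the first position at which the ranks disagree, which is where a real run would block:

```python
from typing import Optional


def first_divergence(per_rank_ops: list[list[str]]) -> Optional[int]:
    """Return the index of the first collective the ranks do not all issue.

    Each inner list is the ordered sequence of collectives one rank issues.
    A collective can only complete when every rank issues the same one at
    the same position; the first position where that fails is where a real
    program would wait forever. Returns None if all sequences match.
    """
    if not per_rank_ops:
        return None
    longest = max(len(ops) for ops in per_rank_ops)
    for i in range(longest):
        # A rank that has already issued all its collectives contributes None.
        step = {ops[i] if i < len(ops) else None for ops in per_rank_ops}
        if len(step) != 1:
            return i
    return None


# Symmetric script: both ranks issue the same collectives in the same order.
print(first_divergence([["groupby-shuffle", "join-shuffle"]] * 2))  # None
# Rank-conditional group_by: only rank 0 issues the collective.
print(first_divergence([["groupby-shuffle"], []]))  # 0
```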

Collecting distributed results

Unlike RayEngine / DaskEngine, where .collect() gathers every partition to the client, here each rank’s .collect() returns only its own rank-local fragment. To keep processing the data rank-by-rank, use that fragment directly. To assemble the full dataset on every rank, use allgather_polars_dataframe():

import polars as pl
from cudf_polars.streaming.actor_graph.collectives.common import reserve_op_id
from cudf_polars.engine.spmd import (
    SPMDEngine,
    allgather_polars_dataframe,
)

with SPMDEngine() as engine:
    result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)

    with reserve_op_id() as op_id:
        full = allgather_polars_dataframe(
            engine=engine,
            local_df=result,
            op_id=op_id,
        )

op_id identifies the collective across ranks. All ranks must pass the same value. reserve_op_id() draws from the same pool that cudf-polars uses internally for shuffle and join collectives, so there is no risk of collision. Do not pass hardcoded integers: they may silently collide with an ID reserved by an active collective inside collect().
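A toy, single-process model makes the collision argument concrete. `OpIdPool` is hypothetical and is not how `reserve_op_id()` is implemented. It only assumes, as the paragraph above implies, that IDs come from one shared pool and are handed out deterministically, so ranks that reserve in the same order receive the same value:

```python
from contextlib import contextmanager
from typing import Iterator


class OpIdPool:
    """Hypothetical, single-process stand-in for a shared operation-ID pool."""

    def __init__(self, size: int) -> None:
        self._free = list(range(size))  # IDs not currently reserved
        self._in_use: set[int] = set()

    @contextmanager
    def reserve(self) -> Iterator[int]:
        if not self._free:
            raise RuntimeError("no free operation IDs")
        # Hand out the smallest free ID so that pools used in the same
        # order (one per rank) produce the same sequence of IDs.
        self._free.sort()
        op_id = self._free.pop(0)
        self._in_use.add(op_id)
        try:
            yield op_id
        finally:
            # Return the ID to the pool once the collective is done.
            self._in_use.discard(op_id)
            self._free.append(op_id)

    def in_use(self, op_id: int) -> bool:
        return op_id in self._in_use


pool = OpIdPool(size=4)
with pool.reserve() as internal_id:  # e.g. a shuffle inside collect()
    with pool.reserve() as user_id:  # your allgather, drawn from the same pool
        print(internal_id, user_id)  # 0 1: distinct by construction
    print(pool.in_use(0))  # True: a hardcoded 0 would collide
```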

The result is a pl.DataFrame containing rows from all ranks in rank order (rank 0 first, then rank 1, …, rank N-1).
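The ordering guarantee can be pictured with plain Python lists standing in for DataFrames. `simulate_allgather` is a hypothetical model of the semantics described above, not the helper's implementation:

```python
def simulate_allgather(fragments_by_rank: list[list[int]]) -> list[list[int]]:
    """Model allgather: entry r is what rank r receives.

    Every rank receives the same list: all fragments concatenated in rank
    order (rank 0's rows first, then rank 1's, and so on).
    """
    gathered = [row for fragment in fragments_by_rank for row in fragment]
    # Each rank gets its own full copy of the concatenated rows.
    return [list(gathered) for _ in fragments_by_rank]


received = simulate_allgather([[10, 11], [20], [30, 31]])
print(received[0])  # [10, 11, 20, 30, 31]
print(all(r == received[0] for r in received))  # True
```

Because every rank ends up holding every row, the gathered pl.DataFrame must fit in each rank's host memory; when it would not, keep working with the rank-local fragments instead.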

Reusing a communicator

By default, SPMDEngine bootstraps a new UCXX communicator every time it is constructed. When running multiple engines in sequence (for example in a test suite or interactive session), repeated bootstrapping is unnecessary and can race on the file-based coordination layer shared by all ranks.

Pass a pre-created communicator via comm= to skip the bootstrap entirely. The engine does not close the communicator on shutdown. The caller retains ownership and can reuse it across multiple SPMDEngine lifetimes:

import polars as pl
from rapidsmpf import bootstrap
from rapidsmpf.progress_thread import ProgressThread
from cudf_polars.engine.spmd import SPMDEngine

# Bootstrap once.
comm = bootstrap.create_ucxx_comm(progress_thread=ProgressThread())

# Example in-memory inputs; each rank holds its own copy.
df1 = pl.DataFrame({"x": [1, 2, 3]})
df2 = pl.DataFrame({"x": [4, 5, 6]})

# Reuse across multiple engine lifetimes, no re-bootstrap between them.
with SPMDEngine(comm=comm) as engine:
    result1 = df1.lazy().collect(engine=engine)

with SPMDEngine(comm=comm) as engine:
    result2 = df2.lazy().collect(engine=engine)
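The ownership rule can be illustrated with a toy model. `ToyComm` and `ToyEngine` are hypothetical stand-ins written for this sketch; they are not rapidsmpf or SPMDEngine classes. The default-path behaviour (the engine cleaning up a communicator it created itself) is an assumption of the model:

```python
from typing import Optional


class ToyComm:
    """Hypothetical communicator that only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ToyEngine:
    """Hypothetical engine following the rule: close only what you created."""

    def __init__(self, comm: Optional[ToyComm] = None) -> None:
        self._owns_comm = comm is None  # True only if we create it ourselves
        self.comm = ToyComm() if comm is None else comm

    def __enter__(self) -> "ToyEngine":
        return self

    def __exit__(self, *exc_info: object) -> None:
        if self._owns_comm:
            self.comm.close()


shared = ToyComm()
for _ in range(2):
    with ToyEngine(comm=shared):
        pass
print(shared.closed)  # False: the caller still owns it

with ToyEngine() as engine:
    owned = engine.comm
print(owned.closed)  # True: the engine created it, so it closed it
```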

Cluster diagnostics

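gather_cluster_info() returns placement information for every rank, as a list in rank order. Gathering this information may require every rank to take part, so call it on all ranks and restrict only the printing to rank 0:

from cudf_polars.engine.spmd import SPMDEngine

with SPMDEngine() as engine:
    infos = engine.gather_cluster_info()  # call on every rank
    if engine.rank == 0:
        for i, info in enumerate(infos):
            print(
                f"rank {i}: hostname={info['hostname']}, pid={info['pid']}, "
                f"CUDA_VISIBLE_DEVICES={info['cuda_visible_devices']}"
            )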
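On larger clusters it can help to summarize this output. The helpers below are hypothetical and operate on the list returned by gather_cluster_info(), assuming (as the loop above does) that entry i describes rank i and carries `hostname` and `cuda_visible_devices` keys. `shared_devices` flags ranks on the same host that report the same `CUDA_VISIBLE_DEVICES`; treat that as a warning sign only if your launcher is expected to give each process its own device through that variable.

```python
from collections import defaultdict


def ranks_per_host(infos: list[dict]) -> dict[str, list[int]]:
    """Map each hostname to the ranks running on it (entry i describes rank i)."""
    hosts: dict[str, list[int]] = defaultdict(list)
    for rank, info in enumerate(infos):
        hosts[info["hostname"]].append(rank)
    return dict(hosts)


def shared_devices(infos: list[dict]) -> dict[tuple, list[int]]:
    """Return (hostname, CUDA_VISIBLE_DEVICES) pairs reported by more than one rank."""
    seen: dict[tuple, list[int]] = defaultdict(list)
    for rank, info in enumerate(infos):
        seen[(info["hostname"], info["cuda_visible_devices"])].append(rank)
    return {key: ranks for key, ranks in seen.items() if len(ranks) > 1}


# Made-up sample input with the same keys as the loop above.
sample = [
    {"hostname": "node-a", "pid": 101, "cuda_visible_devices": "0"},
    {"hostname": "node-a", "pid": 102, "cuda_visible_devices": "1"},
    {"hostname": "node-b", "pid": 201, "cuda_visible_devices": "0"},
    {"hostname": "node-b", "pid": 202, "cuda_visible_devices": "0"},
]
print(ranks_per_host(sample))  # {'node-a': [0, 1], 'node-b': [2, 3]}
print(shared_devices(sample))  # {('node-b', '0'): [2, 3]}
```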