Quickstart#
Shuffle Basics#
rapidsmpf is designed as a service that plugs into other libraries. This means
it isn’t typically used as a standalone library, and is expected to operate in
some larger runtime.
Streaming Engine#
Basic streaming pipeline example in Python. In this example we have 3 Actors in the Network: push_to_channel->count_num_rows->pull_from_channel.
note: push_to_channel/pull_from_channel are convenience functions which simulate scans/writes
def main() -> int:
"""Basic example of a streaming graph."""
# Initialize configuration options from environment variables.
options = Options(get_environment_variables())
# Create a communicator and context that will be used by all streaming actors.
comm = single_process_comm(options, ProgressThread())
ctx = Context(
logger=comm.logger,
br=BufferResource(rmm.mr.get_current_device_resource()),
options=options,
)
# Create some pylibcudf tables as input to the streaming graph.
tables = [
pylibcudf.Table(
[
pylibcudf.Column.from_iterable_of_py(
[1 * seq, 2 * seq, 3 * seq],
pylibcudf.DataType(pylibcudf.TypeId.INT64),
)
]
)
for seq in range(10)
]
# Wrap tables in TableChunk objects before sending them into the graph.
# A TableChunk contains a pylibcudf table, a sequence number, and a CUDA stream.
table_chunks = [
Message(
seq,
TableChunk.from_pylibcudf_table(
expect, DEFAULT_STREAM, exclusive_view=False, br=ctx.br()
),
)
for seq, expect in enumerate(tables)
]
# Create input and output channels for table chunks.
ch1: Channel[TableChunk] = ctx.create_channel()
ch2: Channel[TableChunk] = ctx.create_channel()
# Actor 1: producer that pushes messages into the graph.
# This is a native C++ actor that runs as a coroutine with minimal Python overhead.
actor1: CppActor = push_to_channel(ctx, ch_out=ch1, messages=table_chunks)
# Actor 2: Python actor that counts the total number of rows.
# Runs as a Python coroutine (asyncio), which comes with overhead,
# but releases the GIL on `await` and when calling into C++ APIs.
@define_actor()
async def count_num_rows(
ctx: Context, ch_in: Channel, ch_out: Channel, total_num_rows: list[int]
) -> None:
assert len(total_num_rows) == 1, "should be a scalar"
msg: Message[TableChunk] | None
while (msg := await ch_in.recv(ctx)) is not None:
# Convert the message back into a table chunk (releases the message).
table = TableChunk.from_message(msg, br=ctx.br())
# Accumulate the number of rows.
total_num_rows[0] += table.table_view().num_rows()
# The message is now empty since it was released.
assert msg.empty()
# Wrap the table chunk in a new message.
msg = Message(msg.sequence_number, table)
# Forward the message to the output channel.
await ch_out.send(ctx, msg)
# `msg == None` indicates the channel is closed, i.e. we are done.
# Before exiting, drain the output channel to close it gracefully.
await ch_out.drain(ctx)
# Actors return None, so if we want an "output" value we can use either a closure
# or an output parameter like `total_num_rows`.
total_num_rows = [0] # Wrap scalar in a list to make it mutable in-place.
actor2: Awaitable[None] = count_num_rows(
ctx, ch_in=ch1, ch_out=ch2, total_num_rows=total_num_rows
)
# Actor 3: consumer that pulls messages from the graph.
# Like push_to_channel(), it returns a CppActor. It also returns a placeholder
# object that will be populated with the pulled messages after execution.
actor3, out_messages = pull_from_channel(ctx, ch_in=ch2)
# Run all actors. This blocks until every actor has completed.
run_actor_network(
ctx,
actors=(
actor1,
actor2,
actor3,
),
)
# Collect and verify results.
expect = 0
for msg in out_messages.release():
table = TableChunk.from_message(msg, br=ctx.br()).table_view()
expect += table.num_rows()
assert total_num_rows[0] == expect
# Shut down the context explicitly to ensure it happens on the same thread that
# created it. Alternatively, use `with Context(...) as ctx:` to shut it down
# automatically.
ctx.shutdown()
return total_num_rows[0]
if __name__ == "__main__":
print(f"total_num_rows: {main()}")