Quickstart#
Shuffle Basics#
rapidsmpf is designed as a service that plugs into other libraries. This means
it isn’t typically used as a standalone library, and is expected to operate in
some larger runtime.
Dask-cuDF Example#
rapidsmpf can be used with Dask-cuDF to shuffle a Dask DataFrame. This toy
example just loads the shuffled data into GPU memory. In practice, you would
reduce the output or write it to disk after shuffling.
import dask.distributed
import dask.dataframe as dd
from dask_cuda import LocalCUDACluster
from rapidsmpf.examples.dask import dask_cudf_shuffle
df = dask.datasets.timeseries().reset_index(drop=True).to_backend("cudf")
# RapidsMPF is compatible with `dask_cuda` workers.
# Use an rmm pool for optimal performance.
with LocalCUDACluster(rmm_pool_size=0.8) as cluster:
with dask.distributed.Client(cluster) as client:
shuffled = dask_cudf_shuffle(df, on=["name"])
# collect the results in memory.
result = shuffled.compute()
After shuffling on name, all of the records with a particular name will be in
the same partition. See Dask Integration for more.
Streaming Engine#
Basic streaming pipeline example in Python. In this example we have 3 Nodes in the network: push_to_channel->count_num_rows->pull_from_channel.
note: push_to_channel/pull_from_channel are convenience functions which simulate scans/writes
def main() -> int:
"""Basic example of a streaming pipeline."""
# Initialize configuration options from environment variables.
options = Options(get_environment_variables())
# Create a context that will be used by all streaming nodes.
ctx = Context(
comm=single_process_comm(options),
br=BufferResource(RmmResourceAdaptor(rmm.mr.get_current_device_resource())),
options=options,
)
# Executor for Python nodes (asyncio coroutines).
py_executor = ThreadPoolExecutor(max_workers=1)
# Create some pylibcudf tables as input to the streaming pipeline.
tables = [
cudf_to_pylibcudf_table(cudf.DataFrame({"a": [1 * seq, 2 * seq, 3 * seq]}))
for seq in range(10)
]
# Wrap tables in TableChunk objects before sending them into the pipeline.
# A TableChunk contains a pylibcudf table, a sequence number, and a CUDA stream.
table_chunks = [
Message(
seq,
TableChunk.from_pylibcudf_table(
expect, DEFAULT_STREAM, exclusive_view=False
),
)
for seq, expect in enumerate(tables)
]
# Create input and output channels for table chunks.
ch1: Channel[TableChunk] = ctx.create_channel()
ch2: Channel[TableChunk] = ctx.create_channel()
# Node 1: producer that pushes messages into the pipeline.
# This is a native C++ node that runs as a coroutine with minimal Python overhead.
node1: CppNode = push_to_channel(ctx, ch_out=ch1, messages=table_chunks)
# Node 2: Python node that counts the total number of rows.
# Runs as a Python coroutine (asyncio), which comes with overhead,
# but releases the GIL on `await` and when calling into C++ APIs.
@define_py_node()
async def count_num_rows(
ctx: Context, ch_in: Channel, ch_out: Channel, total_num_rows: list[int]
) -> None:
assert len(total_num_rows) == 1, "should be a scalar"
msg: Message[TableChunk] | None
while (msg := await ch_in.recv(ctx)) is not None:
# Convert the message back into a table chunk (releases the message).
table = TableChunk.from_message(msg)
# Accumulate the number of rows.
total_num_rows[0] += table.table_view().num_rows()
# The message is now empty since it was released.
assert msg.empty()
# Wrap the table chunk in a new message.
msg = Message(msg.sequence_number, table)
# Forward the message to the output channel.
await ch_out.send(ctx, msg)
# `msg == None` indicates the channel is closed, i.e. we are done.
# Before exiting, drain the output channel to close it gracefully.
await ch_out.drain(ctx)
# Nodes return None, so if we want an "output" value we can use either a closure
# or an output parameter like `total_num_rows`.
total_num_rows = [0] # Wrap scalar in a list to make it mutable in-place.
node2: PyNode = count_num_rows(
ctx, ch_in=ch1, ch_out=ch2, total_num_rows=total_num_rows
)
# Node 3: consumer that pulls messages from the pipeline.
# Like push_to_channel(), it returns a CppNode. It also returns a placeholder
# object that will be populated with the pulled messages after execution.
node3, out_messages = pull_from_channel(ctx, ch_in=ch2)
# Run all nodes. This blocks until every node has completed.
run_streaming_pipeline(
nodes=(
node1,
node2,
node3,
),
py_executor=py_executor,
)
# Collect and verify results.
expect = 0
for msg in out_messages.release():
table = TableChunk.from_message(msg).table_view()
expect += table.num_rows()
assert total_num_rows[0] == expect
return total_num_rows[0]
if __name__ == "__main__":
print(f"total_num_rows: {main()}")