8 #include <unordered_set>
10 #include <rapidsmpf/shuffler/shuffler.hpp>
11 #include <rapidsmpf/streaming/core/actor.hpp>
12 #include <rapidsmpf/streaming/core/channel.hpp>
13 #include <rapidsmpf/streaming/core/context.hpp>
14 #include <rapidsmpf/streaming/cudf/partition.hpp>
68 std::shared_ptr<Context>
ctx,
69 std::shared_ptr<Communicator>
comm,
87 [[nodiscard]] constexpr std::shared_ptr<
Context> const&
ctx()
const {
96 [[nodiscard]] std::shared_ptr<Communicator>
const&
comm() const noexcept {
97 return shuffler_.
comm();
123 void insert(std::unordered_map<shuffler::PartID, PackedData>&& chunks);
160 [[nodiscard]] coro::task<std::optional<std::vector<PackedData>>>
extract_async(
203 [[nodiscard]]
Actor finished_drain();
205 std::shared_ptr<Context> ctx_;
206 coro::task_group<coro::thread_pool>
221 std::unordered_set<shuffler::PartID> ready_pids_;
222 std::unordered_set<shuffler::PartID> extracted_pids_;
248 std::shared_ptr<Context> ctx,
249 std::shared_ptr<Communicator> comm,
250 std::shared_ptr<Channel> ch_in,
251 std::shared_ptr<Channel> ch_out,
Shuffle service for cuDF tables.
std::function< Rank(std::shared_ptr< Communicator > const &, PartID, PartID)> PartitionOwner
Function that given a Communicator, PartID, and total partition count, returns the rapidsmpf::Rank of...
PartID const total_num_partitions
Total number of partitions in the shuffle.
static Rank round_robin(std::shared_ptr< Communicator > const &comm, PartID pid, [[maybe_unused]] PartID total_num_partitions)
A PartitionOwner that distributes partitions using round robin assignment.
std::shared_ptr< Communicator > const & comm() const noexcept
Gets the communicator associated with this Shuffler.
PartitionOwner const partition_owner
Function to determine partition ownership.
Context for actors (coroutines) in rapidsmpf.
An asynchronous shuffler that allows concurrent insertion and extraction of data.
constexpr std::shared_ptr< Context > const & ctx() const
Gets the streaming context associated with this shuffler.
std::shared_ptr< Communicator > const & comm() const noexcept
Gets the communicator associated with this shuffler.
constexpr shuffler::Shuffler::PartitionOwner const & partition_owner() const
Gets the partition owner function used by this shuffler.
ShufflerAsync(std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
Constructs a new ShufflerAsync instance.
coro::task< std::optional< ExtractResult > > extract_any_async()
Asynchronously extracts data for any ready partition.
Actor insert_finished()
Insert a finish mark for a list of partitions.
constexpr shuffler::PartID total_num_partitions() const
Gets the total number of partitions for this shuffle operation.
void insert(std::unordered_map< shuffler::PartID, PackedData > &&chunks)
Insert a bunch of packed (serialized) chunks into the shuffle.
std::pair< shuffler::PartID, std::vector< PackedData > > ExtractResult
Result type for extract_any_async operations.
std::span< shuffler::PartID const > local_partitions() const
Returns the local partition IDs owned by the current node.
coro::task< std::optional< std::vector< PackedData > > > extract_async(shuffler::PartID pid)
Asynchronously extracts all data for a specific partition.
std::uint32_t PartID
Partition ID, which goes from 0 to the total number of partitions.
Actor shuffler(std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
Launches a shuffler actor for a single shuffle operation.
coro::task< void > Actor
Alias for an actor in a streaming graph.
coro::semaphore< std::numeric_limits< std::ptrdiff_t >::max()> Semaphore
An awaitable semaphore to manage acquisition and release of finite resources.
std::int32_t OpID
Operation ID defined by the user. This allows users to concurrently execute multiple operations,...