shuffler.hpp
1 
6 #pragma once
7 
8 #include <coro/event.hpp>
9 
10 #include <rapidsmpf/shuffler/shuffler.hpp>
11 #include <rapidsmpf/streaming/core/actor.hpp>
12 #include <rapidsmpf/streaming/core/channel.hpp>
13 #include <rapidsmpf/streaming/core/context.hpp>
14 #include <rapidsmpf/streaming/cudf/partition.hpp>
15 
16 namespace rapidsmpf::streaming {
17 
44  public:
61  std::shared_ptr<Context> ctx,
62  std::shared_ptr<Communicator> comm,
63  OpID op_id,
67  );
68 
69  // Prevent copying
70  ShufflerAsync(ShufflerAsync const&) = delete;
71  ShufflerAsync& operator=(ShufflerAsync const&) = delete;
72 
73  ~ShufflerAsync() noexcept;
74 
80  [[nodiscard]] constexpr std::shared_ptr<Context> const& ctx() const {
81  return ctx_;
82  }
83 
89  [[nodiscard]] std::shared_ptr<Communicator> const& comm() const noexcept {
90  return shuffler_.comm();
91  }
92 
98  [[nodiscard]] constexpr shuffler::PartID total_num_partitions() const {
99  return shuffler_.total_num_partitions;
100  }
101 
107  [[nodiscard]] constexpr shuffler::Shuffler::PartitionOwner const&
108  partition_owner() const {
109  return shuffler_.partition_owner;
110  }
111 
113  [[nodiscard]] std::span<shuffler::PartID const> local_partitions() const;
114 
116  void insert(std::unordered_map<shuffler::PartID, PackedData>&& chunks);
117 
127  [[nodiscard]] Actor insert_finished();
128 
137  [[nodiscard]] std::vector<PackedData> extract(shuffler::PartID pid);
138 
139  private:
140  std::shared_ptr<Context> ctx_;
141  coro::event
142  event_{};
143  shuffler::Shuffler shuffler_;
144 };
145 
146 namespace actor {
168 [[nodiscard]] Actor shuffler(
169  std::shared_ptr<Context> ctx,
170  std::shared_ptr<Communicator> comm,
171  std::shared_ptr<Channel> ch_in,
172  std::shared_ptr<Channel> ch_out,
173  OpID op_id,
174  shuffler::PartID total_num_partitions,
176 );
177 
178 } // namespace actor
179 
180 } // namespace rapidsmpf::streaming
Shuffle service for all-to-all style communication of partitioned data.
Definition: shuffler.hpp:46
std::function< Rank(std::shared_ptr< Communicator > const &, PartID, PartID)> PartitionOwner
Function that, given a Communicator, PartID, and total partition count, returns the rapidsmpf::Rank of...
Definition: shuffler.hpp:53
PartID const total_num_partitions
Total number of partitions in the shuffle.
Definition: shuffler.hpp:320
static Rank round_robin(std::shared_ptr< Communicator > const &comm, PartID pid, [[maybe_unused]] PartID total_num_partitions)
A PartitionOwner that distributes partitions using round-robin assignment.
Definition: shuffler.hpp:63
std::shared_ptr< Communicator > const & comm() const noexcept
Gets the communicator associated with this Shuffler.
Definition: shuffler.hpp:183
PartitionOwner const partition_owner
Function to determine partition ownership.
Definition: shuffler.hpp:321
Context for actors (coroutines) in rapidsmpf.
Definition: context.hpp:41
An asynchronous shuffler that wraps the synchronous shuffler with a coroutine interface.
Definition: shuffler.hpp:43
constexpr std::shared_ptr< Context > const & ctx() const
Gets the streaming context associated with this shuffler.
Definition: shuffler.hpp:80
std::shared_ptr< Communicator > const & comm() const noexcept
Gets the communicator associated with this shuffler.
Definition: shuffler.hpp:89
constexpr shuffler::Shuffler::PartitionOwner const & partition_owner() const
Gets the partition owner function used by this shuffler.
Definition: shuffler.hpp:108
ShufflerAsync(std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
Constructs a new ShufflerAsync instance.
Actor insert_finished()
Signal that no more data will be inserted into the shuffle.
constexpr shuffler::PartID total_num_partitions() const
Gets the total number of partitions for this shuffle operation.
Definition: shuffler.hpp:98
void insert(std::unordered_map< shuffler::PartID, PackedData > &&chunks)
Insert a batch of packed (serialized) chunks into the shuffle.
std::span< shuffler::PartID const > local_partitions() const
Returns the local partition IDs owned by the current node.
std::vector< PackedData > extract(shuffler::PartID pid)
Extract all chunks belonging to the specified partition.
std::uint32_t PartID
Partition ID, which goes from 0 to the total number of partitions.
Definition: chunk.hpp:22
Actor shuffler(std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
Launches a shuffler actor for a single shuffle operation.
coro::task< void > Actor
Alias for an actor in a streaming graph.
Definition: actor.hpp:18
std::int32_t OpID
Operation ID defined by the user. This allows users to concurrently execute multiple operations,...