shuffler.hpp
1 
6 #pragma once
7 
8 #include <unordered_set>
9 
10 #include <rapidsmpf/shuffler/shuffler.hpp>
11 #include <rapidsmpf/streaming/core/channel.hpp>
12 #include <rapidsmpf/streaming/core/context.hpp>
13 #include <rapidsmpf/streaming/core/node.hpp>
14 #include <rapidsmpf/streaming/cudf/partition.hpp>
15 
16 namespace rapidsmpf::streaming {
17 
51  public:
67  std::shared_ptr<Context> ctx,
68  OpID op_id,
72  );
73 
74  // Prevent copying
75  ShufflerAsync(ShufflerAsync const&) = delete;
76  ShufflerAsync& operator=(ShufflerAsync const&) = delete;
77 
78  ~ShufflerAsync() noexcept;
79 
85  [[nodiscard]] constexpr std::shared_ptr<Context> const& ctx() const {
86  return ctx_;
87  }
88 
94  [[nodiscard]] constexpr shuffler::PartID total_num_partitions() const {
95  return shuffler_.total_num_partitions;
96  }
97 
103  [[nodiscard]] constexpr shuffler::Shuffler::PartitionOwner const&
104  partition_owner() const {
105  return shuffler_.partition_owner;
106  }
107 
109  [[nodiscard]] std::span<shuffler::PartID const> local_partitions() const;
110 
112  void insert(std::unordered_map<shuffler::PartID, PackedData>&& chunks);
113 
128  [[nodiscard]] Node insert_finished(std::vector<shuffler::PartID>&& pids);
129 
149  [[nodiscard]] coro::task<std::optional<std::vector<PackedData>>> extract_async(
150  shuffler::PartID pid
151  );
152 
158  using ExtractResult = std::pair<shuffler::PartID, std::vector<PackedData>>;
159 
175  [[nodiscard]] coro::task<std::optional<ExtractResult>> extract_any_async();
176 
177  private:
192  [[nodiscard]] Node finished_drain();
193 
194  std::shared_ptr<Context> ctx_;
195  coro::task_container<coro::thread_pool>
196  notifications_;
197  Semaphore semaphore_{0};
198  coro::latch
199  latch_;
200  std::mutex mtx_;
201  shuffler::Shuffler shuffler_;
202 
210  std::unordered_set<shuffler::PartID> ready_pids_;
211  std::unordered_set<shuffler::PartID> extracted_pids_;
212 };
213 
214 namespace node {
235 [[nodiscard]] Node shuffler(
236  std::shared_ptr<Context> ctx,
237  std::shared_ptr<Channel> ch_in,
238  std::shared_ptr<Channel> ch_out,
239  OpID op_id,
240  shuffler::PartID total_num_partitions,
242 );
243 
244 } // namespace node
245 
246 } // namespace rapidsmpf::streaming
Shuffle service for cuDF tables.
Definition: shuffler.hpp:47
static Rank round_robin(std::shared_ptr< Communicator > const &comm, PartID pid)
A PartitionOwner that distributes partitions using round robin.
Definition: shuffler.hpp:64
PartID const total_num_partitions
Total number of partitions in the shuffle.
Definition: shuffler.hpp:339
std::function< Rank(std::shared_ptr< Communicator >, PartID)> PartitionOwner
Function that, given a Communicator and a PartID, returns the rapidsmpf::Rank of the owning node.
Definition: shuffler.hpp:55
PartitionOwner const partition_owner
Function to determine partition ownership.
Definition: shuffler.hpp:340
Context for nodes (coroutines) in rapidsmpf.
Definition: context.hpp:25
An asynchronous shuffler that allows concurrent insertion and extraction of data.
Definition: shuffler.hpp:50
constexpr std::shared_ptr< Context > const & ctx() const
Gets the streaming context associated with this shuffler.
Definition: shuffler.hpp:85
ShufflerAsync(std::shared_ptr< Context > ctx, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
Constructs a new ShufflerAsync instance.
constexpr shuffler::Shuffler::PartitionOwner const & partition_owner() const
Gets the partition owner function used by this shuffler.
Definition: shuffler.hpp:104
coro::task< std::optional< ExtractResult > > extract_any_async()
Asynchronously extracts data for any ready partition.
Node insert_finished(std::vector< shuffler::PartID > &&pids)
Insert a finish mark for a list of partitions.
constexpr shuffler::PartID total_num_partitions() const
Gets the total number of partitions for this shuffle operation.
Definition: shuffler.hpp:94
void insert(std::unordered_map< shuffler::PartID, PackedData > &&chunks)
Insert a bunch of packed (serialized) chunks into the shuffle.
std::pair< shuffler::PartID, std::vector< PackedData > > ExtractResult
Result type for extract_any_async operations.
Definition: shuffler.hpp:158
std::span< shuffler::PartID const > local_partitions() const
Returns the local partition IDs owned by the current node.
coro::task< std::optional< std::vector< PackedData > > > extract_async(shuffler::PartID pid)
Asynchronously extracts all data for a specific partition.
std::uint32_t PartID
Partition ID, which ranges from 0 up to (but not including) the total number of partitions.
Definition: chunk.hpp:22
Node shuffler(std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
Launches a shuffler node for a single shuffle operation.
coro::task< void > Node
Alias for a node in a streaming pipeline.
Definition: node.hpp:18
coro::semaphore< std::numeric_limits< std::ptrdiff_t >::max()> Semaphore
An awaitable semaphore to manage acquisition and release of finite resources.
Definition: channel.hpp:29