9 #include <condition_variable>
13 #include <unordered_map>
16 #include <rapidsmpf/communicator/communicator.hpp>
17 #include <rapidsmpf/shuffler/chunk.hpp>
18 #include <rapidsmpf/shuffler/postbox.hpp>
19 #include <rapidsmpf/utils.hpp>
68 std::vector<PartID>
const& local_partitions,
149 void wait_on(
PartID pid, std::optional<std::chrono::milliseconds> timeout = {});
155 [[nodiscard]] std::string
str()
const;
160 n_unfinished_partitions_;
164 struct PartitionInfo {
173 constexpr PartitionInfo() =
default;
176 RAPIDSMPF_EXPECTS(nchunks != 0,
"the goalpost was moved by 0 chunks");
178 ++rank_count <= nranks,
"the goalpost was moved more than one per rank"
180 chunk_goal += nchunks;
184 finished_chunk_count++;
187 (rank_count < nranks) || (finished_chunk_count <= chunk_goal),
188 "finished chunk exceeds the goal"
194 [[nodiscard]] constexpr
bool is_finished(Rank nranks)
const {
195 return rank_count == nranks && finished_chunk_count == chunk_goal;
198 [[nodiscard]] constexpr
ChunkID data_chunk_goal()
const {
203 return chunk_goal -
static_cast<ChunkID>(rank_count);
210 std::unordered_map<PartID, PartitionInfo> goalposts_;
212 mutable std::mutex mutex_;
213 mutable std::condition_variable wait_cv_;
Helper to tally the finish status of a shuffle.
void wait_on(PartID pid, std::optional< std::chrono::milliseconds > timeout={})
Wait for a specific partition to be finished (blocking). Optionally, a timeout (in ms) can be provided.
std::function< void(PartID)> FinishedCallback
Callback function type called when a partition is finished.
std::string str() const
Returns a description of this instance.
PartID wait_any(std::optional< std::chrono::milliseconds > timeout={})
Returns the partition ID of a finished partition that hasn't been waited on (blocking). Optionally, a timeout (in ms) can be provided.
FinishCounter(Rank nranks, std::vector< PartID > const &local_partitions, FinishedCallback &&finished_callback=nullptr)
Construct a finish counter.
void move_goalpost(PartID pid, ChunkID nchunks)
Move the goalpost for a specific rank and partition.
void add_finished_chunk(PartID pid)
Add a finished chunk to a partition counter.
bool all_finished() const
Returns whether all partitions are finished (non-blocking).
std::uint64_t ChunkID
The globally unique ID of a chunk.
std::uint32_t PartID
Partition ID, which goes from 0 to the total number of partitions.
std::ostream & operator<<(std::ostream &os, detail::FinishCounter const &obj)
Overloads the stream insertion operator for the FinishCounter class.