channel_metadata.hpp
1 
6 #pragma once
7 
8 #include <cstdint>
9 #include <optional>
10 #include <vector>
11 
12 #include <cudf/types.hpp>
13 
14 #include <rapidsmpf/streaming/core/message.hpp>
15 
16 namespace rapidsmpf::streaming {
17 
23 struct HashScheme {
24  std::vector<cudf::size_type> column_indices;
25  int modulus;
26 
31  bool operator==(HashScheme const&) const = default;
32 };
33 
48  enum class Type : std::uint8_t {
49  NONE,
50  INHERIT,
51  HASH,
52  };
53 
55  std::optional<HashScheme> hash;
56 
62  return {};
63  }
64 
70  return {.type = Type::INHERIT, .hash = std::nullopt};
71  }
72 
79  return {.type = Type::HASH, .hash = std::move(h)};
80  }
81 
86  bool operator==(PartitioningSpec const&) const = default;
87 };
88 
103 struct Partitioning {
108 
113  bool operator==(Partitioning const&) const = default;
114 };
115 
123  std::uint64_t local_count{};
125  bool duplicated{};
126 
128  ChannelMetadata() = default;
129 
138  std::uint64_t local_count, Partitioning partitioning = {}, bool duplicated = false
139  )
141  partitioning(std::move(partitioning)),
143 
148  bool operator==(ChannelMetadata const&) const = default;
149 };
150 
/// Declaration only; the definition is not part of this header.
///
/// NOTE(review): presumably wraps the given ChannelMetadata into a Message, in the
/// same way as the PackedData overload of `to_message` — confirm against the definition.
///
/// @param sequence_number Sequence number supplied by the caller (presumably attached
///        to the resulting message — TODO confirm).
/// @param m Metadata payload. Taken as a `std::unique_ptr` by value, so the caller
///        hands over ownership of the object.
/// @return A `Message`.
158 Message to_message(std::uint64_t sequence_number, std::unique_ptr<ChannelMetadata> m);
159 
160 } // namespace rapidsmpf::streaming
Type-erased message wrapper around a payload.
Definition: message.hpp:27
Message to_message(std::uint64_t sequence_number, std::unique_ptr< PackedData > chunk)
Wrap PackedData into a Message.
Definition: packed_data.hpp:33
Channel-level metadata describing the data stream.
bool duplicated
Whether data is duplicated on all workers.
Partitioning partitioning
How the data is partitioned.
std::uint64_t local_count
Local chunk-count estimate for this rank.
bool operator==(ChannelMetadata const &) const =default
Equality comparison.
ChannelMetadata()=default
Default constructor.
ChannelMetadata(std::uint64_t local_count, Partitioning partitioning={}, bool duplicated=false)
Construct metadata with specified values.
Hash partitioning scheme.
std::vector< cudf::size_type > column_indices
Column indices to hash on.
bool operator==(HashScheme const &) const =default
Equality comparison.
int modulus
Hash modulus (number of partitions).
Partitioning specification for a single hierarchical level.
Type
Type tag for PartitioningSpec.
@ NONE
No partitioning information at this level.
@ INHERIT
Partitioning is inherited from parent level unchanged.
static PartitioningSpec inherit()
Create a spec indicating partitioning passes through from parent.
bool operator==(PartitioningSpec const &) const =default
Equality comparison.
Type type
The type of partitioning.
std::optional< HashScheme > hash
Valid only when type == HASH.
static PartitioningSpec none()
Create a spec indicating no partitioning information.
static PartitioningSpec from_hash(HashScheme h)
Create a spec for hash partitioning.
Hierarchical partitioning metadata for a data stream.
PartitioningSpec inter_rank
Distribution across ranks (corresponds to primary communicator).
bool operator==(Partitioning const &) const =default
Equality comparison.
PartitioningSpec local
Distribution within a rank (corresponds to local/single communicator).