channel_metadata.hpp
1 
6 #pragma once
7 
8 #include <cstdint>
9 #include <memory>
10 #include <optional>
11 #include <vector>
12 
13 #include <cuda_runtime_api.h>
14 
15 #include <cudf/types.hpp>
16 
17 #include <rapidsmpf/memory/buffer_resource.hpp>
18 #include <rapidsmpf/memory/content_description.hpp>
19 #include <rapidsmpf/streaming/core/message.hpp>
20 #include <rapidsmpf/streaming/cudf/table_chunk.hpp>
21 
22 namespace rapidsmpf::streaming {
23 
/// @brief Hash partitioning scheme.
///
/// Plain aggregate that pairs the columns being hashed with the hash modulus.
29 struct HashScheme {
    /// Column indices to hash on.
30  std::vector<cudf::size_type> column_indices;
    /// Hash modulus (number of partitions).
31  int modulus;
32 
    /// Equality comparison. Defaulted, so `column_indices` and `modulus` are
    /// compared member-wise.
37  bool operator==(HashScheme const&) const = default;
38 };
39 
/// @brief A single sort key: column index, sort direction, and null placement.
43 struct OrderKey {
    /// Column to sort on.
44  cudf::size_type column_index;
    /// Sort direction: ASCENDING or DESCENDING.
45  cudf::order order;
    /// Null placement: BEFORE or AFTER.
46  cudf::null_order null_order;
47 
    /// Equality comparison. Defaulted, so all three members are compared.
52  bool operator==(OrderKey const&) const = default;
53 
    /// Inequality comparison (defaulted).
    ///
    /// NOTE(review): in C++20 `a != b` is already rewritten in terms of the
    /// defaulted `operator==`, so this declaration looks redundant (HashScheme
    /// does not declare one) -- confirm whether it is kept deliberately.
58  bool operator!=(OrderKey const&) const = default;
59 };
60 
/// @brief Order-based partitioning scheme for sorted/range-partitioned data.
80 struct OrderScheme {
    /// Sort keys (column, order, null_order per entry).
81  std::vector<OrderKey> keys;
    /// Shared handle to a TableChunk. Presumably holds the boundary values that
    /// boundaries_aligned_with() compares -- TODO confirm; no description is
    /// visible in this listing.
82  std::shared_ptr<TableChunk> boundaries;
    /// Defaults to false.
    /// NOTE(review): the meaning of this flag is not described in the visible
    /// text; confirm against the struct-level documentation in the real header.
84  bool strict_boundaries{false};
85 
    /// Default constructor. Produces an invalid (empty) scheme.
87  OrderScheme() = default;
88 
    /// Construct a validated OrderScheme.
    ///
    /// @param keys Sort keys for the scheme.
    /// @param boundaries Shared TableChunk stored in `boundaries`.
    /// @param strict_boundaries Value for the `strict_boundaries` flag
    ///        (defaults to false).
    ///
    /// NOTE(review): the `OrderScheme(` line that should open this parameter
    /// list is absent from this listing; what "validated" checks (and what is
    /// thrown on failure) is not visible here.
100  std::vector<OrderKey> keys,
101  std::shared_ptr<TableChunk> boundaries,
102  bool strict_boundaries = false
103  );
104 
    /// Return a new OrderScheme with updated key column indices, sharing
    /// boundaries.
    ///
    /// @param new_keys Keys for the returned scheme.
    /// @return The new scheme; `*this` is not modified (const member).
117  [[nodiscard]] OrderScheme with_keys(std::vector<OrderKey> new_keys) const;
118 
    /// Check whether boundary values are aligned with another scheme.
    ///
    /// @param other Scheme to compare against.
    /// @param br Buffer resource; how it is used by the check is not visible
    ///        in this header -- TODO confirm.
    /// @return Presumably true when the boundaries are aligned -- TODO confirm
    ///         the exact definition of "aligned" against the implementation.
127  [[nodiscard]] bool boundaries_aligned_with(
128  OrderScheme const& other, rapidsmpf::BufferResource const& br
129  ) const;
130 };
131 
// Interior of PartitioningSpec: partitioning specification for a single
// hierarchical level.
// NOTE(review): this listing omits the `struct PartitioningSpec {` line, the
// `Type type;` member ("the type of partitioning"), the signatures of the
// static factories, and the whole body of from_order(). Only the fragments
// below are visible.

    /// Type tag for PartitioningSpec.
147  enum class Type : std::uint8_t {
        /// No partitioning information at this level.
148  NONE,
        /// Partitioning is inherited from parent level unchanged.
149  INHERIT,
        /// Hash partitioning; `hash` is the member that is valid for this tag.
150  HASH,
        /// Order/range partitioning; `order` is the member that is valid for
        /// this tag.
151  ORDER,
152  };
153 
    /// Valid only when type == HASH.
155  std::optional<HashScheme> hash;
    /// Valid only when type == ORDER.
156  std::optional<OrderScheme> order;
157 
    // Body of `static PartitioningSpec none()`: create a spec indicating no
    // partitioning information. Returns an empty-braced spec, so both
    // optionals are empty; presumably `type` ends up as Type::NONE -- TODO
    // confirm against the (elided) `type` member declaration.
163  return {};
164  }
165 
    // Body of `static PartitioningSpec inherit()`: create a spec indicating
    // partitioning passes through from the parent. Only the tag is set; both
    // optionals stay empty.
171  return {.type = Type::INHERIT, .hash = std::nullopt, .order = std::nullopt};
172  }
173 
    // Body of `static PartitioningSpec from_hash(HashScheme h)`: create a spec
    // for hash partitioning. The by-value argument is moved into `hash`.
180  return {.type = Type::HASH, .hash = std::move(h), .order = std::nullopt};
181  }
182 
    // `static PartitioningSpec from_order(OrderScheme o)` (create a spec for
    // order/range partitioning) belongs here; its lines are not in this listing.
190 };
191 
/// @brief Hierarchical partitioning metadata for a data stream.
///
/// NOTE(review): the member declarations are elided from this listing. The
/// accompanying summaries name two PartitioningSpec members:
///  - `inter_rank`: distribution across ranks (corresponds to the primary
///    communicator);
///  - `local`: distribution within a rank (corresponds to the local/single
///    communicator).
206 struct Partitioning {
211 };
212 
// Interior of ChannelMetadata: channel-level metadata describing the data
// stream.
// NOTE(review): this listing omits the `struct ChannelMetadata {` line, the
// `Partitioning partitioning;` member ("how the data is partitioned"), and
// parts of the value constructor (its name line, the other member
// initializers and its body).

    /// Local chunk-count estimate for this rank. Value-initialized to 0.
220  std::uint64_t local_count{};
    /// Whether data is duplicated on all workers. Value-initialized to false.
222  bool duplicated{};
223 
    /// Default constructor.
225  ChannelMetadata() = default;
226 
    /// Construct metadata with specified values.
    ///
    /// @param local_count Local chunk-count estimate for this rank.
    /// @param partitioning How the data is partitioned (defaults to `{}`).
    /// @param duplicated Whether data is duplicated on all workers (defaults
    ///        to false).
235  std::uint64_t local_count, Partitioning partitioning = {}, bool duplicated = false
236  )
    // The by-value `partitioning` argument is moved into the member of the
    // same name; the neighbouring initializers are not visible here.
238  partitioning(std::move(partitioning)),
240 };
241 
/// Declaration only; the definition is not part of this header listing.
///
/// Takes ownership of a ChannelMetadata through `std::unique_ptr` and returns
/// a Message (a type-erased message wrapper around a payload).
/// NOTE(review): presumably wraps `m` as the payload and tags the message with
/// `sequence_number`, mirroring the PackedData overload ("Wrap PackedData into
/// a Message") -- confirm against the definition.
///
/// @param sequence_number Sequence number for the resulting message.
/// @param m Metadata to hand over to the message.
/// @return The resulting Message.
249 Message to_message(std::uint64_t sequence_number, std::unique_ptr<ChannelMetadata> m);
250 
251 } // namespace rapidsmpf::streaming
Class managing buffer resources.
Type-erased message wrapper around a payload.
Definition: message.hpp:27
Message to_message(std::uint64_t sequence_number, std::unique_ptr< PackedData > chunk)
Wrap PackedData into a Message.
Definition: packed_data.hpp:33
Channel-level metadata describing the data stream.
bool duplicated
Whether data is duplicated on all workers.
Partitioning partitioning
How the data is partitioned.
std::uint64_t local_count
Local chunk-count estimate for this rank.
ChannelMetadata()=default
Default constructor.
ChannelMetadata(std::uint64_t local_count, Partitioning partitioning={}, bool duplicated=false)
Construct metadata with specified values.
Hash partitioning scheme.
std::vector< cudf::size_type > column_indices
Column indices to hash on.
bool operator==(HashScheme const &) const =default
Equality comparison.
int modulus
Hash modulus (number of partitions).
A single sort key: column index, sort direction, and null placement.
cudf::null_order null_order
BEFORE or AFTER.
bool operator!=(OrderKey const &) const =default
Inequality comparison.
bool operator==(OrderKey const &) const =default
Equality comparison.
cudf::size_type column_index
Column to sort on.
cudf::order order
ASCENDING or DESCENDING.
Order-based partitioning scheme for sorted/range-partitioned data.
std::vector< OrderKey > keys
Sort keys (column, order, null_order per entry).
bool boundaries_aligned_with(OrderScheme const &other, rapidsmpf::BufferResource const &br) const
Check whether boundary values are aligned with another scheme.
std::shared_ptr< TableChunk > boundaries
OrderScheme with_keys(std::vector< OrderKey > new_keys) const
Return a new OrderScheme with updated key column indices, sharing boundaries.
OrderScheme(std::vector< OrderKey > keys, std::shared_ptr< TableChunk > boundaries, bool strict_boundaries=false)
Construct a validated OrderScheme.
bool strict_boundaries
See struct-level note on strict_boundaries semantics.
OrderScheme()=default
Default constructor. Produces an invalid (empty) scheme.
Partitioning specification for a single hierarchical level.
std::optional< OrderScheme > order
Valid only when type == ORDER.
static PartitioningSpec from_order(OrderScheme o)
Create a spec for order/range partitioning.
Type
Type tag for PartitioningSpec.
@ NONE
No partitioning information at this level.
@ INHERIT
Partitioning is inherited from parent level unchanged.
static PartitioningSpec inherit()
Create a spec indicating partitioning passes through from parent.
Type type
The type of partitioning.
std::optional< HashScheme > hash
Valid only when type == HASH.
static PartitioningSpec none()
Create a spec indicating no partitioning information.
static PartitioningSpec from_hash(HashScheme h)
Create a spec for hash partitioning.
Hierarchical partitioning metadata for a data stream.
PartitioningSpec inter_rank
Distribution across ranks (corresponds to primary communicator).
PartitioningSpec local
Distribution within a rank (corresponds to local/single communicator).