table_chunk.hpp
1 
6 #pragma once
7 
8 #include <array>
9 #include <cstddef>
10 #include <cstdint>
11 #include <memory>
12 #include <optional>
13 
14 #include <cudf/packed_types.hpp>
15 #include <cudf/table/table.hpp>
16 #include <cudf/table/table_view.hpp>
17 #include <rmm/cuda_stream_view.hpp>
18 
19 #include <coro/task.hpp>
20 
21 #include <rapidsmpf/memory/content_description.hpp>
22 #include <rapidsmpf/memory/packed_data.hpp>
23 #include <rapidsmpf/owning_wrapper.hpp>
24 #include <rapidsmpf/streaming/core/context.hpp>
25 #include <rapidsmpf/streaming/core/memory_reserve_or_wait.hpp>
26 #include <rapidsmpf/streaming/core/message.hpp>
27 
28 namespace rapidsmpf::streaming {
29 
38 class TableChunk {
39  public:
54  enum class ExclusiveView : bool {
55  NO,
56  YES,
57  };
58 
65  TableChunk(std::unique_ptr<cudf::table> table, rmm::cuda_stream_view stream);
66 
94  cudf::table_view table_view,
96  OwningWrapper&& owner,
97  ExclusiveView exclusive_view
98  );
99 
107  TableChunk(std::unique_ptr<PackedData> packed_data);
108 
109  ~TableChunk() = default;
110 
117  TableChunk(TableChunk&& other) noexcept;
118 
126  TableChunk& operator=(TableChunk&& other) noexcept;
127  TableChunk(TableChunk const&) = delete;
128  TableChunk& operator=(TableChunk const&) = delete;
129 
135  [[nodiscard]] rmm::cuda_stream_view stream() const noexcept;
136 
143  [[nodiscard]] std::size_t data_alloc_size(MemoryType mem_type) const;
144 
151  [[nodiscard]] bool is_available() const noexcept;
152 
160  [[nodiscard]] std::size_t make_available_cost() const noexcept;
161 
174  [[nodiscard]] TableChunk make_available(MemoryReservation& reservation);
175 
189  [[nodiscard]] TableChunk make_available(MemoryReservation&& reservation);
190 
211  [[nodiscard]] coro::task<TableChunk> make_available(
212  std::shared_ptr<Context> ctx,
213  std::int64_t net_memory_delta = MemoryReserveOrWait::missing_net_memory_delta
214  );
215 
225  [[nodiscard]] cudf::table_view table_view() const;
226 
249  [[nodiscard]] bool is_spillable() const;
250 
266  [[nodiscard]] TableChunk copy(MemoryReservation& reservation) const;
267 
289  [[nodiscard]] std::unique_ptr<PackedData> into_packed_data(BufferResource* br) &&;
290 
296  [[nodiscard]] std::pair<cudf::size_type, cudf::size_type> shape() const noexcept;
297 
298  private:
301  OwningWrapper owner_{};
302 
303  // At most, one of the following unique pointers is non-null. If all of them are null,
304  // the TableChunk is a non-owning view.
305  // TODO: use a variant and drop the unique pointers?
306  std::unique_ptr<cudf::table> table_;
307  std::unique_ptr<PackedData> packed_data_;
308 
309  // Has value iff this TableChunk is available.
310  std::optional<cudf::table_view> table_view_;
311 
312  // Zero initialized data allocation size (one for each memory type).
313  std::array<std::size_t, MEMORY_TYPES.size()> data_alloc_size_ = {};
314  std::size_t make_available_cost_; // For now, only device memory cost is tracked.
315 
316  rmm::cuda_stream_view stream_;
317  bool is_spillable_;
318 };
319 
327 
335 Message to_message(std::uint64_t sequence_number, std::unique_ptr<TableChunk> chunk);
336 
337 } // namespace rapidsmpf::streaming
Class managing buffer resources.
Description of an object's content.
Represents a reservation for future memory allocation.
Utility class to store an arbitrary type-erased object while another object is alive.
Context for actors (coroutines) in rapidsmpf.
Definition: context.hpp:41
Asynchronous coordinator for memory reservation requests.
Type-erased message wrapper around a payload.
Definition: message.hpp:27
A unit of table data in a streaming pipeline.
Definition: table_chunk.hpp:38
std::size_t make_available_cost() const noexcept
Returns the estimated cost (in bytes) of making the table available.
TableChunk make_available(MemoryReservation &reservation)
Moves this table chunk into a new one with its cudf table made available.
TableChunk copy(MemoryReservation &reservation) const
Create a deep copy of the table chunk.
TableChunk(cudf::table_view table_view, rmm::cuda_stream_view stream, OwningWrapper &&owner, ExclusiveView exclusive_view)
Construct a TableChunk from a device table view.
ExclusiveView
Indicates whether the TableChunk holds an exclusive or shared view of the underlying table data.
Definition: table_chunk.hpp:54
bool is_available() const noexcept
Indicates whether the underlying cudf table data is fully available in device memory.
TableChunk(std::unique_ptr< cudf::table > table, rmm::cuda_stream_view stream)
Construct a TableChunk from a device table.
std::size_t data_alloc_size(MemoryType mem_type) const
Number of bytes allocated for the data in the specified memory type.
std::pair< cudf::size_type, cudf::size_type > shape() const noexcept
Return the shape of the table stored by the table chunk.
TableChunk & operator=(TableChunk &&other) noexcept
Move assignment.
rmm::cuda_stream_view stream() const noexcept
Returns the CUDA stream on which this table chunk was created.
cudf::table_view table_view() const
Returns a view of the underlying table.
TableChunk(TableChunk &&other) noexcept
Move constructor.
std::unique_ptr< PackedData > into_packed_data(BufferResource *br) &&
Convert this table chunk to a PackedData, avoiding unnecessary copies.
TableChunk(std::unique_ptr< PackedData > packed_data)
Construct a TableChunk from a packed data blob.
bool is_spillable() const
Indicates whether this table chunk can be spilled out of device memory.
ContentDescription get_content_description(PackedData const &obj)
Generate a content description for PackedData.
Definition: packed_data.hpp:20
Message to_message(std::uint64_t sequence_number, std::unique_ptr< PackedData > chunk)
Wrap PackedData into a Message.
Definition: packed_data.hpp:33
@ YES
Overbooking is allowed.
@ NO
Overbooking is not allowed.
constexpr std::array< MemoryType, 3 > MEMORY_TYPES
All memory types sorted in decreasing order of preference.
Definition: memory_type.hpp:23
MemoryType
Enum representing the type of memory sorted in decreasing order of preference.
Definition: memory_type.hpp:16
Bag of bytes with metadata suitable for sending over the wire.
Definition: packed_data.hpp:26