table_chunk.hpp
1 
6 #pragma once
7 
#include <array>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>

#include <cudf/packed_types.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <coro/task.hpp>

#include <rapidsmpf/memory/content_description.hpp>
#include <rapidsmpf/memory/packed_data.hpp>
#include <rapidsmpf/owning_wrapper.hpp>
#include <rapidsmpf/streaming/core/context.hpp>
#include <rapidsmpf/streaming/core/memory_reserve_or_wait.hpp>
#include <rapidsmpf/streaming/core/message.hpp>
27 
28 namespace rapidsmpf::streaming {
29 
38 class TableChunk {
39  public:
54  enum class ExclusiveView : bool {
55  NO,
56  YES,
57  };
58 
65  TableChunk(std::unique_ptr<cudf::table> table, rmm::cuda_stream_view stream);
66 
94  cudf::table_view table_view,
96  OwningWrapper&& owner,
97  ExclusiveView exclusive_view
98  );
99 
107  TableChunk(std::unique_ptr<PackedData> packed_data);
108 
109  ~TableChunk() = default;
110 
117  TableChunk(TableChunk&& other) noexcept;
118 
126  TableChunk& operator=(TableChunk&& other) noexcept;
127  TableChunk(TableChunk const&) = delete;
128  TableChunk& operator=(TableChunk const&) = delete;
129 
135  [[nodiscard]] rmm::cuda_stream_view stream() const noexcept;
136 
143  [[nodiscard]] std::size_t data_alloc_size(MemoryType mem_type) const;
144 
151  [[nodiscard]] bool is_available() const noexcept;
152 
160  [[nodiscard]] std::size_t make_available_cost() const noexcept;
161 
174  [[nodiscard]] TableChunk make_available(MemoryReservation& reservation);
175 
189  [[nodiscard]] TableChunk make_available(MemoryReservation&& reservation);
190 
211  [[nodiscard]] coro::task<TableChunk> make_available(
212  std::shared_ptr<Context> ctx,
213  std::int64_t net_memory_delta = MemoryReserveOrWait::missing_net_memory_delta
214  );
215 
225  [[nodiscard]] cudf::table_view table_view() const;
226 
249  [[nodiscard]] bool is_spillable() const;
250 
266  [[nodiscard]] TableChunk copy(MemoryReservation& reservation) const;
267 
273  [[nodiscard]] std::pair<cudf::size_type, cudf::size_type> shape() const noexcept;
274 
275  private:
278  OwningWrapper owner_{};
279 
280  // At most, one of the following unique pointers is non-null. If all of them are null,
281  // the TableChunk is a non-owning view.
282  // TODO: use a variant and drop the unique pointers?
283  std::unique_ptr<cudf::table> table_;
284  std::unique_ptr<PackedData> packed_data_;
285 
286  // Has value iff this TableChunk is available.
287  std::optional<cudf::table_view> table_view_;
288 
289  // Zero initialized data allocation size (one for each memory type).
290  std::array<std::size_t, MEMORY_TYPES.size()> data_alloc_size_ = {};
291  std::size_t make_available_cost_; // For now, only device memory cost is tracked.
292 
293  rmm::cuda_stream_view stream_;
294  bool is_spillable_;
295 };
296 
304 
312 Message to_message(std::uint64_t sequence_number, std::unique_ptr<TableChunk> chunk);
313 
314 } // namespace rapidsmpf::streaming
Description of an object's content.
Represents a reservation for future memory allocation.
Utility class to store an arbitrary type-erased object while another object is alive.
Context for actors (coroutines) in rapidsmpf.
Definition: context.hpp:41
Asynchronous coordinator for memory reservation requests.
Type-erased message wrapper around a payload.
Definition: message.hpp:27
A unit of table data in a streaming pipeline.
Definition: table_chunk.hpp:38
std::size_t make_available_cost() const noexcept
Returns the estimated cost (in bytes) of making the table available.
TableChunk make_available(MemoryReservation &reservation)
Moves this table chunk into a new one with its cudf table made available.
TableChunk copy(MemoryReservation &reservation) const
Create a deep copy of the table chunk.
TableChunk(cudf::table_view table_view, rmm::cuda_stream_view stream, OwningWrapper &&owner, ExclusiveView exclusive_view)
Construct a TableChunk from a device table view.
ExclusiveView
Indicates whether the TableChunk holds an exclusive or shared view of the underlying table data.
Definition: table_chunk.hpp:54
bool is_available() const noexcept
Indicates whether the underlying cudf table data is fully available in device memory.
TableChunk(std::unique_ptr< cudf::table > table, rmm::cuda_stream_view stream)
Construct a TableChunk from a device table.
std::size_t data_alloc_size(MemoryType mem_type) const
Number of bytes allocated for the data in the specified memory type.
std::pair< cudf::size_type, cudf::size_type > shape() const noexcept
Return the shape of the table stored by the table chunk.
TableChunk & operator=(TableChunk &&other) noexcept
Move assignment.
rmm::cuda_stream_view stream() const noexcept
Returns the CUDA stream on which this table chunk was created.
cudf::table_view table_view() const
Returns a view of the underlying table.
TableChunk(TableChunk &&other) noexcept
Move constructor.
TableChunk(std::unique_ptr< PackedData > packed_data)
Construct a TableChunk from a packed data blob.
bool is_spillable() const
Indicates whether this table chunk can be spilled out of device memory.
ContentDescription get_content_description(PackedData const &obj)
Generate a content description for PackedData.
Definition: packed_data.hpp:20
Message to_message(std::uint64_t sequence_number, std::unique_ptr< PackedData > chunk)
Wrap PackedData into a Message.
Definition: packed_data.hpp:33
@ YES
Overbooking is allowed.
@ NO
Overbooking is not allowed.
constexpr std::array< MemoryType, 3 > MEMORY_TYPES
All memory types sorted in decreasing order of preference.
Definition: memory_type.hpp:23
MemoryType
Enum representing the type of memory sorted in decreasing order of preference.
Definition: memory_type.hpp:16