table_chunk.hpp
1 
6 #pragma once
7 
8 #include <array>
9 #include <memory>
10 #include <optional>
11 
12 #include <cudf/contiguous_split.hpp>
13 #include <cudf/table/table.hpp>
14 #include <cudf/table/table_view.hpp>
15 
16 #include <rapidsmpf/buffer/content_description.hpp>
17 #include <rapidsmpf/buffer/packed_data.hpp>
18 #include <rapidsmpf/error.hpp>
19 #include <rapidsmpf/owning_wrapper.hpp>
20 #include <rapidsmpf/streaming/core/channel.hpp>
21 #include <rapidsmpf/streaming/core/context.hpp>
22 #include <rapidsmpf/streaming/core/node.hpp>
23 
24 namespace rapidsmpf::streaming {
25 
26 
36 class TableChunk {
37  public:
52  enum class ExclusiveView : bool {
53  NO,
54  YES,
55  };
56 
63  TableChunk(std::unique_ptr<cudf::table> table, rmm::cuda_stream_view stream);
64 
93  cudf::table_view table_view,
94  std::size_t device_alloc_size,
96  OwningWrapper&& owner,
97  ExclusiveView exclusive_view
98  );
99 
107  std::unique_ptr<cudf::packed_columns> packed_columns, rmm::cuda_stream_view stream
108  );
109 
117  TableChunk(std::unique_ptr<PackedData> packed_data);
118 
119  ~TableChunk() = default;
120 
122  TableChunk(TableChunk&&) = default;
123 
130  TableChunk(TableChunk const&) = delete;
131  TableChunk& operator=(TableChunk const&) = delete;
132 
138  [[nodiscard]] rmm::cuda_stream_view stream() const noexcept;
139 
146  [[nodiscard]] std::size_t data_alloc_size(MemoryType mem_type) const;
147 
154  [[nodiscard]] bool is_available() const noexcept;
155 
163  [[nodiscard]] std::size_t make_available_cost() const noexcept;
164 
177  [[nodiscard]] TableChunk make_available(MemoryReservation& reservation);
178 
188  [[nodiscard]] cudf::table_view table_view() const;
189 
212  [[nodiscard]] bool is_spillable() const;
213 
229  [[nodiscard]] TableChunk copy(MemoryReservation& reservation) const;
230 
231  private:
234  OwningWrapper owner_{};
235 
236  // At most, one of the following unique pointers is non-null. If all of them are null,
237  // the TableChunk is a non-owning view.
238  // TODO: use a variant and drop the unique pointers?
239  std::unique_ptr<cudf::table> table_;
240  std::unique_ptr<cudf::packed_columns> packed_columns_;
241  std::unique_ptr<PackedData> packed_data_;
242 
243  // Has value iff this TableChunk is available.
244  std::optional<cudf::table_view> table_view_;
245 
246  // Zero initialized data allocation size (one for each memory type).
247  std::array<std::size_t, MEMORY_TYPES.size()> data_alloc_size_ = {};
248  std::size_t make_available_cost_; // For now, only device memory cost is tracked.
249 
250  rmm::cuda_stream_view stream_;
251  bool is_spillable_;
252 };
253 
261 
269 Message to_message(std::uint64_t sequence_number, std::unique_ptr<TableChunk> chunk);
270 
271 } // namespace rapidsmpf::streaming
Description of an object's content.
Represents a reservation for future memory allocation.
Definition: resource.hpp:33
Utility class to store an arbitrary type-erased object while another object is alive.
Type-erased message wrapper around a payload.
Definition: message.hpp:27
A unit of table data in a streaming pipeline.
Definition: table_chunk.hpp:36
std::size_t make_available_cost() const noexcept
Returns the estimated cost (in bytes) of making the table available.
TableChunk & operator=(TableChunk &&)=default
Move assignment.
TableChunk make_available(MemoryReservation &reservation)
Moves this table chunk into a new one with its cudf table made available.
TableChunk copy(MemoryReservation &reservation) const
Create a deep copy of the table chunk.
ExclusiveView
Indicates whether the TableChunk holds an exclusive or shared view of the underlying table data.
Definition: table_chunk.hpp:52
bool is_available() const noexcept
Indicates whether the underlying cudf table data is fully available in device memory.
TableChunk(std::unique_ptr< cudf::table > table, rmm::cuda_stream_view stream)
Construct a TableChunk from a device table.
std::size_t data_alloc_size(MemoryType mem_type) const
Number of bytes allocated for the data in the specified memory type.
TableChunk(std::unique_ptr< cudf::packed_columns > packed_columns, rmm::cuda_stream_view stream)
Construct a TableChunk from packed columns.
TableChunk(TableChunk &&)=default
TableChunk is moveable.
rmm::cuda_stream_view stream() const noexcept
Returns the CUDA stream on which this table chunk was created.
TableChunk(cudf::table_view table_view, std::size_t device_alloc_size, rmm::cuda_stream_view stream, OwningWrapper &&owner, ExclusiveView exclusive_view)
Construct a TableChunk from a device table view.
cudf::table_view table_view() const
Returns a view of the underlying table.
TableChunk(std::unique_ptr< PackedData > packed_data)
Construct a TableChunk from a packed data blob.
bool is_spillable() const
Indicates whether this table chunk can be spilled to device memory.
Message to_message(std::uint64_t sequence_number, std::unique_ptr< PackedDataChunk > chunk)
Wrap a PackedDataChunk into a Message.
Definition: packed_data.hpp:44
ContentDescription get_content_description(PackedDataChunk const &obj)
Generate a content description for a PackedDataChunk.
Definition: packed_data.hpp:30