cudf::groupby::streaming_groupby Class Reference

Stateful streaming groupby that accumulates partial aggregates across batches.

#include <groupby.hpp>

Public Member Functions

 streaming_groupby (streaming_groupby const &)=delete
 
streaming_groupby & operator= (streaming_groupby const &)=delete
 
 streaming_groupby (streaming_groupby &&) noexcept
 Move constructor.
 
streaming_groupby & operator= (streaming_groupby &&) noexcept
 Move assignment operator.
 
 streaming_groupby (host_span< size_type const > key_indices, host_span< streaming_aggregation_request const > requests, size_type max_distinct_keys, null_policy null_handling=null_policy::EXCLUDE)
 Construct a streaming groupby object with a persistent hash table.
 
void aggregate (table_view const &data, rmm::cuda_stream_view stream=cudf::get_default_stream())
 Feed a batch of data into the streaming aggregation.
 
void merge (streaming_groupby const &other, rmm::cuda_stream_view stream=cudf::get_default_stream())
 Merge another streaming_groupby's accumulated partial state into this one.
 
std::pair< std::unique_ptr< table >, std::vector< aggregation_result > > finalize (rmm::cuda_stream_view stream=cudf::get_default_stream(), rmm::device_async_resource_ref mr=cudf::get_current_device_resource_ref()) const
 Finalize the accumulated partial aggregates into final results.
 
size_type distinct_keys () const noexcept
 Returns the number of distinct keys accumulated so far.
 

Detailed Description

Stateful streaming groupby that accumulates partial aggregates across batches.

streaming_groupby and the stateless groupby serve different use cases. Use the stateless groupby for single-shot aggregation when all input fits in memory at once. Use streaming_groupby when input arrives over multiple batches and memory efficiency matters: peak memory does not scale with the number of input rows, because only the distinct keys seen so far and one aggregation slot per group are kept across batches. Arbitrarily long high-duplicate streams therefore accumulate without running out of memory.

If memory is not a concern, concatenating all batches and calling the stateless groupby once is also a valid choice. Reach for streaming_groupby when (a) the cumulative input does not fit in memory, or (b) partial-state aggregation across distributed workers (merge()) is part of the workload.

Per-batch cost is O(batch_size): each batch does direct hash table insertion and in-place aggregation updates against the persistent state. Partial states can be combined via merge(), and final results are produced via finalize().
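
The aggregate / merge / finalize lifecycle described above can be modelled on the host with a small sketch. This is not cudf code: host_streaming_sum, its row alias, and its use of std::unordered_map are hypothetical stand-ins chosen for illustration, it handles only a SUM over integer keys, and it mirrors the documented capacity checks rather than the actual device-side implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical host-side model of a streaming SUM groupby; not part of cudf.
class host_streaming_sum {
 public:
  using row = std::pair<std::int32_t, double>;  // (key, value)

  explicit host_streaming_sum(std::int32_t max_distinct_keys) : capacity_(max_distinct_keys)
  {
    if (max_distinct_keys <= 0) { throw std::invalid_argument("max_distinct_keys must be > 0"); }
  }

  // Fold one batch into the persistent state; cost is linear in the batch size.
  // The batch is not referenced after this call returns.
  void aggregate(std::vector<row> const& batch)
  {
    if (batch.size() > static_cast<std::size_t>(capacity_)) {
      throw std::invalid_argument("batch exceeds max_distinct_keys rows");
    }
    for (auto const& r : batch) { update(r.first, r.second); }
  }

  // Fold another object's partial sums into this one; `other` is left unchanged.
  void merge(host_streaming_sum const& other)
  {
    for (auto const& kv : other.sums_) { update(kv.first, kv.second); }
  }

  // Copy out the current results without modifying the persistent state.
  std::unordered_map<std::int32_t, double> finalize() const { return sums_; }

  std::int32_t distinct_keys() const noexcept { return static_cast<std::int32_t>(sums_.size()); }

 private:
  // Insert a new group or update an existing one in place.
  void update(std::int32_t key, double value)
  {
    auto it = sums_.find(key);
    if (it != sums_.end()) {
      it->second += value;
      return;
    }
    if (sums_.size() >= static_cast<std::size_t>(capacity_)) {
      throw std::logic_error("cumulative distinct keys exceed max_distinct_keys");
    }
    sums_.emplace(key, value);
    assert(sums_.size() <= static_cast<std::size_t>(capacity_));
  }

  std::int32_t capacity_;
  std::unordered_map<std::int32_t, double> sums_;
};
```

In this model, two workers would each call aggregate() on their own batches, one would merge() the other, and finalize() would then return one sum per distinct key. The stored state grows only with the number of distinct keys, never with the number of rows fed in.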

The max_distinct_keys parameter sets the upper bound on the number of distinct key combinations across the lifetime of this object. The persistent state is sized to max_distinct_keys (constant for the lifetime of the object); the stored distinct keys grow with the number of distinct keys actually seen, so the incremental key storage is O(distinct_keys() × key_size) and does not scale with cumulative input rows.

Cumulative input rows are not bounded; only cumulative distinct keys are. In addition, a single batch may not exceed max_distinct_keys rows. This is an implementation limit: each in-flight batch row is encoded as max_distinct_keys + row_idx inside the hash set, and that value must fit in cudf::size_type.
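
The arithmetic behind the batch-size limit can be checked with a short host-side sketch. The helper names max_encoded_index and batch_is_encodable are hypothetical, and size_type is declared locally as a 32-bit signed integer to match cudf::size_type. The second check (that the largest encoded index fits in 32 bits) is an inference from the encoding described above, not a documented validation step.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

using size_type = std::int32_t;  // same width as cudf::size_type

// Largest value of max_distinct_keys + row_idx for a non-empty batch,
// computed in 64 bits so the sum itself cannot overflow.
inline std::int64_t max_encoded_index(size_type max_distinct_keys, size_type batch_rows)
{
  assert(max_distinct_keys > 0 && batch_rows > 0);
  return static_cast<std::int64_t>(max_distinct_keys) +
         (static_cast<std::int64_t>(batch_rows) - 1);
}

// True when a batch obeys the documented row limit and (assumption) its
// largest encoded row index still fits in size_type.
inline bool batch_is_encodable(size_type max_distinct_keys, size_type batch_rows)
{
  if (max_distinct_keys <= 0 || batch_rows < 0) { return false; }
  if (batch_rows > max_distinct_keys) { return false; }  // documented limit
  if (batch_rows == 0) { return true; }                  // nothing to encode
  return max_encoded_index(max_distinct_keys, batch_rows) <=
         std::numeric_limits<size_type>::max();
}
```

Under this reading, a full-size batch encodes indices up to 2 × max_distinct_keys − 1, so the scheme stays within a 32-bit size_type only while max_distinct_keys is at most 2^30.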

All column types (including variable-width types such as strings, lists, and structs) are supported for key columns. Only hash-based aggregation kinds are supported; use is_streaming_groupby_supported() to query a specific (value type, aggregation kind) combination.

Supported aggregation kinds: SUM, SUM_OF_SQUARES, PRODUCT, MIN, MAX, COUNT_VALID, COUNT_ALL, MEAN, M2, VARIANCE, STD

Exceptions
std::invalid_argument  for unsupported aggregation kinds
std::invalid_argument  if a single batch exceeds max_distinct_keys rows
cudf::logic_error  if cumulative distinct keys exceed max_distinct_keys

Definition at line 473 of file groupby.hpp.

Constructor & Destructor Documentation

◆ streaming_groupby()

cudf::groupby::streaming_groupby::streaming_groupby ( host_span< size_type const >  key_indices,
host_span< streaming_aggregation_request const >  requests,
size_type  max_distinct_keys,
null_policy  null_handling = null_policy::EXCLUDE 
)
explicit

Construct a streaming groupby object with a persistent hash table.

Parameters
key_indices  Indices of columns in the data table that serve as groupby keys
requests  The aggregations to perform and which columns to aggregate
max_distinct_keys  Upper bound on distinct key combinations. The hash set, companion vectors, and aggregation results table are all sized to this capacity. Cumulative input rows are not bounded.
null_handling  Indicates whether rows in keys that contain NULL values should be included
Exceptions
std::invalid_argument  if max_distinct_keys <= 0
std::invalid_argument  if any requested aggregation kind is unsupported

Member Function Documentation

◆ aggregate()

void cudf::groupby::streaming_groupby::aggregate ( table_view const &  data,
rmm::cuda_stream_view  stream = cudf::get_default_stream() 
)

Feed a batch of data into the streaming aggregation.

Batch keys are inserted into the persistent hash set and aggregation results are updated atomically. The input data table is not referenced after this call returns.

Parameters
data  Table containing both key and value columns
stream  CUDA stream used for device memory operations and kernel launches
Exceptions
std::invalid_argument  if data.num_rows() exceeds max_distinct_keys
cudf::logic_error  if cumulative distinct keys exceed max_distinct_keys

◆ distinct_keys()

size_type cudf::groupby::streaming_groupby::distinct_keys ( ) const
noexcept

Returns the number of distinct keys accumulated so far.

Returns 0 before any successful aggregate() or merge() call.

Returns
The current count of distinct keys in the persistent hash table

◆ finalize()

std::pair<std::unique_ptr<table>, std::vector<aggregation_result> > cudf::groupby::streaming_groupby::finalize ( rmm::cuda_stream_view  stream = cudf::get_default_stream(),
rmm::device_async_resource_ref  mr = cudf::get_current_device_resource_ref() 
) const

Finalize the accumulated partial aggregates into final results.

For most aggregation kinds the partial state is the final result. For kinds like MEAN, VARIANCE, or STD, a finalization step converts the internal partial representation (e.g., sum+count) into the user-facing result.

This does not modify the internal state; aggregate() may be called again afterward.
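
As an illustration of the finalization step, the sketch below converts a per-group partial state into MEAN, VARIANCE, and STD on the host. The partial_state layout (count, sum, M2) and the finalize_group helper are assumptions made for this example; the text above only states that a partial representation such as sum+count is converted. The ddof parameter is likewise illustrative and defaults to 1 here.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Hypothetical partial state for one group; field names are illustrative.
struct partial_state {
  std::int64_t count;  // number of valid values accumulated
  double sum;          // running sum of the values
  double m2;           // running sum of squared deviations from the mean
};

struct final_result {
  double mean;
  double variance;
  double std_dev;
};

// Convert one group's partial state into user-facing results.
// Takes the state by const reference, so it can be called repeatedly
// between batches without disturbing the accumulation.
inline final_result finalize_group(partial_state const& s, std::int64_t ddof = 1)
{
  assert(s.count > ddof);  // otherwise the variance denominator is not positive
  double const mean     = s.sum / static_cast<double>(s.count);
  double const variance = s.m2 / static_cast<double>(s.count - ddof);
  return {mean, variance, std::sqrt(variance)};
}
```

For SUM, MIN, MAX, and the COUNT kinds no such conversion would be needed, since the accumulated value is already the result.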

Parameters
stream  CUDA stream used for device memory operations and kernel launches
mr  Device memory resource used to allocate the returned table and columns
Returns
Pair of distinct keys table and a vector of aggregation_results (one per request)
Exceptions
cudf::logic_error  if no data has been accumulated

◆ merge()

void cudf::groupby::streaming_groupby::merge ( streaming_groupby const &  other,
rmm::cuda_stream_view  stream = cudf::get_default_stream() 
)

Merge another streaming_groupby's accumulated partial state into this one.

Extracts the other object's accumulated intermediate state and merges it into this object's persistent hash table. The other object is not modified. Both objects must have been constructed with compatible aggregation requests, and this object must have had at least one aggregate() call.
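
For variance-style kinds, two partial states for the same key cannot simply be added. One standard way to combine them is the pairwise update of Chan et al., sketched below on the host. Whether streaming_groupby uses this exact formula internally is not stated here; the moments struct and merge_moments helper are hypothetical names introduced for the example.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-group partial state: count, mean, and M2
// (sum of squared deviations from the mean).
struct moments {
  std::int64_t count;
  double mean;
  double m2;
};

// Combine two partial states for the same key using the pairwise
// (Chan et al.) update. Neither input is modified.
inline moments merge_moments(moments const& a, moments const& b)
{
  assert(a.count >= 0 && b.count >= 0);
  if (a.count == 0) { return b; }
  if (b.count == 0) { return a; }

  std::int64_t const n = a.count + b.count;
  double const na      = static_cast<double>(a.count);
  double const nb      = static_cast<double>(b.count);
  double const delta   = b.mean - a.mean;

  double const mean = a.mean + delta * nb / static_cast<double>(n);
  double const m2   = a.m2 + b.m2 + delta * delta * na * nb / static_cast<double>(n);
  return {n, mean, m2};
}
```

Merging the state for values {1, 3} with the state for {5, 7} gives the same count, mean, and M2 as accumulating {1, 3, 5, 7} in a single pass, which is the property a merge of partial states needs.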

Parameters
other  The streaming_groupby whose partial state to merge
stream  CUDA stream used for device memory operations and kernel launches
Exceptions
std::invalid_argument  if the other object has more distinct keys than max_distinct_keys
cudf::logic_error  if this object has not been initialized via aggregate()
cudf::logic_error  if distinct keys exceed max_distinct_keys after merge

◆ operator=()

streaming_groupby& cudf::groupby::streaming_groupby::operator= ( streaming_groupby &&  )
noexcept

Move assignment operator.

Returns
Reference to this object.

The documentation for this class was generated from the following file: