Aggregation Groupby#
- group GroupBy
Functions
-
bool is_streaming_groupby_supported(data_type values_type, aggregation::Kind kind)#
Returns true if streaming_groupby supports the given value type and aggregation kind combination.
Use this to query support without constructing a streaming_groupby. A true return implies that an aggregate() call with a value column of values_type and an aggregation of kind will not be rejected on type/kind grounds.
- Parameters:
values_type – Type of the value column the aggregation would run on
kind – Aggregation kind
- Returns:
True if the combination is supported, false otherwise
-
struct aggregation_request#
- #include <groupby.hpp>
Request for groupby aggregation(s) to perform on a column.
The group membership of each values[i] is determined by the corresponding row i in the original order of keys used to construct the groupby. I.e., for each aggregation, values[i] is aggregated with all other values[j] where rows i and j in keys are equivalent.
values.size() must equal keys.num_rows().
Public Members
-
column_view values#
The elements to aggregate.
-
std::vector<std::unique_ptr<groupby_aggregation>> aggregations#
Desired aggregations.
-
struct scan_request#
- #include <groupby.hpp>
Request for groupby aggregation(s) for scanning a column.
The group membership of each values[i] is determined by the corresponding row i in the original order of keys used to construct the groupby. I.e., for each aggregation, values[i] is aggregated with all other values[j] where rows i and j in keys are equivalent.
values.size() must equal keys.num_rows().
Public Members
-
column_view values#
The elements to aggregate.
-
std::vector<std::unique_ptr<groupby_scan_aggregation>> aggregations#
Desired aggregations.
-
struct aggregation_result#
- #include <groupby.hpp>
The result(s) of an aggregation_request.
For every aggregation_request given to groupby::aggregate, an aggregation_result will be returned. The aggregation_result holds the resulting column(s) for each requested aggregation on the request’s values.
Public Members
-
std::vector<std::unique_ptr<column>> results = {}#
Columns of results from an aggregation_request.
-
class groupby#
- #include <groupby.hpp>
Groups values by keys and computes aggregations on those groups.
Public Functions
-
explicit groupby(table_view const &keys, null_policy null_handling = null_policy::EXCLUDE, sorted keys_are_sorted = sorted::NO, std::vector<order> const &column_order = {}, std::vector<null_order> const &null_precedence = {})#
Construct a groupby object with the specified keys.
If the keys are already sorted, better performance may be achieved by passing keys_are_sorted == sorted::YES and indicating the ascending/descending order of each column and null order in column_order and null_precedence, respectively.
Note
This object does not maintain the lifetime of keys. It is the user’s responsibility to ensure the groupby object does not outlive the data viewed by the keys table_view.
- Parameters:
keys – Table whose rows act as the groupby keys
null_handling – Indicates whether rows in keys that contain NULL values should be included
keys_are_sorted – Indicates whether rows in keys are already sorted
column_order – If keys_are_sorted == sorted::YES, indicates whether each column is ascending/descending. If empty, assumes all columns are ascending. Ignored if keys_are_sorted == sorted::NO.
null_precedence – If keys_are_sorted == sorted::YES, indicates the ordering of null values in each column. If empty, assumes all columns use null_order::AFTER. Ignored if keys_are_sorted == sorted::NO.
-
std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> aggregate(host_span<aggregation_request const> requests, rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref())#
Performs grouped aggregations on the specified values.
The values to aggregate and the aggregations to perform are specified in an aggregation_request. Each request contains a column_view of values to aggregate and a set of aggregations to perform on those elements.
For each aggregation in a request, values[i] is aggregated with all other values[j] where rows i and j in keys are equivalent.
The size() of the request column must equal keys.num_rows().
For every aggregation_request, an aggregation_result will be returned. The aggregation_result holds the resulting column(s) for each requested aggregation on the request’s values. The order of the columns in each result is the same order as was specified in the request.
The returned table contains the group labels for each group, i.e., the distinct rows from keys. Element i across all aggregation results belongs to the group at row i in the group labels table.
The order of the rows in the group labels is arbitrary. Furthermore, successive groupby::aggregate calls may return results in different orders.
Example:
Input:
  keys:     {1 2 1 3 1}
            {1 2 1 4 1}
  request:
    values: {3 1 4 9 2}
    aggregations: {{SUM}, {MIN}}

result:

  keys:  {3 1 2}
         {4 1 2}
  values:
    SUM: {9 9 1}
    MIN: {9 2 1}
- Throws:
cudf::logic_error – If requests[i].values.size() != keys.num_rows().
- Parameters:
requests – The set of columns to aggregate and the aggregations to perform
stream – CUDA stream used for device memory operations and kernel launches.
mr – Device memory resource used to allocate the returned table and columns’ device memory
- Returns:
Pair containing the table with each group’s distinct key and a vector of aggregation_results for each request in the same order as specified in requests.
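The grouping rule above can be checked against a small CPU-side reference. The following is only a sketch using the standard library, not libcudf: the names key_row, group_result, and reference_aggregate are hypothetical, the keys table is assumed to have two integer columns as in the example, and std::map yields groups in sorted key order whereas groupby::aggregate makes no ordering guarantee.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

// Hypothetical CPU reference; none of these names exist in libcudf.
using key_row = std::pair<int, int>;  // one row of a two-column keys table

struct group_result {
  int sum;  // SUM of the group's values
  int min;  // MIN of the group's values
};

// Aggregates values[i] with every values[j] whose key row is equivalent.
std::map<key_row, group_result> reference_aggregate(std::vector<key_row> const& keys,
                                                    std::vector<int> const& values)
{
  // Mirrors the documented precondition: values.size() == keys.num_rows().
  assert(values.size() == keys.size());
  std::map<key_row, group_result> groups;
  for (std::size_t i = 0; i < keys.size(); ++i) {
    // First row of a group seeds both aggregates with its own value.
    auto const inserted = groups.insert({keys[i], group_result{values[i], values[i]}});
    if (!inserted.second) {
      group_result& slot = inserted.first->second;
      slot.sum += values[i];
      slot.min = std::min(slot.min, values[i]);
    }
  }
  return groups;
}
```

Feeding it the keys and values from the example yields three groups whose SUM and MIN agree with the documented result, up to the order of the groups.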
-
std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> scan(host_span<scan_request const> requests, rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref())#
Performs grouped scans on the specified values.
The values to aggregate and the aggregations to perform are specified in a scan_request. Each request contains a column_view of values to aggregate and a set of aggregations to perform on those elements.
For each aggregation in a request, values[i] is scan aggregated with all previous values[j] where rows i and j in keys are equivalent.
The size() of the request column must equal keys.num_rows().
For every scan_request, an aggregation_result will be returned. The aggregation_result holds the resulting column(s) for each requested aggregation on the request’s values. The order of the columns in each result is the same order as was specified in the request.
The returned table contains the group labels for each row, i.e., the keys given to the groupby object. Element i across all aggregation results belongs to the group at row i in the group labels table.
The order of the rows in the group labels is arbitrary. Furthermore, successive groupby::scan calls may return results in different orders.
Example:
Input:
  keys:     {1 2 1 3 1}
            {1 2 1 4 1}
  request:
    values: {3 1 4 9 2}
    aggregations: {{SUM}, {MIN}}

result:

  keys:  {3 1 1 1 2}
         {4 1 1 1 2}
  values:
    SUM: {9 3 7 9 1}
    MIN: {9 3 3 2 1}
- Throws:
cudf::logic_error – If requests[i].values.size() != keys.num_rows().
- Parameters:
requests – The set of columns to scan and the scans to perform
stream – CUDA stream used for device memory operations and kernel launches.
mr – Device memory resource used to allocate the returned table and columns’ device memory
- Returns:
Pair containing the table with each group’s key and a vector of aggregation_results for each request in the same order as specified in requests.
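The per-group running behavior described above can be illustrated with a CPU-side sketch. It is not libcudf code: key_row, scan_row, and reference_scan are hypothetical names, the scan is assumed to be inclusive (as the example output suggests), and the output is kept in input row order for simplicity, whereas groupby::scan returns rows arranged by group.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

// Hypothetical CPU reference; none of these names exist in libcudf.
using key_row = std::pair<int, int>;  // one row of a two-column keys table

struct scan_row {
  int sum;  // inclusive running SUM within the row's group
  int min;  // inclusive running MIN within the row's group
};

// Output row i combines values[i] with all previous values[j] of the same group.
std::vector<scan_row> reference_scan(std::vector<key_row> const& keys,
                                     std::vector<int> const& values)
{
  assert(values.size() == keys.size());
  std::map<key_row, scan_row> running;  // current running state per group
  std::vector<scan_row> out;
  out.reserve(keys.size());
  for (std::size_t i = 0; i < keys.size(); ++i) {
    auto it = running.find(keys[i]);
    if (it == running.end()) {
      it = running.insert({keys[i], scan_row{values[i], values[i]}}).first;
    } else {
      it->second.sum += values[i];
      it->second.min = std::min(it->second.min, values[i]);
    }
    out.push_back(it->second);
  }
  return out;
}
```

For the example input, the three rows with key (1, 1) produce running sums 3, 7, 9 and running minimums 3, 3, 2, which matches the middle three entries of the documented result.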
-
std::pair<std::unique_ptr<table>, std::unique_ptr<table>> shift(table_view const &values, host_span<size_type const> offsets, std::vector<std::reference_wrapper<scalar const>> const &fill_values, rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref())#
Performs grouped shifts for specified values.
In the jth column, for each group, the ith element is taken from the (i - offsets[j])th element of the group. If i - offsets[j] < 0 or >= group_size, the value is determined by fill_values[j].
Example:
keys:    {1 4 1 3 4 4 1}
         {1 2 1 3 2 2 1}
values:  {3 9 1 4 2 5 7}
         {"a" "c" "bb" "ee" "z" "x" "d"}
offset:  {2, -1}
fill_value: {@, @}
result (group order may be different):
   keys:   {3 1 1 1 4 4 4}
           {3 1 1 1 2 2 2}
   values: {@ @ @ 3 @ @ 9}
           {@ "bb" "d" @ "z" "x" @}

-------------------------------------------------
keys:    {1 4 1 3 4 4 1}
         {1 2 1 3 2 2 1}
values:  {3 9 1 4 2 5 7}
         {"a" "c" "bb" "ee" "z" "x" "d"}
offset:  {-2, 1}
fill_value: {-1, "42"}
result (group order may be different):
   keys:   {3 1 1 1 4 4 4}
           {3 1 1 1 2 2 2}
   values: {-1 7 -1 -1 5 -1 -1}
           {"42" "42" "a" "bb" "42" "c" "z"}
Note
The first returned table stores the keys passed to the groupby object. Row i of the key table corresponds to the group labels of row i in the shifted columns. The key order in each group matches the input order. The order of each group is arbitrary. The group order in successive calls to groupby::shift may be different.
- Parameters:
values – Table whose columns are to be shifted
offsets – The offsets by which to shift the input
fill_values – Fill values for indeterminable outputs
stream – CUDA stream used for device memory operations and kernel launches.
mr – Device memory resource used to allocate the returned table and columns’ device memory
- Throws:
cudf::logic_error – if the dtype of fill_values[i] does not match the dtype of values[i] for the ith column
- Returns:
Pair containing the tables with each group’s key and the columns shifted
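The index rule for grouped shifts can be written out as a CPU-side sketch. This is an illustration under stated assumptions rather than libcudf code: reference_group_shift is a hypothetical name, it handles one integer value column with a single-column key, and it writes results back in input row order instead of returning rows grouped by key.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical CPU reference for one value column; not a libcudf API.
std::vector<int> reference_group_shift(std::vector<int> const& keys,
                                       std::vector<int> const& values,
                                       int offset,
                                       int fill_value)
{
  assert(values.size() == keys.size());
  // Row indices belonging to each group, in input order.
  std::map<int, std::vector<std::size_t>> rows_of;
  for (std::size_t r = 0; r < keys.size(); ++r) { rows_of[keys[r]].push_back(r); }

  // Every output starts as the fill value and is overwritten when a source exists.
  std::vector<int> out(values.size(), fill_value);
  for (auto const& entry : rows_of) {
    std::vector<std::size_t> const& rows = entry.second;
    long long const group_size = static_cast<long long>(rows.size());
    for (long long i = 0; i < group_size; ++i) {
      long long const src = i - offset;  // position within the group to read from
      if (src >= 0 && src < group_size) {
        out[rows[static_cast<std::size_t>(i)]] = values[rows[static_cast<std::size_t>(src)]];
      }
    }
  }
  return out;
}
```

With the first value column of the example, an offset of -2, and a fill value of -1, the group with key 1 (values 3, 1, 7) becomes 7, -1, -1, the same per-group sequence as in the second documented result.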
-
groups get_groups(cudf::table_view values = {}, rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref())#
Get the grouped keys and values corresponding to a groupby operation on a set of values.
Returns a groups object representing the grouped keys and values. If values is not provided, only a grouping of the keys is performed, and the values of the groups object will be nullptr.
- Parameters:
values – Table representing values on which a groupby operation is to be performed
stream – CUDA stream used for device memory operations and kernel launches.
mr – Device memory resource used to allocate the device memory of the tables in the returned groups
- Returns:
A groups object representing grouped keys and values
-
std::pair<std::unique_ptr<table>, std::unique_ptr<table>> replace_nulls(table_view const &values, host_span<cudf::replace_policy const> replace_policies, rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref())#
Performs grouped replace nulls on values.
For each value[i] == NULL in group j, value[i] is replaced with the first non-null value in group j that precedes or follows value[i], depending on the replace policy given for that column. If a non-null value is not found in the specified direction, value[i] is left NULL.
The returned pair contains a table of the sorted keys and a table of the result columns. In each result column, values of the same group are contiguous. Within each group, values keep their original order. The order of the groups is not guaranteed.
Example:
//Inputs:
keys: {3 3 1 3 1 3 4}
      {2 2 1 2 1 2 5}
values: {3 4 7 @ @ @ @}
        {@ @ @ "x" "tt" @ @}
replace_policies: {FORWARD, BACKWARD}

//Outputs (group orders may be different):
keys: {3 3 3 3 1 1 4}
      {2 2 2 2 1 1 5}
result: {3 4 4 4 7 7 @}
        {"x" "x" "x" @ "tt" "tt" @}
- Parameters:
values – [in] A table whose column null values will be replaced
replace_policies – [in] Specify the position of replacement values relative to null values, one for each column
stream – [in] CUDA stream used for device memory operations and kernel launches.
mr – [in] Device memory resource used to allocate device memory of the returned column
- Returns:
Pair that contains a table with the sorted keys and a table with the result columns
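The first column of the example (the one labeled FORWARD) can be reproduced with a CPU-side sketch. It is only an illustration, not libcudf code: nullable_column and reference_forward_fill are hypothetical names, nulls are modeled with a separate validity vector, the key is a single integer column, and rows stay in input order rather than being grouped.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical nullable column model: data[i] is meaningful only when valid[i] is true.
struct nullable_column {
  std::vector<int> data;
  std::vector<bool> valid;
};

// Fills each null with the nearest preceding non-null value of the same group.
// A null with no preceding non-null value in its group stays null.
nullable_column reference_forward_fill(std::vector<int> const& keys, nullable_column col)
{
  assert(col.data.size() == keys.size() && col.valid.size() == keys.size());
  std::map<int, int> last_seen;  // most recent non-null value per group
  for (std::size_t i = 0; i < keys.size(); ++i) {
    if (col.valid[i]) {
      last_seen[keys[i]] = col.data[i];
    } else {
      auto const it = last_seen.find(keys[i]);
      if (it != last_seen.end()) {
        col.data[i]  = it->second;
        col.valid[i] = true;
      }
    }
  }
  return col;
}
```

A backward fill would follow the same pattern with the rows visited in reverse order.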
-
struct groups#
- #include <groupby.hpp>
The grouped data corresponding to a groupby operation on a set of values.
A groups object holds two tables with the same number of rows: a table of grouped keys and a table of grouped values. In addition, it holds a vector of integer offsets into the rows of the tables, such that offsets[i+1] - offsets[i] gives the size of group i.
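As a small illustration of the offsets convention, the sketch below derives group sizes from an offsets vector. The helper name group_sizes is hypothetical, and the sketch assumes the offsets vector has one more entry than there are groups, which the formula above implies but this page does not state explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: size of group i is offsets[i + 1] - offsets[i].
// Assumes offsets holds num_groups + 1 non-decreasing entries.
std::vector<int> group_sizes(std::vector<int> const& offsets)
{
  assert(!offsets.empty());
  std::vector<int> sizes;
  sizes.reserve(offsets.size() - 1);
  for (std::size_t i = 0; i + 1 < offsets.size(); ++i) {
    sizes.push_back(offsets[i + 1] - offsets[i]);
  }
  return sizes;
}
```

Under that assumption, offsets of {0, 1, 4, 7} describe three groups occupying rows [0, 1), [1, 4), and [4, 7) of the grouped tables.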
-
struct streaming_aggregation_request#
- #include <groupby.hpp>
Request for a single streaming groupby aggregation on a column.
Analogous to aggregation_request, but identifies the value column by index rather than by column_view, since data arrives in batches after construction, and carries exactly one aggregation per request.
column_index refers to the position of the value column in the table_view passed to streaming_groupby::aggregate(). Multiple aggregations on the same column are expressed as separate requests (e.g., [{col, sum}, {col, mean}]). Internal deduplication ensures redundant computations are shared automatically.
Public Members
-
std::unique_ptr<groupby_aggregation> aggregation#
Desired aggregation.
-
class streaming_groupby#
- #include <groupby.hpp>
Stateful streaming groupby that accumulates partial aggregates across batches.
streaming_groupby and the stateless groupby serve different use cases. Use the stateless groupby for single-shot aggregation when all input fits in memory at once. Use streaming_groupby when input arrives over multiple batches and memory efficiency matters: peak memory does not scale with the number of input rows, because only the distinct keys seen so far and one aggregation slot per group are kept across batches. Arbitrarily long streams with many duplicate keys can therefore be accumulated without memory growing with the stream length.
If memory is not a concern, concatenating all batches and calling the stateless groupby once is also a valid choice. Reach for streaming_groupby when (a) the cumulative input does not fit in memory, or (b) partial-state aggregation across distributed workers (merge()) is part of the workload.
Per-batch cost is O(batch_size): each batch does direct hash table insertion and in-place aggregation updates against the persistent state. Partial states can be combined via merge(), and final results are produced via finalize().
The max_distinct_keys parameter sets the upper bound on the number of distinct key combinations across the lifetime of this object. The persistent state is sized to max_distinct_keys (constant for the lifetime of the object); the stored distinct keys grow with the number of distinct keys actually seen, so the incremental key storage is O(distinct_keys() × key_size) and does not scale with cumulative input rows.
Cumulative input rows are not bounded; only cumulative distinct keys are. A single batch also may not exceed max_distinct_keys rows; this is an implementation limit because each in-flight batch row is encoded as max_distinct_keys + row_idx inside the hash set, which must fit in cudf::size_type.
All column types (including variable-width types such as strings, lists, and structs) are supported for key columns. Only hash-based aggregation kinds are supported; use is_streaming_groupby_supported() to query a specific (value type, aggregation kind) combination.
Supported aggregation kinds: SUM, SUM_OF_SQUARES, PRODUCT, MIN, MAX, COUNT_VALID, COUNT_ALL, MEAN, M2, VARIANCE, STD
- Throws std::invalid_argument:
for unsupported aggregation kinds
- Throws std::invalid_argument:
if a single batch exceeds max_distinct_keys rows
- Throws cudf::logic_error:
if cumulative distinct keys exceed max_distinct_keys
Public Functions
-
streaming_groupby(streaming_groupby&&) noexcept#
Move constructor.
-
streaming_groupby &operator=(streaming_groupby&&) noexcept#
Move assignment operator.
- Returns:
Reference to this object.
-
explicit streaming_groupby(host_span<size_type const> key_indices, host_span<streaming_aggregation_request const> requests, size_type max_distinct_keys, null_policy null_handling = null_policy::EXCLUDE)#
Construct a streaming groupby object with a persistent hash table.
- Parameters:
key_indices – Indices of columns in the data table that serve as groupby keys
requests – The aggregations to perform and which columns to aggregate
max_distinct_keys – Upper bound on distinct key combinations. The hash set, companion vectors, and aggregation results table are all sized to this capacity. Cumulative input rows are not bounded.
null_handling – Indicates whether rows in keys that contain NULL values should be included
- Throws:
std::invalid_argument – if max_distinct_keys <= 0
std::invalid_argument – if any requested aggregation kind is unsupported
-
void aggregate(table_view const &data, rmm::cuda_stream_view stream = cudf::get_default_stream())#
Feed a batch of data into the streaming aggregation.
Batch keys are inserted into the persistent hash set and aggregation results are updated atomically. The input data table is not referenced after this call returns.
- Parameters:
data – Table containing both key and value columns
stream – CUDA stream used for device memory operations and kernel launches
- Throws:
std::invalid_argument – if data.num_rows() exceeds max_distinct_keys
cudf::logic_error – if cumulative distinct keys exceed max_distinct_keys
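The batch-by-batch contract can be modeled on the CPU to make the two limits concrete. The class below is a hypothetical sketch, not the libcudf implementation: streaming_sum_model and sum_of are invented names, it tracks a single SUM per integer key, std::logic_error stands in for cudf::logic_error, and a batch that fails partway through may leave earlier rows of that batch applied.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical CPU model of the documented limits; not the libcudf class.
class streaming_sum_model {
 public:
  explicit streaming_sum_model(std::size_t max_distinct_keys)
    : max_distinct_keys_(max_distinct_keys)
  {
    if (max_distinct_keys == 0) {
      throw std::invalid_argument("max_distinct_keys must be positive");
    }
  }

  // Folds one batch of (key, value) rows into the persistent per-group slots.
  void aggregate(std::vector<std::pair<int, long long>> const& batch)
  {
    // Per-batch limit: a batch may not have more rows than max_distinct_keys.
    if (batch.size() > max_distinct_keys_) {
      throw std::invalid_argument("batch exceeds max_distinct_keys rows");
    }
    for (auto const& row : batch) {
      auto it = sums_.find(row.first);
      if (it == sums_.end()) {
        // Lifetime limit: cumulative distinct keys may not exceed the bound.
        if (sums_.size() == max_distinct_keys_) {
          throw std::logic_error("cumulative distinct keys exceed max_distinct_keys");
        }
        sums_.emplace(row.first, row.second);
      } else {
        it->second += row.second;
      }
    }
    assert(sums_.size() <= max_distinct_keys_);
  }

  std::size_t distinct_keys() const noexcept { return sums_.size(); }
  long long sum_of(int key) const { return sums_.at(key); }

 private:
  std::size_t max_distinct_keys_;
  std::unordered_map<int, long long> sums_;  // one slot per distinct key
};
```

The state grows only when a new key appears, so repeated batches over the same keys leave distinct_keys() unchanged while the sums keep accumulating.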
-
void merge(streaming_groupby const &other, rmm::cuda_stream_view stream = cudf::get_default_stream())#
Merge another streaming_groupby’s accumulated partial state into this one.
Extracts the other object’s accumulated intermediate state and merges it into this object’s persistent hash table. The other object is not modified. Both objects must have been constructed with compatible aggregation requests, and this object must have had at least one aggregate() call.
- Parameters:
other – The streaming_groupby whose partial state to merge
stream – CUDA stream used for device memory operations and kernel launches
- Throws:
std::invalid_argument – if the other object has more distinct keys than max_distinct_keys
cudf::logic_error – if this object has not been initialized via aggregate()
cudf::logic_error – if distinct keys exceed max_distinct_keys after merge
-
std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> finalize(rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const#
Finalize the accumulated partial aggregates into final results.
For most aggregation kinds the partial state is the final result. For kinds like MEAN, VARIANCE, or STD, a finalization step converts the internal partial representation (e.g., sum+count) into the user-facing result.
This does not modify the internal state; aggregate() may be called again afterward.
- Parameters:
stream – CUDA stream used for device memory operations and kernel launches
mr – Device memory resource used to allocate the returned table and columns
- Throws:
cudf::logic_error – if no data has been accumulated
- Returns:
Pair of distinct keys table and a vector of aggregation_results (one per request)
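The split between partial state and final result can be sketched for MEAN, using the sum+count representation mentioned above. Everything here is a hypothetical CPU illustration rather than libcudf code: mean_partial, partial_state, accumulate, merge_into, and finalize_mean are invented names, and keys are plain integers.

```cpp
#include <cassert>
#include <unordered_map>

// Hypothetical partial representation for MEAN: a running sum and a count.
struct mean_partial {
  double sum      = 0.0;
  long long count = 0;
};

using partial_state = std::unordered_map<int, mean_partial>;

// Folds one (key, value) row into the partial state.
void accumulate(partial_state& state, int key, double value)
{
  mean_partial& slot = state[key];
  slot.sum += value;
  ++slot.count;
}

// Merges another worker's partial state; `other` is left unmodified.
void merge_into(partial_state& target, partial_state const& other)
{
  for (auto const& entry : other) {
    mean_partial& slot = target[entry.first];
    slot.sum += entry.second.sum;
    slot.count += entry.second.count;
  }
}

// Converts partials into user-facing means without changing the state.
std::unordered_map<int, double> finalize_mean(partial_state const& state)
{
  std::unordered_map<int, double> means;
  for (auto const& entry : state) {
    assert(entry.second.count > 0);  // a slot exists only after at least one row
    means[entry.first] = entry.second.sum / static_cast<double>(entry.second.count);
  }
  return means;
}
```

Because finalize_mean reads the state without consuming it, more rows can be accumulated afterward and a later call reflects them, which is the same property documented for finalize().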
-
size_type distinct_keys() const noexcept#
Returns the number of distinct keys accumulated so far.
Returns 0 before any successful aggregate() or merge() call.
- Returns:
The current count of distinct keys in the persistent hash table