Profiling and Tracing#
Streaming Statistics#
When a query runs on a streaming engine
(RayEngine,
DaskEngine,
SPMDEngine, or the default
engine="gpu"), the underlying streaming runtime can record detailed per-rank statistics:
shuffle byte counts, allgather participation, memory-pool high-water marks, and more. See the
underlying statistics reference for the full list of metrics.
Statistics collection is off by default. Enable it by setting statistics=True on
StreamingOptions (or exporting
RAPIDSMPF_STATISTICS=1), then call gather_statistics() on the engine to pull the per-rank
records:
import polars as pl
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.engine.ray import RayEngine
opts = StreamingOptions(statistics=True)
with RayEngine.from_options(opts) as engine:
result = (
pl.scan_parquet("/data/*.parquet")
.group_by("customer_id")
.agg(pl.col("amount").sum())
.collect(engine=engine)
)
per_rank = engine.gather_statistics(clear=True)
for rank, stats in enumerate(per_rank):
print(f"rank {rank}:\n{stats}")
gather_statistics(*, clear=False) returns a list of rapidsmpf.statistics.Statistics objects,
one per rank, in rank order. Passing clear=True resets each rank’s counters after the gather —
useful when you want to scope statistics to a single query.
Use global_statistics(*, clear=False) when you only need the cluster-wide picture. It gathers
and merges the per-rank statistics into a single Statistics (counts and values summed, maxima
reduced with max). Capture it inside the engine context, then print after exit:
import polars as pl
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.engine.ray import RayEngine
opts = StreamingOptions(statistics=True)
with RayEngine.from_options(opts) as engine:
result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
total = engine.global_statistics(clear=True)
print(total)
GPU Profiling#
For streaming queries, we recommend profiling with NVIDIA NSight Systems. cudf-polars
includes nvtx annotations to help you understand where time is being spent. Streaming
engines do not support LazyFrame.profile, since profile requires a single in-memory pass.
If you specifically need LazyFrame.profile,
the in-memory engine supports it. This is useful for small queries during development:
import polars as pl
q = pl.scan_parquet("ny-taxi/2024/*.parquet").filter(pl.col("total_amount") > 15.0)
profile = q.profile(engine=pl.GPUEngine(executor="in-memory"))
The result is (result_df, timings_df), see the Polars docs link above for the schema.
Tracing#
cudf-polars can optionally trace execution of each node in the query plan. To enable tracing, set
the environment variable CUDF_POLARS_LOG_TRACES to a true value (“1”, “true”, “y”, “yes”)
before starting your process.
cudf-polars logs traces at three scopes (levels):
plan: These generally happen once per query. This will include things like the (serialized) query plan.actor: (streaming engines only). There will be roughly oneactortrace per node in the logical plan.evaluate_ir_node: Logs the evaluation of a physical node in the query plan. Note that one logical node might expand to more than one physical nodes.
Each trace includes a scope key indicating which level that trace belongs to. actor-scoped
nodes will be nested under a plan-scoped node. When using a streaming engine,
evaluate_ir_node-scoped nodes will be nested under an actor-scoped node.
Schemas#
The different scopes have different schemas. Fields in bold are required / always present.
scope=plan#
Field Name |
Type |
Description |
|---|---|---|
scope |
Literal[“plan”] |
The string literal |
cudf_polars_query_id |
UUID4 |
A unique identifier for the polars query being executed. All traces logged as part of this query use this ID. |
plan |
|
A serialized representation of the query plan. |
event |
String |
A message like “Query Plan” |
scope=actor#
actor-scoped traces only appear when running on a streaming engine.
Field Name |
Type |
Description |
|---|---|---|
scope |
Literal[“actor”] |
The string literal |
cudf_polars_query_id |
UUID4 |
A unique identifier for the polars query being executed. All traces logged as part of this query use this ID. |
start |
int |
A nanosecond-resolution counter indicating when the actor started. Note: actors generally start early in the query and suspend waiting for data. |
stop |
int |
A nanosecond-resolution counter indicating when the actor completed. |
event |
String |
A message like “Streaming Actor”. |
actor_ir_type |
String |
The type of the actor, like |
actor_ir_id |
int |
A unique identifier for the actor. All traces logged under this actor will include this value. |
chunk_count |
int |
A counter for how many table chunks have been processed by this actor at the time of logging. |
duplicated |
bool |
Whether the output rows are duplicated across ranks (e.g. after an allgather). |
row_count |
int |
Total row count produced by this node during execution. |
scope=evaluate_ir_node#
Field Name |
Type |
Description |
|---|---|---|
scope |
|
The string literal |
cudf_polars_query_id |
UUID4 |
A unique identifier for the polars query being executed. All traces logged as part of this query use this ID. |
type |
string |
The name of the IR node |
start |
int |
A nanosecond-precision counter indicating when this node started executing |
stop |
int |
A nanosecond-precision counter indicating when this node finished executing |
overhead_duration |
int |
The overhead, in nanoseconds, added by tracing |
|
int |
The number of dataframes for the input / output |
|
|
A list with dictionaries with “shape” and “size” fields, one per input dataframe, for the input / output |
|
int |
The sum of the size (in bytes) of the dataframes for the input / output |
|
int |
The current number of bytes allocated by RMM Memory Resource used by cudf-polars for the input / output |
|
int |
The current number of allocations made by RMM Memory Resource used by cudf-polars for the input / output |
|
int |
The peak number of bytes allocated by RMM Memory Resource used by cudf-polars for the input / output |
|
int |
The peak number of allocations made by RMM Memory Resource used by cudf-polars for the input / output |
|
int |
The total number of bytes allocated by RMM Memory Resource used by cudf-polars for the input / output |
|
int |
The total number of allocations made by RMM Memory Resource used by cudf-polars for the input / output |
|
int |
The device memory usage of this process, as reported by NVML, for the input / output |
actor_ir_id |
int |
A unique identifier for the parent actor (streaming engines only). |
Setting CUDF_POLARS_LOG_TRACES=1 enables all the metrics. Depending on the query, the overhead
from collecting the memory or dataframe metrics can be measurable. You can disable some metrics
through additional environment variables. For example, to disable the memory-related metrics, set:
CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0
And to disable the memory and dataframe metrics, which essentially leaves just the duration metrics, set
CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0 CUDF_POLARS_LOG_TRACES_DATAFRAMES=0
Note that tracing still needs to be enabled with CUDF_POLARS_LOG_TRACES=1.
The implementation uses structlog to build log records. You can configure the output using structlog’s configuration and enrich the records with context variables.
>>> df = pl.DataFrame({"a": ["a", "a", "b"], "b": [1, 2, 3]}).lazy()
>>> df.group_by("a").agg(pl.col("b").min().alias("min"), pl.col("b").max().alias("max")).collect(engine=pl.GPUEngine(executor="in-memory"))
2025-09-10 07:44:01 [info ] Execute IR count_frames_input=0 count_frames_output=1 ... type=DataFrameScan
2025-09-10 07:44:01 [info ] Execute IR count_frames_input=1 count_frames_output=1 ... type=GroupBy
shape: (2, 3)
┌─────┬─────┬─────┐
│ a ┆ min ┆ max │
│ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ i64 │
╞═════╪═════╪═════╡
│ b ┆ 3 ┆ 3 │
│ a ┆ 1 ┆ 2 │
└─────┴─────┴─────┘