Aggregate Functions
Aggregate functions reduce multiple rows into a single value per group — like SUM(),
COUNT(), or AVG(). DuckDB supports parallel aggregation, which introduces a combine
step that merges partial results from parallel workers.
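To see why a combine step is needed at all, here is a pure-Rust sketch (no DuckDB involved) in which each worker thread aggregates its own slice of the input into a partial state and the partials are merged at the end. `AvgState` and `parallel_avg` are hypothetical names for illustration, and DuckDB's real scheduling is more involved than splitting a slice into chunks.

```rust
use std::thread;

/// Partial state for AVG: a running sum and count (hypothetical type).
#[derive(Default, Clone, Copy, Debug, PartialEq)]
struct AvgState {
    sum: i64,
    count: i64,
}

impl AvgState {
    fn update(&mut self, value: i64) {
        self.sum += value;
        self.count += 1;
    }

    /// Merge another partial state into this one.
    fn combine(&mut self, other: &AvgState) {
        self.sum += other.sum;
        self.count += other.count;
    }

    /// AVG of zero rows is NULL, modelled here as `None`.
    fn finalize(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum as f64 / self.count as f64)
        }
    }
}

/// Split `values` across up to `workers` threads, aggregate each chunk into
/// its own partial state, then merge the partials with `combine`.
fn parallel_avg(values: &[i64], workers: usize) -> Option<f64> {
    // `chunks(0)` panics, so keep the chunk length at least 1.
    let chunk_len = values.len().div_ceil(workers.max(1)).max(1);
    let partials: Vec<AvgState> = thread::scope(|s| {
        let handles: Vec<_> = values
            .chunks(chunk_len)
            .map(|chunk| {
                s.spawn(move || {
                    let mut st = AvgState::default();
                    for &v in chunk {
                        st.update(v);
                    }
                    st
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    let mut total = AvgState::default(); // fresh target state
    for p in &partials {
        total.combine(p);
    }
    total.finalize()
}
```

The state holds a sum and a count rather than an average because two partial averages cannot be merged correctly without knowing how many rows each one covered.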
The aggregate lifecycle
```mermaid
flowchart TD
    REG["**Registration**<br/>AggregateFunctionBuilder<br/>→ duckdb_register_aggregate_function"]
    REG --> SIZE
    SIZE --> INIT
    INIT --> UPDATE
    UPDATE --> COMBINE
    COMBINE --> FINAL
    FINAL --> DESTROY
    SIZE["**state_size**()<br/>How many bytes to allocate per group?"]
    INIT["**state_init**(state)<br/>Initialize a fresh state"]
    UPDATE["**update**(chunk, states[])<br/>Process one input batch"]
    COMBINE["**combine**(src[], tgt[], count)<br/>Merge partial results from parallel workers<br/>⚠️ Pitfall L1: target starts fresh; copy ALL config fields"]
    FINAL["**finalize**(states[], out, count)<br/>Write results to output vector"]
    DESTROY["**state_destroy**(states[], count)<br/>Free memory"]
    style COMBINE fill:#fff3cd,stroke:#e6ac00,color:#333
```
DuckDB may call combine multiple times as it merges results from parallel segments.
Target states in combine are always fresh: each has just been set up by state_init,
so with FfiState<T> it holds T::default() values, not data from any earlier update.
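Because combine may run several times, in an order you do not control, it should produce the same answer however the partial states are grouped. The pure-Rust sketch below checks this for a hypothetical `SumState`: every merge step starts from a fresh `SumState::default()` target, and a tree-shaped merge is compared with a sequential one. The names are illustrative and are not part of quack_rs.

```rust
/// Hypothetical per-group state used only for this sketch.
#[derive(Default, Clone, Copy, Debug, PartialEq)]
struct SumState {
    total: i64,
}

/// Merge `source` into `target`, mirroring a combine callback.
fn combine(source: &SumState, target: &mut SumState) {
    target.total += source.total;
}

/// Merge partials left to right into one fresh target.
fn sequential_merge(parts: &[SumState]) -> SumState {
    let mut target = SumState::default();
    for p in parts {
        combine(p, &mut target);
    }
    target
}

/// Merge partials by recursively halving the list; each level merges into
/// its own fresh target, so the grouping differs from `sequential_merge`.
fn tree_merge(parts: &[SumState]) -> SumState {
    let mut target = SumState::default();
    match parts.len() {
        0 => {}
        1 => combine(&parts[0], &mut target),
        n => {
            let (left, right) = parts.split_at(n / 2);
            combine(&tree_merge(left), &mut target);
            combine(&tree_merge(right), &mut target);
        }
    }
    target
}
```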
Registration
```rust
use quack_rs::aggregate::AggregateFunctionBuilder;
use quack_rs::types::TypeId;

unsafe fn register(con: duckdb_connection) -> Result<(), ExtensionError> {
    unsafe {
        AggregateFunctionBuilder::new("my_agg")
            .param(TypeId::Varchar) // input type(s)
            .returns(TypeId::BigInt) // output type
            .state_size(state_size)
            .init(state_init)
            .update(update)
            .combine(combine)
            .finalize(finalize)
            .destructor(state_destroy)
            .register(con)?;
    }
    Ok(())
}
```
The five core callbacks (state_size, init, update, combine, finalize) must be
set before register — the builder will return an error if any are missing. The
destructor callback is optional but strongly recommended when your state allocates
heap memory (e.g., when using FfiState<T>).
Callback signatures
state_size
```rust
unsafe extern "C" fn state_size(info: duckdb_function_info) -> idx_t {
    FfiState::<MyState>::size_callback(info)
}
```
Returns the size DuckDB must allocate per group. This is always size_of::<*mut MyState>()
— a pointer, since FfiState<T> stores a Box<T> pointer in the allocated slot.
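A quick way to convince yourself of this: a raw pointer to any sized type is the same size as `usize`, no matter how large the pointee is. The sketch below uses a deliberately large, hypothetical `BigState` and a helper `slot_size` (also hypothetical) that mirrors what the size callback reports.

```rust
use std::mem::size_of;

/// A deliberately large, hypothetical aggregate state.
#[allow(dead_code)]
struct BigState {
    buckets: [u64; 1024],
    label: String,
}

/// Bytes needed per group when the slot holds only a pointer to a boxed state.
fn slot_size<T>() -> usize {
    size_of::<*mut T>()
}
```

The state itself lives on the heap behind that pointer, so DuckDB's per-group allocation stays pointer-sized even if `MyState` grows.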
state_init
```rust
unsafe extern "C" fn state_init(info: duckdb_function_info, state: duckdb_aggregate_state) {
    unsafe { FfiState::<MyState>::init_callback(info, state) };
}
```
Allocates a Box<MyState> (using MyState::default()) and writes its raw pointer into
the DuckDB-allocated state slot.
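The pattern can be modelled in plain Rust: box a default state, store the raw pointer in a slot, and read it back through a null-checked accessor that returns `Option`, which is why the callbacks in this chapter match on `Some`/`None`. This is not quack_rs's code; `init_slot` and `state_mut` are hypothetical stand-ins, and the slot is an ordinary `*mut MyState` rather than DuckDB-allocated memory.

```rust
/// Hypothetical state type; `Default` supplies the initial value.
#[derive(Default, Debug)]
struct MyState {
    sum: i64,
}

/// Model of an init callback: box a default state and write its raw pointer
/// into the slot.
///
/// # Safety
/// `slot` must be valid for writes. Any pointer already in the slot is
/// overwritten (and leaked), so call this once per slot.
unsafe fn init_slot(slot: *mut *mut MyState) {
    unsafe { *slot = Box::into_raw(Box::new(MyState::default())) };
}

/// Model of a checked accessor: a null slot yields `None` instead of UB.
///
/// # Safety
/// `slot` must be valid for reads and hold either null or a pointer produced
/// by `init_slot` that has not been freed.
unsafe fn state_mut<'a>(slot: *mut *mut MyState) -> Option<&'a mut MyState> {
    unsafe { (*slot).as_mut() }
}
```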
update
```rust
unsafe extern "C" fn update(
    _info: duckdb_function_info,
    input: duckdb_data_chunk,
    states: *mut duckdb_aggregate_state,
) {
    let reader = unsafe { VectorReader::new(input, 0) };
    let row_count = reader.row_count();
    for row in 0..row_count {
        if unsafe { !reader.is_valid(row) } {
            continue;
        }
        let value = unsafe { reader.read_i64(row) };
        let state_ptr = unsafe { *states.add(row) };
        if let Some(st) = unsafe { FfiState::<MyState>::with_state_mut(state_ptr) } {
            st.accumulate(value);
        }
    }
}
```
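states[i] is the state of the group that chunk row i belongs to. Rows from the same
group share one state, so the same state pointer can appear at several positions in states.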
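The row-to-state mapping can be sketched without DuckDB: each row carries an optional value (`None` standing in for SQL NULL) and an index into a slice of per-group states, playing the role of `states[i]`. `SumState` and this `update` function are hypothetical and only model the control flow of the callback above.

```rust
/// Hypothetical per-group state.
#[derive(Default, Debug, PartialEq)]
struct SumState {
    sum: i64,
}

/// Model of an update callback. `rows[i]` is row i's value (`None` = NULL)
/// and `state_of_row[i]` is the index of the state for row i's group.
fn update(rows: &[Option<i64>], state_of_row: &[usize], states: &mut [SumState]) {
    assert_eq!(rows.len(), state_of_row.len(), "one state entry per row");
    for (row, value) in rows.iter().enumerate() {
        // Skip NULL inputs, like the `is_valid` check above.
        let Some(v) = value else { continue };
        states[state_of_row[row]].sum += *v;
    }
}
```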
combine
```rust
unsafe extern "C" fn combine(
    _info: duckdb_function_info,
    source: *mut duckdb_aggregate_state,
    target: *mut duckdb_aggregate_state,
    count: idx_t,
) {
    for i in 0..count as usize {
        let src = unsafe { FfiState::<MyState>::with_state(*source.add(i)) };
        let tgt = unsafe { FfiState::<MyState>::with_state_mut(*target.add(i)) };
        if let (Some(s), Some(t)) = (src, tgt) {
            // ⚠️ MUST copy ALL fields; see Pitfall L1
            t.config_field = s.config_field; // configuration
            t.accumulator += s.accumulator; // data
        }
    }
}
```
Pitfall L1 (critical): target states are fresh T::default() values. You must copy every
field, including configuration fields set during update. Forgetting even one config field
produces silently wrong results. See Pitfall L1.
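The failure mode is easy to reproduce in plain Rust. In this sketch the hypothetical `ScaledSum` state has a configuration field `scale`, set during update, and a data field `sum`; finalize multiplies them. A combine that merges only the data leaves the fresh target's `scale` at its default of 0, so the merged result is silently wrong rather than an error.

```rust
/// Hypothetical state: `scale` is configuration captured during update,
/// `sum` is the accumulated data.
#[derive(Default, Clone, Copy, Debug)]
struct ScaledSum {
    scale: i64, // config: stays 0 until update sets it
    sum: i64,   // data
}

impl ScaledSum {
    fn update(&mut self, value: i64, scale: i64) {
        self.scale = scale;
        self.sum += value;
    }

    fn finalize(&self) -> i64 {
        self.sum * self.scale
    }
}

/// Buggy combine: merges the data but forgets the config field.
fn combine_buggy(src: &ScaledSum, tgt: &mut ScaledSum) {
    tgt.sum += src.sum;
}

/// Correct combine: copies the config as well as merging the data.
fn combine_fixed(src: &ScaledSum, tgt: &mut ScaledSum) {
    tgt.scale = src.scale;
    tgt.sum += src.sum;
}
```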
finalize
```rust
unsafe extern "C" fn finalize(
    _info: duckdb_function_info,
    source: *mut duckdb_aggregate_state,
    result: duckdb_vector,
    count: idx_t,
    offset: idx_t,
) {
    let mut writer = unsafe { VectorWriter::new(result) };
    for i in 0..count as usize {
        let state_ptr = unsafe { *source.add(i) };
        match unsafe { FfiState::<MyState>::with_state(state_ptr) } {
            Some(st) => unsafe { writer.write_i64(offset as usize + i, st.result()) },
            None => unsafe { writer.set_null(offset as usize + i) },
        }
    }
}
```
The offset parameter is non-zero when DuckDB is writing into a portion of a larger vector.
Always add it to your index.
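Here is a minimal pure-Rust model of why the offset matters, assuming two finalize batches that write into one shared output buffer. `finalize_into` is a hypothetical helper, with `Option<i64>` standing in for a nullable BIGINT output vector.

```rust
/// Model of finalize writing one batch of `results` into a larger output
/// buffer, starting at `offset`.
fn finalize_into(output: &mut [Option<i64>], results: &[i64], offset: usize) {
    for (i, &r) in results.iter().enumerate() {
        // Row i of this batch belongs at offset + i in the shared output.
        output[offset + i] = Some(r);
    }
}
```

If the second batch were written with offset 0 instead of 2, it would overwrite the first batch's results and leave the tail of the output NULL.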
state_destroy
```rust
unsafe extern "C" fn state_destroy(states: *mut duckdb_aggregate_state, count: idx_t) {
    unsafe { FfiState::<MyState>::destroy_callback(states, count) };
}
```
destroy_callback calls Box::from_raw for each state and then nulls the pointer,
preventing double-free. See Pitfall L2.
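The free-then-null idea can be modelled with ordinary raw pointers. In the sketch below, `destroy_slots` (a hypothetical stand-in, not quack_rs's `destroy_callback`) rebuilds each `Box` so its destructor runs, then nulls the slot; a second pass sees only null pointers and frees nothing.

```rust
/// Hypothetical state type that owns heap memory.
#[allow(dead_code)]
#[derive(Default)]
struct MyState {
    words: Vec<String>,
}

/// Model of a destroy callback: free each boxed state, then null its slot so
/// a repeated call does nothing. Returns how many states were freed.
///
/// # Safety
/// Every non-null slot must hold a pointer from `Box::into_raw` that has not
/// been freed yet.
unsafe fn destroy_slots(slots: &mut [*mut MyState]) -> usize {
    let mut freed = 0;
    for slot in slots.iter_mut() {
        if !slot.is_null() {
            // Reconstruct the Box so its destructor runs and frees the memory.
            drop(unsafe { Box::from_raw(*slot) });
            *slot = std::ptr::null_mut();
            freed += 1;
        }
    }
    freed
}
```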
Complex parameter and return types
For functions that accept or return parameterized types like LIST(BIGINT),
MAP(VARCHAR, INTEGER), or STRUCT(...), use param_logical and
returns_logical instead of param and returns:
```rust
use quack_rs::aggregate::AggregateFunctionBuilder;
use quack_rs::types::{LogicalType, TypeId};

unsafe fn register(con: duckdb_connection) -> Result<(), ExtensionError> {
    unsafe {
        AggregateFunctionBuilder::new("retention")
            .param(TypeId::Boolean)
            .param(TypeId::Boolean)
            .returns_logical(LogicalType::list(TypeId::Boolean)) // LIST(BOOLEAN)
            .state_size(state_size)
            .init(state_init)
            .update(update)
            .combine(combine)
            .finalize(finalize)
            .destructor(state_destroy)
            .register(con)?;
    }
    Ok(())
}
```
param_logical and param can be interleaved — the parameter position is
determined by the total number of calls made so far:
```rust
AggregateFunctionBuilder::new("my_func")
    .param(TypeId::Varchar)                           // position 0: VARCHAR
    .param_logical(LogicalType::list(TypeId::BigInt)) // position 1: LIST(BIGINT)
    .param(TypeId::Integer)                           // position 2: INTEGER
    .returns(TypeId::BigInt) // ...
```
If both returns and returns_logical are called, the logical type takes precedence.
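Both rules can be captured in a toy model. This is not quack_rs's implementation: `ToyBuilder` and `Ty` are hypothetical, and types are plain strings. The point is that simple and logical parameters are appended to one shared list, so a parameter's position is the number of earlier calls, and a logical return type, when present, wins over a simple one.

```rust
/// A parameter or return type as recorded by the toy builder.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Simple(&'static str),
    Logical(String),
}

/// Toy builder: every param call, simple or logical, pushes onto one list.
#[derive(Default)]
struct ToyBuilder {
    params: Vec<Ty>,
    ret: Option<&'static str>,
    ret_logical: Option<String>,
}

impl ToyBuilder {
    fn param(mut self, t: &'static str) -> Self {
        self.params.push(Ty::Simple(t));
        self
    }

    fn param_logical(mut self, t: &str) -> Self {
        self.params.push(Ty::Logical(t.to_string()));
        self
    }

    fn returns(mut self, t: &'static str) -> Self {
        self.ret = Some(t);
        self
    }

    fn returns_logical(mut self, t: &str) -> Self {
        self.ret_logical = Some(t.to_string());
        self
    }

    /// The logical return type, if set, takes precedence over the simple one.
    fn return_type(&self) -> Option<Ty> {
        self.ret_logical.clone().map(Ty::Logical).or(self.ret.map(Ty::Simple))
    }
}
```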
Extra info
Attach arbitrary data to an aggregate function using extra_info. This is useful
for parameterising the function behaviour (e.g., passing configuration):
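```rust
use std::os::raw::c_void;

// Illustrative destructor for the extra info: DuckDB calls it when the
// function is destroyed, and it reclaims the Box created below.
unsafe extern "C" fn my_destroy(data: *mut c_void) {
    unsafe { drop(Box::from_raw(data.cast::<u64>())) };
}

let config = Box::into_raw(Box::new(42u64)).cast::<c_void>();
unsafe {
    AggregateFunctionBuilder::new("my_agg")
        .param(TypeId::BigInt)
        .returns(TypeId::BigInt)
        .extra_info(config, Some(my_destroy))
        .state_size(state_size)
        .init(state_init)
        .update(update)
        .combine(combine)
        .finalize(finalize)
        .destructor(state_destroy)
        .register(con)?;
}
```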
Inside callbacks, retrieve the extra info with AggregateFunctionInfo::get_extra_info().
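The pointer round trip itself needs no DuckDB to demonstrate. This sketch boxes a `u64` config, erases it to `*mut c_void` as at registration, casts it back the way a callback would after calling `get_extra_info()`, and finally frees it with a delete-style destructor. `read_config` and `my_destroy` are hypothetical helpers; the cast type must match the type that was originally boxed.

```rust
use std::os::raw::c_void;

/// Delete-style destructor: take back ownership of the boxed config and drop it.
///
/// # Safety
/// `data` must be null or a pointer from `Box::into_raw(Box<u64>)`, freed once.
unsafe extern "C" fn my_destroy(data: *mut c_void) {
    if !data.is_null() {
        drop(unsafe { Box::from_raw(data.cast::<u64>()) });
    }
}

/// What a callback does with the extra-info pointer: cast it back to the
/// boxed type and read the value (null yields `None`).
///
/// # Safety
/// `extra` must be null or point to a live `u64`.
unsafe fn read_config(extra: *mut c_void) -> Option<u64> {
    unsafe { extra.cast::<u64>().as_ref().copied() }
}
```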
AggregateFunctionInfo
AggregateFunctionInfo wraps the duckdb_function_info handle provided to
aggregate function callbacks (update, combine, finalize, etc.). It exposes:
- get_extra_info() -> *mut c_void: retrieves the extra-info pointer set during registration
- set_error(message): reports an error, causing DuckDB to abort the query
```rust
use quack_rs::aggregate::AggregateFunctionInfo;

unsafe extern "C" fn update(
    info: duckdb_function_info,
    input: duckdb_data_chunk,
    states: *mut duckdb_aggregate_state,
) {
    let info = unsafe { AggregateFunctionInfo::new(info) };
    let extra = unsafe { info.get_extra_info() };
    // ... use extra info, or report errors via info.set_error("...") ...
}
```
Next steps
- State Management: FfiState<T>, AggregateState, and lifecycle details
- Overloading with Function Sets: register multiple signatures under one name