compute

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 12, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package compute holds the chunk-level Arrow kernels that back Cosma's eager DataFrame operations: expression evaluation, filtering, and (in time) arithmetic, aggregation, and hashing. It depends only on Arrow and the public expression tree — never on the dataframe package — so eager operations call into it without an import cycle.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BoxedValues

func BoxedValues(chunked *arrow.Chunked) ([]any, error)

BoxedValues reads a chunked column into one boxed Go value per row, in row order, with nil for nulls. It backs group-key construction where the key type is only known at runtime.

func BuildArray

func BuildArray(dtype arrow.DataType, vals []any, mem memory.Allocator) (arrow.Array, error)

BuildArray materializes a slice of boxed values into an Arrow array of the given type. A nil element becomes a null. It is the inverse of BoxedValues and is used to emit group keys and reduction results.
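The boxed form is easiest to see without Arrow. Below is a minimal sketch in which plain Go slices stand in for an Int64 array's data and validity buffers. The helper names `unbox` and `box` are hypothetical and are not part of this package. It shows the nil-for-null round trip that BoxedValues and BuildArray describe.

```go
package main

import "fmt"

// unbox splits boxed int64 values into a dense data slice and a validity
// slice (stand-ins for an Arrow array's buffers). A nil element becomes an
// invalid (null) slot whose data value is left as zero.
func unbox(vals []any) ([]int64, []bool, error) {
	data := make([]int64, len(vals))
	valid := make([]bool, len(vals))
	for i, v := range vals {
		if v == nil {
			continue // null: valid[i] stays false
		}
		x, ok := v.(int64)
		if !ok {
			return nil, nil, fmt.Errorf("element %d: want int64, got %T", i, v)
		}
		data[i], valid[i] = x, true
	}
	return data, valid, nil
}

// box is the inverse: one boxed value per row, nil where the slot is null.
func box(data []int64, valid []bool) []any {
	out := make([]any, len(data))
	for i := range data {
		if valid[i] {
			out[i] = data[i]
		}
	}
	return out
}

func main() {
	in := []any{int64(3), nil, int64(7)}
	data, valid, err := unbox(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(data, valid, box(data, valid))
}
```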

func Eval

func Eval(e expr.ExprNode, rec arrow.Record, mem memory.Allocator) (arrow.Array, error)

Eval evaluates an expression tree against a single record batch and returns a newly allocated arrow.Array of length rec.NumRows().

Ownership: the caller owns the returned array and must Release it. rec and its columns are not released. Every intermediate array allocated while recursing into children is released before Eval returns.

func EvalParallel

func EvalParallel(
	ctx context.Context,
	predicate expr.ExprNode,
	batches []arrow.Record,
	mem memory.Allocator,
) ([]arrow.Record, error)

EvalParallel evaluates predicate against each record batch in batches and applies FilterRecord with the resulting mask, fanning the work out over up to workers() goroutines (see SetParallelism). The output records are returned in the same order as the input batches.

It is the parallel counterpart of the serial loop in dataframe.Filter; the dataframe package calls it when Parallelism() > 1.

Ownership: each returned arrow.Record is newly allocated and owned by the caller. ctx cancellation causes an early return; already-filtered records are released before returning the context error.

func FilterRecord

func FilterRecord(rec arrow.Record, mask arrow.Array, mem memory.Allocator) (arrow.Record, error)

FilterRecord returns a new record holding only the rows for which mask is true. mask must be a boolean array of length rec.NumRows(); a null mask entry is treated as false and drops the row.

Ownership: the caller owns the returned record and must Release it. rec and mask are not released.
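The null-mask rule is easy to get wrong, so here is a minimal sketch of it using plain slices: `mask` holds the boolean values and `maskValid` stands in for the mask's validity bitmap. The function `filterRows` is hypothetical. Only a slot that is both valid and true keeps its row.

```go
package main

import "fmt"

// filterRows keeps row i only when mask slot i is valid and true; a null
// mask entry (maskValid[i] == false) drops the row, as FilterRecord does.
// A mask whose length differs from the row count is rejected.
func filterRows[T any](rows []T, mask, maskValid []bool) ([]T, error) {
	if len(mask) != len(rows) || len(maskValid) != len(rows) {
		return nil, fmt.Errorf("mask/validity lengths %d/%d, want %d",
			len(mask), len(maskValid), len(rows))
	}
	out := make([]T, 0, len(rows))
	for i, r := range rows {
		if maskValid[i] && mask[i] {
			out = append(out, r)
		}
	}
	return out, nil
}

func main() {
	rows := []string{"a", "b", "c", "d"}
	mask := []bool{true, true, false, true}
	valid := []bool{true, false, true, true} // row "b" has a null mask entry
	kept, err := filterRows(rows, mask, valid)
	fmt.Println(kept, err)
}
```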

func Parallelism

func Parallelism() int

Parallelism returns the configured degree of parallelism. 0 means "GOMAXPROCS at call time."

func RegisterBinaryKernel

func RegisterBinaryKernel(typeID arrow.Type, k BinaryKernel)

RegisterBinaryKernel installs a binary kernel for the given Arrow type ID. A later registration for the same type ID replaces the earlier one.

func RegisterUnaryKernel

func RegisterUnaryKernel(typeID arrow.Type, k UnaryKernel)

RegisterUnaryKernel installs a unary kernel for the given Arrow type ID. A later registration for the same type ID replaces the earlier one.
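The "later registration replaces the earlier one" rule for RegisterBinaryKernel and RegisterUnaryKernel amounts to a map keyed by type ID. The sketch below is a minimal model of it. The types `typeID` and `kernel` and the constant `decimalType` are hypothetical stand-ins for arrow.Type and UnaryKernel. The read/write mutex is an assumption for safe concurrent lookups and is not taken from this package.

```go
package main

import (
	"fmt"
	"sync"
)

// typeID stands in for arrow.Type; kernel stands in for UnaryKernel.
type typeID int

type kernel func(in []float64) []float64

// registry maps a type ID to its kernel. Registering the same ID twice
// overwrites the first entry. The RWMutex lets lookups run concurrently.
type registry struct {
	mu      sync.RWMutex
	kernels map[typeID]kernel
}

func (r *registry) register(id typeID, k kernel) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.kernels == nil {
		r.kernels = make(map[typeID]kernel)
	}
	r.kernels[id] = k
}

func (r *registry) lookup(id typeID) (kernel, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	k, ok := r.kernels[id]
	return k, ok
}

const decimalType typeID = 42 // hypothetical custom type ID

func main() {
	var reg registry
	reg.register(decimalType, func(in []float64) []float64 { return in })
	reg.register(decimalType, func(in []float64) []float64 { // replaces the first
		out := make([]float64, len(in))
		for i, v := range in {
			out[i] = -v
		}
		return out
	})
	k, _ := reg.lookup(decimalType)
	fmt.Println(k([]float64{1, 2}))
}
```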

func SetParallelism

func SetParallelism(n int)

SetParallelism sets the number of goroutines that EvalParallel and GroupReduceParallel will use. A value of 0 (the default) means GOMAXPROCS is queried at each call, so it tracks runtime changes. A value of 1 disables all parallelism (serial execution). Negative values are clamped to 1.

func SortIndices

func SortIndices(chunked *arrow.Chunked, descending, nullsFirst bool) ([]int64, error)

SortIndices returns a stable permutation of row indices that orders chunked by value, ascending unless descending is true. Nulls sort last unless nullsFirst is true. The result is a take/gather permutation: reorder every column of a DataFrame by it (for example with Take) to keep rows aligned.
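The following is a minimal sketch of a stable index sort with null placement. It uses an int64 slice plus a validity slice in place of *arrow.Chunked, and the function `sortIndices` is hypothetical. It assumes null placement is independent of direction, so descending reverses only the non-null values. `sort.SliceStable` keeps equal values in input order.

```go
package main

import (
	"fmt"
	"sort"
)

// sortIndices returns a stable permutation that orders vals; valid[i] ==
// false marks a null. Nulls go last unless nullsFirst is set; descending
// flips the order of non-null values only (an assumption of this sketch).
func sortIndices(vals []int64, valid []bool, descending, nullsFirst bool) []int64 {
	idx := make([]int64, len(vals))
	for i := range idx {
		idx[i] = int64(i)
	}
	sort.SliceStable(idx, func(a, b int) bool {
		i, j := idx[a], idx[b]
		if valid[i] != valid[j] {
			// Exactly one is null: the null sorts first only when nullsFirst is set.
			return valid[i] != nullsFirst
		}
		if !valid[i] {
			return false // both null: keep input order
		}
		if descending {
			return vals[i] > vals[j]
		}
		return vals[i] < vals[j]
	})
	return idx
}

func main() {
	vals := []int64{30, 0, 10, 30, 20}
	valid := []bool{true, false, true, true, true} // row 1 is null
	fmt.Println(sortIndices(vals, valid, false, false)) // [2 4 0 3 1]
	fmt.Println(sortIndices(vals, valid, true, true))   // [1 0 3 4 2]
}
```

Rows 0 and 3 both hold 30 and keep their input order in both outputs, which is the stability guarantee at work.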

func SortIndicesMulti

func SortIndicesMulti(keys []SortKey) ([]int64, error)

SortIndicesMulti returns a stable permutation ordering rows lexicographically by keys: keys[0] is most significant, ties broken by keys[1], and so on. Each key carries its own direction and null placement. All key columns must share the same logical length.
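Lexicographic ordering means comparing key by key and stopping at the first key that distinguishes two rows. The following is a minimal sketch of that logic. The `sortKey` struct is a hypothetical stand-in for SortKey that holds an int64 column and validity slice instead of *arrow.Chunked. Rejecting an empty key list is a choice made for this sketch only.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// sortKey mirrors SortKey with a plain int64 column (valid[i] == false is
// a null) in place of *arrow.Chunked.
type sortKey struct {
	vals       []int64
	valid      []bool
	descending bool
	nullsFirst bool
}

// compare orders rows i and j under one key: negative, zero, or positive.
func (k sortKey) compare(i, j int64) int {
	vi, vj := k.valid[i], k.valid[j]
	switch {
	case !vi && !vj:
		return 0
	case !vi: // only i is null
		if k.nullsFirst {
			return -1
		}
		return 1
	case !vj: // only j is null
		if k.nullsFirst {
			return 1
		}
		return -1
	}
	c := 0
	if k.vals[i] < k.vals[j] {
		c = -1
	} else if k.vals[i] > k.vals[j] {
		c = 1
	}
	if k.descending {
		c = -c
	}
	return c
}

// sortIndicesMulti orders rows lexicographically: keys[0] decides first and
// later keys only break ties. SliceStable keeps full ties in input order.
func sortIndicesMulti(keys []sortKey) ([]int64, error) {
	if len(keys) == 0 {
		return nil, errors.New("no sort keys")
	}
	n := len(keys[0].vals)
	for _, k := range keys {
		if len(k.vals) != n || len(k.valid) != n {
			return nil, errors.New("key columns differ in length")
		}
	}
	idx := make([]int64, n)
	for i := range idx {
		idx[i] = int64(i)
	}
	sort.SliceStable(idx, func(a, b int) bool {
		for _, k := range keys {
			if c := k.compare(idx[a], idx[b]); c != 0 {
				return c < 0
			}
		}
		return false
	})
	return idx, nil
}

func main() {
	dept := sortKey{vals: []int64{2, 1, 2, 1}, valid: []bool{true, true, true, true}}
	pay := sortKey{vals: []int64{50, 70, 90, 60}, valid: []bool{true, true, true, true}, descending: true}
	idx, err := sortIndicesMulti([]sortKey{dept, pay})
	fmt.Println(idx, err) // [1 3 2 0] <nil>
}
```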

func Take

func Take(chunked *arrow.Chunked, indices []int64, mem memory.Allocator) (arrow.Array, error)

Take reorders a chunked array by row indices, returning a new single-chunk array. indices select logical rows across all chunks of chunked: index i refers to the i-th row in chunk order. A null at a selected row is preserved as a null in the output.

Ownership: the caller owns the returned array and must Release it. chunked is not retained or released.

Take is the shared gather primitive behind Sort (reorder by a permutation) and Join (gather matched rows). Indices may be negative to emit a null at that output position; this lets a left/outer join fill unmatched right-side rows with nulls.
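Gathering across chunks requires mapping a logical row index to a chunk and an offset within that chunk. The following is a minimal sketch that does this with cumulative chunk offsets and a binary search. The `chunk` type and `take` function are hypothetical stand-ins for an Arrow chunk and for Take. Returning an error for an index past the end is an assumption of this sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// chunk stands in for one Arrow chunk: values plus validity.
type chunk struct {
	vals  []int64
	valid []bool
}

// take gathers logical rows from chunks into one flat output. Index i counts
// rows across chunks in order; a negative index emits a null (the outer-join
// case). Null inputs stay null. Out-of-range indices are an error here.
func take(chunks []chunk, indices []int64) ([]int64, []bool, error) {
	// offsets[c] is the logical row where chunk c starts.
	offsets := make([]int64, len(chunks)+1)
	for c, ch := range chunks {
		offsets[c+1] = offsets[c] + int64(len(ch.vals))
	}
	total := offsets[len(chunks)]

	vals := make([]int64, len(indices))
	valid := make([]bool, len(indices))
	for out, idx := range indices {
		if idx < 0 {
			continue // null output slot
		}
		if idx >= total {
			return nil, nil, fmt.Errorf("index %d out of range [0, %d)", idx, total)
		}
		// The holding chunk is the first one whose end offset exceeds idx.
		c := sort.Search(len(chunks), func(c int) bool { return offsets[c+1] > idx })
		local := idx - offsets[c]
		vals[out] = chunks[c].vals[local]
		valid[out] = chunks[c].valid[local]
	}
	return vals, valid, nil
}

func main() {
	chunks := []chunk{
		{vals: []int64{10, 20}, valid: []bool{true, true}},
		{vals: []int64{30, 0, 50}, valid: []bool{true, false, true}},
	}
	vals, valid, err := take(chunks, []int64{4, 0, -1, 3, 2})
	fmt.Println(vals, valid, err)
}
```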

Types

type Aggregates

type Aggregates struct {
	Count int64
	Sum   any
	Min   any
	Max   any
	Mean  float64
}

Aggregates holds the reductions for one numeric column, computed in a single pass. Sum, Min and Max are boxed in the column's element type and are nil when Count == 0 (empty or all-null). Mean is valid only when Count > 0.

func GroupReduce

func GroupReduce(groupIDs []int, numGroups int, chunked *arrow.Chunked) ([]Aggregates, error)

GroupReduce folds a chunked column into per-group Aggregates. groupIDs holds one group index per logical row, in row order across chunks, so it must have length chunked.Len(); numGroups is the number of distinct groups. The result is indexed by group id. It is the per-column half of a two-phase GroupBy: the caller assigns group ids once and reduces every value column against them.
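The two phases can be sketched with plain slices. First assign dense group ids once, then fold each value column against them. In the sketch below, `agg` is a hypothetical stand-in for Aggregates over an int64 column, and `assignIDs` and `groupReduce` are hypothetical helpers. Assigning ids in first-seen order is an assumption of this sketch.

```go
package main

import "fmt"

// agg mirrors Aggregates for an int64 column. A group with Count == 0 leaves
// Sum/Min/Max at zero here, where Aggregates would hold nil.
type agg struct {
	Count    int64
	Sum      int64
	Min, Max int64
	Mean     float64
}

// assignIDs gives each distinct key a dense id in first-seen order: the
// "assign group ids once" phase that every value column then reuses.
func assignIDs(keys []string) (ids []int, numGroups int) {
	seen := make(map[string]int)
	ids = make([]int, len(keys))
	for i, k := range keys {
		id, ok := seen[k]
		if !ok {
			id = len(seen)
			seen[k] = id
		}
		ids[i] = id
	}
	return ids, len(seen)
}

// groupReduce folds one column into per-group aggregates, skipping nulls.
func groupReduce(ids []int, numGroups int, vals []int64, valid []bool) ([]agg, error) {
	if len(ids) != len(vals) || len(valid) != len(vals) {
		return nil, fmt.Errorf("lengths differ: %d ids, %d values, %d validity",
			len(ids), len(vals), len(valid))
	}
	out := make([]agg, numGroups)
	for i, g := range ids {
		if g < 0 || g >= numGroups {
			return nil, fmt.Errorf("row %d: group id %d out of range", i, g)
		}
		if !valid[i] {
			continue
		}
		a, v := &out[g], vals[i]
		if a.Count == 0 || v < a.Min {
			a.Min = v
		}
		if a.Count == 0 || v > a.Max {
			a.Max = v
		}
		a.Count++
		a.Sum += v
	}
	for g := range out {
		if out[g].Count > 0 {
			out[g].Mean = float64(out[g].Sum) / float64(out[g].Count)
		}
	}
	return out, nil
}

func main() {
	ids, n := assignIDs([]string{"a", "b", "a", "b", "c"})
	vals := []int64{4, 10, 8, 0, 7}
	valid := []bool{true, true, true, false, false} // rows 3 and 4 are null
	aggs, err := groupReduce(ids, n, vals, valid)
	if err != nil {
		panic(err)
	}
	for g, a := range aggs {
		fmt.Printf("group %d: %+v\n", g, a)
	}
}
```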

func GroupReduceParallel

func GroupReduceParallel(groupIDs []int, numGroups int, chunked *arrow.Chunked) ([]Aggregates, error)

GroupReduceParallel is the parallel version of GroupReduce. It partitions the flat row range [0, len(groupIDs)) into up to workers() equal-sized strips, reduces each strip independently, then merges the partial results.

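The two-phase approach is safe because the merge operations for Count, Sum, Min and Max are commutative and associative: partial counts and sums add, and min/max take the extremum. Mean does not merge this way (averaging per-strip means would weight strips incorrectly), so it must be derived from the merged Sum and Count. For floating-point columns addition is only approximately associative, so a parallel Sum may differ from the serial result in the last bits.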

Output: a []Aggregates of length numGroups in the same format as GroupReduce.
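Here is a minimal sketch of the strip-partition-and-merge scheme. Each goroutine reduces its own strip into a private partial slice, so the workers share no writes. The partials are then merged, and Mean is computed last from the merged Sum and Count. `agg`, `merge` and `groupReduceParallel` are hypothetical names, and the sketch omits null handling for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// agg is a stand-in for Aggregates over an int64 column.
type agg struct {
	Count    int64
	Sum      int64
	Min, Max int64
	Mean     float64
}

// merge folds partial b into a: Count and Sum add, Min/Max take the
// extremum, and an empty partial (Count == 0) contributes nothing.
func merge(a *agg, b agg) {
	if b.Count == 0 {
		return
	}
	if a.Count == 0 || b.Min < a.Min {
		a.Min = b.Min
	}
	if a.Count == 0 || b.Max > a.Max {
		a.Max = b.Max
	}
	a.Count += b.Count
	a.Sum += b.Sum
}

// groupReduceParallel splits rows [0, n) into up to `workers` near-equal
// strips, reduces each strip into its own partial slice, then merges them.
func groupReduceParallel(ids []int, numGroups int, vals []int64, workers int) []agg {
	n := len(vals)
	if workers < 1 {
		workers = 1
	}
	if workers > n {
		workers = n
	}
	partials := make([][]agg, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		lo, hi := w*n/workers, (w+1)*n/workers
		wg.Add(1)
		go func(w, lo, hi int) {
			defer wg.Done()
			p := make([]agg, numGroups) // private to this worker
			for i := lo; i < hi; i++ {
				merge(&p[ids[i]], agg{Count: 1, Sum: vals[i], Min: vals[i], Max: vals[i]})
			}
			partials[w] = p
		}(w, lo, hi)
	}
	wg.Wait()

	out := make([]agg, numGroups)
	for _, p := range partials {
		for g := range p {
			merge(&out[g], p[g])
		}
	}
	for g := range out { // Mean is derived, never merged
		if out[g].Count > 0 {
			out[g].Mean = float64(out[g].Sum) / float64(out[g].Count)
		}
	}
	return out
}

func main() {
	ids := []int{0, 1, 0, 1, 0, 1, 0}
	vals := []int64{5, 2, 9, 4, 1, 8, 3}
	fmt.Printf("%+v\n", groupReduceParallel(ids, 2, vals, 3))
}
```

Because every field is either merged associatively or derived after the merge, the result does not depend on the number of strips.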

func Reduce

func Reduce(chunked *arrow.Chunked) (Aggregates, error)

Reduce folds a chunked column into its Sum/Count/Min/Max/Mean, skipping nulls. It is the building block for both whole-column aggregation and the per-group partials of GroupBy.

type BinaryKernel

type BinaryKernel func(op expr.BinaryOp, left, right arrow.Array, mem memory.Allocator) (arrow.Array, error)

BinaryKernel evaluates a binary op over two same-length arrays of a custom Arrow type, returning a newly allocated result the caller owns.

type OpMetrics

type OpMetrics struct {
	// Op is a short human-readable label for the operation (e.g. "filter",
	// "groupby").
	Op string

	// Workers is the number of goroutines that ran.
	Workers int

	// Rows is the number of logical rows processed.
	Rows int64

	// Elapsed is the wall-clock time from start of fan-out to completion of
	// all workers.
	Elapsed time.Duration
}

OpMetrics captures timing and throughput for a single parallel kernel invocation. It is updated by EvalParallel and GroupReduceParallel and can be read via LastMetrics().

func LastMetrics

func LastMetrics() OpMetrics

LastMetrics returns the OpMetrics recorded by the most recent parallel kernel call in this process. It is a snapshot; concurrent calls may overwrite it.

type SortKey

type SortKey struct {
	Column     *arrow.Chunked
	Descending bool
	NullsFirst bool
}

SortKey is one sort specification resolved against a chunked column: the column to compare, its direction, and its null placement.

type UnaryKernel

type UnaryKernel func(op expr.UnaryOp, input arrow.Array, mem memory.Allocator) (arrow.Array, error)

UnaryKernel evaluates a unary op over one array of a custom Arrow type, returning a newly allocated result the caller owns.
