processing

package
v0.10.2
Published: May 21, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package processing provides the single dynamic processing engine for Pulse. It implements aggregators, attributes, filterers, and groupers that operate on record iterators backed by .pulse encoded data.

Constants

This section is empty.

Variables

This section is empty.

Functions

func AggregateDecimalField added in v0.2.0

func AggregateDecimalField(agg types.AggregationType, records []*Record, field string, scale uint8) (decimalAggResult, error)

AggregateDecimalField runs the decimal-aware aggregator for `agg` over the given records, reading the wide map for `field`. Used by the orchestrator when the field type is decimal128 or nullable_decimal128.

func AggregationLabel added in v0.10.0

func AggregationLabel(agg *types.Aggregation) string

AggregationLabel returns the output column name for an aggregation, matching the labelling used by the streaming and buffered paths in Processor.Process. An empty label falls back to "<TYPE>_<Field>".
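
The fallback rule can be sketched as follows. This is a minimal illustration, not the package's implementation: the Aggregation struct and its field names are hypothetical stand-ins for types.Aggregation, and the exact casing of the type component is an assumption.

```go
package main

import "fmt"

// Aggregation is a hypothetical stand-in for types.Aggregation; the real
// struct's field names are not shown in this documentation.
type Aggregation struct {
	Type  string // e.g. "SUM" (assumed to be rendered as-is)
	Field string // source field name
	Label string // optional user-supplied output column name
}

// labelFor returns the explicit label when one is set and otherwise
// falls back to "<TYPE>_<Field>".
func labelFor(agg *Aggregation) string {
	if agg.Label != "" {
		return agg.Label
	}
	return agg.Type + "_" + agg.Field
}

func main() {
	fmt.Println(labelFor(&Aggregation{Type: "SUM", Field: "revenue"}))
	fmt.Println(labelFor(&Aggregation{Type: "SUM", Field: "revenue", Label: "total"}))
}
```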

func CanChainRequest added in v0.10.0

func CanChainRequest(req *types.Request, schema *encoding.Schema) bool

CanChainRequest reports whether a request is eligible to participate in a ProcessChain stage. A request qualifies iff:

  • It passes the existing mergeable gate (CanMergeRequest), i.e. its online state can be merged across partitions of the input.
  • Every aggregator emits a single scalar value per output row (FREQUENCY emits a map; MODE emits a string — both are excluded from chain output until a richer downstream-schema synth lands).

The chain executor calls this before each stage. A failing stage surfaces PULSE_CHAIN_NOT_MERGEABLE with the stage index in details.

func CanMergeRequest added in v0.8.0

func CanMergeRequest(req *types.Request, schema *encoding.Schema) bool

CanMergeRequest reports whether a request's online state is mergeable across input partitions — the gate that the per-shard parallel reducer in service/shard_reduce.go consults before fanning out work across a worker pool. Returns true iff every aggregator, grouper, and filterer is mergeable AND the request contains no windows, no features, no regressions, no tests, no two-pass attributes, and no decimal-typed aggregation targets. A nil/empty aggregation list returns false (nothing to merge).

Mergeable is a strict subset of Streamable. Custom extension operators are conservatively treated as non-mergeable (the merge surface is not yet exposed on the extension registration struct).

func CanStreamRequest added in v0.2.0

func CanStreamRequest(req *types.Request, schema *encoding.Schema) bool

CanStreamRequest is the exported parity hook used by tests in descriptor/ (which cannot import processing) to confirm that a PredictResult.Streamable value matches the runtime gate. Application code should rely on PredictResult.Streamable, not this helper.

func ChainOutputSchema added in v0.10.0

func ChainOutputSchema(req *types.Request) (*encoding.Schema, error)

ChainOutputSchema synthesises the schema produced by a mergeable chain stage. The returned schema is purely in-memory — no byte layout — and is intended for downstream stages constructed via SliceIterator over RecordsFromChainRows.

Layout:

  • One categorical_u32 field per Group entry (named grp.Field), with an empty Dictionary that the caller populates as rows are materialized.
  • One f64 field per Aggregation entry (named via AggregationLabel).

Row-local attributes (FORMULA, DATE_PART) feed aggregators but do not surface in the streaming/grouped output today, so they do not appear in the chain schema. ByteOffset / BitPosition are zero across the board because the schema is never serialised.

func EnableReuse added in v0.2.0

func EnableReuse(iter RecordIterator)

EnableReuse opts the iterator into per-row Record reuse if it implements ReusableIterator. Safe no-op for iterators that do not support reuse (e.g. SliceIterator, whose records already exist as independent values).

func FormatJoinKindError added in v0.10.0

func FormatJoinKindError(kind string) string

FormatJoinKindError surfaces a unified "unsupported kind" message for callers that want to print the rejection reason themselves. Unused today; reserved for the upcoming outer/left/anti landings.

func IsDecimalAggregationSupported added in v0.2.0

func IsDecimalAggregationSupported(agg types.AggregationType) bool

IsDecimalAggregationSupported reports whether agg has a decimal-aware implementation.

func JoinedSchema added in v0.10.0

func JoinedSchema(left, right *encoding.Schema, spec *types.JoinSpec) (*encoding.Schema, error)

JoinedSchema synthesises the schema produced by a single inner hash-join attached to a Request. Left fields appear unchanged; right fields are prefixed with spec.As (when set) and validated for collisions against the left side. The returned schema has no on-wire byte layout — like ChainOutputSchema, it exists to satisfy downstream operator lookups against an in-memory SliceIterator.

Returns PULSE_JOIN_FIELD_COLLISION when a non-prefixed right field shares a name with a left field. Callers can set spec.As to disambiguate.

func StreamabilityKey added in v0.7.0

func StreamabilityKey(category, name string) string

StreamabilityKey is the canonical map key for ExtensionRegistry.Streamable. Format: "<category>|<name>". The category strings are the lowercase singular forms used in error details and the manifest: aggregator, attribute, filterer, grouper, window, feature, test.
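
As an illustration of the documented "<category>|<name>" format, the sketch below builds keys with a local helper and uses them in a map shaped like ExtensionRegistry.Streamable. The helper and the operator name "MY_AGG" are hypothetical; real code should call StreamabilityKey.

```go
package main

import "fmt"

// streamabilityKey mirrors the documented "<category>|<name>" format.
// It is a local stand-in, not the package function.
func streamabilityKey(category, name string) string {
	return category + "|" + name
}

func main() {
	// overrides plays the role of ExtensionRegistry.Streamable.
	overrides := map[string]bool{
		streamabilityKey("aggregator", "MY_AGG"): true,
	}
	fmt.Println(overrides[streamabilityKey("aggregator", "MY_AGG")])
	// Same name under a different category is a different key.
	fmt.Println(overrides[streamabilityKey("window", "MY_AGG")])
}
```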

Types

type Aggregator

type Aggregator interface {
	// Aggregate computes the aggregation over the given records for the named field.
	Aggregate(records []*Record, field string) (float64, error)
}

Aggregator computes a single aggregate value from a set of records.

type AggregatorFactory

type AggregatorFactory func(agg *types.Aggregation, schema *encoding.Schema) (Aggregator, error)

AggregatorFactory creates an Aggregator from a type specification.

type AttributeComputer

type AttributeComputer interface {
	// Compute calculates derived values for all records, returning a value per record.
	// The returned slice has one entry per record. A nil second return means no null handling.
	Compute(records []*Record, field string) ([]float64, error)
}

AttributeComputer computes derived attribute values for each record.

type AttributeFactory

type AttributeFactory func(attr *types.Attribute, schema *encoding.Schema) (AttributeComputer, error)

AttributeFactory creates an AttributeComputer from a type specification.

type BitsetSet added in v0.8.3

type BitsetSet struct {
	// contains filtered or unexported fields
}

BitsetSet stores membership as a packed bit per dictionary id.

Memory: dictSize bits, rounded up to a uint64 word. A categorical_u32 dict at its 4 294 967 295 cap would be ~512 MiB, but in practice categorical dicts run much smaller — a 65 536-entry categorical_u16 is 8 KiB.

func (*BitsetSet) Add added in v0.8.3

func (b *BitsetSet) Add(id uint32)

Add marks id as present. Idempotent.

func (*BitsetSet) Contains added in v0.8.3

func (b *BitsetSet) Contains(id uint32) bool

Contains reports whether id is set. Out-of-range ids return false without panicking — the loader can build a bitset sized to the dictionary, but records read against a later, larger dictionary on a shard archive could in principle present an id beyond the bitset.
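
A packed bitset with the Add and Contains behaviour described above might look like the sketch below. The type and its fields are illustrative assumptions (BitsetSet's internals are unexported); in particular, ignoring out-of-range ids in add is a choice made for this sketch only.

```go
package main

import "fmt"

// bitset is an illustrative packed membership set: one bit per dictionary id.
type bitset struct {
	words []uint64
	size  uint32 // number of addressable ids
	count int    // number of distinct ids set
}

// newBitset allocates dictSize bits, rounded up to whole uint64 words.
func newBitset(dictSize uint32) *bitset {
	return &bitset{words: make([]uint64, (uint64(dictSize)+63)/64), size: dictSize}
}

// add marks id as present. Repeated calls are idempotent. Ids beyond the
// dictionary size are ignored in this sketch.
func (b *bitset) add(id uint32) {
	if id >= b.size {
		return
	}
	mask := uint64(1) << (id % 64)
	if b.words[id/64]&mask == 0 {
		b.words[id/64] |= mask
		b.count++
	}
}

// contains reports membership and returns false for out-of-range ids
// instead of panicking.
func (b *bitset) contains(id uint32) bool {
	if id >= b.size {
		return false
	}
	return b.words[id/64]&(uint64(1)<<(id%64)) != 0
}

func main() {
	b := newBitset(65536)
	b.add(42)
	b.add(42) // idempotent
	fmt.Println(b.contains(42), b.contains(43), b.contains(70000), b.count)
	fmt.Println(len(b.words)*8, "bytes") // 65536 bits packed into 8192 bytes
}
```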

func (*BitsetSet) Kind added in v0.8.3

func (b *BitsetSet) Kind() string

func (*BitsetSet) Len added in v0.8.3

func (b *BitsetSet) Len() int

type ExprFunction added in v0.7.0

type ExprFunction struct {
	Name string
	Fn   any
}

ExprFunction is the runtime-side mirror of pulse.ExprFunction. The processor injects every entry into the expr-lang environment when compiling ATTR_FORMULA / FILTER_EXPRESSION expressions.

type ExtensionAware added in v0.7.0

type ExtensionAware interface {
	SetExtensions(r *ExtensionRegistry)
}

ExtensionAware is the optional interface that AttributeComputer / FiltererBuilder instances implement when they want the Processor to inject the live ExtensionRegistry after construction. The formula attribute and expression filterer use this hook to expose embedder-registered ExprFunctions and LookupTables to the expr environment.

type ExtensionRegistry added in v0.7.0

type ExtensionRegistry struct {
	Aggregators map[types.AggregationType]AggregatorFactory
	Attributes  map[types.AttributeType]AttributeFactory
	Filterers   map[types.FiltererType]FiltererFactory
	Groupers    map[types.GroupType]GrouperFactory
	Windows     map[types.WindowType]window.WindowFactory
	Features    map[types.FeatureType]feature.Factory
	RowTests    map[types.TestType]RowTestFactory
	PostTests   map[types.TestType]PostTestFactory

	// Streamable is the per-(category, name) override consulted by
	// IsStreamable. Built-in entries are not stored here; the
	// fallback path consults the per-type Streamable() method.
	Streamable map[string]bool

	// ExprFunctions are merged into the runtime expression environment
	// used by ATTR_FORMULA and FILTER_EXPRESSION. Each entry is
	// callable from a request expression under its declared Name. The
	// shape mirrors pulse.ExprFunction (pulse package can't be
	// imported here without a cycle) — the public pulse.ExprFunction
	// is the canonical embedder-facing surface.
	ExprFunctions []ExprFunction

	// LookupTables are exposed to the runtime expression environment
	// via the built-in lookup() function. Mirrors pulse.LookupTable.
	LookupTables map[string]LookupTable

	// LabelTables are the string-valued ID→label maps consumed by the
	// output-time label resolver. Mirrors pulse.LabelTable. Indexed
	// by the user-facing table name.
	LabelTables map[string]LabelTable

	// FieldInputs is the per-(category, name) field-introspection
	// callback consulted by the buffered-projection extractor
	// (NeededFields). When the registry contains an entry for an
	// extension operator the projection extractor calls the callback
	// with the operator's raw Params; otherwise the extractor widens
	// the projection to "every field" so the runtime stays correct
	// for embedders that haven't opted in.
	FieldInputs map[string]FieldInputsFunc
}

ExtensionRegistry holds per-Service overlays for every operator category that supports the public extension API. A nil receiver is the no-extension case — all Lookup methods fall through to the built-in package-level registries.

The overlay maps are read-only after pulse.New populates them; the runtime never mutates them. Two Service instances with different Extensions inputs hold distinct ExtensionRegistry values, so a process can host more than one Pulse with disjoint extension sets.

func (*ExtensionRegistry) CustomAggregatorNames added in v0.7.0

func (r *ExtensionRegistry) CustomAggregatorNames() []types.AggregationType

CustomAggregatorNames returns the overlay-only aggregator names in unspecified order (the result follows map iteration; callers sort if needed). Used by manifest emission to list extension-shipped operators separately from built-ins.

func (*ExtensionRegistry) CustomAttributeNames added in v0.7.0

func (r *ExtensionRegistry) CustomAttributeNames() []types.AttributeType

func (*ExtensionRegistry) CustomFeatureNames added in v0.7.0

func (r *ExtensionRegistry) CustomFeatureNames() []types.FeatureType

func (*ExtensionRegistry) CustomFiltererNames added in v0.7.0

func (r *ExtensionRegistry) CustomFiltererNames() []types.FiltererType

func (*ExtensionRegistry) CustomGrouperNames added in v0.7.0

func (r *ExtensionRegistry) CustomGrouperNames() []types.GroupType

func (*ExtensionRegistry) CustomPostTestNames added in v0.7.0

func (r *ExtensionRegistry) CustomPostTestNames() []types.TestType

CustomPostTestNames returns the tier-2 overlay-only test names.

func (*ExtensionRegistry) CustomRowTestNames added in v0.7.0

func (r *ExtensionRegistry) CustomRowTestNames() []types.TestType

CustomRowTestNames returns the tier-1 overlay-only test names.

func (*ExtensionRegistry) CustomWindowNames added in v0.7.0

func (r *ExtensionRegistry) CustomWindowNames() []types.WindowType

func (*ExtensionRegistry) ExprOptions added in v0.7.0

func (r *ExtensionRegistry) ExprOptions() []expr.Option

ExprOptions returns the expr-lang Option slice an ExtensionRegistry contributes to ATTR_FORMULA / FILTER_EXPRESSION compilation. A nil receiver and a registry with no expression-side entries both return nil — the caller's base options stay untouched.

Two contributions:

  • One expr.Function per ExprFunctions entry, registered under its declared Name.
  • The built-in `lookup(table, keys...)` function when at least one LookupTable is registered. The function resolves keys against the named table and returns a float64 on hit; PULSE_LOOKUP_MISS on miss; PULSE_LOOKUP_TABLE_UNKNOWN when the table is not registered.

Expressions in cohorts with no lookup tables registered still reference `lookup` legally — the function returns PULSE_LOOKUP_TABLE_UNKNOWN at evaluation time. The compile-time typecheck (E8) catches static misuse.

func (*ExtensionRegistry) FeatureFactories added in v0.7.0

func (r *ExtensionRegistry) FeatureFactories() map[types.FeatureType]feature.Factory

FeatureFactories returns the overlay map for feature operators. Nil-receiver-safe.

func (*ExtensionRegistry) FieldInputsFor added in v0.8.0

func (r *ExtensionRegistry) FieldInputsFor(category, name string, raw json.RawMessage) ([]string, bool)

FieldInputsFor consults the FieldInputs overlay for the operator identified by (category, name). Returns (inputs, true) when the registration supplied a callback, ([], true) when no extra fields are read, or (nil, false) when the operator is custom but has no registered callback — caller should treat that as "can't introspect" and widen the projection.

Built-in operators are not stored in this map; callers should only reach FieldInputsFor for extension-resolved operators.

func (*ExtensionRegistry) HasAggregator added in v0.7.0

func (r *ExtensionRegistry) HasAggregator(t types.AggregationType) bool

HasAggregator reports whether t resolves via either the overlay or the built-in registry. The Has methods for the remaining categories have the same shape.

func (*ExtensionRegistry) HasAttribute added in v0.7.0

func (r *ExtensionRegistry) HasAttribute(t types.AttributeType) bool

func (*ExtensionRegistry) HasFeature added in v0.7.0

func (r *ExtensionRegistry) HasFeature(t types.FeatureType) bool

func (*ExtensionRegistry) HasFilterer added in v0.7.0

func (r *ExtensionRegistry) HasFilterer(t types.FiltererType) bool

func (*ExtensionRegistry) HasGrouper added in v0.7.0

func (r *ExtensionRegistry) HasGrouper(t types.GroupType) bool

func (*ExtensionRegistry) HasPostTest added in v0.7.0

func (r *ExtensionRegistry) HasPostTest(t types.TestType) bool

func (*ExtensionRegistry) HasRowTest added in v0.7.0

func (r *ExtensionRegistry) HasRowTest(t types.TestType) bool

func (*ExtensionRegistry) HasWindow added in v0.7.0

func (r *ExtensionRegistry) HasWindow(t types.WindowType) bool

func (*ExtensionRegistry) IsStreamable added in v0.7.0

func (r *ExtensionRegistry) IsStreamable(category, name string) bool

IsStreamable consults the Streamable overlay first, then the built-in per-type Streamable() method. Unknown categories return false. The cross-check between this and the runtime path lives in the per-category integration phases.

func (*ExtensionRegistry) LookupAggregator added in v0.7.0

func (r *ExtensionRegistry) LookupAggregator(t types.AggregationType) (AggregatorFactory, bool)

LookupAggregator returns the factory for an aggregator type. The overlay map wins over the built-in registry. The boolean second return is the standard "found" signal.
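
The overlay-first resolution shared by the Lookup methods, together with the nil-receiver case, can be sketched as below. All names here (registry, factory, builtins) are hypothetical stand-ins; the real registries and factory signatures differ.

```go
package main

import "fmt"

// factory is a hypothetical stand-in for AggregatorFactory.
type factory func() string

// builtins stands in for the package-level built-in registry.
var builtins = map[string]factory{
	"SUM": func() string { return "builtin SUM" },
	"MIN": func() string { return "builtin MIN" },
}

// registry stands in for ExtensionRegistry with a single overlay map.
type registry struct {
	aggregators map[string]factory
}

// lookup checks the overlay first and then the built-ins. It is safe on a
// nil receiver, which models the no-extension case.
func (r *registry) lookup(name string) (factory, bool) {
	if r != nil {
		if f, ok := r.aggregators[name]; ok {
			return f, true
		}
	}
	f, ok := builtins[name]
	return f, ok
}

func main() {
	r := &registry{aggregators: map[string]factory{
		"SUM": func() string { return "overlay SUM" },
	}}
	f, _ := r.lookup("SUM")
	fmt.Println(f()) // overlay entry shadows the built-in

	var none *registry
	g, _ := none.lookup("SUM")
	fmt.Println(g()) // nil receiver falls through to the built-in

	_, ok := r.lookup("NOPE")
	fmt.Println(ok)
}
```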

func (*ExtensionRegistry) LookupAttribute added in v0.7.0

func (r *ExtensionRegistry) LookupAttribute(t types.AttributeType) (AttributeFactory, bool)

LookupAttribute returns the factory for an attribute type. Overlay wins.

func (*ExtensionRegistry) LookupFeature added in v0.7.0

func (r *ExtensionRegistry) LookupFeature(t types.FeatureType) (feature.Factory, bool)

LookupFeature returns the factory for a feature type. Overlay wins. Falls through to feature.Lookup for built-ins.

func (*ExtensionRegistry) LookupFilterer added in v0.7.0

func (r *ExtensionRegistry) LookupFilterer(t types.FiltererType) (FiltererFactory, bool)

LookupFilterer returns the factory for a filterer type. Overlay wins.

func (*ExtensionRegistry) LookupGrouper added in v0.7.0

func (r *ExtensionRegistry) LookupGrouper(t types.GroupType) (GrouperFactory, bool)

LookupGrouper returns the factory for a grouper type. Overlay wins.

func (*ExtensionRegistry) LookupPostTest added in v0.7.0

func (r *ExtensionRegistry) LookupPostTest(t types.TestType) (PostTestFactory, bool)

LookupPostTest returns the tier-2 post-test factory for a test type. Overlay wins.

func (*ExtensionRegistry) LookupRowTest added in v0.7.0

func (r *ExtensionRegistry) LookupRowTest(t types.TestType) (RowTestFactory, bool)

LookupRowTest returns the tier-1 row-test factory for a test type. Overlay wins.

func (*ExtensionRegistry) LookupWindow added in v0.7.0

func (r *ExtensionRegistry) LookupWindow(t types.WindowType) (window.WindowFactory, bool)

LookupWindow returns the factory for a window type. Overlay wins. Falls through to window.Lookup for built-ins.

func (*ExtensionRegistry) WindowFactories added in v0.7.0

func (r *ExtensionRegistry) WindowFactories() map[types.WindowType]window.WindowFactory

WindowFactories returns the overlay map for window operators, or nil when the registry has no entries. Safe to call on a nil receiver — the runtime treats nil as "no overlay" and falls through to the built-in window registry.

type FieldInputsFunc added in v0.8.0

type FieldInputsFunc func(raw json.RawMessage) []string

FieldInputsFunc reports the additional source-field names an extension operator reads beyond the spec's explicit Field/Field2/PartitionBy/etc. references. raw carries the operator's Params block (may be nil for filterers). The return value is consumed by the buffered-projection extractor; nil/empty means "no extra fields."

type FieldSet added in v0.8.0

type FieldSet map[string]struct{}

FieldSet is an order-insignificant set of schema field names that must be decoded into each Record during the buffered Process path. The sentinel "*" entry means "no projection — keep every field" and short-circuits Has() to true; callers that produced a wide set should fall back to the full-decode path rather than try to project.

func NeededFields added in v0.8.0

func NeededFields(req *types.Request, schema *encoding.Schema, ext *ExtensionRegistry) FieldSet

NeededFields walks req against schema and returns the set of source field names that the buffered or streaming path must decode into each Record so every requested operator output computes correctly.

When extraction can't prove completeness (malformed expression, extension operator without a FieldInputs hook) the returned set is widened and callers should fall back to the full-decode path.

The ext argument may be nil — in that case any custom operator in req that resolves only via overlay would still produce a wide set because we'd have no introspection. Built-in operators are fully introspectable here.

func NewFieldSet added in v0.8.0

func NewFieldSet(hint int) FieldSet

NewFieldSet returns an empty FieldSet sized for hint members.

func (FieldSet) Add added in v0.8.0

func (s FieldSet) Add(name string)

Add inserts name into the set. Empty names are ignored.

func (FieldSet) Has added in v0.8.0

func (s FieldSet) Has(name string) bool

Has reports whether name is in the set. Returns true for every name when the set has been widened.

func (FieldSet) IsWide added in v0.8.0

func (s FieldSet) IsWide() bool

IsWide reports whether the set has been widened to cover every field.

func (FieldSet) Len added in v0.8.0

func (s FieldSet) Len() int

Len returns the number of explicitly listed fields. Returns the schema field count when the set is wide (caller-supplied schema resolves the count; pass len(schema.Fields) at the call site).

func (FieldSet) Widen added in v0.8.0

func (s FieldSet) Widen()

Widen marks the set as covering every field. After this call Has() returns true for any name. Used when extraction can't be proven complete — for example a malformed expression or an extension operator without an introspection hook.
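
The "*" sentinel behaviour can be illustrated with a small local copy of the documented shape. The method bodies below are assumptions made for the sketch; only the map[string]struct{} representation and the sentinel come from the documentation.

```go
package main

import "fmt"

// fieldSet is an illustrative copy of the documented shape:
// map[string]struct{} with "*" as the wide sentinel.
type fieldSet map[string]struct{}

const wide = "*"

// add inserts name and ignores empty names.
func (s fieldSet) add(name string) {
	if name == "" {
		return
	}
	s[name] = struct{}{}
}

// widen marks the set as covering every field.
func (s fieldSet) widen() { s[wide] = struct{}{} }

// isWide reports whether the sentinel is present.
func (s fieldSet) isWide() bool {
	_, ok := s[wide]
	return ok
}

// has short-circuits to true once the set is wide.
func (s fieldSet) has(name string) bool {
	if s.isWide() {
		return true
	}
	_, ok := s[name]
	return ok
}

func main() {
	s := fieldSet{}
	s.add("price")
	s.add("") // ignored
	fmt.Println(s.has("price"), s.has("qty"), s.isWide())

	// Extraction could not be proven complete, so fall back to everything.
	s.widen()
	fmt.Println(s.has("qty"), s.isWide())
}
```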

type FilterFunc

type FilterFunc func(record *Record) (bool, error)

FilterFunc evaluates whether a record passes a filter.

func BuildFilters added in v0.7.0

func BuildFilters(filterers []*types.Filterer, schema *encoding.Schema, exts *ExtensionRegistry) ([]FilterFunc, error)

BuildFilters compiles a slice of types.Filterer into runtime FilterFuncs against the given schema. Mirrors the per-Processor helper so non-Processor consumers (FacetSchema, Sample variants) can reuse the same factory + extension semantics. Returns nil when filterers is empty.

func BuildMemberSetPredicate added in v0.8.3

func BuildMemberSetPredicate(set MemberSet, schema *encoding.Schema, fieldName string) (FilterFunc, error)

BuildMemberSetPredicate returns a per-row FilterFunc that reports whether the record's value for fieldName is a member of set. The concrete predicate is selected once at construction by type-switching on the set + schema field type — the returned closure carries no interface dispatch on the hot path.

Float fields are rejected, matching LoadMemberSetFromReader. Unknown field names return SERVICE_VALIDATION.
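
The "type-switch once, then return a closure" pattern described above can be sketched like this. The set types, the record struct, and buildPredicate are hypothetical simplifications; the real function also consults the schema field type.

```go
package main

import "fmt"

// memberSet is a metadata-only interface, as in the documented design.
type memberSet interface{ Len() int }

type uint64Set map[uint64]struct{}

func (s uint64Set) Len() int { return len(s) }

type stringSet map[string]struct{}

func (s stringSet) Len() int { return len(s) }

// record is a hypothetical stand-in for *Record with two pre-decoded views.
type record struct {
	num  uint64
	text string
}

// buildPredicate selects the concrete lookup once. The returned closure
// captures the concrete map, so the per-row path has no interface dispatch.
func buildPredicate(set memberSet) (func(*record) bool, error) {
	switch s := set.(type) {
	case uint64Set:
		return func(r *record) bool { _, ok := s[r.num]; return ok }, nil
	case stringSet:
		return func(r *record) bool { _, ok := s[r.text]; return ok }, nil
	default:
		return nil, fmt.Errorf("unsupported member set %T", set)
	}
}

func main() {
	pred, err := buildPredicate(uint64Set{7: {}, 9: {}})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, r := range []*record{{num: 7}, {num: 8}} {
		fmt.Println(r.num, pred(r))
	}
}
```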

type FiltererBuilder

type FiltererBuilder interface {
	// Build creates a filter function from the filter specification.
	Build(filter *types.Filterer, schema *encoding.Schema) (FilterFunc, error)
}

FiltererBuilder constructs a FilterFunc from a filter specification.

type FiltererFactory

type FiltererFactory func() FiltererBuilder

FiltererFactory creates a FiltererBuilder. It takes no arguments; the filter specification and schema are supplied later to Build.

type Grouper

type Grouper interface {
	// Group partitions the records by the specified field, returning a map of group key to records.
	Group(records []*Record, field string) (map[string][]*Record, error)
}

Grouper partitions records into named groups.

type GrouperFactory

type GrouperFactory func(grp *types.Group, schema *encoding.Schema) (Grouper, error)

GrouperFactory creates a Grouper from a type specification.

type HashJoinIterator added in v0.10.0

type HashJoinIterator struct {
	// contains filtered or unexported fields
}

HashJoinIterator wraps a left-side iterator and yields joined records on each Next() call. The right side is materialised into a hashmap keyed by the composite (LeftField → string) tuple. Inner join only — non-matching left rows are dropped silently.

Memory: O(right_rows × per_record_state). The build side should be the smaller input; in v1 it is always the caller-provided "right" side. A future iteration adds a CountRecords pre-pass to swap sides automatically.

func NewHashJoinIterator added in v0.10.0

func NewHashJoinIterator(left RecordIterator, right []*Record, leftSchema, rightSchema *encoding.Schema, spec *types.JoinSpec) (*HashJoinIterator, *encoding.Schema, error)

NewHashJoinIterator builds the right-side hash table eagerly from the supplied right records, then returns an iterator that walks the left side. Each left row's composite key is looked up; matched pairs are emitted via Next() in (left, right[0]), (left, right[1]) order.

func (*HashJoinIterator) Next added in v0.10.0

func (h *HashJoinIterator) Next() bool

Next advances to the next joined record. Returns false when the left side is exhausted and the per-left-row match buffer is empty.

func (*HashJoinIterator) Record added in v0.10.0

func (h *HashJoinIterator) Record() *Record

Record returns the current joined record. Combines the left iterator's current record with the matched right record indexed by cursor-1 (Next() already advanced past it).

func (*HashJoinIterator) Reset added in v0.10.0

func (h *HashJoinIterator) Reset()

Reset rewinds the left iterator and clears per-row state. Right-hash retention is intentional: build-once + scan-many.
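
The build-then-probe structure of an inner hash join can be sketched over plain slices. This is a simplified illustration under stated assumptions: row, hashJoin, the single-field key, and the "c_" prefix are all hypothetical, and the real iterator yields records lazily through Next() and Record() rather than returning a slice.

```go
package main

import "fmt"

// row is a hypothetical stand-in for *Record: field name to string value.
type row map[string]string

// hashJoin performs an inner join of left and right on leftKey == rightKey.
// The right side is the build side; left rows are probed in order, and
// right-side fields are copied under the given prefix.
func hashJoin(left, right []row, leftKey, rightKey, prefix string) []row {
	// Build phase: bucket right rows by key value, preserving input order.
	build := make(map[string][]row, len(right))
	for _, r := range right {
		build[r[rightKey]] = append(build[r[rightKey]], r)
	}
	// Probe phase: left rows without a match produce no output.
	var out []row
	for _, l := range left {
		for _, r := range build[l[leftKey]] {
			joined := make(row, len(l)+len(r))
			for k, v := range l {
				joined[k] = v
			}
			for k, v := range r {
				joined[prefix+k] = v
			}
			out = append(out, joined)
		}
	}
	return out
}

func main() {
	orders := []row{
		{"id": "1", "cust": "a"},
		{"id": "2", "cust": "b"},
		{"id": "3", "cust": "z"}, // no match: dropped by the inner join
	}
	customers := []row{
		{"cust": "a", "name": "Ann"},
		{"cust": "a", "name": "Al"},
		{"cust": "b", "name": "Bo"},
	}
	for _, j := range hashJoin(orders, customers, "cust", "cust", "c_") {
		fmt.Println(j["id"], j["c_name"])
	}
}
```

Memory in this sketch is dominated by the build map, which is why the smaller input is the better choice for the right side.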

type LabelResolver added in v0.10.1

type LabelResolver struct {
	// contains filtered or unexported fields
}

LabelResolver translates raw categorical values to display labels per a request's LabelBinding slice. It is constructed once per Request via BuildLabelResolver and consumed at every output site (Sample rows, FacetField counts, AGG_FREQUENCY result keys, grouped result keys, Export cells).

Resolver methods are not safe for concurrent use. Each Request owns its own resolver; shard-parallel reducers either build one resolver per partial and merge warnings at the end, or serialise the output stage.

Replace-mode collision behaviour: when two distinct source values map to the same label string, the output disambiguates with the source value in parentheses ("United States (US)"). Rows-backed tables get a pre-pass so both colliding sources render symmetrically; function-driven tables detect collisions online, so the first source seen renders cleanly and the second renders disambiguated (asymmetric). The PULSE_LABEL_COLLISION warning names every involved source value so callers can audit.
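
For rows-backed tables, the symmetric pre-pass can be pictured as a two-step scan: count how many sources share each label, then append the source in parentheses wherever the count exceeds one. The function below is a hypothetical sketch of that idea, not the resolver's code.

```go
package main

import "fmt"

// disambiguate maps each source value to its rendered label. Every source
// whose label is shared with another source is rendered as "label (source)",
// so colliding sources are treated symmetrically.
func disambiguate(rows map[string]string) map[string]string {
	// Pass 1: count how many sources map to each label.
	counts := make(map[string]int, len(rows))
	for _, label := range rows {
		counts[label]++
	}
	// Pass 2: render, qualifying only the colliding labels.
	out := make(map[string]string, len(rows))
	for src, label := range rows {
		if counts[label] > 1 {
			out[src] = label + " (" + src + ")"
		} else {
			out[src] = label
		}
	}
	return out
}

func main() {
	rendered := disambiguate(map[string]string{
		"US":  "United States",
		"USA": "United States",
		"FR":  "France",
	})
	fmt.Println(rendered["US"])
	fmt.Println(rendered["USA"])
	fmt.Println(rendered["FR"])
}
```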

func BuildLabelResolver added in v0.10.1

func BuildLabelResolver(bindings []*types.LabelBinding, registry *ExtensionRegistry) (*LabelResolver, error)

BuildLabelResolver constructs a LabelResolver from the request bindings and the runtime extension registry. Returns (nil, nil) when bindings is empty so callers can guard cheaply with a single nil check.

Returns PULSE_LABEL_TABLE_UNKNOWN if a binding references a table absent from the registry. Other binding-shape issues (unknown field, non-categorical field, augment collision) are validated upstream by descriptor.ValidateLabels — this constructor trusts schema-level checks.

func (*LabelResolver) Apply added in v0.10.1

func (r *LabelResolver) Apply(field, raw string) (out string, sibling string, ok bool)

Apply translates a raw categorical value through the binding for the named field. Behaviour by mode:

  • Replace: returns (label, sibling="", ok=true) on hit; on miss returns (raw, "", false) and records a miss for the warning summary. The label may be disambiguated as "label (raw)" when two source values collide on the same label.
  • Augment: returns (raw, label, ok=true) on hit; on miss returns (raw, "", false) and records a miss.

Returns (raw, "", false) when the resolver carries no binding for the field. Callers should typically gate with Has before calling.
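
The return shapes for the two modes can be sketched with a single-field binding. The mode constants, the binding struct, and the miss counter are hypothetical; collision disambiguation is deliberately left out of this sketch.

```go
package main

import "fmt"

type mode int

const (
	replace mode = iota
	augment
)

// binding is a hypothetical per-field binding: a mode plus a label table.
type binding struct {
	mode   mode
	labels map[string]string
	misses int // feeds the warning summary
}

// apply follows the documented return shapes for one binding.
func (b *binding) apply(raw string) (out, sibling string, ok bool) {
	label, hit := b.labels[raw]
	if !hit {
		b.misses++
		return raw, "", false
	}
	if b.mode == replace {
		return label, "", true // label takes the place of the raw value
	}
	return raw, label, true // raw value kept, label goes to the sibling column
}

func main() {
	table := map[string]string{"US": "United States"}
	rep := &binding{mode: replace, labels: table}
	aug := &binding{mode: augment, labels: table}

	fmt.Println(rep.apply("US"))
	fmt.Println(aug.apply("US"))
	fmt.Println(rep.apply("FR"))
	fmt.Println("misses:", rep.misses)
}
```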

func (*LabelResolver) AugmentField added in v0.10.1

func (r *LabelResolver) AugmentField(field string) string

AugmentField returns the augment-mode sibling column name for the named field, or empty string when the field has no augment binding.

func (*LabelResolver) FieldsWithAugment added in v0.10.1

func (r *LabelResolver) FieldsWithAugment() []string

FieldsWithAugment returns the field names that should emit a "<field>_label" sibling column. The order follows iteration of the underlying map and is therefore not stable across calls; the result is used only for the output schema overlay, where the caller enforces a stable ordering.

func (*LabelResolver) Has added in v0.10.1

func (r *LabelResolver) Has(field string) bool

Has reports whether the resolver carries any binding for the named field. Cheap and nil-safe.

func (*LabelResolver) Mode added in v0.10.1

func (r *LabelResolver) Mode(field string) types.LabelMode

Mode returns the binding mode for the named field. Returns the zero LabelMode when no binding exists; callers should gate with Has.

func (*LabelResolver) Warnings added in v0.10.1

func (r *LabelResolver) Warnings() []ResolverWarning

Warnings flushes the per-binding collision and miss summaries into envelope-ready warning records. Safe to call once at the end of a run; subsequent calls return the same set.

type LabelTable added in v0.10.1

type LabelTable struct {
	Rows   map[string]string
	Lookup func(key string) (string, bool, error)
}

LabelTable is the runtime-side mirror of pulse.LabelTable. The label resolver consults this map when a request supplies a LabelBinding referencing the table by name; exactly one of Rows or Lookup is populated per table.

type LoadMemberSetResult added in v0.8.3

type LoadMemberSetResult struct {
	Set             MemberSet
	Lines           int
	NotInDictionary int
	Invalid         int
}

LoadMemberSetResult is the per-load report returned by LoadMemberSetFromReader. Lines is total non-blank lines processed; NotInDictionary counts categorical lookups whose value was absent from the field's dictionary (those keys can never match a record so they're dropped); Invalid counts numeric lines that failed to parse on the integer path. Callers can decide whether to surface each counter as a warning or hard error.

func LoadMemberSetFromReader added in v0.8.3

func LoadMemberSetFromReader(r io.Reader, schema *encoding.Schema, fieldName string) (LoadMemberSetResult, error)

LoadMemberSetFromReader reads newline-delimited values from r and builds the best MemberSet impl for the named field on schema. Lines are trimmed of surrounding whitespace (covers CRLF). A UTF-8 BOM is stripped from the very first line. Blank lines are skipped silently — they don't count against Lines.

Dispatch by field type:

  • categorical_u{8,16,32}: parse line as string, resolve via dict.IDFor → BitsetSet. Lines not in the dictionary are counted in NotInDictionary and dropped — they can never match a record.
  • u8/u16/u32/u64, nullable_u4/u8/u16, nullable_bool, packed_bool, date: parse as uint64 → Uint64Set. ParseUint failures are counted in Invalid and dropped.
  • decimal128 / nullable_decimal128: parse as string → StringSet. The cohort filter compares the literal text against the decimal128 wide value's String() form; exact match required.

Float fields are rejected with SERVICE_VALIDATION. Unknown field names return SERVICE_VALIDATION.
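
The line-handling rules for the integer path (trim whitespace, strip a leading BOM, skip blanks, count parse failures) can be sketched with the standard library alone. The loadResult struct and both functions are hypothetical simplifications that ignore the schema and the categorical and decimal paths.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// loadResult is a reduced, hypothetical version of LoadMemberSetResult.
type loadResult struct {
	set     map[uint64]struct{}
	lines   int // non-blank lines processed
	invalid int // lines that failed ParseUint
}

// loadUintSet reads newline-delimited unsigned integers from r.
func loadUintSet(r io.Reader) (loadResult, error) {
	res := loadResult{set: make(map[uint64]struct{})}
	sc := bufio.NewScanner(r)
	first := true
	for sc.Scan() {
		line := sc.Text()
		if first {
			// A UTF-8 BOM can only appear at the start of the input.
			line = strings.TrimPrefix(line, "\uFEFF")
			first = false
		}
		line = strings.TrimSpace(line) // also removes a trailing CR
		if line == "" {
			continue // blank lines do not count against lines
		}
		res.lines++
		v, err := strconv.ParseUint(line, 10, 64)
		if err != nil {
			res.invalid++
			continue
		}
		res.set[v] = struct{}{}
	}
	return res, sc.Err()
}

// loadUintSetString is a convenience wrapper for in-memory input.
func loadUintSetString(s string) (loadResult, error) {
	return loadUintSet(strings.NewReader(s))
}

func main() {
	res, err := loadUintSetString("\uFEFF10\r\n\r\n  20 \nabc\n10\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(res.set), res.lines, res.invalid)
}
```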

type LookupTable added in v0.7.0

type LookupTable struct {
	Rows   map[string]float64
	Lookup func(keys ...string) (float64, bool, error)
}

LookupTable is the runtime-side mirror of pulse.LookupTable. The expr environment's lookup() built-in consults this map; exactly one of Rows or Lookup is populated per table.

type MemberSet added in v0.8.3

type MemberSet interface {
	Len() int
	Kind() string
}

MemberSet is a read-only set of field values used by the cohort filter include-from path to test record membership. Three concrete impls back this interface, picked by field type at load time so the per-row lookup runs on the fastest path available:

  • *BitsetSet — categorical fields. One bit per dictionary entry. Lookup is a single word load + bit test, no hashing, no compare.
  • *Uint64Set — integer / date / bit-packed integer fields. Backed by map[uint64]struct{}; avoids the per-row string allocation a string-keyed map would force.
  • *StringSet — fallback (decimal128, any non-categorical string surface). Backed by map[string]struct{}.

Float fields (f32/f64) are rejected at load time. Exact-equality membership on floats is a footgun; the caller is expected to use the expression filter for numeric ranges.

The interface itself carries only metadata; the per-row hot path goes through the concrete type via the predicate built by BuildMemberSetPredicate, which type-switches once at construction and returns a closure with no interface dispatch in the loop.

type MergeableAggregator added in v0.8.0

type MergeableAggregator interface {
	OnlineAggregator
	MergeOnline(other OnlineAggregator) error
}

MergeableAggregator is the optional sibling of OnlineAggregator for aggregators whose running state is associative+commutative (or associative under a parallel-friendly recurrence). Implementations fold another instance's state into the receiver without rescanning any rows, enabling per-shard parallel execution: each worker computes its partial state from its shard, the orchestrator merges partials in shard insertion order, then Finalize() emits the aggregate.

Implementations MUST be safe to call MergeOnline repeatedly. The receiver absorbs other's state; other is left in an unspecified state and should not be reused. Returning a non-nil error indicates the two instances were not constructed from compatible specs (a programming bug — the orchestrator only merges aggregators constructed from the same Aggregation spec).

MergeOnline preserves the mathematical contract of Finalize:

  • COUNT / SUM / NULL_COUNT: sum the counters.
  • MIN / MAX: pick the extremum, accounting for the "seen" flag.
  • MEAN (Welford): combine (n, mean, M2) via the Chan-Welford parallel formula so the merged mean equals the single-pass mean to within ULP on well-conditioned inputs.
  • FREQUENCY: merge the per-value count maps, summing the counts for values present in both; Finalize then picks the max as it does in the serial path.

Aggregators whose state is fundamentally non-mergeable (percentile/median/zscore — they require a sorted view) MUST NOT implement this interface; the parallel orchestrator falls through to the serial shard iterator for such requests.
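As an illustration of the MEAN bullet above, the following sketch merges two (n, mean, M2) partial states with the pairwise formula usually attributed to Chan et al. The type meanState and the helpers update, merge, and fold are hypothetical names for this example; the package's actual aggregator state is unexported and may differ.

```go
package main

import (
	"fmt"
	"math"
)

// meanState is the (n, mean, M2) triple carried by a Welford-style accumulator.
type meanState struct {
	n    float64
	mean float64
	m2   float64 // sum of squared deviations from the running mean
}

// update folds one value into the state (the serial Welford step).
func (s *meanState) update(x float64) {
	s.n++
	d := x - s.mean
	s.mean += d / s.n
	s.m2 += d * (x - s.mean)
}

// merge folds another partial state into the receiver without rescanning rows.
func (s *meanState) merge(o meanState) {
	if o.n == 0 {
		return
	}
	if s.n == 0 {
		*s = o
		return
	}
	n := s.n + o.n
	d := o.mean - s.mean
	s.mean += d * o.n / n
	s.m2 += o.m2 + d*d*s.n*o.n/n
	s.n = n
}

// fold builds a state from a slice, standing in for one shard's worker.
func fold(xs []float64) meanState {
	var s meanState
	for _, x := range xs {
		s.update(x)
	}
	return s
}

func main() {
	data := []float64{1, 2, 3, 4, 5, 6, 7, 8}
	serial := fold(data)

	// Two "shards" computed independently, then merged.
	merged := fold(data[:3])
	merged.merge(fold(data[3:]))

	fmt.Println(math.Abs(serial.mean-merged.mean) < 1e-12,
		math.Abs(serial.m2-merged.m2) < 1e-9)
}
```

Because the merge only needs the three numbers from each side, a worker can discard its shard's rows as soon as its partial state is built.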

type OnlineAggregator added in v0.2.0

type OnlineAggregator interface {
	// UpdateRow folds a single record into the running state. The field
	// argument is the field the aggregator operates on; for COUNT this
	// can be ignored (COUNT counts rows, not values).
	UpdateRow(record *Record, field string) error
	// Finalize returns the final aggregated value and resets internal
	// state. It must be safe to call exactly once after streaming.
	Finalize() (float64, error)
}

OnlineAggregator is the optional sibling of Aggregator that supports single-pass streaming computation without materializing the full record set. Aggregators that can produce their result via O(1) (or O(unique)) state per row implement this interface so the orchestrator can stream the iterator directly when every aggregation in a request is online.

Implementations MUST handle null values internally: callers invoke UpdateRow once per record (after filters), and the aggregator decides whether the row contributes (e.g., COUNT counts every row, SUM skips nulls on its target field).

Finalize is called once after the iterator is exhausted; it returns the aggregated value. Finalize MUST be safe to call without any preceding UpdateRow calls (the empty-input case).
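The contract above can be sketched with two toy aggregators. This is a simplified illustration under stated assumptions: row is a local map type standing in for *Record (a missing key means null), and onlineAgg, countAgg, sumAgg, and stream are hypothetical names, not types exported by this package.

```go
package main

import "fmt"

// row is a hypothetical stand-in for *Record: a missing key means null.
type row map[string]float64

// onlineAgg mirrors the shape of OnlineAggregator using the local row type.
type onlineAgg interface {
	UpdateRow(r row, field string) error
	Finalize() (float64, error)
}

// countAgg counts every row and ignores the field argument.
type countAgg struct{ n float64 }

func (a *countAgg) UpdateRow(r row, field string) error {
	a.n++
	return nil
}

func (a *countAgg) Finalize() (float64, error) {
	n := a.n
	a.n = 0 // reset so the instance holds no stale state
	return n, nil
}

// sumAgg skips rows whose target field is null.
type sumAgg struct{ total float64 }

func (a *sumAgg) UpdateRow(r row, field string) error {
	if v, ok := r[field]; ok {
		a.total += v
	}
	return nil
}

func (a *sumAgg) Finalize() (float64, error) {
	t := a.total
	a.total = 0
	return t, nil
}

// stream folds every row into the aggregator in one pass, then finalizes.
func stream(rows []row, agg onlineAgg, field string) (float64, error) {
	for _, r := range rows {
		if err := agg.UpdateRow(r, field); err != nil {
			return 0, err
		}
	}
	return agg.Finalize()
}

func main() {
	rows := []row{{"price": 2}, {}, {"price": 3.5}}
	count, _ := stream(rows, &countAgg{}, "price")
	sum, _ := stream(rows, &sumAgg{}, "price")
	empty, _ := stream(nil, &sumAgg{}, "price")
	fmt.Println(count, sum, empty) // 3 5.5 0
}
```

The null row contributes to the count but not to the sum, and finalizing with no rows returns the zero value instead of failing.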

type PostTest added in v0.3.0

type PostTest interface {
	Run(rows []map[string]any) (*types.TestResult, error)
}

PostTest is a tier-2 statistical test consumed after the window stage on the materialized result row set. Run is called once over the post-pipeline data and returns a TestResult; tier-2 tests are always buffered.

type PostTestFactory added in v0.3.0

type PostTestFactory func(spec *types.Test, schema *encoding.Schema) (PostTest, error)

PostTestFactory creates a PostTest from a specification.

type ProcessPath added in v0.2.0

type ProcessPath int

ProcessPath identifies which execution path Process took. The streaming path runs aggregations in a single pass over the iterator without materializing the full record set. The buffered path collects every record into a slice first. This is exposed primarily for tests and benchmarks; production callers do not need it.

const (
	// PathUnknown is the zero value, reported before the first Process call.
	PathUnknown ProcessPath = iota
	// PathBuffered is the legacy materialize-then-aggregate path. It is
	// always correct and is the fallback whenever streaming would be
	// unsafe (groups, attributes, non-online aggregators, expression
	// filters that need the full set, etc.).
	PathBuffered
	// PathStreaming runs aggregations in a single pass over the iterator,
	// folding each record into the running state of every aggregator.
	// Selected only when every aggregation is an OnlineAggregator and
	// the request has no groups, no attributes, and only row-level
	// filters.
	PathStreaming
)

func (ProcessPath) String added in v0.2.0

func (p ProcessPath) String() string

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is the single dynamic processing engine for Pulse. It handles filtering, attribute computation, grouping, and aggregation over record iterators backed by .pulse encoded data.

func NewProcessor

func NewProcessor(schema *encoding.Schema) *Processor

NewProcessor creates a new Processor for the given schema. The resulting Processor uses only Pulse-shipped operator factories; embedder extensions land via NewProcessorWithExtensions.

func NewProcessorWithExtensions added in v0.7.0

func NewProcessorWithExtensions(schema *encoding.Schema, exts *ExtensionRegistry) *Processor

NewProcessorWithExtensions creates a Processor whose operator lookups consult exts before falling through to the built-in registries. Passing nil is equivalent to NewProcessor.

func (*Processor) LastPath added in v0.2.0

func (p *Processor) LastPath() ProcessPath

LastPath returns the ProcessPath taken by the most recent Process call on this processor instance. Returns PathUnknown before any call. Used by tests to verify that the orchestrator selected the streaming path for online-only requests; not part of the stable API contract.

func (*Processor) Process

func (p *Processor) Process(ctx context.Context, req *types.Request, iter RecordIterator) (*types.Response, error)

Process executes a single request against the record iterator.

Process selects between two execution strategies:

  1. Streaming: when every aggregation supports OnlineAggregator and the request has no groups, no attributes, and only row-level filters, the iterator is consumed in one pass. Filters are applied per row before each aggregator folds the row into its running state. Memory is O(distinct values) for FREQUENCY/MODE/DISTINCT_COUNT and O(1) for everything else.

  2. Buffered: the legacy path. Every record is collected into a slice first, then filters, attributes, grouping, and aggregations run over the materialized set. Memory is O(rows). Always correct.

Output is identical between paths to float64 precision on well-conditioned inputs (variance/stddev/skewness/kurtosis use Welford-Pébaÿ recurrences in the streaming path).
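The selection rule can be restated as a small predicate. This sketch encodes only the conditions named in the ProcessPath and Process descriptions; the requestShape struct, its field names, and choosePath are hypothetical, and the real orchestrator may weigh additional conditions (for example streaming groupers) that are not shown here.

```go
package main

import "fmt"

// path is a local stand-in for ProcessPath.
type path int

const (
	pathUnknown path = iota
	pathBuffered
	pathStreaming
)

// requestShape is a hypothetical summary of the request properties that the
// documentation names as relevant to path selection.
type requestShape struct {
	allAggregationsOnline bool
	hasGroups             bool
	hasAttributes         bool
	onlyRowLevelFilters   bool
}

// choosePath returns pathStreaming only when every documented condition
// holds, and falls back to pathBuffered otherwise.
func choosePath(r requestShape) path {
	if r.allAggregationsOnline && !r.hasGroups && !r.hasAttributes && r.onlyRowLevelFilters {
		return pathStreaming
	}
	return pathBuffered
}

func main() {
	online := requestShape{allAggregationsOnline: true, onlyRowLevelFilters: true}
	grouped := online
	grouped.hasGroups = true
	fmt.Println(choosePath(online) == pathStreaming, choosePath(grouped) == pathBuffered) // true true
}
```

Treating the buffered path as the default keeps the rule conservative: any request the predicate does not recognize still gets a correct result, only with O(rows) memory.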

func (*Processor) ProcessComposed

func (p *Processor) ProcessComposed(ctx context.Context, composed *types.ComposedRequest, records []*Record) ([]*types.Response, error)

ProcessComposed executes multiple requests against a shared record set.

type Record

type Record struct {
	// contains filtered or unexported fields
}

Record represents a single data row with field accessors. It provides both numeric and string access for processing operations.

func NewRecord

func NewRecord(schema *encoding.Schema, values map[string]float64) *Record

NewRecord creates a record with the given schema and field values.

func NewRecordWithNulls

func NewRecordWithNulls(schema *encoding.Schema, values map[string]float64, nulls map[string]bool) *Record

NewRecordWithNulls creates a record with explicit null tracking.

func NewRecordWithWide added in v0.2.0

func NewRecordWithWide(schema *encoding.Schema, values map[string]float64, nulls map[string]bool, wide map[string]any) *Record

NewRecordWithWide creates a record with typed wide values for fields that do not fit in float64 (decimal128).

func NewReusableRecord added in v0.2.0

func NewReusableRecord(schema *encoding.Schema) *Record

NewReusableRecord constructs a Record whose internal maps are sized for the given schema and intended to be reused across many ReadRecordReused calls. Returns a Record that callers must NOT retain past the next iteration step.

func RecordsFromChainRows added in v0.10.0

func RecordsFromChainRows(rows []map[string]any, schema *encoding.Schema) ([]*Record, error)

RecordsFromChainRows materializes records for the next chain stage from a Response.Data slice + the synthesised schema. Categorical fields land in the dictionary lazily as new keys arrive. Numeric cells are converted to float64; missing / nil cells become nulls.

The returned records share the schema reference; the schema's dictionaries are mutated as new keys are observed.

func (*Record) AllValues

func (r *Record) AllValues() map[string]any

AllValues returns all field values as a map (for expression evaluation).

The returned map is cached on the Record after the first call and reused on subsequent calls; callers MUST NOT mutate it. If a caller mutates the underlying values map directly (e.g., the processor injecting computed attributes), it must call invalidateAllValuesCache to discard the cache.

func (*Record) ClearForRow added in v0.2.0

func (r *Record) ClearForRow()

ClearForRow implements encoding.ReusableRecord. Resets per-row state so the next ReadRecordReused call starts from a clean slate while keeping the underlying maps allocated.

values is left intact because every field is overwritten on every row. nulls and wide are cleared because their entries are sparse.

func (*Record) NumericValue

func (r *Record) NumericValue(name string) (float64, bool)

NumericValue returns the numeric value for the named field. Returns the value and true if present and non-null, or 0 and false if null or missing.

func (*Record) Schema

func (r *Record) Schema() *encoding.Schema

Schema returns the record's schema.

func (*Record) Set added in v0.2.0

func (r *Record) Set(name string, value float64)

Set assigns a numeric value to the named field on this record. It clears any prior null marker and invalidates the AllValues cache. Used by pre-filter feature operators to inject derived columns into the record stream so downstream stages (filters, attributes, groupers, aggregators) can reference them by label.

func (*Record) SetNull added in v0.2.0

func (r *Record) SetNull(name string)

SetNull marks the named field as null. Used by feature operators to propagate input nulls into the derived column.

func (*Record) SetNullField added in v0.2.0

func (r *Record) SetNullField(name string)

SetNullField implements encoding.ReusableRecord. Marks a field as null in the reuse path; does not invalidate the AllValues cache (reuse path resets the cache once per row via ClearForRow).

func (*Record) SetNumeric added in v0.2.0

func (r *Record) SetNumeric(name string, value float64)

SetNumeric implements encoding.ReusableRecord. Assigns a numeric value without touching the null marker or invalidating the AllValues cache. Intended only for the streaming reuse path that calls ClearForRow before each row.

func (*Record) SetWide added in v0.2.0

func (r *Record) SetWide(name string, v any)

SetWide assigns a typed wide value to a field. Used by readers and feature operators that produce non-float values (decimal128).

func (*Record) SetWideField added in v0.2.0

func (r *Record) SetWideField(name string, v any)

SetWideField implements encoding.ReusableRecord. Stores a typed wide value (decimal128) without invalidating the AllValues cache.

func (*Record) StringValue

func (r *Record) StringValue(name string) (string, bool)

StringValue returns the resolved string value for categorical fields. For non-categorical fields, returns the empty string and false.

func (*Record) WideValue added in v0.2.0

func (r *Record) WideValue(name string) (any, bool)

WideValue returns the typed wide value for the named field, if present. Wide values are populated for decimal128 fields.

type RecordIterator

type RecordIterator interface {
	// Next advances to the next record. Returns false when exhausted.
	Next() bool
	// Record returns the current record. Only valid after Next returns true.
	Record() *Record
	// Reset resets the iterator to the beginning.
	Reset()
}

RecordIterator provides sequential access to records.
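A consumer typically drives this interface with a Next/Record loop and calls Reset before a second pass. The sketch below uses local stand-ins (rec, recordIterator, sliceIter, newSliceIter) so that it runs on its own; the internals of the package's SliceIterator are unexported and may be laid out differently.

```go
package main

import "fmt"

// rec is a hypothetical single-field record.
type rec struct{ value float64 }

// recordIterator mirrors the three-method shape of RecordIterator.
type recordIterator interface {
	Next() bool
	Record() *rec
	Reset()
}

// sliceIter walks a slice of records.
type sliceIter struct {
	items []*rec
	pos   int // index of the current record; -1 before the first Next
}

func newSliceIter(items []*rec) *sliceIter {
	return &sliceIter{items: items, pos: -1}
}

func (it *sliceIter) Next() bool {
	if it.pos+1 >= len(it.items) {
		return false
	}
	it.pos++
	return true
}

func (it *sliceIter) Record() *rec { return it.items[it.pos] }

func (it *sliceIter) Reset() { it.pos = -1 }

// sum drains the iterator with the usual Next/Record loop.
func sum(it recordIterator) float64 {
	total := 0.0
	for it.Next() {
		total += it.Record().value
	}
	return total
}

func main() {
	it := newSliceIter([]*rec{{value: 1}, {value: 2}, {value: 3}})
	first := sum(it)
	it.Reset() // rewind for a second pass over the same records
	second := sum(it)
	fmt.Println(first, second) // 6 6
}
```

Without the Reset call the second loop would see an exhausted iterator and sum nothing, which is why staged multi-pass consumers rewind between passes.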

type ResolverWarning added in v0.10.1

type ResolverWarning struct {
	Code    errors.Code
	Message string
	Details map[string]any
}

ResolverWarning captures a single resolver-side diagnostic. The caller folds these into the response envelope after Process / Sample / Facet / Export completes.

type ReusableIterator added in v0.2.0

type ReusableIterator interface {
	SetReuse(bool)
}

ReusableIterator is an optional interface implemented by iterators that can return the same Record pointer across Next() calls, refreshing its values/nulls/wide maps in place. Streaming consumers that consume each record inline (no slice retention) can opt in to drop the per-row map allocations.

Callers MUST consume each record before invoking Next() again — the next call will overwrite the Record's contents.
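The hazard behind this rule is easy to reproduce. In the sketch below, reuseIter is a hypothetical iterator that refreshes one shared record in place when reuse is enabled; rec, retain, and consume are likewise invented for the example and are not part of this package.

```go
package main

import "fmt"

// rec is a hypothetical single-field record.
type rec struct{ value float64 }

// reuseIter is a hypothetical iterator that, when reuse is on, refreshes one
// shared rec in place instead of allocating a new one per row.
type reuseIter struct {
	values []float64
	pos    int
	reuse  bool
	shared rec
	cur    *rec
}

func (it *reuseIter) SetReuse(on bool) { it.reuse = on }

func (it *reuseIter) Next() bool {
	if it.pos >= len(it.values) {
		return false
	}
	if it.reuse {
		it.shared.value = it.values[it.pos] // overwrite in place
		it.cur = &it.shared
	} else {
		it.cur = &rec{value: it.values[it.pos]}
	}
	it.pos++
	return true
}

func (it *reuseIter) Record() *rec { return it.cur }

// retain keeps the record pointers, which is only safe without reuse.
func retain(it *reuseIter) []float64 {
	var kept []*rec
	for it.Next() {
		kept = append(kept, it.Record())
	}
	out := make([]float64, len(kept))
	for i, r := range kept {
		out[i] = r.value
	}
	return out
}

// consume reads each value before advancing, which is safe in both modes.
func consume(it *reuseIter) []float64 {
	var out []float64
	for it.Next() {
		out = append(out, it.Record().value)
	}
	return out
}

func main() {
	a := &reuseIter{values: []float64{1, 2, 3}}
	a.SetReuse(true)
	fmt.Println(retain(a)) // [3 3 3]: every retained pointer aliases the last row

	b := &reuseIter{values: []float64{1, 2, 3}}
	b.SetReuse(true)
	fmt.Println(consume(b)) // [1 2 3]
}
```

A consumer that needs to keep rows must copy the values out before the next call, or leave reuse disabled.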

type RowLocalAttribute added in v0.2.0

type RowLocalAttribute interface {
	// Row computes this attribute's value for a single record and field.
	// For a pure RowLocalAttribute, no PrePass call is needed; for a
	// TwoPassAttribute, Row must be called only after Finalize.
	Row(record *Record, field string) (float64, error)
}

RowLocalAttribute is the optional sibling of AttributeComputer for attributes whose value depends only on the current row (no first-pass population stats needed). Streaming paths drive RowLocalAttribute.Row inline instead of buffering the full record set.

FORMULA and DATE_PART implement this interface; ZSCORE / TSCORE / NORMALIZED implement TwoPassAttribute (a superset). PERCENTILE does NOT implement either — it needs a sorted view of every value, which forces the buffered path.

type RowTest added in v0.3.0

type RowTest interface {
	UpdateRow(record *Record) error
	Finalize() (*types.TestResult, error)
}

RowTest is a tier-1 statistical test consumed during the row scan alongside online aggregators. Implementations fold each filter-passing record into running state via UpdateRow and produce a TestResult via Finalize after the iterator is exhausted.

Per-test state is per-instance; callers construct a fresh instance per Process call via a RowTestFactory.

UpdateRow MUST be safe to call zero times (Finalize handles the empty input case). Implementations decide whether a row contributes; null values are typically skipped via the Record's NumericValue / StringValue helpers.

type RowTestFactory added in v0.3.0

type RowTestFactory func(spec *types.Test, schema *encoding.Schema) (RowTest, error)

RowTestFactory creates a RowTest from a specification.

type SliceIterator

type SliceIterator struct {
	// contains filtered or unexported fields
}

SliceIterator implements RecordIterator over a slice of records.

func NewSliceIterator

func NewSliceIterator(records []*Record) *SliceIterator

NewSliceIterator creates an iterator over the given records.

func (*SliceIterator) Next

func (it *SliceIterator) Next() bool

Next advances to the next record.

func (*SliceIterator) Record

func (it *SliceIterator) Record() *Record

Record returns the current record.

func (*SliceIterator) Reset

func (it *SliceIterator) Reset()

Reset resets the iterator to the beginning.

type StreamingGrouper added in v0.2.0

type StreamingGrouper interface {
	// KeyForRow returns the group key string for record's value of field.
	// ok=false signals the row should be skipped (e.g. null value);
	// ok=true means the key is valid for bucketing.
	KeyForRow(record *Record, field string) (key string, ok bool, err error)
}

StreamingGrouper is the optional sibling of Grouper for groupers that can derive a partition key from a single record without seeing the full record set. CATEGORY / RANGE / ROUNDED implement this interface; QUANTILE and DATE require finalize-time work over the full input.

Implementations MUST be safe to call repeatedly; the streaming path invokes KeyForRow once per filter-passing record and uses the key to index a per-group online aggregator bucket.
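The per-row key plus per-group bucket pattern can be sketched as follows. The widthGrouper type, its fixed-width binning rule, and its key format are assumptions made for illustration and are not the semantics of the package's RANGE or ROUNDED groupers; row is a local stand-in for *Record in which a missing key means null.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

// row is a hypothetical stand-in for *Record: a missing key means null.
type row map[string]float64

// widthGrouper buckets values into fixed-width bins keyed by the lower bound.
type widthGrouper struct{ width float64 }

// KeyForRow derives the key from the row alone; ok=false skips null rows.
func (g widthGrouper) KeyForRow(r row, field string) (string, bool, error) {
	if g.width <= 0 {
		return "", false, fmt.Errorf("bin width must be positive, got %v", g.width)
	}
	v, present := r[field]
	if !present {
		return "", false, nil
	}
	lower := math.Floor(v/g.width) * g.width
	return strconv.FormatFloat(lower, 'f', -1, 64), true, nil
}

// countByGroup keeps one running count per key, the simplest possible
// per-group online bucket.
func countByGroup(rows []row, g widthGrouper, field string) (map[string]int, error) {
	counts := make(map[string]int)
	for _, r := range rows {
		key, ok, err := g.KeyForRow(r, field)
		if err != nil {
			return nil, err
		}
		if !ok {
			continue // null value: row is skipped, not bucketed
		}
		counts[key]++
	}
	return counts, nil
}

func main() {
	rows := []row{{"age": 12}, {"age": 19}, {}, {"age": 24}}
	counts, err := countByGroup(rows, widthGrouper{width: 10}, "age")
	fmt.Println(counts, err) // map[10:2 20:1] <nil>
}
```

Since the key depends only on the current row, the loop never needs the full record set, which is what distinguishes this kind of grouper from one that must see all values before it can place boundaries.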

type StringSet added in v0.8.3

type StringSet struct {
	// contains filtered or unexported fields
}

StringSet stores string membership (decimal128 fields, fallback path).

func (*StringSet) Add added in v0.8.3

func (s *StringSet) Add(v string)

func (*StringSet) Contains added in v0.8.3

func (s *StringSet) Contains(v string) bool

func (*StringSet) Kind added in v0.8.3

func (s *StringSet) Kind() string

func (*StringSet) Len added in v0.8.3

func (s *StringSet) Len() int

type TwoPassAttribute added in v0.2.0

type TwoPassAttribute interface {
	RowLocalAttribute
	// PrePass folds a single record's contribution into the running
	// state used to compute population statistics.
	PrePass(record *Record, field string) error
	// Finalize closes the PrePass phase. After Finalize, Row may be
	// called for each record (typically during iter pass 2).
	Finalize() error
}

TwoPassAttribute is the streaming-friendly path for attributes that need population statistics (ZSCORE / TSCORE need mean+stddev, NORMALIZED needs min+max). The orchestrator drives PrePass over every filter-passing record, then Finalize locks the global stats, then Row emits per-record values during a second iter pass.

Mirrors feature.StreamingComputer.PrePass+Finalize+EmitRow so the streaming infrastructure (iter.Reset(), staged passes) is uniform.

PrePass MUST be safe to call repeatedly. Finalize is called exactly once, after the last PrePass and before the first Row; Row is called only after Finalize. State is per-instance: callers construct a fresh instance per Process call via the AttributeFactory.
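The PrePass, Finalize, Row sequence can be sketched with a toy z-score attribute. Several choices here are assumptions made for the example rather than documented behavior: the zscore type itself, the use of population (not sample) standard deviation, the sum-of-squares accumulation (simple, but less numerically robust than a Welford recurrence), and returning 0 for null rows or zero spread. The row type is a local stand-in for *Record.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// row is a hypothetical stand-in for *Record: a missing key means null.
type row map[string]float64

// zscore is a hypothetical two-pass attribute using population stddev.
type zscore struct {
	n, sum, sumSq float64
	mean, std     float64
	finalized     bool
}

// PrePass accumulates the statistics needed for mean and stddev.
func (z *zscore) PrePass(r row, field string) error {
	if z.finalized {
		return errors.New("PrePass called after Finalize")
	}
	if v, ok := r[field]; ok {
		z.n++
		z.sum += v
		z.sumSq += v * v
	}
	return nil
}

// Finalize locks the global statistics; it must run exactly once.
func (z *zscore) Finalize() error {
	if z.finalized {
		return errors.New("Finalize called twice")
	}
	if z.n > 0 {
		z.mean = z.sum / z.n
		z.std = math.Sqrt(math.Max(0, z.sumSq/z.n-z.mean*z.mean))
	}
	z.finalized = true
	return nil
}

// Row emits the per-record value and is valid only after Finalize.
func (z *zscore) Row(r row, field string) (float64, error) {
	if !z.finalized {
		return 0, errors.New("Row called before Finalize")
	}
	v, ok := r[field]
	if !ok || z.std == 0 {
		return 0, nil // assumption: nulls and zero spread map to 0
	}
	return (v - z.mean) / z.std, nil
}

func main() {
	rows := []row{{"x": 2}, {"x": 4}, {"x": 4}, {"x": 4}, {"x": 5}, {"x": 5}, {"x": 7}, {"x": 9}}
	z := &zscore{}
	for _, r := range rows { // pass 1
		_ = z.PrePass(r, "x")
	}
	_ = z.Finalize()
	first, _ := z.Row(rows[0], "x") // pass 2
	last, _ := z.Row(rows[7], "x")
	fmt.Println(first, last) // -1.5 2
}
```

The finalized flag makes ordering mistakes visible as errors instead of silently producing values from half-built statistics.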

type Uint64Set added in v0.8.3

type Uint64Set struct {
	// contains filtered or unexported fields
}

Uint64Set stores integer membership.

func (*Uint64Set) Add added in v0.8.3

func (s *Uint64Set) Add(v uint64)

func (*Uint64Set) Contains added in v0.8.3

func (s *Uint64Set) Contains(v uint64) bool

func (*Uint64Set) Kind added in v0.8.3

func (s *Uint64Set) Kind() string

func (*Uint64Set) Len added in v0.8.3

func (s *Uint64Set) Len() int

Directories

Path Synopsis
Package arena provides a bump-allocator backed by a single contiguous []byte.
Package feature implements the FEAT_* operators that run pre-filter to add derived columns to a record stream.
Package regression hosts the REG_* operators that fit a regression model against the filtered record set.
Package window implements the WIN_* window operators for Pulse.
