processing

package
v0.7.1 Latest
Warning

This package is not in the latest version of its module.

Published: May 15, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package processing provides the single dynamic processing engine for Pulse. It implements aggregators, attributes, filterers, and groupers that operate on record iterators backed by .pulse encoded data.

Constants

This section is empty.

Variables

This section is empty.

Functions

func AggregateDecimalField added in v0.2.0

func AggregateDecimalField(agg types.AggregationType, records []*Record, field string, scale uint8) (decimalAggResult, error)

AggregateDecimalField runs the decimal-aware aggregator for `agg` over the given records, reading the wide map for `field`. Used by the orchestrator when the field type is decimal128 or nullable_decimal128.

func AggregateGeoField added in v0.2.0

func AggregateGeoField(agg types.AggregationType, records []*Record, field string) (any, error)

AggregateGeoField runs the geo aggregator for `agg` over records, reading the wide map for `field`. Returns a typed value (CentroidResult or BBoxResult) the orchestrator emits directly into the response row.

func CanStreamRequest added in v0.2.0

func CanStreamRequest(req *types.Request, schema *encoding.Schema) bool

CanStreamRequest is the exported parity hook used by tests in descriptor/ (which cannot import processing) to confirm that a PredictResult.Streamable value matches the runtime gate. Application code should rely on PredictResult.Streamable, not this helper.

func EnableReuse added in v0.2.0

func EnableReuse(iter RecordIterator)

EnableReuse opts the iterator into per-row Record reuse if it implements ReusableIterator. Safe no-op for iterators that do not support reuse (e.g. SliceIterator, whose records already exist as independent values).
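The opt-in described above is the usual Go optional-interface pattern. The following is a minimal sketch under stated assumptions: `record`, `recordIterator`, `reusableIterator`, `plainIter`, `reuseIter`, and `enableReuse` are local stand-ins written for illustration, not the package's own implementation.

```go
package main

import "fmt"

// record, recordIterator, and reusableIterator are simplified local
// stand-ins for Record, RecordIterator, and ReusableIterator.
type record struct{ value float64 }

type recordIterator interface {
	Next() bool
	Record() *record
	Reset()
}

type reusableIterator interface {
	SetReuse(bool)
}

// enableReuse opts in when the iterator supports reuse and is a
// no-op otherwise.
func enableReuse(iter recordIterator) {
	if r, ok := iter.(reusableIterator); ok {
		r.SetReuse(true)
	}
}

// plainIter is an empty iterator without reuse support.
type plainIter struct{}

func (plainIter) Next() bool      { return false }
func (plainIter) Record() *record { return nil }
func (plainIter) Reset()          {}

// reuseIter additionally records whether reuse was requested.
type reuseIter struct {
	plainIter
	reuse bool
}

func (it *reuseIter) SetReuse(on bool) { it.reuse = on }

func main() {
	it := &reuseIter{}
	enableReuse(it)          // opts in
	enableReuse(plainIter{}) // safe no-op
	fmt.Println(it.reuse)    // prints true
}
```

Because the check is a type assertion, callers can pass any iterator without knowing in advance whether it supports reuse.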

func IsDecimalAggregationSupported added in v0.2.0

func IsDecimalAggregationSupported(agg types.AggregationType) bool

IsDecimalAggregationSupported reports whether agg has a decimal-aware implementation.

func IsGeoAggregation added in v0.2.0

func IsGeoAggregation(agg types.AggregationType) bool

IsGeoAggregation reports whether agg is one of the AGG_GEO_* aggregation types.

func StreamabilityKey added in v0.7.0

func StreamabilityKey(category, name string) string

StreamabilityKey is the canonical map key for ExtensionRegistry.Streamable. Format: "<category>|<name>". The category strings are the lowercase singular forms used in error details and the manifest: aggregator, attribute, filterer, grouper, window, feature, test.
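The key format can be illustrated with a local stand-in. This sketch only mirrors the documented "<category>|<name>" format; the helper name `streamabilityKey` and the operator name `MY_SUM` are hypothetical, and the real function may treat its inputs differently.

```go
package main

import "fmt"

// streamabilityKey is a hypothetical local stand-in that mirrors the
// documented "<category>|<name>" key format.
func streamabilityKey(category, name string) string {
	return category + "|" + name
}

func main() {
	// Build an override map keyed the way ExtensionRegistry.Streamable
	// is described. MY_SUM is a made-up operator name.
	overrides := map[string]bool{
		streamabilityKey("aggregator", "MY_SUM"): true,
	}
	fmt.Println(overrides["aggregator|MY_SUM"]) // prints true
}
```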

Types

type Aggregator

type Aggregator interface {
	// Aggregate computes the aggregation over the given records for the named field.
	Aggregate(records []*Record, field string) (float64, error)
}

Aggregator computes a single aggregate value from a set of records.

type AggregatorFactory

type AggregatorFactory func(agg *types.Aggregation, schema *encoding.Schema) (Aggregator, error)

AggregatorFactory creates an Aggregator from a type specification.

type AttributeComputer

type AttributeComputer interface {
	// Compute calculates derived values for all records, returning a value per record.
	// The returned slice has one entry per record. A nil second return means no null handling.
	Compute(records []*Record, field string) ([]float64, error)
}

AttributeComputer computes derived attribute values for each record.

type AttributeFactory

type AttributeFactory func(attr *types.Attribute, schema *encoding.Schema) (AttributeComputer, error)

AttributeFactory creates an AttributeComputer from a type specification.

type BBoxResult added in v0.2.0

type BBoxResult struct {
	MinLat float64 `json:"min_lat"`
	MinLon float64 `json:"min_lon"`
	MaxLat float64 `json:"max_lat"`
	MaxLon float64 `json:"max_lon"`
}

BBoxResult holds the four bounds (minimum and maximum latitude and longitude) of an AGG_GEO_BBOX result. Returned as a struct field in the response (not a new field type).

type CentroidResult added in v0.2.0

type CentroidResult struct {
	Lat float64 `json:"lat"`
	Lon float64 `json:"lon"`
}

CentroidResult holds the latitude/longitude of an AGG_GEO_CENTROID result.

type ExprFunction added in v0.7.0

type ExprFunction struct {
	Name string
	Fn   any
}

ExprFunction is the runtime-side mirror of pulse.ExprFunction. The processor injects every entry into the expr-lang environment when compiling ATTR_FORMULA / FILTER_EXPRESSION expressions.

type ExtensionAware added in v0.7.0

type ExtensionAware interface {
	SetExtensions(r *ExtensionRegistry)
}

ExtensionAware is the optional interface that AttributeComputer / FiltererBuilder instances implement when they want the Processor to inject the live ExtensionRegistry after construction. The formula attribute and expression filterer use this hook to expose embedder-registered ExprFunctions and LookupTables to the expr environment.

type ExtensionRegistry added in v0.7.0

type ExtensionRegistry struct {
	Aggregators map[types.AggregationType]AggregatorFactory
	Attributes  map[types.AttributeType]AttributeFactory
	Filterers   map[types.FiltererType]FiltererFactory
	Groupers    map[types.GroupType]GrouperFactory
	Windows     map[types.WindowType]window.WindowFactory
	Features    map[types.FeatureType]feature.Factory
	RowTests    map[types.TestType]RowTestFactory
	PostTests   map[types.TestType]PostTestFactory

	// Streamable is the per-(category, name) override consulted by
	// IsStreamable. Built-in entries are not stored here; the
	// fallback path consults the per-type Streamable() method.
	Streamable map[string]bool

	// ExprFunctions are merged into the runtime expression environment
	// used by ATTR_FORMULA and FILTER_EXPRESSION. Each entry is
	// callable from a request expression under its declared Name. The
	// shape mirrors pulse.ExprFunction (pulse package can't be
	// imported here without a cycle) — the public pulse.ExprFunction
	// is the canonical embedder-facing surface.
	ExprFunctions []ExprFunction

	// LookupTables are exposed to the runtime expression environment
	// via the built-in lookup() function. Mirrors pulse.LookupTable.
	LookupTables map[string]LookupTable
}

ExtensionRegistry holds per-Service overlays for every operator category that supports the public extension API. A nil receiver is the no-extension case — all Lookup methods fall through to the built-in package-level registries.

The overlay maps are read-only after pulse.New populates them; the runtime never mutates them. Two Service instances with different Extensions inputs hold distinct ExtensionRegistry values, so a process can host more than one Pulse with disjoint extension sets.

func (*ExtensionRegistry) CustomAggregatorNames added in v0.7.0

func (r *ExtensionRegistry) CustomAggregatorNames() []types.AggregationType

CustomAggregatorNames returns the overlay-only aggregator names in unspecified order: the slice is built by map iteration, so callers must sort it if they need a stable order. Used by manifest emission to list extension-shipped operators separately from built-ins.

func (*ExtensionRegistry) CustomAttributeNames added in v0.7.0

func (r *ExtensionRegistry) CustomAttributeNames() []types.AttributeType

func (*ExtensionRegistry) CustomFeatureNames added in v0.7.0

func (r *ExtensionRegistry) CustomFeatureNames() []types.FeatureType

func (*ExtensionRegistry) CustomFiltererNames added in v0.7.0

func (r *ExtensionRegistry) CustomFiltererNames() []types.FiltererType

func (*ExtensionRegistry) CustomGrouperNames added in v0.7.0

func (r *ExtensionRegistry) CustomGrouperNames() []types.GroupType

func (*ExtensionRegistry) CustomPostTestNames added in v0.7.0

func (r *ExtensionRegistry) CustomPostTestNames() []types.TestType

CustomPostTestNames returns the tier-2 overlay-only test names.

func (*ExtensionRegistry) CustomRowTestNames added in v0.7.0

func (r *ExtensionRegistry) CustomRowTestNames() []types.TestType

CustomRowTestNames returns the tier-1 overlay-only test names.

func (*ExtensionRegistry) CustomWindowNames added in v0.7.0

func (r *ExtensionRegistry) CustomWindowNames() []types.WindowType

func (*ExtensionRegistry) ExprOptions added in v0.7.0

func (r *ExtensionRegistry) ExprOptions() []expr.Option

ExprOptions returns the expr-lang Option slice an ExtensionRegistry contributes to ATTR_FORMULA / FILTER_EXPRESSION compilation. A nil receiver and a registry with no expression-side entries both return nil — the caller's base options stay untouched.

Two contributions:

  • One expr.Function per ExprFunctions entry, registered under its declared Name.
  • The built-in `lookup(table, keys...)` function when at least one LookupTable is registered. The function resolves keys against the named table and returns a float64 on hit; PULSE_LOOKUP_MISS on miss; PULSE_LOOKUP_TABLE_UNKNOWN when the table is not registered.

Expressions in cohorts with no lookup tables registered still reference `lookup` legally — the function returns PULSE_LOOKUP_TABLE_UNKNOWN at evaluation time. The compile-time typecheck (E8) catches static misuse.

func (*ExtensionRegistry) FeatureFactories added in v0.7.0

func (r *ExtensionRegistry) FeatureFactories() map[types.FeatureType]feature.Factory

FeatureFactories returns the overlay map for feature operators. Nil-receiver-safe.

func (*ExtensionRegistry) HasAggregator added in v0.7.0

func (r *ExtensionRegistry) HasAggregator(t types.AggregationType) bool

HasAggregator reports whether t resolves via either the overlay or the built-in registry. The Has* methods for the remaining categories behave the same way.

func (*ExtensionRegistry) HasAttribute added in v0.7.0

func (r *ExtensionRegistry) HasAttribute(t types.AttributeType) bool

func (*ExtensionRegistry) HasFeature added in v0.7.0

func (r *ExtensionRegistry) HasFeature(t types.FeatureType) bool

func (*ExtensionRegistry) HasFilterer added in v0.7.0

func (r *ExtensionRegistry) HasFilterer(t types.FiltererType) bool

func (*ExtensionRegistry) HasGrouper added in v0.7.0

func (r *ExtensionRegistry) HasGrouper(t types.GroupType) bool

func (*ExtensionRegistry) HasPostTest added in v0.7.0

func (r *ExtensionRegistry) HasPostTest(t types.TestType) bool

func (*ExtensionRegistry) HasRowTest added in v0.7.0

func (r *ExtensionRegistry) HasRowTest(t types.TestType) bool

func (*ExtensionRegistry) HasWindow added in v0.7.0

func (r *ExtensionRegistry) HasWindow(t types.WindowType) bool

func (*ExtensionRegistry) IsStreamable added in v0.7.0

func (r *ExtensionRegistry) IsStreamable(category, name string) bool

IsStreamable consults the Streamable overlay first, then the built-in per-type Streamable() method. Unknown categories return false. The cross-check between this and the runtime path lives in the per-category integration phases.

func (*ExtensionRegistry) LookupAggregator added in v0.7.0

func (r *ExtensionRegistry) LookupAggregator(t types.AggregationType) (AggregatorFactory, bool)

LookupAggregator returns the factory for an aggregator type. The overlay map wins over the built-in registry. The boolean second return is the standard "found" signal.
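The overlay-first lookup with a nil-safe receiver can be sketched as follows. This is an illustration under assumptions: `factory`, `builtins`, `registry`, and `lookupAggregator` are simplified local stand-ins, and string keys replace types.AggregationType.

```go
package main

import "fmt"

// factory is a simplified stand-in for AggregatorFactory.
type factory func() string

// builtins is a hypothetical package-level registry.
var builtins = map[string]factory{
	"SUM": func() string { return "built-in SUM" },
	"MAX": func() string { return "built-in MAX" },
}

// registry is a simplified stand-in for ExtensionRegistry.
type registry struct {
	aggregators map[string]factory
}

// lookupAggregator checks the overlay first, then the built-ins.
// A nil receiver skips the overlay entirely.
func (r *registry) lookupAggregator(name string) (factory, bool) {
	if r != nil {
		if f, ok := r.aggregators[name]; ok {
			return f, true
		}
	}
	f, ok := builtins[name]
	return f, ok
}

func main() {
	exts := &registry{aggregators: map[string]factory{
		"SUM": func() string { return "overlay SUM" },
	}}
	var none *registry // the no-extension case

	f1, _ := exts.lookupAggregator("SUM")
	f2, _ := exts.lookupAggregator("MAX")
	f3, _ := none.lookupAggregator("SUM")
	_, found := none.lookupAggregator("NOPE")
	fmt.Println(f1(), "/", f2(), "/", f3(), "/", found)
}
```

Calling a method on a nil pointer receiver is legal in Go as long as the method does not dereference it, which is what makes the no-extension case cheap to express.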

func (*ExtensionRegistry) LookupAttribute added in v0.7.0

func (r *ExtensionRegistry) LookupAttribute(t types.AttributeType) (AttributeFactory, bool)

LookupAttribute returns the factory for an attribute type. Overlay wins.

func (*ExtensionRegistry) LookupFeature added in v0.7.0

func (r *ExtensionRegistry) LookupFeature(t types.FeatureType) (feature.Factory, bool)

LookupFeature returns the factory for a feature type. Overlay wins. Falls through to feature.Lookup for built-ins.

func (*ExtensionRegistry) LookupFilterer added in v0.7.0

func (r *ExtensionRegistry) LookupFilterer(t types.FiltererType) (FiltererFactory, bool)

LookupFilterer returns the factory for a filterer type. Overlay wins.

func (*ExtensionRegistry) LookupGrouper added in v0.7.0

func (r *ExtensionRegistry) LookupGrouper(t types.GroupType) (GrouperFactory, bool)

LookupGrouper returns the factory for a grouper type. Overlay wins.

func (*ExtensionRegistry) LookupPostTest added in v0.7.0

func (r *ExtensionRegistry) LookupPostTest(t types.TestType) (PostTestFactory, bool)

LookupPostTest returns the tier-2 post-test factory for a test type. Overlay wins.

func (*ExtensionRegistry) LookupRowTest added in v0.7.0

func (r *ExtensionRegistry) LookupRowTest(t types.TestType) (RowTestFactory, bool)

LookupRowTest returns the tier-1 row-test factory for a test type. Overlay wins.

func (*ExtensionRegistry) LookupWindow added in v0.7.0

func (r *ExtensionRegistry) LookupWindow(t types.WindowType) (window.WindowFactory, bool)

LookupWindow returns the factory for a window type. Overlay wins. Falls through to window.Lookup for built-ins.

func (*ExtensionRegistry) WindowFactories added in v0.7.0

func (r *ExtensionRegistry) WindowFactories() map[types.WindowType]window.WindowFactory

WindowFactories returns the overlay map for window operators, or nil when the registry has no entries. Safe to call on a nil receiver — the runtime treats nil as "no overlay" and falls through to the built-in window registry.

type FilterFunc

type FilterFunc func(record *Record) (bool, error)

FilterFunc evaluates whether a record passes a filter.

func BuildFilters added in v0.7.0

func BuildFilters(filterers []*types.Filterer, schema *encoding.Schema, exts *ExtensionRegistry) ([]FilterFunc, error)

BuildFilters compiles a slice of types.Filterer into runtime FilterFuncs against the given schema. Mirrors the per-Processor helper so non-Processor consumers (FacetSchema, Sample variants) can reuse the same factory + extension semantics. Returns nil when filterers is empty.

type FiltererBuilder

type FiltererBuilder interface {
	// Build creates a filter function from the filter specification.
	Build(filter *types.Filterer, schema *encoding.Schema) (FilterFunc, error)
}

FiltererBuilder constructs a FilterFunc from a filter specification.

type FiltererFactory

type FiltererFactory func() FiltererBuilder

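FiltererFactory creates a FiltererBuilder. Unlike the other factories it takes no arguments; the filter specification and schema are passed to the builder's Build method.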

type Grouper

type Grouper interface {
	// Group partitions the records by the specified field, returning a map of group key to records.
	Group(records []*Record, field string) (map[string][]*Record, error)
}

Grouper partitions records into named groups.

type GrouperFactory

type GrouperFactory func(grp *types.Group, schema *encoding.Schema) (Grouper, error)

GrouperFactory creates a Grouper from a type specification.

type LookupTable added in v0.7.0

type LookupTable struct {
	Rows   map[string]float64
	Lookup func(keys ...string) (float64, bool, error)
}

LookupTable is the runtime-side mirror of pulse.LookupTable. The expr environment's lookup() built-in consults this map; exactly one of Rows or Lookup is populated per table.
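The "exactly one of Rows or Lookup" rule suggests a simple dispatch. The sketch below is hypothetical: `lookupTable` and `resolve` are local stand-ins, the error messages are invented, and the Rows branch accepts a single key only, because how several keys map to one Rows entry is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// lookupTable is a local stand-in with the same two fields as LookupTable.
type lookupTable struct {
	Rows   map[string]float64
	Lookup func(keys ...string) (float64, bool, error)
}

// resolve dispatches to whichever field is populated and rejects
// tables that set both or neither.
func (t lookupTable) resolve(keys ...string) (float64, bool, error) {
	switch {
	case t.Rows != nil && t.Lookup != nil:
		return 0, false, errors.New("both Rows and Lookup are set")
	case t.Lookup != nil:
		return t.Lookup(keys...)
	case t.Rows != nil:
		// Assumption: single-key tables only in this sketch.
		if len(keys) != 1 {
			return 0, false, errors.New("this sketch supports one key for Rows")
		}
		v, ok := t.Rows[keys[0]]
		return v, ok, nil
	default:
		return 0, false, errors.New("neither Rows nor Lookup is set")
	}
}

func main() {
	static := lookupTable{Rows: map[string]float64{"DE": 0.19}}
	dynamic := lookupTable{Lookup: func(keys ...string) (float64, bool, error) {
		return float64(len(keys)), true, nil
	}}

	v1, hit, _ := static.resolve("DE")
	_, miss, _ := static.resolve("FR")
	v3, _, _ := dynamic.resolve("a", "b")
	fmt.Println(v1, hit, miss, v3) // prints 0.19 true false 2
}
```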

type OnlineAggregator added in v0.2.0

type OnlineAggregator interface {
	// UpdateRow folds a single record into the running state. The field
	// argument is the field the aggregator operates on; for COUNT this
	// can be ignored (COUNT counts rows, not values).
	UpdateRow(record *Record, field string) error
	// Finalize returns the final aggregated value and resets internal
	// state. It must be safe to call exactly once after streaming.
	Finalize() (float64, error)
}

OnlineAggregator is the optional sibling of Aggregator that supports single-pass streaming computation without materializing the full record set. Aggregators that can produce their result via O(1) (or O(unique)) state per row implement this interface so the orchestrator can stream the iterator directly when every aggregation in a request is online.

Implementations MUST handle null values internally: callers invoke UpdateRow once per record (after filters), and the aggregator decides whether the row contributes (e.g., COUNT counts every row, SUM skips nulls on its target field).

Finalize is called once after the iterator is exhausted; it returns the aggregated value. Finalize MUST be safe to call without any preceding UpdateRow calls (the empty-input case).
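A SUM-style aggregator shows the contract in miniature. This is a sketch under assumptions: `record` is a simplified stand-in in which a nil pointer marks a null, and `onlineSum` and the helper `f` are hypothetical names, not Pulse's built-in implementation.

```go
package main

import "fmt"

// record is a simplified stand-in for Record: a nil pointer marks a null.
type record map[string]*float64

// onlineSum keeps O(1) state and skips nulls on its target field.
type onlineSum struct{ total float64 }

// UpdateRow folds one record into the running total.
func (s *onlineSum) UpdateRow(r record, field string) error {
	if v := r[field]; v != nil {
		s.total += *v
	}
	return nil
}

// Finalize returns the running total and resets state. With no
// UpdateRow calls it returns 0.
func (s *onlineSum) Finalize() (float64, error) {
	out := s.total
	s.total = 0
	return out, nil
}

// f is a small helper that returns a pointer to a float64 literal.
func f(v float64) *float64 { return &v }

func main() {
	rows := []record{{"x": f(1.5)}, {"x": nil}, {"x": f(2.5)}}
	agg := &onlineSum{}
	for _, r := range rows {
		if err := agg.UpdateRow(r, "x"); err != nil {
			panic(err)
		}
	}
	total, _ := agg.Finalize()
	fmt.Println(total) // prints 4
}
```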

type PostTest added in v0.3.0

type PostTest interface {
	Run(rows []map[string]any) (*types.TestResult, error)
}

PostTest is a tier-2 statistical test consumed after the window stage on the materialized result row set. Run is called once over the post-pipeline data and returns a TestResult; tier-2 tests are always buffered.

type PostTestFactory added in v0.3.0

type PostTestFactory func(spec *types.Test, schema *encoding.Schema) (PostTest, error)

PostTestFactory creates a PostTest from a specification.

type ProcessPath added in v0.2.0

type ProcessPath int

ProcessPath identifies which execution path Process took. The streaming path runs aggregations in a single pass over the iterator without materializing the full record set. The buffered path collects every record into a slice first. This is exposed primarily for tests and benchmarks; production callers do not need it.

const (
	// PathUnknown is the zero value, reported before the first Process call.
	PathUnknown ProcessPath = iota
	// PathBuffered is the legacy materialize-then-aggregate path. It is
	// always correct and is the fallback whenever streaming would be
	// unsafe (groups, attributes, non-online aggregators, expression
	// filters that need the full set, etc.).
	PathBuffered
	// PathStreaming runs aggregations in a single pass over the iterator,
	// folding each record into the running state of every aggregator.
	// Selected only when every aggregation is an OnlineAggregator and
	// the request has no groups, no attributes, and only row-level
	// filters.
	PathStreaming
)
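The gating conditions listed in the PathStreaming comment can be written as a single predicate. This sketch is illustrative only: `requestShape` and `choosePath` are hypothetical, and the real orchestrator derives these properties from the request and schema.

```go
package main

import "fmt"

// processPath mirrors the documented constants with local names.
type processPath int

const (
	pathUnknown processPath = iota
	pathBuffered
	pathStreaming
)

// requestShape is a hypothetical summary of the request properties
// that the PathStreaming comment lists as gating conditions.
type requestShape struct {
	allAggregationsOnline bool
	hasGroups             bool
	hasAttributes         bool
	onlyRowLevelFilters   bool
}

// choosePath returns pathStreaming only when every documented
// condition holds and falls back to pathBuffered otherwise.
func choosePath(s requestShape) processPath {
	if s.allAggregationsOnline && !s.hasGroups && !s.hasAttributes && s.onlyRowLevelFilters {
		return pathStreaming
	}
	return pathBuffered
}

func main() {
	streamable := requestShape{allAggregationsOnline: true, onlyRowLevelFilters: true}
	grouped := streamable
	grouped.hasGroups = true

	fmt.Println(choosePath(streamable) == pathStreaming) // prints true
	fmt.Println(choosePath(grouped) == pathBuffered)     // prints true
}
```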

func (ProcessPath) String added in v0.2.0

func (p ProcessPath) String() string

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is the single dynamic processing engine for Pulse. It handles filtering, attribute computation, grouping, and aggregation over record iterators backed by .pulse encoded data.

func NewProcessor

func NewProcessor(schema *encoding.Schema) *Processor

NewProcessor creates a new Processor for the given schema. The resulting Processor uses only Pulse-shipped operator factories; embedder extensions land via NewProcessorWithExtensions.

func NewProcessorWithExtensions added in v0.7.0

func NewProcessorWithExtensions(schema *encoding.Schema, exts *ExtensionRegistry) *Processor

NewProcessorWithExtensions creates a Processor whose operator lookups consult exts before falling through to the built-in registries. Passing nil is equivalent to NewProcessor.

func (*Processor) LastPath added in v0.2.0

func (p *Processor) LastPath() ProcessPath

LastPath returns the ProcessPath taken by the most recent Process call on this processor instance. Returns PathUnknown before any call. Used by tests to verify that the orchestrator selected the streaming path for online-only requests; not part of the stable API contract.

func (*Processor) Process

func (p *Processor) Process(ctx context.Context, req *types.Request, iter RecordIterator) (*types.Response, error)

Process executes a single request against the record iterator.

Selects between two execution strategies:

  1. Streaming: when every aggregation supports OnlineAggregator and the request has no groups and no attributes, the iterator is consumed in one pass. Filters are applied per row before each aggregator folds the row into its running state. Memory is O(distinct values) for FREQUENCY/MODE/DISTINCT_COUNT and O(1) for everything else.

  2. Buffered: the legacy path. Every record is collected into a slice first, then filters, attributes, grouping, and aggregations run over the materialized set. Memory is O(rows). Always correct.

Output is identical between paths to float64 precision on well-conditioned inputs (variance/stddev/skewness/kurtosis use Welford-Pébaÿ recurrences in the streaming path).
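Welford's recurrence for variance illustrates why the two paths agree only up to floating-point precision. The sketch below is the textbook algorithm, not Pulse's code; it assumes population variance (division by n), and the names `welford` and `twoPassVariance` are illustrative.

```go
package main

import (
	"fmt"
	"math"
)

// welford holds the running count, mean, and sum of squared deviations.
type welford struct {
	n    float64
	mean float64
	m2   float64
}

// update folds one value into the running state in O(1).
func (w *welford) update(x float64) {
	w.n++
	delta := x - w.mean
	w.mean += delta / w.n
	w.m2 += delta * (x - w.mean)
}

// variance returns the population variance (divides by n, an assumption).
func (w *welford) variance() float64 {
	if w.n == 0 {
		return 0
	}
	return w.m2 / w.n
}

// twoPassVariance is the buffered counterpart: mean first, then deviations.
func twoPassVariance(xs []float64) float64 {
	if len(xs) == 0 {
		return 0
	}
	var sum float64
	for _, x := range xs {
		sum += x
	}
	mean := sum / float64(len(xs))
	var ss float64
	for _, x := range xs {
		ss += (x - mean) * (x - mean)
	}
	return ss / float64(len(xs))
}

func main() {
	xs := []float64{2, 4, 4, 4, 5, 5, 7, 9}
	var w welford
	for _, x := range xs {
		w.update(x)
	}
	// Compare within a tolerance rather than bit for bit.
	fmt.Println(math.Abs(w.variance()-twoPassVariance(xs)) < 1e-12) // prints true
}
```

Tests that compare the streaming and buffered paths would typically use a tolerance like this one instead of exact equality.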

func (*Processor) ProcessComposed

func (p *Processor) ProcessComposed(ctx context.Context, composed *types.ComposedRequest, records []*Record) ([]*types.Response, error)

ProcessComposed executes multiple requests against a shared record set.

type Record

type Record struct {
	// contains filtered or unexported fields
}

Record represents a single data row with field accessors. It provides both numeric and string access for processing operations.

func NewRecord

func NewRecord(schema *encoding.Schema, values map[string]float64) *Record

NewRecord creates a record with the given schema and field values.

func NewRecordWithNulls

func NewRecordWithNulls(schema *encoding.Schema, values map[string]float64, nulls map[string]bool) *Record

NewRecordWithNulls creates a record with explicit null tracking.

func NewRecordWithWide added in v0.2.0

func NewRecordWithWide(schema *encoding.Schema, values map[string]float64, nulls map[string]bool, wide map[string]any) *Record

NewRecordWithWide creates a record with typed wide values for fields that do not fit in float64 (decimal128, point_f64, h3_cell).

func NewReusableRecord added in v0.2.0

func NewReusableRecord(schema *encoding.Schema) *Record

NewReusableRecord constructs a Record whose internal maps are sized for the given schema and intended to be reused across many ReadRecordReused calls. Returns a Record that callers must NOT retain past the next iteration step.

func (*Record) AllValues

func (r *Record) AllValues() map[string]any

AllValues returns all field values as a map (for expression evaluation).

The returned map is cached on the Record after the first call and reused on subsequent calls; callers MUST NOT mutate it. If a caller mutates the underlying values map directly (e.g., the processor injecting computed attributes), it must call invalidateAllValuesCache to discard the cache.
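The cache-and-invalidate behavior can be sketched with a small stand-in. Everything here is hypothetical: `record`, `allValues`, and `set` imitate the described contract, in which a write discards the cached map and the next read rebuilds it.

```go
package main

import "fmt"

// record is a simplified stand-in for Record with a lazily built cache.
type record struct {
	values map[string]float64
	cache  map[string]any
}

// allValues builds the map on first use and returns the cached copy
// afterwards. Callers must treat the result as read-only.
func (r *record) allValues() map[string]any {
	if r.cache == nil {
		r.cache = make(map[string]any, len(r.values))
		for k, v := range r.values {
			r.cache[k] = v
		}
	}
	return r.cache
}

// set writes a value and discards the cache so the next allValues
// call sees the change.
func (r *record) set(name string, v float64) {
	r.values[name] = v
	r.cache = nil
}

func main() {
	r := &record{values: map[string]float64{"a": 1}}
	before := r.allValues()
	r.set("b", 2)
	after := r.allValues()
	fmt.Println(len(before), len(after)) // prints 1 2
}
```

Skipping the invalidation step would leave `after` pointing at the stale one-entry map, which is the failure mode the warning above guards against.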

func (*Record) ClearForRow added in v0.2.0

func (r *Record) ClearForRow()

ClearForRow implements encoding.ReusableRecord. Resets per-row state so the next ReadRecordReused call starts from a clean slate while keeping the underlying maps allocated.

values is left intact because every field is overwritten on every row. nulls and wide are cleared because their entries are sparse.

func (*Record) NumericValue

func (r *Record) NumericValue(name string) (float64, bool)

NumericValue returns the numeric value for the named field. Returns the value and true if present and non-null, or 0 and false if null or missing.

func (*Record) Schema

func (r *Record) Schema() *encoding.Schema

Schema returns the record's schema.

func (*Record) Set added in v0.2.0

func (r *Record) Set(name string, value float64)

Set assigns a numeric value to the named field on this record. It clears any prior null marker and invalidates the AllValues cache. Used by pre-filter feature operators to inject derived columns into the record stream so downstream stages (filters, attributes, groupers, aggregators) can reference them by label.

func (*Record) SetNull added in v0.2.0

func (r *Record) SetNull(name string)

SetNull marks the named field as null. Used by feature operators to propagate input nulls into the derived column.

func (*Record) SetNullField added in v0.2.0

func (r *Record) SetNullField(name string)

SetNullField implements encoding.ReusableRecord. Marks a field as null in the reuse path; does not invalidate the AllValues cache (reuse path resets the cache once per row via ClearForRow).

func (*Record) SetNumeric added in v0.2.0

func (r *Record) SetNumeric(name string, value float64)

SetNumeric implements encoding.ReusableRecord. Assigns a numeric value without touching the null marker or invalidating the AllValues cache. Intended only for the streaming reuse path that calls ClearForRow before each row.

func (*Record) SetWide added in v0.2.0

func (r *Record) SetWide(name string, v any)

SetWide assigns a typed wide value to a field. Used by readers and feature operators that produce non-float values (decimal, point, h3).

func (*Record) SetWideField added in v0.2.0

func (r *Record) SetWideField(name string, v any)

SetWideField implements encoding.ReusableRecord. Stores a typed wide value (decimal/point/h3) without invalidating the AllValues cache.

func (*Record) StringValue

func (r *Record) StringValue(name string) (string, bool)

StringValue returns the resolved string value for categorical fields. For non-categorical fields, returns the empty string and false.

func (*Record) WideValue added in v0.2.0

func (r *Record) WideValue(name string) (any, bool)

WideValue returns the typed wide value for the named field, if present. Wide values are populated for decimal128, point_f64, and h3_cell fields.

type RecordIterator

type RecordIterator interface {
	// Next advances to the next record. Returns false when exhausted.
	Next() bool
	// Record returns the current record. Only valid after Next returns true.
	Record() *Record
	// Reset resets the iterator to the beginning.
	Reset()
}

RecordIterator provides sequential access to records.
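A slice-backed iterator shows the Next/Record/Reset protocol in use. This is a minimal sketch with local stand-ins: `record`, `sliceIter`, and `sum` are hypothetical and do not reproduce SliceIterator's internals.

```go
package main

import "fmt"

type record struct{ value float64 }

// sliceIter is a local stand-in for a slice-backed RecordIterator.
type sliceIter struct {
	records []*record
	pos     int // number of records already returned
}

// Next advances to the next record and reports whether one exists.
func (it *sliceIter) Next() bool {
	if it.pos >= len(it.records) {
		return false
	}
	it.pos++
	return true
}

// Record is only valid after Next has returned true.
func (it *sliceIter) Record() *record { return it.records[it.pos-1] }

// Reset rewinds the iterator to the beginning.
func (it *sliceIter) Reset() { it.pos = 0 }

// sum drains the iterator and adds up the values.
func sum(it *sliceIter) float64 {
	var total float64
	for it.Next() {
		total += it.Record().value
	}
	return total
}

func main() {
	it := &sliceIter{records: []*record{{1}, {2}, {3}}}
	first := sum(it)
	exhausted := sum(it) // nothing left without Reset
	it.Reset()
	second := sum(it)
	fmt.Println(first, exhausted, second) // prints 6 0 6
}
```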

type ReusableIterator added in v0.2.0

type ReusableIterator interface {
	SetReuse(bool)
}

ReusableIterator is an optional interface implemented by iterators that can return the same Record pointer across Next() calls, refreshing its values/nulls/wide maps in place. Streaming consumers that consume each record inline (no slice retention) can opt in to drop the per-row map allocations.

Callers MUST consume each record before invoking Next() again — the next call will overwrite the Record's contents.
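The hazard is easy to reproduce with a stand-in iterator that reuses one record. In this sketch `record` and `reusingIter` are hypothetical; the point is only that retained pointers all alias the same storage, while copied values stay correct.

```go
package main

import "fmt"

type record struct{ value float64 }

// reusingIter hands out the same *record on every step and
// overwrites it in place.
type reusingIter struct {
	data []float64
	pos  int
	cur  record
}

func (it *reusingIter) Next() bool {
	if it.pos >= len(it.data) {
		return false
	}
	it.cur.value = it.data[it.pos]
	it.pos++
	return true
}

func (it *reusingIter) Record() *record { return &it.cur }

func main() {
	it := &reusingIter{data: []float64{1, 2, 3}}
	var retained []*record // wrong: every entry aliases the same record
	var copied []float64   // right: values are copied out before Next
	for it.Next() {
		r := it.Record()
		retained = append(retained, r)
		copied = append(copied, r.value)
	}
	fmt.Println(retained[0].value, copied[0]) // prints 3 1
}
```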

type RowLocalAttribute added in v0.2.0

type RowLocalAttribute interface {
	// Row computes this attribute's value for a single record and field.
	// For a pure RowLocalAttribute, no PrePass call is needed; for a
	// TwoPassAttribute, Row must be called only after Finalize.
	Row(record *Record, field string) (float64, error)
}

RowLocalAttribute is the optional sibling of AttributeComputer for attributes whose value depends only on the current row (no first-pass population stats needed). Streaming paths drive RowLocalAttribute.Row inline instead of buffering the full record set.

FORMULA and DATE_PART implement this interface; ZSCORE / TSCORE / NORMALIZED implement TwoPassAttribute (a superset). PERCENTILE does NOT implement either — it needs a sorted view of every value, which forces the buffered path.

type RowTest added in v0.3.0

type RowTest interface {
	UpdateRow(record *Record) error
	Finalize() (*types.TestResult, error)
}

RowTest is a tier-1 statistical test consumed during the row scan alongside online aggregators. Implementations fold each filter-passing record into running state via UpdateRow and produce a TestResult via Finalize after the iterator is exhausted.

Per-test state is per-instance; callers construct a fresh instance per Process call via a RowTestFactory.

UpdateRow MUST be safe to call zero times (Finalize handles the empty input case). Implementations decide whether a row contributes; null values are typically skipped via the Record's NumericValue / StringValue helpers.

type RowTestFactory added in v0.3.0

type RowTestFactory func(spec *types.Test, schema *encoding.Schema) (RowTest, error)

RowTestFactory creates a RowTest from a specification.

type SliceIterator

type SliceIterator struct {
	// contains filtered or unexported fields
}

SliceIterator implements RecordIterator over a slice of records.

func NewSliceIterator

func NewSliceIterator(records []*Record) *SliceIterator

NewSliceIterator creates an iterator over the given records.

func (*SliceIterator) Next

func (it *SliceIterator) Next() bool

Next advances to the next record.

func (*SliceIterator) Record

func (it *SliceIterator) Record() *Record

Record returns the current record.

func (*SliceIterator) Reset

func (it *SliceIterator) Reset()

Reset resets the iterator to the beginning.

type StreamingGrouper added in v0.2.0

type StreamingGrouper interface {
	// KeyForRow returns the group key string for record's value of field.
	// ok=false signals the row should be skipped (e.g. null value);
	// ok=true means the key is valid for bucketing.
	KeyForRow(record *Record, field string) (key string, ok bool, err error)
}

StreamingGrouper is the optional sibling of Grouper for groupers that can derive a partition key from a single record without seeing the full record set. CATEGORY / RANGE / ROUNDED / H3_CELL implement this interface; QUANTILE and DATE require finalize-time work over the full input.

Implementations MUST be safe to call repeatedly; the streaming path invokes KeyForRow once per filter-passing record and uses the key to index a per-group online aggregator bucket.
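Keying rows into per-group running state can be sketched as below. The names are hypothetical: `record` is a simplified row in which an empty category stands in for a null, `keyForRow` imitates the KeyForRow contract, and a plain running sum stands in for the per-group online aggregator.

```go
package main

import "fmt"

// record is a simplified row; an empty category stands in for null.
type record struct {
	category string
	amount   float64
}

// keyForRow derives the group key from a single record.
// ok=false tells the caller to skip the row.
func keyForRow(r record) (key string, ok bool, err error) {
	if r.category == "" {
		return "", false, nil
	}
	return r.category, true, nil
}

// groupSums streams the rows once and keeps one running sum per key.
func groupSums(rows []record) (map[string]float64, error) {
	sums := make(map[string]float64)
	for _, r := range rows {
		key, ok, err := keyForRow(r)
		if err != nil {
			return nil, err
		}
		if !ok {
			continue // null key: row does not contribute
		}
		sums[key] += r.amount
	}
	return sums, nil
}

func main() {
	rows := []record{{"a", 1}, {"b", 2}, {"", 10}, {"a", 3}}
	sums, err := groupSums(rows)
	if err != nil {
		panic(err)
	}
	fmt.Println(sums["a"], sums["b"], len(sums)) // prints 4 2 2
}
```

Memory grows with the number of distinct keys rather than the number of rows, which is the property the streaming path relies on.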

type TwoPassAttribute added in v0.2.0

type TwoPassAttribute interface {
	RowLocalAttribute
	// PrePass folds a single record's contribution into the running
	// state used to compute population statistics.
	PrePass(record *Record, field string) error
	// Finalize closes the PrePass phase. After Finalize, Row may be
	// called for each record (typically during iter pass 2).
	Finalize() error
}

TwoPassAttribute is the streaming-friendly path for attributes that need population statistics (ZSCORE / TSCORE need mean+stddev, NORMALIZED needs min+max). The orchestrator drives PrePass over every filter-passing record, then Finalize locks the global stats, then Row emits per-record values during a second iter pass.

Mirrors feature.StreamingComputer.PrePass+Finalize+EmitRow so the streaming infrastructure (iter.Reset(), staged passes) is uniform.

The call order is fixed: PrePass MUST be safe to call repeatedly; Finalize is called exactly once, after the last PrePass call and before the first Row call; and Row is called only after Finalize. State is per-instance, so callers construct a fresh instance per Process call via the AttributeFactory.
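A z-score attribute makes the PrePass, Finalize, Row sequence concrete. This is a sketch under assumptions: `zscore` is hypothetical, it works on plain float64 values instead of Records, it uses the population standard deviation, and it accumulates a naive sum of squares, which is less numerically stable than a Welford-style update.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// zscore is a hypothetical two-pass attribute over plain float64 values.
type zscore struct {
	n, sum, sumSq float64
	mean, std     float64
	finalized     bool
}

// PrePass folds one value into the running sums (pass 1).
func (z *zscore) PrePass(x float64) error {
	z.n++
	z.sum += x
	z.sumSq += x * x
	return nil
}

// Finalize locks the mean and population standard deviation.
func (z *zscore) Finalize() error {
	if z.n > 0 {
		z.mean = z.sum / z.n
		z.std = math.Sqrt(math.Max(0, z.sumSq/z.n-z.mean*z.mean))
	}
	z.finalized = true
	return nil
}

// Row emits the z-score for one value (pass 2) and rejects calls
// made before Finalize.
func (z *zscore) Row(x float64) (float64, error) {
	if !z.finalized {
		return 0, errors.New("Row called before Finalize")
	}
	if z.std == 0 {
		return 0, nil
	}
	return (x - z.mean) / z.std, nil
}

func main() {
	xs := []float64{2, 4, 4, 4, 5, 5, 7, 9} // mean 5, stddev 2
	z := &zscore{}
	for _, x := range xs {
		_ = z.PrePass(x)
	}
	_ = z.Finalize()
	lo, _ := z.Row(2)
	hi, _ := z.Row(9)
	fmt.Println(lo, hi) // prints -1.5 2
}
```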

Directories

Path Synopsis
Package arena provides a bump-allocator backed by a single contiguous []byte.
Package feature implements the FEAT_* operators that run pre-filter to add derived columns to a record stream.
Package regression hosts the REG_* operators that fit a regression model against the filtered record set.
Package window implements the WIN_* window operators for Pulse.
