windowed

package
v0.0.0-...-6425869 Latest
Warning

This package is not in the latest version of its module.

Published: May 9, 2026 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Package windowed expresses time-bucketed aggregations on top of any structural monoid. State is keyed by (entity, bucket_id), where bucket_id is computed from event time and the configured Granularity. Queries assemble sliding windows by Combining the N most recent buckets through the wrapped monoid. DynamoDB (DDB) TTL evicts buckets older than Retention.
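The keying scheme can be illustrated with a minimal in-memory sketch. It does not use this package: the `key`, `store`, `bucketOf`, `record`, and `lastBuckets` names are hypothetical, the map stands in for the real state backend, and integer addition stands in for the wrapped monoid.

```go
package main

import (
	"fmt"
	"time"
)

// key is a hypothetical state-store key: one entry per (entity, bucket).
type key struct {
	entity string
	bucket int64
}

// granularity is an assumed daily bucket size for this sketch.
const granularity = 24 * time.Hour

// bucketOf maps an event time to an epoch-aligned bucket index.
// This sketch assumes times at or after the Unix epoch.
func bucketOf(t time.Time) int64 {
	return t.UnixNano() / int64(granularity)
}

// store is an in-memory stand-in for the real state backend.
type store map[key]int64

// record adds n to the bucket that contains the event time.
func (s store) record(entity string, at time.Time, n int64) {
	s[key{entity, bucketOf(at)}] += n
}

// lastBuckets sums the n most recent buckets, ending at the bucket containing now.
func (s store) lastBuckets(entity string, now time.Time, n int64) int64 {
	hi := bucketOf(now)
	var total int64
	for b := hi - n + 1; b <= hi; b++ {
		total += s[key{entity, b}]
	}
	return total
}

// demo records three events and queries a 7-bucket sliding window.
func demo() int64 {
	s := store{}
	now := time.Date(2026, time.May, 9, 12, 0, 0, 0, time.UTC)
	s.record("user-1", now, 2)
	s.record("user-1", now.Add(-24*time.Hour), 3)
	s.record("user-1", now.Add(-10*24*time.Hour), 5) // falls outside the window
	return s.lastBuckets("user-1", now, 7)
}

func main() {
	fmt.Println(demo()) // prints 5
}
```

The event from ten days ago is still stored but is not read by the 7-bucket query; in the real system TTL eviction, not the query, is what eventually removes it.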

Constants

This section is empty.

Variables

This section is empty.

Functions

func MergeBuckets

func MergeBuckets[V any](m monoid.Monoid[V], values []V) V

MergeBuckets folds bucket values via the inner monoid in stable order. Used by the query layer to produce sliding-window results from per-bucket reads.
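A fold of this kind might look like the sketch below. The `Monoid` interface here is a hypothetical stand-in with assumed `Identity` and `Combine` methods, since the method set of `monoid.Monoid[V]` is not shown on this page; `mergeBuckets`, `sum`, and `concat` are likewise illustrative names. Returning the identity for an empty slice is an assumption of the sketch.

```go
package main

import "fmt"

// Monoid is a hypothetical stand-in for monoid.Monoid[V]; the real
// interface's method names are not shown in this documentation.
type Monoid[V any] interface {
	Identity() V
	Combine(a, b V) V
}

// sum is a commutative example monoid over int64.
type sum struct{}

func (sum) Identity() int64          { return 0 }
func (sum) Combine(a, b int64) int64 { return a + b }

// concat is a non-commutative example monoid, so fold order is observable.
type concat struct{}

func (concat) Identity() string            { return "" }
func (concat) Combine(a, b string) string { return a + b }

// mergeBuckets folds values left to right, starting from the identity.
func mergeBuckets[V any](m Monoid[V], values []V) V {
	acc := m.Identity()
	for _, v := range values {
		acc = m.Combine(acc, v)
	}
	return acc
}

func main() {
	fmt.Println(mergeBuckets[int64](sum{}, []int64{3, 0, 4}))           // prints 7
	fmt.Println(mergeBuckets[string](concat{}, []string{"a", "b", "c"})) // prints abc
}
```

The string case shows why a stable order matters: with a non-commutative monoid, folding the same buckets in a different order would give a different result.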

Types

type Config

type Config struct {
	// Granularity is the size of each tumbling bucket (e.g. 24h for daily, 1h for hourly).
	// Smaller granularity means finer query precision and higher state cost.
	Granularity time.Duration

	// Retention is how long buckets are kept before TTL eviction. Sliding-window queries
	// can ask for any range up to Retention.
	Retention time.Duration

	// EventTimeField, if non-empty, names the field on the source record from which to
	// extract event-time. If empty, processing-time is used. Backends honor this when
	// computing BucketID.
	EventTimeField string
}

Config describes the bucket layout for a windowed aggregation.
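How a backend might honor EventTimeField can be sketched as follows. Everything here is hypothetical: the `record` type, the `resolveTime` helper, and the `occurred_at` field name are illustrative, and falling back to processing time when the named field is missing from the record is an assumption that this page does not confirm.

```go
package main

import (
	"fmt"
	"time"
)

// record is a hypothetical source record; real backends define their own shape.
type record map[string]time.Time

// resolveTime returns the event time from the named field, or the supplied
// processing time when the field name is empty.
// Falling back when the field is absent from the record is an assumption.
func resolveTime(r record, field string, processing time.Time) time.Time {
	if field == "" {
		return processing
	}
	if t, ok := r[field]; ok {
		return t
	}
	return processing
}

var (
	occurred = time.Date(2026, time.May, 8, 23, 30, 0, 0, time.UTC)
	arrived  = time.Date(2026, time.May, 9, 0, 5, 0, 0, time.UTC)
	sample   = record{"occurred_at": occurred}
)

func main() {
	// With a field name, the record's own timestamp decides the bucket.
	fmt.Println(resolveTime(sample, "occurred_at", arrived))
	// With an empty field name, arrival (processing) time is used instead.
	fmt.Println(resolveTime(sample, "", arrived))
}
```

In this sample the two choices land on different calendar days, so with daily granularity the late-arriving event would be counted in a different bucket depending on the setting.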

func Daily

func Daily(retention time.Duration) Config

Daily returns a Config with 24h granularity and the given retention. The most common configuration for "last N days of X"-style counters.

func Hourly

func Hourly(retention time.Duration) Config

Hourly returns a Config with 1h granularity and the given retention. Suitable for "last N hours" or "last 7 days at hourly resolution" queries.

func Minute

func Minute(retention time.Duration) Config

Minute returns a Config with 1-minute granularity and the given retention. Useful for high-resolution short-window aggregations (last 5 minutes, last hour at per-minute resolution). At this granularity, a "last 7 days" query reads 10080 buckets; consider hierarchical roll-ups for queries spanning more than ~24h.
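The read cost of each granularity can be checked with a little arithmetic. The sketch below is independent of this package; `bucketsRead` is a hypothetical helper that computes ceil(window / granularity) and assumes both durations are positive.

```go
package main

import (
	"fmt"
	"time"
)

const (
	minute = time.Minute
	hour   = time.Hour
	day    = 24 * time.Hour
	week   = 7 * day
)

// bucketsRead returns ceil(window / granularity), the number of buckets a
// sliding-window query of the given length touches. Assumes positive inputs.
func bucketsRead(window, granularity time.Duration) int64 {
	return int64((window + granularity - 1) / granularity)
}

func main() {
	// Prints 10080, 168 and 7 buckets for minute, hourly and daily granularity.
	for _, g := range []time.Duration{minute, hour, day} {
		fmt.Printf("granularity %v: %d buckets per 7-day query\n", g, bucketsRead(week, g))
	}
}
```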

func (Config) BucketID

func (c Config) BucketID(t time.Time) int64

BucketID assigns the given time to a bucket according to Granularity. Buckets are tumbling and aligned to the Unix epoch.
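Epoch-aligned tumbling buckets amount to a floor division of the timestamp by the granularity. The following is a sketch of that idea, not the package's source: `bucketID` is a hypothetical free function, and its handling of pre-epoch times (negative IDs via floor division) is an assumption. It relies on `time.Time.UnixNano`, which is only defined for years 1678 through 2262.

```go
package main

import (
	"fmt"
	"time"
)

const day = 24 * time.Hour

// bucketID floors t to an epoch-aligned bucket index.
// Go's integer division truncates toward zero, so negative timestamps are
// adjusted down by one to get a true floor. Whether the real BucketID
// treats pre-epoch times this way is not stated in the documentation.
func bucketID(granularity time.Duration, t time.Time) int64 {
	n, g := t.UnixNano(), int64(granularity)
	q := n / g
	if n%g < 0 {
		q--
	}
	return q
}

var (
	epoch       = time.Unix(0, 0)
	endOfDay0   = time.Unix(86399, 0) // last second of the first day
	startOfDay1 = time.Unix(86400, 0) // first second of the second day
	beforeEpoch = time.Unix(-1, 0)
)

func main() {
	fmt.Println(
		bucketID(day, epoch),
		bucketID(day, endOfDay0),
		bucketID(day, startOfDay1),
		bucketID(day, beforeEpoch),
	) // prints 0 0 1 -1
}
```

Because buckets are aligned to the epoch rather than to the first event, daily buckets roll over at 00:00 UTC regardless of local time zones.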

func (Config) BucketRange

func (c Config) BucketRange(start, end time.Time) (lo, hi int64)

BucketRange returns the inclusive range of bucket IDs that cover [start, end].
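An inclusive range over [start, end] can be obtained by taking the bucket of each endpoint. The sketch below assumes that reading of the documentation; `bucketID` and `bucketRange` are hypothetical free functions, not the package's methods.

```go
package main

import (
	"fmt"
	"time"
)

const hour = time.Hour

// bucketID floors t to an epoch-aligned bucket index (floor division).
func bucketID(granularity time.Duration, t time.Time) int64 {
	n, g := t.UnixNano(), int64(granularity)
	q := n / g
	if n%g < 0 {
		q--
	}
	return q
}

// bucketRange returns the bucket of each endpoint; both bounds are inclusive.
// This sketch assumes start is not after end.
func bucketRange(granularity time.Duration, start, end time.Time) (lo, hi int64) {
	return bucketID(granularity, start), bucketID(granularity, end)
}

var (
	start = time.Unix(10*3600+15*60, 0) // 10h15m after the epoch
	end   = time.Unix(13*3600+45*60, 0) // 13h45m after the epoch
)

func main() {
	lo, hi := bucketRange(hour, start, end)
	fmt.Println(lo, hi, hi-lo+1) // prints 10 13 4
}
```

Both partially covered buckets (10 and 13) are included, so a caller reading this range gets results at bucket resolution rather than exact to the minute.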

func (Config) LastN

func (c Config) LastN(now time.Time, d time.Duration) (lo, hi int64)

LastN returns the bucket-ID range covering the most recent duration d, ending at now. The range contains ceil(d / Granularity) buckets: hi is the bucket containing now, and lo is that many buckets back, counting hi itself (lo = hi - ceil(d / Granularity) + 1). For daily granularity, "last 7 days" therefore returns 7 buckets (today plus 6 prior), not 8.
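The bucket arithmetic described for LastN can be sketched as follows. The `bucketID` and `lastN` functions are hypothetical stand-ins written from the description above, and the sketch assumes d is positive.

```go
package main

import (
	"fmt"
	"time"
)

const (
	hour = time.Hour
	day  = 24 * time.Hour
)

// bucketID floors t to an epoch-aligned bucket index (floor division).
func bucketID(granularity time.Duration, t time.Time) int64 {
	n, g := t.UnixNano(), int64(granularity)
	q := n / g
	if n%g < 0 {
		q--
	}
	return q
}

// lastN returns the inclusive range of ceil(d / granularity) buckets that
// ends at the bucket containing now. Assumes d > 0.
func lastN(granularity time.Duration, now time.Time, d time.Duration) (lo, hi int64) {
	n := int64((d + granularity - 1) / granularity) // ceil(d / granularity)
	hi = bucketID(granularity, now)
	lo = hi - n + 1
	return lo, hi
}

// now is one hour into day 20 after the epoch.
var now = time.Unix(20*86400+3600, 0)

func main() {
	fmt.Println(lastN(day, now, 7*day))   // prints 14 20 (7 buckets)
	fmt.Println(lastN(day, now, 36*hour)) // prints 19 20 (36h rounds up to 2 buckets)
}
```

Note that the newest bucket is usually only partly filled, so a "last 7 days" result at daily granularity covers between 6 and 7 days of actual data depending on the time of day.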

type Wrapped

type Wrapped[V any] struct {
	Inner  monoid.Monoid[V]
	Window Config
}

Wrapped pairs a monoid with a windowing config. The Pipeline DSL uses this to drive state-store keying and query-handler generation; the underlying Combine is unchanged.
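One way to picture "the underlying Combine is unchanged" is that windowing only chooses which state entry an update lands in. The sketch below is an illustration under that reading, not the Pipeline DSL: `Monoid` is a hypothetical interface with assumed `Identity` and `Combine` methods, and `config`, `wrapped`, `key`, and `update` are local stand-ins.

```go
package main

import (
	"fmt"
	"time"
)

// Monoid is a hypothetical stand-in for monoid.Monoid[V].
type Monoid[V any] interface {
	Identity() V
	Combine(a, b V) V
}

// sum is an example monoid over int64.
type sum struct{}

func (sum) Identity() int64          { return 0 }
func (sum) Combine(a, b int64) int64 { return a + b }

// config and wrapped mirror the documented Config and Wrapped shapes.
type config struct {
	Granularity time.Duration
	Retention   time.Duration
}

type wrapped[V any] struct {
	Inner  Monoid[V]
	Window config
}

// key is a hypothetical state-store key.
type key struct {
	entity string
	bucket int64
}

// update combines v into the bucket for time t. The window config only picks
// the key; the value is merged by the inner monoid's Combine.
// Assumes t is at or after the Unix epoch.
func (w wrapped[V]) update(state map[key]V, entity string, t time.Time, v V) {
	k := key{entity, t.UnixNano() / int64(w.Window.Granularity)}
	cur, ok := state[k]
	if !ok {
		cur = w.Inner.Identity()
	}
	state[k] = w.Inner.Combine(cur, v)
}

// demo applies three updates: two in hour 0 and one in hour 1.
func demo() map[key]int64 {
	w := wrapped[int64]{
		Inner:  sum{},
		Window: config{Granularity: time.Hour, Retention: 48 * time.Hour},
	}
	state := map[key]int64{}
	w.update(state, "user-1", time.Unix(60, 0), 1)
	w.update(state, "user-1", time.Unix(120, 0), 2)
	w.update(state, "user-1", time.Unix(3700, 0), 5)
	return state
}

func main() {
	state := demo()
	fmt.Println(len(state), state[key{"user-1", 0}], state[key{"user-1", 1}]) // prints 2 3 5
}
```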
