Documentation
¶
Overview ¶
Package core provides the standard numeric and basic monoids: Sum, Count, Min, Max, First, Last, Set. These are structural — backend executors recognize their Kind and translate them to native operations.
Index ¶
- func Count() monoid.Monoid[int64]
- func First[V any]() monoid.Monoid[Stamped[V]]
- func Last[V any]() monoid.Monoid[Stamped[V]]
- func Max[V cmp.Ordered]() monoid.Monoid[Bounded[V]]
- func Min[V cmp.Ordered]() monoid.Monoid[Bounded[V]]
- func Monotonic[V cmp.Ordered](identity V) monoid.Monoid[V]
- func Set[V comparable]() monoid.Monoid[map[V]struct{}]
- func Sum[V Numeric]() monoid.Monoid[V]
- type Bounded
- type Numeric
- type Stamped
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Count ¶
Count returns a monoid that counts events. Each event contributes 1; values are summed. Use with Pipeline's Value(func(T) int64 { return 1 }) or have the source emit unit values.
func First ¶
First returns a monoid that keeps the earliest-timestamped value seen. On ties, the first-arrived value wins (left side of Combine).
func Last ¶
Last returns a monoid that keeps the latest-timestamped value seen. On ties, the most-recently-arrived value wins (right side of Combine).
func Max ¶
Max returns a monoid that tracks the running maximum of V. Identity is the unset Bounded[V]; this satisfies the monoid identity law for all V.
func Min ¶
Min returns a monoid that tracks the running minimum of V. Identity is the unset Bounded[V]; this satisfies the monoid identity law for all V.
Lift per-event observations via NewBounded(v) (or core.Bounded[V]{Value: v, Set: true}).
func Monotonic ¶
Monotonic returns a monoid where Combine takes the larger of its inputs by V's natural ordering. The caller supplies Identity explicitly because `cmp.Ordered` has no portable minimum.
When to use this vs. Max ¶
`core.Max[V]()` is the right monoid for "the maximum observed value over the events seen so far" when each event contributes a single observation. Its value type is `Bounded[V]` (so Identity can mean "no observation yet"), which means callers lift each event via `NewBounded(v)`.
`Monotonic[V]` is the right monoid for the SetCountIfGreater pattern: upstream services emit ABSOLUTE values (e.g. "user X has 47 likes") and the pipeline must accept-only-if-greater so that an out-of-order earlier emission ("user X has 32 likes") doesn't roll the value backwards. Pair it with `pkg/state/dynamodb.Int64MaxStore` (DDB conditional UpdateItem `attribute_not_exists OR #v < :v`) for the canonical end-to-end shape: in-process Combine collapses a batch via `max`, the store performs the final conditional write.
pipe := pipeline.NewPipeline[Event, int64]("likes_total").
From(src).
Key(func(e Event) string { return "user:" + e.UserID }).
Value(func(e Event) int64 { return e.AbsoluteCount }).
Aggregate(core.Monotonic[int64](math.MinInt64)).
StoreIn(maxStore)
Why caller-supplied Identity ¶
For `cmp.Ordered`, there's no portable "minimum" value. For signed integers it's `math.MinInt*`; for unsigned, `0`; for `string`, `""`; for `time.Time`, the zero `Time{}`. Rather than hard-code one or require a new constraint, callers pass the appropriate min explicitly. In Int64MaxStore-backed pipelines Identity is largely vestigial — the DDB conditional handles the "no value yet" case via `attribute_not_exists` — so even an obviously-wrong Identity value would still produce correct stored state, but the in-process `Combine(Identity, x) = x` law fails if you pick wrong.
func Set ¶
func Set[V comparable]() monoid.Monoid[map[V]struct{}]
Set returns a monoid that unions sets of comparable elements. The set representation is a Go map[V]struct{}. For high-cardinality unique-element tracking prefer the HLL sketch monoid, which trades exactness for bounded memory.
Types ¶
type Bounded ¶
Bounded wraps a value V with a Set flag, used by Min/Max so that Identity can be represented as "no value yet" rather than the zero value of V (which would otherwise break the monoid identity law for any V where the zero is a legitimate input).
Use NewBounded to lift a single observation; the Set flag is then true. The zero Bounded[V] is Identity for both Min and Max.
func NewBounded ¶
NewBounded lifts a single observation into a Bounded wrapper.
type Numeric ¶
type Numeric interface {
~int | ~int8 | ~int16 | ~int32 | ~int64 |
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
~float32 | ~float64
}
Numeric is the constraint for monoids over orderable numeric values.
type Stamped ¶
type Stamped[V any] struct { Time int64 // Unix nanoseconds Value V // Set indicates whether this Stamped carries a real value. The zero Stamped is the // identity for First/Last and must be distinguishable from a real event at t=0. Set bool }
Stamped wraps a value V with an event time. First and Last operate on Stamped[V].