Documentation ¶
Overview ¶
Package window contains window representation, windowing strategies and utilities.
Index ¶
Constants ¶
const (
	DefaultTrigger                         string = "Trigger_Default_"
	AlwaysTrigger                          string = "Trigger_Always_"
	AfterAnyTrigger                        string = "Trigger_AfterAny_"
	AfterAllTrigger                        string = "Trigger_AfterAll_"
	AfterProcessingTimeTrigger             string = "Trigger_AfterProcessing_Time_"
	ElementCountTrigger                    string = "Trigger_ElementCount_"
	AfterEndOfWindowTrigger                string = "Trigger_AfterEndOfWindow_"
	RepeatTrigger                          string = "Trigger_Repeat_"
	OrFinallyTrigger                       string = "Trigger_OrFinally_"
	NeverTrigger                           string = "Trigger_Never_"
	AfterSynchronizedProcessingTimeTrigger string = "Trigger_AfterSynchronizedProcessingTime_"
)
Variables ¶
var (
	// SingleGlobalWindow is a slice of a single global window. Convenience value.
	SingleGlobalWindow = []typex.Window{GlobalWindow{}}
)
Functions ¶
func IsEqualList ¶
IsEqualList returns true iff the lists of windows are equal. Note that ordering matters and that this is not set equality.
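To make the ordering caveat concrete, here is a minimal sketch of order-sensitive list equality. The `window` type and `equalList` function are hypothetical stand-ins written for this illustration; they are not the package's own types or implementation.

```go
package main

import "fmt"

// window is a hypothetical stand-in for typex.Window: a half-open interval
// identified by its start and end in milliseconds.
type window struct{ start, end int64 }

// equalList reports whether two window lists have the same length and the
// same window at every index. Order matters; this is not set equality.
func equalList(a, b []window) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	w1 := window{0, 10}
	w2 := window{10, 20}
	fmt.Println(equalList([]window{w1, w2}, []window{w1, w2})) // true
	fmt.Println(equalList([]window{w1, w2}, []window{w2, w1})) // false: same windows, different order
}
```

Two lists holding the same windows in a different order compare as unequal, which is the behavior the description warns about.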
Types ¶
type AccumulationMode ¶
type AccumulationMode string
const (
	Unspecified  AccumulationMode = "AccumulationMode_UNSPECIFIED"
	Discarding   AccumulationMode = "AccumulationMode_DISCARDING"
	Accumulating AccumulationMode = "AccumulationMode_ACCUMULATING"
	Retracting   AccumulationMode = "AccumulationMode_RETRACTING"
)
type AlignToTransform ¶ added in v2.34.0
type AlignToTransform struct {
Period, Offset int64 // in milliseconds
}
AlignToTransform takes the timestamp and transforms it to the lowest multiple of the period starting from the offset.
E.g., a period of 20 with an offset of 45 has alignments at 5, 25, 45, 65, etc. Timestamps are transformed as follows: 0 to 5 becomes 5, 6 to 25 becomes 25, 26 to 45 becomes 45, and so on.
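The arithmetic behind this example can be sketched as follows. The `alignTo` function is a hypothetical helper that reproduces the worked example (period 20, offset 45); it assumes a timestamp already on an alignment point is left unchanged, as the example suggests, and it is not the SDK's or any runner's implementation.

```go
package main

import "fmt"

// alignTo rounds ts up to the nearest alignment point, where alignment points
// are offset + k*period for any integer k. All values are in milliseconds.
// A timestamp that already sits on an alignment point is returned unchanged,
// matching the worked example in the text (an assumption about boundaries).
func alignTo(ts, period, offset int64) int64 {
	// Go's % can return a negative remainder, so normalize it into [0, period).
	rem := ((ts-offset)%period + period) % period
	if rem == 0 {
		return ts
	}
	return ts + period - rem
}

func main() {
	for _, ts := range []int64{0, 5, 6, 25, 26, 45, 46} {
		fmt.Printf("%d -> %d\n", ts, alignTo(ts, 20, 45))
	}
}
```

With these inputs the sketch maps 0 and 5 to 5, 6 and 25 to 25, 26 and 45 to 45, and 46 to 65.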
type DelayTransform ¶ added in v2.34.0
type DelayTransform struct {
Delay int64 // in milliseconds
}
DelayTransform takes the timestamp and adds the given delay to it.
type Fn ¶
type Fn struct {
	Kind Kind

	Size   time.Duration // FixedWindows, SlidingWindows
	Period time.Duration // SlidingWindows
	Gap    time.Duration // Sessions
}
Fn defines the window fn.
func NewFixedWindows ¶
NewFixedWindows returns the fixed WindowFn with the given interval.
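As an illustration of what a fixed WindowFn does with an element, the sketch below assigns a timestamp to the one interval of the given size that contains it. The `interval` type and `fixedWindow` function are hypothetical, timestamps are assumed to be integer milliseconds, and windows are assumed to be aligned to timestamp zero; actual assignment is done by the runner.

```go
package main

import (
	"fmt"
	"time"
)

// interval is a hypothetical half-open window [start, end) in milliseconds.
type interval struct{ start, end int64 }

// fixedWindow returns the fixed window of the given size that contains the
// timestamp, assuming windows are aligned to timestamp zero.
func fixedWindow(tsMillis int64, size time.Duration) interval {
	s := size.Milliseconds()
	// Floor the timestamp to a multiple of the size; the extra "+ s" and
	// second "% s" keep the remainder non-negative for timestamps before zero.
	start := tsMillis - ((tsMillis%s)+s)%s
	return interval{start, start + s}
}

func main() {
	fmt.Println(fixedWindow(12345, 10*time.Second)) // {10000 20000}
	fmt.Println(fixedWindow(-1, 10*time.Second))    // {-10000 0}
}
```

Every timestamp lands in exactly one window, which is what distinguishes fixed windows from sliding windows.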
func NewGlobalWindows ¶
func NewGlobalWindows() *Fn
NewGlobalWindows returns the default WindowFn, which places all elements into a single window.
func NewSessions ¶
NewSessions returns the session WindowFn with the given gap.
func NewSlidingWindows ¶
NewSlidingWindows returns the sliding WindowFn with the given period and duration.
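With sliding windows, a new window of the given duration starts every period, so one element can belong to several windows. The following sketch lists them for a single timestamp. `interval` and `slidingWindows` are hypothetical names, values are assumed to be integer milliseconds, and window starts are assumed to be multiples of the period; this is not the runner's implementation.

```go
package main

import "fmt"

// interval is a hypothetical half-open window [start, end) in milliseconds.
type interval struct{ start, end int64 }

// slidingWindows returns every window of length size, starting at a multiple
// of period, that contains ts. Windows are listed from latest start to
// earliest start.
func slidingWindows(ts, period, size int64) []interval {
	// Latest window start at or before ts (remainder normalized to be >= 0).
	lastStart := ts - ((ts%period)+period)%period
	var out []interval
	// A window [start, start+size) contains ts only while start > ts-size.
	for start := lastStart; start > ts-size; start -= period {
		out = append(out, interval{start, start + size})
	}
	return out
}

func main() {
	fmt.Println(slidingWindows(7, 5, 10)) // [{5 15} {0 10}]
}
```

When the duration is a multiple of the period, each element falls into duration/period windows (two in the example above).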
func (*Fn) Coder ¶
func (w *Fn) Coder() *coder.WindowCoder
Coder returns the WindowCoder for the WindowFn.
type GlobalWindow ¶
type GlobalWindow struct{}
GlobalWindow represents the singleton, global window.
func (GlobalWindow) MaxTimestamp ¶
func (GlobalWindow) MaxTimestamp() typex.EventTime
MaxTimestamp returns the maximum timestamp in the window.
func (GlobalWindow) String ¶
func (GlobalWindow) String() string
type IntervalWindow ¶
IntervalWindow represents a half-open bounded window [start,end).
func (IntervalWindow) MaxTimestamp ¶
func (w IntervalWindow) MaxTimestamp() typex.EventTime
MaxTimestamp returns the maximum timestamp in the window.
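Because the window is half-open, its end is not itself inside the window. The sketch below shows the consequence for the maximum timestamp, assuming integer millisecond timestamps. The `interval` type and its methods are hypothetical stand-ins for illustration, not the package's own code.

```go
package main

import "fmt"

// interval is a hypothetical half-open window [start, end) in milliseconds.
type interval struct{ start, end int64 }

// contains reports whether ts lies in [start, end).
func (w interval) contains(ts int64) bool {
	return ts >= w.start && ts < w.end
}

// maxTimestamp is the largest timestamp inside the window, assuming
// millisecond resolution: one millisecond before the exclusive end.
func (w interval) maxTimestamp() int64 {
	return w.end - 1
}

func main() {
	w := interval{0, 10}
	fmt.Println(w.contains(0), w.contains(9), w.contains(10)) // true true false
	fmt.Println(w.maxTimestamp())                              // 9
}
```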
func (IntervalWindow) String ¶
func (w IntervalWindow) String() string
type TimestampTransform ¶ added in v2.34.0
type TimestampTransform interface {
// contains filtered or unexported methods
}
TimestampTransform describes how an after-processing-time trigger's time is transformed to determine when to fire an aggregation. The base timestamp is always the time when the first element of the pane is received.
A series of these transforms is applied in order to emit at regular intervals.
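Applying transforms in sequence can be sketched as below. The `transform` interface, the `delay` and `alignTo` types, and the `fireTime` function are hypothetical stand-ins modeled on the descriptions of DelayTransform and AlignToTransform. The sketch assumes integer millisecond timestamps and that aligned timestamps stay unchanged.

```go
package main

import "fmt"

// transform is a hypothetical stand-in for TimestampTransform.
type transform interface {
	apply(ts int64) int64
}

// delay adds a fixed number of milliseconds, like DelayTransform.
type delay struct{ ms int64 }

func (d delay) apply(ts int64) int64 { return ts + d.ms }

// alignTo rounds up to the next offset + k*period, like AlignToTransform.
type alignTo struct{ period, offset int64 }

func (a alignTo) apply(ts int64) int64 {
	rem := ((ts-a.offset)%a.period + a.period) % a.period
	if rem == 0 {
		return ts
	}
	return ts + a.period - rem
}

// fireTime applies the transforms in order, starting from the time the
// first element of the pane was received.
func fireTime(base int64, transforms []transform) int64 {
	for _, t := range transforms {
		base = t.apply(base)
	}
	return base
}

func main() {
	first := int64(1003) // first element of the pane arrives at 1003 ms
	fmt.Println(fireTime(first, []transform{delay{500}, alignTo{1000, 0}})) // 2000
	fmt.Println(fireTime(first, []transform{alignTo{1000, 0}, delay{500}})) // 2500
}
```

The two calls use the same transforms in a different order and produce different firing times, which is why the order of application matters.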
type Trigger ¶
type Trigger struct {
	Kind                string
	SubTriggers         []Trigger            // Repeat, OrFinally, Any, All
	TimestampTransforms []TimestampTransform // AfterProcessingTime
	ElementCount        int32                // ElementCount
	EarlyTrigger        *Trigger             // AfterEndOfWindow
	LateTrigger         *Trigger             // AfterEndOfWindow
}
Trigger describes when to emit new aggregations. Fields are exported for use by the framework, and not intended to be set by end users.
This API is experimental and subject to change.
func TriggerAfterCount ¶ added in v2.34.0
TriggerAfterCount constructs a trigger that fires after at least `count` number of elements are processed.
func TriggerAfterEndOfWindow ¶ added in v2.34.0
func TriggerAfterEndOfWindow() Trigger
TriggerAfterEndOfWindow constructs a trigger that is configurable for early firing (before the end of window) and late firing (after the end of window).
Default options are: the default trigger for EarlyFiring and no LateFiring. Override them with the EarlyFiring and LateFiring methods on this trigger.
func TriggerAfterProcessingTime ¶ added in v2.34.0
func TriggerAfterProcessingTime() Trigger
TriggerAfterProcessingTime constructs a trigger that fires relative to when input first arrives.
Must be configured with calls to PlusDelay or AlignedTo. May be configured with additional delay.
func TriggerAlways ¶ added in v2.34.0
func TriggerAlways() Trigger
TriggerAlways constructs a trigger that fires immediately whenever an element is received.
Equivalent to window.TriggerRepeat(window.TriggerAfterCount(1))
func TriggerDefault ¶ added in v2.34.0
func TriggerDefault() Trigger
TriggerDefault constructs a default trigger that fires once after the end of window. Late data is discarded.
func TriggerRepeat ¶ added in v2.34.0
TriggerRepeat constructs a trigger that fires the given trigger repeatedly, each time its condition is met.
Ex: window.TriggerRepeat(window.TriggerAfterCount(1)) is the same as window.TriggerAlways().
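The stated equivalence can be explored with a toy model of trigger firing. Everything in the sketch below (the `trigger` interface, `afterCount`, `repeat`, `firings`) is hypothetical and exists only for illustration. It assumes a count trigger fires once and then finishes, and that repeat resets its sub-trigger after each firing; in a real pipeline the runner evaluates triggers.

```go
package main

import "fmt"

// trigger is a hypothetical, simplified model of a trigger state machine.
type trigger interface {
	onElement() bool // reports whether the trigger fires on this element
	reset()
}

// afterCount fires once, after at least n elements, and then is finished.
type afterCount struct {
	n, seen int
	done    bool
}

func (t *afterCount) onElement() bool {
	if t.done {
		return false
	}
	t.seen++
	if t.seen >= t.n {
		t.done = true
		return true
	}
	return false
}

func (t *afterCount) reset() { t.seen, t.done = 0, false }

// repeat re-arms its sub-trigger every time the sub-trigger fires.
type repeat struct{ sub trigger }

func (r *repeat) onElement() bool {
	if r.sub.onElement() {
		r.sub.reset()
		return true
	}
	return false
}

func (r *repeat) reset() { r.sub.reset() }

// firings feeds the given number of elements to t and returns the 1-based
// positions of the elements on which it fired.
func firings(t trigger, elements int) []int {
	var fired []int
	for i := 1; i <= elements; i++ {
		if t.onElement() {
			fired = append(fired, i)
		}
	}
	return fired
}

func main() {
	fmt.Println(firings(&afterCount{n: 2}, 5))               // [2]
	fmt.Println(firings(&repeat{sub: &afterCount{n: 2}}, 5)) // [2 4]
	fmt.Println(firings(&repeat{sub: &afterCount{n: 1}}, 3)) // [1 2 3]
}
```

In this model, repeating a count-of-one trigger fires on every element, which mirrors the behavior described for window.TriggerAlways().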
func (Trigger) AlignedTo ¶ added in v2.34.0
AlignedTo configures an AfterProcessingTime trigger to fire at the smallest multiple of period since the offset greater than the first element timestamp.
* Period may not be smaller than a millisecond.
* Offset may be a zero time (time.Time{}).
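The rule "smallest multiple of period since the offset greater than the first element timestamp" can be sketched with the standard `time` package. `nextAligned` and `alignedFireTime` are hypothetical helpers, not part of this package. The sketch assumes millisecond resolution and a strictly-greater comparison, and it works in Unix milliseconds so that a zero-time offset does not run into the range limit of time.Duration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// nextAligned returns the smallest offset + k*period (k an integer) that is
// strictly greater than first. All arguments are in milliseconds.
func nextAligned(firstMs, periodMs, offsetMs int64) int64 {
	delta := firstMs - offsetMs
	k := delta / periodMs
	if delta%periodMs != 0 && delta < 0 {
		k-- // Go truncates toward zero; adjust to floor division.
	}
	return offsetMs + (k+1)*periodMs
}

// alignedFireTime wraps nextAligned for time.Time and time.Duration values
// and rejects periods shorter than one millisecond.
func alignedFireTime(first time.Time, period time.Duration, offset time.Time) (time.Time, error) {
	if period < time.Millisecond {
		return time.Time{}, errors.New("period must be at least one millisecond")
	}
	// Subtracting Unix milliseconds avoids first.Sub(offset), which would
	// saturate for a zero-time offset because the gap exceeds time.Duration.
	ms := nextAligned(first.UnixMilli(), period.Milliseconds(), offset.UnixMilli())
	return time.UnixMilli(ms).UTC(), nil
}

func main() {
	first := time.Date(2024, 1, 1, 12, 0, 7, 0, time.UTC)
	fire, err := alignedFireTime(first, 10*time.Second, time.Time{})
	fmt.Println(fire, err)
}
```

For a first element at 12:00:07 UTC, a 10 second period, and a zero-time offset, the sketch yields a firing time of 12:00:10 UTC.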
func (Trigger) EarlyFiring ¶ added in v2.34.0
EarlyFiring configures an AfterEndOfWindow trigger with an implicitly repeated trigger that applies before the end of the window.
func (Trigger) LateFiring ¶ added in v2.34.0
LateFiring configures an AfterEndOfWindow trigger with an implicitly repeated trigger that applies after the end of the window.
If no late firing trigger is set, late elements are discarded.
type WindowingStrategy ¶
type WindowingStrategy struct {
	Fn               *Fn
	Trigger          Trigger
	AccumulationMode AccumulationMode
	AllowedLateness  int // in milliseconds
}
WindowingStrategy defines the types of windowing used in a pipeline and contains the data to support executing a windowing strategy.
func DefaultWindowingStrategy ¶
func DefaultWindowingStrategy() *WindowingStrategy
DefaultWindowingStrategy returns the default windowing strategy.
func (*WindowingStrategy) Equals ¶
func (ws *WindowingStrategy) Equals(o *WindowingStrategy) bool
func (*WindowingStrategy) String ¶
func (ws *WindowingStrategy) String() string