v2.34.0-RC1 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2021 License: Apache-2.0, BSD-3-Clause, MIT Imports: 5 Imported by: 53



Package window contains window representation, windowing strategies and utilities.



View Source
const (
	DefaultTrigger                         string = "Trigger_Default_"
	AlwaysTrigger                          string = "Trigger_Always_"
	AfterAnyTrigger                        string = "Trigger_AfterAny_"
	AfterAllTrigger                        string = "Trigger_AfterAll_"
	AfterProcessingTimeTrigger             string = "Trigger_AfterProcessing_Time_"
	ElementCountTrigger                    string = "Trigger_ElementCount_"
	AfterEndOfWindowTrigger                string = "Trigger_AfterEndOfWindow_"
	RepeatTrigger                          string = "Trigger_Repeat_"
	OrFinallyTrigger                       string = "Trigger_OrFinally_"
	NeverTrigger                           string = "Trigger_Never_"
	AfterSynchronizedProcessingTimeTrigger string = "Trigger_AfterSynchronizedProcessingTime_"


View Source
var (
	// SingleGlobalWindow is a slice of a single global window. Convenience value.
	SingleGlobalWindow = []typex.Window{GlobalWindow{}}


func IsEqualList

func IsEqualList(from, to []typex.Window) bool

IsEqualList returns true iff the lists of windows are equal. Note that ordering matters and that this is not set equality.


type AccumulationMode

type AccumulationMode string
const (
	Unspecified  AccumulationMode = "AccumulationMode_UNSPECIFIED"
	Discarding   AccumulationMode = "AccumulationMode_DISCARDING"
	Accumulating AccumulationMode = "AccumulationMode_ACCUMULATING"
	Retracting   AccumulationMode = "AccumulationMode_RETRACTING"

type AlignToTransform added in v2.34.0

type AlignToTransform struct {
	Period, Offset int64 // in milliseconds

AlignToTransform takes the timestamp and transforms it to the lowest multiple of the period starting from the offset.

Eg. A period of 20 with an offset of 45 would have alignments at 5,25,45,65 etc. Timestamps would be transformed as follows: 0 to 5 would be transformed to 5, 6 to 25 would be transformed to 25, 26 to 45 would be transformed to 45, and so on.

type DelayTransform added in v2.34.0

type DelayTransform struct {
	Delay int64 // in milliseconds

DelayTransform takes the timestamp and adds the given delay to it.

type Fn

type Fn struct {
	Kind Kind

	Size   time.Duration // FixedWindows, SlidingWindows
	Period time.Duration // SlidingWindows
	Gap    time.Duration // Sessions

Fn defines the window fn.

func NewFixedWindows

func NewFixedWindows(interval time.Duration) *Fn

NewFixedWindows returns the fixed WindowFn with the given interval.

func NewGlobalWindows

func NewGlobalWindows() *Fn

NewGlobalWindows returns the default WindowFn, which places all elements into a single window.

func NewSessions

func NewSessions(gap time.Duration) *Fn

NewSessions returns the session WindowFn with the given gap.

func NewSlidingWindows

func NewSlidingWindows(period, duration time.Duration) *Fn

NewSlidingWindows returns the sliding WindowFn with the given period and duration.

func (*Fn) Coder

func (w *Fn) Coder() *coder.WindowCoder

Coder returns the WindowCoder for the WindowFn.

func (*Fn) Equals

func (w *Fn) Equals(o *Fn) bool

Equals returns true iff the windows have the same kind and underlying behavior. Built-in window types (such as global window) are only equal to the same instances of the window. A user-defined window that happens to match a built-in will not match on Equals().

func (*Fn) String

func (w *Fn) String() string

type GlobalWindow

type GlobalWindow struct{}

GlobalWindow represents the singleton, global window.

func (GlobalWindow) Equals

func (GlobalWindow) Equals(o typex.Window) bool

func (GlobalWindow) MaxTimestamp

func (GlobalWindow) MaxTimestamp() typex.EventTime

MaxTimestamp returns the maximum timestamp in the window.

func (GlobalWindow) String

func (GlobalWindow) String() string

type IntervalWindow

type IntervalWindow struct {
	Start, End typex.EventTime

IntervalWindow represents a half-open bounded window [start,end).

func (IntervalWindow) Equals

func (w IntervalWindow) Equals(o typex.Window) bool

func (IntervalWindow) MaxTimestamp

func (w IntervalWindow) MaxTimestamp() typex.EventTime

MaxTimestamp returns the maximum timestamp in the window.

func (IntervalWindow) String

func (w IntervalWindow) String() string

type Kind

type Kind string

Kind is the semantic type of a window fn.

const (
	GlobalWindows  Kind = "GLO"
	FixedWindows   Kind = "FIX"
	SlidingWindows Kind = "SLI"
	Sessions       Kind = "SES"

type TimestampTransform added in v2.34.0

type TimestampTransform interface {
	// contains filtered or unexported methods

TimestampTransform describes how an after processing time trigger time is transformed to determine when to fire an aggregation.const The base timestamp is always the when the first element of the pane is received.

A series of these transforms will be applied in order emit at regular intervals.

type Trigger

type Trigger struct {
	Kind                string
	SubTriggers         []Trigger            // Repeat, OrFinally, Any, All
	TimestampTransforms []TimestampTransform // AfterProcessingTime
	ElementCount        int32                // ElementCount
	EarlyTrigger        *Trigger             // AfterEndOfWindow
	LateTrigger         *Trigger             // AfterEndOfWindow

Trigger describes when to emit new aggregations. Fields are exported for use by the framework, and not intended to be set by end users.

This API is experimental and subject to change.

func TriggerAfterCount added in v2.34.0

func TriggerAfterCount(count int32) Trigger

TriggerAfterCount constructs a trigger that fires after at least `count` number of elements are processed.

func TriggerAfterEndOfWindow added in v2.34.0

func TriggerAfterEndOfWindow() Trigger

TriggerAfterEndOfWindow constructs a trigger that is configurable for early firing (before the end of window) and late firing (after the end of window).

Default Options are: Default Trigger for EarlyFiring and No LateFiring. Override it with EarlyFiring and LateFiring methods on this trigger.

func TriggerAfterProcessingTime added in v2.34.0

func TriggerAfterProcessingTime() Trigger

TriggerAfterProcessingTime constructs a trigger that fires relative to when input first arrives.

Must be configured with calls to PlusDelay, or AlignedTo. May be configured with additional delay.

func TriggerAlways added in v2.34.0

func TriggerAlways() Trigger

TriggerAlways constructs a trigger that fires immediately whenever an element is received.

Equivalent to window.TriggerRepeat(window.TriggerAfterCount(1))

func TriggerDefault added in v2.34.0

func TriggerDefault() Trigger

TriggerDefault constructs a default trigger that fires once after the end of window. Late Data is discarded.

func TriggerRepeat added in v2.34.0

func TriggerRepeat(tr Trigger) Trigger

TriggerRepeat constructs a trigger that fires a trigger repeatedly once the condition has been met.

Ex: window.TriggerRepeat(window.TriggerAfterCount(1)) is same as window.TriggerAlways().

func (Trigger) AlignedTo added in v2.34.0

func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger

AlignedTo configures an AfterProcessingTime trigger to fire at the smallest multiple of period since the offset greater than the first element timestamp.

* Period may not be smaller than a millisecond. * Offset may be a zero time (time.Time{}).

func (Trigger) EarlyFiring added in v2.34.0

func (tr Trigger) EarlyFiring(early Trigger) Trigger

EarlyFiring configures an AfterEndOfWindow trigger with an implicitly repeated trigger that applies before the end of the window.

func (Trigger) LateFiring added in v2.34.0

func (tr Trigger) LateFiring(late Trigger) Trigger

LateFiring configures an AfterEndOfWindow trigger with an implicitly repeated trigger that applies after the end of the window.

Not setting a late firing trigger means elements are discarded.

func (Trigger) PlusDelay added in v2.34.0

func (tr Trigger) PlusDelay(delay time.Duration) Trigger

PlusDelay configures an AfterProcessingTime trigger to fire after a specified delay, no smaller than a millisecond.

type WindowingStrategy

type WindowingStrategy struct {
	Fn               *Fn
	Trigger          Trigger
	AccumulationMode AccumulationMode
	AllowedLateness  int // in milliseconds

WindowingStrategy defines the types of windowing used in a pipeline and contains the data to support executing a windowing strategy.

func DefaultWindowingStrategy

func DefaultWindowingStrategy() *WindowingStrategy

DefaultWindowingStrategy returns the default windowing strategy.

func (*WindowingStrategy) Equals

func (ws *WindowingStrategy) Equals(o *WindowingStrategy) bool

func (*WindowingStrategy) String

func (ws *WindowingStrategy) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL