Documentation
¶
Index ¶
- Constants
- Variables
- type Buffer
- func NewConsumableAsyncBuffer[T any](policy Policy[T]) Buffer[T]
- func NewLimitedConsumableAsyncBuffer[T any](policy Policy[T], limit int) Buffer[T]
- func NewLimitedSimpleAsyncBuffer[T any](limit int) Buffer[T]
- func NewSimpleAsyncBuffer[T any]() Buffer[T]
- func NewSortedSimpleAsyncBuffer[T any](limit int) Buffer[T]
- type BufferReader
- type ConsumableAsyncBuffer
- func (s *ConsumableAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
- func (s *ConsumableAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
- func (s ConsumableAsyncBuffer) Dump() []Event[T]
- func (s ConsumableAsyncBuffer) Get(i int) Event[T]
- func (s *ConsumableAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
- func (s ConsumableAsyncBuffer) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)
- func (s ConsumableAsyncBuffer) Len() int
- func (s ConsumableAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)
- func (s ConsumableAsyncBuffer) StartBlocking()
- func (s ConsumableAsyncBuffer) StopBlocking()
- type CountingWindowPolicy
- func (s *CountingWindowPolicy[T]) Description() PolicyDescription
- func (s *CountingWindowPolicy[T]) NextSelection() EventSelection
- func (s *CountingWindowPolicy[T]) NextSelectionReady() bool
- func (s *CountingWindowPolicy[T]) Offset(offset int)
- func (s *CountingWindowPolicy[T]) SetBuffer(buffer BufferReader[T])
- func (s *CountingWindowPolicy[T]) Shift()
- func (s *CountingWindowPolicy[T]) UpdateSelection()
- type DuoPolicy
- type DuoTemporalWindowPolicy
- func (s *DuoTemporalWindowPolicy[TLeft, TRight]) AddCallback(callback func([]Event[TLeft], []Event[TRight]))
- func (s *DuoTemporalWindowPolicy[TLeft, TRight]) Description() PolicyDescription
- func (s *DuoTemporalWindowPolicy[TLeft, TRight]) NextSelection() (EventSelection, EventSelection)
- func (s *DuoTemporalWindowPolicy[TLeft, TRight]) NextSelectionReady() bool
- func (s *DuoTemporalWindowPolicy[TLeft, TRight]) Offset(leftOffset, rightOffset int)
- func (s *DuoTemporalWindowPolicy[TLeft, TRight]) SetBuffers(left BufferReader[TLeft], right BufferReader[TRight])
- func (s *DuoTemporalWindowPolicy[TLeft, TRight]) Shift()
- func (s *DuoTemporalWindowPolicy[TLeft, TRight]) UpdateSelection()
- type Event
- func Arr[TContent any](events ...Event[TContent]) []Event[TContent]
- func NewEvent[TContent any](content TContent) Event[TContent]
- func NewEventFromJSON(b []byte) (Event[map[string]interface{}], error)
- func NewEventFromOthers[TContent any](content TContent, others ...TimeStamp) Event[TContent]
- func NewEventFromOthersM[TContent any](content TContent, meta StampMeta, others ...TimeStamp) Event[TContent]
- func NewEventM[TContent any](content TContent, meta StampMeta) Event[TContent]
- func NewNumericEvent[T NumericConstraint](content T) Event[T]
- type EventBuffer
- type EventChannel
- type EventSelection
- type Iterator
- type LimitedConsumableAsyncBuffer
- func (s *LimitedConsumableAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
- func (s *LimitedConsumableAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
- func (s LimitedConsumableAsyncBuffer) Dump() []Event[T]
- func (s LimitedConsumableAsyncBuffer) Get(i int) Event[T]
- func (s *LimitedConsumableAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
- func (s *LimitedConsumableAsyncBuffer[T]) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)
- func (s LimitedConsumableAsyncBuffer) Len() int
- func (s LimitedConsumableAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)
- func (s LimitedConsumableAsyncBuffer) StartBlocking()
- func (s LimitedConsumableAsyncBuffer) StopBlocking()
- type LimitedSimpleAsyncBuffer
- func (s *LimitedSimpleAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
- func (s *LimitedSimpleAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
- func (s LimitedSimpleAsyncBuffer) Dump() []Event[T]
- func (s LimitedSimpleAsyncBuffer) Get(i int) Event[T]
- func (s *LimitedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
- func (s *LimitedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)
- func (s LimitedSimpleAsyncBuffer) Len() int
- func (s LimitedSimpleAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)
- func (s LimitedSimpleAsyncBuffer) StartBlocking()
- func (s LimitedSimpleAsyncBuffer) StopBlocking()
- type MultiEventSelection
- type MultiPolicy
- type MultiTemporalWindowPolicy
- func (s *MultiTemporalWindowPolicy[T]) AddCallback(callback func(map[int][]Event[T]))
- func (s *MultiTemporalWindowPolicy[T]) Description() PolicyDescription
- func (s *MultiTemporalWindowPolicy[T]) NextSelection() MultiEventSelection
- func (s *MultiTemporalWindowPolicy[T]) NextSelectionReady() bool
- func (s *MultiTemporalWindowPolicy[T]) Offset(bufferIndex int, offset int)
- func (s *MultiTemporalWindowPolicy[T]) SetBuffers(buffers map[int]BufferReader[T])
- func (s *MultiTemporalWindowPolicy[T]) Shift()
- func (s *MultiTemporalWindowPolicy[T]) UpdateSelection()
- type NumericConstraint
- type NumericEvent
- type Policy
- func NewCountingWindowPolicy[T any](n int, shift int) Policy[T]
- func NewPolicyFromDescription[T any](desc PolicyDescription) (Policy[T], error)
- func NewSelectNextPolicy[T any]() Policy[T]
- func NewTemporalWindowPolicy[T any](startingTime time.Time, windowLength time.Duration, windowShift time.Duration) Policy[T]
- type PolicyDescription
- type PolicyID
- type SimpleAsyncBuffer
- func (s *SimpleAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
- func (s *SimpleAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
- func (s SimpleAsyncBuffer) Dump() []Event[T]
- func (s SimpleAsyncBuffer) Get(i int) Event[T]
- func (s *SimpleAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
- func (s SimpleAsyncBuffer) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)
- func (s SimpleAsyncBuffer) Len() int
- func (s SimpleAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)
- func (s SimpleAsyncBuffer) StartBlocking()
- func (s SimpleAsyncBuffer) StopBlocking()
- type SortedSimpleAsyncBuffer
- func (s *SortedSimpleAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
- func (s *SortedSimpleAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
- func (s SortedSimpleAsyncBuffer) Dump() []Event[T]
- func (s SortedSimpleAsyncBuffer) Get(i int) Event[T]
- func (s *SortedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
- func (s *SortedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)
- func (s SortedSimpleAsyncBuffer) Len() int
- func (s SortedSimpleAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)
- func (s SortedSimpleAsyncBuffer) StartBlocking()
- func (s SortedSimpleAsyncBuffer) StopBlocking()
- type StampMeta
- type TemporalEvent
- type TemporalWindowPolicy
- func (s *TemporalWindowPolicy[T]) Description() PolicyDescription
- func (s *TemporalWindowPolicy[T]) NextSelection() EventSelection
- func (s *TemporalWindowPolicy[T]) NextSelectionReady() bool
- func (s *TemporalWindowPolicy[T]) Offset(offset int)
- func (s *TemporalWindowPolicy[T]) SetBuffer(buffer BufferReader[T])
- func (s *TemporalWindowPolicy[T]) Shift()
- func (s *TemporalWindowPolicy[T]) UpdateSelection()
- type TimeStamp
Constants ¶
const ( CountingWindow = "counting" TemporalWindow = "temporal" SelectNext = "selectNext" )
Variables ¶
var ( ErrBufferStopped = errors.New("buffer: is stopped") ErrLimitExceeded = errors.New("buffer: limit exceeded") )
Functions ¶
This section is empty.
Types ¶
type Buffer ¶ added in v0.3.0
type Buffer[T any] interface { GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error) PeekNextEvent(ctx context.Context) (Event[T], error) AddEvent(ctx context.Context, event Event[T]) error AddEvents(ctx context.Context, events []Event[T]) error Len() int Get(x int) Event[T] Dump() []Event[T] StopBlocking() StartBlocking() GetAndRemoveNextEvent(ctx context.Context) (Event[T], error) }
Buffer is the interface for interacting with any buffer.
func NewConsumableAsyncBuffer ¶ added in v0.3.0
NewConsumableAsyncBuffer creates a new buffer that consumes events based on a selection policy.
func NewLimitedConsumableAsyncBuffer ¶ added in v0.3.0
NewLimitedConsumableAsyncBuffer creates a new buffer that consumes events based on a selection policy with a maximum event limit.
func NewLimitedSimpleAsyncBuffer ¶ added in v0.3.0
NewLimitedSimpleAsyncBuffer creates a new asynchronous buffer with a maximum event limit.
func NewSimpleAsyncBuffer ¶ added in v0.3.0
NewSimpleAsyncBuffer creates a new unbounded asynchronous buffer.
func NewSortedSimpleAsyncBuffer ¶ added in v0.3.0
NewSortedSimpleAsyncBuffer creates a new asynchronous buffer that sorts events by timestamp on insertion. A limit <= 0 means the buffer is unbounded.
type BufferReader ¶ added in v0.3.0
BufferReader is the interface that provides read-only access to an underlying event buffer.
type ConsumableAsyncBuffer ¶ added in v0.3.0
type ConsumableAsyncBuffer[T any] struct { // contains filtered or unexported fields }
ConsumableAsyncBuffer synchronizes exactly one reader with n writers. The read operations PeekNextEvent and GetAndRemoveNextEvent either return the next event, if one is available in the buffer, or wait until the next event becomes available based on a selection policy. See Policy[T].
func (*ConsumableAsyncBuffer[T]) AddEvent ¶ added in v0.3.0
func (s *ConsumableAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
AddEvent adds a single event to the buffer and updates the selection policy.
func (*ConsumableAsyncBuffer[T]) AddEvents ¶ added in v0.3.0
func (s *ConsumableAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
AddEvents adds multiple events to the buffer and updates the selection policy.
func (ConsumableAsyncBuffer) Dump ¶ added in v0.3.0
func (s ConsumableAsyncBuffer) Dump() []Event[T]
func (*ConsumableAsyncBuffer[T]) GetAndConsumeNextEvents ¶ added in v0.3.0
func (s *ConsumableAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
GetAndConsumeNextEvents returns the next buffered events and removes them from the buffer. Blocks until at least one event is buffered. When stopped, returns nil.
func (ConsumableAsyncBuffer) GetAndRemoveNextEvent ¶ added in v0.3.0
func (ConsumableAsyncBuffer) PeekNextEvent ¶ added in v0.3.0
PeekNextEvent returns the next buffered event without removing it from the buffer. Blocks until at least one event is buffered. When stopped, returns nil.
func (ConsumableAsyncBuffer) StartBlocking ¶ added in v0.3.0
func (s ConsumableAsyncBuffer) StartBlocking()
func (ConsumableAsyncBuffer) StopBlocking ¶ added in v0.3.0
func (s ConsumableAsyncBuffer) StopBlocking()
type CountingWindowPolicy ¶ added in v0.3.0
CountingWindowPolicy selects a fixed number of events (n) with a sliding window (shift).
func (*CountingWindowPolicy[T]) Description ¶ added in v0.3.0
func (s *CountingWindowPolicy[T]) Description() PolicyDescription
func (*CountingWindowPolicy[T]) NextSelection ¶ added in v0.3.0
func (s *CountingWindowPolicy[T]) NextSelection() EventSelection
func (*CountingWindowPolicy[T]) NextSelectionReady ¶ added in v0.3.0
func (s *CountingWindowPolicy[T]) NextSelectionReady() bool
func (*CountingWindowPolicy[T]) Offset ¶ added in v0.3.0
func (s *CountingWindowPolicy[T]) Offset(offset int)
func (*CountingWindowPolicy[T]) SetBuffer ¶ added in v0.3.0
func (s *CountingWindowPolicy[T]) SetBuffer(buffer BufferReader[T])
func (*CountingWindowPolicy[T]) UpdateSelection ¶ added in v0.3.0
func (s *CountingWindowPolicy[T]) UpdateSelection()
type DuoPolicy ¶ added in v0.3.0
type DuoPolicy[TLeft, TRight any] interface { NextSelectionReady() bool NextSelection() (EventSelection, EventSelection) UpdateSelection() Shift() Offset(leftOffset, rightOffset int) ID() PolicyID SetBuffers(left BufferReader[TLeft], right BufferReader[TRight]) Description() PolicyDescription AddCallback(callback func([]Event[TLeft], []Event[TRight])) }
DuoPolicy defines how events are selected from two buffers of different types
type DuoTemporalWindowPolicy ¶ added in v0.3.0
type DuoTemporalWindowPolicy[TLeft, TRight any] struct { PolicyID // contains filtered or unexported fields }
DuoTemporalWindowPolicy selects events based on a time window across two buffers.
func (*DuoTemporalWindowPolicy[TLeft, TRight]) AddCallback ¶ added in v0.3.0
func (s *DuoTemporalWindowPolicy[TLeft, TRight]) AddCallback(callback func([]Event[TLeft], []Event[TRight]))
func (*DuoTemporalWindowPolicy[TLeft, TRight]) Description ¶ added in v0.3.0
func (s *DuoTemporalWindowPolicy[TLeft, TRight]) Description() PolicyDescription
func (*DuoTemporalWindowPolicy[TLeft, TRight]) NextSelection ¶ added in v0.3.0
func (s *DuoTemporalWindowPolicy[TLeft, TRight]) NextSelection() (EventSelection, EventSelection)
func (*DuoTemporalWindowPolicy[TLeft, TRight]) NextSelectionReady ¶ added in v0.3.0
func (s *DuoTemporalWindowPolicy[TLeft, TRight]) NextSelectionReady() bool
func (*DuoTemporalWindowPolicy[TLeft, TRight]) Offset ¶ added in v0.3.0
func (s *DuoTemporalWindowPolicy[TLeft, TRight]) Offset(leftOffset, rightOffset int)
func (*DuoTemporalWindowPolicy[TLeft, TRight]) SetBuffers ¶ added in v0.3.0
func (s *DuoTemporalWindowPolicy[TLeft, TRight]) SetBuffers(left BufferReader[TLeft], right BufferReader[TRight])
func (*DuoTemporalWindowPolicy[TLeft, TRight]) Shift ¶ added in v0.3.0
func (s *DuoTemporalWindowPolicy[TLeft, TRight]) Shift()
func (*DuoTemporalWindowPolicy[TLeft, TRight]) UpdateSelection ¶ added in v0.3.0
func (s *DuoTemporalWindowPolicy[TLeft, TRight]) UpdateSelection()
type Event ¶
Event is the interface for arbitrary events with content of any type T.
func NewEventFromOthers ¶
func NewEventFromOthersM ¶
func NewNumericEvent ¶
func NewNumericEvent[T NumericConstraint](content T) Event[T]
type EventBuffer ¶ added in v0.3.0
type EventBuffer[T any] struct { // contains filtered or unexported fields }
EventBuffer is a simple, non-concurrent, in-memory buffer for events. It is not safe for concurrent use without external locking. It implements BufferReader.
func NewEventBuffer ¶ added in v0.3.0
func NewEventBuffer[T any]() *EventBuffer[T]
NewEventBuffer creates a new, empty EventBuffer.
func (*EventBuffer[T]) Add ¶ added in v0.3.0
func (b *EventBuffer[T]) Add(e Event[T])
Add appends an event to the buffer.
func (*EventBuffer[T]) Get ¶ added in v0.3.0
func (b *EventBuffer[T]) Get(i int) Event[T]
Get returns the event at the given index.
func (*EventBuffer[T]) Len ¶ added in v0.3.0
func (b *EventBuffer[T]) Len() int
Len returns the number of events in the buffer.
func (*EventBuffer[T]) Remove ¶ added in v0.3.0
func (b *EventBuffer[T]) Remove(n int)
Remove removes the first n events from the buffer.
type EventChannel ¶
type EventSelection ¶ added in v0.3.0
EventSelection represents a range of events within a buffer slice.
func (EventSelection) IsValid ¶ added in v0.3.0
func (e EventSelection) IsValid() bool
IsValid returns true if Start and End actually represent a possible selection in a buffer
type Iterator ¶ added in v0.3.0
type Iterator[T any] struct { // contains filtered or unexported fields }
func NewIterator ¶ added in v0.3.0
type LimitedConsumableAsyncBuffer ¶ added in v0.3.0
type LimitedConsumableAsyncBuffer[T any] struct { *ConsumableAsyncBuffer[T] // contains filtered or unexported fields }
LimitedConsumableAsyncBuffer is a buffer with a fixed capacity limit that consumes events based on a policy.
func (*LimitedConsumableAsyncBuffer[T]) AddEvent ¶ added in v0.3.0
func (s *LimitedConsumableAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
AddEvent adds a single event to the buffer, ensuring the limit is not exceeded.
func (*LimitedConsumableAsyncBuffer[T]) AddEvents ¶ added in v0.3.0
func (s *LimitedConsumableAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
AddEvents adds multiple events to the buffer, ensuring the limit is not exceeded.
func (LimitedConsumableAsyncBuffer) Dump ¶ added in v0.3.0
func (s LimitedConsumableAsyncBuffer) Dump() []Event[T]
func (*LimitedConsumableAsyncBuffer[T]) GetAndConsumeNextEvents ¶ added in v0.3.0
func (s *LimitedConsumableAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
GetAndConsumeNextEvents returns the Next event from the buffer and removes it.
func (*LimitedConsumableAsyncBuffer[T]) GetAndRemoveNextEvent ¶ added in v0.3.0
func (s *LimitedConsumableAsyncBuffer[T]) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)
func (LimitedConsumableAsyncBuffer) Len ¶ added in v0.3.0
func (s LimitedConsumableAsyncBuffer) Len() int
func (LimitedConsumableAsyncBuffer) PeekNextEvent ¶ added in v0.3.0
PeekNextEvent returns the next buffered event without removing it from the buffer. Blocks until at least one event is buffered. When stopped, returns nil.
func (LimitedConsumableAsyncBuffer) StartBlocking ¶ added in v0.3.0
func (s LimitedConsumableAsyncBuffer) StartBlocking()
func (LimitedConsumableAsyncBuffer) StopBlocking ¶ added in v0.3.0
func (s LimitedConsumableAsyncBuffer) StopBlocking()
type LimitedSimpleAsyncBuffer ¶ added in v0.3.0
type LimitedSimpleAsyncBuffer[T any] struct { *SimpleAsyncBuffer[T] // contains filtered or unexported fields }
LimitedSimpleAsyncBuffer is a buffer with a fixed capacity limit.
func (*LimitedSimpleAsyncBuffer[T]) AddEvent ¶ added in v0.3.0
func (s *LimitedSimpleAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
AddEvent adds a single event to the buffer, ensuring the limit is not exceeded.
func (*LimitedSimpleAsyncBuffer[T]) AddEvents ¶ added in v0.3.0
func (s *LimitedSimpleAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
AddEvents adds multiple events to the buffer, ensuring the limit is not exceeded.
func (LimitedSimpleAsyncBuffer) Dump ¶ added in v0.3.0
func (s LimitedSimpleAsyncBuffer) Dump() []Event[T]
func (*LimitedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents ¶ added in v0.3.0
func (s *LimitedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
GetAndConsumeNextEvents returns the Next event from the buffer and removes it.
func (*LimitedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent ¶ added in v0.3.0
func (s *LimitedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)
GetAndRemoveNextEvent returns the Next event from the buffer and removes it.
func (LimitedSimpleAsyncBuffer) PeekNextEvent ¶ added in v0.3.0
PeekNextEvent returns the next buffered event without removing it from the buffer. Blocks until at least one event is buffered. When stopped, returns nil.
func (LimitedSimpleAsyncBuffer) StartBlocking ¶ added in v0.3.0
func (s LimitedSimpleAsyncBuffer) StartBlocking()
func (LimitedSimpleAsyncBuffer) StopBlocking ¶ added in v0.3.0
func (s LimitedSimpleAsyncBuffer) StopBlocking()
type MultiEventSelection ¶ added in v0.3.0
type MultiEventSelection map[int]EventSelection
MultiEventSelection represents a map of buffer indices to their selected ranges.
type MultiPolicy ¶ added in v0.3.0
type MultiPolicy[T any] interface { NextSelectionReady() bool NextSelection() MultiEventSelection UpdateSelection() Shift() Offset(bufferIndex int, offset int) ID() PolicyID SetBuffers(readers map[int]BufferReader[T]) Description() PolicyDescription AddCallback(callback func(map[int][]Event[T])) }
MultiPolicy defines how events are selected from multiple buffers
func NewMultiTemporalWindowPolicy ¶ added in v0.3.0
func NewMultiTemporalWindowPolicy[T any](startingTime time.Time, windowLength time.Duration, windowShift time.Duration) MultiPolicy[T]
NewMultiTemporalWindowPolicy creates a new MultiTemporalWindowPolicy
type MultiTemporalWindowPolicy ¶ added in v0.3.0
MultiTemporalWindowPolicy selects events based on a time window across multiple buffers.
func (*MultiTemporalWindowPolicy[T]) AddCallback ¶ added in v0.3.0
func (s *MultiTemporalWindowPolicy[T]) AddCallback(callback func(map[int][]Event[T]))
func (*MultiTemporalWindowPolicy[T]) Description ¶ added in v0.3.0
func (s *MultiTemporalWindowPolicy[T]) Description() PolicyDescription
func (*MultiTemporalWindowPolicy[T]) NextSelection ¶ added in v0.3.0
func (s *MultiTemporalWindowPolicy[T]) NextSelection() MultiEventSelection
func (*MultiTemporalWindowPolicy[T]) NextSelectionReady ¶ added in v0.3.0
func (s *MultiTemporalWindowPolicy[T]) NextSelectionReady() bool
func (*MultiTemporalWindowPolicy[T]) Offset ¶ added in v0.3.0
func (s *MultiTemporalWindowPolicy[T]) Offset(bufferIndex int, offset int)
func (*MultiTemporalWindowPolicy[T]) SetBuffers ¶ added in v0.3.0
func (s *MultiTemporalWindowPolicy[T]) SetBuffers(buffers map[int]BufferReader[T])
func (*MultiTemporalWindowPolicy[T]) Shift ¶ added in v0.3.0
func (s *MultiTemporalWindowPolicy[T]) Shift()
func (*MultiTemporalWindowPolicy[T]) UpdateSelection ¶ added in v0.3.0
func (s *MultiTemporalWindowPolicy[T]) UpdateSelection()
type NumericConstraint ¶
type NumericConstraint interface {
~int | ~int8 | ~int16 | ~int32 | ~int64 | ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~float32 | ~float64
}
NumericConstraint is a constraint that limits the type parameter to numeric types.
type NumericEvent ¶
type NumericEvent[T NumericConstraint] struct { TemporalEvent[T] }
NumericEvent restricts the content to numeric data types
type Policy ¶ added in v0.3.0
type Policy[T any] interface { NextSelectionReady() bool NextSelection() EventSelection UpdateSelection() Shift() Offset(offset int) ID() PolicyID SetBuffer(reader BufferReader[T]) Description() PolicyDescription }
Policy defines how events are selected from a buffer
func NewCountingWindowPolicy ¶ added in v0.3.0
NewCountingWindowPolicy creates a new CountingWindowPolicy
func NewPolicyFromDescription ¶ added in v0.3.0
func NewPolicyFromDescription[T any](desc PolicyDescription) (Policy[T], error)
NewPolicyFromDescription creates a new Policy from a PolicyDescription.
func NewSelectNextPolicy ¶ added in v0.3.0
NewSelectNextPolicy creates a new SelectNextPolicy
type PolicyDescription ¶ added in v0.3.0
type PolicyDescription struct {
Active bool `json:"active" yaml:"active"`
Type string `json:"type" yaml:"type"`
// For CountingWindowPolicy
Size int `json:"size,omitempty" yaml:"size,omitempty"`
Slide int `json:"slide,omitempty" yaml:"slide,omitempty"`
// For TemporalWindowPolicy
WindowStart time.Time `json:"windowStart,omitempty" yaml:"windowStart,omitempty"`
WindowLength time.Duration `json:"windowLength,omitempty" yaml:"windowLength,omitempty"`
WindowShift time.Duration `json:"windowShift,omitempty" yaml:"windowShift,omitempty"`
}
PolicyDescription is a serializable representation of a selection policy.
func MakePolicy ¶ added in v0.3.0
func PolicyDescriptionFromJSON ¶ added in v0.3.0
func PolicyDescriptionFromJSON(b []byte) (PolicyDescription, error)
PolicyDescriptionFromJSON parses a PolicyDescription from a JSON byte slice.
func PolicyDescriptionFromYML ¶ added in v0.3.0
func PolicyDescriptionFromYML(b []byte) (PolicyDescription, error)
PolicyDescriptionFromYML parses a PolicyDescription from a YAML byte slice.
func (PolicyDescription) ToJSON ¶ added in v0.3.0
func (d PolicyDescription) ToJSON() ([]byte, error)
ToJSON converts a PolicyDescription to its JSON representation.
func (PolicyDescription) ToYML ¶ added in v0.3.0
func (d PolicyDescription) ToYML() ([]byte, error)
ToYML converts a PolicyDescription to its YAML representation.
type SimpleAsyncBuffer ¶ added in v0.3.0
type SimpleAsyncBuffer[T any] struct { // contains filtered or unexported fields }
SimpleAsyncBuffer synchronizes exactly one reader with n writers. The read operations PeekNextEvent and GetAndRemoveNextEvent either return the next event, if one is available in the buffer, or wait until the next event becomes available.
func (*SimpleAsyncBuffer[T]) AddEvent ¶ added in v0.3.0
func (s *SimpleAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
AddEvent adds a single event to the buffer.
func (*SimpleAsyncBuffer[T]) AddEvents ¶ added in v0.3.0
func (s *SimpleAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
AddEvents adds multiple events to the buffer.
func (*SimpleAsyncBuffer[T]) GetAndConsumeNextEvents ¶ added in v0.3.0
func (s *SimpleAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
GetAndConsumeNextEvents returns the Next event from the buffer and removes it.
func (SimpleAsyncBuffer) GetAndRemoveNextEvent ¶ added in v0.3.0
func (SimpleAsyncBuffer) PeekNextEvent ¶ added in v0.3.0
PeekNextEvent returns the next buffered event without removing it from the buffer. Blocks until at least one event is buffered. When stopped, returns nil.
func (SimpleAsyncBuffer) StartBlocking ¶ added in v0.3.0
func (s SimpleAsyncBuffer) StartBlocking()
func (SimpleAsyncBuffer) StopBlocking ¶ added in v0.3.0
func (s SimpleAsyncBuffer) StopBlocking()
type SortedSimpleAsyncBuffer ¶ added in v0.3.0
type SortedSimpleAsyncBuffer[T any] struct { *SimpleAsyncBuffer[T] // contains filtered or unexported fields }
SortedSimpleAsyncBuffer is a buffer that ensures events are strictly ordered by their StartTime.
func (*SortedSimpleAsyncBuffer[T]) AddEvent ¶ added in v0.3.0
func (s *SortedSimpleAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error
AddEvent adds a single event to the buffer and sorts it into position.
func (*SortedSimpleAsyncBuffer[T]) AddEvents ¶ added in v0.3.0
func (s *SortedSimpleAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error
AddEvents adds multiple events to the buffer and sorts them by StartTime.
func (SortedSimpleAsyncBuffer) Dump ¶ added in v0.3.0
func (s SortedSimpleAsyncBuffer) Dump() []Event[T]
func (*SortedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents ¶ added in v0.3.0
func (s *SortedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
GetAndConsumeNextEvents returns the Next event from the buffer and removes it.
func (*SortedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent ¶ added in v0.3.0
func (s *SortedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)
GetAndRemoveNextEvent returns the Next event from the buffer and removes it.
func (SortedSimpleAsyncBuffer) PeekNextEvent ¶ added in v0.3.0
PeekNextEvent returns the next buffered event without removing it from the buffer. Blocks until at least one event is buffered. When stopped, returns nil.
func (SortedSimpleAsyncBuffer) StartBlocking ¶ added in v0.3.0
func (s SortedSimpleAsyncBuffer) StartBlocking()
func (SortedSimpleAsyncBuffer) StopBlocking ¶ added in v0.3.0
func (s SortedSimpleAsyncBuffer) StopBlocking()
type TemporalEvent ¶
TemporalEvent is an event with a TimeStamp, which allows recording the start and end time of an event.
func (*TemporalEvent[TContent]) GetContent ¶
func (e *TemporalEvent[TContent]) GetContent() TContent
type TemporalWindowPolicy ¶ added in v0.3.0
TemporalWindowPolicy selects events based on a time window.
func (*TemporalWindowPolicy[T]) Description ¶ added in v0.3.0
func (s *TemporalWindowPolicy[T]) Description() PolicyDescription
func (*TemporalWindowPolicy[T]) NextSelection ¶ added in v0.3.0
func (s *TemporalWindowPolicy[T]) NextSelection() EventSelection
NextSelection returns the EventSelection for the current window
func (*TemporalWindowPolicy[T]) NextSelectionReady ¶ added in v0.3.0
func (s *TemporalWindowPolicy[T]) NextSelectionReady() bool
NextSelectionReady checks if there are no more events within the window
func (*TemporalWindowPolicy[T]) Offset ¶ added in v0.3.0
func (s *TemporalWindowPolicy[T]) Offset(offset int)
func (*TemporalWindowPolicy[T]) SetBuffer ¶ added in v0.3.0
func (s *TemporalWindowPolicy[T]) SetBuffer(buffer BufferReader[T])
func (*TemporalWindowPolicy[T]) Shift ¶ added in v0.3.0
func (s *TemporalWindowPolicy[T]) Shift()
Shift is not relevant for time-based windows and is left empty.
func (*TemporalWindowPolicy[T]) UpdateSelection ¶ added in v0.3.0
func (s *TemporalWindowPolicy[T]) UpdateSelection()
UpdateSelection updates the window based on the new event's timestamp