events

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CountingWindow = "counting"
	TemporalWindow = "temporal"
	SelectNext     = "selectNext"
)

Variables

View Source
var (
	ErrBufferStopped = errors.New("buffer: is stopped")
	ErrLimitExceeded = errors.New("buffer: limit exceeded")
)

Functions

This section is empty.

Types

type Buffer added in v0.3.0

type Buffer[T any] interface {
	GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)
	PeekNextEvent(ctx context.Context) (Event[T], error)
	AddEvent(ctx context.Context, event Event[T]) error
	AddEvents(ctx context.Context, events []Event[T]) error
	Len() int
	Get(x int) Event[T]
	Dump() []Event[T]
	StopBlocking()
	StartBlocking()
	GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)
}

Buffer interface to interact with any buffer

func NewConsumableAsyncBuffer added in v0.3.0

func NewConsumableAsyncBuffer[T any](policy Policy[T]) Buffer[T]

NewConsumableAsyncBuffer creates a new buffer that consumes events based on a selection policy.

func NewLimitedConsumableAsyncBuffer added in v0.3.0

func NewLimitedConsumableAsyncBuffer[T any](policy Policy[T], limit int) Buffer[T]

NewLimitedConsumableAsyncBuffer creates a new buffer that consumes events based on a selection policy with a maximum event limit.

func NewLimitedSimpleAsyncBuffer added in v0.3.0

func NewLimitedSimpleAsyncBuffer[T any](limit int) Buffer[T]

NewLimitedSimpleAsyncBuffer creates a new asynchronous buffer with a maximum event limit.

func NewSimpleAsyncBuffer added in v0.3.0

func NewSimpleAsyncBuffer[T any]() Buffer[T]

NewSimpleAsyncBuffer creates a new unbounded asynchronous buffer.

func NewSortedSimpleAsyncBuffer added in v0.3.0

func NewSortedSimpleAsyncBuffer[T any](limit int) Buffer[T]

NewSortedSimpleAsyncBuffer creates a new asynchronous buffer that sorts events by timestamp on insertion. A limit <= 0 means the buffer is unbounded.

type BufferReader added in v0.3.0

type BufferReader[T any] interface {
	Get(i int) Event[T]
	Len() int
}

BufferReader allows read-only access to an underlying event buffer that implements BufferReader

type ConsumableAsyncBuffer added in v0.3.0

type ConsumableAsyncBuffer[T any] struct {
	// contains filtered or unexported fields
}

ConsumableAsyncBuffer allows to sync exactly one reader and n writer. The Read operations PeekNextEvent and RemoveNextEvent either return the Next event, if any is available in the buffer or wait until Next event is available based on a selection policy. see selection.Policy[T]

func (*ConsumableAsyncBuffer[T]) AddEvent added in v0.3.0

func (s *ConsumableAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error

AddEvent adds a single event to the buffer and updates the selection policy.

func (*ConsumableAsyncBuffer[T]) AddEvents added in v0.3.0

func (s *ConsumableAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error

AddEvents adds multiple events to the buffer and updates the selection policy.

func (ConsumableAsyncBuffer) Dump added in v0.3.0

func (s ConsumableAsyncBuffer) Dump() []Event[T]

func (ConsumableAsyncBuffer) Get added in v0.3.0

func (s ConsumableAsyncBuffer) Get(i int) Event[T]

func (*ConsumableAsyncBuffer[T]) GetAndConsumeNextEvents added in v0.3.0

func (s *ConsumableAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)

GetAndConsumeNextEvents returns the Next buffered events and removes this event from the buffer. Blocks until at least one event buffered. When stopped, returns nil.

func (ConsumableAsyncBuffer) GetAndRemoveNextEvent added in v0.3.0

func (s ConsumableAsyncBuffer) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)

func (ConsumableAsyncBuffer) Len added in v0.3.0

func (s ConsumableAsyncBuffer) Len() int

func (ConsumableAsyncBuffer) PeekNextEvent added in v0.3.0

func (s ConsumableAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)

PeekNextEvent returns the Next buffered event, but no event will be removed from the buffer. Blocks until at least one event buffered. When stopped, returns nil.

func (ConsumableAsyncBuffer) StartBlocking added in v0.3.0

func (s ConsumableAsyncBuffer) StartBlocking()

func (ConsumableAsyncBuffer) StopBlocking added in v0.3.0

func (s ConsumableAsyncBuffer) StopBlocking()

type CountingWindowPolicy added in v0.3.0

type CountingWindowPolicy[T any] struct {
	PolicyID
	// contains filtered or unexported fields
}

CountingWindowPolicy selects a fixed number of events (n) with a sliding window (shift).

func (*CountingWindowPolicy[T]) Description added in v0.3.0

func (s *CountingWindowPolicy[T]) Description() PolicyDescription

func (*CountingWindowPolicy[T]) NextSelection added in v0.3.0

func (s *CountingWindowPolicy[T]) NextSelection() EventSelection

func (*CountingWindowPolicy[T]) NextSelectionReady added in v0.3.0

func (s *CountingWindowPolicy[T]) NextSelectionReady() bool

func (*CountingWindowPolicy[T]) Offset added in v0.3.0

func (s *CountingWindowPolicy[T]) Offset(offset int)

func (*CountingWindowPolicy[T]) SetBuffer added in v0.3.0

func (s *CountingWindowPolicy[T]) SetBuffer(buffer BufferReader[T])

func (*CountingWindowPolicy[T]) Shift added in v0.3.0

func (s *CountingWindowPolicy[T]) Shift()

func (*CountingWindowPolicy[T]) UpdateSelection added in v0.3.0

func (s *CountingWindowPolicy[T]) UpdateSelection()

type DuoPolicy added in v0.3.0

type DuoPolicy[TLeft, TRight any] interface {
	NextSelectionReady() bool
	NextSelection() (EventSelection, EventSelection)
	UpdateSelection()
	Shift()
	Offset(leftOffset, rightOffset int)
	ID() PolicyID
	SetBuffers(left BufferReader[TLeft], right BufferReader[TRight])
	Description() PolicyDescription
	AddCallback(callback func([]Event[TLeft], []Event[TRight]))
}

DuoPolicy defines how events are selected from two buffers of different types

func NewDuoTemporalWindowPolicy added in v0.3.0

func NewDuoTemporalWindowPolicy[TLeft, TRight any](startingTime time.Time, windowLength time.Duration, windowShift time.Duration) DuoPolicy[TLeft, TRight]

type DuoTemporalWindowPolicy added in v0.3.0

type DuoTemporalWindowPolicy[TLeft, TRight any] struct {
	PolicyID
	// contains filtered or unexported fields
}

DuoTemporalWindowPolicy selects events based on a time window across two buffers.

func (*DuoTemporalWindowPolicy[TLeft, TRight]) AddCallback added in v0.3.0

func (s *DuoTemporalWindowPolicy[TLeft, TRight]) AddCallback(callback func([]Event[TLeft], []Event[TRight]))

func (*DuoTemporalWindowPolicy[TLeft, TRight]) Description added in v0.3.0

func (s *DuoTemporalWindowPolicy[TLeft, TRight]) Description() PolicyDescription

func (*DuoTemporalWindowPolicy[TLeft, TRight]) NextSelection added in v0.3.0

func (s *DuoTemporalWindowPolicy[TLeft, TRight]) NextSelection() (EventSelection, EventSelection)

func (*DuoTemporalWindowPolicy[TLeft, TRight]) NextSelectionReady added in v0.3.0

func (s *DuoTemporalWindowPolicy[TLeft, TRight]) NextSelectionReady() bool

func (*DuoTemporalWindowPolicy[TLeft, TRight]) Offset added in v0.3.0

func (s *DuoTemporalWindowPolicy[TLeft, TRight]) Offset(leftOffset, rightOffset int)

func (*DuoTemporalWindowPolicy[TLeft, TRight]) SetBuffers added in v0.3.0

func (s *DuoTemporalWindowPolicy[TLeft, TRight]) SetBuffers(left BufferReader[TLeft], right BufferReader[TRight])

func (*DuoTemporalWindowPolicy[TLeft, TRight]) Shift added in v0.3.0

func (s *DuoTemporalWindowPolicy[TLeft, TRight]) Shift()

func (*DuoTemporalWindowPolicy[TLeft, TRight]) UpdateSelection added in v0.3.0

func (s *DuoTemporalWindowPolicy[TLeft, TRight]) UpdateSelection()

type Event

type Event[TContent any] interface {
	GetStamp() TimeStamp
	GetContent() TContent
}

Event interface for arbitrary events with any content of type T

func Arr

func Arr[TContent any](events ...Event[TContent]) []Event[TContent]

func NewEvent

func NewEvent[TContent any](content TContent) Event[TContent]

func NewEventFromJSON

func NewEventFromJSON(b []byte) (Event[map[string]interface{}], error)

func NewEventFromOthers

func NewEventFromOthers[TContent any](content TContent, others ...TimeStamp) Event[TContent]

func NewEventFromOthersM

func NewEventFromOthersM[TContent any](content TContent, meta StampMeta, others ...TimeStamp) Event[TContent]

func NewEventM

func NewEventM[TContent any](content TContent, meta StampMeta) Event[TContent]

func NewNumericEvent

func NewNumericEvent[T NumericConstraint](content T) Event[T]

type EventBuffer added in v0.3.0

type EventBuffer[T any] struct {
	// contains filtered or unexported fields
}

EventBuffer is a simple, non-concurrent, in-memory buffer for events. It is not safe for concurrent use without external locking. It implements BufferReader.

func NewEventBuffer added in v0.3.0

func NewEventBuffer[T any]() *EventBuffer[T]

NewEventBuffer creates a new, empty EventBuffer.

func (*EventBuffer[T]) Add added in v0.3.0

func (b *EventBuffer[T]) Add(e Event[T])

Add appends an event to the buffer.

func (*EventBuffer[T]) Get added in v0.3.0

func (b *EventBuffer[T]) Get(i int) Event[T]

Get returns the event at the given index.

func (*EventBuffer[T]) Len added in v0.3.0

func (b *EventBuffer[T]) Len() int

Len returns the number of events in the buffer.

func (*EventBuffer[T]) Remove added in v0.3.0

func (b *EventBuffer[T]) Remove(n int)

Remove removes the first n events from the buffer.

type EventChannel

type EventChannel[TContent any] chan Event[TContent]

type EventSelection added in v0.3.0

type EventSelection struct {
	Start int
	End   int
}

EventSelection represents a range of events within a buffer slice.

func (EventSelection) IsValid added in v0.3.0

func (e EventSelection) IsValid() bool

IsValid returns true if Start and End actually represent a possible selection in a buffer

type Iterator added in v0.3.0

type Iterator[T any] struct {
	// contains filtered or unexported fields
}

func NewIterator added in v0.3.0

func NewIterator[T any](buffer Buffer[T]) *Iterator[T]

func (*Iterator[T]) HasNext added in v0.3.0

func (i *Iterator[T]) HasNext() bool

func (*Iterator[T]) Next added in v0.3.0

func (i *Iterator[T]) Next() Event[T]

type LimitedConsumableAsyncBuffer added in v0.3.0

type LimitedConsumableAsyncBuffer[T any] struct {
	*ConsumableAsyncBuffer[T]
	// contains filtered or unexported fields
}

LimitedConsumableAsyncBuffer is a buffer with a fixed capacity limit that consumes events based on a policy.

func (*LimitedConsumableAsyncBuffer[T]) AddEvent added in v0.3.0

func (s *LimitedConsumableAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error

AddEvent adds a single event to the buffer, ensuring the limit is not exceeded.

func (*LimitedConsumableAsyncBuffer[T]) AddEvents added in v0.3.0

func (s *LimitedConsumableAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error

AddEvents adds multiple events to the buffer, ensuring the limit is not exceeded.

func (LimitedConsumableAsyncBuffer) Dump added in v0.3.0

func (s LimitedConsumableAsyncBuffer) Dump() []Event[T]

func (LimitedConsumableAsyncBuffer) Get added in v0.3.0

func (s LimitedConsumableAsyncBuffer) Get(i int) Event[T]

func (*LimitedConsumableAsyncBuffer[T]) GetAndConsumeNextEvents added in v0.3.0

func (s *LimitedConsumableAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)

GetAndConsumeNextEvents returns the Next event from the buffer and removes it.

func (*LimitedConsumableAsyncBuffer[T]) GetAndRemoveNextEvent added in v0.3.0

func (s *LimitedConsumableAsyncBuffer[T]) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)

func (LimitedConsumableAsyncBuffer) Len added in v0.3.0

func (s LimitedConsumableAsyncBuffer) Len() int

func (LimitedConsumableAsyncBuffer) PeekNextEvent added in v0.3.0

func (s LimitedConsumableAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)

PeekNextEvent returns the Next buffered event, but no event will be removed from the buffer. Blocks until at least one event buffered. When stopped, returns nil.

func (LimitedConsumableAsyncBuffer) StartBlocking added in v0.3.0

func (s LimitedConsumableAsyncBuffer) StartBlocking()

func (LimitedConsumableAsyncBuffer) StopBlocking added in v0.3.0

func (s LimitedConsumableAsyncBuffer) StopBlocking()

type LimitedSimpleAsyncBuffer added in v0.3.0

type LimitedSimpleAsyncBuffer[T any] struct {
	*SimpleAsyncBuffer[T]
	// contains filtered or unexported fields
}

LimitedSimpleAsyncBuffer is a buffer with a fixed capacity limit.

func (*LimitedSimpleAsyncBuffer[T]) AddEvent added in v0.3.0

func (s *LimitedSimpleAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error

AddEvent adds a single event to the buffer, ensuring the limit is not exceeded.

func (*LimitedSimpleAsyncBuffer[T]) AddEvents added in v0.3.0

func (s *LimitedSimpleAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error

AddEvents adds multiple events to the buffer, ensuring the limit is not exceeded.

func (LimitedSimpleAsyncBuffer) Dump added in v0.3.0

func (s LimitedSimpleAsyncBuffer) Dump() []Event[T]

func (LimitedSimpleAsyncBuffer) Get added in v0.3.0

func (s LimitedSimpleAsyncBuffer) Get(i int) Event[T]

func (*LimitedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents added in v0.3.0

func (s *LimitedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)

GetAndConsumeNextEvents returns the Next event from the buffer and removes it.

func (*LimitedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent added in v0.3.0

func (s *LimitedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)

GetAndRemoveNextEvent returns the Next event from the buffer and removes it.

func (LimitedSimpleAsyncBuffer) Len added in v0.3.0

func (s LimitedSimpleAsyncBuffer) Len() int

func (LimitedSimpleAsyncBuffer) PeekNextEvent added in v0.3.0

func (s LimitedSimpleAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)

PeekNextEvent returns the Next buffered event, but no event will be removed from the buffer. Blocks until at least one event buffered. When stopped, returns nil.

func (LimitedSimpleAsyncBuffer) StartBlocking added in v0.3.0

func (s LimitedSimpleAsyncBuffer) StartBlocking()

func (LimitedSimpleAsyncBuffer) StopBlocking added in v0.3.0

func (s LimitedSimpleAsyncBuffer) StopBlocking()

type MultiEventSelection added in v0.3.0

type MultiEventSelection map[int]EventSelection

MultiEventSelection represents a map of buffer indices to their selected ranges.

type MultiPolicy added in v0.3.0

type MultiPolicy[T any] interface {
	NextSelectionReady() bool
	NextSelection() MultiEventSelection
	UpdateSelection()
	Shift()
	Offset(bufferIndex int, offset int)
	ID() PolicyID
	SetBuffers(readers map[int]BufferReader[T])
	Description() PolicyDescription
	AddCallback(callback func(map[int][]Event[T]))
}

MultiPolicy defines how events are selected from multiple buffers

func NewMultiTemporalWindowPolicy added in v0.3.0

func NewMultiTemporalWindowPolicy[T any](startingTime time.Time, windowLength time.Duration, windowShift time.Duration) MultiPolicy[T]

NewMultiTemporalWindowPolicy creates a new MultiTemporalWindowPolicy

type MultiTemporalWindowPolicy added in v0.3.0

type MultiTemporalWindowPolicy[T any] struct {
	PolicyID
	// contains filtered or unexported fields
}

MultiTemporalWindowPolicy selects events based on a time window across multiple buffers.

func (*MultiTemporalWindowPolicy[T]) AddCallback added in v0.3.0

func (s *MultiTemporalWindowPolicy[T]) AddCallback(callback func(map[int][]Event[T]))

func (*MultiTemporalWindowPolicy[T]) Description added in v0.3.0

func (s *MultiTemporalWindowPolicy[T]) Description() PolicyDescription

func (*MultiTemporalWindowPolicy[T]) NextSelection added in v0.3.0

func (s *MultiTemporalWindowPolicy[T]) NextSelection() MultiEventSelection

func (*MultiTemporalWindowPolicy[T]) NextSelectionReady added in v0.3.0

func (s *MultiTemporalWindowPolicy[T]) NextSelectionReady() bool

func (*MultiTemporalWindowPolicy[T]) Offset added in v0.3.0

func (s *MultiTemporalWindowPolicy[T]) Offset(bufferIndex int, offset int)

func (*MultiTemporalWindowPolicy[T]) SetBuffers added in v0.3.0

func (s *MultiTemporalWindowPolicy[T]) SetBuffers(buffers map[int]BufferReader[T])

func (*MultiTemporalWindowPolicy[T]) Shift added in v0.3.0

func (s *MultiTemporalWindowPolicy[T]) Shift()

func (*MultiTemporalWindowPolicy[T]) UpdateSelection added in v0.3.0

func (s *MultiTemporalWindowPolicy[T]) UpdateSelection()

type NumericConstraint

type NumericConstraint interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 | ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~float32 | ~float64
}

NumericConstraint constraint to limit the type parameter to numeric types

type NumericEvent

type NumericEvent[T NumericConstraint] struct {
	TemporalEvent[T]
}

NumericEvent restricts the content to numeric data types

type Policy added in v0.3.0

type Policy[T any] interface {
	NextSelectionReady() bool
	NextSelection() EventSelection
	UpdateSelection()
	Shift()
	Offset(offset int)
	ID() PolicyID
	SetBuffer(reader BufferReader[T])
	Description() PolicyDescription
}

Policy defines how events are selected from a buffer

func NewCountingWindowPolicy added in v0.3.0

func NewCountingWindowPolicy[T any](n int, shift int) Policy[T]

NewCountingWindowPolicy creates a new CountingWindowPolicy

func NewPolicyFromDescription added in v0.3.0

func NewPolicyFromDescription[T any](desc PolicyDescription) (Policy[T], error)

NewPolicyFromDescription creates a new Policy from a PolicyDescription.

func NewSelectNextPolicy added in v0.3.0

func NewSelectNextPolicy[T any]() Policy[T]

NewSelectNextPolicy creates a new SelectNextPolicy

func NewTemporalWindowPolicy added in v0.3.0

func NewTemporalWindowPolicy[T any](startingTime time.Time, windowLength time.Duration, windowShift time.Duration) Policy[T]

NewTemporalWindowPolicy creates a new TemporalWindowPolicy with the specified window and buffer

type PolicyDescription added in v0.3.0

type PolicyDescription struct {
	Active bool   `json:"active" yaml:"active"`
	Type   string `json:"type" yaml:"type"`
	// For CountingWindowPolicy
	Size  int `json:"size,omitempty" yaml:"size,omitempty"`
	Slide int `json:"slide,omitempty" yaml:"slide,omitempty"`
	// For TemporalWindowPolicy
	WindowStart  time.Time     `json:"windowStart,omitempty" yaml:"windowStart,omitempty"`
	WindowLength time.Duration `json:"windowLength,omitempty" yaml:"windowLength,omitempty"`
	WindowShift  time.Duration `json:"windowShift,omitempty" yaml:"windowShift,omitempty"`
}

PolicyDescription is a serializable representation of a selection policy.

func MakePolicy added in v0.3.0

func MakePolicy(t string, size int, slide int, windowStart time.Time, windowLength time.Duration, windowShift time.Duration) PolicyDescription

func PolicyDescriptionFromJSON added in v0.3.0

func PolicyDescriptionFromJSON(b []byte) (PolicyDescription, error)

PolicyDescriptionFromJSON parses a PolicyDescription from a JSON byte slice.

func PolicyDescriptionFromYML added in v0.3.0

func PolicyDescriptionFromYML(b []byte) (PolicyDescription, error)

PolicyDescriptionFromYML parses a PolicyDescription from a YAML byte slice.

func (PolicyDescription) ToJSON added in v0.3.0

func (d PolicyDescription) ToJSON() ([]byte, error)

ToJSON converts a PolicyDescription to its JSON representation.

func (PolicyDescription) ToYML added in v0.3.0

func (d PolicyDescription) ToYML() ([]byte, error)

ToYML converts a PolicyDescription to its YAML representation.

type PolicyID added in v0.3.0

type PolicyID uuid.UUID

PolicyID of each individual Policy

func (PolicyID) ID added in v0.3.0

func (id PolicyID) ID() PolicyID

func (PolicyID) String added in v0.3.0

func (id PolicyID) String() string

type SimpleAsyncBuffer added in v0.3.0

type SimpleAsyncBuffer[T any] struct {
	// contains filtered or unexported fields
}

SimpleAsyncBuffer allows to sync exactly one reader and n writer. The Read operations GetNextEvent and RemoveNextEvent either return the Next event, if any is available in the buffer or wait until Next event is available.

func (*SimpleAsyncBuffer[T]) AddEvent added in v0.3.0

func (s *SimpleAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error

AddEvent adds a single event to the buffer.

func (*SimpleAsyncBuffer[T]) AddEvents added in v0.3.0

func (s *SimpleAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error

AddEvents adds multiple events to the buffer.

func (SimpleAsyncBuffer) Dump added in v0.3.0

func (s SimpleAsyncBuffer) Dump() []Event[T]

func (SimpleAsyncBuffer) Get added in v0.3.0

func (s SimpleAsyncBuffer) Get(i int) Event[T]

func (*SimpleAsyncBuffer[T]) GetAndConsumeNextEvents added in v0.3.0

func (s *SimpleAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)

GetAndConsumeNextEvents returns the Next event from the buffer and removes it.

func (SimpleAsyncBuffer) GetAndRemoveNextEvent added in v0.3.0

func (s SimpleAsyncBuffer) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)

func (SimpleAsyncBuffer) Len added in v0.3.0

func (s SimpleAsyncBuffer) Len() int

func (SimpleAsyncBuffer) PeekNextEvent added in v0.3.0

func (s SimpleAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)

PeekNextEvent returns the Next buffered event, but no event will be removed from the buffer. Blocks until at least one event buffered. When stopped, returns nil.

func (SimpleAsyncBuffer) StartBlocking added in v0.3.0

func (s SimpleAsyncBuffer) StartBlocking()

func (SimpleAsyncBuffer) StopBlocking added in v0.3.0

func (s SimpleAsyncBuffer) StopBlocking()

type SortedSimpleAsyncBuffer added in v0.3.0

type SortedSimpleAsyncBuffer[T any] struct {
	*SimpleAsyncBuffer[T]
	// contains filtered or unexported fields
}

SortedSimpleAsyncBuffer is a buffer that ensures events are strictly ordered by their StartTime.

func (*SortedSimpleAsyncBuffer[T]) AddEvent added in v0.3.0

func (s *SortedSimpleAsyncBuffer[T]) AddEvent(ctx context.Context, event Event[T]) error

AddEvent adds a single event to the buffer and sorts it into position.

func (*SortedSimpleAsyncBuffer[T]) AddEvents added in v0.3.0

func (s *SortedSimpleAsyncBuffer[T]) AddEvents(ctx context.Context, events []Event[T]) error

AddEvents adds multiple events to the buffer and sorts them by StartTime.

func (SortedSimpleAsyncBuffer) Dump added in v0.3.0

func (s SortedSimpleAsyncBuffer) Dump() []Event[T]

func (SortedSimpleAsyncBuffer) Get added in v0.3.0

func (s SortedSimpleAsyncBuffer) Get(i int) Event[T]

func (*SortedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents added in v0.3.0

func (s *SortedSimpleAsyncBuffer[T]) GetAndConsumeNextEvents(ctx context.Context) ([]Event[T], error)

GetAndConsumeNextEvents returns the Next event from the buffer and removes it.

func (*SortedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent added in v0.3.0

func (s *SortedSimpleAsyncBuffer[T]) GetAndRemoveNextEvent(ctx context.Context) (Event[T], error)

GetAndRemoveNextEvent returns the Next event from the buffer and removes it.

func (SortedSimpleAsyncBuffer) Len added in v0.3.0

func (s SortedSimpleAsyncBuffer) Len() int

func (SortedSimpleAsyncBuffer) PeekNextEvent added in v0.3.0

func (s SortedSimpleAsyncBuffer) PeekNextEvent(ctx context.Context) (Event[T], error)

PeekNextEvent returns the Next buffered event, but no event will be removed from the buffer. Blocks until at least one event buffered. When stopped, returns nil.

func (SortedSimpleAsyncBuffer) StartBlocking added in v0.3.0

func (s SortedSimpleAsyncBuffer) StartBlocking()

func (SortedSimpleAsyncBuffer) StopBlocking added in v0.3.0

func (s SortedSimpleAsyncBuffer) StopBlocking()

type StampMeta

type StampMeta map[string]interface{}

type TemporalEvent

type TemporalEvent[TContent any] struct {
	Stamp   TimeStamp
	Content TContent
}

TemporalEvent is an event with a TimeStamp, which allows to record the start and end time of an event

func (*TemporalEvent[TContent]) GetContent

func (e *TemporalEvent[TContent]) GetContent() TContent

func (*TemporalEvent[TContent]) GetStamp

func (e *TemporalEvent[TContent]) GetStamp() TimeStamp

type TemporalWindowPolicy added in v0.3.0

type TemporalWindowPolicy[T any] struct {
	PolicyID
	// contains filtered or unexported fields
}

TemporalWindowPolicy selects events based on a time window.

func (*TemporalWindowPolicy[T]) Description added in v0.3.0

func (s *TemporalWindowPolicy[T]) Description() PolicyDescription

func (*TemporalWindowPolicy[T]) NextSelection added in v0.3.0

func (s *TemporalWindowPolicy[T]) NextSelection() EventSelection

NextSelection returns the EventSelection for the current window

func (*TemporalWindowPolicy[T]) NextSelectionReady added in v0.3.0

func (s *TemporalWindowPolicy[T]) NextSelectionReady() bool

NextSelectionReady checks if there are no more events within the window

func (*TemporalWindowPolicy[T]) Offset added in v0.3.0

func (s *TemporalWindowPolicy[T]) Offset(offset int)

func (*TemporalWindowPolicy[T]) SetBuffer added in v0.3.0

func (s *TemporalWindowPolicy[T]) SetBuffer(buffer BufferReader[T])

func (*TemporalWindowPolicy[T]) Shift added in v0.3.0

func (s *TemporalWindowPolicy[T]) Shift()

Shift is not relevant for time-based window and is left empty

func (*TemporalWindowPolicy[T]) UpdateSelection added in v0.3.0

func (s *TemporalWindowPolicy[T]) UpdateSelection()

UpdateSelection updates the window based on the new event's timestamp

type TimeStamp

type TimeStamp struct {
	StartTime time.Time `json:"start_time" yaml:"start_time"`
	EndTime   time.Time `json:"end_time"  yaml:"end_time"`
	Meta      StampMeta `json:"meta"  yaml:"meta"`
}

func GetTimeStamps added in v0.2.0

func GetTimeStamps[T any](others ...Event[T]) []TimeStamp

func (TimeStamp) Content

func (t TimeStamp) Content() map[string]interface{}

func (TimeStamp) String

func (t TimeStamp) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL