Documentation
Index ¶
- type BufferableEvent
- type Element
- type GroupTree
- type List
- type TimeBuffer
- func (t *TimeBuffer[T, TT]) All() (slice []TT)
- func (t *TimeBuffer[T, TT]) LatestVisibleRev() int64
- func (t *TimeBuffer[T, TT]) Len() int
- func (t *TimeBuffer[T, TT]) Push(event TT)
- func (t *TimeBuffer[T, TT]) Range(start int64, query T) ([]TT, int64, int64)
- func (t *TimeBuffer[T, TT]) Run(ctx context.Context)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BufferableEvent ¶
type GroupTree ¶
type GroupTree[T any] struct {
	// contains filtered or unexported fields
}
GroupTree extends etcd's ADT package by grouping listeners so that their keyspaces can overlap. On its own, ADT supports only a single listener per keyspace.
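The grouping idea can be illustrated with a small, self-contained sketch. It is not the package's implementation: `keyRange`, `groupIndex`, and their methods are hypothetical names, a plain map with a linear scan stands in for etcd's interval tree, and listeners are assumed to be comparable values. It only shows how several listeners can share one range while other ranges overlap it.

```go
package main

import "fmt"

// keyRange is a hypothetical half-open [start, end) range of keys.
type keyRange struct{ start, end string }

// contains reports whether key falls inside the range.
func (r keyRange) contains(key string) bool { return r.start <= key && key < r.end }

// groupIndex is an illustrative stand-in for GroupTree: listeners that watch
// the same range are stored together as one group, so a range maps to many
// listeners instead of one.
type groupIndex[T comparable] struct {
	groups map[keyRange]map[T]struct{}
}

func newGroupIndex[T comparable]() *groupIndex[T] {
	return &groupIndex[T]{groups: make(map[keyRange]map[T]struct{})}
}

// add registers listener l on range r, creating the group if needed.
func (g *groupIndex[T]) add(r keyRange, l T) {
	if g.groups[r] == nil {
		g.groups[r] = make(map[T]struct{})
	}
	g.groups[r][l] = struct{}{}
}

// remove unregisters listener l and drops the group once it is empty.
func (g *groupIndex[T]) remove(r keyRange, l T) {
	delete(g.groups[r], l)
	if len(g.groups[r]) == 0 {
		delete(g.groups, r)
	}
}

// count returns how many listeners watch a range containing key.
// A real interval tree would avoid this linear scan.
func (g *groupIndex[T]) count(key string) int {
	n := 0
	for r, listeners := range g.groups {
		if r.contains(key) {
			n += len(listeners)
		}
	}
	return n
}

func main() {
	g := newGroupIndex[string]()
	g.add(keyRange{"a", "m"}, "w1")
	g.add(keyRange{"a", "m"}, "w2") // same range, second listener
	g.add(keyRange{"c", "z"}, "w3") // overlapping range
	fmt.Println(g.count("d"), g.count("b"), g.count("x")) // prints "3 2 1"
}
```

Keying groups by the exact range keeps the underlying structure at one entry per range, which is the constraint the description above attributes to ADT.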
func NewGroupTree ¶
type List ¶
List was adopted from https://gist.github.com/pje/90e727f80685c78a6c1cfff35f62155a. Replace with container/list once it's generic.
func (*List[T]) InsertAfter ¶
type TimeBuffer ¶
type TimeBuffer[T any, TT BufferableEvent[T]] struct {
	// contains filtered or unexported fields
}
TimeBuffer buffers events and sorts them by their logical timestamp.
Events become visible to readers only once the preceding event has been received. If the preceding event is never received, the following event still becomes visible after a (wall-clock) timeout period, which prevents deadlocks caused by dropped events.
In practice, it is useful for merging streams of events that may arrive out of order due to network latency and may be missing events due to network partitions.
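The visibility rule described above can be sketched in isolation. The following is a simplified, single-goroutine illustration, not the package's code: `gapBuffer`, `event`, `push`, `advance`, and `visible` are hypothetical names, revisions are assumed to start at 1 and increase by 1, and the current time is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// event is a hypothetical buffered event; rev is its logical timestamp.
type event struct {
	rev      int64
	received time.Time // wall-clock arrival time
}

// gapBuffer is an illustrative sketch of gap-timeout buffering.
type gapBuffer struct {
	gapTimeout time.Duration
	events     []event // kept sorted by rev
	visibleRev int64   // highest rev currently visible to readers
}

// push inserts an event in rev order, recording when it arrived.
func (b *gapBuffer) push(rev int64, now time.Time) {
	i := sort.Search(len(b.events), func(i int) bool { return b.events[i].rev >= rev })
	b.events = append(b.events, event{})
	copy(b.events[i+1:], b.events[i:])
	b.events[i] = event{rev: rev, received: now}
}

// advance exposes each event that directly follows the visible horizon, or
// that has waited at least gapTimeout for a predecessor that never arrived.
func (b *gapBuffer) advance(now time.Time) {
	for _, ev := range b.events {
		if ev.rev <= b.visibleRev {
			continue
		}
		contiguous := ev.rev == b.visibleRev+1
		timedOut := now.Sub(ev.received) >= b.gapTimeout
		if !contiguous && !timedOut {
			return // still waiting on a gap
		}
		b.visibleRev = ev.rev
	}
}

// visible returns the events at or below the visible horizon.
func (b *gapBuffer) visible() []event {
	n := sort.Search(len(b.events), func(i int) bool { return b.events[i].rev > b.visibleRev })
	return b.events[:n]
}

// demo returns the visible count after each step of a small scenario.
func demo() []int {
	t0 := time.Unix(0, 0)
	b := &gapBuffer{gapTimeout: time.Second}
	var counts []int
	step := func(now time.Time) {
		b.advance(now)
		counts = append(counts, len(b.visible()))
	}
	b.push(2, t0) // arrives before rev 1, so it stays hidden
	step(t0)
	b.push(1, t0) // fills the gap; revs 1 and 2 become visible
	step(t0)
	b.push(4, t0) // rev 3 is missing
	step(t0.Add(500 * time.Millisecond))
	step(t0.Add(2 * time.Second)) // timeout elapsed; rev 4 becomes visible
	return counts
}

func main() {
	fmt.Println(demo()) // prints "[0 2 2 3]"
}
```

Measuring the timeout from the arrival of the blocked event, as done here, is one possible choice; the package may measure the gap differently.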
func NewTimeBuffer ¶
func NewTimeBuffer[T any, TT BufferableEvent[T]](gapTimeout time.Duration, len int, ch chan<- TT) *TimeBuffer[T, TT]
func (*TimeBuffer[T, TT]) All ¶
func (t *TimeBuffer[T, TT]) All() (slice []TT)
func (*TimeBuffer[T, TT]) LatestVisibleRev ¶
func (t *TimeBuffer[T, TT]) LatestVisibleRev() int64
func (*TimeBuffer[T, TT]) Len ¶
func (t *TimeBuffer[T, TT]) Len() int
func (*TimeBuffer[T, TT]) Push ¶
func (t *TimeBuffer[T, TT]) Push(event TT)
func (*TimeBuffer[T, TT]) Range ¶
func (t *TimeBuffer[T, TT]) Range(start int64, query T) ([]TT, int64, int64)
func (*TimeBuffer[T, TT]) Run ¶
func (t *TimeBuffer[T, TT]) Run(ctx context.Context)