Documentation ¶
Index ¶
- Variables
- type ChannelSink
- type Deque
- type DrainSink
- type EventQueue
- func (q *EventQueue[E]) CancelReserved()
- func (q *EventQueue[E]) Clear()
- func (q *EventQueue[E]) Clone() []E
- func (q *EventQueue[E]) Drain()
- func (q *EventQueue[E]) IsRunning() bool
- func (q *EventQueue[E]) Len() (queued, reserved int)
- func (q *EventQueue[E]) Push(e E)
- func (q *EventQueue[E]) Range(fn func(i int, e E) (next bool))
- func (q *EventQueue[E]) Reserve(fn func(context.Context) (E, error))
- func (q *EventQueue[E]) Run(ctx context.Context) (remaining int, err error)
- func (q *EventQueue[E]) WaitReserved() <-chan struct{}
- func (q *EventQueue[E]) WaitUntil(cond func(writing bool, queued, reserved int) bool)
- type Option
- type PriorityQueue
- func (q *PriorityQueue[T]) Clear()
- func (q *PriorityQueue[T]) Clone() []T
- func (q *PriorityQueue[T]) Len() int
- func (q *PriorityQueue[T]) PopFront() T
- func (q *PriorityQueue[T]) PushBack(elem T)
- func (q *PriorityQueue[T]) PushFront(elem T)
- func (q *PriorityQueue[T]) Range(fn func(i int, e T) (next bool))
- type Queue
- type Sink
Constants ¶
This section is empty.
Variables ¶
var (
	ErrAlreadyRunning = errors.New("already running")
	ErrClosed         = errors.New("closed")
)
Functions ¶
This section is empty.
Types ¶
type ChannelSink ¶
type ChannelSink[E any] struct {
	// contains filtered or unexported fields
}
func NewChannelSink ¶
func NewChannelSink[E any](buf uint) *ChannelSink[E]
func (*ChannelSink[E]) Outlet ¶
func (s *ChannelSink[E]) Outlet() <-chan E
type EventQueue ¶
type EventQueue[E any] struct {
	// contains filtered or unexported fields
}
func (*EventQueue[E]) CancelReserved ¶ added in v0.2.0
func (q *EventQueue[E]) CancelReserved()
CancelReserved cancels all jobs reserved via Reserve. Only reservations present at the time CancelReserved is called are cancelled. q is still valid and usable after this method returns.
func (*EventQueue[E]) Clear ¶ added in v0.3.0
func (q *EventQueue[E]) Clear()
Clear clears q. It may or may not retain memory allocated for q. Calling Clear while q is running may be a mistake.
func (*EventQueue[E]) Clone ¶ added in v0.3.0
func (q *EventQueue[E]) Clone() []E
Clone returns a copy of the internal contents of q. The elements are always in the same order in which the Sink would see them.
Calling Clone on a running q may be a poor choice, since it can block for a long time if q holds many elements. An element currently being sent through Sink.Write may not be included in the returned slice. If Sink.Write fails, the element is pushed back to the head of q, so subsequent calls may observe an additional element at the head.
func (*EventQueue[E]) Drain ¶
func (q *EventQueue[E]) Drain()
Drain blocks until the numbers of queued and reserved events both reach 0.
func (*EventQueue[E]) IsRunning ¶
func (q *EventQueue[E]) IsRunning() bool
func (*EventQueue[E]) Len ¶
func (q *EventQueue[E]) Len() (queued, reserved int)
func (*EventQueue[E]) Push ¶
func (q *EventQueue[E]) Push(e E)
func (*EventQueue[E]) Range ¶ added in v0.3.0
func (q *EventQueue[E]) Range(fn func(i int, e E) (next bool))
Range calls fn sequentially for each element in q. If fn returns false, Range stops the iteration. The elements are always visited in the same order in which the Sink would see them.
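The early-stop contract of the callback can be illustrated with a small sketch. The helpers takeFirst and rangeOver below are hypothetical names, not part of this package; rangeOver only stands in for Range by iterating a plain slice with the same callback signature.

```go
package main

import "fmt"

// takeFirst is a hypothetical helper. It returns a callback with the same
// signature Range accepts, plus a pointer to the slice the callback fills.
// The callback copies at most n elements and then returns false to stop.
func takeFirst[E any](n int) (fn func(i int, e E) bool, out *[]E) {
	out = new([]E)
	fn = func(i int, e E) bool {
		if i >= n {
			return false // stop the iteration
		}
		*out = append(*out, e)
		return true
	}
	return fn, out
}

// rangeOver stands in for Range in this sketch: it walks elems in order and
// stops as soon as fn returns false.
func rangeOver[E any](elems []E, fn func(i int, e E) bool) {
	for i, e := range elems {
		if !fn(i, e) {
			return
		}
	}
}

// firstTwo collects the first two elements of a fixed slice.
func firstTwo() []string {
	fn, out := takeFirst[string](2)
	rangeOver([]string{"a", "b", "c"}, fn)
	return *out
}

func main() {
	fmt.Println(firstTwo())
}
```

With a real queue, the same callback would presumably be passed as q.Range(fn).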
func (*EventQueue[E]) Reserve ¶
func (q *EventQueue[E]) Reserve(fn func(context.Context) (E, error))
Reserve reserves an update which will occur after fn returns.
fn is called in a newly created goroutine and must return an E. It must also respect cancellation of ctx; when ctx is cancelled, the cause is ErrClosed. Cancellation happens only if CancelReserved is called.
The E returned by fn enters q if and only if fn returns a nil error.
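As a minimal sketch of an fn that respects ctx cancellation, the function below waits on a timer and on ctx.Done() at once. The names fetchEvent, cancelledWithCause, and errClosed are assumptions made for illustration; errClosed merely stands in for the package's ErrClosed so that the sketch compiles on its own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errClosed stands in for the package's ErrClosed in this sketch.
var errClosed = errors.New("closed")

// fetchEvent is a hypothetical constructor for a Reserve-style fn. The
// returned function yields "ready" after delay, but gives up as soon as
// ctx is cancelled and reports the cancellation cause.
func fetchEvent(delay time.Duration) func(context.Context) (string, error) {
	return func(ctx context.Context) (string, error) {
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-timer.C:
			return "ready", nil
		case <-ctx.Done():
			// context.Cause returns the error passed to the cancel function.
			return "", context.Cause(ctx)
		}
	}
}

// cancelledWithCause runs fn under a context that is already cancelled with
// errClosed and reports whether fn surfaced that cause.
func cancelledWithCause() bool {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errClosed)
	_, err := fetchEvent(time.Hour)(ctx)
	return errors.Is(err, errClosed)
}

func main() {
	fmt.Println(cancelledWithCause())
}
```

Because a non-nil error keeps the returned value out of the queue, returning the zero value alongside the cause is harmless here.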
func (*EventQueue[E]) Run ¶
func (q *EventQueue[E]) Run(ctx context.Context) (remaining int, err error)
Run runs q, blocking until ctx is cancelled.
func (*EventQueue[E]) WaitReserved ¶
func (q *EventQueue[E]) WaitReserved() <-chan struct{}
WaitReserved returns a channel that receives a value every time a reserved event enters the queue or is cancelled.
The channel is closed once all reservations that were present at the moment WaitReserved was called are done.
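A channel with this receive-then-close behaviour can be consumed with a plain range loop. The sketch below does not use the package itself: simulate is a hypothetical stand-in that sends one value per reservation and then closes the channel, and countUntilClosed is an illustrative consumer.

```go
package main

import "fmt"

// countUntilClosed receives from done until it is closed and returns the
// number of notifications it saw.
func countUntilClosed(done <-chan struct{}) int {
	n := 0
	for range done {
		n++
	}
	return n
}

// simulate is a stand-in for WaitReserved in this sketch: the returned
// channel receives once per reservation and is closed afterwards.
func simulate(reservations int) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		for i := 0; i < reservations; i++ {
			ch <- struct{}{}
		}
	}()
	return ch
}

func main() {
	fmt.Println(countUntilClosed(simulate(3)))
}
```

A caller that only needs to know when the reservations are finished could ignore the count and simply wait for the loop to end.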
type Option ¶
type Option[E any] func(q *EventQueue[E])
func WithReservationTimeout ¶ added in v0.3.0
type PriorityQueue ¶ added in v0.3.0
type PriorityQueue[T any] struct {
	// contains filtered or unexported fields
}
PriorityQueue implements Queue.
func NewPriorityQueue ¶ added in v0.3.0
func NewPriorityQueue[T any](init []T, less func(i T, j T) bool, methods priorityqueue.SliceInterfaceMethods[T]) *PriorityQueue[T]
func (*PriorityQueue[T]) Clear ¶ added in v0.3.0
func (q *PriorityQueue[T]) Clear()
func (*PriorityQueue[T]) Clone ¶ added in v0.3.0
func (q *PriorityQueue[T]) Clone() []T
func (*PriorityQueue[T]) Len ¶ added in v0.3.0
func (q *PriorityQueue[T]) Len() int
func (*PriorityQueue[T]) PopFront ¶ added in v0.3.0
func (q *PriorityQueue[T]) PopFront() T
func (*PriorityQueue[T]) PushBack ¶ added in v0.3.0
func (q *PriorityQueue[T]) PushBack(elem T)
func (*PriorityQueue[T]) PushFront ¶ added in v0.3.0
func (q *PriorityQueue[T]) PushFront(elem T)
func (*PriorityQueue[T]) Range ¶ added in v0.3.0
func (q *PriorityQueue[T]) Range(fn func(i int, e T) (next bool))
type Sink ¶
type Sink[E any] interface {
	// Write writes the event object to Sink.
	// If Write returns error, the event is put back to head of the queue,
	// which does not make q.Run return with that error,
	// only suspends q until next Push or after retry timeout.
	Write(ctx context.Context, event E) error
}
EventQueue writes to Sink as it receives events. Calls to Write are serialized by EventQueue, so Write does not need to be goroutine-safe.
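A custom Sink might look like the following sketch. SliceSink, its FailNext field, and demo are hypothetical names; the Sink interface is redeclared locally only so the example compiles on its own. The loop in demo retries immediately, which is a simplification: the documented behaviour is that the queue suspends until the next Push or a retry timeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Sink mirrors the documented interface; redeclared here for the sketch.
type Sink[E any] interface {
	Write(ctx context.Context, event E) error
}

// SliceSink is a hypothetical Sink that appends events to a slice. It takes
// no lock, relying on Write calls being serialized by the caller.
type SliceSink[E any] struct {
	Events   []E
	FailNext bool // when true, the next Write fails once
}

// Write records event, or returns an error without recording it.
func (s *SliceSink[E]) Write(ctx context.Context, event E) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if s.FailNext {
		s.FailNext = false
		return errors.New("sink temporarily unavailable")
	}
	s.Events = append(s.Events, event)
	return nil
}

// Compile-time check that *SliceSink satisfies Sink.
var _ Sink[string] = (*SliceSink[string])(nil)

// demo writes two events through a sink whose first Write fails. A failed
// event stays at the head of pending and is retried, so order is preserved.
func demo() []string {
	sink := &SliceSink[string]{FailNext: true}
	ctx := context.Background()
	pending := []string{"a", "b"}
	for len(pending) > 0 {
		if err := sink.Write(ctx, pending[0]); err != nil {
			continue // keep the event at the head and try again
		}
		pending = pending[1:]
	}
	return sink.Events
}

func main() {
	fmt.Println(demo())
}
```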