Documentation
¶
Index ¶
- Variables
- type ChannelSink
- type EventQueue
- func (q *EventQueue[E]) CancelReserved()
- func (q *EventQueue[E]) Drain()
- func (q *EventQueue[E]) IsRunning() bool
- func (q *EventQueue[E]) Len() (queued, reserved int)
- func (q *EventQueue[E]) Push(e E)
- func (q *EventQueue[E]) Reserve(fn func(context.Context) (E, error))
- func (q *EventQueue[E]) Run(ctx context.Context) (remaining int, err error)
- func (q *EventQueue[E]) WaitReserved() <-chan struct{}
- type Option
- type Sink
Constants ¶
This section is empty.
Variables ¶
var ( ErrAlreadyRunning = errors.New("already running") ErrClosed = errors.New("closed") )
Functions ¶
This section is empty.
Types ¶
type ChannelSink ¶
type ChannelSink[E any] struct { // contains filtered or unexported fields }
func NewChannelSink ¶
func NewChannelSink[E any](buf uint) *ChannelSink[E]
func (*ChannelSink[E]) Outlet ¶
func (s *ChannelSink[E]) Outlet() <-chan E
type EventQueue ¶
type EventQueue[E any] struct { // contains filtered or unexported fields }
func (*EventQueue[E]) CancelReserved ¶ added in v0.2.0
func (q *EventQueue[E]) CancelReserved()
CancelReserved cancels all jobs reserved via Reserve. CancelReserved only cancels all reservations present at the time CancelReserved is called. q is still valid and usable after this method returns.
func (*EventQueue[E]) Drain ¶
func (q *EventQueue[E]) Drain()
Drain blocks until queue and reserved events become 0.
func (*EventQueue[E]) IsRunning ¶
func (q *EventQueue[E]) IsRunning() bool
func (*EventQueue[E]) Len ¶
func (q *EventQueue[E]) Len() (queued, reserved int)
func (*EventQueue[E]) Push ¶
func (q *EventQueue[E]) Push(e E)
func (*EventQueue[E]) Reserve ¶
func (q *EventQueue[E]) Reserve(fn func(context.Context) (E, error))
Reserve reserves an update which will occur after fn returns.
fn will be called in a newly created goroutine, and it must return E. It also must respect ctx cancellation whose cause will be ErrClosed in case it has been cancelled. Cancellation would only happen if CloseReserved was called.
E returned by fn enters q only and only if it returned nil error.
func (*EventQueue[E]) Run ¶
func (q *EventQueue[E]) Run(ctx context.Context) (remaining int, err error)
Run runs q, block until ctx is cancelled.
func (*EventQueue[E]) WaitReserved ¶
func (q *EventQueue[E]) WaitReserved() <-chan struct{}
WaitReserved returns a channel which receives every time reserved event enters into the queue, or has been cancelled.
The channel is closed once all reservation events, which was present at the moment WaitReserved is called, are done.
type Option ¶
type Option[E any] func(q *EventQueue[E])
type Sink ¶
type Sink[E any] interface { // Write writes the event object to Sink. // If Write returns error, the event is put back to the queue. Write(ctx context.Context, event E) error }
Sink is written once EventQueue receives events. Write is serialized in EventQueue. It can be goroutine-unsafe method.