eventqueue

package module
v0.2.0 Latest
Published: May 26, 2024 License: Apache-2.0 Imports: 8 Imported by: 1

README

eventqueue

A generic FIFO event queue.

eventqueue queues up events (tasks) and sends them to a Sink in FIFO order.

Usage

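// Unbuffered channel sink: each Write blocks until a receiver is ready.
sink := eventqueue.NewChannelSink[int](0)
q := eventqueue.New[int](sink)

// Events pushed before Run starts stay in the queue.
for i := 0; i < 10; i++ {
	q.Push(i)
}

var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
	defer wg.Done()
	// Run blocks until ctx is cancelled; its return values are ignored here.
	q.Run(ctx)
}()

wg.Add(1)
go func() {
	defer wg.Done()
	// Receive the ten events in the order they were pushed.
	for i := 0; i < 10; i++ {
		fmt.Printf("received: %d\n", <-sink.Outlet())
	}
	fmt.Println("done")
}()

// Block until no queued or reserved events remain.
q.Drain()

// Stop Run, then wait for both goroutines to finish.
cancel()
wg.Wait()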

Documentation

Constants

This section is empty.

Variables

var (
	ErrAlreadyRunning = errors.New("already running")
	ErrClosed         = errors.New("closed")
)

Functions

This section is empty.

Types

type ChannelSink

type ChannelSink[E any] struct {
	// contains filtered or unexported fields
}

func NewChannelSink

func NewChannelSink[E any](buf uint) *ChannelSink[E]

func (*ChannelSink[E]) Outlet

func (s *ChannelSink[E]) Outlet() <-chan E

func (*ChannelSink[E]) Write

func (s *ChannelSink[E]) Write(ctx context.Context, event E) error
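The signatures above suggest a channel-backed sink whose Write takes a context. As a rough illustration only, the following sketch shows one way such a sink could honor ctx while sending. chanSink, newChanSink, and demo are hypothetical stand-ins written for this example; they are not the package's ChannelSink, whose internals are not shown here.

```go
package main

import (
	"context"
	"fmt"
)

// chanSink is a hypothetical stand-in for a channel-backed sink.
// It is NOT the package's ChannelSink; it only mirrors the documented method set.
type chanSink[E any] struct {
	ch chan E
}

// newChanSink creates a sink whose channel has the given buffer size.
func newChanSink[E any](buf uint) *chanSink[E] {
	return &chanSink[E]{ch: make(chan E, buf)}
}

// Outlet exposes the receive side of the channel.
func (s *chanSink[E]) Outlet() <-chan E { return s.ch }

// Write sends event, or gives up with ctx.Err() once ctx is cancelled.
func (s *chanSink[E]) Write(ctx context.Context, event E) error {
	select {
	case s.ch <- event:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo writes once into a free buffer slot, then once more after the
// buffer is full and ctx has been cancelled.
func demo() (first, second error, got int) {
	s := newChanSink[int](1)
	ctx, cancel := context.WithCancel(context.Background())
	first = s.Write(ctx, 42) // fits in the buffer
	cancel()
	second = s.Write(ctx, 43) // buffer full, ctx cancelled
	got = <-s.Outlet()
	return first, second, got
}

func main() {
	fmt.Println(demo()) // <nil> context canceled 42
}
```

Under this assumption, a failed Write simply reports the context error, which fits the Sink contract that an event whose Write fails is put back into the queue.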

type EventQueue

type EventQueue[E any] struct {
	// contains filtered or unexported fields
}

func New

func New[E any](sink Sink[E], opts ...Option[E]) *EventQueue[E]

func (*EventQueue[E]) CancelReserved added in v0.2.0

func (q *EventQueue[E]) CancelReserved()

CancelReserved cancels all jobs reserved via Reserve. It cancels only the reservations present at the time it is called. q is still valid and usable after this method returns.

func (*EventQueue[E]) Drain

func (q *EventQueue[E]) Drain()

Drain blocks until the numbers of queued and reserved events both reach 0.

func (*EventQueue[E]) IsRunning

func (q *EventQueue[E]) IsRunning() bool

func (*EventQueue[E]) Len

func (q *EventQueue[E]) Len() (queued, reserved int)

func (*EventQueue[E]) Push

func (q *EventQueue[E]) Push(e E)

func (*EventQueue[E]) Reserve

func (q *EventQueue[E]) Reserve(fn func(context.Context) (E, error))

Reserve reserves an update which will occur after fn returns.

fn is called in a newly created goroutine and must return an E. It must also respect cancellation of ctx; when ctx is cancelled, its cause is ErrClosed. Cancellation happens only if CancelReserved is called.

The E returned by fn enters q if and only if fn returned a nil error.
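To make the contract on fn concrete, here is a minimal sketch of a function with the shape Reserve expects (for E = string) that returns early on cancellation. slowLookup, runCancelled, and demo are hypothetical helpers, and errClosed is a local stand-in for the package's ErrClosed so that the example compiles without the module. The cancelled context is built by hand with context.WithCancelCause to imitate what the documentation describes; how the queue creates its context internally is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errClosed is a local stand-in for eventqueue.ErrClosed.
var errClosed = errors.New("closed")

// slowLookup returns a hypothetical fn that waits for delay before producing
// a value, but returns early when ctx is cancelled.
func slowLookup(delay time.Duration) func(context.Context) (string, error) {
	return func(ctx context.Context) (string, error) {
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-timer.C:
			return "result", nil
		case <-ctx.Done():
			// context.Cause reports why ctx was cancelled.
			return "", context.Cause(ctx)
		}
	}
}

// runCancelled calls fn with a context already cancelled with errClosed,
// imitating a reservation that has been cancelled.
func runCancelled(fn func(context.Context) (string, error)) (string, error) {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errClosed)
	return fn(ctx)
}

// demo runs fn once to completion and once under a cancelled context.
func demo() (value string, okErr, cancelErr error) {
	value, okErr = slowLookup(10 * time.Millisecond)(context.Background())
	_, cancelErr = runCancelled(slowLookup(time.Hour))
	return value, okErr, cancelErr
}

func main() {
	value, okErr, cancelErr := demo()
	fmt.Println(value, okErr)                    // result <nil>
	fmt.Println(errors.Is(cancelErr, errClosed)) // true
}
```

Returning a non-nil error on cancellation matters here: according to the description above, only a nil error lets the returned value enter the queue.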

func (*EventQueue[E]) Run

func (q *EventQueue[E]) Run(ctx context.Context) (remaining int, err error)

Run runs q and blocks until ctx is cancelled.

func (*EventQueue[E]) WaitReserved

func (q *EventQueue[E]) WaitReserved() <-chan struct{}

WaitReserved returns a channel that receives a value every time a reserved event enters the queue or is cancelled.

The channel is closed once all reservations that were present at the moment WaitReserved was called are done.
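A channel with this "one value per settled reservation, then close" behaviour can be consumed with a plain range loop. The sketch below imitates that behaviour with a local channel; fakeWaitReserved and countSettled are hypothetical helpers, and the real channel's buffering and timing are not specified by the text above.

```go
package main

import "fmt"

// fakeWaitReserved imitates the documented behaviour for n pending
// reservations: one value per settled reservation, then the channel closes.
// It is a stand-in, not the package's implementation.
func fakeWaitReserved(n int) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- struct{}{}
		}
	}()
	return ch
}

// countSettled receives until ch is closed and returns how many
// notifications arrived.
func countSettled(ch <-chan struct{}) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	fmt.Println(countSettled(fakeWaitReserved(3))) // 3
}
```

With the real queue, the same loop would be written over the channel returned by q.WaitReserved(), assuming the channel behaves as described.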

type Option

type Option[E any] func(q *EventQueue[E])

func WithRetryInterval added in v0.2.0

func WithRetryInterval[E any](dur time.Duration) Option[E]

WithRetryInterval returns an Option that sets the retry interval of q. Without this option, q does not retry a failed Write until another push event occurs.
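Option follows the functional-options pattern: each option is a function that mutates the queue during construction. The sketch below reproduces that pattern with local, hypothetical types (queue, option, withRetryInterval, newQueue, retryMillis) so it compiles on its own. The field name retryInterval and the zero-value default are assumptions, not details taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

// queue is a hypothetical stand-in for EventQueue with a single setting.
type queue[E any] struct {
	retryInterval time.Duration // zero means "no timed retry" in this sketch
}

// option mirrors the shape of the documented Option type.
type option[E any] func(q *queue[E])

// withRetryInterval returns an option that sets the retry interval.
func withRetryInterval[E any](dur time.Duration) option[E] {
	return func(q *queue[E]) { q.retryInterval = dur }
}

// newQueue applies each option in order to a freshly created queue.
func newQueue[E any](opts ...option[E]) *queue[E] {
	q := &queue[E]{}
	for _, opt := range opts {
		opt(q)
	}
	return q
}

// retryMillis builds a queue with opts and reports its interval in milliseconds.
func retryMillis(opts ...option[int]) int64 {
	return newQueue[int](opts...).retryInterval.Milliseconds()
}

func main() {
	fmt.Println(retryMillis())                                          // 0
	fmt.Println(retryMillis(withRetryInterval[int](500 * time.Millisecond))) // 500
}
```

The type parameter is spelled out on withRetryInterval because it cannot be inferred from a time.Duration argument alone; the same would apply to the documented WithRetryInterval[E].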

type Sink

type Sink[E any] interface {
	// Write writes the event object to Sink.
	// If Write returns an error, the event is put back into the queue.
	Write(ctx context.Context, event E) error
}

The Sink is written to whenever EventQueue receives events. EventQueue serializes calls to Write, so Write does not need to be goroutine-safe.
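As an illustration of the interface, the following sketch implements a custom sink that rejects its first few writes and records the rest. The Sink interface is redeclared locally so the example compiles without the module; flakySink, errNotReady, and runDemo are hypothetical names. The sink uses no mutex, relying on the statement above that Write calls are serialized.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Sink mirrors the documented interface so this sketch is self-contained.
type Sink[E any] interface {
	Write(ctx context.Context, event E) error
}

var errNotReady = errors.New("sink not ready")

// flakySink is a hypothetical Sink that rejects the first few writes and
// records every event it accepts afterwards.
type flakySink[E any] struct {
	failures int // remaining number of writes to reject
	got      []E // accepted events; no lock, Write is assumed to be serialized
}

// Write rejects the event while failures remain, otherwise stores it.
func (s *flakySink[E]) Write(ctx context.Context, event E) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if s.failures > 0 {
		s.failures--
		return errNotReady
	}
	s.got = append(s.got, event)
	return nil
}

// Compile-time check that flakySink satisfies Sink.
var _ Sink[int] = (*flakySink[int])(nil)

// runDemo writes the same event twice: the first attempt is rejected,
// the second is accepted, as a retrying caller would observe.
func runDemo() (firstErr error, got []int) {
	s := &flakySink[int]{failures: 1}
	ctx := context.Background()
	firstErr = s.Write(ctx, 1)
	_ = s.Write(ctx, 1)
	return firstErr, s.got
}

func main() {
	firstErr, got := runDemo()
	fmt.Println(firstErr) // sink not ready
	fmt.Println(got)      // [1]
}
```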
