eventqueue

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2024 License: Apache-2.0 Imports: 10 Imported by: 1

README

eventqueue

A generic FIFO event queue.

eventqueue queues up events (tasks) and send them to Sink in FIFO order.

Usage

eventqueue queues up event object E (whatever you want), and then write them into Sink once events are available.

Sink is an interface expressed as

// Sink is written once EventQueue receives events.
// Write is serialized in EventQueue. It can be goroutine-unsafe method.
type Sink[E any] interface {
	// Write writes the event object to Sink.
	// If Write returns error, the event is put back to the head of queue.
	Write(ctx context.Context, event E) error
}

eventqueue can be Push-ed arbitrary number as long as the system has enough memory space. It then tries to Write to Sink in FIFO order.

eventqueue also can Reserve happening of event after fn func(context.Context) (E, error), passed fn will be called in a new goroutine and once fn returns with nil error, returned event from fn enters eventqueue.

func main() {
	// unbuffered channel sink.
	sink := eventqueue.NewChannelSink[int](0)
	// sink is interface.
	q := eventqueue.New[int](sink)

	for i := 0; i < 10; i++ {
		q.Push(i)
	}

	// q also can Reserve happening of event after fn returns.
	// If fn returns with nil error, returned E enters queue.
	q.Reserve(func(ctx context.Context) (int, error) {
		time.Sleep(500 * time.Millisecond)
		return 999, nil
	})

	// q also can cancel too long fn.
	// Of course if and only if fn respects given ctx.
	q.Reserve(func(ctx context.Context) (int, error) {
		timer := time.NewTimer(time.Hour)
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-timer.C:
			return 2999, nil
		}
	})

	var wg sync.WaitGroup

	ctx, cancel := context.WithCancel(context.Background())
	wg.Add(1)
	go func() {
		defer wg.Done()
		remaining, err := q.Run(ctx)
		fmt.Printf("tasks remaining in queue: %d, err = %v\n", remaining, err) // tasks remaining in queue: 0, err = <nil>
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 11; i++ {
			fmt.Printf("received: %d\n", <-sink.Outlet())
			/*
				received: 0
				received: 1
				received: 2
				received: 3
				received: 4
				received: 5
				received: 6
				received: 7
				received: 8
				received: 9
				received: 999
			*/
		}
		fmt.Println("done")
	}()

	// at this point
	queued, reserved := q.Len()
	fmt.Printf("queued = %d, reserved = %d\n", queued, reserved) // queued = 10, reserved = 2
	waitCh := q.WaitReserved()
	// can observe Reserved fn returned.
	<-waitCh

	// and now is
	queued, reserved = q.Len()
	fmt.Printf("queued = %d, reserved = %d\n", queued, reserved) // queued = 1, reserved = 1

	// requests cancellation.
	q.CancelReserved()
	for range waitCh {
	}
	// then
	queued, reserved = q.Len()
	fmt.Printf("queued = %d, reserved = %d\n", queued, reserved) // queued = 0, reserved = 0

	q.Drain()

	cancel()
	wg.Wait()
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAlreadyRunning = errors.New("already running")
	ErrClosed         = errors.New("closed")
)

Functions

This section is empty.

Types

type ChannelSink

type ChannelSink[E any] struct {
	// contains filtered or unexported fields
}

func NewChannelSink

func NewChannelSink[E any](buf uint) *ChannelSink[E]

func (*ChannelSink[E]) Outlet

func (s *ChannelSink[E]) Outlet() <-chan E

func (*ChannelSink[E]) Write

func (s *ChannelSink[E]) Write(ctx context.Context, event E) error

type Deque added in v0.3.0

type Deque[T any] struct {
	*deque.Deque[T]
}

func NewDeque added in v0.3.0

func NewDeque[T any](cap int) *Deque[T]

func (*Deque[T]) Clone added in v0.3.0

func (q *Deque[T]) Clone() []T

func (*Deque[T]) Range added in v0.3.0

func (q *Deque[T]) Range(fn func(i int, e T) (next bool))

type DrainSink added in v0.3.0

type DrainSink[E any] struct{}

func (DrainSink[E]) Write added in v0.3.0

func (DrainSink[E]) Write(ctx context.Context, event E) error

type EventQueue

type EventQueue[E any] struct {
	// contains filtered or unexported fields
}

func New

func New[E any](sink Sink[E], opts ...Option[E]) *EventQueue[E]

func (*EventQueue[E]) CancelReserved added in v0.2.0

func (q *EventQueue[E]) CancelReserved()

CancelReserved cancels all jobs reserved via Reserve. CancelReserved only cancels all reservations present at the time CancelReserved is called. q is still valid and usable after this method returns.

func (*EventQueue[E]) Clear added in v0.3.0

func (q *EventQueue[E]) Clear()

Clear clears q. It may or may not retain memory allocated for q. Calling Clear on running q might be wrong.

func (*EventQueue[E]) Clone added in v0.3.0

func (q *EventQueue[E]) Clone() []E

Clone clones internal contents of q. The order of elements always is same as what the Sink would see them.

Calling Clone on running q might be wrong choice since it would block long if q holds many elements. An element being sent through Sink.Write may not be included in returned slice. If Sink.Write failed, the element would be pushed back to the head of q. So any subsequent calls could observe an additional element on head.

func (*EventQueue[E]) Drain

func (q *EventQueue[E]) Drain()

Drain blocks until queue and reserved events become 0.

func (*EventQueue[E]) IsRunning

func (q *EventQueue[E]) IsRunning() bool

func (*EventQueue[E]) Len

func (q *EventQueue[E]) Len() (queued, reserved int)

func (*EventQueue[E]) Push

func (q *EventQueue[E]) Push(e E)

func (*EventQueue[E]) Range added in v0.3.0

func (q *EventQueue[E]) Range(fn func(i int, e E) (next bool))

Range calls fn sequentially for each element in q. If fn returns false, range stops the iteration. The order of elements always is same as what the Sink would see them.

func (*EventQueue[E]) Reserve

func (q *EventQueue[E]) Reserve(fn func(context.Context) (E, error))

Reserve reserves an update which will occur after fn returns.

fn will be called in a newly created goroutine, and it must return E. It also must respect ctx cancellation whose cause will be ErrClosed in case it has been cancelled. Cancellation would only happen if CloseReserved was called.

E returned by fn enters q only and only if it returned nil error.

func (*EventQueue[E]) Run

func (q *EventQueue[E]) Run(ctx context.Context) (remaining int, err error)

Run runs q, block until ctx is cancelled.

func (*EventQueue[E]) WaitReserved

func (q *EventQueue[E]) WaitReserved() <-chan struct{}

WaitReserved returns a channel which receives every time reserved event enters into the queue, or has been cancelled.

The channel is closed once all reservation events, which was present at the moment WaitReserved is called, are done.

func (*EventQueue[E]) WaitUntil added in v0.3.0

func (q *EventQueue[E]) WaitUntil(cond func(writing bool, queued, reserved int) bool)

type Option

type Option[E any] func(q *EventQueue[E])

func WithQueue added in v0.3.0

func WithQueue[E any](queue Queue[E]) Option[E]

func WithReservationTimeout added in v0.3.0

func WithReservationTimeout[E any](reservationTimeout time.Duration) Option[E]

func WithRetryInterval added in v0.2.0

func WithRetryInterval[E any](retryTimeout time.Duration) Option[E]

WithRetryInterval returns Option that sets the retry interval to q. Without this option, q does not retry to Write until another push event occurs.

type PriorityQueue added in v0.3.0

type PriorityQueue[T any] struct {
	// contains filtered or unexported fields
}

PriorityQueue implements Queue.

func NewPriorityQueue added in v0.3.0

func NewPriorityQueue[T any](init []T, less func(i T, j T) bool, methods priorityqueue.SliceInterfaceMethods[T]) *PriorityQueue[T]

func (*PriorityQueue[T]) Clear added in v0.3.0

func (q *PriorityQueue[T]) Clear()

func (*PriorityQueue[T]) Clone added in v0.3.0

func (q *PriorityQueue[T]) Clone() []T

func (*PriorityQueue[T]) Len added in v0.3.0

func (q *PriorityQueue[T]) Len() int

func (*PriorityQueue[T]) PopFront added in v0.3.0

func (q *PriorityQueue[T]) PopFront() T

func (*PriorityQueue[T]) PushBack added in v0.3.0

func (q *PriorityQueue[T]) PushBack(elem T)

func (*PriorityQueue[T]) PushFront added in v0.3.0

func (q *PriorityQueue[T]) PushFront(elem T)

func (*PriorityQueue[T]) Range added in v0.3.0

func (q *PriorityQueue[T]) Range(fn func(i int, e T) (next bool))

type Queue added in v0.3.0

type Queue[T any] interface {
	Range(fn func(i int, e T) (next bool))
	Clone() []T
	Clear()
	Len() int
	PopFront() T
	PushBack(elem T)
	PushFront(elem T)
}

type Sink

type Sink[E any] interface {
	// Write writes the event object to Sink.
	// If Write returns error, the event is put back to head of the queue,
	// which does not make q.Run return with that error,
	// only suspends q until next Push or after retry timeout.
	Write(ctx context.Context, event E) error
}

Sink is written once EventQueue receives events. Write is serialized in EventQueue. It can be goroutine-unsafe method.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL