eventqueue

package module
v0.1.0 Latest
Published: Aug 22, 2023 License: Apache-2.0 Imports: 8 Imported by: 1

README

eventqueue

A generic FIFO event queue.

eventqueue queues up events (tasks) and sends them to a Sink in FIFO order.

Usage

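// Create an unbuffered channel sink and a queue that writes to it.
sink := eventqueue.NewChannelSink[int](0)
q := eventqueue.New[int](sink)

// Push 10 events before the queue starts running.
for i := 0; i < 10; i++ {
	q.Push(i)
}

var wg sync.WaitGroup

// Run the queue in a background goroutine until ctx is cancelled.
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
	defer wg.Done()
	q.Run(ctx)
}()

// Receive the 10 events from the sink's outlet.
wg.Add(1)
go func() {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		fmt.Printf("received: %d\n", <-sink.Outlet())
	}
	fmt.Println("done")
}()

// Block until the queue holds no queued or reserved events.
q.Drain()

// Stop Run and wait for both goroutines to exit.
cancel()
wg.Wait()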

Documentation


Types

type ChannelSink

type ChannelSink[E any] struct {
	// contains filtered or unexported fields
}

func NewChannelSink

func NewChannelSink[E any](buf uint) *ChannelSink[E]

func (*ChannelSink[E]) Outlet

func (s *ChannelSink[E]) Outlet() <-chan E

func (*ChannelSink[E]) Write

func (s *ChannelSink[E]) Write(ctx context.Context, event E) error

type EventQueue

type EventQueue[E any] struct {
	// contains filtered or unexported fields
}

func New

func New[E any](sink Sink[E], opts ...Option[E]) *EventQueue[E]

func (*EventQueue[E]) Drain

func (q *EventQueue[E]) Drain()

Drain blocks until both the number of queued events and the number of reserved events reach 0.

func (*EventQueue[E]) IsRunning

func (q *EventQueue[E]) IsRunning() bool

func (*EventQueue[E]) Len

func (q *EventQueue[E]) Len() (inQueue, reserved int)

func (*EventQueue[E]) Push

func (q *EventQueue[E]) Push(e E)

func (*EventQueue[E]) Reserve

func (q *EventQueue[E]) Reserve(fn func() E)

Reserve reserves an event that enters the queue later. fn is called in a newly created goroutine.

func (*EventQueue[E]) Run

func (q *EventQueue[E]) Run(ctx context.Context) (remaining int, err error)

Run runs q.

func (*EventQueue[E]) WaitReserved

func (q *EventQueue[E]) WaitReserved() <-chan struct{}

WaitReserved returns a channel that receives a value every time a reserved event enters the queue. The channel is closed once all reserved events that were present when WaitReserved was called have entered the queue.
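A channel with this contract (one value per event, then a close) can be consumed with a plain range loop. The sketch below illustrates only the consuming side: fakeWaitReserved is a hypothetical stand-in that imitates the documented behavior so the example runs without the package, and countUntilClosed is an illustrative helper, not part of the API.

```go
package main

import "fmt"

// countUntilClosed consumes a notification channel that follows the
// documented WaitReserved contract: one value per reserved event entering
// the queue, then a close. It returns how many notifications arrived.
func countUntilClosed(ch <-chan struct{}) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

// fakeWaitReserved is a hypothetical stand-in for q.WaitReserved().
// It sends one value per pending event and then closes the channel.
func fakeWaitReserved(pending int) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		for i := 0; i < pending; i++ {
			ch <- struct{}{}
		}
	}()
	return ch
}

func main() {
	// With the real package this would presumably be countUntilClosed(q.WaitReserved()).
	fmt.Println(countUntilClosed(fakeWaitReserved(3)))
}
```

Because the loop ends only when the channel is closed, returning from countUntilClosed means every reserved event that existed at call time has entered the queue.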

type Option

type Option[E any] func(q *EventQueue[E])

func SetRetryInterval

func SetRetryInterval[E any](dur time.Duration) Option[E]

SetRetryInterval returns an Option that sets the retry interval of q. Without this option, q does not retry Write until another push event occurs.

type Sink

type Sink[E any] interface {
	// Write writes the event object to the Sink.
	// If Write returns an error, the event is put back into the queue.
	Write(ctx context.Context, event E) error
}

Sink is written to once EventQueue receives events. EventQueue serializes calls to Write, so Write does not need to be goroutine-safe.
