delayqueue

package module
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2024 License: MIT Imports: 5 Imported by: 0

README

delayqueue

This is a simple delay queue that sends each added value to a channel after a specified delay has elapsed. Compared to the naive approach of spawning one goroutine per item, this implementation uses a constant two goroutines, a single auxiliary timer, and a priority queue, so should therefore comfortably handle a large number of items.

Install

go get github.com/jaz303/delayqueue

Usage

// Create a new delay queue of ints that will run until
// the provided context is cancelled. The second argument
// is the buffer size of the outgoing channel.
queue := delayqueue.New[int](context.Background(), 0)

go func() {
    // Read items from the queue
    for i := range queue.C {
        log.Printf("Read from queue: %+v", i)
    }
}()

now := time.Now()

// Add items to the queue
queue.Add(now.Add(100 * time.Millisecond), 2)
queue.Add(now.Add(500 * time.Millisecond), 3)
queue.Add(now, 1)

Benchmarks

benchmark_test.go compares delayqueue to a goroutine per item approach:

BenchmarkDelayQueue-4   	       1	8313947735 ns/op	320948480 B/op	10001373 allocs/op
BenchmarkNaive-4        	       1	10157876788 ns/op	3296331856 B/op	19999860 allocs/op

delayqueue is about 20% faster, performs half as many allocations, and uses a whopping 10 times less RAM.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Queue

type Queue[T any] struct {
	// Items added to the queue are sent to this channel as they become due.
	// If the number of items that will be added to the queue exceeds the buffer
	// size, this channel must be continously read in order to prevent blocking.
	C <-chan T
	// contains filtered or unexported fields
}

func New

func New[T any](ctx context.Context, outBufferSize int) *Queue[T]

Create a new Queue that will run until the provided context is cancelled. The second argument specifies the buffer size of C, the outgoing channel.

func (*Queue[T]) Add

func (q *Queue[T]) Add(due time.Time, i T) error

Add an item i to the queue, to be emitted at or after the given due time. Add() will return an error if the queue's context is cancelled before the operation completes; the returned error will be ctx.Err(). Add() can be called safely from multiple goroutines.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL