queue

package
v0.1.2
Published: Apr 10, 2026 License: AGPL-3.0 Imports: 8 Imported by: 0

Documentation

Overview

Package queue implements the bounded in-memory message queue for BubbleFish Nexus. The queue sits between the WAL (durable store) and the destination adapter (SQLite, Postgres, etc.).

Invariants:

  • Enqueue is non-blocking. A full channel returns ErrLoadShed immediately. The WAL entry is still durable; the caller returns HTTP 429.
  • Drain / DrainWithContext wrap close(done) in sync.Once. Calling either method multiple times never panics and never closes the channel twice.
  • Worker goroutines log at WARN on transient destination errors and retry with exponential backoff. After maxDeliveryAttempts, the entry is marked PERMANENT_FAILURE in the WAL and dropped.
  • A nil logger at construction time panics immediately with a clear message.
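
The retry invariant above can be sketched roughly as follows. This is a minimal illustration, not the package's actual worker loop: deliverWithRetry, flakyWriter, errTransient, the doubling factor, and the base delay are all hypothetical names and values chosen for the example. The real maxDeliveryAttempts value and backoff schedule are not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient stands in for a temporary destination failure (hypothetical).
var errTransient = errors.New("destination temporarily unavailable")

// deliverWithRetry calls write until it succeeds or maxAttempts is reached.
// The delay doubles after every failed attempt, starting at base.
// It returns the number of attempts made and the last error (nil on success).
func deliverWithRetry(write func() error, maxAttempts int, base time.Duration) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}
	delay := base
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = write(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	// At this point a real worker would mark the entry PERMANENT_FAILURE
	// in the WAL and drop it.
	return maxAttempts, err
}

// flakyWriter returns a write function that fails the first `failures`
// calls and succeeds afterwards.
func flakyWriter(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errTransient
		}
		return nil
	}
}

func main() {
	attempts, err := deliverWithRetry(flakyWriter(2), 5, time.Millisecond)
	fmt.Println(attempts, err) // 3 <nil>

	attempts, err = deliverWithRetry(flakyWriter(10), 3, time.Millisecond)
	fmt.Println(attempts, err) // 3 destination temporarily unavailable
}
```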

Constants

This section is empty.

Variables

var ErrLoadShed = errors.New("queue: channel full, load shed")

ErrLoadShed is returned by Enqueue when the channel is full. The WAL entry remains durable; the HTTP handler translates this error into an HTTP 429 (queue_full) response with a Retry-After header.
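
A handler might map the error to a 429 along these lines. This is a sketch under assumptions: errLoadShed is a local stand-in for queue.ErrLoadShed, and writeHandler, probe, the /write path, the 202 success status, and the Retry-After value of 1 second are hypothetical and not taken from the Nexus handler.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// errLoadShed is a local stand-in for queue.ErrLoadShed.
var errLoadShed = errors.New("queue: channel full, load shed")

// writeHandler calls enqueue and maps a load-shed error to HTTP 429 with a
// Retry-After header. The status codes other than 429 are assumptions.
func writeHandler(enqueue func() error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		err := enqueue()
		switch {
		case err == nil:
			w.WriteHeader(http.StatusAccepted)
		case errors.Is(err, errLoadShed):
			w.Header().Set("Retry-After", "1")
			http.Error(w, "queue_full", http.StatusTooManyRequests)
		default:
			http.Error(w, "internal_error", http.StatusInternalServerError)
		}
	}
}

// probe runs the handler once against an in-memory recorder and reports
// the status code and the Retry-After header value.
func probe(enqueue func() error) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/write", nil)
	writeHandler(enqueue)(rec, req)
	return rec.Code, rec.Header().Get("Retry-After")
}

func main() {
	code, retry := probe(func() error { return errLoadShed })
	fmt.Println(code, retry) // 429 1
}
```

Using errors.Is rather than == keeps the check working if the error is later wrapped on its way up to the handler.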

Functions

This section is empty.

Types

type Config

type Config struct {
	// Size is the channel buffer length. Defaults to defaultSize (10 000).
	Size int

	// Workers is the number of goroutines draining the channel. Defaults to
	// defaultWorkers (1).
	Workers int

	// OnProcessed is an optional callback invoked after each entry is
	// successfully written to the destination. Used to increment the
	// bubblefish_queue_processing_rate metric. Must be safe to call
	// concurrently. If nil, no callback is made.
	OnProcessed func()

	// OnDelivered is an optional callback invoked after each entry is
	// successfully written to the destination. Receives the destination name
	// so the caller can invalidate caches for the affected destination.
	// Must be safe to call concurrently. If nil, no callback is made.
	OnDelivered func(destination string)
}

Config controls queue sizing and worker count.
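
As an illustration of the documented defaults and the concurrency requirement on the callbacks, the sketch below uses a local copy of the Config struct. withDefaults and runCallbacks are hypothetical helpers, the treatment of negative values is an assumption, and "sqlite" is only a placeholder destination name.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Config is a local copy of the documented struct, for illustration only.
type Config struct {
	Size        int
	Workers     int
	OnProcessed func()
	OnDelivered func(destination string)
}

const (
	defaultSize    = 10000
	defaultWorkers = 1
)

// withDefaults is a hypothetical helper: zero (and, by assumption, negative)
// values fall back to the documented defaults.
func withDefaults(cfg Config) Config {
	if cfg.Size <= 0 {
		cfg.Size = defaultSize
	}
	if cfg.Workers <= 0 {
		cfg.Workers = defaultWorkers
	}
	return cfg
}

// runCallbacks invokes both callbacks from several goroutines at once, the
// way concurrent workers would, and returns how often OnProcessed ran.
func runCallbacks(workers, perWorker int) int64 {
	var processed atomic.Int64 // atomic, so concurrent calls are safe
	cfg := withDefaults(Config{
		Workers:     workers,
		OnProcessed: func() { processed.Add(1) },
		OnDelivered: func(destination string) {
			// A real callback might invalidate a cache keyed by destination.
		},
	})

	var wg sync.WaitGroup
	for i := 0; i < cfg.Workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				cfg.OnProcessed()
				cfg.OnDelivered("sqlite")
			}
		}()
	}
	wg.Wait()
	return processed.Load()
}

func main() {
	cfg := withDefaults(Config{})
	fmt.Println(cfg.Size, cfg.Workers) // 10000 1
	fmt.Println(runCallbacks(4, 100))  // 400
}
```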

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a bounded, concurrency-safe message queue. All state is held in struct fields; there are no package-level variables.

func New

func New(cfg Config, logger *slog.Logger, dest destination.DestinationWriter, updater wal.WALUpdater) *Queue

New creates a Queue with the given configuration and starts the worker goroutines. Both logger and dest must be non-nil. Panics if logger is nil.

Callers MUST call Drain or DrainWithContext before process exit to allow in-flight entries to be written to the destination.

func (*Queue) Drain

func (q *Queue) Drain()

Drain signals all workers to stop and waits for them to finish. It is safe to call multiple times; only the first call closes the done channel. Workers finish in-flight retries and drain remaining buffered entries before exiting.
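
The idempotent shutdown described here can be sketched with sync.Once guarding close(done). The miniQueue type, its fields, and drainTwice are hypothetical, and the worker body is a simplified guess at the behavior (no retries, integer entries), not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniQueue is a hypothetical, stripped-down queue used only for this sketch.
type miniQueue struct {
	ch        chan int
	done      chan struct{}
	once      sync.Once
	wg        sync.WaitGroup
	delivered atomic.Int64
}

func newMiniQueue(size, workers int) *miniQueue {
	q := &miniQueue{ch: make(chan int, size), done: make(chan struct{})}
	for i := 0; i < workers; i++ {
		q.wg.Add(1)
		go q.worker()
	}
	return q
}

func (q *miniQueue) worker() {
	defer q.wg.Done()
	for {
		select {
		case <-q.ch:
			q.delivered.Add(1)
		case <-q.done:
			// Stop signal received: flush whatever is still buffered, then exit.
			for {
				select {
				case <-q.ch:
					q.delivered.Add(1)
				default:
					return
				}
			}
		}
	}
}

// Drain closes done exactly once and waits for all workers to exit.
// Repeated calls skip the close and simply wait again.
func (q *miniQueue) Drain() {
	q.once.Do(func() { close(q.done) })
	q.wg.Wait()
}

// drainTwice buffers n entries, calls Drain twice, and returns how many
// entries the workers delivered.
func drainTwice(n int) int64 {
	q := newMiniQueue(n, 2)
	for i := 0; i < n; i++ {
		q.ch <- i
	}
	q.Drain()
	q.Drain() // second call must not panic on a double close
	return q.delivered.Load()
}

func main() {
	fmt.Println(drainTwice(50)) // 50
}
```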

func (*Queue) DrainWithContext

func (q *Queue) DrainWithContext(ctx context.Context) bool

DrainWithContext signals all workers to stop and waits up to ctx's deadline for them to finish. Returns true if all workers finished within the deadline, false if ctx expired first.

If ctx expires, the workers are still running and will exit once their current item finishes (because done was already closed). Callers may call Drain() afterward for a blocking wait if needed.

Goroutine lifecycle note: the internal goroutine spawned here calls q.wg.Wait(), which will return once all workers exit. Because done is always closed before q.wg.Wait() is called, the goroutine is guaranteed to complete, so no goroutine leak occurs.
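
The deadline-bounded wait described above can be sketched as a goroutine that waits on the WaitGroup and a select that races it against ctx.Done(). The stopper type, drainWithinMillis, and the simulated worker delay are hypothetical; only the overall shape follows the description.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// stopper is a hypothetical stand-in for the queue's shutdown state.
type stopper struct {
	done chan struct{}
	once sync.Once
	wg   sync.WaitGroup
}

// newStopper starts workers that, once signalled, need `finish` to wrap up
// their current item.
func newStopper(workers int, finish time.Duration) *stopper {
	s := &stopper{done: make(chan struct{})}
	for i := 0; i < workers; i++ {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			<-s.done
			time.Sleep(finish) // simulates finishing the in-flight item
		}()
	}
	return s
}

// DrainWithContext closes done once, then waits for the workers or for ctx,
// whichever comes first. It returns true only if the workers finished.
func (s *stopper) DrainWithContext(ctx context.Context) bool {
	s.once.Do(func() { close(s.done) })
	finished := make(chan struct{})
	go func() {
		// done is already closed, so Wait returns once the workers exit
		// and this goroutine cannot leak.
		s.wg.Wait()
		close(finished)
	}()
	select {
	case <-finished:
		return true
	case <-ctx.Done():
		return false
	}
}

// Drain is the blocking variant.
func (s *stopper) Drain() {
	s.once.Do(func() { close(s.done) })
	s.wg.Wait()
}

// drainWithinMillis reports whether workers needing finishMs to stop are
// done before a timeoutMs deadline.
func drainWithinMillis(finishMs, timeoutMs int) bool {
	s := newStopper(2, time.Duration(finishMs)*time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	ok := s.DrainWithContext(ctx)
	s.Drain() // blocking follow-up wait, as the documentation suggests
	return ok
}

func main() {
	fmt.Println(drainWithinMillis(1, 2000)) // true
	fmt.Println(drainWithinMillis(300, 10)) // false
}
```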

func (*Queue) Enqueue

func (q *Queue) Enqueue(entry wal.Entry) error

Enqueue adds entry to the queue channel. Returns ErrLoadShed immediately if the channel is full; it never blocks. This is the non-blocking select pattern required by the spec (Tech Spec Section 5):

	select {
	case q.ch <- entry:
		return nil
	default:
		return ErrLoadShed
	}
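
To see the pattern in isolation, here is a small self-contained sketch. boundedQueue, newBoundedQueue, and errLoadShed are hypothetical stand-ins, and string entries replace wal.Entry to keep the example free of external imports.

```go
package main

import (
	"errors"
	"fmt"
)

// errLoadShed is a local stand-in for queue.ErrLoadShed.
var errLoadShed = errors.New("queue: channel full, load shed")

// boundedQueue is a hypothetical queue holding only the buffered channel.
type boundedQueue struct {
	ch chan string
}

func newBoundedQueue(size int) *boundedQueue {
	return &boundedQueue{ch: make(chan string, size)}
}

// Enqueue never blocks: if the buffer is full, the default case fires and
// the caller gets errLoadShed right away.
func (q *boundedQueue) Enqueue(entry string) error {
	select {
	case q.ch <- entry:
		return nil
	default:
		return errLoadShed
	}
}

// Len reports how many entries are currently buffered.
func (q *boundedQueue) Len() int {
	return len(q.ch)
}

func main() {
	q := newBoundedQueue(2)
	fmt.Println(q.Enqueue("a")) // <nil>
	fmt.Println(q.Enqueue("b")) // <nil>
	fmt.Println(q.Enqueue("c")) // queue: channel full, load shed
	fmt.Println(q.Len())        // 2
}
```

With no worker receiving from the channel, the third call finds the two-slot buffer full and returns at once instead of waiting for space.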

func (*Queue) Len

func (q *Queue) Len() int

Len returns the current number of entries buffered in the queue channel. The value is approximate: it may change between the call and any subsequent use. Safe to call concurrently.
