Documentation
Overview ¶
Package queue implements the bounded in-memory message queue for BubbleFish Nexus. The queue sits between the WAL (durable store) and the destination adapter (SQLite, Postgres, etc.).
Invariants:
- Enqueue is non-blocking. When the channel is full, it returns ErrLoadShed immediately. The WAL entry is still durable, and the HTTP handler responds with 429.
- Drain / DrainWithContext wrap close(done) in sync.Once. Calling either method multiple times never panics and never closes the channel twice.
- Worker goroutines log at WARN on transient destination errors and retry with exponential backoff. After maxDeliveryAttempts, the entry is marked PERMANENT_FAILURE in the WAL and dropped.
- A nil logger at construction time panics immediately with a clear message.
Constants ¶
This section is empty.
Variables ¶
var ErrLoadShed = errors.New("queue: channel full, load shed")
ErrLoadShed is returned by Enqueue when the channel is full. The WAL entry remains durable; the HTTP handler translates this to a 429 queue_full with Retry-After.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Size is the channel buffer length. Defaults to defaultSize (10 000).
Size int
// Workers is the number of goroutines draining the channel. Defaults to
// defaultWorkers (1).
Workers int
// OnProcessed is an optional callback invoked after each entry is
// successfully written to the destination. Used to increment the
// bubblefish_queue_processing_rate metric. Must be safe to call
// concurrently. If nil, no callback is made.
OnProcessed func()
// OnDelivered is an optional callback invoked after each entry is
// successfully written to the destination. Receives the destination name
// so the caller can invalidate caches for the affected destination.
// Must be safe to call concurrently. If nil, no callback is made.
OnDelivered func(destination string)
}
Config controls queue sizing and worker count.
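The following sketch shows one way the defaults and callbacks could behave. Config here is a local stand-in with the same exported fields as the documented type, and withDefaults is a hypothetical helper: the assumption that non-positive values select the default is not confirmed by the documentation. The atomic counter illustrates the requirement that callbacks be safe to call concurrently.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Default values as stated in the Config field comments.
const (
	defaultSize    = 10000
	defaultWorkers = 1
)

// Config is a local stand-in for the documented type.
type Config struct {
	Size        int
	Workers     int
	OnProcessed func()
	OnDelivered func(destination string)
}

// withDefaults assumes that non-positive values mean "use the default".
func withDefaults(cfg Config) Config {
	if cfg.Size <= 0 {
		cfg.Size = defaultSize
	}
	if cfg.Workers <= 0 {
		cfg.Workers = defaultWorkers
	}
	return cfg
}

func main() {
	var processed int64
	cfg := withDefaults(Config{
		Workers: 4,
		// An atomic increment is safe when several workers call it at once.
		OnProcessed: func() { atomic.AddInt64(&processed, 1) },
		OnDelivered: func(destination string) {
			fmt.Println("invalidate cache for", destination)
		},
	})
	fmt.Println(cfg.Size, cfg.Workers)

	// Simulate what a worker might do after one successful write.
	if cfg.OnProcessed != nil {
		cfg.OnProcessed()
	}
	if cfg.OnDelivered != nil {
		cfg.OnDelivered("sqlite")
	}
	fmt.Println(atomic.LoadInt64(&processed))
}
```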
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is a bounded, concurrency-safe message queue. All state is held in struct fields; there are no package-level variables.
func New ¶
func New(cfg Config, logger *slog.Logger, dest destination.DestinationWriter, updater wal.WALUpdater) *Queue
New creates a Queue with the given configuration and starts the worker goroutines. Both logger and dest must be non-nil. Panics if logger is nil.
Callers MUST call Drain or DrainWithContext before process exit to allow in-flight entries to be written to the destination.
func (*Queue) Drain ¶
func (q *Queue) Drain()
Drain signals all workers to stop and waits for them to finish. It is safe to call multiple times; only the first call closes the done channel. Workers finish in-flight retries and drain remaining buffered entries before exiting.
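The behavior described above can be sketched with sync.Once guarding close(done) and a worker loop that flushes buffered entries after the stop signal. The type miniQueue and its methods are hypothetical stand-ins written for this example; the real worker loop may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
)

// miniQueue is a hypothetical stand-in, not the package's Queue.
type miniQueue struct {
	ch        chan int
	done      chan struct{}
	once      sync.Once
	wg        sync.WaitGroup
	mu        sync.Mutex
	delivered []int
}

func newMiniQueue(size, workers int) *miniQueue {
	q := &miniQueue{ch: make(chan int, size), done: make(chan struct{})}
	q.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go q.worker()
	}
	return q
}

// deliver records an entry as written; the mutex makes it worker-safe.
func (q *miniQueue) deliver(v int) {
	q.mu.Lock()
	q.delivered = append(q.delivered, v)
	q.mu.Unlock()
}

func (q *miniQueue) worker() {
	defer q.wg.Done()
	for {
		select {
		case v := <-q.ch:
			q.deliver(v)
		case <-q.done:
			// Stop signal received: flush whatever is still buffered, then exit.
			for {
				select {
				case v := <-q.ch:
					q.deliver(v)
				default:
					return
				}
			}
		}
	}
}

// drain closes done exactly once, then waits for all workers to exit.
func (q *miniQueue) drain() {
	q.once.Do(func() { close(q.done) })
	q.wg.Wait()
}

func main() {
	q := newMiniQueue(8, 2)
	for i := 0; i < 5; i++ {
		q.ch <- i
	}
	q.drain()
	q.drain() // second call is a no-op and does not panic
	fmt.Println(len(q.delivered)) // prints 5
}
```

Without the sync.Once guard, the second drain call would attempt to close an already closed channel, which panics in Go.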
func (*Queue) DrainWithContext ¶
func (q *Queue) DrainWithContext(ctx context.Context) bool
DrainWithContext signals all workers to stop and waits up to ctx's deadline for them to finish. Returns true if all workers finished within the deadline, false if ctx expired first.
If ctx expires, the workers are still running and will exit once their current item finishes (because done was already closed). Callers may call Drain() afterward for a blocking wait if needed.
Goroutine lifecycle note: the internal goroutine spawned here calls q.wg.Wait(), which will return once all workers exit. Because done is always closed before q.wg.Wait() is called, the goroutine is guaranteed to complete, so no goroutine leak occurs.