queue

package
v0.36.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 7, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package queue provides a Redis-backed work queue for the governor.


Constants

This section is empty.

Variables

var ErrEmptyQueue = errors.New("queue: empty queue")

ErrEmptyQueue is returned when Peek is called on an empty queue.

var ErrInvalidRedisURL = errors.New("queue: invalid redis URL")

ErrInvalidRedisURL is returned when the Redis URL is empty or cannot be parsed.

var ErrRedisUnavailable = errors.New("queue: redis unavailable")

ErrRedisUnavailable is returned when the Redis server cannot be reached.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps a *redis.Client and implements the Queue interface.

func NewClient

func NewClient(url string) (*Client, error)

NewClient parses url and returns a connected Client. Returns ErrInvalidRedisURL if url is empty or malformed.

func (*Client) Close

func (c *Client) Close() error

Close releases the underlying Redis connection pool.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, payload []byte) error

Enqueue appends payload to the tail of both queue keys using RPUSH.

DT-5 dual-write: payload is written to primaryQueueKey ("donmai:governor:queue") AND legacyQueueKey ("agentfactory:governor:queue") so that platform workers still reading the old key continue to receive work during the transition window. Once all consumers have migrated, the legacy write will be removed.
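The dual-write described above can be sketched with an in-memory stand-in for Redis lists. The two key names come from the documentation; listStore, rpush, and enqueue are hypothetical names used only for illustration. The documentation does not say whether the real client issues the two RPUSH calls atomically, so this sketch makes no claim about that.

```go
package main

import "fmt"

// Key names as given in the Enqueue documentation.
const (
	primaryQueueKey = "donmai:governor:queue"
	legacyQueueKey  = "agentfactory:governor:queue"
)

// listStore is a hypothetical in-memory stand-in for Redis lists.
type listStore map[string][][]byte

// rpush mimics Redis RPUSH: it appends value to the tail of the list at key.
func (s listStore) rpush(key string, value []byte) {
	s[key] = append(s[key], value)
}

// enqueue performs the dual write: the same payload goes to both keys,
// so readers of either key see the item.
func enqueue(s listStore, payload []byte) {
	s.rpush(primaryQueueKey, payload)
	s.rpush(legacyQueueKey, payload)
}

func main() {
	s := listStore{}
	enqueue(s, []byte("job-1"))
	enqueue(s, []byte("job-2"))

	fmt.Println(len(s[primaryQueueKey]), len(s[legacyQueueKey])) // prints "2 2"
	fmt.Printf("%s\n", s[legacyQueueKey][0])                     // prints "job-1"
}
```

Both lists hold the payloads in the same order, which is what lets a worker on the legacy key keep processing during the transition window.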

func (*Client) GetDispatchCounter

func (c *Client) GetDispatchCounter(ctx context.Context, key string) (int64, error)

GetDispatchCounter returns the current integer value stored at key. Returns 0 if the key does not exist.

func (*Client) IncrDispatchCounter

func (c *Client) IncrDispatchCounter(ctx context.Context, key string) (int64, error)

IncrDispatchCounter atomically increments the named counter key and returns the resulting value.
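The counter semantics documented for GetDispatchCounter and IncrDispatchCounter (a missing key reads as 0, and an increment returns the new value) can be illustrated with a mutex-guarded map. This is a sketch of the observable behavior only; the counters type and the key "dispatch:demo" are hypothetical, and the real client presumably delegates to Redis rather than holding state in process.

```go
package main

import (
	"fmt"
	"sync"
)

// counters is a hypothetical in-memory model of the dispatch counters.
type counters struct {
	mu sync.Mutex
	m  map[string]int64
}

func newCounters() *counters {
	return &counters{m: make(map[string]int64)}
}

// incr increments the counter at key and returns the resulting value.
// The mutex makes the read-modify-write a single atomic step.
func (c *counters) incr(key string) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[key]++
	return c.m[key]
}

// get returns the current value, or 0 if the key was never set.
func (c *counters) get(key string) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.m[key]
}

func main() {
	c := newCounters()

	// 100 concurrent increments must not lose any update.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.incr("dispatch:demo")
		}()
	}
	wg.Wait()

	fmt.Println(c.get("dispatch:demo")) // prints "100"
	fmt.Println(c.get("never-set"))     // prints "0"
}
```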

func (*Client) Peek

func (c *Client) Peek(ctx context.Context) ([]byte, error)

Peek returns the oldest payload (head of the list) without removing it. Returns ErrEmptyQueue when both lists are empty.

DT-5 dual-read: primaryQueueKey ("donmai:governor:queue") is checked first; if it is empty, legacyQueueKey ("agentfactory:governor:queue") is checked as a fallback, so consumers see items regardless of which key the producer wrote to. The fallback will be removed once all producers have migrated.
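The read order described above (primary key first, legacy key as fallback, ErrEmptyQueue only when both are empty) might look like the following. As before, listStore, head, and peek are hypothetical in-memory stand-ins, and ErrEmptyQueue is redeclared locally so the sketch compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	primaryQueueKey = "donmai:governor:queue"
	legacyQueueKey  = "agentfactory:governor:queue"
)

// Local copy of the package's sentinel error, for a self-contained example.
var ErrEmptyQueue = errors.New("queue: empty queue")

// listStore is a hypothetical in-memory stand-in for Redis lists.
type listStore map[string][][]byte

// head returns the first element of the list at key without removing it,
// and reports whether the list had any elements.
func (s listStore) head(key string) ([]byte, bool) {
	if len(s[key]) == 0 {
		return nil, false
	}
	return s[key][0], true
}

// peek checks the primary key first and falls back to the legacy key.
func peek(s listStore) ([]byte, error) {
	if v, ok := s.head(primaryQueueKey); ok {
		return v, nil
	}
	if v, ok := s.head(legacyQueueKey); ok {
		return v, nil
	}
	return nil, ErrEmptyQueue
}

func main() {
	// Only the legacy key has data: the fallback is used.
	s := listStore{legacyQueueKey: {[]byte("old-job")}}
	v, _ := peek(s)
	fmt.Printf("%s\n", v) // prints "old-job"

	// Once the primary key has data, it takes precedence.
	s[primaryQueueKey] = [][]byte{[]byte("new-job")}
	v, _ = peek(s)
	fmt.Printf("%s\n", v) // prints "new-job"

	_, err := peek(listStore{})
	fmt.Println(err) // prints "queue: empty queue"
}
```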

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping verifies connectivity to Redis. Returns ErrRedisUnavailable (wrapping the underlying error) on failure.
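Because Ping returns ErrRedisUnavailable with the underlying error attached, callers should match it with errors.Is instead of ==. The sketch below shows one plausible shape of such a wrapped error; wrapUnavailable, isUnavailable, and errDial are hypothetical, and the exact message format the package produces is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the package's sentinel error, for a self-contained example.
var ErrRedisUnavailable = errors.New("queue: redis unavailable")

// errDial is a hypothetical low-level failure standing in for a network error.
var errDial = errors.New("dial tcp 127.0.0.1:6379: connection refused")

// wrapUnavailable attaches a cause to the sentinel. With %w the sentinel
// stays discoverable through errors.Is; the cause is included in the message.
func wrapUnavailable(cause error) error {
	return fmt.Errorf("%w: %v", ErrRedisUnavailable, cause)
}

// isUnavailable is the caller-side check.
func isUnavailable(err error) bool {
	return errors.Is(err, ErrRedisUnavailable)
}

func main() {
	err := wrapUnavailable(errDial)

	fmt.Println(err)
	// prints "queue: redis unavailable: dial tcp 127.0.0.1:6379: connection refused"

	fmt.Println(err == ErrRedisUnavailable) // prints "false": the error is wrapped
	fmt.Println(isUnavailable(err))         // prints "true"
}
```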

type Queue

type Queue interface {
	// Ping verifies connectivity to the backing store.
	Ping(ctx context.Context) error

	// Enqueue appends payload to the tail of the queue.
	Enqueue(ctx context.Context, payload []byte) error

	// Peek returns the oldest payload without removing it.
	// Returns ErrEmptyQueue when the queue is empty.
	Peek(ctx context.Context) ([]byte, error)

	// IncrDispatchCounter atomically increments the named counter and
	// returns the new value.
	IncrDispatchCounter(ctx context.Context, key string) (int64, error)

	// GetDispatchCounter returns the current value of the named counter,
	// or 0 if it has never been set.
	GetDispatchCounter(ctx context.Context, key string) (int64, error)

	// Close releases resources held by the client.
	Close() error
}

Queue defines the interface for the governor's work queue operations. Implementations must be safe for concurrent use.
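Since callers can depend on Queue instead of *Client, an in-memory implementation is a convenient test double. The sketch below copies the interface from the documentation and adds a hypothetical memQueue guarded by a mutex to meet the concurrency requirement; memQueue, peekAndCount, demo, and the key "dispatch:demo" are illustrative names, not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Local copy of the package's sentinel error, for a self-contained example.
var ErrEmptyQueue = errors.New("queue: empty queue")

// Queue mirrors the interface documented above.
type Queue interface {
	Ping(ctx context.Context) error
	Enqueue(ctx context.Context, payload []byte) error
	Peek(ctx context.Context) ([]byte, error)
	IncrDispatchCounter(ctx context.Context, key string) (int64, error)
	GetDispatchCounter(ctx context.Context, key string) (int64, error)
	Close() error
}

// memQueue is a hypothetical in-memory Queue; mu guards all fields.
type memQueue struct {
	mu       sync.Mutex
	items    [][]byte
	counters map[string]int64
}

// Compile-time check that *memQueue satisfies Queue.
var _ Queue = (*memQueue)(nil)

func newMemQueue() *memQueue {
	return &memQueue{counters: make(map[string]int64)}
}

func (q *memQueue) Ping(context.Context) error { return nil }
func (q *memQueue) Close() error               { return nil }

func (q *memQueue) Enqueue(_ context.Context, payload []byte) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	// Copy the payload so later caller mutations do not affect the queue.
	q.items = append(q.items, append([]byte(nil), payload...))
	return nil
}

func (q *memQueue) Peek(context.Context) ([]byte, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil, ErrEmptyQueue
	}
	return q.items[0], nil
}

func (q *memQueue) IncrDispatchCounter(_ context.Context, key string) (int64, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.counters[key]++
	return q.counters[key], nil
}

func (q *memQueue) GetDispatchCounter(_ context.Context, key string) (int64, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.counters[key], nil
}

// peekAndCount is a hypothetical caller written only against Queue.
func peekAndCount(ctx context.Context, q Queue, counterKey string) (string, int64, error) {
	payload, err := q.Peek(ctx)
	if err != nil {
		return "", 0, err
	}
	n, err := q.IncrDispatchCounter(ctx, counterKey)
	if err != nil {
		return "", 0, err
	}
	return string(payload), n, nil
}

// demo enqueues one item and runs the caller against the in-memory queue.
func demo() (string, int64, error) {
	ctx := context.Background()
	q := newMemQueue()
	defer q.Close()
	if err := q.Enqueue(ctx, []byte("job-1")); err != nil {
		return "", 0, err
	}
	return peekAndCount(ctx, q, "dispatch:demo")
}

func main() {
	fmt.Println(demo()) // prints "job-1 1 <nil>"
}
```

The same peekAndCount function would accept a *Client unchanged, since the documentation states that Client implements Queue.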
