Documentation
¶
Overview ¶
Package queue provides a Redis-backed work queue for the governor.
Index ¶
- Variables
- type Client
- func (c *Client) Close() error
- func (c *Client) Enqueue(ctx context.Context, payload []byte) error
- func (c *Client) GetDispatchCounter(ctx context.Context, key string) (int64, error)
- func (c *Client) IncrDispatchCounter(ctx context.Context, key string) (int64, error)
- func (c *Client) Peek(ctx context.Context) ([]byte, error)
- func (c *Client) Ping(ctx context.Context) error
- type Queue
Constants ¶
This section is empty.
Variables ¶
var ErrEmptyQueue = errors.New("queue: empty queue")
ErrEmptyQueue is returned when Peek is called on an empty queue.
var ErrInvalidRedisURL = errors.New("queue: invalid redis URL")
ErrInvalidRedisURL is returned when the Redis URL is empty or cannot be parsed.
ErrRedisUnavailable is returned when the Redis server cannot be reached.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps a *redis.Client and implements the Queue interface.
func NewClient ¶
NewClient parses url and returns a connected Client. Returns ErrInvalidRedisURL if url is empty or malformed.
func (*Client) Enqueue ¶
Enqueue appends payload to the tail of both queue keys using RPUSH.
DT-5 dual-write: payload is written to primaryQueueKey ("donmai:governor:queue") AND legacyQueueKey ("agentfactory:governor:queue") so that platform workers still reading the old key continue to receive work during the transition window. Once all consumers have migrated, the legacy write will be removed.
func (*Client) GetDispatchCounter ¶
GetDispatchCounter returns the current integer value stored at key. Returns 0 if the key does not exist.
func (*Client) IncrDispatchCounter ¶
IncrDispatchCounter atomically increments the named counter key and returns the resulting value.
func (*Client) Peek ¶
Peek returns the oldest payload (head of the list) without removing it. Returns ErrEmptyQueue when both lists are empty.
DT-5 dual-read: the new primaryQueueKey ("donmai:governor:queue") is checked first; if it is empty, the legacyQueueKey ("agentfactory:governor:queue") is checked as a fallback so that consumers see items regardless of which key the producer wrote to. The fallback is removed once all producers have migrated.
type Queue ¶
type Queue interface {
// Ping verifies connectivity to the backing store.
Ping(ctx context.Context) error
// Enqueue appends payload to the tail of the queue.
Enqueue(ctx context.Context, payload []byte) error
// Peek returns the oldest payload without removing it.
// Returns ErrEmptyQueue when the queue is empty.
Peek(ctx context.Context) ([]byte, error)
// IncrDispatchCounter atomically increments the named counter and
// returns the new value.
IncrDispatchCounter(ctx context.Context, key string) (int64, error)
// GetDispatchCounter returns the current value of the named counter,
// or 0 if it has never been set.
GetDispatchCounter(ctx context.Context, key string) (int64, error)
// Close releases resources held by the client.
Close() error
}
Queue defines the interface for the governor's work queue operations. Implementations must be safe for concurrent use.