queue

package
v0.37.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package queue provides Redis-backed admin clients for the AgentFactory work queue and merge queue. It mirrors the data structures used by the TypeScript packages/cli/src/lib/queue-admin-runner.ts and merge-queue-runner.ts so the Go CLI can inspect and manipulate the same Redis state.

Index

Constants

This section is empty.

Variables

View Source
var ErrItemNotFound = errors.New("item not found in queue")

ErrItemNotFound is returned when a queue item cannot be found by the given selector.

View Source
var ErrMergeEntryNotFound = errors.New("merge queue entry not found")

ErrMergeEntryNotFound is returned when a merge queue entry cannot be located.

View Source
var ErrRedisURLRequired = errors.New("REDIS_URL environment variable is required")

ErrRedisURLRequired is returned when REDIS_URL is not set.

Functions

This section is empty.

Types

type AdminClient

type AdminClient struct {
	// contains filtered or unexported fields
}

AdminClient wraps a Redis connection and provides admin operations for both the work queue and the merge queue.

func NewAdminClient

func NewAdminClient(redisURL string) (*AdminClient, error)

NewAdminClient parses redisURL and returns a connected AdminClient.

func (*AdminClient) ClearAll

func (c *AdminClient) ClearAll(ctx context.Context) (BulkClearResult, error)

ClearAll wipes queue, sessions, claims, and workers — the nuclear-reset option.

func (*AdminClient) ClearClaims

func (c *AdminClient) ClearClaims(ctx context.Context) (int, error)

ClearClaims deletes all work:claim:* keys from Redis. Returns the number of claim keys deleted.

func (*AdminClient) ClearWorkQueue

func (c *AdminClient) ClearWorkQueue(ctx context.Context) (int, error)

ClearWorkQueue deletes the work queue sorted set and work items hash. Returns the number of logical queue items cleared.

func (*AdminClient) Close

func (c *AdminClient) Close() error

Close releases the underlying connection pool.

func (*AdminClient) DequeueEntry

func (c *AdminClient) DequeueEntry(ctx context.Context, repoID string, prNumber int) error

DequeueEntry removes a PR from the merge queue entirely (skip/dequeue).

func (*AdminClient) DropSession

func (c *AdminClient) DropSession(ctx context.Context, partialID string) (int, error)

DropSession removes a session (and its queue/claim entries) by partial ID match. Returns the number of entries removed.

func (*AdminClient) ForceRetry

func (c *AdminClient) ForceRetry(ctx context.Context, repoID string, prNumber int) error

ForceRetry moves a failed/blocked PR back to the queued set.

func (*AdminClient) GetMergeQueueStatus

func (c *AdminClient) GetMergeQueueStatus(ctx context.Context, repoID string) (MergeQueueStatus, error)

GetMergeQueueStatus returns a summary overview of the merge queue for repoID.

func (*AdminClient) ListMergeQueue

func (c *AdminClient) ListMergeQueue(ctx context.Context, repoID string) (MergeQueueSnapshot, error)

ListMergeQueue returns all queued, failed, and blocked merge entries for repoID.

func (*AdminClient) ListSessions

func (c *AdminClient) ListSessions(ctx context.Context) ([]SessionEntry, error)

ListSessions returns all agent sessions.

func (*AdminClient) ListWorkItems

func (c *AdminClient) ListWorkItems(ctx context.Context) ([]WorkItem, error)

ListWorkItems returns all items currently in the work queue hash.

func (*AdminClient) ListWorkers

func (c *AdminClient) ListWorkers(ctx context.Context) ([]WorkerEntry, error)

ListWorkers returns all registered workers.

func (*AdminClient) PauseMergeQueue

func (c *AdminClient) PauseMergeQueue(ctx context.Context, repoID string) error

PauseMergeQueue sets the paused flag for repoID's merge queue.

func (*AdminClient) PeekWorkItem

func (c *AdminClient) PeekWorkItem(ctx context.Context) (*WorkItem, error)

PeekWorkItem returns the first entry in the work queue without removing it.

func (*AdminClient) Ping

func (c *AdminClient) Ping(ctx context.Context) error

Ping checks connectivity.

func (*AdminClient) RequeueSession

func (c *AdminClient) RequeueSession(ctx context.Context, partialID string) (int, error)

RequeueSession resets a "running" or "claimed" session back to "pending".

func (*AdminClient) ResetWorkState

func (c *AdminClient) ResetWorkState(ctx context.Context) (BulkClearResult, error)

ResetWorkState clears claims, clears the queue, and resets stuck (running/claimed) sessions back to pending. Mirrors `af-queue-admin reset`.

func (*AdminClient) ResumeMergeQueue

func (c *AdminClient) ResumeMergeQueue(ctx context.Context, repoID string) error

ResumeMergeQueue removes the paused flag for repoID's merge queue.

func (*AdminClient) SetMergeQueuePriority

func (c *AdminClient) SetMergeQueuePriority(ctx context.Context, repoID string, prNumber int, priority float64) error

SetMergeQueuePriority updates the priority (ZSET score) for a PR. Priority 1 = highest (processed first). Uses priority as the score so lower priority values float to the top.

type BulkClearResult

type BulkClearResult struct {
	QueueItemsCleared int `json:"queueItemsCleared"`
	SessionsCleared   int `json:"sessionsCleared"`
	ClaimsCleared     int `json:"claimsCleared"`
	WorkersCleared    int `json:"workersCleared"`
	SessionsReset     int `json:"sessionsReset,omitempty"`
}

BulkClearResult summarises the result of ClearAll or Reset.

type MergeEntry

type MergeEntry struct {
	RepoID        string  `json:"repoId"`
	PRNumber      int     `json:"prNumber"`
	SourceBranch  string  `json:"sourceBranch"`
	Priority      float64 `json:"priority"`
	EnqueuedAt    int64   `json:"enqueuedAt"`
	Status        string  `json:"status"` // queued|processing|failed|blocked
	FailureReason string  `json:"failureReason,omitempty"`
	BlockReason   string  `json:"blockReason,omitempty"`
}

MergeEntry represents one PR in the merge queue.

type MergeQueueSnapshot

type MergeQueueSnapshot struct {
	RepoID  string       `json:"repoId"`
	Depth   int          `json:"depth"`
	Entries []MergeEntry `json:"entries"`
}

MergeQueueSnapshot is the JSON output for `af admin merge-queue list`.

type MergeQueueStatus

type MergeQueueStatus struct {
	RepoID       string      `json:"repoId"`
	Depth        int         `json:"depth"`
	Processing   *MergeEntry `json:"processing"`
	FailedCount  int         `json:"failedCount"`
	BlockedCount int         `json:"blockedCount"`
	Paused       bool        `json:"paused"`
}

MergeQueueStatus is the shape returned by GetMergeQueueStatus.

type SessionEntry

type SessionEntry struct {
	SessionID       string `json:"sessionId"`
	Status          string `json:"status"`
	IssueIdentifier string `json:"issueIdentifier,omitempty"`
	IssueID         string `json:"issueId,omitempty"`
	LinearSessionID string `json:"linearSessionId,omitempty"`
	WorkerID        string `json:"workerId,omitempty"`
	WorkType        string `json:"workType,omitempty"`
	UpdatedAt       int64  `json:"updatedAt,omitempty"`
}

SessionEntry represents one agent session stored in Redis.

type Snapshot

type Snapshot struct {
	Items    []WorkItem     `json:"items"`
	Sessions []SessionEntry `json:"sessions"`
	Workers  []WorkerEntry  `json:"workers"`
}

Snapshot is the JSON output for `af admin queue list`.

type WorkItem

type WorkItem struct {
	SessionID       string  `json:"sessionId"`
	IssueIdentifier string  `json:"issueIdentifier,omitempty"`
	WorkType        string  `json:"workType,omitempty"`
	Priority        float64 `json:"priority,omitempty"`
	QueuedAt        int64   `json:"queuedAt,omitempty"`
	ProviderSession string  `json:"providerSessionId,omitempty"`
	Prompt          string  `json:"prompt,omitempty"`
}

WorkItem represents a single entry in the work queue hash.

type WorkerEntry

type WorkerEntry struct {
	ID            string `json:"id"`
	Hostname      string `json:"hostname,omitempty"`
	Status        string `json:"status"`
	Capacity      int    `json:"capacity,omitempty"`
	ActiveCount   int    `json:"activeCount,omitempty"`
	LastHeartbeat int64  `json:"lastHeartbeat,omitempty"`
}

WorkerEntry represents a registered worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL