Documentation
¶
Overview ¶
Package queue provides Redis-backed admin clients for the AgentFactory work queue and merge queue. It mirrors the data structures used by the TypeScript packages/cli/src/lib/queue-admin-runner.ts and merge-queue-runner.ts so the Go CLI can inspect and manipulate the same Redis state.
Index ¶
- Variables
- type AdminClient
- func (c *AdminClient) ClearAll(ctx context.Context) (BulkClearResult, error)
- func (c *AdminClient) ClearClaims(ctx context.Context) (int, error)
- func (c *AdminClient) ClearWorkQueue(ctx context.Context) (int, error)
- func (c *AdminClient) Close() error
- func (c *AdminClient) DequeueEntry(ctx context.Context, repoID string, prNumber int) error
- func (c *AdminClient) DropSession(ctx context.Context, partialID string) (int, error)
- func (c *AdminClient) ForceRetry(ctx context.Context, repoID string, prNumber int) error
- func (c *AdminClient) GetMergeQueueStatus(ctx context.Context, repoID string) (MergeQueueStatus, error)
- func (c *AdminClient) ListMergeQueue(ctx context.Context, repoID string) (MergeQueueSnapshot, error)
- func (c *AdminClient) ListSessions(ctx context.Context) ([]SessionEntry, error)
- func (c *AdminClient) ListWorkItems(ctx context.Context) ([]WorkItem, error)
- func (c *AdminClient) ListWorkers(ctx context.Context) ([]WorkerEntry, error)
- func (c *AdminClient) PauseMergeQueue(ctx context.Context, repoID string) error
- func (c *AdminClient) PeekWorkItem(ctx context.Context) (*WorkItem, error)
- func (c *AdminClient) Ping(ctx context.Context) error
- func (c *AdminClient) RequeueSession(ctx context.Context, partialID string) (int, error)
- func (c *AdminClient) ResetWorkState(ctx context.Context) (BulkClearResult, error)
- func (c *AdminClient) ResumeMergeQueue(ctx context.Context, repoID string) error
- func (c *AdminClient) SetMergeQueuePriority(ctx context.Context, repoID string, prNumber int, priority float64) error
- type BulkClearResult
- type MergeEntry
- type MergeQueueSnapshot
- type MergeQueueStatus
- type SessionEntry
- type Snapshot
- type WorkItem
- type WorkerEntry
Constants ¶
This section is empty.
Variables ¶
var ErrItemNotFound = errors.New("item not found in queue")
ErrItemNotFound is returned when a queue item cannot be found by the given selector.
var ErrMergeEntryNotFound = errors.New("merge queue entry not found")
ErrMergeEntryNotFound is returned when a merge queue entry cannot be located.
var ErrRedisURLRequired = errors.New("REDIS_URL environment variable is required")
ErrRedisURLRequired is returned when REDIS_URL is not set.
Functions ¶
This section is empty.
Types ¶
type AdminClient ¶
type AdminClient struct {
// contains filtered or unexported fields
}
AdminClient wraps a Redis connection and provides admin operations for both the work queue and the merge queue.
func NewAdminClient ¶
func NewAdminClient(redisURL string) (*AdminClient, error)
NewAdminClient parses redisURL and returns a connected AdminClient.
func (*AdminClient) ClearAll ¶
func (c *AdminClient) ClearAll(ctx context.Context) (BulkClearResult, error)
ClearAll wipes queue, sessions, claims, and workers — the nuclear-reset option.
func (*AdminClient) ClearClaims ¶
func (c *AdminClient) ClearClaims(ctx context.Context) (int, error)
ClearClaims deletes all work:claim:* keys from Redis. Returns the number of claim keys deleted.
func (*AdminClient) ClearWorkQueue ¶
func (c *AdminClient) ClearWorkQueue(ctx context.Context) (int, error)
ClearWorkQueue deletes the work queue sorted set and work items hash. Returns the number of logical queue items cleared.
func (*AdminClient) Close ¶
func (c *AdminClient) Close() error
Close releases the underlying connection pool.
func (*AdminClient) DequeueEntry ¶
DequeueEntry removes a PR from the merge queue entirely (skip/dequeue).
func (*AdminClient) DropSession ¶
DropSession removes a session (and its queue/claim entries) by partial ID match. Returns the number of entries removed.
func (*AdminClient) ForceRetry ¶
ForceRetry moves a failed/blocked PR back to the queued set.
func (*AdminClient) GetMergeQueueStatus ¶
func (c *AdminClient) GetMergeQueueStatus(ctx context.Context, repoID string) (MergeQueueStatus, error)
GetMergeQueueStatus returns a summary overview of the merge queue for repoID.
func (*AdminClient) ListMergeQueue ¶
func (c *AdminClient) ListMergeQueue(ctx context.Context, repoID string) (MergeQueueSnapshot, error)
ListMergeQueue returns all queued, failed, and blocked merge entries for repoID.
func (*AdminClient) ListSessions ¶
func (c *AdminClient) ListSessions(ctx context.Context) ([]SessionEntry, error)
ListSessions returns all agent sessions.
func (*AdminClient) ListWorkItems ¶
func (c *AdminClient) ListWorkItems(ctx context.Context) ([]WorkItem, error)
ListWorkItems returns all items currently in the work queue hash.
func (*AdminClient) ListWorkers ¶
func (c *AdminClient) ListWorkers(ctx context.Context) ([]WorkerEntry, error)
ListWorkers returns all registered workers.
func (*AdminClient) PauseMergeQueue ¶
func (c *AdminClient) PauseMergeQueue(ctx context.Context, repoID string) error
PauseMergeQueue sets the paused flag for repoID's merge queue.
func (*AdminClient) PeekWorkItem ¶
func (c *AdminClient) PeekWorkItem(ctx context.Context) (*WorkItem, error)
PeekWorkItem returns the first entry in the work queue without removing it.
func (*AdminClient) Ping ¶
func (c *AdminClient) Ping(ctx context.Context) error
Ping checks connectivity.
func (*AdminClient) RequeueSession ¶
RequeueSession resets a "running" or "claimed" session back to "pending".
func (*AdminClient) ResetWorkState ¶
func (c *AdminClient) ResetWorkState(ctx context.Context) (BulkClearResult, error)
ResetWorkState clears claims, clears the queue, and resets stuck (running/claimed) sessions back to pending. Mirrors `af-queue-admin reset`.
func (*AdminClient) ResumeMergeQueue ¶
func (c *AdminClient) ResumeMergeQueue(ctx context.Context, repoID string) error
ResumeMergeQueue removes the paused flag for repoID's merge queue.
func (*AdminClient) SetMergeQueuePriority ¶
func (c *AdminClient) SetMergeQueuePriority(ctx context.Context, repoID string, prNumber int, priority float64) error
SetMergeQueuePriority updates the priority (ZSET score) for a PR. Priority 1 = highest (processed first). Uses priority as the score so lower priority values float to the top.
type BulkClearResult ¶
type BulkClearResult struct {
QueueItemsCleared int `json:"queueItemsCleared"`
SessionsCleared int `json:"sessionsCleared"`
ClaimsCleared int `json:"claimsCleared"`
WorkersCleared int `json:"workersCleared"`
SessionsReset int `json:"sessionsReset,omitempty"`
}
BulkClearResult summarises the result of ClearAll or Reset.
type MergeEntry ¶
type MergeEntry struct {
RepoID string `json:"repoId"`
PRNumber int `json:"prNumber"`
SourceBranch string `json:"sourceBranch"`
Priority float64 `json:"priority"`
EnqueuedAt int64 `json:"enqueuedAt"`
Status string `json:"status"` // queued|processing|failed|blocked
FailureReason string `json:"failureReason,omitempty"`
BlockReason string `json:"blockReason,omitempty"`
}
MergeEntry represents one PR in the merge queue.
type MergeQueueSnapshot ¶
type MergeQueueSnapshot struct {
RepoID string `json:"repoId"`
Depth int `json:"depth"`
Entries []MergeEntry `json:"entries"`
}
MergeQueueSnapshot is the JSON output for `af admin merge-queue list`.
type MergeQueueStatus ¶
type MergeQueueStatus struct {
RepoID string `json:"repoId"`
Depth int `json:"depth"`
Processing *MergeEntry `json:"processing"`
FailedCount int `json:"failedCount"`
BlockedCount int `json:"blockedCount"`
Paused bool `json:"paused"`
}
MergeQueueStatus is the shape returned by GetMergeQueueStatus.
type SessionEntry ¶
type SessionEntry struct {
SessionID string `json:"sessionId"`
Status string `json:"status"`
IssueIdentifier string `json:"issueIdentifier,omitempty"`
IssueID string `json:"issueId,omitempty"`
LinearSessionID string `json:"linearSessionId,omitempty"`
WorkerID string `json:"workerId,omitempty"`
WorkType string `json:"workType,omitempty"`
UpdatedAt int64 `json:"updatedAt,omitempty"`
}
SessionEntry represents one agent session stored in Redis.
type Snapshot ¶
type Snapshot struct {
Items []WorkItem `json:"items"`
Sessions []SessionEntry `json:"sessions"`
Workers []WorkerEntry `json:"workers"`
}
Snapshot is the JSON output for `af admin queue list`.
type WorkItem ¶
type WorkItem struct {
SessionID string `json:"sessionId"`
IssueIdentifier string `json:"issueIdentifier,omitempty"`
WorkType string `json:"workType,omitempty"`
Priority float64 `json:"priority,omitempty"`
QueuedAt int64 `json:"queuedAt,omitempty"`
ProviderSession string `json:"providerSessionId,omitempty"`
Prompt string `json:"prompt,omitempty"`
}
WorkItem represents a single entry in the work queue hash.
type WorkerEntry ¶
type WorkerEntry struct {
ID string `json:"id"`
Hostname string `json:"hostname,omitempty"`
Status string `json:"status"`
Capacity int `json:"capacity,omitempty"`
ActiveCount int `json:"activeCount,omitempty"`
LastHeartbeat int64 `json:"lastHeartbeat,omitempty"`
}
WorkerEntry represents a registered worker.