taskqueue

package
v1.2.0
Published: Mar 31, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Functions

func RandomHex

func RandomHex(n int) string

RandomHex generates n random bytes and returns them as a hex string. It is exported for use by gateway and other packages that need unique keys.
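
The source of RandomHex is not shown here, so the following is only a minimal sketch of how such a helper is commonly written. The name randomHex, the use of crypto/rand, and the panic on a failing random source are all assumptions.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// randomHex is a hypothetical stand-in for taskqueue.RandomHex.
// It reads n bytes from crypto/rand and hex-encodes them, so the
// returned string has 2*n characters.
func randomHex(n int) string {
	b := make([]byte, n)
	if _, err := rand.Read(b); err != nil {
		// Assumption: a failing system random source is treated as fatal.
		panic(err)
	}
	return hex.EncodeToString(b)
}

func main() {
	key := randomHex(8)
	fmt.Println(len(key)) // 8 bytes encode to 16 hex characters
}
```

Note that n counts bytes, not characters: each byte becomes two hex digits.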

Types

type Config

type Config struct {
	MaxConcurrent    int
	ResultRetention  time.Duration
	ProgressThrottle time.Duration
}

Config holds task queue configuration.

type Limiter

type Limiter interface {
	Acquire(ctx context.Context) error
	Release()
}

Limiter provides a concurrency-limiting semaphore. Manager implements this; the orchestrator Dispatcher can use it instead of its own internal semaphore for global concurrency control.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages background tasks with persistence and cancellation.

func New

func New(logger *zap.SugaredLogger, filePath string, cfg Config) *Manager

New creates a Manager. It loads existing state from filePath and marks any task that was still running as failed (interrupted by restart).
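
The recovery step can be pictured roughly as follows. This is a sketch under assumptions: the record type is a trimmed stand-in for TaskRecord, and the failInterrupted helper and its error text are hypothetical.

```go
package main

import "fmt"

type TaskStatus string

const (
	StatusRunning TaskStatus = "running"
	StatusSuccess TaskStatus = "success"
	StatusFailed  TaskStatus = "failed"
)

// record is a trimmed, hypothetical stand-in for TaskRecord.
type record struct {
	ID     string
	Status TaskStatus
	Error  string
}

// failInterrupted marks every running record as failed and returns
// how many records it changed.
func failInterrupted(recs []*record) int {
	n := 0
	for _, r := range recs {
		if r.Status == StatusRunning {
			r.Status = StatusFailed
			r.Error = "interrupted by restart" // assumed wording
			n++
		}
	}
	return n
}

func main() {
	recs := []*record{
		{ID: "a", Status: StatusRunning},
		{ID: "b", Status: StatusSuccess},
	}
	fmt.Println(failInterrupted(recs), recs[0].Status, recs[1].Status) // 1 failed success
}
```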

func (*Manager) Acquire

func (m *Manager) Acquire(ctx context.Context) error

Acquire implements the Limiter interface using the global semaphore.

func (*Manager) AddAnnouncer

func (m *Manager) AddAnnouncer(a agentapi.Announcer)

AddAnnouncer registers a channel bot for progress updates.

func (*Manager) Announce

func (m *Manager) Announce(sessionKey, text string)

Announce sends an unthrottled message to a session (for final results).

func (*Manager) AnnounceProgress

func (m *Manager) AnnounceProgress(sessionKey, text string)

AnnounceProgress sends a throttled progress update to the parent session.

func (*Manager) Cancel

func (m *Manager) Cancel(id string) error

Cancel cancels a running task by ID.

func (*Manager) CancelAll

func (m *Manager) CancelAll(sessionKey string) int

CancelAll cancels all running tasks for a session key. It returns the number of tasks cancelled.

func (*Manager) Get

func (m *Manager) Get(id string) *TaskRecord

Get returns a copy of the task with the given ID, or nil if not found.

func (*Manager) List

func (m *Manager) List() []*TaskRecord

List returns a snapshot of all tasks (running and recently completed).

func (*Manager) ListByStatus

func (m *Manager) ListByStatus(status string) []*TaskRecord

ListByStatus returns copies of all tasks with the given status.

func (*Manager) ListForSession

func (m *Manager) ListForSession(sessionKey string) []*TaskRecord

ListForSession returns tasks associated with a specific session.

func (*Manager) Release

func (m *Manager) Release()

Release implements the Limiter interface.

func (*Manager) RunningCount

func (m *Manager) RunningCount() int

RunningCount returns the number of currently running tasks.

func (*Manager) SetPriority

func (m *Manager) SetPriority(id string, priority int) error

SetPriority updates the priority of a pending task. Returns an error if the task is not found or not in pending status.

func (*Manager) Shutdown

func (m *Manager) Shutdown()

Shutdown cancels all running tasks and flushes state to disk.

func (*Manager) StartPruneLoop

func (m *Manager) StartPruneLoop(ctx context.Context)

StartPruneLoop prunes completed tasks older than the retention period. It blocks until ctx is done.

func (*Manager) Submit

func (m *Manager) Submit(sessionKey, agentID, message string, fn func(ctx context.Context) (string, error), opts SubmitOpts) string

Submit enqueues a background task. The task enters a priority queue and is dispatched by the dispatcher goroutine in priority order, gated by the global concurrency semaphore. Returns the task ID.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements a token-bucket algorithm for rate limiting API calls.

func NewRateLimiter

func NewRateLimiter(rate float64, burst int) *RateLimiter

NewRateLimiter creates a rate limiter. rate is the sustained number of requests per second; burst is the maximum number of requests allowed in a single burst.

func (*RateLimiter) Wait

func (rl *RateLimiter) Wait(ctx context.Context) error

Wait blocks until a token is available or ctx is cancelled.

type SubmitOpts

type SubmitOpts struct {
	// OnComplete is called after the task finishes (success, failure, or cancellation).
	OnComplete func(result string, err error)
	// Priority sets the task's scheduling priority (higher = run first). Default 0.
	Priority int
	// Tags are user-defined labels for filtering.
	Tags []string
	// Source indicates how the task was created: "delegate", "dispatch", "user", "cron".
	Source string
}

SubmitOpts configures optional behavior for Submit.

type TaskRecord

type TaskRecord struct {
	ID            string     `json:"id"`
	AgentID       string     `json:"agentId"`
	SessionKey    string     `json:"sessionKey"`
	Message       string     `json:"message"`
	Status        TaskStatus `json:"status"`
	Result        string     `json:"result,omitempty"`
	Error         string     `json:"error,omitempty"`
	CreatedAtMs   int64      `json:"createdAtMs"`
	StartedAtMs   int64      `json:"startedAtMs,omitempty"`
	CompletedAtMs int64      `json:"completedAtMs,omitempty"`
	DurationMs    int64      `json:"durationMs,omitempty"`
	Priority      int        `json:"priority"`       // higher = run first; default 0
	Tags          []string   `json:"tags,omitempty"` // user-defined labels
	Source        string     `json:"source"`         // "delegate", "dispatch", "user", "cron"
}

TaskRecord is the persisted representation of a task.
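
The struct tags determine the persisted JSON shape. The sketch below copies the type so it compiles on its own and encodes a pending task with encoding/json; the encode helper and the field values are made up for illustration, and the on-disk file layout around each record is not shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type TaskStatus string

const StatusPending TaskStatus = "pending"

// TaskRecord is copied from the definition above.
type TaskRecord struct {
	ID            string     `json:"id"`
	AgentID       string     `json:"agentId"`
	SessionKey    string     `json:"sessionKey"`
	Message       string     `json:"message"`
	Status        TaskStatus `json:"status"`
	Result        string     `json:"result,omitempty"`
	Error         string     `json:"error,omitempty"`
	CreatedAtMs   int64      `json:"createdAtMs"`
	StartedAtMs   int64      `json:"startedAtMs,omitempty"`
	CompletedAtMs int64      `json:"completedAtMs,omitempty"`
	DurationMs    int64      `json:"durationMs,omitempty"`
	Priority      int        `json:"priority"`
	Tags          []string   `json:"tags,omitempty"`
	Source        string     `json:"source"`
}

// encode is a hypothetical helper that returns the JSON form of rec.
func encode(rec TaskRecord) string {
	out, err := json.Marshal(rec)
	if err != nil {
		panic(err) // not expected for this plain struct
	}
	return string(out)
}

func main() {
	rec := TaskRecord{
		ID:          "t1",
		AgentID:     "agent",
		SessionKey:  "s1",
		Message:     "hello",
		Status:      StatusPending,
		CreatedAtMs: 1700000000000,
		Source:      "user",
	}
	fmt.Println(encode(rec))
}
```

Fields tagged omitempty (result, error, the start and completion timestamps, durationMs, and tags) are left out while they hold zero values, whereas priority and source are always written, even when priority is 0.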

type TaskStatus

type TaskStatus string

TaskStatus represents the lifecycle state of a task.

const (
	StatusPending   TaskStatus = "pending"
	StatusRunning   TaskStatus = "running"
	StatusSuccess   TaskStatus = "success"
	StatusFailed    TaskStatus = "failed"
	StatusCancelled TaskStatus = "cancelled"
)
