Documentation ¶
Overview ¶
Package task provides a small worker pool and an optional priority scheduler for asynchronous units of work.
Index ¶
- type PoolOption
- type PriorityQueue
- func (pq *PriorityQueue) GetPosition(taskID string) int
- func (pq *PriorityQueue) Len() int
- func (pq *PriorityQueue) Pop() any
- func (pq *PriorityQueue) Push(task any, priority int, taskID string)
- func (pq *PriorityQueue) Remove(taskID string) bool
- func (pq *PriorityQueue) Snapshot() []QueueItemSnapshot
- type QueueItemSnapshot
- type Scheduler
- func (s *Scheduler[Params, Result]) CancelTaskByID(taskID string) bool
- func (s *Scheduler[Params, Result]) GetTaskPosition(taskID string) int
- func (s *Scheduler[Params, Result]) PendingSnapshot() []QueueItemSnapshot
- func (s *Scheduler[Params, Result]) Pool() *TaskPool[Params, Result]
- func (s *Scheduler[Params, Result]) QueueLen() int
- func (s *Scheduler[Params, Result]) RunningCount() int32
- func (s *Scheduler[Params, Result]) Stats() Stats
- func (s *Scheduler[Params, Result]) Stop() error
- func (s *Scheduler[Params, Result]) SubmitTask(ctx context.Context, priority int, param Params, ...) *Task[Params, Result]
- type Stats
- type Task
- type TaskPool
- func (tp *TaskPool[Param, Result]) AddTask(ctx context.Context, param Param, ...) (*Task[Param, Result], error)
- func (tp *TaskPool[Param, Result]) CancelTask(task *Task[Param, Result])
- func (tp *TaskPool[Param, Result]) Close()
- func (tp *TaskPool[Param, Result]) Enqueue(task *Task[Param, Result]) error
- func (tp *TaskPool[Param, Result]) QueueLen() int
- func (tp *TaskPool[Param, Result]) Running() int32
- func (tp *TaskPool[Param, Result]) WorkerCount() int
- type TaskStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type PoolOption ¶
PoolOption configures a TaskPool.
type PriorityQueue ¶
type PriorityQueue struct {
	// contains filtered or unexported fields
}
PriorityQueue is a mutex-protected priority list (higher priority first).
func (*PriorityQueue) GetPosition ¶
func (pq *PriorityQueue) GetPosition(taskID string) int
GetPosition returns the task's queue index (0 = next to run), or -1 if the task is not in the queue.
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() any
Pop removes and returns the highest-priority task, or nil if empty.
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(task any, priority int, taskID string)
Push inserts task with priority (larger runs earlier).
func (*PriorityQueue) Remove ¶
func (pq *PriorityQueue) Remove(taskID string) bool
Remove drops a task by id; returns true if removed.
func (*PriorityQueue) Snapshot ¶
func (pq *PriorityQueue) Snapshot() []QueueItemSnapshot
Snapshot returns the current pending order (index 0 is next to dispatch).
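The method contracts above can be illustrated with a minimal, self-contained sketch. It is not the package's implementation: `sketchQueue` and `item` are hypothetical names, the sorted-slice layout is only one possible design, and keeping equal priorities in submission order is an assumption that the documentation does not state.

```go
package main

import (
	"fmt"
	"sync"
)

// item and sketchQueue are hypothetical types for illustration only; the
// real PriorityQueue keeps its fields unexported.
type item struct {
	task     any
	priority int
	id       string
}

type sketchQueue struct {
	mu    sync.Mutex
	items []item // kept sorted, highest priority first
}

// Push inserts behind every item whose priority is >= the new one, so equal
// priorities stay in submission order (an assumption, not documented).
func (q *sketchQueue) Push(task any, priority int, id string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	i := 0
	for i < len(q.items) && q.items[i].priority >= priority {
		i++
	}
	q.items = append(q.items, item{})
	copy(q.items[i+1:], q.items[i:]) // shift the tail right by one
	q.items[i] = item{task: task, priority: priority, id: id}
}

// Pop removes and returns the highest-priority task, or nil if empty.
func (q *sketchQueue) Pop() any {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil
	}
	head := q.items[0]
	q.items = q.items[1:]
	return head.task
}

// GetPosition returns the index of id (0 = next to run) or -1.
func (q *sketchQueue) GetPosition(id string) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for i, it := range q.items {
		if it.id == id {
			return i
		}
	}
	return -1
}

// Remove drops the item with the given id and reports whether it was found.
func (q *sketchQueue) Remove(id string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	for i, it := range q.items {
		if it.id == id {
			q.items = append(q.items[:i], q.items[i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	q := &sketchQueue{}
	q.Push("low", 1, "a")
	q.Push("high", 9, "b")
	q.Push("mid", 5, "c")
	fmt.Println(q.GetPosition("b"), q.GetPosition("c"), q.GetPosition("a"))
	fmt.Println(q.Pop())
}
```

A sorted slice keeps GetPosition and Snapshot trivial at the cost of O(n) inserts; a heap would make Push cheaper but would not expose positions directly.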
type QueueItemSnapshot ¶
QueueItemSnapshot is one pending item in the priority queue.
type Scheduler ¶
type Scheduler[Params, Result any] struct {
	// contains filtered or unexported fields
}
Scheduler multiplexes prioritized submissions onto a TaskPool.
func NewScheduler ¶
NewScheduler builds a scheduler with workerCount workers.
func (*Scheduler[Params, Result]) CancelTaskByID ¶
CancelTaskByID removes a pending task from the priority queue.
func (*Scheduler[Params, Result]) GetTaskPosition ¶
GetTaskPosition returns the task's queue position (0 = next), or -1 if the task is not pending.
func (*Scheduler[Params, Result]) PendingSnapshot ¶
func (s *Scheduler[Params, Result]) PendingSnapshot() []QueueItemSnapshot
PendingSnapshot returns pending task IDs and priorities in queue order.
func (*Scheduler[Params, Result]) Pool ¶
Pool returns the underlying pool (for direct AddTask when priority is not needed).
func (*Scheduler[Params, Result]) QueueLen ¶
QueueLen returns the number of tasks waiting in the priority queue (not yet in the pool channel).
func (*Scheduler[Params, Result]) RunningCount ¶
RunningCount returns the number of handlers currently executing in the pool.
func (*Scheduler[Params, Result]) Stats ¶
Stats returns queued, channel backlog, running, and unfinished totals.
func (*Scheduler[Params, Result]) Stop ¶
Stop stops dispatching new tasks. Running tasks continue to completion; the worker pool is closed afterward.
func (*Scheduler[Params, Result]) SubmitTask ¶
func (s *Scheduler[Params, Result]) SubmitTask(
	ctx context.Context,
	priority int,
	param Params,
	handler func(ctx context.Context, params Params) (Result, error),
) *Task[Params, Result]
SubmitTask enqueues by priority (higher runs earlier among waiting tasks).
type Stats ¶
type Stats struct {
	Queued     int   // priority queue length
	ChannelLen int   // tasks sitting in pool channel
	Running    int32 // handlers executing
	Unfinished int   // Queued + ChannelLen + Running
}
Stats exposes scheduler and pool counters for dashboards.
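The relationship between the fields can be shown with a small sketch. The struct below copies the documented fields; `newStats` is a hypothetical helper introduced here only to spell out the arithmetic in the `Unfinished` comment, and it is not part of the package.

```go
package main

import "fmt"

// Stats mirrors the documented struct so the example compiles on its own.
type Stats struct {
	Queued     int   // priority queue length
	ChannelLen int   // tasks sitting in pool channel
	Running    int32 // handlers executing
	Unfinished int   // Queued + ChannelLen + Running
}

// newStats is a hypothetical constructor that derives Unfinished from the
// other three counters, converting Running from int32 to int.
func newStats(queued, channelLen int, running int32) Stats {
	return Stats{
		Queued:     queued,
		ChannelLen: channelLen,
		Running:    running,
		Unfinished: queued + channelLen + int(running),
	}
}

func main() {
	s := newStats(4, 2, 3)
	fmt.Printf("%+v\n", s)
}
```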
type Task ¶
type Task[Params, Result any] struct {
	ID         string
	Priority   int
	Params     Params
	Handler    func(ctx context.Context, params Params) (Result, error)
	Result     chan Result
	Err        chan error
	Status     atomic.Value
	Progress   atomic.Int32
	SubmitTime time.Time
	// contains filtered or unexported fields
}
Task carries one handler invocation.
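Because Task exposes separate `Result` and `Err` channels, a caller would typically wait on both. The sketch below shows one way to do that. It assumes the worker sends on exactly one of the two channels per task, which the documentation does not confirm; `sketchTask`, `await`, and `demo` are hypothetical names, not package API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// sketchTask copies only the two channel fields of the documented Task.
type sketchTask[Result any] struct {
	Result chan Result
	Err    chan error
}

// await blocks until the task yields a result or an error, or until ctx is
// done. It assumes exactly one of the two channels receives a value.
func await[Result any](ctx context.Context, t *sketchTask[Result]) (Result, error) {
	var zero Result
	select {
	case r := <-t.Result:
		return r, nil
	case err := <-t.Err:
		return zero, err
	case <-ctx.Done():
		return zero, ctx.Err()
	}
}

// demo simulates a finished task by pre-filling one of the buffered channels.
func demo(fail bool) (string, error) {
	t := &sketchTask[string]{
		Result: make(chan string, 1),
		Err:    make(chan error, 1),
	}
	if fail {
		t.Err <- errors.New("boom")
	} else {
		t.Result <- "done"
	}
	return await(context.Background(), t)
}

func main() {
	fmt.Println(demo(false))
	fmt.Println(demo(true))
}
```

Including `ctx.Done()` in the select keeps the caller from blocking forever if a task is dropped before it runs.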
type TaskPool ¶
TaskPool runs handlers on a fixed number of workers fed from a buffered channel.
func NewTaskPool ¶
func NewTaskPool[Param any, Result any](opt *PoolOption) *TaskPool[Param, Result]
NewTaskPool starts the worker goroutines configured by opt; they consume from an internal queue.
func (*TaskPool[Param, Result]) AddTask ¶
func (tp *TaskPool[Param, Result]) AddTask(ctx context.Context, param Param, handler func(ctx context.Context, p Param) (Result, error)) (*Task[Param, Result], error)
AddTask submits work directly to the pool (no priority queue).
func (*TaskPool[Param, Result]) CancelTask ¶
CancelTask cancels a task's context (running handler should respect ctx).
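Cancellation only takes effect if the handler checks its context. The following sketch shows a handler with the documented shape, `func(ctx context.Context, params Params) (Result, error)`, that stops early once its context is canceled. `countSteps` and `run` are hypothetical names, and the direct `cancel()` call stands in for what CancelTask is described as doing.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// countSteps is a hypothetical handler: it performs up to `steps` short
// units of work and checks ctx before each one.
func countSteps(ctx context.Context, steps int) (int, error) {
	done := 0
	for i := 0; i < steps; i++ {
		select {
		case <-ctx.Done():
			// Report partial progress together with the cancellation cause.
			return done, ctx.Err()
		case <-time.After(time.Millisecond):
			done++
		}
	}
	return done, nil
}

// run invokes the handler, optionally canceling its context beforehand.
func run(steps int, cancelFirst bool) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel() // stands in for tp.CancelTask(task)
	}
	return countSteps(ctx, steps)
}

func main() {
	fmt.Println(run(3, false)) // 3 <nil>
	fmt.Println(run(3, true))  // 0 context canceled
}
```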
func (*TaskPool[Param, Result]) Close ¶
func (tp *TaskPool[Param, Result]) Close()
Close stops accepting new work and waits for workers to drain the queue.
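The pool shape described here (fixed workers, a buffered channel, and a Close that drains) can be sketched in a few lines. This is an illustration under assumptions, not the package's code: `sketchPool`, `newSketchPool`, and `sumSquares` are hypothetical, jobs are plain closures rather than Task values, and the buffer size of 4 is arbitrary.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sketchPool is a hypothetical stand-in for TaskPool: a fixed set of
// workers reading jobs from one buffered channel.
type sketchPool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

func newSketchPool(workers, buffer int) *sketchPool {
	p := &sketchPool{jobs: make(chan func(), buffer)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for job := range p.jobs { // loop ends once jobs is closed and empty
				job()
			}
		}()
	}
	return p
}

// Close stops accepting work, then blocks until the workers have run
// everything still buffered. Callers must not submit after Close.
func (p *sketchPool) Close() {
	close(p.jobs)
	p.wg.Wait()
}

// sumSquares pushes n jobs through the pool and returns 1*1 + ... + n*n.
func sumSquares(workers, n int) int64 {
	var total atomic.Int64
	p := newSketchPool(workers, 4)
	for i := 1; i <= n; i++ {
		v := int64(i)
		p.jobs <- func() { total.Add(v * v) }
	}
	p.Close()
	return total.Load()
}

func main() {
	fmt.Println(sumSquares(3, 10)) // 385
}
```

Closing the channel is what lets `range` terminate after the backlog is consumed, so no separate "drain" step is needed in this design.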
func (*TaskPool[Param, Result]) Enqueue ¶
Enqueue schedules a task that was already constructed (used by Scheduler).
func (*TaskPool[Param, Result]) WorkerCount ¶
WorkerCount returns the configured number of workers.
type TaskStatus ¶
type TaskStatus string
TaskStatus is the lifecycle of a submitted unit of work.
const (
	TaskStatusPending  TaskStatus = "pending"
	TaskStatusRunning  TaskStatus = "running"
	TaskStatusSuccess  TaskStatus = "success"
	TaskStatusFailed   TaskStatus = "failed"
	TaskStatusCanceled TaskStatus = "canceled"
)
func (TaskStatus) String ¶
func (t TaskStatus) String() string
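Since `Task.Status` is an `atomic.Value`, a status would be stored and read back with a type assertion. The sketch below redeclares the documented constants so that it compiles on its own. The body of `String` is an assumption (the source shows only its signature), and `isTerminal` is a hypothetical helper that treats success, failed, and canceled as final states.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type TaskStatus string

const (
	TaskStatusPending  TaskStatus = "pending"
	TaskStatusRunning  TaskStatus = "running"
	TaskStatusSuccess  TaskStatus = "success"
	TaskStatusFailed   TaskStatus = "failed"
	TaskStatusCanceled TaskStatus = "canceled"
)

// String returns the underlying string. This body is assumed; only the
// signature appears in the documentation.
func (t TaskStatus) String() string { return string(t) }

// isTerminal is a hypothetical helper: it reports whether a status can no
// longer change under the assumed lifecycle.
func isTerminal(s TaskStatus) bool {
	switch s {
	case TaskStatusSuccess, TaskStatusFailed, TaskStatusCanceled:
		return true
	}
	return false
}

func main() {
	var status atomic.Value // same field type as Task.Status
	status.Store(TaskStatusPending)
	status.Store(TaskStatusRunning) // every Store must use the same concrete type

	cur := status.Load().(TaskStatus)
	fmt.Println(cur, isTerminal(cur))
}
```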