task

package
v0.0.0-...-f17429d Latest
Warning

This package is not in the latest version of its module.

Published: Apr 26, 2026 | License: GPL-3.0 | Imports: 9 | Imported by: 0

Documentation

Overview

Package task provides a small worker pool and optional priority scheduler for async work units.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type PoolOption

type PoolOption struct {
	WorkerCount int
	QueueSize   int
	Logger      *zap.Logger
}

PoolOption configures a TaskPool.

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue is a mutex-protected priority list (higher priority first).

func (*PriorityQueue) GetPosition

func (pq *PriorityQueue) GetPosition(taskID string) int

GetPosition returns the task's index in the queue (0 = next to run), or -1 if the task is not queued.

func (*PriorityQueue) Len

func (pq *PriorityQueue) Len() int

Len returns the number of queued tasks.

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() any

Pop removes and returns the highest-priority task, or nil if empty.

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(task any, priority int, taskID string)

Push inserts task under taskID with the given priority; tasks with a larger priority run earlier.

func (*PriorityQueue) Remove

func (pq *PriorityQueue) Remove(taskID string) bool

Remove drops the task with the given ID and reports whether a task was removed.

func (*PriorityQueue) Snapshot

func (pq *PriorityQueue) Snapshot() []QueueItemSnapshot

Snapshot returns the pending items in their current order (index 0 is next to dispatch).

type QueueItemSnapshot

type QueueItemSnapshot struct {
	TaskID   string
	Priority int
}

QueueItemSnapshot is one pending item in the priority queue.
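
The PriorityQueue fields are unexported, so the following is only a minimal sketch of how a mutex-protected, highest-priority-first list with the documented method shapes could behave. The names sketchQueue and item are hypothetical, and first-in-first-out ordering among equal priorities is an assumption that the documentation does not state.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// item is a hypothetical internal entry; the real fields are unexported.
type item struct {
	task     any
	priority int
	taskID   string
}

// sketchQueue is an illustrative stand-in, not the package's PriorityQueue.
type sketchQueue struct {
	mu    sync.Mutex
	items []item // kept sorted, highest priority first
}

// Push inserts behind items of equal or higher priority (FIFO among equals is assumed).
func (q *sketchQueue) Push(task any, priority int, taskID string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	i := sort.Search(len(q.items), func(i int) bool { return q.items[i].priority < priority })
	q.items = append(q.items, item{})
	copy(q.items[i+1:], q.items[i:])
	q.items[i] = item{task: task, priority: priority, taskID: taskID}
}

// Pop removes and returns the head task, or nil if the queue is empty.
func (q *sketchQueue) Pop() any {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil
	}
	head := q.items[0]
	q.items = q.items[1:]
	return head.task
}

// GetPosition returns the index of taskID (0 = next to run) or -1 if absent.
func (q *sketchQueue) GetPosition(taskID string) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for i, it := range q.items {
		if it.taskID == taskID {
			return i
		}
	}
	return -1
}

// Remove drops the entry with taskID and reports whether one was found.
func (q *sketchQueue) Remove(taskID string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	for i, it := range q.items {
		if it.taskID == taskID {
			q.items = append(q.items[:i], q.items[i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	q := &sketchQueue{}
	q.Push("low", 1, "a")
	q.Push("high", 9, "b")
	q.Push("mid", 5, "c")
	fmt.Println(q.GetPosition("c")) // 1: only "b" is ahead of it
	q.Remove("b")
	fmt.Println(q.Pop()) // mid
}
```

A sorted slice keeps GetPosition and a snapshot trivial at the cost of O(n) inserts; a heap would make Push cheaper but would not expose positions directly.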

type Scheduler

type Scheduler[Params, Result any] struct {
	// contains filtered or unexported fields
}

Scheduler multiplexes prioritized submissions onto a TaskPool.

func NewScheduler

func NewScheduler[Params, Result any](workerCount int, lg *zap.Logger) *Scheduler[Params, Result]

NewScheduler builds a scheduler with workerCount workers.

func (*Scheduler[Params, Result]) CancelTaskByID

func (s *Scheduler[Params, Result]) CancelTaskByID(taskID string) bool

CancelTaskByID removes a pending task from the priority queue.

func (*Scheduler[Params, Result]) GetTaskPosition

func (s *Scheduler[Params, Result]) GetTaskPosition(taskID string) int

GetTaskPosition returns the task's position in the priority queue (0 = next), or -1 if the task is not queued.

func (*Scheduler[Params, Result]) PendingSnapshot

func (s *Scheduler[Params, Result]) PendingSnapshot() []QueueItemSnapshot

PendingSnapshot returns pending task IDs and priorities in queue order.

func (*Scheduler[Params, Result]) Pool

func (s *Scheduler[Params, Result]) Pool() *TaskPool[Params, Result]

Pool returns the underlying pool (for direct AddTask when priority is not needed).

func (*Scheduler[Params, Result]) QueueLen

func (s *Scheduler[Params, Result]) QueueLen() int

QueueLen returns the number of tasks waiting in the priority queue (not yet in the pool channel).
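
The distinction between tasks in the priority queue and tasks already in the pool channel can be pictured with a small sketch. It assumes a dispatcher that forwards tasks to a bounded channel only while there is room; fillChannel is a hypothetical helper, and the package's actual dispatch logic is not documented here.

```go
package main

import "fmt"

// fillChannel moves tasks from the front of pending (assumed already sorted,
// highest priority first) into ch until ch is full or pending is empty.
// It returns the tasks that are still waiting.
func fillChannel(pending []string, ch chan string) []string {
	for len(pending) > 0 {
		select {
		case ch <- pending[0]:
			pending = pending[1:]
		default:
			return pending // channel full: the rest stays in the priority queue
		}
	}
	return pending
}

func main() {
	ch := make(chan string, 2) // stands in for the pool's buffered channel
	waiting := fillChannel([]string{"t1", "t2", "t3", "t4", "t5"}, ch)
	fmt.Println("queued:", len(waiting), "channel backlog:", len(ch)) // queued: 3 channel backlog: 2

	<-ch // a worker takes one task, freeing a slot
	waiting = fillChannel(waiting, ch)
	fmt.Println("queued:", len(waiting), "channel backlog:", len(ch)) // queued: 2 channel backlog: 2
}
```

Under this arrangement a task can still be reordered or removed while it is in the queue, but not once it has been moved into the channel.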

func (*Scheduler[Params, Result]) RunningCount

func (s *Scheduler[Params, Result]) RunningCount() int32

RunningCount returns the number of handlers currently executing in the pool.

func (*Scheduler[Params, Result]) Stats

func (s *Scheduler[Params, Result]) Stats() Stats

Stats returns queued, channel backlog, running, and unfinished totals.

func (*Scheduler[Params, Result]) Stop

func (s *Scheduler[Params, Result]) Stop() error

Stop stops dispatching new tasks. Running tasks continue to completion; the worker pool is closed afterward.

func (*Scheduler[Params, Result]) SubmitTask

func (s *Scheduler[Params, Result]) SubmitTask(
	ctx context.Context,
	priority int,
	param Params,
	handler func(ctx context.Context, params Params) (Result, error),
) *Task[Params, Result]

SubmitTask enqueues by priority (higher runs earlier among waiting tasks).

type Stats

type Stats struct {
	Queued     int   // priority queue length
	ChannelLen int   // tasks sitting in pool channel
	Running    int32 // handlers executing
	Unfinished int   // Queued + ChannelLen + Running
}

Stats exposes scheduler and pool counters, for example for dashboards.
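
As a worked instance of the field comments, the sketch below derives Unfinished from the other three counters. statsSketch and newStatsSketch are hypothetical local names that mirror the documented fields; how the package itself fills the struct is not shown in this documentation.

```go
package main

import "fmt"

// statsSketch mirrors the documented Stats fields for illustration only.
type statsSketch struct {
	Queued     int
	ChannelLen int
	Running    int32
	Unfinished int
}

// newStatsSketch computes Unfinished as Queued + ChannelLen + Running.
// Running is an int32, so it is converted before the addition.
func newStatsSketch(queued, channelLen int, running int32) statsSketch {
	return statsSketch{
		Queued:     queued,
		ChannelLen: channelLen,
		Running:    running,
		Unfinished: queued + channelLen + int(running),
	}
}

func main() {
	s := newStatsSketch(4, 2, 3)
	fmt.Println(s.Unfinished) // 9
}
```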

type Task

type Task[Params, Result any] struct {
	ID string

	Priority   int
	Params     Params
	Handler    func(ctx context.Context, params Params) (Result, error)
	Result     chan Result
	Err        chan error
	Status     atomic.Value
	Progress   atomic.Int32
	SubmitTime time.Time
	// contains filtered or unexported fields
}

Task carries one handler invocation.

func (*Task[Params, Result]) Wait

func (t *Task[Params, Result]) Wait(ctx context.Context) (Result, error)

Wait blocks until the task completes or ctx is canceled.

type TaskPool

type TaskPool[Param any, Result any] struct {
	// contains filtered or unexported fields
}

TaskPool runs handlers on a fixed number of workers fed from a buffered channel.

func NewTaskPool

func NewTaskPool[Param any, Result any](opt *PoolOption) *TaskPool[Param, Result]

NewTaskPool starts WorkerCount goroutines consuming from an internal queue.

func (*TaskPool[Param, Result]) AddTask

func (tp *TaskPool[Param, Result]) AddTask(ctx context.Context, param Param, handler func(ctx context.Context, p Param) (Result, error)) (*Task[Param, Result], error)

AddTask submits work directly to the pool (no priority queue).

func (*TaskPool[Param, Result]) CancelTask

func (tp *TaskPool[Param, Result]) CancelTask(task *Task[Param, Result])

CancelTask cancels the task's context. A running handler stops early only if it observes ctx.

func (*TaskPool[Param, Result]) Close

func (tp *TaskPool[Param, Result]) Close()

Close stops accepting new work and waits for workers to drain the queue.

func (*TaskPool[Param, Result]) Enqueue

func (tp *TaskPool[Param, Result]) Enqueue(task *Task[Param, Result]) error

Enqueue schedules a task that was already constructed (used by Scheduler).

func (*TaskPool[Param, Result]) QueueLen

func (tp *TaskPool[Param, Result]) QueueLen() int

QueueLen returns the number of tasks waiting in the channel.

func (*TaskPool[Param, Result]) Running

func (tp *TaskPool[Param, Result]) Running() int32

Running returns the number of handlers currently executing.

func (*TaskPool[Param, Result]) WorkerCount

func (tp *TaskPool[Param, Result]) WorkerCount() int

WorkerCount returns the configured number of workers.

type TaskStatus

type TaskStatus string

TaskStatus is the lifecycle state of a submitted unit of work.

const (
	TaskStatusPending  TaskStatus = "pending"
	TaskStatusRunning  TaskStatus = "running"
	TaskStatusSuccess  TaskStatus = "success"
	TaskStatusFailed   TaskStatus = "failed"
	TaskStatusCanceled TaskStatus = "canceled"
)

func (TaskStatus) String

func (t TaskStatus) String() string
