Documentation
¶
Index ¶
- Variables
- func PostDelayedTaskAndReplyWithResult[T any](targetRunner TaskRunner, task TaskWithResult[T], delay time.Duration, ...)
- func PostDelayedTaskAndReplyWithResultAndTraits[T any](targetRunner TaskRunner, task TaskWithResult[T], delay time.Duration, ...)
- func PostTaskAndReplyWithResult[T any](targetRunner TaskRunner, task TaskWithResult[T], reply ReplyWithResult[T], ...)
- func PostTaskAndReplyWithResultAndTraits[T any](targetRunner TaskRunner, task TaskWithResult[T], taskTraits TaskTraits, ...)
- func RegisterHandler[T any](m *JobManager, jobType string, handler TypedHandler[T]) error
- type DefaultLogger
- type DefaultPanicHandler
- type DefaultRejectedTaskHandler
- type DelayManager
- type DelayedTask
- type DelayedTaskHeap
- type DurableJobStore
- type ErrorHandler
- type FIFOTaskQueue
- func (q *FIFOTaskQueue) Clear()
- func (q *FIFOTaskQueue) IsEmpty() bool
- func (q *FIFOTaskQueue) Len() int
- func (q *FIFOTaskQueue) MaybeCompact()
- func (q *FIFOTaskQueue) PeekTraits() (TaskTraits, bool)
- func (q *FIFOTaskQueue) Pop() (TaskItem, bool)
- func (q *FIFOTaskQueue) PopUpTo(max int) []TaskItem
- func (q *FIFOTaskQueue) Push(t Task, traits TaskTraits)
- func (q *FIFOTaskQueue) PushWithID(t Task, traits TaskTraits) TaskID
- type Field
- type JSONSerializer
- type JobEntity
- type JobFilter
- type JobManager
- func (m *JobManager) CancelJob(id string) error
- func (m *JobManager) GetActiveJobCount() int
- func (m *JobManager) GetActiveJobs() []*JobEntity
- func (m *JobManager) GetJob(ctx context.Context, id string) (*JobEntity, error)
- func (m *JobManager) GetRetryPolicy() RetryPolicy
- func (m *JobManager) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
- func (m *JobManager) SetErrorHandler(handler ErrorHandler)
- func (m *JobManager) SetLogger(logger Logger)
- func (m *JobManager) SetRetryPolicy(policy RetryPolicy)
- func (m *JobManager) Shutdown(ctx context.Context) error
- func (m *JobManager) Start(ctx context.Context) error
- func (m *JobManager) SubmitDelayedJob(ctx context.Context, id string, jobType string, args any, delay time.Duration, ...) error
- func (m *JobManager) SubmitJob(ctx context.Context, id string, jobType string, args any, traits TaskTraits) error
- type JobSerializer
- type JobStatus
- type JobStore
- type Logger
- type MemoryJobStore
- func (s *MemoryJobStore) Clear()
- func (s *MemoryJobStore) Count() int
- func (s *MemoryJobStore) CreateJob(ctx context.Context, job *JobEntity) error
- func (s *MemoryJobStore) DeleteJob(ctx context.Context, id string) error
- func (s *MemoryJobStore) GetJob(ctx context.Context, id string) (*JobEntity, error)
- func (s *MemoryJobStore) GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)
- func (s *MemoryJobStore) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
- func (s *MemoryJobStore) SaveJob(ctx context.Context, job *JobEntity) error
- func (s *MemoryJobStore) UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error
- type Metrics
- type NilMetrics
- func (m *NilMetrics) RecordQueueDepth(runnerName string, depth int)
- func (m *NilMetrics) RecordTaskDuration(runnerName string, priority TaskPriority, duration time.Duration)
- func (m *NilMetrics) RecordTaskPanic(runnerName string, panicInfo any)
- func (m *NilMetrics) RecordTaskRejected(runnerName string, reason string)
- type NoOpLogger
- type PanicHandler
- type ParallelTaskRunner
- func (r *ParallelTaskRunner) FlushAsync(callback func())
- func (r *ParallelTaskRunner) GetThreadPool() ThreadPool
- func (r *ParallelTaskRunner) IsClosed() bool
- func (r *ParallelTaskRunner) MaxConcurrency() int
- func (r *ParallelTaskRunner) Metadata() map[string]any
- func (r *ParallelTaskRunner) Name() string
- func (r *ParallelTaskRunner) PendingTaskCount() int
- func (r *ParallelTaskRunner) PostDelayedTask(task Task, delay time.Duration)
- func (r *ParallelTaskRunner) PostDelayedTaskNamed(name string, task Task, delay time.Duration)
- func (r *ParallelTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
- func (r *ParallelTaskRunner) PostDelayedTaskWithTraitsNamed(name string, task Task, delay time.Duration, traits TaskTraits)
- func (r *ParallelTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
- func (r *ParallelTaskRunner) PostRepeatingTaskWithInitialDelay(task Task, initialDelay, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *ParallelTaskRunner) PostRepeatingTaskWithTraits(task Task, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *ParallelTaskRunner) PostTask(task Task)
- func (r *ParallelTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
- func (r *ParallelTaskRunner) PostTaskAndReplyWithTraits(task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, ...)
- func (r *ParallelTaskRunner) PostTaskNamed(name string, task Task)
- func (r *ParallelTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
- func (r *ParallelTaskRunner) PostTaskWithTraitsNamed(name string, task Task, traits TaskTraits)
- func (r *ParallelTaskRunner) RecentTasks(limit int) []TaskExecutionRecord
- func (r *ParallelTaskRunner) RunningTaskCount() int
- func (r *ParallelTaskRunner) SetMetadata(key string, value any)
- func (r *ParallelTaskRunner) SetName(name string)
- func (r *ParallelTaskRunner) Shutdown()
- func (r *ParallelTaskRunner) Stats() RunnerStats
- func (r *ParallelTaskRunner) WaitIdle(ctx context.Context) error
- func (r *ParallelTaskRunner) WaitShutdown(ctx context.Context) error
- type PoolStats
- type PriorityTaskQueue
- func (q *PriorityTaskQueue) Clear()
- func (q *PriorityTaskQueue) IsEmpty() bool
- func (q *PriorityTaskQueue) Len() int
- func (q *PriorityTaskQueue) MaybeCompact()
- func (q *PriorityTaskQueue) PeekTraits() (TaskTraits, bool)
- func (q *PriorityTaskQueue) Pop() (TaskItem, bool)
- func (q *PriorityTaskQueue) PopUpTo(max int) []TaskItem
- func (q *PriorityTaskQueue) Push(t Task, traits TaskTraits)
- func (q *PriorityTaskQueue) PushWithID(t Task, traits TaskTraits) TaskID
- type QueuePolicy
- type RawJobHandler
- type RejectedTaskHandler
- type RejectionCallback
- type RepeatingTaskHandle
- type ReplyWithResult
- type RetryPolicy
- type RunnerStats
- type SequencedTaskRunner
- func (r *SequencedTaskRunner) FlushAsync(callback func())
- func (r *SequencedTaskRunner) GetThreadPool() ThreadPool
- func (r *SequencedTaskRunner) IsClosed() bool
- func (r *SequencedTaskRunner) Metadata() map[string]any
- func (r *SequencedTaskRunner) Name() string
- func (r *SequencedTaskRunner) PendingTaskCount() int
- func (r *SequencedTaskRunner) PostDelayedTask(task Task, delay time.Duration)
- func (r *SequencedTaskRunner) PostDelayedTaskNamed(name string, task Task, delay time.Duration)
- func (r *SequencedTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
- func (r *SequencedTaskRunner) PostDelayedTaskWithTraitsNamed(name string, task Task, delay time.Duration, traits TaskTraits)
- func (r *SequencedTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
- func (r *SequencedTaskRunner) PostRepeatingTaskWithInitialDelay(task Task, initialDelay, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *SequencedTaskRunner) PostRepeatingTaskWithTraits(task Task, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *SequencedTaskRunner) PostTask(task Task)
- func (r *SequencedTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
- func (r *SequencedTaskRunner) PostTaskAndReplyWithTraits(task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, ...)
- func (r *SequencedTaskRunner) PostTaskNamed(name string, task Task)
- func (r *SequencedTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
- func (r *SequencedTaskRunner) PostTaskWithTraitsNamed(name string, task Task, traits TaskTraits)
- func (r *SequencedTaskRunner) RecentTasks(limit int) []TaskExecutionRecord
- func (r *SequencedTaskRunner) RunningTaskCount() int
- func (r *SequencedTaskRunner) SetMetadata(key string, value any)
- func (r *SequencedTaskRunner) SetName(name string)
- func (r *SequencedTaskRunner) Shutdown()
- func (r *SequencedTaskRunner) Stats() RunnerStats
- func (r *SequencedTaskRunner) WaitIdle(ctx context.Context) error
- func (r *SequencedTaskRunner) WaitShutdown(ctx context.Context) error
- type SingleThreadTaskRunner
- func (r *SingleThreadTaskRunner) FlushAsync(callback func())
- func (r *SingleThreadTaskRunner) GetQueuePolicy() QueuePolicy
- func (r *SingleThreadTaskRunner) GetThreadPool() ThreadPool
- func (r *SingleThreadTaskRunner) IsClosed() bool
- func (r *SingleThreadTaskRunner) Metadata() map[string]any
- func (r *SingleThreadTaskRunner) Name() string
- func (r *SingleThreadTaskRunner) PendingTaskCount() int
- func (r *SingleThreadTaskRunner) PostDelayedTask(task Task, delay time.Duration)
- func (r *SingleThreadTaskRunner) PostDelayedTaskNamed(name string, task Task, delay time.Duration)
- func (r *SingleThreadTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
- func (r *SingleThreadTaskRunner) PostDelayedTaskWithTraitsNamed(name string, task Task, delay time.Duration, traits TaskTraits)
- func (r *SingleThreadTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
- func (r *SingleThreadTaskRunner) PostRepeatingTaskWithInitialDelay(task Task, initialDelay, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *SingleThreadTaskRunner) PostRepeatingTaskWithTraits(task Task, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *SingleThreadTaskRunner) PostTask(task Task)
- func (r *SingleThreadTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
- func (r *SingleThreadTaskRunner) PostTaskAndReplyWithTraits(task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, ...)
- func (r *SingleThreadTaskRunner) PostTaskNamed(name string, task Task)
- func (r *SingleThreadTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
- func (r *SingleThreadTaskRunner) PostTaskWithTraitsNamed(name string, task Task, traits TaskTraits)
- func (r *SingleThreadTaskRunner) RecentTasks(limit int) []TaskExecutionRecord
- func (r *SingleThreadTaskRunner) RejectedCount() int64
- func (r *SingleThreadTaskRunner) RunningTaskCount() int
- func (r *SingleThreadTaskRunner) SetMetadata(key string, value any)
- func (r *SingleThreadTaskRunner) SetName(name string)
- func (r *SingleThreadTaskRunner) SetQueuePolicy(policy QueuePolicy)
- func (r *SingleThreadTaskRunner) SetRejectionCallback(callback RejectionCallback)
- func (r *SingleThreadTaskRunner) Shutdown()
- func (r *SingleThreadTaskRunner) Stats() RunnerStats
- func (r *SingleThreadTaskRunner) Stop()
- func (r *SingleThreadTaskRunner) WaitIdle(ctx context.Context) error
- func (r *SingleThreadTaskRunner) WaitShutdown(ctx context.Context) error
- type Task
- type TaskExecutionRecord
- type TaskID
- type TaskItem
- type TaskPriority
- type TaskQueue
- type TaskRunner
- type TaskScheduler
- func NewFIFOTaskScheduler(workerCount int) *TaskScheduler
- func NewFIFOTaskSchedulerWithConfig(workerCount int, config *TaskSchedulerConfig) *TaskScheduler
- func NewPriorityTaskScheduler(workerCount int) *TaskScheduler
- func NewPriorityTaskSchedulerWithConfig(workerCount int, config *TaskSchedulerConfig) *TaskScheduler
- func (s *TaskScheduler) ActiveTaskCount() int
- func (s *TaskScheduler) DelayedTaskCount() int
- func (s *TaskScheduler) GetMetrics() Metrics
- func (s *TaskScheduler) GetPanicHandler() PanicHandler
- func (s *TaskScheduler) GetWork(stopCh <-chan struct{}) (TaskItem, bool)
- func (s *TaskScheduler) OnTaskEnd()
- func (s *TaskScheduler) OnTaskStart()
- func (s *TaskScheduler) PostDelayedInternal(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)
- func (s *TaskScheduler) PostInternal(task Task, traits TaskTraits)
- func (s *TaskScheduler) QueuedTaskCount() int
- func (s *TaskScheduler) Shutdown()
- func (s *TaskScheduler) ShutdownGraceful(timeout time.Duration) error
- func (s *TaskScheduler) WorkerCount() int
- type TaskSchedulerConfig
- type TaskTraits
- type TaskWithResult
- type ThreadPool
- type TypedHandler
- type WorkSource
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrJobAlreadyExists = errors.New("job already exists")
ErrJobAlreadyExists indicates the job ID already exists in persistent storage.
Functions ¶
func PostDelayedTaskAndReplyWithResult ¶
func PostDelayedTaskAndReplyWithResult[T any]( targetRunner TaskRunner, task TaskWithResult[T], delay time.Duration, reply ReplyWithResult[T], replyRunner TaskRunner, )
PostDelayedTaskAndReplyWithResult is similar to PostTaskAndReplyWithResult, but delays the execution of the task.
The reply is NOT delayed - it executes immediately after the task completes. Only the initial task execution is delayed by the specified duration.
Example:
PostDelayedTaskAndReplyWithResult(
runner,
func(ctx context.Context) (string, error) {
return "delayed result", nil
},
2*time.Second, // Wait 2 seconds before starting task
func(ctx context.Context, result string, err error) {
fmt.Println(result) // Executes immediately after task completes
},
replyRunner,
)
func PostDelayedTaskAndReplyWithResultAndTraits ¶
func PostDelayedTaskAndReplyWithResultAndTraits[T any]( targetRunner TaskRunner, task TaskWithResult[T], delay time.Duration, taskTraits TaskTraits, reply ReplyWithResult[T], replyTraits TaskTraits, replyRunner TaskRunner, )
PostDelayedTaskAndReplyWithResultAndTraits is the full-featured delayed version with separate traits for task and reply.
func PostTaskAndReplyWithResult ¶
func PostTaskAndReplyWithResult[T any]( targetRunner TaskRunner, task TaskWithResult[T], reply ReplyWithResult[T], replyRunner TaskRunner, )
PostTaskAndReplyWithResult executes a task that returns a result of type T and an error, then passes that result to a reply callback on the replyRunner.
This function uses closure capture to safely pass the result across goroutines. The captured variables (result and err) will escape to the heap, ensuring thread safety.
Execution guarantee (Happens-Before):
- The task ALWAYS completes before the reply starts.
- The reply ALWAYS sees the final values written by the task.
- This is guaranteed by the sequential execution in wrappedTask.
Example:
PostTaskAndReplyWithResult(
backgroundRunner,
func(ctx context.Context) (int, error) {
return len("Hello"), nil
},
func(ctx context.Context, length int, err error) {
fmt.Printf("Length: %d\n", length)
},
uiRunner,
)
func PostTaskAndReplyWithResultAndTraits ¶
func PostTaskAndReplyWithResultAndTraits[T any]( targetRunner TaskRunner, task TaskWithResult[T], taskTraits TaskTraits, reply ReplyWithResult[T], replyTraits TaskTraits, replyRunner TaskRunner, )
PostTaskAndReplyWithResultAndTraits is the full-featured version that allows specifying different traits for the task and reply separately.
This is useful when:
- The task is background work (BestEffort) but the reply is a UI update (UserVisible/UserBlocking).
- The task has different priority requirements than the reply.
Example:
PostTaskAndReplyWithResultAndTraits(
backgroundRunner,
func(ctx context.Context) (*UserData, error) {
return fetchUserFromDB(ctx)
},
TraitsBestEffort(), // Background work, low priority
func(ctx context.Context, user *UserData, err error) {
updateUI(user)
},
TraitsUserVisible(), // UI update, higher priority
uiRunner,
)
func RegisterHandler ¶ added in v0.2.0
func RegisterHandler[T any](m *JobManager, jobType string, handler TypedHandler[T]) error
RegisterHandler registers a type-safe handler for a job type. The handler will be called with deserialized arguments of type T.
Types ¶
type DefaultLogger ¶ added in v0.2.0
type DefaultLogger struct{}
DefaultLogger is a simple logger implementation using the standard log package
func NewDefaultLogger ¶ added in v0.2.0
func NewDefaultLogger() *DefaultLogger
NewDefaultLogger creates a new DefaultLogger
func (*DefaultLogger) Debug ¶ added in v0.2.0
func (l *DefaultLogger) Debug(msg string, fields ...Field)
Debug logs a debug message
func (*DefaultLogger) Error ¶ added in v0.2.0
func (l *DefaultLogger) Error(msg string, fields ...Field)
Error logs an error message
func (*DefaultLogger) Info ¶ added in v0.2.0
func (l *DefaultLogger) Info(msg string, fields ...Field)
Info logs an info message
func (*DefaultLogger) Warn ¶ added in v0.2.0
func (l *DefaultLogger) Warn(msg string, fields ...Field)
Warn logs a warning message
type DefaultPanicHandler ¶ added in v0.3.0
type DefaultPanicHandler struct{}
DefaultPanicHandler provides a basic panic handler that logs to stdout.
func (*DefaultPanicHandler) HandlePanic ¶ added in v0.3.0
func (h *DefaultPanicHandler) HandlePanic(ctx context.Context, runnerName string, workerID int, panicInfo any, stackTrace []byte)
HandlePanic prints panic information to stdout.
type DefaultRejectedTaskHandler ¶ added in v0.3.0
type DefaultRejectedTaskHandler struct{}
DefaultRejectedTaskHandler provides a basic handler that logs rejected tasks.
func (*DefaultRejectedTaskHandler) HandleRejectedTask ¶ added in v0.3.0
func (h *DefaultRejectedTaskHandler) HandleRejectedTask(runnerName string, reason string)
HandleRejectedTask logs the rejected task.
type DelayManager ¶
type DelayManager struct {
// contains filtered or unexported fields
}
func NewDelayManager ¶
func NewDelayManager() *DelayManager
func (*DelayManager) AddDelayedTask ¶
func (dm *DelayManager) AddDelayedTask(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)
func (*DelayManager) Stop ¶
func (dm *DelayManager) Stop()
func (*DelayManager) TaskCount ¶
func (dm *DelayManager) TaskCount() int
type DelayedTask ¶
type DelayedTask struct {
RunAt time.Time
Task Task
Traits TaskTraits
Target TaskRunner
// contains filtered or unexported fields
}
DelayedTask represents a task scheduled for the future
type DelayedTaskHeap ¶
type DelayedTaskHeap []*DelayedTask
DelayedTaskHeap implements heap.Interface
func (DelayedTaskHeap) Len ¶
func (h DelayedTaskHeap) Len() int
func (DelayedTaskHeap) Less ¶
func (h DelayedTaskHeap) Less(i, j int) bool
func (*DelayedTaskHeap) Peek ¶
func (h *DelayedTaskHeap) Peek() *DelayedTask
func (*DelayedTaskHeap) Pop ¶
func (h *DelayedTaskHeap) Pop() any
func (*DelayedTaskHeap) Push ¶
func (h *DelayedTaskHeap) Push(x any)
func (DelayedTaskHeap) Swap ¶
func (h DelayedTaskHeap) Swap(i, j int)
type DurableJobStore ¶ added in v0.3.0
DurableJobStore provides atomic create semantics for durable-ack submission. Implement this interface to guarantee CreateJob fails when a job ID already exists.
type ErrorHandler ¶ added in v0.2.0
ErrorHandler is called when an IO operation fails after all retries
type FIFOTaskQueue ¶
type FIFOTaskQueue struct {
// contains filtered or unexported fields
}
func NewFIFOTaskQueue ¶
func NewFIFOTaskQueue() *FIFOTaskQueue
func (*FIFOTaskQueue) Clear ¶
func (q *FIFOTaskQueue) Clear()
Clear removes all tasks from the queue and releases references
func (*FIFOTaskQueue) IsEmpty ¶
func (q *FIFOTaskQueue) IsEmpty() bool
func (*FIFOTaskQueue) Len ¶
func (q *FIFOTaskQueue) Len() int
func (*FIFOTaskQueue) MaybeCompact ¶
func (q *FIFOTaskQueue) MaybeCompact()
func (*FIFOTaskQueue) PeekTraits ¶
func (q *FIFOTaskQueue) PeekTraits() (TaskTraits, bool)
func (*FIFOTaskQueue) Pop ¶
func (q *FIFOTaskQueue) Pop() (TaskItem, bool)
func (*FIFOTaskQueue) PopUpTo ¶
func (q *FIFOTaskQueue) PopUpTo(max int) []TaskItem
func (*FIFOTaskQueue) Push ¶
func (q *FIFOTaskQueue) Push(t Task, traits TaskTraits)
func (*FIFOTaskQueue) PushWithID ¶ added in v0.3.0
func (q *FIFOTaskQueue) PushWithID(t Task, traits TaskTraits) TaskID
type JSONSerializer ¶ added in v0.2.0
type JSONSerializer struct{}
JSONSerializer uses JSON encoding for serialization.
func NewJSONSerializer ¶ added in v0.2.0
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new JSON serializer
func (*JSONSerializer) Deserialize ¶ added in v0.2.0
func (s *JSONSerializer) Deserialize(data []byte, target any) error
func (*JSONSerializer) Name ¶ added in v0.2.0
func (s *JSONSerializer) Name() string
type JobManager ¶ added in v0.2.0
type JobManager struct {
// contains filtered or unexported fields
}
JobManager manages job lifecycle using a three-layer runner architecture:
- Layer 1 (controlRunner): Fast control operations (<100μs, pure memory)
- Layer 2 (ioRunner): Sequential IO operations (database, file, network)
- Layer 3 (executionRunner): User job execution (may be slow/blocking)
func NewJobManager ¶ added in v0.2.0
func NewJobManager( controlRunner TaskRunner, ioRunner TaskRunner, executionRunner TaskRunner, store JobStore, serializer JobSerializer, ) *JobManager
NewJobManager creates a new JobManager with the three-layer runner architecture
func (*JobManager) CancelJob ¶ added in v0.2.0
func (m *JobManager) CancelJob(id string) error
CancelJob cancels an active job
func (*JobManager) GetActiveJobCount ¶ added in v0.2.0
func (m *JobManager) GetActiveJobCount() int
GetActiveJobCount returns the number of active jobs
func (*JobManager) GetActiveJobs ¶ added in v0.2.0
func (m *JobManager) GetActiveJobs() []*JobEntity
GetActiveJobs returns a snapshot of active jobs
func (*JobManager) GetRetryPolicy ¶ added in v0.2.0
func (m *JobManager) GetRetryPolicy() RetryPolicy
GetRetryPolicy returns the current retry policy
func (*JobManager) ListJobs ¶ added in v0.2.0
ListJobs returns jobs matching the filter. Results may lag slightly behind recent writes.
func (*JobManager) SetErrorHandler ¶ added in v0.2.0
func (m *JobManager) SetErrorHandler(handler ErrorHandler)
SetErrorHandler sets a custom error handler for failed IO operations
func (*JobManager) SetLogger ¶ added in v0.2.0
func (m *JobManager) SetLogger(logger Logger)
SetLogger sets the logger for JobManager
func (*JobManager) SetRetryPolicy ¶ added in v0.2.0
func (m *JobManager) SetRetryPolicy(policy RetryPolicy)
SetRetryPolicy sets the retry policy for IO operations
func (*JobManager) Shutdown ¶ added in v0.2.0
func (m *JobManager) Shutdown(ctx context.Context) error
Shutdown gracefully shuts down the JobManager
func (*JobManager) Start ¶ added in v0.2.0
func (m *JobManager) Start(ctx context.Context) error
Start initializes the JobManager and recovers unfinished jobs
func (*JobManager) SubmitDelayedJob ¶ added in v0.2.0
func (m *JobManager) SubmitDelayedJob( ctx context.Context, id string, jobType string, args any, delay time.Duration, traits TaskTraits, ) error
SubmitDelayedJob submits a job with a delay before execution
func (*JobManager) SubmitJob ¶ added in v0.2.0
func (m *JobManager) SubmitJob(ctx context.Context, id string, jobType string, args any, traits TaskTraits) error
SubmitJob submits a job for immediate execution
type JobSerializer ¶ added in v0.2.0
type JobSerializer interface {
// Serialize converts a Go value to bytes
Serialize(args any) ([]byte, error)
// Deserialize converts bytes back to a Go value
Deserialize(data []byte, target any) error
// Name returns the serializer name (for debugging/logging)
Name() string
}
JobSerializer defines the interface for serializing and deserializing job arguments.
type JobStore ¶ added in v0.2.0
type JobStore interface {
// SaveJob saves a new job or updates an existing one
SaveJob(ctx context.Context, job *JobEntity) error
// UpdateStatus updates the status and result of a job
UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error
// GetJob retrieves a job by ID
GetJob(ctx context.Context, id string) (*JobEntity, error)
// ListJobs returns jobs matching the filter
ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
// GetRecoverableJobs returns jobs that should be recovered on startup
// (typically PENDING jobs)
GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)
// DeleteJob removes a job from storage
DeleteJob(ctx context.Context, id string) error
}
JobStore defines the interface for job persistence. Implementations can use in-memory storage, databases, or other backends.
type Logger ¶ added in v0.2.0
type Logger interface {
// Debug logs a debug message with optional fields
Debug(msg string, fields ...Field)
// Info logs an info message with optional fields
Info(msg string, fields ...Field)
// Warn logs a warning message with optional fields
Warn(msg string, fields ...Field)
// Error logs an error message with optional fields
Error(msg string, fields ...Field)
}
Logger is the interface for structured logging. Implementations can provide custom logging behavior (e.g., integration with logrus, zap, etc.).
type MemoryJobStore ¶ added in v0.2.0
type MemoryJobStore struct {
// contains filtered or unexported fields
}
MemoryJobStore is an in-memory implementation of JobStore. It uses sync.Map for concurrent-safe storage.
func NewMemoryJobStore ¶ added in v0.2.0
func NewMemoryJobStore() *MemoryJobStore
NewMemoryJobStore creates a new in-memory job store
func (*MemoryJobStore) Clear ¶ added in v0.2.0
func (s *MemoryJobStore) Clear()
Clear removes all jobs from the store (useful for testing)
func (*MemoryJobStore) Count ¶ added in v0.2.0
func (s *MemoryJobStore) Count() int
Count returns the total number of jobs in the store
func (*MemoryJobStore) CreateJob ¶ added in v0.3.0
func (s *MemoryJobStore) CreateJob(ctx context.Context, job *JobEntity) error
CreateJob inserts a new job atomically. Returns ErrJobAlreadyExists if the ID already exists.
func (*MemoryJobStore) DeleteJob ¶ added in v0.2.0
func (s *MemoryJobStore) DeleteJob(ctx context.Context, id string) error
func (*MemoryJobStore) GetRecoverableJobs ¶ added in v0.2.0
func (s *MemoryJobStore) GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)
func (*MemoryJobStore) SaveJob ¶ added in v0.2.0
func (s *MemoryJobStore) SaveJob(ctx context.Context, job *JobEntity) error
func (*MemoryJobStore) UpdateStatus ¶ added in v0.2.0
type Metrics ¶ added in v0.3.0
type Metrics interface {
// RecordTaskDuration records how long a task took to execute.
//
// Parameters:
// - runnerName: The name of the task runner
// - priority: The task priority
// - duration: How long the task took to execute
RecordTaskDuration(runnerName string, priority TaskPriority, duration time.Duration)
// RecordTaskPanic records that a task panicked during execution.
//
// Parameters:
// - runnerName: The name of the task runner
// - panicInfo: The panic value recovered from the task
RecordTaskPanic(runnerName string, panicInfo any)
// RecordQueueDepth records the current queue depth.
// This can be called periodically to track queue growth/shrinkage.
//
// Parameters:
// - runnerName: The name of the task runner
// - depth: The current number of tasks in the queue
RecordQueueDepth(runnerName string, depth int)
// RecordTaskRejected records that a task was rejected (e.g., during shutdown).
//
// Parameters:
// - runnerName: The name of the task runner
// - reason: Why the task was rejected
RecordTaskRejected(runnerName string, reason string)
}
Metrics defines the interface for collecting task execution metrics. Implementations can send metrics to monitoring systems (Prometheus, StatsD, etc.).
All methods are optional; implementations should handle nil receivers gracefully. Methods should be non-blocking and fast to avoid impacting task execution performance.
type NilMetrics ¶ added in v0.3.0
type NilMetrics struct{}
NilMetrics provides a no-op metrics implementation that does nothing. This is the default when no metrics interface is provided.
func (*NilMetrics) RecordQueueDepth ¶ added in v0.3.0
func (m *NilMetrics) RecordQueueDepth(runnerName string, depth int)
RecordQueueDepth is a no-op.
func (*NilMetrics) RecordTaskDuration ¶ added in v0.3.0
func (m *NilMetrics) RecordTaskDuration(runnerName string, priority TaskPriority, duration time.Duration)
RecordTaskDuration is a no-op.
func (*NilMetrics) RecordTaskPanic ¶ added in v0.3.0
func (m *NilMetrics) RecordTaskPanic(runnerName string, panicInfo any)
RecordTaskPanic is a no-op.
func (*NilMetrics) RecordTaskRejected ¶ added in v0.3.0
func (m *NilMetrics) RecordTaskRejected(runnerName string, reason string)
RecordTaskRejected is a no-op.
type NoOpLogger ¶ added in v0.2.0
type NoOpLogger struct{}
NoOpLogger is a logger that discards all log messages. It is useful for tests or when logging is not desired.
func NewNoOpLogger ¶ added in v0.2.0
func NewNoOpLogger() *NoOpLogger
NewNoOpLogger creates a new NoOpLogger
func (*NoOpLogger) Debug ¶ added in v0.2.0
func (l *NoOpLogger) Debug(msg string, fields ...Field)
func (*NoOpLogger) Error ¶ added in v0.2.0
func (l *NoOpLogger) Error(msg string, fields ...Field)
func (*NoOpLogger) Info ¶ added in v0.2.0
func (l *NoOpLogger) Info(msg string, fields ...Field)
func (*NoOpLogger) Warn ¶ added in v0.2.0
func (l *NoOpLogger) Warn(msg string, fields ...Field)
type PanicHandler ¶ added in v0.3.0
type PanicHandler interface {
// HandlePanic is called when a task panics.
//
// Parameters:
// - ctx: The context from the panicked task (may contain task runner info)
// - runnerName: The name of the task runner where the panic occurred
// - workerID: The ID of the worker (for thread pool workers, -1 for single-threaded runners)
// - panicInfo: The panic value recovered from the task
// - stackTrace: The stack trace at the time of panic
HandlePanic(ctx context.Context, runnerName string, workerID int, panicInfo any, stackTrace []byte)
}
PanicHandler is called when a task panics during execution. This allows custom panic handling, logging, and recovery strategies.
Implementations should be thread-safe as they may be called concurrently.
type ParallelTaskRunner ¶ added in v0.3.0
type ParallelTaskRunner struct {
// contains filtered or unexported fields
}
ParallelTaskRunner executes up to maxConcurrency tasks simultaneously. Tasks are queued with priority support and executed as slots become available.
func NewParallelTaskRunner ¶ added in v0.3.0
func NewParallelTaskRunner(threadPool ThreadPool, maxConcurrency int) *ParallelTaskRunner
NewParallelTaskRunner creates a new ParallelTaskRunner with the specified concurrency limit. It panics if threadPool is nil or maxConcurrency is outside the valid range [1, 10000].
func (*ParallelTaskRunner) FlushAsync ¶ added in v0.3.0
func (r *ParallelTaskRunner) FlushAsync(callback func())
FlushAsync posts a barrier task that executes callback when all prior tasks complete. This is a non-blocking alternative to WaitIdle.
The callback will execute after all tasks posted before FlushAsync() have completed. Tasks posted after FlushAsync() will not run before the callback.
Implementation note: This posts a special barrier task to the queue. When the scheduler encounters the barrier, it waits for all currently running tasks to complete before executing the callback. This provides true barrier semantics.
Example:
runner.PostTask(task1)
runner.PostTask(task2)
runner.FlushAsync(func() {
// This runs after task1 and task2 complete
fmt.Println("task1 and task2 completed!")
})
runner.PostTask(task3) // Will NOT run before the callback
func (*ParallelTaskRunner) GetThreadPool ¶ added in v0.3.0
func (r *ParallelTaskRunner) GetThreadPool() ThreadPool
GetThreadPool returns the underlying ThreadPool used by this runner
func (*ParallelTaskRunner) IsClosed ¶ added in v0.3.0
func (r *ParallelTaskRunner) IsClosed() bool
IsClosed returns true if the runner has been shut down.
func (*ParallelTaskRunner) MaxConcurrency ¶ added in v0.3.0
func (r *ParallelTaskRunner) MaxConcurrency() int
MaxConcurrency returns the maximum number of concurrent tasks.
func (*ParallelTaskRunner) Metadata ¶ added in v0.3.0
func (r *ParallelTaskRunner) Metadata() map[string]any
Metadata returns the metadata associated with the task runner
func (*ParallelTaskRunner) Name ¶ added in v0.3.0
func (r *ParallelTaskRunner) Name() string
Name returns the name of the task runner
func (*ParallelTaskRunner) PendingTaskCount ¶ added in v0.3.0
func (r *ParallelTaskRunner) PendingTaskCount() int
PendingTaskCount returns the number of queued tasks waiting to run.
func (*ParallelTaskRunner) PostDelayedTask ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostDelayedTask(task Task, delay time.Duration)
PostDelayedTask submits a task to execute after a delay.
func (*ParallelTaskRunner) PostDelayedTaskNamed ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostDelayedTaskNamed(name string, task Task, delay time.Duration)
PostDelayedTaskNamed submits a delayed named task.
func (*ParallelTaskRunner) PostDelayedTaskWithTraits ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
PostDelayedTaskWithTraits submits a delayed task with specified traits.
func (*ParallelTaskRunner) PostDelayedTaskWithTraitsNamed ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostDelayedTaskWithTraitsNamed(name string, task Task, delay time.Duration, traits TaskTraits)
PostDelayedTaskWithTraitsNamed submits a delayed named task with specified traits.
func (*ParallelTaskRunner) PostRepeatingTask ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
PostRepeatingTask submits a repeating task
func (*ParallelTaskRunner) PostRepeatingTaskWithInitialDelay ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostRepeatingTaskWithInitialDelay( task Task, initialDelay, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithInitialDelay submits a repeating task with an initial delay
func (*ParallelTaskRunner) PostRepeatingTaskWithTraits ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostRepeatingTaskWithTraits( task Task, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithTraits submits a repeating task with specific traits
func (*ParallelTaskRunner) PostTask ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostTask(task Task)
PostTask submits a task with default traits.
func (*ParallelTaskRunner) PostTaskAndReply ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
PostTaskAndReply executes task on this runner, then posts reply to replyRunner.
func (*ParallelTaskRunner) PostTaskAndReplyWithTraits ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostTaskAndReplyWithTraits( task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, replyRunner TaskRunner, )
PostTaskAndReplyWithTraits allows specifying different traits for task and reply.
func (*ParallelTaskRunner) PostTaskNamed ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostTaskNamed(name string, task Task)
PostTaskNamed submits a task with a caller-provided display name.
func (*ParallelTaskRunner) PostTaskWithTraits ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
PostTaskWithTraits submits a task with specified traits.
func (*ParallelTaskRunner) PostTaskWithTraitsNamed ¶ added in v0.3.0
func (r *ParallelTaskRunner) PostTaskWithTraitsNamed(name string, task Task, traits TaskTraits)
PostTaskWithTraitsNamed submits a named task with specified traits.
func (*ParallelTaskRunner) RecentTasks ¶ added in v0.3.0
func (r *ParallelTaskRunner) RecentTasks(limit int) []TaskExecutionRecord
RecentTasks returns completed task execution records in newest-first order.
func (*ParallelTaskRunner) RunningTaskCount ¶ added in v0.3.0
func (r *ParallelTaskRunner) RunningTaskCount() int
RunningTaskCount returns the number of currently executing tasks.
func (*ParallelTaskRunner) SetMetadata ¶ added in v0.3.0
func (r *ParallelTaskRunner) SetMetadata(key string, value any)
SetMetadata sets a metadata key-value pair
func (*ParallelTaskRunner) SetName ¶ added in v0.3.0
func (r *ParallelTaskRunner) SetName(name string)
SetName sets the name of the task runner
func (*ParallelTaskRunner) Shutdown ¶ added in v0.3.0
func (r *ParallelTaskRunner) Shutdown()
Shutdown marks the runner as closed and clears all pending tasks. This method is non-blocking and can be safely called from within a task.
Shutdown does NOT interrupt currently executing tasks - they will run to completion. However, no new tasks will be started from the queue after Shutdown is called.
func (*ParallelTaskRunner) Stats ¶ added in v0.3.0
func (r *ParallelTaskRunner) Stats() RunnerStats
Stats returns current observability data for this runner.
func (*ParallelTaskRunner) WaitIdle ¶ added in v0.3.0
func (r *ParallelTaskRunner) WaitIdle(ctx context.Context) error
WaitIdle blocks until all currently queued tasks have completed execution.
This method waits until both the queue is empty AND no tasks are currently executing (runningCount == 0).
Returns error if: - Context is cancelled or deadline exceeded - Runner is closed when WaitIdle is called
Note: Tasks posted after WaitIdle is called are not waited for.
func (*ParallelTaskRunner) WaitShutdown ¶ added in v0.3.0
func (r *ParallelTaskRunner) WaitShutdown(ctx context.Context) error
WaitShutdown blocks until Shutdown() is called on this runner. Returns error if context is cancelled.
type PoolStats ¶ added in v0.3.0
PoolStats represents runtime observability state for a thread pool.
type PriorityTaskQueue ¶
type PriorityTaskQueue struct {
// contains filtered or unexported fields
}
func NewPriorityTaskQueue ¶
func NewPriorityTaskQueue() *PriorityTaskQueue
func (*PriorityTaskQueue) Clear ¶
func (q *PriorityTaskQueue) Clear()
Clear removes all tasks from the queue and releases references
func (*PriorityTaskQueue) IsEmpty ¶
func (q *PriorityTaskQueue) IsEmpty() bool
func (*PriorityTaskQueue) Len ¶
func (q *PriorityTaskQueue) Len() int
func (*PriorityTaskQueue) MaybeCompact ¶
func (q *PriorityTaskQueue) MaybeCompact()
func (*PriorityTaskQueue) PeekTraits ¶
func (q *PriorityTaskQueue) PeekTraits() (TaskTraits, bool)
func (*PriorityTaskQueue) Pop ¶
func (q *PriorityTaskQueue) Pop() (TaskItem, bool)
func (*PriorityTaskQueue) PopUpTo ¶
func (q *PriorityTaskQueue) PopUpTo(max int) []TaskItem
func (*PriorityTaskQueue) Push ¶
func (q *PriorityTaskQueue) Push(t Task, traits TaskTraits)
func (*PriorityTaskQueue) PushWithID ¶ added in v0.3.0
func (q *PriorityTaskQueue) PushWithID(t Task, traits TaskTraits) TaskID
type QueuePolicy ¶ added in v0.2.0
type QueuePolicy int
QueuePolicy defines how to handle full queue situations
const (
// QueuePolicyDrop: Silently drop tasks when queue is full (current behavior)
QueuePolicyDrop QueuePolicy = iota
// QueuePolicyReject: Call rejection callback when queue is full
QueuePolicyReject
// QueuePolicyWait: Block until queue has space or context is done
QueuePolicyWait
)
type RawJobHandler ¶ added in v0.2.0
RawJobHandler is the internal handler type that works with raw bytes
type RejectedTaskHandler ¶ added in v0.3.0
type RejectedTaskHandler interface {
// HandleRejectedTask is called when a task is rejected.
//
// Parameters:
// - runnerName: The name of the task runner
// - reason: Why the task was rejected (e.g., "shutdown", "backpressure")
HandleRejectedTask(runnerName string, reason string)
}
RejectedTaskHandler is called when a task is rejected by the scheduler. This can happen when: - The scheduler is shutting down - The signal channel is full (backpressure) - The task queue is full (if bounded queues are implemented in the future)
Implementations should be thread-safe as they may be called concurrently.
type RejectionCallback ¶ added in v0.2.0
type RejectionCallback func(task Task, traits TaskTraits)
RejectionCallback is called when a task is rejected (QueuePolicyReject mode)
type RepeatingTaskHandle ¶
type RepeatingTaskHandle interface {
// Stop stops the repeating task. It will not interrupt a currently executing task,
// but will prevent future executions from being scheduled.
Stop()
// IsStopped returns true if the task has been stopped.
IsStopped() bool
}
RepeatingTaskHandle controls the lifecycle of a repeating task.
type ReplyWithResult ¶
ReplyWithResult defines a reply callback that receives a result of type T and an error. This is the counterpart to TaskWithResult, receiving the values returned by the task.
type RetryPolicy ¶ added in v0.2.0
type RetryPolicy struct {
// MaxRetries is the maximum number of retry attempts (0 = no retry, 1 = one retry)
MaxRetries int
// InitialDelay is the delay before the first retry
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries
MaxDelay time.Duration
// BackoffRatio is the multiplier for delay after each retry (e.g., 2.0 for exponential)
// For example, with InitialDelay=100ms and BackoffRatio=2.0:
// - Retry 1 delay: 100ms
// - Retry 2 delay: 200ms
// - Retry 3 delay: 400ms (capped by MaxDelay)
BackoffRatio float64
}
RetryPolicy defines retry behavior for IO operations
func DefaultRetryPolicy ¶ added in v0.2.0
func DefaultRetryPolicy() RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy
func NoRetry ¶ added in v0.2.0
func NoRetry() RetryPolicy
NoRetry returns a retry policy with no retries
type RunnerStats ¶ added in v0.3.0
type RunnerStats struct {
Name string
Type string
Pending int
Running int
Rejected int64
Closed bool
BarrierPending bool
LastTaskName string
LastTaskAt time.Time
}
RunnerStats represents runtime observability state for a task runner.
type SequencedTaskRunner ¶
type SequencedTaskRunner struct {
// contains filtered or unexported fields
}
func NewSequencedTaskRunner ¶
func NewSequencedTaskRunner(threadPool ThreadPool) *SequencedTaskRunner
func (*SequencedTaskRunner) FlushAsync ¶
func (r *SequencedTaskRunner) FlushAsync(callback func())
FlushAsync posts a barrier task that executes the callback when all prior tasks complete. This is a non-blocking alternative to WaitIdle.
The callback will be executed on this runner's thread, after all tasks posted before FlushAsync have completed.
Example:
runner.PostTask(task1)
runner.PostTask(task2)
runner.FlushAsync(func() {
fmt.Println("task1 and task2 completed!")
})
func (*SequencedTaskRunner) GetThreadPool ¶
func (r *SequencedTaskRunner) GetThreadPool() ThreadPool
GetThreadPool returns the underlying ThreadPool used by this runner
func (*SequencedTaskRunner) IsClosed ¶
func (r *SequencedTaskRunner) IsClosed() bool
IsClosed returns true if the runner has been shut down.
func (*SequencedTaskRunner) Metadata ¶
func (r *SequencedTaskRunner) Metadata() map[string]any
Metadata returns the metadata associated with the task runner
func (*SequencedTaskRunner) Name ¶
func (r *SequencedTaskRunner) Name() string
Name returns the name of the task runner
func (*SequencedTaskRunner) PendingTaskCount ¶ added in v0.3.0
func (r *SequencedTaskRunner) PendingTaskCount() int
PendingTaskCount returns the number of queued tasks waiting to run.
func (*SequencedTaskRunner) PostDelayedTask ¶
func (r *SequencedTaskRunner) PostDelayedTask(task Task, delay time.Duration)
PostDelayedTask submits a task to execute after a delay
func (*SequencedTaskRunner) PostDelayedTaskNamed ¶ added in v0.3.0
func (r *SequencedTaskRunner) PostDelayedTaskNamed(name string, task Task, delay time.Duration)
PostDelayedTaskNamed submits a delayed named task.
func (*SequencedTaskRunner) PostDelayedTaskWithTraits ¶
func (r *SequencedTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
PostDelayedTaskWithTraits submits a delayed task with specified traits
func (*SequencedTaskRunner) PostDelayedTaskWithTraitsNamed ¶ added in v0.3.0
func (r *SequencedTaskRunner) PostDelayedTaskWithTraitsNamed(name string, task Task, delay time.Duration, traits TaskTraits)
PostDelayedTaskWithTraitsNamed submits a delayed named task with specified traits.
func (*SequencedTaskRunner) PostRepeatingTask ¶
func (r *SequencedTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
PostRepeatingTask submits a task that repeats at a fixed interval
func (*SequencedTaskRunner) PostRepeatingTaskWithInitialDelay ¶
func (r *SequencedTaskRunner) PostRepeatingTaskWithInitialDelay( task Task, initialDelay, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithInitialDelay submits a repeating task with an initial delay. The task will first execute after initialDelay, then repeat every interval.
func (*SequencedTaskRunner) PostRepeatingTaskWithTraits ¶
func (r *SequencedTaskRunner) PostRepeatingTaskWithTraits( task Task, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithTraits submits a repeating task with specific traits
func (*SequencedTaskRunner) PostTask ¶
func (r *SequencedTaskRunner) PostTask(task Task)
PostTask submits a task with default traits
func (*SequencedTaskRunner) PostTaskAndReply ¶
func (r *SequencedTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
PostTaskAndReply executes task on this runner, then posts reply to replyRunner. If task panics, reply will not be executed.
func (*SequencedTaskRunner) PostTaskAndReplyWithTraits ¶
func (r *SequencedTaskRunner) PostTaskAndReplyWithTraits( task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, replyRunner TaskRunner, )
PostTaskAndReplyWithTraits allows specifying different traits for task and reply. This is useful when task is background work (BestEffort) but reply is UI update (UserVisible).
func (*SequencedTaskRunner) PostTaskNamed ¶ added in v0.3.0
func (r *SequencedTaskRunner) PostTaskNamed(name string, task Task)
PostTaskNamed submits a task with a caller-provided display name.
func (*SequencedTaskRunner) PostTaskWithTraits ¶
func (r *SequencedTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
PostTaskWithTraits submits a task with specified traits
func (*SequencedTaskRunner) PostTaskWithTraitsNamed ¶ added in v0.3.0
func (r *SequencedTaskRunner) PostTaskWithTraitsNamed(name string, task Task, traits TaskTraits)
PostTaskWithTraitsNamed submits a named task with specified traits.
func (*SequencedTaskRunner) RecentTasks ¶ added in v0.3.0
func (r *SequencedTaskRunner) RecentTasks(limit int) []TaskExecutionRecord
RecentTasks returns completed task execution records in newest-first order.
func (*SequencedTaskRunner) RunningTaskCount ¶ added in v0.3.0
func (r *SequencedTaskRunner) RunningTaskCount() int
RunningTaskCount returns 1 if runLoop is running/scheduled, otherwise 0.
func (*SequencedTaskRunner) SetMetadata ¶
func (r *SequencedTaskRunner) SetMetadata(key string, value any)
SetMetadata sets a metadata key-value pair
func (*SequencedTaskRunner) SetName ¶
func (r *SequencedTaskRunner) SetName(name string)
SetName sets the name of the task runner
func (*SequencedTaskRunner) Shutdown ¶
func (r *SequencedTaskRunner) Shutdown()
Shutdown gracefully stops the runner by: 1. Marking it as closed (stops accepting new tasks) 2. Clearing all pending tasks in the queue 3. All repeating tasks will automatically stop on their next execution 4. Signaling all WaitShutdown() waiters
Note: This method is non-blocking and can be safely called from within a task. Note: This will not interrupt currently executing tasks.
func (*SequencedTaskRunner) Stats ¶ added in v0.3.0
func (r *SequencedTaskRunner) Stats() RunnerStats
Stats returns current observability data for this runner.
func (*SequencedTaskRunner) WaitIdle ¶
func (r *SequencedTaskRunner) WaitIdle(ctx context.Context) error
WaitIdle blocks until all currently queued tasks have completed execution. This is implemented by posting a barrier task and waiting for it to execute.
Due to the sequential nature of SequencedTaskRunner, when the barrier task executes, all tasks posted before WaitIdle are guaranteed to have completed.
Returns error if: - Context is cancelled or deadline exceeded - Runner is closed when WaitIdle is called
Note: Tasks posted after WaitIdle is called are not waited for. Note: Repeating tasks will continue to repeat and are not waited for.
func (*SequencedTaskRunner) WaitShutdown ¶
func (r *SequencedTaskRunner) WaitShutdown(ctx context.Context) error
WaitShutdown blocks until Shutdown() is called on this runner.
This is useful for waiting for the runner to be shut down, either by an external caller or by a task running on the runner itself.
Returns error if context is cancelled or deadline exceeded.
Example:
// Task shuts down the runner when condition is met
runner.PostTask(func(ctx context.Context) {
if conditionMet() {
me := GetCurrentTaskRunner(ctx)
me.Shutdown()
}
})
// Main thread waits for shutdown
runner.WaitShutdown(context.Background())
type SingleThreadTaskRunner ¶
type SingleThreadTaskRunner struct {
// contains filtered or unexported fields
}
SingleThreadTaskRunner binds a dedicated Goroutine to execute tasks sequentially. It guarantees that all tasks submitted to it run on the same Goroutine (Thread Affinity).
Use cases: 1. Blocking IO operations (e.g., NetworkReceiver) 2. CGO calls that require Thread Local Storage 3. Simulating Main Thread / UI Thread behavior
Key differences from SequencedTaskRunner: - SequencedTaskRunner: Tasks execute sequentially but may run on different worker goroutines - SingleThreadTaskRunner: Tasks execute sequentially AND always on the same dedicated goroutine
func NewSingleThreadTaskRunner ¶
func NewSingleThreadTaskRunner() *SingleThreadTaskRunner
NewSingleThreadTaskRunner creates and starts a new SingleThreadTaskRunner. It immediately spawns a dedicated goroutine for task execution.
func (*SingleThreadTaskRunner) FlushAsync ¶
func (r *SingleThreadTaskRunner) FlushAsync(callback func())
FlushAsync posts a barrier task that executes the callback when all prior tasks complete. This is a non-blocking alternative to WaitIdle.
The callback will be executed on this runner's dedicated goroutine, after all tasks posted before FlushAsync have completed.
Example:
runner.PostTask(task1)
runner.PostTask(task2)
runner.FlushAsync(func() {
fmt.Println("task1 and task2 completed!")
})
func (*SingleThreadTaskRunner) GetQueuePolicy ¶ added in v0.2.0
func (r *SingleThreadTaskRunner) GetQueuePolicy() QueuePolicy
GetQueuePolicy returns the current queue policy
func (*SingleThreadTaskRunner) GetThreadPool ¶
func (r *SingleThreadTaskRunner) GetThreadPool() ThreadPool
GetThreadPool returns nil because SingleThreadTaskRunner doesn't use a thread pool
func (*SingleThreadTaskRunner) IsClosed ¶
func (r *SingleThreadTaskRunner) IsClosed() bool
IsClosed returns true if the runner has been stopped
func (*SingleThreadTaskRunner) Metadata ¶
func (r *SingleThreadTaskRunner) Metadata() map[string]any
Metadata returns the metadata associated with the task runner
func (*SingleThreadTaskRunner) Name ¶
func (r *SingleThreadTaskRunner) Name() string
Name returns the name of the task runner
func (*SingleThreadTaskRunner) PendingTaskCount ¶ added in v0.3.0
func (r *SingleThreadTaskRunner) PendingTaskCount() int
PendingTaskCount returns the number of queued tasks waiting to run.
func (*SingleThreadTaskRunner) PostDelayedTask ¶
func (r *SingleThreadTaskRunner) PostDelayedTask(task Task, delay time.Duration)
PostDelayedTask submits a delayed task
func (*SingleThreadTaskRunner) PostDelayedTaskNamed ¶ added in v0.3.0
func (r *SingleThreadTaskRunner) PostDelayedTaskNamed(name string, task Task, delay time.Duration)
PostDelayedTaskNamed submits a delayed named task.
func (*SingleThreadTaskRunner) PostDelayedTaskWithTraits ¶
func (r *SingleThreadTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
PostDelayedTaskWithTraits submits a delayed task with traits. Uses time.AfterFunc which is independent of the global TaskScheduler, ensuring IO-related timers are not affected by scheduler load.
func (*SingleThreadTaskRunner) PostDelayedTaskWithTraitsNamed ¶ added in v0.3.0
func (r *SingleThreadTaskRunner) PostDelayedTaskWithTraitsNamed(name string, task Task, delay time.Duration, traits TaskTraits)
PostDelayedTaskWithTraitsNamed submits a delayed named task with traits.
func (*SingleThreadTaskRunner) PostRepeatingTask ¶
func (r *SingleThreadTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
PostRepeatingTask submits a task that repeats at a fixed interval
func (*SingleThreadTaskRunner) PostRepeatingTaskWithInitialDelay ¶
func (r *SingleThreadTaskRunner) PostRepeatingTaskWithInitialDelay( task Task, initialDelay, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithInitialDelay submits a repeating task with an initial delay
func (*SingleThreadTaskRunner) PostRepeatingTaskWithTraits ¶
func (r *SingleThreadTaskRunner) PostRepeatingTaskWithTraits( task Task, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithTraits submits a repeating task with traits
func (*SingleThreadTaskRunner) PostTask ¶
func (r *SingleThreadTaskRunner) PostTask(task Task)
PostTask submits a task for execution
func (*SingleThreadTaskRunner) PostTaskAndReply ¶
func (r *SingleThreadTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
PostTaskAndReply executes task on this runner, then posts reply to replyRunner. If task panics, reply will not be executed. Both task and reply will execute on the same dedicated goroutine if replyRunner is this runner.
func (*SingleThreadTaskRunner) PostTaskAndReplyWithTraits ¶
func (r *SingleThreadTaskRunner) PostTaskAndReplyWithTraits( task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, replyRunner TaskRunner, )
PostTaskAndReplyWithTraits allows specifying different traits for task and reply. This is useful when task is background work (BestEffort) but reply is UI update (UserVisible). Note: For SingleThreadTaskRunner, traits don't affect execution order since all tasks run sequentially on the same goroutine, but they may be used for logging or metrics.
func (*SingleThreadTaskRunner) PostTaskNamed ¶ added in v0.3.0
func (r *SingleThreadTaskRunner) PostTaskNamed(name string, task Task)
PostTaskNamed submits a task with a caller-provided display name.
func (*SingleThreadTaskRunner) PostTaskWithTraits ¶
func (r *SingleThreadTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
PostTaskWithTraits submits a task with traits (traits are ignored for single-threaded execution). The behavior when the queue is full depends on the configured QueuePolicy: - QueuePolicyDrop: Silently drops the task (default) - QueuePolicyReject: Calls the rejection callback if set - QueuePolicyWait: Blocks until queue has space or context is done
func (*SingleThreadTaskRunner) PostTaskWithTraitsNamed ¶ added in v0.3.0
func (r *SingleThreadTaskRunner) PostTaskWithTraitsNamed(name string, task Task, traits TaskTraits)
PostTaskWithTraitsNamed submits a named task with traits.
func (*SingleThreadTaskRunner) RecentTasks ¶ added in v0.3.0
func (r *SingleThreadTaskRunner) RecentTasks(limit int) []TaskExecutionRecord
RecentTasks returns completed task execution records in newest-first order.
func (*SingleThreadTaskRunner) RejectedCount ¶ added in v0.2.0
func (r *SingleThreadTaskRunner) RejectedCount() int64
RejectedCount returns the number of tasks that have been rejected due to a full queue. Only incremented when QueuePolicy is QueuePolicyReject.
func (*SingleThreadTaskRunner) RunningTaskCount ¶ added in v0.3.0
func (r *SingleThreadTaskRunner) RunningTaskCount() int
RunningTaskCount returns the number of tasks currently executing.
func (*SingleThreadTaskRunner) SetMetadata ¶
func (r *SingleThreadTaskRunner) SetMetadata(key string, value any)
SetMetadata sets a metadata key-value pair
func (*SingleThreadTaskRunner) SetName ¶
func (r *SingleThreadTaskRunner) SetName(name string)
SetName sets the name of the task runner
func (*SingleThreadTaskRunner) SetQueuePolicy ¶ added in v0.2.0
func (r *SingleThreadTaskRunner) SetQueuePolicy(policy QueuePolicy)
SetQueuePolicy sets the policy for handling full queue situations
func (*SingleThreadTaskRunner) SetRejectionCallback ¶ added in v0.2.0
func (r *SingleThreadTaskRunner) SetRejectionCallback(callback RejectionCallback)
SetRejectionCallback sets the callback to be called when a task is rejected. Only used when QueuePolicy is set to QueuePolicyReject.
func (*SingleThreadTaskRunner) Shutdown ¶
func (r *SingleThreadTaskRunner) Shutdown()
Shutdown marks the runner as closed and signals shutdown waiters. Unlike Stop(), this method does NOT immediately terminate the runLoop. This allows tasks to call Shutdown() from within themselves.
After calling Shutdown(): - WaitShutdown() will return - IsClosed() will return true - New tasks posted will be ignored - Existing queued tasks will still execute - Call Stop() to actually terminate the runLoop
func (*SingleThreadTaskRunner) Stats ¶ added in v0.3.0
func (r *SingleThreadTaskRunner) Stats() RunnerStats
Stats returns current observability data for this runner.
func (*SingleThreadTaskRunner) Stop ¶
func (r *SingleThreadTaskRunner) Stop()
Stop stops the runner and releases resources
func (*SingleThreadTaskRunner) WaitIdle ¶
func (r *SingleThreadTaskRunner) WaitIdle(ctx context.Context) error
WaitIdle blocks until all currently queued tasks have completed execution. This is implemented by posting a barrier task and waiting for it to execute.
Since SingleThreadTaskRunner executes tasks sequentially on a dedicated goroutine, when the barrier task executes, all tasks posted before WaitIdle are guaranteed to have completed.
Returns error if: - Context is cancelled or deadline exceeded - Runner is closed when WaitIdle is called
Note: Tasks posted after WaitIdle is called are not waited for. Note: Repeating tasks will continue to repeat and are not waited for.
func (*SingleThreadTaskRunner) WaitShutdown ¶
func (r *SingleThreadTaskRunner) WaitShutdown(ctx context.Context) error
WaitShutdown blocks until Shutdown() is called on this runner.
This is useful for waiting for the runner to be shut down, either by an external caller or by a task running on the runner itself.
Returns error if context is cancelled or deadline exceeded.
Example:
// IO thread: receives messages and posts shutdown when condition met
ioRunner.PostTask(func(ctx context.Context) {
for {
msg := receiveMessage()
mainRunner.PostTask(func(ctx context.Context) {
if shouldShutdown(msg) {
me := GetCurrentTaskRunner(ctx)
me.Shutdown()
}
})
}
})
// Main thread waits for shutdown
mainRunner.WaitShutdown(context.Background())
type TaskExecutionRecord ¶ added in v0.3.0
type TaskExecutionRecord struct {
TaskID TaskID
Name string
RunnerName string
RunnerType string
Priority TaskPriority
StartedAt time.Time
FinishedAt time.Time
Duration time.Duration
Panicked bool
}
TaskExecutionRecord captures a completed task execution event.
type TaskID ¶ added in v0.3.0
TaskID is a unique identifier for tasks, using UUID for guaranteed uniqueness.
func GenerateTaskID ¶ added in v0.3.0
func GenerateTaskID() TaskID
GenerateTaskID creates a new unique TaskID.
type TaskItem ¶
type TaskItem struct {
Task Task
Traits TaskTraits
ID TaskID // Unique identifier for the task
}
type TaskPriority ¶
type TaskPriority int
const (
// TaskPriorityBestEffort: Lowest priority
TaskPriorityBestEffort TaskPriority = iota
// TaskPriorityUserVisible: Default priority
TaskPriorityUserVisible
// TaskPriorityUserBlocking: Highest priority
// `UserBlocking` means the task may block the main thread.
// If the main thread is blocked, the UI will be unresponsive.
// The user experience will be affected if the task blocks the main thread.
TaskPriorityUserBlocking
)
type TaskQueue ¶
type TaskQueue interface {
Push(t Task, traits TaskTraits)
PushWithID(t Task, traits TaskTraits) TaskID // Push with specific TaskID
Pop() (TaskItem, bool)
PopUpTo(max int) []TaskItem
PeekTraits() (TaskTraits, bool)
Len() int
IsEmpty() bool
MaybeCompact()
Clear() // Clear all tasks from the queue
}
TaskQueue defines the interface for different queue implementations
type TaskRunner ¶
type TaskRunner interface {
PostTask(task Task)
PostTaskWithTraits(task Task, traits TaskTraits)
PostDelayedTask(task Task, delay time.Duration)
// [v2.1 New] Support delayed tasks with specific traits
PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
// [v2.2 New] Support repeating tasks
PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
PostRepeatingTaskWithTraits(task Task, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
PostRepeatingTaskWithInitialDelay(task Task, initialDelay, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
// [v2.3 New] Support task and reply pattern
// PostTaskAndReply executes task on this runner, then posts reply to replyRunner
PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
// PostTaskAndReplyWithTraits allows specifying traits for both task and reply
PostTaskAndReplyWithTraits(task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, replyRunner TaskRunner)
// [v2.4 New] Synchronization and lifecycle management
// WaitIdle blocks until all currently queued tasks have completed execution
// Tasks posted after WaitIdle is called are not waited for
// Returns error if context is cancelled or runner is closed
WaitIdle(ctx context.Context) error
// FlushAsync posts a barrier task that executes callback when all prior tasks complete
// This is a non-blocking alternative to WaitIdle
FlushAsync(callback func())
// WaitShutdown blocks until Shutdown() is called on this runner
// Returns error if context is cancelled
WaitShutdown(ctx context.Context) error
// Shutdown marks the runner as closed and clears all pending tasks
// This method is non-blocking and can be safely called from within a task
Shutdown()
// IsClosed returns true if the runner has been shut down
IsClosed() bool
// [v2.5 New] Identification and Metadata
// Name returns the name of the task runner
Name() string
// Metadata returns the metadata associated with the task runner
Metadata() map[string]any
// [v2.6 New] Thread Pool Access
// GetThreadPool returns the underlying ThreadPool used by this runner
// Returns nil for runners that don't use a thread pool (e.g., SingleThreadTaskRunner)
GetThreadPool() ThreadPool
}
func GetCurrentTaskRunner ¶
func GetCurrentTaskRunner(ctx context.Context) TaskRunner
type TaskScheduler ¶
type TaskScheduler struct {
// contains filtered or unexported fields
}
func NewFIFOTaskScheduler ¶
func NewFIFOTaskScheduler(workerCount int) *TaskScheduler
func NewFIFOTaskSchedulerWithConfig ¶ added in v0.3.0
func NewFIFOTaskSchedulerWithConfig(workerCount int, config *TaskSchedulerConfig) *TaskScheduler
func NewPriorityTaskScheduler ¶
func NewPriorityTaskScheduler(workerCount int) *TaskScheduler
func NewPriorityTaskSchedulerWithConfig ¶ added in v0.3.0
func NewPriorityTaskSchedulerWithConfig(workerCount int, config *TaskSchedulerConfig) *TaskScheduler
func (*TaskScheduler) ActiveTaskCount ¶
func (s *TaskScheduler) ActiveTaskCount() int
func (*TaskScheduler) DelayedTaskCount ¶
func (s *TaskScheduler) DelayedTaskCount() int
func (*TaskScheduler) GetMetrics ¶ added in v0.3.0
func (s *TaskScheduler) GetMetrics() Metrics
GetMetrics returns the metrics collector for this scheduler
func (*TaskScheduler) GetPanicHandler ¶ added in v0.3.0
func (s *TaskScheduler) GetPanicHandler() PanicHandler
GetPanicHandler returns the panic handler for this scheduler
func (*TaskScheduler) GetWork ¶
func (s *TaskScheduler) GetWork(stopCh <-chan struct{}) (TaskItem, bool)
GetWork (Called by Worker)
func (*TaskScheduler) OnTaskEnd ¶
func (s *TaskScheduler) OnTaskEnd()
func (*TaskScheduler) OnTaskStart ¶
func (s *TaskScheduler) OnTaskStart()
func (*TaskScheduler) PostDelayedInternal ¶
func (s *TaskScheduler) PostDelayedInternal(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)
PostDelayedInternal
func (*TaskScheduler) PostInternal ¶
func (s *TaskScheduler) PostInternal(task Task, traits TaskTraits)
PostInternal
func (*TaskScheduler) QueuedTaskCount ¶
func (s *TaskScheduler) QueuedTaskCount() int
func (*TaskScheduler) Shutdown ¶
func (s *TaskScheduler) Shutdown()
func (*TaskScheduler) ShutdownGraceful ¶ added in v0.2.0
func (s *TaskScheduler) ShutdownGraceful(timeout time.Duration) error
ShutdownGraceful waits for all queued and active tasks to complete. It returns an error if the timeout is exceeded before the tasks complete.
type TaskSchedulerConfig ¶ added in v0.3.0
type TaskSchedulerConfig struct {
// PanicHandler is called when a task panics. Defaults to DefaultPanicHandler.
PanicHandler PanicHandler
// Metrics is the collector used to record task execution metrics. Defaults to NilMetrics.
Metrics Metrics
// RejectedTaskHandler is called when a task is rejected. Defaults to DefaultRejectedTaskHandler.
RejectedTaskHandler RejectedTaskHandler
}
TaskSchedulerConfig holds configuration options for TaskScheduler. All handlers are optional; if not provided, default implementations will be used.
Example ¶
// Create custom handlers
panicHandler := &DefaultPanicHandler{}
metrics := &NilMetrics{}
rejectedHandler := &DefaultRejectedTaskHandler{}
// Create config
config := &TaskSchedulerConfig{
PanicHandler: panicHandler,
Metrics: metrics,
RejectedTaskHandler: rejectedHandler,
}
// Create scheduler with config
_ = NewFIFOTaskSchedulerWithConfig(4, config)
func DefaultTaskSchedulerConfig ¶ added in v0.3.0
func DefaultTaskSchedulerConfig() *TaskSchedulerConfig
DefaultTaskSchedulerConfig returns a config with default handlers.
Example ¶
// Use default config
config := DefaultTaskSchedulerConfig()
// Create scheduler with default config
_ = NewFIFOTaskSchedulerWithConfig(4, config)
type TaskTraits ¶
type TaskTraits struct {
Priority TaskPriority
}
func DefaultTaskTraits ¶
func DefaultTaskTraits() TaskTraits
func TraitsBestEffort ¶
func TraitsBestEffort() TaskTraits
func TraitsUserBlocking ¶
func TraitsUserBlocking() TaskTraits
func TraitsUserVisible ¶
func TraitsUserVisible() TaskTraits
type TaskWithResult ¶
TaskWithResult defines a task that returns a result of type T and an error. This is used with PostTaskAndReplyWithResult to pass data from task to reply.
type ThreadPool ¶
type ThreadPool interface {
PostInternal(task Task, traits TaskTraits)
PostDelayedInternal(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)
Start(ctx context.Context)
Stop()
ID() string
IsRunning() bool
WorkerCount() int
QueuedTaskCount() int // In queue
ActiveTaskCount() int // Executing
DelayedTaskCount() int // Delayed
}
ThreadPool defines the task execution interface.
type TypedHandler ¶ added in v0.2.0
TypedHandler is a generic handler type for type-safe job handlers