Documentation
¶
Overview ¶
Package worker provides job execution capabilities for GopherQueue: a production-ready worker pool implementation and the dispatcher that bridges the scheduler and the worker pool.
Index ¶
- type Dispatcher
- type DispatcherStats
- type Handler
- type JobContext
- type PoolStats
- type SimpleDispatcher
- type SimplePool
- func (p *SimplePool) CompleteJob(ctx context.Context, result *core.JobResult) error
- func (p *SimplePool) FailJob(ctx context.Context, result *core.JobResult) error
- func (p *SimplePool) IsHealthy() bool
- func (p *SimplePool) RegisterHandler(jobType string, handler Handler)
- func (p *SimplePool) Start(ctx context.Context) error
- func (p *SimplePool) Stats() *PoolStats
- func (p *SimplePool) Stop(ctx context.Context) error
- func (p *SimplePool) Submit(job *core.Job) error
- func (p *SimplePool) WaitForJob(ctx context.Context) (*core.Job, error)
- type WorkerConfig
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher interface {
// Start begins the dispatcher's main loop.
Start(ctx context.Context) error
// Stop gracefully shuts down the dispatcher.
Stop(ctx context.Context) error
// Stats returns dispatcher statistics.
Stats() *DispatcherStats
// IsHealthy returns whether the dispatcher is operating normally.
IsHealthy() bool
}
Dispatcher bridges the scheduler and worker pool, managing job flow.
type DispatcherStats ¶
type DispatcherStats struct {
JobsDispatched int64 `json:"jobs_dispatched"`
JobsCompleted int64 `json:"jobs_completed"`
JobsFailed int64 `json:"jobs_failed"`
AvgDispatchLatency time.Duration `json:"avg_dispatch_latency"`
Healthy bool `json:"healthy"`
}
DispatcherStats contains dispatcher metrics.
type Handler ¶
type Handler func(ctx context.Context, jctx core.JobContext) error
Handler is a function that processes a job.
type JobContext ¶
type JobContext = core.JobContext
JobContext is an alias to core.JobContext for external use.
type PoolStats ¶
type PoolStats struct {
// Worker counts
TotalWorkers int `json:"total_workers"`
ActiveWorkers int `json:"active_workers"`
IdleWorkers int `json:"idle_workers"`
// Job counts
QueuedJobs int64 `json:"queued_jobs"`
ProcessedJobs int64 `json:"processed_jobs"`
SucceededJobs int64 `json:"succeeded_jobs"`
FailedJobs int64 `json:"failed_jobs"`
RetriedJobs int64 `json:"retried_jobs"`
// Latencies
AvgProcessingTime time.Duration `json:"avg_processing_time"`
MaxProcessingTime time.Duration `json:"max_processing_time"`
P99ProcessingTime time.Duration `json:"p99_processing_time"`
// Current state
ProcessingIDs []uuid.UUID `json:"processing_ids"`
Uptime time.Duration `json:"uptime"`
Healthy bool `json:"healthy"`
}
PoolStats contains worker pool metrics.
type SimpleDispatcher ¶
type SimpleDispatcher struct {
// contains filtered or unexported fields
}
SimpleDispatcher is a basic dispatcher implementation.
func NewSimpleDispatcher ¶
func NewSimpleDispatcher(sched scheduler.Scheduler, pool WorkerPool) *SimpleDispatcher
NewSimpleDispatcher creates a new dispatcher.
func (*SimpleDispatcher) IsHealthy ¶
func (d *SimpleDispatcher) IsHealthy() bool
IsHealthy returns whether the dispatcher is operating normally.
func (*SimpleDispatcher) RecordCompletion ¶
func (d *SimpleDispatcher) RecordCompletion(result *core.JobResult)
RecordCompletion records a successful job completion.
func (*SimpleDispatcher) Start ¶
func (d *SimpleDispatcher) Start(ctx context.Context) error
Start begins the dispatcher's main loop.
func (*SimpleDispatcher) Stats ¶
func (d *SimpleDispatcher) Stats() *DispatcherStats
Stats returns dispatcher statistics.
type SimplePool ¶
type SimplePool struct {
// contains filtered or unexported fields
}
SimplePool is a production-ready worker pool implementation.
func NewSimplePool ¶
func NewSimplePool(store persistence.JobStore, sched scheduler.Scheduler, config *WorkerConfig) *SimplePool
NewSimplePool creates a new worker pool.
func (*SimplePool) CompleteJob ¶
func (p *SimplePool) CompleteJob(ctx context.Context, result *core.JobResult) error
CompleteJob is called when a job finishes successfully.
func (*SimplePool) IsHealthy ¶
func (p *SimplePool) IsHealthy() bool
IsHealthy returns whether the worker pool is operating normally.
func (*SimplePool) RegisterHandler ¶
func (p *SimplePool) RegisterHandler(jobType string, handler Handler)
RegisterHandler registers a handler for a job type.
func (*SimplePool) Start ¶
func (p *SimplePool) Start(ctx context.Context) error
Start initializes the worker pool and begins processing.
func (*SimplePool) Stats ¶
func (p *SimplePool) Stats() *PoolStats
Stats returns current worker pool statistics.
func (*SimplePool) Stop ¶
func (p *SimplePool) Stop(ctx context.Context) error
Stop gracefully shuts down all workers.
func (*SimplePool) Submit ¶
func (p *SimplePool) Submit(job *core.Job) error
Submit adds a job to the pool for execution.
func (*SimplePool) WaitForJob ¶
func (p *SimplePool) WaitForJob(ctx context.Context) (*core.Job, error)
WaitForJob blocks until a job is available.
type WorkerConfig ¶
type WorkerConfig struct {
// ID is the unique identifier for this worker instance.
ID string
// Concurrency is the number of concurrent workers.
Concurrency int
// HeartbeatInterval is how often workers report heartbeat.
HeartbeatInterval time.Duration
// JobTimeout is the maximum time a job can run before being killed.
JobTimeout time.Duration
// ShutdownGracePeriod is the time to wait for jobs to complete on shutdown.
ShutdownGracePeriod time.Duration
// CheckpointInterval is how often to checkpoint long-running jobs.
CheckpointInterval time.Duration
// DataDir is the directory for local storage.
DataDir string
}
WorkerConfig configures worker behavior.
func DefaultWorkerConfig ¶
func DefaultWorkerConfig() *WorkerConfig
DefaultWorkerConfig returns sensible defaults.
type WorkerPool ¶
type WorkerPool interface {
// Start initializes the worker pool and begins processing.
Start(ctx context.Context) error
// Stop gracefully shuts down all workers.
Stop(ctx context.Context) error
// RegisterHandler registers a handler for a job type.
RegisterHandler(jobType string, handler Handler)
// Submit adds a job to the pool for execution.
Submit(job *core.Job) error
// Stats returns current worker pool statistics.
Stats() *PoolStats
// IsHealthy returns whether the worker pool is operating normally.
IsHealthy() bool
// WaitForJob blocks until a job is available or context is cancelled.
WaitForJob(ctx context.Context) (*core.Job, error)
// CompleteJob is called when a job finishes successfully.
CompleteJob(ctx context.Context, result *core.JobResult) error
// FailJob is called when a job fails with an error.
FailJob(ctx context.Context, result *core.JobResult) error
}
WorkerPool manages a set of workers that execute jobs.