worker

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package worker provides the dispatcher that bridges scheduler and worker pool.

Package worker provides job execution capabilities for GopherQueue.

Package worker provides a production-ready worker pool implementation.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher interface {
	// Start begins the dispatcher's main loop.
	Start(ctx context.Context) error

	// Stop gracefully shuts down the dispatcher.
	Stop(ctx context.Context) error

	// Stats returns dispatcher statistics.
	Stats() *DispatcherStats

	// IsHealthy returns whether the dispatcher is operating normally.
	IsHealthy() bool
}

Dispatcher bridges the scheduler and worker pool, managing job flow.

type DispatcherStats

type DispatcherStats struct {
	JobsDispatched     int64         `json:"jobs_dispatched"`
	JobsCompleted      int64         `json:"jobs_completed"`
	JobsFailed         int64         `json:"jobs_failed"`
	AvgDispatchLatency time.Duration `json:"avg_dispatch_latency"`
	Healthy            bool          `json:"healthy"`
}

DispatcherStats contains dispatcher metrics.

type Handler

type Handler func(ctx context.Context, jctx core.JobContext) error

Handler is a function that processes a job.

type JobContext

type JobContext = core.JobContext

JobContext is an alias to core.JobContext for external use.

type PoolStats

type PoolStats struct {
	// Worker counts
	TotalWorkers  int `json:"total_workers"`
	ActiveWorkers int `json:"active_workers"`
	IdleWorkers   int `json:"idle_workers"`

	// Job counts
	QueuedJobs    int64 `json:"queued_jobs"`
	ProcessedJobs int64 `json:"processed_jobs"`
	SucceededJobs int64 `json:"succeeded_jobs"`
	FailedJobs    int64 `json:"failed_jobs"`
	RetriedJobs   int64 `json:"retried_jobs"`

	// Latencies
	AvgProcessingTime time.Duration `json:"avg_processing_time"`
	MaxProcessingTime time.Duration `json:"max_processing_time"`
	P99ProcessingTime time.Duration `json:"p99_processing_time"`

	// Current state
	ProcessingIDs []uuid.UUID   `json:"processing_ids"`
	Uptime        time.Duration `json:"uptime"`
	Healthy       bool          `json:"healthy"`
}

PoolStats contains worker pool metrics.

type SimpleDispatcher

type SimpleDispatcher struct {
	// contains filtered or unexported fields
}

SimpleDispatcher is a basic dispatcher implementation.

func NewSimpleDispatcher

func NewSimpleDispatcher(sched scheduler.Scheduler, pool WorkerPool) *SimpleDispatcher

NewSimpleDispatcher creates a new dispatcher.

func (*SimpleDispatcher) IsHealthy

func (d *SimpleDispatcher) IsHealthy() bool

IsHealthy returns whether the dispatcher is operating normally.

func (*SimpleDispatcher) RecordCompletion

func (d *SimpleDispatcher) RecordCompletion(result *core.JobResult)

RecordCompletion records a successful job completion.

func (*SimpleDispatcher) Start

func (d *SimpleDispatcher) Start(ctx context.Context) error

Start begins the dispatcher's main loop.

func (*SimpleDispatcher) Stats

func (d *SimpleDispatcher) Stats() *DispatcherStats

Stats returns dispatcher statistics.

func (*SimpleDispatcher) Stop

func (d *SimpleDispatcher) Stop(ctx context.Context) error

Stop gracefully shuts down the dispatcher.

type SimplePool

type SimplePool struct {
	// contains filtered or unexported fields
}

SimplePool is a production-ready worker pool implementation.

func NewSimplePool

func NewSimplePool(store persistence.JobStore, sched scheduler.Scheduler, config *WorkerConfig) *SimplePool

NewSimplePool creates a new worker pool.

func (*SimplePool) CompleteJob

func (p *SimplePool) CompleteJob(ctx context.Context, result *core.JobResult) error

CompleteJob is called when a job finishes successfully.

func (*SimplePool) FailJob

func (p *SimplePool) FailJob(ctx context.Context, result *core.JobResult) error

FailJob is called when a job fails.

func (*SimplePool) IsHealthy

func (p *SimplePool) IsHealthy() bool

IsHealthy returns whether the worker pool is operating normally.

func (*SimplePool) RegisterHandler

func (p *SimplePool) RegisterHandler(jobType string, handler Handler)

RegisterHandler registers a handler for a job type.

func (*SimplePool) Start

func (p *SimplePool) Start(ctx context.Context) error

Start initializes the worker pool and begins processing.

func (*SimplePool) Stats

func (p *SimplePool) Stats() *PoolStats

Stats returns current worker pool statistics.

func (*SimplePool) Stop

func (p *SimplePool) Stop(ctx context.Context) error

Stop gracefully shuts down all workers.

func (*SimplePool) Submit

func (p *SimplePool) Submit(job *core.Job) error

Submit adds a job to the pool for execution.

func (*SimplePool) WaitForJob

func (p *SimplePool) WaitForJob(ctx context.Context) (*core.Job, error)

WaitForJob blocks until a job is available.

type WorkerConfig

type WorkerConfig struct {
	// ID is the unique identifier for this worker instance.
	ID string

	// Concurrency is the number of concurrent workers.
	Concurrency int

	// HeartbeatInterval is how often workers report heartbeat.
	HeartbeatInterval time.Duration

	// JobTimeout is the maximum time a job can run before being killed.
	JobTimeout time.Duration

	// ShutdownGracePeriod is the time to wait for jobs to complete on shutdown.
	ShutdownGracePeriod time.Duration

	// CheckpointInterval is how often to checkpoint long-running jobs.
	CheckpointInterval time.Duration

	// DataDir is the directory for local storage
	DataDir string
}

WorkerConfig configures worker behavior.

func DefaultWorkerConfig

func DefaultWorkerConfig() *WorkerConfig

DefaultWorkerConfig returns sensible defaults.

type WorkerPool

type WorkerPool interface {
	// Start initializes the worker pool and begins processing.
	Start(ctx context.Context) error

	// Stop gracefully shuts down all workers.
	Stop(ctx context.Context) error

	// RegisterHandler registers a handler for a job type.
	RegisterHandler(jobType string, handler Handler)

	// Submit adds a job to the pool for execution.
	Submit(job *core.Job) error

	// Stats returns current worker pool statistics.
	Stats() *PoolStats

	// IsHealthy returns whether the worker pool is operating normally.
	IsHealthy() bool

	// WaitForJob blocks until a job is available or context is cancelled.
	WaitForJob(ctx context.Context) (*core.Job, error)

	// CompleteJob is called when a job finishes execution.
	CompleteJob(ctx context.Context, result *core.JobResult) error

	// FailJob is called when a job fails with an error.
	FailJob(ctx context.Context, result *core.JobResult) error
}

WorkerPool manages a set of workers that execute jobs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL