worker

package
v0.0.0-...-d81d4e0
Published: Nov 22, 2025 License: MIT Imports: 7 Imported by: 0

README

TaskForge Phase 2B: Worker Engine

This directory contains the implementation of TaskForge's worker engine, which applies three established design patterns to provide fault tolerance, resource management, and observability.

Architecture Overview

The worker engine implements three key design patterns:

1. Observer Pattern (Queue State Monitoring)
  • EventBus: Centralized event distribution system
  • Observer: Interface for monitoring worker lifecycle events
  • Built-in Observers:
    • MetricsObserver: Collects performance metrics
    • HealthMonitorObserver: Tracks worker health and triggers alerts
2. Command Pattern (Task Execution)
  • TaskCommand: Unified interface for task processing
  • TaskCommandRegistry: Pluggable task processor management
  • IsolatedTaskCommand: Decorator adding resource limits and circuit breaker protection
  • BaseTaskCommand: Common functionality for all task processors
3. Bulkhead Pattern (Failure Isolation)
  • BulkheadManager: Resource pool and circuit breaker coordination
  • ResourcePool: Isolated resource allocation per task type
  • CircuitBreaker: Prevents cascade failures
  • RateLimiter: Controls request rates
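
The command and decorator roles listed above can be sketched in a few lines. This is a minimal illustration rather than the package's code: Command, echoCommand, limitedCommand, and run are hypothetical stand-ins for TaskCommand and IsolatedTaskCommand, and isolation is reduced to a single concurrency cap.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Command is a hypothetical stand-in for the package's TaskCommand interface.
type Command interface {
	Execute(ctx context.Context, payload string) (string, error)
}

// echoCommand is a trivial command that reports its payload as done.
type echoCommand struct{}

func (echoCommand) Execute(_ context.Context, payload string) (string, error) {
	return "done: " + payload, nil
}

// ErrPoolFull is returned when the concurrency cap is reached.
var ErrPoolFull = errors.New("resource pool full")

// limitedCommand decorates another Command with a concurrency cap.
type limitedCommand struct {
	inner Command
	slots chan struct{} // one buffered slot per allowed concurrent execution
}

func newLimitedCommand(inner Command, maxConcurrent int) *limitedCommand {
	return &limitedCommand{inner: inner, slots: make(chan struct{}, maxConcurrent)}
}

// Execute runs the wrapped command if a slot is free and fails fast otherwise.
func (l *limitedCommand) Execute(ctx context.Context, payload string) (string, error) {
	select {
	case l.slots <- struct{}{}:
		defer func() { <-l.slots }()
		return l.inner.Execute(ctx, payload)
	default:
		return "", ErrPoolFull
	}
}

// run executes a command with a background context.
func run(cmd Command, payload string) (string, error) {
	return cmd.Execute(context.Background(), payload)
}

func main() {
	var cmd Command = newLimitedCommand(echoCommand{}, 1)
	out, err := run(cmd, "send-email")
	fmt.Println(out, err)
}
```

Because the decorator satisfies the same interface as the command it wraps, callers do not need to know whether isolation is enabled.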

Core Components

Worker Pool
type Pool struct {
    // Manages multiple worker instances
    // Provides lifecycle management (start/stop)
    // Handles graceful shutdown
    // Monitors worker health
}

Key Features:

  • Configurable concurrency levels
  • Automatic worker recovery
  • Graceful shutdown with task completion guarantees
  • Health monitoring with auto-restart capabilities
  • Resource usage tracking
Worker Implementation
type Worker struct {
    // Processes tasks using command pattern
    // Reports to observers via event bus
    // Integrates with Phase 2A retry logic
}

Key Features:

  • Task type-specific processing
  • Exponential backoff retry with jitter
  • Dead letter queue integration
  • Heartbeat monitoring
  • Context-aware cancellation
Resource Management
type ResourcePool struct {
    // Enforces resource limits per task type
    // Prevents resource exhaustion
    // Tracks resource usage metrics
}

Resource Types:

  • Memory allocation (MB)
  • CPU usage (percentage)
  • Concurrent task limits
  • Execution timeouts
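
As an illustration of how per-task-type limits can be enforced, here is a minimal sketch of a pool that tracks memory and concurrent tasks. The pool type, newPool, Acquire, Release, and ErrInsufficientResources are hypothetical names chosen for this example; the package's ResourcePool may work differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrInsufficientResources is returned when a reservation would exceed a limit.
var ErrInsufficientResources = errors.New("insufficient resources")

// pool is a hypothetical, simplified resource pool for one task type.
type pool struct {
	mu          sync.Mutex
	maxMemoryMB int
	maxTasks    int
	usedMB      int
	activeTasks int
}

func newPool(maxMemoryMB, maxTasks int) *pool {
	return &pool{maxMemoryMB: maxMemoryMB, maxTasks: maxTasks}
}

// Acquire reserves memoryMB and one task slot, or fails without blocking.
func (p *pool) Acquire(memoryMB int) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.activeTasks+1 > p.maxTasks || p.usedMB+memoryMB > p.maxMemoryMB {
		return ErrInsufficientResources
	}
	p.activeTasks++
	p.usedMB += memoryMB
	return nil
}

// Release returns a reservation previously made by Acquire.
func (p *pool) Release(memoryMB int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.activeTasks--
	p.usedMB -= memoryMB
}

func main() {
	webhooks := newPool(256, 2)
	fmt.Println(webhooks.Acquire(128)) // first reservation fits
	fmt.Println(webhooks.Acquire(128)) // second reservation fits
	fmt.Println(webhooks.Acquire(64))  // rejected: both limits are reached
	webhooks.Release(128)
	fmt.Println(webhooks.Acquire(64)) // fits again after a release
}
```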

Event System

Worker Events
  • EventStarted: Worker initialization
  • EventStopped: Worker shutdown
  • EventHealthy: Health check passed
  • EventUnhealthy: Health check failed
Task Events
  • TaskEventReceived: Task dequeued from Redis
  • TaskEventStarted: Task processing began
  • TaskEventCompleted: Task finished successfully
  • TaskEventFailed: Task failed (will retry)
  • TaskEventRetrying: Task scheduled for retry
System Events
  • QueueEventBacklog: Queue depth warning
  • CircuitBreakerOpened: Service failing
  • CircuitBreakerClosed: Service recovered
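
To show how events like these can be routed to interested parties, here is a small sketch of a bus with optional per-subscriber filters. It is synchronous and uses plain callbacks for brevity, whereas the package documents its own NotifyObservers as asynchronous and uses an Observer interface; bus, subscription, handler, subscribe, and publish are hypothetical names.

```go
package main

import "fmt"

// Event mirrors the package's string-based event type.
type Event string

const (
	EventUnhealthy     Event = "worker_unhealthy"
	TaskEventCompleted Event = "task_completed"
)

// handler is a hypothetical callback used instead of an Observer interface.
type handler func(event Event, workerID string)

// subscription pairs a handler with an optional event filter.
type subscription struct {
	filter map[Event]bool // nil means "deliver every event"
	fn     handler
}

// bus is a simplified, synchronous event bus.
type bus struct {
	subs []subscription
}

// subscribe registers fn; with no events listed, fn receives everything.
func (b *bus) subscribe(fn handler, events ...Event) {
	var filter map[Event]bool
	if len(events) > 0 {
		filter = make(map[Event]bool, len(events))
		for _, e := range events {
			filter[e] = true
		}
	}
	b.subs = append(b.subs, subscription{filter: filter, fn: fn})
}

// publish delivers the event to every subscription whose filter accepts it.
func (b *bus) publish(event Event, workerID string) {
	for _, s := range b.subs {
		if s.filter == nil || s.filter[event] {
			s.fn(event, workerID)
		}
	}
}

func main() {
	b := &bus{}
	b.subscribe(func(e Event, id string) { fmt.Println("metrics:", e, id) })
	b.subscribe(func(e Event, id string) { fmt.Println("alert:", e, id) }, EventUnhealthy)

	b.publish(TaskEventCompleted, "worker-1") // reaches only the unfiltered handler
	b.publish(EventUnhealthy, "worker-1")     // reaches both handlers
}
```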

Usage Examples

Basic Worker Pool Setup
// Create configuration
config := types.DefaultConfig()
config.Worker.Concurrency = 5
config.Worker.Queues = []string{"high-priority", "normal", "low"}

// Create queue backend (from Phase 2A)
queueBackend, err := factory.CreateQueueBackend(&config.Queue, logger)
if err != nil {
    log.Fatal(err)
}

// Create worker pool
workerPool, err := worker.NewWorkerPool(
    "my-worker-pool",
    &config.Worker,
    queueBackend,
    metricsCollector,
    logger,
)
if err != nil {
    log.Fatal(err)
}

// Register task processors
emailProcessor := NewEmailProcessor(logger)
webhookProcessor := NewWebhookProcessor(logger)

if err := workerPool.RegisterTaskProcessor(types.TaskTypeEmail, emailProcessor); err != nil {
    log.Fatal(err)
}
if err := workerPool.RegisterTaskProcessor(types.TaskTypeWebhook, webhookProcessor); err != nil {
    log.Fatal(err)
}

// Start processing
ctx := context.Background()
if err := workerPool.Start(ctx); err != nil {
    log.Fatal(err)
}

// Graceful shutdown, bounded by the configured shutdown timeout
shutdownCtx, cancel := context.WithTimeout(context.Background(), config.Worker.ShutdownTimeout)
defer cancel()
if err := workerPool.Stop(shutdownCtx); err != nil {
    log.Printf("worker pool shutdown: %v", err)
}
Custom Task Processor
type CustomProcessor struct {
    logger types.Logger
}

func (p *CustomProcessor) Process(ctx context.Context, task *types.Task) (*types.TaskResult, error) {
    // Parse task payload
    var payload CustomPayload
    if err := task.UnmarshalPayload(&payload); err != nil {
        return nil, err
    }
    
    // Wait for the result, or stop early if the context is cancelled or times out
    select {
    case result := <-p.processAsync(payload):
        return &types.TaskResult{
            TaskID: task.ID,
            Status: types.TaskStatusCompleted,
            Result: result,
        }, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

func (p *CustomProcessor) GetSupportedTypes() []types.TaskType {
    return []types.TaskType{types.TaskTypeCustom}
}

func (p *CustomProcessor) GetCapabilities() []string {
    return []string{"async-processing", "timeout-handling"}
}
Custom Observer
type AlertingObserver struct {
    id     string
    slack  *slack.Client
}

// OnWorkerEvent sends an alert for the events this observer cares about.
func (o *AlertingObserver) OnWorkerEvent(ctx context.Context, data *EventData) {
    switch data.Event {
    case EventUnhealthy:
        o.sendAlert(fmt.Sprintf("Worker %s is unhealthy", data.WorkerID))
    case CircuitBreakerOpened:
        o.sendAlert(fmt.Sprintf("Circuit breaker opened for %s", data.Metadata["service"]))
    }
}

// GetObserverID and IsActive complete the Observer interface.
func (o *AlertingObserver) GetObserverID() string { return o.id }
func (o *AlertingObserver) IsActive() bool        { return true }

// Register with event bus
eventBus.RegisterObserverWithFilter(alertingObserver, []Event{
    EventUnhealthy,
    CircuitBreakerOpened,
})

Configuration

Worker Configuration
type WorkerConfig struct {
    ID                string        // Worker identifier
    Queues            []string      // Queues to process
    Concurrency       int           // Concurrent tasks
    Timeout           time.Duration // Task timeout
    HeartbeatInterval time.Duration // Health check interval
    ShutdownTimeout   time.Duration // Graceful shutdown timeout
    
    // Retry configuration
    MaxRetries        int           // Max retry attempts
    RetryBackoff      string        // exponential, linear, fixed
    InitialDelay      time.Duration // First retry delay
    MaxDelay          time.Duration // Maximum retry delay
    BackoffFactor     float64       // Exponential multiplier
    
    // Resource limits
    MaxMemoryMB       int           // Memory limit
    MaxCPUPercent     int           // CPU limit
    
    // Task filtering
    SupportedTypes    []TaskType    // Allowed task types
    Capabilities      []string      // Required capabilities
}
Bulkhead Configuration
type BulkheadConfig struct {
    // Global resource limits
    MaxTotalMemoryMB   int
    MaxTotalCPUPercent int
    MaxConcurrentTasks int
    
    // Per-task-type pools
    TaskTypePools map[TaskType]ResourcePoolConfig
    
    // Circuit breaker settings
    DefaultCircuitBreaker ServiceCircuitConfig
    ServiceCircuitBreakers map[string]ServiceCircuitConfig
    
    // Rate limiting
    TaskTypeRateLimits map[TaskType]RateLimitSettings
}

Health Monitoring

Health Thresholds
type HealthThresholds struct {
    MaxConsecutiveFailures int           // 5 failures = unhealthy
    MaxFailureRate         float64       // 30% failure rate limit
    HeartbeatTimeout       time.Duration // 2 minutes without heartbeat
    MaxMemoryMB            float64       // Memory usage warning threshold
    MaxCPUPercent          float64       // CPU usage warning threshold
    MaxTaskDuration        time.Duration // Average duration warning
}
Health Score Calculation

Workers receive a health score from 0.0 (unhealthy) to 1.0 (perfect health) based on:

  • Consecutive failure count (-40% penalty)
  • Overall failure rate (-30% penalty)
  • Resource usage (-10% penalty each for memory/CPU)
  • Task processing speed (-10% penalty)
  • Heartbeat timeliness (-30% penalty)
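
The listed penalties add up to more than 100%, so the score presumably has a floor. The sketch below shows one way such a score could be computed, assuming each penalty is applied in full once its threshold is crossed and the result is clamped at 0.0. Both assumptions, as well as the signals type and the healthScore function, are hypothetical; the package's actual weighting may differ.

```go
package main

import "fmt"

// signals is a hypothetical summary of which thresholds a worker has crossed.
type signals struct {
	tooManyConsecutiveFailures bool
	failureRateExceeded        bool
	memoryExceeded             bool
	cpuExceeded                bool
	tasksTooSlow               bool
	heartbeatLate              bool
}

// healthScore subtracts a fixed penalty (in percentage points) for every
// crossed threshold and clamps the result to the range 0.0 to 1.0.
// Assumption: penalties are all-or-nothing rather than proportional.
func healthScore(s signals) float64 {
	penalty := 0
	if s.tooManyConsecutiveFailures {
		penalty += 40
	}
	if s.failureRateExceeded {
		penalty += 30
	}
	if s.memoryExceeded {
		penalty += 10
	}
	if s.cpuExceeded {
		penalty += 10
	}
	if s.tasksTooSlow {
		penalty += 10
	}
	if s.heartbeatLate {
		penalty += 30
	}
	score := 100 - penalty
	if score < 0 {
		score = 0 // penalties can exceed 100 points in total
	}
	return float64(score) / 100
}

func main() {
	fmt.Println(healthScore(signals{}))
	fmt.Println(healthScore(signals{memoryExceeded: true, cpuExceeded: true}))
	fmt.Println(healthScore(signals{tooManyConsecutiveFailures: true, failureRateExceeded: true, heartbeatLate: true}))
}
```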

Resource Management

Resource Requirements
type ResourceRequirements struct {
    MaxMemoryMB     int           // Memory allocation
    MaxCPUPercent   int           // CPU usage limit
    MaxDuration     time.Duration // Execution timeout
    RequiresGPU     bool          // GPU requirement
    RequiresNetwork bool          // Network access needed
    Priority        int           // Resource priority (1-5)
}
Default Resource Allocations
  • Webhook: 128MB, 20% CPU, 30s timeout
  • Email: 64MB, 15% CPU, 1m timeout
  • Image Processing: 512MB, 50% CPU, 5m timeout, GPU
  • Data Processing: 256MB, 40% CPU, 10m timeout
  • Batch Operations: 1GB, 60% CPU, 30m timeout

Circuit Breaker Protection

Circuit States
  • Closed: Normal operation, requests pass through
  • Open: Service failing, requests fail fast
  • Half-Open: Testing if service recovered
Configuration
type ServiceCircuitConfig struct {
    Threshold         int           // 5 failures to open
    Timeout           time.Duration // 60s stay-open duration
    MaxRequests       int           // 3 test requests in half-open
    ResetTimeout      time.Duration // 300s reset counter interval
    FailureThreshold  float64       // 50% failure rate limit
    MinRequestCount   int           // 10 requests minimum sample
}
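
The three states and the Threshold, Timeout, and MaxRequests settings can be tied together in a small state machine. The following is a simplified, single-goroutine sketch under stated assumptions, not the package's CircuitBreakerImpl: it counts consecutive failures only, ignores the failure-rate settings, and uses an injected clock (the hypothetical fakeClock) so the example does not need to sleep.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type state string

const (
	closed   state = "closed"
	open     state = "open"
	halfOpen state = "half-open"
)

var (
	errOpen       = errors.New("circuit breaker is open")
	errDownstream = errors.New("downstream error")
)

// breaker is a hypothetical circuit breaker; it is not safe for concurrent use.
type breaker struct {
	threshold   int           // consecutive failures that open the circuit
	timeout     time.Duration // how long the circuit stays open
	maxRequests int           // successful probes needed to close again
	now         func() time.Time

	state     state
	failures  int
	successes int
	openedAt  time.Time
}

func newBreaker(threshold int, timeout time.Duration, maxRequests int, now func() time.Time) *breaker {
	return &breaker{threshold: threshold, timeout: timeout, maxRequests: maxRequests, now: now, state: closed}
}

// Execute runs fn unless the circuit is open and the timeout has not elapsed.
func (b *breaker) Execute(fn func() error) error {
	if b.state == open {
		if b.now().Sub(b.openedAt) < b.timeout {
			return errOpen // fail fast without calling fn
		}
		b.state, b.successes = halfOpen, 0
	}
	if err := fn(); err != nil {
		b.failures++
		if b.state == halfOpen || b.failures >= b.threshold {
			b.state, b.openedAt = open, b.now()
		}
		return err
	}
	b.failures = 0
	if b.state == halfOpen {
		b.successes++
		if b.successes >= b.maxRequests {
			b.state = closed
		}
	}
	return nil
}

// fakeClock lets the example advance time without sleeping.
type fakeClock struct{ t time.Time }

func (c *fakeClock) now() time.Time          { return c.t }
func (c *fakeClock) advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	clk := &fakeClock{t: time.Now()}
	cb := newBreaker(2, 60*time.Second, 1, clk.now)
	fail := func() error { return errDownstream }
	ok := func() error { return nil }

	_ = cb.Execute(fail)
	_ = cb.Execute(fail) // second consecutive failure opens the circuit
	err := cb.Execute(ok)
	fmt.Println(cb.state, err) // ok was not called: the circuit is open

	clk.advance(61 * time.Second) // timeout elapsed: next call is a probe
	err = cb.Execute(ok)
	fmt.Println(cb.state, err)
}
```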

Integration with Phase 2A

The worker engine integrates with the following Phase 2A components:

Queue Backend Integration
// Uses existing QueueBackend interface
task, err := queueBackend.Dequeue(ctx, queue, timeout)
if err != nil {
    return err
}
result, err := processor.Process(ctx, task)

if err != nil || result.Status == types.TaskStatusFailed {
    if task.CurrentRetries >= task.MaxRetries {
        // Dead letter queue for exhausted retries
        queueBackend.MoveToDLQ(ctx, task.ID, reason)
    } else {
        // Retry logic with exponential backoff
        retryAt := calculateNextRetry(task.CurrentRetries)
        queueBackend.ScheduleRetry(ctx, task.ID, retryAt)
    }
}
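
The snippet above leaves calculateNextRetry undefined. A helper of that kind might compute the delay as sketched below, using exponential backoff capped at a maximum, plus jitter. The function names and the "equal jitter" strategy (a random delay between half the base delay and the full base delay) are assumptions for illustration, not the package's implementation.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// backoffDelay returns the un-jittered delay before the retry that follows
// `attempt` earlier retries: initial * factor^attempt, capped at max.
func backoffDelay(attempt int, initial, max time.Duration, factor float64) time.Duration {
	d := float64(initial) * math.Pow(factor, float64(attempt))
	if d > float64(max) {
		return max
	}
	return time.Duration(d)
}

// withJitter picks a random delay in the range [d/2, d] so that workers that
// failed at the same moment do not all retry at the same moment.
func withJitter(d time.Duration) time.Duration {
	half := d / 2
	return half + time.Duration(rand.Int63n(int64(half)+1))
}

func main() {
	for attempt := 0; attempt < 10; attempt++ {
		base := backoffDelay(attempt, time.Second, 5*time.Minute, 2.0)
		fmt.Printf("retry %d: base %v, jittered %v\n", attempt+1, base, withJitter(base))
	}
}
```

With an initial delay of 1s and a factor of 2.0, the base delay doubles on each attempt until it reaches the 5-minute cap.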
Redis Streams Features
  • Consumer group coordination
  • Priority queue processing
  • Automatic retry scheduling
  • Dead letter queue management
  • Task acknowledgment/NACK

Testing

Unit Tests

Run comprehensive unit tests:

make test
Integration Tests

Test with real Redis backend:

make test-integration
Worker Engine Demo

Run the full demo with Redis:

# Start Redis
docker-compose up redis -d

# Run demo
make worker-engine-demo
Benchmarks

Performance testing:

make benchmark

Monitoring and Metrics

Key Metrics
  • Task Throughput: Tasks/second per worker
  • Task Latency: Processing time distribution
  • Error Rates: Failures per task type
  • Resource Utilization: Memory/CPU usage
  • Queue Depths: Backlog per queue
  • Circuit Breaker States: Service health
  • Worker Health Scores: Overall worker condition
Prometheus Integration

The worker engine exposes metrics compatible with Prometheus:

// Task metrics
taskforge_tasks_processed_total{worker_id, task_type, queue}
taskforge_task_duration_seconds{worker_id, task_type, queue}  
taskforge_task_failures_total{worker_id, task_type, error_type}

// Resource metrics  
taskforge_memory_usage_bytes{worker_id, pool_id}
taskforge_cpu_usage_percent{worker_id}
taskforge_active_tasks{worker_id, queue}

// Health metrics
taskforge_worker_health_score{worker_id}
taskforge_circuit_breaker_state{service}
taskforge_queue_depth{queue}

Deployment

config := &types.WorkerConfig{
    Concurrency:       runtime.NumCPU() * 2,
    HeartbeatInterval: 30 * time.Second,
    ShutdownTimeout:   60 * time.Second,
    MaxRetries:        5,
    RetryBackoff:      "exponential",
    InitialDelay:      1 * time.Second,
    MaxDelay:          5 * time.Minute,
    BackoffFactor:     2.0,
    MaxMemoryMB:       1024,
    MaxCPUPercent:     80,
}
Scaling Considerations
  • Horizontal Scaling: Multiple worker pools across instances
  • Vertical Scaling: Increase concurrency per pool
  • Queue Partitioning: Separate queues by priority/type
  • Resource Isolation: Task-type-specific resource pools
  • Circuit Breakers: Prevent cascade failures
Monitoring Alerts
  • Worker health score < 0.5 (unhealthy)
  • Circuit breaker open for > 5 minutes
  • Queue depth > 1000 tasks
  • Memory usage > 85%
  • CPU usage > 90%
  • Task failure rate > 10%

Performance Characteristics

Throughput
  • High Priority Tasks: 1000+ tasks/second
  • Normal Priority: 500+ tasks/second
  • Batch Operations: 50+ tasks/second
Latency
  • Task Pickup: < 100ms from Redis
  • Processing Start: < 50ms after pickup
  • Health Checks: < 10ms per check
  • Event Propagation: < 5ms to observers
Resource Usage
  • Base Memory: ~50MB per worker pool
  • Per Task: Configurable via ResourceRequirements
  • CPU Overhead: ~5% for monitoring and coordination
  • Network: Minimal (Redis heartbeats only)

Troubleshooting

Common Issues
  1. High Memory Usage: Increase cleanup intervals, check for task leaks
  2. Circuit Breakers Stuck Open: Verify downstream service health
  3. Tasks Not Processing: Check queue connectivity and worker registration
  4. Health Scores Dropping: Review error rates and resource usage
Debug Logging

Enable debug logging to trace issues:

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelDebug,
}))
Diagnostic Commands
# Check worker health
curl http://localhost:9090/health

# View metrics
curl http://localhost:9090/metrics

# Check queue stats  
redis-cli -c XLEN taskforge:queue:default

# View circuit breaker states
curl http://localhost:8080/api/v1/circuit-breakers

Future Enhancements

Planned Features
  • Auto-scaling: Dynamic worker pool sizing
  • Multi-region: Cross-region task distribution
  • Priority Preemption: Interrupt low-priority tasks
  • Task Dependencies: Workflow orchestration
  • Adaptive Timeouts: Machine learning-based timeout adjustment
  • Advanced Routing: Smart queue assignment
Extensibility Points
  • Custom observers for monitoring integration
  • Pluggable resource limiters
  • Custom circuit breaker policies
  • Task transformation pipelines
  • Custom retry strategies

This worker engine provides a foundation for distributed task processing with fault tolerance, resource management, and observability features.

Documentation


Types

type AvailableResources

type AvailableResources struct {
	MemoryMB    int `json:"memory_mb"`
	CPUPercent  int `json:"cpu_percent"`
	ActiveTasks int `json:"active_tasks"`
	MaxTasks    int `json:"max_tasks"`
}

AvailableResources represents currently available system resources

type BaseTaskCommand

type BaseTaskCommand struct {
	// contains filtered or unexported fields
}

BaseTaskCommand provides a base implementation for task commands. It includes common functionality like validation, resource management, and error handling.

func NewBaseTaskCommand

func NewBaseTaskCommand(
	taskType types.TaskType,
	capabilities []string,
	requirements ResourceRequirements,
	processor types.TaskProcessor,
	logger types.Logger,
) *BaseTaskCommand

NewBaseTaskCommand creates a new base task command

func (*BaseTaskCommand) Execute

func (b *BaseTaskCommand) Execute(ctx context.Context, task *types.Task) (*types.TaskResult, error)

Execute processes the task using the underlying processor

func (*BaseTaskCommand) GetCapabilities

func (b *BaseTaskCommand) GetCapabilities() []string

GetCapabilities returns additional capabilities this command provides

func (*BaseTaskCommand) GetResourceRequirements

func (b *BaseTaskCommand) GetResourceRequirements() ResourceRequirements

GetResourceRequirements returns resource requirements for execution

func (*BaseTaskCommand) GetStats

func (b *BaseTaskCommand) GetStats() CommandStats

GetStats returns execution statistics for this command

func (*BaseTaskCommand) GetTaskType

func (b *BaseTaskCommand) GetTaskType() types.TaskType

GetTaskType returns the task type this command handles

func (*BaseTaskCommand) SupportsTask

func (b *BaseTaskCommand) SupportsTask(task *types.Task) bool

SupportsTask checks if this command can handle the given task

func (*BaseTaskCommand) Validate

func (b *BaseTaskCommand) Validate(task *types.Task) error

Validate checks if the task can be processed by this command

type BulkheadConfig

type BulkheadConfig struct {
	// Global resource limits
	MaxTotalMemoryMB   int `json:"max_total_memory_mb"`
	MaxTotalCPUPercent int `json:"max_total_cpu_percent"`
	MaxConcurrentTasks int `json:"max_concurrent_tasks"`

	// Per-task-type resource pools
	TaskTypePools map[types.TaskType]ResourcePoolConfig `json:"task_type_pools"`

	// Circuit breaker settings
	DefaultCircuitBreaker  ServiceCircuitConfig            `json:"default_circuit_breaker"`
	ServiceCircuitBreakers map[string]ServiceCircuitConfig `json:"service_circuit_breakers"`

	// Rate limiting settings
	DefaultRateLimit   RateLimitSettings                    `json:"default_rate_limit"`
	TaskTypeRateLimits map[types.TaskType]RateLimitSettings `json:"task_type_rate_limits"`

	// Health monitoring
	HealthCheckInterval time.Duration `json:"health_check_interval"`
	ResourceCleanupAge  time.Duration `json:"resource_cleanup_age"`
}

BulkheadConfig defines configuration for the bulkhead pattern

func DefaultBulkheadConfig

func DefaultBulkheadConfig() BulkheadConfig

DefaultBulkheadConfig returns sensible default bulkhead configuration

type BulkheadManager

type BulkheadManager struct {
	// contains filtered or unexported fields
}

BulkheadManager implements the Bulkhead pattern for failure isolation. It manages resource pools and circuit breakers to prevent cascade failures.

func NewBulkheadManager

func NewBulkheadManager(config BulkheadConfig, logger types.Logger) *BulkheadManager

NewBulkheadManager creates a new bulkhead manager

func (*BulkheadManager) Close

func (b *BulkheadManager) Close() error

Close shuts down the bulkhead manager gracefully

func (*BulkheadManager) GetCircuitBreaker

func (b *BulkheadManager) GetCircuitBreaker(service string) types.CircuitBreaker

GetCircuitBreaker returns a circuit breaker for the specified service

func (*BulkheadManager) GetRateLimiter

func (b *BulkheadManager) GetRateLimiter(taskType types.TaskType) types.RateLimiter

GetRateLimiter returns a rate limiter for the specified task type

func (*BulkheadManager) GetResourceLimiter

func (b *BulkheadManager) GetResourceLimiter(taskType types.TaskType) ResourceLimiter

GetResourceLimiter returns a resource limiter for the specified task type

func (*BulkheadManager) GetStats

func (b *BulkheadManager) GetStats() BulkheadStats

GetStats returns comprehensive bulkhead statistics

func (*BulkheadManager) GetSystemResourceUsage

func (b *BulkheadManager) GetSystemResourceUsage() AvailableResources

GetSystemResourceUsage returns current system resource usage

type BulkheadStats

type BulkheadStats struct {
	ResourcePools   map[string]ResourcePoolStats   `json:"resource_pools"`
	CircuitBreakers map[string]CircuitBreakerStats `json:"circuit_breakers"`
	RateLimiters    map[string]RateLimiterStats    `json:"rate_limiters"`
	SystemUsage     AvailableResources             `json:"system_usage"`
	TotalLimits     AvailableResources             `json:"total_limits"`
}

BulkheadStats contains comprehensive bulkhead statistics

type CircuitBreakerImpl

type CircuitBreakerImpl struct {
	// contains filtered or unexported fields
}

CircuitBreakerImpl implements the CircuitBreaker interface

func NewCircuitBreakerImpl

func NewCircuitBreakerImpl(service string, config ServiceCircuitConfig, logger types.Logger) *CircuitBreakerImpl

NewCircuitBreakerImpl creates a new circuit breaker implementation

func (*CircuitBreakerImpl) Execute

func (cb *CircuitBreakerImpl) Execute(fn func() error) error

Execute runs the function with circuit breaker protection

func (*CircuitBreakerImpl) GetStats

func (cb *CircuitBreakerImpl) GetStats() CircuitBreakerStats

GetStats returns circuit breaker statistics

func (*CircuitBreakerImpl) Reset

func (cb *CircuitBreakerImpl) Reset()

Reset manually resets the circuit breaker

func (*CircuitBreakerImpl) State

State returns the current circuit breaker state

type CircuitBreakerStats

type CircuitBreakerStats struct {
	Service             string                    `json:"service"`
	State               types.CircuitBreakerState `json:"state"`
	TotalRequests       int64                     `json:"total_requests"`
	SuccessfulRequests  int64                     `json:"successful_requests"`
	FailedRequests      int64                     `json:"failed_requests"`
	ConsecutiveFailures int                       `json:"consecutive_failures"`
	FailureRate         float64                   `json:"failure_rate"`
	LastStateChange     time.Time                 `json:"last_state_change"`
}

CircuitBreakerStats contains statistics for a circuit breaker

type CommandStats

type CommandStats struct {
	TaskType       types.TaskType `json:"task_type"`
	ExecutionCount int64          `json:"execution_count"`
	ErrorCount     int64          `json:"error_count"`
	ErrorRate      float64        `json:"error_rate"`
	TotalDuration  time.Duration  `json:"total_duration"`
	AvgDuration    time.Duration  `json:"avg_duration"`
}

CommandStats contains execution statistics for a command

type ContextKey

type ContextKey string

ContextKey is a custom type for context keys to avoid collisions

const (
	WorkerIDKey ContextKey = "worker_id"
)
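
A typed key such as this one is normally used with context.WithValue. The sketch below redeclares ContextKey and WorkerIDKey so that it is self-contained; withWorkerID, workerIDFrom, and newWorkerContext are hypothetical helpers and not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// ContextKey and WorkerIDKey are redeclared here to keep the example standalone.
type ContextKey string

const WorkerIDKey ContextKey = "worker_id"

// withWorkerID returns a child context that carries the worker ID.
func withWorkerID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, WorkerIDKey, id)
}

// workerIDFrom extracts the worker ID, reporting whether one was set.
func workerIDFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(WorkerIDKey).(string)
	return id, ok
}

// newWorkerContext builds a context carrying id on top of context.Background.
func newWorkerContext(id string) context.Context {
	return withWorkerID(context.Background(), id)
}

func main() {
	ctx := newWorkerContext("worker-1")
	id, ok := workerIDFrom(ctx)
	fmt.Println(id, ok)

	// A plain string key with the same text is a different key, because
	// context compares keys by type as well as by value.
	fmt.Println(ctx.Value("worker_id") == nil)
}
```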

type Event

type Event string

Event represents different events in the worker lifecycle

const (
	// Worker lifecycle events
	EventStarted    Event = "worker_started"
	EventStopped    Event = "worker_stopped"
	EventDraining   Event = "worker_draining"
	EventHealthy    Event = "worker_healthy"
	EventUnhealthy  Event = "worker_unhealthy"
	EventRegistered Event = "worker_registered"

	// Task processing events
	TaskEventReceived  Event = "task_received"
	TaskEventStarted   Event = "task_started"
	TaskEventCompleted Event = "task_completed"
	TaskEventFailed    Event = "task_failed"
	TaskEventRetrying  Event = "task_retrying"
	TaskEventTimeout   Event = "task_timeout"

	// Queue monitoring events
	QueueEventBacklog    Event = "queue_backlog"
	QueueEventEmpty      Event = "queue_empty"
	QueueEventHighLoad   Event = "queue_high_load"
	QueueEventConnection Event = "queue_connection"

	// Circuit breaker events
	CircuitBreakerOpened   Event = "circuit_breaker_opened"
	CircuitBreakerClosed   Event = "circuit_breaker_closed"
	CircuitBreakerHalfOpen Event = "circuit_breaker_half_open"
)

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus implements a centralized event bus for worker events. It manages multiple observers and provides event routing.

func NewEventBus

func NewEventBus(logger types.Logger, bufferSize int) *EventBus

NewEventBus creates a new event bus for worker monitoring

func (*EventBus) Close

func (bus *EventBus) Close() error

Close shuts down the event bus gracefully

func (*EventBus) GetActiveObserverCount

func (bus *EventBus) GetActiveObserverCount() int

GetActiveObserverCount returns the number of active observers

func (*EventBus) GetObserverCount

func (bus *EventBus) GetObserverCount() int

GetObserverCount returns the number of registered observers

func (*EventBus) NotifyObservers

func (bus *EventBus) NotifyObservers(_ context.Context, event Event, data *EventData)

NotifyObservers sends an event to all registered observers (async)

func (*EventBus) RegisterObserver

func (bus *EventBus) RegisterObserver(observer Observer)

RegisterObserver adds an observer to receive all events

func (*EventBus) RegisterObserverWithFilter

func (bus *EventBus) RegisterObserverWithFilter(observer Observer, events []Event)

RegisterObserverWithFilter adds an observer that only receives specific events

func (*EventBus) UnregisterObserver

func (bus *EventBus) UnregisterObserver(observerID string)

UnregisterObserver removes an observer

type EventData

type EventData struct {
	Event     Event                  `json:"event"`
	WorkerID  string                 `json:"worker_id"`
	Timestamp time.Time              `json:"timestamp"`
	TaskID    string                 `json:"task_id,omitempty"`
	TaskType  types.TaskType         `json:"task_type,omitempty"`
	Queue     string                 `json:"queue,omitempty"`
	Error     error                  `json:"error,omitempty"`
	Duration  time.Duration          `json:"duration,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`

	// Health and resource metrics
	MemoryUsageMB   float64 `json:"memory_usage_mb,omitempty"`
	CPUUsagePercent float64 `json:"cpu_usage_percent,omitempty"`
	ActiveTasks     int     `json:"active_tasks,omitempty"`
	CompletedTasks  int64   `json:"completed_tasks,omitempty"`
	FailedTasks     int64   `json:"failed_tasks,omitempty"`

	// Queue metrics
	QueueDepth    int64 `json:"queue_depth,omitempty"`
	ActiveWorkers int   `json:"active_workers,omitempty"`
}

EventData contains detailed information about a worker event

type HealthMonitorObserver

type HealthMonitorObserver struct {
	// contains filtered or unexported fields
}

HealthMonitorObserver monitors worker health and triggers alerts/actions

func NewHealthMonitorObserver

func NewHealthMonitorObserver(id string, logger types.Logger, thresholds HealthThresholds) *HealthMonitorObserver

NewHealthMonitorObserver creates a new health monitoring observer

func (*HealthMonitorObserver) Cleanup

func (h *HealthMonitorObserver) Cleanup(maxAge time.Duration) int

Cleanup removes health state for workers that haven't been seen recently

func (*HealthMonitorObserver) GetAllWorkerHealth

func (h *HealthMonitorObserver) GetAllWorkerHealth() map[string]*HealthState

GetAllWorkerHealth returns health states for all tracked workers

func (*HealthMonitorObserver) GetObserverID

func (h *HealthMonitorObserver) GetObserverID() string

GetObserverID returns the unique identifier for this observer

func (*HealthMonitorObserver) GetWorkerHealth

func (h *HealthMonitorObserver) GetWorkerHealth(workerID string) *HealthState

GetWorkerHealth returns the health state for a specific worker

func (*HealthMonitorObserver) IsActive

func (h *HealthMonitorObserver) IsActive() bool

IsActive returns whether this observer is currently active

func (*HealthMonitorObserver) OnWorkerEvent

func (h *HealthMonitorObserver) OnWorkerEvent(_ context.Context, data *EventData)

OnWorkerEvent processes worker events to monitor health

func (*HealthMonitorObserver) SetActive

func (h *HealthMonitorObserver) SetActive(active bool)

SetActive enables or disables this observer

func (*HealthMonitorObserver) SetCallbacks

func (h *HealthMonitorObserver) SetCallbacks(
	onUnhealthy func(string, *HealthState),
	onRecovered func(string, *HealthState),
	onHighFailure func(string, float64),
)

SetCallbacks sets callback functions for health events

type HealthState

type HealthState struct {
	WorkerID            string
	LastSeen            time.Time
	Status              types.WorkerStatus
	ConsecutiveFailures int
	TotalTasks          int64
	FailedTasks         int64
	SuccessTasks        int64
	AvgTaskDuration     time.Duration
	MemoryUsageMB       float64
	CPUUsagePercent     float64
	ActiveTasks         int

	// Health indicators
	IsHealthy       bool
	LastHealthCheck time.Time
	HealthScore     float64 // 0.0 = unhealthy, 1.0 = perfect health
}

HealthState tracks the health state of a worker

type HealthThresholds

type HealthThresholds struct {
	MaxConsecutiveFailures int           // Max consecutive failures before unhealthy
	MaxFailureRate         float64       // Max failure rate (0.0-1.0) before unhealthy
	HeartbeatTimeout       time.Duration // Max time without heartbeat
	MaxMemoryMB            float64       // Max memory usage before warning
	MaxCPUPercent          float64       // Max CPU usage before warning
	MaxTaskDuration        time.Duration // Max avg task duration before warning
}

HealthThresholds defines thresholds for health monitoring

func DefaultHealthThresholds

func DefaultHealthThresholds() HealthThresholds

DefaultHealthThresholds returns sensible default health thresholds

type Instance

type Instance struct {
	// contains filtered or unexported fields
}

Instance represents a single worker in the pool

type IsolatedTaskCommand

type IsolatedTaskCommand struct {
	TaskCommand
	// contains filtered or unexported fields
}

IsolatedTaskCommand wraps a task command with additional isolation features. It implements the Decorator pattern to add isolation capabilities.

func NewIsolatedTaskCommand

func NewIsolatedTaskCommand(
	baseCommand TaskCommand,
	resourceLimiter ResourceLimiter,
	circuitBreaker types.CircuitBreaker,
	rateLimiter types.RateLimiter,
	logger types.Logger,
) *IsolatedTaskCommand

NewIsolatedTaskCommand creates a new isolated task command

func (*IsolatedTaskCommand) Execute

func (i *IsolatedTaskCommand) Execute(ctx context.Context, task *types.Task) (*types.TaskResult, error)

Execute processes the task with isolation features

func (*IsolatedTaskCommand) GetIsolationStats

func (i *IsolatedTaskCommand) GetIsolationStats() IsolationStats

GetIsolationStats returns isolation-related statistics

type IsolationStats

type IsolationStats struct {
	TaskType              types.TaskType `json:"task_type"`
	IsolationViolations   int64          `json:"isolation_violations"`
	RateLimitHits         int64          `json:"rate_limit_hits"`
	CircuitBreakerHits    int64          `json:"circuit_breaker_hits"`
	ResourceLimitsEnabled bool           `json:"resource_limits_enabled"`
	CircuitBreakerEnabled bool           `json:"circuit_breaker_enabled"`
	RateLimitEnabled      bool           `json:"rate_limit_enabled"`
}

IsolationStats contains isolation-related statistics

type MetricsObserver

type MetricsObserver struct {
	// contains filtered or unexported fields
}

MetricsObserver implements Observer to collect metrics from worker events. It integrates with the MetricsCollector interface from types.

func NewMetricsObserver

func NewMetricsObserver(id string, collector types.MetricsCollector, logger types.Logger) *MetricsObserver

NewMetricsObserver creates a new metrics observer

func (*MetricsObserver) GetObserverID

func (m *MetricsObserver) GetObserverID() string

GetObserverID returns the unique identifier for this observer

func (*MetricsObserver) GetStats

func (m *MetricsObserver) GetStats() map[string]interface{}

GetStats returns statistics about this metrics observer

func (*MetricsObserver) IsActive

func (m *MetricsObserver) IsActive() bool

IsActive returns whether this observer is currently active

func (*MetricsObserver) OnWorkerEvent

func (m *MetricsObserver) OnWorkerEvent(_ context.Context, data *EventData)

OnWorkerEvent processes worker events and collects relevant metrics

func (*MetricsObserver) SetActive

func (m *MetricsObserver) SetActive(active bool)

SetActive enables or disables this observer

type Observer

type Observer interface {
	// OnWorkerEvent is called when a worker event occurs
	OnWorkerEvent(ctx context.Context, data *EventData)

	// GetObserverID returns a unique identifier for this observer
	GetObserverID() string

	// IsActive returns whether this observer is currently active
	IsActive() bool
}

Observer defines the interface for observing worker events. This enables the Observer pattern for monitoring and reacting to worker state changes.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool implements the main worker pool with lifecycle management. It orchestrates workers, observes their behavior, and manages graceful shutdown.

func NewWorkerPool

func NewWorkerPool(
	id string,
	config *types.WorkerConfig,
	queueBackend types.QueueBackend,
	metricsCollector types.MetricsCollector,
	logger types.Logger,
) (*Pool, error)

NewWorkerPool creates a new worker pool

func (*Pool) GetInfo

func (wp *Pool) GetInfo() *PoolInfo

GetInfo returns information about the worker pool

func (*Pool) RegisterTaskProcessor

func (wp *Pool) RegisterTaskProcessor(taskType types.TaskType, processor types.TaskProcessor) error

RegisterTaskProcessor registers a task processor with the worker pool

func (*Pool) Start

func (wp *Pool) Start(ctx context.Context) error

Start starts the worker pool

func (*Pool) Stop

func (wp *Pool) Stop(ctx context.Context) error

Stop gracefully stops the worker pool
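How `Stop` drains in-flight work is not described here. One common Go approach, sketched below under that assumption, is to wait on a `sync.WaitGroup` while honoring the caller's context deadline. The `drainer` type and `demoStop` function are hypothetical and are not part of this package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// drainer is a hypothetical stand-in for a pool that tracks in-flight tasks.
type drainer struct {
	wg sync.WaitGroup
}

// Go runs task in a goroutine and tracks it until it returns.
func (d *drainer) Go(task func()) {
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		task()
	}()
}

// Stop waits for in-flight tasks and gives up when ctx is done.
func (d *drainer) Stop(ctx context.Context) error {
	done := make(chan struct{})
	go func() {
		d.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoStop stops one pool whose task finishes in time and one whose task
// is still blocked when the deadline passes.
func demoStop() (quick, stuck error) {
	fast := &drainer{}
	fast.Go(func() { time.Sleep(10 * time.Millisecond) })
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	quick = fast.Stop(ctx)

	release := make(chan struct{})
	slow := &drainer{}
	slow.Go(func() { <-release })
	ctx2, cancel2 := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel2()
	stuck = slow.Stop(ctx2)
	close(release) // let the blocked task exit
	return quick, stuck
}

func main() {
	quick, stuck := demoStop()
	fmt.Println("quick:", quick)
	fmt.Println("stuck:", stuck)
}
```

With this shape, the caller decides how long a graceful shutdown may take by choosing the context deadline passed to `Stop`.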

type PoolInfo

type PoolInfo struct {
	ID             string             `json:"id"`
	Hostname       string             `json:"hostname"`
	State          PoolState          `json:"state"`
	WorkerCount    int                `json:"worker_count"`
	Concurrency    int                `json:"concurrency"`
	Queues         []string           `json:"queues"`
	SupportedTypes []types.TaskType   `json:"supported_types"`
	Capabilities   []string           `json:"capabilities"`
	StartTime      time.Time          `json:"start_time"`
	LastHeartbeat  time.Time          `json:"last_heartbeat"`
	ResourceUsage  AvailableResources `json:"resource_usage"`
}

PoolInfo contains information about the worker pool

type PoolState

type PoolState string

PoolState represents the current state of the worker pool

const (
	PoolStateIdle     PoolState = "idle"     // Not started
	PoolStateStarting PoolState = "starting" // Starting up
	PoolStateRunning  PoolState = "running"  // Processing tasks
	PoolStateDraining PoolState = "draining" // Gracefully shutting down
	PoolStateStopped  PoolState = "stopped"  // Fully stopped
)
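The comments on these constants suggest a linear lifecycle from idle to stopped. The sketch below encodes that reading as a transition table. The table itself, including the edge from `starting` to `stopped` for a failed startup, is an assumption and not something this package documents. `CanTransition` is a hypothetical helper.

```go
package main

import "fmt"

type PoolState string

const (
	PoolStateIdle     PoolState = "idle"
	PoolStateStarting PoolState = "starting"
	PoolStateRunning  PoolState = "running"
	PoolStateDraining PoolState = "draining"
	PoolStateStopped  PoolState = "stopped"
)

// transitions is an assumed lifecycle graph, not taken from the package.
var transitions = map[PoolState][]PoolState{
	PoolStateIdle:     {PoolStateStarting},
	PoolStateStarting: {PoolStateRunning, PoolStateStopped}, // startup may fail
	PoolStateRunning:  {PoolStateDraining},
	PoolStateDraining: {PoolStateStopped},
}

// CanTransition reports whether moving from one state to another is
// allowed by the assumed graph above.
func CanTransition(from, to PoolState) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(PoolStateRunning, PoolStateDraining))
	fmt.Println(CanTransition(PoolStateStopped, PoolStateRunning))
}
```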

type RateLimitSettings

type RateLimitSettings struct {
	RequestsPerSecond int           `json:"requests_per_second"`
	BurstSize         int           `json:"burst_size"`
	WindowSize        time.Duration `json:"window_size"`
}

RateLimitSettings defines rate limiting configuration

type RateLimiterImpl

type RateLimiterImpl struct {
	// contains filtered or unexported fields
}

RateLimiterImpl implements a token bucket rate limiter
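For readers unfamiliar with the technique, here is a minimal token bucket sketch. It is not the code behind `RateLimiterImpl`: the `TokenBucket` type is hypothetical, the mapping of `RequestsPerSecond` to refill rate and `BurstSize` to capacity is an assumption, and the clock is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// TokenBucket is a hypothetical, minimal token bucket.
type TokenBucket struct {
	mu       sync.Mutex
	rate     float64 // tokens added per second
	capacity float64 // maximum tokens the bucket can hold
	tokens   float64
	last     time.Time
}

// NewTokenBucket starts with a full bucket at time now.
func NewTokenBucket(requestsPerSecond, burstSize int, now time.Time) *TokenBucket {
	return &TokenBucket{
		rate:     float64(requestsPerSecond),
		capacity: float64(burstSize),
		tokens:   float64(burstSize),
		last:     now,
	}
}

// AllowN refills the bucket for the time elapsed since the last call,
// then takes n tokens if that many are available.
func (b *TokenBucket) AllowN(now time.Time, n int) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.tokens += elapsed * b.rate
		if b.tokens > b.capacity {
			b.tokens = b.capacity
		}
		b.last = now
	}
	if n <= 0 || float64(n) > b.tokens {
		return false
	}
	b.tokens -= float64(n)
	return true
}

// demoBucket makes four requests at once against a bucket with rate 2/s
// and burst 3, then asks for two more tokens one second later.
func demoBucket() []bool {
	start := time.Unix(0, 0)
	b := NewTokenBucket(2, 3, start)
	var results []bool
	for i := 0; i < 4; i++ {
		results = append(results, b.AllowN(start, 1))
	}
	results = append(results, b.AllowN(start.Add(time.Second), 2))
	return results
}

func main() {
	fmt.Println(demoBucket())
}
```

The first three requests drain the burst, the fourth is rejected, and one second of refill at 2 tokens per second is enough to admit a request for two tokens.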

func NewRateLimiterImpl

func NewRateLimiterImpl(key string, config RateLimitSettings, logger types.Logger) *RateLimiterImpl

NewRateLimiterImpl creates a new rate limiter implementation

func (*RateLimiterImpl) Allow

func (rl *RateLimiterImpl) Allow(ctx context.Context, key string) (bool, error)

Allow checks if an operation is allowed under the rate limit

func (*RateLimiterImpl) AllowN

func (rl *RateLimiterImpl) AllowN(_ context.Context, _ string, n int) (bool, error)

AllowN checks if N operations are allowed

func (*RateLimiterImpl) GetStats

func (rl *RateLimiterImpl) GetStats() RateLimiterStats

GetStats returns rate limiter statistics

func (*RateLimiterImpl) Reset

func (rl *RateLimiterImpl) Reset(_ context.Context, key string) error

Reset resets the rate limiter for a specific key

type RateLimiterStats

type RateLimiterStats struct {
	Key              string        `json:"key"`
	TotalRequests    int64         `json:"total_requests"`
	AllowedRequests  int64         `json:"allowed_requests"`
	RejectedRequests int64         `json:"rejected_requests"`
	CurrentRate      float64       `json:"current_rate"`
	BurstCapacity    int           `json:"burst_capacity"`
	WindowSize       time.Duration `json:"window_size"`
}

RateLimiterStats contains statistics for a rate limiter

type ResourceLimiter

type ResourceLimiter interface {
	// AcquireResources attempts to acquire resources for task execution
	AcquireResources(ctx context.Context, requirements ResourceRequirements) (ResourceToken, error)

	// GetAvailableResources returns currently available resources
	GetAvailableResources() AvailableResources
}

ResourceLimiter defines interface for resource limiting

type ResourcePool

type ResourcePool struct {
	ID string
	// contains filtered or unexported fields
}

ResourcePool implements resource management for the Bulkhead pattern. It provides isolated resource allocation with configurable limits.

func NewResourcePool

func NewResourcePool(id string, config ResourcePoolConfig, logger types.Logger) *ResourcePool

NewResourcePool creates a new resource pool

func (*ResourcePool) AcquireResources

func (p *ResourcePool) AcquireResources(_ context.Context, requirements ResourceRequirements) (ResourceToken, error)

AcquireResources attempts to acquire resources for task execution

func (*ResourcePool) CleanupStaleTokens

func (p *ResourcePool) CleanupStaleTokens(cutoff time.Time) int

CleanupStaleTokens removes tokens that are older than the specified cutoff
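A cutoff-based sweep like this can be sketched as a single pass over a map of acquisition times. The `tokenTable` type below is hypothetical. How `ResourcePool` actually stores its tokens, and whether it also returns the reclaimed resources to the pool, is not shown in this section.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tokenTable is a hypothetical registry of outstanding tokens keyed by ID.
type tokenTable struct {
	mu       sync.Mutex
	acquired map[string]time.Time // token ID -> acquisition time
}

// CleanupStale deletes every token acquired before cutoff and returns
// how many were removed.
func (tt *tokenTable) CleanupStale(cutoff time.Time) int {
	tt.mu.Lock()
	defer tt.mu.Unlock()
	removed := 0
	for id, at := range tt.acquired {
		if at.Before(cutoff) {
			delete(tt.acquired, id) // deleting during range is safe in Go
			removed++
		}
	}
	return removed
}

// demoCleanup sweeps tokens older than five minutes.
func demoCleanup() (removed, remaining int) {
	now := time.Unix(1000, 0)
	tbl := &tokenTable{acquired: map[string]time.Time{
		"a": now.Add(-10 * time.Minute),
		"b": now.Add(-6 * time.Minute),
		"c": now.Add(-1 * time.Minute),
	}}
	removed = tbl.CleanupStale(now.Add(-5 * time.Minute))
	return removed, len(tbl.acquired)
}

func main() {
	removed, remaining := demoCleanup()
	fmt.Println("removed:", removed, "remaining:", remaining)
}
```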

func (*ResourcePool) Close

func (p *ResourcePool) Close()

Close gracefully shuts down the resource pool

func (*ResourcePool) GetAvailableResources

func (p *ResourcePool) GetAvailableResources() AvailableResources

GetAvailableResources returns currently available resources

func (*ResourcePool) GetCurrentUsage

func (p *ResourcePool) GetCurrentUsage() AvailableResources

GetCurrentUsage returns current resource usage

func (*ResourcePool) GetStats

func (p *ResourcePool) GetStats() ResourcePoolStats

GetStats returns resource pool statistics

type ResourcePoolConfig

type ResourcePoolConfig struct {
	MaxMemoryMB        int           `json:"max_memory_mb"`
	MaxCPUPercent      int           `json:"max_cpu_percent"`
	MaxConcurrentTasks int           `json:"max_concurrent_tasks"`
	TaskTimeout        time.Duration `json:"task_timeout"`
	Priority           int           `json:"priority"` // Higher priority gets more resources during contention
}

ResourcePoolConfig defines configuration for a resource pool

type ResourcePoolStats

type ResourcePoolStats struct {
	PoolID           string             `json:"pool_id"`
	ActiveTokens     int                `json:"active_tokens"`
	TotalRequests    int64              `json:"total_requests"`
	RejectedRequests int64              `json:"rejected_requests"`
	CurrentUsage     AvailableResources `json:"current_usage"`
	MaxUsage         ResourcePoolConfig `json:"max_usage"`
	AverageWaitTime  time.Duration      `json:"average_wait_time"`
}

ResourcePoolStats contains statistics for a resource pool

type ResourceRequirements

type ResourceRequirements struct {
	MaxMemoryMB     int           `json:"max_memory_mb"`
	MaxCPUPercent   int           `json:"max_cpu_percent"`
	MaxDuration     time.Duration `json:"max_duration"`
	RequiresGPU     bool          `json:"requires_gpu"`
	RequiresNetwork bool          `json:"requires_network"`
	Priority        int           `json:"priority"` // Higher priority gets more resources
}

ResourceRequirements defines resource requirements for task execution

type ResourceToken

type ResourceToken interface {
	// Release returns the acquired resources
	Release()

	// GetAcquiredResources returns the resources acquired by this token
	GetAcquiredResources() ResourceRequirements
}

ResourceToken represents acquired resources that must be released
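Because a token must be released, the usual Go idiom is to `defer` the release right after a successful acquire. The sketch below illustrates that with a hypothetical memory-only pool. `memoryPool`, `memoryToken`, `runTask`, and `ErrInsufficient` are illustration names, and `ResourceRequirements` is trimmed to one field. Making `Release` idempotent with `sync.Once` is a design choice of this sketch, not a documented guarantee of the package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ResourceRequirements is trimmed to one field for this sketch.
type ResourceRequirements struct {
	MaxMemoryMB int
}

// ResourceToken repeats the interface documented above.
type ResourceToken interface {
	Release()
	GetAcquiredResources() ResourceRequirements
}

// ErrInsufficient is a hypothetical error for a rejected request.
var ErrInsufficient = errors.New("insufficient resources")

// memoryPool is a hypothetical pool that tracks free memory only.
type memoryPool struct {
	mu     sync.Mutex
	freeMB int
}

type memoryToken struct {
	pool *memoryPool
	req  ResourceRequirements
	once sync.Once
}

// Acquire reserves memory or fails immediately if too little is free.
func (p *memoryPool) Acquire(req ResourceRequirements) (ResourceToken, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if req.MaxMemoryMB > p.freeMB {
		return nil, ErrInsufficient
	}
	p.freeMB -= req.MaxMemoryMB
	return &memoryToken{pool: p, req: req}, nil
}

// Free reports the memory currently available.
func (p *memoryPool) Free() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.freeMB
}

// Release returns the memory once, even if called repeatedly.
func (t *memoryToken) Release() {
	t.once.Do(func() {
		t.pool.mu.Lock()
		defer t.pool.mu.Unlock()
		t.pool.freeMB += t.req.MaxMemoryMB
	})
}

func (t *memoryToken) GetAcquiredResources() ResourceRequirements { return t.req }

// runTask acquires mb megabytes, reports the free memory while the token
// is held, and releases the token on return.
func runTask(p *memoryPool, mb int) (freeDuring int, err error) {
	tok, err := p.Acquire(ResourceRequirements{MaxMemoryMB: mb})
	if err != nil {
		return p.Free(), err
	}
	defer tok.Release()
	return p.Free(), nil
}

func main() {
	pool := &memoryPool{freeMB: 512}
	during, err := runTask(pool, 200)
	fmt.Println(during, err, pool.Free())
}
```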

type ResourceTokenImpl

type ResourceTokenImpl struct {
	ID string
	// contains filtered or unexported fields
}

ResourceTokenImpl implements ResourceToken interface

func (*ResourceTokenImpl) GetAcquiredResources

func (t *ResourceTokenImpl) GetAcquiredResources() ResourceRequirements

GetAcquiredResources returns the resources acquired by this token

func (*ResourceTokenImpl) Release

func (t *ResourceTokenImpl) Release()

Release returns the acquired resources to the pool

type ServiceCircuitConfig

type ServiceCircuitConfig struct {
	Threshold        int           `json:"threshold"`         // failures before opening
	Timeout          time.Duration `json:"timeout"`           // how long to stay open
	MaxRequests      int           `json:"max_requests"`      // max requests in half-open
	ResetTimeout     time.Duration `json:"reset_timeout"`     // time to reset counters
	FailureThreshold float64       `json:"failure_threshold"` // failure rate threshold (0.0-1.0)
	MinRequestCount  int           `json:"min_request_count"` // minimum requests before checking failure rate
}

ServiceCircuitConfig defines circuit breaker settings for a service
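The field comments describe two trip conditions: an absolute failure count (`Threshold`) and a failure rate (`FailureThreshold`) that only applies once `MinRequestCount` requests have been seen. How the package combines them is not documented here. The sketch below assumes either condition opens the circuit. `ShouldOpen` is a hypothetical helper and the struct repeats only the three fields it uses.

```go
package main

import "fmt"

// ServiceCircuitConfig repeats three of the documented fields.
type ServiceCircuitConfig struct {
	Threshold        int     // failures before opening
	FailureThreshold float64 // failure rate threshold (0.0-1.0)
	MinRequestCount  int     // minimum requests before checking failure rate
}

// ShouldOpen reports whether the circuit should open, assuming either
// the absolute count or the failure rate is enough to trip it.
func ShouldOpen(cfg ServiceCircuitConfig, failures, total int) bool {
	if cfg.Threshold > 0 && failures >= cfg.Threshold {
		return true
	}
	// Skip the rate check until enough requests have been observed.
	if total == 0 || total < cfg.MinRequestCount || cfg.FailureThreshold <= 0 {
		return false
	}
	return float64(failures)/float64(total) >= cfg.FailureThreshold
}

func main() {
	cfg := ServiceCircuitConfig{Threshold: 20, FailureThreshold: 0.5, MinRequestCount: 10}
	fmt.Println(ShouldOpen(cfg, 6, 10))   // rate 0.6 with enough requests
	fmt.Println(ShouldOpen(cfg, 6, 8))    // too few requests for the rate check
	fmt.Println(ShouldOpen(cfg, 20, 100)) // absolute count reached
}
```

The `MinRequestCount` guard keeps a single early failure (a 100% failure rate over one request) from opening the circuit.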

type Subject

type Subject interface {
	// RegisterObserver adds an observer to receive events
	RegisterObserver(observer Observer)

	// UnregisterObserver removes an observer
	UnregisterObserver(observerID string)

	// NotifyObservers sends an event to all registered observers
	NotifyObservers(ctx context.Context, event Event, data *EventData)
}

Subject defines the interface for objects that can be observed. Workers implement this interface to support the Observer pattern.
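A minimal Subject can be sketched as a map of observers guarded by a read-write mutex. The `Bus` and `recorder` types below are hypothetical, `Event` and `EventData` are stand-ins for the package's own types, and skipping inactive observers is an assumption about how `IsActive` is meant to be used.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type Event string // stand-in for the package's Event type

type EventData struct{ WorkerID string } // stand-in with an assumed field

type Observer interface {
	OnWorkerEvent(ctx context.Context, data *EventData)
	GetObserverID() string
	IsActive() bool
}

// Bus is a hypothetical Subject implementation.
type Bus struct {
	mu        sync.RWMutex
	observers map[string]Observer
}

func NewBus() *Bus { return &Bus{observers: make(map[string]Observer)} }

func (b *Bus) RegisterObserver(o Observer) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.observers[o.GetObserverID()] = o
}

func (b *Bus) UnregisterObserver(observerID string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.observers, observerID)
}

// NotifyObservers copies the observer set under the read lock, then
// calls each active observer without holding the lock. The event value
// is unused here because the stand-in EventData has no field for it.
func (b *Bus) NotifyObservers(ctx context.Context, _ Event, data *EventData) {
	b.mu.RLock()
	snapshot := make([]Observer, 0, len(b.observers))
	for _, o := range b.observers {
		snapshot = append(snapshot, o)
	}
	b.mu.RUnlock()
	for _, o := range snapshot {
		if o.IsActive() {
			o.OnWorkerEvent(ctx, data)
		}
	}
}

// recorder counts the events it receives.
type recorder struct {
	id     string
	active bool
	seen   int
}

func (r *recorder) OnWorkerEvent(context.Context, *EventData) { r.seen++ }
func (r *recorder) GetObserverID() string                     { return r.id }
func (r *recorder) IsActive() bool                            { return r.active }

// demoBus sends one event to an active, an inactive, and an
// unregistered observer.
func demoBus() (activeSeen, inactiveSeen, removedSeen int) {
	bus := NewBus()
	active := &recorder{id: "active", active: true}
	inactive := &recorder{id: "inactive", active: false}
	removed := &recorder{id: "removed", active: true}
	bus.RegisterObserver(active)
	bus.RegisterObserver(inactive)
	bus.RegisterObserver(removed)
	bus.UnregisterObserver("removed")
	bus.NotifyObservers(context.Background(), Event("task_completed"), &EventData{WorkerID: "w1"})
	return active.seen, inactive.seen, removed.seen
}

func main() {
	fmt.Println(demoBus())
}
```

Releasing the lock before calling observers means a slow or re-entrant observer cannot block registration.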

type TaskCommand

type TaskCommand interface {
	// Execute processes the task and returns a result
	Execute(ctx context.Context, task *types.Task) (*types.TaskResult, error)

	// GetTaskType returns the task type this command handles
	GetTaskType() types.TaskType

	// GetCapabilities returns additional capabilities this command provides
	GetCapabilities() []string

	// Validate checks if the task can be processed by this command
	Validate(task *types.Task) error

	// GetResourceRequirements returns resource requirements for execution
	GetResourceRequirements() ResourceRequirements

	// SupportsTask checks if this command can handle the given task
	SupportsTask(task *types.Task) bool
}

TaskCommand represents a command in the Command pattern for task execution. This provides a consistent interface for executing different types of tasks with proper isolation, timeout handling, and resource management.

type TaskCommandRegistry

type TaskCommandRegistry struct {
	// contains filtered or unexported fields
}

TaskCommandRegistry manages task commands and provides command resolution. It implements the Registry pattern for pluggable task processors.
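The Registry pattern mentioned here can be sketched as a map from task type to command. The types below are hypothetical: `Command` is trimmed to the one `TaskCommand` method the lookup needs, and rejecting duplicate registrations is an assumption about behavior this section does not specify.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type TaskType string // stand-in for types.TaskType

// Command is a trimmed, hypothetical version of TaskCommand.
type Command interface {
	GetTaskType() TaskType
}

var (
	ErrDuplicate = errors.New("command already registered")
	ErrNotFound  = errors.New("no command for task type")
)

// Registry maps each task type to the command that handles it.
type Registry struct {
	mu       sync.RWMutex
	commands map[TaskType]Command
}

func NewRegistry() *Registry {
	return &Registry{commands: make(map[TaskType]Command)}
}

// RegisterCommand stores cmd under its own task type.
func (r *Registry) RegisterCommand(cmd Command) error {
	if cmd == nil {
		return errors.New("nil command")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	taskType := cmd.GetTaskType()
	if _, exists := r.commands[taskType]; exists {
		return fmt.Errorf("%w: %s", ErrDuplicate, taskType)
	}
	r.commands[taskType] = cmd
	return nil
}

// GetCommand looks up the command for a task type.
func (r *Registry) GetCommand(taskType TaskType) (Command, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	cmd, ok := r.commands[taskType]
	if !ok {
		return nil, fmt.Errorf("%w: %s", ErrNotFound, taskType)
	}
	return cmd, nil
}

// staticCommand is a trivial command that only knows its task type.
type staticCommand TaskType

func (c staticCommand) GetTaskType() TaskType { return TaskType(c) }

func main() {
	reg := NewRegistry()
	fmt.Println(reg.RegisterCommand(staticCommand("email")))
	fmt.Println(reg.RegisterCommand(staticCommand("email")))
	_, err := reg.GetCommand("sms")
	fmt.Println(err)
}
```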

func NewTaskCommandRegistry

func NewTaskCommandRegistry(logger types.Logger) *TaskCommandRegistry

NewTaskCommandRegistry creates a new command registry

func (*TaskCommandRegistry) GetAllCapabilities

func (r *TaskCommandRegistry) GetAllCapabilities() []string

GetAllCapabilities returns all available capabilities

func (*TaskCommandRegistry) GetCommand

func (r *TaskCommandRegistry) GetCommand(taskType types.TaskType) (TaskCommand, error)

GetCommand retrieves a command for the specified task type

func (*TaskCommandRegistry) GetCommandForTask

func (r *TaskCommandRegistry) GetCommandForTask(task *types.Task) (TaskCommand, error)

GetCommandForTask finds the best command to handle a specific task

func (*TaskCommandRegistry) GetCommandsByCapability

func (r *TaskCommandRegistry) GetCommandsByCapability(capability string) []TaskCommand

GetCommandsByCapability returns all commands that support a specific capability

func (*TaskCommandRegistry) GetSupportedTypes

func (r *TaskCommandRegistry) GetSupportedTypes() []types.TaskType

GetSupportedTypes returns all supported task types

func (*TaskCommandRegistry) RegisterCommand

func (r *TaskCommandRegistry) RegisterCommand(cmd TaskCommand) error

RegisterCommand registers a new task command

func (*TaskCommandRegistry) UnregisterCommand

func (r *TaskCommandRegistry) UnregisterCommand(taskType types.TaskType)

UnregisterCommand removes a task command

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker implements the types.Worker interface with enhanced functionality. It processes tasks using the command pattern and reports to observers.

func NewWorker

func NewWorker(
	id string,
	config *types.WorkerConfig,
	queueBackend types.QueueBackend,
	commandRegistry *TaskCommandRegistry,
	eventBus *EventBus,
	logger types.Logger,
) *Worker

NewWorker creates a new worker instance

func (*Worker) GetInfo

func (w *Worker) GetInfo() *types.WorkerInfo

GetInfo returns current worker information

func (*Worker) Heartbeat

func (w *Worker) Heartbeat(ctx context.Context) error

Heartbeat updates the worker's status and metadata

func (*Worker) RegisterProcessor

func (w *Worker) RegisterProcessor(taskType types.TaskType, processor types.TaskProcessor) error

RegisterProcessor adds a task processor for specific task types

func (*Worker) Start

func (w *Worker) Start(ctx context.Context, queues []string) error

Start begins processing tasks from specified queues

func (*Worker) Stop

func (w *Worker) Stop(ctx context.Context) error

Stop gracefully stops the worker, finishing current tasks
