Documentation
¶
Index ¶
- type AvailableResources
- type BaseTaskCommand
- func (b *BaseTaskCommand) Execute(ctx context.Context, task *types.Task) (*types.TaskResult, error)
- func (b *BaseTaskCommand) GetCapabilities() []string
- func (b *BaseTaskCommand) GetResourceRequirements() ResourceRequirements
- func (b *BaseTaskCommand) GetStats() CommandStats
- func (b *BaseTaskCommand) GetTaskType() types.TaskType
- func (b *BaseTaskCommand) SupportsTask(task *types.Task) bool
- func (b *BaseTaskCommand) Validate(task *types.Task) error
- type BulkheadConfig
- type BulkheadManager
- func (b *BulkheadManager) Close() error
- func (b *BulkheadManager) GetCircuitBreaker(service string) types.CircuitBreaker
- func (b *BulkheadManager) GetRateLimiter(taskType types.TaskType) types.RateLimiter
- func (b *BulkheadManager) GetResourceLimiter(taskType types.TaskType) ResourceLimiter
- func (b *BulkheadManager) GetStats() BulkheadStats
- func (b *BulkheadManager) GetSystemResourceUsage() AvailableResources
- type BulkheadStats
- type CircuitBreakerImpl
- type CircuitBreakerStats
- type CommandStats
- type ContextKey
- type Event
- type EventBus
- func (bus *EventBus) Close() error
- func (bus *EventBus) GetActiveObserverCount() int
- func (bus *EventBus) GetObserverCount() int
- func (bus *EventBus) NotifyObservers(_ context.Context, event Event, data *EventData)
- func (bus *EventBus) RegisterObserver(observer Observer)
- func (bus *EventBus) RegisterObserverWithFilter(observer Observer, events []Event)
- func (bus *EventBus) UnregisterObserver(observerID string)
- type EventData
- type HealthMonitorObserver
- func (h *HealthMonitorObserver) Cleanup(maxAge time.Duration) int
- func (h *HealthMonitorObserver) GetAllWorkerHealth() map[string]*HealthState
- func (h *HealthMonitorObserver) GetObserverID() string
- func (h *HealthMonitorObserver) GetWorkerHealth(workerID string) *HealthState
- func (h *HealthMonitorObserver) IsActive() bool
- func (h *HealthMonitorObserver) OnWorkerEvent(_ context.Context, data *EventData)
- func (h *HealthMonitorObserver) SetActive(active bool)
- func (h *HealthMonitorObserver) SetCallbacks(onUnhealthy func(string, *HealthState), onRecovered func(string, *HealthState), ...)
- type HealthState
- type HealthThresholds
- type Instance
- type IsolatedTaskCommand
- type IsolationStats
- type MetricsObserver
- type Observer
- type Pool
- type PoolInfo
- type PoolState
- type RateLimitSettings
- type RateLimiterImpl
- type RateLimiterStats
- type ResourceLimiter
- type ResourcePool
- func (p *ResourcePool) AcquireResources(_ context.Context, requirements ResourceRequirements) (ResourceToken, error)
- func (p *ResourcePool) CleanupStaleTokens(cutoff time.Time) int
- func (p *ResourcePool) Close()
- func (p *ResourcePool) GetAvailableResources() AvailableResources
- func (p *ResourcePool) GetCurrentUsage() AvailableResources
- func (p *ResourcePool) GetStats() ResourcePoolStats
- type ResourcePoolConfig
- type ResourcePoolStats
- type ResourceRequirements
- type ResourceToken
- type ResourceTokenImpl
- type ServiceCircuitConfig
- type Subject
- type TaskCommand
- type TaskCommandRegistry
- func (r *TaskCommandRegistry) GetAllCapabilities() []string
- func (r *TaskCommandRegistry) GetCommand(taskType types.TaskType) (TaskCommand, error)
- func (r *TaskCommandRegistry) GetCommandForTask(task *types.Task) (TaskCommand, error)
- func (r *TaskCommandRegistry) GetCommandsByCapability(capability string) []TaskCommand
- func (r *TaskCommandRegistry) GetSupportedTypes() []types.TaskType
- func (r *TaskCommandRegistry) RegisterCommand(cmd TaskCommand) error
- func (r *TaskCommandRegistry) UnregisterCommand(taskType types.TaskType)
- type Worker
- func (w *Worker) GetInfo() *types.WorkerInfo
- func (w *Worker) Heartbeat(ctx context.Context) error
- func (w *Worker) RegisterProcessor(taskType types.TaskType, processor types.TaskProcessor) error
- func (w *Worker) Start(ctx context.Context, queues []string) error
- func (w *Worker) Stop(ctx context.Context) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AvailableResources ¶
type AvailableResources struct {
MemoryMB int `json:"memory_mb"`
CPUPercent int `json:"cpu_percent"`
ActiveTasks int `json:"active_tasks"`
MaxTasks int `json:"max_tasks"`
}
AvailableResources represents currently available system resources
type BaseTaskCommand ¶
type BaseTaskCommand struct {
// contains filtered or unexported fields
}
BaseTaskCommand provides a base implementation for task commands. It includes common functionality like validation, resource management, and error handling.
func NewBaseTaskCommand ¶
func NewBaseTaskCommand( taskType types.TaskType, capabilities []string, requirements ResourceRequirements, processor types.TaskProcessor, logger types.Logger, ) *BaseTaskCommand
NewBaseTaskCommand creates a new base task command
func (*BaseTaskCommand) Execute ¶
func (b *BaseTaskCommand) Execute(ctx context.Context, task *types.Task) (*types.TaskResult, error)
Execute processes the task using the underlying processor
func (*BaseTaskCommand) GetCapabilities ¶
func (b *BaseTaskCommand) GetCapabilities() []string
GetCapabilities returns additional capabilities this command provides
func (*BaseTaskCommand) GetResourceRequirements ¶
func (b *BaseTaskCommand) GetResourceRequirements() ResourceRequirements
GetResourceRequirements returns resource requirements for execution
func (*BaseTaskCommand) GetStats ¶
func (b *BaseTaskCommand) GetStats() CommandStats
GetStats returns execution statistics for this command
func (*BaseTaskCommand) GetTaskType ¶
func (b *BaseTaskCommand) GetTaskType() types.TaskType
GetTaskType returns the task type this command handles
func (*BaseTaskCommand) SupportsTask ¶
func (b *BaseTaskCommand) SupportsTask(task *types.Task) bool
SupportsTask checks if this command can handle the given task
type BulkheadConfig ¶
type BulkheadConfig struct {
// Global resource limits
MaxTotalMemoryMB int `json:"max_total_memory_mb"`
MaxTotalCPUPercent int `json:"max_total_cpu_percent"`
MaxConcurrentTasks int `json:"max_concurrent_tasks"`
// Per-task-type resource pools
TaskTypePools map[types.TaskType]ResourcePoolConfig `json:"task_type_pools"`
// Circuit breaker settings
DefaultCircuitBreaker ServiceCircuitConfig `json:"default_circuit_breaker"`
ServiceCircuitBreakers map[string]ServiceCircuitConfig `json:"service_circuit_breakers"`
// Rate limiting settings
DefaultRateLimit RateLimitSettings `json:"default_rate_limit"`
TaskTypeRateLimits map[types.TaskType]RateLimitSettings `json:"task_type_rate_limits"`
// Health monitoring
HealthCheckInterval time.Duration `json:"health_check_interval"`
ResourceCleanupAge time.Duration `json:"resource_cleanup_age"`
}
BulkheadConfig defines configuration for the bulkhead pattern
func DefaultBulkheadConfig ¶
func DefaultBulkheadConfig() BulkheadConfig
DefaultBulkheadConfig returns sensible default bulkhead configuration
type BulkheadManager ¶
type BulkheadManager struct {
// contains filtered or unexported fields
}
BulkheadManager implements the Bulkhead pattern for failure isolation. It manages resource pools and circuit breakers to prevent cascading failures.
func NewBulkheadManager ¶
func NewBulkheadManager(config BulkheadConfig, logger types.Logger) *BulkheadManager
NewBulkheadManager creates a new bulkhead manager
func (*BulkheadManager) Close ¶
func (b *BulkheadManager) Close() error
Close shuts down the bulkhead manager gracefully
func (*BulkheadManager) GetCircuitBreaker ¶
func (b *BulkheadManager) GetCircuitBreaker(service string) types.CircuitBreaker
GetCircuitBreaker returns a circuit breaker for the specified service
func (*BulkheadManager) GetRateLimiter ¶
func (b *BulkheadManager) GetRateLimiter(taskType types.TaskType) types.RateLimiter
GetRateLimiter returns a rate limiter for the specified task type
func (*BulkheadManager) GetResourceLimiter ¶
func (b *BulkheadManager) GetResourceLimiter(taskType types.TaskType) ResourceLimiter
GetResourceLimiter returns a resource limiter for the specified task type
func (*BulkheadManager) GetStats ¶
func (b *BulkheadManager) GetStats() BulkheadStats
GetStats returns comprehensive bulkhead statistics
func (*BulkheadManager) GetSystemResourceUsage ¶
func (b *BulkheadManager) GetSystemResourceUsage() AvailableResources
GetSystemResourceUsage returns current system resource usage
type BulkheadStats ¶
type BulkheadStats struct {
ResourcePools map[string]ResourcePoolStats `json:"resource_pools"`
CircuitBreakers map[string]CircuitBreakerStats `json:"circuit_breakers"`
RateLimiters map[string]RateLimiterStats `json:"rate_limiters"`
SystemUsage AvailableResources `json:"system_usage"`
TotalLimits AvailableResources `json:"total_limits"`
}
BulkheadStats contains comprehensive bulkhead statistics
type CircuitBreakerImpl ¶
type CircuitBreakerImpl struct {
// contains filtered or unexported fields
}
CircuitBreakerImpl implements the CircuitBreaker interface
func NewCircuitBreakerImpl ¶
func NewCircuitBreakerImpl(service string, config ServiceCircuitConfig, logger types.Logger) *CircuitBreakerImpl
NewCircuitBreakerImpl creates a new circuit breaker implementation
func (*CircuitBreakerImpl) Execute ¶
func (cb *CircuitBreakerImpl) Execute(fn func() error) error
Execute runs the function with circuit breaker protection
func (*CircuitBreakerImpl) GetStats ¶
func (cb *CircuitBreakerImpl) GetStats() CircuitBreakerStats
GetStats returns circuit breaker statistics
func (*CircuitBreakerImpl) Reset ¶
func (cb *CircuitBreakerImpl) Reset()
Reset manually resets the circuit breaker
func (*CircuitBreakerImpl) State ¶
func (cb *CircuitBreakerImpl) State() types.CircuitBreakerState
State returns the current circuit breaker state
type CircuitBreakerStats ¶
type CircuitBreakerStats struct {
Service string `json:"service"`
State types.CircuitBreakerState `json:"state"`
TotalRequests int64 `json:"total_requests"`
SuccessfulRequests int64 `json:"successful_requests"`
FailedRequests int64 `json:"failed_requests"`
ConsecutiveFailures int `json:"consecutive_failures"`
FailureRate float64 `json:"failure_rate"`
LastStateChange time.Time `json:"last_state_change"`
}
CircuitBreakerStats contains statistics for a circuit breaker
type CommandStats ¶
type CommandStats struct {
TaskType types.TaskType `json:"task_type"`
ExecutionCount int64 `json:"execution_count"`
ErrorCount int64 `json:"error_count"`
ErrorRate float64 `json:"error_rate"`
TotalDuration time.Duration `json:"total_duration"`
AvgDuration time.Duration `json:"avg_duration"`
}
CommandStats contains execution statistics for a command
type ContextKey ¶
type ContextKey string
ContextKey is a custom type for context keys to avoid collisions.
const (
WorkerIDKey ContextKey = "worker_id"
)
type Event ¶
type Event string
Event represents different events in the worker lifecycle
const (
// Worker lifecycle events
EventStarted Event = "worker_started"
EventStopped Event = "worker_stopped"
EventDraining Event = "worker_draining"
EventHealthy Event = "worker_healthy"
EventUnhealthy Event = "worker_unhealthy"
EventRegistered Event = "worker_registered"
// Task processing events
TaskEventReceived Event = "task_received"
TaskEventStarted Event = "task_started"
TaskEventCompleted Event = "task_completed"
TaskEventFailed Event = "task_failed"
TaskEventRetrying Event = "task_retrying"
TaskEventTimeout Event = "task_timeout"
// Queue monitoring events
QueueEventBacklog Event = "queue_backlog"
QueueEventEmpty Event = "queue_empty"
QueueEventHighLoad Event = "queue_high_load"
QueueEventConnection Event = "queue_connection"
// Circuit breaker events
CircuitBreakerOpened Event = "circuit_breaker_opened"
CircuitBreakerClosed Event = "circuit_breaker_closed"
CircuitBreakerHalfOpen Event = "circuit_breaker_half_open"
)
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus implements a centralized event bus for worker events. It manages multiple observers and provides event routing.
func NewEventBus ¶
NewEventBus creates a new event bus for worker monitoring
func (*EventBus) GetActiveObserverCount ¶
GetActiveObserverCount returns the number of active observers
func (*EventBus) GetObserverCount ¶
GetObserverCount returns the number of registered observers
func (*EventBus) NotifyObservers ¶
NotifyObservers sends an event to all registered observers (async)
func (*EventBus) RegisterObserver ¶
RegisterObserver adds an observer to receive all events
func (*EventBus) RegisterObserverWithFilter ¶
RegisterObserverWithFilter adds an observer that only receives specific events
func (*EventBus) UnregisterObserver ¶
UnregisterObserver removes an observer
type EventData ¶
type EventData struct {
Event Event `json:"event"`
WorkerID string `json:"worker_id"`
Timestamp time.Time `json:"timestamp"`
TaskID string `json:"task_id,omitempty"`
TaskType types.TaskType `json:"task_type,omitempty"`
Queue string `json:"queue,omitempty"`
Error error `json:"error,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
// Health and resource metrics
MemoryUsageMB float64 `json:"memory_usage_mb,omitempty"`
CPUUsagePercent float64 `json:"cpu_usage_percent,omitempty"`
ActiveTasks int `json:"active_tasks,omitempty"`
CompletedTasks int64 `json:"completed_tasks,omitempty"`
FailedTasks int64 `json:"failed_tasks,omitempty"`
// Queue metrics
QueueDepth int64 `json:"queue_depth,omitempty"`
ActiveWorkers int `json:"active_workers,omitempty"`
}
EventData contains detailed information about a worker event
type HealthMonitorObserver ¶
type HealthMonitorObserver struct {
// contains filtered or unexported fields
}
HealthMonitorObserver monitors worker health and triggers alerts/actions
func NewHealthMonitorObserver ¶
func NewHealthMonitorObserver(id string, logger types.Logger, thresholds HealthThresholds) *HealthMonitorObserver
NewHealthMonitorObserver creates a new health monitoring observer
func (*HealthMonitorObserver) Cleanup ¶
func (h *HealthMonitorObserver) Cleanup(maxAge time.Duration) int
Cleanup removes health state for workers that haven't been seen recently
func (*HealthMonitorObserver) GetAllWorkerHealth ¶
func (h *HealthMonitorObserver) GetAllWorkerHealth() map[string]*HealthState
GetAllWorkerHealth returns health states for all tracked workers
func (*HealthMonitorObserver) GetObserverID ¶
func (h *HealthMonitorObserver) GetObserverID() string
GetObserverID returns the unique identifier for this observer
func (*HealthMonitorObserver) GetWorkerHealth ¶
func (h *HealthMonitorObserver) GetWorkerHealth(workerID string) *HealthState
GetWorkerHealth returns the health state for a specific worker
func (*HealthMonitorObserver) IsActive ¶
func (h *HealthMonitorObserver) IsActive() bool
IsActive returns whether this observer is currently active
func (*HealthMonitorObserver) OnWorkerEvent ¶
func (h *HealthMonitorObserver) OnWorkerEvent(_ context.Context, data *EventData)
OnWorkerEvent processes worker events to monitor health
func (*HealthMonitorObserver) SetActive ¶
func (h *HealthMonitorObserver) SetActive(active bool)
SetActive enables or disables this observer
func (*HealthMonitorObserver) SetCallbacks ¶
func (h *HealthMonitorObserver) SetCallbacks( onUnhealthy func(string, *HealthState), onRecovered func(string, *HealthState), onHighFailure func(string, float64), )
SetCallbacks sets callback functions for health events
type HealthState ¶
type HealthState struct {
WorkerID string
LastSeen time.Time
Status types.WorkerStatus
ConsecutiveFailures int
TotalTasks int64
FailedTasks int64
SuccessTasks int64
AvgTaskDuration time.Duration
MemoryUsageMB float64
CPUUsagePercent float64
ActiveTasks int
// Health indicators
IsHealthy bool
LastHealthCheck time.Time
HealthScore float64 // 0.0 = unhealthy, 1.0 = perfect health
}
HealthState tracks the health state of a worker.
type HealthThresholds ¶
type HealthThresholds struct {
MaxConsecutiveFailures int // Max consecutive failures before unhealthy
MaxFailureRate float64 // Max failure rate (0.0-1.0) before unhealthy
HeartbeatTimeout time.Duration // Max time without heartbeat
MaxMemoryMB float64 // Max memory usage before warning
MaxCPUPercent float64 // Max CPU usage before warning
MaxTaskDuration time.Duration // Max avg task duration before warning
}
HealthThresholds defines thresholds for health monitoring
func DefaultHealthThresholds ¶
func DefaultHealthThresholds() HealthThresholds
DefaultHealthThresholds returns sensible default health thresholds
type Instance ¶
type Instance struct {
// contains filtered or unexported fields
}
Instance represents a single worker in the pool
type IsolatedTaskCommand ¶
type IsolatedTaskCommand struct {
TaskCommand
// contains filtered or unexported fields
}
IsolatedTaskCommand wraps a task command with additional isolation features. It implements the Decorator pattern to add isolation capabilities.
func NewIsolatedTaskCommand ¶
func NewIsolatedTaskCommand( baseCommand TaskCommand, resourceLimiter ResourceLimiter, circuitBreaker types.CircuitBreaker, rateLimiter types.RateLimiter, logger types.Logger, ) *IsolatedTaskCommand
NewIsolatedTaskCommand creates a new isolated task command
func (*IsolatedTaskCommand) Execute ¶
func (i *IsolatedTaskCommand) Execute(ctx context.Context, task *types.Task) (*types.TaskResult, error)
Execute processes the task with isolation features
func (*IsolatedTaskCommand) GetIsolationStats ¶
func (i *IsolatedTaskCommand) GetIsolationStats() IsolationStats
GetIsolationStats returns isolation-related statistics
type IsolationStats ¶
type IsolationStats struct {
TaskType types.TaskType `json:"task_type"`
IsolationViolations int64 `json:"isolation_violations"`
RateLimitHits int64 `json:"rate_limit_hits"`
CircuitBreakerHits int64 `json:"circuit_breaker_hits"`
ResourceLimitsEnabled bool `json:"resource_limits_enabled"`
CircuitBreakerEnabled bool `json:"circuit_breaker_enabled"`
RateLimitEnabled bool `json:"rate_limit_enabled"`
}
IsolationStats contains isolation-related statistics
type MetricsObserver ¶
type MetricsObserver struct {
// contains filtered or unexported fields
}
MetricsObserver implements Observer to collect metrics from worker events. It integrates with the MetricsCollector interface from types.
func NewMetricsObserver ¶
func NewMetricsObserver(id string, collector types.MetricsCollector, logger types.Logger) *MetricsObserver
NewMetricsObserver creates a new metrics observer
func (*MetricsObserver) GetObserverID ¶
func (m *MetricsObserver) GetObserverID() string
GetObserverID returns the unique identifier for this observer
func (*MetricsObserver) GetStats ¶
func (m *MetricsObserver) GetStats() map[string]interface{}
GetStats returns statistics about this metrics observer
func (*MetricsObserver) IsActive ¶
func (m *MetricsObserver) IsActive() bool
IsActive returns whether this observer is currently active
func (*MetricsObserver) OnWorkerEvent ¶
func (m *MetricsObserver) OnWorkerEvent(_ context.Context, data *EventData)
OnWorkerEvent processes worker events and collects relevant metrics
func (*MetricsObserver) SetActive ¶
func (m *MetricsObserver) SetActive(active bool)
SetActive enables or disables this observer
type Observer ¶
type Observer interface {
// OnWorkerEvent is called when a worker event occurs
OnWorkerEvent(ctx context.Context, data *EventData)
// GetObserverID returns a unique identifier for this observer
GetObserverID() string
// IsActive returns whether this observer is currently active
IsActive() bool
}
Observer defines the interface for observing worker events. This enables the Observer pattern for monitoring and reacting to worker state changes.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool implements the main worker pool with lifecycle management. It orchestrates workers, observes their behavior, and manages graceful shutdown.
func NewWorkerPool ¶
func NewWorkerPool( id string, config *types.WorkerConfig, queueBackend types.QueueBackend, metricsCollector types.MetricsCollector, logger types.Logger, ) (*Pool, error)
NewWorkerPool creates a new worker pool
func (*Pool) RegisterTaskProcessor ¶
RegisterTaskProcessor registers a task processor with the worker pool
type PoolInfo ¶
type PoolInfo struct {
ID string `json:"id"`
Hostname string `json:"hostname"`
State PoolState `json:"state"`
WorkerCount int `json:"worker_count"`
Concurrency int `json:"concurrency"`
Queues []string `json:"queues"`
SupportedTypes []types.TaskType `json:"supported_types"`
Capabilities []string `json:"capabilities"`
StartTime time.Time `json:"start_time"`
LastHeartbeat time.Time `json:"last_heartbeat"`
ResourceUsage AvailableResources `json:"resource_usage"`
}
PoolInfo contains information about the worker pool.
type RateLimitSettings ¶
type RateLimitSettings struct {
RequestsPerSecond int `json:"requests_per_second"`
BurstSize int `json:"burst_size"`
WindowSize time.Duration `json:"window_size"`
}
RateLimitSettings defines rate limiting configuration
type RateLimiterImpl ¶
type RateLimiterImpl struct {
// contains filtered or unexported fields
}
RateLimiterImpl implements a token bucket rate limiter
func NewRateLimiterImpl ¶
func NewRateLimiterImpl(key string, config RateLimitSettings, logger types.Logger) *RateLimiterImpl
NewRateLimiterImpl creates a new rate limiter implementation
func (*RateLimiterImpl) GetStats ¶
func (rl *RateLimiterImpl) GetStats() RateLimiterStats
GetStats returns rate limiter statistics
type RateLimiterStats ¶
type RateLimiterStats struct {
Key string `json:"key"`
TotalRequests int64 `json:"total_requests"`
AllowedRequests int64 `json:"allowed_requests"`
RejectedRequests int64 `json:"rejected_requests"`
CurrentRate float64 `json:"current_rate"`
BurstCapacity int `json:"burst_capacity"`
WindowSize time.Duration `json:"window_size"`
}
RateLimiterStats contains statistics for a rate limiter
type ResourceLimiter ¶
type ResourceLimiter interface {
// AcquireResources attempts to acquire resources for task execution
AcquireResources(ctx context.Context, requirements ResourceRequirements) (ResourceToken, error)
// GetAvailableResources returns currently available resources
GetAvailableResources() AvailableResources
}
ResourceLimiter defines interface for resource limiting
type ResourcePool ¶
type ResourcePool struct {
ID string
// contains filtered or unexported fields
}
ResourcePool implements resource management for the Bulkhead pattern. It provides isolated resource allocation with configurable limits.
func NewResourcePool ¶
func NewResourcePool(id string, config ResourcePoolConfig, logger types.Logger) *ResourcePool
NewResourcePool creates a new resource pool
func (*ResourcePool) AcquireResources ¶
func (p *ResourcePool) AcquireResources(_ context.Context, requirements ResourceRequirements) (ResourceToken, error)
AcquireResources attempts to acquire resources for task execution
func (*ResourcePool) CleanupStaleTokens ¶
func (p *ResourcePool) CleanupStaleTokens(cutoff time.Time) int
CleanupStaleTokens removes tokens that are older than the specified cutoff
func (*ResourcePool) Close ¶
func (p *ResourcePool) Close()
Close gracefully shuts down the resource pool
func (*ResourcePool) GetAvailableResources ¶
func (p *ResourcePool) GetAvailableResources() AvailableResources
GetAvailableResources returns currently available resources
func (*ResourcePool) GetCurrentUsage ¶
func (p *ResourcePool) GetCurrentUsage() AvailableResources
GetCurrentUsage returns current resource usage
func (*ResourcePool) GetStats ¶
func (p *ResourcePool) GetStats() ResourcePoolStats
GetStats returns resource pool statistics
type ResourcePoolConfig ¶
type ResourcePoolConfig struct {
MaxMemoryMB int `json:"max_memory_mb"`
MaxCPUPercent int `json:"max_cpu_percent"`
MaxConcurrentTasks int `json:"max_concurrent_tasks"`
TaskTimeout time.Duration `json:"task_timeout"`
Priority int `json:"priority"` // Higher priority gets more resources during contention
}
ResourcePoolConfig defines configuration for a resource pool
type ResourcePoolStats ¶
type ResourcePoolStats struct {
PoolID string `json:"pool_id"`
ActiveTokens int `json:"active_tokens"`
TotalRequests int64 `json:"total_requests"`
RejectedRequests int64 `json:"rejected_requests"`
CurrentUsage AvailableResources `json:"current_usage"`
MaxUsage ResourcePoolConfig `json:"max_usage"`
AverageWaitTime time.Duration `json:"average_wait_time"`
}
ResourcePoolStats contains statistics for a resource pool
type ResourceRequirements ¶
type ResourceRequirements struct {
MaxMemoryMB int `json:"max_memory_mb"`
MaxCPUPercent int `json:"max_cpu_percent"`
MaxDuration time.Duration `json:"max_duration"`
RequiresGPU bool `json:"requires_gpu"`
RequiresNetwork bool `json:"requires_network"`
Priority int `json:"priority"` // Higher priority gets more resources
}
ResourceRequirements defines resource requirements for task execution
type ResourceToken ¶
type ResourceToken interface {
// Release returns the acquired resources
Release()
// GetAcquiredResources returns the resources acquired by this token
GetAcquiredResources() ResourceRequirements
}
ResourceToken represents acquired resources that must be released
type ResourceTokenImpl ¶
type ResourceTokenImpl struct {
ID string
// contains filtered or unexported fields
}
ResourceTokenImpl implements ResourceToken interface
func (*ResourceTokenImpl) GetAcquiredResources ¶
func (t *ResourceTokenImpl) GetAcquiredResources() ResourceRequirements
GetAcquiredResources returns the resources acquired by this token
func (*ResourceTokenImpl) Release ¶
func (t *ResourceTokenImpl) Release()
Release returns the acquired resources to the pool
type ServiceCircuitConfig ¶
type ServiceCircuitConfig struct {
Threshold int `json:"threshold"` // failures before opening
Timeout time.Duration `json:"timeout"` // how long to stay open
MaxRequests int `json:"max_requests"` // max requests in half-open
ResetTimeout time.Duration `json:"reset_timeout"` // time to reset counters
FailureThreshold float64 `json:"failure_threshold"` // failure rate threshold (0.0-1.0)
MinRequestCount int `json:"min_request_count"` // minimum requests before checking failure rate
}
ServiceCircuitConfig defines circuit breaker settings for a service
type Subject ¶
type Subject interface {
// RegisterObserver adds an observer to receive events
RegisterObserver(observer Observer)
// UnregisterObserver removes an observer
UnregisterObserver(observerID string)
// NotifyObservers sends an event to all registered observers
NotifyObservers(ctx context.Context, event Event, data *EventData)
}
Subject defines the interface for objects that can be observed. Workers implement this interface to support the Observer pattern.
type TaskCommand ¶
type TaskCommand interface {
// Execute processes the task and returns a result
Execute(ctx context.Context, task *types.Task) (*types.TaskResult, error)
// GetTaskType returns the task type this command handles
GetTaskType() types.TaskType
// GetCapabilities returns additional capabilities this command provides
GetCapabilities() []string
// Validate checks if the task can be processed by this command
Validate(task *types.Task) error
// GetResourceRequirements returns resource requirements for execution
GetResourceRequirements() ResourceRequirements
// SupportsTask checks if this command can handle the given task
SupportsTask(task *types.Task) bool
}
TaskCommand represents a command in the Command pattern for task execution. This provides a consistent interface for executing different types of tasks with proper isolation, timeout handling, and resource management.
type TaskCommandRegistry ¶
type TaskCommandRegistry struct {
// contains filtered or unexported fields
}
TaskCommandRegistry manages task commands and provides command resolution. It implements the Registry pattern for pluggable task processors.
func NewTaskCommandRegistry ¶
func NewTaskCommandRegistry(logger types.Logger) *TaskCommandRegistry
NewTaskCommandRegistry creates a new command registry
func (*TaskCommandRegistry) GetAllCapabilities ¶
func (r *TaskCommandRegistry) GetAllCapabilities() []string
GetAllCapabilities returns all available capabilities
func (*TaskCommandRegistry) GetCommand ¶
func (r *TaskCommandRegistry) GetCommand(taskType types.TaskType) (TaskCommand, error)
GetCommand retrieves a command for the specified task type
func (*TaskCommandRegistry) GetCommandForTask ¶
func (r *TaskCommandRegistry) GetCommandForTask(task *types.Task) (TaskCommand, error)
GetCommandForTask finds the best command to handle a specific task
func (*TaskCommandRegistry) GetCommandsByCapability ¶
func (r *TaskCommandRegistry) GetCommandsByCapability(capability string) []TaskCommand
GetCommandsByCapability returns all commands that support a specific capability
func (*TaskCommandRegistry) GetSupportedTypes ¶
func (r *TaskCommandRegistry) GetSupportedTypes() []types.TaskType
GetSupportedTypes returns all supported task types
func (*TaskCommandRegistry) RegisterCommand ¶
func (r *TaskCommandRegistry) RegisterCommand(cmd TaskCommand) error
RegisterCommand registers a new task command
func (*TaskCommandRegistry) UnregisterCommand ¶
func (r *TaskCommandRegistry) UnregisterCommand(taskType types.TaskType)
UnregisterCommand removes a task command
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker implements the types.Worker interface with enhanced functionality. It processes tasks using the command pattern and reports to observers.
func NewWorker ¶
func NewWorker( id string, config *types.WorkerConfig, queueBackend types.QueueBackend, commandRegistry *TaskCommandRegistry, eventBus *EventBus, logger types.Logger, ) *Worker
NewWorker creates a new worker instance
func (*Worker) GetInfo ¶
func (w *Worker) GetInfo() *types.WorkerInfo
GetInfo returns current worker information
func (*Worker) RegisterProcessor ¶
RegisterProcessor adds a task processor for specific task types