Documentation
¶
Index ¶
- Variables
- func GetAwaitResponse(ctx context.Context) (string, bool)
- func GetConnection(addr string, config TLSConfig) (net.Conn, error)
- func GetConsumerID(ctx context.Context) (string, bool)
- func GetContentType(ctx context.Context) (string, bool)
- func GetHeader(ctx context.Context, key string) (string, bool)
- func GetHeaders(ctx context.Context) (storage.IMap[string, string], bool)
- func GetPublisherID(ctx context.Context) (string, bool)
- func GetQueue(ctx context.Context) (string, bool)
- func GetTriggerNode(ctx context.Context) (string, bool)
- func HeadersWithConsumerID(ctx context.Context, id string) map[string]string
- func HeadersWithConsumerIDAndQueue(ctx context.Context, id, queue string) map[string]string
- func IsClosed(conn net.Conn) bool
- func NewID() string
- func RecoverPanic(labelGenerator func() string)
- func RecoverTitle() string
- func SetHeaders(ctx context.Context, headers map[string]string) context.Context
- func WithHeaders(ctx context.Context, headers map[string]string) map[string]string
- func WrapError(err error, msg, op string) error
- type ActiveAlert
- type AlertManager
- type AlertNotifier
- type AlertRule
- type AlertStatus
- type Broker
- func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Conn) string
- func (b *Broker) AdjustConsumerWorkers(noOfWorkers int, consumerID ...string)
- func (b *Broker) Authenticate(ctx context.Context, credentials map[string]string) error
- func (b *Broker) Authorize(ctx context.Context, role string, action string) error
- func (b *Broker) Close() error
- func (b *Broker) HandleCallback(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageAck(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageDeny(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message)
- func (b *Broker) NewQueue(name string) *Queue
- func (b *Broker) NewQueueWithConfig(name string, opts ...QueueOption) *Queue
- func (b *Broker) NewQueueWithOrdering(name string) *Queue
- func (b *Broker) NotifyHandler() func(context.Context, Result) error
- func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error
- func (b *Broker) OnConsumerPause(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerResume(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerStop(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerUpdated(ctx context.Context, msg *codec.Message)
- func (b *Broker) OnError(_ context.Context, conn net.Conn, err error)
- func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn)
- func (b *Broker) Options() *Options
- func (b *Broker) PauseConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) Publish(ctx context.Context, task *Task, queue string) error
- func (b *Broker) PublishHandler(ctx context.Context, conn net.Conn, msg *codec.Message)
- func (b *Broker) RemoveConsumer(consumerID string, queues ...string)
- func (b *Broker) ReprocessDLQ(queueName string) error
- func (b *Broker) ResumeConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) SetNotifyHandler(callback Callback)
- func (b *Broker) SetURL(url string)
- func (b *Broker) Start(ctx context.Context) error
- func (b *Broker) StartEnhanced(ctx context.Context) error
- func (b *Broker) StopConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) StopEnhanced() error
- func (b *Broker) SubscribeHandler(ctx context.Context, conn net.Conn, msg *codec.Message)
- func (b *Broker) SyncMode() bool
- func (b *Broker) TLSConfig() TLSConfig
- func (b *Broker) URL() string
- func (b *Broker) UpdateConsumer(ctx context.Context, consumerID string, config DynamicConfig, queues ...string) error
- type BrokerConfig
- type BrokerConnection
- type Callback
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitState
- type ClusteringConfig
- type CompletionCallback
- type ConfigManager
- func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher)
- func (cm *ConfigManager) GetConfig() *ProductionConfig
- func (cm *ConfigManager) LoadConfig() error
- func (cm *ConfigManager) RemoveWatcher(watcher ConfigWatcher)
- func (cm *ConfigManager) SaveConfig() error
- func (cm *ConfigManager) StartWatching(ctx context.Context, interval time.Duration)
- func (cm *ConfigManager) UpdateConfig(newConfig *ProductionConfig) error
- type ConfigWatcher
- type ConnectionPool
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) Conn() net.Conn
- func (c *Consumer) Consume(ctx context.Context) error
- func (c *Consumer) ConsumeMessage(ctx context.Context, msg *codec.Message, conn net.Conn)
- func (c *Consumer) GetKey() string
- func (c *Consumer) GetType() string
- func (c *Consumer) Metrics() Metrics
- func (c *Consumer) OnClose(_ context.Context, _ net.Conn) error
- func (c *Consumer) OnError(_ context.Context, conn net.Conn, err error)
- func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn) error
- func (c *Consumer) OnResponse(ctx context.Context, result Result) error
- func (c *Consumer) Pause(ctx context.Context) error
- func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result
- func (c *Consumer) Resume(ctx context.Context) error
- func (c *Consumer) SetKey(key string)
- func (c *Consumer) StartHTTPAPI() (int, error)
- func (c *Consumer) Stop(ctx context.Context) error
- func (c *Consumer) Update(ctx context.Context, payload []byte) error
- type ConsumerConfig
- type CronSchedule
- type DeadLetterQueue
- type DefaultPlugin
- type DetailedMetricsRegistry
- func (mr *DetailedMetricsRegistry) GetAllMetrics() map[string]*TimeSeries
- func (mr *DetailedMetricsRegistry) GetMetric(name string) (*TimeSeries, bool)
- func (mr *DetailedMetricsRegistry) RecordValue(name string, value float64)
- func (mr *DetailedMetricsRegistry) RegisterMetric(name string, metricType MetricType, description string, ...)
- type DiskSpaceHealthCheck
- type DistributedLocker
- type DynamicConfig
- type EnhancedCircuitBreaker
- type ExecutionHistory
- type FormattedMetrics
- type GoRoutineHealthCheck
- type Handler
- type HealthCheck
- type HealthCheckResult
- type HealthChecker
- type HealthStatus
- type HealthThresholds
- type InMemoryMessageStore
- func (ims *InMemoryMessageStore) Cleanup(olderThan time.Time) error
- func (ims *InMemoryMessageStore) Count(queue string) (int64, error)
- func (ims *InMemoryMessageStore) Delete(id string) error
- func (ims *InMemoryMessageStore) List(queue string, limit int, offset int) ([]*StoredMessage, error)
- func (ims *InMemoryMessageStore) Retrieve(id string) (*StoredMessage, error)
- func (ims *InMemoryMessageStore) Store(msg *StoredMessage) error
- type InMemoryMetricsRegistry
- type LockEntry
- type LogNotifier
- type MemoryHealthCheck
- type MemoryTaskStorage
- func (m *MemoryTaskStorage) CleanupExpiredTasks() error
- func (m *MemoryTaskStorage) DeleteTask(taskID string) error
- func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)
- func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)
- func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)
- func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error
- type MessageStore
- type Metric
- type MetricType
- type Metrics
- type MetricsCollector
- type MetricsRegistry
- type MetricsServer
- type MonitoringConfig
- type Option
- func DisableBrokerRateLimit() Option
- func DisableConsumerRateLimit() Option
- func WithBrokerRateLimiter(rate int, burst int) Option
- func WithBrokerURL(url string) Option
- func WithCAPath(caPath string) Option
- func WithCallback(val ...func(context.Context, Result) Result) Option
- func WithCleanTaskOnComplete() Option
- func WithConsumerOnClose(handler func(ctx context.Context, topic, consumerName string)) Option
- func WithConsumerOnSubscribe(handler func(ctx context.Context, topic, consumerName string)) Option
- func WithConsumerRateLimiter(rate int, burst int) Option
- func WithConsumerTimeout(timeout time.Duration) Option
- func WithHTTPApi(flag bool) Option
- func WithInitialDelay(val time.Duration) Option
- func WithJitterPercent(val float64) Option
- func WithLogger(log logger.Logger) Option
- func WithMaxBackoff(val time.Duration) Option
- func WithMaxRetries(val int) Option
- func WithNotifyResponse(callback Callback) Option
- func WithRespondPendingResult(mode bool) Option
- func WithSyncMode(mode bool) Option
- func WithTLS(enableTLS bool, certPath, keyPath string) Option
- func WithWorkerPool(queueSize, numOfWorkers int, maxMemoryLoad int64) Option
- type Options
- func (o *Options) BrokerAddr() string
- func (o *Options) CleanTaskOnComplete() bool
- func (o *Options) ConsumerTimeout() time.Duration
- func (o *Options) HTTPApi() bool
- func (o *Options) Logger() logger.Logger
- func (o *Options) MaxMemoryLoad() int64
- func (o *Options) NumOfWorkers() int
- func (o *Options) QueueSize() int
- func (o *Options) SetSyncMode(sync bool)
- func (o *Options) Storage() TaskStorage
- type PersistenceConfig
- type Plugin
- type Pool
- func (wp *Pool) AddScheduledMetrics(total int)
- func (wp *Pool) AdjustWorkerCount(newWorkerCount int)
- func (wp *Pool) DLQ() *DeadLetterQueue
- func (wp *Pool) Dispatch(event func())
- func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error
- func (wp *Pool) FormattedMetrics() FormattedMetrics
- func (wp *Pool) Init()
- func (wp *Pool) Metrics() Metrics
- func (wp *Pool) Pause()
- func (wp *Pool) Resume()
- func (wp *Pool) SetBatchSize(size int)
- func (wp *Pool) Start(numWorkers int)
- func (wp *Pool) Stop()
- func (wp *Pool) UpdateConfig(newConfig *DynamicConfig) error
- type PoolConfig
- type PoolOption
- func WithBatchSize(batchSize int) PoolOption
- func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption
- func WithCompletionCallback(callback func()) PoolOption
- func WithDiagnostics(enabled bool) PoolOption
- func WithGracefulShutdown(timeout time.Duration) PoolOption
- func WithHandler(handler Handler) PoolOption
- func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption
- func WithMetricsRegistry(registry MetricsRegistry) PoolOption
- func WithPlugin(plugin Plugin) PoolOption
- func WithPoolCallback(callback Callback) PoolOption
- func WithTaskQueueSize(size int) PoolOption
- func WithTaskStorage(storage TaskStorage) PoolOption
- func WithTaskTimeout(t time.Duration) PoolOption
- func WithWarningThresholds(thresholds ThresholdConfig) PoolOption
- type PriorityQueue
- type Processor
- type ProductionConfig
- type Publisher
- type PublisherConfig
- type Queue
- type QueueConfig
- type QueueMetrics
- type QueueOption
- type QueueTask
- type QueuedTask
- type RateLimitConfig
- type RateLimiter
- type Result
- type Schedule
- type ScheduleOptions
- type ScheduledTask
- type Scheduler
- func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) string
- func (s *Scheduler) Close() error
- func (s *Scheduler) ListScheduledTasks() []TaskInfo
- func (s *Scheduler) PrintAllTasks()
- func (s *Scheduler) PrintExecutionHistory(id string) error
- func (s *Scheduler) RemoveTask(id string) error
- func (s *Scheduler) Start()
- func (s *Scheduler) UpdateTask(id string, newSched *Schedule) error
- type SchedulerConfig
- type SchedulerOpt
- type SchedulerOption
- func WithInterval(interval time.Duration) SchedulerOption
- func WithOverlap() SchedulerOption
- func WithRecurring() SchedulerOption
- func WithScheduleSpec(spec string) SchedulerOption
- func WithSchedulerCallback(callback Callback) SchedulerOption
- func WithSchedulerHandler(handler Handler) SchedulerOption
- type SecurityConfig
- type Status
- type StoredMessage
- type SystemHealthChecker
- type TLSConfig
- type Task
- type TaskInfo
- type TaskOption
- func WithDAG(dag any) TaskOption
- func WithDedupKey(key string) TaskOption
- func WithExpiry(expiry time.Time) TaskOption
- func WithPriority(priority int) TaskOption
- func WithTTL(ttl time.Duration) TaskOption
- func WithTags(tags map[string]string) TaskOption
- func WithTaskHeaders(headers map[string]string) TaskOption
- func WithTaskMaxRetries(maxRetries int) TaskOption
- func WithTraceID(traceID string) TaskOption
- type TaskStorage
- type ThresholdConfig
- type TimeSeries
- type TimeSeriesPoint
- type WarningThresholds
Constants ¶
This section is empty.
Variables ¶
var BrokerAddr string
var Config = &DynamicConfig{ Timeout: 10 * time.Second, BatchSize: 1, MaxMemoryLoad: 100 * 1024 * 1024, IdleTimeout: 5 * time.Minute, BackoffDuration: 2 * time.Second, MaxRetries: 3, ReloadInterval: 30 * time.Second, WarningThreshold: WarningThresholds{ HighMemory: 1 * 1024 * 1024, LongExecution: 2 * time.Second, }, NumberOfWorkers: 5, }
var Logger = log.DefaultLogger
Functions ¶
func GetConnection ¶
GetConnection returns a connection for addr, reusing an existing connection if it is still valid.
func HeadersWithConsumerID ¶
func RecoverPanic ¶
func RecoverPanic(labelGenerator func() string)
func RecoverTitle ¶
func RecoverTitle() string
func WithHeaders ¶
Types ¶
type ActiveAlert ¶ added in v0.0.16
type ActiveAlert struct { Rule AlertRule `json:"rule"` Value float64 `json:"value"` StartsAt time.Time `json:"starts_at"` EndsAt *time.Time `json:"ends_at,omitempty"` Labels map[string]string `json:"labels"` Annotations map[string]string `json:"annotations"` Status AlertStatus `json:"status"` }
ActiveAlert represents an active alert
type AlertManager ¶ added in v0.0.16
type AlertManager struct {
// contains filtered or unexported fields
}
AlertManager manages alerts and notifications
func NewAlertManager ¶ added in v0.0.16
func NewAlertManager(logger logger.Logger) *AlertManager
NewAlertManager creates a new alert manager
func (*AlertManager) AddNotifier ¶ added in v0.0.16
func (am *AlertManager) AddNotifier(notifier AlertNotifier)
AddNotifier adds an alert notifier
func (*AlertManager) AddRule ¶ added in v0.0.16
func (am *AlertManager) AddRule(rule AlertRule)
AddRule adds an alert rule
func (*AlertManager) EvaluateRules ¶ added in v0.0.16
func (am *AlertManager) EvaluateRules(registry *DetailedMetricsRegistry)
EvaluateRules evaluates all alert rules against current metrics
type AlertNotifier ¶ added in v0.0.16
type AlertNotifier interface { Notify(ctx context.Context, alert ActiveAlert) error Name() string }
AlertNotifier interface for alert notifications
type AlertRule ¶ added in v0.0.16
type AlertRule struct { Name string `json:"name"` Metric string `json:"metric"` Condition string `json:"condition"` // "gt", "lt", "eq", "gte", "lte" Threshold float64 `json:"threshold"` Duration time.Duration `json:"duration"` Labels map[string]string `json:"labels"` Annotations map[string]string `json:"annotations"` Enabled bool `json:"enabled"` }
AlertRule defines conditions for triggering alerts
type AlertStatus ¶ added in v0.0.16
type AlertStatus string
AlertStatus represents the status of an alert
const ( AlertStatusFiring AlertStatus = "firing" AlertStatusResolved AlertStatus = "resolved" AlertStatusSilenced AlertStatus = "silenced" )
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func (*Broker) AddConsumer ¶
func (*Broker) AdjustConsumerWorkers ¶ added in v0.0.11
func (*Broker) Authenticate ¶ added in v0.0.16
Authenticate is part of the broker's authentication and authorization support for publishers and consumers.
func (*Broker) HandleCallback ¶
func (*Broker) MessageResponseHandler ¶
func (*Broker) NewQueueWithConfig ¶ added in v0.0.16
func (b *Broker) NewQueueWithConfig(name string, opts ...QueueOption) *Queue
NewQueueWithConfig creates a queue with specific configuration
func (*Broker) NewQueueWithOrdering ¶ added in v0.0.16
NewQueueWithOrdering creates a queue that ensures message ordering in task queues.
func (*Broker) OnConsumerPause ¶
func (*Broker) OnConsumerResume ¶
func (*Broker) OnConsumerStop ¶
func (*Broker) OnConsumerUpdated ¶ added in v0.0.11
func (*Broker) PauseConsumer ¶
func (*Broker) PublishHandler ¶
func (*Broker) RemoveConsumer ¶
func (*Broker) ReprocessDLQ ¶ added in v0.0.16
ReprocessDLQ is part of the broker's advanced dead-letter queue management.
func (*Broker) ResumeConsumer ¶
func (*Broker) SetNotifyHandler ¶
func (*Broker) StartEnhanced ¶ added in v0.0.16
StartEnhanced is an enhanced version of Start with production features.
func (*Broker) StopConsumer ¶
func (*Broker) StopEnhanced ¶ added in v0.0.16
StopEnhanced is an enhanced stop method that performs a graceful shutdown.
func (*Broker) SubscribeHandler ¶
func (*Broker) UpdateConsumer ¶ added in v0.0.11
type BrokerConfig ¶ added in v0.0.16
type BrokerConfig struct { Address string `json:"address"` Port int `json:"port"` MaxConnections int `json:"max_connections"` ConnectionTimeout time.Duration `json:"connection_timeout"` ReadTimeout time.Duration `json:"read_timeout"` WriteTimeout time.Duration `json:"write_timeout"` IdleTimeout time.Duration `json:"idle_timeout"` KeepAlive bool `json:"keep_alive"` KeepAlivePeriod time.Duration `json:"keep_alive_period"` MaxQueueDepth int `json:"max_queue_depth"` EnableDeadLetter bool `json:"enable_dead_letter"` DeadLetterMaxRetries int `json:"dead_letter_max_retries"` EnableMetrics bool `json:"enable_metrics"` MetricsInterval time.Duration `json:"metrics_interval"` GracefulShutdown time.Duration `json:"graceful_shutdown"` MessageTTL time.Duration `json:"message_ttl"` Headers map[string]string `json:"headers"` }
BrokerConfig contains broker-specific configuration
func (*BrokerConfig) UnmarshalJSON ¶ added in v0.0.16
func (b *BrokerConfig) UnmarshalJSON(data []byte) error
type BrokerConnection ¶ added in v0.0.16
type BrokerConnection struct {
// contains filtered or unexported fields
}
BrokerConnection represents a single broker connection
type CircuitBreaker ¶ added in v0.0.16
type CircuitBreakerConfig ¶ added in v0.0.11
type CircuitState ¶ added in v0.0.16
type CircuitState int
CircuitState represents the state of a circuit breaker
const ( CircuitClosed CircuitState = iota CircuitOpen CircuitHalfOpen )
type ClusteringConfig ¶ added in v0.0.16
type ClusteringConfig struct { EnableClustering bool `json:"enable_clustering"` NodeID string `json:"node_id"` ClusterNodes []string `json:"cluster_nodes"` DiscoveryMethod string `json:"discovery_method"` // "static", "consul", "etcd", "k8s" DiscoveryEndpoint string `json:"discovery_endpoint"` HeartbeatInterval time.Duration `json:"heartbeat_interval"` ElectionTimeout time.Duration `json:"election_timeout"` EnableLoadBalancing bool `json:"enable_load_balancing"` LoadBalancingStrategy string `json:"load_balancing_strategy"` // "round_robin", "least_connections", "hash" EnableFailover bool `json:"enable_failover"` FailoverTimeout time.Duration `json:"failover_timeout"` EnableReplication bool `json:"enable_replication"` ReplicationFactor int `json:"replication_factor"` ConsistencyLevel string `json:"consistency_level"` // "weak", "strong", "eventual" }
ClusteringConfig contains clustering configuration
func (*ClusteringConfig) UnmarshalJSON ¶ added in v0.0.16
func (c *ClusteringConfig) UnmarshalJSON(data []byte) error
type CompletionCallback ¶
type CompletionCallback func()
CompletionCallback is called when the pool completes a graceful shutdown.
type ConfigManager ¶ added in v0.0.16
type ConfigManager struct {
// contains filtered or unexported fields
}
ConfigManager handles dynamic configuration management
func NewConfigManager ¶ added in v0.0.16
func NewConfigManager(configFile string, logger logger.Logger) *ConfigManager
NewConfigManager creates a new configuration manager
func (*ConfigManager) AddWatcher ¶ added in v0.0.16
func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher)
AddWatcher adds a configuration watcher
func (*ConfigManager) GetConfig ¶ added in v0.0.16
func (cm *ConfigManager) GetConfig() *ProductionConfig
GetConfig returns a copy of the current configuration
func (*ConfigManager) LoadConfig ¶ added in v0.0.16
func (cm *ConfigManager) LoadConfig() error
LoadConfig loads configuration from file
func (*ConfigManager) RemoveWatcher ¶ added in v0.0.16
func (cm *ConfigManager) RemoveWatcher(watcher ConfigWatcher)
RemoveWatcher removes a configuration watcher
func (*ConfigManager) SaveConfig ¶ added in v0.0.16
func (cm *ConfigManager) SaveConfig() error
SaveConfig saves current configuration to file
func (*ConfigManager) StartWatching ¶ added in v0.0.16
func (cm *ConfigManager) StartWatching(ctx context.Context, interval time.Duration)
StartWatching starts watching for configuration changes
func (*ConfigManager) UpdateConfig ¶ added in v0.0.16
func (cm *ConfigManager) UpdateConfig(newConfig *ProductionConfig) error
UpdateConfig updates the configuration
type ConfigWatcher ¶ added in v0.0.16
type ConfigWatcher interface {
OnConfigChange(oldConfig, newConfig *ProductionConfig) error
}
ConfigWatcher interface for configuration change notifications
type ConnectionPool ¶ added in v0.0.16
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool manages a pool of broker connections
func NewConnectionPool ¶ added in v0.0.16
func NewConnectionPool(maxConns int) *ConnectionPool
NewConnectionPool creates a new connection pool
func (*ConnectionPool) AddConnection ¶ added in v0.0.16
AddConnection adds a connection to the pool
func (*ConnectionPool) CleanupIdleConnections ¶ added in v0.0.16
func (cp *ConnectionPool) CleanupIdleConnections(idleTimeout time.Duration)
CleanupIdleConnections removes idle connections
func (*ConnectionPool) GetActiveConnections ¶ added in v0.0.16
func (cp *ConnectionPool) GetActiveConnections() int64
GetActiveConnections returns the number of active connections
func (*ConnectionPool) RemoveConnection ¶ added in v0.0.16
func (cp *ConnectionPool) RemoveConnection(id string)
RemoveConnection removes a connection from the pool
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func (*Consumer) ConsumeMessage ¶
func (*Consumer) OnResponse ¶
func (*Consumer) ProcessTask ¶
func (*Consumer) StartHTTPAPI ¶ added in v0.0.12
StartHTTPAPI starts an HTTP server on a random available port and registers API endpoints. It returns the port number the server is listening on.
type ConsumerConfig ¶ added in v0.0.16
type ConsumerConfig struct { MaxRetries int `json:"max_retries"` InitialDelay time.Duration `json:"initial_delay"` MaxBackoff time.Duration `json:"max_backoff"` JitterPercent float64 `json:"jitter_percent"` EnableReconnect bool `json:"enable_reconnect"` ReconnectInterval time.Duration `json:"reconnect_interval"` HealthCheckInterval time.Duration `json:"health_check_interval"` MaxConcurrentTasks int `json:"max_concurrent_tasks"` TaskTimeout time.Duration `json:"task_timeout"` EnableDeduplication bool `json:"enable_deduplication"` DeduplicationWindow time.Duration `json:"deduplication_window"` EnablePriorityQueue bool `json:"enable_priority_queue"` EnableHTTPAPI bool `json:"enable_http_api"` HTTPAPIPort int `json:"http_api_port"` EnableCircuitBreaker bool `json:"enable_circuit_breaker"` CircuitBreakerThreshold int `json:"circuit_breaker_threshold"` CircuitBreakerTimeout time.Duration `json:"circuit_breaker_timeout"` }
ConsumerConfig contains consumer-specific configuration
func (*ConsumerConfig) UnmarshalJSON ¶ added in v0.0.16
func (c *ConsumerConfig) UnmarshalJSON(data []byte) error
type CronSchedule ¶
type CronSchedule struct { Seconds string Minute string Hour string DayOfMonth string Month string DayOfWeek string }
func (CronSchedule) String ¶
func (c CronSchedule) String() string
type DeadLetterQueue ¶ added in v0.0.11
type DeadLetterQueue struct {
// contains filtered or unexported fields
}
DeadLetterQueue stores tasks that have permanently failed.
func NewDeadLetterQueue ¶ added in v0.0.11
func NewDeadLetterQueue() *DeadLetterQueue
func (*DeadLetterQueue) Add ¶ added in v0.0.11
func (dlq *DeadLetterQueue) Add(task *QueueTask)
func (*DeadLetterQueue) Tasks ¶ added in v0.0.11
func (dlq *DeadLetterQueue) Tasks() []*QueueTask
type DefaultPlugin ¶ added in v0.0.11
type DefaultPlugin struct{}
DefaultPlugin is a no-op implementation of Plugin.
func (*DefaultPlugin) AfterTask ¶ added in v0.0.11
func (dp *DefaultPlugin) AfterTask(task *QueueTask, result Result)
func (*DefaultPlugin) BeforeTask ¶ added in v0.0.11
func (dp *DefaultPlugin) BeforeTask(task *QueueTask)
func (*DefaultPlugin) Initialize ¶ added in v0.0.11
func (dp *DefaultPlugin) Initialize(config interface{}) error
type DetailedMetricsRegistry ¶ added in v0.0.16
type DetailedMetricsRegistry struct {
// contains filtered or unexported fields
}
DetailedMetricsRegistry stores and manages metrics with enhanced features
func NewDetailedMetricsRegistry ¶ added in v0.0.16
func NewDetailedMetricsRegistry() *DetailedMetricsRegistry
NewDetailedMetricsRegistry creates a new metrics registry
func (*DetailedMetricsRegistry) GetAllMetrics ¶ added in v0.0.16
func (mr *DetailedMetricsRegistry) GetAllMetrics() map[string]*TimeSeries
GetAllMetrics returns all metrics
func (*DetailedMetricsRegistry) GetMetric ¶ added in v0.0.16
func (mr *DetailedMetricsRegistry) GetMetric(name string) (*TimeSeries, bool)
GetMetric returns a metric by name
func (*DetailedMetricsRegistry) RecordValue ¶ added in v0.0.16
func (mr *DetailedMetricsRegistry) RecordValue(name string, value float64)
RecordValue records a value for a metric
func (*DetailedMetricsRegistry) RegisterMetric ¶ added in v0.0.16
func (mr *DetailedMetricsRegistry) RegisterMetric(name string, metricType MetricType, description string, labels map[string]string)
RegisterMetric registers a new metric
type DiskSpaceHealthCheck ¶ added in v0.0.16
type DiskSpaceHealthCheck struct{}
DiskSpaceHealthCheck checks available disk space
func (*DiskSpaceHealthCheck) Check ¶ added in v0.0.16
func (dshc *DiskSpaceHealthCheck) Check(ctx context.Context) *HealthCheckResult
func (*DiskSpaceHealthCheck) Name ¶ added in v0.0.16
func (dshc *DiskSpaceHealthCheck) Name() string
func (*DiskSpaceHealthCheck) Timeout ¶ added in v0.0.16
func (dshc *DiskSpaceHealthCheck) Timeout() time.Duration
type DistributedLocker ¶ added in v0.0.16
type DynamicConfig ¶ added in v0.0.11
type DynamicConfig struct { Timeout time.Duration BatchSize int MaxMemoryLoad int64 IdleTimeout time.Duration BackoffDuration time.Duration MaxRetries int ReloadInterval time.Duration WarningThreshold WarningThresholds NumberOfWorkers int // new field for worker count }
DynamicConfig holds runtime configuration values.
type EnhancedCircuitBreaker ¶ added in v0.0.16
type EnhancedCircuitBreaker struct {
// contains filtered or unexported fields
}
EnhancedCircuitBreaker provides circuit breaker functionality
func NewEnhancedCircuitBreaker ¶ added in v0.0.16
func NewEnhancedCircuitBreaker(threshold int64, timeout time.Duration) *EnhancedCircuitBreaker
NewEnhancedCircuitBreaker creates a new circuit breaker
func (*EnhancedCircuitBreaker) Call ¶ added in v0.0.16
func (cb *EnhancedCircuitBreaker) Call(fn func() error) error
Call executes a function with circuit breaker protection
type ExecutionHistory ¶
type FormattedMetrics ¶ added in v0.0.12
type FormattedMetrics struct { TotalTasks int64 `json:"total_tasks"` CompletedTasks int64 `json:"completed_tasks"` ErrorCount int64 `json:"error_count"` CurrentMemoryUsed string `json:"current_memory_used"` CumulativeMemoryUsed string `json:"cumulative_memory_used"` TotalScheduled int64 `json:"total_scheduled"` CumulativeExecution string `json:"cumulative_execution"` AverageExecution string `json:"average_execution"` }
FormattedMetrics is a helper struct to present human-readable metrics.
type GoRoutineHealthCheck ¶ added in v0.0.16
type GoRoutineHealthCheck struct{}
GoRoutineHealthCheck checks goroutine count
func (*GoRoutineHealthCheck) Check ¶ added in v0.0.16
func (ghc *GoRoutineHealthCheck) Check(ctx context.Context) *HealthCheckResult
func (*GoRoutineHealthCheck) Name ¶ added in v0.0.16
func (ghc *GoRoutineHealthCheck) Name() string
func (*GoRoutineHealthCheck) Timeout ¶ added in v0.0.16
func (ghc *GoRoutineHealthCheck) Timeout() time.Duration
type HealthCheck ¶ added in v0.0.16
type HealthCheck interface { Name() string Check(ctx context.Context) *HealthCheckResult Timeout() time.Duration }
HealthCheck interface for health checks
type HealthCheckResult ¶ added in v0.0.16
type HealthCheckResult struct { Name string `json:"name"` Status HealthStatus `json:"status"` Message string `json:"message"` Duration time.Duration `json:"duration"` Timestamp time.Time `json:"timestamp"` Metadata map[string]interface{} `json:"metadata,omitempty"` }
HealthCheckResult represents the result of a health check
type HealthChecker ¶ added in v0.0.16
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker monitors broker health
func NewHealthChecker ¶ added in v0.0.16
func NewHealthChecker() *HealthChecker
NewHealthChecker creates a new health checker
func (*HealthChecker) Start ¶ added in v0.0.16
func (hc *HealthChecker) Start()
Start starts the health checker
func (*HealthChecker) Stop ¶ added in v0.0.16
func (hc *HealthChecker) Stop()
Stop stops the health checker
type HealthStatus ¶ added in v0.0.16
type HealthStatus string
HealthStatus represents the health status
const ( HealthStatusHealthy HealthStatus = "healthy" HealthStatusUnhealthy HealthStatus = "unhealthy" HealthStatusWarning HealthStatus = "warning" HealthStatusUnknown HealthStatus = "unknown" )
type HealthThresholds ¶ added in v0.0.16
type HealthThresholds struct { MaxMemoryUsage int64 MaxCPUUsage float64 MaxConnections int MaxQueueDepth int MaxResponseTime time.Duration MinFreeMemory int64 }
HealthThresholds defines health check thresholds
type InMemoryMessageStore ¶ added in v0.0.16
type InMemoryMessageStore struct {
// contains filtered or unexported fields
}
InMemoryMessageStore implements MessageStore in memory
func NewInMemoryMessageStore ¶ added in v0.0.16
func NewInMemoryMessageStore() *InMemoryMessageStore
NewInMemoryMessageStore creates a new in-memory message store
func (*InMemoryMessageStore) Cleanup ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Cleanup(olderThan time.Time) error
Cleanup removes old messages
func (*InMemoryMessageStore) Count ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Count(queue string) (int64, error)
Count counts messages in a queue
func (*InMemoryMessageStore) Delete ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Delete(id string) error
Delete deletes a message
func (*InMemoryMessageStore) List ¶ added in v0.0.16
func (ims *InMemoryMessageStore) List(queue string, limit int, offset int) ([]*StoredMessage, error)
List lists messages for a queue
func (*InMemoryMessageStore) Retrieve ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Retrieve(id string) (*StoredMessage, error)
Retrieve retrieves a message by ID
func (*InMemoryMessageStore) Store ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Store(msg *StoredMessage) error
Store stores a message
type InMemoryMetricsRegistry ¶ added in v0.0.11
type InMemoryMetricsRegistry struct {
// contains filtered or unexported fields
}
InMemoryMetricsRegistry stores metrics in memory.
func NewInMemoryMetricsRegistry ¶ added in v0.0.11
func NewInMemoryMetricsRegistry() *InMemoryMetricsRegistry
func (*InMemoryMetricsRegistry) Get ¶ added in v0.0.11
func (m *InMemoryMetricsRegistry) Get(metricName string) interface{}
func (*InMemoryMetricsRegistry) Increment ¶ added in v0.0.11
func (m *InMemoryMetricsRegistry) Increment(metricName string)
func (*InMemoryMetricsRegistry) Register ¶ added in v0.0.11
func (m *InMemoryMetricsRegistry) Register(metricName string, value interface{})
type LockEntry ¶ added in v0.0.16
type LockEntry struct {
// contains filtered or unexported fields
}
type LogNotifier ¶ added in v0.0.16
type LogNotifier struct {
// contains filtered or unexported fields
}
LogNotifier sends alerts to logs
func NewLogNotifier ¶ added in v0.0.16
func NewLogNotifier(logger logger.Logger) *LogNotifier
func (*LogNotifier) Name ¶ added in v0.0.16
func (ln *LogNotifier) Name() string
func (*LogNotifier) Notify ¶ added in v0.0.16
func (ln *LogNotifier) Notify(ctx context.Context, alert ActiveAlert) error
type MemoryHealthCheck ¶ added in v0.0.16
type MemoryHealthCheck struct{}
MemoryHealthCheck checks memory usage
func (*MemoryHealthCheck) Check ¶ added in v0.0.16
func (mhc *MemoryHealthCheck) Check(ctx context.Context) *HealthCheckResult
func (*MemoryHealthCheck) Name ¶ added in v0.0.16
func (mhc *MemoryHealthCheck) Name() string
func (*MemoryHealthCheck) Timeout ¶ added in v0.0.16
func (mhc *MemoryHealthCheck) Timeout() time.Duration
type MemoryTaskStorage ¶
type MemoryTaskStorage struct {
// contains filtered or unexported fields
}
func NewMemoryTaskStorage ¶
func NewMemoryTaskStorage(expiryTime time.Duration) *MemoryTaskStorage
func (*MemoryTaskStorage) CleanupExpiredTasks ¶
func (m *MemoryTaskStorage) CleanupExpiredTasks() error
func (*MemoryTaskStorage) DeleteTask ¶
func (m *MemoryTaskStorage) DeleteTask(taskID string) error
func (*MemoryTaskStorage) FetchNextTask ¶
func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)
func (*MemoryTaskStorage) GetAllTasks ¶
func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)
func (*MemoryTaskStorage) GetTask ¶
func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)
func (*MemoryTaskStorage) SaveTask ¶
func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error
type MessageStore ¶ added in v0.0.16
type MessageStore interface { Store(msg *StoredMessage) error Retrieve(id string) (*StoredMessage, error) Delete(id string) error List(queue string, limit int, offset int) ([]*StoredMessage, error) Count(queue string) (int64, error) Cleanup(olderThan time.Time) error }
MessageStore interface for storing messages
type Metric ¶ added in v0.0.16
type Metric struct { Name string `json:"name"` Value float64 `json:"value"` Timestamp time.Time `json:"timestamp"` Tags map[string]string `json:"tags,omitempty"` }
Metric represents a single metric
type MetricType ¶ added in v0.0.16
type MetricType string
MetricType represents the type of metric
const ( MetricTypeCounter MetricType = "counter" MetricTypeGauge MetricType = "gauge" MetricTypeHistogram MetricType = "histogram" MetricTypeSummary MetricType = "summary" )
type Metrics ¶
type Metrics struct { TotalTasks int64 // total number of tasks processed CompletedTasks int64 // number of successfully processed tasks ErrorCount int64 // number of tasks that resulted in error TotalMemoryUsed int64 // current memory used (in bytes) by tasks in flight TotalScheduled int64 // number of tasks scheduled ExecutionTime int64 // cumulative execution time in milliseconds CumulativeMemoryUsed int64 // cumulative memory used (sum of all task sizes) in bytes }
Metrics holds cumulative pool metrics.
type MetricsCollector ¶ added in v0.0.16
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector collects and stores metrics
func NewMetricsCollector ¶ added in v0.0.16
func NewMetricsCollector() *MetricsCollector
NewMetricsCollector creates a new metrics collector
func (*MetricsCollector) RecordMetric ¶ added in v0.0.16
func (mc *MetricsCollector) RecordMetric(name string, value float64, tags map[string]string)
RecordMetric records a metric
type MetricsRegistry ¶ added in v0.0.11
type MetricsServer ¶ added in v0.0.16
type MetricsServer struct {
// contains filtered or unexported fields
}
MetricsServer provides comprehensive monitoring and metrics
func NewMetricsServer ¶ added in v0.0.16
func NewMetricsServer(broker *Broker, config *MonitoringConfig, logger logger.Logger) *MetricsServer
NewMetricsServer creates a new metrics server
func (*MetricsServer) AddAlertNotifier ¶ added in v0.0.16
func (ms *MetricsServer) AddAlertNotifier(notifier AlertNotifier)
AddAlertNotifier adds an alert notifier to the metrics server
func (*MetricsServer) AddAlertRule ¶ added in v0.0.16
func (ms *MetricsServer) AddAlertRule(rule AlertRule)
AddAlertRule adds an alert rule to the metrics server
func (*MetricsServer) Start ¶ added in v0.0.16
func (ms *MetricsServer) Start(ctx context.Context) error
Start starts the metrics server
func (*MetricsServer) Stop ¶ added in v0.0.16
func (ms *MetricsServer) Stop() error
Stop stops the metrics server
type MonitoringConfig ¶ added in v0.0.16
type MonitoringConfig struct { EnableMetrics bool `json:"enable_metrics"` MetricsPort int `json:"metrics_port"` MetricsPath string `json:"metrics_path"` EnableHealthCheck bool `json:"enable_health_check"` HealthCheckPort int `json:"health_check_port"` HealthCheckPath string `json:"health_check_path"` HealthCheckInterval time.Duration `json:"health_check_interval"` EnableTracing bool `json:"enable_tracing"` TracingEndpoint string `json:"tracing_endpoint"` TracingSampleRate float64 `json:"tracing_sample_rate"` EnableLogging bool `json:"enable_logging"` LogLevel string `json:"log_level"` LogFormat string `json:"log_format"` // "json", "text" LogOutput string `json:"log_output"` // "stdout", "file", "syslog" LogFilePath string `json:"log_file_path"` LogMaxSize int `json:"log_max_size"` // MB LogMaxBackups int `json:"log_max_backups"` LogMaxAge int `json:"log_max_age"` // days EnableProfiling bool `json:"enable_profiling"` ProfilingPort int `json:"profiling_port"` }
MonitoringConfig contains monitoring and observability configuration
func (*MonitoringConfig) UnmarshalJSON ¶ added in v0.0.16
func (m *MonitoringConfig) UnmarshalJSON(data []byte) error
type Option ¶
type Option func(*Options)
func DisableBrokerRateLimit ¶ added in v0.0.11
func DisableBrokerRateLimit() Option
func DisableConsumerRateLimit ¶ added in v0.0.11
func DisableConsumerRateLimit() Option
func WithBrokerRateLimiter ¶ added in v0.0.11
func WithCallback ¶
WithCallback -
func WithConsumerOnClose ¶
func WithConsumerOnSubscribe ¶
func WithConsumerRateLimiter ¶ added in v0.0.11
func WithConsumerTimeout ¶ added in v0.0.16
func WithHTTPApi ¶ added in v0.0.12
WithHTTPApi - Option to enable/disable the HTTP API
func WithLogger ¶ added in v0.0.10
func WithNotifyResponse ¶
func WithRespondPendingResult ¶
WithRespondPendingResult -
func WithWorkerPool ¶
type Options ¶
type Options struct { BrokerRateLimiter *RateLimiter // new field for broker rate limiting ConsumerRateLimiter *RateLimiter // new field for consumer rate limiting // contains filtered or unexported fields }
func SetupOptions ¶
func (*Options) BrokerAddr ¶ added in v0.0.10
func (*Options) CleanTaskOnComplete ¶
func (*Options) ConsumerTimeout ¶ added in v0.0.16
func (*Options) MaxMemoryLoad ¶
func (*Options) NumOfWorkers ¶
func (*Options) SetSyncMode ¶
func (*Options) Storage ¶
func (o *Options) Storage() TaskStorage
type PersistenceConfig ¶ added in v0.0.16
type PersistenceConfig struct { EnablePersistence bool `json:"enable_persistence"` StorageType string `json:"storage_type"` // "memory", "file", "redis", "postgres", "mysql" ConnectionString string `json:"connection_string"` MaxConnections int `json:"max_connections"` ConnectionTimeout time.Duration `json:"connection_timeout"` RetentionPeriod time.Duration `json:"retention_period"` CleanupInterval time.Duration `json:"cleanup_interval"` BackupEnabled bool `json:"backup_enabled"` BackupInterval time.Duration `json:"backup_interval"` BackupPath string `json:"backup_path"` CompressionEnabled bool `json:"compression_enabled"` EncryptionEnabled bool `json:"encryption_enabled"` ReplicationEnabled bool `json:"replication_enabled"` ReplicationNodes []string `json:"replication_nodes"` }
PersistenceConfig contains data persistence configuration
func (*PersistenceConfig) UnmarshalJSON ¶ added in v0.0.16
func (p *PersistenceConfig) UnmarshalJSON(data []byte) error
type Plugin ¶ added in v0.0.11
type Plugin interface { Initialize(config interface{}) error BeforeTask(task *QueueTask) AfterTask(task *QueueTask, result Result) }
Plugin is used to inject custom behavior before or after task processing.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool represents the worker pool processing tasks.
func NewPool ¶
func NewPool(numOfWorkers int, opts ...PoolOption) *Pool
NewPool creates and starts a new pool with the given number of workers.
func (*Pool) AddScheduledMetrics ¶ added in v0.0.16
func (*Pool) AdjustWorkerCount ¶
func (*Pool) DLQ ¶ added in v0.0.11
func (wp *Pool) DLQ() *DeadLetterQueue
func (*Pool) EnqueueTask ¶
func (*Pool) FormattedMetrics ¶ added in v0.0.12
func (wp *Pool) FormattedMetrics() FormattedMetrics
FormattedMetrics returns a formatted version of the pool metrics.
func (*Pool) SetBatchSize ¶
func (*Pool) UpdateConfig ¶ added in v0.0.11
func (wp *Pool) UpdateConfig(newConfig *DynamicConfig) error
UpdateConfig updates pool configuration via a POOL_UPDATE command.
type PoolConfig ¶ added in v0.0.16
type PoolConfig struct { MinWorkers int `json:"min_workers"` MaxWorkers int `json:"max_workers"` QueueSize int `json:"queue_size"` MaxMemoryLoad int64 `json:"max_memory_load"` TaskTimeout time.Duration `json:"task_timeout"` IdleWorkerTimeout time.Duration `json:"idle_worker_timeout"` EnableDynamicScaling bool `json:"enable_dynamic_scaling"` ScalingFactor float64 `json:"scaling_factor"` ScalingInterval time.Duration `json:"scaling_interval"` MaxQueueWaitTime time.Duration `json:"max_queue_wait_time"` EnableWorkStealing bool `json:"enable_work_stealing"` EnablePriorityScheduling bool `json:"enable_priority_scheduling"` GracefulShutdownTimeout time.Duration `json:"graceful_shutdown_timeout"` }
PoolConfig contains worker pool configuration
func (*PoolConfig) UnmarshalJSON ¶ added in v0.0.16
func (p *PoolConfig) UnmarshalJSON(data []byte) error
type PoolOption ¶
type PoolOption func(*Pool)
func WithBatchSize ¶
func WithBatchSize(batchSize int) PoolOption
func WithCircuitBreaker ¶ added in v0.0.11
func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption
func WithCompletionCallback ¶
func WithCompletionCallback(callback func()) PoolOption
func WithDiagnostics ¶ added in v0.0.11
func WithDiagnostics(enabled bool) PoolOption
func WithGracefulShutdown ¶ added in v0.0.11
func WithGracefulShutdown(timeout time.Duration) PoolOption
func WithHandler ¶
func WithHandler(handler Handler) PoolOption
func WithMaxMemoryLoad ¶
func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption
func WithMetricsRegistry ¶ added in v0.0.11
func WithMetricsRegistry(registry MetricsRegistry) PoolOption
func WithPlugin ¶ added in v0.0.11
func WithPlugin(plugin Plugin) PoolOption
func WithPoolCallback ¶
func WithPoolCallback(callback Callback) PoolOption
func WithTaskQueueSize ¶
func WithTaskQueueSize(size int) PoolOption
func WithTaskStorage ¶
func WithTaskStorage(storage TaskStorage) PoolOption
func WithTaskTimeout ¶
func WithTaskTimeout(t time.Duration) PoolOption
func WithWarningThresholds ¶ added in v0.0.11
func WithWarningThresholds(thresholds ThresholdConfig) PoolOption
type PriorityQueue ¶
type PriorityQueue []*QueueTask
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type ProductionConfig ¶ added in v0.0.16
type ProductionConfig struct { Broker BrokerConfig `json:"broker"` Consumer ConsumerConfig `json:"consumer"` Publisher PublisherConfig `json:"publisher"` Pool PoolConfig `json:"pool"` Security SecurityConfig `json:"security"` Monitoring MonitoringConfig `json:"monitoring"` Persistence PersistenceConfig `json:"persistence"` Clustering ClusteringConfig `json:"clustering"` RateLimit RateLimitConfig `json:"rate_limit"` LastUpdated time.Time `json:"last_updated"` }
ProductionConfig contains all production configuration
func DefaultProductionConfig ¶ added in v0.0.16
func DefaultProductionConfig() *ProductionConfig
DefaultProductionConfig returns default production configuration
func (*ProductionConfig) UnmarshalJSON ¶ added in v0.0.16
func (c *ProductionConfig) UnmarshalJSON(data []byte) error
UnmarshalJSON implements custom unmarshaling to handle duration strings
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
type PublisherConfig ¶ added in v0.0.16
type PublisherConfig struct { MaxRetries int `json:"max_retries"` InitialDelay time.Duration `json:"initial_delay"` MaxBackoff time.Duration `json:"max_backoff"` JitterPercent float64 `json:"jitter_percent"` ConnectionPoolSize int `json:"connection_pool_size"` PublishTimeout time.Duration `json:"publish_timeout"` EnableBatching bool `json:"enable_batching"` BatchSize int `json:"batch_size"` BatchTimeout time.Duration `json:"batch_timeout"` EnableCompression bool `json:"enable_compression"` CompressionLevel int `json:"compression_level"` EnableAsync bool `json:"enable_async"` AsyncBufferSize int `json:"async_buffer_size"` EnableOrderedDelivery bool `json:"enable_ordered_delivery"` }
PublisherConfig contains publisher-specific configuration
func (*PublisherConfig) UnmarshalJSON ¶ added in v0.0.16
func (p *PublisherConfig) UnmarshalJSON(data []byte) error
type QueueConfig ¶ added in v0.0.16
type QueueConfig struct { MaxDepth int `json:"max_depth"` MaxRetries int `json:"max_retries"` MessageTTL time.Duration `json:"message_ttl"` DeadLetter bool `json:"dead_letter"` Persistent bool `json:"persistent"` BatchSize int `json:"batch_size"` Priority int `json:"priority"` OrderedMode bool `json:"ordered_mode"` Throttling bool `json:"throttling"` ThrottleRate int `json:"throttle_rate"` ThrottleBurst int `json:"throttle_burst"` CompactionMode bool `json:"compaction_mode"` }
QueueConfig holds configuration for a specific queue
type QueueMetrics ¶ added in v0.0.16
type QueueMetrics struct { MessagesReceived int64 `json:"messages_received"` MessagesProcessed int64 `json:"messages_processed"` MessagesFailed int64 `json:"messages_failed"` CurrentDepth int64 `json:"current_depth"` AverageLatency time.Duration `json:"average_latency"` LastActivity time.Time `json:"last_activity"` }
QueueMetrics holds metrics for a specific queue
type QueueOption ¶ added in v0.0.16
type QueueOption func(*QueueConfig)
QueueOption defines options for queue configuration
func WithDeadLetter ¶ added in v0.0.16
func WithDeadLetter() QueueOption
WithDeadLetter enables dead letter queue for failed messages
func WithPersistent ¶ added in v0.0.16
func WithPersistent() QueueOption
WithPersistent enables message persistence
func WithQueueMaxDepth ¶ added in v0.0.16
func WithQueueMaxDepth(maxDepth int) QueueOption
WithQueueMaxDepth sets the maximum queue depth
func WithQueueMaxRetries ¶ added in v0.0.16
func WithQueueMaxRetries(maxRetries int) QueueOption
WithQueueMaxRetries sets the maximum retries for queue messages
func WithQueueOption ¶ added in v0.0.16
func WithQueueOption(config QueueConfig) QueueOption
WithQueueOption returns a QueueOption that applies the given queue configuration
func WithQueueTTL ¶ added in v0.0.16
func WithQueueTTL(ttl time.Duration) QueueOption
WithQueueTTL sets the message TTL for the queue
type QueuedTask ¶
type RateLimitConfig ¶ added in v0.0.16
type RateLimitConfig struct { EnableBrokerRateLimit bool `json:"enable_broker_rate_limit"` BrokerRate int `json:"broker_rate"` // requests per second BrokerBurst int `json:"broker_burst"` EnableConsumerRateLimit bool `json:"enable_consumer_rate_limit"` ConsumerRate int `json:"consumer_rate"` ConsumerBurst int `json:"consumer_burst"` EnablePublisherRateLimit bool `json:"enable_publisher_rate_limit"` PublisherRate int `json:"publisher_rate"` PublisherBurst int `json:"publisher_burst"` EnablePerQueueRateLimit bool `json:"enable_per_queue_rate_limit"` PerQueueRate int `json:"per_queue_rate"` PerQueueBurst int `json:"per_queue_burst"` }
RateLimitConfig contains rate limiting configuration
type RateLimiter ¶ added in v0.0.11
type RateLimiter struct { C chan struct{} // contains filtered or unexported fields }
RateLimiter implementation
func NewRateLimiter ¶ added in v0.0.11
func NewRateLimiter(rate int, burst int) *RateLimiter
NewRateLimiter creates a new RateLimiter with the specified rate and burst.
func (*RateLimiter) Stop ¶ added in v0.0.12
func (rl *RateLimiter) Stop()
Stop terminates the rate limiter's internal goroutine.
func (*RateLimiter) Update ¶ added in v0.0.12
func (rl *RateLimiter) Update(newRate, newBurst int)
Update allows dynamic adjustment of rate and burst at runtime. It immediately applies the new settings.
func (*RateLimiter) Wait ¶ added in v0.0.11
func (rl *RateLimiter) Wait()
Wait blocks until a token is available.
type Result ¶
type Result struct { CreatedAt time.Time `json:"created_at"` ProcessedAt time.Time `json:"processed_at,omitempty"` Latency string `json:"latency"` Error error `json:"-"` // Keep error as an error type Topic string `json:"topic"` TaskID string `json:"task_id"` Status Status `json:"status"` ConditionStatus string `json:"condition_status"` Ctx context.Context `json:"-"` Payload json.RawMessage `json:"payload"` Last bool }
func (Result) MarshalJSON ¶
func (*Result) UnmarshalJSON ¶
type Schedule ¶
type Schedule struct { TimeOfDay time.Time CronSpec string DayOfWeek []time.Weekday DayOfMonth []int Interval time.Duration Recurring bool }
func (*Schedule) ToHumanReadable ¶
type ScheduleOptions ¶
type ScheduledTask ¶
type ScheduledTask struct {
// contains filtered or unexported fields
}
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(pool *Pool, opts ...SchedulerOpt) *Scheduler
func (*Scheduler) ListScheduledTasks ¶ added in v0.0.16
func (*Scheduler) PrintAllTasks ¶
func (s *Scheduler) PrintAllTasks()
func (*Scheduler) PrintExecutionHistory ¶
func (*Scheduler) RemoveTask ¶
type SchedulerConfig ¶
type SchedulerOpt ¶ added in v0.0.16
type SchedulerOpt func(*Scheduler)
func WithStorage ¶ added in v0.0.16
func WithStorage(sm storage.IMap[string, *ScheduledTask]) SchedulerOpt
type SchedulerOption ¶
type SchedulerOption func(*ScheduleOptions)
func WithInterval ¶
func WithInterval(interval time.Duration) SchedulerOption
func WithOverlap ¶
func WithOverlap() SchedulerOption
func WithRecurring ¶
func WithRecurring() SchedulerOption
func WithScheduleSpec ¶ added in v0.0.16
func WithScheduleSpec(spec string) SchedulerOption
func WithSchedulerCallback ¶
func WithSchedulerCallback(callback Callback) SchedulerOption
func WithSchedulerHandler ¶
func WithSchedulerHandler(handler Handler) SchedulerOption
type SecurityConfig ¶ added in v0.0.16
type SecurityConfig struct { EnableTLS bool `json:"enable_tls"` TLSCertPath string `json:"tls_cert_path"` TLSKeyPath string `json:"tls_key_path"` TLSCAPath string `json:"tls_ca_path"` TLSInsecureSkipVerify bool `json:"tls_insecure_skip_verify"` EnableAuthentication bool `json:"enable_authentication"` AuthenticationMethod string `json:"authentication_method"` // "basic", "jwt", "oauth" EnableAuthorization bool `json:"enable_authorization"` EnableEncryption bool `json:"enable_encryption"` EncryptionKey string `json:"encryption_key"` EnableAuditLog bool `json:"enable_audit_log"` AuditLogPath string `json:"audit_log_path"` SessionTimeout time.Duration `json:"session_timeout"` MaxLoginAttempts int `json:"max_login_attempts"` LockoutDuration time.Duration `json:"lockout_duration"` }
SecurityConfig contains security-related configuration
func (*SecurityConfig) UnmarshalJSON ¶ added in v0.0.16
func (s *SecurityConfig) UnmarshalJSON(data []byte) error
type StoredMessage ¶ added in v0.0.16
type StoredMessage struct { ID string `json:"id"` Queue string `json:"queue"` Payload []byte `json:"payload"` Headers map[string]string `json:"headers,omitempty"` Metadata map[string]interface{} `json:"metadata,omitempty"` Priority int `json:"priority"` CreatedAt time.Time `json:"created_at"` ExpiresAt *time.Time `json:"expires_at,omitempty"` Attempts int `json:"attempts"` }
StoredMessage represents a message stored in the message store
type SystemHealthChecker ¶ added in v0.0.16
type SystemHealthChecker struct {
// contains filtered or unexported fields
}
SystemHealthChecker monitors system health
func NewSystemHealthChecker ¶ added in v0.0.16
func NewSystemHealthChecker(logger logger.Logger) *SystemHealthChecker
NewSystemHealthChecker creates a new system health checker
func (*SystemHealthChecker) GetOverallHealth ¶ added in v0.0.16
func (shc *SystemHealthChecker) GetOverallHealth() HealthStatus
GetOverallHealth returns the overall system health
func (*SystemHealthChecker) RegisterCheck ¶ added in v0.0.16
func (shc *SystemHealthChecker) RegisterCheck(check HealthCheck)
RegisterCheck registers a health check
func (*SystemHealthChecker) RunChecks ¶ added in v0.0.16
func (shc *SystemHealthChecker) RunChecks(ctx context.Context) map[string]*HealthCheckResult
RunChecks runs all health checks
type Task ¶
type Task struct { CreatedAt time.Time `json:"created_at"` ProcessedAt time.Time `json:"processed_at"` Expiry time.Time `json:"expiry"` Error error `json:"-"` // Don't serialize errors directly ErrorMsg string `json:"error,omitempty"` // Serialize error message if present ID string `json:"id"` Topic string `json:"topic"` Status Status `json:"status"` // Use Status type instead of string Payload json.RawMessage `json:"payload"` Priority int `json:"priority,omitempty"` Retries int `json:"retries,omitempty"` MaxRetries int `json:"max_retries,omitempty"` // Enhanced deduplication and tracing DedupKey string `json:"dedup_key,omitempty"` TraceID string `json:"trace_id,omitempty"` SpanID string `json:"span_id,omitempty"` Tags map[string]string `json:"tags,omitempty"` Headers map[string]string `json:"headers,omitempty"` // contains filtered or unexported fields }
func NewTask ¶
func NewTask(id string, payload json.RawMessage, nodeKey string, opts ...TaskOption) *Task
func (*Task) IncrementRetry ¶ added in v0.0.16
func (t *Task) IncrementRetry()
IncrementRetry increments the retry count
type TaskOption ¶ added in v0.0.10
type TaskOption func(*Task)
TaskOption defines a function type for setting options.
func WithDAG ¶ added in v0.0.10
func WithDAG(dag any) TaskOption
func WithDedupKey ¶ added in v0.0.16
func WithDedupKey(key string) TaskOption
WithDedupKey is a TaskOption for setting the task's deduplication key
func WithExpiry ¶ added in v0.0.16
func WithExpiry(expiry time.Time) TaskOption
WithExpiry is a TaskOption for setting the expiry time
func WithPriority ¶ added in v0.0.16
func WithPriority(priority int) TaskOption
WithPriority is a TaskOption for setting the priority
func WithTTL ¶ added in v0.0.16
func WithTTL(ttl time.Duration) TaskOption
WithTTL is a TaskOption for setting the TTL (time to live)
func WithTags ¶ added in v0.0.16
func WithTags(tags map[string]string) TaskOption
WithTags is a TaskOption for adding tags
func WithTaskHeaders ¶ added in v0.0.16
func WithTaskHeaders(headers map[string]string) TaskOption
WithTaskHeaders is a TaskOption for adding headers
func WithTaskMaxRetries ¶ added in v0.0.16
func WithTaskMaxRetries(maxRetries int) TaskOption
WithTaskMaxRetries is a TaskOption for setting the maximum number of retries
func WithTraceID ¶ added in v0.0.16
func WithTraceID(traceID string) TaskOption
WithTraceID is a TaskOption for setting the trace ID
type TaskStorage ¶
type ThresholdConfig ¶ added in v0.0.11
type TimeSeries ¶ added in v0.0.16
type TimeSeries struct { Name string `json:"name"` Type MetricType `json:"type"` Description string `json:"description"` Labels map[string]string `json:"labels"` Values []TimeSeriesPoint `json:"values"` MaxPoints int `json:"max_points"` // contains filtered or unexported fields }
TimeSeries represents a time series metric
type TimeSeriesPoint ¶ added in v0.0.16
TimeSeriesPoint represents a single point in a time series
type WarningThresholds ¶ added in v0.0.11
type WarningThresholds struct { HighMemory int64 // in bytes LongExecution time.Duration // threshold duration }
WarningThresholds defines thresholds for warnings.