Documentation ¶
Index ¶
- type BatchProcessor
- type CircuitBreaker
- type CircuitState
- type ConnectionPool
- type Consumer
- type HealthMonitor
- type HealthStatus
- type MemoryManager
- type Message
- type MessageHeap
- func (h MessageHeap) FindByID(id string) *Message
- func (h MessageHeap) Len() int
- func (h MessageHeap) Less(i, j int) bool
- func (h MessageHeap) Peek() *Message
- func (h *MessageHeap) Pop() interface{}
- func (h *MessageHeap) Push(x interface{})
- func (h *MessageHeap) RemoveByID(id string) bool
- func (h MessageHeap) Swap(i, j int)
- func (h *MessageHeap) UpdatePriority(id string, newPriority int) bool
- type MessageStatus
- type PendingEntry
- type Queue
- func (q *Queue) Acknowledge(messageID string) error
- func (q *Queue) CleanupExpiredMessages() error
- func (q *Queue) Consume(topic string, consumerName string) (*Message, error)
- func (q *Queue) Flush() error
- func (q *Queue) GetHealth() *HealthStatus
- func (q *Queue) GetProductionStats() map[string]interface{}
- func (q *Queue) GetStats() *QueueStats
- func (q *Queue) GracefulShutdown(timeout time.Duration) error
- func (q *Queue) Publish(topic string, data interface{}, priority int) error
- func (q *Queue) Reject(messageID string, reason string) error
- func (q *Queue) ScaleDown(reduceWorkers int) error
- func (q *Queue) ScaleUp(additionalWorkers int) error
- func (q *Queue) Stop()
- type QueueConfig
- type QueueStats
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchProcessor ¶
type BatchProcessor struct {
// contains filtered or unexported fields
}
BatchProcessor handles message batching
func NewBatchProcessor ¶
func NewBatchProcessor(batchSize int, batchTimeout time.Duration) *BatchProcessor
NewBatchProcessor creates a new batch processor
func (*BatchProcessor) Stop ¶
func (bp *BatchProcessor) Stop()
Stop gracefully stops the batch processor
func (*BatchProcessor) Submit ¶
func (bp *BatchProcessor) Submit(msg *Message) error
Submit adds a message to the batch processor
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker prevents cascade failures
func NewCircuitBreaker ¶
func NewCircuitBreaker(failureThreshold int, resetTimeout time.Duration) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
func (*CircuitBreaker) Execute ¶
func (cb *CircuitBreaker) Execute(operation func() error) error
Execute runs a function with circuit breaker protection
func (*CircuitBreaker) GetState ¶
func (cb *CircuitBreaker) GetState() CircuitState
GetState returns the current circuit breaker state
func (*CircuitBreaker) GetStats ¶
func (cb *CircuitBreaker) GetStats() map[string]interface{}
GetStats returns circuit breaker statistics
type CircuitState ¶
type CircuitState string
CircuitState represents circuit breaker state
const (
CircuitClosed CircuitState = "closed" // Normal operation
CircuitOpen CircuitState = "open" // Failing, reject requests
CircuitHalf CircuitState = "half" // Testing recovery
)
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool manages database connections
func NewConnectionPool ¶
func NewConnectionPool(maxConnections int, dbPath string) (*ConnectionPool, error)
NewConnectionPool creates a new database connection pool
func (*ConnectionPool) Close ¶
func (cp *ConnectionPool) Close()
Close closes all connections in the pool
func (*ConnectionPool) GetConnection ¶
func (cp *ConnectionPool) GetConnection() (*sql.DB, error)
GetConnection gets a database connection from the pool
func (*ConnectionPool) GetStats ¶
func (cp *ConnectionPool) GetStats() map[string]interface{}
GetStats returns connection pool statistics
func (*ConnectionPool) ReturnConnection ¶
func (cp *ConnectionPool) ReturnConnection(conn *sql.DB)
ReturnConnection returns a connection to the pool
type Consumer ¶
type Consumer struct {
Name string `json:"name"`
Group string `json:"group"`
PendingEntries map[string]*PendingEntry `json:"pending_entries"`
LastDelivered string `json:"last_delivered"`
// contains filtered or unexported fields
}
Consumer represents a message consumer
type HealthMonitor ¶
type HealthMonitor struct {
// contains filtered or unexported fields
}
HealthMonitor tracks queue health and performance
func NewHealthMonitor ¶
func NewHealthMonitor(checkInterval time.Duration) *HealthMonitor
NewHealthMonitor creates a new health monitor
func (*HealthMonitor) AddError ¶
func (hm *HealthMonitor) AddError(err string)
AddError adds an error to the health status
func (*HealthMonitor) AddWarning ¶
func (hm *HealthMonitor) AddWarning(warning string)
AddWarning adds a warning to the health status
func (*HealthMonitor) GetHealth ¶
func (hm *HealthMonitor) GetHealth() *HealthStatus
GetHealth returns the current health status
type HealthStatus ¶
type HealthStatus struct {
Status string `json:"status"`
Message string `json:"message"`
Timestamp time.Time `json:"timestamp"`
Metrics map[string]int64 `json:"metrics"`
Errors []string `json:"errors"`
Warnings []string `json:"warnings"`
}
HealthStatus represents queue health
type MemoryManager ¶
type MemoryManager struct {
// contains filtered or unexported fields
}
MemoryManager controls memory usage and prevents OOM
func NewMemoryManager ¶
func NewMemoryManager(maxMemoryMB int, threshold float64) *MemoryManager
NewMemoryManager creates a new memory manager
func (*MemoryManager) AllocateMemory ¶
func (mm *MemoryManager) AllocateMemory(mb int) error
AllocateMemory allocates memory and checks limits
func (*MemoryManager) FreeMemory ¶
func (mm *MemoryManager) FreeMemory(mb int)
FreeMemory frees allocated memory
func (*MemoryManager) GetStats ¶
func (mm *MemoryManager) GetStats() map[string]interface{}
GetStats returns memory manager statistics
type Message ¶
type Message struct {
ID string `json:"id"`
Topic string `json:"topic"`
Data interface{} `json:"data"`
Priority int `json:"priority"` // Higher = more important
Timestamp time.Time `json:"timestamp"`
Attempts int `json:"attempts"`
MaxAttempts int `json:"max_attempts"`
Status MessageStatus `json:"status"`
Consumer string `json:"consumer"` // Which consumer has it
ExpiresAt *time.Time `json:"expires_at"`
RetryAt *time.Time `json:"retry_at"` // When to retry (for exponential backoff)
// contains filtered or unexported fields
}
Message represents a queue message
type MessageHeap ¶
type MessageHeap []*Message
MessageHeap implements heap.Interface for priority queue
func (MessageHeap) FindByID ¶
func (h MessageHeap) FindByID(id string) *Message
FindByID finds a message by ID without removing it
func (MessageHeap) Len ¶
func (h MessageHeap) Len() int
func (MessageHeap) Less ¶
func (h MessageHeap) Less(i, j int) bool
func (MessageHeap) Peek ¶
func (h MessageHeap) Peek() *Message
Peek returns the highest priority message without removing it
func (*MessageHeap) Pop ¶
func (h *MessageHeap) Pop() interface{}
func (*MessageHeap) Push ¶
func (h *MessageHeap) Push(x interface{})
func (*MessageHeap) RemoveByID ¶
func (h *MessageHeap) RemoveByID(id string) bool
RemoveByID removes a message by ID
func (MessageHeap) Swap ¶
func (h MessageHeap) Swap(i, j int)
func (*MessageHeap) UpdatePriority ¶
func (h *MessageHeap) UpdatePriority(id string, newPriority int) bool
UpdatePriority updates the priority of a message and reorders the heap
type MessageStatus ¶
type MessageStatus string
MessageStatus represents the current status of a message
const (
StatusPending MessageStatus = "pending"
StatusProcessing MessageStatus = "processing"
StatusCompleted MessageStatus = "completed"
StatusFailed MessageStatus = "failed"
StatusRetry MessageStatus = "retry"
StatusScheduled MessageStatus = "scheduled" // Scheduled for retry with backoff
)
type PendingEntry ¶
type PendingEntry struct {
MessageID string `json:"message_id"`
Consumer string `json:"consumer"`
DeliveryTime time.Time `json:"delivery_time"`
DeliveryCount int `json:"delivery_count"`
}
PendingEntry tracks pending messages for acknowledgment
type Queue ¶
type Queue struct {
Name string
// contains filtered or unexported fields
}
Queue is the main queue implementation
func (*Queue) Acknowledge ¶
func (q *Queue) Acknowledge(messageID string) error
Acknowledge marks a message as completed
func (*Queue) CleanupExpiredMessages ¶
func (q *Queue) CleanupExpiredMessages() error
CleanupExpiredMessages removes expired messages from the database and moves them to the dead letter queue
func (*Queue) GetHealth ¶
func (q *Queue) GetHealth() *HealthStatus
GetHealth returns comprehensive health status
func (*Queue) GetProductionStats ¶
func (q *Queue) GetProductionStats() map[string]interface{}
GetProductionStats returns comprehensive production metrics
func (*Queue) GracefulShutdown ¶
func (q *Queue) GracefulShutdown(timeout time.Duration) error
GracefulShutdown performs a graceful shutdown of all components
type QueueConfig ¶
type QueueConfig struct {
Name string `json:"name"`
DataDir string `json:"data_dir"`
BufferSize int `json:"buffer_size"`
FlushInterval time.Duration `json:"flush_interval"`
MaxRetries int `json:"max_retries"`
RetryDelay time.Duration `json:"retry_delay"`
MessageTTL time.Duration `json:"message_ttl"`
EnableMetrics bool `json:"enable_metrics"`
// 🚀 PRODUCTION SCALING CONFIGURATION
WorkerPoolSize int `json:"worker_pool_size"` // Number of concurrent workers
MaxConnections int `json:"max_connections"` // Max database connections
BatchSize int `json:"batch_size"` // Messages per batch
MaxMemoryMB int `json:"max_memory_mb"` // Memory limit in MB
EnableCompression bool `json:"enable_compression"` // Compress message data
AsyncOperations bool `json:"async_operations"` // Use async DB operations
CircuitBreaker bool `json:"circuit_breaker"` // Enable circuit breaker
HealthCheck bool `json:"health_check"` // Enable health monitoring
// 🚨 DEAD LETTER QUEUE CONFIGURATION
DLQMaxSize int `json:"dlq_max_size"` // Max DLQ entries (0 = unlimited)
DLQMaxAge time.Duration `json:"dlq_max_age"` // Max age for DLQ entries (0 = no expiry)
DLQCleanupInterval time.Duration `json:"dlq_cleanup_interval"` // How often to clean DLQ
DLQEnableAutoPrune bool `json:"dlq_enable_auto_prune"` // Enable automatic DLQ pruning
// 🔄 RETRY CONFIGURATION
RetryBaseDelay time.Duration `json:"retry_base_delay"` // Base delay for exponential backoff
RetryMaxDelay time.Duration `json:"retry_max_delay"` // Maximum retry delay
RetryMultiplier float64 `json:"retry_multiplier"` // Exponential backoff multiplier
RetryJitter bool `json:"retry_jitter"` // Add jitter to prevent thundering herd
// 🧹 MAINTENANCE CONFIGURATION
VacuumInterval time.Duration `json:"vacuum_interval"` // How often to run VACUUM (0 = disabled)
EnableAutoVacuum bool `json:"enable_auto_vacuum"` // Enable automatic VACUUM
// ⚡ DURABILITY CONFIGURATION
DurabilityMode string `json:"durability_mode"` // "full", "normal", "off" - SQLite synchronous mode
WriteBufferSize int `json:"write_buffer_size"` // Size of write buffer before flush
FlushOnPublish bool `json:"flush_on_publish"` // Force flush on every publish (slower but safer)
EnableFsync bool `json:"enable_fsync"` // Enable fsync after writes (slower but safer)
}
QueueConfig holds queue configuration
func DefaultQueueConfig ¶
func DefaultQueueConfig() *QueueConfig
DefaultQueueConfig returns default configuration
type QueueStats ¶
type QueueStats struct {
Name string `json:"name"`
PendingCount int `json:"pending_count"`
ProcessingCount int `json:"processing_count"`
CompletedCount int `json:"completed_count"`
FailedCount int `json:"failed_count"`
RetryCount int `json:"retry_count"`
BufferSize int `json:"buffer_size"`
ConsumerCount int `json:"consumer_count"`
LastFlush time.Time `json:"last_flush"`
LastMessage time.Time `json:"last_message"`
TotalMessages int `json:"total_messages"`
// 🚨 DEAD LETTER QUEUE STATISTICS
DLQCount int `json:"dlq_count"` // Current DLQ entries
DLQTotalCount int `json:"dlq_total_count"` // Total DLQ entries ever created
DLQLastCleanup time.Time `json:"dlq_last_cleanup"` // Last DLQ cleanup time
DLQPrunedCount int `json:"dlq_pruned_count"` // Number of entries pruned
}
QueueStats represents queue statistics
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages concurrent message processing
func NewWorkerPool ¶
func NewWorkerPool(workerCount int) *WorkerPool
NewWorkerPool creates a new worker pool for concurrent processing
func (*WorkerPool) GetStats ¶
func (wp *WorkerPool) GetStats() map[string]interface{}
GetStats returns worker pool statistics
func (*WorkerPool) Submit ¶
func (wp *WorkerPool) Submit(msg *Message) error
Submit adds a message to the work queue