mq

package module
v0.0.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2025 License: MIT Imports: 34 Imported by: 0

README

Introduction MQ (Message Queue Broker)

A simple Pub/Sub system memory based task processing. It uses centralized server to manage consumers and publishers.

Examples:

Run server

go run server.go

Run consumer

go run consumer.go

Run publisher

go run publisher.go

tasks.go

package tasks

import (
	"context"
	"github.com/oarkflow/json"
	"fmt"
	"log"

	"github.com/oarkflow/mq"
)

func Node1(ctx context.Context, task *mq.Task) mq.Result {
	return mq.Result{Payload: task.Payload, TaskID: task.ID}
}

func Node2(ctx context.Context, task *mq.Task) mq.Result {
	return mq.Result{Payload: task.Payload, TaskID: task.ID}
}

func Node3(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	json.Unmarshal(task.Payload, &user)
	age := int(user["age"].(float64))
	status := "FAIL"
	if age > 20 {
		status = "PASS"
	}
	user["status"] = status
	resultPayload, _ := json.Marshal(user)
	return mq.Result{Payload: resultPayload, Status: status}
}

func Node4(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	json.Unmarshal(task.Payload, &user)
	user["final"] = "D"
	resultPayload, _ := json.Marshal(user)
	return mq.Result{Payload: resultPayload}
}

func Node5(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	json.Unmarshal(task.Payload, &user)
	user["salary"] = "E"
	resultPayload, _ := json.Marshal(user)
	return mq.Result{Payload: resultPayload}
}

func Node6(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	json.Unmarshal(task.Payload, &user)
	resultPayload, _ := json.Marshal(map[string]any{"storage": user})
	return mq.Result{Payload: resultPayload}
}

func Callback(ctx context.Context, task mq.Result) mq.Result {
	fmt.Println("Received task", task.TaskID, "Payload", string(task.Payload), task.Error, task.Topic)
	return mq.Result{}
}

func NotifyResponse(ctx context.Context, result mq.Result) {
	log.Printf("DAG Final response: TaskID: %s, Payload: %s, Topic: %s", result.TaskID, result.Payload, result.Topic)
}

Start Server

server.go

package main

import (
	"context"
	
	"github.com/oarkflow/mq"
	"github.com/oarkflow/mq/examples/tasks"
)

func main() {
	b := mq.NewBroker(mq.WithCallback(tasks.Callback))
	b.NewQueue("queue1")
	b.NewQueue("queue2")
	b.Start(context.Background())
}

Start Consumer

consumer.go

package main

import (
	"context"

	"github.com/oarkflow/mq"

	"github.com/oarkflow/mq/examples/tasks"
)

func main() {
	consumer1 := mq.NewConsumer("consumer-1", "queue1", tasks.Node1)
	consumer2 := mq.NewConsumer("consumer-2", "queue2", tasks.Node2)
	// consumer := mq.NewConsumer("consumer-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"))
	go consumer1.Consume(context.Background())
	consumer2.Consume(context.Background())
}

Publish tasks

publisher.go

package main

import (
	"context"
	"fmt"
	
	"github.com/oarkflow/mq"
)

func main() {
	payload := []byte(`{"message":"Message Publisher \n Task"}`)
	task := mq.Task{
		Payload: payload,
	}
	publisher := mq.NewPublisher("publish-1")
	err := publisher.Publish(context.Background(), "queue1", task)
	if err != nil {
		panic(err)
	}
	fmt.Println("Async task published successfully")
	payload = []byte(`{"message":"Fire-and-Forget \n Task"}`)
	task = mq.Task{
		Payload: payload,
	}
	result, err := publisher.Request(context.Background(), "queue1", task)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Sync task published. Result: %v\n", string(result.Payload))
}

DAG (Directed Acyclic Graph)

In this package, you can use the DAG feature to create a directed acyclic graph of tasks. The DAG feature allows you to define a sequence of tasks that need to be executed in a specific order.

Example

dag.go

package main

import (
	"context"
	"github.com/oarkflow/json"
	"github.com/oarkflow/mq/consts"
	"github.com/oarkflow/mq/examples/tasks"
	"io"
	"net/http"

	"github.com/oarkflow/mq"
	"github.com/oarkflow/mq/dag"
)

var (
	d = dag.NewDAG(mq.WithSyncMode(false), mq.WithNotifyResponse(tasks.NotifyResponse))
	// d = dag.NewDAG(mq.WithSyncMode(true), mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"), mq.WithCAPath("./certs/ca.cert"))
)

func main() {
	d.AddNode("A", tasks.Node1, true)
	d.AddNode("B", tasks.Node2)
	d.AddNode("C", tasks.Node3)
	d.AddNode("D", tasks.Node4)
	d.AddNode("E", tasks.Node5)
	d.AddNode("F", tasks.Node6)
	d.AddEdge("A", "B", dag.LoopEdge)
	d.AddCondition("C", map[string]string{"PASS": "D", "FAIL": "E"})
	d.AddEdge("B", "C")
	d.AddEdge("D", "F")
	d.AddEdge("E", "F")
	http.HandleFunc("POST /publish", requestHandler("publish"))
	http.HandleFunc("POST /request", requestHandler("request"))
	err := d.Start(context.TODO(), ":8083")
	if err != nil {
		panic(err)
	}
}

func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
			return
		}
		var payload []byte
		if r.Body != nil {
			defer r.Body.Close()
			var err error
			payload, err = io.ReadAll(r.Body)
			if err != nil {
				http.Error(w, "Failed to read request body", http.StatusBadRequest)
				return
			}
		} else {
			http.Error(w, "Empty request body", http.StatusBadRequest)
			return
		}
		ctx := context.Background()
		if requestType == "request" {
			ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"})
		}
		// ctx = context.WithValue(ctx, "initial_node", "E")
		rs := d.ProcessTask(ctx, payload)
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(rs)
	}
}

TODOS

  • Backend for task persistence
  • Task scheduling

Documentation

Index

Constants

This section is empty.

Variables

View Source
var BrokerAddr string
View Source
var Config = &DynamicConfig{
	Timeout:         10 * time.Second,
	BatchSize:       1,
	MaxMemoryLoad:   100 * 1024 * 1024,
	IdleTimeout:     5 * time.Minute,
	BackoffDuration: 2 * time.Second,
	MaxRetries:      3,
	ReloadInterval:  30 * time.Second,
	WarningThreshold: WarningThresholds{
		HighMemory:    1 * 1024 * 1024,
		LongExecution: 2 * time.Second,
	},
	NumberOfWorkers: 5,
}

Functions

func GetAwaitResponse

func GetAwaitResponse(ctx context.Context) (string, bool)

func GetConnection

func GetConnection(addr string, config TLSConfig) (net.Conn, error)

Modified GetConnection: reuse existing connection if valid.

func GetConsumerID

func GetConsumerID(ctx context.Context) (string, bool)

func GetContentType

func GetContentType(ctx context.Context) (string, bool)

func GetHeader

func GetHeader(ctx context.Context, key string) (string, bool)

func GetHeaders

func GetHeaders(ctx context.Context) (storage.IMap[string, string], bool)

func GetPublisherID

func GetPublisherID(ctx context.Context) (string, bool)

func GetQueue

func GetQueue(ctx context.Context) (string, bool)

func GetTriggerNode

func GetTriggerNode(ctx context.Context) (string, bool)

func HeadersWithConsumerID

func HeadersWithConsumerID(ctx context.Context, id string) map[string]string

func HeadersWithConsumerIDAndQueue

func HeadersWithConsumerIDAndQueue(ctx context.Context, id, queue string) map[string]string

func IsClosed

func IsClosed(conn net.Conn) bool

func NewID

func NewID() string

func RecoverPanic

func RecoverPanic(labelGenerator func() string)

func RecoverTitle

func RecoverTitle() string

func SetHeaders

func SetHeaders(ctx context.Context, headers map[string]string) context.Context

func WithHeaders

func WithHeaders(ctx context.Context, headers map[string]string) map[string]string

func WrapError

func WrapError(err error, msg, op string) error

Types

type ActiveAlert added in v0.0.16

type ActiveAlert struct {
	Rule        AlertRule         `json:"rule"`
	Value       float64           `json:"value"`
	StartsAt    time.Time         `json:"starts_at"`
	EndsAt      *time.Time        `json:"ends_at,omitempty"`
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	Status      AlertStatus       `json:"status"`
}

ActiveAlert represents an active alert

type AlertManager added in v0.0.16

type AlertManager struct {
	// contains filtered or unexported fields
}

AlertManager manages alerts and notifications

func NewAlertManager added in v0.0.16

func NewAlertManager(logger logger.Logger) *AlertManager

NewAlertManager creates a new alert manager

func (*AlertManager) AddNotifier added in v0.0.16

func (am *AlertManager) AddNotifier(notifier AlertNotifier)

AddNotifier adds an alert notifier

func (*AlertManager) AddRule added in v0.0.16

func (am *AlertManager) AddRule(rule AlertRule)

AddRule adds an alert rule

func (*AlertManager) EvaluateRules added in v0.0.16

func (am *AlertManager) EvaluateRules(registry *DetailedMetricsRegistry)

EvaluateRules evaluates all alert rules against current metrics

type AlertNotifier added in v0.0.16

type AlertNotifier interface {
	Notify(ctx context.Context, alert ActiveAlert) error
	Name() string
}

AlertNotifier interface for alert notifications

type AlertRule added in v0.0.16

type AlertRule struct {
	Name        string            `json:"name"`
	Metric      string            `json:"metric"`
	Condition   string            `json:"condition"` // "gt", "lt", "eq", "gte", "lte"
	Threshold   float64           `json:"threshold"`
	Duration    time.Duration     `json:"duration"`
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	Enabled     bool              `json:"enabled"`
}

AlertRule defines conditions for triggering alerts

type AlertStatus added in v0.0.16

type AlertStatus string

AlertStatus represents the status of an alert

const (
	AlertStatusFiring   AlertStatus = "firing"
	AlertStatusResolved AlertStatus = "resolved"
	AlertStatusSilenced AlertStatus = "silenced"
)

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(opts ...Option) *Broker

func (*Broker) AddConsumer

func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Conn) string

func (*Broker) AdjustConsumerWorkers added in v0.0.11

func (b *Broker) AdjustConsumerWorkers(noOfWorkers int, consumerID ...string)

func (*Broker) Authenticate added in v0.0.16

func (b *Broker) Authenticate(ctx context.Context, credentials map[string]string) error

Add authentication and authorization for publishers and consumers

func (*Broker) Authorize added in v0.0.16

func (b *Broker) Authorize(ctx context.Context, role string, action string) error

func (*Broker) Close added in v0.0.2

func (b *Broker) Close() error

func (*Broker) HandleCallback

func (b *Broker) HandleCallback(ctx context.Context, msg *codec.Message)

func (*Broker) MessageAck

func (b *Broker) MessageAck(ctx context.Context, msg *codec.Message)

func (*Broker) MessageDeny

func (b *Broker) MessageDeny(ctx context.Context, msg *codec.Message)

func (*Broker) MessageResponseHandler

func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message)

func (*Broker) NewQueue

func (b *Broker) NewQueue(name string) *Queue

func (*Broker) NewQueueWithConfig added in v0.0.16

func (b *Broker) NewQueueWithConfig(name string, opts ...QueueOption) *Queue

NewQueueWithConfig creates a queue with specific configuration

func (*Broker) NewQueueWithOrdering added in v0.0.16

func (b *Broker) NewQueueWithOrdering(name string) *Queue

Ensure message ordering in task queues

func (*Broker) NotifyHandler

func (b *Broker) NotifyHandler() func(context.Context, Result) error

func (*Broker) OnClose

func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error

func (*Broker) OnConsumerPause

func (b *Broker) OnConsumerPause(ctx context.Context, _ *codec.Message)

func (*Broker) OnConsumerResume

func (b *Broker) OnConsumerResume(ctx context.Context, _ *codec.Message)

func (*Broker) OnConsumerStop

func (b *Broker) OnConsumerStop(ctx context.Context, _ *codec.Message)

func (*Broker) OnConsumerUpdated added in v0.0.11

func (b *Broker) OnConsumerUpdated(ctx context.Context, msg *codec.Message)

func (*Broker) OnError

func (b *Broker) OnError(_ context.Context, conn net.Conn, err error)

func (*Broker) OnMessage

func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn)

func (*Broker) Options

func (b *Broker) Options() *Options

func (*Broker) PauseConsumer

func (b *Broker) PauseConsumer(ctx context.Context, consumerID string, queues ...string)

func (*Broker) Publish

func (b *Broker) Publish(ctx context.Context, task *Task, queue string) error

func (*Broker) PublishHandler

func (b *Broker) PublishHandler(ctx context.Context, conn net.Conn, msg *codec.Message)

func (*Broker) RemoveConsumer

func (b *Broker) RemoveConsumer(consumerID string, queues ...string)

func (*Broker) ReprocessDLQ added in v0.0.16

func (b *Broker) ReprocessDLQ(queueName string) error

Add advanced dead-letter queue management

func (*Broker) ResumeConsumer

func (b *Broker) ResumeConsumer(ctx context.Context, consumerID string, queues ...string)

func (*Broker) SetNotifyHandler

func (b *Broker) SetNotifyHandler(callback Callback)

func (*Broker) SetURL added in v0.0.2

func (b *Broker) SetURL(url string)

func (*Broker) Start

func (b *Broker) Start(ctx context.Context) error

func (*Broker) StartEnhanced added in v0.0.16

func (b *Broker) StartEnhanced(ctx context.Context) error

Enhanced Start method with production features

func (*Broker) StopConsumer

func (b *Broker) StopConsumer(ctx context.Context, consumerID string, queues ...string)

func (*Broker) StopEnhanced added in v0.0.16

func (b *Broker) StopEnhanced() error

Enhanced Stop method with graceful shutdown

func (*Broker) SubscribeHandler

func (b *Broker) SubscribeHandler(ctx context.Context, conn net.Conn, msg *codec.Message)

func (*Broker) SyncMode

func (b *Broker) SyncMode() bool

func (*Broker) TLSConfig

func (b *Broker) TLSConfig() TLSConfig

func (*Broker) URL

func (b *Broker) URL() string

func (*Broker) UpdateConsumer added in v0.0.11

func (b *Broker) UpdateConsumer(ctx context.Context, consumerID string, config DynamicConfig, queues ...string) error

type BrokerConfig added in v0.0.16

type BrokerConfig struct {
	Address              string            `json:"address"`
	Port                 int               `json:"port"`
	MaxConnections       int               `json:"max_connections"`
	ConnectionTimeout    time.Duration     `json:"connection_timeout"`
	ReadTimeout          time.Duration     `json:"read_timeout"`
	WriteTimeout         time.Duration     `json:"write_timeout"`
	IdleTimeout          time.Duration     `json:"idle_timeout"`
	KeepAlive            bool              `json:"keep_alive"`
	KeepAlivePeriod      time.Duration     `json:"keep_alive_period"`
	MaxQueueDepth        int               `json:"max_queue_depth"`
	EnableDeadLetter     bool              `json:"enable_dead_letter"`
	DeadLetterMaxRetries int               `json:"dead_letter_max_retries"`
	EnableMetrics        bool              `json:"enable_metrics"`
	MetricsInterval      time.Duration     `json:"metrics_interval"`
	GracefulShutdown     time.Duration     `json:"graceful_shutdown"`
	MessageTTL           time.Duration     `json:"message_ttl"`
	Headers              map[string]string `json:"headers"`
}

BrokerConfig contains broker-specific configuration

func (*BrokerConfig) UnmarshalJSON added in v0.0.16

func (b *BrokerConfig) UnmarshalJSON(data []byte) error

type BrokerConnection added in v0.0.16

type BrokerConnection struct {
	// contains filtered or unexported fields
}

BrokerConnection represents a single broker connection

type Callback

type Callback func(ctx context.Context, result Result) error

Callback is called when a task processing is completed.

type CircuitBreaker added in v0.0.16

type CircuitBreaker struct {
	Enabled          bool
	FailureThreshold int
	ResetTimeout     time.Duration
}

type CircuitBreakerConfig added in v0.0.11

type CircuitBreakerConfig struct {
	Enabled          bool
	FailureThreshold int
	ResetTimeout     time.Duration
}

type CircuitState added in v0.0.16

type CircuitState int

CircuitState represents the state of a circuit breaker

const (
	CircuitClosed CircuitState = iota
	CircuitOpen
	CircuitHalfOpen
)

type ClusteringConfig added in v0.0.16

type ClusteringConfig struct {
	EnableClustering      bool          `json:"enable_clustering"`
	NodeID                string        `json:"node_id"`
	ClusterNodes          []string      `json:"cluster_nodes"`
	DiscoveryMethod       string        `json:"discovery_method"` // "static", "consul", "etcd", "k8s"
	DiscoveryEndpoint     string        `json:"discovery_endpoint"`
	HeartbeatInterval     time.Duration `json:"heartbeat_interval"`
	ElectionTimeout       time.Duration `json:"election_timeout"`
	EnableLoadBalancing   bool          `json:"enable_load_balancing"`
	LoadBalancingStrategy string        `json:"load_balancing_strategy"` // "round_robin", "least_connections", "hash"
	EnableFailover        bool          `json:"enable_failover"`
	FailoverTimeout       time.Duration `json:"failover_timeout"`
	EnableReplication     bool          `json:"enable_replication"`
	ReplicationFactor     int           `json:"replication_factor"`
	ConsistencyLevel      string        `json:"consistency_level"` // "weak", "strong", "eventual"
}

ClusteringConfig contains clustering configuration

func (*ClusteringConfig) UnmarshalJSON added in v0.0.16

func (c *ClusteringConfig) UnmarshalJSON(data []byte) error

type CompletionCallback

type CompletionCallback func()

CompletionCallback is called when the pool completes a graceful shutdown.

type ConfigManager added in v0.0.16

type ConfigManager struct {
	// contains filtered or unexported fields
}

ConfigManager handles dynamic configuration management

func NewConfigManager added in v0.0.16

func NewConfigManager(configFile string, logger logger.Logger) *ConfigManager

NewConfigManager creates a new configuration manager

func (*ConfigManager) AddWatcher added in v0.0.16

func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher)

AddWatcher adds a configuration watcher

func (*ConfigManager) GetConfig added in v0.0.16

func (cm *ConfigManager) GetConfig() *ProductionConfig

GetConfig returns a copy of the current configuration

func (*ConfigManager) LoadConfig added in v0.0.16

func (cm *ConfigManager) LoadConfig() error

LoadConfig loads configuration from file

func (*ConfigManager) RemoveWatcher added in v0.0.16

func (cm *ConfigManager) RemoveWatcher(watcher ConfigWatcher)

RemoveWatcher removes a configuration watcher

func (*ConfigManager) SaveConfig added in v0.0.16

func (cm *ConfigManager) SaveConfig() error

SaveConfig saves current configuration to file

func (*ConfigManager) StartWatching added in v0.0.16

func (cm *ConfigManager) StartWatching(ctx context.Context, interval time.Duration)

StartWatching starts watching for configuration changes

func (*ConfigManager) UpdateConfig added in v0.0.16

func (cm *ConfigManager) UpdateConfig(newConfig *ProductionConfig) error

UpdateConfig updates the configuration

type ConfigWatcher added in v0.0.16

type ConfigWatcher interface {
	OnConfigChange(oldConfig, newConfig *ProductionConfig) error
}

ConfigWatcher interface for configuration change notifications

type ConnectionPool added in v0.0.16

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool manages a pool of broker connections

func NewConnectionPool added in v0.0.16

func NewConnectionPool(maxConns int) *ConnectionPool

NewConnectionPool creates a new connection pool

func (*ConnectionPool) AddConnection added in v0.0.16

func (cp *ConnectionPool) AddConnection(id string, conn net.Conn, connType string) error

AddConnection adds a connection to the pool

func (*ConnectionPool) CleanupIdleConnections added in v0.0.16

func (cp *ConnectionPool) CleanupIdleConnections(idleTimeout time.Duration)

CleanupIdleConnections removes idle connections

func (*ConnectionPool) GetActiveConnections added in v0.0.16

func (cp *ConnectionPool) GetActiveConnections() int64

GetActiveConnections returns the number of active connections

func (*ConnectionPool) RemoveConnection added in v0.0.16

func (cp *ConnectionPool) RemoveConnection(id string)

RemoveConnection removes a connection from the pool

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Conn

func (c *Consumer) Conn() net.Conn

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context) error

func (*Consumer) ConsumeMessage

func (c *Consumer) ConsumeMessage(ctx context.Context, msg *codec.Message, conn net.Conn)

func (*Consumer) GetKey

func (c *Consumer) GetKey() string

func (*Consumer) GetType

func (c *Consumer) GetType() string

func (*Consumer) Metrics

func (c *Consumer) Metrics() Metrics

func (*Consumer) OnClose

func (c *Consumer) OnClose(_ context.Context, _ net.Conn) error

func (*Consumer) OnError

func (c *Consumer) OnError(_ context.Context, conn net.Conn, err error)

func (*Consumer) OnMessage

func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn) error

func (*Consumer) OnResponse

func (c *Consumer) OnResponse(ctx context.Context, result Result) error

func (*Consumer) Pause

func (c *Consumer) Pause(ctx context.Context) error

func (*Consumer) ProcessTask

func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result

func (*Consumer) Resume

func (c *Consumer) Resume(ctx context.Context) error

func (*Consumer) SetKey

func (c *Consumer) SetKey(key string)

func (*Consumer) StartHTTPAPI added in v0.0.12

func (c *Consumer) StartHTTPAPI() (int, error)

StartHTTPAPI starts an HTTP server on a random available port and registers API endpoints. It returns the port number the server is listening on.

func (*Consumer) Stop

func (c *Consumer) Stop(ctx context.Context) error

func (*Consumer) Update added in v0.0.11

func (c *Consumer) Update(ctx context.Context, payload []byte) error

type ConsumerConfig added in v0.0.16

type ConsumerConfig struct {
	MaxRetries              int           `json:"max_retries"`
	InitialDelay            time.Duration `json:"initial_delay"`
	MaxBackoff              time.Duration `json:"max_backoff"`
	JitterPercent           float64       `json:"jitter_percent"`
	EnableReconnect         bool          `json:"enable_reconnect"`
	ReconnectInterval       time.Duration `json:"reconnect_interval"`
	HealthCheckInterval     time.Duration `json:"health_check_interval"`
	MaxConcurrentTasks      int           `json:"max_concurrent_tasks"`
	TaskTimeout             time.Duration `json:"task_timeout"`
	EnableDeduplication     bool          `json:"enable_deduplication"`
	DeduplicationWindow     time.Duration `json:"deduplication_window"`
	EnablePriorityQueue     bool          `json:"enable_priority_queue"`
	EnableHTTPAPI           bool          `json:"enable_http_api"`
	HTTPAPIPort             int           `json:"http_api_port"`
	EnableCircuitBreaker    bool          `json:"enable_circuit_breaker"`
	CircuitBreakerThreshold int           `json:"circuit_breaker_threshold"`
	CircuitBreakerTimeout   time.Duration `json:"circuit_breaker_timeout"`
}

ConsumerConfig contains consumer-specific configuration

func (*ConsumerConfig) UnmarshalJSON added in v0.0.16

func (c *ConsumerConfig) UnmarshalJSON(data []byte) error

type CronSchedule

type CronSchedule struct {
	Seconds    string
	Minute     string
	Hour       string
	DayOfMonth string
	Month      string
	DayOfWeek  string
}

func (CronSchedule) String

func (c CronSchedule) String() string

type DeadLetterQueue added in v0.0.11

type DeadLetterQueue struct {
	// contains filtered or unexported fields
}

DeadLetterQueue stores tasks that have permanently failed.

func NewDeadLetterQueue added in v0.0.11

func NewDeadLetterQueue() *DeadLetterQueue

func (*DeadLetterQueue) Add added in v0.0.11

func (dlq *DeadLetterQueue) Add(task *QueueTask)

func (*DeadLetterQueue) Tasks added in v0.0.11

func (dlq *DeadLetterQueue) Tasks() []*QueueTask

type DefaultPlugin added in v0.0.11

type DefaultPlugin struct{}

DefaultPlugin is a no-op implementation of Plugin.

func (*DefaultPlugin) AfterTask added in v0.0.11

func (dp *DefaultPlugin) AfterTask(task *QueueTask, result Result)

func (*DefaultPlugin) BeforeTask added in v0.0.11

func (dp *DefaultPlugin) BeforeTask(task *QueueTask)

func (*DefaultPlugin) Initialize added in v0.0.11

func (dp *DefaultPlugin) Initialize(config interface{}) error

type DetailedMetricsRegistry added in v0.0.16

type DetailedMetricsRegistry struct {
	// contains filtered or unexported fields
}

DetailedMetricsRegistry stores and manages metrics with enhanced features

func NewDetailedMetricsRegistry added in v0.0.16

func NewDetailedMetricsRegistry() *DetailedMetricsRegistry

NewMetricsRegistry creates a new metrics registry

func (*DetailedMetricsRegistry) GetAllMetrics added in v0.0.16

func (mr *DetailedMetricsRegistry) GetAllMetrics() map[string]*TimeSeries

GetAllMetrics returns all metrics

func (*DetailedMetricsRegistry) GetMetric added in v0.0.16

func (mr *DetailedMetricsRegistry) GetMetric(name string) (*TimeSeries, bool)

GetMetric returns a metric by name

func (*DetailedMetricsRegistry) RecordValue added in v0.0.16

func (mr *DetailedMetricsRegistry) RecordValue(name string, value float64)

RecordValue records a value for a metric

func (*DetailedMetricsRegistry) RegisterMetric added in v0.0.16

func (mr *DetailedMetricsRegistry) RegisterMetric(name string, metricType MetricType, description string, labels map[string]string)

RegisterMetric registers a new metric

type DiskSpaceHealthCheck added in v0.0.16

type DiskSpaceHealthCheck struct{}

DiskSpaceHealthCheck checks available disk space

func (*DiskSpaceHealthCheck) Check added in v0.0.16

func (*DiskSpaceHealthCheck) Name added in v0.0.16

func (dshc *DiskSpaceHealthCheck) Name() string

func (*DiskSpaceHealthCheck) Timeout added in v0.0.16

func (dshc *DiskSpaceHealthCheck) Timeout() time.Duration

type DistributedLocker added in v0.0.16

type DistributedLocker interface {
	Acquire(key string, ttl time.Duration) bool
	Release(key string)
}

type DynamicConfig added in v0.0.11

type DynamicConfig struct {
	Timeout          time.Duration
	BatchSize        int
	MaxMemoryLoad    int64
	IdleTimeout      time.Duration
	BackoffDuration  time.Duration
	MaxRetries       int
	ReloadInterval   time.Duration
	WarningThreshold WarningThresholds
	NumberOfWorkers  int // new field for worker count
}

DynamicConfig holds runtime configuration values.

type EnhancedCircuitBreaker added in v0.0.16

type EnhancedCircuitBreaker struct {
	// contains filtered or unexported fields
}

EnhancedCircuitBreaker provides circuit breaker functionality

func NewEnhancedCircuitBreaker added in v0.0.16

func NewEnhancedCircuitBreaker(threshold int64, timeout time.Duration) *EnhancedCircuitBreaker

NewEnhancedCircuitBreaker creates a new circuit breaker

func (*EnhancedCircuitBreaker) Call added in v0.0.16

func (cb *EnhancedCircuitBreaker) Call(fn func() error) error

Call executes a function with circuit breaker protection

type ExecutionHistory

type ExecutionHistory struct {
	Timestamp time.Time
	Result    Result
}

type FormattedMetrics added in v0.0.12

type FormattedMetrics struct {
	TotalTasks           int64  `json:"total_tasks"`
	CompletedTasks       int64  `json:"completed_tasks"`
	ErrorCount           int64  `json:"error_count"`
	CurrentMemoryUsed    string `json:"current_memory_used"`
	CumulativeMemoryUsed string `json:"cumulative_memory_used"`
	TotalScheduled       int64  `json:"total_scheduled"`
	CumulativeExecution  string `json:"cumulative_execution"`
	AverageExecution     string `json:"average_execution"`
}

FormattedMetrics is a helper struct to present human-readable metrics.

type GoRoutineHealthCheck added in v0.0.16

type GoRoutineHealthCheck struct{}

GoRoutineHealthCheck checks goroutine count

func (*GoRoutineHealthCheck) Check added in v0.0.16

func (*GoRoutineHealthCheck) Name added in v0.0.16

func (ghc *GoRoutineHealthCheck) Name() string

func (*GoRoutineHealthCheck) Timeout added in v0.0.16

func (ghc *GoRoutineHealthCheck) Timeout() time.Duration

type Handler

type Handler func(context.Context, *Task) Result

type HealthCheck added in v0.0.16

type HealthCheck interface {
	Name() string
	Check(ctx context.Context) *HealthCheckResult
	Timeout() time.Duration
}

HealthCheck interface for health checks

type HealthCheckResult added in v0.0.16

type HealthCheckResult struct {
	Name      string                 `json:"name"`
	Status    HealthStatus           `json:"status"`
	Message   string                 `json:"message"`
	Duration  time.Duration          `json:"duration"`
	Timestamp time.Time              `json:"timestamp"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
}

HealthCheckResult represents the result of a health check

type HealthChecker added in v0.0.16

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker monitors broker health

func NewHealthChecker added in v0.0.16

func NewHealthChecker() *HealthChecker

NewHealthChecker creates a new health checker

func (*HealthChecker) Start added in v0.0.16

func (hc *HealthChecker) Start()

Start starts the health checker

func (*HealthChecker) Stop added in v0.0.16

func (hc *HealthChecker) Stop()

Stop stops the health checker

type HealthStatus added in v0.0.16

type HealthStatus string

HealthStatus represents the health status

const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
	HealthStatusWarning   HealthStatus = "warning"
	HealthStatusUnknown   HealthStatus = "unknown"
)

type HealthThresholds added in v0.0.16

type HealthThresholds struct {
	MaxMemoryUsage  int64
	MaxCPUUsage     float64
	MaxConnections  int
	MaxQueueDepth   int
	MaxResponseTime time.Duration
	MinFreeMemory   int64
}

HealthThresholds defines health check thresholds

type InMemoryMessageStore added in v0.0.16

type InMemoryMessageStore struct {
	// contains filtered or unexported fields
}

InMemoryMessageStore implements MessageStore in memory

func NewInMemoryMessageStore added in v0.0.16

func NewInMemoryMessageStore() *InMemoryMessageStore

NewInMemoryMessageStore creates a new in-memory message store

func (*InMemoryMessageStore) Cleanup added in v0.0.16

func (ims *InMemoryMessageStore) Cleanup(olderThan time.Time) error

Cleanup removes old messages

func (*InMemoryMessageStore) Count added in v0.0.16

func (ims *InMemoryMessageStore) Count(queue string) (int64, error)

Count counts messages in a queue

func (*InMemoryMessageStore) Delete added in v0.0.16

func (ims *InMemoryMessageStore) Delete(id string) error

Delete deletes a message

func (*InMemoryMessageStore) List added in v0.0.16

func (ims *InMemoryMessageStore) List(queue string, limit int, offset int) ([]*StoredMessage, error)

List lists messages for a queue

func (*InMemoryMessageStore) Retrieve added in v0.0.16

func (ims *InMemoryMessageStore) Retrieve(id string) (*StoredMessage, error)

Retrieve retrieves a message by ID

func (*InMemoryMessageStore) Store added in v0.0.16

func (ims *InMemoryMessageStore) Store(msg *StoredMessage) error

Store stores a message

type InMemoryMetricsRegistry added in v0.0.11

type InMemoryMetricsRegistry struct {
	// contains filtered or unexported fields
}

InMemoryMetricsRegistry stores metrics in memory.

func NewInMemoryMetricsRegistry added in v0.0.11

func NewInMemoryMetricsRegistry() *InMemoryMetricsRegistry

func (*InMemoryMetricsRegistry) Get added in v0.0.11

func (m *InMemoryMetricsRegistry) Get(metricName string) interface{}

func (*InMemoryMetricsRegistry) Increment added in v0.0.11

func (m *InMemoryMetricsRegistry) Increment(metricName string)

func (*InMemoryMetricsRegistry) Register added in v0.0.11

func (m *InMemoryMetricsRegistry) Register(metricName string, value interface{})

type LockEntry added in v0.0.16

type LockEntry struct {
	// contains filtered or unexported fields
}

type LogNotifier added in v0.0.16

type LogNotifier struct {
	// contains filtered or unexported fields
}

LogNotifier sends alerts to logs

func NewLogNotifier added in v0.0.16

func NewLogNotifier(logger logger.Logger) *LogNotifier

func (*LogNotifier) Name added in v0.0.16

func (ln *LogNotifier) Name() string

func (*LogNotifier) Notify added in v0.0.16

func (ln *LogNotifier) Notify(ctx context.Context, alert ActiveAlert) error

type MemoryHealthCheck added in v0.0.16

type MemoryHealthCheck struct{}

MemoryHealthCheck checks memory usage

func (*MemoryHealthCheck) Check added in v0.0.16

func (*MemoryHealthCheck) Name added in v0.0.16

func (mhc *MemoryHealthCheck) Name() string

func (*MemoryHealthCheck) Timeout added in v0.0.16

func (mhc *MemoryHealthCheck) Timeout() time.Duration

type MemoryTaskStorage

type MemoryTaskStorage struct {
	// contains filtered or unexported fields
}

func NewMemoryTaskStorage

func NewMemoryTaskStorage(expiryTime time.Duration) *MemoryTaskStorage

func (*MemoryTaskStorage) CleanupExpiredTasks

func (m *MemoryTaskStorage) CleanupExpiredTasks() error

func (*MemoryTaskStorage) DeleteTask

func (m *MemoryTaskStorage) DeleteTask(taskID string) error

func (*MemoryTaskStorage) FetchNextTask

func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)

func (*MemoryTaskStorage) GetAllTasks

func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)

func (*MemoryTaskStorage) GetTask

func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)

func (*MemoryTaskStorage) SaveTask

func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error

type MessageStore added in v0.0.16

type MessageStore interface {
	Store(msg *StoredMessage) error
	Retrieve(id string) (*StoredMessage, error)
	Delete(id string) error
	List(queue string, limit int, offset int) ([]*StoredMessage, error)
	Count(queue string) (int64, error)
	Cleanup(olderThan time.Time) error
}

MessageStore interface for storing messages

type Metric added in v0.0.16

type Metric struct {
	Name      string            `json:"name"`
	Value     float64           `json:"value"`
	Timestamp time.Time         `json:"timestamp"`
	Tags      map[string]string `json:"tags,omitempty"`
}

Metric represents a single metric

type MetricType added in v0.0.16

type MetricType string

MetricType represents the type of metric

const (
	MetricTypeCounter   MetricType = "counter"
	MetricTypeGauge     MetricType = "gauge"
	MetricTypeHistogram MetricType = "histogram"
	MetricTypeSummary   MetricType = "summary"
)

type Metrics

type Metrics struct {
	TotalTasks           int64 // total number of tasks processed
	CompletedTasks       int64 // number of successfully processed tasks
	ErrorCount           int64 // number of tasks that resulted in error
	TotalMemoryUsed      int64 // current memory used (in bytes) by tasks in flight
	TotalScheduled       int64 // number of tasks scheduled
	ExecutionTime        int64 // cumulative execution time in milliseconds
	CumulativeMemoryUsed int64 // cumulative memory used (sum of all task sizes) in bytes
}

Metrics holds cumulative pool metrics.

type MetricsCollector added in v0.0.16

type MetricsCollector struct {
	// contains filtered or unexported fields
}

MetricsCollector collects and stores metrics

func NewMetricsCollector added in v0.0.16

func NewMetricsCollector() *MetricsCollector

NewMetricsCollector creates a new metrics collector

func (*MetricsCollector) RecordMetric added in v0.0.16

func (mc *MetricsCollector) RecordMetric(name string, value float64, tags map[string]string)

RecordMetric records a metric

type MetricsRegistry added in v0.0.11

type MetricsRegistry interface {
	Register(metricName string, value interface{})
	Increment(metricName string)
	Get(metricName string) interface{}
}

type MetricsServer added in v0.0.16

type MetricsServer struct {
	// contains filtered or unexported fields
}

MetricsServer provides comprehensive monitoring and metrics

func NewMetricsServer added in v0.0.16

func NewMetricsServer(broker *Broker, config *MonitoringConfig, logger logger.Logger) *MetricsServer

NewMetricsServer creates a new metrics server

func (*MetricsServer) AddAlertNotifier added in v0.0.16

func (ms *MetricsServer) AddAlertNotifier(notifier AlertNotifier)

AddAlertNotifier adds an alert notifier to the metrics server

func (*MetricsServer) AddAlertRule added in v0.0.16

func (ms *MetricsServer) AddAlertRule(rule AlertRule)

AddAlertRule adds an alert rule to the metrics server

func (*MetricsServer) Start added in v0.0.16

func (ms *MetricsServer) Start(ctx context.Context) error

Start starts the metrics server

func (*MetricsServer) Stop added in v0.0.16

func (ms *MetricsServer) Stop() error

Stop stops the metrics server

type MonitoringConfig added in v0.0.16

type MonitoringConfig struct {
	EnableMetrics       bool          `json:"enable_metrics"`
	MetricsPort         int           `json:"metrics_port"`
	MetricsPath         string        `json:"metrics_path"`
	EnableHealthCheck   bool          `json:"enable_health_check"`
	HealthCheckPort     int           `json:"health_check_port"`
	HealthCheckPath     string        `json:"health_check_path"`
	HealthCheckInterval time.Duration `json:"health_check_interval"`
	EnableTracing       bool          `json:"enable_tracing"`
	TracingEndpoint     string        `json:"tracing_endpoint"`
	TracingSampleRate   float64       `json:"tracing_sample_rate"`
	EnableLogging       bool          `json:"enable_logging"`
	LogLevel            string        `json:"log_level"`
	LogFormat           string        `json:"log_format"` // "json", "text"
	LogOutput           string        `json:"log_output"` // "stdout", "file", "syslog"
	LogFilePath         string        `json:"log_file_path"`
	LogMaxSize          int           `json:"log_max_size"` // MB
	LogMaxBackups       int           `json:"log_max_backups"`
	LogMaxAge           int           `json:"log_max_age"` // days
	EnableProfiling     bool          `json:"enable_profiling"`
	ProfilingPort       int           `json:"profiling_port"`
}

MonitoringConfig contains monitoring and observability configuration

func (*MonitoringConfig) UnmarshalJSON added in v0.0.16

func (m *MonitoringConfig) UnmarshalJSON(data []byte) error

type Option

type Option func(*Options)

func DisableBrokerRateLimit added in v0.0.11

func DisableBrokerRateLimit() Option

func DisableConsumerRateLimit added in v0.0.11

func DisableConsumerRateLimit() Option

func WithBrokerRateLimiter added in v0.0.11

func WithBrokerRateLimiter(rate int, burst int) Option

func WithBrokerURL

func WithBrokerURL(url string) Option

WithBrokerURL -

func WithCAPath

func WithCAPath(caPath string) Option

WithCAPath - Option to enable/disable TLS

func WithCallback

func WithCallback(val ...func(context.Context, Result) Result) Option

WithCallback -

func WithCleanTaskOnComplete

func WithCleanTaskOnComplete() Option

WithCleanTaskOnComplete -

func WithConsumerOnClose

func WithConsumerOnClose(handler func(ctx context.Context, topic, consumerName string)) Option

func WithConsumerOnSubscribe

func WithConsumerOnSubscribe(handler func(ctx context.Context, topic, consumerName string)) Option

func WithConsumerRateLimiter added in v0.0.11

func WithConsumerRateLimiter(rate int, burst int) Option

func WithConsumerTimeout added in v0.0.16

func WithConsumerTimeout(timeout time.Duration) Option

func WithHTTPApi added in v0.0.12

func WithHTTPApi(flag bool) Option

WithHTTPApi - Option to enable/disable TLS

func WithInitialDelay

func WithInitialDelay(val time.Duration) Option

WithInitialDelay -

func WithJitterPercent

func WithJitterPercent(val float64) Option

WithJitterPercent -

func WithLogger added in v0.0.10

func WithLogger(log logger.Logger) Option

func WithMaxBackoff

func WithMaxBackoff(val time.Duration) Option

WithMaxBackoff -

func WithMaxRetries

func WithMaxRetries(val int) Option

WithMaxRetries -

func WithNotifyResponse

func WithNotifyResponse(callback Callback) Option

func WithRespondPendingResult

func WithRespondPendingResult(mode bool) Option

WithRespondPendingResult -

func WithSyncMode

func WithSyncMode(mode bool) Option

WithSyncMode -

func WithTLS

func WithTLS(enableTLS bool, certPath, keyPath string) Option

WithTLS - Option to enable/disable TLS

func WithWorkerPool

func WithWorkerPool(queueSize, numOfWorkers int, maxMemoryLoad int64) Option

type Options

type Options struct {
	BrokerRateLimiter   *RateLimiter // new field for broker rate limiting
	ConsumerRateLimiter *RateLimiter // new field for consumer rate limiting
	// contains filtered or unexported fields
}

func SetupOptions

func SetupOptions(opts ...Option) *Options

func (*Options) BrokerAddr added in v0.0.10

func (o *Options) BrokerAddr() string

func (*Options) CleanTaskOnComplete

func (o *Options) CleanTaskOnComplete() bool

func (*Options) ConsumerTimeout added in v0.0.16

func (o *Options) ConsumerTimeout() time.Duration

func (*Options) HTTPApi added in v0.0.12

func (o *Options) HTTPApi() bool

func (*Options) Logger added in v0.0.10

func (o *Options) Logger() logger.Logger

func (*Options) MaxMemoryLoad

func (o *Options) MaxMemoryLoad() int64

func (*Options) NumOfWorkers

func (o *Options) NumOfWorkers() int

func (*Options) QueueSize

func (o *Options) QueueSize() int

func (*Options) SetSyncMode

func (o *Options) SetSyncMode(sync bool)

func (*Options) Storage

func (o *Options) Storage() TaskStorage

type PersistenceConfig added in v0.0.16

type PersistenceConfig struct {
	EnablePersistence  bool          `json:"enable_persistence"`
	StorageType        string        `json:"storage_type"` // "memory", "file", "redis", "postgres", "mysql"
	ConnectionString   string        `json:"connection_string"`
	MaxConnections     int           `json:"max_connections"`
	ConnectionTimeout  time.Duration `json:"connection_timeout"`
	RetentionPeriod    time.Duration `json:"retention_period"`
	CleanupInterval    time.Duration `json:"cleanup_interval"`
	BackupEnabled      bool          `json:"backup_enabled"`
	BackupInterval     time.Duration `json:"backup_interval"`
	BackupPath         string        `json:"backup_path"`
	CompressionEnabled bool          `json:"compression_enabled"`
	EncryptionEnabled  bool          `json:"encryption_enabled"`
	ReplicationEnabled bool          `json:"replication_enabled"`
	ReplicationNodes   []string      `json:"replication_nodes"`
}

PersistenceConfig contains data persistence configuration

func (*PersistenceConfig) UnmarshalJSON added in v0.0.16

func (p *PersistenceConfig) UnmarshalJSON(data []byte) error

type Plugin added in v0.0.11

type Plugin interface {
	Initialize(config interface{}) error
	BeforeTask(task *QueueTask)
	AfterTask(task *QueueTask, result Result)
}

Plugin is used to inject custom behavior before or after task processing.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents the worker pool processing tasks.

func NewPool

func NewPool(numOfWorkers int, opts ...PoolOption) *Pool

NewPool creates and starts a new pool with the given number of workers.

func (*Pool) AddScheduledMetrics added in v0.0.16

func (wp *Pool) AddScheduledMetrics(total int)

func (*Pool) AdjustWorkerCount

func (wp *Pool) AdjustWorkerCount(newWorkerCount int)

func (*Pool) DLQ added in v0.0.11

func (wp *Pool) DLQ() *DeadLetterQueue

func (*Pool) Dispatch

func (wp *Pool) Dispatch(event func())

func (*Pool) EnqueueTask

func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error

func (*Pool) FormattedMetrics added in v0.0.12

func (wp *Pool) FormattedMetrics() FormattedMetrics

FormattedMetrics returns a formatted version of the pool metrics.

func (*Pool) Init added in v0.0.16

func (wp *Pool) Init()

func (*Pool) Metrics

func (wp *Pool) Metrics() Metrics

func (*Pool) Pause

func (wp *Pool) Pause()

func (*Pool) Resume

func (wp *Pool) Resume()

func (*Pool) SetBatchSize

func (wp *Pool) SetBatchSize(size int)

func (*Pool) Start

func (wp *Pool) Start(numWorkers int)

func (*Pool) Stop

func (wp *Pool) Stop()

func (*Pool) UpdateConfig added in v0.0.11

func (wp *Pool) UpdateConfig(newConfig *DynamicConfig) error

UpdateConfig updates pool configuration via a POOL_UPDATE command.

type PoolConfig added in v0.0.16

type PoolConfig struct {
	MinWorkers               int           `json:"min_workers"`
	MaxWorkers               int           `json:"max_workers"`
	QueueSize                int           `json:"queue_size"`
	MaxMemoryLoad            int64         `json:"max_memory_load"`
	TaskTimeout              time.Duration `json:"task_timeout"`
	IdleWorkerTimeout        time.Duration `json:"idle_worker_timeout"`
	EnableDynamicScaling     bool          `json:"enable_dynamic_scaling"`
	ScalingFactor            float64       `json:"scaling_factor"`
	ScalingInterval          time.Duration `json:"scaling_interval"`
	MaxQueueWaitTime         time.Duration `json:"max_queue_wait_time"`
	EnableWorkStealing       bool          `json:"enable_work_stealing"`
	EnablePriorityScheduling bool          `json:"enable_priority_scheduling"`
	GracefulShutdownTimeout  time.Duration `json:"graceful_shutdown_timeout"`
}

PoolConfig contains worker pool configuration

func (*PoolConfig) UnmarshalJSON added in v0.0.16

func (p *PoolConfig) UnmarshalJSON(data []byte) error

type PoolOption

type PoolOption func(*Pool)

func WithBatchSize

func WithBatchSize(batchSize int) PoolOption

func WithCircuitBreaker added in v0.0.11

func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption

func WithCompletionCallback

func WithCompletionCallback(callback func()) PoolOption

func WithDiagnostics added in v0.0.11

func WithDiagnostics(enabled bool) PoolOption

func WithGracefulShutdown added in v0.0.11

func WithGracefulShutdown(timeout time.Duration) PoolOption

func WithHandler

func WithHandler(handler Handler) PoolOption

func WithMaxMemoryLoad

func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption

func WithMetricsRegistry added in v0.0.11

func WithMetricsRegistry(registry MetricsRegistry) PoolOption

func WithPlugin added in v0.0.11

func WithPlugin(plugin Plugin) PoolOption

func WithPoolCallback

func WithPoolCallback(callback Callback) PoolOption

func WithTaskQueueSize

func WithTaskQueueSize(size int) PoolOption

func WithTaskStorage

func WithTaskStorage(storage TaskStorage) PoolOption

func WithTaskTimeout

func WithTaskTimeout(t time.Duration) PoolOption

func WithWarningThresholds added in v0.0.11

func WithWarningThresholds(thresholds ThresholdConfig) PoolOption

type PriorityQueue

type PriorityQueue []*QueueTask

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type Processor

type Processor interface {
	ProcessTask(ctx context.Context, msg *Task) Result
	Consume(ctx context.Context) error
	Pause(ctx context.Context) error
	Resume(ctx context.Context) error
	Stop(ctx context.Context) error
	Close() error
	GetKey() string
	SetKey(key string)
	GetType() string
}

type ProductionConfig added in v0.0.16

type ProductionConfig struct {
	Broker      BrokerConfig      `json:"broker"`
	Consumer    ConsumerConfig    `json:"consumer"`
	Publisher   PublisherConfig   `json:"publisher"`
	Pool        PoolConfig        `json:"pool"`
	Security    SecurityConfig    `json:"security"`
	Monitoring  MonitoringConfig  `json:"monitoring"`
	Persistence PersistenceConfig `json:"persistence"`
	Clustering  ClusteringConfig  `json:"clustering"`
	RateLimit   RateLimitConfig   `json:"rate_limit"`
	LastUpdated time.Time         `json:"last_updated"`
}

ProductionConfig contains all production configuration

func DefaultProductionConfig added in v0.0.16

func DefaultProductionConfig() *ProductionConfig

DefaultProductionConfig returns default production configuration

func (*ProductionConfig) UnmarshalJSON added in v0.0.16

func (c *ProductionConfig) UnmarshalJSON(data []byte) error

Custom unmarshaling to handle duration strings

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(id string, opts ...Option) *Publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, task Task, queue string) error

Publish method that uses the persistent connection.

func (*Publisher) Request

func (p *Publisher) Request(ctx context.Context, task Task, queue string) Result

type PublisherConfig added in v0.0.16

type PublisherConfig struct {
	MaxRetries            int           `json:"max_retries"`
	InitialDelay          time.Duration `json:"initial_delay"`
	MaxBackoff            time.Duration `json:"max_backoff"`
	JitterPercent         float64       `json:"jitter_percent"`
	ConnectionPoolSize    int           `json:"connection_pool_size"`
	PublishTimeout        time.Duration `json:"publish_timeout"`
	EnableBatching        bool          `json:"enable_batching"`
	BatchSize             int           `json:"batch_size"`
	BatchTimeout          time.Duration `json:"batch_timeout"`
	EnableCompression     bool          `json:"enable_compression"`
	CompressionLevel      int           `json:"compression_level"`
	EnableAsync           bool          `json:"enable_async"`
	AsyncBufferSize       int           `json:"async_buffer_size"`
	EnableOrderedDelivery bool          `json:"enable_ordered_delivery"`
}

PublisherConfig contains publisher-specific configuration

func (*PublisherConfig) UnmarshalJSON added in v0.0.16

func (p *PublisherConfig) UnmarshalJSON(data []byte) error

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

type QueueConfig added in v0.0.16

type QueueConfig struct {
	MaxDepth       int           `json:"max_depth"`
	MaxRetries     int           `json:"max_retries"`
	MessageTTL     time.Duration `json:"message_ttl"`
	DeadLetter     bool          `json:"dead_letter"`
	Persistent     bool          `json:"persistent"`
	BatchSize      int           `json:"batch_size"`
	Priority       int           `json:"priority"`
	OrderedMode    bool          `json:"ordered_mode"`
	Throttling     bool          `json:"throttling"`
	ThrottleRate   int           `json:"throttle_rate"`
	ThrottleBurst  int           `json:"throttle_burst"`
	CompactionMode bool          `json:"compaction_mode"`
}

QueueConfig holds configuration for a specific queue

type QueueMetrics added in v0.0.16

type QueueMetrics struct {
	MessagesReceived  int64         `json:"messages_received"`
	MessagesProcessed int64         `json:"messages_processed"`
	MessagesFailed    int64         `json:"messages_failed"`
	CurrentDepth      int64         `json:"current_depth"`
	AverageLatency    time.Duration `json:"average_latency"`
	LastActivity      time.Time     `json:"last_activity"`
}

QueueMetrics holds metrics for a specific queue

type QueueOption added in v0.0.16

type QueueOption func(*QueueConfig)

QueueOption defines options for queue configuration

func WithDeadLetter added in v0.0.16

func WithDeadLetter() QueueOption

WithDeadLetter enables dead letter queue for failed messages

func WithPersistent added in v0.0.16

func WithPersistent() QueueOption

WithPersistent enables message persistence

func WithQueueMaxDepth added in v0.0.16

func WithQueueMaxDepth(maxDepth int) QueueOption

WithQueueMaxDepth sets the maximum queue depth

func WithQueueMaxRetries added in v0.0.16

func WithQueueMaxRetries(maxRetries int) QueueOption

WithQueueMaxRetries sets the maximum retries for queue messages

func WithQueueOption added in v0.0.16

func WithQueueOption(config QueueConfig) QueueOption

WithQueueOption creates a queue with specific configuration

func WithQueueTTL added in v0.0.16

func WithQueueTTL(ttl time.Duration) QueueOption

WithQueueTTL sets the message TTL for the queue

type QueueTask

type QueueTask struct {
	// contains filtered or unexported fields
}

type QueuedTask

type QueuedTask struct {
	Message    *codec.Message
	RetryCount int
}

type RateLimitConfig added in v0.0.16

type RateLimitConfig struct {
	EnableBrokerRateLimit    bool `json:"enable_broker_rate_limit"`
	BrokerRate               int  `json:"broker_rate"` // requests per second
	BrokerBurst              int  `json:"broker_burst"`
	EnableConsumerRateLimit  bool `json:"enable_consumer_rate_limit"`
	ConsumerRate             int  `json:"consumer_rate"`
	ConsumerBurst            int  `json:"consumer_burst"`
	EnablePublisherRateLimit bool `json:"enable_publisher_rate_limit"`
	PublisherRate            int  `json:"publisher_rate"`
	PublisherBurst           int  `json:"publisher_burst"`
	EnablePerQueueRateLimit  bool `json:"enable_per_queue_rate_limit"`
	PerQueueRate             int  `json:"per_queue_rate"`
	PerQueueBurst            int  `json:"per_queue_burst"`
}

RateLimitConfig contains rate limiting configuration

type RateLimiter added in v0.0.11

type RateLimiter struct {
	C chan struct{}
	// contains filtered or unexported fields
}

RateLimiter implementation

func NewRateLimiter added in v0.0.11

func NewRateLimiter(rate int, burst int) *RateLimiter

NewRateLimiter creates a new RateLimiter with the specified rate and burst.

func (*RateLimiter) Stop added in v0.0.12

func (rl *RateLimiter) Stop()

Stop terminates the rate limiter's internal goroutine.

func (*RateLimiter) Update added in v0.0.12

func (rl *RateLimiter) Update(newRate, newBurst int)

Update allows dynamic adjustment of rate and burst at runtime. It immediately applies the new settings.

func (*RateLimiter) Wait added in v0.0.11

func (rl *RateLimiter) Wait()

Wait blocks until a token is available.

type Result

type Result struct {
	CreatedAt       time.Time       `json:"created_at"`
	ProcessedAt     time.Time       `json:"processed_at,omitempty"`
	Latency         string          `json:"latency"`
	Error           error           `json:"-"` // Keep error as an error type
	Topic           string          `json:"topic"`
	TaskID          string          `json:"task_id"`
	Status          Status          `json:"status"`
	ConditionStatus string          `json:"condition_status"`
	Ctx             context.Context `json:"-"`
	Payload         json.RawMessage `json:"payload"`
	Last            bool
}

func HandleError

func HandleError(ctx context.Context, err error, status ...Status) Result

func (Result) MarshalJSON

func (r Result) MarshalJSON() ([]byte, error)

func (Result) Unmarshal

func (r Result) Unmarshal(data any) error

func (*Result) UnmarshalJSON

func (r *Result) UnmarshalJSON(data []byte) error

func (Result) WithData

func (r Result) WithData(status Status, data []byte) Result

type Schedule

type Schedule struct {
	TimeOfDay  time.Time
	CronSpec   string
	DayOfWeek  []time.Weekday
	DayOfMonth []int
	Interval   time.Duration
	Recurring  bool
}

func (*Schedule) ToHumanReadable

func (s *Schedule) ToHumanReadable() string

type ScheduleOptions

type ScheduleOptions struct {
	Handler      Handler
	Callback     Callback
	Interval     time.Duration
	Overlap      bool
	Recurring    bool
	ScheduleSpec string
}

type ScheduledTask

type ScheduledTask struct {
	// contains filtered or unexported fields
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(pool *Pool, opts ...SchedulerOpt) *Scheduler

func (*Scheduler) AddTask

func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) string

func (*Scheduler) Close added in v0.0.16

func (s *Scheduler) Close() error

func (*Scheduler) ListScheduledTasks added in v0.0.16

func (s *Scheduler) ListScheduledTasks() []TaskInfo

func (*Scheduler) PrintAllTasks

func (s *Scheduler) PrintAllTasks()

func (*Scheduler) PrintExecutionHistory

func (s *Scheduler) PrintExecutionHistory(id string) error

func (*Scheduler) RemoveTask

func (s *Scheduler) RemoveTask(id string) error

func (*Scheduler) Start

func (s *Scheduler) Start()

func (*Scheduler) UpdateTask added in v0.0.16

func (s *Scheduler) UpdateTask(id string, newSched *Schedule) error

type SchedulerConfig

type SchedulerConfig struct {
	Callback   Callback
	Overlap    bool
	MaxRetries int
}

type SchedulerOpt added in v0.0.16

type SchedulerOpt func(*Scheduler)

func WithStorage added in v0.0.16

func WithStorage(sm storage.IMap[string, *ScheduledTask]) SchedulerOpt

type SchedulerOption

type SchedulerOption func(*ScheduleOptions)

func WithInterval

func WithInterval(interval time.Duration) SchedulerOption

func WithOverlap

func WithOverlap() SchedulerOption

func WithRecurring

func WithRecurring() SchedulerOption

func WithScheduleSpec added in v0.0.16

func WithScheduleSpec(spec string) SchedulerOption

func WithSchedulerCallback

func WithSchedulerCallback(callback Callback) SchedulerOption

func WithSchedulerHandler

func WithSchedulerHandler(handler Handler) SchedulerOption

type SecurityConfig added in v0.0.16

type SecurityConfig struct {
	EnableTLS             bool          `json:"enable_tls"`
	TLSCertPath           string        `json:"tls_cert_path"`
	TLSKeyPath            string        `json:"tls_key_path"`
	TLSCAPath             string        `json:"tls_ca_path"`
	TLSInsecureSkipVerify bool          `json:"tls_insecure_skip_verify"`
	EnableAuthentication  bool          `json:"enable_authentication"`
	AuthenticationMethod  string        `json:"authentication_method"` // "basic", "jwt", "oauth"
	EnableAuthorization   bool          `json:"enable_authorization"`
	EnableEncryption      bool          `json:"enable_encryption"`
	EncryptionKey         string        `json:"encryption_key"`
	EnableAuditLog        bool          `json:"enable_audit_log"`
	AuditLogPath          string        `json:"audit_log_path"`
	SessionTimeout        time.Duration `json:"session_timeout"`
	MaxLoginAttempts      int           `json:"max_login_attempts"`
	LockoutDuration       time.Duration `json:"lockout_duration"`
}

SecurityConfig contains security-related configuration

func (*SecurityConfig) UnmarshalJSON added in v0.0.16

func (s *SecurityConfig) UnmarshalJSON(data []byte) error

type Status added in v0.0.2

type Status string
const (
	Pending    Status = "Pending"
	Processing Status = "Processing"
	Completed  Status = "Completed"
	Failed     Status = "Failed"
	Cancelled  Status = "Cancelled"
)

type StoredMessage added in v0.0.16

type StoredMessage struct {
	ID        string                 `json:"id"`
	Queue     string                 `json:"queue"`
	Payload   []byte                 `json:"payload"`
	Headers   map[string]string      `json:"headers,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
	Priority  int                    `json:"priority"`
	CreatedAt time.Time              `json:"created_at"`
	ExpiresAt *time.Time             `json:"expires_at,omitempty"`
	Attempts  int                    `json:"attempts"`
}

StoredMessage represents a message stored in the message store

type SystemHealthChecker added in v0.0.16

type SystemHealthChecker struct {
	// contains filtered or unexported fields
}

SystemHealthChecker monitors system health

func NewSystemHealthChecker added in v0.0.16

func NewSystemHealthChecker(logger logger.Logger) *SystemHealthChecker

NewSystemHealthChecker creates a new system health checker

func (*SystemHealthChecker) GetOverallHealth added in v0.0.16

func (shc *SystemHealthChecker) GetOverallHealth() HealthStatus

GetOverallHealth returns the overall system health

func (*SystemHealthChecker) RegisterCheck added in v0.0.16

func (shc *SystemHealthChecker) RegisterCheck(check HealthCheck)

RegisterCheck registers a health check

func (*SystemHealthChecker) RunChecks added in v0.0.16

func (shc *SystemHealthChecker) RunChecks(ctx context.Context) map[string]*HealthCheckResult

RunChecks runs all health checks

type TLSConfig

type TLSConfig struct {
	CertPath string
	KeyPath  string
	CAPath   string
	UseTLS   bool
}

type Task

type Task struct {
	CreatedAt   time.Time       `json:"created_at"`
	ProcessedAt time.Time       `json:"processed_at"`
	Expiry      time.Time       `json:"expiry"`
	Error       error           `json:"-"`               // Don't serialize errors directly
	ErrorMsg    string          `json:"error,omitempty"` // Serialize error message if present
	ID          string          `json:"id"`
	Topic       string          `json:"topic"`
	Status      Status          `json:"status"` // Use Status type instead of string
	Payload     json.RawMessage `json:"payload"`
	Priority    int             `json:"priority,omitempty"`
	Retries     int             `json:"retries,omitempty"`
	MaxRetries  int             `json:"max_retries,omitempty"`

	// Enhanced deduplication and tracing
	DedupKey string            `json:"dedup_key,omitempty"`
	TraceID  string            `json:"trace_id,omitempty"`
	SpanID   string            `json:"span_id,omitempty"`
	Tags     map[string]string `json:"tags,omitempty"`
	Headers  map[string]string `json:"headers,omitempty"`
	// contains filtered or unexported fields
}

func NewTask

func NewTask(id string, payload json.RawMessage, nodeKey string, opts ...TaskOption) *Task

func (*Task) AddHeader added in v0.0.16

func (t *Task) AddHeader(key, value string)

AddHeader adds a header to the task

func (*Task) AddTag added in v0.0.16

func (t *Task) AddTag(key, value string)

AddTag adds a tag to the task

func (*Task) CanRetry added in v0.0.16

func (t *Task) CanRetry() bool

CanRetry checks if the task can be retried

func (*Task) GetError added in v0.0.16

func (t *Task) GetError() error

GetError returns the error if present

func (*Task) GetFlow added in v0.0.10

func (t *Task) GetFlow() any

func (*Task) IncrementRetry added in v0.0.16

func (t *Task) IncrementRetry()

IncrementRetry increments the retry count

func (*Task) IsExpired added in v0.0.16

func (t *Task) IsExpired() bool

IsExpired checks if the task has expired

func (*Task) SetError added in v0.0.16

func (t *Task) SetError(err error)

SetError sets the error and updates the error message

type TaskInfo added in v0.0.16

type TaskInfo struct {
	TaskID      string    `json:"task_id"`
	NextRunTime time.Time `json:"next_run_time"`
}

type TaskOption added in v0.0.10

type TaskOption func(*Task)

TaskOption defines a function type for setting options.

func WithDAG added in v0.0.10

func WithDAG(dag any) TaskOption

func WithDedupKey added in v0.0.16

func WithDedupKey(key string) TaskOption

new TaskOption for deduplication:

func WithExpiry added in v0.0.16

func WithExpiry(expiry time.Time) TaskOption

TaskOption for setting expiry time

func WithPriority added in v0.0.16

func WithPriority(priority int) TaskOption

TaskOption for setting priority

func WithTTL added in v0.0.16

func WithTTL(ttl time.Duration) TaskOption

TaskOption for setting TTL (time to live)

func WithTags added in v0.0.16

func WithTags(tags map[string]string) TaskOption

TaskOption for adding tags

func WithTaskHeaders added in v0.0.16

func WithTaskHeaders(headers map[string]string) TaskOption

TaskOption for adding headers

func WithTaskMaxRetries added in v0.0.16

func WithTaskMaxRetries(maxRetries int) TaskOption

TaskOption for setting max retries

func WithTraceID added in v0.0.16

func WithTraceID(traceID string) TaskOption

TaskOption for setting trace ID

type TaskStorage

type TaskStorage interface {
	SaveTask(task *QueueTask) error
	GetTask(taskID string) (*QueueTask, error)
	DeleteTask(taskID string) error
	GetAllTasks() ([]*QueueTask, error)
	FetchNextTask() (*QueueTask, error)
	CleanupExpiredTasks() error
}

type ThresholdConfig added in v0.0.11

type ThresholdConfig struct {
	HighMemory    int64
	LongExecution time.Duration
}

type TimeSeries added in v0.0.16

type TimeSeries struct {
	Name        string            `json:"name"`
	Type        MetricType        `json:"type"`
	Description string            `json:"description"`
	Labels      map[string]string `json:"labels"`
	Values      []TimeSeriesPoint `json:"values"`
	MaxPoints   int               `json:"max_points"`
	// contains filtered or unexported fields
}

TimeSeries represents a time series metric

type TimeSeriesPoint added in v0.0.16

type TimeSeriesPoint struct {
	Timestamp time.Time `json:"timestamp"`
	Value     float64   `json:"value"`
}

TimeSeriesPoint represents a single point in a time series

type WarningThresholds added in v0.0.11

type WarningThresholds struct {
	HighMemory    int64         // in bytes
	LongExecution time.Duration // threshold duration
}

WarningThresholds defines thresholds for warnings.

Directories

Path Synopsis
apperror/apperror.go
apperror/apperror.go
context_keys.go
context_keys.go
main.go
main.go
internal
phuslulog.go
phuslulog.go

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL