Versions in this module Expand all Collapse all v0 v0.1.0 Apr 22, 2026 Changes in this version + const DeadLetterContentType + const ErrorContentType + const EventConnect + const EventDisconnect + const EventPublishFailed + const EventSubscribeFailed + const EventTimeout + var ErrAlreadyConnected = errors.New("broker is already connected") + var ErrClosed = errors.New("broker is closed") + var ErrInvalidConfig = errors.New("invalid configuration") + var ErrNoReplyTo = errors.New("no reply-to destination specified") + func AvailableBackends() []string + func EncodeErrorPayload(payload ErrorPayload) ([]byte, error) + func IsBackendAvailable(name string) bool + func IsCircuitOpen(err error) bool + func IsConnectionLost(err error) bool + func IsNotConnected(err error) bool + func IsTimeout(err error) bool + func IsUnknownBackend(err error) bool + func IsUnsupportedOperation(err error) bool + func Register(name string, factory BrokerFactory) + func RetryAttempt(ctx context.Context) int + type AMQPConfig struct + Exchange string + ExchangeType string + Heartbeat time.Duration + Host string + Password string + Port int + QueueAutoDelete bool + QueueDurable bool + QueueExclusive bool + TLS *TLSConfig + Username string + VHost string + func DefaultAMQPConfig() *AMQPConfig + type BackoffStrategy func(attempt int) time.Duration + func ExponentialBackoff(initialDelay, maxDelay time.Duration, multiplier float64) BackoffStrategy + func FixedBackoff(delay time.Duration) BackoffStrategy + type BrokerFactory func(config *Config) (MessageBroker, error) + type Caller interface + Call func(ctx context.Context, destination string, message *Message, ...) (*Message, error) + type Client interface + type Config struct + AMQP *AMQPConfig + Backend string + ConnectionName string + ConnectionRetry int + EventHook EventHook + HealthHook HealthHook + HealthReporter HealthReporter + Host string + Kafka *KafkaConfig + Logger EventLogger + Metrics MetricsHook + Password string + Port int + RetryDelay time.Duration + Tracing TracingHook + Username string + VHost string + func DefaultConfig() *Config + func (c *Config) EmitCounter(name string, value float64, labels map[string]string) + func (c *Config) EmitDuration(name string, value time.Duration, labels map[string]string) + func (c *Config) EmitEvent(ctx context.Context, event Event) + func (c *Config) EmitHealth(ctx context.Context, report HealthReport) + func (c *Config) StartSpan(ctx context.Context, span TraceSpanStart) (context.Context, TraceSpan) + func (c *Config) WithAMQP(amqp *AMQPConfig) *Config + func (c *Config) WithBackend(backend string) *Config + func (c *Config) WithKafka(kafka *KafkaConfig) *Config + type Connector interface + Backend func() string + Close func() error + Connect func(ctx context.Context) error + IsConnected func() bool + IsRecovering func() bool + type DeadLetterEnvelope struct + Attempt int + Backend string + Destination string + Error ErrorPayload + FailedAt time.Time + Handler string + Original DeadLetterMessage + func DecodeDeadLetterMessage(msg *Message) (DeadLetterEnvelope, error) + func NewDeadLetterEnvelope(msg *Message, err error, opts DeadLetterOptions) DeadLetterEnvelope + func (d DeadLetterEnvelope) Message() (*Message, error) + type DeadLetterMessage struct + Body []byte + ContentType string + CorrelationID string + Headers map[string]string + MessageID string + Offset int64 + Partition int32 + ReplyTo string + Subject string + Timestamp time.Time + type DeadLetterOptions struct + Attempt int + Backend string + Code string + Destination string + Details map[string]any + Handler string + Retryable bool + type ErrCircuitOpen struct + Operation string + RetryAfter string + func (e *ErrCircuitOpen) Error() string + type ErrConnectionFailed struct + Address string + Backend string + Cause error + func (e *ErrConnectionFailed) Error() string + func (e *ErrConnectionFailed) Unwrap() error + type ErrConnectionLost struct + Backend string + Cause error + func (e *ErrConnectionLost) Error() string + func (e *ErrConnectionLost) Unwrap() error + type ErrNotConnected struct + Backend string + func (e *ErrNotConnected) Error() string + type ErrPublishFailed struct + Backend string + Cause error + Destination string + func (e *ErrPublishFailed) Error() string + func (e *ErrPublishFailed) Unwrap() error + type ErrSubscribeFailed struct + Backend string + Cause error + Destination string + func (e *ErrSubscribeFailed) Error() string + func (e *ErrSubscribeFailed) Unwrap() error + type ErrTimeout struct + Duration string + Operation string + func (e *ErrTimeout) Error() string + type ErrUnknownBackend struct + Backend string + func (e *ErrUnknownBackend) Error() string + type ErrUnsupportedOperation struct + Backend string + Operation string + func (e *ErrUnsupportedOperation) Error() string + type ErrorPayload struct + Code string + CorrelationID string + Details map[string]any + Message string + Retryable bool + Timestamp time.Time + func DecodeErrorMessage(msg *Message) (ErrorPayload, error) + func DecodeErrorPayload(data []byte) (ErrorPayload, error) + func NewErrorPayload(code string, err error) ErrorPayload + type Event struct + Backend string + Component string + CorrelationID string + Destination string + Err error + Fields map[string]any + Level EventLevel + Name string + Operation string + Timestamp time.Time + type EventHook func(ctx context.Context, event Event) + type EventLevel string + const EventLevelDebug + const EventLevelError + const EventLevelInfo + const EventLevelWarn + type EventLogger interface + LogEvent func(ctx context.Context, event Event) + type Handler func(ctx context.Context, msg *Message) error + func RetryHandler(handler Handler, policy RetryPolicy) Handler + type HandlerErrorPolicy string + const HandlerErrorNoRetry + const HandlerErrorRetry + type HealthHook func(ctx context.Context, report HealthReport) + type HealthReport struct + Backend string + Component string + Connected bool + Details map[string]any + Started bool + Status HealthStatus + Timestamp time.Time + type HealthReporter interface + ReportHealth func(ctx context.Context, report HealthReport) + type HealthStatus string + const HealthStatusDegraded + const HealthStatusHealthy + const HealthStatusUnhealthy + type KafkaConfig struct + AutoOffsetReset string + Brokers []string + ClientID string + CompressionType string + ConsumerGroup string + HeartbeatInterval time.Duration + MaxRetries int + RequiredAcks int + RetryBackoff time.Duration + SASL *SASLConfig + SessionTimeout time.Duration + TLS *TLSConfig + func DefaultKafkaConfig() *KafkaConfig + type Message struct + Body []byte + ContentType string + CorrelationID string + Headers map[string]string + MessageID string + Offset int64 + Partition int32 + ReplyTo string + Subject string + Timestamp time.Time + func NewErrorMessage(payload ErrorPayload) (*Message, error) + func NewMessage(body []byte) *Message + func NewTextMessage(body string) *Message + func (m *Message) BodyString() string + func (m *Message) Clone() *Message + func (m *Message) GetHeader(key string) string + func (m *Message) WithContentType(contentType string) *Message + func (m *Message) WithCorrelationID(id string) *Message + func (m *Message) WithHeader(key, value string) *Message + func (m *Message) WithReplyTo(replyTo string) *Message + func (m *Message) WithSubject(subject string) *Message + type MessageBroker interface + func MustNew(config *Config) MessageBroker + func New(config *Config) (MessageBroker, error) + func NewWithBackend(backend string, config *Config) (MessageBroker, error) + type MetricsHook interface + IncrementCounter func(name string, value float64, labels map[string]string) + ObserveDuration func(name string, value time.Duration, labels map[string]string) + type PublishOption func(*PublishOptions) + func WithExchange(exchange string) PublishOption + func WithExpiration(ttl string) PublishOption + func WithKey(key string) PublishOption + func WithMandatory() PublishOption + func WithPartition(partition int) PublishOption + func WithPersistent() PublishOption + func WithPriority(priority uint8) PublishOption + func WithTimeout(timeout time.Duration) PublishOption + type PublishOptions struct + DeliveryMode uint8 + Exchange string + Expiration string + Immediate bool + Key string + Mandatory bool + Partition int + Priority uint8 + Timeout time.Duration + func ApplyPublishOptions(opts ...PublishOption) *PublishOptions + type Publisher interface + Publish func(ctx context.Context, destination string, message *Message, ...) error + type RetryHook func(ctx context.Context, attempt int, err error, delay time.Duration) + type RetryPolicy struct + Backoff BackoffStrategy + MaxAttempts int + OnRetry RetryHook + Retryable RetryPredicate + func DefaultRetryPolicy() RetryPolicy + type RetryPredicate func(error) bool + type SASLConfig struct + Enable bool + Mechanism string + Password string + Username string + type Server interface + type SubscribeOption func(*SubscribeOptions) + func WithAutoAck() SubscribeOption + func WithConsumerGroup(group string) SubscribeOption + func WithConsumerTag(tag string) SubscribeOption + func WithExclusive() SubscribeOption + func WithHandlerErrorNoRetry() SubscribeOption + func WithHandlerErrorRetry() SubscribeOption + func WithPrefetchCount(count int) SubscribeOption + func WithQueueBind(exchange, routingKey string) SubscribeOption + func WithStartFromBeginning() SubscribeOption + type SubscribeOptions struct + AutoAck bool + ConsumerGroup string + ConsumerTag string + Exchange string + Exclusive bool + HandlerErrorPolicy HandlerErrorPolicy + PrefetchCount int + QueueBind bool + RoutingKey string + StartFromBeginning bool + func ApplySubscribeOptions(opts ...SubscribeOption) *SubscribeOptions + type Subscriber interface + Subscribe func(ctx context.Context, destination string, handler Handler, ...) error + type TLSConfig struct + CAFile string + CertFile string + Enable bool + InsecureSkipVerify bool + KeyFile string + type TraceSpan interface + Finish func(finish TraceSpanFinish) + type TraceSpanFinish struct + Attributes map[string]string + Err error + type TraceSpanStart struct + Attributes map[string]string + Backend string + Component string + CorrelationID string + Destination string + Name string + Operation string + type TracingHook interface + StartSpan func(ctx context.Context, span TraceSpanStart) (context.Context, TraceSpan)