eamqp

package module
v0.1.1
Published: Apr 20, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

README

eamqp - RabbitMQ Component for Ego Framework

eamqp is a RabbitMQ client component for the Ego framework. It keeps the official amqp091-go connection/channel model visible, while adding Ego-style configuration loading and injectable logging/metrics hooks.

See CAPABILITY_MATRIX.md for the current support matrix and the boundary between the base AMQP wrapper, Ego observability, and future consumer lifecycle work.

Features

  • AMQP 0-9-1 support through thin amqp091-go wrappers and raw accessors
  • Ego config-file loading with eamqp.Load(...).Build()
  • Connection pooling and channel pooling
  • Reconnect policy primitives and manual reconnect
  • Publisher confirms for reliable message delivery
  • TLS/SSL support (programmatic and file-based certificates)
  • Multiple URIs for basic load balancing
  • Ego elog/emetric adapters when loaded through Load(...).Build()
  • AMQP header trace propagation helpers
  • Lightweight health status and ping checks
  • High-level producer and consumer helpers

Installation

go get github.com/ego-component/eamqp

Quick Start

package main

import (
	"log"

	"github.com/ego-component/eamqp"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// Create client.
	client, err := eamqp.New(eamqp.Config{
		Addr: "amqp://guest:guest@localhost:5672/",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create channel.
	ch, err := client.NewChannel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	// Declare exchange and queue.
	if err := ch.ExchangeDeclare("my-exchange", "topic", true, false, false, false, nil); err != nil {
		log.Fatal(err)
	}

	q, err := ch.QueueDeclare("my-queue", true, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	if err := ch.QueueBind(q.Name, "order.*", "my-exchange", false, nil); err != nil {
		log.Fatal(err)
	}

	// Publish a message.
	err = ch.Publish("my-exchange", "order.created", false, false, amqp.Publishing{
		ContentType: "application/json",
		Body:        []byte(`{"order_id": 123}`),
	})
	if err != nil {
		log.Fatal(err)
	}
}

Configuration

For Ego config-file usage with eamqp.Load("amqp.default").Build(), see CONFIGURATION.md.

config := eamqp.Config{
	// Connection URI(s). Multiple URIs separated by comma enable load balancing.
	Addr: "amqp://guest:guest@localhost:5672/",

	// TLS configuration.
	TLSConfig: &tls.Config{ServerName: "localhost"},

	// Or use file-based TLS.
	TLSCertFile: "/path/to/cert.pem",
	TLSKeyFile:  "/path/to/key.pem",
	TLSCACert:   "/path/to/ca.pem",

	// Tuning.
	Heartbeat: 10 * time.Second,
	ChannelMax: 0,   // 0 = server default
	FrameSize:  0,   // 0 = server default

	// Connection pool.
	PoolSize:    1,   // Number of connections
	PoolMaxIdle: 2,
	PoolMaxLife: time.Hour,

	// Channel pool (per connection).
	ChannelPoolSize:    1,
	ChannelPoolMaxIdle: 2,
	ChannelPoolMaxLife: 5 * time.Minute,

	// Explicit reconnect helper policy. No background topology/consumer
	// supervisor is started by this component.
	ReconnectInterval:    5 * time.Second,
	ReconnectMaxAttempts: 0,

	// Observability.
	EnableAccessInterceptor: false,
	EnableMetricInterceptor: true,
	EnableTraceInterceptor:  true,
}

Connection Pooling

// Enable connection pooling for high availability.
client, err := eamqp.New(eamqp.Config{
	// Multiple URIs enable basic load balancing across connections.
	Addr: "amqp://localhost:5672,amqp://localhost:5673",
	PoolSize: 2,  // Creates 2 connections
})

Channel Pooling

// Enable channel pooling for efficient channel reuse.
client, err := eamqp.New(eamqp.Config{
	Addr: "amqp://localhost:5672",
	ChannelPoolSize: 10,  // Pool of 10 channels per connection
})

Channel wraps the official amqp091-go channel. Wrapper methods only protect eamqp state briefly; AMQP operation serialization is delegated to amqp091-go. If you use RawChannel(), keep the official rule: a raw channel is not safe to share across goroutines without your own synchronization. Pooled channels that enter stateful modes such as confirm, QoS, transactions, consumption, notify listeners, or raw access are closed on Close() instead of being returned to the idle pool.
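
The return-or-discard rule in the paragraph above can be pictured with a toy pool. This is a minimal sketch using only the standard library, not eamqp's implementation; toyChannel, toyPool, and markStateful are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// toyChannel stands in for a pooled channel wrapper. (Hypothetical type.)
type toyChannel struct {
	id       int
	stateful bool // set once confirm, QoS, tx, consume, notify, or raw access is used
	closed   bool
}

// markStateful records that the channel entered a mode that is unsafe to reuse.
func (c *toyChannel) markStateful() { c.stateful = true }

// toyPool keeps at most maxIdle idle channels. (Hypothetical type.)
type toyPool struct {
	idle    []*toyChannel
	maxIdle int
	nextID  int
}

// acquire reuses an idle channel when one is available, otherwise opens a new one.
func (p *toyPool) acquire() *toyChannel {
	if n := len(p.idle); n > 0 {
		ch := p.idle[n-1]
		p.idle = p.idle[:n-1]
		return ch
	}
	p.nextID++
	return &toyChannel{id: p.nextID}
}

// release returns clean channels to the idle list and closes stateful ones
// (or any channel beyond maxIdle). It reports whether the channel was pooled.
func (p *toyPool) release(ch *toyChannel) bool {
	if ch.stateful || len(p.idle) >= p.maxIdle {
		ch.closed = true
		return false
	}
	p.idle = append(p.idle, ch)
	return true
}

func main() {
	pool := &toyPool{maxIdle: 2}

	clean := pool.acquire()
	fmt.Println("clean channel pooled:", pool.release(clean))

	confirming := pool.acquire() // reuses the clean channel
	confirming.markStateful()    // e.g. after enabling publisher confirms
	fmt.Println("stateful channel pooled:", pool.release(confirming))
}
```

The practical consequence is that a channel used for confirms or consumption costs a fresh channel open on the next acquire, so long-lived consumers are usually better served by a dedicated channel than by the pool.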

Publisher Confirms

// Enable confirms for reliable publishing.
if err := ch.Confirm(false); err != nil {
	log.Fatal(err)
}

confirms := ch.NotifyPublish()
// Always consume NotifyPublish/NotifyClose style channels until they close.

// Publish with confirmation.
err = ch.Publish("exchange", "key", false, false, amqp.Publishing{
	Body: []byte("message"),
})
if err != nil {
	log.Fatal(err)
}

// Wait for confirmation.
confirm := <-confirms
if !confirm.Ack {
	log.Printf("Message rejected")
}
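
The bare receive above blocks until a confirmation arrives. A bounded wait can be sketched with select; the example below uses only the standard library, and the confirmation struct is a local stand-in that mirrors the DeliveryTag and Ack fields of amqp091-go's Confirmation. The function name waitConfirm and the error values are assumptions made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// confirmation mirrors the two fields of amqp091-go's Confirmation type.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

var (
	errNacked         = errors.New("broker nacked the message")
	errConfirmClosed  = errors.New("confirm channel closed before confirmation")
	errConfirmTimeout = errors.New("timed out waiting for confirmation")
)

// waitConfirm waits for one confirmation, a closed channel, or the timeout,
// whichever comes first.
func waitConfirm(confirms <-chan confirmation, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case c, ok := <-confirms:
		if !ok {
			return errConfirmClosed // channel or connection went away
		}
		if !c.Ack {
			return errNacked
		}
		return nil
	case <-timer.C:
		return errConfirmTimeout
	}
}

func main() {
	confirms := make(chan confirmation, 1)
	confirms <- confirmation{DeliveryTag: 1, Ack: true}
	fmt.Println("acked:", waitConfirm(confirms, time.Second))

	close(confirms)
	fmt.Println("closed:", waitConfirm(confirms, time.Second))
}
```

Checking the second receive value matters: a receive from a closed channel yields a zero-value confirmation with Ack set to false, which would otherwise be misread as a nack.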

Producer Helper

// Create producer with confirms.
producer, err := eamqp.NewProducer(ch, eamqp.WithConfirm(5*time.Second))
if err != nil {
	log.Fatal(err)
}

// Publish with automatic confirm waiting.
err = producer.Publish("exchange", "key", amqp.Publishing{
	Body: []byte("message"),
})

Consumer Helper

// Create consumer.
consumer := eamqp.NewConsumer(ch, "my-queue",
	eamqp.WithConsumerAutoAck(),  // Enable auto-ack
)

// Start consuming.
deliveries, err := consumer.Consume("consumer-1")
if err != nil {
	log.Fatal(err)
}

for delivery := range deliveries {
	// With auto-ack enabled the broker has already acknowledged the
	// delivery, so delivery.Ack must not be called here.
	fmt.Println(string(delivery.Body))
}

Exchange Types

// Direct exchange - routing by exact key match.
ch.ExchangeDeclare("direct-exchange", eamqp.ExchangeDirect, true, false, false, false, nil)

// Fanout exchange - routing to all bound queues.
ch.ExchangeDeclare("fanout-exchange", eamqp.ExchangeFanout, true, false, false, false, nil)

// Topic exchange - routing by pattern matching.
ch.ExchangeDeclare("topic-exchange", eamqp.ExchangeTopic, true, false, false, false, nil)

// Headers exchange - routing by header values.
ch.ExchangeDeclare("headers-exchange", eamqp.ExchangeHeaders, true, false, false, false, nil)

Queue Types

// Classic queue.
if _, err := ch.QueueDeclare("my-classic-queue", true, false, false, false, nil); err != nil {
	log.Fatal(err)
}

// Quorum queue (recommended for most use cases).
if _, err := ch.QueueDeclare("my-quorum-queue", true, false, false, false, amqp.Table{
	eamqp.QueueTypeArg: eamqp.QueueTypeQuorum,
}); err != nil {
	log.Fatal(err)
}

// Stream queue (for replay-style scenarios).
if _, err := ch.QueueDeclare("my-stream-queue", true, false, false, false, amqp.Table{
	eamqp.QueueTypeArg: eamqp.QueueTypeStream,
}); err != nil {
	log.Fatal(err)
}
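
Queue arguments are plain key/value pairs (amqp.Table is a map[string]interface{}), so they can be assembled by a small helper before being passed to QueueDeclare. The sketch below uses only the standard library and copies the key and value strings from the eamqp constants listed in this document; boundedQueueArgs is a hypothetical helper name, not part of the package.

```go
package main

import "fmt"

// Argument keys and values copied from the eamqp constants in this document.
const (
	queueTypeArg     = "x-queue-type"
	queueMaxLenArg   = "x-max-length"
	queueOverflowArg = "x-overflow"

	queueTypeQuorum            = "quorum"
	queueOverflowRejectPublish = "reject-publish"
)

// boundedQueueArgs builds declaration arguments for a length-limited queue.
// maxLen <= 0 means "no length limit"; in that case no overflow policy is set.
func boundedQueueArgs(queueType string, maxLen int, overflow string) map[string]any {
	args := map[string]any{queueTypeArg: queueType}
	if maxLen > 0 {
		args[queueMaxLenArg] = maxLen
		if overflow != "" {
			args[queueOverflowArg] = overflow
		}
	}
	return args
}

func main() {
	args := boundedQueueArgs(queueTypeQuorum, 10000, queueOverflowRejectPublish)
	fmt.Println(args)
}
```

In real code the returned map would be converted with amqp.Table(args) and passed as the last argument of ch.QueueDeclare.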

Error Handling

// Errors are wrapped with context.
err := ch.Publish("exchange", "key", false, false, amqp.Publishing{Body: []byte("msg")})

// Check if error is retryable.
if err != nil {
	var e *eamqp.Error
	if errors.As(err, &e) && e.IsRetryable() {
		// Can retry
	}
}
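
The retryable check above is typically wrapped in a bounded retry loop. The following is a minimal sketch under stated assumptions: it relies only on an IsRetryable() bool method, which is the one method of eamqp.Error used in this document, and it exercises the loop with a stand-in fakeError type. publishWithRetry and the doubling backoff are illustrative choices, not part of eamqp.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryable is the one capability this sketch needs from an error value.
type retryable interface {
	error
	IsRetryable() bool
}

// fakeError is a stand-in error type used only to exercise the sketch.
type fakeError struct{ retry bool }

func (e *fakeError) Error() string     { return "fake amqp error" }
func (e *fakeError) IsRetryable() bool { return e.retry }

// publishWithRetry calls publish until it succeeds, returns a non-retryable
// error, or maxAttempts is reached. It returns the number of attempts made.
func publishWithRetry(publish func() error, maxAttempts int, backoff time.Duration) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = publish(); err == nil {
			return attempt, nil
		}
		var r retryable
		if !errors.As(err, &r) || !r.IsRetryable() {
			return attempt, err // permanent failure: do not retry
		}
		if attempt < maxAttempts {
			time.Sleep(backoff)
			backoff *= 2 // simple exponential backoff
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	attempts, err := publishWithRetry(func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("publish: %w", &fakeError{retry: true})
		}
		return nil
	}, 5, 10*time.Millisecond)
	fmt.Println(attempts, err)
}
```

Retrying a publish can produce duplicates if the first attempt reached the broker, so consumers should be idempotent or messages should carry a deduplication key.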

Logging and Metrics

Load(...).Build() injects Ego elog, emetric, and trace adapters according to enableAccessInterceptor, enableMetricInterceptor, and enableTraceInterceptor. Direct New(...) usage can inject custom hooks explicitly:

// Implement the Logger interface.
type MyLogger struct{}

func (l *MyLogger) Debug(msg string, keyvals ...any) { /* ... */ }
func (l *MyLogger) Info(msg string, keyvals ...any)  { /* ... */ }
func (l *MyLogger) Warn(msg string, keyvals ...any)  { /* ... */ }
func (l *MyLogger) Error(msg string, keyvals ...any) { /* ... */ }

// Implement the MetricsCollector interface.
type MyMetrics struct{}

func (m *MyMetrics) RecordPublishLatency(d time.Duration) { /* ... */ }
func (m *MyMetrics) RecordMessageConsumed(size int)       { /* ... */ }
// ... implement all methods

client, err := eamqp.New(cfg,
	eamqp.WithLogger(&MyLogger{}),
	eamqp.WithMetrics(&MyMetrics{}),
)

Trace Headers

PublishWithContext injects the active Ego/OpenTelemetry trace context into AMQP headers when a global Ego tracer is registered. With amqp091-go v1.9.0, the context should not be treated as a guaranteed publish timeout or cancellation mechanism. Consumer code can extract the context explicitly from a delivery:

ctx = eamqp.ExtractTraceContext(ctx, delivery.Headers)

For custom publishing paths, use:

msg.Headers = eamqp.InjectTraceHeaders(ctx, msg.Headers)

Health Checks

HealthStatus() is a lightweight in-memory check based on the current connection or connection pool state. Ping(ctx) opens and closes an AMQP channel, so use it when readiness must prove that RabbitMQ is accepting channel operations:

if err := client.Ping(ctx); err != nil {
	return err
}

health := client.HealthStatus()
if !client.Health() {
	log.Printf("amqp unhealthy: %s", health.Reason)
}

RabbitMQ blocked/unblocked events are still exposed separately through NotifyBlocked(). Reconnect() is explicit; this component does not silently rebuild channels, consumers, or topology in the background.

Raw AMQP Access

The wrapper exposes raw accessors so uncommon amqp091-go features are not blocked by eamqp:

conn := client.RawConnection()
rawCh := ch.RawChannel()

Examples

Examples use Ego-style config loading through eamqp.Load(...).Build(). Start RabbitMQ locally, then run examples with the provided config file:

go run ./examples/producer --config=examples/config/local.toml
go run ./examples/consumer --config=examples/config/local.toml
go run ./examples/pubsub --config=examples/config/local.toml
go run ./examples/connection-pool --config=examples/config/local.toml
go run ./examples/producer-confirm --config=examples/config/local.toml
go run ./examples/batch-producer --config=examples/config/local.toml
go run ./examples/transaction --config=examples/config/local.toml
go run ./examples/workqueue-publisher --config=examples/config/local.toml
go run ./examples/workqueue-worker --config=examples/config/local.toml
go run ./examples/rpc --config=examples/config/local.toml
go run ./examples/qos --config=examples/config/local.toml publish
go run ./examples/qos --config=examples/config/local.toml inspect
go run ./examples/dead-letter --config=examples/config/local.toml publisher
go run ./examples/dead-letter --config=examples/config/local.toml inspect-dlq
go run ./examples/retry-consumer-sender --config=examples/config/local.toml
go run ./examples/pubsub-fanout --config=examples/config/local.toml publish
go run ./examples/reconnect --config=examples/config/local.toml producer

The example helper also checks EAMQP_EXAMPLE_CONFIG and, when running from the repository root, falls back to examples/config/local.toml. Role-based examples strip --config before parsing their own arguments, so either --config=examples/config/local.toml publish or publish --config=examples/config/local.toml works.

  • examples/pubsub - Publish/subscribe pattern
  • examples/producer / examples/consumer - Paired direct exchange example
  • examples/producer-confirm - High-level producer confirms
  • examples/batch-producer - Batch helper with publisher confirms
  • examples/connection-pool - Configured connection pool
  • examples/transaction - AMQP transactions
  • examples/workqueue-publisher / examples/workqueue-worker - Work queue pattern
  • examples/rpc - RPC-style communication
  • examples/qos - Consumer prefetch and queue inspection
  • examples/dead-letter - DLX and DLQ handling
  • examples/retry-consumer-listener / examples/retry-consumer-sender - Retry pattern
  • examples/pubsub-fanout - Fanout broadcast publish/subscribe
  • examples/reconnect - Close notifications and explicit reconnect boundary

Roadmap

v0.1.0

Initial production candidate:

  • Ego-style config loading with Load(...).Build() and BuildE()
  • Configurable connection, channel pool, TLS, logging, metrics, and trace options
  • Thin AMQP wrappers with raw access to amqp091-go
  • Publisher confirms, QoS, transactions, dead-letter, RPC, pub/sub, and work queue examples
  • Health checks, ping, pool stats, and /debug/amqp/stats
  • Explicit reconnect boundary without automatic topology or consumer recovery

v0.2.0

Consumer lifecycle hardening:

  • Consumer supervisor helper for common worker patterns
  • Graceful shutdown, handler panic recovery, and structured ack/nack handling
  • Consumer metrics for deliveries, ack/nack/reject, handler errors, and processing latency
  • Reconnect template that rebuilds connection, channel, topology, QoS, and consumers in application code
  • Separate producer and consumer connection examples for RabbitMQ TCP pushback isolation
  • CONSUMER_GUIDE.md with production consumption patterns

v0.3.0

Advanced operational helpers:

  • Optional topology declaration helper for applications that want a repeatable bootstrap step
  • Batch publisher confirm helper with explicit nack and retry hooks
  • RabbitMQ blocked/unblocked event handling examples
  • Refined metric labels and dashboard guidance
  • Compatibility review for newer amqp091-go releases

Architecture

Client
  ├── Connection (single mode)
  │     └── Channel
  │           ├── ExchangeDeclare/Bind
  │           ├── QueueDeclare/Bind
  │           ├── Publish
  │           └── Consume
  │
  └── ConnectionPool (multi-connection mode)
        ├── Connection 1
        │     └── ChannelPool
        │           ├── Channel
        │           └── Channel
        ├── Connection 2
        │     └── ChannelPool
        └── ...

License

BSD 3-Clause License. See LICENSE file.

Documentation

Overview

Package eamqp provides RabbitMQ integration for the Ego framework.

Constants

const (
	ExchangeDirect  = "direct"
	ExchangeFanout  = "fanout"
	ExchangeTopic   = "topic"
	ExchangeHeaders = "headers"
)

Exchange types.

const (
	// Transient messages are lost on broker restart.
	Transient uint8 = 1
	// Persistent messages survive broker restart.
	Persistent uint8 = 2
)

Delivery modes.

const (
	QueueTypeArg                 = "x-queue-type"
	QueueMaxLenArg               = "x-max-length"
	QueueMaxLenBytesArg          = "x-max-length-bytes"
	QueueOverflowArg             = "x-overflow"
	QueueMessageTTLArg           = "x-message-ttl"
	QueueTTLArg                  = "x-expires"
	StreamMaxAgeArg              = "x-max-age"
	StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes"
	QueueVersionArg              = "x-queue-version"
	ConsumerTimeoutArg           = "x-consumer-timeout"
	SingleActiveConsumerArg      = "x-single-active-consumer"
	QueueExclusiveArg            = "x-exclusive"
)

Queue argument keys.

const (
	QueueTypeClassic = "classic"
	QueueTypeQuorum  = "quorum"
	QueueTypeStream  = "stream"
)

Queue type values.

const (
	QueueOverflowDropHead         = "drop-head"
	QueueOverflowRejectPublish    = "reject-publish"
	QueueOverflowRejectPublishDLX = "reject-publish-dlx"
)

Overflow behavior values.

const (
	NeverExpire       = ""
	ImmediatelyExpire = "0"
)

Expiration constants.

const ComponentName = "eamqp"

ComponentName is kept as a short component type label for AMQP metrics.

const PackageName = "component.eamqp"

PackageName is the component identifier used by Ego logging and metrics.

Functions

func DeclareExchangeAndQueue

func DeclareExchangeAndQueue(ch *Channel, exchange, kind, queue string, routingKeys []string, durable bool) (amqp.Queue, error)

DeclareExchangeAndQueue declares an exchange and queue with binding.

func DeclarePubSub

func DeclarePubSub(ch *Channel, exchange string) (amqp.Queue, error)

DeclarePubSub declares a fanout exchange with a unique queue.

func DeclareWorkQueue

func DeclareWorkQueue(ch *Channel, name string) (amqp.Queue, error)

DeclareWorkQueue declares a durable work queue.

func ExtractTraceContext

func ExtractTraceContext(ctx context.Context, headers amqp.Table) context.Context

ExtractTraceContext extracts trace context from AMQP headers.

func InjectTraceHeaders

func InjectTraceHeaders(ctx context.Context, headers amqp.Table) amqp.Table

InjectTraceHeaders injects the current trace context into AMQP headers.

func ReconnectLoop

func ReconnectLoop(ctx context.Context, client *Client, connect func() error) error

ReconnectLoop runs a reconnection loop, calling connect on each attempt. It blocks until the context is cancelled or reconnection is disabled.

func SimpleConsume

func SimpleConsume(client *Client, queue, consumerTag string, autoAck bool) (<-chan amqp.Delivery, error)

SimpleConsume returns a delivery channel for simple consumption.

func SimplePublish

func SimplePublish(client *Client, exchange, routingKey string, body []byte) error

SimplePublish publishes a single message.

func SimpleRPC

func SimpleRPC(client *Client, exchange, routingKey, replyTo string, body []byte, timeout time.Duration) ([]byte, error)

SimpleRPC performs a simple RPC-style request/response. It is a lightweight helper: the reply is acknowledged before the response body is returned, so callers that need acknowledgement control should build on Channel.Consume directly or use a higher-level RPC abstraction.

Types

type Authentication

type Authentication struct {
	// TLS configuration
	TLS *TLSConfig
	// SASL mechanism: "plain", "external", or empty for no SASL.
	SASLMechanism string `json:"saslMechanism" toml:"saslMechanism"`
}

Authentication holds TLS and credentials configuration.

type BatchProducer

type BatchProducer struct {
	// contains filtered or unexported fields
}

BatchProducer provides batch publishing support.

func NewBatchProducer

func NewBatchProducer(ch *Channel, exchange, routingKey string, maxSize int, opts ...ProducerOption) (*BatchProducer, error)

NewBatchProducer creates a batch producer.

func (*BatchProducer) Add

func (bp *BatchProducer) Add(msg amqp.Publishing)

Add adds a message to the batch. It does not block or fail when the batch reaches maxSize; callers should use ShouldFlush after Add and call Flush when it returns true.
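
The Add, ShouldFlush, Flush sequence described above can be sketched as a loop. To keep the example runnable without a broker, it uses a toy batch type with the message simplified to []byte; toyBatch and publishAll are hypothetical names, and the assumption that ShouldFlush becomes true once maxSize messages are pending is inferred from the description above.

```go
package main

import "fmt"

// toyBatch imitates the batch contract described above: Add never blocks or
// fails, and ShouldFlush reports when maxSize has been reached.
type toyBatch struct {
	maxSize      int
	pending      [][]byte
	flushedSizes []int // size of each flushed batch, for inspection
}

func (b *toyBatch) Add(msg []byte)    { b.pending = append(b.pending, msg) }
func (b *toyBatch) ShouldFlush() bool { return len(b.pending) >= b.maxSize }
func (b *toyBatch) Size() int         { return len(b.pending) }

// Flush "publishes" the pending messages and empties the batch.
func (b *toyBatch) Flush() error {
	if len(b.pending) == 0 {
		return nil
	}
	b.flushedSizes = append(b.flushedSizes, len(b.pending))
	b.pending = b.pending[:0]
	return nil
}

// publishAll adds every message, flushing whenever the batch is full and
// once more at the end for the final partial batch.
func publishAll(b *toyBatch, msgs [][]byte) error {
	for _, m := range msgs {
		b.Add(m)
		if b.ShouldFlush() {
			if err := b.Flush(); err != nil {
				return err
			}
		}
	}
	return b.Flush()
}

func main() {
	b := &toyBatch{maxSize: 2}
	msgs := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e")}
	if err := publishAll(b, msgs); err != nil {
		fmt.Println("flush failed:", err)
		return
	}
	fmt.Println(b.flushedSizes)
}
```

The final Flush call is the step most easily forgotten: without it, a trailing partial batch stays in memory and is never published.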

func (*BatchProducer) Close

func (bp *BatchProducer) Close() error

Close closes the batch producer.

func (*BatchProducer) Flush

func (bp *BatchProducer) Flush() error

Flush publishes all batched messages.

func (*BatchProducer) ShouldFlush

func (bp *BatchProducer) ShouldFlush() bool

ShouldFlush returns true if the batch should be flushed.

func (*BatchProducer) Size

func (bp *BatchProducer) Size() int

Size returns the current batch size.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel wraps amqp.Channel with Ego integration.

func (*Channel) Ack

func (ch *Channel) Ack(tag uint64, multiple bool) error

Ack acknowledges a message.

func (*Channel) Cancel

func (ch *Channel) Cancel(consumer string, noWait bool) error

Cancel cancels a consumer.

func (*Channel) Close

func (ch *Channel) Close() error

Close closes the channel.

func (*Channel) Confirm

func (ch *Channel) Confirm(noWait bool) error

Confirm enables publisher confirms.

func (*Channel) Consume

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

Consume starts consuming messages from a queue.

func (*Channel) ConsumeWithContext

func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

ConsumeWithContext starts consuming messages with context.

func (*Channel) ExchangeBind

func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error

ExchangeBind binds an exchange to another exchange.

func (*Channel) ExchangeDeclare

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

ExchangeDeclare declares an exchange.

func (*Channel) ExchangeDeclarePassive

func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

ExchangeDeclarePassive declares an exchange (passive = check existence only).

func (*Channel) ExchangeDelete

func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error

ExchangeDelete deletes an exchange.

func (*Channel) ExchangeUnbind

func (ch *Channel) ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error

ExchangeUnbind unbinds an exchange from another exchange.

func (*Channel) Flow

func (ch *Channel) Flow(active bool) error

Flow pauses or resumes deliveries on this channel.

func (*Channel) Get

func (ch *Channel) Get(queue string, autoAck bool) (amqp.Delivery, bool, error)

Get synchronously retrieves a message from a queue.

func (*Channel) GetNextPublishSeqNo

func (ch *Channel) GetNextPublishSeqNo() uint64

GetNextPublishSeqNo returns the sequence number for the next publish.

func (*Channel) IsClosed

func (ch *Channel) IsClosed() bool

IsClosed returns true if the channel is closed.

func (*Channel) Nack

func (ch *Channel) Nack(tag uint64, multiple, requeue bool) error

Nack negatively acknowledges a message.

func (*Channel) NotifyCancel

func (ch *Channel) NotifyCancel() <-chan string

NotifyCancel returns a channel that receives consumer cancel notifications.

func (*Channel) NotifyClose

func (ch *Channel) NotifyClose() <-chan *amqp.Error

NotifyClose returns a channel that receives close notifications. The returned channel must be consumed until it is closed, matching amqp091-go's asynchronous notification contract.
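
One way to honor the "consume until closed" contract is a dedicated goroutine that ranges over the notification channel. The sketch below uses only the standard library and a plain error channel as a stand-in for the *amqp.Error channel returned by NotifyClose; watchClose is a hypothetical helper name.

```go
package main

import (
	"errors"
	"fmt"
)

// watchClose reads every value from a notification channel until it is
// closed, passing each one to handle. The returned channel is closed once
// the notification channel has been fully consumed.
func watchClose(notify <-chan error, handle func(error)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for err := range notify {
			handle(err)
		}
	}()
	return done
}

func main() {
	closeEvents := make(chan error, 1) // stand-in for a NotifyClose channel
	done := watchClose(closeEvents, func(err error) {
		fmt.Println("channel closed:", err)
	})

	closeEvents <- errors.New("connection reset")
	close(closeEvents)
	<-done // the watcher has seen the close and exited
}
```

Ranging until the channel closes, rather than reading a single value, keeps the library's notification goroutine from blocking on an unread send.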

func (*Channel) NotifyConfirm

func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64)

NotifyConfirm returns ack and nack channels for publisher confirms.

func (*Channel) NotifyFlow

func (ch *Channel) NotifyFlow() <-chan bool

NotifyFlow returns a channel that receives flow control notifications.

func (*Channel) NotifyPublish

func (ch *Channel) NotifyPublish() <-chan amqp.Confirmation

NotifyPublish returns a channel that receives publish confirmations.

func (*Channel) NotifyReturn

func (ch *Channel) NotifyReturn() <-chan amqp.Return

NotifyReturn returns a channel that receives undeliverable messages.

func (*Channel) Publish

func (ch *Channel) Publish(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error

Publish publishes a message.

func (*Channel) PublishWithContext

func (ch *Channel) PublishWithContext(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error

PublishWithContext publishes a message with context. Context currently carries trace propagation into AMQP headers; amqp091-go v1.9.0 does not guarantee publish timeout or cancellation from this context. Use publisher confirms when the caller must know whether the broker accepted the message.

func (*Channel) PublishWithDeferredConfirm

func (ch *Channel) PublishWithDeferredConfirm(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

PublishWithDeferredConfirm publishes a message and returns a deferred confirmation.

func (*Channel) PublishWithDeferredConfirmWithContext

func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

PublishWithDeferredConfirmWithContext publishes and returns a deferred confirm. Context currently carries trace propagation into AMQP headers; amqp091-go v1.9.0 also accepts the context for future cancellation semantics.

func (*Channel) Qos

func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error

Qos sets QoS (Quality of Service) parameters.

func (*Channel) QueueBind

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error

QueueBind binds a queue to an exchange.

func (*Channel) QueueDeclare

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

QueueDeclare declares a queue.

func (*Channel) QueueDeclarePassive

func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

QueueDeclarePassive declares a queue (passive = check existence only).

func (*Channel) QueueDelete

func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)

QueueDelete deletes a queue.

func (*Channel) QueueInspect

func (ch *Channel) QueueInspect(name string) (amqp.Queue, error)

QueueInspect passively inspects a queue by name.

func (*Channel) QueuePurge

func (ch *Channel) QueuePurge(name string, noWait bool) (int, error)

QueuePurge purges all messages from a queue.

func (*Channel) QueueUnbind

func (ch *Channel) QueueUnbind(name, key, exchange string, args amqp.Table) error

QueueUnbind unbinds a queue from an exchange.

func (*Channel) RawChannel

func (ch *Channel) RawChannel() *amqp.Channel

RawChannel returns the underlying AMQP channel. The wrapper treats raw access as stateful and will discard pooled channels on Close.

func (*Channel) Recover

func (ch *Channel) Recover(requeue bool) error

Recover redelivers unacknowledged messages.

func (*Channel) Reject

func (ch *Channel) Reject(tag uint64, requeue bool) error

Reject rejects a message.

func (*Channel) Tx

func (ch *Channel) Tx() error

Tx starts a transaction.

func (*Channel) TxCommit

func (ch *Channel) TxCommit() error

TxCommit commits a transaction.

func (*Channel) TxRollback

func (ch *Channel) TxRollback() error

TxRollback rolls back a transaction.

type ChannelPool

type ChannelPool struct {
	// contains filtered or unexported fields
}

ChannelPool manages a pool of AMQP channels for efficient reuse.

func (*ChannelPool) Acquire

func (p *ChannelPool) Acquire(ctx context.Context) (*amqp.Channel, func(), func(), error)

Acquire gets a channel from the pool.

func (*ChannelPool) Close

func (p *ChannelPool) Close() error

Close closes all channels in the pool.

func (*ChannelPool) IsClosed

func (p *ChannelPool) IsClosed() bool

IsClosed returns true if the pool is closed.

func (*ChannelPool) Stats

func (p *ChannelPool) Stats() PoolStats

Stats returns pool statistics.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the main AMQP client.

func LoadInstance

func LoadInstance(name string) *Client

LoadInstance returns a built client by component name.

func MustNew

func MustNew(config Config, opts ...Option) *Client

MustNew creates a new client and panics on error.

func New

func New(config Config, opts ...Option) (*Client, error)

New creates a new AMQP client.

func (*Client) AcquireChannel

func (c *Client) AcquireChannel(ctx context.Context) (*Channel, func(), error)

AcquireChannel acquires a channel from the pool (pool mode only).

func (*Client) Close

func (c *Client) Close() error

Close closes the client.

func (*Client) Config

func (c *Client) Config() *Config

Config returns the client configuration.

func (*Client) GetLogger

func (c *Client) GetLogger() Logger

GetLogger returns the logger.

func (*Client) GetMetrics

func (c *Client) GetMetrics() MetricsCollector

GetMetrics returns the metrics collector.

func (*Client) Health

func (c *Client) Health() bool

Health returns true when the client has at least one usable AMQP connection.

func (*Client) HealthStatus

func (c *Client) HealthStatus() HealthInfo

HealthStatus returns an in-memory health snapshot without opening a channel.

func (*Client) IsClosed

func (c *Client) IsClosed() bool

IsClosed returns true if the client is closed.

func (*Client) NewChannel

func (c *Client) NewChannel() (*Channel, error)

NewChannel creates a new channel.

func (*Client) NotifyBlocked

func (c *Client) NotifyBlocked() <-chan amqp.Blocking

NotifyBlocked returns RabbitMQ connection blocked/unblocked notifications.

func (*Client) NotifyClose

func (c *Client) NotifyClose() <-chan *Error

NotifyClose returns a channel that receives close notifications. The returned channel must be consumed until it is closed, matching amqp091-go's asynchronous notification contract.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping verifies that RabbitMQ accepts opening an AMQP channel.

func (*Client) RawConnection

func (c *Client) RawConnection() *amqp.Connection

RawConnection returns the underlying AMQP connection.

func (*Client) Reconnect

func (c *Client) Reconnect() error

Reconnect attempts to reconnect the client.

func (*Client) Stats

func (c *Client) Stats() PoolStats

Stats returns pool statistics.

type ClientInterface

type ClientInterface interface {
	GetLogger() Logger
	GetMetrics() MetricsCollector
}

ClientInterface defines the methods needed by Channel.

type Config

type Config struct {
	// Addr is the AMQP URI(s). Multiple URIs separated by comma enable
	// basic load balancing. Use amqps:// for TLS.
	// Environment variable: EGO_CONFIG_AMQP_ADDR
	Addr string `json:"addr" toml:"addr"`

	// Vhost overrides the virtual host from Addr. Empty keeps the URI vhost.
	Vhost string `json:"vhost" toml:"vhost"`

	// TLS configuration.
	TLSConfig *tls.Config `json:"-" toml:"-"`

	// TLSFileConfig enables file-based TLS. Alternative to TLSConfig.
	TLSCertFile   string `json:"tlsCertFile" toml:"tlsCertFile"`
	TLSKeyFile    string `json:"tlsKeyFile" toml:"tlsKeyFile"`
	TLSCACert     string `json:"tlsCaCert" toml:"tlsCaCert"`
	TLSServerName string `json:"tlsServerName" toml:"tlsServerName"`

	// Auth overrides credentials from Addr.
	Username string `json:"username" toml:"username"`
	Password string `json:"password" toml:"password"`

	// Tuning parameters.
	Heartbeat  time.Duration `json:"heartbeat" toml:"heartbeat"`
	ChannelMax uint16        `json:"channelMax" toml:"channelMax"`
	FrameSize  int           `json:"frameSize" toml:"frameSize"`
	Locale     string        `json:"locale" toml:"locale"`

	// Connection pool (0 = single connection, N = pool size).
	PoolSize    int           `json:"poolSize" toml:"poolSize"`
	PoolMaxIdle int           `json:"poolMaxIdle" toml:"poolMaxIdle"`
	PoolMaxLife time.Duration `json:"poolMaxLife" toml:"poolMaxLife"`

	// Channel pool (0 = single channel, N = pool size per connection).
	ChannelPoolSize    int           `json:"channelPoolSize" toml:"channelPoolSize"`
	ChannelPoolMaxIdle int           `json:"channelPoolMaxIdle" toml:"channelPoolMaxIdle"`
	ChannelPoolMaxLife time.Duration `json:"channelPoolMaxLife" toml:"channelPoolMaxLife"`

	// Manual reconnect policy. These fields configure helper policies only;
	// the component does not run background reconnect or topology recovery.
	ReconnectInterval    time.Duration `json:"reconnectInterval" toml:"reconnectInterval"`
	ReconnectMaxAttempts int           `json:"reconnectMaxAttempts" toml:"reconnectMaxAttempts"`

	// Observability.
	EnableAccessInterceptor bool `json:"enableAccessInterceptor" toml:"enableAccessInterceptor"`
	EnableMetricInterceptor bool `json:"enableMetricInterceptor" toml:"enableMetricInterceptor"`
	EnableTraceInterceptor  bool `json:"enableTraceInterceptor" toml:"enableTraceInterceptor"`

	// Debug.
	ClientName string `json:"clientName" toml:"clientName"`

	// OnFail controls startup failure behavior: "panic" or "error".
	OnFail string `json:"onFail" toml:"onFail"`
}

Config holds all configuration for the AMQP client.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a config with sensible defaults.

func (Config) ReconnectPolicy

func (c Config) ReconnectPolicy() ReconnectPolicy

ReconnectPolicy returns the explicit reconnect helper policy derived from Config.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the config for common issues.

type ConnectionPool

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool manages multiple AMQP connections for high availability.

func (*ConnectionPool) AcquireChannel

func (p *ConnectionPool) AcquireChannel(ctx context.Context) (*amqp.Channel, func(), error)

AcquireChannel gets a channel from the next connection (round-robin).
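
Round-robin selection over a comma-separated address list can be sketched with an atomic counter. This is an illustration of the technique only, not the pool's code; splitAddrs and roundRobin are hypothetical names, and the example rotates over address strings rather than live connections.

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// splitAddrs splits a comma-separated URI list and drops empty entries.
func splitAddrs(addr string) []string {
	var out []string
	for _, part := range strings.Split(addr, ",") {
		if p := strings.TrimSpace(part); p != "" {
			out = append(out, p)
		}
	}
	return out
}

// roundRobin hands out items in rotation and is safe for concurrent use.
type roundRobin struct {
	next  atomic.Uint64
	items []string
}

// pick returns the next item in rotation, or "" when there are no items.
func (r *roundRobin) pick() string {
	if len(r.items) == 0 {
		return ""
	}
	n := r.next.Add(1) - 1
	return r.items[n%uint64(len(r.items))]
}

func main() {
	rr := &roundRobin{items: splitAddrs("amqp://localhost:5672,amqp://localhost:5673")}
	for i := 0; i < 3; i++ {
		fmt.Println(rr.pick())
	}
}
```

The atomic counter avoids a mutex on the hot path; the trade-off is that rotation order is only approximately fair when many goroutines call pick at the same time.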

func (*ConnectionPool) AcquireFromPool

func (p *ConnectionPool) AcquireFromPool(ctx context.Context) (*amqp.Channel, func(), func(), error)

AcquireFromPool gets a channel from the pool of the next connection.
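The round-robin selection mentioned for AcquireChannel and AcquireFromPool can be pictured with an atomic counter. The sketch below is an assumption about the general technique, not the package's implementation; the pool type, its conns field, and pick are hypothetical stand-ins, and connections are represented by plain strings.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pool is a hypothetical stand-in for a set of open connections.
type pool struct {
	conns []string
	next  atomic.Uint64
}

// pick returns the next connection in round-robin order.
// The atomic counter makes it safe to call from several goroutines.
func (p *pool) pick() string {
	n := p.next.Add(1) - 1
	return p.conns[n%uint64(len(p.conns))]
}

func main() {
	p := &pool{conns: []string{"conn-0", "conn-1", "conn-2"}}
	for i := 0; i < 5; i++ {
		fmt.Println(p.pick())
	}
}
```

With three connections the fourth call wraps around to the first one again.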

func (*ConnectionPool) Close

func (p *ConnectionPool) Close() error

Close closes all connections.

func (*ConnectionPool) GetConnection

func (p *ConnectionPool) GetConnection(idx int) *amqp.Connection

GetConnection returns the connection at the given index.

func (*ConnectionPool) IsClosed

func (p *ConnectionPool) IsClosed() bool

IsClosed returns true if the pool is closed.

func (*ConnectionPool) Len

func (p *ConnectionPool) Len() int

Len returns the number of connections in the pool.

func (*ConnectionPool) NotifyBlocked

func (p *ConnectionPool) NotifyBlocked() <-chan amqp.Blocking

NotifyBlocked returns a channel that receives RabbitMQ blocked notifications.

func (*ConnectionPool) NotifyClose

func (p *ConnectionPool) NotifyClose() <-chan *amqp.Error

NotifyClose returns a channel that receives connection close notifications.

func (*ConnectionPool) Stats

func (p *ConnectionPool) Stats() PoolStats

Stats returns connection pool statistics.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer provides high-level message consumption.

func NewConsumer

func NewConsumer(ch *Channel, queue string, opts ...ConsumerOption) *Consumer

NewConsumer creates a consumer for queue on the given channel.

func (*Consumer) Cancel

func (c *Consumer) Cancel(consumerTag string) error

Cancel cancels the consumer identified by consumerTag.

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer.

func (*Consumer) Consume

func (c *Consumer) Consume(consumerTag string) (<-chan amqp.Delivery, error)

Consume starts consuming messages.

func (*Consumer) ConsumeWithHandler

func (c *Consumer) ConsumeWithHandler(ctx context.Context, consumerTag string, handler MessageHandler) error

ConsumeWithHandler starts consuming and processes messages with a handler.

func (*Consumer) ConsumeWithTimeout

func (c *Consumer) ConsumeWithTimeout(consumerTag string, timeout time.Duration, handler MessageHandler) error

ConsumeWithTimeout starts consuming with per-message timeout.

func (*Consumer) ConsumeWithWorkers

func (c *Consumer) ConsumeWithWorkers(ctx context.Context, consumerTag string, numWorkers int, handler MessageHandler) error

ConsumeWithWorkers starts consuming with a worker pool.
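A worker pool over a delivery channel usually looks like the sketch below. It is an illustration under assumptions: delivery and messageHandler are local stand-ins for amqp.Delivery and MessageHandler, consumeWithWorkers and runDemo are hypothetical names, and acknowledgement handling is left out because the page does not describe how the package acks.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// delivery is a stand-in for amqp.Delivery.
type delivery struct{ Body []byte }

// messageHandler mirrors the shape of the documented MessageHandler.
type messageHandler func(d delivery) error

// consumeWithWorkers fans deliveries out to numWorkers goroutines and
// returns once the channel is closed or ctx is cancelled.
func consumeWithWorkers(ctx context.Context, in <-chan delivery, numWorkers int, h messageHandler) {
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case d, ok := <-in:
					if !ok {
						return
					}
					_ = h(d) // a real consumer would ack or nack based on this error
				}
			}
		}()
	}
	wg.Wait()
}

// runDemo queues some messages, processes them, and reports how many were handled.
func runDemo(messages, workers int) int64 {
	in := make(chan delivery, messages)
	for i := 0; i < messages; i++ {
		in <- delivery{Body: []byte(fmt.Sprintf("task-%d", i))}
	}
	close(in)

	var handled atomic.Int64
	consumeWithWorkers(context.Background(), in, workers, func(d delivery) error {
		handled.Add(1)
		return nil
	})
	return handled.Load()
}

func main() {
	fmt.Println("handled:", runDemo(5, 3))
}
```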

type ConsumerOption

type ConsumerOption func(*consumerOptions)

ConsumerOption configures a consumer.

func WithConsumerArgs

func WithConsumerArgs(args amqp.Table) ConsumerOption

WithConsumerArgs sets consumer arguments.

func WithConsumerAutoAck

func WithConsumerAutoAck() ConsumerOption

WithConsumerAutoAck sets auto-acknowledge mode.

func WithConsumerExclusive

func WithConsumerExclusive() ConsumerOption

WithConsumerExclusive sets exclusive consumer.

type Container

type Container struct {
	// contains filtered or unexported fields
}

Container wraps the Config and ego logging for configuration-driven initialization.

func DefaultContainer

func DefaultContainer() *Container

DefaultContainer returns a Container with default configuration.

func Load

func Load(key string) *Container

Load loads configuration from ego's config manager and returns a Container. The key parameter corresponds to the config section name (e.g., "eamqp").

func (*Container) Build

func (c *Container) Build(options ...ContainerOption) *Client

Build constructs the AMQP Client from the loaded configuration. It applies all configured interceptors and handles connection errors according to OnFail.

func (*Container) BuildE

func (c *Container) BuildE(options ...ContainerOption) (*Client, error)

BuildE constructs the AMQP Client and returns startup errors to the caller.

type ContainerOption

type ContainerOption func(c *Container)

ContainerOption configures the Container.

func WithOnFail

func WithOnFail(onFail string) ContainerOption

WithOnFail sets the failure behavior: "panic" (default) or "error".

type EgoMetricsCollector

type EgoMetricsCollector struct {
	// contains filtered or unexported fields
}

EgoMetricsCollector records AMQP client metrics using Ego's shared metrics.

func NewEgoMetrics

func NewEgoMetrics(name, peer string) *EgoMetricsCollector

NewEgoMetrics creates a MetricsCollector backed by Ego emetric.

func (*EgoMetricsCollector) RecordChannelAcquired

func (m *EgoMetricsCollector) RecordChannelAcquired()

func (*EgoMetricsCollector) RecordChannelReturned

func (m *EgoMetricsCollector) RecordChannelReturned()

func (*EgoMetricsCollector) RecordConnection

func (m *EgoMetricsCollector) RecordConnection(active bool)

func (*EgoMetricsCollector) RecordConnectionError

func (m *EgoMetricsCollector) RecordConnectionError()

func (*EgoMetricsCollector) RecordConsumeLatency

func (m *EgoMetricsCollector) RecordConsumeLatency(duration time.Duration)

func (*EgoMetricsCollector) RecordMessageConfirmed

func (m *EgoMetricsCollector) RecordMessageConfirmed()

func (*EgoMetricsCollector) RecordMessageConsumed

func (m *EgoMetricsCollector) RecordMessageConsumed(size int)

func (*EgoMetricsCollector) RecordMessageNacked

func (m *EgoMetricsCollector) RecordMessageNacked()

func (*EgoMetricsCollector) RecordMessagePublished

func (m *EgoMetricsCollector) RecordMessagePublished(size int)

func (*EgoMetricsCollector) RecordPublishLatency

func (m *EgoMetricsCollector) RecordPublishLatency(duration time.Duration)

type Error

type Error struct {
	Component string // "connection", "channel", "publish", "consume"
	Op        string // Operation name
	// contains filtered or unexported fields
}

Error wraps amqp.Error with component context.

func (*Error) Error

func (e *Error) Error() string

Error implements error.

func (*Error) IsRetryable

func (e *Error) IsRetryable() bool

IsRetryable returns true if the error is temporary and can be retried.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying amqp.Error.
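Because Error implements Unwrap, callers can reach it through wrapped errors with errors.As before asking whether a retry makes sense. The sketch below uses a local stand-in type with the two documented exported fields; the unexported err and retryable fields, shouldRetry, and annotate are hypothetical, and how the real IsRetryable decides is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Error is a local stand-in, not the eamqp type.
type Error struct {
	Component string
	Op        string
	err       error
	retryable bool // hypothetical: the real decision logic is not documented here
}

func (e *Error) Error() string     { return fmt.Sprintf("eamqp %s %s: %v", e.Component, e.Op, e.err) }
func (e *Error) Unwrap() error     { return e.err }
func (e *Error) IsRetryable() bool { return e.retryable }

// shouldRetry finds an *Error anywhere in the chain and asks it.
func shouldRetry(err error) bool {
	var ae *Error
	return errors.As(err, &ae) && ae.IsRetryable()
}

// annotate wraps an error the way application code often does.
func annotate(err error) error {
	return fmt.Errorf("publish order event: %w", err)
}

func main() {
	base := &Error{Component: "publish", Op: "Publish", err: errors.New("connection reset"), retryable: true}
	err := annotate(base)
	fmt.Println(err)
	fmt.Println("retry:", shouldRetry(err))
}
```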

type HealthInfo

type HealthInfo struct {
	Status            HealthStatus
	Reason            string
	ConnectionsActive int
	ConnectionsTotal  int
}

HealthInfo contains a lightweight health snapshot for readiness checks.

type HealthStatus

type HealthStatus string

HealthStatus describes the lightweight client health state.

const (
	// HealthStatusUp means all known AMQP connections are open.
	HealthStatusUp HealthStatus = "up"
	// HealthStatusDegraded means at least one pooled connection is open, but not all are healthy.
	HealthStatusDegraded HealthStatus = "degraded"
	// HealthStatusDown means the client has no usable AMQP connection.
	HealthStatusDown HealthStatus = "down"
	// HealthStatusClosed means the client or connection pool was explicitly closed.
	HealthStatusClosed HealthStatus = "closed"
)
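Read together, the constant comments suggest a simple mapping from connection counts to a status. The sketch below follows those comments; statusFor and its parameters are hypothetical, and the package may weigh additional signals.

```go
package main

import "fmt"

// HealthStatus and its values are copied from the documentation above.
type HealthStatus string

const (
	HealthStatusUp       HealthStatus = "up"
	HealthStatusDegraded HealthStatus = "degraded"
	HealthStatusDown     HealthStatus = "down"
	HealthStatusClosed   HealthStatus = "closed"
)

// statusFor is an assumed mapping derived from the constant comments.
func statusFor(active, total int, closed bool) HealthStatus {
	switch {
	case closed:
		return HealthStatusClosed
	case total == 0 || active == 0:
		return HealthStatusDown
	case active < total:
		return HealthStatusDegraded
	default:
		return HealthStatusUp
	}
}

func main() {
	fmt.Println(statusFor(3, 3, false))
	fmt.Println(statusFor(1, 3, false))
	fmt.Println(statusFor(0, 3, false))
	fmt.Println(statusFor(3, 3, true))
}
```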

type InstanceStats

type InstanceStats struct {
	Health HealthInfo `json:"health"`
	Pool   PoolStats  `json:"pool"`
}

InstanceStats is the governor/debug snapshot for a built eamqp client.

type Logger

type Logger interface {
	Debug(msg string, keyvals ...any)
	Info(msg string, keyvals ...any)
	Warn(msg string, keyvals ...any)
	Error(msg string, keyvals ...any)
}

Logger interface for structured logging.
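Any type with these four methods can be passed to WithLogger. As a minimal sketch, the adapter below writes through the standard library log package; the interface is copied from above, while stdLogger and formatLine are hypothetical names and the key=value layout is one arbitrary choice.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"strings"
)

// Logger is copied from the documented interface.
type Logger interface {
	Debug(msg string, keyvals ...any)
	Info(msg string, keyvals ...any)
	Warn(msg string, keyvals ...any)
	Error(msg string, keyvals ...any)
}

// formatLine renders "LEVEL msg k=v k=v". A trailing key without a value is dropped.
func formatLine(level, msg string, keyvals ...any) string {
	var b strings.Builder
	b.WriteString(level)
	b.WriteByte(' ')
	b.WriteString(msg)
	for i := 0; i+1 < len(keyvals); i += 2 {
		fmt.Fprintf(&b, " %v=%v", keyvals[i], keyvals[i+1])
	}
	return b.String()
}

// stdLogger adapts a *log.Logger to the Logger interface.
type stdLogger struct{ out *log.Logger }

func (l stdLogger) Debug(msg string, kv ...any) { l.out.Println(formatLine("DEBUG", msg, kv...)) }
func (l stdLogger) Info(msg string, kv ...any)  { l.out.Println(formatLine("INFO", msg, kv...)) }
func (l stdLogger) Warn(msg string, kv ...any)  { l.out.Println(formatLine("WARN", msg, kv...)) }
func (l stdLogger) Error(msg string, kv ...any) { l.out.Println(formatLine("ERROR", msg, kv...)) }

// Compile-time check that stdLogger satisfies Logger.
var _ Logger = stdLogger{}

func main() {
	var l Logger = stdLogger{out: log.New(os.Stderr, "eamqp ", log.LstdFlags)}
	l.Info("connected", "addr", "localhost:5672")
}
```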

func NewEgoLogger

func NewEgoLogger(component *elog.Component) Logger

NewEgoLogger adapts an Ego logger component to the eamqp Logger interface.

type MessageHandler

type MessageHandler func(delivery amqp.Delivery) error

MessageHandler is a function that handles a delivery.

type MetricsCollector

type MetricsCollector interface {
	RecordConnection(active bool)
	RecordConnectionError()
	RecordChannelAcquired()
	RecordChannelReturned()
	RecordMessagePublished(size int)
	RecordMessageConfirmed()
	RecordMessageNacked()
	RecordMessageConsumed(size int)
	RecordPublishLatency(duration time.Duration)
	RecordConsumeLatency(duration time.Duration)
}

MetricsCollector collects AMQP metrics.
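A custom collector only needs to satisfy this interface to be used with WithMetrics. The sketch below keeps a few in-memory counters, which can be handy in tests; the interface is copied from above, and countingMetrics with its fields is a hypothetical example type.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// MetricsCollector is copied from the documented interface.
type MetricsCollector interface {
	RecordConnection(active bool)
	RecordConnectionError()
	RecordChannelAcquired()
	RecordChannelReturned()
	RecordMessagePublished(size int)
	RecordMessageConfirmed()
	RecordMessageNacked()
	RecordMessageConsumed(size int)
	RecordPublishLatency(duration time.Duration)
	RecordConsumeLatency(duration time.Duration)
}

// countingMetrics tracks a handful of counters and ignores the rest.
type countingMetrics struct {
	published      atomic.Int64
	publishedBytes atomic.Int64
	confirmed      atomic.Int64
	nacked         atomic.Int64
	consumed       atomic.Int64
}

func (m *countingMetrics) RecordConnection(active bool) {}
func (m *countingMetrics) RecordConnectionError()       {}
func (m *countingMetrics) RecordChannelAcquired()       {}
func (m *countingMetrics) RecordChannelReturned()       {}
func (m *countingMetrics) RecordMessagePublished(size int) {
	m.published.Add(1)
	m.publishedBytes.Add(int64(size))
}
func (m *countingMetrics) RecordMessageConfirmed()                     { m.confirmed.Add(1) }
func (m *countingMetrics) RecordMessageNacked()                        { m.nacked.Add(1) }
func (m *countingMetrics) RecordMessageConsumed(size int)              { m.consumed.Add(1) }
func (m *countingMetrics) RecordPublishLatency(duration time.Duration) {}
func (m *countingMetrics) RecordConsumeLatency(duration time.Duration) {}

// Compile-time check that countingMetrics satisfies MetricsCollector.
var _ MetricsCollector = (*countingMetrics)(nil)

func main() {
	m := &countingMetrics{}
	var c MetricsCollector = m
	c.RecordMessagePublished(128)
	c.RecordMessagePublished(64)
	c.RecordMessageConfirmed()
	fmt.Println(m.published.Load(), m.publishedBytes.Load(), m.confirmed.Load())
}
```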

type NoOpMetrics

type NoOpMetrics struct{}

NoOpMetrics is a no-op metrics collector.

func (*NoOpMetrics) RecordChannelAcquired

func (m *NoOpMetrics) RecordChannelAcquired()

func (*NoOpMetrics) RecordChannelReturned

func (m *NoOpMetrics) RecordChannelReturned()

func (*NoOpMetrics) RecordConnection

func (m *NoOpMetrics) RecordConnection(active bool)

func (*NoOpMetrics) RecordConnectionError

func (m *NoOpMetrics) RecordConnectionError()

func (*NoOpMetrics) RecordConsumeLatency

func (m *NoOpMetrics) RecordConsumeLatency(duration time.Duration)

func (*NoOpMetrics) RecordMessageConfirmed

func (m *NoOpMetrics) RecordMessageConfirmed()

func (*NoOpMetrics) RecordMessageConsumed

func (m *NoOpMetrics) RecordMessageConsumed(size int)

func (*NoOpMetrics) RecordMessageNacked

func (m *NoOpMetrics) RecordMessageNacked()

func (*NoOpMetrics) RecordMessagePublished

func (m *NoOpMetrics) RecordMessagePublished(size int)

func (*NoOpMetrics) RecordPublishLatency

func (m *NoOpMetrics) RecordPublishLatency(duration time.Duration)

type NopLogger

type NopLogger struct{}

NopLogger is a no-op logger.

func (NopLogger) Debug

func (NopLogger) Debug(msg string, keyvals ...any)

func (NopLogger) Error

func (NopLogger) Error(msg string, keyvals ...any)

func (NopLogger) Info

func (NopLogger) Info(msg string, keyvals ...any)

func (NopLogger) Warn

func (NopLogger) Warn(msg string, keyvals ...any)

type Option

type Option func(*Options)

Option configures the client.

func WithLogger

func WithLogger(log Logger) Option

WithLogger sets a custom logger.

func WithMetrics

func WithMetrics(m MetricsCollector) Option

WithMetrics sets a custom metrics collector.

func WithOptions

func WithOptions(opts *Options) Option

WithOptions sets the options.

type Options

type Options struct {
	// Dial is a custom dial function. If set, it is used instead of net.Dial.
	// The addr parameter is the host:port from the URI.
	Dial func(network, addr string) (net.Conn, error)

	// Auth specifies SASL authentication mechanisms.
	// If set, this overrides the default PLAIN auth.
	Auth []amqp.Authentication

	// ConnectionName sets the RabbitMQ connection name for management UI.
	ConnectionName string

	// ChannelOptions is called after each raw channel is opened.
	// In pooled mode it runs when the raw channel is created, not on every checkout.
	ChannelOptions func(ch *amqp.Channel) error

	// OnReconnect is reserved for a future lifecycle supervisor.
	// Client.Reconnect is explicit and does not invoke this callback today.
	OnReconnect func(attempt int)

	// OnDisconnect is reserved for a future lifecycle supervisor.
	// Use NotifyClose today to observe connection close events.
	OnDisconnect func(err error)

	// OnChannelError is reserved for a future lifecycle supervisor.
	// Use Channel.NotifyClose today to observe channel close events.
	OnChannelError func(channelID uint16, err error)

	// Logger overrides the component logger.
	Logger Logger

	// Metrics overrides the component metrics collector.
	Metrics MetricsCollector
}

Options holds optional configuration for the AMQP client.

type PoolStats

type PoolStats struct {
	ConnectionsActive int
	ConnectionsTotal  int
	ChannelsActive    int
	ChannelsAcquired  int64
	ChannelsReturned  int64
	Reconnects        int64
}

PoolStats holds connection/channel pool statistics.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer provides high-level publishing with confirms support.

func NewProducer

func NewProducer(ch *Channel, opts ...ProducerOption) (*Producer, error)

NewProducer creates a new producer.

func (*Producer) Close

func (p *Producer) Close() error

Close closes the producer's channel.

func (*Producer) Publish

func (p *Producer) Publish(exchange, routingKey string, msg amqp.Publishing) error

Publish publishes a message.

func (*Producer) PublishAsync

func (p *Producer) PublishAsync(exchange, routingKey string, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

PublishAsync publishes without waiting for the broker's confirm and returns the deferred confirmation for the caller to await.

func (*Producer) PublishWithContext

func (p *Producer) PublishWithContext(ctx context.Context, exchange, routingKey string, msg amqp.Publishing) error

PublishWithContext publishes with context.

func (*Producer) PublishWithContextOptions

func (p *Producer) PublishWithContextOptions(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error

PublishWithContextOptions publishes with context and explicit mandatory and immediate flags.

func (*Producer) PublishWithOptions

func (p *Producer) PublishWithOptions(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error

PublishWithOptions publishes a message with explicit mandatory and immediate flags.

type ProducerOption

type ProducerOption func(*Producer)

ProducerOption configures a producer.

func WithConfirm

func WithConfirm(timeout time.Duration) ProducerOption

WithConfirm enables publisher confirms with the given timeout.
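Waiting for a confirm with a timeout generally comes down to a select between the confirmation and a timer. The sketch below shows that pattern in isolation, assuming the confirmation arrives as a bool on a channel; waitConfirm and errConfirmTimeout are hypothetical names and not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errConfirmTimeout is a hypothetical sentinel for an expired wait.
var errConfirmTimeout = errors.New("publisher confirm timed out")

// waitConfirm blocks until the broker's answer arrives (true = ack, false = nack)
// or the timeout elapses, whichever happens first.
func waitConfirm(acked <-chan bool, timeout time.Duration) (bool, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ok := <-acked:
		return ok, nil
	case <-timer.C:
		return false, errConfirmTimeout
	}
}

func main() {
	acked := make(chan bool, 1)
	acked <- true
	fmt.Println(waitConfirm(acked, time.Second))

	// Nothing is ever sent on this channel, so the wait times out.
	fmt.Println(waitConfirm(make(chan bool), 10*time.Millisecond))
}
```

A nack is reported as false with a nil error, so callers can tell a broker rejection apart from a timeout.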

type QueueArgs

type QueueArgs map[string]any

QueueArgs holds optional arguments for QueueDeclare.

func NewQueueArgs

func NewQueueArgs() QueueArgs

NewQueueArgs creates a new QueueArgs with common defaults.

func (QueueArgs) WithDeadLetterExchange

func (a QueueArgs) WithDeadLetterExchange(dlx string) QueueArgs

WithDeadLetterExchange sets the dead letter exchange.

func (QueueArgs) WithDeadLetterRoutingKey

func (a QueueArgs) WithDeadLetterRoutingKey(key string) QueueArgs

WithDeadLetterRoutingKey sets the dead letter routing key.

func (QueueArgs) WithDurable deprecated

func (a QueueArgs) WithDurable(durable bool) QueueArgs

WithDurable sets the durable argument.

Deprecated: durable is a QueueDeclare parameter, not a queue argument. Use QueueDeclare's durable parameter directly. Use WithQueueType to set x-queue-type explicitly.

func (QueueArgs) WithMaxLength

func (a QueueArgs) WithMaxLength(n int) QueueArgs

WithMaxLength sets the maximum number of messages.

func (QueueArgs) WithMaxLengthBytes

func (a QueueArgs) WithMaxLengthBytes(n int) QueueArgs

WithMaxLengthBytes sets the maximum total body size.

func (QueueArgs) WithMessageTTL

func (a QueueArgs) WithMessageTTL(ttl time.Duration) QueueArgs

WithMessageTTL sets the per-message TTL in milliseconds.

func (QueueArgs) WithOverflow

func (a QueueArgs) WithOverflow(behavior string) QueueArgs

WithOverflow sets the overflow behavior.

func (QueueArgs) WithQueueTTL

func (a QueueArgs) WithQueueTTL(ttl time.Duration) QueueArgs

WithQueueTTL sets the queue TTL (auto-delete after idle time).

func (QueueArgs) WithQueueType

func (a QueueArgs) WithQueueType(qt string) QueueArgs

WithQueueType sets the queue type (classic, quorum, stream).

func (QueueArgs) WithSingleActiveConsumer

func (a QueueArgs) WithSingleActiveConsumer() QueueArgs

WithSingleActiveConsumer enables single active consumer.
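The chained With... methods follow the usual map-builder pattern. The sketch below reproduces that pattern with a local stand-in type; the argument keys are RabbitMQ's standard optional queue arguments, and it is an assumption that QueueArgs writes exactly these keys and value types.

```go
package main

import (
	"fmt"
	"time"
)

// queueArgs is a local stand-in for the documented QueueArgs map type.
type queueArgs map[string]any

func (a queueArgs) withQueueType(qt string) queueArgs {
	a["x-queue-type"] = qt
	return a
}

func (a queueArgs) withDeadLetterExchange(dlx string) queueArgs {
	a["x-dead-letter-exchange"] = dlx
	return a
}

// withMessageTTL stores the TTL in milliseconds, the unit RabbitMQ expects.
func (a queueArgs) withMessageTTL(ttl time.Duration) queueArgs {
	a["x-message-ttl"] = ttl.Milliseconds()
	return a
}

func (a queueArgs) withMaxLength(n int) queueArgs {
	a["x-max-length"] = n
	return a
}

func main() {
	args := queueArgs{}.
		withQueueType("quorum").
		withDeadLetterExchange("orders.dlx").
		withMessageTTL(30 * time.Second).
		withMaxLength(10000)
	fmt.Println(args)
}
```

Each method mutates and returns the same map, so the chain builds one argument table that can be handed to a queue declaration.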

type ReconnectPolicy

type ReconnectPolicy struct {
	Enabled     bool
	Initial     time.Duration // Initial backoff interval
	Max         time.Duration // Maximum backoff interval
	Multiplier  float64       // Backoff multiplier
	MaxAttempts int           // 0 = infinite
}

ReconnectPolicy defines the reconnection strategy.

func DefaultReconnectPolicy

func DefaultReconnectPolicy() ReconnectPolicy

DefaultReconnectPolicy returns the default reconnection policy.

func (ReconnectPolicy) Backoff

func (p ReconnectPolicy) Backoff(attempt int) time.Duration

Backoff calculates the next reconnect delay.
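The Initial, Max, and Multiplier fields point to capped exponential backoff. The sketch below assumes the formula Initial * Multiplier^attempt, capped at Max; the struct is a local copy of the documented fields, backoff is a hypothetical free function, and the package's real Backoff may differ, for example by adding jitter.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// ReconnectPolicy is a local copy of the documented fields, not the eamqp type.
type ReconnectPolicy struct {
	Enabled     bool
	Initial     time.Duration
	Max         time.Duration
	Multiplier  float64
	MaxAttempts int
}

// backoff is an assumed formula: Initial * Multiplier^attempt, capped at Max.
func backoff(p ReconnectPolicy, attempt int) time.Duration {
	if attempt < 0 {
		attempt = 0
	}
	d := float64(p.Initial) * math.Pow(p.Multiplier, float64(attempt))
	if p.Max > 0 && d > float64(p.Max) {
		return p.Max
	}
	return time.Duration(d)
}

func main() {
	p := ReconnectPolicy{Enabled: true, Initial: time.Second, Max: 30 * time.Second, Multiplier: 2}
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Println(attempt, backoff(p, attempt))
	}
}
```

Since the component does not reconnect in the background, a caller would sleep for this delay between its own calls to Client.Reconnect.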

type RetryConsumer

type RetryConsumer struct {
	// contains filtered or unexported fields
}

RetryConsumer provides consumption with automatic retry.

func NewRetryConsumer

func NewRetryConsumer(ch *Channel, queue string, maxRetries int, retryDelay time.Duration, opts ...ConsumerOption) (*RetryConsumer, error)

NewRetryConsumer creates a retry consumer.

func (*RetryConsumer) ConsumeWithRetry

func (rc *RetryConsumer) ConsumeWithRetry(ctx context.Context, consumerTag string, handler MessageHandler) error

ConsumeWithRetry starts consuming with automatic retry.
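One way to picture automatic retry is an in-process loop around the handler, bounded by maxRetries and spaced by retryDelay. The sketch below shows only that loop; it is an assumption, since the page does not say whether RetryConsumer retries in process or by requeueing. The delivery and messageHandler types are stand-ins, and handleWithRetry and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// delivery is a stand-in for amqp.Delivery.
type delivery struct{ Body []byte }

// messageHandler mirrors the shape of the documented MessageHandler.
type messageHandler func(d delivery) error

// handleWithRetry calls h once, then up to maxRetries more times,
// waiting retryDelay between attempts and stopping early if ctx ends.
func handleWithRetry(ctx context.Context, d delivery, maxRetries int, retryDelay time.Duration, h messageHandler) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = h(d); err == nil {
			return nil
		}
		if attempt == maxRetries {
			break
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryDelay):
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxRetries+1, err)
}

// demo fails the first failFirst attempts and reports how many attempts ran.
func demo(failFirst, maxRetries int) (attempts int, err error) {
	h := func(delivery) error {
		attempts++
		if attempts <= failFirst {
			return errors.New("transient failure")
		}
		return nil
	}
	err = handleWithRetry(context.Background(), delivery{Body: []byte("order-1")}, maxRetries, time.Millisecond, h)
	return attempts, err
}

func main() {
	fmt.Println(demo(2, 3))
	fmt.Println(demo(5, 2))
}
```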

type TLSConfig

type TLSConfig struct {
	CertFile string `json:"certFile" toml:"certFile"`
	KeyFile  string `json:"keyFile" toml:"keyFile"`
	CACert   string `json:"caCert" toml:"caCert"`
	Insecure bool   `json:"insecure" toml:"insecure"`
}

TLSConfig holds file-based TLS configuration.

func (*TLSConfig) LoadTLSConfig

func (c *TLSConfig) LoadTLSConfig() (*tls.Config, error)

LoadTLSConfig builds a *tls.Config from file-based settings.
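Building a *tls.Config from files typically involves loading an optional client key pair and an optional CA bundle. The sketch below uses only the standard library; tlsFiles mirrors the documented TLSConfig fields, loadTLS is a hypothetical function, and the real LoadTLSConfig may validate or default differently.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// tlsFiles mirrors the documented TLSConfig fields.
type tlsFiles struct {
	CertFile string
	KeyFile  string
	CACert   string
	Insecure bool
}

// loadTLS is an assumed equivalent of LoadTLSConfig built on crypto/tls.
func loadTLS(c tlsFiles) (*tls.Config, error) {
	cfg := &tls.Config{InsecureSkipVerify: c.Insecure}

	// Client certificate for mutual TLS, if configured.
	if c.CertFile != "" || c.KeyFile != "" {
		cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
		if err != nil {
			return nil, fmt.Errorf("load client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{cert}
	}

	// Custom CA bundle used to verify the broker, if configured.
	if c.CACert != "" {
		pem, err := os.ReadFile(c.CACert)
		if err != nil {
			return nil, fmt.Errorf("read CA certificate: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, fmt.Errorf("no CA certificates found in %s", c.CACert)
		}
		cfg.RootCAs = pool
	}
	return cfg, nil
}

func main() {
	cfg, err := loadTLS(tlsFiles{Insecure: true})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("skip verify:", cfg.InsecureSkipVerify)
}
```

Insecure disables certificate verification and is only appropriate for local testing.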

Directories

Path Synopsis
examples
batch-producer command
batch-producer demonstrates the high-level BatchProducer helper with publisher confirms enabled.
connection-pool command
connection-pool demonstrates high-availability connection pooling with round-robin load distribution across multiple connections.
consumer command
consumer example demonstrates consuming with manual acks.
dead-letter command
dead-letter demonstrates Dead Letter Exchange (DLX) and Dead Letter Queue (DLQ) patterns for handling failed message processing.
producer command
producer example demonstrates publishing with confirms.
producer-confirm command
producer-confirm demonstrates the high-level Producer API with publisher confirms.
pubsub command
pubsub example demonstrates a simple publish/subscribe pattern with RabbitMQ.
pubsub-fanout command
pubsub-fanout demonstrates the fanout exchange pattern where each subscriber gets its own copy of every message.
qos command
qos demonstrates Quality of Service (QoS) settings for consumer-side message prefetching.
reconnect command
reconnect demonstrates connection close notifications and the explicit reconnect boundary.
retry-consumer-listener command
listener demonstrates automatic message retry with configurable max attempts using a worker pool for concurrent processing.
retry-consumer-sender command
sender publishes test messages to the retry queue.
rpc command
rpc demonstrates RPC-style request/response over RabbitMQ using the Direct Reply-To pattern.
transaction command
transaction demonstrates AMQP channel transactions.
workqueue-publisher command
publisher sends tasks to the workqueue.
workqueue-worker command
worker is one of the workqueue examples.
