pubsub

Version: v1.0.0
Published: Mar 26, 2026 License: MIT Imports: 9 Imported by: 0

README

pubsub

A Go library for working with RabbitMQ via AMQP 0-9-1 with automatic reconnection, channel reinitialization, and consumer management.

Built on top of amqp091-go.

Features

  • Automatic Reconnection: when the TCP connection to RabbitMQ is lost, the session automatically reconnects and redeclares all resources.
  • Channel Reinitialization: on an AMQP channel error (e.g., 404 during declaration), the channel is recreated without dropping the connection.
  • Confirmed Publishing: Publish blocks until the server confirms the message, retries on failure, and keeps publish/confirm matching correct under concurrency. Publish serialization is context-aware: PublishWithContext honors its context even under contention.
  • Consumer Management: consumers are automatically restarted after reconnection. Ack/Nack is chosen from the handler's return value. When AutoAck is true, the broker auto-acknowledges messages on delivery; the handler is still invoked but Ack/Nack calls are skipped.
  • Custom Logger: the Logger interface allows you to integrate any logging system.
  • Context-aware Operations: PublishWithContext, PublishToWithContext, UnsafePublishToWithContext, and WaitReady support cancellation via context.Context. PublishWithContext is cancelled correctly even while waiting to acquire the internal publish semaphore.
  • Configurable Retry Delays: tune reconnect/reinit/resend delays with WithReconnectDelay, WithReInitDelay, and WithResendDelay.

Installation

go get github.com/germangorelkin/pubsub

Requirements: Go 1.24+, a running RabbitMQ instance.

Quick Start

Minimal Example: Publishing and Subscribing
package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/germangorelkin/pubsub"
)

func main() {
    // Create a session with default resource declarations.
    session := pubsub.New(
        "amqp://guest:guest@localhost:5672/",
        pubsub.WithDeclare(
            pubsub.Exchange{
                Name:           "events",
                Kind:           "direct",
                Durable:        true,
                IsUsageDefault: true,
            },
            pubsub.Queue{
                Name:           "events.processing",
                Durable:        true,
                IsUsageDefault: true,
            },
            pubsub.Bind{
                QueueName:      "events.processing",
                ExchangeName:   "events",
                Key:            "event.created",
                IsUsageDefault: true,
            },
        ),
    )
    defer session.Close()

    // Subscribe to the default queue.
    err := session.Subscribe(func(d pubsub.Delivery) error {
        fmt.Printf("Received: %s\n", string(d.Body))
        return nil // nil = Ack, error = Nack with requeue
    })
    if err != nil {
        log.Fatal(err)
    }

    // Publish a message to the default exchange and routing key.
    if err := session.Publish([]byte("Hello, RabbitMQ!")); err != nil {
        log.Fatal(err)
    }

    // Wait for the termination signal.
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    <-sig
}

Examples

Publishing with Context and Timeout
import (
    "context"
    "log"
    "time"
)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err := session.PublishWithContext(ctx, []byte("message with timeout"))
if err != nil {
    // err may be context.DeadlineExceeded, context.Canceled, or pubsub.ErrShutdown.
    log.Printf("Publish error: %v", err)
}

Wait Until Session Is Ready
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

if err := session.WaitReady(ctx); err != nil {
    log.Printf("session is not ready: %v", err)
}

Publishing to a Specific Exchange
// Publishing without using default resources.
err := session.PublishTo("notifications", "user.signup", []byte(`{"user_id": 42}`))
if err != nil {
    log.Printf("Error: %v", err)
}

// With context (ctx as created in the timeout example above):
err = session.PublishToWithContext(ctx, "notifications", "user.signup", []byte(`{"user_id": 42}`))
if err != nil {
    log.Printf("Error: %v", err)
}

Subscribing to a Specific Queue
err := session.SubscribeTo("orders.new", func(d pubsub.Delivery) error {
    log.Printf("New order: %s", string(d.Body))
    // Processing...
    return nil // Ack
})
if err != nil {
    log.Fatal(err)
}

// If you want to remove this consumer later, get its generated tag.
// handler is any func(pubsub.Delivery) error.
tag, err := session.SubscribeToWithTag("orders.new", handler)
if err != nil {
    log.Fatal(err)
}
_ = session.RemoveConsumer(tag)

Consumer Management (AddConsumer / RemoveConsumer)
// Create a consumer with an explicit name for later management.
consumer := &pubsub.Consumer{
    Name:      "order-processor",
    QueueName: "orders.new",
    Handler: func(d pubsub.Delivery) error {
        log.Printf("Processing order: %s", string(d.Body))
        return nil
    },
}

// Register. The consumer will start automatically once the channel is ready.
if err := session.AddConsumer(consumer); err != nil {
    log.Fatal(err)
}

// Later on — stop and remove the consumer.
if err := session.RemoveConsumer("order-processor"); err != nil {
    log.Printf("Error removing consumer: %v", err)
}

AutoAck Consumer
// AutoAck consumer: the broker acknowledges the message on delivery.
// The handler's return value does not trigger Ack/Nack.
// Use this when processing is best-effort and message loss is acceptable.
consumer := &pubsub.Consumer{
    Name:      "metrics-collector",
    QueueName: "metrics",
    AutoAck:   true,
    Handler: func(d pubsub.Delivery) error {
        log.Printf("Metric: %s", string(d.Body))
        return nil
    },
}
if err := session.AddConsumer(consumer); err != nil {
    log.Fatal(err)
}

Dynamic Resource Declaration
session := pubsub.New("amqp://guest:guest@localhost:5672/")
defer session.Close()

// Resources can be declared after creating the session.
// They will be declared on the server (if the channel is ready)
// and redeclared upon every reconnection.

session.ExchangeDeclare(pubsub.Exchange{
    Name:    "logs",
    Kind:    "fanout",
    Durable: true,
})

session.QueueDeclare(pubsub.Queue{
    Name:    "logs.archive",
    Durable: true,
})

session.QueueBind(pubsub.Bind{
    QueueName:    "logs.archive",
    ExchangeName: "logs",
    Key:          "",
})

After a successful declaration, the session signals its consumer manager. This means consumers that were added before their queue was declared (and thus could not start) will automatically be retried.

Custom Logger
import "go.uber.org/zap"

type zapLogger struct {
    sugar *zap.SugaredLogger
}

func (l *zapLogger) Printf(format string, v ...interface{}) {
    l.sugar.Infof(format, v...)
}

logger, err := zap.NewProduction()
if err != nil {
    log.Fatal(err)
}
session := pubsub.New(
    "amqp://guest:guest@localhost:5672/",
    pubsub.WithLogger(&zapLogger{sugar: logger.Sugar()}),
)
UnsafePublish — Publishing without Confirmation
// Use ONLY when message loss is acceptable (metrics, logs).
// DO NOT call concurrently with Publish/PublishTo — it is not thread-safe.
err := session.UnsafePublishTo("metrics", "host.cpu", []byte("metric:cpu=42"))
if err != nil {
    log.Printf("Error: %v", err)
}

UnsafePublish(message, exchange, key), which takes the message first, is kept for backward compatibility.

Error Handling in Handler
err := session.Subscribe(func(d pubsub.Delivery) error {
    var order Order
    if err := json.Unmarshal(d.Body, &order); err != nil {
        // Returning an error will cause a Nack with requeue=true.
        // The message will be returned to the queue for reprocessing.
        return fmt.Errorf("invalid JSON: %w", err)
    }

    if err := processOrder(order); err != nil {
        return fmt.Errorf("processing error: %w", err)
    }

    // nil = Ack. The message is removed from the queue.
    return nil
})

Note: When AutoAck is true, handler errors and panics do not cause Nack/requeue — the message has already been acknowledged by the broker on delivery.
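
Because every returned error leads to a requeue, one possible policy is to return an error only for failures that a retry could fix, and to log and acknowledge messages that can never succeed (for example malformed JSON). The sketch below illustrates that policy with the standard library only. Delivery is a local stand-in mirroring the documented pubsub.Delivery fields, and order, errTemporary, and handleOrder are hypothetical names, not part of the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
)

// Delivery is a local stand-in mirroring the documented pubsub.Delivery fields.
type Delivery struct {
	Exchange   string
	RoutingKey string
	Body       []byte
}

// order is a hypothetical payload type.
type order struct {
	ID int `json:"id"`
}

// errTemporary marks a hypothetical failure that is worth retrying.
var errTemporary = errors.New("temporary failure")

// handleOrder returns nil (Ack) for messages that can never succeed, such as
// malformed JSON, and returns an error (Nack with requeue) only when process
// fails and a later retry might work.
func handleOrder(d Delivery, process func(order) error) error {
	var o order
	if err := json.Unmarshal(d.Body, &o); err != nil {
		log.Printf("dropping malformed message: %v", err)
		return nil
	}
	if err := process(o); err != nil {
		return fmt.Errorf("processing order %d: %w", o.ID, err)
	}
	return nil
}

func main() {
	failing := func(order) error { return errTemporary }
	fmt.Println(handleOrder(Delivery{Body: []byte("not json")}, failing))
	fmt.Println(handleOrder(Delivery{Body: []byte(`{"id": 7}`)}, failing))
}
```

Whether dropping a malformed message is acceptable depends on the application; routing it to a separate queue for inspection is another option.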

Graceful Shutdown
func main() {
    session := pubsub.New("amqp://guest:guest@localhost:5672/", ...)
    // Close is called explicitly after the termination signal (see below)
    // rather than via defer, so its error can be logged.

    if err := session.Subscribe(handler); err != nil {
        log.Fatal(err)
    }

    // Wait for the termination signal.
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    <-sig

    log.Println("Shutting down...")
    // Close() waits for the current messages to finish processing,
    // then closes the channel and the connection.
    if err := session.Close(); err != nil {
        log.Printf("Close error: %v", err)
    }
}

Best Practices

1. Always Set Default Resources via WithDeclare
// Good: resources are set during creation, Publish/Subscribe methods work immediately.
session := pubsub.New(addr, pubsub.WithDeclare(exchange, queue, bind))

// Bad: forgot IsUsageDefault — Publish will return ErrNotSetDefaultExchange.
session := pubsub.New(addr, pubsub.WithDeclare(
    pubsub.Exchange{Name: "x", Kind: "direct", Durable: true}, // IsUsageDefault: false!
    ...
))

2. Use PublishWithContext Instead of Publish

Publish uses context.Background() and blocks indefinitely until success or session closure. In production code, it is preferable to set a timeout:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := session.PublishWithContext(ctx, message)

PublishWithContext correctly respects the context deadline even when another publish is in progress (contention on the internal semaphore). This makes it safe to use tight timeouts in high-concurrency scenarios.
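
To illustrate what "respecting the deadline under contention" means, here is a minimal sketch of a context-aware gate built from a buffered channel. It is one common way to get this behavior, not necessarily how the library implements it; publishGate and its methods are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// publishGate is a hypothetical stand-in for internal publish serialization.
type publishGate struct {
	slot chan struct{} // capacity 1: at most one publish in flight
}

func newPublishGate() *publishGate {
	return &publishGate{slot: make(chan struct{}, 1)}
}

// acquire waits for the slot or for ctx to end, whichever comes first.
func (g *publishGate) acquire(ctx context.Context) error {
	select {
	case g.slot <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// release frees the slot for the next publisher.
func (g *publishGate) release() { <-g.slot }

// timedOutUnderContention holds the gate (simulating a publish in progress)
// and reports whether a second caller with a short deadline gives up with
// context.DeadlineExceeded instead of blocking.
func timedOutUnderContention() bool {
	g := newPublishGate()
	if err := g.acquire(context.Background()); err != nil {
		return false
	}
	defer g.release()

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	return errors.Is(g.acquire(ctx), context.DeadlineExceeded)
}

func main() {
	fmt.Println("second publisher timed out:", timedOutUnderContention())
}
```

A plain sync.Mutex cannot be abandoned while waiting, which is why a channel-based gate is used in this sketch.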

3. Wait for Readiness in Startup-Critical Flows

If your app must not start work before RabbitMQ is available, use WaitReady(ctx) after New(...).

4. Handler Should Be Idempotent

A message can be delivered more than once (on Nack with requeue, or upon reconnection). Ensure that reprocessing the same message is safe.
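
One simple way to make a handler idempotent is to remember which message IDs were already applied. The sketch below keeps them in memory, which is an assumption made for brevity: a real service would usually need a persistent store, and the ID would come from the message body. seenSet and markNew are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// seenSet remembers message IDs that were already processed.
type seenSet struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newSeenSet() *seenSet {
	return &seenSet{ids: make(map[string]struct{})}
}

// markNew records id and reports true only the first time it is seen.
func (s *seenSet) markNew(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, dup := s.ids[id]; dup {
		return false
	}
	s.ids[id] = struct{}{}
	return true
}

func main() {
	seen := newSeenSet()
	applied := 0
	// "order-1" arrives twice, as it could after a requeue or reconnection.
	for _, id := range []string{"order-1", "order-2", "order-1"} {
		if seen.markNew(id) {
			applied++ // the side effect runs once per ID
		}
	}
	fmt.Println("side effects applied:", applied)
}
```

Note that this sketch marks an ID before the side effect runs; if processing can fail, the mark would have to be rolled back or recorded only after success.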

5. Do Not Panic in the Handler

Panics are recovered, but the message is returned to the queue (Nack + requeue). If the handler panics on every attempt, the same message is redelivered indefinitely. Handle errors explicitly.

6. Use AutoAck Only When Message Loss Is Acceptable

When AutoAck: true, the broker removes the message from the queue as soon as it is delivered. If the handler returns an error or panics, the message is lost — there is no requeue. Use AutoAck for non-critical data (metrics, logs) where processing speed matters more than reliability.

7. Do Not Use UnsafePublish for Important Data

UnsafePublish does not wait for confirmation and is not thread-safe with Publish*. Use it only for metrics, logs, and data where loss is acceptable.

8. Always Call Close()
session := pubsub.New(addr, opts...)
defer session.Close() // Guarantees graceful shutdown of goroutines.

Without Close(), the three background goroutines will run forever, causing a resource leak.

9. Use Unique Consumer Names

If you need to manage consumers (RemoveConsumer), set explicit, unique names:

consumer := &pubsub.Consumer{
    Name:      "worker-1",
    QueueName: "tasks",
    Handler:   handler,
}
session.AddConsumer(consumer)
// Later: session.RemoveConsumer("worker-1")

Adding a consumer with the same name again will return ErrConsumerAlreadyExists.

10. Use Valid Declaration Parameters

  • QueueDeclare: queue name must be non-empty.
  • ExchangeDeclare: exchange name and kind must be non-empty.
  • QueueBind: queue and exchange names must be non-empty.

11. Durable Queues and Persistent Messages

The library always publishes with DeliveryMode: amqp.Persistent. For messages to survive a RabbitMQ restart, the queues and exchanges must also be declared with Durable: true.

Errors

Error                     When it occurs
ErrNotConnected           Publishing or Stream call when the channel is not ready.
ErrAlreadyClosed          Repeated call to Close().
ErrShutdown               Publishing after calling Close().
ErrNotSetDefaultQueue     Subscribe() without a set default queue.
ErrNotSetDefaultExchange  Publish() without set default exchange/bind.
ErrConsumerAlreadyExists  AddConsumer() with a name that is already registered.
ErrInvalidConsumer        Nil/invalid consumer config or empty tag in RemoveConsumer().
ErrInvalidExchange        Invalid ExchangeDeclare() input.
ErrInvalidQueue           Invalid QueueDeclare() input.
ErrInvalidBind            Invalid QueueBind() input.
ErrInvalidDelay           Invalid delay passed to WithReconnectDelay/WithReInitDelay/WithResendDelay.
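
Sentinel errors like these are normally matched with errors.Is, which also works when the error has been wrapped. The sketch below shows one way a caller might sort publish errors into actions. ErrShutdown and ErrNotConnected are declared locally as stand-ins so the example compiles without the library (real code would use pubsub.ErrShutdown and pubsub.ErrNotConnected), and the classification policy itself is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors.
var (
	ErrShutdown     = errors.New("session is shutting down")
	ErrNotConnected = errors.New("not connected to a server")
)

// classify maps an error to a hypothetical caller action.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, context.DeadlineExceeded), errors.Is(err, context.Canceled):
		return "retry-later"
	case errors.Is(err, ErrNotConnected):
		return "retry-later"
	case errors.Is(err, ErrShutdown):
		return "stop"
	default:
		return "report"
	}
}

// wrap mimics a caller adding context with %w; errors.Is still matches.
func wrap(err error) error {
	return fmt.Errorf("publish failed: %w", err)
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(wrap(ErrShutdown)))
	fmt.Println(classify(wrap(context.DeadlineExceeded)))
	fmt.Println(classify(errors.New("something else")))
}
```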

Testing

# Unit tests
go test ./...

# E2E tests (require Docker)
cd e2e && go test ./...

The E2E tests use dockertest to automatically start RabbitMQ in a container.

License

MIT

Documentation

Overview

Package pubsub provides a wrapper around the amqp091-go library for working with RabbitMQ. It adds automatic reconnection on connection loss, channel reinitialization, resource declaration (exchange, queue, binding), and consumer management with Ack/Nack support.

The architecture is built around a Session facade that owns the AMQP connection, channel, and three background goroutines:

  • handleReconnect — monitors TCP connection loss and reconnects;
  • handleReOpenChannel — recreates the AMQP channel after a channel error;
  • handleConsumers — starts and restarts consumers when the channel is ready.

Coordination between goroutines is implemented via signal channels (connReadyCh, chanReadyCh, consumerUpdateCh) and atomic flags (isReady, isClosed).
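
The combination of a signal channel and an atomic flag can be sketched as follows. This is a simplified illustration under stated assumptions, not the package's code: readiness, set, and wait are hypothetical names, and atomic.Bool is used here where the package describes int32 flags.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// readiness pairs a channel that is closed when the resource becomes ready
// with an atomic flag for cheap non-blocking checks.
type readiness struct {
	mu    sync.Mutex
	ch    chan struct{}
	ready atomic.Bool
}

func newReadiness() *readiness {
	return &readiness{ch: make(chan struct{})}
}

// set marks the resource ready (closing the channel wakes all waiters) or
// not ready (a fresh channel is installed for future waiters).
func (r *readiness) set(ok bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ok == r.ready.Load() {
		return
	}
	if ok {
		close(r.ch)
	} else {
		r.ch = make(chan struct{})
	}
	r.ready.Store(ok)
}

// wait blocks until the resource is ready or ctx ends.
func (r *readiness) wait(ctx context.Context) error {
	r.mu.Lock()
	ch := r.ch
	r.mu.Unlock()
	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// waitWithinMillis reports whether the resource became ready within ms.
func waitWithinMillis(r *readiness, ms int) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	return r.wait(ctx) == nil
}

func main() {
	r := newReadiness()
	fmt.Println("ready before connect:", waitWithinMillis(r, 10))
	go func() {
		time.Sleep(5 * time.Millisecond) // simulated connection setup
		r.set(true)
	}()
	fmt.Println("ready after connect:", waitWithinMillis(r, 1000))
}
```

Closing a channel wakes every waiter at once, which is why it suits a "ready" broadcast better than sending individual values.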

Index

Constants

This section is empty.

Variables

var (
	// ErrNotConnected indicates the session is not connected to the RabbitMQ server.
	// Returned when attempting to publish or create a stream without a ready channel.
	ErrNotConnected = errors.New("not connected to a server")

	// ErrAlreadyClosed indicates a duplicate call to Close() on an already closed session.
	ErrAlreadyClosed = errors.New("already closed: not connected to the server")

	// ErrShutdown indicates the session is in the process of shutting down.
	// Returned by publish methods when the done channel is closed.
	ErrShutdown = errors.New("session is shutting down")

	// ErrNotSetDefaultQueue indicates no default queue has been set.
	// Returned by Subscribe() if WithDeclare was not called with IsUsageDefault.
	ErrNotSetDefaultQueue = errors.New("default queue is not set")

	// ErrNotSetDefaultExchange indicates no default exchange or routing key has been set.
	// Returned by Publish() if WithDeclare was not called with IsUsageDefault.
	ErrNotSetDefaultExchange = errors.New("default exchange or key is not set")

	// ErrConsumerAlreadyExists indicates a consumer with the given name is already registered.
	// Returned by AddConsumer() to prevent silent overwrites.
	ErrConsumerAlreadyExists = errors.New("consumer with this name already exists")

	// ErrInvalidConsumer indicates invalid consumer configuration.
	ErrInvalidConsumer = errors.New("invalid consumer configuration")

	// ErrInvalidExchange indicates invalid exchange declaration parameters.
	ErrInvalidExchange = errors.New("invalid exchange declaration")

	// ErrInvalidQueue indicates invalid queue declaration parameters.
	ErrInvalidQueue = errors.New("invalid queue declaration")

	// ErrInvalidBind indicates invalid queue binding parameters.
	ErrInvalidBind = errors.New("invalid queue binding")

	// ErrInvalidDelay indicates an invalid delay option value.
	ErrInvalidDelay = errors.New("invalid delay option value")
)

Standard errors returned by Session methods.

Functions

This section is empty.

Types

type Bind

type Bind struct {
	QueueName    string
	ExchangeName string
	Key          string

	IsUsageDefault bool
}

Bind describes a binding of a queue to an exchange via a routing key.

Fields:

  • QueueName — queue name.
  • ExchangeName — exchange name.
  • Key — routing key used to route messages to the queue.
  • IsUsageDefault — if true, this routing key will be used by Publish/PublishWithContext by default.

type Consumer

type Consumer struct {
	Name      string
	QueueName string
	AutoAck   bool
	Exclusive bool

	Handler EventHandler
	// contains filtered or unexported fields
}

Consumer describes a message consumer from an AMQP queue.

Fields:

  • Name — unique consumer tag. If empty, one is generated automatically.
  • QueueName — name of the queue the consumer reads messages from.
  • AutoAck — if true, the server considers the message delivered immediately (no Ack needed).
  • Exclusive — if true, only one consumer can read from the queue.
  • Handler — function called for each message.
  • run — atomic flag: 1 = consumer goroutine is running, 0 = stopped. Used by handleConsumers to determine if a restart is needed.

type Delivery added in v0.10.0

type Delivery struct {
	Exchange   string
	RoutingKey string

	Body []byte
}

Delivery represents a received message in a simplified form. It hides the internal details of amqp.Delivery, exposing only the fields useful to the handler: exchange, routing key, and message body.

type EventHandler

type EventHandler func(Delivery) error

EventHandler is a function that handles an incoming message. If it returns nil, the message is acknowledged (Ack). If it returns an error, the message is rejected (Nack) and requeued.
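
The acknowledgement rules described in this documentation (nil means Ack, an error or a recovered panic means Nack with requeue, and AutoAck skips both) can be sketched as a small decision function. Delivery and EventHandler are local stand-ins mirroring the documented types; settle and the three sample handlers are hypothetical and only model the decision, they do not talk to a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// Delivery and EventHandler are local stand-ins that mirror the documented
// types so the sketch compiles without the library.
type Delivery struct {
	Exchange   string
	RoutingKey string
	Body       []byte
}

type EventHandler func(Delivery) error

// settle runs h and reports the acknowledgement the documented rules imply:
// "ack" for nil, "nack-requeue" for an error or a recovered panic, and
// "none" when autoAck is set (the broker already acknowledged on delivery).
func settle(h EventHandler, d Delivery, autoAck bool) (outcome string) {
	defer func() {
		if r := recover(); r != nil {
			outcome = "nack-requeue"
			if autoAck {
				outcome = "none"
			}
		}
	}()
	err := h(d)
	switch {
	case autoAck:
		return "none"
	case err != nil:
		return "nack-requeue"
	default:
		return "ack"
	}
}

// Three hypothetical handlers, one per outcome.
func okHandler(Delivery) error    { return nil }
func failHandler(Delivery) error  { return errors.New("processing failed") }
func panicHandler(Delivery) error { panic("unexpected state") }

func main() {
	d := Delivery{Exchange: "events", RoutingKey: "event.created", Body: []byte("hi")}
	fmt.Println(settle(okHandler, d, false))
	fmt.Println(settle(failHandler, d, false))
	fmt.Println(settle(panicHandler, d, false))
	fmt.Println(settle(failHandler, d, true))
}
```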

type Exchange

type Exchange struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool

	IsUsageDefault bool
}

Exchange describes an AMQP exchange to be declared on the server.

Fields:

  • Name — exchange name (e.g., "events").
  • Kind — exchange type: "direct", "fanout", "topic", "headers".
  • Durable — if true, the exchange survives server restarts.
  • AutoDelete — if true, the exchange is deleted when all queues unbind from it.
  • Internal — if true, the exchange is only accessible for routing between exchanges.
  • IsUsageDefault — if true, this exchange will be used by Publish/PublishWithContext by default.

type Logger added in v1.0.0

type Logger interface {
	Printf(format string, v ...any)
}

Logger defines the interface for replacing the session's logger. Any implementation providing a Printf method can be passed via the WithLogger option.

type Option

type Option func(*Session) error

Option is a functional option applied to Session during New(). If an option returns an error, it is logged but session creation continues.

func WithDeclare

func WithDeclare(ex Exchange, q Queue, b Bind) Option

WithDeclare creates an option that registers an exchange, queue, and binding in the session. Resources will be declared on the server on every reconnection and channel reinitialization.

If IsUsageDefault is set for the exchange, queue, or bind, they will be used as default resources for the Publish and Subscribe methods.

func WithLogger added in v1.0.0

func WithLogger(l Logger) Option

WithLogger creates an option to replace the session's default logger. If nil is passed, the logger is not changed (defaultLogger remains).

func WithReInitDelay added in v1.0.0

func WithReInitDelay(d time.Duration) Option

WithReInitDelay overrides the delay between channel re-initialization attempts. The value must be > 0.

func WithReconnectDelay added in v1.0.0

func WithReconnectDelay(d time.Duration) Option

WithReconnectDelay overrides the delay between reconnect attempts. The value must be > 0.

func WithResendDelay added in v1.0.0

func WithResendDelay(d time.Duration) Option

WithResendDelay overrides the delay between publish retry attempts. The value must be > 0.
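
The delay options follow the functional-options pattern: each option validates its input and returns an error that the constructor logs without aborting. A minimal sketch of that pattern is shown below. The settings type, the lowercase names, and the 5-second default are hypothetical; only the "value must be > 0" rule and the log-and-continue behavior come from this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"time"
)

// errInvalidDelay is a local stand-in for the package's ErrInvalidDelay.
var errInvalidDelay = errors.New("invalid delay option value")

// settings is a hypothetical configuration target.
type settings struct {
	reconnectDelay time.Duration
}

// option mirrors the documented shape: a function that may return an error.
type option func(*settings) error

// withReconnectDelay rejects non-positive delays and leaves s unchanged.
func withReconnectDelay(d time.Duration) option {
	return func(s *settings) error {
		if d <= 0 {
			return errInvalidDelay
		}
		s.reconnectDelay = d
		return nil
	}
}

// newSettings applies options in order; a failing option is logged and
// skipped, so the previous value stays in place.
func newSettings(opts ...option) *settings {
	s := &settings{reconnectDelay: 5 * time.Second} // hypothetical default
	for _, opt := range opts {
		if err := opt(s); err != nil {
			log.Printf("option ignored: %v", err)
		}
	}
	return s
}

// reconnectMillis exposes the delay in milliseconds for easy inspection.
func (s *settings) reconnectMillis() int64 {
	return s.reconnectDelay.Milliseconds()
}

func main() {
	fmt.Println(newSettings(withReconnectDelay(250 * time.Millisecond)).reconnectMillis())
	fmt.Println(newSettings(withReconnectDelay(0)).reconnectMillis())
}
```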

type Queue

type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool

	IsUsageDefault bool
}

Queue describes an AMQP queue for declaration on the server.

Fields:

  • Name — queue name. Must be non-empty.
  • Durable — if true, the queue survives server restarts.
  • AutoDelete — if true, the queue is deleted when all consumers disconnect.
  • Exclusive — if true, the queue is only accessible by the current connection and deleted when it closes.
  • IsUsageDefault — if true, this queue will be used by the Subscribe method by default.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session encapsulates the AMQP connection, channel, resource declarations, and consumer management. It provides automatic reconnection and reinitialization on failures.

Lifecycle:

  1. New() creates a session and launches three background goroutines.
  2. handleReconnect establishes the TCP connection to RabbitMQ.
  3. handleReOpenChannel opens an AMQP channel, declares resources, enables confirm mode.
  4. handleConsumers starts consumers after the channel is ready.
  5. Close() stops all goroutines, waits for message processing to finish, closes the channel and connection.

Concurrent access:

  • mu (sync.RWMutex) protects: queues, exchanges, binds, defaultExchange, defaultQueue, defaultBind, consumers, connection, notifyConnClose, notifyChanClose, notifyConfirm, connReadyCh, chanReadyCh.
  • publishMu (sync.Mutex) serializes PublishWithContext calls to ensure correct correspondence between publishes and confirms.
  • channel (atomic.Pointer) — atomic pointer to the current AMQP channel.
  • isReady, isClosed (int32) — atomic state flags.

func New

func New(addr string, opts ...Option) *Session

New creates a new session and immediately begins connection attempts to the server.

Three background goroutines are started automatically:

  • handleReconnect — establishes and restores the TCP connection.
  • handleReOpenChannel — opens the AMQP channel and declares resources.
  • handleConsumers — starts consumers when the channel is ready.

Options (opts) are applied sequentially. If an option returns an error (e.g., WithDeclare on an already-connected channel fails to declare a resource), the error is logged but session creation continues — the resource will be re-declared on the next reconnection.

The returned session must be closed with Close() to release resources.

func (*Session) AddConsumer added in v0.7.0

func (session *Session) AddConsumer(c *Consumer) error

AddConsumer registers a new consumer in the session.

If c.Name is empty, a unique name of the format "consumer-<hex>" is generated. If a consumer with the same name already exists, returns ErrConsumerAlreadyExists.

After registration, the consumer will be automatically started by the handleConsumers goroutine when the AMQP channel is ready. On reconnection, the consumer is restarted automatically.
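
The naming and duplicate rules above can be sketched with a small registry. This is an illustration only: registry and add are hypothetical names, errConsumerExists stands in for ErrConsumerAlreadyExists, and the use of 8 random bytes for the hex suffix is an assumption, since the documentation specifies only the "consumer-<hex>" format.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"sync"
)

// errConsumerExists is a local stand-in for ErrConsumerAlreadyExists.
var errConsumerExists = errors.New("consumer with this name already exists")

// registry tracks registered consumer names.
type registry struct {
	mu    sync.Mutex
	names map[string]struct{}
}

func newRegistry() *registry {
	return &registry{names: make(map[string]struct{})}
}

// add registers name, generating "consumer-<hex>" when name is empty, and
// returns the tag actually used.
func (r *registry) add(name string) (string, error) {
	if name == "" {
		buf := make([]byte, 8) // suffix length is an assumption
		if _, err := rand.Read(buf); err != nil {
			return "", err
		}
		name = "consumer-" + hex.EncodeToString(buf)
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.names[name]; ok {
		return "", errConsumerExists
	}
	r.names[name] = struct{}{}
	return name, nil
}

func main() {
	r := newRegistry()
	tag, err := r.add("")
	fmt.Println(tag, err)
	_, err = r.add("worker-1")
	fmt.Println(err)
	_, err = r.add("worker-1")
	fmt.Println(err)
}
```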

func (*Session) ChannelClose

func (session *Session) ChannelClose() error

ChannelClose forcibly closes the current AMQP channel. This will trigger a channel error, and handleReOpenChannel will automatically create a new one.

Useful for testing channel error recovery behavior.

func (*Session) Close

func (session *Session) Close() error

Close gracefully shuts down the session.

Sequence:

  1. Atomically sets isClosed = 1 (CAS prevents duplicate calls).
  2. Closes the done channel — all goroutines receive the termination signal.
  3. Resets isReady = 0 — new publishes will be rejected.
  4. Closes chanReadyCh (setChanReady(true)) — unblocks consumer goroutines waiting for channel readiness so they can see the closed done and exit.
  5. Waits for all consumer goroutines to finish (consumerWG.Wait()).
  6. Closes the AMQP channel.
  7. Closes the AMQP connection (if not already closed by the server).

Returns ErrAlreadyClosed on duplicate calls. Otherwise returns the first non-nil error from closing the channel or the connection, or nil if both close cleanly.
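
The first steps of this sequence (compare-and-swap guard, done channel, waiting for goroutines) form a common shutdown pattern that can be sketched with the standard library. The worker type below is hypothetical and leaves out the AMQP channel and connection entirely; errAlreadyClosed stands in for ErrAlreadyClosed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errAlreadyClosed is a local stand-in for ErrAlreadyClosed.
var errAlreadyClosed = errors.New("already closed")

// worker owns background goroutines that exit when done is closed.
type worker struct {
	closed atomic.Bool
	done   chan struct{}
	wg     sync.WaitGroup
}

// newWorker starts n goroutines that block until shutdown.
func newWorker(n int) *worker {
	w := &worker{done: make(chan struct{})}
	for i := 0; i < n; i++ {
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			<-w.done // stand-in for a consumer loop waiting for shutdown
		}()
	}
	return w
}

// Close lets exactly one caller through, signals all goroutines, and waits
// for them to finish.
func (w *worker) Close() error {
	if !w.closed.CompareAndSwap(false, true) {
		return errAlreadyClosed
	}
	close(w.done)
	w.wg.Wait()
	return nil
}

func main() {
	w := newWorker(3)
	fmt.Println("first close:", w.Close())
	fmt.Println("second close:", w.Close())
}
```

The compare-and-swap ensures the done channel is closed exactly once, since closing an already closed channel panics.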

func (*Session) ExchangeDeclare added in v0.9.0

func (session *Session) ExchangeDeclare(ex Exchange) error

ExchangeDeclare registers an exchange in the session and declares it on the server if the channel is already initialized.

If an exchange with the same name is already registered, no duplicate is added. If ex.IsUsageDefault == true, this exchange becomes the default for Publish/PublishWithContext. Exchange name and kind must be non-empty.

func (*Session) Publish added in v0.7.0

func (session *Session) Publish(message []byte) error

Publish sends a message to the default exchange and routing key, waiting for a server confirmation.

Blocks until:

  • The server confirms receipt (Ack) — returns nil.
  • The session is closed — returns ErrShutdown.

Uses context.Background(), so it cannot be cancelled externally. For cancellable publishing, use PublishWithContext.

Requires defaultExchange and defaultBind to be set (via WithDeclare with IsUsageDefault=true), otherwise returns ErrNotSetDefaultExchange.

func (*Session) PublishTo added in v0.7.0

func (session *Session) PublishTo(exchange, key string, message []byte) error

PublishTo sends a message to the specified exchange with the specified routing key, waiting for a server confirmation.

Similar to Publish but allows explicit exchange and routing key specification. Uses context.Background().

func (*Session) PublishToWithContext added in v1.0.0

func (session *Session) PublishToWithContext(ctx context.Context, exchange, key string, message []byte) error

PublishToWithContext sends a message to the specified exchange with the specified routing key with context cancellation support.

func (*Session) PublishWithContext added in v1.0.0

func (session *Session) PublishWithContext(ctx context.Context, message []byte) error

PublishWithContext sends a message to the default exchange and routing key with context cancellation support.

Blocks until confirmation, context cancellation, or session shutdown. Returns ctx.Err() on context cancellation.
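
The three exit conditions can be pictured as a retry loop that selects on a resend timer, the context, and a shutdown channel. The sketch below is an assumption about the general shape, not the package's code: publishUntilConfirmed and the attempt callback are hypothetical, and errShutdown stands in for ErrShutdown.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errShutdown is a local stand-in for ErrShutdown.
var errShutdown = errors.New("session is shutting down")

// publishUntilConfirmed calls attempt until it reports a confirmation,
// waiting resendDelay between tries. It stops early when ctx ends or when
// done is closed.
func publishUntilConfirmed(ctx context.Context, done <-chan struct{}, resendDelay time.Duration, attempt func() bool) error {
	for {
		if attempt() {
			return nil
		}
		timer := time.NewTimer(resendDelay)
		select {
		case <-timer.C:
			// Delay elapsed; try again.
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-done:
			timer.Stop()
			return errShutdown
		}
	}
}

// confirmAfter simulates a broker that confirms after the given number of
// failed attempts and returns how many attempts were made.
func confirmAfter(failures int) (attempts int, err error) {
	attempt := func() bool {
		attempts++
		return attempts > failures
	}
	err = publishUntilConfirmed(context.Background(), make(chan struct{}), time.Millisecond, attempt)
	return attempts, err
}

// stopsOnShutdown reports whether a closed done channel ends the loop.
func stopsOnShutdown() bool {
	done := make(chan struct{})
	close(done)
	never := func() bool { return false }
	err := publishUntilConfirmed(context.Background(), done, time.Hour, never)
	return errors.Is(err, errShutdown)
}

func main() {
	attempts, err := confirmAfter(2)
	fmt.Println("attempts:", attempts, "err:", err)
	fmt.Println("stops on shutdown:", stopsOnShutdown())
}
```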

func (*Session) QueueBind added in v0.9.0

func (session *Session) QueueBind(b Bind) error

QueueBind registers a binding of a queue to an exchange and performs the binding on the server if the channel is already initialized.

A binding is considered a duplicate if QueueName, ExchangeName, and Key all match. If b.IsUsageDefault == true, this binding's routing key will be used by Publish/PublishWithContext by default. QueueName and ExchangeName must be non-empty.
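
The duplicate rule maps naturally onto a Go map keyed by a comparable struct. The sketch below shows only that idea; Bind is a local stand-in with the three identity fields, and bindSet is a hypothetical helper, not the package's internal storage.

```go
package main

import "fmt"

// Bind is a local stand-in holding the three fields that identify a binding.
type Bind struct {
	QueueName    string
	ExchangeName string
	Key          string
}

// bindSet tracks registered bindings; a comparable struct works as a map key.
type bindSet map[Bind]struct{}

// add registers b and reports false if an identical binding already exists.
func (s bindSet) add(b Bind) bool {
	if _, dup := s[b]; dup {
		return false
	}
	s[b] = struct{}{}
	return true
}

func main() {
	s := bindSet{}
	b := Bind{QueueName: "logs.archive", ExchangeName: "logs", Key: ""}
	fmt.Println(s.add(b))
	fmt.Println(s.add(b))
	fmt.Println(s.add(Bind{QueueName: "logs.archive", ExchangeName: "logs", Key: "error"}))
}
```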

func (*Session) QueueDeclare

func (session *Session) QueueDeclare(q Queue) error

QueueDeclare registers a queue in the session and declares it on the server if the channel is already initialized.

If a queue with the same name is already registered, no duplicate is added. If q.IsUsageDefault == true, this queue becomes the default queue for Subscribe. Queue name must be non-empty.

func (*Session) RemoveConsumer added in v1.0.0

func (session *Session) RemoveConsumer(tag string) error

RemoveConsumer stops and removes a consumer by its name (tag).

Removes the consumer from the map and cancels it on the AMQP server via ch.Cancel(). After cancellation, the AMQP server closes the delivery channel, and the consumer goroutine terminates automatically.

If no consumer with the given tag is found, returns nil (not an error).

func (*Session) Stream

func (session *Session) Stream(c *Consumer) (<-chan amqp.Delivery, error)

Stream creates an AMQP consumer for the specified consumer and returns a delivery channel. This is a low-level method — for most cases, prefer Subscribe or SubscribeTo.

Returns ErrNotConnected if the session is not ready.

The delivery channel is closed automatically when the AMQP channel closes or the consumer is cancelled via ch.Cancel().

func (*Session) Subscribe

func (session *Session) Subscribe(handler func(Delivery) error) error

Subscribe registers a handler for the default queue. Requires defaultQueue to be set (via WithDeclare with IsUsageDefault=true).

The consumer is automatically restarted on reconnection.

func (*Session) SubscribeTo added in v0.7.0

func (session *Session) SubscribeTo(queue string, handler func(Delivery) error) error

SubscribeTo registers a handler for the specified queue. Creates a Consumer with an auto-generated name and registers it.

The consumer is automatically restarted on reconnection.

func (*Session) SubscribeToWithTag added in v1.0.0

func (session *Session) SubscribeToWithTag(queue string, handler func(Delivery) error) (string, error)

SubscribeToWithTag registers a handler for the specified queue and returns the assigned consumer tag.

func (*Session) SubscribeWithTag added in v1.0.0

func (session *Session) SubscribeWithTag(handler func(Delivery) error) (string, error)

SubscribeWithTag registers a handler for the default queue and returns the assigned consumer tag.

func (*Session) UnsafePublish added in v0.7.0

func (session *Session) UnsafePublish(message []byte, exchange, key string) error

UnsafePublish sends a message without waiting for a server confirmation. Returns an error if the session is not ready.

WARNING: This method does NOT guarantee delivery. The server may not receive the message due to network issues or buffer overflow.

WARNING: This method does NOT acquire publishMu and is NOT safe for concurrent use with Publish, PublishTo, PublishWithContext, or PublishToWithContext. Use it only when no Publish* call can run at the same time.

func (*Session) UnsafePublishTo added in v1.0.0

func (session *Session) UnsafePublishTo(exchange, key string, message []byte) error

UnsafePublishTo publishes a message without waiting for a server confirmation. Parameters are ordered as exchange, key, message to match PublishTo.

func (*Session) UnsafePublishToWithContext added in v1.0.0

func (session *Session) UnsafePublishToWithContext(ctx context.Context, exchange, key string, message []byte) error

UnsafePublishToWithContext publishes a message without waiting for a server confirmation and supports context cancellation.

func (*Session) WaitReady added in v1.0.0

func (session *Session) WaitReady(ctx context.Context) error

WaitReady blocks until the session is ready to publish/consume. Returns ErrShutdown if the session is closed.
