rabbit

package
v0.0.0-...-a63676f Latest
Published: Apr 24, 2026 License: MIT Imports: 14 Imported by: 0

README

RabbitMQ Provider

This package provides a RabbitMQ implementation of the mq.Provider interface.

Features

  • ✅ Publish
  • ✅ Subscribe
  • ✅ Reconnect
  • ✅ OpenTelemetry
  • ✅ Logging

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

Channel is a wrapper around amqp.Channel.

func (*Channel) Close

func (ch *Channel) Close() error

Close closes the channel and ensures the closed flag is set.

func (*Channel) Consume

func (ch *Channel) Consume(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

Consume wraps amqp.Channel.Consume. The returned delivery channel ends only when the Channel is closed by the developer.
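
The behaviour described for Consume can be sketched with the standard library alone. This is a minimal illustration under assumptions, not the package's implementation: forward, fakeSessions, and the string message type are hypothetical names, with open standing in for amqp.Channel.Consume and stopped for Channel.IsClosed.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// forward re-opens the delivery source whenever it ends and keeps feeding
// the same output channel. The output is closed only once stopped reports
// true, which models "closed by the developer".
func forward(open func() (<-chan string, error), stopped func() bool, retry time.Duration) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for !stopped() {
			src, err := open()
			if err != nil {
				time.Sleep(retry) // back off before the next attempt
				continue
			}
			for msg := range src {
				out <- msg
			}
			// src ended (for example after a reconnect); loop and re-open.
		}
	}()
	return out
}

// fakeSessions simulates a broker whose delivery stream ends after each
// batch. Once all batches are used up it behaves as if the developer had
// closed the channel. Hypothetical helper for this sketch only.
func fakeSessions(batches [][]string) (func() (<-chan string, error), func() bool) {
	closed := false
	open := func() (<-chan string, error) {
		if len(batches) == 0 {
			closed = true
			return nil, errors.New("channel closed")
		}
		src := make(chan string, len(batches[0]))
		for _, m := range batches[0] {
			src <- m
		}
		close(src)
		batches = batches[1:]
		return src, nil
	}
	return open, func() bool { return closed }
}

func main() {
	open, stopped := fakeSessions([][]string{{"a", "b"}, {"c"}})
	// The range loop sees one continuous stream across both sessions.
	for msg := range forward(open, stopped, time.Millisecond) {
		fmt.Println(msg)
	}
}
```

The consumer ranges over a single channel and never observes the gap between the two simulated sessions, which is the property the Consume description promises.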

func (*Channel) IsClosed

func (ch *Channel) IsClosed() bool

IsClosed reports whether the channel was closed by the developer.
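
One common way to implement a "closed by the developer" flag is an atomic boolean set inside Close. The sketch below assumes that design; rawChannel and wrappedChannel are hypothetical stand-ins for *amqp.Channel and Channel, and the real unexported fields may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// rawChannel is a stub for the underlying *amqp.Channel.
type rawChannel struct{ closeCalls int }

func (r *rawChannel) Close() error {
	r.closeCalls++
	return nil
}

// wrappedChannel is a hypothetical analogue of Channel.
type wrappedChannel struct {
	raw    *rawChannel
	closed atomic.Bool // true once the developer has called Close
}

// Close records the developer's intent first, then closes the underlying
// channel. A second call is a no-op.
func (w *wrappedChannel) Close() error {
	if w.closed.Swap(true) {
		return nil
	}
	return w.raw.Close()
}

// IsClosed reports whether Close was called by the developer. A channel
// lost through a network failure would still report false.
func (w *wrappedChannel) IsClosed() bool { return w.closed.Load() }

func main() {
	ch := &wrappedChannel{raw: &rawChannel{}}
	fmt.Println(ch.IsClosed()) // false
	_ = ch.Close()
	fmt.Println(ch.IsClosed()) // true
}
```

Separating "closed on purpose" from "connection dropped" is what lets a reconnect loop decide whether to re-open the channel or stop.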

type Config

type Config struct {
	URI           string
	ReconnectTime int
}

type Connection

type Connection struct {
	*amqp.Connection
	// contains filtered or unexported fields
}

Connection is a wrapper around amqp.Connection.

func (*Connection) Channel

func (c *Connection) Channel() (*Channel, error)

Channel wraps amqp.Connection.Channel to provide an auto-reconnecting channel.

type MQ

type MQ struct {
	// contains filtered or unexported fields
}

func New

func New(log logger.Logger, cfg *config.Config) *MQ

func (*MQ) Check

func (mq *MQ) Check(_ context.Context) error

Check verifies the connection status.

func (*MQ) Dial

func (mq *MQ) Dial() error

Dial wraps amqp.Dial to establish a connection and set up automatic reconnection in case the connection is lost.
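
A dial-then-watch loop of the kind Dial describes might look like the following. It is a sketch under assumptions: session, keepAlive, and flakyDialer are hypothetical names, the lost channel stands in for a close notification from the AMQP client, and delay plays the role that Config.ReconnectTime presumably plays (its unit is not documented here).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// session is a hypothetical stand-in for *amqp.Connection.
type session struct {
	id   int
	lost chan struct{} // closed when the broker connection drops
}

// keepAlive dials once, then redials every time the current session is
// lost. Each established session is sent on the returned channel.
func keepAlive(dial func() (*session, error), delay time.Duration, stop <-chan struct{}) (<-chan *session, error) {
	first, err := dial()
	if err != nil {
		return nil, err
	}
	out := make(chan *session, 1)
	out <- first
	go func() {
		defer close(out)
		cur := first
		for {
			select {
			case <-stop:
				return
			case <-cur.lost:
			}
			// Retry until a dial succeeds or the caller stops us.
			for {
				select {
				case <-stop:
					return
				case <-time.After(delay):
				}
				next, err := dial()
				if err != nil {
					continue
				}
				cur = next
				select {
				case out <- cur:
				case <-stop:
					return
				}
				break
			}
		}
	}()
	return out, nil
}

// flakyDialer fails on its second call to exercise the retry path.
func flakyDialer() func() (*session, error) {
	calls := 0
	return func() (*session, error) {
		calls++
		if calls == 2 {
			return nil, errors.New("broker unavailable")
		}
		return &session{id: calls, lost: make(chan struct{})}, nil
	}
}

func main() {
	stop := make(chan struct{})
	sessions, err := keepAlive(flakyDialer(), 10*time.Millisecond, stop)
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	first := <-sessions
	fmt.Println("connected, session", first.id)
	close(first.lost) // simulate losing the connection
	second := <-sessions
	fmt.Println("reconnected, session", second.id)
	close(stop)
}
```

The initial dial error is returned to the caller, while later failures are retried in the background; whether the package makes the same choice is not stated in its documentation.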

func (*MQ) Init

func (mq *MQ) Init(ctx context.Context, log logger.Logger) error

Init initializes the RabbitMQ connection and sets up the channel. It also sets up a graceful shutdown mechanism to close the connection and channel when the context is done.
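
The graceful-shutdown step can be illustrated with a goroutine that waits on ctx.Done and then closes resources in order. This is only a sketch: resource, closeOnDone, and shutdownDemo are hypothetical names, and the close order (channel before connection) is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// resource is a stub for anything closable, such as a channel or connection.
type resource struct {
	name   string
	closed bool
}

func (r *resource) Close() error {
	r.closed = true
	return nil
}

// closeOnDone waits for ctx to finish, closes every closer in the order
// given, and reports the first error (or nil) on the returned channel.
func closeOnDone(ctx context.Context, closers ...io.Closer) <-chan error {
	done := make(chan error, 1)
	go func() {
		<-ctx.Done()
		var first error
		for _, c := range closers {
			if err := c.Close(); err != nil && first == nil {
				first = err
			}
		}
		done <- first
	}()
	return done
}

// shutdownDemo cancels a context and reports whether both resources
// were closed as a result.
func shutdownDemo() (channelClosed, connClosed bool, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	ch := &resource{name: "channel"}
	conn := &resource{name: "connection"}
	done := closeOnDone(ctx, ch, conn) // channel first, then connection
	cancel()
	err = <-done
	return ch.closed, conn.closed, err
}

func main() {
	chClosed, connClosed, err := shutdownDemo()
	fmt.Println(chClosed, connClosed, err)
}
```

Waiting on the returned channel gives the caller a way to block until cleanup has finished before the process exits.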

func (*MQ) Publish

func (mq *MQ) Publish(ctx context.Context, target string, routingKey, payload []byte) error

func (*MQ) Subscribe

func (mq *MQ) Subscribe(ctx context.Context, target string, message query.Response) error

Subscribe binds a durable queue to a fanout exchange named target, then delivers messages to message.Chan. The exchange is created if missing (durable fanout). Publish must use the same exchange name as target with any routing key. Returns immediately; a background goroutine reads deliveries until ctx is done or the subscription channel is closed.
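
The fanout semantics and the return-immediately behaviour can be modelled in memory. The sketch below is not the package's code: fanout, newFanout, and demo are hypothetical, and buffered Go channels stand in for durable queues. It shows that every queue bound to the exchange receives each message regardless of the routing key.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fanout is an in-memory stand-in for a broker with fanout exchanges.
type fanout struct {
	mu     sync.Mutex
	queues map[string][]chan []byte // exchange name -> bound queues
}

func newFanout() *fanout {
	return &fanout{queues: map[string][]chan []byte{}}
}

// Subscribe binds a new queue to the exchange named target and returns
// immediately. A background goroutine forwards deliveries to out until
// ctx is done.
func (f *fanout) Subscribe(ctx context.Context, target string, out chan<- []byte) {
	q := make(chan []byte, 16)
	f.mu.Lock()
	f.queues[target] = append(f.queues[target], q)
	f.mu.Unlock()
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case msg := <-q:
				select {
				case out <- msg:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
}

// Publish copies payload to every queue bound to target. The routing key
// is ignored, as it is by a fanout exchange. This sketch blocks if a
// queue's buffer is full.
func (f *fanout) Publish(target string, routingKey, payload []byte) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, q := range f.queues[target] {
		q <- payload
	}
}

// demo binds two subscribers to one exchange and publishes once.
func demo() (string, string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	f := newFanout()
	a, b := make(chan []byte, 1), make(chan []byte, 1)
	f.Subscribe(ctx, "events", a)
	f.Subscribe(ctx, "events", b)
	f.Publish("events", []byte("any.key"), []byte("hello"))
	return string(<-a), string(<-b)
}

func main() {
	a, b := demo()
	fmt.Println(a, b)
}
```

Both subscribers receive the message even though neither registered the routing key, which is why the publisher only needs to agree with subscribers on the exchange name.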

func (*MQ) UnSubscribe

func (mq *MQ) UnSubscribe(target string) error
