rabbitmq

package module
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2025 | License: MIT | Imports: 17 | Imported by: 0

Documentation

Index

Constants

View Source
// Exchange kind values usable for the Type field of Exchange. Each constant's
// string value is the lowercase kind name: "direct", "fanout", "topic",
// "headers".
// NOTE(review): these presumably correspond to the AMQP exchange types of the
// same names — confirm against the ExchangeDeclare implementation.
const (
	ExchangeDirect  exchangeType = "direct"
	ExchangeFanout  exchangeType = "fanout"
	ExchangeTopic   exchangeType = "topic"
	ExchangeHeaders exchangeType = "headers"
)

Variables

View Source
// AppName is a mutable package-level string that defaults to "app".
// NOTE(review): where the package reads it is not visible here (presumably an
// application/service identifier, e.g. for tracing) — confirm before relying
// on it, and set it before any other use of the package.
var AppName = "app"

Functions

func InitTracer

func InitTracer()

Types

type Callback

// Callback is the handler interface accepted by Consumer.Consume.
type Callback interface {
	// Process handles one message. data is a pointer to the payload byte
	// slice, headers maps header names to their values, and route and
	// contentType are plain strings describing the message.
	// NOTE(review): presumably called once per delivery, with route being the
	// routing key and a non-nil error signalling a failed message — the
	// ack/nack/retry consequences are not visible here; confirm in Consume.
	Process(ctx context.Context, data *[]byte, headers map[string]any, route, contentType string) error
}

type ChannelKey

// ChannelKey is the key type accepted by Connection.GetChannelFromPool.
// It consists of four string fields, in the order Queue, Consumer, Exchange,
// Key, and is therefore comparable and usable as a map key.
// NOTE(review): presumably one pooled channel is kept per distinct
// queue/consumer/exchange/routing-key combination — confirm in the pool code.
type ChannelKey struct {
	Queue, Consumer, Exchange, Key string
}

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(dsn string, backoffPolicy []time.Duration, prefetchCount uint16) *Connection

func (*Connection) Close

func (c *Connection) Close(ctx context.Context) error

func (*Connection) Connect

func (c *Connection) Connect(ctx context.Context, errGroup *errgroup.Group) error

func (*Connection) Consume

func (c *Connection) Consume(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool,
	args amqp.Table,
) (<-chan amqp.Delivery, error)

func (*Connection) ExchangeDeclare

func (c *Connection) ExchangeDeclare(e *Exchange) error

func (*Connection) GetChannelFromPool

func (c *Connection) GetChannelFromPool(key ChannelKey) (ch *amqp.Channel, err error)

func (*Connection) IsClosed

func (c *Connection) IsClosed() bool

func (*Connection) Publish

func (c *Connection) Publish(ctx context.Context, exchange, key string, mandatory, immediate bool,
	msg *amqp.Publishing,
) error

func (*Connection) QueueBind

func (c *Connection) QueueBind(name, key string, q *Queue, args amqp.Table) error

func (*Connection) QueueDeclare

func (c *Connection) QueueDeclare(name string, q *Queue) (amqp.Queue, error)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cnf *Queue, ch SubConn) *Consumer

func NewConsumerWithRoute

func NewConsumerWithRoute(cnf *Queue, ch SubConn, route string) *Consumer

func (*Consumer) Connect

func (c *Consumer) Connect() error

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, errGroup *errgroup.Group, cb Callback)

type DLQConfig

// DLQConfig is the type of the optional Queue.DLQ field.
// NOTE(review): "DLQ" presumably stands for dead-letter queue — confirm.
type DLQConfig struct {
	// Enabled is a boolean switch; presumably turns the DLQ setup on — verify
	// against the consumer's queue-declaration code.
	Enabled bool
	// TTL is a duration; presumably the message time-to-live used for the
	// DLQ — which queue it applies to is not visible here, confirm.
	TTL     time.Duration
}

type Exchange

// Exchange holds the exchange settings passed to ExchangeDeclare and to
// NewPublisher / NewPublisherWithRoute.
// NOTE(review): the field names match the parameters of the amqp client's
// Channel.ExchangeDeclare and are presumably forwarded to it unchanged —
// confirm in Connection.ExchangeDeclare.
type Exchange struct {
	// Name is the exchange name.
	Name       string
	// Type is the exchange kind; the package declares ExchangeDirect,
	// ExchangeFanout, ExchangeTopic and ExchangeHeaders for it.
	Type       exchangeType
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	// Args is an amqp.Table of additional arguments.
	Args       amqp.Table
}

type PubConn

// PubConn is the connection interface a Publisher is constructed with (see
// NewPublisher and NewPublisherWithRoute). *Connection declares methods with
// exactly these signatures.
type PubConn interface {
	// ExchangeDeclare takes an exchange description and reports an error.
	ExchangeDeclare(e *Exchange) error
	// Publish takes a target exchange and key, the mandatory and immediate
	// flags, and the message to send, and reports an error.
	Publish(ctx context.Context, exchange, key string, mandatory, immediate bool, msg *amqp.Publishing) error
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(cnf *Exchange, ch PubConn) *Publisher

func NewPublisherWithRoute

func NewPublisherWithRoute(cnf *Exchange, ch PubConn, route string) *Publisher

func (*Publisher) Connect

func (p *Publisher) Connect() error

func (*Publisher) SendMessage

func (p *Publisher) SendMessage(ctx context.Context, body []byte, typ string) error

func (*Publisher) SendMessageToExchangeWithHeaders

func (p *Publisher) SendMessageToExchangeWithHeaders(
	ctx context.Context,
	exchange string,
	body []byte,
	route string,
	exp string,
	headers map[string]any,
	typ string,
) error

func (*Publisher) SendMessageWithHeaders

func (p *Publisher) SendMessageWithHeaders(
	ctx context.Context,
	body []byte,
	route string,
	exp string,
	headers map[string]any,
	typ string,
) error

func (*Publisher) SendMessageWithRoute

func (p *Publisher) SendMessageWithRoute(ctx context.Context, body []byte, route string, typ string) error

type Queue

// Queue holds the queue settings passed to NewConsumer /
// NewConsumerWithRoute and to the QueueDeclare and QueueBind methods.
// The queue name is not part of this struct; QueueDeclare and QueueBind take
// it as a separate argument.
// NOTE(review): Durable, AutoDelete, Exclusive, NoWait and Args match the
// parameters of the amqp client's Channel.QueueDeclare and are presumably
// forwarded to it — confirm in Connection.QueueDeclare.
type Queue struct {
	// Exchange is an exchange name; presumably the exchange the queue is
	// bound to — confirm in QueueBind.
	Exchange   string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	// Args is an amqp.Table of additional arguments.
	Args       amqp.Table
	// DLQ is an optional pointer to a DLQConfig; how a nil value is treated
	// is not visible here (presumably "no DLQ") — confirm.
	DLQ        *DLQConfig
}

type SubConn

// SubConn is the connection interface a Consumer is constructed with (see
// NewConsumer and NewConsumerWithRoute). *Connection declares methods with
// exactly these signatures.
type SubConn interface {
	// ExchangeDeclare takes an exchange description and reports an error.
	ExchangeDeclare(e *Exchange) error
	// QueueDeclare takes a queue name and its settings and returns an
	// amqp.Queue or an error.
	QueueDeclare(name string, q *Queue) (amqp.Queue, error)
	// QueueBind takes a queue name, a key, the queue settings and extra
	// arguments, and reports an error.
	QueueBind(name, key string, q *Queue, args amqp.Table) error
	// Consume returns a receive-only channel of amqp.Delivery values for the
	// given queue and consumer name, or an error.
	Consume(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool,
		args amqp.Table) (<-chan amqp.Delivery, error)
	// IsClosed reports a boolean; presumably true once the underlying
	// connection is closed — confirm in Connection.IsClosed.
	IsClosed() bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL