rabbitmq

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2025 License: MIT Imports: 9 Imported by: 1

Documentation

Index

Constants

View Source
// Default reconnect settings for the RabbitMQ connection.
const (
	DefaultMaxReconnectTimes = 3               // default number of reconnect attempts
	WaitTime                 = 5 * time.Second // wait time before retrying to connect
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Bind

// Bind pairs an exchange name with a binding key. Queue.Binds carries a list
// of these to bind a queue to exchanges.
type Bind struct {
	ExchangeName string // name of the exchange the queue is bound to
	BindingKey   string // key used for the queue binding
}

type ConsumerConfig

// ConsumerConfig is the configuration argument accepted by
// RabbitInterface.Consumer.
//
// NOTE(review): the field names mirror the parameters of the amqp
// Channel.Consume call (queue, consumer, autoAck, exclusive, noLocal, noWait,
// args); presumably they are forwarded unchanged — confirm against the
// implementation.
type ConsumerConfig struct {
	Queue            string
	Consumer         string
	AutoAck          bool
	Exclusive        bool
	NoLocal          bool
	NoWait           bool
	Args             amqp.Table
	ControlQosConfig *ControlQosConfig // QoS settings; presumably nil means no QoS is applied — TODO confirm
}

type ControlQosConfig

// ControlQosConfig groups quality-of-service settings referenced from
// ConsumerConfig.ControlQosConfig.
//
// NOTE(review): the fields look like the arguments of the amqp Channel.Qos
// call (prefetchCount, prefetchSize, global) — confirm against the
// implementation.
type ControlQosConfig struct {
	PrefetchCount int
	PrefetchSize  int
	Global        bool
}

type Exchange

// Exchange describes an exchange; it is the argument accepted by
// RabbitInterface.ExchangeDeclare.
type Exchange struct {
	Name       string     // name
	Kind       string     // kind of exchange. ex: 'direct' | 'topic' | 'fanout'
	Durable    bool       // durable
	AutoDelete bool       // delete when unused
	Internal   bool       // internal exchange
	NoWait     bool       // no-wait
	Arguments  amqp.Table // arguments
}

type Message

// Message is the message argument accepted by RabbitInterface.Producer and
// RabbitInterface.ProducerWithContext.
type Message struct {
	Data        []byte // message bytes; presumably the published body — TODO confirm
	ContentType string // content type; presumably a MIME type such as "application/json" — TODO confirm
}

type ProducerConfig

// ProducerConfig is the configuration argument accepted by
// RabbitInterface.Producer and RabbitInterface.ProducerWithContext.
//
// NOTE(review): the field names mirror the parameters of the amqp channel
// publish call (exchange, key, mandatory, immediate); presumably they are
// forwarded unchanged — confirm against the implementation.
type ProducerConfig struct {
	Exchange  string
	Key       string
	Mandatory bool
	Immediate bool
}

type Queue

// Queue describes a queue and its optional exchange bindings; it is the
// argument accepted by RabbitInterface.QueueDeclare.
type Queue struct {
	Name       string     // name
	Durable    bool       // durable
	AutoDelete bool       // delete when unused
	Exclusive  bool       // exclusive
	NoWait     bool       // no-wait
	Arguments  amqp.Table // arguments
	Binds      *[]Bind    // bind to exchange and route with queue bind
}

type RabbitInterface

// RabbitInterface is the set of RabbitMQ operations exposed by this package.
// NewRabbitmq returns a value of this type, and *Rbm_pool declares methods
// with the same signatures.
type RabbitInterface interface {
	ExchangeDeclare(exchange Exchange) error
	QueueDeclare(queue Queue) error
	Producer(ctx context.Context, pc *ProducerConfig, msg *Message) error
	ProducerWithContext(ctx context.Context, pc *ProducerConfig, msg *Message) error
	// Consumer takes a caller-supplied channel of deliveries; presumably
	// received messages are sent on chanMessage — TODO confirm.
	Consumer(ctx context.Context, cc *ConsumerConfig, chanMessage chan amqp.Delivery)
	Reject(ctx context.Context, msg *amqp.Delivery, requeue bool)
	// Ack acknowledges a message.
	Ack(ctx context.Context, msg *amqp.Delivery)
	Close() error
}

func NewRabbitmq added in v0.3.0

func NewRabbitmq(url string, args map[string]interface{}) RabbitInterface

NewRabbitmq creates a new RabbitMQ connection pool. It reads the RabbitMQ URL and maximum reconnect times from environment variables. If the environment variables are not set, it uses default values. It returns a RabbitInterface that can be used to interact with RabbitMQ. If the connection cannot be established, it logs an error and returns nil. The function also handles reconnection logic in case of connection failures. It is important to ensure that the RabbitMQ server is running and accessible at the specified URL. The function is designed to be used in a context where RabbitMQ is required for message queuing and processing.

Example usage:

rabbit := NewRabbitmq("amqp://guest:guest@localhost:5672/", map[string]interface{}{"RABBITMQ_MAXX_RECONNECT_TIMES": "5"})

type Rbm_pool

// Rbm_pool carries an AMQP channel together with the connection URI and the
// argument map. Its pointer type declares methods with the same signatures as
// RabbitInterface. Only the exported fields are visible here.
type Rbm_pool struct {
	Channel *amqp.Channel // AMQP channel

	RMQ_URI string // RabbitMQ URI
	Args    map[string]interface{} // argument map; presumably the args given to NewRabbitmq — TODO confirm
	// contains filtered or unexported fields
}

func (*Rbm_pool) Ack added in v0.3.0

func (r *Rbm_pool) Ack(ctx context.Context, msg *amqp.Delivery)

Ack acknowledges a message

func (*Rbm_pool) Close added in v0.6.0

func (r *Rbm_pool) Close() error

func (*Rbm_pool) Consumer

func (r *Rbm_pool) Consumer(ctx context.Context, cc *ConsumerConfig, chanMessage chan amqp.Delivery)

func (*Rbm_pool) ExchangeDeclare added in v0.3.0

func (r *Rbm_pool) ExchangeDeclare(exchange Exchange) error

func (*Rbm_pool) Producer

func (r *Rbm_pool) Producer(ctx context.Context, pc *ProducerConfig, msg *Message) error

func (*Rbm_pool) ProducerWithContext added in v0.3.0

func (r *Rbm_pool) ProducerWithContext(ctx context.Context, pc *ProducerConfig, msg *Message) error

func (*Rbm_pool) QueueDeclare added in v0.3.0

func (r *Rbm_pool) QueueDeclare(queue Queue) error

func (*Rbm_pool) Reject added in v0.3.0

func (r *Rbm_pool) Reject(ctx context.Context, msg *amqp.Delivery, requeue bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL