Documentation
¶
Index ¶
- Constants
- type Bind
- type ConsumerConfig
- type ControlQosConfig
- type Exchange
- type Message
- type ProducerConfig
- type Queue
- type RabbitInterface
- type Rbm_pool
- func (r *Rbm_pool) Ack(ctx context.Context, msg *amqp.Delivery)
- func (r *Rbm_pool) Close() error
- func (r *Rbm_pool) Consumer(ctx context.Context, cc *ConsumerConfig, chanMessage chan amqp.Delivery)
- func (r *Rbm_pool) ExchangeDeclare(exchange Exchange) error
- func (r *Rbm_pool) Producer(ctx context.Context, pc *ProducerConfig, msg *Message) error
- func (r *Rbm_pool) ProducerWithContext(ctx context.Context, pc *ProducerConfig, msg *Message) error
- func (r *Rbm_pool) QueueDeclare(queue Queue) error
- func (r *Rbm_pool) Reject(ctx context.Context, msg *amqp.Delivery, requeue bool)
Constants ¶
const ( DefaultMaxReconnectTimes = 3 // 3 attempts WaitTime = 5 * time.Second // 5 seconds wait time before retrying to connect )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerConfig ¶
type ControlQosConfig ¶
type ProducerConfig ¶
type RabbitInterface ¶
type RabbitInterface interface {
ExchangeDeclare(exchange Exchange) error
QueueDeclare(queue Queue) error
Producer(ctx context.Context, pc *ProducerConfig, msg *Message) error
ProducerWithContext(ctx context.Context, pc *ProducerConfig, msg *Message) error
Consumer(ctx context.Context, cc *ConsumerConfig, chanMessage chan amqp.Delivery)
Reject(ctx context.Context, msg *amqp.Delivery, requeue bool)
Ack(ctx context.Context, msg *amqp.Delivery)
Close() error
}
func NewRabbitmq ¶ added in v0.3.0
func NewRabbitmq(url string, args map[string]interface{}) RabbitInterface
NewRabbitmq creates a new RabbitMQ connection pool It reads the RabbitMQ URL and maximum reconnect times from environment variables. If the environment variables are not set, it uses default values. It returns a RabbitInterface that can be used to interact with RabbitMQ. If the connection cannot be established, it logs an error and returns nil. The function also handles reconnection logic in case of connection failures. It is important to ensure that the RabbitMQ server is running and accessible at the specified URL. The function is designed to be used in a context where RabbitMQ is required for message queuing and processing.
Example usage:
rabbit := NewRabbitmq("amqp://guest:guest@localhost:5672/", map[string]string{"RABBITMQ_MAXX_RECONNECT_TIMES": "5"})
type Rbm_pool ¶
type Rbm_pool struct {
Channel *amqp.Channel
RMQ_URI string // RabbitMQ URI
Args map[string]interface{}
// contains filtered or unexported fields
}