mmq

package module
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2023 License: MIT Imports: 5 Imported by: 0

README

MQ

A wrapper over amqp091 for easy working with RabbitMQ message-broker.

Installation

$ go get github.com/kaazedev/mq

Features

  • Easy to use
  • Automatic reconnection

Usage

func main() {
    app := mmq.New(mmq.Settings{
        ConnectionUrl:  "amqp://admin:admin@localhost:5672/", // RabbitMQ connection url
        AutoReconnect:  true, // Enable auto reconnection
        PublishTimeout: 10, // Publish timeout in seconds
    })
    // Defer close connection
    defer app.Close()
    
    // Connect to RabbitMQ server
    if err := app.Connect(); err != nil {
        panic(err)
    }
    
    // Declare queue and exchange in long form
    err := app.DeclareExchange(exchange, "topic", true, false, false, false)
    ...
    
    err = app.DeclareQueue(name, true, false, false, false)
    ...
    
    err = app.BindQueue(name, routingKey, exchange, false, nil)
    ...
	
    // Declare queue and exchange in short form
    app.Queue("test2", "test2", "test2")
	
    // Subscribe to queue and handle messages by callback
    app.Consume("test2", Handle)
    
    // Forever loop with reconnection if enabled
    app.Listen()
}

func Handle(c *mmq.Context) {
    // Get message body
    body := c.Delivery.Body
	
    // Reply to message
    err := c.Reply([]byte("answer"))
    if err == nil {
        // Acknowledge message
        c.Ack(true)
		
        // Nack message without requeue
        c.Nack(false, false)
    
        // Reject message with requeue
        c.Reject(false)
    }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Context

type Context struct {
	App      *MMQ
	Delivery amqp091.Delivery
	Channel  *amqp091.Channel
}

func (*Context) Ack

func (ctx *Context) Ack(multiple bool) error

func (*Context) Bind added in v1.0.6

func (ctx *Context) Bind(data interface{}) error

func (*Context) Nack

func (ctx *Context) Nack(multiple, requeue bool) error

func (*Context) Reject

func (ctx *Context) Reject(requeue bool) error

func (*Context) Reply added in v1.0.6

func (ctx *Context) Reply(data []byte) error

type Handler

type Handler func(ctx *Context)

type MMQ

type MMQ struct {
	// contains filtered or unexported fields
}

func New

func New(conf Settings) *MMQ

func (*MMQ) BindQueue added in v1.0.6

func (m *MMQ) BindQueue(name, routingKey, exchange string, noWait bool, args amqp091.Table) error

func (*MMQ) Close

func (m *MMQ) Close() error

func (*MMQ) Connect

func (m *MMQ) Connect() error

func (*MMQ) Consume

func (m *MMQ) Consume(name string, handler Handler)

func (*MMQ) DeclareExchange

func (m *MMQ) DeclareExchange(name string, kind string, durable, autoDelete, internal, noWait bool) error

func (*MMQ) DeclareQueue

func (m *MMQ) DeclareQueue(name string, durable, autoDelete, noWait, exclusive bool) error

func (*MMQ) Listen

func (m *MMQ) Listen()

func (*MMQ) Publish

func (m *MMQ) Publish(name, exchange string, body []byte) error

func (*MMQ) Queue added in v1.0.6

func (m *MMQ) Queue(name, routingKey, exchange string)

func (*MMQ) Reconnect added in v1.0.6

func (m *MMQ) Reconnect()

func (*MMQ) Request added in v1.0.6

func (m *MMQ) Request(name string, body []byte) (data []byte, err error)

type Settings added in v1.0.6

type Settings struct {
	PublishTimeout int
	AutoReconnect  bool
	ConnectionUrl  string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL