mmq

package module
v1.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2023 License: MIT Imports: 6 Imported by: 0

README

MQ

A wrapper around amqp091 that makes it easy to work with the RabbitMQ message broker.

Installation

$ go get github.com/kaazedev/mmq

Features

  • Easy to use
  • Middlewares
  • Multiple Listeners

Usage

Short Form
func main() {
    app := mmq.New(mmq.Settings{
        ConnectionUrl:  "amqp://admin:admin@localhost:5672/", // RabbitMQ connection url
        PublishTimeout: 10, // Publish timeout in seconds
    })
    // Defer close connection
    defer app.Close()
    
    // Connect to RabbitMQ server
    if err := app.Connect(); err != nil {
        panic(err)
    }
	
    // Declare the queue and exchange in short form (the exchange type is set to topic)
    app.Queue("test", "test", "test")
	
    // Subscribe to queue and handle messages by callback
    app.Consume("test", Handler)
	
    // Subscribe to the queue with 15 workers and handle messages with the callback
    app.ConsumeMultiply("test", 15, Handler)
    
    // Block forever, reconnecting if reconnection is enabled
    app.Listen()
}

func Handler(c *mmq.Context) error {
    // Get message body
    body := c.Body()
	
	// Do your stuff here
	...
	
    // Reply to message
    return c.Reply([]byte("answer"))
}
Long Form
func main() {
    app := mmq.New(mmq.Settings{
        ConnectionUrl:  "amqp://admin:admin@localhost:5672/", // RabbitMQ connection url
        PublishTimeout: 10, // Publish timeout in seconds
    })
    // Defer close connection
    defer app.Close()
    
    // Connect to RabbitMQ server
    if err := app.Connect(); err != nil {
        panic(err)
    }
    
    // Declare queue and exchange in long form
    err := app.DeclareExchange(exchange, "topic", true, false, false, false)
    ...
    
    err = app.DeclareQueue(name, true, false, false, false)
    ...
    
    err = app.BindQueue(name, routingKey, exchange, false, nil)
    ...
	
    // Subscribe to queue and handle messages by callback
    app.Consume(routingKey, Handler)
    
    // Block forever, reconnecting if reconnection is enabled
    app.Listen()
}

func Handler(c *mmq.Context) error {
    // Get message body
    body := c.Body()
    
    // Do your stuff here
    ...
    
    // Reply to message
    return c.Reply([]byte("answer"))
}
Middlewares
func main() {
    ... 
    
    // Use the built-in logger for all consumers
    app.Use(logger.New())
	
    app.Consume(routingKey, Middleware, Handler)
    
    ...
}

func Middleware(c *mmq.Context) error {
	fmt.Println("before handler")
	
	if err := c.Next(); err != nil {
		return err
	}
	
	fmt.Println("after handler")
	
	return nil
}

func Handler(c *mmq.Context) error {
    ...
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Context

type Context struct {
	App      *MMQ
	Delivery amqp091.Delivery
	Channel  *amqp091.Channel
	// contains filtered or unexported fields
}

func (*Context) Ack

func (ctx *Context) Ack(multiple bool) error

func (*Context) AppId added in v1.0.8

func (ctx *Context) AppId() string

func (*Context) Bind added in v1.0.6

func (ctx *Context) Bind(data interface{}) error

func (*Context) Body added in v1.0.8

func (ctx *Context) Body() []byte

func (*Context) ConsumerTag added in v1.0.8

func (ctx *Context) ConsumerTag() string

func (*Context) ContentType added in v1.0.8

func (ctx *Context) ContentType() string

func (*Context) CorrelationId added in v1.0.8

func (ctx *Context) CorrelationId() string

func (*Context) DeliveryTag added in v1.0.8

func (ctx *Context) DeliveryTag() uint64

func (*Context) MessageCount added in v1.0.8

func (ctx *Context) MessageCount() uint32

func (*Context) MessageId added in v1.0.8

func (ctx *Context) MessageId() string

func (*Context) Nack

func (ctx *Context) Nack(multiple, requeue bool) error

func (*Context) Next added in v1.0.14

func (ctx *Context) Next() error

func (*Context) Redelivered added in v1.0.8

func (ctx *Context) Redelivered() bool

func (*Context) Reject

func (ctx *Context) Reject(requeue bool) error

func (*Context) Reply added in v1.0.6

func (ctx *Context) Reply(data []byte) error

func (*Context) ReplyJSON added in v1.0.7

func (ctx *Context) ReplyJSON(data interface{}) error

func (*Context) ReplyTo added in v1.0.8

func (ctx *Context) ReplyTo() string

func (*Context) Timestamp added in v1.0.8

func (ctx *Context) Timestamp() time.Time

func (*Context) Type added in v1.0.8

func (ctx *Context) Type() string

func (*Context) UserId added in v1.0.8

func (ctx *Context) UserId() string

func (*Context) WorkerId added in v1.0.14

func (ctx *Context) WorkerId() int

type Handler

type Handler func(ctx *Context) error

type JSON added in v1.0.7

type JSON map[string]interface{}

type MMQ

type MMQ struct {
	JsonMarshal   func(v interface{}) ([]byte, error)
	JsonUnmarshal func(data []byte, v interface{}) error
	// contains filtered or unexported fields
}

func New

func New(conf Settings) *MMQ

func (*MMQ) BindQueue added in v1.0.6

func (m *MMQ) BindQueue(name, routingKey, exchange string, noWait bool, args amqp091.Table) error

func (*MMQ) Close

func (m *MMQ) Close() error

func (*MMQ) Connect

func (m *MMQ) Connect() error

func (*MMQ) Consume

func (m *MMQ) Consume(name string, handler ...Handler)

func (*MMQ) ConsumeMultiply added in v1.0.14

func (m *MMQ) ConsumeMultiply(name string, workers int, handler ...Handler)

func (*MMQ) DeclareExchange

func (m *MMQ) DeclareExchange(name string, kind string, durable, autoDelete, internal, noWait bool) error

func (*MMQ) DeclareQueue

func (m *MMQ) DeclareQueue(name string, durable, autoDelete, noWait, exclusive bool) error

func (*MMQ) Listen

func (m *MMQ) Listen()

func (*MMQ) Publish

func (m *MMQ) Publish(name, exchange string, body []byte) error

func (*MMQ) PurgeExchange added in v1.0.8

func (m *MMQ) PurgeExchange(name string, ifUnused, noWait bool) error

func (*MMQ) PurgeQueue added in v1.0.8

func (m *MMQ) PurgeQueue(name string, noWait bool) error

func (*MMQ) Queue added in v1.0.6

func (m *MMQ) Queue(name, routingKey, exchange string)

func (*MMQ) Reconnect added in v1.0.6

func (m *MMQ) Reconnect()

func (*MMQ) Request added in v1.0.6

func (m *MMQ) Request(name string, body []byte) (data []byte, err error)

func (*MMQ) RequestJSON added in v1.0.10

func (m *MMQ) RequestJSON(name string, body interface{}, response interface{}) error

func (*MMQ) Use added in v1.0.14

func (m *MMQ) Use(handler Handler)

type Settings added in v1.0.6

type Settings struct {
	PublishTimeout int
	ConnectionUrl  string

	JsonMarshal   func(v interface{}) ([]byte, error)
	JsonUnmarshal func(data []byte, v interface{}) error
}

Directories

Path Synopsis
middlewares

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL