mmq

package module
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2023 License: MIT Imports: 6 Imported by: 0

README

MQ

A wrapper over wrapper for easy working with RabbitMQ message-broker.

Installation

$ go get github.com/kaazedev/mq

Usage

func main() {
    // Create a new instance of the MQ package
    app := mq.New("amqp://admin:admin@localhost:5672/")

    // Connect to the RabbitMQ server
    if err := app.Connect(); err != nil {
        panic(err)
    }

    // Create an exchange with the name "testing" and type "topic" and durable
    app.DeclareExchange("testing", "topic", true)

    // Also you can declare a multiple queues at once
    // app.DeclareExchanges(mq.Exchange{...}, mq.Exchange{...})
	
    // Create a queue with the name "testing", exchange "testing" and routing key "testing"
    app.DeclareQueue("testing", "testing", "testing")
	
    // Also you can declare a multiple queues at once
    // app.DeclareQueues(mq.Queue{...}, mq.Queue{...})
    
    // Consume messages from the queue "testing" and pass them to the handler function
    app.Consume("testing", handler)
    
    // Publish a message to the exchange "testing" with the routing key "testing"
    app.Publish("testing", "testing", []byte("Hi!"))
    
    // Just a loop to keep the program running
    app.Listen()
}

func handler(ctx *mq.Context) {
    // Print the message body, which is a byte array
    fmt.Println("Received a message: ", string(ctx.Delivery.Body))
    
    // You can also get the message headers and other properties
    fmt.Println("Message headers: ", ctx.Delivery.Headers)
    
    // Also you can nAck, Ack, reject the messages manually
    // ctx.Ack()
    // ctx.Nack() 
    // ctx.Reject(fa)
    
    // Send a response to the queue, will be sent to the queue that sent the message with automatic acknowledgement
    ctx.SendResponse(map[string]string{
        "message": "Hello from Go!",
    })
}

Credits and license

hadihammurabi/go-rabbitmq
streadway/amqp

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Context

type Context struct {
	App      *MMQ
	Channel  *amqp.Channel
	Delivery amqp.Delivery
}

func (*Context) Ack

func (m *Context) Ack(multiple bool) error

Ack the message This will acknowledge the message and remove it from the queue @param multiple If true, will acknowledge all messages up to this one

func (*Context) Nack

func (m *Context) Nack(multiple, requeue bool) error

Nack the message This will not acknowledge the message and remove it from the queue @param multiple If true, will acknowledge all messages up to this one @param requeue If true, will requeue the message

func (*Context) Reject

func (m *Context) Reject(requeue bool) error

Reject the message This will not acknowledge the message and remove it from the queue @param requeue If true, will requeue the message

func (*Context) SendResponse

func (m *Context) SendResponse(message interface{}, headers amqp.Table) error

SendResponse sends a response to the message This will send a response to the message and acknowledge the message @param message The message to send as a response @param headers The headers to send with the message

type ExchageType

type ExchageType int
const (
	ExchangeTypeDirect ExchageType = 0
	ExchangeTypeFanout ExchageType = 1
	ExchangeTypeTopic  ExchageType = 2
)

type Exchange

type Exchange struct {
	Name    string
	Type    ExchageType
	Durable bool
}

Exchange is a struct that contains the name, type and durability of an exchange

type Handler

type Handler func(ctx *Context)

Handler is a function that is called when a message is received

type MMQ

type MMQ struct {
	// contains filtered or unexported fields
}

func New

func New(connectionUrl string) *MMQ

New creates a new instance of the MMQ package @param connection The connection URL to the RabbitMQ server @return *MMQ

func (*MMQ) Close

func (m *MMQ) Close()

Close the connection to the RabbitMQ server

func (*MMQ) Connect

func (m *MMQ) Connect() (err error)

Connect to the RabbitMQ server

func (*MMQ) Consume

func (m *MMQ) Consume(q string, handler Handler)

Consume a queue with a handler function The handler function will be called with a message context - @param q The name of the queue - @param handler The handler function

func (*MMQ) DeclareExchange

func (m *MMQ) DeclareExchange(name string, exchangeType ExchageType, durable bool)

DeclareExchange declares an exchange with a name, type and durability @param name The name of the exchange @param exchangeType The type of the exchange @param durable Whether the exchange is durable or not

func (*MMQ) DeclareExchanges

func (m *MMQ) DeclareExchanges(exchanges ...Exchange)

DeclareExchanges declares multiple exchanges @param exchanges The exchanges to declare

func (*MMQ) DeclareQueue

func (m *MMQ) DeclareQueue(name, exchange, routingKey string, durable bool)

DeclareQueue declares a queue with a name, exchange and routing key @param name The name of the queue @param exchange The name of the exchange @param routingKey The routing key

func (*MMQ) DeclareQueues

func (m *MMQ) DeclareQueues(queues ...Queue)

DeclareQueues declares multiple queues @param queues The queues to declare

func (*MMQ) GetQueue

func (m *MMQ) GetQueue(name string) *queue.Queue

GetQueue returns a queue by name @param name The name of the queue @return *queue.Queue

func (*MMQ) GetQueues

func (m *MMQ) GetQueues() map[string]*queue.Queue

GetQueues returns all queues @return map[string]*queue.Queue

func (*MMQ) Listen

func (m *MMQ) Listen()

Listen is a loop to keep the program running

func (*MMQ) Publish

func (m *MMQ) Publish(q, e string, message []byte, headers amqp.Table) error

Publish a message to a queue @param q The name of the queue @param e The name of the exchange @param message The message to publish

type Queue

type Queue struct {
	Name       string
	Exchange   string
	RoutingKey string
	Durable    bool
}

Queue is a struct that contains the name, exchange and routing key of a queue

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL