mmq

package module
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2023 License: MIT Imports: 7 Imported by: 0

README

MQ

A wrapper around a wrapper that makes it easy to work with the RabbitMQ message broker.

Installation

$ go get github.com/kaazedev/mmq

Features

  • Easy to use
  • Automatic reconnection

Usage

package main

import (
	"fmt"
	"github.com/kaazedev/mmq"
	"github.com/streadway/amqp"
)

// main walks through the typical mmq workflow: declare an exchange and a queue,
// attach a consumer, publish a message, make an RPC call, then block in Listen.
// Errors returned by Publish and RPCCall are reported instead of being dropped.
func main() {
	// Create a new instance of the MMQ package.
	app := mmq.New("amqp://admin:admin@localhost:5672/")

	// Create a durable exchange with the name "testing" and type "topic".
	app.DeclareExchange("testing", mmq.ExchangeTypeTopic, true)

	// Create a queue with the name "testing", exchange "testing" and routing key "testing";
	// the final argument marks the queue as durable.
	app.DeclareQueue("testing", "testing", "testing", true)

	// Consume messages from the queue "testing" and pass them to the handler function.
	app.Consume("testing", handler)

	// Publish a message to the exchange "testing" with the routing key "testing".
	if err := app.Publish("testing", "testing", []byte("Hi!"), amqp.Table{}); err != nil {
		fmt.Println("Failed to publish a message:", err)
		return
	}

	// RPCCall publishes a message to the exchange "testing" with the routing key "testing"
	// and waits for a response.
	data, err := app.RPCCall("testing", "testing", []byte("Hi!"), amqp.Table{})
	if err != nil {
		// Without this check a failed call would print an empty response.
		fmt.Println("Failed to receive a response:", err)
		return
	}
	fmt.Println("Received a response: ", string(data))

	// Just a loop to keep the program running.
	app.Listen()
}

// handler is the example consumer callback. It prints the correlation ID and
// body of the received delivery, then sends a response back and reports any
// error returned by SendResponse.
func handler(ctx *mmq.Context) {
	// The value printed here is the delivery's correlation ID, not a queue name.
	fmt.Println("Received a message with correlation ID: ", ctx.Delivery.CorrelationId)

	// Print the message body, which is a byte slice.
	fmt.Println("Received a message: ", string(ctx.Delivery.Body))

	// You can also Ack, Nack or Reject the message manually:
	// ctx.Ack(false)
	// ctx.Nack(false, false)
	// ctx.Reject(false)

	// Send a response to the sender of the message; according to the package
	// documentation, SendResponse also acknowledges the message.
	err := ctx.SendResponse(map[string]string{
		"message": "Hello from Go!",
	}, amqp.Table{})
	if err != nil {
		fmt.Println("Failed to send a response:", err)
	}
}

Credits and license

hadihammurabi/go-rabbitmq
streadway/amqp

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Context

// Context is the value passed to a Handler (type Handler func(ctx *Context)).
// It exposes the Ack, Nack, Reject and SendResponse methods.
type Context struct {
	// App is an MMQ instance; presumably the one that dispatched this delivery — TODO confirm.
	App      *MMQ
	// Channel is an AMQP channel; presumably the one the delivery arrived on — TODO confirm.
	Channel  *amqp.Channel
	// Delivery is the received message; the usage example reads its Body and CorrelationId.
	Delivery amqp.Delivery
}

func (*Context) Ack

func (m *Context) Ack(multiple bool) error

Ack acknowledges the message and removes it from the queue. @param multiple If true, acknowledges all messages up to this one.

func (*Context) Nack

func (m *Context) Nack(multiple, requeue bool) error

Nack negatively acknowledges the message: it is not acknowledged and, unless requeue is true, it is removed from the queue. @param multiple If true, negatively acknowledges all messages up to this one. @param requeue If true, requeues the message.

func (*Context) Reject

func (m *Context) Reject(requeue bool) error

Reject rejects the message: it is not acknowledged and, unless requeue is true, it is removed from the queue. @param requeue If true, requeues the message.

func (*Context) SendResponse

func (m *Context) SendResponse(message interface{}, headers amqp.Table) error

SendResponse sends a response to the sender of the message and acknowledges the message. @param message The message to send as a response. @param headers The headers to send with the message. @return error

type ExchageType

// ExchageType enumerates the exchange kinds accepted by DeclareExchange.
// NOTE(review): the identifier is spelled "ExchageType" (sic) in the public
// API; it is kept as-is because renaming it would break callers.
type ExchageType int

// Supported exchange kinds, numbered consecutively starting at zero.
const (
	ExchangeTypeDirect ExchageType = iota
	ExchangeTypeFanout
	ExchangeTypeTopic
)

type Exchange

// Exchange describes a single exchange to declare; DeclareExchanges accepts
// any number of these values.
type Exchange struct {
	// Name is the name of the exchange.
	Name    string
	// Type is the kind of exchange (one of the ExchangeType* constants).
	Type    ExchageType
	// Durable states whether the exchange is durable.
	Durable bool
}

Exchange is a struct that contains the name, type and durability of an exchange

type Handler

// Handler is the callback type accepted by Consume; it receives the Context of a received message.
type Handler func(ctx *Context)

Handler is a function that is called when a message is received

type MMQ

type MMQ struct {
	// contains filtered or unexported fields
}

func New

func New(connectionUrl string) *MMQ

New creates a new instance of the MMQ package. @param connectionUrl The connection URL to the RabbitMQ server. @return *MMQ

func (*MMQ) Close

func (m *MMQ) Close()

Close the connection to the RabbitMQ server

func (*MMQ) Connect

func (m *MMQ) Connect() (err error)

Connect to the RabbitMQ server

func (*MMQ) Consume

func (m *MMQ) Consume(q string, handler Handler)

Consume consumes a queue with a handler function. The handler function will be called with a message context. @param q The name of the queue. @param handler The handler function.

func (*MMQ) DeclareExchange

func (m *MMQ) DeclareExchange(name string, exchangeType ExchageType, durable bool)

DeclareExchange declares an exchange with a name, type and durability. @param name The name of the exchange. @param exchangeType The type of the exchange. @param durable Whether the exchange is durable or not.

func (*MMQ) DeclareExchanges

func (m *MMQ) DeclareExchanges(exchanges ...Exchange)

DeclareExchanges declares multiple exchanges @param exchanges The exchanges to declare

func (*MMQ) DeclareQueue

func (m *MMQ) DeclareQueue(name, exchange, routingKey string, durable bool)

DeclareQueue declares a queue with a name, exchange, routing key and durability. @param name The name of the queue. @param exchange The name of the exchange. @param routingKey The routing key. @param durable Whether the queue is durable or not.

func (*MMQ) DeclareQueues

func (m *MMQ) DeclareQueues(queues ...Queue)

DeclareQueues declares multiple queues @param queues The queues to declare

func (*MMQ) GetQueue

func (m *MMQ) GetQueue(name string) *queue.Queue

GetQueue returns a queue by name @param name The name of the queue @return *queue.Queue

func (*MMQ) GetQueues

func (m *MMQ) GetQueues() map[string]*queue.Queue

GetQueues returns all queues @return map[string]*queue.Queue

func (*MMQ) Listen

func (m *MMQ) Listen()

Listen is a loop to keep the program running

func (*MMQ) Publish

func (m *MMQ) Publish(q, e string, message []byte, headers amqp.Table) error

Publish publishes a message to a queue. @param q The name of the queue. @param e The name of the exchange. @param message The message to publish. @param headers The headers to send with the message. @return error

func (*MMQ) RPCCall added in v1.0.5

func (m *MMQ) RPCCall(q, e string, message []byte, headers amqp.Table) ([]byte, error)

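RPCCall publishes a message to a queue and waits for a response. @param q The name of the queue. @param e The name of the exchange. @param message The message to publish. @param headers The headers to send with the message. @return []byte The response. @return error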

type Queue

// Queue describes a single queue to declare; DeclareQueues accepts any number
// of these values.
type Queue struct {
	// Name is the name of the queue.
	Name       string
	// Exchange is the name of the exchange the queue is declared with.
	Exchange   string
	// RoutingKey is the routing key used for the queue.
	RoutingKey string
	// Durable states whether the queue is durable.
	Durable    bool
}

Queue is a struct that contains the name, exchange, routing key and durability of a queue

Directories

Path Synopsis
examples
pub command
pubsub command
sub command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL