eqmrmq

package module
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2024 License: GPL-3.0 Imports: 5 Imported by: 0

README

EQMRMQ

EQMRMQ is a Go package for simplified interaction with RabbitMQ, specifically designed for sending messages, receiving responses, and consuming messages from queues with ease.

The eqmrmq package utilizes the github.com/rabbitmq/amqp091-go package, which is an AMQP 0.9.1 Go client library. This library provides the underlying functionality for interacting with RabbitMQ, including features such as establishing connections, creating channels, publishing messages, consuming messages, and handling acknowledgments. By leveraging amqp091-go, eqmrmq simplifies the process of sending and receiving messages to and from RabbitMQ queues within Go applications. EQMRMQ utilizes the slog package for structured logging.

Installation

To install EQMRMQ, use go get:

go get github.com/lacolle87/eqmrmq
Usage

Import EQMRMQ into your Go project:

import "github.com/lacolle87/eqmrmq"
Establishing a Connection

To establish a connection to RabbitMQ and monitor it:

package main

import (
	"log"
	"github.com/lacolle87/eqmrmq"
)

// main opens a RabbitMQ connection through eqmrmq.Connect and closes it
// when the program returns.
func main() {
	const brokerURL = "amqp://guest:guest@localhost:5672/"

	connection, dialErr := eqmrmq.Connect(brokerURL)
	if dialErr != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", dialErr)
	}
	// Release the connection once main returns.
	defer connection.Close()
}

The Connect function establishes a connection to RabbitMQ and starts monitoring it to automatically reconnect if the connection is lost.

Publishing a Message

To publish a message to a queue:

package main

import (
	"log"
	"github.com/lacolle87/eqmrmq"
)

// main publishes a single message to "my_queue" via Message.Publish and
// does not wait for a reply.
func main() {
	// Create a new RabbitMQ connection
	rabbitURL := "amqp://guest:guest@localhost:5672/"
	conn, err := eqmrmq.Connect(rabbitURL)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// Create a channel
	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	// Generate a correlation ID
	correlationId := eqmrmq.GenerateCorrelationId()

	// Send a message
	msg := eqmrmq.Message{
		QueueName:     "my_queue",
		Msg:           []byte("Hello, RabbitMQ!"), // The payload field is named Msg and is a byte slice
		CorrelationId: correlationId,
		ReplyQueue:    "", // Assuming no reply queue is needed here
		Ch:            ch,
	}

	err = msg.Publish()
	if err != nil {
		log.Fatalf("Failed to publish message: %v", err)
	}
}
Publishing a Message with Response

To send a message to a queue and wait for a response:

package main

import (
	"fmt"
	"log"
	"github.com/lacolle87/eqmrmq"
)

// main sends a message to "my_queue" with PublishToQueueWithResponse and
// prints the bytes that call returns.
func main() {
	rabbitURL := "amqp://guest:guest@localhost:5672/"
	conn, err := eqmrmq.Connect(rabbitURL)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	// Send a message and wait for response.
	// The payload field of eqmrmq.Message is named Msg (a byte slice).
	msg := eqmrmq.Message{
		QueueName: "my_queue",
		Msg:       []byte("Hello, RabbitMQ!"),
		Ch:        ch,
	}
	response, err := eqmrmq.PublishToQueueWithResponse(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println("Response:", string(response))
}
Consuming Messages

To consume messages from a queue:

package main

import (
	"fmt"
	"log"
	"github.com/lacolle87/eqmrmq"
	"github.com/rabbitmq/amqp091-go"
)

// main connects to RabbitMQ, opens a channel, and passes a printing handler
// for "my_queue" to eqmrmq.ConsumeMessages.
func main() {
	const brokerURL = "amqp://guest:guest@localhost:5672/"

	connection, err := eqmrmq.Connect(brokerURL)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer connection.Close()

	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}
	defer channel.Close()

	// Extra value handed to ConsumeMessages for the handler; it can be any type.
	handlerArg := "my_custom_argument"

	// printBody writes the delivery body to stdout and reports success.
	printBody := func(_ *amqp.Channel, delivery amqp.Delivery, _ interface{}) error {
		fmt.Println("Received message:", string(delivery.Body))
		return nil
	}

	// Start consuming; a returned error aborts the program.
	if consumeErr := eqmrmq.ConsumeMessages(channel, "my_queue", printBody, handlerArg); consumeErr != nil {
		panic(consumeErr)
	}
}
Replying to a Message

To reply to a message:

package main

import (
	"github.com/lacolle87/eqmrmq"
	"github.com/rabbitmq/amqp091-go"
)

func main() {
    rabbitURL := "amqp://guest:guest@localhost:5672/"
    conn, err := eqmrmq.Connect(rabbitURL)
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        panic(err)
    }
    defer ch.Close()

    // Assuming 'delivery' is an amqp.Delivery received in a consumer
    var delivery amqp.Delivery // This would be your actual delivery object

    // Reply to a message
    err = eqmrmq.ReplyToMessage(ch, delivery, []byte("Response from server"))
    if err != nil {
        panic(err)
    }
}
Acknowledgments

Special thanks to the authors of RabbitMQ and the AMQP 0.9.1 Go client library github.com/rabbitmq/amqp091-go for providing the underlying functionality used by this package!

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Connect added in v0.3.1

func Connect(rabbitURL string) (*amqp.Connection, error)

func ConsumeMessages

func ConsumeMessages(ch *amqp.Channel, queueName string, handler func(*amqp.Channel, amqp.Delivery, interface{}) error, arg interface{}) error

func CreateReplyQueue added in v0.2.0

func CreateReplyQueue(channel *amqp.Channel) (string, error)

func DeclareQueue added in v0.2.0

func DeclareQueue(ch *amqp.Channel, queueName string) (amqp.Queue, error)

func GenerateCorrelationId added in v0.2.0

func GenerateCorrelationId() string

func PublishToQueueWithResponse added in v0.3.0

func PublishToQueueWithResponse(msg Message) ([]byte, error)

func ReceiveResponse added in v0.2.0

func ReceiveResponse(correlationId, replyQueue string, ch *amqp.Channel) ([]byte, error)

func RegisterConsumer added in v0.2.0

func RegisterConsumer(ch *amqp.Channel, queueName string) (<-chan amqp.Delivery, error)

func ReplyToMessage

func ReplyToMessage(ch *amqp.Channel, d amqp.Delivery, replyData []byte) error

func SendMessage

func SendMessage(queueName string, message interface{}, ch *amqp.Channel) ([]byte, error)

Types

type Message

// Message bundles the values consumed by the Publish method and by
// PublishToQueueWithResponse.
type Message struct {
	// QueueName is the name of the queue; the README examples set it to the
	// queue being published to (e.g. "my_queue").
	QueueName     string
	// Msg is the message body as raw bytes (presumably the payload that gets
	// published — confirm against the implementation).
	Msg           []byte
	// CorrelationId identifies the message; the README fills it with the
	// value returned by GenerateCorrelationId.
	CorrelationId string
	// ReplyQueue names the reply queue; the README leaves it empty when no
	// reply is needed.
	ReplyQueue    string
	// Ch is the AMQP channel; the README obtains it from conn.Channel().
	Ch            *amqp.Channel
}

func (Message) Publish added in v0.2.0

func (msg Message) Publish() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL