goqueue

package module
v1.0.0-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2024 License: MIT Imports: 11 Imported by: 0

README

goqueue

GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn.

Index

Support

You can file an Issue. See documentation in Go.Dev

Getting Started

Install
go get -u github.com/bxcodec/goqueue

Example

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"

	"github.com/bxcodec/goqueue"
	"github.com/bxcodec/goqueue/consumer"
	"github.com/bxcodec/goqueue/interfaces"
	"github.com/bxcodec/goqueue/middleware"
	"github.com/bxcodec/goqueue/options"
	consumerOpts "github.com/bxcodec/goqueue/options/consumer"
	publisherOpts "github.com/bxcodec/goqueue/options/publisher"
	"github.com/bxcodec/goqueue/publisher"
)

func initExchange(ch *amqp.Channel, exchangeName string) error {
	return ch.ExchangeDeclare(
		exchangeName,
		"topic",
		true,
		false,
		false,
		false,
		nil,
	)
}

func main() {

	// Initialize the RMQ connection
	rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/"
	rmqConn, err := amqp.Dial(rmqDSN)
	if err != nil {
		panic(err)
	}

	// Initialize the Publisher
	rmqPub := publisher.NewPublisher(
		publisherOpts.PublisherPlatformRabbitMQ,
		publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
			Conn:                     rmqConn,
			PublisherChannelPoolSize: 5,
		}),
		publisherOpts.WithPublisherID("publisher_id"),
		publisherOpts.WithMiddlewares(
			middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
			middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
		),
	)

	publisherChannel, err := rmqConn.Channel()
	if err != nil {
		panic(err)
	}

	defer publisherChannel.Close()
	initExchange(publisherChannel, "goqueue")

	consumerChannel, err := rmqConn.Channel()
	if err != nil {
		panic(err)
	}
	defer consumerChannel.Close()
	rmqConsumer := consumer.NewConsumer(
		consumerOpts.ConsumerPlatformRabbitMQ,
		consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern(
			consumerChannel,
			publisherChannel,
			"goqueue",                      // exchange name
			[]string{"goqueue.payments.#"}, // routing keys pattern
		)),
		consumerOpts.WithConsumerID("consumer_id"),
		consumerOpts.WithMiddlewares(
			middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
			middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
		),
		consumerOpts.WithMaxRetryFailedMessage(3),
		consumerOpts.WithBatchMessageSize(1),
		consumerOpts.WithQueueName("consumer_queue"),
	)

	queueSvc := goqueue.NewQueueService(
		options.WithConsumer(rmqConsumer),
		options.WithPublisher(rmqPub),
		options.WithMessageHandler(handler()),
	)
	go func() {
		for i := 0; i < 10; i++ {
			data := map[string]interface{}{
				"message": fmt.Sprintf("Hello World %d", i),
			}
			jbyt, _ := json.Marshal(data)
			err := queueSvc.Publish(context.Background(), interfaces.Message{
				Data:   data,
				Action: "goqueue.payments.create",
				Topic:  "goqueue",
			})
			if err != nil {
				panic(err)
			}
			fmt.Println("Message Sent: ", string(jbyt))
		}
	}()

	// change to context.Background() if you want to run it forever
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	err = queueSvc.Start(ctx)
	if err != nil {
		panic(err)
	}
}

func handler() interfaces.InboundMessageHandlerFunc {
	return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
		data := m.Data
		jbyt, _ := json.Marshal(data)
		fmt.Println("Message Received: ", string(jbyt))
		return m.Ack(ctx)
	}
}

Advance Setups

RabbitMQ -- Retry Concept

Goqueue Retry Architecture RabbitMQ Src: Excalidraw Link

Contribution


To contrib to this project, you can open a PR or an issue.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// JSONEncoder is an implementation of the EncoderFn interface
	// that encodes a Message into JSON format.
	JSONEncoder EncoderFn = func(ctx context.Context, m interfaces.Message) (data []byte, err error) {
		return json.Marshal(m)
	}
	// JSONDecoder is a DecoderFn implementation that decodes JSON data into a Message.
	JSONDecoder DecoderFn = func(ctx context.Context, data []byte) (m interfaces.Message, err error) {
		err = json.Unmarshal(data, &m)
		return
	}

	DefaultEncoder EncoderFn = JSONEncoder
	DefaultDecoder DecoderFn = JSONDecoder
)
View Source
var (
	// JSONEncoding represents the encoding configuration for JSON.
	JSONEncoding = &Encoding{
		ContentType: headerVal.ContentTypeJSON,
		Encode:      JSONEncoder,
		Decode:      JSONDecoder,
	}
	DefaultEncoding = JSONEncoding
)

Functions

func AddGoQueueEncoding

func AddGoQueueEncoding(contentType headerVal.ContentType, encoding *Encoding)

AddGoQueueEncoding stores the given encoding for the specified content type in the goQueueEncodingMap. The goQueueEncodingMap is a concurrent-safe map that maps content types to encodings. The content type is specified by the `contentType` parameter, and the encoding is specified by the `encoding` parameter. This function is typically used to register custom encodings for specific content types in the GoQueue library.

Types

type DecoderFn

type DecoderFn func(ctx context.Context, data []byte) (m interfaces.Message, err error)

DecoderFn is a function type that decodes a byte slice into a Message. It takes a context and a byte slice as input and returns a Message and an error.

type EncoderFn

type EncoderFn func(ctx context.Context, m interfaces.Message) (data []byte, err error)

EncoderFn is a function type that encodes a message into a byte slice. It takes a context and a message as input and returns the encoded data and an error (if any).

type Encoding

type Encoding struct {
	ContentType headerVal.ContentType // The content type associated with this encoding.
	Encode      EncoderFn             // The encoding function used to encode data.
	Decode      DecoderFn             // The decoding function used to decode data.
}

Encoding represents an encoding configuration for a specific content type.

func GetGoQueueEncoding

func GetGoQueueEncoding(contentType headerVal.ContentType) (res *Encoding, ok bool)

GetGoQueueEncoding returns the encoding associated with the given content type. It looks up the encoding in the goQueueEncodingMap and returns it along with a boolean value indicating if the encoding was found. If the encoding is not found, it returns nil and false.

type QueueService

type QueueService struct {
	NumberOfConsumer int // The number of consumers to process messages concurrently.
	// contains filtered or unexported fields
}

QueueService represents a service that handles message queuing operations.

func NewQueueService

func NewQueueService(opts ...options.GoQueueOptionFunc) *QueueService

NewQueueService creates a new instance of QueueService with the provided options. It accepts a variadic parameter `opts` which allows configuring the QueueService. The options are applied in the order they are provided. Returns a pointer to the created QueueService.

func (*QueueService) Publish

func (qs *QueueService) Publish(ctx context.Context, m interfaces.Message) (err error)

Publish publishes a message to the queue. It returns an error if the publisher is not defined or if there was an error while publishing the message.

func (*QueueService) Start

func (qs *QueueService) Start(ctx context.Context) (err error)

Start starts the queue service by spawning multiple consumers to process messages. It returns an error if the consumer or handler is not defined. The method uses the provided context to manage the lifecycle of the consumers.

func (*QueueService) Stop

func (qs *QueueService) Stop(ctx context.Context) error

Stop stops the queue service by stopping the consumer and closing the publisher. It returns an error if there was an issue stopping the consumer or closing the publisher.

Directories

Path Synopsis
examples
rabbitmq/basic command
headers
key
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL