goqueue

package module
v1.0.0-alpha.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2024 License: MIT Imports: 7 Imported by: 0

README

goqueue

GoQueue - one library to rule them all. A golang wrapper that handles all the complexity of every Queue platforms. Extensible and easy to learn

Index

Support

You can file an Issue. See documentation in Go.Dev

Getting Started

Install
go get -u github.com/bxcodec/goqueue

Example

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"

	"github.com/bxcodec/goqueue"
	"github.com/bxcodec/goqueue/consumer"
	rmqConsumer "github.com/bxcodec/goqueue/consumer/rabbitmq"
	"github.com/bxcodec/goqueue/middleware"
	"github.com/bxcodec/goqueue/publisher"
	rmqPublisher "github.com/bxcodec/goqueue/publisher/rabbitmq"
)

func initExchange(ch *amqp.Channel, exchangeName string) error {
	return ch.ExchangeDeclare(
		exchangeName,
		"topic",
		true,
		false,
		false,
		false,
		nil,
	)
}

func main() {
	rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/"
	rmqConn, err := amqp.Dial(rmqDSN)
	if err != nil {
		panic(err)
	}

	rmqPub := rmqPublisher.NewPublisher(rmqConn,
		publisher.WithPublisherID("publisher_id"),
		publisher.WithMiddlewares(
			middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
			middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
		),
	)

	publisherChannel, err := rmqConn.Channel()
	if err != nil {
		panic(err)
	}

	defer publisherChannel.Close()
	initExchange(publisherChannel, "goqueue")

	consumerChannel, err := rmqConn.Channel()
	if err != nil {
		panic(err)
	}
	defer consumerChannel.Close()

	rmqConsumer := rmqConsumer.NewConsumer(
		publisherChannel,
		consumerChannel,
		consumer.WithMiddlewares(
			middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
			middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
		),
		consumer.WithQueueName("consumer_queue"),
		consumer.WithConsumerID("consumer_id"),
		consumer.WithBatchMessageSize(1),
		consumer.WithMaxRetryFailedMessage(3),
		consumer.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
		consumer.WithTopicName("goqueue"),
	)

	queueSvc := goqueue.NewQueueService(
		goqueue.WithPublisher(rmqPub),
		goqueue.WithConsumer(rmqConsumer),
		goqueue.WithMessageHandler(handler()),
	)

	go func() {
		for i := 0; i < 10; i++ {
			data := map[string]interface{}{
				"message": fmt.Sprintf("Hello World %d", i),
			}
			jbyt, _ := json.Marshal(data)
			err := queueSvc.Publish(context.Background(), goqueue.Message{
				Data:   data,
				Action: "goqueue.payments.create",
				Topic:  "goqueue",
			})
			if err != nil {
				panic(err)
			}
			fmt.Println("Message Sent: ", string(jbyt))
		}
	}()

	// change to context.Background() if you want to run it forever
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	err = queueSvc.Start(ctx)
	if err != nil {
		panic(err)
	}
}

func handler() goqueue.InboundMessageHandlerFunc {
	return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
		data := m.Data
		jbyt, _ := json.Marshal(data)
		fmt.Println("Message Received: ", string(jbyt))
		return m.Ack(ctx)
	}
}

Contribution


To contrib to this project, you can open a PR or an issue.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// JSONEncoder is an implementation of the EncoderFn interface
	// that encodes a Message into JSON format.
	JSONEncoder EncoderFn = func(ctx context.Context, m Message) (data []byte, err error) {
		return json.Marshal(m)
	}
	// JSONDecoder is a DecoderFn implementation that decodes JSON data into a Message.
	JSONDecoder DecoderFn = func(ctx context.Context, data []byte) (m Message, err error) {
		err = json.Unmarshal(data, &m)
		return
	}

	DefaultEncoder EncoderFn = JSONEncoder
	DefaultDecoder DecoderFn = JSONDecoder
)
View Source
var (
	// JSONEncoding represents the encoding configuration for JSON.
	JSONEncoding = &Encoding{
		ContentType: headerVal.ContentTypeJSON,
		Encode:      JSONEncoder,
		Decode:      JSONDecoder,
	}
	DefaultEncoding = JSONEncoding
)

Functions

func AddGoQueueEncoding

func AddGoQueueEncoding(contentType headerVal.ContentType, encoding *Encoding)

AddGoQueueEncoding stores the given encoding for the specified content type in the goQueueEncodingMap. The goQueueEncodingMap is a concurrent-safe map that maps content types to encodings. The content type is specified by the `contentType` parameter, and the encoding is specified by the `encoding` parameter. This function is typically used to register custom encodings for specific content types in the GoQueue library.

Types

type Consumer

type Consumer interface {
	// Consume consumes messages from the queue and passes them to the provided handler.
	// It takes a context, an InboundMessageHandler, and a map of metadata as parameters.
	// It returns an error if there was a problem consuming the messages.
	Consume(ctx context.Context, handler InboundMessageHandler, meta map[string]interface{}) (err error)

	// Stop stops the consumer from consuming messages.
	// It takes a context as a parameter and returns an error if there was a problem stopping the consumer.
	Stop(ctx context.Context) (err error)
}

Consumer represents an entity that consumes messages from a queue.

type DecoderFn

type DecoderFn func(ctx context.Context, data []byte) (m Message, err error)

DecoderFn is a function type that decodes a byte slice into a Message. It takes a context and a byte slice as input and returns a Message and an error.

type DelayFn

type DelayFn func(retries int64) (delay int64)
var (
	// ExponentialBackoffDelayFn is a delay function that implements exponential backoff.
	// It takes the number of retries as input and returns the delay in milliseconds.
	ExponentialBackoffDelayFn DelayFn = func(retries int64) (delay int64) {
		return 2 << retries
	}
	// NoDelayFn is a DelayFn implementation that returns 0 delay for retries.
	NoDelayFn DelayFn = func(retries int64) (delay int64) {
		return 0
	}
	DefaultDelayFn DelayFn = NoDelayFn
)

type EncoderFn

type EncoderFn func(ctx context.Context, m Message) (data []byte, err error)

EncoderFn is a function type that encodes a message into a byte slice. It takes a context and a message as input and returns the encoded data and an error (if any).

type Encoding

type Encoding struct {
	ContentType headerVal.ContentType // The content type associated with this encoding.
	Encode      EncoderFn             // The encoding function used to encode data.
	Decode      DecoderFn             // The decoding function used to decode data.
}

Encoding represents an encoding configuration for a specific content type.

func GetGoQueueEncoding

func GetGoQueueEncoding(contentType headerVal.ContentType) (res *Encoding, ok bool)

GetGoQueueEncoding returns the encoding associated with the given content type. It looks up the encoding in the goQueueEncodingMap and returns it along with a boolean value indicating if the encoding was found. If the encoding is not found, it returns nil and false.

type InboundMessage

type InboundMessage struct {
	Message
	RetryCount int64                  `json:"retryCount"`
	Metadata   map[string]interface{} `json:"metadata"`
	// Ack is used for confirming the message. It will drop the message from the queue.
	Ack func(ctx context.Context) (err error) `json:"-"`
	// Nack is used for rejecting the message. It will requeue the message to be re-delivered again.
	Nack func(ctx context.Context) (err error) `json:"-"`
	// MoveToDeadLetterQueue is used for rejecting the message same with Nack, but instead of requeueing the message,
	// Read how to configure dead letter queue in each queue provider.
	// eg RabbitMQ: https://www.rabbitmq.com/docs/dlx
	MoveToDeadLetterQueue func(ctx context.Context) (err error) `json:"-"`
	// Requeue is used to put the message back to the tail of the queue after a delay.
	PutToBackOfQueueWithDelay func(ctx context.Context, delayFn DelayFn) (err error) `json:"-"`
}

type InboundMessageHandler

type InboundMessageHandler interface {
	HandleMessage(ctx context.Context, m InboundMessage) (err error)
}

type InboundMessageHandlerFunc

type InboundMessageHandlerFunc func(ctx context.Context, m InboundMessage) (err error)

func (InboundMessageHandlerFunc) HandleMessage

func (mhf InboundMessageHandlerFunc) HandleMessage(ctx context.Context, m InboundMessage) (err error)

type InboundMessageHandlerMiddlewareFunc

type InboundMessageHandlerMiddlewareFunc func(next InboundMessageHandlerFunc) InboundMessageHandlerFunc

type Message

type Message struct {
	ID           string                     `json:"id"`
	Action       string                     `json:"action"`
	Topic        string                     `json:"topic"`
	Data         any                        `json:"data"`
	ContentType  headerVal.ContentType      `json:"-"`
	Timestamp    time.Time                  `json:"timestamp"`
	Headers      map[string]interface{}     `json:"-"`
	ServiceAgent headerVal.GoquServiceAgent `json:"-"`
	// contains filtered or unexported fields
}

Message represents a message that will be published to the queue It contains the message ID, action, topic, data, content type, timestamp, headers, and service agent. The schema version is set by the SetSchemaVersion method. The message is used to publish messages to the queue. Read the concept of message publishing in the documentation, here: TODO(bxcodec): Add link to the documentation

func (*Message) GetSchemaVersion

func (m *Message) GetSchemaVersion() string

func (*Message) SetSchemaVersion

func (m *Message) SetSchemaVersion(v string)

type Option

type Option struct {
	// number of consumer/worker/goroutine that will be spawned in one goqueue instance
	NumberOfConsumer int
	Consumer         Consumer
	Publisher        Publisher
	MessageHandler   InboundMessageHandler
}

type OptionFunc

type OptionFunc func(opt *Option)

OptionFunc used for option chaining

func WithConsumer

func WithConsumer(c Consumer) OptionFunc

func WithMessageHandler

func WithMessageHandler(h InboundMessageHandler) OptionFunc

func WithNumberOfConsumer

func WithNumberOfConsumer(n int) OptionFunc

func WithPublisher

func WithPublisher(p Publisher) OptionFunc

type Publisher

type Publisher interface {
	PublisherHandler
	Close(ctx context.Context) (err error)
}

Publisher represents an interface for publishing messages.

type PublisherFunc

type PublisherFunc func(ctx context.Context, m Message) (err error)

PublisherFunc is a function type that represents a publisher function. It takes a context and a message as input parameters and returns an error.

func (PublisherFunc) Publish

func (f PublisherFunc) Publish(ctx context.Context, m Message) (err error)

Publish sends the given message using the provided context. It calls the underlying PublisherFunc to perform the actual publishing. If an error occurs during publishing, it is returned.

type PublisherHandler

type PublisherHandler interface {
	Publish(ctx context.Context, m Message) (err error)
}

PublisherHandler is an interface that defines the behavior of a message publisher.

type PublisherMiddlewareFunc

type PublisherMiddlewareFunc func(next PublisherFunc) PublisherFunc

type QueueService

type QueueService struct {
	NumberOfConsumer int // The number of consumers to process messages concurrently.
	// contains filtered or unexported fields
}

QueueService represents a queue service that handles incoming messages.

func NewQueueService

func NewQueueService(opts ...OptionFunc) *QueueService

NewQueueService creates a new instance of QueueService with the provided options. It accepts zero or more OptionFunc functions to customize the behavior of the QueueService. Returns a pointer to the created QueueService.

func (*QueueService) Publish

func (qs *QueueService) Publish(ctx context.Context, m Message) (err error)

Publish sends a message to the queue using the defined publisher. It returns an error if the publisher is not defined or if there was an error while publishing the message.

func (*QueueService) Start

func (qs *QueueService) Start(ctx context.Context) (err error)

Start starts the queue service by spawning multiple consumers to process messages. It returns an error if the consumer or handler is not defined. The method uses the provided context to manage the lifecycle of the consumers. Each consumer is assigned a unique consumer ID and the start time is recorded in the meta data. The method uses the errgroup package to manage the goroutines and waits for all consumers to finish.

func (*QueueService) Stop

func (qs *QueueService) Stop(ctx context.Context) error

Stop stops the queue service by stopping the consumer and closing the publisher. It returns an error if there is an issue stopping the consumer or closing the publisher.

Directories

Path Synopsis
examples
rabbitmq command
headers
key

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL