amqp

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2023 License: MIT Imports: 9 Imported by: 2

README

pkg.amqp

Queuing interface package for com projects

Usage example

package main

import (
	"fmt"
	amqp "github.com/taranovegor/pkg.amqp"
	"log"
	"os"
	"time"
)

type RegularMessage struct {
	Text string
}

type MessageForReply struct {
	Text string
}

type MessageReply struct {
	OriginText string
}

type RegularConsumer struct {
	amqp.Consumer
}

func (h RegularConsumer) Name() string {
	return "consumer_regular"
}

func (h RegularConsumer) Handle(body amqp.Body) amqp.Handled {
	msg := RegularMessage{}
	body.To(&msg)

	log.Printf("Consumed message: %s", msg.Text)

	return amqp.HandledSuccessfully()
}

type WithReplyConsumer struct {
	amqp.Consumer
}

func (h WithReplyConsumer) Name() string {
	return "consumer_with_reply"
}

func (h WithReplyConsumer) Handle(body amqp.Body) amqp.Handled {
	msg := MessageForReply{}
	body.To(&msg)

	log.Printf("Consumed message with reply: %s", msg.Text)

	return amqp.HandledSuccessfully().WithReply(MessageReply{OriginText: msg.Text})
}

var cfg = amqp.NewConfig(
	map[string]amqp.ConsumerConfig{
		"consumer_regular":    {Queue: "regular", Exclusive: false, NoLocal: false, NoWait: false},
		"consumer_with_reply": {Queue: "request", Exclusive: true, NoLocal: true, NoWait: true},
	},
	map[string]amqp.ExchangeConfig{},
	map[string]amqp.QueueConfig{
		"regular": {Durable: false, AutoDelete: false, Exclusive: false, NoWait: false},
		"request": {Durable: true, AutoDelete: true, Exclusive: true, NoWait: true},
	},
	map[string]amqp.ProducerConfig{
		"producer_regular":            {Queues: []string{"regular"}},
		"producer_awaiting_for_reply": {Queues: []string{"request"}, ReplyTo: "response"},
	},
	map[interface{}]amqp.RouteConfig{
		RegularMessage{}:  {Producer: "producer_regular"},
		MessageForReply{}: {Producer: "producer_awaiting_for_reply"},
	},
)

func main() {
	ctrl, err := amqp.Init("pkg.amqp", os.Getenv("AMQP_URL"), cfg, []amqp.Consumer{
		RegularConsumer{},
		WithReplyConsumer{},
	})
	if err != nil {
		panic(err)
	}

	ctrl.Consume()

	ctrl.Publish(
		amqp.MessageToPublish(
			RegularMessage{Text: fmt.Sprintf("regular message, created at %s", time.Now().String())},
		),
	)

	ctrl.Publish(
		amqp.MessageToPublishWithReply(
			MessageForReply{Text: fmt.Sprintf("message for reply, created at %s", time.Now().String())},
			func(body amqp.Body) amqp.Handled {
				msg := MessageReply{}
				body.To(&msg)

				log.Printf("Consumed message reply: %s", msg.OriginText)

				return amqp.HandledSuccessfully()
			},
		),
	)

	select {}
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ConfigNotFound = errors.New("config not found")

Functions

This section is empty.

Types

type Body

type Body map[string]interface{}

func (Body) To

func (b Body) To(i interface{})

type Config

type Config struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(
	consumers map[string]ConsumerConfig,
	exchanges map[string]ExchangeConfig,
	queues map[string]QueueConfig,
	producers map[string]ProducerConfig,
	routing map[interface{}]RouteConfig,
) Config

func (Config) GetConsumer

func (c Config) GetConsumer(name string) (ConsumerConfig, error)

func (Config) GetExchange

func (c Config) GetExchange(name string) (ExchangeConfig, error)

func (Config) GetProducer

func (c Config) GetProducer(name string) (ProducerConfig, error)

func (Config) GetQueue

func (c Config) GetQueue(name string) (QueueConfig, error)

func (Config) GetRoute

func (c Config) GetRoute(i interface{}) (RouteConfig, error)

type Consumer

type Consumer interface {
	Name() string
	Handle(Body) Handled
}

type ConsumerConfig

type ConsumerConfig struct {
	Queue     string
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      map[string]interface{}
}

type Controller

type Controller struct {
	*Producer
	// contains filtered or unexported fields
}

func Init

func Init(
	appName string,
	url string,
	config Config,
	consumers []Consumer,
) (*Controller, error)

func (Controller) Consume

func (c Controller) Consume()

type ExchangeConfig

type ExchangeConfig struct {
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       map[string]interface{}
}

type Handled

type Handled struct {
	// contains filtered or unexported fields
}

func HandledAndRejected

func HandledAndRejected() Handled

func HandledNotSuccessfully

func HandledNotSuccessfully(requeue bool) Handled

func HandledSuccessfully

func HandledSuccessfully() Handled

func (Handled) WithReply

func (h Handled) WithReply(r interface{}) Handled

type MessageHandlerFunc

type MessageHandlerFunc func(Body) Handled

type MessageType

type MessageType string
const (
	MessageRegular   MessageType = "regular"
	MessageWithReply MessageType = "with_reply"
)

type NoReply

type NoReply struct {
}

type Producer added in v0.3.0

type Producer struct {
	// contains filtered or unexported fields
}

func (Producer) Publish added in v0.3.0

func (p Producer) Publish(msg PublishMessage) (PublishedMessage, error)

type ProducerConfig

type ProducerConfig struct {
	Exchange  string
	Queues    []string
	ReplyTo   string
	Mandatory bool
	Immediate bool
}

type PublishMessage

type PublishMessage struct {
	// contains filtered or unexported fields
}

func MessageToPublish

func MessageToPublish(msg interface{}) PublishMessage

func MessageToPublishWithReply

func MessageToPublishWithReply(msg interface{}, handler MessageHandlerFunc) PublishMessage

type PublishedMessage

type PublishedMessage struct {
	ID            uuid.UUID
	CorrelationID uuid.UUID
	SentAt        time.Time
	Message       interface{}
}

type QueueConfig

type QueueConfig struct {
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       map[string]interface{}
}

type RouteConfig

type RouteConfig struct {
	Type     interface{}
	Producer string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL