rmqgo

package module
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2024 License: MIT Imports: 9 Imported by: 0

README

🐰 rmqgo

Go Version License

Wrapper of rabbitmq/amqp091-go that provides some features.

Installation

go get github.com/Almazatun/rmqgo

Connect to rabbitMQ

import (rmqgo "github.com/Almazatun/rmqgo")

rmq := rmqgo.New()

config := rmqgo.ConnectConfig{
 User: "user",
 Pass: "pass",
 Host: "host",
 Port: "port",
}

err := rmq.Connect(config)

if err != nil {
 // some action
}
Optional params when initialize rmqgo.New()

With RPC mode for request and replay pattern

rmqgo.New(rmqgo.WithRpc(replayQueueName, exchangeType))

With topic RPC

rmqgo.New(rmqgo.WithTopicRpc(replayQueueName, exchangeType, routingKey))
Create channel
ch, err := rmq.CreateChannel()

if err != nil {
 // some action
}
Create queue
args := make(map[string]interface{})

q, err := rmq.CreateQueue(rmqgo.CreateQueueConfig{
 Name:         "some_name",
 DeleteUnused: false,
 Exclusive:    false,
 NoWait:       false,
 Durable:      true,
 Args:         &args,
})

if err != nil {
 // some action
}

Create exchange

Exchanges

import (rmqgo "github.com/Almazatun/rmqgo")

rmqgo.Exchanges.Direct()
rmqgo.Exchanges.Topic()
rmqgo.Exchanges.Fanout()
rmqgo.Exchanges.Headers()

Exchange types

import (rmqgo "github.com/Almazatun/rmqgo")

rmqgo.ExchangeType.Direct()
rmqgo.ExchangeType.Topic()
rmqgo.ExchangeType.Fanout()
args := make(map[string]interface{})

err := rmq.CreateExchange(rmqgo.CreateExchangeConfig{
 Name:       rmqgo.Exchanges.RmqDirect,
 Type:       rmqgo.ExchangeType.Direct,
 Durable:    true,
 AutoDelete: false,
 Internal:   false,
 NoWait:     false,
 Args:       &args,
})

if err != nil {
 // some action
}
Bind exchange by created queue
args := make(map[string]interface{})

err := rmq.BindQueueByExchange(rmqgo.BindQueueByExgConfig{
 QueueName:    "some_name",
 RoutingKey:   "some_key",
 ExchangeName: Exchanges.RmqDirect,
 NoWait:       false,
 Args:         &args,
})

if err != nil {
 // some action
}

Create producer

producer = rmqgo.NewProducer(&rmq)
Send message
err := producer.Send(Exchanges.RmqDirect, routingKey, msg, method)

if err != nil {
 // some action
}

Send message with reply
b, err := producer.SendReply(Exchanges.RmqDirect, routingKey, msg, method)

if err != nil {
 // some action
}

// msg - is your own type SomeName struct { someFields:... }

err = json.Unmarshal(*b, &msg)

if err != nil {
 // some action
}

Create consumer

consumer := rmqgo.NewConsumer(
  &rmq,
  rmqgo.WithConsumerConfig(rmqgo.CreateConsumerConfig{
   NameQueue: "some_name",
   Consumer:  "some_value",
   AutoAck:   false,
   Exclusive: false,
   NoWait:    false,
   NoLocal:   false,
  }),
 )

consumer.Listen()

Consuming messages from queues

// Bytes - <- chan []byte
<- rmq.ReceiveMessages()
consumer.Listen()
Optional params when initialize rmqgo.NewConsumer(...)

With HttpConsumer

rmqgo.NewConsumer(*rmq, rmqgo.WithHttpConsumer())

With Consumer Args

rmqgo.NewConsumer(*rmq, rmqgo.WithConsumerArgs(rmqgo.ConsumerArgs{
 XDeadLetterExc        *""
 XDeadLetterRoutingKey *""
 Ttl                   *int
 XExpires              *int
 XMaxPriority          *int
}))

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ExchangeType = exchangeType{}
View Source
var Exchanges = exchanges{}

Functions

func WithConsumerArgs added in v1.1.0

func WithConsumerArgs(config ConsumerArgs) consumerOption

Consumer

func WithConsumerConfig added in v1.1.0

func WithConsumerConfig(config CreateConsumerConfig) consumerOption

func WithHttpConsumer added in v1.1.0

func WithHttpConsumer() consumerOption

Make able to run in other thread when init Consumer It can be used if need to run rmq service with http

Types

type BindQueueByExgConfig

type BindQueueByExgConfig struct {
	QueueName    string
	RoutingKey   string
	ExchangeName string
	NoWait       bool
	Args         *map[string]interface{}
}

type ConnectConfig

type ConnectConfig struct {
	User string
	Pass string
	Host string
	Port string
}

type Consumer added in v1.1.0

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer added in v1.1.0

func NewConsumer(rmq *Rmq, options ...consumerOption) *Consumer

func (*Consumer) AddTopicsFuncs added in v1.1.0

func (c *Consumer) AddTopicsFuncs(topicsFuncs map[string]func([]byte) interface{}) error

func (*Consumer) Listen added in v1.1.0

func (c *Consumer) Listen()

type ConsumerArgs added in v1.1.0

type ConsumerArgs struct {
	XDeadLetterExc        *string
	XDeadLetterRoutingKey *string
	Ttl                   *int
	XExpires              *int
	XMaxPriority          *int
}

type CreateConsumerConfig

type CreateConsumerConfig struct {
	NameQueue string
	Consumer  string
	AutoAck   bool
	Exclusive bool
	NoWait    bool
	NoLocal   bool
}

type CreateExchangeConfig

type CreateExchangeConfig struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       *map[string]interface{}
}

type CreateQueueConfig

type CreateQueueConfig struct {
	// https://www.rabbitmq.com/ttl.html
	// By default 30_000 millisecond
	MsgTtl       *int
	Name         string //queue name
	DeleteUnused bool   //delete when unused
	Exclusive    bool
	NoWait       bool
	Durable      bool
	Args         *map[string]interface{}
}

type Msg added in v1.1.0

type Msg interface{}

type Producer added in v1.1.0

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer added in v1.1.0

func NewProducer(rmq *Rmq, options ...producerOption) *Producer

func (*Producer) Send added in v1.1.0

func (p *Producer) Send(ex, rk string, msg interface{}, method string) error

func (*Producer) SendReply added in v1.3.1

func (p *Producer) SendReply(ex, rk string, msg interface{}, method string) (res []byte, err error)

type ProducerInitConfig added in v1.1.0

type ProducerInitConfig struct {
	NameQueue    string
	ExchangeName string
}

type Rmq

type Rmq struct {
	// contains filtered or unexported fields
}

func New

func New(options ...RmqOption) *Rmq

func (*Rmq) BindQueueByExchange

func (rmq *Rmq) BindQueueByExchange(config BindQueueByExgConfig) error

func (*Rmq) Close

func (rmq *Rmq) Close() error

func (*Rmq) Connect

func (rmq *Rmq) Connect(config ConnectConfig) error

func (*Rmq) CreateChannel

func (rmq *Rmq) CreateChannel() (c *amqp.Channel, err error)

func (*Rmq) CreateExchange

func (rmq *Rmq) CreateExchange(config CreateExchangeConfig) error

func (*Rmq) CreateQueue

func (rmq *Rmq) CreateQueue(config CreateQueueConfig) (q *amqp.Queue, err error)

func (*Rmq) ReceiveMessages added in v1.3.2

func (rmq *Rmq) ReceiveMessages() <-chan []byte

Only read access

type RmqOption added in v1.1.0

type RmqOption func(*Rmq)

func WithRpc added in v1.1.0

func WithRpc(replayQueueName, exchangeType string) RmqOption

Optionals

func WithTopicRpc added in v1.1.0

func WithTopicRpc(replayQueueName, exchangeType, rk string) RmqOption

type SendMsg

type SendMsg struct {
	Method string
	Msg
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL