package module
Version: v1.4.2 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2021 License: BSD-2-Clause Imports: 8 Imported by: 0



RabbitMq queue consuming and producing

Consuming features

  1. Manage subscribers on the fly using socket.
  2. Auto reconnect when connection failed.
  3. Auto nack on panic and panic recover. Application will not crash. Except unrecovered panics.
  4. Multiple server and multiple queues implementation supports in config(yaml) files.
  5. Callback registry. Allows to create a callback for each queue.

Producing features

  1. Reusing connection.
  2. Implemented connection pool.
  3. Manage max connections by config file.
  4. Free connection in idle status after 10s

Allowed commands

  1. consumer start all - start all consumer defined in registry
  2. consumer start name_1 name_2 - start specific consumers
  3. consumer stop all - stop all consumer defined in registry
  4. consumer stop name_1 name_2 - stop specific consumers
  5. consumer restart all - restart all consumer defined in registry
  6. consumer restart name_1 name_2 - restart specific consumers
  7. consumer status all - status of all consumer defined in registry
  8. consumer status name_1 name_2 - status of specific consumers
  9. consumer set count N name_1 name_2 - set count of subscribers for specific consumer


echo "consumer status all" | nc localhost 3333




View Source
const (
	CommandStart    = "start"
	CommandStop     = "stop"
	CommandRestart  = "restart"
	CommandStatus   = "status"
	CommandConsumer = "consumer"
	CommandSet      = "set"

	CommandKeyWordAll   = "all"
	CommandKeyWordCount = "count"
View Source
const (
	// Default value for max conn
	DefaultMaxConnections = 10
	// Default value for max idle lifetime
	DefaultMaxIdleConnectionLifeTime = 10 * time.Second
	// Maximum connection on 5000 rps
	DefaultMaxConnectionOnRPS = 5000


This section is empty.


func ParseCommand added in v1.2.0

func ParseCommand(command *gocli.Command) (action string, arguments []gocli.Argument, e porterr.IError)

Parse gocli.Command


type Application

type Application struct {

	// Basic application
	// contains filtered or unexported fields

Application Rabbit application struct

func NewApplication

func NewApplication(config Config, app gocli.Application) *Application

NewApplication New rabbit application

func (*Application) Consume

func (a *Application) Consume(name string) porterr.IError

Consume Create new consumer

func (*Application) ConsumerCommander added in v1.2.0

func (a *Application) ConsumerCommander(command *gocli.Command)

ConsumerCommander Consumer command processor

func (*Application) GetConfig added in v1.2.0

func (a *Application) GetConfig() *Config

GetConfig Get app config

func (*Application) GetRegistry added in v1.2.0

func (a *Application) GetRegistry() Registry

GetRegistry Get registry of subscribers

func (*Application) Publish

func (a *Application) Publish(p amqp.Publishing, queue string, server string, route ...string) porterr.IError

Publish Publisher publishing - AMQP Message queue - name of the queue defined in config server - name of the server defined in config route - routing keys

func (*Application) SetRegistry added in v1.2.1

func (a *Application) SetRegistry(r Registry) *Application

SetRegistry Set registry of subscribers

type Config

type Config struct {
	// Servers configuration
	Servers Servers
	// Queues configuration
	Queues Queues

Rabbit config

func (*Config) GetQueue

func (c *Config) GetQueue(name string) (*RabbitQueue, porterr.IError)

Get queue

func (*Config) GetServer

func (c *Config) GetServer(name string) (*RabbitServer, porterr.IError)

Get server

type ConnectionPool added in v1.3.0

type ConnectionPool struct {
	// contains filtered or unexported fields

Connection pool

func NewConnectionPool added in v1.3.0

func NewConnectionPool() *ConnectionPool

Init connection pool

func (*ConnectionPool) GetConnection added in v1.3.0

func (cp *ConnectionPool) GetConnection(s RabbitServer) (c *connection, e porterr.IError)

Get current connection using round robin algorithm

func (*ConnectionPool) Publish added in v1.3.0

func (cp *ConnectionPool) Publish(p amqp.Publishing, s RabbitServer, q RabbitQueue, route ...string) (e porterr.IError)

Publish message to queue

type Consumer

type Consumer struct {
	// Queue name
	Queue string
	// Server name
	Server string
	// Delivery process callback
	Callback func(d amqp.Delivery)
	// Subscribers count
	Count uint8
	// contains filtered or unexported fields

Consumer entity

func (*Consumer) HasSubscribers added in v1.2.0

func (c *Consumer) HasSubscribers() bool

Check for subscribers

func (*Consumer) NewSubscriber added in v1.2.0

func (c *Consumer) NewSubscriber(name string) *subscriber

New subscribers

func (*Consumer) Stop added in v1.2.0

func (c *Consumer) Stop()

Stop all subscribers

func (*Consumer) Subscribe added in v1.2.0

func (c *Consumer) Subscribe(logger gocli.Logger) porterr.IError

Subscribe for queue

func (*Consumer) SubscribersCount added in v1.2.0

func (c *Consumer) SubscribersCount() uint8

Get s subscribers

type Queues

type Queues map[string]RabbitQueue


type RabbitQueue

type RabbitQueue struct {
	Server     string
	Exchange   string
	Internal   bool
	Type       string
	Name       string
	Passive    bool
	Durable    bool
	Exclusive  bool
	Nowait     bool
	AutoDelete bool     `yaml:"autoDelete"`
	RoutingKey []string `yaml:"routingKey"`
	Arguments  map[string]interface{}

Queue configuration

type RabbitServer

type RabbitServer struct {
	// RabbitMQx virtual host
	Vhost string
	// RabbitMQ host
	Host string
	// RabbitMQ port
	Port int
	// RabbitMQ user
	User string
	// RabbitMQ password
	Password string
	// Maximum number of connections to server
	MaxConnections int `yaml:"maxPublishConnections"`
	// Maximum lifetime for idle connection
	MaxIdleConnectionLifeTime time.Duration `yaml:"maxIdleConnectionLifeTime"`

Server configuration

func (*RabbitServer) String

func (srv *RabbitServer) String() string

Get connection string

type Registry

type Registry map[string]*Consumer

Consumer registry

type ServerPool added in v1.3.0

type ServerPool struct {
	// contains filtered or unexported fields

RabbitMq server Pool

func NewServerPool added in v1.3.0

func NewServerPool(l gocli.Logger) *ServerPool

Init server pool

func (*ServerPool) GetConnectionPoolOrCreate added in v1.3.0

func (sp *ServerPool) GetConnectionPoolOrCreate(server string) *ConnectionPool

Get connection pool If not - create

type Servers

type Servers map[string]RabbitServer


Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL