gorabbit

package module
v1.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2023 License: BSD-2-Clause Imports: 9 Imported by: 0

README

gorabbit

RabbitMq queue consuming and producing

Consuming features

  1. Manage subscribers on the fly using socket.
  2. Auto reconnect when connection failed.
  3. Auto nack on panic and panic recover. Application will not crash. Except unrecovered panics.
  4. Multiple server and multiple queues implementation supports in config(yaml) files.
  5. Callback registry. Allows to create a callback for each queue.

Producing features

  1. Reusing connection.
  2. Implemented connection pool.
  3. Manage max connections via config file.
  4. Free connection in idle status after 10s

Allowed commands

  1. consumer start all - start all consumer defined in registry
  2. consumer start name_1 name_2 - start specific consumers
  3. consumer stop all - stop all consumer defined in registry
  4. consumer stop name_1 name_2 - stop specific consumers
  5. consumer restart all - restart all consumer defined in registry
  6. consumer restart name_1 name_2 - restart specific consumers
  7. consumer status all - status of all consumer defined in registry
  8. consumer status name_1 name_2 - status of specific consumers
  9. consumer set count N name_1 name_2 - set count of subscribers for specific consumer

Example

echo "consumer status all" | nc localhost 3333
If you find this project useful or want to support the author, you can send tokens to any of these wallets
  • Bitcoin: bc1qgx5c3n7q26qv0tngculjz0g78u6mzavy2vg3tf
  • Ethereum: 0x62812cb089E0df31347ca32A1610019537bbFe0D
  • Dogecoin: DET7fbNzZftp4sGRrBehfVRoi97RiPKajV

Documentation

Index

Constants

View Source
const (
	CommandStart    = "start"
	CommandStop     = "stop"
	CommandRestart  = "restart"
	CommandStatus   = "status"
	CommandConsumer = "consumer"
	CommandSet      = "set"

	CommandKeyWordAll   = "all"
	CommandKeyWordCount = "count"
)
View Source
const (
	// Default value for max conn
	DefaultMaxConnections = 10
	// Default value for max idle lifetime
	DefaultMaxIdleConnectionLifeTime = 10 * time.Second
	// Maximum connection on 5000 rps
	DefaultMaxConnectionOnRPS = 5000
)
View Source
const MaxMessagesPerConnection = int64(50000)

MaxMessagesPerConnection will close connection on reach limit

Variables

This section is empty.

Functions

func ParseCommand added in v1.2.0

func ParseCommand(command *gocli.Command) (action string, arguments []gocli.Argument, e porterr.IError)

Parse gocli.Command

Types

type Application

type Application struct {

	// Basic application
	gocli.Application
	// contains filtered or unexported fields
}

Application Rabbit application struct

func NewApplication

func NewApplication(config Config, app gocli.Application) *Application

NewApplication New rabbit application

func (*Application) Consume

func (a *Application) Consume(name string) porterr.IError

Consume Create new consumer

func (*Application) ConsumerCommander added in v1.2.0

func (a *Application) ConsumerCommander(command *gocli.Command)

ConsumerCommander Consumer command processor

func (*Application) GetConfig added in v1.2.0

func (a *Application) GetConfig() *Config

GetConfig Get app config

func (*Application) GetRegistry added in v1.2.0

func (a *Application) GetRegistry() Registry

GetRegistry Get registry of subscribers

func (*Application) Publish

func (a *Application) Publish(p amqp.Publishing, queue string, server string, route ...string) porterr.IError

Publish Publisher publishing - AMQP Message queue - name of the queue defined in config server - name of the server defined in config route - routing keys

func (*Application) SetRegistry added in v1.2.1

func (a *Application) SetRegistry(r Registry) *Application

SetRegistry Set registry of subscribers

type Config

type Config struct {
	// Servers configuration
	Servers Servers
	// Queues configuration
	Queues Queues
}

Config Rabbit config

func (*Config) GetQueue

func (c *Config) GetQueue(name string) (*RabbitQueue, porterr.IError)

GetQueue Get queue

func (*Config) GetServer

func (c *Config) GetServer(name string) (*RabbitServer, porterr.IError)

GetServer Get server

type ConnectionPool added in v1.3.0

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool Connection pool

func NewConnectionPool added in v1.3.0

func NewConnectionPool(maxConnection int) *ConnectionPool

NewConnectionPool Init connection pool

func (*ConnectionPool) GetConnection added in v1.3.0

func (cp *ConnectionPool) GetConnection(s RabbitServer) (c *connection, e porterr.IError)

GetConnection Get current connection using round robin algorithm

func (*ConnectionPool) Publish added in v1.3.0

func (cp *ConnectionPool) Publish(p amqp.Publishing, s RabbitServer, q RabbitQueue, route ...string) (e porterr.IError)

Publish message to queue

type Consumer

type Consumer struct {
	// Queue name
	Queue string
	// Server name
	Server string
	// Delivery process callback
	Callback func(d amqp.Delivery)
	// Subscribers count
	Count uint8
	// contains filtered or unexported fields
}

Consumer entity

func (*Consumer) HasSubscribers added in v1.2.0

func (c *Consumer) HasSubscribers() bool

Check for subscribers

func (*Consumer) NewSubscriber added in v1.2.0

func (c *Consumer) NewSubscriber(name string) *subscriber

New subscribers

func (*Consumer) Stop added in v1.2.0

func (c *Consumer) Stop()

Stop all subscribers

func (*Consumer) Subscribe added in v1.2.0

func (c *Consumer) Subscribe(logger gocli.Logger) porterr.IError

Subscribe for queue

func (*Consumer) SubscribersCount added in v1.2.0

func (c *Consumer) SubscribersCount() uint8

Get s subscribers

type Queues

type Queues map[string]RabbitQueue

Queues queue registry

type RabbitQueue

type RabbitQueue struct {
	Server     string
	Exchange   string
	Internal   bool
	Type       string
	Name       string
	Passive    bool
	Durable    bool
	Exclusive  bool
	Nowait     bool
	AutoDelete bool     `yaml:"autoDelete"`
	RoutingKey []string `yaml:"routingKey"`
	Arguments  map[string]interface{}
}

Queue configuration

type RabbitServer

type RabbitServer struct {
	// RabbitMQx virtual host
	Vhost string
	// RabbitMQ host
	Host string
	// RabbitMQ port
	Port int
	// RabbitMQ user
	User string
	// RabbitMQ password
	Password string
	// Maximum number of connections to server
	MaxConnections int `yaml:"maxPublishConnections"`
	// Maximum lifetime for idle connection
	MaxIdleConnectionLifeTime time.Duration `yaml:"maxIdleConnectionLifeTime"`
}

Server configuration

func (*RabbitServer) String

func (srv *RabbitServer) String() string

Get connection string

type Registry

type Registry map[string]*Consumer

Registry consumer registry

type ServerPool added in v1.3.0

type ServerPool struct {
	// contains filtered or unexported fields
}

ServerPool RabbitMq server Pool

func NewServerPool added in v1.3.0

func NewServerPool(l gocli.Logger) *ServerPool

NewServerPool Init server pool

func (*ServerPool) GetConnectionPoolOrCreate added in v1.3.0

func (sp *ServerPool) GetConnectionPoolOrCreate(server string, maxConnections int) *ConnectionPool

GetConnectionPoolOrCreate Get connection pool If not - create

type Servers

type Servers map[string]RabbitServer

Servers server registry

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL