gobroker

package module
v0.0.0-...-879b0f5 Latest Latest
Published: Jan 20, 2025 License: MIT Imports: 9 Imported by: 0

README

Golang Broker Package

This package provides an abstraction layer on top of message brokers such as RabbitMQ and AmazonMQ, making it easier to publish and consume messages while following best practices. Its main use case is microservice architectures.

Features

  • Separate connection for publishing and consuming
  • Connection and Channel pooling

Supported Broker

  • RabbitMQ

Usage

  • Setup Broker
newbroker := broker.NewBroker("127.0.0.1", &broker.EndpointOptions{
	Username: "guest",
	Password: "guest",
	Port:     "5672",
})
  • Publish Message To Exchange
err := newbroker.PublishToExchange(
	"exchange_name",
	"service.event.type",             // routing key
	map[string]string{"msg": "test"}, // message body, accepted as interface{}
)
if err != nil {
	...
}	
  • Build Exchange
ex, err := newbroker.BuildExchange("exchange-name")
if err != nil {
	...
}
  • Publish Message
err = ex.Publish(
	"service.event.type",             // routing key
	map[string]string{"msg": "test"}, // message body, accepted as interface{}
)
if err != nil {
	...
}	
  • Start Consumer
err = ex.RunConsumer(
	"exchange_name",
	"service.event.type", // routing key
	ConsumeMethod,        // consumer method, a func([]byte)
	"",                   // queue name, leave empty for an exclusive queue
)
if err != nil {
	...
}
  • Consume Method
func ConsumeMethod(message []byte) {
	response := make(map[string]string)
	if err := json.Unmarshal(message, &response); err != nil {
		fmt.Println("failed to decode message:", err)
		return
	}

	fmt.Printf("Message received: %v\n", response)
}
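
The consume method above can be tried without a running broker. The following is a minimal sketch that uses only the standard library; `decodeMessage` is a hypothetical helper and not part of gobroker, and the sketch assumes that the publisher JSON-encodes the message body before sending it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeMessage parses a JSON object payload into a string map.
// Hypothetical helper for illustration; it is not part of gobroker.
func decodeMessage(message []byte) (map[string]string, error) {
	payload := make(map[string]string)
	if err := json.Unmarshal(message, &payload); err != nil {
		return nil, fmt.Errorf("decode message: %w", err)
	}
	return payload, nil
}

// consumeMethod has the func([]byte) shape that RunConsumer accepts.
func consumeMethod(message []byte) {
	payload, err := decodeMessage(message)
	if err != nil {
		// A malformed payload is reported and skipped instead of being
		// printed as an empty map.
		fmt.Println("skipping message:", err)
		return
	}
	fmt.Printf("Message received: %v\n", payload)
}

func main() {
	// Assumption: the body arrives as JSON, as the Unmarshal call in the
	// README's consume method suggests.
	body, err := json.Marshal(map[string]string{"msg": "test"})
	if err != nil {
		panic(err)
	}
	consumeMethod(body)
	consumeMethod([]byte("not json"))
}
```

Keeping the decoding in a separate function that returns an error makes the handler easy to unit test, since the handler itself has no return value.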

Upcoming Features

  • AmazonMQ Support

Example

Full example

Contributing

We welcome contributions to gobroker. If you would like to report a bug or request a new feature, please open an issue on GitHub. If you would like to contribute code, please submit a pull request.

License

GoBroker is licensed under the MIT License.

Documentation

Overview

Version 0.0.1. The package is split into files that manage channels, manage connections, build exchanges, and publish messages. The broker wrapper manages the creation of exchanges and consumers; RabbitMQ is the only supported broker type.

Constants

This section is empty.

Variables

var (
	PublishConnection  = "publish"
	ConsumerConnection = "consume"
)

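Connection types. PublishConnection names the connection used for publishing and ConsumerConnection the one used for consuming.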

Functions

This section is empty.

Types

type Broker

type Broker struct {
	Endpoint string
	Type     string // only rabbitmq supported
	// contains filtered or unexported fields
}

Broker holds the broker endpoint and type.

func NewBroker

func NewBroker(endpoint string, opts ...*EndpointOptions) *Broker

NewBroker creates a Broker for the given endpoint, with optional EndpointOptions.

func (*Broker) AddConnection

func (e *Broker) AddConnection(ctype string) (*Connection, error)

AddConnection creates a TLS connection to the broker.

func (*Broker) BuildExchange

func (b *Broker) BuildExchange(name string, opts ...*ExchangeOptions) (*Exchange, error)

BuildExchange builds an exchange with the given name.

func (*Broker) GetConnection

func (e *Broker) GetConnection(ctype string) (*Connection, error)

func (*Broker) PublishToExchange

func (b *Broker) PublishToExchange(exchangeName, routekey string, body interface{}, opts ...*PublishOptions) error

PublishToExchange publishes a message to the named exchange.

func (*Broker) QueueDeclareAndBind

func (b *Broker) QueueDeclareAndBind(exchange, routeKey, queueName string) (string, error)

QueueDeclareAndBind only declares a queue and binds it to the exchange.

func (*Broker) RunConsumer

func (b *Broker) RunConsumer(exchange, routeKey string, functions func([]byte), queueName string) error

Only one channel is used per Go consumer.

type Channel

type Channel struct {
	*amqp.Channel
	Status string
	Id     int
}

type Connection

type Connection struct {
	*amqp.Connection
	Status      string
	Type        string
	ChannelPool map[int]*Channel
	// contains filtered or unexported fields
}

func (*Connection) AddChannel

func (c *Connection) AddChannel() (*Channel, error)

AddChannel creates a channel for RabbitMQ.

func (*Connection) GetChannel

func (c *Connection) GetChannel(id ...int) (*Channel, error)

GetChannel returns a channel; it can take an id to get a specific channel.
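
To illustrate the optional-id lookup described above, here is a minimal sketch of a pool keyed by channel id. The `channel` and `connection` types are simplified stand-ins for the package's `Channel` and `Connection`, and the fallback rule (return the pooled channel with the lowest id when no id is given) is an assumption; the package's actual selection logic is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// channel and connection are simplified stand-ins for the package's
// Channel and Connection types; they hold no real AMQP resources.
type channel struct {
	ID     int
	Status string
}

type connection struct {
	pool map[int]*channel
}

// getChannel returns the channel with the given id when one is passed.
// Without an id it returns the pooled channel with the lowest id, which
// is an assumed policy chosen to keep the result deterministic.
func (c *connection) getChannel(id ...int) (*channel, error) {
	if len(id) > 0 {
		ch, ok := c.pool[id[0]]
		if !ok {
			return nil, fmt.Errorf("channel %d not found", id[0])
		}
		return ch, nil
	}
	var picked *channel
	for _, ch := range c.pool {
		if picked == nil || ch.ID < picked.ID {
			picked = ch
		}
	}
	if picked == nil {
		return nil, errors.New("channel pool is empty")
	}
	return picked, nil
}

func main() {
	conn := &connection{pool: map[int]*channel{
		1: {ID: 1, Status: "open"}, // "open" is an illustrative status value
		2: {ID: 2, Status: "open"},
	}}

	ch, err := conn.getChannel(2) // specific channel
	if err != nil {
		panic(err)
	}
	fmt.Println(ch.ID)

	ch, err = conn.getChannel() // any channel
	if err != nil {
		panic(err)
	}
	fmt.Println(ch.ID)
}
```

The variadic `id ...int` parameter is what lets callers write either `getChannel()` or `getChannel(2)` against the same method.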

type EndpointOptions

type EndpointOptions struct {
	Protocol string
	Username string
	Password string
	Port     string
}

EndpointOptions holds broker connection options such as the protocol, username, password, and port.
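
As a rough illustration of how these fields might combine with the endpoint into an AMQP connection URI, consider the sketch below. `endpointOptions` mirrors the documented struct, `buildURI` is a hypothetical helper, and the defaults (`amqp` and port `5672`) are assumptions based on the standard AMQP scheme and port; how the package itself assembles the URI is not shown in this documentation.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// endpointOptions mirrors the fields of the documented EndpointOptions.
type endpointOptions struct {
	Protocol string
	Username string
	Password string
	Port     string
}

// buildURI is a hypothetical helper that assembles an AMQP URI.
// The fallback values are assumptions, not the package's documented defaults.
func buildURI(endpoint string, opts endpointOptions) string {
	if opts.Protocol == "" {
		opts.Protocol = "amqp"
	}
	if opts.Port == "" {
		opts.Port = "5672"
	}
	u := url.URL{
		Scheme: opts.Protocol,
		Host:   net.JoinHostPort(endpoint, opts.Port),
		Path:   "/",
	}
	if opts.Username != "" {
		// url.UserPassword escapes reserved characters in the credentials.
		u.User = url.UserPassword(opts.Username, opts.Password)
	}
	return u.String()
}

func main() {
	uri := buildURI("127.0.0.1", endpointOptions{
		Username: "guest",
		Password: "guest",
		Port:     "5672",
	})
	fmt.Println(uri) // prints "amqp://guest:guest@127.0.0.1:5672/"
}
```

Building the URI through `net/url` rather than string concatenation avoids broken URIs when a password contains characters such as `@` or `/`.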

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange represents an exchange on the broker.

func (*Exchange) Publish

func (e *Exchange) Publish(routekey string, body interface{}, opts ...*PublishOptions) error

Publish publishes a message to this exchange.

func (*Exchange) QueueDeclareAndBind

func (e *Exchange) QueueDeclareAndBind(exchange, routeKey, queueName string, ch *Channel) (string, error)

QueueDeclareAndBind only declares a queue and binds it to the exchange, using the given channel.

func (*Exchange) RunConsumer

func (e *Exchange) RunConsumer(exchange, routeKey string, functions func([]byte), queueName string) error

Only one channel is used per Go consumer.

type ExchangeOptions

type ExchangeOptions struct {
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

ExchangeOptions holds the options used when building an exchange.

type PublishOptions

type PublishOptions struct {
	Mandatory bool
	Immediate bool
}
