Documentation
¶
Overview ¶
version: 0.0.1 file to manage channel
version: 0.0.1 file to manage connection
version: 0.0.1 file to build exchange
version: 0.0.1 broker wrapper is a library to manage the creation of exchanges and consumers; the supported broker types are rabbitmq, redis, and amazonmq
version: 0.0.1 file publish messages
redis.go Implementation of Redis pub/sub for gobroker
Index ¶
- Constants
- Variables
- type AmazonMQChannel
- type AmazonMQConnection
- type Broker
- func (e *Broker) AddAmazonMQConnection(ctype string) (*AmazonMQConnection, error)
- func (e *Broker) AddConnection(ctype string) (*Connection, error)
- func (e *Broker) AddRedisConnection(ctype string) (*RedisConnection, error)
- func (b *Broker) BuildExchange(name string, opts ...*ExchangeOptions) (*Exchange, error)
- func (b *Broker) Close()
- func (e *Broker) GetAmazonMQConnection(ctype string) (*AmazonMQConnection, error)
- func (e *Broker) GetConnection(ctype string) (*Connection, error)
- func (e *Broker) GetRedisConnection(ctype string) (*RedisConnection, error)
- func (b *Broker) Publish(ctx context.Context, topic string, body interface{}) error
- func (b *Broker) PublishToAmazonMQQueue(destination string, body interface{}) error
- func (b *Broker) PublishToExchange(ctx context.Context, exchangeName, routekey string, body interface{}, ...) error
- func (b *Broker) PublishToRedisChannel(channel string, body interface{}) error
- func (b *Broker) QueueDeclareAndBind(exchange, routeKey, queueName string) (string, error)
- func (b *Broker) RunAmazonMQConsumer(destination string, handler func([]byte)) error
- func (b *Broker) RunConsumer(exchange, routeKey string, functions func([]byte), queueName string) error
- func (b *Broker) RunRedisConsumer(channels []string, handler func([]byte)) error
- func (b *Broker) Subscribe(topic string, handler func([]byte), queueName ...string) error
- type Channel
- type Connection
- type EndpointOptions
- type Exchange
- func (e *Exchange) Publish(ctx context.Context, routekey string, body interface{}, ...) error
- func (e *Exchange) QueueDeclareAndBind(exchange, routeKey, queueName string, ch *Channel) (string, error)
- func (e *Exchange) RunConsumer(exchange, routeKey string, functions func([]byte), queueName string) error
- type ExchangeOptions
- type PublishOptions
- type RedisChannel
- type RedisConnection
- type RedisOptions
Constants ¶
const ( BrokerTypeRabbitMQ = "rabbitmq" BrokerTypeRedis = "redis" BrokerTypeAmazonMQ = "amazonmq" )
Broker type constants
Variables ¶
var ( AmazonMQPublishConnection = "amazonmq_publish" AmazonMQConsumerConnection = "amazonmq_consume" )
var ( PublishConnection = "publish" ConsumerConnection = "consume" )
connection types
var ( RedisPublishConnection = "redis_publish" RedisConsumerConnection = "redis_consume" )
Redis connection types
Functions ¶
This section is empty.
Types ¶
type AmazonMQChannel ¶
type AmazonMQChannel struct { Conn *stomp.Conn Status string Id int // Subscription reference, if this channel is used by a consumer Sub *stomp.Subscription }
AmazonMQChannel struct
type AmazonMQConnection ¶
type AmazonMQConnection struct { // The underlying STOMP connection Conn *stomp.Conn Status string Type string ChannelPool map[int]*AmazonMQChannel // We'll just store the address or any needed info here Address string // If you want to store the original stomp options: Options []func(*stomp.Conn) error // contains filtered or unexported fields }
AmazonMQConnection struct
func (*AmazonMQConnection) AddAmazonMQChannel ¶
func (c *AmazonMQConnection) AddAmazonMQChannel() (*AmazonMQChannel, error)
AddAmazonMQChannel
func (*AmazonMQConnection) GetAmazonMQChannel ¶
func (c *AmazonMQConnection) GetAmazonMQChannel(id ...int) (*AmazonMQChannel, error)
GetAmazonMQChannel
type Broker ¶
Broker represents a message broker connection
func NewBroker ¶
func NewBroker(endpoint string, brokerType string, opts ...*EndpointOptions) *Broker
NewBroker creates a new broker with unified API
func (*Broker) AddAmazonMQConnection ¶
func (e *Broker) AddAmazonMQConnection(ctype string) (*AmazonMQConnection, error)
AddAmazonMQConnection: create a new STOMP connection and attach it to Broker
func (*Broker) AddConnection ¶
func (e *Broker) AddConnection(ctype string) (*Connection, error)
create TLS connection to broker
func (*Broker) AddRedisConnection ¶
func (e *Broker) AddRedisConnection(ctype string) (*RedisConnection, error)
Add Redis connection to broker
func (*Broker) BuildExchange ¶
func (b *Broker) BuildExchange(name string, opts ...*ExchangeOptions) (*Exchange, error)
build exchange
func (*Broker) GetAmazonMQConnection ¶
func (e *Broker) GetAmazonMQConnection(ctype string) (*AmazonMQConnection, error)
GetAmazonMQConnection: retrieves an existing connection or creates one if needed
func (*Broker) GetConnection ¶
func (e *Broker) GetConnection(ctype string) (*Connection, error)
GetConnection returns a live *Connection or tries to create one
func (*Broker) GetRedisConnection ¶
func (e *Broker) GetRedisConnection(ctype string) (*RedisConnection, error)
Get Redis connection
func (*Broker) PublishToAmazonMQQueue ¶
PublishToAmazonMQQueue publishes a message (JSON-encoded) to a STOMP destination
func (*Broker) PublishToExchange ¶
func (b *Broker) PublishToExchange(ctx context.Context, exchangeName, routekey string, body interface{}, opts ...*PublishOptions) error
expose method to publish messages to exchange
func (*Broker) PublishToRedisChannel ¶
Publish message to Redis
func (*Broker) QueueDeclareAndBind ¶
only declare and bind
func (*Broker) RunAmazonMQConsumer ¶
RunAmazonMQConsumer subscribes to a STOMP destination and processes messages
func (*Broker) RunConsumer ¶
func (b *Broker) RunConsumer(exchange, routeKey string, functions func([]byte), queueName string) error
only one channel is used per Go consumer
func (*Broker) RunRedisConsumer ¶
Run Redis consumer
type Connection ¶
type Connection struct { *amqp.Connection Status string Type string ChannelPool map[int]*Channel // contains filtered or unexported fields }
func (*Connection) AddChannel ¶
func (c *Connection) AddChannel() (*Channel, error)
create channel for rabbitmq
func (*Connection) GetChannel ¶
func (c *Connection) GetChannel(id ...int) (*Channel, error)
get channel; an optional id can be passed to get a specific channel
type EndpointOptions ¶
type EndpointOptions struct { Protocol string Username string Password string Port string DB int // For Redis }
EndpointOptions defines connection parameters for different broker types
type Exchange ¶
type Exchange struct {
// contains filtered or unexported fields
}
exchange struct
func (*Exchange) Publish ¶
func (e *Exchange) Publish(ctx context.Context, routekey string, body interface{}, opts ...*PublishOptions) error
expose method to publish messages to exchange
type ExchangeOptions ¶
type ExchangeOptions struct { Type string Durable bool AutoDelete bool Internal bool NoWait bool Args amqp.Table }
exchange options
type PublishOptions ¶
type RedisChannel ¶
type RedisChannel struct { Client *redis.Client Status string Id int // contains filtered or unexported fields }
RedisChannel struct to maintain compatibility with Channel interface
type RedisConnection ¶
type RedisConnection struct { *redis.Client Status string Type string ChannelPool map[int]*RedisChannel // contains filtered or unexported fields }
RedisConnection struct to maintain compatibility with Connection interface
func (*RedisConnection) AddRedisChannel ¶
func (c *RedisConnection) AddRedisChannel() (*RedisChannel, error)
Add Redis channel
func (*RedisConnection) GetRedisChannel ¶
func (c *RedisConnection) GetRedisChannel(id ...int) (*RedisChannel, error)
Get Redis channel