Documentation ¶
Index ¶
- Constants
- func CompareArgon2Hash(passphrase, salt string, multiplier uint32, hashedPassword []byte) (bool, error)
- func CompressWithGzip(data []byte, buffer *bytes.Buffer) error
- func CompressWithZstd(data []byte, buffer *bytes.Buffer) error
- func CreatePayload(input interface{}, compression *CompressionConfig, ...) ([]byte, error)
- func CreateTLSConfig(pemLocation string, localLocation string) (*tls.Config, error)
- func CreateWrappedPayload(input interface{}, letterID uuid.UUID, metadata string, ...) ([]byte, error)
- func DecompressWithGzip(buffer *bytes.Buffer) error
- func DecompressWithZstd(buffer *bytes.Buffer) error
- func DecryptWithAes(cipherDataWithNonce, hashedKey []byte, nonceSize int) ([]byte, error)
- func EncryptWithAes(data, hashedKey []byte, nonceSize int) ([]byte, error)
- func GetHashWithArgon(passphrase, salt string, timeConsideration uint32, multiplier uint32, ...) []byte
- func GetStringHashWithArgon(passphrase, salt string, timeConsideration uint32, threads uint8, ...) string
- func JSONUtcTimestamp() string
- func JSONUtcTimestampFromTime(t time.Time) string
- func RandomBytes(size int) []byte
- func RandomString(size int) string
- func RandomStringFromSource(size int, src rand.Source) string
- func ReadJSONFileToInterface(fileNamePath string) (interface{}, error)
- func ReadPayload(buffer *bytes.Buffer, compression *CompressionConfig, ...) error
- func RepeatedBytes(size int, repeat int) []byte
- func RepeatedRandomString(size int, repeat int) string
- type ChannelHost
- type CompressionConfig
- type ConnectionHost
- type ConnectionPool
- func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error)
- func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(error)) (*ConnectionPool, error)
- func NewConnectionPoolWithHandlers(config *PoolConfig, errorHandler func(error), unhealthyHandler func(error)) (*ConnectionPool, error)
- func NewConnectionPoolWithUnhealthyHandler(config *PoolConfig, unhealthyHandler func(error)) (*ConnectionPool, error)
- func (cp *ConnectionPool) GetChannelFromPool() *ChannelHost
- func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)
- func (cp *ConnectionPool) GetTransientChannel(ackable bool) *amqp.Channel
- func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool)
- func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost, flag bool)
- func (cp *ConnectionPool) Shutdown()
- type Consumer
- func (con *Consumer) Errors() <-chan error
- func (con *Consumer) FlushErrors()
- func (con *Consumer) FlushMessages()
- func (con *Consumer) FlushStop()
- func (con *Consumer) Get(queueName string) (*amqp.Delivery, error)
- func (con *Consumer) GetBatch(queueName string, batchSize int) ([]*amqp.Delivery, error)
- func (con *Consumer) ReceivedMessages() <-chan *ReceivedMessage
- func (con *Consumer) StartConsuming()
- func (con *Consumer) StartConsumingWithAction(action func(*ReceivedMessage))
- func (con *Consumer) Started() bool
- func (con *Consumer) StopConsuming(immediate bool, flushMessages bool) error
- type ConsumerConfig
- type EncryptionConfig
- type Envelope
- type ErrorMessage
- type Exchange
- type ExchangeBinding
- type Letter
- type ModdedBody
- type PoolConfig
- type PublishConfirmation
- type PublishReceipt
- type Publisher
- func (pub *Publisher) Publish(letter *Letter, skipReceipt bool)
- func (pub *Publisher) PublishReceipts() <-chan *PublishReceipt
- func (pub *Publisher) PublishWithConfirmation(letter *Letter, timeout time.Duration)
- func (pub *Publisher) PublishWithConfirmationContext(ctx context.Context, letter *Letter)
- func (pub *Publisher) PublishWithConfirmationContextError(ctx context.Context, letter *Letter) error
- func (pub *Publisher) PublishWithConfirmationError(letter *Letter, timeout time.Duration) error
- func (pub *Publisher) PublishWithConfirmationTransient(letter *Letter, timeout time.Duration)
- func (pub *Publisher) PublishWithError(letter *Letter, skipReceipt bool) error
- func (pub *Publisher) PublishWithTransient(letter *Letter) error
- func (pub *Publisher) QueueLetter(letter *Letter) bool
- func (pub *Publisher) QueueLetters(letters []*Letter) bool
- func (pub *Publisher) Shutdown(shutdownPools bool)
- func (pub *Publisher) StartAutoPublishing()
- type PublisherConfig
- type Queue
- type QueueBinding
- type RabbitSeasoning
- type RabbitService
- func NewRabbitService(config *RabbitSeasoning, passphrase string, salt string, ...) (*RabbitService, error)
- func NewRabbitServiceWithConnectionPool(connectionPool *ConnectionPool, config *RabbitSeasoning, passphrase string, ...) (*RabbitService, error)
- func NewRabbitServiceWithPublisher(publisher *Publisher, config *RabbitSeasoning, passphrase string, salt string, ...) (*RabbitService, error)
- func (rs *RabbitService) CentralErr() <-chan error
- func (rs *RabbitService) GetConsumer(consumerName string) (*Consumer, error)
- func (rs *RabbitService) GetConsumerConfig(consumerName string) (*ConsumerConfig, error)
- func (rs *RabbitService) Publish(input interface{}, exchangeName, routingKey, metadata string, wrapPayload bool, ...) error
- func (rs *RabbitService) PublishData(data []byte, exchangeName, routingKey string, headers amqp.Table) error
- func (rs *RabbitService) PublishLetter(letter *Letter) error
- func (rs *RabbitService) PublishWithConfirmation(input interface{}, exchangeName, routingKey, metadata string, wrapPayload bool, ...) error
- func (rs *RabbitService) QueueLetter(letter *Letter) error
- func (rs *RabbitService) Shutdown(stopConsumers bool)
- type ReceivedMessage
- type ReturnMessage
- type TLSConfig
- type Topologer
- func (top *Topologer) BindExchanges(bindings []*ExchangeBinding, ignoreErrors bool) error
- func (top *Topologer) BindQueues(bindings []*QueueBinding, ignoreErrors bool) error
- func (top *Topologer) BuildExchanges(exchanges []*Exchange, ignoreErrors bool) error
- func (top *Topologer) BuildQueues(queues []*Queue, ignoreErrors bool) error
- func (top *Topologer) BuildTopology(config *TopologyConfig, ignoreErrors bool) error
- func (top *Topologer) CreateExchange(exchangeName string, exchangeType string, ...) error
- func (top *Topologer) CreateExchangeFromConfig(exchange *Exchange) error
- func (top *Topologer) CreateQueue(queueName string, passiveDeclare bool, durable bool, autoDelete bool, ...) error
- func (top *Topologer) CreateQueueFromConfig(queue *Queue) error
- func (top *Topologer) ExchangeBind(exchangeBinding *ExchangeBinding) error
- func (top *Topologer) ExchangeDelete(exchangeName string, ifUnused, noWait bool) error
- func (top *Topologer) ExchangeUnbind(exchangeName, routingKey, parentExchangeName string, noWait bool, ...) error
- func (top *Topologer) PurgeQueue(queueName string, noWait bool) (int, error)
- func (top *Topologer) PurgeQueues(queueNames []string, noWait bool) (int, error)
- func (top *Topologer) QueueBind(queueBinding *QueueBinding) error
- func (top *Topologer) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
- func (top *Topologer) UnbindQueue(queueName, routingKey, exchangeName string, args map[string]interface{}) error
- type TopologyConfig
- type WrappedBody
Constants ¶
const ( // GzipCompressionType helps identify which compression/decompression to use. GzipCompressionType = "gzip" // ZstdCompressionType helps identify which compression/decompression to use. ZstdCompressionType = "zstd" // AesSymmetricType helps identify which encryption/decryption to use. AesSymmetricType = "aes" )
const ( // QueueTypeQuorum indicates a queue of type quorum. QueueTypeQuorum = "quorum" // QueueTypeClassic indicates a queue of type classic. QueueTypeClassic = "classic" )
Variables ¶
This section is empty.
Functions ¶
func CompareArgon2Hash ¶
func CompareArgon2Hash(passphrase, salt string, multiplier uint32, hashedPassword []byte) (bool, error)
CompareArgon2Hash creates an Argon hash and then compares it to a provided hash.
func CompressWithGzip ¶
CompressWithGzip uses the standard Gzip Writer to compress data and places data in the supplied buffer.
func CompressWithZstd ¶
CompressWithZstd uses an external dependency for Zstd to compress data and places data in the supplied buffer.
func CreatePayload ¶
func CreatePayload( input interface{}, compression *CompressionConfig, encryption *EncryptionConfig) ([]byte, error)
CreatePayload creates a JSON marshal and optionally compresses and encrypts the bytes.
func CreateTLSConfig ¶
CreateTLSConfig creates an x509 TLS Config for use in TLS-based communication.
func CreateWrappedPayload ¶
func CreateWrappedPayload( input interface{}, letterID uuid.UUID, metadata string, compression *CompressionConfig, encryption *EncryptionConfig) ([]byte, error)
CreateWrappedPayload wraps your data in a plaintext wrapper called ModdedLetter and performs the selected modifications to data.
func DecompressWithGzip ¶
DecompressWithGzip uses the standard Gzip Reader to decompress data and replaces the supplied buffer with a new buffer with data in it.
func DecompressWithZstd ¶
DecompressWithZstd uses an external dependency for Zstd to decompress data and replaces the supplied buffer with a new buffer with data in it.
func DecryptWithAes ¶
DecryptWithAes decrypts bytes based on an Aes compatible hashed key.
func EncryptWithAes ¶
EncryptWithAes encrypts bytes based on an AES-256 compatible hashed key. If nonceSize is less than 12, the standard, 12, is used.
func GetHashWithArgon ¶
func GetHashWithArgon(passphrase, salt string, timeConsideration uint32, multiplier uint32, threads uint8, hashLength uint32) []byte
GetHashWithArgon uses Argon2 version 0x13 to hash a plaintext password with a provided salt string and return hash as bytes.
func GetStringHashWithArgon ¶
func GetStringHashWithArgon(passphrase, salt string, timeConsideration uint32, threads uint8, hashLength uint32) string
GetStringHashWithArgon uses Argon2 version 0x13 to hash a plaintext password with a provided salt string and return hash as base64 string.
func JSONUtcTimestamp ¶
func JSONUtcTimestamp() string
JSONUtcTimestamp quickly creates an RFC3339-formatted timestamp string in UTC.
func JSONUtcTimestampFromTime ¶
JSONUtcTimestampFromTime quickly creates an RFC3339-formatted timestamp string in UTC from the supplied time.
func RandomBytes ¶
RandomBytes returns a RandomString converted to bytes.
func RandomString ¶
RandomString creates a new RandomSource to generate a RandomString unique per nanosecond.
func RandomStringFromSource ¶
RandomStringFromSource generates a Random string that should always be unique. Example RandSrc: var src = rand.NewSource(time.Now().UnixNano()). Source: https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-go
func ReadJSONFileToInterface ¶
ReadJSONFileToInterface opens a file.json and converts to interface{}.
func ReadPayload ¶
func ReadPayload(buffer *bytes.Buffer, compression *CompressionConfig, encryption *EncryptionConfig) error
ReadPayload decrypts and decompresses payloads.
func RepeatedBytes ¶
RepeatedBytes generates a RandomString and then repeats it up to size.
func RepeatedRandomString ¶
RepeatedRandomString generates a RandomString and then repeats it up to size and repeat count.
Types ¶
type ChannelHost ¶
type ChannelHost struct { Channel *amqp.Channel ID uint64 ConnectionID uint64 Ackable bool CachedChannel bool Confirmations chan amqp.Confirmation Errors chan *amqp.Error // contains filtered or unexported fields }
ChannelHost is an internal representation of amqp.Channel.
func NewChannelHost ¶
func NewChannelHost( connHost *ConnectionHost, id uint64, connectionID uint64, ackable, cached bool) (*ChannelHost, error)
NewChannelHost creates a simple ChannelHost wrapper for management by end-user developer.
func (*ChannelHost) Close ¶
func (ch *ChannelHost) Close()
Close allows for manual close of Amqp Channel kept internally.
func (*ChannelHost) FlushConfirms ¶
func (ch *ChannelHost) FlushConfirms()
FlushConfirms removes all previous confirmations pending processing.
func (*ChannelHost) MakeChannel ¶
func (ch *ChannelHost) MakeChannel() (err error)
MakeChannel tries to create (or re-create) the channel from the ConnectionHost it is attached to.
func (*ChannelHost) PauseForFlowControl ¶
func (ch *ChannelHost) PauseForFlowControl()
PauseForFlowControl allows you to wait and sleep while receiving flow control messages.
type CompressionConfig ¶
type CompressionConfig struct { Enabled bool `json:"Enabled" yaml:"Enabled"` Type string `json:"Type,omitempty" yaml:"Type,omitempty"` }
CompressionConfig allows you to configure compression based on options.
type ConnectionHost ¶
type ConnectionHost struct { Connection *amqp.Connection ConnectionID uint64 CachedChannelCount uint64 Errors chan *amqp.Error Blockers chan amqp.Blocking // contains filtered or unexported fields }
ConnectionHost is an internal representation of amqp.Connection.
func NewConnectionHost ¶
func NewConnectionHost( uri string, connectionName string, connectionID uint64, heartbeatInterval time.Duration, connectionTimeout time.Duration, tlsConfig *TLSConfig) (*ConnectionHost, error)
NewConnectionHost creates a simple ConnectionHost wrapper for management by end-user developer.
func (*ConnectionHost) Connect ¶
func (ch *ConnectionHost) Connect() bool
Connect tries to connect (or reconnect) to the provided properties of the host one time.
func (*ConnectionHost) ConnectWithErrorHandler ¶ added in v2.2.6
func (ch *ConnectionHost) ConnectWithErrorHandler(errorHandler func(error)) bool
ConnectWithErrorHandler tries to connect (or reconnect) to the provided properties of the host one time with an error handler.
func (*ConnectionHost) PauseOnFlowControl ¶
func (ch *ConnectionHost) PauseOnFlowControl()
PauseOnFlowControl allows you to wait and sleep while receiving flow control messages. Sleeps for one second, repeatedly until the blocking has stopped.
type ConnectionPool ¶
type ConnectionPool struct { Config PoolConfig // contains filtered or unexported fields }
ConnectionPool houses the pool of RabbitMQ connections.
func NewConnectionPool ¶
func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error)
NewConnectionPool creates hosting structure for the ConnectionPool.
func NewConnectionPoolWithErrorHandler ¶
func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(error)) (*ConnectionPool, error)
NewConnectionPoolWithErrorHandler creates hosting structure for the ConnectionPool with an error handler.
func NewConnectionPoolWithHandlers ¶ added in v2.2.7
func NewConnectionPoolWithHandlers(config *PoolConfig, errorHandler func(error), unhealthyHandler func(error)) (*ConnectionPool, error)
NewConnectionPoolWithHandlers creates hosting structure for the ConnectionPool with an error and/or unhealthy handler.
func NewConnectionPoolWithUnhealthyHandler ¶ added in v2.2.7
func NewConnectionPoolWithUnhealthyHandler(config *PoolConfig, unhealthyHandler func(error)) (*ConnectionPool, error)
NewConnectionPoolWithUnhealthyHandler creates hosting structure for the ConnectionPool with an unhealthy handler.
func (*ConnectionPool) GetChannelFromPool ¶
func (cp *ConnectionPool) GetChannelFromPool() *ChannelHost
GetChannelFromPool gets a cached ackable channel from the Pool if they exist or creates a channel. A non-acked channel is always a transient channel. Blocking if Ackable is true and the cache is empty. If you want a transient Ackable channel (un-managed), use CreateChannel directly.
func (*ConnectionPool) GetConnection ¶
func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)
GetConnection gets a connection based on what's in the ConnectionPool (blocking under bad network conditions). Flow control (blocking) or transient network outages will pause here until cleared. Uses the SleepOnErrorInterval to pause between retries.
func (*ConnectionPool) GetTransientChannel ¶
func (cp *ConnectionPool) GetTransientChannel(ackable bool) *amqp.Channel
GetTransientChannel allows you to create an unmanaged amqp Channel with the help of the ConnectionPool.
func (*ConnectionPool) ReturnChannel ¶
func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool)
ReturnChannel returns a Channel. If the Channel is not a cached channel, it is simply closed here. If it is a cached Channel, we check whether it erred; if so, a new Channel is created instead and then returned to the cache.
func (*ConnectionPool) ReturnConnection ¶
func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost, flag bool)
ReturnConnection puts the connection back in the queue and flags it for error. This helps maintain a Round Robin on Connections and their resources.
func (*ConnectionPool) Shutdown ¶
func (cp *ConnectionPool) Shutdown()
Shutdown closes all connections in the ConnectionPool and resets the Pool to pre-initialized state.
type Consumer ¶
type Consumer struct { Config *ConsumerConfig ConnectionPool *ConnectionPool Enabled bool QueueName string ConsumerName string // contains filtered or unexported fields }
Consumer receives messages from a RabbitMQ location.
func NewConsumer ¶
func NewConsumer( rconfig *RabbitSeasoning, cp *ConnectionPool, queuename string, consumerName string, autoAck bool, exclusive bool, noWait bool, args map[string]interface{}, qosCountOverride int, sleepOnErrorInterval uint32, sleepOnIdleInterval uint32) (*Consumer, error)
NewConsumer creates a new Consumer to receive messages from a specific queuename.
func NewConsumerFromConfig ¶
func NewConsumerFromConfig(config *ConsumerConfig, cp *ConnectionPool) *Consumer
NewConsumerFromConfig creates a new Consumer to receive messages from a specific queuename.
func (*Consumer) FlushErrors ¶
func (con *Consumer) FlushErrors()
FlushErrors allows you to flush out all previous Errors.
func (*Consumer) FlushMessages ¶
func (con *Consumer) FlushMessages()
FlushMessages allows you to flush out all previous Messages. WARNING: THIS WILL RESULT IN LOST MESSAGES.
func (*Consumer) FlushStop ¶
func (con *Consumer) FlushStop()
FlushStop allows you to flush out all previous Stop signals.
func (*Consumer) ReceivedMessages ¶
func (con *Consumer) ReceivedMessages() <-chan *ReceivedMessage
ReceivedMessages yields all the internal messages ready for consuming.
func (*Consumer) StartConsuming ¶
func (con *Consumer) StartConsuming()
StartConsuming starts the Consumer.
func (*Consumer) StartConsumingWithAction ¶
func (con *Consumer) StartConsumingWithAction(action func(*ReceivedMessage))
StartConsumingWithAction starts the Consumer invoking a method on every ReceivedMessage.
func (*Consumer) StopConsuming ¶
StopConsuming allows you to signal stop to the consumer. Will stop on the consumer channel close or when responding to the signal after getting all remaining deliveries. FlushMessages empties the internal buffer of messages received by queue. Ackable messages are still in RabbitMQ queue, while noAck messages will unfortunately be lost. Use wisely.
type ConsumerConfig ¶
type ConsumerConfig struct { Enabled bool `json:"Enabled" yaml:"Enabled"` QueueName string `json:"QueueName" yaml:"QueueName"` ConsumerName string `json:"ConsumerName" yaml:"ConsumerName"` AutoAck bool `json:"AutoAck" yaml:"AutoAck"` Exclusive bool `json:"Exclusive" yaml:"Exclusive"` NoWait bool `json:"NoWait" yaml:"NoWait"` Args map[string]interface{} `json:"Args" yaml:"Args"` QosCountOverride int `json:"QosCountOverride" yaml:"QosCountOverride"` // if zero ignored SleepOnErrorInterval uint32 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` // sleep on error SleepOnIdleInterval uint32 `json:"SleepOnIdleInterval" yaml:"SleepOnIdleInterval"` // sleep on idle }
ConsumerConfig represents settings for configuring a consumer with ease.
type EncryptionConfig ¶
type EncryptionConfig struct { Enabled bool `json:"Enabled" yaml:"Enabled"` Type string `json:"Type,omitempty" yaml:"Type,omitempty"` Hashkey []byte TimeConsideration uint32 `json:"TimeConsideration,omitempty" yaml:"TimeConsideration,omitempty"` MemoryMultiplier uint32 `json:"" yaml:""` Threads uint8 `json:"Threads,omitempty" yaml:"Threads,omitempty"` }
EncryptionConfig allows you to configure symmetric key encryption based on options.
type Envelope ¶
type Envelope struct { Exchange string RoutingKey string ContentType string CorrelationID string Type string Mandatory bool Immediate bool Headers amqp.Table DeliveryMode uint8 Priority uint8 }
Envelope contains all the address details of where a letter is going.
type ErrorMessage ¶
ErrorMessage allows you to replay a message that was returned.
func NewErrorMessage ¶
func NewErrorMessage(amqpError *amqp.Error) *ErrorMessage
NewErrorMessage creates a new ErrorMessage.
func (*ErrorMessage) Error ¶
func (em *ErrorMessage) Error() string
Error allows you to quickly log the ErrorMessage struct as a string.
type Exchange ¶
type Exchange struct { Name string `json:"Name" yaml:"Name"` Type string `json:"Type" yaml:"Type"` // "direct", "fanout", "topic", "headers" PassiveDeclare bool `json:"PassiveDeclare" yaml:"PassiveDeclare"` Durable bool `json:"Durable" yaml:"Durable"` AutoDelete bool `json:"AutoDelete" yaml:"AutoDelete"` InternalOnly bool `json:"InternalOnly" yaml:"InternalOnly"` NoWait bool `json:"NoWait" yaml:"NoWait"` Args amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface() }
Exchange allows for you to create Exchange topology.
type ExchangeBinding ¶
type ExchangeBinding struct { ExchangeName string `json:"ExchangeName" yaml:"ExchangeName"` ParentExchangeName string `json:"ParentExchangeName" yaml:"ParentExchangeName"` RoutingKey string `json:"RoutingKey" yaml:"RoutingKey"` NoWait bool `json:"NoWait" yaml:"NoWait"` Args amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface() }
ExchangeBinding allows for you to create Bindings between an Exchange and Exchange.
type Letter ¶
Letter contains the message body and address of where things are going.
func CreateLetter ¶
CreateLetter creates a simple letter for publishing.
func CreateMockLetter ¶
CreateMockLetter creates a mock letter for publishing.
func CreateMockRandomLetter ¶
CreateMockRandomLetter creates a mock letter for publishing with random sizes and random Ids.
func CreateMockRandomWrappedBodyLetter ¶
CreateMockRandomWrappedBodyLetter creates a mock Letter for publishing with random sizes and random Ids.
type ModdedBody ¶
type ModdedBody struct { Encrypted bool `json:"Encrypted"` EType string `json:"EncryptionType,omitempty"` Compressed bool `json:"Compressed"` CType string `json:"CompressionType,omitempty"` UTCDateTime string `json:"UTCDateTime"` Data []byte `json:"Data"` }
ModdedBody is a payload with modifications and indicators of what was modified.
type PoolConfig ¶
type PoolConfig struct { ApplicationName string `json:"ApplicationName" yaml:"ApplicationName"` URI string `json:"URI" yaml:"URI"` Heartbeat uint32 `json:"Heartbeat" yaml:"Heartbeat"` ConnectionTimeout uint32 `json:"ConnectionTimeout" yaml:"ConnectionTimeout"` SleepOnErrorInterval uint32 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` // sleep length on errors MaxConnectionCount uint64 `json:"MaxConnectionCount" yaml:"MaxConnectionCount"` // number of connections to create in the pool MaxCacheChannelCount uint64 `json:"MaxCacheChannelCount" yaml:"MaxCacheChannelCount"` // number of channels to be cached in the pool TLSConfig *TLSConfig `json:"TLSConfig" yaml:"TLSConfig"` // TLS settings for connection with AMQPS. }
PoolConfig represents settings for creating/configuring pools.
type PublishConfirmation ¶
type PublishConfirmation struct { DeliveryTag uint64 // Delivery Tag Id Acked bool // Acked Serverside }
PublishConfirmation aids in guaranteed Deliverability.
func NewPublishConfirmation ¶
func NewPublishConfirmation(confirmation *amqp.Confirmation) *PublishConfirmation
NewPublishConfirmation creates a new PublishConfirmation.
type PublishReceipt ¶
PublishReceipt is a way to monitor publishing success and to initiate a retry when using async publishing.
func (*PublishReceipt) ToString ¶
func (not *PublishReceipt) ToString() string
ToString allows you to quickly log the PublishReceipt struct as a string.
type Publisher ¶
type Publisher struct { Config *RabbitSeasoning ConnectionPool *ConnectionPool // contains filtered or unexported fields }
Publisher contains everything you need to publish a message.
func NewPublisher ¶
func NewPublisher( cp *ConnectionPool, sleepOnIdleInterval time.Duration, sleepOnErrorInterval time.Duration, publishTimeOutDuration time.Duration) *Publisher
NewPublisher creates and configures a new Publisher.
func NewPublisherFromConfig ¶
func NewPublisherFromConfig( config *RabbitSeasoning, cp *ConnectionPool) *Publisher
NewPublisherFromConfig creates and configures a new Publisher.
func (*Publisher) Publish ¶
Publish sends a single message to the address on the letter using a cached ChannelHost. Subscribe to PublishReceipts to see success and errors.
For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation
func (*Publisher) PublishReceipts ¶
func (pub *Publisher) PublishReceipts() <-chan *PublishReceipt
PublishReceipts yields all the successes and failures during all publish events. Highly recommend subscribing to this.
func (*Publisher) PublishWithConfirmation ¶
PublishWithConfirmation sends a single message to the address on the letter with confirmation capabilities.
This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. A confirmation failure keeps trying to publish (at least until timeout failure occurs.)
func (*Publisher) PublishWithConfirmationContext ¶
PublishWithConfirmationContext sends a single message to the address on the letter with confirmation capabilities. This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. A confirmation failure keeps trying to publish (at least until timeout failure occurs.)
func (*Publisher) PublishWithConfirmationContextError ¶
func (pub *Publisher) PublishWithConfirmationContextError(ctx context.Context, letter *Letter) error
PublishWithConfirmationContextError sends a single message to the address on the letter with confirmation capabilities. This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. A confirmation failure keeps trying to publish (at least until timeout failure occurs.)
func (*Publisher) PublishWithConfirmationError ¶
PublishWithConfirmationError sends a single message to the address on the letter with confirmation capabilities.
This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. A confirmation failure keeps trying to publish (at least until timeout failure occurs.)
func (*Publisher) PublishWithConfirmationTransient ¶
PublishWithConfirmationTransient sends a single message to the address on the letter with confirmation capabilities on transient Channels. This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. A timeout failure drops the letter back in the PublishReceipts. When combined with QueueLetter, it automatically
gets requeued for re-publish.
A confirmation failure keeps trying to publish (at least until timeout failure occurs.)
func (*Publisher) PublishWithError ¶
PublishWithError sends a single message to the address on the letter using a cached ChannelHost.
For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation
func (*Publisher) PublishWithTransient ¶
PublishWithTransient sends a single message to the address on the letter using a transient (new) RabbitMQ channel. Subscribe to PublishReceipts to see success and errors. For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation
func (*Publisher) QueueLetter ¶
QueueLetter queues up a letter that will be consumed by AutoPublish. By default, AutoPublish uses PublishWithConfirmation as the mechanism for publishing.
func (*Publisher) QueueLetters ¶
QueueLetters allows you to bulk queue letters that will be consumed by AutoPublish. By default, AutoPublish uses PublishWithConfirmation as the mechanism for publishing.
func (*Publisher) Shutdown ¶
Shutdown cleanly shuts down the publisher and resets its internal state.
func (*Publisher) StartAutoPublishing ¶
func (pub *Publisher) StartAutoPublishing()
StartAutoPublishing starts the Publisher's auto-publishing capabilities.
type PublisherConfig ¶
type PublisherConfig struct { AutoAck bool `json:"AutoAck" yaml:"AutoAck"` SleepOnIdleInterval uint32 `json:"SleepOnIdleInterval" yaml:"SleepOnIdleInterval"` SleepOnErrorInterval uint32 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` PublishTimeOutInterval uint32 `json:"PublishTimeOutInterval" yaml:"PublishTimeOutInterval"` MaxRetryCount uint32 `json:"MaxRetryCount" yaml:"MaxRetryCount"` }
PublisherConfig represents settings for configuring global settings for all Publishers with ease.
type Queue ¶
type Queue struct { Name string `json:"Name" yaml:"Name"` PassiveDeclare bool `json:"PassiveDeclare" yaml:"PassiveDeclare"` Durable bool `json:"Durable" yaml:"Durable"` AutoDelete bool `json:"AutoDelete" yaml:"AutoDelete"` Exclusive bool `json:"Exclusive" yaml:"Exclusive"` NoWait bool `json:"NoWait" yaml:"NoWait"` Type string `json:"Type" yaml:"Type"` // classic or quorum, type of quorum disregards exclusive and enables durable properties when building from config Args amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface() }
Queue allows for you to create Queue topology.
type QueueBinding ¶
type QueueBinding struct { QueueName string `json:"QueueName" yaml:"QueueName"` ExchangeName string `json:"ExchangeName" yaml:"ExchangeName"` RoutingKey string `json:"RoutingKey" yaml:"RoutingKey"` NoWait bool `json:"NoWait" yaml:"NoWait"` Args amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface() }
QueueBinding allows for you to create Bindings between a Queue and Exchange.
type RabbitSeasoning ¶
type RabbitSeasoning struct { EncryptionConfig *EncryptionConfig `json:"EncryptionConfig" yaml:"EncryptionConfig"` CompressionConfig *CompressionConfig `json:"CompressionConfig" yaml:"CompressionConfig"` PoolConfig *PoolConfig `json:"PoolConfig" yaml:"PoolConfig"` ConsumerConfigs map[string]*ConsumerConfig `json:"ConsumerConfigs" yaml:"ConsumerConfigs"` PublisherConfig *PublisherConfig `json:"PublisherConfig" yaml:"PublisherConfig"` }
RabbitSeasoning represents the configuration values.
func ConvertJSONFileToConfig ¶
func ConvertJSONFileToConfig(fileNamePath string) (*RabbitSeasoning, error)
ConvertJSONFileToConfig opens a file.json and converts to RabbitSeasoning.
type RabbitService ¶
type RabbitService struct { Config *RabbitSeasoning ConnectionPool *ConnectionPool Topologer *Topologer Publisher *Publisher // contains filtered or unexported fields }
RabbitService is the struct for containing all you need for RabbitMQ access.
func NewRabbitService ¶
func NewRabbitService( config *RabbitSeasoning, passphrase string, salt string, processPublishReceipts func(*PublishReceipt), processError func(error)) (*RabbitService, error)
NewRabbitService creates everything you need for a RabbitMQ communication service.
func NewRabbitServiceWithConnectionPool ¶ added in v2.2.6
func NewRabbitServiceWithConnectionPool( connectionPool *ConnectionPool, config *RabbitSeasoning, passphrase string, salt string, processPublishReceipts func(*PublishReceipt), processError func(error)) (*RabbitService, error)
NewRabbitServiceWithConnectionPool creates everything you need for a RabbitMQ communication service from a connection pool.
func NewRabbitServiceWithPublisher ¶ added in v2.2.9
func NewRabbitServiceWithPublisher( publisher *Publisher, config *RabbitSeasoning, passphrase string, salt string, processPublishReceipts func(*PublishReceipt), processError func(error)) (*RabbitService, error)
NewRabbitServiceWithPublisher creates everything you need for a RabbitMQ communication service from a publisher.
func (*RabbitService) CentralErr ¶
func (rs *RabbitService) CentralErr() <-chan error
CentralErr yields all the internal errs for sub-processes.
func (*RabbitService) GetConsumer ¶
func (rs *RabbitService) GetConsumer(consumerName string) (*Consumer, error)
GetConsumer allows you to get the individual consumers stored in memory.
func (*RabbitService) GetConsumerConfig ¶
func (rs *RabbitService) GetConsumerConfig(consumerName string) (*ConsumerConfig, error)
GetConsumerConfig allows you to get the individual consumers' config stored in memory.
func (*RabbitService) Publish ¶
func (rs *RabbitService) Publish( input interface{}, exchangeName, routingKey, metadata string, wrapPayload bool, headers amqp.Table) error
Publish tries to publish directly, without retry, with the data optionally wrapped in a ModdedLetter.
func (*RabbitService) PublishData ¶
func (rs *RabbitService) PublishData( data []byte, exchangeName, routingKey string, headers amqp.Table) error
PublishData tries to publish.
func (*RabbitService) PublishLetter ¶
func (rs *RabbitService) PublishLetter(letter *Letter) error
PublishLetter wraps around Publisher to simply Publish.
func (*RabbitService) PublishWithConfirmation ¶
func (rs *RabbitService) PublishWithConfirmation( input interface{}, exchangeName, routingKey, metadata string, wrapPayload bool, headers amqp.Table) error
PublishWithConfirmation tries to publish and wait for a confirmation.
func (*RabbitService) QueueLetter ¶
func (rs *RabbitService) QueueLetter(letter *Letter) error
QueueLetter wraps around AutoPublisher to simply QueueLetter. Error indicates message was not queued.
func (*RabbitService) Shutdown ¶
func (rs *RabbitService) Shutdown(stopConsumers bool)
Shutdown stops the service and shuts down the ChannelPool.
type ReceivedMessage ¶
type ReceivedMessage struct { IsAckable bool Body []byte MessageID string // LetterID ApplicationID string PublishDate string Delivery amqp.Delivery // Access everything. }
ReceivedMessage allows you to acknowledge, after processing the received payload, by its RabbitMQ tag and Channel pointer.
func NewReceivedMessage ¶
func NewReceivedMessage( isAckable bool, delivery amqp.Delivery) *ReceivedMessage
NewReceivedMessage creates a new ReceivedMessage.
func (*ReceivedMessage) Acknowledge ¶
func (msg *ReceivedMessage) Acknowledge() error
Acknowledge allows you to acknowledge the message on the original channel it was received on. Will fail if the channel is closed; this is by design per RabbitMQ server. Can't ack from a different channel.
func (*ReceivedMessage) Nack ¶
func (msg *ReceivedMessage) Nack(requeue bool) error
Nack allows you to negatively acknowledge the message on the original channel it was received on. Will fail if the channel is closed; this is by design per RabbitMQ server.
func (*ReceivedMessage) Reject ¶
func (msg *ReceivedMessage) Reject(requeue bool) error
Reject allows you to reject the message on the original channel it was received on. Will fail if the channel is closed; this is by design per RabbitMQ server.
type ReturnMessage ¶
type ReturnMessage struct { ReplyCode uint16 // reason ReplyText string // description Exchange string // basic.publish exchange RoutingKey string // basic.publish routing key // Properties ContentType string // MIME content type ContentEncoding string // MIME content encoding Headers map[string]interface{} // Application or header exchange table DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) Priority uint8 // queue implementation use - 0 to 9 CorrelationID string // application use - correlation identifier ReplyTo string // application use - address to to reply to (ex: RPC) Expiration string // implementation use - message expiration spec MessageID string // application use - message identifier Timestamp time.Time // application use - message timestamp Type string // application use - message type name UserID string // application use - creating user id AppID string // application use - creating application Body []byte }
ReturnMessage allows you to replay a message that was returned.
func NewReturnMessage ¶
func NewReturnMessage(amqpReturn *amqp.Return) *ReturnMessage
NewReturnMessage creates a new ReturnMessage.
type TLSConfig ¶
type TLSConfig struct { EnableTLS bool `json:"EnableTLS" yaml:"EnableTLS"` // Use TLSConfig to create connections with AMQPS uri. PEMCertLocation string `json:"PEMCertLocation" yaml:"PEMCertLocation"` LocalCertLocation string `json:"LocalCertLocation" yaml:"LocalCertLocation"` CertServerName string `json:"CertServerName" yaml:"CertServerName"` }
TLSConfig represents settings for configuring TLS.
type Topologer ¶
type Topologer struct {
ConnectionPool *ConnectionPool
}
Topologer allows you to build RabbitMQ topology backed by a ConnectionPool.
func NewTopologer ¶
func NewTopologer(cp *ConnectionPool) *Topologer
NewTopologer builds you a new Topologer.
func (*Topologer) BindExchanges ¶
func (top *Topologer) BindExchanges(bindings []*ExchangeBinding, ignoreErrors bool) error
BindExchanges loops through and binds Exchanges to Exchanges - stops on first error.
func (*Topologer) BindQueues ¶
func (top *Topologer) BindQueues(bindings []*QueueBinding, ignoreErrors bool) error
BindQueues loops through and binds Queues to Exchanges - stops on first error.
func (*Topologer) BuildExchanges ¶
BuildExchanges loops through and builds Exchanges - stops on first error.
func (*Topologer) BuildQueues ¶
BuildQueues loops through and builds Queues - stops on first error.
func (*Topologer) BuildTopology ¶
func (top *Topologer) BuildTopology(config *TopologyConfig, ignoreErrors bool) error
BuildTopology builds a topology based on a TopologyConfig - stops on first error.
func (*Topologer) CreateExchange ¶
func (top *Topologer) CreateExchange( exchangeName string, exchangeType string, passiveDeclare, durable, autoDelete, internal, noWait bool, args map[string]interface{}) error
CreateExchange builds an Exchange topology.
func (*Topologer) CreateExchangeFromConfig ¶
CreateExchangeFromConfig builds an Exchange topology from a config Exchange element.
func (*Topologer) CreateQueue ¶
func (top *Topologer) CreateQueue( queueName string, passiveDeclare bool, durable bool, autoDelete bool, exclusive bool, noWait bool, args map[string]interface{}) error
CreateQueue builds a Queue topology.
func (*Topologer) CreateQueueFromConfig ¶
CreateQueueFromConfig builds a Queue topology from a config Queue element.
func (*Topologer) ExchangeBind ¶
func (top *Topologer) ExchangeBind(exchangeBinding *ExchangeBinding) error
ExchangeBind binds an Exchange to an Exchange.
func (*Topologer) ExchangeDelete ¶
ExchangeDelete removes the exchange from the server.
func (*Topologer) ExchangeUnbind ¶
func (top *Topologer) ExchangeUnbind(exchangeName, routingKey, parentExchangeName string, noWait bool, args map[string]interface{}) error
ExchangeUnbind removes the binding of an Exchange to an Exchange.
func (*Topologer) PurgeQueue ¶
PurgeQueue removes all messages from the Queue that are not waiting to be Acknowledged and returns the count.
func (*Topologer) PurgeQueues ¶
PurgeQueues purges each Queue provided.
func (*Topologer) QueueBind ¶
func (top *Topologer) QueueBind(queueBinding *QueueBinding) error
QueueBind binds an Exchange to a Queue.
func (*Topologer) QueueDelete ¶
QueueDelete removes the queue from the server (and all bindings) and returns messages purged (count).
type TopologyConfig ¶
type TopologyConfig struct { Exchanges []*Exchange `json:"Exchanges" yaml:"Exchanges"` Queues []*Queue `json:"Queues" yaml:"Queues"` QueueBindings []*QueueBinding `json:"QueueBindings" yaml:"QueueBindings"` ExchangeBindings []*ExchangeBinding `json:"ExchangeBindings" yaml:"ExchangeBindings"` }
TopologyConfig allows you to build simple topologies from a JSON file.
func ConvertJSONFileToTopologyConfig ¶
func ConvertJSONFileToTopologyConfig(fileNamePath string) (*TopologyConfig, error)
ConvertJSONFileToTopologyConfig opens a file.json and converts it to a TopologyConfig.
type WrappedBody ¶
type WrappedBody struct { LetterID uuid.UUID `json:"LetterID"` Body *ModdedBody `json:"Body"` LetterMetadata string `json:"LetterMetadata"` }
WrappedBody is to go inside a Letter struct with indications of the body of data being modified (ex., compressed).
func ReadWrappedBodyFromJSONBytes ¶
func ReadWrappedBodyFromJSONBytes(data []byte) (*WrappedBody, error)
ReadWrappedBodyFromJSONBytes simply reads the bytes as a WrappedBody.