Versions in this module Expand all Collapse all v2 v2.1.0 Oct 14, 2020 Changes in this version + const AesSymmetricType + const GzipCompressionType + const QueueTypeClassic + const QueueTypeQuorum + const ZstdCompressionType + func CompareArgon2Hash(passphrase, salt string, multiplier uint32, hashedPassword []byte) (bool, error) + func CompressWithGzip(data []byte, buffer *bytes.Buffer) error + func CompressWithZstd(data []byte, buffer *bytes.Buffer) error + func CreatePayload(input interface{}, compression *CompressionConfig, ...) ([]byte, error) + func CreateTLSConfig(pemLocation string, localLocation string) (*tls.Config, error) + func CreateWrappedPayload(input interface{}, letterID uuid.UUID, metadata string, ...) ([]byte, error) + func DecompressWithGzip(buffer *bytes.Buffer) error + func DecompressWithZstd(buffer *bytes.Buffer) error + func DecryptWithAes(cipherDataWithNonce, hashedKey []byte, nonceSize int) ([]byte, error) + func EncryptWithAes(data, hashedKey []byte, nonceSize int) ([]byte, error) + func GetHashWithArgon(passphrase, salt string, timeConsideration uint32, multiplier uint32, ...) []byte + func GetStringHashWithArgon(passphrase, salt string, timeConsideration uint32, threads uint8, ...) string + func RandomBytes(size int) []byte + func RandomString(size int) string + func RandomStringFromSource(size int, src rand.Source) string + func ReadJSONFileToInterface(fileNamePath string) (interface{}, error) + func ReadPayload(buffer *bytes.Buffer, compression *CompressionConfig, ...) error + func RepeatedBytes(size int, repeat int) []byte + func RepeatedRandomString(size int, repeat int) string + type ChannelHost struct + Ackable bool + CachedChannel bool + Channel *amqp.Channel + Confirmations chan amqp.Confirmation + ConnectionID uint64 + Errors chan *amqp.Error + ID uint64 + func NewChannelHost(connHost *ConnectionHost, id uint64, connectionID uint64, ackable, cached bool) (*ChannelHost, error) + func (ch *ChannelHost) Close() + func (ch *ChannelHost) FlushConfirms() + func (ch *ChannelHost) MakeChannel() (err error) + func (ch *ChannelHost) PauseForFlowControl() + type CompressionConfig struct + Enabled bool + Type string + type ConnectionHost struct + Blockers chan amqp.Blocking + CachedChannelCount uint64 + Connection *amqp.Connection + ConnectionID uint64 + Errors chan *amqp.Error + func NewConnectionHost(uri string, connectionName string, connectionID uint64, ...) (*ConnectionHost, error) + func (ch *ConnectionHost) Connect() bool + func (ch *ConnectionHost) PauseOnFlowControl() + type ConnectionPool struct + Config PoolConfig + func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error) + func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(error)) (*ConnectionPool, error) + func (cp *ConnectionPool) GetChannelFromPool() *ChannelHost + func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error) + func (cp *ConnectionPool) GetTransientChannel(ackable bool) *amqp.Channel + func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool) + func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost, flag bool) + func (cp *ConnectionPool) Shutdown() + type Consumer struct + Config *ConsumerConfig + ConnectionPool *ConnectionPool + ConsumerName string + Enabled bool + QueueName string + func NewConsumer(rconfig *RabbitSeasoning, cp *ConnectionPool, queuename string, ...) (*Consumer, error) + func NewConsumerFromConfig(config *ConsumerConfig, cp *ConnectionPool) *Consumer + func (con *Consumer) Errors() <-chan error + func (con *Consumer) FlushErrors() + func (con *Consumer) FlushMessages() + func (con *Consumer) FlushStop() + func (con *Consumer) Get(queueName string) (*amqp.Delivery, error) + func (con *Consumer) GetBatch(queueName string, batchSize int) ([]*amqp.Delivery, error) + func (con *Consumer) ReceivedMessages() <-chan *ReceivedMessage + func (con *Consumer) StartConsuming() + func (con *Consumer) StartConsumingWithAction(action func(*ReceivedMessage)) + func (con *Consumer) Started() bool + func (con *Consumer) StopConsuming(immediate bool, flushMessages bool) error + type ConsumerConfig struct + Args map[string]interface{} + AutoAck bool + ConsumerName string + Enabled bool + Exclusive bool + NoWait bool + QosCountOverride int + QueueName string + SleepOnErrorInterval uint32 + SleepOnIdleInterval uint32 + type EncryptionConfig struct + Enabled bool + Hashkey []byte + MemoryMultiplier uint32 + Threads uint8 + TimeConsideration uint32 + Type string + type Envelope struct + ContentType string + DeliveryMode uint8 + Exchange string + Headers amqp.Table + Immediate bool + Mandatory bool + RoutingKey string + type ErrorMessage struct + Code int + Reason string + Recover bool + Server bool + func NewErrorMessage(amqpError *amqp.Error) *ErrorMessage + func (em *ErrorMessage) Error() string + type Exchange struct + Args amqp.Table + AutoDelete bool + Durable bool + InternalOnly bool + Name string + NoWait bool + PassiveDeclare bool + Type string + type ExchangeBinding struct + Args amqp.Table + ExchangeName string + NoWait bool + ParentExchangeName string + RoutingKey string + type Letter struct + Body []byte + Envelope *Envelope + LetterID uuid.UUID + RetryCount uint32 + func CreateLetter(letterID uuid.UUID, exchangeName string, queueName string, body []byte) *Letter + func CreateMockLetter(letterID uuid.UUID, exchangeName string, queueName string, body []byte) *Letter + func CreateMockRandomLetter(queueName string) *Letter + func CreateMockRandomWrappedBodyLetter(queueName string) *Letter + type ModdedBody struct + CType string + Compressed bool + Data []byte + EType string + Encrypted bool + UTCDateTime string + type PoolConfig struct + ConnectionName string + ConnectionTimeout uint32 + Heartbeat uint32 + MaxCacheChannelCount uint64 + MaxConnectionCount uint64 + SleepOnErrorInterval uint32 + TLSConfig *TLSConfig + URI string + type PublishConfirmation struct + Acked bool + DeliveryTag uint64 + func NewPublishConfirmation(confirmation *amqp.Confirmation) *PublishConfirmation + type PublishReceipt struct + Error error + FailedLetter *Letter + LetterID uuid.UUID + Success bool + func (not *PublishReceipt) ToString() string + type Publisher struct + Config *RabbitSeasoning + ConnectionPool *ConnectionPool + func NewPublisher(cp *ConnectionPool, sleepOnIdleInterval time.Duration, ...) *Publisher + func NewPublisherFromConfig(config *RabbitSeasoning, cp *ConnectionPool) *Publisher + func (pub *Publisher) Publish(letter *Letter, skipReceipt bool) + func (pub *Publisher) PublishReceipts() <-chan *PublishReceipt + func (pub *Publisher) PublishWithConfirmation(letter *Letter, timeout time.Duration) + func (pub *Publisher) PublishWithConfirmationContext(ctx context.Context, letter *Letter) + func (pub *Publisher) PublishWithConfirmationTransient(letter *Letter, timeout time.Duration) + func (pub *Publisher) PublishWithTransient(letter *Letter) error + func (pub *Publisher) QueueLetter(letter *Letter) bool + func (pub *Publisher) QueueLetters(letters []*Letter) bool + func (pub *Publisher) Shutdown(shutdownPools bool) + func (pub *Publisher) StartAutoPublishing() + type PublisherConfig struct + AutoAck bool + PublishTimeOutInterval uint32 + SleepOnErrorInterval uint32 + SleepOnIdleInterval uint32 + type Queue struct + Args amqp.Table + AutoDelete bool + Durable bool + Exclusive bool + Name string + NoWait bool + PassiveDeclare bool + Type string + type QueueBinding struct + Args amqp.Table + ExchangeName string + NoWait bool + QueueName string + RoutingKey string + type RabbitSeasoning struct + CompressionConfig *CompressionConfig + ConsumerConfigs map[string]*ConsumerConfig + EncryptionConfig *EncryptionConfig + PoolConfig *PoolConfig + PublisherConfig *PublisherConfig + func ConvertJSONFileToConfig(fileNamePath string) (*RabbitSeasoning, error) + type RabbitService struct + Config *RabbitSeasoning + ConnectionPool *ConnectionPool + Publisher *Publisher + Topologer *Topologer + func NewRabbitService(config *RabbitSeasoning, passphrase string, salt string, ...) (*RabbitService, error) + func (rs *RabbitService) CentralErr() <-chan error + func (rs *RabbitService) GetConsumer(consumerName string) (*Consumer, error) + func (rs *RabbitService) GetConsumerConfig(consumerName string) (*ConsumerConfig, error) + func (rs *RabbitService) Publish(input interface{}, exchangeName, routingKey, metadata string, wrapPayload bool, ...) error + func (rs *RabbitService) PublishData(data []byte, exchangeName, routingKey string, headers amqp.Table) error + func (rs *RabbitService) PublishLetter(letter *Letter) error + func (rs *RabbitService) PublishWithConfirmation(input interface{}, exchangeName, routingKey, metadata string, wrapPayload bool, ...) error + func (rs *RabbitService) QueueLetter(letter *Letter) error + func (rs *RabbitService) Shutdown(stopConsumers bool) + type ReceivedMessage struct + Body []byte + Headers amqp.Table + IsAckable bool + func NewMessage(isAckable bool, body []byte, headers amqp.Table, deliveryTag uint64, ...) *ReceivedMessage + func (msg *ReceivedMessage) Acknowledge() error + func (msg *ReceivedMessage) Nack(requeue bool) error + func (msg *ReceivedMessage) Reject(requeue bool) error + type ReturnMessage struct + AppID string + Body []byte + ContentEncoding string + ContentType string + CorrelationID string + DeliveryMode uint8 + Exchange string + Expiration string + Headers map[string]interface{} + MessageID string + Priority uint8 + ReplyCode uint16 + ReplyText string + ReplyTo string + RoutingKey string + Timestamp time.Time + Type string + UserID string + func NewReturnMessage(amqpReturn *amqp.Return) *ReturnMessage + type TLSConfig struct + CertServerName string + EnableTLS bool + LocalCertLocation string + PEMCertLocation string + type Topologer struct + ConnectionPool *ConnectionPool + func NewTopologer(cp *ConnectionPool) *Topologer + func (top *Topologer) BindExchanges(bindings []*ExchangeBinding, ignoreErrors bool) error + func (top *Topologer) BindQueues(bindings []*QueueBinding, ignoreErrors bool) error + func (top *Topologer) BuildExchanges(exchanges []*Exchange, ignoreErrors bool) error + func (top *Topologer) BuildQueues(queues []*Queue, ignoreErrors bool) error + func (top *Topologer) BuildToplogy(config *TopologyConfig, ignoreErrors bool) error + func (top *Topologer) CreateExchange(exchangeName string, exchangeType string, ...) error + func (top *Topologer) CreateExchangeFromConfig(exchange *Exchange) error + func (top *Topologer) CreateQueue(queueName string, passiveDeclare bool, durable bool, autoDelete bool, ...) error + func (top *Topologer) CreateQueueFromConfig(queue *Queue) error + func (top *Topologer) ExchangeBind(exchangeBinding *ExchangeBinding) error + func (top *Topologer) ExchangeDelete(exchangeName string, ifUnused, noWait bool) error + func (top *Topologer) ExchangeUnbind(exchangeName, routingKey, parentExchangeName string, noWait bool, ...) error + func (top *Topologer) PurgeQueue(queueName string, noWait bool) (int, error) + func (top *Topologer) PurgeQueues(queueNames []string, noWait bool) (int, error) + func (top *Topologer) QueueBind(queueBinding *QueueBinding) error + func (top *Topologer) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) + func (top *Topologer) UnbindQueue(queueName, routingKey, exchangeName string, args map[string]interface{}) error + type TopologyConfig struct + ExchangeBindings []*ExchangeBinding + Exchanges []*Exchange + QueueBindings []*QueueBinding + Queues []*Queue + func ConvertJSONFileToTopologyConfig(fileNamePath string) (*TopologyConfig, error) + type WrappedBody struct + Body *ModdedBody + LetterID uuid.UUID + LetterMetadata string + func ReadWrappedBodyFromJSONBytes(data []byte) (*WrappedBody, error)