Documentation ¶
Index ¶
- Constants
- Variables
- type Channel
- func (c *Channel) CloseChan() <-chan error
- func (c *Channel) ConsumerBuilder(name, queue string) *ConsumerBuilder
- func (c *Channel) DirectExchangeBuilder(name string) *ExchangeBuilder
- func (c *Channel) ExchangeBuilder(name string) *ExchangeBuilder
- func (c *Channel) FanoutExchangeBuilder(name string) *ExchangeBuilder
- func (c *Channel) PublisherBuilder(exchange string, routingKey string) *PublisherBuilder
- func (c *Channel) QueueBuilder() *QueueBuilder
- func (c *Channel) TopicExchangeBuilder(name string) *ExchangeBuilder
- type ChannelImpl
- type Consumer
- type ConsumerBuilder
- func (c *ConsumerBuilder) AddArg(key string, val interface{}) *ConsumerBuilder
- func (c *ConsumerBuilder) Build() (*Consumer, error)
- func (c *ConsumerBuilder) SetAutoAck() *ConsumerBuilder
- func (c *ConsumerBuilder) SetExclusive() *ConsumerBuilder
- func (c *ConsumerBuilder) SetNoLocal() *ConsumerBuilder
- func (c *ConsumerBuilder) SetNoWait() *ConsumerBuilder
- func (c *ConsumerBuilder) SetPrefetch(prefetch int) *ConsumerBuilder
- type ConsumerBuilderImpl
- type ConsumerImpl
- type Delivery
- type ExchangeBuilder
- func (e *ExchangeBuilder) AddArg(key string, val interface{}) *ExchangeBuilder
- func (e *ExchangeBuilder) Declare() error
- func (e *ExchangeBuilder) DeleteOnDeclare(ifUnused, noWait bool) *ExchangeBuilder
- func (e *ExchangeBuilder) SetAutoDelete() *ExchangeBuilder
- func (e *ExchangeBuilder) SetDurable() *ExchangeBuilder
- func (e *ExchangeBuilder) SetInternal() *ExchangeBuilder
- func (e *ExchangeBuilder) SetNoWait() *ExchangeBuilder
- type ExchangeBuilderImpl
- type PublishFields
- func (p *PublishFields) AddHeader(key string, val interface{}) *PublishFields
- func (p *PublishFields) DeliveryModePersistent() *PublishFields
- func (p *PublishFields) DeliveryModeTransient() *PublishFields
- func (p *PublishFields) SetContentType(contentType string) *PublishFields
- func (p *PublishFields) SetCorrelationID(id string) *PublishFields
- func (p *PublishFields) SetDataTypeBytes() *PublishFields
- func (p *PublishFields) SetDataTypeJSON() *PublishFields
- func (p *PublishFields) SetExpiration(dur time.Duration) *PublishFields
- func (p *PublishFields) SetImmediate() *PublishFields
- func (p *PublishFields) SetMandatory() *PublishFields
- func (p *PublishFields) SetReplyToID(id string) *PublishFields
- type PublishFieldsImpl
- type Publisher
- func (p *Publisher) Fields() *PublishFields
- func (p *Publisher) Publish(ctx context.Context, data interface{}) error
- func (p *Publisher) PublishAwaitResponse(ctx context.Context, data interface{}, ...) (amqp091.Delivery, error)
- func (p *Publisher) PublishWithConfirmation(ctx context.Context, data interface{}) (bool, error)
- func (p *Publisher) WithFields(fields *PublishFields) *Publisher
- type PublisherBuilder
- type PublisherBuilderImpl
- type PublisherImpl
- type Queue
- type QueueBuilder
- func (q *QueueBuilder) AddArg(key string, val interface{}) *QueueBuilder
- func (q *QueueBuilder) Declare() (*Queue, error)
- func (q *QueueBuilder) SetAutoDelete() *QueueBuilder
- func (q *QueueBuilder) SetDurable() *QueueBuilder
- func (q *QueueBuilder) SetExclusive() *QueueBuilder
- func (q *QueueBuilder) SetName(name string) *QueueBuilder
- func (q *QueueBuilder) SetNoWait() *QueueBuilder
- type QueueBuilderImpl
- type QueueImpl
- type RMQ
- type RmqImpl
- type RmqOptions
Constants ¶
View Source
const (
	DataTypeBytes dataType = iota
	DataTypeJSON
)
Variables ¶
View Source
var (
	ConnectionNotSetError                = errors.New("connection_is_not_set")
	DataIsNotBytesError                  = errors.New("data_is_not_of_bytes_type")
	ResponseMapNotSetError               = errors.New("response_map_not_set")
	CorrelationIdNotSetError             = errors.New("correlation_id_not_set")
	PublishResponseInvalidReplyToIdError = errors.New("publish_response_invalid_reply_to_id")
)
View Source
var ConnectionClosedError = fmt.Errorf("connection_closed")
Functions ¶
This section is empty.
Types ¶
type Channel ¶ added in v0.0.10
type Channel struct {
// contains filtered or unexported fields
}
func (*Channel) ConsumerBuilder ¶ added in v0.0.10
func (c *Channel) ConsumerBuilder(name, queue string) *ConsumerBuilder
func (*Channel) DirectExchangeBuilder ¶ added in v0.0.11
func (c *Channel) DirectExchangeBuilder(name string) *ExchangeBuilder
func (*Channel) ExchangeBuilder ¶ added in v0.0.11
func (c *Channel) ExchangeBuilder(name string) *ExchangeBuilder
func (*Channel) FanoutExchangeBuilder ¶ added in v0.0.11
func (c *Channel) FanoutExchangeBuilder(name string) *ExchangeBuilder
func (*Channel) PublisherBuilder ¶ added in v0.0.10
func (c *Channel) PublisherBuilder(exchange string, routingKey string) *PublisherBuilder
func (*Channel) QueueBuilder ¶ added in v0.0.10
func (c *Channel) QueueBuilder() *QueueBuilder
func (*Channel) TopicExchangeBuilder ¶ added in v0.0.11
func (c *Channel) TopicExchangeBuilder(name string) *ExchangeBuilder
type ChannelImpl ¶ added in v0.0.16
type ChannelImpl interface {
	PublisherBuilder(exchange string, routingKey string) PublisherBuilderImpl
	ConsumerBuilder(name, queue string) ConsumerBuilderImpl
	QueueBuilder() QueueBuilderImpl
	ExchangeBuilder(name string) ExchangeBuilderImpl
	FanoutExchangeBuilder(name string) ExchangeBuilderImpl
	DirectExchangeBuilder(name string) ExchangeBuilderImpl
	TopicExchangeBuilder(name string) ExchangeBuilderImpl
	CloseChan() <-chan error
}
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
type ConsumerBuilder ¶ added in v0.0.10
type ConsumerBuilder struct {
// contains filtered or unexported fields
}
func (*ConsumerBuilder) AddArg ¶ added in v0.0.10
func (c *ConsumerBuilder) AddArg(key string, val interface{}) *ConsumerBuilder
func (*ConsumerBuilder) Build ¶ added in v0.0.10
func (c *ConsumerBuilder) Build() (*Consumer, error)
func (*ConsumerBuilder) SetAutoAck ¶ added in v0.0.10
func (c *ConsumerBuilder) SetAutoAck() *ConsumerBuilder
func (*ConsumerBuilder) SetExclusive ¶ added in v0.0.10
func (c *ConsumerBuilder) SetExclusive() *ConsumerBuilder
func (*ConsumerBuilder) SetNoLocal ¶ added in v0.0.10
func (c *ConsumerBuilder) SetNoLocal() *ConsumerBuilder
func (*ConsumerBuilder) SetNoWait ¶ added in v0.0.10
func (c *ConsumerBuilder) SetNoWait() *ConsumerBuilder
func (*ConsumerBuilder) SetPrefetch ¶ added in v0.0.12
func (c *ConsumerBuilder) SetPrefetch(prefetch int) *ConsumerBuilder
type ConsumerBuilderImpl ¶ added in v0.0.16
type ConsumerBuilderImpl interface {
	SetAutoAck() ConsumerBuilderImpl
	SetExclusive() ConsumerBuilderImpl
	SetNoLocal() ConsumerBuilderImpl
	SetNoWait() ConsumerBuilderImpl
	SetPrefetch(prefetch int) ConsumerBuilderImpl
	AddArg(key string, val interface{}) ConsumerBuilderImpl
	Build() (ConsumerImpl, error)
}
type ConsumerImpl ¶ added in v0.0.16
type Delivery ¶
type Delivery struct {
	*amqp091.Delivery
	// contains filtered or unexported fields
}
type ExchangeBuilder ¶ added in v0.0.10
type ExchangeBuilder struct {
// contains filtered or unexported fields
}
func (*ExchangeBuilder) AddArg ¶ added in v0.0.10
func (e *ExchangeBuilder) AddArg(key string, val interface{}) *ExchangeBuilder
func (*ExchangeBuilder) Declare ¶ added in v0.0.10
func (e *ExchangeBuilder) Declare() error
func (*ExchangeBuilder) DeleteOnDeclare ¶ added in v0.0.10
func (e *ExchangeBuilder) DeleteOnDeclare(ifUnused, noWait bool) *ExchangeBuilder
func (*ExchangeBuilder) SetAutoDelete ¶ added in v0.0.10
func (e *ExchangeBuilder) SetAutoDelete() *ExchangeBuilder
func (*ExchangeBuilder) SetDurable ¶ added in v0.0.10
func (e *ExchangeBuilder) SetDurable() *ExchangeBuilder
func (*ExchangeBuilder) SetInternal ¶ added in v0.0.10
func (e *ExchangeBuilder) SetInternal() *ExchangeBuilder
func (*ExchangeBuilder) SetNoWait ¶ added in v0.0.10
func (e *ExchangeBuilder) SetNoWait() *ExchangeBuilder
type ExchangeBuilderImpl ¶ added in v0.0.16
type ExchangeBuilderImpl interface {
	DeleteOnDeclare(ifUnused, noWait bool) ExchangeBuilderImpl
	SetDurable() ExchangeBuilderImpl
	SetAutoDelete() ExchangeBuilderImpl
	SetInternal() ExchangeBuilderImpl
	SetNoWait() ExchangeBuilderImpl
	AddArg(key string, val interface{}) ExchangeBuilderImpl
	Declare() error
}
type PublishFields ¶ added in v0.0.3
type PublishFields struct {
// contains filtered or unexported fields
}
func NewPublishFields ¶ added in v0.0.3
func NewPublishFields() *PublishFields
func (*PublishFields) AddHeader ¶ added in v0.0.3
func (p *PublishFields) AddHeader(key string, val interface{}) *PublishFields
func (*PublishFields) DeliveryModePersistent ¶ added in v0.0.3
func (p *PublishFields) DeliveryModePersistent() *PublishFields
func (*PublishFields) DeliveryModeTransient ¶ added in v0.0.3
func (p *PublishFields) DeliveryModeTransient() *PublishFields
func (*PublishFields) SetContentType ¶ added in v0.0.3
func (p *PublishFields) SetContentType(contentType string) *PublishFields
func (*PublishFields) SetCorrelationID ¶ added in v0.0.3
func (p *PublishFields) SetCorrelationID(id string) *PublishFields
func (*PublishFields) SetDataTypeBytes ¶ added in v0.0.3
func (p *PublishFields) SetDataTypeBytes() *PublishFields
func (*PublishFields) SetDataTypeJSON ¶ added in v0.0.3
func (p *PublishFields) SetDataTypeJSON() *PublishFields
func (*PublishFields) SetExpiration ¶ added in v0.0.3
func (p *PublishFields) SetExpiration(dur time.Duration) *PublishFields
func (*PublishFields) SetImmediate ¶ added in v0.0.3
func (p *PublishFields) SetImmediate() *PublishFields
func (*PublishFields) SetMandatory ¶ added in v0.0.3
func (p *PublishFields) SetMandatory() *PublishFields
func (*PublishFields) SetReplyToID ¶ added in v0.0.3
func (p *PublishFields) SetReplyToID(id string) *PublishFields
type PublishFieldsImpl ¶ added in v0.0.16
type PublishFieldsImpl interface {
	SetDataTypeBytes() PublisherBuilderImpl
	SetDataTypeJSON() PublisherBuilderImpl
	SetContentType(contentType string) PublisherBuilderImpl
	DeliveryModePersistent() PublisherBuilderImpl
	DeliveryModeTransient() PublisherBuilderImpl
	AddHeader(key string, val interface{}) PublisherBuilderImpl
	SetCorrelationID(id string) PublisherBuilderImpl
	SetReplyToID(id string) PublisherBuilderImpl
	SetExpiration(dur time.Duration) PublisherBuilderImpl
	SetMandatory() PublisherBuilderImpl
	SetImmediate() PublisherBuilderImpl
}
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func (*Publisher) Fields ¶ added in v0.0.10
func (p *Publisher) Fields() *PublishFields
func (*Publisher) PublishAwaitResponse ¶
func (p *Publisher) PublishAwaitResponse(
	ctx context.Context,
	data interface{},
	responseMap *genericSync.Map[chan amqp091.Delivery],
) (amqp091.Delivery, error)
PublishAwaitResponse creates a channel and stores it in responseMap.
If an outsider writes the response to that map, it checks the reply-to ID to see whether it matches the correlation ID. It fails on an invalid correlation ID or on a timeout.
func (*Publisher) PublishWithConfirmation ¶
func (*Publisher) WithFields ¶ added in v0.0.3
func (p *Publisher) WithFields(fields *PublishFields) *Publisher
type PublisherBuilder ¶ added in v0.0.10
type PublisherBuilder struct {
// contains filtered or unexported fields
}
func (*PublisherBuilder) New ¶ added in v0.0.10
func (p *PublisherBuilder) New() *Publisher
func (*PublisherBuilder) NewWithDefaultFields ¶ added in v0.0.10
func (p *PublisherBuilder) NewWithDefaultFields() *Publisher
func (*PublisherBuilder) WithFields ¶ added in v0.0.10
func (p *PublisherBuilder) WithFields(fields *PublishFields) *PublisherBuilder
type PublisherBuilderImpl ¶ added in v0.0.16
type PublisherBuilderImpl interface {
	WithFields(fields *PublishFields) PublisherBuilderImpl
	New() PublisherImpl
	NewWithDefaultFields() PublisherImpl
}
type PublisherImpl ¶ added in v0.0.16
type PublisherImpl interface {
	WithFields(fields PublishFieldsImpl) PublisherImpl
	Fields() PublishFieldsImpl
	Publish(ctx context.Context, data interface{}) error
	PublishAwaitResponse(
		ctx context.Context,
		data interface{},
		responseMap *genericSync.Map[chan amqp091.Delivery],
	) (amqp091.Delivery, error)
	PublishWithConfirmation(ctx context.Context, data interface{}) (bool, error)
}
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func (*Queue) BindToExchange ¶
type QueueBuilder ¶
type QueueBuilder struct {
// contains filtered or unexported fields
}
func (*QueueBuilder) AddArg ¶
func (q *QueueBuilder) AddArg(key string, val interface{}) *QueueBuilder
func (*QueueBuilder) Declare ¶
func (q *QueueBuilder) Declare() (*Queue, error)
func (*QueueBuilder) SetAutoDelete ¶
func (q *QueueBuilder) SetAutoDelete() *QueueBuilder
func (*QueueBuilder) SetDurable ¶
func (q *QueueBuilder) SetDurable() *QueueBuilder
func (*QueueBuilder) SetExclusive ¶
func (q *QueueBuilder) SetExclusive() *QueueBuilder
func (*QueueBuilder) SetName ¶
func (q *QueueBuilder) SetName(name string) *QueueBuilder
func (*QueueBuilder) SetNoWait ¶
func (q *QueueBuilder) SetNoWait() *QueueBuilder
type QueueBuilderImpl ¶ added in v0.0.16
type QueueBuilderImpl interface {
	SetName(name string) QueueBuilderImpl
	SetDurable() QueueBuilderImpl
	SetAutoDelete() QueueBuilderImpl
	SetExclusive() QueueBuilderImpl
	SetNoWait() QueueBuilderImpl
	AddArg(key string, val interface{}) QueueBuilderImpl
	Declare() (QueueImpl, error)
}
type RMQ ¶ added in v0.0.10
type RMQ struct {
// contains filtered or unexported fields
}
func (*RMQ) Connect ¶ added in v0.0.10
func (r *RMQ) Connect(retryCount int, retryDelay time.Duration, onRetryError func(err error)) (<-chan error, error)
Connect
Important: onRetryError blocks the reconnection while it runs, so make sure to use a goroutine inside the callback.
func (*RMQ) NewChannel ¶ added in v0.0.10
func (*RMQ) NewChannelWithConfirm ¶ added in v0.0.10
type RmqOptions ¶ added in v0.0.10
type RmqOptions struct {
// contains filtered or unexported fields
}
func NewRmqOptions ¶ added in v0.0.10
func NewRmqOptions() *RmqOptions
func (*RmqOptions) SetReconnectDelay ¶ added in v0.0.10
func (r *RmqOptions) SetReconnectDelay(delay int) *RmqOptions
func (*RmqOptions) SetReconnectTries ¶ added in v0.0.10
func (r *RmqOptions) SetReconnectTries(tries int) *RmqOptions
Source Files ¶
Click to show internal directories.
Click to hide internal directories.