bunnify

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EventAck added in v0.0.2

func EventAck(queue string, routingKey string, milliseconds int64)

func EventNack added in v0.0.2

func EventNack(queue string, routingKey string, milliseconds int64)

func EventNotParsable added in v0.0.2

func EventNotParsable(queue string, routingKey string)

func EventPublishFailed added in v0.0.2

func EventPublishFailed(exchange string, routingKey string)

func EventPublishSucceed added in v0.0.2

func EventPublishSucceed(exchange string, routingKey string)

func EventReceived added in v0.0.2

func EventReceived(queue string, routingKey string)

func EventWithoutHandler added in v0.0.2

func EventWithoutHandler(queue string, routingKey string)

func InitMetrics added in v0.0.2

func InitMetrics(registerer prometheus.Registerer) error

func WithBindingToExchange

func WithBindingToExchange(exchange string) func(*consumerOption)

WithBindingToExchange specifies the exchange on which the queue will bind for the handlers provided.

func WithCorrelationID

func WithCorrelationID(correlationID string) func(*eventOptions)

WithCorrelationID specifies the correlationID to be published if it is not used a random uuid will be generated.

func WithDeadLetterQueue

func WithDeadLetterQueue(queueName string) func(*consumerOption)

WithDeadLetterQueue indicates which queue will receive the events that were NACKed for this consumer.

func WithDefaultHandler

func WithDefaultHandler(handler EventHandler[json.RawMessage]) func(*consumerOption)

WithDefaultHandler specifies a handler that can be use for any type of routing key without a defined handler. This is mostly convenient if you don't care about the specific payload of the event, which will be received as a byte array.

func WithEventID

func WithEventID(eventID string) func(*eventOptions)

WithEventID specifies the eventID to be published if it is not used a random uuid will be generated.

func WithHandler

func WithHandler[T any](routingKey string, handler EventHandler[T]) func(*consumerOption)

WithHandler specifies under which routing key the provided handler will be invoked. The routing key indicated here will be bound to the queue if the WithBindingToExchange is supplied.

func WithNotificationChannel

func WithNotificationChannel(notificationCh chan<- Notification) func(*connectionOption)

WithNotificationChannel specifies a go channel to receive messages such as connection established, reconnecting, event published, consumed, etc.

func WithQoS

func WithQoS(prefetchCount, prefetchSize int) func(*consumerOption)

WithQoS specifies the prefetch count and size for the consumer.

func WithQuorumQueue

func WithQuorumQueue() func(*consumerOption)

WithQuorumQueue specifies that the queue to consume will be created as quorum queue. Quorum queues are used when data safety is the priority.

func WithReconnectInterval

func WithReconnectInterval(interval time.Duration) func(*connectionOption)

WithReconnectInterval establishes how much time to wait between each attempt of connection.

func WithURI

func WithURI(URI string) func(*connectionOption)

WithURI allows the consumer to specify the AMQP Server. It should be in the format of amqp://0.0.0.0:5672

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents a connection towards the AMQP server. A single connection should be enough for the entire application as the consuming and publishing is handled by channels.

func NewConnection

func NewConnection(opts ...func(*connectionOption)) *Connection

NewConnection creates a new AMQP connection using the indicated options. If the consumer does not supply options, it will by default connect to a localhost instance on and try to reconnect every 10 seconds.

func (*Connection) Close

func (c *Connection) Close() error

Closes connection with towards the AMQP server

func (*Connection) NewConsumer

func (c *Connection) NewConsumer(
	queueName string,
	opts ...func(*consumerOption)) Consumer

NewConsumer creates a consumer for a given queue using the specified connection. Information messages such as channel status will be sent to the notification channel if it was specified on the connection struct. If no QoS is supplied the prefetch count will be of 20.

func (*Connection) NewPublisher

func (c *Connection) NewPublisher() *Publisher

NewPublisher creates a publisher using the specified connection.

func (*Connection) Start

func (c *Connection) Start() error

Start establishes the connection towards the AMQP server. Only returns errors when the uri is not valid (retry won't do a thing)

type ConsumableEvent

type ConsumableEvent[T any] struct {
	Metadata
	DeliveryInfo DeliveryInfo
	Payload      T
}

ConsumableEvent[T] represents an event that can be consumed. The type parameter T specifies the type of the event's payload.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is used for consuming to events from an specified queue.

func (Consumer) Consume

func (c Consumer) Consume() error

Consume will start consuming events for the indicated queue. The first time this function is called it will return error if handlers or default handler are not specified and also if queues, exchanges, bindings or qos creation don't succeed. In case this function gets called recursively due to channel reconnection, the errors will be pushed to the notification channel (if one has been indicated in the connection).

type DeliveryInfo

type DeliveryInfo struct {
	Queue      string
	Exchange   string
	RoutingKey string
}

DeliveryInfo holds information of original queue, exchange and routing keys.

type EventHandler

type EventHandler[T any] func(ctx context.Context, event ConsumableEvent[T]) error

EventHandler is the type definition for a function that is used to handle events of a specific type.

type Metadata

type Metadata struct {
	ID            string    `json:"id"`
	CorrelationID string    `json:"correlationId"`
	Timestamp     time.Time `json:"timestamp"`
}

Metadata holds the metadata of an event.

type Notification

type Notification struct {
	Message string
	Type    NotificationType
	Source  NotificationSource
}

func (Notification) String

func (n Notification) String() string

type NotificationSource

type NotificationSource string
const (
	NotificationSourceConnection NotificationSource = "CONNECTION"
	NotificationSourceConsumer   NotificationSource = "CONSUMER"
	NotificationSourcePublisher  NotificationSource = "PUBLISHER"
)

type NotificationType

type NotificationType string
const (
	NotificationTypeInfo  NotificationType = "INFO"
	NotificationTypeError NotificationType = "ERROR"
)

type PublishableEvent

type PublishableEvent struct {
	Metadata
	Payload any `json:"payload"`
}

PublishableEvent represents an event that can be published. The Payload field holds the event's payload data, which can be of any type that can be marshal to json.

func NewPublishableEvent

func NewPublishableEvent(payload any, opts ...func(*eventOptions)) PublishableEvent

NewPublishableEvent creates an instance of a PublishableEvent. In case the ID and correlation ID are not supplied via options random uuid will be generated.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is used for publishing events.

func (*Publisher) Publish

func (p *Publisher) Publish(
	ctx context.Context,
	exchange, routingKey string,
	event PublishableEvent) error

Publish publishes an event to the specified exchange. If the channel is closed, it will retry until a channel is obtained.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL