Documentation
¶
Index ¶
- Constants
- Variables
- func NewContextWithNumRetries(ctx context.Context, numRetries int64) context.Context
- func NumRetriesFromContext(ctx context.Context) int64
- type AMQPError
- type EventBus
- func (b *EventBus) AddHandler(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler) error
- func (b *EventBus) AddHandlerWithOptions(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler, ...) error
- func (b *EventBus) Close() error
- func (b *EventBus) Errors() <-chan error
- func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error
- func (*EventBus) HandlerType() eh.EventHandlerType
- func (b *EventBus) PublishEvent(ctx context.Context, event eh.Event) error
- func (b *EventBus) PublishEventWithOptions(ctx context.Context, event eh.Event, options ...PublishOption) error
- func (b *EventBus) RegisteredHandlers() []eh.EventHandlerType
- func (b *EventBus) RemoveHandler(handlerType eh.EventHandlerType) error
- type EventBusError
- type HandlerOption
- type MaxRetriesExceededHandler
- type Option
- func WithClariMQConnections(publishingConn *clarimq.Connection, consumeConn *clarimq.Connection) Option
- func WithClariMQPublishingCache(publishingCache clarimq.PublishingCache) Option
- func WithConsumerQuantity(concurrency int) Option
- func WithEventCodec(codec eh.EventCodec) Option
- func WithLogging(loggers ...clarimq.Logger) Option
- func WithMaxRecoveryRetry(maxRetries int64) Option
- func WithRetry(maxRetries int64, delays []time.Duration, handler MaxRetriesExceededHandler) Option
- type PublishOption
- type RecoveryFailedError
Constants ¶
const DefaultNumRetries = 0
DefaultNumRetries is the retry value to use if not set in the context.
const ( // InfiniteRetries is the maximum number for recovery or event delivery retries. InfiniteRetries int64 = math.MaxInt64 )
Variables ¶
var ErrCouldNotBeRouted = errors.New("message could not be routed")
ErrCouldNotBeRouted is returned when a mandatory message could not be routed.
var ErrFailedToPublishChannelClosed = errors.New("amqp channel is closed")
ErrFailedToPublishChannelClosed occurs when the channel accessed but is closed.
var ErrHandlerNotRegistered = errors.New("handler not registered")
ErrErrHandlerNotRegistered is returned when calling RemoveHandler with a handler that is not registered.
Functions ¶
func NewContextWithNumRetries ¶ added in v0.4.1
NewContextWithNumRetries sets the retries value to use in the context. The number of retries is used to determine how often an event has failed to be handled.
func NumRetriesFromContext ¶ added in v0.4.1
NumRetriesFromContext returns the number of retries from the context, or zero.
Types ¶
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus is a local event bus that delegates handling of published events to all matching registered handlers, in order of registration.
func NewEventBus ¶
func NewEventBus(addr, appID, clientID, exchange, topic string, options ...Option) (*EventBus, error)
NewEventBus creates an EventBus, with optional settings.
func (*EventBus) AddHandler ¶
func (b *EventBus) AddHandler(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler) error
AddHandler implements the AddHandler method of the eventhorizon.EventBus interface.
func (*EventBus) AddHandlerWithOptions ¶ added in v1.6.0
func (b *EventBus) AddHandlerWithOptions(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler, options ...HandlerOption) error
AddHandlerWithOptions adds a new eventhorizon.Eventhandler with options.
func (*EventBus) Errors ¶
Errors implements the Errors method of the eventhorizon.EventBus interface.
func (*EventBus) HandleEvent ¶
HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (*EventBus) HandlerType ¶
func (*EventBus) HandlerType() eh.EventHandlerType
HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
func (*EventBus) PublishEvent ¶ added in v0.6.0
PublishEvent publishes an event. Same as HandleEvent, but with better naming.
func (*EventBus) PublishEventWithOptions ¶ added in v1.6.0
func (b *EventBus) PublishEventWithOptions(ctx context.Context, event eh.Event, options ...PublishOption) error
PublishEventWithTopic publishes an event with options.
func (*EventBus) RegisteredHandlers ¶ added in v1.3.0
func (b *EventBus) RegisteredHandlers() []eh.EventHandlerType
RegisteredHandlers returns a slice of all registered handler types.
func (*EventBus) RemoveHandler ¶ added in v1.3.0
func (b *EventBus) RemoveHandler(handlerType eh.EventHandlerType) error
RemoveHandler removes a handler from the event bus by type.
type EventBusError ¶ added in v1.1.0
type EventBusError struct { eh.EventBusError HandlerType eh.EventHandlerType }
EventBusError is an async error containing the error returned from a handler and the event that it happened on. Its a wrapper around the eventhorizon.EventBusError with extra information about the handler.
type HandlerOption ¶ added in v1.6.0
type HandlerOption func(*handler)
HandlerOption is a handler option.
func WithHandlerExchange ¶ added in v1.6.0
func WithHandlerExchange(name string) HandlerOption
WithHandlerExchange is an option to set the handler exchange.
func WithHandlerTopic ¶ added in v1.6.0
func WithHandlerTopic(topic string) HandlerOption
WithHandlerTopic is an option to set the handler topic.
type MaxRetriesExceededHandler ¶ added in v1.5.0
MaxRetriesExceededHandler is a function that is called when the maximum number of retries has been reached.
type Option ¶
type Option func(*EventBus)
Option is an option setter used to configure creation.
func WithClariMQConnections ¶ added in v1.0.0
func WithClariMQConnections(publishingConn *clarimq.Connection, consumeConn *clarimq.Connection) Option
WithClariMQConnections sets the connections used for publishing and consuming events.
func WithClariMQPublishingCache ¶ added in v0.9.0
func WithClariMQPublishingCache(publishingCache clarimq.PublishingCache) Option
WithClariMQPublishingCache enables caching events that failed to be published.
func WithConsumerQuantity ¶ added in v1.3.2
WithConsumerQuantity sets the number of concurrent consumers.
func WithEventCodec ¶ added in v0.9.0
func WithEventCodec(codec eh.EventCodec) Option
WithEventCodec uses the specified codec for encoding events.
func WithLogging ¶ added in v0.9.0
WithLogging enables logging to the given loggers.
func WithMaxRecoveryRetry ¶ added in v1.0.0
WithMaxRecoveryRetry sets the max count for recovery retries.
Default: Infinite.
func WithRetry ¶ added in v0.4.0
func WithRetry(maxRetries int64, delays []time.Duration, handler MaxRetriesExceededHandler) Option
WithRetry enables event retries. If maxRetries is bigger than the number of delays provided, it will use the last value until maxRetries has been reached. Use InfiniteRetries to never drop the message.
Default maxRetries is Infinite.
type PublishOption ¶ added in v1.6.0
type PublishOption func(*publishOptions)
PublishOption is a publish option.
func WithPublishingExchange ¶ added in v1.6.0
func WithPublishingExchange(name string) PublishOption
WithPublishingExchange is an option to set the publishing exchange.
func WithPublishingTopic ¶ added in v1.6.0
func WithPublishingTopic(topic string) PublishOption
WithPublishingTopic is an option to set the publishing topic.
type RecoveryFailedError ¶ added in v0.9.0
type RecoveryFailedError struct {
// contains filtered or unexported fields
}
ErrRecoveryFailed occurs when the recovery failed after a connection loss.
func (*RecoveryFailedError) ConnectionName ¶ added in v1.2.0
func (e *RecoveryFailedError) ConnectionName() string
ConnectionName returns the name of the connection that failed to recover.
func (*RecoveryFailedError) Error ¶ added in v0.9.0
func (e *RecoveryFailedError) Error() string
Error implements the Error method of the error interface.