Documentation
Variables
var (
	ErrNilHandler                   = errors.New("handler function can not be nil")
	ErrHandlerNotAFunction          = errors.New("provided handler is not a function")
	ErrHandlerInputLengthMissMatch  = errors.New("handler function must have exactly three input arguments")
	ErrHandlerInputNoContext        = errors.New("first argument of handler must be a context")
	ErrHandlerInputNoTopic          = errors.New("second argument of handler must be a topic")
	ErrHandlerOutputLengthMissMatch = errors.New("handler must have exactly one output argument")
	ErrHandlerOutputNoError         = errors.New("handler output must implements `error` interface")
)
List of known errors for the handler signature validation process.
Types
type Broker
type Broker interface {
	Publisher
	Subscriber
	Shutdowner
}
Broker defines the interface for a pub/sub broker.
type Handler
type Handler interface {
	// Deliver delivers the message to the handler. If handler does not
	// accept this kind of message, it should NOT return an error.
	Deliver(ctx context.Context, topic Topic, message interface{}) error

	// Reflect returns a description of the message type the handler is
	// interested in.
	Reflect() MessageReflection
}
Handler represents a function capable of handling a message arriving at a topic.
func NewHandler
func NewHandler(handlerFunc interface{}) Handler
NewHandler builds a new handler instance for the given function.
This function WILL PANIC if the given function does not match the signature `func (ctx context.Context, t pubsub.Topic, m <Type>) error`, e.g.:
- func (ctx context.Context, t pubsub.Topic, pointer *MyCustomStruct) error
- func (ctx context.Context, t pubsub.Topic, customS MyCustomStruct) error
- func (ctx context.Context, t pubsub.Topic, customI MyCustomInterface) error
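The checks implied by the error variables listed earlier can be approximated with the `reflect` package. The following is only an illustrative sketch, not the library's actual implementation: `validateHandler`, the local `Topic` stand-in, and the local copies of the errors are assumptions made so the example compiles on its own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

// Topic is a local stand-in for pubsub.Topic (assumption, keeps the sketch self-contained).
type Topic string

// Local copies of the documented validation errors.
var (
	ErrNilHandler                   = errors.New("handler function can not be nil")
	ErrHandlerNotAFunction          = errors.New("provided handler is not a function")
	ErrHandlerInputLengthMissMatch  = errors.New("handler function must have exactly three input arguments")
	ErrHandlerInputNoContext        = errors.New("first argument of handler must be a context")
	ErrHandlerInputNoTopic          = errors.New("second argument of handler must be a topic")
	ErrHandlerOutputLengthMissMatch = errors.New("handler must have exactly one output argument")
	ErrHandlerOutputNoError         = errors.New("handler output must implements `error` interface")
)

var (
	contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
	topicType   = reflect.TypeOf(Topic(""))
	errorType   = reflect.TypeOf((*error)(nil)).Elem()
)

// validateHandler is a hypothetical helper that checks fn against the shape
// func(context.Context, Topic, <Type>) error and reports the first mismatch.
func validateHandler(fn interface{}) error {
	if fn == nil {
		return ErrNilHandler
	}
	t := reflect.TypeOf(fn)
	if t.Kind() != reflect.Func {
		return ErrHandlerNotAFunction
	}
	if t.NumIn() != 3 {
		return ErrHandlerInputLengthMissMatch
	}
	if !t.In(0).Implements(contextType) {
		return ErrHandlerInputNoContext
	}
	if t.In(1) != topicType {
		return ErrHandlerInputNoTopic
	}
	if t.NumOut() != 1 {
		return ErrHandlerOutputLengthMissMatch
	}
	if !t.Out(0).Implements(errorType) {
		return ErrHandlerOutputNoError
	}
	return nil
}

// validHandler matches the expected signature.
func validHandler(ctx context.Context, t Topic, m string) error { return nil }

// missingTopic lacks the topic argument, so it has only two inputs.
func missingTopic(ctx context.Context, m string) error { return nil }

func main() {
	fmt.Println(validateHandler(validHandler))
	fmt.Println(validateHandler(missingTopic))
	fmt.Println(validateHandler("not a function"))
}
```

A constructor that panics on a bad signature could wrap such a check and call `panic(err)` on a non-nil result, which keeps misuse visible at start-up rather than at delivery time.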
Handlers should return an error if they're unable to properly handle a given message. The same handler can be used on multiple subscriptions. To increase a Broker's throughput, it is highly recommended to design each Broker so that every message is handled asynchronously, in a separate goroutine.
type MessageReflection
type MessageReflection struct {
	// MessageType is the type of the message the handler is interested in, e.g. "myapp.MyMessage".
	MessageType reflect.Type

	// MessageKind is the kind of the type of the message, e.g. "struct", "interface", "ptr", etc.
	MessageKind reflect.Kind

	// Handler is the reflected handler function itself.
	Handler reflect.Value
}
MessageReflection describes the message a handler is interested in.
func (MessageReflection) Accepts
func (r MessageReflection) Accepts(m interface{}) bool
Accepts reports whether the handler accepts the provided message.
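The exact matching rule is not spelled out here. One plausible rule, shown purely as an assumption, is: interface-typed handlers accept any message implementing the interface, while concrete types must match exactly. The `accepts` function and the `order` type below are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"reflect"
)

// order is a hypothetical message type used only for this illustration.
type order struct{ ID int }

// String makes order satisfy fmt.Stringer.
func (o order) String() string { return fmt.Sprintf("order#%d", o.ID) }

var (
	orderType    = reflect.TypeOf(order{})
	stringerType = reflect.TypeOf((*fmt.Stringer)(nil)).Elem()
)

// accepts is an assumed matching rule, not the library's documented behavior:
// interface types accept any implementer, concrete types require an exact match.
func accepts(want reflect.Type, m interface{}) bool {
	if m == nil {
		return false
	}
	got := reflect.TypeOf(m)
	if want.Kind() == reflect.Interface {
		return got.Implements(want)
	}
	return got == want
}

func main() {
	fmt.Println(accepts(orderType, order{ID: 1}))    // exact struct match
	fmt.Println(accepts(orderType, &order{ID: 1}))   // pointer is a different type
	fmt.Println(accepts(stringerType, order{ID: 1})) // order implements fmt.Stringer
	fmt.Println(accepts(stringerType, 42))           // int does not
}
```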
func (MessageReflection) String (added in v1.2.1)
func (r MessageReflection) String() string
String returns a string representation of the reflected message.
type Metadata
type Metadata interface {
	// Get returns the value associated with this metadata for key, or nil if no
	// value is associated with key. Successive calls to Get with the same key
	// return the same result.
	Get(key interface{}) interface{}

	// Set stores the given value under the specified key, overwriting any
	// existing value.
	Set(key, val interface{}) Metadata
}
Metadata is the interface that wraps the basic Get and Set methods.
func NewMetadata
func NewMetadata() Metadata
NewMetadata returns a new thread-safe Metadata instance which can be safely used by multiple goroutines.
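As a rough idea of what a thread-safe implementation of this interface might look like, here is a minimal sketch guarded by a `sync.RWMutex`. It is not the library's code: `safeMetadata` and `newSafeMetadata` are hypothetical names, and the `Metadata` interface is redeclared locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// Metadata mirrors the documented interface (redeclared locally for this sketch).
type Metadata interface {
	Get(key interface{}) interface{}
	Set(key, val interface{}) Metadata
}

// safeMetadata is a hypothetical map-backed implementation protected by a RWMutex.
// Keys must be comparable, since they are used as map keys.
type safeMetadata struct {
	mu     sync.RWMutex
	values map[interface{}]interface{}
}

func newSafeMetadata() Metadata {
	return &safeMetadata{values: make(map[interface{}]interface{})}
}

// Get returns the stored value, or nil when the key is absent.
func (m *safeMetadata) Get(key interface{}) interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.values[key]
}

// Set stores val under key and returns the receiver so calls can be chained.
func (m *safeMetadata) Set(key, val interface{}) Metadata {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.values[key] = val
	return m
}

func main() {
	md := newSafeMetadata()

	// Concurrent writers are safe because every access takes the lock.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			md.Set(n, n*n)
		}(i)
	}
	wg.Wait()

	fmt.Println(md.Get(3), md.Get("missing"))
}
```

Returning `Metadata` from `Set` allows chained calls such as `md.Set("a", 1).Set("b", 2)`.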
type Publisher (added in v1.2.0)
type Publisher interface {
	// Publish the given message on the given topic.
	Publish(ctx context.Context, topic Topic, m interface{}) error
}
Publisher is a convenience definition that extracts topic-publishing behavior into an independent interface.
This is especially useful when you need to publish messages without exposing the entire Broker implementation.
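To illustrate why the narrow interface helps, the sketch below has a component depend only on `Publisher`, which makes it easy to substitute a fake in tests. `orderService`, `recordingPublisher`, `runDemo`, and the topic name are hypothetical; `Topic` and `Publisher` are redeclared locally so the example is self-contained.

```go
package main

import (
	"context"
	"fmt"
)

// Topic and Publisher mirror the documented definitions (redeclared locally).
type Topic string

type Publisher interface {
	Publish(ctx context.Context, topic Topic, m interface{}) error
}

// recordingPublisher is a hypothetical fake that records what was published.
type recordingPublisher struct{ published []string }

func (p *recordingPublisher) Publish(_ context.Context, topic Topic, m interface{}) error {
	p.published = append(p.published, fmt.Sprintf("%s:%v", topic, m))
	return nil
}

// orderService only needs to publish, so it depends on Publisher rather than
// on a full Broker.
type orderService struct{ pub Publisher }

func (s orderService) PlaceOrder(ctx context.Context, id string) error {
	return s.pub.Publish(ctx, "orders.placed", id)
}

// runDemo wires the service to the fake and returns what was recorded.
func runDemo() []string {
	rec := &recordingPublisher{}
	svc := orderService{pub: rec}
	if err := svc.PlaceOrder(context.Background(), "o-1"); err != nil {
		return nil
	}
	return rec.published
}

func main() {
	fmt.Println(runDemo())
}
```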
type Shutdowner (added in v1.2.0)
type Shutdowner interface {
	// Shutdown gracefully shuts down all subscriptions.
	//
	// The provided context acts as a hard-limit cancellation for the shutdown
	// process.
	Shutdown(ctx context.Context) error
}
Shutdowner provides a method that can manually trigger the shutdown of the Broker by gracefully closing each subscription.
Use this interface when you need to manually shut down the Broker. Use it with care, as it will prevent the broker from publishing or receiving any messages.
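Because the context acts as a hard limit, callers would typically pass one with a timeout. The sketch below shows that pattern against a hypothetical `slowBroker` whose drain time is simulated; `shutdownWithin` and the duration constants are assumptions for illustration, and `Shutdowner` is redeclared locally.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Shutdowner mirrors the documented interface (redeclared locally).
type Shutdowner interface {
	Shutdown(ctx context.Context) error
}

// slowBroker is a hypothetical broker that needs `drain` to finish in-flight work.
type slowBroker struct{ drain time.Duration }

// Shutdown returns nil once draining completes, or the context error if the
// hard limit is reached first.
func (b slowBroker) Shutdown(ctx context.Context) error {
	select {
	case <-time.After(b.drain):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// shutdownWithin gives the shutdown at most `limit` to complete.
func shutdownWithin(s Shutdowner, limit time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()
	return s.Shutdown(ctx)
}

// Illustrative durations only.
const (
	quickDrain = 5 * time.Millisecond
	slowDrain  = 2 * time.Second
	limit      = 100 * time.Millisecond
)

func main() {
	fmt.Println(shutdownWithin(slowBroker{drain: quickDrain}, limit))
	fmt.Println(shutdownWithin(slowBroker{drain: slowDrain}, limit))
}
```

In this sketch the first call finishes within the limit and returns nil, while the second is cut off by the deadline and returns the context's error.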
type StoppableSubscription
type StoppableSubscription interface {
	Subscription

	// Context returns the internal context of this subscription which controls
	// its life cycle. This is usually branched from broker's internal context,
	// and allows implementing graceful shutdown mechanisms when broker decides
	// to stop.
	Context() context.Context

	// Stop is used to signal the subscription to stop. This is usually invoked
	// by the broker during graceful shutdown or during unsubscription
	// operations.
	Stop()
}
StoppableSubscription represents a subscription that can be stopped.
Generally used by broker implementations that rely on background goroutines to handle subscriptions and receive messages from remote backends, e.g. Kafka, NATS, etc.
func NewStoppableSubscription
func NewStoppableSubscription(
	ctx context.Context,
	id string,
	topic Topic,
	handler Handler,
	unsubscriber UnsubscribeFunc,
	options SubscribeOptions,
) (StoppableSubscription, error)
NewStoppableSubscription builds a new stoppable subscription. The given context should be the broker's internal context, which makes it possible to implement graceful shutdown.
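The relationship between the broker's context and a subscription's own context can be sketched with `context.WithCancel`. This is an assumed design, not the library's implementation: `stoppable`, `newStoppable`, and `runWorker` are hypothetical names used only to show that a background worker exits both when the broker cancels its context and when `Stop` is called.

```go
package main

import (
	"context"
	"fmt"
)

// stoppable is a hypothetical subscription core whose context is derived from
// the broker's context.
type stoppable struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newStoppable(brokerCtx context.Context) *stoppable {
	ctx, cancel := context.WithCancel(brokerCtx)
	return &stoppable{ctx: ctx, cancel: cancel}
}

// Context exposes the subscription's life-cycle context.
func (s *stoppable) Context() context.Context { return s.ctx }

// Stop cancels only this subscription.
func (s *stoppable) Stop() { s.cancel() }

// runWorker starts a background goroutine tied to the subscription and stops
// it either through the broker's context or through Stop.
func runWorker(stopViaBroker bool) error {
	brokerCtx, brokerCancel := context.WithCancel(context.Background())
	defer brokerCancel()

	sub := newStoppable(brokerCtx)
	defer sub.Stop()

	done := make(chan struct{})
	go func() {
		defer close(done)
		<-sub.Context().Done() // a real worker would receive messages until this fires
	}()

	if stopViaBroker {
		brokerCancel() // broker-wide shutdown propagates to every subscription
	} else {
		sub.Stop() // targeted stop, e.g. during unsubscription
	}

	<-done
	return sub.Context().Err()
}

func main() {
	fmt.Println(runWorker(true))
	fmt.Println(runWorker(false))
}
```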
type SubscribeOption
type SubscribeOption func(*SubscribeOptions)
SubscribeOption is a function that configures a SubscribeOptions instance.
func DisableAutoAck
func DisableAutoAck() SubscribeOption
DisableAutoAck will disable auto ACKing of messages after they have been handled.
func SubscribeMetadata
func SubscribeMetadata(meta Metadata) SubscribeOption
SubscribeMetadata sets the subscription metadata.
func WithGroup
func WithGroup(name string) SubscribeOption
WithGroup sets the name of the group to share messages on.
type SubscribeOptions
type SubscribeOptions struct {
	// Group subscriptions with the same group name will create a shared
	// subscription where each one receives a subset of the published messages.
	Group string

	// AutoAck defaults to true. When a handler returns with a nil error the
	// message is acknowledged.
	AutoAck bool

	// Metadata stores additional options for concrete implementations of the
	// Broker interface.
	Metadata Metadata
}
SubscribeOptions describes the options for a subscription action.
func NewSubscribeOptions
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions
NewSubscribeOptions is a convenience function that builds a SubscribeOptions from the given list of options.
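The option functions follow the functional-options pattern. The sketch below redeclares simplified versions of the documented names to show how such a builder could apply them on top of the documented `AutoAck` default of true. The function bodies are assumptions, and the `Metadata` field is omitted for brevity.

```go
package main

import "fmt"

// SubscribeOptions is a simplified local copy (Metadata omitted for brevity).
type SubscribeOptions struct {
	Group   string
	AutoAck bool
}

// SubscribeOption mutates a SubscribeOptions value.
type SubscribeOption func(*SubscribeOptions)

// WithGroup sets the shared-subscription group name.
func WithGroup(name string) SubscribeOption {
	return func(o *SubscribeOptions) { o.Group = name }
}

// DisableAutoAck turns off automatic acknowledgement.
func DisableAutoAck() SubscribeOption {
	return func(o *SubscribeOptions) { o.AutoAck = false }
}

// NewSubscribeOptions starts from the defaults and applies each option in order.
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
	o := SubscribeOptions{AutoAck: true} // documented default
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", NewSubscribeOptions())
	fmt.Printf("%+v\n", NewSubscribeOptions(WithGroup("billing"), DisableAutoAck()))
}
```

Applying options in order means a later option overrides an earlier one that touches the same field.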
type Subscriber (added in v1.2.0)
type Subscriber interface {
	// Subscribe attaches the given handler to the given topic.
	//
	// The same Handler can be reused and attached multiple times to the same
	// or distinct topics.
	Subscribe(ctx context.Context, topic Topic, handler Handler, option ...SubscribeOption) (Subscription, error)
}
Subscriber is a convenience definition that extracts topic-subscription behavior into an independent interface.
This is especially useful when you need to subscribe to topics without exposing the entire Broker implementation.
type Subscription
type Subscription interface {
	// ID uniquely identifies the subscription. UUID algorithm, or similar, is
	// recommended.
	ID() string

	// Options returns the options used to create the subscription.
	Options() SubscribeOptions

	// Topic returns the topic the subscription is subscribed to.
	Topic() Topic

	// Unsubscribe unsubscribes.
	Unsubscribe() error

	// Handler returns the underlying Handler function this subscription will
	// use when receiving messages.
	Handler() Handler
}
Subscription represents a Handler subscribed to a topic.
func NewSubscription
func NewSubscription(
	id string,
	topic Topic,
	handler Handler,
	unsub UnsubscribeFunc,
	options SubscribeOptions,
) Subscription
NewSubscription is a convenience function for creating a new subscription.
type Topic
type Topic string
Topic identifies a particular Topic on which messages can be published.
type UnsubscribeFunc
type UnsubscribeFunc func() error
UnsubscribeFunc represents a function responsible for unsubscribing a subscription.
Directories
Path | Synopsis |
---|---|
example | |
example/simple (command) | |
middleware | |
middleware/retry | Package retry provides retry strategies for retry middleware. |
provider | |
provider/memory | Package memory provides a simple in-memory Broker, which moves messages using local memory. |
provider/snssqs | Package snssqs provides a simple AWS SNS+SQS broker implementation. |