Documentation ¶
Functions ¶
func SubscribeNonBlocking ¶
func SubscribeNonBlocking(ctx context.Context, options SubscribeOptions, subscriber Subscriber, wg *sync.WaitGroup, consumerCount int) chan error
SubscribeNonBlocking starts consumerCount consumers with the same SubscribeOptions without blocking the caller. If nothing is reading from the returned error channel (errChan), this function will block and you will not be notified if any consumers failed to start.
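The warning about errChan can be illustrated with a minimal sketch. It does not call this package: `startAll`, `failOdd`, and `runConsumers` are hypothetical stand-ins, and the use of an unbuffered channel and of the WaitGroup to track start-up goroutines are assumptions, not a description of how SubscribeNonBlocking is implemented. The point is only the caller-side pattern: start reading from the error channel before waiting on anything else.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// startAll is a hypothetical stand-in with the same shape as SubscribeNonBlocking.
// It runs start once per consumer in its own goroutine and reports failures on an
// unbuffered channel, so each send blocks until a reader receives it (assumption).
func startAll(ctx context.Context, wg *sync.WaitGroup, consumerCount int, start func(context.Context, int) error) chan error {
	errChan := make(chan error)
	for i := 0; i < consumerCount; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := start(ctx, id); err != nil {
				errChan <- err
			}
		}(i)
	}
	return errChan
}

// failOdd is a hypothetical start function: odd-numbered consumers fail to start.
func failOdd(ctx context.Context, id int) error {
	if id%2 == 1 {
		return fmt.Errorf("consumer %d: not started", id)
	}
	return nil
}

// runConsumers starts the consumers and collects every start-up error.
func runConsumers(consumerCount int, start func(context.Context, int) error) []error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	errChan := startAll(ctx, &wg, consumerCount, start)

	// Begin reading before waiting, otherwise the senders above would block forever.
	var errs []error
	done := make(chan struct{})
	go func() {
		defer close(done)
		for err := range errChan {
			errs = append(errs, err)
		}
	}()

	wg.Wait()      // every start-up goroutine has returned
	close(errChan) // safe in this sketch: no sender remains after wg.Wait
	<-done         // the reader has finished appending to errs
	return errs
}

func main() {
	for _, err := range runConsumers(4, failOdd) {
		fmt.Println("start-up error:", err)
	}
}
```

In this sketch the caller owns and closes the channel. Whether the channel returned by SubscribeNonBlocking is ever closed is not stated above, so real code may prefer a `select` on the channel and `ctx.Done()` instead of `range`.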
Types ¶
type Message ¶
type Message interface {
// Type returns the type of the message.
Type() string
// Payload returns the message payload.
Payload() ([]byte, error)
// WithPayload validates and sets the given payload on the message.
WithPayload(payload []byte) error
}
Message is a message that can be published and consumed.
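As an illustration, a message type might satisfy this interface as sketched below. `UserCreated`, its fields, its type name "user.created", and the choice of JSON as the payload encoding are all hypothetical; the interface is redeclared locally only so that the sketch compiles on its own.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Message mirrors the interface documented above so this file is self-contained.
type Message interface {
	Type() string
	Payload() ([]byte, error)
	WithPayload(payload []byte) error
}

// UserCreated is a hypothetical message; JSON encoding is an assumption.
type UserCreated struct {
	ID    string `json:"id"`
	Email string `json:"email"`
}

// Type returns the type of the message.
func (m *UserCreated) Type() string { return "user.created" }

// Payload encodes the message fields as JSON.
func (m *UserCreated) Payload() ([]byte, error) { return json.Marshal(m) }

// WithPayload decodes and validates the payload, and only then overwrites m.
func (m *UserCreated) WithPayload(payload []byte) error {
	var decoded UserCreated
	if err := json.Unmarshal(payload, &decoded); err != nil {
		return fmt.Errorf("decode %s payload: %w", m.Type(), err)
	}
	if decoded.ID == "" {
		return errors.New("user.created payload: id is required")
	}
	*m = decoded
	return nil
}

// Compile-time check that *UserCreated implements Message.
var _ Message = (*UserCreated)(nil)

func main() {
	var m UserCreated
	if err := m.WithPayload([]byte(`{"id":"42","email":"a@example.com"}`)); err != nil {
		fmt.Println("invalid payload:", err)
		return
	}
	fmt.Println(m.Type(), m.ID)
}
```

Decoding into a temporary value first means a rejected payload leaves the message unchanged, which is one reasonable reading of "validates and sets".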
type ObservedPublisher ¶
type ObservedPublisher struct {
// contains filtered or unexported fields
}
ObservedPublisher can be used to test the way an application publishes messages.
func NewObservedPublisher ¶
func NewObservedPublisher() *ObservedPublisher
NewObservedPublisher returns a new publisher that can be used when testing.
func (*ObservedPublisher) Clear ¶
func (p *ObservedPublisher) Clear() error
Clear clears out any stored messages.
func (*ObservedPublisher) Close ¶
func (p *ObservedPublisher) Close() error
Close performs no action.
func (*ObservedPublisher) Messages ¶
func (p *ObservedPublisher) Messages() []Message
Messages returns all stored messages.
func (*ObservedPublisher) Publish ¶
func (p *ObservedPublisher) Publish(m Message) error
Publish stores the given message internally so it can be retrieved later on.
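The sketch below shows the kind of test this type is meant to support. Because the import path of this package is not given here, it uses `recordingPublisher`, a hypothetical stand-in that mirrors the documented method set (Publish, Messages, Clear, Close); in a real test, NewObservedPublisher() would take its place. `greeting` and `announce` are likewise hypothetical application code.

```go
package main

import (
	"fmt"
	"sync"
)

// Message and Publisher mirror the interfaces documented in this package.
type Message interface {
	Type() string
	Payload() ([]byte, error)
	WithPayload(payload []byte) error
}

type Publisher interface {
	Publish(m Message) error
	Close() error
}

// recordingPublisher is a hypothetical stand-in for ObservedPublisher.
type recordingPublisher struct {
	mu       sync.Mutex
	messages []Message
}

// Publish stores the message so the test can inspect it later.
func (p *recordingPublisher) Publish(m Message) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.messages = append(p.messages, m)
	return nil
}

// Messages returns a copy of the stored messages.
func (p *recordingPublisher) Messages() []Message {
	p.mu.Lock()
	defer p.mu.Unlock()
	return append([]Message(nil), p.messages...)
}

// Clear drops all stored messages.
func (p *recordingPublisher) Clear() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.messages = nil
	return nil
}

// Close performs no action.
func (p *recordingPublisher) Close() error { return nil }

// greeting is a hypothetical Message used only for this sketch.
type greeting struct{ body []byte }

func (g *greeting) Type() string             { return "greeting" }
func (g *greeting) Payload() ([]byte, error) { return g.body, nil }
func (g *greeting) WithPayload(payload []byte) error {
	if len(payload) == 0 {
		return fmt.Errorf("%s: payload must not be empty", g.Type())
	}
	g.body = payload
	return nil
}

// announce is the hypothetical code under test. It depends only on Publisher.
func announce(p Publisher, names ...string) error {
	for _, name := range names {
		m := &greeting{}
		if err := m.WithPayload([]byte("hello " + name)); err != nil {
			return err
		}
		if err := p.Publish(m); err != nil {
			return fmt.Errorf("publish greeting for %q: %w", name, err)
		}
	}
	return nil
}

func main() {
	p := &recordingPublisher{}
	if err := announce(p, "ada", "grace"); err != nil {
		fmt.Println("announce failed:", err)
		return
	}
	fmt.Println("published:", len(p.Messages()))
}
```

Having application code accept the Publisher interface rather than a concrete type is what lets a test substitute an observing publisher for the Kafka one.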
type Publisher ¶
type Publisher interface {
// Publish publishes the given message.
// If an error is returned, the message has not been published.
Publish(m Message) error
// Close closes any open connections.
Close() error
}
Publisher can be used to publish messages.
func NewKafkaPublisher ¶
NewKafkaPublisher returns a Publisher that will publish messages to Kafka.
type SubscribeOptions ¶
type SubscribeOptions struct {
// ConsumeFn is the function to handle the consumed messages.
ConsumeFn ConsumeFn
// Group is the name of the consumer group. A message will only be consumed once per group.
Group string
// Types is the set of message types to subscribe the ConsumeFn to.
Types []string
// IgnoreErrors defines whether or not errors returned from ConsumeFn will be written to Errors.
// If this is false, a value must be provided for Errors.
IgnoreErrors bool
// Errors will receive any errors returned from ConsumeFn, if IgnoreErrors is false.
Errors chan<- error
}
SubscribeOptions contains the set of options used when subscribing to messages.
func (*SubscribeOptions) Validate ¶
func (x *SubscribeOptions) Validate() error
Validate makes sure we have a set of valid options and applies defaults.
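A rough sketch of the kind of check Validate might perform, under stated assumptions. Only one rule comes from the field comments above: Errors must be provided when IgnoreErrors is false. The nil check on ConsumeFn is an assumption, the `consumeFn` signature is hypothetical (the real ConsumeFn type is not shown in this section), and no defaults are applied because none are documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// consumeFn is a hypothetical handler signature used only for this sketch.
type consumeFn func(msgType string, payload []byte) error

// subscribeOptions is a local mirror of the documented SubscribeOptions fields.
type subscribeOptions struct {
	ConsumeFn    consumeFn
	Group        string
	Types        []string
	IgnoreErrors bool
	Errors       chan<- error
}

var errMissingErrors = errors.New("an Errors channel is required when IgnoreErrors is false")

// validate sketches the documented rule plus one assumed rule.
func (o *subscribeOptions) validate() error {
	if o.ConsumeFn == nil {
		// Assumption: a handler is mandatory.
		return errors.New("a ConsumeFn is required")
	}
	if !o.IgnoreErrors && o.Errors == nil {
		// Documented: if IgnoreErrors is false, a value must be provided for Errors.
		return errMissingErrors
	}
	return nil
}

// describe reports the validation outcome as text.
func describe(o *subscribeOptions) string {
	if err := o.validate(); err != nil {
		return fmt.Sprintf("invalid: %v", err)
	}
	return "valid"
}

func main() {
	handler := func(msgType string, payload []byte) error { return nil }

	// Missing Errors channel while IgnoreErrors is false.
	fmt.Println(describe(&subscribeOptions{ConsumeFn: handler, Group: "billing"}))

	// Same options with an Errors channel supplied.
	errs := make(chan error, 1)
	fmt.Println(describe(&subscribeOptions{ConsumeFn: handler, Group: "billing", Errors: errs}))
}
```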
type Subscriber ¶
type Subscriber interface {
// Subscribe starts a consumer with the given context.
//
// If an error is returned then the consumer has not been started. Otherwise, unless
// IgnoreErrors is set, you should listen on options.Errors and handle any consumer errors.
//
// The consumer will be stopped when the given context is cancelled.
Subscribe(ctx context.Context, options SubscribeOptions) error
}
Subscriber defines an interface that can be used to consume messages.
func NewKafkaSubscriber ¶
func NewKafkaSubscriber(brokers []string) Subscriber
NewKafkaSubscriber returns a Subscriber that will consume from Kafka.