Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MessageOpqaue

func MessageOpqaue(m *Message) interface{}

MessageOpqaue returns an interface object, which can be used by the same client implementations to handle any message logic specific to that implementation.

func SetMessageOpaque

func SetMessageOpaque(m *Message, o interface{}) error

SetMessageOpaque is a semi-private function. It is used by stream client implementations to set the private `opaque` field on a message. This field contains extra data only relevant to that client implementation. This field can only be set once (and is set by the stream clients), so calling this function outside of its intended purpose will result in an error.

Types

type Consumer

type Consumer interface {
	ErrorCloser

	// Messages is a read-only channel on which the consumer delivers any messages
	// being read from the stream.
	//
	// The channel returns each message as a `stream.Message` value object.
	Messages() <-chan Message

	// Ack can be used to acknowledge that a message was processed and should not
	// be delivered again.
	Ack(Message) error

	// Nack is the opposite of `Ack`. It can be used to indicate that a message
	// was _not_ processed, and should be delivered again in the future.
	Nack(Message) error

	// Backlog returns an integer, indicating the number of messages still to be
	// consumed by the consumer. If the stream client implementation does not have
	// the concept of stream persistence, or consumer identity, this will always
	// return `0`. An error is returned if the backlog could not be determined.
	Backlog() (int, error)

	// Config returns the final configuration used by the consumer as an
	// interface. To access the configuration, cast the interface to a
	// `streamconfig.Consumer` struct.
	Config() interface{}
}

Consumer interface to be implemented by different stream clients.

type ConsumerMock

type ConsumerMock struct {
	Configuration interface{}
	MessagesChan  chan Message
	ErrorsChan    chan error
}

ConsumerMock is a mock implementation of the Consumer interface

func (*ConsumerMock) Ack

func (c *ConsumerMock) Ack(_ Message) error

Ack implements the Consumer interface for ConsumerMock.

func (ConsumerMock) Backlog

func (c ConsumerMock) Backlog() (int, error)

Backlog implements the Consumer interface for ConsumerMock.

func (*ConsumerMock) Close

func (c *ConsumerMock) Close() error

Close implements the Consumer interface for ConsumerMock.

func (ConsumerMock) Config

func (c ConsumerMock) Config() interface{}

Config implements the Consumer interface for ConsumerMock.

func (*ConsumerMock) Errors

func (c *ConsumerMock) Errors() <-chan error

Errors implements the Consumer interface for ConsumerMock.

func (*ConsumerMock) Messages

func (c *ConsumerMock) Messages() <-chan Message

Messages implements the Consumer interface for ConsumerMock.

func (*ConsumerMock) Nack

func (c *ConsumerMock) Nack(_ Message) error

Nack implements the Consumer interface for ConsumerMock.

type ErrorCloser

type ErrorCloser interface {
	// Errors is a read-only channel on which the consumer or producer delivers
	// any errors that occurred while consuming from, or producing to the stream.
	Errors() <-chan error

	// Close closes the consumer or producer. After calling this method, the
	// consumer or producer is no longer in a usable state, and future method
	// calls can result in panics.
	//
	// Check the specific implementations to know what happens when calling close,
	// but in general any active connection to the message stream is terminated
	// and the messages channel is closed.
	Close() error
}

ErrorCloser interface contains a shared subset of methods between consumers and producers. This subset can be used to collectively listen to errors from any of the configured stream consumers or producers, and close them all when one triggers an error.

type Message

type Message struct {
	// Value is the actual body of the message. All stream clients use this field
	// to handle the message.
	Value []byte

	// Key is the identifier of the message. Not all stream client implementations
	// have a specific need for this field. The implementations that do use this
	// field might behave differently based on the value of this field. For
	// example, Kafka will use this value to calculate the topic partition to
	// assign this message to. If you send a message with different properties,
	// but the same key, they will always end up on the same partition.
	Key []byte

	// Timestamp can be used to order messages, if so desired. Not all stream
	// client implementations use this field, and those that do might behave
	// differently based on the value of this field.
	Timestamp time.Time

	// Tags is a set of key/value labels assigned to a message. Not all stream
	// client implementations use this field, and those that do might behave
	// differently based on the value of this field.
	Tags map[string][]byte

	// ConsumerTopic can be used by stream client consumers to expose from where
	// the message originated. Not all stream client implementations use this
	// field, and those that do might behave differently based on the value of
	// this field.
	ConsumerTopic string

	// ProducerTopic can be used by the producers to dictate on which topic the
	// message should be produced. Not all stream client implementations use this
	// field, and those that do might behave differently based on the value of
	// this field.
	ProducerTopic string

	// Offset can be used by stream client implementations to relay the position
	// in a list of messages this message has. This is a read-only value, setting
	// this value on a new message has no effect. Not all stream client
	// implementations set this field, for those that don't, this field will
	// always be `nil`.
	Offset *int64
	// contains filtered or unexported fields
}

Message is what is passed around by the different consumers and producers.

func TestMessage

func TestMessage(_ testing.TB, k, v string) Message

TestMessage returns a new message with test data to be used during testing.

type Producer

type Producer interface {
	ErrorCloser

	// Messages is a write-only channel on which you can deliver any messages that
	// need to be produced on the message stream.
	//
	// The channel accepts `stream.Message` value objects.
	Messages() chan<- Message

	// Config returns the final configuration used by the producer as an
	// interface. To access the configuration, cast the interface to a
	// `streamconfig.Producer` struct.
	Config() interface{}
}

Producer interface to be implemented by different stream clients.

type ProducerMock

type ProducerMock struct {
	Configuration interface{}
	MessagesChan  chan Message
	ErrorsChan    chan error
}

ProducerMock is a mock implementation of the Producer interface

func (*ProducerMock) Close

func (p *ProducerMock) Close() error

Close implements the Producer interface for ProducerMock.

func (ProducerMock) Config

func (p ProducerMock) Config() interface{}

Config implements the Producer interface for ProducerMock.

func (*ProducerMock) Errors

func (p *ProducerMock) Errors() <-chan error

Errors implements the Producer interface for ProducerMock.

func (*ProducerMock) Messages

func (p *ProducerMock) Messages() chan<- Message

Messages implements the Producer interface for ProducerMock.

type Store

type Store interface {
	// Add stores a single `stream.Message` in the store.
	Add(Message) error

	// Del removes a single message from the store.
	Del(Message) error

	// Flush empties an entire store.
	Flush() error

	// Messages returns all the messages in the store.
	Messages() []Message
}

Store interface to be implemented by different stream stores. A stream store knows how to store and retrieve `Message's