package consumer

v1.0.0
Published: Dec 17, 2019 License: MIT

Documentation

Overview

Package consumer is Felice's primary entry point for receiving messages from a Kafka cluster.

The best way to create a Consumer is with the New function. You must pass it a valid Config, otherwise New will return an error.

Note: NewConfig returns a Config with sane defaults.

c, err := consumer.New(consumer.NewConfig("client-id"))

Once you've constructed a consumer you must add message handlers to it. This is done by calling the Consumer.Handle method. Each time you call Handle you pass a topic name, a message converter and a value that implements the Handler interface. Only one handler can be associated with a given topic, so if you call Handle multiple times with the same topic, only the final handler will be registered.

The Handler interface defines the signature for felice message handlers.

type myHandler struct{}

// HandleMessage implements consumer.Handler.
func (h myHandler) HandleMessage(ctx context.Context, m *consumer.Message) error {
	// Do something of your choice here!
	return nil // .. or return an actual error.
}

c.Handle("testmsg", consumer.MessageConverterV1(nil), myHandler{})

Once you've registered all your handlers you should call Consumer.Serve.

Serve will start consuming messages and pass them to their per-topic handlers. Serve itself will block until Consumer.Close is called. When Serve terminates it will return an error, which will be nil under normal circumstances.

Note that any calls to Consumer.Handle after Consumer.Serve has been called will panic.
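
For example, a typical program wires New, Handle and Serve together and calls Close to stop. The broker address and topic below are illustrative, and myHandler is the handler shown above:

c, err := consumer.New(consumer.NewConfig("client-id", "localhost:9092"))
if err != nil {
	log.Fatal(err)
}
c.Handle("testmsg", consumer.MessageConverterV1(nil), myHandler{})

// Call Close when the process is interrupted; Serve then returns nil.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
go func() {
	<-sigs
	c.Close()
}()

if err := c.Serve(context.Background()); err != nil {
	log.Fatal(err)
}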

Handlers are responsible for doing any appropriate retries (for example because of a temporary network outage). When doing this, a handler should return as soon as possible if the context is cancelled. This will happen when the Consumer is shutting down.

For example:

type FooHandler struct{}

func (FooHandler) HandleMessage(ctx context.Context, m *consumer.Message) error {
	tryCount := 0
	for {
		tryCount++
		err := tryAction(ctx, m)
		if err == nil || tryCount > 10 {
			return err
		}
		// Wait a while before trying again.
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			// The context has been cancelled.
			return ctx.Err()
		}
	}
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Config holds the configuration used to create the consumer
	// group instance. It must be non-nil.
	*sarama.Config

	// KafkaAddrs holds the kafka broker addresses in host:port
	// format. There must be at least one entry in the slice.
	KafkaAddrs []string

	// If non-nil, Discarded is called when a message handler
	// returns an error.
	Discarded func(m *sarama.ConsumerMessage, err error)
}

Config is used to configure the Consumer. It should be created with NewConfig before any custom configuration settings are applied.

func NewConfig

func NewConfig(clientID string, addrs ...string) Config

NewConfig returns a configuration filled in with default values.

If addrs is empty, KafkaAddrs will default to localhost:9092.

The clientID is used to form the consumer group name (clientID + "-consumer-group").
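
For instance, the returned Config can be adjusted before it is passed to New. The settings below (a Discarded callback and an initial-offset policy on the embedded sarama.Config) are illustrative choices, not defaults:

cfg := consumer.NewConfig("client-id", "kafka-1:9092", "kafka-2:9092")

// Log messages whose handler returned an error.
cfg.Discarded = func(m *sarama.ConsumerMessage, err error) {
	log.Printf("discarded message %s/%d@%d: %v", m.Topic, m.Partition, m.Offset, err)
}

// Fields of the embedded *sarama.Config are promoted and can be set directly,
// e.g. start new consumer groups from the oldest available offset.
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

c, err := consumer.New(cfg)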

func (Config) Validate added in v1.0.0

func (c Config) Validate() error

Validate validates the configuration.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a Kafka message consumer. See the package documentation and New for information on how to use it.

func New added in v1.0.0

func New(cfg Config) (*Consumer, error)

New returns a new consumer with the given configuration. Use the Handle method to register handlers for all subscribed topics, then call Serve to begin consuming messages.

The Consumer must be closed with the Close method after use.

func (*Consumer) Close added in v1.0.0

func (c *Consumer) Close() error

Close closes the consumer and synchronously shuts down any running Serve call, which will then return with no error.

func (*Consumer) Handle

func (c *Consumer) Handle(topic string, converter MessageConverter, h Handler)

Handle registers the handler for the given topic. It will panic if it is called after Serve or if the topic is empty.

func (*Consumer) Serve

func (c *Consumer) Serve(ctx context.Context) error

Serve runs the consumer and listens for new messages on the registered topics. Serve will block until it is instructed to stop, which you can achieve by calling Close.

Serve will return an error if it is called more than once, if the Consumer has been closed, if no handlers have been registered, or if the consumer fails with an error.

It will not return an error if Close was called and the consumer shut down cleanly.

type Handler

type Handler interface {
	// HandleMessage handles the given message. It is responsible
	// for any retries necessary, and should abort as soon as possible
	// when the context is done.
	// The message will be marked as handled even if an error is returned.
	HandleMessage(context.Context, *Message) error
}

Handler is the interface for handling consumed messages. See HandlerFunc for a function-based implementation. Note that felice Handlers receive felice Message types.
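
As a sketch, assuming HandlerFunc is the usual function adapter with the same signature as HandleMessage, a handler can also be declared inline:

// Assumption: consumer.HandlerFunc adapts a plain function to the Handler interface.
var h consumer.Handler = consumer.HandlerFunc(func(ctx context.Context, m *consumer.Message) error {
	log.Printf("received message %s on topic %s", m.ID, m.Topic)
	return nil
})

c.Handle("testmsg", consumer.MessageConverterV1(nil), h)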

type Message

type Message struct {
	// The Kafka topic this message applies to.
	Topic string

	// Key under which this message was sent.
	Key codec.Decoder

	// Body of the Kafka message.
	Body codec.Decoder

	// The time at which this Message was produced.
	ProducedAt time.Time

	// Partition where this message was stored.
	Partition int32

	// Offset at which this message was stored.
	Offset int64

	// Headers of the message.
	Headers map[string]string

	// Unique ID of the message.
	ID string
	// contains filtered or unexported fields
}

Message represents a message received from Kafka. When using Felice's Consumer, any Handlers that you register will receive Messages as their arguments.
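
A handler typically decodes Body into its own type. The sketch below assumes codec.Decoder exposes a Decode method that unmarshals into the value it is given; the payload structure is illustrative:

func (h myHandler) HandleMessage(ctx context.Context, m *consumer.Message) error {
	var payload struct {
		Name  string `json:"name"`
		Count int    `json:"count"`
	}
	// Assumption: codec.Decoder has a Decode(v interface{}) error method.
	if err := m.Body.Decode(&payload); err != nil {
		return err
	}
	log.Printf("message %s from partition %d: %+v", m.ID, m.Partition, payload)
	return nil
}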

func (*Message) HighWaterMarkOffset added in v1.0.0

func (m *Message) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will be used for the next message that will be produced. You can use this to determine how far behind the processing is.
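
For example, the difference between the high water mark and the message's own offset gives an approximate per-partition lag:

// Approximate number of messages still to be consumed on this partition.
// The high water mark is the offset of the next message to be produced.
lag := m.HighWaterMarkOffset() - m.Offset - 1
if lag > 1000 {
	log.Printf("partition %d is about %d messages behind", m.Partition, lag)
}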

type MessageConverter

type MessageConverter interface {
	FromKafka(*sarama.ConsumerMessage) (*Message, error)
}

A MessageConverter transforms a sarama.ConsumerMessage into a Message. Its role is to decouple the conventions defined by users from the consumer: each converter defines how to decode the metadata, headers and body of a message received from Kafka, and returns a format-agnostic Message structure.
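
A custom converter only has to implement FromKafka. The sketch below treats the raw Kafka key and value as JSON; the jsonDecoder helper is hypothetical and assumes the codec.Decoder interface is satisfied by a single Decode(v interface{}) error method:

// jsonDecoder is a hypothetical codec.Decoder implementation over raw bytes.
type jsonDecoder []byte

func (d jsonDecoder) Decode(v interface{}) error { return json.Unmarshal(d, v) }

type rawJSONConverter struct{}

func (rawJSONConverter) FromKafka(cm *sarama.ConsumerMessage) (*consumer.Message, error) {
	msg := &consumer.Message{
		Topic:      cm.Topic,
		Partition:  cm.Partition,
		Offset:     cm.Offset,
		ProducedAt: cm.Timestamp,
		Key:        jsonDecoder(cm.Key),
		Body:       jsonDecoder(cm.Value),
		Headers:    make(map[string]string),
	}
	for _, h := range cm.Headers {
		msg.Headers[string(h.Key)] = string(h.Value)
	}
	return msg, nil
}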

func MessageConverterV1

func MessageConverterV1(keyCodec codec.Codec) MessageConverter

MessageConverterV1 is the first version of the default converter. It converts messages formatted using the producer.MessageConverterV1. The headers are extracted from Kafka headers and the body is decoded from JSON. If the Message-Id and Produced-At headers are found, they will automatically be added to the ID and ProducedAt fields.

The keyCodec parameter is used to decode partition keys. If it's nil, codec.String() will be used.
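
For example, to register a handler whose partition keys are decoded explicitly as strings rather than relying on the nil default (the topic name and handler are illustrative):

c.Handle("testmsg", consumer.MessageConverterV1(codec.String()), myHandler{})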
