consumer

package
v1.0.0
Published: Oct 21, 2020 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package consumer is a generated protocol buffer package.

It is generated from these files:

dlqMetadata.proto

It has these top-level messages:

DLQMetadata

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewSaramaClient

func NewSaramaClient(brokers []string, config *sarama.Config) (sarama.Client, error)

NewSaramaClient returns an internal sarama Client, which can be safely closed multiple times.

func NewSaramaProducer

func NewSaramaProducer(client sarama.Client) (sarama.AsyncProducer, error)

NewSaramaProducer returns a new AsyncProducer that has a Close method that can be called multiple times.
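
A minimal sketch of wiring the two constructors together; the broker address is a placeholder and error handling is abbreviated:

client, err := NewSaramaClient([]string{"localhost:9092"}, sarama.NewConfig())
if err != nil {
	// handle error
}
producer, err := NewSaramaProducer(client)
if err != nil {
	// handle error
}
// Both Close methods are safe to call more than once.
defer producer.Close()
defer client.Close()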

Types

type ClusterConsumer

type ClusterConsumer struct {
	// contains filtered or unexported fields
}

ClusterConsumer is a consumer for a single Kafka cluster.

func NewClusterConsumer

func NewClusterConsumer(
	cluster string,
	saramaConsumer SaramaConsumer,
	consumerMap map[string]*TopicConsumer,
	scope tally.Scope,
	logger *zap.Logger,
) *ClusterConsumer

NewClusterConsumer returns a new single cluster consumer.

func (*ClusterConsumer) Closed

func (c *ClusterConsumer) Closed() <-chan struct{}

Closed returns a channel which will be closed after this consumer is shut down.

func (*ClusterConsumer) ResetOffset

func (c *ClusterConsumer) ResetOffset(topic string, partition int32, offsetRange kafka.OffsetRange) error

ResetOffset will reset the consumer offset for the specified topic and partition.

func (*ClusterConsumer) Start

func (c *ClusterConsumer) Start() error

Start starts the consumer.

func (*ClusterConsumer) Stop

func (c *ClusterConsumer) Stop()

Stop stops the consumer.
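
A lifecycle sketch, assuming saramaConsumer, topicConsumers, scope, and logger have already been constructed:

c := NewClusterConsumer("cluster1", saramaConsumer, topicConsumers, scope, logger)
if err := c.Start(); err != nil {
	logger.Fatal("failed to start cluster consumer", zap.Error(err))
}
// ... consume ...
c.Stop()
<-c.Closed() // block until shutdown completes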

type ClusterGroup

type ClusterGroup struct {
	Cluster string
	Group   string
}

ClusterGroup wraps a Cluster and Group name pair.

type Constructors

type Constructors struct {
	NewSaramaProducer func(sarama.Client) (sarama.AsyncProducer, error)
	NewSaramaConsumer func([]string, string, []string, *cluster.Config) (SaramaConsumer, error)
	NewSaramaClient   func([]string, *sarama.Config) (sarama.Client, error)
}

Constructors wraps multiple Sarama constructors so they can be swapped out in tests.

type DLQ

type DLQ interface {
	// Start the DLQ producer
	Start() error
	// Stop the DLQ producer and close resources it holds.
	Stop()
	// Add adds the given message to DLQ.
	// This is a synchronous call and will block until sending is successful.
	Add(m kafka.Message, qTypes ...ErrorQType) error
}

DLQ is the interface for implementations that can take a message and put it into some sort of error queue for later processing.

func NewBufferedDLQ

func NewBufferedDLQ(topic kafka.Topic, producer sarama.AsyncProducer, scope tally.Scope, logger *zap.Logger) DLQ

NewBufferedDLQ returns a DLQ that is backed by a buffered async sarama producer.

func NewNoopDLQ

func NewNoopDLQ() DLQ

NewNoopDLQ returns a noop DLQ.

func NewRetryDLQMultiplexer

func NewRetryDLQMultiplexer(retryTopic, dlqTopic DLQ, threshold int64) DLQ

NewRetryDLQMultiplexer returns a DLQ that will produce messages to retryTopic or dlqTopic depending on the threshold.

Messages that are added to this DLQ will be sent to retryTopic if the retry count of the message is < the threshold; otherwise, they will go to dlqTopic.
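
For example, a sketch that retries each message up to three times before parking it; retryTopic, dlqTopic, producer, scope, and logger are assumed to already exist:

retryQ := NewBufferedDLQ(retryTopic, producer, scope, logger)
parkingQ := NewBufferedDLQ(dlqTopic, producer, scope, logger)
mux := NewRetryDLQMultiplexer(retryQ, parkingQ, 3)
// Messages with a retry count < 3 are produced to retryTopic;
// the rest are produced to dlqTopic.
if err := mux.Start(); err != nil {
	// handle error
}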

type DLQMetadata

type DLQMetadata struct {
	// retry_count is an incrementing value denoting the number
	// of times a message has been redelivered.
	// It will be 0 on first delivery.
	RetryCount int64 `protobuf:"varint,1,opt,name=retry_count,json=retryCount" json:"retry_count,omitempty"`
	// topic is the original kafka topic the message was received on.
	// This is analogous to the logical topic name.
	Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"`
	// partition is the original kafka partition the message was received on.
	Partition int32 `protobuf:"varint,3,opt,name=partition" json:"partition,omitempty"`
	// offset is the record offset of the original message in the original topic-partition.
	Offset int64 `protobuf:"varint,4,opt,name=offset" json:"offset,omitempty"`
	// timestamp_ns is the original record timestamp of the original message.
	TimestampNs int64 `protobuf:"varint,5,opt,name=timestamp_ns,json=timestampNs" json:"timestamp_ns,omitempty"`
	// data is a byte buffer for storing arbitrary information.
	// This is useful if the Kafka Broker version used is < 0.11
	// and hence Kafka native record headers (KAFKA-4208) are unavailable
	// so the DLQ metadata must be stored in the record Key or Value.
	Data []byte `protobuf:"bytes,6,opt,name=data,proto3" json:"data,omitempty"`
}

DLQMetadata contains metadata from the original kafka message. The metadata will be encoded and decoded when sending or receiving messages from the DLQ cluster in order to present the library user a seamless logical topic.

func NoopDLQMetadataDecoder

func NoopDLQMetadataDecoder(b []byte) (*DLQMetadata, error)

NoopDLQMetadataDecoder does no decoding and returns a default DLQMetadata object.

func ProtobufDLQMetadataDecoder

func ProtobufDLQMetadataDecoder(b []byte) (*DLQMetadata, error)

ProtobufDLQMetadataDecoder uses proto.Unmarshal to decode protobuf encoded binary into the DLQMetadata object.
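
A decoding sketch; raw is assumed to hold the protobuf-encoded metadata taken from a record header, key, or value:

md, err := ProtobufDLQMetadataDecoder(raw)
if err != nil {
	// handle malformed metadata
	return
}
logger.Info("dlq metadata",
	zap.String("topic", md.GetTopic()),
	zap.Int64("retryCount", md.GetRetryCount()))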

func (*DLQMetadata) Descriptor

func (*DLQMetadata) Descriptor() ([]byte, []int)

func (*DLQMetadata) GetData

func (m *DLQMetadata) GetData() []byte

func (*DLQMetadata) GetOffset

func (m *DLQMetadata) GetOffset() int64

func (*DLQMetadata) GetPartition

func (m *DLQMetadata) GetPartition() int32

func (*DLQMetadata) GetRetryCount

func (m *DLQMetadata) GetRetryCount() int64

func (*DLQMetadata) GetTimestampNs

func (m *DLQMetadata) GetTimestampNs() int64

func (*DLQMetadata) GetTopic

func (m *DLQMetadata) GetTopic() string

func (*DLQMetadata) ProtoMessage

func (*DLQMetadata) ProtoMessage()

func (*DLQMetadata) Reset

func (m *DLQMetadata) Reset()

func (*DLQMetadata) String

func (m *DLQMetadata) String() string

type DLQMetadataDecoder

type DLQMetadataDecoder func([]byte) (*DLQMetadata, error)

DLQMetadataDecoder decodes a byte array into DLQMetadata.

type ErrorQType

type ErrorQType string

ErrorQType is the queue type to send messages to when using the DLQ interface.

var (

	// RetryQErrorQType is the error queue for the retryQ.
	RetryQErrorQType ErrorQType = "retryQ"
	// DLQErrorQType is the error queue for DLQ.
	DLQErrorQType ErrorQType = "DLQ"

	// DLQConsumerGroupNameSuffix is the consumer group name used by the DLQ merge process.
	DLQConsumerGroupNameSuffix = "-dlq-merger"
)
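
The queue type is passed to DLQ.Add to pick a destination; a sketch assuming dlq and msg are already in scope:

// Route this message to the retry queue rather than the final DLQ.
if err := dlq.Add(msg, RetryQErrorQType); err != nil {
	// handle enqueue failure
}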

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is a wrapper around a Kafka consumer message.

func (*Message) Ack

func (m *Message) Ack() error

Ack acknowledges the message.

func (*Message) Key

func (m *Message) Key() (key []byte)

Key is a mutable reference to the message's key.

func (*Message) MarshalLogObject

func (m *Message) MarshalLogObject(e zapcore.ObjectEncoder) error

MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.

func (*Message) Nack

func (m *Message) Nack() error

Nack negatively acknowledges the message and also moves it to a DLQ if the consumer has a DLQ configured. This method will *block* until the enqueue to the DLQ succeeds.

func (*Message) NackToDLQ

func (m *Message) NackToDLQ() error

NackToDLQ negatively acknowledges the message by sending it directly to the DLQ. This method will *block* until the enqueue to the DLQ succeeds.

func (*Message) Offset

func (m *Message) Offset() int64

Offset is the message's offset.

func (*Message) Partition

func (m *Message) Partition() int32

Partition is the ID of the partition from which the message was read.

func (*Message) RetryCount

func (m *Message) RetryCount() int64

RetryCount returns the number of times this message has been retried.

func (*Message) Timestamp

func (m *Message) Timestamp() time.Time

Timestamp returns the timestamp for this message.

func (*Message) Topic

func (m *Message) Topic() string

Topic is the topic from which the message was read.

func (*Message) Value

func (m *Message) Value() []byte

Value is a mutable reference to the message's value.

type MultiClusterConsumer

type MultiClusterConsumer struct {
	// contains filtered or unexported fields
}

MultiClusterConsumer is a consumer that wraps multiple single-cluster Kafka consumers.

func NewMultiClusterConsumer

func NewMultiClusterConsumer(
	groupName string,
	topics kafka.ConsumerTopicList,
	clusterConsumerMap map[ClusterGroup]*ClusterConsumer,
	saramaClients map[ClusterGroup]sarama.Client,
	msgC chan kafka.Message,
	scope tally.Scope,
	logger *zap.Logger,
) *MultiClusterConsumer

NewMultiClusterConsumer returns a new consumer that consumes messages from multiple Kafka clusters.

func (*MultiClusterConsumer) Closed

func (c *MultiClusterConsumer) Closed() <-chan struct{}

Closed returns a channel that will be closed when the consumer is closed.

func (*MultiClusterConsumer) MergeDLQ

func (c *MultiClusterConsumer) MergeDLQ(cluster, group, topic string, partition int32, offsetRange kafka.OffsetRange) error

MergeDLQ will merge the offset range for each partition of the DLQ topic for the specified ConsumerTopic. Topic should be the DLQ topic (with the __dlq suffix).

func (*MultiClusterConsumer) Messages

func (c *MultiClusterConsumer) Messages() <-chan kafka.Message

Messages returns a channel to receive messages on.
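
A typical consume-loop sketch; process is a hypothetical handler, and the messages on the channel are assumed to expose the Ack/Nack methods documented on Message above:

for msg := range c.Messages() {
	if err := process(msg); err != nil {
		msg.Nack() // may block until the message is enqueued to the retry queue or DLQ
		continue
	}
	msg.Ack()
}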

func (*MultiClusterConsumer) Name

func (c *MultiClusterConsumer) Name() string

Name returns the consumer group name used by this consumer.

func (*MultiClusterConsumer) ResetOffset

func (c *MultiClusterConsumer) ResetOffset(cluster, group, topic string, partition int32, offsetRange kafka.OffsetRange) error

ResetOffset will reset the consumer offset for the specified cluster, group, topic, and partition.

func (*MultiClusterConsumer) Start

func (c *MultiClusterConsumer) Start() error

Start starts the consumer; it will fail if any underlying ClusterConsumer fails to start.

func (*MultiClusterConsumer) Stop

func (c *MultiClusterConsumer) Stop()

Stop will stop the consumer.

func (*MultiClusterConsumer) Topics

Topics returns a list of topics this consumer is consuming from.

type Options

type Options struct {
	ClientID               string // client ID
	RcvBufferSize          int    // aggregate message buffer size
	PartitionRcvBufferSize int    // message buffer size for each partition
	Concurrency            int    // number of goroutines that will concurrently process messages
	OffsetPolicy           int64
	OffsetCommitInterval   time.Duration
	RebalanceDwellTime     time.Duration
	MaxProcessingTime      time.Duration // amount of time a partitioned consumer will wait during a drain
	ConsumerMode           cluster.ConsumerMode
	ProducerMaxMessageByes int
	FetchDefaultBytes      int32
	OtherConsumerTopics    []Topic
	TLSConfig              *tls.Config       // TLSConfig is the configuration to use for secure connections, not nil -> enable, nil -> disabled
	SASLConfig             *kafka.SASLConfig // SASLConfig is the configuration to use for SASL based auth
}

Options are the tunable and injectable options for the consumer.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns the default options.
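
A sketch of adjusting the defaults before constructing a consumer; the values below are illustrative, not recommendations:

opts := DefaultOptions()
opts.ClientID = "my-service"
opts.Concurrency = 4
opts.OffsetCommitInterval = 10 * time.Second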

type PartitionConsumer

type PartitionConsumer interface {
	Start() error
	Stop()
	Drain(time.Duration)
	ResetOffset(kafka.OffsetRange) error
}

PartitionConsumer is the consumer for a specific Kafka partition.

func NewPartitionConsumer

func NewPartitionConsumer(
	topic Topic,
	sarama SaramaConsumer,
	pConsumer cluster.PartitionConsumer,
	options *Options,
	msgCh chan kafka.Message,
	dlq DLQ,
	scope tally.Scope,
	logger *zap.Logger) PartitionConsumer

NewPartitionConsumer returns a kafka consumer that can read messages from a given [ topic, partition ] tuple.

func NewPartitionConsumerWithoutCommit

func NewPartitionConsumerWithoutCommit(
	topic Topic,
	sarama SaramaConsumer,
	pConsumer cluster.PartitionConsumer,
	options *Options,
	msgCh chan kafka.Message,
	dlq DLQ,
	scope tally.Scope,
	logger *zap.Logger) PartitionConsumer

NewPartitionConsumerWithoutCommit returns a kafka consumer that can read messages from a given [ topic, partition ] tuple where commits are disabled.

func NewRangePartitionConsumer

func NewRangePartitionConsumer(
	topic Topic,
	sarama SaramaConsumer,
	pConsumer cluster.PartitionConsumer,
	options *Options,
	msgCh chan kafka.Message,
	dlq DLQ,
	scope tally.Scope,
	logger *zap.Logger) PartitionConsumer

NewRangePartitionConsumer returns a kafka consumer that can read messages from a given [ topic, partition ] tuple. Commits are always enabled.

type PartitionConsumerFactory

type PartitionConsumerFactory func(
	topic Topic,
	sarama SaramaConsumer,
	pConsumer cluster.PartitionConsumer,
	options *Options,
	msgCh chan kafka.Message,
	dlq DLQ,
	scope tally.Scope,
	logger *zap.Logger) PartitionConsumer

PartitionConsumerFactory is a factory method for returning PartitionConsumer. NewPartitionConsumer returns an unbounded partition consumer. NewRangePartitionConsumer returns a range partition consumer.
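
Both constructors satisfy the factory signature, so selecting a consumption strategy is a plain assignment:

var factory PartitionConsumerFactory = NewPartitionConsumer // unbounded consumption
// factory = NewRangePartitionConsumer                      // bounded offset-range consumption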

type SaramaConsumer

type SaramaConsumer interface {
	Close() error
	Errors() <-chan error
	Notifications() <-chan *cluster.Notification
	Partitions() <-chan cluster.PartitionConsumer
	CommitOffsets() error
	Messages() <-chan *sarama.ConsumerMessage
	HighWaterMarks() map[string]map[int32]int64
	MarkOffset(msg *sarama.ConsumerMessage, metadata string)
	MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
	ResetPartitionOffset(topic string, partition int32, offset int64, metadata string)
}

SaramaConsumer is an interface for the external consumer library (sarama).

func NewSaramaConsumer

func NewSaramaConsumer(brokers []string, groupID string, topics []string, config *cluster.Config) (SaramaConsumer, error)

NewSaramaConsumer returns a new SaramaConsumer that has a Close method that can be called multiple times.
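
A construction sketch; the broker address, group ID, and topic name are placeholders:

cfg := cluster.NewConfig()
sc, err := NewSaramaConsumer([]string{"localhost:9092"}, "my-group", []string{"my-topic"}, cfg)
if err != nil {
	// handle error
}
defer sc.Close() // safe to call multiple times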

type Topic

type Topic struct {
	kafka.ConsumerTopic
	DLQMetadataDecoder
	PartitionConsumerFactory
	ConsumerGroupSuffix string
}

Topic is an internal wrapper around kafka.ConsumerTopic.
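
A construction sketch, assuming consumerTopic is an existing kafka.ConsumerTopic; the embedded fields are keyed by their type names, and the group suffix shown is only an example:

t := Topic{
	ConsumerTopic:            consumerTopic,
	DLQMetadataDecoder:       ProtobufDLQMetadataDecoder,
	PartitionConsumerFactory: NewPartitionConsumer,
	ConsumerGroupSuffix:      DLQConsumerGroupNameSuffix,
}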

func (Topic) MarshalLogObject

func (t Topic) MarshalLogObject(e zapcore.ObjectEncoder) error

MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.

type TopicConsumer

type TopicConsumer struct {
	// contains filtered or unexported fields
}

TopicConsumer is a consumer for a specific topic. TopicConsumer is an abstraction that runs on the same goroutine as the cluster consumer.

func NewTopicConsumer

func NewTopicConsumer(
	topic Topic,
	msgC chan kafka.Message,
	consumer SaramaConsumer,
	dlq DLQ,
	options *Options,
	scope tally.Scope,
	logger *zap.Logger,
) *TopicConsumer

NewTopicConsumer returns a new TopicConsumer for consuming from a single topic.

func (*TopicConsumer) ResetOffset

func (c *TopicConsumer) ResetOffset(partition int32, offsetRange kafka.OffsetRange) error

ResetOffset will reset the consumer offset for the specified partition.

func (*TopicConsumer) Start

func (c *TopicConsumer) Start() error

Start starts the DLQ consumer goroutine.

func (*TopicConsumer) Stop

func (c *TopicConsumer) Stop()

Stop shuts down and frees the resources held by this TopicConsumer and stops the batched DLQ producer.
