kafka

package
v0.0.0-...-66bed6f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2018 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const OffsetNewest = sarama.OffsetNewest
View Source
const OffsetOldest = sarama.OffsetOldest

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer interface {

	// Messages returns the message channel for the consumer
	// it is the channel where new kafka massages from the topic will go to
	Messages() <-chan *ConsumerMessage

	// MarkOffset can be used to mark the last processed message
	MarkOffset(msg *ConsumerMessage)

	// MarkLatestOffset can be used to update the topic offsets for the consumer group to the latest offsets
	MarkLatestOffset(topic string)

	// Close can be used to cleanly shut down the consumer
	Close()
}

func MustSetupConsumer

func MustSetupConsumer(config ConsumerConfig) Consumer

MustSetupConsumer creates a consumer for the config.InputTopics on config.InputBrokers

func SetupConsumer

func SetupConsumer(config ConsumerConfig) (Consumer, error)

MustSetupConsumer creates a consumer for the config.InputTopics on config.InputBrokers

type ConsumerConfig

type ConsumerConfig struct {

	// Brokers is a mandatory comma separated list of kafka nodes with host and ip, eg "172.0.0.1:9092,172.0.0.2:9092"
	Brokers string
	// ConsumerGroup is a mandatory name for the consumer group that is being used by Kafka
	ConsumerGroup string
	// Topics is a mandatory comma separated list of topics from which to consume the messages
	Topics string

	// ConsumerTimeout is the amount of seconds before a consumer poll times out, default is 100
	SessionTimeout int

	// BufferSize is the size of the channel on which the kafka messages are loaded, default is 0 (no buffering)
	BufferSize int

	// InitialOffset is used if no offset was previously committed.
	// Should be OffsetNewest or OffsetOldest. Defaults to OffsetOldest.
	InitialOffset int64

	// OnError is an optional error handler. By default all errors are logged, but ignored
	OnError func(error)

	// OnNotification is an optional notification handler. By default all notifications are logged, but ignored
	OnNotification func(*cluster.Notification)

	// MetricsRegistry to define metrics into.
	// If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true"
	MetricsRegistry metrics.Registry

	ParitionStrategy cluster.Strategy
}

ConsumerConfig contains the different configuration options for the consumer

type ConsumerMessage

type ConsumerMessage struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
	Timestamp  time.Time
}

ConsumerMessage is

type MockConsumer

type MockConsumer struct {
	mock.Mock
	Consumer
}

func (*MockConsumer) Close

func (m *MockConsumer) Close()

func (*MockConsumer) MarkLatestOffset

func (m *MockConsumer) MarkLatestOffset(topic string)

func (*MockConsumer) MarkOffset

func (m *MockConsumer) MarkOffset(msg *ConsumerMessage)

func (*MockConsumer) Messages

func (m *MockConsumer) Messages() <-chan *ConsumerMessage

type MockProducer

type MockProducer struct {
	mock.Mock
	Producer
}

func (*MockProducer) Close

func (m *MockProducer) Close()

func (*MockProducer) Send

func (m *MockProducer) Send(t string, msgs []ProducerMessage) error

type Producer

type Producer interface {
	Send(topic string, values []ProducerMessage) error
	SendBytes(topic string, values [][]byte) error
	Close()
}

func MustSetupProducer

func MustSetupProducer(brokers string) Producer

mustSetupProducer creates a producer for the kafka queues on brokers brokers is a comma separated list of kafka nodes with host and ip, eg "172.0.0.1:9092,172.0.0.2:9092"

func MustSetupProducerWithSerializer

func MustSetupProducerWithSerializer(brokers string, serializer Serializer) Producer

func SetupProducer

func SetupProducer(brokers string, serializer Serializer) (Producer, error)

mustSetupProducer creates a producer for the kafka queues on brokers brokers is a comma separated list of kafka nodes with host and ip, eg "172.0.0.1:9092,172.0.0.2:9092"

type ProducerMessage

type ProducerMessage interface {
	// getKey returns the key for the kafka message, this can be set for each message type separately
	GetKey() string
}

ProducerMessage is to be implemented by all messages to be send through this library

type Serializer

type Serializer func(ProducerMessage) ([]byte, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL