Documentation ¶
Index ¶
Constants ¶
const OffsetNewest = sarama.OffsetNewest
const OffsetOldest = sarama.OffsetOldest
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer interface { // Messages returns the message channel for the consumer // it is the channel where new kafka messages from the topic will go to Messages() <-chan *ConsumerMessage // MarkOffset can be used to mark the last processed message MarkOffset(msg *ConsumerMessage) // MarkLatestOffset can be used to update the topic offsets for the consumer group to the latest offsets MarkLatestOffset(topic string) // Close can be used to cleanly shut down the consumer Close() }
func MustSetupConsumer ¶
func MustSetupConsumer(config ConsumerConfig) Consumer
MustSetupConsumer creates a consumer for the config.InputTopics on config.InputBrokers
func SetupConsumer ¶
func SetupConsumer(config ConsumerConfig) (Consumer, error)
SetupConsumer creates a consumer for the config.InputTopics on config.InputBrokers
type ConsumerConfig ¶
type ConsumerConfig struct { // Brokers is a mandatory comma separated list of kafka nodes with host and ip, eg "172.0.0.1:9092,172.0.0.2:9092" Brokers string // ConsumerGroup is a mandatory name for the consumer group that is being used by Kafka ConsumerGroup string // Topics is a mandatory comma separated list of topics from which to consume the messages Topics string // SessionTimeout is the number of seconds before a consumer poll times out, default is 100 SessionTimeout int // BufferSize is the size of the channel on which the kafka messages are loaded, default is 0 (no buffering) BufferSize int // InitialOffset is used if no offset was previously committed. // Should be OffsetNewest or OffsetOldest. Defaults to OffsetOldest. InitialOffset int64 // OnError is an optional error handler. By default all errors are logged, but ignored OnError func(error) // OnNotification is an optional notification handler. By default all notifications are logged, but ignored OnNotification func(*cluster.Notification) // MetricsRegistry to define metrics into. // If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true" MetricsRegistry metrics.Registry ParitionStrategy cluster.Strategy }
ConsumerConfig contains the different configuration options for the consumer
type ConsumerMessage ¶
type ConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Timestamp time.Time
}
ConsumerMessage is a single message consumed from a Kafka topic, carrying its key, value, topic, partition, offset and timestamp
type MockConsumer ¶
func (*MockConsumer) Close ¶
func (m *MockConsumer) Close()
func (*MockConsumer) MarkLatestOffset ¶
func (m *MockConsumer) MarkLatestOffset(topic string)
func (*MockConsumer) MarkOffset ¶
func (m *MockConsumer) MarkOffset(msg *ConsumerMessage)
func (*MockConsumer) Messages ¶
func (m *MockConsumer) Messages() <-chan *ConsumerMessage
type MockProducer ¶
func (*MockProducer) Close ¶
func (m *MockProducer) Close()
func (*MockProducer) Send ¶
func (m *MockProducer) Send(t string, msgs []ProducerMessage) error
type Producer ¶
type Producer interface { Send(topic string, values []ProducerMessage) error SendBytes(topic string, values [][]byte) error Close() }
func MustSetupProducer ¶
MustSetupProducer creates a producer for the kafka queues on brokers brokers is a comma separated list of kafka nodes with host and ip, eg "172.0.0.1:9092,172.0.0.2:9092"
func MustSetupProducerWithSerializer ¶
func MustSetupProducerWithSerializer(brokers string, serializer Serializer) Producer
func SetupProducer ¶
func SetupProducer(brokers string, serializer Serializer) (Producer, error)
SetupProducer creates a producer for the kafka queues on brokers brokers is a comma separated list of kafka nodes with host and ip, eg "172.0.0.1:9092,172.0.0.2:9092"
type ProducerMessage ¶
type ProducerMessage interface { // GetKey returns the key for the kafka message, this can be set for each message type separately GetKey() string }
ProducerMessage is to be implemented by all messages to be sent through this library
type Serializer ¶
type Serializer func(ProducerMessage) ([]byte, error)