kafkaconsumer

package
v2.1.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2016 License: MIT Imports: 12 Imported by: 6

Documentation

Overview

package kafkaconsumer is a clone of github.com/wvanbergen/kafka/kafkaconsumer with sarama references replaced with gopkg.in links instead of "raw" Github

Index

Examples

Constants

This section is empty.

Variables

View Source
var Logger sarama.StdLogger

Functions

This section is empty.

Types

type Config

type Config struct {
	*sarama.Config

	Zookeeper *kazoo.Config

	MaxProcessingTime time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.

	Offsets struct {
		Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
	}
}

func NewConfig

func NewConfig() *Config

func (*Config) Validate

func (cgc *Config) Validate() error

type Consumer

type Consumer interface {
	// Interrups will initiate the shutdown procedure of the consumer, and return immediately.
	// When you are done using the consumer, you must either call Close or Interrupt to prevent leaking memory.
	Interrupt()

	// Closes will start the shutdown procedure for the consumer and wait for it to complete.
	// When you are done using the consumer, you must either call Close or Interrupt to prevent leaking memory.
	Close() error

	// Messages returns a channel that you can read to obtain messages from Kafka to process.
	// Every message that you receive from this channel should be sent to Ack after it has been processed.
	Messages() <-chan *sarama.ConsumerMessage

	// Error returns a channel that you can read to obtain errors that occur.
	Errors() <-chan error

	// Ack marks a message as processed, indicating that the message offset can be committed
	// for the message's partition by the offset manager. Note that the offset manager may decide
	// not to commit every offset immediately for efficiency reasons. Calling Close or Interrupt
	// will make sure that the last offset provided to this function will be flushed to storage.
	// You have to provide the messages in the same order as you received them from the Messages
	// channel.
	Ack(*sarama.ConsumerMessage)
}

Consumer represents a consumer instance and is the main interface to work with as a consumer of this library.

Example

This example sets up a consumer instance that consumes two topics, processes and commits the messages that are consumed, and properly shuts down the consumer when the process is interrupted.

consumer, err := Join(
	"ExampleConsumerGroup",                       // name of the consumer group
	TopicSubscription("access_log", "audit_log"), // topics to subscribe to
	"zk1:2181,zk2:2181,zk3:2181/chroot",          // zookeeper connection string
	nil)                                          // Set this to a *Config instance to override defaults

if err != nil {
	log.Fatalln(err)
}

// Trap the interrupt signal to cleanly shut down the consumer
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
	<-c
	consumer.Interrupt()
}()

eventCount := 0
for message := range consumer.Messages() {
	// Process message
	log.Println(string(message.Value))
	eventCount += 1

	// Acknowledge that the message has been processed
	consumer.Ack(message)
}

log.Printf("Processed %d events.", eventCount)
Output:

func Join

func Join(group string, subscription Subscription, zookeeper string, config *Config) (Consumer, error)

Join joins a Kafka consumer group, and returns a Consumer instance.

  • `group` is the name of the group this consumer instance will join . All instances that form a consumer group should use the same name. A group name must be unique per Kafka cluster.
  • `subscription` is an object that describes what partitions the group wants to consume. A single instance may end up consuming between zero of them, or all of them, or any number in between. Every running instance in a group should use the same subscription; the behavior is undefined if that is not the case.
  • `zookeeper` is the zookeeper connection string, e.g. "zk1:2181,zk2:2181,zk3:2181/chroot"
  • `config` specifies the configuration. If it is nil, a default configuration is used.

type Subscription

type Subscription interface {

	// WatchPartitions returns a list of partitions that the consumer group should
	// consume, and a channel that will be fired if this list has changed.
	WatchPartitions(kazoo *kazoo.Kazoo) (kazoo.PartitionList, <-chan zk.Event, error)

	// JSON returns a JSON-encoded representation of the subscription, which will be
	// stored in Zookeeper alongside every running instance registration.
	JSON() ([]byte, error)
}

Subscription describes what topics/partitions a consumer instance is subscribed to. This can be a static list of topic, or can be a regular expression that acts as a whitelist or blacklist of topics.

The subscription is responsible for watching zookeeper to changes is the list of topics or partitions, and notify the consumer so it can trigger a rebalance.

func BlacklistSubscription

func BlacklistSubscription(pattern *regexp.Regexp) Subscription

BlacklistSubscription creates a subscription on topics that do not match a given regular expression. It will automatically subscribe to new topics that don't match the pattern when they are created.

func StaticSubscription

func StaticSubscription(subscription map[string]int) Subscription

StaticSubscription creates a static subscription for a map of topics, and the number of streams that will be used to consume it.

func TopicSubscription

func TopicSubscription(topics ...string) Subscription

TopicSubscription creates a static subscription for a list of topics.

func WhitelistSubscription

func WhitelistSubscription(pattern *regexp.Regexp) Subscription

WhitelistSubscription creates a subscription on topics that match a given regular expression. It will automatically subscribe to new topics that match the pattern when they are created.

type SubscriptionPattern

type SubscriptionPattern string
const (
	SubscriptionPatternStatic    SubscriptionPattern = "static"
	SubscriptionPatternWhiteList SubscriptionPattern = "white_list"
	SubscriptionPatternBlackList SubscriptionPattern = "black_list"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL