v0.1.6

Published: Aug 24, 2015 License: MIT Imports: 11 Imported by: 0



Kazoo is a library to interact with the Kafka metadata that lives in Zookeeper. It provides discovery of the cluster's brokers, topic metadata, and consumer groups.

API documentation can be found on godoc.org.






var (
	ErrRunningInstances          = errors.New("Cannot deregister a consumergroup with running instances")
	ErrInstanceAlreadyRegistered = errors.New("Cannot register consumer instance because it already is registered")
	ErrInstanceNotRegistered     = errors.New("Cannot deregister consumer instance because it not registered")
	ErrPartitionClaimedByOther   = errors.New("Cannot claim partition: it is already claimed by another instance")
	ErrPartitionNotClaimed       = errors.New("Cannot release partition: it is not claimed by this instance")
)

var (
	FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
)


func ParseConnectionString

func ParseConnectionString(zookeeper string) (nodes []string, chroot string)

ParseConnectionString parses a zookeeper connection string in the form of host1:2181,host2:2181/chroot and returns the list of servers, and the chroot.
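
To make the connection string format concrete, here is a minimal sketch of the kind of splitting described above. The function `splitConnectionString` is a hypothetical stand-in written for illustration, not the package's implementation, and it assumes the returned chroot keeps its leading slash.

```go
package main

import (
	"fmt"
	"strings"
)

// splitConnectionString is a hypothetical stand-in that illustrates the
// documented behavior; it is not the library's implementation.
func splitConnectionString(zookeeper string) (nodes []string, chroot string) {
	hosts := zookeeper
	// Everything from the first "/" onward is treated as the chroot
	// (assumption: the leading slash is kept).
	if i := strings.Index(zookeeper, "/"); i >= 0 {
		hosts, chroot = zookeeper[:i], zookeeper[i:]
	}
	// The remaining part is a comma-separated list of host:port pairs.
	return strings.Split(hosts, ","), chroot
}

func main() {
	nodes, chroot := splitConnectionString("host1:2181,host2:2181/chroot")
	fmt.Println(nodes)
	fmt.Println(chroot)
}
```

A connection string without a slash yields an empty chroot in this sketch, which matches the documented default of "" for Config.Chroot.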


type Config

type Config struct {
	// The chroot the Kafka installation is registered under. Defaults to "".
	Chroot string

	// The amount of time the Zookeeper client can be disconnected from the Zookeeper cluster
	// before the cluster will get rid of watches and ephemeral nodes. Defaults to 1 second.
	Timeout time.Duration
}

Config holds the configuration values for a Kazoo instance.

func NewConfig

func NewConfig() *Config

NewConfig instantiates a new Config struct with sane defaults.

type Consumergroup

type Consumergroup struct {
	Name string
	// contains filtered or unexported fields
}

Consumergroup represents a high-level consumer that is registered in Zookeeper.

func (*Consumergroup) CommitOffset

func (cg *Consumergroup) CommitOffset(topic string, partition int32, offset int64) error

CommitOffset commits an offset for a group/topic/partition.

func (*Consumergroup) Create

func (cg *Consumergroup) Create() error

Create registers the consumergroup in Zookeeper.

func (*Consumergroup) Delete

func (cg *Consumergroup) Delete() error

Delete removes the consumergroup from Zookeeper.

func (*Consumergroup) Exists

func (cg *Consumergroup) Exists() (bool, error)

Exists checks whether the consumergroup has been registered in Zookeeper

func (*Consumergroup) FetchOffset

func (cg *Consumergroup) FetchOffset(topic string, partition int32) (int64, error)

FetchOffset retrieves the offset for a group/topic/partition.

func (*Consumergroup) Instance

func (cg *Consumergroup) Instance(id string) *ConsumergroupInstance

Instance instantiates a new ConsumergroupInstance inside this consumer group, using an existing ID.

func (*Consumergroup) Instances

func (cg *Consumergroup) Instances() (ConsumergroupInstanceList, error)

Instances returns a list of all running instances inside this consumergroup.

func (*Consumergroup) NewInstance

func (cg *Consumergroup) NewInstance() *ConsumergroupInstance

NewInstance instantiates a new ConsumergroupInstance inside this consumer group, using a newly generated ID.

func (*Consumergroup) PartitionOwner

func (cg *Consumergroup) PartitionOwner(topic string, partition int32) (*ConsumergroupInstance, error)

PartitionOwner returns the ConsumergroupInstance that has claimed the given partition. This can be nil if nobody has claimed it yet.

func (*Consumergroup) Topics

func (cg *Consumergroup) Topics() (TopicList, error)

Topics retrieves the list of topics the consumergroup has claimed ownership of at some point.

func (*Consumergroup) WatchInstances

func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan struct{}, error)

WatchInstances returns a ConsumergroupInstanceList, and a channel that will be closed as soon as the instance list changes.
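
Because a closed channel signals only once, a caller that wants to keep following the instance list has to call the watch function again after each change. The sketch below shows that loop with stand-ins: `watchFunc` mirrors the shape of WatchInstances but uses a plain `[]string` instead of ConsumergroupInstanceList, and `fakeWatch` is a hypothetical test double that reports a change immediately. None of these names come from the package.

```go
package main

import "fmt"

// watchFunc mirrors the shape of WatchInstances, with a plain []string
// standing in for ConsumergroupInstanceList (an assumption for this sketch).
type watchFunc func() ([]string, <-chan struct{}, error)

// follow calls watch, reports the snapshot, and blocks until the returned
// channel is closed. It repeats this for the given number of rounds.
func follow(watch watchFunc, rounds int, report func([]string)) error {
	for i := 0; i < rounds; i++ {
		instances, changed, err := watch()
		if err != nil {
			return err
		}
		report(instances)
		<-changed // unblocks once the channel has been closed
	}
	return nil
}

// fakeWatch returns a watchFunc that serves canned snapshots and signals a
// change right away by returning an already-closed channel.
func fakeWatch(snapshots [][]string) watchFunc {
	next := 0
	return func() ([]string, <-chan struct{}, error) {
		ch := make(chan struct{})
		close(ch)
		s := snapshots[next%len(snapshots)]
		next++
		return s, ch, nil
	}
}

func main() {
	watch := fakeWatch([][]string{{"a"}, {"a", "b"}})
	err := follow(watch, 2, func(ids []string) { fmt.Println(ids) })
	if err != nil {
		fmt.Println("watch failed:", err)
	}
}
```

In a real consumer the loop would typically run until shutdown and would select on a stop channel alongside the change channel.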

type ConsumergroupInstance

type ConsumergroupInstance struct {
	ID string
	// contains filtered or unexported fields
}

ConsumergroupInstance represents an instance of a Consumergroup.

func (*ConsumergroupInstance) ClaimPartition

func (cgi *ConsumergroupInstance) ClaimPartition(topic string, partition int32) error

ClaimPartition claims ownership of a topic/partition for a consumer ID within a group. If the partition is already claimed by another running instance, it will return ErrPartitionClaimedByOther.
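
A dedicated error value for the "claimed by another instance" case lets a caller retry on that condition alone and give up on anything else. The following is a minimal sketch of that pattern under stated assumptions: `errClaimedByOther` stands in for the package's ErrPartitionClaimedByOther from the variable list, and `claimWithRetry` and the `claim` closure are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errClaimedByOther stands in for the package's ErrPartitionClaimedByOther.
var errClaimedByOther = errors.New("Cannot claim partition: it is already claimed by another instance")

// claimWithRetry calls claim up to attempts times, retrying only when the
// partition is held by another instance. Any other result is returned as-is.
func claimWithRetry(claim func() error, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		err = claim()
		if !errors.Is(err, errClaimedByOther) {
			return err // nil on success, or an unrelated failure
		}
	}
	return err
}

func main() {
	calls := 0
	// Hypothetical claim function: held by another instance twice, then free.
	claim := func() error {
		calls++
		if calls < 3 {
			return errClaimedByOther
		}
		return nil
	}
	err := claimWithRetry(claim, 5)
	fmt.Println(err, calls)
}
```

A production loop would usually wait between attempts, since the previous owner needs time to release its claim.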

func (*ConsumergroupInstance) Deregister

func (cgi *ConsumergroupInstance) Deregister() error

Deregister removes the registration of the instance from Zookeeper.

func (*ConsumergroupInstance) Register

func (cgi *ConsumergroupInstance) Register(topics []string) error

Register registers the consumergroup instance in Zookeeper.

func (*ConsumergroupInstance) Registered

func (cgi *ConsumergroupInstance) Registered() (bool, error)

Registered checks whether the consumergroup instance is registered in Zookeeper.

func (*ConsumergroupInstance) Registration

func (cgi *ConsumergroupInstance) Registration() (*Registration, error)

Registration returns the current registration of the consumer group instance.

func (*ConsumergroupInstance) ReleasePartition

func (cgi *ConsumergroupInstance) ReleasePartition(topic string, partition int32) error

ReleasePartition releases a claim to a partition.

type ConsumergroupInstanceList

type ConsumergroupInstanceList []*ConsumergroupInstance

func (ConsumergroupInstanceList) Find

Find returns the consumergroup instance with the given ID if it exists in the list. Otherwise it will return `nil`.

func (ConsumergroupInstanceList) Len

func (cgil ConsumergroupInstanceList) Len() int

func (ConsumergroupInstanceList) Less

func (cgil ConsumergroupInstanceList) Less(i, j int) bool

func (ConsumergroupInstanceList) Swap

func (cgil ConsumergroupInstanceList) Swap(i, j int)

type ConsumergroupList

type ConsumergroupList []*Consumergroup

func (ConsumergroupList) Find

func (cgl ConsumergroupList) Find(name string) *Consumergroup

Find returns the consumergroup with the given name if it exists in the list. Otherwise it will return `nil`.

func (ConsumergroupList) Len

func (cgl ConsumergroupList) Len() int

func (ConsumergroupList) Less

func (cgl ConsumergroupList) Less(i, j int) bool

func (ConsumergroupList) Swap

func (cgl ConsumergroupList) Swap(i, j int)
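
The Len, Less, and Swap methods are the three methods of the standard library's sort.Interface, so a list type that has them can be passed to sort.Sort. The sketch below demonstrates this with local stand-ins (`group`, `groupList`) rather than the package's own types. It assumes Less orders by Name, which this listing does not state.

```go
package main

import (
	"fmt"
	"sort"
)

// group and groupList are local stand-ins for Consumergroup and
// ConsumergroupList so that the sketch compiles on its own.
type group struct{ Name string }

type groupList []*group

func (l groupList) Len() int      { return len(l) }
func (l groupList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }

// Less orders by name. This ordering is an assumption of the sketch.
func (l groupList) Less(i, j int) bool { return l[i].Name < l[j].Name }

// Find returns the group with the given name, or nil if it is not present.
func (l groupList) Find(name string) *group {
	for _, g := range l {
		if g.Name == name {
			return g
		}
	}
	return nil
}

// sortedNames sorts the list in place and returns the names in order.
func sortedNames(l groupList) []string {
	sort.Sort(l)
	names := make([]string, len(l))
	for i, g := range l {
		names[i] = g.Name
	}
	return names
}

func main() {
	groups := groupList{{Name: "payments"}, {Name: "audit"}, {Name: "metrics"}}
	fmt.Println(sortedNames(groups))
	fmt.Println(groups.Find("missing") == nil)
}
```

The same pattern applies to the other list types in this package that define Len, Less, and Swap.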

type Kazoo

type Kazoo struct {
	// contains filtered or unexported fields
}

Kazoo interacts with the Kafka metadata in Zookeeper.

func NewKazoo

func NewKazoo(servers []string, conf *Config) (*Kazoo, error)

NewKazoo creates a new connection instance.

func NewKazooFromConnectionString

func NewKazooFromConnectionString(connectionString string, conf *Config) (*Kazoo, error)

NewKazooFromConnectionString creates a new connection instance based on a Zookeeper connection string that can include a chroot.

func (*Kazoo) BrokerList

func (kz *Kazoo) BrokerList() ([]string, error)

BrokerList returns a slice of broker addresses that can be used to connect to the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`.

func (*Kazoo) Brokers

func (kz *Kazoo) Brokers() (map[int32]string, error)

Brokers returns a map of all the brokers that are part of the Kafka cluster that is registered in Zookeeper.

func (*Kazoo) Close

func (kz *Kazoo) Close() error

func (*Kazoo) Consumergroup

func (kz *Kazoo) Consumergroup(name string) *Consumergroup

Consumergroup instantiates a new consumergroup.

func (*Kazoo) Consumergroups

func (kz *Kazoo) Consumergroups() (ConsumergroupList, error)

Consumergroups returns all the registered consumergroups.

func (*Kazoo) Controller

func (kz *Kazoo) Controller() (int32, error)

Controller returns the ID of the broker that is currently acting as controller of the Kafka cluster.

func (*Kazoo) Topic

func (kz *Kazoo) Topic(topic string) *Topic

Topic returns a Topic instance for a given topic name.

func (*Kazoo) Topics

func (kz *Kazoo) Topics() (TopicList, error)

Topics returns a list of all registered Kafka topics.

type Partition

type Partition struct {
	ID       int32
	Replicas []int32
	// contains filtered or unexported fields
}

Partition interacts with Kafka's partition metadata in Zookeeper.

func (*Partition) ISR

func (p *Partition) ISR() ([]int32, error)

ISR returns the broker IDs of the current in-sync replica set for the partition.

func (*Partition) Leader

func (p *Partition) Leader() (int32, error)

Leader returns the broker ID of the broker that is currently the leader for the partition.

func (*Partition) UnderReplicated

func (p *Partition) UnderReplicated() (bool, error)

func (*Partition) UsesPreferredReplica

func (p *Partition) UsesPreferredReplica() (bool, error)
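
UnderReplicated and UsesPreferredReplica carry no description in this listing. In Kafka's usual terminology, a partition is under-replicated when its in-sync replica set is smaller than its assigned replica list, and the preferred replica is the first broker in that list. The sketch below encodes those two definitions with hypothetical helper functions over plain slices. That the package's methods follow exactly these definitions is an assumption.

```go
package main

import "fmt"

// underReplicated reports whether fewer replicas are in sync than assigned.
// Hypothetical helper based on the common Kafka definition.
func underReplicated(replicas, isr []int32) bool {
	return len(isr) < len(replicas)
}

// usesPreferredReplica reports whether the current leader is the first
// broker in the assigned replica list. Hypothetical helper.
func usesPreferredReplica(replicas []int32, leader int32) bool {
	return len(replicas) > 0 && replicas[0] == leader
}

func main() {
	replicas := []int32{1, 2, 3}

	// Broker 2 has dropped out of the in-sync set.
	fmt.Println(underReplicated(replicas, []int32{1, 3}))

	// Broker 2 leads, but broker 1 is first in the replica list.
	fmt.Println(usesPreferredReplica(replicas, 2))
}
```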

type PartitionList

type PartitionList []*Partition

func (PartitionList) Len

func (pl PartitionList) Len() int

func (PartitionList) Less

func (pl PartitionList) Less(i, j int) bool

func (PartitionList) Swap

func (pl PartitionList) Swap(i, j int)

type RegPattern

type RegPattern string

const (
	RegPatternStatic    RegPattern = "static"
	RegPatternWhiteList RegPattern = "white_list"
	RegPatternBlackList RegPattern = "black_list"
)

type RegVersion

type RegVersion int

const (
	RegDefaultVersion RegVersion = 1
)

type Registration

type Registration struct {
	Pattern      RegPattern     `json:"pattern"`
	Subscription map[string]int `json:"subscription"`
	Timestamp    int64          `json:"timestamp"`
	Version      RegVersion     `json:"version"`
}
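
The struct tags determine the JSON field names of a registration. As a rough illustration, the sketch below redeclares the listed types locally so it compiles on its own and round-trips one value through encoding/json. The helpers `encode` and `decode`, the topic name, and the timestamp value are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the listed types, so the sketch compiles on its own.
type RegPattern string

type RegVersion int

type Registration struct {
	Pattern      RegPattern     `json:"pattern"`
	Subscription map[string]int `json:"subscription"`
	Timestamp    int64          `json:"timestamp"`
	Version      RegVersion     `json:"version"`
}

// encode marshals a registration to its JSON form.
func encode(r Registration) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}

// decode parses the JSON form back into a Registration.
func decode(s string) (Registration, error) {
	var r Registration
	err := json.Unmarshal([]byte(s), &r)
	return r, err
}

func main() {
	out, err := encode(Registration{
		Pattern:      "static",
		Subscription: map[string]int{"events": 1}, // illustrative topic name
		Timestamp:    42,                          // illustrative value
		Version:      1,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
	// Prints: {"pattern":"static","subscription":{"events":1},"timestamp":42,"version":1}
}
```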

type Topic

type Topic struct {
	Name string
	// contains filtered or unexported fields
}

Topic interacts with Kafka's topic metadata in Zookeeper.

func (*Topic) Config

func (t *Topic) Config() (map[string]string, error)

Config returns topic-level configuration settings as a map.

func (*Topic) Partition

func (t *Topic) Partition(id int32, replicas []int32) *Partition

Partition returns a Partition instance for the topic.

func (*Topic) Partitions

func (t *Topic) Partitions() (PartitionList, error)

Partitions returns a list of all partitions for the topic.

type TopicList

type TopicList []*Topic

func (TopicList) Find

func (tl TopicList) Find(name string) *Topic

Find returns the topic with the given name if it exists in the topic list, and will return `nil` otherwise.

func (TopicList) Len

func (tl TopicList) Len() int

func (TopicList) Less

func (tl TopicList) Less(i, j int) bool

func (TopicList) Swap

func (tl TopicList) Swap(i, j int)

