kazoo

package module
Version: v0.0.0-...-f72d861 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2018 License: MIT Imports: 13 Imported by: 163

README

Kazoo

Kazoo is a library to interact with the Kafka metadata that lives in Zookeeper. It provides discovery of the cluster's brokers, topic metadata, and consumer groups.

API documentation can be found on godoc.org.

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors concerning consumergroups, their instances, and partition claims.
// Compare returned errors against these variables rather than against their message text.
var (
	// ErrRunningInstances: a consumergroup cannot be deregistered while it still has running instances.
	ErrRunningInstances          = errors.New("Cannot deregister a consumergroup with running instances")
	// ErrInstanceAlreadyRegistered: the consumer instance is already registered.
	ErrInstanceAlreadyRegistered = errors.New("Consumer instance already registered")
	// ErrInstanceNotRegistered: the consumer instance is not registered.
	ErrInstanceNotRegistered     = errors.New("Consumer instance not registered")
	// ErrPartitionClaimedByOther: the partition cannot be claimed because another instance already claimed it.
	ErrPartitionClaimedByOther   = errors.New("Cannot claim partition: it is already claimed by another instance")
	// ErrPartitionNotClaimed: the partition cannot be released because this instance does not hold the claim.
	ErrPartitionNotClaimed       = errors.New("Cannot release partition: it is not claimed by this instance")
)
View Source
// Sentinel errors concerning topic creation and deletion.
var (
	// ErrTopicExists: a topic with the requested name already exists.
	ErrTopicExists          = errors.New("Topic already exists")
	// ErrTopicMarkedForDelete: the topic has already been marked for deletion.
	ErrTopicMarkedForDelete = errors.New("Topic is already marked for deletion")
	// ErrDeletionTimedOut: waiting for a topic deletion to complete timed out.
	// Presumably returned by DeleteTopicSync when its timeout elapses — TODO confirm.
	ErrDeletionTimedOut     = errors.New("Timed out while waiting for a topic to be deleted")
)
View Source
// Sentinel errors describing invalid partition counts, replication factors,
// and replica assignments.
// NOTE(review): presumably produced while validating CreateTopic input — confirm against the implementation.
var (
	// ErrInvalidPartitionCount: the number of partitions must be larger than 0.
	ErrInvalidPartitionCount    = errors.New("Number of partitions must be larger than 0")
	// ErrInvalidReplicationFactor: the replication factor must lie between 1 and the number of brokers.
	ErrInvalidReplicationFactor = errors.New("Replication factor must be between 1 and the number of brokers")
	// ErrInvalidReplicaCount: every partition must have the same number of replicas.
	ErrInvalidReplicaCount      = errors.New("All partitions must have the same number of replicas")
	// ErrReplicaBrokerOverlap: the replicas of one partition must all live on different brokers.
	ErrReplicaBrokerOverlap     = errors.New("All replicas for a partition must be on separate brokers")
	// ErrInvalidBroker: a replica was assigned to an invalid broker.
	ErrInvalidBroker            = errors.New("Replica assigned to invalid broker")
	// ErrMissingPartitionID: partition IDs must be sequential, starting from 0.
	ErrMissingPartitionID       = errors.New("Partition ids must be sequential starting from 0")
	// ErrDuplicatePartitionID: each partition must have a unique ID.
	ErrDuplicatePartitionID     = errors.New("Each partition must have a unique ID")
)
View Source
var (
	// FailedToClaimPartition signals that a partition could not be claimed for
	// this consumer instance; the message suggests checking for a rogue consumer.
	// NOTE(review): unlike the package's other error variables this name lacks
	// the Err prefix; it is exported, so renaming it would break callers.
	FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
)

Functions

func BuildConnectionString

func BuildConnectionString(nodes []string) string

BuildConnectionString builds a Zookeeper connection string for a list of nodes. Returns a string like "zk1:2181,zk2:2181,zk3:2181"

func BuildConnectionStringWithChroot

func BuildConnectionStringWithChroot(nodes []string, chroot string) string

BuildConnectionStringWithChroot builds a Zookeeper connection string for a list of nodes and a chroot. The chroot should start with "/". Returns a string like "zk1:2181,zk2:2181,zk3:2181/chroot".

func ParseConnectionString

func ParseConnectionString(zookeeper string) (nodes []string, chroot string)

ParseConnectionString parses a zookeeper connection string in the form of host1:2181,host2:2181/chroot and returns the list of servers, and the chroot.

Types

type Config

type Config struct {
	// The chroot the Kafka installation is registered under. Defaults to "".
	Chroot string

	// The amount of time the Zookeeper client can be disconnected from the Zookeeper cluster
	// before the cluster will get rid of watches and ephemeral nodes. Defaults to 1 second.
	Timeout time.Duration

	// Logger is a zk.Logger implementation.
	// NOTE(review): presumably receives the Zookeeper client's log output — confirm against NewKazoo.
	Logger zk.Logger
}

Config holds the configuration values used when creating a Kazoo instance.

func NewConfig

func NewConfig() *Config

NewConfig instantiates a new Config struct with sane defaults.

type Consumergroup

type Consumergroup struct {
	// Name is the consumergroup's name; it is the value ConsumergroupList.Find matches on.
	Name string
	// contains filtered or unexported fields
}

Consumergroup represents a high-level consumer that is registered in Zookeeper.

func (*Consumergroup) CommitOffset

func (cg *Consumergroup) CommitOffset(topic string, partition int32, offset int64) error

CommitOffset commits an offset to a group/topic/partition

func (*Consumergroup) Create

func (cg *Consumergroup) Create() error

Create registers the consumergroup in zookeeper

func (*Consumergroup) Delete

func (cg *Consumergroup) Delete() error

Delete removes the consumergroup from zookeeper

func (*Consumergroup) Exists

func (cg *Consumergroup) Exists() (bool, error)

Exists checks whether the consumergroup has been registered in Zookeeper

func (*Consumergroup) FetchAllOffsets

func (cg *Consumergroup) FetchAllOffsets() (map[string]map[int32]int64, error)

FetchAllOffsets retrieves all the committed offsets for a group, as a map of topic name to partition ID to offset.

func (*Consumergroup) FetchOffset

func (cg *Consumergroup) FetchOffset(topic string, partition int32) (int64, error)

FetchOffset retrieves the committed offset for a group/topic/partition.

func (*Consumergroup) Instance

func (cg *Consumergroup) Instance(id string) *ConsumergroupInstance

Instance instantiates a new ConsumergroupInstance inside this consumer group, using an existing ID.

func (*Consumergroup) Instances

func (cg *Consumergroup) Instances() (ConsumergroupInstanceList, error)

Instances returns a list of all running instances inside this consumergroup.

func (*Consumergroup) NewInstance

func (cg *Consumergroup) NewInstance() *ConsumergroupInstance

NewInstance instantiates a new ConsumergroupInstance inside this consumer group, using a newly generated ID.

func (*Consumergroup) PartitionOwner

func (cg *Consumergroup) PartitionOwner(topic string, partition int32) (*ConsumergroupInstance, error)

PartitionOwner returns the ConsumergroupInstance that has claimed the given partition. This can be nil if nobody has claimed it yet.

func (*Consumergroup) ResetOffsets

func (cg *Consumergroup) ResetOffsets() error

func (*Consumergroup) Topics

func (cg *Consumergroup) Topics() (TopicList, error)

Topics retrieves the list of topics the consumergroup has claimed ownership of at some point.

func (*Consumergroup) WatchInstances

func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan zk.Event, error)

WatchInstances returns a ConsumergroupInstanceList, and a channel that will be closed as soon as the instance list changes.

func (*Consumergroup) WatchPartitionOwner

func (cg *Consumergroup) WatchPartitionOwner(topic string, partition int32) (*ConsumergroupInstance, <-chan zk.Event, error)

WatchPartitionOwner retrieves which instance currently owns the partition, and sets a Zookeeper watch to be notified of changes. If the partition currently does not have an owner, the function returns nil for every return value. In this case it should be safe to claim the partition for an instance.

type ConsumergroupInstance

type ConsumergroupInstance struct {
	// ID identifies the instance; it is the value ConsumergroupInstanceList.Find matches on.
	ID string
	// contains filtered or unexported fields
}

ConsumergroupInstance represents an instance of a Consumergroup.

func (*ConsumergroupInstance) ClaimPartition

func (cgi *ConsumergroupInstance) ClaimPartition(topic string, partition int32) error

ClaimPartition claims a topic/partition ownership for a consumer ID within a group. If the partition is already claimed by another running instance, it will return ErrPartitionClaimedByOther.

func (*ConsumergroupInstance) Deregister

func (cgi *ConsumergroupInstance) Deregister() error

Deregister removes the registration of the instance from zookeeper.

func (*ConsumergroupInstance) Register

func (cgi *ConsumergroupInstance) Register(topics []string) error

Register registers the consumergroup instance in Zookeeper.

func (*ConsumergroupInstance) RegisterWithSubscription

func (cgi *ConsumergroupInstance) RegisterWithSubscription(subscriptionJSON []byte) error

RegisterWithSubscription registers the consumer instance in Zookeeper, with its subscription.

func (*ConsumergroupInstance) Registered

func (cgi *ConsumergroupInstance) Registered() (bool, error)

Registered checks whether the consumergroup instance is registered in Zookeeper.

func (*ConsumergroupInstance) Registration

func (cgi *ConsumergroupInstance) Registration() (*Registration, error)

Registration returns the current registration of the consumer group instance.

func (*ConsumergroupInstance) ReleasePartition

func (cgi *ConsumergroupInstance) ReleasePartition(topic string, partition int32) error

ReleasePartition releases a claim to a partition.

func (*ConsumergroupInstance) UpdateRegistration

func (cgi *ConsumergroupInstance) UpdateRegistration(topics []string) error

UpdateRegistration updates a consumer group member registration. If the consumer group member has not been registered yet, then an error is returned.

func (*ConsumergroupInstance) WatchRegistration

func (cgi *ConsumergroupInstance) WatchRegistration() (*Registration, <-chan zk.Event, error)

WatchRegistration returns the current registration of the consumer group instance, and a channel that will be closed as soon as the registration changes.

type ConsumergroupInstanceList

type ConsumergroupInstanceList []*ConsumergroupInstance

ConsumergroupInstanceList implements the sortable interface on top of a consumer instance list

func (ConsumergroupInstanceList) Find

Find returns the consumergroup instance with the given ID if it exists in the list. Otherwise it will return `nil`.

func (ConsumergroupInstanceList) Len

func (cgil ConsumergroupInstanceList) Len() int

func (ConsumergroupInstanceList) Less

func (cgil ConsumergroupInstanceList) Less(i, j int) bool

func (ConsumergroupInstanceList) Swap

func (cgil ConsumergroupInstanceList) Swap(i, j int)

type ConsumergroupList

type ConsumergroupList []*Consumergroup

ConsumergroupList implements the sortable interface on top of a consumer group list

func (ConsumergroupList) Find

func (cgl ConsumergroupList) Find(name string) *Consumergroup

Find returns the consumergroup with the given name if it exists in the list. Otherwise it will return `nil`.

func (ConsumergroupList) Len

func (cgl ConsumergroupList) Len() int

func (ConsumergroupList) Less

func (cgl ConsumergroupList) Less(i, j int) bool

func (ConsumergroupList) Swap

func (cgl ConsumergroupList) Swap(i, j int)

type Kazoo

// Construct a Kazoo with NewKazoo or NewKazooFromConnectionString; the type
// exposes no public fields.
type Kazoo struct {
	// contains filtered or unexported fields
}

Kazoo interacts with the Kafka metadata in Zookeeper

func NewKazoo

func NewKazoo(servers []string, conf *Config) (*Kazoo, error)

NewKazoo creates a new connection instance

func NewKazooFromConnectionString

func NewKazooFromConnectionString(connectionString string, conf *Config) (*Kazoo, error)

NewKazooFromConnectionString creates a new connection instance based on a Zookeeper connection string that can include a chroot.

func (*Kazoo) BrokerList

func (kz *Kazoo) BrokerList() ([]string, error)

BrokerList returns a slice of broker addresses that can be used to connect to the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`.

func (*Kazoo) Brokers

func (kz *Kazoo) Brokers() (map[int32]string, error)

Brokers returns a map of all the brokers that are part of the Kafka cluster that is registered in Zookeeper.

func (*Kazoo) Close

func (kz *Kazoo) Close() error

Close closes the connection with the Zookeeper cluster

func (*Kazoo) Consumergroup

func (kz *Kazoo) Consumergroup(name string) *Consumergroup

Consumergroup instantiates a new consumergroup.

func (*Kazoo) Consumergroups

func (kz *Kazoo) Consumergroups() (ConsumergroupList, error)

Consumergroups returns all the registered consumergroups

func (*Kazoo) Controller

func (kz *Kazoo) Controller() (int32, error)

Controller returns what broker is currently acting as controller of the Kafka cluster

func (*Kazoo) CreateTopic

func (kz *Kazoo) CreateTopic(name string, partitionCount int, replicationFactor int, topicConfig map[string]string) error

CreateTopic creates a new kafka topic with the specified parameters and properties

func (*Kazoo) DeleteTopic

func (kz *Kazoo) DeleteTopic(name string) error

DeleteTopic marks a kafka topic for deletion. Deleting a topic is asynchronous and DeleteTopic will return before Kafka actually does the deletion.

func (*Kazoo) DeleteTopicSync

func (kz *Kazoo) DeleteTopicSync(name string, timeout time.Duration) error

DeleteTopicSync marks a kafka topic for deletion and waits until it is deleted before returning.

func (*Kazoo) Topic

func (kz *Kazoo) Topic(topic string) *Topic

Topic returns a Topic instance for a given topic name

func (*Kazoo) Topics

func (kz *Kazoo) Topics() (TopicList, error)

Topics returns a list of all registered Kafka topics.

func (*Kazoo) WatchTopics

func (kz *Kazoo) WatchTopics() (TopicList, <-chan zk.Event, error)

WatchTopics returns a list of all registered Kafka topics, and watches that list for changes.

type Partition

type Partition struct {
	// ID is the partition's numeric identifier.
	ID       int32
	// Replicas lists the partition's replicas.
	// NOTE(review): presumably broker IDs, matching the []int32 of broker IDs
	// returned by ISR — confirm.
	Replicas []int32
	// contains filtered or unexported fields
}

Partition interacts with Kafka's partition metadata in Zookeeper.

func (*Partition) ISR

func (p *Partition) ISR() ([]int32, error)

ISR returns the broker IDs of the current in-sync replica set for the partition

func (*Partition) Key

func (p *Partition) Key() string

Key returns a unique identifier for the partition, using the form "topic/partition".

func (*Partition) Leader

func (p *Partition) Leader() (int32, error)

Leader returns the broker ID of the broker that is currently the leader for the partition.

func (*Partition) PreferredReplica

func (p *Partition) PreferredReplica() int32

PreferredReplica returns the preferred replica for this partition.

func (*Partition) Topic

func (p *Partition) Topic() *Topic

Topic returns the Topic of this partition.

func (*Partition) UnderReplicated

func (p *Partition) UnderReplicated() (bool, error)

func (*Partition) UsesPreferredReplica

func (p *Partition) UsesPreferredReplica() (bool, error)

type PartitionList

type PartitionList []*Partition

PartitionList is a type that implements the sortable interface for a list of Partition instances

func (PartitionList) Len

func (pl PartitionList) Len() int

func (PartitionList) Less

func (pl PartitionList) Less(i, j int) bool

func (PartitionList) Swap

func (pl PartitionList) Swap(i, j int)

type RegPattern

// RegPattern is the string-typed value stored in Registration.Pattern
// (serialized under the JSON key "pattern").
type RegPattern string
// Known RegPattern values.
// NOTE(review): the precise subscription semantics of each pattern are not
// documented here — confirm before relying on them.
const (
	RegPatternStatic    RegPattern = "static"
	RegPatternWhiteList RegPattern = "white_list"
	RegPatternBlackList RegPattern = "black_list"
)

type RegVersion

// RegVersion is the integer-typed value stored in Registration.Version
// (serialized under the JSON key "version").
type RegVersion int
const (
	// RegDefaultVersion is registration version 1.
	// NOTE(review): presumably the version written for new registrations — confirm.
	RegDefaultVersion RegVersion = 1
)

type Registration

// Registration is the JSON-serializable registration record of a consumer
// group instance, as returned by ConsumergroupInstance.Registration and
// ConsumergroupInstance.WatchRegistration.
type Registration struct {
	// Pattern is a RegPattern value (see the RegPattern constants).
	Pattern      RegPattern     `json:"pattern"`
	// Subscription maps a string key to an int.
	// NOTE(review): presumably topic name -> number of consumer streams — confirm.
	Subscription map[string]int `json:"subscription"`
	// Timestamp of the registration.
	// NOTE(review): unit and epoch are not visible here — confirm before comparing values.
	Timestamp    int64          `json:"timestamp"`
	// Version is the registration format version (see RegDefaultVersion).
	Version      RegVersion     `json:"version"`
}

type Topic

type Topic struct {
	// Name is the topic's name; it is the value TopicList.Find matches on.
	Name string
	// contains filtered or unexported fields
}

Topic interacts with Kafka's topic metadata in Zookeeper.

func (*Topic) Config

func (t *Topic) Config() (map[string]string, error)

Config returns topic-level configuration settings as a map.

func (*Topic) Exists

func (t *Topic) Exists() (bool, error)

Exists returns true if the topic exists on the Kafka cluster.

func (*Topic) Partition

func (t *Topic) Partition(id int32, replicas []int32) *Partition

Partition returns a Partition instance for the topic.

func (*Topic) Partitions

func (t *Topic) Partitions() (PartitionList, error)

Partitions returns a list of all partitions for the topic.

func (*Topic) Watch

func (t *Topic) Watch() (<-chan zk.Event, error)

Watch watches the topic for changes.

func (*Topic) WatchPartitions

func (t *Topic) WatchPartitions() (PartitionList, <-chan zk.Event, error)

WatchPartitions returns a list of all partitions for the topic, and watches the topic for changes.

type TopicList

type TopicList []*Topic

TopicList is a type that implements the sortable interface for a list of Topic instances.

func (TopicList) Find

func (tl TopicList) Find(name string) *Topic

Find returns the topic with the given name if it exists in the topic list, and will return `nil` otherwise.

func (TopicList) Len

func (tl TopicList) Len() int

func (TopicList) Less

func (tl TopicList) Less(i, j int) bool

func (TopicList) Swap

func (tl TopicList) Swap(i, j int)

Directories

Path Synopsis
tools

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL