
README

go_kafka_client

Apache Kafka Client Library for Go


Prerequisites:

  1. Install Go: http://golang.org/doc/install
  2. Make sure the GOPATH and GOROOT environment variables are set and point to the correct locations
  3. Install GPM: https://github.com/pote/gpm
  4. go get github.com/stealthly/go_kafka_client && cd $GOPATH/src/github.com/stealthly/go_kafka_client
  5. gpm install

Optional (for all tests to work):

  1. Install Docker: https://docs.docker.com/installation/#installation
  2. cd $GOPATH/src/github.com/stealthly/go_kafka_client
  3. Build the Docker image: docker build -t stealthly/go_kafka_client .
  4. docker run -v $(pwd):/go_kafka_client stealthly/go_kafka_client

After this is done, you're ready to write some code!

For email support, see https://groups.google.com/forum/#!forum/kafka-clients

Documentation

Overview

Package go_kafka_client provides a high-level Kafka consumer implementation that takes a different approach than the Java/Scala high-level consumer.
Primary differences include:
- a workers concept that enforces at-least-once processing before committing offsets;
- an improved rebalancing algorithm that closes obsolete connections and opens new ones without stopping the whole consumer;
- graceful shutdown that notifies the client once it has finished;
- batch processing;
- static partition configuration, which lets a consumer start with a predefined set of partitions and never worry about rebalancing.
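A minimal sketch tying these pieces together (the group id, topic name, and Zookeeper address are illustrative assumptions, not defaults):

package main

import (
	"fmt"

	kafka "github.com/stealthly/go_kafka_client"
)

func main() {
	// Point the consumer at a Zookeeper-backed coordinator (address assumed).
	zkConfig := kafka.NewZookeeperConfig()
	zkConfig.ZookeeperConnect = []string{"localhost:2181"}

	config := kafka.DefaultConsumerConfig()
	config.Groupid = "example-group"
	config.Coordinator = kafka.NewZookeeperCoordinator(zkConfig)

	// Strategy is required: the consumer panics if it is not set. Returning a
	// successful result is what allows the message's offset to be committed.
	config.Strategy = func(_ *kafka.Worker, msg *kafka.Message, id kafka.TaskId) kafka.WorkerResult {
		fmt.Printf("%s/%d@%d: %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		return kafka.NewSuccessfulResult(id)
	}
	config.WorkerFailureCallback = func(_ *kafka.WorkerManager) kafka.FailedDecision {
		return kafka.DoNotCommitOffsetAndStop
	}
	config.WorkerFailedAttemptCallback = func(_ *kafka.Task, _ kafka.WorkerResult) kafka.FailedDecision {
		return kafka.CommitOffsetAndContinue
	}

	consumer := kafka.NewConsumer(config)
	go consumer.StartStatic(map[string]int{"example-topic": 1}) // one stream for one topic

	// ... application runs; when done, shut down gracefully:
	<-consumer.Close()
}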

Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

const (
	InvalidOffset int64 = -1

	//Reset the offset to the smallest offset if it is out of range
	SmallestOffset = "smallest"
	//Reset the offset to the largest offset if it is out of range
	LargestOffset = "largest"

	//Zookeeper offset storage configuration string
	ZookeeperOffsetStorage = "zookeeper"
	//Kafka offset storage configuration string
	KafkaOffsetStorage = "kafka"
)
const (
	/* Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
	and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of
	consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly
	divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1
	and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread
	will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be:
	p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1 */
	RangeStrategy = "range"

	/* The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It
	then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer
	instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
	will be within a delta of exactly one across all consumer threads.)

	(For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance
	and thread-id within that instance. Therefore, round-robin assignment is allowed only if:
	a) Every topic has the same number of streams within a consumer instance
	b) The set of subscribed topics is identical for every consumer instance within the group. */
	RoundRobinStrategy = "roundrobin"
)
const (
	Regular          = "Regular"
	NewTopicDeployed = "NewTopicDeployed"
)

Variables

This section is empty.

Functions

func CreateMultiplePartitionsTopic

func CreateMultiplePartitionsTopic(zk string, topicName string, numPartitions int)

Convenience utility to create a topic topicName with numPartitions partitions in the Zookeeper ensemble located at zk (format should be host:port). Please note that this requires the Apache Kafka 0.8.1 binary distribution to be available via the KAFKA_PATH environment variable.
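For example (assuming the package is imported as kafka, a local Zookeeper, and KAFKA_PATH pointing at a Kafka 0.8.1 distribution):

// Create "example-topic" with 4 partitions through the local Zookeeper.
kafka.CreateMultiplePartitionsTopic("localhost:2181", "example-topic", 4)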

func Critical

func Critical(tag interface{}, message interface{})

Writes a given message with a given tag to the log with level Critical.

func Criticalf

func Criticalf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to the given params and writes it with a given tag to the log with level Critical.

func Debug

func Debug(tag interface{}, message interface{})

Writes a given message with a given tag to the log with level Debug.

func Debugf

func Debugf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to the given params and writes it with a given tag to the log with level Debug.

func Error

func Error(tag interface{}, message interface{})

Writes a given message with a given tag to the log with level Error.

func Errorf

func Errorf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to the given params and writes it with a given tag to the log with level Error.

func Info

func Info(tag interface{}, message interface{})

Writes a given message with a given tag to the log with level Info.

func Infof

func Infof(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to the given params and writes it with a given tag to the log with level Info.

func LoadConfiguration

func LoadConfiguration(path string) (map[string]string, error)

Loads a configuration file at path into a map of key-value strings. TODO we need a file -> ConsumerConfig parser, not a file -> map one
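A usage sketch (assumes the package is imported as kafka and fmt is imported; the path and key name are illustrative and simply mirror the file contents):

props, err := kafka.LoadConfiguration("/etc/kafka-consumer.properties")
if err != nil {
	panic(err) // or handle the error appropriately
}
fmt.Println(props["groupid"]) // raw string values; no ConsumerConfig parsing yet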

func Trace

func Trace(tag interface{}, message interface{})

Writes a given message with a given tag to the log with level Trace.

func Tracef

func Tracef(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to the given params and writes it with a given tag to the log with level Trace.

func Warn

func Warn(tag interface{}, message interface{})

Writes a given message with a given tag to the log with level Warn.

func Warnf

func Warnf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to the given params and writes it with a given tag to the log with level Warn.
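All of these helpers share the same shape; a brief illustration (tag values are arbitrary, and the package is assumed imported as kafka):

kafka.Info("consumer", "consumer started")
kafka.Debugf("fetcher", "fetched %d messages from %s", 42, "example-topic")
kafka.Warn("worker", "processing is falling behind")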

Types

type BlackList

type BlackList struct {
	// contains filtered or unexported fields
}

BlackList is a topic filter that will match every topic that does not match a given regex

func NewBlackList

func NewBlackList(regex string) *BlackList

Creates a new BlackList topic filter for a given regex

type BrokerInfo

type BrokerInfo struct {
	Version int16
	Id      int32
	Host    string
	Port    uint32
}

General information about a Kafka broker. Kept in the consumer coordinator.

func (*BrokerInfo) String

func (b *BrokerInfo) String() string

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a high-level Kafka consumer designed to work within a consumer group. It subscribes to coordinator events and is able to balance load within a consumer group.

func NewConsumer

func NewConsumer(config *ConsumerConfig) *Consumer

NewConsumer creates a new Consumer with a given configuration. Creating a Consumer does not start fetching immediately.

func (*Consumer) Close

func (c *Consumer) Close() <-chan bool

Tells the Consumer to close all existing connections and stop. This method is non-blocking: it returns a channel that will receive a single value once closing has finished.
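A sketch of a blocking shutdown (consumer is an existing *Consumer):

done := consumer.Close() // returns immediately
<-done                   // blocks until all connections are closed and the consumer has stopped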

func (*Consumer) StartStatic

func (c *Consumer) StartStatic(topicCountMap map[string]int)

Starts consuming the specified topics using a configured number of goroutines for each topic.
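For example, to consume two topics with different stream counts (topic names are illustrative; the goroutine is optional if the caller has nothing else to do):

go consumer.StartStatic(map[string]int{
	"events": 2, // two goroutines consuming "events"
	"audit":  1, // one goroutine consuming "audit"
})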

func (*Consumer) StartStaticPartitions

func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32)

Starts consuming given topic-partitions using ConsumerConfig.NumConsumerFetchers goroutines for each topic.
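For example, to pin the consumer to fixed partitions (values illustrative):

go consumer.StartStaticPartitions(map[string][]int32{
	"events": {0, 1}, // only partitions 0 and 1; rebalancing never reassigns these
	"audit":  {3},
})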

func (*Consumer) StartWildcard

func (c *Consumer) StartWildcard(topicFilter TopicFilter, numStreams int)

Starts consuming all topics that match a given topicFilter using numStreams goroutines for each topic.
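For example, with a whitelist regex (pattern illustrative):

// Two goroutines per topic for every topic matching the regex.
go consumer.StartWildcard(kafka.NewWhiteList("^metrics-.*$"), 2)
// kafka.NewBlackList inverts the match: consume everything that does not match.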

func (*Consumer) String

func (c *Consumer) String() string

type ConsumerConfig

type ConsumerConfig struct {
	/* A string that uniquely identifies a set of consumers within the same consumer group */
	Groupid string

	/* A string that uniquely identifies a consumer within a group. Generated automatically if not set.
	Set this explicitly only for testing purposes. */
	Consumerid string

	/* The socket timeout for network requests. Its value should be at least FetchWaitMaxMs. */
	SocketTimeout time.Duration

	/* The maximum number of bytes to attempt to fetch */
	FetchMessageMaxBytes int32

	/* The number of goroutines used to fetch data */
	NumConsumerFetchers int

	/* Max number of message batches buffered for consumption, each batch can be up to FetchBatchSize */
	QueuedMaxMessages int32

	/* Max number of retries during rebalance */
	RebalanceMaxRetries int32

	/* The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
	FetchMinBytes int32

	/* The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy FetchMinBytes */
	FetchWaitMaxMs int32

	/* Backoff time between retries during rebalance */
	RebalanceBackoff time.Duration

	/* Backoff time to refresh the leader of a partition after it loses the current leader */
	RefreshLeaderBackoff time.Duration

	/* Retry the offset commit up to this many times on failure. */
	OffsetsCommitMaxRetries int

	/* Try to commit offsets every OffsetCommitInterval. If a previous offset commit for a partition is still in progress, updates the next offset to commit and continues.
	This way it commits only the highest offsets rather than the entire offset history when the coordinator is slow. */
	OffsetCommitInterval time.Duration

	/* Specify whether offsets should be committed to "zookeeper" (default) or "kafka". */
	OffsetsStorage string

	/* What to do if an offset is out of range.
	SmallestOffset : automatically reset the offset to the smallest offset.
	LargestOffset : automatically reset the offset to the largest offset.
	Defaults to LargestOffset. */
	AutoOffsetReset string

	/* Client id is specified by the kafka consumer client, used to distinguish different clients. */
	Clientid string

	/* Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
	ExcludeInternalTopics bool

	/* Select a strategy for assigning partitions to consumer streams. Possible values: RangeStrategy, RoundRobinStrategy */
	PartitionAssignmentStrategy string

	/* Amount of workers per partition to process consumed messages. */
	NumWorkers int

	/* Times to retry processing a failed message by a worker. */
	MaxWorkerRetries int

	/* Worker retry threshold. Increments each time a worker fails to process a message within MaxWorkerRetries.
	When this threshold is hit within a WorkerThresholdTimeWindow, WorkerFailureCallback is called, letting the user decide whether the consumer should stop. */
	WorkerRetryThreshold int32

	/* Resets WorkerRetryThreshold if it isn't hit within this period. */
	WorkerThresholdTimeWindow time.Duration

	/* Callback executed when WorkerRetryThreshold is exceeded within WorkerThresholdTimeWindow */
	WorkerFailureCallback FailedCallback

	/* Callback executed when a worker fails to process a message after MaxWorkerRetries and WorkerRetryThreshold has not been hit */
	WorkerFailedAttemptCallback FailedAttemptCallback

	/* Worker timeout to process a single message. */
	WorkerTaskTimeout time.Duration

	/* Backoff between worker attempts to process a single message. */
	WorkerBackoff time.Duration

	/* Maximum wait time to gracefully stop a worker manager */
	WorkerManagersStopTimeout time.Duration

	/* A function which defines a user-specified action on a single message. This function is responsible for actual message processing.
	Consumer panics if Strategy is not set. */
	Strategy WorkerStrategy

	/* Number of messages to accumulate before flushing them to workers */
	FetchBatchSize int

	/* Timeout to accumulate messages. Flushes accumulated batch to workers even if it is not yet full.
	Resets after each flush meaning this won't be triggered if FetchBatchSize is reached before timeout. */
	FetchBatchTimeout time.Duration

	/* Backoff between fetch requests if no messages were fetched from a previous fetch. */
	RequeueAskNextBackoff time.Duration

	/* Maximum fetch retries if no messages were fetched from a previous fetch */
	FetchMaxRetries int

	/* Maximum retries to fetch topic metadata from one broker. */
	FetchTopicMetadataRetries int

	/* Backoff for fetch topic metadata request if the previous request failed. */
	FetchTopicMetadataBackoff time.Duration

	/* Backoff between two fetch requests for one fetch routine. Needed to prevent fetcher from querying the broker too frequently. */
	FetchRequestBackoff time.Duration

	/* Coordinator used to coordinate consumer's actions, e.g. trigger rebalance events, store offsets and consumer metadata etc. */
	Coordinator ConsumerCoordinator

	/* Indicates whether the client supports blue-green deployment.
	This config entry is needed because blue-green deployment won't work with RoundRobin partition assignment strategy.
	Defaults to true. */
	BlueGreenDeploymentEnabled bool
}

ConsumerConfig defines configuration options for Consumer

func DefaultConsumerConfig

func DefaultConsumerConfig() *ConsumerConfig

DefaultConsumerConfig creates a ConsumerConfig with sane defaults. Note that several required config entries (like Strategy and callbacks) are still not set.
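A sketch of filling in some of the remaining entries (assumes the package is imported as kafka and the time package is imported; all values are illustrative, and Strategy plus the worker callbacks must still be provided as described in ConsumerConfig above):

config := kafka.DefaultConsumerConfig()
config.Groupid = "analytics-group"
config.AutoOffsetReset = kafka.SmallestOffset    // start from the earliest offset when none is committed
config.OffsetsStorage = kafka.KafkaOffsetStorage // commit offsets to Kafka instead of Zookeeper
config.FetchBatchSize = 500                      // hand batches of up to 500 messages to workers...
config.FetchBatchTimeout = 2 * time.Second       // ...or whatever accumulated after 2 seconds
config.NumWorkers = 8                            // workers per partition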

func (*ConsumerConfig) String

func (c *ConsumerConfig) String() string

func (*ConsumerConfig) Validate

func (c *ConsumerConfig) Validate() error

Validates this ConsumerConfig. Returns a corresponding error if the ConsumerConfig is invalid and nil otherwise.

type ConsumerCoordinator

type ConsumerCoordinator interface {
	/* Establishes a connection to this ConsumerCoordinator. Returns an error if it fails to connect, nil otherwise. */
	Connect() error

	/* Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Group in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise. */
	RegisterConsumer(Consumerid string, Group string, TopicCount TopicsToNumStreams) error

	/* Deregisters the consumer with Consumerid id that is a part of consumer group Group from this ConsumerCoordinator. Returns an error if deregistration failed, nil otherwise. */
	DeregisterConsumer(Consumerid string, Group string) error

	/* Gets the information about consumer with Consumerid id that is a part of consumer group Group from this ConsumerCoordinator.
	Returns ConsumerInfo on success and error otherwise (For example if consumer with given Consumerid does not exist). */
	GetConsumerInfo(Consumerid string, Group string) (*ConsumerInfo, error)

	/* Gets the information about consumers per topic in consumer group Group excluding internal topics (such as offsets) if ExcludeInternalTopics = true.
	Returns a map where keys are topic names and values are slices of consumer ids and fetcher ids associated with this topic and error on failure. */
	GetConsumersPerTopic(Group string, ExcludeInternalTopics bool) (map[string][]ConsumerThreadId, error)

	/* Gets the list of all consumer ids within a consumer group Group. Returns a slice containing all consumer ids in group and error on failure. */
	GetConsumersInGroup(Group string) ([]string, error)

	/* Gets the list of all topics registered in this ConsumerCoordinator. Returns a slice containing topic names and error on failure. */
	GetAllTopics() ([]string, error)

	/* Gets the information about existing partitions for the given Topics.
	Returns a map where keys are topic names and values are slices of partition ids associated with each topic, and error on failure. */
	GetPartitionsForTopics(Topics []string) (map[string][]int32, error)

	/* Gets the information about all Kafka brokers registered in this ConsumerCoordinator.
	Returns a slice of BrokerInfo and error on failure. */
	GetAllBrokers() ([]*BrokerInfo, error)

	/* Gets the offset for a given TopicPartition and consumer group Group.
	Returns offset on success, error otherwise. */
	GetOffsetForTopicPartition(Group string, TopicPartition *TopicAndPartition) (int64, error)

	/* Notifies the consumer group about a newly deployed topic, which should be consumed after the current one is exhausted */
	NotifyConsumerGroup(Group string, ConsumerId string) error

	/* Removes a notification notificationId for consumer group Group */
	PurgeNotificationForGroup(Group string, notificationId string) error

	/* Subscribes for any change that should trigger consumer rebalance on consumer group Group in this ConsumerCoordinator or trigger topic switch.
	Returns a read-only channel of CoordinatorEvent that will get values on any significant coordinator event (e.g. new consumer appeared, new broker appeared etc.) and error if failed to subscribe. */
	SubscribeForChanges(Group string) (<-chan CoordinatorEvent, error)

	GetNewDeployedTopics(Group string) (map[string]*DeployedTopics, error)

	/* Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with. */
	Unsubscribe()

	/* Tells the ConsumerCoordinator to claim topic Topic and partition Partition for the ConsumerThreadId fetcher that works within a consumer group Group.
	Returns true if the claim is successful, false and an error explaining the failure otherwise. */
	ClaimPartitionOwnership(Group string, Topic string, Partition int32, ConsumerThreadId ConsumerThreadId) (bool, error)

	/* Tells the ConsumerCoordinator to release partition ownership on topic Topic and partition Partition for consumer group Group.
	Returns an error if it failed to release partition ownership. */
	ReleasePartitionOwnership(Group string, Topic string, Partition int32) error

	/* Tells the ConsumerCoordinator to commit offset Offset for topic and partition TopicPartition for consumer group Group.
	Returns an error if the commit failed. */
	CommitOffset(Group string, TopicPartition *TopicAndPartition, Offset int64) error
}

ConsumerCoordinator is used to coordinate actions of multiple consumers within the same consumer group. It is responsible for keeping track of live consumers, managing their offsets, and assigning partitions to consume. The current default ConsumerCoordinator is ZookeeperCoordinator. More may be added in the future.

type ConsumerInfo

type ConsumerInfo struct {
	Version      int16
	Subscription map[string]int
	Pattern      string
	Timestamp    int64
}

General information about a Kafka consumer. Kept in the consumer coordinator.

func (*ConsumerInfo) String

func (c *ConsumerInfo) String() string

type ConsumerThreadId

type ConsumerThreadId struct {
	Consumer string
	ThreadId int
}

Consumer routine id. Used to keep track of which consumer routine consumes a particular topic-partition in the consumer coordinator.

func (*ConsumerThreadId) String

func (c *ConsumerThreadId) String() string

type CoordinatorEvent

type CoordinatorEvent string

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

Default implementation of KafkaLogger interface used in this client.

func NewDefaultLogger

func NewDefaultLogger(Level LogLevel) *DefaultLogger

Creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.

func (*DefaultLogger) Critical

func (dl *DefaultLogger) Critical(message string, params ...interface{})

func (*DefaultLogger) Debug

func (dl *DefaultLogger) Debug(message string, params ...interface{})

func (*DefaultLogger) Error

func (dl *DefaultLogger) Error(message string, params ...interface{})

func (*DefaultLogger) Info

func (dl *DefaultLogger) Info(message string, params ...interface{})

func (*DefaultLogger) Trace

func (dl *DefaultLogger) Trace(message string, params ...interface{})

func (*DefaultLogger) Warn

func (dl *DefaultLogger) Warn(message string, params ...interface{})

type DeployedTopics

type DeployedTopics struct {
	//comma separated list of topics to consume from
	Topics string
	//either black_list, white_list or static
	Pattern string
}

type FailedAttemptCallback

type FailedAttemptCallback func(*Task, WorkerResult) FailedDecision

type FailedCallback

type FailedCallback func(*WorkerManager) FailedDecision

type FailedDecision

type FailedDecision int32
const (
	CommitOffsetAndContinue FailedDecision = iota
	DoNotCommitOffsetAndContinue
	CommitOffsetAndStop
	DoNotCommitOffsetAndStop
)
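A sketch of the two failure callbacks and the decisions they might return (config is an existing *ConsumerConfig; the choices shown are one reasonable policy, not the only one):

config.WorkerFailureCallback = func(wm *kafka.WorkerManager) kafka.FailedDecision {
	// The retry threshold was hit: stop without committing so the
	// unprocessed messages are reprocessed after a restart.
	return kafka.DoNotCommitOffsetAndStop
}
config.WorkerFailedAttemptCallback = func(task *kafka.Task, result kafka.WorkerResult) kafka.FailedDecision {
	// A single message exhausted MaxWorkerRetries: skip it and keep going.
	return kafka.CommitOffsetAndContinue
}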

type FailureCounter

type FailureCounter struct {
	// contains filtered or unexported fields
}

func NewFailureCounter

func NewFailureCounter(FailedThreshold int32, WorkerThresholdTimeWindow time.Duration) *FailureCounter

func (*FailureCounter) Failed

func (f *FailureCounter) Failed() bool

type KafkaLogger

type KafkaLogger interface {
	//Formats a given message according to given params to log with level Trace.
	Trace(message string, params ...interface{})

	//Formats a given message according to given params to log with level Debug.
	Debug(message string, params ...interface{})

	//Formats a given message according to given params to log with level Info.
	Info(message string, params ...interface{})

	//Formats a given message according to given params to log with level Warn.
	Warn(message string, params ...interface{})

	//Formats a given message according to given params to log with level Error.
	Error(message string, params ...interface{})

	//Formats a given message according to given params to log with level Critical.
	Critical(message string, params ...interface{})
}

Logger interface. Lets you plug in a custom logging library instead of using the built-in one.

Logger used by this client. Defaults to the built-in logger with Info log level.
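A sketch of a custom implementation backed by the standard library log package; this only demonstrates satisfying the interface, since this page does not document how the client's logger is installed:

import "log"

// consoleLogger adapts the standard library logger to the KafkaLogger interface.
type consoleLogger struct{}

func (consoleLogger) Trace(message string, params ...interface{})    { log.Printf("TRACE "+message, params...) }
func (consoleLogger) Debug(message string, params ...interface{})    { log.Printf("DEBUG "+message, params...) }
func (consoleLogger) Info(message string, params ...interface{})     { log.Printf("INFO "+message, params...) }
func (consoleLogger) Warn(message string, params ...interface{})     { log.Printf("WARN "+message, params...) }
func (consoleLogger) Error(message string, params ...interface{})    { log.Printf("ERROR "+message, params...) }
func (consoleLogger) Critical(message string, params ...interface{}) { log.Printf("CRITICAL "+message, params...) }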

type LogLevel

type LogLevel string

Represents a logging level

const (
	//Use TraceLevel for debugging to find problems in functions, variables etc.
	TraceLevel LogLevel = "trace"

	//Use DebugLevel for detailed system reports and diagnostic messages.
	DebugLevel LogLevel = "debug"

	//Use InfoLevel for general information about a running application.
	InfoLevel LogLevel = "info"

	//Use WarnLevel to indicate small errors and failures that should not happen normally but are recovered automatically.
	WarnLevel LogLevel = "warn"

	//Use ErrorLevel to indicate severe errors that affect application workflow and are not handled automatically.
	ErrorLevel LogLevel = "error"

	//Use CriticalLevel to indicate fatal errors that may cause data corruption or loss.
	CriticalLevel LogLevel = "critical"
)

type Message

type Message struct {
	Key       []byte
	Value     []byte
	Topic     string
	Partition int32
	Offset    int64
}

A single Kafka message that is sent to the user-defined Strategy.

func (*Message) String

func (m *Message) String() string

type ProcessingFailedResult

type ProcessingFailedResult struct {
	// contains filtered or unexported fields
}

func NewProcessingFailedResult

func NewProcessingFailedResult(id TaskId) *ProcessingFailedResult

func (*ProcessingFailedResult) Id

func (wr *ProcessingFailedResult) Id() TaskId

func (*ProcessingFailedResult) String

func (sr *ProcessingFailedResult) String() string

func (*ProcessingFailedResult) Success

func (wr *ProcessingFailedResult) Success() bool

type StaticTopicsToNumStreams

type StaticTopicsToNumStreams struct {
	ConsumerId            string
	TopicsToNumStreamsMap map[string]int
}

TopicsToNumStreams implementation representing a static consumer subscription.

func (*StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic

func (tc *StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId

Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.

func (*StaticTopicsToNumStreams) GetTopicsToNumStreamsMap

func (tc *StaticTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int

Creates a map describing consumer subscription where keys are topic names and values are the number of fetchers used to fetch these topics.

func (*StaticTopicsToNumStreams) Pattern

func (tc *StaticTopicsToNumStreams) Pattern() string

Returns a pattern describing this TopicsToNumStreams.

type SuccessfulResult

type SuccessfulResult struct {
	// contains filtered or unexported fields
}

func NewSuccessfulResult

func NewSuccessfulResult(id TaskId) *SuccessfulResult

func (*SuccessfulResult) Id

func (wr *SuccessfulResult) Id() TaskId

func (*SuccessfulResult) String

func (sr *SuccessfulResult) String() string

func (*SuccessfulResult) Success

func (wr *SuccessfulResult) Success() bool

type Task

type Task struct {
	Msg     *Message
	Retries int
	Callee  *Worker
}

func (*Task) Id

func (t *Task) Id() TaskId

type TaskId

type TaskId struct {
	TopicPartition TopicAndPartition
	Offset         int64
}

func (TaskId) String

func (tid TaskId) String() string

type TestKafkaCluster

type TestKafkaCluster struct {
	Path    string
	Servers []*TestKafkaServer
}

func StartTestKafkaCluster

func StartTestKafkaCluster(size int, zookeeperPort int) (*TestKafkaCluster, error)

func (*TestKafkaCluster) Stop

func (c *TestKafkaCluster) Stop()

type TestKafkaServer

type TestKafkaServer struct {
	Host string
	Port int
	Path string
}

func (*TestKafkaServer) Addr

func (k *TestKafkaServer) Addr() string

type TimedOutResult

type TimedOutResult struct {
	// contains filtered or unexported fields
}

func (*TimedOutResult) Id

func (wr *TimedOutResult) Id() TaskId

func (*TimedOutResult) String

func (sr *TimedOutResult) String() string

func (*TimedOutResult) Success

func (wr *TimedOutResult) Success() bool

type TopicAndPartition

type TopicAndPartition struct {
	Topic     string
	Partition int32
}

Type representing a single Kafka topic and partition

func (*TopicAndPartition) String

func (tp *TopicAndPartition) String() string

type TopicFilter

type TopicFilter interface {
	// contains filtered or unexported methods
}

Either a WhiteList or BlackList consumer topic filter.

type TopicInfo

type TopicInfo struct {
	Version    int16
	Partitions map[string][]int32
}

General information about a Kafka topic. Kept in the consumer coordinator.

func (*TopicInfo) String

func (t *TopicInfo) String() string

type TopicPartitionData

type TopicPartitionData struct {
	TopicPartition TopicAndPartition
	Data           *sarama.FetchResponseBlock
}

Fetched data from Kafka broker for a particular topic and partition

type TopicSwitch

type TopicSwitch struct {
	ConsumerId            string
	DesiredPattern        string
	TopicsToNumStreamsMap map[string]int
}

TODO ???

func (*TopicSwitch) GetConsumerThreadIdsPerTopic

func (tc *TopicSwitch) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId

func (*TopicSwitch) GetTopicsToNumStreamsMap

func (tc *TopicSwitch) GetTopicsToNumStreamsMap() map[string]int

func (*TopicSwitch) Pattern

func (tc *TopicSwitch) Pattern() string

type TopicsToNumStreams

type TopicsToNumStreams interface {
	//Creates a map describing consumer subscription where keys are topic names and values are the number of fetchers used to fetch these topics.
	GetTopicsToNumStreamsMap() map[string]int

	//Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
	GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId

	//Returns a pattern describing this TopicsToNumStreams.
	Pattern() string
}

Information on a Consumer subscription. Kept in the consumer coordinator.

func NewStaticTopicsToNumStreams

func NewStaticTopicsToNumStreams(consumerId string,
	topics string,
	pattern string,
	numStreams int,
	excludeInternalTopics bool,
	coordinator ConsumerCoordinator) TopicsToNumStreams

func NewTopicsToNumStreams

func NewTopicsToNumStreams(Groupid string, Consumerid string, Coordinator ConsumerCoordinator, ExcludeInternalTopics bool) (TopicsToNumStreams, error)

Constructs a new TopicsToNumStreams for the consumer with Consumerid id that works within consumer group Groupid. Uses Coordinator to get consumer information. Returns an error if it fails to retrieve consumer information from the Coordinator.

type WhiteList

type WhiteList struct {
	// contains filtered or unexported fields
}

WhiteList is a topic filter that matches every topic satisfying a given regex.

func NewWhiteList

func NewWhiteList(regex string) *WhiteList

Creates a new WhiteList topic filter for a given regex

type WildcardTopicsToNumStreams

type WildcardTopicsToNumStreams struct {
	Coordinator           ConsumerCoordinator
	ConsumerId            string
	TopicFilter           TopicFilter
	NumStreams            int
	ExcludeInternalTopics bool
}

TopicsToNumStreams implementation representing either whitelist or blacklist consumer subscription.

func (*WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic

func (tc *WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId

Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.

func (*WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap

func (tc *WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int

Creates a map describing consumer subscription where keys are topic names and values are the number of fetchers used to fetch these topics.

func (*WildcardTopicsToNumStreams) Pattern

func (tc *WildcardTopicsToNumStreams) Pattern() string

Returns a pattern describing this TopicsToNumStreams.

type Worker

type Worker struct {
	OutputChannel chan WorkerResult
	TaskTimeout   time.Duration
	Closed        bool
}

func (*Worker) Start

func (w *Worker) Start(task *Task, strategy WorkerStrategy)

func (*Worker) String

func (w *Worker) String() string

type WorkerManager

type WorkerManager struct {
	Id                string
	Config            *ConsumerConfig
	Strategy          WorkerStrategy
	FailureHook       FailedCallback
	FailedAttemptHook FailedAttemptCallback
	Workers           []*Worker
	AvailableWorkers  chan *Worker
	CurrentBatch      map[TaskId]*Task //TODO inspect for race conditions
	InputChannel      chan []*Message
	TopicPartition    TopicAndPartition
	LargestOffset     int64

	FailCounter *FailureCounter
	// contains filtered or unexported fields
}

func NewWorkerManager

func NewWorkerManager(id string, config *ConsumerConfig, topicPartition TopicAndPartition,
	wmsIdleTimer metrics.Timer, batchDurationTimer metrics.Timer, activeWorkersCounter metrics.Counter,
	pendingWMsTasksCounter metrics.Counter) *WorkerManager

func (*WorkerManager) IsBatchProcessed

func (wm *WorkerManager) IsBatchProcessed() bool

func (*WorkerManager) Start

func (wm *WorkerManager) Start()

func (*WorkerManager) Stop

func (wm *WorkerManager) Stop() chan bool

func (*WorkerManager) String

func (wm *WorkerManager) String() string

type WorkerResult

type WorkerResult interface {
	Id() TaskId
	Success() bool
}

type WorkerStrategy

type WorkerStrategy func(*Worker, *Message, TaskId) WorkerResult
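A sketch of a strategy that treats processing errors as retryable failures (handleRecord is a hypothetical application function, stubbed here):

// handleRecord stands in for application-specific processing.
func handleRecord(key, value []byte) error { return nil }

var strategy kafka.WorkerStrategy = func(_ *kafka.Worker, msg *kafka.Message, id kafka.TaskId) kafka.WorkerResult {
	if err := handleRecord(msg.Key, msg.Value); err != nil {
		return kafka.NewProcessingFailedResult(id) // the worker may retry up to MaxWorkerRetries times
	}
	return kafka.NewSuccessfulResult(id)
}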

type ZookeeperConfig

type ZookeeperConfig struct {
	/* Zookeeper hosts */
	ZookeeperConnect []string

	/* Zookeeper read timeout */
	ZookeeperTimeout time.Duration

	/* Max retries to claim one partition */
	MaxClaimPartitionRetries int

	/* Backoff to retry to claim partition */
	ClaimPartitionBackoff time.Duration
}

ZookeeperConfig is used to pass multiple configuration entries to ZookeeperCoordinator.

func NewZookeeperConfig

func NewZookeeperConfig() *ZookeeperConfig

Creates a new ZookeeperConfig with sane defaults. Default ZookeeperConnect points to localhost.

type ZookeeperCoordinator

type ZookeeperCoordinator struct {
	// contains filtered or unexported fields
}

ZookeeperCoordinator implements ConsumerCoordinator interface and is used to coordinate multiple consumers that work within the same consumer group.

func NewZookeeperCoordinator

func NewZookeeperCoordinator(Config *ZookeeperConfig) *ZookeeperCoordinator

Creates a new ZookeeperCoordinator with a given configuration.
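A wiring sketch (hosts and timings are illustrative; assumes the package is imported as kafka and the time package is imported):

zkConfig := kafka.NewZookeeperConfig()
zkConfig.ZookeeperConnect = []string{"zk1:2181", "zk2:2181", "zk3:2181"} // assumed ensemble
zkConfig.ZookeeperTimeout = 5 * time.Second
zkConfig.MaxClaimPartitionRetries = 5
zkConfig.ClaimPartitionBackoff = 250 * time.Millisecond

config := kafka.DefaultConsumerConfig()
config.Coordinator = kafka.NewZookeeperCoordinator(zkConfig)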

func (*ZookeeperCoordinator) ClaimPartitionOwnership

func (zc *ZookeeperCoordinator) ClaimPartitionOwnership(Groupid string, Topic string, Partition int32, consumerThreadId ConsumerThreadId) (bool, error)

Tells the ConsumerCoordinator to claim topic Topic and partition Partition for the consumerThreadId fetcher that works within consumer group Groupid. Returns true if the claim is successful, false and an error explaining the failure otherwise.

func (*ZookeeperCoordinator) CommitOffset

func (zc *ZookeeperCoordinator) CommitOffset(Groupid string, TopicPartition *TopicAndPartition, Offset int64) error

Tells the ConsumerCoordinator to commit offset Offset for topic and partition TopicPartition for consumer group Groupid. Returns an error if the commit failed.

func (*ZookeeperCoordinator) Connect

func (zc *ZookeeperCoordinator) Connect() error

Establishes a connection to this ConsumerCoordinator. Returns an error if it fails to connect, nil otherwise.

func (*ZookeeperCoordinator) DeployTopics

func (zc *ZookeeperCoordinator) DeployTopics(Group string, Topics DeployedTopics) error

func (*ZookeeperCoordinator) DeregisterConsumer

func (zc *ZookeeperCoordinator) DeregisterConsumer(Consumerid string, Groupid string) error

Deregisters the consumer with Consumerid id that is a part of consumer group Groupid from this ConsumerCoordinator. Returns an error if deregistration failed, nil otherwise.

func (*ZookeeperCoordinator) GetAllBrokers

func (zc *ZookeeperCoordinator) GetAllBrokers() ([]*BrokerInfo, error)

Gets the information about all Kafka brokers registered in this ConsumerCoordinator. Returns a slice of BrokerInfo and error on failure.

func (*ZookeeperCoordinator) GetAllTopics

func (zc *ZookeeperCoordinator) GetAllTopics() ([]string, error)

Gets the list of all topics registered in this ConsumerCoordinator. Returns a slice containing topic names and error on failure.

func (*ZookeeperCoordinator) GetConsumerInfo

func (zc *ZookeeperCoordinator) GetConsumerInfo(Consumerid string, Groupid string) (*ConsumerInfo, error)

Gets the information about the consumer with Consumerid id that is a part of consumer group Groupid from this ConsumerCoordinator. Returns ConsumerInfo on success and an error otherwise (for example, if a consumer with the given Consumerid does not exist).

func (*ZookeeperCoordinator) GetConsumersInGroup

func (zc *ZookeeperCoordinator) GetConsumersInGroup(Groupid string) ([]string, error)

Gets the list of all consumer ids within a consumer group Groupid. Returns a slice containing all consumer ids in group and error on failure.

func (*ZookeeperCoordinator) GetConsumersPerTopic

func (zc *ZookeeperCoordinator) GetConsumersPerTopic(Groupid string, ExcludeInternalTopics bool) (map[string][]ConsumerThreadId, error)

Gets the information about consumers per topic in consumer group Groupid excluding internal topics (such as offsets) if ExcludeInternalTopics = true. Returns a map where keys are topic names and values are slices of consumer ids and fetcher ids associated with this topic and error on failure.

func (*ZookeeperCoordinator) GetNewDeployedTopics

func (zc *ZookeeperCoordinator) GetNewDeployedTopics(Group string) (map[string]*DeployedTopics, error)

func (*ZookeeperCoordinator) GetOffsetForTopicPartition

func (zc *ZookeeperCoordinator) GetOffsetForTopicPartition(Groupid string, TopicPartition *TopicAndPartition) (int64, error)

Gets the offset for a given TopicPartition and consumer group Groupid. Returns offset on success, error otherwise.

func (*ZookeeperCoordinator) GetPartitionsForTopics

func (zc *ZookeeperCoordinator) GetPartitionsForTopics(Topics []string) (map[string][]int32, error)

Gets the information about existing partitions for the given Topics. Returns a map where keys are topic names and values are slices of partition ids associated with each topic, and error on failure.

func (*ZookeeperCoordinator) NotifyConsumerGroup

func (zc *ZookeeperCoordinator) NotifyConsumerGroup(Groupid string, ConsumerId string) error

Notifies the consumer group about a newly deployed topic, which should be consumed after the current one is exhausted.

func (*ZookeeperCoordinator) PurgeNotificationForGroup

func (zc *ZookeeperCoordinator) PurgeNotificationForGroup(Groupid string, notificationId string) error

Removes a notification notificationId for consumer group Group

func (*ZookeeperCoordinator) RegisterConsumer

func (zc *ZookeeperCoordinator) RegisterConsumer(Consumerid string, Groupid string, TopicCount TopicsToNumStreams) error

Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Groupid in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise.

func (*ZookeeperCoordinator) ReleasePartitionOwnership

func (zc *ZookeeperCoordinator) ReleasePartitionOwnership(Groupid string, Topic string, Partition int32) error

Tells the ConsumerCoordinator to release partition ownership on topic Topic and partition Partition for consumer group Groupid. Returns an error if it failed to release partition ownership.

func (*ZookeeperCoordinator) String

func (zc *ZookeeperCoordinator) String() string

func (*ZookeeperCoordinator) SubscribeForChanges

func (zc *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (<-chan CoordinatorEvent, error)

Subscribes for any change that should trigger a consumer rebalance on consumer group Groupid in this ConsumerCoordinator. Returns a read-only channel of CoordinatorEvent that will get values on any significant coordinator event (e.g. a new consumer appeared, a new broker appeared, etc.) and an error if it failed to subscribe.

func (*ZookeeperCoordinator) Unsubscribe

func (zc *ZookeeperCoordinator) Unsubscribe()

Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with.
