Documentation ¶
Overview ¶
Package cluster provides cluster extensions for Sarama, enabling users to consume topics across multiple, balanced nodes.
It requires Kafka v0.9+ and follows the step-by-step guide described in: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
Index ¶
- type Client
- type Config
- type Consumer
- func (c *Consumer) Close() (err error)
- func (c *Consumer) CommitOffsets() error
- func (c *Consumer) Errors() <-chan error
- func (c *Consumer) HighWaterMarks() map[string]map[int32]int64
- func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)
- func (c *Consumer) MarkOffsets(s *OffsetStash)
- func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
- func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage
- func (c *Consumer) Notifications() <-chan *Notification
- func (c *Consumer) Partitions() <-chan PartitionConsumer
- func (c *Consumer) RewindOffsets(topic string, resetTime int64) error
- func (c *Consumer) Subscriptions() map[string][]int32
- type ConsumerMode
- type Error
- type Notification
- type NotificationType
- type OffsetStash
- type PartitionConsumer
- type Strategy
Types ¶
type Config ¶
type Config struct {
	sarama.Config

	// Group is the namespace for group management properties
	Group struct {

		// The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
		PartitionStrategy Strategy

		// By default, messages and errors from the subscribed topics and partitions are all multiplexed and
		// made available through the consumer's Messages() and Errors() channels.
		//
		// Users who require low-level access can enable ConsumerModePartitions where individual partitions
		// are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions
		// themselves.
		Mode ConsumerMode

		Offsets struct {
			Retry struct {
				// The number of retries when committing offsets (defaults to 3).
				Max int
			}
			Synchronization struct {
				// The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance.
				// NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration.
				DwellTime time.Duration
			}
		}

		Session struct {
			// The allowed session timeout for registered consumers (defaults to 30s).
			// Must be within the allowed server range.
			Timeout time.Duration
		}

		Heartbeat struct {
			// Interval between each heartbeat (defaults to 3s). It should be no more
			// than 1/3rd of the Group.Session.Timeout setting.
			Interval time.Duration
		}

		// Return specifies which group channels will be populated. If they are set to true,
		// you must read from the respective channels to prevent deadlock.
		Return struct {
			// If enabled, rebalance notifications will be returned on the
			// Notifications channel (default disabled).
			Notifications bool
		}

		Topics struct {
			// An additional whitelist of topics to subscribe to.
			Whitelist *regexp.Regexp
			// An additional blacklist of topics to avoid. If set, this will take precedence over
			// the Whitelist setting.
			Blacklist *regexp.Regexp
		}

		Member struct {
			// Custom metadata to include when joining the group. The user data for all joined members
			// can be retrieved by sending a DescribeGroupRequest to the broker that is the
			// coordinator for the group.
			UserData []byte
		}
	}
}
Config extends sarama.Config with a Group-specific namespace.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a cluster group consumer
Example ¶
This example shows how to use the consumer to read messages from multiple topics through a multiplexed channel.
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// consume notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", ntf)
		}
	}()

	// consume messages, watch signals
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
			}
		case <-signals:
			return
		}
	}
}
func NewConsumer ¶
func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error)
NewConsumer initializes a new consumer
func (*Consumer) CommitOffsets ¶
func (c *Consumer) CommitOffsets() error
CommitOffsets manually commits marked offsets.
func (*Consumer) Errors ¶
func (c *Consumer) Errors() <-chan error
Errors returns a read channel of errors that occur during offset management, if enabled. By default, errors are logged and not returned over this channel. If you want to implement any custom error handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
func (*Consumer) HighWaterMarks ¶
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64
HighWaterMarks returns the current high water marks for each topic and partition. Consistency between partitions is not guaranteed since high water marks are updated separately.
func (*Consumer) MarkOffset ¶
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)
MarkOffset marks the provided message as processed, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.
Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice, and your processing should ideally be idempotent.
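One way to tolerate redelivery is to remember, per topic and partition, the next offset that still needs handling and to skip anything older. The sketch below is a standard-library illustration only; dedup, partitionKey, and shouldProcess are hypothetical names, and the state is kept in memory for brevity. A real application would need to persist this state together with its processing results for it to survive a crash.

```go
package main

import "fmt"

// partitionKey identifies a topic/partition pair.
type partitionKey struct {
	topic     string
	partition int32
}

// dedup remembers the next offset expected per partition so that a
// redelivered message can be skipped. Hypothetical helper, not part of
// the package.
type dedup struct {
	next map[partitionKey]int64
}

func newDedup() *dedup {
	return &dedup{next: make(map[partitionKey]int64)}
}

// shouldProcess returns false for offsets that were already handled and
// advances the expected offset otherwise.
func (d *dedup) shouldProcess(topic string, partition int32, offset int64) bool {
	key := partitionKey{topic: topic, partition: partition}
	if offset < d.next[key] {
		return false
	}
	d.next[key] = offset + 1
	return true
}

func main() {
	d := newDedup()
	// Offsets 0..2 arrive, then 1 and 2 are delivered again, then 3.
	for _, off := range []int64{0, 1, 2, 1, 2, 3} {
		fmt.Printf("offset %d process=%v\n", off, d.shouldProcess("my_topic", 0, off))
	}
}
```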
func (*Consumer) MarkOffsets ¶
func (c *Consumer) MarkOffsets(s *OffsetStash)
MarkOffsets marks stashed offsets as processed. See MarkOffset for additional explanation.
func (*Consumer) MarkPartitionOffset ¶
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
MarkPartitionOffset marks an offset of the provided topic/partition as processed. See MarkOffset for additional explanation.
func (*Consumer) Messages ¶
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage
Messages returns the read channel for the messages that are returned by the broker.
This channel will only return messages if the Config.Group.Mode option is set to ConsumerModeMultiplex (the default).
func (*Consumer) Notifications ¶
func (c *Consumer) Notifications() <-chan *Notification
Notifications returns a channel of Notifications that occur during consumer rebalancing. Notifications will only be emitted over this channel if your config's Group.Return.Notifications setting is true.
func (*Consumer) Partitions ¶
func (c *Consumer) Partitions() <-chan PartitionConsumer
Partitions returns the read channels for individual partitions of this broker.
This channel will only return partitions if the Config.Group.Mode option is set to ConsumerModePartitions.
The Partitions() channel must be listened to for the life of this consumer; when a rebalance happens old partitions will be closed (naturally come to completion) and new ones will be emitted. The returned channel will only close when the consumer is completely shut down.
Example ¶
This example shows how to use the consumer to read messages through individual partitions.
package main

import (
	"fmt"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	// init (custom) config, set mode to ConsumerModePartitions
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume partitions
	for {
		select {
		case part, ok := <-consumer.Partitions():
			if !ok {
				return
			}

			// start a separate goroutine to consume messages
			go func(pc cluster.PartitionConsumer) {
				for msg := range pc.Messages() {
					fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
					consumer.MarkOffset(msg, "") // mark message as processed
				}
			}(part)
		case <-signals:
			return
		}
	}
}
func (*Consumer) RewindOffsets ¶
func (c *Consumer) RewindOffsets(topic string, resetTime int64) error
RewindOffsets rewinds the offsets for this consumer group to the specified timestamp, for all partitions of the given topic to which this consumer is subscribed.
func (*Consumer) Subscriptions ¶
func (c *Consumer) Subscriptions() map[string][]int32
Subscriptions returns the consumed topics and partitions
type ConsumerMode ¶
type ConsumerMode uint8
const (
	ConsumerModeMultiplex ConsumerMode = iota
	ConsumerModePartitions
)
type Error ¶
type Error struct {
	Ctx string
	// contains filtered or unexported fields
}
Error instances are wrappers for internal errors with a context and may be returned through the consumer's Errors() channel
type Notification ¶
type Notification struct {
	// Type exposes the notification type
	Type NotificationType

	// Claimed contains topic/partitions that were claimed by this rebalance cycle
	Claimed map[string][]int32

	// Released contains topic/partitions that were released as part of this rebalance cycle
	Released map[string][]int32

	// Current are topic/partitions that are currently claimed to the consumer
	Current map[string][]int32
}
Notifications are state events emitted by the consumers on rebalance.
type NotificationType ¶
type NotificationType uint8
NotificationType defines the type of notification
const (
	UnknownNotification NotificationType = iota
	RebalanceStart
	RebalanceOK
	RebalanceError
)
func (NotificationType) String ¶
func (t NotificationType) String() string
String describes the notification type
type OffsetStash ¶
type OffsetStash struct {
// contains filtered or unexported fields
}
OffsetStash allows offsets to be accumulated and marked as processed in bulk.
func (*OffsetStash) MarkOffset ¶
func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string)
MarkOffset stashes the provided message offset
func (*OffsetStash) MarkPartitionOffset ¶
func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
MarkPartitionOffset stashes the offset for the provided topic/partition combination
func (*OffsetStash) Offsets ¶
func (s *OffsetStash) Offsets() map[string]int64
Offsets returns the latest stashed offsets by topic-partition
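To illustrate the accumulate-then-mark pattern, here is a simplified stand-in for a stash written with the standard library only. It is not the package's implementation: the stash type, its method names, the "topic-partition" key format, the rule that only the highest offset per partition is kept, and the emptying of the stash on flush are all assumptions made for this sketch.

```go
package main

import "fmt"

// stash is a simplified stand-in for OffsetStash, not the package's code.
type stash struct {
	offsets map[string]int64
}

func newStash() *stash {
	return &stash{offsets: make(map[string]int64)}
}

// mark records an offset for topic/partition, keeping only the highest
// value seen so far (an assumption of this sketch).
func (s *stash) mark(topic string, partition int32, offset int64) {
	key := fmt.Sprintf("%s-%d", topic, partition) // key format is an assumption
	if cur, ok := s.offsets[key]; !ok || offset > cur {
		s.offsets[key] = offset
	}
}

// flush hands the accumulated offsets to commit in one call and then
// empties the stash.
func (s *stash) flush(commit func(map[string]int64)) {
	commit(s.offsets)
	s.offsets = make(map[string]int64)
}

func main() {
	s := newStash()
	s.mark("my_topic", 0, 5)
	s.mark("my_topic", 0, 7)
	s.mark("my_topic", 0, 6) // lower than 7, so it is ignored
	s.mark("other_topic", 2, 41)

	s.flush(func(offsets map[string]int64) {
		fmt.Println("marking in bulk:", offsets)
	})
	fmt.Println("left in stash:", len(s.offsets))
}
```

With the real types, the equivalent flow would be to call MarkOffset on an OffsetStash for each message and pass the stash to Consumer.MarkOffsets at a convenient point, for example after a batch has been written downstream.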
type PartitionConsumer ¶
type PartitionConsumer interface {
	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown, drain
	// the Messages channel, harvest any errors & return them to the caller and trigger a rebalance.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *sarama.ConsumerMessage

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Topic returns the consumed topic name
	Topic() string

	// Partition returns the consumed partition
	Partition() int32
}
PartitionConsumer allows code to consume individual partitions from the cluster.
See docs for Consumer.Partitions() for more on how to implement this.
type Strategy ¶
type Strategy string
Strategy for partition-to-consumer assignment
const (
	// StrategyRange is the default and assigns partition ranges to consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 1, 2]
	//   C2: [3, 4, 5]
	StrategyRange Strategy = "range"

	// StrategyRoundRobin assigns partitions by alternating over consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 2, 4]
	//   C2: [1, 3, 5]
	StrategyRoundRobin Strategy = "roundrobin"
)