v0.2.7

Published: Apr 1, 2020 License: MIT Imports: 16 Imported by: 1


go-consumergroup

go-consumergroup is a Kafka consumer library written in Go, with support for rebalancing and chroot.

Chinese Doc


  • Apache Kafka 0.8.x, 0.9.x, 0.10.x, 0.11.x, 1.0.x


Getting started

  • API documentation and examples are available via godoc.
  • The example directory contains more elaborate example applications.

User Defined Logger

logger := logrus.New()
cg.SetLogger(logger) // cg is a *ConsumerGroup returned by NewConsumerGroup

Run Tests

$ make test

NOTE: docker-compose is required to run the tests.






type Config

type Config struct {
	Chroot string
	// ZkList is required, list of zookeeper addresses
	ZkList []string
	// Zookeeper session timeout, default is 6s
	ZkSessionTimeout time.Duration
	// GroupID is required, identifier that determines which ConsumerGroup to join
	GroupID string
	// ConsumerID is optional, identifier that marks the owner of a partition
	ConsumerID string
	// TopicList is required, topics that the ConsumerGroup will consume
	TopicList []string
	// SaramaConfig exposes the underlying Sarama config
	SaramaConfig *sarama.Config
	// Size of error channel, default is 1024
	ErrorChannelBufferSize int
	// Whether to auto commit offsets, default is true
	OffsetAutoCommitEnable bool
	// Offset auto commit interval, default is 10s
	OffsetAutoCommitInterval time.Duration
	// Where to start fetching messages when no offset is found, default is newest
	OffsetAutoReset int64
	// Claiming a partition gives up after ClaimPartitionRetryTimes (> 0) retries;
	// ClaimPartitionRetryTimes <= 0 retries until success or until a stop signal is received
	ClaimPartitionRetryTimes int
	// Retry interval when claiming a partition fails
	ClaimPartitionRetryInterval time.Duration
}

Config is used to pass multiple configuration options to ConsumerGroup's constructors.

func NewConfig

func NewConfig() *Config

NewConfig returns a new Config populated with default values.

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup consumes messages from Kafka with rebalancing support.

func NewConsumerGroup

func NewConsumerGroup(config *Config) (*ConsumerGroup, error)

NewConsumerGroup creates a ConsumerGroup instance from the given config.

func (*ConsumerGroup) CommitOffset

func (cg *ConsumerGroup) CommitOffset(topic string, partition int32, offset int64) error

CommitOffset commits an offset manually; use it when auto commit is disabled.

func (*ConsumerGroup) GetErrors added in v0.2.0

func (cg *ConsumerGroup) GetErrors(topic string) (<-chan *sarama.ConsumerError, bool)

GetErrors returns an unbuffered error channel for the specified topic.

func (*ConsumerGroup) GetMessages added in v0.2.0

func (cg *ConsumerGroup) GetMessages(topic string) (<-chan *sarama.ConsumerMessage, bool)

GetMessages returns an unbuffered message channel for the specified topic.
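Because GetMessages and GetErrors both hand back receive-only channels, a caller typically reads from them in a single select loop so that neither channel is left undrained. The sketch below shows that pattern with the standard library only. The message type is a stand-in for *sarama.ConsumerMessage, plain error values stand in for *sarama.ConsumerError, and consume and the stop channel are hypothetical. Whether the library closes its channels on shutdown is not stated in this documentation, so the handling of closed channels here is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a stand-in for *sarama.ConsumerMessage (hypothetical, trimmed).
type message struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
}

// consume is a hypothetical helper. It reads from both channels until the
// messages channel is closed or stop is closed, and returns how many
// messages were passed to handle.
func consume(messages <-chan *message, errs <-chan error, stop <-chan struct{}, handle func(*message)) int {
	handled := 0
	for {
		select {
		case msg, ok := <-messages:
			if !ok {
				return handled
			}
			handle(msg)
			handled++
		case err, ok := <-errs:
			if !ok {
				errs = nil // a nil channel is never selected again
				continue
			}
			fmt.Println("consume error:", err)
		case <-stop:
			return handled
		}
	}
}

func main() {
	messages := make(chan *message)
	errs := make(chan error)
	// A fake producer standing in for the consumer group.
	go func() {
		for i := int64(0); i < 3; i++ {
			messages <- &message{Topic: "demo", Offset: i}
		}
		errs <- errors.New("fetch failed")
		close(messages)
	}()
	n := consume(messages, errs, nil, func(m *message) {
		fmt.Printf("%s/%d@%d\n", m.Topic, m.Partition, m.Offset)
	})
	fmt.Println("handled:", n)
}
```

Reading errors in the same loop as messages avoids a situation where the producer side blocks on an error that nobody receives.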

func (*ConsumerGroup) GetOffsets added in v0.2.3

func (cg *ConsumerGroup) GetOffsets() map[string]interface{}

GetOffsets returns the offsets held in memory, for debugging.

func (*ConsumerGroup) IsStopped

func (cg *ConsumerGroup) IsStopped() bool

IsStopped reports whether the ConsumerGroup has been stopped.

func (*ConsumerGroup) OnClose added in v0.2.3

func (cg *ConsumerGroup) OnClose(cb func())

OnClose registers a callback function that runs before the ConsumerGroup ends.

func (*ConsumerGroup) OnLoad added in v0.2.3

func (cg *ConsumerGroup) OnLoad(cb func())

OnLoad registers a callback function that runs after startup.

func (*ConsumerGroup) Owners added in v0.2.3

func (cg *ConsumerGroup) Owners() map[string]map[int32]string

Owners returns the owners of all partitions.

func (*ConsumerGroup) SetLogger

func (cg *ConsumerGroup) SetLogger(l *logrus.Logger)

SetLogger sets a user-defined logger for the consumer group.

func (*ConsumerGroup) Start added in v0.2.2

func (cg *ConsumerGroup) Start() error

Start registers the ConsumerGroup, which triggers a rebalance. The ConsumerGroup computes which partitions it should consume based on the number of consumers, then starts fetching messages.

func (*ConsumerGroup) Stop added in v0.2.2

func (cg *ConsumerGroup) Stop()

Stop unregisters the ConsumerGroup, which triggers a rebalance. The partitions consumed by this ConsumerGroup are then assigned to other consumers.

