Documentation
¶
Index ¶
- Variables
- type Config
- type ConsumerGroup
- func (cg *ConsumerGroup) Close() error
- func (cg *ConsumerGroup) Closed() bool
- func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error
- func (cg *ConsumerGroup) Errors() <-chan error
- func (cg *ConsumerGroup) FlushOffsets() error
- func (cg *ConsumerGroup) InstanceRegistered() (bool, error)
- func (cg *ConsumerGroup) Logf(format string, args ...interface{})
- func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage
- type OffsetManager
- type OffsetManagerConfig
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
AlreadyClosing = errors.New("The consumer group is already shutting down.")
)
var (
UncleanClose = errors.New("Not all offsets were committed before shutdown was completed")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
*sarama.Config
Zookeeper *kazoo.Config
Offsets struct {
Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
CommitInterval time.Duration // The interval between which the processed offsets are commited.
ResetOffsets bool // Resets the offsets for the consumergroup so that it won't resume from where it left off previously.
}
}
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
The ConsumerGroup type holds all the information for a consumer that is part of a consumer group. Call JoinConsumerGroup to start a consumer.
Example ¶
consumer, consumerErr := JoinConsumerGroup(
"ExampleConsumerGroup",
[]string{TopicWithSinglePartition, TopicWithMultiplePartitions},
zookeeperPeers,
nil)
if consumerErr != nil {
log.Fatalln(consumerErr)
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
consumer.Close()
}()
eventCount := 0
for event := range consumer.Messages() {
// Process event
log.Println(string(event.Value))
eventCount += 1
// Ack event
consumer.CommitUpto(event)
}
log.Printf("Processed %d events.", eventCount)
func JoinConsumerGroup ¶
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error)
Connects to a consumer group, using Zookeeper for auto-discovery
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close() error
func (*ConsumerGroup) Closed ¶
func (cg *ConsumerGroup) Closed() bool
func (*ConsumerGroup) CommitUpto ¶
func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error
func (*ConsumerGroup) Errors ¶
func (cg *ConsumerGroup) Errors() <-chan error
Returns a channel that you can read to obtain events from Kafka to process.
func (*ConsumerGroup) FlushOffsets ¶
func (cg *ConsumerGroup) FlushOffsets() error
func (*ConsumerGroup) InstanceRegistered ¶
func (cg *ConsumerGroup) InstanceRegistered() (bool, error)
func (*ConsumerGroup) Logf ¶
func (cg *ConsumerGroup) Logf(format string, args ...interface{})
func (*ConsumerGroup) Messages ¶
func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage
Returns a channel that you can read to obtain events from Kafka to process.
type OffsetManager ¶
type OffsetManager interface {
// InitializePartition is called when the consumergroup is starting to consume a
// partition. It should return the last processed offset for this partition. Note:
// the same partition can be initialized multiple times during a single run of a
// consumer group due to other consumer instances coming online and offline.
InitializePartition(topic string, partition int32) (int64, error)
// MarkAsProcessed tells the offset manager than a certain message has been successfully
// processed by the consumer, and should be committed. The implementation does not have
// to store this offset right away, but should return true if it intends to do this at
// some point.
//
// Offsets should generally be increasing if the consumer
// processes events serially, but this cannot be guaranteed if the consumer does any
// asynchronous processing. This can be handled in various ways, e.g. by only accepting
// offsets that are higehr than the offsets seen before for the same partition.
MarkAsProcessed(topic string, partition int32, offset int64) bool
// Flush tells the offset manager to immediately commit offsets synchronously and to
// return any errors that may have occured during the process.
Flush() error
// FinalizePartition is called when the consumergroup is done consuming a
// partition. In this method, the offset manager can flush any remaining offsets to its
// backend store. It should return an error if it was not able to commit the offset.
// Note: it's possible that the consumergroup instance will start to consume the same
// partition again after this function is called.
FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error
// Close is called when the consumergroup is shutting down. In normal circumstances, all
// offsets are committed because FinalizePartition is called for all the running partition
// consumers. You may want to check for this to be true, and try to commit any outstanding
// offsets. If this doesn't succeed, it should return an error.
Close() error
}
OffsetManager is the main interface consumergroup requires to manage offsets of the consumergroup.
func NewZookeeperOffsetManager ¶
func NewZookeeperOffsetManager(cg *ConsumerGroup, config *OffsetManagerConfig) OffsetManager
NewZookeeperOffsetManager returns an offset manager that uses Zookeeper to store offsets.
type OffsetManagerConfig ¶
type OffsetManagerConfig struct {
CommitInterval time.Duration // Interval between offset flushes to the backend store.
VerboseLogging bool // Whether to enable verbose logging.
}
OffsetManagerConfig holds configuration setting son how the offset manager should behave.
func NewOffsetManagerConfig ¶
func NewOffsetManagerConfig() *OffsetManagerConfig
NewOffsetManagerConfig returns a new OffsetManagerConfig with sane defaults.