Documentation

Constants

This section is empty.

Variables

var (
	AlreadyClosing = errors.New("The consumer group is already shutting down.")
)

var (
	UncleanClose = errors.New("Not all offsets were committed before shutdown was completed")
)
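
Both variables are sentinel errors, so a caller can compare against them instead of matching on the message text. The following is a minimal sketch of that pattern, not code from the package: the two variables are redeclared locally so the snippet compiles on its own, and `shutdown` and `describe` are hypothetical helpers (this page does not say which calls return these errors).

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors, so the sketch is self-contained.
var (
	AlreadyClosing = errors.New("The consumer group is already shutting down.")
	UncleanClose   = errors.New("Not all offsets were committed before shutdown was completed")
)

// shutdown is a hypothetical stand-in for a call that may return a sentinel error.
func shutdown(pendingOffsets int) error {
	if pendingOffsets > 0 {
		// Wrapping with %w keeps the sentinel discoverable through errors.Is.
		return fmt.Errorf("shutdown: %w", UncleanClose)
	}
	return nil
}

// describe maps an error to a short description of what it means for the caller.
func describe(err error) string {
	switch {
	case err == nil:
		return "clean"
	case errors.Is(err, UncleanClose):
		return "some messages may be redelivered"
	case errors.Is(err, AlreadyClosing):
		return "shutdown already in progress"
	default:
		return "unexpected: " + err.Error()
	}
}

func main() {
	fmt.Println(describe(shutdown(0)))
	fmt.Println(describe(shutdown(3)))
	fmt.Println(describe(AlreadyClosing))
}
```

Using `errors.Is` rather than `==` means the check still works if an intermediate layer wraps the error.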

Functions

This section is empty.

Types

type Config

type Config struct {
	*sarama.Config

	Zookeeper *kazoo.Config

	Offsets struct {
		Initial           int64         // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
		ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
		CommitInterval    time.Duration // The interval at which processed offsets are committed.
		ResetOffsets      bool          // Resets the offsets for the consumergroup so that it won't resume from where it left off previously.
	}
}
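
The field comments on `Offsets` imply a few constraints. The sketch below shows one way to check them; it is an illustration under stated assumptions, not a copy of what `Validate` does. The `offsets` type is a local mirror of the documented fields, the two constants carry the values of `sarama.OffsetNewest` (-1) and `sarama.OffsetOldest` (-2), and the positivity checks and the 10-second commit interval are assumptions, since this page states no default for `CommitInterval`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	offsetNewest int64 = -1 // value of sarama.OffsetNewest
	offsetOldest int64 = -2 // value of sarama.OffsetOldest
)

// offsets mirrors the documented Config.Offsets fields (local stand-in).
type offsets struct {
	Initial           int64
	ProcessingTimeout time.Duration
	CommitInterval    time.Duration
	ResetOffsets      bool
}

// defaultOffsets returns the documented defaults where the page states them.
func defaultOffsets() offsets {
	return offsets{
		Initial:           offsetOldest,     // documented default
		ProcessingTimeout: time.Minute,      // documented default
		CommitInterval:    10 * time.Second, // assumption: default not stated on this page
	}
}

// validate checks the constraints implied by the field comments (assumed rules).
func (o offsets) validate() error {
	if o.Initial != offsetOldest && o.Initial != offsetNewest {
		return errors.New("Offsets.Initial must be OffsetOldest or OffsetNewest")
	}
	if o.ProcessingTimeout <= 0 {
		return errors.New("Offsets.ProcessingTimeout must be positive")
	}
	if o.CommitInterval <= 0 {
		return errors.New("Offsets.CommitInterval must be positive")
	}
	return nil
}

func main() {
	cfg := defaultOffsets()
	fmt.Println(cfg.validate())

	cfg.Initial = 42 // an absolute offset is not one of the two accepted values
	fmt.Println(cfg.validate())
}
```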

func NewConfig

func NewConfig() *Config

func (*Config) Validate

func (cgc *Config) Validate() error

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

The ConsumerGroup type holds all the information for a consumer that is part of a consumer group. Call JoinConsumerGroup to start a consumer.


func JoinConsumerGroup

func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error)

Connects to a consumer group, using Zookeeper for auto-discovery.

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close() error

func (*ConsumerGroup) Closed

func (cg *ConsumerGroup) Closed() bool

func (*ConsumerGroup) CommitUpto

func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error

func (*ConsumerGroup) Errors

func (cg *ConsumerGroup) Errors() <-chan error

Returns a channel that you can read to obtain errors from the consumer group.

func (*ConsumerGroup) FlushOffsets

func (cg *ConsumerGroup) FlushOffsets() error

func (*ConsumerGroup) InstanceRegistered

func (cg *ConsumerGroup) InstanceRegistered() (bool, error)

func (*ConsumerGroup) Logf

func (cg *ConsumerGroup) Logf(format string, args ...interface{})

func (*ConsumerGroup) Messages

func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage

Returns a channel that you can read to obtain events from Kafka to process.
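
Because Messages and Errors are separate channels, a consumer usually reads both in one loop. The sketch below shows that shape with stand-ins so it runs without Kafka: `message` replaces `*sarama.ConsumerMessage`, `group` lists only the three documented methods the loop uses, and `fakeGroup` is a hypothetical test double. It also assumes both channels are closed on shutdown, which this page does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// message stands in for *sarama.ConsumerMessage.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// group lists the ConsumerGroup methods the loop relies on, with message
// substituted for the sarama type.
type group interface {
	Messages() <-chan *message
	Errors() <-chan error
	CommitUpto(*message) error
}

// fakeGroup is a hypothetical test double that replays canned messages and errors.
type fakeGroup struct {
	msgs      chan *message
	errs      chan error
	committed []int64
}

func newFakeGroup(n, nErrs int) *fakeGroup {
	g := &fakeGroup{msgs: make(chan *message, n), errs: make(chan error, nErrs)}
	for i := 0; i < n; i++ {
		g.msgs <- &message{Topic: "events", Partition: 0, Offset: int64(i)}
	}
	for i := 0; i < nErrs; i++ {
		g.errs <- errors.New("simulated consumer error")
	}
	close(g.msgs)
	close(g.errs)
	return g
}

func (f *fakeGroup) Messages() <-chan *message { return f.msgs }
func (f *fakeGroup) Errors() <-chan error      { return f.errs }
func (f *fakeGroup) CommitUpto(m *message) error {
	f.committed = append(f.committed, m.Offset)
	return nil
}

// consume handles messages and errors until both channels are closed. It
// returns how many messages were processed and committed, and how many
// errors were seen.
func consume(g group, handle func(*message)) (processed, failed int) {
	msgs, errs := g.Messages(), g.Errors()
	for msgs != nil || errs != nil {
		select {
		case m, ok := <-msgs:
			if !ok {
				msgs = nil // a nil channel is never selected again
				continue
			}
			handle(m)
			if err := g.CommitUpto(m); err != nil {
				failed++
				continue
			}
			processed++
		case _, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			failed++
		}
	}
	return processed, failed
}

func main() {
	g := newFakeGroup(3, 1)
	processed, failed := consume(g, func(m *message) {
		fmt.Printf("%s/%d@%d\n", m.Topic, m.Partition, m.Offset)
	})
	fmt.Println(processed, failed, g.committed)
}
```

Committing only after `handle` returns keeps the stored offset behind any message that has not been processed yet.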

type OffsetManager

type OffsetManager interface {

	// InitializePartition is called when the consumergroup is starting to consume a
	// partition. It should return the last processed offset for this partition. Note:
	// the same partition can be initialized multiple times during a single run of a
	// consumer group due to other consumer instances coming online and offline.
	InitializePartition(topic string, partition int32) (int64, error)

	// MarkAsProcessed tells the offset manager that a certain message has been successfully
	// processed by the consumer, and should be committed. The implementation does not have
	// to store this offset right away, but should return true if it intends to do this at
	// some point.
	//
	// Offsets should generally be increasing if the consumer
	// processes events serially, but this cannot be guaranteed if the consumer does any
	// asynchronous processing. This can be handled in various ways, e.g. by only accepting
	// offsets that are higher than the offsets seen before for the same partition.
	MarkAsProcessed(topic string, partition int32, offset int64) bool

	// Flush tells the offset manager to immediately commit offsets synchronously and to
	// return any errors that may have occurred during the process.
	Flush() error

	// FinalizePartition is called when the consumergroup is done consuming a
	// partition. In this method, the offset manager can flush any remaining offsets to its
	// backend store. It should return an error if it was not able to commit the offset.
	// Note: it's possible that the consumergroup instance will start to consume the same
	// partition again after this function is called.
	FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error

	// Close is called when the consumergroup is shutting down. In normal circumstances, all
	// offsets are committed because FinalizePartition is called for all the running partition
	// consumers. You may want to check for this to be true, and try to commit any outstanding
	// offsets. If this doesn't succeed, it should return an error.
	Close() error
}

OffsetManager is the main interface consumergroup requires to manage offsets of the consumergroup.
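
The interface comments suggest accepting only offsets higher than those already seen for a partition. The sketch below implements that rule with in-memory maps. `memoryOffsetManager` and `topicPartition` are hypothetical names, the interface is repeated locally so the file compiles alone, and returning -1 for a partition with no stored offset is an assumption (the page does not specify that value).

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// OffsetManager repeats the documented interface so the sketch compiles alone.
type OffsetManager interface {
	InitializePartition(topic string, partition int32) (int64, error)
	MarkAsProcessed(topic string, partition int32, offset int64) bool
	Flush() error
	FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error
	Close() error
}

type topicPartition struct {
	topic     string
	partition int32
}

// memoryOffsetManager is a hypothetical implementation backed by maps.
type memoryOffsetManager struct {
	mu        sync.Mutex
	pending   map[topicPartition]int64 // highest accepted offset not yet flushed
	committed map[topicPartition]int64 // stand-in for a backend store
}

func newMemoryOffsetManager() *memoryOffsetManager {
	return &memoryOffsetManager{
		pending:   make(map[topicPartition]int64),
		committed: make(map[topicPartition]int64),
	}
}

func (m *memoryOffsetManager) InitializePartition(topic string, partition int32) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if off, ok := m.committed[topicPartition{topic, partition}]; ok {
		return off, nil
	}
	return -1, nil // assumption: -1 means "no offset stored yet"
}

func (m *memoryOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	key := topicPartition{topic, partition}
	last, seen := m.pending[key]
	if !seen {
		last, seen = m.committed[key]
	}
	if seen && offset <= last {
		return false // stale or duplicate offset, e.g. from asynchronous processing
	}
	m.pending[key] = offset
	return true
}

func (m *memoryOffsetManager) Flush() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	for key, off := range m.pending {
		m.committed[key] = off
		delete(m.pending, key)
	}
	return nil
}

// FinalizePartition commits what is pending for one partition. This sketch
// ignores lastOffset and timeout; it does not wait for in-flight messages.
func (m *memoryOffsetManager) FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	key := topicPartition{topic, partition}
	if off, ok := m.pending[key]; ok {
		m.committed[key] = off
		delete(m.pending, key)
	}
	return nil
}

func (m *memoryOffsetManager) Close() error { return m.Flush() }

func main() {
	var om OffsetManager = newMemoryOffsetManager() // also checks interface conformance
	fmt.Println(om.MarkAsProcessed("events", 0, 5)) // accepted
	fmt.Println(om.MarkAsProcessed("events", 0, 3)) // rejected: not higher than 5
	_ = om.Flush()
	off, _ := om.InitializePartition("events", 0)
	fmt.Println("stored offset:", off)
}
```

Rejecting lower offsets keeps the stored position monotonic, at the cost of possibly committing past a message that is still being processed asynchronously; a stricter implementation would track gaps.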

func NewZookeeperOffsetManager

func NewZookeeperOffsetManager(cg *ConsumerGroup, config *OffsetManagerConfig) OffsetManager

NewZookeeperOffsetManager returns an offset manager that uses Zookeeper to store offsets.

type OffsetManagerConfig

type OffsetManagerConfig struct {
	CommitInterval time.Duration // Interval between offset flushes to the backend store.
	VerboseLogging bool          // Whether to enable verbose logging.
}

OffsetManagerConfig holds configuration settings on how the offset manager should behave.

func NewOffsetManagerConfig

func NewOffsetManagerConfig() *OffsetManagerConfig

NewOffsetManagerConfig returns a new OffsetManagerConfig with sane defaults.
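
CommitInterval describes flushing offsets on a timer. One common way to do that in Go is a ticker loop that also flushes once more on shutdown; the sketch below shows that shape. It is an assumption about how such a loop could look, not the package's implementation, and `runFlusher` and `flushesOnStop` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// runFlusher calls flush every interval until stop is closed, then flushes
// one final time and closes done. interval must be positive, because
// time.NewTicker panics otherwise.
func runFlusher(interval time.Duration, flush func() error, stop <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := flush(); err != nil {
				fmt.Println("flush failed:", err)
			}
		case <-stop:
			if err := flush(); err != nil {
				fmt.Println("final flush failed:", err)
			}
			return
		}
	}
}

// flushesOnStop starts a flusher whose interval is too long to fire, stops it
// right away, and reports how many flushes happened.
func flushesOnStop() int {
	var n int
	stop, done := make(chan struct{}), make(chan struct{})
	go runFlusher(time.Hour, func() error { n++; return nil }, stop, done)
	close(stop)
	<-done // the close of done orders the write to n before this read
	return n
}

func main() {
	fmt.Println("flushes after immediate stop:", flushesOnStop())
}
```

The final flush on stop is what keeps a shutdown from leaving up to one interval of processed offsets uncommitted.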