consumer

package
v0.0.0-...-1f3a96c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2015 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FetcherState

type FetcherState struct {
	LastCommitted int64
	Removed       bool
	// contains filtered or unexported fields
}

func NewFetcherState

func NewFetcherState(initialOffset int64) *FetcherState

func (*FetcherState) GetOffset

func (this *FetcherState) GetOffset() int64

func (*FetcherState) GetStopChannel

func (this *FetcherState) GetStopChannel() chan<- bool

func (*FetcherState) SetOffset

func (this *FetcherState) SetOffset(offset int64)

type PartitionConsumer

type PartitionConsumer struct {
	// contains filtered or unexported fields
}

func NewPartitionConsumer

func NewPartitionConsumer(consumerConfig PartitionConsumerConfig) *PartitionConsumer

func (*PartitionConsumer) Add

func (this *PartitionConsumer) Add(topic string, partition int32, strategy Strategy) error

func (*PartitionConsumer) GetTopicPartitions

func (this *PartitionConsumer) GetTopicPartitions() *TopicAndPartitionSet

func (*PartitionConsumer) Remove

func (this *PartitionConsumer) Remove(topic string, partition int32)

type PartitionConsumerConfig

type PartitionConsumerConfig struct {
	// Consumer group
	Group string

	// CommitInterval is the interval at which offsets are committed.
	CommitInterval time.Duration

	// BrokerList is a bootstrap list to discover other brokers in a cluster. At least one broker is required.
	BrokerList []string

	// ReadTimeout is a timeout to read the response from a TCP socket.
	ReadTimeout time.Duration

	// WriteTimeout is a timeout to write the request to a TCP socket.
	WriteTimeout time.Duration

	// ConnectTimeout is a timeout to connect to a TCP socket.
	ConnectTimeout time.Duration

	// Sets whether the connection should be kept alive.
	KeepAlive bool

	// A keep alive period for a TCP connection.
	KeepAliveTimeout time.Duration

	// Maximum number of open connections for a connector.
	MaxConnections int

	// Maximum number of open connections for a single broker for a connector.
	MaxConnectionsPerBroker int

	// Maximum fetch size in bytes which will be used in all Consume() calls.
	FetchSize int32

	// The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will block until enough data arrives or FetchMaxWaitTime elapses.
	FetchMinBytes int32

	// The maximum amount of time the server will block before answering a fetch request when there isn't sufficient data to immediately satisfy FetchMinBytes.
	FetchMaxWaitTime int32

	// Number of retries to get topic metadata.
	MetadataRetries int

	// Backoff value between topic metadata requests.
	MetadataBackoff time.Duration

	// Number of retries to commit an offset.
	CommitOffsetRetries int

	// Backoff value between commit offset requests.
	CommitOffsetBackoff time.Duration

	// Number of retries to get consumer metadata.
	ConsumerMetadataRetries int

	// Backoff value between consumer metadata requests.
	ConsumerMetadataBackoff time.Duration

	// ClientID is the identifier a connector sends with its requests so that brokers can identify the client.
	ClientID string

	// FetchErrorBackoff is the backoff value between fetches in case of errors.
	FetchErrorBackoff time.Duration
}

func NewPartitionConsumerConfig

func NewPartitionConsumerConfig(group string) *PartitionConsumerConfig

type Strategy

type Strategy func(topic string, partition int32, messages []*siesta.MessageAndOffset) error

type TopicAndPartition

type TopicAndPartition struct {
	Topic     string
	Partition int32
}

type TopicAndPartitionSet

type TopicAndPartitionSet struct {
	// contains filtered or unexported fields
}

func NewTopicAndPartitionSet

func NewTopicAndPartitionSet() *TopicAndPartitionSet

func (*TopicAndPartitionSet) Add

func (*TopicAndPartitionSet) AddAll

func (this *TopicAndPartitionSet) AddAll(tps []TopicAndPartition)

func (*TopicAndPartitionSet) Contains

func (this *TopicAndPartitionSet) Contains(tp TopicAndPartition) bool

func (*TopicAndPartitionSet) ContainsAll

func (this *TopicAndPartitionSet) ContainsAll(tps []TopicAndPartition) bool

func (*TopicAndPartitionSet) GetArray

func (this *TopicAndPartitionSet) GetArray() []TopicAndPartition

func (*TopicAndPartitionSet) IsEmpty

func (this *TopicAndPartitionSet) IsEmpty() bool

func (*TopicAndPartitionSet) Remove

func (this *TopicAndPartitionSet) Remove(tp TopicAndPartition) bool

func (*TopicAndPartitionSet) RemoveAll

func (this *TopicAndPartitionSet) RemoveAll(tps []TopicAndPartition)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL