consumer

package
v1.1.0 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2020 License: MIT Imports: 13 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMockConsumer

func NewMockConsumer(topics *admin.Topics) *mockConsumer

func NewMockPartitionConsumer

func NewMockPartitionConsumer(topics *admin.Topics, offsets offsets.Manager) *mockPartitionConsumer

Types

type Builder

type Builder interface {
	Config() *Config
	Build(options ...BuilderOption) (Consumer, error)
}

func NewBuilder

func NewBuilder() Builder

func NewMockConsumerBuilder

func NewMockConsumerBuilder(topics *admin.Topics) Builder

type BuilderOption

type BuilderOption func(config *Config)

func BuilderWithGroupId

func BuilderWithGroupId(id string) BuilderOption

func BuilderWithId

func BuilderWithId(id string) BuilderOption

func BuilderWithLogger

func BuilderWithLogger(logger log.Logger) BuilderOption

func BuilderWithMetricsReporter

func BuilderWithMetricsReporter(reporter metrics.Reporter) BuilderOption

type Config

type Config struct {
	Id               string
	GroupId          string
	BootstrapServers []string
	MetricsReporter  metrics.Reporter
	Logger           log.Logger
	*sarama.Config
}

func NewConsumerConfig

func NewConsumerConfig() *Config

type Consumer

type Consumer interface {
	Consume(tps []string, handler ReBalanceHandler) (chan Partition, error)
	Errors() <-chan *Error
	Close() error
}

func NewConsumer

func NewConsumer(config *Config) (Consumer, error)

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (*Error) Error

func (p *Error) Error() string

func (*Error) String

func (p *Error) String() string

type Event

type Event interface {
	String() string
}

type MockConsumerBuilder

type MockConsumerBuilder struct {
	Builder
	// contains filtered or unexported fields
}

func (*MockConsumerBuilder) Build

func (mb *MockConsumerBuilder) Build(options ...BuilderOption) (Consumer, error)

type MockPartitionConsumerBuilder

type MockPartitionConsumerBuilder struct {
	PartitionConsumerBuilder
	// contains filtered or unexported fields
}

func (*MockPartitionConsumerBuilder) Build

func (*MockPartitionConsumerBuilder) Build(options ...BuilderOption) (PartitionConsumer, error)

type Offset

type Offset int64

const (
	Earliest Offset = -2
	Latest   Offset = -1
)

func (Offset) String

func (o Offset) String() string

type Partition

type Partition interface {
	Records() <-chan *data.Record
	Partition() TopicPartition
	MarkOffset(offset int64)
	CommitOffset(*data.Record) error
}

type PartitionAllocated

type PartitionAllocated struct {
	// contains filtered or unexported fields
}

func (*PartitionAllocated) String

func (p *PartitionAllocated) String() string

func (*PartitionAllocated) TopicPartitions

func (p *PartitionAllocated) TopicPartitions() []TopicPartition

type PartitionConsumer

type PartitionConsumer interface {
	Consume(topic string, partition int32, offset Offset) (<-chan Event, error)
	Errors() <-chan *Error
	Close() error
	Id() string
}

func NewPartitionConsumer

func NewPartitionConsumer(c *Config) (PartitionConsumer, error)

type PartitionConsumerBuilder

type PartitionConsumerBuilder interface {
	Config() *Config
	Build(options ...BuilderOption) (PartitionConsumer, error)
}

func NewMockPartitionConsumerBuilder

func NewMockPartitionConsumerBuilder(topics *admin.Topics, offsets offsets.Manager) PartitionConsumerBuilder

func NewPartitionConsumerBuilder

func NewPartitionConsumerBuilder() PartitionConsumerBuilder

type PartitionEnd

type PartitionEnd struct {
	// contains filtered or unexported fields
}

func (*PartitionEnd) String

func (p *PartitionEnd) String() string

func (*PartitionEnd) TopicPartitions

func (p *PartitionEnd) TopicPartitions() []TopicPartition

type PartitionRemoved

type PartitionRemoved struct {
	// contains filtered or unexported fields
}

func (*PartitionRemoved) String

func (p *PartitionRemoved) String() string

func (*PartitionRemoved) TopicPartitions

func (p *PartitionRemoved) TopicPartitions() []TopicPartition

type ReBalanceHandler

type ReBalanceHandler interface {
	OnPartitionRevoked(ctx context.Context, revoked []TopicPartition) error
	OnPartitionAssigned(ctx context.Context, assigned []TopicPartition) error
}

type TopicPartition

type TopicPartition struct {
	Topic     string
	Partition int32
}

func (TopicPartition) String

func (tp TopicPartition) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL