consumer

package
Version: v1.3.2
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2021 | License: MIT | Imports: 13 | Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMockConsumer

func NewMockConsumer(topics *admin.Topics) *mockConsumer

func NewMockPartitionConsumer

func NewMockPartitionConsumer(topics *admin.Topics, offsets offsets.Manager) *mockPartitionConsumer

Types

type Builder

type Builder interface {
	Config() *Config
	Build(options ...BuilderOption) (Consumer, error)
}

func NewBuilder

func NewBuilder() Builder

func NewMockConsumerBuilder

func NewMockConsumerBuilder(topics *admin.Topics) Builder

type BuilderOption

type BuilderOption func(config *Config)

func BuilderWithGroupId

func BuilderWithGroupId(id string) BuilderOption

func BuilderWithId

func BuilderWithId(id string) BuilderOption

func BuilderWithLogger

func BuilderWithLogger(logger log.Logger) BuilderOption

func BuilderWithMetricsReporter

func BuilderWithMetricsReporter(reporter metrics.Reporter) BuilderOption

func BuilderWithOptions added in v1.2.0

func BuilderWithOptions(options ...Option) BuilderOption

type Config

type Config struct {
	Id               string
	GroupId          string
	BootstrapServers []string
	MetricsReporter  metrics.Reporter
	Logger           log.Logger

	*sarama.Config
	// contains filtered or unexported fields
}

func NewConsumerConfig

func NewConsumerConfig() *Config

type Consumer

type Consumer interface {
	Consume(tps []string, handler ReBalanceHandler) (chan Partition, error)
	Errors() <-chan *Error
	Close() error
}

func NewConsumer

func NewConsumer(config *Config, options ...Option) (Consumer, error)

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (*Error) Error

func (p *Error) Error() string

func (*Error) String

func (p *Error) String() string

type Event

type Event interface {
	String() string
}

type MockConsumerBuilder

type MockConsumerBuilder struct {
	Builder
	// contains filtered or unexported fields
}

func (*MockConsumerBuilder) Build

func (mb *MockConsumerBuilder) Build(options ...BuilderOption) (Consumer, error)

type MockPartitionConsumerBuilder

type MockPartitionConsumerBuilder struct {
	PartitionConsumerBuilder
	// contains filtered or unexported fields
}

func (*MockPartitionConsumerBuilder) Build

func (mb *MockPartitionConsumerBuilder) Build(options ...BuilderOption) (PartitionConsumer, error)

type Offset

type Offset int64

const (
	Earliest Offset = -2
	Latest   Offset = -1
)

func (Offset) String

func (o Offset) String() string

type Option added in v1.2.0

type Option func(*consumerOptions)

func WithRecordUuidExtractFunc added in v1.2.0

func WithRecordUuidExtractFunc(fn RecordUuidExtractFunc) Option

type Partition

type Partition interface {
	Records() <-chan *data.Record
	Partition() TopicPartition
	MarkOffset(offset int64)
	CommitOffset(*data.Record) error
}

type PartitionAllocated

type PartitionAllocated struct {
	// contains filtered or unexported fields
}

func (*PartitionAllocated) String

func (p *PartitionAllocated) String() string

func (*PartitionAllocated) TopicPartitions

func (p *PartitionAllocated) TopicPartitions() []TopicPartition

type PartitionConsumer

type PartitionConsumer interface {
	Consume(topic string, partition int32, offset Offset) (<-chan Event, error)
	Errors() <-chan *Error
	Close() error
	Id() string
}

func NewPartitionConsumer

func NewPartitionConsumer(c *Config) (PartitionConsumer, error)

type PartitionConsumerBuilder

type PartitionConsumerBuilder interface {
	Config() *Config
	Build(options ...BuilderOption) (PartitionConsumer, error)
}

func NewMockPartitionConsumerBuilder

func NewMockPartitionConsumerBuilder(topics *admin.Topics, offsets offsets.Manager) PartitionConsumerBuilder

func NewPartitionConsumerBuilder

func NewPartitionConsumerBuilder() PartitionConsumerBuilder

type PartitionEnd

type PartitionEnd struct {
	// contains filtered or unexported fields
}

func (*PartitionEnd) String

func (p *PartitionEnd) String() string

func (*PartitionEnd) TopicPartitions

func (p *PartitionEnd) TopicPartitions() []TopicPartition

type PartitionRemoved

type PartitionRemoved struct {
	// contains filtered or unexported fields
}

func (*PartitionRemoved) String

func (p *PartitionRemoved) String() string

func (*PartitionRemoved) TopicPartitions

func (p *PartitionRemoved) TopicPartitions() []TopicPartition

type ReBalanceHandler

type ReBalanceHandler interface {
	OnPartitionRevoked(ctx context.Context, revoked []TopicPartition) error
	OnPartitionAssigned(ctx context.Context, assigned []TopicPartition) error
}

type RecordUuidExtractFunc added in v1.2.0

type RecordUuidExtractFunc func(message *data.Record) uuid.UUID

type TopicPartition

type TopicPartition struct {
	Topic     string
	Partition int32
}

func (TopicPartition) String

func (tp TopicPartition) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL