Documentation
¶
Index ¶
- Constants
- type Admin
- type Assignment
- type ConsumerBuilder
- type ConsumerConfig
- type ConsumerOffset
- type ConsumerProvider
- type DeliveryReport
- type Error
- type Event
- type GroupConsumer
- type GroupConsumerBuilder
- type GroupConsumerConfig
- type GroupConsumerProvider
- type GroupConsumerStatus
- type GroupMeta
- type GroupSession
- type IsolationLevel
- type Offset
- type OffsetManager
- type OffsetManagerBuilder
- type OffsetManagerConfig
- type Partition
- type PartitionClaim
- type PartitionConf
- type PartitionConsumer
- type PartitionEnd
- type PartitionerFunc
- type PartitionerType
- type Producer
- type ProducerBuilder
- type ProducerConfig
- type ProducerErr
- type ProducerFactory
- type ProducerProvider
- type RebalanceHandler
- type Record
- type RecordContextBinderFunc
- type RecordHeader
- type RecordHeaders
- type RecordMeta
- type RequiredAcks
- type Topic
- type TopicConfig
- type TopicMeta
- type TopicPartition
- type TopicPartitions
- type TransactionalProducer
Constants ¶
View Source
const PartitionAny = -1
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Assignment ¶
type Assignment interface {
TPs() TopicPartitions
ResetOffset(tp TopicPartition, offset Offset)
}
type ConsumerBuilder ¶
type ConsumerBuilder func(func(config *ConsumerConfig)) (PartitionConsumer, error)
type ConsumerConfig ¶
type ConsumerConfig struct {
Id string
BootstrapServers []string
IsolationLevel IsolationLevel
TopicMetaFetchTimeout time.Duration
EOSEnabled bool
MaxPollInterval time.Duration
ConsumerMessageChanSize int
Logger log.Logger
MetricsReporter metrics.Reporter
ContextExtractor RecordContextBinderFunc
}
func NewPartitionConsumerConfig ¶
func NewPartitionConsumerConfig() *ConsumerConfig
func (*ConsumerConfig) Copy ¶
func (conf *ConsumerConfig) Copy() *ConsumerConfig
type ConsumerOffset ¶
func (*ConsumerOffset) String ¶
func (off *ConsumerOffset) String() string
type ConsumerProvider ¶
type ConsumerProvider interface {
NewBuilder(config *ConsumerConfig) ConsumerBuilder
}
type DeliveryReport ¶
type GroupConsumer ¶
type GroupConsumer interface {
// Subscribe subscribes to a list of topic with a user provided RebalanceHandler
Subscribe(tps []string, handler RebalanceHandler) error
// Unsubscribe signals the consumer to unsubscribe from group
Unsubscribe() error
Errors() <-chan error
}
GroupConsumer is a wrapper for a kafka group consumer adaptor.
type GroupConsumerBuilder ¶
type GroupConsumerBuilder func(func(config *GroupConsumerConfig)) (GroupConsumer, error)
type GroupConsumerConfig ¶
type GroupConsumerConfig struct {
*ConsumerConfig
GroupId string
Offsets struct {
Initial Offset
Commit struct {
Auto bool
Interval time.Duration
}
}
}
func NewConfig ¶
func NewConfig() *GroupConsumerConfig
func (*GroupConsumerConfig) Copy ¶
func (conf *GroupConsumerConfig) Copy() *GroupConsumerConfig
type GroupConsumerProvider ¶
type GroupConsumerProvider interface {
NewBuilder(config *GroupConsumerConfig) GroupConsumerBuilder
}
type GroupConsumerStatus ¶
type GroupConsumerStatus string
const ( ConsumerPending GroupConsumerStatus = `Pending` ConsumerRebalancing GroupConsumerStatus = `Rebalancing` ConsumerReady GroupConsumerStatus = `Ready` )
type GroupMeta ¶
type GroupMeta struct {
Meta interface{}
}
GroupMeta wraps consumer group metadata used in transactional producer commits.
type GroupSession ¶
type IsolationLevel ¶
type IsolationLevel int8
const ( ReadUncommitted IsolationLevel = iota ReadCommitted )
type OffsetManager ¶
type OffsetManagerBuilder ¶
type OffsetManagerBuilder func(func(config *OffsetManagerConfig)) (OffsetManager, error)
type OffsetManagerConfig ¶
type OffsetManagerConfig struct {
Id string
BootstrapServers []string
Logger log.Logger
MetricsReporter metrics.Reporter
}
func NewOffsetManagerConfig ¶
func NewOffsetManagerConfig() *OffsetManagerConfig
type PartitionClaim ¶
type PartitionClaim interface {
TopicPartition() TopicPartition
Records() <-chan Record
}
type PartitionConf ¶
type PartitionConsumer ¶
type PartitionConsumer interface {
ConsumeTopic(ctx context.Context, topic string, offset Offset) (map[int32]Partition, error)
Partitions(ctx context.Context, topic string) ([]int32, error)
ConsumePartition(ctx context.Context, topic string, partition int32, offset Offset) (Partition, error)
OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error)
GetOffsetLatest(topic string, partition int32) (offset int64, err error)
GetOffsetOldest(topic string, partition int32) (offset int64, err error)
Close() error
}
type PartitionEnd ¶
type PartitionEnd struct {
Tps []TopicPartition
}
func (*PartitionEnd) String ¶
func (p *PartitionEnd) String() string
func (*PartitionEnd) TopicPartitions ¶
func (p *PartitionEnd) TopicPartitions() []TopicPartition
type PartitionerType ¶
type PartitionerType string
type ProducerBuilder ¶
type ProducerBuilder func(conf func(*ProducerConfig)) (Producer, error)
type ProducerConfig ¶
type ProducerConfig struct {
Id string
BootstrapServers []string
PartitionerFunc PartitionerFunc
Acks RequiredAcks
Transactional struct {
Enabled bool
Id string
}
Idempotent bool
Logger log.Logger
MetricsReporter metrics.Reporter
}
func NewProducerConfig ¶
func NewProducerConfig() *ProducerConfig
func (*ProducerConfig) Copy ¶
func (conf *ProducerConfig) Copy() *ProducerConfig
type ProducerErr ¶
type ProducerFactory ¶
type ProducerFactory interface {
NewBuilder(config *GroupConsumerConfig) GroupConsumerBuilder
}
type ProducerProvider ¶
type ProducerProvider interface {
NewBuilder(config *ProducerConfig) ProducerBuilder
}
type RebalanceHandler ¶
type RebalanceHandler interface {
OnPartitionRevoked(ctx context.Context, session GroupSession) error
OnPartitionAssigned(ctx context.Context, session GroupSession) error
OnLost() error
Consume(ctx context.Context, session GroupSession, partition PartitionClaim) error
}
type RecordContextBinderFunc ¶
type RecordHeader ¶
RecordHeader stores key and value for a record header.
type RecordHeaders ¶
type RecordHeaders []RecordHeader
RecordHeaders are list of key:value pairs.
func (RecordHeaders) Read ¶
func (h RecordHeaders) Read(key []byte) []byte
Read returns a RecordHeader by its name or nil if not exist
type RecordMeta ¶
type RequiredAcks ¶
type RequiredAcks int
const ( // NoResponse doesn't send any response, the TCP ACK is all you get. NoResponse RequiredAcks = 0 // WaitForLeader waits for only the local commit to succeed before responding. WaitForLeader RequiredAcks = 1 // WaitForAll waits for all in-sync replicas to commit before responding. // The minimum number of in-sync replicas is configured on the broker via // the `min.insync.replicas` configuration key. WaitForAll RequiredAcks = -1 )
func (RequiredAcks) String ¶
func (ack RequiredAcks) String() string
type TopicConfig ¶
type TopicMeta ¶
type TopicMeta []TopicPartition
type TopicPartition ¶
TopicPartition represents a kafka topic partition.
func (TopicPartition) String ¶
func (tp TopicPartition) String() string
type TopicPartitions ¶
type TopicPartitions []TopicPartition
func (TopicPartitions) Less ¶
func (list TopicPartitions) Less(i, j int) bool
Less is part of sort.Interface.
func (TopicPartitions) Swap ¶
func (list TopicPartitions) Swap(i, j int)
Swap is part of sort.Interface.
type TransactionalProducer ¶
type TransactionalProducer interface {
Producer
ProduceAsync(ctx context.Context, record Record) (err error)
InitTransactions(ctx context.Context) error
BeginTransaction() error
SendOffsetsToTransaction(ctx context.Context, offsets []ConsumerOffset, meta *GroupMeta) error
CommitTransaction(ctx context.Context) error
AbortTransaction(ctx context.Context) error
}
Source Files
¶
Click to show internal directories.
Click to hide internal directories.