Documentation ¶
Index ¶
- Constants
- func NewAdmin(bootstrapServer []string, options ...AdminOption) *kAdmin
- func NewConsumerProvider(config *ConsumerConfig) kafka.ConsumerProvider
- func NewGroupConsumer(config *GroupConsumerConfig) (kafka.GroupConsumer, error)
- func NewGroupConsumerProvider(config *GroupConsumerConfig) kafka.GroupConsumerProvider
- func NewPartitionConsumer(configs *ConsumerConfig) (kafka.PartitionConsumer, error)
- func NewProducer(configs *ProducerConfig) (kafka.Producer, error)
- func NewProducerProvider(config *ProducerConfig) kafka.ProducerProvider
- type AdminOption
- type ConsumerConfig
- type Err
- type GroupConsumerConfig
- type ProducerConfig
- type Record
- func (r *Record) Ctx() context.Context
- func (r *Record) Headers() kafka.RecordHeaders
- func (r *Record) Key() []byte
- func (r *Record) Offset() int64
- func (r *Record) Partition() int32
- func (r *Record) String() string
- func (r *Record) Timestamp() time.Time
- func (r *Record) Topic() string
- func (r *Record) Value() []byte
Constants ¶
View Source
const (
	PartitionerRandom                  kafka.PartitionerType = `random`
	PartitionerCRC32                   kafka.PartitionerType = `consistent`
	PartitionerCRC32Random             kafka.PartitionerType = `consistent_random`
	PartitionerConsistentMurmur2       kafka.PartitionerType = `murmur2`
	PartitionerConsistentMurmur2Random kafka.PartitionerType = `murmur2_random`
	PartitionerConsistentFNV1a         kafka.PartitionerType = `fnv1a`
	PartitionerConsistentFNV1aRandom   kafka.PartitionerType = `fnv1a_random`
)
Variables ¶
This section is empty.
Functions ¶
func NewAdmin ¶
func NewAdmin(bootstrapServer []string, options ...AdminOption) *kAdmin
func NewConsumerProvider ¶
func NewConsumerProvider(config *ConsumerConfig) kafka.ConsumerProvider
func NewGroupConsumer ¶
func NewGroupConsumer(config *GroupConsumerConfig) (kafka.GroupConsumer, error)
func NewGroupConsumerProvider ¶
func NewGroupConsumerProvider(config *GroupConsumerConfig) kafka.GroupConsumerProvider
func NewPartitionConsumer ¶
func NewPartitionConsumer(configs *ConsumerConfig) (kafka.PartitionConsumer, error)
func NewProducer ¶
func NewProducer(configs *ProducerConfig) (kafka.Producer, error)
func NewProducerProvider ¶
func NewProducerProvider(config *ProducerConfig) kafka.ProducerProvider
Types ¶
type AdminOption ¶
type AdminOption func(*adminOptions)
func WithLogger ¶
func WithLogger(logger log.Logger) AdminOption
func WithTimeout ¶
func WithTimeout(duration time.Duration) AdminOption
type ConsumerConfig ¶
type ConsumerConfig struct {
	*kafka.ConsumerConfig
	Librd *librdKafka.ConfigMap
}
func NewConsumerConfig ¶
func NewConsumerConfig() *ConsumerConfig
type Err ¶
type Err struct {
// contains filtered or unexported fields
}
func (Err) RequiresRestart ¶
type GroupConsumerConfig ¶
type GroupConsumerConfig struct {
	*kafka.GroupConsumerConfig
	Librd *librdKafka.ConfigMap
}
func NewGroupConsumerConfig ¶
func NewGroupConsumerConfig() *GroupConsumerConfig
type ProducerConfig ¶
type ProducerConfig struct {
	Librd *librdKafka.ConfigMap
	*kafka.ProducerConfig
}
func NewProducerConfig ¶
func NewProducerConfig() *ProducerConfig
Click to show internal directories.
Click to hide internal directories.