kafka

package
Version: v0.1.1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2018 | License: MIT | Imports: 8 | Imported by: 0

Documentation

Index

Constants

View Source
const UUIDHeaderKey = "_watermill_message_uuid"

Variables

View Source
var (
	ErrClosingConsumer = errors.New("closing subscriber")
	ErrNackReceived    = errors.New("closing subscriber")
)

Functions

func DefaultConfluentConsumerConstructor

func DefaultConfluentConsumerConstructor(config SubscriberConfig) (*kafka.Consumer, error)

func NewConfluentSubscriber

func NewConfluentSubscriber(
	config SubscriberConfig,
	unmarshaler Unmarshaler,
	logger watermill.LoggerAdapter,
) (message.Subscriber, error)

func NewCustomConfluentSubscriber

func NewCustomConfluentSubscriber(
	config SubscriberConfig,
	unmarshaler Unmarshaler,
	consumerConstructor ConfluentConsumerConstructor,
	logger watermill.LoggerAdapter,
) (message.Subscriber, error)

func NewCustomPublisher

func NewCustomPublisher(producer *kafka.Producer, marshaler Marshaler) (message.Publisher, error)

func NewPublisher

func NewPublisher(brokers []string, marshaler Marshaler, kafkaConfigOverwrite kafka.ConfigMap) (message.Publisher, error)

Types

type ConfluentConsumerConstructor

type ConfluentConsumerConstructor func(config SubscriberConfig) (*kafka.Consumer, error)

type DefaultMarshaler

type DefaultMarshaler struct{}

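func (DefaultMarshaler) Marshal

func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error)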

func (DefaultMarshaler) Unmarshal

func (DefaultMarshaler) Unmarshal(kafkaMsg *confluentKafka.Message) (*message.Message, error)

type GeneratePartitionKey

type GeneratePartitionKey func(topic string, msg *message.Message) (string, error)

type Marshaler

type Marshaler interface {
	Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error)
}

type MarshalerUnmarshaler

type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

func NewWithPartitioningMarshaler

func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler

type SubscriberConfig

type SubscriberConfig struct {
	Brokers []string

	ConsumerGroup   string
	NoConsumerGroup bool

	AutoOffsetReset string

	ConsumersCount int

	KafkaConfigOverwrite kafka.ConfigMap
}

func (SubscriberConfig) Validate

func (c SubscriberConfig) Validate() error

type Unmarshaler

type Unmarshaler interface {
	Unmarshal(*confluentKafka.Message) (*message.Message, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL