kafka_sarama

package
v2.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2020 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WriteProducerMessage

func WriteProducerMessage(ctx context.Context, m binding.Message, producerMessage *sarama.ProducerMessage, transformerFactories ...binding.TransformerFactory) error

Fill the provided producerMessage with the message m. Using context you can tweak the encoding processing (more details on binding.Write documentation).

Types

type Consumer

type Consumer struct {
	Receiver
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(brokers []string, saramaConfig *sarama.Config, groupId string, topic string) (*Consumer, error)

func NewConsumerFromClient

func NewConsumerFromClient(client sarama.Client, groupId string, topic string) *Consumer

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

func (*Consumer) OpenInbound

func (c *Consumer) OpenInbound(ctx context.Context) (err error)

type Message

type Message struct {
	Key, Value  []byte
	Headers     map[string][]byte
	ContentType string
	// contains filtered or unexported fields
}

Message holds a Kafka Message. This message *can* be read several times safely

func NewMessage

func NewMessage(key []byte, value []byte, contentType string, headers map[string][]byte) *Message

Returns a binding.Message that holds the provided kafka message components. The returned binding.Message *can* be read several times safely This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance

func NewMessageFromConsumerMessage

func NewMessageFromConsumerMessage(cm *sarama.ConsumerMessage) *Message

Returns a binding.Message that holds the provided ConsumerMessage. The returned binding.Message *can* be read several times safely This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance

func (*Message) Finish

func (m *Message) Finish(error) error

func (*Message) ReadBinary

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error

func (*Message) ReadEncoding

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type Protocol

type Protocol struct {
	// Kafka
	Client sarama.Client

	// Sender
	Sender *Sender

	// Sender options
	SenderContextDecorators []func(context.Context) context.Context

	// Consumer
	Consumer *Consumer
	// contains filtered or unexported fields
}

func NewProtocol

func NewProtocol(brokers []string, saramaConfig *sarama.Config, sendToTopic string, receiveFromTopic string, opts ...ProtocolOptionFunc) (*Protocol, error)

NewProtocol creates a new kafka transport.

func NewProtocolFromClient

func NewProtocolFromClient(client sarama.Client, sendToTopic string, receiveFromTopic string, opts ...ProtocolOptionFunc) (*Protocol, error)

NewProtocolFromClient creates a new kafka transport starting from a sarama.Client

func (*Protocol) Close

func (p *Protocol) Close(ctx context.Context) error

func (*Protocol) HasTracePropagation

func (p *Protocol) HasTracePropagation() bool

HasTracePropagation implements Protocol.HasTracePropagation

func (*Protocol) OpenInbound

func (p *Protocol) OpenInbound(ctx context.Context) error

StartReceiver implements Protocol.StartReceiver NOTE: This is a blocking call.

func (*Protocol) Receive

func (p *Protocol) Receive(ctx context.Context) (binding.Message, error)

func (*Protocol) Send

func (p *Protocol) Send(ctx context.Context, in binding.Message) error

type ProtocolOptionFunc

type ProtocolOptionFunc func(protocol *Protocol)

kafka_sarama.Protocol options

func WithReceiverGroupId

func WithReceiverGroupId(groupId string) ProtocolOptionFunc

func WithSenderContextDecorators

func WithSenderContextDecorators(decorator func(context.Context) context.Context) ProtocolOptionFunc

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver which implements sarama.ConsumerGroupHandler After the first invocation of Receiver.Receive(), the sarama.ConsumerGroup is created and started.

func NewReceiver

func NewReceiver() *Receiver

NewReceiver creates a Receiver which implements sarama.ConsumerGroupHandler The sarama.ConsumerGroup must be started invoking. If you need a Receiver which also manage the ConsumerGroup, use NewConsumer After the first invocation of Receiver.Receive(), the sarama.ConsumerGroup is created and started.

func (*Receiver) Cleanup

func (*Receiver) ConsumeClaim

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)

func (*Receiver) Setup

func (r *Receiver) Setup(sess sarama.ConsumerGroupSession) error

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender implements binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer

func NewSender

func NewSender(brokers []string, saramaConfig *sarama.Config, topic string, options ...SenderOptionFunc) (*Sender, error)

Returns a binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer

func NewSenderFromClient

func NewSenderFromClient(client sarama.Client, topic string, options ...SenderOptionFunc) (*Sender, error)

Returns a binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer

func (*Sender) Close

func (s *Sender) Close(ctx context.Context) error

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, m binding.Message) error

type SenderOptionFunc

type SenderOptionFunc func(sender *Sender)

kafka_sarama.Sender options

func WithTransformer

func WithTransformer(transformer binding.TransformerFactory) SenderOptionFunc

Add a transformer, which Sender uses while encoding a binding.Message to a sarama.ProducerMessage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL