kafka_sarama

package module
v2.15.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2024 License: Apache-2.0 Imports: 14 Imported by: 45

Documentation

Overview

Package kafka_sarama implements a Kafka binding using the github.com/IBM/sarama module.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithMessageKey added in v2.2.0

func WithMessageKey(ctx context.Context, key sarama.Encoder) context.Context

WithMessageKey allows setting the key used when sending the producer message.

func WithSkipKeyMapping

func WithSkipKeyMapping(ctx context.Context) context.Context

func WriteProducerMessage

func WriteProducerMessage(ctx context.Context, m binding.Message, producerMessage *sarama.ProducerMessage, transformers ...binding.Transformer) error

WriteProducerMessage fills the provided producerMessage with the message m. Using the context, you can tweak the encoding process (see the binding.Write documentation for more details). By default, this function implements the key mapping, trying to set the key of the message based on the partitionKey extension. If you want to disable the key mapping, decorate the context with `WithSkipKeyMapping`.

Types

type Consumer

type Consumer struct {
	Receiver
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(brokers []string, saramaConfig *sarama.Config, groupId string, topic string) (*Consumer, error)

func NewConsumerFromClient

func NewConsumerFromClient(client sarama.Client, groupId string, topic string) *Consumer

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

func (*Consumer) OpenInbound

func (c *Consumer) OpenInbound(ctx context.Context) error

type Message

type Message struct {
	Value       []byte
	Headers     map[string][]byte
	ContentType string
	// contains filtered or unexported fields
}

Message holds a Kafka Message. This message *can* be read several times safely

func NewMessage

func NewMessage(value []byte, contentType string, headers map[string][]byte) *Message

NewMessage returns a binding.Message that holds the provided kafka message components. The returned binding.Message *can* be read several times safely. This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance.

func NewMessageFromConsumerMessage

func NewMessageFromConsumerMessage(cm *sarama.ConsumerMessage) *Message

NewMessageFromConsumerMessage returns a binding.Message that holds the provided ConsumerMessage. The returned binding.Message *can* be read several times safely. This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance.

func (*Message) Finish

func (m *Message) Finish(error) error

func (*Message) GetAttribute

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})

func (*Message) GetExtension

func (m *Message) GetExtension(name string) interface{}

func (*Message) ReadBinary

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)

func (*Message) ReadEncoding

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type Protocol

type Protocol struct {
	// Kafka
	Client sarama.Client

	// Sender
	Sender *Sender

	// Sender options
	SenderContextDecorators []func(context.Context) context.Context

	// Consumer
	Consumer *Consumer
	// contains filtered or unexported fields
}

func NewProtocol

func NewProtocol(brokers []string, saramaConfig *sarama.Config, sendToTopic string, receiveFromTopic string, opts ...ProtocolOptionFunc) (*Protocol, error)

NewProtocol creates a new kafka transport.

func NewProtocolFromClient

func NewProtocolFromClient(client sarama.Client, sendToTopic string, receiveFromTopic string, opts ...ProtocolOptionFunc) (*Protocol, error)

NewProtocolFromClient creates a new kafka transport starting from a sarama.Client

func (*Protocol) Close

func (p *Protocol) Close(ctx context.Context) error

func (*Protocol) OpenInbound

func (p *Protocol) OpenInbound(ctx context.Context) error

OpenInbound implements Opener.OpenInbound. NOTE: This is a blocking call.

func (*Protocol) Receive

func (p *Protocol) Receive(ctx context.Context) (binding.Message, error)

func (*Protocol) Send

func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error

type ProtocolOptionFunc

type ProtocolOptionFunc func(protocol *Protocol)

ProtocolOptionFunc is the type of kafka_sarama.Protocol options

func WithReceiverGroupId

func WithReceiverGroupId(groupId string) ProtocolOptionFunc

func WithSenderContextDecorators

func WithSenderContextDecorators(decorator func(context.Context) context.Context) ProtocolOptionFunc

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver implements sarama.ConsumerGroupHandler. After the first invocation of Receiver.Receive(), the sarama.ConsumerGroup is created and started.

func NewReceiver

func NewReceiver() *Receiver

NewReceiver creates a Receiver which implements sarama.ConsumerGroupHandler. The sarama.ConsumerGroup must be started separately by the caller. If you need a Receiver which also manages the ConsumerGroup, use NewConsumer. After the first invocation of Receiver.Receive(), the sarama.ConsumerGroup is created and started.

func (*Receiver) Cleanup

func (*Receiver) Close added in v2.4.0

func (r *Receiver) Close(context.Context) error

func (*Receiver) ConsumeClaim

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Also, the method should return when `session.Context()` is done. See https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go#L177

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)

func (*Receiver) Setup

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender implements binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer

func NewSender

func NewSender(brokers []string, saramaConfig *sarama.Config, topic string, options ...SenderOptionFunc) (*Sender, error)

NewSender returns a binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer

func NewSenderFromClient

func NewSenderFromClient(client sarama.Client, topic string, options ...SenderOptionFunc) (*Sender, error)

NewSenderFromClient returns a binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer

func NewSenderFromSyncProducer added in v2.3.0

func NewSenderFromSyncProducer(topic string, syncProducer sarama.SyncProducer, options ...SenderOptionFunc) (*Sender, error)

NewSenderFromSyncProducer returns a binding.Sender that sends messages to a specific topic using sarama.SyncProducer

func (*Sender) Close

func (s *Sender) Close(ctx context.Context) error

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error

type SenderOptionFunc

type SenderOptionFunc func(sender *Sender)

SenderOptionFunc is the type of kafka_sarama.Sender options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL