Documentation ¶
Overview ¶
Package kafka_sarama implements a Kafka binding using github.com/IBM/sarama module
Index ¶
- func WithMessageKey(ctx context.Context, key sarama.Encoder) context.Context
- func WithSkipKeyMapping(ctx context.Context) context.Context
- func WriteProducerMessage(ctx context.Context, m binding.Message, ...) error
- type Consumer
- type Message
- func (m *Message) Finish(error) error
- func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})
- func (m *Message) GetExtension(name string) interface{}
- func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)
- func (m *Message) ReadEncoding() binding.Encoding
- func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error
- type Protocol
- type ProtocolOptionFunc
- type Receiver
- func (r *Receiver) Cleanup(sarama.ConsumerGroupSession) error
- func (r *Receiver) Close(context.Context) error
- func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)
- func (r *Receiver) Setup(sarama.ConsumerGroupSession) error
- type Sender
- func NewSender(brokers []string, saramaConfig *sarama.Config, topic string, ...) (*Sender, error)
- func NewSenderFromClient(client sarama.Client, topic string, options ...SenderOptionFunc) (*Sender, error)
- func NewSenderFromSyncProducer(topic string, syncProducer sarama.SyncProducer, options ...SenderOptionFunc) (*Sender, error)
- type SenderOptionFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithMessageKey ¶ added in v2.2.0
WithMessageKey allows to set the key used when sending the producer message
func WriteProducerMessage ¶
func WriteProducerMessage(ctx context.Context, m binding.Message, producerMessage *sarama.ProducerMessage, transformers ...binding.Transformer) error
WriteProducerMessage fills the provided producerMessage with the message m. Using context you can tweak the encoding processing (more details on binding.Write documentation). By default, this function implements the key mapping, trying to set the key of the message based on partitionKey extension. If you want to disable the Key Mapping, decorate the context with `WithSkipKeyMapping`
Types ¶
type Consumer ¶
type Consumer struct { Receiver // contains filtered or unexported fields }
func NewConsumer ¶
func NewConsumerFromClient ¶
type Message ¶
type Message struct { Value []byte Headers map[string][]byte ContentType string // contains filtered or unexported fields }
Message holds a Kafka Message. This message *can* be read several times safely
func NewMessage ¶
NewMessage returns a binding.Message that holds the provided kafka message components. The returned binding.Message *can* be read several times safely This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance
func NewMessageFromConsumerMessage ¶
func NewMessageFromConsumerMessage(cm *sarama.ConsumerMessage) *Message
NewMessageFromConsumerMessage returns a binding.Message that holds the provided ConsumerMessage. The returned binding.Message *can* be read several times safely This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance
func (*Message) GetAttribute ¶
func (*Message) GetExtension ¶
func (*Message) ReadBinary ¶
func (*Message) ReadEncoding ¶
func (*Message) ReadStructured ¶
type Protocol ¶
type Protocol struct { // Kafka Client sarama.Client // Sender Sender *Sender // Sender options SenderContextDecorators []func(context.Context) context.Context // Consumer Consumer *Consumer // contains filtered or unexported fields }
func NewProtocol ¶
func NewProtocol(brokers []string, saramaConfig *sarama.Config, sendToTopic string, receiveFromTopic string, opts ...ProtocolOptionFunc) (*Protocol, error)
NewProtocol creates a new kafka transport.
func NewProtocolFromClient ¶
func NewProtocolFromClient(client sarama.Client, sendToTopic string, receiveFromTopic string, opts ...ProtocolOptionFunc) (*Protocol, error)
NewProtocolFromClient creates a new kafka transport starting from a sarama.Client
func (*Protocol) OpenInbound ¶
OpenInbound implements Opener.OpenInbound NOTE: This is a blocking call.
type ProtocolOptionFunc ¶
type ProtocolOptionFunc func(protocol *Protocol)
ProtocolOptionFunc is the type of kafka_sarama.Protocol options
func WithReceiverGroupId ¶
func WithReceiverGroupId(groupId string) ProtocolOptionFunc
func WithSenderContextDecorators ¶
func WithSenderContextDecorators(decorator func(context.Context) context.Context) ProtocolOptionFunc
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver which implements sarama.ConsumerGroupHandler After the first invocation of Receiver.Receive(), the sarama.ConsumerGroup is created and started.
func NewReceiver ¶
func NewReceiver() *Receiver
NewReceiver creates a Receiver which implements sarama.ConsumerGroupHandler The sarama.ConsumerGroup must be started invoking. If you need a Receiver which also manage the ConsumerGroup, use NewConsumer After the first invocation of Receiver.Receive(), the sarama.ConsumerGroup is created and started.
func (*Receiver) ConsumeClaim ¶
func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Also the method should return when `session.Context()` is done. Refer - https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go#L177
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender implements binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer
func NewSender ¶
func NewSender(brokers []string, saramaConfig *sarama.Config, topic string, options ...SenderOptionFunc) (*Sender, error)
NewSender returns a binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer
func NewSenderFromClient ¶
func NewSenderFromClient(client sarama.Client, topic string, options ...SenderOptionFunc) (*Sender, error)
NewSenderFromClient returns a binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer
func NewSenderFromSyncProducer ¶ added in v2.3.0
func NewSenderFromSyncProducer(topic string, syncProducer sarama.SyncProducer, options ...SenderOptionFunc) (*Sender, error)
NewSenderFromSyncProducer returns a binding.Sender that sends messages to a specific topic using sarama.SyncProducer
type SenderOptionFunc ¶
type SenderOptionFunc func(sender *Sender)
SenderOptionFunc is the type of kafka_sarama.Sender options