kafka

package
v0.0.25
Published: Apr 9, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer[Output pubsub.MessageTransformer[KafkaMessage, Output]] struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer[Output pubsub.MessageTransformer[KafkaMessage, Output]](kafka *PubSub, topic string) *Consumer[Output]

NewConsumer creates a consumer that takes Kafka messages and sends them to the destination.

The Output type must implement the MessageTransformer interface, transforming a KafkaMessage into itself.

func (*Consumer[Output]) Run

func (k *Consumer[Output]) Run(ctx context.Context, ch chan<- Output, filter channel.MessageFilter[Output]) error

type Header

type Header = struct {
	Key   string
	Value []byte
}

type KafkaMessage

type KafkaMessage struct {
	Topic   string
	Payload []byte
	Key     []byte
	Headers []Header
	// When using a blocking pubsub, channels receiving KafkaMessages
	// must signal when they are done processing the message by sending the
	// result on the Done channel. If no value is sent on the channel before
	// the publishing timeout, the pubsub listener will panic
	Done chan<- error
}

func (KafkaMessage) String added in v0.0.13

func (k KafkaMessage) String() string

type KafkaMessageFilter

type KafkaMessageFilter struct {
	Topics map[string]bool
}

func (*KafkaMessageFilter) Filter

func (k *KafkaMessageFilter) Filter(message KafkaMessage) bool

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

func New

func New(kafkaBroker string, consumerGroup string, blocking bool, topics ...string) *PubSub

func NewBlockingPubSub

func NewBlockingPubSub(kafkaBroker string, consumerGroup string, topics ...string) *PubSub

NewBlockingPubSub creates a new PubSub instance with blocking behavior. The blocking behavior means that Kafka is going to wait for the message to be processed before sending the next message.

func NewPubSub

func NewPubSub(kafkaBroker string, consumerGroup string, topics ...string) *PubSub

NewPubSub creates a new PubSub instance with non-blocking behavior. Non-blocking behavior means that Kafka sends only new incoming messages, without waiting for the previous message to be processed. For example, when a client connects to Kafka, it won't receive older messages that have not been processed (fire-and-forget style).
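The contrast between the two delivery styles can be illustrated with plain Go channels. This is only an analogy under stated assumptions: `sendBlocking` and `sendNonBlocking` are hypothetical helpers, and nothing here claims to be how the package implements either mode.

```go
package main

import "fmt"

// sendBlocking waits until a receiver takes the message, so a slow
// consumer holds back every message behind it.
func sendBlocking(ch chan<- string, msg string) {
	ch <- msg
}

// sendNonBlocking delivers only if the channel can accept the message
// right now and reports whether it did; otherwise the message is dropped.
func sendNonBlocking(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println("first delivered:", sendNonBlocking(ch, "first"))
	fmt.Println("second delivered:", sendNonBlocking(ch, "second"))
	fmt.Println("received:", <-ch)

	go sendBlocking(ch, "third")
	fmt.Println("received:", <-ch)
}
```

The blocking style trades throughput for the assurance that each message is handled; the fire-and-forget style keeps the producer side moving at the cost of possibly missing messages.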

func (*PubSub) MustPing added in v0.0.9

func (k *PubSub) MustPing()

func (*PubSub) Produce

func (k *PubSub) Produce(topic string, value []byte, callback func(error), headers ...Header)

Produce sends a message without a key. Related messages may therefore land on different partitions, so there is no guarantee that the consumer receives them in the order they were produced.

func (*PubSub) ProduceSync added in v0.0.15

func (k *PubSub) ProduceSync(
	topic string,
	value []byte,
	key []byte,
	headers ...Header,
) error

ProduceSync produces a message and blocks until the broker responds. If the key parameter is non-nil, the behavior is similar to ProduceWithKey.

func (*PubSub) ProduceWithKey added in v0.0.14

func (k *PubSub) ProduceWithKey(
	topic string,
	value []byte,
	key []byte,
	callback func(error),
	headers ...Header,
)

ProduceWithKey produces a message with a key. Messages with the same key are sent to the same partition, which guarantees the ordering of those messages.
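The ordering guarantee follows from a deterministic key-to-partition mapping: Kafka preserves order within a partition, so equal keys that always map to the same partition stay in order. The sketch below illustrates that property with FNV-1a as a stand-in hash. Real Kafka clients use their own partitioner, and `partitionFor` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition index. The hash choice is an
// assumption for illustration; only "same key, same partition" matters.
func partitionFor(key []byte, partitions uint32) uint32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return h.Sum32() % partitions
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor([]byte(key), 6))
	}
}
```

Both occurrences of `user-1` resolve to the same partition, whereas keyless messages have no such anchor.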

func (*PubSub) SetFetchRetryDelay added in v0.0.23

func (k *PubSub) SetFetchRetryDelay(delay time.Duration)

SetFetchRetryDelay updates the delay between fetch retries.

func (*PubSub) SetFetchTimeout added in v0.0.23

func (k *PubSub) SetFetchTimeout(timeout time.Duration)

SetFetchTimeout updates the timeout value used when fetching messages from Kafka.

func (*PubSub) SetMaxFetchRetries added in v0.0.23

func (k *PubSub) SetMaxFetchRetries(max int)

SetMaxFetchRetries updates the maximum number of retries when fetching messages from Kafka.

func (*PubSub) SetPublishTimeout added in v0.0.13

func (k *PubSub) SetPublishTimeout(timeout time.Duration)

SetPublishTimeout updates the timeout value used when interacting with Kafka.

func (*PubSub) StartListener added in v0.0.12

func (k *PubSub) StartListener()

StartListener starts listening on kafka for topics that the PubSub is subscribed to. If the PubSub is already listening, StartListener is a no-op. If the context is canceled, the PubSub will stop consuming new events.

func (*PubSub) StopListener added in v0.0.21

func (k *PubSub) StopListener()

StopListener stops the listener if it is running. The subscribed channels are not closed and will be able to receive messages again if the listener is started once more.
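The start and stop semantics described above (starting twice is a no-op, and a stopped listener can be started again) can be sketched with a guarded flag. The `listener` type here is a hypothetical stand-in, not the package's PubSub, and its internals are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// listener is a hypothetical stand-in for the listening state of a PubSub.
type listener struct {
	mu      sync.Mutex
	running bool
	starts  int // counts how many times a listening loop was actually launched
}

// Start launches the loop only when it is not already running.
func (l *listener) Start() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running {
		return // already listening: no-op
	}
	l.running = true
	l.starts++
}

// Stop marks the listener as stopped; a later Start may launch it again.
func (l *listener) Stop() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.running = false
}

func main() {
	var l listener
	l.Start()
	l.Start() // ignored while running
	l.Stop()
	l.Start() // launches again after a stop
	fmt.Println("loops launched:", l.starts)
}
```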

func (*PubSub) Subscribe

func (k *PubSub) Subscribe(topics ...string) (<-chan KafkaMessage, error)

func (*PubSub) Unsubscribe

func (k *PubSub) Unsubscribe(c <-chan KafkaMessage) error
