Documentation
¶
Index ¶
- type Consumer
- type Header
- type KafkaMessage
- type KafkaMessageFilter
- type PubSub
- func (k *PubSub) MustPing()
- func (k *PubSub) Produce(topic string, value []byte, callback func(error), headers ...Header)
- func (k *PubSub) ProduceSync(topic string, value []byte, key []byte, headers ...Header) error
- func (k *PubSub) ProduceWithKey(topic string, value []byte, key []byte, callback func(error), ...)
- func (k *PubSub) SetFetchRetryDelay(delay time.Duration)
- func (k *PubSub) SetFetchTimeout(timeout time.Duration)
- func (k *PubSub) SetMaxFetchRetries(max int)
- func (k *PubSub) SetPublishTimeout(timeout time.Duration)
- func (k *PubSub) StartListener()
- func (k *PubSub) StopListener()
- func (k *PubSub) Subscribe(topics ...string) (<-chan KafkaMessage, error)
- func (k *PubSub) Unsubscribe(c <-chan KafkaMessage) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer[Output pubsub.MessageTransformer[KafkaMessage, Output]] struct { // contains filtered or unexported fields }
func NewConsumer ¶
func NewConsumer[Output pubsub.MessageTransformer[KafkaMessage, Output]](kafka *PubSub, topic string) *Consumer[Output]
NewConsumer create a consumer to take kafka message and send them to the destination.
Output object must implement the MessageTransformer interface from KafkaMessage to themselves
type KafkaMessage ¶
type KafkaMessage struct {
Topic string
Payload []byte
Key []byte
Headers []Header
// When using a blocking pubsub, channels receiving KafkaMessages
// must signal when they are done processing the message by sending the
// result on the Done channel. If no value is sent on the channel before
// the publishing timeout, the pubsub listener will panic
Done chan<- error
}
func (KafkaMessage) String ¶ added in v0.0.13
func (k KafkaMessage) String() string
type KafkaMessageFilter ¶
func (*KafkaMessageFilter) Filter ¶
func (k *KafkaMessageFilter) Filter(message KafkaMessage) bool
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
func NewBlockingPubSub ¶
NewBlockingPubSub creates a new PubSub instance with blocking behavior. The blocking behavior means that Kafka is going to wait for the message to be processed before sending the next message.
func NewPubSub ¶
NewPubSub creates a new PubSub instance with non-blocking behavior. Non-blocking behavior means that Kafka is going to send only the new message coming in, without waiting for the previous message to be processed. Ex: When a client connects to the Kafka, it won't receive older messages that have none been processed. (fire and forget style)
func (*PubSub) Produce ¶
Produce a message without giving it a key. This may send related messages to different partitions, and thus gives no guarantees that the consumer will receive messages in the same order you produced them.
func (*PubSub) ProduceSync ¶ added in v0.0.15
ProduceSync a message and blocks until the broker responds. If the key parameter is non-nil, then the behavior will be similar to ProduceWithKey.
func (*PubSub) ProduceWithKey ¶ added in v0.0.14
func (k *PubSub) ProduceWithKey( topic string, value []byte, key []byte, callback func(error), headers ...Header, )
ProduceWithKey a message with a key. Messages with the same key will be sent to the same partition, guaranteeing ordering of those messages.
func (*PubSub) SetFetchRetryDelay ¶ added in v0.0.23
SetFetchRetryDelay update the delay between fetch retries
func (*PubSub) SetFetchTimeout ¶ added in v0.0.23
SetFetchTimeout update the timeout value when fetching messages from Kafka
func (*PubSub) SetMaxFetchRetries ¶ added in v0.0.23
SetMaxFetchRetries update the max retries when fetching messages from Kafka
func (*PubSub) SetPublishTimeout ¶ added in v0.0.13
SetPublishTimeout update the timeout value when interacting with Kafka
func (*PubSub) StartListener ¶ added in v0.0.12
func (k *PubSub) StartListener()
StartListener starts listening on kafka for topics that the PubSub is subscribed to. If the PubSub is already listening, StartListener is a no-op. If the context is canceled, the PubSub will stop consuming new events.
func (*PubSub) StopListener ¶ added in v0.0.21
func (k *PubSub) StopListener()
StopListener stops the listener if it is running. The subscribed channels are not closed and will be able to receive messages again if the listener is started once more.
func (*PubSub) Subscribe ¶
func (k *PubSub) Subscribe(topics ...string) (<-chan KafkaMessage, error)
func (*PubSub) Unsubscribe ¶
func (k *PubSub) Unsubscribe(c <-chan KafkaMessage) error