Documentation ¶
Index ¶
- Variables
- func GetKafkaPartitions(brokers string, topic string) (partitions []int32, err error)
- func NewKafkaPublisher(broker, topic string, options ...PublisherOption) (pubsub.Publisher, error)
- func NewSubscriber(broker, topic string, options ...SubscriberOption) (pubsub.Subscriber, error)
- type PublisherOption
- func AckNoResponse() PublisherOption
- func AckWaitForAll() PublisherOption
- func AckWaitForLocal() PublisherOption
- func HashPartitioner() PublisherOption
- func ManualPartitioner(partition int32) PublisherOption
- func Partitioner(partitioner sarama.PartitionerConstructor) PublisherOption
- func PubLogger(logger log.Logger) PublisherOption
- func RandomPartitioner() PublisherOption
- func RoundRobinPartitioner() PublisherOption
- func SyncPublisher() PublisherOption
- func WithSuccessHandler(s SuccessHandler) PublisherOption
- type SubscriberOption
- func BroadcastOffset(cb func(int64)) SubscriberOption
- func Offset(o int64) SubscriberOption
- func OffsetCallback(cb func() int64) SubscriberOption
- func OffsetNewest() SubscriberOption
- func OffsetOldest() SubscriberOption
- func Partition(p int32) SubscriberOption
- func SubLogger(logger log.Logger) SubscriberOption
- type SuccessHandler
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoBrokers = errors.New("at least one broker address is required") ErrNoTopic = errors.New("topic name is required") ErrNoLogger = errors.New("you need to provide a valid logger") ErrInvalidPartitionValue = errors.New("invalid partition value") ErrInvalidOffset = errors.New("invalid offset") ErrNoSuccessHandler = errors.New("you need to provide a valid success handler") )
Errors
Functions ¶
func GetKafkaPartitions ¶
GetKafkaPartitions is a helper function to look up which partitions are available via the given brokers for the given topic. This should be called only on startup.
func NewKafkaPublisher ¶
func NewKafkaPublisher( broker, topic string, options ...PublisherOption, ) (pubsub.Publisher, error)
NewKafkaPublisher will initiate a new Kafka publisher.
func NewSubscriber ¶
func NewSubscriber( broker, topic string, options ...SubscriberOption, ) (pubsub.Subscriber, error)
NewSubscriber will initiate a Kafka subscriber.
Types ¶
type PublisherOption ¶
type PublisherOption func(*publisherConfig) error
PublisherOption allows for functional options / friendly API's
func AckNoResponse ¶
func AckNoResponse() PublisherOption
AckNoResponse option to sets the ack mode to NoResponse.
func AckWaitForAll ¶
func AckWaitForAll() PublisherOption
AckWaitForAll option to sets the ack mode to WaitForAll.
func AckWaitForLocal ¶
func AckWaitForLocal() PublisherOption
AckWaitForLocal option to sets the ack mode to WaitForLocal.
func HashPartitioner ¶
func HashPartitioner() PublisherOption
HashPartitioner option to set the partition logic to use
func ManualPartitioner ¶
func ManualPartitioner(partition int32) PublisherOption
ManualPartitioner option to set the partition logic to use
func Partitioner ¶
func Partitioner(partitioner sarama.PartitionerConstructor) PublisherOption
Partitioner option to set a custom partitioner.
func PubLogger ¶
func PubLogger(logger log.Logger) PublisherOption
PubLogger option to set the logger to use
func RandomPartitioner ¶
func RandomPartitioner() PublisherOption
RandomPartitioner option to set the partition logic to use
func RoundRobinPartitioner ¶
func RoundRobinPartitioner() PublisherOption
RoundRobinPartitioner option to set the partition logic to use
func SyncPublisher ¶
func SyncPublisher() PublisherOption
SyncPublisher option to return a synchronous blocking publisher
func WithSuccessHandler ¶
func WithSuccessHandler(s SuccessHandler) PublisherOption
SuccessHandler option to set a success handler for use with async publisher.
type SubscriberOption ¶
type SubscriberOption func(*subscriberConfig) error
SubscriberOption allows for friendly APIs.
func BroadcastOffset ¶
func BroadcastOffset(cb func(int64)) SubscriberOption
BroadcastOffset sets the callback which will receive the offset of the message that has had the Done() method called on itself.
func Offset ¶
func Offset(o int64) SubscriberOption
Offset will start the consumer at the given offset.
func OffsetCallback ¶
func OffsetCallback(cb func() int64) SubscriberOption
OffsetCallback will start the consumer at the value returned by the callback function.
func OffsetNewest ¶
func OffsetNewest() SubscriberOption
OffsetNewest will start the consumer at newly incoming messages.
func OffsetOldest ¶
func OffsetOldest() SubscriberOption
OffsetOldest will start the consumer at the earliest available message.
func SubLogger ¶
func SubLogger(logger log.Logger) SubscriberOption
SubLogger option to set the logger to use
type SuccessHandler ¶
type SuccessHandler func(*sarama.ProducerMessage)