kafka

package
v0.0.0-...-c3319a1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 10, 2016 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoBrokers             = errors.New("at least one broker address is required")
	ErrNoTopic               = errors.New("topic name is required")
	ErrNoLogger              = errors.New("you need to provide a valid logger")
	ErrInvalidPartitionValue = errors.New("invalid partition value")
	ErrInvalidOffset         = errors.New("invalid offset")
	ErrNoSuccessHandler      = errors.New("you need to provide a valid success handler")
)

Errors

Functions

func GetKafkaPartitions

func GetKafkaPartitions(brokers string, topic string) (partitions []int32, err error)

GetKafkaPartitions is a helper function to look up which partitions are available via the given brokers for the given topic. This should be called only on startup.

func NewKafkaPublisher

func NewKafkaPublisher(
	broker, topic string,
	options ...PublisherOption,
) (pubsub.Publisher, error)

NewKafkaPublisher will initiate a new Kafka publisher.

func NewSubscriber

func NewSubscriber(
	broker, topic string,
	options ...SubscriberOption,
) (pubsub.Subscriber, error)

NewSubscriber will initiate a Kafka subscriber.

Types

type PublisherOption

type PublisherOption func(*publisherConfig) error

PublisherOption allows for functional options / friendly API's

func AckNoResponse

func AckNoResponse() PublisherOption

AckNoResponse option to sets the ack mode to NoResponse.

func AckWaitForAll

func AckWaitForAll() PublisherOption

AckWaitForAll option to sets the ack mode to WaitForAll.

func AckWaitForLocal

func AckWaitForLocal() PublisherOption

AckWaitForLocal option to sets the ack mode to WaitForLocal.

func HashPartitioner

func HashPartitioner() PublisherOption

HashPartitioner option to set the partition logic to use

func ManualPartitioner

func ManualPartitioner(partition int32) PublisherOption

ManualPartitioner option to set the partition logic to use

func Partitioner

func Partitioner(partitioner sarama.PartitionerConstructor) PublisherOption

Partitioner option to set a custom partitioner.

func PubLogger

func PubLogger(logger log.Logger) PublisherOption

PubLogger option to set the logger to use

func RandomPartitioner

func RandomPartitioner() PublisherOption

RandomPartitioner option to set the partition logic to use

func RoundRobinPartitioner

func RoundRobinPartitioner() PublisherOption

RoundRobinPartitioner option to set the partition logic to use

func SyncPublisher

func SyncPublisher() PublisherOption

SyncPublisher option to return a synchronous blocking publisher

func WithSuccessHandler

func WithSuccessHandler(s SuccessHandler) PublisherOption

SuccessHandler option to set a success handler for use with async publisher.

type SubscriberOption

type SubscriberOption func(*subscriberConfig) error

SubscriberOption allows for friendly APIs.

func BroadcastOffset

func BroadcastOffset(cb func(int64)) SubscriberOption

BroadcastOffset sets the callback which will receive the offset of the message that has had the Done() method called on itself.

func Offset

func Offset(o int64) SubscriberOption

Offset will start the consumer at the given offset.

func OffsetCallback

func OffsetCallback(cb func() int64) SubscriberOption

OffsetCallback will start the consumer at the value returned by the callback function.

func OffsetNewest

func OffsetNewest() SubscriberOption

OffsetNewest will start the consumer at newly incoming messages.

func OffsetOldest

func OffsetOldest() SubscriberOption

OffsetOldest will start the consumer at the earliest available message.

func Partition

func Partition(p int32) SubscriberOption

Partition sets the partition ID option.

func SubLogger

func SubLogger(logger log.Logger) SubscriberOption

SubLogger option to set the logger to use

type SuccessHandler

type SuccessHandler func(*sarama.ProducerMessage)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL