Versions in this module Expand all Collapse all v1 v1.1.0 Sep 29, 2020 Changes in this version + const AnyOffset + type AsyncProducer struct + func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer + func (mp *AsyncProducer) AsyncClose() + func (mp *AsyncProducer) Close() error + func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError + func (mp *AsyncProducer) ExpectInputAndFail(err error) + func (mp *AsyncProducer) ExpectInputAndSucceed() + func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) + func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) + func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage + func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage + type Consumer struct + func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer + func (c *Consumer) Close() error + func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) + func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer + func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 + func (c *Consumer) Partitions(topic string) ([]int32, error) + func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) + func (c *Consumer) Topics() ([]string, error) + type ErrorReporter interface + Errorf func(string, ...interface{}) + type PartitionConsumer struct + func (pc *PartitionConsumer) AsyncClose() + func (pc *PartitionConsumer) Close() error + func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError + func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() + func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() + func (pc *PartitionConsumer) HighWaterMarkOffset() int64 + func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage + func (pc *PartitionConsumer) YieldError(err error) + func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) + type SyncProducer struct + func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer + func (sp *SyncProducer) Close() error + func (sp *SyncProducer) ExpectSendMessageAndFail(err error) + func (sp *SyncProducer) ExpectSendMessageAndSucceed() + func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) + func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) + func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) + func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error + type ValueChecker func(val []byte) error