Documentation ¶
Overview ¶
Package mocks implements a simple Kafka client and consumer for testing purposes. The main use case is to test a consumer that has a predefined set of messages. Note: only the methods that are actively used for testing are implemented.
Index ¶
- type Message
- type MockClient
- func (m *MockClient) Broker(brokerID int32) (*sarama.Broker, error)
- func (m *MockClient) Brokers() []*sarama.Broker
- func (m *MockClient) Close() error
- func (m *MockClient) Closed() bool
- func (m *MockClient) Config() *sarama.Config
- func (m *MockClient) Consumer() sarama.Consumer
- func (m *MockClient) Controller() (*sarama.Broker, error)
- func (m *MockClient) Coordinator(consumerGroup string) (*sarama.Broker, error)
- func (m *MockClient) GetOffset(topic string, partitionID int32, time int64) (int64, error)
- func (m *MockClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)
- func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)
- func (m *MockClient) Leader(topic string, partitionID int32) (*sarama.Broker, error)
- func (m *MockClient) LeaderAndEpoch(topic string, partitionID int32) (*sarama.Broker, int32, error)
- func (m *MockClient) LeastLoadedBroker() *sarama.Broker
- func (m *MockClient) OfflineReplicas(topic string, partitionID int32) ([]int32, error)
- func (m *MockClient) Partitions(topic string) ([]int32, error)
- func (m *MockClient) RefreshBrokers(addrs []string) error
- func (m *MockClient) RefreshController() (*sarama.Broker, error)
- func (m *MockClient) RefreshCoordinator(consumerGroup string) error
- func (m *MockClient) RefreshMetadata(topics ...string) error
- func (m *MockClient) RefreshTransactionCoordinator(transactionID string) error
- func (m *MockClient) Replicas(topic string, partitionID int32) ([]int32, error)
- func (m *MockClient) Topics() ([]string, error)
- func (m *MockClient) TransactionCoordinator(transactionID string) (*sarama.Broker, error)
- func (m *MockClient) WritablePartitions(topic string) ([]int32, error)
- type MockConsumer
- func (m *MockConsumer) Close() error
- func (m *MockConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
- func (m *MockConsumer) HighWaterMarks() map[string]map[int32]int64
- func (m *MockConsumer) Partitions(topic string) ([]int32, error)
- func (m *MockConsumer) Pause(topicPartitions map[string][]int32)
- func (m *MockConsumer) PauseAll()
- func (m *MockConsumer) Resume(topicPartitions map[string][]int32)
- func (m *MockConsumer) ResumeAll()
- func (m *MockConsumer) Topics() ([]string, error)
- type MockPartitionConsumer
- func (m *MockPartitionConsumer) AsyncClose()
- func (m *MockPartitionConsumer) Close() error
- func (m *MockPartitionConsumer) Errors() <-chan *sarama.ConsumerError
- func (m *MockPartitionConsumer) HighWaterMarkOffset() int64
- func (m *MockPartitionConsumer) IsPaused() bool
- func (m *MockPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
- func (m *MockPartitionConsumer) Pause()
- func (m *MockPartitionConsumer) Resume()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MockClient ¶
type MockClient struct {
// contains filtered or unexported fields
}
MockClient implements the sarama.Client interface for testing purposes.
func NewMockClient ¶
func NewMockClient(config *sarama.Config, consumer *MockConsumer) *MockClient
NewMockClient instantiates a new client that always returns the given consumer.
func (*MockClient) Broker ¶
func (m *MockClient) Broker(brokerID int32) (*sarama.Broker, error)
Broker implements sarama.Client.
func (*MockClient) Brokers ¶
func (m *MockClient) Brokers() []*sarama.Broker
Brokers implements sarama.Client.
func (*MockClient) Config ¶
func (m *MockClient) Config() *sarama.Config
Config implements sarama.Client.
func (*MockClient) Consumer ¶
func (m *MockClient) Consumer() sarama.Consumer
Consumer returns the underlying sarama.Consumer.
func (*MockClient) Controller ¶
func (m *MockClient) Controller() (*sarama.Broker, error)
Controller implements sarama.Client.
func (*MockClient) Coordinator ¶
func (m *MockClient) Coordinator(consumerGroup string) (*sarama.Broker, error)
Coordinator implements sarama.Client.
func (*MockClient) InSyncReplicas ¶
func (m *MockClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)
InSyncReplicas implements sarama.Client.
func (*MockClient) InitProducerID ¶
func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)
InitProducerID implements sarama.Client.
func (*MockClient) LeaderAndEpoch ¶
func (m *MockClient) LeaderAndEpoch( topic string, partitionID int32, ) (*sarama.Broker, int32, error)
LeaderAndEpoch implements sarama.Client.
func (*MockClient) LeastLoadedBroker ¶
func (m *MockClient) LeastLoadedBroker() *sarama.Broker
LeastLoadedBroker implements sarama.Client.
func (*MockClient) OfflineReplicas ¶
func (m *MockClient) OfflineReplicas(topic string, partitionID int32) ([]int32, error)
OfflineReplicas implements sarama.Client.
func (*MockClient) Partitions ¶
func (m *MockClient) Partitions(topic string) ([]int32, error)
Partitions implements sarama.Client.
func (*MockClient) RefreshBrokers ¶
func (m *MockClient) RefreshBrokers(addrs []string) error
RefreshBrokers implements sarama.Client.
func (*MockClient) RefreshController ¶
func (m *MockClient) RefreshController() (*sarama.Broker, error)
RefreshController implements sarama.Client.
func (*MockClient) RefreshCoordinator ¶
func (m *MockClient) RefreshCoordinator(consumerGroup string) error
RefreshCoordinator implements sarama.Client.
func (*MockClient) RefreshMetadata ¶
func (m *MockClient) RefreshMetadata(topics ...string) error
RefreshMetadata implements sarama.Client.
func (*MockClient) RefreshTransactionCoordinator ¶
func (m *MockClient) RefreshTransactionCoordinator(transactionID string) error
RefreshTransactionCoordinator implements sarama.Client.
func (*MockClient) Replicas ¶
func (m *MockClient) Replicas(topic string, partitionID int32) ([]int32, error)
Replicas implements sarama.Client.
func (*MockClient) Topics ¶
func (m *MockClient) Topics() ([]string, error)
Topics implements sarama.Client.
func (*MockClient) TransactionCoordinator ¶
func (m *MockClient) TransactionCoordinator(transactionID string) (*sarama.Broker, error)
TransactionCoordinator implements sarama.Client.
func (*MockClient) WritablePartitions ¶
func (m *MockClient) WritablePartitions(topic string) ([]int32, error)
WritablePartitions implements sarama.Client.
type MockConsumer ¶
type MockConsumer struct {
// contains filtered or unexported fields
}
MockConsumer is a consumer for a fixed set of messages.
func NewMockConsumer ¶
func NewMockConsumer(messages map[string]map[int32][]*Message, oldestOffset int64) *MockConsumer
NewMockConsumer creates a consumer for the given set of messages.
func (*MockConsumer) ConsumePartition ¶
func (m *MockConsumer) ConsumePartition( topic string, partition int32, offset int64, ) (sarama.PartitionConsumer, error)
ConsumePartition implements sarama.Consumer.
func (*MockConsumer) HighWaterMarks ¶
func (m *MockConsumer) HighWaterMarks() map[string]map[int32]int64
HighWaterMarks implements sarama.Consumer.
func (*MockConsumer) Partitions ¶
func (m *MockConsumer) Partitions(topic string) ([]int32, error)
Partitions implements sarama.Consumer.
func (*MockConsumer) Pause ¶
func (m *MockConsumer) Pause(topicPartitions map[string][]int32)
Pause implements sarama.Consumer.
func (*MockConsumer) PauseAll ¶
func (m *MockConsumer) PauseAll()
PauseAll implements sarama.Consumer.
func (*MockConsumer) Resume ¶
func (m *MockConsumer) Resume(topicPartitions map[string][]int32)
Resume implements sarama.Consumer.
func (*MockConsumer) ResumeAll ¶
func (m *MockConsumer) ResumeAll()
ResumeAll implements sarama.Consumer.
func (*MockConsumer) Topics ¶
func (m *MockConsumer) Topics() ([]string, error)
Topics implements sarama.Consumer.
type MockPartitionConsumer ¶
type MockPartitionConsumer struct {
// contains filtered or unexported fields
}
MockPartitionConsumer implements the sarama.PartitionConsumer interface for testing purposes.
func NewMockPartitionConsumer ¶
func NewMockPartitionConsumer( topic string, partition int32, messages []*Message, offset int64, oldestOffset int64, ) *MockPartitionConsumer
NewMockPartitionConsumer sets the messages that the partition will return to the consumer.
func (*MockPartitionConsumer) AsyncClose ¶
func (m *MockPartitionConsumer) AsyncClose()
AsyncClose implements sarama.PartitionConsumer.
func (*MockPartitionConsumer) Close ¶
func (m *MockPartitionConsumer) Close() error
Close implements sarama.PartitionConsumer.
func (*MockPartitionConsumer) Errors ¶
func (m *MockPartitionConsumer) Errors() <-chan *sarama.ConsumerError
Errors implements sarama.PartitionConsumer.
func (*MockPartitionConsumer) HighWaterMarkOffset ¶
func (m *MockPartitionConsumer) HighWaterMarkOffset() int64
HighWaterMarkOffset implements sarama.PartitionConsumer.
func (*MockPartitionConsumer) IsPaused ¶
func (m *MockPartitionConsumer) IsPaused() bool
IsPaused implements sarama.PartitionConsumer.
func (*MockPartitionConsumer) Messages ¶
func (m *MockPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
Messages implements sarama.PartitionConsumer.
func (*MockPartitionConsumer) Pause ¶
func (m *MockPartitionConsumer) Pause()
Pause implements sarama.PartitionConsumer.
func (*MockPartitionConsumer) Resume ¶
func (m *MockPartitionConsumer) Resume()
Resume implements sarama.PartitionConsumer.