Documentation
¶
Index ¶
- func NewAdminClientFromConsumer(c Consumer) (a *lib.AdminClient, err error)
- func NewAdminClientFromProducer(p Producer) (a *lib.AdminClient, err error)
- type Consumer
- type MockConsumer
- func (c *MockConsumer) Assign(partitions []lib.TopicPartition) (err error)
- func (c *MockConsumer) Assignment() ([]lib.TopicPartition, error)
- func (c *MockConsumer) Close() (err error)
- func (c *MockConsumer) Commit() ([]lib.TopicPartition, error)
- func (c *MockConsumer) CommitMessage(m *lib.Message) ([]lib.TopicPartition, error)
- func (c *MockConsumer) CommitOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
- func (c *MockConsumer) Committed(partitions []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)
- func (c *MockConsumer) Events() chan lib.Event
- func (c *MockConsumer) GetConsumerGroupMetadata() (*lib.ConsumerGroupMetadata, error)
- func (c *MockConsumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
- func (c *MockConsumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
- func (c *MockConsumer) Logs() chan lib.LogEvent
- func (c *MockConsumer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
- func (c *MockConsumer) Pause(partitions []lib.TopicPartition) (err error)
- func (c *MockConsumer) Poll(timeoutMs int) (event lib.Event)
- func (c *MockConsumer) Position(partitions []lib.TopicPartition) (offsets []lib.TopicPartition, err error)
- func (c *MockConsumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- func (c *MockConsumer) ReadMessage(timeout time.Duration) (*lib.Message, error)
- func (c *MockConsumer) Resume(partitions []lib.TopicPartition) (err error)
- func (c *MockConsumer) Seek(partition lib.TopicPartition, timeoutMs int) error
- func (c *MockConsumer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
- func (c *MockConsumer) SetOAuthBearerTokenFailure(errstr string) error
- func (c *MockConsumer) StoreOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
- func (c *MockConsumer) String() string
- func (c *MockConsumer) Subscribe(topic string, rebalanceCb lib.RebalanceCb) error
- func (c *MockConsumer) SubscribeTopics(topics []string, rebalanceCb lib.RebalanceCb) (err error)
- func (c *MockConsumer) Subscription() (topics []string, err error)
- func (c *MockConsumer) Unassign() (err error)
- func (c *MockConsumer) Unsubscribe() (err error)
- type MockProducer
- func (p *MockProducer) AbortTransaction(ctx context.Context) error
- func (p *MockProducer) BeginTransaction() error
- func (p *MockProducer) Close()
- func (p *MockProducer) CommitTransaction(ctx context.Context) error
- func (p *MockProducer) Events() chan lib.Event
- func (p *MockProducer) Flush(timeoutMs int) int
- func (p *MockProducer) GetFatalError() error
- func (p *MockProducer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
- func (p *MockProducer) InitTransactions(ctx context.Context) error
- func (p *MockProducer) Len() int
- func (p *MockProducer) Logs() chan lib.LogEvent
- func (p *MockProducer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)
- func (p *MockProducer) Produce(msg *lib.Message, deliveryChan chan lib.Event) error
- func (p *MockProducer) ProduceChannel() chan *lib.Message
- func (p *MockProducer) Purge(flags int) error
- func (p *MockProducer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- func (p *MockProducer) SendOffsetsToTransaction(ctx context.Context, offsets []lib.TopicPartition, ...) error
- func (p *MockProducer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
- func (p *MockProducer) SetOAuthBearerTokenFailure(errstr string) error
- func (p *MockProducer) String() string
- func (p *MockProducer) TestFatalError(code lib.ErrorCode, str string) lib.ErrorCode
- type Producer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewAdminClientFromConsumer ¶
func NewAdminClientFromConsumer(c Consumer) (a *lib.AdminClient, err error)
NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance. The AdminClient will use the same configuration and connections as the parent instance.
func NewAdminClientFromProducer ¶
func NewAdminClientFromProducer(p Producer) (a *lib.AdminClient, err error)
NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance. The AdminClient will use the same configuration and connections as the parent instance.
Types ¶
type Consumer ¶
type Consumer interface {
Assign(partitions []lib.TopicPartition) (err error)
Assignment() (partitions []lib.TopicPartition, err error)
Close() (err error)
Commit() ([]lib.TopicPartition, error)
CommitMessage(m *lib.Message) ([]lib.TopicPartition, error)
CommitOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
Committed(partitions []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
Events() chan lib.Event
GetConsumerGroupMetadata() (*lib.ConsumerGroupMetadata, error)
GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
Logs() chan lib.LogEvent
OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
Pause(partitions []lib.TopicPartition) (err error)
Poll(timeoutMs int) (event lib.Event)
Position(partitions []lib.TopicPartition) (offsets []lib.TopicPartition, err error)
QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
ReadMessage(timeout time.Duration) (*lib.Message, error)
Resume(partitions []lib.TopicPartition) (err error)
Seek(partition lib.TopicPartition, timeoutMs int) error
SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
SetOAuthBearerTokenFailure(errstr string) error
StoreOffsets(offsets []lib.TopicPartition) (storedOffsets []lib.TopicPartition, err error)
String() string
Subscribe(topic string, rebalanceCb lib.RebalanceCb) error
SubscribeTopics(topics []string, rebalanceCb lib.RebalanceCb) (err error)
Subscription() (topics []string, err error)
Unassign() (err error)
Unsubscribe() (err error)
}
Consumer is an interface for the confluent-kafka-go Consumer struct
type MockConsumer ¶
MockConsumer is a mock of the Consumer interface using github.com/stretchr/testify/mock
func NewMockConsumer ¶
func NewMockConsumer() *MockConsumer
NewMockConsumer returns a new MockConsumer struct
func (*MockConsumer) Assign ¶
func (c *MockConsumer) Assign(partitions []lib.TopicPartition) (err error)
Assign method
func (*MockConsumer) Assignment ¶
func (c *MockConsumer) Assignment() ([]lib.TopicPartition, error)
Assignment method
func (*MockConsumer) Commit ¶
func (c *MockConsumer) Commit() ([]lib.TopicPartition, error)
Commit method
func (*MockConsumer) CommitMessage ¶
func (c *MockConsumer) CommitMessage(m *lib.Message) ([]lib.TopicPartition, error)
CommitMessage method
func (*MockConsumer) CommitOffsets ¶
func (c *MockConsumer) CommitOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
CommitOffsets method
func (*MockConsumer) Committed ¶
func (c *MockConsumer) Committed(partitions []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)
Committed method
func (*MockConsumer) GetConsumerGroupMetadata ¶
func (c *MockConsumer) GetConsumerGroupMetadata() (*lib.ConsumerGroupMetadata, error)
GetConsumerGroupMetadata method
func (*MockConsumer) GetMetadata ¶
func (c *MockConsumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
GetMetadata method
func (*MockConsumer) GetWatermarkOffsets ¶
func (c *MockConsumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
GetWatermarkOffsets method
func (*MockConsumer) OffsetsForTimes ¶
func (c *MockConsumer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
OffsetsForTimes method
func (*MockConsumer) Pause ¶
func (c *MockConsumer) Pause(partitions []lib.TopicPartition) (err error)
Pause method
func (*MockConsumer) Poll ¶
func (c *MockConsumer) Poll(timeoutMs int) (event lib.Event)
Poll method
func (*MockConsumer) Position ¶
func (c *MockConsumer) Position(partitions []lib.TopicPartition) (offsets []lib.TopicPartition, err error)
Position method
func (*MockConsumer) QueryWatermarkOffsets ¶
func (c *MockConsumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets method
func (*MockConsumer) ReadMessage ¶
ReadMessage method
func (*MockConsumer) Resume ¶
func (c *MockConsumer) Resume(partitions []lib.TopicPartition) (err error)
Resume method
func (*MockConsumer) Seek ¶
func (c *MockConsumer) Seek(partition lib.TopicPartition, timeoutMs int) error
Seek method
func (*MockConsumer) SetOAuthBearerToken ¶
func (c *MockConsumer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
SetOAuthBearerToken method
func (*MockConsumer) SetOAuthBearerTokenFailure ¶
func (c *MockConsumer) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure method
func (*MockConsumer) StoreOffsets ¶
func (c *MockConsumer) StoreOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
StoreOffsets method
func (*MockConsumer) Subscribe ¶
func (c *MockConsumer) Subscribe(topic string, rebalanceCb lib.RebalanceCb) error
Subscribe method
func (*MockConsumer) SubscribeTopics ¶
func (c *MockConsumer) SubscribeTopics(topics []string, rebalanceCb lib.RebalanceCb) (err error)
SubscribeTopics method
func (*MockConsumer) Subscription ¶
func (c *MockConsumer) Subscription() (topics []string, err error)
Subscription method
func (*MockConsumer) Unsubscribe ¶
func (c *MockConsumer) Unsubscribe() (err error)
Unsubscribe method
type MockProducer ¶
MockProducer is a mock of the Producer interface using github.com/stretchr/testify/mock
func NewMockProducer ¶
func NewMockProducer() *MockProducer
NewMockProducer returns a new MockProducer struct
func (*MockProducer) AbortTransaction ¶
func (p *MockProducer) AbortTransaction(ctx context.Context) error
AbortTransaction method
func (*MockProducer) BeginTransaction ¶
func (p *MockProducer) BeginTransaction() error
BeginTransaction method
func (*MockProducer) CommitTransaction ¶
func (p *MockProducer) CommitTransaction(ctx context.Context) error
CommitTransaction method
func (*MockProducer) GetFatalError ¶
func (p *MockProducer) GetFatalError() error
GetFatalError method
func (*MockProducer) GetMetadata ¶
func (p *MockProducer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
GetMetadata method
func (*MockProducer) InitTransactions ¶
func (p *MockProducer) InitTransactions(ctx context.Context) error
InitTransactions method
func (*MockProducer) OffsetsForTimes ¶
func (p *MockProducer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)
OffsetsForTimes method
func (*MockProducer) ProduceChannel ¶
func (p *MockProducer) ProduceChannel() chan *lib.Message
ProduceChannel method
func (*MockProducer) QueryWatermarkOffsets ¶
func (p *MockProducer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets method
func (*MockProducer) SendOffsetsToTransaction ¶
func (p *MockProducer) SendOffsetsToTransaction(ctx context.Context, offsets []lib.TopicPartition, consumerMetadata *lib.ConsumerGroupMetadata) error
SendOffsetsToTransaction method
func (*MockProducer) SetOAuthBearerToken ¶
func (p *MockProducer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
SetOAuthBearerToken method
func (*MockProducer) SetOAuthBearerTokenFailure ¶
func (p *MockProducer) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure method
func (*MockProducer) TestFatalError ¶
TestFatalError method
type Producer ¶
type Producer interface {
AbortTransaction(ctx context.Context) error
BeginTransaction() error
Close()
CommitTransaction(ctx context.Context) error
Events() chan lib.Event
Flush(timeoutMs int) int
GetFatalError() error
GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
InitTransactions(ctx context.Context) error
Len() int
Logs() chan lib.LogEvent
OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
Produce(msg *lib.Message, deliveryChan chan lib.Event) error
ProduceChannel() chan *lib.Message
Purge(flags int) error
QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
SendOffsetsToTransaction(ctx context.Context, offsets []lib.TopicPartition, consumerMetadata *lib.ConsumerGroupMetadata) error
SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
SetOAuthBearerTokenFailure(errstr string) error
String() string
TestFatalError(code lib.ErrorCode, str string) lib.ErrorCode
}
Producer is an interface for the confluent-kafka-go Producer struct