kafka

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2022 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewAdminClientFromConsumer

func NewAdminClientFromConsumer(c Consumer) (a *lib.AdminClient, err error)

NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance. The AdminClient will use the same configuration and connections as the parent instance.

func NewAdminClientFromProducer

func NewAdminClientFromProducer(p Producer) (a *lib.AdminClient, err error)

NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance. The AdminClient will use the same configuration and connections as the parent instance.

Types

type Consumer

type Consumer interface {
	Assign(partitions []lib.TopicPartition) (err error)
	Assignment() (partitions []lib.TopicPartition, err error)
	Close() (err error)
	Commit() ([]lib.TopicPartition, error)
	CommitMessage(m *lib.Message) ([]lib.TopicPartition, error)
	CommitOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
	Committed(partitions []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
	Events() chan lib.Event
	GetConsumerGroupMetadata() (*lib.ConsumerGroupMetadata, error)
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
	GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
	Logs() chan lib.LogEvent
	OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
	Pause(partitions []lib.TopicPartition) (err error)
	Poll(timeoutMs int) (event lib.Event)
	Position(partitions []lib.TopicPartition) (offsets []lib.TopicPartition, err error)
	QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
	ReadMessage(timeout time.Duration) (*lib.Message, error)
	Resume(partitions []lib.TopicPartition) (err error)
	Seek(partition lib.TopicPartition, timeoutMs int) error
	SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
	SetOAuthBearerTokenFailure(errstr string) error
	StoreOffsets(offsets []lib.TopicPartition) (storedOffsets []lib.TopicPartition, err error)
	String() string
	Subscribe(topic string, rebalanceCb lib.RebalanceCb) error
	SubscribeTopics(topics []string, rebalanceCb lib.RebalanceCb) (err error)
	Subscription() (topics []string, err error)
	Unassign() (err error)
	Unsubscribe() (err error)
}

Consumer is an interface for the confluent-kafka-go Consumer struct

type MockConsumer

type MockConsumer struct {
	mock.Mock
}

MockConsumer is a mock of the Consumer interface using github.com/stretchr/testify/mock

func NewMockConsumer

func NewMockConsumer() *MockConsumer

NewMockConsumer returns a new MockConsumer struct

func (*MockConsumer) Assign

func (c *MockConsumer) Assign(partitions []lib.TopicPartition) (err error)

Assign method

func (*MockConsumer) Assignment

func (c *MockConsumer) Assignment() ([]lib.TopicPartition, error)

Assignment method

func (*MockConsumer) Close

func (c *MockConsumer) Close() (err error)

Close method

func (*MockConsumer) Commit

func (c *MockConsumer) Commit() ([]lib.TopicPartition, error)

Commit method

func (*MockConsumer) CommitMessage

func (c *MockConsumer) CommitMessage(m *lib.Message) ([]lib.TopicPartition, error)

CommitMessage method

func (*MockConsumer) CommitOffsets

func (c *MockConsumer) CommitOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)

CommitOffsets method

func (*MockConsumer) Committed

func (c *MockConsumer) Committed(partitions []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)

Committed method

func (*MockConsumer) Events

func (c *MockConsumer) Events() chan lib.Event

Events method

func (*MockConsumer) GetConsumerGroupMetadata

func (c *MockConsumer) GetConsumerGroupMetadata() (*lib.ConsumerGroupMetadata, error)

GetConsumerGroupMetadata method

func (*MockConsumer) GetMetadata

func (c *MockConsumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)

GetMetadata method

func (*MockConsumer) GetWatermarkOffsets

func (c *MockConsumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)

GetWatermarkOffsets method

func (*MockConsumer) Logs

func (c *MockConsumer) Logs() chan lib.LogEvent

Logs method

func (*MockConsumer) OffsetsForTimes

func (c *MockConsumer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)

OffsetsForTimes method

func (*MockConsumer) Pause

func (c *MockConsumer) Pause(partitions []lib.TopicPartition) (err error)

Pause method

func (*MockConsumer) Poll

func (c *MockConsumer) Poll(timeoutMs int) (event lib.Event)

Poll method

func (*MockConsumer) Position

func (c *MockConsumer) Position(partitions []lib.TopicPartition) (offsets []lib.TopicPartition, err error)

Position method

func (*MockConsumer) QueryWatermarkOffsets

func (c *MockConsumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)

QueryWatermarkOffsets method

func (*MockConsumer) ReadMessage

func (c *MockConsumer) ReadMessage(timeout time.Duration) (*lib.Message, error)

ReadMessage method

func (*MockConsumer) Resume

func (c *MockConsumer) Resume(partitions []lib.TopicPartition) (err error)

Resume method

func (*MockConsumer) Seek

func (c *MockConsumer) Seek(partition lib.TopicPartition, timeoutMs int) error

Seek method

func (*MockConsumer) SetOAuthBearerToken

func (c *MockConsumer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error

SetOAuthBearerToken method

func (*MockConsumer) SetOAuthBearerTokenFailure

func (c *MockConsumer) SetOAuthBearerTokenFailure(errstr string) error

SetOAuthBearerTokenFailure method

func (*MockConsumer) StoreOffsets

func (c *MockConsumer) StoreOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)

StoreOffsets method

func (*MockConsumer) String

func (c *MockConsumer) String() string

String method

func (*MockConsumer) Subscribe

func (c *MockConsumer) Subscribe(topic string, rebalanceCb lib.RebalanceCb) error

Subscribe method

func (*MockConsumer) SubscribeTopics

func (c *MockConsumer) SubscribeTopics(topics []string, rebalanceCb lib.RebalanceCb) (err error)

SubscribeTopics method

func (*MockConsumer) Subscription

func (c *MockConsumer) Subscription() (topics []string, err error)

Subscription method

func (*MockConsumer) Unassign

func (c *MockConsumer) Unassign() (err error)

Unassign method

func (*MockConsumer) Unsubscribe

func (c *MockConsumer) Unsubscribe() (err error)

Unsubscribe method

type MockProducer

type MockProducer struct {
	mock.Mock
}

MockProducer is a mock of the Producer interface using github.com/stretchr/testify/mock

func NewMockProducer

func NewMockProducer() *MockProducer

NewMockProducer returns a new MockProducer struct

func (*MockProducer) AbortTransaction

func (p *MockProducer) AbortTransaction(ctx context.Context) error

AbortTransaction method

func (*MockProducer) BeginTransaction

func (p *MockProducer) BeginTransaction() error

BeginTransaction method

func (*MockProducer) Close

func (p *MockProducer) Close()

Close emethod

func (*MockProducer) CommitTransaction

func (p *MockProducer) CommitTransaction(ctx context.Context) error

CommitTransaction method

func (*MockProducer) Events

func (p *MockProducer) Events() chan lib.Event

Events method

func (*MockProducer) Flush

func (p *MockProducer) Flush(timeoutMs int) int

Flush method

func (*MockProducer) GetFatalError

func (p *MockProducer) GetFatalError() error

GetFatalError method

func (*MockProducer) GetMetadata

func (p *MockProducer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)

GetMetadata method

func (*MockProducer) InitTransactions

func (p *MockProducer) InitTransactions(ctx context.Context) error

InitTransactions method

func (*MockProducer) Len

func (p *MockProducer) Len() int

Len method

func (*MockProducer) Logs

func (p *MockProducer) Logs() chan lib.LogEvent

Logs method

func (*MockProducer) OffsetsForTimes

func (p *MockProducer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)

OffsetsForTimes method

func (*MockProducer) Produce

func (p *MockProducer) Produce(msg *lib.Message, deliveryChan chan lib.Event) error

Produce method

func (*MockProducer) ProduceChannel

func (p *MockProducer) ProduceChannel() chan *lib.Message

ProduceChannel method

func (*MockProducer) Purge

func (p *MockProducer) Purge(flags int) error

Purge method

func (*MockProducer) QueryWatermarkOffsets

func (p *MockProducer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)

QueryWatermarkOffsets method

func (*MockProducer) SendOffsetsToTransaction

func (p *MockProducer) SendOffsetsToTransaction(ctx context.Context, offsets []lib.TopicPartition, consumerMetadata *lib.ConsumerGroupMetadata) error

SendOffsetsToTransaction method

func (*MockProducer) SetOAuthBearerToken

func (p *MockProducer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error

SetOAuthBearerToken method

func (*MockProducer) SetOAuthBearerTokenFailure

func (p *MockProducer) SetOAuthBearerTokenFailure(errstr string) error

SetOAuthBearerTokenFailure method

func (*MockProducer) String

func (p *MockProducer) String() string

String method

func (*MockProducer) TestFatalError

func (p *MockProducer) TestFatalError(code lib.ErrorCode, str string) lib.ErrorCode

TestFatalError method

type Producer

type Producer interface {
	AbortTransaction(ctx context.Context) error
	BeginTransaction() error
	Close()
	CommitTransaction(ctx context.Context) error
	Events() chan lib.Event
	Flush(timeoutMs int) int
	GetFatalError() error
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
	InitTransactions(ctx context.Context) error
	Len() int
	Logs() chan lib.LogEvent
	OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
	Produce(msg *lib.Message, deliveryChan chan lib.Event) error
	ProduceChannel() chan *lib.Message
	Purge(flags int) error
	QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
	SendOffsetsToTransaction(ctx context.Context, offsets []lib.TopicPartition, consumerMetadata *lib.ConsumerGroupMetadata) error
	SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
	SetOAuthBearerTokenFailure(errstr string) error
	String() string
	TestFatalError(code lib.ErrorCode, str string) lib.ErrorCode
}

Producer is an interface for the confluent-kafka-go Producer struct

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL