v1.2.51 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2020 License: MIT Imports: 7 Imported by: 0



Package admin provides an interface for Kafka administrative operations.



This section is empty.


This section is empty.


func NewKafkaAdmin

func NewKafkaAdmin(bootstrapServer []string, options ...KafkaAdminOption) *kafkaAdmin


type KafkaAdmin

type KafkaAdmin interface {
	FetchInfo(topics []string) (map[string]*Topic, error)
	CreateTopics(topics map[string]*Topic) error
	DeleteTopics(topics []string) (map[string]error, error)
}


type KafkaAdminOption

type KafkaAdminOption func(*kafkaAdminOptions)

func WithKafkaVersion

func WithKafkaVersion(version sarama.KafkaVersion) KafkaAdminOption

func WithLogger

func WithLogger(logger log.Logger) KafkaAdminOption

type MockKafkaAdmin

type MockKafkaAdmin struct {
	Topics *Topics
}

func NewMockAdminWithTopics

func NewMockAdminWithTopics(tps map[string]*Topic) *MockKafkaAdmin

func (*MockKafkaAdmin) Close

func (m *MockKafkaAdmin) Close()

func (*MockKafkaAdmin) CreateTopics

func (m *MockKafkaAdmin) CreateTopics(topics map[string]*Topic) error

func (*MockKafkaAdmin) DeleteTopics

func (m *MockKafkaAdmin) DeleteTopics(topics []string) (map[string]error, error)

func (*MockKafkaAdmin) FetchInfo

func (m *MockKafkaAdmin) FetchInfo(topics []string) (map[string]*Topic, error)

type MockPartition

type MockPartition struct {
	// contains filtered or unexported fields
}

func (*MockPartition) Append

func (p *MockPartition) Append(r *data.Record) error

func (*MockPartition) Fetch

func (p *MockPartition) Fetch(start int64, limit int) (records []*data.Record, err error)

func (*MockPartition) FetchAll

func (p *MockPartition) FetchAll() (records []*data.Record)

func (*MockPartition) Latest

func (p *MockPartition) Latest() int64

type MockTopic

type MockTopic struct {
	Name string

	Meta *Topic
	// contains filtered or unexported fields
}

func (*MockTopic) AddPartition

func (tp *MockTopic) AddPartition(id int) error

func (*MockTopic) FetchAll

func (tp *MockTopic) FetchAll() (records []*data.Record)

func (*MockTopic) Partition

func (tp *MockTopic) Partition(id int) (*MockPartition, error)

func (*MockTopic) Partitions

func (tp *MockTopic) Partitions() []*MockPartition

type Partition

type Partition struct {
	Id    int32
	Error error
}

type Topic

type Topic struct {
	Name              string
	Partitions        []Partition
	Error             error
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	ConfigEntries     map[string]string
}

type Topics

type Topics struct {
	// contains filtered or unexported fields
}

func NewMockTopics

func NewMockTopics() *Topics

func (*Topics) AddTopic

func (td *Topics) AddTopic(topic *MockTopic) error

func (*Topics) RemoveTopic

func (td *Topics) RemoveTopic(name string) error

func (*Topics) Topic

func (td *Topics) Topic(name string) (*MockTopic, error)

func (*Topics) Topics

func (td *Topics) Topics() map[string]*MockTopic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL