kafka

package
v0.0.0-...-d6a9545 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2016 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTimeout          = errors.New("timeout")
	ErrClosed           = errors.New("consumer closed")
	ErrBadAck           = errors.New("bad ack")
	ErrNewConsumer      = errors.New("new kafka consumer failed")
	ErrIdcNotExist      = errors.New("idc not exist")
	ErrInvaildPartition = errors.New("invaild partition")
	ErrInvaildOffset    = errors.New("invaild offset")
	ErrEmptyAddr        = errors.New("empty addrs")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(brokerAddrs map[string][]string, config *cluster.Config, topic, group string) (*Consumer, error)

func (*Consumer) Ack

func (c *Consumer) Ack(idc string, partition int32, offset int64) error

func (*Consumer) Close

func (c *Consumer) Close()

Close must not be called more than once.

func (*Consumer) Recv

func (c *Consumer) Recv() (msg *sarama.ConsumerMessage, idc string, err error)

Get a message

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(zkAddrs []string, kafkaRoot string, conf *sarama.Config) (*Manager, error)

NewManager creates a Kafka manager from the given ZooKeeper addresses of the Kafka cluster.

func (*Manager) Accumulation

func (m *Manager) Accumulation(topic, group string) (int64, int64, error)

Accumulation returns information about the messages accumulated (the backlog) for the given topic and group.

func (*Manager) BrokerAddrs

func (m *Manager) BrokerAddrs() []string

BrokerAddrs returns the broker addresses from the manager's cached data.

func (*Manager) BrokersList

func (m *Manager) BrokersList() []int32

BrokersList returns the list of broker IDs from the manager's cached data.

func (*Manager) Close

func (m *Manager) Close() error

close manager

func (*Manager) CommitOffset

func (m *Manager) CommitOffset(topic, group string, offsets map[int32]int64) error

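Currently, CommitOffset can only be used to set the offsets when a new group is created. Once the group has performed join-group, the Kafka server has to check whether the ConsumerGroupGeneration and ConsumerID of the OffsetCommitRequest are valid, and so this API will return a failure. When new requirements arise, consider whether this API needs to be redesigned.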

func (*Manager) CreateTopic

func (m *Manager) CreateTopic(topic string, replications int32, partitions int32) error

CreateTopic creates a topic with the given name.

func (*Manager) DeleteTopic

func (m *Manager) DeleteTopic(topic string) error

DeleteTopic marks the given topic for deletion.

func (*Manager) ExistTopic

func (m *Manager) ExistTopic(topic string) (bool, error)

ExistTopic reports whether the given topic exists.

func (*Manager) FetchGroupOffsets

func (m *Manager) FetchGroupOffsets(topic, group string) (map[int32]int64, error)

func (*Manager) FetchTopicOffsets

func (m *Manager) FetchTopicOffsets(topic string, time int64) (map[int32]int64, error)

The returned offset is the offset at which the next message will be written to the topic.

func (*Manager) RefreshMetadata

func (m *Manager) RefreshMetadata() error

RefreshMetadata refreshes the available Kafka metadata.

func (*Manager) Topics

func (m *Manager) Topics() (topics []string, err error)

Topics returns the set of available topics as retrieved from cluster metadata.

func (*Manager) UpdateTopic

func (m *Manager) UpdateTopic(topic string, partitions int) error

add partitions to a topic

type Producer

type Producer struct {
	sarama.SyncProducer
}

func NewProducer

func NewProducer(brokerAddrs []string, conf *sarama.Config) (*Producer, error)

func (*Producer) Send

func (p *Producer) Send(topic string, key, data []byte) (partition int32, offset int64, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL