kago

package module
v0.0.0-...-04f7f48 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2018 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitOffsetFile

func InitOffsetFile()

init fileMap

func ListDir

func ListDir(dirPth string, suffix string) (files []string, err error)

获取指定目录下的所有文件,不进入下一级目录搜索,可以匹配后缀过滤。

func Max

func Max(a, b int64) int64

func Partitions

func Partitions(addr []string, topic string, conf *Config) ([]int32, error)

func Topics

func Topics(addr []string, conf *Config) ([]string, error)

issue github.com/Shopify/sarama/issues/1130 has been recovered

Types

type AsyncProducer

type AsyncProducer struct {
	Id              int
	ProducerGroupId string
	// contains filtered or unexported fields
}

func InitManualRetryAsyncProducer

func InitManualRetryAsyncProducer(addr []string, conf *Config) (*AsyncProducer, error)

one asyncProducer without retry

func InitManualRetryAsyncProducerGroup

func InitManualRetryAsyncProducerGroup(addr []string, conf *Config, groupId string) ([]*AsyncProducer, error)

some(config.AsyncProducerAmount) asyncProducer without retry

func (*AsyncProducer) Close

func (asp *AsyncProducer) Close() (err error)

func (*AsyncProducer) Errors

func (asp *AsyncProducer) Errors() <-chan *ProducerError

func (*AsyncProducer) Send

func (asp *AsyncProducer) Send() chan<- *ProducerMessage

send message

func (*AsyncProducer) Successes

func (asp *AsyncProducer) Successes() <-chan *ProducerMessage

type Config

type Config struct {
	sarama_cluster.Config
	SyncProducerAmount    int
	AsyncProducerAmount   int
	ConsumerOfGroupAmount int
	OffsetLocalOrServer   int //0,local  1,server  2,newest
}

func NewConfig

func NewConfig() (conf *Config)

type Consumer

type Consumer struct {
	Topic   string
	GroupId string
	// contains filtered or unexported fields
}

func InitConsumersOfGroup

func InitConsumersOfGroup(addr []string, topic string, groupId string, conf *Config) ([]*Consumer, error)

func InitOneConsumerOfGroup

func InitOneConsumerOfGroup(addr []string, topic string, groupId string, conf *Config) (*Consumer, error)

func (*Consumer) Close

func (cs *Consumer) Close() error

func (*Consumer) CommitOffsets

func (cs *Consumer) CommitOffsets() error

func (*Consumer) Errors

func (cs *Consumer) Errors() <-chan error

func (*Consumer) MarkOffset

func (cs *Consumer) MarkOffset(topic string, partition int32, offset int64, groupId string, ifExactOnce bool)

func (*Consumer) Notifications

func (cs *Consumer) Notifications() <-chan *NotifyMessage

func (*Consumer) Recv

func (cs *Consumer) Recv() <-chan *ConsumerMessage

func (*Consumer) ResetOffset

func (cs *Consumer) ResetOffset(topic string, partition int32, offset int64, groupId string, ifExactOnce bool)

type ConsumerError

type ConsumerError = sarama.ConsumerError

type ConsumerMessage

type ConsumerMessage = sarama.ConsumerMessage

type NotifyMessage

type NotifyMessage = sarama_cluster.Notification

type PartitionConsumer

type PartitionConsumer struct {
	Topic     string
	Partition int32
	GroupId   string
	// contains filtered or unexported fields
}

func InitPartitionConsumer

func InitPartitionConsumer(addr []string, topic string, partition int32, groupId string, conf *Config) (*PartitionConsumer, error)

func InitPartitionConsumers

func InitPartitionConsumers(addr []string, topic string, groupId string, conf *Config) ([]*PartitionConsumer, error)

func (*PartitionConsumer) Close

func (pcs *PartitionConsumer) Close() error

func (*PartitionConsumer) Errors

func (pcs *PartitionConsumer) Errors() <-chan *ConsumerError

func (*PartitionConsumer) Recv

func (pcs *PartitionConsumer) Recv() <-chan *ConsumerMessage

type PartitionOffsetManager

type PartitionOffsetManager struct {
	// contains filtered or unexported fields
}

func InitPartitionOffsetManager

func InitPartitionOffsetManager(addr []string, topic, groupId string, partition int32, conf *Config) (*PartitionOffsetManager, error)

func (*PartitionOffsetManager) Close

func (pom *PartitionOffsetManager) Close() (error, error)

func (*PartitionOffsetManager) Errors

func (pom *PartitionOffsetManager) Errors() <-chan *ConsumerError

func (*PartitionOffsetManager) MarkOffset

func (pom *PartitionOffsetManager) MarkOffset(topic string, partition int32, offset int64, groupId string, ifExactOnce bool)

func (*PartitionOffsetManager) NextOffset

func (pom *PartitionOffsetManager) NextOffset() (offset int64)

func (*PartitionOffsetManager) ResetOffset

func (pom *PartitionOffsetManager) ResetOffset(topic string, partition int32, offset int64, groupId string, ifExactOnce bool)

type ProducerError

type ProducerError = sarama.ProducerError

type ProducerMessage

type ProducerMessage = sarama.ProducerMessage

type SyncProducer

type SyncProducer struct {
	Id              int
	ProducerGroupId string
	// contains filtered or unexported fields
}

func InitManualRetrySyncProducer

func InitManualRetrySyncProducer(addr []string, conf *Config) (*SyncProducer, error)

one syncProducer without retry

func InitManualRetrySyncProducerGroup

func InitManualRetrySyncProducerGroup(addr []string, conf *Config, groupId string) ([]*SyncProducer, error)

some(config.SyncProducerAmount) syncProducer without retry

func (*SyncProducer) Close

func (sp *SyncProducer) Close() (err error)

func (*SyncProducer) SendMessage

func (sp *SyncProducer) SendMessage(msg *ProducerMessage) (string, int32, int64, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL