kafka

package
v0.0.7 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2020 | License: MIT | Imports: 7 | Imported by: 0

Documentation

Index

Constants

View Source
// Default settings for a cluster configuration. Each constant is named
// "Default" followed by the name of a ClusterConfig field; counts and sizes
// are untyped integer constants, time spans are util.Duration values.
//
//   - Retry limits: 3 for metadata, 1 for the producer; both retry backoffs
//     are 100 milliseconds.
//   - Metadata refresh frequency: 10 minutes.
//   - Maximum message size: 4194304 bytes (4 MiB).
//   - Dial, read, write and producer timeouts: 1 second each.
//   - Channel buffer size: 2048.
//   - Monitor interval: 10 seconds.
//
// NOTE(review): presumably applied by NewClusterConfig (and/or UnmarshalYAML)
// to the matching ClusterConfig fields — their bodies are not visible here;
// confirm.
const (
	DefaultMetadataRetryMax         = 3
	DefaultMetadataRetryBackoff     = util.Duration(100 * time.Millisecond)
	DefaultMetadataRefreshFrequency = util.Duration(10 * time.Minute)
	DefaultMaxMessageBytes          = 4194304
	DefaultDialTimeout              = util.Duration(time.Second)
	DefaultReadTimeout              = util.Duration(time.Second)
	DefaultWriteTimeout             = util.Duration(time.Second)
	DefaultProducerTimeout          = util.Duration(time.Second)
	DefaultProducerRetryMax         = 1
	DefaultProducerRetryBackoff     = util.Duration(100 * time.Millisecond)
	DefaultChannelBufferSize        = 2048
	DefaultMonitorInterval          = util.Duration(10 * time.Second)
)

Variables

This section is empty.

Functions

func NewPool

func NewPool() *sync.Pool

Types

type ClusterConfig

// ClusterConfig describes a single cluster: its name, its broker list, and
// its retry, timeout, message-size, buffering and monitoring settings.
//
// Every field carries yaml and mapstructure tags whose key is identical to the
// Go field name, so configuration keys are spelled exactly like the fields
// (for example "MetadataRetryMax").
//
// Time spans are stored as util.Duration rather than time.Duration.
// NOTE(review): util.Duration is declared outside this view; presumably it
// adds text (un)marshalling on top of time.Duration — confirm.
//
// The package-level Default* constants share these field names.
// NOTE(review): the individual settings presumably map onto the sarama.Config
// options of similar name via NewKafkaConfig — confirm against its
// implementation.
type ClusterConfig struct {
	ClusterName              string        `yaml:"ClusterName" mapstructure:"ClusterName"`
	Brokers                  []string      `yaml:"Brokers" mapstructure:"Brokers"`
	MetadataRetryMax         int           `yaml:"MetadataRetryMax" mapstructure:"MetadataRetryMax"`
	MetadataRetryBackoff     util.Duration `yaml:"MetadataRetryBackoff" mapstructure:"MetadataRetryBackoff"`
	MetadataRefreshFrequency util.Duration `yaml:"MetadataRefreshFrequency" mapstructure:"MetadataRefreshFrequency"`
	MaxMessageBytes          int           `yaml:"MaxMessageBytes" mapstructure:"MaxMessageBytes"`
	DialTimeout              util.Duration `yaml:"DialTimeout" mapstructure:"DialTimeout"`
	ReadTimeout              util.Duration `yaml:"ReadTimeout" mapstructure:"ReadTimeout"`
	WriteTimeout             util.Duration `yaml:"WriteTimeout" mapstructure:"WriteTimeout"`
	ProducerTimeout          util.Duration `yaml:"ProducerTimeout" mapstructure:"ProducerTimeout"`
	ProducerRetryMax         int           `yaml:"ProducerRetryMax" mapstructure:"ProducerRetryMax"`
	ProducerRetryBackoff     util.Duration `yaml:"ProducerRetryBackoff" mapstructure:"ProducerRetryBackoff"`
	ChannelBufferSize        int           `yaml:"ChannelBufferSize" mapstructure:"ChannelBufferSize"`
	MonitorInterval          util.Duration `yaml:"MonitorInterval" mapstructure:"MonitorInterval"`
}

func NewClusterConfig

func NewClusterConfig() *ClusterConfig

func (*ClusterConfig) AddBroker

func (ccfg *ClusterConfig) AddBroker(broker ...string)

func (*ClusterConfig) NewKafkaConfig

func (ccfg *ClusterConfig) NewKafkaConfig() *sarama.Config

func (*ClusterConfig) UnmarshalYAML

func (ccfg *ClusterConfig) UnmarshalYAML(unmarshal func(v interface{}) error) error

type Config

// Config is a list of per-cluster configurations, held as pointers to
// ClusterConfig. It is the argument type accepted by New.
type Config []*ClusterConfig

func NewConfig

func NewConfig() Config

type Message

// Message is an opaque type: it has no exported fields and is manipulated
// only through its methods SetTopic, SetContent, GetMessage (which returns a
// *sarama.ProducerMessage) and Free. Instances are obtained from
// GetMessageWrapper.
// NOTE(review): GetMessageWrapper/Free alongside NewPool (*sync.Pool) suggest
// instances are pooled and must not be used after Free — confirm against the
// implementation.
type Message struct {
	// contains filtered or unexported fields
}

func GetMessageWrapper

func GetMessageWrapper() *Message

func (*Message) Free

func (msg *Message) Free()

func (*Message) GetMessage

func (msg *Message) GetMessage() *sarama.ProducerMessage

func (*Message) SetContent

func (msg *Message) SetContent(value []byte)

func (*Message) SetTopic

func (msg *Message) SetTopic(topic string)

type Service

// Service is the type returned by New for a given Config. Its methods are
// Open, Close, CloseProducer, Produce and WithLogger.
//
// Exported fields:
//   - Config: a list of per-cluster configurations.
//   - Closing, Closed: signal-only channels (chan struct{}).
//     NOTE(review): presumably closed at the start and at the end of shutdown
//     respectively — confirm against Close.
//   - Logger: a zap logger; WithLogger accepts the same type.
//
// Additional unexported fields exist but are not shown in this view.
type Service struct {
	Config Config

	Closing chan struct{}
	Closed  chan struct{}

	Logger *zap.Logger
	// contains filtered or unexported fields
}

func New

func New(config Config) (*Service, error)

func (*Service) Close

func (s *Service) Close() error

func (*Service) CloseProducer

func (s *Service) CloseProducer() error

func (*Service) Open

func (s *Service) Open() error

func (*Service) Produce

func (s *Service) Produce(topic string, msg []byte, async bool, cluster string) error

func (*Service) WithLogger

func (s *Service) WithLogger(logger *zap.Logger)

type Statistics

// Statistics groups four unsigned 64-bit counters, split by mode (async or
// sync) and by outcome (success or error).
// NOTE(review): presumably these count Service.Produce results per mode — the
// code that updates them is not visible here; confirm.
type Statistics struct {
	AsyncSuccess uint64
	AsyncError   uint64
	SyncSuccess  uint64
	SyncError    uint64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL