kafkac

package module
v1.0.0
Published: Aug 30, 2021 | License: MIT | Imports: 20 | Imported by: 0

README

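Introduction

kafkac is a client component for the Kafka message queue. Built on top of github.com/segmentio/kafka-go, it wraps a set of small but necessary features: when the client fails to publish a message (for example, because of a network failure), the message is dumped to a local file (or to another storage interface) and is automatically published/consumed again once the state recovers. It also detects and maintains the state of the Kafka connection and raises alarms to the outside.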
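The dump-and-replay behaviour described above can be sketched in a few lines. This is only an illustration of the pattern, not kafkac's implementation: `fallbackPublisher`, its `send` field, the `pending` slice (standing in for the local file), and `errUnreachable` are all hypothetical names, and `Message` is redeclared locally so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Message mirrors the shape of kafkac.Message for this standalone sketch.
type Message struct {
	Key   []byte
	Value []byte
}

// errUnreachable is a hypothetical error used to simulate a network failure.
var errUnreachable = errors.New("broker unreachable")

// fallbackPublisher is a hypothetical type, not part of kafkac.
type fallbackPublisher struct {
	send    func(Message) error // stand-in for the real Kafka write
	pending []Message           // stand-in for the local dump file
}

// Publish attempts delivery and keeps the message for later on failure.
func (p *fallbackPublisher) Publish(m Message) error {
	if err := p.send(m); err != nil {
		p.pending = append(p.pending, m)
		return err
	}
	return nil
}

// Replay resends stored messages in order and stops at the first failure,
// keeping the undelivered remainder. It returns how many were delivered.
func (p *fallbackPublisher) Replay() int {
	sent := 0
	for _, m := range p.pending {
		if err := p.send(m); err != nil {
			break
		}
		sent++
	}
	p.pending = p.pending[sent:]
	return sent
}

func main() {
	up := false
	p := &fallbackPublisher{send: func(Message) error {
		if !up {
			return errUnreachable
		}
		return nil
	}}

	_ = p.Publish(Message{Value: []byte("a")}) // fails, so the message is stored
	up = true                                  // the connection recovers
	fmt.Println("replayed:", p.Replay(), "still pending:", len(p.pending))
}
```

In the real package the storage side is abstracted behind the ProducerRecover interface, so the slice here could be swapped for a file or any other store.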

Documentation

Index

Constants

This section is empty.

Variables

var ErrorPaused = errors.New("paused")

Functions

func CreateTopic

func CreateTopic(brokerAddr string, topic string, NumPartitions, ReplicationFactor int) error

Use this when the broker is configured with auto.create.topics.enable='false', so topics are not created automatically.

func ListTopics

func ListTopics(brokerAddr string) ([]string, error)

brokerAddr is the broker address in host:port form.

Types

type Alarm

type Alarm interface {
	Alarm(message string)
}
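Any type with an `Alarm(message string)` method satisfies this interface. The sketch below shows one possible implementation that records messages in memory, which can be handy in tests. `memoryAlarm` and its `Messages` method are hypothetical names, and the interface is redeclared locally so the example runs without importing kafkac.

```go
package main

import (
	"fmt"
	"sync"
)

// Alarm mirrors the kafkac.Alarm interface so the sketch compiles on its own.
type Alarm interface {
	Alarm(message string)
}

// memoryAlarm is a hypothetical implementation that records alarm messages.
type memoryAlarm struct {
	mu       sync.Mutex
	messages []string
}

// Alarm appends the message under a lock, so concurrent callers are safe.
func (a *memoryAlarm) Alarm(message string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.messages = append(a.messages, message)
}

// Messages returns a copy of the alarms recorded so far.
func (a *memoryAlarm) Messages() []string {
	a.mu.Lock()
	defer a.mu.Unlock()
	return append([]string(nil), a.messages...)
}

func main() {
	m := &memoryAlarm{}
	var a Alarm = m
	a.Alarm("kafka connection lost")
	fmt.Println(m.Messages())
}
```

A production implementation would more likely forward the message to a logger, a pager, or a chat webhook.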

type Consumer

type Consumer struct {
}

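Because Kafka persists messages and supports replayable consumption based on the consumer position, the consumer side does not need any further heavy wrapping. Whether you use the low-level or the high-level API, and whether you consume by partition or by consumer group, kafka-go already does this well enough. This is noted here for the record.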

type FileRecover

type FileRecover struct {
	// contains filtered or unexported fields
}

func NewFileRecover

func NewFileRecover(topic, savePath string, bufCap int, writeFileIntervalSecond int) (*FileRecover, error)

func (*FileRecover) CloseNotify

func (m *FileRecover) CloseNotify() chan struct{}

func (*FileRecover) Restore

func (m *FileRecover) Restore(fn RestoreFunc)

func (*FileRecover) Save

func (m *FileRecover) Save(message Message) error

type KeyHashPartitionRule

type KeyHashPartitionRule struct {
	PartitionNum int
}

func (*KeyHashPartitionRule) Partition

func (m *KeyHashPartitionRule) Partition(topic string, key, value []byte) int
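The page does not document which hash function KeyHashPartitionRule uses. As a rough illustration of what a key-hash rule typically does, the sketch below hashes the key with FNV-1a (an assumption, chosen only because it is in the standard library) and maps it into the range [0, PartitionNum). `keyHashRule` is a hypothetical stand-in type, not the package's own.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// keyHashRule is a hypothetical stand-in for kafkac.KeyHashPartitionRule.
type keyHashRule struct {
	PartitionNum int
}

// Partition hashes key with FNV-1a (an assumption, not necessarily what
// kafkac does) and maps the result into [0, PartitionNum).
func (r *keyHashRule) Partition(topic string, key, value []byte) int {
	if r.PartitionNum <= 0 {
		return 0 // guard against division by zero
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(r.PartitionNum))
}

func main() {
	r := &keyHashRule{PartitionNum: 4}
	a := r.Partition("orders", []byte("user-1"), nil)
	b := r.Partition("orders", []byte("user-1"), nil)
	fmt.Println(a == b) // the same key always maps to the same partition
}
```

The property that matters is determinism: messages with equal keys land on the same partition, which preserves their relative order.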

type Message

type Message struct {
	Key   []byte
	Value []byte
}

type PartitionRule

type PartitionRule interface {
	Partition(topic string, key, value []byte) int
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(topic string, partitionRule PartitionRule, alarm Alarm, recover ProducerRecover, brokerAddrList []string) *Producer

func (*Producer) PublishMessage

func (m *Producer) PublishMessage(msg Message) error

func (*Producer) PublishMessages

func (m *Producer) PublishMessages(msgs []Message, partition int) (int, error)

type ProducerRecover

type ProducerRecover interface {
	Save(message Message) error
	Restore(fn RestoreFunc)
	CloseNotify() chan struct{}
}

type RestoreFunc

type RestoreFunc func(message *Message, err error, canceled *bool)
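FileRecover is the implementation shipped with the package, but the interface allows other stores. The sketch below is a minimal in-memory variant. The page does not describe the callback contract, so the semantics here are assumptions: Restore calls fn once per saved message in order, passes a nil error, and stops when fn sets `*canceled` to true, keeping that message and everything after it. `memoryRecover` and `newMemoryRecover` are hypothetical names, and the types are redeclared locally so the example is standalone. It is not safe for concurrent use.

```go
package main

import "fmt"

// Local copies of the kafkac declarations, so the sketch compiles on its own.
type Message struct {
	Key   []byte
	Value []byte
}

type RestoreFunc func(message *Message, err error, canceled *bool)

type ProducerRecover interface {
	Save(message Message) error
	Restore(fn RestoreFunc)
	CloseNotify() chan struct{}
}

// memoryRecover is a hypothetical in-memory store, not part of kafkac.
type memoryRecover struct {
	saved  []Message
	closed chan struct{}
}

// Compile-time check that memoryRecover satisfies the interface.
var _ ProducerRecover = (*memoryRecover)(nil)

func newMemoryRecover() *memoryRecover {
	return &memoryRecover{closed: make(chan struct{})}
}

// Save appends the message to the in-memory backlog.
func (r *memoryRecover) Save(message Message) error {
	r.saved = append(r.saved, message)
	return nil
}

// Restore hands saved messages to fn in order. If fn sets *canceled, the
// loop stops and the current message stays in the backlog (assumed contract).
func (r *memoryRecover) Restore(fn RestoreFunc) {
	canceled := false
	n := 0
	for ; n < len(r.saved); n++ {
		fn(&r.saved[n], nil, &canceled)
		if canceled {
			break
		}
	}
	r.saved = r.saved[n:]
}

// CloseNotify returns the channel that signals shutdown of the store.
func (r *memoryRecover) CloseNotify() chan struct{} { return r.closed }

func main() {
	r := newMemoryRecover()
	for _, v := range []string{"a", "b", "c"} {
		_ = r.Save(Message{Value: []byte(v)})
	}
	r.Restore(func(m *Message, err error, canceled *bool) {
		if string(m.Value) == "b" {
			*canceled = true // simulate the broker going down again
		}
	})
	fmt.Println("left in backlog:", len(r.saved))
}
```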

type RoundRobinPartitionRule

type RoundRobinPartitionRule struct {
	PartitionNum int

	sync.Mutex
	// contains filtered or unexported fields
}

func (*RoundRobinPartitionRule) Partition

func (m *RoundRobinPartitionRule) Partition(topic string, key, value []byte) int
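The embedded sync.Mutex and the unexported fields suggest a counter guarded by a lock. The sketch below shows how such a rule could work under that assumption: it cycles through partitions 0 to PartitionNum-1 and ignores the key and value. `roundRobinRule` and its `next` field are hypothetical names, not the package's actual internals.

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobinRule is a hypothetical stand-in for kafkac.RoundRobinPartitionRule.
type roundRobinRule struct {
	PartitionNum int

	sync.Mutex
	next int // next partition to hand out (assumed internal state)
}

// Partition returns partitions in rotating order, ignoring key and value.
// The mutex keeps the counter consistent across goroutines.
func (r *roundRobinRule) Partition(topic string, key, value []byte) int {
	if r.PartitionNum <= 0 {
		return 0 // guard against division by zero
	}
	r.Lock()
	defer r.Unlock()
	p := r.next
	r.next = (r.next + 1) % r.PartitionNum
	return p
}

func main() {
	r := &roundRobinRule{PartitionNum: 3}
	for i := 0; i < 4; i++ {
		fmt.Print(r.Partition("orders", nil, nil), " ")
	}
	fmt.Println()
}
```

Compared with a key-hash rule, round-robin spreads load evenly but gives up per-key ordering, since messages with the same key can land on different partitions.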
