kafka

package
v0.0.0-...-f79bcc6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2022 License: Apache-2.0 Imports: 4 Imported by: 0

README

更简单易用的kafka客户端

参考 B站代码

Documentation

Index

Constants

View Source
// Required-acks levels, re-exported from the imported kafka package so that
// callers can pass them to the RequiredAcks option.
const (
	// RequireNone means the producer does not wait for any response from the broker.
	RequireNone kafka.RequiredAcks = kafka.RequireNone
	// RequireOne means the producer considers the write successful once the leader receives the record.
	RequireOne kafka.RequiredAcks = kafka.RequireOne
	// RequireAll means the producer considers the write successful once all of the in-sync replicas receive the record.
	RequireAll kafka.RequiredAcks = kafka.RequireAll
)

Variables

View Source
var ErrEventFull = errors.New("message event chan full")

ErrEventFull indicates that the message event channel is full.

Functions

This section is empty.

Types

type Event

// Event is the message value accepted by Publisher.Publish and passed to a Handler.
type Event struct {
	// Key is the message key, used by the routing policy.
	Key string
	// Payload is the message body.
	Payload []byte
	// Properties holds application-defined properties attached to the message.
	Properties map[string]string
	// Topic is the topic of the message.
	Topic string
	// Offset is the offset of the message.
	Offset int64
	// Partition is the partition of the message.
	Partition int
}

type Handler

// Handler is a function that handles an Event under the given context and returns an error.
type Handler func(context.Context, Event) error

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(brokers []string, opts ...PublisherOption) *Publisher

NewPublisher creates a new Kafka publisher.

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, event Event) error

type PublisherOption

type PublisherOption func(*Publisher)

PublisherOption is a publisher option.

func BatchSize

func BatchSize(size int) PublisherOption

func EventBuffer

func EventBuffer(n int) PublisherOption

EventBuffer returns a PublisherOption that configures the event buffer.

func ReadTimeout

func ReadTimeout(d time.Duration) PublisherOption

ReadTimeout returns a PublisherOption that sets the read timeout.

func RequiredAcks

func RequiredAcks(acks kafka.RequiredAcks) PublisherOption

RequiredAcks returns a PublisherOption that sets the required acks level.

func WriteTimeout

func WriteTimeout(d time.Duration) PublisherOption

WriteTimeout returns a PublisherOption that sets the write timeout.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(topic string, brokers []string, opts ...SubscriberOption) *Subscriber

NewSubscriber creates a new Kafka subscriber.

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) GetEntity

func (s *Subscriber) GetEntity() *kafka.Reader

func (*Subscriber) ReadMessage

func (s *Subscriber) ReadMessage(ctx context.Context) (kafka.Message, error)

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, h Handler) error

type SubscriberOption

type SubscriberOption func(*Subscriber)

SubscriberOption is a subscriber option.

func ConsumerGroup

func ConsumerGroup(id string) SubscriberOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL