kq

package
Version: v1.2.1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2024 | License: MIT | Imports: 20 | Imported by: 22

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustNewQueue

func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue

func NewQueue

func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)

Types

type ConsumeErrorHandler (added in v1.2.0)

type ConsumeErrorHandler func(msg kafka.Message, err error)

type ConsumeHandle

type ConsumeHandle func(key, value string) error

type ConsumeHandler

type ConsumeHandler interface {
	Consume(key, value string) error
}

func WithHandle

func WithHandle(handle ConsumeHandle) ConsumeHandler

type KqConf

type KqConf struct {
	service.ServiceConf
	Brokers     []string
	Group       string
	Topic       string
	CaFile      string `json:",optional"`
	Offset      string `json:",options=first|last,default=last"`
	Conns       int    `json:",default=1"`
	Consumers   int    `json:",default=8"`
	Processors  int    `json:",default=8"`
	MinBytes    int    `json:",default=10240"`    // 10K
	MaxBytes    int    `json:",default=10485760"` // 10M
	Username    string `json:",optional"`
	Password    string `json:",optional"`
	ForceCommit bool   `json:",default=true"`
}

type PushOption

type PushOption func(options *pushOptions)

func WithAllowAutoTopicCreation (added in v1.2.0)

func WithAllowAutoTopicCreation() PushOption

WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

WithChunkSize customizes the Pusher with the given chunk size.

func WithFlushInterval

func WithFlushInterval(interval time.Duration) PushOption

WithFlushInterval customizes the Pusher with the given flush interval.

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher

NewPusher returns a Pusher with the given Kafka addresses and topic.

func (*Pusher) Close

func (p *Pusher) Close() error

Close closes the Pusher and releases any resources used by it.

func (*Pusher) Name

func (p *Pusher) Name() string

Name returns the name of the Kafka topic that the Pusher is sending messages to.

func (*Pusher) Push

func (p *Pusher) Push(v string) error

Push sends a message to the Kafka topic.

type QueueOption

type QueueOption func(*queueOptions)

func WithCommitInterval

func WithCommitInterval(interval time.Duration) QueueOption

func WithErrorHandler (added in v1.2.0)

func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption

func WithMaxWait

func WithMaxWait(wait time.Duration) QueueOption

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) QueueOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL