kq

package
v0.0.0-...-41313b4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2025 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustNewQueue

func MustNewQueue(c KqConf, handler ConsumeHandler, queueOpts ...QueueOption) queue.MessageQueue

func NewQueue

func NewQueue(c KqConf, handler ConsumeHandler, queueOpts ...QueueOption) (queue.MessageQueue, error)

Types

type BatchHandle

type BatchHandle func(ctx context.Context, items []kafka.Message) error

type ConsumeErrorHandler

type ConsumeErrorHandler func(ctx context.Context, err error, msgs ...kafka.Message)

type ConsumeHandle

type ConsumeHandle func(ctx context.Context, key, value string) error

type ConsumeHandler

type ConsumeHandler interface {
	Consume(ctx context.Context, key, value string) error
}

func WithHandle

func WithHandle(handle ConsumeHandle) ConsumeHandler

type KqConf

type KqConf struct {
	service.ServiceConf
	Brokers    []string
	Group      string
	Topic      string
	CaFile     string `json:",optional"`
	Offset     string `json:",options=first|last,default=last"`
	Conns      int    `json:",default=1"`
	Consumers  int    `json:",default=8"`
	Processors int    `json:",default=8"`
	MinBytes   int    `json:",default=10240"`    // 10K
	MaxBytes   int    `json:",default=10485760"` // 10M
	Username   string `json:",optional"`
	Password   string `json:",optional"`
	//ForceCommit   bool   `json:",default=true"`
	CommitInOrder bool `json:",default=false"`
}

type PushOption

type PushOption func(options *pushOptions)

func WithAllowAutoTopicCreation

func WithAllowAutoTopicCreation() PushOption

WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.

func WithBalancer

func WithBalancer(balancer kafka.Balancer) PushOption

WithBalancer customizes the Pusher with the given balancer.

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

WithChunkSize customizes the Pusher with the given chunk size.

func WithCompletion

func WithCompletion(completion func(messages []kafka.Message, err error)) PushOption

WithCompletion

@Description:
@param completion
@return PushOption

func WithFlushInterval

func WithFlushInterval(interval time.Duration) PushOption

WithFlushInterval customizes the Pusher with the given flush interval.

func WithMaxAttempts

func WithMaxAttempts(maxAttempts int) PushOption

WithMaxAttempts

@Description:
@param MaxAttempts
@return PushOption

func WithRequiredAcks

func WithRequiredAcks(acks kafka.RequiredAcks) PushOption

WithRequiredAcks

@Description:
@param acks

Number of acknowledges from partition replicas required before receiving a response to a produce request, the following values are supported:

 RequireNone (0)  fire-and-forget, do not wait for acknowledgements from the
 RequireOne  (1)  wait for the leader to acknowledge the writes
 RequireAll  (-1) wait for the full ISR to acknowledge the writes

	@return PushOption

func WithSyncPush

func WithSyncPush() PushOption

WithSyncPush enables the Pusher to push messages synchronously.

func WithWriteBackoffMax

func WithWriteBackoffMax(writeBackoffMax time.Duration) PushOption

WithWriteBackoffMax

 @Description:
 WriteBackoffMax optionally sets the maximum amount of time the writer waits before
	it attempts to write a batchProcessor of messages
	Default: 1s
 @param writeBackoffMax
 @return PushOption

func WithWriteBackoffMin

func WithWriteBackoffMin(writeBackoffMin time.Duration) PushOption

WithWriteBackoffMin

@Description:

WriteBackoffMin optionally sets the smallest amount of time the writer waits before it attempts to write a batchProcessor of messages

Default: 100ms

@param writeBackoffMin
@return PushOption

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher

NewPusher returns a Pusher with the given Kafka addresses and topic.

func (*Pusher) Close

func (p *Pusher) Close() error

Close closes the Pusher and releases any resources used by it.

func (*Pusher) KPush

func (p *Pusher) KPush(ctx context.Context, k, v string) error

KPush sends a message to the Kafka topic.

func (*Pusher) Name

func (p *Pusher) Name() string

Name returns the name of the Kafka topic that the Pusher is sending messages to.

func (*Pusher) Push

func (p *Pusher) Push(ctx context.Context, v string) error

Push sends a message to the Kafka topic.

func (*Pusher) PushWithKey

func (p *Pusher) PushWithKey(ctx context.Context, key, v string) error

PushWithKey sends a message with the given key to the Kafka topic.

type QueueOption

type QueueOption func(*queueOptions)

func WithBatchFlushInterval

func WithBatchFlushInterval(flushInterval string) QueueOption

WithBatchFlushInterval

@Description:  batchProcessor flush of the windows
@param batchFlushInterval default 1s
@return BatchOption

func WithBatchHandle

func WithBatchHandle(batchHandle BatchHandle) QueueOption

WithBatchHandle

@Description: batchProcessor handle
@param batchHandle
@return QueueOption

func WithBatchSize

func WithBatchSize(batchSize int) QueueOption

WithBatchSize

@Description:
@param batchSize default 1000
@return BatchOption

func WithCommitInterval

func WithCommitInterval(interval time.Duration) QueueOption

WithCommitInterval

@Description:
@param interval
@return QueueOption

func WithErrorHandler

func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption

WithErrorHandler

@Description: go-queue handle message  ,if err!=nil  ,will call this func
@param errorHandler
@return QueueOption

func WithMaxWait

func WithMaxWait(wait time.Duration) QueueOption

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) QueueOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL