consumer

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	NoMessage = repository.NoMessage
	Paused    = errors.New("paused")
)

Functions

This section is empty.

Types

type ConsumeController

type ConsumeController struct {
	// contains filtered or unexported fields
}

func NewConsumeController

func NewConsumeController(ctx context.Context, ch <-chan registry.Event, repo repository.ConsumerRepository,
	interceptor *interceptor.Interceptor, rg registry.Registry,
) *ConsumeController

func (*ConsumeController) AddWeight

func (c *ConsumeController) AddWeight(ctx context.Context, biz string)

func (*ConsumeController) Assign

func (c *ConsumeController) Assign(ctx context.Context, bizName string)

Assign 需要开启goroutine

func (*ConsumeController) CalGoroutineNum

func (c *ConsumeController) CalGoroutineNum(qps int) int

func (*ConsumeController) Pull

func (c *ConsumeController) Pull(ctx context.Context, biz string, queueReady chan struct{})

func (*ConsumeController) Schedule

func (c *ConsumeController) Schedule(ctx context.Context, wg *sync.WaitGroup)

Schedule 需要开启goroutine

func (*ConsumeController) SubWeight

func (c *ConsumeController) SubWeight(ctx context.Context, biz string)

SubWeight 开始消费时,减掉权重,消费完成,权重加回去

func (*ConsumeController) WatchLeftTask

func (c *ConsumeController) WatchLeftTask(ctx context.Context, wg *sync.WaitGroup, key string, interval time.Duration)

func (*ConsumeController) WriteBackLeftMessage

func (c *ConsumeController) WriteBackLeftMessage(ctx context.Context, biz string, msgs []domain.Message) error

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(repo repository.ConsumerRepository, interceptor *interceptor.Interceptor) *Consumer

func (*Consumer) Consume

func (c *Consumer) Consume(ctx, dequeueCtx context.Context, biz string, finished, queueReady chan struct{}) error

func (*Consumer) ConsumeOld

func (c *Consumer) ConsumeOld(ctx context.Context, bizName string) error

func (*Consumer) Push

func (c *Consumer) Push(msg domain.Message)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL