Documentation
¶
Index ¶
- Variables
- type ConsumeController
- func (c *ConsumeController) AddWeight(ctx context.Context, biz string)
- func (c *ConsumeController) Assign(ctx context.Context, bizName string)
- func (c *ConsumeController) CalGoroutineNum(qps int) int
- func (c *ConsumeController) Pull(ctx context.Context, biz string, queueReady chan struct{})
- func (c *ConsumeController) Schedule(ctx context.Context, wg *sync.WaitGroup)
- func (c *ConsumeController) SubWeight(ctx context.Context, biz string)
- func (c *ConsumeController) WatchLeftTask(ctx context.Context, wg *sync.WaitGroup, key string, interval time.Duration)
- func (c *ConsumeController) WriteBackLeftMessage(ctx context.Context, biz string, msgs []domain.Message) error
- type Consumer
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	NoMessage = repository.NoMessage
	Paused    = errors.New("paused")
)
Functions ¶
This section is empty.
Types ¶
type ConsumeController ¶
type ConsumeController struct {
// contains filtered or unexported fields
}
func NewConsumeController ¶
func NewConsumeController(ctx context.Context, ch <-chan registry.Event, repo repository.ConsumerRepository, interceptor *interceptor.Interceptor, rg registry.Registry) *ConsumeController
func (*ConsumeController) AddWeight ¶
func (c *ConsumeController) AddWeight(ctx context.Context, biz string)
func (*ConsumeController) Assign ¶
func (c *ConsumeController) Assign(ctx context.Context, bizName string)
Assign needs to be started in a goroutine.
func (*ConsumeController) CalGoroutineNum ¶
func (c *ConsumeController) CalGoroutineNum(qps int) int
func (*ConsumeController) Pull ¶
func (c *ConsumeController) Pull(ctx context.Context, biz string, queueReady chan struct{})
func (*ConsumeController) Schedule ¶
func (c *ConsumeController) Schedule(ctx context.Context, wg *sync.WaitGroup)
Schedule needs to be started in a goroutine.
func (*ConsumeController) SubWeight ¶
func (c *ConsumeController) SubWeight(ctx context.Context, biz string)
SubWeight subtracts the weight when consumption starts; once consumption completes, the weight is added back.
func (*ConsumeController) WatchLeftTask ¶
func (*ConsumeController) WriteBackLeftMessage ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(repo repository.ConsumerRepository, interceptor *interceptor.Interceptor) *Consumer
func (*Consumer) ConsumeOld ¶
Click to show internal directories.
Click to hide internal directories.