rdelay

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2023 License: MIT Imports: 7 Imported by: 0

README

A Go package that implements delayed/timed tasks on top of Redis, gradually working toward production readiness and high performance.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BucketConsumer

type BucketConsumer struct {
	// contains filtered or unexported fields
}

func NewBucketConsumer

func NewBucketConsumer(rds *redis.Client, bucketCount int, interval int, bucketParallel int, keyFunc QueueBucketKeyFunc) *BucketConsumer

func (*BucketConsumer) StartConsume

func (b *BucketConsumer) StartConsume(ctx context.Context, fn ConsumeFunc)

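StartConsume expects the ctx passed in by the caller to be a cancelable context (a cancelCtx, e.g. one created with context.WithCancel).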

type BucketProducer

type BucketProducer struct {
	// contains filtered or unexported fields
}

func NewBucketProducer

func NewBucketProducer(rds *redis.Client, bucketCount int, bucketFunc QueueBucketKeyFunc, memberFunc QueueMemberBucketFunc) *BucketProducer

func (*BucketProducer) Send

func (b *BucketProducer) Send(ctx context.Context, member QueueMember) error

func (*BucketProducer) SendDelay

func (b *BucketProducer) SendDelay(ctx context.Context, member string, duration time.Duration) error

type ConsumeFunc

type ConsumeFunc func(member QueueMember)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(q *Queue, interval int, parallel int) *Consumer

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, fn ConsumeFunc)

Consume expects the caller to pass in a cancelable ctx, such as one created with context.WithCancel.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(rds *redis.Client, key string) *Queue

func (*Queue) Del

func (q *Queue) Del(ctx context.Context, member string) error

func (*Queue) Poll

func (q *Queue) Poll(ctx context.Context, nowUnix int64, pollSize int) (members []QueueMember, err error)

func (*Queue) Push

func (q *Queue) Push(ctx context.Context, member QueueMember) error

type QueueBucketKeyFunc

type QueueBucketKeyFunc func(bucketIdx int) string

type QueueMember

type QueueMember struct {
	Member    string
	DelayTime int64 // time at which the member takes effect
}

type QueueMemberBucketFunc

type QueueMemberBucketFunc func(member QueueMember) int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL