queue

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2024 License: MIT Imports: 13 Imported by: 0

README

Queue for Tinh Tinh

GitHub Release GitHub License

Description

Package Queue for Tinh Tinh

Install

go get -u github.com/tinh-tinh/queue

Features

  • Robust design based on Redis
  • Delayed jobs
  • Schedule and repeat jobs according to a cron specification
  • Rate limiter for jobs
  • Retries
  • Priority
  • Concurrency
  • Pause/resume
  • Automatic recovery from process crashes

Documentation

Index

Constants

View Source
const QUEUE core.Provide = "QUEUE"

Variables

This section is empty.

Functions

func Min

func Min(a int, b int) int

func Register

func Register(name string, opt *Options) core.Module

Register registers a new queue module with the given name and options. The registered module creates a new queue with the given name and options, and exports the queue under the name "<name>Queue".

Types

type AddJobOptions

type AddJobOptions struct {
	Id       string
	Data     interface{}
	Priority int
}

type Callback

type Callback func() error

type Job

type Job struct {
	Id       string
	Data     interface{}
	Priority int
	Status   JobStatus

	ProcessedOn   time.Time
	FinishedOn    time.Time
	Stacktrace    []string
	FailedReason  string
	RetryFailures int
	// contains filtered or unexported fields
}

func (*Job) IsFinished

func (job *Job) IsFinished() bool

IsFinished returns true if the job has finished, either successfully or with an error.

func (*Job) IsReady

func (job *Job) IsReady() bool

IsReady returns true if the job is ready to be processed. If the job uses a scheduler, it will always be ready. Otherwise, the job is ready if it is waiting or active.

func (*Job) Process

func (job *Job) Process(cb Callback)

Process runs the given callback and updates the job's status accordingly. It also measures and logs the execution time. If the callback returns an error, the job is either retried or marked as failed.

type JobFnc

type JobFnc func(job *Job)

type JobStatus

type JobStatus string
const (
	WaitStatus      JobStatus = "wait"
	DelayedStatus   JobStatus = "delayed"
	ActiveStatus    JobStatus = "active"
	CompletedStatus JobStatus = "completed"
	FailedStatus    JobStatus = "failed"
)

type Options

type Options struct {
	Connect       *redis.Options
	Workers       int
	RetryFailures int
	Limiter       *RateLimiter
	Pattern       string
}

type Queue

type Queue struct {
	Name string

	RetryFailures int
	// contains filtered or unexported fields
}

func Inject

func Inject(module *core.DynamicModule, name string) *Queue

InjectQueue injects a queue from the given module, using the given name. If the module does not contain a queue with the given name, or if the queue is not of type *Queue, InjectQueue returns nil.

func New

func New(name string, opt *Options) *Queue

New creates a new queue with the given name and options. The name is used to identify the queue in Redis, and the options are used to configure the queue behavior. The options are as follows:

- Connect: the Redis connection options - Workers: the number of workers to run concurrently - RetryFailures: the number of times to retry a failed job - Limiter: the rate limiter options - Pattern: the cron pattern to use for scheduling jobs

The returned queue is ready to use.

func (*Queue) AddJob

func (q *Queue) AddJob(opt AddJobOptions)

AddJob adds a new job to the queue. If the queue is currently rate limited, the job is delayed. Otherwise, the job is added to the waiting list and the queue is run.

func (*Queue) BulkAddJob

func (q *Queue) BulkAddJob(options []AddJobOptions)

BulkAddJob adds multiple jobs to the queue at once. If the queue is currently rate limited, the jobs are delayed. Otherwise, the jobs are added to the waiting list and the queue is run.

func (*Queue) CountJobs

func (q *Queue) CountJobs(status JobStatus) int

CountJobs returns the number of jobs in the queue that have the given status.

This can be used to monitor the queue, and to test the queue's behavior.

func (*Queue) IsLimit

func (q *Queue) IsLimit() bool

IsLimit returns true if the number of jobs in the queue has reached the maximum value set in the RateLimiter. It checks the current value of the counter in Redis and returns true if it is greater than or equal to the maximum value. If the counter does not exist or is less than the maximum, it increments the counter and returns false. If the increment fails, it panics.

func (*Queue) Pause

func (q *Queue) Pause()

Pause stops the queue from running. When paused, the queue will not accept new jobs and will not run any jobs in the queue. It will resume when Resume is called.

func (*Queue) Process

func (q *Queue) Process(jobFnc JobFnc)

Process sets the callback for the queue to process jobs. If the queue has a scheduler, it will be started with the given cron pattern. Otherwise, the callback is simply stored.

func (*Queue) Remove

func (q *Queue) Remove(key string)

Remove removes the job with the given key from the queue. It uses a linear search, so it has a time complexity of O(n), where n is the number of jobs in the queue.

func (*Queue) Resume

func (q *Queue) Resume()

Resume resumes the queue from a paused state. When resumed, the queue will accept new jobs and run any jobs in the queue.

func (*Queue) Retry

func (q *Queue) Retry()

func (*Queue) Run

func (q *Queue) Run()

Run runs all ready jobs in the queue. It locks the mutex, runs all ready jobs in parallel, and then unlocks the mutex. If the queue has a scheduler, it will be started with the given cron pattern. Otherwise, the callback is simply stored.

type RateLimiter

type RateLimiter struct {
	Max      int
	Duration time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL