gobeeq

v0.0.0-...-97a652b Latest
Published: Aug 25, 2022 License: MIT Imports: 11 Imported by: 0

README

Gobeeq

A Go implementation of Bee-Queue: a simple, fast, robust job/task queue backed by Redis.

Prerequisites

  • Go 1.13 or later.

Todo

  • Benchmark test

Notice

  • For compatibility with the original Bee-Queue, all integer time and duration values are in milliseconds.
  • For more robust and efficient script execution, there is no separate step that ensures the scripts are loaded. Instead, the package uses Run() of github.com/go-redis/redis, which optimistically runs the script with EVALSHA and, if the script does not exist, retries with EVAL.
  • Since events are not associated with specific Job instances, this implementation does not provide a Job store like the original Bee-Queue.
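The EVALSHA-then-EVAL behaviour described in the notice can be sketched without a Redis server. The `scriptRunner` interface, `fakeServer`, `hash`, and `run` below are hypothetical names written for this illustration; they are not part of gobeeq or go-redis, and the fake only imitates the `NOSCRIPT` reply a server gives for an unknown script hash.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// scriptRunner is a hypothetical stand-in for the two client calls used here.
type scriptRunner interface {
	EvalSha(sha string, keys []string, args ...interface{}) (interface{}, error)
	Eval(src string, keys []string, args ...interface{}) (interface{}, error)
}

// fakeServer imitates a server-side script cache, for illustration only.
type fakeServer struct {
	cache map[string]bool
	evals int // number of full EVAL calls received
}

func (s *fakeServer) EvalSha(sha string, keys []string, args ...interface{}) (interface{}, error) {
	if !s.cache[sha] {
		return nil, errors.New("NOSCRIPT No matching script. Please use EVAL.")
	}
	return "ok", nil
}

func (s *fakeServer) Eval(src string, keys []string, args ...interface{}) (interface{}, error) {
	s.evals++
	s.cache[hash(src)] = true // running EVAL also caches the script
	return "ok", nil
}

// hash returns the hex SHA-1 digest that identifies a script.
func hash(src string) string {
	sum := sha1.Sum([]byte(src))
	return hex.EncodeToString(sum[:])
}

// run tries EVALSHA first and falls back to EVAL only on a NOSCRIPT error.
func run(r scriptRunner, src string, keys []string, args ...interface{}) (interface{}, error) {
	v, err := r.EvalSha(hash(src), keys, args...)
	if err != nil && strings.HasPrefix(err.Error(), "NOSCRIPT ") {
		return r.Eval(src, keys, args...)
	}
	return v, err
}

func main() {
	srv := &fakeServer{cache: map[string]bool{}}
	for i := 0; i < 3; i++ {
		if _, err := run(srv, "return 1", nil); err != nil {
			panic(err)
		}
	}
	fmt.Println("EVAL calls:", srv.evals) // EVAL calls: 1
}
```

Only the first call pays for sending the full script body; later calls send just the hash, which is why no up-front loading step is needed.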

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrRedisClientRequired      = errors.New("gobeeq: Redis client is required")
	ErrInvalidResult            = errors.New("gobeeq: invalid Redis result")
	ErrInvalidJobStatus         = errors.New("gobeeq: invalid job status")
	ErrQueueClosed              = errors.New("gobeeq: queue is already closed")
	ErrHandlerAlreadyRegistered = errors.New("gobeeq: handler already registered")
	ErrHandlerPanicked          = errors.New("gobeeq: invoking handler caused a panic")
)
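Sentinel errors like these are usually matched with `errors.Is`. The sketch below shows one way a handler panic could be turned into such an error; `errHandlerPanicked`, `safeInvoke`, and `isHandlerPanic` are local, hypothetical names, and whether gobeeq returns `ErrHandlerPanicked` directly or wrapped is an assumption not confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
)

// errHandlerPanicked is a local copy of the sentinel's message for this sketch.
var errHandlerPanicked = errors.New("gobeeq: invoking handler caused a panic")

// safeInvoke runs h and converts a panic into an error wrapping the sentinel.
func safeInvoke(h func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%w: %v", errHandlerPanicked, r)
		}
	}()
	return h()
}

// isHandlerPanic reports whether err is, or wraps, the sentinel.
func isHandlerPanic(err error) bool {
	return errors.Is(err, errHandlerPanicked)
}

func main() {
	err := safeInvoke(func() error { panic("boom") })
	fmt.Println(isHandlerPanic(err)) // true
	fmt.Println(err)
}
```

Matching with `errors.Is` rather than `==` keeps the check valid in either case, wrapped or not.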

Functions

This section is empty.

Types

type Backoff

type Backoff struct {
	Strategy BackoffStrategy `json:"strategy"`
	Delay    int64           `json:"delay"`
}

type BackoffStrategy

type BackoffStrategy string
const (
	BackoffImmediate   BackoffStrategy = "immediate"
	BackoffFixed       BackoffStrategy = "fixed"
	BackoffExponential BackoffStrategy = "exponential"
)

type Context

type Context interface {
	context.Context

	GetId() string
	BindData(i interface{}) error
	SetResult(v interface{}) error
	ReportProgress(p interface{}) error
}

type Data

type Data struct {
	Data    json.RawMessage `json:"data"`
	Options *Options        `json:"options"`
	Status  `json:"status"`
}

type DefaultScriptsProvider

type DefaultScriptsProvider struct {
	// contains filtered or unexported fields
}

func (DefaultScriptsProvider) AddDelayedJob

func (p DefaultScriptsProvider) AddDelayedJob() *redis.Script

func (DefaultScriptsProvider) AddJob

func (p DefaultScriptsProvider) AddJob() *redis.Script

func (DefaultScriptsProvider) CheckStalledJobs

func (p DefaultScriptsProvider) CheckStalledJobs() *redis.Script

func (DefaultScriptsProvider) RaiseDelayedJobs

func (p DefaultScriptsProvider) RaiseDelayedJobs() *redis.Script

func (DefaultScriptsProvider) RemoveJob

func (p DefaultScriptsProvider) RemoveJob() *redis.Script

type EagerTimer

type EagerTimer struct {
	// contains filtered or unexported fields
}

A timer that will eagerly replace an existing timer with a sooner one. Refuses to set a later timer, or a timer beyond the given maximum delay.

func NewEagerTimer

func NewEagerTimer(maxDelay time.Duration, fn func(ctx context.Context),
) (*EagerTimer, error)

NewEagerTimer creates an eager timer with a maximum delay and a callback function for scheduling.

func (*EagerTimer) Schedule

func (et *EagerTimer) Schedule(t time.Time)

Schedule sets the next scheduling time. It cannot be later than maxDelay from now.
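The replacement rule of an eager timer can be expressed as a small pure function. This is a sketch under stated assumptions, not the package's implementation: `nextDeadline` and `demo` are hypothetical names, and the sketch refuses requests beyond the maximum delay as the type description says, although the real timer may handle that case differently.

```go
package main

import (
	"fmt"
	"time"
)

// nextDeadline decides which deadline a hypothetical eager timer keeps.
// A zero current value means no timer is pending.
func nextDeadline(current, requested, now time.Time, maxDelay time.Duration) (time.Time, bool) {
	if requested.After(now.Add(maxDelay)) {
		return current, false // beyond the maximum delay: refuse
	}
	if !current.IsZero() && !requested.Before(current) {
		return current, false // an equal or sooner deadline is already pending
	}
	return requested, true // sooner than the pending deadline: replace it
}

// demo replays four requests against a 10s maximum delay. It returns which
// requests were accepted and the final pending offset in milliseconds.
func demo() ([]bool, int64) {
	now := time.Date(2022, 8, 25, 0, 0, 0, 0, time.UTC)
	maxDelay := 10 * time.Second
	offsets := []time.Duration{5 * time.Second, 8 * time.Second, 2 * time.Second, 30 * time.Second}

	var pending time.Time
	accepted := make([]bool, 0, len(offsets))
	for _, off := range offsets {
		var ok bool
		pending, ok = nextDeadline(pending, now.Add(off), now, maxDelay)
		accepted = append(accepted, ok)
	}
	return accepted, pending.Sub(now).Milliseconds()
}

func main() {
	accepted, ms := demo()
	fmt.Println(accepted, ms) // [true false true false] 2000
}
```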

func (*EagerTimer) Stop

func (et *EagerTimer) Stop()

Stop stops an eager timer. It panics if called on a timer that is already stopped.

type Event

type Event string
const (
	EventSuccessed Event = "succeeded"
	EventRetrying  Event = "retrying"
	EventFailed    Event = "failed"
	EventProgress  Event = "progress"
)

type Job

type Job struct {
	Id string
	// contains filtered or unexported fields
}

func (*Job) Backoff

func (j *Job) Backoff(b Backoff) *Job

Backoff sets the backoff strategy used when a failed job is retried. Defaults to immediate.
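As a rough illustration of the three strategies, the sketch below computes successive retry delays in milliseconds. It assumes the semantics of the original Bee-Queue (no delay for `immediate`, a constant delay for `fixed`, and a delay that doubles after each retry for `exponential`); this page does not confirm that gobeeq computes delays exactly this way. The types are local copies and `nextDelay` is a hypothetical helper.

```go
package main

import "fmt"

// Local copies of the documented types, so the sketch compiles on its own.
type BackoffStrategy string

const (
	BackoffImmediate   BackoffStrategy = "immediate"
	BackoffFixed       BackoffStrategy = "fixed"
	BackoffExponential BackoffStrategy = "exponential"
)

type Backoff struct {
	Strategy BackoffStrategy `json:"strategy"`
	Delay    int64           `json:"delay"` // milliseconds
}

// nextDelay returns the delay before the next retry and the Backoff value
// to store for the retry after that.
func nextDelay(b Backoff) (int64, Backoff) {
	switch b.Strategy {
	case BackoffFixed:
		return b.Delay, b
	case BackoffExponential:
		d := b.Delay
		b.Delay *= 2 // assumed: the stored delay doubles after every retry
		return d, b
	default: // BackoffImmediate and unknown strategies retry at once
		return 0, b
	}
}

func main() {
	b := Backoff{Strategy: BackoffExponential, Delay: 100}
	for i := 0; i < 4; i++ {
		var d int64
		d, b = nextDelay(b)
		fmt.Println(d) // 100, 200, 400, 800
	}
}
```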

func (*Job) DelayUntil

func (j *Job) DelayUntil(t time.Time) *Job

DelayUntil delays the job until the given time. See the `Queue` settings section for information on controlling the activation of delayed jobs.

Defaults to enqueueing the job for immediate processing.

func (*Job) Remove

func (j *Job) Remove(ctx context.Context) error

Remove removes a job from the queue.

This may have unintended side effects, e.g. if the job is currently being processed by another worker, so only use this method when you know it's safe.

func (*Job) Retries

func (j *Job) Retries(n int) *Job

Retries sets how many times the job should be automatically retried in case of failure.

Stored in `job.options.retries` and decremented each time the job is retried.

Defaults to 0.

func (*Job) Save

func (j *Job) Save(ctx context.Context) (*Job, error)

Save saves the job and returns the job pointer so that calls can be chained.

func (*Job) SetId

func (j *Job) SetId(id string) *Job

SetId explicitly sets the ID of the job. If a job with the given ID already exists, the Job will not be created, and `job.id` will be set to `null`. This method can be used to run a job once for each instance of an external resource by passing that resource's ID. For instance, you might run the setup job for a user only once by setting the job ID to the ID of the user. Furthermore, when this feature is used with the queue settings `removeOnSuccess: true` and `removeOnFailure: true`, the job can be run again once it has finished, effectively ensuring that each job ID has a global concurrency of 1.

Avoid passing a numeric job ID, as it may conflict with an auto-generated ID.

func (*Job) Timeout

func (j *Job) Timeout(t int64) *Job

Timeout sets the timeout, in milliseconds, for a job. If the job's handler function takes longer than the timeout to return, the worker assumes the job has failed and reports it as such (causing the job to retry if applicable).

Defaults to no timeout.
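Because integer time values in this package are milliseconds, callers working with `time.Duration` and `time.Time` need a conversion step. The helpers below are a minimal sketch using only the standard library; `durationToMs`, `timeToMs`, and `msToTime` are hypothetical names, not part of gobeeq.

```go
package main

import (
	"fmt"
	"time"
)

// durationToMs converts a duration to whole milliseconds.
func durationToMs(d time.Duration) int64 {
	return d.Milliseconds()
}

// timeToMs converts a time to milliseconds since the Unix epoch.
func timeToMs(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond)
}

// msToTime converts milliseconds since the Unix epoch back to a time.
func msToTime(ms int64) time.Time {
	return time.Unix(0, ms*int64(time.Millisecond))
}

func main() {
	fmt.Println(durationToMs(1500 * time.Millisecond)) // 1500

	t := time.Date(2022, 8, 25, 0, 0, 0, 0, time.UTC)
	ms := timeToMs(t)
	fmt.Println(msToTime(ms).Equal(t)) // true
}
```

Given the signature `Timeout(t int64)`, a 30-second timeout would then be passed as `durationToMs(30 * time.Second)`.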

type Message

type Message struct {
	Id    string          `json:"id"`
	Event Event           `json:"event"`
	Data  json.RawMessage `json:"data"`
}

type Options

type Options struct {
	Timestamp   int64    `json:"timestamp"` // ms
	Stacktraces []string `json:"stacktraces"`
	Timeout     int64    `json:"timeout"` // ms
	Delay       int64    `json:"delay"`   // ms
	Retries     int      `json:"retries"`
	Backoff     Backoff  `json:"backoff"`
}

type ProcessFunc

type ProcessFunc func(Context) error

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(
	ctx context.Context, name string, r redis.UniversalClient, opts ...QueueOption,
) (*Queue, error)

NewQueue creates a queue instance.

func (*Queue) CheckHealth

func (q *Queue) CheckHealth(ctx context.Context) (*QueueStatus, error)

CheckHealth checks the "health" of the queue. It returns the number of jobs in each state (`waiting`, `active`, `succeeded`, `failed`, `delayed`) and the newest job ID (if using the default ID behavior). You can periodically query `NewestJobId` to estimate the job creation throughput, and can infer the job processing throughput by incorporating the `waiting` and `active` counts.
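The throughput estimate described above is simple arithmetic over two health samples. In this sketch `healthSample`, `creationRate`, and `processingRate` are hypothetical names; the calculation assumes default sequential job IDs and treats every created job that is no longer waiting or active as processed, which ignores delayed jobs.

```go
package main

import "fmt"

// healthSample is a hypothetical snapshot built from one CheckHealth call.
type healthSample struct {
	AtMs        int64 // wall-clock time of the sample, in milliseconds
	NewestJobID int64
	Waiting     int64
	Active      int64
}

// creationRate estimates jobs created per second between two samples.
func creationRate(a, b healthSample) float64 {
	dt := float64(b.AtMs-a.AtMs) / 1000
	if dt <= 0 {
		return 0
	}
	return float64(b.NewestJobID-a.NewestJobID) / dt
}

// processingRate estimates jobs finished per second: jobs created minus
// the growth of the waiting and active backlog.
func processingRate(a, b healthSample) float64 {
	dt := float64(b.AtMs-a.AtMs) / 1000
	if dt <= 0 {
		return 0
	}
	created := b.NewestJobID - a.NewestJobID
	backlogGrowth := (b.Waiting + b.Active) - (a.Waiting + a.Active)
	return float64(created-backlogGrowth) / dt
}

func main() {
	a := healthSample{AtMs: 0, NewestJobID: 100, Waiting: 10, Active: 2}
	b := healthSample{AtMs: 10000, NewestJobID: 600, Waiting: 40, Active: 4}
	fmt.Println(creationRate(a, b))   // 50
	fmt.Println(processingRate(a, b)) // 46.8
}
```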

func (*Queue) CheckStalledJobs

func (q *Queue) CheckStalledJobs(interval time.Duration)

CheckStalledJobs checks for stalled jobs. The `interval` argument is the interval on which to check for stalled jobs; it should be set to half the stallInterval setting, to avoid unnecessary work.

func (*Queue) Close

func (q *Queue) Close() error

Close closes the queue, waiting up to 30s for all processing jobs to finish.

func (*Queue) CloseTimeout

func (q *Queue) CloseTimeout(t time.Duration) error

CloseTimeout closes the queue, waiting up to t for all processing jobs to finish.

func (*Queue) CreateJob

func (q *Queue) CreateJob(data interface{}) *Job

CreateJob creates a job instance with the associated user data.

func (*Queue) Destroy

func (q *Queue) Destroy(ctx context.Context) error

Destroy removes all Redis keys belonging to this queue.

func (*Queue) GetJob

func (q *Queue) GetJob(ctx context.Context, id string) (*Job, error)

GetJob gets a job by its ID.

func (*Queue) GetJobs

func (q *Queue) GetJobs(ctx context.Context,
	s Status, start, end int64, size int) ([]Job, error)

GetJobs gets jobs from the queue. When getting jobs of status `waiting`, `active`, or `delayed`, set `start` and `end` to specify a range of job indices to return. Jobs of status `failed` and `succeeded` are returned as an arbitrary subset of the queue of size `size`, because failed and succeeded jobs are represented by a Redis SET, which does not maintain a job ordering.

Note that large values of `start`, `end`, or `size` may cause excess load on the Redis server.

func (*Queue) IsRunning

func (q *Queue) IsRunning() bool

IsRunning returns `true` unless the Queue is shutting down.

func (*Queue) Process

func (q *Queue) Process(h func(Context) error) error

Process begins processing jobs with the provided handler function.
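A handler has the shape `func(Context) error`. To show what one might look like without a Redis server, the sketch below copies the `Context` interface from this page and drives a handler with an in-memory fake. `fakeContext`, `addPayload`, `handleAdd`, and `runDemo` are hypothetical names, and the fake's behaviour (JSON decoding in `BindData`, storing the result in a field) is an assumption about what a real context does.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Context is copied from the type listing so the sketch compiles on its own.
type Context interface {
	context.Context

	GetId() string
	BindData(i interface{}) error
	SetResult(v interface{}) error
	ReportProgress(p interface{}) error
}

// fakeContext is a hypothetical in-memory implementation, for illustration.
type fakeContext struct {
	context.Context
	id       string
	data     json.RawMessage
	result   interface{}
	progress []interface{}
}

func (c *fakeContext) GetId() string                 { return c.id }
func (c *fakeContext) BindData(i interface{}) error  { return json.Unmarshal(c.data, i) }
func (c *fakeContext) SetResult(v interface{}) error { c.result = v; return nil }
func (c *fakeContext) ReportProgress(p interface{}) error {
	c.progress = append(c.progress, p)
	return nil
}

// addPayload is the user data this example handler expects.
type addPayload struct {
	X int `json:"x"`
	Y int `json:"y"`
}

// handleAdd decodes the payload, reports progress, and stores the sum.
func handleAdd(ctx Context) error {
	var p addPayload
	if err := ctx.BindData(&p); err != nil {
		return fmt.Errorf("job %s: bad payload: %w", ctx.GetId(), err)
	}
	if err := ctx.ReportProgress(50); err != nil {
		return err
	}
	return ctx.SetResult(p.X + p.Y)
}

// runDemo feeds one payload through the handler and returns its result.
func runDemo(payload string) (interface{}, error) {
	ctx := &fakeContext{Context: context.Background(), id: "1", data: json.RawMessage(payload)}
	err := handleAdd(ctx)
	return ctx.result, err
}

func main() {
	res, err := runDemo(`{"x": 2, "y": 3}`)
	fmt.Println(res, err) // 5 <nil>
}
```

With the real package, the handler would take the package's own `Context` type instead of this local copy before being passed to `Process`.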

func (*Queue) ProcessConcurrently

func (q *Queue) ProcessConcurrently(
	concurrency int64,
	h func(Context) error,
) error

ProcessConcurrently begins processing jobs with the provided concurrency and handler function.

func (*Queue) RemoveJob

func (q *Queue) RemoveJob(ctx context.Context, id string) error

RemoveJob removes a job from the queue by jobId.

This may have unintended side effects, e.g. if the job is currently being processed by another worker, so only use this method when you know it's safe.

func (*Queue) SaveAll

func (q *Queue) SaveAll(ctx context.Context, jobs []Job) error

SaveAll saves all the provided jobs without waiting for each job to be created. It pipelines the requests, which avoids the 2N*RTT of waiting that N jobs incur when the client waits to receive each command result before sending the next command.
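To make the 2N*RTT figure concrete, the sketch below compares the waiting time of sequential saves with a pipelined batch. Both function names are hypothetical, and the pipelined estimate assumes the whole batch is flushed in a given number of round trips, which is a simplification that ignores server processing time.

```go
package main

import "fmt"

// sequentialWaitMs applies the 2N*RTT figure: each of the two commands per
// job waits for its reply before the next command is sent.
func sequentialWaitMs(n int, rttMs float64) float64 {
	return 2 * float64(n) * rttMs
}

// pipelinedWaitMs assumes the batch is sent in the given number of flushes,
// each costing one round trip.
func pipelinedWaitMs(flushes int, rttMs float64) float64 {
	return float64(flushes) * rttMs
}

func main() {
	const n, rtt = 1000, 0.5 // 1000 jobs, 0.5 ms round trip
	fmt.Println(sequentialWaitMs(n, rtt)) // 1000
	fmt.Println(pipelinedWaitMs(1, rtt))  // 0.5
}
```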

type QueueOption

type QueueOption func(*Queue)
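`QueueOption` follows the functional options pattern: each `With...` function returns a closure that mutates the queue during construction. The sketch below reproduces the pattern with local, hypothetical types (`queue`, `settings`, `newQueue`, and lower-case option names). The `bq` default prefix comes from this page; the 5-second stall interval is a placeholder, not a documented default.

```go
package main

import (
	"fmt"
	"time"
)

// settings and queue are simplified stand-ins for the package's types.
type settings struct {
	prefix          string
	stallInterval   time.Duration
	removeOnSuccess bool
}

type queue struct {
	name     string
	settings settings
}

// queueOption mutates a queue while it is being constructed.
type queueOption func(*queue)

func withPrefix(s string) queueOption {
	return func(q *queue) { q.settings.prefix = s }
}

func withStallInterval(d time.Duration) queueOption {
	return func(q *queue) { q.settings.stallInterval = d }
}

func withRemoveOnSuccess(b bool) queueOption {
	return func(q *queue) { q.settings.removeOnSuccess = b }
}

// newQueue applies defaults first, then each option in order.
func newQueue(name string, opts ...queueOption) *queue {
	q := &queue{
		name:     name,
		settings: settings{prefix: "bq", stallInterval: 5 * time.Second},
	}
	for _, opt := range opts {
		opt(q)
	}
	return q
}

func main() {
	q := newQueue("emails", withPrefix("myapp"), withRemoveOnSuccess(true))
	fmt.Println(q.settings.prefix, q.settings.removeOnSuccess) // myapp true
}
```

Going by the signatures on this page, the equivalent call with the real package would look like `gobeeq.NewQueue(ctx, "emails", client, gobeeq.WithPrefix("myapp"), gobeeq.WithRemoveOnSuccess(true))`, where `client` is a `redis.UniversalClient`.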

func WithActivateDelayedJobs

func WithActivateDelayedJobs(b bool, onRaised func(numRaised int64)) QueueOption

WithActivateDelayedJobs activates delayed jobs once they've passed their `delayUntil` timestamp. Note that this must be enabled on at least one `Queue` instance for the delayed retry strategies (`fixed` and `exponential`) to work, since it is what reactivates those jobs after their computed delay.

func WithDelayedDebounce

func WithDelayedDebounce(d time.Duration) QueueOption

WithDelayedDebounce sets the debounce duration for delayed jobs. To avoid unnecessary churn for several jobs in short succession, the Queue may delay individual jobs by up to this duration.

func WithLogger

func WithLogger(l log.Logger) QueueOption

WithLogger sets a custom logger.

func WithNearTermWindow

func WithNearTermWindow(d time.Duration) QueueOption

WithNearTermWindow sets the window during which delayed jobs will be specifically scheduled. If all delayed jobs are further out than this window, the Queue will double-check that it hasn't missed any jobs after the window elapses.

func WithOnJobFailed

func WithOnJobFailed(fn func(jobId string, err error)) QueueOption

WithOnJobFailed sets a receiver for job failed event messages from Redis.

func WithOnJobProgress

func WithOnJobProgress(fn func(jobId string, progress json.RawMessage)) QueueOption

WithOnJobProgress sets a receiver for job progress event messages from Redis.

func WithOnJobRetrying

func WithOnJobRetrying(fn func(jobId string, err error)) QueueOption

WithOnJobRetrying sets a receiver for job retrying event messages from Redis.

func WithOnJobSucceeded

func WithOnJobSucceeded(fn func(jobId string, result json.RawMessage)) QueueOption

WithOnJobSucceeded sets a receiver for job succeeded event messages from Redis.

func WithPrefix

func WithPrefix(s string) QueueOption

WithPrefix sets the prefix of the Redis keys; the default is `bq`. Useful if the `bq:` namespace is, for whatever reason, unavailable or problematic on your Redis instance.

func WithRedisScanCount

func WithRedisScanCount(i int) QueueOption

WithRedisScanCount sets RedisScanCount, the `COUNT` value passed to the `SSCAN` Redis command used in `Queue#GetJobs` for succeeded and failed job types.

func WithRemoveOnFailure

func WithRemoveOnFailure(b bool) QueueOption

WithRemoveOnFailure makes this worker automatically remove its failed jobs from Redis, so as to keep memory usage down. This will not remove jobs that are set to retry unless they fail all their retries.

func WithRemoveOnSuccess

func WithRemoveOnSuccess(b bool) QueueOption

WithRemoveOnSuccess makes this worker automatically remove its successfully completed jobs from Redis, so as to keep memory usage down.

func WithScriptsProvider

func WithScriptsProvider(i ScriptsProvider) QueueOption

WithScriptsProvider sets the provider for Lua scripts.

func WithSendEvents

func WithSendEvents(b bool) QueueOption

WithSendEvents sets whether job events are sent to queues.

func WithStallInterval

func WithStallInterval(d time.Duration) QueueOption

WithStallInterval sets the window in which workers must report that they aren't stalling. Higher values will reduce Redis/network overhead, but if a worker stalls, it will take longer before its stalled job(s) are retried. A higher value will also result in a lower probability of false positives during stall detection.

type QueueStatus

type QueueStatus struct {
	Keys        map[key]int64
	NewestJobId int64
}

type ScriptsProvider

type ScriptsProvider interface {
	CheckStalledJobs() *redis.Script
	AddJob() *redis.Script
	RemoveJob() *redis.Script
	AddDelayedJob() *redis.Script
	RaiseDelayedJobs() *redis.Script
}

type Settings

type Settings struct {
	Prefix              string
	StallInterval       time.Duration
	NearTermWindow      time.Duration
	DelayedDebounce     time.Duration
	SendEvents          bool
	ActivateDelayedJobs bool
	RemoveOnSuccess     bool
	RemoveOnFailure     bool
	RedisScanCount      int
}

type Status

type Status string
const (
	StatusCreated   Status = "created"
	StatusSucceeded Status = "succeeded"
	StatusRetrying  Status = "retrying"
	StatusFailed    Status = "failed"
	StatusWaiting   Status = "waiting"
	StatusActive    Status = "active"
	StatusDelayed   Status = "delayed"
)
