Documentation ¶
Index ¶
- Variables
- type Backoff
- type BackoffStrategy
- type Context
- type Data
- type DefaultScriptsProvider
- func (p DefaultScriptsProvider) AddDelayedJob() *redis.Script
- func (p DefaultScriptsProvider) AddJob() *redis.Script
- func (p DefaultScriptsProvider) CheckStalledJobs() *redis.Script
- func (p DefaultScriptsProvider) RaiseDelayedJobs() *redis.Script
- func (p DefaultScriptsProvider) RemoveJob() *redis.Script
- type EagerTimer
- type Event
- type Job
- type Message
- type Options
- type ProcessFunc
- type Queue
- func (q *Queue) CheckHealth(ctx context.Context) (*QueueStatus, error)
- func (q *Queue) CheckStalledJobs(interval time.Duration)
- func (q *Queue) Close() error
- func (q *Queue) CloseTimeout(t time.Duration) error
- func (q *Queue) CreateJob(data interface{}) *Job
- func (q *Queue) Destroy(ctx context.Context) error
- func (q *Queue) GetJob(ctx context.Context, id string) (*Job, error)
- func (q *Queue) GetJobs(ctx context.Context, s Status, start, end int64, size int) ([]Job, error)
- func (q *Queue) IsRunning() bool
- func (q *Queue) Process(h func(Context) error) error
- func (q *Queue) ProcessConcurrently(concurrency int64, h func(Context) error) error
- func (q *Queue) RemoveJob(ctx context.Context, id string) error
- func (q *Queue) SaveAll(ctx context.Context, jobs []Job) error
- type QueueOption
- func WithActivateDelayedJobs(b bool, onRaised func(numRaised int64)) QueueOption
- func WithDelayedDebounce(d time.Duration) QueueOption
- func WithLogger(l log.Logger) QueueOption
- func WithNearTermWindow(d time.Duration) QueueOption
- func WithOnJobFailed(fn func(jobId string, err error)) QueueOption
- func WithOnJobProgress(fn func(jobId string, progress json.RawMessage)) QueueOption
- func WithOnJobRetrying(fn func(jobId string, err error)) QueueOption
- func WithOnJobSucceeded(fn func(jobId string, result json.RawMessage)) QueueOption
- func WithPrefix(s string) QueueOption
- func WithRedisScanCount(i int) QueueOption
- func WithRemoveOnFailure(b bool) QueueOption
- func WithRemoveOnSuccess(b bool) QueueOption
- func WithScriptsProvider(i ScriptsProvider) QueueOption
- func WithSendEvents(b bool) QueueOption
- func WithStallInterval(d time.Duration) QueueOption
- type QueueStatus
- type ScriptsProvider
- type Settings
- type Status
Constants ¶
This section is empty.
Variables ¶
var (
	ErrRedisClientRequired      = errors.New("gobeeq: Redis client is required")
	ErrInvalidResult            = errors.New("gobeeq: invalid Redis result")
	ErrInvalidJobStatus         = errors.New("gobeeq: invalid job status")
	ErrQueueClosed              = errors.New("gobeeq: queue is already closed")
	ErrHandlerAlreadyRegistered = errors.New("gobeeq: handler already registered")
	ErrHandlerPanicked          = errors.New("gobeeq: invoking handler caused a panic")
)
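Because these are sentinel values created with `errors.New`, callers would normally match them with `errors.Is` rather than by comparing strings. The sketch below is self-contained and uses a local copy of `ErrQueueClosed`; the `enqueue` and `isClosed` helpers are hypothetical, and which package methods return which error is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrQueueClosed is a local stand-in carrying the same message as the
// package's sentinel, so that this sketch compiles on its own.
var ErrQueueClosed = errors.New("gobeeq: queue is already closed")

// enqueue is a hypothetical caller-side helper that wraps the sentinel
// with extra context using %w.
func enqueue(closed bool) error {
	if closed {
		return fmt.Errorf("enqueue failed: %w", ErrQueueClosed)
	}
	return nil
}

// isClosed reports whether err is, or wraps, ErrQueueClosed.
func isClosed(err error) bool {
	return errors.Is(err, ErrQueueClosed)
}

func main() {
	fmt.Println(isClosed(enqueue(true)))
	fmt.Println(isClosed(enqueue(false)))
}
```

Matching with `errors.Is` keeps working even when intermediate layers add context with `%w`.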
Functions ¶
This section is empty.
Types ¶
type Backoff ¶
type Backoff struct {
Strategy BackoffStrategy `json:"strategy"`
Delay int64 `json:"delay"`
}
type BackoffStrategy ¶
type BackoffStrategy string
const (
	BackoffImmediate   BackoffStrategy = "immediate"
	BackoffFixed       BackoffStrategy = "fixed"
	BackoffExponential BackoffStrategy = "exponential"
)
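As a rough illustration of how the three strategies could translate into retry delays, the following sketch redeclares the types locally and computes a delay per attempt. The doubling rule for `exponential` and the `retryDelay` helper are assumptions made for this example; the package's actual computation is not documented here.

```go
package main

import "fmt"

type BackoffStrategy string

const (
	BackoffImmediate   BackoffStrategy = "immediate"
	BackoffFixed       BackoffStrategy = "fixed"
	BackoffExponential BackoffStrategy = "exponential"
)

type Backoff struct {
	Strategy BackoffStrategy `json:"strategy"`
	Delay    int64           `json:"delay"`
}

// retryDelay is a hypothetical helper. It returns the delay, in the same
// unit as b.Delay, before retry number attempt (0-based).
// Assumption: "exponential" doubles the base delay on every attempt.
// Large attempt values overflow int64; a real implementation would cap them.
func retryDelay(b Backoff, attempt uint) int64 {
	switch b.Strategy {
	case BackoffFixed:
		return b.Delay
	case BackoffExponential:
		return b.Delay << attempt
	default: // BackoffImmediate and unknown strategies retry at once
		return 0
	}
}

func main() {
	b := Backoff{Strategy: BackoffExponential, Delay: 1000}
	for attempt := uint(0); attempt < 3; attempt++ {
		fmt.Println(attempt, retryDelay(b, attempt))
	}
}
```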
type Data ¶
type Data struct {
Data json.RawMessage `json:"data"`
Options *Options `json:"options"`
Status `json:"status"`
}
type DefaultScriptsProvider ¶
type DefaultScriptsProvider struct {
// contains filtered or unexported fields
}
func (DefaultScriptsProvider) AddDelayedJob ¶
func (p DefaultScriptsProvider) AddDelayedJob() *redis.Script
func (DefaultScriptsProvider) AddJob ¶
func (p DefaultScriptsProvider) AddJob() *redis.Script
func (DefaultScriptsProvider) CheckStalledJobs ¶
func (p DefaultScriptsProvider) CheckStalledJobs() *redis.Script
func (DefaultScriptsProvider) RaiseDelayedJobs ¶
func (p DefaultScriptsProvider) RaiseDelayedJobs() *redis.Script
func (DefaultScriptsProvider) RemoveJob ¶
func (p DefaultScriptsProvider) RemoveJob() *redis.Script
type EagerTimer ¶
type EagerTimer struct {
// contains filtered or unexported fields
}
A timer that will eagerly replace an existing timer with a sooner one. Refuses to set a later timer, or a timer beyond the given maximum delay.
func NewEagerTimer ¶
NewEagerTimer creates an eager timer with a maximum delay and a callback function for scheduling.
func (*EagerTimer) Schedule ¶
func (et *EagerTimer) Schedule(t time.Time)
Schedule sets the next scheduling time. It cannot be later than maxDelay from now.
func (*EagerTimer) Stop ¶
func (et *EagerTimer) Stop()
Stop stops an eager timer. It panics if used on a stopped timer.
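The "eager" rule described above (accept a sooner time, refuse a later one, refuse anything beyond the maximum delay) can be sketched with `time.AfterFunc`. This is a hypothetical stand-in that follows the description literally, not the package's `EagerTimer`; in particular, returning a bool from `schedule` is an addition made here so the decisions are visible.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// eagerTimer is a hypothetical stand-in, not the package's EagerTimer.
type eagerTimer struct {
	mu       sync.Mutex
	maxDelay time.Duration
	fn       func()
	timer    *time.Timer
	next     time.Time
	gen      uint64 // invalidates callbacks of replaced or stopped timers
}

func newEagerTimer(maxDelay time.Duration, fn func()) *eagerTimer {
	return &eagerTimer{maxDelay: maxDelay, fn: fn}
}

// schedule arms the timer for t and reports whether the request was accepted.
func (e *eagerTimer) schedule(t time.Time) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	now := time.Now()
	if t.After(now.Add(e.maxDelay)) {
		return false // beyond the maximum delay
	}
	if e.timer != nil && !t.Before(e.next) {
		return false // not sooner than the pending timer
	}
	if e.timer != nil {
		e.timer.Stop()
	}
	e.gen++
	g := e.gen
	e.next = t
	e.timer = time.AfterFunc(t.Sub(now), func() { e.fire(g) })
	return true
}

// fire runs the callback unless this timer was replaced or stopped.
func (e *eagerTimer) fire(g uint64) {
	e.mu.Lock()
	if g != e.gen {
		e.mu.Unlock()
		return
	}
	e.timer = nil
	e.mu.Unlock()
	e.fn()
}

// stop cancels any pending timer.
func (e *eagerTimer) stop() {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.timer != nil {
		e.timer.Stop()
		e.timer = nil
	}
	e.gen++
}

// demo returns the accept/refuse decision for four schedule requests.
func demo() [4]bool {
	et := newEagerTimer(time.Hour, func() {})
	defer et.stop()
	now := time.Now()
	return [4]bool{
		et.schedule(now.Add(10 * time.Minute)), // accepted: nothing pending
		et.schedule(now.Add(20 * time.Minute)), // refused: later than pending
		et.schedule(now.Add(5 * time.Minute)),  // accepted: sooner than pending
		et.schedule(now.Add(2 * time.Hour)),    // refused: beyond max delay
	}
}

func main() {
	fmt.Println(demo())
}
```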
type Job ¶
type Job struct {
Id string
// contains filtered or unexported fields
}
func (*Job) DelayUntil ¶
DelayUntil delays the job until the given time. See the `Queue` settings section for information on controlling the activation of delayed jobs.
Defaults to enqueueing the job for immediate processing.
func (*Job) Remove ¶
Remove removes a job from the queue.
This may have unintended side effects, e.g. if the job is currently being processed by another worker, so only use this method when you know it's safe.
func (*Job) Retries ¶
Retries sets how many times the job should be automatically retried in case of failure.
Stored in `job.options.retries` and decremented each time the job is retried.
Defaults to 0.
func (*Job) SetId ¶
SetId explicitly sets the ID of the job. If a job with the given ID already exists, the Job will not be created, and `job.id` will be set to `null`. This method can be used to run a job once for each external resource by passing that resource's ID. For instance, you might run the setup job for a user only once by setting the job ID to the ID of the user. Furthermore, when this feature is used with the queue settings `removeOnSuccess: true` and `removeOnFailure: true` (see WithRemoveOnSuccess and WithRemoveOnFailure), the job can be run again once it has been removed, effectively ensuring that the jobId has a global concurrency of 1.
Avoid passing a numeric job ID, as it may conflict with an auto-generated ID.
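The once-per-resource behavior described for SetId can be pictured with a small in-memory model. The `jobStore` type and its methods are hypothetical stand-ins for the Redis-backed queue, written only to show the create, reject-duplicate, remove, and create-again sequence.

```go
package main

import (
	"fmt"
	"sync"
)

// jobStore is a hypothetical in-memory stand-in for the Redis-backed queue.
type jobStore struct {
	mu   sync.Mutex
	jobs map[string]string // job ID -> payload
}

func newJobStore() *jobStore {
	return &jobStore{jobs: make(map[string]string)}
}

// create saves the job unless the ID is already taken, and reports
// whether the job was created.
func (s *jobStore) create(id, payload string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.jobs[id]; exists {
		return false
	}
	s.jobs[id] = payload
	return true
}

// remove deletes the job, as automatic removal on success or failure
// would do once the job has finished.
func (s *jobStore) remove(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.jobs, id)
}

func main() {
	s := newJobStore()
	fmt.Println(s.create("user-17", "setup")) // first job for this ID
	fmt.Println(s.create("user-17", "setup")) // duplicate ID while the job exists
	s.remove("user-17")                       // job finished and was removed
	fmt.Println(s.create("user-17", "setup")) // the ID can be used again
}
```

While a job with a given ID exists, no second job with that ID can be created, which is what gives the ID an effective concurrency of 1.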
type Message ¶
type Message struct {
Id string `json:"id"`
Event Event `json:"event"`
Data json.RawMessage `json:"data"`
}
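A message with this shape can be decoded in two steps: first the envelope, then the raw `Data` payload once the event type is known. In the sketch below, `Event` is assumed to be a string type, and the `"progress"` value and `percent` payload are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is assumed to be a string type; its real definition is not shown here.
type Event string

type Message struct {
	Id    string          `json:"id"`
	Event Event           `json:"event"`
	Data  json.RawMessage `json:"data"`
}

// decode parses the envelope and leaves Data undecoded for the caller.
func decode(raw string) (Message, error) {
	var m Message
	err := json.Unmarshal([]byte(raw), &m)
	return m, err
}

func main() {
	// Hypothetical payload; the real event names and data shapes may differ.
	m, err := decode(`{"id":"42","event":"progress","data":{"percent":50}}`)
	if err != nil {
		panic(err)
	}
	var p struct {
		Percent int `json:"percent"`
	}
	if err := json.Unmarshal(m.Data, &p); err != nil {
		panic(err)
	}
	fmt.Println(m.Id, m.Event, p.Percent)
}
```

Keeping `Data` as `json.RawMessage` defers decoding until the event type tells the receiver which concrete type to expect.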
type ProcessFunc ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func NewQueue ¶
func NewQueue( ctx context.Context, name string, r redis.UniversalClient, opts ...QueueOption, ) (*Queue, error)
NewQueue creates a queue instance.
func (*Queue) CheckHealth ¶
func (q *Queue) CheckHealth(ctx context.Context) (*QueueStatus, error)
CheckHealth checks the "health" of the queue. It returns the number of jobs in each state (`waiting`, `active`, `succeeded`, `failed`, `delayed`) and the newest job ID (if using the default ID behavior). You can periodically query the `NewestJobId` to estimate the job creation throughput, and can infer the job processing throughput by incorporating the `waiting` and `active` counts.
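One way to turn two health snapshots into throughput estimates is sketched below. The `sample` struct and its field names are hypothetical (only `NewestJobId` is named above), and the arithmetic assumes auto-generated, monotonically increasing IDs and ignores delayed jobs.

```go
package main

import "fmt"

// sample is a hypothetical snapshot holding the values the text mentions.
type sample struct {
	NewestJobId int64
	Waiting     int64
	Active      int64
}

// rates estimates jobs created and jobs processed per second between two
// samples taken `seconds` apart. Jobs that were created but did not add
// to the waiting+active backlog are counted as processed.
func rates(prev, cur sample, seconds float64) (created, processed float64) {
	made := cur.NewestJobId - prev.NewestJobId
	backlogGrowth := (cur.Waiting + cur.Active) - (prev.Waiting + prev.Active)
	return float64(made) / seconds, float64(made-backlogGrowth) / seconds
}

func main() {
	prev := sample{NewestJobId: 100, Waiting: 10, Active: 2}
	cur := sample{NewestJobId: 220, Waiting: 35, Active: 7}
	created, processed := rates(prev, cur, 60)
	fmt.Println(created, processed)
}
```

Here 120 jobs were created in 60 seconds while the backlog grew by 30, so roughly 90 jobs left the backlog in that window.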
func (*Queue) CheckStalledJobs ¶
CheckStalledJobs checks for stalled jobs. `interval` is the period on which to check for stalled jobs. It should be set to half the stall interval setting (see WithStallInterval) to avoid unnecessary work.
func (*Queue) CloseTimeout ¶
CloseTimeout closes the queue, waiting up to t for all jobs currently being processed to finish.
func (*Queue) GetJobs ¶
GetJobs gets jobs from the queue. When getting jobs of status `waiting`, `active`, or `delayed`, set `start` and `end` to specify a range of job indices to return. Jobs of status `failed` and `succeeded` are returned as an arbitrary subset of the queue of size `size`, because failed and succeeded jobs are stored in a Redis SET, which does not maintain a job ordering.
Note that large values of `start`, `end`, or `size` may cause excess load on the Redis server.
func (*Queue) ProcessConcurrently ¶
ProcessConcurrently begins processing jobs with the provided concurrency and handler function.
func (*Queue) RemoveJob ¶
RemoveJob removes a job from the queue by jobId.
This may have unintended side effects, e.g. if the job is currently being processed by another worker, so only use this method when you know it's safe.
type QueueOption ¶
type QueueOption func(*Queue)
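`QueueOption` follows the functional options pattern: each `With...` function returns a closure that mutates the queue during construction. The sketch below reproduces the pattern with a hypothetical `queue` struct, since the real `Queue` fields are unexported; only the `bq` default prefix is taken from this page.

```go
package main

import "fmt"

// queue is a hypothetical stand-in; the real Queue's fields are unexported.
type queue struct {
	prefix          string
	removeOnSuccess bool
}

// queueOption mirrors the shape of QueueOption: a function that mutates the queue.
type queueOption func(*queue)

func withPrefix(s string) queueOption {
	return func(q *queue) { q.prefix = s }
}

func withRemoveOnSuccess(b bool) queueOption {
	return func(q *queue) { q.removeOnSuccess = b }
}

// newQueue applies defaults first, then each option in the order given,
// so later options override earlier ones.
func newQueue(opts ...queueOption) *queue {
	q := &queue{prefix: "bq"}
	for _, opt := range opts {
		opt(q)
	}
	return q
}

func main() {
	q := newQueue(withPrefix("jobs"), withRemoveOnSuccess(true))
	fmt.Println(q.prefix, q.removeOnSuccess)
}
```

The pattern lets new settings be added later without changing the constructor's signature.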
func WithActivateDelayedJobs ¶
func WithActivateDelayedJobs(b bool, onRaised func(numRaised int64)) QueueOption
WithActivateDelayedJobs activates delayed jobs once they've passed their `delayUntil` timestamp. Note that this must be enabled on at least one `Queue` instance for the delayed retry strategies (`fixed` and `exponential`) to work, because it is what reactivates retried jobs after their computed delay.
func WithDelayedDebounce ¶
func WithDelayedDebounce(d time.Duration) QueueOption
WithDelayedDebounce sets the debounce duration for delayed jobs. To avoid unnecessary churn when several jobs arrive in short succession, the Queue may delay individual jobs by up to this duration.
func WithNearTermWindow ¶
func WithNearTermWindow(d time.Duration) QueueOption
WithNearTermWindow sets the window during which delayed jobs will be specifically scheduled. If all delayed jobs are further out than this window, the Queue will double-check that it hasn't missed any jobs after the window elapses.
func WithOnJobFailed ¶
func WithOnJobFailed(fn func(jobId string, err error)) QueueOption
WithOnJobFailed sets a receiver for job failed event messages from Redis.
func WithOnJobProgress ¶
func WithOnJobProgress(fn func(jobId string, progress json.RawMessage)) QueueOption
WithOnJobProgress sets a receiver for job progress event messages from Redis.
func WithOnJobRetrying ¶
func WithOnJobRetrying(fn func(jobId string, err error)) QueueOption
WithOnJobRetrying sets a receiver for job retrying event messages from Redis.
func WithOnJobSucceeded ¶
func WithOnJobSucceeded(fn func(jobId string, result json.RawMessage)) QueueOption
WithOnJobSucceeded sets a receiver for job succeeded event messages from Redis.
func WithPrefix ¶
func WithPrefix(s string) QueueOption
WithPrefix sets the prefix of the Redis keys. The default is `bq`. Useful if the `bq:` namespace is, for whatever reason, unavailable or problematic on your Redis instance.
func WithRedisScanCount ¶
func WithRedisScanCount(i int) QueueOption
WithRedisScanCount sets RedisScanCount, the `COUNT` value passed to the Redis `SSCAN` command that `Queue.GetJobs` uses for the succeeded and failed job types.
func WithRemoveOnFailure ¶
func WithRemoveOnFailure(b bool) QueueOption
WithRemoveOnFailure, when enabled, makes this worker automatically remove its failed jobs from Redis, so as to keep memory usage down. This will not remove jobs that are set to retry unless they fail all their retries.
func WithRemoveOnSuccess ¶
func WithRemoveOnSuccess(b bool) QueueOption
WithRemoveOnSuccess, when enabled, makes this worker automatically remove its successfully completed jobs from Redis, so as to keep memory usage down.
func WithScriptsProvider ¶
func WithScriptsProvider(i ScriptsProvider) QueueOption
WithScriptsProvider sets the provider for Lua scripts.
func WithSendEvents ¶
func WithSendEvents(b bool) QueueOption
WithSendEvents sets whether job events are sent to queues.
func WithStallInterval ¶
func WithStallInterval(d time.Duration) QueueOption
WithStallInterval sets the window in which workers must report that they aren't stalling. Higher values will reduce Redis/network overhead, but if a worker stalls, it will take longer before its stalled job(s) are retried. A higher value will also result in a lower probability of false positives during stall detection.