guex

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2023 License: MIT Imports: 22 Imported by: 0

README

guex

this repo is madness

created in several hours and this is combine of forked https://github.com/vgarvardt/gue and https://github.com/hibiken/asynqmon

in gue fixed long transactions and integrated asynqmon as interface

this queue you can use in next cases:

  • low/mid size of tasks in project
  • long-time tasks
  • persistent storage for tasks
  • if you have postgres at project and don't want install rabbit/kafka/redis/temporal and other heavy services

Documentation

Index

Constants

View Source
const (
	// WorkerIdxUnknown is returned when worker index in the pool is not set for some reasons.
	WorkerIdxUnknown = -1
)

Variables

View Source
var (
	// DefaultExponentialBackoff is the exponential Backoff implementation with default config applied
	DefaultExponentialBackoff = NewExponentialBackoff(exp.Config{
		BaseDelay:  1.0 * time.Second,
		Multiplier: 1.6,
		Jitter:     0.2,
		MaxDelay:   1.0 * time.Hour,
	})

	// BackoffNever is the Backoff implementation that never returns errored job to the queue for retry,
	// but discards it in case of the error.
	BackoffNever = func(retries int) time.Duration {
		return -1
	}
)
View Source
var (
	Meter        = global.MeterProvider().Meter("guex")
	EnqueueMeter instrument.Int64Counter
)
View Source
var ErrDiscard = errors.New("error discard")
View Source
var ErrMissingType = errors.New("job type must be specified")

ErrMissingType is returned when you attempt to enqueue a job with no Type specified.

Functions

func ErrRescheduleJobAt

func ErrRescheduleJobAt(t time.Time, reason string) error

ErrRescheduleJobAt spawns an error that reschedules a job to run at some predefined time.

func ErrRescheduleJobIn

func ErrRescheduleJobIn(d time.Duration, reason string) error

ErrRescheduleJobIn spawns an error that reschedules a job to run after some predefined duration.

func GetWorkerIdx

func GetWorkerIdx(ctx context.Context) int

GetWorkerIdx gets the index of the worker in the pool from the worker context. Returns WorkerIdxUnknown if the context is not set or the value is not found there.

func RandomStringID

func RandomStringID() string

RandomStringID returns random alphanumeric string that can be used as ID.

Types

type Backoff

type Backoff func(job *Job, retries int) time.Duration

Backoff is the interface for backoff implementation that will be used to reschedule errored jobs to a later time. If the Backoff implementation returns negative duration - the job will be discarded.

func NewConstantBackoff

func NewConstantBackoff(d time.Duration) Backoff

NewConstantBackoff instantiates new backoff implementation with teh constant retry duration that does not depend on the retry.

func NewExponentialBackoff

func NewExponentialBackoff(cfg exp.Config) Backoff

NewExponentialBackoff instantiates new exponential Backoff implementation with config

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a Gue client that can add jobs to the queue and remove jobs from the queue.

func NewClient

func NewClient(pool *pgxpool.Pool) (c *Client)

NewClient creates a new Client that uses the pgx pool.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, j *Job) error

Enqueue adds a job to the queue.

func (*Client) EnqueueBatch

func (c *Client) EnqueueBatch(ctx context.Context, jobs []*Job) error

EnqueueBatch adds a batch of jobs. Operation is atomic, so either all jobs are added, or none.

func (*Client) EnqueueBatchTx

func (c *Client) EnqueueBatchTx(ctx context.Context, jobs []*Job, tx pgx.Tx) error

EnqueueBatchTx adds a batch of jobs within the scope of the transaction. This allows you to guarantee that an enqueued batch will either be committed or rolled back atomically with other changes in the course of this transaction.

It is the caller's responsibility to Commit or Rollback the transaction after this function is called.

func (*Client) EnqueueTx

func (c *Client) EnqueueTx(ctx context.Context, j *Job, tx pgx.Tx) error

EnqueueTx adds a job to the queue within the scope of the transaction. This allows you to guarantee that an enqueued job will either be committed or rolled back atomically with other changes in the course of this transaction.

It is the caller's responsibility to Commit or Rollback the transaction after this function is called.

func (*Client) LockNextScheduledJob

func (c *Client) LockNextScheduledJob(ctx context.Context, limits []QueueLimit) (jobs []*Job, err error)

LockNextScheduledJob attempts to retrieve the earliest scheduled Job from the database in the specified queue. If a job is found, it will be locked on the transactional level, so other workers will be skipping it. If no job is found, nil will be returned instead of an error.

This function cares about the scheduled time first to lock earliest to execute jobs first even if there are ones with a higher priority scheduled to a later time but already eligible for execution

Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting a job, working it, deleting it, and releasing the lock.

After the Job has been worked, you must call either Job.Done() or Job.Error() on it in order to commit transaction to persist Job changes (remove or update it).

func (*Client) RestoreStuck

func (c *Client) RestoreStuck(ctx context.Context, runAfter time.Duration, queue ...QueueLimit) (err error)

type ErrJobReschedule

type ErrJobReschedule interface {
	RescheduleJobAt() time.Time
}

ErrJobReschedule interface implementation allows errors to reschedule jobs in the individual basis.

type HookFunc

type HookFunc func(ctx context.Context, j *Job, err error)

HookFunc is a function that may react to a Job lifecycle events. All the callbacks are being executed synchronously, so be careful with the long-running locking operations. Hooks do not return an error, therefore they can not and must not be used to affect the Job execution flow, e.g. cancel it - this is the WorkFunc responsibility. Modifying Job fields and calling any methods that are modifying its state within hooks may lead to undefined behaviour. Please never do this.

Depending on the event err parameter may be empty or not - check the event description for its meaning.

type Job

type Job struct {
	database.Job
	// contains filtered or unexported fields
}

Job is a single unit of work for Gue to perform.

func (*Job) Done

func (j *Job) Done(ctx context.Context) error

Done commits transaction that marks job as done. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.

func (*Job) Error

func (j *Job) Error(ctx context.Context, jErr error) (err error)

Error marks the job as failed and schedules it to be reworked. An error message or backtrace can be provided as msg, which will be saved on the job. It will also increase the error count.

This call marks job as done and releases (commits) transaction, so calling Done() is not required, although calling it will not cause any issues. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.

func (*Job) Fail

func (j *Job) Fail(ctx context.Context) (err error)

Fail marks this job as failed

You must also later call Done() to return this job's database connection to the pool. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.

type PollStrategy

type PollStrategy string

PollStrategy determines how the DB is queried for the next job to work on

type QueueLimit

type QueueLimit struct {
	Queue string
	Limit int32
}

type WorkFunc

type WorkFunc func(ctx context.Context, j *Job) error

WorkFunc is the handler function that performs the Job. If an error is returned, the Job is either re-enqueued with the given backoff or is discarded based on the worker backoff strategy and returned error.

Modifying Job fields and calling any methods that are modifying its state within the handler may lead to undefined behaviour. Please never do this.

type WorkMap

type WorkMap map[string]WorkFunc

WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a pool of Workers, each working jobs from the queue at the specified interval using the WorkMap.

func NewWorkerPool

func NewWorkerPool(c *Client, options ...WorkerPoolOption) (*WorkerPool, error)

NewWorkerPool creates a new WorkerPool with count workers using the Client c.

Each Worker in the pool default to a poll interval of 5 seconds, which can be overridden by WithPoolInterval option. The default queue is the nameless queue "", which can be overridden by WithPoolQueue option.

func (*WorkerPool) Client added in v0.1.1

func (w *WorkerPool) Client() *Client

func (*WorkerPool) Run

func (w *WorkerPool) Run(parentCtx context.Context) (err error)

Run runs all the Workers in the WorkerPool in own goroutines. Run blocks until all workers exit. Use context cancellation for shutdown.

func (*WorkerPool) Stop

func (w *WorkerPool) Stop()

func (*WorkerPool) WorkMap

func (w *WorkerPool) WorkMap(wm WorkMap)

func (*WorkerPool) WorkOne

func (w *WorkerPool) WorkOne(ctx context.Context) (err error)

WorkOne tries to consume single message from the queue.

type WorkerPoolOption

type WorkerPoolOption func(pool *WorkerPool)

WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.

func WithBackoff added in v0.1.2

func WithBackoff(backoff Backoff) WorkerPoolOption

WithBackoff sets backoff implementation that will be applied to errored jobs within current client session.

func WithLogger

func WithLogger(l *zap.Logger) WorkerPoolOption

func WithPoolGracefulShutdown

func WithPoolGracefulShutdown(handlerCtx func() context.Context) WorkerPoolOption

WithPoolGracefulShutdown enables graceful shutdown mode for all workers in the pool. See WithWorkerGracefulShutdown for details.

func WithPoolHooksJobDone

func WithPoolHooksJobDone(hooks ...HookFunc) WorkerPoolOption

WithPoolHooksJobDone calls WithWorkerHooksJobDone for every worker in the pool.

func WithPoolHooksUnknownJobType

func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption

WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.

func WithPoolID

func WithPoolID(id string) WorkerPoolOption

WithPoolID sets worker pool ID for easier identification in logs

func WithPoolInterval

func WithPoolInterval(d time.Duration) WorkerPoolOption

WithPoolInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.

func WithPoolPanicStackBufSize

func WithPoolPanicStackBufSize(size int) WorkerPoolOption

WithPoolPanicStackBufSize sets max size for the stacktrace buffer for panicking jobs. Default value is 1024 that is enough for most of the cases. Be careful setting buffer suze to the big values as this may affect overall performance.

func WithPoolQueueRestore

func WithPoolQueueRestore(restoreAfter, interval time.Duration) WorkerPoolOption

func WithWorkerPanicStackBufSize

func WithWorkerPanicStackBufSize(size int) WorkerPoolOption

WithWorkerPanicStackBufSize sets max size for the stacktrace buffer for panicking jobs. Default value is 1024 that is enough for most of the cases. Be careful setting buffer suze to the big values as this may affect overall performance.

func WithWorkerPanicWorkerMap

func WithWorkerPanicWorkerMap(workMap WorkMap) WorkerPoolOption

func WithWorkerPoolHandler

func WithWorkerPoolHandler(jobType string, h WorkFunc) WorkerPoolOption

func WithWorkerPoolQueue

func WithWorkerPoolQueue(queue ...QueueLimit) WorkerPoolOption

WithWorkerPoolQueue overrides default worker queue name with the given value.

Directories

Path Synopsis
ui module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL