workqueue

package
v1.0.11
Published: Aug 17, 2022 License: MIT Imports: 15 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Backoff

type Backoff struct {
	Attempt uint64
	// Factor is the multiplying factor for each increment step
	Factor float64
	// Jitter eases contention by randomizing backoff steps
	Jitter bool
	// Min and Max are the minimum and maximum values of the counter
	Min, Max   time.Duration
	Expiration time.Duration
	Limit      int
}

Backoff is a time.Duration counter, starting at Min. After every call to the Duration method the current timing is multiplied by Factor, but it never exceeds Max.

Backoff is not generally concurrent-safe, but the ForAttempt method can be used concurrently.
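The counter behaviour described above can be sketched in a few lines. This is a minimal illustration and not the package's implementation: the local `backoff` type, its `duration` and `reset` methods, the helpers `newDemoBackoff` and `delaysMillis`, and the formula `Min * Factor^attempt` capped at `Max` are all assumptions made for the example. Jitter, `Expiration`, and `Limit` are left out.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoff is a local stand-in for the documented Backoff type; only the
// fields needed for this sketch are included.
type backoff struct {
	attempt  float64
	Factor   float64
	Min, Max time.Duration
}

// duration returns the delay for the current attempt and then increments
// the counter. Assumed formula: Min * Factor^attempt, capped at Max.
func (b *backoff) duration() time.Duration {
	d := float64(b.Min) * math.Pow(b.Factor, b.attempt)
	b.attempt++
	if d > float64(b.Max) {
		return b.Max
	}
	return time.Duration(d)
}

// reset restarts the attempt counter at zero.
func (b *backoff) reset() { b.attempt = 0 }

// newDemoBackoff returns hypothetical parameters used only in this example.
func newDemoBackoff() *backoff {
	return &backoff{Factor: 2, Min: 100 * time.Millisecond, Max: time.Second}
}

// delaysMillis collects n successive delays, in milliseconds.
func delaysMillis(b *backoff, n int) []int64 {
	out := make([]int64, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, b.duration().Milliseconds())
	}
	return out
}

func main() {
	b := newDemoBackoff()
	fmt.Println(delaysMillis(b, 5)) // [100 200 400 800 1000]
	b.reset()
	fmt.Println(delaysMillis(b, 1)) // [100]
}
```

Because `duration` mutates the counter, a value like this would need external locking if shared between goroutines, which matches the note that Backoff is not generally concurrent-safe.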

func (*Backoff) Copy

func (b *Backoff) Copy() *Backoff

Copy returns a Backoff with the same constraints as the original.

func (*Backoff) Duration

func (b *Backoff) Duration() time.Duration

Duration returns the duration for the current attempt before incrementing the attempt counter. See ForAttempt.

func (*Backoff) ForAttempt

func (b *Backoff) ForAttempt(attempt float64) time.Duration

ForAttempt returns the duration for a specific attempt. This is useful if you have a large number of independent Backoffs, but don't want to use unnecessary memory storing the Backoff parameters per Backoff. The first attempt should be 0.

ForAttempt is concurrent-safe.
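One way to picture the "many independent Backoffs" use case is a single read-only parameter set shared by many callers, each holding only its own attempt number. The sketch below assumes a stateless computation; the `params` type, `forAttempt`, `demoParams`, and `delaysFor` are hypothetical names for this example and are not part of the package.

```go
package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// params is a hypothetical, read-only parameter set shared by all callers.
type params struct {
	Factor   float64
	Min, Max time.Duration
}

// forAttempt depends only on its inputs and writes no shared state,
// which is what makes concurrent calls safe in this sketch.
func (p params) forAttempt(attempt float64) time.Duration {
	d := float64(p.Min) * math.Pow(p.Factor, attempt)
	if d > float64(p.Max) {
		return p.Max
	}
	return time.Duration(d)
}

// demoParams returns illustrative values only.
func demoParams() params {
	return params{Factor: 2, Min: 50 * time.Millisecond, Max: 2 * time.Second}
}

// delaysFor computes one delay (in milliseconds) per attempt counter,
// each in its own goroutine.
func delaysFor(p params, attempts []float64) []int64 {
	out := make([]int64, len(attempts))
	var wg sync.WaitGroup
	for i, a := range attempts {
		wg.Add(1)
		go func(i int, a float64) {
			defer wg.Done()
			out[i] = p.forAttempt(a).Milliseconds() // each goroutine writes its own slot
		}(i, a)
	}
	wg.Wait()
	return out
}

func main() {
	// Four independent items, each storing just one number.
	fmt.Println(delaysFor(demoParams(), []float64{0, 3, 1, 10})) // [50 400 100 2000]
}
```

The per-item state here is a single float64 rather than a full copy of the parameters, which is the memory saving the description refers to.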

func (*Backoff) GetAttempt added in v1.0.2

func (b *Backoff) GetAttempt() float64

GetAttempt returns the current attempt counter value.

func (*Backoff) Reset

func (b *Backoff) Reset()

Reset restarts the current attempt counter at zero.

type Job

type Job struct {
	Status JobStatus
	//Unique identifier for a Job
	ID   uint64
	Name string // user provided descriptive information
	//Data contains the bytes that were pushed using Queue.PushBytes()
	Data []byte
	//RetryCount is the number of times the job has been retried
	//If your work can have a temporary failure state, it is recommended
	//that you check retry count and return a fatal error after a certain
	//number of retries
	RetryCount int
	//Message is primarily used for debugging. It contains status info
	//about what was last done with the job.
	Message string

	ExpectAt int64 // in milliseconds
	Backoff        // backoff should be stored per job for timing schedule
}

Job wraps arbitrary data for processing (it must be encoded and decoded, so all fields need to be capitalized, i.e. exported).

func DecodeJob

func DecodeJob(b []byte) (*Job, error)

DecodeJob decodes a gob-encoded byte slice into a Job struct and returns a pointer to it.

func (*Job) Bytes

func (j *Job) Bytes() []byte

Bytes returns a gob-encoded byte slice representation of *j.
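The encode/decode pair can be illustrated with the standard `encoding/gob` package. The sketch below uses a local `job` struct with a subset of the documented fields, and the helper names `encodeJob` and `decodeJob` are assumptions for this example rather than the package's own code. It also shows why the fields are exported: gob only encodes exported struct fields.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// job is a local stand-in carrying a few of the documented Job fields.
// encoding/gob only encodes exported fields, so every field is capitalized.
type job struct {
	ID         uint64
	Name       string
	Data       []byte
	RetryCount int
}

// encodeJob serializes j with gob and returns the resulting bytes.
func encodeJob(j *job) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(j); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeJob rebuilds a job from gob-encoded bytes.
func decodeJob(b []byte) (*job, error) {
	j := &job{}
	if err := gob.NewDecoder(bytes.NewReader(b)).Decode(j); err != nil {
		return nil, err
	}
	return j, nil
}

func main() {
	raw, err := encodeJob(&job{ID: 7, Name: "resize", Data: []byte("img.png")})
	if err != nil {
		panic(err)
	}
	got, err := decodeJob(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(got.ID, got.Name, string(got.Data)) // 7 resize img.png
}
```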

func (Job) String

func (j Job) String() string

type JobStatus

type JobStatus int

JobStatus is an enumerated int representing the processing status of a Job.

const (
	Uack JobStatus = iota
	Ack
	Nack
	Failed
)

JobStatus types

func (JobStatus) String

func (s JobStatus) String() string

String returns the job status in human-readable form.
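An iota-based status type with a `String` method typically looks like the sketch below. The local `jobStatus` type mirrors the documented constants, but the label strings are assumptions: the page does not show what the real method returns.

```go
package main

import "fmt"

// jobStatus is a local stand-in for the documented JobStatus type.
type jobStatus int

const (
	uack jobStatus = iota // 0
	ack                   // 1
	nack                  // 2
	failed                // 3
)

// String maps each status to a label. The labels are illustrative only.
func (s jobStatus) String() string {
	switch s {
	case uack:
		return "Unacknowledged"
	case ack:
		return "Acknowledged"
	case nack:
		return "Not Acknowledged"
	case failed:
		return "Failed"
	default:
		return fmt.Sprintf("jobStatus(%d)", int(s))
	}
}

func main() {
	// fmt uses the String method automatically for %v and Println.
	fmt.Println(uack, ack, nack, failed)
}
```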

type Optioner added in v1.0.8

type Optioner func(q *Queue)

func WithArchive added in v1.0.8

func WithArchive(archiveFailed bool) Optioner

func WithAutoStart added in v1.0.10

func WithAutoStart(auto bool) Optioner

func WithBackoff added in v1.0.8

func WithBackoff(backoff Backoff) Optioner

func WithBatchSize added in v1.0.8

func WithBatchSize(size int) Optioner

size should be in the range [1, 1000].

func WithPollRate added in v1.0.8

func WithPollRate(rate time.Duration) Optioner
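The `Optioner` functions follow the functional options pattern: each one returns a closure that mutates the queue during construction. The sketch below shows the pattern with a local `queue` type; the field names, the default values, and `newQueue` are hypothetical, and range validation for the batch size is omitted because the page does not say how out-of-range values are handled.

```go
package main

import (
	"fmt"
	"time"
)

// queue is a local stand-in; the real Queue's fields are unexported
// and not shown in the documentation.
type queue struct {
	batchSize int
	pollRate  time.Duration
	autoStart bool
}

// optioner mirrors the documented signature: a function that configures a queue.
type optioner func(q *queue)

func withBatchSize(size int) optioner {
	return func(q *queue) { q.batchSize = size }
}

func withPollRate(rate time.Duration) optioner {
	return func(q *queue) { q.pollRate = rate }
}

func withAutoStart(auto bool) optioner {
	return func(q *queue) { q.autoStart = auto }
}

// newQueue applies hypothetical defaults first, then each option in order,
// so later options override earlier ones.
func newQueue(options ...optioner) *queue {
	q := &queue{batchSize: 100, pollRate: 500 * time.Millisecond, autoStart: true}
	for _, opt := range options {
		opt(q)
	}
	return q
}

func main() {
	q := newQueue(withBatchSize(10), withAutoStart(false))
	fmt.Println(q.batchSize, q.pollRate, q.autoStart) // 10 500ms false
}
```

The variadic `options ...Optioner` parameter on Init, InitExtend, and InitTiming suggests they accept options in the same way, so callers only name the settings they want to change.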

type Queue

type Queue struct {
	//ID is a unique identifier for a Queue
	ID string
	// contains filtered or unexported fields
}

Queue represents a queue

func Init

func Init(filepath string, w Worker, options ...Optioner) (*Queue, error)

Init creates a connection to the internal database and initializes the Queue type. filepath must be a valid path to a file, and the file cannot be shared between instances of a Queue. If the file cannot be opened r/w, an error is returned.

func InitExtend added in v1.0.2

func InitExtend(filepath string, w Worker, timingSchedule bool, options ...Optioner) (*Queue, error)

func InitTiming added in v1.0.2

func InitTiming(filepath string, w Worker, options ...Optioner) (*Queue, error)

func (*Queue) CleanFailedJobs

func (q *Queue) CleanFailedJobs() error

CleanFailedJobs loops through all jobs marked as failed and deletes them from the database. Warning: this is destructive; the job data is permanently gone once you call this function.

func (*Queue) Close

func (q *Queue) Close() error

Close attempts to gracefully shut down all workers in a queue and close the db connection.

func (*Queue) Push added in v1.0.4

func (q *Queue) Push(name string, obj interface{}, expectAts ...time.Time) (uint64, error)

func (*Queue) PushAfter added in v1.0.5

func (q *Queue) PushAfter(name string, obj interface{}, expectDelays ...time.Duration) (uint64, error)

func (*Queue) PushBytes

func (q *Queue) PushBytes(name string, d []byte, expectAts ...time.Time) (uint64, error)

PushBytes wraps arbitrary binary data in a job and pushes it onto the queue

func (*Queue) RestoreFailedJobs

func (q *Queue) RestoreFailedJobs() error

func (*Queue) Size

func (q *Queue) Size() (int, int, error)

func (*Queue) Start added in v1.0.10

func (q *Queue) Start() bool

func (*Queue) Stop added in v1.0.10

func (q *Queue) Stop()

type RecoverableWorkerError

type RecoverableWorkerError struct {
	// contains filtered or unexported fields
}

RecoverableWorkerError defines an error that a worker's DoWork func can return to indicate that the job should be retried.

func NewRecoverableWorkerError

func NewRecoverableWorkerError(message string) RecoverableWorkerError

NewRecoverableWorkerError creates a new RecoverableWorkerError

func (RecoverableWorkerError) Error

func (e RecoverableWorkerError) Error() string
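The Job.RetryCount comment recommends returning a fatal error after a certain number of retries. A minimal sketch of that split between recoverable and fatal errors follows. The local `recoverableError` type, the `maxRetries` limit, and the `process` and `shouldRetry` helpers are assumptions for illustration; how the real queue inspects the returned error is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// recoverableError is a local stand-in for RecoverableWorkerError.
type recoverableError struct{ message string }

func (e recoverableError) Error() string { return e.message }

// maxRetries is a hypothetical limit chosen for this example.
const maxRetries = 3

// process simulates a DoWork-style function whose dependency is temporarily
// unavailable: it asks for a retry until the limit is reached, then fails
// with an ordinary (fatal) error.
func process(retryCount int) error {
	if retryCount >= maxRetries {
		return fmt.Errorf("giving up after %d retries", retryCount)
	}
	return recoverableError{message: "upstream unavailable, retry later"}
}

// shouldRetry reports whether err is, or wraps, a recoverableError.
func shouldRetry(err error) bool {
	var re recoverableError
	return errors.As(err, &re)
}

func main() {
	for retry := 0; retry <= maxRetries; retry++ {
		err := process(retry)
		fmt.Printf("retry=%d retryable=%v err=%v\n", retry, shouldRetry(err), err)
	}
}
```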

type Worker

type Worker interface {
	//DoWork is called when a worker picks up a job from the queue
	//Context can be used for cancelling jobs early when Close
	//is called on the Queue
	DoWork(context.Context, *Job) error
	//ID is a semi-unique identifier for a worker
	//it is primarily used for logging purposes
	ID() string
}

Worker represents a worker for handling Jobs
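A Worker implementation that honours the context might look like the sketch below. It redeclares `job` and `worker` locally so the example is self-contained; `sleepWorker`, `runCancelled`, and `runToCompletion` are hypothetical names, and cancelling the context by hand stands in for Close being called on the Queue.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// job and worker are local stand-ins for the documented Job and Worker types.
type job struct {
	ID   uint64
	Data []byte
}

type worker interface {
	DoWork(context.Context, *job) error
	ID() string
}

// sleepWorker pretends each job takes a fixed amount of time.
type sleepWorker struct {
	id   string
	work time.Duration
}

func (w sleepWorker) ID() string { return w.id }

// DoWork returns early with the context's error if the context is
// cancelled before the simulated work finishes.
func (w sleepWorker) DoWork(ctx context.Context, j *job) error {
	select {
	case <-time.After(w.work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runCancelled hands a long job to a worker whose context is already cancelled.
func runCancelled() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // stands in for the queue being closed
	var w worker = sleepWorker{id: "w1", work: time.Hour}
	return w.DoWork(ctx, &job{ID: 1})
}

// runToCompletion lets a short job finish normally.
func runToCompletion() error {
	var w worker = sleepWorker{id: "w2", work: time.Millisecond}
	return w.DoWork(context.Background(), &job{ID: 2})
}

func main() {
	fmt.Println(runCancelled())    // context canceled
	fmt.Println(runToCompletion()) // <nil>
}
```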
