work

package module
v0.1.3-0...-545a7b7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2020 License: MIT Imports: 13 Imported by: 0

README

gocraft/work v2

GoDoc Go Report Card FOSSA Status CircleCI

This is the in-progress repo for gocraft/work rewrite.

Improvements

  • queue backend abstraction
    • redis is still the default, but the new design allows custom queue implementation.
  • simplify the keyspace design of redis queue backend
    • The new design uses 1 redis hash per job, and 1 redis sorted set for queue.
    • Interesting read
  • modular
    • The core only catches panics, retries on failure, and waits if a queue is empty.
    • All other functionalities are either removed or moved to separate middlewares.
  • support binary payload/args with message pack.
  • replace built-in UI with prometheus metrics (use grafana if you want dashboard).
  • additional optimizations (alloc + bulk queue ops)
    BenchmarkWorkerRunJob/work_v1_1-8         	    3000	    515957 ns/op
    BenchmarkWorkerRunJob/work_v2_1-8         	    5000	    284516 ns/op
    BenchmarkWorkerRunJob/work_v1_10-8        	    1000	   2136546 ns/op
    BenchmarkWorkerRunJob/work_v2_10-8        	    5000	    367997 ns/op
    BenchmarkWorkerRunJob/work_v1_100-8       	     100	  18234023 ns/op
    BenchmarkWorkerRunJob/work_v2_100-8       	    1000	   1759186 ns/op
    BenchmarkWorkerRunJob/work_v1_1000-8      	      10	 162110100 ns/op
    BenchmarkWorkerRunJob/work_v2_1000-8      	     100	  12646080 ns/op
    BenchmarkWorkerRunJob/work_v1_10000-8     	       1	1691287122 ns/op
    BenchmarkWorkerRunJob/work_v2_10000-8     	      10	 144923087 ns/op
    BenchmarkWorkerRunJob/work_v1_100000-8    	       1	17515722574 ns/op
    BenchmarkWorkerRunJob/work_v2_100000-8    	       1	1502468637 ns/op
    PASS
    ok  	github.com/charm-jp/work	87.901s
    
  • http enqueuer

License

FOSSA Status

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEmptyNamespace = errors.New("work: empty namespace")
	ErrEmptyQueueID   = errors.New("work: empty queue id")
	ErrAt             = errors.New("work: at should not be zero")
	ErrInvisibleSec   = errors.New("work: invisible sec should be >= 0")
)

options validation errors

View Source
var (
	ErrMaxExecutionTime = errors.New("work: max execution time should be > 0")
	ErrNumGoroutines    = errors.New("work: number of goroutines should be > 0")
	ErrIdleWait         = errors.New("work: idle wait should be > 0")
)

options validation error

View Source
var (
	// ErrQueueNotFound is returned if the queue is not yet
	// defined with Register().
	ErrQueueNotFound = errors.New("work: queue is not found")

	// ErrUnrecoverable is returned if the error is unrecoverable.
	// The job will be discarded.
	ErrUnrecoverable = errors.New("work: permanent error")

	// ErrUnsupported is returned if it is not implemented.
	ErrUnsupported = errors.New("work: unsupported")
)
View Source
var (
	// ErrEmptyQueue is returned if Dequeue() is called on an empty queue.
	ErrEmptyQueue = errors.New("work: no job is found")
)

Functions

This section is empty.

Types

type AckOptions

type AckOptions struct {
	Namespace string
	QueueID   string
}

AckOptions specifies how a job is deleted from a queue.

func (*AckOptions) Validate

func (opt *AckOptions) Validate() error

Validate validates AckOptions.

type BulkDequeuer

type BulkDequeuer interface {
	BulkDequeue(int64, *DequeueOptions) ([]*Job, error)
	BulkAck([]*Job, *AckOptions) error
}

BulkDequeuer dequeues jobs in a batch.

type BulkEnqueuer

type BulkEnqueuer interface {
	BulkEnqueue([]*Job, *EnqueueOptions) error
}

BulkEnqueuer enqueues jobs in a batch.

type DequeueFunc

type DequeueFunc func(*DequeueOptions) (*Job, error)

DequeueFunc generates a job.

type DequeueMiddleware

type DequeueMiddleware func(DequeueFunc) DequeueFunc

DequeueMiddleware modifies DequeueFunc behavior.

type DequeueOptions

type DequeueOptions struct {
	// Namespace is the namespace of a queue.
	Namespace string
	// QueueID is the id of a queue.
	QueueID string
	// At is the current time of the dequeuer.
	// Any job that is scheduled before this can be executed.
	At time.Time
	// After the job is dequeued, no other dequeuer can see this job for a while.
	// InvisibleSec controls how long this period is.
	InvisibleSec int64
}

DequeueOptions specifies how a job is dequeued.

func (*DequeueOptions) Validate

func (opt *DequeueOptions) Validate() error

Validate validates DequeueOptions.

type Dequeuer

type Dequeuer interface {
	Dequeue(*DequeueOptions) (*Job, error)
	Ack(*Job, *AckOptions) error
}

Dequeuer dequeues a job. If a job is processed successfully, call Ack() to delete the job.

type EnqueueFunc

type EnqueueFunc func(*Job, *EnqueueOptions) error

EnqueueFunc takes in a job for processing.

type EnqueueMiddleware

type EnqueueMiddleware func(EnqueueFunc) EnqueueFunc

EnqueueMiddleware modifies EnqueueFunc behavior.

type EnqueueOptions

type EnqueueOptions struct {
	// Namespace is the namespace of a queue.
	Namespace string
	// QueueID is the id of a queue.
	QueueID string
}

EnqueueOptions specifies how a job is enqueued.

func (*EnqueueOptions) Validate

func (opt *EnqueueOptions) Validate() error

Validate validates EnqueueOptions.

type Enqueuer

type Enqueuer interface {
	Enqueue(*Job, *EnqueueOptions) error
}

Enqueuer enqueues a job.

type HandleFunc

type HandleFunc func(*Job, *DequeueOptions) error

HandleFunc runs a job.

type HandleMiddleware

type HandleMiddleware func(HandleFunc) HandleFunc

HandleMiddleware modifies HandleFunc hehavior.

type Job

type Job struct {
	// ID is the unique id of a job.
	ID string `msgpack:"id"`
	// CreatedAt is set to the time when NewJob() is called.
	CreatedAt time.Time `msgpack:"created_at"`
	// UpdatedAt is when the job is last executed.
	// UpdatedAt is set to the time when NewJob() is called initially.
	UpdatedAt time.Time `msgpack:"updated_at"`
	// EnqueuedAt is when the job will be executed next.
	// EnqueuedAt is set to the time when NewJob() is called initially.
	EnqueuedAt time.Time `msgpack:"enqueued_at"`

	// Payload is raw bytes.
	Payload []byte `msgpack:"payload"`

	// If the job previously fails, Retries will be incremented.
	Retries int64 `msgpack:"retries"`
	// If the job previously fails, LastError will be populated with error string.
	LastError string `msgpack:"last_error"`
}

Job is a single unit of work.

func NewJob

func NewJob() *Job

NewJob creates a job.

func (Job) Delay

func (j Job) Delay(d time.Duration) *Job

Delay creates a job that can be executed in future.

func (*Job) MarshalJSONPayload

func (j *Job) MarshalJSONPayload(v interface{}) error

MarshalJSONPayload encodes a variable into the JSON payload.

func (*Job) MarshalPayload

func (j *Job) MarshalPayload(v interface{}) error

MarshalPayload encodes a variable into the msgpack payload.

func (*Job) UnmarshalJSONPayload

func (j *Job) UnmarshalJSONPayload(v interface{}) error

UnmarshalJSONPayload decodes the JSON payload into a variable.

func (*Job) UnmarshalPayload

func (j *Job) UnmarshalPayload(v interface{}) error

UnmarshalPayload decodes the msgpack payload into a variable.

func (Job) WithPayload

func (j Job) WithPayload(v interface{}) (*Job, error)

WithPayload adds payload to the job.

type JobOptions

type JobOptions struct {
	WorkerOptions
	MaxExecutionTime time.Duration
	IdleWait         time.Duration
	NumGoroutines    int64

	DequeueMiddleware []DequeueMiddleware
	HandleMiddleware  []HandleMiddleware
}

JobOptions specifies how a job is executed.

func (*JobOptions) AddDequeueMiddleware

func (opt *JobOptions) AddDequeueMiddleware(mw DequeueMiddleware) *JobOptions

AddDequeueMiddleware adds DequeueMiddleware.

func (*JobOptions) AddHandleMiddleware

func (opt *JobOptions) AddHandleMiddleware(mw HandleMiddleware) *JobOptions

AddHandleMiddleware adds HandleMiddleware.

func (*JobOptions) Validate

func (opt *JobOptions) Validate() error

Validate validates JobOptions.

type Metrics

type Metrics struct {
	Queue []*QueueMetrics
}

Metrics wraps metrics reported by MetricsExporter.

type MetricsExporter

type MetricsExporter interface {
	GetQueueMetrics(*QueueMetricsOptions) (*QueueMetrics, error)
}

MetricsExporter can be implemented by Queue to report metrics.

type Queue

type Queue interface {
	Enqueuer
	Dequeuer
}

Queue can enqueue and dequeue jobs.

func NewRedisQueue

func NewRedisQueue(client redis.UniversalClient) Queue

NewRedisQueue creates a new queue stored in redis.

type QueueMetrics

type QueueMetrics struct {
	Namespace string
	QueueID   string
	// Total number of jobs that can be executed right now.
	ReadyTotal int64
	// Total number of jobs that are scheduled to run in future.
	ScheduledTotal int64
}

QueueMetrics contains metrics from a queue.

type QueueMetricsOptions

type QueueMetricsOptions struct {
	Namespace string
	QueueID   string
	At        time.Time
}

QueueMetricsOptions specifies how to fetch queue metrics.

func (*QueueMetricsOptions) Validate

func (opt *QueueMetricsOptions) Validate() error

Validate validates QueueMetricsOptions.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker runs jobs.

func NewWorker

func NewWorker(opt *WorkerOptions) *Worker

NewWorker creates a new worker.

func (*Worker) ExportMetrics

func (w *Worker) ExportMetrics() (*Metrics, error)

ExportMetrics dumps queue stats if the queue implements MetricsExporter.

func (*Worker) Register

func (w *Worker) Register(queueID string, h HandleFunc, opt *JobOptions) error

Register adds handler for a queue.

func (*Worker) Start

func (w *Worker) Start()

Start starts the worker.

func (*Worker) Stop

func (w *Worker) Stop()

Stop stops the worker.

type WorkerOptions

type WorkerOptions struct {
	Namespace string
	Queue     Queue
	ErrorFunc func(error)
}

WorkerOptions is used to create a worker.

Directories

Path Synopsis
middleware

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL