job

package
Version: v0.0.0-...-fc15ddb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2017 License: MPL-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaxName = 255

	// 1 MiB = 1,024 KiB = 1,048,576 bytes
	MaxPayload = 1048576
	MaxResult  = 1048576

	MaxAttempts = 255
	MaxFails    = 255

	// 24 hours in ms
	MaxTTR = 86400000
	// 30 days in ms
	MaxTTL          = 2592000000
	MaxHardAttempts = 255
	TimeFormat      = "2006-01-02T15:04:05Z"

	// Max timeout for wait related cmds (lease).
	MaxTimeout = 86400000
)
View Source
const (
	StateNew       = 0
	StateCompleted = 1
	StateFailed    = 2
	StatePending   = 3
	StateLeased    = 4

	MaxRunRecAttempts = 1<<64 - 1

	RunRecTTRTimerIdx   = 0
	RunRecTTLTimerIdx   = 1
	RunRecSchedTimerIdx = 2
)

Variables

View Source
var (
	ErrDuplicateJob       = errors.New("Duplicate job")
	ErrDuplicateResult    = errors.New("Duplicate result")
	ErrLeaseExpired       = errors.New("Lease expired")
	ErrMaxAttemptsReached = errors.New("Max attempts reached")
	ErrAlreadyLeased      = errors.New("Job already leased")
	ErrAlreadyProcessed   = errors.New("Job already processed")
	ErrEnqueue            = errors.New("Unable to enqueue")
	ErrNotFound           = errors.New("Not found")
	ErrTimeout            = errors.New("Timeout")
	ErrQueueOutOfSync     = errors.New("Queue out of sync")
)
View Source
var (
	ErrInvalidID          = errors.New("Invalid ID")
	ErrInvalidName        = errors.New("Invalid Name")
	ErrInvalidPayload     = errors.New("Invalid Payload")
	ErrInvalidResult      = errors.New("Invalid Result")
	ErrInvalidMaxAttempts = errors.New("Invalid Max Attempts")
	ErrInvalidMaxFails    = errors.New("Invalid Max Fails")
	ErrInvalidPriority    = errors.New("Invalid Priority")
	ErrInvalidTTR         = errors.New("Invalid TTR")
	ErrInvalidTTL         = errors.New("Invalid TTL")
	ErrInvalidTime        = errors.New("Invalid Time")
	ErrInvalidTimeout     = errors.New("Invalid Timeout")
)
View Source
var (
	ErrDuplicate = errors.New("Duplicate")
)

Functions

func ValidateAddJob

func ValidateAddJob(j *Job) error

func ValidateID

func ValidateID(id ID) error

func ValidateName

func ValidateName(name string) error

func ValidatePayload

func ValidatePayload(p []byte) error

func ValidateResult

func ValidateResult(r []byte) error

func ValidateRunJob

func ValidateRunJob(j *Job) error

func ValidateScheduleJob

func ValidateScheduleJob(j *Job) error

func ValidateTTL

func ValidateTTL(ttl uint64) error

A valid TTL is non-zero, non-negative, and at most 2^64 - 1.

func ValidateTTR

func ValidateTTR(ttr uint32) error

A valid TTR is non-zero, non-negative, and at most 2^32 - 1.

func ValidateTime

func ValidateTime(t time.Time) error

Valid time is in UTC, and greater or equal to current time.

func ValidateTimeout

func ValidateTimeout(t uint32) error

Types

type Adder

type Adder interface {
	Add(j *Job) error
	Expire(id ID)
	HandleExpire(func(ID))
	ExpireFunc() func(ID)
}

type Completer

type Completer interface {
	Complete(id ID, result []byte) error
}

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

func NewController

func NewController(reg *Registry, qc QueueControllerInterface) *Controller

func (*Controller) Add

func (c *Controller) Add(j *Job) error

Adds a job to its named work queue with respect for <priority>. TTL timer starts immediately.

func (*Controller) Complete

func (c *Controller) Complete(id ID, result []byte) error

Successfully complete a job with an optional result. Stops TTR timer.

func (*Controller) Delete

func (c *Controller) Delete(id ID) error

Delete a job by ID regardless of existing state.

func (*Controller) Expire

func (c *Controller) Expire(id ID)

Expire job by ID. Invoked by TTL timers and removes job regardless of state. See the expire() method for the implementation.

func (*Controller) ExpireFunc

func (c *Controller) ExpireFunc() func(ID)

Return the current Expire callback.

func (*Controller) Fail

func (c *Controller) Fail(id ID, result []byte) error

Fail a job with a result. Stops TTR timer.

func (*Controller) HandleExpire

func (c *Controller) HandleExpire(f func(ID))

HandleExpire sets the func to be used for the Expire() method. This is primarily used to allow the proxying of a job controller in full as the expire method is not invoked directly, but rather as a secondary call from "add" or "schedule". Example can be found in the cmdlog package.

func (*Controller) HandleTimeoutAttempt

func (c *Controller) HandleTimeoutAttempt(f func(ID))

HandleTimeoutAttempt sets the func to be used for the TimeoutAttempt() method. This is primarily used to allow the proxying of a job controller in full as the TimeoutAttempt method is not invoked directly, but rather as a background call from "lease". Example can be found in the cmdlog package.

func (*Controller) Lease

func (c *Controller) Lease(names []string, timeout uint32) (*Job, error)

Lease a job by name blocking until <wait-timeout>. Multiple job names can be specified and they will be processed uniformly by random selection.

Returns a leased job on success within <wait-timeout> or a timeout error. TTR timer starts immediately on success.

func (*Controller) Run

func (c *Controller) Run(j *Job, timeout uint32) (*RunResult, error)

Run a job, blocking until wait-timeout if no workers are available, or until TTR if a worker is processing.

Returns the job result on successful completion. TIMEOUT error is returned if no workers were available within <wait-timeout> or if the job failed to complete within TTR.

This is the synchronous form of "add job". All job related data is deleted

func (*Controller) Schedule

func (c *Controller) Schedule(j *Job) error

Schedules a job to run at a UTC time with respect for <priority>. TTL timer starts when the scheduled time is met.

func (*Controller) StartAttempt

func (c *Controller) StartAttempt(id ID) error

Start an attempt by ID. Ensures jobs respect TTR & max attempts policy.

func (*Controller) Stats

func (c *Controller) Stats() Stats

Stats returns the job controller's stats at the current time.

func (*Controller) TimeoutAttempt

func (c *Controller) TimeoutAttempt(id ID)

Timeout job attempt by ID. See the "timeoutAttempt()" method for the implementation.

func (*Controller) TimeoutAttemptFunc

func (c *Controller) TimeoutAttemptFunc() func(ID)

Return TimeoutAttempt callback.

type ControllerInterface

type ControllerInterface interface {
	Adder
	Completer
	Deleter
	Failer
	Runner
	Scheduler
	Leaser
}

type Deleter

type Deleter interface {
	Delete(id ID) error
}

type Failer

type Failer interface {
	Fail(id ID, result []byte) error
}

type ID

type ID [16]byte // UUIDv4

16 byte UUIDv4

func (ID) String

func (id ID) String() string

Returns the canonical UUIDv4 form. Implements fmt.Stringer.

type Inspector

type Inspector struct {
	// contains filtered or unexported fields
}

Inspector allows for deeper inspection of a WorkQueue.

func NewInspector

func NewInspector(wqueue *WorkQueue) *Inspector

func (*Inspector) Iterators

func (i *Inspector) Iterators() (*Iterator, *Iterator)

Iterators for individual queues

func (*Inspector) Lens

func (i *Inspector) Lens() (int, int)

Length of individual queues

type Iterator

type Iterator struct {
	*skiplist.Iterator
}

type Job

type Job struct {
	ID          ID
	Name        string    // Unique name of job
	Payload     []byte    // 1MB limit
	Priority    int32     // Priority from lowest to highest
	MaxAttempts uint8     // Num of allowed attempts
	MaxFails    uint8     // Num of allowed failures
	TTR         uint32    // time to run in ms
	TTL         uint64    // max time to live in ms
	Time        time.Time // Scheduled Time to Exec
	Created     time.Time
}

func NewEmptyJob

func NewEmptyJob() *Job

NewEmptyJob returns a Job with its created time initialized.

func (*Job) Expiration

func (j *Job) Expiration() time.Time

type JobProxy

type JobProxy func() (*Job, bool)

JobProxy interface. A function that returns a job when invoked, following the comma-ok idiom. Used in WorkQueue leases.

type Leaser

type Leaser interface {
	Lease(names []string, timeout uint32) (*Job, error)
	StartAttempt(id ID) error
	TimeoutAttempt(id ID)
	HandleTimeoutAttempt(func(ID))
	TimeoutAttemptFunc() func(ID)
}

type QueueController

type QueueController struct {
	// contains filtered or unexported fields
}

Queue controller owns/manages all queues. Almost all calls are proxied to the QueueInterface holder. Allows for management of any queue by name in a single location. Friendly for future queue call interception.

func NewQueueController

func NewQueueController() *QueueController

func (*QueueController) Add

func (c *QueueController) Add(j *Job) bool

Add a job. Automatically creates the queue if it does not exist.

func (*QueueController) Awake

func (c *QueueController) Awake(j *Job) bool

Awake an existing scheduled job

func (*QueueController) Delete

func (c *QueueController) Delete(j *Job) bool

Delete a job. Automatically creates the queue if it does not exist.

func (*QueueController) Exists

func (c *QueueController) Exists(j *Job) bool

Verify if a job exists by object.

func (*QueueController) Lease

func (c *QueueController) Lease(name string) <-chan JobProxy

Return a job lease by name

func (*QueueController) Queue

func (c *QueueController) Queue(name string) QueueInterface

Return a queue by name, creating it if it does not exist.

func (*QueueController) Queues

func (c *QueueController) Queues() (map[string]QueueInterface, *sync.RWMutex)

Return all queues as a map with an explicit sync.RWMutex. All calls to the map require locking.

func (*QueueController) Run

func (c *QueueController) Run(j *Job) bool

Run a job. FYI: when durability is added, run cmds will not be durable. Automatically creates the queue if it does not exist.

func (*QueueController) Schedule

func (c *QueueController) Schedule(j *Job) bool

Schedule a job. Automatically creates the queue if it does not exist.

type QueueControllerInterface

type QueueControllerInterface interface {
	Queues() (map[string]QueueInterface, *sync.RWMutex)
	Queue(name string) QueueInterface
	Add(j *Job) bool
	Schedule(j *Job) bool
	Run(j *Job) bool
	Delete(j *Job) bool
	Awake(j *Job) bool
	Exists(j *Job) bool
	Lease(name string) <-chan JobProxy
}

type QueueInterface

type QueueInterface interface {
	Add(j *Job) bool
	Schedule(j *Job) bool
	Lease() <-chan JobProxy
	Delete(j *Job) bool
	Exists(j *Job) bool
	Awake(j *Job) bool
	Len() int
}

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry holds all Job Run Records

func NewRegistry

func NewRegistry() *Registry

func (*Registry) Add

func (r *Registry) Add(rec *RunRecord) bool

Add a RunRecord indexed by attached job id. Returns false on duplicate or invalid job id.

func (*Registry) Delete

func (r *Registry) Delete(id ID) bool

Delete a run record by job ID. Cancels all attached timers. Return false if record does not exist.

func (*Registry) Len

func (r *Registry) Len() int

Return length of registry

func (*Registry) Record

func (r *Registry) Record(jid ID) (*RunRecord, bool)

Look up a run record by ID. Follows the comma-ok idiom.

type Result

type Result []byte

type RunRecord

type RunRecord struct {
	Attempts uint64    // Number of attempts made
	State    uint8     // One of state constants
	Fails    uint8     // Number of explicit failures
	Job      *Job      // attached job object
	Result   Result    // Result of job
	Wait     Wait      // Wait channel for job result
	Timers   [3]*Timer // Timer container for scheduled time, TTR, TTL
	Mu       sync.RWMutex
}

Run Record encapsulates the meta for the execution of jobs. This detached run record allows Job objects to stay immutable after creation.

func NewRunRecord

func NewRunRecord() *RunRecord

Returns an empty but initialized run record.

func (*RunRecord) Processed

func (r *RunRecord) Processed() bool

Returns if the job is fully processed.

func (*RunRecord) Running

func (r *RunRecord) Running() bool

Returns if the job is currently executing.

func (*RunRecord) Success

func (r *RunRecord) Success() bool

Returns if the job was successful

func (*RunRecord) WriteResult

func (r *RunRecord) WriteResult(result Result, success bool) bool

Records a job result and sends result over the "wait" channel. Requires locking. Returns false if result has already been written.

type RunResult

type RunResult struct {
	Success bool
	Result  Result
}

RunResult is sent over the Wait channel. Allows readers to wait for a result and determine the success.

type Runner

type Runner interface {
	Run(j *Job, timeout uint32) (*RunResult, error)
}

type Scheduler

type Scheduler interface {
	Schedule(j *Job) error
}

type Stats

type Stats struct {
	// Number of incomplete jobs expired by TTL.
	EvictedJobs uint64
}

Stats represents job controller specific stats.

type Timer

type Timer struct {
	C            <-chan time.Time
	Cancellation chan struct{}
	// Read only timer deadline, useful for approximations.
	// Slightly behind actual timer as the calculation is:
	// NOW + Duration.
	Deadline time.Time
	// contains filtered or unexported fields
}

Timer container encapsulates the original time.Timer with support for Cancellation.

func NewTimer

func NewTimer(d time.Duration) *Timer

func (*Timer) Cancel

func (t *Timer) Cancel()

Cancel a timer and signal readers on cancellation channel.

type Wait

type Wait chan *RunResult

Wait channel for active connected readers.

type WorkQueue

type WorkQueue struct {
	// contains filtered or unexported fields
}

WorkQueue encapsulates 3 separate queues for a single type of job. Queues implement skiplists with different compare functions.

func NewWorkQueue

func NewWorkQueue() *WorkQueue

NewWorkQueue returns an initialized WorkQueue

func (*WorkQueue) Add

func (w *WorkQueue) Add(j *Job) bool

Add a job. Returns false on duplicate job. Returns true on success and sends a job proxy to signal leasers.

func (*WorkQueue) Awake

func (w *WorkQueue) Awake(j *Job) bool

Awake a scheduled job by swapping a job from scheduled -> ready queue

func (*WorkQueue) Delete

func (w *WorkQueue) Delete(j *Job) bool

Delete a job by value. Returns false if the job was not found.

func (*WorkQueue) Exists

func (w *WorkQueue) Exists(j *Job) bool

Verify if the job already exists

func (*WorkQueue) Lease

func (w *WorkQueue) Lease() <-chan JobProxy

Lease returns a channel to listen on with a JobProxy callback.

func (*WorkQueue) Len

func (w *WorkQueue) Len() int

Get the length of the combined queues.

func (*WorkQueue) Schedule

func (w *WorkQueue) Schedule(j *Job) bool

Schedule a job on the scheduled queue. Returns false on duplicate job. Returns true on success and sends a job proxy to signal leasers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL