state

package
v0.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2020 License: AGPL-3.0-only Imports: 12 Imported by: 0

Documentation

Overview

* Package state provides an interface for a beanstalkd job, state for * job, tube and a connected client. and job state machine with the * states and transitions as defined in the protocol. * https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt * * Implementations include an in-memory implementation of the interfaces

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidIndex no job entry found at the index
	ErrInvalidIndex = errors.New("provided index is out of range of the heap")

	// ErrMismatchJobEntry - returned when job entry at index does not match the heap's value
	ErrMismatchJobEntry = errors.New("job entry at index does not match provided entry")

	// ErrEntryExists - returned when an entry exists in the existing map/set to prevents from overriding
	ErrEntryExists = errors.New("entry exists in container")

	// ErrEntryMissing - returned when an entry is not found in the container
	ErrEntryMissing = errors.New("entry not found in container")

	// ErrContainerEmpty - returned when the container such as a list/map/slice etc is empty
	ErrContainerEmpty = errors.New("the container is empty")

	// ErrInvalidJobTransition - the current state of the job prevents this transition
	ErrInvalidJobTransition = errors.New("invalid transition")

	// ErrInvalidOperation - The state indicates that this op cannot be done
	ErrInvalidOperation = errors.New("invalid operation due to the current state")

	// ErrUnauthorizedOperation - This state requires a matching client to perform this transition
	ErrUnauthorizedOperation = errors.New("client is not authorized to perform this operation")

	// ErrCancel - indicates the request is cancelled
	ErrCancel = errors.New("cancelled")

	// ErrNoReservation - indicates that a request for a reservation could not be completed
	ErrNoReservationFound = errors.New("no reservation could be found")

	// ErrInvalidResvTimeout - indicates the provided reservation timeout is invalid
	ErrInvalidResvTimeout = errors.New("the provided reservation timeout is invalid")

	// ErrClientIsWaitingForReservation - Indicates the client is waiting for a reservation
	ErrClientIsWaitingForReservation = errors.New("the request client cannot request for another reservation")
)

Functions

func GetStatistics

func GetStatistics(nowSecs int64, j Job) map[string]interface{}

Types

type BuriedJobs

type BuriedJobs JobHeap

BuriedJobs is a JobHeap, with jobs ordered by its BuriedAt field. Lower (earlier) BuriedAt take a higher precedence If two jobs have the same BuriedAt value, the lower job id gets precedence

type ClientID

type ClientID string

type ClientResvEntry

type ClientResvEntry struct {
	// ID of the client making reservations
	CliID ClientID

	// Tubes are the tube names watched by this client
	// for this reservation
	WatchedTubes []TubeName

	// reservation Deadline timestamp
	ResvDeadlineAt int64

	// Indicates if this entry is waiting for a reservation
	IsWaitingForResv bool

	// clock at which the client needs some processing
	// If client IsWaitingForResv is set to true
	// - if there is a job already reserved at the lowest job's
	//   deadline is within a second of now, then send a DEADLINE_SOON
	//   and un-reserve the client
	// - If there are no jobs reserved at the current time is past
	//   the client's reservation ResvDeadlineAt, then un-reserve the client
	// Check to see if any reservations need to be cleaned
	TickAt int64

	// the request ID of the reservation
	ReqID string

	// Index of the client in the client Heap
	HeapIndex int
}

Represents a client reservation as requested by the client

type DelayedJobs

type DelayedJobs JobHeap

DelayedJobs is a JobHeap, with jobs ordered by its ReadyAt field. Lower (earlier) ReadyAt takes a higher precedence.

func NewDelayedJobs

func NewDelayedJobs() DelayedJobs

type IndexEntry

type IndexEntry struct {
	// contains filtered or unexported fields
}

type JSM

type JSM interface {
	// Put makes a new job. It initializes the job with
	// a unique identifier, sets state to Ready or Delayed based
	// on the delay parameter.
	Put(nowSeconds int64,
		priority uint32,
		delay int64,
		ttr int,
		bodySize int,
		body []byte,
		tubeName TubeName) (JobID, error)

	// Delete, removes a job specified by the id by a specific client
	Delete(jobID JobID, clientID ClientID) error

	// PeekDelayedJob returns the job in the Delay state in order of
	// priority for this tube. A job with and earlier (lower) delay
	// takes higher precedence.
	PeekDelayedJob(tubeName TubeName) (Job, error)

	// PeekReadyJob returns the job in the Ready state in order of
	// priority for this tube. A job with a lower priority value
	// takes higher precedence.
	PeekReadyJob(tubeName TubeName) (Job, error)

	// PeekReadyJob returns the job in the Buried state in order of
	// priority for this tube. A job which was buried first takes higher precedence.
	PeekBuriedJob(tubeName TubeName) (Job, error)

	// GetJob returns the job by its id
	GetJob(id JobID) (Job, error)

	// Release transitions this reserved job to a Ready state
	Release(jobID JobID, clientID ClientID) error

	// ReleaseWith transitions this reserved job to a Ready or Delayed state
	// with a modified priority and delay
	ReleaseWith(nowSeconds int64, jobID JobID, clientID ClientID, pri uint32, delay int64) error

	// Extend a reserved job's reservation TTL by its TTR (time-to-run)
	Touch(nowSeconds int64, jobID JobID, clientID ClientID) error

	// Bury this job (from a reserved state)
	Bury(nowSeconds int64, jobID JobID, priority uint32, clientID ClientID) error

	// Kick this job from buried state to a ready state
	Kick(jobID JobID) error

	// Kick atmost n jobs from the specified tube to ready state
	KickN(name TubeName, n int) (int, error)

	// Retrieve the statistics of this job
	// The stats-job data is a YAML byte slice representing a single dictionary of string
	// keys to scalar values. It contains these keys:
	// - "id" is the job id
	// - "tube" is the name of the tube that contains this job
	// - "state" is "ready" or "delayed" or "reserved" or "buried"
	// - "pri" is the priority value set by the put, release, or bury commands.
	// - "age" is the time in seconds since the put command that created this job.
	// - "delay" is the integer number of seconds to wait before putting this job in
	//   the ready queue.
	// - "ttr" -- time to run -- is the integer number of seconds a worker is
	//   allowed to run this job.
	// - "time-left" is the number of seconds left until the server puts this job
	//   into the ready queue. This number is only meaningful if the job is
	//   reserved or delayed. If the job is reserved and this amount of time
	//   elapses before its state changes, it is considered to have timed out.
	// - "file" this will be 0.
	// - "reserves" is the number of times this job has been reserved.
	// - "timeouts" is the number of times this job has timed out during a
	//   reservation.
	// - "releases" is the number of times a client has released this job from a
	//   reservation.
	// - "buries" is the number of times this job has been buried.
	// - "kicks" is the number of times this job has been kicked.
	GetStatsJobAsYaml(nowSeconds int64, id JobID) ([]byte, error)

	// Retrieve tube statistics
	//
	// 	The stats-job data is a YAML byte slice representing a single dictionary of string
	//	keys to scalar values. It contains these keys:
	//  - "name" is the tube's name.
	// - "current-jobs-ready" is the number of jobs in the ready queue in this tube.
	// - "current-jobs-reserved" is the number of jobs reserved by all clients in this tube.
	// - "current-jobs-delayed" is the number of delayed jobs in this tube.
	// - "current-jobs-buried" is the number of buried jobs in this tube.
	// - "current-waiting" is the number of open connections that have issued a
	//   reserve command while watching this tube but not yet received a response.
	//
	// The following are not implemented but have placeholders for backward-compat:
	// - "current-jobs-urgent" always zero
	// - "total-jobs" is always zero
	// - "current-using" is always zero
	// - "current-watching" is the number of open connections that are currently
	//   watching this tube.
	// - "pause" is the number of seconds the tube has been paused for.
	// - "cmd-delete" is the cumulative number of delete commands for this tube
	// - "cmd-pause-tube" is the cumulative number of pause-tube commands for this tube.
	// - "pause-time-left" is the number of seconds until the tube is un-paused.
	GetStatsTubeAsYaml(nowSeconds int64, tubeName TubeName) ([]byte, error)

	// Retrieve overall statistics
	GetStatsAsYaml(nowSeconds int64) ([]byte, error)

	// Returns an interface that allows a caller to snapshot the current
	// state of the JSM. Callers of the interface should not be done across
	// go-routines.
	Snapshot() (JSMSnapshot, error)

	// AppendReservation Appends a new Reservation Request for a client, and the specified set of tubes
	// if the timeoutSecs is zero, then an infinite timeout is assumed.
	//
	// A pointer to a Reservation struct is returned which encapsulates a result if a reservation
	// was handled or not
	AppendReservation(clientId ClientID, reqID string, watchedTubes []TubeName, nowSecs, deadlineAt int64) (*Reservation, error)

	// Tick runs a step of the job state machine with the current time.
	//
	// This call returns all the allocated reservations in this tick call
	Tick(nowSecs int64) ([]*Reservation, error)

	// GetTubes returns the tubeNames of all current tubes
	GetTubes() ([]TubeName, error)

	// CheckClientState queries the job state machine whether the provided list of clientIds are waiting for reservations.
	//
	// The response returns the ClientIDs (i) which are waiting for reservations, (ii) those which are not waiting and (iii)
	// those which are missing or an error.
	CheckClientState(clientIDs []ClientID) ([]ClientID, []ClientID, []ClientID, error)

	// Close or stop this jobstatemachine
	Stop() error
}

JSM provides methods for the beanstalkd job state machine. put with delay release with delay

----------------> [DELAYED] <------------.
                      |                   |  touch (extend ttr)
                      | timeout/ttr       | .----.
                      |                   | |    |
 put                  v     reserve       | |    v   delete
-----------------> [READY] ------------> [RESERVED] --------> *poof*
                     ^  ^                | | |
                     |  ^\  release      | | |
                     |   \ `-------------' | |
                     |    \                | |
                     |     \  timeout/ttr  , |
                     |      `--------------  |
                     |                       |
                     | kick                  |
                     |                       |
                     |       bury            |
                  [BURIED] <-----------------'
                     |
                     |  delete
                      `--------> *poof*

func NewJSM

func NewJSM() (JSM, error)

type JSMSnapshot

type JSMSnapshot interface {
	// SnapshotJobs returns a read-only job channel. This allows a caller
	// to iterate through the jobs sent on the channel, when all the jobs
	// in the state machine are returned, this method closes the channel,
	// signaling the end of this snapshot.
	//
	// SnapshotJobs is used to support log compaction. This call should
	// an be used to save a point-in-time snapshot of the FSM.
	//
	// SnapshotJobs should not be called to the JSM across go-routines,
	// this is the default behavior (unless an implementation forces to override this)
	// A caller is recommended to clone this job
	SnapshotJobs() (<-chan Job, error)

	// SnapshotClients returns a read-only job channel. This allows a caller
	// to iterate through the clientResvEntries sent on the channel, when
	// all the entries in the state machine are returned, this method closes
	// the channel, signaling the end of this snapshot.
	//
	// SnapshotClients is used to support log compaction. This call should
	// an be used to save a point-in-time snapshot of the FSM.
	//
	// SnapshotClients should not be called to the JSM across go-routines,
	// this is the default behavior (unless an implementation forces to override this)
	// A caller is recommended to clone this job
	SnapshotClients() (<-chan *ClientResvEntry, error)

	// RestoreJobs takes jobsCh, a write-only job channel, that allow a caller
	// to send jobs to be added to the job state machine (JSM).
	//
	// Once RestoreJobs and RestoreClients are complete. call FinalizeRestore
	// which replaces the current state of JSM with the jobs provided in the
	// channel.
	//
	// RestoreJobs takes an additional context which can be used to signal
	// a cancellation. In this case, the method discards the jobs that were
	// provided on the channel, after the cancel is called
	//
	// Note: It is the responsibility of the caller to close the channels
	// and drain the jobsCh
	RestoreJobs(ctx context.Context, jobsCh <-chan Job) error

	// RestoreClients takes entriesCh, a write-only job channel, that allow
	// a caller to send clientResvEntry structs to be added to the job state
	// machine (JSM).
	//
	// Once RestoreJobs and RestoreClients are complete. call FinalizeRestore
	// which replaces the current state of JSM with the jobs provided in the
	// channel.
	//
	// RestoreClients takes an additional context which can be used to signal
	// a cancellation. In this case, the method discards the clientResvEntries
	// that were provided on the channel, after the cancel is called
	//
	// Note: It is the responsibility of the caller to close the channels
	// and drain the jobsCh
	RestoreClients(ctx context.Context, nClients int, entriesCh <-chan *ClientResvEntry) error

	// Finalize Restore overwrites the state of the job-state-machine
	// with the current state of the snapshot
	//
	// Once RestoreJobs and RestoreClients are complete. call FinalizeRestore
	// which replaces the current state of JSM with the jobs provided in the
	// channel.
	FinalizeRestore()
}

JSMSnapshot provides methods allowing a caller to read & restore jobs out of the job state machine.

type Job

type Job interface {
	// ID is a unique identifier integer for this job (generated by the server)
	ID() JobID

	// Priority is an integer < 2**32. Jobs with smaller priority values will be
	// scheduled before jobs with larger priorities. The most urgent priority is 0;
	// the least urgent priority is 4,294,967,295.
	Priority() uint32

	// UpdatePriority to a new value. Return back the newly set value
	UpdatePriority(newPriority uint32) uint32

	// Delay is an integer number of seconds to wait before putting the job in
	// the ready queue. The job will be in the "delayed" state during this time.
	// Maximum delay is 2**32-1.
	Delay() int64

	// Update Delay to a new value. Return back the newly set value
	UpdateDelay(newDelay int64) int64

	// TTR/time to run -- is an integer number of seconds to allow a worker
	// to run this job. This time is counted from the moment a worker reserves
	// this job. If the worker does not delete, release, or bury the job within
	// <ttr> seconds, the job will time out and the server will release the job.
	// The minimum ttr is 1. If the client sends 0, the server will silently
	// increase the ttr to 1. Maximum ttr is 2**32-1.
	TTR() int

	// BodySize is an integer indicating the size of the job body, not including the
	// trailing "\r\n". This value must be less than max-job-size (default: 2**16)
	BodySize() int

	// Body is the job body -- a sequence of bytes of length BodySize
	Body() []byte

	// TubeName the name of the tube associated with this job
	TubeName() TubeName

	// CreatedAt - Indicates the time, when this job is created
	CreatedAt() int64

	// ReadyAt - Indicates the time when the job is ready
	ReadyAt() int64

	// Reset the ReadyAt time taking the current time in reference
	// Return back the new readyAt time
	UpdateReadyAt(nowSeconds int64) (int64, error)

	// Retrieve the current job state
	State() JobState

	// Update the job state
	UpdateState(newState JobState)

	// Return the time at which the reservation expires
	ExpiresAt() int64

	// Reset the reservation time taking the current in reference
	// Return back the new reservation time
	UpdateReservation(nowSeconds int64) (int64, error)

	// ReservedBy returns the name of the client which has
	// reserved this job. Empty string if this job is not reserved
	ReservedBy() ClientID

	// Update the reservedBy client
	UpdateReservedBy(clientID ClientID)

	// Returns the time this specific job was buried
	BuriedAt() int64

	// Reset the buriedAt value to zero
	ResetBuriedAt()

	// Update the buriedAt value to the current clock
	// Return back the new BuriedAt time
	UpdateBuriedAt(nowSeconds int64) int64

	ReserveCount() uint32
	IncReserveCount()
	TimeoutCount() uint32
	IncTimeoutCount()
	ReleaseCount() uint32
	IncReleaseCount()
	BuryCount() uint32
	IncBuryCount()
	KickCount() uint32
	IncKickCount()
}

type JobEntry

type JobEntry struct {
	Job
	// contains filtered or unexported fields
}

JobEntry is an entry in the JobHeap

type JobHeap

type JobHeap interface {
	// Enqueue appends an entry to the job heap in priority order
	Enqueue(job Job) *JobEntry

	// Dequeue returns a from the heap in priority order
	Dequeue() Job

	// RemoveAt removes a specific job entry.
	RemoveAt(jobEntry *JobEntry) (*JobEntry, error)

	// Len returns the heap length
	Len() int

	// Peek returns the top element of the heap without dequeuing it
	Peek() *JobEntry

	// Return the number of jobs found in the specific tube
	JobCountByTube(tubename TubeName) uint64
}

JobHeap is a binary heap of jobs

type JobID

type JobID uint64

type JobIndex

type JobIndex struct {
	// contains filtered or unexported fields
}

An index of indexEntry indexed by a job ID

func NewJobIndex

func NewJobIndex() *JobIndex

func (*JobIndex) Add

func (idx *JobIndex) Add(job Job) (*IndexEntry, error)

func (*JobIndex) Get

func (idx *JobIndex) Get(jobID JobID) (*IndexEntry, error)

func (*JobIndex) Jobs

func (idx *JobIndex) Jobs() <-chan Job

Return a read-only channel of jobs

func (*JobIndex) Len

func (idx *JobIndex) Len() int

func (*JobIndex) NextJobID

func (idx *JobIndex) NextJobID() JobID

Return the maximum assigned JobID

func (*JobIndex) Remove

func (idx *JobIndex) Remove(jobID JobID) (*IndexEntry, error)

type JobState

type JobState int
const (
	Initial JobState = iota
	Ready
	Reserved
	Buried
	Delayed
	Deleted
)

func (JobState) String

func (i JobState) String() string

type PriorityJobs

type PriorityJobs JobHeap

PriorityJobs is a JobHeap, with jobs ordered by its Priority. Lower priority values takes a higher precedence.

func NewPriorityJobs

func NewPriorityJobs() PriorityJobs

type Reservation

type Reservation struct {
	RequestId string
	ClientId  ClientID
	JobId     JobID
	Status    ReservationStatus
	BodySize  int
	Body      []byte
	Error     error
}

func (Reservation) String

func (r Reservation) String() string

type ReservationStatus

type ReservationStatus int

go:generate stringer -type=ReservationStatus --output state_string.go

const (
	Unknown ReservationStatus = iota
	Queued
	DeadlineSoon
	Matched
	Timeout
	Error
)

type ReservedJobs

type ReservedJobs JobHeap

ReservedJobs is a JobHeap, with jobs ordered by its ExpiresAt field. Lower (earlier) ExpiresAt take a higher precedence

func NewBuriedJobs

func NewBuriedJobs() ReservedJobs

func NewReservedJobs

func NewReservedJobs() ReservedJobs

type ResultError

type ResultError struct {
	// ID of the request
	RequestID string

	// Identifier for the error code
	ErrorCode int32

	// Error
	Err error
}

func (*ResultError) Error

func (r *ResultError) Error() string

func (*ResultError) Unwrap

func (r *ResultError) Unwrap() error

type TubeName

type TubeName string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL