Documentation
¶
Overview ¶
Package state provides an interface for a beanstalkd job; state for a job, a tube and a connected client; and a job state machine with the states and transitions as defined in the protocol: https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt Implementations include an in-memory implementation of the interfaces.
Index ¶
- Variables
- func GetStatistics(nowSecs int64, j Job) map[string]interface{}
- type BuriedJobs
- type ClientID
- type ClientResvEntry
- type DelayedJobs
- type IndexEntry
- type JSM
- type JSMSnapshot
- type Job
- type JobEntry
- type JobHeap
- type JobID
- type JobIndex
- type JobState
- type PriorityJobs
- type Reservation
- type ReservationStatus
- type ReservedJobs
- type ResultError
- type TubeName
Constants ¶
This section is empty.
Variables ¶
// Sentinel errors returned by the containers and the job state machine in
// this package. Each value is created once with errors.New, so callers can
// compare against it with errors.Is.
var (
	// ErrInvalidIndex - returned when no job entry is found at the index
	ErrInvalidIndex = errors.New("provided index is out of range of the heap")

	// ErrMismatchJobEntry - returned when job entry at index does not match the heap's value
	ErrMismatchJobEntry = errors.New("job entry at index does not match provided entry")

	// ErrEntryExists - returned when an entry already exists in the map/set,
	// to prevent it from being overridden
	ErrEntryExists = errors.New("entry exists in container")

	// ErrEntryMissing - returned when an entry is not found in the container
	ErrEntryMissing = errors.New("entry not found in container")

	// ErrContainerEmpty - returned when the container such as a list/map/slice etc is empty
	ErrContainerEmpty = errors.New("the container is empty")

	// ErrInvalidJobTransition - the current state of the job prevents this transition
	ErrInvalidJobTransition = errors.New("invalid transition")

	// ErrInvalidOperation - the state indicates that this op cannot be done
	ErrInvalidOperation = errors.New("invalid operation due to the current state")

	// ErrUnauthorizedOperation - the client is not authorized to perform this operation
	ErrUnauthorizedOperation = errors.New("client is not authorized to perform this operation")

	// ErrCancel - indicates the request is cancelled
	ErrCancel = errors.New("cancelled")

	// ErrNoReservationFound - indicates that a request for a reservation could not be completed
	ErrNoReservationFound = errors.New("no reservation could be found")

	// ErrInvalidResvTimeout - indicates the provided reservation timeout is invalid
	ErrInvalidResvTimeout = errors.New("the provided reservation timeout is invalid")

	// ErrClientIsWaitingForReservation - indicates the client is waiting for a reservation
	ErrClientIsWaitingForReservation = errors.New("the request client cannot request for another reservation")
)
Functions ¶
func GetStatistics ¶
Types ¶
type BuriedJobs ¶
type BuriedJobs JobHeap
BuriedJobs is a JobHeap, with jobs ordered by their BuriedAt field. A lower (earlier) BuriedAt takes a higher precedence. If two jobs have the same BuriedAt value, the lower job id gets precedence.
type ClientResvEntry ¶
// ClientResvEntry represents a client reservation as requested by the client.
type ClientResvEntry struct {
// CliID is the ID of the client making reservations
CliID ClientID
// WatchedTubes are the tube names watched by this client
// for this reservation
WatchedTubes []TubeName
// ResvDeadlineAt is the reservation deadline timestamp
ResvDeadlineAt int64
// IsWaitingForResv indicates if this entry is waiting for a reservation
IsWaitingForResv bool
// TickAt is the clock at which the client needs some processing.
// If IsWaitingForResv is set to true:
// - if there is a job already reserved and the lowest job's
// deadline is within a second of now, then send a DEADLINE_SOON
// and un-reserve the client
// - if there are no jobs reserved and the current time is past
// the client's reservation ResvDeadlineAt, then un-reserve the client
// Otherwise, check to see if any reservations need to be cleaned.
TickAt int64
// ReqID is the request ID of the reservation
ReqID string
// HeapIndex is the index of the client in the client heap
HeapIndex int
}
Represents a client reservation as requested by the client
type DelayedJobs ¶
type DelayedJobs JobHeap
DelayedJobs is a JobHeap, with jobs ordered by its ReadyAt field. Lower (earlier) ReadyAt takes a higher precedence.
func NewDelayedJobs ¶
func NewDelayedJobs() DelayedJobs
type IndexEntry ¶
// IndexEntry exposes no exported fields.
// NOTE(review): presumably the entry type held by JobIndex - confirm against the source.
type IndexEntry struct {
// contains filtered or unexported fields
}
type JSM ¶
// JSM provides methods for the beanstalkd job state machine.
type JSM interface {
// Put makes a new job. It initializes the job with
// a unique identifier, and sets its state to Ready or Delayed based
// on the delay parameter.
Put(nowSeconds int64,
priority uint32,
delay int64,
ttr int,
bodySize int,
body []byte,
tubeName TubeName) (JobID, error)
// Delete removes the job specified by the id, on behalf of a specific client
Delete(jobID JobID, clientID ClientID) error
// PeekDelayedJob returns the job in the Delayed state in order of
// priority for this tube. A job with an earlier (lower) delay
// takes higher precedence.
PeekDelayedJob(tubeName TubeName) (Job, error)
// PeekReadyJob returns the job in the Ready state in order of
// priority for this tube. A job with a lower priority value
// takes higher precedence.
PeekReadyJob(tubeName TubeName) (Job, error)
// PeekBuriedJob returns the job in the Buried state in order of
// priority for this tube. A job which was buried first takes higher precedence.
PeekBuriedJob(tubeName TubeName) (Job, error)
// GetJob returns the job by its id
GetJob(id JobID) (Job, error)
// Release transitions this reserved job to a Ready state
Release(jobID JobID, clientID ClientID) error
// ReleaseWith transitions this reserved job to a Ready or Delayed state
// with a modified priority and delay
ReleaseWith(nowSeconds int64, jobID JobID, clientID ClientID, pri uint32, delay int64) error
// Touch extends a reserved job's reservation TTL by its TTR (time-to-run)
Touch(nowSeconds int64, jobID JobID, clientID ClientID) error
// Bury this job (from a reserved state)
Bury(nowSeconds int64, jobID JobID, priority uint32, clientID ClientID) error
// Kick this job from buried state to a ready state
Kick(jobID JobID) error
// KickN kicks at most n jobs from the specified tube to ready state
KickN(name TubeName, n int) (int, error)
// GetStatsJobAsYaml retrieves the statistics of this job.
// The stats-job data is a YAML byte slice representing a single dictionary of string
// keys to scalar values. It contains these keys:
// - "id" is the job id
// - "tube" is the name of the tube that contains this job
// - "state" is "ready" or "delayed" or "reserved" or "buried"
// - "pri" is the priority value set by the put, release, or bury commands.
// - "age" is the time in seconds since the put command that created this job.
// - "delay" is the integer number of seconds to wait before putting this job in
// the ready queue.
// - "ttr" -- time to run -- is the integer number of seconds a worker is
// allowed to run this job.
// - "time-left" is the number of seconds left until the server puts this job
// into the ready queue. This number is only meaningful if the job is
// reserved or delayed. If the job is reserved and this amount of time
// elapses before its state changes, it is considered to have timed out.
// - "file" this will be 0.
// - "reserves" is the number of times this job has been reserved.
// - "timeouts" is the number of times this job has timed out during a
// reservation.
// - "releases" is the number of times a client has released this job from a
// reservation.
// - "buries" is the number of times this job has been buried.
// - "kicks" is the number of times this job has been kicked.
GetStatsJobAsYaml(nowSeconds int64, id JobID) ([]byte, error)
// GetStatsTubeAsYaml retrieves tube statistics.
//
// The stats-tube data is a YAML byte slice representing a single dictionary of string
// keys to scalar values. It contains these keys:
// - "name" is the tube's name.
// - "current-jobs-ready" is the number of jobs in the ready queue in this tube.
// - "current-jobs-reserved" is the number of jobs reserved by all clients in this tube.
// - "current-jobs-delayed" is the number of delayed jobs in this tube.
// - "current-jobs-buried" is the number of buried jobs in this tube.
// - "current-waiting" is the number of open connections that have issued a
// reserve command while watching this tube but not yet received a response.
//
// The following are not implemented but have placeholders for backward-compat:
// - "current-jobs-urgent" always zero
// - "total-jobs" is always zero
// - "current-using" is always zero
// - "current-watching" is the number of open connections that are currently
// watching this tube.
// - "pause" is the number of seconds the tube has been paused for.
// - "cmd-delete" is the cumulative number of delete commands for this tube
// - "cmd-pause-tube" is the cumulative number of pause-tube commands for this tube.
// - "pause-time-left" is the number of seconds until the tube is un-paused.
GetStatsTubeAsYaml(nowSeconds int64, tubeName TubeName) ([]byte, error)
// GetStatsAsYaml retrieves overall statistics
GetStatsAsYaml(nowSeconds int64) ([]byte, error)
// Snapshot returns an interface that allows a caller to snapshot the current
// state of the JSM. The returned interface should not be used across
// go-routines.
Snapshot() (JSMSnapshot, error)
// AppendReservation appends a new Reservation request for a client, and the specified set of tubes.
// If the timeoutSecs is zero, then an infinite timeout is assumed.
// NOTE(review): "timeoutSecs" is not a parameter of this method; the sentence
// above presumably refers to deadlineAt - confirm against the implementation.
//
// A pointer to a Reservation struct is returned which encapsulates a result if a reservation
// was handled or not
AppendReservation(clientId ClientID, reqID string, watchedTubes []TubeName, nowSecs, deadlineAt int64) (*Reservation, error)
// Tick runs a step of the job state machine with the current time.
//
// This call returns all the allocated reservations in this tick call
Tick(nowSecs int64) ([]*Reservation, error)
// GetTubes returns the tubeNames of all current tubes
GetTubes() ([]TubeName, error)
// CheckClientState queries the job state machine whether the provided list of clientIds are waiting for reservations.
//
// The response returns the ClientIDs (i) which are waiting for reservations, (ii) those which are not waiting and (iii)
// those which are missing; or an error.
CheckClientState(clientIDs []ClientID) ([]ClientID, []ClientID, []ClientID, error)
// Stop closes or stops this job state machine
Stop() error
}
JSM provides methods for the beanstalkd job state machine.

 put with delay               release with delay
----------------> [DELAYED] <------------.
                      |                  |  touch (extend ttr)
                      | timeout/ttr      |  .----.
                      |                  |  |    |
 put                  v      reserve     |  |    v    delete
-----------------> [READY] ------------> [RESERVED] --------> *poof*
                    ^  ^                 |  |  |
                    |  ^\      release   |  |  |
                    |    \ `-------------'  |  |
                    |     \                 |  |
                    |      \  timeout/ttr   ,  |
                    |       `--------------    |
                    |                          |
                    | kick                     |
                    |                          |
                    |           bury           |
                 [BURIED]    <-----------------'
                    |
                    | delete
                     `--------> *poof*
type JSMSnapshot ¶
// JSMSnapshot provides methods allowing a caller to read & restore jobs
// and client reservation entries out of the job state machine.
type JSMSnapshot interface {
// SnapshotJobs returns a read-only job channel. This allows a caller
// to iterate through the jobs sent on the channel. When all the jobs
// in the state machine are returned, this method closes the channel,
// signaling the end of this snapshot.
//
// SnapshotJobs is used to support log compaction. This call should
// be used to save a point-in-time snapshot of the FSM.
//
// SnapshotJobs should not be called to the JSM across go-routines;
// this is the default behavior (unless an implementation forces to override this).
// A caller is recommended to clone each job.
SnapshotJobs() (<-chan Job, error)
// SnapshotClients returns a read-only channel of ClientResvEntry pointers.
// This allows a caller to iterate through the clientResvEntries sent on
// the channel. When all the entries in the state machine are returned,
// this method closes the channel, signaling the end of this snapshot.
//
// SnapshotClients is used to support log compaction. This call should
// be used to save a point-in-time snapshot of the FSM.
//
// SnapshotClients should not be called to the JSM across go-routines;
// this is the default behavior (unless an implementation forces to override this).
// A caller is recommended to clone each entry.
SnapshotClients() (<-chan *ClientResvEntry, error)
// RestoreJobs takes jobsCh, a channel on which the caller sends the jobs
// to be added to the job state machine (JSM); this method only receives
// from it.
//
// Once RestoreJobs and RestoreClients are complete, call FinalizeRestore,
// which replaces the current state of JSM with the jobs provided in the
// channel.
//
// RestoreJobs takes an additional context which can be used to signal
// a cancellation. In this case, the method discards the jobs that were
// provided on the channel, after the cancel is called.
//
// Note: It is the responsibility of the caller to close the channels
// and drain the jobsCh
RestoreJobs(ctx context.Context, jobsCh <-chan Job) error
// RestoreClients takes entriesCh, a channel on which the caller sends
// the clientResvEntry structs to be added to the job state machine (JSM);
// this method only receives from it.
// NOTE(review): nClients is presumably the number of entries the caller
// will send on entriesCh - confirm against the implementation.
//
// Once RestoreJobs and RestoreClients are complete, call FinalizeRestore,
// which replaces the current state of JSM with the jobs provided in the
// channel.
//
// RestoreClients takes an additional context which can be used to signal
// a cancellation. In this case, the method discards the clientResvEntries
// that were provided on the channel, after the cancel is called.
//
// Note: It is the responsibility of the caller to close the channels
// and drain the entriesCh
RestoreClients(ctx context.Context, nClients int, entriesCh <-chan *ClientResvEntry) error
// FinalizeRestore overwrites the state of the job-state-machine
// with the current state of the snapshot.
//
// Once RestoreJobs and RestoreClients are complete, call FinalizeRestore,
// which replaces the current state of JSM with the jobs provided in the
// channel.
FinalizeRestore()
}
JSMSnapshot provides methods allowing a caller to read & restore jobs out of the job state machine.
type Job ¶
// Job is the interface for a beanstalkd job: its identity, scheduling
// attributes, body, current state, reservation details and counters.
type Job interface {
// ID is a unique identifier integer for this job (generated by the server)
ID() JobID
// Priority is an integer < 2**32. Jobs with smaller priority values will be
// scheduled before jobs with larger priorities. The most urgent priority is 0;
// the least urgent priority is 4,294,967,295.
Priority() uint32
// UpdatePriority to a new value. Return back the newly set value
UpdatePriority(newPriority uint32) uint32
// Delay is an integer number of seconds to wait before putting the job in
// the ready queue. The job will be in the "delayed" state during this time.
// Maximum delay is 2**32-1.
Delay() int64
// UpdateDelay to a new value. Return back the newly set value
UpdateDelay(newDelay int64) int64
// TTR/time to run -- is an integer number of seconds to allow a worker
// to run this job. This time is counted from the moment a worker reserves
// this job. If the worker does not delete, release, or bury the job within
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
TTR() int
// BodySize is an integer indicating the size of the job body, not including the
// trailing "\r\n". This value must be less than max-job-size (default: 2**16)
BodySize() int
// Body is the job body -- a sequence of bytes of length BodySize
Body() []byte
// TubeName the name of the tube associated with this job
TubeName() TubeName
// CreatedAt - Indicates the time, when this job is created
CreatedAt() int64
// ReadyAt - Indicates the time when the job is ready
ReadyAt() int64
// UpdateReadyAt resets the ReadyAt time taking the current time as reference.
// Return back the new readyAt time
UpdateReadyAt(nowSeconds int64) (int64, error)
// State retrieves the current job state
State() JobState
// UpdateState updates the job state
UpdateState(newState JobState)
// ExpiresAt returns the time at which the reservation expires
ExpiresAt() int64
// UpdateReservation resets the reservation time taking the current time as reference.
// Return back the new reservation time
UpdateReservation(nowSeconds int64) (int64, error)
// ReservedBy returns the name of the client which has
// reserved this job. Empty string if this job is not reserved
ReservedBy() ClientID
// UpdateReservedBy updates the reservedBy client
UpdateReservedBy(clientID ClientID)
// BuriedAt returns the time this specific job was buried
BuriedAt() int64
// ResetBuriedAt resets the buriedAt value to zero
ResetBuriedAt()
// UpdateBuriedAt updates the buriedAt value to the current clock.
// Return back the new BuriedAt time
UpdateBuriedAt(nowSeconds int64) int64
// The counters below are undocumented in the original.
// NOTE(review): presumably they back the "reserves", "timeouts", "releases",
// "buries" and "kicks" job statistics, with each Inc* method adding one to
// the matching counter - confirm against the implementation.

// ReserveCount returns this job's reserve counter
ReserveCount() uint32
// IncReserveCount updates this job's reserve counter
IncReserveCount()
// TimeoutCount returns this job's timeout counter
TimeoutCount() uint32
// IncTimeoutCount updates this job's timeout counter
IncTimeoutCount()
// ReleaseCount returns this job's release counter
ReleaseCount() uint32
// IncReleaseCount updates this job's release counter
IncReleaseCount()
// BuryCount returns this job's bury counter
BuryCount() uint32
// IncBuryCount updates this job's bury counter
IncBuryCount()
// KickCount returns this job's kick counter
KickCount() uint32
// IncKickCount updates this job's kick counter
IncKickCount()
}
type JobEntry ¶
// JobEntry is an entry in the JobHeap. It embeds Job, so the methods of
// the Job interface are promoted onto the entry.
type JobEntry struct {
Job
// contains filtered or unexported fields
}
JobEntry is an entry in the JobHeap
type JobHeap ¶
// JobHeap is a binary heap of jobs.
type JobHeap interface {
// Enqueue appends an entry to the job heap in priority order
Enqueue(job Job) *JobEntry
// Dequeue returns a job from the heap in priority order
Dequeue() Job
// RemoveAt removes a specific job entry.
RemoveAt(jobEntry *JobEntry) (*JobEntry, error)
// Len returns the heap length
Len() int
// Peek returns the top element of the heap without dequeuing it
Peek() *JobEntry
// JobCountByTube returns the number of jobs found in the specific tube
JobCountByTube(tubename TubeName) uint64
}
JobHeap is a binary heap of jobs
type JobIndex ¶
// JobIndex is an index of entries keyed by a job ID. It has no exported
// fields; NewJobIndex returns a *JobIndex.
type JobIndex struct {
// contains filtered or unexported fields
}
JobIndex is an index of IndexEntry values, indexed by a job ID.
func NewJobIndex ¶
func NewJobIndex() *JobIndex
type PriorityJobs ¶
type PriorityJobs JobHeap
PriorityJobs is a JobHeap, with jobs ordered by their Priority. Lower priority values take a higher precedence.
func NewPriorityJobs ¶
func NewPriorityJobs() PriorityJobs
type Reservation ¶
// Reservation encapsulates the result of a reservation request; pointers to
// it are returned by JSM.AppendReservation and JSM.Tick.
// NOTE(review): the field descriptions below are inferred from the field
// names and types only - confirm against the implementation.
type Reservation struct {
// RequestId - presumably the ID of the reservation request (cf. ClientResvEntry.ReqID)
RequestId string
// ClientId - presumably the client the reservation belongs to
ClientId ClientID
// JobId - presumably the job matched to this reservation, if any
JobId JobID
// Status is the ReservationStatus of this reservation
Status ReservationStatus
// BodySize - presumably the size of Body
BodySize int
// Body - presumably the body of the matched job
Body []byte
// Error - presumably the error associated with this reservation, if any
Error error
}
func (Reservation) String ¶
func (r Reservation) String() string
type ReservationStatus ¶
// ReservationStatus is the status carried by a Reservation (see
// Reservation.Status).
//
// The rendered documentation for this type carries the following text:
//
//	go:generate stringer -type=ReservationStatus --output state_string.go
//
// NOTE(review): because it shows up as documentation text, it was presumably
// written as "// go:generate" (with a space), which `go generate` does not
// treat as a directive - confirm against the source before converting it to
// a "//go:generate" line.
type ReservationStatus int

// Values of ReservationStatus, numbered from zero in declaration order.
const (
	// Unknown is the zero value of ReservationStatus (0).
	Unknown ReservationStatus = iota
	// Queued has the value 1.
	Queued
	// DeadlineSoon has the value 2.
	DeadlineSoon
	// Matched has the value 3.
	Matched
	// Timeout has the value 4.
	Timeout
	// Error has the value 5.
	Error
)
type ReservedJobs ¶
type ReservedJobs JobHeap
ReservedJobs is a JobHeap, with jobs ordered by their ExpiresAt field. A lower (earlier) ExpiresAt takes a higher precedence.
func NewBuriedJobs ¶
func NewBuriedJobs() ReservedJobs
func NewReservedJobs ¶
func NewReservedJobs() ReservedJobs
type ResultError ¶
// ResultError is an error type that carries the ID of a request and an
// error code alongside an underlying error. *ResultError has Error and
// Unwrap methods.
type ResultError struct {
// RequestID is the ID of the request
RequestID string
// ErrorCode is the identifier for the error code
ErrorCode int32
// Err is the underlying error
Err error
}
func (*ResultError) Error ¶
func (r *ResultError) Error() string
func (*ResultError) Unwrap ¶
func (r *ResultError) Unwrap() error