Documentation
¶
Overview ¶
Package chain implements a job chain. It provides the ability to traverse a chain and run all of the jobs in it.
Index ¶
- Variables
- func NewMemoryRepo() *memoryRepo
- func NewTraverser(cfg TraverserConfig) *traverser
- func Validate(jobChain proto.JobChain, new bool) error
- type Chain
- func (c *Chain) CanRetrySequence(jobId string) bool
- func (c *Chain) FailedJobs() uint
- func (c *Chain) FinishedJobs() uint
- func (c *Chain) IncrementFinishedJobs(delta int)
- func (c *Chain) IncrementJobTries(jobId string, delta int)
- func (c *Chain) IncrementSequenceTries(jobId string, delta int)
- func (c *Chain) IsDoneRunning() (done bool, complete bool)
- func (c *Chain) IsRunnable(jobId string) bool
- func (c *Chain) IsSequenceStartJob(jobId string) bool
- func (c *Chain) JobState(jobId string) byte
- func (c *Chain) JobTries(jobId string) (cur uint, total uint)
- func (c *Chain) NextJobs(jobId string) proto.Jobs
- func (c *Chain) RequestId() string
- func (c *Chain) RunnableJobs() proto.Jobs
- func (c *Chain) SequenceStartJob(jobId string) proto.Job
- func (c *Chain) SequenceTries(jobId string) uint
- func (c *Chain) SetJobState(jobId string, state byte)
- func (c *Chain) SetState(state byte)
- func (c *Chain) State() byte
- func (c *Chain) ToSuspended() proto.SuspendedJobChain
- type ChainReaperFactory
- type ErrInvalidChain
- type JobReaper
- type ReaperFactory
- type Repo
- type RunningChainReaper
- type StoppedChainReaper
- type SuspendedChainReaper
- type Traverser
- type TraverserConfig
- type TraverserFactory
Constants ¶
This section is empty.
Variables ¶
var ( ErrNotFound = errors.New("chain not found in repo") ErrConflict = errors.New("chain already exists in repo") ErrMultipleDeleted = errors.New("multiple chains deleted") )
var ( // Returned when Stop is called but the chain has already been suspended. ErrShuttingDown = fmt.Errorf("chain not stopped because traverser is shutting down") )
Functions ¶
func NewMemoryRepo ¶
func NewMemoryRepo() *memoryRepo
NewMemoryRepo returns a repo that is backed by a thread-safe map in memory.
func NewTraverser ¶
func NewTraverser(cfg TraverserConfig) *traverser
Types ¶
type Chain ¶
type Chain struct {
// contains filtered or unexported fields
}
chain represents a job chain and some meta information about it.
func NewChain ¶
func NewChain(jc *proto.JobChain, sequenceTries map[string]uint, totalJobTries map[string]uint, latestRunJobTries map[string]uint) *Chain
NewChain takes a JobChain proto and maps of sequence + jobs tries, and turns them into a Chain that the JR can use.
func (*Chain) CanRetrySequence ¶
func (*Chain) FailedJobs ¶
FailedJobs returns the number of failed jobs. This is used by reapers to determine if a chain failed, or if it can be finalized as stopped or suspended.
func (*Chain) FinishedJobs ¶
func (*Chain) IncrementFinishedJobs ¶
IncrementFinishedJobs increments the finished jobs count by delta. Negative delta is given on sequence retry.
func (*Chain) IncrementJobTries ¶
func (*Chain) IncrementSequenceTries ¶
func (*Chain) IsDoneRunning ¶
IsDoneRunning returns two booleans: done indicates if there are running or runnable jobs, and complete indicates if all jobs finished successfully (STATE_COMPLETE).
A chain is complete iff every job finished successfully (STATE_COMPLETE).
A chain is done running if there are no running or runnable jobs. The reaper waits for running jobs to reap them. Reapers roll back failed jobs if the sequence can be retried. Consequently, failed jobs do not mean the chain is done, and they do not immediately fail the whole chain.
Stopped jobs are not runnable in this context (i.e. chain context). This function applies to the current chain run. Once a job is stopped, it cannot be re-run in the current chain run. If the chain is re-run (i.e. resumed), IsRunnable will return true for stopped jobs because stopped jobs are runnable in that context (i.e. job context).
For chain A -> B -> C, if B is stopped, C is not runnable; the chain is done. But add job D off A (A -> D) and although B is stopped, if D is pending then the chain is not done. This is a side-effect of not stopping/failing the whole chain when a job stops/fails. Instead, the chain continues to run independent sequences.
func (*Chain) IsRunnable ¶
IsRunnable returns true if the job is runnable. A job is runnable iff its state is PENDING and all immediately previous jobs are state COMPLETE.
func (*Chain) IsSequenceStartJob ¶
func (*Chain) RunnableJobs ¶
RunnableJobs returns a list of all jobs that are runnable. A job is runnable iff its state is PENDING and all immediately previous jobs are state COMPLETE.
func (*Chain) SequenceTries ¶
func (*Chain) SetJobState ¶
Set the state of a job in the chain.
func (*Chain) ToSuspended ¶
func (c *Chain) ToSuspended() proto.SuspendedJobChain
type ChainReaperFactory ¶
type ChainReaperFactory struct { Chain *Chain ChainRepo Repo Logger *log.Entry RMClient rm.Client RMCTries int // times to try sending info to RM RMCRetryWait time.Duration // time to wait between tries to send info to RM DoneJobChan chan proto.Job // chan jobs are reaped from RunJobChan chan proto.Job // (running reaper) chan jobs to run are sent to RunnerRepo runner.Repo // (stopped + suspended reapers) repo of job runners }
Implements ReaperFactory, creating 3 types of reapers - for a normally running chain, a stopped chain, or a suspended chain.
func (*ChainReaperFactory) MakeRunning ¶
func (f *ChainReaperFactory) MakeRunning() JobReaper
Make a JobReaper for use on a running job chain.
func (*ChainReaperFactory) MakeStopped ¶
func (f *ChainReaperFactory) MakeStopped() JobReaper
Make a JobReaper for use on a job chain being stopped.
func (*ChainReaperFactory) MakeSuspended ¶
func (f *ChainReaperFactory) MakeSuspended() JobReaper
Make a JobReaper for use on a job chain being suspended.
type ErrInvalidChain ¶
type ErrInvalidChain struct {
Message string
}
ErrInvalidChain is the error returned when a chain is not valid.
func (ErrInvalidChain) Error ¶
func (e ErrInvalidChain) Error() string
type JobReaper ¶
type JobReaper interface { // Run reaps done jobs from doneJobChan, saving their states and enqueing // any jobs that should be run to runJobChan. When there are no more jobs to // reap, Run finalizes the chain and returns. Run() // Stop stops the JobReaper from reaping any more jobs. It blocks until // Run() returns and the reaper can be safely switched out for another // implementation. Stop() }
A JobReaper handles jobs and chains that have finished running.
The chain's current state (running as normal, stopped, or suspended) influences how jobs are handled once they finish running, and how the chain is handled once there are no more jobs to run. There are different implementations of the JobReaper for each of these cases - a running, stopped, or suspended chain.
type ReaperFactory ¶
type ReaperFactory interface { MakeRunning() JobReaper MakeSuspended() JobReaper MakeStopped() JobReaper }
A ReaperFactory makes new JobReapers.
type Repo ¶
type Repo interface { Get(string) (*Chain, error) Add(*Chain) error Set(*Chain) error Remove(string) error GetAll() ([]*Chain, error) }
Repo stores and provides thread-safe access to job chains.
type RunningChainReaper ¶
type RunningChainReaper struct {
// contains filtered or unexported fields
}
Job Reaper for running chains.
func (*RunningChainReaper) Finalize ¶
func (r *RunningChainReaper) Finalize(complete bool)
Finalize determines the final state of the chain and sends it to the Request Manager.
func (*RunningChainReaper) Reap ¶
func (r *RunningChainReaper) Reap(job proto.Job)
reap takes a job that just finished running, saves its final state, and prepares to continue running the chain (or recognizes that the chain is done running).
If chain is done: save final state + stop running more jobs. If job failed: retry sequence if possible. If job completed: prepared subsequent jobs and enqueue if runnable.
func (*RunningChainReaper) Run ¶
func (r *RunningChainReaper) Run()
Run reaps jobs when they finish running. For each job reaped, if... - chain is done: save final state + send to RM. - job failed: retry sequence if possible. - job completed: prepared subsequent jobs and enqueue if runnable.
func (*RunningChainReaper) Stop ¶
func (r *RunningChainReaper) Stop()
Stop stops the reaper from reaping any more jobs. It blocks until the reaper is stopped (will reap no more jobs and Run will return).
type StoppedChainReaper ¶
type StoppedChainReaper struct {
// contains filtered or unexported fields
}
Job Reaper for chains that are being stopped.
func (*StoppedChainReaper) Finalize ¶
func (r *StoppedChainReaper) Finalize()
Finalize determines the final state of the chain and sends it to the Request Manager.
func (*StoppedChainReaper) Reap ¶
func (r *StoppedChainReaper) Reap(job proto.Job)
reap takes a done job and saves its state.
func (*StoppedChainReaper) Run ¶
func (r *StoppedChainReaper) Run()
Run reaps jobs when they finish running. For each job reaped, its state is saved.
func (*StoppedChainReaper) Stop ¶
func (r *StoppedChainReaper) Stop()
Stop stops the reaper from reaping any more jobs. It blocks until the reaper is stopped (will reap no more jobs and Run will return).
type SuspendedChainReaper ¶
type SuspendedChainReaper struct {
// contains filtered or unexported fields
}
Job Reaper for chains that are being suspended (stopped to be resumed later).
func (*SuspendedChainReaper) Finalize ¶
func (r *SuspendedChainReaper) Finalize()
Finalize checks if the chain is done running or needs to be resumed later and either sends the Request Manager the chain's final state or a SuspendedJobChain that can be used to resume running the chain.
func (*SuspendedChainReaper) Reap ¶
func (r *SuspendedChainReaper) Reap(job proto.Job)
reap takes a done job, saves its state, and prepares the chain to be resumed at a later time.
If job is... Completed: prepare subsequent jobs (copy jobData). Failed: prepare a sequence retry. Stopped: nothing (job will be retried when chain is resumed).
func (*SuspendedChainReaper) Run ¶
func (r *SuspendedChainReaper) Run()
Run reaps jobs when they finish running. For each job reaped, if it's... - completed: prepare subsequent jobs (copy jobData). - failed: prepare a sequence retry. - stopped: do nothing (job will be retried when chain is resumed).
func (*SuspendedChainReaper) Stop ¶
func (r *SuspendedChainReaper) Stop()
Stop stops the reaper from reaping any more jobs. It blocks until the reaper is stopped (will reap no more jobs and Run will return).
type Traverser ¶
type Traverser interface { // Run traverses a job chain and runs all of the jobs in it. It starts by // running the first job in the chain, and then, if the job completed, // successfully, running its adjacent jobs. This process continues until there // are no more jobs to run, or until the Stop method is called on the traverser. Run() // Stop makes a traverser stop traversing its job chain. It also sends a stop // signal to all of the jobs that a traverser is running. // // It returns an error if it fails to stop all running jobs. Stop() error // Running returns all currently running jobs. The status.Manager uses this // to report running status. Running() []proto.JobStatus }
A Traverser provides the ability to run a job chain while respecting the dependencies between the jobs.
type TraverserConfig ¶
type TraverserFactory ¶
type TraverserFactory interface { Make(*proto.JobChain) (Traverser, error) MakeFromSJC(*proto.SuspendedJobChain) (Traverser, error) }
A TraverserFactory makes a new Traverser.