chain

package
v2.1.0
Warning

This package is not in the latest version of its module.

Published: May 12, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package chain implements a job chain. It provides the ability to traverse a chain and run all of the jobs in it.

Index

Constants

This section is empty.

Variables

var (
	ErrNotFound        = errors.New("chain not found in repo")
	ErrConflict        = errors.New("chain already exists in repo")
	ErrMultipleDeleted = errors.New("multiple chains deleted")
)
var (
	// Returned when Stop is called but the chain has already been suspended.
	ErrShuttingDown = fmt.Errorf("chain not stopped because traverser is shutting down")
)

Functions

func NewMemoryRepo

func NewMemoryRepo() *memoryRepo

NewMemoryRepo returns a repo that is backed by a thread-safe map in memory.
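A minimal sketch of a map-backed, mutex-guarded repo is given below to illustrate the idea. It is not the package's memoryRepo: the chain and mapRepo types are hypothetical stand-ins, and the choice of which method returns ErrConflict or ErrNotFound is an assumption, since the documentation does not state it.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// These messages mirror the package's exported error variables.
var (
	ErrNotFound = errors.New("chain not found in repo")
	ErrConflict = errors.New("chain already exists in repo")
)

// chain is a hypothetical stand-in for *chain.Chain; only the request id is modeled.
type chain struct{ requestId string }

// mapRepo is an illustrative repo guarded by a RWMutex (not the real memoryRepo).
type mapRepo struct {
	mu     sync.RWMutex
	chains map[string]*chain
}

func newMapRepo() *mapRepo { return &mapRepo{chains: map[string]*chain{}} }

// Add stores a new chain. Returning ErrConflict for a duplicate id is an assumption.
func (r *mapRepo) Add(c *chain) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.chains[c.requestId]; ok {
		return ErrConflict
	}
	r.chains[c.requestId] = c
	return nil
}

// Get returns the chain for id. Returning ErrNotFound for a missing id is an assumption.
func (r *mapRepo) Get(id string) (*chain, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	c, ok := r.chains[id]
	if !ok {
		return nil, ErrNotFound
	}
	return c, nil
}

// Remove deletes the chain for id, or reports ErrNotFound if it is absent.
func (r *mapRepo) Remove(id string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.chains[id]; !ok {
		return ErrNotFound
	}
	delete(r.chains, id)
	return nil
}

func main() {
	repo := newMapRepo()
	fmt.Println(repo.Add(&chain{requestId: "req1"}))
	fmt.Println(repo.Add(&chain{requestId: "req1"}))
	_, err := repo.Get("missing")
	fmt.Println(errors.Is(err, ErrNotFound))
}
```

A read-write mutex lets concurrent Get calls proceed together while Add and Remove take exclusive access.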

func NewTraverser

func NewTraverser(cfg TraverserConfig) *traverser

func Validate

func Validate(jobChain proto.JobChain, new bool) error

Validate checks if a job chain is valid and returns an error if it is not. new indicates whether the job chain is new (true) or suspended (false). New job chains can have only PENDING jobs, but suspended job chains can have PENDING or STOPPED jobs.
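The state rule above can be sketched as follows. This covers only the PENDING/STOPPED rule, not whatever else Validate may check. The job type, the state constants and their values, and the checkStates helper are all hypothetical; the real states are defined in the proto package.

```go
package main

import "fmt"

// Hypothetical job states; the real values live in the proto package.
const (
	statePending byte = iota + 1
	stateStopped
	stateComplete
)

// job is a hypothetical stand-in for proto.Job.
type job struct {
	id    string
	state byte
}

// checkStates applies only the state rule described for Validate:
// new chains may contain only PENDING jobs, while suspended chains
// may contain PENDING or STOPPED jobs.
func checkStates(jobs []job, isNew bool) error {
	for _, j := range jobs {
		switch {
		case j.state == statePending:
			// always allowed
		case j.state == stateStopped && !isNew:
			// allowed only in a suspended chain
		default:
			return fmt.Errorf("job %s has invalid state %d", j.id, j.state)
		}
	}
	return nil
}

func main() {
	jobs := []job{{id: "a", state: statePending}, {id: "b", state: stateStopped}}
	fmt.Println(checkStates(jobs, true))  // new chain: the stopped job is rejected
	fmt.Println(checkStates(jobs, false)) // suspended chain: accepted
}
```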

Types

type Chain

type Chain struct {
	// contains filtered or unexported fields
}

Chain represents a job chain and some meta information about it.

func NewChain

func NewChain(jc *proto.JobChain, sequenceTries map[string]uint, totalJobTries map[string]uint, latestRunJobTries map[string]uint) *Chain

NewChain takes a JobChain proto and maps of sequence tries and job tries, and turns them into a Chain that the Job Runner (JR) can use.

func (*Chain) CanRetrySequence

func (c *Chain) CanRetrySequence(jobId string) bool

func (*Chain) FailedJobs

func (c *Chain) FailedJobs() uint

FailedJobs returns the number of failed jobs. This is used by reapers to determine if a chain failed, or if it can be finalized as stopped or suspended.

func (*Chain) FinishedJobs

func (c *Chain) FinishedJobs() uint

func (*Chain) IncrementFinishedJobs

func (c *Chain) IncrementFinishedJobs(delta int)

IncrementFinishedJobs increments the finished jobs count by delta. Negative delta is given on sequence retry.
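The count is unsigned while delta is signed, so a negative delta has to be applied as a subtraction. A minimal sketch of that arithmetic follows. The counter type is hypothetical, and clamping at zero is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// counter is a hypothetical stand-in for the chain's finished-jobs count.
type counter struct{ finished uint }

// increment applies a signed delta to the unsigned count.
// Clamping at zero is an assumption, not documented behavior.
func (c *counter) increment(delta int) {
	if delta >= 0 {
		c.finished += uint(delta)
		return
	}
	dec := uint(-delta)
	if dec > c.finished {
		c.finished = 0
		return
	}
	c.finished -= dec
}

func main() {
	c := &counter{}
	c.increment(3)  // three jobs in a sequence finish
	c.increment(-3) // the sequence is retried, so those jobs no longer count
	c.increment(1)
	fmt.Println(c.finished)
}
```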

func (*Chain) IncrementJobTries

func (c *Chain) IncrementJobTries(jobId string, delta int)

func (*Chain) IncrementSequenceTries

func (c *Chain) IncrementSequenceTries(jobId string, delta int)

func (*Chain) IsDoneRunning

func (c *Chain) IsDoneRunning() (done bool, complete bool)

IsDoneRunning returns two booleans: done is true if there are no running or runnable jobs, and complete is true if all jobs finished successfully (STATE_COMPLETE).

A chain is complete iff every job finished successfully (STATE_COMPLETE).

A chain is done running if there are no running or runnable jobs. The reaper waits for running jobs to reap them. Reapers roll back failed jobs if the sequence can be retried. Consequently, failed jobs do not mean the chain is done, and they do not immediately fail the whole chain.

Stopped jobs are not runnable in this context (i.e. chain context). This function applies to the current chain run. Once a job is stopped, it cannot be re-run in the current chain run. If the chain is re-run (i.e. resumed), IsRunnable will return true for stopped jobs because stopped jobs are runnable in that context (i.e. job context).

For chain A -> B -> C, if B is stopped, C is not runnable; the chain is done. But add job D off A (A -> D) and although B is stopped, if D is pending then the chain is not done. This is a side-effect of not stopping/failing the whole chain when a job stops/fails. Instead, the chain continues to run independent sequences.
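The A -> B -> C example can be worked through with a small model. This is a sketch under stated assumptions: the chain type, its fields, and the state constants are hypothetical, and sequence retries by reapers are not modeled.

```go
package main

import "fmt"

// Hypothetical job states; the real values live in the proto package.
const (
	pending byte = iota + 1
	running
	stopped
	complete
)

// chain is a hypothetical model: job states plus each job's immediately previous jobs.
type chain struct {
	state map[string]byte
	prev  map[string][]string
}

// isRunnable reports whether a job is PENDING and all of its previous jobs are COMPLETE.
func (c *chain) isRunnable(id string) bool {
	if c.state[id] != pending {
		return false
	}
	for _, p := range c.prev[id] {
		if c.state[p] != complete {
			return false
		}
	}
	return true
}

// isDoneRunning returns done (no running or runnable jobs) and
// allComplete (every job is COMPLETE).
func (c *chain) isDoneRunning() (done, allComplete bool) {
	done, allComplete = true, true
	for id, s := range c.state {
		if s != complete {
			allComplete = false
		}
		if s == running || c.isRunnable(id) {
			done = false
		}
	}
	return done, allComplete
}

func main() {
	// A -> B -> C with B stopped: C can never become runnable, so the chain is done.
	c := &chain{
		state: map[string]byte{"A": complete, "B": stopped, "C": pending},
		prev:  map[string][]string{"B": {"A"}, "C": {"B"}},
	}
	done, all := c.isDoneRunning()
	fmt.Println(done, all)

	// Add D off A (A -> D): D is pending and runnable, so the chain is not done.
	c.state["D"] = pending
	c.prev["D"] = []string{"A"}
	done, all = c.isDoneRunning()
	fmt.Println(done, all)
}
```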

func (*Chain) IsRunnable

func (c *Chain) IsRunnable(jobId string) bool

IsRunnable returns true if the job is runnable. A job is runnable iff its state is PENDING and all immediately previous jobs are state COMPLETE.

func (*Chain) IsSequenceStartJob

func (c *Chain) IsSequenceStartJob(jobId string) bool

func (*Chain) JobState

func (c *Chain) JobState(jobId string) byte

JobState returns the state of a given job.

func (*Chain) JobTries

func (c *Chain) JobTries(jobId string) (cur uint, total uint)

func (*Chain) NextJobs

func (c *Chain) NextJobs(jobId string) proto.Jobs

NextJobs finds all of the jobs adjacent to the given job.

func (*Chain) RequestId

func (c *Chain) RequestId() string

RequestId returns the request id of the job chain.

func (*Chain) RunnableJobs

func (c *Chain) RunnableJobs() proto.Jobs

RunnableJobs returns a list of all jobs that are runnable. A job is runnable iff its state is PENDING and all immediately previous jobs are state COMPLETE.

func (*Chain) SequenceStartJob

func (c *Chain) SequenceStartJob(jobId string) proto.Job

func (*Chain) SequenceTries

func (c *Chain) SequenceTries(jobId string) uint

func (*Chain) SetJobState

func (c *Chain) SetJobState(jobId string, state byte)

SetJobState sets the state of a job in the chain.

func (*Chain) SetState

func (c *Chain) SetState(state byte)

SetState sets the chain's state.

func (*Chain) State

func (c *Chain) State() byte

State returns the chain's state.

func (*Chain) ToSuspended

func (c *Chain) ToSuspended() proto.SuspendedJobChain

type ChainReaperFactory

type ChainReaperFactory struct {
	Chain        *Chain
	ChainRepo    Repo
	Logger       *log.Entry
	RMClient     rm.Client
	RMCTries     int            // times to try sending info to RM
	RMCRetryWait time.Duration  // time to wait between tries to send info to RM
	DoneJobChan  chan proto.Job // chan jobs are reaped from
	RunJobChan   chan proto.Job // (running reaper) chan jobs to run are sent to
	RunnerRepo   runner.Repo    // (stopped + suspended reapers) repo of job runners
}

ChainReaperFactory implements ReaperFactory. It creates three types of reapers: one for a normally running chain, one for a stopped chain, and one for a suspended chain.

func (*ChainReaperFactory) MakeRunning

func (f *ChainReaperFactory) MakeRunning() JobReaper

Make a JobReaper for use on a running job chain.

func (*ChainReaperFactory) MakeStopped

func (f *ChainReaperFactory) MakeStopped() JobReaper

Make a JobReaper for use on a job chain being stopped.

func (*ChainReaperFactory) MakeSuspended

func (f *ChainReaperFactory) MakeSuspended() JobReaper

Make a JobReaper for use on a job chain being suspended.

type ErrInvalidChain

type ErrInvalidChain struct {
	Message string
}

ErrInvalidChain is the error returned when a chain is not valid.

func (ErrInvalidChain) Error

func (e ErrInvalidChain) Error() string

type JobReaper

type JobReaper interface {
	// Run reaps done jobs from doneJobChan, saving their states and enqueuing
	// any jobs that should be run to runJobChan. When there are no more jobs to
	// reap, Run finalizes the chain and returns.
	Run()

	// Stop stops the JobReaper from reaping any more jobs. It blocks until
	// Run() returns and the reaper can be safely switched out for another
	// implementation.
	Stop()
}

A JobReaper handles jobs and chains that have finished running.

The chain's current state (running as normal, stopped, or suspended) influences how jobs are handled once they finish running, and how the chain is handled once there are no more jobs to run. There are different implementations of the JobReaper for each of these cases - a running, stopped, or suspended chain.
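The Run/Stop contract, where Stop blocks until Run has returned, can be sketched with two channels. This shows only the stop handshake, not finalizing the chain or enqueuing jobs. The reaper type, its fields, and newReaper are hypothetical and do not reflect the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical stand-in for proto.Job.
type job struct{ id string }

// reaper is an illustrative JobReaper-like type.
type reaper struct {
	doneJobChan chan job      // jobs that finished running arrive here
	stopChan    chan struct{} // closed by Stop to ask Run to return
	stopOnce    sync.Once     // makes Stop safe to call more than once
	finished    chan struct{} // closed when Run returns
	reaped      []string      // ids of reaped jobs; read only after Stop returns
}

func newReaper(doneJobChan chan job) *reaper {
	return &reaper{
		doneJobChan: doneJobChan,
		stopChan:    make(chan struct{}),
		finished:    make(chan struct{}),
	}
}

// Run reaps jobs until Stop is called.
func (r *reaper) Run() {
	defer close(r.finished)
	for {
		select {
		case j := <-r.doneJobChan:
			r.reaped = append(r.reaped, j.id)
		case <-r.stopChan:
			return
		}
	}
}

// Stop signals Run to return and blocks until it has.
// In this sketch, Stop blocks forever if Run is never started.
func (r *reaper) Stop() {
	r.stopOnce.Do(func() { close(r.stopChan) })
	<-r.finished
}

func main() {
	doneJobChan := make(chan job)
	r := newReaper(doneJobChan)
	go r.Run()
	doneJobChan <- job{id: "job1"}
	doneJobChan <- job{id: "job2"}
	r.Stop() // after this returns, no more jobs are reaped
	fmt.Println(r.reaped)
}
```

Once Stop has returned, Run is guaranteed to have exited, so a different reaper can start reading from the same channel without two reapers competing for jobs.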

type ReaperFactory

type ReaperFactory interface {
	MakeRunning() JobReaper
	MakeSuspended() JobReaper
	MakeStopped() JobReaper
}

A ReaperFactory makes new JobReapers.

type Repo

type Repo interface {
	Get(string) (*Chain, error)
	Add(*Chain) error
	Set(*Chain) error
	Remove(string) error
	GetAll() ([]*Chain, error)
}

Repo stores and provides thread-safe access to job chains.

type RunningChainReaper

type RunningChainReaper struct {
	// contains filtered or unexported fields
}

Job Reaper for running chains.

func (*RunningChainReaper) Finalize

func (r *RunningChainReaper) Finalize(complete bool)

Finalize determines the final state of the chain and sends it to the Request Manager.

func (*RunningChainReaper) Reap

func (r *RunningChainReaper) Reap(job proto.Job)

Reap takes a job that just finished running, saves its final state, and prepares to continue running the chain (or recognizes that the chain is done running).

- If the chain is done: save the final state and stop running more jobs.
- If the job failed: retry the sequence if possible.
- If the job completed: prepare subsequent jobs and enqueue them if runnable.

func (*RunningChainReaper) Run

func (r *RunningChainReaper) Run()

Run reaps jobs when they finish running. For each job reaped, if the...
- chain is done: save the final state and send it to the RM.
- job failed: retry the sequence if possible.
- job completed: prepare subsequent jobs and enqueue them if runnable.

func (*RunningChainReaper) Stop

func (r *RunningChainReaper) Stop()

Stop stops the reaper from reaping any more jobs. It blocks until the reaper is stopped (will reap no more jobs and Run will return).

type StoppedChainReaper

type StoppedChainReaper struct {
	// contains filtered or unexported fields
}

Job Reaper for chains that are being stopped.

func (*StoppedChainReaper) Finalize

func (r *StoppedChainReaper) Finalize()

Finalize determines the final state of the chain and sends it to the Request Manager.

func (*StoppedChainReaper) Reap

func (r *StoppedChainReaper) Reap(job proto.Job)

Reap takes a done job and saves its state.

func (*StoppedChainReaper) Run

func (r *StoppedChainReaper) Run()

Run reaps jobs when they finish running. For each job reaped, its state is saved.

func (*StoppedChainReaper) Stop

func (r *StoppedChainReaper) Stop()

Stop stops the reaper from reaping any more jobs. It blocks until the reaper is stopped (will reap no more jobs and Run will return).

type SuspendedChainReaper

type SuspendedChainReaper struct {
	// contains filtered or unexported fields
}

Job Reaper for chains that are being suspended (stopped to be resumed later).

func (*SuspendedChainReaper) Finalize

func (r *SuspendedChainReaper) Finalize()

Finalize checks if the chain is done running or needs to be resumed later and either sends the Request Manager the chain's final state or a SuspendedJobChain that can be used to resume running the chain.

func (*SuspendedChainReaper) Reap

func (r *SuspendedChainReaper) Reap(job proto.Job)

Reap takes a done job, saves its state, and prepares the chain to be resumed at a later time.

If the job is...
- completed: prepare subsequent jobs (copy jobData).
- failed: prepare a sequence retry.
- stopped: do nothing (the job will be retried when the chain is resumed).

func (*SuspendedChainReaper) Run

func (r *SuspendedChainReaper) Run()

Run reaps jobs when they finish running. For each job reaped, if it's...
- completed: prepare subsequent jobs (copy jobData).
- failed: prepare a sequence retry.
- stopped: do nothing (the job will be retried when the chain is resumed).

func (*SuspendedChainReaper) Stop

func (r *SuspendedChainReaper) Stop()

Stop stops the reaper from reaping any more jobs. It blocks until the reaper is stopped (will reap no more jobs and Run will return).

type Traverser

type Traverser interface {
	// Run traverses a job chain and runs all of the jobs in it. It starts by
	// running the first job in the chain, and then, if the job completed
	// successfully, running its adjacent jobs. This process continues until there
	// are no more jobs to run, or until the Stop method is called on the traverser.
	Run()

	// Stop makes a traverser stop traversing its job chain. It also sends a stop
	// signal to all of the jobs that a traverser is running.
	//
	// It returns an error if it fails to stop all running jobs.
	Stop() error

	// Running returns all currently running jobs. The status.Manager uses this
	// to report running status.
	Running() []proto.JobStatus
}

A Traverser provides the ability to run a job chain while respecting the dependencies between the jobs.
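A dependency-respecting traversal can be sketched as a loop that starts a job only after all of its previous jobs have completed. This is an illustration under stated assumptions, not the package's traverser: the chain and result types, the traverse function, and the run callback are hypothetical, and stopping, retries, and reporting to the RM are left out.

```go
package main

import "fmt"

// chain is a hypothetical model of job dependencies.
type chain struct {
	next     map[string][]string // job id -> adjacent (next) job ids
	prev     map[string][]string // job id -> immediately previous job ids
	complete map[string]bool     // job ids that completed successfully
}

// result carries the outcome of one job run.
type result struct {
	id string
	ok bool
}

// ready reports whether every previous job of id has completed.
func (c *chain) ready(id string) bool {
	for _, p := range c.prev[id] {
		if !c.complete[p] {
			return false
		}
	}
	return true
}

// traverse runs first, then each job whose previous jobs have all completed.
// run is a hypothetical callback that returns true if the job succeeded.
// It returns the ids of completed jobs in completion order.
func traverse(c *chain, first string, run func(id string) bool) []string {
	doneJobChan := make(chan result)
	started := map[string]bool{}
	running := 0
	start := func(id string) {
		started[id] = true
		running++
		go func() { doneJobChan <- result{id: id, ok: run(id)} }()
	}
	start(first)
	var order []string
	for running > 0 {
		r := <-doneJobChan
		running--
		if !r.ok {
			continue // jobs after a failed job never become ready
		}
		c.complete[r.id] = true
		order = append(order, r.id)
		for _, n := range c.next[r.id] {
			if !started[n] && c.ready(n) {
				start(n)
			}
		}
	}
	return order
}

// diamond builds A -> B, A -> C, B -> D, C -> D.
func diamond() *chain {
	return &chain{
		next:     map[string][]string{"A": {"B", "C"}, "B": {"D"}, "C": {"D"}},
		prev:     map[string][]string{"B": {"A"}, "C": {"A"}, "D": {"B", "C"}},
		complete: map[string]bool{},
	}
}

func main() {
	order := traverse(diamond(), "A", func(string) bool { return true })
	fmt.Println(order) // A first and D last; B and C may appear in either order
}
```

In the diamond, D starts only after both B and C complete. If B fails, D never runs, while the independent job C still does.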

type TraverserConfig

type TraverserConfig struct {
	Chain         *Chain
	ChainRepo     Repo
	RunnerFactory runner.Factory
	RMClient      rm.Client
	ShutdownChan  chan struct{}
	StopTimeout   time.Duration
	SendTimeout   time.Duration
}

type TraverserFactory

type TraverserFactory interface {
	Make(*proto.JobChain) (Traverser, error)
	MakeFromSJC(*proto.SuspendedJobChain) (Traverser, error)
}

A TraverserFactory makes a new Traverser.

func NewTraverserFactory

func NewTraverserFactory(chainRepo Repo, rf runner.Factory, rmc rm.Client, shutdownChan chan struct{}) TraverserFactory
