Documentation ¶
Index ¶
- Variables
- func NewTestStepEventsEmitterFactory(vault storage.EngineVault, jobID types.JobID, runID types.RunID, ...) *testStepEventsEmitterFactory
- type AddTargetToStep
- type ChanNotifier
- type JobRunner
- func (jr *JobRunner) BuildRunStatus(ctx xcontext.Context, coordinates job.RunCoordinates, currentJob *job.Job) (*job.RunStatus, error)
- func (jr *JobRunner) BuildRunStatuses(ctx xcontext.Context, currentJob *job.Job) ([]job.RunStatus, error)
- func (jr *JobRunner) RefreshLocks()
- func (jr *JobRunner) Run(ctx xcontext.Context, j *job.Job, resumeState *job.PauseEventPayload) (*job.PauseEventPayload, error)
- func (jr *JobRunner) StartLockRefresh()
- func (jr *JobRunner) StopLockRefresh()
- type RunStartedPayload
- type StepResult
- type StepRunner
- type TestRunner
- type TestStepEventsEmitterFactory
Constants ¶
This section is empty.
Variables ¶
var EventRunStarted = event.Name("RunStarted")
EventRunStarted indicates that a run has begun.
var EventTestError = event.Name("TestError")
EventTestError indicates that a test failed.
Functions ¶
Types ¶
type AddTargetToStep ¶
type ChanNotifier ¶
type ChanNotifier interface {
NotifyCh() <-chan error
}
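The semantics of the channel are not documented here. As a minimal sketch, assuming a notifier delivers at most one error and then closes its channel, an implementation could look like the following. The names doneNotifier, finish, waitFor and errStepFailed are hypothetical and not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ChanNotifier mirrors the interface documented above.
type ChanNotifier interface {
	NotifyCh() <-chan error
}

// errStepFailed is a hypothetical sentinel error used only by this sketch.
var errStepFailed = errors.New("step failed")

// doneNotifier is a hypothetical implementation: it delivers at most one
// error and then closes its channel.
type doneNotifier struct {
	ch chan error
}

func newDoneNotifier() *doneNotifier {
	// A buffer of one lets finish return even if nobody is receiving yet.
	return &doneNotifier{ch: make(chan error, 1)}
}

// NotifyCh exposes the channel as receive-only.
func (n *doneNotifier) NotifyCh() <-chan error { return n.ch }

// finish publishes the outcome (nil means success) and closes the channel.
// It must be called exactly once; a second call would panic on close.
func (n *doneNotifier) finish(err error) {
	if err != nil {
		n.ch <- err
	}
	close(n.ch)
}

// waitFor blocks until the notifier reports. A channel closed without a
// value yields the zero value, which is a nil error.
func waitFor(n ChanNotifier) error {
	return <-n.NotifyCh()
}

func main() {
	ok := newDoneNotifier()
	go ok.finish(nil)
	fmt.Println("ok:", waitFor(ok))

	bad := newDoneNotifier()
	go bad.finish(errStepFailed)
	fmt.Println("bad:", waitFor(bad))
}
```

Returning a receive-only channel keeps the sending side private to the implementation, so callers can only wait on the notification.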
type JobRunner ¶
type JobRunner struct {
// contains filtered or unexported fields
}
JobRunner implements the logic to run, cancel and stop jobs.
func NewJobRunner ¶
func NewJobRunner(js storage.JobStorage, storageVault storage.EngineVault, clk clock.Clock, lockDuration time.Duration) *JobRunner
NewJobRunner returns a new JobRunner, which holds an empty registry of jobs.
func (*JobRunner) BuildRunStatus ¶
func (jr *JobRunner) BuildRunStatus(ctx xcontext.Context, coordinates job.RunCoordinates, currentJob *job.Job) (*job.RunStatus, error)
BuildRunStatus builds the status of a single run of the job, identified by its run coordinates.
func (*JobRunner) BuildRunStatuses ¶
func (jr *JobRunner) BuildRunStatuses(ctx xcontext.Context, currentJob *job.Job) ([]job.RunStatus, error)
BuildRunStatuses builds the status of all runs belonging to the job.
func (*JobRunner) RefreshLocks ¶
func (jr *JobRunner) RefreshLocks()
RefreshLocks refreshes locks for running or paused jobs.
func (*JobRunner) Run ¶
func (jr *JobRunner) Run(ctx xcontext.Context, j *job.Job, resumeState *job.PauseEventPayload) (*job.PauseEventPayload, error)
Run implements the main job running logic. It holds a registry of all running jobs that can be referenced when cancellation/pause/stop requests come in.
It returns:
- *job.PauseEventPayload: the pause payload, of the same type accepted as resumeState
- error: an error, if any
func (*JobRunner) StartLockRefresh ¶
func (jr *JobRunner) StartLockRefresh()
StartLockRefresh starts the background lock refresh routine.
func (*JobRunner) StopLockRefresh ¶
func (jr *JobRunner) StopLockRefresh()
StopLockRefresh stops the background lock refresh routine.
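How the refresh routine is implemented is not shown here. The following is a rough sketch of one common way to build a start/stop background routine in Go, using a ticker and a stop channel. The type refresher, its fields and demoRefreshCount are hypothetical stand-ins and are not taken from JobRunner.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// refresher is a hypothetical stand-in for the lock-refresh part of a runner.
type refresher struct {
	interval time.Duration
	refresh  func() // called on every tick, e.g. to extend lock lifetimes
	stop     chan struct{}
	wg       sync.WaitGroup
}

func newRefresher(interval time.Duration, refresh func()) *refresher {
	return &refresher{interval: interval, refresh: refresh, stop: make(chan struct{})}
}

// Start launches the background goroutine that calls refresh on every tick.
func (r *refresher) Start() {
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		ticker := time.NewTicker(r.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				r.refresh()
			case <-r.stop:
				return
			}
		}
	}()
}

// Stop signals the goroutine to exit and waits until it has done so.
// In this sketch it must be called exactly once, after Start.
func (r *refresher) Stop() {
	close(r.stop)
	r.wg.Wait()
}

// demoRefreshCount runs a refresher briefly and reports how many times the
// refresh callback fired.
func demoRefreshCount() int64 {
	var n int64
	r := newRefresher(10*time.Millisecond, func() { atomic.AddInt64(&n, 1) })
	r.Start()
	time.Sleep(100 * time.Millisecond)
	r.Stop()
	return atomic.LoadInt64(&n)
}

func main() {
	fmt.Println("refreshes:", demoRefreshCount())
}
```

Waiting on the WaitGroup in Stop guarantees that no refresh call is still in flight once Stop returns.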
type RunStartedPayload ¶
RunStartedPayload represents the payload carried by a RunStarted event (see EventRunStarted).
type StepResult ¶
type StepResult struct {
	Err         error
	ResumeState json.RawMessage
}
type StepRunner ¶
type StepRunner struct {
// contains filtered or unexported fields
}
func (*StepRunner) Run ¶
func (sr *StepRunner) Run(
	ctx xcontext.Context,
	bundle test.TestStepBundle,
	stepsVariables test.StepsVariables,
	ev testevent.Emitter,
	resumeState json.RawMessage,
	resumeStateTargets []target.Target,
) (AddTargetToStep, []ChanNotifier, ChanNotifier, error)
func (*StepRunner) Started ¶
func (sr *StepRunner) Started() bool
func (*StepRunner) WaitResults ¶
func (sr *StepRunner) WaitResults(ctx context.Context) (stepResult StepResult, err error)
WaitResults returns the output of TestStep.Run(). It returns an error if and only if waiting was terminated by the ctx argument, in which case the error is ctx.Err().
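The contract above separates two kinds of failure: a failing step is reported inside StepResult.Err, while the returned error only signals that the wait itself was interrupted. A minimal sketch of that pattern, assuming the result arrives on a channel, is shown below. The helper waitResults, the channel and the demo functions are hypothetical and are not the package's implementation.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// StepResult mirrors the struct documented above.
type StepResult struct {
	Err         error
	ResumeState json.RawMessage
}

// waitResults is a hypothetical helper: it returns the step's result, or
// ctx.Err() if the context ends before a result is available.
func waitResults(ctx context.Context, results <-chan StepResult) (StepResult, error) {
	select {
	case res := <-results:
		// The step finished; its own failure, if any, is carried in res.Err.
		return res, nil
	case <-ctx.Done():
		// Waiting was interrupted; this is the only case that returns an error.
		return StepResult{}, ctx.Err()
	}
}

// demoStepFailure shows a step that failed while the wait itself succeeded.
func demoStepFailure() (stepErr, waitErr error) {
	results := make(chan StepResult, 1)
	results <- StepResult{Err: errors.New("step failed")}
	res, err := waitResults(context.Background(), results)
	return res.Err, err
}

// demoCancelled shows a wait interrupted by a cancelled context.
func demoCancelled() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := waitResults(ctx, make(chan StepResult))
	return err
}

func main() {
	stepErr, waitErr := demoStepFailure()
	fmt.Println("step error:", stepErr, "wait error:", waitErr)
	fmt.Println("cancelled wait:", demoCancelled())
}
```

With this split, callers check the returned error first to learn whether a result exists at all, then inspect StepResult.Err to learn how the step itself fared.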
type TestRunner ¶
type TestRunner struct {
// contains filtered or unexported fields
}
TestRunner is the state associated with a test run. Here's how a test run works:
- Each target gets a targetState and a "target handler" - a goroutine that takes that particular target through each step of the pipeline in sequence. It injects the target, waits for the result, then moves on to the next step.
- Each step of the pipeline gets a stepState and:
  - A "step runner" - a goroutine that is responsible for running the step's Run() method
  - A "step reader" - a goroutine that processes results and sends them on to target handlers that await them.
- After starting all of the above, the main goroutine goes into "monitor" mode that checks on the pipeline's progress and is responsible for closing step input channels when all the targets have been injected.
- The monitor loop finishes when all the targets have been injected into the last step or if a step has encountered an error.
- We then wait for all the step runners and readers to shut down.
- Once all the activity has died down, the resulting state is examined and an error is returned, if any.
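The flow described above can be sketched in a heavily simplified form. This is an illustration under stated assumptions, not the TestRunner implementation: each step is modelled as a function called once per target, target names are assumed unique, and the monitor simply waits for all target handlers before closing the step inputs. All names (step, stepState, runPipeline, demoFailures) are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// step is a hypothetical pipeline stage applied to one target at a time.
type step struct {
	name string
	run  func(target string) error
}

// result is what a step runner reports for a single target.
type result struct {
	target string
	err    error
}

// stepState holds the per-step channels.
type stepState struct {
	in      chan string           // targets injected by target handlers
	out     chan result           // results produced by the step runner
	waiters map[string]chan error // one channel per target, fed by the step reader
}

// runPipeline returns, for each failed target, the error that stopped it.
func runPipeline(steps []step, targets []string) map[string]error {
	states := make([]*stepState, len(steps))
	var stepWG sync.WaitGroup
	for i, s := range steps {
		st := &stepState{in: make(chan string), out: make(chan result), waiters: map[string]chan error{}}
		for _, t := range targets {
			st.waiters[t] = make(chan error, 1)
		}
		states[i] = st
		stepWG.Add(2)
		go func(s step, st *stepState) { // step runner
			defer stepWG.Done()
			defer close(st.out)
			for t := range st.in {
				st.out <- result{target: t, err: s.run(t)}
			}
		}(s, st)
		go func(st *stepState) { // step reader
			defer stepWG.Done()
			for r := range st.out {
				st.waiters[r.target] <- r.err
			}
		}(st)
	}

	var mu sync.Mutex
	failed := map[string]error{}
	var targetWG sync.WaitGroup
	for _, t := range targets {
		targetWG.Add(1)
		go func(t string) { // target handler
			defer targetWG.Done()
			for i, st := range states {
				st.in <- t // inject the target into the step
				if err := <-st.waiters[t]; err != nil {
					mu.Lock()
					failed[t] = fmt.Errorf("step %s: %w", steps[i].name, err)
					mu.Unlock()
					return // a failed target does not advance further
				}
			}
		}(t)
	}

	// Monitor: once every target handler is done, close the step inputs and
	// wait for the step runners and readers to shut down.
	targetWG.Wait()
	for _, st := range states {
		close(st.in)
	}
	stepWG.Wait()
	return failed
}

// demoFailures runs three targets through two steps; "host2" fails the second.
func demoFailures() map[string]error {
	steps := []step{
		{name: "ping", run: func(string) error { return nil }},
		{name: "flash", run: func(t string) error {
			if t == "host2" {
				return errors.New("flash failed")
			}
			return nil
		}},
	}
	return runPipeline(steps, []string{"host1", "host2", "host3"})
}

func main() {
	for t, err := range demoFailures() {
		fmt.Printf("%s: %v\n", t, err)
	}
}
```

The waiters map is filled before any goroutine starts and is only read afterwards, which is why it needs no lock, unlike the failed map that target handlers write to concurrently.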
func NewTestRunner ¶
func NewTestRunner() *TestRunner
func NewTestRunnerWithTimeouts ¶
func NewTestRunnerWithTimeouts(shutdownTimeout time.Duration) *TestRunner
func (*TestRunner) Run ¶
func (tr *TestRunner) Run(
	ctx xcontext.Context,
	t *test.Test,
	targets []*target.Target,
	emitterFactory TestStepEventsEmitterFactory,
	resumeState json.RawMessage,
) (json.RawMessage, map[string]error, error)
Run is the main entry point of the runner.