Documentation ¶
Overview ¶
Package scheduler implements a runtime scheduler for cff with support for interdependent jobs.
To use the scheduler, build one with Config.New, providing the desired maximum number of goroutines.
cfg := scheduler.Config{Concurrency: 4} sched := cfg.New()
With a scheduler available, enqueue jobs into it with the Enqueue method.
j1 := sched.Enqueue(ctx, Job{..})
The scheduler will begin running this job as soon as a worker is available.
Enqueue returns a reference to the scheduled job. Use this reference in other Enqueue calls to specify dependencies for jobs.
j3 := sched.Enqueue(ctx, Job{ ..., Dependencies: []*scheduler.ScheduledJob{j1, j2}, })
j3 will not be run until j1 and j2 have finished successfully.
Dependencies must be enqueued before jobs that depend on them. This adds the burden of dependency order resolution on the caller.
After enqueuing all jobs, await completion with Scheduler.Wait. This is comparable to WaitGroup.Wait().
err := sched.Wait(ctx)
If any of the enqueued jobs failed, the remaining jobs will be aborted and Wait will return the error. This may be changed by setting Config.ContinueOnError.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // Concurrency is the number of concurrent workers to schedule tasks to. // // Defaults to max(GOMAXPROCS, 4). Concurrency int // Emitter provides a hook into the state of the scheduler. Emitter Emitter // StateFlushFrequency is how often the scheduler will emit metrics with the // emitter. // // Defaults to 100 milliseconds. StateFlushFrequency time.Duration // ContinueOnError, if true when a job fails, directs the scheduler to // record its failure, invalidate all jobs that depend on the failed job, // and keep running. ContinueOnError bool }
Config stores parameters the scheduler should run with and is the entry point for running the scheduler.
type Job ¶
type Job struct { // Run executes the job and returns the error it encountered, if any. Run func(context.Context) error // Dependencies are previously enqueued jobs that must run before this // job. Dependencies []*ScheduledJob }
Job is an independent executable unit meant to be executed by the scheduler.
type ScheduledJob ¶
type ScheduledJob struct {
// contains filtered or unexported fields
}
ScheduledJob is a job that has been scheduled for execution by the scheduler.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler schedules jobs for a cff flow or parallel.
func (*Scheduler) Enqueue ¶
func (s *Scheduler) Enqueue(ctx context.Context, j Job) *ScheduledJob
Enqueue queues up a job for execution with the scheduler. The returned object may be used as a dependency for other jobs.
Enqueue will panic if called after calling Wait.
type State ¶
type State struct { // Pending is total number of jobs, including jobs being executed and // waiting to be executed. Pending int // Ready is number of jobs to be executed but awaiting a free worker. // If this number is consistently high, increase the concurrency for this // flow. Ready int // Waiting is number of jobs waiting for other jobs to be finished before // being scheduled. If this number is consistently high, your flow has a // task that bottlenecks its performance, consider analyzing and // restructuring dependencies. Waiting int // IdleWorkers reports the number of workers that don't have any jobs to run. // If this is consistently high, decrease the concurrency for this flow. IdleWorkers int // Concurrency is the number of workers the scheduler can process tasks // with. Concurrency int }
State describes the status of jobs managed by the cff scheduler.