jobExecutor

package module
v2.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2023 License: MIT Imports: 14 Imported by: 0

README

jobExecutor

go module to assist in running jobs in multiple goroutines and print their output

features:

  • Can set the max concurrent jobs with: SetMaxConcurrentJobs, default to runtime. GOMAXPROCS ()
  • Can run commands and "runnable" functions (they must return a string and an error)
  • Can handle job dependencies by running them in topological order
  • Can register handlers for the following events:
    • OnJobsStart: called before any job start
    • OnJobStart: called before each job start
    • OnJobDone: called after each job terminated
    • OnJobsDone: called after all jobs are terminated
  • Fluent interface: you can chain methods call
  • Can add jobs programmatically
  • Can display a progress report of ongoing jobs
  • Can display output using custom templates

Usage:

Adding some jobs and executing them
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"os"
	"os/exec"
	"strings"
	"time"

	"github.com/software-t-rex/go-jobExecutor"
)

func longFunction() (string, error) {
	duration := time.Duration(rand.Intn(5)) * time.Millisecond
	time.Sleep(duration)
	if rand.Intn(10) <= 7 { // random failure
		return fmt.Sprintf("- runnable succeed in %v\n", duration), nil
	}
	return fmt.Sprintf("- runnable Failed in %v\n", duration), errors.New("error while asleep")
}

func longFunction2() (string, error) {
	res, err := longFunction()
	if err == nil {
		res = strings.Replace(res, "runnable", "runnable2", -1)
	}
	return res, err
}

func main() {
	// set max concurrent jobs (not required default to GOMAXPROCS)
	jobExecutor.SetMaxConcurrentJobs(8)
	executor := jobExecutor.NewExecutor()
	// add some "runnable" functions
	executor.AddJobFns(longFunction, longFunction2)
	// add a single command
	executor.AddJobCmds(exec.Command("ls", "-l"))
	// or multiple command at once
	executor.AddJobCmds(
		exec.Command("sleep", "5"),
		exec.Command("sleep", "2"),
	)
	// there's also AddJob and AddJobs that are not chainable but that returns a job api instead
	myJob := executor.AddJob(exec.Command("sleep", "1"))
	jobs := executor.AddJobs(
		exec.Command("sleep", "2"),
		jobExecutor.NamedJob("MyNamedJob", longFunction) // you can wrap in a NamedJob structure to add job with a name
	)

	// execute them and get errors if any
	jobErrors := executor.Execute()
	if len(jobErrors) > 0 {
		fmt.Fprintln(os.Stderr, jobErrors)
	}
}
Handling dependencies between jobs

This is based on Directed Acyclic Graph, and using the khan algorithm to topologically sort the jobs.

func main() {
	executor := jobExecutor.NewExecutor()
	jobs := executor.addJobs(
		exec.Command("sleep", "1"),
		exec.Command("exit", "1"),
		exec.Command("ls", "-l"),
	)
	executor.AddDependencies(jobs[0], jobs[1]) // sleep will never run as it depends on a job that always fails
	// execute them respecting dependencies
	jobErrors := executor.DagExecute()
	if len(jobErrors) > 0 {
		fmt.Fprintln(os.Stderr, jobErrors)
	}
}
Binding some event handlers:
func main () {
	executor := jobExecutor.NewExecutor()

	// add a simple command
	executor.AddJobCmds(exec.Command("sleep", "5"))

	// binding some event handlers (can be done anytime before calling Execute)
	// you can call the same method multiple times to bind more than one handler
	// they will be called in order
	executor.
		OnJobsStart(func(jobs jobExecutor.JobList) {
			fmt.Printf("Starting %d jobs\n", len(jobs))
		}).
		OnJobStart(func (jobs jobExecutor.JobList, jobId int) {
			fmt.Printf("Starting jobs %d\n", jobId)
		}).
		OnJobDone(func (jobs jobExecutor.JobList, jobId int) {
			job:=jobs[jobId]
			if job.IsState(jobExecutor.JobStateFailed) {
				fmt.Printf("job %d terminanted with error: %s\n", jobId, job.Err)
			}
		}).
		OnJobsDone(func (jobExecutor.JobList) {
			fmt.Println("Done")
		})

	// add some "runnable" functions and execute
	executor.AddJobFns( longFunction, longFunction2).Execute()
}
Display state of running jobs:

func main() {
	jobExecutor.SetMaxConcurrentJobs(5)
	executor := jobExecutor.NewExecutor().WithOngoingStatusOutput()
	// add a command and set its display name in output templates (there's a AddNamedJobFn too)
	executor.AddNamedJobCmd("Wait for 2 seconds", exec.Command("sleep", "2"))

	executor.AddJobCmds(
		exec.Command("sleep", "10"),
		exec.Command("sleep", "9"),
		exec.Command("sleep", "8"),
		exec.Command("sleep", "7"),
		exec.Command("sleep", "6"),
		exec.Command("sleep", "5"),
		exec.Command("sleep", "4"),
		exec.Command("sleep", "3"),
		exec.Command("sleep", "2"),
		exec.Command("sleep", "1"),
	).Execute()
}
Other outputs methods:
  • WithProgressBarOutput: Display a progress bar while status are running
  • WithOrderedOutput: output ordered res and errors at the end
  • WithFifoOutput: output res and errors as they arrive
  • WithStartOutput: output a line when launching a job
  • WithStartSummary: output a summary of jobs to do
  • WithInterleavedOutput: output lines as they arrive prefixed by job name
Change output formats

All output methods use a go template which you can override by calling the method

jobExecutor.SetTemplateString(myTemplateString)

the template string must contains following templates definition:

  • startSummary
  • jobStatusLine
  • jobStatusFull
  • doneReport
  • startProgressReport
  • progressReport You can look at output.gtpl file for an example

Alternatively, you can pass a template bound to a specific executor like this:

executor := jobExecutor.NewExecutorWithTemplate(myTemplate)
A note about stdin and stdout

The default behavior of jobExecutor is to run exec.Cmd using the CombinedOutput method. This allows to print grouped output for jobs as in most of with*Output methods. If you have set exec.Cmd.Stdout and/or Stderr, it will then rely on the exec.Cmd.Run method instead. It won't collect stderr or stdout for you anymore. Some output methods like the withInterleavedOutput use this internally. Most of the time this won't impact you as a user of this package, but in case you're diving in customizing a lot the way you handle the output it may be important to know how this work.

Generate a graphviz dot textual representation of the job execution

You can generate a graph representation of the jobs already added to the executor by calling the method GetDot

fmt.println(executor.GetDot())
// output from a test case
digraph G{
	graph [bgcolor="#121212" fontcolor="black" rankdir="RL"]
	node [colorscheme="set312" style="filled,rounded" shape="box"]
	edge [color="#f0f0f0"]
	0 [label="fn 0" color="1"]
	1 [label="fn 1" color="2"]
	2 [label="fn 2" color="3"]
	3 [label="fn 3" color="4"]
	4 [label="cmd 4" color="5"]
	5 [label="cmd 5" color="6"]
	6 [label="cmd 6" color="7"]
	7 [label="cmd 7" color="8"]
	8 [label="cmd 8" color="9"]
	0 -> 1
	0 -> 5
	2 -> 3
	4 -> 7
	6 -> 2
	7 -> 8
	7 -> 0
	{rank=same; 1;3;5;8}
}

you can see the result here https://bit.ly/40wXkwD

Contributing

Contributions are welcome, but please make small independent commits when you contribute, it makes the review process a lot easier for me.

Funding / Sponsorship

If you like my work, and find it useful to you or your company, you can sponsors my work here: become sponsors to the project.

Documentation

Overview

Copyright © 2023 Jonathan Gotti <jgotti at jgotti dot org> SPDX-FileType: SOURCE SPDX-License-Identifier: MIT SPDX-FileCopyrightText: 2023 Jonathan Gotti <jgotti@jgotti.org>

Index

Constants

View Source
const (
	JobStatePending = 0
	JobStateRunning = 1
	JobStateDone    = 2
	JobStateSucceed = 4
	JobStateFailed  = 8
)

Variables

View Source
var ErrCyclicDependencyDetected = fmt.Errorf("cyclic dependencies detected")
View Source
var ErrRequiredJobFailed = fmt.Errorf("required job failed")
View Source
var ErrUndefinedTemplate = fmt.Errorf("template is not defined, see jobExecutor.setTemplate")

Functions

func NewPrefixedWriter added in v2.1.2

func NewPrefixedWriter(buf io.Writer, prefix string) *prefixedWriter

func SetMaxConcurrentJobs

func SetMaxConcurrentJobs(n int)

set the default number of concurrent jobs to run default to GOMAXPROCS

func SetTemplateString

func SetTemplateString(templateString string)

Template for all outputs related to jobs It must define the following templates:

  • startSummary: which will receive a JobList
  • jobStatus: which will receive a single job
  • progressReport: which will receive a jobList
  • doneReport: which will receive a jobList

Types

type Job added in v2.1.0

type Job struct {
	// contains filtered or unexported fields
}

func (*Job) CombinedOutput added in v2.1.0

func (j *Job) CombinedOutput() string

return the combinedOutput of job (only after execution) this is concurrency safe

func (*Job) Err added in v2.1.0

func (j *Job) Err() error

return the error returned by a job if any (only after execution) this is concurrency safe

func (*Job) Id added in v2.1.0

func (j *Job) Id() int

return internal job Id, correspond to insertion order in an executor

func (*Job) IsCmdJob added in v2.1.0

func (j *Job) IsCmdJob() bool

check the given job is of *exec.Cmd type

func (*Job) IsFnJob added in v2.1.0

func (j *Job) IsFnJob() bool

check the given job is of func() (string, error) type

func (*Job) IsState added in v2.1.0

func (j *Job) IsState(state int) bool

allow to check the status of the job (concurrency safe)

job.IsState(jobExecutor.JobStateSucceed)
job.IsState(jobExecutor.JobStateRunning)

func (*Job) Name added in v2.1.0

func (j *Job) Name() string

return the assigned name of a job or a computed one

type JobExecutor

type JobExecutor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor() *JobExecutor

Instanciate a new JobExecutor

func NewExecutorWithTemplate added in v2.1.0

func NewExecutorWithTemplate(template *template.Template) *JobExecutor

func (*JobExecutor) AddJob added in v2.1.0

func (e *JobExecutor) AddJob(j interface{}) Job

Add any kind of supported job to the jobExecutor pool and return a Job supported jobs are: - an *exec.Cmd - a runnableFn (func() (string, error)) - a NamedJob any unsupported job type will panic some examples:

// add an *exec.Cmd
cmd := exec.Command("mycommand")
job, err := executor.AddJob(cmd)
// add runnableFn
job, err := executor.AddJob(func() (string, error) {... })
// add named *exec.Cmd
job, err := executor.AddJob(&jobExecutor.NamedJob{"myjob", cmd))
// add named runnableFn
job, err := executor.AddJob(&jobExecutor.NamedJob{"myjob", func() (string, error) {... }})

the returned Job can be used to declare dependencies between Jobs

func (*JobExecutor) AddJobCmds

func (e *JobExecutor) AddJobCmds(cmds ...*exec.Cmd) *JobExecutor

Add multiple job commands to run. This method can be chained.

func (*JobExecutor) AddJobDependency added in v2.1.0

func (e *JobExecutor) AddJobDependency(from Job, to Job) *JobExecutor

Register "from" job as dependent on "to" job

func (*JobExecutor) AddJobFns

func (e *JobExecutor) AddJobFns(fns ...runnableFn) *JobExecutor

Add one or more job function to run (func() (string, error)). This method can be chained.

func (*JobExecutor) AddJobs added in v2.1.0

func (e *JobExecutor) AddJobs(jobs ...interface{}) []Job

same as AddJob but for multiple jobs at once it will panic on invalid job, and return a slice of added Jobs

func (*JobExecutor) AddNamedJobCmd

func (e *JobExecutor) AddNamedJobCmd(name string, cmd *exec.Cmd) *JobExecutor

Add a job command and set its output display name. This method can be chained.

func (*JobExecutor) AddNamedJobFn

func (e *JobExecutor) AddNamedJobFn(name string, fn runnableFn) *JobExecutor

Add a job function and set its output display name. This method can be chained.

func (*JobExecutor) DagExecute added in v2.1.0

func (e *JobExecutor) DagExecute() JobsError

func (*JobExecutor) Execute

func (e *JobExecutor) Execute() JobsError

Effectively execute jobs and return collected errors as JobsError

func (*JobExecutor) GetDot added in v2.1.1

func (e *JobExecutor) GetDot() string

return a graphviz dot representation of the execution graph you can render it using graphviz or pasting output to https://dreampuf.github.io/GraphvizOnline/

func (*JobExecutor) IsAcyclic added in v2.1.0

func (e *JobExecutor) IsAcyclic() bool

Check that the jobs registered in the executor don't make a cyclic dependency (use Kahn's topological sort algorithm)

func (*JobExecutor) Len

func (e *JobExecutor) Len() int

Return the total number of jobs added to the jobExecutor

func (*JobExecutor) OnJobDone

func (e *JobExecutor) OnJobDone(fn jobEventHandler) *JobExecutor

Add a handler which will be called after a job is terminated

func (*JobExecutor) OnJobStart

func (e *JobExecutor) OnJobStart(fn jobEventHandler) *JobExecutor

Add a handler which will be called before a job is started

func (*JobExecutor) OnJobsDone

func (e *JobExecutor) OnJobsDone(fn jobsEventHandler) *JobExecutor

Add a handler which will be called after all jobs are terminated

func (*JobExecutor) OnJobsStart

func (e *JobExecutor) OnJobsStart(fn jobsEventHandler) *JobExecutor

Add a handler which will be called before any jobs is started

func (*JobExecutor) WithFifoOutput

func (e *JobExecutor) WithFifoOutput() *JobExecutor

Display full jobStatus as they arrive

func (*JobExecutor) WithInterleavedOutput added in v2.1.2

func (e *JobExecutor) WithInterleavedOutput() *JobExecutor

Print stdout and stderr of command directly to stdout as they arrive prefixing the ouput with the job name It overrides cmd.Stdin and cmd.Stdout so it won't work well with other With*Output methods that rely on collecting them to display them later (typically WithOrderedOutput will have nothing to display)

func (*JobExecutor) WithOngoingStatusOutput

func (e *JobExecutor) WithOngoingStatusOutput() *JobExecutor

Display a job status report updated each time a job start or end be carefull when dealing with other handler that generate output as it will potentially break progress output

func (*JobExecutor) WithOrderedOutput

func (e *JobExecutor) WithOrderedOutput() *JobExecutor

Display doneReport when all jobs are Done

func (*JobExecutor) WithProgressBarOutput

func (e *JobExecutor) WithProgressBarOutput(length int, keepOnDone bool, colorEscSeq string) *JobExecutor

- length is the number of characters used to print the progress bar - keepOnDone determines if the progress bar should be kept on the screen when done or not - colorEscSeq is an ANSII terminal escape sequence ie: "\033[32m"

func (*JobExecutor) WithStartOutput

func (e *JobExecutor) WithStartOutput() *JobExecutor

Output a line to say a job is starting

func (*JobExecutor) WithStartSummary

func (e *JobExecutor) WithStartSummary() *JobExecutor

Output a summary of jobs that will be run

type JobList

type JobList []*job

type JobsError

type JobsError map[int]error

Map indexes correspond to job index in the queue

func (JobsError) Error

func (es JobsError) Error() string

func (JobsError) Len added in v2.1.0

func (es JobsError) Len() int

func (JobsError) String

func (es JobsError) String() string

type NamedJob added in v2.1.0

type NamedJob struct {
	Name string
	// must be *execCmd or runnableFn
	Job interface{}
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL