tasks

package module
v0.0.0-...-90a8f14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2023 License: Apache-2.0 Imports: 26 Imported by: 14

README

tasks is a workflow management system, designed primarily to run jobs on a High Performance Computer (HPC) running PBS Pro or Slurm. Jobs can optionally be run inside Singularity containers. This is a very specific set of conditions, and there are many other workflow management systems already available. tasks is written in Go, and so are the workflows that it manages. It was originally started as an exercise in learning the Go language.

tasks is heavily influenced by Queue, a workflow engine by the Broad Institute's GSA group as part of the GATK software.

You may want to look at:

which both have more features and support.

Example Workflow

package main

import (
	"fmt"
	"log"
	"strings"

	"github.com/jje42/tasks"
)

// CreateInput is a step of the example workflow: its Command echoes Word and
// redirects the result into the file named by Output.
type CreateInput struct {
	// Output is the path the command redirects its output to; the struct tag
	// marks it as an output of the step.
	// NOTE(review): presumably tasks reads this tag to link steps — confirm.
	Output string `type:"output"`
	// Word is the text handed to echo.
	Word   string
}

// AnalysisName returns the fixed analysis name for CreateInput steps.
func (c CreateInput) AnalysisName() string {
	const name = "CreateInput"
	return name
}

// Command builds the command string for this step: echo Word, with the
// output redirected into the file named by Output.
func (c CreateInput) Command() string {
	const format = "echo %s >%s"
	cmd := fmt.Sprintf(format, c.Word, c.Output)
	return cmd
}

// Resources returns the resources requested for this step: CPUs, Memory and
// Time are all 1 (units are not stated in this example), and the container
// is the docker://debian:10 image.
func (c CreateInput) Resources() tasks.Resources {
	res := tasks.Resources{
		CPUs:      1,
		Memory:    1,
		Time:      1,
		Container: "docker://debian:10",
	}
	return res
}

// ToUpper is a step of the example workflow: its Command upper-cases the
// contents of Input and writes the result to Output.
type ToUpper struct {
	// Input is the path of the file the command reads; tagged as an input.
	Input  string `type:"input"`
	// Output is the path the command redirects its output to; tagged as an
	// output.
	// NOTE(review): presumably tasks uses these tags to order steps — confirm.
	Output string `type:"output"`
}

// AnalysisName returns the fixed analysis name for ToUpper steps.
func (c ToUpper) AnalysisName() string {
	const name = "ToUpper"
	return name
}

// Command builds the command string for this step: cat Input through tr to
// convert lower-case characters to upper-case, redirecting into Output.
func (c ToUpper) Command() string {
	const pipeline = `cat %s | tr '[:lower:]' '[:upper:]' >%s`
	cmd := fmt.Sprintf(pipeline, c.Input, c.Output)
	return cmd
}

// Resources returns the resources requested for this step: CPUs, Memory and
// Time are all 1 (units are not stated in this example), and the container
// is the docker://debian:10 image.
func (c ToUpper) Resources() tasks.Resources {
	res := tasks.Resources{
		CPUs:      1,
		Memory:    1,
		Time:      1,
		Container: "docker://debian:10",
	}
	return res
}

// Merge is a step of the example workflow: its Command concatenates every
// file in Inputs into the single file named by Output.
type Merge struct {
	// Inputs lists the paths of the files to concatenate; tagged as an input.
	Inputs []string `type:"input"`
	// Output is the path the concatenated result is redirected to; tagged as
	// an output.
	// NOTE(review): presumably tasks uses these tags to order steps — confirm.
	Output string   `type:"output"`
}

// AnalysisName returns the fixed analysis name for Merge steps.
// NOTE(review): this name is lower-case, unlike "CreateInput" and "ToUpper"
// — confirm that is intended.
func (c Merge) AnalysisName() string {
	const name = "merge"
	return name
}

// Command builds the command string for this step: cat all of Inputs
// (joined with single spaces) and redirect the result into Output.
func (c Merge) Command() string {
	inputs := strings.Join(c.Inputs, " ")
	cmd := fmt.Sprintf(`cat %s >%s`, inputs, c.Output)
	return cmd
}

// Resources returns the resources requested for this step: CPUs, Memory and
// Time are all 1 (units are not stated in this example), and the container
// is the docker://debian:10 image.
func (c Merge) Resources() tasks.Resources {
	res := tasks.Resources{
		CPUs:      1,
		Memory:    1,
		Time:      1,
		Container: "docker://debian:10",
	}
	return res
}

// main assembles the example workflow and runs it: two CreateInput steps
// write input1.txt and input2.txt, two ToUpper steps turn those into
// output1.txt and output2.txt, and a Merge step concatenates both into
// final.txt. Any error returned by Run is fatal.
func main() {
	var queue tasks.Queue

	// Steps are added one at a time, in the order listed.
	steps := []tasks.Commander{
		&CreateInput{Word: "hello", Output: "input1.txt"},
		&CreateInput{Word: "world", Output: "input2.txt"},
		&ToUpper{Input: "input1.txt", Output: "output1.txt"},
		&ToUpper{Input: "input2.txt", Output: "output2.txt"},
		&Merge{Inputs: []string{"output1.txt", "output2.txt"}, Output: "final.txt"},
	}
	for _, step := range steps {
		queue.Add(step)
	}

	err := queue.Run()
	if err != nil {
		log.Fatal(err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewJobreport

func NewJobreport(w io.Writer) (jobReport, error)

func ReadFOFN

func ReadFOFN(fn string) ([]string, error)

ReadFOFN reads a file of filenames and returns them as a string slice. It does not check that the files exist or that the user has permission to read them.

func RenderTemplate

func RenderTemplate(tpl string, object interface{}) string

RenderTemplate renders the text/template tpl using the data from object. It must succeed and return a string; it panics on error.

func RunWorkflow

func RunWorkflow(fn string) error

Should this be in the tasks package to make it easier for users to run workflows?

func SafeWriteConfigAs

func SafeWriteConfigAs(fn string) error

func TasksDir

func TasksDir() string

Types

type Commander

// Commander is the interface accepted by Queue.Add. A type satisfies it by
// providing the three methods below.
type Commander interface {
	// AnalysisName returns the name of the analysis as a string.
	AnalysisName() string
	// Command returns the command as a string.
	// NOTE(review): presumably a shell command line, as in the README
	// example — confirm.
	Command() string
	// Resources returns the Resources value for the task.
	Resources() Resources
}

type DummyRunner

type DummyRunner struct{}

DummyRunner does not actually run jobs, it just accepts jobs to run and always reports that they completed successfully.

func (DummyRunner) Completed

func (r DummyRunner) Completed(j *job) (bool, error)

func (DummyRunner) CompletedSuccessfully

func (r DummyRunner) CompletedSuccessfully(j *job) (bool, error)

func (DummyRunner) Kill

func (r DummyRunner) Kill(j *job) error

func (DummyRunner) ResourcesUsed

func (r DummyRunner) ResourcesUsed(j *job) (resourcesUsed, error)

func (DummyRunner) Run

func (r DummyRunner) Run(cxt executionContext) error

type LocalRunner

type LocalRunner struct {
	// contains filtered or unexported fields
}

func NewLocalRunner

func NewLocalRunner() *LocalRunner

func (*LocalRunner) Completed

func (r *LocalRunner) Completed(j *job) (bool, error)

func (*LocalRunner) CompletedSuccessfully

func (r *LocalRunner) CompletedSuccessfully(j *job) (bool, error)

func (*LocalRunner) Kill

func (r *LocalRunner) Kill(j *job) error

func (*LocalRunner) ResourcesUsed

func (r *LocalRunner) ResourcesUsed(j *job) (resourcesUsed, error)

func (*LocalRunner) Run

func (r *LocalRunner) Run(cxt executionContext) error

type Options

// Options holds settings that can be passed to Queue.Run.
// NOTE(review): the meaning and defaults of the individual fields are not
// documented here; the field names are the only guide — confirm against the
// implementation before relying on them.
type Options struct {
	Log              string
	ReportLog        string
	StartFromScratch bool
	Quiet            bool
	Tasksdir         string
	Tmpdir           string
	// JobRunner is a string. NOTE(review): presumably selects one of the
	// Runner implementations (local, PBS, Slurm, dummy) — confirm the
	// accepted values.
	JobRunner        string
	SingularityBin   string
}

type PBSRunner

type PBSRunner struct {
	// contains filtered or unexported fields
}

func NewPBSRunner

func NewPBSRunner() (*PBSRunner, error)

func (*PBSRunner) Completed

func (r *PBSRunner) Completed(j *job) (bool, error)

func (*PBSRunner) CompletedSuccessfully

func (r *PBSRunner) CompletedSuccessfully(j *job) (bool, error)

func (*PBSRunner) Kill

func (r *PBSRunner) Kill(j *job) error

func (*PBSRunner) ResourcesUsed

func (r *PBSRunner) ResourcesUsed(j *job) (resourcesUsed, error)

func (*PBSRunner) Run

func (r *PBSRunner) Run(ctx executionContext) error

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func (*Queue) Add

func (q *Queue) Add(task ...Commander)

func (*Queue) Run

func (q *Queue) Run(opts ...Options) error

func (*Queue) Tasks

func (q *Queue) Tasks() []Commander

type Resources

// Resources is the value returned by Commander.Resources (and by
// Task.Resources).
type Resources struct {
	// CPUs, Memory and Time are plain integers.
	// NOTE(review): units are not stated here (cores? GB? hours?) — confirm.
	CPUs                 int
	Memory               int
	Time                 int
	// Container is a string; the README example uses "docker://debian:10".
	Container            string
	// NOTE(review): presumably extra command-line arguments passed to
	// Singularity — confirm.
	SingularityExtraArgs string
}

type Runner

// Runner is the method set shared by the job runners in this package: it is
// provided by DummyRunner and by pointers to LocalRunner, PBSRunner and
// SlurmRunner.
type Runner interface {
	// Run(*job) error
	// NOTE(review): the line above looks like an earlier signature left
	// commented out — confirm and remove if obsolete.
	Run(executionContext) error
	// NOTE(review): presumably reports whether the job has finished — confirm.
	Completed(*job) (bool, error)
	// NOTE(review): presumably reports whether the job finished without
	// error — confirm.
	CompletedSuccessfully(*job) (bool, error)
	// ResourcesUsed returns a resourcesUsed value for the job.
	ResourcesUsed(*job) (resourcesUsed, error)
	// NOTE(review): presumably stops the given job — confirm.
	Kill(*job) error
}

type SlurmRunner

type SlurmRunner struct {
}

func NewSlurmRunner

func NewSlurmRunner() (*SlurmRunner, error)

func (*SlurmRunner) Completed

func (r *SlurmRunner) Completed(j *job) (bool, error)

func (*SlurmRunner) CompletedSuccessfully

func (r *SlurmRunner) CompletedSuccessfully(j *job) (bool, error)

func (*SlurmRunner) Kill

func (r *SlurmRunner) Kill(j *job) error

func (*SlurmRunner) ResourcesUsed

func (r *SlurmRunner) ResourcesUsed(j *job) (resourcesUsed, error)

func (*SlurmRunner) Run

func (r *SlurmRunner) Run(ctx executionContext) error

type Task

// Task supplies AnalysisName and Resources methods, so a struct that embeds
// it only has to add Command to satisfy Commander.
type Task struct {
	// NOTE(review): presumably the value returned by AnalysisName — confirm.
	Name                 string
	// The remaining fields have the same names and types as the fields of
	// Resources.
	// NOTE(review): presumably Resources() is built from them and
	// SetResources copies into them — confirm.
	CPUs                 int
	Memory               int
	Time                 int
	Container            string
	SingularityExtraArgs string
}

Task provides some default implementations for Commanders. It can be embedded in a struct to partially implement the Commander interface.

func (Task) AnalysisName

func (t Task) AnalysisName() string

func (Task) Resources

func (t Task) Resources() Resources

func (*Task) SetResources

func (t *Task) SetResources(res Resources)

type Tasks

// Tasks pairs a slice of Commanders with a string-to-string map of outputs.
// NOTE(review): how this type is produced and consumed is not shown here —
// confirm against the implementation.
type Tasks struct {
	// Commands is a slice of Commander values.
	Commands []Commander
	// Outputs maps strings to strings.
	// NOTE(review): key/value meaning is not documented — confirm.
	Outputs  map[string]string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL