github.com/scipipe/scipipe

package scipipe

v0.9.6
Published: Sep 7, 2019 | License: MIT | Module: github.com/scipipe/scipipe

Overview

Package scipipe is a library for writing scientific workflows (sometimes also called "pipelines") of shell commands that depend on each other, in the Go programming language. It was initially designed for problems in cheminformatics and bioinformatics, but should apply equally well to any domain involving complex pipelines of interdependent shell commands.

Index

Constants

const (
	// BUFSIZE is the standard buffer size used for channels connecting processes
	BUFSIZE = 128
	// Version is the SciPipe version in string format
	Version = "0.9.6"
)
const FSRootPlaceHolder = "__fsroot__"

FSRootPlaceHolder is a string to use instead of an initial '/', to indicate a path that belongs to the absolute root
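As a rough illustration of the placeholder idea, the sketch below rewrites an absolute path so that it can be nested under another directory and later restored. The helper names toPlaceholderPath and fromPlaceholderPath are hypothetical, and prepending the placeholder in front of the leading '/' is an assumption about how such a placeholder might be applied, not a statement of what scipipe does internally.

```go
package main

import (
	"fmt"
	"strings"
)

// fsRootPlaceHolder mirrors the value of FSRootPlaceHolder documented above.
const fsRootPlaceHolder = "__fsroot__"

// toPlaceholderPath is a hypothetical helper: it marks an absolute path with
// the placeholder so the result is a relative path that can be joined onto
// some other directory. Relative paths are returned unchanged.
func toPlaceholderPath(path string) string {
	if strings.HasPrefix(path, "/") {
		return fsRootPlaceHolder + path
	}
	return path
}

// fromPlaceholderPath is the hypothetical inverse: it strips the placeholder
// and thereby restores the original absolute path.
func fromPlaceholderPath(path string) string {
	if strings.HasPrefix(path, fsRootPlaceHolder+"/") {
		return strings.TrimPrefix(path, fsRootPlaceHolder)
	}
	return path
}

func main() {
	marked := toPlaceholderPath("/data/out.txt")
	fmt.Println(marked)
	fmt.Println(fromPlaceholderPath(marked))
}
```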

Variables

var (
	// Trace is a log handler for extremely detailed level logs. It is so far
	// sparsely used in scipipe.
	Trace *log.Logger
	// Debug is a log handler for debugging level logs
	Debug *log.Logger
	// Info is a log handler for information level logs
	Info *log.Logger
	// Audit is a log handler for audit level logs
	Audit *log.Logger
	// Warning is a log handler for warning level logs
	Warning *log.Logger
	// Error is a log handler for error level logs
	Error *log.Logger
)

func AtomizeIPs

func AtomizeIPs(tempExecDir string, ips ...*FileIP)

AtomizeIPs renames temporary output files/directories to their proper paths. It is called both from Task, and from Processes that implement a custom execution schedule.
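The general write-to-temp-then-rename pattern behind this kind of renaming can be sketched with the standard library alone. This is not scipipe's implementation: the helper writeThenRename and the ".tmp" suffix are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeThenRename is a hypothetical helper: it writes data to a temporary
// path first and only renames it to finalPath once the write has succeeded,
// so a file at finalPath is never a half-written one.
func writeThenRename(finalPath string, data []byte) error {
	tempPath := finalPath + ".tmp" // assumed temp naming, for illustration only
	if err := os.WriteFile(tempPath, data, 0644); err != nil {
		return err
	}
	return os.Rename(tempPath, finalPath)
}

// demo runs the helper in a fresh temporary directory and reports whether
// the final file exists and whether the temp file is gone afterwards.
func demo() (bool, bool, error) {
	dir, err := os.MkdirTemp("", "atomize-sketch")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)

	finalPath := filepath.Join(dir, "result.txt")
	if err := writeThenRename(finalPath, []byte("done\n")); err != nil {
		return false, false, err
	}
	_, errFinal := os.Stat(finalPath)
	_, errTemp := os.Stat(finalPath + ".tmp")
	return errFinal == nil, os.IsNotExist(errTemp), nil
}

func main() {
	finalExists, tempGone, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("final exists:", finalExists, "temp gone:", tempGone)
}
```

Because the rename happens only after a successful write, the presence of the final file can be used as a signal that the output is complete.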

func Check

func Check(err error)

Check checks the error err, and prints the message in the error

func CheckWithMsg

func CheckWithMsg(err error, errMsg string)

CheckWithMsg checks the error err, and prints both the original error message, and a custom one provided in errMsg

func ExecCmd

func ExecCmd(cmd string) string

ExecCmd executes the command cmd, as a shell command via bash

func Fail

func Fail(vs ...interface{})

Fail logs the error message, so that it will be possible to improve error messages in one place

func Failf

func Failf(msg string, vs ...interface{})

Failf is like Fail but with msg being a formatter string for the message and vs being items to format into the message

func InitLog

func InitLog(
	traceHandle io.Writer,
	debugHandle io.Writer,
	infoHandle io.Writer,
	auditHandle io.Writer,
	warningHandle io.Writer,
	errorHandle io.Writer)

InitLog initiates logging handlers
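One way to picture a set of per-level handlers, each backed by its own io.Writer, is the standard-library sketch below. The type levelLoggers, the constructor newLevelLoggers and the prefixes are hypothetical and only mirror the shape of the InitLog signature; scipipe's actual prefixes and flags may differ.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// levelLoggers is a hypothetical stand-in for a group of per-level loggers.
type levelLoggers struct {
	Debug, Info, Error *log.Logger
}

// newLevelLoggers builds one *log.Logger per level, each writing to its own
// io.Writer. Passing io.Discard for a level silences that level.
func newLevelLoggers(debugW, infoW, errorW io.Writer) levelLoggers {
	return levelLoggers{
		Debug: log.New(debugW, "DEBUG ", 0),
		Info:  log.New(infoW, "INFO ", 0),
		Error: log.New(errorW, "ERROR ", 0),
	}
}

// demo logs one message per level and returns what reached the info and
// error writers. The debug message is discarded.
func demo() (string, string) {
	var infoOut, errOut bytes.Buffer
	l := newLevelLoggers(io.Discard, &infoOut, &errOut)
	l.Debug.Println("not shown")
	l.Info.Println("workflow started")
	l.Error.Println("task failed")
	return infoOut.String(), errOut.String()
}

func main() {
	infoText, errText := demo()
	fmt.Print(infoText)
	fmt.Print(errText)
}
```

Choosing a log level then amounts to deciding which writers are real and which are io.Discard.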

func InitLogAudit

func InitLogAudit()

InitLogAudit initiates logging with level=AUDIT

func InitLogAuditToFile

func InitLogAuditToFile(filePath string)

InitLogAuditToFile initiates logging with level=AUDIT, and writes the log to the file at filePath

func InitLogDebug

func InitLogDebug()

InitLogDebug initiates logging with level=DEBUG

func InitLogError

func InitLogError()

InitLogError initiates logging with level=ERROR

func InitLogInfo

func InitLogInfo()

InitLogInfo initiates logging with level=INFO

func InitLogWarning

func InitLogWarning()

InitLogWarning initiates logging with level=WARNING

func LogAuditf

func LogAuditf(componentName string, message string, values ...interface{})

LogAuditf logs a pretty printed log message with the AUDIT log level, where componentName is a name of a process, task, workflow or similar that generates the message, while message and values are formatted in the manner of fmt.Printf

func LogAuditln

func LogAuditln(componentName string, message string)

LogAuditln logs a pretty printed log message with the AUDIT log level, where componentName is a name of a process, task, workflow or similar that generates the message, while message is a custom message, printed in the manner of fmt.Println.

type AuditInfo

type AuditInfo struct {
	ID          string
	ProcessName string
	Command     string
	Params      map[string]string
	Tags        map[string]string
	StartTime   time.Time
	FinishTime  time.Time
	ExecTimeNS  time.Duration
	OutFiles    map[string]string
	Upstream    map[string]*AuditInfo
}

AuditInfo contains structured audit/provenance logging information for a particular task (invocation), to go with all outgoing IPs from that task
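Since every AuditInfo points to the audit information of its upstream tasks, the provenance of a file can be followed recursively. The sketch below uses a trimmed local mirror of the struct (only ProcessName and Upstream) so that it runs without the library. The function upstreamProcessNames is hypothetical, and using file paths as the Upstream map keys is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// auditInfo is a trimmed local mirror of two of the AuditInfo fields above.
type auditInfo struct {
	ProcessName string
	Upstream    map[string]*auditInfo
}

// upstreamProcessNames returns the sorted, de-duplicated names of all
// processes reachable through Upstream, including the starting one.
func upstreamProcessNames(ai *auditInfo) []string {
	seen := map[string]bool{}
	var walk func(a *auditInfo)
	walk = func(a *auditInfo) {
		if a == nil {
			return
		}
		seen[a.ProcessName] = true
		for _, up := range a.Upstream {
			walk(up)
		}
	}
	walk(ai)

	names := make([]string, 0, len(seen))
	for name := range seen {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// exampleChain builds a small provenance chain: download -> convert -> plot.
func exampleChain() *auditInfo {
	download := &auditInfo{ProcessName: "download"}
	convert := &auditInfo{
		ProcessName: "convert",
		Upstream:    map[string]*auditInfo{"raw.csv": download}, // key is an assumed file path
	}
	return &auditInfo{
		ProcessName: "plot",
		Upstream:    map[string]*auditInfo{"clean.csv": convert},
	}
}

func main() {
	fmt.Println(upstreamProcessNames(exampleChain()))
}
```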

func NewAuditInfo

func NewAuditInfo() *AuditInfo

NewAuditInfo returns a new AuditInfo struct

func UnmarshalAuditInfoJSONFile

func UnmarshalAuditInfoJSONFile(fileName string) (auditInfo *AuditInfo)

UnmarshalAuditInfoJSONFile returns an AuditInfo object from an AuditInfo .json file

type BaseIP

type BaseIP struct {
	// contains filtered or unexported fields
}

BaseIP contains foundational functionality which all IPs need to implement. It is meant to be embedded into other IP implementations.

func NewBaseIP

func NewBaseIP(path string) *BaseIP

NewBaseIP creates a new BaseIP

func (*BaseIP) ID

func (ip *BaseIP) ID() string

ID returns a globally unique ID for the IP

type BaseProcess

type BaseProcess struct {
	// contains filtered or unexported fields
}

BaseProcess provides a skeleton for processes, such as the main Process component, and the custom components in the scipipe/components library

func NewBaseProcess

func NewBaseProcess(wf *Workflow, name string) BaseProcess

NewBaseProcess returns a new BaseProcess, connected to the provided workflow, and with the name name

func (*BaseProcess) CloseAllOutPorts

func (p *BaseProcess) CloseAllOutPorts()

CloseAllOutPorts closes all normal and parameter out-ports

func (*BaseProcess) CloseOutParamPorts

func (p *BaseProcess) CloseOutParamPorts()

CloseOutParamPorts closes all parameter out-ports

func (*BaseProcess) CloseOutPorts

func (p *BaseProcess) CloseOutPorts()

CloseOutPorts closes all (normal) out-ports

func (*BaseProcess) DeleteInParamPort

func (p *BaseProcess) DeleteInParamPort(portName string)

DeleteInParamPort deletes an InParamPort object from the process

func (*BaseProcess) DeleteInPort

func (p *BaseProcess) DeleteInPort(portName string)

DeleteInPort deletes an InPort object from the process

func (*BaseProcess) DeleteOutParamPort

func (p *BaseProcess) DeleteOutParamPort(portName string)

DeleteOutParamPort deletes an OutParamPort object from the process

func (*BaseProcess) DeleteOutPort

func (p *BaseProcess) DeleteOutPort(portName string)

DeleteOutPort deletes an OutPort object from the process

func (*BaseProcess) InParamPort

func (p *BaseProcess) InParamPort(portName string) *InParamPort

InParamPort returns the parameter port with name portName

func (*BaseProcess) InParamPorts

func (p *BaseProcess) InParamPorts() map[string]*InParamPort

InParamPorts returns all parameter in-ports of the process

func (*BaseProcess) InPort

func (p *BaseProcess) InPort(portName string) *InPort

InPort returns the in-port with name portName

func (*BaseProcess) InPorts

func (p *BaseProcess) InPorts() map[string]*InPort

InPorts returns a map of all the in-ports of the process, keyed by their names

func (*BaseProcess) InitInParamPort

func (p *BaseProcess) InitInParamPort(proc WorkflowProcess, portName string)

InitInParamPort adds a parameter in-port with name portName to the process

func (*BaseProcess) InitInPort

func (p *BaseProcess) InitInPort(proc WorkflowProcess, portName string)

InitInPort adds an in-port with name portName to the process

func (*BaseProcess) InitOutParamPort

func (p *BaseProcess) InitOutParamPort(proc WorkflowProcess, portName string)

InitOutParamPort initializes a parameter out-port with name portName on the process. We need to supply the concrete process used here as well, since this method might be used as part of an embedded struct, meaning that the process in the receiver is just the *BaseProcess, which doesn't suffice.

func (*BaseProcess) InitOutPort

func (p *BaseProcess) InitOutPort(proc WorkflowProcess, portName string)

InitOutPort adds an out-port with name portName to the process

func (*BaseProcess) Name

func (p *BaseProcess) Name() string

Name returns the name of the process

func (*BaseProcess) OutParamPort

func (p *BaseProcess) OutParamPort(portName string) *OutParamPort

OutParamPort returns the parameter port with name portName

func (*BaseProcess) OutParamPorts

func (p *BaseProcess) OutParamPorts() map[string]*OutParamPort

OutParamPorts returns all parameter out-ports of the process

func (*BaseProcess) OutPort

func (p *BaseProcess) OutPort(portName string) *OutPort

OutPort returns the out-port with name portName

func (*BaseProcess) OutPorts

func (p *BaseProcess) OutPorts() map[string]*OutPort

OutPorts returns a map of all the out-ports of the process, keyed by their names

func (*BaseProcess) Ready

func (p *BaseProcess) Ready() (isReady bool)

Ready checks whether all the process' ports are connected

func (*BaseProcess) Workflow

func (p *BaseProcess) Workflow() *Workflow

Workflow returns the workflow the process is connected to

type FileIP

type FileIP struct {
	*BaseIP

	SubStream *InPort
	// contains filtered or unexported fields
}

FileIP (IP is short for "Information Packet" in Flow-Based Programming terminology) contains information and helper methods for a physical file on a normal disk.

func NewFileIP

func NewFileIP(path string) *FileIP

NewFileIP creates a new FileIP

func (*FileIP) AddTag

func (ip *FileIP) AddTag(k string, v string)

AddTag adds the tag k with value v

func (*FileIP) AddTags

func (ip *FileIP) AddTags(tags map[string]string)

AddTags adds a map of tags to the IP's audit info

func (*FileIP) Atomize

func (ip *FileIP) Atomize()

Atomize renames the temporary file name to the final file name, which makes it possible to tell unfinished files from finished ones

func (*FileIP) AuditFilePath

func (ip *FileIP) AuditFilePath() string

AuditFilePath returns the file path of the audit info file for the FileIP

func (*FileIP) AuditInfo

func (ip *FileIP) AuditInfo() *AuditInfo

AuditInfo returns the AuditInfo struct for the FileIP

func (*FileIP) CreateFifo

func (ip *FileIP) CreateFifo()

CreateFifo creates a FIFO file for the FileIP

func (*FileIP) Exists

func (ip *FileIP) Exists() bool

Exists checks if the file exists (at its final file name)

func (*FileIP) FifoFileExists

func (ip *FileIP) FifoFileExists() bool

FifoFileExists checks if the FIFO-file (named pipe file) exists

func (*FileIP) FifoPath

func (ip *FileIP) FifoPath() string

FifoPath returns the path to use when a FIFO file is used instead of a normal file

func (*FileIP) Open

func (ip *FileIP) Open() *os.File

Open opens the file and returns a file handle (*os.File)

func (*FileIP) OpenTemp

func (ip *FileIP) OpenTemp() *os.File

OpenTemp opens the temp file and returns a file handle (*os.File)

func (*FileIP) OpenWriteTemp

func (ip *FileIP) OpenWriteTemp() *os.File

OpenWriteTemp opens the temp file for writing, and returns a file handle (*os.File)

func (*FileIP) Param

func (ip *FileIP) Param(key string) string

Param returns the parameter named key, from the IP's audit info

func (*FileIP) Path

func (ip *FileIP) Path() string

Path returns the (final) path of the physical file

func (*FileIP) Read

func (ip *FileIP) Read() []byte

Read reads the whole content of the file and returns the content as a byte array

func (*FileIP) RemoveFifo

func (ip *FileIP) RemoveFifo()

RemoveFifo removes the FIFO file, if it exists

func (*FileIP) SetAuditInfo

func (ip *FileIP) SetAuditInfo(ai *AuditInfo)

SetAuditInfo sets the AuditInfo struct for the FileIP

func (*FileIP) Size

func (ip *FileIP) Size() int64

Size returns the size of an existing file, in bytes

func (*FileIP) String

func (ip *FileIP) String() string

func (*FileIP) Tag

func (ip *FileIP) Tag(k string) string

Tag returns the value of the tag with key k from the IP's audit info

func (*FileIP) Tags

func (ip *FileIP) Tags() map[string]string

Tags returns the audit info's tags

func (*FileIP) TempDir

func (ip *FileIP) TempDir() string

TempDir returns the path to a temporary directory where outputs are written

func (*FileIP) TempFileExists

func (ip *FileIP) TempFileExists() bool

TempFileExists checks if the temp-file exists

func (*FileIP) TempPath

func (ip *FileIP) TempPath() string

TempPath returns the temporary path of the physical file

func (*FileIP) UnMarshalJSON

func (ip *FileIP) UnMarshalJSON(v interface{})

UnMarshalJSON is a helper function to unmarshal the content of the IP's file to the interface v

func (*FileIP) Write

func (ip *FileIP) Write(dat []byte)

Write writes a byte array ([]byte) to the file's temp file path

func (*FileIP) WriteAuditLogToFile

func (ip *FileIP) WriteAuditLogToFile()

WriteAuditLogToFile writes the audit log to its designated file

type IP

type IP interface {
	ID() string
	Atomize()
}

IP is the base interface which all other IPs need to adhere to

type InParamPort

type InParamPort struct {
	Chan chan string

	RemotePorts map[string]*OutParamPort
	// contains filtered or unexported fields
}

InParamPort is an in-port for parameter values of string type

func NewInParamPort

func NewInParamPort(name string) *InParamPort

NewInParamPort returns a new InParamPort

func (*InParamPort) AddRemotePort

func (pip *InParamPort) AddRemotePort(pop *OutParamPort)

AddRemotePort adds a remote OutParamPort to the InParamPort

func (*InParamPort) CloseConnection

func (pip *InParamPort) CloseConnection(popName string)

CloseConnection closes the connection to the remote out-port with name popName, on the InParamPort

func (*InParamPort) From

func (pip *InParamPort) From(pop *OutParamPort)

From connects one parameter port with another one

func (*InParamPort) FromFloat

func (pip *InParamPort) FromFloat(floats ...float64)

FromFloat feeds one or more parameters of type float64 to the param port

func (*InParamPort) FromInt

func (pip *InParamPort) FromInt(ints ...int)

FromInt feeds one or more parameters of type int to the param port

func (*InParamPort) FromStr

func (pip *InParamPort) FromStr(strings ...string)

FromStr feeds one or more parameters of type string to a port

func (*InParamPort) Name

func (pip *InParamPort) Name() string

Name returns the name of the InParamPort

func (*InParamPort) Process

func (pip *InParamPort) Process() WorkflowProcess

Process returns the process that is connected to the port

func (*InParamPort) Ready

func (pip *InParamPort) Ready() bool

Ready tells whether the port is ready or not

func (*InParamPort) Recv

func (pip *InParamPort) Recv() string

Recv receives a param value over the port's connection

func (*InParamPort) Send

func (pip *InParamPort) Send(param string)

Send sends a parameter value to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port

func (*InParamPort) SetProcess

func (pip *InParamPort) SetProcess(p WorkflowProcess)

SetProcess sets the process of the port to p

func (*InParamPort) SetReady

func (pip *InParamPort) SetReady(ready bool)

SetReady sets the ready status of the InParamPort

type InPort

type InPort struct {
	Chan chan *FileIP

	RemotePorts map[string]*OutPort
	// contains filtered or unexported fields
}

InPort represents a pluggable connection from its own process to multiple out-ports of other processes, with which it communicates via channels under the hood

func NewInPort

func NewInPort(name string) *InPort

NewInPort returns a new InPort struct

func (*InPort) AddRemotePort

func (pt *InPort) AddRemotePort(rpt *OutPort)

AddRemotePort adds a remote OutPort to the InPort

func (*InPort) CloseConnection

func (pt *InPort) CloseConnection(rptName string)

CloseConnection closes the connection to the remote out-port with name rptName, on the InPort

func (*InPort) Disconnect

func (pt *InPort) Disconnect(rptName string)

Disconnect disconnects the (out-)port with name rptName, from the InPort

func (*InPort) From

func (pt *InPort) From(rpt *OutPort)

From connects an OutPort to the InPort

func (*InPort) Name

func (pt *InPort) Name() string

Name returns the name of the InPort

func (*InPort) Process

func (pt *InPort) Process() WorkflowProcess

Process returns the process connected to the port

func (*InPort) Ready

func (pt *InPort) Ready() bool

Ready tells whether the port is ready or not

func (*InPort) Recv

func (pt *InPort) Recv() *FileIP

Recv receives IPs from the port

func (*InPort) Send

func (pt *InPort) Send(ip *FileIP)

Send sends IPs to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port

func (*InPort) SetProcess

func (pt *InPort) SetProcess(p WorkflowProcess)

SetProcess sets the process of the port to p

func (*InPort) SetReady

func (pt *InPort) SetReady(ready bool)

SetReady sets the ready status of the InPort

type OutParamPort

type OutParamPort struct {
	RemotePorts map[string]*InParamPort
	// contains filtered or unexported fields
}

OutParamPort is an out-port for parameter values of string type

func NewOutParamPort

func NewOutParamPort(name string) *OutParamPort

NewOutParamPort returns a new OutParamPort

func (*OutParamPort) AddRemotePort

func (pop *OutParamPort) AddRemotePort(pip *InParamPort)

AddRemotePort adds a remote InParamPort to the OutParamPort

func (*OutParamPort) Close

func (pop *OutParamPort) Close()

Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-port's channel will also be closed.

func (*OutParamPort) Disconnect

func (pop *OutParamPort) Disconnect(pipName string)

Disconnect disconnects the (in-)port with name pipName, from the OutParamPort

func (*OutParamPort) Name

func (pop *OutParamPort) Name() string

Name returns the name of the OutParamPort

func (*OutParamPort) Process

func (pop *OutParamPort) Process() WorkflowProcess

Process returns the process that is connected to the port

func (*OutParamPort) Ready

func (pop *OutParamPort) Ready() bool

Ready tells whether the port is ready or not

func (*OutParamPort) Send

func (pop *OutParamPort) Send(param string)

Send sends a parameter value to all the in-ports connected to the OutParamPort

func (*OutParamPort) SetProcess

func (pop *OutParamPort) SetProcess(p WorkflowProcess)

SetProcess sets the process of the port to p

func (*OutParamPort) SetReady

func (pop *OutParamPort) SetReady(ready bool)

SetReady sets the ready status of the OutParamPort

func (*OutParamPort) To

func (pop *OutParamPort) To(pip *InParamPort)

To connects an InParamPort to the OutParamPort

type OutPort

type OutPort struct {
	RemotePorts map[string]*InPort
	// contains filtered or unexported fields
}

OutPort represents a pluggable connection from its own process to multiple in-ports of other processes, with which it communicates via channels under the hood

func NewOutPort

func NewOutPort(name string) *OutPort

NewOutPort returns a new OutPort struct

func (*OutPort) AddRemotePort

func (pt *OutPort) AddRemotePort(rpt *InPort)

AddRemotePort adds a remote InPort to the OutPort

func (*OutPort) Close

func (pt *OutPort) Close()

Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-port's channel will also be closed.
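The "last connected port closes the channel" behaviour can be sketched with simplified stand-in types. The types inPort and outPort and their methods below are hypothetical and much smaller than the real port types; the buffer size of 16 is likewise arbitrary. The point is only that an in-port fed by several out-ports keeps its channel open until the final out-port has closed.

```go
package main

import "fmt"

// inPort and outPort are hypothetical, simplified stand-ins for the port
// types documented above. Each keeps a map of the ports it is connected to.
type inPort struct {
	name    string
	ch      chan string
	remotes map[string]*outPort
	closed  bool
}

type outPort struct {
	name    string
	remotes map[string]*inPort
}

func newInPort(name string) *inPort {
	return &inPort{name: name, ch: make(chan string, 16), remotes: map[string]*outPort{}}
}

func newOutPort(name string) *outPort {
	return &outPort{name: name, remotes: map[string]*inPort{}}
}

// connect registers each port with the other.
func connect(out *outPort, in *inPort) {
	out.remotes[in.name] = in
	in.remotes[out.name] = out
}

// send delivers v to every in-port connected to o.
func (o *outPort) send(v string) {
	for _, in := range o.remotes {
		in.ch <- v
	}
}

// closePort detaches o from every in-port it feeds. An in-port's channel is
// closed only when no other out-port remains connected to it.
func (o *outPort) closePort() {
	for name, in := range o.remotes {
		delete(in.remotes, o.name)
		delete(o.remotes, name)
		if len(in.remotes) == 0 {
			close(in.ch)
			in.closed = true
		}
	}
}

// demo connects two out-ports to one in-port and closes them one at a time.
func demo() (bool, bool, []string) {
	in := newInPort("in")
	a, b := newOutPort("a"), newOutPort("b")
	connect(a, in)
	connect(b, in)

	a.send("from a")
	a.closePort()
	closedAfterFirst := in.closed

	b.send("from b")
	b.closePort()
	closedAfterSecond := in.closed

	var received []string
	for v := range in.ch { // terminates because the channel is now closed
		received = append(received, v)
	}
	return closedAfterFirst, closedAfterSecond, received
}

func main() {
	first, second, received := demo()
	fmt.Println(first, second, received)
}
```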

func (*OutPort) Disconnect

func (pt *OutPort) Disconnect(rptName string)

Disconnect disconnects the (in-)port with name rptName, from the OutPort

func (*OutPort) Name

func (pt *OutPort) Name() string

Name returns the name of the OutPort

func (*OutPort) Process

func (pt *OutPort) Process() WorkflowProcess

Process returns the process connected to the port

func (*OutPort) Ready

func (pt *OutPort) Ready() bool

Ready tells whether the port is ready or not

func (*OutPort) Send

func (pt *OutPort) Send(ip *FileIP)

Send sends a FileIP to all the in-ports connected to the OutPort

func (*OutPort) SetProcess

func (pt *OutPort) SetProcess(p WorkflowProcess)

SetProcess sets the process of the port to p

func (*OutPort) SetReady

func (pt *OutPort) SetReady(ready bool)

SetReady sets the ready status of the OutPort

func (*OutPort) To

func (pt *OutPort) To(rpt *InPort)

To connects an InPort to the OutPort

type PortInfo

type PortInfo struct {
	// contains filtered or unexported fields
}

PortInfo is a container for various information about process ports

type Process

type Process struct {
	BaseProcess
	CommandPattern string
	PathFuncs      map[string]func(*Task) string
	CustomExecute  func(*Task)
	CoresPerTask   int
	Prepend        string
	Spawn          bool
	PortInfo       map[string]*PortInfo
}

Process is the central component in SciPipe after Workflow. Processes are long-running "services" that schedule and execute Tasks based on the IPs and parameters received on their in-ports and parameter ports

func NewProc

func NewProc(workflow *Workflow, name string, cmd string) *Process

NewProc returns a new Process, and initializes its ports based on the command pattern.

func (*Process) In

func (p *Process) In(portName string) *InPort

In is a short-form for InPort() (of BaseProcess), which works only on Process processes

func (*Process) InParam

func (p *Process) InParam(portName string) *InParamPort

InParam is a short-form for InParamPort() (of BaseProcess), which works only on Process processes

func (*Process) Out

func (p *Process) Out(portName string) *OutPort

Out is a short-form for OutPort() (of BaseProcess), which works only on Process processes

func (*Process) OutParam

func (p *Process) OutParam(portName string) *OutParamPort

OutParam is a short-form for OutParamPort() (of BaseProcess), which works only on Process processes

func (*Process) Run

func (p *Process) Run()

Run runs the process by instantiating and executing Tasks for all inputs and parameter values on its in-ports. In the case when there are no inputs or parameter values on the in-ports, it will run just once before it terminates. Note that the actual execution of shell commands is done inside Task.Execute, not here.

func (*Process) SetOut

func (p *Process) SetOut(outPortName string, pathPattern string)

SetOut initializes a port (if it does not already exist), and takes a configuration for its output paths via a pattern similar to the command pattern used to create new processes, with placeholder tags. Available placeholder tags are:

{i:inport_name}
{p:param_name}
{t:tag_name}

An example might be: {i:foo}.replace_with_{p:replacement}.txt, given that the process contains an in-port named 'foo' and a parameter named 'replacement'. If an out-port with the specified name does not exist, it will be created. This makes it possible to create out-ports for files that are created without explicitly stating a filename on the command line, such as when only submitting a prefix.
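The placeholder expansion described for SetOut can be sketched roughly as follows. The function expandPattern, the regular expression and the choice to leave unknown placeholders untouched are all assumptions made for illustration. The sketch handles only the three plain tag forms listed above and says nothing about how scipipe itself parses patterns.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches the three tag forms {i:name}, {p:name} and {t:name}.
var placeholder = regexp.MustCompile(`\{([ipt]):([^{}:]+)\}`)

// expandPattern is a hypothetical helper: it replaces each placeholder with
// the matching value from inPaths (i), params (p) or tags (t). Placeholders
// with no matching value are left as they are.
func expandPattern(pattern string, inPaths, params, tags map[string]string) string {
	return placeholder.ReplaceAllStringFunc(pattern, func(match string) string {
		parts := placeholder.FindStringSubmatch(match)
		kind, name := parts[1], parts[2]

		var table map[string]string
		switch kind {
		case "i":
			table = inPaths
		case "p":
			table = params
		case "t":
			table = tags
		}
		if value, ok := table[name]; ok {
			return value
		}
		return match
	})
}

func main() {
	path := expandPattern(
		"{i:foo}.replace_with_{p:replacement}.txt",
		map[string]string{"foo": "data/foo.txt"},
		map[string]string{"replacement": "bar"},
		nil,
	)
	fmt.Println(path) // prints "data/foo.txt.replace_with_bar.txt"
}
```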

func (*Process) SetOutFunc

func (p *Process) SetOutFunc(outPortName string, pathFmtFunc func(task *Task) (path string))

SetOutFunc takes a function which produces a file path based on data available in *Task, such as concrete file paths and parameter values, and uses it to produce the output path for the out-port named outPortName.

type Sink

type Sink struct {
	BaseProcess
}

Sink is a simple component that just receives IPs on its In-port without doing anything with them. It is used to drive pipelines of processes

func NewSink

func NewSink(wf *Workflow, name string) *Sink

NewSink returns a new Sink component

func (*Sink) From

func (p *Sink) From(outPort *OutPort)

From connects an out-port to the sink's in-port

func (*Sink) FromParam

func (p *Sink) FromParam(outParamPort *OutParamPort)

FromParam connects a param-out-port to the sink's param-in-port

func (*Sink) Run

func (p *Sink) Run()

Run runs the Sink process

type Task

type Task struct {
	Name          string
	Command       string
	CustomExecute func(*Task)
	InIPs         map[string]*FileIP
	OutIPs        map[string]*FileIP
	Params        map[string]string
	Tags          map[string]string
	Done          chan int

	Process *Process
	// contains filtered or unexported fields
}

Task represents a single static shell command, or Go function, to be executed, and is scheduled and managed by a corresponding Process

func NewTask

func NewTask(workflow *Workflow, process *Process, name string, cmdPat string, inIPs map[string]*FileIP, outPathFuncs map[string]func(*Task) string, portInfos map[string]*PortInfo, params map[string]string, tags map[string]string, prepend string, customExecute func(*Task), cores int) *Task

NewTask instantiates and initializes a new Task

func (*Task) Execute

func (t *Task) Execute()

Execute executes the task (the shell command or go function in CustomExecute)

func (*Task) InIP

func (t *Task) InIP(portName string) *FileIP

InIP returns an IP for the in-port with name portName

func (*Task) InPath

func (t *Task) InPath(portName string) string

InPath returns the path name of an input file for the task

func (*Task) OutIP

func (t *Task) OutIP(portName string) *FileIP

OutIP returns an IP for the out-port with name portName

func (*Task) OutPath

func (t *Task) OutPath(portName string) string

OutPath returns the path name of an output file for the task

func (*Task) Param

func (t *Task) Param(portName string) string

Param returns the value of a param, for the task

func (*Task) Tag

func (t *Task) Tag(tagName string) string

Tag returns the value of a tag, for the task

func (*Task) TempDir

func (t *Task) TempDir() string

TempDir returns a string that is unique to a task, suitable for use in file paths. It is built up by merging all input filenames and parameter values that a task takes as input, joined with dots.
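A minimal sketch of building such a per-task string is shown below. The function taskTempName is hypothetical, and sorting by port and parameter name to get a stable order is an assumption. Any prefix, escaping or length limit that scipipe applies is not covered here.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// sortedKeys returns the keys of m in sorted order, so the result below does
// not depend on Go's randomized map iteration order.
func sortedKeys(m map[string]string) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// taskTempName is a hypothetical helper: it joins all input file names and
// parameter values with dots, giving the same string for the same inputs.
func taskTempName(inPaths, params map[string]string) string {
	var parts []string
	for _, k := range sortedKeys(inPaths) {
		parts = append(parts, inPaths[k])
	}
	for _, k := range sortedKeys(params) {
		parts = append(parts, params[k])
	}
	return strings.Join(parts, ".")
}

func main() {
	name := taskTempName(
		map[string]string{"reads": "sample1.fq", "ref": "genome.fa"},
		map[string]string{"k": "31"},
	)
	fmt.Println(name) // prints "sample1.fq.genome.fa.31"
}
```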

type Workflow

type Workflow struct {
	PlotConf WorkflowPlotConf
	// contains filtered or unexported fields
}

Workflow is the centerpiece of the functionality in SciPipe, and is a container for a pipeline of processes making up a workflow. It has various methods for coordinating the execution of the pipeline as a whole, such as keeping track of the maximum number of concurrent tasks, as well as helper methods for creating new processes, which automatically get plugged in to the workflow on creation

func NewWorkflow

func NewWorkflow(name string, maxConcurrentTasks int) *Workflow

NewWorkflow returns a new Workflow

func NewWorkflowCustomLogFile

func NewWorkflowCustomLogFile(name string, maxConcurrentTasks int, logFile string) *Workflow

NewWorkflowCustomLogFile returns a new Workflow, with a custom log file path (logFile)

func (*Workflow) AddProc

func (wf *Workflow) AddProc(proc WorkflowProcess)

AddProc adds a Process to the workflow, to be run when the workflow runs

func (*Workflow) AddProcs

func (wf *Workflow) AddProcs(procs ...WorkflowProcess)

AddProcs takes one or many Processes and adds them to the workflow, to be run when the workflow runs.

func (*Workflow) DecConcurrentTasks

func (wf *Workflow) DecConcurrentTasks(slots int)

DecConcurrentTasks decreases the counter for how many concurrent tasks are currently running in the workflow

func (*Workflow) DotGraph

func (wf *Workflow) DotGraph() (dot string)

DotGraph generates a graph description in DOT format (see https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29). If Workflow.PlotConf.EdgeLabels is set to true, a label containing the in-port and out-port that each edge connects will be printed.
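To give a feel for what a DOT description of a workflow might look like, the sketch below renders a list of connections as a DOT digraph, with optional edge labels. The edge type and the dotGraph function are hypothetical, and the exact layout and label text are assumptions. The output of the real DotGraph method may be formatted differently.

```go
package main

import (
	"fmt"
	"strings"
)

// edge is a hypothetical record of one connection between two processes.
type edge struct {
	fromProc, outPort, toProc, inPort string
}

// dotGraph renders the edges as a DOT digraph. When edgeLabels is true, each
// edge is labeled with the out-port and in-port it connects.
func dotGraph(name string, edges []edge, edgeLabels bool) string {
	var b strings.Builder
	fmt.Fprintf(&b, "digraph %q {\n", name)
	for _, e := range edges {
		if edgeLabels {
			fmt.Fprintf(&b, "  %q -> %q [label=%q];\n",
				e.fromProc, e.toProc, e.outPort+" -> "+e.inPort)
		} else {
			fmt.Fprintf(&b, "  %q -> %q;\n", e.fromProc, e.toProc)
		}
	}
	b.WriteString("}\n")
	return b.String()
}

func main() {
	edges := []edge{{"download", "out", "convert", "in"}}
	fmt.Print(dotGraph("wf", edges, true))
}
```

The resulting text can be saved to a file and rendered with the graphviz dot command.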

func (*Workflow) IncConcurrentTasks

func (wf *Workflow) IncConcurrentTasks(slots int)

IncConcurrentTasks increases the counter for how many concurrent tasks are currently running in the workflow
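A counter of this kind, capped at a maximum, is commonly built as a counting semaphore on top of a buffered channel. The sketch below shows that general technique with a hypothetical limiter type. Whether scipipe implements its counter this way is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical counting semaphore: the channel's capacity is
// the maximum number of slots that can be held at the same time.
type limiter struct {
	slots chan struct{}
}

func newLimiter(max int) *limiter {
	return &limiter{slots: make(chan struct{}, max)}
}

// inc claims n slots, blocking while the limit is reached.
func (l *limiter) inc(n int) {
	for i := 0; i < n; i++ {
		l.slots <- struct{}{}
	}
}

// dec releases n slots.
func (l *limiter) dec(n int) {
	for i := 0; i < n; i++ {
		<-l.slots
	}
}

// runTasks starts numTasks dummy tasks under the limit and returns the
// highest number of tasks that were running at the same time.
func runTasks(numTasks, maxConcurrent int) int {
	lim := newLimiter(maxConcurrent)
	var mu sync.Mutex
	running, peak := 0, 0

	var wg sync.WaitGroup
	for i := 0; i < numTasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lim.inc(1)
			defer lim.dec(1)

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			// A real task would do its work here.

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runTasks(10, 2)
	fmt.Println("peak within limit:", peak <= 2)
}
```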

func (*Workflow) Name

func (wf *Workflow) Name() string

Name returns the name of the workflow

func (*Workflow) NewProc

func (wf *Workflow) NewProc(procName string, commandPattern string) *Process

NewProc returns a new process based on a commandPattern (see the documentation for scipipe.NewProc for more details about the pattern) and connects the process to the workflow

func (*Workflow) PlotGraph

func (wf *Workflow) PlotGraph(filePath string)

PlotGraph writes the workflow structure to a dot file

func (*Workflow) PlotGraphPDF

func (wf *Workflow) PlotGraphPDF(filePath string)

PlotGraphPDF writes the workflow structure to a dot file, and also runs the graphviz dot command to produce a PDF file (requires graphviz, with the dot command, installed on the system)

func (*Workflow) Proc

func (wf *Workflow) Proc(procName string) WorkflowProcess

Proc returns the process with name procName from the workflow

func (*Workflow) Procs

func (wf *Workflow) Procs() map[string]WorkflowProcess

Procs returns a map of all processes keyed by their names in the workflow

func (*Workflow) ProcsSorted

func (wf *Workflow) ProcsSorted() []WorkflowProcess

ProcsSorted returns the processes of the workflow, in an array, sorted by the process names

func (*Workflow) Run

func (wf *Workflow) Run()

Run runs all the processes of the workflow

func (*Workflow) RunTo

func (wf *Workflow) RunTo(finalProcNames ...string)

RunTo runs all processes upstream of, and including, the processes with the names provided as arguments
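Selecting "everything upstream of, and including" a set of processes is a graph traversal. The sketch below shows one way to do it over a plain map of process names. The function upstreamOf and the map representation are hypothetical and are not how Workflow stores its processes.

```go
package main

import (
	"fmt"
	"sort"
)

// upstreamOf is a hypothetical helper. The upstream map lists, for each
// process name, the names of the processes feeding directly into it. The
// result is the sorted set of the targets plus everything upstream of them.
func upstreamOf(upstream map[string][]string, targets ...string) []string {
	selected := map[string]bool{}
	var visit func(name string)
	visit = func(name string) {
		if selected[name] {
			return // already handled, also guards against revisiting shared ancestors
		}
		selected[name] = true
		for _, up := range upstream[name] {
			visit(up)
		}
	}
	for _, t := range targets {
		visit(t)
	}

	names := make([]string, 0, len(selected))
	for name := range selected {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// exampleGraph describes: download -> unzip -> align -> report, and
// unzip -> stats as a side branch.
func exampleGraph() map[string][]string {
	return map[string][]string{
		"unzip":  {"download"},
		"align":  {"unzip"},
		"stats":  {"unzip"},
		"report": {"align"},
	}
}

func main() {
	fmt.Println(upstreamOf(exampleGraph(), "align"))
}
```

In this example the side branch "stats" and the downstream "report" are left out when running to "align".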

func (*Workflow) RunToProcs

func (wf *Workflow) RunToProcs(finalProcs ...WorkflowProcess)

RunToProcs runs all processes upstream of, and including, the process structs provided as arguments

func (*Workflow) RunToRegex

func (wf *Workflow) RunToRegex(procNamePatterns ...string)

RunToRegex runs all processes upstream of, and including, the processes whose names match any of the provided regexp patterns

func (*Workflow) SetSink

func (wf *Workflow) SetSink(sink *Sink)

SetSink sets the sink of the workflow to the provided sink process

func (*Workflow) Sink

func (wf *Workflow) Sink() *Sink

Sink returns the sink process of the workflow

type WorkflowPlotConf

type WorkflowPlotConf struct {
	EdgeLabels bool
}

WorkflowPlotConf contains configuration for plotting the workflow as a graph with graphviz

type WorkflowProcess

type WorkflowProcess interface {
	Name() string
	InPorts() map[string]*InPort
	OutPorts() map[string]*OutPort
	InParamPorts() map[string]*InParamPort
	OutParamPorts() map[string]*OutParamPort
	Ready() bool
	Run()
}

WorkflowProcess is an interface for processes to be handled by Workflow

Documentation was rendered with GOOS=linux and GOARCH=amd64.
