Documentation ¶
Overview ¶
Package scipipe is a library for writing scientific workflows (sometimes also called "pipelines") of shell commands that depend on each other, in the Go programming languages. It was initially designed for problems in cheminformatics and bioinformatics, but should apply equally well to any domain involving complex pipelines of interdependent shell commands.
Index ¶
- Constants
- Variables
- func Check(err error)
- func CheckWithMsg(err error, errMsg string)
- func ExecCmd(cmd string) string
- func Fail(vs ...interface{})
- func Failf(msg string, vs ...interface{})
- func FinalizePaths(tempExecDir string, ips ...*FileIP) error
- func InitLog(traceHandle io.Writer, debugHandle io.Writer, infoHandle io.Writer, ...)
- func InitLogAudit()
- func InitLogAuditToFile(filePath string)
- func InitLogDebug()
- func InitLogError()
- func InitLogInfo()
- func InitLogWarning()
- type AuditInfo
- type BaseIP
- type BaseProcess
- func (p *BaseProcess) Audit(msg interface{})
- func (p *BaseProcess) Auditf(msg string, parts ...interface{})
- func (p *BaseProcess) CloseAllOutPorts()
- func (p *BaseProcess) CloseOutParamPorts()
- func (p *BaseProcess) CloseOutPorts()
- func (p *BaseProcess) DeleteInParamPort(portName string)
- func (p *BaseProcess) DeleteInPort(portName string)
- func (p *BaseProcess) DeleteOutParamPort(portName string)
- func (p *BaseProcess) DeleteOutPort(portName string)
- func (p *BaseProcess) Fail(msg interface{})
- func (p *BaseProcess) Failf(msg string, parts ...interface{})
- func (p *BaseProcess) InParamPort(portName string) *InParamPort
- func (p *BaseProcess) InParamPorts() map[string]*InParamPort
- func (p *BaseProcess) InPort(portName string) *InPort
- func (p *BaseProcess) InPorts() map[string]*InPort
- func (p *BaseProcess) InitInParamPort(proc WorkflowProcess, portName string)
- func (p *BaseProcess) InitInPort(proc WorkflowProcess, portName string)
- func (p *BaseProcess) InitOutParamPort(proc WorkflowProcess, portName string)
- func (p *BaseProcess) InitOutPort(proc WorkflowProcess, portName string)
- func (p *BaseProcess) Name() string
- func (p *BaseProcess) OutParamPort(portName string) *OutParamPort
- func (p *BaseProcess) OutParamPorts() map[string]*OutParamPort
- func (p *BaseProcess) OutPort(portName string) *OutPort
- func (p *BaseProcess) OutPorts() map[string]*OutPort
- func (p *BaseProcess) Ready() (isReady bool)
- func (p *BaseProcess) Workflow() *Workflow
- type FileIP
- func (ip *FileIP) AddTag(k string, v string)
- func (ip *FileIP) AddTags(tags map[string]string)
- func (ip *FileIP) AuditFilePath() string
- func (ip *FileIP) AuditInfo() *AuditInfo
- func (ip *FileIP) CreateFifo()
- func (ip *FileIP) Exists() bool
- func (ip *FileIP) Fail(msg interface{})
- func (ip *FileIP) Failf(msg string, parts ...interface{})
- func (ip *FileIP) FifoFileExists() bool
- func (ip *FileIP) FifoPath() string
- func (ip *FileIP) FinalizePath()
- func (ip *FileIP) Open() *os.File
- func (ip *FileIP) OpenTemp() *os.File
- func (ip *FileIP) Param(key string) string
- func (ip *FileIP) Path() string
- func (ip *FileIP) Read() []byte
- func (ip *FileIP) RemoveFifo()
- func (ip *FileIP) SetAuditInfo(ai *AuditInfo)
- func (ip *FileIP) Size() int64
- func (ip *FileIP) String() string
- func (ip *FileIP) Tag(k string) string
- func (ip *FileIP) Tags() map[string]string
- func (ip *FileIP) TempDir() string
- func (ip *FileIP) TempFileExists() bool
- func (ip *FileIP) TempPath() string
- func (ip *FileIP) UnMarshalJSON(v interface{})
- func (ip *FileIP) Write(dat []byte)
- func (ip *FileIP) WriteAuditLogToFile()
- type IP
- type InParamPort
- func (pip *InParamPort) AddRemotePort(pop *OutParamPort)
- func (pip *InParamPort) CloseConnection(popName string)
- func (pt *InParamPort) Fail(msg interface{})
- func (pt *InParamPort) Failf(msg string, parts ...interface{})
- func (pip *InParamPort) From(pop *OutParamPort)
- func (pip *InParamPort) FromFloat(floats ...float64)
- func (pip *InParamPort) FromInt(ints ...int)
- func (pip *InParamPort) FromStr(strings ...string)
- func (pip *InParamPort) Name() string
- func (pip *InParamPort) Process() WorkflowProcess
- func (pip *InParamPort) Ready() bool
- func (pip *InParamPort) Recv() string
- func (pip *InParamPort) Send(param string)
- func (pip *InParamPort) SetProcess(p WorkflowProcess)
- func (pip *InParamPort) SetReady(ready bool)
- type InPort
- func (pt *InPort) AddRemotePort(rpt *OutPort)
- func (pt *InPort) CloseConnection(rptName string)
- func (pt *InPort) Disconnect(rptName string)
- func (pt *InPort) Fail(msg interface{})
- func (pt *InPort) Failf(msg string, parts ...interface{})
- func (pt *InPort) From(rpt *OutPort)
- func (pt *InPort) Name() string
- func (pt *InPort) Process() WorkflowProcess
- func (pt *InPort) Ready() bool
- func (pt *InPort) Recv() *FileIP
- func (pt *InPort) Send(ip *FileIP)
- func (pt *InPort) SetProcess(p WorkflowProcess)
- func (pt *InPort) SetReady(ready bool)
- type OutParamPort
- func (pop *OutParamPort) AddRemotePort(pip *InParamPort)
- func (pop *OutParamPort) Close()
- func (pop *OutParamPort) Disconnect(pipName string)
- func (pt *OutParamPort) Fail(msg interface{})
- func (pt *OutParamPort) Failf(msg string, parts ...interface{})
- func (pop *OutParamPort) Name() string
- func (pop *OutParamPort) Process() WorkflowProcess
- func (pop *OutParamPort) Ready() bool
- func (pop *OutParamPort) Send(param string)
- func (pop *OutParamPort) SetProcess(p WorkflowProcess)
- func (pop *OutParamPort) SetReady(ready bool)
- func (pop *OutParamPort) To(pip *InParamPort)
- type OutPort
- func (pt *OutPort) AddRemotePort(rpt *InPort)
- func (pt *OutPort) Close()
- func (pt *OutPort) Disconnect(rptName string)
- func (pt *OutPort) Fail(msg interface{})
- func (pt *OutPort) Failf(msg string, parts ...interface{})
- func (pt *OutPort) Name() string
- func (pt *OutPort) Process() WorkflowProcess
- func (pt *OutPort) Ready() bool
- func (pt *OutPort) Send(ip *FileIP)
- func (pt *OutPort) SetProcess(p WorkflowProcess)
- func (pt *OutPort) SetReady(ready bool)
- func (pt *OutPort) To(rpt *InPort)
- type PortInfo
- type Process
- func (p *Process) In(portName string) *InPort
- func (p *Process) InParam(portName string) *InParamPort
- func (p *Process) Out(portName string) *OutPort
- func (p *Process) OutParam(portName string) *OutParamPort
- func (p *Process) Run()
- func (p *Process) SetOut(outPortName string, pathPattern string)
- func (p *Process) SetOutFunc(outPortName string, pathFmtFunc func(task *Task) (path string))
- type Sink
- type Task
- func (t *Task) Audit(msg string)
- func (t *Task) Auditf(msg string, parts ...interface{})
- func (t *Task) Execute()
- func (t *Task) Fail(msg interface{})
- func (t *Task) Failf(msg string, parts ...interface{})
- func (t *Task) InIP(portName string) *FileIP
- func (t *Task) InPath(portName string) string
- func (t *Task) OutIP(portName string) *FileIP
- func (t *Task) OutPath(portName string) string
- func (t *Task) Param(portName string) string
- func (t *Task) Tag(tagName string) string
- func (t *Task) TempDir() string
- type Workflow
- func (wf *Workflow) AddProc(proc WorkflowProcess)
- func (wf *Workflow) AddProcs(procs ...WorkflowProcess)
- func (wf *Workflow) Auditf(msg string, parts ...interface{})
- func (wf *Workflow) DecConcurrentTasks(slots int)
- func (wf *Workflow) DotGraph() (dot string)
- func (wf *Workflow) Fail(msg interface{})
- func (wf *Workflow) Failf(msg string, parts ...interface{})
- func (wf *Workflow) IncConcurrentTasks(slots int)
- func (wf *Workflow) Name() string
- func (wf *Workflow) NewProc(procName string, commandPattern string) *Process
- func (wf *Workflow) PlotGraph(filePath string)
- func (wf *Workflow) PlotGraphPDF(filePath string)
- func (wf *Workflow) Proc(procName string) WorkflowProcess
- func (wf *Workflow) Procs() map[string]WorkflowProcess
- func (wf *Workflow) ProcsSorted() []WorkflowProcess
- func (wf *Workflow) Run()
- func (wf *Workflow) RunTo(finalProcNames ...string)
- func (wf *Workflow) RunToProcs(finalProcs ...WorkflowProcess)
- func (wf *Workflow) RunToRegex(procNamePatterns ...string)
- func (wf *Workflow) SetSink(sink *Sink)
- func (wf *Workflow) Sink() *Sink
- type WorkflowPlotConf
- type WorkflowProcess
Constants ¶
const FSRootPlaceHolder = "__fsroot__"
FSRootPlaceHolder is a string to use instead of an initial '/', to indicate a path that belongs to the absolute root
const (
// Version is the SciPipe version in string format
Version = "0.12.0"
)
Variables ¶
var ( // Trace is a log handler for extremely detailed level logs. It is so far // sparely used in scipipe. Trace *log.Logger // Debug is a log handler for debugging level logs Debug *log.Logger // Info is a log handler for information level logs Info *log.Logger // Audit is a log handler for audit level logs Audit *log.Logger // Warning is a log handler for warning level logs Warning *log.Logger // Error is a log handler for error level logs Error *log.Logger )
var (
BUFSIZE = 128
)
Functions ¶
func CheckWithMsg ¶ added in v0.6.1
CheckWithMsg checks the error err, and prints both the original error message, and a custom one provided in errMsg
func Fail ¶ added in v0.6.1
func Fail(vs ...interface{})
Fail logs the error message, so that it will be possible to improve error messages in one place
func Failf ¶ added in v0.6.1
func Failf(msg string, vs ...interface{})
Failf is like Fail but with msg being a formatter string for the message and vs being items to format into the message
func FinalizePaths ¶ added in v0.12.0
FinalizePaths renames temporary output files/directories to their proper paths. It is called both from Task, and from Process that implement cutom execution schedule.
func InitLog ¶
func InitLog( traceHandle io.Writer, debugHandle io.Writer, infoHandle io.Writer, auditHandle io.Writer, warningHandle io.Writer, errorHandle io.Writer)
InitLog initiates logging handlers
func InitLogAuditToFile ¶ added in v0.6.4
func InitLogAuditToFile(filePath string)
InitLogAuditToFile initiate logging with level=AUDIT, and write that to fileName
Types ¶
type AuditInfo ¶
type AuditInfo struct { ID string ProcessName string Command string Params map[string]string Tags map[string]string StartTime time.Time FinishTime time.Time ExecTimeNS time.Duration OutFiles map[string]string Upstream map[string]*AuditInfo }
AuditInfo contains structured audit/provenance logging information for a particular task (invocation), to go with all outgoing IPs from that task
func UnmarshalAuditInfoJSONFile ¶ added in v0.8.0
UnmarshalAuditInfoJSONFile returns an AuditInfo object from an AuditInfo .json file
type BaseIP ¶ added in v0.6.1
type BaseIP struct {
// contains filtered or unexported fields
}
BaseIP contains foundational functionality which all IPs need to implement. It is meant to be embedded into other IP implementations.
type BaseProcess ¶ added in v0.5.1
type BaseProcess struct {
// contains filtered or unexported fields
}
BaseProcess provides a skeleton for processes, such as the main Process component, and the custom components in the scipipe/components library
func NewBaseProcess ¶ added in v0.5.1
func NewBaseProcess(wf *Workflow, name string) BaseProcess
NewBaseProcess returns a new BaseProcess, connected to the provided workflow, and with the name name
func (*BaseProcess) Audit ¶ added in v0.12.0
func (p *BaseProcess) Audit(msg interface{})
func (*BaseProcess) Auditf ¶ added in v0.12.0
func (p *BaseProcess) Auditf(msg string, parts ...interface{})
func (*BaseProcess) CloseAllOutPorts ¶ added in v0.6.1
func (p *BaseProcess) CloseAllOutPorts()
CloseAllOutPorts closes all normal-, and parameter out ports
func (*BaseProcess) CloseOutParamPorts ¶ added in v0.8.0
func (p *BaseProcess) CloseOutParamPorts()
CloseOutParamPorts closes all parameter out-ports
func (*BaseProcess) CloseOutPorts ¶ added in v0.6.1
func (p *BaseProcess) CloseOutPorts()
CloseOutPorts closes all (normal) out-ports
func (*BaseProcess) DeleteInParamPort ¶ added in v0.8.0
func (p *BaseProcess) DeleteInParamPort(portName string)
DeleteInParamPort deletes a InParamPort object from the process
func (*BaseProcess) DeleteInPort ¶ added in v0.5.1
func (p *BaseProcess) DeleteInPort(portName string)
DeleteInPort deletes an InPort object from the process
func (*BaseProcess) DeleteOutParamPort ¶ added in v0.8.0
func (p *BaseProcess) DeleteOutParamPort(portName string)
DeleteOutParamPort deletes a OutParamPort object from the process
func (*BaseProcess) DeleteOutPort ¶ added in v0.5.1
func (p *BaseProcess) DeleteOutPort(portName string)
DeleteOutPort deletes a OutPort object from the process
func (*BaseProcess) Fail ¶ added in v0.11.0
func (p *BaseProcess) Fail(msg interface{})
Fail fails with a message that includes the process name
func (*BaseProcess) Failf ¶ added in v0.10.2
func (p *BaseProcess) Failf(msg string, parts ...interface{})
Failf fails with a message that includes the process name
func (*BaseProcess) InParamPort ¶ added in v0.8.0
func (p *BaseProcess) InParamPort(portName string) *InParamPort
InParamPort returns the parameter port with name portName
func (*BaseProcess) InParamPorts ¶ added in v0.8.0
func (p *BaseProcess) InParamPorts() map[string]*InParamPort
InParamPorts returns all parameter in-ports of the process
func (*BaseProcess) InPort ¶ added in v0.5.1
func (p *BaseProcess) InPort(portName string) *InPort
InPort returns the in-port with name portName
func (*BaseProcess) InPorts ¶ added in v0.5.1
func (p *BaseProcess) InPorts() map[string]*InPort
InPorts returns a map of all the in-ports of the process, keyed by their names
func (*BaseProcess) InitInParamPort ¶ added in v0.8.0
func (p *BaseProcess) InitInParamPort(proc WorkflowProcess, portName string)
InitInParamPort adds the parameter port paramPort with name portName
func (*BaseProcess) InitInPort ¶ added in v0.5.1
func (p *BaseProcess) InitInPort(proc WorkflowProcess, portName string)
InitInPort adds the in-port port to the process, with name portName
func (*BaseProcess) InitOutParamPort ¶ added in v0.8.0
func (p *BaseProcess) InitOutParamPort(proc WorkflowProcess, portName string)
InitOutParamPort initializes the parameter port paramPort with name portName to the process We need to supply the concrete process used here as well, since this method might be used as part of an embedded struct, meaning that the process in the receiver is just the *BaseProcess, which doesn't suffice.
func (*BaseProcess) InitOutPort ¶ added in v0.5.1
func (p *BaseProcess) InitOutPort(proc WorkflowProcess, portName string)
InitOutPort adds the out-port port to the process, with name portName
func (*BaseProcess) Name ¶ added in v0.5.1
func (p *BaseProcess) Name() string
Name returns the name of the process
func (*BaseProcess) OutParamPort ¶ added in v0.8.0
func (p *BaseProcess) OutParamPort(portName string) *OutParamPort
OutParamPort returns the parameter port with name portName
func (*BaseProcess) OutParamPorts ¶ added in v0.8.0
func (p *BaseProcess) OutParamPorts() map[string]*OutParamPort
OutParamPorts returns all parameter out-ports of the process
func (*BaseProcess) OutPort ¶ added in v0.5.1
func (p *BaseProcess) OutPort(portName string) *OutPort
OutPort returns the out-port with name portName
func (*BaseProcess) OutPorts ¶ added in v0.5.1
func (p *BaseProcess) OutPorts() map[string]*OutPort
OutPorts returns a map of all the out-ports of the process, keyed by their names
func (*BaseProcess) Ready ¶ added in v0.8.0
func (p *BaseProcess) Ready() (isReady bool)
Ready checks whether all the process' ports are connected
func (*BaseProcess) Workflow ¶ added in v0.5.1
func (p *BaseProcess) Workflow() *Workflow
Workflow returns the workflow the process is connected to
type FileIP ¶ added in v0.6.1
FileIP (Short for "Information Packet" in Flow-Based Programming terminology) contains information and helper methods for a physical file on a normal disk.
func (*FileIP) AuditFilePath ¶ added in v0.6.1
AuditFilePath returns the file path of the audit info file for the FileIP
func (*FileIP) CreateFifo ¶ added in v0.6.1
func (ip *FileIP) CreateFifo()
CreateFifo creates a FIFO file for the FileIP
func (*FileIP) FifoFileExists ¶ added in v0.6.1
FifoFileExists checks if the FIFO-file (named pipe file) exists
func (*FileIP) FifoPath ¶ added in v0.6.1
FifoPath returns the path to use when a FIFO file is used instead of a normal file
func (*FileIP) FinalizePath ¶ added in v0.12.0
func (ip *FileIP) FinalizePath()
FinalizePath renames the temporary file name to the final file name, thus enabling to separate unfinished, and finished files
func (*FileIP) OpenTemp ¶ added in v0.6.1
OpenTemp opens the temp file and returns a file handle (*os.File)
func (*FileIP) Param ¶ added in v0.6.1
Param returns the parameter named key, from the IPs audit info
func (*FileIP) Read ¶ added in v0.6.1
Read reads the whole content of the file and returns the content as a byte array
func (*FileIP) RemoveFifo ¶ added in v0.6.1
func (ip *FileIP) RemoveFifo()
RemoveFifo removes the FIFO file, if it exists
func (*FileIP) SetAuditInfo ¶ added in v0.6.1
SetAuditInfo sets the AuditInfo struct for the FileIP
func (*FileIP) Tag ¶ added in v0.8.0
Tag returns the tag for the tag with key k from the IPs audit info
func (*FileIP) TempDir ¶ added in v0.8.0
TempDir returns the path to a temporary directory where outputs are written
func (*FileIP) TempFileExists ¶ added in v0.6.1
TempFileExists checks if the temp-file exists
func (*FileIP) UnMarshalJSON ¶ added in v0.6.1
func (ip *FileIP) UnMarshalJSON(v interface{})
UnMarshalJSON is a helper function to unmarshal the content of the IPs file to the interface v
func (*FileIP) Write ¶ added in v0.6.1
Write writes a byte array ([]byte) to the file's temp file path
func (*FileIP) WriteAuditLogToFile ¶ added in v0.6.1
func (ip *FileIP) WriteAuditLogToFile()
WriteAuditLogToFile writes the audit log to its designated file
type IP ¶ added in v0.5.1
type IP interface { ID() string FinalizePath() }
IP Is the base interface which all other IPs need to adhere to
type InParamPort ¶ added in v0.8.0
type InParamPort struct { Chan chan string RemotePorts map[string]*OutParamPort // contains filtered or unexported fields }
InParamPort is an in-port for parameter values of string type
func NewInParamPort ¶ added in v0.8.0
func NewInParamPort(name string) *InParamPort
NewInParamPort returns a new InParamPort
func (*InParamPort) AddRemotePort ¶ added in v0.8.0
func (pip *InParamPort) AddRemotePort(pop *OutParamPort)
AddRemotePort adds a remote OutParamPort to the InParamPort
func (*InParamPort) CloseConnection ¶ added in v0.8.0
func (pip *InParamPort) CloseConnection(popName string)
CloseConnection closes the connection to the remote out-port with name popName, on the InParamPort
func (*InParamPort) Fail ¶ added in v0.12.0
func (pt *InParamPort) Fail(msg interface{})
Fail fails with a message that includes the process name
func (*InParamPort) Failf ¶ added in v0.12.0
func (pt *InParamPort) Failf(msg string, parts ...interface{})
Failf fails with a message that includes the process name
func (*InParamPort) From ¶ added in v0.8.0
func (pip *InParamPort) From(pop *OutParamPort)
From connects one parameter port with another one
func (*InParamPort) FromFloat ¶ added in v0.8.0
func (pip *InParamPort) FromFloat(floats ...float64)
FromFloat feeds one or more parameters of type float64 to the param port
func (*InParamPort) FromInt ¶ added in v0.8.0
func (pip *InParamPort) FromInt(ints ...int)
FromInt feeds one or more parameters of type int to the param port
func (*InParamPort) FromStr ¶ added in v0.8.0
func (pip *InParamPort) FromStr(strings ...string)
FromStr feeds one or more parameters of type string to a port
func (*InParamPort) Name ¶ added in v0.8.0
func (pip *InParamPort) Name() string
Name returns the name of the InParamPort
func (*InParamPort) Process ¶ added in v0.8.0
func (pip *InParamPort) Process() WorkflowProcess
Process returns the process that is connected to the port
func (*InParamPort) Ready ¶ added in v0.8.0
func (pip *InParamPort) Ready() bool
Ready tells whether the port is ready or not
func (*InParamPort) Recv ¶ added in v0.8.0
func (pip *InParamPort) Recv() string
Recv receiveds a param value over the ports connection
func (*InParamPort) Send ¶ added in v0.8.0
func (pip *InParamPort) Send(param string)
Send sends IPs to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port
func (*InParamPort) SetProcess ¶ added in v0.8.0
func (pip *InParamPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
func (*InParamPort) SetReady ¶ added in v0.8.0
func (pip *InParamPort) SetReady(ready bool)
SetReady sets the ready status of the InParamPort
type InPort ¶ added in v0.5.1
type InPort struct { Chan chan *FileIP RemotePorts map[string]*OutPort // contains filtered or unexported fields }
InPort represents a pluggable connection to multiple out-ports from other processes, from its own process, and with which it is communicating via channels under the hood
func (*InPort) AddRemotePort ¶ added in v0.5.1
AddRemotePort adds a remote OutPort to the InPort
func (*InPort) CloseConnection ¶ added in v0.5.1
CloseConnection closes the connection to the remote out-port with name rptName, on the InPort
func (*InPort) Disconnect ¶ added in v0.5.1
Disconnect disconnects the (out-)port with name rptName, from the InPort
func (*InPort) Fail ¶ added in v0.12.0
func (pt *InPort) Fail(msg interface{})
Fail fails with a message that includes the process name
func (*InPort) Process ¶ added in v0.5.1
func (pt *InPort) Process() WorkflowProcess
Process returns the process connected to the port
func (*InPort) Send ¶ added in v0.5.1
Send sends IPs to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port
func (*InPort) SetProcess ¶ added in v0.5.1
func (pt *InPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
type OutParamPort ¶ added in v0.8.0
type OutParamPort struct { RemotePorts map[string]*InParamPort // contains filtered or unexported fields }
OutParamPort is an out-port for parameter values of string type
func NewOutParamPort ¶ added in v0.8.0
func NewOutParamPort(name string) *OutParamPort
NewOutParamPort returns a new OutParamPort
func (*OutParamPort) AddRemotePort ¶ added in v0.8.0
func (pop *OutParamPort) AddRemotePort(pip *InParamPort)
AddRemotePort adds a remote InParamPort to the OutParamPort
func (*OutParamPort) Close ¶ added in v0.8.0
func (pop *OutParamPort) Close()
Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-ports channel will also be closed.
func (*OutParamPort) Disconnect ¶ added in v0.8.0
func (pop *OutParamPort) Disconnect(pipName string)
Disconnect disonnects the (in-)port with name rptName, from the OutParamPort
func (*OutParamPort) Fail ¶ added in v0.12.0
func (pt *OutParamPort) Fail(msg interface{})
Fail fails with a message that includes the process name
func (*OutParamPort) Failf ¶ added in v0.12.0
func (pt *OutParamPort) Failf(msg string, parts ...interface{})
Failf fails with a message that includes the process name
func (*OutParamPort) Name ¶ added in v0.8.0
func (pop *OutParamPort) Name() string
Name returns the name of the OutParamPort
func (*OutParamPort) Process ¶ added in v0.8.0
func (pop *OutParamPort) Process() WorkflowProcess
Process returns the process that is connected to the port
func (*OutParamPort) Ready ¶ added in v0.8.0
func (pop *OutParamPort) Ready() bool
Ready tells whether the port is ready or not
func (*OutParamPort) Send ¶ added in v0.8.0
func (pop *OutParamPort) Send(param string)
Send sends an FileIP to all the in-ports connected to the OutParamPort
func (*OutParamPort) SetProcess ¶ added in v0.8.0
func (pop *OutParamPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
func (*OutParamPort) SetReady ¶ added in v0.8.0
func (pop *OutParamPort) SetReady(ready bool)
SetReady sets the ready status of the OutParamPort
func (*OutParamPort) To ¶ added in v0.8.0
func (pop *OutParamPort) To(pip *InParamPort)
To connects an InParamPort to the OutParamPort
type OutPort ¶ added in v0.5.1
OutPort represents a pluggable connection to multiple in-ports from other processes, from its own process, and with which it is communicating via channels under the hood
func NewOutPort ¶ added in v0.5.1
NewOutPort returns a new OutPort struct
func (*OutPort) AddRemotePort ¶ added in v0.5.1
AddRemotePort adds a remote InPort to the OutPort
func (*OutPort) Close ¶ added in v0.5.1
func (pt *OutPort) Close()
Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-ports channel will also be closed.
func (*OutPort) Disconnect ¶ added in v0.5.1
Disconnect disconnects the (in-)port with name rptName, from the OutPort
func (*OutPort) Fail ¶ added in v0.12.0
func (pt *OutPort) Fail(msg interface{})
Fail fails with a message that includes the process name
func (*OutPort) Process ¶ added in v0.5.1
func (pt *OutPort) Process() WorkflowProcess
Process returns the process connected to the port
func (*OutPort) Send ¶ added in v0.5.1
Send sends an FileIP to all the in-ports connected to the OutPort
func (*OutPort) SetProcess ¶ added in v0.5.1
func (pt *OutPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
type PortInfo ¶ added in v0.8.0
type PortInfo struct {
// contains filtered or unexported fields
}
PortInfo is a container for various information about process ports
type Process ¶
type Process struct { BaseProcess CommandPattern string PathFuncs map[string]func(*Task) string CustomExecute func(*Task) CoresPerTask int Prepend string Spawn bool PortInfo map[string]*PortInfo }
Process is the central component in SciPipe after Workflow. Processes are long-running "services" that schedules and executes Tasks based on the IPs and parameters received on its in-ports and parameter ports
func NewProc ¶
NewProc returns a new Process, and initializes its ports based on the command pattern.
func (*Process) In ¶ added in v0.5.1
In is a short-form for InPort() (of BaseProcess), which works only on Process processes
func (*Process) InParam ¶ added in v0.8.0
func (p *Process) InParam(portName string) *InParamPort
InParam is a short-form for InParamPort() (of BaseProcess), which works only on Process processes
func (*Process) Out ¶ added in v0.5.1
Out is a short-form for OutPort() (of BaseProcess), which works only on Process processes
func (*Process) OutParam ¶ added in v0.8.0
func (p *Process) OutParam(portName string) *OutParamPort
OutParam is a short-form for OutParamPort() (of BaseProcess), which works only on Process processes
func (*Process) Run ¶
func (p *Process) Run()
Run runs the process by instantiating and executing Tasks for all inputs and parameter values on its in-ports. in the case when there are no inputs or parameter values on the in-ports, it will run just once before it terminates. note that the actual execution of shell commands are done inside Task.Execute, not here.
func (*Process) SetOut ¶ added in v0.8.0
SetOut initializes a port (if it does not already exist), and takes a configuration for its outputs paths via a pattern similar to the command pattern used to create new processes, with placeholder tags. Available placeholder tags to use are: {i:inport_name} {p:param_name} {t:tag_name} An example might be: {i:foo}.replace_with_{p:replacement}.txt ... given that the process contains an in-port named 'foo', and a parameter named 'replacement'. If an out-port with the specified name does not exist, it will be created. This allows to create out-ports for filenames that are created without explicitly stating a filename on the commandline, such as when only submitting a prefix.
type Sink ¶
type Sink struct {
BaseProcess
}
Sink is a simple component that just receives IPs on its In-port without doing anything with them. It is used to drive pipelines of processes
func (*Sink) FromParam ¶ added in v0.8.0
func (p *Sink) FromParam(outParamPort *OutParamPort)
FromParam connects a param-out-port to the sinks param-in-port
type Task ¶ added in v0.5.1
type Task struct { Name string Command string CustomExecute func(*Task) InIPs map[string]*FileIP OutIPs map[string]*FileIP Params map[string]string Tags map[string]string Done chan int Process *Process // contains filtered or unexported fields }
Task represents a single static shell command, or go function, to be executed, and are scheduled and managed by a corresponding Process
func NewTask ¶ added in v0.5.1
func NewTask(workflow *Workflow, process *Process, name string, cmdPat string, inIPs map[string]*FileIP, outPathFuncs map[string]func(*Task) string, portInfos map[string]*PortInfo, params map[string]string, tags map[string]string, prepend string, customExecute func(*Task), cores int) *Task
NewTask instantiates and initializes a new Task
func (*Task) Execute ¶ added in v0.5.1
func (t *Task) Execute()
Execute executes the task (the shell command or go function in CustomExecute)
type Workflow ¶
type Workflow struct { PlotConf WorkflowPlotConf // contains filtered or unexported fields }
Workflow is the centerpiece of the functionality in SciPipe, and is a container for a pipeline of processes making up a workflow. It has various methods for coordination the execution of the pipeline as a whole, such as keeping track of the maxiumum number of concurrent tasks, as well as helper methods for creating new processes, that automatically gets plugged in to the workflow on creation
func NewWorkflow ¶
NewWorkflow returns a new Workflow
func NewWorkflowCustomLogFile ¶ added in v0.6.4
NewWorkflowCustomLogFile returns a new Workflow, with
func (*Workflow) AddProc ¶
func (wf *Workflow) AddProc(proc WorkflowProcess)
AddProc adds a Process to the workflow, to be run when the workflow runs
func (*Workflow) AddProcs ¶
func (wf *Workflow) AddProcs(procs ...WorkflowProcess)
AddProcs takes one or many Processes and adds them to the workflow, to be run when the workflow runs.
func (*Workflow) DecConcurrentTasks ¶
DecConcurrentTasks decreases the conter for how many concurrent tasks are currently running in the workflow
func (*Workflow) DotGraph ¶ added in v0.8.0
DotGraph generates a graph description in DOT format (See https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29) If Workflow.PlotConf.EdgeLabels is set to true, a label containing the in-port and out-port to which edges are connected to, will be printed.
func (*Workflow) IncConcurrentTasks ¶
IncConcurrentTasks increases the conter for how many concurrent tasks are currently running in the workflow
func (*Workflow) NewProc ¶
NewProc returns a new process based on a commandPattern (See the documentation for scipipe.NewProcess for more details about the pattern) and connects the process to the workflow
func (*Workflow) PlotGraphPDF ¶ added in v0.8.0
PlotGraphPDF writes the workflow structure to a dot file, and also runs the graphviz dot command to produce a PDF file (requires graphviz, with the dot command, installed on the system)
func (*Workflow) Proc ¶
func (wf *Workflow) Proc(procName string) WorkflowProcess
Proc returns the process with name procName from the workflow
func (*Workflow) Procs ¶
func (wf *Workflow) Procs() map[string]WorkflowProcess
Procs returns a map of all processes keyed by their names in the workflow
func (*Workflow) ProcsSorted ¶ added in v0.8.0
func (wf *Workflow) ProcsSorted() []WorkflowProcess
ProcsSorted returns the processes of the workflow, in an array, sorted by the process names
func (*Workflow) RunTo ¶ added in v0.5.1
RunTo runs all processes upstream of, and including, the process with names provided as arguments
func (*Workflow) RunToProcs ¶ added in v0.5.1
func (wf *Workflow) RunToProcs(finalProcs ...WorkflowProcess)
RunToProcs runs all processes upstream of, and including, the process strucs provided as arguments
func (*Workflow) RunToRegex ¶ added in v0.6.1
RunToRegex runs all processes upstream of, and including, the process whose name matches any of the provided regexp patterns
type WorkflowPlotConf ¶ added in v0.8.0
type WorkflowPlotConf struct {
EdgeLabels bool
}
WorkflowPlotConf contains configuraiton for plotting the workflow as a graph with graphviz
type WorkflowProcess ¶ added in v0.5.1
type WorkflowProcess interface { Name() string InPorts() map[string]*InPort OutPorts() map[string]*OutPort InParamPorts() map[string]*InParamPort OutParamPorts() map[string]*OutParamPort Ready() bool Run() Fail(interface{}) Failf(string, ...interface{}) }
WorkflowProcess is an interface for processes to be handled by Workflow
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
cmd
|
|
examples
|
|
resequencing
Implementation (work in progress) of the resequencing analysis pipeline used to teach the introductory NGS bioinformatics analysis course at SciLifeLab as described on this page: https://scilifelab.github.io/courses/ngsintro/1502/labs/resequencing-analysis Prerequisites: Samtools, BWA, Picard, GATK.
|
Implementation (work in progress) of the resequencing analysis pipeline used to teach the introductory NGS bioinformatics analysis course at SciLifeLab as described on this page: https://scilifelab.github.io/courses/ngsintro/1502/labs/resequencing-analysis Prerequisites: Samtools, BWA, Picard, GATK. |
subworkflow
An example that shows how to create a sub-network / sub-workflow that can be used as a component
|
An example that shows how to create a sub-network / sub-workflow that can be used as a component |