README
Robust, flexible and resource-efficient pipelines using Go and the commandline
Project links: Documentation & Main Website | Issue Tracker | Chat
Why SciPipe?
- Intuitive: SciPipe works by flowing data through a network of channels and processes
- Flexible: Wrapped command-line programs can be combined with processes in Go
- Convenient: Full control over how your files are named
- Efficient: Workflows are compiled to binary code that runs fast
- Parallel: Pipeline parallelism between processes as well as task parallelism for multiple inputs, making efficient use of multiple CPU cores
- Supports streaming: Stream data between programs to avoid wasting disk space
- Easy to debug: Use available Go debugging tools or just println()
- Portable: Distribute workflows as Go code or as self-contained executable files
Project updates
- Jan 2020: New screencast: "Hello World" scientific workflow in SciPipe
- May 2019: The SciPipe paper published open access in GigaScience: SciPipe: A workflow library for agile development of complex and dynamic bioinformatics pipelines
- Nov 2018: Scientific study using SciPipe: Predicting off-target binding profiles with confidence using Conformal Prediction
- Slides: Presentation on SciPipe and more at Go Stockholm Conference
- Blog post: Provenance reports in Scientific Workflows - going into details about how SciPipe is addressing provenance.
- Blog post: First production workflow run with SciPipe
Introduction

SciPipe is a library for writing Scientific Workflows, sometimes also called "pipelines", in the Go programming language.
When you need to run many commandline programs that depend on each other in complex ways, SciPipe helps by making the process of running these programs flexible, robust and reproducible. SciPipe also lets you restart an interrupted run without over-writing already produced output and produces an audit report of what was run, among many other things.
SciPipe is built on the proven principles of Flow-Based Programming (FBP) to achieve maximum flexibility, productivity and agility when designing workflows. Compared to plain dataflow, FBP provides the benefits that processes are fully self-contained, so that a library of re-usable components can be created, and plugged into new workflows ad-hoc.
Similar to other FBP systems, SciPipe workflows can be likened to a network of assembly lines in a factory, where items (files) are flowing through a network of conveyor belts, stopping at different independently running stations (processes) for processing, as depicted in the picture above.
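The assembly-line picture maps naturally onto goroutines connected by bounded channels. The following is a minimal sketch in plain Go of that idea, not SciPipe's own implementation; the function names `source`, `upper` and `collect` are hypothetical and chosen only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// source emits each item on its out-channel and then closes it.
func source(items []string) <-chan string {
	out := make(chan string, 2) // bounded buffer, like a short conveyor belt
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// upper is an independently running station that transforms each item.
func upper(in <-chan string) <-chan string {
	out := make(chan string, 2)
	go func() {
		defer close(out)
		for it := range in {
			out <- strings.ToUpper(it)
		}
	}()
	return out
}

// collect drains the end of the network and returns everything that arrived.
func collect(in <-chan string) []string {
	var got []string
	for it := range in {
		got = append(got, it)
	}
	return got
}

func main() {
	// Wire the network: source -> upper -> collect.
	fmt.Println(collect(upper(source([]string{"a.txt", "b.txt"}))))
}
```

Each station only knows about its own in- and out-channels, which is what makes such components re-usable in different networks.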
SciPipe was initially created for problems in bioinformatics and cheminformatics, but works equally well for any problem involving pipelines of commandline applications.
Project status: SciPipe is pretty stable now, and only very minor API changes might still occur. We have successfully used SciPipe in a handful of both real and experimental projects, and it has had occasional use outside the research group as well.
Known limitations
- There are still a number of missing good-to-have features for workflow design. See the issue tracker for details.
- There is not (yet) support for the Common Workflow Language.
Hello World example
Let's look at an example workflow to get a feel for what writing workflows in SciPipe looks like:
package main

import (
	// Import SciPipe, aliased to sp
	sp "github.com/scipipe/scipipe"
)

func main() {
	// Init workflow and max concurrent tasks
	wf := sp.NewWorkflow("hello_world", 4)

	// Initialize processes, and file extensions
	hello := wf.NewProc("hello", "echo 'Hello ' > {o:out|.txt}")
	world := wf.NewProc("world", "echo $(cat {i:in}) World > {o:out|.txt}")

	// Define data flow
	world.In("in").From(hello.Out("out"))

	// Run workflow
	wf.Run()
}
Running the example
Let's put the code in a file named hello_world.go and run it:
$ go run hello_world.go
AUDIT 2018/07/17 21:42:26 | workflow:hello_world | Starting workflow (Writing log to log/scipipe-20180717-214226-hello_world.log)
AUDIT 2018/07/17 21:42:26 | hello | Executing: echo 'Hello ' > hello.out.txt
AUDIT 2018/07/17 21:42:26 | hello | Finished: echo 'Hello ' > hello.out.txt
AUDIT 2018/07/17 21:42:26 | world | Executing: echo $(cat ../hello.out.txt) World > hello.out.txt.world.out.txt
AUDIT 2018/07/17 21:42:26 | world | Finished: echo $(cat ../hello.out.txt) World > hello.out.txt.world.out.txt
AUDIT 2018/07/17 21:42:26 | workflow:hello_world | Finished workflow (Log written to log/scipipe-20180717-214226-hello_world.log)
Let's check what files SciPipe has generated:
$ ls -1 hello*
hello.out.txt
hello.out.txt.audit.json
hello.out.txt.world.out.txt
hello.out.txt.world.out.txt.audit.json
As you can see, it has created the files hello.out.txt and hello.out.txt.world.out.txt, and an accompanying .audit.json file for each of them.
Now, let's check the output of the final resulting file:
$ cat hello.out.txt.world.out.txt
Hello World
Now we can rejoice that it contains the text "Hello World", exactly as a proper Hello World example should :)
Now, these were a little long and cumbersome filenames, weren't they? SciPipe gives you very good control over how to name your files, if you don't want to rely on the automatic file naming. For example, we could set the first filename to a static one, and then use the first name as a basis for the file name for the second process, like so:
package main

import (
	// Import the SciPipe package, aliased to 'sp'
	sp "github.com/scipipe/scipipe"
)

func main() {
	// Init workflow with a name, and max concurrent tasks
	wf := sp.NewWorkflow("hello_world", 4)

	// Initialize processes and set output file paths
	hello := wf.NewProc("hello", "echo 'Hello ' > {o:out}")
	hello.SetOut("out", "hello.txt")

	world := wf.NewProc("world", "echo $(cat {i:in}) World >> {o:out}")
	// The modifier 's/.txt//' will replace '.txt' in the input path with ''
	world.SetOut("out", "{i:in|s/.txt//}_world.txt")

	// Connect network
	world.In("in").From(hello.Out("out"))

	// Run workflow
	wf.Run()
}
Now, if we run this, the file names get a little cleaner:
$ ls -1 hello*
hello.txt
hello.txt.audit.json
hello.txt.world.go
hello.txt.world.txt
hello.txt.world.txt.audit.json
The audit logs
Finally, we can have a look at one of the audit files that were created:
$ cat hello.txt.world.txt.audit.json
{
    "ID": "99i5vxhtd41pmaewc8pr",
    "ProcessName": "world",
    "Command": "echo $(cat hello.txt) World \u003e\u003e hello.txt.world.txt.tmp/hello.txt.world.txt",
    "Params": {},
    "Tags": {},
    "StartTime": "2018-06-15T19:10:37.955602979+02:00",
    "FinishTime": "2018-06-15T19:10:37.959410102+02:00",
    "ExecTimeNS": 3000000,
    "Upstream": {
        "hello.txt": {
            "ID": "w4oeiii9h5j7sckq7aqq",
            "ProcessName": "hello",
            "Command": "echo 'Hello ' \u003e hello.txt.tmp/hello.txt",
            "Params": {},
            "Tags": {},
            "StartTime": "2018-06-15T19:10:37.950032676+02:00",
            "FinishTime": "2018-06-15T19:10:37.95468214+02:00",
            "ExecTimeNS": 4000000,
            "Upstream": {}
        }
    }
}
Each such audit-file contains a hierarchic JSON-representation of the full workflow path that was executed in order to produce this file. On the first level is the command that directly produced the corresponding file, and then, indexed by their filenames, under "Upstream", there is a similar chunk describing how all of its input files were generated. This process will be repeated in a recursive way for large workflows, so that, for each file generated by the workflow, there is always a full, hierarchic, history of all the commands run - with their associated metadata - to produce that file.
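Because the structure is recursive, the full command history of a file can be recovered with a short recursive walk. Below is a minimal sketch using only the standard library; the `auditNode` type and the `parseAudit` and `commands` helpers are hypothetical names that mirror just a few of the fields shown above, and the embedded JSON is an abbreviated stand-in for a real audit file.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// auditNode mirrors a subset of the fields visible in an audit file.
type auditNode struct {
	ProcessName string
	Command     string
	Upstream    map[string]*auditNode
}

// parseAudit decodes audit JSON into a tree of auditNode values.
func parseAudit(data []byte) (*auditNode, error) {
	var n auditNode
	if err := json.Unmarshal(data, &n); err != nil {
		return nil, err
	}
	return &n, nil
}

// commands returns all commands in the tree, upstream commands first.
// The order among sibling upstream files is unspecified (map iteration).
func commands(n *auditNode) []string {
	var cmds []string
	for _, up := range n.Upstream {
		cmds = append(cmds, commands(up)...)
	}
	return append(cmds, n.Command)
}

func main() {
	// Abbreviated, illustrative audit content (not a complete audit file).
	data := []byte(`{
		"ProcessName": "world",
		"Command": "echo $(cat hello.txt) World",
		"Upstream": {
			"hello.txt": {
				"ProcessName": "hello",
				"Command": "echo 'Hello '",
				"Upstream": {}
			}
		}
	}`)
	root, err := parseAudit(data)
	if err != nil {
		panic(err)
	}
	for _, c := range commands(root) {
		fmt.Println(c)
	}
}
```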
You can find many more examples in the examples folder in the GitHub repo.
For more information about how to write workflows using SciPipe, and much more, see the SciPipe website (scipipe.org)!
More material on SciPipe
- See a poster on SciPipe, presented at the e-Science Academy in Lund, on Oct 12-13 2016.
- See slides from a recent presentation of SciPipe for use in a Bioinformatics setting.
- The architecture of SciPipe is based on a flow-based-programming-like pattern in pure Go, presented in this and this blog post on Gopher Academy.
Citing SciPipe
If you use SciPipe in academic or scholarly work, please cite the following paper as source:
Lampa S, Dahlö M, Alvarsson J, Spjuth O. SciPipe: A workflow library for agile development of complex and dynamic bioinformatics pipelines Gigascience. 8, 5 (2019). DOI: 10.1093/gigascience/giz044
Acknowledgements
- SciPipe is very heavily dependent on the proven principles from Flow-Based Programming (FBP), as invented by John Paul Morrison. From Flow-Based Programming, SciPipe uses the ideas of separate network (workflow dependency graph) definition, named in- and out-ports, sub-networks/sub-workflows and bounded buffers (already available in Go's channels) to make writing workflows as easy as possible.
- This library has also been much influenced and inspired by the GoFlow library by Vladimir Sibirov.
- Thanks to Egon Elbre for helpful input on the design of the internals of the pipeline, and processes, which greatly simplified the implementation.
- This work is financed by faculty grants and other financing for the Pharmaceutical Bioinformatics group of Dept. of Pharmaceutical Biosciences at Uppsala University, and by Swedish Research Council through the Swedish National Bioinformatics Infrastructure Sweden.
- Supervisor for the project is Ola Spjuth.
Related tools
Find below a few tools that are more or less similar to SciPipe and that are worth checking out before deciding on what tool fits you best (in approximate order of similarity to SciPipe):
Documentation
Overview ¶
Package scipipe is a library for writing scientific workflows (sometimes also called "pipelines") of shell commands that depend on each other, in the Go programming language. It was initially designed for problems in cheminformatics and bioinformatics, but should apply equally well to any domain involving complex pipelines of interdependent shell commands.
Index ¶
- Constants
- Variables
- func AtomizeIPs(tempExecDir string, ips ...*FileIP)
- func Check(err error)
- func CheckWithMsg(err error, errMsg string)
- func ExecCmd(cmd string) string
- func Fail(vs ...interface{})
- func Failf(msg string, vs ...interface{})
- func InitLog(traceHandle io.Writer, debugHandle io.Writer, infoHandle io.Writer, ...)
- func InitLogAudit()
- func InitLogAuditToFile(filePath string)
- func InitLogDebug()
- func InitLogError()
- func InitLogInfo()
- func InitLogWarning()
- func LogAuditf(componentName string, message string, values ...interface{})
- func LogAuditln(componentName string, message string)
- type AuditInfo
- type BaseIP
- type BaseProcess
- func (p *BaseProcess) CloseAllOutPorts()
- func (p *BaseProcess) CloseOutParamPorts()
- func (p *BaseProcess) CloseOutPorts()
- func (p *BaseProcess) DeleteInParamPort(portName string)
- func (p *BaseProcess) DeleteInPort(portName string)
- func (p *BaseProcess) DeleteOutParamPort(portName string)
- func (p *BaseProcess) DeleteOutPort(portName string)
- func (p *BaseProcess) InParamPort(portName string) *InParamPort
- func (p *BaseProcess) InParamPorts() map[string]*InParamPort
- func (p *BaseProcess) InPort(portName string) *InPort
- func (p *BaseProcess) InPorts() map[string]*InPort
- func (p *BaseProcess) InitInParamPort(proc WorkflowProcess, portName string)
- func (p *BaseProcess) InitInPort(proc WorkflowProcess, portName string)
- func (p *BaseProcess) InitOutParamPort(proc WorkflowProcess, portName string)
- func (p *BaseProcess) InitOutPort(proc WorkflowProcess, portName string)
- func (p *BaseProcess) Name() string
- func (p *BaseProcess) OutParamPort(portName string) *OutParamPort
- func (p *BaseProcess) OutParamPorts() map[string]*OutParamPort
- func (p *BaseProcess) OutPort(portName string) *OutPort
- func (p *BaseProcess) OutPorts() map[string]*OutPort
- func (p *BaseProcess) Ready() (isReady bool)
- func (p *BaseProcess) Workflow() *Workflow
- type FileIP
- func (ip *FileIP) AddTag(k string, v string)
- func (ip *FileIP) AddTags(tags map[string]string)
- func (ip *FileIP) Atomize()
- func (ip *FileIP) AuditFilePath() string
- func (ip *FileIP) AuditInfo() *AuditInfo
- func (ip *FileIP) CreateFifo()
- func (ip *FileIP) Exists() bool
- func (ip *FileIP) FifoFileExists() bool
- func (ip *FileIP) FifoPath() string
- func (ip *FileIP) Open() *os.File
- func (ip *FileIP) OpenTemp() *os.File
- func (ip *FileIP) OpenWriteTemp() *os.File
- func (ip *FileIP) Param(key string) string
- func (ip *FileIP) Path() string
- func (ip *FileIP) Read() []byte
- func (ip *FileIP) RemoveFifo()
- func (ip *FileIP) SetAuditInfo(ai *AuditInfo)
- func (ip *FileIP) Size() int64
- func (ip *FileIP) String() string
- func (ip *FileIP) Tag(k string) string
- func (ip *FileIP) Tags() map[string]string
- func (ip *FileIP) TempDir() string
- func (ip *FileIP) TempFileExists() bool
- func (ip *FileIP) TempPath() string
- func (ip *FileIP) UnMarshalJSON(v interface{})
- func (ip *FileIP) Write(dat []byte)
- func (ip *FileIP) WriteAuditLogToFile()
- type IP
- type InParamPort
- func (pip *InParamPort) AddRemotePort(pop *OutParamPort)
- func (pip *InParamPort) CloseConnection(popName string)
- func (pip *InParamPort) From(pop *OutParamPort)
- func (pip *InParamPort) FromFloat(floats ...float64)
- func (pip *InParamPort) FromInt(ints ...int)
- func (pip *InParamPort) FromStr(strings ...string)
- func (pip *InParamPort) Name() string
- func (pip *InParamPort) Process() WorkflowProcess
- func (pip *InParamPort) Ready() bool
- func (pip *InParamPort) Recv() string
- func (pip *InParamPort) Send(param string)
- func (pip *InParamPort) SetProcess(p WorkflowProcess)
- func (pip *InParamPort) SetReady(ready bool)
- type InPort
- func (pt *InPort) AddRemotePort(rpt *OutPort)
- func (pt *InPort) CloseConnection(rptName string)
- func (pt *InPort) Disconnect(rptName string)
- func (pt *InPort) From(rpt *OutPort)
- func (pt *InPort) Name() string
- func (pt *InPort) Process() WorkflowProcess
- func (pt *InPort) Ready() bool
- func (pt *InPort) Recv() *FileIP
- func (pt *InPort) Send(ip *FileIP)
- func (pt *InPort) SetProcess(p WorkflowProcess)
- func (pt *InPort) SetReady(ready bool)
- type OutParamPort
- func (pop *OutParamPort) AddRemotePort(pip *InParamPort)
- func (pop *OutParamPort) Close()
- func (pop *OutParamPort) Disconnect(pipName string)
- func (pop *OutParamPort) Name() string
- func (pop *OutParamPort) Process() WorkflowProcess
- func (pop *OutParamPort) Ready() bool
- func (pop *OutParamPort) Send(param string)
- func (pop *OutParamPort) SetProcess(p WorkflowProcess)
- func (pop *OutParamPort) SetReady(ready bool)
- func (pop *OutParamPort) To(pip *InParamPort)
- type OutPort
- func (pt *OutPort) AddRemotePort(rpt *InPort)
- func (pt *OutPort) Close()
- func (pt *OutPort) Disconnect(rptName string)
- func (pt *OutPort) Name() string
- func (pt *OutPort) Process() WorkflowProcess
- func (pt *OutPort) Ready() bool
- func (pt *OutPort) Send(ip *FileIP)
- func (pt *OutPort) SetProcess(p WorkflowProcess)
- func (pt *OutPort) SetReady(ready bool)
- func (pt *OutPort) To(rpt *InPort)
- type PortInfo
- type Process
- func (p *Process) In(portName string) *InPort
- func (p *Process) InParam(portName string) *InParamPort
- func (p *Process) Out(portName string) *OutPort
- func (p *Process) OutParam(portName string) *OutParamPort
- func (p *Process) Run()
- func (p *Process) SetOut(outPortName string, pathPattern string)
- func (p *Process) SetOutFunc(outPortName string, pathFmtFunc func(task *Task) (path string))
- type Sink
- type Task
- func (t *Task) Execute()
- func (t *Task) InIP(portName string) *FileIP
- func (t *Task) InPath(portName string) string
- func (t *Task) OutIP(portName string) *FileIP
- func (t *Task) OutPath(portName string) string
- func (t *Task) Param(portName string) string
- func (t *Task) Tag(tagName string) string
- func (t *Task) TempDir() string
- type Workflow
- func (wf *Workflow) AddProc(proc WorkflowProcess)
- func (wf *Workflow) AddProcs(procs ...WorkflowProcess)
- func (wf *Workflow) DecConcurrentTasks(slots int)
- func (wf *Workflow) DotGraph() (dot string)
- func (wf *Workflow) IncConcurrentTasks(slots int)
- func (wf *Workflow) Name() string
- func (wf *Workflow) NewProc(procName string, commandPattern string) *Process
- func (wf *Workflow) PlotGraph(filePath string)
- func (wf *Workflow) PlotGraphPDF(filePath string)
- func (wf *Workflow) Proc(procName string) WorkflowProcess
- func (wf *Workflow) Procs() map[string]WorkflowProcess
- func (wf *Workflow) ProcsSorted() []WorkflowProcess
- func (wf *Workflow) Run()
- func (wf *Workflow) RunTo(finalProcNames ...string)
- func (wf *Workflow) RunToProcs(finalProcs ...WorkflowProcess)
- func (wf *Workflow) RunToRegex(procNamePatterns ...string)
- func (wf *Workflow) SetSink(sink *Sink)
- func (wf *Workflow) Sink() *Sink
- type WorkflowPlotConf
- type WorkflowProcess
Constants ¶
const FSRootPlaceHolder = "__fsroot__"
FSRootPlaceHolder is a string to use instead of an initial '/', to indicate a path that belongs to the absolute root
const (
// Version is the SciPipe version in string format
Version = "0.9.13"
)
Variables ¶
var (
	// Trace is a log handler for extremely detailed level logs. It is so far
	// sparely used in scipipe.
	Trace *log.Logger
	// Debug is a log handler for debugging level logs
	Debug *log.Logger
	// Info is a log handler for information level logs
	Info *log.Logger
	// Audit is a log handler for audit level logs
	Audit *log.Logger
	// Warning is a log handler for warning level logs
	Warning *log.Logger
	// Error is a log handler for error level logs
	Error *log.Logger
)
var (
BUFSIZE = 128
)
Functions ¶
func AtomizeIPs ¶
AtomizeIPs renames temporary output files/directories to their proper paths. It is called both from Task and from Processes that implement a custom execution schedule.
func CheckWithMsg ¶
CheckWithMsg checks the error err, and prints both the original error message, and a custom one provided in errMsg
func Fail ¶
func Fail(vs ...interface{})
Fail logs the error message, so that it will be possible to improve error messages in one place
func Failf ¶
func Failf(msg string, vs ...interface{})
Failf is like Fail but with msg being a formatter string for the message and vs being items to format into the message
func InitLog ¶
func InitLog(
	traceHandle io.Writer,
	debugHandle io.Writer,
	infoHandle io.Writer,
	auditHandle io.Writer,
	warningHandle io.Writer,
	errorHandle io.Writer)
InitLog initiates logging handlers
func InitLogAuditToFile ¶
func InitLogAuditToFile(filePath string)
InitLogAuditToFile initiates logging with level=AUDIT, and writes the log to the file at filePath
func LogAuditf ¶
LogAuditf logs a pretty printed log message with the AUDIT log level, where componentName is a name of a process, task, workflow or similar that generates the message, while message and values are formatted in the manner of fmt.Printf
func LogAuditln ¶
LogAuditln logs a pretty printed log message with the AUDIT log level, where componentName is a name of a process, task, workflow or similar that generates the message, while message is a custom message (can be specified as multiple strings, which will then be formatted in the manner of fmt.Println).
Types ¶
type AuditInfo ¶
type AuditInfo struct {
	ID          string
	ProcessName string
	Command     string
	Params      map[string]string
	Tags        map[string]string
	StartTime   time.Time
	FinishTime  time.Time
	ExecTimeNS  time.Duration
	OutFiles    map[string]string
	Upstream    map[string]*AuditInfo
}
AuditInfo contains structured audit/provenance logging information for a particular task (invocation), to go with all outgoing IPs from that task
func UnmarshalAuditInfoJSONFile ¶
UnmarshalAuditInfoJSONFile returns an AuditInfo object from an AuditInfo .json file
type BaseIP ¶
type BaseIP struct {
// contains filtered or unexported fields
}
BaseIP contains foundational functionality which all IPs need to implement. It is meant to be embedded into other IP implementations.
type BaseProcess ¶
type BaseProcess struct {
// contains filtered or unexported fields
}
BaseProcess provides a skeleton for processes, such as the main Process component, and the custom components in the scipipe/components library
func NewBaseProcess ¶
func NewBaseProcess(wf *Workflow, name string) BaseProcess
NewBaseProcess returns a new BaseProcess, connected to the provided workflow, and with the name name
func (*BaseProcess) CloseAllOutPorts ¶
func (p *BaseProcess) CloseAllOutPorts()
CloseAllOutPorts closes all normal-, and parameter out ports
func (*BaseProcess) CloseOutParamPorts ¶
func (p *BaseProcess) CloseOutParamPorts()
CloseOutParamPorts closes all parameter out-ports
func (*BaseProcess) CloseOutPorts ¶
func (p *BaseProcess) CloseOutPorts()
CloseOutPorts closes all (normal) out-ports
func (*BaseProcess) DeleteInParamPort ¶
func (p *BaseProcess) DeleteInParamPort(portName string)
DeleteInParamPort deletes an InParamPort object from the process
func (*BaseProcess) DeleteInPort ¶
func (p *BaseProcess) DeleteInPort(portName string)
DeleteInPort deletes an InPort object from the process
func (*BaseProcess) DeleteOutParamPort ¶
func (p *BaseProcess) DeleteOutParamPort(portName string)
DeleteOutParamPort deletes an OutParamPort object from the process
func (*BaseProcess) DeleteOutPort ¶
func (p *BaseProcess) DeleteOutPort(portName string)
DeleteOutPort deletes an OutPort object from the process
func (*BaseProcess) InParamPort ¶
func (p *BaseProcess) InParamPort(portName string) *InParamPort
InParamPort returns the parameter port with name portName
func (*BaseProcess) InParamPorts ¶
func (p *BaseProcess) InParamPorts() map[string]*InParamPort
InParamPorts returns all parameter in-ports of the process
func (*BaseProcess) InPort ¶
func (p *BaseProcess) InPort(portName string) *InPort
InPort returns the in-port with name portName
func (*BaseProcess) InPorts ¶
func (p *BaseProcess) InPorts() map[string]*InPort
InPorts returns a map of all the in-ports of the process, keyed by their names
func (*BaseProcess) InitInParamPort ¶
func (p *BaseProcess) InitInParamPort(proc WorkflowProcess, portName string)
InitInParamPort adds the parameter port paramPort with name portName
func (*BaseProcess) InitInPort ¶
func (p *BaseProcess) InitInPort(proc WorkflowProcess, portName string)
InitInPort adds the in-port port to the process, with name portName
func (*BaseProcess) InitOutParamPort ¶
func (p *BaseProcess) InitOutParamPort(proc WorkflowProcess, portName string)
InitOutParamPort initializes the parameter port paramPort with name portName to the process. We need to supply the concrete process used here as well, since this method might be used as part of an embedded struct, meaning that the process in the receiver is just the *BaseProcess, which doesn't suffice.
func (*BaseProcess) InitOutPort ¶
func (p *BaseProcess) InitOutPort(proc WorkflowProcess, portName string)
InitOutPort adds the out-port port to the process, with name portName
func (*BaseProcess) OutParamPort ¶
func (p *BaseProcess) OutParamPort(portName string) *OutParamPort
OutParamPort returns the parameter port with name portName
func (*BaseProcess) OutParamPorts ¶
func (p *BaseProcess) OutParamPorts() map[string]*OutParamPort
OutParamPorts returns all parameter out-ports of the process
func (*BaseProcess) OutPort ¶
func (p *BaseProcess) OutPort(portName string) *OutPort
OutPort returns the out-port with name portName
func (*BaseProcess) OutPorts ¶
func (p *BaseProcess) OutPorts() map[string]*OutPort
OutPorts returns a map of all the out-ports of the process, keyed by their names
func (*BaseProcess) Ready ¶
func (p *BaseProcess) Ready() (isReady bool)
Ready checks whether all the process' ports are connected
func (*BaseProcess) Workflow ¶
func (p *BaseProcess) Workflow() *Workflow
Workflow returns the workflow the process is connected to
type FileIP ¶
FileIP (Short for "Information Packet" in Flow-Based Programming terminology) contains information and helper methods for a physical file on a normal disk.
func (*FileIP) Atomize ¶
func (ip *FileIP) Atomize()
Atomize renames the temporary file name to the final file name, which makes it possible to tell unfinished files apart from finished ones
func (*FileIP) AuditFilePath ¶
AuditFilePath returns the file path of the audit info file for the FileIP
func (*FileIP) CreateFifo ¶
func (ip *FileIP) CreateFifo()
CreateFifo creates a FIFO file for the FileIP
func (*FileIP) FifoFileExists ¶
FifoFileExists checks if the FIFO-file (named pipe file) exists
func (*FileIP) FifoPath ¶
FifoPath returns the path to use when a FIFO file is used instead of a normal file
func (*FileIP) OpenWriteTemp ¶
OpenWriteTemp opens the file for writing, and returns a file handle (*os.File)
func (*FileIP) Read ¶
Read reads the whole content of the file and returns the content as a byte array
func (*FileIP) RemoveFifo ¶
func (ip *FileIP) RemoveFifo()
RemoveFifo removes the FIFO file, if it exists
func (*FileIP) SetAuditInfo ¶
SetAuditInfo sets the AuditInfo struct for the FileIP
func (*FileIP) TempDir ¶
TempDir returns the path to a temporary directory where outputs are written
func (*FileIP) TempFileExists ¶
TempFileExists checks if the temp-file exists
func (*FileIP) UnMarshalJSON ¶
func (ip *FileIP) UnMarshalJSON(v interface{})
UnMarshalJSON is a helper function to unmarshal the content of the IP's file to the interface v
func (*FileIP) WriteAuditLogToFile ¶
func (ip *FileIP) WriteAuditLogToFile()
WriteAuditLogToFile writes the audit log to its designated file
type IP ¶
type IP interface {
	ID() string
	Atomize()
}
IP is the base interface which all other IPs need to adhere to
type InParamPort ¶
type InParamPort struct {
	Chan        chan string
	RemotePorts map[string]*OutParamPort
	// contains filtered or unexported fields
}
InParamPort is an in-port for parameter values of string type
func NewInParamPort ¶
func NewInParamPort(name string) *InParamPort
NewInParamPort returns a new InParamPort
func (*InParamPort) AddRemotePort ¶
func (pip *InParamPort) AddRemotePort(pop *OutParamPort)
AddRemotePort adds a remote OutParamPort to the InParamPort
func (*InParamPort) CloseConnection ¶
func (pip *InParamPort) CloseConnection(popName string)
CloseConnection closes the connection to the remote out-port with name popName, on the InParamPort
func (*InParamPort) From ¶
func (pip *InParamPort) From(pop *OutParamPort)
From connects one parameter port with another one
func (*InParamPort) FromFloat ¶
func (pip *InParamPort) FromFloat(floats ...float64)
FromFloat feeds one or more parameters of type float64 to the param port
func (*InParamPort) FromInt ¶
func (pip *InParamPort) FromInt(ints ...int)
FromInt feeds one or more parameters of type int to the param port
func (*InParamPort) FromStr ¶
func (pip *InParamPort) FromStr(strings ...string)
FromStr feeds one or more parameters of type string to a port
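Since parameter ports carry string values, feeding ints or floats implies a conversion to strings somewhere along the way. The sketch below shows one plausible way to do that with strconv; `feedInts` and `feedFloats` are hypothetical helpers, and the exact string formatting SciPipe applies is an assumption here and may differ.

```go
package main

import (
	"fmt"
	"strconv"
)

// feedInts converts each int to its decimal string form and returns a
// closed, pre-filled channel of parameter values.
func feedInts(ints ...int) <-chan string {
	ch := make(chan string, len(ints))
	for _, i := range ints {
		ch <- strconv.Itoa(i)
	}
	close(ch)
	return ch
}

// feedFloats does the same for float64 values, using the shortest
// decimal representation that round-trips.
func feedFloats(floats ...float64) <-chan string {
	ch := make(chan string, len(floats))
	for _, f := range floats {
		ch <- strconv.FormatFloat(f, 'f', -1, 64)
	}
	close(ch)
	return ch
}

func main() {
	for p := range feedInts(1, 2, 3) {
		fmt.Println("int param:", p)
	}
	for p := range feedFloats(0.5, 2.25) {
		fmt.Println("float param:", p)
	}
}
```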
func (*InParamPort) Name ¶
func (pip *InParamPort) Name() string
Name returns the name of the InParamPort
func (*InParamPort) Process ¶
func (pip *InParamPort) Process() WorkflowProcess
Process returns the process that is connected to the port
func (*InParamPort) Ready ¶
func (pip *InParamPort) Ready() bool
Ready tells whether the port is ready or not
func (*InParamPort) Recv ¶
func (pip *InParamPort) Recv() string
Recv receives a param value over the port's connection
func (*InParamPort) Send ¶
func (pip *InParamPort) Send(param string)
Send sends IPs to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port
func (*InParamPort) SetProcess ¶
func (pip *InParamPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
func (*InParamPort) SetReady ¶
func (pip *InParamPort) SetReady(ready bool)
SetReady sets the ready status of the InParamPort
type InPort ¶
type InPort struct {
	Chan        chan *FileIP
	RemotePorts map[string]*OutPort
	// contains filtered or unexported fields
}
InPort represents a pluggable connection to multiple out-ports from other processes, from its own process, and with which it is communicating via channels under the hood
func (*InPort) AddRemotePort ¶
AddRemotePort adds a remote OutPort to the InPort
func (*InPort) CloseConnection ¶
CloseConnection closes the connection to the remote out-port with name rptName, on the InPort
func (*InPort) Disconnect ¶
Disconnect disconnects the (out-)port with name rptName, from the InPort
func (*InPort) Process ¶
func (pt *InPort) Process() WorkflowProcess
Process returns the process connected to the port
func (*InPort) Send ¶
Send sends IPs to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port
func (*InPort) SetProcess ¶
func (pt *InPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
type OutParamPort ¶
type OutParamPort struct {
	RemotePorts map[string]*InParamPort
	// contains filtered or unexported fields
}
OutParamPort is an out-port for parameter values of string type
func NewOutParamPort ¶
func NewOutParamPort(name string) *OutParamPort
NewOutParamPort returns a new OutParamPort
func (*OutParamPort) AddRemotePort ¶
func (pop *OutParamPort) AddRemotePort(pip *InParamPort)
AddRemotePort adds a remote InParamPort to the OutParamPort
func (*OutParamPort) Close ¶
func (pop *OutParamPort) Close()
Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-port's channel will also be closed.
func (*OutParamPort) Disconnect ¶
func (pop *OutParamPort) Disconnect(pipName string)
Disconnect disconnects the (in-)port with name pipName, from the OutParamPort
func (*OutParamPort) Name ¶
func (pop *OutParamPort) Name() string
Name returns the name of the OutParamPort
func (*OutParamPort) Process ¶
func (pop *OutParamPort) Process() WorkflowProcess
Process returns the process that is connected to the port
func (*OutParamPort) Ready ¶
func (pop *OutParamPort) Ready() bool
Ready tells whether the port is ready or not
func (*OutParamPort) Send ¶
func (pop *OutParamPort) Send(param string)
Send sends a parameter value to all the in-ports connected to the OutParamPort
func (*OutParamPort) SetProcess ¶
func (pop *OutParamPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
func (*OutParamPort) SetReady ¶
func (pop *OutParamPort) SetReady(ready bool)
SetReady sets the ready status of the OutParamPort
func (*OutParamPort) To ¶
func (pop *OutParamPort) To(pip *InParamPort)
To connects an InParamPort to the OutParamPort
type OutPort ¶
OutPort represents a pluggable connection to multiple in-ports from other processes, from its own process, and with which it is communicating via channels under the hood
func (*OutPort) AddRemotePort ¶
AddRemotePort adds a remote InPort to the OutPort
func (*OutPort) Close ¶
func (pt *OutPort) Close()
Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-port's channel will also be closed.
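The "last connected port closes the channel" rule can be illustrated by counting connections. This is a simplified sketch under stated assumptions, not SciPipe's code: the `inPort` type, its methods and `countReceived` are hypothetical, and the sketch assumes at least one sender.

```go
package main

import (
	"fmt"
	"sync"
)

// inPort is a simplified in-port shared by several senders.
type inPort struct {
	mu      sync.Mutex
	ch      chan string
	remotes int // number of still-connected senders
}

func newInPort() *inPort {
	return &inPort{ch: make(chan string, 16)}
}

// addRemote registers one more connected sender.
func (p *inPort) addRemote() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.remotes++
}

// closeConnection unregisters a sender and closes the channel once the
// last sender has disconnected.
func (p *inPort) closeConnection() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.remotes--
	if p.remotes == 0 {
		close(p.ch)
	}
}

// countReceived lets every sender send one item and then disconnect, and
// returns how many items the in-port received before its channel closed.
func countReceived(senders []string) int {
	in := newInPort()
	// Register all senders first, so the channel cannot close early.
	for range senders {
		in.addRemote()
	}
	for _, s := range senders {
		go func(s string) {
			in.ch <- s
			in.closeConnection()
		}(s)
	}
	n := 0
	for range in.ch {
		n++
	}
	return n
}

func main() {
	fmt.Println(countReceived([]string{"a", "b", "c"}))
}
```

The receiving loop terminates only because the channel is closed exactly once, by whichever sender disconnects last.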
func (*OutPort) Disconnect ¶
Disconnect disconnects the (in-)port with name rptName, from the OutPort
func (*OutPort) Process ¶
func (pt *OutPort) Process() WorkflowProcess
Process returns the process connected to the port
func (*OutPort) SetProcess ¶
func (pt *OutPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
type PortInfo ¶
type PortInfo struct {
// contains filtered or unexported fields
}
PortInfo is a container for various information about process ports
type Process ¶
type Process struct {
	BaseProcess
	CommandPattern string
	PathFuncs      map[string]func(*Task) string
	CustomExecute  func(*Task)
	CoresPerTask   int
	Prepend        string
	Spawn          bool
	PortInfo       map[string]*PortInfo
}
Process is the central component in SciPipe after Workflow. Processes are long-running "services" that schedule and execute Tasks based on the IPs and parameters received on their in-ports and parameter ports
func NewProc ¶
NewProc returns a new Process, and initializes its ports based on the command pattern.
func (*Process) In ¶
In is a short-form for InPort() (of BaseProcess), which works only on Process processes
func (*Process) InParam ¶
func (p *Process) InParam(portName string) *InParamPort
InParam is a short-form for InParamPort() (of BaseProcess), which works only on Process processes
func (*Process) Out ¶
Out is a short-form for OutPort() (of BaseProcess), which works only on Process processes
func (*Process) OutParam ¶
func (p *Process) OutParam(portName string) *OutParamPort
OutParam is a short-form for OutParamPort() (of BaseProcess), which works only on Process processes
func (*Process) Run ¶
func (p *Process) Run()
Run runs the process by instantiating and executing Tasks for all inputs and parameter values on its in-ports. If there are no inputs or parameter values on the in-ports, it will run just once before it terminates. Note that the actual execution of shell commands is done inside Task.Execute, not here.
func (*Process) SetOut ¶
SetOut initializes a port (if it does not already exist), and takes a configuration for its output paths via a pattern similar to the command pattern used to create new processes, with placeholder tags. Available placeholder tags are:
{i:inport_name}
{p:param_name}
{t:tag_name}
An example might be: {i:foo}.replace_with_{p:replacement}.txt, given that the process contains an in-port named 'foo' and a parameter named 'replacement'. If an out-port with the specified name does not exist, it will be created. This makes it possible to create out-ports for files that are created without explicitly stating a filename on the command line, such as when only a prefix is submitted.
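To make the placeholder tags concrete, here is a minimal sketch of how such a pattern could be expanded into a path. The function expandPattern and the sample values are assumptions made up for this illustration; it uses only the standard library and is not SciPipe's own expansion code.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches tags such as {i:foo}, {p:replacement} or {t:sample}.
var placeholder = regexp.MustCompile(`\{(i|p|t):([^{}]+)\}`)

// expandPattern is a hypothetical helper that fills in an output path
// pattern from in-port paths (i), parameter values (p) and tags (t).
func expandPattern(pattern string, inPaths, params, tags map[string]string) string {
	return placeholder.ReplaceAllStringFunc(pattern, func(m string) string {
		parts := placeholder.FindStringSubmatch(m)
		kind, name := parts[1], parts[2]
		var values map[string]string
		switch kind {
		case "i":
			values = inPaths
		case "p":
			values = params
		case "t":
			values = tags
		}
		if v, ok := values[name]; ok {
			return v
		}
		return m // leave unknown placeholders untouched
	})
}

func main() {
	path := expandPattern(
		"{i:foo}.replace_with_{p:replacement}.txt",
		map[string]string{"foo": "data/input.txt"}, // assumed path on in-port 'foo'
		map[string]string{"replacement": "bar"},    // assumed parameter value
		nil,
	)
	fmt.Println(path) // data/input.txt.replace_with_bar.txt
}
```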
type Sink ¶
type Sink struct {
BaseProcess
}
Sink is a simple component that just receives IPs on its In-port without doing anything with them. It is used to drive pipelines of processes
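The idea of a sink that drives a pipeline can be sketched with a plain channel. The function drain below is a hypothetical illustration rather than SciPipe code: it keeps receiving until the upstream side closes the channel, which is what lets upstream senders make progress.

```go
package main

import "fmt"

// drain is a hypothetical stand-in for a sink: it receives every item from
// in until the channel is closed, discards it, and returns how many it saw.
func drain(in <-chan string) int {
	n := 0
	for range in {
		n++
	}
	return n
}

func main() {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, path := range []string{"a.txt", "b.txt", "c.txt"} {
			ch <- path // blocks until the sink receives
		}
	}()
	fmt.Println(drain(ch)) // 3
}
```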
func (*Sink) FromParam ¶
func (p *Sink) FromParam(outParamPort *OutParamPort)
FromParam connects a param-out-port to the sink's param-in-port
type Task ¶
type Task struct {
Name string
Command string
CustomExecute func(*Task)
InIPs map[string]*FileIP
OutIPs map[string]*FileIP
Params map[string]string
Tags map[string]string
Done chan int
Process *Process
// contains filtered or unexported fields
}
Task represents a single static shell command, or Go function, to be executed, and is scheduled and managed by a corresponding Process
func NewTask ¶
func NewTask(workflow *Workflow, process *Process, name string, cmdPat string, inIPs map[string]*FileIP, outPathFuncs map[string]func(*Task) string, portInfos map[string]*PortInfo, params map[string]string, tags map[string]string, prepend string, customExecute func(*Task), cores int) *Task
NewTask instantiates and initializes a new Task
func (*Task) Execute ¶
func (t *Task) Execute()
Execute executes the task (the shell command or go function in CustomExecute)
type Workflow ¶
type Workflow struct {
PlotConf WorkflowPlotConf
// contains filtered or unexported fields
}
Workflow is the centerpiece of the functionality in SciPipe, and is a container for a pipeline of processes making up a workflow. It has various methods for coordinating the execution of the pipeline as a whole, such as keeping track of the maximum number of concurrent tasks, as well as helper methods for creating new processes that are automatically plugged in to the workflow on creation
func NewWorkflow ¶
NewWorkflow returns a new Workflow
func NewWorkflowCustomLogFile ¶
NewWorkflowCustomLogFile returns a new Workflow, with
func (*Workflow) AddProc ¶
func (wf *Workflow) AddProc(proc WorkflowProcess)
AddProc adds a Process to the workflow, to be run when the workflow runs
func (*Workflow) AddProcs ¶
func (wf *Workflow) AddProcs(procs ...WorkflowProcess)
AddProcs takes one or many Processes and adds them to the workflow, to be run when the workflow runs.
func (*Workflow) DecConcurrentTasks ¶
DecConcurrentTasks decreases the counter for how many concurrent tasks are currently running in the workflow
func (*Workflow) DotGraph ¶
DotGraph generates a graph description in DOT format (see https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29). If Workflow.PlotConf.EdgeLabels is set to true, a label containing the in-port and out-port to which each edge is connected will be printed.
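As an illustration of the kind of output this describes, the following sketch builds a DOT description from a list of edges, with optional edge labels. The edge type and the dotGraph function are hypothetical and the exact text SciPipe emits may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// edge is a hypothetical record of one connection between two processes.
type edge struct {
	fromProc, outPort, toProc, inPort string
}

// dotGraph renders the edges as a DOT digraph. When edgeLabels is true,
// each edge is labelled with the out-port and in-port it connects.
func dotGraph(name string, edges []edge, edgeLabels bool) string {
	var b strings.Builder
	fmt.Fprintf(&b, "digraph %q {\n", name)
	for _, e := range edges {
		fmt.Fprintf(&b, "  %q -> %q", e.fromProc, e.toProc)
		if edgeLabels {
			fmt.Fprintf(&b, " [label=%q]", e.outPort+" -> "+e.inPort)
		}
		b.WriteString(";\n")
	}
	b.WriteString("}\n")
	return b.String()
}

func main() {
	edges := []edge{
		{"download", "out", "unzip", "in"},
		{"unzip", "out", "align", "reads"},
	}
	fmt.Print(dotGraph("example", edges, true))
}
```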
func (*Workflow) IncConcurrentTasks ¶
IncConcurrentTasks increases the counter for how many concurrent tasks are currently running in the workflow
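A counter of concurrent tasks, as maintained by IncConcurrentTasks and DecConcurrentTasks, is commonly implemented in Go as a buffered channel used as a semaphore. The sketch below assumes that approach; taskLimiter, inc, dec and runAll are hypothetical names, and this is not necessarily how Workflow implements it.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// taskLimiter is a hypothetical counter that caps how many tasks run at once.
type taskLimiter struct {
	slots chan struct{}
}

func newTaskLimiter(max int) *taskLimiter {
	return &taskLimiter{slots: make(chan struct{}, max)}
}

// inc blocks until a slot is free, then claims it.
func (l *taskLimiter) inc() { l.slots <- struct{}{} }

// dec releases a previously claimed slot.
func (l *taskLimiter) dec() { <-l.slots }

// runAll starts n dummy tasks under the limiter and returns the highest
// number of tasks that were observed running at the same time.
func runAll(l *taskLimiter, n int) int32 {
	var wg sync.WaitGroup
	var running, peak int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.inc()
			defer l.dec()
			cur := atomic.AddInt32(&running, 1)
			for { // record a new peak if cur exceeds the stored one
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runAll(newTaskLimiter(2), 10)
	fmt.Println(peak <= 2) // true
}
```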
func (*Workflow) NewProc ¶
NewProc returns a new process based on a commandPattern (See the documentation for scipipe.NewProcess for more details about the pattern) and connects the process to the workflow
func (*Workflow) PlotGraphPDF ¶
PlotGraphPDF writes the workflow structure to a dot file, and also runs the graphviz dot command to produce a PDF file (requires graphviz, with the dot command, installed on the system)
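For reference, rendering a dot file to PDF with graphviz amounts to running dot with the -Tpdf and -o options. The sketch below only constructs that command with os/exec; dotToPDFCmd and the file names are hypothetical, and whether PlotGraphPDF invokes dot with exactly these arguments is an assumption.

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// dotToPDFCmd is a hypothetical helper that builds the graphviz command for
// rendering dotFile to a PDF with the same base name.
func dotToPDFCmd(dotFile string) *exec.Cmd {
	pdfFile := strings.TrimSuffix(dotFile, ".dot") + ".pdf"
	return exec.Command("dot", "-Tpdf", "-o", pdfFile, dotFile)
}

func main() {
	cmd := dotToPDFCmd("workflow.dot")
	fmt.Println(strings.Join(cmd.Args, " "))
	// To actually render the PDF, call cmd.Run() and check the returned
	// error; this requires the dot executable to be on the PATH.
}
```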
func (*Workflow) Proc ¶
func (wf *Workflow) Proc(procName string) WorkflowProcess
Proc returns the process with name procName from the workflow
func (*Workflow) Procs ¶
func (wf *Workflow) Procs() map[string]WorkflowProcess
Procs returns a map of all processes keyed by their names in the workflow
func (*Workflow) ProcsSorted ¶
func (wf *Workflow) ProcsSorted() []WorkflowProcess
ProcsSorted returns the processes of the workflow as a slice, sorted by process name
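Because Go map iteration order is not deterministic, getting a stable ordering from a name-keyed map means sorting the keys first. A minimal sketch of that pattern, using a hypothetical proc type in place of WorkflowProcess:

```go
package main

import (
	"fmt"
	"sort"
)

// proc is a hypothetical stand-in for a workflow process.
type proc struct{ name string }

// procsSorted returns the processes in the map as a slice ordered by name.
func procsSorted(procs map[string]proc) []proc {
	names := make([]string, 0, len(procs))
	for name := range procs {
		names = append(names, name)
	}
	sort.Strings(names)

	sorted := make([]proc, 0, len(names))
	for _, name := range names {
		sorted = append(sorted, procs[name])
	}
	return sorted
}

func main() {
	procs := map[string]proc{
		"unzip":    {name: "unzip"},
		"align":    {name: "align"},
		"download": {name: "download"},
	}
	for _, p := range procsSorted(procs) {
		fmt.Println(p.name)
	}
}
```

A deterministic order is useful for reproducible logs and graph output.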
func (*Workflow) RunTo ¶
RunTo runs all processes upstream of, and including, the processes with the names provided as arguments
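Selecting "all processes upstream of, and including" a target is a transitive walk over the workflow graph. The following sketch shows that selection on a plain adjacency map; upstreamOf and the process names are hypothetical, and SciPipe's own traversal works on ports rather than on a map like this.

```go
package main

import (
	"fmt"
	"sort"
)

// upstreamOf returns the targets plus everything they transitively depend
// on. The upstream map lists, for each process name, its direct upstream
// process names. The result is sorted for stable output.
func upstreamOf(upstream map[string][]string, targets ...string) []string {
	seen := map[string]bool{}
	var visit func(name string)
	visit = func(name string) {
		if seen[name] {
			return
		}
		seen[name] = true
		for _, u := range upstream[name] {
			visit(u)
		}
	}
	for _, t := range targets {
		visit(t)
	}

	names := make([]string, 0, len(seen))
	for name := range seen {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	// Hypothetical workflow: download -> unzip -> {align, stats} -> report
	upstream := map[string][]string{
		"unzip":  {"download"},
		"align":  {"unzip"},
		"stats":  {"unzip"},
		"report": {"align", "stats"},
	}
	fmt.Println(upstreamOf(upstream, "align")) // [align download unzip]
}
```

Processes such as stats and report, which are not upstream of the target, are left out of the run.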
func (*Workflow) RunToProcs ¶
func (wf *Workflow) RunToProcs(finalProcs ...WorkflowProcess)
RunToProcs runs all processes upstream of, and including, the process structs provided as arguments
func (*Workflow) RunToRegex ¶
RunToRegex runs all processes upstream of, and including, the processes whose names match any of the provided regexp patterns
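Matching process names against several patterns can be sketched with the standard regexp package. The function matchNames below is a hypothetical helper for illustration only; it shows the "matches any of the patterns" rule, not SciPipe's internal code.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchNames returns the names that match at least one of the patterns,
// in their original order. It fails if any pattern does not compile.
func matchNames(names []string, patterns ...string) ([]string, error) {
	res := make([]*regexp.Regexp, 0, len(patterns))
	for _, p := range patterns {
		re, err := regexp.Compile(p)
		if err != nil {
			return nil, fmt.Errorf("invalid pattern %q: %w", p, err)
		}
		res = append(res, re)
	}

	var matched []string
	for _, name := range names {
		for _, re := range res {
			if re.MatchString(name) {
				matched = append(matched, name)
				break // one matching pattern is enough
			}
		}
	}
	return matched, nil
}

func main() {
	names := []string{"align_sample1", "align_sample2", "report"}
	matched, err := matchNames(names, "^align_")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(matched) // [align_sample1 align_sample2]
}
```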
type WorkflowPlotConf ¶
type WorkflowPlotConf struct {
EdgeLabels bool
}
WorkflowPlotConf contains configuration for plotting the workflow as a graph with graphviz
type WorkflowProcess ¶
type WorkflowProcess interface {
Name() string
InPorts() map[string]*InPort
OutPorts() map[string]*OutPort
InParamPorts() map[string]*InParamPort
OutParamPorts() map[string]*OutParamPort
Ready() bool
Run()
}
WorkflowProcess is an interface for processes to be handled by Workflow
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
cmd | |
examples | |
resequencing | Implementation (work in progress) of the resequencing analysis pipeline used to teach the introductory NGS bioinformatics analysis course at SciLifeLab as described on this page: https://scilifelab.github.io/courses/ngsintro/1502/labs/resequencing-analysis Prerequisites: Samtools, BWA, Picard, GATK. |
subworkflow | An example that shows how to create a sub-network / sub-workflow that can be used as a component |