scipipe

package module
Version: v0.11.1
Published: Sep 1, 2021 License: MIT Imports: 19 Imported by: 16

README

SciPipe

Robust, flexible and resource-efficient pipelines using Go and the commandline

Project links: Documentation & Main Website | Issue Tracker | Chat

Why SciPipe?

  • Intuitive: SciPipe works by flowing data through a network of channels and processes
  • Flexible: Wrapped command-line programs can be combined with processes in Go
  • Convenient: Full control over how your files are named
  • Efficient: Workflows are compiled to binary code that runs fast
  • Parallel: Pipeline parallelism between processes as well as task parallelism for multiple inputs, making efficient use of multiple CPU cores
  • Supports streaming: Stream data between programs to avoid wasting disk space
  • Easy to debug: Use available Go debugging tools or just println()
  • Portable: Distribute workflows as Go code or as self-contained executable files

Project updates

Introduction

SciPipe is a library for writing Scientific Workflows, sometimes also called "pipelines", in the Go programming language.

When you need to run many commandline programs that depend on each other in complex ways, SciPipe helps by making the process of running these programs flexible, robust and reproducible. SciPipe also lets you restart an interrupted run without over-writing already produced output and produces an audit report of what was run, among many other things.

SciPipe is built on the proven principles of Flow-Based Programming (FBP) to achieve maximum flexibility, productivity and agility when designing workflows. Compared to plain dataflow, FBP provides the benefits that processes are fully self-contained, so that a library of re-usable components can be created, and plugged into new workflows ad-hoc.

Similar to other FBP systems, SciPipe workflows can be likened to a network of assembly lines in a factory, where items (files) are flowing through a network of conveyor belts, stopping at different independently running stations (processes) for processing, as depicted in the picture above.
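
The assembly-line analogy can be sketched with plain Go channels and goroutines. This is only an illustration of the idea, not SciPipe's implementation: the functions produce, upper and collect are hypothetical names, and the items are strings rather than files.

```go
package main

import (
	"fmt"
	"strings"
)

// produce plays the role of a first station: it puts items on its outgoing
// conveyor belt (a channel) and closes the belt when it is done.
func produce(items []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, item := range items {
			out <- item
		}
	}()
	return out
}

// upper is a second station that runs independently of the first one and
// processes each item as soon as it arrives.
func upper(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for item := range in {
			out <- strings.ToUpper(item)
		}
	}()
	return out
}

// collect drains the last belt into a slice.
func collect(in <-chan string) []string {
	var items []string
	for item := range in {
		items = append(items, item)
	}
	return items
}

func main() {
	fmt.Println(collect(upper(produce([]string{"a.txt", "b.txt"}))))
}
```

Each station only knows about its own channels, which is what makes it possible to re-use a station in a different network.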

SciPipe was initially created for problems in bioinformatics and cheminformatics, but works equally well for any problem involving pipelines of commandline applications.

Project status: SciPipe is pretty stable now, and only very minor API changes might still occur. We have successfully used SciPipe in a handful of both real and experimental projects, and it has had occasional use outside the research group as well.

Known limitations

Installing

For full installation instructions, see the installation page. To get started quickly, you can do the following:

  1. Download and install Go
  2. Run the following command, to install the scipipe Go library (don't miss the trailing dots!), and create a Go module for your script:
go install github.com/scipipe/scipipe/...@latest
go mod init myfirstworkflow-module

Hello World example

Let's look at an example workflow to get a feel for what writing workflows in SciPipe looks like:

package main

import (
    // Import SciPipe, aliased to sp
    sp "github.com/scipipe/scipipe"
)

func main() {
    // Init workflow and max concurrent tasks
    wf := sp.NewWorkflow("hello_world", 4)

    // Initialize processes, and file extensions
    hello := wf.NewProc("hello", "echo 'Hello ' > {o:out|.txt}")
    world := wf.NewProc("world", "echo $(cat {i:in}) World > {o:out|.txt}")

    // Define data flow
    world.In("in").From(hello.Out("out"))

    // Run workflow
    wf.Run()
}

To create a file with a similar simple example, you can run:

scipipe new hello_world.go

Running the example

Let's put the code in a file named hello_world.go and run it.

First you need to make sure that the dependencies (SciPipe in this case) are installed in your local Go module. You can do this with:

go mod tidy

Then you can go ahead and run the workflow:

$ go run hello_world.go
AUDIT   2018/07/17 21:42:26 | workflow:hello_world             | Starting workflow (Writing log to log/scipipe-20180717-214226-hello_world.log)
AUDIT   2018/07/17 21:42:26 | hello                            | Executing: echo 'Hello ' > hello.out.txt
AUDIT   2018/07/17 21:42:26 | hello                            | Finished: echo 'Hello ' > hello.out.txt
AUDIT   2018/07/17 21:42:26 | world                            | Executing: echo $(cat ../hello.out.txt) World > hello.out.txt.world.out.txt
AUDIT   2018/07/17 21:42:26 | world                            | Finished: echo $(cat ../hello.out.txt) World > hello.out.txt.world.out.txt
AUDIT   2018/07/17 21:42:26 | workflow:hello_world             | Finished workflow (Log written to log/scipipe-20180717-214226-hello_world.log)

Let's check what files SciPipe has generated:

$ ls -1 hello*
hello.out.txt
hello.out.txt.audit.json
hello.out.txt.world.out.txt
hello.out.txt.world.out.txt.audit.json

As you can see, it has created the files hello.out.txt and hello.out.txt.world.out.txt, and an accompanying .audit.json file for each of them.

Now, let's check the output of the final resulting file:

$ cat hello.out.txt.world.out.txt
Hello World

Now we can rejoice that it contains the text "Hello World", exactly as a proper Hello World example should :)

Now, these were a little long and cumbersome filenames, weren't they? SciPipe gives you very good control over how to name your files, if you don't want to rely on the automatic file naming. For example, we could set the first filename to a static one, and then use the first name as a basis for the file name for the second process, like so:

package main

import (
    // Import the SciPipe package, aliased to 'sp'
    sp "github.com/scipipe/scipipe"
)

func main() {
    // Init workflow with a name, and max concurrent tasks
    wf := sp.NewWorkflow("hello_world", 4)

    // Initialize processes and set output file paths
    hello := wf.NewProc("hello", "echo 'Hello ' > {o:out}")
    hello.SetOut("out", "hello.txt")

    world := wf.NewProc("world", "echo $(cat {i:in}) World >> {o:out}")
    world.SetOut("out", "{i:in|%.txt}_world.txt")

    // Connect network
    world.In("in").From(hello.Out("out"))

    // Run workflow
    wf.Run()
}

In the {i:in... part, we are re-using the file path from the file received on the in-port named 'in', and then running a Bash-style trim-from-end command on it to remove the .txt extension.
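
To make the effect of the trim concrete, here is a small sketch in plain Go of what the pattern {i:in|%.txt}_world.txt computes for a given input path. The function worldPath is a hypothetical stand-in written for this illustration; SciPipe does this expansion internally.

```go
package main

import (
	"fmt"
	"strings"
)

// worldPath mimics the pattern "{i:in|%.txt}_world.txt": take the path of
// the file received on the in-port, trim ".txt" from its end, and append
// "_world.txt".
func worldPath(inPath string) string {
	return strings.TrimSuffix(inPath, ".txt") + "_world.txt"
}

func main() {
	fmt.Println(worldPath("hello.txt")) // prints "hello_world.txt"
}
```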

Now, if we run this, the file names get a little cleaner:

$ ls -1 hello*
hello.txt
hello.txt.audit.json
hello_world.go
hello_world.txt
hello_world.txt.audit.json

The audit logs

Finally, we can have a look at one of the audit files that were created:

$ cat hello_world.txt.audit.json
{
    "ID": "99i5vxhtd41pmaewc8pr",
    "ProcessName": "world",
    "Command": "echo $(cat hello.txt) World \u003e\u003e hello_world.txt.tmp/hello_world.txt",
    "Params": {},
    "Tags": {},
    "StartTime": "2018-06-15T19:10:37.955602979+02:00",
    "FinishTime": "2018-06-15T19:10:37.959410102+02:00",
    "ExecTimeNS": 3000000,
    "Upstream": {
        "hello.txt": {
            "ID": "w4oeiii9h5j7sckq7aqq",
            "ProcessName": "hello",
            "Command": "echo 'Hello ' \u003e hello.txt.tmp/hello.txt",
            "Params": {},
            "Tags": {},
            "StartTime": "2018-06-15T19:10:37.950032676+02:00",
            "FinishTime": "2018-06-15T19:10:37.95468214+02:00",
            "ExecTimeNS": 4000000,
            "Upstream": {}
        }
    }
}

Each such audit-file contains a hierarchic JSON-representation of the full workflow path that was executed in order to produce this file. On the first level is the command that directly produced the corresponding file, and then, indexed by their filenames, under "Upstream", there is a similar chunk describing how all of its input files were generated. This process will be repeated in a recursive way for large workflows, so that, for each file generated by the workflow, there is always a full, hierarchic, history of all the commands run - with their associated metadata - to produce that file.
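
Because the structure is recursive, the history of a file can be recovered with a short recursive walk. The sketch below uses only the standard library; the type auditEntry and the functions history and parseHistory are hypothetical names, the struct mirrors just a subset of the fields shown above, and the sample JSON is a shortened version of the audit file.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// auditEntry mirrors a subset of the fields in an .audit.json file.
type auditEntry struct {
	ProcessName string
	Command     string
	Upstream    map[string]*auditEntry
}

// history lists the command that produced a file, followed by the commands
// that produced its inputs, recursively. Upstream files are visited in
// sorted order to keep the output deterministic.
func history(e *auditEntry) []string {
	cmds := []string{e.ProcessName + ": " + e.Command}
	names := make([]string, 0, len(e.Upstream))
	for name := range e.Upstream {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		cmds = append(cmds, history(e.Upstream[name])...)
	}
	return cmds
}

// parseHistory decodes audit JSON and returns the command history.
func parseHistory(data string) ([]string, error) {
	var root auditEntry
	if err := json.Unmarshal([]byte(data), &root); err != nil {
		return nil, err
	}
	return history(&root), nil
}

// sample is a shortened, simplified version of the audit file above.
const sample = `{
  "ProcessName": "world",
  "Command": "echo $(cat hello.txt) World >> hello_world.txt",
  "Upstream": {
    "hello.txt": {
      "ProcessName": "hello",
      "Command": "echo 'Hello ' > hello.txt",
      "Upstream": {}
    }
  }
}`

func main() {
	cmds, err := parseHistory(sample)
	if err != nil {
		panic(err)
	}
	for _, c := range cmds {
		fmt.Println(c)
	}
}
```

For reading real audit files, the package's own UnmarshalAuditInfoJSONFile function and AuditInfo type (documented further down) are the natural choice.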

You can find many more examples in the examples folder in the GitHub repo.

For more information about how to write workflows using SciPipe, and much more, see the SciPipe website (scipipe.org)!

More material on SciPipe

Citing SciPipe

If you use SciPipe in academic or scholarly work, please cite the following paper as source:

Lampa S, Dahlö M, Alvarsson J, Spjuth O. SciPipe: A workflow library for agile development of complex and dynamic bioinformatics pipelines. GigaScience 8(5), 2019. DOI: 10.1093/gigascience/giz044

Acknowledgements

Find below a few tools that are more or less similar to SciPipe and that are worth checking out before deciding which tool fits you best (in approximate order of similarity to SciPipe):

Documentation

Overview

Package scipipe is a library for writing scientific workflows (sometimes also called "pipelines") of shell commands that depend on each other, in the Go programming language. It was initially designed for problems in cheminformatics and bioinformatics, but should apply equally well to any domain involving complex pipelines of interdependent shell commands.

Index

Constants

const FSRootPlaceHolder = "__fsroot__"

FSRootPlaceHolder is a string to use instead of an initial '/', to indicate a path that belongs to the absolute root

const (
	// Version is the SciPipe version in string format
	Version = "0.11.1"
)

Variables

var (
	// Trace is a log handler for extremely detailed level logs. It is so far
	// sparsely used in scipipe.
	Trace *log.Logger
	// Debug is a log handler for debugging level logs
	Debug *log.Logger
	// Info is a log handler for information level logs
	Info *log.Logger
	// Audit is a log handler for audit level logs
	Audit *log.Logger
	// Warning is a log handler for warning level logs
	Warning *log.Logger
	// Error is a log handler for error level logs
	Error *log.Logger
)
var (
	BUFSIZE = 128
)

Functions

func AtomizeIPs added in v0.8.0

func AtomizeIPs(tempExecDir string, ips ...*FileIP) error

AtomizeIPs renames temporary output files/directories to their proper paths. It is called both from Task, and from Processes that implement a custom execution schedule.

func Check

func Check(err error)

Check checks the error err, and prints the message in the error

func CheckWithMsg added in v0.6.1

func CheckWithMsg(err error, errMsg string)

CheckWithMsg checks the error err, and prints both the original error message, and a custom one provided in errMsg

func ExecCmd

func ExecCmd(cmd string) string

ExecCmd executes the command cmd, as a shell command via bash

func Fail added in v0.6.1

func Fail(vs ...interface{})

Fail logs the error message, so that it will be possible to improve error messages in one place

func Failf added in v0.6.1

func Failf(msg string, vs ...interface{})

Failf is like Fail but with msg being a formatter string for the message and vs being items to format into the message

func InitLog

func InitLog(
	traceHandle io.Writer,
	debugHandle io.Writer,
	infoHandle io.Writer,
	auditHandle io.Writer,
	warningHandle io.Writer,
	errorHandle io.Writer)

InitLog initiates logging handlers

func InitLogAudit

func InitLogAudit()

InitLogAudit initiates logging with level=AUDIT

func InitLogAuditToFile added in v0.6.4

func InitLogAuditToFile(filePath string)

InitLogAuditToFile initiates logging with level=AUDIT, and writes the log to filePath

func InitLogDebug

func InitLogDebug()

InitLogDebug initiates logging with level=DEBUG

func InitLogError

func InitLogError()

InitLogError initiates logging with level=ERROR

func InitLogInfo

func InitLogInfo()

InitLogInfo initiates logging with level=INFO

func InitLogWarning

func InitLogWarning()

InitLogWarning initiates logging with level=WARNING

Types

type AuditInfo

type AuditInfo struct {
	ID          string
	ProcessName string
	Command     string
	Params      map[string]string
	Tags        map[string]string
	StartTime   time.Time
	FinishTime  time.Time
	ExecTimeNS  time.Duration
	OutFiles    map[string]string
	Upstream    map[string]*AuditInfo
}

AuditInfo contains structured audit/provenance logging information for a particular task (invocation), to go with all outgoing IPs from that task

func NewAuditInfo

func NewAuditInfo() *AuditInfo

NewAuditInfo returns a new AuditInfo struct

func UnmarshalAuditInfoJSONFile added in v0.8.0

func UnmarshalAuditInfoJSONFile(fileName string) (auditInfo *AuditInfo)

UnmarshalAuditInfoJSONFile returns an AuditInfo object from an AuditInfo .json file

type BaseIP added in v0.6.1

type BaseIP struct {
	// contains filtered or unexported fields
}

BaseIP contains foundational functionality which all IPs need to implement. It is meant to be embedded into other IP implementations.

func NewBaseIP added in v0.6.1

func NewBaseIP(path string) *BaseIP

NewBaseIP creates a new BaseIP

func (*BaseIP) ID added in v0.6.1

func (ip *BaseIP) ID() string

ID returns a globally unique ID for the IP

type BaseProcess added in v0.5.1

type BaseProcess struct {
	// contains filtered or unexported fields
}

BaseProcess provides a skeleton for processes, such as the main Process component, and the custom components in the scipipe/components library

func NewBaseProcess added in v0.5.1

func NewBaseProcess(wf *Workflow, name string) BaseProcess

NewBaseProcess returns a new BaseProcess, connected to the provided workflow, and with the name name

func (*BaseProcess) CloseAllOutPorts added in v0.6.1

func (p *BaseProcess) CloseAllOutPorts()

CloseAllOutPorts closes all normal-, and parameter out ports

func (*BaseProcess) CloseOutParamPorts added in v0.8.0

func (p *BaseProcess) CloseOutParamPorts()

CloseOutParamPorts closes all parameter out-ports

func (*BaseProcess) CloseOutPorts added in v0.6.1

func (p *BaseProcess) CloseOutPorts()

CloseOutPorts closes all (normal) out-ports

func (*BaseProcess) DeleteInParamPort added in v0.8.0

func (p *BaseProcess) DeleteInParamPort(portName string)

DeleteInParamPort deletes an InParamPort object from the process

func (*BaseProcess) DeleteInPort added in v0.5.1

func (p *BaseProcess) DeleteInPort(portName string)

DeleteInPort deletes an InPort object from the process

func (*BaseProcess) DeleteOutParamPort added in v0.8.0

func (p *BaseProcess) DeleteOutParamPort(portName string)

DeleteOutParamPort deletes an OutParamPort object from the process

func (*BaseProcess) DeleteOutPort added in v0.5.1

func (p *BaseProcess) DeleteOutPort(portName string)

DeleteOutPort deletes an OutPort object from the process

func (*BaseProcess) Fail added in v0.11.0

func (p *BaseProcess) Fail(msg interface{})

Fail fails with a message that includes the process name

func (*BaseProcess) Failf added in v0.10.2

func (p *BaseProcess) Failf(msg string, parts ...interface{})

Failf fails with a message that includes the process name

func (*BaseProcess) InParamPort added in v0.8.0

func (p *BaseProcess) InParamPort(portName string) *InParamPort

InParamPort returns the parameter port with name portName

func (*BaseProcess) InParamPorts added in v0.8.0

func (p *BaseProcess) InParamPorts() map[string]*InParamPort

InParamPorts returns all parameter in-ports of the process

func (*BaseProcess) InPort added in v0.5.1

func (p *BaseProcess) InPort(portName string) *InPort

InPort returns the in-port with name portName

func (*BaseProcess) InPorts added in v0.5.1

func (p *BaseProcess) InPorts() map[string]*InPort

InPorts returns a map of all the in-ports of the process, keyed by their names

func (*BaseProcess) InitInParamPort added in v0.8.0

func (p *BaseProcess) InitInParamPort(proc WorkflowProcess, portName string)

InitInParamPort adds the parameter port paramPort with name portName

func (*BaseProcess) InitInPort added in v0.5.1

func (p *BaseProcess) InitInPort(proc WorkflowProcess, portName string)

InitInPort adds the in-port port to the process, with name portName

func (*BaseProcess) InitOutParamPort added in v0.8.0

func (p *BaseProcess) InitOutParamPort(proc WorkflowProcess, portName string)

InitOutParamPort initializes the parameter port paramPort with name portName on the process. We need to supply the concrete process used here as well, since this method might be used as part of an embedded struct, meaning that the process in the receiver is just the *BaseProcess, which doesn't suffice.

func (*BaseProcess) InitOutPort added in v0.5.1

func (p *BaseProcess) InitOutPort(proc WorkflowProcess, portName string)

InitOutPort adds the out-port port to the process, with name portName

func (*BaseProcess) Name added in v0.5.1

func (p *BaseProcess) Name() string

Name returns the name of the process

func (*BaseProcess) OutParamPort added in v0.8.0

func (p *BaseProcess) OutParamPort(portName string) *OutParamPort

OutParamPort returns the parameter port with name portName

func (*BaseProcess) OutParamPorts added in v0.8.0

func (p *BaseProcess) OutParamPorts() map[string]*OutParamPort

OutParamPorts returns all parameter out-ports of the process

func (*BaseProcess) OutPort added in v0.5.1

func (p *BaseProcess) OutPort(portName string) *OutPort

OutPort returns the out-port with name portName

func (*BaseProcess) OutPorts added in v0.5.1

func (p *BaseProcess) OutPorts() map[string]*OutPort

OutPorts returns a map of all the out-ports of the process, keyed by their names

func (*BaseProcess) Ready added in v0.8.0

func (p *BaseProcess) Ready() (isReady bool)

Ready checks whether all the process' ports are connected

func (*BaseProcess) Workflow added in v0.5.1

func (p *BaseProcess) Workflow() *Workflow

Workflow returns the workflow the process is connected to

type FileIP added in v0.6.1

type FileIP struct {
	*BaseIP

	SubStream *InPort
	// contains filtered or unexported fields
}

FileIP (Short for "Information Packet" in Flow-Based Programming terminology) contains information and helper methods for a physical file on a normal disk.

func NewFileIP added in v0.6.1

func NewFileIP(path string) (*FileIP, error)

NewFileIP creates a new FileIP

func (*FileIP) AddTag added in v0.8.0

func (ip *FileIP) AddTag(k string, v string)

AddTag adds the tag k with value v

func (*FileIP) AddTags added in v0.8.0

func (ip *FileIP) AddTags(tags map[string]string)

AddTags adds a map of tags to the IP's audit info

func (*FileIP) Atomize added in v0.6.1

func (ip *FileIP) Atomize()

Atomize renames the temporary file name to the final file name, which makes it possible to tell unfinished files apart from finished ones
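
The write-then-rename idea behind Atomize can be sketched with the standard library alone. This is an illustration under simplifying assumptions, not SciPipe's code: writeAtomically and demoAtomicWrite are hypothetical helpers, and the temporary path is simply the final path with ".tmp" appended.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeAtomically writes data to a temporary path first and renames it to
// finalPath only after the write has succeeded. A file at finalPath is
// therefore always a finished file.
func writeAtomically(finalPath string, data []byte) error {
	tmpPath := finalPath + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmpPath, finalPath)
}

// demoAtomicWrite runs writeAtomically in a throw-away directory and reports
// the final file's content and whether the temporary file is gone.
func demoAtomicWrite() (string, bool, error) {
	dir, err := os.MkdirTemp("", "atomize-demo")
	if err != nil {
		return "", false, err
	}
	defer os.RemoveAll(dir)

	final := filepath.Join(dir, "hello.txt")
	if err := writeAtomically(final, []byte("Hello\n")); err != nil {
		return "", false, err
	}
	data, err := os.ReadFile(final)
	if err != nil {
		return "", false, err
	}
	_, statErr := os.Stat(final + ".tmp")
	return string(data), os.IsNotExist(statErr), nil
}

func main() {
	content, tmpGone, err := demoAtomicWrite()
	if err != nil {
		panic(err)
	}
	fmt.Printf("content=%q tmpGone=%v\n", content, tmpGone)
}
```

An interrupted run leaves at most a temporary file behind, so a restart can safely treat every file at its final path as complete.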

func (*FileIP) AuditFilePath added in v0.6.1

func (ip *FileIP) AuditFilePath() string

AuditFilePath returns the file path of the audit info file for the FileIP

func (*FileIP) AuditInfo added in v0.6.1

func (ip *FileIP) AuditInfo() *AuditInfo

AuditInfo returns the AuditInfo struct for the FileIP

func (*FileIP) CreateFifo added in v0.6.1

func (ip *FileIP) CreateFifo()

CreateFifo creates a FIFO file for the FileIP

func (*FileIP) Exists added in v0.6.1

func (ip *FileIP) Exists() bool

Exists checks if the file exists (at its final file name)

func (*FileIP) Fail added in v0.11.0

func (ip *FileIP) Fail(msg interface{})

func (*FileIP) Failf added in v0.11.0

func (ip *FileIP) Failf(msg string, parts ...interface{})

func (*FileIP) FifoFileExists added in v0.6.1

func (ip *FileIP) FifoFileExists() bool

FifoFileExists checks if the FIFO-file (named pipe file) exists

func (*FileIP) FifoPath added in v0.6.1

func (ip *FileIP) FifoPath() string

FifoPath returns the path to use when a FIFO file is used instead of a normal file

func (*FileIP) Open added in v0.6.1

func (ip *FileIP) Open() *os.File

Open opens the file and returns a file handle (*os.File)

func (*FileIP) OpenTemp added in v0.6.1

func (ip *FileIP) OpenTemp() *os.File

OpenTemp opens the temp file and returns a file handle (*os.File)

func (*FileIP) OpenWriteTemp added in v0.6.1

func (ip *FileIP) OpenWriteTemp() *os.File

OpenWriteTemp opens the file for writing, and returns a file handle (*os.File)

func (*FileIP) Param added in v0.6.1

func (ip *FileIP) Param(key string) string

Param returns the parameter named key, from the IP's audit info

func (*FileIP) Path added in v0.6.1

func (ip *FileIP) Path() string

Path returns the (final) path of the physical file

func (*FileIP) Read added in v0.6.1

func (ip *FileIP) Read() []byte

Read reads the whole content of the file and returns the content as a byte array

func (*FileIP) RemoveFifo added in v0.6.1

func (ip *FileIP) RemoveFifo()

RemoveFifo removes the FIFO file, if it exists

func (*FileIP) SetAuditInfo added in v0.6.1

func (ip *FileIP) SetAuditInfo(ai *AuditInfo)

SetAuditInfo sets the AuditInfo struct for the FileIP

func (*FileIP) Size added in v0.6.1

func (ip *FileIP) Size() int64

Size returns the size of an existing file, in bytes

func (*FileIP) String added in v0.9.1

func (ip *FileIP) String() string

func (*FileIP) Tag added in v0.8.0

func (ip *FileIP) Tag(k string) string

Tag returns the value of the tag with key k from the IP's audit info

func (*FileIP) Tags added in v0.8.0

func (ip *FileIP) Tags() map[string]string

Tags returns the audit info's tags

func (*FileIP) TempDir added in v0.8.0

func (ip *FileIP) TempDir() string

TempDir returns the path to a temporary directory where outputs are written

func (*FileIP) TempFileExists added in v0.6.1

func (ip *FileIP) TempFileExists() bool

TempFileExists checks if the temp-file exists

func (*FileIP) TempPath added in v0.6.1

func (ip *FileIP) TempPath() string

TempPath returns the temporary path of the physical file

func (*FileIP) UnMarshalJSON added in v0.6.1

func (ip *FileIP) UnMarshalJSON(v interface{})

UnMarshalJSON is a helper function to unmarshal the content of the IP's file to the interface v

func (*FileIP) Write added in v0.6.1

func (ip *FileIP) Write(dat []byte)

Write writes a byte array ([]byte) to the file's temp file path

func (*FileIP) WriteAuditLogToFile added in v0.6.1

func (ip *FileIP) WriteAuditLogToFile()

WriteAuditLogToFile writes the audit log to its designated file

type IP added in v0.5.1

type IP interface {
	ID() string
	Atomize()
}

IP is the base interface which all other IPs need to adhere to

type InParamPort added in v0.8.0

type InParamPort struct {
	Chan chan string

	RemotePorts map[string]*OutParamPort
	// contains filtered or unexported fields
}

InParamPort is an in-port for parameter values of string type

func NewInParamPort added in v0.8.0

func NewInParamPort(name string) *InParamPort

NewInParamPort returns a new InParamPort

func (*InParamPort) AddRemotePort added in v0.8.0

func (pip *InParamPort) AddRemotePort(pop *OutParamPort)

AddRemotePort adds a remote OutParamPort to the InParamPort

func (*InParamPort) CloseConnection added in v0.8.0

func (pip *InParamPort) CloseConnection(popName string)

CloseConnection closes the connection to the remote out-port with name popName, on the InParamPort

func (*InParamPort) From added in v0.8.0

func (pip *InParamPort) From(pop *OutParamPort)

From connects one parameter port with another one

func (*InParamPort) FromFloat added in v0.8.0

func (pip *InParamPort) FromFloat(floats ...float64)

FromFloat feeds one or more parameters of type float64 to the param port

func (*InParamPort) FromInt added in v0.8.0

func (pip *InParamPort) FromInt(ints ...int)

FromInt feeds one or more parameters of type int to the param port

func (*InParamPort) FromStr added in v0.8.0

func (pip *InParamPort) FromStr(strings ...string)

FromStr feeds one or more parameters of type string to a port
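
Since parameter ports carry string values, feeding typed values amounts to converting them to strings and sending them on a channel. The sketch below shows that idea for ints with the standard library only; feedInts and collectParams are hypothetical helpers, and the exact formatting SciPipe applies (for floats in particular) is not specified here.

```go
package main

import (
	"fmt"
	"strconv"
)

// feedInts converts each int to its decimal string form, puts the strings on
// a channel, and closes the channel to signal that no more values follow.
func feedInts(ints ...int) <-chan string {
	out := make(chan string, len(ints))
	for _, i := range ints {
		out <- strconv.Itoa(i)
	}
	close(out)
	return out
}

// collectParams receives parameter values until the channel is closed.
func collectParams(ch <-chan string) []string {
	var params []string
	for p := range ch {
		params = append(params, p)
	}
	return params
}

func main() {
	fmt.Println(collectParams(feedInts(1, 2, 3)))
}
```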

func (*InParamPort) Name added in v0.8.0

func (pip *InParamPort) Name() string

Name returns the name of the InParamPort

func (*InParamPort) Process added in v0.8.0

func (pip *InParamPort) Process() WorkflowProcess

Process returns the process that is connected to the port

func (*InParamPort) Ready added in v0.8.0

func (pip *InParamPort) Ready() bool

Ready tells whether the port is ready or not

func (*InParamPort) Recv added in v0.8.0

func (pip *InParamPort) Recv() string

Recv receives a param value over the port's connection

func (*InParamPort) Send added in v0.8.0

func (pip *InParamPort) Send(param string)

Send sends a parameter value to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port

func (*InParamPort) SetProcess added in v0.8.0

func (pip *InParamPort) SetProcess(p WorkflowProcess)

SetProcess sets the process of the port to p

func (*InParamPort) SetReady added in v0.8.0

func (pip *InParamPort) SetReady(ready bool)

SetReady sets the ready status of the InParamPort

type InPort added in v0.5.1

type InPort struct {
	Chan chan *FileIP

	RemotePorts map[string]*OutPort
	// contains filtered or unexported fields
}

InPort represents a pluggable connection from its own process to multiple out-ports of other processes, with which it communicates via channels under the hood

func NewInPort added in v0.5.1

func NewInPort(name string) *InPort

NewInPort returns a new InPort struct

func (*InPort) AddRemotePort added in v0.5.1

func (pt *InPort) AddRemotePort(rpt *OutPort)

AddRemotePort adds a remote OutPort to the InPort

func (*InPort) CloseConnection added in v0.5.1

func (pt *InPort) CloseConnection(rptName string)

CloseConnection closes the connection to the remote out-port with name rptName, on the InPort

func (*InPort) Disconnect added in v0.5.1

func (pt *InPort) Disconnect(rptName string)

Disconnect disconnects the (out-)port with name rptName, from the InPort

func (*InPort) From added in v0.8.0

func (pt *InPort) From(rpt *OutPort)

From connects an OutPort to the InPort

func (*InPort) Name added in v0.5.1

func (pt *InPort) Name() string

Name returns the name of the InPort

func (*InPort) Process added in v0.5.1

func (pt *InPort) Process() WorkflowProcess

Process returns the process connected to the port

func (*InPort) Ready added in v0.8.0

func (pt *InPort) Ready() bool

Ready tells whether the port is ready or not

func (*InPort) Recv added in v0.5.1

func (pt *InPort) Recv() *FileIP

Recv receives IPs from the port

func (*InPort) Send added in v0.5.1

func (pt *InPort) Send(ip *FileIP)

Send sends IPs to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port

func (*InPort) SetProcess added in v0.5.1

func (pt *InPort) SetProcess(p WorkflowProcess)

SetProcess sets the process of the port to p

func (*InPort) SetReady added in v0.8.0

func (pt *InPort) SetReady(ready bool)

SetReady sets the ready status of the InPort

type OutParamPort added in v0.8.0

type OutParamPort struct {
	RemotePorts map[string]*InParamPort
	// contains filtered or unexported fields
}

OutParamPort is an out-port for parameter values of string type

func NewOutParamPort added in v0.8.0

func NewOutParamPort(name string) *OutParamPort

NewOutParamPort returns a new OutParamPort

func (*OutParamPort) AddRemotePort added in v0.8.0

func (pop *OutParamPort) AddRemotePort(pip *InParamPort)

AddRemotePort adds a remote InParamPort to the OutParamPort

func (*OutParamPort) Close added in v0.8.0

func (pop *OutParamPort) Close()

Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-port's channel will also be closed.

func (*OutParamPort) Disconnect added in v0.8.0

func (pop *OutParamPort) Disconnect(pipName string)

Disconnect disconnects the (in-)port with name pipName, from the OutParamPort

func (*OutParamPort) Name added in v0.8.0

func (pop *OutParamPort) Name() string

Name returns the name of the OutParamPort

func (*OutParamPort) Process added in v0.8.0

func (pop *OutParamPort) Process() WorkflowProcess

Process returns the process that is connected to the port

func (*OutParamPort) Ready added in v0.8.0

func (pop *OutParamPort) Ready() bool

Ready tells whether the port is ready or not

func (*OutParamPort) Send added in v0.8.0

func (pop *OutParamPort) Send(param string)

Send sends a parameter value to all the in-ports connected to the OutParamPort

func (*OutParamPort) SetProcess added in v0.8.0

func (pop *OutParamPort) SetProcess(p WorkflowProcess)

SetProcess sets the process of the port to p

func (*OutParamPort) SetReady added in v0.8.0

func (pop *OutParamPort) SetReady(ready bool)

SetReady sets the ready status of the OutParamPort

func (*OutParamPort) To added in v0.8.0

func (pop *OutParamPort) To(pip *InParamPort)

To connects an InParamPort to the OutParamPort

type OutPort added in v0.5.1

type OutPort struct {
	RemotePorts map[string]*InPort
	// contains filtered or unexported fields
}

OutPort represents a pluggable connection from its own process to multiple in-ports of other processes, with which it communicates via channels under the hood

func NewOutPort added in v0.5.1

func NewOutPort(name string) *OutPort

NewOutPort returns a new OutPort struct

func (*OutPort) AddRemotePort added in v0.5.1

func (pt *OutPort) AddRemotePort(rpt *InPort)

AddRemotePort adds a remote InPort to the OutPort

func (*OutPort) Close added in v0.5.1

func (pt *OutPort) Close()

Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-port's channel will also be closed.
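
The "last connected port closes the channel" rule can be sketched as follows. This is a simplified, single-goroutine illustration with hypothetical names (inPort, connect, closeConnection, drainAfterClosing), not SciPipe's implementation, and it is not safe for concurrent use as written.

```go
package main

import "fmt"

// inPort is a simplified stand-in for an in-port: one channel shared by
// several senders, plus the names of the senders still connected.
type inPort struct {
	ch      chan string
	remotes map[string]bool
}

func newInPort() *inPort {
	return &inPort{ch: make(chan string, 16), remotes: map[string]bool{}}
}

// connect registers a sender under the given name.
func (p *inPort) connect(name string) { p.remotes[name] = true }

// closeConnection removes one sender. The channel itself is closed only
// when the last remaining sender has been removed.
func (p *inPort) closeConnection(name string) {
	if !p.remotes[name] {
		return // unknown or already closed connection
	}
	delete(p.remotes, name)
	if len(p.remotes) == 0 {
		close(p.ch)
	}
}

// drainAfterClosing connects two senders, lets each send one value and
// close, and then reads everything from the (now closed) channel.
func drainAfterClosing() []string {
	p := newInPort()
	p.connect("a.out")
	p.connect("b.out")

	p.ch <- "from a"
	p.closeConnection("a.out") // one sender left: channel stays open
	p.ch <- "from b"
	p.closeConnection("b.out") // last sender gone: channel is closed

	var got []string
	for v := range p.ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drainAfterClosing())
}
```

Closing only after the last sender is gone lets the receiving process use a plain range loop over the channel without losing values from slower senders.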

func (*OutPort) Disconnect added in v0.5.1

func (pt *OutPort) Disconnect(rptName string)

Disconnect disconnects the (in-)port with name rptName, from the OutPort

func (*OutPort) Name added in v0.5.1

func (pt *OutPort) Name() string

Name returns the name of the OutPort

func (*OutPort) Process added in v0.5.1

func (pt *OutPort) Process() WorkflowProcess

Process returns the process connected to the port

func (*OutPort) Ready added in v0.8.0

func (pt *OutPort) Ready() bool

Ready tells whether the port is ready or not

func (*OutPort) Send added in v0.5.1

func (pt *OutPort) Send(ip *FileIP)

Send sends a FileIP to all the in-ports connected to the OutPort

func (*OutPort) SetProcess added in v0.5.1

func (pt *OutPort) SetProcess(p WorkflowProcess)

SetProcess sets the process of the port to p

func (*OutPort) SetReady added in v0.8.0

func (pt *OutPort) SetReady(ready bool)

SetReady sets the ready status of the OutPort

func (*OutPort) To added in v0.8.0

func (pt *OutPort) To(rpt *InPort)

To connects an InPort to the OutPort

type PortInfo added in v0.8.0

type PortInfo struct {
	// contains filtered or unexported fields
}

PortInfo is a container for various information about process ports

type Process

type Process struct {
	BaseProcess
	CommandPattern string
	PathFuncs      map[string]func(*Task) string
	CustomExecute  func(*Task)
	CoresPerTask   int
	Prepend        string
	Spawn          bool
	PortInfo       map[string]*PortInfo
}

Process is the central component in SciPipe after Workflow. Processes are long-running "services" that schedule and execute Tasks based on the IPs and parameters received on their in-ports and parameter ports

func NewProc

func NewProc(workflow *Workflow, name string, cmd string) *Process

NewProc returns a new Process, and initializes its ports based on the command pattern.

func (*Process) Fail added in v0.11.0

func (p *Process) Fail(msg interface{})

func (*Process) Failf added in v0.11.0

func (p *Process) Failf(msg string, parts ...interface{})

func (*Process) In added in v0.5.1

func (p *Process) In(portName string) *InPort

In is a short-form for InPort() (of BaseProcess), which works only on Process processes

func (*Process) InParam added in v0.8.0

func (p *Process) InParam(portName string) *InParamPort

InParam is a short-form for InParamPort() (of BaseProcess), which works only on Process processes

func (*Process) Out added in v0.5.1

func (p *Process) Out(portName string) *OutPort

Out is a short-form for OutPort() (of BaseProcess), which works only on Process processes

func (*Process) OutParam added in v0.8.0

func (p *Process) OutParam(portName string) *OutParamPort

OutParam is a short-form for OutParamPort() (of BaseProcess), which works only on Process processes

func (*Process) Run

func (p *Process) Run()

Run runs the process by instantiating and executing Tasks for all inputs and parameter values on its in-ports. If there are no inputs or parameter values on the in-ports, it runs just once before it terminates. Note that the actual execution of shell commands is done inside Task.Execute, not here.

func (*Process) SetOut added in v0.8.0

func (p *Process) SetOut(outPortName string, pathPattern string)

SetOut initializes a port (if it does not already exist), and takes a configuration for its output paths via a pattern similar to the command pattern used to create new processes, with placeholder tags. The available placeholder tags are {i:inport_name}, {p:param_name} and {t:tag_name}. An example might be {i:foo}.replace_with_{p:replacement}.txt, given that the process contains an in-port named 'foo' and a parameter named 'replacement'. If an out-port with the specified name does not exist, it will be created. This makes it possible to create out-ports for files whose names are not stated explicitly on the commandline, such as when only a prefix is given.
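To make the placeholder tags concrete, here is a minimal sketch of how such a path pattern could be expanded. The helper expandPattern and its map arguments are hypothetical names chosen for this illustration; the sketch only handles the three tag kinds named above and does not claim to reproduce SciPipe's actual expansion logic.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches tags such as {i:foo}, {p:bar} or {t:baz}.
var placeholder = regexp.MustCompile(`\{([ipt]):([^{}]+)\}`)

// expandPattern is a hypothetical helper that replaces each placeholder tag
// with the in-port path, parameter value or tag value of the same name.
func expandPattern(pattern string, inPaths, params, tags map[string]string) string {
	return placeholder.ReplaceAllStringFunc(pattern, func(m string) string {
		parts := placeholder.FindStringSubmatch(m)
		kind, name := parts[1], parts[2]
		switch kind {
		case "i":
			return inPaths[name]
		case "p":
			return params[name]
		default: // "t"
			return tags[name]
		}
	})
}

func main() {
	path := expandPattern(
		"{i:foo}.replace_with_{p:replacement}.txt",
		map[string]string{"foo": "input.txt"},
		map[string]string{"replacement": "X"},
		nil,
	)
	fmt.Println(path) // input.txt.replace_with_X.txt
}
```

In this sketch an unknown name expands to the empty string; a stricter version could report an error instead.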

func (*Process) SetOutFunc added in v0.8.0

func (p *Process) SetOutFunc(outPortName string, pathFmtFunc func(task *Task) (path string))

SetOutFunc takes a function which produces a file path based on data available in *Task, such as concrete file paths and parameter values,

type Sink

type Sink struct {
	BaseProcess
}

Sink is a simple component that just receives IPs on its In-port without doing anything with them. It is used to drive pipelines of processes
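Why a component that discards everything can "drive" a pipeline is easiest to see with an unbuffered channel: the upstream sender can only proceed while something keeps receiving. The sketch below uses a hypothetical drain function and plain strings in place of IPs; it illustrates the idea and is not SciPipe's Sink implementation.

```go
package main

import "fmt"

// drain is a hypothetical sink: it receives every item, discards it,
// and reports how many items passed through.
func drain(in <-chan string) int {
	n := 0
	for range in {
		n++
	}
	return n
}

func main() {
	out := make(chan string) // unbuffered: each send blocks until it is received

	// The upstream "process" can only make progress while the sink receives.
	go func() {
		defer close(out)
		for _, p := range []string{"a.txt", "b.txt", "c.txt"} {
			out <- p
		}
	}()

	fmt.Println(drain(out)) // 3
}
```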

func NewSink

func NewSink(wf *Workflow, name string) *Sink

NewSink returns a new Sink component

func (*Sink) Fail added in v0.11.0

func (p *Sink) Fail(msg interface{})

func (*Sink) Failf added in v0.11.0

func (p *Sink) Failf(msg string, parts ...interface{})

func (*Sink) From added in v0.8.0

func (p *Sink) From(outPort *OutPort)

From connects an out-port to the sink's in-port

func (*Sink) FromParam added in v0.8.0

func (p *Sink) FromParam(outParamPort *OutParamPort)

FromParam connects a param-out-port to the sink's param-in-port

func (*Sink) Run

func (p *Sink) Run()

Run runs the Sink process

type Task added in v0.5.1

type Task struct {
	Name          string
	Command       string
	CustomExecute func(*Task)
	InIPs         map[string]*FileIP
	OutIPs        map[string]*FileIP
	Params        map[string]string
	Tags          map[string]string
	Done          chan int

	Process *Process
	// contains filtered or unexported fields
}

Task represents a single static shell command, or Go function, to be executed. Tasks are scheduled and managed by a corresponding Process

func NewTask added in v0.5.1

func NewTask(workflow *Workflow, process *Process, name string, cmdPat string, inIPs map[string]*FileIP, outPathFuncs map[string]func(*Task) string, portInfos map[string]*PortInfo, params map[string]string, tags map[string]string, prepend string, customExecute func(*Task), cores int) *Task

NewTask instantiates and initializes a new Task

func (*Task) Audit added in v0.11.0

func (t *Task) Audit(msg string)

func (*Task) Auditf added in v0.11.0

func (t *Task) Auditf(msg string, parts ...interface{})

func (*Task) Execute added in v0.5.1

func (t *Task) Execute()

Execute executes the task (the shell command or go function in CustomExecute)

func (*Task) Fail added in v0.11.0

func (t *Task) Fail(msg interface{})

func (*Task) Failf added in v0.11.0

func (t *Task) Failf(msg string, parts ...interface{})

func (*Task) InIP added in v0.6.1

func (t *Task) InIP(portName string) *FileIP

InIP returns an IP for the in-port with name portName

func (*Task) InPath added in v0.5.1

func (t *Task) InPath(portName string) string

InPath returns the path name of an input file for the task

func (*Task) OutIP added in v0.6.1

func (t *Task) OutIP(portName string) *FileIP

OutIP returns an IP for the out-port with name portName

func (*Task) OutPath added in v0.6.1

func (t *Task) OutPath(portName string) string

OutPath returns the path name of an output file for the task

func (*Task) Param added in v0.5.1

func (t *Task) Param(portName string) string

Param returns the value of a param, for the task

func (*Task) Tag added in v0.8.0

func (t *Task) Tag(tagName string) string

Tag returns the value of a tag, for the task

func (*Task) TempDir added in v0.8.0

func (t *Task) TempDir() string

TempDir returns a string that is unique to a task, suitable for use in file paths. It is built up by merging all input filenames and parameter values that a task takes as input, joined with dots.
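As an illustration of the naming scheme described here, the sketch below joins input file names and parameter values with dots. The function taskDirName is hypothetical, and several details are assumptions made only for this example: values are ordered by sorted port or parameter name, and only the base name of each input path is used. The actual TempDir output may include other parts.

```go
package main

import (
	"fmt"
	"path/filepath"
	"sort"
	"strings"
)

// sortedValues returns the map values ordered by key, so that the result
// does not depend on Go's random map iteration order.
func sortedValues(m map[string]string) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	vals := make([]string, 0, len(keys))
	for _, k := range keys {
		vals = append(vals, m[k])
	}
	return vals
}

// taskDirName is a hypothetical helper that merges input file names and
// parameter values into one dot-separated string.
func taskDirName(inPaths, params map[string]string) string {
	parts := []string{}
	for _, p := range sortedValues(inPaths) {
		parts = append(parts, filepath.Base(p)) // assumption: base names only
	}
	parts = append(parts, sortedValues(params)...)
	return strings.Join(parts, ".")
}

func main() {
	name := taskDirName(
		map[string]string{"foo": "data/in.txt"},
		map[string]string{"replacement": "X"},
	)
	fmt.Println(name) // in.txt.X
}
```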

type Workflow

type Workflow struct {
	PlotConf WorkflowPlotConf
	// contains filtered or unexported fields
}

Workflow is the centerpiece of the functionality in SciPipe, and is a container for a pipeline of processes making up a workflow. It has various methods for coordinating the execution of the pipeline as a whole, such as keeping track of the maximum number of concurrent tasks, as well as helper methods for creating new processes that automatically get plugged in to the workflow on creation

func NewWorkflow

func NewWorkflow(name string, maxConcurrentTasks int) *Workflow

NewWorkflow returns a new Workflow

func NewWorkflowCustomLogFile added in v0.6.4

func NewWorkflowCustomLogFile(name string, maxConcurrentTasks int, logFile string) *Workflow

NewWorkflowCustomLogFile returns a new Workflow, with

func (*Workflow) AddProc

func (wf *Workflow) AddProc(proc WorkflowProcess)

AddProc adds a Process to the workflow, to be run when the workflow runs

func (*Workflow) AddProcs

func (wf *Workflow) AddProcs(procs ...WorkflowProcess)

AddProcs takes one or many Processes and adds them to the workflow, to be run when the workflow runs.

func (*Workflow) Auditf added in v0.11.0

func (wf *Workflow) Auditf(msg string, parts ...interface{})

func (*Workflow) DecConcurrentTasks

func (wf *Workflow) DecConcurrentTasks(slots int)

DecConcurrentTasks decreases the counter for how many concurrent tasks are currently running in the workflow

func (*Workflow) DotGraph added in v0.8.0

func (wf *Workflow) DotGraph() (dot string)

DotGraph generates a graph description in DOT format (see https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29). If Workflow.PlotConf.EdgeLabels is set to true, each edge is printed with a label containing the out-port and in-port it connects.
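For readers unfamiliar with DOT, the following sketch shows what a graph description with optional edge labels can look like. The edge type and the dotGraph function are hypothetical and written only for this illustration; the exact text produced by Workflow.DotGraph may be laid out differently.

```go
package main

import (
	"fmt"
	"strings"
)

// edge is a hypothetical record of one connection between two processes.
type edge struct {
	fromProc, fromPort, toProc, toPort string
}

// dotGraph renders the edges as a DOT digraph. When edgeLabels is true,
// every edge is labelled with the out-port and in-port it connects.
func dotGraph(name string, edges []edge, edgeLabels bool) string {
	var b strings.Builder
	fmt.Fprintf(&b, "digraph %q {\n", name)
	for _, e := range edges {
		fmt.Fprintf(&b, "  %q -> %q", e.fromProc, e.toProc)
		if edgeLabels {
			fmt.Fprintf(&b, " [label=%q]", e.fromPort+" -> "+e.toPort)
		}
		b.WriteString(";\n")
	}
	b.WriteString("}\n")
	return b.String()
}

func main() {
	edges := []edge{{"download", "out", "align", "reads"}}
	fmt.Print(dotGraph("wf", edges, true))
}
```

The resulting text can be passed to the graphviz dot command to render an image.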

func (*Workflow) Fail added in v0.11.0

func (wf *Workflow) Fail(msg interface{})

func (*Workflow) Failf added in v0.11.0

func (wf *Workflow) Failf(msg string, parts ...interface{})

func (*Workflow) IncConcurrentTasks

func (wf *Workflow) IncConcurrentTasks(slots int)

IncConcurrentTasks increases the counter for how many concurrent tasks are currently running in the workflow
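A common Go idiom for bounding the number of concurrently running tasks is a counting semaphore built on a buffered channel. The sketch below shows that idiom with a hypothetical limiter type; whether SciPipe implements its counter this way is an assumption, not something stated in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical counting semaphore: the channel capacity is the
// maximum number of slots that may be claimed at the same time.
type limiter struct {
	mu    sync.Mutex // serializes multi-slot claims so two callers cannot each hold part
	slots chan struct{}
}

func newLimiter(max int) *limiter {
	return &limiter{slots: make(chan struct{}, max)}
}

// inc claims n slots, blocking while the limit is reached (n must not exceed max).
func (l *limiter) inc(n int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for i := 0; i < n; i++ {
		l.slots <- struct{}{}
	}
}

// dec releases n previously claimed slots.
func (l *limiter) dec(n int) {
	for i := 0; i < n; i++ {
		<-l.slots
	}
}

// running reports how many slots are currently claimed.
func (l *limiter) running() int { return len(l.slots) }

func main() {
	lim := newLimiter(2) // at most two tasks at a time
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			lim.inc(1)
			defer lim.dec(1)
			fmt.Println("running task", id)
		}(i)
	}
	wg.Wait()
	fmt.Println(lim.running()) // 0 once every task has released its slot
}
```

Taking a slots argument, as the methods above do, lets a task that needs several cores claim several slots at once.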

func (*Workflow) Name added in v0.5.1

func (wf *Workflow) Name() string

Name returns the name of the workflow

func (*Workflow) NewProc

func (wf *Workflow) NewProc(procName string, commandPattern string) *Process

NewProc returns a new process based on a commandPattern (see the documentation for scipipe.NewProc for more details about the pattern) and connects the process to the workflow

func (*Workflow) PlotGraph added in v0.8.0

func (wf *Workflow) PlotGraph(filePath string)

PlotGraph writes the workflow structure to a dot file

func (*Workflow) PlotGraphPDF added in v0.8.0

func (wf *Workflow) PlotGraphPDF(filePath string)

PlotGraphPDF writes the workflow structure to a dot file, and also runs the graphviz dot command to produce a PDF file (requires graphviz, with the dot command, installed on the system)

func (*Workflow) Proc

func (wf *Workflow) Proc(procName string) WorkflowProcess

Proc returns the process with name procName from the workflow

func (*Workflow) Procs

func (wf *Workflow) Procs() map[string]WorkflowProcess

Procs returns a map of all processes keyed by their names in the workflow

func (*Workflow) ProcsSorted added in v0.8.0

func (wf *Workflow) ProcsSorted() []WorkflowProcess

ProcsSorted returns the processes of the workflow as a slice, sorted by process name
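Go maps iterate in random order, which is why a sorted accessor is useful alongside Procs. A minimal sketch of the usual approach (collect the keys, sort them, then look the values up in order) follows; proc and procsSorted are hypothetical stand-ins, not the library's code.

```go
package main

import (
	"fmt"
	"sort"
)

// proc is a hypothetical stand-in for a workflow process.
type proc struct{ name string }

// procsSorted returns the map values ordered by their keys (the process names).
func procsSorted(procs map[string]*proc) []*proc {
	names := make([]string, 0, len(procs))
	for n := range procs {
		names = append(names, n)
	}
	sort.Strings(names)

	sorted := make([]*proc, 0, len(names))
	for _, n := range names {
		sorted = append(sorted, procs[n])
	}
	return sorted
}

func main() {
	procs := map[string]*proc{
		"call":     {name: "call"},
		"align":    {name: "align"},
		"download": {name: "download"},
	}
	for _, p := range procsSorted(procs) {
		fmt.Println(p.name)
	}
}
```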

func (*Workflow) Run

func (wf *Workflow) Run()

Run runs all the processes of the workflow

func (*Workflow) RunTo added in v0.5.1

func (wf *Workflow) RunTo(finalProcNames ...string)

RunTo runs all processes upstream of, and including, the processes with the names provided as arguments
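Selecting "everything upstream of, and including" a set of processes amounts to a graph traversal that follows connections backwards. The sketch below assumes a hypothetical map from each process name to the names of the processes feeding it; it shows the selection step only and is not SciPipe's implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// upstreamOf returns the names of the given final processes together with
// everything upstream of them. The upstream map (process name to the names
// of the processes feeding it) is a hypothetical representation.
func upstreamOf(upstream map[string][]string, finals ...string) []string {
	seen := map[string]bool{}
	var visit func(name string)
	visit = func(name string) {
		if seen[name] {
			return
		}
		seen[name] = true
		for _, up := range upstream[name] {
			visit(up)
		}
	}
	for _, f := range finals {
		visit(f)
	}

	names := make([]string, 0, len(seen))
	for n := range seen {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	upstream := map[string][]string{
		"align": {"download"},
		"call":  {"align"},
		"plot":  {"download"},
	}
	fmt.Println(upstreamOf(upstream, "call")) // [align call download]
}
```

Here "plot" is left out because it is not upstream of "call", even though both depend on "download".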

func (*Workflow) RunToProcs added in v0.5.1

func (wf *Workflow) RunToProcs(finalProcs ...WorkflowProcess)

RunToProcs runs all processes upstream of, and including, the process structs provided as arguments

func (*Workflow) RunToRegex added in v0.6.1

func (wf *Workflow) RunToRegex(procNamePatterns ...string)

RunToRegex runs all processes upstream of, and including, the processes whose names match any of the provided regexp patterns
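The name-matching step can be sketched with the standard regexp package. The helper matchProcNames is hypothetical, and the use of unanchored matching via MatchString is an assumption; the library may match names differently.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// matchProcNames returns the names that match at least one of the patterns.
// It is a hypothetical helper; patterns are not implicitly anchored.
func matchProcNames(names []string, patterns ...string) ([]string, error) {
	var compiled []*regexp.Regexp
	for _, p := range patterns {
		re, err := regexp.Compile(p)
		if err != nil {
			return nil, err
		}
		compiled = append(compiled, re)
	}

	var matched []string
	for _, n := range names {
		for _, re := range compiled {
			if re.MatchString(n) {
				matched = append(matched, n)
				break // one matching pattern is enough
			}
		}
	}
	sort.Strings(matched)
	return matched, nil
}

func main() {
	names := []string{"align_1", "align_2", "call", "plot"}
	matched, err := matchProcNames(names, "^align_")
	if err != nil {
		fmt.Println("bad pattern:", err)
		return
	}
	fmt.Println(matched) // [align_1 align_2]
}
```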

func (*Workflow) SetSink

func (wf *Workflow) SetSink(sink *Sink)

SetSink sets the sink of the workflow to the provided sink process

func (*Workflow) Sink

func (wf *Workflow) Sink() *Sink

Sink returns the sink process of the workflow

type WorkflowPlotConf added in v0.8.0

type WorkflowPlotConf struct {
	EdgeLabels bool
}

WorkflowPlotConf contains configuration for plotting the workflow as a graph with graphviz

type WorkflowProcess added in v0.5.1

type WorkflowProcess interface {
	Name() string
	InPorts() map[string]*InPort
	OutPorts() map[string]*OutPort
	InParamPorts() map[string]*InParamPort
	OutParamPorts() map[string]*OutParamPort
	Ready() bool
	Run()
	Fail(interface{})
	Failf(string, ...interface{})
}

WorkflowProcess is an interface for processes to be handled by Workflow

Directories

Path Synopsis
cmd
examples
resequencing
Implementation (work in progress) of the resequencing analysis pipeline used to teach the introductory NGS bioinformatics analysis course at SciLifeLab as described on this page: https://scilifelab.github.io/courses/ngsintro/1502/labs/resequencing-analysis Prerequisites: Samtools, BWA, Picard, GATK.
subworkflow
An example that shows how to create a sub-network / sub-workflow that can be used as a component
