Documentation
¶
Index ¶
- Constants
- Variables
- func Check(err error)
- func CheckWithMsg(err error, errMsg string)
- func ExecCmd(cmd string) string
- func Fail(vs ...interface{})
- func Failf(msg string, vs ...interface{})
- func InitLog(traceHandle io.Writer, debugHandle io.Writer, infoHandle io.Writer, ...)
- func InitLogAudit()
- func InitLogAuditToFile(filePath string)
- func InitLogDebug()
- func InitLogError()
- func InitLogInfo()
- func InitLogWarning()
- type AuditInfo
- type BaseProcess
- func (p *BaseProcess) Audit(msg interface{})
- func (p *BaseProcess) Auditf(msg string, parts ...interface{})
- func (p *BaseProcess) CloseOutPorts()
- func (p *BaseProcess) DeleteInPort(portName string)
- func (p *BaseProcess) DeleteOutPort(portName string)
- func (p *BaseProcess) Fail(msg interface{})
- func (p *BaseProcess) Failf(msg string, parts ...interface{})
- func (p *BaseProcess) InPort(portName string) *InPort
- func (p *BaseProcess) InPorts() map[string]*InPort
- func (p *BaseProcess) InitInPort(node Node, portName string)
- func (p *BaseProcess) InitOutPort(node Node, portName string)
- func (p *BaseProcess) Name() string
- func (p *BaseProcess) Network() *Network
- func (p *BaseProcess) OutPort(portName string) *OutPort
- func (p *BaseProcess) OutPorts() map[string]*OutPort
- func (p *BaseProcess) Ready() (isReady bool)
- type IP
- type InPort
- func (pt *InPort) AddRemotePort(rpt *OutPort)
- func (pt *InPort) CloseConnection(rptName string)
- func (pt *InPort) Disconnect(rptName string)
- func (pt *InPort) Fail(msg interface{})
- func (pt *InPort) Failf(msg string, parts ...interface{})
- func (pt *InPort) From(rpt *OutPort)
- func (pt *InPort) Name() string
- func (pt *InPort) Process() Node
- func (pt *InPort) Ready() bool
- func (pt *InPort) Recv() *Packet
- func (pt *InPort) Send(ip *Packet)
- func (pt *InPort) SetProcess(p Node)
- func (pt *InPort) SetReady(ready bool)
- type Network
- func (net *Network) AddProc(node Node)
- func (net *Network) AddProcs(procs ...Node)
- func (net *Network) Auditf(msg string, parts ...interface{})
- func (net *Network) DecConcurrentTasks(slots int)
- func (net *Network) DotGraph() (dot string)
- func (net *Network) Fail(msg interface{})
- func (net *Network) Failf(msg string, parts ...interface{})
- func (net *Network) IncConcurrentTasks(slots int)
- func (net *Network) Name() string
- func (net *Network) PlotGraph(filePath string)
- func (net *Network) PlotGraphPDF(filePath string)
- func (net *Network) Proc(procName string) Node
- func (net *Network) Procs() map[string]Node
- func (net *Network) ProcsSorted() []Node
- func (net *Network) Run()
- func (net *Network) RunTo(finalProcNames ...string)
- func (net *Network) RunToProcs(finalProcs ...Node)
- func (net *Network) RunToRegex(procNamePatterns ...string)
- func (net *Network) SetSink(sink *Sink)
- func (net *Network) Sink() *Sink
- type NetworkPlotConf
- type Node
- type OutPort
- func (pt *OutPort) AddRemotePort(rpt *InPort)
- func (pt *OutPort) Close()
- func (pt *OutPort) Disconnect(rptName string)
- func (pt *OutPort) Fail(msg interface{})
- func (pt *OutPort) Failf(msg string, parts ...interface{})
- func (pt *OutPort) Name() string
- func (pt *OutPort) Process() Node
- func (pt *OutPort) Ready() bool
- func (pt *OutPort) Send(data any)
- func (pt *OutPort) SetProcess(p Node)
- func (pt *OutPort) SetReady(ready bool)
- func (pt *OutPort) To(rpt *InPort)
- type Packet
- func (ip *Packet) AddTag(k string, v string)
- func (ip *Packet) AddTags(tags map[string]string)
- func (ip *Packet) Data() any
- func (ip *Packet) Fail(msg interface{})
- func (ip *Packet) Failf(msg string, parts ...interface{})
- func (ip *Packet) ID() string
- func (ip *Packet) Tag(k string) string
- func (ip *Packet) Tags() map[string]string
- type Sink
Constants ¶
const (
// Version is the FlowBase version in string format
Version = "0.2.0"
)
Variables ¶
var ( // Trace is a log handler for extremely detailed level logs. It is so far // sparely used in flowbase. Trace *log.Logger // Debug is a log handler for debugging level logs Debug *log.Logger // Info is a log handler for information level logs Info *log.Logger // Audit is a log handler for audit level logs Audit *log.Logger // Warning is a log handler for warning level logs Warning *log.Logger // Error is a log handler for error level logs Error *log.Logger )
var (
BUFSIZE = 128
)
Functions ¶
func CheckWithMsg ¶ added in v0.2.0
CheckWithMsg checks the error err, and prints both the original error message, and a custom one provided in errMsg
func Fail ¶ added in v0.2.0
func Fail(vs ...interface{})
Fail logs the error message, so that it will be possible to improve error messages in one place
func Failf ¶ added in v0.2.0
func Failf(msg string, vs ...interface{})
Failf is like Fail but with msg being a formatter string for the message and vs being items to format into the message
func InitLog ¶
func InitLog( traceHandle io.Writer, debugHandle io.Writer, infoHandle io.Writer, auditHandle io.Writer, warningHandle io.Writer, errorHandle io.Writer)
InitLog initiates logging handlers
func InitLogAuditToFile ¶ added in v0.2.0
func InitLogAuditToFile(filePath string)
InitLogAuditToFile initiate logging with level=AUDIT, and write that to fileName
Types ¶
type AuditInfo ¶ added in v0.2.0
type AuditInfo struct { ID string ProcessName string Command string Params map[string]string Tags map[string]string StartTime time.Time FinishTime time.Time ExecTimeNS time.Duration OutFiles map[string]string Upstream map[string]*AuditInfo }
AuditInfo contains structured audit/provenance logging information for a particular task (invocation), to go with all outgoing IPs from that task
func NewAuditInfo ¶ added in v0.2.0
func NewAuditInfo() *AuditInfo
NewAuditInfo returns a new AuditInfo struct
type BaseProcess ¶ added in v0.2.0
type BaseProcess struct {
// contains filtered or unexported fields
}
BaseProcess provides a skeleton for processes, such as the main Process component, and the custom components in the flowbase/components library
func NewBaseProcess ¶ added in v0.2.0
func NewBaseProcess(net *Network, name string) BaseProcess
NewBaseProcess returns a new BaseProcess, connected to the provided workflow, and with the name name
func (*BaseProcess) Audit ¶ added in v0.2.0
func (p *BaseProcess) Audit(msg interface{})
func (*BaseProcess) Auditf ¶ added in v0.2.0
func (p *BaseProcess) Auditf(msg string, parts ...interface{})
func (*BaseProcess) CloseOutPorts ¶ added in v0.2.0
func (p *BaseProcess) CloseOutPorts()
CloseOutPorts closes all (normal) out-ports
func (*BaseProcess) DeleteInPort ¶ added in v0.2.0
func (p *BaseProcess) DeleteInPort(portName string)
DeleteInPort deletes an InPort object from the process
func (*BaseProcess) DeleteOutPort ¶ added in v0.2.0
func (p *BaseProcess) DeleteOutPort(portName string)
DeleteOutPort deletes a OutPort object from the process
func (*BaseProcess) Fail ¶ added in v0.2.0
func (p *BaseProcess) Fail(msg interface{})
Fail fails with a message that includes the process name
func (*BaseProcess) Failf ¶ added in v0.2.0
func (p *BaseProcess) Failf(msg string, parts ...interface{})
Failf fails with a message that includes the process name
func (*BaseProcess) InPort ¶ added in v0.2.0
func (p *BaseProcess) InPort(portName string) *InPort
InPort returns the in-port with name portName
func (*BaseProcess) InPorts ¶ added in v0.2.0
func (p *BaseProcess) InPorts() map[string]*InPort
InPorts returns a map of all the in-ports of the process, keyed by their names
func (*BaseProcess) InitInPort ¶ added in v0.2.0
func (p *BaseProcess) InitInPort(node Node, portName string)
InitInPort adds the in-port port to the process, with name portName
func (*BaseProcess) InitOutPort ¶ added in v0.2.0
func (p *BaseProcess) InitOutPort(node Node, portName string)
InitOutPort adds the out-port port to the process, with name portName
func (*BaseProcess) Name ¶ added in v0.2.0
func (p *BaseProcess) Name() string
Name returns the name of the process
func (*BaseProcess) Network ¶ added in v0.2.0
func (p *BaseProcess) Network() *Network
Network returns the workflow the process is connected to
func (*BaseProcess) OutPort ¶ added in v0.2.0
func (p *BaseProcess) OutPort(portName string) *OutPort
OutPort returns the out-port with name portName
func (*BaseProcess) OutPorts ¶ added in v0.2.0
func (p *BaseProcess) OutPorts() map[string]*OutPort
OutPorts returns a map of all the out-ports of the process, keyed by their names
func (*BaseProcess) Ready ¶ added in v0.2.0
func (p *BaseProcess) Ready() (isReady bool)
Ready checks whether all the process' ports are connected
type IP ¶ added in v0.2.0
type IP interface {
ID() string
}
IP Is the base interface which all other IPs need to adhere to
type InPort ¶ added in v0.2.0
type InPort struct { Chan chan *Packet RemotePorts map[string]*OutPort // contains filtered or unexported fields }
InPort represents a pluggable connection to multiple out-ports from other processes, from its own process, and with which it is communicating via channels under the hood
func (*InPort) AddRemotePort ¶ added in v0.2.0
AddRemotePort adds a remote OutPort to the InPort
func (*InPort) CloseConnection ¶ added in v0.2.0
CloseConnection closes the connection to the remote out-port with name rptName, on the InPort
func (*InPort) Disconnect ¶ added in v0.2.0
Disconnect disconnects the (out-)port with name rptName, from the InPort
func (*InPort) Fail ¶ added in v0.2.0
func (pt *InPort) Fail(msg interface{})
Fail fails with a message that includes the process name
func (*InPort) Send ¶ added in v0.2.0
Send sends IPs to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port
func (*InPort) SetProcess ¶ added in v0.2.0
SetProcess sets the process of the port to p
type Network ¶ added in v0.2.0
type Network struct { PlotConf NetworkPlotConf // contains filtered or unexported fields }
Network is the centerpiece of the functionality in FlowBase, and is a container for a pipeline of processes making up a workflow. It has various methods for coordination the execution of the pipeline as a whole, such as keeping track of the maxiumum number of concurrent tasks, as well as helper methods for creating new processes, that automatically gets plugged in to the workflow on creation
func NewNetwork ¶ added in v0.2.0
func NewNetworkWithCustomLogFile ¶ added in v0.2.0
NewNetworkCustomLogFile returns a new Network, with
func NewNetworkWithFileLogging ¶ added in v0.2.0
func NewNetworkWithMaxTasks ¶ added in v0.2.0
func (*Network) AddProc ¶ added in v0.2.0
AddProc adds a Process to the workflow, to be run when the workflow runs
func (*Network) AddProcs ¶ added in v0.2.0
AddProcs takes one or many Processes and adds them to the workflow, to be run when the workflow runs.
func (*Network) DecConcurrentTasks ¶ added in v0.2.0
DecConcurrentTasks decreases the conter for how many concurrent tasks are currently running in the workflow
func (*Network) DotGraph ¶ added in v0.2.0
DotGraph generates a graph description in DOT format (See https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29) If Network.PlotConf.EdgeLabels is set to true, a label containing the in-port and out-port to which edges are connected to, will be printed.
func (*Network) IncConcurrentTasks ¶ added in v0.2.0
IncConcurrentTasks increases the conter for how many concurrent tasks are currently running in the workflow
func (*Network) PlotGraphPDF ¶ added in v0.2.0
PlotGraphPDF writes the workflow structure to a dot file, and also runs the graphviz dot command to produce a PDF file (requires graphviz, with the dot command, installed on the system)
func (*Network) Proc ¶ added in v0.2.0
Proc returns the process with name procName from the workflow
func (*Network) Procs ¶ added in v0.2.0
Procs returns a map of all processes keyed by their names in the workflow
func (*Network) ProcsSorted ¶ added in v0.2.0
ProcsSorted returns the processes of the workflow, in an array, sorted by the process names
func (*Network) Run ¶ added in v0.2.0
func (net *Network) Run()
Run runs all the processes of the workflow
func (*Network) RunTo ¶ added in v0.2.0
RunTo runs all processes upstream of, and including, the process with names provided as arguments
func (*Network) RunToProcs ¶ added in v0.2.0
RunToProcs runs all processes upstream of, and including, the process strucs provided as arguments
func (*Network) RunToRegex ¶ added in v0.2.0
RunToRegex runs all processes upstream of, and including, the process whose name matches any of the provided regexp patterns
type NetworkPlotConf ¶ added in v0.2.0
type NetworkPlotConf struct {
EdgeLabels bool
}
NetworkPlotConf contains configuraiton for plotting the workflow as a graph with graphviz
type Node ¶ added in v0.2.0
type Node interface { Name() string InPorts() map[string]*InPort OutPorts() map[string]*OutPort Ready() bool Run() Fail(interface{}) Failf(string, ...interface{}) }
Node is an interface for processes to be handled by Network
type OutPort ¶ added in v0.2.0
OutPort represents a pluggable connection to multiple in-ports from other processes, from its own process, and with which it is communicating via channels under the hood
func NewOutPort ¶ added in v0.2.0
NewOutPort returns a new OutPort struct
func (*OutPort) AddRemotePort ¶ added in v0.2.0
AddRemotePort adds a remote InPort to the OutPort
func (*OutPort) Close ¶ added in v0.2.0
func (pt *OutPort) Close()
Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-ports channel will also be closed.
func (*OutPort) Disconnect ¶ added in v0.2.0
Disconnect disconnects the (in-)port with name rptName, from the OutPort
func (*OutPort) Fail ¶ added in v0.2.0
func (pt *OutPort) Fail(msg interface{})
Fail fails with a message that includes the process name
func (*OutPort) Send ¶ added in v0.2.0
Send sends an Packet to all the in-ports connected to the OutPort
func (*OutPort) SetProcess ¶ added in v0.2.0
SetProcess sets the process of the port to p
type Packet ¶ added in v0.2.0
type Packet struct {
// contains filtered or unexported fields
}
Packet contains foundational functionality which all IPs need to implement. It is meant to be embedded into other IP implementations.
type Sink ¶
type Sink struct {
BaseProcess
}
Sink is a simple component that just receives IPs on its In-port without doing anything with them. It is used to drive pipelines of processes