flowbase

package module
v0.2.0
Published: May 27, 2022 License: MIT Imports: 16 Imported by: 5

README

FlowBase

A Flow-based Programming (FBP) micro-framework for Go (Golang).

The aim of FlowBase, as opposed to being a full-blown framework, is to provide just enough functionality on top of the existing FBP-like primitives in Go (channels with bounded buffers, asynchronous goroutines) to enable developing data processing applications with it. Thus the term "FBP micro-framework".

The pattern has previously been described in blog posts on GopherAcademy.

Status: The current version is an enhanced version of the above pattern, based on functionality subsequently developed in the SciPipe library.

Installation

First, make sure you have initialized a go module in your library, with:

go mod init <name-of-package>

(Note that this could be e.g. go mod init github.com/yourusername/yourpackage, or just a local name such as yourpackage.)

Then, install the latest version of the flowbase library to your module:

go get github.com/flowbase/flowbase@latest

Usage

For a code example, see the examples folder here in the repo.

More code examples

Note: The below code currently uses an earlier version of the FlowBase library.

For a real-world example, see this code, which defines an app that transforms semantic RDF data into wiki pages in MediaWiki XML format (the network connection code is highlighted, to help you find the interesting parts quickly :) ).

Libraries based on FlowBase

  • RDF2SMW - A tool to convert RDF triples to a Semantic MediaWiki XML import file
  • SciPipe - A Scientific Workflow engine library (actually not formally built on FlowBase any more)

References

Other Go FBP frameworks

  • GoFBP - FBP framework by FBP inventor, following the original FBP principles closely
  • GoFlow - The first production grade Go FBP framework
  • Cascades

Even more Go FBP (like) frameworks

Seemingly less mature and/or well-known...

Documentation

Index

Constants

const (
	// Version is the FlowBase version in string format
	Version = "0.2.0"
)

Variables

var (
	// Trace is a log handler for extremely detailed level logs. It is so far
	// sparsely used in flowbase.
	Trace *log.Logger
	// Debug is a log handler for debugging level logs
	Debug *log.Logger
	// Info is a log handler for information level logs
	Info *log.Logger
	// Audit is a log handler for audit level logs
	Audit *log.Logger
	// Warning is a log handler for warning level logs
	Warning *log.Logger
	// Error is a log handler for error level logs
	Error *log.Logger
)
var (
	BUFSIZE = 128
)

Functions

func Check

func Check(err error)

Check checks the error err, and prints the message in the error

func CheckWithMsg added in v0.2.0

func CheckWithMsg(err error, errMsg string)

CheckWithMsg checks the error err, and prints both the original error message, and a custom one provided in errMsg

func ExecCmd

func ExecCmd(cmd string) string

ExecCmd executes the command cmd, as a shell command via bash

func Fail added in v0.2.0

func Fail(vs ...interface{})

Fail logs the error message, so that it will be possible to improve error messages in one place

func Failf added in v0.2.0

func Failf(msg string, vs ...interface{})

Failf is like Fail but with msg being a formatter string for the message and vs being items to format into the message

func InitLog

func InitLog(
	traceHandle io.Writer,
	debugHandle io.Writer,
	infoHandle io.Writer,
	auditHandle io.Writer,
	warningHandle io.Writer,
	errorHandle io.Writer)

InitLog initiates logging handlers

func InitLogAudit

func InitLogAudit()

InitLogAudit initiates logging with level=AUDIT

func InitLogAuditToFile added in v0.2.0

func InitLogAuditToFile(filePath string)

InitLogAuditToFile initiates logging with level=AUDIT, and writes the log to the file at filePath

func InitLogDebug

func InitLogDebug()

InitLogDebug initiates logging with level=DEBUG

func InitLogError

func InitLogError()

InitLogError initiates logging with level=ERROR

func InitLogInfo

func InitLogInfo()

InitLogInfo initiates logging with level=INFO

func InitLogWarning

func InitLogWarning()

InitLogWarning initiates logging with level=WARNING

Types

type AuditInfo added in v0.2.0

type AuditInfo struct {
	ID          string
	ProcessName string
	Command     string
	Params      map[string]string
	Tags        map[string]string
	StartTime   time.Time
	FinishTime  time.Time
	ExecTimeNS  time.Duration
	OutFiles    map[string]string
	Upstream    map[string]*AuditInfo
}

AuditInfo contains structured audit/provenance logging information for a particular task (invocation), to go with all outgoing IPs from that task

func NewAuditInfo added in v0.2.0

func NewAuditInfo() *AuditInfo

NewAuditInfo returns a new AuditInfo struct

type BaseProcess added in v0.2.0

type BaseProcess struct {
	// contains filtered or unexported fields
}

BaseProcess provides a skeleton for processes, such as the main Process component, and the custom components in the flowbase/components library

func NewBaseProcess added in v0.2.0

func NewBaseProcess(net *Network, name string) BaseProcess

NewBaseProcess returns a new BaseProcess, connected to the provided workflow, and with the name name

func (*BaseProcess) Audit added in v0.2.0

func (p *BaseProcess) Audit(msg interface{})

func (*BaseProcess) Auditf added in v0.2.0

func (p *BaseProcess) Auditf(msg string, parts ...interface{})

func (*BaseProcess) CloseOutPorts added in v0.2.0

func (p *BaseProcess) CloseOutPorts()

CloseOutPorts closes all (normal) out-ports

func (*BaseProcess) DeleteInPort added in v0.2.0

func (p *BaseProcess) DeleteInPort(portName string)

DeleteInPort deletes an InPort object from the process

func (*BaseProcess) DeleteOutPort added in v0.2.0

func (p *BaseProcess) DeleteOutPort(portName string)

DeleteOutPort deletes an OutPort object from the process

func (*BaseProcess) Fail added in v0.2.0

func (p *BaseProcess) Fail(msg interface{})

Fail fails with a message that includes the process name

func (*BaseProcess) Failf added in v0.2.0

func (p *BaseProcess) Failf(msg string, parts ...interface{})

Failf fails with a message that includes the process name

func (*BaseProcess) InPort added in v0.2.0

func (p *BaseProcess) InPort(portName string) *InPort

InPort returns the in-port with name portName

func (*BaseProcess) InPorts added in v0.2.0

func (p *BaseProcess) InPorts() map[string]*InPort

InPorts returns a map of all the in-ports of the process, keyed by their names

func (*BaseProcess) InitInPort added in v0.2.0

func (p *BaseProcess) InitInPort(node Node, portName string)

InitInPort adds an in-port with the name portName to the process

func (*BaseProcess) InitOutPort added in v0.2.0

func (p *BaseProcess) InitOutPort(node Node, portName string)

InitOutPort adds an out-port with the name portName to the process

func (*BaseProcess) Name added in v0.2.0

func (p *BaseProcess) Name() string

Name returns the name of the process

func (*BaseProcess) Network added in v0.2.0

func (p *BaseProcess) Network() *Network

Network returns the workflow the process is connected to

func (*BaseProcess) OutPort added in v0.2.0

func (p *BaseProcess) OutPort(portName string) *OutPort

OutPort returns the out-port with name portName

func (*BaseProcess) OutPorts added in v0.2.0

func (p *BaseProcess) OutPorts() map[string]*OutPort

OutPorts returns a map of all the out-ports of the process, keyed by their names

func (*BaseProcess) Ready added in v0.2.0

func (p *BaseProcess) Ready() (isReady bool)

Ready checks whether all the process' ports are connected

type IP added in v0.2.0

type IP interface {
	ID() string
}

IP is the base interface which all other IPs need to adhere to

type InPort added in v0.2.0

type InPort struct {
	Chan chan *Packet

	RemotePorts map[string]*OutPort
	// contains filtered or unexported fields
}

InPort represents a pluggable connection from its own process to multiple out-ports of other processes, with which it communicates via channels under the hood

func NewInPort added in v0.2.0

func NewInPort(name string) *InPort

NewInPort returns a new InPort struct

func (*InPort) AddRemotePort added in v0.2.0

func (pt *InPort) AddRemotePort(rpt *OutPort)

AddRemotePort adds a remote OutPort to the InPort

func (*InPort) CloseConnection added in v0.2.0

func (pt *InPort) CloseConnection(rptName string)

CloseConnection closes the connection to the remote out-port with name rptName, on the InPort

func (*InPort) Disconnect added in v0.2.0

func (pt *InPort) Disconnect(rptName string)

Disconnect disconnects the (out-)port with name rptName, from the InPort

func (*InPort) Fail added in v0.2.0

func (pt *InPort) Fail(msg interface{})

Fail fails with a message that includes the process name

func (*InPort) Failf added in v0.2.0

func (pt *InPort) Failf(msg string, parts ...interface{})

Failf fails with a message that includes the process name

func (*InPort) From added in v0.2.0

func (pt *InPort) From(rpt *OutPort)

From connects an OutPort to the InPort

func (*InPort) Name added in v0.2.0

func (pt *InPort) Name() string

Name returns the name of the InPort

func (*InPort) Process added in v0.2.0

func (pt *InPort) Process() Node

Process returns the process connected to the port

func (*InPort) Ready added in v0.2.0

func (pt *InPort) Ready() bool

Ready tells whether the port is ready or not

func (*InPort) Recv added in v0.2.0

func (pt *InPort) Recv() *Packet

Recv receives IPs from the port

func (*InPort) Send added in v0.2.0

func (pt *InPort) Send(ip *Packet)

Send sends IPs to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port

func (*InPort) SetProcess added in v0.2.0

func (pt *InPort) SetProcess(p Node)

SetProcess sets the process of the port to p

func (*InPort) SetReady added in v0.2.0

func (pt *InPort) SetReady(ready bool)

SetReady sets the ready status of the InPort

type Network added in v0.2.0

type Network struct {
	PlotConf NetworkPlotConf
	// contains filtered or unexported fields
}

Network is the centerpiece of the functionality in FlowBase, and is a container for a pipeline of processes making up a workflow. It has various methods for coordinating the execution of the pipeline as a whole, such as keeping track of the maximum number of concurrent tasks, as well as helper methods for creating new processes, which automatically get plugged in to the workflow on creation

func NewNetwork added in v0.2.0

func NewNetwork(name string) *Network

func NewNetworkWithCustomLogFile added in v0.2.0

func NewNetworkWithCustomLogFile(name string, maxConcurrentTasks int, logFile string) *Network

NewNetworkWithCustomLogFile returns a new Network with the given name, maximum number of concurrent tasks, and log file

func NewNetworkWithFileLogging added in v0.2.0

func NewNetworkWithFileLogging(name string, maxConcurrentTasks int) *Network

func NewNetworkWithMaxTasks added in v0.2.0

func NewNetworkWithMaxTasks(name string, maxConcurrentTasks int) *Network

func (*Network) AddProc added in v0.2.0

func (net *Network) AddProc(node Node)

AddProc adds a Process to the workflow, to be run when the workflow runs

func (*Network) AddProcs added in v0.2.0

func (net *Network) AddProcs(procs ...Node)

AddProcs takes one or many Processes and adds them to the workflow, to be run when the workflow runs.

func (*Network) Auditf added in v0.2.0

func (net *Network) Auditf(msg string, parts ...interface{})

func (*Network) DecConcurrentTasks added in v0.2.0

func (net *Network) DecConcurrentTasks(slots int)

DecConcurrentTasks decreases the counter for how many concurrent tasks are currently running in the workflow

func (*Network) DotGraph added in v0.2.0

func (net *Network) DotGraph() (dot string)

DotGraph generates a graph description in DOT format (see https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29). If Network.PlotConf.EdgeLabels is set to true, each edge is printed with a label containing the out-port and in-port it connects.

func (*Network) Fail added in v0.2.0

func (net *Network) Fail(msg interface{})

func (*Network) Failf added in v0.2.0

func (net *Network) Failf(msg string, parts ...interface{})

func (*Network) IncConcurrentTasks added in v0.2.0

func (net *Network) IncConcurrentTasks(slots int)

IncConcurrentTasks increases the counter for how many concurrent tasks are currently running in the workflow

func (*Network) Name added in v0.2.0

func (net *Network) Name() string

Name returns the name of the workflow

func (*Network) PlotGraph added in v0.2.0

func (net *Network) PlotGraph(filePath string)

PlotGraph writes the workflow structure to a dot file

func (*Network) PlotGraphPDF added in v0.2.0

func (net *Network) PlotGraphPDF(filePath string)

PlotGraphPDF writes the workflow structure to a dot file, and also runs the graphviz dot command to produce a PDF file (requires graphviz, with the dot command, installed on the system)

func (*Network) Proc added in v0.2.0

func (net *Network) Proc(procName string) Node

Proc returns the process with name procName from the workflow

func (*Network) Procs added in v0.2.0

func (net *Network) Procs() map[string]Node

Procs returns a map of all processes keyed by their names in the workflow

func (*Network) ProcsSorted added in v0.2.0

func (net *Network) ProcsSorted() []Node

ProcsSorted returns the processes of the workflow, in an array, sorted by the process names

func (*Network) Run added in v0.2.0

func (net *Network) Run()

Run runs all the processes of the workflow

func (*Network) RunTo added in v0.2.0

func (net *Network) RunTo(finalProcNames ...string)

RunTo runs all processes upstream of, and including, the processes with the names provided as arguments

func (*Network) RunToProcs added in v0.2.0

func (net *Network) RunToProcs(finalProcs ...Node)

RunToProcs runs all processes upstream of, and including, the process structs provided as arguments

func (*Network) RunToRegex added in v0.2.0

func (net *Network) RunToRegex(procNamePatterns ...string)

RunToRegex runs all processes upstream of, and including, the processes whose names match any of the provided regexp patterns

func (*Network) SetSink added in v0.2.0

func (net *Network) SetSink(sink *Sink)

SetSink sets the sink of the workflow to the provided sink process

func (*Network) Sink added in v0.2.0

func (net *Network) Sink() *Sink

Sink returns the sink process of the workflow

type NetworkPlotConf added in v0.2.0

type NetworkPlotConf struct {
	EdgeLabels bool
}

NetworkPlotConf contains configuration for plotting the workflow as a graph with graphviz

type Node added in v0.2.0

type Node interface {
	Name() string
	InPorts() map[string]*InPort
	OutPorts() map[string]*OutPort
	Ready() bool
	Run()
	Fail(interface{})
	Failf(string, ...interface{})
}

Node is an interface for processes to be handled by Network

type OutPort added in v0.2.0

type OutPort struct {
	RemotePorts map[string]*InPort
	// contains filtered or unexported fields
}

OutPort represents a pluggable connection from its own process to multiple in-ports of other processes, with which it communicates via channels under the hood

func NewOutPort added in v0.2.0

func NewOutPort(name string) *OutPort

NewOutPort returns a new OutPort struct

func (*OutPort) AddRemotePort added in v0.2.0

func (pt *OutPort) AddRemotePort(rpt *InPort)

AddRemotePort adds a remote InPort to the OutPort

func (*OutPort) Close added in v0.2.0

func (pt *OutPort) Close()

Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-port's channel will also be closed.

func (*OutPort) Disconnect added in v0.2.0

func (pt *OutPort) Disconnect(rptName string)

Disconnect disconnects the (in-)port with name rptName, from the OutPort

func (*OutPort) Fail added in v0.2.0

func (pt *OutPort) Fail(msg interface{})

Fail fails with a message that includes the process name

func (*OutPort) Failf added in v0.2.0

func (pt *OutPort) Failf(msg string, parts ...interface{})

Failf fails with a message that includes the process name

func (*OutPort) Name added in v0.2.0

func (pt *OutPort) Name() string

Name returns the name of the OutPort

func (*OutPort) Process added in v0.2.0

func (pt *OutPort) Process() Node

Process returns the process connected to the port

func (*OutPort) Ready added in v0.2.0

func (pt *OutPort) Ready() bool

Ready tells whether the port is ready or not

func (*OutPort) Send added in v0.2.0

func (pt *OutPort) Send(data any)

Send sends a Packet to all the in-ports connected to the OutPort

func (*OutPort) SetProcess added in v0.2.0

func (pt *OutPort) SetProcess(p Node)

SetProcess sets the process of the port to p

func (*OutPort) SetReady added in v0.2.0

func (pt *OutPort) SetReady(ready bool)

SetReady sets the ready status of the OutPort

func (*OutPort) To added in v0.2.0

func (pt *OutPort) To(rpt *InPort)

To connects an InPort to the OutPort

type Packet added in v0.2.0

type Packet struct {
	// contains filtered or unexported fields
}

Packet contains foundational functionality which all IPs need to implement. It is meant to be embedded into other IP implementations.

func NewPacket added in v0.2.0

func NewPacket(data any) *Packet

NewPacket creates a new Packet

func (*Packet) AddTag added in v0.2.0

func (ip *Packet) AddTag(k string, v string)

AddTag adds the tag k with value v

func (*Packet) AddTags added in v0.2.0

func (ip *Packet) AddTags(tags map[string]string)

AddTags adds a map of tags to the IP's audit info

func (*Packet) Data added in v0.2.0

func (ip *Packet) Data() any

func (*Packet) Fail added in v0.2.0

func (ip *Packet) Fail(msg interface{})

func (*Packet) Failf added in v0.2.0

func (ip *Packet) Failf(msg string, parts ...interface{})

func (*Packet) ID added in v0.2.0

func (ip *Packet) ID() string

ID returns a globally unique ID for the IP

func (*Packet) Tag added in v0.2.0

func (ip *Packet) Tag(k string) string

Tag returns the value of the tag with key k from the IP's audit info

func (*Packet) Tags added in v0.2.0

func (ip *Packet) Tags() map[string]string

Tags returns the audit info's tags

type Sink

type Sink struct {
	BaseProcess
}

Sink is a simple component that just receives IPs on its In-port without doing anything with them. It is used to drive pipelines of processes

func NewSink

func NewSink(net *Network, name string) *Sink

NewSink returns a new Sink component

func (*Sink) From added in v0.2.0

func (p *Sink) From(outPort *OutPort)

From connects an out-port to the sink's in-port

func (*Sink) Run

func (p *Sink) Run()

Run runs the Sink process

Directories

Path Synopsis
examples
