Documentation
¶
Overview ¶
Package fgbase layers a ready-send flow mechanism on top of goroutines. https://github.com/vectaport/fgbase/wiki
Index ¶
- Constants
- Variables
- func AddFire(n *Node) error
- func ArrayFire(n *Node) error
- func ConfigByFlag(defaults map[string]interface{})
- func ConstFire(n *Node) error
- func CopySlice(d interface{}) interface{}
- func DivFire(n *Node) error
- func EqualsTest(n *Node, a, b interface{}) bool
- func Index(d interface{}, i int) interface{}
- func Int(a interface{}) int
- func IsEOF(v interface{}) (eof bool)
- func IsInt(d interface{}) bool
- func IsNada(d interface{}) bool
- func IsPtr(d interface{}) bool
- func IsSlice(d interface{}) bool
- func IsStruct(d interface{}) bool
- func IsZero(d interface{}) bool
- func Len(d interface{}) int
- func MakeGraph(sze, szn int) ([]Edge, []Node)
- func ModFire(n *Node) error
- func MulFire(n *Node) error
- func NameEdges(edges []Edge, names []string)
- func OutputDot(nodes []*Node)
- func OutputGml(nodes []*Node)
- func ParseDatum(s string) interface{}
- func Promote(n *Node, a, b interface{}) (aBig, bBig interface{}, same bool)
- func RdyFire(n *Node) error
- func ResolvePtr(d interface{}) interface{}
- func RunAll(nodes []Node)
- func RunGraph(nodes []*Node)
- func SinkFire(n *Node) error
- func SteercFire(n *Node) error
- func SteercRdy(n *Node) bool
- func SteervFire(n *Node) error
- func SteervRdy(n *Node) bool
- func String(d interface{}) string
- func StringSlice(d interface{}) string
- func StringStruct(d interface{}) string
- func StringsToMap(strings []string) map[string]int
- func SubFire(n *Node) error
- func TimeSinceStart() float64
- func ZeroTest(a interface{}) bool
- type Edge
- func (e *Edge) CloseData()
- func (e *Edge) Const(d interface{})
- func (e *Edge) Disconnect(n *Node)
- func (e *Edge) DotAttrs() []string
- func (e *Edge) DstCnt() int
- func (e *Edge) DstJSON(n *Node, portString string)
- func (e *Edge) DstNode(i int) *Node
- func (e *Edge) DstOrder(n *Node) int
- func (e *Edge) DstPut(v interface{})
- func (e *Edge) DstRdy(n *Node) bool
- func (e *Edge) Dump()
- func (e *Edge) DumpEdgeNodes()
- func (e *Edge) IsConst() bool
- func (e *Edge) IsSink() bool
- func (e *Edge) PoolEdge(src *Edge) *Edge
- func (e *Edge) RdyZero() bool
- func (e *Edge) Same(e2 *Edge) bool
- func (e *Edge) SendAck(n *Node) bool
- func (e *Edge) SendData(n *Node) bool
- func (e *Edge) SetDotAttrs(attrs []string)
- func (e *Edge) SetName(name string)
- func (e *Edge) Sink()
- func (e *Edge) SrcCnt() int
- func (e *Edge) SrcGet() interface{}
- func (e *Edge) SrcJSON(n *Node, portString string)
- func (e *Edge) SrcNode(i int) *Node
- func (e *Edge) SrcRdy(n *Node) bool
- type Error
- type Inter
- type Node
- func FuncAdd(a, b, x Edge) Node
- func FuncArbit(a, b, x Edge) Node
- func FuncArray(x Edge, arr []interface{}) Node
- func FuncCSVI(x []Edge, r io.Reader, enums map[string]interface{}) Node
- func FuncCSVO(a []Edge, r io.Reader, enums map[string]interface{}) Node
- func FuncConst(x Edge, c interface{}) Node
- func FuncDecrypt(a Edge, x Edge, localPrivateKey, remotePublicKey *[32]byte, nonce [24]byte) Node
- func FuncDiv(a, b, x Edge) Node
- func FuncDst(a Edge, rw io.ReadWriter) Node
- func FuncEither(a, b, x Edge) Node
- func FuncEncrypt(a Edge, x Edge, localPrivateKey, remotePublicKey *[32]byte, nonce [24]byte) Node
- func FuncFork(a, x, y Edge) Node
- func FuncLsh(a, b, x Edge) Node
- func FuncMod(a, b, x Edge) Node
- func FuncMul(a, b, x Edge) Node
- func FuncPass(a, x Edge) Node
- func FuncPrint(a Edge, x Edge, format string) Node
- func FuncRdy(a, b, x Edge) Node
- func FuncRead(x Edge, r io.Reader) Node
- func FuncReduce(a, x Edge, reducer func(n *Node, datum, collection interface{}) interface{}, ...) Node
- func FuncRsh(a, b, x Edge) Node
- func FuncSink(a Edge) Node
- func FuncSrc(x Edge, rw io.ReadWriter) Node
- func FuncSteerc(a, x, y Edge) Node
- func FuncSteerv(a, b, x, y Edge) Node
- func FuncSub(a, b, x Edge) Node
- func FuncWrite(a Edge, w io.Writer) Node
- func MakeNode(name string, srcs, dsts []*Edge, ready NodeRdy, fire NodeFire) Node
- func MakeNodes(sz int) []Node
- func (n *Node) AckWrap(d interface{}, ack chan struct{}) interface{}
- func (n *Node) DefaultRdyFunc() bool
- func (n *Node) DefaultRunFunc() error
- func (n *Node) DotAttr() string
- func (n *Node) Dst(i int) *Edge
- func (n *Node) DstAppend(e *Edge)
- func (n *Node) DstByName(name string) *Edge
- func (n *Node) DstCnt() int
- func (n *Node) DstNames() []string
- func (n *Node) DstSet(i int, e *Edge)
- func (n *Node) FindDst(name string) (*Edge, bool)
- func (n *Node) FindDstIndex(name string) (int, bool)
- func (n *Node) FindSrc(name string) (*Edge, bool)
- func (n *Node) FindSrcIndex(name string) (int, bool)
- func (n *Node) Fire() error
- func (n *Node) Init()
- func (n *Node) IsPool() bool
- func (n *Node) Link(in, ex *Edge)
- func (n *Node) LogError(format string, v ...interface{})
- func (n *Node) Panicf(format string, v ...interface{})
- func (n *Node) RdyAll() (rdy bool)
- func (n *Node) Recursed() bool
- func (n *Node) RecvOne() (recvOK bool)
- func (n *Node) RemoveInputCase(e *Edge)
- func (n *Node) Run() error
- func (n *Node) SendAll() bool
- func (n *Node) SetDotAttr(attr string)
- func (n *Node) SetDstNames(name ...string)
- func (n *Node) SetDstNum(num int)
- func (n *Node) SetSrcNames(name ...string)
- func (n *Node) SetSrcNum(num int)
- func (n *Node) Src(i int) *Edge
- func (n *Node) SrcAppend(e *Edge)
- func (n *Node) SrcByName(name string) *Edge
- func (n *Node) SrcCnt() int
- func (n *Node) SrcNames() []string
- func (n *Node) SrcSet(i int, e *Edge)
- func (n *Node) String() string
- func (n *Node) TraceValRdy()
- func (n *Node) TraceVals()
- func (n *Node) Tracef(format string, v ...interface{})
- type NodeFire
- type NodeRdy
- type NodeRun
- type Pool
- type RecursiveSort
- type SinkStats
- type Sinker
- type TraceLevelType
- type ZeroTester
Constants ¶
const ( Old int = iota New )
const EOF = Error("EOF")
Variables ¶
var ChannelSize = 1
Buffer size for every channel.
var DotOutput = false
DotOutput is Graphviz .dot output
var EdgeID int64
EdgeID is a unique edge id.
var GlobalStats = false
GlobalStats will enable global flowgraph stats for uncommon debug-purposes. Otherwise it should be left false because it requires a global mutex.
var GmlOutput = false
GmlOutput is gml output
var NodeID int64
NodeID is a unique node id.
var RunTime time.Duration = -1
RunTime is the duration to run this flowgraph.
var StartTime time.Time
When the flowgraph started running.
var StderrLog = log.New(os.Stderr, "", 0)
Log for collecting error messages.
var StdoutLog = log.New(os.Stdout, "", 0)
Log for tracing flowgraph execution.
var TraceFireCnt = true
TraceFireCnt enables tracing of the number of node executions.
var TraceIndent = false
TraceIndent is trace by Node id tabs.
var TraceLevel = Q
TraceLevel enables tracing, writes to StdoutLog if TraceLevel>Q.
var TraceLevels = map[string]TraceLevelType{ "QQ": QQ, "Q": Q, "V": V, "VV": VV, "VVV": VVV, "VVVV": VVVV, }
Map from string to enum for trace flag checking.
var TracePointer = false
Trace Node pointer.
var TracePorts = false
TracePorts by name using dot notation
var TraceSeconds = false
TraceSeconds that have elapsed.
var TraceStyle = Old
TraceStyle is Old or New
var TraceTypes = false
TraceTypes in full detail, including common types.
Functions ¶
func ConfigByFlag ¶
func ConfigByFlag(defaults map[string]interface{})
ConfigByFlag initializes a standard set of command line arguments for flowgraph utilities, while at the same time parsing all other flags. Use the defaults argument to override default settings for ncore, chanz, sec, trace, trsec, trtyp, trport, summ, dot, and gml. Use -help to see the standard set.
func CopySlice ¶
func CopySlice(d interface{}) interface{}
CopySlice returns a copy of a slice from an empty interface (as an empty interface).
func EqualsTest ¶
EqualsTest returns true if two empty interfaces have the same numeric or string value.
func Index ¶
func Index(d interface{}, i int) interface{}
Index returns the nth element of an empty interface (interface{}) that is a slice.
func IsInt ¶
func IsInt(d interface{}) bool
IsInt returns true if empty interface (interface{}) is an int.
func IsNada ¶
func IsNada(d interface{}) bool
IsNada tests if an interface{} (an empty interface) is a struct{} (an empty struct).
func IsPtr ¶
func IsPtr(d interface{}) bool
IsPtr returns true if empty interface (interface{}) is a pointer
func IsSlice ¶
func IsSlice(d interface{}) bool
IsSlice returns true if empty interface (interface{}) is a slice.
func IsStruct ¶
func IsStruct(d interface{}) bool
IsStruct returns true if empty interface (interface{}) is a struct.
func IsZero ¶
func IsZero(d interface{}) bool
IsZero returns true if an interface{} has a golang zero value.
func Len ¶
func Len(d interface{}) int
Len returns the length of an empty interface (interface{}) if it is a slice.
func OutputGml ¶
func OutputGml(nodes []*Node)
OutputGml outputs .gml graph modeling language format
func ParseDatum ¶
func ParseDatum(s string) interface{}
ParseDatum parses a string for numeric constants, otherwise returns the string.
func ResolvePtr ¶
func ResolvePtr(d interface{}) interface{}
ResolvePtr resolves an empty interface which is a pointer
func RunAll ¶
func RunAll(nodes []Node)
RunAll calls Run for each Node, and times out after RunTime.
func RunGraph ¶
func RunGraph(nodes []*Node)
RunGraph calls Run for each *Node, and times out after RunTime.
func String ¶
func String(d interface{}) string
String returns a string representation of an interface{} with ellipsis-shortened slices if TraceLevel<VVVV.
func StringSlice ¶
func StringSlice(d interface{}) string
StringSlice returns a string representation of a slice, ellipsis-shortened if TraceLevel<VVVV.
func StringStruct ¶
func StringStruct(d interface{}) string
StringStruct returns a string representation of a struct with ellipsis-shortened slices if TraceLevel<VVVV.
func StringsToMap ¶
StringsToMap converts []string into map[string]int.
func TimeSinceStart ¶
func TimeSinceStart() float64
TimeSinceStart returns time since start of running flowgraph.
Types ¶
type Edge ¶
type Edge struct {
// values shared by upstream and downstream Nodes
Name string // for trace
Data *[]chan interface{} // slice of data channels
Ack chan struct{} // request (or acknowledge) channel
// values unique to upstream and downstream Nodes
Val interface{} // generic empty interface
RdyCnt int // readiness of I/O
Flow bool // set true to allow one output, data or ack
Ack2 chan struct{} // alternate channel for ack steering
// contains filtered or unexported fields
}
Edge of a flowgraph.
func (*Edge) Const ¶
func (e *Edge) Const(d interface{})
Const sets up an Edge to provide a constant value.
func (*Edge) DotAttrs ¶
DotAttrs returns the attribute strings used for outputting this edge in dot format
func (*Edge) DstPut ¶
func (e *Edge) DstPut(v interface{})
DstPut sets the empty interface value flowing to the output Edge
func (*Edge) DumpEdgeNodes ¶
func (e *Edge) DumpEdgeNodes()
DumpEdgeNodes dumps all the edgeNodes for this Edge
func (*Edge) SetDotAttrs ¶
SetDotAttrs sets the attribute strings used for outputting this edge in dot format. If more than one, they are spread across dot edges.
func (*Edge) SrcGet ¶
func (e *Edge) SrcGet() interface{}
SrcGet returns the empty interface value flowing from the input Edge
type Node ¶
type Node struct {
ID int64 // unique id
Name string // for tracing
Cnt int64 // execution count
Srcs []*Edge // upstream Edge's
Dsts []*Edge // downstream Edge's
RdyFunc NodeRdy // func to test Edge readiness
FireFunc NodeFire // func to fire off the Node
RunFunc NodeRun // func to repeatedly run Node
Aux interface{} // auxiliary empty interface to hold state
RdyState int // state of latest readiness
Owner interface{} // owner of this node
// contains filtered or unexported fields
}
Node of a flowgraph.
func FuncCSVI ¶
FuncCSVI reads a vector of input data values from a Reader and outputs them downstream. enums is an optional map from field.enum to an empty interface.
func FuncCSVO ¶
FuncCSVO reads a vector of expected data values from a Reader and tests them against input from upstream. enums is an optional map from field.enum to an empty interface.
func FuncConst ¶
FuncConst produces a constant value (x = c). Can also be done with an Edge made const.
func FuncDecrypt ¶
FuncDecrypt decrypts a buffer of byte data
func FuncDst ¶
func FuncDst(a Edge, rw io.ReadWriter) Node
FuncDst writes data and waits for an acknowledging '\n'.
func FuncEither ¶
FuncEither passes on one of two values (if =a { x = a } else { x = b }).
func FuncEncrypt ¶
FuncEncrypt encrypts a buffer of byte data
func FuncFork ¶
FuncFork sends a value two ways (x = a; y = a). If the value is a slice it is duplicated onto the second output.
func FuncReduce ¶
func FuncReduce(a, x Edge, reducer func(n *Node, datum, collection interface{}) interface{}, freerun bool) Node
FuncReduce reduces a stream of data into a single empty interface.
func FuncSrc ¶
func FuncSrc(x Edge, rw io.ReadWriter) Node
FuncSrc reads a data value and writes a '\n' acknowledgement.
func FuncSteerc ¶
FuncSteerc steers a condition one of two ways (if a==0 { x = a } else { y = a }).
func FuncSteerv ¶
FuncSteerv steers the second value by the first (if a==0 { x = b } else { y = b }).
func MakeNode ¶
MakeNode returns a new Node with slices of input and output Edge's and functions for testing readiness then firing.
func (*Node) AckWrap ¶
func (n *Node) AckWrap(d interface{}, ack chan struct{}) interface{}
AckWrap bundles a Node pointer and an ack channel with an empty interface, in order to pass information about an upstream node downstream. Used for acking back in a Pool.
func (*Node) DefaultRdyFunc ¶
DefaultRdyFunc tests for everything ready.
func (*Node) DefaultRunFunc ¶
DefaultRunFunc is the default run func.
func (*Node) DotAttr ¶
DotAttr returns the attribute string used for outputting this node in dot format
func (*Node) FindDstIndex ¶
FindDstIndex returns index of outgoing edge by name
func (*Node) FindSrcIndex ¶
FindSrcIndex returns index of incoming edge by name
func (*Node) Init ¶
func (n *Node) Init()
Init initializes node internals after edges have been added
func (*Node) Recursed ¶
Recursed returns true if a Node from the same Pool is upstream of this Node.
func (*Node) RemoveInputCase ¶
RemoveInputCase removes the input of a Node from the select switch. It is restored after RdyAll.
func (*Node) SetDotAttr ¶
SetDotAttr sets the attribute string used for outputting this node in dot format
func (*Node) SetDstNames ¶
SetDstNames names the outgoing edges
func (*Node) SetSrcNames ¶
SetSrcNames names the incoming edges
func (*Node) TraceValRdy ¶
func (n *Node) TraceValRdy()
TraceValRdy lists Node input values and output readiness when TraceLevel >= VVV.
type NodeFire ¶
NodeFire is the function signature for executing a Node. Any error message should be written using Node.LogError and nil written to any output Edge.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool of Node's
func MakePool ¶
func MakePool( size int, name string, srcs, dsts []Edge, ready NodeRdy, fire NodeFire, recurse, spread bool) *Pool
MakePool returns a Pool of Nodes that share both data channels and the source ack channel.
type RecursiveSort ¶
type RecursiveSort interface {
sort.Interface
// SubSlice returns a sub-slice.
SubSlice(n, m int) interface{}
// Slice returns current slice.
Slice() []int
// Original returns original slice
Original() []int
// Depth returns the depth of a recursive sort
Depth() int64
// ID returns a unique ID for the original slice.
ID() int64
}
RecursiveSort extends sort.Interface for recursive sorting.
type Sinker ¶
type Sinker interface {
Sink(source []interface{})
}
Sinker consumes wavefronts of values one at a time forever
type TraceLevelType ¶
type TraceLevelType int
Trace level constants.
const (
	QQ   TraceLevelType = iota // ultra-quiet for minimal stats
	Q                          // quiet, default
	V                          // trace Node execution
	VV                         // trace channel IO
	VVV                        // trace state before select
	VVVV                       // full-length array dumps
)
func (TraceLevelType) String ¶
func (t TraceLevelType) String() string
String method for TraceLevelType
type ZeroTester ¶
type ZeroTester interface {
ZeroTest() bool
}
ZeroTester tests if an empty interface represents zero
Source Files
¶
- datum.go
- edge.go
- fgbase.go
- func_add.go
- func_arbit.go
- func_array.go
- func_const.go
- func_csvi.go
- func_csvo.go
- func_decrypt.go
- func_div.go
- func_dst.go
- func_either.go
- func_encrypt.go
- func_fork.go
- func_lsh.go
- func_map.go
- func_mod.go
- func_mul.go
- func_pass.go
- func_print.go
- func_qsort.go
- func_rdy.go
- func_read.go
- func_reduce.go
- func_rsh.go
- func_sink.go
- func_src.go
- func_steerc.go
- func_steerv.go
- func_sub.go
- func_write.go
- node.go
- pool.go
- promote.go
- sort.go
Directories
¶
| Path | Synopsis |
|---|---|
| imglab | Package imglab extends the fgbase package with imaging primitives. |
| regexp | Package regexp builds regular expressions out of scalable flowgraphs |
| weblab | Package weblab extends the fgbase package with web and http primitives. |