flow

package module
v0.0.0-...-eb10bae Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2017 License: MIT Imports: 10 Imported by: 0

README

GoFlow - Dataflow and Flow-based programming library for Go (golang)

This is quite a minimalistic implementation of Flow-based programming and several other concurrent models in Go programming language that aims at designing applications as graphs of components which react to data that flows through the graph.

The main properties of the proposed model are:

  • Concurrent - graph nodes run in parallel.
  • Structural - applications are described as components, their ports and connections between them.
  • Reactive/active - system's behavior is how components react to events or how they handle their lifecycle.
  • Asynchronous/synchronous - there is no determined order in which events happen, unless you demand for such order.
  • Isolated - sharing is done by communication, state is not shared.

Getting started

Current version of the library requires a latest stable Go release. If you don't have the Go compiler installed, read the official Go install guide.

Use go tool to install the package in your packages tree:

go get github.com/trustmaster/goflow

Then you can use it in import section of your Go programs:

import "github.com/trustmaster/goflow"

Basic Example

Below there is a listing of a simple program running a network of two processes.

Greeter example diagram

This first one generates greetings for given names, the second one prints them on screen. It demonstrates how components and graphs are defined and how they are embedded into the main program.

package main

import (
	"fmt"
	"github.com/trustmaster/goflow"
)

// A component that generates greetings
type Greeter struct {
	flow.Component               // component "superclass" embedded
	Name           <-chan string // input port
	Res            chan<- string // output port
}

// Reaction to a new name input
func (g *Greeter) OnName(name string) {
	greeting := fmt.Sprintf("Hello, %s!", name)
	// send the greeting to the output port
	g.Res <- greeting
}

// A component that prints its input on screen
type Printer struct {
	flow.Component
	Line <-chan string // inport
}

// Prints a line when it gets it
func (p *Printer) OnLine(line string) {
	fmt.Println(line)
}

// Our greeting network
type GreetingApp struct {
	flow.Graph               // graph "superclass" embedded
}

// Graph constructor and structure definition
func NewGreetingApp() *GreetingApp {
	n := new(GreetingApp) // creates the object in heap
	n.InitGraphState()    // allocates memory for the graph
	// Add processes to the network
	n.Add(new(Greeter), "greeter")
	n.Add(new(Printer), "printer")
	// Connect them with a channel
	n.Connect("greeter", "Res", "printer", "Line")
	// Our net has 1 inport mapped to greeter.Name
	n.MapInPort("In", "greeter", "Name")
	return n
}

func main() {
	// Create the network
	net := NewGreetingApp()
	// We need a channel to talk to it
	in := make(chan string)
	net.SetInPort("In", in)
	// Run the net
	flow.RunNet(net)
	// Now we can send some names and see what happens
	in <- "John"
	in <- "Boris"
	in <- "Hanna"
	// Close the input to shut the network down
	close(in)
	// Wait until the app has done its job
	<-net.Wait()
}

Looks a bit heavy for such a simple task but FBP is aimed at a bit more complex things than just printing on screen. So in more complex an realistic examples the infractructure pays the price.

You probably have one question left even after reading the comments in code: why do we need to wait for the finish signal? This is because flow-based world is asynchronous and while you expect things to happen in the same sequence as they are in main(), during runtime they don't necessarily follow the same order and the application might terminate before the network has done its job. To avoid this confusion we listen for a signal on network's Wait() channel which is closed when the network finishes its job.

Terminology

Here are some Flow-based programming terms used in GoFlow:

  • Component - the basic element that processes data. Its structure consists of input and output ports and state fields. Its behavior is the set of event handlers. In OOP terms Component is a Class.
  • Connection - a link between 2 ports in the graph. In Go it is a channel of specific type.
  • Graph - components and connections between them, forming a higher level entity. Graphs may represent composite components or entire applications. In OOP terms Graph is a Class.
  • Network - is a Graph instance running in memory. In OOP terms a Network is an object of Graph class.
  • Port - is a property of a Component or Graph through which it communicates with the outer world. There are input ports (Inports) and output ports (Outports). For GoFlow components it is a channel field.
  • Process - is a Component instance running in memory. In OOP terms a Process is an object of Component class.

More terms can be found in flowbased terms and FBP wiki.

Documentation

Contents
  1. Components
    1. Ports, Events and Handlers
    2. Processes and their lifetime
    3. State
    4. Concurrency
    5. Internal state and Thread-safety
  2. Graphs
    1. Structure definition
    2. Behavior
Package docs

Documentation for the flow package can be accessed using standard godoc tool, e.g.

godoc github.com/trustmaster/goflow

More examples

  • GoChat, a simple chat in Go using this library

Here are related projects and resources:

TODO

  • Integration with NoFlo-UI/Flowhub (in progress)
  • Distributed networks via TCP/IP and UDP
  • Reflection and monitoring of networks

Documentation

Overview

The flow package is a framework for Flow-based programming in Go.

Index

Constants

View Source
const (
	// ComponentModeUndefined stands for a fallback component mode (Async).
	ComponentModeUndefined = iota
	// ComponentModeAsync stands for asynchronous functioning mode.
	ComponentModeAsync
	// ComponentModeSync stands for synchronous functioning mode.
	ComponentModeSync
	// ComponentModePool stands for async functioning with a fixed pool.
	ComponentModePool
)
View Source
const DefaultRegistryCapacity = 64

DefaultRegistryCapacity is the capacity component registry is initialized with.

Variables

ComponentRegistry is used to register components and spawn processes given just a string component name.

View Source
var DefaultBufferSize = 0

DefaultBufferSize is the default channel buffer capacity.

View Source
var DefaultComponentMode = ComponentModeAsync

DefaultComponentMode is the preselected functioning mode of all components being run.

View Source
var DefaultNetworkCapacity = 32

DefaultNetworkCapacity is the default capacity of network's processes/ports maps.

View Source
var DefaultNetworkPortsNum = 16

Default network output or input ports number

Functions

func Annotate

func Annotate(componentName string, info ComponentInfo) bool

Annotate sets component information utilized by runtimes and FBP protocol clients. Recommended fields are: Description and Icon. Other fields are infered by the runtime itself.

func Factory

func Factory(componentName string) interface{}

Factory creates a new instance of a component registered under a specific name.

func NewGraph

func NewGraph() interface{}

NewGraph creates a new canvas graph that can be modified at run-time. Implements ComponentConstructor interace, so can it be used with Factory.

func Register

func Register(componentName string, constructor ComponentConstructor) bool

Register registers a component so that it can be instantiated at run-time using component Factory. It returns true on success or false if component name is already taken.

func RegisterJSON

func RegisterJSON(componentName, filePath string) bool

RegisterJSON registers an external JSON graph definition as a component that can be instantiated at run-time using component Factory. It returns true on success or false if component name is already taken.

func RunNet

func RunNet(i interface{})

RunNet runs the network by starting all of its processes. It runs Init/Finish handlers if the network implements Initializable/Finalizable interfaces.

func RunProc

func RunProc(c interface{}) bool

RunProc runs event handling loop on component ports. It returns true on success or panics with error message and returns false on error.

func StopProc

func StopProc(c interface{}) bool

StopProc terminates the process if it is running. It doesn't close any in or out ports of the process, so it can be replaced without side effects.

func Unregister

func Unregister(componentName string) bool

Unregister removes a component with a given name from the component registry and returns true or returns false if no such component is registered.

func UpdateComponentInfo

func UpdateComponentInfo(componentName string) bool

UpdateComponentInfo extracts run-time information about a component and its ports. It is called when an FBP protocol client requests component information.

Types

type Canvas

type Canvas struct {
	Graph
}

Canvas is a generic graph that is manipulated at run-time only

type Component

type Component struct {
	// Is running flag indicates that the process is currently running.
	IsRunning bool
	// Net is a pointer to network to inform it when the process is started and over
	// or to change its structure at run time.
	Net *Graph
	// Mode is component's functioning mode.
	Mode int8
	// PoolSize is used to define pool size when using ComponentModePool.
	PoolSize uint8
	// Term chan is used to terminate the process immediately without closing
	// any channels.
	Term chan struct{}
}

Component is a generic flow component that has to be contained in concrete components. It stores network-specific information.

type ComponentConstructor

type ComponentConstructor func() interface{}

ComponentConstructor is a function that can be registered in the ComponentRegistry so that it is used when creating new processes of a specific component using Factory function at run-time.

type ComponentEntry

type ComponentEntry struct {
	// Constructor is a function that creates a component instance.
	// It is required for the factory to add components at run-time.
	Constructor ComponentConstructor
	// Run-time component description
	Info ComponentInfo
}

ComponentEntry contains runtime information about a component

type ComponentInfo

type ComponentInfo struct {
	Name        string     `json:"name"`
	Description string     `json:"description"`
	Icon        string     `json:"icon"`
	Subgraph    bool       `json:"subgraph"`
	InPorts     []PortInfo `json:"inPorts"`
	OutPorts    []PortInfo `json:"outPorts"`
}

ComponentInfo represents a component to a protocol client

type Finalizable

type Finalizable interface {
	Finish()
}

Finalizable is the interface implemented by components/graphs with extra finalization code.

type Graph

type Graph struct {

	// Net is a pointer to parent network.
	Net *Graph
	// contains filtered or unexported fields
}

Graph represents a graph of processes connected with packet channels.

func LoadJSON

func LoadJSON(filename string) *Graph

LoadJSON loads a JSON graph definition file into a flow.Graph object that can be run or used in other networks

func ParseJSON

func ParseJSON(js []byte) *Graph

ParseJSON converts a JSON network definition string into a flow.Graph object that can be run or used in other networks

func (*Graph) Add

func (n *Graph) Add(c interface{}, name string) bool

Add adds a new process with a given name to the network. It returns true on success or panics and returns false on error.

func (*Graph) AddGraph

func (n *Graph) AddGraph(name string) bool

AddGraph adds a new blank graph instance to a network. That instance can be modified then at run-time.

func (*Graph) AddIIP

func (n *Graph) AddIIP(data interface{}, processName, portName string) bool

AddIIP adds an Initial Information packet to the network

func (*Graph) AddNew

func (n *Graph) AddNew(componentName string, processName string) bool

AddNew creates a new process instance using component factory and adds it to the network.

func (*Graph) AnnotateInPort

func (n *Graph) AnnotateInPort(name string, info PortInfo) bool

AnnotateInPort sets optional run-time annotation for the port utilized by runtimes and FBP protocol clients.

func (*Graph) AnnotateOutPort

func (n *Graph) AnnotateOutPort(name string, info PortInfo) bool

AnnotateOutPort sets optional run-time annotation for the port utilized by runtimes and FBP protocol clients.

func (*Graph) Connect

func (n *Graph) Connect(senderName, senderPort, receiverName, receiverPort string) bool

Connect connects a sender to a receiver and creates a channel between them using DefaultBufferSize. Normally such a connection is unbuffered but you can change by setting flow.DefaultBufferSize > 0 or by using ConnectBuf() function instead. It returns true on success or panics and returns false if error occurs.

func (*Graph) ConnectBuf

func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort string, bufferSize int) bool

Connect connects a sender to a receiver using a channel with a buffer of a given size. It returns true on success or panics and returns false if error occurs.

func (*Graph) DecSendChanRefCount

func (n *Graph) DecSendChanRefCount(c reflect.Value) bool

Decrements SendChanRefCount It returns true if the RefCount has reached 0

func (*Graph) Disconnect

func (n *Graph) Disconnect(senderName, senderPort, receiverName, receiverPort string) bool

Disconnect removes a connection between sender's outport and receiver's inport.

func (*Graph) Get

func (n *Graph) Get(processName string) interface{}

Get returns a node contained in the network by its name.

func (*Graph) IncSendChanRefCount

func (n *Graph) IncSendChanRefCount(c reflect.Value)

Increments SendChanRefCount

func (*Graph) InitGraphState

func (n *Graph) InitGraphState()

InitGraphState method initializes graph fields and allocates memory.

func (*Graph) MapInPort

func (n *Graph) MapInPort(name, procName, procPort string) bool

MapInPort adds an inport to the net and maps it to a contained proc's port. It returns true on success or panics and returns false on error.

func (*Graph) MapOutPort

func (n *Graph) MapOutPort(name, procName, procPort string) bool

MapOutPort adds an outport to the net and maps it to a contained proc's port. It returns true on success or panics and returns false on error.

func (*Graph) Ready

func (n *Graph) Ready() <-chan struct{}

Ready returns a channel that can be used to suspend the caller goroutine until the network is ready to accept input packets

func (*Graph) Remove

func (n *Graph) Remove(processName string) bool

Remove deletes a process from the graph. First it stops the process if running. Then it disconnects it from other processes and removes the connections from the graph. Then it drops the process itself.

func (*Graph) RemoveIIP

func (n *Graph) RemoveIIP(processName, portName string) bool

RemoveIIP detaches an IIP from specific process and port

func (*Graph) Rename

func (n *Graph) Rename(processName, newName string) bool

Rename changes a process name in all connections, external ports, IIPs and the graph itself.

func (*Graph) RenameInPort

func (n *Graph) RenameInPort(oldName, newName string) bool

RenameInPort changes graph's inport name

func (*Graph) RenameOutPort

func (n *Graph) RenameOutPort(oldName, newName string) bool

RenameOutPort changes graph's outport name

func (*Graph) RunProc

func (n *Graph) RunProc(procName string) bool

RunProc starts a proc added to a net at run time

func (*Graph) SetInPort

func (n *Graph) SetInPort(name string, channel interface{}) bool

SetInPort assigns a channel to a network's inport to talk to the outer world. It returns true on success or false if the inport cannot be set.

func (*Graph) SetOutPort

func (n *Graph) SetOutPort(name string, channel interface{}) bool

SetOutPort assigns a channel to a network's outport to talk to the outer world. It returns true on success or false if the outport cannot be set.

func (*Graph) Stop

func (n *Graph) Stop()

Stop terminates the network without closing any connections

func (*Graph) StopProc

func (n *Graph) StopProc(procName string) bool

StopProc stops a specific process in the net

func (*Graph) UnmapInPort

func (n *Graph) UnmapInPort(name string) bool

UnmapInPort removes an existing inport mapping

func (*Graph) UnmapOutPort

func (n *Graph) UnmapOutPort(name string) bool

UnmapOutPort removes an existing outport mapping

func (*Graph) UnsetInPort

func (n *Graph) UnsetInPort(name string) bool

UnsetInPort removes an external inport from the graph

func (*Graph) UnsetOutPort

func (n *Graph) UnsetOutPort(name string) bool

UnsetOutPort removes an external outport from the graph

func (*Graph) Wait

func (n *Graph) Wait() <-chan struct{}

Wait returns a channel that can be used to suspend the caller goroutine until the network finishes its job

type Initializable

type Initializable interface {
	Init()
}

Initalizable is the interface implemented by components/graphs with custom initialization code.

type Looper

type Looper interface {
	Loop()
}

Looper is a long-running process which actively receives data from its ports using a Loop function

type Message

type Message struct {
	// Protocol is NoFlo protocol identifier:
	// "runtime", "component", "graph" or "network"
	Protocol string `json:"protocol"`
	// Command is a command to be executed within the protocol
	Command string `json:"command"`
	// Payload is JSON-encoded body of the message
	Payload interface{} `json:"payload"`
}

Message represents a single FBP protocol message

type PortInfo

type PortInfo struct {
	Id          string        `json:"id"`
	Type        string        `json:"type"`
	Description string        `json:"description"`
	Addressable bool          `json:"addressable"` // ignored
	Required    bool          `json:"required"`
	Values      []interface{} `json:"values"`  // ignored
	Default     interface{}   `json:"default"` // ignored
}

PortInfo represents a port to a runtime client

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime is a NoFlo-compatible runtime implementing the FBP protocol

func (*Runtime) Handle

func (r *Runtime) Handle(w http.ResponseWriter, req *http.Request)

func (*Runtime) Id

func (r *Runtime) Id() string

Id returns runtime's UUID v4

func (*Runtime) Init

func (r *Runtime) Init(name string)

Register command handlers

func (*Runtime) Listen

func (r *Runtime) Listen(address string)

func (*Runtime) Ready

func (r *Runtime) Ready() chan struct{}

Ready returns a channel which is closed when the runtime is ready to work

func (*Runtime) Stop

func (r *Runtime) Stop()

Stop tells the runtime to shut down

type Shutdowner

type Shutdowner interface {
	Shutdown()
}

Shutdowner is the interface implemented by components overriding default Shutdown() behavior.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL