fmesh

package module
v1.3.0 Latest
Published: Sep 29, 2025 License: MIT Imports: 9 Imported by: 12

README

f-mesh

Flow Based Programming inspired framework in Go

Learn more about FBP (originally discovered by @jpaulm) or read the documentation

What is it?

F-Mesh is a function orchestrator inspired by FBP. It lets you express a program as a mesh of interconnected components (more formally, a computational graph).

Main concepts:

  • F-Mesh consists of Components, the main building blocks
  • A component can have any number of input and output Ports
  • Ports can be connected via Pipes
  • Ports and pipes are type-agnostic: any data can be transferred to any port in the form of Signals
  • The framework works in discrete time, not wall time. The quantum of time is one activation cycle, which gives you "logical parallelism" out of the box (activation functions run in "frozen time")
  • Learn more in documentation
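
The "frozen time" idea from the list above can be sketched with a toy scheduler. This is only an illustration built on the standard library, not F-Mesh's implementation: the node type, runCycles, and demo are hypothetical names, and the two-phase loop is an assumption about what an activation cycle means. Within one cycle every node with pending input activates, and outputs are delivered only after all activations have finished.

```go
package main

import (
	"fmt"
	"strings"
)

// node is a hypothetical stand-in for a component with a single input
// buffer. It is not the library's component.Component type.
type node struct {
	inbox    []string                   // signals waiting to be processed
	output   []string                   // signals emitted when there is no downstream node
	next     *node                      // downstream node (stand-in for a pipe), may be nil
	activate func(in []string) []string // stand-in for an activation function
}

// runCycles runs activation cycles until no node has pending input and
// returns the number of cycles executed.
func runCycles(nodes []*node) int {
	type result struct {
		from *node
		out  []string
	}
	cycles := 0
	for {
		// Phase 1: activate every node that has input. Outputs are held back,
		// so no node observes signals produced during the same cycle.
		var results []result
		for _, n := range nodes {
			if len(n.inbox) == 0 {
				continue
			}
			in := n.inbox
			n.inbox = nil
			results = append(results, result{from: n, out: n.activate(in)})
		}
		if len(results) == 0 {
			return cycles
		}
		cycles++
		// Phase 2: deliver outputs; they become visible in the next cycle.
		for _, r := range results {
			if r.from.next != nil {
				r.from.next.inbox = append(r.from.next.inbox, r.out...)
			} else {
				r.from.output = append(r.from.output, r.out...)
			}
		}
	}
}

// demo wires a "concat" node into a "case" node and runs the toy mesh.
func demo() (string, int) {
	upper := &node{activate: func(in []string) []string {
		return []string{strings.ToUpper(strings.Join(in, ""))}
	}}
	concat := &node{next: upper, activate: func(in []string) []string {
		return []string{strings.Join(in, "")}
	}}
	concat.inbox = []string{"hello ", "world !"}
	// The downstream node is listed first on purpose: the order inside the
	// slice does not change the result, because delivery is deferred.
	cycles := runCycles([]*node{upper, concat})
	return strings.Join(upper.output, ""), cycles
}

func main() {
	out, cycles := demo()
	fmt.Println(out, cycles) // HELLO WORLD ! 2
}
```

Because delivery happens between cycles, the number of cycles depends on the depth of the graph (two here), not on the order in which nodes are visited.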

Limitations

F-Mesh is not a classical FBP implementation. It is not suited for long-running components or wall-time events (such as timers and tickers).

Example:

fm := fmesh.New("hello world").
	WithComponents(
		component.New("concat").
			WithInputs("i1", "i2").
			WithOutputs("res").
			WithActivationFunc(func(this *component.Component) error {
				word1 := this.InputByName("i1").FirstSignalPayloadOrDefault("").(string)
				word2 := this.InputByName("i2").FirstSignalPayloadOrDefault("").(string)
				this.OutputByName("res").PutSignals(signal.New(word1 + word2))
				return nil
			}),
		component.New("case").
			WithInputs("i1").
			WithOutputs("res").
			WithActivationFunc(func(this *component.Component) error {
				inputString := this.InputByName("i1").FirstSignalPayloadOrDefault("").(string)
				this.OutputByName("res").PutSignals(signal.New(strings.ToTitle(inputString)))
				return nil
			}))

fm.ComponentByName("concat").OutputByName("res").PipeTo(fm.ComponentByName("case").InputByName("i1"))

// Init inputs
fm.ComponentByName("concat").InputByName("i1").PutSignals(signal.New("hello "))
fm.ComponentByName("concat").InputByName("i2").PutSignals(signal.New("world !"))

// Run the mesh
_, err := fm.Run()

// Check for errors
if err != nil {
	fmt.Println("F-Mesh returned an error:", err)
	os.Exit(1)
}

// Extract results
results := fm.ComponentByName("case").OutputByName("res").FirstSignalPayloadOrNil()
fmt.Printf("Result is : %v\n", results) // Result is : HELLO WORLD !

See more in examples repo.

Documentation

Constants

const (
	// UnlimitedCycles defines the maximum number of activation cycles, 0 means no limit
	UnlimitedCycles = 0
	// UnlimitedTime defines the maximum duration F-Mesh can run before being forcefully stopped, 0 means no limit
	UnlimitedTime = 0
)

Variables

var (
	// ErrHitAnErrorOrPanic is returned when f-mesh hit an error or panic and will be stopped
	ErrHitAnErrorOrPanic = errors.New("f-mesh hit an error or panic and will be stopped")
	// ErrHitAPanic is returned when f-mesh hit a panic and will be stopped
	ErrHitAPanic = errors.New("f-mesh hit a panic and will be stopped")
	// ErrUnsupportedErrorHandlingStrategy is returned when an unsupported error handling strategy is used
	ErrUnsupportedErrorHandlingStrategy = errors.New("unsupported error handling strategy")
	// ErrReachedMaxAllowedCycles is returned when the maximum number of allowed cycles is reached
	ErrReachedMaxAllowedCycles = errors.New("reached max allowed cycles")
	// ErrTimeLimitExceeded is returned when the time limit is exceeded
	ErrTimeLimitExceeded = errors.New("time limit exceeded")

	// ErrFailedToDrain is returned when failed to drain
	ErrFailedToDrain = errors.New("failed to drain")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// ErrorHandlingStrategy defines how f-mesh will handle errors and panics
	ErrorHandlingStrategy ErrorHandlingStrategy

	// Debug enables debug mode, which logs additional detailed information for troubleshooting and analysis.
	Debug bool

	Logger *log.Logger

	// CyclesLimit defines max number of activation cycles, 0 means no limit
	CyclesLimit int

	// TimeLimit defines the maximum duration F-Mesh can run before being forcefully stopped.
	// A value of 0 disables the time constraint, allowing indefinite execution.
	TimeLimit time.Duration
}

Config defines the configuration for the f-mesh
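
The "0 means no limit" convention for CyclesLimit and TimeLimit can be illustrated with a small standalone model. This is a sketch under stated assumptions, not the library's code: the limits type, check, and classify are hypothetical, the lowercase error values are local stand-ins for ErrReachedMaxAllowedCycles and ErrTimeLimitExceeded, and both the ">=" boundary and the use of error wrapping are guesses made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins for the package's sentinel errors (hypothetical copies).
var (
	errReachedMaxAllowedCycles = errors.New("reached max allowed cycles")
	errTimeLimitExceeded       = errors.New("time limit exceeded")
)

// limits mirrors the two limit fields of Config for this sketch only.
type limits struct {
	CyclesLimit int           // 0 means no limit
	TimeLimit   time.Duration // 0 means no limit
}

// check returns a wrapped sentinel error when a limit is hit, or nil.
// The ">=" comparison is an assumption; the library's boundary may differ.
func (l limits) check(cycles int, elapsed time.Duration) error {
	if l.CyclesLimit != 0 && cycles >= l.CyclesLimit {
		return fmt.Errorf("after %d cycles: %w", cycles, errReachedMaxAllowedCycles)
	}
	if l.TimeLimit != 0 && elapsed >= l.TimeLimit {
		return fmt.Errorf("after %s: %w", elapsed, errTimeLimitExceeded)
	}
	return nil
}

// classify shows how a caller can tell the two limits apart with errors.Is.
func classify(err error) string {
	switch {
	case err == nil:
		return "no limit hit"
	case errors.Is(err, errReachedMaxAllowedCycles):
		return "cycles limit hit"
	case errors.Is(err, errTimeLimitExceeded):
		return "time limit hit"
	default:
		return "other error"
	}
}

func main() {
	// The zero value disables both limits.
	fmt.Println(classify(limits{}.check(1_000_000, time.Hour)))
	fmt.Println(classify(limits{CyclesLimit: 10}.check(10, time.Second)))
	fmt.Println(classify(limits{TimeLimit: time.Second}.check(3, 2*time.Second)))
}
```

Whether the errors returned by Run wrap the exported sentinel values in a way that errors.Is can match is not stated on this page, so that should be verified before relying on the pattern.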

type ErrorHandlingStrategy

type ErrorHandlingStrategy int

ErrorHandlingStrategy defines the strategy for handling errors in run-time

const (
	// StopOnFirstErrorOrPanic stops the f-mesh on first error or panic
	StopOnFirstErrorOrPanic ErrorHandlingStrategy = iota

	// StopOnFirstPanic ignores errors, but stops the f-mesh on first panic
	StopOnFirstPanic

	// IgnoreAll allows to continue running the f-mesh regardless of how components finish their activation functions
	IgnoreAll
)
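
The three strategies can be read as a decision table over how an activation function finished. The sketch below restates the comments above as code. It is a standalone model with hypothetical names (strategy, outcome, shouldStop) and a local stand-in for ErrUnsupportedErrorHandlingStrategy, not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// strategy is a local stand-in for ErrorHandlingStrategy.
type strategy int

const (
	stopOnFirstErrorOrPanic strategy = iota
	stopOnFirstPanic
	ignoreAll
)

// outcome is a hypothetical description of how an activation function ended.
type outcome int

const (
	finishedOK outcome = iota
	returnedError
	panicked
)

// errUnsupported is a local stand-in for ErrUnsupportedErrorHandlingStrategy.
var errUnsupported = errors.New("unsupported error handling strategy")

// shouldStop reports whether the mesh should stop after a component
// finished with the given outcome under the given strategy.
func shouldStop(s strategy, o outcome) (bool, error) {
	switch s {
	case stopOnFirstErrorOrPanic:
		return o == returnedError || o == panicked, nil
	case stopOnFirstPanic:
		// Errors are ignored, only panics stop the mesh.
		return o == panicked, nil
	case ignoreAll:
		return false, nil
	default:
		return false, errUnsupported
	}
}

func main() {
	stop, _ := shouldStop(stopOnFirstPanic, returnedError)
	fmt.Println(stop) // false
	stop, _ = shouldStop(stopOnFirstPanic, panicked)
	fmt.Println(stop) // true
	_, err := shouldStop(strategy(42), finishedOK)
	fmt.Println(err) // unsupported error handling strategy
}
```

Since StopOnFirstErrorOrPanic is the first iota value, it is also the zero value of the type, so a Config that leaves ErrorHandlingStrategy unset selects it.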

type FMesh

type FMesh struct {
	common.NamedEntity
	common.DescribedEntity
	*common.Chainable
	// contains filtered or unexported fields
}

FMesh is the functional mesh

func New

func New(name string) *FMesh

New creates a new f-mesh with default config

func NewWithConfig

func NewWithConfig(name string, config *Config) *FMesh

NewWithConfig creates a new f-mesh with custom config

func (*FMesh) ComponentByName

func (fm *FMesh) ComponentByName(name string) *component.Component

ComponentByName is a shortcut method that returns the component with the given name

func (*FMesh) Components

func (fm *FMesh) Components() *component.Collection

Components returns the collection of components in the f-mesh

func (*FMesh) IsDebug

func (fm *FMesh) IsDebug() bool

IsDebug returns true when debug mode is enabled

func (*FMesh) LogDebug

func (fm *FMesh) LogDebug(v ...any)

LogDebug logs a debug message only when debug mode is enabled (no-op otherwise)

func (*FMesh) Logger

func (fm *FMesh) Logger() *log.Logger

Logger returns the logger used by the f-mesh

func (*FMesh) Run

func (fm *FMesh) Run() (*RuntimeInfo, error)

Run starts the computation and keeps running activation cycles until no component activates (that is, the mesh has no unprocessed inputs)

func (*FMesh) WithComponents

func (fm *FMesh) WithComponents(components ...*component.Component) *FMesh

WithComponents adds components to f-mesh

func (*FMesh) WithDescription

func (fm *FMesh) WithDescription(description string) *FMesh

WithDescription sets a description

func (*FMesh) WithErr

func (fm *FMesh) WithErr(err error) *FMesh

WithErr returns f-mesh with error

type RuntimeInfo

type RuntimeInfo struct {
	Cycles    *cycle.Group
	StartedAt time.Time
	StoppedAt time.Time
	Duration  time.Duration
}

RuntimeInfo contains information about the runtime of the f-mesh

Directories

Path Synopsis
experiments
