engine

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2026 License: AGPL-3.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PortCtrl  = "ctrl"
	PortTrue  = "true"
	PortFalse = "false"
)
View Source
const (
	// SrcDeclared is the source ID for user-declared variables.
	SrcDeclared = "declared"
	// SrcFnArg is the source ID reserved for function-argument references.
	// References with this source ID resolve against a function Call's arguments rather than against any scope variable.
	SrcFnArg = "fnarg"
)

Well-known source IDs used in context variable lookups and expression references.

View Source
const StateIdle = ""
View Source
const SubBufSize = 64

SubBufSize is the buffer size used in subscription channels. Events are dropped when this buffer size is exceeded.

Variables

This section is empty.

Functions

func ApplyDeclaration

func ApplyDeclaration(s *Scope, nodeID string, od workflow.OutputDeclaration, val expr.Value) error

ApplyDeclaration stores a value into the scope according to the declaration mode.

func ApplyOutput

func ApplyOutput(s *Scope, nodeID, slotID string, binding workflow.OutputBinding, val expr.Value) error

ApplyOutput stores a value into the scope according to the binding mode.

func FilterEmitted

func FilterEmitted(raw map[string]workflow.DataType, bindings map[string]workflow.OutputBinding) map[string]workflow.DataType

FilterEmitted filters raw declared output slots to only those whose binding is emit-mode (or unbound, which defaults to emit). Used by EmitsVariables implementations to produce the seedable slot map.

func HeartbeatLoop

func HeartbeatLoop(ctx context.Context, lc Supervisor, address string, cfg RetryConfig)

HeartbeatLoop ticks cfg.Interval and posts one heartbeat per tick. Returns when ctx is canceled; failed ticks log at warn and continue.

func RegisterNodeOutputs

func RegisterNodeOutputs(scp *Scope, em Emitter)

RegisterNodeOutputs declares zero values for an emitter node's outputs into the scope.

func RegisterWithRetry

func RegisterWithRetry(ctx context.Context, lc Supervisor, reg AgentRegistration, cfg RetryConfig)

RegisterWithRetry calls lc.Register until success or ctx cancellation. Each attempt runs under its own cfg.AttemptTimeout.

Types

type ADCConfig

type ADCConfig struct {
	Device string `json:"device"`
}

type AgentRegistration

type AgentRegistration struct {
	Address  string
	Status   AgentStatus
	Manifest *DeviceManifest
	Error    *string
}

AgentRegistration is the per-boot state passed to Supervisor.Register.

type AgentStatus

type AgentStatus string

AgentStatus is the boot outcome reported through Supervisor.Register.

const (
	StatusOnline    AgentStatus = "online"
	StatusBootError AgentStatus = "booterror"
)

type BranchingNode

type BranchingNode struct {
	// contains filtered or unexported fields
}

BranchingNode is a base for a node that can decide between multiple state transitions per port (e.g. LLM agent)

func NewBranchingNode

func NewBranchingNode(id string) BranchingNode

NewBranchingNode creates a new BranchingNode

func (*BranchingNode) AddTransition

func (b *BranchingNode) AddTransition(port string, tr Transition) error

func (*BranchingNode) ID

func (b *BranchingNode) ID() string

func (*BranchingNode) Transitions

func (b *BranchingNode) Transitions(port string) []Transition

type BuildFunc

type BuildFunc func(ctx context.Context, wf *workflow.Workflow, dm DeploymentMapping, ext *ExternalResources) (*Runner, error)

BuildFunc builds a Runner from a binding-free workflow, the deploy mapping that binds its resources, and the resolved external-resource configs. Injected at Engine construction to avoid import cycle

type DACConfig

type DACConfig struct {
	Device string `json:"device"`
}

type DeploymentMapping

type DeploymentMapping map[string]ResourceBinding

DeploymentMapping binds a binding-free workflow's logical resource ids to concrete platform resources for one deploy, keyed by workflow resource id. Mirrors the engineapi wire shape.

type DeviceManifest

type DeviceManifest struct {
	GPIOs   map[string]GPIOConfig   `json:"gpios,omitempty"`
	ADCs    map[string]ADCConfig    `json:"adcs,omitempty"`
	DACs    map[string]DACConfig    `json:"dacs,omitempty"`
	Serials map[string]SerialConfig `json:"serials,omitempty"`
	PWMs    map[string]PWMConfig    `json:"pwms,omitempty"`
}

DeviceManifest is the hardware the engine opens drivers for, keyed by driver instance ID. JSON tags match the fh-backend wire shape.

type Emitter

type Emitter interface {
	Wirable
	// Outputs returns the output a node does actually emit (bindingMode = emit)
	Outputs() map[string]workflow.DataType
}

Emitter marks nodes that can emit variables to a scope

type Engine

type Engine struct {
	Secret  string    // shared with backend; used as Authorization bearer for /deploy + /stop
	Builder BuildFunc // constructs a Runner in /deploy from a workflow + network manifest
	// contains filtered or unexported fields
}

Engine is the long-lived host for one workflow Runner. It owns runner lifecycle (start/stop/swap on /deploy and /stop) and the HTTP surface that drives that lifecycle.

func (*Engine) Deploy

Deploy stops any running workflow and starts the new one.

func (*Engine) IsRunning

func (e *Engine) IsRunning() bool

IsRunning reports whether a workflow is currently running.

func (*Engine) Stop

func (e *Engine) Stop()

Stop tears down the currently running workflow and releases its transports. Safe to call when idle.

type Event

type Event struct {
	TargetState string       // Node ID to transition to
	Apply       func(*Scope) // Optional function to apply event data into the runner's scope
}

Event is produced by a Trigger and consumed by the runner's state loop.

type Executable

type Executable interface {
	Wirable
	Execute(ctx context.Context, scope *Scope) (nextState string, err error)
}

Executable is implemented by action nodes that run on the state-runner goroutine.

type ExternalResources

type ExternalResources struct {
	MQTTs     map[string]MQTTConnection
	Providers map[string]LLMProviderConfig
}

ExternalResources holds the resolved, deploy-delivered configs for a workflow's non-device external resources, keyed by the platform resource id the DeploymentMapping points at. The engine builds transports from MQTTs and per-deploy LLM providers from Providers (the connection for each declared custom/self-hosted model).

type Function

type Function struct {
	Info              workflow.FunctionInfo
	DeclaredVars      []workflow.Variable            // function-local declared variables to seed into the function scope at call time
	InitialState      string                         // entry node id (from OnFunctionCall's outgoing edge)
	Actions           map[string]Executable          // action nodes, keyed by node id
	OutputAssignments map[string]workflow.Expression // return uid → expression evaluated in callee scope at end
}

Function is a compiled, callable sub-workflow. Synchronous, no triggers.

func (*Function) Call

func (f *Function) Call(ctx context.Context, args map[string]expr.Value) (map[string]expr.Value, error)

Call runs the function in a fresh FunctionScope and returns the computed return values keyed by return uid.

type GPIOConfig

type GPIOConfig struct {
	Chip string `json:"chip"`
}

type HasSetup

type HasSetup interface {
	Setup(ctx context.Context) error
}

HasSetup is implemented by nodes that need ctx-bound, fallible initialization

type LLMProviderConfig added in v1.0.2

type LLMProviderConfig struct {
	URL    string
	APIKey string
	Model  string
}

LLMProviderConfig is the resolved connection to a self-hosted/custom LLM endpoint the llmproxy doesn't ship. The declared workflow model supplies the id and capabilities; this supplies how to reach it. Model is the optional upstream model name the endpoint serves (defaults to the workflow model id).

type LinearNode

type LinearNode struct {
	// contains filtered or unexported fields
}

LinearNode is embedded by nodes with at most one target per port

func NewLinearNode

func NewLinearNode(id string) LinearNode

NewLinearNode creates a new LinearNode

func (*LinearNode) AddTransition

func (b *LinearNode) AddTransition(port string, tr Transition) error

func (*LinearNode) ID

func (b *LinearNode) ID() string

func (*LinearNode) Next

func (b *LinearNode) Next(port string, scope *Scope) (string, error)

Next applies the outgoing transition's side effects (e.g. AgentTask prompt evaluation) against the scope and returns the target node ID. Returns StateIdle with a nil error when no transition is wired to the port.

type LlmClient

type LlmClient interface {
	Chat(ctx context.Context, req *llmproxy.ChatRequest) (*llmproxy.ChatResponse, error)
}

LlmClient is the external service for language model calls.

type MQTTConnection

type MQTTConnection struct {
	BrokerURL       string    `json:"brokerUrl"`
	ClientID        string    `json:"clientId,omitempty"`
	Username        string    `json:"username,omitempty"`
	Password        string    `json:"password,omitempty"`
	PublishPrefix   string    `json:"publishPrefix,omitempty"`
	SubscribePrefix string    `json:"subscribePrefix,omitempty"`
	Will            *MQTTWill `json:"will,omitempty"`
}

type MQTTWill

type MQTTWill struct {
	Topic   string `json:"topic"`
	Payload string `json:"payload"`
	Qos     int    `json:"qos"`
	Retain  bool   `json:"retain"`
}

type MemorySync added in v1.0.2

type MemorySync interface {
	Hydrate(ctx context.Context) ([]workflow.MemoryFile, error)
	Push(ctx context.Context, uid, content string) error
}

MemorySync is the OPTIONAL remote mirror for agent memory. The Manager owns local filesystem persistence unconditionally; when a MemorySync is configured it hydrates from the mirror on a cold start (empty local copy) and pushes every local write back. nil → local-only: no hydration, no mirroring. fh-backend adapter: HTTP. Push is best-effort — a mirror failure must not fail the agent's local write.

type MissingFieldError

type MissingFieldError struct {
	NodeID string
	Field  string
}

MissingFieldError signals a required workflow field was absent at build time.

func (*MissingFieldError) Error

func (e *MissingFieldError) Error() string

type PWMConfig

type PWMConfig struct {
	Chip string `json:"chip"`
}

type RAGQueryParams

type RAGQueryParams struct {
	CollectionID string
	Query        string
	TopK         int
}

RAGQueryParams is a similarity-search request issued through a Retriever.

type RAGQueryResult

type RAGQueryResult struct {
	ChunkID    string
	DocumentID string
	Content    string
	Score      float64
}

RAGQueryResult is one ranked chunk returned by a Retriever.

type ResourceBinding

type ResourceBinding struct {
	Ref   string `json:"ref"`
	Index *int   `json:"index,omitempty"`
}

ResourceBinding is how one workflow resource binds to the environment. Ref is the shared platform resource it points at (driver instance id in the boot DeviceManifest, or external resource id in ExternalResources); the engine picks the pool by the workflow resource's type. Index is the optional per-channel physical sub-address within that resource (GPIO line, or ADC/PWM/ DAC channel number); nil for UART/MQTT/memory/model.

type Retriever

type Retriever interface {
	QueryRAG(ctx context.Context, params RAGQueryParams) ([]RAGQueryResult, error)
}

Retriever is the external service for retrieval-augmented generation.

type RetryConfig

type RetryConfig struct {
	AttemptTimeout time.Duration
	Interval       time.Duration
}

RetryConfig tunes RegisterWithRetry and HeartbeatLoop. Interval is the wait between Register retries and the tick cadence for Heartbeat.

type Runner

type Runner struct {
	Scope        *Scope
	Nodes        map[string]Executable
	Triggers     map[string]Trigger
	InitialState string
	Transports   TransportRegistry // released by Run's defer chain on ctx cancellation
}

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) error

Run starts all trigger goroutines and the state-runner loop. One iteration = one node execution or one event consumed. Runs indefinitely until ctx is cancelled by the caller. On exit (ctx cancellation), every owned resource is released — triggers via their individual Close, transports via Registry.CloseAll. The single ctx is the only lifecycle handle the caller needs.

type Scope

type Scope struct {
	Vars map[string]expr.Value // Holds variable sof all sources, separated by srcID

	Conversation llmproxy.InputItems
	// contains filtered or unexported fields
}

Scope holds runtime variables and conversation state for workflow execution. Scope is single-threaded: after Setup, only the state-runner modifies scope. Cross-goroutine delivery to trigger goroutines happens through the channels Subscribe() returns

func NewFunctionScope

func NewFunctionScope(declaredVars []workflow.Variable, args map[string]expr.Value) (*Scope, error)

NewFunctionScope creates an isolated scope for function execution, with the given arguments pre-seeded under SrcFnArg and initialized declared variables.

func NewMainScope

func NewMainScope(declaredVars []workflow.Variable) (*Scope, error)

NewMainScope creates a scope for the main workflow, initialized with the given declared variables.

func (*Scope) GetConversation

func (s *Scope) GetConversation() llmproxy.InputItems

func (*Scope) Resolve

func (s *Scope) Resolve(ref workflow.Reference) (expr.Value, error)

Resolve implements expr.VarResolver.

func (*Scope) Set

func (s *Scope) Set(srcId, varId string, v expr.Value)

Set updates the value of a variable in the scope and notifies subscribers.

func (*Scope) SetConversation

func (s *Scope) SetConversation(in llmproxy.Input)

func (*Scope) Subscribe

func (s *Scope) Subscribe(srcId, varId string) <-chan expr.Value

Subscribe returns a buffered channel that receives a value every time the variable is Set. Only subscribe during Setup, or you'll race the state-runner

type SerialConfig

type SerialConfig struct {
	Port string `json:"device"`
	Baud int    `json:"baud,omitempty"`
}

type Supervisor added in v1.0.2

type Supervisor interface {
	Register(ctx context.Context, reg AgentRegistration) error
	Heartbeat(ctx context.Context, address string) error
}

Supervisor is the external receiver of lifecycle events from the engine.

type ToolNode

type ToolNode struct {
	// contains filtered or unexported fields
}

ToolNode is embedded by tool-only nodes that never participate in the state machine

func NewToolNode

func NewToolNode(id string) ToolNode

NewToolNode creates a new ToolNode

func (*ToolNode) AddTransition

func (b *ToolNode) AddTransition(port string, _ Transition) error

func (*ToolNode) ID

func (b *ToolNode) ID() string

type ToolProvider

type ToolProvider interface {
	Tools() ([]llmproxy.FunctionTool, error)
}

ToolProvider marks nodes that can be exposed as LLM tools to agent nodes. A single ToolProvider may contribute one or more LLM-callable function tools. Descriptions live on the implementing node; either hardcoded, or carried as a node argument.

type Transition

type Transition struct {
	TargetID    string
	EdgeType    workflow.EdgeType
	Prompt      *workflow.Expression
	Description *string
}

Transition carries the metadata needed by a branching node to describe one of its possible outgoing transitions to an LLM.

func (Transition) Apply

func (tr Transition) Apply(scope *Scope) error

Apply runs the edge-type-specific side effect against the scope before the state machine moves on.

type TransportRegistry

type TransportRegistry interface {
	CloseAll() error
}

Runner is the workflow graph interpreter. One Runner executes one workflow and owns the per-deploy transport connections it was built against. Construct via build/ package. Run releases every owned resource via its defer chain on ctx cancellation TransportRegistry is the per-deploy set of transports the Runner owns and releases on shutdown. *transport.Registry satisfies it; kept as an interface so package engine does not import package transport (which would cycle now that transport depends on engine domain types).

type Trigger

type Trigger interface {
	Wirable

	// Wait blocks until the trigger fires or ctx cancels, then returns the
	// event or error.
	Wait(ctx context.Context) (Event, error)

	// Close releases resources on shutdown. Runner calls this even if Setup
	// failed partway, so implementations should guard nil fields.
	Close() error
}

Trigger is the contract for nodes that produce events from their own goroutine. The runner constructs one goroutine per Trigger and drives the lifecycle.

type TriggerNode

type TriggerNode struct {
	// contains filtered or unexported fields
}

TriggerNode is the common embed for every trigger

func NewTriggerNode

func NewTriggerNode(id string) TriggerNode

NewTriggerNode creates a new TriggerNode

func (*TriggerNode) AddTransition

func (b *TriggerNode) AddTransition(_ string, tr Transition) error

func (*TriggerNode) ID

func (b *TriggerNode) ID() string

func (*TriggerNode) Target

func (b *TriggerNode) Target() string

type Wirable

type Wirable interface {
	ID() string
	AddTransition(port string, tr Transition) error
}

Wirable is the basic wiring contract every workflow object (action or trigger) satisfies: it has an ID and can accept outgoing edges.

Directories

Path Synopsis
Package backend is the engine-side HTTP client for everything the engine needs from fh-backend: agent registration, log ingestion, LLM chat, RAG queries.
Package backend is the engine-side HTTP client for everything the engine needs from fh-backend: agent registration, log ingestion, LLM chat, RAG queries.
internal/httpclient
Package httpclient is a minimal JSON HTTP client vendored into the engine so the fh-backend capability implementation has no dependency on the closed fh-backend module.
Package httpclient is a minimal JSON HTTP client vendored into the engine so the fh-backend capability implementation has no dependency on the closed fh-backend module.
Package channel defines the engine's workflow-level handles to external resources — hardware drivers (GPIO, ADC, UART, ...) and network protocols (MQTT, future HTTP).
Package channel defines the engine's workflow-level handles to external resources — hardware drivers (GPIO, ADC, UART, ...) and network protocols (MQTT, future HTTP).
Package driver is the OS-level abstraction for I/O resources.
Package driver is the OS-level abstraction for I/O resources.
Package memory manages the engine's local copy of an agent's declared memory files.
Package memory manages the engine's local copy of an agent's declared memory files.
Package rag provides a local implementation to satisfy Retriever interface of the engine.
Package rag provides a local implementation to satisfy Retriever interface of the engine.
Package transport is the protocol-level abstraction for network resources the engine talks to.
Package transport is the protocol-level abstraction for network resources the engine talks to.
Package websearch hosts the web search provider abstraction used by the engine's WebSearchTool node.
Package websearch hosts the web search provider abstraction used by the engine's WebSearchTool node.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL