engine

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2026 License: MIT Imports: 59 Imported by: 0

Documentation

Overview

Emits CXDB turns from parsed CLI stream-json events, decomposing opaque conversation blobs into individual queryable turns.

Parses Claude CLI stream-json NDJSON output into structured events for decomposition into individual CXDB turns.

Index

Constants

View Source
const DefaultFileBackingThreshold = 100 * 1024

DefaultFileBackingThreshold is 100KB per spec §5.5. Artifacts below this threshold are stored in memory; above it, they are written to disk under the run's artifacts/ subdirectory.

Variables

This section is empty.

Functions

func ApplyDetectedProviders

func ApplyDetectedProviders(cfg *RunConfigFile, detected []DetectedProvider)

ApplyDetectedProviders populates cfg.LLM.Providers from auto-detected providers. Only providers not already configured are added.

func DefaultRunsBaseDir

func DefaultRunsBaseDir() string

DefaultRunsBaseDir returns the parent directory that contains all run subdirectories (one per run ID). This is the directory that list/prune commands scan.

func DelayForAttempt

func DelayForAttempt(attempt int, cfg BackoffConfig, jitterSeed string) time.Duration

func NewContextWithGraphAttrs

func NewContextWithGraphAttrs(g *model.Graph) *runtime.Context

func NewRunID

func NewRunID() (string, error)

func Prepare

func Prepare(dotSource []byte) (*model.Graph, []validate.Diagnostic, error)

Prepare parses/transforms/validates a graph.

func PrepareWithOptions

func PrepareWithOptions(dotSource []byte, opts PrepareOptions) (*model.Graph, []validate.Diagnostic, error)

func PrepareWithRegistry

func PrepareWithRegistry(dotSource []byte, reg *TransformRegistry) (*model.Graph, []validate.Diagnostic, error)

func ResolveProviderExecutable

func ResolveProviderExecutable(cfg *RunConfigFile, provider string, opts RunOptions) (string, error)

ResolveProviderExecutable applies run-level policy for provider CLI executable selection.

Types

type Answer

type Answer struct {
	Value          string
	Values         []string
	SelectedOption *Option // the full selected option (for SINGLE_SELECT); nil if not applicable
	Text           string
	TimedOut       bool
	Skipped        bool
}

type ArtifactInfo

type ArtifactInfo struct {
	ID           string    `json:"id"`
	Name         string    `json:"name"`
	SizeBytes    int64     `json:"size_bytes"`
	StoredAt     time.Time `json:"stored_at"`
	IsFileBacked bool      `json:"is_file_backed"`
	ContentHash  string    `json:"content_hash,omitempty"`
}

ArtifactInfo describes a stored artifact (spec §5.5 ArtifactInfo).

type ArtifactPolicyCheckpoint

type ArtifactPolicyCheckpoint struct {
	ExcludeGlobs []string `json:"exclude_globs,omitempty" yaml:"exclude_globs,omitempty"`
}

type ArtifactPolicyConfig

type ArtifactPolicyConfig struct {
	Profiles   []string                 `json:"profiles,omitempty" yaml:"profiles,omitempty"`
	Env        ArtifactPolicyEnv        `json:"env,omitempty" yaml:"env,omitempty"`
	Checkpoint ArtifactPolicyCheckpoint `json:"checkpoint,omitempty" yaml:"checkpoint,omitempty"`
}

type ArtifactPolicyEnv

type ArtifactPolicyEnv struct {
	ManagedRoots map[string]string            `json:"managed_roots,omitempty" yaml:"managed_roots,omitempty"`
	Overrides    map[string]map[string]string `json:"overrides,omitempty" yaml:"overrides,omitempty"`
}

type ArtifactStore

type ArtifactStore struct {
	// contains filtered or unexported fields
}

ArtifactStore provides named, typed storage for large stage outputs (spec §5.5). Thread-safe via RWMutex per the spec's ReadWriteLock requirement.

func NewArtifactStore

func NewArtifactStore(baseDir string, threshold int64) *ArtifactStore

NewArtifactStore creates a new artifact store. baseDir is the run's LogsRoot (artifacts/ subdirectory is created underneath on first file-backed write). Pass "" to disable file-backing entirely. threshold is the file-backing threshold in bytes; use DefaultFileBackingThreshold for the spec default of 100KB.

func (*ArtifactStore) Clear

func (s *ArtifactStore) Clear() int

Clear removes all artifacts, returning the count removed (spec §5.5 clear). For file-backed artifacts, the backing files are also removed.

func (*ArtifactStore) Has

func (s *ArtifactStore) Has(artifactID string) bool

Has returns whether an artifact exists (spec §5.5 has).

func (*ArtifactStore) Info

func (s *ArtifactStore) Info(artifactID string) (ArtifactInfo, bool)

Info returns the ArtifactInfo for a stored artifact, or false if not found. This is a convenience method for checking metadata without retrieving content.

func (*ArtifactStore) List

func (s *ArtifactStore) List() []ArtifactInfo

List returns all artifact infos, sorted by ID (spec §5.5 list).

func (*ArtifactStore) Remove

func (s *ArtifactStore) Remove(artifactID string) bool

Remove deletes an artifact, returning true if it existed (spec §5.5 remove). For file-backed artifacts, the backing file is also removed.

func (*ArtifactStore) Retrieve

func (s *ArtifactStore) Retrieve(artifactID string) ([]byte, error)

Retrieve returns the artifact data (spec §5.5 retrieve). For file-backed artifacts, the data is read from disk. Returns an error if the artifact does not exist.

func (*ArtifactStore) Store

func (s *ArtifactStore) Store(artifactID, name string, data []byte) (ArtifactInfo, error)

Store adds or replaces an artifact (spec §5.5 store). Data is stored in memory if size <= threshold or baseDir is empty; otherwise it is written to {baseDir}/artifacts/{artifactID}.json.

type AutoApproveInterviewer

type AutoApproveInterviewer struct{}

func (*AutoApproveInterviewer) Ask

func (*AutoApproveInterviewer) AskMultiple

func (i *AutoApproveInterviewer) AskMultiple(questions []Question) []Answer

func (*AutoApproveInterviewer) Inform

func (i *AutoApproveInterviewer) Inform(message string, stage string)

type BackendKind

type BackendKind string
const (
	BackendAPI BackendKind = "api"
	BackendCLI BackendKind = "cli"
)

type BackoffConfig

type BackoffConfig struct {
	InitialDelayMS int
	BackoffFactor  float64
	MaxDelayMS     int
	Jitter         bool
}

BackoffConfig configures retry delays. This matches the attractor-spec BackoffConfig fields.

type CXDBSink

type CXDBSink struct {
	Client *cxdb.Client
	Binary *cxdb.BinaryClient

	RunID      string
	ContextID  string
	HeadTurnID string
	BundleID   string
	// contains filtered or unexported fields
}

CXDBSink appends normalized Attractor events to a CXDB context and stores large artifacts in CXDB's blob CAS.

v1 implementation notes: - Prefers binary protocol for mutating operations; falls back to HTTP compat routes. - Serializes appends to maintain a linear head within a context.

func NewCXDBSink

func NewCXDBSink(client *cxdb.Client, binary *cxdb.BinaryClient, runID, contextID, headTurnID, bundleID string) *CXDBSink

func (*CXDBSink) Append

func (s *CXDBSink) Append(ctx context.Context, typeID string, typeVersion int, data map[string]any) (turnID string, contentHash string, err error)

func (*CXDBSink) ForkFromHead

func (s *CXDBSink) ForkFromHead(ctx context.Context) (*CXDBSink, error)

func (*CXDBSink) PutArtifactFile

func (s *CXDBSink) PutArtifactFile(ctx context.Context, nodeID, logicalName, path string) (artifactTurnID string, err error)

type CXDBStartupInfo

type CXDBStartupInfo struct {
	UIURL     string
	UIStarted bool
	Warnings  []string
	// contains filtered or unexported fields
}

type CallbackInterviewer

type CallbackInterviewer struct {
	Fn func(Question) Answer
}

func (*CallbackInterviewer) Ask

func (*CallbackInterviewer) AskMultiple

func (i *CallbackInterviewer) AskMultiple(questions []Question) []Answer

func (*CallbackInterviewer) Inform

func (i *CallbackInterviewer) Inform(message string, stage string)

type CodergenBackend

type CodergenBackend interface {
	Run(ctx context.Context, exec *Execution, node *model.Node, prompt string) (string, *runtime.Outcome, error)
}

type CodergenHandler

type CodergenHandler struct{}

func (*CodergenHandler) Execute

func (h *CodergenHandler) Execute(ctx context.Context, exec *Execution, node *model.Node) (runtime.Outcome, error)

func (*CodergenHandler) RequiresProvider

func (h *CodergenHandler) RequiresProvider() bool

RequiresProvider implements ProviderRequiringHandler. LLM nodes require an LLM provider to be configured.

func (*CodergenHandler) UsesFidelity

func (h *CodergenHandler) UsesFidelity() bool

UsesFidelity implements FidelityAwareHandler. LLM nodes need fidelity/thread resolution for context management and session reuse.

type CodergenRouter

type CodergenRouter struct {
	// contains filtered or unexported fields
}

func NewCodergenRouter

func NewCodergenRouter(cfg *RunConfigFile, catalog *modeldb.Catalog) *CodergenRouter

func NewCodergenRouterWithRuntimes

func NewCodergenRouterWithRuntimes(cfg *RunConfigFile, catalog *modeldb.Catalog, runtimes map[string]ProviderRuntime) *CodergenRouter

func (*CodergenRouter) Run

func (r *CodergenRouter) Run(ctx context.Context, exec *Execution, node *model.Node, prompt string) (string, *runtime.Outcome, error)

type ConditionalHandler

type ConditionalHandler struct{}

func (*ConditionalHandler) Execute

func (h *ConditionalHandler) Execute(ctx context.Context, exec *Execution, node *model.Node) (runtime.Outcome, error)

func (*ConditionalHandler) SkipRetry

func (h *ConditionalHandler) SkipRetry() bool

SkipRetry implements SingleExecutionHandler. Conditional nodes are pass-through routing points — retrying them burns retry budget without useful work.

type ConsoleInterviewer

type ConsoleInterviewer struct {
	In  *os.File
	Out *os.File
	// contains filtered or unexported fields
}

ConsoleInterviewer prompts on stdin/stdout. Designed for single-threaded interactive use (one human at the terminal). If concurrent callers (e.g. parallel pipeline branches) share the same ConsoleInterviewer, additional callers receive timeout responses rather than deadlocking. In non-interactive environments, prefer AutoApproveInterviewer or QueueInterviewer.

func (*ConsoleInterviewer) Ask

func (*ConsoleInterviewer) AskMultiple

func (i *ConsoleInterviewer) AskMultiple(questions []Question) []Answer

func (*ConsoleInterviewer) Inform

func (i *ConsoleInterviewer) Inform(message string, stage string)

type DetectedProvider

type DetectedProvider struct {
	Key     string
	Backend BackendKind
	APIKey  string
}

DetectedProvider describes a provider found via environment scanning.

func DetectProviders

func DetectProviders() []DetectedProvider

DetectProviders scans the environment for known API keys and returns provider configurations for each detected provider. For providers with a CLI spec, the CLI backend is preferred when the binary is on PATH.

type DiscoveredInputReference

type DiscoveredInputReference struct {
	SourceFile string             `json:"source_file"`
	Matched    string             `json:"matched_token"`
	Pattern    string             `json:"pattern"`
	Kind       InputReferenceKind `json:"kind"`
	Confidence string             `json:"confidence"`
}

type Engine

type Engine struct {
	Graph *model.Graph

	Options RunOptions

	// Original DOT input (pre-transforms), captured for replay/resume.
	DotSource []byte

	// Optional: config used to start the run (metaspec run config schema). Snapshotted to logs_root for resume.
	RunConfig *RunConfigFile

	// Resolved once per run (or restored from checkpoint on resume) so
	// artifact behavior is deterministic across retries and resumes.
	ArtifactPolicy ResolvedArtifactPolicy

	RunBranch string

	WorktreeDir string
	LogsRoot    string

	Context *runtime.Context

	Registry *HandlerRegistry

	// Backend for codergen nodes (until provider routing is wired in).
	CodergenBackend CodergenBackend

	Interviewer Interviewer

	// Optional: normalized event sink (CXDB).
	CXDB *CXDBSink

	// Artifact store for the run (spec §5.5). Initialized once per run;
	// handlers access it via Execution.Artifacts.
	Artifacts *ArtifactStore

	// Model catalog snapshot metadata (metaspec).
	ModelCatalogSHA    string
	ModelCatalogSource string
	ModelCatalogPath   string

	// Input materialization policy + inference runtime.
	InputMaterializationPolicy InputMaterializationPolicy
	InputReferenceInferer      InputReferenceInferer
	InputInferenceCache        map[string][]InferredReference
	InputSourceTargetMap       map[string]string

	Warnings []string
	// contains filtered or unexported fields
}

func (*Engine) Warn

func (e *Engine) Warn(msg string)

type Execution

type Execution struct {
	Graph       *model.Graph
	Context     *runtime.Context
	LogsRoot    string
	WorktreeDir string
	Engine      *Engine
	Artifacts   *ArtifactStore // spec §5.5: per-run artifact store
}

type ExitHandler

type ExitHandler struct{}

func (*ExitHandler) Execute

func (h *ExitHandler) Execute(ctx context.Context, exec *Execution, node *model.Node) (runtime.Outcome, error)

type FanInHandler

type FanInHandler struct{}

func (*FanInHandler) Execute

func (h *FanInHandler) Execute(ctx context.Context, exec *Execution, node *model.Node) (runtime.Outcome, error)

type FidelityAwareHandler

type FidelityAwareHandler interface {
	Handler
	UsesFidelity() bool
}

FidelityAwareHandler is an optional interface that handlers implement to declare they use fidelity/thread resolution (e.g., LLM session continuity). The engine resolves fidelity and thread keys only for handlers that implement this interface, avoiding hardcoded handler-type checks.

type Handler

type Handler interface {
	Execute(ctx context.Context, exec *Execution, node *model.Node) (runtime.Outcome, error)
}

type HandlerRegistry

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

func NewDefaultRegistry

func NewDefaultRegistry() *HandlerRegistry

func (*HandlerRegistry) KnownTypes

func (r *HandlerRegistry) KnownTypes() []string

KnownTypes returns the list of registered handler type strings. Used by the validate package's TypeKnownRule to check node type overrides.

func (*HandlerRegistry) Register

func (r *HandlerRegistry) Register(typeString string, h Handler)

func (*HandlerRegistry) Resolve

func (r *HandlerRegistry) Resolve(n *model.Node) Handler

type InferredReference

type InferredReference struct {
	Pattern    string `json:"pattern"`
	Rationale  string `json:"rationale"`
	Confidence string `json:"confidence"`
}

type InputConfig

type InputConfig struct {
	Materialize InputMaterializationConfig `json:"materialize,omitempty" yaml:"materialize,omitempty"`
}

type InputDocForInference

type InputDocForInference struct {
	Path    string `json:"path"`
	Content string `json:"content"`
}

type InputImportEntry

type InputImportEntry struct {
	Pattern  string `json:"pattern" yaml:"pattern"`
	Required *bool  `json:"required,omitempty" yaml:"required,omitempty"`
}

type InputInferenceOptions

type InputInferenceOptions struct {
	Provider string `json:"provider"`
	Model    string `json:"model"`
}

type InputManifest

type InputManifest struct {
	Sources                      []string                    `json:"sources"`
	ResolvedFiles                []string                    `json:"resolved_files"`
	SourceTargetMap              []InputSourceTargetMapEntry `json:"source_target_map"`
	RunBaseRevision              string                      `json:"run_base_revision,omitempty"`
	BranchRevision               string                      `json:"branch_revision,omitempty"`
	BaseRunRevision              string                      `json:"base_run_revision,omitempty"`
	BranchHeadRevision           string                      `json:"branch_head_revision,omitempty"`
	DiscoveredReferences         []DiscoveredInputReference  `json:"discovered_references"`
	UnresolvedInferredReferences []InferredReference         `json:"unresolved_inferred_references,omitempty"`
	Warnings                     []string                    `json:"warnings,omitempty"`
	GeneratedAt                  string                      `json:"generated_at"`
}

type InputMaterializationConfig

type InputMaterializationConfig struct {
	Enabled          *bool                           `json:"enabled,omitempty" yaml:"enabled,omitempty"`
	Include          []string                        `json:"include,omitempty" yaml:"include,omitempty"`
	DefaultInclude   []string                        `json:"default_include,omitempty" yaml:"default_include,omitempty"`
	Imports          []InputImportEntry              `json:"imports,omitempty" yaml:"imports,omitempty"`
	FanIn            InputMaterializationFanInConfig `json:"fan_in,omitempty" yaml:"fan_in,omitempty"`
	FollowReferences *bool                           `json:"follow_references,omitempty" yaml:"follow_references,omitempty"`
	InferWithLLM     *bool                           `json:"infer_with_llm,omitempty" yaml:"infer_with_llm,omitempty"`
	LLMModel         string                          `json:"llm_model,omitempty" yaml:"llm_model,omitempty"`
	LLMProvider      string                          `json:"llm_provider,omitempty" yaml:"llm_provider,omitempty"`
}

type InputMaterializationFanInConfig

type InputMaterializationFanInConfig struct {
	PromoteRunScoped []string `json:"promote_run_scoped,omitempty" yaml:"promote_run_scoped,omitempty"`
}

type InputMaterializationOptions

type InputMaterializationOptions struct {
	SourceRoots             []string
	Include                 []string
	DefaultInclude          []string
	FollowReferences        bool
	TargetRoot              string
	SnapshotRoot            string
	ExistingSourceTargetMap map[string]string
	Scanner                 InputReferenceScanner
	InferWithLLM            bool
	Inferer                 InputReferenceInferer
	InferProvider           string
	InferModel              string
	InferenceCache          map[string][]InferredReference
}

type InputMaterializationPolicy

type InputMaterializationPolicy struct {
	Enabled          bool
	Include          []string
	DefaultInclude   []string
	FollowReferences bool
	InferWithLLM     bool
	InferProvider    string
	InferModel       string
}

type InputReferenceInferer

type InputReferenceInferer interface {
	Infer(ctx context.Context, docs []InputDocForInference, opts InputInferenceOptions) ([]InferredReference, error)
}

type InputReferenceKind

type InputReferenceKind string
const (
	InputReferenceKindPath InputReferenceKind = "path"
	InputReferenceKindGlob InputReferenceKind = "glob"
)

type InputReferenceScanner

type InputReferenceScanner interface {
	Scan(sourceFile string, content []byte) []DiscoveredInputReference
}

type InputSnapshotConflict

type InputSnapshotConflict struct {
	Path          string                            `json:"path"`
	BranchDigests []InputSnapshotConflictDigestPair `json:"branch_digests"`
}

type InputSnapshotConflictDigestPair

type InputSnapshotConflictDigestPair struct {
	BranchKey  string `json:"branch_key"`
	RevisionID string `json:"revision_id"`
	Digest     string `json:"digest"`
}

type InputSnapshotLineage

type InputSnapshotLineage struct {
	RunID       string                      `json:"run_id"`
	RunHead     string                      `json:"run_head_revision"`
	BranchHeads map[string]string           `json:"branch_heads"`
	Revisions   map[string]InputSnapshotRev `json:"revisions"`
}

func LoadInputSnapshotLineage

func LoadInputSnapshotLineage(logsRoot string) (*InputSnapshotLineage, error)

func (*InputSnapshotLineage) AdvanceBranch

func (l *InputSnapshotLineage) AdvanceBranch(branchKey string, parentRev string, digest map[string]string) string

func (*InputSnapshotLineage) CreateRunRevision

func (l *InputSnapshotLineage) CreateRunRevision(parentRev string, digest map[string]string) string

func (*InputSnapshotLineage) ForkBranch

func (l *InputSnapshotLineage) ForkBranch(branchKey string, baseRev string) (string, error)

func (*InputSnapshotLineage) MergePromotedPaths

func (l *InputSnapshotLineage) MergePromotedPaths(promote []string, branchRevs map[string]string) (string, []InputSnapshotConflict, error)

func (*InputSnapshotLineage) SaveAtomic

func (l *InputSnapshotLineage) SaveAtomic(logsRoot string) error

type InputSnapshotRev

type InputSnapshotRev struct {
	ID          string            `json:"id"`
	ParentIDs   []string          `json:"parent_ids"`
	Scope       string            `json:"scope"`
	BranchKey   string            `json:"branch_key,omitempty"`
	FileDigest  map[string]string `json:"run_scoped_file_digest"`
	GeneratedAt string            `json:"generated_at"`
}

type InputSourceTargetMapEntry

type InputSourceTargetMapEntry struct {
	Source string `json:"source"`
	Target string `json:"target"`
}

type Interviewer

type Interviewer interface {
	Ask(question Question) Answer
	AskMultiple(questions []Question) []Answer
	Inform(message string, stage string)
}

type ManagerLoopHandler

type ManagerLoopHandler struct{}

ManagerLoopHandler is defined in manager_loop.go.

func (*ManagerLoopHandler) Execute

func (h *ManagerLoopHandler) Execute(ctx context.Context, exec *Execution, node *model.Node) (runtime.Outcome, error)

Execute implements the ManagerLoopHandler per spec §4.11. It runs an observe/wait loop that monitors a child pipeline and evaluates stop conditions each cycle. The steer action is logged but not yet implemented (deferred to v2).

type Option

type Option struct {
	Key   string
	Label string
	To    string
}

type ParallelHandler

type ParallelHandler struct{}

func (*ParallelHandler) Execute

func (h *ParallelHandler) Execute(ctx context.Context, exec *Execution, node *model.Node) (runtime.Outcome, error)

type PreflightConfig

type PreflightConfig struct {
	PromptProbes PromptProbeConfig `json:"prompt_probes,omitempty" yaml:"prompt_probes,omitempty"`
}

type PreflightResult

type PreflightResult struct {
	RunID               string
	LogsRoot            string
	PreflightReportPath string
	Warnings            []string
	CXDBUIURL           string
}

PreflightResult contains metadata emitted by preflight-only startup checks.

func PreflightWithConfig

func PreflightWithConfig(ctx context.Context, dotSource []byte, cfg *RunConfigFile, overrides RunOptions) (*PreflightResult, error)

PreflightWithConfig runs all RunWithConfig prechecks and exits before pipeline startup.

type PrepareOptions

type PrepareOptions struct {
	Transforms []Transform
	// RepoPath is the repository root directory. When set, prompt_file attributes
	// on nodes are resolved relative to this path before other transforms run.
	RepoPath string
	// KnownTypes is an optional list of handler type strings. When non-empty,
	// the TypeKnownRule lint rule is added to validation so that nodes with
	// explicit type= attributes not in this set produce a warning.
	KnownTypes []string
	// Catalog is an optional modeldb catalog. When non-nil, model ID catalog
	// checks (stylesheet_unknown_model, stylesheet_noncanonical_model_id) are
	// enabled. When nil, those checks are silently skipped.
	Catalog *modeldb.Catalog
}

type ProgressFunc

type ProgressFunc func(map[string]any)

ProgressFunc is an optional callback for emitting structured progress events. Routing functions accept this to log decisions without depending on the engine.

type PromptProbeConfig

type PromptProbeConfig struct {
	Enabled     *bool    `json:"enabled,omitempty" yaml:"enabled,omitempty"`
	Transports  []string `json:"transports,omitempty" yaml:"transports,omitempty"`
	TimeoutMS   *int     `json:"timeout_ms,omitempty" yaml:"timeout_ms,omitempty"`
	Retries     *int     `json:"retries,omitempty" yaml:"retries,omitempty"`
	BaseDelayMS *int     `json:"base_delay_ms,omitempty" yaml:"base_delay_ms,omitempty"`
	MaxDelayMS  *int     `json:"max_delay_ms,omitempty" yaml:"max_delay_ms,omitempty"`
}

type ProviderAPIConfig

type ProviderAPIConfig struct {
	Protocol           string            `json:"protocol,omitempty" yaml:"protocol,omitempty"`
	BaseURL            string            `json:"base_url,omitempty" yaml:"base_url,omitempty"`
	Path               string            `json:"path,omitempty" yaml:"path,omitempty"`
	APIKeyEnv          string            `json:"api_key_env,omitempty" yaml:"api_key_env,omitempty"`
	ProviderOptionsKey string            `json:"provider_options_key,omitempty" yaml:"provider_options_key,omitempty"`
	ProfileFamily      string            `json:"profile_family,omitempty" yaml:"profile_family,omitempty"`
	Headers            map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
}

type ProviderConfig

type ProviderConfig struct {
	Backend    BackendKind       `json:"backend" yaml:"backend"`
	Executable string            `json:"executable,omitempty" yaml:"executable,omitempty"`
	API        ProviderAPIConfig `json:"api,omitempty" yaml:"api,omitempty"`
	Failover   []string          `json:"failover,omitempty" yaml:"failover,omitempty"`
}

type ProviderRequiringHandler

type ProviderRequiringHandler interface {
	Handler
	RequiresProvider() bool
}

ProviderRequiringHandler is an optional interface that handlers implement to declare they require an LLM provider. The engine uses this during preflight to gather provider requirements instead of checking node shapes.

type ProviderRuntime

type ProviderRuntime struct {
	Key              string
	Backend          BackendKind
	Executable       string
	API              providerspec.APISpec
	CLI              *providerspec.CLISpec
	APIHeadersMap    map[string]string
	Failover         []string
	FailoverExplicit bool
	ProfileFamily    string
}

func (ProviderRuntime) APIHeaders

func (r ProviderRuntime) APIHeaders() map[string]string

type QAPair

type QAPair struct {
	Question Question
	Answer   Answer
}

QAPair stores a question and its answer for recording purposes.

type Question

type Question struct {
	Type           QuestionType
	Text           string
	Options        []Option
	Default        *Answer // default answer if timeout/skip (nil = no default)
	TimeoutSeconds float64 // max wait time; 0 means no timeout
	Stage          string
	Metadata       map[string]any // arbitrary key-value pairs for frontend use
}

type QuestionType

type QuestionType string
const (
	QuestionSingleSelect QuestionType = "SINGLE_SELECT"
	QuestionMultiSelect  QuestionType = "MULTI_SELECT"
	QuestionFreeText     QuestionType = "FREE_TEXT"
	QuestionConfirm      QuestionType = "CONFIRM"
	QuestionYesNo        QuestionType = "YES_NO" // binary yes/no; semantically distinct from CONFIRM
)

type QueueInterviewer

type QueueInterviewer struct {
	Answers []Answer
	// contains filtered or unexported fields
}

QueueInterviewer returns pre-seeded answers in order. Useful for tests. Spec §6.4: returns Answer{Skipped: true} when the queue is empty.

func (*QueueInterviewer) Ask

func (i *QueueInterviewer) Ask(q Question) Answer

func (*QueueInterviewer) AskMultiple

func (i *QueueInterviewer) AskMultiple(questions []Question) []Answer

func (*QueueInterviewer) Inform

func (i *QueueInterviewer) Inform(message string, stage string)

type RecordingInterviewer

type RecordingInterviewer struct {
	Inner Interviewer

	Recordings []QAPair
	// contains filtered or unexported fields
}

RecordingInterviewer wraps another Interviewer and records all Q&A pairs. Spec §6.4: Used for replay, debugging, and audit trails.

func (*RecordingInterviewer) Ask

func (*RecordingInterviewer) AskMultiple

func (i *RecordingInterviewer) AskMultiple(questions []Question) []Answer

func (*RecordingInterviewer) Inform

func (i *RecordingInterviewer) Inform(message string, stage string)

type ResolveArtifactPolicyInput

type ResolveArtifactPolicyInput struct {
	LogsRoot string `json:"logs_root,omitempty"`
}

type ResolvedArtifactCheckpoint

type ResolvedArtifactCheckpoint struct {
	ExcludeGlobs []string `json:"exclude_globs,omitempty"`
}

type ResolvedArtifactEnv

type ResolvedArtifactEnv struct {
	Vars map[string]string `json:"vars,omitempty"`
}

type ResolvedArtifactPolicy

type ResolvedArtifactPolicy struct {
	Profiles     []string                   `json:"profiles,omitempty"`
	ManagedRoots map[string]string          `json:"managed_roots,omitempty"`
	Env          ResolvedArtifactEnv        `json:"env,omitempty"`
	Checkpoint   ResolvedArtifactCheckpoint `json:"checkpoint,omitempty"`
}

type Result

type Result struct {
	RunID          string
	LogsRoot       string
	WorktreeDir    string
	RunBranch      string
	FinalStatus    runtime.FinalStatus
	FinalCommitSHA string
	Warnings       []string
	CXDBUIURL      string
}

func Resume

func Resume(ctx context.Context, logsRoot string) (*Result, error)

Resume continues an existing run from {logs_root}/checkpoint.json.

v1 resume source of truth: - filesystem checkpoint.json (execution state) - stage status.json for last completed node (routing outcome) - git commit SHA from checkpoint (code state)

func ResumeFromBranch

func ResumeFromBranch(ctx context.Context, repoPath string, runBranch string) (*Result, error)

ResumeFromBranch resumes a run given only the git run branch name, using best-effort discovery of the logs_root in the default state directory.

func ResumeFromCXDB

func ResumeFromCXDB(ctx context.Context, cxdbHTTPBaseURL string, contextID string) (*Result, error)

ResumeFromCXDB resumes a run by reading the latest checkpoint pointer from the CXDB context head.

This supports the metaspec requirement: resume MUST be possible from the CXDB trajectory.

func Run

func Run(ctx context.Context, dotSource []byte, opts RunOptions) (*Result, error)

Run executes the pipeline in a dedicated git worktree and creates a checkpoint commit after each node.

func RunWithConfig

func RunWithConfig(ctx context.Context, dotSource []byte, cfg *RunConfigFile, overrides RunOptions) (*Result, error)

RunWithConfig executes a run using the metaspec run configuration file schema.

type ResumeOverrides

type ResumeOverrides struct {
	CXDBHTTPBaseURL string
	CXDBContextID   string
}

type RunConfigFile

type RunConfigFile struct {
	Version int `json:"version" yaml:"version"`
	// Graph and Task are optional operator metadata fields used by wrappers/UI.
	// The engine does not consume them directly during run execution.
	Graph string `json:"graph,omitempty" yaml:"graph,omitempty"`
	Task  string `json:"task,omitempty" yaml:"task,omitempty"`

	Repo struct {
		Path string `json:"path" yaml:"path"`
	} `json:"repo" yaml:"repo"`

	CXDB struct {
		BinaryAddr  string `json:"binary_addr" yaml:"binary_addr"`
		HTTPBaseURL string `json:"http_base_url" yaml:"http_base_url"`
		Autostart   struct {
			Enabled        bool     `json:"enabled" yaml:"enabled"`
			Command        []string `json:"command" yaml:"command"`
			WaitTimeoutMS  int      `json:"wait_timeout_ms" yaml:"wait_timeout_ms"`
			PollIntervalMS int      `json:"poll_interval_ms" yaml:"poll_interval_ms"`
			UI             struct {
				Enabled bool     `json:"enabled" yaml:"enabled"`
				Command []string `json:"command" yaml:"command"`
				URL     string   `json:"url" yaml:"url"`
			} `json:"ui" yaml:"ui"`
		} `json:"autostart" yaml:"autostart"`
	} `json:"cxdb" yaml:"cxdb"`

	LLM struct {
		CLIProfile string                    `json:"cli_profile" yaml:"cli_profile"`
		Providers  map[string]ProviderConfig `json:"providers" yaml:"providers"`
	} `json:"llm" yaml:"llm"`

	ModelDB struct {
		OpenRouterModelInfoPath           string `json:"openrouter_model_info_path" yaml:"openrouter_model_info_path"`
		OpenRouterModelInfoUpdatePolicy   string `json:"openrouter_model_info_update_policy" yaml:"openrouter_model_info_update_policy"`
		OpenRouterModelInfoURL            string `json:"openrouter_model_info_url" yaml:"openrouter_model_info_url"`
		OpenRouterModelInfoFetchTimeoutMS int    `json:"openrouter_model_info_fetch_timeout_ms" yaml:"openrouter_model_info_fetch_timeout_ms"`
	} `json:"modeldb" yaml:"modeldb"`

	Git struct {
		RequireClean           *bool    `json:"require_clean,omitempty" yaml:"require_clean,omitempty"`
		RunBranchPrefix        string   `json:"run_branch_prefix" yaml:"run_branch_prefix"`
		CommitPerNode          bool     `json:"commit_per_node" yaml:"commit_per_node"`
		PushRemote             string   `json:"push_remote,omitempty" yaml:"push_remote,omitempty"`
		CheckpointExcludeGlobs []string `json:"checkpoint_exclude_globs,omitempty" yaml:"checkpoint_exclude_globs,omitempty"`
	} `json:"git" yaml:"git"`

	ArtifactPolicy ArtifactPolicyConfig `json:"artifact_policy,omitempty" yaml:"artifact_policy,omitempty"`

	Setup struct {
		Commands  []string `json:"commands,omitempty" yaml:"commands,omitempty"`
		TimeoutMS int      `json:"timeout_ms,omitempty" yaml:"timeout_ms,omitempty"`
	} `json:"setup,omitempty" yaml:"setup,omitempty"`

	RuntimePolicy RuntimePolicyConfig `json:"runtime_policy,omitempty" yaml:"runtime_policy,omitempty"`
	Preflight     PreflightConfig     `json:"preflight,omitempty" yaml:"preflight,omitempty"`
	Inputs        InputConfig         `json:"inputs,omitempty" yaml:"inputs,omitempty"`
}

func DefaultRunConfig

func DefaultRunConfig() (*RunConfigFile, error)

DefaultRunConfig builds a RunConfigFile with sensible defaults suitable for running without an explicit config file. The repo path defaults to the current working directory if it is a git repo, otherwise an error is returned. Call applyConfigDefaults and validateConfig on the result before use.

func LoadRunConfigFile

func LoadRunConfigFile(path string) (*RunConfigFile, error)

type RunOptions

type RunOptions struct {
	RepoPath string

	// RunID is a globally unique filesystem-safe identifier. If empty, one is generated (ULID).
	RunID string

	// LogsRoot defaults to:
	//   ${XDG_STATE_HOME:-$HOME/.local/state}/kilroy/attractor/runs/<run_id>
	LogsRoot string

	// WorktreeDir defaults to {LogsRoot}/worktree.
	WorktreeDir string

	// Git branch prefix defaults to "attractor/run".
	RunBranchPrefix string

	// If true (default), refuse to start when repo has uncommitted changes.
	RequireClean bool

	// Optional callback invoked after CXDB/UI bootstrap and before pipeline execution starts.
	// Pointer is used to avoid copying synchronization primitives inside CXDBStartupInfo.
	OnCXDBStartup func(info *CXDBStartupInfo)

	// Allows explicit opt-in for test-shim CLI execution profile.
	AllowTestShim bool

	// When true, skip CXDB startup entirely. eng.CXDB remains nil;
	// all downstream consumers already nil-check before use.
	DisableCXDB bool

	// Optional provider-level model overrides (provider -> model id).
	// When set, the forced model is used for execution and bypasses model-catalog
	// membership validation for that provider.
	ForceModels map[string]string

	// Optional global stage timeout cap. When > 0, each stage attempt uses the
	// smaller positive timeout from node timeout and this global cap.
	StageTimeout time.Duration

	// Optional watchdog for no-progress stalls. Defaults are applied when unset.
	StallTimeout       time.Duration
	StallCheckInterval time.Duration

	// Optional cap for LLM retries in codergen routing.
	// Pointer preserves explicit zero versus unset semantics from config.
	MaxLLMRetries *int

	// Optional callback invoked for every progress event (same data written to
	// progress.ndjson). The map is a deep-copied snapshot safe for concurrent
	// use by the caller. Used by the HTTP server to fan events to SSE clients.
	ProgressSink func(map[string]any)

	// Optional interviewer for human-in-the-loop gates. Defaults to
	// AutoApproveInterviewer when nil.
	Interviewer Interviewer

	// Optional callback invoked after the engine is fully initialized but
	// before the main loop starts. Allows callers to capture an engine
	// reference for context inspection, etc.
	OnEngineReady func(e *Engine)

	// Arbitrary key/value metadata written to manifest.json under "labels".
	// Use to fingerprint runs for later querying or pruning (e.g. source=test).
	Labels map[string]string
}

type RuntimePolicyConfig

type RuntimePolicyConfig struct {
	StageTimeoutMS       *int `json:"stage_timeout_ms,omitempty" yaml:"stage_timeout_ms,omitempty"`
	StallTimeoutMS       *int `json:"stall_timeout_ms,omitempty" yaml:"stall_timeout_ms,omitempty"`
	StallCheckIntervalMS *int `json:"stall_check_interval_ms,omitempty" yaml:"stall_check_interval_ms,omitempty"`
	MaxLLMRetries        *int `json:"max_llm_retries,omitempty" yaml:"max_llm_retries,omitempty"`
}

type SimulatedCodergenBackend

type SimulatedCodergenBackend struct{}

func (*SimulatedCodergenBackend) Run

func (b *SimulatedCodergenBackend) Run(ctx context.Context, exec *Execution, node *model.Node, prompt string) (string, *runtime.Outcome, error)

type SingleExecutionHandler

type SingleExecutionHandler interface {
	Handler
	SkipRetry() bool
}

SingleExecutionHandler is an optional interface that handlers implement to declare they should bypass retry logic (execute exactly once). Conditional pass-through nodes are the canonical example: retrying a routing point burns retry budget without useful work.

type StartHandler

type StartHandler struct{}

func (*StartHandler) Execute

func (h *StartHandler) Execute(ctx context.Context, exec *Execution, node *model.Node) (runtime.Outcome, error)

type ToolHandler

type ToolHandler struct{}

func (*ToolHandler) Execute

func (h *ToolHandler) Execute(ctx context.Context, execCtx *Execution, node *model.Node) (runtime.Outcome, error)

type Transform

type Transform interface {
	ID() string
	Apply(g *model.Graph) error
}

Transform can mutate the parsed graph between parse and validate (attractor-spec DoD).

type TransformRegistry

type TransformRegistry struct {
	// contains filtered or unexported fields
}

TransformRegistry stores transforms to apply in registration order.

func NewTransformRegistry

func NewTransformRegistry() *TransformRegistry

func (*TransformRegistry) List

func (r *TransformRegistry) List() []Transform

func (*TransformRegistry) Register

func (r *TransformRegistry) Register(t Transform)

type WaitHumanHandler

type WaitHumanHandler struct{}

func (*WaitHumanHandler) Execute

func (h *WaitHumanHandler) Execute(ctx context.Context, exec *Execution, node *model.Node) (runtime.Outcome, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL