goal

package
v0.50.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2026 License: AGPL-3.0 Imports: 24 Imported by: 0

Documentation

Overview

Package goal is the recursive execution core: one entity (the Goal) whose completion is DERIVED from an append-only acceptance ledger, never asserted. A goal is a root goal (parent_id NULL); children are the same shape all the way down. GoalService owns every durable write; the acceptance fold owns "done is derived".

The package is flat: foundation types (enums, contract/evidence structs, pure folds) live alongside the service and integration surface so they share the unexported helpers directly. Information hiding is by unexported identifiers, not subpackages.

Index

Constants

View Source
const (
	ReadinessDispatchable = "dispatchable"
	ReadinessWaitingDeps  = "waiting_deps"
	ReadinessBlocked      = "blocked"
	ReadinessActive       = "active"
	ReadinessTerminal     = "terminal"
	ReadinessDraft        = "draft"
	ReadinessComposite    = "composite" // a composite is driven by rollup, not claimed
	ReadinessUnknown      = "unknown"
)

Readiness states. dispatchable is the only state the dispatcher claims on; the rest are diagnostic (surfaced via the readiness API to explain "why isn't this running?").

View Source
const (
	// GoalQueue isolates goal-attempt execution from the scheduler's queue on the
	// shared River client.
	GoalQueue = "stella_goal"

	// GoalTickQueue runs the dispatcher convergence tick (River Phase 2b). It is a
	// dedicated queue with one worker per node so a slow tick never consumes
	// attempt-execution slots and never overlaps itself on a node; combined with
	// leader-only periodic enqueue and ByState uniqueness, the cluster runs a
	// single convergence loop (at most one live tick at a time) rather than one
	// scan per node. A leader failover may run one extra (idempotent) pass.
	GoalTickQueue = "stella_goal_tick"
)

River Phase 2a: goal attempt execution runs on durable River jobs instead of the old in-process worker pool. Claiming a leaf mints an attempt and enqueues ONE job here; a worker promotes and drives it to a terminal attempt state. River owns durability, multi-node distribution, and graceful drain — but NOT retry: MaxAttempts is 1 so the same attempt_id is never re-run. Retry is the goal convergence model's job (attempt_count / reopenForRework), driven by the dispatcher reaper off the lease.

The goal queue's worker runs on the single process-wide River client (db.NewWorkingRiverClient): there must be exactly one electable client per database, so the goal subsystem contributes its queue + worker to that shared client (RegisterGoalWorker / GoalQueueConfig) rather than building its own.

View Source
const (
	LifecycleDraft         = "draft"
	LifecycleReady         = "ready"
	LifecycleActive        = "active"
	LifecycleBlocked       = "blocked"
	LifecycleAccepted      = "accepted"       // terminal-good; accepted_output frozen
	LifecycleRejectedFinal = "rejected_final" // judgment said no, no rework path left
	LifecycleAbandoned     = "abandoned"      // budget exhausted + give-up
	LifecycleCancelled     = "cancelled"
)

Goal lifecycle (contract §2). The single state machine every goal — root or child, leaf or composite — runs through.

View Source
const (
	BlockBudgetExhausted   = "budget_exhausted"
	BlockNeedsPlanApproval = "needs_plan_approval"
	BlockNeedsVerdict      = "needs_verdict"
	BlockDep               = "dep"
)

Block reasons (meaningful only when lifecycle='blocked'). Precedence on concurrent causes: budget_exhausted > needs_plan_approval > needs_verdict > dep. needs_plan_approval is a composite's human-review gate on its proposed plan, before any child is materialized; it cannot co-occur with dep/needs_verdict (those require materialized children / a submitted attempt).

View Source
const (
	AcceptancePending = "pending"
	AcceptancePassed  = "passed"
	AcceptanceFailed  = "failed"
)

Acceptance projection — the evaluation RESULT, distinct from lifecycle.

View Source
const (
	AttemptQueued      = "queued"
	AttemptRunning     = "running"
	AttemptSubmitted   = "submitted" // evidence in; acceptance evaluates next
	AttemptInterrupted = "interrupted"
	AttemptFailed      = "failed"
	AttemptCancelled   = "cancelled"
)

Attempt status (contract §2.2).

View Source
const (
	PurposeExecution     = "execution"
	PurposeDecomposition = "decomposition"
	PurposeReview        = "review"
)

Attempt purpose.

View Source
const (
	KindLeaf      = "leaf"
	KindComposite = "composite"
)

Goal kind.

View Source
const (
	PriorityRoutine = "routine"
	PriorityUrgent  = "urgent"
)

Scheduling priority.

View Source
const (
	EdgeHard = "hard"
	EdgeSoft = "soft"
)

Edge kind — hard blocks readiness, soft is advisory.

View Source
const (
	OnFailureBlock  = "block"
	OnFailureFail   = "fail"
	OnFailureIgnore = "ignore"
)

Edge on-failure policy when an upstream fails.

View Source
const (
	ReviewNone  = "none"
	ReviewHuman = "human"
)

Decomposition review gate.

View Source
const (
	PolicyDetThenJudgment = "deterministic_then_judgment"
	PolicyAll             = "all"
	PolicyAny             = "any"
)

Acceptance-contract policy.

View Source
const (
	ItemDeterministic = "deterministic"
	ItemJudgment      = "judgment"
)

Contract item kind.

View Source
const (
	ResultPass = "pass"
	ResultFail = "fail"
)

Acceptance outcome for one item.

View Source
const (
	AuthoritySystem = "system" // deterministic check result
	AuthorityAgent  = "agent"  // reviewer-agent verdict
	AuthorityHuman  = "human"  // human verdict
)

Verdict authority.

View Source
const (
	EscalationBlock   = "block"   // human decides (default)
	EscalationAbandon = "abandon" // auto-terminal
)

Convergence escalation on budget exhaustion.

View Source
const (
	ActorSystem   = "system"
	ActorUser     = "user"
	ActorAgent    = "agent"
	ActorWorker   = "worker"
	ActorReviewer = "reviewer"
	ActorPlanner  = "planner"
)

Actor types.

Variables

View Source
var (
	// ErrInvalidTransition is returned when a transition's from-lifecycle (or
	// attempt from-status) no longer matches the row. Another tick
	// raced this one; the caller may re-fetch and retry.
	ErrInvalidTransition = errors.New("goal: invalid lifecycle transition")

	// ErrNotFound is returned when the target goal/attempt/edge
	// no longer exists.
	ErrNotFound = errors.New("goal: not found")

	// ErrPlanGate is returned by Activate when the plan gate is unmet: a leaf
	// with an empty non-trivial contract, or a composite that is not yet planned
	// (planned_at unset) / has no required children.
	ErrPlanGate = errors.New("goal: plan gate not satisfied")

	// ErrBudgetExhausted is informational: convergence reached MaxAttempts. The
	// goal moves to blocked(budget_exhausted) or abandoned/rejected_final
	// rather than retrying.
	ErrBudgetExhausted = errors.New("goal: convergence budget exhausted")

	// ErrInvalidContract is returned when an AcceptanceContract or
	// ConvergencePolicy fails structural validation at the write boundary.
	ErrInvalidContract = errors.New("goal: invalid acceptance contract")

	// ErrCompositeDeterministicContract is returned when a composite goal carries
	// a deterministic acceptance item. A composite produces no executed output, so
	// the deterministic check has no event source and the fold would stall pending
	// forever. Put deterministic checks on a leaf child, or use judgment items.
	ErrCompositeDeterministicContract = errors.New("goal: composite contract cannot contain deterministic items")

	// ErrInvalidDecomposition is returned when a composite's DecompositionContent
	// fails validation (no required child, dangling edge key, etc.).
	ErrInvalidDecomposition = errors.New("goal: invalid decomposition")

	// ErrDepthExceeded is returned at materialize when parent.depth+1 > max_depth.
	ErrDepthExceeded = errors.New("goal: decomposition depth exceeded")

	// ErrCycle is returned by AddEdge / decomposition validation when an edge
	// would close a cycle in the sibling dependency graph.
	ErrCycle = errors.New("goal: dependency cycle")

	// ErrConcurrencyCap is returned (or used to skip a candidate) when a Claim
	// would exceed the per-root or per-user in-flight attempt budget.
	ErrConcurrencyCap = errors.New("goal: concurrency cap reached")

	// ErrStaleProjection is returned when a fold computed against a lower seq
	// than the row's current acceptance_seq is rejected (stale-projection fence).
	ErrStaleProjection = errors.New("goal: stale acceptance projection")

	// ErrInvalidEvidence is returned when submitted evidence violates a contract
	// rule — e.g. a non-root goal submits an empty handoff summary. The
	// worker turns this into a retryable protocol miss.
	ErrInvalidEvidence = errors.New("goal: invalid evidence")

	// ErrInvalidVerdict is returned when a human/agent verdict is malformed
	// (unknown result/authority, empty scope_hash where required).
	ErrInvalidVerdict = errors.New("goal: invalid verdict")
)

Typed sentinel errors. Callers branch on these via errors.Is, never on string Contains. APOSD "defined out of existence" governs acceptance: an unmet contract simply never produces a pass event, so the goal never reaches accepted — there is no "is it really done?" error here. These errors cover the guard/validation failures the rest of the package raises.

View Source
var ErrNoSandbox = errors.New("goal: no sandbox session for deterministic check")

ErrNoSandbox is returned when a deterministic check cannot resolve a sandbox session to run in. A check with no place to run is an infrastructure fault, not an unmet acceptance — it never silently reads as a pass.

Functions

func CacheKey

func CacheKey(item AcceptanceItem, env CheckEnv) string

CacheKey is the single constructor for a deterministic check's cache key (contract §4.1, invariant #9). It folds item id+command, sandbox image, repo tree hash, env hash, and the sorted upstream accepted-output hashes. If RepoTreeHash OR EnvHash is unavailable ("") the key is "" — a forced miss that is never written as a hit-eligible row. A false miss costs a re-run; a false hit ships broken work.

func ContentHash

func ContentHash(parts ...any) string

ContentHash is the canonical content-addressing hash used for AcceptedOutput.Hash and any artifact/verdict scope anchor. It hashes the JSON-canonical encoding of the parts so two structurally equal outputs hash identically. A single constructor keeps the cache key, the accepted-output hash, and the verdict scope_hash in agreement.

func HashOutput

func HashOutput(out AttemptOutput) string

HashOutput computes the content hash of a candidate output's salient fields (summary + structured result). Stable under map ordering because json.Marshal sorts map keys. Artifacts are hash-addressed separately on the evidence; HashWithArtifacts folds them in when an output's identity must include its externalized artifacts.

func HashWithArtifacts

func HashWithArtifacts(out AttemptOutput, arts []ArtifactRef) string

HashWithArtifacts extends an output hash with the sorted artifact hashes, so two outputs with identical prose but different artifacts hash distinctly.

func HumanVerdictEvent

HumanVerdictEvent builds the append params for a human verdict (authority=human). seq/id/created_at are filled by appendAcceptanceEvent; this carries the verdict-as-evidence quartet (rationale/scope/authority/reviewer) + scope_hash.

func IsTerminalLifecycle

func IsTerminalLifecycle(s string) bool

IsTerminalLifecycle reports whether the lifecycle admits no further scheduling. 'blocked' is recoverable and so is NOT terminal.

func RegisterGoalTickWorker added in v0.50.0

func RegisterGoalTickWorker(workers *river.Workers, d *Dispatcher, log *slog.Logger)

RegisterGoalTickWorker registers the convergence-tick worker into the shared workers bundle (River Phase 2b). Paired with GoalTickQueueConfig and the periodic registered by Service.StartDispatchTick.

func RegisterGoalWorker added in v0.50.0

func RegisterGoalWorker(workers *river.Workers, runner WorkerRunner, log *slog.Logger)

RegisterGoalWorker registers the goal-attempt worker into a shared workers bundle used to build the process-wide River client. maxWorkers bounds concurrent attempt executions per node (the durable successor to the old in-process worker-pool cap); the per-root/per-user caps enforced at claim still bound total in-flight attempts cluster-wide.

func ValidAcceptanceState

func ValidAcceptanceState(s string) bool

ValidAcceptanceState reports whether s is a known acceptance projection state.

func ValidAttemptStatus

func ValidAttemptStatus(s string) bool

ValidAttemptStatus reports whether s is a known attempt status.

func ValidAuthority

func ValidAuthority(s string) bool

ValidAuthority reports whether s is a known verdict authority.

func ValidBlockReason

func ValidBlockReason(s string) bool

ValidBlockReason reports whether s is a known block reason.

func ValidContractPolicy

func ValidContractPolicy(s string) bool

ValidContractPolicy reports whether s is a known acceptance-contract policy.

func ValidEdgeKind

func ValidEdgeKind(s string) bool

ValidEdgeKind reports whether s is a known edge kind.

func ValidEscalation

func ValidEscalation(s string) bool

ValidEscalation reports whether s is a known convergence escalation.

func ValidItemKind

func ValidItemKind(s string) bool

ValidItemKind reports whether s is a known contract item kind.

func ValidKind

func ValidKind(s string) bool

ValidKind reports whether s is a known goal kind.

func ValidLifecycle

func ValidLifecycle(s string) bool

ValidLifecycle reports whether s is a known goal lifecycle.

func ValidOnFailure

func ValidOnFailure(s string) bool

ValidOnFailure reports whether s is a known edge failure policy.

func ValidPriority

func ValidPriority(s string) bool

ValidPriority reports whether s is a known scheduling priority.

func ValidPurpose

func ValidPurpose(s string) bool

ValidPurpose reports whether s is a known attempt purpose.

func ValidResult

func ValidResult(s string) bool

ValidResult reports whether s is a known acceptance outcome.

func ValidReviewPolicy

func ValidReviewPolicy(s string) bool

ValidReviewPolicy reports whether s is a known decomposition review gate.

func ValidateDecomposition

func ValidateDecomposition(c DecompositionContent, parentDepth, maxDepth int) error

ValidateDecomposition checks a DecompositionContent's structural invariants (contract §6): ≥1 required child, at most maxDecompositionBreadth children, every child Key unique and non-empty, every edge key resolves to a child Key, no edge cycle (DFS over keys), known enum values, and a depth budget. A composite produces no executed output, so a composite child may not carry a deterministic acceptance item (its fold would stall forever) and must leave a level of depth under maxDepth for its own children — both are enforced here at the proposal boundary so a doomed plan is rejected before it consumes budget. Returns ErrInvalidDecomposition, ErrInvalidContract, ErrDepthExceeded, or ErrCycle.

parentDepth is the composite's own depth; a child sits at parentDepth+1.

Types

type AcceptanceContract

type AcceptanceContract struct {
	Policy string           `json:"policy"` // deterministic_then_judgment | all | any
	Items  []AcceptanceItem `json:"items"`
}

AcceptanceContract is the composite policy tree of deterministic + judgment items that gates a goal's acceptance (contract §3.1). An empty contract (no items) is the trivial auto-accept degradation — a "direct goal" with no bar. Marshaled to the acceptance_contract TEXT column.

func SplitCriteria

func SplitCriteria(crits []Criterion) AcceptanceContract

SplitCriteria turns natural-language criteria into a deterministic|judgment item set under the deterministic_then_judgment policy (contract §3.1). A criterion with a Command becomes a binding deterministic item; one without becomes a judgment item (agent rubric or human prompt). The model never pretends prose is executable — the caller (planner) decides the split by supplying or omitting Command.

func (AcceptanceContract) HasDeterministicItem added in v0.50.0

func (c AcceptanceContract) HasDeterministicItem() bool

HasDeterministicItem reports whether the contract carries any deterministic (command-checked) item. A deterministic item needs an executed output to run its check against; a composite produces no output, so its fold would stall pending forever (no event source). Callers reject deterministic items on composites — see ErrCompositeDeterministicContract.

func (AcceptanceContract) IsTrivial

func (c AcceptanceContract) IsTrivial() bool

IsTrivial reports whether the contract is the empty auto-accept degradation (no items). A trivial leaf accepts immediately; a trivial composite accepts when all required children are accepted.

func (AcceptanceContract) Valid

func (c AcceptanceContract) Valid() bool

Valid reports whether the contract's policy and every item are well-formed. An empty contract is valid (trivial auto-accept).

type AcceptanceEventDetail

type AcceptanceEventDetail struct {
	Stdout     string        `json:"stdout,omitempty"` // truncated to N KB
	Artifacts  []ArtifactRef `json:"artifacts,omitempty"`
	DurationMs int64         `json:"duration_ms,omitempty"`
	CacheHit   bool          `json:"cache_hit,omitempty"`
	Gaps       []Gap         `json:"gaps,omitempty"`
}

AcceptanceEventDetail is the truncated/hash-addressed payload on an acceptance event (contract §3.6). The verdict quartet (rationale/scope/authority/timestamp) + scope_hash are first-class columns, not this blob.

type AcceptanceItem

type AcceptanceItem struct {
	ID       string `json:"id"`
	Kind     string `json:"kind"` // deterministic | judgment
	Required bool   `json:"required"`
	// deterministic:
	Command    string `json:"command,omitempty"`
	ExpectExit *int   `json:"expect_exit,omitempty"` // default 0
	// judgment:
	Authority string `json:"authority,omitempty"` // agent | human
	Rubric    string `json:"rubric,omitempty"`    // agent reviewer prompt
	Prompt    string `json:"prompt,omitempty"`    // human verdict prompt
}

AcceptanceItem is one check in the contract. Deterministic items carry a Command; judgment items carry an Authority + Rubric/Prompt. A non-required item is advisory and does not gate acceptance.

func (AcceptanceItem) Valid

func (i AcceptanceItem) Valid() bool

Valid reports whether a single item is structurally sound: a deterministic item needs a command; a judgment item needs a known authority.

type AcceptedOutput

type AcceptedOutput struct {
	GoalID        string         `json:"goal_id"`
	Summary       string         `json:"summary"`
	Result        map[string]any `json:"result,omitempty"`
	Artifacts     []ArtifactRef  `json:"artifacts,omitempty"`
	Hash          string         `json:"hash"`        // feeds downstream cache_key (§4.1) + verdict scope_hash (§4.2)
	AcceptedAt    string         `json:"accepted_at"` // RFC3339 UTC
	SourceAttempt string         `json:"source_attempt_id"`
	// Children carries the frozen accepted output of each accepted child, set only
	// on a composite's rollup output. A composite produces no work of its own, so
	// this is how its deliverables travel with the parent: a reader of the root
	// goal gets the tree's results without walking children. Nested composites
	// compose (a child composite's Children is already filled when it accepted).
	// Empty for leaves. Bounded by fanout (max_concurrent) x depth (max_depth).
	Children []AcceptedOutput `json:"children,omitempty"`
}

AcceptedOutput is the frozen snapshot copied from the accepted attempt (contract §3.5), then immutable. This is what flows downstream and what a verdict's scope_hash anchors to.

type Actor

type Actor struct {
	Type string // one of ActorSystem | ActorUser | ActorAgent | ActorWorker | ActorReviewer | ActorPlanner
	ID   string // user_id / agent_id / worker_id depending on Type; empty for system
}

Actor describes who initiated a transition. Ported from the old package; the goal ledger records verdict authority as a first-class column, but the service still carries an Actor for audit/attribution on non-acceptance transitions.

func SystemActor

func SystemActor() Actor

SystemActor is the dispatcher/system originator convenience.

func UserActor

func UserActor(userID string) Actor

UserActor attributes a transition to a human.

type ArtifactRef

type ArtifactRef struct {
	Hash  string `json:"hash"`
	Kind  string `json:"kind"`
	URI   string `json:"uri,omitempty"`
	Bytes int64  `json:"bytes"`
}

ArtifactRef is a hash-addressed externalized artifact (stdout/diffs/files).

type AttemptEnqueuer added in v0.50.0

type AttemptEnqueuer func(ctx context.Context, tx pgx.Tx, goalID, attemptID string) error

AttemptEnqueuer enqueues the durable execution job for a freshly claimed attempt within the claim transaction tx, so the claim and its job are atomic (River Phase 2c). GoalService.Claim calls it after minting the attempt; returning an error rolls the whole claim back. A nil AttemptEnqueuer skips enqueue (tests that mint+claim directly without River).

type AttemptEvidence

type AttemptEvidence struct {
	Summary   string         `json:"summary"`
	Artifacts []ArtifactRef  `json:"artifacts,omitempty"`
	Notes     map[string]any `json:"notes,omitempty"`
}

AttemptEvidence is the submitted handoff (contract §3.4). A non-root goal submitting an empty Summary is a retryable protocol miss.

type AttemptInput

type AttemptInput struct {
	Intent          string             `json:"intent"`
	UpstreamOutputs []AcceptedOutput   `json:"upstream_outputs"`           // ONLY accepted upstream outputs
	PriorGaps       *Evaluation        `json:"prior_gaps,omitempty"`       // Evaluation[i-1].gaps; nil on attempt 1
	Contract        AcceptanceContract `json:"contract"`                   // the bar the attempt must clear
	ResolvedVerdict string             `json:"resolved_verdict,omitempty"` // a human answer that unblocked needs_verdict
	AttemptNo       int                `json:"attempt_no"`
}

AttemptInput is the frozen-at-mint context an attempt executes against (contract §3.3). Marshaled to the attempt's input_context column. Frozen means an in-flight edit to intent/contract never mutates a running attempt.

type AttemptOutput

type AttemptOutput struct {
	Result  map[string]any `json:"result,omitempty"`
	Summary string         `json:"summary"`
	Hash    string         `json:"hash"`
}

AttemptOutput is the candidate the contract evaluates (contract §3.4). Hash becomes accepted_output.Hash on acceptance.

type Blocker

type Blocker struct {
	Kind     string `json:"kind"`
	Question string `json:"question"`
	Detail   any    `json:"detail,omitempty"`
}

Blocker carries a block action's payload. The goal model resolves blocks through the service (dep / needs_verdict / budget_exhausted), so an agent-declared block surfaces to the worker as a non-retryable failure whose reason embeds this payload; the structured form is kept here for the worker and for callers that introspect a recorded result.

type BootConfig

type BootConfig struct {
	DB       *pgxpool.Pool
	Services agent.ServiceManager // registry-backed session minting
	Chat     TaskChatFunc         // runs persisted worker turns; nil => noop executor
	// MaxWorkers caps concurrent attempt executions per node on the goal River
	// queue (0 => defaultGoalMaxWorkers). TickEvery/LeaseTTL override the
	// dispatcher/service defaults; zero values use them.
	MaxWorkers int
	TickEvery  time.Duration
	LeaseTTL   time.Duration
	Logger     *slog.Logger
}

BootConfig is the minimal wiring needed at server start. It mirrors the old tasks.BootConfig: a DB handle, the agent ServiceManager for session minting, the Chat callback for worker turns, and the dispatcher tunables.

type CheckEnv

type CheckEnv struct {
	GoalID         string
	SandboxImage   string
	RepoTreeHash   string   // "" ⇒ forced miss
	EnvHash        string   // "" ⇒ forced miss
	UpstreamHashes []string // accepted-output hashes of upstream edges
}

CheckEnv carries the provenance inputs the cache key folds (contract §4.1). RepoTreeHash/EnvHash may be "" when the sandbox cannot guarantee a stable hash; that forces a cache miss rather than risking a false hit.

type CheckResult

type CheckResult struct {
	ItemID   string
	ExitCode int
	Pass     bool
	Stdout   string
	CacheKey string
	CacheHit bool
}

CheckResult is the deterministic-check outcome the runner returns; the service folds it into an acceptance_event (contract §4.1). It never writes lifecycle.

type CheckRunner

type CheckRunner interface {
	Run(ctx context.Context, item AcceptanceItem, env CheckEnv) (CheckResult, error)
}

CheckRunner runs ONE deterministic acceptance item in the sandbox and returns a CheckResult the service folds into an acceptance_event. It is the only sandbox-IO in acceptance; it NEVER writes lifecycle. It runs inside the worker, after the executor submits, before the durable transition (sandbox exec must never run inside a DB tx, which would pin a pooled connection). The implementation lands in the integration phase (runner.go).

func NewCheckRunner

func NewCheckRunner(resolve SessionResolver, q *sqlc.Queries, stdoutLimit int) CheckRunner

NewCheckRunner builds the sandbox-backed CheckRunner. q is used only for the read-only cache probe; stdoutLimit ≤ 0 falls back to the package default. The returned value satisfies the CheckRunner interface declared in service.go.

type Config

type Config struct {
	// LeaseTTL bounds an attempt's lease before the dispatcher may reap it.
	LeaseTTL time.Duration
	// MaxConcurrentPerUser caps in-flight attempts per user (§5, default 16).
	MaxConcurrentPerUser int
	// StdoutLimit truncates captured stdout before it touches event detail.
	StdoutLimit int
}

Config carries the service's tunables. Zero values fall back to package defaults at use sites.

type ConvergencePolicy

type ConvergencePolicy struct {
	MaxAttempts int    `json:"max_attempts"`        // default 3; exhaustion → blocked(budget_exhausted)
	Escalation  string `json:"escalation"`          // block (default) | abandon
	MaxDepth    int    `json:"max_depth,omitempty"` // recursion ceiling; default 4
	// MaxConcurrent bounds breadth-of-fanout per root (§5, default 8). Read from
	// the root's policy by the dispatcher; 0 ⇒ default.
	MaxConcurrent int `json:"max_concurrent,omitempty"`
}

ConvergencePolicy bounds the rework loop and the recursion depth (§3.2).

func (ConvergencePolicy) Normalized

func (p ConvergencePolicy) Normalized() ConvergencePolicy

Normalized returns the policy with defaults applied (MaxAttempts 3, Escalation block, MaxDepth 4, MaxConcurrent 8). It never mutates the receiver.

func (ConvergencePolicy) Valid

func (p ConvergencePolicy) Valid() bool

Valid reports whether the convergence policy is well-formed. A zero-value policy is valid; Normalized fills the defaults.

type CreateInput

type CreateInput struct {
	UserID    string
	AgentID   string
	ProjectID string
	ParentID  string // "" ⇒ root
	RootID    string // = id for a root; parent.root_id for a child
	Depth     int64
	Position  int64
	SessionID string
	Title     string
	Intent    string
	Kind      string // leaf | composite
	Priority  string // "" ⇒ routine
	Required  bool

	Contract     AcceptanceContract
	Convergence  ConvergencePolicy
	ReviewPolicy string // "" ⇒ none

	Context      json.RawMessage // empty ⇒ "{}"
	DispatchHint json.RawMessage // empty ⇒ "{}"
}

CreateInput is the request to mint a root or child goal. The caller pre-mints session_id OUTSIDE any tx and passes it here.

type Criterion

type Criterion struct {
	ID   string // stable item id
	Text string // the NL criterion
	// Command, when non-empty, marks the criterion operationalizable: it becomes
	// a deterministic item bound to this command. Empty ⇒ judgment.
	Command string
	// Authority routes a judgment criterion (agent | human). Defaults to agent
	// when a judgment criterion leaves it blank.
	Authority string
	// Required marks whether the criterion gates acceptance (default true).
	Required bool
}

Criterion is one natural-language acceptance criterion handed to the splitter. Operationalizable carries a machine-checkable command; everything else is routed to a judgment item.

type DecompositionContent

type DecompositionContent struct {
	Children []ProposedChild `json:"children"`
	Edges    []ProposedEdge  `json:"edges"`
}

DecompositionContent is a composite's proposed children + edges (contract §3.7). Stored inline on agent_goal.plan (jsonb). The materializer keys idempotency on (goal_id, child.Key).

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher drives the convergence loop on a tick: reap stale attempts, propagate dep failures, roll up composites, then scan-and-claim dispatchable leaves under the concurrency caps and enqueue a durable River job per claim (contract §5, §7; River Phase 2a). It is the only scheduler; every durable write still routes through GoalService, and every attempt now executes as a River job rather than an in-process goroutine.

func NewDispatcher

func NewDispatcher(cfg DispatcherConfig) *Dispatcher

NewDispatcher constructs a dispatcher, filling defaults for zero-valued config fields.

func (*Dispatcher) SetEnqueuer added in v0.50.0

func (d *Dispatcher) SetEnqueuer(e goalEnqueuer)

SetEnqueuer injects the River enqueuer the dispatcher uses to dispatch claimed attempts. Called by the composition root after the shared client is built and before the tick starts; a nil enqueuer leaves dispatch disabled.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop signals the tick to go quiet. In-flight attempts now run as River jobs, and the tick itself runs as a River job; draining both is the shared River client's responsibility, not the dispatcher's.

func (*Dispatcher) Tick

func (d *Dispatcher) Tick(ctx context.Context)

Tick runs one pass of the dispatch loop. Public so tests can drive it deterministically. The order is fixed (§5/§7): reap → propagate → rollup → scan-and-decompose → scan-and-claim.

func (*Dispatcher) TickInterval added in v0.50.0

func (d *Dispatcher) TickInterval() time.Duration

TickInterval is the convergence-tick period. The composition root reads it to register the single-leader River periodic tick (River Phase 2b).

type DispatcherConfig

type DispatcherConfig struct {
	Service *GoalService
	Queries *sqlc.Queries
	// Enqueuer enqueues one durable River job per claimed attempt (River Phase
	// 2a). A nil enqueuer disables dispatch (tests that drive Worker.Run directly).
	Enqueuer goalEnqueuer

	TickEvery  time.Duration // 0 ⇒ defaultTickEvery
	LeaseTTL   time.Duration // 0 ⇒ service LeaseTTL
	BatchLimit int           // 0 ⇒ defaultBatchLimit
	// MaxConcurrentPerUser caps in-flight attempts per user (§5/§10.8, default
	// 16). 0 ⇒ the service's configured per-user cap.
	MaxConcurrentPerUser int
	Logger               *slog.Logger
}

DispatcherConfig wires a Dispatcher. Zero-valued fields fall back to the package defaults below.

type Evaluation

type Evaluation struct {
	Gaps []Gap `json:"gaps"`
}

Evaluation carries the shortfalls from one acceptance fold; it feeds the next attempt's input as prior_gaps.

type Executor

type Executor interface {
	Execute(ctx context.Context, req ExecutorRequest) (ExecutorResult, error)
}

Executor owns agent interaction for one attempt and is PURE with respect to durable state: Execute returns the action it wants; the WORKER applies the single transition through GoalService. The agent never mutates lifecycle/acceptance — the contract's non-negotiable. The concrete Request / Result types are declared by the integration phase (executor.go); they are referenced here only behind this interface so the core stays decoupled.

type ExecutorRequest

type ExecutorRequest struct {
	Goal    sqlc.AgentGoal
	Attempt sqlc.AgentGoalAttempt
	Input   AttemptInput
}

ExecutorRequest / ExecutorResult are forward-declared handles the integration phase fleshes out (executor.go). They are kept opaque here so service.go does not depend on the agent layer; the worker is the only caller that constructs and consumes them.

type ExecutorResult

type ExecutorResult struct {
	Submitted     bool
	Evidence      AttemptEvidence
	Output        AttemptOutput
	Decomposition *DecompositionContent // purpose=decomposition only
	Failed        bool
	FailReason    string
	Retryable     bool
}

ExecutorResult is the executor's declared outcome for one attempt. Exactly one of the submit/fail paths is meaningful; the worker maps it to a single service transition. Decomposition attempts carry produced plan content.

type Failure

type Failure struct {
	Reason    string `json:"reason"`
	Retryable bool   `json:"retryable"`
}

Failure carries a fail action's payload.

type Gap

type Gap struct {
	ItemID string `json:"item_id"`
	Reason string `json:"reason"`
	Detail string `json:"detail,omitempty"`
}

Gap is one unmet item with a reason fed into attempt_no+1.

type GoalFilter

type GoalFilter struct {
	AgentID   string
	Lifecycle string
	ProjectID string
	Terminal  *bool
	Q         string
	Archived  bool
}

GoalFilter narrows a root-goal list. The zero value lists active (non-archived) roots across all agents; populated fields AND together. Terminal is tri-state: nil = both, false = active only, true = history (terminal) only.

type GoalService

type GoalService struct {
	// contains filtered or unexported fields
}

GoalService is the ONLY writer of agent_goal_* rows. Every durable change is one withTx: load → guard from→to → side-effects → counter bumps → append acceptance_event → commit. Callers branch on the typed errors in types.go, never on string Contains. Sessions/sandboxes are minted OUTSIDE the tx.

func New

func New(db *pgxpool.Pool, q *sqlc.Queries, opts ...Option) *GoalService

New builds a GoalService. The Queries is used for non-transactional reads; mutating methods open their own txns via withTx. Collaborators that are nil on a given path surface as clear errors rather than panics.

func (*GoalService) Abandon

func (s *GoalService) Abandon(ctx context.Context, id, reason string, by Actor) error

Abandon is the human give-up on a blocked(budget_exhausted): blocked→abandoned (terminal), bumping parent required_failed (contract §2.1).

func (*GoalService) Activate

func (s *GoalService) Activate(ctx context.Context, id string) (sqlc.AgentGoal, error)

Activate is the plan gate: draft→ready (contract §2.1). A leaf passes when its contract has items or is explicitly trivial; a composite passes when it is planned (planned_at set) and required_total ≥ 1. A composite flips its draft children → ready. Returns ErrPlanGate when unmet.

func (*GoalService) AddEdge

func (s *GoalService) AddEdge(ctx context.Context, downstreamID, upstreamID, kind, onFailure string) (sqlc.AgentGoalEdge, error)

AddEdge inserts an accepted-output dependency between siblings with a DFS cycle check (contract §1.3, §6). Returns ErrCycle when the edge would close a cycle.

func (*GoalService) ApprovePlan added in v0.50.0

func (s *GoalService) ApprovePlan(ctx context.Context, goalID string, by Actor) error

ApprovePlan applies a human approval of a composite's proposed plan: it materializes the children and releases them, moving the composite out of blocked(needs_plan_approval) back to active for rollup (contract §2.3). Child sessions are pre-minted OUTSIDE the tx (mirrors the none path in SubmitDecomposition). Only a composite blocked on plan approval is accepted.

func (*GoalService) Archive

func (s *GoalService) Archive(ctx context.Context, id string) error

Archive soft-flags a terminal goal out of default lists.

func (*GoalService) BeginAutoDecomposition added in v0.50.0

func (s *GoalService) BeginAutoDecomposition(ctx context.Context, id string, enqueue AttemptEnqueuer) (sqlc.AgentGoalAttempt, error)

BeginAutoDecomposition mints a headless decomposition attempt for a composite and enqueues its durable River job in ONE tx (mirrors Claim): on success both commit; on enqueue failure the whole thing rolls back, leaving the composite draft to be re-picked next tick — no orphaned active composite to recover. Unlike interactive BeginDecomposition the attempt carries a real claim-grace lease and is heartbeated by the River worker, so the reaper recovers it if the node crashes mid-plan. The dispatcher (scanAndDecompose) is the only caller.

The eligibility guards are re-checked INSIDE the tx after LockGoalForWrite: the dispatcher's scan is a snapshot, so a concurrent Activate or a prior decomposition could have moved the goal between the scan and here. Re-checking under the row lock makes auto-decomposition never clobber an already-running or already-planned composite.

func (*GoalService) BeginDecomposition

func (s *GoalService) BeginDecomposition(ctx context.Context, id string) (sqlc.AgentGoalAttempt, error)

BeginDecomposition starts a composite's decomposition: draft→active, minting a purpose=decomposition attempt (contract §2.1). Guards kind=composite and not yet planned. The planning session is pre-minted outside the tx.

func (*GoalService) Block

func (s *GoalService) Block(ctx context.Context, id, reason string, by Actor) error

Block moves ready/active → blocked with a reason (contract §2.1). Used by the dispatcher (dep), the fold (needs_verdict), and convergence (budget_exhausted).

func (*GoalService) Cancel

func (s *GoalService) Cancel(ctx context.Context, id, reason string, by Actor) error

Cancel cascades cancel over the subtree (contract §6): depth-first set cancelled on each non-terminal descendant, reconcile each touched parent's required_failed, cancel in-flight attempts, stamp cancelled_at. One tx.

func (*GoalService) Claim

func (s *GoalService) Claim(ctx context.Context, id, workerID string, enqueue AttemptEnqueuer) (sqlc.AgentGoalAttempt, error)

Claim is the dispatcher's leaf ready→active (contract §2.1, §5 step 1). It resolves the dispatch hint's executor, mints a queued execution attempt and claims the goal in one tx. The per-root/per-user concurrency caps are enforced by the dispatcher BEFORE Claim (§5 step 2); Claim itself only guards the single-writer invariants.

enqueue (River Phase 2c) inserts the attempt's durable execution job in the SAME tx, so the claim and its job commit atomically — a crash can no longer leave a claimed attempt with no job to run it. A nil enqueue skips this (tests minting+claiming without River); its failure rolls the claim back, leaving the goal ready for the next tick.

func (*GoalService) CreateGoal

func (s *GoalService) CreateGoal(ctx context.Context, in CreateInput) (sqlc.AgentGoal, error)

CreateGoal inserts a goal in 'draft' (contract §2.1, (none)→draft).

func (*GoalService) CreateRoot

func (s *GoalService) CreateRoot(ctx context.Context, in CreateInput) (sqlc.AgentGoal, error)

CreateRoot mints a worker session for a new root goal (a goal) and creates it in 'draft'. The session is minted OUTSIDE the insert tx to avoid a self-deadlock and to keep slow session-minting off a pooled connection (same discipline as Materialize's pre-minted child sessions). Child goals get their sessions from Materialize instead, so this entry is root-only.

func (*GoalService) FailAttempt

func (s *GoalService) FailAttempt(ctx context.Context, attemptID, reason string) error

FailAttempt finalizes a worker-reported failed attempt AND applies the single convergence transition, so the goal never strands 'active' with no live attempt (issue #543). It mirrors ReapAttempt but is the executor-failure entry (agent reported fail / produced no action / empty handoff / panic), so the attempt is recorded 'failed' rather than 'interrupted'. One tx.

  • execution attempt → branchOnFailure: budget left reopens to ready (rework = next attempt, the failure reason rides as a gap); budget out blocks/abandons/ rejects per policy. A failed attempt consumes one budget unit (same as a fold failure), so a persistently failing agent parks at blocked, never loops.
  • decomposition attempt → release the composite to draft so a new BeginDecomposition can re-mint (mirror ReapAttempt's decomposition branch).

A 0-row FinalizeAttempt means the attempt is no longer queued/running (already reaped/raced); the tx rolls back and the caller treats ErrInvalidTransition as a no-op (the goal already recovered).

func (*GoalService) Materialize

func (s *GoalService) Materialize(ctx context.Context, qtx *sqlc.Queries, parent sqlc.AgentGoal, content DecompositionContent, childSessions map[string]string) error

Materialize creates a composite's children + edges from its proposed plan, in ONE caller-supplied tx (contract §6). It operates on the passed tx-bound querier — no import cycle, no own tx, no session minting (sessions are pre-minted outside the tx and resolved per child by the caller through the childSessions map). It is idempotent: the planned_at fence (MarkGoalPlanned CAS) short-circuits a second call, and each child's deterministic id makes re-insert a no-op.

Steps:

  • depth guard: parent.depth+1 ≤ max_depth (root's convergence_policy), else ErrDepthExceeded.
  • validate content (≥1 required child, edge keys resolve, no cycle), else ErrInvalidDecomposition / ErrCycle.
  • for each ProposedChild: insert (skip-if-exists) a child goal (parent_id=composite, root_id=composite.root_id, depth=parent.depth+1, position=index, contract/policy from the proposal, lifecycle draft).
  • for each ProposedEdge: insert agent_goal_edge (resolve keys→ids); PK collision = no-op.
  • set parent required_total (count of required children); CAS planned_at.

Replan reconcile (cancelling/detaching children dropped by a later plan) is intentionally out of scope: a composite is decomposed exactly once (planned_at gates re-decomposition). Re-enabling replan is a tracked follow-up (contract §10).

The caller MUST hold LockGoalForWrite on parent so the planned_at CAS and child inserts cannot race a concurrent materialize.

childSessions maps a child Key → a pre-minted session_id. A missing entry is a programming error the caller (converge.go) prevents by minting all sessions before opening the tx.

func (*GoalService) ReapAttempt

func (s *GoalService) ReapAttempt(ctx context.Context, attemptID string) error

ReapAttempt finalizes a stale (lease-expired) attempt as 'interrupted' and returns its goal to ready within the convergence budget, or budget-blocks it if exhausted — one tx (contract §2.2 running→interrupted). Idempotent: an attempt no longer queued/running is a no-op (FinalizeAttempt affects 0 rows), surfacing as ErrInvalidTransition the dispatcher ignores.

func (*GoalService) Reattempt

func (s *GoalService) Reattempt(ctx context.Context, id string, by Actor) error

Reattempt raises the budget on a blocked(budget_exhausted) goal so the next tick mints a fresh attempt. A leaf returns to ready (the dispatcher claims it); a composite, whose budget meters DECOMPOSITION attempts, returns to draft so scanAndDecompose re-plans it (contract §2.1, see recoveryLifecycle).

func (*GoalService) RecoverBlockedComposite added in v0.50.0

func (s *GoalService) RecoverBlockedComposite(ctx context.Context, id string) error

RecoverBlockedComposite re-evaluates a composite the rollup parked in blocked(dep) and wakes it back to active once its blocking children have cleared (contract §6). The rollup scans (ListRollupCandidates/ ListStalledComposites) only see active composites, so a parent driven to blocked(dep) — e.g. while a required child awaits plan approval or a verdict — never re-enters rollup on its own even after that child recovers. The dispatcher runs this over ListBlockedDepComposites each tick. Idempotent: a no-op while any required child is still blocked.

func (*GoalService) RejectPlan added in v0.50.0

func (s *GoalService) RejectPlan(ctx context.Context, goalID, reason string, by Actor) error

RejectPlan applies a human rejection of a composite's proposed plan: it clears the plan and returns the composite to draft so the dispatcher re-decomposes it (contract §2.3). Only a composite blocked on plan approval AND not yet materialized (planned_at IS NULL) is accepted — replan after materialize is not supported (childID would collide; see materializer.go).

func (*GoalService) RollupAccept

func (s *GoalService) RollupAccept(ctx context.Context, id string) error

RollupAccept runs a composite parent's own Accept gate once all required children are accepted (contract §6, RollupComposite ⇒ accept_parent). A trivial-contract composite accepts immediately; an authored contract folds via applyAcceptance. One tx; bumps this parent's own parent counter.

func (*GoalService) RollupFail

func (s *GoalService) RollupFail(ctx context.Context, id string) error

RollupFail moves a composite to rejected_final because a required child reached a terminal-bad state (contract §6, RollupComposite ⇒ fail), bumping this parent's own parent required_failed counter. One tx.

func (*GoalService) SetClock

func (s *GoalService) SetClock(c func() time.Time)

SetClock overrides the clock for tests.

func (*GoalService) Submit

func (s *GoalService) Submit(ctx context.Context, attemptID string, ev AttemptEvidence, out AttemptOutput) error

Submit advances an attempt running→submitted, writes its evidence+output, and folds acceptance (contract §2.2, §5 steps 5-7). It is the worker's single durable call after running the executor (and, for deterministic items, the CheckRunner that appended its check events). An empty summary on a non-root goal is a retryable protocol miss (ErrInvalidEvidence). The fold runs in the SAME tx so submit→evaluate→transition is atomic.

func (*GoalService) SubmitDecomposition added in v0.50.0

func (s *GoalService) SubmitDecomposition(ctx context.Context, attemptID string, ev AttemptEvidence, content DecompositionContent) error

SubmitDecomposition applies a successful autonomous decomposition attempt: it records the produced plan on the composite goal, then branches on review policy (contract §2.3, §6). For review_policy=none it materializes the children and releases them so the tree runs; for review_policy=human it parks the composite blocked(needs_plan_approval) so a human can ApprovePlan/RejectPlan. It is the decomposition analogue of Submit and the single durable transition the worker applies for a purpose=decomposition attempt. For the none path child sessions are pre-minted OUTSIDE the tx (their own minting tx would self-deadlock against the held one); everything else runs in ONE tx so a crash never leaves a half-planned composite.

func (*GoalService) SubmitVerdict

func (s *GoalService) SubmitVerdict(ctx context.Context, in VerdictInput) error

SubmitVerdict appends a human verdict event and re-runs applyAcceptance (contract §2.1 blocked(needs_verdict)→active/accepted). The verdict is evidence, never a state write.

func (*GoalService) Unarchive

func (s *GoalService) Unarchive(ctx context.Context, id string) error

Unarchive clears the archived flag.

func (*GoalService) Unblock

func (s *GoalService) Unblock(ctx context.Context, id string, by Actor) error

Unblock clears a recoverable block: blocked(dep)→ready (leaf) or →active/draft (composite, see recoveryLifecycle) when the condition cleared (contract §2.1).

func (*GoalService) UpdateMetadata

func (s *GoalService) UpdateMetadata(ctx context.Context, id string, in UpdateInput) (sqlc.AgentGoal, error)

UpdateMetadata applies a partial metadata edit (contract §2 — metadata is mutable; lifecycle is not). It validates the value-sets it touches and returns the refreshed row. Lifecycle, counters, and projection are untouched.

func (*GoalService) WaiveEdge

func (s *GoalService) WaiveEdge(ctx context.Context, downstreamID, upstreamID, reason string, by Actor) error

WaiveEdge waives a hard edge so a blocked(dep) downstream can proceed (contract §2.1, blocked(dep)→ready).

type HumanVerdict

type HumanVerdict struct {
	GoalID         string
	ItemID         string
	Pass           bool
	Rationale      string
	Scope          string
	ScopeHash      string // the accepted-output/artifact hash the verdict covers
	ReviewerUserID string
	AttemptID      string // the evaluated attempt whose output the verdict judges
}

HumanVerdict is the API-delivered human decision for an authority=human judgment item (contract §4.2). The service validates and folds it into an acceptance_event; it never sets acceptance_state directly.

func (HumanVerdict) Valid

func (v HumanVerdict) Valid() bool

Valid reports whether a human verdict is well-formed. A verdict must name the item it answers and the reviewer who authored it; an empty scope_hash is allowed (it forces re-request on any subsequent output, the conservative default) but the identity fields are mandatory.

type Option

type Option func(*GoalService)

Option configures a GoalService at construction.

func WithCheckRunner

func WithCheckRunner(r CheckRunner) Option

WithCheckRunner sets the deterministic check runner.

func WithConfig

func WithConfig(c Config) Option

WithConfig overrides the service config (defaults filled for zero fields).

func WithExecutor

func WithExecutor(e Executor) Option

WithExecutor sets the attempt executor.

func WithPlanningSessionMinter

func WithPlanningSessionMinter(m SessionMinter) Option

WithPlanningSessionMinter sets the decomposition planning-session minter.

func WithSessionMinter

func WithSessionMinter(m SessionMinter) Option

WithSessionMinter sets the worker-session minter.

type Projection

type Projection struct {
	State        string   // pending | passed | failed
	PendingItems []string // required items with no terminal (or only stale) event
	Gaps         Evaluation
	NeedsVerdict bool // a required human-authority item is pending/stale
}

Projection is the derived acceptance view of a goal — the result of folding its acceptance ledger against its contract (contract §4.3). The service maps a Projection to exactly one lifecycle transition; nothing else produces a Projection or writes acceptance_state.

func DeriveAcceptance

func DeriveAcceptance(c AcceptanceContract, currentOutputHash string, events []sqlc.AgentGoalAcceptanceEvent) Projection

DeriveAcceptance is the PURE fold that turns events → projection (contract §4.3). It is the single place acceptance is decided. It takes the latest VALID event per item — a judgment event is valid only if its scope_hash matches currentOutputHash (§4.2 staleness); a deterministic event is taken as authoritative for its item. Policy evaluation then collapses item outcomes to a state. No DB calls, no clock, no side effects.

type ProposedChild

type ProposedChild struct {
	Key                string             `json:"key"`
	Title              string             `json:"title"`
	Intent             string             `json:"intent"`
	Kind               string             `json:"kind"` // leaf | composite
	Required           bool               `json:"required"`
	AcceptanceContract AcceptanceContract `json:"acceptance_contract"`
	ConvergencePolicy  ConvergencePolicy  `json:"convergence_policy"`
	ReviewPolicy       string             `json:"review_policy,omitempty"` // a composite child carries it
}

ProposedChild is one child goal a plan proposes. Key is the stable id within the plan and the materialize idempotency key (replaces the old plan_item_id).

type ProposedEdge

type ProposedEdge struct {
	DownstreamKey string `json:"downstream_key"`
	UpstreamKey   string `json:"upstream_key"`
	Kind          string `json:"kind"`       // hard | soft
	OnFailure     string `json:"on_failure"` // block | fail | ignore
}

ProposedEdge is one sibling dependency a plan proposes, by child Key.

type Readiness

type Readiness struct {
	State        string
	Dispatchable bool
	Reasons      []Reason
}

Readiness is the computed dispatchability view of a goal at a moment. Pure function over (goal row, edges with upstream state pre-joined, now).

func Compute

Compute returns the readiness of a goal given its upstream edges (with upstream lifecycle pre-joined) at time now. PURE: no DB calls, no side effects, no clock reads beyond now. An edge means an accepted-output dependency — a hard edge is satisfied only when the upstream is 'accepted' (or the edge is waived); a soft edge is advisory and never blocks.

now is reserved for future deferral gates (not_before equivalents); it is accepted now so the signature is stable when those land.

type Reason

type Reason struct {
	Type       string // upstream_not_accepted, upstream_failed_block, soft_upstream_pending, ...
	UpstreamID string
	Detail     string
}

Reason explains a readiness state, e.g. an unsatisfied upstream edge.

type Result

type Result struct {
	Action        TerminalAction
	Evidence      AttemptEvidence
	Output        AttemptOutput
	Decomposition *DecompositionContent // purpose=decomposition only
	Blocker       *Blocker
	Failure       *Failure
	// RepairAttempted is set when a text-only first turn triggered one bounded
	// repair turn that still produced no terminal action. It only carries meaning
	// for terminalNone and lets the worker distinguish a silent miss from a
	// failed repair.
	RepairAttempted bool
}

Result is the executor's rich internal outcome for one attempt: exactly one terminal action (or terminalNone when the agent ended without declaring one). Execute folds this down to the frozen ExecutorResult the worker consumes.

type RollupVerdict

type RollupVerdict string

RollupVerdict is the decision RollupComposite returns over a composite parent's incremental rollup counters. The service acts on exactly one.

const (
	// RollupWait — required children still in flight; no transition (no-op).
	RollupWait RollupVerdict = "wait"
	// RollupAcceptParent — all required children accepted; run the parent's own
	// Accept gate (its contract; trivial ⇒ immediate accept).
	RollupAcceptParent RollupVerdict = "accept_parent"
	// RollupBlock — a required child is blocked; the parent surfaces the stall.
	RollupBlock RollupVerdict = "block"
	// RollupFail — a required child reached a terminal-bad state
	// (rejected_final/abandoned/cancelled); the requirement is permanently unmet.
	RollupFail RollupVerdict = "fail"
)

func RollupComposite

func RollupComposite(parent sqlc.AgentGoal) RollupVerdict

RollupComposite is the PURE decision over a composite parent's incremental required_* counters (contract §6). It never scans the subtree — the counters are bumped in the same tx that transitions each child, so the parent reads O(1) state. Precedence mirrors the goal-rollup table: a failed required child fails the parent; else a blocked one blocks it; else all-accepted accepts it; else wait.

A parent with no required children yet (required_total == 0) waits: the materializer may still be inserting children, and a vacuous "all required accepted" must never auto-accept a composite that has nothing to roll up.

type Service

type Service struct {
	Queries    *sqlc.Queries
	Goal       *GoalService
	Dispatcher *Dispatcher
	// contains filtered or unexported fields
}

Service is the boot-level bundle the server + CLI bind to. It owns the GoalService (the single durable-state writer) and the dispatcher, and exposes the read+command surface the HTTP handlers call. Every mutating method delegates to GoalService so the "lifecycle is written only through a transition" invariant holds; reads go straight to the querier.

The goal subsystem does NOT own a River client: it contributes its queue + worker to the single process-wide client (RegisterRiverWorker / GoalQueueConfig) and receives that client back as its dispatcher's enqueuer (SetRiverClient). The client's Start/Stop lifecycle belongs to the composition root.

func Boot

func Boot(cfg BootConfig) (*Service, error)

Boot constructs the goal system and returns the bound bundle. The dispatcher is built but not ticking; the composition root injects the shared client via SetRiverClient and the server registers the single-leader tick via StartDispatchTick.

(Named Boot, not New: the package's GoalService constructor already owns New(db, q, …). This is the bundle/wiring entry the server binds to.)

Wiring: the worker executor (agent-backed when Chat is non-nil, else a noop that fails non-retryably) plus the worker + planning session minters are registered on the GoalService; one Worker drives claimed attempts and the Dispatcher schedules the convergence loop over it.

func (*Service) Abandon

func (s *Service) Abandon(ctx context.Context, id, reason string, by Actor) error

Abandon is the human give-up on a budget-exhausted block.

func (*Service) Activate

func (s *Service) Activate(ctx context.Context, id string) (sqlc.AgentGoal, error)

Activate runs the plan gate: draft→ready.

func (*Service) AddEdge

func (s *Service) AddEdge(ctx context.Context, downstreamID, upstreamID, kind, onFailure string) (sqlc.AgentGoalEdge, error)

AddEdge inserts an accepted-output dependency between siblings (cycle-checked).

func (*Service) ApprovePlan added in v0.50.0

func (s *Service) ApprovePlan(ctx context.Context, goalID string, by Actor) error

ApprovePlan approves a composite's proposed plan (blocked(needs_plan_approval)), materializing its children and resuming the tree.

func (*Service) Archive

func (s *Service) Archive(ctx context.Context, id string) error

Archive soft-flags a terminal goal out of default lists.

func (*Service) Cancel

func (s *Service) Cancel(ctx context.Context, id, reason string, by Actor) error

Cancel cascades a cancel over the subtree.

func (*Service) CountGoals

func (s *Service) CountGoals(ctx context.Context, userID string, filter GoalFilter) (int64, error)

CountGoals returns the total root goals matching the same filter as ListGoals — it drives the list's exact `total` and the active/history/archived header badges (three counts varying only terminal/archived).

func (*Service) CreateGoal

func (s *Service) CreateGoal(ctx context.Context, in CreateInput) (sqlc.AgentGoal, error)

CreateGoal mints a root goal (a goal) in 'draft', minting its worker session first. Children are created by Materialize, not this entry.

func (*Service) GetAttempt

func (s *Service) GetAttempt(ctx context.Context, attemptID string) (sqlc.AgentGoalAttempt, error)

GetAttempt returns one attempt by id.

func (*Service) GetGoal

func (s *Service) GetGoal(ctx context.Context, id string) (sqlc.AgentGoal, error)

GetGoal returns one goal, mapping a missing row to ErrNotFound.

func (*Service) GetReadiness

func (s *Service) GetReadiness(ctx context.Context, id string) (Readiness, error)

GetReadiness loads a goal + its upstream edges (with upstream lifecycle pre-joined) and returns the computed dispatchability view.

func (*Service) GoalQueueConfig added in v0.50.0

func (s *Service) GoalQueueConfig() (string, river.QueueConfig)

GoalQueueConfig returns the goal attempt queue name and per-node worker config for the composition root assembling the shared working client.

func (*Service) GoalTickQueueConfig added in v0.50.0

func (s *Service) GoalTickQueueConfig() (string, river.QueueConfig)

GoalTickQueueConfig returns the convergence-tick queue and its per-node worker config (one worker: the tick never overlaps itself on a node). The composition root adds it alongside GoalQueueConfig.

func (*Service) ListAcceptanceEvents

func (s *Service) ListAcceptanceEvents(ctx context.Context, id string) ([]sqlc.AgentGoalAcceptanceEvent, error)

ListAcceptanceEvents returns the acceptance ledger for a goal, in fold (seq) order — the audit trail.

func (*Service) ListAttempts

func (s *Service) ListAttempts(ctx context.Context, id string) ([]sqlc.AgentGoalAttempt, error)

ListAttempts returns the execution attempts for a goal, newest first.

func (*Service) ListChildren

func (s *Service) ListChildren(ctx context.Context, parentID string) ([]sqlc.AgentGoal, error)

ListChildren lists the direct children of a composite goal, in position order.

func (*Service) ListEdges

func (s *Service) ListEdges(ctx context.Context, id string) ([]sqlc.AgentGoalEdge, error)

ListEdges returns the upstream dependency edges of a goal.

func (*Service) ListGoals

func (s *Service) ListGoals(ctx context.Context, userID string, filter GoalFilter, limit, offset int64) ([]sqlc.AgentGoal, error)

ListGoals lists root goals (goals: parent_id IS NULL) for a user, narrowed by filter. Empty filter strings match all rows.

func (*Service) ListSubtree

func (s *Service) ListSubtree(ctx context.Context, rootID string) ([]sqlc.AgentGoal, error)

ListSubtree lists every goal in a tree (the whole root_id family).

func (*Service) Reattempt

func (s *Service) Reattempt(ctx context.Context, id string, by Actor) error

Reattempt raises the budget on a blocked(budget_exhausted) goal.

func (*Service) RegisterRiverWorker added in v0.50.0

func (s *Service) RegisterRiverWorker(workers *river.Workers)

RegisterRiverWorker registers the goal subsystem's workers into a shared workers bundle used to build the process-wide River client: the attempt executor and the convergence-tick worker (River Phase 2b). Call before building the client (composition root).

func (*Service) RejectPlan added in v0.50.0

func (s *Service) RejectPlan(ctx context.Context, goalID, reason string, by Actor) error

RejectPlan rejects a composite's proposed plan, returning it to draft for the dispatcher to re-decompose.

func (*Service) SetRiverClient added in v0.50.0

func (s *Service) SetRiverClient(c *river.Client[pgx.Tx])

SetRiverClient injects the shared working River client: it becomes the dispatcher's enqueuer and the target StartDispatchTick registers the periodic against. Call after the client is built and before StartDispatchTick.

func (*Service) StartDispatchTick added in v0.50.0

func (s *Service) StartDispatchTick() (rivertype.PeriodicJobHandle, error)

StartDispatchTick registers the convergence tick as a single-leader River periodic job, replacing the per-node in-process ticker (River Phase 2b). River enqueues a periodic only on the elected leader and ByState uniqueness keeps at most one tick live, so the cluster runs a single convergence loop; any node's tick worker may run a fired tick. RunOnStart fires an immediate tick on (re-)election so convergence resumes promptly after a failover or cold start rather than waiting a full interval (the cost is one extra idempotent pass on failover). Returns the handle for StopDispatchTick. Requires SetRiverClient first.

func (*Service) StopDispatchTick added in v0.50.0

func (s *Service) StopDispatchTick(handle rivertype.PeriodicJobHandle)

StopDispatchTick removes the convergence-tick periodic so no further ticks are enqueued. In-flight ticks drain when the shared client stops.

func (*Service) SubmitVerdict

func (s *Service) SubmitVerdict(ctx context.Context, in VerdictInput) error

SubmitVerdict appends a human verdict event and re-folds acceptance.

func (*Service) Unarchive

func (s *Service) Unarchive(ctx context.Context, id string) error

Unarchive clears the archived flag.

func (*Service) UpdateGoal

func (s *Service) UpdateGoal(ctx context.Context, id string, in UpdateInput) (sqlc.AgentGoal, error)

UpdateGoal applies a partial metadata edit (PATCH).

func (*Service) WaiveEdge

func (s *Service) WaiveEdge(ctx context.Context, downstreamID, upstreamID, reason string, by Actor) error

WaiveEdge waives a hard edge so a blocked(dep) downstream can proceed.

type SessionMinter

type SessionMinter func(ctx context.Context, userID, agentID, projectID string) (string, error)

SessionMinter returns a fresh durable session id for a goal's persistent agent session (worker) or a planning session (decomposition). It is called OUTSIDE every tx: minting a session opens its own transaction, and running it inside the service tx would self-deadlock (the inner write blocks on a row the outer tx holds) and pin a pooled connection. Implementations land in the integration phase (session.go).

func RegistryPlanningSessionMinter

func RegistryPlanningSessionMinter(sm agent.ServiceManager) SessionMinter

RegistryPlanningSessionMinter mints the dedicated session a composite is decomposed in. Unlike worker sessions it uses KindDelegate/ChannelDelegate so the owning agent can resume it through the delegate tool and the user can re-open it from the UI to refine the decomposition by chatting (#525 planning sessions). Like the worker minter it runs OUTSIDE every service tx. Satisfies SessionMinter.

func RegistrySessionMinter

func RegistrySessionMinter(sm agent.ServiceManager) SessionMinter

RegistrySessionMinter mints a goal's worker (execution) session through the agent registry. The session is hidden — KindTask/ChannelTask — so it never appears in user-facing session lists or review candidates, matching the old tasks worker. It is called OUTSIDE every service tx (minting opens its own transaction; running it inside the service tx would self-deadlock and pin a pooled connection). Satisfies SessionMinter.

type SessionResolver

type SessionResolver func(ctx context.Context, goalID string) (sandbox.Session, error)

SessionResolver returns the live sandbox session a goal's deterministic checks run in. The session is minted OUTSIDE any service tx (sandbox boot opens its own writer; see service.go's SessionMinter note) and resolved here by the worker. Returning a nil session with a nil error means "no sandbox for this goal" — the runner surfaces that as ErrNoSandbox rather than panicking on a nil Exec.

type TaskChatFunc

type TaskChatFunc func(ctx context.Context, p TaskChatParams) <-chan agent.Event

TaskChatFunc runs one worker turn through the agent service layer so the transcript persists to the goal's session and prior turns load as history. An unknown agent surfaces as an Err event on the returned channel.

type TaskChatParams

type TaskChatParams struct {
	AgentID   string
	UserID    string
	SessionID string
	ProjectID string
	Prompt    string
	// Decompose routes the turn to the decomposition planning session
	// (KindDelegate) instead of the worker session (KindTask). Set for
	// purpose=decomposition attempts; the two session kinds resolve differently.
	Decompose  bool
	ExtraTools []tools.Tool
}

TaskChatParams is the worker-turn request passed to BootConfig.Chat. It mirrors the old tasks.TaskChatParams so the stellad wiring carries over verbatim: the callback resolves AgentID to an agent service and runs one persisted turn in the goal's session. executor.go consumes this type; it is declared here because BootConfig.Chat is its only producer.

type TerminalAction

type TerminalAction string

TerminalAction is the durable outcome an executor reports for one attempt. The worker maps it to exactly one service transition; the agent never mutates goal/attempt state directly (the contract's non-negotiable).

type UpdateInput

type UpdateInput struct {
	Title        *string
	Intent       *string
	Priority     *string
	ReviewPolicy *string
	Contract     *AcceptanceContract
	Convergence  *ConvergencePolicy
}

UpdateInput is the mutable metadata of a goal (PATCH). A nil pointer leaves that field unchanged; the writer reads current values and overlays the provided ones so a partial edit never clobbers untouched columns.

type VerdictInput

type VerdictInput struct {
	GoalID         string
	ItemID         string
	Result         string // pass | fail
	Rationale      string
	Scope          string
	ScopeHash      string // the accepted-output/artifact hash the verdict covers
	ReviewerUserID string
}

VerdictInput is a human judgment-as-evidence (contract §4.2). The handler appends it as an acceptance_event and re-folds; it never sets acceptance_state.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker runs ONE claimed execution/decomposition attempt to its single durable transition. It is the only place the executor (agent IO) and the CheckRunner (sandbox IO) run; both are pure with respect to durable state. The worker applies EXACTLY ONE service transition at the boundary (contract §5 steps 4–7): submit→fold on a submitted attempt, or a finalize-failed on a failed one. Nothing the worker does writes acceptance_state/counters — only the service, through Submit→applyAcceptance, does.

func NewWorker

func NewWorker(svc *GoalService, q *sqlc.Queries) *Worker

NewWorker wires a worker against a service. The executor and check runner default to the ones registered on the service (WithExecutor/WithCheckRunner) so the dispatcher can spawn workers with a single dependency.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context, goalID, attemptID string, actor Actor) (err error)

Run drives one claimed attempt. Responsibilities:

  • promote the attempt queued→running with started_at + initial lease (a zero-row promote means the attempt was already interrupted/cancelled out from under us — abort with ErrInvalidTransition so a superseded attempt never executes or applies a transition);
  • keep lease_expires_at fresh while the executor runs (heartbeat);
  • run the executor, then for a submitted attempt run the deterministic CheckRunner, append the results as acceptance_events via the service, and apply the single submit transition (service.Submit → applyAcceptance);
  • for a failed (or no-action) attempt apply the single finalize-failed transition so convergence can mint the next attempt or block;
  • turn an executor panic into a non-retryable failure.

func (*Worker) SetHeartbeat

func (w *Worker) SetHeartbeat(d time.Duration)

SetHeartbeat overrides the heartbeat interval (for tests).

func (*Worker) SetLease

func (w *Worker) SetLease(d time.Duration)

SetLease overrides the lease duration (for tests).

type WorkerRunner

type WorkerRunner interface {
	// Run executes the given (already-claimed, queued) attempt to a terminal
	// attempt state, folding acceptance through the service. It must be safe to
	// call concurrently for distinct attempts.
	Run(ctx context.Context, goalID, attemptID string) error
}

WorkerRunner drives one claimed attempt to completion: promote → execute → run checks → apply the single fold transition. The dispatcher owns the concurrency budget and lifetime; the worker (worker.go, integration phase) owns the per-attempt machinery. Declared here so the dispatcher compiles against the boundary without depending on the worker implementation — the same Executor/CheckRunner-style decoupling the spine uses for the service's collaborators.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL