Documentation
¶
Overview ¶
Package goal is the recursive execution core: one entity (the Goal) whose completion is DERIVED from an append-only acceptance ledger, never asserted. A goal is a root goal (parent_id NULL); children are the same shape all the way down. GoalService owns every durable write; the acceptance fold owns "done is derived".
The package is flat: foundation types (enums, contract/evidence structs, pure folds) live alongside the service and integration surface so they share the unexported helpers directly. Information hiding is by unexported identifiers, not subpackages.
Index ¶
- Constants
- Variables
- func CacheKey(item AcceptanceItem, env CheckEnv) string
- func ContentHash(parts ...any) string
- func HashOutput(out AttemptOutput) string
- func HashWithArtifacts(out AttemptOutput, arts []ArtifactRef) string
- func HumanVerdictEvent(v HumanVerdict, item AcceptanceItem) sqlc.AppendAcceptanceEventParams
- func IsTerminalLifecycle(s string) bool
- func RegisterGoalTickWorker(workers *river.Workers, d *Dispatcher, log *slog.Logger)
- func RegisterGoalWorker(workers *river.Workers, runner WorkerRunner, log *slog.Logger)
- func ValidAcceptanceState(s string) bool
- func ValidAttemptStatus(s string) bool
- func ValidAuthority(s string) bool
- func ValidBlockReason(s string) bool
- func ValidContractPolicy(s string) bool
- func ValidEdgeKind(s string) bool
- func ValidEscalation(s string) bool
- func ValidItemKind(s string) bool
- func ValidKind(s string) bool
- func ValidLifecycle(s string) bool
- func ValidOnFailure(s string) bool
- func ValidPriority(s string) bool
- func ValidPurpose(s string) bool
- func ValidResult(s string) bool
- func ValidReviewPolicy(s string) bool
- func ValidateDecomposition(c DecompositionContent, parentDepth, maxDepth int) error
- type AcceptanceContract
- type AcceptanceEventDetail
- type AcceptanceItem
- type AcceptedOutput
- type Actor
- type ArtifactRef
- type AttemptEnqueuer
- type AttemptEvidence
- type AttemptInput
- type AttemptOutput
- type Blocker
- type BootConfig
- type CheckEnv
- type CheckResult
- type CheckRunner
- type Config
- type ConvergencePolicy
- type CreateInput
- type Criterion
- type DecompositionContent
- type Dispatcher
- type DispatcherConfig
- type Evaluation
- type Executor
- type ExecutorRequest
- type ExecutorResult
- type Failure
- type Gap
- type GoalFilter
- type GoalService
- func (s *GoalService) Abandon(ctx context.Context, id, reason string, by Actor) error
- func (s *GoalService) Activate(ctx context.Context, id string) (sqlc.AgentGoal, error)
- func (s *GoalService) AddEdge(ctx context.Context, downstreamID, upstreamID, kind, onFailure string) (sqlc.AgentGoalEdge, error)
- func (s *GoalService) ApprovePlan(ctx context.Context, goalID string, by Actor) error
- func (s *GoalService) Archive(ctx context.Context, id string) error
- func (s *GoalService) BeginAutoDecomposition(ctx context.Context, id string, enqueue AttemptEnqueuer) (sqlc.AgentGoalAttempt, error)
- func (s *GoalService) BeginDecomposition(ctx context.Context, id string) (sqlc.AgentGoalAttempt, error)
- func (s *GoalService) Block(ctx context.Context, id, reason string, by Actor) error
- func (s *GoalService) Cancel(ctx context.Context, id, reason string, by Actor) error
- func (s *GoalService) Claim(ctx context.Context, id, workerID string, enqueue AttemptEnqueuer) (sqlc.AgentGoalAttempt, error)
- func (s *GoalService) CreateGoal(ctx context.Context, in CreateInput) (sqlc.AgentGoal, error)
- func (s *GoalService) CreateRoot(ctx context.Context, in CreateInput) (sqlc.AgentGoal, error)
- func (s *GoalService) FailAttempt(ctx context.Context, attemptID, reason string) error
- func (s *GoalService) Materialize(ctx context.Context, qtx *sqlc.Queries, parent sqlc.AgentGoal, ...) error
- func (s *GoalService) ReapAttempt(ctx context.Context, attemptID string) error
- func (s *GoalService) Reattempt(ctx context.Context, id string, by Actor) error
- func (s *GoalService) RecoverBlockedComposite(ctx context.Context, id string) error
- func (s *GoalService) RejectPlan(ctx context.Context, goalID, reason string, by Actor) error
- func (s *GoalService) RollupAccept(ctx context.Context, id string) error
- func (s *GoalService) RollupFail(ctx context.Context, id string) error
- func (s *GoalService) SetClock(c func() time.Time)
- func (s *GoalService) Submit(ctx context.Context, attemptID string, ev AttemptEvidence, out AttemptOutput) error
- func (s *GoalService) SubmitDecomposition(ctx context.Context, attemptID string, ev AttemptEvidence, ...) error
- func (s *GoalService) SubmitVerdict(ctx context.Context, in VerdictInput) error
- func (s *GoalService) Unarchive(ctx context.Context, id string) error
- func (s *GoalService) Unblock(ctx context.Context, id string, by Actor) error
- func (s *GoalService) UpdateMetadata(ctx context.Context, id string, in UpdateInput) (sqlc.AgentGoal, error)
- func (s *GoalService) WaiveEdge(ctx context.Context, downstreamID, upstreamID, reason string, by Actor) error
- type HumanVerdict
- type Option
- type Projection
- type ProposedChild
- type ProposedEdge
- type Readiness
- type Reason
- type Result
- type RollupVerdict
- type Service
- func (s *Service) Abandon(ctx context.Context, id, reason string, by Actor) error
- func (s *Service) Activate(ctx context.Context, id string) (sqlc.AgentGoal, error)
- func (s *Service) AddEdge(ctx context.Context, downstreamID, upstreamID, kind, onFailure string) (sqlc.AgentGoalEdge, error)
- func (s *Service) ApprovePlan(ctx context.Context, goalID string, by Actor) error
- func (s *Service) Archive(ctx context.Context, id string) error
- func (s *Service) Cancel(ctx context.Context, id, reason string, by Actor) error
- func (s *Service) CountGoals(ctx context.Context, userID string, filter GoalFilter) (int64, error)
- func (s *Service) CreateGoal(ctx context.Context, in CreateInput) (sqlc.AgentGoal, error)
- func (s *Service) GetAttempt(ctx context.Context, attemptID string) (sqlc.AgentGoalAttempt, error)
- func (s *Service) GetGoal(ctx context.Context, id string) (sqlc.AgentGoal, error)
- func (s *Service) GetReadiness(ctx context.Context, id string) (Readiness, error)
- func (s *Service) GoalQueueConfig() (string, river.QueueConfig)
- func (s *Service) GoalTickQueueConfig() (string, river.QueueConfig)
- func (s *Service) ListAcceptanceEvents(ctx context.Context, id string) ([]sqlc.AgentGoalAcceptanceEvent, error)
- func (s *Service) ListAttempts(ctx context.Context, id string) ([]sqlc.AgentGoalAttempt, error)
- func (s *Service) ListChildren(ctx context.Context, parentID string) ([]sqlc.AgentGoal, error)
- func (s *Service) ListEdges(ctx context.Context, id string) ([]sqlc.AgentGoalEdge, error)
- func (s *Service) ListGoals(ctx context.Context, userID string, filter GoalFilter, limit, offset int64) ([]sqlc.AgentGoal, error)
- func (s *Service) ListSubtree(ctx context.Context, rootID string) ([]sqlc.AgentGoal, error)
- func (s *Service) Reattempt(ctx context.Context, id string, by Actor) error
- func (s *Service) RegisterRiverWorker(workers *river.Workers)
- func (s *Service) RejectPlan(ctx context.Context, goalID, reason string, by Actor) error
- func (s *Service) SetRiverClient(c *river.Client[pgx.Tx])
- func (s *Service) StartDispatchTick() (rivertype.PeriodicJobHandle, error)
- func (s *Service) StopDispatchTick(handle rivertype.PeriodicJobHandle)
- func (s *Service) SubmitVerdict(ctx context.Context, in VerdictInput) error
- func (s *Service) Unarchive(ctx context.Context, id string) error
- func (s *Service) UpdateGoal(ctx context.Context, id string, in UpdateInput) (sqlc.AgentGoal, error)
- func (s *Service) WaiveEdge(ctx context.Context, downstreamID, upstreamID, reason string, by Actor) error
- type SessionMinter
- type SessionResolver
- type TaskChatFunc
- type TaskChatParams
- type TerminalAction
- type UpdateInput
- type VerdictInput
- type Worker
- type WorkerRunner
Constants ¶
const ( ReadinessDispatchable = "dispatchable" ReadinessWaitingDeps = "waiting_deps" ReadinessBlocked = "blocked" ReadinessActive = "active" ReadinessTerminal = "terminal" ReadinessDraft = "draft" ReadinessComposite = "composite" // a composite is driven by rollup, not claimed ReadinessUnknown = "unknown" )
Readiness states. dispatchable is the only state the dispatcher claims on; the rest are diagnostic (surfaced via the readiness API to explain "why isn't this running?").
const ( // GoalQueue isolates goal-attempt execution from the scheduler's queue on the // shared River client. GoalQueue = "stella_goal" // GoalTickQueue runs the dispatcher convergence tick (River Phase 2b). It is a // dedicated queue with one worker per node so a slow tick never consumes // attempt-execution slots and never overlaps itself on a node; combined with // leader-only periodic enqueue and ByState uniqueness, the cluster runs a // single convergence loop (at most one live tick at a time) rather than one // scan per node. A leader failover may run one extra (idempotent) pass. GoalTickQueue = "stella_goal_tick" )
River Phase 2a: goal attempt execution runs on durable River jobs instead of the old in-process worker pool. Claiming a leaf mints an attempt and enqueues ONE job here; a worker promotes and drives it to a terminal attempt state. River owns durability, multi-node distribution, and graceful drain — but NOT retry: MaxAttempts is 1 so the same attempt_id is never re-run. Retry is the goal convergence model's job (attempt_count / reopenForRework), driven by the dispatcher reaper off the lease.
The goal queue's worker runs on the single process-wide River client (db.NewWorkingRiverClient): there must be exactly one electable client per database, so the goal subsystem contributes its queue + worker to that shared client (RegisterGoalWorker / GoalQueueConfig) rather than building its own.
const ( LifecycleDraft = "draft" LifecycleReady = "ready" LifecycleActive = "active" LifecycleBlocked = "blocked" LifecycleAccepted = "accepted" // terminal-good; accepted_output frozen LifecycleRejectedFinal = "rejected_final" // judgment said no, no rework path left LifecycleAbandoned = "abandoned" // budget exhausted + give-up LifecycleCancelled = "cancelled" )
Goal lifecycle (contract §2). The single state machine every goal — root or child, leaf or composite — runs through.
const ( BlockBudgetExhausted = "budget_exhausted" BlockNeedsPlanApproval = "needs_plan_approval" BlockNeedsVerdict = "needs_verdict" BlockDep = "dep" )
Block reasons (meaningful only when lifecycle='blocked'). Precedence on concurrent causes: budget_exhausted > needs_plan_approval > needs_verdict > dep. needs_plan_approval is a composite's human-review gate on its proposed plan, before any child is materialized; it cannot co-occur with dep/needs_verdict (those require materialized children / a submitted attempt).
const ( AcceptancePending = "pending" AcceptancePassed = "passed" AcceptanceFailed = "failed" )
Acceptance projection — the evaluation RESULT, distinct from lifecycle.
const ( AttemptQueued = "queued" AttemptRunning = "running" AttemptSubmitted = "submitted" // evidence in; acceptance evaluates next AttemptInterrupted = "interrupted" AttemptFailed = "failed" AttemptCancelled = "cancelled" )
Attempt status (contract §2.2).
const ( PurposeExecution = "execution" PurposeDecomposition = "decomposition" PurposeReview = "review" )
Attempt purpose.
const ( KindLeaf = "leaf" KindComposite = "composite" )
Goal kind.
const ( PriorityRoutine = "routine" PriorityUrgent = "urgent" )
Scheduling priority.
const ( EdgeHard = "hard" EdgeSoft = "soft" )
Edge kind — hard blocks readiness, soft is advisory.
const ( OnFailureBlock = "block" OnFailureFail = "fail" OnFailureIgnore = "ignore" )
Edge on-failure policy when an upstream fails.
const ( ReviewNone = "none" ReviewHuman = "human" )
Decomposition review gate.
const ( PolicyDetThenJudgment = "deterministic_then_judgment" PolicyAll = "all" PolicyAny = "any" )
Acceptance-contract policy.
const ( ItemDeterministic = "deterministic" ItemJudgment = "judgment" )
Contract item kind.
const ( ResultPass = "pass" ResultFail = "fail" )
Acceptance outcome for one item.
const ( AuthoritySystem = "system" // deterministic check result AuthorityAgent = "agent" // reviewer-agent verdict AuthorityHuman = "human" // human verdict )
Verdict authority.
const ( EscalationBlock = "block" // human decides (default) EscalationAbandon = "abandon" // auto-terminal )
Convergence escalation on budget exhaustion.
const ( ActorSystem = "system" ActorUser = "user" ActorAgent = "agent" ActorWorker = "worker" ActorReviewer = "reviewer" ActorPlanner = "planner" )
Actor types.
Variables ¶
var ( // ErrInvalidTransition is returned when a transition's from-lifecycle (or // attempt from-status) no longer matches the row. Another tick // raced this one; the caller may re-fetch and retry. ErrInvalidTransition = errors.New("goal: invalid lifecycle transition") // ErrNotFound is returned when the target goal/attempt/edge // no longer exists. ErrNotFound = errors.New("goal: not found") // ErrPlanGate is returned by Activate when the plan gate is unmet: a leaf // with an empty non-trivial contract, or a composite that is not yet planned // (planned_at unset) / has no required children. ErrPlanGate = errors.New("goal: plan gate not satisfied") // ErrBudgetExhausted is informational: convergence reached MaxAttempts. The // goal moves to blocked(budget_exhausted) or abandoned/rejected_final // rather than retrying. ErrBudgetExhausted = errors.New("goal: convergence budget exhausted") // ErrInvalidContract is returned when an AcceptanceContract or // ConvergencePolicy fails structural validation at the write boundary. ErrInvalidContract = errors.New("goal: invalid acceptance contract") // ErrCompositeDeterministicContract is returned when a composite goal carries // a deterministic acceptance item. A composite produces no executed output, so // the deterministic check has no event source and the fold would stall pending // forever. Put deterministic checks on a leaf child, or use judgment items. ErrCompositeDeterministicContract = errors.New("goal: composite contract cannot contain deterministic items") // ErrInvalidDecomposition is returned when a composite's DecompositionContent // fails validation (no required child, dangling edge key, etc.). ErrInvalidDecomposition = errors.New("goal: invalid decomposition") // ErrDepthExceeded is returned at materialize when parent.depth+1 > max_depth. ErrDepthExceeded = errors.New("goal: decomposition depth exceeded") // ErrCycle is returned by AddEdge / decomposition validation when an edge // would close a cycle in the sibling dependency graph. ErrCycle = errors.New("goal: dependency cycle") // ErrConcurrencyCap is returned (or used to skip a candidate) when a Claim // would exceed the per-root or per-user in-flight attempt budget. ErrConcurrencyCap = errors.New("goal: concurrency cap reached") // ErrStaleProjection is returned when a fold computed against a lower seq // than the row's current acceptance_seq is rejected (stale-projection fence). ErrStaleProjection = errors.New("goal: stale acceptance projection") // ErrInvalidEvidence is returned when submitted evidence violates a contract // rule — e.g. a non-root goal submits an empty handoff summary. The // worker turns this into a retryable protocol miss. ErrInvalidEvidence = errors.New("goal: invalid evidence") // ErrInvalidVerdict is returned when a human/agent verdict is malformed // (unknown result/authority, empty scope_hash where required). ErrInvalidVerdict = errors.New("goal: invalid verdict") )
Typed sentinel errors. Callers branch on these via errors.Is, never on string Contains. APOSD "defined out of existence" governs acceptance: an unmet contract simply never produces a pass event, so the goal never reaches accepted — there is no "is it really done?" error here. These errors cover the guard/validation failures the rest of the package raises.
var ErrNoSandbox = errors.New("goal: no sandbox session for deterministic check")
ErrNoSandbox is returned when a deterministic check cannot resolve a sandbox session to run in. A check with no place to run is an infrastructure fault, not an unmet acceptance — it never silently reads as a pass.
Functions ¶
func CacheKey ¶
func CacheKey(item AcceptanceItem, env CheckEnv) string
CacheKey is the single constructor for a deterministic check's cache key (contract §4.1, invariant #9). It folds item id+command, sandbox image, repo tree hash, env hash, and the sorted upstream accepted-output hashes. If RepoTreeHash OR EnvHash is unavailable ("") the key is "" — a forced miss that is never written as a hit-eligible row. A false miss costs a re-run; a false hit ships broken work.
func ContentHash ¶
ContentHash is the canonical content-addressing hash used for AcceptedOutput.Hash and any artifact/verdict scope anchor. It hashes the JSON-canonical encoding of the parts so two structurally equal outputs hash identically. A single constructor keeps the cache key, the accepted-output hash, and the verdict scope_hash in agreement.
func HashOutput ¶
func HashOutput(out AttemptOutput) string
HashOutput computes the content hash of a candidate output's salient fields (summary + structured result). Stable under map ordering because json.Marshal sorts map keys. Artifacts are hash-addressed separately on the evidence; HashWithArtifacts folds them in when an output's identity must include its externalized artifacts.
func HashWithArtifacts ¶
func HashWithArtifacts(out AttemptOutput, arts []ArtifactRef) string
HashWithArtifacts extends an output hash with the sorted artifact hashes, so two outputs with identical prose but different artifacts hash distinctly.
func HumanVerdictEvent ¶
func HumanVerdictEvent(v HumanVerdict, item AcceptanceItem) sqlc.AppendAcceptanceEventParams
HumanVerdictEvent builds the append params for a human verdict (authority=human). seq/id/created_at are filled by appendAcceptanceEvent; this carries the verdict-as-evidence quartet (rationale/scope/authority/reviewer) + scope_hash.
func IsTerminalLifecycle ¶
IsTerminalLifecycle reports whether the lifecycle admits no further scheduling. 'blocked' is recoverable and so is NOT terminal.
func RegisterGoalTickWorker ¶ added in v0.50.0
func RegisterGoalTickWorker(workers *river.Workers, d *Dispatcher, log *slog.Logger)
RegisterGoalTickWorker registers the convergence-tick worker into the shared workers bundle (River Phase 2b). Paired with GoalTickQueueConfig and the periodic registered by Service.StartDispatchTick.
func RegisterGoalWorker ¶ added in v0.50.0
func RegisterGoalWorker(workers *river.Workers, runner WorkerRunner, log *slog.Logger)
RegisterGoalWorker registers the goal-attempt worker into a shared workers bundle used to build the process-wide River client. maxWorkers bounds concurrent attempt executions per node (the durable successor to the old in-process worker-pool cap); the per-root/per-user caps enforced at claim still bound total in-flight attempts cluster-wide.
func ValidAcceptanceState ¶
ValidAcceptanceState reports whether s is a known acceptance projection state.
func ValidAttemptStatus ¶
ValidAttemptStatus reports whether s is a known attempt status.
func ValidAuthority ¶
ValidAuthority reports whether s is a known verdict authority.
func ValidBlockReason ¶
ValidBlockReason reports whether s is a known block reason.
func ValidContractPolicy ¶
ValidContractPolicy reports whether s is a known acceptance-contract policy.
func ValidEdgeKind ¶
ValidEdgeKind reports whether s is a known edge kind.
func ValidEscalation ¶
ValidEscalation reports whether s is a known convergence escalation.
func ValidItemKind ¶
ValidItemKind reports whether s is a known contract item kind.
func ValidLifecycle ¶
ValidLifecycle reports whether s is a known goal lifecycle.
func ValidOnFailure ¶
ValidOnFailure reports whether s is a known edge failure policy.
func ValidPriority ¶
ValidPriority reports whether s is a known scheduling priority.
func ValidPurpose ¶
ValidPurpose reports whether s is a known attempt purpose.
func ValidResult ¶
ValidResult reports whether s is a known acceptance outcome.
func ValidReviewPolicy ¶
ValidReviewPolicy reports whether s is a known decomposition review gate.
func ValidateDecomposition ¶
func ValidateDecomposition(c DecompositionContent, parentDepth, maxDepth int) error
ValidateDecomposition checks a DecompositionContent's structural invariants (contract §6): ≥1 required child, at most maxDecompositionBreadth children, every child Key unique and non-empty, every edge key resolves to a child Key, no edge cycle (DFS over keys), known enum values, and a depth budget. A composite produces no executed output, so a composite child may not carry a deterministic acceptance item (its fold would stall forever) and must leave a level of depth under maxDepth for its own children — both are enforced here at the proposal boundary so a doomed plan is rejected before it consumes budget. Returns ErrInvalidDecomposition, ErrInvalidContract, ErrDepthExceeded, or ErrCycle.
parentDepth is the composite's own depth; a child sits at parentDepth+1.
Types ¶
type AcceptanceContract ¶
type AcceptanceContract struct {
Policy string `json:"policy"` // deterministic_then_judgment | all | any
Items []AcceptanceItem `json:"items"`
}
AcceptanceContract is the composite policy tree of deterministic + judgment items that gates a goal's acceptance (contract §3.1). An empty contract (no items) is the trivial auto-accept degradation — a "direct goal" with no bar. Marshaled to the acceptance_contract TEXT column.
func SplitCriteria ¶
func SplitCriteria(crits []Criterion) AcceptanceContract
SplitCriteria turns natural-language criteria into a deterministic|judgment item set under the deterministic_then_judgment policy (contract §3.1). A criterion with a Command becomes a binding deterministic item; one without becomes a judgment item (agent rubric or human prompt). The model never pretends prose is executable — the caller (planner) decides the split by supplying or omitting Command.
func (AcceptanceContract) HasDeterministicItem ¶ added in v0.50.0
func (c AcceptanceContract) HasDeterministicItem() bool
HasDeterministicItem reports whether the contract carries any deterministic (command-checked) item. A deterministic item needs an executed output to run its check against; a composite produces no output, so its fold would stall pending forever (no event source). Callers reject deterministic items on composites — see ErrCompositeDeterministicContract.
func (AcceptanceContract) IsTrivial ¶
func (c AcceptanceContract) IsTrivial() bool
IsTrivial reports whether the contract is the empty auto-accept degradation (no items). A trivial leaf accepts immediately; a trivial composite accepts when all required children are accepted.
func (AcceptanceContract) Valid ¶
func (c AcceptanceContract) Valid() bool
Valid reports whether the contract's policy and every item are well-formed. An empty contract is valid (trivial auto-accept).
type AcceptanceEventDetail ¶
type AcceptanceEventDetail struct {
Stdout string `json:"stdout,omitempty"` // truncated to N KB
Artifacts []ArtifactRef `json:"artifacts,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
CacheHit bool `json:"cache_hit,omitempty"`
Gaps []Gap `json:"gaps,omitempty"`
}
AcceptanceEventDetail is the truncated/hash-addressed payload on an acceptance event (contract §3.6). The verdict quartet (rationale/scope/authority/timestamp) + scope_hash are first-class columns, not this blob.
type AcceptanceItem ¶
type AcceptanceItem struct {
ID string `json:"id"`
Kind string `json:"kind"` // deterministic | judgment
Required bool `json:"required"`
// deterministic:
Command string `json:"command,omitempty"`
ExpectExit *int `json:"expect_exit,omitempty"` // default 0
// judgment:
Authority string `json:"authority,omitempty"` // agent | human
Rubric string `json:"rubric,omitempty"` // agent reviewer prompt
Prompt string `json:"prompt,omitempty"` // human verdict prompt
}
AcceptanceItem is one check in the contract. Deterministic items carry a Command; judgment items carry an Authority + Rubric/Prompt. A non-required item is advisory and does not gate acceptance.
func (AcceptanceItem) Valid ¶
func (i AcceptanceItem) Valid() bool
Valid reports whether a single item is structurally sound: a deterministic item needs a command; a judgment item needs a known authority.
type AcceptedOutput ¶
type AcceptedOutput struct {
GoalID string `json:"goal_id"`
Summary string `json:"summary"`
Result map[string]any `json:"result,omitempty"`
Artifacts []ArtifactRef `json:"artifacts,omitempty"`
Hash string `json:"hash"` // feeds downstream cache_key (§4.1) + verdict scope_hash (§4.2)
AcceptedAt string `json:"accepted_at"` // RFC3339 UTC
SourceAttempt string `json:"source_attempt_id"`
// Children carries the frozen accepted output of each accepted child, set only
// on a composite's rollup output. A composite produces no work of its own, so
// this is how its deliverables travel with the parent: a reader of the root
// goal gets the tree's results without walking children. Nested composites
// compose (a child composite's Children is already filled when it accepted).
// Empty for leaves. Bounded by fanout (max_concurrent) x depth (max_depth).
Children []AcceptedOutput `json:"children,omitempty"`
}
AcceptedOutput is the frozen snapshot copied from the accepted attempt (contract §3.5), then immutable. This is what flows downstream and what a verdict's scope_hash anchors to.
type Actor ¶
type Actor struct {
Type string // one of ActorSystem | ActorUser | ActorAgent | ActorWorker | ActorReviewer | ActorPlanner
ID string // user_id / agent_id / worker_id depending on Type; empty for system
}
Actor describes who initiated a transition. Ported from the old package; the goal ledger records verdict authority as a first-class column, but the service still carries an Actor for audit/attribution on non-acceptance transitions.
func SystemActor ¶
func SystemActor() Actor
SystemActor is the dispatcher/system originator convenience.
type ArtifactRef ¶
type ArtifactRef struct {
Hash string `json:"hash"`
Kind string `json:"kind"`
URI string `json:"uri,omitempty"`
Bytes int64 `json:"bytes"`
}
ArtifactRef is a hash-addressed externalized artifact (stdout/diffs/files).
type AttemptEnqueuer ¶ added in v0.50.0
AttemptEnqueuer enqueues the durable execution job for a freshly claimed attempt within the claim transaction tx, so the claim and its job are atomic (River Phase 2c). GoalService.Claim calls it after minting the attempt; returning an error rolls the whole claim back. A nil AttemptEnqueuer skips enqueue (tests that mint+claim directly without River).
type AttemptEvidence ¶
type AttemptEvidence struct {
Summary string `json:"summary"`
Artifacts []ArtifactRef `json:"artifacts,omitempty"`
Notes map[string]any `json:"notes,omitempty"`
}
AttemptEvidence is the submitted handoff (contract §3.4). A non-root goal submitting an empty Summary is a retryable protocol miss.
type AttemptInput ¶
type AttemptInput struct {
Intent string `json:"intent"`
UpstreamOutputs []AcceptedOutput `json:"upstream_outputs"` // ONLY accepted upstream outputs
PriorGaps *Evaluation `json:"prior_gaps,omitempty"` // Evaluation[i-1].gaps; nil on attempt 1
Contract AcceptanceContract `json:"contract"` // the bar the attempt must clear
ResolvedVerdict string `json:"resolved_verdict,omitempty"` // a human answer that unblocked needs_verdict
AttemptNo int `json:"attempt_no"`
}
AttemptInput is the frozen-at-mint context an attempt executes against (contract §3.3). Marshaled to the attempt's input_context column. Frozen means an in-flight edit to intent/contract never mutates a running attempt.
type AttemptOutput ¶
type AttemptOutput struct {
Result map[string]any `json:"result,omitempty"`
Summary string `json:"summary"`
Hash string `json:"hash"`
}
AttemptOutput is the candidate the contract evaluates (contract §3.4). Hash becomes accepted_output.Hash on acceptance.
type Blocker ¶
type Blocker struct {
Kind string `json:"kind"`
Question string `json:"question"`
Detail any `json:"detail,omitempty"`
}
Blocker carries a block action's payload. The goal model resolves blocks through the service (dep / needs_verdict / budget_exhausted), so an agent-declared block surfaces to the worker as a non-retryable failure whose reason embeds this payload; the structured form is kept here for the worker and for callers that introspect a recorded result.
type BootConfig ¶
type BootConfig struct {
DB *pgxpool.Pool
Services agent.ServiceManager // registry-backed session minting
Chat TaskChatFunc // runs persisted worker turns; nil => noop executor
// MaxWorkers caps concurrent attempt executions per node on the goal River
// queue (0 => defaultGoalMaxWorkers). TickEvery/LeaseTTL override the
// dispatcher/service defaults; zero values use them.
MaxWorkers int
TickEvery time.Duration
LeaseTTL time.Duration
Logger *slog.Logger
}
BootConfig is the minimal wiring needed at server start. It mirrors the old tasks.BootConfig: a DB handle, the agent ServiceManager for session minting, the Chat callback for worker turns, and the dispatcher tunables.
type CheckEnv ¶
type CheckEnv struct {
GoalID string
SandboxImage string
RepoTreeHash string // "" ⇒ forced miss
EnvHash string // "" ⇒ forced miss
UpstreamHashes []string // accepted-output hashes of upstream edges
}
CheckEnv carries the provenance inputs the cache key folds (contract §4.1). RepoTreeHash/EnvHash may be "" when the sandbox cannot guarantee a stable hash; that forces a cache miss rather than risking a false hit.
type CheckResult ¶
type CheckResult struct {
ItemID string
ExitCode int
Pass bool
Stdout string
CacheKey string
CacheHit bool
}
CheckResult is the deterministic-check outcome the runner returns; the service folds it into an acceptance_event (contract §4.1). It never writes lifecycle.
type CheckRunner ¶
type CheckRunner interface {
Run(ctx context.Context, item AcceptanceItem, env CheckEnv) (CheckResult, error)
}
CheckRunner runs ONE deterministic acceptance item in the sandbox and returns a CheckResult the service folds into an acceptance_event. It is the only sandbox-IO in acceptance; it NEVER writes lifecycle. It runs inside the worker, after the executor submits, before the durable transition (sandbox exec must never run inside a DB tx, which would pin a pooled connection). The implementation lands in the integration phase (runner.go).
func NewCheckRunner ¶
func NewCheckRunner(resolve SessionResolver, q *sqlc.Queries, stdoutLimit int) CheckRunner
NewCheckRunner builds the sandbox-backed CheckRunner. q is used only for the read-only cache probe; stdoutLimit ≤ 0 falls back to the package default. The returned value satisfies the CheckRunner interface declared in service.go.
type Config ¶
type Config struct {
// LeaseTTL bounds an attempt's lease before the dispatcher may reap it.
LeaseTTL time.Duration
// MaxConcurrentPerUser caps in-flight attempts per user (§5, default 16).
MaxConcurrentPerUser int
// StdoutLimit truncates captured stdout before it touches event detail.
StdoutLimit int
}
Config carries the service's tunables. Zero values fall back to package defaults at use sites.
type ConvergencePolicy ¶
type ConvergencePolicy struct {
MaxAttempts int `json:"max_attempts"` // default 3; exhaustion → blocked(budget_exhausted)
Escalation string `json:"escalation"` // block (default) | abandon
MaxDepth int `json:"max_depth,omitempty"` // recursion ceiling; default 4
// MaxConcurrent bounds breadth-of-fanout per root (§5, default 8). Read from
// the root's policy by the dispatcher; 0 ⇒ default.
MaxConcurrent int `json:"max_concurrent,omitempty"`
}
ConvergencePolicy bounds the rework loop and the recursion depth (§3.2).
func (ConvergencePolicy) Normalized ¶
func (p ConvergencePolicy) Normalized() ConvergencePolicy
Normalized returns the policy with defaults applied (MaxAttempts 3, Escalation block, MaxDepth 4, MaxConcurrent 8). It never mutates the receiver.
func (ConvergencePolicy) Valid ¶
func (p ConvergencePolicy) Valid() bool
Valid reports whether the convergence policy is well-formed. A zero-value policy is valid; Normalized fills the defaults.
type CreateInput ¶
type CreateInput struct {
UserID string
AgentID string
ProjectID string
ParentID string // "" ⇒ root
RootID string // = id for a root; parent.root_id for a child
Depth int64
Position int64
SessionID string
Title string
Intent string
Kind string // leaf | composite
Priority string // "" ⇒ routine
Required bool
Contract AcceptanceContract
Convergence ConvergencePolicy
ReviewPolicy string // "" ⇒ none
Context json.RawMessage // empty ⇒ "{}"
DispatchHint json.RawMessage // empty ⇒ "{}"
}
CreateInput is the request to mint a root or child goal. The caller pre-mints session_id OUTSIDE any tx and passes it here.
type Criterion ¶
type Criterion struct {
ID string // stable item id
Text string // the NL criterion
// Command, when non-empty, marks the criterion operationalizable: it becomes
// a deterministic item bound to this command. Empty ⇒ judgment.
Command string
// Authority routes a judgment criterion (agent | human). Defaults to agent
// when a judgment criterion leaves it blank.
Authority string
// Required marks whether the criterion gates acceptance (default true).
Required bool
}
Criterion is one natural-language acceptance criterion handed to the splitter. Operationalizable carries a machine-checkable command; everything else is routed to a judgment item.
type DecompositionContent ¶
type DecompositionContent struct {
Children []ProposedChild `json:"children"`
Edges []ProposedEdge `json:"edges"`
}
DecompositionContent is a composite's proposed children + edges (contract §3.7). Stored inline on agent_goal.plan (jsonb). The materializer keys idempotency on (goal_id, child.Key).
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher drives the convergence loop on a tick: reap stale attempts, propagate dep failures, roll up composites, then scan-and-claim dispatchable leaves under the concurrency caps and enqueue a durable River job per claim (contract §5, §7; River Phase 2a). It is the only scheduler; every durable write still routes through GoalService, and every attempt now executes as a River job rather than an in-process goroutine.
func NewDispatcher ¶
func NewDispatcher(cfg DispatcherConfig) *Dispatcher
NewDispatcher constructs a dispatcher, filling defaults for zero-valued config fields.
func (*Dispatcher) SetEnqueuer ¶ added in v0.50.0
func (d *Dispatcher) SetEnqueuer(e goalEnqueuer)
SetEnqueuer injects the River enqueuer the dispatcher uses to dispatch claimed attempts. Called by the composition root after the shared client is built and before the tick starts; a nil enqueuer leaves dispatch disabled.
func (*Dispatcher) Stop ¶
func (d *Dispatcher) Stop()
Stop signals the tick to go quiet. In-flight attempts now run as River jobs, and the tick itself runs as a River job; draining both is the shared River client's responsibility, not the dispatcher's.
func (*Dispatcher) Tick ¶
func (d *Dispatcher) Tick(ctx context.Context)
Tick runs one pass of the dispatch loop. Public so tests can drive it deterministically. The order is fixed (§5/§7): reap → propagate → rollup → scan-and-decompose → scan-and-claim.
func (*Dispatcher) TickInterval ¶ added in v0.50.0
func (d *Dispatcher) TickInterval() time.Duration
TickInterval is the convergence-tick period. The composition root reads it to register the single-leader River periodic tick (River Phase 2b).
type DispatcherConfig ¶
type DispatcherConfig struct {
Service *GoalService
Queries *sqlc.Queries
// Enqueuer enqueues one durable River job per claimed attempt (River Phase
// 2a). A nil enqueuer disables dispatch (tests that drive Worker.Run directly).
Enqueuer goalEnqueuer
TickEvery time.Duration // 0 ⇒ defaultTickEvery
LeaseTTL time.Duration // 0 ⇒ service LeaseTTL
BatchLimit int // 0 ⇒ defaultBatchLimit
// MaxConcurrentPerUser caps in-flight attempts per user (§5/§10.8, default
// 16). 0 ⇒ the service's configured per-user cap.
MaxConcurrentPerUser int
Logger *slog.Logger
}
DispatcherConfig wires a Dispatcher. Zero-valued fields fall back to the package defaults below.
type Evaluation ¶
type Evaluation struct {
Gaps []Gap `json:"gaps"`
}
Evaluation carries the shortfalls from one acceptance fold; it feeds the next attempt's input as prior_gaps.
type Executor ¶
type Executor interface {
Execute(ctx context.Context, req ExecutorRequest) (ExecutorResult, error)
}
Executor owns agent interaction for one attempt and is PURE with respect to durable state: Execute returns the action it wants; the WORKER applies the single transition through GoalService. The agent never mutates lifecycle/acceptance — the contract's non-negotiable. The concrete Request / Result types are declared by the integration phase (executor.go); they are referenced here only behind this interface so the core stays decoupled.
type ExecutorRequest ¶
type ExecutorRequest struct {
Goal sqlc.AgentGoal
Attempt sqlc.AgentGoalAttempt
Input AttemptInput
}
ExecutorRequest / ExecutorResult are forward-declared handles the integration phase fleshes out (executor.go). They are kept opaque here so service.go does not depend on the agent layer; the worker is the only caller that constructs and consumes them.
type ExecutorResult ¶
type ExecutorResult struct {
Submitted bool
Evidence AttemptEvidence
Output AttemptOutput
Decomposition *DecompositionContent // purpose=decomposition only
Failed bool
FailReason string
Retryable bool
}
ExecutorResult is the executor's declared outcome for one attempt. Exactly one of the submit/fail paths is meaningful; the worker maps it to a single service transition. Decomposition attempts carry produced plan content.
type Gap ¶
type Gap struct {
ItemID string `json:"item_id"`
Reason string `json:"reason"`
Detail string `json:"detail,omitempty"`
}
Gap is one unmet item with a reason fed into attempt_no+1.
type GoalFilter ¶
type GoalFilter struct {
AgentID string
Lifecycle string
ProjectID string
Terminal *bool
Q string
Archived bool
}
GoalFilter narrows a root-goal list. The zero value lists active (non-archived) roots across all agents; populated fields AND together. Terminal is tri-state: nil = both, false = active only, true = history (terminal) only.
type GoalService ¶
type GoalService struct {
// contains filtered or unexported fields
}
GoalService is the ONLY writer of agent_goal_* rows. Every durable change is one withTx: load → guard from→to → side-effects → counter bumps → append acceptance_event → commit. Callers branch on the typed errors in types.go, never on string Contains. Sessions/sandboxes are minted OUTSIDE the tx.
func New ¶
New builds a GoalService. The Queries is used for non-transactional reads; mutating methods open their own txns via withTx. Collaborators that are nil on a given path surface as clear errors rather than panics.
func (*GoalService) Abandon ¶
Abandon is the human give-up on a blocked(budget_exhausted): blocked→abandoned (terminal), bumping parent required_failed (contract §2.1).
func (*GoalService) Activate ¶
Activate is the plan gate: draft→ready (contract §2.1). A leaf passes when its contract has items or is explicitly trivial; a composite passes when it is planned (planned_at set) and required_total ≥ 1. A composite flips its draft children → ready. Returns ErrPlanGate when unmet.
func (*GoalService) AddEdge ¶
func (s *GoalService) AddEdge(ctx context.Context, downstreamID, upstreamID, kind, onFailure string) (sqlc.AgentGoalEdge, error)
AddEdge inserts an accepted-output dependency between siblings with a DFS cycle check (contract §1.3, §6). Returns ErrCycle when the edge would close a cycle.
func (*GoalService) ApprovePlan ¶ added in v0.50.0
ApprovePlan applies a human approval of a composite's proposed plan: it materializes the children and releases them, moving the composite out of blocked(needs_plan_approval) back to active for rollup (contract §2.3). Child sessions are pre-minted OUTSIDE the tx (mirrors the none path in SubmitDecomposition). Only a composite blocked on plan approval is accepted.
func (*GoalService) Archive ¶
func (s *GoalService) Archive(ctx context.Context, id string) error
Archive soft-flags a terminal goal out of default lists.
func (*GoalService) BeginAutoDecomposition ¶ added in v0.50.0
func (s *GoalService) BeginAutoDecomposition(ctx context.Context, id string, enqueue AttemptEnqueuer) (sqlc.AgentGoalAttempt, error)
BeginAutoDecomposition mints a headless decomposition attempt for a composite and enqueues its durable River job in ONE tx (mirrors Claim): on success both commit; on enqueue failure the whole thing rolls back, leaving the composite draft to be re-picked next tick — no orphaned active composite to recover. Unlike interactive BeginDecomposition the attempt carries a real claim-grace lease and is heartbeated by the River worker, so the reaper recovers it if the node crashes mid-plan. The dispatcher (scanAndDecompose) is the only caller.
The eligibility guards are re-checked INSIDE the tx after LockGoalForWrite: the dispatcher's scan is a snapshot, so a concurrent Activate or a prior decomposition could have moved the goal between the scan and here. Re-checking under the row lock makes auto-decomposition never clobber an already-running or already-planned composite.
func (*GoalService) BeginDecomposition ¶
func (s *GoalService) BeginDecomposition(ctx context.Context, id string) (sqlc.AgentGoalAttempt, error)
BeginDecomposition starts a composite's decomposition: draft→active, minting a purpose=decomposition attempt (contract §2.1). Guards kind=composite and not yet planned. The planning session is pre-minted outside the tx.
func (*GoalService) Block ¶
Block moves ready/active → blocked with a reason (contract §2.1). Used by the dispatcher (dep), the fold (needs_verdict), and convergence (budget_exhausted).
func (*GoalService) Cancel ¶
Cancel cascades cancel over the subtree (contract §6): depth-first set cancelled on each non-terminal descendant, reconcile each touched parent's required_failed, cancel in-flight attempts, stamp cancelled_at. One tx.
func (*GoalService) Claim ¶
func (s *GoalService) Claim(ctx context.Context, id, workerID string, enqueue AttemptEnqueuer) (sqlc.AgentGoalAttempt, error)
Claim is the dispatcher's leaf ready→active (contract §2.1, §5 step 1). It resolves the dispatch hint's executor, mints a queued execution attempt and claims the goal in one tx. The per-root/per-user concurrency caps are enforced by the dispatcher BEFORE Claim (§5 step 2); Claim itself only guards the single-writer invariants.
enqueue (River Phase 2c) inserts the attempt's durable execution job in the SAME tx, so the claim and its job commit atomically — a crash can no longer leave a claimed attempt with no job to run it. A nil enqueue skips this (tests minting+claiming without River); its failure rolls the claim back, leaving the goal ready for the next tick.
func (*GoalService) CreateGoal ¶
func (s *GoalService) CreateGoal(ctx context.Context, in CreateInput) (sqlc.AgentGoal, error)
CreateGoal inserts a goal in 'draft' (contract §2.1, (none)→draft).
func (*GoalService) CreateRoot ¶
func (s *GoalService) CreateRoot(ctx context.Context, in CreateInput) (sqlc.AgentGoal, error)
CreateRoot mints a worker session for a new root goal (a goal) and creates it in 'draft'. The session is minted OUTSIDE the insert tx to avoid a self-deadlock and to keep slow session-minting off a pooled connection (same discipline as Materialize's pre-minted child sessions). Child goals get their sessions from Materialize instead, so this entry is root-only.
func (*GoalService) FailAttempt ¶
func (s *GoalService) FailAttempt(ctx context.Context, attemptID, reason string) error
FailAttempt finalizes a worker-reported failed attempt AND applies the single convergence transition, so the goal never strands 'active' with no live attempt (issue #543). It mirrors ReapAttempt but is the executor-failure entry (agent reported fail / produced no action / empty handoff / panic), so the attempt is recorded 'failed' rather than 'interrupted'. One tx.
- execution attempt → branchOnFailure: budget left reopens to ready (rework = next attempt, the failure reason rides as a gap); budget out blocks/abandons/ rejects per policy. A failed attempt consumes one budget unit (same as a fold failure), so a persistently failing agent parks at blocked, never loops.
- decomposition attempt → release the composite to draft so a new BeginDecomposition can re-mint (mirror ReapAttempt's decomposition branch).
A 0-row FinalizeAttempt means the attempt is no longer queued/running (already reaped/raced); the tx rolls back and the caller treats ErrInvalidTransition as a no-op (the goal already recovered).
func (*GoalService) Materialize ¶
func (s *GoalService) Materialize(ctx context.Context, qtx *sqlc.Queries, parent sqlc.AgentGoal, content DecompositionContent, childSessions map[string]string) error
Materialize creates a composite's children + edges from its proposed plan, in ONE caller-supplied tx (contract §6). It operates on the passed tx-bound querier — no import cycle, no own tx, no session minting (sessions are pre-minted outside the tx and resolved per child by the caller through the childSessions map). It is idempotent: the planned_at fence (MarkGoalPlanned CAS) short-circuits a second call, and each child's deterministic id makes re-insert a no-op.
Steps:
- depth guard: parent.depth+1 ≤ max_depth (root's convergence_policy), else ErrDepthExceeded.
- validate content (≥1 required child, edge keys resolve, no cycle), else ErrInvalidDecomposition / ErrCycle.
- for each ProposedChild: insert (skip-if-exists) a child goal (parent_id=composite, root_id=composite.root_id, depth=parent.depth+1, position=index, contract/policy from the proposal, lifecycle draft).
- for each ProposedEdge: insert agent_goal_edge (resolve keys→ids); PK collision = no-op.
- set parent required_total (count of required children); CAS planned_at.
Replan reconcile (cancelling/detaching children dropped by a later plan) is intentionally out of scope: a composite is decomposed exactly once (planned_at gates re-decomposition). Re-enabling replan is a tracked follow-up (contract §10).
The caller MUST hold LockGoalForWrite on parent so the planned_at CAS and child inserts cannot race a concurrent materialize.
childSessions maps a child Key → a pre-minted session_id. A missing entry is a programming error the caller (converge.go) prevents by minting all sessions before opening the tx.
func (*GoalService) ReapAttempt ¶
func (s *GoalService) ReapAttempt(ctx context.Context, attemptID string) error
ReapAttempt finalizes a stale (lease-expired) attempt as 'interrupted' and returns its goal to ready within the convergence budget, or budget-blocks it if exhausted — one tx (contract §2.2 running→interrupted). Idempotent: an attempt no longer queued/running is a no-op (FinalizeAttempt affects 0 rows), surfacing as ErrInvalidTransition the dispatcher ignores.
func (*GoalService) Reattempt ¶
Reattempt raises the budget on a blocked(budget_exhausted) goal so the next tick mints a fresh attempt. A leaf returns to ready (the dispatcher claims it); a composite, whose budget meters DECOMPOSITION attempts, returns to draft so scanAndDecompose re-plans it (contract §2.1, see recoveryLifecycle).
func (*GoalService) RecoverBlockedComposite ¶ added in v0.50.0
func (s *GoalService) RecoverBlockedComposite(ctx context.Context, id string) error
RecoverBlockedComposite re-evaluates a composite the rollup parked in blocked(dep) and wakes it back to active once its blocking children have cleared (contract §6). The rollup scans (ListRollupCandidates/ ListStalledComposites) only see active composites, so a parent driven to blocked(dep) — e.g. while a required child awaits plan approval or a verdict — never re-enters rollup on its own even after that child recovers. The dispatcher runs this over ListBlockedDepComposites each tick. Idempotent: a no-op while any required child is still blocked.
func (*GoalService) RejectPlan ¶ added in v0.50.0
RejectPlan applies a human rejection of a composite's proposed plan: it clears the plan and returns the composite to draft so the dispatcher re-decomposes it (contract §2.3). Only a composite blocked on plan approval AND not yet materialized (planned_at IS NULL) is accepted — replan after materialize is not supported (childID would collide; see materializer.go).
func (*GoalService) RollupAccept ¶
func (s *GoalService) RollupAccept(ctx context.Context, id string) error
RollupAccept runs a composite parent's own Accept gate once all required children are accepted (contract §6, RollupComposite ⇒ accept_parent). A trivial-contract composite accepts immediately; an authored contract folds via applyAcceptance. One tx; bumps this parent's own parent counter.
func (*GoalService) RollupFail ¶
func (s *GoalService) RollupFail(ctx context.Context, id string) error
RollupFail moves a composite to rejected_final because a required child reached a terminal-bad state (contract §6, RollupComposite ⇒ fail), bumping this parent's own parent required_failed counter. One tx.
func (*GoalService) SetClock ¶
func (s *GoalService) SetClock(c func() time.Time)
SetClock overrides the clock for tests.
func (*GoalService) Submit ¶
func (s *GoalService) Submit(ctx context.Context, attemptID string, ev AttemptEvidence, out AttemptOutput) error
Submit advances an attempt running→submitted, writes its evidence+output, and folds acceptance (contract §2.2, §5 steps 5-7). It is the worker's single durable call after running the executor (and, for deterministic items, the CheckRunner that appended its check events). An empty summary on a non-root goal is a retryable protocol miss (ErrInvalidEvidence). The fold runs in the SAME tx so submit→evaluate→transition is atomic.
func (*GoalService) SubmitDecomposition ¶ added in v0.50.0
func (s *GoalService) SubmitDecomposition(ctx context.Context, attemptID string, ev AttemptEvidence, content DecompositionContent) error
SubmitDecomposition applies a successful autonomous decomposition attempt: it records the produced plan on the composite goal, then branches on review policy (contract §2.3, §6). For review_policy=none it materializes the children and releases them so the tree runs; for review_policy=human it parks the composite blocked(needs_plan_approval) so a human can ApprovePlan/RejectPlan. It is the decomposition analogue of Submit and the single durable transition the worker applies for a purpose=decomposition attempt. For the none path child sessions are pre-minted OUTSIDE the tx (their own minting tx would self-deadlock against the held one); everything else runs in ONE tx so a crash never leaves a half-planned composite.
func (*GoalService) SubmitVerdict ¶
func (s *GoalService) SubmitVerdict(ctx context.Context, in VerdictInput) error
SubmitVerdict appends a human verdict event and re-runs applyAcceptance (contract §2.1 blocked(needs_verdict)→active/accepted). The verdict is evidence, never a state write.
func (*GoalService) Unarchive ¶
func (s *GoalService) Unarchive(ctx context.Context, id string) error
Unarchive clears the archived flag.
func (*GoalService) Unblock ¶
Unblock clears a recoverable block: blocked(dep)→ready (leaf) or →active/draft (composite, see recoveryLifecycle) when the condition cleared (contract §2.1).
func (*GoalService) UpdateMetadata ¶
func (s *GoalService) UpdateMetadata(ctx context.Context, id string, in UpdateInput) (sqlc.AgentGoal, error)
UpdateMetadata applies a partial metadata edit (contract §2 — metadata is mutable; lifecycle is not). It validates the value-sets it touches and returns the refreshed row. Lifecycle, counters, and projection are untouched.
type HumanVerdict ¶
type HumanVerdict struct {
GoalID string
ItemID string
Pass bool
Rationale string
Scope string
ScopeHash string // the accepted-output/artifact hash the verdict covers
ReviewerUserID string
AttemptID string // the evaluated attempt whose output the verdict judges
}
HumanVerdict is the API-delivered human decision for an authority=human judgment item (contract §4.2). The service validates and folds it into an acceptance_event; it never sets acceptance_state directly.
func (HumanVerdict) Valid ¶
func (v HumanVerdict) Valid() bool
Valid reports whether a human verdict is well-formed. A verdict must name the item it answers and the reviewer who authored it; an empty scope_hash is allowed (it forces re-request on any subsequent output, the conservative default) but the identity fields are mandatory.
type Option ¶
type Option func(*GoalService)
Option configures a GoalService at construction.
func WithCheckRunner ¶
func WithCheckRunner(r CheckRunner) Option
WithCheckRunner sets the deterministic check runner.
func WithConfig ¶
WithConfig overrides the service config (defaults filled for zero fields).
func WithPlanningSessionMinter ¶
func WithPlanningSessionMinter(m SessionMinter) Option
WithPlanningSessionMinter sets the decomposition planning-session minter.
func WithSessionMinter ¶
func WithSessionMinter(m SessionMinter) Option
WithSessionMinter sets the worker-session minter.
type Projection ¶
type Projection struct {
State string // pending | passed | failed
PendingItems []string // required items with no terminal (or only stale) event
Gaps Evaluation
NeedsVerdict bool // a required human-authority item is pending/stale
}
Projection is the derived acceptance view of a goal — the result of folding its acceptance ledger against its contract (contract §4.3). The service maps a Projection to exactly one lifecycle transition; nothing else produces a Projection or writes acceptance_state.
func DeriveAcceptance ¶
func DeriveAcceptance(c AcceptanceContract, currentOutputHash string, events []sqlc.AgentGoalAcceptanceEvent) Projection
DeriveAcceptance is the PURE fold that turns events → projection (contract §4.3). It is the single place acceptance is decided. It takes the latest VALID event per item — a judgment event is valid only if its scope_hash matches currentOutputHash (§4.2 staleness); a deterministic event is taken as authoritative for its item. Policy evaluation then collapses item outcomes to a state. No DB calls, no clock, no side effects.
type ProposedChild ¶
type ProposedChild struct {
Key string `json:"key"`
Title string `json:"title"`
Intent string `json:"intent"`
Kind string `json:"kind"` // leaf | composite
Required bool `json:"required"`
AcceptanceContract AcceptanceContract `json:"acceptance_contract"`
ConvergencePolicy ConvergencePolicy `json:"convergence_policy"`
ReviewPolicy string `json:"review_policy,omitempty"` // a composite child carries it
}
ProposedChild is one child goal a plan proposes. Key is the stable id within the plan and the materialize idempotency key (replaces the old plan_item_id).
type ProposedEdge ¶
type ProposedEdge struct {
DownstreamKey string `json:"downstream_key"`
UpstreamKey string `json:"upstream_key"`
Kind string `json:"kind"` // hard | soft
OnFailure string `json:"on_failure"` // block | fail | ignore
}
ProposedEdge is one sibling dependency a plan proposes, by child Key.
type Readiness ¶
Readiness is the computed dispatchability view of a goal at a moment. Pure function over (goal row, edges with upstream state pre-joined, now).
func Compute ¶
Compute returns the readiness of a goal given its upstream edges (with upstream lifecycle pre-joined) at time now. PURE: no DB calls, no side effects, no clock reads beyond now. An edge means an accepted-output dependency — a hard edge is satisfied only when the upstream is 'accepted' (or the edge is waived); a soft edge is advisory and never blocks.
now is reserved for future deferral gates (not_before equivalents); it is accepted now so the signature is stable when those land.
type Reason ¶
type Reason struct {
Type string // upstream_not_accepted, upstream_failed_block, soft_upstream_pending, ...
UpstreamID string
Detail string
}
Reason explains a readiness state, e.g. an unsatisfied upstream edge.
type Result ¶
type Result struct {
Action TerminalAction
Evidence AttemptEvidence
Output AttemptOutput
Decomposition *DecompositionContent // purpose=decomposition only
Blocker *Blocker
Failure *Failure
// RepairAttempted is set when a text-only first turn triggered one bounded
// repair turn that still produced no terminal action. It only carries meaning
// for terminalNone and lets the worker distinguish a silent miss from a
// failed repair.
RepairAttempted bool
}
Result is the executor's rich internal outcome for one attempt: exactly one terminal action (or terminalNone when the agent ended without declaring one). Execute folds this down to the frozen ExecutorResult the worker consumes.
type RollupVerdict ¶
type RollupVerdict string
RollupVerdict is the decision RollupComposite returns over a composite parent's incremental rollup counters. The service acts on exactly one.
const ( // RollupWait — required children still in flight; no transition (no-op). RollupWait RollupVerdict = "wait" // RollupAcceptParent — all required children accepted; run the parent's own // Accept gate (its contract; trivial ⇒ immediate accept). RollupAcceptParent RollupVerdict = "accept_parent" // RollupBlock — a required child is blocked; the parent surfaces the stall. RollupBlock RollupVerdict = "block" // RollupFail — a required child reached a terminal-bad state // (rejected_final/abandoned/cancelled); the requirement is permanently unmet. RollupFail RollupVerdict = "fail" )
func RollupComposite ¶
func RollupComposite(parent sqlc.AgentGoal) RollupVerdict
RollupComposite is the PURE decision over a composite parent's incremental required_* counters (contract §6). It never scans the subtree — the counters are bumped in the same tx that transitions each child, so the parent reads O(1) state. Precedence mirrors the goal-rollup table: a failed required child fails the parent; else a blocked one blocks it; else all-accepted accepts it; else wait.
A parent with no required children yet (required_total == 0) waits: the materializer may still be inserting children, and a vacuous "all required accepted" must never auto-accept a composite that has nothing to roll up.
type Service ¶
type Service struct {
Queries *sqlc.Queries
Goal *GoalService
Dispatcher *Dispatcher
// contains filtered or unexported fields
}
Service is the boot-level bundle the server + CLI bind to. It owns the GoalService (the single durable-state writer) and the dispatcher, and exposes the read+command surface the HTTP handlers call. Every mutating method delegates to GoalService so the "lifecycle is written only through a transition" invariant holds; reads go straight to the querier.
The goal subsystem does NOT own a River client: it contributes its queue + worker to the single process-wide client (RegisterRiverWorker / GoalQueueConfig) and receives that client back as its dispatcher's enqueuer (SetRiverClient). The client's Start/Stop lifecycle belongs to the composition root.
func Boot ¶
func Boot(cfg BootConfig) (*Service, error)
Boot constructs the goal system and returns the bound bundle. The dispatcher is built but not ticking; the composition root injects the shared client via SetRiverClient and the server registers the single-leader tick via StartDispatchTick.
(Named Boot, not New: the package's GoalService constructor already owns New(db, q, …). This is the bundle/wiring entry the server binds to.)
Wiring: the worker executor (agent-backed when Chat is non-nil, else a noop that fails non-retryably) plus the worker + planning session minters are registered on the GoalService; one Worker drives claimed attempts and the Dispatcher schedules the convergence loop over it.
func (*Service) AddEdge ¶
func (s *Service) AddEdge(ctx context.Context, downstreamID, upstreamID, kind, onFailure string) (sqlc.AgentGoalEdge, error)
AddEdge inserts an accepted-output dependency between siblings (cycle-checked).
func (*Service) ApprovePlan ¶ added in v0.50.0
ApprovePlan approves a composite's proposed plan (blocked(needs_plan_approval)), materializing its children and resuming the tree.
func (*Service) CountGoals ¶
CountGoals returns the total root goals matching the same filter as ListGoals — it drives the list's exact `total` and the active/history/archived header badges (three counts varying only terminal/archived).
func (*Service) CreateGoal ¶
CreateGoal mints a root goal (a goal) in 'draft', minting its worker session first. Children are created by Materialize, not this entry.
func (*Service) GetAttempt ¶
GetAttempt returns one attempt by id.
func (*Service) GetReadiness ¶
GetReadiness loads a goal + its upstream edges (with upstream lifecycle pre-joined) and returns the computed dispatchability view.
func (*Service) GoalQueueConfig ¶ added in v0.50.0
func (s *Service) GoalQueueConfig() (string, river.QueueConfig)
GoalQueueConfig returns the goal attempt queue name and per-node worker config for the composition root assembling the shared working client.
func (*Service) GoalTickQueueConfig ¶ added in v0.50.0
func (s *Service) GoalTickQueueConfig() (string, river.QueueConfig)
GoalTickQueueConfig returns the convergence-tick queue and its per-node worker config (one worker: the tick never overlaps itself on a node). The composition root adds it alongside GoalQueueConfig.
func (*Service) ListAcceptanceEvents ¶
func (s *Service) ListAcceptanceEvents(ctx context.Context, id string) ([]sqlc.AgentGoalAcceptanceEvent, error)
ListAcceptanceEvents returns the acceptance ledger for a goal, in fold (seq) order — the audit trail.
func (*Service) ListAttempts ¶
ListAttempts returns the execution attempts for a goal, newest first.
func (*Service) ListChildren ¶
ListChildren lists the direct children of a composite goal, in position order.
func (*Service) ListGoals ¶
func (s *Service) ListGoals(ctx context.Context, userID string, filter GoalFilter, limit, offset int64) ([]sqlc.AgentGoal, error)
ListGoals lists root goals (goals: parent_id IS NULL) for a user, narrowed by filter. Empty filter strings match all rows.
func (*Service) ListSubtree ¶
ListSubtree lists every goal in a tree (the whole root_id family).
func (*Service) RegisterRiverWorker ¶ added in v0.50.0
RegisterRiverWorker registers the goal subsystem's workers into a shared workers bundle used to build the process-wide River client: the attempt executor and the convergence-tick worker (River Phase 2b). Call before building the client (composition root).
func (*Service) RejectPlan ¶ added in v0.50.0
RejectPlan rejects a composite's proposed plan, returning it to draft for the dispatcher to re-decompose.
func (*Service) SetRiverClient ¶ added in v0.50.0
SetRiverClient injects the shared working River client: it becomes the dispatcher's enqueuer and the target StartDispatchTick registers the periodic against. Call after the client is built and before StartDispatchTick.
func (*Service) StartDispatchTick ¶ added in v0.50.0
func (s *Service) StartDispatchTick() (rivertype.PeriodicJobHandle, error)
StartDispatchTick registers the convergence tick as a single-leader River periodic job, replacing the per-node in-process ticker (River Phase 2b). River enqueues a periodic only on the elected leader and ByState uniqueness keeps at most one tick live, so the cluster runs a single convergence loop; any node's tick worker may run a fired tick. RunOnStart fires an immediate tick on (re-)election so convergence resumes promptly after a failover or cold start rather than waiting a full interval (the cost is one extra idempotent pass on failover). Returns the handle for StopDispatchTick. Requires SetRiverClient first.
func (*Service) StopDispatchTick ¶ added in v0.50.0
func (s *Service) StopDispatchTick(handle rivertype.PeriodicJobHandle)
StopDispatchTick removes the convergence-tick periodic so no further ticks are enqueued. In-flight ticks drain when the shared client stops.
func (*Service) SubmitVerdict ¶
func (s *Service) SubmitVerdict(ctx context.Context, in VerdictInput) error
SubmitVerdict appends a human verdict event and re-folds acceptance.
func (*Service) UpdateGoal ¶
func (s *Service) UpdateGoal(ctx context.Context, id string, in UpdateInput) (sqlc.AgentGoal, error)
UpdateGoal applies a partial metadata edit (PATCH).
type SessionMinter ¶
SessionMinter returns a fresh durable session id for a goal's persistent agent session (worker) or a planning session (decomposition). It is called OUTSIDE every tx: minting a session opens its own transaction, and running it inside the service tx would self-deadlock (the inner write blocks on a row the outer tx holds) and pin a pooled connection. Implementations land in the integration phase (session.go).
func RegistryPlanningSessionMinter ¶
func RegistryPlanningSessionMinter(sm agent.ServiceManager) SessionMinter
RegistryPlanningSessionMinter mints the dedicated session a composite is decomposed in. Unlike worker sessions it uses KindDelegate/ChannelDelegate so the owning agent can resume it through the delegate tool and the user can re-open it from the UI to refine the decomposition by chatting (#525 planning sessions). Like the worker minter it runs OUTSIDE every service tx. Satisfies SessionMinter.
func RegistrySessionMinter ¶
func RegistrySessionMinter(sm agent.ServiceManager) SessionMinter
RegistrySessionMinter mints a goal's worker (execution) session through the agent registry. The session is hidden — KindTask/ChannelTask — so it never appears in user-facing session lists or review candidates, matching the old tasks worker. It is called OUTSIDE every service tx (minting opens its own transaction; running it inside the service tx would self-deadlock and pin a pooled connection). Satisfies SessionMinter.
type SessionResolver ¶
SessionResolver returns the live sandbox session a goal's deterministic checks run in. The session is minted OUTSIDE any service tx (sandbox boot opens its own writer; see service.go's SessionMinter note) and resolved here by the worker. Returning a nil session with a nil error means "no sandbox for this goal" — the runner surfaces that as ErrNoSandbox rather than panicking on a nil Exec.
type TaskChatFunc ¶
type TaskChatFunc func(ctx context.Context, p TaskChatParams) <-chan agent.Event
TaskChatFunc runs one worker turn through the agent service layer so the transcript persists to the goal's session and prior turns load as history. An unknown agent surfaces as an Err event on the returned channel.
type TaskChatParams ¶
type TaskChatParams struct {
AgentID string
UserID string
SessionID string
ProjectID string
Prompt string
// Decompose routes the turn to the decomposition planning session
// (KindDelegate) instead of the worker session (KindTask). Set for
// purpose=decomposition attempts; the two session kinds resolve differently.
Decompose bool
ExtraTools []tools.Tool
}
TaskChatParams is the worker-turn request passed to BootConfig.Chat. It mirrors the old tasks.TaskChatParams so the stellad wiring carries over verbatim: the callback resolves AgentID to an agent service and runs one persisted turn in the goal's session. executor.go consumes this type; it is declared here because BootConfig.Chat is its only producer.
type TerminalAction ¶
type TerminalAction string
TerminalAction is the durable outcome an executor reports for one attempt. The worker maps it to exactly one service transition; the agent never mutates goal/attempt state directly (the contract's non-negotiable).
type UpdateInput ¶
type UpdateInput struct {
Title *string
Intent *string
Priority *string
ReviewPolicy *string
Contract *AcceptanceContract
Convergence *ConvergencePolicy
}
UpdateInput is the mutable metadata of a goal (PATCH). A nil pointer leaves that field unchanged; the writer reads current values and overlays the provided ones so a partial edit never clobbers untouched columns.
type VerdictInput ¶
type VerdictInput struct {
GoalID string
ItemID string
Result string // pass | fail
Rationale string
Scope string
ScopeHash string // the accepted-output/artifact hash the verdict covers
ReviewerUserID string
}
VerdictInput is a human judgment-as-evidence (contract §4.2). The handler appends it as an acceptance_event and re-folds; it never sets acceptance_state.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker runs ONE claimed execution/decomposition attempt to its single durable transition. It is the only place the executor (agent IO) and the CheckRunner (sandbox IO) run; both are pure with respect to durable state. The worker applies EXACTLY ONE service transition at the boundary (contract §5 steps 4–7): submit→fold on a submitted attempt, or a finalize-failed on a failed one. Nothing the worker does writes acceptance_state/counters — only the service, through Submit→applyAcceptance, does.
func NewWorker ¶
func NewWorker(svc *GoalService, q *sqlc.Queries) *Worker
NewWorker wires a worker against a service. The executor and check runner default to the ones registered on the service (WithExecutor/WithCheckRunner) so the dispatcher can spawn workers with a single dependency.
func (*Worker) Run ¶
Run drives one claimed attempt. Responsibilities:
- promote the attempt queued→running with started_at + initial lease (a zero-row promote means the attempt was already interrupted/cancelled out from under us — abort with ErrInvalidTransition so a superseded attempt never executes or applies a transition);
- keep lease_expires_at fresh while the executor runs (heartbeat);
- run the executor, then for a submitted attempt run the deterministic CheckRunner, append the results as acceptance_events via the service, and apply the single submit transition (service.Submit → applyAcceptance);
- for a failed (or no-action) attempt apply the single finalize-failed transition so convergence can mint the next attempt or block;
- turn an executor panic into a non-retryable failure.
func (*Worker) SetHeartbeat ¶
SetHeartbeat overrides the heartbeat interval (for tests).
type WorkerRunner ¶
type WorkerRunner interface {
// Run executes the given (already-claimed, queued) attempt to a terminal
// attempt state, folding acceptance through the service. It must be safe to
// call concurrently for distinct attempts.
Run(ctx context.Context, goalID, attemptID string) error
}
WorkerRunner drives one claimed attempt to completion: promote → execute → run checks → apply the single fold transition. The dispatcher owns the concurrency budget and lifetime; the worker (worker.go, integration phase) owns the per-attempt machinery. Declared here so the dispatcher compiles against the boundary without depending on the worker implementation — the same Executor/CheckRunner-style decoupling the spine uses for the service's collaborators.