Documentation ¶
Overview ¶
Package runner is the orchestration layer that ties the Phase-4 substrate together (PRD 1.5 resume + 1.8 lifecycle + 1.8.5 search). The Runner drives the agent turn-by-turn and persists each turn via conversations.Store. It is the SOLE writer of aura.paused_states: it observes the Actions.AwaitingInput pause Event and inserts the rows. It resolves resumes as a FRESH agent.Run over rehydrated history (SC-4: no silent LLM re-run, no duplicate ask_user tool_call), and it owns the goleak-clean auto-title WaitGroup (D-A5-01).
The Runner is NOT an agent.Agent and must NOT collide with internal/agent/workflow.LoopAgent (a control-flow Agent) — it is an orchestrator in the ADK-Go `Runner` sense (AM-03). It consumes CONSUMER-SIDE narrow interfaces declared here (D-A2-02, "accept interfaces, return structs"): only the methods it calls, satisfied implicitly by the concrete *conversations.Store / *askuser.Store / *identity.Store. Unit tests pass hand-written in-memory fakes (no DB → supports the 85% coverage floor); db_integration tests use the real Stores.
Index ¶
- Variables
- func WithThreadLockHeld(ctx context.Context) context.Context
- type CacheMetricStore
- type ContextBlockProvider
- type ConversationStore
- type Deps
- type IdentityStore
- type PauseStore
- type ResponseInput
- type ResumeHook
- type Runner
- func (r *Runner) CloseLearner()
- func (r *Runner) EnsureConversation(ctx context.Context, convID string) error
- func (r *Runner) NewConversation(ctx context.Context) (string, error)
- func (r *Runner) NewConversationWithID(ctx context.Context, conversationID string) (string, error)
- func (r *Runner) PendingFor(ctx context.Context, conversationID string) ([]askuser.Pending, error)
- func (r *Runner) Stop(ctx context.Context, convID string) error
- func (r *Runner) SubmitAnswer(ctx context.Context, token string, resp ResponseInput) (int, error)
- func (r *Runner) SubmitAnswers(ctx context.Context, answers map[string]ResponseInput) (int, error)
- func (r *Runner) TryLockThread(convID string) (func(), bool)
- func (r *Runner) Turn(ctx context.Context, convID string, userMsg *string) iter.Seq2[*agent.Event, error]
- func (r *Runner) TurnBranch(ctx context.Context, convID string, leafSeq int) iter.Seq2[*agent.Event, error]
- type ToolInvocationStore
Constants ¶
This section is empty.
Variables ¶
var ErrThreadBusy = errors.New("thread already has an in-flight run")
ErrThreadBusy reports that a caller tried to start a second concurrent run for the same conversation through a non-blocking channel such as AG-UI.
Functions ¶
Types ¶
type CacheMetricStore ¶
type CacheMetricStore interface {
Insert(ctx context.Context, p sqlc.InsertCacheMetricParams) error
}
CacheMetricStore is the narrow aura.cache_metrics surface the Runner consumes (D-A2-02). *cachemetrics.Store satisfies it implicitly. The Runner writes ONE append-only metric row per completed assistant turn from the already-computed llm.Usage (D-02/D-02a). It is intentionally Insert-only — the window reads (ListSince/AggregateSince) are consumed by `aura cache-stats` in 06-04, not by the turn loop, so they are not part of the surface the Runner depends on.
type ContextBlockProvider ¶
ContextBlockProvider renders identity-aware context for messages[1]. The Runner composes its output before the legacy AlwaysBlock skill renderer.
type ConversationStore ¶
type ConversationStore interface {
Create(ctx context.Context, p conversations.CreateParams) (conversations.Conversation, error)
Get(ctx context.Context, conversationID string) (conversations.Conversation, error)
List(ctx context.Context, includeArchived bool) ([]conversations.Conversation, error)
UpdateStatus(ctx context.Context, conversationID, status string) error
Rename(ctx context.Context, conversationID, title string) error
SetTitleIfNull(ctx context.Context, conversationID, title string) error
CountTurns(ctx context.Context, conversationID string) (int, error)
AppendTurn(ctx context.Context, p conversations.AppendTurnParams) error
AppendAssistantTurnWithCacheMetric(ctx context.Context, p conversations.AppendTurnParams, metric sqlc.InsertCacheMetricParams) error
LoadHistory(ctx context.Context, conversationID string) ([]llm.Message, error)
LoadManagedHistory(ctx context.Context, conversationID string, cfg conversations.ContextConfig) ([]llm.Message, error)
// LoadManagedHistoryForBranch is the D-09 / CHAT-05 path-aware variant: it walks the
// SELECTED branch leaf->root and feeds that into the same ladder. TurnBranch uses it
// for the re-run-from-a-point; *conversations.Store satisfies it (plan 25-06).
LoadManagedHistoryForBranch(ctx context.Context, conversationID string, leafSeq int, cfg conversations.ContextConfig) ([]llm.Message, error)
SearchConversationTurns(ctx context.Context, query string, limit int) ([]conversations.SearchResult, error)
Delete(ctx context.Context, conversationID string) error
}
ConversationStore is the narrow conversation surface the Runner consumes (D-A2-02). *conversations.Store satisfies it implicitly. LoadManagedHistory returns the L1/L2/L2.5-ladder-applied history; LoadHistory is the raw byte-identical reconstruction (used by the auto-title worker so the title sees the full opening turns, not the compacted view).
type Deps ¶
type Deps struct {
Conv ConversationStore
Pause PauseStore
Identity IdentityStore
CacheMetrics CacheMetricStore
ToolInvocations ToolInvocationStore
Client llm.Client
Registry *tools.Registry
LLM llm.Config
// Breaker is the SHARED process-lifetime LLM circuit breaker (B-05). The
// composition root may inject one; nil => New mints the default. It is threaded
// into every per-turn agent so a provider outage trips cross-turn protection.
Breaker *llm.Breaker
RunDir string
PreviewCap int
EvictAfter int // AURA_CONTEXT_TOOL_EVICT_AFTER_TURNS — L1 eviction age
Workspace string // shell workspace announced per turn (#52/D-41); "" → the process cwd
TitleTimeout time.Duration
StopTimeout time.Duration
// ContextBlock renders identity-aware context for messages[1]; the Runner composes
// its output before the AlwaysBlock skill renderer.
ContextBlock ContextBlockProvider
// AlwaysBlock renders the messages[1] always-on skill block per turn from current
// loader state (D-07). The composition root wires it over skills.RenderAlwaysBlock
// + the live loader; nil means no skills are wired (the block is empty). Rebuilt
// every turn so a skill add/remove changes messages[1] without busting messages[0].
AlwaysBlock func() string
ResumeHook ResumeHook
// Embedder wires the local embedding-based reasoning-tier classifier into
// each per-turn agent (replaces the LLM router round-trip). nil => the agent
// falls back to the LLM router. The composition root passes the granite
// sidecar client (documents.EmbeddingClient over Neo4j.EmbedURL).
Embedder prompt.Embedder
// ExampleStore folds oracle-labeled examples (Neo4j :ReasoningExample) into
// the classifier's centroids (self-improvement, spike 053). nil => seed-only.
ExampleStore prompt.ExampleStore
// ReasoningSaver persists new oracle-labeled examples for the async learner
// (the write half of self-improvement). Enabled only when LLM.ReasoningLearning
// is set; nil => no learning. The composition root passes the same Neo4j store
// as ExampleStore.
ReasoningSaver reasoninglearn.Saver
// HookManager is the optional agent extension surface. nil keeps the agent's
// hook calls as no-ops; production may inject command hooks here.
HookManager *agent.HookManager
// ToolSelectSaver persists oracle-confirmed (query -> tool) examples for the
// tool-selection active-learning loop (D-06/D-07) to :ToolSelectionExample. nil =>
// the loop is off (the tool_search ranker runs without the learned stage-2 boost).
// The composition root passes a *toolselectstore.Store over the same Neo4j client
// as ReasoningSaver, when the graph client opened.
ToolSelectSaver toolselectlearn.Saver
}
Deps are the Runner's constructor inputs: the consumer-side Stores (narrow interfaces, D-A2-02), the LLM client + tool registry the fresh per-round LlmAgent is built from (D-A1-05/Pattern-4), the llm.Config (model + L2 context window inputs), the bounded worker timeouts, and the optional hooks and learning components documented on the fields above.
type IdentityStore ¶
type IdentityStore interface {
GetIdentityByName(ctx context.Context, name string) (identity.Identity, error)
GetIdentityByID(ctx context.Context, identityID string) (identity.Identity, error)
}
IdentityStore is the narrow identity surface the Runner consumes (D-A2-02). *identity.Store satisfies it implicitly. The Runner looks up the owning identity when it creates a new conversation (single-user `local` scaffolding, Slice 1.7).
type PauseStore ¶
type PauseStore interface {
Insert(ctx context.Context, p askuser.InsertParams) error
GetByToken(ctx context.Context, token string) (askuser.Pending, error)
ListPending(ctx context.Context, conversationID string) ([]askuser.Pending, error)
MarkResumed(ctx context.Context, token string, ans askuser.ResumeAnswer) error
MarkResumedBatch(ctx context.Context, answers map[string]askuser.ResumeAnswer) error
AutoResolveForConversation(ctx context.Context, conversationID string) error
}
PauseStore is the narrow paused_states surface the Runner consumes (D-A2-02). *askuser.Store satisfies it implicitly. The Runner is the SOLE caller of Insert (T-04-19: only the Runner writes paused_states, on a pause Event).
type ResponseInput ¶
ResponseInput is the CLI/caller-facing resume payload (the MCP three-action model, D-A3-01 / AM-02). It maps to askuser.ResumeAnswer at the Store boundary.
type ResumeHook ¶
ResumeHook is called after a paused ask_user response is persisted and before the runner resumes the pending turn.
type Runner ¶
type Runner struct {
Conv ConversationStore
// contains filtered or unexported fields
}
Runner is the orchestration layer (D-A1-01): it drives the agent turn-by-turn, persists each turn, is the SOLE writer of paused_states, resolves resumes as a FRESH agent.Run over rehydrated history (SC-4), and owns the auto-title WaitGroup. It is NOT an agent.Agent and does not collide with workflow.LoopAgent (AM-03). The exported Conv field lets the composition root / CLI read conversations directly (list/search/lifecycle) without re-plumbing the narrow interface; pause/title orchestration stays in the Runner.
func New ¶
New builds a Runner over the supplied dependencies, applying the timeout defaults when a caller leaves them zero.
func (*Runner) CloseLearner ¶
func (r *Runner) CloseLearner()
CloseLearner stops the async self-improvement workers (reasoning + tool-selection), if any. The composition root calls it at process shutdown (nil-safe).
func (*Runner) EnsureConversation ¶
EnsureConversation lazily creates the conversation row when it is absent and is a no-op when it already exists. Channels that key a stable conversation id off an external identifier (e.g. a Telegram chat id via a deterministic UUIDv5) have no explicit "new conversation" step like the REPL, so the first inbound message must create the row before Turn appends to it (appendUserTurn's AppendTurn FK-references the conversation). A Get short-circuits the common path; a concurrent first-message race that loses the Create is reconciled by a re-Get rather than surfaced as an error.
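The Get, Create, re-Get sequence can be sketched like this. It is an illustration under assumptions, not the Runner's implementation: memStore, ensure, and both sentinel errors are hypothetical, and the real store reports a lost Create in whatever way its database driver does.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errNotFound = errors.New("conversation not found")
	errExists   = errors.New("conversation already exists")
)

// memStore is a hypothetical in-memory stand-in for the conversation store.
type memStore struct {
	mu      sync.Mutex
	rows    map[string]bool
	creates int // successful creates, kept for inspection
}

func (s *memStore) get(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.rows[id] {
		return errNotFound
	}
	return nil
}

func (s *memStore) create(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.rows[id] {
		return errExists
	}
	s.rows[id] = true
	s.creates++
	return nil
}

// ensure creates the row if absent. Losing a concurrent create is reconciled
// by a second get instead of being returned as an error.
func ensure(s *memStore, id string) error {
	err := s.get(id)
	if err == nil {
		return nil // common path: the row already exists
	}
	if !errors.Is(err, errNotFound) {
		return err
	}
	createErr := s.create(id)
	if createErr == nil {
		return nil
	}
	if s.get(id) == nil {
		return nil // another caller won the race; the row is there
	}
	return createErr
}

func main() {
	s := &memStore{rows: make(map[string]bool)}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := ensure(s, "chat-42"); err != nil {
				fmt.Println("unexpected error:", err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("rows created:", s.creates) // rows created: 1
}
```

Eight concurrent first messages all succeed, yet only one row is created, which is the behaviour a channel keyed off an external id needs.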
func (*Runner) NewConversation ¶
NewConversation creates a new active conversation owned by the seeded `local` identity and returns its id. The composition root / `aura chat new` calls it.
func (*Runner) NewConversationWithID ¶
NewConversationWithID creates a conversation with a caller-supplied id (the REPL mints the id so it can key the sidecar dir before the row exists).
func (*Runner) PendingFor ¶
PendingFor returns the still-unresolved pauses for a conversation ordered by priority, FIFO within each priority (priority DESC, created_at ASC). The REPL reads it to render each prompt inline.
func (*Runner) Stop ¶
Stop terminates the conversation lifecycle (D-A1-06 / Req#11): it auto-resolves every orphan pending (zero unresolved rows after) and joins the auto-title WaitGroup with a bounded wait so a hung worker cannot wedge shutdown (goleak-clean, D-A5-01). The wg.Wait() is the sync point tests hit so goleak sees no leak.
func (*Runner) SubmitAnswer ¶
SubmitAnswer resolves ONE pending pause with the three-action model (D-A3-01) and injects the answer into the conversation history so the next Turn(convID, nil) drives a fresh round over a wire-valid history (SC-4). It returns the remaining unresolved-pending count; the caller continues once remaining==0.
- accept → the answer is injected as RoleTool{ToolCallID:<original>, Content}.
- decline → a "user declined" RoleTool is injected so the model adapts.
- cancel → the turn is aborted via the Stop auto-resolve path (no injection); the caller should treat a cancel as a turn termination.
func (*Runner) SubmitAnswers ¶
SubmitAnswers resolves MANY pending pauses atomically (one MarkResumedBatch) and injects each as a RoleTool answer. Cancel actions short-circuit to the Stop auto-resolve path for the whole conversation. Returns the remaining count.
func (*Runner) TryLockThread ¶
TryLockThread attempts to acquire the per-conversation run lock without blocking. HTTP transports use it to return 409 instead of queueing an SSE run behind an already-active request.
func (*Runner) Turn ¶
func (r *Runner) Turn(ctx context.Context, convID string, userMsg *string) iter.Seq2[*agent.Event, error]
Turn drives ONE LLM round over the conversation and is the sole loop-driver (D-A1-06). userMsg!=nil starts a round with a fresh user message; userMsg=nil is "continue after resume" — the resolved answers are already RoleTool turns in the persisted history (SubmitAnswer wrote them), so Turn just re-runs a fresh agent over the rehydrated history (SC-4: no silent re-run, the answer pair is already in the messages). On an Actions.AwaitingInput Event it writes N paused_states rows (SOLE writer, T-04-19) and stops the loop while >=1 stays unresolved.
Resume = a FRESH agent.Run over rehydrated history (D-A1-05): a range-over-func iterator cannot be suspended, so durability is entirely in the Stores. The yield-after-false guard is honored (never yield again once the consumer returns false).
func (*Runner) TurnBranch ¶ added in v1.0.0
func (r *Runner) TurnBranch(ctx context.Context, convID string, leafSeq int) iter.Seq2[*agent.Event, error]
TurnBranch is the D-09 / CHAT-05 re-run-from-a-point primitive: it drives a fresh agent round over the SELECTED branch path (leafSeq) instead of the full linear history, with no fresh user message (continue-after-resume semantics, userMsg=nil) — the new branch's turns were already persisted by ForkBranch. The messages[0] head is byte-identical to the linear case (LoadManagedHistoryForBranch preserves the protected head; only body turns differ per branch — the CAP-04 cache invariant). leafSeq <= 0 falls back to the canonical branch leaf (the same history Turn would load).
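The leaf-to-root walk and the leafSeq <= 0 fallback can be illustrated as follows. The storage model is an assumption made for this sketch: turns are keyed by sequence number with a ParentSeq link, and the canonical leaf is passed in explicitly. How conversations.Store actually records branches is not described here.

```go
package main

import "fmt"

// turn is a hypothetical persisted turn with a parent link (0 marks the root's parent).
type turn struct {
	Seq       int
	ParentSeq int
	Text      string
}

// branchPath walks parent links from the selected leaf to the root, then
// reverses the result into root-first order. leafSeq <= 0 falls back to
// canonicalLeaf.
func branchPath(turns map[int]turn, leafSeq, canonicalLeaf int) []turn {
	if leafSeq <= 0 {
		leafSeq = canonicalLeaf
	}
	var path []turn
	for seq := leafSeq; seq > 0; {
		t, ok := turns[seq]
		if !ok {
			break // dangling link: stop with what we have
		}
		path = append(path, t)
		if t.ParentSeq >= seq {
			break // guard against cycles in malformed data
		}
		seq = t.ParentSeq
	}
	for i, j := 0, len(path)-1; i < j; i, j = i+1, j-1 {
		path[i], path[j] = path[j], path[i]
	}
	return path
}

func main() {
	turns := map[int]turn{
		1: {Seq: 1, ParentSeq: 0, Text: "user: hello"},
		2: {Seq: 2, ParentSeq: 1, Text: "assistant: hi"},
		3: {Seq: 3, ParentSeq: 2, Text: "user: plan A"},
		4: {Seq: 4, ParentSeq: 1, Text: "user: forked question"},
	}
	for _, t := range branchPath(turns, 4, 3) {
		fmt.Println(t.Seq, t.Text)
	}
	// Prints turns 1 and 4: the fork shares the root and skips turns 2 and 3.
	fmt.Println(len(branchPath(turns, 0, 3))) // 3
}
```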
type ToolInvocationStore ¶
type ToolInvocationStore interface {
Insert(ctx context.Context, e toolinvocations.Event) error
}
ToolInvocationStore is the narrow append-only tool ledger surface the Runner consumes. It records start/end facts emitted by the agent around dispatched tool execution.