runner

package
v1.0.1 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 20, 2026 License: MIT Imports: 28 Imported by: 0

Documentation

Overview

Package runner is the orchestration layer that ties the Phase-4 substrate together (PRD 1.5 resume + 1.8 lifecycle + 1.8.5 search): the Runner drives the agent turn-by-turn, persists each turn via conversations.Store, is the SOLE writer of aura.paused_states (it observes the Actions.AwaitingInput pause Event and inserts the rows), resolves resumes as a FRESH agent.Run over rehydrated history (SC-4: no silent LLM re-run, no duplicate ask_user tool_call), and owns the goleak-clean auto-title WaitGroup (D-A5-01).

The Runner is NOT an agent.Agent and must NOT collide with internal/agent/workflow.LoopAgent (a control-flow Agent); it is an orchestrator in the ADK-Go `Runner` sense (AM-03). It consumes CONSUMER-SIDE narrow interfaces declared here (D-A2-02, "accept interfaces, return structs"): each declares only the methods the Runner calls and is satisfied implicitly by the concrete *conversations.Store / *askuser.Store / *identity.Store. Unit tests pass in hand-written in-memory fakes (no database, which helps the package meet its 85% coverage floor); db_integration tests use the real Stores.

Index

Constants

This section is empty.

Variables

var ErrThreadBusy = errors.New("thread already has an in-flight run")

ErrThreadBusy reports that a caller tried to start a second concurrent run for the same conversation through a non-blocking channel such as AG-UI.

Functions

func WithThreadLockHeld

func WithThreadLockHeld(ctx context.Context) context.Context

WithThreadLockHeld marks a context whose caller already owns the per-thread runner lock. It lets HTTP gateways reject busy threads up front without making Runner.Turn take the same lock twice.

Types

type CacheMetricStore

type CacheMetricStore interface {
	Insert(ctx context.Context, p sqlc.InsertCacheMetricParams) error
}

CacheMetricStore is the narrow aura.cache_metrics surface the Runner consumes (D-A2-02). *cachemetrics.Store satisfies it implicitly. The Runner writes ONE append-only metric row per completed assistant turn from the already-computed llm.Usage (D-02/D-02a). It is intentionally Insert-only — the window reads (ListSince/AggregateSince) are consumed by `aura cache-stats` in 06-04, not by the turn loop, so they are not part of the surface the Runner depends on.

type ContextBlockProvider

type ContextBlockProvider func(ctx context.Context, owner identity.Identity) string

ContextBlockProvider renders identity-aware context for messages[1]. The Runner composes its output before the legacy AlwaysBlock skill renderer.

type ConversationStore

type ConversationStore interface {
	Create(ctx context.Context, p conversations.CreateParams) (conversations.Conversation, error)
	Get(ctx context.Context, conversationID string) (conversations.Conversation, error)
	List(ctx context.Context, includeArchived bool) ([]conversations.Conversation, error)
	UpdateStatus(ctx context.Context, conversationID, status string) error
	Rename(ctx context.Context, conversationID, title string) error
	SetTitleIfNull(ctx context.Context, conversationID, title string) error
	CountTurns(ctx context.Context, conversationID string) (int, error)
	AppendTurn(ctx context.Context, p conversations.AppendTurnParams) error
	AppendAssistantTurnWithCacheMetric(ctx context.Context, p conversations.AppendTurnParams, metric sqlc.InsertCacheMetricParams) error
	LoadHistory(ctx context.Context, conversationID string) ([]llm.Message, error)
	LoadManagedHistory(ctx context.Context, conversationID string, cfg conversations.ContextConfig) ([]llm.Message, error)
	// LoadManagedHistoryForBranch is the D-09 / CHAT-05 path-aware variant: it walks the
	// SELECTED branch leaf->root and feeds that into the same ladder. TurnBranch uses it
	// for the re-run-from-a-point; *conversations.Store satisfies it (plan 25-06).
	LoadManagedHistoryForBranch(ctx context.Context, conversationID string, leafSeq int, cfg conversations.ContextConfig) ([]llm.Message, error)
	SearchConversationTurns(ctx context.Context, query string, limit int) ([]conversations.SearchResult, error)
	Delete(ctx context.Context, conversationID string) error
}

ConversationStore is the narrow conversation surface the Runner consumes (D-A2-02). *conversations.Store satisfies it implicitly. LoadManagedHistory returns the L1/L2/L2.5-ladder-applied history; LoadHistory is the raw byte-identical reconstruction (used by the auto-title worker so the title sees the full opening turns, not the compacted view).

type Deps

type Deps struct {
	Conv            ConversationStore
	Pause           PauseStore
	Identity        IdentityStore
	CacheMetrics    CacheMetricStore
	ToolInvocations ToolInvocationStore
	Client          llm.Client
	Registry        *tools.Registry
	LLM             llm.Config
	// Breaker is the SHARED process-lifetime LLM circuit breaker (B-05). The
	// composition root may inject one; nil => New mints the default. It is threaded
	// into every per-turn agent so a provider outage trips cross-turn protection.
	Breaker      *llm.Breaker
	RunDir       string
	PreviewCap   int
	EvictAfter   int    // AURA_CONTEXT_TOOL_EVICT_AFTER_TURNS — L1 eviction age
	Workspace    string // shell workspace announced per turn (#52/D-41); "" → the process cwd
	TitleTimeout time.Duration
	StopTimeout  time.Duration
	// ContextBlock renders identity-aware context for messages[1]; the Runner
	// composes its output before the AlwaysBlock skill block.
	ContextBlock ContextBlockProvider
	// AlwaysBlock renders the messages[1] always-on skill block per turn from current
	// loader state (D-07). The composition root wires it over skills.RenderAlwaysBlock
	// + the live loader; nil means no skills are wired (the block is empty). Rebuilt
	// every turn so a skill add/remove changes messages[1] without busting messages[0].
	AlwaysBlock  func() string
	ResumeHook   ResumeHook
	// Embedder wires the local embedding-based reasoning-tier classifier into
	// each per-turn agent (replaces the LLM router round-trip). nil => the agent
	// falls back to the LLM router. The composition root passes the granite
	// sidecar client (documents.EmbeddingClient over Neo4j.EmbedURL).
	Embedder prompt.Embedder
	// ExampleStore folds oracle-labeled examples (Neo4j :ReasoningExample) into
	// the classifier's centroids (self-improvement, spike 053). nil => seed-only.
	ExampleStore prompt.ExampleStore
	// ReasoningSaver persists new oracle-labeled examples for the async learner
	// (the write half of self-improvement). Enabled only when LLM.ReasoningLearning
	// is set; nil => no learning. The composition root passes the same Neo4j store
	// as ExampleStore.
	ReasoningSaver reasoninglearn.Saver
	// HookManager is the optional agent extension surface. nil keeps the agent's
	// hook calls as no-ops; production may inject command hooks here.
	HookManager *agent.HookManager
	// ToolSelectSaver persists oracle-confirmed (query -> tool) examples for the
	// tool-selection active-learning loop (D-06/D-07) to :ToolSelectionExample. nil =>
	// the loop is off (the tool_search ranker runs without the learned stage-2 boost).
	// The composition root passes a *toolselectstore.Store over the same Neo4j client
	// as ReasoningSaver, when the graph client opened.
	ToolSelectSaver toolselectlearn.Saver
}

Deps holds the Runner's constructor inputs: the consumer-side Stores (narrow interfaces, D-A2-02), the LLM client and tool registry from which each fresh per-round LlmAgent is built (D-A1-05/Pattern-4), the llm.Config (model and L2 context-window inputs), the bounded worker timeouts, and the optional hooks and learners that are disabled when left nil.

type IdentityStore

type IdentityStore interface {
	GetIdentityByName(ctx context.Context, name string) (identity.Identity, error)
	GetIdentityByID(ctx context.Context, identityID string) (identity.Identity, error)
}

IdentityStore is the narrow identity surface the Runner consumes (D-A2-02). *identity.Store satisfies it implicitly. The Runner looks up the owning identity when it creates a new conversation (single-user `local` scaffolding, Slice 1.7).

type PauseStore

type PauseStore interface {
	Insert(ctx context.Context, p askuser.InsertParams) error
	GetByToken(ctx context.Context, token string) (askuser.Pending, error)
	ListPending(ctx context.Context, conversationID string) ([]askuser.Pending, error)
	MarkResumed(ctx context.Context, token string, ans askuser.ResumeAnswer) error
	MarkResumedBatch(ctx context.Context, answers map[string]askuser.ResumeAnswer) error
	AutoResolveForConversation(ctx context.Context, conversationID string) error
}

PauseStore is the narrow paused_states surface the Runner consumes (D-A2-02). *askuser.Store satisfies it implicitly. The Runner is the SOLE caller of Insert (T-04-19: only the Runner writes paused_states, on a pause Event).

type ResponseInput

type ResponseInput struct {
	Action  string // accept | decline | cancel
	Content string
}

ResponseInput is the CLI/caller-facing resume payload (the MCP three-action model, D-A3-01 / AM-02). It maps to askuser.ResumeAnswer at the Store boundary.

type ResumeHook

type ResumeHook func(ctx context.Context, pending askuser.Pending, resp ResponseInput) error

ResumeHook is called after a paused ask_user response is persisted and before the runner resumes the pending turn.

type Runner

type Runner struct {
	Conv ConversationStore
	// contains filtered or unexported fields
}

Runner is the orchestration layer (D-A1-01): it drives the agent turn-by-turn, persists each turn, is the SOLE writer of paused_states, resolves resumes as a FRESH agent.Run over rehydrated history (SC-4), and owns the auto-title WaitGroup. It is NOT an agent.Agent and does not collide with workflow.LoopAgent (AM-03). The exported Conv field lets the composition root / CLI read conversations directly (list/search/lifecycle) without re-plumbing the narrow interface; pause/title orchestration stays in the Runner.

func New

func New(d Deps) *Runner

New builds a Runner over the supplied dependencies, applying the timeout defaults when a caller leaves them zero.

func (*Runner) CloseLearner

func (r *Runner) CloseLearner()

CloseLearner stops the async self-improvement workers (reasoning + tool-selection), if any. The composition root calls it at process shutdown (nil-safe).

func (*Runner) EnsureConversation

func (r *Runner) EnsureConversation(ctx context.Context, convID string) error

EnsureConversation lazily creates the conversation row when it is absent and is a no-op when it already exists. Channels that derive a stable conversation id from an external identifier (e.g. a Telegram chat id via a deterministic UUIDv5) have no explicit "new conversation" step like the REPL, so the first inbound message must create the row before Turn appends to it (the AppendTurn call in appendUserTurn references the conversation through a foreign key). A Get short-circuits the common path; if a concurrent first message wins the race to Create, the loser reconciles with a re-Get instead of surfacing an error.

func (*Runner) NewConversation

func (r *Runner) NewConversation(ctx context.Context) (string, error)

NewConversation creates a new active conversation owned by the seeded `local` identity and returns its id. The composition root / `aura chat new` calls it.

func (*Runner) NewConversationWithID

func (r *Runner) NewConversationWithID(ctx context.Context, conversationID string) (string, error)

NewConversationWithID creates a conversation with a caller-supplied id (the REPL mints the id so it can key the sidecar dir before the row exists).

func (*Runner) PendingFor

func (r *Runner) PendingFor(ctx context.Context, conversationID string) ([]askuser.Pending, error)

PendingFor returns the still-unresolved pauses for a conversation, highest priority first and oldest first within a priority (priority DESC, created_at ASC); the REPL reads it to render each prompt inline.

func (*Runner) Stop

func (r *Runner) Stop(ctx context.Context, convID string) error

Stop terminates the conversation lifecycle (D-A1-06 / Req#11): it auto-resolves every orphan pending (zero unresolved rows after) and joins the auto-title WaitGroup with a bounded wait so a hung worker cannot wedge shutdown (goleak-clean, D-A5-01). The wg.Wait() is the sync point tests hit so goleak sees no leak.

func (*Runner) SubmitAnswer

func (r *Runner) SubmitAnswer(ctx context.Context, token string, resp ResponseInput) (int, error)

SubmitAnswer resolves ONE pending pause with the three-action model (D-A3-01) and injects the answer into the conversation history so the next Turn(convID, nil) drives a fresh round over a wire-valid history (SC-4). It returns the number of pauses still unresolved; once that count reaches 0, the caller can continue the turn.

  • accept → the answer is injected as RoleTool{ToolCallID:<original>, Content}.
  • decline → a "user declined" RoleTool is injected so the model adapts.
  • cancel → the turn is aborted via the Stop auto-resolve path (no injection); the caller should treat a cancel as a turn termination.

func (*Runner) SubmitAnswers

func (r *Runner) SubmitAnswers(ctx context.Context, answers map[string]ResponseInput) (int, error)

SubmitAnswers resolves MANY pending pauses atomically (one MarkResumedBatch) and injects each as a RoleTool answer. Cancel actions short-circuit to the Stop auto-resolve path for the whole conversation. It returns the remaining unresolved count.

func (*Runner) TryLockThread

func (r *Runner) TryLockThread(convID string) (func(), bool)

TryLockThread attempts to acquire the per-conversation run lock without blocking. HTTP transports use it to return 409 instead of queueing an SSE run behind an already-active request.
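A sketch of a per-conversation non-blocking lock built on `sync.Mutex.TryLock` (Go 1.18+). The `threadLocks` type is hypothetical; note that this simple table never removes entries, which a production version would need to address.

```go
package main

import (
	"fmt"
	"sync"
)

// threadLocks is a hypothetical per-conversation lock table.
type threadLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

// get returns the mutex for convID, creating it on first use.
func (t *threadLocks) get(convID string) *sync.Mutex {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.locks == nil {
		t.locks = make(map[string]*sync.Mutex)
	}
	l, ok := t.locks[convID]
	if !ok {
		l = &sync.Mutex{}
		t.locks[convID] = l
	}
	return l
}

// TryLockThread returns an unlock func and true, or (nil, false) if the thread is busy.
func (t *threadLocks) TryLockThread(convID string) (func(), bool) {
	l := t.get(convID)
	if !l.TryLock() {
		return nil, false
	}
	return l.Unlock, true
}

func main() {
	var tl threadLocks
	unlock, ok := tl.TryLockThread("c1")
	fmt.Println("first:", ok) // first: true
	_, acquired := tl.TryLockThread("c1")
	fmt.Println("second:", acquired) // second: false
	unlock()
}
```

In a gateway, the `false` branch is where a 409 would be returned instead of queueing the SSE run.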

func (*Runner) Turn

func (r *Runner) Turn(ctx context.Context, convID string, userMsg *string) iter.Seq2[*agent.Event, error]

Turn drives ONE LLM round over the conversation and is the sole loop-driver (D-A1-06). userMsg!=nil starts a round with a fresh user message; userMsg=nil is "continue after resume" — the resolved answers are already RoleTool turns in the persisted history (SubmitAnswer wrote them), so Turn just re-runs a fresh agent over the rehydrated history (SC-4: no silent re-run, the answer pair is already in the messages). On an Actions.AwaitingInput Event it writes N paused_states rows (SOLE writer, T-04-19) and stops the loop while >=1 stays unresolved.

Resume = a FRESH agent.Run over rehydrated history (D-A1-05): a range-over-func iterator cannot be suspended, so durability is entirely in the Stores. The yield-after-false guard is honored (never yield again once the consumer returns false).

func (*Runner) TurnBranch added in v1.0.0

func (r *Runner) TurnBranch(ctx context.Context, convID string, leafSeq int) iter.Seq2[*agent.Event, error]

TurnBranch is the D-09 / CHAT-05 re-run-from-a-point primitive: it drives a fresh agent round over the SELECTED branch path (leafSeq) instead of the full linear history, with no fresh user message (continue-after-resume semantics, userMsg=nil) — the new branch's turns were already persisted by ForkBranch. The messages[0] head is byte-identical to the linear case (LoadManagedHistoryForBranch preserves the protected head; only body turns differ per branch — the CAP-04 cache invariant). leafSeq <= 0 falls back to the canonical branch leaf (the same history Turn would load).

type ToolInvocationStore

type ToolInvocationStore interface {
	Insert(ctx context.Context, e toolinvocations.Event) error
}

ToolInvocationStore is the narrow append-only tool ledger surface the Runner consumes. It records start/end facts emitted by the agent around dispatched tool execution.
