core

v0.2.0
Warning

This package is not in the latest version of its module.

Published: May 18, 2026 License: MIT Imports: 36 Imported by: 0

README

Apteva Core

The continuous thinking engine. Runs autonomous AI agents that observe, reason, act, and evolve — around the clock.

Core is a standalone Go binary. It can run headless, with its own TUI, or managed by apteva-server.

Architecture

┌─────────────────────────────────────────────┐
│  Main Thread (coordinator)                  │
│  Observes events, spawns/kills threads      │
└──────────┬──────────────────────────────────┘
           │
     ┌─────┴─────┐
     │  EventBus │ ← never blocks, pub/sub
     └─────┬─────┘
           │
    ┌──────┼──────┐
    ▼      ▼      ▼
 Thread  Thread  Thread   ← permanent or temporary workers
    │      │      │
    ▼      ▼      ▼
  MCP    MCP    MCP       ← external tools (stdio or HTTP)

Quick Start

# Set your API key
echo "FIREWORKS_API_KEY=your-key" > .env

# Build and run with TUI
go build -o apteva-core ./cmd/apteva-core && ./apteva-core

# Or run headless (API only)
./apteva-core --headless

Or use the apteva CLI which manages everything:

cd ../apteva && ./apteva   # spawns server + core + TUI

API

Default port: 3210 (set with API_PORT env var)

Endpoint        Method    Description
/health         GET       Health check
/status         GET       Uptime, iteration, rate, model, mode, threads, memory
/threads        GET       List all threads with state
/threads/{id}   DELETE    Kill a thread
/events         GET       SSE stream of telemetry events
/event          POST      Inject a message to a thread
/config         GET/PUT   Read/update config (directive, mode, MCP servers, computer)
/pause          POST      Toggle pause/resume

Core Tools

Always available to all threads:

Tool       Description
pace       Set thinking speed and model size
send       Send message to another thread
done       Terminate this thread
evolve     Rewrite own directive
remember   Store to persistent memory

Coordinator-only:

Tool         Description
spawn        Create new thread with directive and tools
kill         Stop a thread
update       Change a thread's directive/tools
connect      Attach an MCP server at runtime
disconnect   Detach an MCP server

Discoverable (RAG-retrieved when relevant):

Tool              Description
web               Fetch a URL
exec              Run a shell command
computer_use      Screen interaction (click, type, scroll, screenshot)
browser_session   Navigate URLs, manage browser sessions

All tools support _reason — an optional observability field for explaining why the tool is being called.

Safety Modes

No forced approval gates. The agent decides, learns, asks when unsure.

Mode         Behavior
autonomous   Acts freely. Learns from feedback.
cautious     Asks before risky actions. Learns from answers.
learn        Asks about every new tool type. Builds safety profile.

Set via PUT /config {"mode": "cautious"} or the CLI /mode command.
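A minimal Go sketch of switching modes over the HTTP API. It assumes the API listens on the default port 3210 on localhost and that PUT /config accepts a JSON body with a `mode` field, as in the example above. The helper names `modeBody`, `newModeRequest`, and `setMode` are hypothetical, and treating any non-2xx status as an error is an assumption about the server's responses.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// modeBody encodes the PUT /config payload, e.g. {"mode":"cautious"}.
func modeBody(mode string) ([]byte, error) {
	return json.Marshal(map[string]string{"mode": mode})
}

// newModeRequest builds the PUT /config request without sending it.
// baseURL is assumed to look like "http://localhost:3210".
func newModeRequest(baseURL, mode string) (*http.Request, error) {
	body, err := modeBody(mode)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPut, baseURL+"/config", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

// setMode sends the request; any non-2xx status is reported as an error (assumption).
func setMode(client *http.Client, baseURL, mode string) error {
	req, err := newModeRequest(baseURL, mode)
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("PUT /config: unexpected status %s", resp.Status)
	}
	return nil
}
```

Against a running instance this would be called as `setMode(http.DefaultClient, "http://localhost:3210", "cautious")`.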

Session Persistence

Conversation history persists across restarts:

  • JSONL files per thread: history/main.jsonl, history/<thread>.jsonl
  • Last 50 messages loaded on startup
  • Auto-compaction at 500 messages (keeps 100 recent + summaries)
  • Thread history deleted when a thread calls done or is stopped with kill

Providers

Provider Env Var Native Tool Calling
Fireworks (Kimi K2.5) FIREWORKS_API_KEY Yes
Anthropic (Claude) ANTHROPIC_API_KEY Yes + native computer use
OpenAI (GPT-4) OPENAI_API_KEY Yes
Google (Gemini) GOOGLE_API_KEY Yes

Configuration

{
  "directive": "Your mission here",
  "mode": "autonomous",
  "provider": {
    "name": "fireworks",
    "models": { "large": "accounts/fireworks/models/kimi-k2p5", "small": "accounts/fireworks/models/kimi-k2p5" }
  },
  "computer": { "type": "local", "width": 1280, "height": 800 },
  "mcp_servers": [
    { "name": "myservice", "command": "./my-server", "main_access": true }
  ]
}

Browser / Computer Use

{ "computer": { "type": "local" } }

Two tools registered when a computer is connected:

  • browser_session — open URLs, close, status (no screenshots)
  • computer_use — click, type, scroll, screenshot (returns images)

Implementations: local Chrome (auto-launched), Browserbase (cloud), custom HTTP service.

Testing

go test ./... -short              # unit tests
RUN_COMPUTER_TESTS=1 go test -run TestComputerUse_Local  # browser tests
go test -v -run TestScenario      # full agent scenarios

License

MIT — see LICENSE

Documentation

Constants

const (
	DefaultBlobMaxTotal = int64(256 * 1024 * 1024) // 256 MB across all live blobs
	DefaultBlobTTL      = 30 * time.Minute         // age-based eviction
)

Default caps — liberal enough for typical audio/small-video flows, strict enough that a runaway agent can't swallow the process.

const (
	EventInbox       = "inbox"        // message addressed to a thinker (replaces inbox chan string)
	EventChunk       = "chunk"        // streaming token from LLM
	EventToolChunk   = "tool_chunk"   // streaming tool argument chunk from LLM
	EventThinkDone   = "think_done"   // completed a think cycle
	EventThinkError  = "think_error"  // error during think
	EventThreadStart = "thread_start" // thread spawned
	EventThreadDone  = "thread_done"  // thread terminated
)

Event types

const MaxSpawnDepth = 2

MaxSpawnDepth is the maximum depth for sub-thread spawning. Main = depth -1 (conceptual), its children = 0, grandchildren = 1, etc.

Variables

var (
	Version   = "dev"
	BuildTime = "dev"
)

Version + BuildTime are populated by cmd/apteva-core at startup with the values that ldflags injected into its `package main`. Default to "dev" so direct `go test` runs (which never call core.SetVersion) still work.

var ErrRealtimeNotImplemented = errors.New("realtime provider not implemented yet")

ErrRealtimeNotImplemented is returned by stub realtime providers that have been registered but whose Open() implementation hasn't landed yet. Callers should propagate this as a normal spawn error — the gate in the spawn handler turns it into a tool result the LLM can read.

var ErrStreamIdleTimeout = errors.New("stream idle timeout (provider went silent mid-response)")

ErrStreamIdleTimeout is returned by a stream reader that went silent for longer than the idle window. Callers can branch on it to tag telemetry as a stall instead of a generic I/O error.

var GeminiModelOrder = []string{
	"gemini-3.1-pro-preview",
	"gemini-3-flash-preview",
	"gemini-3.1-flash-lite-preview",
	"gemini-2.5-pro",
	"gemini-2.5-flash",
}

GeminiModelOrder defines the cycle order for model switching in the TUI.
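Cycling through that order amounts to finding the current id and stepping to the next index modulo the list length. The sketch below mirrors the slice locally. `nextModel` is a hypothetical helper, and falling back to the first entry for an unknown id is an assumption, not documented TUI behaviour.

```go
package main

// geminiModelOrder mirrors core.GeminiModelOrder.
var geminiModelOrder = []string{
	"gemini-3.1-pro-preview",
	"gemini-3-flash-preview",
	"gemini-3.1-flash-lite-preview",
	"gemini-2.5-pro",
	"gemini-2.5-flash",
}

// nextModel returns the model after current, wrapping to the start.
// Unknown ids fall back to the first entry (assumption).
func nextModel(order []string, current string) string {
	if len(order) == 0 {
		return current
	}
	for i, id := range order {
		if id == current {
			return order[(i+1)%len(order)]
		}
	}
	return order[0]
}
```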

Functions

func BuildMCPBinary

func BuildMCPBinary(t *testing.T, dir string) string

BuildMCPBinary compiles the Go MCP server in `dir` and returns the binary path. Pass `dir` either as an absolute path, a path relative to the current working directory, OR a path relative to the core module root (e.g. "mcps/helpdesk") — the helper tries each in turn, so callers from sibling packages don't need to thread their cwd.

func ChatContainsAny

func ChatContainsAny(replies []ChatReply, keywords ...string) bool

ChatContainsAny returns true if any reply's message contains at least one of the keywords (case-insensitive).
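One plausible way to implement that check is to lower-case both sides before a substring test. This is a reimplementation sketch, not the package's source. `chatContainsAny` and the local `ChatReply` mirror are illustrative. An empty keyword matches every reply here, because `strings.Contains(s, "")` is always true.

```go
package main

import "strings"

// ChatReply mirrors core.ChatReply.
type ChatReply struct {
	User    string
	Message string
}

// chatContainsAny reports whether any reply contains any keyword, ignoring case.
func chatContainsAny(replies []ChatReply, keywords ...string) bool {
	for _, r := range replies {
		msg := strings.ToLower(r.Message)
		for _, kw := range keywords {
			if strings.Contains(msg, strings.ToLower(kw)) {
				return true
			}
		}
	}
	return false
}
```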

func CountTool

func CountTool(entries []ScenarioAuditEntry, tool string) int

CountTool returns the number of audit entries whose tool name equals `tool`.

func ModelContextWindow

func ModelContextWindow(modelID string) int

ModelContextWindow returns the advertised input-context window (in tokens) for a given model id. It is a pure static lookup (no network, no API call) costing on the order of hundreds of nanoseconds. It returns 0 if the model isn't in the table; the UI treats 0 as "unknown" and skips the % display.

Numbers come from each provider's own model documentation. Update when a new model ships or a provider changes a limit. Match is by substring so we tolerate the various id forms providers use (e.g. "claude-opus-4-7", "claude-opus-4-7-20251119", "claude-opus-4-7[1m]").

Order matters: longer / more specific keys checked first so "claude-opus-4-7[1m]" matches the 1M variant before falling through to the generic "claude-opus-4-7" 200K entry.
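The ordered substring match can be sketched with a slice, since a map would not preserve the checking order. Only the two Claude ids mentioned above are listed. Reading "200K" and "1M" as 200,000 and 1,000,000 tokens is an assumption. `modelContextWindow` is a local illustration, not the package's table.

```go
package main

import "strings"

// contextWindows is checked top to bottom: more specific keys first,
// because "claude-opus-4-7[1m]" also contains "claude-opus-4-7".
var contextWindows = []struct {
	key    string
	tokens int
}{
	{"claude-opus-4-7[1m]", 1_000_000},
	{"claude-opus-4-7", 200_000},
}

// modelContextWindow returns the first matching window, or 0 for unknown ids.
func modelContextWindow(modelID string) int {
	for _, e := range contextWindows {
		if strings.Contains(modelID, e.key) {
			return e.tokens
		}
	}
	return 0
}
```

If the two entries were swapped, the `[1m]` id would match the generic key first and report 200K.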

func Run

func Run()

Run is the apteva-core entrypoint. cmd/apteva-core/main.go calls this after wiring -ldflags Version/BuildTime via SetVersion.

func RunScenario

func RunScenario(t *testing.T, s Scenario)

RunScenario executes a scenario end-to-end against a freshly-built Thinker. Spawns a goroutine per scenario observer, polls each Phase's Wait condition until it returns true (or times out), runs Verify, and reports token/cost totals at the end.

func SetVersion

func SetVersion(version, buildTime string)

SetVersion is called by cmd/apteva-core/main.go to forward the -ldflags-injected build info from the binary's `package main` into core. Keeping the actual ldflag targets in `package main` of cmd/apteva-core preserves the existing Dockerfile flags ("-X main.Version=...") unchanged.
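The forwarding pattern can be sketched in one file. Here `coreVersion`, `coreBuildTime`, and `setVersion` are local stand-ins for core.Version, core.BuildTime, and core.SetVersion. The real main would import the core package, whose module path isn't shown on this page. Because the ldflag targets stay in `package main`, a build would look something like `go build -ldflags "-X main.Version=v0.2.0 -X main.BuildTime=$(date -u +%Y-%m-%dT%H:%M:%SZ)" ./cmd/apteva-core`. The values in that command are illustrative.

```go
package main

// In cmd/apteva-core these are the -ldflags targets ("-X main.Version=...").
var (
	Version   = "dev"
	BuildTime = "dev"
)

// Stand-ins for core.Version and core.BuildTime.
var (
	coreVersion   = "dev"
	coreBuildTime = "dev"
)

// setVersion stands in for core.SetVersion.
func setVersion(version, buildTime string) {
	coreVersion, coreBuildTime = version, buildTime
}

// startup is what main() would do before handing control to core.Run().
func startup() {
	setVersion(Version, BuildTime)
	// core.Run() would follow here.
}
```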

func ThreadIDs

func ThreadIDs(th *Thinker) []string

ThreadIDs returns the IDs of the Thinker's currently-live threads.

func WaitFor

func WaitFor(t *testing.T, timeout, interval time.Duration, desc string, cond func() bool)

WaitFor polls cond at `interval` until it returns true or `timeout` elapses (in which case t.Fatalf fires with `desc`).

func WriteJSONFile

func WriteJSONFile(t *testing.T, dir, name string, v any)

WriteJSONFile writes v as indented JSON to dir/name and fails the test fatally on error.

Types

type APIEvent

type APIEvent struct {
	Time      time.Time `json:"time"`
	Type      string    `json:"type"` // "thought", "chunk", "reply", "thread_started", "thread_done", "error"
	ThreadID  string    `json:"thread_id"`
	Message   string    `json:"message,omitempty"`
	Iteration int       `json:"iteration,omitempty"`
	Duration  string    `json:"duration,omitempty"`
}

type APIServer

type APIServer struct {
	// contains filtered or unexported fields
}

type AnthropicProvider

type AnthropicProvider struct {
	// contains filtered or unexported fields
}

func (*AnthropicProvider) AvailableBuiltinTools

func (p *AnthropicProvider) AvailableBuiltinTools() []BuiltinTool

func (*AnthropicProvider) Chat

func (p *AnthropicProvider) Chat(ctx context.Context, messages []Message, model string, tools []NativeTool, onChunk func(string), onThinking func(string), onToolChunk func(string, string, string)) (ChatResponse, error)

func (*AnthropicProvider) CostPer1M

func (p *AnthropicProvider) CostPer1M() (float64, float64, float64)

func (*AnthropicProvider) Models

func (p *AnthropicProvider) Models() map[ModelTier]string

func (*AnthropicProvider) Name

func (p *AnthropicProvider) Name() string

func (*AnthropicProvider) SetBuiltinTools

func (p *AnthropicProvider) SetBuiltinTools(tools []string)

func (*AnthropicProvider) SupportsNativeTools

func (p *AnthropicProvider) SupportsNativeTools() bool

func (*AnthropicProvider) WithBuiltins

func (p *AnthropicProvider) WithBuiltins(builtins []string) LLMProvider

type AudioFormat

type AudioFormat string

AudioFormat names a wire encoding for realtime audio. Providers map these to their native config values (e.g. OpenAI "pcm16", "g711_ulaw").

const (
	AudioPCM16    AudioFormat = "pcm16"
	AudioG711ULaw AudioFormat = "g711_ulaw"
	AudioG711ALaw AudioFormat = "g711_alaw"
)

type AudioURL

type AudioURL struct {
	URL      string `json:"url"`                 // https:// or data:audio/...;base64,...
	MimeType string `json:"mime_type,omitempty"` // "audio/mp3", "audio/wav", etc. (auto-detected if empty)
}

type BlobStore

type BlobStore struct {
	// contains filtered or unexported fields
}

BlobStore is an in-process, in-memory store for binary payloads that flow between MCP tools without ever entering the LLM context.

When an MCP tool returns a JSON envelope of the form

{"_binary": true, "base64": "...", "mimeType": "...", "size": N}

the store keeps the decoded bytes and the tool-result text is rewritten to a compact handle:

{"_file": true, "ref": "blobref://<id>", "mimeType": "...", "size": N}

The LLM reads and references the handle. When it calls another tool and passes either the scalar "blobref://<id>" or an object {"_file_ref": "..."} as an argument value, the store rehydrates the value back into the full _binary envelope before dispatch — so the downstream tool sees real bytes and the bytes never traverse the LLM boundary.

State is in-memory only. Blobs age out after the configured TTL and are capped in aggregate size; the oldest blob is evicted first when the cap is reached on Put.
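The size cap and oldest-first eviction can be illustrated with a small store that keeps blobs in insertion order. This is not the package's code. The counter-based id scheme, the `miniBlobStore` name, and the linear lookup are assumptions, and TTL expiry is omitted. In this sketch, a single payload larger than the cap is still stored after everything else has been evicted. The real store may handle that case differently.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

const refPrefix = "blobref://"

type blob struct {
	id   string
	mime string
	data []byte
}

// miniBlobStore keeps blobs oldest-first so eviction pops from the front.
type miniBlobStore struct {
	mu       sync.Mutex
	maxTotal int64
	total    int64
	nextID   int
	order    []blob
}

func newMiniBlobStore(maxTotal int64) *miniBlobStore {
	return &miniBlobStore{maxTotal: maxTotal}
}

// Put evicts the oldest blobs until the new payload fits, then stores it.
func (s *miniBlobStore) Put(mime string, data []byte) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	size := int64(len(data))
	for len(s.order) > 0 && s.total+size > s.maxTotal {
		s.total -= int64(len(s.order[0].data))
		s.order = s.order[1:]
	}
	s.nextID++
	id := fmt.Sprintf("b%d", s.nextID)
	s.order = append(s.order, blob{id: id, mime: mime, data: data})
	s.total += size
	return refPrefix + id
}

// Get accepts either "blobref://<id>" or the bare id.
func (s *miniBlobStore) Get(ref string) ([]byte, string, bool) {
	id := strings.TrimPrefix(ref, refPrefix)
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, b := range s.order {
		if b.id == id {
			return b.data, b.mime, true
		}
	}
	return nil, "", false
}

// Count returns the number of live blobs.
func (s *miniBlobStore) Count() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.order)
}
```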

func NewBlobStore

func NewBlobStore(maxTotal int64, ttl time.Duration) *BlobStore

func (*BlobStore) Close

func (bs *BlobStore) Close()

func (*BlobStore) Count

func (bs *BlobStore) Count() int

Count returns the number of live blobs. Test/observability helper.

func (*BlobStore) Get

func (bs *BlobStore) Get(ref string) ([]byte, string, bool)

Get retrieves bytes and the original mimeType. Accepts either the full "blobref://<id>" ref or a bare id.

func (*BlobStore) Put

func (bs *BlobStore) Put(mime string, data []byte) string

Put stores bytes and returns a ref of the form "blobref://<id>". Evicts the oldest blob(s) if the aggregate cap would be exceeded.

func (*BlobStore) RehydrateFileRefs

func (bs *BlobStore) RehydrateFileRefs(args map[string]string) map[string]string

RehydrateFileRefs walks arg values and, for any value that references a known blob, replaces it with a full _binary envelope JSON string. Two reference forms are accepted:

  1. The scalar "blobref://<id>" — the LLM passed the ref string directly as an argument value.
  2. A JSON object {"_file_ref": "blobref://<id>"} — the LLM wrapped the ref to make intent explicit. The bare id form (without the blobref:// prefix) is also accepted inside _file_ref.

Unknown refs (expired / unknown ids) are left untouched so the downstream tool produces a clear error rather than a silent corruption. The original args map is not mutated; a new map is returned.
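The two accepted reference forms can be sketched against a plain lookup function instead of the real BlobStore. The envelope keys follow the `_binary` shape documented above. `rehydrate`, `refID`, and `lookupFunc` are hypothetical names, and the real method's exact output formatting may differ. This sketch encodes the envelope from a map, so `encoding/json` emits the keys in sorted order.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"strings"
)

const blobPrefix = "blobref://"

// lookupFunc resolves a bare blob id to bytes and mime type.
type lookupFunc func(id string) ([]byte, string, bool)

// envelope builds the full _binary JSON string for a blob.
func envelope(data []byte, mime string) string {
	b, _ := json.Marshal(map[string]any{
		"_binary":  true,
		"base64":   base64.StdEncoding.EncodeToString(data),
		"mimeType": mime,
		"size":     len(data),
	})
	return string(b)
}

// refID extracts a blob id from either reference form, or returns "".
func refID(v string) string {
	if strings.HasPrefix(v, blobPrefix) {
		return strings.TrimPrefix(v, blobPrefix)
	}
	var obj struct {
		FileRef string `json:"_file_ref"`
	}
	if strings.HasPrefix(strings.TrimSpace(v), "{") && json.Unmarshal([]byte(v), &obj) == nil && obj.FileRef != "" {
		return strings.TrimPrefix(obj.FileRef, blobPrefix) // bare id also accepted
	}
	return ""
}

// rehydrate returns a new map; the input is not mutated and unknown refs stay as-is.
func rehydrate(args map[string]string, lookup lookupFunc) map[string]string {
	out := make(map[string]string, len(args))
	for k, v := range args {
		out[k] = v
		if id := refID(v); id != "" {
			if data, mime, ok := lookup(id); ok {
				out[k] = envelope(data, mime)
			}
		}
	}
	return out
}
```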

func (*BlobStore) RewriteBinaryToHandle

func (bs *BlobStore) RewriteBinaryToHandle(text string) string

RewriteBinaryToHandle inspects a tool-result string. If it parses as a JSON envelope with "_binary":true, the bytes are stored in the BlobStore and a compact "_file" handle string is returned in place of the original envelope. Non-binary text is returned unchanged.

The fast path (string does not contain "_binary") costs an O(n) substring search and no allocations, so this is cheap to call on every MCP tool result.
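The rewrite direction can be sketched the same way, with the cheap `strings.Contains` check first. `rewriteToHandle` and the `put` callback are hypothetical. The handle fields follow the `_file` shape shown above. Computing `size` from the decoded bytes, and returning the text unchanged when parsing or decoding fails, are assumptions.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"strings"
)

type binaryEnvelope struct {
	Binary   bool   `json:"_binary"`
	Base64   string `json:"base64"`
	MimeType string `json:"mimeType"`
}

type fileHandle struct {
	File     bool   `json:"_file"`
	Ref      string `json:"ref"`
	MimeType string `json:"mimeType"`
	Size     int    `json:"size"`
}

// rewriteToHandle stores decoded bytes via put and returns a compact handle.
func rewriteToHandle(text string, put func(mime string, data []byte) string) string {
	if !strings.Contains(text, "_binary") { // fast path: no parse, no allocation
		return text
	}
	var env binaryEnvelope
	if err := json.Unmarshal([]byte(text), &env); err != nil || !env.Binary {
		return text
	}
	data, err := base64.StdEncoding.DecodeString(env.Base64)
	if err != nil {
		return text
	}
	h, err := json.Marshal(fileHandle{File: true, Ref: put(env.MimeType, data), MimeType: env.MimeType, Size: len(data)})
	if err != nil {
		return text
	}
	return string(h)
}
```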

type BuiltinTool

type BuiltinTool struct {
	Type string `json:"type"` // e.g. "code_execution_20250825", "code_interpreter"
	Name string `json:"name"` // e.g. "code_execution", "code_interpreter"
}

BuiltinTool defines a provider-side tool (executed by the LLM provider, not by us).

type ChatReply

type ChatReply struct {
	User    string
	Message string
}

ChatReply represents a single chat reply parsed out of an audit log.

func ReadChatReplies

func ReadChatReplies(dir string) []ChatReply

ReadChatReplies parses send_reply audit entries into ChatReply records for chat-style scenarios.

type ChatResponse

type ChatResponse struct {
	Text          string             // streamed text content
	Reasoning     string             // accumulated chain-of-thought (Fireworks reasoning_content / OpenRouter reasoning); empty when the provider didn't emit any
	ToolCalls     []NativeToolCall   // structured tool calls WE need to execute
	ServerResults []ServerToolResult // tools the PROVIDER already executed
	Usage         TokenUsage
}

ChatResponse is the structured return from Chat().

type ComputerConfig

type ComputerConfig struct {
	Type      string `json:"type"`                 // "local", "browserbase", "service"
	URL       string `json:"url,omitempty"`        // for "service" type
	APIKey    string `json:"api_key,omitempty"`    // for "browserbase"
	ProjectID string `json:"project_id,omitempty"` // for "browserbase"
	Width     int    `json:"width,omitempty"`      // display width (default 2000)
	Height    int    `json:"height,omitempty"`     // display height (default 1000)
}

ComputerConfig holds the configuration for a computer use environment.

type Config

type Config struct {
	Directive       string             `json:"directive"`
	Mode            RunMode            `json:"mode,omitempty"`
	Unconscious     bool               `json:"unconscious,omitempty"`      // enable background memory consolidation thread
	RealtimeEnabled bool               `json:"realtime_enabled,omitempty"` // master switch for realtime (voice/audio) threads; off = main never sees the capability and spawn rejects realtime=true
	Providers       []ProviderConfig   `json:"providers,omitempty"`        // multi-provider pool
	Provider        *ProviderConfig    `json:"provider,omitempty"`         // legacy single-provider (auto-migrated to Providers on load)
	Computer        *ComputerConfig    `json:"computer,omitempty"`
	Threads         []PersistentThread `json:"threads,omitempty"`
	MCPServers      []MCPServerConfig  `json:"mcp_servers,omitempty"`
	// contains filtered or unexported fields
}
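The `Provider` field is described as legacy and auto-migrated into `Providers` on load. Below is a minimal sketch of that step using trimmed mirror types. The `Default` field and the rule that a lone migrated provider becomes the default are assumptions, suggested by SetProvider's documented behaviour. `loadConfig` is a hypothetical name.

```go
package main

import "encoding/json"

type providerConfig struct {
	Name    string            `json:"name"`
	Models  map[string]string `json:"models,omitempty"`
	Default bool              `json:"default,omitempty"` // assumed field
}

type config struct {
	Directive string           `json:"directive"`
	Mode      string           `json:"mode,omitempty"`
	Providers []providerConfig `json:"providers,omitempty"`
	Provider  *providerConfig  `json:"provider,omitempty"` // legacy
}

// loadConfig decodes JSON and folds a legacy single provider into Providers.
func loadConfig(raw []byte) (*config, error) {
	var c config
	if err := json.Unmarshal(raw, &c); err != nil {
		return nil, err
	}
	if c.Provider != nil {
		found := false
		for _, p := range c.Providers {
			if p.Name == c.Provider.Name {
				found = true
				break
			}
		}
		if !found {
			legacy := *c.Provider
			legacy.Default = len(c.Providers) == 0 // sole provider becomes default (assumption)
			c.Providers = append(c.Providers, legacy)
		}
		c.Provider = nil // clear the legacy field so it is not written back (assumption)
	}
	return &c, nil
}
```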

func NewConfig

func NewConfig() *Config

func (*Config) ClearThreads

func (c *Config) ClearThreads()

func (*Config) GetDefaultProvider

func (c *Config) GetDefaultProvider() *ProviderConfig

GetDefaultProvider returns the default provider config, or nil.

func (*Config) GetDirective

func (c *Config) GetDirective() string

func (*Config) GetMCPServers

func (c *Config) GetMCPServers() []MCPServerConfig

func (*Config) GetMode

func (c *Config) GetMode() RunMode

func (*Config) GetProvider

func (c *Config) GetProvider() *ProviderConfig

GetProvider returns the persisted default provider config, or nil. Backward-compatible wrapper around GetDefaultProvider.

func (*Config) GetProviderByName

func (c *Config) GetProviderByName(name string) *ProviderConfig

GetProviderByName returns a provider config by name, or nil.

func (*Config) GetProviders

func (c *Config) GetProviders() []ProviderConfig

GetProviders returns a copy of the providers list.

func (*Config) GetThreads

func (c *Config) GetThreads() []PersistentThread

func (*Config) RealtimeEnabledFlag

func (c *Config) RealtimeEnabledFlag() bool

RealtimeEnabledFlag returns whether realtime (voice/audio) threads are enabled on this instance. It reads under the config lock, so it reflects the current value if the flag is toggled at runtime via the HTTP config endpoint. The method name avoids colliding with the RealtimeEnabled field.

func (*Config) RemoveMCPServer

func (c *Config) RemoveMCPServer(name string)

func (*Config) RemoveProvider

func (c *Config) RemoveProvider(name string)

RemoveProvider removes a provider by name.

func (*Config) RemoveThread

func (c *Config) RemoveThread(id string)

func (*Config) Save

func (c *Config) Save() error

func (*Config) SaveMCPServer

func (c *Config) SaveMCPServer(cfg MCPServerConfig)

func (*Config) SaveThread

func (c *Config) SaveThread(pt PersistentThread)

func (*Config) SetDefaultProvider

func (c *Config) SetDefaultProvider(name string)

SetDefaultProvider marks a provider as default (clears default on others).

func (*Config) SetDirective

func (c *Config) SetDirective(d string)

func (*Config) SetMode

func (c *Config) SetMode(m RunMode)

func (*Config) SetProvider

func (c *Config) SetProvider(pc *ProviderConfig)

SetProvider adds or updates a provider in the list. If it's the only one, marks it default.

func (*Config) SetProviderModel

func (c *Config) SetProviderModel(tier string, modelID string)

SetProviderModel updates a single model tier for a provider (default if not specified).

func (*Config) SetProviderName

func (c *Config) SetProviderName(name string)

SetProviderName adds or updates a provider by name with default flag.

type ConsoleLogger

type ConsoleLogger struct {
	// contains filtered or unexported fields
}

ConsoleLogger renders live telemetry events to stderr with colors. Used in headless mode to provide human-readable output without the TUI.

func NewConsoleLogger

func NewConsoleLogger(t *Telemetry) *ConsoleLogger

func (*ConsoleLogger) Run

func (c *ConsoleLogger) Run()

type ContentPart

type ContentPart struct {
	Type       string      `json:"type"`                  // "text", "image_url", "input_audio", "audio_url"
	Text       string      `json:"text,omitempty"`        // type=text
	ImageURL   *ImageURL   `json:"image_url,omitempty"`   // type=image_url
	InputAudio *InputAudio `json:"input_audio,omitempty"` // type=input_audio
	AudioURL   *AudioURL   `json:"audio_url,omitempty"`   // type=audio_url
}

ContentPart represents a multimodal content block (OpenAI Chat Completions format).
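Below is a sketch of a text-plus-image content array in that format. It uses local mirrors of ContentPart and ImageURL with the same JSON tags; the audio fields are trimmed. `describeImageParts` is a hypothetical helper, and the image URL is a placeholder.

```go
package main

import "encoding/json"

type ImageURL struct {
	URL    string `json:"url"`
	Detail string `json:"detail,omitempty"`
}

type ContentPart struct {
	Type     string    `json:"type"`
	Text     string    `json:"text,omitempty"`
	ImageURL *ImageURL `json:"image_url,omitempty"`
}

// describeImageParts builds a two-part user message body; omitempty drops
// the unused field of each part.
func describeImageParts(prompt, url string) ([]byte, error) {
	parts := []ContentPart{
		{Type: "text", Text: prompt},
		{Type: "image_url", ImageURL: &ImageURL{URL: url, Detail: "low"}},
	}
	return json.Marshal(parts)
}
```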

type Delta

type Delta struct {
	Content string `json:"content"`
}

type DirectiveChangeData

type DirectiveChangeData struct {
	Old string `json:"old,omitempty"`
	New string `json:"new"`
}

type Event

type Event struct {
	Type string // one of the Event* constants
	From string // source: "main", thread ID, "tui", "api", "tool:name"
	To   string // target subscriber ID; "" = broadcast

	Text       string        // message payload
	ToolName   string        // tool name (for EventToolChunk)
	Parts      []ContentPart // optional media (images, audio) attached to this event
	ToolResult *ToolResult   // optional: structured tool result (for computer_use etc.)

	// Structured fields (populated for ThinkDone events)
	Iteration      int
	Duration       time.Duration
	ConsumedEvents []string
	Usage          TokenUsage
	ToolCalls      []string
	Replies        []string
	Rate           ThinkRate
	SleepDuration  time.Duration
	Model          ModelTier
	MemoryCount    int
	ThreadCount    int
	ContextMsgs    int // number of messages in context window
	ContextChars   int // approximate character count of context
	Error          error
}

Event is the single message type flowing through the system.

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is the central pub/sub hub. All thinkers share one bus.

func NewEventBus

func NewEventBus() *EventBus

func (*EventBus) DropStats

func (b *EventBus) DropStats() map[string]uint64

DropStats returns a snapshot of dropped-event counts keyed by subscription id. Useful for tests and ops dashboards to detect backpressure. Order is not guaranteed.

func (*EventBus) Publish

func (b *EventBus) Publish(ev Event)

Publish delivers an event to all matching subscriptions. It never blocks the publisher: when a subscriber's channel is full, the event is discarded and the subscription's Dropped counter is incremented. Drops are logged (rate-limited per subscription) so silent loss is impossible to miss in the logs.

Subscribers are snapshotted under a read lock and delivered to without holding the lock, so one slow channel never stalls other subscribers or blocks concurrent publishers / (un)subscribes.

func (*EventBus) Subscribe

func (b *EventBus) Subscribe(id string, buffer int) *Subscription

Subscribe creates a targeted subscription. Receives events where To == id, plus broadcasts (To == "").

func (*EventBus) SubscribeAll

func (b *EventBus) SubscribeAll(id string, buffer int) *Subscription

SubscribeAll creates an observer subscription that receives ALL events. Used by TUI, API SSE, tests.

func (*EventBus) Unsubscribe

func (b *EventBus) Unsubscribe(id string)

Unsubscribe removes a subscription.
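The EventBus delivery rules above can be sketched with a `select` that falls through to `default` when a subscriber's channel is full. The sketch snapshots targets under the read lock and sends without holding it. This mini bus is a hypothetical reimplementation with a trimmed Event. Rate-limited drop logging is omitted, and the `all` flag is how this sketch distinguishes SubscribeAll from Subscribe. It uses `atomic.Uint64`, which requires Go 1.19 or later.

```go
package main

import (
	"sync"
	"sync/atomic"
)

type Event struct {
	Type, From, To, Text string
}

type subscription struct {
	id      string
	all     bool // SubscribeAll-style observer
	C       chan Event
	dropped atomic.Uint64
}

type bus struct {
	mu   sync.RWMutex
	subs map[string]*subscription
}

func newBus() *bus { return &bus{subs: map[string]*subscription{}} }

func (b *bus) subscribe(id string, buffer int, all bool) *subscription {
	s := &subscription{id: id, all: all, C: make(chan Event, buffer)}
	b.mu.Lock()
	b.subs[id] = s
	b.mu.Unlock()
	return s
}

// unsubscribe only forgets the subscription; the channel is not closed because
// an in-flight publish may still hold it in its snapshot.
func (b *bus) unsubscribe(id string) {
	b.mu.Lock()
	delete(b.subs, id)
	b.mu.Unlock()
}

// publish never blocks: a full channel increments dropped instead.
func (b *bus) publish(ev Event) {
	b.mu.RLock()
	targets := make([]*subscription, 0, len(b.subs))
	for _, s := range b.subs {
		if s.all || ev.To == "" || ev.To == s.id {
			targets = append(targets, s)
		}
	}
	b.mu.RUnlock()
	for _, s := range targets {
		select {
		case s.C <- ev:
		default:
			s.dropped.Add(1)
		}
	}
}
```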

type ExtraSystemBlock

type ExtraSystemBlock struct {
	Preview string `json:"preview"` // first ~60 chars, newlines stripped
	Bytes   int    `json:"bytes"`
}

ExtraSystemBlock describes a role=system message that sits AFTER messages[0] — the [memories] block appended each iteration is the canonical example, but anything the thinker injects ends up here.

type GoogleModel

type GoogleModel struct {
	ID              string
	InputPer1M      float64
	CachedPer1M     float64
	OutputPer1M     float64
	MaxOutputTokens int
}

GoogleModel holds metadata for a Gemini model.

type GoogleProvider

type GoogleProvider struct {
	// contains filtered or unexported fields
}

func (*GoogleProvider) ActiveModel

func (p *GoogleProvider) ActiveModel() string

ActiveModel returns the current model ID.

func (*GoogleProvider) AvailableBuiltinTools

func (p *GoogleProvider) AvailableBuiltinTools() []BuiltinTool

func (*GoogleProvider) AvailableModels

func (p *GoogleProvider) AvailableModels() []string

AvailableModels returns all supported Gemini model IDs in cycle order.

func (*GoogleProvider) Chat

func (p *GoogleProvider) Chat(ctx context.Context, messages []Message, model string, tools []NativeTool, onChunk func(string), onThinking func(string), onToolChunk func(string, string, string)) (ChatResponse, error)

func (*GoogleProvider) CostPer1M

func (p *GoogleProvider) CostPer1M() (float64, float64, float64)

func (*GoogleProvider) Models

func (p *GoogleProvider) Models() map[ModelTier]string

func (*GoogleProvider) Name

func (p *GoogleProvider) Name() string

func (*GoogleProvider) SetBuiltinTools

func (p *GoogleProvider) SetBuiltinTools(tools []string)

func (*GoogleProvider) SetModel

func (p *GoogleProvider) SetModel(modelID string)

SetModel updates the active model. Called from TUI model cycling.

func (*GoogleProvider) SupportsNativeTools

func (p *GoogleProvider) SupportsNativeTools() bool

func (*GoogleProvider) WithBuiltins

func (p *GoogleProvider) WithBuiltins(builtins []string) LLMProvider

type ImageURL

type ImageURL struct {
	URL    string `json:"url"`              // https:// or data:image/...;base64,...
	Detail string `json:"detail,omitempty"` // "low", "high", "auto"
}

type IndexEntry

type IndexEntry struct {
	Server      string
	Name        string // fully-qualified (e.g. "storage_files_upload")
	Description string
	NoSpawn     bool // sub-threads cannot see this tool in search
	// contains filtered or unexported fields
}

IndexEntry is one tool's worth of searchable metadata.

type InputAudio

type InputAudio struct {
	Data   string `json:"data"`   // base64
	Format string `json:"format"` // "wav", "mp3"
}

type LLMChunkData

type LLMChunkData struct {
	Text      string `json:"text"`
	Iteration int    `json:"iteration"`
}

type LLMDoneData

type LLMDoneData struct {
	Model        string `json:"model"`
	TokensIn     int    `json:"tokens_in"`
	TokensCached int    `json:"tokens_cached"`
	TokensOut    int    `json:"tokens_out"`
	DurationMs   int64  `json:"duration_ms"`
	// cost_usd is no longer populated by core — pricing lives in the
	// server, which enriches llm.done events with a canonical
	// cost_usd on ingest. Removing the field from the Go type keeps
	// core free of pricing data, but the wire format is still the
	// same map-of-strings consumed by dashboards and persisted by
	// the server.
	Iteration    int    `json:"iteration"`
	Rate         string `json:"rate"`
	ContextMsgs  int    `json:"context_msgs"`
	ContextChars int    `json:"context_chars"`
	// MaxContextTokens is the model's advertised input-context window
	// (in tokens). Comes from a static lookup keyed on the model id —
	// see ModelContextWindow. 0 when the model isn't in the table; UI
	// should treat 0 as "unknown" and skip percentage rendering.
	MaxContextTokens int    `json:"max_context_tokens,omitempty"`
	MemoryCount      int    `json:"memory_count"`
	ThreadCount      int    `json:"thread_count"`
	Message          string `json:"message,omitempty"`
}

type LLMErrorData

type LLMErrorData struct {
	Model     string `json:"model"`
	Error     string `json:"error"`
	Iteration int    `json:"iteration"`
}

type LLMProvider

type LLMProvider interface {
	// Chat sends messages and streams the response.
	// ctx is propagated to the underlying HTTP request so cancellation
	// (user abort, shutdown) unblocks an in-flight stream cleanly.
	// tools: native tool definitions to include in the request (nil = no tools).
	// onChunk is called for each text token chunk as it arrives.
	// onThinking is called for each reasoning/thinking token (separate from output).
	// onToolChunk is called for each tool argument chunk as it streams
	// (toolName, callID, argChunk). callID disambiguates two parallel
	// calls of the same tool in one response — providers pass their own
	// stable per-call id (tc.Index for OpenAI, content_block id for
	// Anthropic). Empty string is acceptable.
	// Returns ChatResponse with text, tool calls, and usage.
	Chat(ctx context.Context, messages []Message, model string, tools []NativeTool, onChunk func(string), onThinking func(string), onToolChunk func(toolName, callID, chunk string)) (ChatResponse, error)

	// Models returns model IDs for each tier.
	Models() map[ModelTier]string

	// Name returns the provider name for display/telemetry.
	Name() string

	// CostPer1M returns pricing per 1M tokens: (input, cached, output).
	CostPer1M() (float64, float64, float64)

	// SupportsNativeTools returns true if this provider handles structured tool calling.
	SupportsNativeTools() bool

	// AvailableBuiltinTools returns built-in tools this provider supports.
	AvailableBuiltinTools() []BuiltinTool

	// SetBuiltinTools enables specific built-in tools.
	SetBuiltinTools(tools []string)

	// WithBuiltins returns a shallow clone of this provider with only the specified builtins enabled.
	// If builtins is nil, returns the provider unchanged (inherit all).
	WithBuiltins(builtins []string) LLMProvider
}

LLMProvider abstracts the LLM API call. All thinking, threading, tool handling stays in the Thinker. The provider only handles: send messages → get streaming response.
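CostPer1M's (input, cached, output) triple turns token counts into a dollar estimate with simple arithmetic. The sketch assumes the input count includes the cached tokens. Cached tokens are billed at the cached rate and the remainder at the full input rate. Providers differ on this convention, and core itself no longer attaches cost to llm.done events. `estimateCost` is a hypothetical helper.

```go
package main

// estimateCost returns an approximate USD cost for one call.
// tokensIn is assumed to include tokensCached.
func estimateCost(tokensIn, tokensCached, tokensOut int, inPer1M, cachedPer1M, outPer1M float64) float64 {
	uncached := tokensIn - tokensCached
	if uncached < 0 {
		uncached = 0 // guard against providers that report cached separately
	}
	return (float64(uncached)*inPer1M + float64(tokensCached)*cachedPer1M + float64(tokensOut)*outPer1M) / 1e6
}
```

For example, 1M input tokens (half cached) and 200K output tokens at rates (1.00, 0.10, 4.00) give 0.50 + 0.05 + 0.80 = 1.35.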

func NewAnthropicProvider

func NewAnthropicProvider(apiKey string) LLMProvider

func NewFireworksProvider

func NewFireworksProvider(apiKey string) LLMProvider

func NewGoogleProvider

func NewGoogleProvider(apiKey string) LLMProvider

func NewNvidiaProvider

func NewNvidiaProvider(apiKey string) LLMProvider

NewNvidiaProvider wires up NVIDIA's NIM hosted catalog. They expose an OpenAI-compatible Chat Completions endpoint at integrate.api.nvidia.com, so we reuse OpenAICompatProvider verbatim — just the base URL and default model slugs are NVIDIA-specific. Pricing is left at zero by default because NIM billing is account-scoped rather than per-token-listed, so cost projections in the dashboard stats bar will stay at $0 unless the user manually overrides these costs via config.

func NewOllamaProvider

func NewOllamaProvider(host string) LLMProvider

func NewOpenAINativeProvider

func NewOpenAINativeProvider(apiKey string) LLMProvider

func NewOpenAIProvider

func NewOpenAIProvider(apiKey string) LLMProvider

func NewOpenCodeGoProvider

func NewOpenCodeGoProvider(apiKey string) LLMProvider

NewOpenCodeGoProvider — flat-rate subscription gateway from opencode.ai/go that fronts the same Kimi K2.6 we use via Fireworks plus Qwen / GLM / MiMo variants under one OpenAI-compatible endpoint.

Per-token costs are placeholders (0/0/0) because OpenCode Go bills per subscription, not per call — the server's model_fetch.go pricing table reports the same so the dashboard's per-call $ figure stays blank rather than misleadingly nonzero.

Defaults: kimi-k2.6 across all three tiers. With a flat-rate plan the per-iteration cost incentive that justified Qwen on small/medium for token-priced providers doesn't apply, so we let the agent run the strongest model end-to-end. Users who want to stretch the monthly cap with cheaper tiers can override per-instance in the dashboard provider settings.

func NewVeniceProvider

func NewVeniceProvider(apiKey string) LLMProvider

NewVeniceProvider wires up venice.ai, a privacy-focused inference gateway with an OpenAI-compatible /chat/completions endpoint and a large rotating catalog (Llama, Qwen, GLM, Mistral, plus Claude/Grok/Gemini reseller variants). Pricing varies per model and is set per-account in the Venice dashboard, so per-token costs are left at zero here; the model picker dropdown is the source of truth for which models are available right now.

type MCPConn

type MCPConn interface {
	GetName() string
	ListTools() ([]mcpToolDef, error)
	CallTool(name string, args map[string]string) (string, error)
	Close()
}

MCPConn is the interface for any MCP server connection (stdio or HTTP).

type MCPHTTPServer

type MCPHTTPServer struct {
	Name string
	// contains filtered or unexported fields
}

MCPHTTPServer connects to an MCP server via Streamable HTTP transport. Per MCP spec 2025-03-26: POST for requests, single endpoint.

Compatibility notes for real-world hosted MCP servers (observed against Composio's backend.composio.dev):

  • Some servers host the MCP endpoint at a sub-path and respond to the parent path with HTTP 307 → Location (appending "/mcp"). Go's default client strips the POST body on 307 redirects, so we disable automatic redirects and re-issue the POST manually with the body intact. After the first successful hop we store the resolved URL and skip the redirect on every subsequent call.
  • Some servers return SSE-framed responses (`Content-Type: text/event-stream` with one or more `event: message\ndata: {...}\n\n` frames) instead of plain JSON, even for POST requests. We parse both.
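The "we parse both" handling can be sketched as a decoder that branches on the response `Content-Type`. This is a minimal illustration under stated assumptions, not `MCPHTTPServer`'s actual code: `decodeMCPResponse` is a hypothetical name, it ignores `event:`/`id:` lines, and it joins multi-line `data:` fields with "\n" as the SSE format specifies. The redirect workaround is not shown.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"mime"
	"strings"
)

// decodeMCPResponse returns the JSON-RPC payload(s) in an HTTP response body.
// Plain JSON yields one payload; an SSE body yields one payload per event.
func decodeMCPResponse(contentType string, body []byte) ([]json.RawMessage, error) {
	mediaType, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		mediaType = strings.ToLower(strings.TrimSpace(contentType))
	}
	if mediaType != "text/event-stream" {
		if !json.Valid(body) {
			return nil, fmt.Errorf("invalid JSON body")
		}
		return []json.RawMessage{json.RawMessage(body)}, nil
	}

	var out []json.RawMessage
	var data []string
	// flush emits the buffered data lines as one event payload.
	flush := func() error {
		if len(data) == 0 {
			return nil
		}
		payload := strings.Join(data, "\n")
		data = data[:0]
		if !json.Valid([]byte(payload)) {
			return fmt.Errorf("invalid JSON in SSE data: %q", payload)
		}
		out = append(out, json.RawMessage(payload))
		return nil
	}

	sc := bufio.NewScanner(bytes.NewReader(body))
	sc.Buffer(make([]byte, 0, 64*1024), 4*1024*1024) // allow large tool results
	for sc.Scan() {
		line := sc.Text() // ScanLines already drops a trailing "\r"
		switch {
		case line == "":
			// A blank line terminates the current event.
			if err := flush(); err != nil {
				return nil, err
			}
		case strings.HasPrefix(line, "data:"):
			// Strip the field name and at most one leading space.
			data = append(data, strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " "))
		}
	}
	if err := sc.Err(); err != nil {
		return nil, err
	}
	// The final event may lack its trailing blank line.
	if err := flush(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	sse := "event: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{}}\n\n"
	msgs, err := decodeMCPResponse("text/event-stream; charset=utf-8", []byte(sse))
	fmt.Println(len(msgs), err, string(msgs[0]))

	msgs, err = decodeMCPResponse("application/json", []byte(`{"jsonrpc":"2.0","id":2,"result":{}}`))
	fmt.Println(len(msgs), err)
}
```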

func (*MCPHTTPServer) CallTool

func (s *MCPHTTPServer) CallTool(name string, args map[string]string) (string, error)

func (*MCPHTTPServer) Close

func (s *MCPHTTPServer) Close()

func (*MCPHTTPServer) GetName

func (s *MCPHTTPServer) GetName() string

func (*MCPHTTPServer) ListTools

func (s *MCPHTTPServer) ListTools() ([]mcpToolDef, error)

type MCPServer

type MCPServer struct {
	Name string
	// contains filtered or unexported fields
}

MCPServer manages a running MCP server subprocess (stdio transport).

func (*MCPServer) CallTool

func (s *MCPServer) CallTool(name string, args map[string]string) (string, error)

CallTool invokes a tool on the MCP server.

func (*MCPServer) Close

func (s *MCPServer) Close()

func (*MCPServer) GetName

func (s *MCPServer) GetName() string

func (*MCPServer) ListTools

func (s *MCPServer) ListTools() ([]mcpToolDef, error)

ListTools calls tools/list on the MCP server.

type MCPServerConfig

type MCPServerConfig struct {
	Name      string            `json:"name"`
	Command   string            `json:"command,omitempty"`   // stdio transport
	Args      []string          `json:"args,omitempty"`      // stdio transport
	Env       map[string]string `json:"env,omitempty"`       // stdio transport
	Transport string            `json:"transport,omitempty"` // "stdio" (default) or "http"
	URL       string            `json:"url,omitempty"`       // http transport
	// NoSpawn, when true, hides this server's tools from sub-thread
	// search_tools results and refuses sub-thread spawn(mcps=[...])
	// attachments. Used for infrastructure-level servers the host
	// wires in for main-thread-only responsibilities (management
	// gateway, outbound channel bridges) where letting a worker
	// invoke them would be a privilege escalation. Core has no
	// knowledge of which names are "system" — the host sets this
	// flag when registering those entries. The privileged HTTP spawn
	// path (POST /threads/{id}) sets SpawnOpts.BypassNoSpawn to
	// punch through this filter for system-initiated workers
	// (channelchat's chat-handling thread needs `channels`).
	NoSpawn bool `json:"no_spawn,omitempty"`
}

MCPServerConfig is stored in config.json.

Note: the legacy `main_access` field is intentionally absent. Earlier versions of core split MCPs into "main" (tools eagerly registered to the main thread) and "catalog" (tools held off main, attachable only by spawning a sub-thread with mcp="name"). That distinction is gone: every MCP attached here is connected and indexed, every thread activates the subset it needs via search_tools or spawn-time MCPNames. Old configs containing main_access:true|false deserialize cleanly — json.Unmarshal silently drops the unknown field.

type MCPServerInfo

type MCPServerInfo struct {
	Name      string
	ToolCount int
}

MCPServerInfo is a lightweight catalog entry for an MCP server. Main uses this to show available servers in its prompt without registering all tools.

type MemoryRecord

type MemoryRecord struct {
	ID         string    `json:"id"`
	TS         time.Time `json:"ts"`
	Content    string    `json:"content,omitempty"`
	Tags       []string  `json:"tags,omitempty"`
	Weight     float64   `json:"weight,omitempty"`
	Supersedes string    `json:"supersedes,omitempty"`
	Embedding  []float64 `json:"embedding,omitempty"`

	// Tombstone bits.
	Tombstone bool   `json:"tombstone,omitempty"`
	IDTarget  string `json:"id_target,omitempty"`
	Reason    string `json:"reason,omitempty"`
}

MemoryRecord is one line in memory.jsonl. Either a memory or a tombstone — never both.

func (MemoryRecord) IsTombstone

func (r MemoryRecord) IsTombstone() bool

IsTombstone reports whether this record is a tombstone marker.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is the in-process journal owner. Append-only on disk, rebuilds the active-set on load.

func NewMemoryStore

func NewMemoryStore(apiKey string) *MemoryStore

NewMemoryStore opens (or creates) memory.jsonl in the current working directory, picks an embedding backend from env, runs the legacy-format migration if needed, and returns a ready store. apiKey is kept for backward compatibility as a forced Fireworks key when env-based detection finds nothing; passing "" defers entirely to env.

func (*MemoryStore) Active

func (ms *MemoryStore) Active() []MemoryRecord

Active returns the current active memories — everything not tombstoned and not superseded by a newer record. Returned slice is a copy; callers can mutate / sort freely.
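Rebuilding the active set from the append-only journal can be sketched as a two-pass replay: first collect every id retired by a tombstone or a `Supersedes` link, then keep the non-tombstone records whose ids were never retired. This is a minimal sketch of the documented semantics, not `MemoryStore`'s code; `activeSet` is a hypothetical name and the struct keeps only the fields it needs.

```go
package main

import "fmt"

// MemoryRecord mirrors the documented fields this sketch needs.
type MemoryRecord struct {
	ID         string
	Content    string
	Supersedes string
	Tombstone  bool
	IDTarget   string
}

// activeSet returns, in insertion order, the records that are neither
// tombstones nor retired. Supersede writes both a new record
// (Supersedes=oldID) and a tombstone for oldID; either alone retires oldID here.
func activeSet(journal []MemoryRecord) []MemoryRecord {
	retired := make(map[string]bool)
	for _, r := range journal {
		if r.Tombstone {
			retired[r.IDTarget] = true
		} else if r.Supersedes != "" {
			retired[r.Supersedes] = true
		}
	}
	var out []MemoryRecord // fresh slice, so callers can sort it freely
	for _, r := range journal {
		if !r.Tombstone && !retired[r.ID] {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	journal := []MemoryRecord{
		{ID: "a", Content: "deploys need approval"},
		{ID: "b", Content: "prefers short replies"},
		{ID: "c", Content: "deploys need approval from ops", Supersedes: "a"},
		{ID: "t1", Tombstone: true, IDTarget: "a"},
		{ID: "t2", Tombstone: true, IDTarget: "b"},
		{ID: "d", Content: "timezone is UTC"},
	}
	for _, r := range activeSet(journal) {
		fmt.Println(r.ID, r.Content)
	}
}
```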

func (*MemoryStore) All

func (ms *MemoryStore) All() []MemoryRecord

All returns every record in insertion order, including tombstones and superseded entries. Used by the dashboard memory panel for debugging / audit.

func (*MemoryStore) BuildContext

func (ms *MemoryStore) BuildContext(records []MemoryRecord) string

BuildContext renders a slice of memories as the dynamic-context [memories] block. Header is explicit about provenance so the LLM reads these as memories, not current statements — that's the structural defense against the fabricated-approvals failure mode.

func (*MemoryStore) Count

func (ms *MemoryStore) Count() int

Count returns the number of currently-active memories. Used by telemetry and the unconscious's directive ("you have N memories").

func (*MemoryStore) Drop

func (ms *MemoryStore) Drop(id, reason string) error

Drop tombstones a memory by id. reason is required.

func (*MemoryStore) Enabled

func (ms *MemoryStore) Enabled() bool

Enabled reports whether embeddings are available. Lexical scoring still works either way; callers that ONLY need embeddings (RAG tool indexing in api.go / thinker.go) check this to short-circuit.

func (*MemoryStore) HasID

func (ms *MemoryStore) HasID(id string) bool

HasID reports whether a record with this id exists in the journal (active, tombstoned, or superseded — anything that was ever written). Used by callers that want to decide between RememberWithID (insert) and Supersede (update) without racing on the not-found error.

func (*MemoryStore) Recall

func (ms *MemoryStore) Recall(query string, n int) []MemoryRecord

Recall returns the top-N active memories scored by relevance to the given query context — multi-factor: cosine × weight × decay, with a lexical fallback when no embedding backend.

Used by buildDynamicTurnContext for auto-injection at every turn. N is typically 3–5 with a token-budget cap applied by the caller.

func (*MemoryStore) Remember

func (ms *MemoryStore) Remember(content string, tags []string, weight float64) (string, error)

Remember writes a fresh memory and returns its id. weight defaults to 0.7 if zero. tags may be nil. Embedding is computed when a backend is configured; on failure the record is still written without an embedding (lexical recall continues to work).

func (*MemoryStore) RememberWithID

func (ms *MemoryStore) RememberWithID(id, content string, tags []string, weight float64) (string, error)

RememberWithID is Remember with a caller-supplied id instead of a freshly-minted ULID. Required for deterministic ids — the platform uses this to push skill-as-memory records keyed by the skill's primary key, so re-pushing the same skill upserts via Supersede rather than creating a duplicate row.

Errors if id is empty (use Remember) or if the id already exists (caller should call HasID first and route to Supersede). Refusing silent overwrite keeps the journal append-only semantics intact.

func (*MemoryStore) Search

func (ms *MemoryStore) Search(query string, limit int) []MemoryRecord

Search returns active memories matching the query. Embedding-based when a backend is configured, lexical (BM25-ish over content + tags) otherwise. Used by the unconscious's memory_search tool to look up existing memories before deciding remember vs supersede.

func (*MemoryStore) Supersede

func (ms *MemoryStore) Supersede(oldID, content string, tags []string, weight float64, reason string) (string, error)

Supersede writes a NEW memory and a tombstone for oldID, linking them via the new record's Supersedes field. Both records are appended atomically (one after the other, no other writer in between because we hold the lock for both). Returns the new id.

type Message

type Message struct {
	Role        string           `json:"role"`
	Content     string           `json:"content"`
	Parts       []ContentPart    `json:"parts,omitempty"`        // multimodal content
	ToolCalls   []NativeToolCall `json:"tool_calls,omitempty"`   // assistant messages: structured tool calls
	ToolResults []ToolResult     `json:"tool_results,omitempty"` // user messages: results for prior tool calls
	// Reasoning is the model's chain-of-thought from the turn that
	// produced this message. We replay it back to the provider on
	// subsequent turns because Moonshot (Kimi K2.6 via OpenCode Go)
	// requires `reasoning_content` to be present on assistant
	// tool_call messages when thinking mode is enabled — without it
	// every multi-turn tool flow 400s. Other providers ignore it.
	Reasoning string `json:"reasoning,omitempty"`
}

func (Message) HasParts

func (m Message) HasParts() bool

HasParts returns true if this message has multimodal content.

func (Message) TextContent

func (m Message) TextContent() string

TextContent returns the text content of a message, whether plain Content or from Parts.

type ModelTier

type ModelTier int
const (
	ModelLarge ModelTier = iota
	ModelMedium
	ModelSmall
)

func (ModelTier) String

func (m ModelTier) String() string

type NativeTool

type NativeTool struct {
	Name        string         `json:"name"`
	Description string         `json:"description"`
	Parameters  map[string]any `json:"parameters"` // JSON Schema
}

NativeTool defines a tool sent to the provider API.

type NativeToolCall

type NativeToolCall struct {
	ID               string            `json:"id"` // provider-assigned ID for matching results
	Name             string            `json:"name"`
	Args             map[string]string `json:"args"`
	ThoughtSignature string            `json:"thought_signature,omitempty"` // Gemini: encrypted reasoning state
}

NativeToolCall is a structured tool call returned by the provider.

type NativeToolSize

type NativeToolSize struct {
	Name  string `json:"name"`
	Kind  string `json:"kind"` // "core" | "mcp" | "local"
	Bytes int    `json:"bytes"`
}

NativeToolSize describes one entry in the tools[] payload sent to the LLM. Kind separates core loop tools from local / MCP-main-access tools so the user can see which flavor is burning bytes.

type OpenAICompatProvider

type OpenAICompatProvider struct {
	// contains filtered or unexported fields
}

OpenAICompatProvider works with any OpenAI-compatible API: Fireworks, OpenAI, Ollama, Together, Groq, etc.

func (*OpenAICompatProvider) AvailableBuiltinTools

func (p *OpenAICompatProvider) AvailableBuiltinTools() []BuiltinTool

func (*OpenAICompatProvider) Chat

func (p *OpenAICompatProvider) Chat(ctx context.Context, messages []Message, model string, tools []NativeTool, onChunk func(string), onThinking func(string), onToolChunk func(string, string, string)) (ChatResponse, error)

func (*OpenAICompatProvider) CostPer1M

func (p *OpenAICompatProvider) CostPer1M() (float64, float64, float64)

func (*OpenAICompatProvider) Models

func (p *OpenAICompatProvider) Models() map[ModelTier]string

func (*OpenAICompatProvider) Name

func (p *OpenAICompatProvider) Name() string

func (*OpenAICompatProvider) SetBuiltinTools

func (p *OpenAICompatProvider) SetBuiltinTools(tools []string)

func (*OpenAICompatProvider) SupportsNativeTools

func (p *OpenAICompatProvider) SupportsNativeTools() bool

func (*OpenAICompatProvider) WithBuiltins

func (p *OpenAICompatProvider) WithBuiltins(builtins []string) LLMProvider

type OpenAINativeProvider

type OpenAINativeProvider struct {
	// contains filtered or unexported fields
}

OpenAINativeProvider uses the OpenAI Responses API for native computer use, web_search, code_interpreter, and other OpenAI-specific features. For OpenAI-compatible endpoints (Fireworks, Ollama, etc.), use OpenAICompatProvider.

func (*OpenAINativeProvider) AvailableBuiltinTools

func (p *OpenAINativeProvider) AvailableBuiltinTools() []BuiltinTool

func (*OpenAINativeProvider) Chat

func (p *OpenAINativeProvider) Chat(ctx context.Context, messages []Message, model string, tools []NativeTool, onChunk func(string), onThinking func(string), onToolChunk func(string, string, string)) (ChatResponse, error)

func (*OpenAINativeProvider) CostPer1M

func (p *OpenAINativeProvider) CostPer1M() (float64, float64, float64)

func (*OpenAINativeProvider) Models

func (p *OpenAINativeProvider) Models() map[ModelTier]string

func (*OpenAINativeProvider) Name

func (p *OpenAINativeProvider) Name() string

func (*OpenAINativeProvider) SetBuiltinTools

func (p *OpenAINativeProvider) SetBuiltinTools(tools []string)

func (*OpenAINativeProvider) SupportsNativeTools

func (p *OpenAINativeProvider) SupportsNativeTools() bool

func (*OpenAINativeProvider) WithBuiltins

func (p *OpenAINativeProvider) WithBuiltins(builtins []string) LLMProvider

type OpenAIRealtimeProvider

type OpenAIRealtimeProvider struct {
	// contains filtered or unexported fields
}

OpenAIRealtimeProvider is the stub for OpenAI's Realtime API (wss://api.openai.com/v1/realtime). This stage of the realtime rollout registers the provider in the pool so the rest of the surface (config, spawn gate, prompt bullet) can be exercised end-to-end; the WebSocket client lives in a follow-up change.

Open() returns ErrRealtimeNotImplemented today. When that lands it'll dial the WebSocket, send a session.update with the directive + tools, fan events out on the RealtimeEvent channel, and forward audio/text/tool-result calls. The interface boundary is final; adding the impl won't change the surface.

func NewOpenAIRealtimeProvider

func NewOpenAIRealtimeProvider(apiKey string) *OpenAIRealtimeProvider

NewOpenAIRealtimeProvider constructs a provider bound to the given API key. Models map can be overridden via config; defaults pick OpenAI's current realtime preview.

func (*OpenAIRealtimeProvider) CostPer1M

func (p *OpenAIRealtimeProvider) CostPer1M() (in, cached, out, audio float64)

CostPer1M returns realtime pricing. Numbers reflect OpenAI's published rates for gpt-4o-realtime-preview at the time of scaffolding; verify before relying on cost accounting in production. Audio rate is the per-1M-token rate for audio I/O, which is billed separately from text.

func (*OpenAIRealtimeProvider) DefaultVoice

func (p *OpenAIRealtimeProvider) DefaultVoice() string

func (*OpenAIRealtimeProvider) Models

func (p *OpenAIRealtimeProvider) Models() map[ModelTier]string

func (*OpenAIRealtimeProvider) Name

func (p *OpenAIRealtimeProvider) Name() string

func (*OpenAIRealtimeProvider) Open

Open dials the OpenAI Realtime WebSocket, sends the initial session.update, and returns a live session. ctx governs the dial + handshake; the returned session manages its own goroutines for reads/writes and will surface a RealtimeEventSessionEnded event when the connection drops.

type PersistentThread

type PersistentThread struct {
	ID        string   `json:"id"`
	Name      string   `json:"name,omitempty"`      // human-readable label; empty = display as ID
	ParentID  string   `json:"parent_id,omitempty"` // empty = child of main
	Depth     int      `json:"depth,omitempty"`     // 0 = main's direct child
	System    bool     `json:"system,omitempty"`    // system thread (can't be killed by LLM)
	Directive string   `json:"directive"`
	Tools     []string `json:"tools"`
	MCPNames  []string `json:"mcp_names,omitempty"` // MCP servers to connect on respawn
	Realtime  bool     `json:"realtime,omitempty"`  // spawn as a realtime (voice/audio) thread
	Voice     string   `json:"voice,omitempty"`     // realtime voice id (e.g. "alloy"); empty = provider default
}

type Phase

type Phase struct {
	Name    string
	Setup   func(t *testing.T, dir string)                   // optional: inject data before this phase
	Wait    func(t *testing.T, dir string, th *Thinker) bool // poll condition (return true when done)
	Verify  func(t *testing.T, dir string, th *Thinker)      // optional: assertions after Wait succeeds
	Timeout time.Duration
}

Phase is a step in a scenario.

type PromptComposition

type PromptComposition struct {
	System         SystemBreakdown    `json:"system"`
	NativeTools    []NativeToolSize   `json:"native_tools"`
	NativeBytes    int                `json:"native_bytes"`
	ExtraSystem    []ExtraSystemBlock `json:"extra_system"` // role=system msgs after messages[0]
	ExtraBytes     int                `json:"extra_bytes"`
	ConvBytes      int                `json:"conv_bytes"` // user/assistant/tool/other messages
	GrandTotal     int                `json:"grand_total"`
	ModelMaxTokens int                `json:"model_max_tokens,omitempty"`
}

PromptComposition is the output of buildComposition, a read-only instrumentation pass over a thread's current messages + live registry state. It produces a size breakdown of everything the LLM actually sees on a call:

  1. System text (messages[0].Content) split into the sections that buildSystemPrompt emits. No double-pass required — we just look for the known "[SECTION]" markers in the text that was already built.

  2. Native tool schemas (registry.NativeTools). Each tool's name + description + JSON-serialized Parameters contributes to the provider's `tools[]` payload; we estimate its on-wire size by marshalling it.

  3. The separate "[memories]" system message and any other role=system messages appended after the main prompt (they all travel as prompt tokens too).

  4. All remaining user / assistant / tool messages rolled up so the user sees grand_total = everything sent on the next call.

Pure read, pure compute. Nothing on the hot path calls this.

type PromptTokensDetails

type PromptTokensDetails struct {
	CachedTokens int `json:"cached_tokens"`
}

type ProviderConfig

type ProviderConfig struct {
	Name          string            `json:"name"`                     // "google", "openai", "anthropic", "fireworks", "ollama", "openai-realtime"
	Default       bool              `json:"default,omitempty"`        // true = default provider (first match wins)
	Models        map[string]string `json:"models,omitempty"`         // "large" → model ID, "medium" → ..., "small" → ...
	BuiltinTools  []string          `json:"builtin_tools,omitempty"`  // e.g. ["code_execution"]
	RealtimeVoice string            `json:"realtime_voice,omitempty"` // default voice for realtime providers (e.g. "alloy")
}

ProviderConfig persists a provider and its model selections.

type ProviderPool

type ProviderPool struct {
	// contains filtered or unexported fields
}

ProviderPool holds multiple LLM providers keyed by name. Supports default selection and fallback on error.

Realtime providers (RealtimeProvider) live in a separate map so the existing LLM-provider plumbing (Default, Fallback, ProviderSummary, etc.) stays untouched. A name can in principle appear in both maps if a vendor offers both APIs, though in practice they're distinct (e.g. "openai" text vs "openai-realtime").

func (*ProviderPool) Count

func (pp *ProviderPool) Count() int

Count returns the number of providers in the pool.

func (*ProviderPool) Default

func (pp *ProviderPool) Default() LLMProvider

Default returns the default provider.

func (*ProviderPool) DefaultName

func (pp *ProviderPool) DefaultName() string

DefaultName returns the name of the default provider.

func (*ProviderPool) Fallback

func (pp *ProviderPool) Fallback(exclude string) LLMProvider

Fallback returns the next provider in the fallback chain after the excluded one.
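One plausible reading of "next provider after the excluded one" is a walk through config order (the order `Names` returns) starting just past the failed provider and wrapping around. The wrap-around, the behavior for an unknown name, and the `pool`/`fallback` names are all assumptions for illustration; the real chain order is not documented here.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for ProviderPool: names in config order.
type pool struct {
	names []string
}

// fallback returns the first provider after exclude in config order,
// wrapping around and never returning exclude itself. An unknown exclude
// starts the search at the first provider; "" means no alternative exists.
func (p pool) fallback(exclude string) string {
	start := 0
	for i, n := range p.names {
		if n == exclude {
			start = i + 1
			break
		}
	}
	for k := 0; k < len(p.names); k++ {
		n := p.names[(start+k)%len(p.names)]
		if n != exclude {
			return n
		}
	}
	return ""
}

func main() {
	p := pool{names: []string{"fireworks", "openai", "ollama"}}
	fmt.Println(p.fallback("openai")) // next in order
	fmt.Println(p.fallback("ollama")) // wraps to the first provider
}
```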

func (*ProviderPool) Get

func (pp *ProviderPool) Get(name string) LLMProvider

Get returns a provider by name, or nil if not found.

func (*ProviderPool) HasRealtimeProvider

func (pp *ProviderPool) HasRealtimeProvider() bool

HasRealtimeProvider reports whether any RealtimeProvider is registered. Used as one half of the realtime feature gate (the other being Config.RealtimeEnabled): if no provider is registered, the main thread is never told realtime exists and spawn rejects realtime=true.

func (*ProviderPool) Names

func (pp *ProviderPool) Names() []string

Names returns all provider names in config order.

func (*ProviderPool) ProviderSummary

func (pp *ProviderPool) ProviderSummary(name string) string

ProviderSummary returns a description of a provider for system prompt injection.

func (*ProviderPool) RealtimeByName

func (pp *ProviderPool) RealtimeByName(name string) RealtimeProvider

RealtimeByName returns a RealtimeProvider by name, or nil.

func (*ProviderPool) RealtimeDefault

func (pp *ProviderPool) RealtimeDefault() RealtimeProvider

RealtimeDefault returns the default RealtimeProvider, or nil if none registered.

func (*ProviderPool) RealtimeNames

func (pp *ProviderPool) RealtimeNames() []string

RealtimeNames returns all registered realtime provider names.

type RealtimeEvent

type RealtimeEvent struct {
	Type RealtimeEventType

	// Audio (RealtimeEventAudioOut)
	Audio []byte

	// Transcript (RealtimeEventTranscript*)
	Transcript string
	Final      bool // true = end-of-utterance, false = partial

	// Tool call (RealtimeEventToolCall)
	ToolCallID string
	ToolName   string
	ToolArgs   string // raw JSON string

	// Error (RealtimeEventError)
	Err error
}

RealtimeEvent is a single event from a session. Only the fields relevant to the Type are populated.

type RealtimeEventType

type RealtimeEventType string

RealtimeEventType discriminates the union of events a session can emit. Receivers should switch on Type before reading fields.

const (
	RealtimeEventAudioOut         RealtimeEventType = "audio_out"         // PCM chunk
	RealtimeEventTranscriptInput  RealtimeEventType = "transcript_input"  // user said
	RealtimeEventTranscriptOutput RealtimeEventType = "transcript_output" // model said
	RealtimeEventToolCall         RealtimeEventType = "tool_call"
	RealtimeEventResponseDone     RealtimeEventType = "response_done"
	RealtimeEventSessionEnded     RealtimeEventType = "session_ended"
	RealtimeEventError            RealtimeEventType = "error"
)
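Switching on `Type` before reading any other field, as the `RealtimeEventType` comment advises, looks like the consumer below. The types mirror the documented declarations; `describe` is a hypothetical function that only formats each event and touches nothing but the fields populated for its type.

```go
package main

import (
	"errors"
	"fmt"
)

type RealtimeEventType string

const (
	RealtimeEventAudioOut         RealtimeEventType = "audio_out"
	RealtimeEventTranscriptInput  RealtimeEventType = "transcript_input"
	RealtimeEventTranscriptOutput RealtimeEventType = "transcript_output"
	RealtimeEventToolCall         RealtimeEventType = "tool_call"
	RealtimeEventResponseDone     RealtimeEventType = "response_done"
	RealtimeEventSessionEnded     RealtimeEventType = "session_ended"
	RealtimeEventError            RealtimeEventType = "error"
)

type RealtimeEvent struct {
	Type       RealtimeEventType
	Audio      []byte
	Transcript string
	Final      bool
	ToolCallID string
	ToolName   string
	ToolArgs   string
	Err        error
}

// describe reads only the fields that are populated for ev.Type.
func describe(ev RealtimeEvent) string {
	switch ev.Type {
	case RealtimeEventAudioOut:
		return fmt.Sprintf("audio: %d bytes", len(ev.Audio))
	case RealtimeEventTranscriptInput, RealtimeEventTranscriptOutput:
		who := "user"
		if ev.Type == RealtimeEventTranscriptOutput {
			who = "model"
		}
		state := "partial"
		if ev.Final {
			state = "final"
		}
		return fmt.Sprintf("%s (%s): %s", who, state, ev.Transcript)
	case RealtimeEventToolCall:
		return fmt.Sprintf("tool %s(%s) id=%s", ev.ToolName, ev.ToolArgs, ev.ToolCallID)
	case RealtimeEventResponseDone:
		return "response done"
	case RealtimeEventSessionEnded:
		return "session ended"
	case RealtimeEventError:
		if ev.Err == nil {
			return "error: <nil>"
		}
		return "error: " + ev.Err.Error()
	default:
		return "unknown event " + string(ev.Type)
	}
}

func main() {
	events := []RealtimeEvent{
		{Type: RealtimeEventTranscriptInput, Transcript: "where is my order", Final: true},
		{Type: RealtimeEventToolCall, ToolCallID: "call_1", ToolName: "lookup", ToolArgs: `{"q":"order"}`},
		{Type: RealtimeEventAudioOut, Audio: make([]byte, 960)},
		{Type: RealtimeEventError, Err: errors.New("socket closed")},
	}
	for _, ev := range events {
		fmt.Println(describe(ev))
	}
}
```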

type RealtimeProvider

type RealtimeProvider interface {
	Name() string
	Models() map[ModelTier]string
	// CostPer1M returns (input_text, cached_text, output_text, audio)
	// pricing per 1M tokens / characters. Audio is a separate rate
	// because realtime APIs bill audio in/out very differently from text.
	CostPer1M() (in, cached, out, audio float64)

	// DefaultVoice returns the provider's preferred voice when the
	// caller doesn't specify one. Empty string is acceptable for
	// providers that pick a voice server-side.
	DefaultVoice() string

	// Open establishes a session. ctx propagates to the underlying
	// connection; cancelling ctx closes the session cleanly. The
	// returned session is bound to ctx for its lifetime.
	Open(ctx context.Context, opts RealtimeSessionOpts) (RealtimeSession, error)
}

RealtimeProvider is the parallel of LLMProvider for bidirectional, streaming, audio-capable models (e.g. OpenAI Realtime, Gemini Live).

Where LLMProvider.Chat() is request → response over HTTP+SSE, a RealtimeProvider holds a persistent WebSocket session for the lifetime of a realtime thread. Audio in / audio out flow over channels owned by the returned RealtimeSession; the thinker layer never touches the audio path itself.

Two interfaces, not one, because the request/response and streaming-session models are fundamentally different shapes — fusing them would either lossy-shim realtime into Chat() or force every text provider to implement methods it has no notion of. Common metadata (Name, Models, CostPer1M) lives on both for uniformity at registration time.

type RealtimeSession

type RealtimeSession interface {
	// SendAudio pushes a chunk of input audio in the configured
	// AudioInFmt to the model. Non-blocking on the model side — the
	// session buffers and the network does its own back-pressure.
	SendAudio(pcm []byte) error

	// SendText injects a text message into the conversation. role is
	// one of "user", "system", "assistant". Used by main to deliver
	// out-of-band context to the realtime worker without speaking it
	// into the call (e.g. "[from:main] caller is VIP, be warm").
	SendText(role, text string) error

	// SendToolResult delivers the result of a tool call back to the
	// model. callID matches the id from the corresponding
	// RealtimeEventToolCall event (its ToolCallID field).
	SendToolResult(callID, result string, isError bool) error

	// UpdateInstructions replaces the session's system instructions
	// in place (provider-side session.update). The conversation
	// history is preserved; only the directive shifts.
	UpdateInstructions(directive string) error

	// Interrupt cancels the model's current utterance. Used when new
	// user audio arrives during model speech, or when main sends a
	// course-correction the worker decides to act on immediately.
	Interrupt() error

	// Events returns the unified event stream from the session:
	// audio out, transcript fragments, tool calls, lifecycle events,
	// errors. Channel closes when the session ends.
	Events() <-chan RealtimeEvent

	// Close terminates the session and releases resources. Safe to
	// call multiple times.
	Close() error
}

RealtimeSession is the live, bidirectional handle to a single model session. All methods are safe to call concurrently with each other and with Events() consumption.

type RealtimeSessionOpts

type RealtimeSessionOpts struct {
	Model        string       // provider-specific model id
	Voice        string       // empty = provider default
	Instructions string       // the thread's directive
	Tools        []NativeTool // tool schemas to expose to the model
	AudioInFmt   AudioFormat
	AudioOutFmt  AudioFormat
	Temperature  float64 // 0 = provider default
}

RealtimeSessionOpts is the connect-time configuration for a realtime session. Once the session is open, mutable fields can be updated via UpdateInstructions / similar — opts itself is consumed only at Open.

type RealtimeThinker

type RealtimeThinker struct {
	*Thinker
	// contains filtered or unexported fields
}

RealtimeThinker drives a thread whose conversation runs over a RealtimeSession (bidirectional WebSocket) instead of discrete request/response Chat() calls.

It embeds *Thinker for shared state — registry, bus subscription, pause/quit channels, memory, config, telemetry. The standard Run() loop is replaced by an event-driven select that fans together:

  • session events from the model (audio out, transcript, tool calls)
  • inbound audio from the caller (mic / telephony)
  • bus inbox events (send from other threads, lifecycle)
  • pause / quit signals

All Thinker mechanics that don't assume the iteration loop (executeTool, pendingTools tracking, persistence, telemetry) are reused unchanged.

func (*RealtimeThinker) Run

func (rt *RealtimeThinker) Run()

Run is the event-driven counterpart to Thinker.Run. It blocks until the session ends, the thread is killed, or the bus quits. Tool calls fire as goroutines via the existing executeTool plumbing and their results are delivered back via session.SendToolResult.
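The shape of that event-driven loop can be sketched with a fake session: a `select` over session events and a quit channel, with each tool call run on its own goroutine and its result returned via `SendToolResult`. This is a simplified illustration, not `RealtimeThinker.Run`. `fakeSession`, `event`, and `run` are hypothetical; inbound caller audio and bus inbox events are omitted. The event type strings match the documented `RealtimeEventType` values.

```go
package main

import (
	"fmt"
	"sync"
)

type event struct {
	Type       string // "tool_call", "transcript_output", ...
	ToolCallID string
	ToolName   string
	ToolArgs   string
	Transcript string
}

// fakeSession is a hypothetical in-memory stand-in for RealtimeSession.
type fakeSession struct {
	events  chan event
	mu      sync.Mutex
	results map[string]string
}

func (s *fakeSession) Events() <-chan event { return s.events }

// SendToolResult records the result; this fake ignores isError.
func (s *fakeSession) SendToolResult(callID, result string, isError bool) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.results[callID] = result
	return nil
}

// run drains session events until the channel closes or quit fires. Tool
// calls run concurrently so a slow tool never stalls transcript handling.
// The deferred Wait lets in-flight tools finish before run returns.
func run(s *fakeSession, quit <-chan struct{}, exec func(name, args string) (string, error)) []string {
	var wg sync.WaitGroup
	var transcript []string
	defer wg.Wait()
	for {
		select {
		case <-quit:
			return transcript
		case ev, ok := <-s.Events():
			if !ok {
				return transcript // session ended
			}
			switch ev.Type {
			case "tool_call":
				wg.Add(1)
				go func(ev event) {
					defer wg.Done()
					out, err := exec(ev.ToolName, ev.ToolArgs)
					if err != nil {
						_ = s.SendToolResult(ev.ToolCallID, err.Error(), true)
						return
					}
					_ = s.SendToolResult(ev.ToolCallID, out, false)
				}(ev)
			case "transcript_output":
				transcript = append(transcript, ev.Transcript)
			}
		}
	}
}

func main() {
	s := &fakeSession{events: make(chan event, 2), results: map[string]string{}}
	s.events <- event{Type: "transcript_output", Transcript: "Checking that now."}
	s.events <- event{Type: "tool_call", ToolCallID: "call_1", ToolName: "lookup", ToolArgs: `{"q":"order 42"}`}
	close(s.events)

	exec := func(name, args string) (string, error) { return name + " ok", nil }
	t := run(s, make(chan struct{}), exec)
	fmt.Println(t, s.results["call_1"])
}
```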

type Request

type Request struct {
	Model    string    `json:"model"`
	Messages []Message `json:"messages"`
	Stream   bool      `json:"stream"`
}

type RunMode

type RunMode string

RunMode controls the agent's safety behavior via system prompt guidance.

const (
	ModeAutonomous RunMode = "autonomous" // agent operates freely, asks when it thinks it should
	ModeCautious   RunMode = "cautious"   // agent asks before destructive/external actions
	ModeLearn      RunMode = "learn"      // agent actively asks about new tool types, builds safety profile
)

type Scenario

type Scenario struct {
	Name       string
	Directive  string
	MCPServers []MCPServerConfig // {{dataDir}} in Env values is replaced at runtime
	Providers  []ProviderConfig  // multi-provider pool config (optional)
	DataSetup  func(t *testing.T, dir string)
	Phases     []Phase
	Timeout    time.Duration // hard cap for entire scenario
}

Scenario defines a complete agent behavior test.

Note: there are deliberately no thread-count knobs here. Scenarios assert on OUTCOMES — the goal got done — not on HOW the agent got there. Whether it spawned five workers or did the work inline on main is the agent's call; the harness only tracks peak threads as an informational line in the run summary.

type ScenarioAuditEntry

type ScenarioAuditEntry struct {
	Time string            `json:"time"`
	Tool string            `json:"tool"`
	Args map[string]string `json:"args"`
}

ScenarioAuditEntry is the row shape mock MCPs write into audit.jsonl.

func ReadAuditEntries

func ReadAuditEntries(dir string) []ScenarioAuditEntry

ReadAuditEntries parses the audit.jsonl file the mock MCPs write into dataDir. Returns nil if the file doesn't exist yet.

type ServerToolResult

type ServerToolResult struct {
	ToolName string `json:"tool_name"`
	Code     string `json:"code,omitempty"`   // code that was executed
	Output   string `json:"output,omitempty"` // stdout/result
	Error    string `json:"error,omitempty"`  // stderr if any
}

ServerToolResult is the result of a built-in tool executed server-side.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session manages persistent JSONL history for one thread.

func NewSession

func NewSession(baseDir, threadID string) *Session

NewSession creates or opens a session file for a thread.

func (*Session) Append

func (s *Session) Append(entry SessionEntry)

Append writes one entry to the history file.

func (*Session) AppendMessage

func (s *Session) AppendMessage(msg Message, iteration int, usage TokenUsage)

AppendMessage is a convenience to append a Message as a SessionEntry.

func (*Session) Compact

func (s *Session) Compact(summarize func(text string) string)

Compact summarizes old messages and rewrites the file. summarize receives the text of the messages being compacted and returns a summary.

func (*Session) Count

func (s *Session) Count() int

Count returns the approximate number of entries.

func (*Session) Delete

func (s *Session) Delete()

Delete removes the history file.

func (*Session) LoadTail

func (s *Session) LoadTail(n int) (messages []Message, compactedSummaries []string)

LoadTail reads the last n messages from the history file and converts them to Messages. Skips system messages and _compacted entries (compacted summaries are prepended as context).
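The filtering that LoadTail describes can be sketched over an in-memory JSONL string. Assumptions: n counts only the kept (non-system, non-`_compacted`) messages, every compaction summary is returned regardless of n, and torn lines are skipped. `loadTail`, `entry`, and `message` are hypothetical simplifications of `Session.LoadTail`, `SessionEntry`, and `Message`. A real implementation over a large file might read from the end instead of scanning everything.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

type entry struct {
	Role    string `json:"role"`
	Content string `json:"content"`
	Summary string `json:"summary,omitempty"`
}

type message struct {
	Role    string
	Content string
}

// loadTail returns the last n conversational messages plus every
// compaction summary, skipping system entries entirely.
func loadTail(jsonl string, n int) ([]message, []string) {
	var msgs []message
	var summaries []string
	sc := bufio.NewScanner(strings.NewReader(jsonl))
	for sc.Scan() {
		var e entry
		if err := json.Unmarshal(sc.Bytes(), &e); err != nil {
			continue // blank or torn line
		}
		switch e.Role {
		case "system":
			// skipped, as LoadTail documents
		case "_compacted":
			summaries = append(summaries, e.Summary)
		default:
			msgs = append(msgs, message{Role: e.Role, Content: e.Content})
		}
	}
	if n >= 0 && len(msgs) > n {
		msgs = msgs[len(msgs)-n:]
	}
	return msgs, summaries
}

func main() {
	history := `{"role":"system","content":"sys"}
{"role":"_compacted","summary":"earlier work"}
{"role":"user","content":"a"}
{"role":"assistant","content":"b"}
{"role":"user","content":"c"}
`
	msgs, summaries := loadTail(history, 2)
	fmt.Println(msgs, summaries)
}
```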

func (*Session) NeedsCompaction

func (s *Session) NeedsCompaction() bool

NeedsCompaction returns true if the file is large enough to compact.

func (*Session) Rename

func (s *Session) Rename(newThreadID string) error

Rename moves the on-disk history file when a thread's id changes. Best-effort: a missing source file (the thread had no entries yet) is treated as success so the caller can rename the in-memory record without worrying about whether disk state existed.

type SessionEntry

type SessionEntry struct {
	Timestamp   time.Time        `json:"ts"`
	Role        string           `json:"role"` // "system", "user", "assistant", "tool_result", "_compacted"
	Content     string           `json:"content"`
	Parts       []ContentPart    `json:"parts,omitempty"`
	ToolCalls   []NativeToolCall `json:"tool_calls,omitempty"`
	ToolResults []ToolResult     `json:"tool_results,omitempty"`
	Summary     string           `json:"summary,omitempty"`        // for _compacted entries
	OrigCount   int              `json:"original_count,omitempty"` // how many messages were compacted
	TokensIn    int              `json:"tokens_in,omitempty"`
	TokensOut   int              `json:"tokens_out,omitempty"`
	Iteration   int              `json:"iteration,omitempty"`
}

SessionEntry is one line in the JSONL history file.
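Because each SessionEntry is one JSON object per line, the file can be written with json.Encoder (which appends a newline after every value) and read back line by line. The sketch below mirrors a subset of SessionEntry's JSON tags in a local `entry` type and approximates LoadTail's filtering. loadTail is a hypothetical helper, not the package's implementation, and it scans the whole input rather than seeking from the end.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

// entry mirrors a subset of SessionEntry's JSON tags.
type entry struct {
	Timestamp time.Time `json:"ts"`
	Role      string    `json:"role"`
	Content   string    `json:"content"`
	Summary   string    `json:"summary,omitempty"`
	OrigCount int       `json:"original_count,omitempty"`
}

// loadTail returns the last n non-system, non-_compacted entries plus every
// _compacted summary. Hypothetical helper; reads the whole input.
func loadTail(jsonl string, n int) (msgs []entry, summaries []string, err error) {
	if n < 0 {
		n = 0
	}
	sc := bufio.NewScanner(strings.NewReader(jsonl))
	sc.Buffer(make([]byte, 0, 64*1024), 8<<20) // allow lines up to 8 MiB
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue
		}
		var e entry
		if err := json.Unmarshal([]byte(line), &e); err != nil {
			return nil, nil, err
		}
		switch e.Role {
		case "system":
			// skipped
		case "_compacted":
			summaries = append(summaries, e.Summary)
		default:
			msgs = append(msgs, e)
		}
	}
	if err := sc.Err(); err != nil {
		return nil, nil, err
	}
	if len(msgs) > n {
		msgs = msgs[len(msgs)-n:]
	}
	return msgs, summaries, nil
}

func main() {
	var b strings.Builder
	enc := json.NewEncoder(&b) // each Encode call writes one JSONL line
	for _, e := range []entry{
		{Role: "system", Content: "prompt"},
		{Role: "_compacted", Summary: "earlier work", OrigCount: 40},
		{Role: "user", Content: "hi"},
		{Role: "assistant", Content: "hello"},
	} {
		if err := enc.Encode(e); err != nil {
			panic(err)
		}
	}
	msgs, sums, err := loadTail(b.String(), 1)
	fmt.Println(len(msgs), msgs[0].Content, sums, err) // 1 hello [earlier work] <nil>
}
```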

type SpawnOpts

type SpawnOpts struct {
	MediaParts      []ContentPart
	ProviderName    string // override provider from pool (empty = inherit parent)
	InitialMessages []string
	ParentID        string   // "main" or parent thread ID (empty = "main")
	Depth           int      // depth in the spawn tree (0 = child of main)
	MCPNames        []string // MCP servers whose tools preload into the child's activeTools at boot
	// Tools, when set, preloads specific tool names (across any server)
	// into the child's activeTools at boot. Complements MCPNames:
	// MCPNames = "give me everything from these servers", Tools =
	// "give me exactly these names". Both are additive. Used by the
	// privileged HTTP spawn endpoint (POST /threads/{id}) for system
	// callers that know which tools they need; the LLM-driven spawn
	// tool path leaves this nil and uses mcps=[…] instead.
	Tools        []string
	BuiltinTools []string // provider builtin overrides (nil = inherit, empty = none)
	DeferRun     bool     // if true, don't start Run() — call StartAll() later
	// Paused: if true, the thread spawns in paused state. Run() loop
	// blocks at the top of its first iteration until either an inbox
	// event arrives (an explicit `send` from the leader) OR the
	// thinker is unpaused via PauseAll(false). Useful for
	//   - "configure-then-launch" patterns where the leader spawns
	//     several workers atomically before any of them think
	//   - cautious/learn modes that want children to wait for explicit
	//     instruction rather than acting on the directive alone
	//   - debugging — inspect the worker before it does anything
	Paused bool
	// BypassNoSpawn skips the no_spawn MCP filter. Set by the
	// authenticated HTTP spawn endpoint (POST /threads/{id}) where
	// the caller has the core API key — the system itself is asking
	// for a privileged sub-thread (e.g. channelchat's chat thread
	// needs the `channels` MCP to reply to users). The LLM-driven
	// spawn tool path never sets this, so an in-agent worker still
	// can't escalate by attaching a no_spawn MCP.
	BypassNoSpawn bool
	// Realtime: if true, construct a realtime (voice/audio) thread
	// driven by a RealtimeProvider session instead of the standard
	// request/response Thinker. ProviderName must resolve to a
	// registered RealtimeProvider (e.g. "openai-realtime"); when
	// empty, the pool's RealtimeDefault is used. SpawnWithOpts will
	// refuse with a clear error if no realtime provider is available.
	Realtime bool
	// Voice: realtime voice id (e.g. "alloy"). Empty = provider's
	// default. Ignored when Realtime is false.
	Voice string
	// AudioIn: PCM audio chunks pushed by the caller (telephony
	// bridge, browser WebRTC, mic source). The realtime thread reads
	// these and forwards to session.SendAudio. nil = no inbound audio
	// (text-only realtime — useful for tests). Ignored when
	// Realtime is false.
	AudioIn <-chan []byte
	// AudioOut: PCM audio chunks the realtime thread writes when the
	// model speaks. Caller plays/streams them. nil = audio output
	// silently dropped. Ignored when Realtime is false.
	AudioOut chan<- []byte
}

SpawnOpts holds optional parameters for spawning a thread.
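The Paused option amounts to a gate at the top of the first iteration that waits for whichever comes first: an inbox event or an unpause. A minimal sketch with a select, assuming hypothetical `inbox` and `resume` channels (the real Thinker's internals are unexported):

```go
package main

import "fmt"

// waitUntilReleased models the Paused gate: a new worker blocks until an
// inbox event arrives or it is unpaused. Both channels are hypothetical.
func waitUntilReleased(inbox <-chan string, resume <-chan struct{}) string {
	select {
	case msg := <-inbox:
		return "event: " + msg
	case <-resume:
		return "unpaused"
	}
}

func main() {
	inbox := make(chan string, 1)
	resume := make(chan struct{}) // closing it would release every waiter at once
	done := make(chan string)

	go func() { done <- waitUntilReleased(inbox, resume) }()

	// The leader can configure other workers here; this one stays blocked.
	inbox <- "start with batch 1" // an explicit send releases it
	fmt.Println(<-done)           // event: start with batch 1
}
```

Closing `resume` rather than sending on it releases every paused worker at once, which is one way a PauseAll(false)-style broadcast could be modeled.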

type StreamChoice

type StreamChoice struct {
	Delta Delta `json:"delta"`
}

type StreamEvent

type StreamEvent struct {
	Choices []StreamChoice `json:"choices"`
	Usage   *Usage         `json:"usage,omitempty"`
}

type Subscription

type Subscription struct {
	ID      string
	C       chan Event
	Wake    chan struct{} // signaled on every new event delivery
	Dropped uint64        // atomic
	// contains filtered or unexported fields
}

Subscription is a handle returned by Subscribe/SubscribeAll.

Dropped is an atomic counter of events the bus discarded because this subscription's channel was full. Non-zero means the consumer is not keeping up with the publish rate and some events were lost. The order of the remaining events is preserved: drops are tail truncations of the backlog, not reorderings. Read it with atomic.LoadUint64, which is safe from any goroutine.

type SystemBreakdown

type SystemBreakdown struct {
	Base            int `json:"base"`             // up to "CORE TOOLS — always available:"
	CoreTools       int `json:"core_tools"`       // "CORE TOOLS — always available:" to next [ marker
	RetrievedTools  int `json:"retrieved_tools"`  // [available tools — matched to your current context] block (RAG per-turn)
	MCPServers      int `json:"mcp_servers"`      // [AVAILABLE MCP SERVERS] block
	MCPToolDocs     int `json:"mcp_tool_docs"`    // [MCP TOOLS — available for sub-threads] block
	Providers       int `json:"providers"`        // [AVAILABLE PROVIDERS] block
	ActiveThreads   int `json:"active_threads"`   // [ACTIVE THREADS] block
	SafetyMode      int `json:"safety_mode"`      // [SAFETY MODE: ...] block
	Skills          int `json:"skills"`           // [LEARNED SKILLS] block
	BlobHint        int `json:"blob_hint"`        // [FILE HANDLES] block
	PreviousContext int `json:"previous_context"` // [PREVIOUS CONTEXT] block (from session load)
	Directive       int `json:"directive"`        // [DIRECTIVE — EXECUTE ON STARTUP] block
	Other           int `json:"other"`            // text not matching any known marker
	Total           int `json:"total"`
}

SystemBreakdown is the per-section byte count of messages[0].Content, i.e. the main system prompt. Unknown text that falls between known markers lands in Other, so the per-section counts always sum to Total.
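The idea of attributing prompt bytes to marker-delimited sections can be sketched as follows. This is a simplification under stated assumptions: each byte belongs to the nearest preceding marker found in the prompt, bytes before the first marker count as base, and there is no Other bucket, so it only illustrates the invariant that the sections sum to the total.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// marker pairs a breakdown field name with the literal text that opens it.
type marker struct{ name, text string }

// breakdown assigns every byte of prompt to the section opened by the nearest
// preceding marker; bytes before the first marker count as "base". Unlike the
// real SystemBreakdown, this sketch has no Other bucket.
func breakdown(prompt string, markers []marker) map[string]int {
	type hit struct {
		pos  int
		name string
	}
	var hits []hit
	for _, m := range markers {
		if i := strings.Index(prompt, m.text); i >= 0 {
			hits = append(hits, hit{i, m.name})
		}
	}
	sort.Slice(hits, func(a, b int) bool { return hits[a].pos < hits[b].pos })

	out := map[string]int{"total": len(prompt)}
	start, name := 0, "base"
	for _, h := range hits {
		out[name] += h.pos - start
		start, name = h.pos, h.name
	}
	out[name] += len(prompt) - start
	return out
}

func main() {
	prompt := "base text\n[LEARNED SKILLS] s1\n[ACTIVE THREADS] t1\n"
	fmt.Println(breakdown(prompt, []marker{
		{"active_threads", "[ACTIVE THREADS]"},
		{"skills", "[LEARNED SKILLS]"},
		{"directive", "[DIRECTIVE"}, // absent, so it gets no entry
	}))
	// map[active_threads:20 base:10 skills:20 total:50]
}
```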

type Telemetry

type Telemetry struct {
	// contains filtered or unexported fields
}

Telemetry collects events and forwards them to the server.

func NewTelemetry

func NewTelemetry() *Telemetry

func (*Telemetry) DroppedLiveEvents

func (t *Telemetry) DroppedLiveEvents() int64

DroppedLiveEvents returns the cumulative count of live-forward events that were discarded because the buffer was full. Useful for health checks and end-of-run diagnostics.

func (*Telemetry) Emit

func (t *Telemetry) Emit(eventType, threadID string, data any)

Emit records a telemetry event (stored + forwarded to server).

func (*Telemetry) EmitLive

func (t *Telemetry) EmitLive(eventType, threadID string, data any)

EmitLive records a telemetry event for SSE only (not forwarded to server).

func (*Telemetry) Events

func (t *Telemetry) Events(since int) ([]TelemetryEvent, int)

Events returns all events (including live-only events) since the given index. It is used by the SSE endpoint. If the log was truncated (since > len), it resets and returns everything available.

func (*Telemetry) Stop

func (t *Telemetry) Stop()

func (*Telemetry) StoredEvents

func (t *Telemetry) StoredEvents(since int) ([]TelemetryEvent, int)

StoredEvents returns only stored events since the given index. It is used by forwardLoop. If the log was truncated (since > len), it resets and returns everything available.
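The truncation rule shared by Events and StoredEvents can be sketched as a cursor-based read. This assumes the returned int is the cursor to pass on the next call; `eventLog` and `since` are hypothetical names, and the sketch is single-goroutine where a shared log would need a mutex:

```go
package main

import "fmt"

type TelemetryEvent struct{ ID, Type string }

// eventLog is a single-goroutine stand-in; a log shared across goroutines
// would need a mutex around every access.
type eventLog struct{ events []TelemetryEvent }

// since returns a copy of the events after cursor plus the cursor to pass
// next time. A cursor past the end means the log was truncated, so it
// resets to 0 and returns everything still available.
func (l *eventLog) since(cursor int) ([]TelemetryEvent, int) {
	if cursor < 0 || cursor > len(l.events) {
		cursor = 0
	}
	return append([]TelemetryEvent(nil), l.events[cursor:]...), len(l.events)
}

func main() {
	l := &eventLog{events: []TelemetryEvent{{"1", "a"}, {"2", "b"}}}

	batch, cursor := l.since(0)
	fmt.Println(len(batch), cursor) // 2 2

	l.events = l.events[:1] // simulate truncation below the reader's cursor
	batch, cursor = l.since(cursor)
	fmt.Println(len(batch), cursor) // 1 1
}
```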

type TelemetryEvent

type TelemetryEvent struct {
	ID         string          `json:"id"`
	InstanceID int64           `json:"instance_id,omitempty"`
	ThreadID   string          `json:"thread_id"`
	Type       string          `json:"type"`
	Time       time.Time       `json:"time"`
	Data       json.RawMessage `json:"data"`
}

TelemetryEvent is the unified event format; it matches the server schema.
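Since Data is a json.RawMessage, a consumer can decode the envelope first and then decode Data according to Type. The structs below copy the JSON tags documented for TelemetryEvent and ThreadRenamedData; `decodeEvent` is a hypothetical helper, and "thread.renamed" is the only event type name this page documents.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Field tags copied from TelemetryEvent and ThreadRenamedData.
type TelemetryEvent struct {
	ID         string          `json:"id"`
	InstanceID int64           `json:"instance_id,omitempty"`
	ThreadID   string          `json:"thread_id"`
	Type       string          `json:"type"`
	Time       time.Time       `json:"time"`
	Data       json.RawMessage `json:"data"`
}

type ThreadRenamedData struct {
	OldID    string `json:"old_id"`
	NewID    string `json:"new_id"`
	Name     string `json:"name,omitempty"`
	ParentID string `json:"parent_id,omitempty"`
}

// decodeEvent unmarshals the envelope, then decodes Data based on Type.
func decodeEvent(raw []byte) (TelemetryEvent, any, error) {
	var ev TelemetryEvent
	if err := json.Unmarshal(raw, &ev); err != nil {
		return ev, nil, err
	}
	switch ev.Type {
	case "thread.renamed":
		var d ThreadRenamedData
		if err := json.Unmarshal(ev.Data, &d); err != nil {
			return ev, nil, err
		}
		return ev, d, nil
	default:
		return ev, nil, nil // unknown types: caller can still inspect ev.Data
	}
}

func main() {
	raw := []byte(`{"id":"e1","thread_id":"w1","type":"thread.renamed",` +
		`"time":"2026-01-01T00:00:00Z","data":{"old_id":"w1","new_id":"writer"}}`)
	ev, data, err := decodeEvent(raw)
	if err != nil {
		panic(err)
	}
	if d, ok := data.(ThreadRenamedData); ok {
		fmt.Println(ev.Type, d.OldID, "->", d.NewID) // thread.renamed w1 -> writer
	}
}
```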

type ThinkRate

type ThinkRate int
const (
	RateReactive ThinkRate = iota // 500ms — event just arrived
	RateFast                      // 2s — actively working
	RateNormal                    // 10s — thinking, no urgency
	RateSlow                      // 30s — not much to do
	RateSleep                     // 120s — deep idle
)

func (ThinkRate) Delay

func (r ThinkRate) Delay() time.Duration

func (ThinkRate) String

func (r ThinkRate) String() string

type Thinker

type Thinker struct {
	// contains filtered or unexported fields
}

func NewThinker

func NewThinker(apiKey string, provider LLMProvider, cfg ...*Config) *Thinker

func (*Thinker) APIEvents

func (t *Thinker) APIEvents(since int) ([]APIEvent, int)

func (*Thinker) Config

func (t *Thinker) Config() *Config

Config returns the Config this Thinker was constructed from.

func (*Thinker) Inject

func (t *Thinker) Inject(msg string)

Inject sends a message event to this thinker's bus subscription.

func (*Thinker) InjectConsole

func (t *Thinker) InjectConsole(msg string)

InjectConsole sends a console event to this thinker.

func (*Thinker) InjectWithParts

func (t *Thinker) InjectWithParts(text string, parts []ContentPart)

InjectWithParts sends a text event with media parts attached.

func (*Thinker) Iteration

func (t *Thinker) Iteration() int

Iteration returns the current think-loop iteration counter.

func (*Thinker) Memory

func (t *Thinker) Memory() *MemoryStore

Memory returns the MemoryStore this Thinker reads/writes against.

func (*Thinker) Messages

func (t *Thinker) Messages() []Message

Messages returns the current message slice. Returned slice shares backing storage with the Thinker — copy if you need to retain it.

func (*Thinker) Pool

func (t *Thinker) Pool() *ProviderPool

Pool returns the underlying provider pool.

func (*Thinker) ReloadDirective

func (t *Thinker) ReloadDirective()

func (*Thinker) ResetConversation

func (t *Thinker) ResetConversation()

ResetConversation truncates the message history back to the initial system prompt. Used by long-running scenarios that need to clear context between phases without rebuilding the Thinker.

func (*Thinker) Run

func (t *Thinker) Run()

func (*Thinker) SetComputer

func (t *Thinker) SetComputer(c computer.Computer)

SetComputer attaches a computer use environment to this thinker. Registers computer_use as a tool in the registry for non-Anthropic providers.

func (*Thinker) Shutdown

func (t *Thinker) Shutdown()

Shutdown releases external resources held by the thinker: currently only the computer-use browser session. Safe to call multiple times. Used by the main signal handler so SIGTERM/SIGINT closes Chrome (local) or REQUEST_RELEASEs the session (Browserbase) instead of orphaning it when the server SIGKILLs core during instance stop.

func (*Thinker) Stop

func (t *Thinker) Stop()

func (*Thinker) Threads

func (t *Thinker) Threads() *ThreadManager

Threads returns the ThreadManager owning this Thinker's worker threads. Read-only intent; callers must not mutate the returned value's fields directly.

func (*Thinker) TogglePause

func (t *Thinker) TogglePause()

type Thread

type Thread struct {
	ID   string
	Name string // human-readable label, separate from ID. ID is immutable;
	// Name can be edited via update without touching parent_id
	// references or session storage. Empty means "use ID for display".
	ParentID  string   // "main" or parent thread ID
	Depth     int      // 0 = child of main, 1 = grandchild, etc.
	Directive string   // original directive before tool docs
	MCPNames  []string // MCP server names this thread connected to
	Thinker   *Thinker
	Realtime  *RealtimeThinker // non-nil for realtime (voice/audio) threads; runs in place of Thinker.Run
	Parent    *Thinker
	Children  *ThreadManager // non-nil if this thread can spawn (depth < MaxSpawnDepth)
	Tools     map[string]bool
	Started   time.Time
	// contains filtered or unexported fields
}

type ThreadDoneData

type ThreadDoneData struct {
	ParentID string `json:"parent_id"`
	Result   string `json:"result,omitempty"`
}

type ThreadInfo

type ThreadInfo struct {
	ID           string
	Name         string // human-readable display label; empty = render id
	ParentID     string // "main" or parent thread ID
	Depth        int
	Directive    string
	Tools        []string
	MCPNames     []string
	Running      bool
	Iteration    int
	Rate         ThinkRate
	Model        ModelTier
	Provider     string // active provider name
	Started      time.Time
	ContextMsgs  int
	ContextChars int
	SubThreads   int // number of direct children
}

func AllThreadInfos

func AllThreadInfos(tm *ThreadManager) []ThreadInfo

AllThreadInfos walks tm and all nested ThreadManagers, returning flattened ThreadInfo records. Used by scenarios that assert on the total tree of agent threads, not just the top-level set.
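Flattening nested managers is a depth-first walk. A sketch with a hypothetical `manager` type in which each thread ID may map to its own sub-manager (standing in for Thread.Children), emitting parents before their children:

```go
package main

import "fmt"

type ThreadInfo struct {
	ID, ParentID string
	Depth        int
}

// manager is a hypothetical stand-in for ThreadManager.
type manager struct {
	threads  []ThreadInfo
	children map[string]*manager // thread ID -> that thread's sub-manager
}

// allInfos flattens the tree depth-first, parents before their children.
func allInfos(m *manager) []ThreadInfo {
	if m == nil {
		return nil
	}
	var out []ThreadInfo
	for _, ti := range m.threads {
		out = append(out, ti)
		out = append(out, allInfos(m.children[ti.ID])...) // nil map lookup yields nil
	}
	return out
}

func main() {
	root := &manager{
		threads: []ThreadInfo{{"planner", "main", 0}, {"logger", "main", 0}},
		children: map[string]*manager{
			"planner": {threads: []ThreadInfo{{"coder", "planner", 1}}},
		},
	}
	for _, ti := range allInfos(root) {
		fmt.Println(ti.Depth, ti.ID)
	}
	// 0 planner
	// 1 coder
	// 0 logger
}
```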

type ThreadManager

type ThreadManager struct {
	// contains filtered or unexported fields
}

func NewThreadManager

func NewThreadManager(parent *Thinker) *ThreadManager

func (*ThreadManager) Count

func (tm *ThreadManager) Count() int

func (*ThreadManager) Kill

func (tm *ThreadManager) Kill(id string)

func (*ThreadManager) KillAll

func (tm *ThreadManager) KillAll()

func (*ThreadManager) List

func (tm *ThreadManager) List() []ThreadInfo

func (*ThreadManager) PauseAll

func (tm *ThreadManager) PauseAll(paused bool)

PauseAll pauses or resumes all child threads.

func (*ThreadManager) Rename

func (tm *ThreadManager) Rename(oldID, newID string) error

Rename changes a thread's id, which is otherwise immutable. It updates every reference:

  • the threads map key
  • children's ParentID
  • the persistent record (delete old, save new)
  • the on-disk session file
  • emits thread.renamed telemetry so the dashboard can swap its record over to the new identity

Returns an error if the new id is empty, equals the old, collides with an existing sibling, or any of the persistence steps fail.
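The validation rules and the ParentID rewiring can be sketched on an in-memory map. For simplicity this puts parents and children in one flat map (the real type nests children under Thread.Children), adds a hypothetical not-found check, and omits persistence, the session-file move, and telemetry:

```go
package main

import (
	"errors"
	"fmt"
)

type thread struct{ ID, ParentID string }

// manager is a hypothetical flat stand-in for ThreadManager.
type manager struct{ threads map[string]*thread }

// rename applies the documented checks and rewires children's ParentID.
func (m *manager) rename(oldID, newID string) error {
	switch {
	case newID == "":
		return errors.New("new id is empty")
	case newID == oldID:
		return errors.New("new id equals old id")
	}
	t, ok := m.threads[oldID]
	if !ok { // hypothetical extra check
		return fmt.Errorf("thread %q not found", oldID)
	}
	if _, taken := m.threads[newID]; taken {
		return fmt.Errorf("thread %q already exists", newID)
	}
	delete(m.threads, oldID)
	t.ID = newID
	m.threads[newID] = t
	for _, c := range m.threads {
		if c.ParentID == oldID {
			c.ParentID = newID
		}
	}
	return nil
}

func main() {
	m := &manager{threads: map[string]*thread{
		"w1":   {ID: "w1", ParentID: "main"},
		"w1-a": {ID: "w1-a", ParentID: "w1"},
	}}
	fmt.Println(m.rename("w1", "w1-a")) // thread "w1-a" already exists
	if err := m.rename("w1", "writer"); err != nil {
		panic(err)
	}
	fmt.Println(m.threads["w1-a"].ParentID) // writer
}
```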

func (*ThreadManager) Send

func (tm *ThreadManager) Send(id, message string) bool

func (*ThreadManager) SendWithParts

func (tm *ThreadManager) SendWithParts(id, message string, parts []ContentPart) bool

func (*ThreadManager) Spawn

func (tm *ThreadManager) Spawn(id, directive string, tools []string, initialMessages ...string) error

func (*ThreadManager) SpawnWithMedia

func (tm *ThreadManager) SpawnWithMedia(id, directive string, tools []string, parts []ContentPart, initialMessages ...string) error

SpawnWithMedia creates a thread and injects media parts before it starts thinking.

func (*ThreadManager) SpawnWithOpts

func (tm *ThreadManager) SpawnWithOpts(id, directive string, tools []string, opts SpawnOpts) error

SpawnWithOpts creates a thread with the full set of SpawnOpts options (provider, media, etc.).

func (*ThreadManager) StartAll

func (tm *ThreadManager) StartAll()

StartAll starts Run() on all threads (and their children) that were spawned with DeferRun. Used after batch-respawning persisted threads so parents see their children before thinking.

func (*ThreadManager) Update

func (tm *ThreadManager) Update(id, name, directive string, tools []string) error

Update changes a thread's name, directive, and/or tools, and rebuilds its system prompt immediately.

type ThreadMessageData

type ThreadMessageData struct {
	From    string `json:"from"`
	To      string `json:"to"`
	Message string `json:"message"`
}

type ThreadRenamedData

type ThreadRenamedData struct {
	OldID    string `json:"old_id"`
	NewID    string `json:"new_id"`
	Name     string `json:"name,omitempty"`
	ParentID string `json:"parent_id,omitempty"`
}

ThreadRenamedData fires when update changes a thread's display name or its immutable id. The dashboard uses old_id to swap its record for the new identity (id changes are rare but legal). When only Name changed, OldID == NewID.
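On the consuming side, the swap described above reduces to re-keying one record. A sketch of a dashboard-style map update; `record` and `applyRename` are hypothetical, and treating an empty Name as "unchanged" is an assumption, since the field is omitted from the JSON when empty:

```go
package main

import "fmt"

type ThreadRenamedData struct{ OldID, NewID, Name, ParentID string }

// record is a hypothetical dashboard entry keyed by thread id.
type record struct{ ID, Name string }

// applyRename swaps a record to its new identity. An empty Name is treated
// as "unchanged" (an assumption of this sketch).
func applyRename(records map[string]*record, ev ThreadRenamedData) {
	r, ok := records[ev.OldID]
	if !ok {
		return // unknown thread: nothing to swap
	}
	if ev.Name != "" {
		r.Name = ev.Name
	}
	if ev.NewID != ev.OldID {
		delete(records, ev.OldID)
		r.ID = ev.NewID
		records[ev.NewID] = r
	}
}

func main() {
	recs := map[string]*record{"w1": {ID: "w1"}}
	applyRename(recs, ThreadRenamedData{OldID: "w1", NewID: "w1", Name: "Writer"}) // name only
	applyRename(recs, ThreadRenamedData{OldID: "w1", NewID: "writer"})             // id change
	fmt.Println(recs["writer"].ID, recs["writer"].Name, len(recs))                 // writer Writer 1
}
```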

type ThreadSpawnData

type ThreadSpawnData struct {
	ParentID  string   `json:"parent_id"`
	Directive string   `json:"directive"`
	Tools     []string `json:"tools"`
}

type TokenUsage

type TokenUsage struct {
	PromptTokens     int
	CachedTokens     int
	CompletionTokens int
	AudioTokens      int // realtime providers only; billed at a separate rate
}

type ToolCallData

type ToolCallData struct {
	ID     string            `json:"id,omitempty"`
	Name   string            `json:"name"`
	Args   map[string]string `json:"args,omitempty"`
	Reason string            `json:"reason,omitempty"`
}

type ToolDef

type ToolDef struct {
	Name        string
	Description string                                    // human-readable
	Syntax      string                                    // example usage
	Rules       string                                    // usage rules for the prompt
	Core        bool                                      // always in prompt (pace, send, done, evolve, search_tools)
	MainOnly    bool                                      // only for main thread (spawn, kill)
	ThreadOnly  bool                                      // only for sub-threads, not main (reply)
	SystemOnly  bool                                      // only for system threads (unconscious)
	MCP         bool                                      // provided by an MCP server — hidden from the per-turn tool list until activated (search_tools / spawn preload / BM25 preload)
	MCPServer   string                                    // name of the MCP server that provides this tool
	Handler     func(args map[string]string) ToolResponse // nil = handled inline by tool handler
	InputSchema map[string]any                            // JSON Schema for native tool calling (nil = auto-generated from Syntax)
}

ToolDef defines a tool available to threads.

type ToolHandler

type ToolHandler func(t *Thinker, calls []toolCall, consumed []string) (replies []string, toolNames []string, results []ToolResult)

ToolHandler processes parsed tool calls from a thought. It returns the replies, the logged tool names, and the tool results for inline-handled tools. consumed holds the events consumed in this iteration, for context.

type ToolIndex

type ToolIndex struct {
	// contains filtered or unexported fields
}

ToolIndex catalogs every MCP tool known to the process by metadata (server, name, description, schema, no_spawn) and supports cheap keyword search. It is the search surface for `search_tools` and the per-turn preload, replacing the old main-access/catalog split.

The index is a metadata mirror of what's in ToolRegistry. The registry remains the source of truth for handlers and dispatch; the index is just a fast read-side view optimised for ranking.

Why a separate structure instead of querying the registry directly:

  • The registry stores tools by name, not by server; per-server enumeration (for spawn-time preload) would mean an O(N) scan every time.
  • Search wants tokenized text; precomputing it once at add-time keeps the per-query cost to a sort.
  • The registry will eventually want to evict tools (uninstall an app); a separate index keeps that bookkeeping local.

func NewToolIndex

func NewToolIndex() *ToolIndex

NewToolIndex returns an empty index.

func (*ToolIndex) Add

func (ix *ToolIndex) Add(server string, tools []mcpToolDef, noSpawn bool)

Add registers a server's tools. Replaces any prior entries for the same server name so reconnect or hot-reload semantics work cleanly.
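Keying entries by server is what makes replace-on-Add and Remove cheap. A minimal sketch with a hypothetical `index` type and made-up tool names:

```go
package main

import "fmt"

type entry struct {
	Server, Name string
	NoSpawn      bool
}

// index keys entries by server so add can replace one server's tools
// wholesale (reconnect or hot reload) without touching the others.
type index struct{ byServer map[string][]entry }

func (ix *index) add(server string, names []string, noSpawn bool) {
	if ix.byServer == nil {
		ix.byServer = map[string][]entry{}
	}
	es := make([]entry, 0, len(names))
	for _, n := range names {
		es = append(es, entry{server, n, noSpawn})
	}
	ix.byServer[server] = es // replaces, never merges
}

func (ix *index) remove(server string) { delete(ix.byServer, server) }

func (ix *index) count() int {
	n := 0
	for _, es := range ix.byServer {
		n += len(es)
	}
	return n
}

func main() {
	var ix index
	ix.add("files", []string{"fetch", "dedupe", "parse_csv"}, false)
	ix.add("files", []string{"fetch", "parse_csv"}, false) // reconnect with fewer tools
	ix.add("channels", []string{"send"}, true)
	fmt.Println(ix.count()) // 3
	ix.remove("files")
	fmt.Println(ix.count()) // 1
}
```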

func (*ToolIndex) AllNames

func (ix *ToolIndex) AllNames(allowNoSpawn bool) []string

AllNames returns every indexed tool's fully-qualified name. Backs the APTEVA_EAGER_TOOLS escape hatch, which makes the whole attached surface visible every turn (the pre-discovery behaviour). When allowNoSpawn is false, no_spawn entries are excluded — a sub-thread in eager mode still must not see gateway/channels tools.

func (*ToolIndex) Count

func (ix *ToolIndex) Count() int

Count returns the number of tools currently indexed. Backs the APTEVA_TOOL_SEARCH=auto decision: below a threshold the surface is small enough that eager-loading beats the search round-trip.

func (*ToolIndex) Get

func (ix *ToolIndex) Get(name string) (IndexEntry, bool)

Get returns the entry for a fully-qualified tool name, if present.

func (*ToolIndex) Remove

func (ix *ToolIndex) Remove(server string)

Remove drops every entry for the named server. Used when an app uninstalls or an MCP disconnects.

func (*ToolIndex) Search

func (ix *ToolIndex) Search(query string, k int, allowNoSpawn bool) []IndexEntry

Search returns up to k entries matching the query, ranked. When allowNoSpawn is false, no_spawn entries are filtered out — that path is used from sub-threads, which must not discover gateway or channels tools they have no business calling.
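A keyword search with the no_spawn filter can be sketched as term-overlap ranking. This is a stand-in, not the package's ranking function: it scores entries by how many distinct query terms appear in their name or description, and the `entry` type and tool names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"unicode"
)

type entry struct {
	Name, Description string
	NoSpawn           bool
}

// tokens lower-cases s and splits it on anything that is not a letter or digit.
func tokens(s string) []string {
	return strings.FieldsFunc(strings.ToLower(s), func(r rune) bool {
		return !unicode.IsLetter(r) && !unicode.IsDigit(r)
	})
}

// search ranks by distinct query-term overlap (a stand-in, not BM25), skips
// no_spawn entries unless allowNoSpawn is set, and returns at most k results.
func search(entries []entry, query string, k int, allowNoSpawn bool) []entry {
	terms := map[string]bool{}
	for _, t := range tokens(query) {
		terms[t] = true
	}
	type scored struct {
		e     entry
		score int
	}
	var hits []scored
	for _, e := range entries {
		if e.NoSpawn && !allowNoSpawn {
			continue
		}
		words := map[string]bool{}
		for _, w := range tokens(e.Name + " " + e.Description) {
			words[w] = true
		}
		score := 0
		for t := range terms {
			if words[t] {
				score++
			}
		}
		if score > 0 {
			hits = append(hits, scored{e, score})
		}
	}
	// Stable sort keeps catalog order among equal scores.
	sort.SliceStable(hits, func(i, j int) bool { return hits[i].score > hits[j].score })
	if k < 0 {
		k = 0
	}
	if len(hits) > k {
		hits = hits[:k]
	}
	out := make([]entry, 0, len(hits))
	for _, h := range hits {
		out = append(out, h.e)
	}
	return out
}

func main() {
	catalog := []entry{
		{Name: "files_parse_csv", Description: "Parse a CSV file"},
		{Name: "files_fetch", Description: "Fetch a file by URL"},
		{Name: "channels_send", Description: "Send a message to a channel", NoSpawn: true},
	}
	for _, e := range search(catalog, "csv file", 5, false) {
		fmt.Println(e.Name) // files_parse_csv, then files_fetch
	}
	fmt.Println(len(search(catalog, "send message", 5, false))) // 0: no_spawn hidden
}
```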

func (*ToolIndex) Servers

func (ix *ToolIndex) Servers() []string

Servers returns every server name currently indexed.

func (*ToolIndex) ToolCountByServer

func (ix *ToolIndex) ToolCountByServer() map[string]int

ToolCountByServer returns name → count of indexed tools. Used by the system prompt's [AVAILABLE MCP SERVERS] block so the LLM sees which servers exist and how many tools each contributes without the full schemas appearing in context.

func (*ToolIndex) ToolsForServer

func (ix *ToolIndex) ToolsForServer(server string) []string

ToolsForServer returns every tool name a server contributes. Used by spawn-time preload (SpawnOpts.MCPNames) to seed a child thread's activeTools with the full surface of the listed servers.

type ToolRegistry

type ToolRegistry struct {
	// contains filtered or unexported fields
}

ToolRegistry holds all tool definitions.

func NewToolRegistry

func NewToolRegistry(apiKey string) *ToolRegistry

func (*ToolRegistry) AllToolNames

func (tr *ToolRegistry) AllToolNames() []string

AllToolNames returns all non-core tool names (for spawn docs).

func (*ToolRegistry) AllTools

func (tr *ToolRegistry) AllTools() []*ToolDef

AllTools returns all tool definitions for display.

func (*ToolRegistry) CoreDocs

func (tr *ToolRegistry) CoreDocs(includeMainOnly bool, includeSystemOnly ...bool) string

CoreDocs returns documentation for core tools, always included in prompts.

func (*ToolRegistry) CoreDocsSummary

func (tr *ToolRegistry) CoreDocsSummary(includeMainOnly bool, includeSystemOnly ...bool) string

CoreDocsSummary returns a one-line summary of core tool names, sized for providers that receive full schemas via NativeTools in their `tools[]` payload. Emitting the full prose here (see CoreDocs) duplicates every tool's Description + Rules in the system prompt — ~5k extra input chars per iteration on a typical main thread.

Callers that target providers WITHOUT native-tool support should keep using CoreDocs: those providers only see the prose and need the rules in the system prompt to behave.

Ordering matches CoreDocs so the two agree when comparing, and the block is prefixed with the same marker so the composition breakdown still identifies it as the "core_tools" segment.

func (*ToolRegistry) Count

func (tr *ToolRegistry) Count() int

Count returns the total number of registered tools.

func (*ToolRegistry) Counts

func (tr *ToolRegistry) Counts() (core, rag, total int)

Counts returns core, discoverable (RAG), and total tool counts.

func (*ToolRegistry) Dispatch

func (tr *ToolRegistry) Dispatch(name string, args map[string]string) (ToolResponse, bool)

Dispatch executes a tool by name if it has a Handler. It returns the response and whether the tool was handled.
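The Handler contract can be sketched as a map lookup. This assumes unknown names also report false; the local `ToolDef` keeps only the Name and Handler fields, and the tool names are hypothetical:

```go
package main

import "fmt"

type ToolResponse struct {
	Text  string
	Image []byte
}

// ToolDef keeps only the fields this sketch needs.
type ToolDef struct {
	Name    string
	Handler func(args map[string]string) ToolResponse
}

type registry struct{ tools map[string]*ToolDef }

// dispatch runs the tool's Handler if it has one. The bool is false for
// unknown names (assumed) and for tools with a nil Handler, which the
// thread's tool handler is expected to process inline instead.
func (r *registry) dispatch(name string, args map[string]string) (ToolResponse, bool) {
	t, ok := r.tools[name]
	if !ok || t.Handler == nil {
		return ToolResponse{}, false
	}
	return t.Handler(args), true
}

func main() {
	reg := &registry{tools: map[string]*ToolDef{
		"echo": {Name: "echo", Handler: func(args map[string]string) ToolResponse {
			return ToolResponse{Text: args["text"]}
		}},
		"inline_only": {Name: "inline_only"}, // hypothetical nil-Handler tool
	}}
	resp, ok := reg.dispatch("echo", map[string]string{"text": "hi"})
	fmt.Println(resp.Text, ok) // hi true
	_, ok = reg.dispatch("inline_only", nil)
	fmt.Println(ok) // false
}
```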

func (*ToolRegistry) Get

func (tr *ToolRegistry) Get(name string) *ToolDef

func (*ToolRegistry) NativeTools

func (tr *ToolRegistry) NativeTools(allowlist, active map[string]bool) []NativeTool

NativeTools returns tool definitions for the LLM provider API.

Visibility model:

  • Core / non-MCP tools: always visible unless excluded by an allowlist (sub-thread restriction) or by role flags (MainOnly / ThreadOnly / SystemOnly applied at caller via RetrieveTools — this method trusts what's in the registry).
  • MCP tools (ToolDef.MCP == true): hidden by default — they only appear when their name is in `active`. That's how the "agent-driven discovery" model works: spawn-time MCPNames preload, search_tools, and per-turn BM25 preload all push names into the active set; nothing else gets MCP tools on the wire.

allowlist is the boot-time per-thread allowlist (sub-threads pass their tool set; main passes nil). active is the live per-turn set of MCP tools the thread has surfaced for use. Either argument may be nil.
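The visibility model can be sketched as a filter. The sketch applies the allowlist only to non-MCP tools, which is all the text above states; whether the real method also checks MCP tools against it is not documented. A nil active set surfaces no MCP tools because indexing a nil map yields false, while a nil allowlist is checked explicitly and means no restriction.

```go
package main

import "fmt"

// ToolDef keeps only the fields this sketch needs.
type ToolDef struct {
	Name string
	MCP  bool
}

// visible returns the names that would go on the wire, in registry order.
func visible(tools []ToolDef, allowlist, active map[string]bool) []string {
	var out []string
	for _, t := range tools {
		if t.MCP {
			if active[t.Name] { // hidden unless surfaced this turn
				out = append(out, t.Name)
			}
			continue
		}
		if allowlist != nil && !allowlist[t.Name] {
			continue // excluded by a sub-thread's boot-time allowlist
		}
		out = append(out, t.Name)
	}
	return out
}

func main() {
	tools := []ToolDef{
		{Name: "send"},
		{Name: "web_fetch"}, // hypothetical non-MCP tool
		{Name: "files_read", MCP: true},
		{Name: "files_write", MCP: true},
	}
	fmt.Println(visible(tools, nil, nil))
	// [send web_fetch]
	fmt.Println(visible(tools, map[string]bool{"send": true}, map[string]bool{"files_read": true}))
	// [send files_read]
}
```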

func (*ToolRegistry) Register

func (tr *ToolRegistry) Register(tool *ToolDef)

func (*ToolRegistry) RemoveByMCPServer

func (tr *ToolRegistry) RemoveByMCPServer(serverName string)

RemoveByMCPServer removes all tools provided by the named MCP server.

type ToolResponse

type ToolResponse struct {
	Text  string // text result (always present)
	Image []byte // optional image (screenshot etc.) — sent as part of tool result to LLM
}

ToolResponse is the return value from a tool handler.

type ToolResult

type ToolResult struct {
	CallID  string `json:"call_id"`
	Content string `json:"content"`         // text result
	Image   []byte `json:"image,omitempty"` // optional image (screenshot etc.)
	IsError bool   `json:"is_error,omitempty"`
}

ToolResult is sent back to the provider after executing a tool.

type ToolResultData

type ToolResultData struct {
	ID         string `json:"id,omitempty"`
	Name       string `json:"name"`
	DurationMs int64  `json:"duration_ms"`
	Success    bool   `json:"success"`
	Result     string `json:"result,omitempty"`
}

type Usage

type Usage struct {
	PromptTokens        int                  `json:"prompt_tokens"`
	CompletionTokens    int                  `json:"completion_tokens"`
	TotalTokens         int                  `json:"total_tokens"`
	PromptTokensDetails *PromptTokensDetails `json:"prompt_tokens_details,omitempty"`
}

Directories

Path Synopsis
cmd
apteva-core command
apteva-core — agent runtime binary.
mcps
ads command
MCP server for ad budget monitoring and cost-per-lead tracking.
codebase command
MCP server for file-based code read/write and test execution.
files command
MCP server for file fetching, deduplication, and CSV parsing.
market command
MCP server for simulated market data, portfolio management, and trading.
metrics command
MCP server for simulated time-series monitoring and alerting.
pkg
computer
Package computer defines the Computer interface for screen-based environments.
testkit
Package testkit provides a thin HTTP harness for writing phased, real-LLM tests against a live apteva-server.
