agenticloop

v1.0.0-alpha.24
Published: Mar 10, 2026 License: MIT Imports: 25 Imported by: 0

README

agentic-loop

Loop orchestrator component for the agentic processing system.

Overview

The agentic-loop component orchestrates autonomous agent execution by managing the lifecycle of agentic loops. It coordinates communication between the model processor (LLM calls) and tools processor (tool execution), tracks state through a 10-state machine, supports signal handling for user control, manages context memory with automatic compaction, and captures complete execution trajectories for observability.

Architecture

                         ┌─────────────────┐
    agent.task.*    ────►│                 │────► agent.request.*
                         │  agentic-loop   │
    agent.response.>◄────│                 │◄──── (from model)
                         │                 │
    tool.result.>   ────►│                 │────► tool.execute.*
                         │                 │
    agent.signal.*  ────►│                 │────► agent.complete.*
                         │                 │
                         │                 │────► agent.context.compaction.*
                         └────────┬────────┘
                                  │
                         ┌────────┴────────┐
                         │    NATS KV      │
                         │  AGENT_LOOPS    │
                         │  AGENT_TRAJ...  │
                         └─────────────────┘

Features

  • State Machine: 10-state lifecycle with signal-related states
  • Signal Handling: Cancel, pause, resume, and approval signals
  • Context Management: Automatic compaction and GC for long-running loops
  • Tool Coordination: Tracks pending tool calls, aggregates results
  • Trajectory Capture: Records complete execution paths for debugging
  • Iteration Guards: Configurable max iterations to prevent runaway loops
  • Architect/Editor Split: Automatic spawning of editor from architect
  • Rules Integration: Enriched completion events for rules-based orchestration

Configuration

{
  "type": "processor",
  "name": "agentic-loop",
  "enabled": true,
  "config": {
    "max_iterations": 20,
    "timeout": "120s",
    "stream_name": "AGENT",
    "loops_bucket": "AGENT_LOOPS",
    "trajectories_bucket": "AGENT_TRAJECTORIES",
    "context": {
      "enabled": true,
      "compact_threshold": 0.60,
      "tool_result_max_age": 3,
      "headroom_tokens": 6400,
      "model_limits": {
        "gpt-4o": 128000,
        "gpt-4o-mini": 128000,
        "claude-sonnet": 200000,
        "claude-opus": 200000,
        "default": 128000
      }
    },
    "ports": {
      "inputs": [
        {"name": "agent.task", "type": "jetstream", "subject": "agent.task.*", "stream_name": "AGENT"},
        {"name": "agent.response", "type": "jetstream", "subject": "agent.response.>", "stream_name": "AGENT"},
        {"name": "tool.result", "type": "jetstream", "subject": "tool.result.>", "stream_name": "AGENT"},
        {"name": "agent.signal", "type": "jetstream", "subject": "agent.signal.*", "stream_name": "AGENT"}
      ],
      "outputs": [
        {"name": "agent.request", "type": "jetstream", "subject": "agent.request.*", "stream_name": "AGENT"},
        {"name": "tool.execute", "type": "jetstream", "subject": "tool.execute.*", "stream_name": "AGENT"},
        {"name": "agent.complete", "type": "jetstream", "subject": "agent.complete.*", "stream_name": "AGENT"},
        {"name": "agent.context.compaction", "type": "jetstream", "subject": "agent.context.compaction.*", "stream_name": "AGENT"}
      ]
    }
  }
}

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| max_iterations | int | 20 | Maximum loop iterations before failure (1-1000) |
| timeout | string | "120s" | Loop execution timeout |
| stream_name | string | "AGENT" | JetStream stream name |
| consumer_name_suffix | string | "" | Suffix for consumer names (for testing) |
| loops_bucket | string | "AGENT_LOOPS" | KV bucket for loop state |
| trajectories_bucket | string | "AGENT_TRAJECTORIES" | KV bucket for trajectories |
| context | object | (defaults) | Context management configuration |
| ports | object | (defaults) | Port configuration |

Context Configuration

| Option | Type | Default | Description |
|---|---|---|---|
| enabled | bool | true | Enable context memory management |
| compact_threshold | float | 0.60 | Trigger compaction at this utilization (0.01-1.0) |
| tool_result_max_age | int | 3 | Evict tool results older than N iterations |
| headroom_tokens | int | 6400 | Reserve tokens for new content |
| model_limits | map | (defaults) | Token limits per model name |
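
The ranges in the two option tables can be checked before a configuration is handed to the component. The following is a minimal sketch, assuming a hypothetical `loopConfig` struct that mirrors only a few of the documented JSON keys; it is not the package's own `Config` type, and the package may validate differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// loopConfig is a hypothetical mirror of some documented JSON options;
// the package's real Config type may differ.
type loopConfig struct {
	MaxIterations int    `json:"max_iterations"`
	Timeout       string `json:"timeout"`
	Context       struct {
		CompactThreshold float64 `json:"compact_threshold"`
		HeadroomTokens   int     `json:"headroom_tokens"`
	} `json:"context"`
}

// validate applies the ranges listed in the option tables.
func validate(c loopConfig) error {
	if c.MaxIterations < 1 || c.MaxIterations > 1000 {
		return fmt.Errorf("max_iterations %d outside 1-1000", c.MaxIterations)
	}
	if _, err := time.ParseDuration(c.Timeout); err != nil {
		return fmt.Errorf("timeout %q is not a duration: %w", c.Timeout, err)
	}
	if t := c.Context.CompactThreshold; t < 0.01 || t > 1.0 {
		return fmt.Errorf("compact_threshold %v outside 0.01-1.0", t)
	}
	return nil
}

func main() {
	raw := []byte(`{"max_iterations": 20, "timeout": "120s",
		"context": {"compact_threshold": 0.60, "headroom_tokens": 6400}}`)
	var cfg loopConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Println("valid:", validate(cfg) == nil)
}
```

Checking the timeout with `time.ParseDuration` follows from the option being described as a duration string.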

Ports

Inputs

| Name | Type | Subject | Description |
|---|---|---|---|
| agent.task | jetstream | agent.task.* | Task requests from external systems |
| agent.response | jetstream | agent.response.> | Model responses from agentic-model |
| tool.result | jetstream | tool.result.> | Tool results from agentic-tools |
| agent.signal | jetstream | agent.signal.* | Control signals (cancel, pause, resume) |

Outputs

| Name | Type | Subject | Description |
|---|---|---|---|
| agent.request | jetstream | agent.request.* | Model requests to agentic-model |
| tool.execute | jetstream | tool.execute.* | Tool calls to agentic-tools |
| agent.complete | jetstream | agent.complete.* | Loop completion events |
| agent.context.compaction | jetstream | agent.context.compaction.* | Context compaction events |

KV Write

| Name | Bucket | Key Pattern | Description |
|---|---|---|---|
| loops | AGENT_LOOPS | {loop_id} | Loop entity state |
| loops | AGENT_LOOPS | COMPLETE_{loop_id} | Completion state for rules engine |
| trajectories | AGENT_TRAJECTORIES | {loop_id} | Execution trajectories |

State Machine

exploring → planning → architecting → executing → reviewing → complete
     ↑          ↑            ↑             ↑           ↑        ↘ failed
     └──────────┴────────────┴─────────────┴───────────┘         ↘ cancelled
                                                                  ↘ paused
                                                                   ↘ awaiting_approval

States

| State | Terminal | Description |
|---|---|---|
| exploring | No | Initial state, gathering information |
| planning | No | Developing approach |
| architecting | No | Designing solution |
| executing | No | Implementing solution |
| reviewing | No | Validating results |
| complete | Yes | Successfully finished |
| failed | Yes | Failed due to error or max iterations |
| cancelled | Yes | Cancelled by user signal |
| paused | No | Paused by user signal, can resume |
| awaiting_approval | No | Waiting for user approval |

States are fluid checkpoints - loops can transition backward except from terminal states.
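
The transition rule can be sketched in Go as follows. The state names come from the table; the `State` type, the `terminal` map, and `canTransition` are illustrative names rather than the identifiers used by the agentic package. The sketch does not model signal-driven moves such as retry.

```go
package main

import "fmt"

// State is an illustrative type; the names match the States table.
type State string

const (
	Exploring        State = "exploring"
	Planning         State = "planning"
	Architecting     State = "architecting"
	Executing        State = "executing"
	Reviewing        State = "reviewing"
	Complete         State = "complete"
	Failed           State = "failed"
	Cancelled        State = "cancelled"
	Paused           State = "paused"
	AwaitingApproval State = "awaiting_approval"
)

// terminal records every known state and whether it is terminal.
var terminal = map[State]bool{
	Exploring: false, Planning: false, Architecting: false,
	Executing: false, Reviewing: false, Paused: false,
	AwaitingApproval: false,
	Complete:         true, Failed: true, Cancelled: true,
}

// canTransition allows any move between known states, forward or
// backward, unless the loop is already in a terminal state.
func canTransition(from, to State) bool {
	fromTerminal, fromKnown := terminal[from]
	_, toKnown := terminal[to]
	return fromKnown && toKnown && !fromTerminal
}

func main() {
	// A backward move from a non-terminal state.
	fmt.Println(canTransition(Executing, Exploring))
	// A move out of a terminal state.
	fmt.Println(canTransition(Complete, Exploring))
}
```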

Signal Handling

The loop accepts control signals via the agent.signal.* input port.

Signal Message Format
{
  "signal_id": "sig_abc123",
  "type": "cancel",
  "loop_id": "loop_456",
  "user_id": "user_789",
  "channel_type": "cli",
  "channel_id": "session_001",
  "payload": null,
  "timestamp": "2024-01-15T10:30:00Z"
}

Signal Types

| Type | Description | Resulting State |
|---|---|---|
| cancel | Stop execution immediately | cancelled |
| pause | Pause at next checkpoint | paused |
| resume | Continue paused loop | (previous state) |
| approve | Approve pending result | complete |
| reject | Reject with optional reason | failed |
| feedback | Add feedback without decision | (no change) |
| retry | Retry failed loop | exploring |
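
The signal-to-state mapping can be written as a single switch. This is a sketch under assumptions: `nextState` is a hypothetical helper, and the guards on resume (loop must be paused) and retry (loop must be failed) are inferred from the descriptions, not confirmed behavior. It also ignores the timing of pause, which takes effect at the next checkpoint.

```go
package main

import "fmt"

// nextState returns the state a loop would hold after a signal.
// previous is the state saved before a pause.
func nextState(current, previous, signal string) (string, error) {
	switch signal {
	case "cancel":
		return "cancelled", nil
	case "pause":
		return "paused", nil
	case "resume":
		// Assumption: resume only applies to a paused loop.
		if current != "paused" {
			return current, fmt.Errorf("resume requires a paused loop, got %q", current)
		}
		return previous, nil
	case "approve":
		return "complete", nil
	case "reject":
		return "failed", nil
	case "feedback":
		return current, nil
	case "retry":
		// Assumption: retry only applies to a failed loop.
		if current != "failed" {
			return current, fmt.Errorf("retry requires a failed loop, got %q", current)
		}
		return "exploring", nil
	default:
		return current, fmt.Errorf("unknown signal type %q", signal)
	}
}

func main() {
	state, err := nextState("paused", "executing", "resume")
	fmt.Println(state, err)
}
```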

Context Management

The loop includes automatic context memory management to handle long-running conversations.

Context Regions

Messages are organized into priority regions (lower priority evicted first):

  1. tool_results (priority 1) - Tool execution results, GC'd by age
  2. recent_history (priority 2) - Recent conversation messages
  3. hydrated_context (priority 3) - Retrieved context from memory
  4. compacted_history (priority 4) - Summarized old conversation
  5. system_prompt (priority 5) - Never evicted
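
The priority ordering can be illustrated with a small eviction routine. This is a simplified sketch: it evicts whole regions at once, which is an assumption made for brevity. The component itself garbage-collects tool results by age and summarizes history, so its behavior is finer-grained. The `region` type and `evict` function are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// region is a hypothetical stand-in for a context region.
type region struct {
	Name     string
	Priority int
	Tokens   int
}

// evict drops regions in ascending priority order until at least
// need tokens are freed. system_prompt is never evicted.
// It returns the evicted region names and the tokens freed.
func evict(regions []region, need int) ([]string, int) {
	sorted := append([]region(nil), regions...) // leave the input untouched
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Priority < sorted[j].Priority
	})
	var names []string
	freed := 0
	for _, r := range sorted {
		if freed >= need {
			break
		}
		if r.Name == "system_prompt" {
			continue
		}
		names = append(names, r.Name)
		freed += r.Tokens
	}
	return names, freed
}

func main() {
	regions := []region{
		{"system_prompt", 5, 500},
		{"compacted_history", 4, 1000},
		{"hydrated_context", 3, 2000},
		{"recent_history", 2, 4000},
		{"tool_results", 1, 3000},
	}
	names, freed := evict(regions, 5000)
	fmt.Println(names, freed)
}
```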
Context Events

Published to agent.context.compaction.*:

{
  "type": "compaction_starting",
  "loop_id": "loop_123",
  "iteration": 5,
  "utilization": 0.65
}
{
  "type": "compaction_complete",
  "loop_id": "loop_123",
  "iteration": 5,
  "tokens_saved": 2500,
  "summary": "Discussed authentication implementation..."
}
{
  "type": "gc_complete",
  "loop_id": "loop_123",
  "iteration": 6
}
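
A consumer of these events could decode them into one struct and branch on the type field. The sketch below assumes a hypothetical `contextEvent` struct covering the union of fields in the three example payloads; the package may define separate types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// contextEvent is a hypothetical struct holding every field that
// appears in the example payloads above.
type contextEvent struct {
	Type        string  `json:"type"`
	LoopID      string  `json:"loop_id"`
	Iteration   int     `json:"iteration"`
	Utilization float64 `json:"utilization,omitempty"`
	TokensSaved int     `json:"tokens_saved,omitempty"`
	Summary     string  `json:"summary,omitempty"`
}

// describe decodes one event and renders a short log line for it.
func describe(data []byte) (string, error) {
	var ev contextEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return "", err
	}
	switch ev.Type {
	case "compaction_starting":
		return fmt.Sprintf("%s: compaction starting at %.0f%% utilization",
			ev.LoopID, ev.Utilization*100), nil
	case "compaction_complete":
		return fmt.Sprintf("%s: compaction saved %d tokens",
			ev.LoopID, ev.TokensSaved), nil
	case "gc_complete":
		return fmt.Sprintf("%s: tool result GC finished at iteration %d",
			ev.LoopID, ev.Iteration), nil
	default:
		return "", fmt.Errorf("unknown context event type %q", ev.Type)
	}
}

func main() {
	line, err := describe([]byte(
		`{"type":"compaction_complete","loop_id":"loop_123","iteration":5,"tokens_saved":2500}`))
	fmt.Println(line, err)
}
```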

KV Storage

AGENT_LOOPS

Stores LoopEntity as JSON:

{
  "id": "loop_123",
  "task_id": "task_456",
  "state": "executing",
  "role": "general",
  "model": "gpt-4",
  "iterations": 3,
  "max_iterations": 20,
  "started_at": "2024-01-15T10:30:00Z",
  "timeout_at": "2024-01-15T10:32:00Z",
  "parent_loop_id": "",
  "pause_requested": false,
  "pause_requested_by": "",
  "state_before_pause": "",
  "cancelled_by": "",
  "cancelled_at": null,
  "user_id": "user_789",
  "channel_type": "cli",
  "channel_id": "session_001"
}
COMPLETE_{loopID}

Written when a loop completes, for rules engine consumption:

{
  "loop_id": "loop_123",
  "task_id": "task_456",
  "outcome": "success",
  "role": "architect",
  "result": "Designed authentication system with JWT...",
  "model": "gpt-4",
  "iterations": 3,
  "parent_loop": ""
}
AGENT_TRAJECTORIES

Stores Trajectory as JSON:

{
  "loop_id": "loop_123",
  "start_time": "2024-01-15T10:30:00Z",
  "end_time": "2024-01-15T10:31:45Z",
  "steps": [...],
  "outcome": "complete",
  "total_tokens_in": 1500,
  "total_tokens_out": 800,
  "duration": 105000
}
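
The unit of the duration field is not stated. In the example, the start and end times are 105 seconds apart and the duration is 105000, which is consistent with milliseconds. The following sketch shows that arithmetic, assuming RFC 3339 timestamps and a millisecond unit; `durationMillis` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// durationMillis returns the elapsed time between two RFC 3339
// timestamps in milliseconds (assumed unit, see the note above).
func durationMillis(start, end string) (int64, error) {
	s, err := time.Parse(time.RFC3339, start)
	if err != nil {
		return 0, fmt.Errorf("start_time: %w", err)
	}
	e, err := time.Parse(time.RFC3339, end)
	if err != nil {
		return 0, fmt.Errorf("end_time: %w", err)
	}
	return e.Sub(s).Milliseconds(), nil
}

func main() {
	ms, err := durationMillis("2024-01-15T10:30:00Z", "2024-01-15T10:31:45Z")
	fmt.Println(ms, err)
}
```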

Message Formats

TaskMessage (Input)
{
  "loop_id": "optional-custom-id",
  "task_id": "task_123",
  "role": "general",
  "model": "gpt-4",
  "prompt": "Analyze this code for bugs"
}
Completion Event (Output)
{
  "loop_id": "loop_123",
  "task_id": "task_456",
  "outcome": "success",
  "role": "architect",
  "result": "Designed authentication system...",
  "model": "gpt-4",
  "iterations": 3,
  "parent_loop": ""
}

Rules/Workflow Integration

The loop integrates with the rules engine for orchestration:

  1. On completion, writes COMPLETE_{loopID} key to KV
  2. Rules engine watches COMPLETE_* keys
  3. Rules can trigger follow-up actions (e.g., spawn editor when architect completes)

Architect/Editor Pattern

  1. Task arrives with role="architect"
  2. Architect loop executes and produces a plan
  3. On completion, COMPLETE_{loopID} written with role="architect"
  4. Rule matches COMPLETE_* where role="architect"
  5. Rule spawns new loop with role="editor", parent_loop={loopID}
  6. Editor receives architect's output as context
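
Steps 4 and 5 can be sketched as a function that inspects a KV entry and decides whether to spawn an editor. Everything here is illustrative: the rules engine has its own rule format, which is not shown in this README. The `editorTask` type and its `parent_loop` key are assumptions, since the documented TaskMessage example does not show a parent field. The check on outcome="success" is also an added assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// completion holds the fields of a COMPLETE_{loopID} value used here.
type completion struct {
	LoopID  string `json:"loop_id"`
	TaskID  string `json:"task_id"`
	Outcome string `json:"outcome"`
	Role    string `json:"role"`
	Result  string `json:"result"`
	Model   string `json:"model"`
}

// editorTask is hypothetical; ParentLoop and its JSON key are assumptions.
type editorTask struct {
	TaskID     string `json:"task_id"`
	Role       string `json:"role"`
	Model      string `json:"model"`
	Prompt     string `json:"prompt"`
	ParentLoop string `json:"parent_loop"`
}

// matchArchitect reports whether a KV entry is a successful architect
// completion and, if so, returns the editor task to spawn.
func matchArchitect(key string, value []byte) (editorTask, bool, error) {
	if !strings.HasPrefix(key, "COMPLETE_") {
		return editorTask{}, false, nil
	}
	var c completion
	if err := json.Unmarshal(value, &c); err != nil {
		return editorTask{}, false, err
	}
	if c.Role != "architect" || c.Outcome != "success" {
		return editorTask{}, false, nil
	}
	return editorTask{
		TaskID:     c.TaskID,
		Role:       "editor",
		Model:      c.Model,
		Prompt:     c.Result, // the architect's output becomes editor context
		ParentLoop: c.LoopID,
	}, true, nil
}

func main() {
	value := []byte(`{"loop_id":"loop_123","task_id":"task_456",
		"outcome":"success","role":"architect","result":"plan","model":"gpt-4"}`)
	task, ok, err := matchArchitect("COMPLETE_loop_123", value)
	fmt.Println(task.Role, task.ParentLoop, ok, err)
}
```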

agentic-memory Integration

The loop publishes context events that agentic-memory consumes:

  • compaction_starting - agentic-memory extracts facts before compaction
  • compaction_complete - agentic-memory injects recovered context
  • gc_complete - Logged for observability

Troubleshooting

Loop stuck waiting for response
  • Check that agentic-model is running and subscribed
  • Verify AGENT stream exists with correct subjects
  • Check model endpoint is accessible
Max iterations reached
  • Increase max_iterations for complex tasks
  • Check if agent is stuck in tool call loop
  • Review trajectory for repeated patterns
Missing tool results
  • Verify agentic-tools is running
  • Check tool executor is registered
  • Ensure tool name matches exactly
Context compaction issues
  • Check compact_threshold is appropriate for workload
  • Verify model registry has a summarization-capable endpoint or a large-context model
  • Review model_limits for your model
Signal not processed
  • Verify signal published to correct subject: agent.signal.{loop_id}
  • Check loop is not in terminal state (complete/failed/cancelled)
  • Ensure signal message format is correct

Documentation

Overview

Package agenticloop provides the agentic loop orchestrator component. It manages the state machine for agentic loops, coordinating between model calls and tool executions while capturing execution trajectories.

Package agenticloop provides the loop orchestrator for the SemStreams agentic system.

Overview

The agentic-loop processor orchestrates autonomous agent execution by managing the lifecycle of agentic loops. It coordinates communication between the model processor (LLM calls) and tools processor (tool execution), tracks state through a 10-state machine, supports signal handling for user control, manages context memory with automatic compaction, and captures complete execution trajectories for observability.

This is the central component of the agentic system - it receives task requests, routes messages between model and tools, handles iteration limits, processes control signals, manages context memory, and publishes completion events.

Architecture

The loop orchestrator sits at the center of the agentic component family:

                  ┌─────────────────┐
agent.task.*  ──▶ │                 │ ──▶ agent.request.*
                  │  agentic-loop   │
agent.response.>◀─│   (this pkg)    │◀── agent.response.*
                  │                 │
tool.result.>  ──▶│                 │ ──▶ tool.execute.*
                  │                 │
agent.signal.* ──▶│                 │ ──▶ agent.complete.*
                  │                 │
                  │                 │ ──▶ agent.context.compaction.*
                  └────────┬────────┘
                           │
                  ┌────────┴────────┐
                  │   NATS KV       │
                  │  AGENT_LOOPS    │
                  │  AGENT_TRAJ...  │
                  └─────────────────┘

Message Flow

A typical loop execution follows this pattern:

  1. External system publishes TaskMessage to agent.task.*
  2. Loop creates LoopEntity, starts Trajectory, publishes AgentRequest to agent.request.*
  3. agentic-model processes request, publishes AgentResponse to agent.response.*
  4. Loop receives response:
       - If status="tool_call": publishes ToolCall to tool.execute.* for each tool
       - If status="complete": publishes completion to agent.complete.*
       - If status="error": marks loop as failed
  5. agentic-tools executes tools, publishes ToolResult to tool.result.*
  6. Loop receives tool results; when all are complete, it increments the iteration and sends the next request
  7. Cycle repeats until complete, failed, or max iterations reached
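
Steps 4, 6, and 7 can be sketched as a small in-memory model. This is not the package's handler code: the `loop` type and its methods are hypothetical, publishing is omitted, and the exact comparison used for the iteration limit (here `>=`) is an assumption.

```go
package main

import "fmt"

// loop is a hypothetical in-memory model of one agentic loop.
type loop struct {
	iteration     int
	maxIterations int
	pending       map[string]bool // tool call IDs awaiting results
	state         string
}

// onResponse models step 4: branch on the model response status.
func (l *loop) onResponse(status string, toolCallIDs []string) {
	switch status {
	case "tool_call":
		for _, id := range toolCallIDs {
			l.pending[id] = true
		}
	case "complete":
		l.state = "complete"
	case "error":
		l.state = "failed"
	}
}

// onToolResult models steps 6 and 7. It reports whether the next
// model request should be sent.
func (l *loop) onToolResult(callID string) bool {
	if !l.pending[callID] {
		return false // unknown or duplicate result
	}
	delete(l.pending, callID)
	if len(l.pending) > 0 {
		return false // still waiting on other tools
	}
	l.iteration++
	if l.iteration >= l.maxIterations {
		l.state = "failed"
		return false
	}
	return true
}

func main() {
	l := &loop{maxIterations: 20, pending: map[string]bool{}, state: "exploring"}
	l.onResponse("tool_call", []string{"call_001", "call_002"})
	fmt.Println(l.onToolResult("call_001")) // one tool still pending
	fmt.Println(l.onToolResult("call_002")) // all tools done
	fmt.Println(l.iteration, l.state)
}
```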

State Machine

Loops progress through ten states defined in the agentic package:

exploring → planning → architecting → executing → reviewing → complete
     ↑          ↑            ↑             ↑           ↑        ↘ failed
     └──────────┴────────────┴─────────────┴───────────┘         ↘ cancelled
                                                                  ↘ paused
                                                                   ↘ awaiting_approval

States:

  • exploring: Initial state, gathering information
  • planning: Developing approach
  • architecting: Designing solution
  • executing: Implementing solution
  • reviewing: Validating results
  • complete: Successfully finished (terminal)
  • failed: Failed due to error or max iterations (terminal)
  • cancelled: Cancelled by user signal (terminal)
  • paused: Paused by user signal, can resume
  • awaiting_approval: Waiting for user approval

States are fluid checkpoints - the loop can transition backward (e.g., from executing back to exploring) to support agent rethinking. Only terminal states (complete, failed, cancelled) prevent further transitions.

State transitions are managed by the LoopManager and persisted to NATS KV.

Signal Handling

The loop accepts control signals via the agent.signal.* input port:

signal := agentic.UserSignal{
    SignalID:    "sig_abc123",
    Type:        "cancel",  // cancel, pause, resume, approve, reject, feedback, retry
    LoopID:      "loop_456",
    UserID:      "user_789",
    ChannelType: "cli",
    ChannelID:   "session_001",
    Timestamp:   time.Now(),
}

Signal types and their effects:

  • cancel: Stop execution immediately, transition to cancelled state
  • pause: Pause at next checkpoint, transition to paused state
  • resume: Continue paused loop, restore previous state
  • approve: Approve pending result, transition to complete
  • reject: Reject with optional reason, transition to failed
  • feedback: Add feedback without decision, no state change
  • retry: Retry failed loop, transition to exploring

Context Management

The loop includes automatic context memory management to handle long-running conversations that approach model token limits.

Context is organized into priority regions (lower priority evicted first):

  1. tool_results (priority 1) - Tool execution results, GC'd by age
  2. recent_history (priority 2) - Recent conversation messages
  3. hydrated_context (priority 3) - Retrieved context from memory
  4. compacted_history (priority 4) - Summarized old conversation
  5. system_prompt (priority 5) - Never evicted

Configuration:

context := agenticloop.ContextConfig{
    Enabled:            true,
    CompactThreshold:   0.60,  // Trigger compaction at 60% utilization
    ToolResultMaxAge:   3,     // Evict tool results older than 3 iterations
    HeadroomTokens:     6400,  // Reserve tokens for new content
}

Model context limits are resolved from the unified model registry (component.Dependencies.ModelRegistry). If a model is not found in the registry, DefaultContextLimit (128000) is used as fallback.
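
The fallback and the threshold check can be sketched together. The formula that combines the token count, model limit, headroom, and threshold is not given in this documentation. The version below, utilization = used / (limit - headroom), is one plausible reading and is labeled as an assumption. A plain map stands in for the model registry.

```go
package main

import "fmt"

// defaultContextLimit has the same value as DefaultContextLimit.
const defaultContextLimit = 128000

// resolveLimit looks up a model's context limit, falling back to the
// default when the model is unknown. The map stands in for the registry.
func resolveLimit(limits map[string]int, model string) int {
	if n, ok := limits[model]; ok && n > 0 {
		return n
	}
	return defaultContextLimit
}

// shouldCompact reports whether compaction would trigger.
// Assumption: utilization is measured against limit minus headroom.
func shouldCompact(used, limit, headroom int, threshold float64) bool {
	budget := limit - headroom
	if budget <= 0 {
		return true // no usable budget at all
	}
	return float64(used)/float64(budget) >= threshold
}

func main() {
	limits := map[string]int{"claude-sonnet": 200000}
	limit := resolveLimit(limits, "some-unknown-model")
	fmt.Println(limit, shouldCompact(80000, limit, 6400, 0.60))
}
```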

Context events are published to agent.context.compaction.*:

  • compaction_starting: Context approaching limit, compaction starting
  • compaction_complete: Compaction finished, includes tokens saved
  • gc_complete: Tool result garbage collection completed

Component Architecture

The package is organized into three main components:

**LoopManager** - Manages loop entity lifecycle:

manager := NewLoopManager()

// Create a loop
loopID, err := manager.CreateLoop("task_123", "general", "gpt-4", 20)

// State transitions
err = manager.TransitionLoop(loopID, agentic.LoopStateExecuting)

// Iteration tracking
err = manager.IncrementIteration(loopID)

// Pending tool management
manager.AddPendingTool(loopID, "call_001")
manager.RemovePendingTool(loopID, "call_001")
allDone := manager.AllToolsComplete(loopID)

// Context management
cm := manager.GetContextManager(loopID)

**TrajectoryManager** - Captures execution traces:

trajManager := NewTrajectoryManager()

// Start trajectory for a loop
trajectory, err := trajManager.StartTrajectory(loopID)

// Add steps (model calls, tool calls)
trajManager.AddStep(loopID, agentic.TrajectoryStep{
    Timestamp: time.Now(),
    StepType:  "model_call",
    TokensIn:  150,
    TokensOut: 200,
})

// Complete trajectory
trajectory, err = trajManager.CompleteTrajectory(loopID, "complete")

**MessageHandler** - Routes and processes messages:

handler := NewMessageHandler(config)

// Handle incoming task
result, err := handler.HandleTask(ctx, TaskMessage{
    TaskID: "task_123",
    Role:   "general",
    Model:  "gpt-4",
    Prompt: "Analyze this code for bugs",
})

// Handle model response
result, err = handler.HandleModelResponse(ctx, loopID, response)

// Handle tool result
result, err = handler.HandleToolResult(ctx, loopID, toolResult)

Configuration

The processor is configured via JSON:

{
    "max_iterations": 20,
    "timeout": "120s",
    "stream_name": "AGENT",
    "loops_bucket": "AGENT_LOOPS",
    "trajectories_bucket": "AGENT_TRAJECTORIES",
    "context": {
        "enabled": true,
        "compact_threshold": 0.60,
        "tool_result_max_age": 3,
        "headroom_tokens": 6400
    },
    "ports": {
        "inputs": [...],
        "outputs": [...]
    }
}

Configuration fields:

  • max_iterations: Maximum loop iterations before failure (default: 20, range: 1-1000)
  • timeout: Loop execution timeout as duration string (default: "120s")
  • stream_name: JetStream stream name for agentic messages (default: "AGENT")
  • loops_bucket: NATS KV bucket for loop state (default: "AGENT_LOOPS")
  • trajectories_bucket: NATS KV bucket for trajectories (default: "AGENT_TRAJECTORIES")
  • consumer_name_suffix: Optional suffix for JetStream consumer names (for testing)
  • context: Context management configuration (see ContextConfig)
  • ports: Port configuration for inputs and outputs

Ports

Input ports (JetStream consumers):

  • agent.task: Task requests from external systems (subject: agent.task.*)
  • agent.response: Model responses from agentic-model (subject: agent.response.>)
  • tool.result: Tool results from agentic-tools (subject: tool.result.>)
  • agent.signal: Control signals for loops (subject: agent.signal.*)

Output ports (JetStream publishers):

  • agent.request: Model requests to agentic-model (subject: agent.request.*)
  • tool.execute: Tool execution requests to agentic-tools (subject: tool.execute.*)
  • agent.complete: Loop completion events (subject: agent.complete.*)
  • agent.context.compaction: Context compaction events (subject: agent.context.compaction.*)

KV write ports:

  • loops: Loop entity state (bucket: AGENT_LOOPS)
  • trajectories: Trajectory data (bucket: AGENT_TRAJECTORIES)

KV Storage

Loop state and trajectories are persisted to NATS KV for durability and queryability:

**AGENT_LOOPS bucket**: Stores LoopEntity as JSON, keyed by loop ID

{
    "id": "loop_123",
    "task_id": "task_456",
    "state": "executing",
    "role": "general",
    "model": "gpt-4",
    "iterations": 3,
    "max_iterations": 20,
    "parent_loop_id": "",
    "pause_requested": false,
    "user_id": "user_789",
    "channel_type": "cli",
    "channel_id": "session_001"
}

**COMPLETE_{loopID}**: Written when a loop completes, for rules engine consumption

{
    "loop_id": "loop_123",
    "task_id": "task_456",
    "outcome": "success",
    "role": "architect",
    "result": "Designed authentication system...",
    "model": "gpt-4",
    "iterations": 3,
    "parent_loop": ""
}

**AGENT_TRAJECTORIES bucket**: Stores Trajectory as JSON, keyed by loop ID

{
    "loop_id": "loop_123",
    "start_time": "2024-01-15T10:30:00Z",
    "end_time": "2024-01-15T10:31:45Z",
    "steps": [...],
    "outcome": "complete",
    "total_tokens_in": 1500,
    "total_tokens_out": 800,
    "duration": 105000
}

KV buckets are created automatically if they don't exist during component startup.

Rules/Workflow Integration

The loop integrates with the rules engine for orchestration:

  1. On completion, writes COMPLETE_{loopID} key to KV
  2. Rules engine watches COMPLETE_* keys
  3. Rules can trigger follow-up actions (e.g., spawn editor when architect completes)

Architect/Editor pattern:

  1. Task arrives with role="architect"
  2. Architect loop executes and produces a plan
  3. On completion, COMPLETE_{loopID} written with role="architect"
  4. Rule matches COMPLETE_* where role="architect"
  5. Rule spawns new loop with role="editor", parent_loop={loopID}
  6. Editor receives architect's output as context

agentic-memory Integration

The loop publishes context events that agentic-memory consumes:

  • compaction_starting: agentic-memory extracts facts before compaction
  • compaction_complete: agentic-memory injects recovered context
  • gc_complete: Logged for observability

Quick Start

Create and start the component:

config := agenticloop.Config{
    MaxIterations:      20,
    Timeout:            "120s",
    StreamName:         "AGENT",
    LoopsBucket:        "AGENT_LOOPS",
    TrajectoriesBucket: "AGENT_TRAJECTORIES",
    Context:            agenticloop.DefaultContextConfig(),
}

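rawConfig, err := json.Marshal(config)
if err != nil {
    log.Fatalf("marshal config: %v", err)
}
comp, err := agenticloop.NewComponent(rawConfig, deps)
if err != nil {
    log.Fatalf("create component: %v", err)
}

// Initialize and Start both return errors; check them before using the loop.
lc := comp.(component.LifecycleComponent)
if err := lc.Initialize(); err != nil {
    log.Fatalf("initialize: %v", err)
}
if err := lc.Start(ctx); err != nil {
    log.Fatalf("start: %v", err)
}
defer lc.Stop(5 * time.Second)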

Publish a task:

task := agenticloop.TaskMessage{
    TaskID: "analyze_code",
    Role:   "general",
    Model:  "gpt-4",
    Prompt: "Review main.go for security issues",
}
taskData, _ := json.Marshal(task)
natsClient.PublishToStream(ctx, "agent.task.review", taskData)

Thread Safety

The LoopManager, TrajectoryManager, and ContextManager are thread-safe, using RWMutex for concurrent access. Multiple goroutines can safely:

  • Create and manage different loops concurrently
  • Read loop state while other loops are being modified
  • Track pending tools across concurrent tool executions
  • Add messages to context regions

The Component itself is not designed for concurrent Start/Stop calls.
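
The RWMutex pattern described above can be illustrated with a small pending-tool tracker. This is a sketch of the general technique, not the LoopManager implementation; the `pendingTools` type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingTools is a hypothetical tracker guarded by an RWMutex.
type pendingTools struct {
	mu     sync.RWMutex
	byLoop map[string]map[string]struct{}
}

func newPendingTools() *pendingTools {
	return &pendingTools{byLoop: make(map[string]map[string]struct{})}
}

// Add records a pending tool call under a write lock.
func (p *pendingTools) Add(loopID, callID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.byLoop[loopID] == nil {
		p.byLoop[loopID] = make(map[string]struct{})
	}
	p.byLoop[loopID][callID] = struct{}{}
}

// Remove clears a pending tool call under a write lock.
func (p *pendingTools) Remove(loopID, callID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.byLoop[loopID], callID)
}

// AllComplete only reads, so a read lock lets many callers proceed.
func (p *pendingTools) AllComplete(loopID string) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.byLoop[loopID]) == 0
}

func main() {
	p := newPendingTools()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			id := fmt.Sprintf("call_%03d", n)
			p.Add("loop_123", id)
			p.Remove("loop_123", id)
		}(i)
	}
	wg.Wait()
	fmt.Println(p.AllComplete("loop_123"))
}
```

Readers take the shared lock and writers take the exclusive lock, so state reads for one loop do not block each other while another loop is being modified.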

Error Handling

Errors are handled at multiple levels:

  • Validation errors: Returned immediately from handlers
  • State transition errors: Logged, loop may continue or fail depending on severity
  • Max iterations: Loop transitions to failed state, not returned as error
  • KV persistence errors: Logged but don't block message processing
  • Context cancellation: Propagated up, handlers check ctx.Err() early

Observability

The component provides observability through:

  • Structured logging (slog) for all significant events
  • Trajectory capture for complete execution audit trails
  • Context events for memory management visibility
  • Health status via Health() method
  • Flow metrics via DataFlow() method

Testing

For testing, use the ConsumerNameSuffix config option to create unique JetStream consumer names per test:

config := agenticloop.Config{
    StreamName:         "AGENT",
    ConsumerNameSuffix: "test-" + t.Name(),
    // ...
}

This prevents consumer name conflicts when running tests in parallel.
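
One caveat when deriving the suffix from t.Name(): subtest names contain a forward slash, and NATS does not allow path separators, dots, wildcards, or whitespace in consumer names. Whether the component sanitizes the suffix itself is not stated here, so the following hypothetical helper shows one way to make the suffix safe before passing it in.

```go
package main

import (
	"fmt"
	"strings"
)

// consumerSuffix builds a suffix from a test name, replacing every
// character outside [A-Za-z0-9_-] with an underscore.
func consumerSuffix(testName string) string {
	var b strings.Builder
	b.WriteString("test-")
	for _, r := range testName {
		switch {
		case r >= 'a' && r <= 'z',
			r >= 'A' && r <= 'Z',
			r >= '0' && r <= '9',
			r == '-', r == '_':
			b.WriteRune(r)
		default:
			b.WriteRune('_')
		}
	}
	return b.String()
}

func main() {
	fmt.Println(consumerSuffix("TestLoop/cancel.signal"))
}
```

In a test, the result would be assigned to ConsumerNameSuffix in place of the raw "test-" + t.Name() expression.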

Limitations

Current limitations:

  • No streaming support for partial responses
  • Trajectory size limited by NATS KV (1MB default)
  • No built-in retry for failed tool executions
  • Context summarization requires LLM call (cost consideration)

See Also

Related packages:

  • agentic: Shared types (LoopEntity, AgentRequest, UserSignal, etc.)
  • processor/agentic-model: LLM endpoint integration
  • processor/agentic-tools: Tool execution framework
  • processor/agentic-memory: Graph-backed agent memory
  • processor/agentic-dispatch: User message routing
  • processor/workflow: Multi-step orchestration

Package agenticloop provides Prometheus metrics for the agentic-loop component.

Constants

const DefaultContextLimit = 128000

DefaultContextLimit is the fallback context window size when the model is unknown.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new agentic-loop component

func Register

func Register(registry *component.Registry) error

Register registers the agentic-loop component factory with the registry

Types

type BoidHandler

type BoidHandler struct {
	// contains filtered or unexported fields
}

BoidHandler manages agent position tracking and steering signal processing for Boids-style local coordination rules.

func NewBoidHandler

func NewBoidHandler(positionsBucket jetstream.KeyValue, logger *slog.Logger) *BoidHandler

NewBoidHandler creates a new boid handler with the default signal TTL.

func NewBoidHandlerWithTTL

func NewBoidHandlerWithTTL(positionsBucket jetstream.KeyValue, logger *slog.Logger, signalTTL time.Duration) *BoidHandler

NewBoidHandlerWithTTL creates a new boid handler with a custom signal TTL.

func (*BoidHandler) ApplySteeringToEntities

func (h *BoidHandler) ApplySteeringToEntities(loopID string) (prioritize, avoid []string)

ApplySteeringToEntities applies active steering signals to prioritize/deprioritize entities. Returns two lists: prioritized entities (from cohesion) and avoided entities (from separation).

func (*BoidHandler) CalculateVelocity

func (h *BoidHandler) CalculateVelocity(oldFocus, newFocus []string) float64

CalculateVelocity computes velocity based on position changes.

func (*BoidHandler) ClearSignals

func (h *BoidHandler) ClearSignals(loopID string)

ClearSignals removes all signals for a loop (called on loop completion).

func (*BoidHandler) DeletePosition

func (h *BoidHandler) DeletePosition(ctx context.Context, loopID string) error

DeletePosition removes an agent's position when the loop completes.

func (*BoidHandler) ExtractEntitiesFromContext

func (h *BoidHandler) ExtractEntitiesFromContext(content string) []string

ExtractEntitiesFromContext extracts entity IDs from context messages. Looks for "[Entity: {id}]" prefixes added by AddGraphEntityContext.

func (*BoidHandler) ExtractEntitiesFromToolResult

func (h *BoidHandler) ExtractEntitiesFromToolResult(content string) []string

ExtractEntitiesFromToolResult extracts entity IDs from tool result content. This captures which entities an agent accessed during tool execution.

func (*BoidHandler) ExtractPredicatesFromToolResult

func (h *BoidHandler) ExtractPredicatesFromToolResult(content string) []string

ExtractPredicatesFromToolResult extracts predicate patterns from tool result content. This captures which relationship types an agent is following during graph traversal. Only known predicates are returned to avoid noise from random snake_case strings.

func (*BoidHandler) FilterEntitiesBySignals

func (h *BoidHandler) FilterEntitiesBySignals(loopID string, entities []string) []string

FilterEntitiesBySignals filters a list of entities based on active steering signals. Entities in the avoid list are moved to the end, entities in prioritize list go first.

func (*BoidHandler) GetActiveSignal

func (h *BoidHandler) GetActiveSignal(loopID, signalType string) *boid.SteeringSignal

GetActiveSignal retrieves the most recent signal of a specific type for a loop.

func (*BoidHandler) GetActiveSignals

func (h *BoidHandler) GetActiveSignals(loopID string) map[string]*boid.SteeringSignal

GetActiveSignals retrieves all active signals for a loop.

func (*BoidHandler) GetAlignmentPatterns

func (h *BoidHandler) GetAlignmentPatterns(loopID string) []string

GetAlignmentPatterns returns the alignment patterns (predicates) to follow.

func (*BoidHandler) GetPosition

func (h *BoidHandler) GetPosition(ctx context.Context, loopID string) (*boid.AgentPosition, error)

GetPosition retrieves an agent's position from the KV bucket.

func (*BoidHandler) HandleSteeringSignalMessage

func (h *BoidHandler) HandleSteeringSignalMessage(ctx context.Context, data []byte, getContextManager func(loopID string) *ContextManager) string

HandleSteeringSignalMessage handles incoming boid steering signal messages from NATS. Returns the signal type if successfully processed, empty string otherwise.

func (*BoidHandler) ProcessSteeringSignal

func (h *BoidHandler) ProcessSteeringSignal(_ context.Context, signal *boid.SteeringSignal, cm *ContextManager) error

ProcessSteeringSignal processes an incoming boid steering signal. Stores the signal for use during context building and tool prioritization, and applies steering to the context manager if provided.

func (*BoidHandler) UpdatePosition

func (h *BoidHandler) UpdatePosition(ctx context.Context, pos *boid.AgentPosition) error

UpdatePosition updates an agent's position in the AGENT_POSITIONS KV bucket.

type BoidSteeringConfig

type BoidSteeringConfig struct {
	// PrioritizeEntities are entities to move earlier in context (cohesion).
	PrioritizeEntities []string

	// AvoidEntities are entities to deprioritize in context (separation).
	AvoidEntities []string

	// AlignPatterns are predicate patterns to favor (alignment).
	AlignPatterns []string
}

BoidSteeringConfig holds Boid steering signal configuration for context building.

type CompactionResult

type CompactionResult struct {
	Summary       string
	EvictedTokens int
	NewTokens     int
	Model         string // model used for summarization (empty if stub fallback)
}

CompactionResult contains the results of a compaction operation.

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

Compactor handles context compaction operations.

func NewCompactor

func NewCompactor(config ContextConfig, opts ...CompactorOption) *Compactor

NewCompactor creates a new compactor. Variadic opts allow optional injection of a Summarizer and logger without breaking existing callers.

func (*Compactor) Compact

Compact performs compaction on the context manager. When a Summarizer is injected, it calls the LLM to generate a real summary. If the summarizer returns an error, it falls back to a stub summary and logs a warning.

func (*Compactor) ShouldCompact

func (c *Compactor) ShouldCompact(cm *ContextManager) bool

ShouldCompact delegates to the context manager's ShouldCompact.

type CompactorOption

type CompactorOption func(*Compactor)

CompactorOption is a functional option for configuring a Compactor.
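For readers unfamiliar with the functional-options pattern, here is a minimal self-contained sketch. The `compactor` struct, its fields, the `"stub"` default, and the lower-case option names are hypothetical stand-ins and do not reflect the package's unexported fields.

```go
package main

import "fmt"

// compactor is a hypothetical stand-in for Compactor with two optional fields.
type compactor struct {
	modelName string
	verbose   bool
}

// option has the same shape as CompactorOption: a function that mutates the target.
type option func(*compactor)

func withModelName(name string) option {
	return func(c *compactor) { c.modelName = name }
}

func withVerbose() option {
	return func(c *compactor) { c.verbose = true }
}

// newCompactor sets defaults first, then applies each option in order,
// so callers that pass no options keep working unchanged.
func newCompactor(opts ...option) *compactor {
	c := &compactor{modelName: "stub"}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", *newCompactor())
	fmt.Printf("%+v\n", *newCompactor(withModelName("summarizer-small"), withVerbose()))
}
```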

func WithCompactorLogger

func WithCompactorLogger(l *slog.Logger) CompactorOption

WithCompactorLogger sets the logger used by the Compactor.

func WithModelName

func WithModelName(name string) CompactorOption

WithModelName sets the resolved model name reported in CompactionResult.

func WithSummarizer

func WithSummarizer(s Summarizer) CompactorOption

WithSummarizer injects an LLM-backed summarizer into the Compactor. When set, Compact() calls the summarizer instead of the stub fallback.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the agentic-loop processor

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component (no-op for this component)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns input port definitions

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns output port definitions

func (*Component) Phase

func (c *Component) Phase() string

Phase returns the workflow phase this component represents.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start starts the component. The context is used for cancellation during startup operations.

func (*Component) StateManager

func (c *Component) StateManager() *workflow.StateManager

StateManager returns nil since agentic-loop manages its own state internally. Workflows interact with agentic-loop via events and KV watches, not direct state access.

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop stops the component within the given timeout.

func (*Component) WorkflowID

func (c *Component) WorkflowID() string

WorkflowID returns empty string since this component handles multiple workflows dynamically. The workflow context is tracked per-loop via WorkflowSlug/WorkflowStep fields.

type Config

type Config struct {
	MaxIterations        int                   `` /* 153-byte string literal not displayed */
	Timeout              string                `` /* 138-byte string literal not displayed */
	StreamName           string                `json:"stream_name" schema:"type:string,description:JetStream stream name,default:AGENT,category:advanced"`
	ConsumerNameSuffix   string                `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`
	DeleteConsumerOnStop bool                  `` /* 157-byte string literal not displayed */
	LoopsBucket          string                `` /* 142-byte string literal not displayed */
	TrajectoriesBucket   string                `` /* 158-byte string literal not displayed */
	PositionsBucket      string                `` /* 153-byte string literal not displayed */
	BoidEnabled          bool                  `` /* 170-byte string literal not displayed */
	BoidSignalTTL        string                `` /* 160-byte string literal not displayed */
	TrajectoryDetail     string                `` /* 152-byte string literal not displayed */
	TrajectoryTTL        string                `json:"trajectory_ttl,omitempty" schema:"type:string,description:TTL for trajectory KV entries,default:24h,category:advanced"`
	TrajectoryHistory    int                   `` /* 144-byte string literal not displayed */
	Context              ContextConfig         `` /* 142-byte string literal not displayed */
	Ports                *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration for inputs and outputs,category:basic"`
}

Config represents the configuration for the agentic-loop processor

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration

func (Config) Validate

func (c Config) Validate() error

Validate validates the configuration

type ContextConfig

type ContextConfig struct {
	Enabled          bool    `json:"enabled" description:"Enable context memory management"`
	CompactThreshold float64 `json:"compact_threshold" description:"Utilization threshold (0.01-1.0) that triggers context compaction"`
	ToolResultMaxAge int     `json:"tool_result_max_age" description:"Maximum age in iterations before tool results are garbage collected"`
	HeadroomTokens   int     `json:"headroom_tokens" description:"Token headroom to reserve for model responses"`
	// Multi-agent context budget fields
	MaxBudgetTokens  int      `json:"max_budget_tokens,omitempty" description:"Hard token limit for context budget (overrides model limits when set)"`
	SliceOnBudget    bool     `json:"slice_on_budget,omitempty" description:"Enable context slicing when budget is exceeded"`
	PreserveEntities []string `json:"preserve_entities,omitempty" description:"Entity IDs to always keep in context during slicing"`
	EntityPriority   int      `` /* 127-byte string literal not displayed */
}

ContextConfig represents configuration for context memory management. Model context limits are resolved from the unified model registry (component.Dependencies.ModelRegistry).
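The JSON tags above suggest how a context configuration might be written. The sketch below decodes such a document into a local mirror struct (`contextConfig`, a hypothetical copy that omits `EntityPriority` because its tag is not displayed). The values are illustrative and are not the package defaults.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// contextConfig mirrors the documented JSON tags of ContextConfig.
type contextConfig struct {
	Enabled          bool     `json:"enabled"`
	CompactThreshold float64  `json:"compact_threshold"`
	ToolResultMaxAge int      `json:"tool_result_max_age"`
	HeadroomTokens   int      `json:"headroom_tokens"`
	MaxBudgetTokens  int      `json:"max_budget_tokens,omitempty"`
	SliceOnBudget    bool     `json:"slice_on_budget,omitempty"`
	PreserveEntities []string `json:"preserve_entities,omitempty"`
}

// parseContextConfig decodes a JSON document into contextConfig.
func parseContextConfig(raw string) (contextConfig, error) {
	var cfg contextConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseContextConfig(`{
		"enabled": true,
		"compact_threshold": 0.6,
		"tool_result_max_age": 3,
		"headroom_tokens": 6400,
		"preserve_entities": ["entity-a"]
	}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```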

func DefaultContextConfig

func DefaultContextConfig() ContextConfig

DefaultContextConfig returns the default context configuration

func (ContextConfig) Validate

func (c ContextConfig) Validate() error

Validate validates the context configuration

type ContextManager

type ContextManager struct {
	// contains filtered or unexported fields
}

ContextManager manages conversation context with memory optimization

func NewContextManager

func NewContextManager(loopID, model string, config ContextConfig, opts ...ContextManagerOption) *ContextManager

NewContextManager creates a new context manager

func (*ContextManager) AddGraphEntityContext

func (cm *ContextManager) AddGraphEntityContext(entityID string, content string) error

AddGraphEntityContext adds context from graph entities to the dedicated region

func (*ContextManager) AddMessage

func (cm *ContextManager) AddMessage(region RegionType, msg agentic.ChatMessage) error

AddMessage adds a message to a specific region

func (*ContextManager) AdvanceIteration

func (cm *ContextManager) AdvanceIteration()

AdvanceIteration moves to the next iteration. Call this after processing each loop iteration to ensure proper age tracking for GC.

func (*ContextManager) ApplyBoidSteering

func (cm *ContextManager) ApplyBoidSteering(steering BoidSteeringConfig)

ApplyBoidSteering reorders graph entity context based on Boid steering signals. This moves prioritized entities earlier and avoided entities later in the context.
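One way to picture this reordering is a stable sort over three ranks. The sketch below works on bare entity IDs and uses hypothetical helpers (`rank`, `reorder`, `toSet`). It assumes that an entity listed as both prioritized and avoided is treated as prioritized, which the documentation does not specify.

```go
package main

import (
	"fmt"
	"sort"
)

// toSet converts a slice of IDs into a lookup set.
func toSet(ids []string) map[string]bool {
	s := make(map[string]bool, len(ids))
	for _, id := range ids {
		s[id] = true
	}
	return s
}

// rank returns 0 for prioritized entities, 2 for avoided ones, 1 otherwise.
func rank(id string, prioritize, avoid map[string]bool) int {
	switch {
	case prioritize[id]:
		return 0
	case avoid[id]:
		return 2
	default:
		return 1
	}
}

// reorder returns a copy of entities with prioritized IDs first and avoided
// IDs last; the stable sort keeps the original order within each group.
func reorder(entities, prioritize, avoid []string) []string {
	p, a := toSet(prioritize), toSet(avoid)
	out := append([]string(nil), entities...)
	sort.SliceStable(out, func(i, j int) bool {
		return rank(out[i], p, a) < rank(out[j], p, a)
	})
	return out
}

func main() {
	fmt.Println(reorder([]string{"a", "b", "c", "d"}, []string{"c"}, []string{"a"}))
	// prints: [c b d a]
}
```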

func (*ContextManager) CheckBudget

func (cm *ContextManager) CheckBudget(budgetTokens int) (bool, int)

CheckBudget checks if current context is within the token budget. Returns (withinBudget, currentTokens).

func (*ContextManager) ClearGraphEntities

func (cm *ContextManager) ClearGraphEntities()

ClearGraphEntities clears the graph entities region

func (*ContextManager) GCToolResults

func (cm *ContextManager) GCToolResults(currentIteration int) int

GCToolResults garbage collects old tool results based on age. Tool results live in RegionRecentHistory (for chronological ordering with assistant messages), so this scans that region for role="tool" messages. After eviction, repairs tool pairs to ensure no orphaned assistant/tool messages.
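Age-based eviction of this kind can be sketched as a single filter pass. The `message` type and `gcToolResults` function below are hypothetical. The strict `>` comparison against the maximum age is an assumption, and the tool-pair repair step mentioned above is deliberately left out.

```go
package main

import "fmt"

// message is a simplified chat message that records the iteration it was added in.
type message struct {
	Role      string
	Content   string
	Iteration int
}

// gcToolResults drops role="tool" messages older than maxAge iterations and
// returns the remaining messages together with the number evicted.
func gcToolResults(history []message, currentIteration, maxAge int) ([]message, int) {
	kept := make([]message, 0, len(history))
	evicted := 0
	for _, m := range history {
		if m.Role == "tool" && currentIteration-m.Iteration > maxAge {
			evicted++
			continue
		}
		kept = append(kept, m)
	}
	return kept, evicted
}

func main() {
	history := []message{
		{Role: "user", Content: "list files", Iteration: 0},
		{Role: "tool", Content: "a.txt b.txt", Iteration: 1},
		{Role: "tool", Content: "c.txt", Iteration: 4},
	}
	kept, evicted := gcToolResults(history, 5, 3)
	fmt.Println(len(kept), evicted)
	// prints: 2 1
}
```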

func (*ContextManager) GetContext

func (cm *ContextManager) GetContext() []agentic.ChatMessage

GetContext returns all messages in region priority order

func (*ContextManager) GetGraphEntityTokens

func (cm *ContextManager) GetGraphEntityTokens() int

GetGraphEntityTokens returns the total tokens in the graph entities region

func (*ContextManager) GetRegionTokens

func (cm *ContextManager) GetRegionTokens(region RegionType) int

GetRegionTokens returns the total token count for a region

func (*ContextManager) ShouldCompact

func (cm *ContextManager) ShouldCompact() bool

ShouldCompact returns true if compaction should be triggered
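A plausible reading of the trigger is a comparison between utilization and `CompactThreshold`. The sketch below encodes that reading with two hypothetical functions. The package's actual formula is not shown in this documentation, in particular how `HeadroomTokens` enters it.

```go
package main

import "fmt"

// utilization returns usedTokens/limitTokens clamped to [0, 1].
// It returns 0 when the limit is not positive.
func utilization(usedTokens, limitTokens int) float64 {
	if limitTokens <= 0 || usedTokens <= 0 {
		return 0
	}
	u := float64(usedTokens) / float64(limitTokens)
	if u > 1 {
		return 1
	}
	return u
}

// shouldCompact reports whether utilization has reached the threshold.
func shouldCompact(usedTokens, limitTokens int, threshold float64) bool {
	return utilization(usedTokens, limitTokens) >= threshold
}

func main() {
	fmt.Println(shouldCompact(5000, 10000, 0.6)) // prints: false
	fmt.Println(shouldCompact(7000, 10000, 0.6)) // prints: true
}
```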

func (*ContextManager) SliceForBudget

func (cm *ContextManager) SliceForBudget(budget int, slice ContextSlice) error

SliceForBudget removes content to fit within the budget. Uses graph-aware slicing that preserves entity context when EntityPriority is set. Boid steering signals influence eviction: AvoidEntities are evicted first.
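The eviction order can be illustrated with a two-pass loop: avoided entities go first, preserved entities are never touched. Everything in this sketch is hypothetical (`entity`, `sliceForBudget`, unique IDs, and evicting from the end of the list in the second pass). It only models the graph-entities region.

```go
package main

import "fmt"

// entity is a simplified graph entity with a precomputed token count.
type entity struct {
	ID     string
	Tokens int
}

// sliceForBudget removes entities until the total fits the budget.
// Pass 0 evicts only avoided entities; pass 1 evicts any non-preserved entity.
func sliceForBudget(entities []entity, budget int, preserve, avoid map[string]bool) ([]entity, error) {
	total := 0
	for _, e := range entities {
		total += e.Tokens
	}
	removed := make(map[string]bool)
	for pass := 0; pass < 2 && total > budget; pass++ {
		for i := len(entities) - 1; i >= 0 && total > budget; i-- {
			e := entities[i]
			if removed[e.ID] || preserve[e.ID] {
				continue
			}
			if pass == 0 && !avoid[e.ID] {
				continue
			}
			removed[e.ID] = true
			total -= e.Tokens
		}
	}
	if total > budget {
		return nil, fmt.Errorf("preserved content uses %d tokens, budget is %d", total, budget)
	}
	kept := make([]entity, 0, len(entities))
	for _, e := range entities {
		if !removed[e.ID] {
			kept = append(kept, e)
		}
	}
	return kept, nil
}

func main() {
	entities := []entity{{"a", 100}, {"b", 200}, {"c", 300}, {"d", 150}}
	kept, err := sliceForBudget(entities, 400,
		map[string]bool{"a": true}, // preserve
		map[string]bool{"b": true}, // avoid
	)
	fmt.Println(kept, err)
	// prints: [{a 100} {c 300}] <nil>
}
```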

func (*ContextManager) Utilization

func (cm *ContextManager) Utilization() float64

Utilization returns the current context utilization (0.0 to 1.0)

type ContextManagerOption

type ContextManagerOption func(*ContextManager)

ContextManagerOption is a functional option for configuring ContextManager

func WithLogger

func WithLogger(logger *slog.Logger) ContextManagerOption

WithLogger sets the logger for the ContextManager

func WithModelRegistry

func WithModelRegistry(reg model.RegistryReader) ContextManagerOption

WithModelRegistry provides registry-based model limit resolution

type ContextSlice

type ContextSlice struct {
	IncludeRegions   []RegionType // Regions to include
	ExcludeRegions   []RegionType // Regions to exclude (takes precedence)
	PreserveEntities []string     // Entity IDs to always keep

	// Boid steering configuration (from active steering signals)
	AvoidEntities []string // Entities to deprioritize (from separation signals)
}

ContextSlice defines which regions to include when slicing context

type HandlerResult

type HandlerResult struct {
	LoopID               string
	State                agentic.LoopState
	PublishedMessages    []PublishedMessage
	PendingTools         []string
	TrajectorySteps      []agentic.TrajectoryStep
	ContextEvents        []agentic.ContextEvent
	RetryScheduled       bool
	MaxIterationsReached bool
	// CompletionState contains enriched completion data for KV persistence.
	// This is populated when a loop completes and is used by component.go
	// to write to the loops bucket with key pattern COMPLETE_{loopID}.
	CompletionState *agentic.LoopCompletedEvent
}

HandlerResult contains the results of a handler operation

type LLMSummarizer

type LLMSummarizer struct {
	// contains filtered or unexported fields
}

LLMSummarizer implements Summarizer using a graph/llm.Client.

func NewLLMSummarizer

func NewLLMSummarizer(client llm.Client, logger *slog.Logger) *LLMSummarizer

NewLLMSummarizer creates a summarizer backed by an LLM client.

func (*LLMSummarizer) Summarize

func (s *LLMSummarizer) Summarize(ctx context.Context, messages []agentic.ChatMessage, maxTokens int) (string, error)

Summarize calls the LLM to generate a summary of the conversation messages.

type LoopManager

type LoopManager struct {
	// contains filtered or unexported fields
}

LoopManager manages loop entity lifecycle and state

func NewLoopManager

func NewLoopManager(opts ...LoopManagerOption) *LoopManager

NewLoopManager creates a new LoopManager

func NewLoopManagerWithConfig

func NewLoopManagerWithConfig(contextConfig ContextConfig, opts ...LoopManagerOption) *LoopManager

NewLoopManagerWithConfig creates a new LoopManager with custom context config

func (*LoopManager) AddPendingTool

func (m *LoopManager) AddPendingTool(loopID, callID string) error

AddPendingTool adds a pending tool call to the loop

func (*LoopManager) AllToolsComplete

func (m *LoopManager) AllToolsComplete(loopID string) bool

AllToolsComplete returns true if there are no pending tool calls
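The pending-tool bookkeeping behind `AddPendingTool`, `RemovePendingTool`, and `AllToolsComplete` can be pictured as a mutex-guarded set per loop. The `pendingTools` type below is a hypothetical sketch. Unlike the real methods it returns no errors and performs no loop-existence checks.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingTools maps a loop ID to the set of tool call IDs still outstanding.
type pendingTools struct {
	mu    sync.Mutex
	calls map[string]map[string]struct{}
}

func newPendingTools() *pendingTools {
	return &pendingTools{calls: make(map[string]map[string]struct{})}
}

// add records a dispatched tool call for the loop.
func (p *pendingTools) add(loopID, callID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.calls[loopID] == nil {
		p.calls[loopID] = make(map[string]struct{})
	}
	p.calls[loopID][callID] = struct{}{}
}

// remove clears a tool call once its result has arrived.
func (p *pendingTools) remove(loopID, callID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.calls[loopID], callID)
}

// allComplete reports whether the loop has no outstanding tool calls.
func (p *pendingTools) allComplete(loopID string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.calls[loopID]) == 0
}

func main() {
	p := newPendingTools()
	p.add("loop-1", "call-a")
	p.add("loop-1", "call-b")
	p.remove("loop-1", "call-a")
	fmt.Println(p.allComplete("loop-1")) // prints: false
	p.remove("loop-1", "call-b")
	fmt.Println(p.allComplete("loop-1")) // prints: true
}
```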

func (*LoopManager) CacheMetadata

func (m *LoopManager) CacheMetadata(loopID string, metadata map[string]any)

CacheMetadata stores domain context metadata for a loop (set once from task, reused for all tool calls)

func (*LoopManager) CacheTools

func (m *LoopManager) CacheTools(loopID string, tools []agentic.ToolDefinition)

CacheTools stores tool definitions for a loop (discovered once, reused for all requests)

func (*LoopManager) CancelLoop

func (m *LoopManager) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)

CancelLoop atomically cancels a loop and populates completion data. Returns the updated entity for further processing, or an error if the loop cannot be cancelled (not found or already terminal).

func (*LoopManager) CreateLoop

func (m *LoopManager) CreateLoop(taskID, role, model string, maxIterations ...int) (string, error)

CreateLoop creates a new loop entity with a generated UUID

func (*LoopManager) CreateLoopWithID

func (m *LoopManager) CreateLoopWithID(loopID, taskID, role, model string, maxIterations ...int) (string, error)

CreateLoopWithID creates a new loop entity with a specific ID

func (*LoopManager) DeleteLoop

func (m *LoopManager) DeleteLoop(loopID string) error

DeleteLoop deletes a loop entity and all associated tracking data.

func (*LoopManager) ExtractLoopIDFromRequest

func (m *LoopManager) ExtractLoopIDFromRequest(requestID string) string

ExtractLoopIDFromRequest extracts the loop ID from a structured request ID. Returns empty string if the ID is not in structured format.

func (*LoopManager) ExtractLoopIDFromToolCall

func (m *LoopManager) ExtractLoopIDFromToolCall(toolCallID string) string

ExtractLoopIDFromToolCall extracts the loop ID from a structured tool call ID. Returns empty string if the ID is not in structured format.

func (*LoopManager) GenerateRequestID

func (m *LoopManager) GenerateRequestID(loopID string) string

GenerateRequestID creates a structured request ID that embeds the loop ID. Format: loopID:req:shortUUID. This allows recovery of the loop ID from the request ID if in-memory maps are lost.

func (*LoopManager) GenerateToolCallID

func (m *LoopManager) GenerateToolCallID(loopID string) string

GenerateToolCallID creates a structured tool call ID that embeds the loop ID. Format: loopID:tool:shortUUID. This allows recovery of the loop ID from the tool call ID if in-memory maps are lost.
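The structured ID formats (`loopID:req:shortUUID` and `loopID:tool:shortUUID`) lend themselves to a small generate/extract pair. The sketch below is an assumption-laden illustration: `shortID` uses 8 random hex characters as a stand-in for the package's short UUID, and `generateID` and `extractLoopID` are hypothetical helpers.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// shortID returns 8 random hex characters (a stand-in for a short UUID).
func shortID() string {
	b := make([]byte, 4)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// generateID builds loopID:kind:shortID, where kind is "req" or "tool".
func generateID(loopID, kind string) string {
	return loopID + ":" + kind + ":" + shortID()
}

// extractLoopID returns the loop ID prefix, or "" when id does not contain
// the ":kind:" marker followed by a non-empty suffix.
func extractLoopID(id, kind string) string {
	marker := ":" + kind + ":"
	i := strings.LastIndex(id, marker)
	if i <= 0 || i+len(marker) == len(id) {
		return ""
	}
	return id[:i]
}

func main() {
	id := generateID("loop-42", "tool")
	fmt.Println(id)
	fmt.Println(extractLoopID(id, "tool"))         // prints: loop-42
	fmt.Println(extractLoopID("plain-id", "tool")) // prints an empty line
}
```

Searching for the last marker rather than the first keeps extraction correct even if a loop ID itself happens to contain a colon.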

func (*LoopManager) GetAndClearToolResults

func (m *LoopManager) GetAndClearToolResults(loopID string) []agentic.ToolResult

GetAndClearToolResults retrieves all accumulated tool results and clears them

func (*LoopManager) GetCachedMetadata

func (m *LoopManager) GetCachedMetadata(loopID string) map[string]any

GetCachedMetadata retrieves the cached metadata for a loop

func (*LoopManager) GetCachedTools

func (m *LoopManager) GetCachedTools(loopID string) []agentic.ToolDefinition

GetCachedTools retrieves the cached tool definitions for a loop

func (*LoopManager) GetContextManager

func (m *LoopManager) GetContextManager(loopID string) *ContextManager

GetContextManager retrieves the context manager for a loop

func (*LoopManager) GetCurrentIteration

func (m *LoopManager) GetCurrentIteration(loopID string) int

GetCurrentIteration returns the current iteration for a loop

func (*LoopManager) GetDepth

func (m *LoopManager) GetDepth(loopID string) (depth, maxDepth int, err error)

GetDepth returns the current depth and max depth for a loop

func (*LoopManager) GetLoop

func (m *LoopManager) GetLoop(loopID string) (agentic.LoopEntity, error)

GetLoop retrieves a loop entity by ID

func (*LoopManager) GetLoopForRequest

func (m *LoopManager) GetLoopForRequest(requestID string) (string, bool)

GetLoopForRequest retrieves the loop ID for a request ID

func (*LoopManager) GetLoopForRequestWithRecovery

func (m *LoopManager) GetLoopForRequestWithRecovery(requestID string) (string, bool)

GetLoopForRequestWithRecovery retrieves the loop ID for a request ID, attempting recovery from structured ID if not found in cache.

func (*LoopManager) GetLoopForToolCall

func (m *LoopManager) GetLoopForToolCall(callID string) (string, bool)

GetLoopForToolCall retrieves the loop ID for a tool call ID

func (*LoopManager) GetLoopForToolCallWithRecovery

func (m *LoopManager) GetLoopForToolCallWithRecovery(toolCallID string) (string, bool)

GetLoopForToolCallWithRecovery retrieves the loop ID for a tool call ID, attempting recovery from structured ID if not found in cache.

func (*LoopManager) GetPendingTools

func (m *LoopManager) GetPendingTools(loopID string) []string

GetPendingTools returns all pending tool calls for a loop

func (*LoopManager) GetRequestStart

func (m *LoopManager) GetRequestStart(requestID string) time.Time

GetRequestStart retrieves the start time for a model request.

func (*LoopManager) GetToolArguments

func (m *LoopManager) GetToolArguments(callID string) map[string]any

GetToolArguments retrieves a shallow copy of the arguments for a tool call ID.
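"Shallow copy" has a practical consequence worth spelling out: replacing a top-level key in the returned map does not affect the stored arguments, but mutating a nested map or slice does. The hypothetical `shallowCopy` helper below demonstrates this with a plain `map[string]any`.

```go
package main

import "fmt"

// shallowCopy duplicates the top-level map only; nested values are shared.
func shallowCopy(args map[string]any) map[string]any {
	if args == nil {
		return nil
	}
	out := make(map[string]any, len(args))
	for k, v := range args {
		out[k] = v
	}
	return out
}

func main() {
	stored := map[string]any{
		"path": "/tmp",
		"opts": map[string]any{"depth": 1},
	}
	cp := shallowCopy(stored)

	cp["path"] = "/etc"                       // top-level change stays local to the copy
	cp["opts"].(map[string]any)["depth"] = 2 // nested change is visible through both maps

	fmt.Println(stored["path"])                           // prints: /tmp
	fmt.Println(stored["opts"].(map[string]any)["depth"]) // prints: 2
}
```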

func (*LoopManager) GetToolName

func (m *LoopManager) GetToolName(callID string) string

GetToolName retrieves the function name for a tool call ID.

func (*LoopManager) GetToolStart

func (m *LoopManager) GetToolStart(callID string) time.Time

GetToolStart retrieves the start time for a tool call.

func (*LoopManager) IncrementIteration

func (m *LoopManager) IncrementIteration(loopID string) error

IncrementIteration increments the loop iteration counter

func (*LoopManager) IsTimedOut

func (m *LoopManager) IsTimedOut(loopID string) bool

IsTimedOut checks if a loop has exceeded its timeout

func (*LoopManager) RemovePendingTool

func (m *LoopManager) RemovePendingTool(loopID, callID string) error

RemovePendingTool removes a pending tool call from the loop

func (*LoopManager) SetDepth

func (m *LoopManager) SetDepth(loopID string, depth, maxDepth int) error

SetDepth sets the depth tracking for a loop in the multi-agent hierarchy

func (*LoopManager) SetParentLoop

func (m *LoopManager) SetParentLoop(loopID, parentLoopID string) error

SetParentLoop sets the parent loop ID for tracking architect->editor relationships

func (*LoopManager) SetParentLoopID

func (m *LoopManager) SetParentLoopID(loopID, parentLoopID string) error

SetParentLoopID is an alias for SetParentLoop for consistency with TaskMessage field names

func (*LoopManager) SetTimeout

func (m *LoopManager) SetTimeout(loopID string, timeout time.Duration) error

SetTimeout sets the timeout for a loop

func (*LoopManager) SetUserContext

func (m *LoopManager) SetUserContext(loopID, channelType, channelID, userID string) error

SetUserContext sets the user routing info for error notifications

func (*LoopManager) SetWorkflowContext

func (m *LoopManager) SetWorkflowContext(loopID, workflowSlug, workflowStep string) error

SetWorkflowContext sets the workflow slug and step for loops created by workflow commands

func (*LoopManager) StoreToolResult

func (m *LoopManager) StoreToolResult(loopID string, result agentic.ToolResult) error

StoreToolResult stores a tool result in the loop entity for later retrieval

func (*LoopManager) TrackRequest

func (m *LoopManager) TrackRequest(requestID, loopID string)

TrackRequest associates a request ID with a loop ID

func (*LoopManager) TrackRequestStart

func (m *LoopManager) TrackRequestStart(requestID string)

TrackRequestStart records when a model request was sent.

func (*LoopManager) TrackToolArguments

func (m *LoopManager) TrackToolArguments(callID string, args map[string]any)

TrackToolArguments associates a tool call ID with its arguments. This is used to populate the ToolArguments field on trajectory steps for audit.

func (*LoopManager) TrackToolCall

func (m *LoopManager) TrackToolCall(callID, loopID string)

TrackToolCall associates a tool call ID with a loop ID

func (*LoopManager) TrackToolName

func (m *LoopManager) TrackToolName(callID, name string)

TrackToolName associates a tool call ID with its function name. This is used to populate the name field on tool result messages (required by Gemini).

func (*LoopManager) TrackToolStart

func (m *LoopManager) TrackToolStart(callID string)

TrackToolStart records when a tool call was dispatched for execution.

func (*LoopManager) TransitionLoop

func (m *LoopManager) TransitionLoop(loopID string, newState agentic.LoopState) error

TransitionLoop transitions a loop to a new state

func (*LoopManager) UpdateCompletion

func (m *LoopManager) UpdateCompletion(loopID, outcome, result, errMsg string) error

UpdateCompletion updates a loop with completion data (outcome, result, error). This is called when a loop finishes to populate fields for SSE delivery via KV watch.

func (*LoopManager) UpdateLoop

func (m *LoopManager) UpdateLoop(entity agentic.LoopEntity) error

UpdateLoop updates an existing loop entity

type LoopManagerOption

type LoopManagerOption func(*LoopManager)

LoopManagerOption is a functional option for configuring LoopManager

func WithLoopManagerLogger

func WithLoopManagerLogger(logger *slog.Logger) LoopManagerOption

WithLoopManagerLogger sets the logger for the LoopManager and its context managers

func WithLoopManagerModelRegistry

func WithLoopManagerModelRegistry(reg model.RegistryReader) LoopManagerOption

WithLoopManagerModelRegistry sets the model registry for context managers

type MessageHandler

type MessageHandler struct {
	// contains filtered or unexported fields
}

MessageHandler handles incoming messages and coordinates loop execution

func NewMessageHandler

func NewMessageHandler(config Config, loopManagerOpts ...LoopManagerOption) *MessageHandler

NewMessageHandler creates a new MessageHandler

func (*MessageHandler) BuildFailureEvent

func (h *MessageHandler) BuildFailureEvent(loopID, reason, errorMsg string) (PublishedMessage, error)

BuildFailureEvent creates a failure event for publishing (public wrapper). Used by the component to publish failure events when handler returns errors.

func (*MessageHandler) BuildFailureMessages

func (h *MessageHandler) BuildFailureMessages(loopID, reason, errorMsg string) ([]PublishedMessage, error)

BuildFailureMessages creates failure events for publishing. Returns the standard failure event for reactive workflows to observe via KV watch or subject subscription.

func (*MessageHandler) CancelLoop

func (h *MessageHandler) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)

CancelLoop atomically cancels a loop and populates completion data.

func (*MessageHandler) GetContextManager

func (h *MessageHandler) GetContextManager(loopID string) *ContextManager

GetContextManager returns the ContextManager for a given loop ID. Used by BoidHandler to apply steering signals to context.

func (*MessageHandler) GetLoop

func (h *MessageHandler) GetLoop(loopID string) (agentic.LoopEntity, error)

GetLoop retrieves a loop entity (for testing)

func (*MessageHandler) HandleModelResponse

func (h *MessageHandler) HandleModelResponse(ctx context.Context, loopID string, response agentic.AgentResponse) (HandlerResult, error)

HandleModelResponse processes a model response

func (*MessageHandler) HandleTask

func (h *MessageHandler) HandleTask(ctx context.Context, task TaskMessage) (HandlerResult, error)

HandleTask processes an incoming task message and creates a new loop

func (*MessageHandler) HandleToolResult

func (h *MessageHandler) HandleToolResult(ctx context.Context, loopID string, toolResult agentic.ToolResult) (HandlerResult, error)

HandleToolResult processes a tool execution result

func (*MessageHandler) SetLogger

func (h *MessageHandler) SetLogger(logger *slog.Logger)

SetLogger sets the logger for the handler

func (*MessageHandler) SetSummarizer

func (h *MessageHandler) SetSummarizer(s Summarizer, modelName string)

SetSummarizer injects an LLM-backed summarizer into the compactor. When set, context compaction generates real summaries instead of stubs. modelName is the resolved endpoint name reported in CompactionResult.

func (*MessageHandler) SetToolCallFilter

func (h *MessageHandler) SetToolCallFilter(filter agentic.ToolCallFilter)

SetToolCallFilter sets a filter that intercepts tool calls before execution. When set, each tool call batch is passed through the filter. Rejected calls receive immediate error results; approved calls proceed to tool.execute.
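The approve/reject split can be sketched as a partition over a batch of calls. The real `agentic.ToolCallFilter` type is not shown in this documentation, so the `filter` signature, the `toolCall` and `toolResult` types, and the `denyByName` helper below are all hypothetical.

```go
package main

import "fmt"

// toolCall and toolResult are simplified stand-ins for the agentic types.
type toolCall struct {
	ID   string
	Name string
}

type toolResult struct {
	CallID string
	Error  string
}

// filter is an assumed shape: a non-nil error rejects the call.
type filter func(call toolCall) error

// denyByName builds a filter that rejects calls to one tool name.
func denyByName(name string) filter {
	return func(c toolCall) error {
		if c.Name == name {
			return fmt.Errorf("tool %q is not allowed", c.Name)
		}
		return nil
	}
}

// applyFilter splits a batch into approved calls and immediate error results.
func applyFilter(batch []toolCall, f filter) (approved []toolCall, rejected []toolResult) {
	for _, c := range batch {
		if f != nil {
			if err := f(c); err != nil {
				rejected = append(rejected, toolResult{CallID: c.ID, Error: err.Error()})
				continue
			}
		}
		approved = append(approved, c)
	}
	return approved, rejected
}

func main() {
	batch := []toolCall{{ID: "1", Name: "read_file"}, {ID: "2", Name: "shell"}}
	approved, rejected := applyFilter(batch, denyByName("shell"))
	fmt.Println(approved)
	fmt.Println(rejected)
}
```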

func (*MessageHandler) UpdateLoop

func (h *MessageHandler) UpdateLoop(entity agentic.LoopEntity) error

UpdateLoop updates a loop entity

type PublishedMessage

type PublishedMessage struct {
	Subject string
	Data    []byte
}

PublishedMessage represents a message published to NATS

type RegionType

type RegionType string

RegionType defines the type of context region

const (
	RegionSystemPrompt     RegionType = "system_prompt"     // System prompt and instructions
	RegionCompactedHistory RegionType = "compacted_history" // Summarized old conversation
	RegionRecentHistory    RegionType = "recent_history"    // Recent uncompacted messages
	RegionToolResults      RegionType = "tool_results"      // Tool execution results
	RegionHydratedContext  RegionType = "hydrated_context"  // Retrieved context from memory
	RegionGraphEntities    RegionType = "graph_entities"    // Graph entity context (multi-agent)
)

Context region types define different areas of the conversation context
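To show how regions could combine into a single message list, the sketch below walks a fixed region order and concatenates each region's messages. The order used here simply follows the declaration order of the constants above, which is an assumption. The priority order that `GetContext` actually applies is not documented on this page, and messages are reduced to plain strings.

```go
package main

import "fmt"

type regionType string

const (
	regionSystemPrompt     regionType = "system_prompt"
	regionCompactedHistory regionType = "compacted_history"
	regionRecentHistory    regionType = "recent_history"
	regionToolResults      regionType = "tool_results"
	regionHydratedContext  regionType = "hydrated_context"
	regionGraphEntities    regionType = "graph_entities"
)

// regionOrder is an assumed priority order (declaration order).
var regionOrder = []regionType{
	regionSystemPrompt,
	regionCompactedHistory,
	regionRecentHistory,
	regionToolResults,
	regionHydratedContext,
	regionGraphEntities,
}

// getContext flattens the per-region messages into one ordered slice.
func getContext(regions map[regionType][]string) []string {
	var out []string
	for _, r := range regionOrder {
		out = append(out, regions[r]...)
	}
	return out
}

func main() {
	ctx := getContext(map[regionType][]string{
		regionRecentHistory: {"user: hello"},
		regionSystemPrompt:  {"system: be concise"},
	})
	fmt.Println(ctx)
}
```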

type SignalStore

type SignalStore struct {
	// contains filtered or unexported fields
}

SignalStore maintains active steering signals per loop, organized by signal type. Signals are time-limited and automatically expire after TTL.

func NewSignalStore

func NewSignalStore(ttl time.Duration) *SignalStore

NewSignalStore creates a new signal store with the given TTL.

func (*SignalStore) Cleanup

func (s *SignalStore) Cleanup() int

Cleanup removes expired signals (can be called periodically).

func (*SignalStore) Get

func (s *SignalStore) Get(loopID, signalType string) *boid.SteeringSignal

Get retrieves an active signal by loop ID and signal type. Returns nil if no signal exists or if it has expired.

func (*SignalStore) GetAll

func (s *SignalStore) GetAll(loopID string) map[string]*boid.SteeringSignal

GetAll retrieves all active signals for a loop. Returns a map of signal type -> signal, excluding expired signals.

func (*SignalStore) Remove

func (s *SignalStore) Remove(loopID string)

Remove removes all signals for a loop (called on loop completion).

func (*SignalStore) Store

func (s *SignalStore) Store(signal *boid.SteeringSignal)

Store adds or updates a steering signal for a loop. Signals are keyed by loop ID and signal type (only most recent per type kept).
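A TTL-bound store keyed by loop ID and signal type might look like the sketch below. All names are hypothetical (`signal`, `signalStore`, `demo`). The injectable `now` clock exists only to make expiry testable and is not part of the documented `NewSignalStore` signature.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// signal is a simplified stand-in for boid.SteeringSignal.
type signal struct {
	LoopID  string
	Type    string
	Payload string
}

type entry struct {
	sig      signal
	storedAt time.Time
}

// signalStore keeps the most recent signal per (loop ID, signal type).
type signalStore struct {
	mu      sync.Mutex
	ttl     time.Duration
	now     func() time.Time
	signals map[string]map[string]entry
}

func newSignalStore(ttl time.Duration, now func() time.Time) *signalStore {
	return &signalStore{ttl: ttl, now: now, signals: make(map[string]map[string]entry)}
}

// store adds or overwrites the signal for its loop and type.
func (s *signalStore) store(sig signal) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.signals[sig.LoopID] == nil {
		s.signals[sig.LoopID] = make(map[string]entry)
	}
	s.signals[sig.LoopID][sig.Type] = entry{sig: sig, storedAt: s.now()}
}

// get returns the signal unless it is missing or older than the TTL.
func (s *signalStore) get(loopID, signalType string) (signal, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.signals[loopID][signalType]
	if !ok || s.now().Sub(e.storedAt) > s.ttl {
		return signal{}, false
	}
	return e.sig, true
}

// demo reads one signal before and after the TTL elapses on a fake clock.
func demo() (beforeTTL, afterTTL bool) {
	clock := time.Unix(0, 0)
	s := newSignalStore(time.Minute, func() time.Time { return clock })
	s.store(signal{LoopID: "loop-1", Type: "separation", Payload: "avoid entity-a"})
	_, beforeTTL = s.get("loop-1", "separation")
	clock = clock.Add(2 * time.Minute)
	_, afterTTL = s.get("loop-1", "separation")
	return beforeTTL, afterTTL
}

func main() {
	fmt.Println(demo()) // prints: true false
}
```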

type Summarizer

type Summarizer interface {
	// Summarize generates a concise summary of the given conversation messages.
	// maxTokens limits the response length.
	Summarize(ctx context.Context, messages []agentic.ChatMessage, maxTokens int) (string, error)
}

Summarizer abstracts the LLM call for context compaction.

type TaskMessage

type TaskMessage = agentic.TaskMessage

TaskMessage is an alias for agentic.TaskMessage for backward compatibility. This allows existing code to use agenticloop.TaskMessage without modification.

type TrajectoryManager

type TrajectoryManager struct {
	// contains filtered or unexported fields
}

TrajectoryManager manages trajectory capture and persistence

func NewTrajectoryManager

func NewTrajectoryManager() *TrajectoryManager

NewTrajectoryManager creates a new TrajectoryManager

func (*TrajectoryManager) AddStep

AddStep adds a step to a trajectory

func (*TrajectoryManager) CompleteTrajectory

func (m *TrajectoryManager) CompleteTrajectory(loopID, outcome string) (agentic.Trajectory, error)

CompleteTrajectory marks a trajectory as complete

func (*TrajectoryManager) GetTrajectory

func (m *TrajectoryManager) GetTrajectory(loopID string) (agentic.Trajectory, error)

GetTrajectory retrieves a trajectory by loop ID

func (*TrajectoryManager) SaveTrajectory

func (m *TrajectoryManager) SaveTrajectory(_ context.Context, _ agentic.Trajectory) error

SaveTrajectory saves a trajectory to KV storage

func (*TrajectoryManager) StartTrajectory

func (m *TrajectoryManager) StartTrajectory(loopID string) (agentic.Trajectory, error)

StartTrajectory starts a new trajectory for a loop
