Documentation
¶
Overview ¶
Package agenticloop provides the agentic loop orchestrator component. It manages the state machine for agentic loops, coordinating between model calls and tool executions while capturing execution trajectories.
Package agenticloop provides the loop orchestrator for the SemStreams agentic system.
Overview ¶
The agentic-loop processor orchestrates autonomous agent execution by managing the lifecycle of agentic loops. It coordinates communication between the model processor (LLM calls) and tools processor (tool execution), tracks state through a 10-state machine, supports signal handling for user control, manages context memory with automatic compaction, and captures complete execution trajectories for observability.
This is the central component of the agentic system - it receives task requests, routes messages between model and tools, handles iteration limits, processes control signals, manages context memory, and publishes completion events.
Architecture ¶
The loop orchestrator sits at the center of the agentic component family:
┌─────────────────┐
agent.task.* ──▶ │ │ ──▶ agent.request.*
│ agentic-loop │
agent.response.>◀─│ (this pkg) │◀── agent.response.*
│ │
tool.result.> ──▶│ │ ──▶ tool.execute.*
│ │
agent.signal.* ──▶│ │ ──▶ agent.complete.*
│ │
│ │ ──▶ agent.context.compaction.*
└────────┬────────┘
│
┌────────┴────────┐
│ NATS KV │
│ AGENT_LOOPS │
│ AGENT_TRAJ... │
└─────────────────┘
Message Flow ¶
A typical loop execution follows this pattern:
- External system publishes TaskMessage to agent.task.*
- Loop creates LoopEntity, starts Trajectory, publishes AgentRequest to agent.request.*
- agentic-model processes request, publishes AgentResponse to agent.response.*
- Loop receives response: - If status="tool_call": publishes ToolCall to tool.execute.* for each tool - If status="complete": publishes completion to agent.complete.* - If status="error": marks loop as failed
- agentic-tools executes tools, publishes ToolResult to tool.result.*
- Loop receives tool results, when all complete: increments iteration, sends next request
- Cycle repeats until complete, failed, or max iterations reached
State Machine ¶
Loops progress through ten states defined in the agentic package:
exploring → planning → architecting → executing → reviewing → complete
↑ ↑ ↑ ↑ ↑ ↘ failed
└──────────┴────────────┴─────────────┴───────────┘ ↘ cancelled
↘ paused
↘ awaiting_approval
States:
- exploring: Initial state, gathering information
- planning: Developing approach
- architecting: Designing solution
- executing: Implementing solution
- reviewing: Validating results
- complete: Successfully finished (terminal)
- failed: Failed due to error or max iterations (terminal)
- cancelled: Cancelled by user signal (terminal)
- paused: Paused by user signal, can resume
- awaiting_approval: Waiting for user approval
States are fluid checkpoints - the loop can transition backward (e.g., from executing back to exploring) to support agent rethinking. Only terminal states (complete, failed, cancelled) prevent further transitions.
State transitions are managed by the LoopManager and persisted to NATS KV.
Signal Handling ¶
The loop accepts control signals via the agent.signal.* input port:
signal := agentic.UserSignal{
SignalID: "sig_abc123",
Type: "cancel", // cancel, pause, resume, approve, reject, feedback, retry
LoopID: "loop_456",
UserID: "user_789",
ChannelType: "cli",
ChannelID: "session_001",
Timestamp: time.Now(),
}
Signal types and their effects:
- cancel: Stop execution immediately, transition to cancelled state
- pause: Pause at next checkpoint, transition to paused state
- resume: Continue paused loop, restore previous state
- approve: Approve pending result, transition to complete
- reject: Reject with optional reason, transition to failed
- feedback: Add feedback without decision, no state change
- retry: Retry failed loop, transition to exploring
Context Management ¶
The loop includes automatic context memory management to handle long-running conversations that approach model token limits.
Context is organized into priority regions (lower priority evicted first):
- tool_results (priority 1) - Tool execution results, GC'd by age
- recent_history (priority 2) - Recent conversation messages
- hydrated_context (priority 3) - Retrieved context from memory
- compacted_history (priority 4) - Summarized old conversation
- system_prompt (priority 5) - Never evicted
Configuration:
context := agenticloop.ContextConfig{
Enabled: true,
CompactThreshold: 0.60, // Trigger compaction at 60% utilization
ToolResultMaxAge: 3, // Evict tool results older than 3 iterations
HeadroomTokens: 6400, // Reserve tokens for new content
}
Model context limits are resolved from the unified model registry (component.Dependencies.ModelRegistry). If a model is not found in the registry, DefaultContextLimit (128000) is used as fallback.
Context events are published to agent.context.compaction.*:
- compaction_starting: Context approaching limit, compaction starting
- compaction_complete: Compaction finished, includes tokens saved
- gc_complete: Tool result garbage collection completed
Component Architecture ¶
The package is organized into three main components:
**LoopManager** - Manages loop entity lifecycle:
manager := NewLoopManager()
// Create a loop
loopID, err := manager.CreateLoop("task_123", "general", "gpt-4", 20)
// State transitions
err = manager.TransitionLoop(loopID, agentic.LoopStateExecuting)
// Iteration tracking
err = manager.IncrementIteration(loopID)
// Pending tool management
manager.AddPendingTool(loopID, "call_001")
manager.RemovePendingTool(loopID, "call_001")
allDone := manager.AllToolsComplete(loopID)
// Context management
cm := manager.GetContextManager(loopID)
**TrajectoryManager** - Captures execution traces:
trajManager := NewTrajectoryManager()
// Start trajectory for a loop
trajectory, err := trajManager.StartTrajectory(loopID)
// Add steps (model calls, tool calls)
trajManager.AddStep(loopID, agentic.TrajectoryStep{
Timestamp: time.Now(),
StepType: "model_call",
TokensIn: 150,
TokensOut: 200,
})
// Complete trajectory
trajectory, err = trajManager.CompleteTrajectory(loopID, "complete")
**MessageHandler** - Routes and processes messages:
handler := NewMessageHandler(config)
// Handle incoming task
result, err := handler.HandleTask(ctx, TaskMessage{
TaskID: "task_123",
Role: "general",
Model: "gpt-4",
Prompt: "Analyze this code for bugs",
})
// Handle model response
result, err = handler.HandleModelResponse(ctx, loopID, response)
// Handle tool result
result, err = handler.HandleToolResult(ctx, loopID, toolResult)
Configuration ¶
The processor is configured via JSON:
{
"max_iterations": 20,
"timeout": "120s",
"stream_name": "AGENT",
"loops_bucket": "AGENT_LOOPS",
"trajectories_bucket": "AGENT_TRAJECTORIES",
"context": {
"enabled": true,
"compact_threshold": 0.60,
"tool_result_max_age": 3,
"headroom_tokens": 6400,
},
"ports": {
"inputs": [...],
"outputs": [...]
}
}
Configuration fields:
- max_iterations: Maximum loop iterations before failure (default: 20, range: 1-1000)
- timeout: Loop execution timeout as duration string (default: "120s")
- stream_name: JetStream stream name for agentic messages (default: "AGENT")
- loops_bucket: NATS KV bucket for loop state (default: "AGENT_LOOPS")
- trajectories_bucket: NATS KV bucket for trajectories (default: "AGENT_TRAJECTORIES")
- consumer_name_suffix: Optional suffix for JetStream consumer names (for testing)
- context: Context management configuration (see ContextConfig)
- ports: Port configuration for inputs and outputs
Ports ¶
Input ports (JetStream consumers):
- agent.task: Task requests from external systems (subject: agent.task.*)
- agent.response: Model responses from agentic-model (subject: agent.response.>)
- tool.result: Tool results from agentic-tools (subject: tool.result.>)
- agent.signal: Control signals for loops (subject: agent.signal.*)
Output ports (JetStream publishers):
- agent.request: Model requests to agentic-model (subject: agent.request.*)
- tool.execute: Tool execution requests to agentic-tools (subject: tool.execute.*)
- agent.complete: Loop completion events (subject: agent.complete.*)
- agent.context.compaction: Context compaction events (subject: agent.context.compaction.*)
KV write ports:
- loops: Loop entity state (bucket: AGENT_LOOPS)
- trajectories: Trajectory data (bucket: AGENT_TRAJECTORIES)
KV Storage ¶
Loop state and trajectories are persisted to NATS KV for durability and queryability:
**AGENT_LOOPS bucket**: Stores LoopEntity as JSON, keyed by loop ID
{
"id": "loop_123",
"task_id": "task_456",
"state": "executing",
"role": "general",
"model": "gpt-4",
"iterations": 3,
"max_iterations": 20,
"parent_loop_id": "",
"pause_requested": false,
"user_id": "user_789",
"channel_type": "cli",
"channel_id": "session_001"
}
**COMPLETE_{loopID}**: Written when a loop completes, for rules engine consumption
{
"loop_id": "loop_123",
"task_id": "task_456",
"outcome": "success",
"role": "architect",
"result": "Designed authentication system...",
"model": "gpt-4",
"iterations": 3,
"parent_loop": ""
}
**AGENT_TRAJECTORIES bucket**: Stores Trajectory as JSON, keyed by loop ID
{
"loop_id": "loop_123",
"start_time": "2024-01-15T10:30:00Z",
"end_time": "2024-01-15T10:31:45Z",
"steps": [...],
"outcome": "complete",
"total_tokens_in": 1500,
"total_tokens_out": 800,
"duration": 105000
}
KV buckets are created automatically if they don't exist during component startup.
Rules/Workflow Integration ¶
The loop integrates with the rules engine for orchestration:
- On completion, writes COMPLETE_{loopID} key to KV
- Rules engine watches COMPLETE_* keys
- Rules can trigger follow-up actions (e.g., spawn editor when architect completes)
Architect/Editor pattern:
- Task arrives with role="architect"
- Architect loop executes and produces a plan
- On completion, COMPLETE_{loopID} written with role="architect"
- Rule matches COMPLETE_* where role="architect"
- Rule spawns new loop with role="editor", parent_loop={loopID}
- Editor receives architect's output as context
agentic-memory Integration ¶
The loop publishes context events that agentic-memory consumes:
- compaction_starting: agentic-memory extracts facts before compaction
- compaction_complete: agentic-memory injects recovered context
- gc_complete: Logged for observability
Quick Start ¶
Create and start the component:
config := agenticloop.Config{
MaxIterations: 20,
Timeout: "120s",
StreamName: "AGENT",
LoopsBucket: "AGENT_LOOPS",
TrajectoriesBucket: "AGENT_TRAJECTORIES",
Context: agenticloop.DefaultContextConfig(),
}
rawConfig, _ := json.Marshal(config)
comp, err := agenticloop.NewComponent(rawConfig, deps)
lc := comp.(component.LifecycleComponent)
lc.Initialize()
lc.Start(ctx)
defer lc.Stop(5 * time.Second)
Publish a task:
task := agenticloop.TaskMessage{
TaskID: "analyze_code",
Role: "general",
Model: "gpt-4",
Prompt: "Review main.go for security issues",
}
taskData, _ := json.Marshal(task)
natsClient.PublishToStream(ctx, "agent.task.review", taskData)
Thread Safety ¶
The LoopManager, TrajectoryManager, and ContextManager are thread-safe, using RWMutex for concurrent access. Multiple goroutines can safely:
- Create and manage different loops concurrently
- Read loop state while other loops are being modified
- Track pending tools across concurrent tool executions
- Add messages to context regions
The Component itself is not designed for concurrent Start/Stop calls.
Error Handling ¶
Errors are handled at multiple levels:
- Validation errors: Returned immediately from handlers
- State transition errors: Logged, loop may continue or fail depending on severity
- Max iterations: Loop transitions to failed state, not returned as error
- KV persistence errors: Logged but don't block message processing
- Context cancellation: Propagated up, handlers check ctx.Err() early
Observability ¶
The component provides observability through:
- Structured logging (slog) for all significant events
- Trajectory capture for complete execution audit trails
- Context events for memory management visibility
- Health status via Health() method
- Flow metrics via DataFlow() method
Testing ¶
For testing, use the ConsumerNameSuffix config option to create unique JetStream consumer names per test:
config := agenticloop.Config{
StreamName: "AGENT",
ConsumerNameSuffix: "test-" + t.Name(),
// ...
}
This prevents consumer name conflicts when running tests in parallel.
Limitations ¶
Current limitations:
- No streaming support for partial responses
- Trajectory size limited by NATS KV (1MB default)
- No built-in retry for failed tool executions
- Context summarization requires LLM call (cost consideration)
See Also ¶
Related packages:
- agentic: Shared types (LoopEntity, AgentRequest, UserSignal, etc.)
- processor/agentic-model: LLM endpoint integration
- processor/agentic-tools: Tool execution framework
- processor/agentic-memory: Graph-backed agent memory
- processor/agentic-dispatch: User message routing
- processor/workflow: Multi-step orchestration
Package agenticloop provides Prometheus metrics for agentic-loop component.
Index ¶
- Constants
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type BoidHandler
- func (h *BoidHandler) ApplySteeringToEntities(loopID string) (prioritize, avoid []string)
- func (h *BoidHandler) CalculateVelocity(oldFocus, newFocus []string) float64
- func (h *BoidHandler) ClearSignals(loopID string)
- func (h *BoidHandler) DeletePosition(ctx context.Context, loopID string) error
- func (h *BoidHandler) ExtractEntitiesFromContext(content string) []string
- func (h *BoidHandler) ExtractEntitiesFromToolResult(content string) []string
- func (h *BoidHandler) ExtractPredicatesFromToolResult(content string) []string
- func (h *BoidHandler) FilterEntitiesBySignals(loopID string, entities []string) []string
- func (h *BoidHandler) GetActiveSignal(loopID, signalType string) *boid.SteeringSignal
- func (h *BoidHandler) GetActiveSignals(loopID string) map[string]*boid.SteeringSignal
- func (h *BoidHandler) GetAlignmentPatterns(loopID string) []string
- func (h *BoidHandler) GetPosition(ctx context.Context, loopID string) (*boid.AgentPosition, error)
- func (h *BoidHandler) HandleSteeringSignalMessage(ctx context.Context, data []byte, ...) string
- func (h *BoidHandler) ProcessSteeringSignal(_ context.Context, signal *boid.SteeringSignal, cm *ContextManager) error
- func (h *BoidHandler) UpdatePosition(ctx context.Context, pos *boid.AgentPosition) error
- type BoidSteeringConfig
- type CompactionResult
- type Compactor
- type CompactorOption
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Phase() string
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) StateManager() *workflow.StateManager
- func (c *Component) Stop(timeout time.Duration) error
- func (c *Component) WorkflowID() string
- type Config
- type ContextConfig
- type ContextManager
- func (cm *ContextManager) AddGraphEntityContext(entityID string, content string) error
- func (cm *ContextManager) AddMessage(region RegionType, msg agentic.ChatMessage) error
- func (cm *ContextManager) AdvanceIteration()
- func (cm *ContextManager) ApplyBoidSteering(steering BoidSteeringConfig)
- func (cm *ContextManager) CheckBudget(budgetTokens int) (bool, int)
- func (cm *ContextManager) ClearGraphEntities()
- func (cm *ContextManager) GCToolResults(currentIteration int) int
- func (cm *ContextManager) GetContext() []agentic.ChatMessage
- func (cm *ContextManager) GetGraphEntityTokens() int
- func (cm *ContextManager) GetRegionTokens(region RegionType) int
- func (cm *ContextManager) ShouldCompact() bool
- func (cm *ContextManager) SliceForBudget(budget int, slice ContextSlice) error
- func (cm *ContextManager) Utilization() float64
- type ContextManagerOption
- type ContextSlice
- type HandlerResult
- type LLMSummarizer
- type LoopManager
- func (m *LoopManager) AddPendingTool(loopID, callID string) error
- func (m *LoopManager) AllToolsComplete(loopID string) bool
- func (m *LoopManager) CacheMetadata(loopID string, metadata map[string]any)
- func (m *LoopManager) CacheTools(loopID string, tools []agentic.ToolDefinition)
- func (m *LoopManager) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)
- func (m *LoopManager) CreateLoop(taskID, role, model string, maxIterations ...int) (string, error)
- func (m *LoopManager) CreateLoopWithID(loopID, taskID, role, model string, maxIterations ...int) (string, error)
- func (m *LoopManager) DeleteLoop(loopID string) error
- func (m *LoopManager) ExtractLoopIDFromRequest(requestID string) string
- func (m *LoopManager) ExtractLoopIDFromToolCall(toolCallID string) string
- func (m *LoopManager) GenerateRequestID(loopID string) string
- func (m *LoopManager) GenerateToolCallID(loopID string) string
- func (m *LoopManager) GetAndClearToolResults(loopID string) []agentic.ToolResult
- func (m *LoopManager) GetCachedMetadata(loopID string) map[string]any
- func (m *LoopManager) GetCachedTools(loopID string) []agentic.ToolDefinition
- func (m *LoopManager) GetContextManager(loopID string) *ContextManager
- func (m *LoopManager) GetCurrentIteration(loopID string) int
- func (m *LoopManager) GetDepth(loopID string) (depth, maxDepth int, err error)
- func (m *LoopManager) GetLoop(loopID string) (agentic.LoopEntity, error)
- func (m *LoopManager) GetLoopForRequest(requestID string) (string, bool)
- func (m *LoopManager) GetLoopForRequestWithRecovery(requestID string) (string, bool)
- func (m *LoopManager) GetLoopForToolCall(callID string) (string, bool)
- func (m *LoopManager) GetLoopForToolCallWithRecovery(toolCallID string) (string, bool)
- func (m *LoopManager) GetPendingTools(loopID string) []string
- func (m *LoopManager) GetRequestStart(requestID string) time.Time
- func (m *LoopManager) GetToolArguments(callID string) map[string]any
- func (m *LoopManager) GetToolName(callID string) string
- func (m *LoopManager) GetToolStart(callID string) time.Time
- func (m *LoopManager) IncrementIteration(loopID string) error
- func (m *LoopManager) IsTimedOut(loopID string) bool
- func (m *LoopManager) RemovePendingTool(loopID, callID string) error
- func (m *LoopManager) SetDepth(loopID string, depth, maxDepth int) error
- func (m *LoopManager) SetParentLoop(loopID, parentLoopID string) error
- func (m *LoopManager) SetParentLoopID(loopID, parentLoopID string) error
- func (m *LoopManager) SetTimeout(loopID string, timeout time.Duration) error
- func (m *LoopManager) SetUserContext(loopID, channelType, channelID, userID string) error
- func (m *LoopManager) SetWorkflowContext(loopID, workflowSlug, workflowStep string) error
- func (m *LoopManager) StoreToolResult(loopID string, result agentic.ToolResult) error
- func (m *LoopManager) TrackRequest(requestID, loopID string)
- func (m *LoopManager) TrackRequestStart(requestID string)
- func (m *LoopManager) TrackToolArguments(callID string, args map[string]any)
- func (m *LoopManager) TrackToolCall(callID, loopID string)
- func (m *LoopManager) TrackToolName(callID, name string)
- func (m *LoopManager) TrackToolStart(callID string)
- func (m *LoopManager) TransitionLoop(loopID string, newState agentic.LoopState) error
- func (m *LoopManager) UpdateCompletion(loopID, outcome, result, errMsg string) error
- func (m *LoopManager) UpdateLoop(entity agentic.LoopEntity) error
- type LoopManagerOption
- type MessageHandler
- func (h *MessageHandler) BuildFailureEvent(loopID, reason, errorMsg string) (PublishedMessage, error)
- func (h *MessageHandler) BuildFailureMessages(loopID, reason, errorMsg string) ([]PublishedMessage, error)
- func (h *MessageHandler) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)
- func (h *MessageHandler) GetContextManager(loopID string) *ContextManager
- func (h *MessageHandler) GetLoop(loopID string) (agentic.LoopEntity, error)
- func (h *MessageHandler) HandleModelResponse(ctx context.Context, loopID string, response agentic.AgentResponse) (HandlerResult, error)
- func (h *MessageHandler) HandleTask(ctx context.Context, task TaskMessage) (HandlerResult, error)
- func (h *MessageHandler) HandleToolResult(ctx context.Context, loopID string, toolResult agentic.ToolResult) (HandlerResult, error)
- func (h *MessageHandler) SetLogger(logger *slog.Logger)
- func (h *MessageHandler) SetSummarizer(s Summarizer, modelName string)
- func (h *MessageHandler) SetToolCallFilter(filter agentic.ToolCallFilter)
- func (h *MessageHandler) UpdateLoop(entity agentic.LoopEntity) error
- type PublishedMessage
- type RegionType
- type SignalStore
- type Summarizer
- type TaskMessage
- type TrajectoryManager
- func (m *TrajectoryManager) AddStep(loopID string, step agentic.TrajectoryStep) (agentic.Trajectory, error)
- func (m *TrajectoryManager) CompleteTrajectory(loopID, outcome string) (agentic.Trajectory, error)
- func (m *TrajectoryManager) GetTrajectory(loopID string) (agentic.Trajectory, error)
- func (m *TrajectoryManager) SaveTrajectory(_ context.Context, _ agentic.Trajectory) error
- func (m *TrajectoryManager) StartTrajectory(loopID string) (agentic.Trajectory, error)
Constants ¶
const DefaultContextLimit = 128000
DefaultContextLimit is the fallback context window size when the model is unknown.
Variables ¶
This section is empty.
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new agentic-loop component
Types ¶
type BoidHandler ¶
type BoidHandler struct {
// contains filtered or unexported fields
}
BoidHandler manages agent position tracking and steering signal processing for Boids-style local coordination rules.
func NewBoidHandler ¶
func NewBoidHandler(positionsBucket jetstream.KeyValue, logger *slog.Logger) *BoidHandler
NewBoidHandler creates a new boid handler with the default signal TTL.
func NewBoidHandlerWithTTL ¶
func NewBoidHandlerWithTTL(positionsBucket jetstream.KeyValue, logger *slog.Logger, signalTTL time.Duration) *BoidHandler
NewBoidHandlerWithTTL creates a new boid handler with a custom signal TTL.
func (*BoidHandler) ApplySteeringToEntities ¶
func (h *BoidHandler) ApplySteeringToEntities(loopID string) (prioritize, avoid []string)
ApplySteeringToEntities applies active steering signals to prioritize/deprioritize entities. Returns two lists: prioritized entities (from cohesion) and avoided entities (from separation).
func (*BoidHandler) CalculateVelocity ¶
func (h *BoidHandler) CalculateVelocity(oldFocus, newFocus []string) float64
CalculateVelocity computes velocity based on position changes.
func (*BoidHandler) ClearSignals ¶
func (h *BoidHandler) ClearSignals(loopID string)
ClearSignals removes all signals for a loop (called on loop completion).
func (*BoidHandler) DeletePosition ¶
func (h *BoidHandler) DeletePosition(ctx context.Context, loopID string) error
DeletePosition removes an agent's position when the loop completes.
func (*BoidHandler) ExtractEntitiesFromContext ¶
func (h *BoidHandler) ExtractEntitiesFromContext(content string) []string
ExtractEntitiesFromContext extracts entity IDs from context messages. Looks for "[Entity: {id}]" prefixes added by AddGraphEntityContext.
func (*BoidHandler) ExtractEntitiesFromToolResult ¶
func (h *BoidHandler) ExtractEntitiesFromToolResult(content string) []string
ExtractEntitiesFromToolResult extracts entity IDs from tool result content. This captures which entities an agent accessed during tool execution.
func (*BoidHandler) ExtractPredicatesFromToolResult ¶
func (h *BoidHandler) ExtractPredicatesFromToolResult(content string) []string
ExtractPredicatesFromToolResult extracts predicate patterns from tool result content. This captures which relationship types an agent is following during graph traversal. Only known predicates are returned to avoid noise from random snake_case strings.
func (*BoidHandler) FilterEntitiesBySignals ¶
func (h *BoidHandler) FilterEntitiesBySignals(loopID string, entities []string) []string
FilterEntitiesBySignals filters a list of entities based on active steering signals. Entities in the avoid list are moved to the end, entities in prioritize list go first.
func (*BoidHandler) GetActiveSignal ¶
func (h *BoidHandler) GetActiveSignal(loopID, signalType string) *boid.SteeringSignal
GetActiveSignal retrieves the most recent signal of a specific type for a loop.
func (*BoidHandler) GetActiveSignals ¶
func (h *BoidHandler) GetActiveSignals(loopID string) map[string]*boid.SteeringSignal
GetActiveSignals retrieves all active signals for a loop.
func (*BoidHandler) GetAlignmentPatterns ¶
func (h *BoidHandler) GetAlignmentPatterns(loopID string) []string
GetAlignmentPatterns returns the alignment patterns (predicates) to follow.
func (*BoidHandler) GetPosition ¶
func (h *BoidHandler) GetPosition(ctx context.Context, loopID string) (*boid.AgentPosition, error)
GetPosition retrieves an agent's position from the KV bucket.
func (*BoidHandler) HandleSteeringSignalMessage ¶
func (h *BoidHandler) HandleSteeringSignalMessage(ctx context.Context, data []byte, getContextManager func(loopID string) *ContextManager) string
HandleSteeringSignalMessage handles incoming boid steering signal messages from NATS. Returns the signal type if successfully processed, empty string otherwise.
func (*BoidHandler) ProcessSteeringSignal ¶
func (h *BoidHandler) ProcessSteeringSignal(_ context.Context, signal *boid.SteeringSignal, cm *ContextManager) error
ProcessSteeringSignal processes an incoming boid steering signal. Stores the signal for use during context building and tool prioritization, and applies steering to the context manager if provided.
func (*BoidHandler) UpdatePosition ¶
func (h *BoidHandler) UpdatePosition(ctx context.Context, pos *boid.AgentPosition) error
UpdatePosition updates an agent's position in the AGENT_POSITIONS KV bucket.
type BoidSteeringConfig ¶
type BoidSteeringConfig struct {
// PrioritizeEntities are entities to move earlier in context (cohesion).
PrioritizeEntities []string
// AvoidEntities are entities to deprioritize in context (separation).
AvoidEntities []string
// AlignPatterns are predicate patterns to favor (alignment).
AlignPatterns []string
}
BoidSteeringConfig holds Boid steering signal configuration for context building.
type CompactionResult ¶
type CompactionResult struct {
Summary string
EvictedTokens int
NewTokens int
Model string // model used for summarization (empty if stub fallback)
}
CompactionResult contains the results of a compaction operation.
type Compactor ¶
type Compactor struct {
// contains filtered or unexported fields
}
Compactor handles context compaction operations.
func NewCompactor ¶
func NewCompactor(config ContextConfig, opts ...CompactorOption) *Compactor
NewCompactor creates a new compactor. Variadic opts allow optional injection of a Summarizer and logger without breaking existing callers.
func (*Compactor) Compact ¶
func (c *Compactor) Compact(ctx context.Context, cm *ContextManager) (CompactionResult, error)
Compact performs compaction on the context manager. When a Summarizer is injected, it calls the LLM to generate a real summary. If the summarizer returns an error, it falls back to a stub summary and logs a warning.
func (*Compactor) ShouldCompact ¶
func (c *Compactor) ShouldCompact(cm *ContextManager) bool
ShouldCompact delegates to the context manager's ShouldCompact.
type CompactorOption ¶
type CompactorOption func(*Compactor)
CompactorOption is a functional option for configuring a Compactor.
func WithCompactorLogger ¶
func WithCompactorLogger(l *slog.Logger) CompactorOption
WithCompactorLogger sets the logger used by the Compactor.
func WithModelName ¶
func WithModelName(name string) CompactorOption
WithModelName sets the resolved model name reported in CompactionResult.
func WithSummarizer ¶
func WithSummarizer(s Summarizer) CompactorOption
WithSummarizer injects an LLM-backed summarizer into the Compactor. When set, Compact() calls the summarizer instead of the stub fallback.
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the agentic-loop processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns current health status
func (*Component) Initialize ¶
Initialize prepares the component (no-op for this component)
func (*Component) InputPorts ¶
InputPorts returns input port definitions
func (*Component) OutputPorts ¶
OutputPorts returns output port definitions
func (*Component) Start ¶
Start starts the component. The context is used for cancellation during startup operations.
func (*Component) StateManager ¶
func (c *Component) StateManager() *workflow.StateManager
StateManager returns nil since agentic-loop manages its own state internally. Workflows interact with agentic-loop via events and KV watches, not direct state access.
func (*Component) WorkflowID ¶
WorkflowID returns empty string since this component handles multiple workflows dynamically. The workflow context is tracked per-loop via WorkflowSlug/WorkflowStep fields.
type Config ¶
type Config struct {
MaxIterations int `` /* 153-byte string literal not displayed */
Timeout string `` /* 138-byte string literal not displayed */
StreamName string `json:"stream_name" schema:"type:string,description:JetStream stream name,default:AGENT,category:advanced"`
ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`
DeleteConsumerOnStop bool `` /* 157-byte string literal not displayed */
LoopsBucket string `` /* 142-byte string literal not displayed */
TrajectoriesBucket string `` /* 158-byte string literal not displayed */
PositionsBucket string `` /* 153-byte string literal not displayed */
BoidEnabled bool `` /* 170-byte string literal not displayed */
BoidSignalTTL string `` /* 160-byte string literal not displayed */
TrajectoryDetail string `` /* 152-byte string literal not displayed */
TrajectoryTTL string `json:"trajectory_ttl,omitempty" schema:"type:string,description:TTL for trajectory KV entries,default:24h,category:advanced"`
TrajectoryHistory int `` /* 144-byte string literal not displayed */
Context ContextConfig `` /* 142-byte string literal not displayed */
Ports *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration for inputs and outputs,category:basic"`
}
Config represents the configuration for the agentic-loop processor
type ContextConfig ¶
type ContextConfig struct {
Enabled bool `json:"enabled" description:"Enable context memory management"`
CompactThreshold float64 `json:"compact_threshold" description:"Utilization threshold (0.01-1.0) that triggers context compaction"`
ToolResultMaxAge int `json:"tool_result_max_age" description:"Maximum age in iterations before tool results are garbage collected"`
HeadroomTokens int `json:"headroom_tokens" description:"Token headroom to reserve for model responses"`
// Multi-agent context budget fields
MaxBudgetTokens int `json:"max_budget_tokens,omitempty" description:"Hard token limit for context budget (overrides model limits when set)"`
SliceOnBudget bool `json:"slice_on_budget,omitempty" description:"Enable context slicing when budget is exceeded"`
PreserveEntities []string `json:"preserve_entities,omitempty" description:"Entity IDs to always keep in context during slicing"`
EntityPriority int `` /* 127-byte string literal not displayed */
}
ContextConfig represents configuration for context memory management. Model context limits are resolved from the unified model registry (component.Dependencies.ModelRegistry).
func DefaultContextConfig ¶
func DefaultContextConfig() ContextConfig
DefaultContextConfig returns the default context configuration
func (ContextConfig) Validate ¶
func (c ContextConfig) Validate() error
Validate validates the context configuration
type ContextManager ¶
type ContextManager struct {
// contains filtered or unexported fields
}
ContextManager manages conversation context with memory optimization
func NewContextManager ¶
func NewContextManager(loopID, model string, config ContextConfig, opts ...ContextManagerOption) *ContextManager
NewContextManager creates a new context manager
func (*ContextManager) AddGraphEntityContext ¶
func (cm *ContextManager) AddGraphEntityContext(entityID string, content string) error
AddGraphEntityContext adds context from graph entities to the dedicated region
func (*ContextManager) AddMessage ¶
func (cm *ContextManager) AddMessage(region RegionType, msg agentic.ChatMessage) error
AddMessage adds a message to a specific region
func (*ContextManager) AdvanceIteration ¶
func (cm *ContextManager) AdvanceIteration()
AdvanceIteration moves to the next iteration. Call this after processing each loop iteration to ensure proper age tracking for GC.
func (*ContextManager) ApplyBoidSteering ¶
func (cm *ContextManager) ApplyBoidSteering(steering BoidSteeringConfig)
ApplyBoidSteering reorders graph entity context based on Boid steering signals. This moves prioritized entities earlier and avoided entities later in the context.
func (*ContextManager) CheckBudget ¶
func (cm *ContextManager) CheckBudget(budgetTokens int) (bool, int)
CheckBudget checks if current context is within the token budget. Returns (withinBudget, currentTokens).
func (*ContextManager) ClearGraphEntities ¶
func (cm *ContextManager) ClearGraphEntities()
ClearGraphEntities clears the graph entities region
func (*ContextManager) GCToolResults ¶
func (cm *ContextManager) GCToolResults(currentIteration int) int
GCToolResults garbage collects old tool results based on age. Tool results live in RegionRecentHistory (for chronological ordering with assistant messages), so this scans that region for role="tool" messages. After eviction, repairs tool pairs to ensure no orphaned assistant/tool messages.
func (*ContextManager) GetContext ¶
func (cm *ContextManager) GetContext() []agentic.ChatMessage
GetContext returns all messages in region priority order
func (*ContextManager) GetGraphEntityTokens ¶
func (cm *ContextManager) GetGraphEntityTokens() int
GetGraphEntityTokens returns the total tokens in the graph entities region
func (*ContextManager) GetRegionTokens ¶
func (cm *ContextManager) GetRegionTokens(region RegionType) int
GetRegionTokens returns the total token count for a region
func (*ContextManager) ShouldCompact ¶
func (cm *ContextManager) ShouldCompact() bool
ShouldCompact returns true if compaction should be triggered
func (*ContextManager) SliceForBudget ¶
func (cm *ContextManager) SliceForBudget(budget int, slice ContextSlice) error
SliceForBudget removes content to fit within the budget. Uses graph-aware slicing that preserves entity context when EntityPriority is set. Boid steering signals influence eviction: AvoidEntities are evicted first.
func (*ContextManager) Utilization ¶
func (cm *ContextManager) Utilization() float64
Utilization returns the current context utilization (0.0 to 1.0)
type ContextManagerOption ¶
type ContextManagerOption func(*ContextManager)
ContextManagerOption is a functional option for configuring ContextManager
func WithLogger ¶
func WithLogger(logger *slog.Logger) ContextManagerOption
WithLogger sets the logger for the ContextManager
func WithModelRegistry ¶
func WithModelRegistry(reg model.RegistryReader) ContextManagerOption
WithModelRegistry provides registry-based model limit resolution
type ContextSlice ¶
type ContextSlice struct {
IncludeRegions []RegionType // Regions to include
ExcludeRegions []RegionType // Regions to exclude (takes precedence)
PreserveEntities []string // Entity IDs to always keep
// Boid steering configuration (from active steering signals)
AvoidEntities []string // Entities to deprioritize (from separation signals)
}
ContextSlice defines which regions to include when slicing context
type HandlerResult ¶
type HandlerResult struct {
LoopID string
State agentic.LoopState
PublishedMessages []PublishedMessage
PendingTools []string
TrajectorySteps []agentic.TrajectoryStep
ContextEvents []agentic.ContextEvent
RetryScheduled bool
MaxIterationsReached bool
// CompletionState contains enriched completion data for KV persistence.
// This is populated when a loop completes and is used by component.go
// to write to the loops bucket with key pattern COMPLETE_{loopID}.
CompletionState *agentic.LoopCompletedEvent
}
HandlerResult contains the results of a handler operation
type LLMSummarizer ¶
type LLMSummarizer struct {
// contains filtered or unexported fields
}
LLMSummarizer implements Summarizer using a graph/llm.Client.
func NewLLMSummarizer ¶
func NewLLMSummarizer(client llm.Client, logger *slog.Logger) *LLMSummarizer
NewLLMSummarizer creates a summarizer backed by an LLM client.
func (*LLMSummarizer) Summarize ¶
func (s *LLMSummarizer) Summarize(ctx context.Context, messages []agentic.ChatMessage, maxTokens int) (string, error)
Summarize calls the LLM to generate a summary of the conversation messages.
type LoopManager ¶
type LoopManager struct {
// contains filtered or unexported fields
}
LoopManager manages loop entity lifecycle and state
func NewLoopManager ¶
func NewLoopManager(opts ...LoopManagerOption) *LoopManager
NewLoopManager creates a new LoopManager
func NewLoopManagerWithConfig ¶
func NewLoopManagerWithConfig(contextConfig ContextConfig, opts ...LoopManagerOption) *LoopManager
NewLoopManagerWithConfig creates a new LoopManager with custom context config
func (*LoopManager) AddPendingTool ¶
func (m *LoopManager) AddPendingTool(loopID, callID string) error
AddPendingTool adds a pending tool call to the loop
func (*LoopManager) AllToolsComplete ¶
func (m *LoopManager) AllToolsComplete(loopID string) bool
AllToolsComplete returns true if there are no pending tool calls
func (*LoopManager) CacheMetadata ¶
func (m *LoopManager) CacheMetadata(loopID string, metadata map[string]any)
CacheMetadata stores domain context metadata for a loop (set once from task, reused for all tool calls)
func (*LoopManager) CacheTools ¶
func (m *LoopManager) CacheTools(loopID string, tools []agentic.ToolDefinition)
CacheTools stores tool definitions for a loop (discovered once, reused for all requests)
func (*LoopManager) CancelLoop ¶
func (m *LoopManager) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)
CancelLoop atomically cancels a loop and populates completion data. Returns the updated entity for further processing, or an error if the loop cannot be cancelled (not found or already terminal).
func (*LoopManager) CreateLoop ¶
func (m *LoopManager) CreateLoop(taskID, role, model string, maxIterations ...int) (string, error)
CreateLoop creates a new loop entity with a generated UUID
func (*LoopManager) CreateLoopWithID ¶
func (m *LoopManager) CreateLoopWithID(loopID, taskID, role, model string, maxIterations ...int) (string, error)
CreateLoopWithID creates a new loop entity with a specific ID
func (*LoopManager) DeleteLoop ¶
func (m *LoopManager) DeleteLoop(loopID string) error
DeleteLoop deletes a loop entity and all associated tracking data.
func (*LoopManager) ExtractLoopIDFromRequest ¶
func (m *LoopManager) ExtractLoopIDFromRequest(requestID string) string
ExtractLoopIDFromRequest extracts the loop ID from a structured request ID. Returns empty string if the ID is not in structured format.
func (*LoopManager) ExtractLoopIDFromToolCall ¶
func (m *LoopManager) ExtractLoopIDFromToolCall(toolCallID string) string
ExtractLoopIDFromToolCall extracts the loop ID from a structured tool call ID. Returns empty string if the ID is not in structured format.
func (*LoopManager) GenerateRequestID ¶
func (m *LoopManager) GenerateRequestID(loopID string) string
GenerateRequestID creates a structured request ID that embeds the loop ID. Format: loopID:req:shortUUID This allows recovery of loop ID from request ID if in-memory maps are lost.
func (*LoopManager) GenerateToolCallID ¶
func (m *LoopManager) GenerateToolCallID(loopID string) string
GenerateToolCallID creates a structured tool call ID that embeds the loop ID. Format: loopID:tool:shortUUID This allows recovery of loop ID from tool call ID if in-memory maps are lost.
func (*LoopManager) GetAndClearToolResults ¶
func (m *LoopManager) GetAndClearToolResults(loopID string) []agentic.ToolResult
GetAndClearToolResults retrieves all accumulated tool results and clears them
func (*LoopManager) GetCachedMetadata ¶
func (m *LoopManager) GetCachedMetadata(loopID string) map[string]any
GetCachedMetadata retrieves the cached metadata for a loop
func (*LoopManager) GetCachedTools ¶
func (m *LoopManager) GetCachedTools(loopID string) []agentic.ToolDefinition
GetCachedTools retrieves the cached tool definitions for a loop
func (*LoopManager) GetContextManager ¶
func (m *LoopManager) GetContextManager(loopID string) *ContextManager
GetContextManager retrieves the context manager for a loop
func (*LoopManager) GetCurrentIteration ¶
func (m *LoopManager) GetCurrentIteration(loopID string) int
GetCurrentIteration returns the current iteration for a loop
func (*LoopManager) GetDepth ¶
func (m *LoopManager) GetDepth(loopID string) (depth, maxDepth int, err error)
GetDepth returns the current depth and max depth for a loop
func (*LoopManager) GetLoop ¶
func (m *LoopManager) GetLoop(loopID string) (agentic.LoopEntity, error)
GetLoop retrieves a loop entity by ID
func (*LoopManager) GetLoopForRequest ¶
func (m *LoopManager) GetLoopForRequest(requestID string) (string, bool)
GetLoopForRequest retrieves the loop ID for a request ID
func (*LoopManager) GetLoopForRequestWithRecovery ¶
func (m *LoopManager) GetLoopForRequestWithRecovery(requestID string) (string, bool)
GetLoopForRequestWithRecovery retrieves the loop ID for a request ID, attempting recovery from structured ID if not found in cache.
func (*LoopManager) GetLoopForToolCall ¶
func (m *LoopManager) GetLoopForToolCall(callID string) (string, bool)
GetLoopForToolCall retrieves the loop ID for a tool call ID
func (*LoopManager) GetLoopForToolCallWithRecovery ¶
func (m *LoopManager) GetLoopForToolCallWithRecovery(toolCallID string) (string, bool)
GetLoopForToolCallWithRecovery retrieves the loop ID for a tool call ID, attempting recovery from structured ID if not found in cache.
func (*LoopManager) GetPendingTools ¶
func (m *LoopManager) GetPendingTools(loopID string) []string
GetPendingTools returns all pending tool calls for a loop
func (*LoopManager) GetRequestStart ¶
func (m *LoopManager) GetRequestStart(requestID string) time.Time
GetRequestStart retrieves the start time for a model request.
func (*LoopManager) GetToolArguments ¶
func (m *LoopManager) GetToolArguments(callID string) map[string]any
GetToolArguments retrieves a shallow copy of the arguments for a tool call ID.
func (*LoopManager) GetToolName ¶
func (m *LoopManager) GetToolName(callID string) string
GetToolName retrieves the function name for a tool call ID.
func (*LoopManager) GetToolStart ¶
func (m *LoopManager) GetToolStart(callID string) time.Time
GetToolStart retrieves the start time for a tool call.
func (*LoopManager) IncrementIteration ¶
func (m *LoopManager) IncrementIteration(loopID string) error
IncrementIteration increments the loop iteration counter
func (*LoopManager) IsTimedOut ¶
func (m *LoopManager) IsTimedOut(loopID string) bool
IsTimedOut checks if a loop has exceeded its timeout
func (*LoopManager) RemovePendingTool ¶
func (m *LoopManager) RemovePendingTool(loopID, callID string) error
RemovePendingTool removes a pending tool call from the loop
func (*LoopManager) SetDepth ¶
func (m *LoopManager) SetDepth(loopID string, depth, maxDepth int) error
SetDepth sets the depth tracking for a loop in the multi-agent hierarchy
func (*LoopManager) SetParentLoop ¶
func (m *LoopManager) SetParentLoop(loopID, parentLoopID string) error
SetParentLoop sets the parent loop ID for tracking architect->editor relationships
func (*LoopManager) SetParentLoopID ¶
func (m *LoopManager) SetParentLoopID(loopID, parentLoopID string) error
SetParentLoopID is an alias for SetParentLoop for consistency with TaskMessage field names
func (*LoopManager) SetTimeout ¶
func (m *LoopManager) SetTimeout(loopID string, timeout time.Duration) error
SetTimeout sets the timeout for a loop
func (*LoopManager) SetUserContext ¶
func (m *LoopManager) SetUserContext(loopID, channelType, channelID, userID string) error
SetUserContext sets the user routing info for error notifications
func (*LoopManager) SetWorkflowContext ¶
func (m *LoopManager) SetWorkflowContext(loopID, workflowSlug, workflowStep string) error
SetWorkflowContext sets the workflow slug and step for loops created by workflow commands
func (*LoopManager) StoreToolResult ¶
func (m *LoopManager) StoreToolResult(loopID string, result agentic.ToolResult) error
StoreToolResult stores a tool result in the loop entity for later retrieval
func (*LoopManager) TrackRequest ¶
func (m *LoopManager) TrackRequest(requestID, loopID string)
TrackRequest associates a request ID with a loop ID
func (*LoopManager) TrackRequestStart ¶
func (m *LoopManager) TrackRequestStart(requestID string)
TrackRequestStart records when a model request was sent.
func (*LoopManager) TrackToolArguments ¶
func (m *LoopManager) TrackToolArguments(callID string, args map[string]any)
TrackToolArguments associates a tool call ID with its arguments. This is used to populate the ToolArguments field on trajectory steps for audit.
func (*LoopManager) TrackToolCall ¶
func (m *LoopManager) TrackToolCall(callID, loopID string)
TrackToolCall associates a tool call ID with a loop ID
func (*LoopManager) TrackToolName ¶
func (m *LoopManager) TrackToolName(callID, name string)
TrackToolName associates a tool call ID with its function name. This is used to populate the name field on tool result messages (required by Gemini).
func (*LoopManager) TrackToolStart ¶
func (m *LoopManager) TrackToolStart(callID string)
TrackToolStart records when a tool call was dispatched for execution.
func (*LoopManager) TransitionLoop ¶
func (m *LoopManager) TransitionLoop(loopID string, newState agentic.LoopState) error
TransitionLoop transitions a loop to a new state
func (*LoopManager) UpdateCompletion ¶
func (m *LoopManager) UpdateCompletion(loopID, outcome, result, errMsg string) error
UpdateCompletion updates a loop with completion data (outcome, result, error). This is called when a loop finishes to populate fields for SSE delivery via KV watch.
func (*LoopManager) UpdateLoop ¶
func (m *LoopManager) UpdateLoop(entity agentic.LoopEntity) error
UpdateLoop updates an existing loop entity
type LoopManagerOption ¶
type LoopManagerOption func(*LoopManager)
LoopManagerOption is a functional option for configuring LoopManager
func WithLoopManagerLogger ¶
func WithLoopManagerLogger(logger *slog.Logger) LoopManagerOption
WithLoopManagerLogger sets the logger for the LoopManager and its context managers
func WithLoopManagerModelRegistry ¶
func WithLoopManagerModelRegistry(reg model.RegistryReader) LoopManagerOption
WithLoopManagerModelRegistry sets the model registry for context managers
type MessageHandler ¶
type MessageHandler struct {
// contains filtered or unexported fields
}
MessageHandler handles incoming messages and coordinates loop execution
func NewMessageHandler ¶
func NewMessageHandler(config Config, loopManagerOpts ...LoopManagerOption) *MessageHandler
NewMessageHandler creates a new MessageHandler
func (*MessageHandler) BuildFailureEvent ¶
func (h *MessageHandler) BuildFailureEvent(loopID, reason, errorMsg string) (PublishedMessage, error)
BuildFailureEvent creates a failure event for publishing (public wrapper). Used by the component to publish failure events when handler returns errors.
func (*MessageHandler) BuildFailureMessages ¶
func (h *MessageHandler) BuildFailureMessages(loopID, reason, errorMsg string) ([]PublishedMessage, error)
BuildFailureMessages creates failure events for publishing. Returns the standard failure event for reactive workflows to observe via KV watch or subject subscription.
func (*MessageHandler) CancelLoop ¶
func (h *MessageHandler) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)
CancelLoop atomically cancels a loop and populates completion data.
func (*MessageHandler) GetContextManager ¶
func (h *MessageHandler) GetContextManager(loopID string) *ContextManager
GetContextManager returns the ContextManager for a given loop ID. Used by BoidHandler to apply steering signals to context.
func (*MessageHandler) GetLoop ¶
func (h *MessageHandler) GetLoop(loopID string) (agentic.LoopEntity, error)
GetLoop retrieves a loop entity (for testing)
func (*MessageHandler) HandleModelResponse ¶
func (h *MessageHandler) HandleModelResponse(ctx context.Context, loopID string, response agentic.AgentResponse) (HandlerResult, error)
HandleModelResponse processes a model response
func (*MessageHandler) HandleTask ¶
func (h *MessageHandler) HandleTask(ctx context.Context, task TaskMessage) (HandlerResult, error)
HandleTask processes an incoming task message and creates a new loop
func (*MessageHandler) HandleToolResult ¶
func (h *MessageHandler) HandleToolResult(ctx context.Context, loopID string, toolResult agentic.ToolResult) (HandlerResult, error)
HandleToolResult processes a tool execution result
func (*MessageHandler) SetLogger ¶
func (h *MessageHandler) SetLogger(logger *slog.Logger)
SetLogger sets the logger for the handler
func (*MessageHandler) SetSummarizer ¶
func (h *MessageHandler) SetSummarizer(s Summarizer, modelName string)
SetSummarizer injects an LLM-backed summarizer into the compactor. When set, context compaction generates real summaries instead of stubs. modelName is the resolved endpoint name reported in CompactionResult.
func (*MessageHandler) SetToolCallFilter ¶
func (h *MessageHandler) SetToolCallFilter(filter agentic.ToolCallFilter)
SetToolCallFilter sets a filter that intercepts tool calls before execution. When set, each tool call batch is passed through the filter. Rejected calls receive immediate error results; approved calls proceed to tool.execute.
func (*MessageHandler) UpdateLoop ¶
func (h *MessageHandler) UpdateLoop(entity agentic.LoopEntity) error
UpdateLoop updates a loop entity
type PublishedMessage ¶
PublishedMessage represents a message published to NATS
type RegionType ¶
type RegionType string
RegionType defines the type of context region
const ( RegionSystemPrompt RegionType = "system_prompt" // System prompt and instructions RegionCompactedHistory RegionType = "compacted_history" // Summarized old conversation RegionRecentHistory RegionType = "recent_history" // Recent uncompacted messages RegionToolResults RegionType = "tool_results" // Tool execution results RegionHydratedContext RegionType = "hydrated_context" // Retrieved context from memory RegionGraphEntities RegionType = "graph_entities" // Graph entity context (multi-agent) )
Context region types define different areas of the conversation context
type SignalStore ¶
type SignalStore struct {
// contains filtered or unexported fields
}
SignalStore maintains active steering signals per loop, organized by signal type. Signals are time-limited and automatically expire after TTL.
func NewSignalStore ¶
func NewSignalStore(ttl time.Duration) *SignalStore
NewSignalStore creates a new signal store with the given TTL.
func (*SignalStore) Cleanup ¶
func (s *SignalStore) Cleanup() int
Cleanup removes expired signals (can be called periodically).
func (*SignalStore) Get ¶
func (s *SignalStore) Get(loopID, signalType string) *boid.SteeringSignal
Get retrieves an active signal by loop ID and signal type. Returns nil if no signal exists or if it has expired.
func (*SignalStore) GetAll ¶
func (s *SignalStore) GetAll(loopID string) map[string]*boid.SteeringSignal
GetAll retrieves all active signals for a loop. Returns a map of signal type -> signal, excluding expired signals.
func (*SignalStore) Remove ¶
func (s *SignalStore) Remove(loopID string)
Remove removes all signals for a loop (called on loop completion).
func (*SignalStore) Store ¶
func (s *SignalStore) Store(signal *boid.SteeringSignal)
Store adds or updates a steering signal for a loop. Signals are keyed by loop ID and signal type (only most recent per type kept).
type Summarizer ¶
type Summarizer interface {
// Summarize generates a concise summary of the given conversation messages.
// maxTokens limits the response length.
Summarize(ctx context.Context, messages []agentic.ChatMessage, maxTokens int) (string, error)
}
Summarizer abstracts the LLM call for context compaction.
type TaskMessage ¶
type TaskMessage = agentic.TaskMessage
TaskMessage is an alias for agentic.TaskMessage for backward compatibility. This allows existing code to use agenticloop.TaskMessage without modification.
type TrajectoryManager ¶
type TrajectoryManager struct {
// contains filtered or unexported fields
}
TrajectoryManager manages trajectory capture and persistence
func NewTrajectoryManager ¶
func NewTrajectoryManager() *TrajectoryManager
NewTrajectoryManager creates a new TrajectoryManager
func (*TrajectoryManager) AddStep ¶
func (m *TrajectoryManager) AddStep(loopID string, step agentic.TrajectoryStep) (agentic.Trajectory, error)
AddStep adds a step to a trajectory
func (*TrajectoryManager) CompleteTrajectory ¶
func (m *TrajectoryManager) CompleteTrajectory(loopID, outcome string) (agentic.Trajectory, error)
CompleteTrajectory marks a trajectory as complete
func (*TrajectoryManager) GetTrajectory ¶
func (m *TrajectoryManager) GetTrajectory(loopID string) (agentic.Trajectory, error)
GetTrajectory retrieves a trajectory by loop ID
func (*TrajectoryManager) SaveTrajectory ¶
func (m *TrajectoryManager) SaveTrajectory(_ context.Context, _ agentic.Trajectory) error
SaveTrajectory saves a trajectory to KV storage
func (*TrajectoryManager) StartTrajectory ¶
func (m *TrajectoryManager) StartTrajectory(loopID string) (agentic.Trajectory, error)
StartTrajectory starts a new trajectory for a loop