Documentation ¶
Overview ¶
Package agenticmemory provides a graph-backed agent memory processor component that manages context hydration, fact extraction, and memory checkpointing for agentic loops.
The agentic-memory processor bridges the agentic loop system with the knowledge graph, providing persistent memory capabilities for agents. It responds to context events from agentic-loop, extracts facts for long-term storage, and hydrates context when needed.
Key capabilities:
- Context hydration from knowledge graph
- LLM-assisted fact extraction
- Memory checkpointing for recovery
- Integration with agentic-loop context events
Architecture ¶
The memory processor sits between the agentic loop and the knowledge graph:
┌────────────────┐     ┌─────────────────┐     ┌──────────────┐
│  agentic-loop  │────▶│ agentic-memory  │────▶│    Graph     │
│                │     │   (this pkg)    │     │  Processor   │
│                │◀────│                 │     │              │
└────────────────┘     └─────────────────┘     └──────────────┘
 context.compaction.*   graph.mutation.*
 hydrate.request.*      context.injected.*
Message Flow ¶
The processor handles two main flows:
**Compaction Flow** (from agentic-loop):
- agentic-loop publishes compaction_starting event
- agentic-memory extracts facts from context using LLM
- Facts published to graph.mutation.* for storage
- agentic-loop publishes compaction_complete event
- agentic-memory queries graph for relevant context
- Hydrated context published to agent.context.injected.*
**Hydration Request Flow** (explicit):
- External system publishes hydrate request to memory.hydrate.request.*
- agentic-memory queries graph based on request type
- Hydrated context published to agent.context.injected.*
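The two flows above amount to a mapping from the incoming event type to the subject the processor publishes on. The sketch below is illustrative only: outputSubject is a hypothetical helper that is not part of the package, and it assumes the final subject token is the loop ID, as the {loopID} placeholders in this documentation suggest.

```go
package main

import "fmt"

// outputSubject is a hypothetical helper that returns the subject
// agentic-memory would publish to for a given context event type.
// The boolean is false when the event produces no output message.
func outputSubject(eventType, loopID string) (string, bool) {
	switch eventType {
	case "compaction_starting":
		// Facts are extracted and sent on to the graph processor.
		return "graph.mutation." + loopID, true
	case "compaction_complete":
		// Context is rehydrated from the graph and sent back to the loop.
		return "agent.context.injected." + loopID, true
	default:
		// For example gc_complete, which is only logged.
		return "", false
	}
}

func main() {
	for _, t := range []string{"compaction_starting", "compaction_complete", "gc_complete"} {
		subject, ok := outputSubject(t, "loop_123")
		fmt.Printf("%-20s publishes=%v subject=%q\n", t, ok, subject)
	}
}
```

An explicit hydration request would take the same path as compaction_complete, ending in a publish to agent.context.injected.*.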
Key Types ¶
**Hydrator** - Retrieves context from knowledge graph:
hydrator, err := NewHydrator(config.Hydration, graphClient)
// Pre-task hydration (before the loop starts)
preTask, err := hydrator.HydratePreTask(ctx, loopID, taskDescription)
// Post-compaction hydration (after the context is compressed)
postCompaction, err := hydrator.HydratePostCompaction(ctx, loopID)
**LLMExtractor** - Extracts facts using language models:
extractor, err := NewLLMExtractor(config.Extraction, llmClient)
// Extract facts from conversation content
triples, err := extractor.ExtractFacts(ctx, loopID, content)
**Publisher** - Publishes results to NATS:
// Published to agent.context.injected.{loopID}
err := c.publishInjectedContext(ctx, loopID, "post_compaction", hydrated)
// Published to graph.mutation.{loopID}
err = c.publishGraphMutations(ctx, loopID, "add_triples", triples)
Configuration ¶
The processor is configured via JSON:
{
    "extraction": {
        "llm_assisted": {
            "enabled": true,
            "model": "fast",
            "trigger_iteration_interval": 5,
            "trigger_context_threshold": 0.8,
            "max_tokens": 1000
        }
    },
    "hydration": {
        "pre_task": {
            "enabled": true,
            "max_context_tokens": 2000,
            "include_decisions": true,
            "include_files": true
        },
        "post_compaction": {
            "enabled": true,
            "reconstruct_from_checkpoint": true,
            "max_recovery_tokens": 1500
        }
    },
    "checkpoint": {
        "enabled": true,
        "storage_bucket": "AGENT_MEMORY_CHECKPOINTS",
        "retention_days": 7
    },
    "stream_name": "AGENT"
}
Configuration fields:
- extraction.llm_assisted.enabled: Enable LLM-assisted fact extraction
- extraction.llm_assisted.model: Model alias for extraction (from agentic-model)
- extraction.llm_assisted.trigger_iteration_interval: Extract every N iterations
- extraction.llm_assisted.trigger_context_threshold: Extract when context utilization reaches this fraction (0.8 means 80%)
- extraction.llm_assisted.max_tokens: Max tokens for extraction request
- hydration.pre_task.enabled: Enable pre-task context hydration
- hydration.pre_task.max_context_tokens: Max tokens for pre-task context
- hydration.pre_task.include_decisions: Include past decisions
- hydration.pre_task.include_files: Include file context
- hydration.post_compaction.enabled: Enable post-compaction reconstruction
- hydration.post_compaction.reconstruct_from_checkpoint: Use checkpoints
- hydration.post_compaction.max_recovery_tokens: Max tokens for recovery
- checkpoint.enabled: Enable memory checkpointing
- checkpoint.storage_bucket: KV bucket for checkpoints
- checkpoint.retention_days: Days to retain checkpoints
- stream_name: JetStream stream name (default: "AGENT")
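The two trigger fields are easier to reason about with a concrete check. The following is a minimal sketch, assuming the triggers combine with OR (extract on every Nth iteration, or once utilization reaches the threshold). The documentation does not state how the package combines them, and llmAssisted and shouldExtract are hypothetical names used only for illustration.

```go
package main

import "fmt"

// llmAssisted mirrors the documented trigger fields. It is a local
// stand-in for illustration, not the package's LLMAssistedConfig type.
type llmAssisted struct {
	Enabled                  bool
	TriggerIterationInterval int
	TriggerContextThreshold  float64
}

// shouldExtract reports whether extraction would fire under the assumed
// "either trigger" rule: every N iterations, or once utilization reaches
// the configured fraction.
func shouldExtract(cfg llmAssisted, iteration int, utilization float64) bool {
	if !cfg.Enabled {
		return false
	}
	onInterval := cfg.TriggerIterationInterval > 0 &&
		iteration > 0 &&
		iteration%cfg.TriggerIterationInterval == 0
	overThreshold := cfg.TriggerContextThreshold > 0 &&
		utilization >= cfg.TriggerContextThreshold
	return onInterval || overThreshold
}

func main() {
	cfg := llmAssisted{Enabled: true, TriggerIterationInterval: 5, TriggerContextThreshold: 0.8}
	fmt.Println(shouldExtract(cfg, 5, 0.40)) // interval trigger
	fmt.Println(shouldExtract(cfg, 3, 0.85)) // threshold trigger
	fmt.Println(shouldExtract(cfg, 3, 0.40)) // neither trigger
}
```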
Ports ¶
Input ports (JetStream consumers):
- compaction_events: Context compaction events from agentic-loop (subject: agent.context.compaction.>)
- hydrate_requests: Explicit hydration requests (subject: memory.hydrate.request.*)
- entity_states: Entity state changes for reactive hydration (type: kv-watch, bucket: ENTITY_STATES)
Output ports (publishers):
- injected_context: Hydrated context for agentic-loop (subject: agent.context.injected.*)
- graph_mutations: Fact triples for graph processor (subject: graph.mutation.*)
- checkpoint_events: Checkpoint creation notifications (subject: memory.checkpoint.created.*)
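The port subjects use the two NATS wildcards: "*" matches exactly one token and ">" matches one or more trailing tokens. The sketch below reimplements that matching rule locally to show which concrete subjects each port pattern accepts. subjectMatches is a hypothetical helper for illustration, not part of this package or of the NATS client.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches a NATS-style pattern.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last pattern token and needs
			// at least one subject token left to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || s[i] == "" {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("agent.context.compaction.>", "agent.context.compaction.loop_123"))
	fmt.Println(subjectMatches("memory.hydrate.request.*", "memory.hydrate.request.loop_123"))
	fmt.Println(subjectMatches("memory.hydrate.request.*", "memory.hydrate.request.loop_123.extra"))
}
```

Under these rules the compaction_events consumer (">") accepts subjects with any number of trailing tokens, while the "*" subjects accept exactly one.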
Context Events ¶
The processor consumes context events from agentic-loop:
// Compaction starting - extract facts before content is lost
{
    "type": "compaction_starting",
    "loop_id": "loop_123",
    "iteration": 5,
    "utilization": 0.65
}
// Compaction complete - hydrate recovered context
{
    "type": "compaction_complete",
    "loop_id": "loop_123",
    "iteration": 5,
    "tokens_saved": 2500,
    "summary": "Discussed authentication..."
}
// GC complete - logged for observability
{
    "type": "gc_complete",
    "loop_id": "loop_123",
    "iteration": 6
}
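A consumer of these payloads might decode and validate them as sketched below. The struct mirrors the documented ContextEvent fields. parseContextEvent and its validation rules (non-empty loop_id, one of the three documented types) are assumptions made for illustration, not the package's actual handler.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// contextEvent mirrors the documented ContextEvent fields.
type contextEvent struct {
	Type        string  `json:"type"`
	LoopID      string  `json:"loop_id"`
	Iteration   int     `json:"iteration"`
	Utilization float64 `json:"utilization,omitempty"`
	TokensSaved int     `json:"tokens_saved,omitempty"`
	Summary     string  `json:"summary,omitempty"`
}

// parseContextEvent decodes a payload and rejects events with a missing
// loop ID or an event type outside the three documented ones.
func parseContextEvent(data []byte) (contextEvent, error) {
	var ev contextEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return contextEvent{}, fmt.Errorf("decode context event: %w", err)
	}
	if ev.LoopID == "" {
		return contextEvent{}, fmt.Errorf("context event missing loop_id")
	}
	switch ev.Type {
	case "compaction_starting", "compaction_complete", "gc_complete":
		return ev, nil
	default:
		return contextEvent{}, fmt.Errorf("unknown context event type %q", ev.Type)
	}
}

func main() {
	payload := []byte(`{"type":"compaction_complete","loop_id":"loop_123","iteration":5,"tokens_saved":2500}`)
	ev, err := parseContextEvent(payload)
	if err != nil {
		fmt.Println("invalid event:", err)
		return
	}
	fmt.Printf("%s for %s saved %d tokens\n", ev.Type, ev.LoopID, ev.TokensSaved)
}
```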
Hydration Requests ¶
Explicit hydration can be requested:
// Pre-task hydration
{
    "loop_id": "loop_123",
    "type": "pre_task",
    "task_description": "Implement user authentication"
}
// Post-compaction hydration
{
    "loop_id": "loop_123",
    "type": "post_compaction"
}
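A requester could build these payloads as in the sketch below. The struct mirrors the documented HydrateRequest fields, so task_description is omitted when empty. buildHydrateRequest is a hypothetical helper, and using the loop ID as the last token of memory.hydrate.request.* is an assumption. Actually publishing the message is left out to keep the example free of a NATS dependency.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hydrateRequest mirrors the documented HydrateRequest fields.
type hydrateRequest struct {
	LoopID          string `json:"loop_id"`
	TaskDescription string `json:"task_description,omitempty"`
	Type            string `json:"type"`
}

// buildHydrateRequest validates the request and returns the assumed
// subject together with the JSON payload.
func buildHydrateRequest(req hydrateRequest) (string, []byte, error) {
	if req.LoopID == "" {
		return "", nil, fmt.Errorf("loop_id is required")
	}
	if req.Type != "pre_task" && req.Type != "post_compaction" {
		return "", nil, fmt.Errorf("unsupported hydration type %q", req.Type)
	}
	payload, err := json.Marshal(req)
	if err != nil {
		return "", nil, err
	}
	// Assumption: the wildcard token in memory.hydrate.request.* is the loop ID.
	return "memory.hydrate.request." + req.LoopID, payload, nil
}

func main() {
	subject, payload, err := buildHydrateRequest(hydrateRequest{
		LoopID:          "loop_123",
		Type:            "pre_task",
		TaskDescription: "Implement user authentication",
	})
	if err != nil {
		fmt.Println("invalid request:", err)
		return
	}
	fmt.Println(subject)
	fmt.Println(string(payload))
}
```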
Memory Lifecycle ¶
A typical memory lifecycle for an agentic loop:
- Loop starts, pre-task hydration injects relevant context
- Loop executes, context grows with each iteration
- Context approaches limit, compaction_starting published
- agentic-memory extracts key facts before compaction
- agentic-loop compresses context, publishes compaction_complete
- agentic-memory hydrates recovered context from graph
- Process repeats as needed until loop completes
- Final checkpoint created for future reference
Quick Start ¶
Create and start the component:
config := agenticmemory.DefaultConfig()
rawConfig, _ := json.Marshal(config)
comp, err := agenticmemory.NewComponent(rawConfig, deps)
if err != nil {
	return err
}
lc := comp.(component.LifecycleComponent)
if err := lc.Initialize(); err != nil {
	return err
}
if err := lc.Start(ctx); err != nil {
	return err
}
defer lc.Stop(5 * time.Second)
Thread Safety ¶
The Component is safe for concurrent use after Start() is called:
- Event handlers run in separate goroutines
- Internal state protected by RWMutex
- Atomic counters for metrics
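The pattern listed above (shared state behind an RWMutex, metrics in atomic counters) can be sketched as follows. memoryState and its fields are hypothetical stand-ins, since the Component's real fields are unexported and not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// memoryState is a hypothetical stand-in for the component's internal state.
type memoryState struct {
	mu        sync.RWMutex
	lastEvent map[string]string // loop ID -> last event type, guarded by mu

	eventsProcessed atomic.Int64 // metrics counter, safe without the mutex
}

func newMemoryState() *memoryState {
	return &memoryState{lastEvent: make(map[string]string)}
}

// record stores the latest event for a loop and bumps the counter.
func (s *memoryState) record(loopID, eventType string) {
	s.mu.Lock()
	s.lastEvent[loopID] = eventType
	s.mu.Unlock()
	s.eventsProcessed.Add(1)
}

// last returns the most recent event type seen for a loop.
func (s *memoryState) last(loopID string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	ev, ok := s.lastEvent[loopID]
	return ev, ok
}

func main() {
	state := newMemoryState()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			state.record(fmt.Sprintf("loop_%d", i%10), "compaction_starting")
		}(i)
	}
	wg.Wait()
	fmt.Println("events processed:", state.eventsProcessed.Load())
}
```

Readers take only the read lock, so concurrent lookups do not block each other. The counter needs no lock at all.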
Error Handling ¶
Errors are categorized:
- Graph query errors: Logged, hydration returns empty context
- LLM extraction errors: Logged, facts not extracted
- Publish errors: Logged, counted as errors in metrics
- Invalid events: Logged with error counter increment
Errors don't fail the agentic loop - memory is supplementary.
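The "log and degrade" behavior for graph query errors might look like the sketch below: on failure the error is logged and counted, and the caller receives empty context instead of an error. graphQuerier, QueryContext, and hydrateOrEmpty are hypothetical names. The package's GraphClient interface is not shown in this documentation.

```go
package main

import (
	"context"
	"errors"
	"log"
)

// graphQuerier is a hypothetical, narrowed stand-in for the graph client.
type graphQuerier interface {
	QueryContext(ctx context.Context, loopID string) (string, error)
}

// hydrateOrEmpty returns hydrated context, or an empty string when the
// graph query fails. Failures are logged and counted, never propagated.
func hydrateOrEmpty(ctx context.Context, g graphQuerier, loopID string, errCount *int) string {
	text, err := g.QueryContext(ctx, loopID)
	if err != nil {
		*errCount++
		log.Printf("hydration failed for %s: %v", loopID, err)
		return ""
	}
	return text
}

// failingGraph always fails, simulating an unavailable graph.
type failingGraph struct{}

func (failingGraph) QueryContext(context.Context, string) (string, error) {
	return "", errors.New("graph unavailable")
}

// runDemo exercises the failure path and returns the resulting context
// and error count.
func runDemo() (string, int) {
	var errs int
	got := hydrateOrEmpty(context.Background(), failingGraph{}, "loop_123", &errs)
	return got, errs
}

func main() {
	got, errs := runDemo()
	log.Printf("context=%q errors=%d", got, errs)
}
```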
Integration with agentic-loop ¶
agentic-memory integrates with agentic-loop through context events:
- agentic-loop publishes to agent.context.compaction.*
- agentic-memory consumes these events
- agentic-memory publishes to agent.context.injected.*
- agentic-loop can consume injected context for enhancement
Testing ¶
For testing, use the ConsumerNameSuffix config option:
config := agenticmemory.Config{
	ConsumerNameSuffix: "test-" + t.Name(),
	// ...
}
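One caveat when deriving the suffix from t.Name(): subtest names contain "/", and NATS consumer names may not contain path separators, whitespace, ".", "*", or ">". The sketch below shows one way to sanitize the name first. consumerSuffix is a hypothetical helper, and whether the package already sanitizes the suffix is not stated in this documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// consumerSuffix is a hypothetical helper that turns a test name into a
// suffix without characters that NATS rejects in consumer names.
func consumerSuffix(testName string) string {
	replacer := strings.NewReplacer(
		"/", "-", "\\", "-", ".", "-", "*", "-", ">", "-", " ", "-",
	)
	return "test-" + replacer.Replace(testName)
}

func main() {
	fmt.Println(consumerSuffix("TestHydration/pre_task"))
}
```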
Limitations ¶
Current limitations:
- Hydration quality depends on graph content
- LLM extraction has cost implications
- No streaming support for large contexts
- Checkpoint size limited by KV bucket limits
See Also ¶
Related packages:
- processor/agentic-loop: Loop orchestration (publishes context events)
- processor/agentic-model: LLM endpoint integration (for extraction)
- graph: Knowledge graph operations
- agentic: Shared types
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- type CheckpointConfig
- type CheckpointEventMessage
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(_ time.Duration) error
- type Config
- type ContextEvent
- type ExtractionConfig
- type GraphClient
- type GraphMutationMessage
- type HydrateRequest
- type HydratedContext
- type HydrationConfig
- type Hydrator
- func (h *Hydrator) FormatContext(ctx context.Context, decisionsJSON, filesJSON, toolsJSON string, maxTokens int) (string, error)
- func (h *Hydrator) HydrateForEntities(ctx context.Context, loopID string, entityIDs []string, depth int) (*HydratedContext, error)
- func (h *Hydrator) HydratePostCompaction(ctx context.Context, loopID string) (*HydratedContext, error)
- func (h *Hydrator) HydratePreTask(ctx context.Context, loopID, taskDescription string) (*HydratedContext, error)
- type InjectedContextMessage
- type LLMAssistedConfig
- type LLMClient
- type LLMExtractor
- type PostCompactionConfig
- type PreTaskConfig
- type Publisher
- type RegistryInterface
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new agentic-memory processor component
func Register ¶
func Register(registry RegistryInterface) error
Register registers the agentic-memory processor component with the given registry
Types ¶
type CheckpointConfig ¶
type CheckpointConfig struct {
	Enabled       bool   `json:"enabled" schema:"type:bool,description:Enable memory checkpointing,category:basic,default:true"`
	StorageBucket string `` /* 133-byte string literal not displayed */
	RetentionDays int    `json:"retention_days" schema:"type:int,description:Checkpoint retention days,category:advanced,default:7"`
}
CheckpointConfig holds memory checkpoint configuration
func (*CheckpointConfig) Validate ¶
func (c *CheckpointConfig) Validate() error
Validate checks the checkpoint configuration
type CheckpointEventMessage ¶
type CheckpointEventMessage struct {
	LoopID       string `json:"loop_id"`
	CheckpointID string `json:"checkpoint_id"`
	Bucket       string `json:"bucket"`
	Timestamp    int64  `json:"timestamp"` // Unix timestamp
}
CheckpointEventMessage represents a checkpoint creation event
type Component ¶
type Component struct {
	// contains filtered or unexported fields
}
Component implements the agentic-memory processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status
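func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns configured input port definitions
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns configured output port definitions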
type Config ¶
type Config struct {
	Extraction         ExtractionConfig      `json:"extraction" schema:"type:object,description:Fact extraction configuration,category:basic"`
	Hydration          HydrationConfig       `json:"hydration" schema:"type:object,description:Context hydration configuration,category:basic"`
	Checkpoint         CheckpointConfig      `json:"checkpoint" schema:"type:object,description:Memory checkpoint configuration,category:basic"`
	Ports              *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration,category:basic"`
	StreamName         string                `json:"stream_name,omitempty" schema:"type:string,description:JetStream stream name,category:advanced,default:AGENT"`
	ConsumerNameSuffix string                `json:"consumer_name_suffix,omitempty" schema:"type:string,description:Consumer name suffix for uniqueness,category:advanced"`
}
Config holds configuration for agentic-memory processor component
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns default configuration for agentic-memory processor
type ContextEvent ¶
type ContextEvent struct {
	Type        string  `json:"type"` // "compaction_starting", "compaction_complete", "gc_complete"
	LoopID      string  `json:"loop_id"`
	Iteration   int     `json:"iteration"`
	Utilization float64 `json:"utilization,omitempty"`
	TokensSaved int     `json:"tokens_saved,omitempty"`
	Summary     string  `json:"summary,omitempty"`
}
ContextEvent matches the structure from agentic-loop/handlers.go
type ExtractionConfig ¶
type ExtractionConfig struct {
	LLMAssisted LLMAssistedConfig `json:"llm_assisted" schema:"type:object,description:LLM-assisted extraction configuration,category:basic"`
}
ExtractionConfig holds fact extraction configuration
type GraphClient ¶
GraphClient defines the interface for graph operations
type GraphMutationMessage ¶
type GraphMutationMessage struct {
	Operation string           `json:"operation"` // "add_triples", "delete_triples"
	LoopID    string           `json:"loop_id"`
	Triples   []message.Triple `json:"triples"`
	Timestamp int64            `json:"timestamp"` // Unix timestamp
}
GraphMutationMessage represents a graph mutation command
type HydrateRequest ¶
type HydrateRequest struct {
	LoopID          string `json:"loop_id"`
	TaskDescription string `json:"task_description,omitempty"`
	Type            string `json:"type"` // "pre_task" or "post_compaction"
}
HydrateRequest represents an explicit hydration request message
type HydratedContext ¶
HydratedContext represents hydrated context with token count
type HydrationConfig ¶
type HydrationConfig struct {
	PreTask        PreTaskConfig        `json:"pre_task" schema:"type:object,description:Pre-task hydration configuration,category:basic"`
	PostCompaction PostCompactionConfig `json:"post_compaction" schema:"type:object,description:Post-compaction hydration configuration,category:basic"`
}
HydrationConfig holds context hydration configuration
type Hydrator ¶
type Hydrator struct {
	// contains filtered or unexported fields
}
Hydrator provides context hydration from the knowledge graph
func NewHydrator ¶
func NewHydrator(config HydrationConfig, graphClient GraphClient) (*Hydrator, error)
NewHydrator creates a new Hydrator instance
func (*Hydrator) FormatContext ¶
func (h *Hydrator) FormatContext(ctx context.Context, decisionsJSON, filesJSON, toolsJSON string, maxTokens int) (string, error)
FormatContext formats context from graph query results
func (*Hydrator) HydrateForEntities ¶
func (h *Hydrator) HydrateForEntities(ctx context.Context, loopID string, entityIDs []string, depth int) (*HydratedContext, error)
HydrateForEntities hydrates context for specific entity IDs. This supports explicit entity-based hydration for the embedded context pattern.
func (*Hydrator) HydratePostCompaction ¶
func (h *Hydrator) HydratePostCompaction(ctx context.Context, loopID string) (*HydratedContext, error)
HydratePostCompaction reconstructs context after compaction events
func (*Hydrator) HydratePreTask ¶
func (h *Hydrator) HydratePreTask(ctx context.Context, loopID, taskDescription string) (*HydratedContext, error)
HydratePreTask injects context before task execution
type InjectedContextMessage ¶
type InjectedContextMessage struct {
	LoopID     string `json:"loop_id"`
	Context    string `json:"context"`
	TokenCount int    `json:"token_count"`
	Source     string `json:"source"`    // "post_compaction", "pre_task"
	Timestamp  int64  `json:"timestamp"` // Unix timestamp
}
InjectedContextMessage represents context injected back to the agent loop
type LLMAssistedConfig ¶
type LLMAssistedConfig struct {
	Enabled                  bool    `json:"enabled" schema:"type:bool,description:Enable LLM-assisted fact extraction,category:basic,default:true"`
	Model                    string  `json:"model" schema:"type:string,description:Model alias for extraction,category:basic,default:fast"`
	TriggerIterationInterval int     `` /* 129-byte string literal not displayed */
	TriggerContextThreshold  float64 `` /* 143-byte string literal not displayed */
	MaxTokens                int     `json:"max_tokens" schema:"type:int,description:Maximum tokens for extraction request,category:advanced,default:1000"`
}
LLMAssistedConfig holds LLM-assisted extraction settings
func (*LLMAssistedConfig) Validate ¶
func (l *LLMAssistedConfig) Validate() error
Validate checks the LLM-assisted extraction configuration
type LLMClient ¶
type LLMClient interface {
	ExtractFacts(ctx context.Context, model string, content string, maxTokens int) ([]message.Triple, error)
}
LLMClient defines the interface for LLM operations
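Because LLMClient is a single-method interface, a test double is straightforward. The sketch below is self-contained, so it declares a local Triple in place of message.Triple. The Subject, Predicate, and Object field names are assumptions, as are the stubLLM type and the runStub helper.

```go
package main

import (
	"context"
	"fmt"
)

// Triple is a local stand-in for message.Triple; its fields are assumed.
type Triple struct {
	Subject, Predicate, Object string
}

// LLMClient matches the documented method shape, with the local Triple
// substituted for message.Triple.
type LLMClient interface {
	ExtractFacts(ctx context.Context, model string, content string, maxTokens int) ([]Triple, error)
}

// stubLLM returns canned triples and records how it was called.
type stubLLM struct {
	triples   []Triple
	lastModel string
	calls     int
}

func (s *stubLLM) ExtractFacts(_ context.Context, model, _ string, _ int) ([]Triple, error) {
	s.calls++
	s.lastModel = model
	return s.triples, nil
}

// Compile-time check that the stub satisfies the interface.
var _ LLMClient = (*stubLLM)(nil)

// runStub calls the stub through the interface and reports what it saw.
func runStub() (int, string, int, error) {
	stub := &stubLLM{triples: []Triple{{"loop_123", "decided", "use JWT auth"}}}
	var client LLMClient = stub
	got, err := client.ExtractFacts(context.Background(), "fast", "We decided to use JWT auth.", 1000)
	return len(got), stub.lastModel, stub.calls, err
}

func main() {
	n, model, calls, err := runStub()
	fmt.Println(n, model, calls, err)
}
```

A stub like this could be passed wherever an LLMClient is expected, for example to NewLLMExtractor, once Triple is replaced with the real message.Triple type.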
type LLMExtractor ¶
type LLMExtractor struct {
	// contains filtered or unexported fields
}
LLMExtractor extracts semantic facts from agent responses using an LLM
func NewLLMExtractor ¶
func NewLLMExtractor(config ExtractionConfig, llmClient LLMClient) (*LLMExtractor, error)
NewLLMExtractor creates a new LLMExtractor instance
func (*LLMExtractor) ExtractFacts ¶
func (e *LLMExtractor) ExtractFacts(ctx context.Context, loopID, responseContent string) ([]message.Triple, error)
ExtractFacts extracts semantic triples from response content
type PostCompactionConfig ¶
type PostCompactionConfig struct {
	Enabled                   bool `json:"enabled" schema:"type:bool,description:Enable post-compaction reconstruction,category:basic,default:true"`
	ReconstructFromCheckpoint bool `` /* 141-byte string literal not displayed */
	MaxRecoveryTokens         int  `json:"max_recovery_tokens" schema:"type:int,description:Maximum tokens for recovery context,category:advanced,default:1500"`
}
PostCompactionConfig holds post-compaction hydration settings
func (*PostCompactionConfig) Validate ¶
func (p *PostCompactionConfig) Validate() error
Validate checks the post-compaction hydration configuration
type PreTaskConfig ¶
type PreTaskConfig struct {
	Enabled          bool `json:"enabled" schema:"type:bool,description:Enable pre-task context hydration,category:basic,default:true"`
	MaxContextTokens int  `json:"max_context_tokens" schema:"type:int,description:Maximum tokens for pre-task context,category:advanced,default:2000"`
	IncludeDecisions bool `json:"include_decisions" schema:"type:bool,description:Include past decisions in context,category:advanced,default:true"`
	IncludeFiles     bool `json:"include_files" schema:"type:bool,description:Include file context,category:advanced,default:true"`
}
PreTaskConfig holds pre-task hydration settings
func (*PreTaskConfig) Validate ¶
func (p *PreTaskConfig) Validate() error
Validate checks the pre-task hydration configuration
type RegistryInterface ¶
type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration