Documentation
¶
Overview ¶
Package session provides Canto's durable append-only conversation log.
A Session stores events as immutable facts and derives higher-level views from that log:
- Replayer reconstructs a Session from an event stream without re-emitting writer or subscriber side effects.
- Rebuilder reconstructs the canonical post-compaction or projection prompt view from durable snapshots plus later appended events.
- MessageAdded events are conversational transcript. ContextAdded events are model-visible context, replayed as ordinary user-role context rather than privileged instructions. Stable ContextAdded entries can be placed before the transcript for prompt-cache reuse. Lifecycle, handoff, approval, and audit events are durable but hidden from prompt history by default.
- ProjectionSnapshotter appends durable rebuild checkpoints on time/count policies so the rebuilder can fast-path cold starts and long transcripts.
- Messages returns the raw transcript exactly as messages were emitted.
- EffectiveMessages returns the model-visible history after durable compaction snapshots are applied.
- EffectiveEntries returns the same model-visible history together with originating event IDs and context markers when known.
- Artifact events carry durable artifact descriptors and provenance rather than embedding artifact bodies directly in the log.
- RecordArtifact and StoreArtifact provide the standard path for recording external descriptors or persisting new artifact bodies while emitting artifact_recorded events.
Forked sessions preserve lineage by minting fresh event IDs and recording fork_origin metadata that points back to the parent session and event. Compaction and projection snapshots are persisted as append-only events, so replay and resumed runs see the same effective prompt history the model saw.
Stores may also expose first-class session-tree queries through SessionTreeStore, so callers can navigate parent/child/lineage relationships without scanning copied event payloads.
Index ¶
- Constants
- func AssistantMessage(content string) llm.Message
- func AttachWriteThrough(ctx context.Context, sess *Session, store Store) func()
- func IsWorkspaceFileReferenceArtifact(ref ArtifactRef) bool
- func MarshalEventJSON(e Event) ([]byte, error)
- func MetadataFromContext(ctx context.Context) map[string]any
- func RecordArtifact(ctx context.Context, sess *Session, data ArtifactRecordedData) error
- func SystemMessage(content string) llm.Message
- func ToolMessage(name, toolID, content string) llm.Message
- func UserMessage(content string) llm.Message
- func WithMetadata(ctx context.Context, md map[string]any) context.Context
- type ArtifactRecordedData
- type ArtifactRef
- type ChildBlockedData
- type ChildCanceledData
- type ChildCompletedData
- type ChildFailedData
- type ChildMergedData
- type ChildMode
- type ChildProgressedData
- type ChildRequestedData
- type ChildRunLog
- type ChildStartedData
- type ChildStatus
- type CompactionSnapshot
- type ContextEntry
- type ContextKind
- type ContextPlacement
- type Episode
- type EpisodeCall
- type EscalationRetriedData
- type Event
- func NewArtifactRecordedEvent(sessionID string, data ArtifactRecordedData) Event
- func NewChildBlockedEvent(sessionID string, data ChildBlockedData) Event
- func NewChildCanceledEvent(sessionID string, data ChildCanceledData) Event
- func NewChildCompletedEvent(sessionID string, data ChildCompletedData) Event
- func NewChildFailedEvent(sessionID string, data ChildFailedData) Event
- func NewChildMergedEvent(sessionID string, data ChildMergedData) Event
- func NewChildProgressedEvent(sessionID string, data ChildProgressedData) Event
- func NewChildRequestedEvent(sessionID string, data ChildRequestedData) Event
- func NewChildStartedEvent(sessionID string, data ChildStartedData) Event
- func NewCompactionEvent(sessionID string, snapshot CompactionSnapshot) Event
- func NewContext(sessionID string, entry ContextEntry) Event
- func NewEscalationRetriedEvent(sessionID string, data EscalationRetriedData) Event
- func NewEvent(sessionID string, eventType EventType, data any) Event
- func NewMessage(sessionID string, msg llm.Message) Event
- func NewProjectionSnapshot(sessionID string, snapshot ProjectionSnapshot) Event
- func NewStepCompletedEvent(sessionID string, data StepCompletedData) Event
- func NewStepStartedEvent(sessionID string, data StepStartedData) Event
- func NewToolCompletedEvent(sessionID string, result ToolCompletedData) Event
- func NewToolStartedEvent(sessionID string, data ToolStartedData) Event
- func NewTurnCompletedEvent(sessionID string, data TurnCompletedData) Event
- func NewTurnStartedEvent(sessionID string, data TurnStartedData) Event
- func NewUserMessage(sessionID string, content string) Event
- func NewWaitResolvedEvent(sessionID string, data WaitData) Event
- func NewWaitStartedEvent(sessionID string, data WaitData) Event
- func UnmarshalEventJSON(data []byte) (Event, error)
- func (e Event) ArtifactRecordedData() (ArtifactRecordedData, bool, error)
- func (e Event) ChildBlockedData() (ChildBlockedData, bool, error)
- func (e Event) ChildCanceledData() (ChildCanceledData, bool, error)
- func (e Event) ChildCompletedData() (ChildCompletedData, bool, error)
- func (e Event) ChildFailedData() (ChildFailedData, bool, error)
- func (e Event) ChildMergedData() (ChildMergedData, bool, error)
- func (e Event) ChildProgressedData() (ChildProgressedData, bool, error)
- func (e Event) ChildRequestedData() (ChildRequestedData, bool, error)
- func (e Event) ChildStartedData() (ChildStartedData, bool, error)
- func (e Event) CompactionSnapshot() (CompactionSnapshot, bool, error)
- func (e Event) EscalationRetriedData() (EscalationRetriedData, bool, error)
- func (e Event) ForkOrigin() (ForkOrigin, bool, error)
- func (e Event) ProjectionSnapshot() (ProjectionSnapshot, bool, error)
- func (e Event) StepCompletedData() (StepCompletedData, bool, error)
- func (e Event) StepStartedData() (StepStartedData, bool, error)
- func (e Event) ToolCompletedData() (ToolCompletedData, bool, error)
- func (e Event) ToolStartedData() (ToolStartedData, bool, error)
- func (e Event) TurnCompletedData() (TurnCompletedData, bool, error)
- func (e Event) TurnStartedData() (TurnStartedData, bool, error)
- func (e Event) UnmarshalData(v any) error
- func (e Event) WaitData() (WaitData, bool, error)
- type EventType
- type ForkOptions
- type ForkOrigin
- type ForkStore
- type HistoryEntry
- type JSONLStore
- func (s *JSONLStore) BranchSession(_ context.Context, parent *Session, newID string, opts ForkOptions) (*Session, error)
- func (s *JSONLStore) Children(ctx context.Context, sessionID string) ([]SessionAncestry, error)
- func (s *JSONLStore) Close() error
- func (s *JSONLStore) Fork(ctx context.Context, originalID, newID string) (*Session, error)
- func (s *JSONLStore) ForkWithOptions(ctx context.Context, originalID, newID string, opts ForkOptions) (*Session, error)
- func (s *JSONLStore) Lineage(ctx context.Context, sessionID string) ([]SessionAncestry, error)
- func (s *JSONLStore) Load(ctx context.Context, sessionID string) (*Session, error)
- func (s *JSONLStore) LoadUntil(ctx context.Context, sessionID string, eventID ulid.ULID) (*Session, error)
- func (s *JSONLStore) Parent(ctx context.Context, sessionID string) (*SessionAncestry, error)
- func (s *JSONLStore) Save(ctx context.Context, e Event) error
- func (s *JSONLStore) SaveAncestry(_ context.Context, record SessionAncestry) error
- type ProjectionSnapshot
- type ProjectionSnapshotter
- type ProjectionTrigger
- type PromptCacheData
- type Rebuilder
- type Reducer
- type ReplayOption
- type Replayer
- type RunLog
- type RunTurn
- type SQLiteStore
- func (s *SQLiteStore) BranchSession(ctx context.Context, parent *Session, newID string, opts ForkOptions) (*Session, error)
- func (s *SQLiteStore) Children(ctx context.Context, sessionID string) ([]SessionAncestry, error)
- func (s *SQLiteStore) Close() error
- func (s *SQLiteStore) Fork(ctx context.Context, originalID, newID string) (*Session, error)
- func (s *SQLiteStore) ForkWithOptions(ctx context.Context, originalID, newID string, opts ForkOptions) (*Session, error)
- func (s *SQLiteStore) Lineage(ctx context.Context, sessionID string) ([]SessionAncestry, error)
- func (s *SQLiteStore) Load(ctx context.Context, sessionID string) (*Session, error)
- func (s *SQLiteStore) LoadUntil(ctx context.Context, sessionID string, eventID ulid.ULID) (*Session, error)
- func (s *SQLiteStore) Parent(ctx context.Context, sessionID string) (*SessionAncestry, error)
- func (s *SQLiteStore) Save(ctx context.Context, e Event) error
- func (s *SQLiteStore) SaveAncestry(ctx context.Context, record SessionAncestry) error
- func (s *SQLiteStore) Search(ctx context.Context, sessionID string, query string) ([]Event, error)
- type SearchStore
- type Session
- func (s *Session) All() iter.Seq[Event]
- func (s *Session) Append(ctx context.Context, e Event) error
- func (s *Session) AppendContext(ctx context.Context, entry ContextEntry) error
- func (s *Session) AppendUser(ctx context.Context, content string) error
- func (s *Session) Backward() iter.Seq[Event]
- func (s *Session) Branch(ctx context.Context, newID string, opts ForkOptions) (*Session, error)
- func (s *Session) EffectiveEntries() ([]HistoryEntry, error)
- func (s *Session) EffectiveMessages() ([]llm.Message, error)
- func (s *Session) Events() []Event
- func (s *Session) Fork(newID string) *Session
- func (s *Session) HasWatchers() bool
- func (s *Session) ID() string
- func (s *Session) IsWaiting() bool
- func (s *Session) LastAssistantMessage() (llm.Message, bool)
- func (s *Session) LastEvent() (Event, bool)
- func (s *Session) LastMessage() (llm.Message, bool)
- func (s *Session) Messages() []llm.Message
- func (s *Session) State() map[string]any
- func (s *Session) TotalCost() float64
- func (s *Session) Watch(ctx context.Context) *Subscription
- func (s *Session) WithReducer(r Reducer) *Session
- func (s *Session) WithWriter(w Writer) *Session
- type SessionAncestry
- type SessionAncestryWriter
- type SessionBranchStore
- type SessionTreeStore
- type StepCompletedData
- type StepStartedData
- type Store
- type Subscription
- type ToolCompletedData
- type ToolExecutionRecord
- type ToolHistory
- type ToolStartedData
- type TurnCompletedData
- type TurnStartedData
- type WaitData
- type Writer
Constants ¶
const ArtifactKindWorkspaceFileRef = "workspace_file_ref"
ArtifactKindWorkspaceFileRef marks durable file-reference records that should stay internal to the framework and not surface as child artifacts.
Variables ¶
This section is empty.
Functions ¶
func AssistantMessage ¶
AssistantMessage creates a plain assistant message without tool calls.
func AttachWriteThrough ¶
AttachWriteThrough subscribes to sess and saves every newly appended event to store immediately, rather than batching after the agent turn.
This is essential for long-horizon agents where a mid-turn crash would otherwise lose dozens of steps of work.
Returns a cancel function; call it to detach and release resources. Typically called just before the agent turn and deferred to cancel after.
cancel := session.AttachWriteThrough(ctx, sess, store) defer cancel() agent.Turn(ctx, sess)
func IsWorkspaceFileReferenceArtifact ¶
func IsWorkspaceFileReferenceArtifact(ref ArtifactRef) bool
IsWorkspaceFileReferenceArtifact reports whether ref is an internal framework record for a file identity seen during prompt construction.
func MarshalEventJSON ¶
MarshalEventJSON encodes an event using the durable session-log envelope.
func MetadataFromContext ¶
MetadataFromContext retrieves metadata from the context.
func RecordArtifact ¶
func RecordArtifact( ctx context.Context, sess *Session, data ArtifactRecordedData, ) error
RecordArtifact appends an artifact_recorded event for an existing durable artifact descriptor or external artifact reference.
func SystemMessage ¶
SystemMessage creates a provider-style system message. Durable session context should use ContextEntry via NewContext or AppendContext instead.
func ToolMessage ¶
ToolMessage creates a tool result message.
func UserMessage ¶
UserMessage creates a plain user message.
Types ¶
type ArtifactRecordedData ¶
type ArtifactRecordedData struct {
ChildID string `json:"child_id,omitzero"`
Artifact ArtifactRef `json:"artifact"`
SessionID string `json:"session_id,omitzero"`
}
ArtifactRecordedData records that a session or child run emitted an artifact.
type ArtifactRef ¶
type ArtifactRef = artifact.Descriptor
ArtifactRef identifies an artifact emitted by a session or child run. Storage, merge policy, and UI treatment are left to higher-level apps.
func StoreArtifact ¶
func StoreArtifact( ctx context.Context, sess *Session, store artifact.Store, data ArtifactRecordedData, body io.Reader, ) (ArtifactRef, error)
StoreArtifact persists an artifact body behind store, fills the common provenance defaults, and records the resulting descriptor in the session.
type ChildBlockedData ¶
type ChildBlockedData struct {
ChildID string `json:"child_id"`
ChildSessionID string `json:"child_session_id"`
Reason string `json:"reason"`
Metadata map[string]any `json:"metadata,omitzero"`
}
ChildBlockedData records that a child run cannot continue without input or approval.
type ChildCanceledData ¶
type ChildCanceledData struct {
ChildID string `json:"child_id"`
ChildSessionID string `json:"child_session_id"`
Reason string `json:"reason,omitzero"`
Metadata map[string]any `json:"metadata,omitzero"`
}
ChildCanceledData records that a child run was canceled.
type ChildCompletedData ¶
type ChildCompletedData struct {
ChildID string `json:"child_id"`
ChildSessionID string `json:"child_session_id"`
Summary string `json:"summary,omitzero"`
ArtifactIDs []string `json:"artifact_ids,omitzero"`
EpisodeID string `json:"episode_id,omitzero"`
Usage llm.Usage `json:"usage,omitzero"`
Metadata map[string]any `json:"metadata,omitzero"`
}
ChildCompletedData records the durable outcome of a completed child run.
type ChildFailedData ¶
type ChildFailedData struct {
ChildID string `json:"child_id"`
ChildSessionID string `json:"child_session_id"`
Error string `json:"error"`
Metadata map[string]any `json:"metadata,omitzero"`
}
ChildFailedData records that a child run failed.
type ChildMergedData ¶
type ChildMergedData struct {
ChildID string `json:"child_id"`
ChildSessionID string `json:"child_session_id"`
ArtifactIDs []string `json:"artifact_ids,omitzero"`
Note string `json:"note,omitzero"`
Metadata map[string]any `json:"metadata,omitzero"`
}
ChildMergedData records that an application merged a child outcome into parent flow.
type ChildProgressedData ¶
type ChildProgressedData struct {
ChildID string `json:"child_id"`
ChildSessionID string `json:"child_session_id"`
Status string `json:"status,omitzero"`
Message string `json:"message,omitzero"`
Metadata map[string]any `json:"metadata,omitzero"`
}
ChildProgressedData records an application-defined child status update.
type ChildRequestedData ¶
type ChildRequestedData struct {
ChildID string `json:"child_id"`
ChildSessionID string `json:"child_session_id"`
ParentEventID string `json:"parent_event_id,omitzero"`
AgentID string `json:"agent_id"`
Mode ChildMode `json:"mode"`
Task string `json:"task"`
Context string `json:"context,omitzero"`
Metadata map[string]any `json:"metadata,omitzero"`
}
ChildRequestedData records a parent request for a child run.
type ChildRunLog ¶
type ChildRunLog struct {
ChildID string `json:"child_id"`
SessionID string `json:"session_id"`
AgentID string `json:"agent_id"`
Mode ChildMode `json:"mode"`
Status ChildStatus `json:"status"`
Summary string `json:"summary,omitzero"`
Artifacts []ArtifactRef `json:"artifacts,omitzero"`
Run *RunLog `json:"run,omitzero"`
Metadata map[string]any `json:"metadata,omitzero"`
}
ChildRunLog records a child run linked from a parent session.
type ChildStartedData ¶
type ChildStartedData struct {
ChildID string `json:"child_id"`
ChildSessionID string `json:"child_session_id"`
AgentID string `json:"agent_id"`
Metadata map[string]any `json:"metadata,omitzero"`
}
ChildStartedData records that a child run has started execution.
type ChildStatus ¶
type ChildStatus string
ChildStatus captures the high-level lifecycle state of a child run.
const ( ChildStatusRequested ChildStatus = "requested" ChildStatusRunning ChildStatus = "running" ChildStatusBlocked ChildStatus = "blocked" ChildStatusCompleted ChildStatus = "completed" ChildStatusFailed ChildStatus = "failed" ChildStatusCanceled ChildStatus = "canceled" ChildStatusMerged ChildStatus = "merged" )
type CompactionSnapshot ¶
type CompactionSnapshot struct {
Strategy string `json:"strategy"`
MaxTokens int `json:"max_tokens,omitzero"`
ThresholdPct float64 `json:"threshold_pct,omitzero"`
CurrentTokens int `json:"current_tokens,omitzero"`
CutoffEventID string `json:"cutoff_event_id,omitzero"`
Entries []HistoryEntry `json:"entries,omitzero"`
Messages []llm.Message `json:"messages,omitzero"`
// ReadFiles tracks file paths the agent read during this compaction window.
ReadFiles []string `json:"read_files,omitzero"`
// ModifiedFiles tracks file paths the agent edited or wrote during this
// compaction window.
ModifiedFiles []string `json:"modified_files,omitzero"`
}
CompactionSnapshot captures the model-visible history after a compaction step.
type ContextEntry ¶
type ContextEntry struct {
Kind ContextKind `json:"kind,omitzero"`
Placement ContextPlacement `json:"placement,omitzero"`
Content string `json:"content"`
}
ContextEntry is durable, model-visible context. It is replayed into prompt history as ordinary user-role context, never as a system/developer message.
type ContextKind ¶
type ContextKind string
ContextKind identifies model-visible context that is not conversational transcript and must not be treated as privileged instructions.
const ( ContextKindGeneric ContextKind = "generic" ContextKindBootstrap ContextKind = "bootstrap" ContextKindHarness ContextKind = "harness" ContextKindSummary ContextKind = "summary" ContextKindWorkingSet ContextKind = "working_set" )
type ContextPlacement ¶
type ContextPlacement string
ContextPlacement controls where durable context is placed in the model-visible request relative to the conversational transcript.
const ( // ContextPlacementHistory replays context in the ordinary history suffix. ContextPlacementHistory ContextPlacement = "history" // ContextPlacementPrefix replays stable context before the transcript so // common prompt-cache prefixes survive ordinary turn growth. ContextPlacementPrefix ContextPlacement = "prefix" )
type Episode ¶
type Episode struct {
ID string `json:"id"`
SessionID string `json:"session_id"`
AgentID string `json:"agent_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Conclusion string `json:"conclusion"` // last assistant message without tool calls
Calls []EpisodeCall `json:"calls,omitzero"`
TotalCost float64 `json:"total_cost"`
Metadata map[string]any `json:"metadata,omitzero"`
}
Episode is a compressed record of a completed agent run. It captures only the signal: successful tool call pairs and the final conclusion. Orchestrators retrieve episodes from archival memory rather than full session logs, keeping coordination practical at scale.
type EpisodeCall ¶
type EpisodeCall struct {
Tool string `json:"tool"`
Args string `json:"args"`
Result string `json:"result"`
}
EpisodeCall is a single successful tool invocation captured in an Episode.
type EscalationRetriedData ¶
type EscalationRetriedData struct {
AgentID string `json:"agent_id"`
Scope string `json:"scope"`
Target string `json:"target,omitzero"`
Attempt int `json:"attempt"`
Error string `json:"error"`
}
EscalationRetriedData records a recoverable loop error that was withheld from the caller and retried internally.
type Event ¶
type Event struct {
ID ulid.ULID `json:"id"`
SessionID string `json:"session_id"`
Type EventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
Data jsontext.Value `json:"data"`
Metadata map[string]any `json:"metadata,omitzero"`
Cost float64 `json:"cost,omitzero"`
// contains filtered or unexported fields
}
Event is a single append-only fact in a session.
func NewArtifactRecordedEvent ¶
func NewArtifactRecordedEvent(sessionID string, data ArtifactRecordedData) Event
func NewChildBlockedEvent ¶
func NewChildBlockedEvent(sessionID string, data ChildBlockedData) Event
func NewChildCanceledEvent ¶
func NewChildCanceledEvent(sessionID string, data ChildCanceledData) Event
func NewChildCompletedEvent ¶
func NewChildCompletedEvent(sessionID string, data ChildCompletedData) Event
func NewChildFailedEvent ¶
func NewChildFailedEvent(sessionID string, data ChildFailedData) Event
func NewChildMergedEvent ¶
func NewChildMergedEvent(sessionID string, data ChildMergedData) Event
func NewChildProgressedEvent ¶
func NewChildProgressedEvent(sessionID string, data ChildProgressedData) Event
func NewChildRequestedEvent ¶
func NewChildRequestedEvent(sessionID string, data ChildRequestedData) Event
func NewChildStartedEvent ¶
func NewChildStartedEvent(sessionID string, data ChildStartedData) Event
func NewCompactionEvent ¶
func NewCompactionEvent(sessionID string, snapshot CompactionSnapshot) Event
NewCompactionEvent records a durable compaction snapshot in the session log.
func NewContext ¶
func NewContext(sessionID string, entry ContextEntry) Event
NewContext creates a context-added event.
func NewEscalationRetriedEvent ¶
func NewEscalationRetriedEvent(sessionID string, data EscalationRetriedData) Event
NewEscalationRetriedEvent records a recoverable retry inside the agent loop.
func NewMessage ¶
NewMessage creates a new message event.
func NewProjectionSnapshot ¶
func NewProjectionSnapshot(sessionID string, snapshot ProjectionSnapshot) Event
NewProjectionSnapshot records a durable projection snapshot in the session log.
func NewStepCompletedEvent ¶
func NewStepCompletedEvent(sessionID string, data StepCompletedData) Event
NewStepCompletedEvent records the durable end of a step.
func NewStepStartedEvent ¶
func NewStepStartedEvent(sessionID string, data StepStartedData) Event
NewStepStartedEvent records the durable start of a step.
func NewToolCompletedEvent ¶
func NewToolCompletedEvent(sessionID string, result ToolCompletedData) Event
NewToolCompletedEvent records the durable result of a tool execution.
func NewToolStartedEvent ¶
func NewToolStartedEvent(sessionID string, data ToolStartedData) Event
NewToolStartedEvent records the durable start of a tool call.
func NewTurnCompletedEvent ¶
func NewTurnCompletedEvent(sessionID string, data TurnCompletedData) Event
NewTurnCompletedEvent records the durable end of a turn.
func NewTurnStartedEvent ¶
func NewTurnStartedEvent(sessionID string, data TurnStartedData) Event
NewTurnStartedEvent records the durable start of a turn.
func NewUserMessage ¶
NewUserMessage creates a message-added event for a plain user message.
func NewWaitResolvedEvent ¶
NewWaitResolvedEvent records the resolution of an external wait.
func NewWaitStartedEvent ¶
NewWaitStartedEvent records the start of an external wait (HITL).
func UnmarshalEventJSON ¶
UnmarshalEventJSON decodes an event from the durable session-log envelope.
func (Event) ArtifactRecordedData ¶
func (e Event) ArtifactRecordedData() (ArtifactRecordedData, bool, error)
func (Event) ChildBlockedData ¶
func (e Event) ChildBlockedData() (ChildBlockedData, bool, error)
func (Event) ChildCanceledData ¶
func (e Event) ChildCanceledData() (ChildCanceledData, bool, error)
func (Event) ChildCompletedData ¶
func (e Event) ChildCompletedData() (ChildCompletedData, bool, error)
func (Event) ChildFailedData ¶
func (e Event) ChildFailedData() (ChildFailedData, bool, error)
func (Event) ChildMergedData ¶
func (e Event) ChildMergedData() (ChildMergedData, bool, error)
func (Event) ChildProgressedData ¶
func (e Event) ChildProgressedData() (ChildProgressedData, bool, error)
func (Event) ChildRequestedData ¶
func (e Event) ChildRequestedData() (ChildRequestedData, bool, error)
func (Event) ChildStartedData ¶
func (e Event) ChildStartedData() (ChildStartedData, bool, error)
func (Event) CompactionSnapshot ¶
func (e Event) CompactionSnapshot() (CompactionSnapshot, bool, error)
CompactionSnapshot decodes the payload of a compaction event.
func (Event) EscalationRetriedData ¶
func (e Event) EscalationRetriedData() (EscalationRetriedData, bool, error)
EscalationRetriedData decodes the payload of an escalation-retried event.
func (Event) ForkOrigin ¶
func (e Event) ForkOrigin() (ForkOrigin, bool, error)
ForkOrigin decodes the fork lineage metadata attached to a copied event.
func (Event) ProjectionSnapshot ¶
func (e Event) ProjectionSnapshot() (ProjectionSnapshot, bool, error)
ProjectionSnapshot decodes the payload of a projection snapshot event.
func (Event) StepCompletedData ¶
func (e Event) StepCompletedData() (StepCompletedData, bool, error)
StepCompletedData decodes the payload of a step-completed event.
func (Event) StepStartedData ¶
func (e Event) StepStartedData() (StepStartedData, bool, error)
StepStartedData decodes the payload of a step-started event.
func (Event) ToolCompletedData ¶
func (e Event) ToolCompletedData() (ToolCompletedData, bool, error)
ToolCompletedData decodes the payload of a tool-completed event.
func (Event) ToolStartedData ¶
func (e Event) ToolStartedData() (ToolStartedData, bool, error)
ToolStartedData decodes the payload of a tool-started event.
func (Event) TurnCompletedData ¶
func (e Event) TurnCompletedData() (TurnCompletedData, bool, error)
TurnCompletedData decodes the payload of a turn-completed event.
func (Event) TurnStartedData ¶
func (e Event) TurnStartedData() (TurnStartedData, bool, error)
TurnStartedData decodes the payload of a turn-started event.
func (Event) UnmarshalData ¶
UnmarshalData unmarshals the event's data into the given value.
type EventType ¶
type EventType string
EventType identifies the type of an event.
const ( MessageAdded EventType = "message_added" ContextAdded EventType = "context_added" Handoff EventType = "handoff" ExternalInput EventType = "external_input" // Observability / Lifecycle TurnStarted EventType = "turn_started" TurnCompleted EventType = "turn_completed" StepStarted EventType = "step_started" StepCompleted EventType = "step_completed" ToolStarted EventType = "tool_started" ToolCompleted EventType = "tool_completed" ApprovalRequested EventType = "approval_requested" ApprovalResolved EventType = "approval_resolved" ApprovalCanceled EventType = "approval_canceled" WaitStarted EventType = "wait_started" WaitResolved EventType = "wait_resolved" EscalationRetried EventType = "escalation_retried" CompactionTriggered EventType = "compaction_triggered" ProjectionSnapshotted EventType = "projection_snapshotted" ChildRequested EventType = "child_requested" ChildStarted EventType = "child_started" ChildProgressed EventType = "child_progressed" ChildBlocked EventType = "child_blocked" ChildCompleted EventType = "child_completed" ChildFailed EventType = "child_failed" ChildCanceled EventType = "child_canceled" ChildMerged EventType = "child_merged" ArtifactRecorded EventType = "artifact_recorded" // Framework Extensions ToolOutputDelta EventType = "tool_output_delta" )
type ForkOptions ¶
ForkOptions carries optional metadata for a forked session branch.
type ForkOrigin ¶
ForkOrigin identifies the parent event copied into a forked session.
type ForkStore ¶
type ForkStore interface {
ForkWithOptions(
ctx context.Context,
originalSessionID, newSessionID string,
opts ForkOptions,
) (*Session, error)
}
ForkStore materializes forked sessions from persisted parent session IDs, with optional ancestry metadata such as branch labels or fork reasons.
type HistoryEntry ¶
type HistoryEntry struct {
EventID string `json:"event_id,omitzero"`
EventType EventType `json:"event_type,omitzero"`
ContextKind ContextKind `json:"context_kind,omitzero"`
ContextPlacement ContextPlacement `json:"placement,omitzero"`
Message llm.Message `json:"message"`
Tool *ToolHistory `json:"tool,omitzero"`
}
HistoryEntry captures a model-visible message together with its originating message event ID when one exists.
type JSONLStore ¶
type JSONLStore struct {
// contains filtered or unexported fields
}
JSONLStore is a file-backed store that saves events as JSON lines.
func NewJSONLStore ¶
func NewJSONLStore(dir string) (*JSONLStore, error)
NewJSONLStore creates a new JSONL store rooted at dir.
func (*JSONLStore) BranchSession ¶
func (s *JSONLStore) BranchSession( _ context.Context, parent *Session, newID string, opts ForkOptions, ) (*Session, error)
BranchSession creates a persisted child branch from the current in-memory parent session, preserving copied history and ancestry metadata on disk.
func (*JSONLStore) Children ¶
func (s *JSONLStore) Children(ctx context.Context, sessionID string) ([]SessionAncestry, error)
Children lists the persisted ancestry records for direct children of sessionID.
func (*JSONLStore) Close ¶
func (s *JSONLStore) Close() error
Close releases the underlying filesystem root.
func (*JSONLStore) Fork ¶
Fork creates a new session by copying all events from an existing session.
func (*JSONLStore) ForkWithOptions ¶
func (s *JSONLStore) ForkWithOptions( ctx context.Context, originalID, newID string, opts ForkOptions, ) (*Session, error)
ForkWithOptions creates a new session by copying all events from an existing session and records session-level ancestry metadata in the JSONL index.
func (*JSONLStore) Lineage ¶
func (s *JSONLStore) Lineage(ctx context.Context, sessionID string) ([]SessionAncestry, error)
Lineage returns the root-to-current ancestry chain for sessionID.
func (*JSONLStore) LoadUntil ¶
func (s *JSONLStore) LoadUntil( ctx context.Context, sessionID string, eventID ulid.ULID, ) (*Session, error)
LoadUntil loads a session up to (and including) the given event ID.
func (*JSONLStore) Parent ¶
func (s *JSONLStore) Parent(ctx context.Context, sessionID string) (*SessionAncestry, error)
Parent returns the persisted ancestry record for the parent of sessionID.
func (*JSONLStore) Save ¶
func (s *JSONLStore) Save(ctx context.Context, e Event) error
Save appends an event to the session file.
func (*JSONLStore) SaveAncestry ¶
func (s *JSONLStore) SaveAncestry(_ context.Context, record SessionAncestry) error
SaveAncestry persists existing ancestry metadata for portable session imports.
type ProjectionSnapshot ¶
type ProjectionSnapshot = CompactionSnapshot
ProjectionSnapshot reuses the durable compaction snapshot payload shape for time/count rebuild checkpoints. Projection snapshots are a durable acceleration layer beside replay, not a separate mutable state model.
type ProjectionSnapshotter ¶
type ProjectionSnapshotter struct {
MaxEvents int
MaxAge time.Duration
Rebuilder *Rebuilder
Now func() time.Time
}
ProjectionSnapshotter appends durable rebuild checkpoints when a session has grown large enough by event count or wall-clock age.
func NewProjectionSnapshotter ¶
func NewProjectionSnapshotter() *ProjectionSnapshotter
NewProjectionSnapshotter creates a snapshotter with the default rebuilder.
func (*ProjectionSnapshotter) Snapshot ¶
Snapshot appends a durable projection snapshot unconditionally.
func (*ProjectionSnapshotter) SnapshotIfNeeded ¶
func (s *ProjectionSnapshotter) SnapshotIfNeeded( ctx context.Context, sess *Session, ) (bool, error)
SnapshotIfNeeded appends a durable projection snapshot when the configured count or age policy says the current session should be checkpointed.
type ProjectionTrigger ¶
type ProjectionTrigger string
ProjectionTrigger records why a projection snapshot was taken.
const ( ProjectionTriggerCount ProjectionTrigger = "count" ProjectionTriggerTime ProjectionTrigger = "time" ProjectionTriggerManual ProjectionTrigger = "manual" )
type PromptCacheData ¶
type PromptCacheData struct {
PrefixHash string `json:"prefix_hash,omitzero"`
ToolSchemaHash string `json:"tool_schema_hash,omitzero"`
}
PromptCacheData captures the stable pieces that should influence prefix-cache reuse.
type Rebuilder ¶
type Rebuilder struct {
FilesLimit int
}
Rebuilder standardizes how model-visible history is reconstructed after a durable compaction or projection snapshot. It keeps the append-only event log untouched and materializes a canonical prompt view from snapshot state plus later events.
func NewRebuilder ¶
func NewRebuilder() *Rebuilder
NewRebuilder creates a Rebuilder with default limits.
func (*Rebuilder) RebuildEntries ¶
func (r *Rebuilder) RebuildEntries(sess *Session) ([]HistoryEntry, error)
RebuildEntries returns the canonical model-visible history after compaction.
type ReplayOption ¶
type ReplayOption func(*Replayer)
ReplayOption configures a Replayer.
func WithReplayReducer ¶
func WithReplayReducer(reducer Reducer) ReplayOption
WithReplayReducer configures a reducer that is updated as events are replayed.
type Replayer ¶
type Replayer struct {
// contains filtered or unexported fields
}
Replayer reconstructs session state from an event stream.
func NewReplayer ¶
func NewReplayer(opts ...ReplayOption) *Replayer
NewReplayer creates a concrete event-log replayer for session reconstruction.
func (*Replayer) Apply ¶
Apply appends a replayed event to sess without triggering write-through or subscriber side effects.
func (*Replayer) NewSession ¶
NewSession creates an empty replay target session.
type RunLog ¶
type RunLog struct {
SessionID string `json:"session_id"`
AgentID string `json:"agent_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Turns []RunTurn `json:"turns"`
ChildRuns []ChildRunLog `json:"child_runs,omitzero"`
TotalCost float64 `json:"total_cost"`
Metadata map[string]any `json:"metadata,omitzero"`
}
RunLog represents a structured trace of an agent's execution. It is used for evaluation, reinforcement learning (RL) fine-tuning, and offline analysis.
type RunTurn ¶
type RunTurn struct {
TurnID string `json:"turn_id"`
Timestamp time.Time `json:"timestamp"`
Input []llm.Message `json:"input"`
InputEntries []HistoryEntry `json:"input_entries,omitzero"`
Output llm.Message `json:"output"`
ToolCalls []llm.Call `json:"tool_calls,omitzero"`
ToolResults []llm.Message `json:"tool_results,omitzero"`
Cost float64 `json:"cost"`
Metrics map[string]any `json:"metrics,omitzero"`
}
RunTurn represents a single perceive-decide-act-observe loop.
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
SQLiteStore is a persistent store that uses SQLite for durability and FTS5 for search.
func NewSQLiteStore ¶
func NewSQLiteStore(dsn string) (*SQLiteStore, error)
NewSQLiteStore creates a new SQLite store.
func (*SQLiteStore) BranchSession ¶
func (s *SQLiteStore) BranchSession( ctx context.Context, parent *Session, newID string, opts ForkOptions, ) (*Session, error)
BranchSession creates a persisted child branch from the current in-memory parent session, preserving copied history and ancestry metadata in SQLite.
func (*SQLiteStore) Children ¶
func (s *SQLiteStore) Children(ctx context.Context, sessionID string) ([]SessionAncestry, error)
Children lists the persisted ancestry records for direct children of sessionID.
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
Close closes the database connection.
func (*SQLiteStore) Fork ¶
Fork creates a new session by copying all events from an existing session.
func (*SQLiteStore) ForkWithOptions ¶
func (s *SQLiteStore) ForkWithOptions( ctx context.Context, originalID, newID string, opts ForkOptions, ) (*Session, error)
ForkWithOptions creates a new session by copying all events from an existing session and persists session-level ancestry metadata for the child.
func (*SQLiteStore) Lineage ¶
func (s *SQLiteStore) Lineage(ctx context.Context, sessionID string) ([]SessionAncestry, error)
Lineage returns the root-to-current ancestry chain for sessionID.
func (*SQLiteStore) LoadUntil ¶
func (s *SQLiteStore) LoadUntil( ctx context.Context, sessionID string, eventID ulid.ULID, ) (*Session, error)
LoadUntil loads a session up to (and including) the given event ID.
func (*SQLiteStore) Parent ¶
func (s *SQLiteStore) Parent(ctx context.Context, sessionID string) (*SessionAncestry, error)
Parent returns the persisted ancestry record for the parent of sessionID.
func (*SQLiteStore) Save ¶
func (s *SQLiteStore) Save(ctx context.Context, e Event) error
Save persists an event to the database.
func (*SQLiteStore) SaveAncestry ¶
func (s *SQLiteStore) SaveAncestry(ctx context.Context, record SessionAncestry) error
SaveAncestry persists existing ancestry metadata for portable session imports.
type SearchStore ¶
type SearchStore interface {
Search(ctx context.Context, sessionID string, query string) ([]Event, error)
}
SearchStore exposes full-text search over persisted session events. Not every store implements this capability.
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session is a durable container for a conversation. All state is derived from an append-only event log.
func (*Session) Append ¶
Append adds a new event to the session and notifies all subscribers. If a writer is attached, the event is persisted to the store immediately. If the context contains metadata, it is merged into the event's metadata.
func (*Session) AppendContext ¶
func (s *Session) AppendContext(ctx context.Context, entry ContextEntry) error
AppendContext appends durable model-visible context to the session.
func (*Session) AppendUser ¶
AppendUser appends a plain user message to the session.
func (*Session) Backward ¶
Backward returns an iterator over the full event log from newest to oldest.
func (*Session) Branch ¶
Branch creates a persisted child branch from the current in-memory parent session, including copied history and ancestry metadata.
func (*Session) EffectiveEntries ¶
func (s *Session) EffectiveEntries() ([]HistoryEntry, error)
EffectiveEntries returns the model-visible session history after applying the latest durable compaction or projection snapshot, together with the originating event ID for each message when known.
func (*Session) EffectiveMessages ¶
EffectiveMessages returns the model-visible session history after applying the latest durable compaction or projection snapshot, if any.
func (*Session) Fork ¶
Fork creates a new session with a new ID, copying all existing events from this session. The subscribers are not copied.
func (*Session) HasWatchers ¶
HasWatchers returns true if the session has any active Watch subscriptions.
func (*Session) IsWaiting ¶
IsWaiting returns true if the session is currently waiting for external input or approval (HITL).
func (*Session) LastAssistantMessage ¶
LastAssistantMessage returns the most recent assistant message without tool calls in the session, if any.
func (*Session) LastMessage ¶
LastMessage returns the most recent message in the session, if any.
func (*Session) Watch ¶
func (s *Session) Watch(ctx context.Context) *Subscription
Watch returns a live, buffered stream of events appended after this call.
Slow consumers drop events rather than blocking Append. Prefer this API over Subscribe when you need a live UI, indexer, or observer and call Close when you are done.
func (*Session) WithReducer ¶
WithReducer attaches a reducer to the session for state management.
func (*Session) WithWriter ¶
WithWriter attaches a writer to the session for write-through persistence.
type SessionAncestry ¶
type SessionAncestry struct {
SessionID string `json:"session_id"`
ParentSessionID string `json:"parent_session_id,omitzero"`
ForkPointEventID string `json:"fork_point_event_id,omitzero"`
BranchLabel string `json:"branch_label,omitzero"`
ForkReason string `json:"fork_reason,omitzero"`
Depth int `json:"depth"`
CreatedAt time.Time `json:"created_at"`
}
SessionAncestry records the tree relationship of a persisted session.
type SessionAncestryWriter ¶
type SessionAncestryWriter interface {
SaveAncestry(ctx context.Context, record SessionAncestry) error
}
SessionAncestryWriter persists existing ancestry metadata for portable session imports.
type SessionBranchStore ¶
type SessionBranchStore interface {
BranchSession(
ctx context.Context,
parent *Session,
newSessionID string,
opts ForkOptions,
) (*Session, error)
}
SessionBranchStore materializes a persisted child branch from the current in-memory parent session.
type SessionTreeStore ¶
type SessionTreeStore interface {
Parent(ctx context.Context, sessionID string) (*SessionAncestry, error)
Children(ctx context.Context, sessionID string) ([]SessionAncestry, error)
Lineage(ctx context.Context, sessionID string) ([]SessionAncestry, error)
}
SessionTreeStore exposes persisted session-tree queries.
type StepCompletedData ¶
type StepCompletedData struct {
AgentID string `json:"agent_id"`
Usage llm.Usage `json:"usage,omitzero"`
Error string `json:"error,omitzero"`
}
StepCompletedData records the durable end of a step.
type StepStartedData ¶
type StepStartedData struct {
AgentID string `json:"agent_id"`
Model string `json:"model"`
PromptCache PromptCacheData `json:"prompt_cache,omitzero"`
}
StepStartedData records the durable start of a step.
type Store ¶
type Store interface {
Save(ctx context.Context, e Event) error
Load(ctx context.Context, sessionID string) (*Session, error)
// LoadUntil loads a session up to (and including) the given event ID.
LoadUntil(ctx context.Context, sessionID string, eventID ulid.ULID) (*Session, error)
// Fork creates a new session by copying all events from an existing session.
Fork(ctx context.Context, originalSessionID, newSessionID string) (*Session, error)
}
Store is an interface for persisting session state.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is a live, lossy watch over newly appended session events.
Close is idempotent and should be called when the caller is done consuming events. The runtime also attaches a best-effort cleanup so abandoned subscriptions do not permanently retain session subscribers, but callers must not rely on GC for timely cleanup.
func (*Subscription) Close ¶
func (s *Subscription) Close()
Close detaches the subscription and closes its event channel.
func (*Subscription) Events ¶
func (s *Subscription) Events() <-chan Event
Events returns the live event channel for this subscription.
type ToolCompletedData ¶
type ToolCompletedData struct {
Tool string `json:"tool"`
ID string `json:"id"`
IdempotencyKey string `json:"idempotency_key,omitzero"`
Output string `json:"output,omitzero"`
Error string `json:"error,omitzero"`
}
ToolCompletedData captures the durable outcome of a completed tool call.
type ToolExecutionRecord ¶
type ToolExecutionRecord struct {
Started ToolStartedData
Completed ToolCompletedData
}
ToolExecutionRecord summarizes durable tool lifecycle facts for one idempotency key.
func FindToolExecutionByKey ¶
func FindToolExecutionByKey( s *Session, idempotencyKey string, ) (ToolExecutionRecord, bool, error)
FindToolExecutionByKey looks up the most recent durable tool lifecycle facts for an idempotency key.
type ToolHistory ¶
type ToolHistory struct {
ID string `json:"id,omitzero"`
Name string `json:"name,omitzero"`
Arguments string `json:"args,omitzero"`
IdempotencyKey string `json:"idempotency_key,omitzero"`
IsError bool `json:"is_error,omitzero"`
Error string `json:"error,omitzero"`
}
ToolHistory carries durable tool lifecycle metadata associated with a model-visible tool-result message. It is projection metadata for hosts and UIs; provider-visible prompt construction still uses Message.
type ToolStartedData ¶
type ToolStartedData struct {
Tool string `json:"tool"`
Arguments string `json:"args"`
ID string `json:"id"`
IdempotencyKey string `json:"idempotency_key,omitzero"`
}
ToolStartedData records the durable start of a tool call.
type TurnCompletedData ¶
type TurnCompletedData struct {
AgentID string `json:"agent_id"`
Steps int `json:"steps"`
Usage llm.Usage `json:"usage,omitzero"`
TurnStopReason string `json:"turn_stop_reason,omitzero"`
Error string `json:"error,omitzero"`
}
TurnCompletedData records the durable end of a turn.
type TurnStartedData ¶
type TurnStartedData struct {
AgentID string `json:"agent_id"`
}
TurnStartedData records the durable start of a turn.
Source Files
¶
- ancestry.go
- append.go
- artifact.go
- codec.go
- context.go
- doc.go
- episode.go
- event.go
- fork.go
- history.go
- jsonl.go
- jsonl_ancestry.go
- jsonl_branch.go
- jsonl_events.go
- lifecycle.go
- message.go
- metadata.go
- projection.go
- query.go
- rebuilder.go
- rebuilder_context.go
- rebuilder_entries.go
- rebuilder_tools.go
- replayer.go
- session.go
- sqlite.go
- sqlite_ancestry.go
- sqlite_branch.go
- sqlite_load.go
- sqlite_search.go
- store.go
- subagents.go
- subscription.go
- tools.go
- trajectory.go
- trajectory_export.go
- trajectory_tree.go
- writethrough.go