session

package
v0.0.0-...-3613e4e Latest
Warning

This package is not in the latest version of its module.

Published: May 14, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package session provides Canto's durable append-only conversation log.

A Session stores events as immutable facts and derives higher-level views from that log:

  • Replayer reconstructs a Session from an event stream without re-emitting writer or subscriber side effects.
  • Rebuilder reconstructs the canonical post-compaction or projection prompt view from durable snapshots plus later appended events.
  • MessageAdded events form the conversational transcript. ContextAdded events carry model-visible context and are replayed as ordinary user-role context rather than as privileged instructions. Stable ContextAdded entries can be placed before the transcript for prompt-cache reuse. Lifecycle, handoff, approval, and audit events are durable but hidden from prompt history by default.
  • ProjectionSnapshotter appends durable rebuild checkpoints on time/count policies so the rebuilder can fast-path cold starts and long transcripts.
  • Messages returns the raw transcript exactly as messages were emitted.
  • EffectiveMessages returns the model-visible history after durable compaction snapshots are applied.
  • EffectiveEntries returns the same model-visible history together with originating event IDs and context markers when known.
  • Artifact events carry durable artifact descriptors and provenance rather than embedding artifact bodies directly in the log.
  • RecordArtifact and StoreArtifact provide the standard path for recording external descriptors or persisting new artifact bodies while emitting artifact_recorded events.

Forked sessions preserve lineage by minting fresh event IDs and recording fork_origin metadata that points back to the parent session and event. Compaction and projection snapshots are persisted as append-only events, so replay and resumed runs see the same effective prompt history the model saw.

Stores may also expose first-class session-tree queries through SessionTreeStore, so callers can navigate parent/child/lineage relationships without scanning copied event payloads.
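
The difference between the raw transcript and the effective, post-compaction history can be illustrated with a small self-contained sketch. It does not use the package's types: event, rawMessages, and effectiveMessages are hypothetical stand-ins, and the rule shown (start from the most recent snapshot, then append messages recorded after it) is an assumption about how such a view can be derived, not a description of Canto's implementation.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a log entry; it is not session.Event.
type event struct {
	kind     string   // "message" or "compaction"
	text     string   // set for message events
	snapshot []string // set for compaction events: the compacted history
}

// rawMessages returns every message exactly as it was appended.
func rawMessages(log []event) []string {
	var out []string
	for _, e := range log {
		if e.kind == "message" {
			out = append(out, e.text)
		}
	}
	return out
}

// effectiveMessages starts from the latest compaction snapshot, if any, and
// appends the messages recorded after it. The log itself is never mutated.
func effectiveMessages(log []event) []string {
	start := 0
	var out []string
	for i, e := range log {
		if e.kind == "compaction" {
			start = i + 1
			out = append([]string(nil), e.snapshot...)
		}
	}
	for _, e := range log[start:] {
		if e.kind == "message" {
			out = append(out, e.text)
		}
	}
	return out
}

func main() {
	log := []event{
		{kind: "message", text: "hello"},
		{kind: "message", text: "long answer"},
		{kind: "compaction", snapshot: []string{"summary of earlier turns"}},
		{kind: "message", text: "follow-up"},
	}
	fmt.Println(rawMessages(log))       // every message, in append order
	fmt.Println(effectiveMessages(log)) // snapshot contents, then the follow-up
}
```

Because the snapshot is itself an appended event, replaying the same log always yields the same effective view.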

Constants

const ArtifactKindWorkspaceFileRef = "workspace_file_ref"

ArtifactKindWorkspaceFileRef marks durable file-reference records that should stay internal to the framework and not surface as child artifacts.

Variables

This section is empty.

Functions

func AssistantMessage

func AssistantMessage(content string) llm.Message

AssistantMessage creates a plain assistant message without tool calls.

func AttachWriteThrough

func AttachWriteThrough(ctx context.Context, sess *Session, store Store) func()

AttachWriteThrough subscribes to sess and saves every newly appended event to store immediately, rather than batching after the agent turn.

This is essential for long-horizon agents where a mid-turn crash would otherwise lose dozens of steps of work.

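AttachWriteThrough returns a cancel function; call it to detach the subscription and release resources. Typically, attach just before the agent turn and defer the cancel call:

// Persist each event as it is appended, for the duration of this turn.
cancel := session.AttachWriteThrough(ctx, sess, store)
defer cancel() // detach and release resources when the turn returns
agent.Turn(ctx, sess)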

func IsWorkspaceFileReferenceArtifact

func IsWorkspaceFileReferenceArtifact(ref ArtifactRef) bool

IsWorkspaceFileReferenceArtifact reports whether ref is an internal framework record for a file identity seen during prompt construction.

func MarshalEventJSON

func MarshalEventJSON(e Event) ([]byte, error)

MarshalEventJSON encodes an event using the durable session-log envelope.

func MetadataFromContext

func MetadataFromContext(ctx context.Context) map[string]any

MetadataFromContext retrieves metadata from the context.

func RecordArtifact

func RecordArtifact(
	ctx context.Context,
	sess *Session,
	data ArtifactRecordedData,
) error

RecordArtifact appends an artifact_recorded event for an existing durable artifact descriptor or external artifact reference.

func SystemMessage

func SystemMessage(content string) llm.Message

SystemMessage creates a provider-style system message. Durable session context should use ContextEntry via NewContext or AppendContext instead.

func ToolMessage

func ToolMessage(name, toolID, content string) llm.Message

ToolMessage creates a tool result message.

func UserMessage

func UserMessage(content string) llm.Message

UserMessage creates a plain user message.

func WithMetadata

func WithMetadata(ctx context.Context, md map[string]any) context.Context

WithMetadata attaches metadata to the context. This metadata will be automatically added to all events appended to a session using this context.

Types

type ArtifactRecordedData

type ArtifactRecordedData struct {
	ChildID   string      `json:"child_id,omitzero"`
	Artifact  ArtifactRef `json:"artifact"`
	SessionID string      `json:"session_id,omitzero"`
}

ArtifactRecordedData records that a session or child run emitted an artifact.

type ArtifactRef

type ArtifactRef = artifact.Descriptor

ArtifactRef identifies an artifact emitted by a session or child run. Storage, merge policy, and UI treatment are left to higher-level apps.

func StoreArtifact

func StoreArtifact(
	ctx context.Context,
	sess *Session,
	store artifact.Store,
	data ArtifactRecordedData,
	body io.Reader,
) (ArtifactRef, error)

StoreArtifact persists an artifact body behind store, fills the common provenance defaults, and records the resulting descriptor in the session.

type ChildBlockedData

type ChildBlockedData struct {
	ChildID        string         `json:"child_id"`
	ChildSessionID string         `json:"child_session_id"`
	Reason         string         `json:"reason"`
	Metadata       map[string]any `json:"metadata,omitzero"`
}

ChildBlockedData records that a child run cannot continue without input or approval.

type ChildCanceledData

type ChildCanceledData struct {
	ChildID        string         `json:"child_id"`
	ChildSessionID string         `json:"child_session_id"`
	Reason         string         `json:"reason,omitzero"`
	Metadata       map[string]any `json:"metadata,omitzero"`
}

ChildCanceledData records that a child run was canceled.

type ChildCompletedData

type ChildCompletedData struct {
	ChildID        string         `json:"child_id"`
	ChildSessionID string         `json:"child_session_id"`
	Summary        string         `json:"summary,omitzero"`
	ArtifactIDs    []string       `json:"artifact_ids,omitzero"`
	EpisodeID      string         `json:"episode_id,omitzero"`
	Usage          llm.Usage      `json:"usage,omitzero"`
	Metadata       map[string]any `json:"metadata,omitzero"`
}

ChildCompletedData records the durable outcome of a completed child run.

type ChildFailedData

type ChildFailedData struct {
	ChildID        string         `json:"child_id"`
	ChildSessionID string         `json:"child_session_id"`
	Error          string         `json:"error"`
	Metadata       map[string]any `json:"metadata,omitzero"`
}

ChildFailedData records that a child run failed.

type ChildMergedData

type ChildMergedData struct {
	ChildID        string         `json:"child_id"`
	ChildSessionID string         `json:"child_session_id"`
	ArtifactIDs    []string       `json:"artifact_ids,omitzero"`
	Note           string         `json:"note,omitzero"`
	Metadata       map[string]any `json:"metadata,omitzero"`
}

ChildMergedData records that an application merged a child outcome into parent flow.

type ChildMode

type ChildMode string

ChildMode identifies how a child session was created.

const (
	ChildModeFork    ChildMode = "fork"
	ChildModeHandoff ChildMode = "handoff"
	ChildModeFresh   ChildMode = "fresh"
)

type ChildProgressedData

type ChildProgressedData struct {
	ChildID        string         `json:"child_id"`
	ChildSessionID string         `json:"child_session_id"`
	Status         string         `json:"status,omitzero"`
	Message        string         `json:"message,omitzero"`
	Metadata       map[string]any `json:"metadata,omitzero"`
}

ChildProgressedData records an application-defined child status update.

type ChildRequestedData

type ChildRequestedData struct {
	ChildID         string         `json:"child_id"`
	ChildSessionID  string         `json:"child_session_id"`
	ParentEventID   string         `json:"parent_event_id,omitzero"`
	AgentID         string         `json:"agent_id"`
	Mode            ChildMode      `json:"mode"`
	Task            string         `json:"task"`
	Context         string         `json:"context,omitzero"`
	SharedPrefixKey string         `json:"shared_prefix_key,omitzero"`
	Metadata        map[string]any `json:"metadata,omitzero"`
}

ChildRequestedData records a parent request for a child run.

type ChildRunLog

type ChildRunLog struct {
	ChildID   string         `json:"child_id"`
	SessionID string         `json:"session_id"`
	AgentID   string         `json:"agent_id"`
	Mode      ChildMode      `json:"mode"`
	Status    ChildStatus    `json:"status"`
	Summary   string         `json:"summary,omitzero"`
	Artifacts []ArtifactRef  `json:"artifacts,omitzero"`
	Run       *RunLog        `json:"run,omitzero"`
	Metadata  map[string]any `json:"metadata,omitzero"`
}

ChildRunLog records a child run linked from a parent session.

type ChildStartedData

type ChildStartedData struct {
	ChildID        string         `json:"child_id"`
	ChildSessionID string         `json:"child_session_id"`
	AgentID        string         `json:"agent_id"`
	Metadata       map[string]any `json:"metadata,omitzero"`
}

ChildStartedData records that a child run has started execution.

type ChildStatus

type ChildStatus string

ChildStatus captures the high-level lifecycle state of a child run.

const (
	ChildStatusRequested ChildStatus = "requested"
	ChildStatusRunning   ChildStatus = "running"
	ChildStatusBlocked   ChildStatus = "blocked"
	ChildStatusCompleted ChildStatus = "completed"
	ChildStatusFailed    ChildStatus = "failed"
	ChildStatusCanceled  ChildStatus = "canceled"
	ChildStatusMerged    ChildStatus = "merged"
)

type CompactionSnapshot

type CompactionSnapshot struct {
	Strategy      string         `json:"strategy"`
	MaxTokens     int            `json:"max_tokens,omitzero"`
	ThresholdPct  float64        `json:"threshold_pct,omitzero"`
	CurrentTokens int            `json:"current_tokens,omitzero"`
	CutoffEventID string         `json:"cutoff_event_id,omitzero"`
	Entries       []HistoryEntry `json:"entries,omitzero"`
	Messages      []llm.Message  `json:"messages,omitzero"`
	// ReadFiles tracks file paths the agent read during this compaction window.
	ReadFiles []string `json:"read_files,omitzero"`
	// ModifiedFiles tracks file paths the agent edited or wrote during this
	// compaction window.
	ModifiedFiles []string `json:"modified_files,omitzero"`
}

CompactionSnapshot captures the model-visible history after a compaction step.

type ContextEntry

type ContextEntry struct {
	Kind      ContextKind      `json:"kind,omitzero"`
	Placement ContextPlacement `json:"placement,omitzero"`
	Content   string           `json:"content"`
}

ContextEntry is durable, model-visible context. It is replayed into prompt history as ordinary user-role context, never as a system/developer message.

type ContextKind

type ContextKind string

ContextKind identifies model-visible context that is not conversational transcript and must not be treated as privileged instructions.

const (
	ContextKindGeneric    ContextKind = "generic"
	ContextKindBootstrap  ContextKind = "bootstrap"
	ContextKindHarness    ContextKind = "harness"
	ContextKindSummary    ContextKind = "summary"
	ContextKindWorkingSet ContextKind = "working_set"
)

type ContextPlacement

type ContextPlacement string

ContextPlacement controls where durable context is placed in the model-visible request relative to the conversational transcript.

const (
	// ContextPlacementHistory replays context in the ordinary history suffix.
	ContextPlacementHistory ContextPlacement = "history"
	// ContextPlacementPrefix replays stable context before the transcript so
	// common prompt-cache prefixes survive ordinary turn growth.
	ContextPlacementPrefix ContextPlacement = "prefix"
)
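
The effect of the two placements can be sketched as a stable partition: prefix-placed entries move ahead of everything else, and relative order is preserved within each group. The types and the orderForPrompt function below are hypothetical stand-ins, and treating an empty placement as "history" is an assumption.

```go
package main

import "fmt"

type placement string

const (
	placementHistory placement = "history"
	placementPrefix  placement = "prefix"
)

// entry is a hypothetical model-visible item: durable context or transcript.
type entry struct {
	placement placement
	content   string
}

// orderForPrompt places prefix entries first and keeps the original relative
// order inside both the prefix group and the remaining history.
func orderForPrompt(entries []entry) []entry {
	out := make([]entry, 0, len(entries))
	for _, e := range entries {
		if e.placement == placementPrefix {
			out = append(out, e)
		}
	}
	for _, e := range entries {
		if e.placement != placementPrefix {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	ordered := orderForPrompt([]entry{
		{placementHistory, "user: hi"},
		{placementPrefix, "project conventions"},
		{placementHistory, "assistant: hello"},
		{placementPrefix, "tool guide"},
	})
	for _, e := range ordered {
		fmt.Println(e.placement, e.content)
	}
}
```

Keeping the prefix group stable across turns is what lets a provider-side prompt cache reuse the leading tokens as the transcript grows.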

type Episode

type Episode struct {
	ID         string         `json:"id"`
	SessionID  string         `json:"session_id"`
	AgentID    string         `json:"agent_id"`
	StartTime  time.Time      `json:"start_time"`
	EndTime    time.Time      `json:"end_time"`
	Conclusion string         `json:"conclusion"` // last assistant message without tool calls
	Calls      []EpisodeCall  `json:"calls,omitzero"`
	TotalCost  float64        `json:"total_cost"`
	Metadata   map[string]any `json:"metadata,omitzero"`
}

Episode is a compressed record of a completed agent run. It captures only the signal: successful tool call pairs and the final conclusion. Orchestrators retrieve episodes from archival memory rather than full session logs, keeping coordination practical at scale.

func Distill

func Distill(traj *RunLog) *Episode

Distill compresses a RunLog into an Episode by extracting only the signal: successful tool call pairs and the final textual conclusion.

func (*Episode) Text

func (ep *Episode) Text() string

Text returns the searchable text for this Episode: conclusion followed by tool names. Used as FTS5 content when storing in memory.

type EpisodeCall

type EpisodeCall struct {
	Tool   string `json:"tool"`
	Args   string `json:"args"`
	Result string `json:"result"`
}

EpisodeCall is a single successful tool invocation captured in an Episode.
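
As a rough illustration of what distilling a run means, the sketch below keeps only successful tool calls and the last assistant output that carried no tool calls, then builds searchable text from the conclusion and tool names. All types here are hypothetical simplifications of RunLog, Episode, and EpisodeCall. The Failed flag and the space-separated text format are assumptions, since this documentation does not say how success is detected or how Text joins its parts.

```go
package main

import (
	"fmt"
	"strings"
)

// call is a hypothetical tool invocation with an explicit failure flag.
type call struct {
	Tool, Args, Result string
	Failed             bool
}

// turn is a hypothetical run step: assistant output plus any tool calls.
type turn struct {
	Output string
	Calls  []call
}

// episode is a hypothetical compressed record of a run.
type episode struct {
	Conclusion string
	Calls      []call
}

// distill keeps successful calls and the last output without tool calls.
func distill(turns []turn) episode {
	var ep episode
	for _, t := range turns {
		if len(t.Calls) == 0 && t.Output != "" {
			ep.Conclusion = t.Output // later turns overwrite earlier ones
		}
		for _, c := range t.Calls {
			if !c.Failed {
				ep.Calls = append(ep.Calls, c)
			}
		}
	}
	return ep
}

// text returns the conclusion followed by the names of the kept tools.
func (ep episode) text() string {
	parts := []string{ep.Conclusion}
	for _, c := range ep.Calls {
		parts = append(parts, c.Tool)
	}
	return strings.Join(parts, " ")
}

func main() {
	ep := distill([]turn{
		{Calls: []call{{Tool: "read_file", Args: "a.go", Result: "ok"}}},
		{Calls: []call{{Tool: "run_tests", Failed: true}}},
		{Output: "The bug is fixed."},
	})
	fmt.Println(ep.text())
}
```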

type EscalationRetriedData

type EscalationRetriedData struct {
	AgentID string `json:"agent_id"`
	Scope   string `json:"scope"`
	Target  string `json:"target,omitzero"`
	Attempt int    `json:"attempt"`
	Error   string `json:"error"`
}

EscalationRetriedData records a recoverable loop error that was withheld from the caller and retried internally.

type Event

type Event struct {
	ID        ulid.ULID      `json:"id"`
	SessionID string         `json:"session_id"`
	Type      EventType      `json:"type"`
	Timestamp time.Time      `json:"timestamp"`
	Data      jsontext.Value `json:"data"`
	Metadata  map[string]any `json:"metadata,omitzero"`
	Cost      float64        `json:"cost,omitzero"`
	// contains filtered or unexported fields
}

Event is a single append-only fact in a session.

func NewArtifactRecordedEvent

func NewArtifactRecordedEvent(sessionID string, data ArtifactRecordedData) Event

func NewChildBlockedEvent

func NewChildBlockedEvent(sessionID string, data ChildBlockedData) Event

func NewChildCanceledEvent

func NewChildCanceledEvent(sessionID string, data ChildCanceledData) Event

func NewChildCompletedEvent

func NewChildCompletedEvent(sessionID string, data ChildCompletedData) Event

func NewChildFailedEvent

func NewChildFailedEvent(sessionID string, data ChildFailedData) Event

func NewChildMergedEvent

func NewChildMergedEvent(sessionID string, data ChildMergedData) Event

func NewChildProgressedEvent

func NewChildProgressedEvent(sessionID string, data ChildProgressedData) Event

func NewChildRequestedEvent

func NewChildRequestedEvent(sessionID string, data ChildRequestedData) Event

func NewChildStartedEvent

func NewChildStartedEvent(sessionID string, data ChildStartedData) Event

func NewCompactionEvent

func NewCompactionEvent(sessionID string, snapshot CompactionSnapshot) Event

NewCompactionEvent records a durable compaction snapshot in the session log.

func NewContext

func NewContext(sessionID string, entry ContextEntry) Event

NewContext creates a context-added event.

func NewEscalationRetriedEvent

func NewEscalationRetriedEvent(sessionID string, data EscalationRetriedData) Event

NewEscalationRetriedEvent records a recoverable retry inside the agent loop.

func NewEvent

func NewEvent(sessionID string, eventType EventType, data any) Event

NewEvent creates a new event with a unique ID and current timestamp.

func NewMessage

func NewMessage(sessionID string, msg llm.Message) Event

NewMessage creates a new message event.

func NewProjectionSnapshot

func NewProjectionSnapshot(sessionID string, snapshot ProjectionSnapshot) Event

NewProjectionSnapshot records a durable projection snapshot in the session log.

func NewStepCompletedEvent

func NewStepCompletedEvent(sessionID string, data StepCompletedData) Event

NewStepCompletedEvent records the durable end of a step.

func NewStepStartedEvent

func NewStepStartedEvent(sessionID string, data StepStartedData) Event

NewStepStartedEvent records the durable start of a step.

func NewToolCompletedEvent

func NewToolCompletedEvent(sessionID string, result ToolCompletedData) Event

NewToolCompletedEvent records the durable result of a tool execution.

func NewToolStartedEvent

func NewToolStartedEvent(sessionID string, data ToolStartedData) Event

NewToolStartedEvent records the durable start of a tool call.

func NewTurnCompletedEvent

func NewTurnCompletedEvent(sessionID string, data TurnCompletedData) Event

NewTurnCompletedEvent records the durable end of a turn.

func NewTurnStartedEvent

func NewTurnStartedEvent(sessionID string, data TurnStartedData) Event

NewTurnStartedEvent records the durable start of a turn.

func NewUserMessage

func NewUserMessage(sessionID string, content string) Event

NewUserMessage creates a message-added event for a plain user message.

func NewWaitResolvedEvent

func NewWaitResolvedEvent(sessionID string, data WaitData) Event

NewWaitResolvedEvent records the resolution of an external wait.

func NewWaitStartedEvent

func NewWaitStartedEvent(sessionID string, data WaitData) Event

NewWaitStartedEvent records the start of an external wait (human-in-the-loop, HITL).

func UnmarshalEventJSON

func UnmarshalEventJSON(data []byte) (Event, error)

UnmarshalEventJSON decodes an event from the durable session-log envelope.

func (Event) ArtifactRecordedData

func (e Event) ArtifactRecordedData() (ArtifactRecordedData, bool, error)

func (Event) ChildBlockedData

func (e Event) ChildBlockedData() (ChildBlockedData, bool, error)

func (Event) ChildCanceledData

func (e Event) ChildCanceledData() (ChildCanceledData, bool, error)

func (Event) ChildCompletedData

func (e Event) ChildCompletedData() (ChildCompletedData, bool, error)

func (Event) ChildFailedData

func (e Event) ChildFailedData() (ChildFailedData, bool, error)

func (Event) ChildMergedData

func (e Event) ChildMergedData() (ChildMergedData, bool, error)

func (Event) ChildProgressedData

func (e Event) ChildProgressedData() (ChildProgressedData, bool, error)

func (Event) ChildRequestedData

func (e Event) ChildRequestedData() (ChildRequestedData, bool, error)

func (Event) ChildStartedData

func (e Event) ChildStartedData() (ChildStartedData, bool, error)

func (Event) CompactionSnapshot

func (e Event) CompactionSnapshot() (CompactionSnapshot, bool, error)

CompactionSnapshot decodes the payload of a compaction event.

func (Event) EscalationRetriedData

func (e Event) EscalationRetriedData() (EscalationRetriedData, bool, error)

EscalationRetriedData decodes the payload of an escalation-retried event.

func (Event) ForkOrigin

func (e Event) ForkOrigin() (ForkOrigin, bool, error)

ForkOrigin decodes the fork lineage metadata attached to a copied event.

func (Event) ProjectionSnapshot

func (e Event) ProjectionSnapshot() (ProjectionSnapshot, bool, error)

ProjectionSnapshot decodes the payload of a projection snapshot event.

func (Event) StepCompletedData

func (e Event) StepCompletedData() (StepCompletedData, bool, error)

StepCompletedData decodes the payload of a step-completed event.

func (Event) StepStartedData

func (e Event) StepStartedData() (StepStartedData, bool, error)

StepStartedData decodes the payload of a step-started event.

func (Event) ToolCompletedData

func (e Event) ToolCompletedData() (ToolCompletedData, bool, error)

ToolCompletedData decodes the payload of a tool-completed event.

func (Event) ToolStartedData

func (e Event) ToolStartedData() (ToolStartedData, bool, error)

ToolStartedData decodes the payload of a tool-started event.

func (Event) TurnCompletedData

func (e Event) TurnCompletedData() (TurnCompletedData, bool, error)

TurnCompletedData decodes the payload of a turn-completed event.

func (Event) TurnStartedData

func (e Event) TurnStartedData() (TurnStartedData, bool, error)

TurnStartedData decodes the payload of a turn-started event.

func (Event) UnmarshalData

func (e Event) UnmarshalData(v any) error

UnmarshalData unmarshals the event's data into the given value.

func (Event) WaitData

func (e Event) WaitData() (WaitData, bool, error)

WaitData decodes the payload of a wait-started or wait-resolved event.

type EventType

type EventType string

EventType identifies the type of an event.

const (
	MessageAdded  EventType = "message_added"
	ContextAdded  EventType = "context_added"
	Handoff       EventType = "handoff"
	ExternalInput EventType = "external_input"

	// Observability / Lifecycle
	TurnStarted           EventType = "turn_started"
	TurnCompleted         EventType = "turn_completed"
	StepStarted           EventType = "step_started"
	StepCompleted         EventType = "step_completed"
	ToolStarted           EventType = "tool_started"
	ToolCompleted         EventType = "tool_completed"
	ApprovalRequested     EventType = "approval_requested"
	ApprovalResolved      EventType = "approval_resolved"
	ApprovalCanceled      EventType = "approval_canceled"
	WaitStarted           EventType = "wait_started"
	WaitResolved          EventType = "wait_resolved"
	EscalationRetried     EventType = "escalation_retried"
	CompactionTriggered   EventType = "compaction_triggered"
	ProjectionSnapshotted EventType = "projection_snapshotted"
	ChildRequested        EventType = "child_requested"
	ChildStarted          EventType = "child_started"
	ChildProgressed       EventType = "child_progressed"
	ChildBlocked          EventType = "child_blocked"
	ChildCompleted        EventType = "child_completed"
	ChildFailed           EventType = "child_failed"
	ChildCanceled         EventType = "child_canceled"
	ChildMerged           EventType = "child_merged"
	ArtifactRecorded      EventType = "artifact_recorded"

	// Framework Extensions
	ToolOutputDelta EventType = "tool_output_delta"
)

type ForkOptions

type ForkOptions struct {
	BranchLabel string
	ForkReason  string
}

ForkOptions carries optional metadata for a forked session branch.

type ForkOrigin

type ForkOrigin struct {
	SessionID string `json:"session_id"`
	EventID   string `json:"event_id"`
}

ForkOrigin identifies the parent event copied into a forked session.

type ForkStore

type ForkStore interface {
	ForkWithOptions(
		ctx context.Context,
		originalSessionID, newSessionID string,
		opts ForkOptions,
	) (*Session, error)
}

ForkStore materializes forked sessions from persisted parent session IDs, with optional ancestry metadata such as branch labels or fork reasons.

type HistoryEntry

type HistoryEntry struct {
	EventID          string           `json:"event_id,omitzero"`
	EventType        EventType        `json:"event_type,omitzero"`
	ContextKind      ContextKind      `json:"context_kind,omitzero"`
	ContextPlacement ContextPlacement `json:"placement,omitzero"`
	Message          llm.Message      `json:"message"`
	Tool             *ToolHistory     `json:"tool,omitzero"`
}

HistoryEntry captures a model-visible message together with its originating message event ID when one exists.

type JSONLStore

type JSONLStore struct {
	// contains filtered or unexported fields
}

JSONLStore is a file-backed store that saves events as JSON lines.

func NewJSONLStore

func NewJSONLStore(dir string) (*JSONLStore, error)

NewJSONLStore creates a new JSONL store rooted at dir.

func (*JSONLStore) BranchSession

func (s *JSONLStore) BranchSession(
	_ context.Context,
	parent *Session,
	newID string,
	opts ForkOptions,
) (*Session, error)

BranchSession creates a persisted child branch from the current in-memory parent session, preserving copied history and ancestry metadata on disk.

func (*JSONLStore) Children

func (s *JSONLStore) Children(ctx context.Context, sessionID string) ([]SessionAncestry, error)

Children lists the persisted ancestry records for direct children of sessionID.

func (*JSONLStore) Close

func (s *JSONLStore) Close() error

Close releases the underlying filesystem root.

func (*JSONLStore) Fork

func (s *JSONLStore) Fork(ctx context.Context, originalID, newID string) (*Session, error)

Fork creates a new session by copying all events from an existing session.

func (*JSONLStore) ForkWithOptions

func (s *JSONLStore) ForkWithOptions(
	ctx context.Context,
	originalID, newID string,
	opts ForkOptions,
) (*Session, error)

ForkWithOptions creates a new session by copying all events from an existing session and records session-level ancestry metadata in the JSONL index.

func (*JSONLStore) Lineage

func (s *JSONLStore) Lineage(ctx context.Context, sessionID string) ([]SessionAncestry, error)

Lineage returns the root-to-current ancestry chain for sessionID.
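
A root-to-current chain can be produced by following parent links upward and reversing the result. The sketch below shows that idea over an in-memory map. The ancestry type and its fields are hypothetical (the SessionAncestry fields are not shown here), and the cycle check is a defensive assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"slices"
)

// ancestry is a hypothetical record linking a session to its parent.
type ancestry struct {
	SessionID string
	ParentID  string // empty for a root session
}

// lineage walks parent links from sessionID up to the root, then reverses
// the chain so the root comes first.
func lineage(records map[string]ancestry, sessionID string) ([]ancestry, error) {
	var chain []ancestry
	seen := map[string]bool{}
	for id := sessionID; id != ""; {
		if seen[id] {
			return nil, errors.New("cycle in ancestry records")
		}
		seen[id] = true
		rec, ok := records[id]
		if !ok {
			return nil, fmt.Errorf("no ancestry record for %q", id)
		}
		chain = append(chain, rec)
		id = rec.ParentID
	}
	slices.Reverse(chain)
	return chain, nil
}

func main() {
	records := map[string]ancestry{
		"root": {SessionID: "root"},
		"mid":  {SessionID: "mid", ParentID: "root"},
		"leaf": {SessionID: "leaf", ParentID: "mid"},
	}
	chain, err := lineage(records, "leaf")
	fmt.Println(chain, err)
}
```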

func (*JSONLStore) Load

func (s *JSONLStore) Load(ctx context.Context, sessionID string) (*Session, error)

Load reads all events for a session and reconstructs it.

func (*JSONLStore) LoadUntil

func (s *JSONLStore) LoadUntil(
	ctx context.Context,
	sessionID string,
	eventID ulid.ULID,
) (*Session, error)

LoadUntil loads a session up to (and including) the given event ID.

func (*JSONLStore) Parent

func (s *JSONLStore) Parent(ctx context.Context, sessionID string) (*SessionAncestry, error)

Parent returns the persisted ancestry record for the parent of sessionID.

func (*JSONLStore) Save

func (s *JSONLStore) Save(ctx context.Context, e Event) error

Save appends an event to the session file.

func (*JSONLStore) SaveAncestry

func (s *JSONLStore) SaveAncestry(_ context.Context, record SessionAncestry) error

SaveAncestry persists existing ancestry metadata for portable session imports.

type ProjectionSnapshot

type ProjectionSnapshot = CompactionSnapshot

ProjectionSnapshot reuses the durable compaction snapshot payload shape for time/count rebuild checkpoints. Projection snapshots are a durable acceleration layer beside replay, not a separate mutable state model.

type ProjectionSnapshotter

type ProjectionSnapshotter struct {
	MaxEvents int
	MaxAge    time.Duration
	Rebuilder *Rebuilder
	Now       func() time.Time
}

ProjectionSnapshotter appends durable rebuild checkpoints when a session has grown large enough by event count or wall-clock age.

func NewProjectionSnapshotter

func NewProjectionSnapshotter() *ProjectionSnapshotter

NewProjectionSnapshotter creates a snapshotter with the default rebuilder.

func (*ProjectionSnapshotter) Snapshot

func (s *ProjectionSnapshotter) Snapshot(
	ctx context.Context,
	sess *Session,
) (bool, error)

Snapshot appends a durable projection snapshot unconditionally.

func (*ProjectionSnapshotter) SnapshotIfNeeded

func (s *ProjectionSnapshotter) SnapshotIfNeeded(
	ctx context.Context,
	sess *Session,
) (bool, error)

SnapshotIfNeeded appends a durable projection snapshot when the configured count or age policy says the current session should be checkpointed.

type ProjectionTrigger

type ProjectionTrigger string

ProjectionTrigger records why a projection snapshot was taken.

const (
	ProjectionTriggerCount  ProjectionTrigger = "count"
	ProjectionTriggerTime   ProjectionTrigger = "time"
	ProjectionTriggerManual ProjectionTrigger = "manual"
)

type PromptCacheData

type PromptCacheData struct {
	PrefixHash     string `json:"prefix_hash,omitzero"`
	ToolSchemaHash string `json:"tool_schema_hash,omitzero"`
}

PromptCacheData captures the stable pieces that should influence prefix-cache reuse.

type Rebuilder

type Rebuilder struct {
	FilesLimit int
}

Rebuilder standardizes how model-visible history is reconstructed after a durable compaction or projection snapshot. It keeps the append-only event log untouched and materializes a canonical prompt view from snapshot state plus later events.

func NewRebuilder

func NewRebuilder() *Rebuilder

NewRebuilder creates a Rebuilder with default limits.

func (*Rebuilder) RebuildEntries

func (r *Rebuilder) RebuildEntries(sess *Session) ([]HistoryEntry, error)

RebuildEntries returns the canonical model-visible history after compaction.

func (*Rebuilder) RebuildMessages

func (r *Rebuilder) RebuildMessages(sess *Session) ([]llm.Message, error)

RebuildMessages returns the rebuilt model-visible history as plain messages.

type Reducer

type Reducer func(state map[string]any, e Event) map[string]any

Reducer computes a state snapshot from a sequence of events.
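
A function with the Reducer shape is applied as a left fold: each event takes the previous state and returns the next one. The sketch below uses a hypothetical event type with only a Type field and a hypothetical fold helper. Only the reducer signature mirrors the declaration above.

```go
package main

import "fmt"

// event is a hypothetical stand-in for session.Event.
type event struct {
	Type string
}

// reducer mirrors the shape of the Reducer type, over the stand-in event.
type reducer func(state map[string]any, e event) map[string]any

// countByType is an example reducer that counts events per type.
func countByType(state map[string]any, e event) map[string]any {
	if state == nil {
		state = map[string]any{}
	}
	n, _ := state[e.Type].(int) // zero when the key is absent
	state[e.Type] = n + 1
	return state
}

// fold applies r to each event in order and returns the final state.
func fold(r reducer, events []event) map[string]any {
	var state map[string]any
	for _, e := range events {
		state = r(state, e)
	}
	return state
}

func main() {
	state := fold(countByType, []event{
		{Type: "message_added"},
		{Type: "tool_started"},
		{Type: "message_added"},
	})
	fmt.Println(state["message_added"], state["tool_started"])
}
```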

type ReplayOption

type ReplayOption func(*Replayer)

ReplayOption configures a Replayer.

func WithReplayReducer

func WithReplayReducer(reducer Reducer) ReplayOption

WithReplayReducer configures a reducer that is updated as events are replayed.

type Replayer

type Replayer struct {
	// contains filtered or unexported fields
}

Replayer reconstructs session state from an event stream.

func NewReplayer

func NewReplayer(opts ...ReplayOption) *Replayer

NewReplayer creates a concrete event-log replayer for session reconstruction.

func (*Replayer) Apply

func (r *Replayer) Apply(sess *Session, e Event) error

Apply appends a replayed event to sess without triggering write-through or subscriber side effects.

func (*Replayer) NewSession

func (r *Replayer) NewSession(sessionID string) *Session

NewSession creates an empty replay target session.

func (*Replayer) Replay

func (r *Replayer) Replay(sessionID string, events iter.Seq[Event]) (*Session, error)

Replay reconstructs a session from an event stream.

type RunLog

type RunLog struct {
	SessionID string         `json:"session_id"`
	AgentID   string         `json:"agent_id"`
	StartTime time.Time      `json:"start_time"`
	EndTime   time.Time      `json:"end_time"`
	Turns     []RunTurn      `json:"turns"`
	ChildRuns []ChildRunLog  `json:"child_runs,omitzero"`
	TotalCost float64        `json:"total_cost"`
	Metadata  map[string]any `json:"metadata,omitzero"`
}

RunLog represents a structured trace of an agent's execution. It is used for evaluation, reinforcement learning (RL) fine-tuning, and offline analysis.

func ExportRun

func ExportRun(sess *Session) (*RunLog, error)

ExportRun converts a session's event log into a structured RunLog.

func ExportRunTree

func ExportRunTree(sess *Session, load func(sessionID string) (*Session, error)) (*RunLog, error)

ExportRunTree converts a session's event log into a structured RunLog and, when load is provided, recursively attaches child runs referenced by durable child lifecycle events.

type RunTurn

type RunTurn struct {
	TurnID       string         `json:"turn_id"`
	Timestamp    time.Time      `json:"timestamp"`
	Input        []llm.Message  `json:"input"`
	InputEntries []HistoryEntry `json:"input_entries,omitzero"`
	Output       llm.Message    `json:"output"`
	ToolCalls    []llm.Call     `json:"tool_calls,omitzero"`
	ToolResults  []llm.Message  `json:"tool_results,omitzero"`
	Cost         float64        `json:"cost"`
	Metrics      map[string]any `json:"metrics,omitzero"`
}

RunTurn represents a single perceive-decide-act-observe loop.

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

SQLiteStore is a persistent store that uses SQLite for durability and FTS5 for search.

func NewSQLiteStore

func NewSQLiteStore(dsn string) (*SQLiteStore, error)

NewSQLiteStore creates a new SQLite store.

func (*SQLiteStore) BranchSession

func (s *SQLiteStore) BranchSession(
	ctx context.Context,
	parent *Session,
	newID string,
	opts ForkOptions,
) (*Session, error)

BranchSession creates a persisted child branch from the current in-memory parent session, preserving copied history and ancestry metadata in SQLite.

func (*SQLiteStore) Children

func (s *SQLiteStore) Children(ctx context.Context, sessionID string) ([]SessionAncestry, error)

Children lists the persisted ancestry records for direct children of sessionID.

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

Close closes the database connection.

func (*SQLiteStore) Fork

func (s *SQLiteStore) Fork(ctx context.Context, originalID, newID string) (*Session, error)

Fork creates a new session by copying all events from an existing session.

func (*SQLiteStore) ForkWithOptions

func (s *SQLiteStore) ForkWithOptions(
	ctx context.Context,
	originalID, newID string,
	opts ForkOptions,
) (*Session, error)

ForkWithOptions creates a new session by copying all events from an existing session and persists session-level ancestry metadata for the child.

func (*SQLiteStore) Lineage

func (s *SQLiteStore) Lineage(ctx context.Context, sessionID string) ([]SessionAncestry, error)

Lineage returns the root-to-current ancestry chain for sessionID.
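
A root-to-current chain can be built by following parent links upward and then reversing the result. The sketch below assumes an in-memory map of ancestry records keyed by session ID; the `ancestry` type and `lineage` function are hypothetical and say nothing about how SQLiteStore actually queries its tables.

```go
package main

import (
	"fmt"
	"slices"
)

// ancestry is a trimmed, hypothetical stand-in for SessionAncestry.
type ancestry struct {
	SessionID       string
	ParentSessionID string
}

// lineage walks parent links from sessionID up to the root, then reverses
// the chain so the root comes first. The seen set guards against cycles
// in corrupted data.
func lineage(records map[string]ancestry, sessionID string) []ancestry {
	var chain []ancestry
	seen := map[string]bool{}
	id := sessionID
	for id != "" && !seen[id] {
		rec, ok := records[id]
		if !ok {
			break
		}
		seen[id] = true
		chain = append(chain, rec)
		id = rec.ParentSessionID
	}
	slices.Reverse(chain)
	return chain
}

func main() {
	records := map[string]ancestry{
		"root":       {SessionID: "root"},
		"child":      {SessionID: "child", ParentSessionID: "root"},
		"grandchild": {SessionID: "grandchild", ParentSessionID: "child"},
	}
	for _, rec := range lineage(records, "grandchild") {
		fmt.Println(rec.SessionID)
	}
}
```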

func (*SQLiteStore) Load

func (s *SQLiteStore) Load(ctx context.Context, sessionID string) (*Session, error)

Load reconstructs a session from the database.

func (*SQLiteStore) LoadUntil

func (s *SQLiteStore) LoadUntil(
	ctx context.Context,
	sessionID string,
	eventID ulid.ULID,
) (*Session, error)

LoadUntil loads a session up to (and including) the given event ID.
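
The inclusive cut-off can be illustrated on a plain slice. In this sketch the `Event` type uses a string ID instead of ulid.ULID, and returning an error when the ID is absent is an assumption; the documentation above does not say how the real method treats an unknown event ID.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in with a string ID in place of ulid.ULID.
type Event struct{ ID string }

// errEventNotFound is an assumed failure mode for an unknown event ID.
var errEventNotFound = errors.New("event not found")

// loadUntil returns the prefix of events that ends at, and includes, eventID.
func loadUntil(events []Event, eventID string) ([]Event, error) {
	for i, e := range events {
		if e.ID == eventID {
			// Copy so later appends to the result cannot alias the source log.
			return append([]Event(nil), events[:i+1]...), nil
		}
	}
	return nil, errEventNotFound
}

func main() {
	log := []Event{{ID: "e1"}, {ID: "e2"}, {ID: "e3"}}
	prefix, err := loadUntil(log, "e2")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(prefix), prefix[len(prefix)-1].ID)
}
```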

func (*SQLiteStore) Parent

func (s *SQLiteStore) Parent(ctx context.Context, sessionID string) (*SessionAncestry, error)

Parent returns the persisted ancestry record for the parent of sessionID.

func (*SQLiteStore) Save

func (s *SQLiteStore) Save(ctx context.Context, e Event) error

Save persists an event to the database.

func (*SQLiteStore) SaveAncestry

func (s *SQLiteStore) SaveAncestry(ctx context.Context, record SessionAncestry) error

SaveAncestry persists existing ancestry metadata for portable session imports.

func (*SQLiteStore) Search

func (s *SQLiteStore) Search(ctx context.Context, sessionID string, query string) ([]Event, error)

Search runs a full-text query over the session's event log using FTS5.

type SearchStore

type SearchStore interface {
	Search(ctx context.Context, sessionID string, query string) ([]Event, error)
}

SearchStore exposes full-text search over persisted session events. Not every store implements this capability.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session is a durable container for a conversation. All state is derived from an append-only event log.

func New

func New(id string) *Session

New creates a new session with the given ID.

func (*Session) All

func (s *Session) All() iter.Seq[Event]

All returns an iterator over the full event log from oldest to newest.

func (*Session) Append

func (s *Session) Append(ctx context.Context, e Event) error

Append adds a new event to the session and notifies all subscribers. If a writer is attached, the event is persisted to the store immediately. If the context contains metadata, it is merged into the event's metadata.

func (*Session) AppendContext

func (s *Session) AppendContext(ctx context.Context, entry ContextEntry) error

AppendContext appends durable model-visible context to the session.

func (*Session) AppendUser

func (s *Session) AppendUser(ctx context.Context, content string) error

AppendUser appends a plain user message to the session.

func (*Session) Backward

func (s *Session) Backward() iter.Seq[Event]

Backward returns an iterator over the full event log from newest to oldest.

func (*Session) Branch

func (s *Session) Branch(
	ctx context.Context,
	newID string,
	opts ForkOptions,
) (*Session, error)

Branch creates a persisted child branch from the current in-memory parent session, including copied history and ancestry metadata.

func (*Session) EffectiveEntries

func (s *Session) EffectiveEntries() ([]HistoryEntry, error)

EffectiveEntries returns the model-visible session history after applying the latest durable compaction or projection snapshot, together with the originating event ID for each message when known.

func (*Session) EffectiveMessages

func (s *Session) EffectiveMessages() ([]llm.Message, error)

EffectiveMessages returns the model-visible session history after applying the latest durable compaction or projection snapshot, if any.

func (*Session) Events

func (s *Session) Events() []Event

Events returns the full event log.

func (*Session) Fork

func (s *Session) Fork(newID string) *Session

Fork creates a new session with the ID newID and copies all existing events from this session into it. Subscribers are not copied.

func (*Session) HasWatchers

func (s *Session) HasWatchers() bool

HasWatchers returns true if the session has any active Watch subscriptions.

func (*Session) ID

func (s *Session) ID() string

ID returns the session identifier.

func (*Session) IsWaiting

func (s *Session) IsWaiting() bool

IsWaiting returns true if the session is currently waiting for external input or approval, as in a human-in-the-loop (HITL) flow.
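
Since all session state is derived from the event log, a waiting flag could plausibly be computed by finding the most recent wait event. The sketch below assumes at most one outstanding wait at a time and uses made-up event type strings (`wait_started`, `wait_resolved`); the package's actual event type names and derivation may differ.

```go
package main

import "fmt"

// Event is a hypothetical stand-in; the type strings below are assumptions.
type Event struct{ Type string }

const (
	waitStarted  = "wait_started"
	waitResolved = "wait_resolved"
)

// isWaiting scans from newest to oldest and reports whether the most
// recent wait event is an unresolved start.
func isWaiting(events []Event) bool {
	for i := len(events) - 1; i >= 0; i-- {
		switch events[i].Type {
		case waitStarted:
			return true
		case waitResolved:
			return false
		}
	}
	return false
}

func main() {
	log := []Event{{Type: "message_added"}, {Type: waitStarted}}
	fmt.Println(isWaiting(log))

	log = append(log, Event{Type: waitResolved})
	fmt.Println(isWaiting(log))
}
```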

func (*Session) LastAssistantMessage

func (s *Session) LastAssistantMessage() (llm.Message, bool)

LastAssistantMessage returns the most recent assistant message without tool calls in the session, if any.
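
The "without tool calls" condition means intermediate assistant turns that only request tools are skipped. A minimal sketch of that filter, using a hypothetical `message` type rather than llm.Message:

```go
package main

import "fmt"

// message is a hypothetical, trimmed stand-in for llm.Message.
type message struct {
	Role      string
	Content   string
	ToolCalls []string
}

// lastAssistantMessage scans from newest to oldest and returns the first
// assistant message that carries no tool calls.
func lastAssistantMessage(msgs []message) (message, bool) {
	for i := len(msgs) - 1; i >= 0; i-- {
		m := msgs[i]
		if m.Role == "assistant" && len(m.ToolCalls) == 0 {
			return m, true
		}
	}
	return message{}, false
}

func main() {
	msgs := []message{
		{Role: "user", Content: "What is the weather?"},
		{Role: "assistant", Content: "It is sunny."},
		{Role: "user", Content: "And tomorrow?"},
		{Role: "assistant", ToolCalls: []string{"get_forecast"}},
	}
	m, ok := lastAssistantMessage(msgs)
	fmt.Println(m.Content, ok)
}
```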

func (*Session) LastEvent

func (s *Session) LastEvent() (Event, bool)

LastEvent returns the most recent event in the session, if any.

func (*Session) LastMessage

func (s *Session) LastMessage() (llm.Message, bool)

LastMessage returns the most recent message in the session, if any.

func (*Session) Messages

func (s *Session) Messages() []llm.Message

Messages returns every message in the event log, in the order the messages were emitted and without applying compaction snapshots.

func (*Session) State

func (s *Session) State() map[string]any

State returns a snapshot of the current session state.

func (*Session) TotalCost

func (s *Session) TotalCost() float64

TotalCost returns the sum of costs across all events in the session.

func (*Session) Watch

func (s *Session) Watch(ctx context.Context) *Subscription

Watch returns a live, buffered stream of events appended after this call.

Slow consumers drop events rather than blocking Append. Prefer this API over Subscribe when you need a live UI, indexer, or observer, and call Close on the returned Subscription when you are done.
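
The drop-instead-of-block behaviour is commonly built from a buffered channel and a non-blocking send. The sketch below illustrates that general technique only; `publish` and `deliver` are hypothetical helpers, and the package's internal buffering and drop policy are not documented here.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for the package's Event type.
type Event struct{ Seq int }

// publish attempts a non-blocking send. It reports false when the
// subscriber's buffer is full and the event was dropped.
func publish(ch chan<- Event, e Event) bool {
	select {
	case ch <- e:
		return true
	default:
		return false
	}
}

// deliver publishes n events into a buffer of the given size while nobody
// is reading, and counts how many were delivered and how many dropped.
func deliver(buffer, n int) (delivered, dropped int) {
	ch := make(chan Event, buffer)
	for i := 0; i < n; i++ {
		if publish(ch, Event{Seq: i}) {
			delivered++
		} else {
			dropped++
		}
	}
	return delivered, dropped
}

func main() {
	delivered, dropped := deliver(2, 5)
	fmt.Println(delivered, dropped) // prints "2 3"
}
```

The trade-off is that a watcher can miss events, so consumers that need completeness should reconcile against the durable log rather than rely on the live stream alone.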

func (*Session) WithReducer

func (s *Session) WithReducer(r Reducer) *Session

WithReducer attaches a reducer to the session for state management.

func (*Session) WithWriter

func (s *Session) WithWriter(w Writer) *Session

WithWriter attaches a writer to the session for write-through persistence.

type SessionAncestry

type SessionAncestry struct {
	SessionID        string    `json:"session_id"`
	ParentSessionID  string    `json:"parent_session_id,omitzero"`
	ForkPointEventID string    `json:"fork_point_event_id,omitzero"`
	BranchLabel      string    `json:"branch_label,omitzero"`
	ForkReason       string    `json:"fork_reason,omitzero"`
	Depth            int       `json:"depth"`
	CreatedAt        time.Time `json:"created_at"`
}

SessionAncestry records the tree relationship of a persisted session.

type SessionAncestryWriter

type SessionAncestryWriter interface {
	SaveAncestry(ctx context.Context, record SessionAncestry) error
}

SessionAncestryWriter persists existing ancestry metadata for portable session imports.

type SessionBranchStore

type SessionBranchStore interface {
	BranchSession(
		ctx context.Context,
		parent *Session,
		newSessionID string,
		opts ForkOptions,
	) (*Session, error)
}

SessionBranchStore materializes a persisted child branch from the current in-memory parent session.

type SessionTreeStore

type SessionTreeStore interface {
	Parent(ctx context.Context, sessionID string) (*SessionAncestry, error)
	Children(ctx context.Context, sessionID string) ([]SessionAncestry, error)
	Lineage(ctx context.Context, sessionID string) ([]SessionAncestry, error)
}

SessionTreeStore exposes persisted session-tree queries.

type StepCompletedData

type StepCompletedData struct {
	AgentID string    `json:"agent_id"`
	Usage   llm.Usage `json:"usage,omitzero"`
	Error   string    `json:"error,omitzero"`
}

StepCompletedData records the durable end of a step.

type StepStartedData

type StepStartedData struct {
	AgentID     string          `json:"agent_id"`
	Model       string          `json:"model"`
	PromptCache PromptCacheData `json:"prompt_cache,omitzero"`
}

StepStartedData records the durable start of a step.

type Store

type Store interface {
	Save(ctx context.Context, e Event) error
	Load(ctx context.Context, sessionID string) (*Session, error)
	// LoadUntil loads a session up to (and including) the given event ID.
	LoadUntil(ctx context.Context, sessionID string, eventID ulid.ULID) (*Session, error)
	// Fork creates a new session by copying all events from an existing session.
	Fork(ctx context.Context, originalSessionID, newSessionID string) (*Session, error)
}

Store is an interface for persisting session state.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is a live, lossy watch over newly appended session events.

Close is idempotent and should be called when the caller is done consuming events. The runtime also attaches a best-effort cleanup so abandoned subscriptions do not permanently retain session subscribers, but callers must not rely on GC for timely cleanup.

func (*Subscription) Close

func (s *Subscription) Close()

Close detaches the subscription and closes its event channel.
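
Idempotent close is often implemented with sync.Once so that a second call cannot close the channel twice. The sketch below shows that pattern with a hypothetical `subscription` type; it omits detaching from the publisher, which a real implementation must do before closing so that no send hits a closed channel.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical stand-in for Subscription.
type subscription struct {
	events chan int
	once   sync.Once
}

func newSubscription(buffer int) *subscription {
	return &subscription{events: make(chan int, buffer)}
}

// Events exposes the receive-only side of the channel.
func (s *subscription) Events() <-chan int { return s.events }

// Close closes the channel exactly once, however many times it is called.
func (s *subscription) Close() {
	s.once.Do(func() { close(s.events) })
}

func main() {
	sub := newSubscription(1)
	defer sub.Close() // safe even though Close is also called below

	sub.Close()
	_, ok := <-sub.Events()
	fmt.Println(ok) // prints "false": the channel is closed
}
```

Pairing Watch with a deferred Close, as in main above, keeps cleanup prompt instead of leaving it to the best-effort runtime cleanup.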

func (*Subscription) Events

func (s *Subscription) Events() <-chan Event

Events returns the live event channel for this subscription.

type ToolCompletedData

type ToolCompletedData struct {
	Tool           string `json:"tool"`
	ID             string `json:"id"`
	IdempotencyKey string `json:"idempotency_key,omitzero"`
	Output         string `json:"output,omitzero"`
	Error          string `json:"error,omitzero"`
}

ToolCompletedData captures the durable outcome of a completed tool call.

type ToolExecutionRecord

type ToolExecutionRecord struct {
	Started   ToolStartedData
	Completed ToolCompletedData
}

ToolExecutionRecord summarizes durable tool lifecycle facts for one idempotency key.

func FindToolExecutionByKey

func FindToolExecutionByKey(
	s *Session,
	idempotencyKey string,
) (ToolExecutionRecord, bool, error)

FindToolExecutionByKey looks up the most recent durable tool lifecycle facts for an idempotency key.
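
A lookup like this is useful for replay-safe tool execution: before re-running a tool, a host can check whether a call with the same idempotency key already completed. The sketch below scans a flat event slice from newest to oldest; `toolEvent`, `record`, and `findByKey` are hypothetical, and pairing the newest start with the newest completion independently is a simplification that the real function may not share.

```go
package main

import "fmt"

// toolEvent is a hypothetical flattened tool lifecycle event.
type toolEvent struct {
	Kind           string // "started" or "completed"
	Tool           string
	IdempotencyKey string
	Output         string
}

// record is a hypothetical stand-in for ToolExecutionRecord.
type record struct {
	Started   toolEvent
	Completed toolEvent
}

// findByKey returns the newest started and completed events for key.
// The bool reports whether any event with that key was found.
func findByKey(events []toolEvent, key string) (record, bool) {
	var rec record
	var haveStarted, haveCompleted bool
	for i := len(events) - 1; i >= 0; i-- {
		e := events[i]
		if e.IdempotencyKey != key {
			continue
		}
		switch e.Kind {
		case "started":
			if !haveStarted {
				rec.Started, haveStarted = e, true
			}
		case "completed":
			if !haveCompleted {
				rec.Completed, haveCompleted = e, true
			}
		}
		if haveStarted && haveCompleted {
			break // both facts found; stop scanning older events
		}
	}
	return rec, haveStarted || haveCompleted
}

func main() {
	events := []toolEvent{
		{Kind: "started", Tool: "fetch", IdempotencyKey: "k1"},
		{Kind: "completed", Tool: "fetch", IdempotencyKey: "k1", Output: "ok"},
		{Kind: "started", Tool: "write", IdempotencyKey: "k2"},
	}
	rec, found := findByKey(events, "k1")
	fmt.Println(found, rec.Completed.Output)
}
```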

type ToolHistory

type ToolHistory struct {
	ID             string `json:"id,omitzero"`
	Name           string `json:"name,omitzero"`
	Arguments      string `json:"args,omitzero"`
	IdempotencyKey string `json:"idempotency_key,omitzero"`
	IsError        bool   `json:"is_error,omitzero"`
	Error          string `json:"error,omitzero"`
}

ToolHistory carries durable tool lifecycle metadata associated with a model-visible tool-result message. It is projection metadata for hosts and UIs; provider-visible prompt construction still uses Message.

type ToolStartedData

type ToolStartedData struct {
	Tool           string `json:"tool"`
	Arguments      string `json:"args"`
	ID             string `json:"id"`
	IdempotencyKey string `json:"idempotency_key,omitzero"`
}

ToolStartedData records the durable start of a tool call.

type TurnCompletedData

type TurnCompletedData struct {
	AgentID        string    `json:"agent_id"`
	Steps          int       `json:"steps"`
	Usage          llm.Usage `json:"usage,omitzero"`
	TurnStopReason string    `json:"turn_stop_reason,omitzero"`
	Error          string    `json:"error,omitzero"`
}

TurnCompletedData records the durable end of a turn.

type TurnStartedData

type TurnStartedData struct {
	AgentID string `json:"agent_id"`
}

TurnStartedData records the durable start of a turn.

type WaitData

type WaitData struct {
	Reason string `json:"reason,omitzero"`
	// ExternalID is an optional identifier for the external process or person
	// being waited on.
	ExternalID string `json:"external_id,omitzero"`
}

WaitData is the payload for WaitStarted and WaitResolved events.

type Writer

type Writer interface {
	Save(ctx context.Context, e Event) error
}

Writer persists events to a durable store.
