streaming

package
v0.3.0
Warning

This package is not in the latest version of its module.

Published: May 26, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package streaming is the Stage-1 namespace for response caching, formatting, stream optimisation, thinking protocol, and steering. See ../REFACTOR_PLAN.md.

Constants

const DefaultMaxAge = 24 * time.Hour

DefaultMaxAge is the default maximum age of cache entries.

const DefaultMaxEntries = 1000

DefaultMaxEntries is the default maximum number of cache entries.

Functions

func EstimateTokenSavings

func EstimateTokenSavings(original, formatted string) int

EstimateTokenSavings estimates the difference in token count between the original and formatted text. It uses a simple heuristic of roughly 4 characters per token.
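The heuristic can be illustrated with a minimal sketch. The names estimateTokens and estimateSavings are hypothetical, and the rounding and sign conventions are assumptions, since the package's actual implementation is not shown here.

```go
package main

import "fmt"

// charsPerToken is the assumed average token length for the heuristic.
const charsPerToken = 4

// estimateTokens approximates a token count from the byte length of text.
// Integer division rounds down; the real package may round differently.
func estimateTokens(text string) int {
	return len(text) / charsPerToken
}

// estimateSavings returns the estimated number of tokens saved by formatting.
// A negative value means the formatted text is longer than the original.
func estimateSavings(original, formatted string) int {
	return estimateTokens(original) - estimateTokens(formatted)
}

func main() {
	fmt.Println(estimateSavings("abcdefgh", "abcd")) // 8/4 - 4/4
}
```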

func FixCodeFences

func FixCodeFences(text string) string

FixCodeFences counts opening and closing ``` fences and adds any that are missing. It also ensures that opening fences carry a language label.
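One way such a repair could work is sketched below. The function name fixFences is hypothetical, and the default label "text" for unlabeled fences is an assumption; the source does not say which label the package uses.

```go
package main

import (
	"fmt"
	"strings"
)

// fence is the three-backtick marker that opens or closes a code block.
var fence = strings.Repeat("`", 3)

// fixFences labels bare opening fences and closes a fence left open at the end.
// The "text" default label is an assumption made for this sketch.
func fixFences(text string) string {
	lines := strings.Split(text, "\n")
	inFence := false
	for i, line := range lines {
		trimmed := strings.TrimSpace(line)
		if !strings.HasPrefix(trimmed, fence) {
			continue
		}
		if !inFence && trimmed == fence {
			lines[i] = fence + "text" // opening fence without a language label
		}
		inFence = !inFence
	}
	if inFence {
		lines = append(lines, fence) // add the missing closing fence
	}
	return strings.Join(lines, "\n")
}

func main() {
	fmt.Println(fixFences(fence + "\nx := 1"))
}
```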

func FixMarkdown

func FixMarkdown(text string) string

FixMarkdown fixes broken markdown links and unclosed bold/italic.

func FormatChanges

func FormatChanges(result *FormattedResponse) string

FormatChanges returns a human-readable summary of the changes made.

func HashPrompt

func HashPrompt(prompt string) string

HashPrompt returns a SHA-256 hash of the normalized prompt. Normalization: lowercase, collapse whitespace.
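The normalization and hashing steps can be sketched with the standard library alone. The helper names are hypothetical, and two details are assumptions not stated above: the digest is hex-encoded, and leading and trailing whitespace is trimmed as part of collapsing.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// normalizePrompt lowercases the prompt and collapses runs of whitespace
// into single spaces, trimming leading and trailing whitespace as well.
func normalizePrompt(prompt string) string {
	return strings.Join(strings.Fields(strings.ToLower(prompt)), " ")
}

// hashPrompt returns the hex-encoded SHA-256 digest of the normalized prompt.
func hashPrompt(prompt string) string {
	sum := sha256.Sum256([]byte(normalizePrompt(prompt)))
	return hex.EncodeToString(sum[:])
}

func main() {
	a := hashPrompt("Explain   Go channels")
	b := hashPrompt("explain go\tchannels ")
	fmt.Println(a == b) // both prompts normalize to the same string
}
```

Because prompts that differ only in case or spacing hash to the same key, they share one cache entry.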

func NormalizeLists

func NormalizeLists(text string) string

NormalizeLists converts mixed bullet styles to consistent dashes and ensures proper indentation.
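A rough sketch of the bullet conversion follows. The name normalizeBullets is hypothetical, the set of bullet characters handled ("*" and "+") is an assumption, and the indentation fix-up mentioned above is not covered: existing indentation is simply preserved.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// bulletPrefix matches leading indentation followed by a "*" or "+" bullet.
var bulletPrefix = regexp.MustCompile("^(\\s*)[*+]\\s+")

// normalizeBullets rewrites "*" and "+" list markers as "- " line by line,
// keeping whatever indentation each line already has.
func normalizeBullets(text string) string {
	lines := strings.Split(text, "\n")
	for i, line := range lines {
		lines[i] = bulletPrefix.ReplaceAllString(line, "${1}- ")
	}
	return strings.Join(lines, "\n")
}

func main() {
	fmt.Println(normalizeBullets("* first\n  + nested\n- already a dash"))
}
```

Requiring whitespace after the marker keeps bold text such as a line starting with "**" from being mistaken for a bullet.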

func RemoveFluff

func RemoveFluff(text string) string

RemoveFluff strips filler phrases that waste tokens.

func ShouldCache

func ShouldCache(prompt string) bool

ShouldCache determines whether a prompt should be cached. It skips one-off questions and prompts that reference the current time or volatile file contents.

Types

type CacheEntry

type CacheEntry struct {
	Key        string    `json:"key"`
	Prompt     string    `json:"prompt"`
	Response   string    `json:"response"`
	Model      string    `json:"model"`
	Tokens     int       `json:"tokens"`
	CreatedAt  time.Time `json:"created_at"`
	LastHit    time.Time `json:"last_hit"`
	HitCount   int       `json:"hit_count"`
	Similarity float64   `json:"similarity"`
}

CacheEntry holds a cached LLM response.

type CacheStats

type CacheStats struct {
	Entries      int     `json:"entries"`
	HitCount     int64   `json:"hit_count"`
	MissCount    int64   `json:"miss_count"`
	HitRate      float64 `json:"hit_rate"`
	SavedTokens  int64   `json:"saved_tokens"`
	SavedCostUSD float64 `json:"saved_cost_usd"`
}

CacheStats holds statistics about the response cache.

type FormatRule

type FormatRule struct {
	Name    string
	Pattern *regexp.Regexp
	Fix     func(string) string
	Enabled bool
}

FormatRule defines a single formatting rule that can be applied to responses.

type FormattedResponse

type FormattedResponse struct {
	Original   string
	Formatted  string
	Changes    []string
	TokensDiff int
}

FormattedResponse holds the result of formatting a response.

type ResponseCache

type ResponseCache struct {
	Entries    map[string]*CacheEntry
	MaxEntries int
	MaxAge     time.Duration
	HitCount   int64
	MissCount  int64
	// contains filtered or unexported fields
}

ResponseCache caches LLM responses for identical or similar prompts.

func NewResponseCache

func NewResponseCache(maxEntries int, maxAge time.Duration) *ResponseCache

NewResponseCache creates a new ResponseCache with the given parameters. If maxEntries is 0, DefaultMaxEntries is used. If maxAge is 0, DefaultMaxAge is used.
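The zero-value fallback described above amounts to the following pattern. The type cacheConfig and function newCacheConfig are hypothetical stand-ins; how the real constructor treats negative values is not documented and is left untouched here.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultMaxEntries = 1000           // mirrors DefaultMaxEntries
	defaultMaxAge     = 24 * time.Hour // mirrors DefaultMaxAge
)

// cacheConfig is a stand-in for the size and age limits of a cache.
type cacheConfig struct {
	MaxEntries int
	MaxAge     time.Duration
}

// newCacheConfig substitutes the defaults for zero-valued arguments.
func newCacheConfig(maxEntries int, maxAge time.Duration) cacheConfig {
	if maxEntries == 0 {
		maxEntries = defaultMaxEntries
	}
	if maxAge == 0 {
		maxAge = defaultMaxAge
	}
	return cacheConfig{MaxEntries: maxEntries, MaxAge: maxAge}
}

func main() {
	fmt.Println(newCacheConfig(0, 0))
	fmt.Println(newCacheConfig(50, time.Hour))
}
```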

func (*ResponseCache) EvictLRU

func (rc *ResponseCache) EvictLRU()

EvictLRU removes the least-recently-hit entry from the cache.
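Least-recently-hit eviction over a map can be sketched as a linear scan for the oldest LastHit. The entry type is a trimmed-down, hypothetical stand-in for CacheEntry, and the real method presumably also takes the cache's lock, which is omitted here.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a trimmed-down stand-in for CacheEntry.
type entry struct {
	Key     string
	LastHit time.Time
}

// evictLRU deletes the entry with the oldest LastHit and returns its key.
// It returns "" when the map is empty.
func evictLRU(entries map[string]*entry) string {
	var oldest *entry
	for _, e := range entries {
		if oldest == nil || e.LastHit.Before(oldest.LastHit) {
			oldest = e
		}
	}
	if oldest == nil {
		return ""
	}
	delete(entries, oldest.Key)
	return oldest.Key
}

// sampleEntries builds two entries: "a" was hit two hours ago, "b" a minute ago.
func sampleEntries() map[string]*entry {
	now := time.Now()
	return map[string]*entry{
		"a": {Key: "a", LastHit: now.Add(-2 * time.Hour)},
		"b": {Key: "b", LastHit: now.Add(-1 * time.Minute)},
	}
}

func main() {
	entries := sampleEntries()
	fmt.Println(evictLRU(entries), len(entries))
}
```

A linear scan is O(n) per eviction, which is reasonable at the default limit of 1000 entries; a larger cache might prefer a linked list or heap.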

func (*ResponseCache) Export

func (rc *ResponseCache) Export() ([]byte, error)

Export serializes the cache to JSON bytes.

func (*ResponseCache) FormatStats

func (rc *ResponseCache) FormatStats() string

FormatStats returns a human-readable summary of cache statistics.

func (*ResponseCache) Get

func (rc *ResponseCache) Get(prompt, model string) (*CacheEntry, bool)

Get retrieves a cached response for the given prompt and model. It first tries an exact hash match, then falls back to similarity matching.

func (*ResponseCache) Import

func (rc *ResponseCache) Import(data []byte) error

Import loads cache entries from JSON bytes, merging with existing entries.

func (*ResponseCache) Invalidate

func (rc *ResponseCache) Invalidate(pattern string)

Invalidate removes entries whose prompt matches the given pattern (regex).
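Pattern-based invalidation could look like the sketch below, which uses a plain key-to-prompt map in place of the cache. The function name invalidate is hypothetical. Invalidate itself returns nothing, so how the package handles an invalid pattern is unknown; returning the error is an assumption made for this example.

```go
package main

import (
	"fmt"
	"regexp"
)

// invalidate deletes every entry whose prompt matches pattern and reports
// how many were removed. An invalid pattern removes nothing and returns the error.
func invalidate(prompts map[string]string, pattern string) (int, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return 0, err
	}
	removed := 0
	for key, prompt := range prompts {
		if re.MatchString(prompt) {
			delete(prompts, key) // deleting during range is safe in Go
			removed++
		}
	}
	return removed, nil
}

func main() {
	prompts := map[string]string{"k1": "list files in src", "k2": "explain mutexes"}
	removed, err := invalidate(prompts, "^list ")
	fmt.Println(removed, err, len(prompts))
}
```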

func (*ResponseCache) InvalidateByAge

func (rc *ResponseCache) InvalidateByAge(maxAge time.Duration)

InvalidateByAge removes entries older than maxAge.

func (*ResponseCache) Set

func (rc *ResponseCache) Set(prompt, response, model string, tokens int)

Set stores a prompt/response pair in the cache. If the cache is at capacity, it evicts the least recently used entry.

func (*ResponseCache) SimilarityMatch

func (rc *ResponseCache) SimilarityMatch(prompt string, threshold float64) (*CacheEntry, float64)

SimilarityMatch finds the most similar cached prompt above the threshold. Returns a defensive copy to prevent callers from mutating cache state.
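The source does not say which similarity measure the package uses. The sketch below swaps in Jaccard similarity over lowercase word sets purely for illustration, treats a score equal to the threshold as a match (also an assumption), and shows the defensive copy. All names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// entry is a trimmed-down stand-in for CacheEntry.
type entry struct {
	Prompt   string
	Response string
}

// wordSet splits a prompt into a set of lowercase words.
func wordSet(s string) map[string]bool {
	set := make(map[string]bool)
	for _, w := range strings.Fields(strings.ToLower(s)) {
		set[w] = true
	}
	return set
}

// jaccard returns |A ∩ B| / |A ∪ B| for the word sets of a and b.
func jaccard(a, b string) float64 {
	sa, sb := wordSet(a), wordSet(b)
	if len(sa) == 0 && len(sb) == 0 {
		return 1
	}
	shared := 0
	for w := range sa {
		if sb[w] {
			shared++
		}
	}
	return float64(shared) / float64(len(sa)+len(sb)-shared)
}

// bestMatch returns a copy of the most similar entry scoring at or above
// threshold, or nil and 0 when nothing qualifies.
func bestMatch(entries []*entry, prompt string, threshold float64) (*entry, float64) {
	var best *entry
	bestScore := 0.0
	for _, e := range entries {
		if score := jaccard(prompt, e.Prompt); score >= threshold && score > bestScore {
			best, bestScore = e, score
		}
	}
	if best == nil {
		return nil, 0
	}
	clone := *best // defensive copy: callers cannot mutate the cached entry
	return &clone, bestScore
}

func main() {
	entries := []*entry{{Prompt: "how do go channels work", Response: "r1"}}
	match, score := bestMatch(entries, "how do go channels block", 0.5)
	fmt.Println(match.Response, score)
}
```

A shallow struct copy is enough here because the stand-in has no slice or map fields; a type that did would need a deep copy to stay isolated.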

func (*ResponseCache) Stats

func (rc *ResponseCache) Stats() CacheStats

Stats returns current cache statistics.

type ResponseFormatter

type ResponseFormatter struct {
	Rules []FormatRule
	// contains filtered or unexported fields
}

ResponseFormatter post-processes LLM responses to ensure consistent formatting, fix common issues, and enhance readability.

func NewResponseFormatter

func NewResponseFormatter() *ResponseFormatter

NewResponseFormatter creates a ResponseFormatter with built-in rules.

func (*ResponseFormatter) AddRule

func (rf *ResponseFormatter) AddRule(rule FormatRule)

AddRule adds a new formatting rule to the formatter.

func (*ResponseFormatter) DisableRule

func (rf *ResponseFormatter) DisableRule(name string)

DisableRule disables a rule by name.

func (*ResponseFormatter) EnableRule

func (rf *ResponseFormatter) EnableRule(name string)

EnableRule enables a rule by name.

func (*ResponseFormatter) Format

func (rf *ResponseFormatter) Format(response string) *FormattedResponse

Format applies all enabled rules to the response and tracks changes.
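The apply-and-track loop can be sketched as follows. The rule type mirrors only the Name, Fix, and Enabled fields of FormatRule (Pattern is left out), and recording a rule's name as the change description is an assumption about what Changes contains.

```go
package main

import (
	"fmt"
	"strings"
)

// rule mirrors the Name, Fix, and Enabled fields of FormatRule.
type rule struct {
	Name    string
	Fix     func(string) string
	Enabled bool
}

// applyRules runs each enabled rule in order and records the names of
// the rules that actually changed the text.
func applyRules(rules []rule, text string) (string, []string) {
	var changes []string
	for _, r := range rules {
		if !r.Enabled || r.Fix == nil {
			continue
		}
		if fixed := r.Fix(text); fixed != text {
			changes = append(changes, r.Name)
			text = fixed
		}
	}
	return text, changes
}

// sampleRules returns one enabled rule and one disabled rule.
func sampleRules() []rule {
	return []rule{
		{Name: "trim", Fix: strings.TrimSpace, Enabled: true},
		{Name: "upper", Fix: strings.ToUpper, Enabled: false},
	}
}

func main() {
	out, changes := applyRules(sampleRules(), "  hello  ")
	fmt.Printf("%q %v\n", out, changes)
}
```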

type SteeringMessage

type SteeringMessage struct {
	Content  string // user guidance text
	Priority int    // 0 = normal, 1 = high (inject immediately)
}

SteeringMessage represents a user guidance message injected into the agent loop.

type SteeringQueue

type SteeringQueue struct {
	// contains filtered or unexported fields
}

SteeringQueue allows external callers (TUI, daemon) to inject messages into the agent loop between tool execution batches. This enables users to guide the agent without interrupting ongoing work.

func NewSteeringQueue

func NewSteeringQueue() *SteeringQueue

NewSteeringQueue creates a new SteeringQueue with an initialized notify channel.

func (*SteeringQueue) Clear

func (sq *SteeringQueue) Clear()

Clear discards all pending messages.

func (*SteeringQueue) Drain

func (sq *SteeringQueue) Drain() []SteeringMessage

Drain returns all pending steering messages and clears the queue. It is called from the agent loop goroutine between tool batches.

func (*SteeringQueue) Enqueue

func (sq *SteeringQueue) Enqueue(msg SteeringMessage)

Enqueue adds a steering message. It is thread-safe and is called from the TUI goroutine.

func (*SteeringQueue) HasPending

func (sq *SteeringQueue) HasPending() bool

HasPending returns true if there are queued messages.

func (*SteeringQueue) Notify

func (sq *SteeringQueue) Notify() <-chan struct{}

Notify returns the channel that is signaled when a message is enqueued. Consumers can select on this to wake up when steering input arrives.
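The queue's internals are unexported, so the following is only a plausible sketch of the pattern: a mutex-guarded slice plus a notification channel. The one-slot buffered channel with a non-blocking send is an assumption, as are all type and method names.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for SteeringMessage.
type message struct {
	Content  string
	Priority int
}

// queue is a mutex-guarded slice plus a one-slot notification channel.
type queue struct {
	mu      sync.Mutex
	pending []message
	notify  chan struct{}
}

func newQueue() *queue {
	return &queue{notify: make(chan struct{}, 1)}
}

// enqueue appends msg and signals the consumer without ever blocking.
func (q *queue) enqueue(msg message) {
	q.mu.Lock()
	q.pending = append(q.pending, msg)
	q.mu.Unlock()
	select {
	case q.notify <- struct{}{}:
	default: // a wake-up is already pending
	}
}

// drain returns all pending messages and empties the queue.
func (q *queue) drain() []message {
	q.mu.Lock()
	defer q.mu.Unlock()
	out := q.pending
	q.pending = nil
	return out
}

func main() {
	q := newQueue()
	q.enqueue(message{Content: "focus on the failing test", Priority: 1})
	q.enqueue(message{Content: "skip the docs for now"})
	<-q.notify // wakes once even though two messages arrived
	fmt.Println(len(q.drain()), len(q.drain()))
}
```

With this design several enqueues can coalesce into one wake-up, so a consumer should drain everything each time it is signaled rather than assume one message per signal.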

type StreamOptimizer

type StreamOptimizer struct {
	BufferSize         int
	MinFlushInterval   time.Duration
	DeduplicateRepeats bool
	ProgressiveRender  bool
	// contains filtered or unexported fields
}

StreamOptimizer buffers, deduplicates, and progressively renders LLM streaming output to improve perceived speed and terminal rendering quality.

func NewStreamOptimizer

func NewStreamOptimizer() *StreamOptimizer

NewStreamOptimizer creates a StreamOptimizer with sensible defaults.

func (*StreamOptimizer) DetectIncomplete

func (s *StreamOptimizer) DetectIncomplete(buffer string) (complete, remainder string)

DetectIncomplete splits the buffer at the last safe break point. Safe breaks are: newline, space, period, comma. If inside an unclosed code fence, no flush happens until the fence is closed.
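The splitting rule can be sketched as below. The function name is hypothetical, and detecting an open fence by counting three-backtick markers is an assumption about how the check is done.

```go
package main

import (
	"fmt"
	"strings"
)

// fence is the three-backtick marker that opens or closes a code block.
var fence = strings.Repeat("`", 3)

// splitAtSafeBreak returns the flushable prefix and the held-back remainder.
func splitAtSafeBreak(buffer string) (complete, remainder string) {
	// An odd number of fence markers means a code block is still open.
	if strings.Count(buffer, fence)%2 == 1 {
		return "", buffer
	}
	// Safe breaks: newline, space, period, comma (all single-byte).
	idx := strings.LastIndexAny(buffer, "\n .,")
	if idx < 0 {
		return "", buffer // no safe break yet: keep buffering
	}
	return buffer[:idx+1], buffer[idx+1:]
}

func main() {
	complete, remainder := splitAtSafeBreak("hello wor")
	fmt.Printf("%q %q\n", complete, remainder)
}
```

Holding back the partial word "wor" avoids rendering a fragment that the next chunk would complete.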

func (*StreamOptimizer) DetectStutter

func (s *StreamOptimizer) DetectStutter(buffer string) string

DetectStutter removes repeated content from the buffer. If the last N chars repeat the previous N chars, the duplicate is removed.
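A minimal sketch of the tail-repeat check follows. How the package chooses N is not documented; this version tries the longest possible repeat first and takes a hypothetical minLen parameter so that short natural repeats (such as the "ll" in "hell") are left alone. It compares bytes, so multi-byte characters are not treated specially.

```go
package main

import "fmt"

// removeStutter drops the trailing copy when the last n bytes of buffer
// repeat the n bytes before them. It tries the longest repeat first and
// ignores repeats shorter than minLen.
func removeStutter(buffer string, minLen int) string {
	for n := len(buffer) / 2; n >= minLen && n > 0; n-- {
		tail := buffer[len(buffer)-n:]
		prev := buffer[len(buffer)-2*n : len(buffer)-n]
		if tail == prev {
			return buffer[:len(buffer)-n]
		}
	}
	return buffer
}

func main() {
	fmt.Printf("%q\n", removeStutter("the cat the cat ", 4))
}
```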

func (*StreamOptimizer) OptimizeToolOutput

func (s *StreamOptimizer) OptimizeToolOutput(output string) string

OptimizeToolOutput optimizes non-streaming tool output for display. It collapses long paths and repeated similar lines.

func (*StreamOptimizer) Process

func (s *StreamOptimizer) Process(ch <-chan string) <-chan string

Process takes raw stream chunks and returns optimized chunks that are buffered, deduplicated, and split at safe boundaries.

func (*StreamOptimizer) ProgressIndicator

func (s *StreamOptimizer) ProgressIndicator(elapsed time.Duration, charsReceived int) string

ProgressIndicator returns a progress indicator string based on elapsed time and characters received.

func (*StreamOptimizer) Reset

func (s *StreamOptimizer) Reset()

Reset clears all state and statistics.

func (*StreamOptimizer) ShouldFlush

func (s *StreamOptimizer) ShouldFlush() bool

ShouldFlush reports whether the buffer should be flushed now.

func (*StreamOptimizer) Stats

func (s *StreamOptimizer) Stats() StreamStats

Stats returns statistics about the stream processing session.

func (*StreamOptimizer) WordWrap

func (s *StreamOptimizer) WordWrap(text string, width int) string

WordWrap wraps text at word boundaries for terminal display.
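A greedy word-wrap can be sketched as follows. The function name is hypothetical, and several behaviors are assumptions: width is counted in runes, existing line breaks are collapsed, and a word longer than the width is placed on its own line unbroken.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

// wordWrap greedily packs words onto lines of at most width runes.
// A non-positive width returns the text unchanged.
func wordWrap(text string, width int) string {
	if width <= 0 {
		return text
	}
	var out strings.Builder
	lineLen := 0
	for _, word := range strings.Fields(text) {
		wordLen := utf8.RuneCountInString(word)
		switch {
		case lineLen == 0:
			// first word on the line: no separator needed
		case lineLen+1+wordLen > width:
			out.WriteByte('\n')
			lineLen = 0
		default:
			out.WriteByte(' ')
			lineLen++
		}
		out.WriteString(word)
		lineLen += wordLen
	}
	return out.String()
}

func main() {
	fmt.Println(wordWrap("the quick brown fox", 10))
}
```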

type StreamStats

type StreamStats struct {
	TotalChars        int
	FlushCount        int
	AvgFlushSize      int
	Duration          time.Duration
	CharsPerSecond    float64
	DeduplicatedChars int
	BufferedChars     int
}

StreamStats holds statistics about stream processing.

type ThinkingPhase

type ThinkingPhase string

ThinkingPhase represents a phase in the agent's reasoning process.

const (
	PhaseUnderstand ThinkingPhase = "understand"
	PhasePlan       ThinkingPhase = "plan"
	PhaseExecute    ThinkingPhase = "execute"
	PhaseVerify     ThinkingPhase = "verify"
	PhaseReflect    ThinkingPhase = "reflect"
)

type ThinkingProtocol

type ThinkingProtocol struct {
	Enabled      bool
	Visible      bool
	Steps        []ThinkingStep
	CurrentPhase ThinkingPhase
	// contains filtered or unexported fields
}

ThinkingProtocol structures the agent's reasoning process, making it explicit when the agent is planning vs executing.

func NewThinkingProtocol

func NewThinkingProtocol() *ThinkingProtocol

NewThinkingProtocol creates a new ThinkingProtocol with defaults.

func (*ThinkingProtocol) AddAlternative

func (tp *ThinkingProtocol) AddAlternative(thought, alternative string)

AddAlternative records an alternative approach considered for a thought.

func (*ThinkingProtocol) AddThought

func (tp *ThinkingProtocol) AddThought(content string, confidence float64)

AddThought records a thinking step in the current phase.

func (*ThinkingProtocol) BuildThinkingPrompt

func (tp *ThinkingProtocol) BuildThinkingPrompt(task string) string

BuildThinkingPrompt generates a structured thinking prompt for a task.

func (*ThinkingProtocol) FormatThinking

func (tp *ThinkingProtocol) FormatThinking(steps []ThinkingStep) string

FormatThinking formats thinking steps for display to the user.

func (*ThinkingProtocol) GetPhaseHistory

func (tp *ThinkingProtocol) GetPhaseHistory() map[ThinkingPhase][]ThinkingStep

GetPhaseHistory returns all thinking steps grouped by phase.

func (*ThinkingProtocol) ParseThinking

func (tp *ThinkingProtocol) ParseThinking(response string) []ThinkingStep

ParseThinking extracts thinking steps from an LLM response. It looks for phase markers like "Understanding:", "Plan:", "Risks:", etc. It also handles <thinking> tags if the model supports them.
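Marker-based extraction could be sketched like this. The mapping from markers to phases is an assumption: only "Understanding:" and "Plan:" are mapped here, since the source does not say which phase markers such as "Risks:" belong to, and <thinking> tag handling is omitted. The step type is a simplified stand-in for ThinkingStep.

```go
package main

import (
	"fmt"
	"strings"
)

// step is a simplified stand-in for ThinkingStep.
type step struct {
	Phase   string
	Content string
}

// markers maps a line prefix to a phase name. This mapping is assumed.
var markers = map[string]string{
	"Understanding:": "understand",
	"Plan:":          "plan",
}

// parseSteps scans the response line by line and emits one step for
// every line that begins with a known marker.
func parseSteps(response string) []step {
	var steps []step
	for _, line := range strings.Split(response, "\n") {
		line = strings.TrimSpace(line)
		for marker, phase := range markers {
			if strings.HasPrefix(line, marker) {
				content := strings.TrimSpace(strings.TrimPrefix(line, marker))
				steps = append(steps, step{Phase: phase, Content: content})
				break
			}
		}
	}
	return steps
}

func main() {
	resp := "Understanding: the test fails on nil input\nPlan: add a guard clause"
	for _, s := range parseSteps(resp) {
		fmt.Printf("%s: %s\n", s.Phase, s.Content)
	}
}
```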

func (*ThinkingProtocol) ResetForNewTask

func (tp *ThinkingProtocol) ResetForNewTask()

ResetForNewTask clears all thinking state for a new task.

func (*ThinkingProtocol) ShouldThinkFirst

func (tp *ThinkingProtocol) ShouldThinkFirst(task string) bool

ShouldThinkFirst applies heuristics to determine if a task needs structured thinking before execution.

func (*ThinkingProtocol) StartPhase

func (tp *ThinkingProtocol) StartPhase(phase ThinkingPhase)

StartPhase transitions to a new thinking phase.

func (*ThinkingProtocol) SummarizeThinking

func (tp *ThinkingProtocol) SummarizeThinking() string

SummarizeThinking provides a one-line summary of the thinking process.

type ThinkingStep

type ThinkingStep struct {
	Phase        ThinkingPhase
	Content      string
	Confidence   float64
	Alternatives []string
	Duration     time.Duration
	Timestamp    time.Time
}

ThinkingStep represents a single step in the thinking process.
