Documentation
¶
Overview ¶
Package cogito provides LLM-powered reasoning chains for Go.
cogito implements a Thought-Note architecture for building autonomous systems that reason and adapt.
Core Types ¶
The package is built around two core concepts:
- Thought - A reasoning context that accumulates information across pipeline steps
- Note - Atomic units of information (key-value pairs with metadata)
Creating Thoughts ¶
Use New or NewWithTrace to create thoughts:
thought := cogito.New(ctx, "analyze customer feedback") thought.SetContent(ctx, "feedback", customerMessage, "input")
Primitives ¶
Cogito provides a comprehensive set of reasoning primitives:
Decision & Analysis:
- NewDecide - Binary yes/no decisions with confidence scores
- NewAnalyze - Extract structured data into typed results
- NewCategorize - Classify into one of N categories
- NewAssess - Sentiment analysis with emotional scoring
- NewPrioritize - Rank items by specified criteria
Control Flow:
- NewSift - Semantic gate - LLM decides whether to execute wrapped processor
- NewDiscern - Semantic router - LLM classifies and routes to different processors
Reflection:
- NewReflect - Consolidate current Thought's Notes into a summary
Session Management:
- NewReset - Clear session state
- NewCompress - LLM-summarize session history to reduce tokens
- NewTruncate - Sliding window session trimming (no LLM)
Synthesis:
- NewAmplify - Iterative refinement until criteria met
- NewConverge - Parallel execution with semantic synthesis
Pipeline Helpers ¶
Cogito wraps pipz connectors for Thought processing:
- Sequence - Sequential execution
- Filter - Conditional execution
- Switch - Route to different processors
- Fallback - Try alternatives on failure
- Retry - Retry on failure
- Backoff - Retry with exponential backoff
- Timeout - Enforce time limits
- Concurrent - Run processors in parallel
- Race - Return first successful result
Provider ¶
LLM access uses a resolution hierarchy:
- Explicit parameter (.WithProvider(p))
- Context value (cogito.WithProvider(ctx, p))
- Global default (cogito.SetProvider(p))
Use SetProvider to configure the global default:
cogito.SetProvider(myProvider)
Observability ¶
Cogito emits capitan signals throughout execution. See [signals.go] for the complete list of events including ThoughtCreated, StepStarted, StepCompleted, StepFailed, NoteAdded, and NotesPublished.
Index ¶
- Variables
- func Backoff(identity pipz.Identity, processor pipz.Chainable[*Thought], maxAttempts int, ...) *pipz.Backoff[*Thought]
- func CircuitBreaker(identity pipz.Identity, processor pipz.Chainable[*Thought], ...) *pipz.CircuitBreaker[*Thought]
- func Concurrent(identity pipz.Identity, ...) *pipz.Concurrent[*Thought]
- func Do(identity pipz.Identity, fn func(context.Context, *Thought) (*Thought, error)) pipz.Processor[*Thought]
- func Effect(identity pipz.Identity, fn func(context.Context, *Thought) error) pipz.Processor[*Thought]
- func Enrich(identity pipz.Identity, fn func(context.Context, *Thought) (*Thought, error)) pipz.Processor[*Thought]
- func Fallback(identity pipz.Identity, processors ...pipz.Chainable[*Thought]) *pipz.Fallback[*Thought]
- func Filter(identity pipz.Identity, predicate func(context.Context, *Thought) bool, ...) *pipz.Filter[*Thought]
- func Gate(identity pipz.Identity, predicate func(context.Context, *Thought) bool) pipz.Processor[*Thought]
- func Handle(identity pipz.Identity, processor pipz.Chainable[*Thought], ...) *pipz.Handle[*Thought]
- func Mutate(identity pipz.Identity, fn func(context.Context, *Thought) *Thought, ...) pipz.Processor[*Thought]
- func Race(identity pipz.Identity, processors ...pipz.Chainable[*Thought]) *pipz.Race[*Thought]
- func RateLimiter(identity pipz.Identity, requestsPerSecond float64, burst int, ...) *pipz.RateLimiter[*Thought]
- func RenderNotesToContext(notes []Note) string
- func Retry(identity pipz.Identity, processor pipz.Chainable[*Thought], maxAttempts int) *pipz.Retry[*Thought]
- func Sequence(identity pipz.Identity, processors ...pipz.Chainable[*Thought]) *pipz.Sequence[*Thought]
- func SetProvider(p Provider)
- func Switch(identity pipz.Identity, condition func(context.Context, *Thought) string) *pipz.Switch[*Thought]
- func Timeout(identity pipz.Identity, processor pipz.Chainable[*Thought], ...) *pipz.Timeout[*Thought]
- func Transform(identity pipz.Identity, fn func(context.Context, *Thought) *Thought) pipz.Processor[*Thought]
- func WithProvider(ctx context.Context, p Provider) context.Context
- func WorkerPool(identity pipz.Identity, workers int, processors ...pipz.Chainable[*Thought]) *pipz.WorkerPool[*Thought]
- type Amplify
- func (a *Amplify) Close() error
- func (a *Amplify) Identity() pipz.Identity
- func (a *Amplify) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (a *Amplify) Scan(t *Thought) (*AmplifyResult, error)
- func (a *Amplify) Schema() pipz.Node
- func (a *Amplify) WithCompletionTemperature(temp float32) *Amplify
- func (a *Amplify) WithMaxIterations(maxIter int) *Amplify
- func (a *Amplify) WithProvider(p Provider) *Amplify
- func (a *Amplify) WithRefinementTemperature(temp float32) *Amplify
- func (a *Amplify) WithTemperature(temp float32) *Amplify
- type AmplifyResult
- type Analyze
- func (a *Analyze[T]) Close() error
- func (a *Analyze[T]) Identity() pipz.Identity
- func (a *Analyze[T]) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (a *Analyze[T]) Scan(t *Thought) (T, error)
- func (a *Analyze[T]) Schema() pipz.Node
- func (a *Analyze[T]) WithIntrospection() *Analyze[T]
- func (a *Analyze[T]) WithIntrospectionTemperature(temp float32) *Analyze[T]
- func (a *Analyze[T]) WithProvider(p Provider) *Analyze[T]
- func (a *Analyze[T]) WithReasoningTemperature(temp float32) *Analyze[T]
- func (a *Analyze[T]) WithSummaryKey(key string) *Analyze[T]
- func (a *Analyze[T]) WithTemperature(temp float32) *Analyze[T]
- type Assess
- func (s *Assess) Close() error
- func (s *Assess) Identity() pipz.Identity
- func (s *Assess) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (s *Assess) Scan(t *Thought) (*zyn.SentimentResponse, error)
- func (s *Assess) Schema() pipz.Node
- func (s *Assess) WithIntrospection() *Assess
- func (s *Assess) WithIntrospectionTemperature(temp float32) *Assess
- func (s *Assess) WithProvider(p Provider) *Assess
- func (s *Assess) WithReasoningTemperature(temp float32) *Assess
- func (s *Assess) WithSummaryKey(key string) *Assess
- func (s *Assess) WithTemperature(temp float32) *Assess
- type Categorize
- func (c *Categorize) Close() error
- func (c *Categorize) Identity() pipz.Identity
- func (c *Categorize) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (c *Categorize) Scan(t *Thought) (*zyn.ClassificationResponse, error)
- func (c *Categorize) Schema() pipz.Node
- func (c *Categorize) WithIntrospection() *Categorize
- func (c *Categorize) WithIntrospectionTemperature(temp float32) *Categorize
- func (c *Categorize) WithProvider(p Provider) *Categorize
- func (c *Categorize) WithReasoningTemperature(temp float32) *Categorize
- func (c *Categorize) WithSummaryKey(key string) *Categorize
- func (c *Categorize) WithTemperature(temp float32) *Categorize
- type Compress
- func (c *Compress) Close() error
- func (c *Compress) Identity() pipz.Identity
- func (c *Compress) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (c *Compress) Schema() pipz.Node
- func (c *Compress) WithProvider(p Provider) *Compress
- func (c *Compress) WithSummaryKey(key string) *Compress
- func (c *Compress) WithTemperature(temp float32) *Compress
- func (c *Compress) WithThreshold(n int) *Compress
- type Converge
- func (c *Converge) AddProcessor(processor pipz.Chainable[*Thought]) *Converge
- func (c *Converge) ClearProcessors() *Converge
- func (c *Converge) Close() error
- func (c *Converge) Identity() pipz.Identity
- func (c *Converge) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (c *Converge) Processors() []pipz.Chainable[*Thought]
- func (c *Converge) RemoveProcessor(identity pipz.Identity) *Converge
- func (c *Converge) Scan(t *Thought) (string, error)
- func (c *Converge) Schema() pipz.Node
- func (c *Converge) WithProvider(p Provider) *Converge
- func (c *Converge) WithSynthesisTemperature(temp float32) *Converge
- func (c *Converge) WithTemperature(temp float32) *Converge
- type Decide
- func (d *Decide) Close() error
- func (d *Decide) Identity() pipz.Identity
- func (d *Decide) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (d *Decide) Scan(t *Thought) (*zyn.BinaryResponse, error)
- func (d *Decide) Schema() pipz.Node
- func (d *Decide) WithIntrospection() *Decide
- func (d *Decide) WithIntrospectionTemperature(temp float32) *Decide
- func (d *Decide) WithProvider(p Provider) *Decide
- func (d *Decide) WithReasoningTemperature(temp float32) *Decide
- func (d *Decide) WithSummaryKey(key string) *Decide
- func (d *Decide) WithTemperature(temp float32) *Decide
- type Discern
- func (d *Discern) AddRoute(category string, processor pipz.Chainable[*Thought]) *Discern
- func (d *Discern) ClearRoutes() *Discern
- func (d *Discern) Close() error
- func (d *Discern) HasRoute(category string) bool
- func (d *Discern) Identity() pipz.Identity
- func (d *Discern) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (d *Discern) RemoveRoute(category string) *Discern
- func (d *Discern) Routes() map[string]pipz.Chainable[*Thought]
- func (d *Discern) Scan(t *Thought) (*zyn.ClassificationResponse, error)
- func (d *Discern) Schema() pipz.Node
- func (d *Discern) SetFallback(processor pipz.Chainable[*Thought]) *Discern
- func (d *Discern) WithIntrospection() *Discern
- func (d *Discern) WithIntrospectionTemperature(temp float32) *Discern
- func (d *Discern) WithProvider(p Provider) *Discern
- func (d *Discern) WithReasoningTemperature(temp float32) *Discern
- func (d *Discern) WithSummaryKey(key string) *Discern
- func (d *Discern) WithTemperature(temp float32) *Discern
- type Note
- type Prioritize
- func (r *Prioritize) Close() error
- func (r *Prioritize) Identity() pipz.Identity
- func (r *Prioritize) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (r *Prioritize) Scan(t *Thought) (*zyn.RankingResponse, error)
- func (r *Prioritize) Schema() pipz.Node
- func (r *Prioritize) WithIntrospection() *Prioritize
- func (r *Prioritize) WithIntrospectionTemperature(temp float32) *Prioritize
- func (r *Prioritize) WithProvider(p Provider) *Prioritize
- func (r *Prioritize) WithReasoningTemperature(temp float32) *Prioritize
- func (r *Prioritize) WithSummaryKey(key string) *Prioritize
- func (r *Prioritize) WithTemperature(temp float32) *Prioritize
- type Provider
- type Reflect
- func (r *Reflect) Close() error
- func (r *Reflect) Identity() pipz.Identity
- func (r *Reflect) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (r *Reflect) Schema() pipz.Node
- func (r *Reflect) WithPrompt(prompt string) *Reflect
- func (r *Reflect) WithProvider(p Provider) *Reflect
- func (r *Reflect) WithUnpublishedOnly() *Reflect
- type Reset
- type Sift
- func (s *Sift) Close() error
- func (s *Sift) Identity() pipz.Identity
- func (s *Sift) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (s *Sift) Scan(t *Thought) (*zyn.BinaryResponse, error)
- func (s *Sift) Schema() pipz.Node
- func (s *Sift) SetProcessor(processor pipz.Chainable[*Thought]) *Sift
- func (s *Sift) WithIntrospection() *Sift
- func (s *Sift) WithIntrospectionTemperature(temp float32) *Sift
- func (s *Sift) WithProvider(p Provider) *Sift
- func (s *Sift) WithReasoningTemperature(temp float32) *Sift
- func (s *Sift) WithSummaryKey(key string) *Sift
- func (s *Sift) WithTemperature(temp float32) *Sift
- type Thought
- func (t *Thought) AddNote(ctx context.Context, note Note) error
- func (t *Thought) AllNotes() []Note
- func (t *Thought) Clone() *Thought
- func (t *Thought) GetBool(key string) (bool, error)
- func (t *Thought) GetContent(key string) (string, error)
- func (t *Thought) GetFloat(key string) (float64, error)
- func (t *Thought) GetInt(key string) (int, error)
- func (t *Thought) GetLatestNote() (Note, bool)
- func (t *Thought) GetMetadata(key, field string) (string, error)
- func (t *Thought) GetNote(key string) (Note, bool)
- func (t *Thought) GetUnpublishedNotes() []Note
- func (t *Thought) MarkNotesPublished(ctx context.Context)
- func (t *Thought) PublishedCount() int
- func (t *Thought) SetContent(ctx context.Context, key, content, source string) error
- func (t *Thought) SetNote(ctx context.Context, key, content, source string, metadata map[string]string) error
- func (t *Thought) SetPublishedCount(count int)
- type Truncate
- func (tr *Truncate) Close() error
- func (tr *Truncate) Identity() pipz.Identity
- func (tr *Truncate) Process(ctx context.Context, t *Thought) (*Thought, error)
- func (tr *Truncate) Schema() pipz.Node
- func (tr *Truncate) WithKeepFirst(n int) *Truncate
- func (tr *Truncate) WithKeepLast(n int) *Truncate
- func (tr *Truncate) WithThreshold(n int) *Truncate
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultIntrospection controls whether primitives generate semantic summaries // after reasoning. Disabled by default for cost efficiency. Enable per-step // with WithIntrospection() or globally by setting this to true. DefaultIntrospection = false // DefaultReasoningTemperature is used for the primary LLM call in each primitive. // Defaults to deterministic (low temperature) for consistent outputs. DefaultReasoningTemperature = zyn.DefaultTemperatureDeterministic // DefaultIntrospectionTemperature is used for the transform synapse that // synthesizes semantic summaries. Defaults to creative (higher temperature) // for richer context generation. DefaultIntrospectionTemperature = zyn.DefaultTemperatureCreative )
Default configuration for cogito primitives. These can be overridden per-step using builder methods.
var ( // Thought lifecycle signals. ThoughtCreated = capitan.NewSignal( "cogito.thought.created", "New thought chain initiated with intent and trace ID", ) // Step execution signals. StepStarted = capitan.NewSignal( "cogito.step.started", "Reasoning step began execution", ) StepCompleted = capitan.NewSignal( "cogito.step.completed", "Reasoning step finished successfully", ) StepFailed = capitan.NewSignal( "cogito.step.failed", "Reasoning step encountered an error", ) // Note management signals. NoteAdded = capitan.NewSignal( "cogito.note.added", "New note added to thought context", ) NotesPublished = capitan.NewSignal( "cogito.notes.published", "Notes marked as published to LLM", ) // Introspection signals. IntrospectionCompleted = capitan.NewSignal( "cogito.introspection.completed", "Transform synapse completed semantic summary", ) // Sift signals. SiftDecided = capitan.NewSignal( "cogito.sift.decided", "Semantic gate decision made", ) // Amplify signals. AmplifyIterationCompleted = capitan.NewSignal( "cogito.amplify.iteration.completed", "Refinement iteration finished", ) AmplifyCompleted = capitan.NewSignal( "cogito.amplify.completed", "Refinement met completion criteria", ) // Converge signals. ConvergeBranchStarted = capitan.NewSignal( "cogito.converge.branch.started", "Parallel branch began execution", ) ConvergeBranchCompleted = capitan.NewSignal( "cogito.converge.branch.completed", "Parallel branch finished execution", ) ConvergeSynthesisStarted = capitan.NewSignal( "cogito.converge.synthesis.started", "Synthesis phase began", ) )
Signal definitions for cogito reasoning chain events. Signals follow the pattern: cogito.<entity>.<event>.
var ( // Thought metadata. FieldIntent = capitan.NewStringKey("intent") FieldTraceID = capitan.NewStringKey("trace_id") FieldNoteCount = capitan.NewIntKey("note_count") // Step metadata. FieldStepName = capitan.NewStringKey("step_name") FieldStepType = capitan.NewStringKey("step_type") // decide, classify, analyze, sentiment, rank FieldTemperature = capitan.NewFloat32Key("temperature") FieldProvider = capitan.NewStringKey("provider") // Note metadata. FieldNoteKey = capitan.NewStringKey("note_key") FieldNoteSource = capitan.NewStringKey("note_source") FieldContentSize = capitan.NewIntKey("content_size") // character count // Context metrics. FieldUnpublishedCount = capitan.NewIntKey("unpublished_count") FieldPublishedCount = capitan.NewIntKey("published_count") FieldContextSize = capitan.NewIntKey("context_size") // character count // Timing. FieldStepDuration = capitan.NewDurationKey("step_duration") // Error information. FieldError = capitan.NewErrorKey("error") // Decision metadata (for Sift, Amplify). FieldDecision = capitan.NewBoolKey("decision") FieldConfidence = capitan.NewFloat64Key("confidence") // Iteration metadata (for Amplify). FieldIterationCount = capitan.NewIntKey("iteration_count") // Branch metadata (for Converge). FieldBranchCount = capitan.NewIntKey("branch_count") FieldBranchName = capitan.NewStringKey("branch_name") )
Field keys for cogito event data.
var ErrNoProvider = errors.New("no provider configured: set via context, step-level, or global")
ErrNoProvider is returned when no provider can be resolved.
Functions ¶
func Backoff ¶
func Backoff(identity pipz.Identity, processor pipz.Chainable[*Thought], maxAttempts int, baseDelay time.Duration) *pipz.Backoff[*Thought]
Backoff creates a processor that retries with exponential backoff. Useful for operations that need time to recover between attempts.
Example:
resilient := cogito.Backoff(pipz.NewIdentity("api-call", "API with backoff"), apiProcessor, 5, time.Second)
func CircuitBreaker ¶
func CircuitBreaker(identity pipz.Identity, processor pipz.Chainable[*Thought], failureThreshold int, resetTimeout time.Duration) *pipz.CircuitBreaker[*Thought]
CircuitBreaker creates a processor that prevents cascade failures. Opens the circuit after failureThreshold consecutive failures.
Example:
protected := cogito.CircuitBreaker(pipz.NewIdentity("service-call", "Protected service call"), apiProcessor, 5, 30*time.Second)
func Concurrent ¶
func Concurrent(identity pipz.Identity, reducer func(original *Thought, results map[pipz.Identity]*Thought, errors map[pipz.Identity]error) *Thought, processors ...pipz.Chainable[*Thought]) *pipz.Concurrent[*Thought]
Concurrent runs all processors in parallel and returns the original thought. Each processor receives an isolated clone. Use the reducer to aggregate results.
Example:
parallel := cogito.Concurrent(pipz.NewIdentity("notify-all", "Parallel notifications"), nil,
emailNotifier,
smsNotifier,
webhookNotifier,
)
func Do ¶
func Do(identity pipz.Identity, fn func(context.Context, *Thought) (*Thought, error)) pipz.Processor[*Thought]
Do creates a processor from a custom function that can fail. This is the easiest way to add custom logic to a chain.
Example:
route := cogito.Do(pipz.NewIdentity("route-ticket", "Route tickets to queues"), func(ctx context.Context, t *cogito.Thought) (*cogito.Thought, error) {
ticketType, _ := t.GetContent("ticket_type")
if ticketType == "urgent" {
t.SetContent("queue", "urgent-queue", "route-ticket")
} else {
t.SetContent("queue", "standard-queue", "route-ticket")
}
return t, nil
})
func Effect ¶
func Effect(identity pipz.Identity, fn func(context.Context, *Thought) error) pipz.Processor[*Thought]
Effect creates a processor that performs a side effect without modifying the thought. Use this for logging, metrics, or other observational operations.
Example:
logger := cogito.Effect(pipz.NewIdentity("log-intent", "Log processing intent"), func(ctx context.Context, t *cogito.Thought) error {
log.Printf("Processing thought with intent: %s", t.Intent)
return nil
})
func Enrich ¶
func Enrich(identity pipz.Identity, fn func(context.Context, *Thought) (*Thought, error)) pipz.Processor[*Thought]
Enrich creates a processor that optionally enhances a thought. Unlike Do, errors are logged but don't stop the pipeline.
Example:
addContext := cogito.Enrich(pipz.NewIdentity("add-context", "Fetch external context"), func(ctx context.Context, t *cogito.Thought) (*cogito.Thought, error) {
extra, err := fetchExternalContext(ctx, t.Intent)
if err != nil {
return t, err // Logged but pipeline continues
}
t.SetContent("external_context", extra, "add-context")
return t, nil
})
func Fallback ¶
func Fallback(identity pipz.Identity, processors ...pipz.Chainable[*Thought]) *pipz.Fallback[*Thought]
Fallback creates a processor that tries alternatives on failure. Each processor is tried in order until one succeeds.
Example:
resilient := cogito.Fallback(pipz.NewIdentity("resilient-analysis", "Try multiple analyzers"),
primaryAnalyzer,
backupAnalyzer,
fallbackAnalyzer,
)
func Filter ¶
func Filter(identity pipz.Identity, predicate func(context.Context, *Thought) bool, processor pipz.Chainable[*Thought]) *pipz.Filter[*Thought]
Filter creates a conditional processor that either processes or passes through. When the predicate returns true, the processor is executed. When false, the thought passes through unchanged.
Example:
onlyUrgent := cogito.Filter(pipz.NewIdentity("urgent-only", "Process only urgent items"),
func(ctx context.Context, t *cogito.Thought) bool {
priority, _ := t.GetContent("priority")
return priority == "urgent"
},
urgentProcessor,
)
func Gate ¶
func Gate(identity pipz.Identity, predicate func(context.Context, *Thought) bool) pipz.Processor[*Thought]
Gate creates a simple pass/fail filter that blocks thoughts not meeting criteria. Unlike Filter which has a fallback processor, Gate simply passes through or blocks.
Example:
validOnly := cogito.Gate(pipz.NewIdentity("valid-only", "Filter invalid thoughts"), func(ctx context.Context, t *cogito.Thought) bool {
_, err := t.GetContent("required_field")
return err == nil
})
func Handle ¶
func Handle(identity pipz.Identity, processor pipz.Chainable[*Thought], errorHandler pipz.Chainable[*pipz.Error[*Thought]]) *pipz.Handle[*Thought]
Handle creates a processor that handles errors without stopping the pipeline. When the primary processor fails, the error handler is invoked for monitoring. The error handler receives a pipz.Error[*Thought] with full error context.
Example:
errorLogger := pipz.Effect(pipz.NewIdentity("log-error", "Log errors"), func(ctx context.Context, err *pipz.Error[*cogito.Thought]) error {
log.Printf("thought %s failed: %v", err.InputData.TraceID, err.Err)
return nil
})
observed := cogito.Handle(pipz.NewIdentity("observed", "Observed processor"), riskyProcessor, errorLogger)
func Mutate ¶
func Mutate(identity pipz.Identity, fn func(context.Context, *Thought) *Thought, predicate func(context.Context, *Thought) bool) pipz.Processor[*Thought]
Mutate creates a processor that conditionally modifies a thought. The modification is only applied if the predicate returns true.
Example:
prioritize := cogito.Mutate(pipz.NewIdentity("prioritize", "Set high priority for critical items"),
func(ctx context.Context, t *cogito.Thought) *cogito.Thought {
t.SetContent("priority", "high", "prioritize")
return t
},
func(ctx context.Context, t *cogito.Thought) bool {
urgency, _ := t.GetContent("urgency")
return urgency == "critical"
},
)
func Race ¶
Race runs all processors in parallel and returns the first successful result. Useful for reducing latency when multiple paths can produce the same result.
Example:
fastest := cogito.Race(pipz.NewIdentity("fastest-lookup", "First successful lookup"),
cacheProcessor,
databaseProcessor,
externalAPIProcessor,
)
func RateLimiter ¶
func RateLimiter(identity pipz.Identity, requestsPerSecond float64, burst int, processor pipz.Chainable[*Thought]) *pipz.RateLimiter[*Thought]
RateLimiter creates a processor that enforces rate limits. Useful for protecting rate-limited external services.
Example:
limited := cogito.RateLimiter(pipz.NewIdentity("api-limit", "API rate limiter"), 100, 10, apiCall) // 100/sec, burst 10
func RenderNotesToContext ¶
RenderNotesToContext converts a slice of notes to a formatted context string suitable for LLM consumption. Each note is rendered as "key: content".
func Retry ¶
func Retry(identity pipz.Identity, processor pipz.Chainable[*Thought], maxAttempts int) *pipz.Retry[*Thought]
Retry creates a processor that retries on failure up to maxAttempts times. Immediate retry without delay - for backoff, use Backoff instead.
Example:
reliable := cogito.Retry(pipz.NewIdentity("reliable-call", "Retry external service"), externalServiceCall, 3)
func Sequence ¶
func Sequence(identity pipz.Identity, processors ...pipz.Chainable[*Thought]) *pipz.Sequence[*Thought]
Sequence creates a sequential pipeline of thought processors. Each processor receives the output of the previous one.
Example:
pipeline := cogito.Sequence(pipz.NewIdentity("reasoning-chain", "Main reasoning pipeline"),
cogito.NewAnalyze("analyze", "Examine the situation"),
cogito.NewDecide("decide", "What action to take?"),
)
func SetProvider ¶
func SetProvider(p Provider)
SetProvider sets the global fallback provider. This provider is used when no context or step-level provider is available.
func Switch ¶
func Switch(identity pipz.Identity, condition func(context.Context, *Thought) string) *pipz.Switch[*Thought]
Switch creates a router that directs thoughts to different processors. The condition function returns a route key string that determines which processor handles the thought.
Example:
router := cogito.Switch(pipz.NewIdentity("intent-router", "Route by category"), func(ctx context.Context, t *cogito.Thought) string {
category, _ := t.GetContent("category")
return category
})
router.AddRoute("question", questionHandler)
router.AddRoute("command", commandHandler)
func Timeout ¶
func Timeout(identity pipz.Identity, processor pipz.Chainable[*Thought], duration time.Duration) *pipz.Timeout[*Thought]
Timeout creates a processor that enforces a time limit on execution. If the timeout expires, the operation is canceled and an error is returned.
Example:
bounded := cogito.Timeout(pipz.NewIdentity("bounded-analysis", "Time-limited analysis"), analyzer, 30*time.Second)
func Transform ¶
func Transform(identity pipz.Identity, fn func(context.Context, *Thought) *Thought) pipz.Processor[*Thought]
Transform creates a processor from a pure transformation function. Use this when your operation cannot fail.
Example:
addMetadata := cogito.Transform(pipz.NewIdentity("add-metadata", "Add timestamp"), func(ctx context.Context, t *cogito.Thought) *cogito.Thought {
t.SetContent("timestamp", time.Now().Format(time.RFC3339), "add-metadata")
return t
})
func WithProvider ¶
WithProvider adds a provider to the context. This is the preferred method for provider management.
func WorkerPool ¶
func WorkerPool(identity pipz.Identity, workers int, processors ...pipz.Chainable[*Thought]) *pipz.WorkerPool[*Thought]
WorkerPool creates a bounded parallel executor with a fixed number of workers. Useful for controlling parallelism when processing multiple thought streams.
Example:
pool := cogito.WorkerPool(pipz.NewIdentity("bounded-analysis", "Bounded worker pool"), 5,
analyzer1,
analyzer2,
analyzer3,
)
Types ¶
type Amplify ¶
type Amplify struct {
// contains filtered or unexported fields
}
Amplify is an iterative refinement primitive that implements pipz.Chainable[*Thought]. It repeatedly refines content until an LLM determines completion criteria are met.
Unlike pipz.Retry which retries on errors, Amplify iterates based on semantic quality. Each iteration transforms the content, then checks if the result meets the specified criteria.
func NewAmplify ¶
func NewAmplify(key, sourceKey, refinementPrompt, completionCriteria string, maxIterations int) *Amplify
NewAmplify creates a new iterative refinement primitive.
The primitive uses two zyn synapses per iteration:
- Transform synapse: Refines the content based on refinementPrompt
- Binary synapse: Checks if completionCriteria are met
The loop continues until either:
- The completion criteria are satisfied (Binary returns true)
- maxIterations is reached
Output Notes:
- {key}: JSON-serialized AmplifyResult
Example:
refine := cogito.NewAmplify(
"refined_response",
"draft_response",
"Improve clarity, remove redundancy, and ensure actionable recommendations",
"The response is clear, concise, and provides specific actionable steps",
3,
)
result, _ := refine.Process(ctx, thought)
output, _ := refine.Scan(result)
fmt.Println(output.Content, "in", output.Iterations, "iterations")
func (*Amplify) Scan ¶
func (a *Amplify) Scan(t *Thought) (*AmplifyResult, error)
Scan retrieves the typed amplify result from a thought.
func (*Amplify) WithCompletionTemperature ¶
WithCompletionTemperature sets the temperature for the completion check phase.
func (*Amplify) WithMaxIterations ¶
WithMaxIterations sets the maximum number of refinement iterations.
func (*Amplify) WithProvider ¶
WithProvider sets the provider for this step.
func (*Amplify) WithRefinementTemperature ¶
WithRefinementTemperature sets the temperature for the refinement phase.
func (*Amplify) WithTemperature ¶
WithTemperature sets the default temperature for both refinement and completion phases.
type AmplifyResult ¶
type AmplifyResult struct {
Content string `json:"content"` // Final refined content
Iterations int `json:"iterations"` // Number of iterations performed
Completed bool `json:"completed"` // Whether completion criteria was met
Reasoning []string `json:"reasoning"` // Reasoning from final completion check
}
AmplifyResult captures the outcome of an iterative refinement.
type Analyze ¶
Analyze is a structured data extraction primitive that implements pipz.Chainable[*Thought]. It extracts typed data from unstructured input using generics.
func NewAnalyze ¶
NewAnalyze creates a new structured data extraction primitive with introspection enabled by default.
The primitive uses two zyn synapses:
- Extract synapse: Pulls out structured data of type T
- Transform synapse: Synthesizes a semantic summary for context accumulation
Output Notes:
- {key}: JSON-serialized T (the extracted data)
- {key}_summary: Semantic summary for next steps (if introspection enabled)
Example:
type TicketData struct {
Severity string `json:"severity"`
Component string `json:"component"`
}
func (t TicketData) Validate() error { return nil }
step := cogito.NewAnalyze[TicketData]("ticket_data", "ticket metadata")
result, _ := step.Process(ctx, thought)
data, _ := step.Scan(result)
fmt.Println(data.Severity, data.Component)
func (*Analyze[T]) Scan ¶
Scan retrieves the typed extracted data from a thought. Returns T directly (the user-defined type), not a wrapper.
func (*Analyze[T]) WithIntrospection ¶
WithIntrospection enables the introspection phase.
func (*Analyze[T]) WithIntrospectionTemperature ¶
WithIntrospectionTemperature sets the temperature for the introspection phase.
func (*Analyze[T]) WithProvider ¶
WithProvider sets the provider for this step.
func (*Analyze[T]) WithReasoningTemperature ¶
WithReasoningTemperature sets the temperature for the reasoning phase.
func (*Analyze[T]) WithSummaryKey ¶
WithSummaryKey sets a custom key for the introspection summary note.
func (*Analyze[T]) WithTemperature ¶
WithTemperature sets the default temperature for this step.
type Assess ¶
type Assess struct {
// contains filtered or unexported fields
}
Assess is a sentiment assessment primitive that implements pipz.Chainable[*Thought]. It assesses the emotional tone of input and stores the full response for typed retrieval.
func NewAssess ¶
NewAssess creates a new sentiment assessment primitive with introspection enabled by default.
The primitive uses two zyn synapses:
- Sentiment synapse: Assesses emotional tone (positive/negative/neutral/mixed)
- Transform synapse: Synthesizes a semantic summary for context accumulation
Output Notes:
- {key}: JSON-serialized zyn.SentimentResponse
- {key}_summary: Semantic summary for next steps (if introspection enabled)
Example:
step := cogito.NewAssess("user_mood")
result, _ := step.Process(ctx, thought)
resp, _ := step.Scan(result)
fmt.Println(resp.Overall, resp.Confidence, resp.Scores)
func (*Assess) Scan ¶
func (s *Assess) Scan(t *Thought) (*zyn.SentimentResponse, error)
Scan retrieves the typed sentiment response from a thought.
func (*Assess) WithIntrospection ¶
WithIntrospection enables the introspection phase.
func (*Assess) WithIntrospectionTemperature ¶
WithIntrospectionTemperature sets the temperature for the introspection phase.
func (*Assess) WithProvider ¶
WithProvider sets the provider for this step.
func (*Assess) WithReasoningTemperature ¶
WithReasoningTemperature sets the temperature for the reasoning phase.
func (*Assess) WithSummaryKey ¶
WithSummaryKey sets a custom key for the introspection summary note.
func (*Assess) WithTemperature ¶
WithTemperature sets the default temperature for this step.
type Categorize ¶
type Categorize struct {
// contains filtered or unexported fields
}
Categorize is a multi-class categorization primitive that implements pipz.Chainable[*Thought]. It asks the LLM to place input into one of the provided categories.
func NewCategorize ¶
func NewCategorize(key, question string, categories []string) *Categorize
NewCategorize creates a new multi-class categorization primitive with introspection enabled by default.
The primitive uses two zyn synapses:
- Classification synapse: Places input into best matching category
- Transform synapse: Synthesizes a semantic summary for context accumulation
Output Notes:
- {key}: JSON-serialized zyn.ClassificationResponse
- {key}_summary: Semantic summary for next steps (if introspection enabled)
Example:
step := cogito.NewCategorize("ticket_type", "What type of ticket is this?", []string{"bug", "feature", "question"})
result, _ := step.Process(ctx, thought)
resp, _ := step.Scan(result)
fmt.Println(resp.Primary, resp.Confidence, resp.Reasoning)
func (*Categorize) Close ¶
func (c *Categorize) Close() error
Close implements pipz.Chainable[*Thought].
func (*Categorize) Identity ¶
func (c *Categorize) Identity() pipz.Identity
Identity implements pipz.Chainable[*Thought].
func (*Categorize) Scan ¶
func (c *Categorize) Scan(t *Thought) (*zyn.ClassificationResponse, error)
Scan retrieves the typed classification response from a thought.
func (*Categorize) Schema ¶
func (c *Categorize) Schema() pipz.Node
Schema implements pipz.Chainable[*Thought].
func (*Categorize) WithIntrospection ¶
func (c *Categorize) WithIntrospection() *Categorize
WithIntrospection enables the introspection phase.
func (*Categorize) WithIntrospectionTemperature ¶
func (c *Categorize) WithIntrospectionTemperature(temp float32) *Categorize
WithIntrospectionTemperature sets the temperature for the introspection phase.
func (*Categorize) WithProvider ¶
func (c *Categorize) WithProvider(p Provider) *Categorize
WithProvider sets the provider for this step.
func (*Categorize) WithReasoningTemperature ¶
func (c *Categorize) WithReasoningTemperature(temp float32) *Categorize
WithReasoningTemperature sets the temperature for the reasoning phase.
func (*Categorize) WithSummaryKey ¶
func (c *Categorize) WithSummaryKey(key string) *Categorize
WithSummaryKey sets a custom key for the introspection summary note.
func (*Categorize) WithTemperature ¶
func (c *Categorize) WithTemperature(temp float32) *Categorize
WithTemperature sets the default temperature for this step.
type Compress ¶
type Compress struct {
// contains filtered or unexported fields
}
Compress is a session management primitive that implements pipz.Chainable[*Thought]. It summarizes the current session via LLM and replaces it with a fresh session containing the summary as context.
func NewCompress ¶
NewCompress creates a new session compression primitive.
The primitive uses a zyn Transform synapse to summarize the session history, then replaces the session with a new one containing the summary as the first message.
Output Notes:
- {key}: The generated summary text
Example:
compress := cogito.NewCompress("session_compress").
WithThreshold(10) // Only compress if >= 10 messages
result, _ := compress.Process(ctx, thought)
func (*Compress) WithProvider ¶
WithProvider sets the provider for this step.
func (*Compress) WithSummaryKey ¶
WithSummaryKey sets the note key where the summary will be stored.
func (*Compress) WithTemperature ¶
WithTemperature sets the temperature for summarisation.
func (*Compress) WithThreshold ¶
WithThreshold sets the minimum message count to trigger compression. If the session has fewer messages, the primitive passes through unchanged.
type Converge ¶
type Converge struct {
// contains filtered or unexported fields
}
Converge is a parallel execution primitive with LLM-powered synthesis that implements pipz.Chainable[*Thought]. It runs multiple processors concurrently, then uses an LLM to synthesize their outputs into a unified result.
Unlike pipz.Concurrent which uses a programmatic reducer, Converge uses semantic synthesis to merge perspectives intelligently based on meaning rather than simple aggregation.
func NewConverge ¶
NewConverge creates a new parallel synthesis primitive.
The primitive executes all processors concurrently on cloned thoughts, then uses zyn.Transform to synthesize their outputs into a unified result.
Output Notes:
- {key}: The synthesized output combining all processor results
- Notes from each processor are preserved with their original keys
Example:
converge := cogito.NewConverge(
"unified_analysis",
"Synthesize these perspectives into a unified recommendation, highlighting agreements and resolving conflicts",
technicalAnalysis,
businessAnalysis,
riskAnalysis,
)
result, _ := converge.Process(ctx, thought)
synthesis, _ := converge.Scan(result)
fmt.Println(synthesis)
func (*Converge) AddProcessor ¶
AddProcessor adds a processor to the parallel execution list.
func (*Converge) ClearProcessors ¶
ClearProcessors removes all processors.
func (*Converge) Close ¶
Close implements pipz.Chainable[*Thought]. Propagates Close to all registered processors.
func (*Converge) Processors ¶
Processors returns a copy of the current processors list.
func (*Converge) RemoveProcessor ¶
RemoveProcessor removes a processor by identity.
func (*Converge) WithProvider ¶
WithProvider sets the provider for synthesis.
func (*Converge) WithSynthesisTemperature ¶
WithSynthesisTemperature sets the temperature for the synthesis phase.
func (*Converge) WithTemperature ¶
WithTemperature sets the default temperature for synthesis.
type Decide ¶
type Decide struct {
// contains filtered or unexported fields
}
Decide is a binary decision primitive that implements pipz.Chainable[*Thought]. It asks the LLM a yes/no question and stores the full response for typed retrieval.
func NewDecide ¶
NewDecide creates a new binary decision primitive with introspection enabled by default.
The primitive uses two zyn synapses:
- Binary synapse: Makes the decision and provides reasoning
- Transform synapse: Synthesizes a semantic summary for context accumulation
Output Notes:
- {key}: JSON-serialized zyn.BinaryResponse
- {key}_summary: Semantic summary for next steps (if introspection enabled)
Example:
step := cogito.NewDecide("is_urgent", "Is this ticket urgent?")
result, _ := step.Process(ctx, thought)
resp, _ := step.Scan(result)
fmt.Println(resp.Decision, resp.Confidence, resp.Reasoning)
func (*Decide) Scan ¶
func (d *Decide) Scan(t *Thought) (*zyn.BinaryResponse, error)
Scan retrieves the typed binary response from a thought.
func (*Decide) WithIntrospection ¶
WithIntrospection enables the introspection phase.
func (*Decide) WithIntrospectionTemperature ¶
WithIntrospectionTemperature sets the temperature for the introspection phase.
func (*Decide) WithProvider ¶
WithProvider sets the provider for this step.
func (*Decide) WithReasoningTemperature ¶
WithReasoningTemperature sets the temperature for the reasoning phase.
func (*Decide) WithSummaryKey ¶
WithSummaryKey sets a custom key for the introspection summary note.
func (*Decide) WithTemperature ¶
WithTemperature sets the default temperature for this step.
type Discern ¶
type Discern struct {
// contains filtered or unexported fields
}
Discern is an LLM-powered semantic routing connector that implements pipz.Chainable[*Thought]. It uses zyn.Classification directly to determine which route to take based on semantic analysis.
func NewDiscern ¶
NewDiscern creates a new semantic routing connector.
The connector uses zyn.Classification to determine which route to take. Routes are matched against the Primary category from the classification response.
Output Notes:
- {key}: JSON-serialized zyn.ClassificationResponse
- {key}_summary: Semantic summary for next steps (if introspection enabled)
Example:
router := cogito.NewDiscern(
"ticket_route",
"What type of support ticket is this?",
[]string{"billing", "technical", "general"},
)
router.AddRoute("billing", billingPipeline)
router.AddRoute("technical", technicalPipeline)
router.SetFallback(generalPipeline)
func (*Discern) ClearRoutes ¶
ClearRoutes removes all routes.
func (*Discern) Close ¶
Close implements pipz.Chainable[*Thought]. Propagates Close to all registered routes and fallback.
func (*Discern) RemoveRoute ¶
RemoveRoute removes a route for a category.
func (*Discern) Scan ¶
func (d *Discern) Scan(t *Thought) (*zyn.ClassificationResponse, error)
Scan retrieves the typed classification response from a thought.
func (*Discern) SetFallback ¶
SetFallback sets the fallback processor for unmatched categories.
func (*Discern) WithIntrospection ¶
WithIntrospection enables the introspection phase.
func (*Discern) WithIntrospectionTemperature ¶
WithIntrospectionTemperature sets the temperature for the introspection phase.
func (*Discern) WithProvider ¶
WithProvider sets the provider for classification.
func (*Discern) WithReasoningTemperature ¶
WithReasoningTemperature sets the temperature for the classification phase.
func (*Discern) WithSummaryKey ¶
WithSummaryKey sets a custom key for the introspection summary note.
func (*Discern) WithTemperature ¶
WithTemperature sets the default temperature for classification.
type Note ¶
type Note struct {
ID string `db:"id" type:"uuid" constraints:"primarykey" default:"gen_random_uuid()"`
ThoughtID string `db:"thought_id" type:"uuid" constraints:"notnull" references:"thoughts(id)"`
Key string `db:"key" type:"text" constraints:"notnull"`
Content string `db:"content" type:"text" constraints:"notnull"`
Metadata map[string]string `db:"metadata" type:"jsonb" default:"'{}'"`
Source string `db:"source" type:"text" constraints:"notnull"`
Created time.Time `db:"created" type:"timestamp" constraints:"notnull"`
}
Note represents a semantic piece of information in the reasoning chain. Everything in LLM space is fundamentally text-based, so Content is always a string. Metadata provides structured extension without breaking type safety.
type Prioritize ¶
type Prioritize struct {
// contains filtered or unexported fields
}
Prioritize is a prioritization primitive that implements pipz.Chainable[*Thought]. It prioritizes items by criteria and stores the full response for typed retrieval.
func NewPrioritize ¶
func NewPrioritize(key, criteria string, items []string) *Prioritize
NewPrioritize creates a new prioritization primitive with explicit items to prioritize.
The primitive uses two zyn synapses:
- Ranking synapse: Prioritizes items by criteria
- Transform synapse: Synthesizes a semantic summary for context accumulation
Output Notes:
- {key}: JSON-serialized zyn.RankingResponse
- {key}_summary: Semantic summary for next steps (if introspection enabled)
Example:
items := []string{
"ticket_1: Login broken",
"ticket_2: Feature request",
"ticket_3: Critical outage",
}
step := cogito.NewPrioritize("ticket_priority", "urgency and impact", items)
result, _ := step.Process(ctx, thought)
resp, _ := step.Scan(result)
fmt.Println(resp.Ranked, resp.Confidence, resp.Reasoning)
func NewPrioritizeFrom ¶
func NewPrioritizeFrom(key, criteria, itemsKey string) *Prioritize
NewPrioritizeFrom creates a new prioritization primitive that reads items from a note. The items are read from the specified note key (expected to be JSON array of strings).
The primitive uses two zyn synapses:
- Ranking synapse: Prioritizes items by criteria
- Transform synapse: Synthesizes a semantic summary for context accumulation
Output Notes:
- {key}: JSON-serialized zyn.RankingResponse
- {key}_summary: Semantic summary for next steps (if introspection enabled)
Example:
// First, extract items into a note
type TicketList struct {
Tickets []string `json:"tickets"`
}
cogito.NewAnalyze[TicketList]("ticket_list", "list of all tickets"),
// Then rank them
cogito.NewPrioritizeFrom("ticket_priority", "urgency and impact", "ticket_list"),
func (*Prioritize) Close ¶
func (r *Prioritize) Close() error
Close implements pipz.Chainable[*Thought].
func (*Prioritize) Identity ¶
func (r *Prioritize) Identity() pipz.Identity
Identity implements pipz.Chainable[*Thought].
func (*Prioritize) Scan ¶
func (r *Prioritize) Scan(t *Thought) (*zyn.RankingResponse, error)
Scan retrieves the typed ranking response from a thought.
func (*Prioritize) Schema ¶
func (r *Prioritize) Schema() pipz.Node
Schema implements pipz.Chainable[*Thought].
func (*Prioritize) WithIntrospection ¶
func (r *Prioritize) WithIntrospection() *Prioritize
WithIntrospection enables the introspection phase.
func (*Prioritize) WithIntrospectionTemperature ¶
func (r *Prioritize) WithIntrospectionTemperature(temp float32) *Prioritize
WithIntrospectionTemperature sets the temperature for the introspection phase.
func (*Prioritize) WithProvider ¶
func (r *Prioritize) WithProvider(p Provider) *Prioritize
WithProvider sets the provider for this step.
func (*Prioritize) WithReasoningTemperature ¶
func (r *Prioritize) WithReasoningTemperature(temp float32) *Prioritize
WithReasoningTemperature sets the temperature for the reasoning phase.
func (*Prioritize) WithSummaryKey ¶
func (r *Prioritize) WithSummaryKey(key string) *Prioritize
WithSummaryKey sets a custom key for the introspection summary note.
func (*Prioritize) WithTemperature ¶
func (r *Prioritize) WithTemperature(temp float32) *Prioritize
WithTemperature sets the default temperature for this step.
type Provider ¶
type Provider interface {
Call(ctx context.Context, messages []zyn.Message, temperature float32) (*zyn.ProviderResponse, error)
Name() string
}
Provider defines the interface for LLM providers. This matches zyn.Provider interface for compatibility.
func ProviderFromContext ¶
ProviderFromContext retrieves the provider from context, if present.
type Reflect ¶
type Reflect struct {
// contains filtered or unexported fields
}
Reflect is a memory primitive that summarizes the current Thought's notes into a single consolidated note. This enables self-compression for long reasoning chains,. allowing the agent to "step back" and consolidate its accumulated context.
func NewReflect ¶
NewReflect creates a new reflect primitive.
The primitive:
- Gathers notes from the current Thought
- Renders them to text
- Summarizes via LLM transform synapse
- Stores the summary as a new note
Output Notes:
- {key}: LLM-generated summary/reflection of the Thought's notes
Example:
// Consolidate reasoning after many steps
reflect := cogito.NewReflect("consolidated_context").
WithPrompt("Synthesize all findings into key insights and next steps")
result, _ := reflect.Process(ctx, thought)
func (*Reflect) WithPrompt ¶
WithPrompt sets a custom reflection prompt.
func (*Reflect) WithProvider ¶
WithProvider sets the provider for the LLM call.
func (*Reflect) WithUnpublishedOnly ¶
WithUnpublishedOnly limits reflection to unpublished notes only. This is useful for reflecting on recent work without re-processing context that has already been sent to the LLM.
type Reset ¶
type Reset struct {
// contains filtered or unexported fields
}
Reset is a session management primitive that implements pipz.Chainable[*Thought]. It clears the session entirely, optionally injecting a starting message.
func NewReset ¶
NewReset creates a new session reset primitive.
The primitive clears all messages from the session. Optionally, a system message can be injected into the fresh session. No LLM call is made.
Example:
reset := cogito.NewReset("session_reset").
WithSystemMessage("You are a helpful assistant.")
result, _ := reset.Process(ctx, thought)
// Or preserve context from a note:
reset := cogito.NewReset("session_reset").
WithPreserveNote("session_compress") // Use compression summary as new context
func (*Reset) WithPreserveNote ¶
WithPreserveNote sets a note key whose content will be used as the system message. This is useful for chaining with Compress—use the summary as the new context. If the note doesn't exist, falls back to WithSystemMessage value.
func (*Reset) WithSystemMessage ¶
WithSystemMessage sets a system message to inject after clearing.
type Sift ¶
type Sift struct {
// contains filtered or unexported fields
}
Sift is an LLM-powered conditional gate that implements pipz.Chainable[*Thought]. It uses semantic reasoning to decide whether to execute a wrapped processor or pass through unchanged.
Unlike pipz.Filter which uses a programmatic condition, Sift uses an LLM to make the decision based on semantic understanding of the context. This enables gates based on meaning rather than. simple data inspection.
func NewSift ¶
NewSift creates a new semantic gate primitive.
The primitive uses zyn.Binary to decide whether to execute the processor. If the decision is true, the processor is executed. Otherwise, the thought passes through unchanged.
Output Notes:
- {key}: JSON-serialized zyn.BinaryResponse (the gate decision)
- {key}_summary: Semantic summary for next steps (if introspection enabled)
Example:
gate := cogito.NewSift(
"escalation_gate",
"Does this ticket require human escalation?",
humanEscalationPipeline,
)
result, _ := gate.Process(ctx, thought)
resp, _ := gate.Scan(result)
fmt.Println("Escalated:", resp.Decision)
func (*Sift) Close ¶
Close implements pipz.Chainable[*Thought]. Propagates Close to the wrapped processor.
func (*Sift) Scan ¶
func (s *Sift) Scan(t *Thought) (*zyn.BinaryResponse, error)
Scan retrieves the typed binary response from a thought.
func (*Sift) SetProcessor ¶
SetProcessor updates the wrapped processor.
func (*Sift) WithIntrospection ¶
WithIntrospection enables the introspection phase.
func (*Sift) WithIntrospectionTemperature ¶
WithIntrospectionTemperature sets the temperature for the introspection phase.
func (*Sift) WithProvider ¶
WithProvider sets the provider for this step.
func (*Sift) WithReasoningTemperature ¶
WithReasoningTemperature sets the temperature for the gate decision phase.
func (*Sift) WithSummaryKey ¶
WithSummaryKey sets a custom key for the introspection summary note.
func (*Sift) WithTemperature ¶
WithTemperature sets the default temperature for this step.
type Thought ¶
type Thought struct {
// Identity
ID string `db:"id" type:"uuid" constraints:"primarykey" default:"gen_random_uuid()"`
Intent string `db:"intent" type:"text" constraints:"notnull"`
TraceID string `db:"trace_id" type:"text" constraints:"notnull,unique"`
// Lineage
ParentID *string `db:"parent_id" type:"uuid" references:"thoughts(id)"`
// LLM conversation state
Session *zyn.Session // Shared session for LLM continuity
// Timestamps
CreatedAt time.Time `db:"created_at" type:"timestamp" constraints:"notnull"`
UpdatedAt time.Time `db:"updated_at" type:"timestamp" constraints:"notnull"`
// contains filtered or unexported fields
}
Thought represents the rolling context of a chain of thought. It maintains an append-only log of Notes, providing semantic reasoning history.
Concurrency ¶
Thought is safe for concurrent reads but not concurrent writes. Multiple goroutines may call read methods (GetNote, GetContent, AllNotes, etc.). simultaneously, but write methods (AddNote, SetContent, MarkNotesPublished). must not be called concurrently with each other or with reads.
For parallel processing, use Clone to create independent copies for each goroutine. The cloned thoughts can then be merged or selected as needed.
Failure Behavior ¶
Primitive steps (Decide, Classify, Analyze, etc.) may modify the thought before encountering an error. If a step returns an error, the thought may. be in a partially-modified state. Callers requiring atomicity should Clone. the thought before processing and discard it on failure.
func New ¶
New creates a new Thought with the given intent. ID and TraceID are auto-generated using UUID.
func NewWithTrace ¶
NewWithTrace creates a new Thought with an explicit trace ID.
func (*Thought) AddNote ¶
AddNote adds a new note to the thought. If a note with the same key exists, the new note becomes the current value.
func (*Thought) Clone ¶
Clone creates a deep copy of the thought for concurrent processing. Required for pipz.Concurrent and other parallel operations.
The clone has an independent Session and notes. Modifications to the clone do not affect the original and vice versa.
Note: Clone should only be called when the original thought is not being concurrently modified. The Session.Messages() call is internally synchronized, but concurrent writes to the original thought during clone could result in an inconsistent snapshot.
func (*Thought) GetContent ¶
GetContent retrieves the content of the most recent note with the given key.
func (*Thought) GetLatestNote ¶
GetLatestNote returns the most recently added note.
func (*Thought) GetMetadata ¶
GetMetadata retrieves a specific metadata field from the most recent note.
func (*Thought) GetUnpublishedNotes ¶
GetUnpublishedNotes returns all notes that have not yet been sent to the LLM. These are notes added after the last MarkNotesPublished call.
func (*Thought) MarkNotesPublished ¶
MarkNotesPublished marks all current notes as published to the LLM. This should be called after successfully sending notes to a synapse.
func (*Thought) PublishedCount ¶
PublishedCount returns the number of notes that have been published to the LLM.
func (*Thought) SetContent ¶
SetContent adds a simple note with just key and content.
func (*Thought) SetNote ¶
func (t *Thought) SetNote(ctx context.Context, key, content, source string, metadata map[string]string) error
SetNote adds a note with metadata.
func (*Thought) SetPublishedCount ¶
SetPublishedCount sets the published count directly. This is primarily used for restoring thought state from persistence.
type Truncate ¶
type Truncate struct {
// contains filtered or unexported fields
}
Truncate is a session management primitive that implements pipz.Chainable[*Thought]. It removes messages from the session without LLM involvement, using a sliding window approach.
func NewTruncate ¶
NewTruncate creates a new session truncation primitive.
The primitive removes messages from the middle of the session, preserving the first N messages (typically system prompts) and the last M messages. (recent context). No LLM call is made.
Example:
truncate := cogito.NewTruncate("session_truncate").
WithKeepFirst(1). // Preserve system prompt
WithKeepLast(10). // Keep last 10 messages
WithThreshold(20) // Only truncate if >= 20 messages
result, _ := truncate.Process(ctx, thought)
func (*Truncate) WithKeepFirst ¶
WithKeepFirst sets the number of messages to preserve from the start. Typically used to preserve system prompts.
func (*Truncate) WithKeepLast ¶
WithKeepLast sets the number of recent messages to preserve.
func (*Truncate) WithThreshold ¶
WithThreshold sets the minimum message count to trigger truncation. If the session has fewer messages, the primitive passes through unchanged.