cogito

package module
v0.0.4
Published: Feb 22, 2026 License: MIT Imports: 12 Imported by: 0

README

cogito

LLM-powered reasoning chains with semantic memory for Go.

Build autonomous systems that reason, remember, and adapt.

Reasoning That Accumulates

A Thought is a reasoning context. Primitives read it, reason via LLM, and contribute Notes back. Each step sees what came before.

thought, _ := cogito.New(ctx, memory, "should we approve this refund?")
thought.SetContent(ctx, "request", customerEmail, "input")

// Pipeline: each primitive builds on accumulated context
pipeline := cogito.Sequence(pipz.NewIdentity("refund-decision", "Refund approval pipeline"),
    cogito.NewAnalyze[RefundRequest]("parse", "extract order ID, amount, and reason"),
    cogito.NewSeek("history", "this customer's previous refund requests"),
    cogito.NewDecide("approve", "does this meet our refund policy?").
        WithIntrospection(),
)

result, _ := pipeline.Process(ctx, thought)

// Every step left a Note — an auditable chain of reasoning
for _, note := range result.AllNotes() {
    fmt.Printf("[%s] %s: %s\n", note.Source, note.Key, note.Content)
}
// [input] request: "I'd like a refund for order #12345..."
// [analyze] parse: {"order_id": "12345", "amount": 49.99, "reason": "arrived damaged"}
// [seek] history: "No previous refund requests found for this customer"
// [decide] approve: "yes"
// [introspect] approve: "Approved: first-time request, clear damage claim, within policy window"

Introspection adds a semantic summary explaining why — context for subsequent steps or human review. Notes persist with vector embeddings, enabling semantic search across all stored reasoning.

Install

go get github.com/zoobzio/cogito

Requires Go 1.21+.

Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/zoobzio/cogito"
    "github.com/zoobzio/pipz"
)

type TicketData struct {
    CustomerName string   `json:"customer_name"`
    Issue        string   `json:"issue"`
    Urgency      []string `json:"urgency_indicators"`
}

// Validate satisfies zyn.Validator, which NewAnalyze requires of its type parameter.
func (t TicketData) Validate() error { return nil }

func main() {
    ctx := context.Background()

    // Configure providers
    cogito.SetProvider(myLLMProvider)
    cogito.SetEmbedder(cogito.NewOpenAIEmbedder(apiKey))

    // Connect to memory (implement cogito.Memory interface)
    memory := NewMyMemory()

    // Build a reasoning pipeline; Sequence takes a pipz.Identity as its first argument
    pipeline := cogito.Sequence(pipz.NewIdentity("ticket-triage", "Triage incoming support tickets"),
        cogito.NewAnalyze[TicketData]("parse",
            "extract customer name, issue description, and urgency indicators"),
        cogito.NewSeek("history", "similar support issues and resolutions").
            WithLimit(5),
        cogito.NewCategorize("category", "what type of support issue is this?",
            []string{"billing", "technical", "account", "feature_request"}),
        // NewAssess takes only a key; it assesses the emotional tone of the input
        cogito.NewAssess("urgency"),
        cogito.NewDecide("escalate",
            "should this ticket be escalated to a senior agent?").
            WithIntrospection(),
    )

    // Create a thought with initial context
    thought, _ := cogito.New(ctx, memory, "triage support ticket")
    thought.SetContent(ctx, "ticket", incomingTicket, "input")

    // Execute the pipeline
    result, err := pipeline.Process(ctx, thought)
    if err != nil {
        log.Fatal(err)
    }

    // Results are Notes
    category, _ := result.GetContent("category")
    escalate, _ := result.GetContent("escalate")
    fmt.Printf("Category: %s, Escalate: %s\n", category, escalate)
}

Capabilities

Feature               | Description                                                    | Docs
Reasoning Primitives  | Decide, Analyze, Categorize, Assess, Prioritize                | Primitives
Semantic Control Flow | Sift (LLM gate) and Discern (LLM router)                       | Control Flow
Memory & Reflection   | Recall, Reflect, Checkpoint, Seek, Survey                      | Memory
Session Management    | Compress and Truncate for token control                        | Sessions
Synthesis             | Amplify (iterative refinement), Converge (parallel synthesis)  | Synthesis
Two-Phase Reasoning   | Deterministic decisions with optional creative introspection   | Introspection

Why cogito?

  • Composable reasoning — Chain primitives into pipelines via pipz
  • Semantic memory — Notes persist with vector embeddings for similarity search
  • Context accumulation — Each step builds on previous reasoning
  • Two-phase reasoning — Deterministic decisions with optional creative introspection
  • Observable — Emits capitan signals throughout execution
  • Extensible — Implement pipz.Chainable[*Thought] for custom primitives

Semantic Control Flow

Traditional pipelines route on data. Cogito routes on meaning.

// Sift: LLM decides whether to execute
urgent := cogito.NewSift("urgent-only",
    "is this request time-sensitive?",
    expeditedHandler,
)

// Discern: LLM classifies and routes
router := cogito.NewDiscern("route", "how should we handle this?",
    map[string]pipz.Chainable[*cogito.Thought]{
        "approve": approvalFlow,
        "review":  manualReviewFlow,
        "decline": declineFlow,
    },
)

Control flow adapts to domain changes without code changes — the LLM interprets intent.

Integrate with flume for declarative, hot-reloadable pipeline definitions.

Documentation

  • Overview — Design philosophy and architecture
Learn
Guides
  • Control Flow — Sift and Discern for semantic routing
  • Memory — Persistence and semantic search
  • Sessions — Token management with Compress and Truncate
  • Synthesis — Amplify and Converge patterns
  • Custom Primitives — Implementing Chainable[*Thought]
  • Testing — Testing reasoning pipelines
Cookbook
Reference
  • API — Complete function documentation
  • Primitives — All primitives with signatures
  • Types — Thought, Note, Memory, Provider

Contributing

See CONTRIBUTING.md for guidelines.

License

MIT License — see LICENSE for details.

Documentation

Overview

Package cogito provides LLM-powered reasoning chains for Go.

cogito implements a Thought-Note architecture for building autonomous systems that reason and adapt.

Core Types

The package is built around two core concepts:

  • Thought - A reasoning context that accumulates information across pipeline steps
  • Note - An atomic unit of information (a key-value pair with metadata)

Creating Thoughts

Use New or NewWithTrace to create thoughts:

thought, err := cogito.New(ctx, memory, "analyze customer feedback")
if err != nil {
    log.Fatal(err)
}
thought.SetContent(ctx, "feedback", customerMessage, "input")

Primitives

Cogito provides a comprehensive set of reasoning primitives:

Decision & Analysis:

Control Flow:

  • NewSift - Semantic gate: the LLM decides whether to execute the wrapped processor
  • NewDiscern - Semantic router: the LLM classifies the input and routes it to one of several processors

Reflection:

  • NewReflect - Consolidate current Thought's Notes into a summary

Session Management:

Synthesis:

  • NewAmplify - Iterative refinement until criteria met
  • NewConverge - Parallel execution with semantic synthesis

Pipeline Helpers

Cogito wraps pipz connectors for Thought processing:

  • Sequence - Sequential execution
  • Filter - Conditional execution
  • Switch - Route to different processors
  • Fallback - Try alternatives on failure
  • Retry - Retry on failure
  • Backoff - Retry with exponential backoff
  • Timeout - Enforce time limits
  • Concurrent - Run processors in parallel
  • Race - Return first successful result

Provider

LLM access uses a resolution hierarchy:

  1. Explicit parameter (.WithProvider(p))
  2. Context value (cogito.WithProvider(ctx, p))
  3. Global default (cogito.SetProvider(p))

Use SetProvider to configure the global default:

cogito.SetProvider(myProvider)

Observability

Cogito emits capitan signals throughout execution. See signals.go for the complete list of events, including ThoughtCreated, StepStarted, StepCompleted, StepFailed, NoteAdded, and NotesPublished.

Index

Constants

This section is empty.

Variables

var (
	// DefaultIntrospection controls whether primitives generate semantic summaries
	// after reasoning. Disabled by default for cost efficiency. Enable per-step
	// with WithIntrospection() or globally by setting this to true.
	DefaultIntrospection = false

	// DefaultReasoningTemperature is used for the primary LLM call in each primitive.
	// Defaults to deterministic (low temperature) for consistent outputs.
	DefaultReasoningTemperature = zyn.DefaultTemperatureDeterministic

	// DefaultIntrospectionTemperature is used for the transform synapse that
	// synthesizes semantic summaries. Defaults to creative (higher temperature)
	// for richer context generation.
	DefaultIntrospectionTemperature = zyn.DefaultTemperatureCreative
)

Default configuration for cogito primitives. These can be overridden per-step using builder methods.

var (
	// Thought lifecycle signals.
	ThoughtCreated = capitan.NewSignal(
		"cogito.thought.created",
		"New thought chain initiated with intent and trace ID",
	)

	// Step execution signals.
	StepStarted = capitan.NewSignal(
		"cogito.step.started",
		"Reasoning step began execution",
	)
	StepCompleted = capitan.NewSignal(
		"cogito.step.completed",
		"Reasoning step finished successfully",
	)
	StepFailed = capitan.NewSignal(
		"cogito.step.failed",
		"Reasoning step encountered an error",
	)

	// Note management signals.
	NoteAdded = capitan.NewSignal(
		"cogito.note.added",
		"New note added to thought context",
	)
	NotesPublished = capitan.NewSignal(
		"cogito.notes.published",
		"Notes marked as published to LLM",
	)

	// Introspection signals.
	IntrospectionCompleted = capitan.NewSignal(
		"cogito.introspection.completed",
		"Transform synapse completed semantic summary",
	)

	// Sift signals.
	SiftDecided = capitan.NewSignal(
		"cogito.sift.decided",
		"Semantic gate decision made",
	)

	// Amplify signals.
	AmplifyIterationCompleted = capitan.NewSignal(
		"cogito.amplify.iteration.completed",
		"Refinement iteration finished",
	)
	AmplifyCompleted = capitan.NewSignal(
		"cogito.amplify.completed",
		"Refinement met completion criteria",
	)

	// Converge signals.
	ConvergeBranchStarted = capitan.NewSignal(
		"cogito.converge.branch.started",
		"Parallel branch began execution",
	)
	ConvergeBranchCompleted = capitan.NewSignal(
		"cogito.converge.branch.completed",
		"Parallel branch finished execution",
	)
	ConvergeSynthesisStarted = capitan.NewSignal(
		"cogito.converge.synthesis.started",
		"Synthesis phase began",
	)
)

Signal definitions for cogito reasoning chain events. Signals follow the pattern: cogito.<entity>.<event>.

var (
	// Thought metadata.
	FieldIntent    = capitan.NewStringKey("intent")
	FieldTraceID   = capitan.NewStringKey("trace_id")
	FieldNoteCount = capitan.NewIntKey("note_count")

	// Step metadata.
	FieldStepName    = capitan.NewStringKey("step_name")
	FieldStepType    = capitan.NewStringKey("step_type") // decide, classify, analyze, sentiment, rank
	FieldTemperature = capitan.NewFloat32Key("temperature")
	FieldProvider    = capitan.NewStringKey("provider")

	// Note metadata.
	FieldNoteKey     = capitan.NewStringKey("note_key")
	FieldNoteSource  = capitan.NewStringKey("note_source")
	FieldContentSize = capitan.NewIntKey("content_size") // character count

	// Context metrics.
	FieldUnpublishedCount = capitan.NewIntKey("unpublished_count")
	FieldPublishedCount   = capitan.NewIntKey("published_count")
	FieldContextSize      = capitan.NewIntKey("context_size") // character count

	// Timing.
	FieldStepDuration = capitan.NewDurationKey("step_duration")

	// Error information.
	FieldError = capitan.NewErrorKey("error")

	// Decision metadata (for Sift, Amplify).
	FieldDecision   = capitan.NewBoolKey("decision")
	FieldConfidence = capitan.NewFloat64Key("confidence")

	// Iteration metadata (for Amplify).
	FieldIterationCount = capitan.NewIntKey("iteration_count")

	// Branch metadata (for Converge).
	FieldBranchCount = capitan.NewIntKey("branch_count")
	FieldBranchName  = capitan.NewStringKey("branch_name")
)

Field keys for cogito event data.

var ErrNoProvider = errors.New("no provider configured: set via context, step-level, or global")

ErrNoProvider is returned when no provider can be resolved.

Functions

func Backoff

func Backoff(identity pipz.Identity, processor pipz.Chainable[*Thought], maxAttempts int, baseDelay time.Duration) *pipz.Backoff[*Thought]

Backoff creates a processor that retries with exponential backoff. Useful for operations that need time to recover between attempts.

Example:

resilient := cogito.Backoff(pipz.NewIdentity("api-call", "API with backoff"), apiProcessor, 5, time.Second)

func CircuitBreaker

func CircuitBreaker(identity pipz.Identity, processor pipz.Chainable[*Thought], failureThreshold int, resetTimeout time.Duration) *pipz.CircuitBreaker[*Thought]

CircuitBreaker creates a processor that prevents cascade failures. Opens the circuit after failureThreshold consecutive failures.

Example:

protected := cogito.CircuitBreaker(pipz.NewIdentity("service-call", "Protected service call"), apiProcessor, 5, 30*time.Second)

func Concurrent

func Concurrent(identity pipz.Identity, reducer func(original *Thought, results map[pipz.Identity]*Thought, errors map[pipz.Identity]error) *Thought, processors ...pipz.Chainable[*Thought]) *pipz.Concurrent[*Thought]

Concurrent runs all processors in parallel and returns the original thought. Each processor receives an isolated clone. Use the reducer to aggregate results.

Example:

parallel := cogito.Concurrent(pipz.NewIdentity("notify-all", "Parallel notifications"), nil,
    emailNotifier,
    smsNotifier,
    webhookNotifier,
)

func Do

func Do(identity pipz.Identity, fn func(context.Context, *Thought) (*Thought, error)) pipz.Processor[*Thought]

Do creates a processor from a custom function that can fail. This is the easiest way to add custom logic to a chain.

Example:

route := cogito.Do(pipz.NewIdentity("route-ticket", "Route tickets to queues"), func(ctx context.Context, t *cogito.Thought) (*cogito.Thought, error) {
    ticketType, _ := t.GetContent("ticket_type")
    if ticketType == "urgent" {
        t.SetContent(ctx, "queue", "urgent-queue", "route-ticket")
    } else {
        t.SetContent(ctx, "queue", "standard-queue", "route-ticket")
    }
    return t, nil
})

func Effect

func Effect(identity pipz.Identity, fn func(context.Context, *Thought) error) pipz.Processor[*Thought]

Effect creates a processor that performs a side effect without modifying the thought. Use this for logging, metrics, or other observational operations.

Example:

logger := cogito.Effect(pipz.NewIdentity("log-intent", "Log processing intent"), func(ctx context.Context, t *cogito.Thought) error {
    log.Printf("Processing thought with intent: %s", t.Intent)
    return nil
})

func Enrich

func Enrich(identity pipz.Identity, fn func(context.Context, *Thought) (*Thought, error)) pipz.Processor[*Thought]

Enrich creates a processor that optionally enhances a thought. Unlike Do, errors are logged but don't stop the pipeline.

Example:

addContext := cogito.Enrich(pipz.NewIdentity("add-context", "Fetch external context"), func(ctx context.Context, t *cogito.Thought) (*cogito.Thought, error) {
    extra, err := fetchExternalContext(ctx, t.Intent)
    if err != nil {
        return t, err // Logged but pipeline continues
    }
    t.SetContent(ctx, "external_context", extra, "add-context")
    return t, nil
})

func Fallback

func Fallback(identity pipz.Identity, processors ...pipz.Chainable[*Thought]) *pipz.Fallback[*Thought]

Fallback creates a processor that tries alternatives on failure. Each processor is tried in order until one succeeds.

Example:

resilient := cogito.Fallback(pipz.NewIdentity("resilient-analysis", "Try multiple analyzers"),
    primaryAnalyzer,
    backupAnalyzer,
    fallbackAnalyzer,
)

func Filter

func Filter(identity pipz.Identity, predicate func(context.Context, *Thought) bool, processor pipz.Chainable[*Thought]) *pipz.Filter[*Thought]

Filter creates a conditional processor that either processes or passes through. When the predicate returns true, the processor is executed. When false, the thought passes through unchanged.

Example:

onlyUrgent := cogito.Filter(pipz.NewIdentity("urgent-only", "Process only urgent items"),
    func(ctx context.Context, t *cogito.Thought) bool {
        priority, _ := t.GetContent("priority")
        return priority == "urgent"
    },
    urgentProcessor,
)

func Gate

func Gate(identity pipz.Identity, predicate func(context.Context, *Thought) bool) pipz.Processor[*Thought]

Gate creates a simple pass/fail filter that blocks thoughts not meeting criteria. Unlike Filter, which runs a wrapped processor when the predicate is true, Gate wraps no processor: a thought either passes through or is blocked.

Example:

validOnly := cogito.Gate(pipz.NewIdentity("valid-only", "Filter invalid thoughts"), func(ctx context.Context, t *cogito.Thought) bool {
    _, err := t.GetContent("required_field")
    return err == nil
})

func Handle

func Handle(identity pipz.Identity, processor pipz.Chainable[*Thought], errorHandler pipz.Chainable[*pipz.Error[*Thought]]) *pipz.Handle[*Thought]

Handle creates a processor that handles errors without stopping the pipeline. When the primary processor fails, the error handler is invoked for monitoring. The error handler receives a pipz.Error[*Thought] with full error context.

Example:

errorLogger := pipz.Effect(pipz.NewIdentity("log-error", "Log errors"), func(ctx context.Context, err *pipz.Error[*cogito.Thought]) error {
    log.Printf("thought %s failed: %v", err.InputData.TraceID, err.Err)
    return nil
})
observed := cogito.Handle(pipz.NewIdentity("observed", "Observed processor"), riskyProcessor, errorLogger)

func Mutate

func Mutate(identity pipz.Identity, fn func(context.Context, *Thought) *Thought, predicate func(context.Context, *Thought) bool) pipz.Processor[*Thought]

Mutate creates a processor that conditionally modifies a thought. The modification is only applied if the predicate returns true.

Example:

prioritize := cogito.Mutate(pipz.NewIdentity("prioritize", "Set high priority for critical items"),
    func(ctx context.Context, t *cogito.Thought) *cogito.Thought {
        t.SetContent(ctx, "priority", "high", "prioritize")
        return t
    },
    func(ctx context.Context, t *cogito.Thought) bool {
        urgency, _ := t.GetContent("urgency")
        return urgency == "critical"
    },
)

func Race

func Race(identity pipz.Identity, processors ...pipz.Chainable[*Thought]) *pipz.Race[*Thought]

Race runs all processors in parallel and returns the first successful result. Useful for reducing latency when multiple paths can produce the same result.

Example:

fastest := cogito.Race(pipz.NewIdentity("fastest-lookup", "First successful lookup"),
    cacheProcessor,
    databaseProcessor,
    externalAPIProcessor,
)

func RateLimiter

func RateLimiter(identity pipz.Identity, requestsPerSecond float64, burst int, processor pipz.Chainable[*Thought]) *pipz.RateLimiter[*Thought]

RateLimiter creates a processor that enforces rate limits. Useful for protecting rate-limited external services.

Example:

limited := cogito.RateLimiter(pipz.NewIdentity("api-limit", "API rate limiter"), 100, 10, apiCall) // 100/sec, burst 10

func RenderNotesToContext

func RenderNotesToContext(notes []Note) string

RenderNotesToContext converts a slice of notes to a formatted context string suitable for LLM consumption. Each note is rendered as "key: content".
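
A minimal sketch of the "key: content" rendering described above. The note type is a hypothetical stand-in for cogito.Note reduced to the three fields used elsewhere on this page, renderNotes is not the package's implementation, and the newline separator between notes is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// note is a hypothetical stand-in for cogito.Note, reduced to three fields.
type note struct {
	Key     string
	Content string
	Source  string
}

// renderNotes writes one "key: content" line per note.
// Joining with a newline is an assumption about the real format.
func renderNotes(notes []note) string {
	lines := make([]string, 0, len(notes))
	for _, n := range notes {
		lines = append(lines, n.Key+": "+n.Content)
	}
	return strings.Join(lines, "\n")
}

func main() {
	notes := []note{
		{Key: "request", Content: "refund for order #12345", Source: "input"},
		{Key: "approve", Content: "yes", Source: "decide"},
	}
	fmt.Println(renderNotes(notes))
}
```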

func Retry

func Retry(identity pipz.Identity, processor pipz.Chainable[*Thought], maxAttempts int) *pipz.Retry[*Thought]

Retry creates a processor that retries on failure up to maxAttempts times. Retries happen immediately, with no delay between attempts; for delayed retries, use Backoff instead.

Example:

reliable := cogito.Retry(pipz.NewIdentity("reliable-call", "Retry external service"), externalServiceCall, 3)

func Sequence

func Sequence(identity pipz.Identity, processors ...pipz.Chainable[*Thought]) *pipz.Sequence[*Thought]

Sequence creates a sequential pipeline of thought processors. Each processor receives the output of the previous one.

Example:

pipeline := cogito.Sequence(pipz.NewIdentity("reasoning-chain", "Main reasoning pipeline"),
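    // NewAnalyze's type parameter cannot be inferred; TicketData stands for any type implementing zyn.Validator
    cogito.NewAnalyze[TicketData]("analyze", "Examine the situation"),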
    cogito.NewDecide("decide", "What action to take?"),
)

func SetProvider

func SetProvider(p Provider)

SetProvider sets the global fallback provider. This provider is used when no context or step-level provider is available.

func Switch

func Switch(identity pipz.Identity, condition func(context.Context, *Thought) string) *pipz.Switch[*Thought]

Switch creates a router that directs thoughts to different processors. The condition function returns a route key string that determines which processor handles the thought.

Example:

router := cogito.Switch(pipz.NewIdentity("intent-router", "Route by category"), func(ctx context.Context, t *cogito.Thought) string {
    category, _ := t.GetContent("category")
    return category
})
router.AddRoute("question", questionHandler)
router.AddRoute("command", commandHandler)

func Timeout

func Timeout(identity pipz.Identity, processor pipz.Chainable[*Thought], duration time.Duration) *pipz.Timeout[*Thought]

Timeout creates a processor that enforces a time limit on execution. If the timeout expires, the operation is canceled and an error is returned.

Example:

bounded := cogito.Timeout(pipz.NewIdentity("bounded-analysis", "Time-limited analysis"), analyzer, 30*time.Second)

func Transform

func Transform(identity pipz.Identity, fn func(context.Context, *Thought) *Thought) pipz.Processor[*Thought]

Transform creates a processor from a pure transformation function. Use this when your operation cannot fail.

Example:

addMetadata := cogito.Transform(pipz.NewIdentity("add-metadata", "Add timestamp"), func(ctx context.Context, t *cogito.Thought) *cogito.Thought {
    t.SetContent(ctx, "timestamp", time.Now().Format(time.RFC3339), "add-metadata")
    return t
})

func WithProvider

func WithProvider(ctx context.Context, p Provider) context.Context

WithProvider adds a provider to the context. This is the preferred method for provider management.

func WorkerPool

func WorkerPool(identity pipz.Identity, workers int, processors ...pipz.Chainable[*Thought]) *pipz.WorkerPool[*Thought]

WorkerPool creates a bounded parallel executor with a fixed number of workers. Useful for controlling parallelism when processing multiple thought streams.

Example:

pool := cogito.WorkerPool(pipz.NewIdentity("bounded-analysis", "Bounded worker pool"), 5,
    analyzer1,
    analyzer2,
    analyzer3,
)

Types

type Amplify

type Amplify struct {
	// contains filtered or unexported fields
}

Amplify is an iterative refinement primitive that implements pipz.Chainable[*Thought]. It repeatedly refines content until an LLM determines completion criteria are met.

Unlike pipz.Retry which retries on errors, Amplify iterates based on semantic quality. Each iteration transforms the content, then checks if the result meets the specified criteria.

func NewAmplify

func NewAmplify(key, sourceKey, refinementPrompt, completionCriteria string, maxIterations int) *Amplify

NewAmplify creates a new iterative refinement primitive.

The primitive uses two zyn synapses per iteration:

  1. Transform synapse: Refines the content based on refinementPrompt
  2. Binary synapse: Checks if completionCriteria are met

The loop continues until either:

  • The completion criteria are satisfied (Binary returns true)
  • maxIterations is reached

Output Notes:

  • {key}: JSON-serialized AmplifyResult

Example:

refine := cogito.NewAmplify(
    "refined_response",
    "draft_response",
    "Improve clarity, remove redundancy, and ensure actionable recommendations",
    "The response is clear, concise, and provides specific actionable steps",
    3,
)
result, _ := refine.Process(ctx, thought)
output, _ := refine.Scan(result)
fmt.Println(output.Content, "in", output.Iterations, "iterations")

func (*Amplify) Close

func (a *Amplify) Close() error

Close implements pipz.Chainable[*Thought].

func (*Amplify) Identity

func (a *Amplify) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Amplify) Process

func (a *Amplify) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Amplify) Scan

func (a *Amplify) Scan(t *Thought) (*AmplifyResult, error)

Scan retrieves the typed amplify result from a thought.

func (*Amplify) Schema

func (a *Amplify) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Amplify) WithCompletionTemperature

func (a *Amplify) WithCompletionTemperature(temp float32) *Amplify

WithCompletionTemperature sets the temperature for the completion check phase.

func (*Amplify) WithMaxIterations

func (a *Amplify) WithMaxIterations(maxIter int) *Amplify

WithMaxIterations sets the maximum number of refinement iterations.

func (*Amplify) WithProvider

func (a *Amplify) WithProvider(p Provider) *Amplify

WithProvider sets the provider for this step.

func (*Amplify) WithRefinementTemperature

func (a *Amplify) WithRefinementTemperature(temp float32) *Amplify

WithRefinementTemperature sets the temperature for the refinement phase.

func (*Amplify) WithTemperature

func (a *Amplify) WithTemperature(temp float32) *Amplify

WithTemperature sets the default temperature for both refinement and completion phases.

type AmplifyResult

type AmplifyResult struct {
	Content    string   `json:"content"`    // Final refined content
	Iterations int      `json:"iterations"` // Number of iterations performed
	Completed  bool     `json:"completed"`  // Whether completion criteria were met
	Reasoning  []string `json:"reasoning"`  // Reasoning from final completion check
}

AmplifyResult captures the outcome of an iterative refinement.

type Analyze

type Analyze[T zyn.Validator] struct {
	// contains filtered or unexported fields
}

Analyze is a structured data extraction primitive that implements pipz.Chainable[*Thought]. It extracts typed data from unstructured input using generics.

func NewAnalyze

func NewAnalyze[T zyn.Validator](key, what string) *Analyze[T]

NewAnalyze creates a new structured data extraction primitive. Introspection follows DefaultIntrospection, which is false unless changed; call WithIntrospection() to enable it for this step.

The primitive uses two zyn synapses:

  1. Extract synapse: Pulls out structured data of type T
  2. Transform synapse: Synthesizes a semantic summary for context accumulation

Output Notes:

  • {key}: JSON-serialized T (the extracted data)
  • {key}_summary: Semantic summary for next steps (if introspection enabled)

Example:

type TicketData struct {
    Severity  string `json:"severity"`
    Component string `json:"component"`
}
func (t TicketData) Validate() error { return nil }

step := cogito.NewAnalyze[TicketData]("ticket_data", "ticket metadata")
result, _ := step.Process(ctx, thought)
data, _ := step.Scan(result)
fmt.Println(data.Severity, data.Component)

func (*Analyze[T]) Close

func (a *Analyze[T]) Close() error

Close implements pipz.Chainable[*Thought].

func (*Analyze[T]) Identity

func (a *Analyze[T]) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Analyze[T]) Process

func (a *Analyze[T]) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Analyze[T]) Scan

func (a *Analyze[T]) Scan(t *Thought) (T, error)

Scan retrieves the typed extracted data from a thought. Returns T directly (the user-defined type), not a wrapper.

func (*Analyze[T]) Schema

func (a *Analyze[T]) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Analyze[T]) WithIntrospection

func (a *Analyze[T]) WithIntrospection() *Analyze[T]

WithIntrospection enables the introspection phase.

func (*Analyze[T]) WithIntrospectionTemperature

func (a *Analyze[T]) WithIntrospectionTemperature(temp float32) *Analyze[T]

WithIntrospectionTemperature sets the temperature for the introspection phase.

func (*Analyze[T]) WithProvider

func (a *Analyze[T]) WithProvider(p Provider) *Analyze[T]

WithProvider sets the provider for this step.

func (*Analyze[T]) WithReasoningTemperature

func (a *Analyze[T]) WithReasoningTemperature(temp float32) *Analyze[T]

WithReasoningTemperature sets the temperature for the reasoning phase.

func (*Analyze[T]) WithSummaryKey

func (a *Analyze[T]) WithSummaryKey(key string) *Analyze[T]

WithSummaryKey sets a custom key for the introspection summary note.

func (*Analyze[T]) WithTemperature

func (a *Analyze[T]) WithTemperature(temp float32) *Analyze[T]

WithTemperature sets the default temperature for this step.

type Assess

type Assess struct {
	// contains filtered or unexported fields
}

Assess is a sentiment assessment primitive that implements pipz.Chainable[*Thought]. It assesses the emotional tone of input and stores the full response for typed retrieval.

func NewAssess

func NewAssess(key string) *Assess

NewAssess creates a new sentiment assessment primitive. Introspection follows DefaultIntrospection, which is false unless changed; call WithIntrospection() to enable it for this step.

The primitive uses two zyn synapses:

  1. Sentiment synapse: Assesses emotional tone (positive/negative/neutral/mixed)
  2. Transform synapse: Synthesizes a semantic summary for context accumulation

Output Notes:

  • {key}: JSON-serialized zyn.SentimentResponse
  • {key}_summary: Semantic summary for next steps (if introspection enabled)

Example:

step := cogito.NewAssess("user_mood")
result, _ := step.Process(ctx, thought)
resp, _ := step.Scan(result)
fmt.Println(resp.Overall, resp.Confidence, resp.Scores)

func (*Assess) Close

func (s *Assess) Close() error

Close implements pipz.Chainable[*Thought].

func (*Assess) Identity

func (s *Assess) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Assess) Process

func (s *Assess) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Assess) Scan

func (s *Assess) Scan(t *Thought) (*zyn.SentimentResponse, error)

Scan retrieves the typed sentiment response from a thought.

func (*Assess) Schema

func (s *Assess) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Assess) WithIntrospection

func (s *Assess) WithIntrospection() *Assess

WithIntrospection enables the introspection phase.

func (*Assess) WithIntrospectionTemperature

func (s *Assess) WithIntrospectionTemperature(temp float32) *Assess

WithIntrospectionTemperature sets the temperature for the introspection phase.

func (*Assess) WithProvider

func (s *Assess) WithProvider(p Provider) *Assess

WithProvider sets the provider for this step.

func (*Assess) WithReasoningTemperature

func (s *Assess) WithReasoningTemperature(temp float32) *Assess

WithReasoningTemperature sets the temperature for the reasoning phase.

func (*Assess) WithSummaryKey

func (s *Assess) WithSummaryKey(key string) *Assess

WithSummaryKey sets a custom key for the introspection summary note.

func (*Assess) WithTemperature

func (s *Assess) WithTemperature(temp float32) *Assess

WithTemperature sets the default temperature for this step.

type Categorize

type Categorize struct {
	// contains filtered or unexported fields
}

Categorize is a multi-class categorization primitive that implements pipz.Chainable[*Thought]. It asks the LLM to place input into one of the provided categories.

func NewCategorize

func NewCategorize(key, question string, categories []string) *Categorize

NewCategorize creates a new multi-class categorization primitive. Introspection follows DefaultIntrospection, which is false unless changed; call WithIntrospection() to enable it for this step.

The primitive uses two zyn synapses:

  1. Classification synapse: Places input into best matching category
  2. Transform synapse: Synthesizes a semantic summary for context accumulation

Output Notes:

  • {key}: JSON-serialized zyn.ClassificationResponse
  • {key}_summary: Semantic summary for next steps (if introspection enabled)

Example:

step := cogito.NewCategorize("ticket_type", "What type of ticket is this?", []string{"bug", "feature", "question"})
result, _ := step.Process(ctx, thought)
resp, _ := step.Scan(result)
fmt.Println(resp.Primary, resp.Confidence, resp.Reasoning)

func (*Categorize) Close

func (c *Categorize) Close() error

Close implements pipz.Chainable[*Thought].

func (*Categorize) Identity

func (c *Categorize) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Categorize) Process

func (c *Categorize) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Categorize) Scan

Scan retrieves the typed classification response from a thought.

func (*Categorize) Schema

func (c *Categorize) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Categorize) WithIntrospection

func (c *Categorize) WithIntrospection() *Categorize

WithIntrospection enables the introspection phase.

func (*Categorize) WithIntrospectionTemperature

func (c *Categorize) WithIntrospectionTemperature(temp float32) *Categorize

WithIntrospectionTemperature sets the temperature for the introspection phase.

func (*Categorize) WithProvider

func (c *Categorize) WithProvider(p Provider) *Categorize

WithProvider sets the provider for this step.

func (*Categorize) WithReasoningTemperature

func (c *Categorize) WithReasoningTemperature(temp float32) *Categorize

WithReasoningTemperature sets the temperature for the reasoning phase.

func (*Categorize) WithSummaryKey

func (c *Categorize) WithSummaryKey(key string) *Categorize

WithSummaryKey sets a custom key for the introspection summary note.

func (*Categorize) WithTemperature

func (c *Categorize) WithTemperature(temp float32) *Categorize

WithTemperature sets the default temperature for this step.

type Compress

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a session management primitive that implements pipz.Chainable[*Thought]. It summarizes the current session via LLM and replaces it with a fresh session containing the summary as context.

func NewCompress

func NewCompress(key string) *Compress

NewCompress creates a new session compression primitive.

The primitive uses a zyn Transform synapse to summarize the session history, then replaces the session with a new one containing the summary as the first message.

Output Notes:

  • {key}: The generated summary text

Example:

compress := cogito.NewCompress("session_compress").
    WithThreshold(10)  // Only compress if >= 10 messages
result, _ := compress.Process(ctx, thought)

func (*Compress) Close

func (c *Compress) Close() error

Close implements pipz.Chainable[*Thought].

func (*Compress) Identity

func (c *Compress) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Compress) Process

func (c *Compress) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Compress) Schema

func (c *Compress) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Compress) WithProvider

func (c *Compress) WithProvider(p Provider) *Compress

WithProvider sets the provider for this step.

func (*Compress) WithSummaryKey

func (c *Compress) WithSummaryKey(key string) *Compress

WithSummaryKey sets the note key where the summary will be stored.

func (*Compress) WithTemperature

func (c *Compress) WithTemperature(temp float32) *Compress

WithTemperature sets the temperature for summarization.

func (*Compress) WithThreshold

func (c *Compress) WithThreshold(n int) *Compress

WithThreshold sets the minimum message count to trigger compression. If the session has fewer messages, the primitive passes through unchanged.
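The threshold behavior can be sketched with plain strings in place of session messages. The compressIfNeeded helper and the summarize callback are hypothetical stand-ins and not part of cogito; the real primitive calls an LLM and works on the thought's session.

```go
package main

import (
	"fmt"
	"strings"
)

// compressIfNeeded is a hypothetical stand-in for the Compress step.
// Below the threshold the messages pass through unchanged; at or above it
// they are replaced by a single summary message.
func compressIfNeeded(messages []string, threshold int, summarize func([]string) string) []string {
	if len(messages) < threshold {
		return messages
	}
	return []string{summarize(messages)}
}

func main() {
	// A toy summarizer; the real primitive would call an LLM here.
	summarize := func(msgs []string) string {
		return fmt.Sprintf("summary of %d messages: %s", len(msgs), strings.Join(msgs, "; "))
	}

	short := []string{"hello", "hi"}
	long := []string{"a", "b", "c"}

	fmt.Println(len(compressIfNeeded(short, 3, summarize))) // below threshold: unchanged
	fmt.Println(len(compressIfNeeded(long, 3, summarize)))  // at threshold: one summary message
}
```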

type Converge

type Converge struct {
	// contains filtered or unexported fields
}

Converge is a parallel execution primitive with LLM-powered synthesis that implements pipz.Chainable[*Thought]. It runs multiple processors concurrently, then uses an LLM to synthesize their outputs into a unified result.

Unlike pipz.Concurrent, which uses a programmatic reducer, Converge uses semantic synthesis to merge perspectives based on meaning rather than simple aggregation.

func NewConverge

func NewConverge(key, synthesisPrompt string, processors ...pipz.Chainable[*Thought]) *Converge

NewConverge creates a new parallel synthesis primitive.

The primitive executes all processors concurrently on cloned thoughts, then uses zyn.Transform to synthesize their outputs into a unified result.

Output Notes:

  • {key}: The synthesized output combining all processor results
  • Notes from each processor are preserved with their original keys

Example:

converge := cogito.NewConverge(
    "unified_analysis",
    "Synthesize these perspectives into a unified recommendation, highlighting agreements and resolving conflicts",
    technicalAnalysis,
    businessAnalysis,
    riskAnalysis,
)
result, _ := converge.Process(ctx, thought)
synthesis, _ := converge.Scan(result)
fmt.Println(synthesis)
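The fan-out and synthesis pattern described for Converge can be sketched without any LLM. Everything in this block (the thought type, clone, processor, and converge) is a hypothetical stand-in for illustration, and the sketch only models the synthesized output, not the preservation of each processor's notes.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// thought is a hypothetical stand-in for *cogito.Thought: just a list of notes.
type thought struct {
	notes []string
}

// clone returns an independent copy so each goroutine writes to its own notes.
func (t *thought) clone() *thought {
	return &thought{notes: append([]string(nil), t.notes...)}
}

// processor is a stand-in for pipz.Chainable[*Thought].
type processor func(*thought) *thought

// converge runs every processor on its own clone, waits for all of them,
// then hands the collected outputs to a synthesis function.
func converge(t *thought, synthesize func([]string) string, procs ...processor) string {
	outputs := make([]string, len(procs)) // one slot per processor, no shared writes
	var wg sync.WaitGroup
	for i, p := range procs {
		wg.Add(1)
		go func(i int, p processor) {
			defer wg.Done()
			out := p(t.clone())
			if len(out.notes) > 0 {
				outputs[i] = out.notes[len(out.notes)-1]
			}
		}(i, p)
	}
	wg.Wait()
	return synthesize(outputs)
}

func main() {
	perspective := func(label string) processor {
		return func(t *thought) *thought {
			t.notes = append(t.notes, label+" view")
			return t
		}
	}
	// A toy synthesis step; the real primitive asks an LLM to merge the outputs.
	join := func(parts []string) string { return strings.Join(parts, " + ") }

	base := &thought{notes: []string{"input"}}
	fmt.Println(converge(base, join, perspective("technical"), perspective("business")))
	fmt.Println(len(base.notes)) // the original thought is untouched by the clones
}
```

Writing each result into its own slice index keeps the goroutines free of shared writes, so no mutex is needed in this sketch.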

func (*Converge) AddProcessor

func (c *Converge) AddProcessor(processor pipz.Chainable[*Thought]) *Converge

AddProcessor adds a processor to the parallel execution list.

func (*Converge) ClearProcessors

func (c *Converge) ClearProcessors() *Converge

ClearProcessors removes all processors.

func (*Converge) Close

func (c *Converge) Close() error

Close implements pipz.Chainable[*Thought]. Propagates Close to all registered processors.

func (*Converge) Identity

func (c *Converge) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Converge) Process

func (c *Converge) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Converge) Processors

func (c *Converge) Processors() []pipz.Chainable[*Thought]

Processors returns a copy of the current processors list.

func (*Converge) RemoveProcessor

func (c *Converge) RemoveProcessor(identity pipz.Identity) *Converge

RemoveProcessor removes a processor by identity.

func (*Converge) Scan

func (c *Converge) Scan(t *Thought) (string, error)

Scan retrieves the synthesis result from a thought.

func (*Converge) Schema

func (c *Converge) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Converge) WithProvider

func (c *Converge) WithProvider(p Provider) *Converge

WithProvider sets the provider for synthesis.

func (*Converge) WithSynthesisTemperature

func (c *Converge) WithSynthesisTemperature(temp float32) *Converge

WithSynthesisTemperature sets the temperature for the synthesis phase.

func (*Converge) WithTemperature

func (c *Converge) WithTemperature(temp float32) *Converge

WithTemperature sets the default temperature for synthesis.

type Decide

type Decide struct {
	// contains filtered or unexported fields
}

Decide is a binary decision primitive that implements pipz.Chainable[*Thought]. It asks the LLM a yes/no question and stores the full response for typed retrieval.

func NewDecide

func NewDecide(key, question string) *Decide

NewDecide creates a new binary decision primitive with introspection enabled by default.

The primitive uses two zyn synapses:

  1. Binary synapse: Makes the decision and provides reasoning
  2. Transform synapse: Synthesizes a semantic summary for context accumulation

Output Notes:

  • {key}: JSON-serialized zyn.BinaryResponse
  • {key}_summary: Semantic summary for next steps (if introspection enabled)

Example:

step := cogito.NewDecide("is_urgent", "Is this ticket urgent?")
result, _ := step.Process(ctx, thought)
resp, _ := step.Scan(result)
fmt.Println(resp.Decision, resp.Confidence, resp.Reasoning)

func (*Decide) Close

func (d *Decide) Close() error

Close implements pipz.Chainable[*Thought].

func (*Decide) Identity

func (d *Decide) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Decide) Process

func (d *Decide) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Decide) Scan

func (d *Decide) Scan(t *Thought) (*zyn.BinaryResponse, error)

Scan retrieves the typed binary response from a thought.

func (*Decide) Schema

func (d *Decide) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Decide) WithIntrospection

func (d *Decide) WithIntrospection() *Decide

WithIntrospection enables the introspection phase.

func (*Decide) WithIntrospectionTemperature

func (d *Decide) WithIntrospectionTemperature(temp float32) *Decide

WithIntrospectionTemperature sets the temperature for the introspection phase.

func (*Decide) WithProvider

func (d *Decide) WithProvider(p Provider) *Decide

WithProvider sets the provider for this step.

func (*Decide) WithReasoningTemperature

func (d *Decide) WithReasoningTemperature(temp float32) *Decide

WithReasoningTemperature sets the temperature for the reasoning phase.

func (*Decide) WithSummaryKey

func (d *Decide) WithSummaryKey(key string) *Decide

WithSummaryKey sets a custom key for the introspection summary note.

func (*Decide) WithTemperature

func (d *Decide) WithTemperature(temp float32) *Decide

WithTemperature sets the default temperature for this step.

type Discern

type Discern struct {
	// contains filtered or unexported fields
}

Discern is an LLM-powered semantic routing connector that implements pipz.Chainable[*Thought]. It uses zyn.Classification directly to determine which route to take based on semantic analysis.

func NewDiscern

func NewDiscern(key, question string, categories []string) *Discern

NewDiscern creates a new semantic routing connector.

The connector uses zyn.Classification to determine which route to take. Routes are matched against the Primary category from the classification response.

Output Notes:

  • {key}: JSON-serialized zyn.ClassificationResponse
  • {key}_summary: Semantic summary for next steps (if introspection enabled)

Example:

router := cogito.NewDiscern(
    "ticket_route",
    "What type of support ticket is this?",
    []string{"billing", "technical", "general"},
)
router.AddRoute("billing", billingPipeline)
router.AddRoute("technical", technicalPipeline)
router.SetFallback(generalPipeline)

func (*Discern) AddRoute

func (d *Discern) AddRoute(category string, processor pipz.Chainable[*Thought]) *Discern

AddRoute adds or updates a route for a category.

func (*Discern) ClearRoutes

func (d *Discern) ClearRoutes() *Discern

ClearRoutes removes all routes.

func (*Discern) Close

func (d *Discern) Close() error

Close implements pipz.Chainable[*Thought]. Propagates Close to all registered routes and fallback.

func (*Discern) HasRoute

func (d *Discern) HasRoute(category string) bool

HasRoute checks if a route exists for a category.

func (*Discern) Identity

func (d *Discern) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Discern) Process

func (d *Discern) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Discern) RemoveRoute

func (d *Discern) RemoveRoute(category string) *Discern

RemoveRoute removes a route for a category.

func (*Discern) Routes

func (d *Discern) Routes() map[string]pipz.Chainable[*Thought]

Routes returns a copy of the current routes map.

func (*Discern) Scan

Scan retrieves the typed classification response from a thought.

func (*Discern) Schema

func (d *Discern) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Discern) SetFallback

func (d *Discern) SetFallback(processor pipz.Chainable[*Thought]) *Discern

SetFallback sets the fallback processor for unmatched categories.
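A minimal sketch of category routing with a fallback, assuming routes are looked up by the primary category string. The router and handler types are hypothetical, and passing the input through when neither a route nor a fallback matches is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for pipz.Chainable[*Thought].
type handler func(input string) string

// router mirrors the idea of routes keyed by category plus an optional fallback.
type router struct {
	routes   map[string]handler
	fallback handler
}

// dispatch picks the handler for the primary category, then the fallback,
// and otherwise returns the input unchanged (an assumption of this sketch).
func (r *router) dispatch(primary, input string) string {
	if h, ok := r.routes[primary]; ok {
		return h(input)
	}
	if r.fallback != nil {
		return r.fallback(input)
	}
	return input
}

func main() {
	r := &router{
		routes: map[string]handler{
			"billing":   func(s string) string { return "billing:" + s },
			"technical": func(s string) string { return "technical:" + s },
		},
		fallback: func(s string) string { return "general:" + s },
	}
	fmt.Println(r.dispatch("billing", "ticket-1"))
	fmt.Println(r.dispatch("unknown", "ticket-2"))
}
```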

func (*Discern) WithIntrospection

func (d *Discern) WithIntrospection() *Discern

WithIntrospection enables the introspection phase.

func (*Discern) WithIntrospectionTemperature

func (d *Discern) WithIntrospectionTemperature(temp float32) *Discern

WithIntrospectionTemperature sets the temperature for the introspection phase.

func (*Discern) WithProvider

func (d *Discern) WithProvider(p Provider) *Discern

WithProvider sets the provider for classification.

func (*Discern) WithReasoningTemperature

func (d *Discern) WithReasoningTemperature(temp float32) *Discern

WithReasoningTemperature sets the temperature for the classification phase.

func (*Discern) WithSummaryKey

func (d *Discern) WithSummaryKey(key string) *Discern

WithSummaryKey sets a custom key for the introspection summary note.

func (*Discern) WithTemperature

func (d *Discern) WithTemperature(temp float32) *Discern

WithTemperature sets the default temperature for classification.

type Note

type Note struct {
	ID        string            `db:"id" type:"uuid" constraints:"primarykey" default:"gen_random_uuid()"`
	ThoughtID string            `db:"thought_id" type:"uuid" constraints:"notnull" references:"thoughts(id)"`
	Key       string            `db:"key" type:"text" constraints:"notnull"`
	Content   string            `db:"content" type:"text" constraints:"notnull"`
	Metadata  map[string]string `db:"metadata" type:"jsonb" default:"'{}'"`
	Source    string            `db:"source" type:"text" constraints:"notnull"`
	Created   time.Time         `db:"created" type:"timestamp" constraints:"notnull"`
}

Note represents a semantic piece of information in the reasoning chain. Everything in LLM space is fundamentally text-based, so Content is always a string. Metadata provides structured extension without breaking type safety.

type Prioritize

type Prioritize struct {
	// contains filtered or unexported fields
}

Prioritize is a prioritization primitive that implements pipz.Chainable[*Thought]. It prioritizes items by criteria and stores the full response for typed retrieval.

func NewPrioritize

func NewPrioritize(key, criteria string, items []string) *Prioritize

NewPrioritize creates a new prioritization primitive with explicit items to prioritize.

The primitive uses two zyn synapses:

  1. Ranking synapse: Prioritizes items by criteria
  2. Transform synapse: Synthesizes a semantic summary for context accumulation

Output Notes:

  • {key}: JSON-serialized zyn.RankingResponse
  • {key}_summary: Semantic summary for next steps (if introspection enabled)

Example:

items := []string{
    "ticket_1: Login broken",
    "ticket_2: Feature request",
    "ticket_3: Critical outage",
}
step := cogito.NewPrioritize("ticket_priority", "urgency and impact", items)
result, _ := step.Process(ctx, thought)
resp, _ := step.Scan(result)
fmt.Println(resp.Ranked, resp.Confidence, resp.Reasoning)

func NewPrioritizeFrom

func NewPrioritizeFrom(key, criteria, itemsKey string) *Prioritize

NewPrioritizeFrom creates a new prioritization primitive that reads its items from a note. The items are read from the specified note key, whose content is expected to be a JSON array of strings.

The primitive uses two zyn synapses:

  1. Ranking synapse: Prioritizes items by criteria
  2. Transform synapse: Synthesizes a semantic summary for context accumulation

Output Notes:

  • {key}: JSON-serialized zyn.RankingResponse
  • {key}_summary: Semantic summary for next steps (if introspection enabled)

Example:

// First, extract items into a note
type TicketList struct {
    Tickets []string `json:"tickets"`
}
cogito.NewAnalyze[TicketList]("ticket_list", "list of all tickets"),

// Then rank them
cogito.NewPrioritizeFrom("ticket_priority", "urgency and impact", "ticket_list"),
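Since the items note is expected to hold a JSON array of strings, decoding it might look like the following. The parseItems helper is hypothetical and only illustrates the expected content shape; it is not a cogito function.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseItems decodes note content that is expected to hold a JSON array of strings.
// It is a hypothetical helper for illustration, not a cogito function.
func parseItems(content string) ([]string, error) {
	var items []string
	if err := json.Unmarshal([]byte(content), &items); err != nil {
		return nil, fmt.Errorf("note content is not a JSON array of strings: %w", err)
	}
	return items, nil
}

func main() {
	items, err := parseItems(`["ticket_1: Login broken", "ticket_3: Critical outage"]`)
	fmt.Println(len(items), err)

	// A JSON object is not an array of strings, so decoding fails.
	_, err = parseItems(`{"tickets": ["ticket_1"]}`)
	fmt.Println(err != nil)
}
```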

func (*Prioritize) Close

func (r *Prioritize) Close() error

Close implements pipz.Chainable[*Thought].

func (*Prioritize) Identity

func (r *Prioritize) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Prioritize) Process

func (r *Prioritize) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Prioritize) Scan

func (r *Prioritize) Scan(t *Thought) (*zyn.RankingResponse, error)

Scan retrieves the typed ranking response from a thought.

func (*Prioritize) Schema

func (r *Prioritize) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Prioritize) WithIntrospection

func (r *Prioritize) WithIntrospection() *Prioritize

WithIntrospection enables the introspection phase.

func (*Prioritize) WithIntrospectionTemperature

func (r *Prioritize) WithIntrospectionTemperature(temp float32) *Prioritize

WithIntrospectionTemperature sets the temperature for the introspection phase.

func (*Prioritize) WithProvider

func (r *Prioritize) WithProvider(p Provider) *Prioritize

WithProvider sets the provider for this step.

func (*Prioritize) WithReasoningTemperature

func (r *Prioritize) WithReasoningTemperature(temp float32) *Prioritize

WithReasoningTemperature sets the temperature for the reasoning phase.

func (*Prioritize) WithSummaryKey

func (r *Prioritize) WithSummaryKey(key string) *Prioritize

WithSummaryKey sets a custom key for the introspection summary note.

func (*Prioritize) WithTemperature

func (r *Prioritize) WithTemperature(temp float32) *Prioritize

WithTemperature sets the default temperature for this step.

type Provider

type Provider interface {
	Call(ctx context.Context, messages []zyn.Message, temperature float32) (*zyn.ProviderResponse, error)
	Name() string
}

Provider defines the interface for LLM providers. This matches zyn.Provider interface for compatibility.

func GetProvider

func GetProvider() Provider

GetProvider returns the global provider, if set.

func ProviderFromContext

func ProviderFromContext(ctx context.Context) (Provider, bool)

ProviderFromContext retrieves the provider from context, if present.

func ResolveProvider

func ResolveProvider(ctx context.Context, stepProvider Provider) (Provider, error)

ResolveProvider determines which provider to use based on resolution order:

  1. Step-level provider (passed as argument)
  2. Context provider
  3. Global provider
  4. Error if none found
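The resolution order can be sketched as a simple cascade. The provider interface, ctxKey, global variable, and resolve function below are hypothetical stand-ins that mirror the documented order; how cogito stores the context and global providers internally is not shown in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// provider is a trimmed-down, hypothetical stand-in for cogito.Provider.
type provider interface{ Name() string }

type named string

func (n named) Name() string { return string(n) }

// ctxKey is an unexported context key type, the usual Go pattern for context values.
type ctxKey struct{}

// global models a package-level default provider.
var global provider

// resolve applies the documented order: step, then context, then global, then error.
func resolve(ctx context.Context, step provider) (provider, error) {
	if step != nil {
		return step, nil
	}
	if p, ok := ctx.Value(ctxKey{}).(provider); ok {
		return p, nil
	}
	if global != nil {
		return global, nil
	}
	return nil, errors.New("no provider configured")
}

// demo resolves a provider under four configurations and reports which one wins.
func demo() []string {
	var out []string
	report := func(ctx context.Context, step provider) {
		p, err := resolve(ctx, step)
		if err != nil {
			out = append(out, "error")
			return
		}
		out = append(out, p.Name())
	}

	ctx := context.Background()
	global = nil
	report(ctx, nil) // nothing configured

	global = named("global")
	report(ctx, nil) // only the global provider is set

	withCtx := context.WithValue(ctx, ctxKey{}, provider(named("context")))
	report(withCtx, nil)           // context provider beats global
	report(withCtx, named("step")) // step provider beats both
	return out
}

func main() {
	fmt.Println(demo())
}
```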

type Reflect

type Reflect struct {
	// contains filtered or unexported fields
}

Reflect is a memory primitive that summarizes the current Thought's notes into a single consolidated note. This enables self-compression for long reasoning chains, allowing the agent to "step back" and consolidate its accumulated context.

func NewReflect

func NewReflect(key string) *Reflect

NewReflect creates a new reflect primitive.

The primitive:

  1. Gathers notes from the current Thought
  2. Renders them to text
  3. Summarizes via LLM transform synapse
  4. Stores the summary as a new note

Output Notes:

  • {key}: LLM-generated summary/reflection of the Thought's notes

Example:

// Consolidate reasoning after many steps
reflect := cogito.NewReflect("consolidated_context").
    WithPrompt("Synthesize all findings into key insights and next steps")
result, _ := reflect.Process(ctx, thought)
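The four steps listed for Reflect (gather, render, summarize, store) can be sketched as follows. The note type, the "key: content" rendering, the "reflect" source label, and the summarize callback are all assumptions made for illustration; the real primitive sends the rendered text to an LLM.

```go
package main

import (
	"fmt"
	"strings"
)

// note is a hypothetical, reduced version of cogito.Note.
type note struct {
	Key, Content, Source string
}

// render turns notes into a plain-text block, one "key: content" line per note.
// The exact format cogito uses is not documented here; this layout is an assumption.
func render(notes []note) string {
	var b strings.Builder
	for _, n := range notes {
		fmt.Fprintf(&b, "%s: %s\n", n.Key, n.Content)
	}
	return b.String()
}

// reflectOn renders the gathered notes, summarizes the text, and appends
// the summary as a new note under key.
func reflectOn(notes []note, key string, summarize func(string) string) []note {
	summary := summarize(render(notes))
	return append(notes, note{Key: key, Content: summary, Source: "reflect"})
}

func main() {
	notes := []note{
		{Key: "request", Content: "refund for order 12345", Source: "input"},
		{Key: "approve", Content: "yes", Source: "decide"},
	}
	// A toy summarizer that counts lines; the real primitive calls an LLM.
	countLines := func(text string) string {
		return fmt.Sprintf("%d notes reviewed", strings.Count(text, "\n"))
	}
	out := reflectOn(notes, "consolidated_context", countLines)
	fmt.Println(out[len(out)-1].Key, "=", out[len(out)-1].Content)
}
```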

func (*Reflect) Close

func (r *Reflect) Close() error

Close implements pipz.Chainable[*Thought].

func (*Reflect) Identity

func (r *Reflect) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Reflect) Process

func (r *Reflect) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Reflect) Schema

func (r *Reflect) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Reflect) WithPrompt

func (r *Reflect) WithPrompt(prompt string) *Reflect

WithPrompt sets a custom reflection prompt.

func (*Reflect) WithProvider

func (r *Reflect) WithProvider(p Provider) *Reflect

WithProvider sets the provider for the LLM call.

func (*Reflect) WithUnpublishedOnly

func (r *Reflect) WithUnpublishedOnly() *Reflect

WithUnpublishedOnly limits reflection to unpublished notes only. This is useful for reflecting on recent work without re-processing context that has already been sent to the LLM.

type Reset

type Reset struct {
	// contains filtered or unexported fields
}

Reset is a session management primitive that implements pipz.Chainable[*Thought]. It clears the session entirely, optionally injecting a starting message.

func NewReset

func NewReset(key string) *Reset

NewReset creates a new session reset primitive.

The primitive clears all messages from the session. Optionally, a system message can be injected into the fresh session. No LLM call is made.

Example:

reset := cogito.NewReset("session_reset").
    WithSystemMessage("You are a helpful assistant.")
result, _ := reset.Process(ctx, thought)

// Or preserve context from a note:
reset := cogito.NewReset("session_reset").
    WithPreserveNote("session_compress")  // Use compression summary as new context

func (*Reset) Close

func (r *Reset) Close() error

Close implements pipz.Chainable[*Thought].

func (*Reset) Identity

func (r *Reset) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Reset) Process

func (r *Reset) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Reset) Schema

func (r *Reset) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Reset) WithPreserveNote

func (r *Reset) WithPreserveNote(noteKey string) *Reset

WithPreserveNote sets a note key whose content will be used as the system message. This is useful for chaining with Compress: the summary becomes the new context. If the note does not exist, Reset falls back to the WithSystemMessage value.
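The fallback rule can be sketched as a small selection function. The systemMessage helper and the map of notes are hypothetical; they only illustrate the precedence between a preserved note and the configured system message.

```go
package main

import "fmt"

// systemMessage picks the message to inject after a reset.
// notes maps note keys to content; preserveKey and fallback mirror the
// WithPreserveNote and WithSystemMessage settings. Hypothetical helper.
func systemMessage(notes map[string]string, preserveKey, fallback string) string {
	if preserveKey != "" {
		if content, ok := notes[preserveKey]; ok {
			return content
		}
	}
	return fallback
}

func main() {
	notes := map[string]string{"session_compress": "Customer asked about a damaged order."}

	// The preserved note exists, so its content wins.
	fmt.Println(systemMessage(notes, "session_compress", "You are a helpful assistant."))

	// The note is missing, so the configured system message is used instead.
	fmt.Println(systemMessage(notes, "missing_key", "You are a helpful assistant."))
}
```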

func (*Reset) WithSystemMessage

func (r *Reset) WithSystemMessage(msg string) *Reset

WithSystemMessage sets a system message to inject after clearing.

type Sift

type Sift struct {
	// contains filtered or unexported fields
}

Sift is an LLM-powered conditional gate that implements pipz.Chainable[*Thought]. It uses semantic reasoning to decide whether to execute a wrapped processor or pass through unchanged.

Unlike pipz.Filter, which uses a programmatic condition, Sift uses an LLM to make the decision based on semantic understanding of the context. This enables gates based on meaning rather than simple data inspection.

func NewSift

func NewSift(key, question string, processor pipz.Chainable[*Thought]) *Sift

NewSift creates a new semantic gate primitive.

The primitive uses zyn.Binary to decide whether to execute the processor. If the decision is true, the processor is executed. Otherwise, the thought passes through unchanged.

Output Notes:

  • {key}: JSON-serialized zyn.BinaryResponse (the gate decision)
  • {key}_summary: Semantic summary for next steps (if introspection enabled)

Example:

gate := cogito.NewSift(
    "escalation_gate",
    "Does this ticket require human escalation?",
    humanEscalationPipeline,
)
result, _ := gate.Process(ctx, thought)
resp, _ := gate.Scan(result)
fmt.Println("Escalated:", resp.Decision)
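Stripped of the LLM call, the gate reduces to a conditional wrapper. In this sketch the gate function and the decide callback are hypothetical; decide stands in for the yes/no answer the real primitive obtains from the model.

```go
package main

import (
	"fmt"
	"strings"
)

// gate runs step only when decide returns true; otherwise the input passes through.
// decide stands in for the LLM yes/no call. Hypothetical helper.
func gate(input string, decide func(string) bool, step func(string) string) (string, bool) {
	if !decide(input) {
		return input, false
	}
	return step(input), true
}

func main() {
	// A toy decision based on a keyword; the real gate reasons semantically.
	needsEscalation := func(s string) bool { return strings.Contains(s, "outage") }
	escalate := func(s string) string { return "[escalated] " + s }

	fmt.Println(gate("critical outage in region A", needsEscalation, escalate))
	fmt.Println(gate("feature request", needsEscalation, escalate))
}
```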

func (*Sift) Close

func (s *Sift) Close() error

Close implements pipz.Chainable[*Thought]. Propagates Close to the wrapped processor.

func (*Sift) Identity

func (s *Sift) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Sift) Process

func (s *Sift) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Sift) Scan

func (s *Sift) Scan(t *Thought) (*zyn.BinaryResponse, error)

Scan retrieves the typed binary response from a thought.

func (*Sift) Schema

func (s *Sift) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Sift) SetProcessor

func (s *Sift) SetProcessor(processor pipz.Chainable[*Thought]) *Sift

SetProcessor updates the wrapped processor.

func (*Sift) WithIntrospection

func (s *Sift) WithIntrospection() *Sift

WithIntrospection enables the introspection phase.

func (*Sift) WithIntrospectionTemperature

func (s *Sift) WithIntrospectionTemperature(temp float32) *Sift

WithIntrospectionTemperature sets the temperature for the introspection phase.

func (*Sift) WithProvider

func (s *Sift) WithProvider(p Provider) *Sift

WithProvider sets the provider for this step.

func (*Sift) WithReasoningTemperature

func (s *Sift) WithReasoningTemperature(temp float32) *Sift

WithReasoningTemperature sets the temperature for the gate decision phase.

func (*Sift) WithSummaryKey

func (s *Sift) WithSummaryKey(key string) *Sift

WithSummaryKey sets a custom key for the introspection summary note.

func (*Sift) WithTemperature

func (s *Sift) WithTemperature(temp float32) *Sift

WithTemperature sets the default temperature for this step.

type Thought

type Thought struct {
	// Identity
	ID      string `db:"id" type:"uuid" constraints:"primarykey" default:"gen_random_uuid()"`
	Intent  string `db:"intent" type:"text" constraints:"notnull"`
	TraceID string `db:"trace_id" type:"text" constraints:"notnull,unique"`

	// Lineage
	ParentID *string `db:"parent_id" type:"uuid" references:"thoughts(id)"`

	// LLM conversation state
	Session *zyn.Session // Shared session for LLM continuity

	// Timestamps
	CreatedAt time.Time `db:"created_at" type:"timestamp" constraints:"notnull"`
	UpdatedAt time.Time `db:"updated_at" type:"timestamp" constraints:"notnull"`
	// contains filtered or unexported fields
}

Thought represents the rolling context of a chain of thought. It maintains an append-only log of Notes, providing semantic reasoning history.

Concurrency

Thought is safe for concurrent reads but not concurrent writes. Multiple goroutines may call read methods (GetNote, GetContent, AllNotes, etc.) simultaneously, but write methods (AddNote, SetContent, MarkNotesPublished) must not be called concurrently with each other or with reads.

For parallel processing, use Clone to create independent copies for each goroutine. The cloned thoughts can then be merged or selected as needed.

Failure Behavior

Primitive steps (Decide, Classify, Analyze, etc.) may modify the thought before encountering an error. If a step returns an error, the thought may be in a partially-modified state. Callers requiring atomicity should Clone the thought before processing and discard it on failure.
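The clone-then-discard pattern can be sketched with a reduced thought type. The thought, clone, and processAtomically names below are hypothetical stand-ins; with cogito the same shape would use Thought.Clone and a step's Process method.

```go
package main

import (
	"errors"
	"fmt"
)

// thought is a hypothetical stand-in for *cogito.Thought.
type thought struct{ notes []string }

// clone returns an independent copy of the notes.
func (t *thought) clone() *thought {
	return &thought{notes: append([]string(nil), t.notes...)}
}

// errStep is the failure used by the demo step below.
var errStep = errors.New("provider unavailable")

// processAtomically runs step on a clone and returns the clone only on success.
// On failure the original is returned untouched together with the error.
func processAtomically(t *thought, step func(*thought) error) (*thought, error) {
	work := t.clone()
	if err := step(work); err != nil {
		return t, err
	}
	return work, nil
}

func main() {
	original := &thought{notes: []string{"request"}}

	// A step that writes a note and then fails, leaving partial state behind.
	failing := func(w *thought) error {
		w.notes = append(w.notes, "partial")
		return errStep
	}

	result, err := processAtomically(original, failing)
	fmt.Println(len(result.notes), err) // the partial note never reaches the original
}
```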

func New

func New(ctx context.Context, intent string) *Thought

New creates a new Thought with the given intent. ID and TraceID are auto-generated using UUID.

func NewWithTrace

func NewWithTrace(ctx context.Context, intent, traceID string) *Thought

NewWithTrace creates a new Thought with an explicit trace ID.

func (*Thought) AddNote

func (t *Thought) AddNote(ctx context.Context, note Note) error

AddNote adds a new note to the thought. If a note with the same key exists, the new note becomes the current value.
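An append-only log in which the newest note for a key becomes the current value might be modeled like this. The noteLog type and its index map are assumptions for illustration; the package's internal storage is unexported and not documented here.

```go
package main

import "fmt"

// note is a hypothetical, reduced version of cogito.Note.
type note struct{ Key, Content string }

// noteLog is a hypothetical append-only log with an index to the latest entry per key.
type noteLog struct {
	notes  []note
	latest map[string]int // key -> index of the most recent note with that key
}

func newNoteLog() *noteLog { return &noteLog{latest: make(map[string]int)} }

// add appends the note; earlier notes with the same key stay in the history.
func (l *noteLog) add(n note) {
	l.notes = append(l.notes, n)
	l.latest[n.Key] = len(l.notes) - 1
}

// get returns the most recent note for key.
func (l *noteLog) get(key string) (note, bool) {
	i, ok := l.latest[key]
	if !ok {
		return note{}, false
	}
	return l.notes[i], true
}

func main() {
	l := newNoteLog()
	l.add(note{Key: "status", Content: "open"})
	l.add(note{Key: "status", Content: "resolved"})

	current, _ := l.get("status")
	fmt.Println(current.Content, len(l.notes)) // latest value, full history length
}
```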

func (*Thought) AllNotes

func (t *Thought) AllNotes() []Note

AllNotes returns all notes in chronological order.

func (*Thought) Clone

func (t *Thought) Clone() *Thought

Clone creates a deep copy of the thought for concurrent processing. Required for pipz.Concurrent and other parallel operations.

The clone has an independent Session and notes. Modifications to the clone do not affect the original and vice versa.

Note: Clone should only be called when the original thought is not being concurrently modified. The Session.Messages() call is internally synchronized, but concurrent writes to the original thought during clone could result in an inconsistent snapshot.

func (*Thought) GetBool

func (t *Thought) GetBool(key string) (bool, error)

GetBool parses the content as a boolean ("true"/"false").

func (*Thought) GetContent

func (t *Thought) GetContent(key string) (string, error)

GetContent retrieves the content of the most recent note with the given key.

func (*Thought) GetFloat

func (t *Thought) GetFloat(key string) (float64, error)

GetFloat parses the content as a float64.

func (*Thought) GetInt

func (t *Thought) GetInt(key string) (int, error)

GetInt parses the content as an int.
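Because note content is always a string, the typed getters amount to a lookup followed by a parse. The sketch below uses the standard strconv package on a hypothetical notes map; note that strconv.ParseBool also accepts forms such as "1" and "T", and whether cogito accepts those is not stated here.

```go
package main

import (
	"fmt"
	"strconv"
)

// notes maps a note key to its string content. Hypothetical stand-in for a Thought.
type notes map[string]string

// content returns the stored string or an error when the key is absent.
func (n notes) content(key string) (string, error) {
	c, ok := n[key]
	if !ok {
		return "", fmt.Errorf("note %q not found", key)
	}
	return c, nil
}

// getBool looks up the note and parses it as a boolean.
func (n notes) getBool(key string) (bool, error) {
	c, err := n.content(key)
	if err != nil {
		return false, err
	}
	return strconv.ParseBool(c)
}

// getInt looks up the note and parses it as an int.
func (n notes) getInt(key string) (int, error) {
	c, err := n.content(key)
	if err != nil {
		return 0, err
	}
	return strconv.Atoi(c)
}

// getFloat looks up the note and parses it as a float64.
func (n notes) getFloat(key string) (float64, error) {
	c, err := n.content(key)
	if err != nil {
		return 0, err
	}
	return strconv.ParseFloat(c, 64)
}

func main() {
	n := notes{"urgent": "true", "count": "3", "amount": "49.99", "reason": "arrived damaged"}

	fmt.Println(n.getBool("urgent"))
	fmt.Println(n.getInt("count"))
	fmt.Println(n.getFloat("amount"))

	// Non-numeric content produces a parse error instead of a silent zero.
	_, err := n.getInt("reason")
	fmt.Println(err != nil)
}
```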

func (*Thought) GetLatestNote

func (t *Thought) GetLatestNote() (Note, bool)

GetLatestNote returns the most recently added note.

func (*Thought) GetMetadata

func (t *Thought) GetMetadata(key, field string) (string, error)

GetMetadata retrieves a specific metadata field from the most recent note.

func (*Thought) GetNote

func (t *Thought) GetNote(key string) (Note, bool)

GetNote retrieves the most recent note with the given key.

func (*Thought) GetUnpublishedNotes

func (t *Thought) GetUnpublishedNotes() []Note

GetUnpublishedNotes returns all notes that have not yet been sent to the LLM. These are notes added after the last MarkNotesPublished call.

func (*Thought) MarkNotesPublished

func (t *Thought) MarkNotesPublished(ctx context.Context)

MarkNotesPublished marks all current notes as published to the LLM. This should be called after successfully sending notes to a synapse.

func (*Thought) PublishedCount

func (t *Thought) PublishedCount() int

PublishedCount returns the number of notes that have been published to the LLM.
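One way to picture the published/unpublished split is a cursor over the append-only note list. The tracker type below is a hypothetical model consistent with the method descriptions above (a count of leading notes already sent); the actual internal representation is not documented.

```go
package main

import "fmt"

// tracker is a hypothetical model of publication tracking on a Thought.
type tracker struct {
	notes     []string
	published int // number of leading notes already sent to the LLM
}

// add appends a new, not yet published note.
func (t *tracker) add(n string) { t.notes = append(t.notes, n) }

// unpublished returns the notes added since the last markPublished call.
func (t *tracker) unpublished() []string { return t.notes[t.published:] }

// markPublished moves the cursor to the end of the current notes.
func (t *tracker) markPublished() { t.published = len(t.notes) }

func main() {
	tr := &tracker{}
	tr.add("request")
	tr.add("parse")
	fmt.Println(tr.unpublished()) // both notes are still pending

	tr.markPublished()
	tr.add("decision")
	fmt.Println(tr.unpublished(), tr.published) // only the newest note is pending
}
```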

func (*Thought) SetContent

func (t *Thought) SetContent(ctx context.Context, key, content, source string) error

SetContent adds a simple note with just key and content.

func (*Thought) SetNote

func (t *Thought) SetNote(ctx context.Context, key, content, source string, metadata map[string]string) error

SetNote adds a note with metadata.

func (*Thought) SetPublishedCount

func (t *Thought) SetPublishedCount(count int)

SetPublishedCount sets the published count directly. This is primarily used for restoring thought state from persistence.

type Truncate

type Truncate struct {
	// contains filtered or unexported fields
}

Truncate is a session management primitive that implements pipz.Chainable[*Thought]. It removes messages from the session without LLM involvement, using a sliding window approach.

func NewTruncate

func NewTruncate(key string) *Truncate

NewTruncate creates a new session truncation primitive.

The primitive removes messages from the middle of the session, preserving the first N messages (typically system prompts) and the last M messages (recent context). No LLM call is made.

Example:

truncate := cogito.NewTruncate("session_truncate").
    WithKeepFirst(1).   // Preserve system prompt
    WithKeepLast(10).   // Keep last 10 messages
    WithThreshold(20)   // Only truncate if >= 20 messages
result, _ := truncate.Process(ctx, thought)
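The sliding-window idea can be sketched on a plain slice. The truncate function here is a hypothetical stand-in that assumes non-negative arguments; returning the input unchanged when the two kept ranges already cover every message is an assumption of this sketch.

```go
package main

import "fmt"

// truncate keeps the first keepFirst and last keepLast messages and drops the middle.
// It passes messages through unchanged when there are fewer than threshold of them,
// or when the two kept ranges would already cover everything.
func truncate(messages []string, keepFirst, keepLast, threshold int) []string {
	if len(messages) < threshold || keepFirst+keepLast >= len(messages) {
		return messages
	}
	out := make([]string, 0, keepFirst+keepLast)
	out = append(out, messages[:keepFirst]...)
	out = append(out, messages[len(messages)-keepLast:]...)
	return out
}

func main() {
	session := []string{"system", "m1", "m2", "m3", "m4", "m5"}

	// Keep the system prompt and the two most recent messages.
	fmt.Println(truncate(session, 1, 2, 4))

	// Below the threshold nothing is removed.
	fmt.Println(len(truncate(session, 1, 2, 10)))
}
```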

func (*Truncate) Close

func (tr *Truncate) Close() error

Close implements pipz.Chainable[*Thought].

func (*Truncate) Identity

func (tr *Truncate) Identity() pipz.Identity

Identity implements pipz.Chainable[*Thought].

func (*Truncate) Process

func (tr *Truncate) Process(ctx context.Context, t *Thought) (*Thought, error)

Process implements pipz.Chainable[*Thought].

func (*Truncate) Schema

func (tr *Truncate) Schema() pipz.Node

Schema implements pipz.Chainable[*Thought].

func (*Truncate) WithKeepFirst

func (tr *Truncate) WithKeepFirst(n int) *Truncate

WithKeepFirst sets the number of messages to preserve from the start. Typically used to preserve system prompts.

func (*Truncate) WithKeepLast

func (tr *Truncate) WithKeepLast(n int) *Truncate

WithKeepLast sets the number of recent messages to preserve.

func (*Truncate) WithThreshold

func (tr *Truncate) WithThreshold(n int) *Truncate

WithThreshold sets the minimum message count to trigger truncation. If the session has fewer messages, the primitive passes through unchanged.
