module

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2025 License: MIT Imports: 17 Imported by: 0

README

internal/module

High-level DSGo behaviors built on top of internal/core.

Composition patterns

flowchart LR
  P[Predict] --> PR[Program]
  C[ChainOfThought] --> PR
  R[ReAct] --> PR
  RF[Refine] --> PR

  PR --> B[BestOfN]
  PR --> PA[Parallel]
  PR --> POT[ProgramOfThought]
  PR --> MCC[MultiChainComparison]

Tiny composition examples

Program
program := dsgo.NewProgram("pipeline").
  AddModule(classifier).
  AddModule(generator)

pred, err := program.Forward(ctx, inputs)
Parallel
par := dsgo.NewParallel(workerModule).WithMaxWorkers(10)

pred, err := par.Forward(ctx, inputs)
BestOfN
best := dsgo.NewBestOfN(workerModule, 5).
  WithScorer(dsgo.DefaultScorer()).
  WithParallel(true)

pred, err := best.Forward(ctx, inputs)
Refine
ref := dsgo.NewRefine(sig, lm).WithMaxIterations(2)

// First pass
pred1, err := ref.Forward(ctx, inputs)

// Second pass (triggers refinement)
pred2, err := ref.Forward(ctx, map[string]any{
  "feedback": "Make it shorter",
  // ...plus original inputs as needed...
})
ProgramOfThought
pot := dsgo.NewProgramOfThought(sig, lm, "python")

pred, err := pot.Forward(ctx, inputs)
MultiChainComparison
mcc := dsgo.NewMultiChainComparison(sig, lm, 3)

pred, err := mcc.Forward(ctx, map[string]any{
  "completions": []any{predA.Outputs, predB.Outputs, predC.Outputs},
})

Examples

Documentation

Index

Constants

View Source
const (
	MaxReActIterations = 10
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BestOfN

type BestOfN struct {
	Module      core.Module
	N           int
	Scorer      ScoringFunction
	Parallel    bool
	ReturnAll   bool
	MaxFailures int     // Maximum number of failures before giving up
	Threshold   float64 // Early-stop if score meets or exceeds this threshold
}

BestOfN executes a module N times and returns the best result.

Thread Safety: BestOfN is now completely thread-safe. When using WithParallel(true), the module automatically creates independent instances for parallel execution, eliminating data race concerns even with stateful modules like Predict, ChainOfThought, and ReAct that maintain internal History.

func NewBestOfN

func NewBestOfN(module core.Module, n int) *BestOfN

NewBestOfN creates a new BestOfN module

func (*BestOfN) Clone added in v0.6.0

func (b *BestOfN) Clone() core.Module

Clone creates an independent copy of BestOfN module

func (*BestOfN) Forward

func (b *BestOfN) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error)

Forward executes the module N times and returns the best result

func (*BestOfN) GetSignature

func (b *BestOfN) GetSignature() *core.Signature

GetSignature returns the module's signature

func (*BestOfN) WithMaxFailures

func (b *BestOfN) WithMaxFailures(max int) *BestOfN

WithMaxFailures sets the maximum number of failures before giving up

func (*BestOfN) WithParallel

func (b *BestOfN) WithParallel(parallel bool) *BestOfN

WithParallel enables parallel execution.

func (*BestOfN) WithReturnAll

func (b *BestOfN) WithReturnAll(returnAll bool) *BestOfN

WithReturnAll enables returning all results, not just the best

func (*BestOfN) WithScorer

func (b *BestOfN) WithScorer(scorer ScoringFunction) *BestOfN

WithScorer sets the scoring function

func (*BestOfN) WithThreshold

func (b *BestOfN) WithThreshold(threshold float64) *BestOfN

WithThreshold sets the early-stop threshold

type BestOfNResult

type BestOfNResult struct {
	BestOutput   map[string]any
	BestScore    float64
	AllOutputs   []map[string]any
	AllScores    []float64
	FailureCount int
}

BestOfNResult contains the results of BestOfN execution (deprecated - use Prediction.Completions)

type ChainOfThought

type ChainOfThought struct {
	Signature *core.Signature
	LM        core.LM
	Options   *core.GenerateOptions
	Adapter   core.Adapter
	History   *core.History  // Optional conversation history
	Demos     []core.Example // Optional few-shot examples
}

ChainOfThought module encourages step-by-step reasoning

func NewChainOfThought

func NewChainOfThought(signature *core.Signature, lm core.LM) *ChainOfThought

NewChainOfThought creates a new ChainOfThought module

func (*ChainOfThought) Clone added in v0.6.0

func (cot *ChainOfThought) Clone() core.Module

Clone creates an independent copy of ChainOfThought module

func (*ChainOfThought) Forward

func (cot *ChainOfThought) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error)

Forward executes the chain of thought reasoning

func (*ChainOfThought) GetSignature

func (cot *ChainOfThought) GetSignature() *core.Signature

GetSignature returns the module's signature

func (*ChainOfThought) WithAdapter

func (cot *ChainOfThought) WithAdapter(adapter core.Adapter) *ChainOfThought

WithAdapter sets a custom adapter

func (*ChainOfThought) WithDemos

func (cot *ChainOfThought) WithDemos(demos []core.Example) *ChainOfThought

WithDemos sets few-shot examples for in-context learning

func (*ChainOfThought) WithHistory

func (cot *ChainOfThought) WithHistory(history *core.History) *ChainOfThought

WithHistory sets conversation history for multi-turn interactions

func (*ChainOfThought) WithOptions

func (cot *ChainOfThought) WithOptions(options *core.GenerateOptions) *ChainOfThought

WithOptions sets custom generation options

type MultiChainComparison added in v0.6.3

type MultiChainComparison struct {
	BaseSignature *core.Signature // Original signature without transformations

	LM core.LM // Language model for synthesis
	M  int     // Number of reasoning attempts

	AttemptTemplate string // Template for formatting attempts
	Options         *core.GenerateOptions
	Adapter         core.Adapter
	History         *core.History  // Optional conversation history
	Demos           []core.Example // Optional few-shot examples
	// contains filtered or unexported fields
}

MultiChainComparison synthesizes the best answer from M reasoning attempts. It follows DSPy's design: accepts pre-generated completions and performs synthesis. The signature is transformed to include reasoning attempts as INPUT fields.

func NewMultiChainComparison added in v0.6.3

func NewMultiChainComparison(baseSignature *core.Signature, lm core.LM, m int) *MultiChainComparison

NewMultiChainComparison creates a new MultiChainComparison module. It accepts a base signature, language model, and number of reasoning attempts M.

func (*MultiChainComparison) Clone added in v0.6.3

func (mcc *MultiChainComparison) Clone() core.Module

Clone creates an independent copy of MultiChainComparison module.

func (*MultiChainComparison) Forward added in v0.6.3

func (mcc *MultiChainComparison) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error)

Forward executes multi-chain comparison synthesis. It expects completions to be provided via inputs["completions"].

func (*MultiChainComparison) GetSignature added in v0.6.3

func (mcc *MultiChainComparison) GetSignature() *core.Signature

GetSignature returns the base signature (not the internal transformed one). This preserves the original API contract for composability.

func (*MultiChainComparison) WithAdapter added in v0.6.3

func (mcc *MultiChainComparison) WithAdapter(adapter core.Adapter) *MultiChainComparison

WithAdapter sets a custom adapter.

func (*MultiChainComparison) WithAttemptTemplate added in v0.6.3

func (mcc *MultiChainComparison) WithAttemptTemplate(template string) *MultiChainComparison

WithAttemptTemplate sets the template for formatting reasoning attempts.

func (*MultiChainComparison) WithDemos added in v0.6.3

func (mcc *MultiChainComparison) WithDemos(demos []core.Example) *MultiChainComparison

WithDemos sets few-shot examples for the synthesis module.

func (*MultiChainComparison) WithHistory added in v0.6.3

func (mcc *MultiChainComparison) WithHistory(history *core.History) *MultiChainComparison

WithHistory sets conversation history for the synthesis module.

func (*MultiChainComparison) WithOptions added in v0.6.3

func (mcc *MultiChainComparison) WithOptions(options *core.GenerateOptions) *MultiChainComparison

WithOptions sets custom generation options.

func (*MultiChainComparison) WithTemperature added in v0.6.3

func (mcc *MultiChainComparison) WithTemperature(temp float64) *MultiChainComparison

WithTemperature sets the temperature for synthesis.

type Parallel

type Parallel struct {
	// contains filtered or unexported fields
}

Parallel executes a module across multiple inputs concurrently.

By default, Parallel creates isolated module instances by cloning the base module for each task, ensuring no shared state between parallel executions. This provides semantic isolation similar to DSPy's Parallel behavior.

Advanced usage patterns:

  • Default: Module is cloned per task (isolated state)
  • NewParallelWithFactory: Custom instance creation per task
  • NewParallelWithInstances: Pre-created instances with controlled sharing

Input modes:

  • Batch: inputs["_batch"] = []map[string]any
  • Map-of-slices: any []any values are zipped (must have equal length)
  • Repeat: WithRepeat(n) duplicates single input n times

func NewParallel

func NewParallel(module core.Module) *Parallel

NewParallel creates a Parallel module with automatic module cloning. The base module is cloned for each task to ensure state isolation. This is the recommended default for all modules, including stateful ones.

func NewParallelWithFactory

func NewParallelWithFactory(factory func(i int) core.Module) *Parallel

NewParallelWithFactory creates a Parallel module with a factory function. The factory is called for each task with the task index. This is the recommended approach for stateful modules.

func NewParallelWithInstances

func NewParallelWithInstances(instances []core.Module) *Parallel

NewParallelWithInstances creates a Parallel module with pre-created instances. Each task will use instances[i % len(instances)].

func (*Parallel) Clone added in v0.6.0

func (p *Parallel) Clone() core.Module

Clone creates an independent copy of Parallel module

func (*Parallel) Forward

func (p *Parallel) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error)

Forward executes the module in parallel across expanded inputs

func (*Parallel) GetSignature

func (p *Parallel) GetSignature() *core.Signature

GetSignature returns the wrapped module's signature

func (*Parallel) WithBatchKey

func (p *Parallel) WithBatchKey(key string) *Parallel

WithBatchKey sets the key to use for batch input (default: "_batch")

func (*Parallel) WithFailFast

func (p *Parallel) WithFailFast(on bool) *Parallel

WithFailFast enables cancellation on first failure

func (*Parallel) WithMaxFailures

func (p *Parallel) WithMaxFailures(n int) *Parallel

WithMaxFailures sets the maximum number of failures before giving up. Set to 0 to require all tasks to succeed.

func (*Parallel) WithMaxWorkers

func (p *Parallel) WithMaxWorkers(n int) *Parallel

WithMaxWorkers sets the maximum number of concurrent workers

func (*Parallel) WithOnlySuccessful

func (p *Parallel) WithOnlySuccessful(on bool) *Parallel

WithOnlySuccessful filters failures from Completions (only when ReturnAll is true)

func (*Parallel) WithRepeat

func (p *Parallel) WithRepeat(n int) *Parallel

WithRepeat sets the number of times to repeat the same input

func (*Parallel) WithReturnAll

func (p *Parallel) WithReturnAll(on bool) *Parallel

WithReturnAll enables returning all results in Completions

func (*Parallel) WithVerbose added in v0.6.0

func (p *Parallel) WithVerbose(verbose bool) *Parallel

WithVerbose enables verbose logging of parallel execution details.

Verbose logging contract (schema v1)

When verbose is enabled, Parallel emits the following log messages:

  • "Parallel batch started" (always INFO)
  • "Parallel task started"
  • "Parallel task completed"
  • "Parallel task failed"
  • "Parallel batch completed"

Per-task and batch-completed logs are emitted at INFO when verbose, DEBUG otherwise. Each log includes a stable set of structured fields to make it easy to filter and aggregate. The canonical module name is always "module.Parallel".

Common fields (present on all Parallel verbose logs):

  • module: "module.Parallel"
  • parallel_id: correlation identifier for the whole batch
  • parallel_mode: one of "clone", "factory", "instances"
  • inner_module: inner module type name (best-effort; may be empty for factory mode at batch start)
  • lm_model: LM model name (best-effort)
  • batch_size: number of tasks
  • max_workers: maximum concurrent workers
  • fail_fast, max_failures, return_all, only_success, repeat_factor, batch_key, verbose

Task fields (present on per-task logs):

  • task_index: 0-based index within the batch
  • task_total: total tasks in the batch (equals batch_size)
  • inputs: summarized inputs (truncated; complex types are summarized)

When enabled, per-task logs are emitted at INFO level instead of DEBUG.

type ParallelMetrics

type ParallelMetrics struct {
	Total     int `json:"total"`
	Successes int `json:"successes"`
	Failures  int `json:"failures"`
	Latency   struct {
		MinMs int64 `json:"min_ms"`
		MaxMs int64 `json:"max_ms"`
		AvgMs int64 `json:"avg_ms"`
		P50Ms int64 `json:"p50_ms"`
	} `json:"latency"`
}

ParallelMetrics contains execution metrics for parallel execution

type Predict

type Predict struct {
	Signature *core.Signature
	LM        core.LM
	Options   *core.GenerateOptions
	Adapter   core.Adapter
	History   *core.History  // Optional conversation history
	Demos     []core.Example // Optional few-shot examples
}

Predict is the basic prediction module

func NewPredict

func NewPredict(signature *core.Signature, lm core.LM) *Predict

NewPredict creates a new Predict module

func (*Predict) Clone added in v0.6.0

func (p *Predict) Clone() core.Module

Clone creates an independent copy of Predict module

func (*Predict) Forward

func (p *Predict) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error)

Forward executes the prediction

func (*Predict) GetSignature

func (p *Predict) GetSignature() *core.Signature

GetSignature returns the module's signature

func (*Predict) Stream

func (p *Predict) Stream(ctx context.Context, inputs map[string]any) (*StreamResult, error)

Stream executes the prediction with streaming output Returns channels for chunks, final prediction, and errors The chunks channel emits incremental content in real-time The prediction channel emits the final parsed prediction after the stream completes The errors channel emits any errors that occur during streaming or parsing

func (*Predict) WithAdapter

func (p *Predict) WithAdapter(adapter core.Adapter) *Predict

WithAdapter sets a custom adapter

func (*Predict) WithDemos

func (p *Predict) WithDemos(demos []core.Example) *Predict

WithDemos sets few-shot examples for in-context learning

func (*Predict) WithHistory

func (p *Predict) WithHistory(history *core.History) *Predict

WithHistory sets conversation history for multi-turn interactions

func (*Predict) WithOptions

func (p *Predict) WithOptions(options *core.GenerateOptions) *Predict

WithOptions sets custom generation options

type Program

type Program struct {
	// contains filtered or unexported fields
}

Program represents a composable pipeline of modules

func NewProgram

func NewProgram(name string) *Program

NewProgram creates a new program

func (*Program) AddModule

func (p *Program) AddModule(module core.Module) *Program

AddModule adds a module to the program pipeline

func (*Program) Clone added in v0.6.0

func (p *Program) Clone() core.Module

Clone creates an independent copy of Program module

func (*Program) Forward

func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error)

Forward executes the program by running modules in sequence Each module's outputs become available as inputs to subsequent modules

func (*Program) GetSignature

func (p *Program) GetSignature() *core.Signature

GetSignature returns the signature of the last module in the pipeline

func (*Program) ModuleCount

func (p *Program) ModuleCount() int

ModuleCount returns the number of modules in the program

func (*Program) Name

func (p *Program) Name() string

Name returns the program name

type ProgramOfThought

type ProgramOfThought struct {
	Signature        *core.Signature
	LM               core.LM
	Options          *core.GenerateOptions
	Language         string // "python", "javascript", "go"
	AllowExecution   bool
	ExecutionTimeout int // seconds
}

ProgramOfThought generates and executes code to solve problems This is useful for mathematical reasoning, data processing, etc.

func NewProgramOfThought

func NewProgramOfThought(signature *core.Signature, lm core.LM, language string) *ProgramOfThought

NewProgramOfThought creates a new ProgramOfThought module

func (*ProgramOfThought) Clone added in v0.6.0

func (pot *ProgramOfThought) Clone() core.Module

Clone creates an independent copy of ProgramOfThought module

func (*ProgramOfThought) Forward

func (pot *ProgramOfThought) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error)

Forward executes the program of thought

func (*ProgramOfThought) GetSignature

func (pot *ProgramOfThought) GetSignature() *core.Signature

GetSignature returns the module's signature

func (*ProgramOfThought) WithAllowExecution

func (pot *ProgramOfThought) WithAllowExecution(allow bool) *ProgramOfThought

WithAllowExecution enables code execution (use with caution!)

func (*ProgramOfThought) WithExecutionTimeout

func (pot *ProgramOfThought) WithExecutionTimeout(seconds int) *ProgramOfThought

WithExecutionTimeout sets the execution timeout in seconds

func (*ProgramOfThought) WithOptions

func (pot *ProgramOfThought) WithOptions(options *core.GenerateOptions) *ProgramOfThought

WithOptions sets custom generation options

type ReAct

type ReAct struct {
	Signature     *core.Signature
	LM            core.LM
	Tools         []core.Tool
	Options       *core.GenerateOptions
	Adapter       core.Adapter
	History       *core.History  // Optional conversation history
	Demos         []core.Example // Optional few-shot examples
	MaxIterations int
	Verbose       bool
}

ReAct implements the Reasoning and Acting pattern

func NewReAct

func NewReAct(signature *core.Signature, lm core.LM, tools []core.Tool) *ReAct

NewReAct creates a new ReAct module

Panics if signature or lm is nil to fail fast on invalid configuration. ReAct instances are not safe for concurrent use. For parallel execution, call Clone() per concurrent worker and configure each clone before use.

func (*ReAct) Clone added in v0.6.0

func (r *ReAct) Clone() core.Module

Clone creates an independent copy of ReAct module

func (*ReAct) Forward

func (r *ReAct) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error)

Forward executes the ReAct loop

func (*ReAct) GetSignature

func (r *ReAct) GetSignature() *core.Signature

GetSignature returns the module's signature

func (*ReAct) WithAdapter

func (r *ReAct) WithAdapter(adapter core.Adapter) *ReAct

WithAdapter sets a custom adapter

func (*ReAct) WithDemos

func (r *ReAct) WithDemos(demos []core.Example) *ReAct

WithDemos sets few-shot examples for in-context learning

func (*ReAct) WithHistory

func (r *ReAct) WithHistory(history *core.History) *ReAct

WithHistory sets conversation history for multi-turn interactions

func (*ReAct) WithMaxIterations

func (r *ReAct) WithMaxIterations(max int) *ReAct

WithMaxIterations sets the maximum number of ReAct iterations

func (*ReAct) WithOptions

func (r *ReAct) WithOptions(options *core.GenerateOptions) *ReAct

WithOptions sets custom generation options If nil is passed, defaults are used

func (*ReAct) WithVerbose

func (r *ReAct) WithVerbose(verbose bool) *ReAct

WithVerbose enables verbose logging

type Refine

type Refine struct {
	Signature       *core.Signature
	LM              core.LM
	Options         *core.GenerateOptions
	Adapter         core.Adapter
	MaxIterations   int
	RefinementField string         // Field name to use for refinement feedback
	History         *core.History  // Optional conversation history
	TrackHistory    bool           // Opt-in: automatically append to History during Forward
	Demos           []core.Example // Optional few-shot examples
}

Refine is a module that iteratively improves outputs through refinement.

It supports:

  • Conversation history for multi-turn context (WithHistory)
  • Automatic history tracking (WithHistoryTracking)
  • Few-shot examples for in-context learning (WithDemos)
  • Configurable refinement iterations (WithMaxIterations)
  • Custom feedback field names (WithRefinementField)

Thread Safety: Refine is safe for concurrent use when History is shared, as core.History uses internal synchronization. However, if TrackHistory is enabled, concurrent calls will interleave history entries.

By default, History is read-only (TrackHistory=false) to avoid conflicts with caller-managed history.

Refine does not support tool execution loops. If the model requests tool execution, use the ReAct module instead.

func NewRefine

func NewRefine(signature *core.Signature, lm core.LM) *Refine

NewRefine creates a new Refine module

func (*Refine) Clone added in v0.6.0

func (r *Refine) Clone() core.Module

Clone creates a new Refine module sharing the same LM, Signature, Options, Adapter, History, and Demos.

The clone can be configured independently but shares underlying resources with the original. This is suitable for parallel execution where each clone needs its own state but can share read-only configuration.

func (*Refine) Forward

func (r *Refine) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error)

Forward executes the refinement loop

func (*Refine) GetSignature

func (r *Refine) GetSignature() *core.Signature

GetSignature returns the module's signature

func (*Refine) WithAdapter

func (r *Refine) WithAdapter(adapter core.Adapter) *Refine

WithAdapter sets a custom adapter

func (*Refine) WithDemos added in v0.7.0

func (r *Refine) WithDemos(demos []core.Example) *Refine

WithDemos sets few-shot examples for in-context learning

func (*Refine) WithHistory added in v0.7.0

func (r *Refine) WithHistory(history *core.History) *Refine

WithHistory sets conversation history for multi-turn interactions

func (*Refine) WithHistoryTracking added in v0.7.0

func (r *Refine) WithHistoryTracking(enabled bool) *Refine

WithHistoryTracking enables automatic history updates during Forward.

When enabled, Refine will append user prompts and assistant responses to History. Default is false (read-only) to avoid conflicts with caller-managed history.

func (*Refine) WithMaxIterations

func (r *Refine) WithMaxIterations(max int) *Refine

WithMaxIterations sets the maximum number of refinement iterations

func (*Refine) WithOptions

func (r *Refine) WithOptions(options *core.GenerateOptions) *Refine

WithOptions sets custom generation options

func (*Refine) WithRefinementField

func (r *Refine) WithRefinementField(field string) *Refine

WithRefinementField sets the field name for refinement feedback

type ScoringFunction

type ScoringFunction func(inputs map[string]any, prediction *core.Prediction) (float64, error)

ScoringFunction evaluates the quality of a prediction

func ConfidenceScorer

func ConfidenceScorer(field string) ScoringFunction

ConfidenceScorer returns a scorer based on a confidence field

func DefaultScorer

func DefaultScorer() ScoringFunction

DefaultScorer returns a simple length-based scorer This is a basic scorer that prefers longer outputs

type StreamResult

type StreamResult struct {
	Chunks     <-chan core.Chunk       // Channel for receiving streaming chunks
	Prediction <-chan *core.Prediction // Channel for receiving final prediction (sent after stream completes)
	Errors     <-chan error            // Channel for receiving errors
}

StreamResult represents the result of a streaming prediction

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL