niro

package module
v0.3.0-alpha
Published: Mar 1, 2026 License: MIT Imports: 20 Imported by: 0

README

Niro

Streaming-first LLM runtime for Go.

Niro is a high-performance, streaming-native runtime for building real-time AI systems in Go. Voice agents, telephony pipelines, streaming chat, tool calling, multimodal, parallel orchestration — with millisecond-level control over every frame of data.

This is not LangChain for Go. There are no chains, no prompt templates, no document loaders, no vector store abstractions. Niro is a runtime for continuous intelligence — closer to net/http than a notebook framework.

Why Niro

Most LLM frameworks treat streaming as an afterthought. Niro inverts this: streaming is the primitive. Concurrency is a first-class design goal, not an add-on.

|               | LangChain-style    | Niro                           |
|---------------|--------------------|--------------------------------|
| Primary model | Request/Response   | Streaming                      |
| Data unit     | String / Document  | Frame (multimodal)             |
| Composition   | Chain of calls     | Pipeline of streams            |
| Concurrency   | None               | Fan / Race / Sequence built-in |
| Backpressure  | None               | Bounded channels               |
| Telemetry     | Plugin             | Hook interface (core)          |
| Providers     | HTTP wrappers      | Official SDK-backed            |
| Target        | Notebooks          | Production systems             |

Design Principles
  1. Streaming-first — not streaming-compatible
  2. Concurrency as core strength — Fan, Race, Sequence out of the box
  3. SDK-backed providers — OpenAI, Anthropic, Google, Bedrock via official SDKs
  4. Observable by default — Hook interface for telemetry at every stage
  5. Minimal abstractions — maximum control
  6. Zero magic — no reflection, no hidden state, no globals
  7. Low allocations — tagged-union Frame, value types on the hot path
  8. Go idiomatic — context.Context, interfaces, channels

Quick Start

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/alexedtionweb/niro-stream"
    "github.com/alexedtionweb/niro-stream/provider/openai"
)

func main() {
    ctx := context.Background()
    llm := openai.New(os.Getenv("OPENAI_API_KEY"))

    stream, err := llm.Generate(ctx, &niro.Request{
        Model:        "gpt-4o",
        SystemPrompt: "You are a helpful assistant. Be concise.",
        Messages:     []niro.Message{niro.UserText("Explain Go channels in 3 sentences.")},
        Options:      niro.Options{MaxTokens: 256, Temperature: niro.Temp(0.7)},
    })
    if err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
    }

    for stream.Next(ctx) {
        fmt.Print(stream.Frame().Text)
    }
    fmt.Println()

    // Usage is accumulated automatically from the stream
    usage := stream.Usage()
    fmt.Fprintf(os.Stderr, "tokens: in=%d out=%d\n", usage.InputTokens, usage.OutputTokens)
}

Tokens arrive as they're generated. Usage is tracked silently. No buffering. No callbacks. Just a stream.

Providers

Niro uses a plugin model: the core (github.com/alexedtionweb/niro-stream) has zero external dependencies. Each SDK-backed provider lives in its own Go module — you only go get what you use. No SDK you don't need ever enters your build graph.

| Provider | Module | Install | SDK |
|---|---|---|---|
| OpenAI | github.com/alexedtionweb/niro-stream/provider/openai | go get github.com/alexedtionweb/niro-stream/provider/openai | openai/openai-go |
| Anthropic | github.com/alexedtionweb/niro-stream/provider/anthropic | go get github.com/alexedtionweb/niro-stream/provider/anthropic | anthropics/anthropic-sdk-go |
| Google Gemini | github.com/alexedtionweb/niro-stream/provider/google | go get github.com/alexedtionweb/niro-stream/provider/google | googleapis/go-genai |
| Google Speech TTS/STT | github.com/alexedtionweb/niro-stream/provider/googlespeech | go get github.com/alexedtionweb/niro-stream/provider/googlespeech | cloud.google.com/go/texttospeech, cloud.google.com/go/speech |
| AWS Bedrock | github.com/alexedtionweb/niro-stream/provider/bedrock | go get github.com/alexedtionweb/niro-stream/provider/bedrock | aws-sdk-go-v2 |
| OpenAI-compatible | github.com/alexedtionweb/niro-stream/provider/compat | included in core (zero deps) | stdlib HTTP + SSE |
| Agent plugin | github.com/alexedtionweb/niro-stream/plugin/agent | go get github.com/alexedtionweb/niro-stream/plugin/agent | optional component-based agent runtime |

// OpenAI — go get github.com/alexedtionweb/niro-stream/provider/openai
llm := openai.New(os.Getenv("OPENAI_API_KEY"))

// Anthropic — go get github.com/alexedtionweb/niro-stream/provider/anthropic
llm := anthropic.New(os.Getenv("ANTHROPIC_API_KEY"))

// Google Gemini — go get github.com/alexedtionweb/niro-stream/provider/google
llm := google.New(ctx, os.Getenv("GOOGLE_API_KEY"))

// AWS Bedrock — go get github.com/alexedtionweb/niro-stream/provider/bedrock
llm := bedrock.New(cfg) // from aws-sdk-go-v2 config

// Any OpenAI-compatible endpoint (Ollama, vLLM, LiteLLM, etc.)
// Included in core — no extra install needed
llm := compat.New("http://localhost:11434/v1", "")

All providers implement the same niro.Provider interface. Swap providers by changing one line.

Custom Providers
mock := niro.ProviderFunc(func(ctx context.Context, req *niro.Request) (*niro.Stream, error) {
    s, e := niro.NewStream(0)
    go func() {
        defer e.Close()
        e.Emit(ctx, niro.TextFrame("hello from mock"))
    }()
    return s, nil
})
SDK Extensibility

Every SDK provider exposes its underlying client and a RequestHook for raw SDK parameter access. This gives you full control without forking the provider.

Expose underlying client:

llm := openai.New(apiKey)
client := llm.Client() // returns openai-go's Client for direct API calls

Per-provider RequestHook — modify raw SDK params before each request:

// Provider-level hook (applied to every request)
llm := openai.New(apiKey, openai.WithRequestHook(func(p *oai.ChatCompletionNewParams) {
    p.StreamOptions = oai.F(oai.ChatCompletionStreamOptionsParam{IncludeUsage: oai.Bool(true)})
}))

// Per-request hook (via Request.Extra)
stream, err := llm.Generate(ctx, &niro.Request{
    Messages: msgs,
    Extra: openai.RequestHook(func(p *oai.ChatCompletionNewParams) {
        p.LogProbs = oai.Bool(true)
    }),
})

Each provider defines its own RequestHook type with the appropriate SDK parameter struct. See the provider packages for details.

For auth/custom transport customization per SDK:

  • OpenAI / Anthropic: WithRequestOption(...)
  • Google: WithClientOption(...)
  • Bedrock: pass AWS aws.Config (credentials/region/retries) into bedrock.New(cfg, ...)

Core Concepts

Frame

The universal unit of data. A Frame is a tagged union — a single struct with a Kind discriminator. Zero allocations on the text hot path.

niro.TextFrame("Hello")                                           // text token
niro.AudioFrame(pcmChunk, "audio/pcm")                           // audio
niro.ImageFrame(pngBytes, "image/png")                            // image
niro.ToolCallFrame(&niro.ToolCall{ID: "1", Name: "fn", Args: j})  // tool call
niro.UsageFrame(&niro.Usage{InputTokens: 10, OutputTokens: 50})   // usage report
niro.CustomFrame(&niro.ExperimentalFrame{
    Type: "reasoning_summary",
    Data: "condensed reasoning output",
}) // provider-specific extension

For reasoning accounting, use stable usage-detail keys:

usage.Detail[niro.UsageReasoningTokens] = 128
usage.Detail[niro.UsageReasoningCost] = 42 // provider-defined units
Stream & Emitter

A Stream is a backpressure-aware, cancellable sequence of Frames. An Emitter is the write side.

stream, emitter := niro.NewStream(16) // buffered channel

go func() {
    defer emitter.Close()
    emitter.Emit(ctx, niro.TextFrame("hello"))
    emitter.Emit(ctx, niro.TextFrame(" world"))
}()

for stream.Next(ctx) {
    fmt.Print(stream.Frame().Text)
}

Usage auto-accumulation: KindUsage frames are consumed silently by stream.Next() and accumulated in stream.Usage(). Providers emit them; your application reads the totals after streaming.

ResponseMeta: Providers set model name, finish reason, and response ID via Emitter.SetResponse(). Access it after streaming with stream.Response().

Token Budget

Token budget is controlled via Request.Options.MaxTokens (output cap) and tracked via stream.Usage():

req := &niro.Request{
    Messages: []niro.Message{niro.UserText("Summarize this document.")},
    Options:  niro.Options{MaxTokens: 512},
}

stream, _ := llm.Generate(ctx, req)
_, _ = niro.CollectText(ctx, stream)
usage := stream.Usage()
fmt.Printf("budget out=%d, actual out=%d\n", req.Options.MaxTokens, usage.OutputTokens)

Recommended production pattern:

  • Set MaxTokens per route/use case (chat, tools, summaries).
  • Enforce upstream request-size limits before provider call.
  • Use Usage.TotalTokens and Usage.Detail for per-tenant policy/cost accounting.
Provider Prompt Cache (Input-Side)

Niro supports provider-agnostic prompt cache intent via Options.Cache. This is an input optimization only: it does not replay output and does not change stream ordering.

req := &niro.Request{
    Client:   "tenant-a", // required for tenant-safe deterministic cache keys
    Model:    "gpt-4o",
    Messages: []niro.Message{niro.UserText("Summarize this policy doc")},
    Options: niro.Options{
        Cache: &niro.CacheOptions{
            Mode: niro.CachePrefer, // Auto | Prefer | Require | Bypass
            TTL:  10 * time.Minute, // hint only
        },
    },
}

stream, _ := rt.Generate(ctx, req)
_, _ = niro.CollectText(ctx, stream)
u := stream.Usage()
fmt.Printf("attempted=%d hit=%d cached_in=%d\n",
    u.Detail[niro.UsageCacheAttempted],
    u.Detail[niro.UsageCacheHit],
    u.Detail[niro.UsageCachedInputTokens],
)

Canonical cache metrics in Usage.Detail:

  • cache_attempted (0/1)
  • cache_hit (0/1)
  • cache_write (0/1)
  • cached_input_tokens (int)
  • cache_latency_saved_ms (int)

Runtime behavior:

  • Deterministic key (when CacheOptions.Key is empty): SHA256(tenant + ":" + model + ":" + normalized_prefix), stored as tenant:<hex>.
  • CacheOptions.Key is always tenant-prefixed by the runtime (tenant:user_key) to prevent cross-tenant reuse.
  • CacheRequire fails when the provider cannot honor requested cache semantics or when the provider explicitly reports a cache miss in require mode.
  • CacheBypass skips cache hint/context wiring entirely (fast path remains unchanged).

Advanced hooks:

  • runtime.WithPrefixNormalizer(...) lets you plug a custom canonicalizer for deterministic key derivation.
  • runtime.WithCacheEngine(...) provides local prefix lookup/store hooks (used by adapters like Gemini cached-content IDs).
Experimental reasoning

Options.ExperimentalReasoning is an opt-in flag for provider-specific reasoning extensions (for example, KindCustom summaries/traces). Providers that do not support it should return an explicit error instead of silently ignoring the option.

Processor & Pipeline

Transform streams with composable stages:

pipeline := niro.Pipe(
    niro.TextOnly(),
    niro.Map(func(f niro.Frame) niro.Frame {
        f.Text = strings.ToUpper(f.Text)
        return f
    }),
    niro.Tap(func(f niro.Frame) { log.Printf("token: %q", f.Text) }),
).WithBuffer(32)

out := pipeline.Run(ctx, inputStream)

Built-in: Map, Filter, Tap, TextOnly, PassThrough, Accumulate. Or implement Processor directly.

Each stage runs in its own goroutine, connected by bounded channels. Backpressure propagates naturally.
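The backpressure mechanism is ordinary Go: a bounded channel between two goroutines. A self-contained sketch (not niro code) of the same shape — once the buffer fills, the producing stage blocks until the slow consumer catches up:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Two stages connected by a small bounded channel (buffer 2),
	// the same shape a Pipeline uses between Processors.
	ch := make(chan int, 2)

	go func() { // producing stage
		defer close(ch)
		for i := 0; i < 5; i++ {
			ch <- i // blocks once the buffer is full — backpressure
			fmt.Println("produced", i)
		}
	}()

	for v := range ch { // slow consuming stage
		time.Sleep(20 * time.Millisecond)
		fmt.Println("consumed", v)
	}
}
```

No queue grows without bound: the producer's send rate is paced by the consumer, which is why memory stays flat under load.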

Orchestration

The core differentiator: concurrent LLM workflow primitives.

Fan — Parallel Merge

Run N generations concurrently, merge all frames into one stream:

stream := niro.Fan(ctx,
    func(ctx context.Context) (*niro.Stream, error) {
        return llm.Generate(ctx, &niro.Request{Messages: []niro.Message{niro.UserText("What is Go?")}})
    },
    func(ctx context.Context) (*niro.Stream, error) {
        return llm.Generate(ctx, &niro.Request{Messages: []niro.Message{niro.UserText("What is Rust?")}})
    },
)

Use cases: parallel tool calls, multi-model ensembles, scatter-gather.

Race — First Wins

Send the same request to multiple providers; take the fastest response:

text, usage, err := niro.Race(ctx,
    func(ctx context.Context) (*niro.Stream, error) {
        return openaiLLM.Generate(ctx, req)
    },
    func(ctx context.Context) (*niro.Stream, error) {
        return anthropicLLM.Generate(ctx, req)
    },
)

Losers are canceled immediately. Use for latency hedging and speculative execution.

Sequence — Chained Generations

Each step receives the text output of the previous:

stream, err := niro.Sequence(ctx,
    func(ctx context.Context, _ string) (*niro.Stream, error) {
        return llm.Generate(ctx, &niro.Request{Messages: []niro.Message{niro.UserText("Write a haiku about Go")}})
    },
    func(ctx context.Context, haiku string) (*niro.Stream, error) {
        return llm.Generate(ctx, &niro.Request{Messages: []niro.Message{niro.UserText("Critique this: " + haiku)}})
    },
)

Build multi-step refinement pipelines with zero boilerplate.

Tool Calling

Tool calls are first-class streaming citizens:

stream, _ := llm.Generate(ctx, &niro.Request{
    Messages: messages,
    Tools: []niro.Tool{{
        Name:        "get_weather",
        Description: "Get current weather",
        Parameters:  json.RawMessage(`{"type":"object","properties":{"city":{"type":"string"}}}`),
    }},
})

for stream.Next(ctx) {
    f := stream.Frame()
    switch f.Kind {
    case niro.KindText:
        fmt.Print(f.Text)
    case niro.KindToolCall:
        result := executeTool(f.Tool)
        messages = append(messages, niro.ToolMessage(f.Tool.ID, result))
    }
}

See examples/tools/main.go for a complete tool-calling loop.
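The loop's control flow — generate, execute any requested tools, feed results back, regenerate until the model emits plain text or a round budget is hit — can be sketched with local stand-ins (these types are illustrative, not niro's API):

```go
package main

import "fmt"

// turn is a local stand-in: a generation either requests a tool or yields text.
type turn struct {
	toolName string // non-empty => the model requested a tool call
	text     string // final answer when toolName is empty
}

// runToolLoop re-invokes generate with accumulated tool results until the
// model answers in plain text or maxRounds is exhausted.
func runToolLoop(generate func(history []string) turn, execute func(name string) string, maxRounds int) string {
	var history []string
	for round := 0; round < maxRounds; round++ {
		t := generate(history)
		if t.toolName == "" {
			return t.text // model answered — loop ends
		}
		// Append the tool result and go around again.
		history = append(history, execute(t.toolName))
	}
	return "" // round budget exhausted without a final answer
}

func main() {
	calls := 0
	gen := func(history []string) turn {
		if calls++; calls == 1 {
			return turn{toolName: "get_weather"} // first turn: ask for a tool
		}
		return turn{text: "Sunny, from " + history[0]} // second turn: answer
	}
	exec := func(name string) string { return name + "-result" }
	fmt.Println(runToolLoop(gen, exec, 5))
}
```

This is the same round-trip that `niro.NewToolLoop` (below) manages for you, including the maximum-rounds cutoff.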

Structured Output (JSON Schema → Typed)

Use JSON Schema to constrain model output and decode it into a typed struct.

Final typed output
type Weather struct {
    City  string `json:"city"`
    TempF int    `json:"temp_f"`
}

schema := json.RawMessage(`{"type":"object","properties":{"city":{"type":"string"},"temp_f":{"type":"integer"}},"required":["city","temp_f"]}`)

result, resp, usage, err := niro.GenerateStructured[Weather](ctx, llm, &niro.Request{
    Messages: []niro.Message{niro.UserText("Weather in NYC?")},
}, schema)
Streaming partial + final output
ss, err := niro.StreamStructured[Weather](ctx, llm, &niro.Request{
    Messages: []niro.Message{niro.UserText("Weather in NYC?")},
}, schema)
if err != nil { /* handle */ }

for ss.Next(ctx) {
    ev := ss.Event()
    if ev.Partial != nil {
        // partial valid JSON (may update as stream progresses)
    }
    if ev.Final != nil {
        // final typed output
    }
}
if err := ss.Err(); err != nil {
    // handle error
}

Hooks — Telemetry & Observability

Every generation is observable through the Hook interface:

type Hook interface {
    OnGenerateStart(ctx context.Context, info GenerateStartInfo) context.Context
    OnGenerateEnd(ctx context.Context, info GenerateEndInfo)
    OnFrame(ctx context.Context, f Frame) error
    OnToolCall(ctx context.Context, call ToolCall)
    OnToolResult(ctx context.Context, result ToolResult, elapsed time.Duration)
    OnError(ctx context.Context, err error)
}

Implement for Langfuse, Datadog, OpenTelemetry, or custom logging. Embed niro.NoOpHook to implement only the methods you care about. Compose multiple hooks with niro.Hooks(h1, h2, h3).

Wire it via Runtime:

rt := niro.NewRuntime(llm).
    WithHook(myHook).
    WithPipeline(myPipeline)

stream, err := rt.Generate(ctx, req)

Error Handling & Validation

Niro provides semantic error types and request validation for robust error handling.

Request Validation

Validate requests before invoking a provider:

req := &niro.Request{
    Model: "gpt-4o",
    Messages: []niro.Message{niro.UserText("hello")},
    ResponseFormat: "json_schema",
    ResponseSchema: schema,
    Options: niro.Options{Temperature: niro.Temp(0.7)},
}

if err := req.Validate(); err != nil {
    fmt.Printf("validation error: %v (code: %d)\n", err.Message, err.Code)
}

Checks: non-empty messages, valid ResponseFormat, ResponseSchema validity (if json_schema), parameter ranges (Temperature ∈ [0, 2.0], TopP ∈ [0, 1.0], etc.), tool definitions, and more.

Error Types & Semantic Handling

Errors are typed for proper handling:

// Check error category
if niro.IsRetryable(err) {
    // Safe to retry
}
if niro.IsRateLimited(err) {
    // Rate limit — use backoff
}
if niro.IsAuthError(err) {
    // Invalid credentials — don't retry
}
if niro.IsTimeout(err) {
    // Timeout — may retry with longer deadline
}

// Error chaining
err := niro.WrapError(niro.ErrCodeProviderError, "OpenAI failed", underlying)
err.WithProvider("openai").WithRequestID("req_123")

Error codes: InvalidRequest (400), AuthenticationFailed (401), ModelNotFound (404), RateLimited (429), ProviderError (500), ServiceUnavailable (503), Timeout (504), and Niro-specific codes.

Retry & Backoff

Automatic retry with exponential backoff for transient failures:

config := niro.RetryConfig{
    MaxAttempts: 5,
    Backoff: niro.ExponentialBackoff{
        InitialDelay: 100 * time.Millisecond,
        Multiplier:   2.0,
        MaxDelay:     10 * time.Second,
        Jitter:       true, // avoid thundering herd
    },
    ShouldRetry: niro.IsRetryable, // only retry transient errors
    OnRetry: func(attempt int, err error) {
        log.Printf("Retry %d: %v", attempt, err)
    },
}

provider := niro.NewRetryProvider(llm, config)
stream, err := provider.Generate(ctx, req)

Works with context cancellation and respects deadlines. Only retries errors marked as retryable (429, 503, 504, stream errors).

Timeouts & Tracing

Timeouts

Enforce generation timeouts:

provider := niro.NewTimeoutProvider(llm, 5*time.Minute)
ctx, cancel := niro.WithGenerationTimeout(context.Background(), 5*time.Minute)
defer cancel()

stream, err := provider.Generate(ctx, req)
Request Tracing

Automatic request ID generation and propagation:

// Generate unique request ID
requestID := niro.GenerateRequestID() // "req_<random>"

// Inject trace context
trace := niro.TraceContext{
    RequestID: requestID,
    UserID:    "user123",
    SessionID: "session456",
}
ctx = niro.WithTraceContext(ctx, trace)

// Use TracingProvider to auto-inject trace context
provider := niro.NewTracingProvider(llm)
stream, err := provider.Generate(ctx, req)

// Retrieve in hooks for logging
trace := niro.GetTraceContext(ctx)
fmt.Printf("Request: %s (user: %s)", trace.RequestID, trace.UserID)

Usage Metrics

Niro emits raw usage values (InputTokens, OutputTokens, TotalTokens, and Usage.Detail) for billing and policy engines:

hook := &MyHook{
    onEnd: func(ctx context.Context, info niro.GenerateEndInfo) {
        usage := info.Usage
        // send raw usage to your billing/finance service
        log.Printf("model=%s in=%d out=%d total=%d",
            info.Model, usage.InputTokens, usage.OutputTokens, usage.TotalTokens)
    },
}

Price calculation is intentionally out of core runtime scope.
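A downstream billing service can price the raw counts itself. A minimal sketch — the per-million-token rates below are hypothetical placeholders, not real provider pricing:

```go
package main

import "fmt"

// rate holds hypothetical USD prices per 1M tokens — placeholders only.
type rate struct {
	inPerM, outPerM float64
}

// cost converts raw token counts (as reported in Usage) into a dollar amount.
func cost(r rate, inTok, outTok int64) float64 {
	return float64(inTok)/1e6*r.inPerM + float64(outTok)/1e6*r.outPerM
}

func main() {
	r := rate{inPerM: 2.50, outPerM: 10.00} // placeholder numbers
	// e.g. usage.InputTokens=1200, usage.OutputTokens=350 from the hook above
	fmt.Printf("$%.6f\n", cost(r, 1200, 350))
}
```

Keeping this outside the runtime means price tables can change without touching streaming code.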

Tool Execution

Automatic tool calling with loop management:

executor := niro.ToolExecutorFunc(func(ctx context.Context, name string, args json.RawMessage) (string, error) {
    switch name {
    case "weather":
        return getWeather(args)
    case "calculator":
        return calculate(args)
    default:
        return "", fmt.Errorf("unknown tool %q", name)
    }
})

loop := niro.NewToolLoop(executor, 5) // max 5 rounds
stream, err := loop.GenerateWithTools(ctx, llm, &niro.Request{
    Messages: []niro.Message{niro.UserText("What's the weather and 2+2?")},
    Tools:    []niro.Tool{ /* ... */ },
})

Or use a wrapping provider:

provider := niro.NewStreamWithToolHandling(llm, executor, 5)
stream, err := provider.Generate(ctx, req)
// Tool calls handled automatically
Smart Tooling Abstraction (Toolset)

For Genkit-style tool definition + validation + hooks, use Toolset:

type sumArgs struct {
    A int `json:"a"`
    B int `json:"b"`
}

toolset := niro.NewToolset()

sumTool, err := niro.NewToolDefinitionAny(
    "sum",
    "Add two integers",
    map[string]any{
        "type": "object",
        "properties": map[string]any{
            "a": map[string]any{"type": "integer"},
            "b": map[string]any{"type": "integer"},
        },
        "required": []string{"a", "b"},
    },
    func(ctx context.Context, raw json.RawMessage) (any, error) {
        var in sumArgs
        if err := niro.JSONUnmarshal(raw, &in); err != nil {
            return nil, err
        }
        return map[string]int{"sum": in.A + in.B}, nil
    },
)
if err != nil { /* handle */ }

toolset.MustRegister(sumTool)

provider := niro.NewToolingProvider(
    llm,
    toolset,
    niro.DefaultToolStreamOptions(),
)

stream, err := provider.Generate(ctx, &niro.Request{
    Messages: []niro.Message{niro.UserText("What is 20+22?")},
})

What this adds automatically:

  • Tool schema validation for call arguments
  • Tool execution lifecycle hooks (OnToolValidate, OnToolExecuteStart, OnToolExecuteEnd)
  • Tool definitions mapped to provider-native Request.Tools
  • Tool results fed back into subsequent turns inside the loop

Production Composition

Combine multiple wrappers for a production-ready provider:

retry := niro.DefaultRetryConfig()
provider := niro.ComposedProvider(
    baseProvider,
    5*time.Minute, // timeout
    &retry,        // retry
)
// Adds tracing, timeout, and retry all at once

Multimodal

Messages carry mixed content — text, images, audio, URLs:

msg := niro.Multi(niro.RoleUser,
    niro.TextPart("What's in this image?"),
    niro.ImagePart(pngBytes, "image/png"),
    niro.ImageURLPart("https://example.com/photo.jpg", "image/jpeg"),
)

Streams carry interleaved text, tool calls, usage, and control signals. No separate APIs for different modalities.

Performance

  • Zero-dependency core: github.com/alexedtionweb/niro-stream has no external imports — only the Go stdlib.
  • Frame: Tagged union (~80B value type). Text tokens: zero allocations beyond the string header.
  • Stream: chan Frame with sync/atomic error propagation. No mutexes on the read path.
  • Pipeline: One goroutine per stage, bounded channels for backpressure.
  • Providers: Separate Go modules — your build only includes SDKs you use.
  • Target: First token in <100ms over the full pipeline (network permitting).

Run benchmarks yourself:

go test -bench=. -benchmem ./...

Production Infrastructure

Niro ships with production-grade infrastructure for high-concurrency deployments (millions of concurrent calls).

BytePool — Zero-Alloc Media

BytePool eliminates per-frame []byte allocations for audio, image, and video data using size-class sync.Pool buckets (4KB, 64KB, 1MB).

pool := niro.DefaultBytePool // process-wide pool

// Provider emits pooled frames:
frame := niro.AudioFramePooled(pool, pcmChunk, "audio/pcm")

// Consumer returns buffer when done:
pool.Put(frame.Data)

Benchmark: ~60ns per Get/Put cycle, 1 alloc (pointer indirection). Scales linearly under parallel load.

Transport — Connection Pooling & Keep-Alive

Transport() returns an optimized *http.Transport tuned for LLM API traffic: aggressive keep-alive, TLS session resumption, large idle pool, HTTP/2 negotiation.

// Use the process-wide default (recommended):
client := niro.DefaultHTTPClient

// Or create with custom options:
client := niro.HTTPClient(&niro.TransportOptions{
    MaxIdleConnsPerHost: 50,
    IdleConnTimeout:     5 * time.Minute,
})

// Pass to any provider:
llm := compat.New(url, key, compat.WithClient(client))

Defaults: GOMAXPROCS×64 idle connections, GOMAXPROCS×16 per host, 120s idle timeout, TLS 1.2+, 64KB write / 32KB read buffers.

Cache — LRU Response Cache

Thread-safe, sharded (64 shards) LRU cache with TTL for caching identical LLM requests.

cache := niro.NewCache(niro.CacheOptions{
    MaxEntries: 10_000,
    TTL:        5 * time.Minute,
})
provider := cache.Wrap(llm) // transparent caching provider

stream, _ := provider.Generate(ctx, req) // first call: miss → provider
stream, _ = provider.Generate(ctx, req)  // same request: hit → cached replay

Benchmark: ~1.6μs per cache hit. Lock-free reads via atomic counters. cache.Stats() returns hit/miss/rate.

Registry — Named Provider Routing

Registry manages named providers for runtime lookup, multi-provider deployments, and A/B routing.

reg := niro.NewRegistry()
reg.Register("openai", openaiProvider)
reg.Register("anthropic", anthropicProvider)
reg.Register("fast", cache.Wrap(openaiProvider))

// Route by name at request time:
stream, err := reg.Generate(ctx, "openai", req)

// List available providers:
names := reg.Names()  // ["anthropic", "fast", "openai"]

Benchmark: 0 allocs, ~34ns per lookup. RWMutex-protected, safe for concurrent registration and lookup.

Multi-Tenancy — Runtime Client Selection

Use MultiTenantProvider to select provider/client at request time.

reg := niro.NewRegistry()
reg.Register("tenant-a-openai", openaiA)
reg.Register("tenant-b-openai", openaiB)
reg.Register("tenant-c-bedrock", bedrockC)

router := niro.NewMultiTenantProvider(
    reg,
    niro.WithDefaultClient("tenant-a-openai"),
)

// Per-request selection
stream, err := router.Generate(ctx, &niro.Request{
    Client:   "tenant-c-bedrock",
    Messages: []niro.Message{niro.UserText("hello")},
})

You can also set the client in context:

ctx = niro.WithClient(ctx, "tenant-b-openai")
stream, err := router.Generate(ctx, &niro.Request{Messages: msgs})

Per-client customization is supported via mutators:

router := niro.NewMultiTenantProvider(reg,
    niro.WithClientMutator("tenant-c-bedrock", func(ctx context.Context, req *niro.Request) error {
        req.Extra = bedrock.Extras{
            InferenceProfile: "arn:aws:bedrock:us-west-2:123456789012:inference-profile/my-profile",
        }
        return nil
    }),
)
AWS Bedrock Inference Profiles

Bedrock supports default and per-request inference profile targeting.

llm := bedrock.New(cfg,
    bedrock.WithInferenceProfile("arn:aws:bedrock:us-west-2:123456789012:inference-profile/team-prod"),
)

// Override per request:
stream, err := llm.Generate(ctx, &niro.Request{
    Messages: []niro.Message{niro.UserText("status summary")},
    Extra: bedrock.Extras{
        InferenceProfile: "arn:aws:bedrock:us-west-2:123456789012:inference-profile/team-blue",
        Hook: func(in *bedrockruntime.ConverseStreamInput) {
            // Optional raw SDK customization
        },
    },
})

Agent Plugin (Optional Module)

Core stays agent-agnostic. Agent behavior (agent-to-agent, memory, MCP memory adapters) lives in the optional plugin module.

import "github.com/alexedtionweb/niro-stream/plugin/agent"

mem := agent.NewInMemoryMemory()

rt, err := agent.New(
    llm,
    agent.WithMemory(mem),
    agent.WithComponent(&agent.ToolingComponent{Toolset: toolset}),
)
if err != nil { /* handle */ }

_ = rt.Start(ctx)
defer rt.Close()

out, err := rt.Run(ctx, "session-1", "plan trip to tokyo")
fmt.Println(out.Text)

agent.Runtime also supports peer calls via WithPeer(...) and CallPeer(...) for agent-to-agent workflows.

Migration Notes (Cache)

  • Existing code remains valid: Options.Cache is optional and defaults to disabled.
  • For cache-enabled requests, set Request.Client to enforce tenant-safe key namespacing.
  • CacheRequire fails fast when provider cache capabilities cannot satisfy the requested semantics.
  • Provider adapters map native cache signals to canonical Usage.Detail keys listed above.
Object Pools

sync.Pool-backed pools for hot-path objects:

u := niro.GetUsage()         // 0 allocs, ~21ns
defer niro.PutUsage(u)

m := niro.GetResponseMeta()  // 0 allocs, ~27ns
defer niro.PutResponseMeta(m)
JSON Backend (Configurable)

Niro allows swapping the JSON implementation globally (same compatible set as Fiber):

  • encoding/json (stdlib)
  • github.com/goccy/go-json
  • github.com/bytedance/sonic
  • github.com/segmentio/encoding/json
  • github.com/json-iterator/go
niro.SetJSON(&niro.JSONLibrary{
    Marshal:   json.Marshal,
    Unmarshal: json.Unmarshal,
    Valid:     json.Valid,
    NewEncoder: func(w io.Writer) niro.JSONEncoder {
        return json.NewEncoder(w)
    },
    NewDecoder: func(r io.Reader) niro.JSONDecoder {
        return json.NewDecoder(r)
    },
})

Examples

| Example | Description |
|---|---|
| chat | Streaming chat with provider selection |
| tools | Tool-calling loop with automatic round-trip |
| parallel | Fan, Race, Sequence orchestration |
| pipeline | Processing pipeline with hooks |

Requirements

  • Go 1.23+

Architecture

See ARCHITECTURE.md for the full design document covering Frame internals, Stream lifecycle, provider adapter patterns, orchestration execution model, and Hook integration.

Roadmap

Designed for forward compatibility:

  • Audio streaming pipelines (STT → LLM → TTS)
  • Duplex pipelines (bidirectional streams)
  • Tool execution graphs (automatic dispatch + re-invoke)
  • Realtime agent loops with interruption
  • Provider middleware (retries, rate limiting, fallback chains)
  • WASM edge runtime support

None require breaking changes to the core.

License

MIT

Documentation

Overview

Package niro is a streaming-first LLM runtime for Go.

Niro provides a minimal, composable architecture for building real-time AI systems. It is designed for low-latency, multimodal streaming pipelines — not request/response wrappers.

Core Concepts

  • Frame: Universal unit of data (text tokens, audio, image, video, tool calls)
  • Stream: Backpressure-aware, cancellable sequence of Frames with usage tracking
  • Processor: Composable stream transformer (the building block)
  • Pipeline: Concurrent chain of Processors with automatic lifecycle
  • Provider: LLM backend interface (OpenAI, Anthropic, Google, Bedrock, or custom)
  • Hook: Telemetry / observability interface for tracing every generation

Quick Start

provider := openai.New(os.Getenv("OPENAI_API_KEY"))

stream, err := provider.Generate(ctx, &niro.Request{
    Model: "gpt-4o",
    Messages: []niro.Message{
        niro.UserText("Hello!"),
    },
})
if err != nil {
    log.Fatal(err)
}

for stream.Next(ctx) {
    fmt.Print(stream.Frame().Text)
}
usage := stream.Usage()
fmt.Printf("tokens: %d in, %d out\n", usage.InputTokens, usage.OutputTokens)

Design Principles

Streaming-first, not streaming-compatible. Minimal abstractions, maximum control. Zero magic. Composable pipelines. Backpressure-aware. Low allocations. Go idiomatic. Production-first.

Index

Constants

const (
	// AudioPCM8k is 8 kHz 16-bit mono PCM — telephony grade (PSTN / G.711-compatible).
	AudioPCM8k = "audio/pcm;rate=8000;bits=16;channels=1"

	// AudioPCM16k is 16 kHz 16-bit mono PCM — standard ASR / Nova Sonic input.
	AudioPCM16k = "audio/pcm;rate=16000;bits=16;channels=1"

	// AudioPCM24k is 24 kHz 16-bit mono PCM — Nova Sonic output, OpenAI Realtime.
	AudioPCM24k = "audio/pcm;rate=24000;bits=16;channels=1"

	// AudioPCM44k is 44.1 kHz 16-bit mono PCM — CD quality.
	AudioPCM44k = "audio/pcm;rate=44100;bits=16;channels=1"

	// AudioPCM48k is 48 kHz 16-bit mono PCM — WebRTC / studio quality.
	AudioPCM48k = "audio/pcm;rate=48000;bits=16;channels=1"
)

--- Audio format MIME types ---

These constants encode sample rate, bit depth, and channel count in a single MIME string carried by Frame.Mime / Part.Mime. All formats are raw uncompressed PCM (little-endian signed 16-bit, mono) unless noted.

Use them with AudioFrame and AudioPart so consumers can decode without additional out-of-band configuration.
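Because the parameters travel in the MIME string itself, a consumer can recover sample rate, bit depth, and channel count by parsing it. A minimal sketch of such a parser — an illustration, not a niro API:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsePCMMime extracts rate/bits/channels from a MIME string such as
// "audio/pcm;rate=16000;bits=16;channels=1".
func parsePCMMime(mime string) (rate, bits, channels int) {
	for _, part := range strings.Split(mime, ";")[1:] {
		k, v, ok := strings.Cut(part, "=")
		if !ok {
			continue
		}
		n, _ := strconv.Atoi(v)
		switch k {
		case "rate":
			rate = n
		case "bits":
			bits = n
		case "channels":
			channels = n
		}
	}
	return
}

func main() {
	r, b, c := parsePCMMime("audio/pcm;rate=16000;bits=16;channels=1")
	fmt.Println(r, b, c) // sample rate, bit depth, channel count
}
```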

const (
	// UsageReasoningTokens is the standard key for reasoning token count.
	UsageReasoningTokens = "reasoning_tokens"
	// UsageReasoningCost is the standard key for provider-reported reasoning cost units.
	UsageReasoningCost = "reasoning_cost"
	// UsageCacheAttempted is 0|1 indicating cache was attempted.
	UsageCacheAttempted = "cache_attempted"
	// UsageCacheHit is 0|1 indicating cache hit occurred.
	UsageCacheHit = "cache_hit"
	// UsageCacheWrite is 0|1 indicating cache write occurred.
	UsageCacheWrite = "cache_write"
	// UsageCachedInputTokens is the number of prompt tokens served from cache.
	UsageCachedInputTokens = "cached_input_tokens"
	// UsageCacheLatencySavedMS is integer milliseconds saved by cache.
	UsageCacheLatencySavedMS = "cache_latency_saved_ms"
)
View Source
const (
	// AudioOGGOpus is OGG-encapsulated Opus — ElevenLabs default, web-friendly.
	AudioOGGOpus = "audio/ogg;codecs=opus"

	// AudioMP3 is MPEG Layer 3 — universal playback support.
	AudioMP3 = "audio/mpeg"

	// AudioAAC is AAC in MP4 container — mobile-friendly.
	AudioAAC = "audio/aac"

	// AudioFLAC is lossless FLAC — high-quality archival.
	AudioFLAC = "audio/flac"

	// AudioWAV is RIFF/WAV — uncompressed, widely supported.
	AudioWAV = "audio/wav"

	// AudioPCMU8k is G.711 μ-law at 8kHz (telephony/PSTN).
	AudioPCMU8k = "audio/pcmu;rate=8000"

	// AudioPCMA8k is G.711 A-law at 8kHz (telephony/PSTN).
	AudioPCMA8k = "audio/pcma;rate=8000"
)

── Encoded audio MIME constants ────────────────────────────────────────────

These complement the raw PCM constants in frame.go. Use them for TTS output and STT input when dealing with compressed formats.

Variables

View Source
var (
	ErrClosed             = NewError(ErrCodeStreamClosed, "stream closed")
	ErrNoStructuredOutput = NewError(ErrCodeNoStructuredOutput, "no structured output")
	ErrContextCancelled   = NewError(ErrCodeContextCancelled, "context cancelled")
)
View Source
var DefaultBytePool = pool.DefaultBytePool

DefaultBytePool is the process-wide byte pool. Providers and processors should use this unless they need isolation.

Functions

func AttachCacheContext

func AttachCacheContext(ctx context.Context, hint CacheHint, engine CacheEngine) context.Context

AttachCacheContext stores cache metadata in one context.WithValue call.

func CollectText

func CollectText(ctx context.Context, s *Stream) (string, error)

CollectText reads all text frames and concatenates their content. Uses a byte buffer with pre-allocated capacity.

func DefaultScrubber

func DefaultScrubber(key string, val any) any

DefaultScrubber redacts values whose keys contain substrings associated with authentication material or cardholder data, satisfying PCI-DSS Requirements 3.4 and 10.3. Matching is a case-insensitive substring test on the key.

Redacted key patterns:

authorization, api_key, apikey, password, passwd, secret,
token, credential, auth, bearer, pan, card, cvv, cvc, ssn.

func Forward

func Forward(ctx context.Context, src *Stream, dst *Emitter) error

Forward reads all frames from src and emits them to dst. Useful for connecting streams in custom Processors.

func IsAuthError

func IsAuthError(err error) bool

IsAuthError reports whether an error is an authentication error.

func IsRateLimited

func IsRateLimited(err error) bool

IsRateLimited reports whether an error is a rate limit error.

func IsRetryable

func IsRetryable(err error) bool

IsRetryable reports whether an error is retryable.

func IsTimeout

func IsTimeout(err error) bool

IsTimeout reports whether an error is a timeout error.

func JSONMarshal

func JSONMarshal(v any) ([]byte, error)

JSONMarshal marshals v using the configured JSON library.

func JSONMarshalIndent

func JSONMarshalIndent(v any, prefix, indent string) ([]byte, error)

JSONMarshalIndent marshals v with indentation using the configured library.

func JSONUnmarshal

func JSONUnmarshal(data []byte, v any) error

JSONUnmarshal unmarshals data into v using the configured JSON library.

func JSONValid

func JSONValid(data []byte) bool

JSONValid reports whether data is valid JSON using the configured library.

func LogDebug

func LogDebug(ctx context.Context, msg string, args ...any)

LogDebug emits a DEBUG record. Zero overhead when DEBUG is disabled.

PCI-DSS: DEBUG MUST NOT be enabled in production or PCI-scoped environments.

func LogError

func LogError(ctx context.Context, msg string, args ...any)

LogError emits an ERROR record for failures requiring operator attention.

PCI-DSS Req 10.2.4: authentication failures are always emitted at ERROR.

func LogInfo

func LogInfo(ctx context.Context, msg string, args ...any)

LogInfo emits an INFO record. Safe for production.

func LogWarn

func LogWarn(ctx context.Context, msg string, args ...any)

LogWarn emits a WARN record for transient failures and retries.

PCI-DSS Req 10.2: warn events are audit signals; retain logs for 12 months.

func NewStream

func NewStream(bufSize int) (*Stream, *Emitter)

NewStream creates a paired Stream and Emitter.

bufSize controls the channel buffer size between writer and reader. A bufSize of 0 means fully synchronous (unbuffered) — the writer blocks until the reader consumes each frame. Larger values allow the writer to get ahead, trading memory for throughput.

Typical values:

  • 0: telephony / real-time (minimal latency)
  • 16: general streaming (good default)
  • 64: batch-style processing

func NormalizeCacheOptions

func NormalizeCacheOptions(req *Request, normalizer PrefixNormalizer) (CacheHint, *Error)

NormalizeCacheOptions validates and normalizes cache metadata.

Key ownership is enforced to avoid cross-tenant data leakage: finalKey = Request.Client + ":" + CacheOptions.Key.

func PutResponseMeta

func PutResponseMeta(m *ResponseMeta)

PutResponseMeta returns a ResponseMeta to the pool.

func PutUsage

func PutUsage(u *Usage)

PutUsage returns a Usage to the pool.

func ResetLogger

func ResetLogger()

ResetLogger removes any override installed by SetLogger, restoring live delegation to slog.Default.

func SetCacheUsageDetail

func SetCacheUsageDetail(u *Usage, attempted, hit, write bool, cachedInputTokens int, latencySavedMS int)

SetCacheUsageDetail writes canonical provider-agnostic cache metrics.

func SetJSON

func SetJSON(lib *JSONLibrary)

SetJSON replaces the JSON implementation used by Niro. If lib is nil, the stdlib encoding/json implementation is used. Any nil fields are filled with stdlib defaults.

func SetLogger

func SetLogger(l Logger)

SetLogger replaces the library-wide logger shared by all niro packages. It is safe to call concurrently at any time.

Pass nil to install Discard (suppress all output). Call ResetLogger to restore live slog.Default delegation.

// JSON output at debug level (use only in non-PCI environments):
niro.SetLogger(niro.NewSlogAdapter(slog.New(
    slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}),
)))

// Suppress all output:
niro.SetLogger(niro.Discard())

// Custom backend (zap, zerolog, …):
niro.SetLogger(&myZapAdapter{l: zapLogger})

func SetScrubber

func SetScrubber(s Scrubber)

SetScrubber installs a global Scrubber applied to all niro log helpers. Pass nil to remove the installed scrubber (default: none).

niro.SetScrubber(niro.DefaultScrubber)

func Temp

func Temp(v float64) *float64

Temp returns a *float64 for use in Options.Temperature.

func TopKVal

func TopKVal(v int) *int

TopKVal returns a *int for use in Options.TopK.

func TopPVal

func TopPVal(v float64) *float64

TopPVal returns a *float64 for use in Options.TopP.

func WithCacheEngine

func WithCacheEngine(ctx context.Context, engine CacheEngine) context.Context

WithCacheEngine stores an optional CacheEngine in context.

func WithCacheHint

func WithCacheHint(ctx context.Context, hint CacheHint) context.Context

WithCacheHint stores normalized cache metadata in context.

func WithGenerationTimeout

func WithGenerationTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc)

WithGenerationTimeout returns a context with a generation timeout applied.

Types

type BytePool

type BytePool = pool.BytePool

BytePool is an alias for pool.BytePool. See github.com/alexedtionweb/niro-stream/pool for direct usage without the niro import.

func NewBytePool

func NewBytePool() *BytePool

NewBytePool creates a new BytePool.

type CacheCapabilities

type CacheCapabilities struct {
	SupportsPrefix       bool
	SupportsExplicitKeys bool
	SupportsTTL          bool
	SupportsBypass       bool
}

CacheCapabilities describes what a provider can honor.

func ProviderCacheCaps

func ProviderCacheCaps(p Provider) CacheCapabilities

ProviderCacheCaps returns provider cache capabilities if exposed.

func (CacheCapabilities) SupportsHint

func (c CacheCapabilities) SupportsHint(h CacheHint) bool

SupportsHint returns whether these capabilities can honor a require-level hint.

type CacheCapableProvider

type CacheCapableProvider interface {
	Provider
	CacheCaps() CacheCapabilities
}

CacheCapableProvider optionally exposes provider cache capabilities. This is additive and keeps the Provider interface backward-compatible.

type CacheEngine

type CacheEngine interface {
	ResolvePrefixHash(ctx context.Context, req *Request, scope CacheScope) (key string, ok bool, err error)
	StorePrefix(ctx context.Context, key string, scope CacheScope, ttl time.Duration, meta map[string]string) error
	LookupPrefix(ctx context.Context, key string, scope CacheScope) (meta map[string]string, ok bool, err error)
}

CacheEngine is an optional pluggable local cache extension point. Core runtime works without any engine; providers may consume this via context.

func GetCacheEngine

func GetCacheEngine(ctx context.Context) (CacheEngine, bool)

GetCacheEngine retrieves an optional CacheEngine from context.

type CacheHint

type CacheHint struct {
	Mode       CacheMode
	Scope      CacheScope
	Key        string
	TTL        time.Duration
	PrefixHash string
}

CacheHint is normalized cache metadata attached to request context. It is derived once per Runtime.Generate call and reused for retries.

func GetCacheHint

func GetCacheHint(ctx context.Context) (CacheHint, bool)

GetCacheHint retrieves normalized cache metadata from context.

type CacheMode

type CacheMode uint8

CacheMode defines cache intent semantics.

const (
	// CacheAuto lets the provider choose best-effort cache behavior.
	CacheAuto CacheMode = iota
	// CachePrefer requests cache usage but does not fail if unsupported.
	CachePrefer
	// CacheRequire fails the request if cache semantics cannot be applied.
	CacheRequire
	// CacheBypass explicitly disables cache usage for this request.
	CacheBypass
)

type CacheOptions

type CacheOptions struct {
	Mode  CacheMode
	Key   string        // Optional deterministic key (namespaced by tenant/client)
	TTL   time.Duration // Hint only; providers may ignore
	Scope CacheScope
}

CacheOptions declares cache intent in a provider-agnostic way.

Zero value means best-effort prefix cache with no explicit key/ttl hint.

type CacheScope

type CacheScope uint8

CacheScope defines the logical cache scope.

const (
	// CacheScopePrefix caches reusable prompt prefixes.
	// This is the only stable provider-agnostic scope currently supported.
	CacheScopePrefix CacheScope = iota
)

type DefaultPrefixNormalizer

type DefaultPrefixNormalizer struct{}

DefaultPrefixNormalizer normalizes model + effective messages as JSON.

func (DefaultPrefixNormalizer) NormalizePrefix

func (DefaultPrefixNormalizer) NormalizePrefix(req *Request) ([]byte, error)

func (DefaultPrefixNormalizer) WritePrefixHash

func (DefaultPrefixNormalizer) WritePrefixHash(h hash.Hash, req *Request) error

type DiscardHandler

type DiscardHandler struct{}

DiscardHandler is a slog.Handler that silently discards all records. Use it when you need a silent *slog.Logger for NewSlogAdapter:

niro.SetLogger(niro.NewSlogAdapter(slog.New(niro.DiscardHandler{})))

For most cases, prefer Discard directly.

func (DiscardHandler) Enabled

func (DiscardHandler) Enabled(_ context.Context, _ slog.Level) bool

func (DiscardHandler) Handle

func (DiscardHandler) WithAttrs

func (h DiscardHandler) WithAttrs(_ []slog.Attr) slog.Handler

func (DiscardHandler) WithGroup

func (h DiscardHandler) WithGroup(_ string) slog.Handler

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

Emitter writes Frames into a Stream. It is the write half of a Stream pipe.

Contract: do not call Emit after Close or Error. The Pipeline ensures this automatically for Processors.

func (*Emitter) Close

func (e *Emitter) Close()

Close closes the stream. Safe to call multiple times.

func (*Emitter) Emit

func (e *Emitter) Emit(ctx context.Context, f Frame) error

Emit sends a Frame into the stream. Blocks if the stream buffer is full (backpressure). Returns an error if the context is canceled or the stream is closed.

Emit is safe to call concurrently with Close — a concurrent close returns ErrClosed instead of panicking.

func (*Emitter) Error

func (e *Emitter) Error(err error)

Error sets an error on the stream and closes it. The error is visible to the reader via Stream.Err().

func (*Emitter) SetResponse

func (e *Emitter) SetResponse(meta *ResponseMeta)

SetResponse stores provider metadata on the stream. Call this before Close, typically after all frames are emitted.

type Error

type Error struct {
	Code       ErrorCode
	Message    string
	Err        error  // underlying error for error chaining
	Provider   string // which provider failed (if applicable)
	RequestID  string // trace ID for debugging
	Retryable  bool   // whether the operation can be safely retried
	StatusCode int    // HTTP status code (if applicable)
}

Error represents a detailed error from Niro or a provider.

func NewError

func NewError(code ErrorCode, msg string) *Error

NewError creates a new Error with the given code and message.

func NewErrorf

func NewErrorf(code ErrorCode, format string, args ...interface{}) *Error

NewErrorf creates a new Error with formatted message.

func WrapError

func WrapError(code ErrorCode, msg string, err error) *Error

WrapError wraps an existing error with context.

func WrapErrorf

func WrapErrorf(code ErrorCode, format string, err error, args ...interface{}) *Error

WrapErrorf wraps an error with formatted message.

func (*Error) Error

func (e *Error) Error() string

func (*Error) Is

func (e *Error) Is(target error) bool

Is implements errors.Is for semantic error matching.

func (*Error) LogValue

func (e *Error) LogValue() slog.Value

LogValue implements slog.LogValuer so *Error emits structured attributes when passed to any slog call:

slog.Error("generate failed", "err", err)
// → err.code=429 err.provider=google err.message="Quota exceeded" err.retryable=true

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying error for error chaining.

func (*Error) WithProvider

func (e *Error) WithProvider(provider string) *Error

WithProvider adds provider context to an error.

func (*Error) WithRequestID

func (e *Error) WithRequestID(id string) *Error

WithRequestID adds request/trace ID context to an error.

func (*Error) WithStatusCode

func (e *Error) WithStatusCode(code int) *Error

WithStatusCode adds HTTP status code context to an error.

type ErrorCode

type ErrorCode int

ErrorCode categorizes runtime errors for proper handling.

const (
	// Client errors (4xx)
	ErrCodeInvalidRequest       ErrorCode = 400
	ErrCodeAuthenticationFailed ErrorCode = 401
	ErrCodeModelNotFound        ErrorCode = 404
	ErrCodeInvalidModel         ErrorCode = 422
	ErrCodeInsufficientQuota    ErrorCode = 429

	// Server errors (5xx)
	ErrCodeProviderError      ErrorCode = 500
	ErrCodeServiceUnavailable ErrorCode = 503
	ErrCodeRateLimited        ErrorCode = 509
	ErrCodeTimeout            ErrorCode = 504
	ErrCodeInternalError      ErrorCode = 510

	// Niro-specific errors (6xx)
	ErrCodeStreamClosed       ErrorCode = 600
	ErrCodeNoStructuredOutput ErrorCode = 601
	ErrCodeInvalidSchema      ErrorCode = 602
	ErrCodeContextCancelled   ErrorCode = 603
	ErrCodeStreamError        ErrorCode = 604
)

func ConvertHTTPStatusToCode

func ConvertHTTPStatusToCode(statusCode int) ErrorCode

ConvertHTTPStatusToCode maps HTTP status codes to ErrorCode.

type ExperimentalFrame

type ExperimentalFrame struct {
	Type string
	Data any
}

ExperimentalFrame carries provider-specific data without expanding core kinds. Type is an application/provider-defined discriminator (e.g. "reasoning_summary").

type Frame

type Frame struct {
	Kind   Kind               // Discriminator — always check this first
	Text   string             // Token text (KindText)
	Data   []byte             // Binary payload (KindAudio, KindImage, KindVideo)
	Mime   string             // MIME type for Data (e.g. "audio/pcm", "image/png")
	Tool   *ToolCall          // Tool call request (KindToolCall)
	Result *ToolResult        // Tool call result (KindToolResult)
	Usage  *Usage             // Token usage (KindUsage) — emitted by providers at end of stream
	Custom *ExperimentalFrame // Provider-specific/experimental payload (KindCustom)
	Signal Signal             // Control signal (KindControl)
}

Frame is the fundamental unit of data flowing through a Niro pipeline.

Frame is a tagged union optimized for the common case: text tokens. For text, only Kind and Text are populated — zero allocations beyond the string header. Binary payloads (audio, image, video) use the Data and Mime fields. Tool interactions use Tool and Result.

Frames are passed by value through channels. They are small and most fields are zero for any given Kind.

func AudioFrame

func AudioFrame(data []byte, mime string) Frame

AudioFrame creates a Frame carrying audio data.

func AudioFramePooled

func AudioFramePooled(bp *BytePool, data []byte, mime string) Frame

AudioFramePooled creates an audio Frame using a pooled buffer. The data is copied into a buffer from the pool. The caller should call [BytePool.Put] on Frame.Data after consumption.

func Collect

func Collect(ctx context.Context, s *Stream) ([]Frame, error)

Collect reads all frames from a Stream into a slice. Pre-allocates capacity when possible to minimize slice growth.

func ControlFrame

func ControlFrame(sig Signal) Frame

ControlFrame creates a Frame carrying a control signal.

func CustomFrame

func CustomFrame(c *ExperimentalFrame) Frame

CustomFrame creates a Frame carrying an experimental/provider-specific payload.

func ImageFrame

func ImageFrame(data []byte, mime string) Frame

ImageFrame creates a Frame carrying image data.

func ImageFramePooled

func ImageFramePooled(bp *BytePool, data []byte, mime string) Frame

ImageFramePooled creates an image Frame using a pooled buffer.

func TextFrame

func TextFrame(s string) Frame

TextFrame creates a Frame carrying a text token.

func ToolCallFrame

func ToolCallFrame(call *ToolCall) Frame

ToolCallFrame creates a Frame carrying a tool call request.

func ToolResultFrame

func ToolResultFrame(result *ToolResult) Frame

ToolResultFrame creates a Frame carrying a tool call result.

func UsageFrame

func UsageFrame(u *Usage) Frame

UsageFrame creates a Frame carrying token usage data.

func VideoFrame

func VideoFrame(data []byte, mime string) Frame

VideoFrame creates a Frame carrying video data.

func VideoFramePooled

func VideoFramePooled(bp *BytePool, data []byte, mime string) Frame

VideoFramePooled creates a video Frame using a pooled buffer.

type JSONDecoder

type JSONDecoder interface {
	Decode(v any) error
}

JSONDecoder is the minimal interface required by JSON decoders. Compatible with encoding/json and the same JSON libraries supported by Fiber.

func JSONNewDecoder

func JSONNewDecoder(r io.Reader) JSONDecoder

JSONNewDecoder returns a new decoder using the configured JSON library.

type JSONEncoder

type JSONEncoder interface {
	Encode(v any) error
}

JSONEncoder is the minimal interface required by JSON encoders. Compatible with encoding/json and the same JSON libraries supported by Fiber.

func JSONNewEncoder

func JSONNewEncoder(w io.Writer) JSONEncoder

JSONNewEncoder returns a new encoder using the configured JSON library.

type JSONLibrary

type JSONLibrary struct {
	Marshal       func(v any) ([]byte, error)
	Unmarshal     func(data []byte, v any) error
	Valid         func(data []byte) bool
	NewEncoder    func(w io.Writer) JSONEncoder
	NewDecoder    func(r io.Reader) JSONDecoder
	MarshalIndent func(v any, prefix, indent string) ([]byte, error)
}

JSONLibrary defines the JSON functions used by Niro.

Compatible with the same libraries supported by Fiber:

  • encoding/json (stdlib)
  • github.com/goccy/go-json
  • github.com/bytedance/sonic
  • github.com/segmentio/encoding/json
  • github.com/json-iterator/go

Users can call SetJSON to swap the implementation globally.

func JSON

func JSON() *JSONLibrary

JSON returns the currently active JSON library.

type Kind

type Kind uint8

Kind identifies the type of data a Frame carries.

const (
	KindText       Kind = iota + 1 // Text token (the hot path)
	KindAudio                      // Audio chunk (PCM, opus, etc.)
	KindImage                      // Image data (PNG, JPEG, etc.)
	KindVideo                      // Video frame
	KindToolCall                   // Tool invocation request from LLM
	KindToolResult                 // Tool invocation result
	KindUsage                      // Token usage report
	KindCustom                     // Experimental/provider-specific payload
	KindControl                    // Pipeline control signal
)

func (Kind) String

func (k Kind) String() string

String returns the human-readable name of the Kind.

type Level

type Level int

Level is the severity of a log record.

Values are intentionally identical to slog.Level so adapters convert between the two types with a zero-cost integer cast: slog.Level(niroLevel).

PCI-DSS guidance for each level is documented on each constant below.

const (
	// LevelDebug is for verbose, per-request diagnostics.
	//
	// PCI-DSS Requirement 10: DEBUG MUST NOT be enabled in production or any
	// PCI-scoped environment. Debug output may include request or response
	// payloads that could expose cardholder data or authentication credentials.
	LevelDebug Level = -4

	// LevelInfo records normal operational events (provider selection, model
	// routing). Safe for production.
	LevelInfo Level = 0

	// LevelWarn records transient, recoverable conditions: retries, rate-limit
	// back-off, degraded provider state. Always safe for production.
	//
	// PCI-DSS Requirement 10.2: repeated warn events are audit signals; ensure
	// your log aggregator retains them for the required 12-month period.
	LevelWarn Level = 4

	// LevelError records unrecoverable failures requiring operator attention.
	// Always safe for production.
	//
	// PCI-DSS Requirement 10.2.4: authentication failures MUST be logged at
	// this level or above.
	LevelError Level = 8
)

type Logger

type Logger interface {
	// Enabled reports whether records at level would be processed.
	// MUST be cheap: no allocation, no lock, branch-predictor friendly.
	// niro calls Enabled before constructing expensive args; a false return
	// is a guaranteed zero-cost exit from the log helper.
	Enabled(ctx context.Context, level Level) bool

	// Log emits a structured record. niro guarantees Enabled(ctx,level)==true.
	// args is a flat alternating (string key, any value) list.
	Log(ctx context.Context, level Level, msg string, args ...any)
}

Logger is the minimal structured-logging interface used internally by niro. Any logging backend can be adapted by implementing two methods.

Key-value pairs in args follow the slog alternating-key-value convention (string, any, string, any, …). niro's own call sites use only plain string keys and standard Go values — no slog.Attr — so non-slog adapters require no special-case handling.

Implementations MUST be safe for concurrent use.

PCI-DSS guarantee

niro only ever passes pre-approved, non-sensitive attributes to the logger: error codes, retry counts, durations, and request IDs. Request/response content and credentials are NEVER passed to the logger. Install a Scrubber as defence-in-depth if your application extends niro log call sites.

Adapter recipes

// slog (built-in, zero boilerplate):
niro.SetLogger(niro.NewSlogAdapter(slog.Default()))

// zap:
niro.SetLogger(&zapAdapter{l: zapLogger})
// Enabled: l.Core().Enabled(zapcore.Level(level / 4))
// Log:     l.Sugar().Logw(zapcore.Level(level/4), msg, keysAndValues...)

// zerolog:
niro.SetLogger(&zerologAdapter{l: &zerologLogger})
// Enabled: l.GetLevel() <= zerolog.Level(level/4+1)
// Log:     l.WithLevel(...).Fields(args).Msg(msg)

func Discard

func Discard() Logger

Discard returns a Logger that silently drops all records. Enabled always returns false so callers never build args.

niro.SetLogger(niro.Discard())

func GetLogger

func GetLogger() Logger

GetLogger returns the active library Logger.

Before any SetLogger call (or after ResetLogger) this returns a live adapter over slog.Default: changes applied via slog.SetDefault are automatically visible here with no extra configuration.

func NewSlogAdapter

func NewSlogAdapter(l *slog.Logger) Logger

NewSlogAdapter wraps a *slog.Logger as a niro Logger. The Level↔slog.Level conversion is a zero-cost integer cast (same values). If l is nil, Discard is returned.

type Message

type Message struct {
	Role  Role
	Parts []Part
}

Message represents a single message in a conversation. A message contains one or more Parts, enabling multimodal content: a user message can carry text alongside images, audio, or video.

func AssistantText

func AssistantText(text string) Message

AssistantText creates an assistant text message. Useful for injecting assistant-turn prefills.

func Multi

func Multi(role Role, parts ...Part) Message

Multi creates a multimodal message with multiple parts.

func SystemText

func SystemText(text string) Message

SystemText creates a system message.

func ToolErrorMessage

func ToolErrorMessage(callID, errMsg string) Message

ToolErrorMessage creates a tool error result message.

func ToolMessage

func ToolMessage(callID, content string) Message

ToolMessage creates a tool result message.

func UserText

func UserText(text string) Message

UserText creates a single-part text message from the user.

func (*Message) Validate

func (m *Message) Validate() error

Validate checks if a Message is valid.

type Options

type Options struct {
	MaxTokens        int      // Maximum output tokens
	Temperature      *float64 // Sampling temperature
	TopP             *float64 // Nucleus sampling
	TopK             *int     // Top-K sampling (Anthropic, Google)
	FrequencyPenalty *float64 // Frequency penalty (OpenAI)
	PresencePenalty  *float64 // Presence penalty (OpenAI)
	Stop             []string // Stop sequences
	// Cache configures provider-agnostic input cache intent.
	// Nil means cache is disabled with zero hot-path overhead.
	Cache *CacheOptions
	// ExperimentalReasoning enables provider-specific reasoning extensions.
	// Providers may emit KindCustom frames (summaries/traces) when enabled.
	ExperimentalReasoning bool
}

Options controls LLM generation parameters. Pointer fields distinguish "not set" from zero values.

func (*Options) Validate

func (o *Options) Validate() error

Validate checks if generation Options are valid.

type Part

type Part struct {
	Kind Kind

	// Text content (KindText)
	Text string

	// Binary content (KindAudio, KindImage, KindVideo)
	Data []byte
	Mime string // MIME type for Data

	// URL reference for remote content (alternative to Data).
	// Providers that support URL references will use this directly;
	// others will fetch and inline the data.
	URL string

	// Tool call (KindToolCall) — for assistant messages
	Tool *ToolCall

	// Tool result (KindToolResult) — for tool messages
	Result *ToolResult

	// Provider-specific/experimental payload (KindCustom)
	Custom *ExperimentalFrame
}

Part is a content segment within a Message. Each Part carries exactly one kind of content.

func AudioPart

func AudioPart(data []byte, mime string) Part

AudioPart creates an audio content Part from binary data.

func CustomPart

func CustomPart(custom *ExperimentalFrame) Part

CustomPart creates a Part carrying an experimental/provider-specific payload.

func ImagePart

func ImagePart(data []byte, mime string) Part

ImagePart creates an image content Part from binary data.

func ImageURLPart

func ImageURLPart(url, mime string) Part

ImageURLPart creates an image content Part from a URL.

func TextPart

func TextPart(s string) Part

TextPart creates a text content Part.

func ToolCallPart

func ToolCallPart(call *ToolCall) Part

ToolCallPart creates a tool call Part (for assistant messages).

func ToolResultPart

func ToolResultPart(result *ToolResult) Part

ToolResultPart creates a tool result Part (for tool messages).

func VideoPart

func VideoPart(data []byte, mime string) Part

VideoPart creates a video content Part from binary data.

func (*Part) Validate

func (p *Part) Validate() error

Validate checks if a Part is valid.

type PrefixNormalizer

type PrefixNormalizer interface {
	NormalizePrefix(req *Request) ([]byte, error)
}

PrefixNormalizer creates deterministic bytes for prefix hashing. Implementations should avoid non-deterministic ordering.

type PrefixNormalizerFunc

type PrefixNormalizerFunc func(req *Request) ([]byte, error)

PrefixNormalizerFunc adapts a function to PrefixNormalizer.

func (PrefixNormalizerFunc) NormalizePrefix

func (f PrefixNormalizerFunc) NormalizePrefix(req *Request) ([]byte, error)

type Provider

type Provider interface {
	Generate(ctx context.Context, req *Request) (*Stream, error)
}

Provider generates streaming LLM responses.

This is the primary interface for integrating LLM backends. Implementations must return a Stream that emits Frames as they arrive from the model — not after the full response.

Provider implementations should:

  • Emit KindText frames for each text token delta
  • Emit KindToolCall frames for completed tool calls
  • Emit KindUsage frames with token counts (consumed automatically by Stream)
  • Optionally emit KindCustom frames for provider-specific extensions
  • Set ResponseMeta via Emitter.SetResponse before closing
  • Respect context cancellation

Built-in: provider/openai, provider/anthropic, provider/google, provider/bedrock. Custom: implement this interface or use ProviderFunc.

type ProviderFunc

type ProviderFunc func(ctx context.Context, req *Request) (*Stream, error)

ProviderFunc adapts a plain function to the Provider interface. Useful for ad-hoc providers, testing, and bring-your-own-model.

mock := niro.ProviderFunc(func(ctx context.Context, req *niro.Request) (*niro.Stream, error) {
    s, e := niro.NewStream(0)
    go func() {
        defer e.Close()
        e.Emit(ctx, niro.TextFrame("hello from mock"))
    }()
    return s, nil
})

func (ProviderFunc) Generate

func (f ProviderFunc) Generate(ctx context.Context, req *Request) (*Stream, error)

type RealtimeConfig

type RealtimeConfig struct {
	// Model identifier. If empty, the provider's default model is used.
	//   Nova Sonic: "amazon.nova-sonic-v1:0"
	//   OpenAI:     "gpt-4o-realtime-preview"
	Model string

	// SystemPrompt is sent as the initial SYSTEM instruction.
	SystemPrompt string

	// Voice selects the TTS synthesis voice.
	//   Nova Sonic: "matthew", "tiffany", "amy", "brian"
	//   OpenAI:     "alloy", "echo", "fable", "nova", "onyx", "shimmer"
	Voice string

	// InputFormat is the MIME type of audio sent via Send.
	// Defaults to AudioPCM16k for Nova Sonic, AudioPCM24k for OpenAI.
	// Use the AudioPCM* constants.
	InputFormat string

	// OutputFormat is the requested audio MIME type received via Recv.
	// Defaults to AudioPCM24k (supported by all providers).
	OutputFormat string

	// Tools available for the model to call.
	// Received as KindToolCall frames from Recv.
	Tools []Tool

	// ToolChoice controls how the model selects tools.
	ToolChoice ToolChoice

	// VAD configures server-side Voice Activity Detection.
	// When nil, VAD is not configured; the caller signals turn boundaries
	// manually via Send(ControlFrame(SignalEOT)).
	VAD *VADConfig

	// Options controls generation parameters (temperature, max tokens, etc.)
	Options Options
}

RealtimeConfig configures a bidirectional speech session.

type RealtimeProvider

type RealtimeProvider interface {
	Session(ctx context.Context, cfg RealtimeConfig) (RealtimeSession, error)
}

RealtimeProvider creates long-lived bidirectional speech sessions. Unlike Provider (unidirectional request-response), RealtimeProvider supports continuous audio streaming in both directions simultaneously — essential for voice assistants, telephony agents, and live translation.

Built-in implementations:

  • provider/bedrock [SonicProvider] — Amazon Nova Sonic (16 kHz in, 24 kHz out)
  • provider/realtime Provider — OpenAI Realtime API (24 kHz in/out)

type RealtimeSession

type RealtimeSession interface {
	// Send sends a frame to the model.
	//
	// Supported frame kinds:
	//   - KindAudio      raw PCM audio chunk from the microphone
	//   - KindText       text message (provider-dependent support)
	//   - KindToolResult result of a tool call received via Recv
	//   - KindControl
	//       SignalEOT    end of user turn; model generates a response
	//       SignalAbort  cancel an in-progress model response (barge-in)
	Send(ctx context.Context, f Frame) error

	// Recv returns the read-only output stream from the model.
	// Call once; the stream remains open for the full session lifetime.
	//
	// Frame kinds emitted:
	//   - KindAudio     synthesized speech
	//   - KindText      transcript of the model's speech (when available)
	//   - KindToolCall  tool invocation request; respond with Send(ToolResultFrame)
	//   - KindControl
	//       SignalFlush barge-in detected; clear the audio playback buffer
	//       SignalEOT   model finished speaking this turn
	//   - KindUsage     token/audio usage at end of turn
	Recv() *Stream

	// Close terminates the session and releases all resources.
	// Safe to call multiple times and from any goroutine.
	Close() error

	// Err returns the first error that caused the session to fail.
	// Returns nil if the session closed cleanly.
	Err() error
}

RealtimeSession is a live bidirectional speech session.

Send and Recv operate concurrently. Run the audio-sending loop in a separate goroutine from the receiving loop.

sess, err := provider.Session(ctx, niro.RealtimeConfig{
    SystemPrompt: "You are a helpful voice assistant.",
})
defer sess.Close()

// --- send goroutine ---
go func() {
    for chunk := range mic.Chunks() {
        if err := sess.Send(ctx, niro.AudioFrame(chunk, niro.AudioPCM16k)); err != nil {
            return
        }
    }
    // Signal end of user turn; model will respond.
    sess.Send(ctx, niro.ControlFrame(niro.SignalEOT))
}()

// --- receive loop ---
out := sess.Recv()
for out.Next(ctx) {
    f := out.Frame()
    switch f.Kind {
    case niro.KindAudio:
        speaker.Play(f.Data)         // PCM24k synthesized speech
    case niro.KindText:
        fmt.Print(f.Text)            // transcript (when available)
    case niro.KindToolCall:
        result := executeTool(f.Tool)
        sess.Send(ctx, niro.ToolResultFrame(&niro.ToolResult{
            CallID:  f.Tool.ID,
            Content: result,
        }))
    case niro.KindControl:
        if f.Signal == niro.SignalFlush {
            speaker.ClearBuffer() // barge-in: user interrupted
        }
    }
}

type Request

type Request struct {
	// Client selects a logical provider/client at runtime.
	//
	// This is used by multi-tenant routers (e.g. MultiTenantProvider)
	// to pick the underlying SDK client/provider for this request.
	//
	// Example values: "tenant-a-openai", "enterprise-bedrock-usw2".
	//
	// If empty, router-specific fallbacks apply (context/default client).
	Client string

	// Model identifier (e.g. "gpt-4o", "claude-sonnet-4-5", "gemini-2.0-flash").
	// If empty, the provider's default model is used.
	Model string

	// SystemPrompt is a convenience field for a single system message.
	// Prepended to Messages automatically. If you need multiple system
	// messages or interleaved system turns, use Messages directly.
	SystemPrompt string

	// Messages is the conversation history.
	// Multimodal: messages can contain text, images, audio, video.
	Messages []Message

	// Tools available for the LLM to call.
	Tools []Tool

	// ToolChoice controls how the model selects tools.
	// Default is ToolChoiceAuto.
	ToolChoice ToolChoice

	// ResponseFormat controls the output format.
	// Supported values depend on the provider:
	//   - "" (default): plain text
	//   - "json": JSON output
	//   - "json_schema": structured output (use with ResponseSchema)
	ResponseFormat string

	// ResponseSchema is a JSON Schema for structured output.
	// Only used when ResponseFormat is "json_schema".
	ResponseSchema json.RawMessage

	// Options controls generation parameters.
	Options Options

	// Extra carries provider-specific configuration.
	// Each provider documents its accepted types (typically a RequestHook
	// function that receives the raw SDK params). Providers ignore
	// unrecognized types. Use for per-request SDK customization not
	// covered by the common Options.
	//
	//   stream, err := llm.Generate(ctx, &niro.Request{
	//       Messages: msgs,
	//       Extra: openai.RequestHook(func(p *oai.ChatCompletionNewParams) {
	//           p.LogProbs = oai.Bool(true)
	//       }),
	//   })
	Extra any
}

Request contains everything needed to call an LLM.
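For structured output, ResponseFormat and ResponseSchema are used together. A sketch (the schema is illustrative, and msgs stands in for a prepared conversation):

```go
req := &niro.Request{
    Model:          "gpt-4o",
    SystemPrompt:   "Classify the sentiment of the user message.",
    Messages:       msgs,
    ResponseFormat: "json_schema",
    ResponseSchema: json.RawMessage(`{
        "type": "object",
        "properties": {
            "sentiment": {"type": "string", "enum": ["positive", "negative", "neutral"]}
        },
        "required": ["sentiment"]
    }`),
}
if err := req.Validate(); err != nil {
    // reject before calling Provider.Generate
}
```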

func (*Request) EffectiveMessages

func (r *Request) EffectiveMessages() []Message

EffectiveMessages returns the final message list including any SystemPrompt prepended as a system message.

func (*Request) Validate

func (r *Request) Validate() *Error

Validate checks if the Request is valid and returns a detailed Error if not. This should be called before invoking Provider.Generate.

type ResponseMeta

type ResponseMeta struct {
	// Model actually used (may differ from requested if provider aliases).
	Model string

	// FinishReason indicates why generation stopped.
	// Common values: "stop", "length", "tool_calls", "content_filter".
	FinishReason string

	// ID is the provider-assigned response ID.
	ID string

	// Usage is the token usage for this generation.
	Usage Usage

	// Provider-specific metadata (opaque).
	ProviderMeta map[string]any
}

ResponseMeta carries metadata about a completed generation. Available after the stream is fully consumed via Stream.Response().

func GetResponseMeta

func GetResponseMeta() *ResponseMeta

GetResponseMeta returns a ResponseMeta from the pool, reset to zero values.


type Role

type Role string

Role identifies the sender of a message in a conversation.

const (
	RoleSystem    Role = "system"
	RoleUser      Role = "user"
	RoleAssistant Role = "assistant"
	RoleTool      Role = "tool"
)

type STTFunc

type STTFunc func(ctx context.Context, req *STTRequest) (*Stream, error)

STTFunc adapts a plain function to the STTProvider interface.

func (STTFunc) Transcribe

func (f STTFunc) Transcribe(ctx context.Context, req *STTRequest) (*Stream, error)

type STTProvider

type STTProvider interface {
	Transcribe(ctx context.Context, req *STTRequest) (*Stream, error)
}

STTProvider transcribes audio to text.

The returned Stream emits KindText frames for transcript segments. Interim (partial) results and final results are both KindText; use [STTMeta] in Frame.Extra or provider conventions to distinguish them.

Built-in: provider/elevenlabs, provider/googlespeech. Custom: implement this interface or use STTFunc.
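A stub transcriber for tests can be built from STTFunc and StreamFromSlice. A sketch (pcmBytes stands in for real audio):

```go
stt := niro.STTFunc(func(ctx context.Context, req *niro.STTRequest) (*niro.Stream, error) {
    // Ignore the audio and emit a fixed transcript segment.
    return niro.StreamFromSlice([]niro.Frame{
        niro.TextFrame("hello world"),
    }), nil
})

out, err := stt.Transcribe(ctx, &niro.STTRequest{
    Audio:       pcmBytes,
    InputFormat: niro.AudioPCM16k,
})
```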

type STTRequest

type STTRequest struct {
	// Audio is the input audio data.
	// For single-shot transcription: provide the full audio bytes.
	// For streaming: use AudioStream instead.
	Audio []byte

	// AudioStream is a Stream of [KindAudio] frames for streaming transcription.
	// When set, Audio is ignored.
	AudioStream *Stream

	// InputFormat is the MIME type of the input audio (e.g. AudioPCM16k, AudioOGGOpus).
	// Required so the provider knows how to decode.
	InputFormat string

	// Model selects the STT model (provider-specific).
	// If empty, the provider's default model is used.
	Model string

	// Language is a BCP-47 language tag hint.
	Language string

	// InterimResults requests partial/interim transcripts in addition to final ones.
	InterimResults bool

	// Extra carries provider-specific configuration.
	Extra any
}

STTRequest contains everything needed for a speech-to-text call.

type Scrubber

type Scrubber func(key string, val any) any

Scrubber sanitises log attribute values before they reach the Logger.

PCI-DSS Requirements 3.4 and 10.3 prohibit logging PANs, credentials, and authentication tokens in clear text. Install a Scrubber as a defence-in-depth layer. niro itself never passes sensitive values to the logger; the Scrubber protects against application code that adds attrs to niro log call sites.

The function receives each attribute key and value; it returns the value to emit, a masked replacement such as "[REDACTED]", or nil to drop the field.

Install via SetScrubber. DefaultScrubber covers the most common PCI-DSS sensitive key patterns. The scrubber is only invoked when the logger is enabled for the record's level — zero overhead when the level is off.

type Signal

type Signal uint8

Signal represents a pipeline control signal.

const (
	SignalNone  Signal = iota
	SignalFlush        // Flush buffered data downstream
	SignalEOT          // End of turn
	SignalAbort        // Abort pipeline
)

func (Signal) String

func (s Signal) String() string

String returns the human-readable name of the Signal.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream reads Frames from a pipeline stage.

Stream follows the bufio.Scanner iteration pattern:

for stream.Next(ctx) {
    f := stream.Frame()
    // process f
}
if err := stream.Err(); err != nil {
    // handle error
}
usage := stream.Usage()

Streams respect context cancellation and propagate errors from the writing side (Emitter). Usage data is accumulated automatically from KindUsage frames.

func StreamFromSlice

func StreamFromSlice(frames []Frame) *Stream

StreamFromSlice creates a Stream pre-loaded with the given frames. The stream is immediately closed after all frames are buffered. Useful for testing and providing static input.
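For example, a unit test can drive the scanner loop with a fixed stream (a sketch; t is a *testing.T):

```go
in := niro.StreamFromSlice([]niro.Frame{
    niro.TextFrame("hello "),
    niro.TextFrame("world"),
})

var got strings.Builder
for in.Next(ctx) {
    got.WriteString(in.Frame().Text)
}
if err := in.Err(); err != nil {
    t.Fatal(err)
}
// got.String() is "hello world"
```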

func (*Stream) Chan

func (s *Stream) Chan() <-chan Frame

Chan exposes the underlying channel for use in select statements. Advanced use only — prefer Next for standard iteration.

func (*Stream) Err

func (s *Stream) Err() error

Err returns the first error encountered during iteration. Returns nil on clean end-of-stream.

func (*Stream) Frame

func (s *Stream) Frame() Frame

Frame returns the current Frame. Must only be called after Next returns true.

func (*Stream) Next

func (s *Stream) Next(ctx context.Context) bool

Next advances the Stream to the next Frame. Returns false when the stream is exhausted, an error occurs, or the context is canceled.

KindUsage frames are consumed automatically and accumulated in the Usage — they are not returned to the caller.

func (*Stream) Response

func (s *Stream) Response() *ResponseMeta

Response returns provider metadata set by the Emitter. Available after the stream is fully consumed.

func (*Stream) Usage

func (s *Stream) Usage() Usage

Usage returns the accumulated token usage. Fully populated after the stream is exhausted.

type TTSFunc

type TTSFunc func(ctx context.Context, req *TTSRequest) (*Stream, error)

TTSFunc adapts a plain function to the TTSProvider interface.

func (TTSFunc) Synthesize

func (f TTSFunc) Synthesize(ctx context.Context, req *TTSRequest) (*Stream, error)

type TTSProvider

type TTSProvider interface {
	Synthesize(ctx context.Context, req *TTSRequest) (*Stream, error)
}

TTSProvider synthesizes speech from text.

Unlike Provider (which operates on message turns), TTSProvider is a simple text→audio streaming interface. The returned Stream emits KindAudio frames as chunks arrive from the synthesis engine.

Built-in: provider/elevenlabs, provider/googlespeech. Custom: implement this interface or use TTSFunc.
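A typical call streams audio chunks to a player as they arrive. A sketch (tts is any TTSProvider, speaker is a placeholder sink, and the voice ID is illustrative):

```go
out, err := tts.Synthesize(ctx, &niro.TTSRequest{
    Text:         "Your order has shipped.",
    Voice:        "rachel", // provider-specific voice ID
    OutputFormat: niro.AudioOGGOpus,
})
if err != nil {
    return err
}
for out.Next(ctx) {
    f := out.Frame()
    if f.Kind == niro.KindAudio {
        speaker.Play(f.Data) // play chunks as they arrive
    }
}
return out.Err()
```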

type TTSRequest

type TTSRequest struct {
	// Text is the input to synthesize.
	Text string

	// Voice selects the synthesis voice (provider-specific).
	// If empty, the provider's default voice is used.
	Voice string

	// Model selects the TTS model (provider-specific).
	// If empty, the provider's default model is used.
	Model string

	// Language is a BCP-47 language tag (e.g. "en", "es", "de").
	// Not all providers require this.
	Language string

	// OutputFormat is the desired output MIME type (e.g. AudioOGGOpus, AudioMP3).
	// If empty, the provider picks its default format.
	OutputFormat string

	// Speed is a playback speed multiplier. 1.0 = normal.
	// Not all providers support this.
	Speed float64

	// Extra carries provider-specific configuration.
	// Each provider documents its accepted types.
	Extra any
}

TTSRequest contains everything needed for a text-to-speech call.

type TimeoutConfig

type TimeoutConfig struct {
	// GenerationTimeout is the max time for a single generation request
	GenerationTimeout time.Duration
	// FrameTimeout is the max time to wait for the next frame
	FrameTimeout time.Duration
	// ToolTimeout is the max time to wait for a tool execution result
	ToolTimeout time.Duration
}

TimeoutConfig configures timeout behavior.

func DefaultTimeoutConfig

func DefaultTimeoutConfig() TimeoutConfig

DefaultTimeoutConfig returns sensible defaults:

  • GenerationTimeout: 5 minutes for a single generation request
  • FrameTimeout: 30 seconds for each frame
  • ToolTimeout: 1 minute for tool execution

func TelephonyTimeoutConfig

func TelephonyTimeoutConfig() TimeoutConfig

TelephonyTimeoutConfig returns timeouts tuned for real-time voice pipelines where the full response must fit inside a conversational turn:

  • GenerationTimeout: 8 s — total budget for a single voice turn
  • FrameTimeout: 300 ms — barge-in detection window; stalled tokens abort the stream before the user notices a freeze
  • ToolTimeout: 5 s — tools must resolve within the voice-turn budget

These values are intentionally conservative. Adjust per deployment after measuring p99 TTFT and tool-execution latencies in production.
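For instance, if measurement shows tools in a given deployment reliably finish just over the default budget, only that field needs widening (the latency figure here is hypothetical):

```go
cfg := niro.TelephonyTimeoutConfig()
// Hypothetical measurement: p99 tool latency is ~6 s in this deployment.
cfg.ToolTimeout = 7 * time.Second
```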

type TimeoutProvider

type TimeoutProvider struct {
	// contains filtered or unexported fields
}

TimeoutProvider wraps a Provider with generation timeout enforcement.

The timeout covers the entire generation lifecycle: from the initial API call through streaming until the last frame is consumed.

func NewTimeoutProvider

func NewTimeoutProvider(p Provider, timeout time.Duration) *TimeoutProvider

NewTimeoutProvider creates a Provider that enforces generation timeouts.

func (*TimeoutProvider) Generate

func (tp *TimeoutProvider) Generate(ctx context.Context, req *Request) (*Stream, error)

Generate implements Provider with timeout enforcement.

The timeout context is propagated into the underlying stream so that it remains active for the full duration of stream consumption — not just the initial Generate call.
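Wrapping is a one-liner around any existing Provider (a sketch; base is whatever Provider you already have):

```go
llm := niro.NewTimeoutProvider(base, 30*time.Second)

// The 30 s budget covers Generate plus consumption of the stream.
stream, err := llm.Generate(ctx, req)
```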

type Tool

type Tool struct {
	Name        string          // Function name
	Description string          // Human-readable description
	Parameters  json.RawMessage // JSON Schema for parameters
}

Tool defines a tool that can be provided to an LLM.
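For example, a weather tool might be defined like this (a sketch; the name and schema are illustrative):

```go
weather := niro.Tool{
    Name:        "get_weather",
    Description: "Return current weather for a city.",
    Parameters: json.RawMessage(`{
        "type": "object",
        "properties": {
            "city": {"type": "string"}
        },
        "required": ["city"]
    }`),
}
if err := weather.Validate(); err != nil {
    // fix the definition before passing it in Request.Tools
}
```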

func (*Tool) Validate

func (t *Tool) Validate() error

Validate checks if a Tool definition is valid.

type ToolCall

type ToolCall struct {
	ID   string          // Provider-assigned call ID
	Name string          // Tool function name
	Args json.RawMessage // Arguments as JSON
}

ToolCall represents an LLM's request to invoke a tool.

func (*ToolCall) Validate

func (tc *ToolCall) Validate() error

Validate checks if a ToolCall is valid.

type ToolChoice

type ToolChoice string

ToolChoice controls how the model selects tools.

const (
	ToolChoiceAuto     ToolChoice = "auto"     // Model decides (default)
	ToolChoiceNone     ToolChoice = "none"     // Never call tools
	ToolChoiceRequired ToolChoice = "required" // Must call at least one tool
)

func ToolChoiceFunc

func ToolChoiceFunc(name string) ToolChoice

ToolChoiceFunc forces the model to call a specific tool.

func (ToolChoice) Validate

func (tc ToolChoice) Validate() error

Validate checks if ToolChoice is valid when tools are present.

type ToolResult

type ToolResult struct {
	CallID  string // Matches ToolCall.ID
	Content string // Result content (may be JSON or plain text)
	IsError bool   // Whether this result represents an error
}

ToolResult represents the outcome of a tool invocation.

func (*ToolResult) Validate

func (tr *ToolResult) Validate() error

Validate checks if a ToolResult is valid.

type Usage

type Usage struct {
	InputTokens  int // Prompt tokens
	OutputTokens int // Completion tokens
	TotalTokens  int // InputTokens + OutputTokens (some providers report it directly)

	// Provider-specific detail (optional).
	// E.g. cached tokens, audio tokens, reasoning tokens.
	Detail map[string]int
}

Usage tracks token consumption for a generation.

func GetUsage

func GetUsage() *Usage

GetUsage returns a Usage from the pool. Reset to zero values. Call PutUsage when done.

func (*Usage) Add

func (u *Usage) Add(other *Usage)

Add accumulates usage from another Usage into this one.

func (*Usage) Reset

func (u *Usage) Reset()

Reset zeroes all fields. Useful when reusing a Usage from a pool.

type VADConfig

type VADConfig struct {
	// Threshold is the activation confidence (0.0–1.0).
	// Higher values reduce false positives at the cost of onset latency.
	// Default: 0.5
	Threshold float64

	// PrefixPaddingMs is the pre-speech audio prepended to each utterance
	// to avoid clipping word onset. Default: 300 ms.
	PrefixPaddingMs int

	// SilenceDurationMs is the post-speech silence before turn-end is
	// triggered. Default: 200 ms.
	SilenceDurationMs int
}

VADConfig configures server-side Voice Activity Detection.

When enabled, the provider automatically detects speech boundaries and emits KindControl frames:

  • SignalFlush on speech start (user is talking; clear playback buffer)
  • SignalEOT on speech end (user stopped talking; model will respond)
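As a tuning sketch for noisy telephony audio, raise the threshold to cut false triggers and wait longer for silence before ending the user's turn (values illustrative; start from the defaults and adjust against real traffic):

```go
vad := &niro.VADConfig{
    Threshold:         0.7, // fewer false positives, slightly later onset
    PrefixPaddingMs:   300,
    SilenceDurationMs: 500, // tolerate longer mid-sentence pauses
}
```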

Directories

Path Synopsis
Package component provides the Component interface and ComponentHost for managing pluggable runtime extensions with lifecycle management.
Package hook provides the Hook interface for observing LLM generation lifecycle events: start, end, per-frame, tool calls, and errors.
internal
sse
Package sse provides a minimal Server-Sent Events reader.
Package middleware provides provider wrappers: Cache, Retry, Timeout, and Tracing.
Package orchestrate provides concurrent workflow primitives: Fan (parallel merge), Race (first wins), and Sequence (chained).
Package pipe provides the Processor interface and built-in processors for transforming Frame streams in a Pipeline.
Package pool provides a size-class byte buffer pool that eliminates allocations on media-heavy hot paths (audio chunks, image tiles, video frames).
provider
compat
Package compat implements a Niro Provider using raw HTTP + SSE.
anthropic module
bedrock module
elevenlabs module
google module
openai module
realtime module
Package registry provides named provider registration and routing.
Package runtime manages the lifecycle of a Provider with optional hooks and a post-processing Pipeline.
Package transport provides production-grade HTTP transport and client configurations tuned for high-concurrency LLM API traffic.
