Documentation ¶
Overview ¶
Package chat implements Koopa's main conversational agent.
Agent is a stateless, LLM-powered agent that provides conversational interactions with tool calling and knowledge base integration. It uses the Google Genkit framework for LLM inference and tool orchestration.
Architecture ¶
The Agent follows a stateless design pattern with dependency injection:
InvocationContext (input)
|
v
Agent.ExecuteStream() or Agent.Execute()
|
+-- Load session history from SessionStore
|
+-- Build message list (history + user input)
|
+-- Call Genkit Generate with:
| - LLM model
| - Tool references (cached at initialization)
| - Message history
| - Optional: StreamCallback for real-time output
|
+-- Save updated history to SessionStore
|
v
Response (final text + tool requests)
Configuration ¶
Agent requires configuration via the Config struct at construction time:
type Config struct {
Genkit *genkit.Genkit
SessionStore *session.Store
Logger *slog.Logger
Tools []ai.Tool
// Configuration values
ModelName string // e.g., "googleai/gemini-2.5-flash"
MaxTurns int // Maximum agentic loop turns
Language string // Response language preference
// Resilience configuration
RetryConfig RetryConfig
CircuitBreakerConfig CircuitBreakerConfig
RateLimiter *rate.Limiter
// Token management
TokenBudget TokenBudget
}
Required fields are validated during construction.
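The validation pattern can be sketched as follows. This is an illustration only: the `cfg` type mirrors a subset of chat.Config, and the exact rules are the package's own.

```go
package main

import (
	"errors"
	"fmt"
)

// cfg mirrors a subset of chat.Config; the actual validation rules are
// the package's own. This only illustrates the pattern.
type cfg struct {
	Genkit       any
	SessionStore any
	Logger       any
}

// validate returns a descriptive error for the first missing required field.
func validate(c cfg) error {
	switch {
	case c.Genkit == nil:
		return errors.New("chat: Genkit is required")
	case c.SessionStore == nil:
		return errors.New("chat: SessionStore is required")
	case c.Logger == nil:
		return errors.New("chat: Logger is required")
	}
	return nil
}

func main() {
	fmt.Println(validate(cfg{}))                                      // chat: Genkit is required
	fmt.Println(validate(cfg{Genkit: 1, SessionStore: 1, Logger: 1})) // <nil>
}
```

Failing fast at construction keeps Execute/ExecuteStream free of nil checks on every call.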
Streaming Support ¶
Agent supports both streaming and non-streaming execution modes:
- Execute(): Non-streaming, returns complete response
- ExecuteStream(): Streaming with optional callback for real-time output
For streaming, provide a StreamCallback function:
type StreamCallback func(ctx context.Context, chunk *ai.ModelResponseChunk) error
The callback is invoked for each chunk of the response, enabling real-time display (typewriter effect) in CLI or SSE streaming in HTTP APIs.
Flow (Genkit Integration) ¶
The package provides a Genkit Flow for HTTP and observability:
- NewFlow(): Returns singleton streaming Flow (idempotent, safe to call multiple times)
- Flow supports both Run() and Stream() methods
- Stream() enables Server-Sent Events (SSE) for real-time responses
Example Flow usage:
// Create Flow (idempotent — first call initializes, subsequent calls return cached)
chatFlow := chat.NewFlow(g, chatAgent)
// Non-streaming
output, err := chatFlow.Run(ctx, chat.Input{Query: "Hello", SessionID: "..."})
// Streaming (for SSE)
for streamValue, err := range chatFlow.Stream(ctx, input) {
	if err != nil {
		return err
	}
	if streamValue.Done {
		// Final output in streamValue.Output
	} else {
		// Partial chunk in streamValue.Stream.Text
	}
}
Tool Registration ¶
Tools are registered from toolsets during initialization:
- For each toolset, get its available tools via Tools() method
- Convert ExecutableTools to Genkit format using genkit.DefineTool
- Cache tool references for reuse across invocations
- Validate that all tools were registered successfully
Tool references are cached to avoid re-registering on every Execute call.
Session Management ¶
Chat manages conversation history through the SessionStore:
- History: Retrieves previous messages for a session
- AppendMessages: Persists new messages incrementally (preferred)
History save failures are logged but don't fail the request.
Example Usage ¶
// Create agent with required configuration
agent, err := chat.New(chat.Config{
Genkit: g,
SessionStore: sessionStore,
Logger: slog.Default(),
Tools: tools,
ModelName: "googleai/gemini-2.5-flash",
MaxTurns: 10,
Language: "auto",
})
if err != nil {
return err
}
// Non-streaming execution
resp, err := agent.Execute(ctx, sessionID, "What is the weather?")
// Streaming execution with callback
resp, err = agent.ExecuteStream(ctx, sessionID, "What is the weather?",
func(ctx context.Context, chunk *ai.ModelResponseChunk) error {
fmt.Print(chunk.Text()) // Real-time output
return nil
})
Error Handling ¶
The package uses sentinel errors for categorization:
- ErrInvalidSession: Invalid session ID format
- ErrExecutionFailed: LLM or tool execution failed
Empty responses are handled with a fallback message to improve UX.
Testing ¶
Agent is designed for testability:
- Dependencies are concrete types with clear interfaces
- Stateless design eliminates test ordering issues
- Config struct allows partial configuration for unit tests
Thread Safety ¶
Agent is safe for concurrent use. The underlying dependencies (SessionStore, Genkit) must also be thread-safe.
Index ¶
- Constants
- Variables
- func ResetFlowForTesting()
- type Agent
- func (a *Agent) DefineFlow(g *genkit.Genkit) *Flow
- func (a *Agent) Execute(ctx context.Context, sessionID uuid.UUID, input string) (*Response, error)
- func (a *Agent) ExecuteStream(ctx context.Context, sessionID uuid.UUID, input string, ...) (*Response, error)
- func (a *Agent) GenerateTitle(ctx context.Context, userMessage string) string
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitState
- type Config
- type Flow
- type Input
- type Output
- type Response
- type RetryConfig
- type StreamCallback
- type StreamChunk
- type TokenBudget
Constants ¶
const (
	// Name is the unique identifier for the Chat agent.
	Name = "chat"

	// Description describes the Chat agent's capabilities.
	Description = "A general purpose chat agent that can help with various tasks using tools and knowledge base."

	// KoopaPromptName is the name of the Dotprompt file for the Chat agent.
	// This corresponds to prompts/koopa.prompt.
	// NOTE: The LLM model is configured in the Dotprompt file, not via Config.
	KoopaPromptName = "koopa"
)
Agent name and description constants.
const FlowName = "koopa/chat"
FlowName is the registered name of the Chat Flow in Genkit.
Variables ¶
var (
	// ErrInvalidSession indicates the session ID is invalid or malformed.
	ErrInvalidSession = errors.New("invalid session")

	// ErrExecutionFailed indicates agent execution failed.
	ErrExecutionFailed = errors.New("execution failed")
)
Sentinel errors for agent operations.
var ErrCircuitOpen = errors.New("circuit breaker is open")
ErrCircuitOpen is returned when the circuit is open.
Functions ¶
func ResetFlowForTesting ¶
func ResetFlowForTesting()
ResetFlowForTesting resets the Flow singleton for testing. This allows tests to initialize with different configurations. WARNING: Only use in tests. Not safe for concurrent use.
Types ¶
type Agent ¶
type Agent struct {
// contains filtered or unexported fields
}
Agent is Koopa's main conversational agent. It provides LLM-powered conversations with tool calling and knowledge base integration.
Agent is stateless and uses dependency injection. Required parameters are provided via Config struct.
All configuration values are captured immutably at construction time to ensure thread-safe concurrent access.
func New ¶
func New(cfg Config) (*Agent, error)
New creates a new Agent with required configuration.
RAG context is provided by knowledge tools (search_documents, search_history, search_system_knowledge) which the LLM calls when it determines context is needed.
NOTE: The LLM model is configured in prompts/koopa.prompt, not via Config.
Example:
agent, err := chat.New(chat.Config{
Genkit: g,
SessionStore: sessionStore,
Logger: logger,
Tools: tools, // Pre-registered via RegisterXxxTools()
MaxTurns: cfg.MaxTurns,
Language: cfg.Language,
})
func (*Agent) DefineFlow ¶
func (a *Agent) DefineFlow(g *genkit.Genkit) *Flow
DefineFlow defines the Genkit Streaming Flow for the Chat Agent. Supports both streaming (via callback) and non-streaming modes.
IMPORTANT: Use NewFlow() instead of calling DefineFlow() directly. DefineFlow registers a global Flow; calling it twice causes panic.
Each Agent has its own dedicated Flow, responsible for:
 1. Observability (Genkit DevUI tracing)
 2. Type safety (Input/Output schema)
 3. HTTP endpoint exposure via genkit.Handler()
 4. Streaming support for real-time output
Design: Flow is a lightweight wrapper; Agent.ExecuteStream() contains the core logic.
Error Handling:
 - Errors are properly returned using sentinel errors from the agent package
 - Genkit tracing will correctly show error spans
 - HTTP handlers can use errors.Is() to determine the error type and HTTP status
func (*Agent) Execute ¶
func (a *Agent) Execute(ctx context.Context, sessionID uuid.UUID, input string) (*Response, error)
Execute runs the chat agent with the given input (non-streaming). This is a convenience wrapper around ExecuteStream with a nil callback.
func (*Agent) ExecuteStream ¶
func (a *Agent) ExecuteStream(ctx context.Context, sessionID uuid.UUID, input string, callback StreamCallback) (*Response, error)
ExecuteStream runs the chat agent with optional streaming output. If callback is non-nil, it is called for each chunk of the response as it's generated. If callback is nil, the response is generated without streaming (equivalent to Execute). The final response is always returned after generation completes.
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern.
func NewCircuitBreaker ¶
func NewCircuitBreaker(cfg CircuitBreakerConfig) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker.
func (*CircuitBreaker) Allow ¶
func (cb *CircuitBreaker) Allow() error
Allow checks if a request should be allowed. Uses exclusive lock to safely handle Open -> HalfOpen transition.
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset()
Reset resets the circuit breaker to closed state. This is primarily useful for testing.
func (*CircuitBreaker) State ¶
func (cb *CircuitBreaker) State() CircuitState
State returns the current circuit state.
func (*CircuitBreaker) Success ¶
func (cb *CircuitBreaker) Success()
Success records a successful call.
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
FailureThreshold int // Failures before opening (default: 5)
SuccessThreshold int // Successes to close from half-open (default: 2)
Timeout time.Duration // Time before trying half-open (default: 30s)
}
CircuitBreakerConfig configures the circuit breaker.
func DefaultCircuitBreakerConfig ¶
func DefaultCircuitBreakerConfig() CircuitBreakerConfig
DefaultCircuitBreakerConfig returns sensible defaults.
type CircuitState ¶
type CircuitState int
CircuitState represents the state of the circuit breaker.
const (
	// CircuitClosed is the normal operation state.
	CircuitClosed CircuitState = iota

	// CircuitOpen rejects all requests.
	CircuitOpen

	// CircuitHalfOpen allows test requests to check recovery.
	CircuitHalfOpen
)
func (CircuitState) String ¶
func (s CircuitState) String() string
String returns the string representation of the circuit state.
type Config ¶
type Config struct {
Genkit *genkit.Genkit
SessionStore *session.Store
Logger *slog.Logger
Tools []ai.Tool // Pre-registered tools from RegisterXxxTools()
// Configuration values
ModelName string // Provider-qualified model name (e.g., "googleai/gemini-2.5-flash", "ollama/llama3.3")
MaxTurns int // Maximum agentic loop turns
Language string // Response language preference
// Resilience configuration
RetryConfig RetryConfig // LLM retry settings (zero-value uses defaults)
CircuitBreakerConfig CircuitBreakerConfig // Circuit breaker settings (zero-value uses defaults)
RateLimiter *rate.Limiter // Optional: proactive rate limiting (nil = use default)
// Token management
TokenBudget TokenBudget // Token budget for context window (zero-value uses defaults)
// Memory (optional)
MemoryStore *memory.Store // User memory store (nil = memory disabled)
// Background lifecycle (required when MemoryStore is set).
// BackgroundCtx outlives individual requests — used for async extraction.
// WG tracks background goroutines for graceful shutdown.
BackgroundCtx context.Context //nolint:containedctx // App lifecycle context, not a request context
WG *sync.WaitGroup
}
Config contains all required parameters for Chat agent.
type Flow ¶
type Flow = core.Flow[Input, Output, StreamChunk]
Flow is the type alias for Chat Agent's Genkit Streaming Flow. Exported for use in api package with genkit.Handler().
type Input ¶
type Input struct {
Query string `json:"query"`
SessionID string `json:"sessionId"` // Required field: session ID
}
Input defines the request payload for the chat agent flow.
type Response ¶
type Response struct {
FinalText string // Model's final text output
ToolRequests []*ai.ToolRequest // Tool requests made during execution
}
Response represents the complete result of an agent execution.
type RetryConfig ¶
type RetryConfig struct {
MaxRetries int // Maximum number of retry attempts
InitialInterval time.Duration // Initial backoff interval
MaxInterval time.Duration // Maximum backoff interval
}
RetryConfig configures the retry behavior for LLM calls.
func DefaultRetryConfig ¶
func DefaultRetryConfig() RetryConfig
DefaultRetryConfig returns sensible defaults for LLM API calls.
type StreamCallback ¶
type StreamCallback func(ctx context.Context, chunk *ai.ModelResponseChunk) error
StreamCallback is called for each chunk of streaming response. The chunk contains partial content that can be immediately displayed to the user. Return an error to abort the stream.
type StreamChunk ¶
type StreamChunk struct {
Text string `json:"text"` // Partial text chunk
}
StreamChunk is the streaming output type for Chat Flow. Each chunk contains partial text that can be immediately displayed to the user.
type TokenBudget ¶
type TokenBudget struct {
MaxHistoryTokens int // Maximum tokens for conversation history
MaxMemoryTokens int // Maximum tokens for user memory injection
}
TokenBudget manages context window limits.
func DefaultTokenBudget ¶
func DefaultTokenBudget() TokenBudget
DefaultTokenBudget returns defaults for modern large-context models. 32K tokens ≈ 64 conversation turns — balances context retention with cost.
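What MaxHistoryTokens implies can be sketched as oldest-first trimming. Both the rough 4-characters-per-token heuristic and the `trimHistory` helper are assumptions for illustration; the package may use a proper tokenizer and a different trimming strategy.

```go
package main

import "fmt"

// approxTokens is a rough heuristic (about 4 characters per token for
// English text); a real implementation may use an actual tokenizer.
func approxTokens(s string) int { return (len(s) + 3) / 4 }

// trimHistory drops the oldest messages until the history fits the
// budget, keeping the most recent turns intact.
func trimHistory(messages []string, maxTokens int) []string {
	total := 0
	for _, m := range messages {
		total += approxTokens(m)
	}
	start := 0
	for total > maxTokens && start < len(messages) {
		total -= approxTokens(messages[start])
		start++
	}
	return messages[start:]
}

func main() {
	msgs := []string{"older message that is quite long indeed", "mid", "newest"}
	kept := trimHistory(msgs, 3)
	fmt.Println(len(kept), kept[len(kept)-1]) // 2 newest
}
```

Trimming from the front preserves the newest context, which matters most for the next model call.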