Documentation
¶
Overview ¶
Package clarify provides a durable store for clarifying questions that the LLM can ask the user. It uses a DB + in-process channel hybrid (the same pattern as the HITL approval store):
- DB layer provides durable persistence so pending questions survive restarts.
- In-process channels provide instant notification when a user answers.
- A polling fallback ensures answers are detected even if the channel signal is missed.
- RecoverPending re-registers channels on startup for questions that were pending when the server last stopped.
Index ¶
- Constants
- func NewTool(clarifyStore Store, emitter EventEmitter, opts ...ToolOption) tool.CallableTool
- type AskClarifyingQuestionRequest
- type ClarificationEvent
- type DBStore
- func (s *DBStore) Ask(ctx context.Context, question, qContext, senderContext string) (string, <-chan Response, error)
- func (s *DBStore) Cleanup(id string)
- func (s *DBStore) Close()
- func (s *DBStore) FindPendingByQuestion(ctx context.Context, question string) (string, bool)
- func (s *DBStore) RecoverPending(ctx context.Context, maxAge time.Duration) (RecoverResult, error)
- func (s *DBStore) Respond(id, answer string) error
- func (s *DBStore) WaitForResponse(ctx context.Context, id string, ch <-chan Response) (Response, error)
- type EventEmitter
- type RecoverResult
- type Request
- type Response
- type Store
- type ToolOption
- type ToolProvider
Constants ¶
const ToolName = "ask_clarifying_question"
Variables ¶
This section is empty.
Functions ¶
func NewTool ¶
func NewTool(clarifyStore Store, emitter EventEmitter, opts ...ToolOption) tool.CallableTool
NewTool creates the "ask_clarifying_question" tool.
The tool blocks the LLM goroutine until the user responds (via [Store.Respond]) or the context expires. The emitter is called to forward the question to the AG-UI SSE stream and/or messenger.
Duplicate questions within the same tool instance are suppressed:
- In-memory dedup map (bounded to [maxAskedQuestions] entries).
- Cross-session dedup via [Store.FindPendingByQuestion].
Pass WithNonBlocking to return an interrupt.Error instead of blocking; this is the forward-looking path for Temporal integration.
Types ¶
type AskClarifyingQuestionRequest ¶
type AskClarifyingQuestionRequest struct {
// Question is the question text shown to the user. Required.
Question string `json:"question" jsonschema:"description=The question to ask the user. Be specific and concise.,required"`
// Context is an optional explanation of why the LLM needs this
// information, helping the user craft a better answer.
Context string `` /* 133-byte string literal not displayed */
}
AskClarifyingQuestionRequest is the JSON input schema for the ask_clarifying_question tool.
type ClarificationEvent ¶
type ClarificationEvent struct {
RequestID string // unique ID for this clarification, matches the DB row
Question string // the question text displayed to the user
Context string // optional LLM-provided context (may be empty)
}
ClarificationEvent is emitted to the UI when the LLM invokes the ask_clarifying_question tool. It is defined in this package (rather than agui) to avoid a circular import: agui imports clarify.
type DBStore ¶
type DBStore struct {
// contains filtered or unexported fields
}
DBStore is the default Store implementation. It persists clarification requests to a GORM-backed SQLite database and uses in-process buffered channels for instant notification when a user answers.
The design mirrors [hitl.ApprovalStore]: the DB provides durability across restarts while channels avoid the latency of polling.
All exported methods are safe for concurrent use.
func (*DBStore) Ask ¶
func (s *DBStore) Ask(ctx context.Context, question, qContext, senderContext string) (string, <-chan Response, error)
Ask persists a new "pending" clarification row and returns:
- id: a UUID string uniquely identifying this question.
- ch: a buffered channel (cap 1) that will receive a Response when [Respond] is called for this id.
The DB write is retried up to 5 times with 200 ms backoff to handle transient SQLITE_BUSY errors.
func (*DBStore) Cleanup ¶
Cleanup removes the in-process waiter channel for the given request ID. This prevents the sync.Map from growing unboundedly over long-running sessions. It does not modify the database row.
Callers should defer Cleanup(id) immediately after [Ask] returns.
func (*DBStore) Close ¶
func (s *DBStore) Close()
Close removes all in-process waiter channels from the sync.Map. Call this during graceful shutdown to release resources. It does not close the underlying database connection.
func (*DBStore) FindPendingByQuestion ¶
FindPendingByQuestion queries the DB for a row with status "pending" and the exact question text. Returns (requestID, true) if found, ("", false) otherwise. Used by NewTool for cross-session deduplication.
func (*DBStore) RecoverPending ¶
RecoverPending scans the DB for rows with status "pending" and:
- Expires rows whose CreatedAt is older than maxAge (sets status to "expired" and records AnsweredAt).
- Re-registers a fresh waiter channel for recent rows so that a subsequent [Respond] call can still signal the in-process select.
Call this once at application startup, before the AG-UI server starts accepting traffic. The RecoverResult tells you how many questions were expired vs recovered.
func (*DBStore) Respond ¶
Respond atomically marks the clarification as "answered" in the DB and pushes the answer to the in-process waiter channel (if one exists).
The DB update is retried with exponential back-off (default 3 attempts, 1 s interval) to handle SQLITE_BUSY under concurrent writes.
Returns an error if:
- the request ID does not exist in the DB.
- the request was already answered (status ≠ "pending").
func (*DBStore) WaitForResponse ¶
func (s *DBStore) WaitForResponse(ctx context.Context, id string, ch <-chan Response) (Response, error)
WaitForResponse blocks the calling goroutine until the clarification identified by id is answered or ctx expires.
Strategy (hybrid wait/notify):
- Fast-path — immediately checks the DB; returns if already answered.
- Channel — selects on ch for instant notification from [Respond].
- Polling — every 5 s re-queries the DB as a fallback in case the channel signal was lost (e.g. [Respond] ran before the select, or the answer was written by a different process after a restart).
The ch parameter must be the channel returned by [Ask] for the same id.
type EventEmitter ¶
type EventEmitter func(ctx context.Context, evt ClarificationEvent) error
EventEmitter sends a ClarificationEvent to the UI layer (AG-UI SSE stream, messenger, etc.). The implementation is wired at application startup and typically lives in the app package as a closure over the AG-UI event channel and the messenger client.
Returning a non-nil error aborts the tool call.
type RecoverResult ¶
type RecoverResult struct {
Expired int // questions older than maxAge, marked "expired" in the DB
Recovered int // questions within maxAge, waiter channels re-registered for live use
}
RecoverResult holds the outcome of [Store.RecoverPending].
type Request ¶
type Request struct {
ID string `json:"id"`
Question string `json:"question"`
Context string `json:"context,omitempty"` // optional LLM-provided context about why the question is needed
}
Request represents a pending clarifying question. It is typically serialized as JSON when sent over the AG-UI SSE stream.
type Response ¶
type Response struct {
Answer string `json:"answer"`
}
Response is the user's answer to a clarifying question. An empty Answer is valid and indicates the user explicitly provided an empty response.
type Store ¶
type Store interface {
// Ask creates a pending clarification request, persists it, and returns
// the unique request ID together with a channel that will receive the
// user's [Response] when [Respond] is called.
//
// Parameters:
// - question: the question text displayed to the user.
// - qContext: optional LLM-provided context explaining why the
// information is needed (may be empty).
// - senderContext: opaque platform-specific sender identifier
// (e.g. "slack:U1234:C5678") used for messenger routing and
// persisted so [RecoverPending] can replay questions.
//
// The returned channel is buffered (cap 1) so [Respond] never blocks.
// The caller must eventually call [Cleanup] to release the channel.
Ask(ctx context.Context, question, qContext, senderContext string) (string, <-chan Response, error)
// Respond delivers the user's answer to the pending clarification
// identified by id. It updates the persistent store and signals any
// goroutine blocked in [WaitForResponse].
//
// Returns an error if id does not exist or was already answered.
Respond(id, answer string) error
// WaitForResponse blocks until the clarification identified by id is
// answered or ctx is cancelled/expired. ch must be the channel returned
// by [Ask] for the same id.
//
// The implementation uses a hybrid strategy: it listens on ch for
// immediate notification from [Respond], and polls the database every
// 5 seconds as a fallback (in case the channel signal was missed due
// to a race with [Cleanup] or a server restart).
WaitForResponse(ctx context.Context, id string, ch <-chan Response) (Response, error)
// Cleanup removes the in-process waiter channel for the given request.
// It should be called (typically via defer) after [WaitForResponse]
// returns, whether the question was answered or timed out.
Cleanup(id string)
// FindPendingByQuestion returns the ID of an existing pending
// clarification whose question text matches the given string.
// Used for deduplication — both within a session and across sessions.
// Returns ("", false) if no pending match is found.
FindPendingByQuestion(ctx context.Context, question string) (string, bool)
// RecoverPending handles clarifications left in "pending" state from
// a previous server instance. Questions older than maxAge are marked
// "expired"; recent ones get fresh waiter channels so they can still
// be answered via the API.
//
// This should be called once at application startup, before the
// AG-UI server begins accepting requests.
RecoverPending(ctx context.Context, maxAge time.Duration) (RecoverResult, error)
// Close releases any resources held by the store (e.g. in-memory
// waiter channels). It does not close the underlying database.
Close()
}
Store is the interface for managing clarification requests.
A Store persists questions durably (across restarts) and provides a synchronous wait/notify mechanism so the calling tool goroutine blocks until the user answers or the context is cancelled.
Typical lifecycle for a single question:
id, ch, err := store.Ask(ctx, question, qContext, senderCtx) // … emit event to UI … resp, err := store.WaitForResponse(ctx, id, ch) store.Cleanup(id)
On the answering side (HTTP handler, messenger, etc.):
err := store.Respond(requestID, userAnswer)
All methods must be safe for concurrent use from multiple goroutines.
type ToolOption ¶
type ToolOption func(*askClarifyTool)
ToolOption configures the behaviour of the clarification tool returned by NewTool.
func WithNonBlocking ¶
func WithNonBlocking() ToolOption
WithNonBlocking configures the tool to return an interrupt.Error instead of blocking in [Store.WaitForResponse].
Use this when the tool runs inside an executor that models human interaction as an interrupt/resume cycle (e.g. Temporal workflows). The returned InterruptError carries the request ID and a ClarificationEvent payload that the executor can use to register a signal wait.
Default: blocking (the current in-process behaviour).
type ToolProvider ¶
type ToolProvider struct {
// contains filtered or unexported fields
}
ToolProvider wraps a clarify Store and EventEmitter and satisfies the tools.ToolProviders interface so the clarify tool can be passed directly to tools.NewRegistry.
func NewToolProvider ¶
func NewToolProvider(store Store, emitter EventEmitter, opts ...ToolOption) *ToolProvider
NewToolProvider creates a ToolProvider for the ask_clarifying_question tool.
func (*ToolProvider) GetTools ¶
func (p *ToolProvider) GetTools() []tool.Tool
GetTools returns the clarify tool.