clarify

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package clarify provides a durable store for clarifying questions that the LLM can ask the user. It uses a DB + in-process channel hybrid (the same pattern as the HITL approval store):

  • DB layer provides durable persistence so pending questions survive restarts.
  • In-process channels provide instant notification when a user answers.
  • A polling fallback ensures answers are detected even if the channel signal is missed.
  • RecoverPending re-registers channels on startup for questions that were pending when the server last stopped.

Index

Constants

View Source
const ToolName = "ask_clarifying_question"

Variables

This section is empty.

Functions

func NewTool

func NewTool(clarifyStore Store, emitter EventEmitter, opts ...ToolOption) tool.CallableTool

NewTool creates the "ask_clarifying_question" tool.

The tool blocks the LLM goroutine until the user responds (via [Store.Respond]) or the context expires. The emitter is called to forward the question to the AG-UI SSE stream and/or messenger.

Duplicate questions within the same tool instance are suppressed:

  • In-memory dedup map (bounded to [maxAskedQuestions] entries).
  • Cross-session dedup via [Store.FindPendingByQuestion].

Pass WithNonBlocking to return an interrupt.Error instead of blocking; this is the forward-looking path for Temporal integration.

Types

type AskClarifyingQuestionRequest

type AskClarifyingQuestionRequest struct {
	// Question is the question text shown to the user. Required.
	Question string `json:"question" jsonschema:"description=The question to ask the user. Be specific and concise.,required"`
	// Context is an optional explanation of why the LLM needs this
	// information, helping the user craft a better answer.
	Context string `` /* 133-byte string literal not displayed */
}

AskClarifyingQuestionRequest is the JSON input schema for the ask_clarifying_question tool.

type ClarificationEvent

type ClarificationEvent struct {
	RequestID string // unique ID for this clarification, matches the DB row
	Question  string // the question text displayed to the user
	Context   string // optional LLM-provided context (may be empty)
}

ClarificationEvent is emitted to the UI when the LLM invokes the ask_clarifying_question tool. It is defined in this package (rather than agui) to avoid a circular import: agui imports clarify.

type DBStore

type DBStore struct {
	// contains filtered or unexported fields
}

DBStore is the default Store implementation. It persists clarification requests to a GORM-backed SQLite database and uses in-process buffered channels for instant notification when a user answers.

The design mirrors [hitl.ApprovalStore]: the DB provides durability across restarts while channels avoid the latency of polling.

All exported methods are safe for concurrent use.

func (*DBStore) Ask

func (s *DBStore) Ask(ctx context.Context, question, qContext, senderContext string) (string, <-chan Response, error)

Ask persists a new "pending" clarification row and returns:

  • id: a UUID string uniquely identifying this question.
  • ch: a buffered channel (cap 1) that will receive a Response when [Respond] is called for this id.

The DB write is retried up to 5 times with 200 ms backoff to handle transient SQLITE_BUSY errors.

func (*DBStore) Cleanup

func (s *DBStore) Cleanup(id string)

Cleanup removes the in-process waiter channel for the given request ID. This prevents the sync.Map from growing unboundedly over long-running sessions. It does not modify the database row.

Callers should defer Cleanup(id) immediately after [Ask] returns.

func (*DBStore) Close

func (s *DBStore) Close()

Close removes all in-process waiter channels from the sync.Map. Call this during graceful shutdown to release resources. It does not close the underlying database connection.

func (*DBStore) FindPendingByQuestion

func (s *DBStore) FindPendingByQuestion(ctx context.Context, question string) (string, bool)

FindPendingByQuestion queries the DB for a row with status "pending" and the exact question text. Returns (requestID, true) if found, ("", false) otherwise. Used by NewTool for cross-session deduplication.

func (*DBStore) RecoverPending

func (s *DBStore) RecoverPending(ctx context.Context, maxAge time.Duration) (RecoverResult, error)

RecoverPending scans the DB for rows with status "pending" and:

  • Expires rows whose CreatedAt is older than maxAge (sets status to "expired" and records AnsweredAt).
  • Re-registers a fresh waiter channel for recent rows so that a subsequent [Respond] call can still signal the in-process select.

Call this once at application startup, before the AG-UI server starts accepting traffic. The RecoverResult tells you how many questions were expired vs recovered.

func (*DBStore) Respond

func (s *DBStore) Respond(id, answer string) error

Respond atomically marks the clarification as "answered" in the DB and pushes the answer to the in-process waiter channel (if one exists).

The DB update is retried with exponential back-off (default 3 attempts, 1 s interval) to handle SQLITE_BUSY under concurrent writes.

Returns an error if:

  • the request ID does not exist in the DB.
  • the request was already answered (status ≠ "pending").

func (*DBStore) WaitForResponse

func (s *DBStore) WaitForResponse(ctx context.Context, id string, ch <-chan Response) (Response, error)

WaitForResponse blocks the calling goroutine until the clarification identified by id is answered or ctx expires.

Strategy (hybrid wait/notify):

  1. Fast-path — immediately checks the DB; returns if already answered.
  2. Channel — selects on ch for instant notification from [Respond].
  3. Polling — every 5 s re-queries the DB as a fallback in case the channel signal was lost (e.g. [Respond] ran before the select, or the answer was written by a different process after a restart).

The ch parameter must be the channel returned by [Ask] for the same id.

type EventEmitter

type EventEmitter func(ctx context.Context, evt ClarificationEvent) error

EventEmitter sends a ClarificationEvent to the UI layer (AG-UI SSE stream, messenger, etc.). The implementation is wired at application startup and typically lives in the app package as a closure over the AG-UI event channel and the messenger client.

Returning a non-nil error aborts the tool call.

type RecoverResult

type RecoverResult struct {
	Expired   int // questions older than maxAge, marked "expired" in the DB
	Recovered int // questions within maxAge, waiter channels re-registered for live use
}

RecoverResult holds the outcome of [Store.RecoverPending].

type Request

type Request struct {
	ID       string `json:"id"`
	Question string `json:"question"`
	Context  string `json:"context,omitempty"` // optional LLM-provided context about why the question is needed
}

Request represents a pending clarifying question. It is typically serialized as JSON when sent over the AG-UI SSE stream.

type Response

type Response struct {
	Answer string `json:"answer"`
}

Response is the user's answer to a clarifying question. An empty Answer is valid and indicates the user explicitly provided an empty response.

type Store

type Store interface {
	// Ask creates a pending clarification request, persists it, and returns
	// the unique request ID together with a channel that will receive the
	// user's [Response] when [Respond] is called.
	//
	// Parameters:
	//   - question: the question text displayed to the user.
	//   - qContext: optional LLM-provided context explaining why the
	//     information is needed (may be empty).
	//   - senderContext: opaque platform-specific sender identifier
	//     (e.g. "slack:U1234:C5678") used for messenger routing and
	//     persisted so [RecoverPending] can replay questions.
	//
	// The returned channel is buffered (cap 1) so [Respond] never blocks.
	// The caller must eventually call [Cleanup] to release the channel.
	Ask(ctx context.Context, question, qContext, senderContext string) (string, <-chan Response, error)

	// Respond delivers the user's answer to the pending clarification
	// identified by id. It updates the persistent store and signals any
	// goroutine blocked in [WaitForResponse].
	//
	// Returns an error if id does not exist or was already answered.
	Respond(id, answer string) error

	// WaitForResponse blocks until the clarification identified by id is
	// answered or ctx is cancelled/expired. ch must be the channel returned
	// by [Ask] for the same id.
	//
	// The implementation uses a hybrid strategy: it listens on ch for
	// immediate notification from [Respond], and polls the database every
	// 5 seconds as a fallback (in case the channel signal was missed due
	// to a race with [Cleanup] or a server restart).
	WaitForResponse(ctx context.Context, id string, ch <-chan Response) (Response, error)

	// Cleanup removes the in-process waiter channel for the given request.
	// It should be called (typically via defer) after [WaitForResponse]
	// returns, whether the question was answered or timed out.
	Cleanup(id string)

	// FindPendingByQuestion returns the ID of an existing pending
	// clarification whose question text matches the given string.
	// Used for deduplication — both within a session and across sessions.
	// Returns ("", false) if no pending match is found.
	FindPendingByQuestion(ctx context.Context, question string) (string, bool)

	// RecoverPending handles clarifications left in "pending" state from
	// a previous server instance. Questions older than maxAge are marked
	// "expired"; recent ones get fresh waiter channels so they can still
	// be answered via the API.
	//
	// This should be called once at application startup, before the
	// AG-UI server begins accepting requests.
	RecoverPending(ctx context.Context, maxAge time.Duration) (RecoverResult, error)

	// Close releases any resources held by the store (e.g. in-memory
	// waiter channels). It does not close the underlying database.
	Close()
}

Store is the interface for managing clarification requests.

A Store persists questions durably (across restarts) and provides a synchronous wait/notify mechanism so the calling tool goroutine blocks until the user answers or the context is cancelled.

Typical lifecycle for a single question:

id, ch, err := store.Ask(ctx, question, qContext, senderCtx)
// … emit event to UI …
resp, err := store.WaitForResponse(ctx, id, ch)
store.Cleanup(id)

On the answering side (HTTP handler, messenger, etc.):

err := store.Respond(requestID, userAnswer)

All methods must be safe for concurrent use from multiple goroutines.

func NewStore

func NewStore(gormDB *gorm.DB) Store

NewStore creates a Store backed by the given GORM database. The caller must ensure the database has been opened and migrated (see db.Open and db.AutoMigrate) before calling NewStore.

type ToolOption

type ToolOption func(*askClarifyTool)

ToolOption configures the behaviour of the clarification tool returned by NewTool.

func WithNonBlocking

func WithNonBlocking() ToolOption

WithNonBlocking configures the tool to return an interrupt.Error instead of blocking in [Store.WaitForResponse].

Use this when the tool runs inside an executor that models human interaction as an interrupt/resume cycle (e.g. Temporal workflows). The returned InterruptError carries the request ID and a ClarificationEvent payload that the executor can use to register a signal wait.

Default: blocking (the current in-process behaviour).

type ToolProvider

type ToolProvider struct {
	// contains filtered or unexported fields
}

ToolProvider wraps a clarify Store and EventEmitter and satisfies the tools.ToolProviders interface so the clarify tool can be passed directly to tools.NewRegistry.

func NewToolProvider

func NewToolProvider(store Store, emitter EventEmitter, opts ...ToolOption) *ToolProvider

NewToolProvider creates a ToolProvider for the ask_clarifying_question tool.

func (*ToolProvider) GetTools

func (p *ToolProvider) GetTools() []tool.Tool

GetTools returns the clarify tool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL