Documentation
¶
Overview ¶
Package workflow implements multi-step question-answering workflows. Each workflow version breaks the process into its own sequence of discrete steps.
For example, to use the v1 workflow:
import (
"github.com/malbeclabs/lake/agent/pkg/workflow"
v1 "github.com/malbeclabs/lake/agent/pkg/workflow/v1"
)
prompts, _ := v1.LoadPrompts()
p, _ := v1.New(&workflow.Config{Prompts: prompts, ...})
Index ¶
- func ContextWithWorkflowIDs(ctx context.Context, sessionID, workflowID string) context.Context
- func FormatValue(v any) string
- func SanitizeRows(rows []map[string]any)
- func SessionIDFromContext(ctx context.Context) (string, bool)
- func WorkflowIDFromContext(ctx context.Context) (string, bool)
- type AnthropicLLMClient
- type Classification
- type ClassifyResult
- type CompleteOption
- type CompleteOptions
- type Config
- type ConversationMessage
- type DataQuestion
- type ExecutedQuery
- type GeneratedQuery
- type HTTPQuerier
- type HTTPSchemaFetcher
- type LLMClient
- type Progress
- type ProgressCallback
- type ProgressStage
- type PromptsProvider
- type Querier
- type QueryResult
- type Runner
- type SchemaFetcher
- type ToolCallInfo
- type ToolContentBlock
- type ToolDefinition
- type ToolLLMClient
- type ToolLLMResponse
- type ToolMessage
- type WorkflowResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ContextWithWorkflowIDs ¶
ContextWithWorkflowIDs adds session and workflow IDs to a context for tracing.
func FormatValue ¶
FormatValue formats a single value for display to the LLM. Pointer types are dereferenced to get the underlying value; this is critical for ClickHouse Decimal types, which are scanned as pointers. It is exported so that other packages (api/handlers, slack, etc.) can use it.
func SanitizeRows ¶
SanitizeRows replaces NaN and Inf float values with nil so that the rows can be serialized to JSON. ClickHouse can return NaN (for example, from division by zero), but JSON cannot represent NaN or Inf.
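func SessionIDFromContext ¶
SessionIDFromContext extracts the session ID from context, if present.
func WorkflowIDFromContext ¶
func WorkflowIDFromContext(ctx context.Context) (string, bool)
WorkflowIDFromContext extracts the workflow ID from context, if present.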
Types ¶
type AnthropicLLMClient ¶
type AnthropicLLMClient struct {
// contains filtered or unexported fields
}
AnthropicLLMClient implements LLMClient using the Anthropic API.
func NewAnthropicLLMClient ¶
func NewAnthropicLLMClient(model anthropic.Model, maxTokens int64) *AnthropicLLMClient
NewAnthropicLLMClient creates a new Anthropic-based LLM client.
func NewAnthropicLLMClientWithName ¶
func NewAnthropicLLMClientWithName(model anthropic.Model, maxTokens int64, name string) *AnthropicLLMClient
NewAnthropicLLMClientWithName creates a new Anthropic-based LLM client with a custom name for logging.
func (*AnthropicLLMClient) Complete ¶
func (c *AnthropicLLMClient) Complete(ctx context.Context, systemPrompt, userPrompt string, opts ...CompleteOption) (string, error)
Complete sends a prompt to Claude and returns the response text.
func (*AnthropicLLMClient) CompleteWithTools ¶
func (c *AnthropicLLMClient) CompleteWithTools( ctx context.Context, systemPrompt string, messages []ToolMessage, tools []ToolDefinition, opts ...CompleteOption, ) (*ToolLLMResponse, error)
CompleteWithTools sends a request with tools and returns a response that may contain tool calls. It implements the ToolLLMClient interface.
type Classification ¶
type Classification string
Classification represents the type of question being asked.
const ( ClassificationDataAnalysis Classification = "data_analysis" ClassificationConversational Classification = "conversational" ClassificationOutOfScope Classification = "out_of_scope" )
type ClassifyResult ¶
type ClassifyResult struct {
Classification Classification `json:"classification"`
Reasoning string `json:"reasoning"`
DirectResponse string `json:"direct_response,omitempty"`
}
ClassifyResult holds the result of question classification.
type CompleteOption ¶
type CompleteOption func(*CompleteOptions)
CompleteOption is a functional option for Complete.
func WithCacheControl ¶
func WithCacheControl() CompleteOption
WithCacheControl enables prompt caching for the system prompt. This marks the system prompt as cacheable, reducing costs for repeated calls with the same system prompt prefix.
type CompleteOptions ¶
type CompleteOptions struct {
CacheSystemPrompt bool // Enable prompt caching for the system prompt
}
CompleteOptions holds options for LLM completion.
type Config ¶
type Config struct {
Logger *slog.Logger
LLM LLMClient
FollowUpLLM LLMClient // Optional LLM for generating follow-up questions (defaults to LLM if nil)
Querier Querier
SchemaFetcher SchemaFetcher
Prompts PromptsProvider
MaxTokens int64
MaxRetries int // Max retries for failed queries (default 5)
FormatContext string // Optional formatting context to append to synthesize/respond prompts (e.g., Slack formatting guidelines)
EnvContext string // Optional environment context to prepend to system prompt (e.g., "You are querying the devnet environment.")
// Graph database support (optional)
GraphQuerier Querier // Optional Neo4j querier for execute_cypher tool
GraphSchemaFetcher SchemaFetcher // Optional Neo4j schema fetcher
}
Config holds the configuration for the workflow.
type ConversationMessage ¶
type ConversationMessage struct {
Role string // "user" or "assistant"
Content string
ExecutedQueries []string // SQL queries executed in this turn (assistant only)
}
ConversationMessage represents a message in conversation history.
type DataQuestion ¶
type DataQuestion struct {
Question string // The data question in natural language
Rationale string // Why this question helps answer the user's query
}
DataQuestion represents a single data question to be answered.
type ExecutedQuery ¶
type ExecutedQuery struct {
GeneratedQuery GeneratedQuery
Result QueryResult
}
ExecutedQuery represents an executed query with results.
type GeneratedQuery ¶
type GeneratedQuery struct {
DataQuestion DataQuestion
SQL string // The SQL query text (empty for Cypher queries)
Cypher string // The Cypher query text (empty for SQL queries)
Explanation string // Brief explanation of what the query does
}
GeneratedQuery represents a query generated for a data question.
func (GeneratedQuery) IsCypher ¶
func (q GeneratedQuery) IsCypher() bool
IsCypher returns true if this is a Cypher query.
func (GeneratedQuery) QueryText ¶
func (q GeneratedQuery) QueryText() string
QueryText returns the query text regardless of type.
type HTTPQuerier ¶
type HTTPQuerier struct {
// contains filtered or unexported fields
}
HTTPQuerier implements Querier using HTTP calls to ClickHouse.
func NewHTTPQuerier ¶
func NewHTTPQuerier(clickhouseURL string) *HTTPQuerier
NewHTTPQuerier creates a new HTTP-based querier.
func (*HTTPQuerier) Query ¶
func (q *HTTPQuerier) Query(ctx context.Context, sql string) (QueryResult, error)
Query executes a SQL query and returns the result.
type HTTPSchemaFetcher ¶
type HTTPSchemaFetcher struct {
ClickhouseURL string
Database string // defaults to "default" if empty
Username string // optional
Password string // optional
}
HTTPSchemaFetcher fetches schema from ClickHouse via HTTP.
func NewHTTPSchemaFetcher ¶
func NewHTTPSchemaFetcher(clickhouseURL string) *HTTPSchemaFetcher
NewHTTPSchemaFetcher creates a new HTTPSchemaFetcher.
func NewHTTPSchemaFetcherWithAuth ¶
func NewHTTPSchemaFetcherWithAuth(clickhouseURL, database, username, password string) *HTTPSchemaFetcher
NewHTTPSchemaFetcherWithAuth creates a new HTTPSchemaFetcher with authentication.
func (*HTTPSchemaFetcher) FetchSchema ¶
func (f *HTTPSchemaFetcher) FetchSchema(ctx context.Context) (string, error)
FetchSchema retrieves table columns and view definitions from ClickHouse.
type LLMClient ¶
type LLMClient interface {
// Complete sends a prompt and returns the response text.
// Options can be passed to control caching behavior.
Complete(ctx context.Context, systemPrompt, userPrompt string, opts ...CompleteOption) (string, error)
}
LLMClient is the interface for interacting with an LLM.
type Progress ¶
type Progress struct {
Stage ProgressStage
Classification Classification // Set after classifying
DataQuestions []DataQuestion // Set after decomposing
QueriesTotal int // Total queries to execute
QueriesDone int // Queries completed so far
Error error // Set if an error occurred
// v3 fields
ThinkingContent string // For StageThinking: the thinking content
// SQL tool call fields
SQLQuestion string // For StageSQLStarted/StageSQLComplete: the data question
SQL string // For StageSQLStarted/StageSQLComplete: the SQL query
SQLRows int // For StageSQLComplete: row count
SQLError string // For StageSQLComplete: error if failed
// Cypher tool call fields
CypherQuestion string // For StageCypherStarted/StageCypherComplete: the data question
Cypher string // For StageCypherStarted/StageCypherComplete: the Cypher query
CypherRows int // For StageCypherComplete: row count
CypherError string // For StageCypherComplete: error if failed
// ReadDocs tool call fields
DocsPage string // For StageReadDocsStarted/StageReadDocsComplete: page name
DocsContent string // For StageReadDocsComplete: content (truncated for progress)
DocsError string // For StageReadDocsComplete: error if failed
// Legacy fields (for backwards compatibility)
QueryQuestion string // For StageQueryStarted/StageQueryComplete: the query question
QuerySQL string // For StageQueryStarted/StageQueryComplete: the SQL
QueryRows int // For StageQueryComplete: row count
QueryError string // For StageQueryComplete: error if failed
}
Progress represents the current state of workflow execution.
type ProgressCallback ¶
type ProgressCallback func(Progress)
ProgressCallback is called at each stage of workflow execution.
type ProgressStage ¶
type ProgressStage string
ProgressStage represents a stage in the workflow execution.
const ( // v1 stages StageClassifying ProgressStage = "classifying" StageDecomposing ProgressStage = "decomposing" StageDecomposed ProgressStage = "decomposed" StageExecuting ProgressStage = "executing" StageSynthesizing ProgressStage = "synthesizing" StageComplete ProgressStage = "complete" StageError ProgressStage = "error" // v2 stages StageInterpreting ProgressStage = "interpreting" StageMapping ProgressStage = "mapping" StagePlanning ProgressStage = "planning" StageInspecting ProgressStage = "inspecting" // v3 stages StageThinking ProgressStage = "thinking" // Model is reasoning // Tool call stages - SQL StageSQLStarted ProgressStage = "sql_started" // SQL query started StageSQLComplete ProgressStage = "sql_done" // SQL query completed // Tool call stages - Cypher StageCypherStarted ProgressStage = "cypher_started" // Cypher query started StageCypherComplete ProgressStage = "cypher_done" // Cypher query completed // Tool call stages - ReadDocs StageReadDocsStarted ProgressStage = "read_docs_started" // Reading docs started StageReadDocsComplete ProgressStage = "read_docs_done" // Reading docs completed // Legacy v3 stages (for backwards compatibility during transition) StageQueryStarted ProgressStage = "query_started" // Individual query started StageQueryComplete ProgressStage = "query_done" // Individual query completed )
type PromptsProvider ¶
type PromptsProvider interface {
// GetPrompt returns the prompt content for the given name.
GetPrompt(name string) string
}
PromptsProvider provides access to prompt templates.
type Querier ¶
type Querier interface {
// Query executes a SQL query and returns formatted results.
Query(ctx context.Context, sql string) (QueryResult, error)
}
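Querier executes SQL queries. Config also accepts a Querier as GraphQuerier, in which case it is used to run Cypher queries for the execute_cypher tool.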
type QueryResult ¶
type QueryResult struct {
SQL string // The SQL query text (empty for Cypher queries)
Cypher string // The Cypher query text (empty for SQL queries)
Columns []string
Rows []map[string]any
Count int
Error string
Formatted string // Human-readable formatted result
}
QueryResult holds the result of a query execution.
func (QueryResult) QueryText ¶
func (r QueryResult) QueryText() string
QueryText returns the query text regardless of type.
type Runner ¶
type Runner interface {
// Run executes the workflow for a user question.
Run(ctx context.Context, userQuestion string) (*WorkflowResult, error)
// RunWithHistory executes the workflow with conversation context.
RunWithHistory(ctx context.Context, userQuestion string, history []ConversationMessage) (*WorkflowResult, error)
// RunWithProgress executes the workflow with progress callbacks for streaming updates.
RunWithProgress(ctx context.Context, userQuestion string, history []ConversationMessage, onProgress ProgressCallback) (*WorkflowResult, error)
}
Runner is the interface for workflow implementations. It provides methods for running the workflow with varying levels of control.
type SchemaFetcher ¶
type SchemaFetcher interface {
// FetchSchema returns a formatted string describing the database schema.
FetchSchema(ctx context.Context) (string, error)
}
SchemaFetcher retrieves database schema information.
type ToolCallInfo ¶
ToolCallInfo represents a tool invocation from the LLM.
type ToolContentBlock ¶
type ToolContentBlock struct {
Type string `json:"type"` // "text", "tool_use", "tool_result"
Text string `json:"text,omitempty"`
ID string `json:"id,omitempty"` // For tool_use
Name string `json:"name,omitempty"` // For tool_use
Input map[string]any `json:"input,omitempty"` // For tool_use
ToolUseID string `json:"tool_use_id,omitempty"` // For tool_result
Content string `json:"content,omitempty"` // For tool_result
IsError bool `json:"is_error,omitempty"` // For tool_result
}
ToolContentBlock represents a block of content in a tool message.
type ToolDefinition ¶
type ToolDefinition struct {
Name string `json:"name"`
Description string `json:"description"`
InputSchema any `json:"input_schema"` // JSON Schema for parameters
}
ToolDefinition represents a tool that can be called by the LLM.
type ToolLLMClient ¶
type ToolLLMClient interface {
LLMClient
// CompleteWithTools sends a request with tools and returns a response that may contain tool calls.
// The systemPrompt is separate to enable prompt caching.
// Returns the response (whose InputTokens and OutputTokens fields carry token usage) and any error.
CompleteWithTools(
ctx context.Context,
systemPrompt string,
messages []ToolMessage,
tools []ToolDefinition,
opts ...CompleteOption,
) (*ToolLLMResponse, error)
}
ToolLLMClient extends LLMClient with tool-calling capabilities. Used by v3 workflow for agentic tool loops.
type ToolLLMResponse ¶
type ToolLLMResponse struct {
StopReason string // "end_turn" or "tool_use"
Content []ToolContentBlock // May include both text and tool_use blocks
InputTokens int
OutputTokens int
}
ToolLLMResponse represents the response from a tool-calling LLM request.
func (*ToolLLMResponse) HasToolCalls ¶
func (r *ToolLLMResponse) HasToolCalls() bool
HasToolCalls reports whether the response contains any tool_use content blocks. It inspects the content blocks rather than StopReason, because the API can return tool_use blocks even when stop_reason is "end_turn".
func (*ToolLLMResponse) Text ¶
func (r *ToolLLMResponse) Text() string
Text extracts text content from the response.
func (*ToolLLMResponse) ToolCalls ¶
func (r *ToolLLMResponse) ToolCalls() []ToolCallInfo
ToolCalls extracts tool calls from the response.
type ToolMessage ¶
type ToolMessage struct {
Role string `json:"role"` // "user" or "assistant"
Content []ToolContentBlock `json:"content"`
}
ToolMessage represents a message in a tool-calling conversation.
type WorkflowResult ¶
type WorkflowResult struct {
// Input
UserQuestion string
// Pre-step: Classification
Classification Classification // How the question was classified
// Step 1: Decomposition (only for data_analysis)
DataQuestions []DataQuestion
// Step 2: Generation (only for data_analysis)
GeneratedQueries []GeneratedQuery
// Step 3: Execution (only for data_analysis)
ExecutedQueries []ExecutedQuery
// Step 4: Synthesis / Response
Answer string
// Step 5: Follow-up suggestions (optional)
FollowUpQuestions []string
}
WorkflowResult holds the complete result of running the workflow.