workflow

package
v0.0.0-...-b6e738b Latest
Warning

This package is not in the latest version of its module.

Published: May 27, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package workflow implements multi-step question-answering workflows. Each workflow version breaks the process into discrete steps, and the steps differ between versions.

For the v1 workflow:

import (
    "github.com/malbeclabs/lake/agent/pkg/workflow"
    v1 "github.com/malbeclabs/lake/agent/pkg/workflow/v1"
)

prompts, err := v1.LoadPrompts()
if err != nil {
    return err
}
p, err := v1.New(&workflow.Config{Prompts: prompts, ...}) // remaining Config fields elided
if err != nil {
    return err
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextWithWorkflowIDs

func ContextWithWorkflowIDs(ctx context.Context, sessionID, workflowID string) context.Context

ContextWithWorkflowIDs returns a context derived from ctx that carries the given session and workflow IDs, for tracing.

func FormatValue

func FormatValue(v any) string

FormatValue formats a single value for display to the LLM. Pointer types are dereferenced to get the underlying value; this is critical for ClickHouse Decimal types, which are scanned as pointers. It is exported so that other packages (api/handlers, slack, etc.) can use it.
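The dereferencing step can be illustrated with the standard reflect package. The following is a minimal sketch, not the package's implementation: formatValueSketch is a hypothetical name, and rendering nil values as "NULL" is an assumption. The real FormatValue may format nils, floats, and decimals differently.

```go
package main

import (
	"fmt"
	"reflect"
)

// formatValueSketch is a hypothetical stand-in for FormatValue. It follows
// pointers until it reaches a non-pointer value, so a *float64 (or a pointer
// to a decimal type) prints as its value instead of as a memory address.
// Rendering nil as "NULL" is an assumption made for this sketch.
func formatValueSketch(v any) string {
	if v == nil {
		return "NULL"
	}
	rv := reflect.ValueOf(v)
	for rv.Kind() == reflect.Ptr {
		if rv.IsNil() {
			return "NULL"
		}
		rv = rv.Elem()
	}
	return fmt.Sprint(rv.Interface())
}

func main() {
	price := 12.5
	fmt.Println(formatValueSketch(&price)) // 12.5 (fmt.Sprint(&price) would print an address)

	var missing *int
	fmt.Println(formatValueSketch(missing)) // NULL
}
```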

func SanitizeRows

func SanitizeRows(rows []map[string]any)

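SanitizeRows replaces NaN and Inf float values with nil so that the rows can be serialized to JSON. ClickHouse can return NaN or Inf for operations such as division by zero, but JSON has no representation for these values, and encoding/json returns an error when asked to marshal them.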

func SessionIDFromContext

func SessionIDFromContext(ctx context.Context) (string, bool)

SessionIDFromContext extracts the session ID from context, if present.

func WorkflowIDFromContext

func WorkflowIDFromContext(ctx context.Context) (string, bool)

WorkflowIDFromContext extracts the workflow ID from context, if present.

Types

type AnthropicLLMClient

type AnthropicLLMClient struct {
	// contains filtered or unexported fields
}

AnthropicLLMClient implements LLMClient using the Anthropic API.

func NewAnthropicLLMClient

func NewAnthropicLLMClient(model anthropic.Model, maxTokens int64) *AnthropicLLMClient

NewAnthropicLLMClient creates a new Anthropic-based LLM client.

func NewAnthropicLLMClientWithName

func NewAnthropicLLMClientWithName(model anthropic.Model, maxTokens int64, name string) *AnthropicLLMClient

NewAnthropicLLMClientWithName creates a new Anthropic-based LLM client with a custom name for logging.

func (*AnthropicLLMClient) Complete

func (c *AnthropicLLMClient) Complete(ctx context.Context, systemPrompt, userPrompt string, opts ...CompleteOption) (string, error)

Complete sends a prompt to Claude and returns the response text.

func (*AnthropicLLMClient) CompleteWithTools

func (c *AnthropicLLMClient) CompleteWithTools(
	ctx context.Context,
	systemPrompt string,
	messages []ToolMessage,
	tools []ToolDefinition,
	opts ...CompleteOption,
) (*ToolLLMResponse, error)

CompleteWithTools sends a request with tools and returns a response that may contain tool calls. It implements the ToolLLMClient interface.

type Classification

type Classification string

Classification represents the type of question being asked.

const (
	ClassificationDataAnalysis   Classification = "data_analysis"
	ClassificationConversational Classification = "conversational"
	ClassificationOutOfScope     Classification = "out_of_scope"
)

type ClassifyResult

type ClassifyResult struct {
	Classification Classification `json:"classification"`
	Reasoning      string         `json:"reasoning"`
	DirectResponse string         `json:"direct_response,omitempty"`
}

ClassifyResult holds the result of question classification.

type CompleteOption

type CompleteOption func(*CompleteOptions)

CompleteOption is a functional option for Complete.

func WithCacheControl

func WithCacheControl() CompleteOption

WithCacheControl enables prompt caching for the system prompt. It marks the system prompt as cacheable, which reduces cost for repeated calls that share the same system prompt prefix.

type CompleteOptions

type CompleteOptions struct {
	CacheSystemPrompt bool // Enable prompt caching for the system prompt
}

CompleteOptions holds options for LLM completion.
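CompleteOption follows Go's functional-options pattern. The sketch below shows how an implementation of Complete would typically fold the variadic options into a CompleteOptions value. The body of WithCacheControl and the applyOptions helper are assumptions for illustration, not the package source.

```go
package main

import "fmt"

// Local copies of the documented types.
type CompleteOptions struct {
	CacheSystemPrompt bool
}

type CompleteOption func(*CompleteOptions)

// WithCacheControl, sketched: it flips the documented CacheSystemPrompt flag.
func WithCacheControl() CompleteOption {
	return func(o *CompleteOptions) { o.CacheSystemPrompt = true }
}

// applyOptions is hypothetical: it starts from zero values and applies
// each option in order, as a Complete implementation might.
func applyOptions(opts ...CompleteOption) CompleteOptions {
	var o CompleteOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(applyOptions().CacheSystemPrompt)                   // false
	fmt.Println(applyOptions(WithCacheControl()).CacheSystemPrompt) // true
}
```

Because options are plain functions, new settings can be added later without changing the Complete signature.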

type Config

type Config struct {
	Logger        *slog.Logger
	LLM           LLMClient
	FollowUpLLM   LLMClient // Optional LLM for generating follow-up questions (defaults to LLM if nil)
	Querier       Querier
	SchemaFetcher SchemaFetcher
	Prompts       PromptsProvider
	MaxTokens     int64
	MaxRetries    int    // Max retries for failed queries (default 5)
	FormatContext string // Optional formatting context to append to synthesize/respond prompts (e.g., Slack formatting guidelines)
	EnvContext    string // Optional environment context to prepend to system prompt (e.g., "You are querying the devnet environment.")

	// Graph database support (optional)
	GraphQuerier       Querier       // Optional Neo4j querier for execute_cypher tool
	GraphSchemaFetcher SchemaFetcher // Optional Neo4j schema fetcher
}

Config holds the configuration for the workflow.
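Two Config fields document defaults: FollowUpLLM falls back to LLM when nil, and MaxRetries defaults to 5. A minimal sketch of applying those defaults follows. withDefaults and echoLLM are hypothetical, and treating a MaxRetries value of 0 or less as "unset" is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed copies of the documented types (only what this sketch uses).
type CompleteOptions struct{ CacheSystemPrompt bool }
type CompleteOption func(*CompleteOptions)

type LLMClient interface {
	Complete(ctx context.Context, systemPrompt, userPrompt string, opts ...CompleteOption) (string, error)
}

type Config struct {
	LLM         LLMClient
	FollowUpLLM LLMClient
	MaxRetries  int
}

// withDefaults is hypothetical: it fills in the defaults the field
// comments describe. MaxRetries <= 0 counting as unset is an assumption.
func withDefaults(cfg Config) Config {
	if cfg.FollowUpLLM == nil {
		cfg.FollowUpLLM = cfg.LLM
	}
	if cfg.MaxRetries <= 0 {
		cfg.MaxRetries = 5
	}
	return cfg
}

// echoLLM is a trivial LLMClient used only to populate the config.
type echoLLM struct{}

func (echoLLM) Complete(_ context.Context, _, userPrompt string, _ ...CompleteOption) (string, error) {
	return userPrompt, nil
}

func main() {
	cfg := withDefaults(Config{LLM: echoLLM{}})
	fmt.Println(cfg.MaxRetries)             // 5
	fmt.Println(cfg.FollowUpLLM == cfg.LLM) // true
}
```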

type ConversationMessage

type ConversationMessage struct {
	Role            string // "user" or "assistant"
	Content         string
	ExecutedQueries []string // SQL queries executed in this turn (assistant only)
}

ConversationMessage represents a message in conversation history.

type DataQuestion

type DataQuestion struct {
	Question  string // The data question in natural language
	Rationale string // Why this question helps answer the user's query
}

DataQuestion represents a single data question to be answered.

type ExecutedQuery

type ExecutedQuery struct {
	GeneratedQuery GeneratedQuery
	Result         QueryResult
}

ExecutedQuery represents an executed query with results.

type GeneratedQuery

type GeneratedQuery struct {
	DataQuestion DataQuestion
	SQL          string // The SQL query text (empty for Cypher queries)
	Cypher       string // The Cypher query text (empty for SQL queries)
	Explanation  string // Brief explanation of what the query does
}

GeneratedQuery represents a query generated for a data question.

func (GeneratedQuery) IsCypher

func (q GeneratedQuery) IsCypher() bool

IsCypher reports whether q is a Cypher query.

func (GeneratedQuery) QueryText

func (q GeneratedQuery) QueryText() string

QueryText returns the query text, whether it is SQL or Cypher.
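The field comments say SQL is empty for Cypher queries and Cypher is empty for SQL queries, which suggests how IsCypher and QueryText can dispatch. The sketch below is inferred from those comments, not from the package source. The table and label names in main are hypothetical.

```go
package main

import "fmt"

type DataQuestion struct {
	Question  string
	Rationale string
}

// GeneratedQuery is a local copy of the documented type.
type GeneratedQuery struct {
	DataQuestion DataQuestion
	SQL          string
	Cypher       string
	Explanation  string
}

// IsCypher, assumed: a query is Cypher when its Cypher text is set.
func (q GeneratedQuery) IsCypher() bool { return q.Cypher != "" }

// QueryText, assumed: return whichever text field applies.
func (q GeneratedQuery) QueryText() string {
	if q.IsCypher() {
		return q.Cypher
	}
	return q.SQL
}

func main() {
	queries := []GeneratedQuery{
		{SQL: "SELECT count() FROM validators"},           // hypothetical table
		{Cypher: "MATCH (d:Device) RETURN count(d)"}, // hypothetical label
	}
	for _, q := range queries {
		fmt.Println(q.IsCypher(), q.QueryText())
	}
}
```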

type HTTPQuerier

type HTTPQuerier struct {
	// contains filtered or unexported fields
}

HTTPQuerier implements Querier using HTTP calls to ClickHouse.

func NewHTTPQuerier

func NewHTTPQuerier(clickhouseURL string) *HTTPQuerier

NewHTTPQuerier creates a new HTTP-based querier.

func (*HTTPQuerier) Query

func (q *HTTPQuerier) Query(ctx context.Context, sql string) (QueryResult, error)

Query executes a SQL query and returns the result.

type HTTPSchemaFetcher

type HTTPSchemaFetcher struct {
	ClickhouseURL string
	Database      string // defaults to "default" if empty
	Username      string // optional
	Password      string // optional
}

HTTPSchemaFetcher fetches schema from ClickHouse via HTTP.

func NewHTTPSchemaFetcher

func NewHTTPSchemaFetcher(clickhouseURL string) *HTTPSchemaFetcher

NewHTTPSchemaFetcher creates a new HTTPSchemaFetcher.

func NewHTTPSchemaFetcherWithAuth

func NewHTTPSchemaFetcherWithAuth(clickhouseURL, database, username, password string) *HTTPSchemaFetcher

NewHTTPSchemaFetcherWithAuth creates a new HTTPSchemaFetcher with authentication.

func (*HTTPSchemaFetcher) FetchSchema

func (f *HTTPSchemaFetcher) FetchSchema(ctx context.Context) (string, error)

FetchSchema retrieves table columns and view definitions from ClickHouse.

type LLMClient

type LLMClient interface {
	// Complete sends a prompt and returns the response text.
	// Options can be passed to control caching behavior.
	Complete(ctx context.Context, systemPrompt, userPrompt string, opts ...CompleteOption) (string, error)
}

LLMClient is the interface for interacting with an LLM.

type Progress

type Progress struct {
	Stage          ProgressStage
	Classification Classification // Set after classifying
	DataQuestions  []DataQuestion // Set after decomposing
	QueriesTotal   int            // Total queries to execute
	QueriesDone    int            // Queries completed so far
	Error          error          // Set if an error occurred

	// v3 fields
	ThinkingContent string // For StageThinking: the thinking content

	// SQL tool call fields
	SQLQuestion string // For StageSQLStarted/StageSQLComplete: the data question
	SQL         string // For StageSQLStarted/StageSQLComplete: the SQL query
	SQLRows     int    // For StageSQLComplete: row count
	SQLError    string // For StageSQLComplete: error if failed

	// Cypher tool call fields
	CypherQuestion string // For StageCypherStarted/StageCypherComplete: the data question
	Cypher         string // For StageCypherStarted/StageCypherComplete: the Cypher query
	CypherRows     int    // For StageCypherComplete: row count
	CypherError    string // For StageCypherComplete: error if failed

	// ReadDocs tool call fields
	DocsPage    string // For StageReadDocsStarted/StageReadDocsComplete: page name
	DocsContent string // For StageReadDocsComplete: content (truncated for progress)
	DocsError   string // For StageReadDocsComplete: error if failed

	// Legacy fields (for backwards compatibility)
	QueryQuestion string // For StageQueryStarted/StageQueryComplete: the query question
	QuerySQL      string // For StageQueryStarted/StageQueryComplete: the SQL
	QueryRows     int    // For StageQueryComplete: row count
	QueryError    string // For StageQueryComplete: error if failed
}

Progress represents the current state of workflow execution.

type ProgressCallback

type ProgressCallback func(Progress)

ProgressCallback is called at each stage of workflow execution.

type ProgressStage

type ProgressStage string

ProgressStage represents a stage in the workflow execution.

const (
	// v1 stages
	StageClassifying  ProgressStage = "classifying"
	StageDecomposing  ProgressStage = "decomposing"
	StageDecomposed   ProgressStage = "decomposed"
	StageExecuting    ProgressStage = "executing"
	StageSynthesizing ProgressStage = "synthesizing"
	StageComplete     ProgressStage = "complete"
	StageError        ProgressStage = "error"

	// v2 stages
	StageInterpreting ProgressStage = "interpreting"
	StageMapping      ProgressStage = "mapping"
	StagePlanning     ProgressStage = "planning"
	StageInspecting   ProgressStage = "inspecting"

	// v3 stages
	StageThinking ProgressStage = "thinking" // Model is reasoning

	// Tool call stages - SQL
	StageSQLStarted  ProgressStage = "sql_started" // SQL query started
	StageSQLComplete ProgressStage = "sql_done"    // SQL query completed

	// Tool call stages - Cypher
	StageCypherStarted  ProgressStage = "cypher_started" // Cypher query started
	StageCypherComplete ProgressStage = "cypher_done"    // Cypher query completed

	// Tool call stages - ReadDocs
	StageReadDocsStarted  ProgressStage = "read_docs_started" // Reading docs started
	StageReadDocsComplete ProgressStage = "read_docs_done"    // Reading docs completed

	// Legacy v3 stages (for backwards compatibility during transition)
	StageQueryStarted  ProgressStage = "query_started" // Individual query started
	StageQueryComplete ProgressStage = "query_done"    // Individual query completed
)
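The Progress field comments tie each group of fields to specific stages, so a ProgressCallback typically switches on Stage. The sketch below turns updates into status lines, as a chat front end might. describe and its wording are hypothetical, and the types are trimmed copies. A default branch is included because different workflow versions emit different stages.

```go
package main

import "fmt"

type ProgressStage string

const (
	StageClassifying ProgressStage = "classifying"
	StageSQLStarted  ProgressStage = "sql_started"
	StageSQLComplete ProgressStage = "sql_done"
	StageComplete    ProgressStage = "complete"
	StageError       ProgressStage = "error"
)

// Trimmed copy of Progress with only the fields this sketch reads.
type Progress struct {
	Stage       ProgressStage
	SQLQuestion string
	SQLRows     int
	SQLError    string
	Error       error
}

type ProgressCallback func(Progress)

// describe maps an update to a one-line status message (hypothetical wording).
func describe(p Progress) string {
	switch p.Stage {
	case StageClassifying:
		return "Classifying question..."
	case StageSQLStarted:
		return "Running query: " + p.SQLQuestion
	case StageSQLComplete:
		if p.SQLError != "" {
			return "Query failed: " + p.SQLError
		}
		return fmt.Sprintf("Query returned %d rows", p.SQLRows)
	case StageError:
		return fmt.Sprintf("Error: %v", p.Error)
	case StageComplete:
		return "Done"
	default:
		return string(p.Stage) // stages from other versions pass through
	}
}

func main() {
	var lines []string
	var onProgress ProgressCallback = func(p Progress) { lines = append(lines, describe(p)) }

	// Simulated updates; a Runner would call onProgress from RunWithProgress.
	onProgress(Progress{Stage: StageSQLStarted, SQLQuestion: "validator count"})
	onProgress(Progress{Stage: StageSQLComplete, SQLRows: 3})
	onProgress(Progress{Stage: StageComplete})
	for _, l := range lines {
		fmt.Println(l)
	}
}
```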

type PromptsProvider

type PromptsProvider interface {
	// GetPrompt returns the prompt content for the given name.
	GetPrompt(name string) string
}

PromptsProvider provides access to prompt templates.

type Querier

type Querier interface {
	// Query executes a SQL query and returns formatted results.
	Query(ctx context.Context, sql string) (QueryResult, error)
}

Querier executes SQL queries.

type QueryResult

type QueryResult struct {
	SQL       string // The SQL query text (empty for Cypher queries)
	Cypher    string // The Cypher query text (empty for SQL queries)
	Columns   []string
	Rows      []map[string]any
	Count     int
	Error     string
	Formatted string // Human-readable formatted result
}

QueryResult holds the result of a query execution.

func (QueryResult) QueryText

func (r QueryResult) QueryText() string

QueryText returns the query text, whether it is SQL or Cypher.

type Runner

type Runner interface {
	// Run executes the workflow for a user question.
	Run(ctx context.Context, userQuestion string) (*WorkflowResult, error)

	// RunWithHistory executes the workflow with conversation context.
	RunWithHistory(ctx context.Context, userQuestion string, history []ConversationMessage) (*WorkflowResult, error)

	// RunWithProgress executes the workflow with progress callbacks for streaming updates.
	RunWithProgress(ctx context.Context, userQuestion string, history []ConversationMessage, onProgress ProgressCallback) (*WorkflowResult, error)
}

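Runner is the interface for workflow implementations. Its methods offer increasing levels of control: Run takes only the question, RunWithHistory adds conversation history, and RunWithProgress adds a progress callback for streaming updates.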

type SchemaFetcher

type SchemaFetcher interface {
	// FetchSchema returns a formatted string describing the database schema.
	FetchSchema(ctx context.Context) (string, error)
}

SchemaFetcher retrieves database schema information.

type ToolCallInfo

type ToolCallInfo struct {
	ID         string
	Name       string
	Parameters map[string]any
}

ToolCallInfo represents a tool invocation from the LLM.

type ToolContentBlock

type ToolContentBlock struct {
	Type      string         `json:"type"` // "text", "tool_use", "tool_result"
	Text      string         `json:"text,omitempty"`
	ID        string         `json:"id,omitempty"`          // For tool_use
	Name      string         `json:"name,omitempty"`        // For tool_use
	Input     map[string]any `json:"input,omitempty"`       // For tool_use
	ToolUseID string         `json:"tool_use_id,omitempty"` // For tool_result
	Content   string         `json:"content,omitempty"`     // For tool_result
	IsError   bool           `json:"is_error,omitempty"`    // For tool_result
}

ToolContentBlock represents a block of content in a tool message.

type ToolDefinition

type ToolDefinition struct {
	Name        string `json:"name"`
	Description string `json:"description"`
	InputSchema any    `json:"input_schema"` // JSON Schema for parameters
}

ToolDefinition represents a tool that can be called by the LLM.

type ToolLLMClient

type ToolLLMClient interface {
	LLMClient

	// CompleteWithTools sends a request with tools and returns a response that may contain tool calls.
	// The systemPrompt is separate to enable prompt caching.
	// Returns the response (which carries the input and output token counts) and any error.
	CompleteWithTools(
		ctx context.Context,
		systemPrompt string,
		messages []ToolMessage,
		tools []ToolDefinition,
		opts ...CompleteOption,
	) (*ToolLLMResponse, error)
}

ToolLLMClient extends LLMClient with tool-calling capabilities. It is used by the v3 workflow for agentic tool loops.

type ToolLLMResponse

type ToolLLMResponse struct {
	StopReason   string             // e.g. "end_turn" or "tool_use"
	Content      []ToolContentBlock // May include both text and tool_use blocks
	InputTokens  int
	OutputTokens int
}

ToolLLMResponse represents the response from a tool-calling LLM request.

func (*ToolLLMResponse) HasToolCalls

func (r *ToolLLMResponse) HasToolCalls() bool

HasToolCalls reports whether the response contains tool calls. It inspects the content blocks rather than StopReason because the API can sometimes return tool_use blocks with stop_reason="end_turn".

func (*ToolLLMResponse) Text

func (r *ToolLLMResponse) Text() string

Text extracts text content from the response.

func (*ToolLLMResponse) ToolCalls

func (r *ToolLLMResponse) ToolCalls() []ToolCallInfo

ToolCalls extracts tool calls from the response.
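A sketch of the three ToolLLMResponse methods, following the documented rule that tool calls are detected from the content blocks rather than from StopReason. The method bodies are assumptions, as is joining multiple text blocks with a newline. The types are trimmed copies without JSON tags, and the tool name execute_sql is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

type ToolContentBlock struct {
	Type  string
	Text  string
	ID    string
	Name  string
	Input map[string]any
}

type ToolCallInfo struct {
	ID         string
	Name       string
	Parameters map[string]any
}

type ToolLLMResponse struct {
	StopReason string
	Content    []ToolContentBlock
}

// ToolCalls collects every tool_use block, whatever StopReason says.
func (r *ToolLLMResponse) ToolCalls() []ToolCallInfo {
	var calls []ToolCallInfo
	for _, b := range r.Content {
		if b.Type == "tool_use" {
			calls = append(calls, ToolCallInfo{ID: b.ID, Name: b.Name, Parameters: b.Input})
		}
	}
	return calls
}

func (r *ToolLLMResponse) HasToolCalls() bool { return len(r.ToolCalls()) > 0 }

// Text concatenates the text blocks; the newline separator is an assumption.
func (r *ToolLLMResponse) Text() string {
	var parts []string
	for _, b := range r.Content {
		if b.Type == "text" {
			parts = append(parts, b.Text)
		}
	}
	return strings.Join(parts, "\n")
}

func main() {
	// stop_reason says end_turn, yet a tool_use block is present.
	resp := &ToolLLMResponse{
		StopReason: "end_turn",
		Content: []ToolContentBlock{
			{Type: "text", Text: "Let me check."},
			{Type: "tool_use", ID: "toolu_1", Name: "execute_sql", Input: map[string]any{"sql": "SELECT 1"}},
		},
	}
	fmt.Println(resp.HasToolCalls(), resp.ToolCalls()[0].Name, resp.Text()) // true execute_sql Let me check.
}
```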

type ToolMessage

type ToolMessage struct {
	Role    string             `json:"role"` // "user" or "assistant"
	Content []ToolContentBlock `json:"content"`
}

ToolMessage represents a message in a tool-calling conversation.

type WorkflowResult

type WorkflowResult struct {
	// Input
	UserQuestion string

	// Pre-step: Classification
	Classification Classification // How the question was classified

	// Step 1: Decomposition (only for data_analysis)
	DataQuestions []DataQuestion

	// Step 2: Generation (only for data_analysis)
	GeneratedQueries []GeneratedQuery

	// Step 3: Execution (only for data_analysis)
	ExecutedQueries []ExecutedQuery

	// Step 4: Synthesis / Response
	Answer string

	// Step 5: Follow-up suggestions (optional)
	FollowUpQuestions []string
}

WorkflowResult holds the complete result of running the workflow.

Directories

Path Synopsis
v3
