agentpg

package module
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2026 License: MPL-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxConcurrentRuns          = 10
	DefaultMaxConcurrentStreamingRuns = 5 // Lower because streaming holds connections
	DefaultMaxConcurrentTools         = 50
	DefaultBatchPollInterval          = 30 * time.Second
	DefaultRunPollInterval            = 1 * time.Second
	DefaultToolPollInterval           = 500 * time.Millisecond
	DefaultHeartbeatInterval          = 15 * time.Second
	DefaultLeaderTTL                  = 30 * time.Second
	DefaultStuckRunTimeout            = 5 * time.Minute
	DefaultInstanceTTL                = 2 * time.Minute
	DefaultCleanupInterval            = 1 * time.Minute
	DefaultMaxToolRetries             = 3

	// Default tool retry configuration
	DefaultToolRetryMaxAttempts = 2   // Fast default: 2 attempts total (1 retry)
	DefaultToolRetryJitter      = 0.0 // No jitter by default for instant retry

	// Default run rescue configuration
	DefaultRescueInterval    = 1 * time.Minute
	DefaultRescueTimeout     = 5 * time.Minute // Should match StuckRunTimeout
	DefaultMaxRescueAttempts = 3
)

Default configuration values.

View Source
const (
	ContentTypeText            = "text"
	ContentTypeToolUse         = "tool_use"
	ContentTypeToolResult      = "tool_result"
	ContentTypeImage           = "image"
	ContentTypeDocument        = "document"
	ContentTypeThinking        = "thinking"
	ContentTypeServerToolUse   = "server_tool_use"
	ContentTypeWebSearchResult = "web_search_result"
)

ContentType constants aligned with Claude API and database schema (agentpg_content_type enum).

View Source
const (
	TriggerTypeUserPrompt   = "user_prompt"
	TriggerTypeToolResults  = "tool_results"
	TriggerTypeContinuation = "continuation"
)

TriggerType constants for iteration triggers.

View Source
const (
	ChannelRunCreated    = "agentpg_run_created"
	ChannelRunState      = "agentpg_run_state"
	ChannelRunFinalized  = "agentpg_run_finalized"
	ChannelToolPending   = "agentpg_tool_pending"
	ChannelToolsComplete = "agentpg_tools_complete"
)

LISTEN/NOTIFY channel names.

Variables

View Source
var (
	// Configuration errors
	ErrInvalidConfig = errors.New("invalid configuration")

	// Resource not found errors
	ErrSessionNotFound       = errors.New("session not found")
	ErrRunNotFound           = errors.New("run not found")
	ErrAgentNotFound         = errors.New("agent not found")
	ErrToolNotFound          = errors.New("tool not found")
	ErrIterationNotFound     = errors.New("iteration not found")
	ErrToolExecutionNotFound = errors.New("tool execution not found")

	// Registration errors
	ErrAgentNotRegistered = errors.New("agent not registered on this client")
	ErrToolNotRegistered  = errors.New("tool not registered on this client")

	// Client lifecycle errors
	ErrClientNotStarted     = errors.New("client not started")
	ErrClientAlreadyStarted = errors.New("client already started")
	ErrClientStopping       = errors.New("client is stopping")

	// State errors
	ErrInvalidStateTransition = errors.New("invalid state transition")
	ErrRunAlreadyFinalized    = errors.New("run already finalized")
	ErrRunCancelled           = errors.New("run was cancelled")
	ErrRunNotCancellable      = errors.New("run is not in a cancellable state")

	// Tool errors
	ErrInvalidToolSchema   = errors.New("invalid tool schema")
	ErrToolExecutionFailed = errors.New("tool execution failed")

	// Batch API errors
	ErrBatchAPIError = errors.New("claude batch API error")
	ErrBatchExpired  = errors.New("batch expired")
	ErrBatchFailed   = errors.New("batch processing failed")

	// Storage errors
	ErrStorageError = errors.New("storage operation failed")

	// Instance errors
	ErrInstanceDisconnected = errors.New("instance disconnected")
	ErrInstanceNotFound     = errors.New("instance not found")

	// Compaction errors
	ErrCompactionFailed = errors.New("context compaction failed")
)

Sentinel errors for AgentPG operations.

Functions

func Deref added in v0.1.4

func Deref[T any](p *T) T

Deref returns the value pointed to by p, or the zero value if p is nil.

func DerefOr added in v0.1.4

func DerefOr[T any](p *T, def T) T

DerefOr returns the value pointed to by p, or the default value if p is nil.

func Ptr added in v0.1.4

func Ptr[T any](v T) *T

Ptr returns a pointer to the given value.

func WrapError added in v0.1.4

func WrapError(op string, err error) error

WrapError wraps an error with operation context. If err is nil, returns nil.

Types

type AgentDefinition added in v0.1.4

type AgentDefinition struct {
	// ID is the unique identifier (UUID primary key).
	ID uuid.UUID `json:"id,omitempty"`

	// Name is the human-readable identifier.
	Name string `json:"name"`

	// Description is shown when agent is used as a tool.
	Description string `json:"description,omitempty"`

	// Model is the Claude model ID (required), e.g., "claude-sonnet-4-5-20250929".
	Model string `json:"model"`

	// SystemPrompt defines the agent's behavior.
	SystemPrompt string `json:"system_prompt,omitempty"`

	// Tools is the list of tool names this agent can use.
	// Only tools listed here will be available to the agent.
	// Must reference tools registered via client.RegisterTool().
	Tools []string `json:"tools,omitempty"`

	// AgentIDs is the list of agent IDs this agent can delegate to.
	// Listed agents become available as tools to this agent.
	// Enables multi-level agent hierarchies (PM -> Lead -> Worker pattern).
	AgentIDs []uuid.UUID `json:"agent_ids,omitempty"`

	// MaxTokens limits response length.
	MaxTokens *int `json:"max_tokens,omitempty"`

	// Temperature controls randomness (0.0 to 1.0).
	Temperature *float64 `json:"temperature,omitempty"`

	// TopK limits token selection.
	TopK *int `json:"top_k,omitempty"`

	// TopP (nucleus sampling) limits cumulative probability.
	TopP *float64 `json:"top_p,omitempty"`

	// Metadata for multi-tenancy and filtering (tenant_id, user_id, etc.).
	Metadata map[string]any `json:"metadata,omitempty"`

	// Config holds additional settings as JSON.
	Config map[string]any `json:"config,omitempty"`

	// Timestamps (populated from database)
	CreatedAt time.Time `json:"created_at,omitempty"`
	UpdatedAt time.Time `json:"updated_at,omitempty"`
}

AgentDefinition defines an agent's configuration.

func (*AgentDefinition) ToolNames added in v0.1.4

func (a *AgentDefinition) ToolNames() []string

ToolNames returns all regular tool names available to this agent. For agent-as-tool delegation, use AgentIDs and look up agents separately.

type AgentError

type AgentError struct {
	// Op is the operation that failed (e.g., "Run", "NewSession", "ExecuteTool")
	Op string

	// Err is the underlying error
	Err error

	// SessionID is the session ID if applicable
	SessionID string

	// RunID is the run ID if applicable
	RunID string

	// Context holds additional key-value pairs for debugging
	Context map[string]any
}

AgentError provides structured error context for AgentPG operations. It wraps an underlying error with additional context including operation name, session/run IDs, and arbitrary key-value context.

func NewAgentError

func NewAgentError(op string, err error) *AgentError

NewAgentError creates a new AgentError with the given operation and underlying error.

func (*AgentError) Error

func (e *AgentError) Error() string

Error returns a formatted error message.

func (*AgentError) Unwrap

func (e *AgentError) Unwrap() error

Unwrap returns the underlying error for errors.Is/errors.As support.

func (*AgentError) WithContext

func (e *AgentError) WithContext(key string, value any) *AgentError

WithContext adds a key-value pair to the error context and returns the error for chaining.

func (*AgentError) WithRun added in v0.1.4

func (e *AgentError) WithRun(runID string) *AgentError

WithRun sets the run ID on the error and returns the error for chaining.

func (*AgentError) WithSession added in v0.1.4

func (e *AgentError) WithSession(sessionID string) *AgentError

WithSession sets the session ID on the error and returns the error for chaining.

type BatchStatus added in v0.1.4

type BatchStatus string

BatchStatus represents the processing status of a Claude Batch API request (mirrors agentpg_batch_status enum).

const (
	BatchStatusInProgress BatchStatus = "in_progress"
	BatchStatusCanceling  BatchStatus = "canceling"
	BatchStatusEnded      BatchStatus = "ended"
)

func (BatchStatus) String added in v0.1.4

func (s BatchStatus) String() string

String returns the string representation of the batch status.

type Client added in v0.1.4

type Client[TTx any] struct {
	// contains filtered or unexported fields
}

Client is the main AgentPG client that orchestrates agents, tools, and workers. The TTx type parameter represents the native transaction type for the driver (e.g., pgx.Tx for pgxv5, *sql.Tx for database/sql).

func NewClient added in v0.1.4

func NewClient[TTx any](drv driver.Driver[TTx], config *ClientConfig) (*Client[TTx], error)

NewClient creates a new AgentPG client with the given driver and configuration. Agents and tools must be registered before calling Start().

func (*Client[TTx]) CancelRun added in v0.3.1

func (c *Client[TTx]) CancelRun(ctx context.Context, runID uuid.UUID) error

CancelRun cancels a running or pending run and all its child runs. If the run is already in a terminal state, this is a no-op (idempotent). Any in-progress batch API requests will be cancelled asynchronously. WaitForRun callers will be unblocked with a cancelled error.

func (*Client[TTx]) Compact added in v0.1.4

func (c *Client[TTx]) Compact(ctx context.Context, sessionID uuid.UUID) (*compaction.Result, error)

Compact performs context compaction on the specified session. This replaces older messages with a structured summary to reduce context size while preserving essential information.

Use NeedsCompaction to check if compaction is needed before calling this method.

func (*Client[TTx]) CompactIfNeeded added in v0.1.4

func (c *Client[TTx]) CompactIfNeeded(ctx context.Context, sessionID uuid.UUID) (*compaction.Result, error)

CompactIfNeeded performs compaction only if the session exceeds the trigger threshold. Returns nil result if compaction was not needed. This is useful for automatic compaction after runs complete.

func (*Client[TTx]) CompactWithConfig added in v0.1.4

func (c *Client[TTx]) CompactWithConfig(ctx context.Context, sessionID uuid.UUID, cfg *compaction.Config) (*compaction.Result, error)

CompactWithConfig performs context compaction with a custom configuration. This allows overriding the default compaction settings for a specific operation.

func (*Client[TTx]) Config added in v0.1.4

func (c *Client[TTx]) Config() *ClientConfig

Config returns the client configuration.

func (*Client[TTx]) CreateAgent added in v0.1.4

func (c *Client[TTx]) CreateAgent(ctx context.Context, def *AgentDefinition) (*AgentDefinition, error)

CreateAgent creates a new agent in the database. Returns the created agent with its ID populated.

func (*Client[TTx]) DeleteAgent added in v0.1.4

func (c *Client[TTx]) DeleteAgent(ctx context.Context, id uuid.UUID) error

DeleteAgent removes an agent from the database.

func (*Client[TTx]) GetAgentByID added in v0.1.4

func (c *Client[TTx]) GetAgentByID(ctx context.Context, id uuid.UUID) (*AgentDefinition, error)

GetAgentByID retrieves an agent definition by ID from the database.

func (*Client[TTx]) GetAgentByName added in v0.1.4

func (c *Client[TTx]) GetAgentByName(ctx context.Context, name string, metadata map[string]any) (*AgentDefinition, error)

GetAgentByName retrieves an agent definition by name and optional metadata from the database.

func (*Client[TTx]) GetAllToolNames added in v0.3.3

func (c *Client[TTx]) GetAllToolNames() []string

GetAllToolNames returns the names of all registered tools.

func (*Client[TTx]) GetCompactionStats added in v0.1.4

func (c *Client[TTx]) GetCompactionStats(ctx context.Context, sessionID uuid.UUID) (*compaction.Stats, error)

GetCompactionStats returns statistics about a session's compaction state. This includes token counts, usage percentage, and whether compaction is needed.

func (*Client[TTx]) GetIterationToolCalls added in v0.3.4

func (c *Client[TTx]) GetIterationToolCalls(ctx context.Context, iterationID uuid.UUID) ([]ToolCall, error)

GetIterationToolCalls returns tool calls for a specific iteration.

func (*Client[TTx]) GetOrCreateAgent added in v0.1.4

func (c *Client[TTx]) GetOrCreateAgent(ctx context.Context, def *AgentDefinition) (*AgentDefinition, error)

GetOrCreateAgent returns an existing agent or creates a new one if it doesn't exist. Matching is done by name and metadata (agents with the same name but different metadata are distinct). If the agent exists, the existing agent is returned (the definition is NOT updated). If you need to update an existing agent, use UpdateAgent instead.

func (*Client[TTx]) GetRun added in v0.1.4

func (c *Client[TTx]) GetRun(ctx context.Context, id uuid.UUID) (*Run, error)

GetRun retrieves a run by ID.

func (*Client[TTx]) GetRunToolCalls added in v0.3.4

func (c *Client[TTx]) GetRunToolCalls(ctx context.Context, runID uuid.UUID) ([]ToolCall, error)

GetRunToolCalls returns all tool calls for a run, ordered by creation time.

func (*Client[TTx]) GetSession added in v0.1.4

func (c *Client[TTx]) GetSession(ctx context.Context, id uuid.UUID) (*Session, error)

GetSession retrieves a session by ID.

func (*Client[TTx]) GetTool added in v0.1.4

func (c *Client[TTx]) GetTool(name string) tool.Tool

GetTool returns the registered tool by name.

func (*Client[TTx]) InstanceID added in v0.1.4

func (c *Client[TTx]) InstanceID() string

InstanceID returns the unique identifier for this client instance.

func (*Client[TTx]) ListAgents added in v0.1.4

func (c *Client[TTx]) ListAgents(ctx context.Context, metadata map[string]any, limit, offset int) ([]*AgentDefinition, int, error)

ListAgents returns agents from the database with optional filtering.

func (*Client[TTx]) NeedsCompaction added in v0.1.4

func (c *Client[TTx]) NeedsCompaction(ctx context.Context, sessionID uuid.UUID) (bool, error)

NeedsCompaction checks if a session needs compaction based on token usage. Returns true if the session's context exceeds the configured trigger threshold.

func (*Client[TTx]) NewSession added in v0.1.4

func (c *Client[TTx]) NewSession(ctx context.Context, parentSessionID *uuid.UUID, metadata map[string]any) (uuid.UUID, error)

NewSession creates a new conversation session. App-specific fields (tenant_id, user_id, etc.) should be stored in metadata.

func (*Client[TTx]) NewSessionTx added in v0.1.4

func (c *Client[TTx]) NewSessionTx(ctx context.Context, tx TTx, parentSessionID *uuid.UUID, metadata map[string]any) (uuid.UUID, error)

NewSessionTx creates a new conversation session within a transaction. App-specific fields (tenant_id, user_id, etc.) should be stored in metadata.

func (*Client[TTx]) RegenerateRun added in v0.3.1

func (c *Client[TTx]) RegenerateRun(ctx context.Context, runID uuid.UUID) (uuid.UUID, error)

RegenerateRun re-creates a run that was previously cancelled or failed. It deletes all messages from the original run and creates a new run with the same session, agent, prompt, mode, and options. The original run must be in a terminal state (completed, cancelled, or failed). Returns the new run ID.

func (*Client[TTx]) RegisterTool added in v0.1.4

func (c *Client[TTx]) RegisterTool(t tool.Tool) error

RegisterTool registers a tool with the client. Can be called before or after Start(). When called after Start(), the tool is also synced to the database immediately.

func (*Client[TTx]) Run added in v0.1.4

func (c *Client[TTx]) Run(ctx context.Context, sessionID uuid.UUID, agentID uuid.UUID, prompt string, opts *RunOptions) (uuid.UUID, error)

Run creates a new asynchronous agent run and returns immediately. Use WaitForRun to wait for completion. The agentID must reference an agent that exists in the database. Options can provide variables for tools and override/append to the agent's system prompt.

func (*Client[TTx]) RunFast added in v0.1.4

func (c *Client[TTx]) RunFast(ctx context.Context, sessionID uuid.UUID, agentID uuid.UUID, prompt string, opts *RunOptions) (uuid.UUID, error)

RunFast creates a new asynchronous agent run using the streaming API. This provides faster response times compared to the batch API. Use WaitForRun to wait for completion. The agentID must reference an agent that exists in the database. Options can provide variables for tools and override/append to the agent's system prompt.

func (*Client[TTx]) RunFastSync added in v0.1.4

func (c *Client[TTx]) RunFastSync(ctx context.Context, sessionID uuid.UUID, agentID uuid.UUID, prompt string, opts *RunOptions) (*Response, error)

RunFastSync creates a streaming run and waits for completion. This is a convenience wrapper around RunFast and WaitForRun. Note: Do not use RunFastSync inside a transaction as it will deadlock. Options can provide variables for tools and override/append to the agent's system prompt.

func (*Client[TTx]) RunFastTx added in v0.1.4

func (c *Client[TTx]) RunFastTx(ctx context.Context, tx TTx, sessionID uuid.UUID, agentID uuid.UUID, prompt string, opts *RunOptions) (uuid.UUID, error)

RunFastTx creates a new asynchronous agent run using the streaming API within a transaction. The run won't be visible to workers until the transaction commits. The agentID must reference an agent that exists in the database. Options can provide variables for tools and override/append to the agent's system prompt.

func (*Client[TTx]) RunSync added in v0.1.4

func (c *Client[TTx]) RunSync(ctx context.Context, sessionID uuid.UUID, agentID uuid.UUID, prompt string, opts *RunOptions) (*Response, error)

RunSync creates a run and waits for completion. This is a convenience wrapper around Run and WaitForRun. Note: Do not use RunSync inside a transaction as it will deadlock. Options can provide variables for tools and override/append to the agent's system prompt.

func (*Client[TTx]) RunTx added in v0.1.4

func (c *Client[TTx]) RunTx(ctx context.Context, tx TTx, sessionID uuid.UUID, agentID uuid.UUID, prompt string, opts *RunOptions) (uuid.UUID, error)

RunTx creates a new asynchronous agent run within a transaction. The run won't be visible to workers until the transaction commits. The agentID must reference an agent that exists in the database. Options can provide variables for tools and override/append to the agent's system prompt.

func (*Client[TTx]) Start added in v0.1.4

func (c *Client[TTx]) Start(ctx context.Context) error

Start initializes the client and begins background processing. This method: 1. Validates agent/tool references 2. Registers the instance in the database 3. Syncs agents and tools to the database 4. Starts background workers

func (*Client[TTx]) Stop added in v0.1.4

func (c *Client[TTx]) Stop(ctx context.Context) error

Stop gracefully shuts down the client.

func (*Client[TTx]) UpdateAgent added in v0.1.4

func (c *Client[TTx]) UpdateAgent(ctx context.Context, def *AgentDefinition) error

UpdateAgent updates an existing agent in the database.

func (*Client[TTx]) WaitForRun added in v0.1.4

func (c *Client[TTx]) WaitForRun(ctx context.Context, runID uuid.UUID) (*Response, error)

WaitForRun waits for a run to complete and returns the response.

type ClientConfig added in v0.1.4

type ClientConfig struct {
	// APIKey is the Anthropic API key (required).
	// Falls back to ANTHROPIC_API_KEY environment variable if not set.
	APIKey string

	// Name identifies this service instance for logging and debugging.
	// Defaults to hostname if not set.
	Name string

	// ID is the unique identifier for this client instance.
	// Defaults to a generated UUID if not set.
	// Must be unique across all running instances.
	ID string

	// MaxConcurrentRuns limits concurrent batch run processing.
	// Defaults to DefaultMaxConcurrentRuns (10).
	MaxConcurrentRuns int

	// MaxConcurrentStreamingRuns limits concurrent streaming run processing.
	// Streaming runs hold connections longer, so this is typically lower than MaxConcurrentRuns.
	// Defaults to DefaultMaxConcurrentStreamingRuns (5).
	MaxConcurrentStreamingRuns int

	// MaxConcurrentTools limits concurrent tool executions.
	// Defaults to DefaultMaxConcurrentTools (50).
	MaxConcurrentTools int

	// BatchPollInterval is how often to poll Claude Batch API for status.
	// Defaults to DefaultBatchPollInterval (30 seconds).
	BatchPollInterval time.Duration

	// RunPollInterval is the polling fallback interval for new runs.
	// Used when LISTEN/NOTIFY is unavailable.
	// Defaults to DefaultRunPollInterval (1 second).
	RunPollInterval time.Duration

	// ToolPollInterval is the polling interval for tool executions.
	// Used when LISTEN/NOTIFY is unavailable.
	// Defaults to DefaultToolPollInterval (500 milliseconds).
	ToolPollInterval time.Duration

	// HeartbeatInterval for instance liveness.
	// Defaults to DefaultHeartbeatInterval (15 seconds).
	HeartbeatInterval time.Duration

	// LeaderTTL is the leader election lease duration.
	// Defaults to DefaultLeaderTTL (30 seconds).
	LeaderTTL time.Duration

	// StuckRunTimeout marks runs as stuck after this duration without progress.
	// The leader will attempt to recover stuck runs.
	// Defaults to DefaultStuckRunTimeout (5 minutes).
	StuckRunTimeout time.Duration

	// InstanceTTL is how long an instance can go without heartbeat before cleanup.
	// Should be > 2x HeartbeatInterval.
	// Defaults to DefaultInstanceTTL (2 minutes).
	InstanceTTL time.Duration

	// CleanupInterval is how often to run cleanup jobs (stale instances, etc.).
	// Defaults to DefaultCleanupInterval (1 minute).
	CleanupInterval time.Duration

	// Logger for structured logging.
	// If nil, logs are discarded.
	Logger Logger

	// AutoCompactionEnabled enables automatic context compaction in workers.
	// When enabled, workers will check if compaction is needed after each run
	// completes and trigger compaction if the context exceeds the threshold.
	// Defaults to false (manual compaction only).
	AutoCompactionEnabled bool

	// CompactionConfig is the configuration for context compaction.
	// If nil, default compaction configuration is used.
	// Only used if AutoCompactionEnabled is true or when calling Compact() manually.
	CompactionConfig *compaction.Config

	// ToolRetryConfig configures tool execution retry behavior.
	// If nil, default retry configuration is used.
	ToolRetryConfig *ToolRetryConfig

	// RunRescueConfig configures run rescue behavior for stuck runs.
	// If nil, default rescue configuration is used.
	RunRescueConfig *RunRescueConfig
}

ClientConfig holds all configuration options for a Client.

func DefaultConfig added in v0.1.4

func DefaultConfig() *ClientConfig

DefaultConfig returns a new ClientConfig with all default values. Note: APIKey must still be set before use.

type CompactionEvent added in v0.1.4

type CompactionEvent struct {
	ID                  uuid.UUID   `json:"id"`
	SessionID           uuid.UUID   `json:"session_id"`
	Strategy            string      `json:"strategy"`
	OriginalTokens      int         `json:"original_tokens"`
	CompactedTokens     int         `json:"compacted_tokens"`
	MessagesRemoved     int         `json:"messages_removed"`
	SummaryContent      *string     `json:"summary_content,omitempty"`
	PreservedMessageIDs []uuid.UUID `json:"preserved_message_ids,omitempty"`
	ModelUsed           *string     `json:"model_used,omitempty"`
	DurationMS          *int64      `json:"duration_ms,omitempty"`
	CreatedAt           time.Time   `json:"created_at"`
}

CompactionEvent represents a context compaction operation.

type ContentBlock

type ContentBlock struct {
	Type string `json:"type"`

	// Text content (for ContentTypeText, ContentTypeThinking)
	Text string `json:"text,omitempty"`

	// Tool use fields (for ContentTypeToolUse, ContentTypeServerToolUse)
	ToolUseID string          `json:"tool_use_id,omitempty"`
	ToolName  string          `json:"tool_name,omitempty"`
	ToolInput json.RawMessage `json:"tool_input,omitempty"`

	// Tool result fields (for ContentTypeToolResult)
	ToolResultForUseID string `json:"tool_result_for_use_id,omitempty"`
	ToolContent        string `json:"tool_content,omitempty"`
	IsError            bool   `json:"is_error,omitempty"`

	// Media/document fields (for ContentTypeImage, ContentTypeDocument)
	Source json.RawMessage `json:"source,omitempty"`

	// Web search results (for ContentTypeWebSearchResult)
	SearchResults json.RawMessage `json:"search_results,omitempty"`

	Metadata map[string]any `json:"metadata,omitempty"`
}

ContentBlock represents a single content block within a message. Different fields are populated based on the Type.

type Instance added in v0.1.4

type Instance struct {
	ID                 string         `json:"id"`
	Name               string         `json:"name"`
	Hostname           *string        `json:"hostname,omitempty"`
	PID                *int           `json:"pid,omitempty"`
	Version            *string        `json:"version,omitempty"`
	MaxConcurrentRuns  int            `json:"max_concurrent_runs"`
	MaxConcurrentTools int            `json:"max_concurrent_tools"`
	ActiveRunCount     int            `json:"active_run_count"`
	ActiveToolCount    int            `json:"active_tool_count"`
	Metadata           map[string]any `json:"metadata,omitempty"`
	CreatedAt          time.Time      `json:"created_at"`
	LastHeartbeatAt    time.Time      `json:"last_heartbeat_at"`
}

Instance represents a running worker instance.

type Iteration added in v0.1.4

type Iteration struct {
	ID              uuid.UUID `json:"id"`
	RunID           uuid.UUID `json:"run_id"`
	IterationNumber int       `json:"iteration_number"`

	// API mode
	IsStreaming bool `json:"is_streaming"`

	// Batch API tracking (only populated when IsStreaming = false)
	BatchID          *string      `json:"batch_id,omitempty"`
	BatchRequestID   *string      `json:"batch_request_id,omitempty"`
	BatchStatus      *BatchStatus `json:"batch_status,omitempty"`
	BatchSubmittedAt *time.Time   `json:"batch_submitted_at,omitempty"`
	BatchCompletedAt *time.Time   `json:"batch_completed_at,omitempty"`
	BatchExpiresAt   *time.Time   `json:"batch_expires_at,omitempty"`
	BatchPollCount   int          `json:"batch_poll_count"`
	BatchLastPollAt  *time.Time   `json:"batch_last_poll_at,omitempty"`

	// Streaming API tracking (only populated when IsStreaming = true)
	StreamingStartedAt   *time.Time `json:"streaming_started_at,omitempty"`
	StreamingCompletedAt *time.Time `json:"streaming_completed_at,omitempty"`

	// Request context
	TriggerType       string      `json:"trigger_type"` // "user_prompt", "tool_results", "continuation"
	RequestMessageIDs []uuid.UUID `json:"request_message_ids,omitempty"`

	// Response
	StopReason         *string    `json:"stop_reason,omitempty"`
	ResponseMessageID  *uuid.UUID `json:"response_message_id,omitempty"`
	HasToolUse         bool       `json:"has_tool_use"`
	ToolExecutionCount int        `json:"tool_execution_count"`

	// Token usage (for this iteration only)
	InputTokens              int `json:"input_tokens"`
	OutputTokens             int `json:"output_tokens"`
	CacheCreationInputTokens int `json:"cache_creation_input_tokens"`
	CacheReadInputTokens     int `json:"cache_read_input_tokens"`

	// Error tracking
	ErrorMessage *string `json:"error_message,omitempty"`
	ErrorType    *string `json:"error_type,omitempty"`

	// Timestamps
	CreatedAt   time.Time  `json:"created_at"`
	StartedAt   *time.Time `json:"started_at,omitempty"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
}

Iteration represents a single Claude API call (batch or streaming) within a run.

func (*Iteration) Usage added in v0.1.4

func (i *Iteration) Usage() Usage

Usage returns the token usage for this iteration.

type Logger added in v0.1.4

type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

Logger interface for structured logging. Compatible with slog.Logger and other structured loggers.

type Message

type Message struct {
	ID        uuid.UUID      `json:"id"`
	SessionID uuid.UUID      `json:"session_id"`
	RunID     *uuid.UUID     `json:"run_id,omitempty"`
	Role      MessageRole    `json:"role"`
	Content   []ContentBlock `json:"content"`
	Usage     Usage          `json:"usage"`

	// Compaction flags
	IsPreserved bool `json:"is_preserved"`
	IsSummary   bool `json:"is_summary"`

	Metadata  map[string]any `json:"metadata,omitempty"`
	CreatedAt time.Time      `json:"created_at"`
	UpdatedAt time.Time      `json:"updated_at"`
}

Message represents a conversation message stored in the database.

type MessageRole added in v0.1.4

type MessageRole string

MessageRole represents the role of a message in a conversation (mirrors agentpg_message_role enum).

const (
	MessageRoleUser      MessageRole = "user"
	MessageRoleAssistant MessageRole = "assistant"
	MessageRoleSystem    MessageRole = "system"
)

func (MessageRole) String added in v0.1.4

func (s MessageRole) String() string

String returns the string representation of the message role.

type Response

type Response struct {
	// Text is the final text response (extracted from the last assistant message).
	Text string

	// StopReason indicates why the run stopped: "end_turn", "max_tokens", "tool_use", "stop_sequence".
	StopReason string

	// Usage contains cumulative token statistics across all iterations.
	Usage Usage

	// Message is the full final assistant message with content blocks.
	Message *Message

	// IterationCount is the total number of API calls made for this run (batch or streaming).
	IterationCount int

	// ToolIterations is the number of iterations that involved tool_use.
	ToolIterations int

	// ToolCalls contains all tool calls made during this run, in execution order.
	// Populated from the database when the response is built.
	ToolCalls []ToolCall
}

Response is the result of a completed run.

type Run added in v0.1.4

type Run struct {
	ID        uuid.UUID `json:"id"`
	SessionID uuid.UUID `json:"session_id"`
	AgentID   uuid.UUID `json:"agent_id"`

	// Run mode (batch or streaming API)
	RunMode RunMode `json:"run_mode"`

	// Hierarchical run support
	ParentRunID           *uuid.UUID `json:"parent_run_id,omitempty"`
	ParentToolExecutionID *uuid.UUID `json:"parent_tool_execution_id,omitempty"`
	Depth                 int        `json:"depth"`

	// State machine
	State         RunState  `json:"state"`
	PreviousState *RunState `json:"previous_state,omitempty"`

	// Request
	Prompt string `json:"prompt"`

	// Iteration tracking
	CurrentIteration   int        `json:"current_iteration"`
	CurrentIterationID *uuid.UUID `json:"current_iteration_id,omitempty"`

	// Final response (populated when run completes)
	ResponseText *string `json:"response_text,omitempty"`
	StopReason   *string `json:"stop_reason,omitempty"`

	// Token usage (cumulative across all iterations)
	InputTokens              int `json:"input_tokens"`
	OutputTokens             int `json:"output_tokens"`
	CacheCreationInputTokens int `json:"cache_creation_input_tokens"`
	CacheReadInputTokens     int `json:"cache_read_input_tokens"`

	// Iteration counts
	IterationCount int `json:"iteration_count"`
	ToolIterations int `json:"tool_iterations"`

	// Error tracking
	ErrorMessage *string `json:"error_message,omitempty"`
	ErrorType    *string `json:"error_type,omitempty"`

	// Worker/claiming
	CreatedByInstanceID *string    `json:"created_by_instance_id,omitempty"`
	ClaimedByInstanceID *string    `json:"claimed_by_instance_id,omitempty"`
	ClaimedAt           *time.Time `json:"claimed_at,omitempty"`

	// Rescue tracking
	RescueAttempts int        `json:"rescue_attempts"`
	LastRescueAt   *time.Time `json:"last_rescue_at,omitempty"`

	// Metadata
	Metadata map[string]any `json:"metadata,omitempty"`

	// Per-run options (instruction overrides)
	Options *RunOptions `json:"options,omitempty"`

	// Timestamps
	CreatedAt   time.Time  `json:"created_at"`
	StartedAt   *time.Time `json:"started_at,omitempty"`
	FinalizedAt *time.Time `json:"finalized_at,omitempty"`
}

Run represents the state of an agent run.

func (*Run) Usage added in v0.1.4

func (r *Run) Usage() Usage

Usage returns the cumulative token usage for this run.

type RunMode added in v0.1.4

type RunMode string

RunMode represents the execution mode of a run (mirrors agentpg_run_mode enum). Determines which Claude API is used for processing.

const (
	// RunModeBatch uses the Claude Batch API (24h processing window, cost-effective).
	RunModeBatch RunMode = "batch"

	// RunModeStreaming uses the Claude Streaming API (real-time, low latency).
	RunModeStreaming RunMode = "streaming"
)

func (RunMode) String added in v0.1.4

func (m RunMode) String() string

String returns the string representation of the run mode.

type RunOptions added in v0.3.0

type RunOptions struct {
	// Variables are passed to tools via context during execution.
	// Accessed in tools using tool.GetVariable, tool.GetVariableOr, etc.
	Variables map[string]any `json:"variables,omitempty"`

	// OverrideInstructions completely replaces the agent's system prompt for this run.
	// If set, AppendInstructions is ignored.
	OverrideInstructions string `json:"override_instructions,omitempty"`

	// AppendInstructions is appended to the agent's system prompt for this run.
	// Ignored if OverrideInstructions is set.
	AppendInstructions string `json:"append_instructions,omitempty"`

	// OnToolStart is called when a tool execution begins.
	// Fires in a goroutine with panic recovery — must not block tool execution.
	// Only fires on the instance that executes the tool (in-memory callback).
	// For agent-as-tool, callbacks propagate to child runs automatically.
	OnToolStart func(ToolCallEvent) `json:"-"`

	// OnToolComplete is called when a tool execution finishes (success or error).
	// Fires in a goroutine with panic recovery — must not block tool execution.
	// Only fires on the instance that executes the tool (in-memory callback).
	// For agent-as-tool, callbacks propagate to child runs automatically.
	OnToolComplete func(ToolCallEvent) `json:"-"`
}

RunOptions provides per-run configuration that modifies agent behavior. Instruction overrides do NOT propagate to child runs in agent-as-tool hierarchies. Variables are propagated to child runs.

type RunRescueConfig added in v0.1.4

type RunRescueConfig struct {
	// RescueInterval is how often to check for stuck runs.
	// Only the leader performs rescue operations.
	// Default: 1 minute
	RescueInterval time.Duration

	// RescueTimeout is how long a run can be stuck before rescue.
	// A run is considered stuck if it's been in a non-terminal state
	// (batch_submitting, batch_pending, batch_processing, streaming, pending_tools)
	// for longer than this duration.
	// Default: 5 minutes (matches StuckRunTimeout)
	RescueTimeout time.Duration

	// MaxRescueAttempts is the maximum times a run can be rescued.
	// After this many rescue attempts, the run is marked as permanently failed.
	// Default: 3
	MaxRescueAttempts int
}

RunRescueConfig configures run rescue behavior for stuck runs. Runs stuck in non-terminal states are periodically rescued by the leader.

func DefaultRunRescueConfig added in v0.1.4

func DefaultRunRescueConfig() *RunRescueConfig

DefaultRunRescueConfig returns the default run rescue configuration.

type RunState added in v0.1.4

type RunState string

RunState represents the lifecycle of a run (mirrors agentpg_run_state enum). Supports both Batch API and Streaming API modes.

Batch mode state transitions:

pending ──────────────────┐
    │ (worker claims)     │
    v                     │
batch_submitting ─────────┤
    │ (batch created)     │
    v                     │
batch_pending ────────────┤
    │ (polling)           │
    v                     │
batch_processing ─────────┤
    │ (batch complete)    │
    ├──> pending_tools    │ (has tool_use blocks)
    ├──> completed        │ (stop_reason=end_turn)
    ├──> awaiting_input   │ (stop_reason=max_tokens, needs continuation)
    └──> failed           │ (error)

Streaming mode state transitions:

pending ──────────────────┐
    │ (worker claims)     │
    v                     │
streaming ────────────────┤
    │ (stream complete)   │
    ├──> pending_tools    │ (has tool_use blocks)
    ├──> completed        │ (stop_reason=end_turn)
    ├──> awaiting_input   │ (stop_reason=max_tokens)
    └──> failed           │ (error)

Common transitions (both modes):

pending_tools ────────────┤
    │ (all tools done)    │
    └──> pending          │ (continue with tool_results)

Terminal states: completed, cancelled, failed

const (
	RunStatePending         RunState = "pending"
	RunStateBatchSubmitting RunState = "batch_submitting"
	RunStateBatchPending    RunState = "batch_pending"
	RunStateBatchProcessing RunState = "batch_processing"
	RunStateStreaming       RunState = "streaming"
	RunStatePendingTools    RunState = "pending_tools"
	RunStateAwaitingInput   RunState = "awaiting_input"
	RunStateCompleted       RunState = "completed"
	RunStateCancelled       RunState = "cancelled"
	RunStateFailed          RunState = "failed"
)

func (RunState) IsTerminal added in v0.1.4

func (s RunState) IsTerminal() bool

IsTerminal returns true if the run state is a terminal state.

func (RunState) String added in v0.1.4

func (s RunState) String() string

String returns the string representation of the run state.

type Session added in v0.1.4

type Session struct {
	ID              uuid.UUID      `json:"id"`
	ParentSessionID *uuid.UUID     `json:"parent_session_id,omitempty"`
	Depth           int            `json:"depth"`
	Metadata        map[string]any `json:"metadata,omitempty"`
	CompactionCount int            `json:"compaction_count"`
	CreatedAt       time.Time      `json:"created_at"`
	UpdatedAt       time.Time      `json:"updated_at"`
}

Session represents a conversation context. App-specific fields (tenant_id, user_id, etc.) should be stored in Metadata.

type ToolCall added in v0.3.4

type ToolCall struct {
	Name            string             `json:"name"`
	Input           json.RawMessage    `json:"input"`
	Output          string             `json:"output"`
	IsError         bool               `json:"is_error"`
	ErrorMessage    string             `json:"error_message,omitempty"`
	IsAgentTool     bool               `json:"is_agent_tool"`
	AgentID         *uuid.UUID         `json:"agent_id,omitempty"`
	ChildRunID      *uuid.UUID         `json:"child_run_id,omitempty"`
	Duration        time.Duration      `json:"duration"`
	IterationNumber int                `json:"iteration_number"`
	State           ToolExecutionState `json:"state"`
	StartedAt       *time.Time         `json:"started_at,omitempty"`
	CompletedAt     *time.Time         `json:"completed_at,omitempty"`
}

ToolCall represents a single tool invocation within a run. This is a clean API type that strips internal scheduling fields from the raw tool execution.

type ToolCallEvent added in v0.3.4

type ToolCallEvent struct {
	RunID           uuid.UUID       `json:"run_id"`
	SessionID       uuid.UUID       `json:"session_id"`
	ToolName        string          `json:"tool_name"`
	ToolInput       json.RawMessage `json:"tool_input"`
	IsAgentTool     bool            `json:"is_agent_tool"`
	IterationNumber int             `json:"iteration_number"`
	// Only populated for OnToolComplete:
	Output       string        `json:"output,omitempty"`
	IsError      bool          `json:"is_error,omitempty"`
	ErrorMessage string        `json:"error_message,omitempty"`
	Duration     time.Duration `json:"duration,omitempty"`
}

ToolCallEvent is passed to OnToolStart and OnToolComplete callbacks.

type ToolDefinition added in v0.1.4

type ToolDefinition struct {
	Name        string         `json:"name"`
	Description string         `json:"description"`
	InputSchema map[string]any `json:"input_schema"`
	IsAgentTool bool           `json:"is_agent_tool"`
	AgentID     *uuid.UUID     `json:"agent_id,omitempty"`
	Metadata    map[string]any `json:"metadata,omitempty"`
	CreatedAt   time.Time      `json:"created_at,omitempty"`
	UpdatedAt   time.Time      `json:"updated_at,omitempty"`
}

ToolDefinition represents a tool stored in the database.

type ToolExecution added in v0.1.4

type ToolExecution struct {
	ID          uuid.UUID          `json:"id"`
	RunID       uuid.UUID          `json:"run_id"`
	IterationID uuid.UUID          `json:"iteration_id"`
	State       ToolExecutionState `json:"state"`

	// Tool identification
	ToolUseID string          `json:"tool_use_id"`
	ToolName  string          `json:"tool_name"`
	ToolInput json.RawMessage `json:"tool_input"`

	// Agent-as-tool support
	IsAgentTool bool       `json:"is_agent_tool"`
	AgentID     *uuid.UUID `json:"agent_id,omitempty"`
	ChildRunID  *uuid.UUID `json:"child_run_id,omitempty"`

	// Result
	ToolOutput   *string `json:"tool_output,omitempty"`
	IsError      bool    `json:"is_error"`
	ErrorMessage *string `json:"error_message,omitempty"`

	// Worker/claiming
	ClaimedByInstanceID *string    `json:"claimed_by_instance_id,omitempty"`
	ClaimedAt           *time.Time `json:"claimed_at,omitempty"`

	// Retry logic
	AttemptCount int `json:"attempt_count"`
	MaxAttempts  int `json:"max_attempts"`

	// Retry scheduling
	ScheduledAt time.Time `json:"scheduled_at"`
	SnoozeCount int       `json:"snooze_count"`
	LastError   *string   `json:"last_error,omitempty"`

	// Timestamps
	CreatedAt   time.Time  `json:"created_at"`
	StartedAt   *time.Time `json:"started_at,omitempty"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
}

ToolExecution represents a pending or completed tool execution.

type ToolExecutionState added in v0.1.4

type ToolExecutionState string

ToolExecutionState represents the lifecycle of a tool execution (mirrors agentpg_tool_execution_state enum).

State transitions:

pending ──────────────────┐
    │ (worker claims)     │
    v                     │
running ──────────────────┤
    ├──> completed        │ (success)
    ├──> failed           │ (error, may retry)
    └──> skipped          │ (run cancelled)
const (
	ToolStatePending   ToolExecutionState = "pending"
	ToolStateRunning   ToolExecutionState = "running"
	ToolStateCompleted ToolExecutionState = "completed"
	ToolStateFailed    ToolExecutionState = "failed"
	ToolStateSkipped   ToolExecutionState = "skipped"
)

func (ToolExecutionState) IsTerminal added in v0.1.4

func (s ToolExecutionState) IsTerminal() bool

IsTerminal returns true if the tool execution state is a terminal state.

func (ToolExecutionState) String added in v0.1.4

func (s ToolExecutionState) String() string

String returns the string representation of the tool execution state.

type ToolRetryConfig added in v0.1.4

type ToolRetryConfig struct {
	// MaxAttempts is the maximum number of execution attempts.
	// After this many attempts, the tool is marked as permanently failed.
	// Default: 3
	MaxAttempts int

	// Jitter adds randomness to prevent thundering herd.
	// Range: 0.0 to 1.0 (proportion of delay to randomize).
	// Default: 0.1 (10% jitter)
	Jitter float64
}

ToolRetryConfig configures tool execution retry behavior. Uses River's attempt^4 formula for exponential backoff.

func DefaultToolRetryConfig added in v0.1.4

func DefaultToolRetryConfig() *ToolRetryConfig

DefaultToolRetryConfig returns the default tool retry configuration.

func (*ToolRetryConfig) NextRetryDelay added in v0.1.4

func (c *ToolRetryConfig) NextRetryDelay(attemptCount int) time.Duration

NextRetryDelay calculates the delay before the next retry attempt. By default returns 0 for instant retry (snappy user experience). If Jitter > 0, uses River's attempt^4 formula with jitter for backoff.

type Usage

type Usage struct {
	InputTokens              int `json:"input_tokens"`
	OutputTokens             int `json:"output_tokens"`
	CacheCreationInputTokens int `json:"cache_creation_input_tokens"`
	CacheReadInputTokens     int `json:"cache_read_input_tokens"`
}

Usage contains token usage statistics from Claude API.

func (Usage) Add added in v0.1.4

func (u Usage) Add(other Usage) Usage

Add combines two Usage values.

func (Usage) Total added in v0.1.4

func (u Usage) Total() int

Total returns the total number of tokens (input + output).

Directories

Path Synopsis
Package compaction provides context window management for AI agent conversations.
Package compaction provides context window management for AI agent conversations.
Package driver defines the interfaces for database drivers used by AgentPG.
Package driver defines the interfaces for database drivers used by AgentPG.
databasesql module
pgxv5 module
examples
admin_ui command
Example: admin_ui
Example: admin_ui
admin_ui_auth command
Example: admin_ui_auth
Example: admin_ui_auth
admin_ui_full command
Example: admin_ui_full
Example: admin_ui_full
basic/01_simple_chat command
Package main demonstrates the basic usage of AgentPG.
Package main demonstrates the basic usage of AgentPG.
basic/02_shared_tools command
Package main demonstrates the Client API with multiple agents sharing tools.
Package main demonstrates the Client API with multiple agents sharing tools.
cancel_regenerate command
Package main demonstrates the CancelRun and RegenerateRun APIs.
Package main demonstrates the CancelRun and RegenerateRun APIs.
context_compaction/01_auto_compaction command
Package main demonstrates the Client API with auto compaction using per-client registration.
Package main demonstrates the Client API with auto compaction using per-client registration.
context_compaction/02_manual_compaction command
Package main demonstrates the per-client API with manual compaction.
Package main demonstrates the per-client API with manual compaction.
context_compaction/03_custom_strategy command
Package main demonstrates custom compaction strategies.
Package main demonstrates custom compaction strategies.
context_compaction/04_compaction_monitoring command
Package main demonstrates compaction monitoring using the Client API.
Package main demonstrates compaction monitoring using the Client API.
custom_tools/01_struct_tool command
Package main demonstrates the Client API with struct-based tools.
Package main demonstrates the Client API with struct-based tools.
custom_tools/02_func_tool command
Package main demonstrates the Client API with function-based tools.
Package main demonstrates the Client API with function-based tools.
custom_tools/03_schema_validation command
Package main demonstrates the Client API with advanced schema validation.
Package main demonstrates the Client API with advanced schema validation.
custom_tools/04_parallel_execution command
Package main demonstrates tool usage with AgentPG.
Package main demonstrates tool usage with AgentPG.
database_sql command
Package main demonstrates using the Client API with database/sql driver.
Package main demonstrates using the Client API with database/sql driver.
distributed command
Package main demonstrates the Client API with distributed workers.
Package main demonstrates the Client API with distributed workers.
extended_context command
Package main demonstrates the Client API with extended context support.
Package main demonstrates the Client API with extended context support.
mcp_tools/01_everything_server command
Package main demonstrates MCP tool integration using the Everything test server.
Package main demonstrates MCP tool integration using the Everything test server.
mcp_tools/02_filesystem_server command
Package main demonstrates MCP tool integration with the Filesystem server.
Package main demonstrates MCP tool integration with the Filesystem server.
mcp_tools/03_github_server command
Package main demonstrates MCP tool integration with the GitHub server.
Package main demonstrates MCP tool integration with the GitHub server.
mcp_tools/04_multi_server command
Package main demonstrates combining multiple MCP servers with local tools.
Package main demonstrates combining multiple MCP servers with local tools.
nested_agents/01_basic_delegation command
Package main demonstrates the Client API with basic agent delegation.
Package main demonstrates the Client API with basic agent delegation.
nested_agents/02_specialist_agents command
Package main demonstrates the Client API with specialist agents.
Package main demonstrates the Client API with specialist agents.
nested_agents/03_multi_level_hierarchy command
Package main demonstrates the Client API with multi-level agent hierarchy.
Package main demonstrates the Client API with multi-level agent hierarchy.
retry_rescue/01_instant_retry command
Package main demonstrates the default instant retry behavior.
Package main demonstrates the default instant retry behavior.
retry_rescue/02_error_types command
Package main demonstrates the different tool error types.
Package main demonstrates the different tool error types.
retry_rescue/03_exponential_backoff command
Package main demonstrates opt-in exponential backoff for retries.
Package main demonstrates opt-in exponential backoff for retries.
tool_calls command
Package main demonstrates the tool call visibility features of AgentPG.
Package main demonstrates the tool call visibility features of AgentPG.
mcp module
Package tool defines error types for controlling tool execution retry behavior.
Package tool defines error types for controlling tool execution retry behavior.
ui
Package ui provides an embedded web UI for AgentPG.
Package ui provides an embedded web UI for AgentPG.
frontend
Package frontend provides SSR frontend handlers for the AgentPG web UI.
Package frontend provides SSR frontend handlers for the AgentPG web UI.
service
Package service provides the shared business logic for the AgentPG web UI.
Package service provides the shared business logic for the AgentPG web UI.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL