server

package
v0.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2025 License: MIT Imports: 25 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	BuildAgentName        = ""
	BuildAgentDescription = ""
	BuildAgentVersion     = ""
)

Build-time metadata variables set via LD flags

Functions

func GenerateTaskID

func GenerateTaskID() string

GenerateTaskID generates a unique task ID using UUID v4

func GetSupportedProviders added in v0.9.0

func GetSupportedProviders() []string

GetSupportedProviders returns a list of all registered providers

func JSONTool

func JSONTool(result any) (string, error)

JSONTool creates a tool result that can be marshaled to JSON

func NewEmptyMessagePartsError

func NewEmptyMessagePartsError() error

NewEmptyMessagePartsError creates a new EmptyMessagePartsError

func NewStreamingNotImplementedError

func NewStreamingNotImplementedError() error

NewStreamingNotImplementedError creates a new StreamingNotImplementedError

func NewTaskNotCancelableError added in v0.9.0

func NewTaskNotCancelableError(taskID string, state types.TaskState) error

NewTaskNotCancelableError creates a new TaskNotCancelableError

func NewTaskNotFoundError

func NewTaskNotFoundError(taskID string) error

NewTaskNotFoundError creates a new TaskNotFoundError

func RegisterStorageProvider added in v0.9.0

func RegisterStorageProvider(provider string, factory StorageFactory)

RegisterStorageProvider registers a storage provider factory

func StringPtr

func StringPtr(s string) *string

StringPtr returns a pointer to the given string

Types

type A2AProtocolHandler added in v0.9.4

type A2AProtocolHandler interface {
	// HandleMessageSend processes message/send requests
	HandleMessageSend(c *gin.Context, req types.JSONRPCRequest)

	// HandleMessageStream processes message/stream requests
	HandleMessageStream(c *gin.Context, req types.JSONRPCRequest)

	// HandleTaskGet processes tasks/get requests
	HandleTaskGet(c *gin.Context, req types.JSONRPCRequest)

	// HandleTaskList processes tasks/list requests
	HandleTaskList(c *gin.Context, req types.JSONRPCRequest)

	// HandleTaskCancel processes tasks/cancel requests
	HandleTaskCancel(c *gin.Context, req types.JSONRPCRequest)

	// HandleTaskPushNotificationConfigSet processes tasks/pushNotificationConfig/set requests
	HandleTaskPushNotificationConfigSet(c *gin.Context, req types.JSONRPCRequest)

	// HandleTaskPushNotificationConfigGet processes tasks/pushNotificationConfig/get requests
	HandleTaskPushNotificationConfigGet(c *gin.Context, req types.JSONRPCRequest)

	// HandleTaskPushNotificationConfigList processes tasks/pushNotificationConfig/list requests
	HandleTaskPushNotificationConfigList(c *gin.Context, req types.JSONRPCRequest)

	// HandleTaskPushNotificationConfigDelete processes tasks/pushNotificationConfig/delete requests
	HandleTaskPushNotificationConfigDelete(c *gin.Context, req types.JSONRPCRequest)
}

A2AProtocolHandler defines the interface for handling A2A protocol requests

type A2AServer

type A2AServer interface {
	// Start starts the A2A server on the configured port
	Start(ctx context.Context) error

	// Stop gracefully stops the A2A server
	Stop(ctx context.Context) error

	// GetAgentCard returns the agent's capabilities and metadata
	// Returns nil if no agent card has been explicitly set
	GetAgentCard() *types.AgentCard

	// StartTaskProcessor starts the background task processor
	StartTaskProcessor(ctx context.Context)

	// SetPollingTaskHandler sets the task handler for polling/queue-based scenarios
	SetBackgroundTaskHandler(handler TaskHandler)

	// GetPollingTaskHandler returns the configured polling task handler
	GetBackgroundTaskHandler() TaskHandler

	// SetStreamingTaskHandler sets the task handler for streaming scenarios
	SetStreamingTaskHandler(handler StreamableTaskHandler)

	// GetStreamingTaskHandler returns the configured streaming task handler
	GetStreamingTaskHandler() StreamableTaskHandler

	// SetAgent sets the OpenAI-compatible agent for processing tasks
	SetAgent(agent OpenAICompatibleAgent)

	// GetAgent returns the configured OpenAI-compatible agent
	GetAgent() OpenAICompatibleAgent

	// SetAgentName sets the agent's name dynamically
	SetAgentName(name string)

	// SetAgentDescription sets the agent's description dynamically
	SetAgentDescription(description string)

	// SetAgentURL sets the agent's URL dynamically
	SetAgentURL(url string)

	// SetAgentVersion sets the agent's version dynamically
	SetAgentVersion(version string)

	// SetAgentCard sets a custom agent card that overrides the default card generation
	SetAgentCard(agentCard types.AgentCard)

	// LoadAgentCardFromFile loads and sets an agent card from a JSON file
	// The optional overrides map allows dynamic replacement of JSON attribute values
	LoadAgentCardFromFile(filePath string, overrides map[string]any) error
}

A2AServer defines the interface for an A2A-compatible server

func CustomA2AServer

func CustomA2AServer(
	cfg config.Config,
	logger *zap.Logger,
	pollingTaskHandler TaskHandler,
	streamingTaskHandler StreamableTaskHandler,
	taskResultProcessor TaskResultProcessor,
	agentCard types.AgentCard,
) (A2AServer, error)

CustomA2AServer creates an A2A server with custom components This provides more control over the server configuration

func CustomA2AServerWithAgent

func CustomA2AServerWithAgent(
	cfg config.Config,
	logger *zap.Logger,
	agent OpenAICompatibleAgent,
	toolBox ToolBox,
	taskResultProcessor TaskResultProcessor,
	agentCard types.AgentCard,
) (A2AServer, error)

CustomA2AServerWithAgent creates an A2A server with custom components and an agent This provides maximum control over the server configuration

func SimpleA2AServerWithAgent

func SimpleA2AServerWithAgent(cfg config.Config, logger *zap.Logger, agent OpenAICompatibleAgent, agentCard types.AgentCard) (A2AServer, error)

SimpleA2AServerWithAgent creates a basic A2A server with an OpenAI-compatible agent This is a convenience function for agent-based use cases

type A2AServerBuilder

type A2AServerBuilder interface {
	// WithBackgroundTaskHandler sets a custom task handler for polling/queue-based scenarios.
	// This handler will be used for message/send requests and background queue processing.
	WithBackgroundTaskHandler(handler TaskHandler) A2AServerBuilder

	// WithStreamingTaskHandler sets a custom task handler for streaming scenarios.
	// This handler will be used for message/stream requests.
	WithStreamingTaskHandler(handler StreamableTaskHandler) A2AServerBuilder

	// WithDefaultBackgroundTaskHandler sets a default background task handler optimized for background scenarios.
	// This handler automatically handles input-required pausing without requiring custom implementation.
	WithDefaultBackgroundTaskHandler() A2AServerBuilder // WithDefaultStreamingTaskHandler sets a default streaming task handler optimized for streaming scenarios.

	// This handler automatically handles input-required pausing with streaming-aware behavior.
	WithDefaultStreamingTaskHandler() A2AServerBuilder

	// WithDefaultTaskHandlers sets both default polling and streaming task handlers.
	// This is a convenience method that sets up optimized handlers for both scenarios.
	WithDefaultTaskHandlers() A2AServerBuilder

	// WithTaskResultProcessor sets a custom task result processor for handling tool call results.
	// This allows custom business logic for determining when tasks should be completed.
	WithTaskResultProcessor(processor TaskResultProcessor) A2AServerBuilder

	// WithAgent sets a pre-configured OpenAI-compatible agent for processing tasks.
	// This is useful when you have already configured an agent with specific settings.
	WithAgent(agent OpenAICompatibleAgent) A2AServerBuilder

	// WithAgentCard sets a custom agent card that overrides the default card generation.
	// This gives full control over the agent's advertised capabilities and metadata.
	WithAgentCard(agentCard types.AgentCard) A2AServerBuilder

	// WithAgentCardFromFile loads and sets an agent card from a JSON file.
	// This provides a convenient way to load agent configuration from a static file.
	// The optional overrides map allows dynamic replacement of JSON attribute values.
	WithAgentCardFromFile(filePath string, overrides map[string]any) A2AServerBuilder

	// WithLogger sets a custom logger for the builder and resulting server.
	// This allows using a logger configured with appropriate level based on the Debug config.
	WithLogger(logger *zap.Logger) A2AServerBuilder

	// Build creates and returns the configured A2A server.
	// This method applies configuration defaults and initializes all components.
	Build() (A2AServer, error)
}

A2AServerBuilder provides a fluent interface for building A2A servers with custom configurations. This interface allows for flexible server construction with optional components and settings. Use NewA2AServerBuilder to create an instance, then chain method calls to configure the server.

Example:

server := NewA2AServerBuilder(config, logger).
  WithAgent(agent).
  Build()

func NewA2AServerBuilder

func NewA2AServerBuilder(cfg config.Config, logger *zap.Logger) A2AServerBuilder

NewA2AServerBuilder creates a new server builder with required dependencies. The configuration passed here will be used to configure the server. Any nil nested configuration objects will be populated with sensible defaults when Build() is called.

Parameters:

  • cfg: The base configuration for the server (agent name, port, etc.)
  • logger: Logger instance to use for the server (should match cfg.Debug level)

Returns:

A2AServerBuilder interface that can be used to further configure the server before building.

Example:

cfg := config.Config{
  AgentName: "my-agent",
  Port: "8080",
  Debug: true,
}
logger, _ := zap.NewDevelopment() // Use development logger for debug
server := NewA2AServerBuilder(cfg, logger).
  WithAgent(myAgent).
  Build()

type A2AServerBuilderImpl

type A2AServerBuilderImpl struct {
	// contains filtered or unexported fields
}

A2AServerBuilderImpl is the concrete implementation of the A2AServerBuilder interface. It provides a fluent interface for building A2A servers with custom configurations. This struct holds the configuration and optional components that will be used to create the server.

func (*A2AServerBuilderImpl) Build

func (b *A2AServerBuilderImpl) Build() (A2AServer, error)

Build creates and returns the configured A2A server.

func (*A2AServerBuilderImpl) WithAgent

WithAgent sets a custom OpenAI-compatible agent

func (*A2AServerBuilderImpl) WithAgentCard

func (b *A2AServerBuilderImpl) WithAgentCard(agentCard types.AgentCard) A2AServerBuilder

WithAgentCard sets a custom agent card that overrides the default card generation

func (*A2AServerBuilderImpl) WithAgentCardFromFile

func (b *A2AServerBuilderImpl) WithAgentCardFromFile(filePath string, overrides map[string]any) A2AServerBuilder

WithAgentCardFromFile loads and sets an agent card from a JSON file The optional overrides map allows dynamic replacement of JSON attribute values

func (*A2AServerBuilderImpl) WithBackgroundTaskHandler added in v0.9.0

func (b *A2AServerBuilderImpl) WithBackgroundTaskHandler(handler TaskHandler) A2AServerBuilder

WithBackgroundTaskHandler sets a custom task handler for polling/queue-based scenarios

func (*A2AServerBuilderImpl) WithDefaultBackgroundTaskHandler added in v0.9.0

func (b *A2AServerBuilderImpl) WithDefaultBackgroundTaskHandler() A2AServerBuilder

WithDefaultBackgroundTaskHandler sets a default background task handler optimized for background scenarios

func (*A2AServerBuilderImpl) WithDefaultStreamingTaskHandler added in v0.9.0

func (b *A2AServerBuilderImpl) WithDefaultStreamingTaskHandler() A2AServerBuilder

WithDefaultStreamingTaskHandler sets a default streaming task handler optimized for streaming scenarios

func (*A2AServerBuilderImpl) WithDefaultTaskHandlers added in v0.9.0

func (b *A2AServerBuilderImpl) WithDefaultTaskHandlers() A2AServerBuilder

WithDefaultTaskHandlers sets both default background and streaming task handlers

func (*A2AServerBuilderImpl) WithLogger

func (b *A2AServerBuilderImpl) WithLogger(logger *zap.Logger) A2AServerBuilder

WithLogger sets a custom logger for the builder

func (*A2AServerBuilderImpl) WithStreamingTaskHandler added in v0.9.0

func (b *A2AServerBuilderImpl) WithStreamingTaskHandler(handler StreamableTaskHandler) A2AServerBuilder

WithStreamingTaskHandler sets a custom task handler for streaming scenarios

func (*A2AServerBuilderImpl) WithTaskResultProcessor

func (b *A2AServerBuilderImpl) WithTaskResultProcessor(processor TaskResultProcessor) A2AServerBuilder

WithTaskResultProcessor sets a custom task result processor

type A2AServerImpl

type A2AServerImpl struct {
	// contains filtered or unexported fields
}

func NewA2AServer

func NewA2AServer(cfg *config.Config, logger *zap.Logger, otel otel.OpenTelemetry) *A2AServerImpl

NewA2AServer creates a new A2A server with the provided configuration and logger

func NewA2AServerEnvironmentAware

func NewA2AServerEnvironmentAware(cfg *config.Config, logger *zap.Logger, otel otel.OpenTelemetry) *A2AServerImpl

NewA2AServerEnvironmentAware creates a new A2A server with environment-aware configuration.

func NewA2AServerWithAgent

func NewA2AServerWithAgent(cfg *config.Config, logger *zap.Logger, otel otel.OpenTelemetry, agent OpenAICompatibleAgent) *A2AServerImpl

NewA2AServerWithAgent creates a new A2A server with an optional OpenAI-compatible agent

func NewDefaultA2AServer

func NewDefaultA2AServer(cfg *config.Config) *A2AServerImpl

NewDefaultA2AServer creates a new default A2A server implementation

func (*A2AServerImpl) GetAgent

func (s *A2AServerImpl) GetAgent() OpenAICompatibleAgent

GetAgent returns the configured OpenAI-compatible agent

func (*A2AServerImpl) GetAgentCard

func (s *A2AServerImpl) GetAgentCard() *types.AgentCard

GetAgentCard returns the agent's capabilities and metadata Returns nil if no agent card has been explicitly set

func (*A2AServerImpl) GetBackgroundTaskHandler added in v0.9.0

func (s *A2AServerImpl) GetBackgroundTaskHandler() TaskHandler

GetBackgroundTaskHandler returns the configured polling task handler

func (*A2AServerImpl) GetStreamingTaskHandler added in v0.9.0

func (s *A2AServerImpl) GetStreamingTaskHandler() StreamableTaskHandler

GetStreamingTaskHandler returns the configured streaming task handler

func (*A2AServerImpl) LoadAgentCardFromFile

func (s *A2AServerImpl) LoadAgentCardFromFile(filePath string, overrides map[string]any) error

LoadAgentCardFromFile loads and sets an agent card from a JSON file The optional overrides map allows dynamic replacement of JSON attribute values

func (*A2AServerImpl) SetAgent

func (s *A2AServerImpl) SetAgent(agent OpenAICompatibleAgent)

SetAgent sets the OpenAI-compatible agent for processing tasks

func (*A2AServerImpl) SetAgentCard

func (s *A2AServerImpl) SetAgentCard(agentCard types.AgentCard)

SetAgentCard sets a custom agent card that overrides the default card generation

func (*A2AServerImpl) SetAgentDescription

func (s *A2AServerImpl) SetAgentDescription(description string)

SetAgentDescription sets the agent's description dynamically

func (*A2AServerImpl) SetAgentName

func (s *A2AServerImpl) SetAgentName(name string)

SetAgentName sets the agent's name dynamically

func (*A2AServerImpl) SetAgentURL

func (s *A2AServerImpl) SetAgentURL(url string)

SetAgentURL sets the agent's URL dynamically

func (*A2AServerImpl) SetAgentVersion

func (s *A2AServerImpl) SetAgentVersion(version string)

SetAgentVersion sets the agent's version dynamically

func (*A2AServerImpl) SetBackgroundTaskHandler added in v0.9.0

func (s *A2AServerImpl) SetBackgroundTaskHandler(handler TaskHandler)

SetBackgroundTaskHandler sets the task handler for polling/queue-based scenarios

func (*A2AServerImpl) SetStreamingTaskHandler added in v0.9.0

func (s *A2AServerImpl) SetStreamingTaskHandler(handler StreamableTaskHandler)

SetStreamingTaskHandler sets the task handler for streaming scenarios

func (*A2AServerImpl) SetTaskResultProcessor

func (s *A2AServerImpl) SetTaskResultProcessor(processor TaskResultProcessor)

SetTaskResultProcessor sets the task result processor for custom business logic

func (*A2AServerImpl) Start

func (s *A2AServerImpl) Start(ctx context.Context) error

Start starts the A2A server

func (*A2AServerImpl) StartTaskProcessor

func (s *A2AServerImpl) StartTaskProcessor(ctx context.Context)

StartTaskProcessor starts the background task processing goroutine

func (*A2AServerImpl) Stop

func (s *A2AServerImpl) Stop(ctx context.Context) error

Stop gracefully stops the A2A server

type AgentBuilder

type AgentBuilder interface {
	// WithConfig sets the agent configuration
	WithConfig(config *config.AgentConfig) AgentBuilder
	// WithLLMClient sets a pre-configured LLM client
	WithLLMClient(client LLMClient) AgentBuilder
	// WithToolBox sets a custom toolbox
	WithToolBox(toolBox ToolBox) AgentBuilder
	// WithSystemPrompt sets the system prompt (overrides config)
	WithSystemPrompt(prompt string) AgentBuilder
	// WithMaxChatCompletion sets the maximum chat completion iterations for the agent
	WithMaxChatCompletion(max int) AgentBuilder
	// WithMaxConversationHistory sets the maximum conversation history for the agent
	WithMaxConversationHistory(max int) AgentBuilder
	// GetConfig returns the current agent configuration (for testing purposes)
	GetConfig() *config.AgentConfig
	// Build creates and returns the configured agent
	Build() (*OpenAICompatibleAgentImpl, error)
}

AgentBuilder provides a fluent interface for building OpenAI-compatible agents with custom configurations. This interface allows for flexible agent construction with optional components and settings. Use NewAgentBuilder to create an instance, then chain method calls to configure the agent.

Example:

agent := NewAgentBuilder(logger).
  WithConfig(agentConfig).
  WithLLMClient(client).
  Build()

func NewAgentBuilder

func NewAgentBuilder(logger *zap.Logger) AgentBuilder

NewAgentBuilder creates a new agent builder with required dependencies.

Parameters:

  • logger: Logger instance to use for the agent

Returns:

AgentBuilder interface that can be used to configure the agent before building.

Example:

logger, _ := zap.NewDevelopment()
agent, err := NewAgentBuilder(logger).
  WithConfig(agentConfig).
  Build()

type AgentBuilderImpl

type AgentBuilderImpl struct {
	// contains filtered or unexported fields
}

AgentBuilderImpl is the concrete implementation of the AgentBuilder interface. It provides a fluent interface for building OpenAI-compatible agents with custom configurations.

func (*AgentBuilderImpl) Build

Build creates and returns the configured agent

func (*AgentBuilderImpl) GetConfig

func (b *AgentBuilderImpl) GetConfig() *config.AgentConfig

GetConfig returns the current agent configuration (for testing purposes)

func (*AgentBuilderImpl) WithConfig

func (b *AgentBuilderImpl) WithConfig(userConfig *config.AgentConfig) AgentBuilder

WithConfig sets the agent configuration

func (*AgentBuilderImpl) WithLLMClient

func (b *AgentBuilderImpl) WithLLMClient(client LLMClient) AgentBuilder

WithLLMClient sets a pre-configured LLM client

func (*AgentBuilderImpl) WithMaxChatCompletion

func (b *AgentBuilderImpl) WithMaxChatCompletion(max int) AgentBuilder

WithMaxChatCompletion sets the maximum chat completion iterations for the agent

func (*AgentBuilderImpl) WithMaxConversationHistory

func (b *AgentBuilderImpl) WithMaxConversationHistory(max int) AgentBuilder

WithMaxConversationHistory sets the maximum conversation history for the agent

func (*AgentBuilderImpl) WithSystemPrompt

func (b *AgentBuilderImpl) WithSystemPrompt(prompt string) AgentBuilder

WithSystemPrompt sets the system prompt (overrides config)

func (*AgentBuilderImpl) WithToolBox

func (b *AgentBuilderImpl) WithToolBox(toolBox ToolBox) AgentBuilder

WithToolBox sets a custom toolbox

type AgentResponse added in v0.9.0

type AgentResponse struct {
	// Response is the main assistant response message
	Response *types.Message
	// AdditionalMessages contains any tool calls, tool responses, or intermediate messages
	// that should be added to the conversation history
	AdditionalMessages []types.Message
}

AgentResponse contains the response and any additional messages generated during agent execution

type BasicTool

type BasicTool struct {
	// contains filtered or unexported fields
}

BasicTool is a simple implementation of the Tool interface using function callbacks

func NewBasicTool

func NewBasicTool(
	name string,
	description string,
	parameters map[string]any,
	executor func(ctx context.Context, arguments map[string]any) (string, error),
) *BasicTool

NewBasicTool creates a new BasicTool

func (*BasicTool) Execute

func (t *BasicTool) Execute(ctx context.Context, arguments map[string]any) (string, error)

func (*BasicTool) GetDescription

func (t *BasicTool) GetDescription() string

func (*BasicTool) GetName

func (t *BasicTool) GetName() string

func (*BasicTool) GetParameters

func (t *BasicTool) GetParameters() map[string]any

type DatabaseStorage added in v0.9.0

type DatabaseStorage struct {
	// contains filtered or unexported fields
}

DatabaseStorage is an example implementation of Storage interface using a database This is a placeholder to show the extensibility pattern

func NewDatabaseStorage added in v0.9.0

func NewDatabaseStorage(logger *zap.Logger, connectionString string) (*DatabaseStorage, error)

NewDatabaseStorage creates a new database-backed storage instance

func (*DatabaseStorage) AddMessageToConversation added in v0.9.0

func (d *DatabaseStorage) AddMessageToConversation(contextID string, message types.Message) error

func (*DatabaseStorage) CleanupCompletedTasks added in v0.9.0

func (d *DatabaseStorage) CleanupCompletedTasks() int

func (*DatabaseStorage) CleanupOldConversations added in v0.9.0

func (d *DatabaseStorage) CleanupOldConversations(maxAge int64) int

func (*DatabaseStorage) DeleteContext added in v0.9.0

func (d *DatabaseStorage) DeleteContext(contextID string) error

func (*DatabaseStorage) DeleteContextAndTasks added in v0.9.0

func (d *DatabaseStorage) DeleteContextAndTasks(contextID string) error

func (*DatabaseStorage) DeleteTask added in v0.9.0

func (d *DatabaseStorage) DeleteTask(taskID string) error

func (*DatabaseStorage) GetContexts added in v0.9.0

func (d *DatabaseStorage) GetContexts() []string

func (*DatabaseStorage) GetContextsWithTasks added in v0.9.0

func (d *DatabaseStorage) GetContextsWithTasks() []string

func (*DatabaseStorage) GetConversationHistory added in v0.9.0

func (d *DatabaseStorage) GetConversationHistory(contextID string) []types.Message

func (*DatabaseStorage) GetStats added in v0.9.0

func (d *DatabaseStorage) GetStats() StorageStats

func (*DatabaseStorage) GetTask added in v0.9.0

func (d *DatabaseStorage) GetTask(taskID string) (*types.Task, bool)

func (*DatabaseStorage) GetTaskByContextAndID added in v0.9.0

func (d *DatabaseStorage) GetTaskByContextAndID(contextID, taskID string) (*types.Task, bool)

func (*DatabaseStorage) ListTasks added in v0.9.0

func (d *DatabaseStorage) ListTasks(filter TaskFilter) ([]*types.Task, error)

func (*DatabaseStorage) ListTasksByContext added in v0.9.0

func (d *DatabaseStorage) ListTasksByContext(contextID string, filter TaskFilter) ([]*types.Task, error)

func (*DatabaseStorage) StoreTask added in v0.9.0

func (d *DatabaseStorage) StoreTask(task *types.Task) error

func (*DatabaseStorage) TrimConversationHistory added in v0.9.0

func (d *DatabaseStorage) TrimConversationHistory(contextID string, maxMessages int) error

func (*DatabaseStorage) UpdateConversationHistory added in v0.9.0

func (d *DatabaseStorage) UpdateConversationHistory(contextID string, messages []types.Message)

func (*DatabaseStorage) UpdateTask added in v0.9.0

func (d *DatabaseStorage) UpdateTask(task *types.Task) error

type DefaultA2AProtocolHandler added in v0.9.4

type DefaultA2AProtocolHandler struct {
	// contains filtered or unexported fields
}

DefaultA2AProtocolHandler implements the A2AProtocolHandler interface

func NewDefaultA2AProtocolHandler added in v0.9.4

func NewDefaultA2AProtocolHandler(
	logger *zap.Logger,
	storage Storage,
	taskManager TaskManager,
	responseSender ResponseSender,
	backgroundTaskHandler TaskHandler,
	streamingTaskHandler StreamableTaskHandler,
) *DefaultA2AProtocolHandler

NewDefaultA2AProtocolHandler creates a new default A2A protocol handler

func (*DefaultA2AProtocolHandler) HandleMessageSend added in v0.9.4

func (h *DefaultA2AProtocolHandler) HandleMessageSend(c *gin.Context, req types.JSONRPCRequest)

HandleMessageSend processes message/send requests

func (*DefaultA2AProtocolHandler) HandleMessageStream added in v0.9.4

func (h *DefaultA2AProtocolHandler) HandleMessageStream(c *gin.Context, req types.JSONRPCRequest)

HandleMessageStream processes message/stream requests

func (*DefaultA2AProtocolHandler) HandleTaskCancel added in v0.9.4

func (h *DefaultA2AProtocolHandler) HandleTaskCancel(c *gin.Context, req types.JSONRPCRequest)

HandleTaskCancel processes tasks/cancel requests

func (*DefaultA2AProtocolHandler) HandleTaskGet added in v0.9.4

func (h *DefaultA2AProtocolHandler) HandleTaskGet(c *gin.Context, req types.JSONRPCRequest)

HandleTaskGet processes tasks/get requests

func (*DefaultA2AProtocolHandler) HandleTaskList added in v0.9.4

func (h *DefaultA2AProtocolHandler) HandleTaskList(c *gin.Context, req types.JSONRPCRequest)

HandleTaskList processes tasks/list requests

func (*DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigDelete added in v0.9.4

func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigDelete(c *gin.Context, req types.JSONRPCRequest)

HandleTaskPushNotificationConfigDelete processes tasks/pushNotificationConfig/delete requests

func (*DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigGet added in v0.9.4

func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigGet(c *gin.Context, req types.JSONRPCRequest)

HandleTaskPushNotificationConfigGet processes tasks/pushNotificationConfig/get requests

func (*DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigList added in v0.9.4

func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigList(c *gin.Context, req types.JSONRPCRequest)

HandleTaskPushNotificationConfigList processes tasks/pushNotificationConfig/list requests

func (*DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigSet added in v0.9.4

func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigSet(c *gin.Context, req types.JSONRPCRequest)

HandleTaskPushNotificationConfigSet processes tasks/pushNotificationConfig/set requests

type DefaultBackgroundTaskHandler added in v0.9.0

type DefaultBackgroundTaskHandler struct {
	// contains filtered or unexported fields
}

DefaultBackgroundTaskHandler implements the TaskHandler interface optimized for background scenarios This handler automatically handles input-required pausing without requiring custom implementation

func NewDefaultBackgroundTaskHandler added in v0.9.0

func NewDefaultBackgroundTaskHandler(logger *zap.Logger, agent OpenAICompatibleAgent) *DefaultBackgroundTaskHandler

NewDefaultBackgroundTaskHandler creates a new default background task handler

func NewDefaultBackgroundTaskHandlerWithAgent added in v0.9.0

func NewDefaultBackgroundTaskHandlerWithAgent(logger *zap.Logger, agent OpenAICompatibleAgent) *DefaultBackgroundTaskHandler

NewDefaultBackgroundTaskHandlerWithAgent creates a new default background task handler with an agent

func (*DefaultBackgroundTaskHandler) GetAgent added in v0.9.0

GetAgent returns the configured agent

func (*DefaultBackgroundTaskHandler) HandleTask added in v0.9.0

func (bth *DefaultBackgroundTaskHandler) HandleTask(ctx context.Context, task *types.Task, message *types.Message) (*types.Task, error)

HandleTask processes a task with optimized logic for background scenarios

func (*DefaultBackgroundTaskHandler) SetAgent added in v0.9.0

SetAgent sets the agent for the task handler

type DefaultResponseSender

type DefaultResponseSender struct {
	// contains filtered or unexported fields
}

DefaultResponseSender implements the ResponseSender interface

func NewDefaultResponseSender

func NewDefaultResponseSender(logger *zap.Logger) *DefaultResponseSender

NewDefaultResponseSender creates a new default response sender

func (*DefaultResponseSender) SendError

func (rs *DefaultResponseSender) SendError(c *gin.Context, id any, code int, message string)

SendError sends a JSON-RPC error response

func (*DefaultResponseSender) SendSuccess

func (rs *DefaultResponseSender) SendSuccess(c *gin.Context, id any, result any)

SendSuccess sends a JSON-RPC success response

type DefaultStreamingTaskHandler added in v0.9.0

type DefaultStreamingTaskHandler struct {
	// contains filtered or unexported fields
}

DefaultStreamingTaskHandler implements the TaskHandler interface optimized for streaming scenarios This handler automatically handles input-required pausing with streaming-aware behavior

func NewDefaultStreamingTaskHandler added in v0.9.0

func NewDefaultStreamingTaskHandler(logger *zap.Logger, agent OpenAICompatibleAgent) *DefaultStreamingTaskHandler

NewDefaultStreamingTaskHandler creates a new default streaming task handler

func (*DefaultStreamingTaskHandler) GetAgent added in v0.9.0

GetAgent returns the configured agent

func (*DefaultStreamingTaskHandler) HandleStreamingTask added in v0.9.4

func (sth *DefaultStreamingTaskHandler) HandleStreamingTask(ctx context.Context, task *types.Task, message *types.Message) (<-chan StreamEvent, error)

HandleStreamingTask processes a task and returns a channel of streaming events

func (*DefaultStreamingTaskHandler) SetAgent added in v0.9.0

SetAgent sets the agent for the task handler

type DefaultTaskHandler

type DefaultTaskHandler struct {
	// contains filtered or unexported fields
}

DefaultTaskHandler implements the TaskHandler interface for basic scenarios For optimized background or streaming with automatic input-required pausing, use DefaultBackgroundTaskHandler or DefaultStreamingTaskHandler instead

func NewDefaultTaskHandler

func NewDefaultTaskHandler(logger *zap.Logger) *DefaultTaskHandler

NewDefaultTaskHandler creates a new default task handler

func NewDefaultTaskHandlerWithAgent added in v0.9.0

func NewDefaultTaskHandlerWithAgent(logger *zap.Logger, agent OpenAICompatibleAgent) *DefaultTaskHandler

NewDefaultTaskHandlerWithAgent creates a new default task handler with an agent

func (*DefaultTaskHandler) GetAgent added in v0.9.0

GetAgent returns the configured OpenAI-compatible agent

func (*DefaultTaskHandler) HandleTask

func (th *DefaultTaskHandler) HandleTask(ctx context.Context, task *types.Task, message *types.Message) (*types.Task, error)

HandleTask processes a task and returns the updated task If an agent is configured, it will use the agent's capabilities, otherwise it will provide a simple response

func (*DefaultTaskHandler) SetAgent added in v0.9.0

func (th *DefaultTaskHandler) SetAgent(agent OpenAICompatibleAgent)

SetAgent sets the OpenAI-compatible agent for the task handler

type DefaultTaskManager

type DefaultTaskManager struct {
	// contains filtered or unexported fields
}

DefaultTaskManager implements the TaskManager interface

func NewDefaultTaskManager

func NewDefaultTaskManager(logger *zap.Logger) *DefaultTaskManager

NewDefaultTaskManager creates a new default task manager

func NewDefaultTaskManagerWithNotifications

func NewDefaultTaskManagerWithNotifications(logger *zap.Logger, notificationSender PushNotificationSender) *DefaultTaskManager

NewDefaultTaskManagerWithNotifications creates a new default task manager with push notification support

func NewDefaultTaskManagerWithStorage added in v0.9.0

func NewDefaultTaskManagerWithStorage(logger *zap.Logger, storage Storage) *DefaultTaskManager

NewDefaultTaskManagerWithStorage creates a new default task manager with custom storage

func (*DefaultTaskManager) CancelTask

func (tm *DefaultTaskManager) CancelTask(taskID string) error

CancelTask cancels a task

func (*DefaultTaskManager) CleanupCompletedTasks

func (tm *DefaultTaskManager) CleanupCompletedTasks()

CleanupCompletedTasks removes old completed tasks from memory

func (*DefaultTaskManager) CreateTask

func (tm *DefaultTaskManager) CreateTask(contextID string, state types.TaskState, message *types.Message) *types.Task

CreateTask creates a new task with message history managed within the task

func (*DefaultTaskManager) CreateTaskWithHistory added in v0.9.0

func (tm *DefaultTaskManager) CreateTaskWithHistory(contextID string, state types.TaskState, message *types.Message, history []types.Message) *types.Task

CreateTaskWithHistory creates a new task with existing conversation history

func (*DefaultTaskManager) DeleteTaskPushNotificationConfig

func (tm *DefaultTaskManager) DeleteTaskPushNotificationConfig(params types.DeleteTaskPushNotificationConfigParams) error

DeleteTaskPushNotificationConfig deletes a push notification configuration

func (*DefaultTaskManager) GetConversationHistory

func (tm *DefaultTaskManager) GetConversationHistory(contextID string) []types.Message

GetConversationHistory retrieves conversation history for a context ID

func (*DefaultTaskManager) GetStorage added in v0.9.0

func (tm *DefaultTaskManager) GetStorage() Storage

GetStorage returns the storage interface used by this task manager

func (*DefaultTaskManager) GetTask

func (tm *DefaultTaskManager) GetTask(taskID string) (*types.Task, bool)

GetTask retrieves a task by ID

func (*DefaultTaskManager) GetTaskPushNotificationConfig

GetTaskPushNotificationConfig gets push notification configuration for a task

func (*DefaultTaskManager) IsTaskPaused added in v0.9.0

func (tm *DefaultTaskManager) IsTaskPaused(taskID string) (bool, error)

IsTaskPaused checks if a task is currently paused (in input-required state)

func (*DefaultTaskManager) ListTaskPushNotificationConfigs

ListTaskPushNotificationConfigs lists all push notification configurations for a task

func (*DefaultTaskManager) ListTasks

func (tm *DefaultTaskManager) ListTasks(params types.TaskListParams) (*types.TaskList, error)

ListTasks retrieves a list of tasks based on the provided parameters

func (*DefaultTaskManager) PauseTaskForInput added in v0.9.0

func (tm *DefaultTaskManager) PauseTaskForInput(taskID string, message *types.Message) error

PauseTaskForInput pauses a task waiting for additional input from the client

func (*DefaultTaskManager) PollTaskStatus

func (tm *DefaultTaskManager) PollTaskStatus(taskID string, interval time.Duration, timeout time.Duration) (*types.Task, error)

PollTaskStatus periodically checks the status of a task until it is completed or failed

func (*DefaultTaskManager) ResumeTaskWithInput added in v0.9.0

func (tm *DefaultTaskManager) ResumeTaskWithInput(taskID string, message *types.Message) error

ResumeTaskWithInput resumes a paused task with new input from the client

func (*DefaultTaskManager) SetNotificationSender

func (tm *DefaultTaskManager) SetNotificationSender(sender PushNotificationSender)

SetNotificationSender sets the push notification sender

func (*DefaultTaskManager) SetRetentionConfig added in v0.9.0

func (tm *DefaultTaskManager) SetRetentionConfig(retentionConfig config.TaskRetentionConfig)

SetRetentionConfig sets the task retention configuration and starts automatic cleanup

func (*DefaultTaskManager) SetTaskPushNotificationConfig

func (tm *DefaultTaskManager) SetTaskPushNotificationConfig(config types.TaskPushNotificationConfig) (*types.TaskPushNotificationConfig, error)

SetTaskPushNotificationConfig sets push notification configuration for a task

func (*DefaultTaskManager) StopCleanup added in v0.9.0

func (tm *DefaultTaskManager) StopCleanup()

StopCleanup stops the automatic cleanup process

func (*DefaultTaskManager) UpdateConversationHistory

func (tm *DefaultTaskManager) UpdateConversationHistory(contextID string, messages []types.Message)

UpdateConversationHistory updates conversation history for a context ID

func (*DefaultTaskManager) UpdateError added in v0.9.0

func (tm *DefaultTaskManager) UpdateError(taskID string, message *types.Message) error

UpdateError updates a task to failed state with an error message

func (*DefaultTaskManager) UpdateState added in v0.9.0

func (tm *DefaultTaskManager) UpdateState(taskID string, state types.TaskState) error

UpdateState updates a task's state

func (*DefaultTaskManager) UpdateTask

func (tm *DefaultTaskManager) UpdateTask(task *types.Task) error

UpdateTask updates a complete task (including history, state, and message)

type DefaultToolBox

type DefaultToolBox struct {
	// contains filtered or unexported fields
}

DefaultToolBox is a default implementation of ToolBox

func NewDefaultToolBox

func NewDefaultToolBox() *DefaultToolBox

NewDefaultToolBox creates a new DefaultToolBox with built-in tools

func NewToolBox added in v0.9.0

func NewToolBox() *DefaultToolBox

NewToolBox creates a new empty DefaultToolBox

func (*DefaultToolBox) AddTool

func (tb *DefaultToolBox) AddTool(tool Tool)

AddTool adds a tool to the toolbox

func (*DefaultToolBox) ExecuteTool

func (tb *DefaultToolBox) ExecuteTool(ctx context.Context, toolName string, arguments map[string]any) (string, error)

ExecuteTool executes a tool by name with the provided arguments

func (*DefaultToolBox) GetToolNames

func (tb *DefaultToolBox) GetToolNames() []string

GetToolNames returns a list of all available tool names

func (*DefaultToolBox) GetTools

func (tb *DefaultToolBox) GetTools() []sdk.ChatCompletionTool

GetTools returns all available tools in OpenAI function call format

func (*DefaultToolBox) HasTool

func (tb *DefaultToolBox) HasTool(toolName string) bool

HasTool checks if a tool with the given name exists

type DeltaStreamEvent added in v0.9.4

type DeltaStreamEvent struct {
	Data interface{}
}

DeltaStreamEvent represents a delta streaming event

func (*DeltaStreamEvent) GetData added in v0.9.4

func (e *DeltaStreamEvent) GetData() interface{}

func (*DeltaStreamEvent) GetEventType added in v0.9.4

func (e *DeltaStreamEvent) GetEventType() string

type EmptyMessagePartsError

type EmptyMessagePartsError struct{}

EmptyMessagePartsError represents an error for empty message parts

func (*EmptyMessagePartsError) Error

func (e *EmptyMessagePartsError) Error() string

type ErrorStreamEvent added in v0.9.4

type ErrorStreamEvent struct {
	ErrorMessage string
}

ErrorStreamEvent represents an error streaming event

func (*ErrorStreamEvent) GetData added in v0.9.4

func (e *ErrorStreamEvent) GetData() interface{}

func (*ErrorStreamEvent) GetEventType added in v0.9.4

func (e *ErrorStreamEvent) GetEventType() string

type HTTPPushNotificationSender

type HTTPPushNotificationSender struct {
	// contains filtered or unexported fields
}

HTTPPushNotificationSender implements push notifications via HTTP webhooks

func NewHTTPPushNotificationSender

func NewHTTPPushNotificationSender(logger *zap.Logger) *HTTPPushNotificationSender

NewHTTPPushNotificationSender creates a new HTTP-based push notification sender

func (*HTTPPushNotificationSender) SendTaskUpdate

func (s *HTTPPushNotificationSender) SendTaskUpdate(ctx context.Context, config types.PushNotificationConfig, task *types.Task) error

SendTaskUpdate sends a push notification about a task update

type InMemoryStorage added in v0.9.0

type InMemoryStorage struct {
	// contains filtered or unexported fields
}

InMemoryStorage implements Storage interface using in-memory storage

func NewInMemoryStorage added in v0.9.0

func NewInMemoryStorage(logger *zap.Logger, maxConversationHistory int) *InMemoryStorage

NewInMemoryStorage creates a new in-memory storage instance

func (*InMemoryStorage) CleanupCompletedTasks added in v0.9.0

func (s *InMemoryStorage) CleanupCompletedTasks() int

CleanupCompletedTasks removes completed, failed, and canceled tasks

func (*InMemoryStorage) CleanupOldConversations added in v0.9.0

func (s *InMemoryStorage) CleanupOldConversations(maxAge int64) int

CleanupOldConversations removes conversations older than maxAge (in seconds)

func (*InMemoryStorage) CleanupTasksWithRetention added in v0.9.0

func (s *InMemoryStorage) CleanupTasksWithRetention(maxCompleted, maxFailed int) int

CleanupTasksWithRetention removes old completed and failed tasks while keeping the specified number of most recent ones

func (*InMemoryStorage) ClearQueue added in v0.9.0

func (s *InMemoryStorage) ClearQueue() error

ClearQueue removes all tasks from the queue

func (*InMemoryStorage) CreateActiveTask added in v0.9.0

func (s *InMemoryStorage) CreateActiveTask(task *types.Task) error

CreateActiveTask creates a new active task in the active tasks storage

func (*InMemoryStorage) DeleteContext added in v0.9.0

func (s *InMemoryStorage) DeleteContext(contextID string) error

DeleteContext deletes all tasks for a context (not applicable since no conversation history)

func (*InMemoryStorage) DeleteContextAndTasks added in v0.9.0

func (s *InMemoryStorage) DeleteContextAndTasks(contextID string) error

DeleteContextAndTasks deletes all tasks for a context

func (*InMemoryStorage) DeleteTask added in v0.9.0

func (s *InMemoryStorage) DeleteTask(taskID string) error

DeleteTask deletes a task from dead letter queue and cleans up context mapping

func (*InMemoryStorage) DequeueTask added in v0.9.0

func (s *InMemoryStorage) DequeueTask(ctx context.Context) (*QueuedTask, error)

DequeueTask retrieves and removes the next task from the processing queue Blocks until a task is available or context is cancelled

func (*InMemoryStorage) EnqueueTask added in v0.9.0

func (s *InMemoryStorage) EnqueueTask(task *types.Task, requestID any) error

EnqueueTask adds a task to the processing queue

func (*InMemoryStorage) GetActiveTask added in v0.9.0

func (s *InMemoryStorage) GetActiveTask(taskID string) (*types.Task, error)

GetActiveTask retrieves an active task by ID (from queue or processing)

func (*InMemoryStorage) GetContexts added in v0.9.0

func (s *InMemoryStorage) GetContexts() []string

GetContexts returns all context IDs that have tasks (both active and dead letter)

func (*InMemoryStorage) GetContextsWithTasks added in v0.9.0

func (s *InMemoryStorage) GetContextsWithTasks() []string

GetContextsWithTasks returns all context IDs that have tasks

func (*InMemoryStorage) GetQueueLength added in v0.9.0

func (s *InMemoryStorage) GetQueueLength() int

GetQueueLength returns the current number of tasks in the queue

func (*InMemoryStorage) GetStats added in v0.9.0

func (s *InMemoryStorage) GetStats() StorageStats

GetStats provides statistics about the storage

func (*InMemoryStorage) GetTask added in v0.9.0

func (s *InMemoryStorage) GetTask(taskID string) (*types.Task, bool)

GetTask retrieves a task by ID from dead letter queue

func (*InMemoryStorage) GetTaskByContextAndID added in v0.9.0

func (s *InMemoryStorage) GetTaskByContextAndID(contextID, taskID string) (*types.Task, bool)

GetTaskByContextAndID retrieves a task by context ID and task ID from dead letter queue

func (*InMemoryStorage) ListTasks added in v0.9.0

func (s *InMemoryStorage) ListTasks(filter TaskFilter) ([]*types.Task, error)

ListTasks retrieves a list of tasks based on the provided filter from both active and dead letter queues

func (*InMemoryStorage) ListTasksByContext added in v0.9.0

func (s *InMemoryStorage) ListTasksByContext(contextID string, filter TaskFilter) ([]*types.Task, error)

ListTasksByContext retrieves tasks for a specific context with filtering from dead letter queue

func (*InMemoryStorage) StoreDeadLetterTask added in v0.9.0

func (s *InMemoryStorage) StoreDeadLetterTask(task *types.Task) error

StoreDeadLetterTask stores a completed/failed task in the dead letter queue for audit

func (*InMemoryStorage) UpdateActiveTask added in v0.9.0

func (s *InMemoryStorage) UpdateActiveTask(task *types.Task) error

UpdateActiveTask updates an active task's metadata

type InMemoryStorageFactory added in v0.9.0

type InMemoryStorageFactory struct{}

InMemoryStorageFactory implements StorageFactory for in-memory storage

func (*InMemoryStorageFactory) CreateStorage added in v0.9.0

func (f *InMemoryStorageFactory) CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)

CreateStorage creates an in-memory storage instance

func (*InMemoryStorageFactory) SupportedProvider added in v0.9.0

func (f *InMemoryStorageFactory) SupportedProvider() string

SupportedProvider returns the provider name

func (*InMemoryStorageFactory) ValidateConfig added in v0.9.0

func (f *InMemoryStorageFactory) ValidateConfig(config config.QueueConfig) error

ValidateConfig validates the configuration for in-memory storage

type JRPCErrorCode

type JRPCErrorCode int

JRPCErrorCode represents JSON-RPC error codes

const (
	ErrParseError     JRPCErrorCode = -32700
	ErrInvalidRequest JRPCErrorCode = -32600
	ErrMethodNotFound JRPCErrorCode = -32601
	ErrInvalidParams  JRPCErrorCode = -32602
	ErrInternalError  JRPCErrorCode = -32603
	ErrServerError    JRPCErrorCode = -32000
)

type LLMClient

type LLMClient interface {
	// CreateChatCompletion sends a chat completion request using SDK messages
	CreateChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (*sdk.CreateChatCompletionResponse, error)

	// CreateStreamingChatCompletion sends a streaming chat completion request using SDK messages
	CreateStreamingChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (<-chan *sdk.CreateChatCompletionStreamResponse, <-chan error)
}

LLMClient defines the interface for Language Model clients

type OpenAICompatibleAgent

type OpenAICompatibleAgent interface {
	// Run processes a conversation and returns the assistant's response along with any additional messages
	// Uses the agent's configured toolbox for tool execution
	Run(ctx context.Context, messages []types.Message) (*AgentResponse, error)

	// RunWithStream processes a conversation and returns a streaming response
	// Uses the agent's configured toolbox for tool execution
	RunWithStream(ctx context.Context, messages []types.Message) (<-chan cloudevents.Event, error)
}

OpenAICompatibleAgent represents an agent that can interact with OpenAI-compatible LLM APIs and execute tools The agent is stateless and does not maintain conversation history Tools are configured during agent creation via the toolbox

type OpenAICompatibleAgentImpl added in v0.9.0

type OpenAICompatibleAgentImpl struct {
	// contains filtered or unexported fields
}

OpenAICompatibleAgentImpl is the implementation of OpenAICompatibleAgent This implementation is stateless and does not maintain conversation history

func AgentWithConfig

func AgentWithConfig(logger *zap.Logger, config *config.AgentConfig) (*OpenAICompatibleAgentImpl, error)

AgentWithConfig creates an agent with the provided configuration

func AgentWithLLM

func AgentWithLLM(logger *zap.Logger, llmClient LLMClient) (*OpenAICompatibleAgentImpl, error)

AgentWithLLM creates an agent with a pre-configured LLM client

func FullyConfiguredAgent

func FullyConfiguredAgent(logger *zap.Logger, config *config.AgentConfig, llmClient LLMClient, toolBox ToolBox) (*OpenAICompatibleAgentImpl, error)

FullyConfiguredAgent creates an agent with all components configured

func NewOpenAICompatibleAgent added in v0.9.0

func NewOpenAICompatibleAgent(logger *zap.Logger) *OpenAICompatibleAgentImpl

NewOpenAICompatibleAgent creates a new OpenAICompatibleAgentImpl

func NewOpenAICompatibleAgentWithConfig

func NewOpenAICompatibleAgentWithConfig(logger *zap.Logger, cfg *config.AgentConfig) *OpenAICompatibleAgentImpl

NewOpenAICompatibleAgentWithConfig creates a new OpenAICompatibleAgentImpl with configuration

func NewOpenAICompatibleAgentWithLLM

func NewOpenAICompatibleAgentWithLLM(logger *zap.Logger, llmClient LLMClient) *OpenAICompatibleAgentImpl

NewOpenAICompatibleAgentWithLLM creates a new agent with an LLM client

func NewOpenAICompatibleAgentWithLLMConfig added in v0.9.0

func NewOpenAICompatibleAgentWithLLMConfig(logger *zap.Logger, config *config.AgentConfig) (*OpenAICompatibleAgentImpl, error)

NewOpenAICompatibleAgentWithLLMConfig creates a new agent with LLM configuration

func SimpleAgent

func SimpleAgent(logger *zap.Logger) (*OpenAICompatibleAgentImpl, error)

SimpleAgent creates a basic agent with default configuration

func (*OpenAICompatibleAgentImpl) Run added in v0.9.0

Run processes a conversation and returns the assistant's response along with additional messages

func (*OpenAICompatibleAgentImpl) RunWithStream added in v0.9.0

func (a *OpenAICompatibleAgentImpl) RunWithStream(ctx context.Context, messages []types.Message) (<-chan cloudevents.Event, error)

RunWithStream processes a conversation and returns a streaming response with iterative tool calling support

func (*OpenAICompatibleAgentImpl) SetLLMClient added in v0.9.0

func (a *OpenAICompatibleAgentImpl) SetLLMClient(client LLMClient)

SetLLMClient sets the LLM client for the agent

func (*OpenAICompatibleAgentImpl) SetToolBox added in v0.9.0

func (a *OpenAICompatibleAgentImpl) SetToolBox(toolBox ToolBox)

SetToolBox sets the tool box for the agent

type OpenAICompatibleLLMClient

type OpenAICompatibleLLMClient struct {
	// contains filtered or unexported fields
}

OpenAICompatibleLLMClient implements LLMClient using an OpenAI-compatible API via the Inference Gateway SDK

func NewOpenAICompatibleLLMClient

func NewOpenAICompatibleLLMClient(cfg *config.AgentConfig, logger *zap.Logger) (*OpenAICompatibleLLMClient, error)

NewOpenAICompatibleLLMClient creates a new OpenAI-compatible LLM client

func (*OpenAICompatibleLLMClient) CreateChatCompletion

func (c *OpenAICompatibleLLMClient) CreateChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (*sdk.CreateChatCompletionResponse, error)

CreateChatCompletion implements LLMClient.CreateChatCompletion using SDK messages

func (*OpenAICompatibleLLMClient) CreateStreamingChatCompletion

func (c *OpenAICompatibleLLMClient) CreateStreamingChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (<-chan *sdk.CreateChatCompletionStreamResponse, <-chan error)

CreateStreamingChatCompletion implements LLMClient.CreateStreamingChatCompletion using SDK messages

type PushNotificationSender

type PushNotificationSender interface {
	SendTaskUpdate(ctx context.Context, config types.PushNotificationConfig, task *types.Task) error
}

PushNotificationSender handles sending push notifications

type QueuedTask

type QueuedTask struct {
	Task      *types.Task
	RequestID any
}

QueuedTask represents a task in the processing queue

type RedisStorage added in v0.9.0

type RedisStorage struct {
	// contains filtered or unexported fields
}

RedisStorage implements Storage interface using Redis

func (*RedisStorage) CleanupCompletedTasks added in v0.9.0

func (s *RedisStorage) CleanupCompletedTasks() int

CleanupCompletedTasks removes completed, failed, and canceled tasks

func (*RedisStorage) CleanupTasksWithRetention added in v0.9.0

func (s *RedisStorage) CleanupTasksWithRetention(maxCompleted, maxFailed int) int

CleanupTasksWithRetention removes old completed and failed tasks while keeping specified number

func (*RedisStorage) ClearQueue added in v0.9.0

func (s *RedisStorage) ClearQueue() error

ClearQueue removes all tasks from the queue

func (*RedisStorage) CreateActiveTask added in v0.9.0

func (s *RedisStorage) CreateActiveTask(task *types.Task) error

CreateActiveTask creates a new active task

func (*RedisStorage) DeleteContext added in v0.9.0

func (s *RedisStorage) DeleteContext(contextID string) error

DeleteContext deletes all tasks for a context

func (*RedisStorage) DeleteContextAndTasks added in v0.9.0

func (s *RedisStorage) DeleteContextAndTasks(contextID string) error

DeleteContextAndTasks deletes all tasks for a context

func (*RedisStorage) DeleteTask added in v0.9.0

func (s *RedisStorage) DeleteTask(taskID string) error

DeleteTask deletes a task from dead letter queue and cleans up context mapping

func (*RedisStorage) DequeueTask added in v0.9.0

func (s *RedisStorage) DequeueTask(ctx context.Context) (*QueuedTask, error)

DequeueTask retrieves and removes the next task from the processing queue

func (*RedisStorage) EnqueueTask added in v0.9.0

func (s *RedisStorage) EnqueueTask(task *types.Task, requestID any) error

EnqueueTask adds a task to the processing queue

func (*RedisStorage) GetActiveTask added in v0.9.0

func (s *RedisStorage) GetActiveTask(taskID string) (*types.Task, error)

GetActiveTask retrieves an active task by ID

func (*RedisStorage) GetContexts added in v0.9.0

func (s *RedisStorage) GetContexts() []string

GetContexts returns all context IDs that have tasks

func (*RedisStorage) GetContextsWithTasks added in v0.9.0

func (s *RedisStorage) GetContextsWithTasks() []string

GetContextsWithTasks returns all context IDs that have tasks

func (*RedisStorage) GetQueueLength added in v0.9.0

func (s *RedisStorage) GetQueueLength() int

GetQueueLength returns the current number of tasks in the queue

func (*RedisStorage) GetStats added in v0.9.0

func (s *RedisStorage) GetStats() StorageStats

GetStats provides statistics about the storage

func (*RedisStorage) GetTask added in v0.9.0

func (s *RedisStorage) GetTask(taskID string) (*types.Task, bool)

GetTask retrieves a task by ID from dead letter queue

func (*RedisStorage) GetTaskByContextAndID added in v0.9.0

func (s *RedisStorage) GetTaskByContextAndID(contextID, taskID string) (*types.Task, bool)

GetTaskByContextAndID retrieves a task by context ID and task ID

func (*RedisStorage) ListTasks added in v0.9.0

func (s *RedisStorage) ListTasks(filter TaskFilter) ([]*types.Task, error)

ListTasks retrieves a list of tasks based on the provided filter

func (*RedisStorage) ListTasksByContext added in v0.9.0

func (s *RedisStorage) ListTasksByContext(contextID string, filter TaskFilter) ([]*types.Task, error)

ListTasksByContext retrieves tasks for a specific context

func (*RedisStorage) StoreDeadLetterTask added in v0.9.0

func (s *RedisStorage) StoreDeadLetterTask(task *types.Task) error

StoreDeadLetterTask stores a completed/failed task in the dead letter queue

func (*RedisStorage) UpdateActiveTask added in v0.9.0

func (s *RedisStorage) UpdateActiveTask(task *types.Task) error

UpdateActiveTask updates an active task's metadata

type RedisStorageFactory added in v0.9.0

type RedisStorageFactory struct{}

RedisStorageFactory implements StorageFactory for Redis storage

func (*RedisStorageFactory) CreateStorage added in v0.9.0

func (f *RedisStorageFactory) CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)

CreateStorage creates a Redis storage instance

func (*RedisStorageFactory) SupportedProvider added in v0.9.0

func (f *RedisStorageFactory) SupportedProvider() string

SupportedProvider returns the provider name

func (*RedisStorageFactory) ValidateConfig added in v0.9.0

func (f *RedisStorageFactory) ValidateConfig(config config.QueueConfig) error

ValidateConfig validates the configuration for Redis storage

type ResponseSender

type ResponseSender interface {
	// SendSuccess sends a JSON-RPC success response
	SendSuccess(c *gin.Context, id any, result any)

	// SendError sends a JSON-RPC error response
	SendError(c *gin.Context, id any, code int, message string)
}

ResponseSender defines how to send JSON-RPC responses

type SortOrder added in v0.9.0

type SortOrder string

SortOrder defines the sort order

const (
	SortOrderAsc  SortOrder = "asc"
	SortOrderDesc SortOrder = "desc"
)

type StatusStreamEvent added in v0.9.4

type StatusStreamEvent struct {
	Status interface{}
}

StatusStreamEvent represents a status update streaming event

func (*StatusStreamEvent) GetData added in v0.9.4

func (e *StatusStreamEvent) GetData() interface{}

func (*StatusStreamEvent) GetEventType added in v0.9.4

func (e *StatusStreamEvent) GetEventType() string

type Storage added in v0.9.0

type Storage interface {
	// Task Queue Management (primary storage for active tasks)
	EnqueueTask(task *types.Task, requestID any) error
	DequeueTask(ctx context.Context) (*QueuedTask, error)
	GetQueueLength() int
	ClearQueue() error

	// Active Task Queries (for tasks currently in queue or being processed)
	GetActiveTask(taskID string) (*types.Task, error)
	CreateActiveTask(task *types.Task) error
	UpdateActiveTask(task *types.Task) error

	// Dead Letter Queue (completed/failed tasks with full history for audit)
	StoreDeadLetterTask(task *types.Task) error
	GetTask(taskID string) (*types.Task, bool)
	GetTaskByContextAndID(contextID, taskID string) (*types.Task, bool)
	DeleteTask(taskID string) error
	ListTasks(filter TaskFilter) ([]*types.Task, error)
	ListTasksByContext(contextID string, filter TaskFilter) ([]*types.Task, error)

	// Context Management (contexts are implicit from tasks)
	GetContexts() []string
	GetContextsWithTasks() []string
	DeleteContext(contextID string) error
	DeleteContextAndTasks(contextID string) error

	// Cleanup Operations
	CleanupCompletedTasks() int
	CleanupTasksWithRetention(maxCompleted, maxFailed int) int

	// Health and Statistics
	GetStats() StorageStats
}

Storage defines the interface for queue-centric task management Tasks carry their complete message history and flow through: Queue -> Processing -> Dead Letter

func CreateStorage added in v0.9.0

func CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)

CreateStorage creates a storage instance using the registered factories

type StorageFactory added in v0.9.0

type StorageFactory interface {
	// CreateStorage creates a storage instance with the given configuration
	CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)

	// SupportedProvider returns the provider name this factory supports
	SupportedProvider() string

	// ValidateConfig validates the configuration for this provider
	ValidateConfig(config config.QueueConfig) error
}

StorageFactory defines the interface for creating storage instances

func GetStorageProvider added in v0.9.0

func GetStorageProvider(provider string) (StorageFactory, error)

GetStorageProvider retrieves a storage provider factory

type StorageFactoryRegistry added in v0.9.0

type StorageFactoryRegistry struct {
	// contains filtered or unexported fields
}

StorageFactoryRegistry manages registered storage providers

func (*StorageFactoryRegistry) CreateStorage added in v0.9.0

func (r *StorageFactoryRegistry) CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)

CreateStorage creates a storage instance using the appropriate factory

func (*StorageFactoryRegistry) GetFactory added in v0.9.0

func (r *StorageFactoryRegistry) GetFactory(provider string) (StorageFactory, error)

GetFactory retrieves a factory for a provider

func (*StorageFactoryRegistry) GetProviders added in v0.9.0

func (r *StorageFactoryRegistry) GetProviders() []string

GetProviders returns a list of all registered provider names

func (*StorageFactoryRegistry) Register added in v0.9.0

func (r *StorageFactoryRegistry) Register(provider string, factory StorageFactory)

Register registers a factory for a provider

type StorageStats added in v0.9.0

type StorageStats struct {
	TotalTasks                int            `json:"total_tasks"`
	TasksByState              map[string]int `json:"tasks_by_state"`
	TotalContexts             int            `json:"total_contexts"`
	ContextsWithTasks         int            `json:"contexts_with_tasks"`
	AverageTasksPerContext    float64        `json:"average_tasks_per_context"`
	TotalMessages             int            `json:"total_messages"`
	AverageMessagesPerContext float64        `json:"average_messages_per_context"`
}

StorageStats provides statistics about the storage

type StreamEvent added in v0.9.4

type StreamEvent interface {
	// GetEventType returns the type of the streaming event (delta, status, error, etc.)
	GetEventType() string

	// GetData returns the event data
	GetData() interface{}
}

StreamEvent represents a streaming event that can be sent to clients

type StreamableTaskHandler added in v0.9.4

type StreamableTaskHandler interface {
	// HandleStreamingTask processes a task and returns a channel of streaming events
	// The channel should be closed when streaming is complete
	HandleStreamingTask(ctx context.Context, task *types.Task, message *types.Message) (<-chan StreamEvent, error)

	// SetAgent sets the OpenAI-compatible agent for the task handler
	SetAgent(agent OpenAICompatibleAgent)

	// GetAgent returns the configured OpenAI-compatible agent
	GetAgent() OpenAICompatibleAgent
}

StreamableTaskHandler defines how to handle streaming task processing This interface should be implemented by streaming task handlers that need to return real-time data

type StreamingNotImplementedError

type StreamingNotImplementedError struct{}

StreamingNotImplementedError represents an error for unimplemented streaming

func (*StreamingNotImplementedError) Error

type TaskCompleteStreamEvent added in v0.9.4

type TaskCompleteStreamEvent struct {
	Task *types.Task
}

TaskCompleteStreamEvent represents a task completion streaming event

func (*TaskCompleteStreamEvent) GetData added in v0.9.4

func (e *TaskCompleteStreamEvent) GetData() interface{}

func (*TaskCompleteStreamEvent) GetEventType added in v0.9.4

func (e *TaskCompleteStreamEvent) GetEventType() string

type TaskFilter added in v0.9.0

type TaskFilter struct {
	State     *types.TaskState
	ContextID *string
	Limit     int
	Offset    int
	SortBy    TaskSortField
	SortOrder SortOrder
}

TaskFilter defines filtering criteria for listing tasks

type TaskHandler

type TaskHandler interface {
	// HandleTask processes a task and returns the updated task
	// This is where the main business logic should be implemented
	HandleTask(ctx context.Context, task *types.Task, message *types.Message) (*types.Task, error)

	// SetAgent sets the OpenAI-compatible agent for the task handler
	SetAgent(agent OpenAICompatibleAgent)

	// GetAgent returns the configured OpenAI-compatible agent
	GetAgent() OpenAICompatibleAgent
}

TaskHandler defines how to handle task processing This interface should be implemented by domain-specific task handlers

type TaskInterruptedStreamEvent added in v0.9.4

type TaskInterruptedStreamEvent struct {
	Task   *types.Task
	Reason string
}

TaskInterruptedStreamEvent represents a task interruption streaming event

func (*TaskInterruptedStreamEvent) GetData added in v0.9.4

func (e *TaskInterruptedStreamEvent) GetData() interface{}

func (*TaskInterruptedStreamEvent) GetEventType added in v0.9.4

func (e *TaskInterruptedStreamEvent) GetEventType() string

type TaskManager

type TaskManager interface {
	// CreateTask creates a new task and stores it
	CreateTask(contextID string, state types.TaskState, message *types.Message) *types.Task

	// CreateTaskWithHistory creates a new task with existing conversation history
	CreateTaskWithHistory(contextID string, state types.TaskState, message *types.Message, history []types.Message) *types.Task

	// UpdateState updates a task's state
	UpdateState(taskID string, state types.TaskState) error

	// UpdateTask updates a complete task (including history, state, and message)
	UpdateTask(task *types.Task) error

	// UpdateError updates a task to failed state with an error message
	UpdateError(taskID string, message *types.Message) error

	// GetTask retrieves a task by ID
	GetTask(taskID string) (*types.Task, bool)

	// ListTasks retrieves a list of tasks based on the provided parameters
	ListTasks(params types.TaskListParams) (*types.TaskList, error)

	// CancelTask cancels a task
	CancelTask(taskID string) error

	// CleanupCompletedTasks removes old completed tasks from memory
	CleanupCompletedTasks()

	// PollTaskStatus periodically checks the status of a task until it is completed or failed
	PollTaskStatus(taskID string, interval time.Duration, timeout time.Duration) (*types.Task, error)

	// GetConversationHistory retrieves conversation history for a context ID
	GetConversationHistory(contextID string) []types.Message

	// UpdateConversationHistory updates conversation history for a context ID
	UpdateConversationHistory(contextID string, messages []types.Message)

	// SetTaskPushNotificationConfig sets push notification configuration for a task
	SetTaskPushNotificationConfig(config types.TaskPushNotificationConfig) (*types.TaskPushNotificationConfig, error)

	// GetTaskPushNotificationConfig gets push notification configuration for a task
	GetTaskPushNotificationConfig(params types.GetTaskPushNotificationConfigParams) (*types.TaskPushNotificationConfig, error)

	// ListTaskPushNotificationConfigs lists all push notification configurations for a task
	ListTaskPushNotificationConfigs(params types.ListTaskPushNotificationConfigParams) ([]types.TaskPushNotificationConfig, error)

	// DeleteTaskPushNotificationConfig deletes a push notification configuration
	DeleteTaskPushNotificationConfig(params types.DeleteTaskPushNotificationConfigParams) error

	// PauseTaskForInput pauses a task waiting for additional input from the client
	PauseTaskForInput(taskID string, message *types.Message) error

	// ResumeTaskWithInput resumes a paused task with new input from the client
	ResumeTaskWithInput(taskID string, message *types.Message) error

	// IsTaskPaused checks if a task is currently paused (in input-required state)
	IsTaskPaused(taskID string) (bool, error)

	// SetRetentionConfig sets the task retention configuration and starts automatic cleanup
	SetRetentionConfig(retentionConfig config.TaskRetentionConfig)

	// StopCleanup stops the automatic cleanup process
	StopCleanup()
}

TaskManager defines task lifecycle management

type TaskNotCancelableError added in v0.9.0

type TaskNotCancelableError struct {
	TaskID string
	State  types.TaskState
}

TaskNotCancelableError represents an error when a task cannot be canceled due to its current state

func (*TaskNotCancelableError) Error added in v0.9.0

func (e *TaskNotCancelableError) Error() string

type TaskNotFoundError

type TaskNotFoundError struct {
	TaskID string
}

TaskNotFoundError represents an error when a task is not found

func (*TaskNotFoundError) Error

func (e *TaskNotFoundError) Error() string

type TaskResultProcessor

type TaskResultProcessor interface {
	// ProcessToolResult processes a tool call result and returns a completion message if the task should be completed
	// Returns nil if the task should continue processing
	ProcessToolResult(toolCallResult string) *types.Message
}

TaskResultProcessor defines how to process tool call results for task completion

type TaskSortField added in v0.9.0

type TaskSortField string

TaskSortField defines the fields that can be used for sorting tasks

const (
	TaskSortFieldCreatedAt TaskSortField = "created_at"
	TaskSortFieldUpdatedAt TaskSortField = "updated_at"
	TaskSortFieldState     TaskSortField = "state"
	TaskSortFieldContextID TaskSortField = "context_id"
)

type TaskUpdateNotification

type TaskUpdateNotification struct {
	Type      string      `json:"type"`
	TaskID    string      `json:"taskId"`
	State     string      `json:"state"`
	Timestamp string      `json:"timestamp"`
	Task      *types.Task `json:"task,omitempty"`
}

TaskUpdateNotification represents the payload sent to webhook URLs

type Tool

type Tool interface {
	// GetName returns the name of the tool
	GetName() string

	// GetDescription returns a description of what the tool does
	GetDescription() string

	// GetParameters returns the JSON schema for the tool parameters
	GetParameters() map[string]any

	// Execute runs the tool with the provided arguments
	Execute(ctx context.Context, arguments map[string]any) (string, error)
}

Tool represents a single tool that can be executed

type ToolBox

type ToolBox interface {
	// GetTools returns all available tools in OpenAI function call format
	GetTools() []sdk.ChatCompletionTool

	// ExecuteTool executes a tool by name with the provided arguments
	// Returns the tool result as a string and any error that occurred
	ExecuteTool(ctx context.Context, toolName string, arguments map[string]any) (string, error)

	// GetToolNames returns a list of all available tool names
	GetToolNames() []string

	// HasTool checks if a tool with the given name exists
	HasTool(toolName string) bool
}

ToolBox defines the interface for a collection of tools that can be used by OpenAI-compatible agents

type ToolNotFoundError

type ToolNotFoundError struct {
	ToolName string
}

ToolNotFoundError represents an error when a requested tool is not found

func (*ToolNotFoundError) Error

func (e *ToolNotFoundError) Error() string

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL