swarm

v0.2.3
Published: Apr 1, 2025 License: MIT Imports: 19 Imported by: 3

README

Swarm

An ergonomic and lightweight multi-agent orchestration framework inspired by OpenAI's Swarm. Designed for simplicity and efficiency, this framework empowers developers to build scalable and flexible multi-agent systems in Go.

Features:

  • 🚀 Lightweight Orchestration: Efficiently manage multi-agent systems with a minimalistic and performant design. Perfect for applications where simplicity and speed are critical.
  • 🛠️ Native Function Calls: Seamlessly integrate with your existing tools and services using native Go function calls. No complex wrappers or unnecessary abstractions—just straightforward integration.
  • Event-Driven Workflows: Build extensible and dynamic workflows driven by events. This approach ensures flexibility and adaptability for automating complex processes.
  • 🧩 Composable Architecture: Create sophisticated systems by combining simple, reusable components. The framework’s modular design encourages clean and maintainable code.

Getting Started

Add swarm to your Go module:

go get -u github.com/feiskyer/swarm-go

Examples

Set up the environment variables first:

  • For OpenAI, set OPENAI_API_KEY and, optionally, OPENAI_API_BASE to point at an OpenAI-API-compatible service.
  • For Azure OpenAI, set AZURE_OPENAI_API_KEY and AZURE_OPENAI_API_BASE.

Basic Agent

A basic agent with function calls:

package main

import (
    "fmt"
    "reflect"

    "github.com/feiskyer/swarm-go"
)

func main() {
    // Create a new agent
    agent := swarm.NewAgent("Assistant")
    agent.WithModel("gpt-4o").
        WithInstructions("You are a helpful assistant.")

    // Example function that the agent can call
    weatherFunc := swarm.NewAgentFunction(
        "getWeather",
        "Get the current weather for a given location. Requires a location parameter.",
        func(args map[string]interface{}) (interface{}, error) {
            location, ok := args["location"].(string)
            if !ok {
                return nil, fmt.Errorf("location not provided")
            }
            return fmt.Sprintf("The weather in %s is sunny", location), nil
        },
        []swarm.Parameter{{Name: "location", Type: reflect.TypeOf("string")}},
    )

    // Add function to agent
    agent.AddFunction(weatherFunc)

    // Run the demo loop
    swarm.RunDemoLoop(agent, nil, false, false)
}

Streaming Output

Use streaming output for your agent:

package main

import (
    "fmt"
    "reflect"

    "github.com/feiskyer/swarm-go"
)

func main() {
    // Create a new agent
    agent := swarm.NewAgent("Assistant")
    agent.WithModel("gpt-4o").
        WithInstructions("You are a helpful assistant.")

    // Example function that the agent can call
    weatherFunc := func(args map[string]interface{}) (interface{}, error) {
        location, ok := args["location"].(string)
        if !ok {
            return nil, fmt.Errorf("location not provided")
        }
        return fmt.Sprintf("The weather in %s is sunny", location), nil
    }

    // Add function to agent
    agent.AddFunction(swarm.NewAgentFunction(
        "getWeather",
        "Get the current weather for a given location. Requires a location parameter.",
        weatherFunc,
        []swarm.Parameter{{Name: "location", Type: reflect.TypeOf("string")}},
    ))

    // Run the demo loop
    swarm.RunDemoLoop(agent, nil, true, false)
}

Multi-agent handoff

Handoff example for multiple agents:

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/feiskyer/swarm-go"
)

func main() {
    client := swarm.NewSwarm(swarm.NewOpenAIClient(os.Getenv("OPENAI_API_KEY")))

    englishAgent := swarm.NewAgent("English Agent").WithInstructions("You only speak English.")
    spanishAgent := swarm.NewAgent("Spanish Agent").WithInstructions("You only speak Spanish.")

    transferToSpanishAgent := swarm.NewAgentFunction(
        "transferToSpanishAgent",
        "Transfer spanish speaking users immediately.",
        func(args map[string]interface{}) (interface{}, error) {
            return spanishAgent, nil
        },
        []swarm.Parameter{},
    )
    englishAgent.AddFunction(transferToSpanishAgent)

    messages := []map[string]interface{}{
        {
            "role":    "user",
            "content": "Hola. ¿Como estás?",
        },
    }
    response, err := client.Run(context.TODO(), englishAgent, messages, nil, "gpt-4o", false, true, 10, true)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(response.Messages[len(response.Messages)-1]["content"])
}

Simple Workflow

Use a simple sequential workflow.

Flexible Workflow

Use flexible workflow for event-driven multi-agent orchestration.

Flexible workflow with parallel tasks

Use flexible workflow for event-driven multi-agent orchestration with parallel tasks.

Contribution

The project is open source at github.com/feiskyer/swarm-go under the MIT License.

If you would like to contribute to the project, please follow these guidelines:

  1. Fork the repository and clone it to your local machine.
  2. Create a new branch for your changes.
  3. Make your changes and commit them with a descriptive commit message.
  4. Push your changes to your forked repository.
  5. Open a pull request to the main repository.

Documentation

Overview

Package swarm provides functionality for orchestrating interactions between agents and OpenAI's language models. It implements a flexible framework for building AI-powered workflows and agent-based systems.

The package supports:

  • Agent-based interactions with OpenAI models
  • Tool/function calling capabilities
  • Streaming and non-streaming responses
  • Context management and workflow orchestration
  • Custom function execution
  • Event-driven architecture for workflow management
  • Parallel task execution and coordination
  • Configurable retry policies and timeout handling

Key Components:

  • Agent: Represents an AI agent with specific instructions and capabilities
  • Workflow: Manages the execution of sequential or parallel tasks
  • Context: Handles state management and event propagation
  • Events: Provides event types for workflow coordination
  • OpenAI Client: Manages interactions with OpenAI's API

Constants

const ContextVariablesName = "context_variables"

ContextVariablesName is the key used to store context variables in function arguments. This constant is used internally to pass context between function calls.

Variables

var (
	// ErrEmptyMessages indicates that the messages array is empty when making a request.
	// This error is returned when attempting to run an agent interaction without any initial messages.
	ErrEmptyMessages = errors.New("messages cannot be empty")

	// ErrInvalidToolCall indicates that a tool call request was malformed or invalid.
	// This can occur when the tool call parameters don't match the function signature.
	ErrInvalidToolCall = errors.New("invalid tool call")
)

var (
	ErrInvalidName        = fmt.Errorf("invalid name")
	ErrInvalidModel       = fmt.Errorf("invalid model")
	ErrInvalidFunction    = fmt.Errorf("invalid function")
	ErrInvalidParameter   = fmt.Errorf("invalid parameter")
	ErrInvalidInstruction = fmt.Errorf("invalid instruction type")
)

Common errors

Functions

func DebugPrint

func DebugPrint(debug bool, args ...interface{})

DebugPrint prints timestamped debug information if debugging is enabled. It formats the output with ANSI color codes for better visibility.

func FunctionToJSON

func FunctionToJSON(f AgentFunction) map[string]interface{}

FunctionToJSON converts a Go function to OpenAI function format specification. It analyzes the function's parameters and creates a JSON schema representation that can be used with the OpenAI API.

Parameters:

  • f: AgentFunction interface to convert

Returns:

  • map[string]interface{}: JSON schema representation of the function
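As a rough illustration of what such a conversion involves, the sketch below builds an OpenAI-style tool definition from a name, a description, and a parameter list. The param type is a local stand-in for Parameter so that the snippet compiles without the module, and the Go-kind-to-JSON-type mapping in jsonType is an assumption; the library's actual output may differ in detail.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// param is a local stand-in for swarm.Parameter (same documented fields).
type param struct {
	Name        string
	Description string
	Type        reflect.Type
	Required    bool
}

// jsonType maps a Go type to a JSON Schema type name.
// The mapping is an assumption made for this sketch.
func jsonType(t reflect.Type) string {
	if t == nil {
		return "string"
	}
	switch t.Kind() {
	case reflect.Bool:
		return "boolean"
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return "integer"
	case reflect.Float32, reflect.Float64:
		return "number"
	case reflect.Slice, reflect.Array:
		return "array"
	case reflect.Map, reflect.Struct:
		return "object"
	default:
		return "string"
	}
}

// toolSchema builds a tool definition in the shape the OpenAI chat API expects.
func toolSchema(name, desc string, params []param) map[string]interface{} {
	props := map[string]interface{}{}
	required := []string{}
	for _, p := range params {
		props[p.Name] = map[string]interface{}{
			"type":        jsonType(p.Type),
			"description": p.Description,
		}
		if p.Required {
			required = append(required, p.Name)
		}
	}
	return map[string]interface{}{
		"type": "function",
		"function": map[string]interface{}{
			"name":        name,
			"description": desc,
			"parameters": map[string]interface{}{
				"type":       "object",
				"properties": props,
				"required":   required,
			},
		},
	}
}

func main() {
	schema := toolSchema("getWeather", "Get the current weather for a location.",
		[]param{{Name: "location", Description: "City name", Type: reflect.TypeOf(""), Required: true}})
	out, err := json.MarshalIndent(schema, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```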

func MergeFields

func MergeFields(target, source map[string]interface{})

MergeFields merges source fields into target map recursively
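A minimal sketch of a recursive map merge, to make the idea concrete. It is not the library's implementation: the rule that a non-map value from source overwrites the target is an assumption, and the real function may treat some values differently (for example, when accumulating streamed chunks).

```go
package main

import "fmt"

// mergeFields copies source into target. When both sides hold a nested map
// under the same key, the two maps are merged recursively; otherwise the
// source value overwrites the target value (an assumption of this sketch).
func mergeFields(target, source map[string]interface{}) {
	for k, v := range source {
		srcMap, srcOK := v.(map[string]interface{})
		dstMap, dstOK := target[k].(map[string]interface{})
		if srcOK && dstOK {
			mergeFields(dstMap, srcMap)
			continue
		}
		target[k] = v
	}
}

func main() {
	target := map[string]interface{}{
		"role":     "assistant",
		"function": map[string]interface{}{"name": "getWeather"},
	}
	source := map[string]interface{}{
		"content":  "hello",
		"function": map[string]interface{}{"arguments": "{}"},
	}
	mergeFields(target, source)
	fmt.Println(target)
}
```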

func NewEvent added in v0.2.0

func NewEvent[T any](eventType EventType, data T) *T

NewEvent creates a new event of type T with the given event type and data.

func RunDemoLoop

func RunDemoLoop(startingAgent *Agent, contextVariables map[string]interface{}, stream bool, debug bool)

RunDemoLoop starts an interactive CLI session for testing and demonstrating agent capabilities. It provides a REPL-like interface for communicating with AI agents and visualizing their responses and tool calls.

Parameters:

  • startingAgent: initial agent configuration
  • contextVariables: map of context variables for the session
  • stream: enable streaming mode for responses
  • debug: enable debug output

func ToMap added in v0.2.0

func ToMap(v interface{}) (map[string]interface{}, error)

ToMap converts an interface{} to map[string]interface{} using JSON marshaling.

func ToStruct added in v0.2.0

func ToStruct(m map[string]interface{}, v interface{}) error

ToStruct converts a map[string]interface{} to a struct using JSON marshaling.
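The JSON round trip that both helpers describe can be sketched with encoding/json alone. The toMap and toStruct functions and the weatherArgs type below are hypothetical stand-ins written for this example. One consequence worth knowing is that numbers come back as float64 once they pass through a generic map.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// weatherArgs is a hypothetical argument struct used only for this example.
type weatherArgs struct {
	Location string `json:"location"`
	Days     int    `json:"days"`
}

// toMap round-trips v through JSON to obtain a generic map.
func toMap(v interface{}) (map[string]interface{}, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var m map[string]interface{}
	if err := json.Unmarshal(b, &m); err != nil {
		return nil, err
	}
	return m, nil
}

// toStruct round-trips m through JSON into the value that v points to.
func toStruct(m map[string]interface{}, v interface{}) error {
	b, err := json.Marshal(m)
	if err != nil {
		return err
	}
	return json.Unmarshal(b, v)
}

func main() {
	m, err := toMap(weatherArgs{Location: "Paris", Days: 3})
	if err != nil {
		panic(err)
	}
	// JSON numbers decode to float64 when the destination is interface{}.
	fmt.Printf("%T\n", m["days"]) // float64

	var back weatherArgs
	if err := toStruct(m, &back); err != nil {
		panic(err)
	}
	fmt.Println(back.Location, back.Days) // Paris 3
}
```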

Types

type Agent

type Agent struct {
	// Name is the unique identifier for the agent
	Name string
	// Instructions define the agent's behavior and role
	Instructions interface{}
	// Functions are the tools available to this agent
	Functions []AgentFunction
	// Model specifies which OpenAI model to use (e.g., "gpt-4")
	Model string
	// Temperature controls randomness in responses (0.0 to 2.0)
	Temperature float32
	// MaxTokens limits the response length
	MaxTokens int
	// ToolChoice specifies how the agent should use tools
	ToolChoice *openai.ChatCompletionToolChoiceOptionUnionParam
	// ParallelToolCalls indicates if multiple tools can be called in parallel
	ParallelToolCalls bool
}

Agent represents an AI agent with its configuration and capabilities.

func NewAgent

func NewAgent(name string) *Agent

NewAgent creates a new Agent with default values.

func (*Agent) AddFunction

func (a *Agent) AddFunction(f AgentFunction) *Agent

AddFunction adds a function to the agent's capabilities and returns the agent for chaining.

func (*Agent) WithInstructions

func (a *Agent) WithInstructions(instructions interface{}) *Agent

WithInstructions sets the instructions for the agent and returns the agent for chaining.

func (*Agent) WithMaxTokens

func (a *Agent) WithMaxTokens(tokens int) *Agent

WithMaxTokens sets the maximum tokens for the agent and returns the agent for chaining.

func (*Agent) WithModel

func (a *Agent) WithModel(model string) *Agent

WithModel sets the model for the agent and returns the agent for chaining.

func (*Agent) WithTemperature

func (a *Agent) WithTemperature(temp float32) *Agent

WithTemperature sets the temperature for the agent and returns the agent for chaining.

type AgentFunction

type AgentFunction interface {
	// Call executes the function with given arguments
	Call(args map[string]interface{}) (interface{}, error)
	// Description returns the function's documentation
	Description() string
	// Name returns the function's name
	Name() string
	// Parameters returns the function's parameters
	Parameters() []Parameter
	// Validate checks if the function is properly configured
	Validate() error
}

AgentFunction represents a callable function that can be used by an agent.

func NewAgentFunction

func NewAgentFunction(name string, desc string, fn func(map[string]interface{}) (interface{}, error), parameters []Parameter) AgentFunction

NewAgentFunction creates a new AgentFunction from a function and description

type BaseEvent added in v0.2.0

type BaseEvent struct {
	// contains filtered or unexported fields
}

BaseEvent provides common functionality for all event types. It implements the basic Event interface and can be embedded in specific event types.

func NewBaseEvent added in v0.2.0

func NewBaseEvent(eventType EventType, data map[string]interface{}) *BaseEvent

NewBaseEvent creates a new BaseEvent with the given event type and data.

func (*BaseEvent) Data added in v0.2.0

func (e *BaseEvent) Data() map[string]interface{}

Data returns the event data

func (*BaseEvent) Get added in v0.2.0

func (e *BaseEvent) Get(key string) interface{}

Get retrieves a value from the event data by key.

func (*BaseEvent) Set added in v0.2.0

func (e *BaseEvent) Set(key string, value interface{})

Set stores a value in the event data with the given key.

func (*BaseEvent) SetData added in v0.2.0

func (e *BaseEvent) SetData(data map[string]interface{})

SetData replaces the entire event data map.

func (*BaseEvent) Type added in v0.2.0

func (e *BaseEvent) Type() EventType

Type returns the event type

func (*BaseEvent) Validate added in v0.2.0

func (e *BaseEvent) Validate() error

Validate validates the base event

type BaseStep added in v0.2.0

type BaseStep struct {
	// contains filtered or unexported fields
}

BaseStep provides common step functionality

func (*BaseStep) Config added in v0.2.0

func (s *BaseStep) Config() StepConfig

Config returns the step's configuration

func (*BaseStep) EventType added in v0.2.0

func (s *BaseStep) EventType() EventType

EventType returns the type of event this step handles

func (*BaseStep) Handle added in v0.2.0

func (s *BaseStep) Handle(ctx *Context, event Event) (Event, error)

Handle executes the step's handler function

func (*BaseStep) Name added in v0.2.0

func (s *BaseStep) Name() string

Name returns the step's name

type Context added in v0.2.0

type Context struct {
	// contains filtered or unexported fields
}

Context represents a workflow execution context that manages state and event flow. It wraps a standard context.Context and provides additional functionality for event handling, state management, and workflow control.

The Context is safe for concurrent use by multiple goroutines.

func NewContext added in v0.2.0

func NewContext(ctx context.Context) *Context

NewContext creates a new workflow Context with the provided parent context. It initializes event channels with a buffer size of 100 and creates an empty state map.

func (*Context) Cancel added in v0.2.0

func (c *Context) Cancel()

Cancel cancels the Context and all operations using it. After calling Cancel, all event channels will be closed and subsequent operations will return context.Canceled error.

func (*Context) Clear added in v0.2.0

func (c *Context) Clear()

Clear removes all key-value pairs from the Context's state map. This operation is atomic and thread-safe.

func (*Context) Clone added in v0.2.0

func (c *Context) Clone() map[string]interface{}

Clone creates and returns a deep copy of the Context's state map. The returned map is independent of the Context and can be safely modified.
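To show why a deep copy matters here, the sketch below copies nested maps and slices so that edits to the copy never reach the original. The cloneState and deepCopy helpers are hypothetical; how the library copies values other than maps and slices is not documented in this section.

```go
package main

import "fmt"

// deepCopy recursively copies nested maps and slices. Other values are
// returned as-is, which is safe for immutable scalars such as strings and ints.
func deepCopy(v interface{}) interface{} {
	switch x := v.(type) {
	case map[string]interface{}:
		out := make(map[string]interface{}, len(x))
		for k, e := range x {
			out[k] = deepCopy(e)
		}
		return out
	case []interface{}:
		out := make([]interface{}, len(x))
		for i, e := range x {
			out[i] = deepCopy(e)
		}
		return out
	default:
		return v
	}
}

// cloneState returns an independent copy of a state map.
func cloneState(state map[string]interface{}) map[string]interface{} {
	return deepCopy(state).(map[string]interface{})
}

func main() {
	state := map[string]interface{}{
		"user": map[string]interface{}{"name": "Ada"},
	}
	snapshot := cloneState(state)
	snapshot["user"].(map[string]interface{})["name"] = "Grace"

	// The original is untouched by the edit to the snapshot.
	fmt.Println(state["user"].(map[string]interface{})["name"]) // Ada
}
```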

func (*Context) Context added in v0.2.0

func (c *Context) Context() context.Context

Context returns the underlying context.Context that this Context wraps.

func (*Context) Delete added in v0.2.0

func (c *Context) Delete(key string)

Delete removes a key and its associated value from the Context's state map. If the key doesn't exist, the operation is a no-op.

func (*Context) Events added in v0.2.0

func (c *Context) Events() <-chan Event

Events returns a receive-only channel for consuming workflow events. The channel has a buffer size of 100 events.

func (*Context) Get added in v0.2.0

func (c *Context) Get(key string) (interface{}, bool)

Get retrieves a value from the Context's state map. Returns the value and a boolean indicating whether the key was found.

func (*Context) GetBool added in v0.2.0

func (c *Context) GetBool(key string) (bool, bool)

GetBool retrieves a bool value from the Context's state map. Returns the bool value and a boolean indicating whether the key was found and the value was of type bool.

func (*Context) GetInt added in v0.2.0

func (c *Context) GetInt(key string) (int, bool)

GetInt retrieves an int value from the Context's state map. Returns the int value and a boolean indicating whether the key was found and the value was of type int.

func (*Context) GetMap added in v0.2.0

func (c *Context) GetMap(key string) (map[string]interface{}, bool)

GetMap retrieves a map value from the Context's state map. Returns the map value and a boolean indicating whether the key was found and the value was of type map[string]interface{}.

func (*Context) GetString added in v0.2.0

func (c *Context) GetString(key string) (string, bool)

GetString retrieves a string value from the Context's state map. Returns the string value and a boolean indicating whether the key was found and the value was of type string.
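The typed getters all follow the same "value, ok" pattern, where ok is false both for a missing key and for a value of the wrong type. A minimal sketch of that pattern, using a hypothetical state type guarded by a sync.RWMutex rather than the library's Context:

```go
package main

import (
	"fmt"
	"sync"
)

// state is a hypothetical stand-in for the Context's state map.
type state struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newState() *state {
	return &state{data: map[string]interface{}{}}
}

// Set stores a value, overwriting any existing value for the key.
func (s *state) Set(key string, value interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// GetInt reports ok only when the key exists and holds exactly an int.
func (s *state) GetInt(key string) (int, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key].(int)
	return v, ok
}

func main() {
	s := newState()
	s.Set("retries", 3)
	s.Set("count64", int64(7))

	n, ok := s.GetInt("retries")
	fmt.Println(n, ok) // 3 true

	// An int64 is not an int, so the type assertion fails.
	n, ok = s.GetInt("count64")
	fmt.Println(n, ok) // 0 false
}
```

Values that arrive through JSON decoding are typically float64, so a strict int getter like this one would report false for them.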

func (*Context) Has added in v0.2.0

func (c *Context) Has(key string) bool

Has checks if a key exists in the Context's state map. Returns true if the key exists, false otherwise.

func (*Context) Keys added in v0.2.0

func (c *Context) Keys() []string

Keys returns a slice containing all keys present in the Context's state map. The order of keys in the returned slice is not guaranteed to be stable.

func (*Context) Len added in v0.2.0

func (c *Context) Len() int

Len returns the number of key-value pairs in the Context's state map.

func (*Context) SendEvent added in v0.2.0

func (c *Context) SendEvent(event Event) error

SendEvent sends an event to the workflow's event channel. It validates the event before sending and returns an error if the event is nil or invalid. The event is also sent to the stream channel if there are listeners.

Returns an error if the context is canceled or if the event is invalid.

func (*Context) Set added in v0.2.0

func (c *Context) Set(key string, value interface{})

Set stores a key-value pair in the Context's state map. The operation is thread-safe and will overwrite any existing value for the key.

func (*Context) Stream added in v0.2.0

func (c *Context) Stream() <-chan Event

Stream returns a receive-only channel for streaming workflow events. Unlike Events(), this channel is intended for real-time monitoring and may drop events if no receiver is ready.
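The "may drop events if no receiver is ready" behavior is what a non-blocking channel send gives you in Go. The sketch below shows that idiom with a hypothetical trySend helper; whether the library implements Stream exactly this way is an assumption.

```go
package main

import "fmt"

// trySend delivers ev only if the channel can accept it immediately.
// It reports whether the event was delivered; otherwise the event is dropped.
func trySend(ch chan<- string, ev string) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

func main() {
	stream := make(chan string, 1) // deliberately small buffer

	fmt.Println(trySend(stream, "a")) // true
	fmt.Println(trySend(stream, "b")) // false: buffer is full, event dropped
	fmt.Println(<-stream)             // a
}
```

A consumer that must see every event should read from Events() instead, since a dropped stream event is not redelivered in this pattern.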

type ErrorEvent added in v0.2.0

type ErrorEvent struct {
	BaseEvent
	Error     error  `json:"error"`
	StepName  string `json:"step_name,omitempty"`
	TaskID    string `json:"task_id,omitempty"`
	Retriable bool   `json:"retriable"`
}

ErrorEvent represents an error in the workflow

func NewErrorEvent added in v0.2.0

func NewErrorEvent(err error) *ErrorEvent

NewErrorEvent creates a new ErrorEvent with the given error.

func (*ErrorEvent) Validate added in v0.2.0

func (e *ErrorEvent) Validate() error

Validate checks if the ErrorEvent is properly configured.

func (*ErrorEvent) WithRetriable added in v0.2.0

func (e *ErrorEvent) WithRetriable(retriable bool) *ErrorEvent

WithRetriable sets whether the error is retriable and returns the event.

func (*ErrorEvent) WithStep added in v0.2.0

func (e *ErrorEvent) WithStep(stepName string) *ErrorEvent

WithStep adds step information to the error event and returns the event.

func (*ErrorEvent) WithTask added in v0.2.0

func (e *ErrorEvent) WithTask(taskID string) *ErrorEvent

WithTask adds task information to the error event and returns the event.

type Event added in v0.2.0

type Event interface {
	// Type returns the event type name that identifies this event.
	Type() EventType

	// Data returns the event's associated data as a map.
	// The map contains event-specific information needed for processing.
	Data() map[string]interface{}

	// Validate checks if the event is properly configured.
	// Returns an error if validation fails, nil otherwise.
	Validate() error
}

Event defines the core interface for all workflow events. Events are the primary mechanism for communication between workflow components.
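A custom event only needs the three methods above. The sketch below mirrors the interface locally (event and eventType are stand-ins for Event and EventType so that the snippet compiles without the module) and defines a hypothetical reviewEvent to show the shape.

```go
package main

import (
	"errors"
	"fmt"
)

// eventType and event are local mirrors of the documented EventType and Event.
type eventType string

type event interface {
	Type() eventType
	Data() map[string]interface{}
	Validate() error
}

// reviewEvent is a hypothetical custom event carrying a draft to review.
type reviewEvent struct {
	Draft string
}

func (e *reviewEvent) Type() eventType { return "ReviewEvent" }

func (e *reviewEvent) Data() map[string]interface{} {
	return map[string]interface{}{"draft": e.Draft}
}

func (e *reviewEvent) Validate() error {
	if e.Draft == "" {
		return errors.New("draft must not be empty")
	}
	return nil
}

// Compile-time check that *reviewEvent satisfies the interface.
var _ event = (*reviewEvent)(nil)

func main() {
	var e event = &reviewEvent{Draft: "first version"}
	fmt.Println(e.Type(), e.Validate() == nil) // ReviewEvent true
}
```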

type EventType added in v0.2.0

type EventType string

EventType represents the type of an event in the workflow. It is used to identify and route different kinds of events through the workflow.

const (
	// EventStart indicates the beginning of a workflow execution
	EventStart EventType = "StartEvent"
	// EventStop signals the successful completion of a workflow
	EventStop EventType = "StopEvent"
	// EventError represents an error condition in the workflow
	EventError EventType = "ErrorEvent"
	// EventInputRequired signals that user input is needed to proceed
	EventInputRequired EventType = "InputRequiredEvent"
	// EventHumanResponse represents a response received from human interaction
	EventHumanResponse EventType = "HumanResponseEvent"
	// EventParallel indicates tasks that should be executed concurrently
	EventParallel EventType = "ParallelEvent"
	// EventParallelResult represents the aggregated results from parallel task execution
	EventParallelResult EventType = "ParallelResultEvent"
)

type EventValidator added in v0.2.0

type EventValidator interface {
	// Validate checks if the event data is valid.
	// Returns an error if validation fails, nil otherwise.
	Validate() error
}

EventValidator defines the interface for validating event data. Implementations should check if the event data meets required criteria.

type Function

type Function struct {
	Name      string `json:"name"`      // Name of the function to call
	Arguments string `json:"arguments"` // JSON-encoded arguments for the function
}

Function represents a function call specification, including its name and arguments. This structure is used to marshal function calls between the AI and the system.
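Because Arguments is a JSON-encoded string rather than a map, it has to be decoded before a function body can read its values. A minimal sketch, using a local function type that mirrors the documented struct and a hypothetical decodeArgs helper:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// function mirrors the documented Function struct.
type function struct {
	Name      string `json:"name"`
	Arguments string `json:"arguments"`
}

// decodeArgs parses the JSON-encoded Arguments string into a map.
// An empty string is treated as "no arguments".
func decodeArgs(f function) (map[string]interface{}, error) {
	args := map[string]interface{}{}
	if f.Arguments == "" {
		return args, nil
	}
	if err := json.Unmarshal([]byte(f.Arguments), &args); err != nil {
		return nil, fmt.Errorf("decode arguments for %s: %w", f.Name, err)
	}
	return args, nil
}

func main() {
	call := function{Name: "getWeather", Arguments: `{"location":"Paris"}`}
	args, err := decodeArgs(call)
	if err != nil {
		panic(err)
	}
	fmt.Println(args["location"]) // Paris
}
```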

type OpenAIClient

type OpenAIClient interface {
	// CreateChatCompletion sends a chat completion request to the OpenAI API and returns
	// the complete response. It handles standard GPT model interactions.
	//
	// Parameters:
	//   - ctx: The context for the API request
	//   - params: Configuration parameters for the chat completion
	//
	// Returns a ChatCompletion response or an error if the request fails.
	CreateChatCompletion(ctx context.Context, params openai.ChatCompletionNewParams) (*openai.ChatCompletion, error)

	// CreateChatCompletionStream creates a streaming connection to the OpenAI API for
	// chat completions. It returns responses as they are generated.
	//
	// Parameters:
	//   - ctx: The context for the API request
	//   - params: Configuration parameters for the chat completion
	//
	// Returns a Stream of ChatCompletionChunk or an error if the stream creation fails.
	CreateChatCompletionStream(ctx context.Context, params openai.ChatCompletionNewParams) (*ssestream.Stream[openai.ChatCompletionChunk], error)
}

OpenAIClient defines the interface for OpenAI API interactions. It provides methods for creating both standard and streaming chat completions using the OpenAI API.

func NewAzureOpenAIClient

func NewAzureOpenAIClient(apiKey, endpoint, apiVersion string) OpenAIClient

NewAzureOpenAIClient creates a new OpenAI client wrapper configured for Azure OpenAI Services. It returns nil if either the API key or endpoint is empty.

Parameters:

  • apiKey: The Azure OpenAI API key
  • endpoint: The Azure OpenAI endpoint URL
  • apiVersion: The Azure OpenAI API version

func NewOpenAIClient

func NewOpenAIClient(apiKey string) OpenAIClient

NewOpenAIClient creates a new OpenAI client wrapper with the provided API key. It returns nil if the API key is empty.

Parameters:

  • apiKey: The OpenAI API key for authentication

func NewOpenAIClientWithBaseURL added in v0.1.3

func NewOpenAIClientWithBaseURL(apiKey string, baseURL string) OpenAIClient

NewOpenAIClientWithBaseURL creates a new OpenAI client wrapper with a custom base URL. This allows for using alternative OpenAI-compatible API endpoints.

Parameters:

  • apiKey: The OpenAI API key for authentication
  • baseURL: The custom base URL for the API endpoint

type ParallelEvent added in v0.2.0

type ParallelEvent struct {
	BaseEvent
	Tasks      []Task `json:"tasks"`
	SourceStep string `json:"source_step"` // Name of the step that generated this parallel event
}

ParallelEvent represents an event that triggers parallel execution

func NewParallelEvent added in v0.2.0

func NewParallelEvent(tasks []Task, sourceStep string) (*ParallelEvent, error)

NewParallelEvent creates a new ParallelEvent with the given tasks and source step.

func (*ParallelEvent) GetTasks added in v0.2.0

func (e *ParallelEvent) GetTasks() []Task

GetTasks returns the tasks to be executed in parallel.

func (*ParallelEvent) Validate added in v0.2.0

func (e *ParallelEvent) Validate() error

Validate checks if the ParallelEvent is properly configured.

type ParallelResultEvent added in v0.2.0

type ParallelResultEvent struct {
	BaseEvent
	Results    map[string]interface{} `json:"results"`
	Errors     map[string]error       `json:"errors"`
	Successful int                    `json:"successful"`
	Failed     int                    `json:"failed"`
	Duration   time.Duration          `json:"duration"`
	SourceStep string                 `json:"source_step"` // Name of the step that generated the original parallel event
}

ParallelResultEvent represents the results of parallel execution

func NewParallelResultEvent added in v0.2.0

func NewParallelResultEvent(results map[string]interface{}, errors map[string]error, duration time.Duration, sourceStep string) *ParallelResultEvent

NewParallelResultEvent creates a new ParallelResultEvent with the given results, errors, duration and source step.

func (*ParallelResultEvent) GetErrors added in v0.2.0

func (e *ParallelResultEvent) GetErrors() map[string]error

GetErrors returns the errors from parallel execution.

func (*ParallelResultEvent) GetResults added in v0.2.0

func (e *ParallelResultEvent) GetResults() map[string]interface{}

GetResults returns the results of parallel execution.

func (*ParallelResultEvent) GetStats added in v0.2.0

func (e *ParallelResultEvent) GetStats() (successful int, failed int, duration time.Duration)

GetStats returns execution statistics including successful count, failed count and duration.
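The results, errors, and duration reported here are the usual outputs of a fan-out/fan-in run. The sketch below shows one way to produce them with goroutines and a sync.WaitGroup. The task type is hypothetical (the library's Task type is not shown in this section), and runParallel is not the library's scheduler.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// task is a hypothetical unit of work identified by an ID.
type task struct {
	ID  string
	Run func() (interface{}, error)
}

// errBoom is a sample failure used by the example.
var errBoom = errors.New("boom")

// runParallel runs every task in its own goroutine and collects
// results and errors keyed by task ID, plus the total elapsed time.
func runParallel(tasks []task) (map[string]interface{}, map[string]error, time.Duration) {
	start := time.Now()
	results := map[string]interface{}{}
	errs := map[string]error{}

	var mu sync.Mutex // guards results and errs
	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			v, err := t.Run()
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errs[t.ID] = err
				return
			}
			results[t.ID] = v
		}(t)
	}
	wg.Wait()
	return results, errs, time.Since(start)
}

func main() {
	tasks := []task{
		{ID: "a", Run: func() (interface{}, error) { return 1, nil }},
		{ID: "b", Run: func() (interface{}, error) { return nil, errBoom }},
	}
	results, errs, elapsed := runParallel(tasks)

	// Successful and failed counts follow directly from the two maps.
	fmt.Println(len(results), len(errs), elapsed >= 0) // 1 1 true
}
```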

func (*ParallelResultEvent) Validate added in v0.2.0

func (e *ParallelResultEvent) Validate() error

Validate checks if the ParallelResultEvent is properly configured.

type Parameter

type Parameter struct {
	Name        string
	Description string
	Type        reflect.Type
	Required    bool
}

Parameter represents a function parameter with its metadata

func (Parameter) Validate

func (p Parameter) Validate() error

Validate checks if the parameter is properly configured

type Response

type Response struct {
	// Messages contains the conversation history
	Messages []map[string]interface{}

	// Agent is the current active agent (may change during conversation)
	Agent *Agent

	// ContextVariables stores shared context between function calls
	ContextVariables map[string]interface{}

	// TokensUsed tracks the number of tokens used in this response
	TokensUsed int

	// Cost tracks the estimated cost of this response
	Cost float64
}

Response encapsulates the result of an agent interaction. It includes messages generated, context updates, and any agent switches.

type Result

type Result struct {
	// Value contains the function's string output
	Value string

	// Agent optionally specifies a new agent to switch to
	Agent *Agent

	// ContextVariables allows functions to update shared context
	ContextVariables map[string]interface{}

	// Error contains any error that occurred during function execution
	Error error
}

Result represents the outcome of a function execution. It includes both the execution result and any error that occurred.

type RetryPolicy added in v0.2.0

type RetryPolicy struct {
	// MaxRetries is the maximum number of retry attempts
	MaxRetries int

	// InitialInterval is the delay before first retry
	InitialInterval time.Duration

	// MaxInterval caps the maximum delay between retries
	MaxInterval time.Duration

	// Multiplier controls exponential backoff rate
	Multiplier float64

	// Errors specifies which errors trigger retries. Empty means all errors.
	Errors []error
}

RetryPolicy configures step execution retry behavior using exponential backoff.

func DefaultRetryPolicy added in v0.2.0

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns the default retry policy
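To see how InitialInterval, Multiplier, and MaxInterval interact, the sketch below computes the wait before each retry under a plain exponential backoff. The retryPolicy type mirrors only the documented fields that affect delay, and delays is a hypothetical helper. The library's exact schedule (jitter, for instance) and the values returned by DefaultRetryPolicy are not stated in this section.

```go
package main

import (
	"fmt"
	"time"
)

// retryPolicy mirrors the documented RetryPolicy fields that affect delay.
type retryPolicy struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
}

// delays returns the wait before each retry: InitialInterval first, then
// multiplied by Multiplier each time and capped at MaxInterval.
func delays(p retryPolicy) []time.Duration {
	if p.MaxRetries <= 0 {
		return nil
	}
	out := make([]time.Duration, 0, p.MaxRetries)
	d := p.InitialInterval
	for i := 0; i < p.MaxRetries; i++ {
		if p.MaxInterval > 0 && d > p.MaxInterval {
			d = p.MaxInterval
		}
		out = append(out, d)
		d = time.Duration(float64(d) * p.Multiplier)
	}
	return out
}

func main() {
	p := retryPolicy{
		MaxRetries:      4,
		InitialInterval: time.Second,
		MaxInterval:     5 * time.Second,
		Multiplier:      2,
	}
	fmt.Println(delays(p)) // [1s 2s 4s 5s]
}
```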

type SimpleAgentFunction

type SimpleAgentFunction struct {
	CallFn     func(map[string]interface{}) (interface{}, error)
	DescString string
	NameString string

	// TODO: auto infer parameters from function signature
	ParametersList []Parameter
}

SimpleAgentFunction is a helper struct to create AgentFunction from a simple function

func (*SimpleAgentFunction) Call

func (f *SimpleAgentFunction) Call(args map[string]interface{}) (interface{}, error)

Call executes the function with the given arguments.

func (*SimpleAgentFunction) Description

func (f *SimpleAgentFunction) Description() string

Description returns the function's documentation.

func (*SimpleAgentFunction) Name

func (f *SimpleAgentFunction) Name() string

Name returns the function's name.

func (*SimpleAgentFunction) Parameters

func (f *SimpleAgentFunction) Parameters() []Parameter

Parameters returns the function's parameters.

func (*SimpleAgentFunction) Validate

func (f *SimpleAgentFunction) Validate() error

Validate checks if the function is properly configured.

type SimpleFlow added in v0.2.0

type SimpleFlow struct {
	// Name is the name of the workflow.
	Name string `yaml:"name" json:"name"`
	// Model specifies the model used in the workflow.
	Model string `yaml:"model" json:"model"`
	// MaxTurns defines the maximum number of turns allowed in the workflow.
	MaxTurns int `yaml:"max_turns" json:"max_turns"`
	// System represents the system prompt for the workflow.
	System string `yaml:"system" json:"system"`
	// Steps is a list of steps involved in the workflow.
	Steps []SimpleFlowStep `yaml:"steps" json:"steps"`
	// Verbose specifies whether to print verbose logs.
	Verbose bool `yaml:"verbose" json:"verbose"`
	// Timeout specifies the timeout for the entire workflow.
	Timeout time.Duration `yaml:"timeout" json:"timeout"`
	// JSONMode indicates whether to use JSON format for input and output.
	JSONMode bool `yaml:"json_mode" json:"json_mode"`
}

SimpleFlow represents a sequential workflow that executes a series of steps using AI agents. Each step in the workflow is executed in order, with the ability to pass context between steps and manage timeouts.

func LoadSimpleFlow added in v0.2.0

func LoadSimpleFlow(path string) (*SimpleFlow, error)

LoadSimpleFlow creates a new SimpleFlow instance from a YAML configuration file. The function reads the file, unmarshals the YAML content, and initializes the workflow.

func (*SimpleFlow) Initialize added in v0.2.0

func (w *SimpleFlow) Initialize() error

Initialize prepares the workflow for execution by setting up default values, configuring agents, and establishing connections between steps. It must be called before running the workflow.

The function performs the following setup:

  • Sets default values for MaxTurns and Timeout if not specified
  • Validates the workflow has at least one step
  • Initializes agents for each step
  • Configures step-specific timeouts
  • Sets up handoff functions between consecutive steps

Returns an error if the workflow configuration is invalid.

func (*SimpleFlow) Run added in v0.2.0

func (w *SimpleFlow) Run(ctx context.Context, client *Swarm) (string, []map[string]interface{}, error)

Run executes all steps in the workflow sequentially, managing timeouts and passing context between steps. It initializes the workflow if needed and handles any errors that occur during execution.

Parameters:

  • ctx: Context for timeout and cancellation
  • client: The Swarm client for executing AI operations

Returns:

  • string: The content from the last step
  • []map[string]interface{}: The complete conversation history
  • error: Any error encountered during execution

func (*SimpleFlow) Save added in v0.2.0

func (w *SimpleFlow) Save(path string) error

Save persists the workflow configuration to a YAML file at the specified path. The function marshals the workflow structure to YAML format and writes it to the filesystem.

type SimpleFlowStep added in v0.2.0

type SimpleFlowStep struct {
	// Name is the name of the workflow step.
	Name string `yaml:"name" json:"name"`
	// Instructions are the instructions for the workflow step.
	Instructions string `yaml:"instructions" json:"instructions"`
	// Inputs are the inputs required for the workflow step.
	Inputs map[string]interface{} `yaml:"inputs" json:"inputs"`
	// Timeout specifies the timeout for this step. If not set, uses workflow timeout.
	Timeout time.Duration `yaml:"timeout" json:"timeout"`

	// Agent is the agent responsible for executing the workflow step.
	Agent *Agent `yaml:"-" json:"-"`
	// Functions are the functions that the agent can perform in this workflow step.
	Functions []AgentFunction `yaml:"-" json:"-"`
}

SimpleFlowStep defines a single step within a SimpleFlow workflow. Each step represents an atomic operation performed by an AI agent with specific instructions and capabilities.
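The `-` tags on Agent and Functions mean those fields are skipped during serialization, so only the declarative part of a step is persisted. The sketch below shows this with the standard `encoding/json` package and a local mirror type (`flowStep` and `agent` are stand-ins, not the package's types). JSON is used here only because it is in the standard library; how a YAML encoder represents `time.Duration` may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// agent is a placeholder for the package's Agent type.
type agent struct{ Name string }

// flowStep mirrors the documented JSON tags of SimpleFlowStep.
type flowStep struct {
	Name         string                 `json:"name"`
	Instructions string                 `json:"instructions"`
	Inputs       map[string]interface{} `json:"inputs"`
	Timeout      time.Duration          `json:"timeout"`
	Agent        *agent                 `json:"-"` // runtime-only, never encoded
}

// encode returns the JSON form of a step.
func encode(s flowStep) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

// demo encodes a step that has a runtime agent attached.
func demo() (string, error) {
	return encode(flowStep{
		Name:         "draft",
		Instructions: "Write a draft.",
		Inputs:       map[string]interface{}{"topic": "go"},
		Timeout:      30 * time.Second,
		Agent:        &agent{Name: "writer"},
	})
}

func main() {
	out, err := demo()
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// The agent does not appear; the timeout is encoded as nanoseconds.
	fmt.Println(out)
}
```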

type SimpleStepResult added in v0.2.0

type SimpleStepResult struct {
	StepName string
	Content  string
	Messages []map[string]interface{}
	Error    error
}

SimpleStepResult contains the output and metadata from executing a workflow step.

type StartEvent added in v0.2.0

type StartEvent struct {
	BaseEvent
}

StartEvent represents the initialization of a workflow. It carries the initial inputs needed to begin workflow execution.

func NewStartEvent added in v0.2.0

func NewStartEvent(inputs map[string]interface{}) *StartEvent

NewStartEvent creates a new StartEvent with the given inputs.

func (*StartEvent) Validate added in v0.2.0

func (e *StartEvent) Validate() error

Validate checks if the StartEvent is properly configured.

type Step added in v0.2.0

type Step interface {
	// Name returns the step's unique identifier
	Name() string

	// EventType returns the type of event this step handles
	EventType() EventType

	// Config returns the step's configuration
	Config() StepConfig

	// Handle processes an event and returns a new event or error
	Handle(ctx *Context, event Event) (Event, error)
}

Step defines the interface for workflow steps.

func NewStartStep added in v0.2.0

func NewStartStep(handler StepFunc, retryPolicy *RetryPolicy) Step

NewStartStep creates a new step that handles start events.

func NewStep added in v0.2.0

func NewStep(name string, eventType EventType, handler StepFunc, config StepConfig) Step

NewStep creates a new step with the given configuration.

type StepConfig added in v0.2.0

type StepConfig struct {
	MaxParallel int64
	Timeout     time.Duration
	RetryPolicy *RetryPolicy
}

StepConfig holds step configuration settings.
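How the package enforces these settings is not described here. As one plausible reading, MaxParallel caps the number of concurrent handler invocations and Timeout bounds each one. The sketch below shows that pattern with a buffered channel used as a semaphore; `config` and `runBounded` are hypothetical names, and RetryPolicy is left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// config is a local stand-in with the two StepConfig fields this sketch uses.
type config struct {
	MaxParallel int64
	Timeout     time.Duration
}

// runBounded calls work once per item, never running more than
// cfg.MaxParallel calls at once, and returns the highest concurrency seen.
func runBounded(ctx context.Context, cfg config, items int, work func(context.Context, int)) int64 {
	if cfg.MaxParallel < 1 {
		cfg.MaxParallel = 1 // assumption: treat unset as "one at a time"
	}
	sem := make(chan struct{}, cfg.MaxParallel)
	var wg sync.WaitGroup
	var running, peak int64
	for i := 0; i < items; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while MaxParallel calls are in flight
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }()
			n := atomic.AddInt64(&running, 1)
			for { // record a new peak if this call raised it
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			stepCtx, cancel := context.WithTimeout(ctx, cfg.Timeout)
			defer cancel()
			work(stepCtx, i)
			atomic.AddInt64(&running, -1)
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

// demo runs six short jobs with at most two in flight.
func demo() int64 {
	cfg := config{MaxParallel: 2, Timeout: time.Second}
	return runBounded(context.Background(), cfg, 6, func(_ context.Context, _ int) {
		time.Sleep(5 * time.Millisecond)
	})
}

func main() {
	fmt.Println("peak concurrency:", demo())
}
```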

type StepFunc added in v0.2.0

type StepFunc func(ctx *Context, event Event) (Event, error)

StepFunc represents a workflow step function that processes an event and returns a new event or error. The function receives a workflow context and an input event.

type StopEvent added in v0.2.0

type StopEvent struct {
	BaseEvent
	// Result contains the final output of the workflow
	Result interface{} `json:"result"`
}

StopEvent signals the successful completion of a workflow. It contains the final result of the workflow execution.

func NewStopEvent added in v0.2.0

func NewStopEvent(result interface{}) *StopEvent

NewStopEvent creates a new StopEvent with the given result.

func (*StopEvent) Validate added in v0.2.0

func (e *StopEvent) Validate() error

Validate checks if the StopEvent is properly configured.

type StreamResponse

type StreamResponse struct {
	Content   string     `json:"content,omitempty"`    // The text content of the response
	Sender    string     `json:"sender,omitempty"`     // The identity of the sender
	ToolCalls []ToolCall `json:"tool_calls,omitempty"` // Any function calls made by the agent
	Delim     string     `json:"delim,omitempty"`      // Delimiter for streaming chunks
	Response  *Response  `json:"response,omitempty"`   // Complete response object if present
}

StreamResponse represents a streaming response chunk from an AI agent.

type Swarm

type Swarm struct {
	// Client is the interface to OpenAI's API
	Client OpenAIClient
}

Swarm orchestrates interactions between agents and OpenAI's language models. It handles message processing, tool execution, and response management.

func NewDefaultSwarm added in v0.1.2

func NewDefaultSwarm() (*Swarm, error)

NewDefaultSwarm creates a new Swarm instance with default OpenAI client configuration. It uses the OPENAI_API_KEY environment variable for authentication. Returns an error if the API key is not set or if client creation fails.

func NewSwarm

func NewSwarm(client OpenAIClient) *Swarm

NewSwarm creates a new Swarm instance with the provided OpenAI client.

Parameters:

  • client: An implementation of OpenAIClient interface for API communication

Returns:

  • *Swarm: A new Swarm instance

func (*Swarm) Run

func (s *Swarm) Run(
	ctx context.Context,
	agent *Agent,
	messages []map[string]interface{},
	contextVariables map[string]interface{},
	modelOverride string,
	stream bool,
	debug bool,
	maxTurns int,
	executeTools bool,
	jsonMode bool,
) (*Response, error)

Run executes a single interaction with the OpenAI model using the provided agent configuration. It supports both streaming and non-streaming modes, tool execution, and debug logging.

Parameters:

  • ctx: Context for the request
  • agent: Agent configuration including tools and instructions
  • messages: Conversation history
  • contextVariables: Variables to be used in the conversation
  • modelOverride: Optional model override (uses agent's default if empty)
  • stream: Enable streaming mode
  • debug: Enable debug logging
  • maxTurns: Maximum number of interaction turns
  • executeTools: Whether to execute tool calls
  • jsonMode: Whether to use JSON format for input and output

Returns a Response containing the model's output and any tool execution results, or an error if the interaction fails.

func (*Swarm) RunAndStream

func (s *Swarm) RunAndStream(
	ctx context.Context,
	agent *Agent,
	messages []map[string]interface{},
	contextVariables map[string]interface{},
	modelOverride string,
	debug bool,
	maxTurns int,
	executeTools bool,
	jsonMode bool,
) (<-chan map[string]interface{}, error)

RunAndStream executes an interaction with the OpenAI model and returns a channel that streams the response tokens as they arrive.

Parameters:

  • ctx: Context for the request
  • agent: Agent configuration including tools and instructions
  • messages: Conversation history
  • contextVariables: Variables to be used in the conversation
  • modelOverride: Optional model override (uses agent's default if empty)
  • debug: Enable debug logging
  • maxTurns: Maximum number of interaction turns
  • executeTools: Whether to execute tool calls
  • jsonMode: Whether to use JSON format for input and output

Returns a channel of response tokens or an error if the streaming setup fails.
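A caller typically drains the returned channel until it is closed. The sketch below shows one way to do that while honoring cancellation. The channel is a fake built inside `demo`, and the `content` key is an assumption borrowed from StreamResponse's JSON tag; the actual keys in each chunk are not specified in this section.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// collect reads chunks until the channel closes or ctx is cancelled and
// concatenates every string stored under the "content" key.
// The "content" key is an assumption, not a documented contract.
func collect(ctx context.Context, stream <-chan map[string]interface{}) (string, error) {
	var sb strings.Builder
	for {
		select {
		case <-ctx.Done():
			return sb.String(), ctx.Err()
		case chunk, ok := <-stream:
			if !ok {
				return sb.String(), nil // stream finished
			}
			if s, isString := chunk["content"].(string); isString {
				sb.WriteString(s)
			}
		}
	}
}

// demo feeds collect from a fake, pre-filled stream.
func demo() (string, error) {
	ch := make(chan map[string]interface{}, 3)
	ch <- map[string]interface{}{"content": "Hel"}
	ch <- map[string]interface{}{"sender": "Assistant"} // no content: skipped
	ch <- map[string]interface{}{"content": "lo"}
	close(ch)
	return collect(context.Background(), ch)
}

func main() {
	text, err := demo()
	if err != nil {
		fmt.Println("stream failed:", err)
		return
	}
	fmt.Println(text)
}
```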

type Task added in v0.2.0

type Task struct {
	// ID uniquely identifies the task
	ID string `json:"id"`
	// Type indicates the kind of task to be executed
	Type EventType `json:"type"`
	// Payload contains the data needed for task execution
	Payload interface{} `json:"payload"`
	// Status represents the current state of the task
	Status TaskStatus `json:"status"`
	// Error holds any error that occurred during task execution
	Error error `json:"error,omitempty"`
	// Priority determines the order of execution when multiple tasks are queued
	Priority int `json:"priority"`
	// Timeout specifies the maximum duration allowed for task execution
	Timeout time.Duration `json:"timeout"`
}

Task represents a unit of work to be executed as part of a workflow. Tasks can be executed sequentially or in parallel depending on the workflow configuration.

func NewTask added in v0.2.0

func NewTask(id string, eventType EventType, payload interface{}) Task

NewTask creates a new task with default values.

func (Task) Validate added in v0.2.0

func (t Task) Validate() error

Validate checks the task configuration.

func (Task) WithPriority added in v0.2.0

func (t Task) WithPriority(priority int) Task

WithPriority sets the task priority and returns the task.

func (Task) WithTimeout added in v0.2.0

func (t Task) WithTimeout(timeout time.Duration) Task

WithTimeout sets the task timeout and returns the task.
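Both setters are declared with a value receiver and return a Task, so in Go they operate on a copy: the returned value has to be kept, and the original is left unchanged. The sketch below demonstrates this with a local stand-in type (`task` and its methods are illustrative, not the package's code).

```go
package main

import (
	"fmt"
	"time"
)

// task is a local stand-in carrying a few of Task's documented fields.
type task struct {
	ID       string
	Priority int
	Timeout  time.Duration
}

// withPriority has a value receiver, like the documented Task.WithPriority:
// it modifies a copy and returns it.
func (t task) withPriority(p int) task {
	t.Priority = p
	return t
}

// withTimeout follows the same pattern for the timeout.
func (t task) withTimeout(d time.Duration) task {
	t.Timeout = d
	return t
}

// demo returns the priority of the original and of the chained copy.
func demo() (int, int) {
	base := task{ID: "t1"}
	base.withPriority(5) // result discarded: base is unchanged
	tuned := base.withPriority(5).withTimeout(30 * time.Second)
	return base.Priority, tuned.Priority
}

func main() {
	basePriority, tunedPriority := demo()
	fmt.Println(basePriority, tunedPriority)
}
```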

type TaskStatus added in v0.2.0

type TaskStatus string

TaskStatus represents the status of a task.

const (
	// TaskStatusPending indicates the task is waiting to be executed
	TaskStatusPending TaskStatus = "pending"
	// TaskStatusRunning indicates the task is currently being executed
	TaskStatusRunning TaskStatus = "running"
	// TaskStatusComplete indicates the task has completed successfully
	TaskStatusComplete TaskStatus = "complete"
	// TaskStatusFailed indicates the task has failed
	TaskStatusFailed TaskStatus = "failed"
	// TaskStatusCancelled indicates the task has been cancelled
	TaskStatusCancelled TaskStatus = "cancelled"
)
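Of these five states, complete, failed, and cancelled read as final, while pending and running are transitional. A small helper like the one below can make that distinction explicit in calling code. `isTerminal` is a hypothetical helper, not part of the package, and the constants are local copies of the documented string values.

```go
package main

import "fmt"

// taskStatus and its constants mirror the documented TaskStatus values.
type taskStatus string

const (
	statusPending   taskStatus = "pending"
	statusRunning   taskStatus = "running"
	statusComplete  taskStatus = "complete"
	statusFailed    taskStatus = "failed"
	statusCancelled taskStatus = "cancelled"
)

// isTerminal reports whether a task in this state will not change again.
// Treating exactly these three states as final is this sketch's assumption.
func isTerminal(s taskStatus) bool {
	switch s {
	case statusComplete, statusFailed, statusCancelled:
		return true
	default:
		return false
	}
}

func main() {
	all := []taskStatus{statusPending, statusRunning, statusComplete, statusFailed, statusCancelled}
	for _, s := range all {
		fmt.Printf("%s terminal=%v\n", s, isTerminal(s))
	}
}
```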

type ToolCall

type ToolCall struct {
	Function Function `json:"function"` // The function being called
}

ToolCall represents a call to a specific tool or function by an AI agent. It encapsulates the function details and its invocation parameters.

type Workflow added in v0.1.2

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow represents an executable workflow composed of ordered steps. Each workflow must have a start event handler as the entry point and at least one stop event to terminate execution.

Required Event Types:

  • EventStart: At least one step must handle start events
  • EventStop: Workflow terminates when a stop event is received
  • EventParallelResult: Required if using parallel execution

Steps are mapped to events based on their EventType, allowing multiple steps to handle the same event type in parallel.
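The event-to-step mapping can be pictured as a table from event type to handlers, with a loop that feeds each emitted event back in until a stop event appears. The sketch below is a simplified, single-goroutine model of that idea: every type and function in it is a local stand-in, the `"start"` and `"stop"` string values are hypothetical, and handlers for the same event run one after another here rather than in parallel.

```go
package main

import (
	"errors"
	"fmt"
)

type eventType string

// Hypothetical values; the package's EventStart and EventStop may differ.
const (
	eventStart eventType = "start"
	eventStop  eventType = "stop"
)

type event struct {
	Type eventType
	Data interface{}
}

type handler func(event) (event, error)

// workflow maps each event type to the handlers registered for it.
type workflow struct {
	steps map[eventType][]handler
}

func (w *workflow) addStep(t eventType, h handler) {
	if w.steps == nil {
		w.steps = make(map[eventType][]handler)
	}
	w.steps[t] = append(w.steps[t], h)
}

// run starts with a start event and dispatches each emitted event to the
// handlers for its type until a stop event is produced.
func (w *workflow) run(input interface{}) (interface{}, error) {
	queue := []event{{Type: eventStart, Data: input}}
	for len(queue) > 0 {
		ev := queue[0]
		queue = queue[1:]
		if ev.Type == eventStop {
			return ev.Data, nil
		}
		handlers := w.steps[ev.Type]
		if len(handlers) == 0 {
			return nil, fmt.Errorf("no step handles %q events", ev.Type)
		}
		for _, h := range handlers {
			next, err := h(ev)
			if err != nil {
				return nil, err
			}
			queue = append(queue, next)
		}
	}
	return nil, errors.New("workflow ended without a stop event")
}

// demo wires start -> greet -> stop and runs it.
func demo() (interface{}, error) {
	w := &workflow{}
	w.addStep(eventStart, func(e event) (event, error) {
		return event{Type: "greet", Data: e.Data}, nil
	})
	w.addStep("greet", func(e event) (event, error) {
		return event{Type: eventStop, Data: fmt.Sprintf("hello, %v", e.Data)}, nil
	})
	return w.run("gopher")
}

func main() {
	fmt.Println(demo())
}
```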

func NewWorkflow added in v0.2.0

func NewWorkflow(name string) *Workflow

NewWorkflow creates a new workflow instance with the given name.

func (*Workflow) AddStep added in v0.2.0

func (w *Workflow) AddStep(step Step) error

AddStep adds a step to the workflow. Returns an error if the step is invalid.

func (*Workflow) Initialize added in v0.1.2

func (w *Workflow) Initialize() error

Initialize initializes the workflow.

func (*Workflow) Run added in v0.1.2

func (w *Workflow) Run(ctx context.Context, inputs map[string]interface{}) (*WorkflowHandler, error)

Run executes the workflow with the given context and input parameters. Returns a WorkflowHandler for monitoring execution.

func (*Workflow) WithConfig added in v0.2.0

func (w *Workflow) WithConfig(config WorkflowConfig) *Workflow

WithConfig sets the workflow configuration and returns the workflow.

type WorkflowConfig added in v0.2.0

type WorkflowConfig struct {
	Name       string        `yaml:"name" json:"name"`
	MaxTurns   int           `yaml:"max_turns" json:"max_turns"`
	Verbose    bool          `yaml:"verbose" json:"verbose"`
	Timeout    time.Duration `yaml:"timeout" json:"timeout"`
	MaxRetries int           `yaml:"max_retries" json:"max_retries"`
}

WorkflowConfig holds workflow-level configuration settings.

func DefaultConfig added in v0.2.0

func DefaultConfig() WorkflowConfig

DefaultConfig returns the default workflow configuration.

type WorkflowHandler added in v0.2.0

type WorkflowHandler struct {
	// contains filtered or unexported fields
}

WorkflowHandler manages workflow execution and provides status updates.

func NewWorkflowHandler added in v0.2.0

func NewWorkflowHandler(ctx *Context) *WorkflowHandler

NewWorkflowHandler creates a new workflow handler.

func (*WorkflowHandler) Cancel added in v0.2.0

func (h *WorkflowHandler) Cancel()

Cancel stops workflow execution.

func (*WorkflowHandler) Context added in v0.2.0

func (h *WorkflowHandler) Context() *Context

Context returns the workflow context.

func (*WorkflowHandler) Status added in v0.2.0

func (h *WorkflowHandler) Status() WorkflowStatus

Status returns the current workflow status.

func (*WorkflowHandler) Stream added in v0.2.0

func (h *WorkflowHandler) Stream() <-chan Event

Stream returns a channel for receiving workflow events.

func (*WorkflowHandler) Wait added in v0.2.0

func (h *WorkflowHandler) Wait() (interface{}, error)

Wait blocks until the workflow completes and returns the result or error.
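The Wait and Cancel semantics described here resemble a common Go pattern: run the work in a goroutine, signal completion by closing a channel, and cancel through a context. The sketch below models that pattern only. `handle`, `start`, and `slowWork` are hypothetical names, and nothing here is taken from the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// handle is a local stand-in for a WorkflowHandler-like object.
type handle struct {
	cancel context.CancelFunc
	done   chan struct{}
	result interface{}
	err    error
}

// start runs work in a goroutine and returns a handle for it.
func start(parent context.Context, work func(context.Context) (interface{}, error)) *handle {
	ctx, cancel := context.WithCancel(parent)
	h := &handle{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(h.done) // unblocks Wait after result and err are set
		defer cancel()      // release the context once the work returns
		h.result, h.err = work(ctx)
	}()
	return h
}

// Wait blocks until the work finishes and returns its result or error.
func (h *handle) Wait() (interface{}, error) {
	<-h.done
	return h.result, h.err
}

// Cancel asks the work to stop by cancelling its context.
func (h *handle) Cancel() { h.cancel() }

// slowWork returns "done" after delay unless its context is cancelled first.
func slowWork(delay time.Duration) func(context.Context) (interface{}, error) {
	return func(ctx context.Context) (interface{}, error) {
		select {
		case <-time.After(delay):
			return "done", nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// demo waits for a short job to finish normally.
func demo() (interface{}, error) {
	return start(context.Background(), slowWork(time.Millisecond)).Wait()
}

// demoCancel cancels a long job and returns the error Wait reports.
func demoCancel() error {
	h := start(context.Background(), slowWork(time.Hour))
	h.Cancel()
	_, err := h.Wait()
	return err
}

func main() {
	fmt.Println(demo())
	fmt.Println(demoCancel())
}
```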

type WorkflowStatus added in v0.2.0

type WorkflowStatus string

WorkflowStatus represents the current state of a workflow execution.

const (
	// WorkflowStatusPending indicates the workflow is waiting to be executed
	WorkflowStatusPending WorkflowStatus = "pending"
	// WorkflowStatusRunning indicates the workflow is currently being executed
	WorkflowStatusRunning WorkflowStatus = "running"
	// WorkflowStatusComplete indicates the workflow has completed successfully
	WorkflowStatusComplete WorkflowStatus = "complete"
	// WorkflowStatusFailed indicates the workflow has failed
	WorkflowStatusFailed WorkflowStatus = "failed"
	// WorkflowStatusCancelled indicates the workflow has been cancelled
	WorkflowStatusCancelled WorkflowStatus = "cancelled"
)

Directories

Path Synopsis
demo
  basic (command)
  handoff (command)
  joke (command)
  novel (command)
  simple (command)
  streaming (command)
