aigo

package module
v0.6.0 Latest
Published: Apr 9, 2026 License: MIT Imports: 13 Imported by: 0

README

aigo

Chinese README

aigo is an agent-native Go SDK for multimodal media generation. Describe work as a lightweight workflow graph, route it to different execution engines, and get structured results with error classification, retry hints, and progress callbacks. Zero external dependencies — only Go stdlib.

Architecture

Agent (LLM / code)
  │
  ▼
AgentTask ──► BuildGraph() ──► workflow.Graph (DAG)
                                    │
                     ┌──────────────┼──────────────┬──────────────┐
                     ▼              ▼              ▼              ▼
               engine/aliyun  engine/openai  engine/newapi  engine/comfyui
                     │              │              │              │
                     ▼              ▼              ▼              ▼
               Bailian API    DALL-E API    Multi-gateway    ComfyUI WS

Engines

Engine   Backend                            Capabilities
aliyun   Alibaba Cloud Bailian / DashScope  Image, video, TTS, voice design
openai   OpenAI DALL-E                      Image generation
newapi   Multi-route gateway                OpenAI-compat, Kling, Jimeng, Sora, Qwen, Gemini
comfyui  ComfyUI server                     Full passthrough via WebSocket

Install

go get github.com/godeps/aigo

Quick Start

Simple prompt

client := aigo.NewClient()

// Register the Aliyun engine under the logical name "img".
if err := client.RegisterEngine("img", aliyun.New(aliyun.Config{
    Model: aliyun.ModelQwenImage,
})); err != nil {
    log.Fatal(err)
}

result, err := client.ExecutePrompt(ctx, "img", "A shiba inu riding a vintage motorcycle")
if err != nil {
    log.Fatal(err)
}
fmt.Println(result.Value)   // URL or data URI
fmt.Println(result.Kind)    // aigo.OutputURL, OutputDataURI, etc.
fmt.Println(result.Engine)  // "img"
fmt.Println(result.Elapsed) // execution duration

Rich Result type

Every execution method returns aigo.Result:

type Result struct {
    Value    string         // raw output (URL, data URI, JSON, etc.)
    Kind     OutputKind     // authoritative classification
    Engine   string         // which engine produced this
    Elapsed  time.Duration  // wall-clock execution time
    Metadata map[string]any // engine-specific data (optional)
}

fmt.Println(result) // Result implements String(), prints Value

Structured task

result, err := client.ExecuteTask(ctx, "video", aigo.AgentTask{
    Prompt:   "Turn this product scene into a short ad",
    Duration: 2,
    Structured: &aigo.AgentTaskStructured{
        VideoSize: "1280*720",
        ImageSize: "1024*1024",
    },
    References: []aigo.ReferenceAsset{
        {Type: aigo.ReferenceTypeVideo, URL: "https://example.com/input.mp4"},
        {Type: aigo.ReferenceTypeImage, URL: "https://example.com/style.png"},
    },
})

TTS (text-to-speech)

result, err := client.ExecuteTask(ctx, "tts", aigo.AgentTask{
    Prompt: "Welcome to our product launch event",
    TTS: &aigo.TTSOptions{
        Voice:        "zhiyan",
        LanguageType: "zh",
    },
})

Voice design

result, err := client.ExecuteTask(ctx, "vd", aigo.AgentTask{
    Prompt: "design a voice",
    VoiceDesign: &aigo.VoiceDesignOptions{
        VoicePrompt:   "A warm, friendly female voice",
        PreviewText:   "Hello, welcome!",
        TargetModel:   "cosyvoice-v2",
        PreferredName: "custom-voice-01",
    },
})

Agent-Native Features

Structured errors with retry classification

Errors from all engines are classified so agents can make retry decisions:

import "github.com/godeps/aigo/engine/aigoerr"

_, err := client.ExecutePrompt(ctx, "img", "...")
if aigoerr.IsRetryable(err) {
    // safe to retry (429, 5xx, timeout)
}

code, ok := aigoerr.GetCode(err)
// aigoerr.CodeRateLimit, CodeServerError, CodeInvalidInput, etc.

var ae *aigoerr.Error
if errors.As(err, &ae) {
    fmt.Println(ae.StatusCode)  // original HTTP status
    fmt.Println(ae.RetryAfter)  // parsed Retry-After header
}
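
The classification above can drive a bounded retry loop that honors a Retry-After hint. The following is a minimal sketch using only the standard library. The `apiError` type is a local stand-in that assumes `StatusCode` and `RetryAfter` fields like the ones shown above (the `time.Duration` type for `RetryAfter` is an assumption), and `retryable`, `waitFor`, and `retry` are hypothetical helpers, not part of aigo.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// apiError is a local stand-in for a structured engine error.
type apiError struct {
	StatusCode int
	RetryAfter time.Duration
}

func (e *apiError) Error() string { return fmt.Sprintf("api error: status %d", e.StatusCode) }

// retryable treats 429 and 5xx responses as safe to retry.
func retryable(err error) bool {
	var ae *apiError
	if !errors.As(err, &ae) {
		return false
	}
	return ae.StatusCode == 429 || ae.StatusCode >= 500
}

// waitFor prefers the server-provided hint and falls back to a fixed delay.
func waitFor(err error, fallback time.Duration) time.Duration {
	var ae *apiError
	if errors.As(err, &ae) && ae.RetryAfter > 0 {
		return ae.RetryAfter
	}
	return fallback
}

// retry runs op up to attempts times and stops early on success,
// on a non-retryable error, or when ctx is cancelled.
func retry(ctx context.Context, attempts int, fallback time.Duration, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil || !retryable(err) {
			return err
		}
		if i == attempts-1 {
			break // no point waiting after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(waitFor(err, fallback)):
		}
	}
	return err
}

// demo fails once with a retryable 429, then with a non-retryable 400.
func demo() (int, error) {
	calls := 0
	err := retry(context.Background(), 5, time.Millisecond, func() error {
		calls++
		switch calls {
		case 1:
			return &apiError{StatusCode: 429, RetryAfter: 2 * time.Millisecond}
		case 2:
			return &apiError{StatusCode: 400}
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err) // prints "2 api error: status 400"
}
```

The second call is not retried because a 400 signals invalid input, so repeating it would fail the same way.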

JSON Schema tool definitions

Register aigo tools with any agent framework (OpenAI, Anthropic, LangChain, Vercel AI SDK):

import "github.com/godeps/aigo/tooldef"

tools := tooldef.AllTools() // generate_image, generate_video, text_to_speech, ...
// Register with your framework's function-calling system

Engine capability discovery

Query what each engine can do, for example to drive dynamic tool selection:

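// Named caps rather than cap to avoid shadowing the builtin.
caps, err := client.EngineCapabilities("aliyun-img")
if err != nil {
    log.Fatal(err)
}
// caps.MediaTypes  → ["image"]
// caps.Models      → ["qwen-image"]
// caps.SupportsSync, caps.SupportsPoll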

// Find all engines that handle video:
videoEngines := client.AvailableFor("video")

Progress reporting

Monitor long-running tasks:

result, err := client.Execute(ctx, "video", graph, aigo.WithProgress(func(e aigo.ProgressEvent) {
    fmt.Printf("[%s] elapsed=%s\n", e.Phase, e.Elapsed)
    // Phase: "submitted", "polling", "completed"
}))

Middleware

Add cross-cutting concerns (logging, retry, timing):

client.Use(aigo.WithLogging(os.Stderr))
client.Use(aigo.WithRetry(3)) // retry retryable errors up to 3 times

Pipeline chaining

Chain multi-step workflows where each step feeds the next:

p := aigo.NewPipeline("img", aigo.AgentTask{Prompt: "a cat"}).
    Then(func(prev aigo.Result) (aigo.AgentTask, string) {
        return aigo.AgentTask{
            Prompt:     "animate this image",
            References: []aigo.ReferenceAsset{{Type: aigo.ReferenceTypeImage, URL: prev.Value}},
        }, "video"
    })

results, err := client.ExecutePipeline(ctx, p)
// results[0] = image result, results[1] = video result

DryRun estimation

Check what would happen without executing:

dr, err := client.DryRun("video", aigo.AgentTask{Prompt: "..."})
// dr.WillPoll       — whether the engine will poll
// dr.EstimatedTime  — human-readable time estimate
// dr.Warnings       — potential issues

Auto-routing with selector

Let the LLM inside your agent choose the engine:

result, err := client.ExecuteTaskAuto(ctx, selector, aigo.AgentTask{
    Prompt:   "make a 2 second product video",
    Duration: 2,
})
// result.Engine       — which engine was selected
// result.Reason       — why it was selected
// result.Output.Value — the generation result

Fallback across engines

Try multiple engines in order; first success wins:

result, err := client.ExecuteWithFallback(ctx, []string{"primary", "backup"}, graph)
// result.Engine       — which engine succeeded
// result.Output.Value — the result
// result.Skipped      — engines that failed (with errors)

Async execution

Non-blocking execution via channel:

ch := client.ExecuteAsync(ctx, "video", graph)
// ... do other work ...
ar := <-ch
if ar.Err != nil {
    log.Fatal(ar.Err)
}
fmt.Println(ar.Result.Value)

Low-Level API

If your agent already emits workflow graphs, call Execute directly:

graph := workflow.Graph{
    "1": {
        ClassType: "CLIPTextEncode",
        Inputs:    map[string]any{"text": "A cinematic lighthouse in a storm"},
    },
    "2": {
        ClassType: "EmptyLatentImage",
        Inputs:    map[string]any{"width": 1536, "height": 1024},
    },
}

result, err := client.Execute(ctx, "img", graph)
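
To make the graph shape concrete, here is a small sketch of how a string input could be pulled out of such a graph. The `node` and `graph` types are local stand-ins inferred from the literal above, not the real `workflow` definitions, and `firstString` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// node and graph are local stand-ins inferred from the README literal.
type node struct {
	ClassType string
	Inputs    map[string]any
}

type graph map[string]node

// firstString returns the first string input named key, visiting nodes in
// sorted ID order so the result does not depend on map iteration order.
func firstString(g graph, key string) (string, bool) {
	ids := make([]string, 0, len(g))
	for id := range g {
		ids = append(ids, id)
	}
	sort.Strings(ids) // lexical order: "10" sorts before "2"
	for _, id := range ids {
		if s, ok := g[id].Inputs[key].(string); ok {
			return s, true
		}
	}
	return "", false
}

func main() {
	g := graph{
		"1": {ClassType: "CLIPTextEncode", Inputs: map[string]any{"text": "A cinematic lighthouse in a storm"}},
		"2": {ClassType: "EmptyLatentImage", Inputs: map[string]any{"width": 1536, "height": 1024}},
	}
	text, ok := firstString(g, "text")
	fmt.Println(text, ok) // prints "A cinematic lighthouse in a storm true"
}
```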

Alibaba Cloud Models

engine/aliyun supports:

Constant                   Model                     Type
ModelQwenImage             qwen-image                Image
ModelWanImage              wan2.7-image              Image
ModelZImageTurbo           z-image-turbo             Image
ModelWanTextToVideo        wan2.6-t2v                Video
ModelWanReferenceVideo     wan2.6-r2v                Video
ModelWanVideoEdit          wan2.7-videoedit          Video
ModelQwenTTSFlash          qwen3-tts-flash           TTS
ModelQwenTTSInstructFlash  qwen3-tts-instruct-flash  TTS
ModelQwenVoiceDesign       qwen-voice-design         Voice Design

Environment variable:

export DASHSCOPE_API_KEY=your_key

New API Gateway

engine/newapi supports multiple route families via a single gateway:

Route                         Protocol
RouteOpenAIImagesGenerations  POST /v1/images/generations
RouteOpenAIImagesEdits        POST /v1/images/edits
RouteOpenAIVideoGenerations   POST+GET /v1/video/generations
RouteOpenAISpeech             POST /v1/audio/speech
RouteKlingText2Video          Kling text-to-video
RouteKlingImage2Video         Kling image-to-video
RouteJimengVideo              Jimeng (Volcengine) video
RouteSoraVideos               OpenAI Sora video
RouteQwenImagesGenerations    Qwen image generation
RouteGeminiGenerateContent    Gemini native generateContent

Environment variables:

export NEWAPI_BASE_URL=https://your-gateway.example.com
export NEWAPI_API_KEY=your_key

Internal Packages

Package           Purpose
workflow/resolve  Shared graph resolution (node string extraction, option helpers, link following)
engine/poll       Unified polling with exponential backoff, max attempts, and progress callbacks
engine/httpx      HTTP client defaults and helpers
engine/aigoerr    Structured error classification for agent retry logic
tooldef           JSON Schema tool definitions for agent frameworks

Examples

# Alibaba Cloud
go run ./examples/aliyun_qwen_image
go run ./examples/aliyun_wan_image
go run ./examples/aliyun_zimage
go run ./examples/aliyun_wan_t2v
go run ./examples/aliyun_wan_r2v
go run ./examples/aliyun_wan_videoedit
go run ./examples/aliyun_qwen_tts
go run ./examples/aliyun_qwen_voice_design

# New API gateway
go run ./examples/newapi_image
go run ./examples/newapi_speech
go run ./examples/newapi_video

# Auto-routing
go run ./examples/agent_auto_router

Notes

  • Alibaba Cloud result URLs are temporary OSS links. Persist them immediately.
  • As of April 2026, Alibaba Cloud public docs still expose wan2.6-t2v and wan2.6-r2v for text/reference-to-video, while wan2.7-videoedit is the public video editing model.
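
Because result URLs expire, one option is to download the output as soon as a task finishes. The sketch below uses only the standard library. `persist` is a hypothetical helper, and the demo serves fake bytes from a local `httptest` server in place of a real temporary link.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
)

// persist downloads url and writes the response body to dst.
// It returns the number of bytes written.
func persist(ctx context.Context, url, dst string) (int64, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return 0, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("persist: unexpected status %s", resp.Status)
	}
	f, err := os.Create(dst)
	if err != nil {
		return 0, err
	}
	n, err := io.Copy(f, resp.Body)
	// Report a close error only if the copy itself succeeded.
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	return n, err
}

// demo stands in for a real result URL with a local test server.
func demo() (int64, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "fake-image-bytes")
	}))
	defer srv.Close()

	dir, err := os.MkdirTemp("", "aigo-demo")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	return persist(context.Background(), srv.URL, filepath.Join(dir, "out.png"))
}

func main() {
	n, err := demo()
	fmt.Println(n, err) // prints "16 <nil>"
}
```

In real use the URL would come from `result.Value` when `result.Kind` is `aigo.OutputURL`, and the destination would be durable storage rather than a temporary directory.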

Documentation

Constants

const (
	OutputUnknown   = engine.OutputUnknown
	OutputURL       = engine.OutputURL
	OutputDataURI   = engine.OutputDataURI
	OutputJSON      = engine.OutputJSON
	OutputPlainText = engine.OutputPlainText
)

Variables

var (
	ErrEngineNil       = errors.New("aigo: engine is nil")
	ErrEngineExists    = errors.New("aigo: engine already registered")
	ErrEngineNotFound  = errors.New("aigo: engine not found")
	ErrEngineNameEmpty = errors.New("aigo: engine name is empty")
)

Functions

func BuildGraph

func BuildGraph(task AgentTask) workflow.Graph

BuildGraph compiles a high-level agent task into the SDK's workflow graph format.

Types

type AgentTask

type AgentTask struct {
	Prompt         string
	NegativePrompt string
	Width          int
	Height         int
	Size           string
	Duration       int
	Watermark      *bool
	References     []ReferenceAsset

	TTS         *TTSOptions
	VoiceDesign *VoiceDesignOptions

	// Structured groups image/video options separately for finer control.
	Structured *AgentTaskStructured
}

AgentTask is a graph-free request shape for agents.

type AgentTaskStructured

type AgentTaskStructured struct {
	ImageSize        string
	ImageWatermark   *bool
	VideoDuration    int
	VideoSize        string
	VideoWatermark   *bool
	VideoAspectRatio string
	VideoResolution  string // "480P", "720P", "1080P"
	VideoAudio       *bool
}

AgentTaskStructured groups image and video options so the API can grow without adding more flat fields to AgentTask.
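
Several of these options are `*bool`, which lets a caller distinguish "not set" from an explicit `false`. A small generic helper keeps call sites short. In this sketch, `ptr`, `options`, and `describe` are hypothetical names, and treating a nil pointer as "use the engine default" is an assumption about how engines interpret unset fields.

```go
package main

import "fmt"

// ptr returns a pointer to v, handy for optional fields such as *bool.
func ptr[T any](v T) *T { return &v }

// options is a local stand-in with two of the documented optional fields.
type options struct {
	VideoWatermark *bool
	VideoAudio     *bool
}

// describe distinguishes an unset field from an explicit true or false.
func describe(b *bool) string {
	if b == nil {
		return "unset (engine default)"
	}
	return fmt.Sprintf("%t", *b)
}

func main() {
	o := options{VideoWatermark: ptr(false)}
	fmt.Println(describe(o.VideoWatermark), "|", describe(o.VideoAudio))
	// prints "false | unset (engine default)"
}
```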

type AsyncResult

type AsyncResult struct {
	Result Result
	Err    error
}

AsyncResult delivers an asynchronous execution outcome.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client routes a workflow graph to a registered execution engine.

func NewClient

func NewClient() *Client

NewClient creates a new SDK client.

func (*Client) AvailableFor

func (c *Client) AvailableFor(mediaType string) []string

AvailableFor returns registered engine names whose capabilities include the given media type. Engines that do not implement Describer are included (assumed capable).

func (*Client) DryRun

func (c *Client) DryRun(engineName string, task AgentTask) (engine.DryRunResult, error)

DryRun checks what would happen without actually executing the task. Returns an estimation if the engine implements DryRunner; otherwise returns a basic result based on Describer capabilities.

func (*Client) EngineCapabilities

func (c *Client) EngineCapabilities(name string) (engine.Capability, error)

EngineCapabilities returns the capabilities of a registered engine. If the engine does not implement Describer, an empty Capability is returned.

func (*Client) EngineNames

func (c *Client) EngineNames() []string

EngineNames returns registered engine names in deterministic order.

func (*Client) Execute

func (c *Client) Execute(ctx context.Context, engineName string, graph workflow.Graph, opts ...ExecuteOption) (Result, error)

Execute dispatches the graph to the named engine.

func (*Client) ExecuteAsync

func (c *Client) ExecuteAsync(ctx context.Context, engineName string, graph workflow.Graph) <-chan AsyncResult

ExecuteAsync runs Execute in a goroutine and delivers the result on the returned channel. The channel is closed after sending exactly one value. Cancelling ctx stops the work.

func (*Client) ExecutePipeline

func (c *Client) ExecutePipeline(ctx context.Context, p *Pipeline) ([]Result, error)

ExecutePipeline runs each step in sequence, feeding results forward.

func (*Client) ExecutePrompt

func (c *Client) ExecutePrompt(ctx context.Context, engineName string, prompt string) (Result, error)

ExecutePrompt runs the simplest agent request: a single prompt.

func (*Client) ExecutePromptAuto

func (c *Client) ExecutePromptAuto(ctx context.Context, selector Selector, prompt string) (RoutedResult, error)

ExecutePromptAuto lets a selector choose the engine for a prompt-driven request.

func (*Client) ExecuteTask

func (c *Client) ExecuteTask(ctx context.Context, engineName string, task AgentTask, opts ...ExecuteOption) (Result, error)

ExecuteTask converts an agent task into a workflow graph and routes it to the target engine.

func (*Client) ExecuteTaskAuto

func (c *Client) ExecuteTaskAuto(ctx context.Context, selector Selector, task AgentTask) (RoutedResult, error)

ExecuteTaskAuto lets a selector choose the engine for a structured agent task.

func (*Client) ExecuteTaskWithFallback

func (c *Client) ExecuteTaskWithFallback(ctx context.Context, engines []string, task AgentTask, opts ...ExecuteOption) (FallbackResult, error)

ExecuteTaskWithFallback is the AgentTask variant of ExecuteWithFallback.

func (*Client) ExecuteWithFallback

func (c *Client) ExecuteWithFallback(ctx context.Context, engines []string, graph workflow.Graph, opts ...ExecuteOption) (FallbackResult, error)

ExecuteWithFallback tries each engine in order; the first success wins. All engines that fail are recorded in FallbackResult.Skipped.
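
The "first success wins" behavior can be sketched as a simple loop. The types below are local stand-ins that mirror the documented field names, `run` is a hypothetical callback standing in for a single-engine execution, and the error returned when every engine fails is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// fallbackError mirrors the documented FallbackError fields.
type fallbackError struct {
	Engine string
	Err    error
}

// fallbackResult mirrors FallbackResult, with a plain string as the output.
type fallbackResult struct {
	Engine  string
	Output  string
	Skipped []fallbackError
}

// withFallback tries each engine in order and returns on the first success.
// Every failure before that point is recorded in Skipped.
func withFallback(engines []string, run func(engine string) (string, error)) (fallbackResult, error) {
	var res fallbackResult
	for _, name := range engines {
		out, err := run(name)
		if err != nil {
			res.Skipped = append(res.Skipped, fallbackError{Engine: name, Err: err})
			continue
		}
		res.Engine, res.Output = name, out
		return res, nil
	}
	return res, errors.New("all engines failed")
}

// demo makes the primary engine fail so the backup is used.
func demo() (fallbackResult, error) {
	return withFallback([]string{"primary", "backup"}, func(engine string) (string, error) {
		if engine == "primary" {
			return "", errors.New("rate limited")
		}
		return "https://example.com/out.png", nil
	})
}

func main() {
	res, err := demo()
	fmt.Println(res.Engine, len(res.Skipped), err) // prints "backup 1 <nil>"
}
```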

func (*Client) ExecuteWithHint deprecated

func (c *Client) ExecuteWithHint(ctx context.Context, engineName string, graph workflow.Graph) (OutputHint, error)

ExecuteWithHint is equivalent to Execute and additionally returns the InterpretOutputKind result.

Deprecated: Use Execute directly; Result now includes Kind.

func (*Client) RegisterEngine

func (c *Client) RegisterEngine(name string, e engine.Engine) error

RegisterEngine registers an engine under a logical name.
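
For illustration, a name-keyed registry with sentinel errors and deterministic name listing might look like the sketch below. Everything here is a local stand-in: the `engine` placeholder type, the validation order, and wrapping the sentinel with `%w` are assumptions, and a real client would likely also need locking for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Sentinel errors modeled on the package-level variables listed above.
var (
	errEngineNil       = errors.New("engine is nil")
	errEngineExists    = errors.New("engine already registered")
	errEngineNameEmpty = errors.New("engine name is empty")
)

// engine is a hypothetical placeholder for engine.Engine.
type engine interface{}

// registry maps logical names to engines. The zero value is ready to use.
type registry struct {
	engines map[string]engine
}

// register validates its arguments and rejects duplicate names.
func (r *registry) register(name string, e engine) error {
	switch {
	case name == "":
		return errEngineNameEmpty
	case e == nil:
		return errEngineNil
	}
	if _, ok := r.engines[name]; ok {
		return fmt.Errorf("%w: %q", errEngineExists, name)
	}
	if r.engines == nil {
		r.engines = make(map[string]engine)
	}
	r.engines[name] = e
	return nil
}

// names returns the registered names sorted, so the order is deterministic.
func (r *registry) names() []string {
	out := make([]string, 0, len(r.engines))
	for name := range r.engines {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	var r registry
	_ = r.register("video", struct{}{})
	_ = r.register("img", struct{}{})
	err := r.register("img", struct{}{})
	fmt.Println(errors.Is(err, errEngineExists), r.names()) // prints "true [img video]"
}
```

Callers can then branch with `errors.Is` instead of comparing error strings.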

func (*Client) Use

func (c *Client) Use(mw ...Middleware)

Use appends middleware that wraps every engine on each Execute call. Middleware is applied in the order added (first added = outermost wrapper).
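
The ordering rule is easiest to see in a trace. In the sketch below, `Engine` is a deliberately simplified stand-in for `engine.Engine` (its single `Execute(string)` method is an assumption), while `Middleware` keeps the documented shape. `wrap` applies the list in reverse so the first middleware added ends up outermost.

```go
package main

import (
	"fmt"
	"strings"
)

// Engine is a hypothetical, simplified stand-in for engine.Engine.
type Engine interface {
	Execute(input string) (string, error)
}

// EngineFunc adapts a plain function to the Engine interface.
type EngineFunc func(input string) (string, error)

func (f EngineFunc) Execute(input string) (string, error) { return f(input) }

// Middleware receives the engine name and the next engine, and returns a wrapper.
type Middleware func(name string, next Engine) Engine

// tag returns middleware that records when it is entered and left.
func tag(label string, trace *[]string) Middleware {
	return func(name string, next Engine) Engine {
		return EngineFunc(func(input string) (string, error) {
			*trace = append(*trace, label+":in")
			out, err := next.Execute(input)
			*trace = append(*trace, label+":out")
			return out, err
		})
	}
}

// wrap applies middleware back to front so mws[0] becomes the outermost wrapper.
func wrap(name string, e Engine, mws []Middleware) Engine {
	for i := len(mws) - 1; i >= 0; i-- {
		e = mws[i](name, e)
	}
	return e
}

// run executes one call through two middleware layers and returns the trace.
func run() string {
	var trace []string
	base := EngineFunc(func(input string) (string, error) {
		trace = append(trace, "engine")
		return input, nil
	})
	e := wrap("img", base, []Middleware{tag("logging", &trace), tag("retry", &trace)})
	_, _ = e.Execute("prompt")
	return strings.Join(trace, " ")
}

func main() {
	fmt.Println(run()) // prints "logging:in retry:in engine retry:out logging:out"
}
```

With this ordering, a logging middleware added before a retry middleware sees one call per request, not one per retry attempt.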

type ExecuteOption

type ExecuteOption func(*executeConfig)

ExecuteOption configures optional Execute behavior.

func WithProgress

func WithProgress(fn func(ProgressEvent)) ExecuteOption

WithProgress registers a callback for execution progress events.

type FallbackError

type FallbackError struct {
	Engine string
	Err    error
}

FallbackError records a single engine failure during fallback execution.

func (FallbackError) Error

func (e FallbackError) Error() string

func (FallbackError) Unwrap

func (e FallbackError) Unwrap() error

type FallbackResult

type FallbackResult struct {
	Engine  string
	Output  Result
	Skipped []FallbackError
}

FallbackResult is the outcome of a fallback-enabled execution.

type Middleware

type Middleware func(name string, next engine.Engine) engine.Engine

Middleware wraps an engine to add cross-cutting behavior (logging, timing, retry, etc.).

func WithLogging

func WithLogging(w io.Writer) Middleware

WithLogging returns middleware that logs engine calls to the given writer.

func WithRetry

func WithRetry(maxRetries int) Middleware

WithRetry returns middleware that retries on retryable errors up to maxRetries times.

type OutputHint deprecated

type OutputHint struct {
	Kind OutputKind
	Raw  string
}

OutputHint holds the raw output together with the category inferred by InterpretOutputKind.

Deprecated: Use Result instead; Execute now returns Result with Kind populated.

type OutputKind

type OutputKind = engine.OutputKind

OutputKind is an alias for engine.OutputKind.

func InterpretOutputKind

func InterpretOutputKind(s string) OutputKind

InterpretOutputKind performs a lightweight classification based on content prefix and JSON validity; task IDs, pure numbers, and similar values fall into OutputPlainText.
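
A classifier along these lines could look like the sketch below. It is an illustration rather than the package's implementation: the `kind` values, the exact prefixes checked, and the rule that only objects and arrays count as JSON (so a bare number such as `12345` stays plain text) are all assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// kind is a local stand-in for OutputKind; the string values are made up.
type kind string

const (
	kindURL       kind = "url"
	kindDataURI   kind = "data_uri"
	kindJSON      kind = "json"
	kindPlainText kind = "plain_text"
)

// classify guesses the kind of an output string from its prefix and JSON validity.
func classify(s string) kind {
	s = strings.TrimSpace(s)
	switch {
	case strings.HasPrefix(s, "http://"), strings.HasPrefix(s, "https://"):
		return kindURL
	case strings.HasPrefix(s, "data:"):
		return kindDataURI
	case (strings.HasPrefix(s, "{") || strings.HasPrefix(s, "[")) && json.Valid([]byte(s)):
		// Requiring an object or array keeps bare numbers out of the JSON bucket.
		return kindJSON
	default:
		return kindPlainText
	}
}

func main() {
	samples := []string{
		"https://example.com/a.png",
		"data:image/png;base64,AAAA",
		`{"task_id":"42"}`,
		"12345",
	}
	for _, s := range samples {
		fmt.Println(classify(s), "<-", s)
	}
}
```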

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline chains multiple engine executions where each step's input depends on the previous output.

func NewPipeline

func NewPipeline(engineName string, task AgentTask) *Pipeline

NewPipeline starts a pipeline with an initial engine and task.

func (*Pipeline) Then

func (p *Pipeline) Then(fn PipelineStep) *Pipeline

Then appends a step that transforms the previous result into the next task.

type PipelineStep

type PipelineStep func(prev Result) (AgentTask, string)

PipelineStep transforms a previous result into the next task and target engine.
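
The feed-forward behavior can be sketched as a loop over steps. The types below are simplified local stand-ins, `exec` is a hypothetical callback in place of a real engine call, and returning the partial results alongside the error is a choice of this sketch, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// task and result are simplified local stand-ins for AgentTask and Result.
type task struct {
	Prompt string
	Ref    string // stands in for References
}

type result struct {
	Value  string
	Engine string
}

// step mirrors the documented PipelineStep shape:
// previous result in, next task and engine name out.
type step func(prev result) (task, string)

// runPipeline executes the first task, then each step in order,
// and stops at the first error.
func runPipeline(engine string, first task, steps []step, exec func(engine string, t task) (result, error)) ([]result, error) {
	results := make([]result, 0, len(steps)+1)
	t := first
	for i := 0; ; i++ {
		r, err := exec(engine, t)
		if err != nil {
			return results, fmt.Errorf("step %d (%s): %w", i, engine, err)
		}
		results = append(results, r)
		if i == len(steps) {
			return results, nil
		}
		t, engine = steps[i](r)
	}
}

// demo chains an image step into a video step using a fake executor.
func demo() ([]result, error) {
	exec := func(engine string, t task) (result, error) {
		if t.Prompt == "" {
			return result{}, errors.New("empty prompt")
		}
		return result{Value: engine + "://" + t.Prompt, Engine: engine}, nil
	}
	animate := func(prev result) (task, string) {
		return task{Prompt: "animate", Ref: prev.Value}, "video"
	}
	return runPipeline("img", task{Prompt: "cat"}, []step{animate}, exec)
}

func main() {
	results, err := demo()
	fmt.Println(len(results), results[1].Engine, err) // prints "2 video <nil>"
}
```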

type ProgressEvent

type ProgressEvent struct {
	Phase   string        // "submitted", "polling", "completed"
	Attempt int           // poll attempt number (0 for non-polling phases)
	Elapsed time.Duration // wall-clock time since execution start
}

ProgressEvent reports execution progress to the caller.
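
To show how these fields might be populated, here is a sketch of a polling loop that emits the three phases. `progressEvent` is a local copy of the documented struct, and `poll`, its doubling delay, and its error on exhaustion are assumptions made for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// progressEvent is a local copy of the documented ProgressEvent fields.
type progressEvent struct {
	Phase   string
	Attempt int
	Elapsed time.Duration
}

// poll calls check until it reports done, the attempts run out, or ctx is cancelled.
// The delay doubles after every unsuccessful attempt.
func poll(ctx context.Context, maxAttempts int, base time.Duration, check func() (bool, error), report func(progressEvent)) error {
	start := time.Now()
	report(progressEvent{Phase: "submitted", Elapsed: time.Since(start)})
	delay := base
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		done, err := check()
		if err != nil {
			return err
		}
		if done {
			report(progressEvent{Phase: "completed", Elapsed: time.Since(start)})
			return nil
		}
		report(progressEvent{Phase: "polling", Attempt: attempt, Elapsed: time.Since(start)})
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
	}
	return errors.New("poll: max attempts reached")
}

// demo completes on the third status check and records each event as "phase#attempt".
func demo() []string {
	var phases []string
	calls := 0
	_ = poll(context.Background(), 5, time.Millisecond,
		func() (bool, error) { calls++; return calls == 3, nil },
		func(e progressEvent) { phases = append(phases, fmt.Sprintf("%s#%d", e.Phase, e.Attempt)) })
	return phases
}

func main() {
	fmt.Println(demo()) // prints "[submitted#0 polling#1 polling#2 completed#0]"
}
```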

type ReferenceAsset

type ReferenceAsset struct {
	Type ReferenceType
	URL  string
}

ReferenceAsset describes an externally reachable media input.

type ReferenceType

type ReferenceType string

ReferenceType identifies the kind of remote asset to attach to an agent task.

const (
	ReferenceTypeImage ReferenceType = "image"
	ReferenceTypeVideo ReferenceType = "video"
)

type Result

type Result struct {
	Value    string         // Raw output from the engine.
	Kind     OutputKind     // Authoritative classification from the engine.
	Engine   string         // Name of the engine that produced the result.
	Elapsed  time.Duration  // Wall-clock execution time.
	Metadata map[string]any // Engine-specific data (optional).
}

Result is the public outcome of every Client execution method.

func (Result) String

func (r Result) String() string

String returns the raw output value, allowing fmt.Sprint(result) to work naturally.

type RoutedResult

type RoutedResult struct {
	Engine string
	Reason string
	Output Result
}

RoutedResult is the result of a selector-driven execution.

type Selection

type Selection struct {
	Engine string
	Reason string
}

Selection is the selector's routing decision.

type Selector

type Selector interface {
	SelectEngine(ctx context.Context, task AgentTask, engines []string) (Selection, error)
}

Selector decides which registered engine should handle a task.
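
A selector does not have to call an LLM; a rule-based one is often enough for tests or defaults. The sketch below follows the documented method shape using local stand-in types that carry only the fields it needs. `durationSelector` and its routing rule are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// agentTask and selection are local stand-ins that mirror the documented shapes.
type agentTask struct {
	Prompt   string
	Duration int
}

type selection struct {
	Engine string
	Reason string
}

// durationSelector routes tasks with a duration to videoEngine
// and everything else to imageEngine.
type durationSelector struct {
	imageEngine string
	videoEngine string
}

// SelectEngine follows the documented Selector method shape.
// It only returns an engine that appears in the registered list.
func (s durationSelector) SelectEngine(ctx context.Context, task agentTask, engines []string) (selection, error) {
	want, reason := s.imageEngine, "no duration set, treating as image"
	if task.Duration > 0 {
		want, reason = s.videoEngine, "duration set, treating as video"
	}
	for _, name := range engines {
		if name == want {
			return selection{Engine: name, Reason: reason}, nil
		}
	}
	return selection{}, errors.New("selector: engine " + want + " is not registered")
}

// pick runs the selector against a fixed engine list.
func pick(duration int) (string, error) {
	s := durationSelector{imageEngine: "img", videoEngine: "video"}
	task := agentTask{Prompt: "demo", Duration: duration}
	sel, err := s.SelectEngine(context.Background(), task, []string{"img", "video"})
	return sel.Engine, err
}

func main() {
	for _, d := range []int{0, 2} {
		name, err := pick(d)
		fmt.Println(name, err)
	}
	// prints "img <nil>" then "video <nil>"
}
```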

type TTSOptions

type TTSOptions struct {
	Voice                string
	LanguageType         string
	Instructions         string
	OptimizeInstructions *bool
}

TTSOptions groups text-to-speech parameters.

type VoiceDesignOptions

type VoiceDesignOptions struct {
	VoicePrompt    string
	PreviewText    string
	TargetModel    string
	PreferredName  string
	Language       string
	SampleRate     int
	ResponseFormat string
	OmitPreview    bool
}

VoiceDesignOptions groups voice design parameters.

Directories

Path Synopsis
aigoerr
Package aigoerr provides structured, classifiable errors for aigo engines.
aliyun
Package aliyun integrates with the Alibaba Cloud Bailian (DashScope) multimodal API.
aliyun/internal/async
Package async wraps Bailian asynchronous task creation and polling (shared by image-to-image, text-to-video, and similar flows).
aliyun/internal/audiogen
Package audiogen implements the Alibaba Cloud Bailian speech synthesis and voice design capabilities.
aliyun/internal/graphx
Package graphx extracts the fields shared across domains (image, video, audio) from workflow.Graph.
aliyun/internal/imggen
Package imggen implements the Alibaba Cloud Bailian image generation capabilities (text-to-image and so on).
aliyun/internal/vidgen
Package vidgen implements the Alibaba Cloud Bailian video generation and editing capabilities (Wan-series asynchronous APIs).
ark
Package ark implements the Volcengine Ark (火山方舟) video generation engine, supporting Seedance 2.0 and other Ark content generation models.
embed
Package embed defines the EmbedEngine interface for vector embedding backends.
embed/aliyun
Package aliyun implements the Alibaba Cloud DashScope (Bailian) embedding backend.
embed/gemini
Package gemini implements the Gemini Embedding 2 backend.
embed/jina
Package jina implements the Jina AI embedding backend.
embed/openai
Package openai implements the OpenAI text embedding backend.
embed/voyage
Package voyage implements the Voyage AI embedding backend.
httpx
Package httpx provides HTTP client defaults (timeouts and so on) shared by all engines.
newapi
Package newapi integrates with the large-model HTTP endpoints described in the New API documentation (image, video, speech, and so on).
newapi/internal/rt
Package rt provides gateway BaseURL normalization and path joining.
examples
aliyun_qwen_tts command
aliyun_wan_r2v command
aliyun_wan_t2v command
aliyun_zimage command
newapi_image command
newapi_speech command
newapi_video command
tooldef
Package tooldef provides JSON Schema tool definitions for AI agent frameworks.
