subagent

package
v0.0.0-...-86b5f70
Warning

This package is not in the latest version of its module.

Published: Jun 12, 2026 | License: MIT | Imports: 12 | Imported by: 0

Documentation

Overview

Package subagent runs jess subagents with bounded concurrency. A Pool executes named Spec definitions as tasks and merges their events, each tagged with its AgentPath, onto one stream. It builds each subagent through internal/core, so subagents are real *agentcore.Agent runs that inherit the parent's audit sink and tool gate.

Constants

This section is empty.

Variables

var (
	ErrUnknownAgent = errors.New("subagent: unknown agent")
	ErrPoolClosed   = errors.New("subagent: pool is closed")
	ErrMaxDepth     = errors.New("subagent: max depth exceeded")
)

Errors returned by Submit.

Functions

This section is empty.

Types

type Event

type Event struct {
	ac.Event
	AgentPath []string
}

Event is an agentcore lifecycle event tagged with the AgentPath of the subagent that emitted it. The Pool merges events from many concurrent subagent runs onto one stream; AgentPath is how a consumer tells them apart (e.g. {"research/0001"}, or {"root/0001", "web/0002"} for a nested run).

agentcore.Event has no notion of a subagent path, so jess carries it alongside rather than inside the agentcore type.
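
A consumer of the merged stream will typically bucket events by AgentPath. The sketch below shows one way to do that, under stated assumptions: Event here is a simplified stand-in in which a plain Kind string replaces the embedded ac.Event, and pathKey and groupByPath are hypothetical helpers, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a simplified stand-in: the real type embeds ac.Event, replaced
// here by a plain Kind string (an assumption for illustration).
type Event struct {
	Kind      string
	AgentPath []string
}

// pathKey flattens an AgentPath into a string usable as a map key.
func pathKey(path []string) string {
	return strings.Join(path, " > ")
}

// groupByPath drains a merged stream and buckets event kinds per subagent.
func groupByPath(events <-chan Event) map[string][]string {
	out := make(map[string][]string)
	for ev := range events {
		k := pathKey(ev.AgentPath)
		out[k] = append(out[k], ev.Kind)
	}
	return out
}

func main() {
	ch := make(chan Event, 3)
	ch <- Event{Kind: "start", AgentPath: []string{"research/0001"}}
	ch <- Event{Kind: "start", AgentPath: []string{"root/0001", "web/0002"}}
	ch <- Event{Kind: "end", AgentPath: []string{"research/0001"}}
	close(ch)
	for k, kinds := range groupByPath(ch) {
		fmt.Println(k, kinds)
	}
}
```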

type Option

type Option func(*poolConfig)

Option configures a Pool.

func WithDefaults

func WithDefaults(model ac.ChatModel, gate ac.ToolGate, sink ledger.Sink, agentID string) Option

WithDefaults sets the parent defaults each Spec inherits when its corresponding field is unset: the model, tool gate, audit sink, and agentID. jess.New uses this so subagents share the parent agent's safety controls.
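
The inheritance rule (a Spec field wins when set, otherwise the parent default applies) can be sketched as follows. This is an illustration only: spec, defaults, orDefault, and resolve are hypothetical names, and plain strings stand in for the real ac.ChatModel, ac.ToolGate, and ledger.Sink values, whose unset state would be nil rather than the empty string.

```go
package main

import "fmt"

// spec and defaults are simplified stand-ins: plain strings replace the real
// ac.ChatModel, ac.ToolGate and ledger.Sink values.
type spec struct{ Name, Model, Gate, Audit, AgentID string }
type defaults struct{ Model, Gate, Audit, AgentID string }

// orDefault returns v unless it is the zero value, in which case it returns d.
func orDefault(v, d string) string {
	if v == "" {
		return d
	}
	return v
}

// resolve fills every unset field of s from the parent defaults d.
func resolve(s spec, d defaults) spec {
	s.Model = orDefault(s.Model, d.Model)
	s.Gate = orDefault(s.Gate, d.Gate)
	s.Audit = orDefault(s.Audit, d.Audit)
	s.AgentID = orDefault(s.AgentID, d.AgentID)
	return s
}

func main() {
	parent := defaults{Model: "parent-model", Gate: "parent-gate", Audit: "parent-sink", AgentID: "jess"}
	// Only Model is overridden; Gate, Audit and AgentID come from the parent.
	child := resolve(spec{Name: "research", Model: "small-model"}, parent)
	fmt.Printf("%+v\n", child)
}
```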

func WithMaxConcurrent

func WithMaxConcurrent(n int) Option

WithMaxConcurrent caps how many subagent runs execute at once (default 8).

func WithMaxDepth

func WithMaxDepth(n int) Option

WithMaxDepth caps subagent nesting depth (default 8).

func WithMaxQueued

func WithMaxQueued(n int) Option

WithMaxQueued caps how many tasks may wait in the queue; Submit blocks when full (default 1024).
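
The Option type follows the functional-options pattern. A minimal sketch of how such options might be applied on top of the documented defaults (8, 8, and 1024) is shown below. The field names of poolConfig and the buildConfig helper are assumptions; the real struct is unexported and its layout is not documented.

```go
package main

import "fmt"

// poolConfig is a stand-in; the real struct's fields are unexported and
// these names are assumptions.
type poolConfig struct {
	maxConcurrent, maxDepth, maxQueued int
}

// Option mutates a poolConfig, matching the documented signature.
type Option func(*poolConfig)

func WithMaxConcurrent(n int) Option { return func(c *poolConfig) { c.maxConcurrent = n } }
func WithMaxDepth(n int) Option      { return func(c *poolConfig) { c.maxDepth = n } }
func WithMaxQueued(n int) Option     { return func(c *poolConfig) { c.maxQueued = n } }

// buildConfig starts from the documented defaults and applies opts in order,
// so a later option overrides an earlier one.
func buildConfig(opts ...Option) poolConfig {
	c := poolConfig{maxConcurrent: 8, maxDepth: 8, maxQueued: 1024}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := buildConfig(WithMaxConcurrent(2), WithMaxQueued(16))
	fmt.Printf("%+v\n", c)
}
```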

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool runs subagent tasks with bounded concurrency and merges their events.

Callers must call Close (graceful) or Cancel (abort) when done with the Pool; otherwise its worker goroutines run forever waiting for tasks.

func New

func New(opts ...Option) *Pool

New creates a Pool and starts its workers.

func (*Pool) Cancel

func (p *Pool) Cancel()

Cancel aborts all in-flight and queued tasks by cancelling the pool context, then behaves like Close. In-flight runs are interrupted: each run observes the cancelled context and aborts.

func (*Pool) Close

func (p *Pool) Close()

Close stops accepting new tasks. In-flight and queued tasks still run; the merged event stream closes once they finish. Close is idempotent and safe to call concurrently with Submit: a racing Submit returns ErrPoolClosed rather than panicking.

func (*Pool) Events

func (p *Pool) Events() <-chan Event

Events returns the merged, AgentPath-tagged event stream of all subagent runs. The channel is closed once Close has been called and all tasks have finished. Either consume it yourself or call Wait (which drains it); do not do both concurrently.

func (*Pool) Register

func (p *Pool) Register(s Spec)

Register adds or replaces a subagent Spec.

func (*Pool) Submit

func (p *Pool) Submit(ctx context.Context, name, input string, parentPath ...string) (*Task, error)

Submit queues a run whose events go to the pool's merged stream (Events()). It blocks if the queue is full and returns when a slot is available or ctx is cancelled. parentPath is the caller's AgentPath (nil at top level).

func (*Pool) SubmitTo

func (p *Pool) SubmitTo(ctx context.Context, sink *Stream, name, input string, parentPath ...string) (*Task, error)

SubmitTo queues a run whose events are forwarded to sink (AgentPath-tagged) instead of the pool's merged stream. Used to bubble a subagent's events into a parent run's stream. The sink is caller-owned; the pool never closes it.

The sink must be actively consumed. If the sink's buffer fills with no reader, the forwarding worker blocks on Send, which stalls the task until the sink drains.

func (*Pool) Wait

func (p *Pool) Wait() error

Wait blocks until all submitted tasks finish (call Close first). It drains the merged event stream so a caller that ignores Events never deadlocks.
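
The Close, Events, and Wait contract can be illustrated with a small stand-alone worker pool. This is a sketch of the general pattern under stated assumptions, not the package's implementation: miniPool and its fields are hypothetical, jobs and events are plain strings, and jobs are sent straight onto the channel, so unlike the real Submit this sketch does not guard against a send racing with Close.

```go
package main

import (
	"fmt"
	"sync"
)

// miniPool is a hypothetical stand-in for Pool, not the package's code.
type miniPool struct {
	jobs   chan string
	events chan string
	wg     sync.WaitGroup
	once   sync.Once
}

func newMiniPool(workers, queued int) *miniPool {
	p := &miniPool{jobs: make(chan string, queued), events: make(chan string, 4)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for job := range p.jobs { // exits once jobs is closed and drained
				p.events <- "done: " + job
			}
		}()
	}
	go func() { // close the merged stream after every worker has exited
		p.wg.Wait()
		close(p.events)
	}()
	return p
}

// Close stops accepting jobs; queued jobs still run. sync.Once makes it idempotent.
func (p *miniPool) Close() { p.once.Do(func() { close(p.jobs) }) }

// Wait drains events until the stream closes, so workers never block on an
// unread stream. It returns how many events it drained.
func (p *miniPool) Wait() int {
	n := 0
	for range p.events {
		n++
	}
	return n
}

func main() {
	p := newMiniPool(2, 8)
	for _, j := range []string{"a", "b", "c"} {
		p.jobs <- j
	}
	p.Close()
	p.Close() // second call is a no-op
	fmt.Println("events drained:", p.Wait())
}
```

Calling Wait without Close in this sketch would block forever, because the workers never see the jobs channel close; that mirrors the "call Close first" requirement above.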

type Result

type Result struct {
	AgentPath []string
	Output    string // the subagent's final assistant text
	Summary   *ac.RunSummary
}

Result is the outcome of a finished subagent Task.

type Spec

type Spec struct {
	Name         string
	Model        ac.ChatModel
	Tools        []ac.Tool
	Skills       *skill.Set
	SystemPrompt string
	AgentID      string
	MaxTurns     int

	// Gate and Audit are inherited from the Pool's base config when left nil, so
	// subagents share the parent's safety controls by default.
	Gate  ac.ToolGate
	Audit ledger.Sink
}

Spec defines a named subagent: its model and capabilities. The Pool runs a Spec as a task, and each task gets its own isolated run.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream fans in Events from many producers (the Pool's per-job mergers) to a single consumer that ranges over Events(). Backpressure comes from blocking sends on the buffered channel: a slow consumer slows producers, which bounds memory.

Send after Close is a no-op (never a panic), so a producer that outlives the run is harmless. Close is idempotent and unblocks any producer currently blocked on a full buffer, so a graceful Close never hangs behind a slow or absent consumer.

func NewStream

func NewStream(buffer int) *Stream

NewStream returns a Stream whose channel buffers up to buffer events.

func (*Stream) Close

func (s *Stream) Close()

Close closes the stream. Idempotent. Subsequent Send calls are no-ops and the Events channel is closed so consumers ranging over it terminate.

func (*Stream) Events

func (s *Stream) Events() <-chan Event

Events returns the receive channel. It is closed when Close runs, so callers can range over it.

func (*Stream) Send

func (s *Stream) Send(ev Event)

Send delivers ev to the consumer. It blocks while the buffer is full, unless the stream is closed (then it drops ev and returns).
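
One way to obtain these semantics (Send after Close is a no-op, Close is idempotent, and Close releases a producer blocked on a full buffer) is to pair the data channel with a done channel and a read-write mutex. The sketch below is an assumption about how such a type could be built, not the package's actual implementation; Event is reduced to a single Kind field and the field names are invented.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a simplified stand-in for the package's Event.
type Event struct{ Kind string }

// Stream here is a sketch; its fields are assumptions.
type Stream struct {
	mu   sync.RWMutex // held for reading while a Send is in flight
	once sync.Once
	done chan struct{} // closed first, to release blocked senders
	ch   chan Event
}

func NewStream(buffer int) *Stream {
	return &Stream{done: make(chan struct{}), ch: make(chan Event, buffer)}
}

func (s *Stream) Events() <-chan Event { return s.ch }

// Send blocks while the buffer is full and drops ev once the stream is closed.
func (s *Stream) Send(ev Event) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	select {
	case <-s.done:
		return // already closed: drop ev
	default:
	}
	// ch cannot be closed here: Close needs the write lock to do that.
	select {
	case s.ch <- ev:
	case <-s.done:
	}
}

// Close releases blocked senders, waits for them to leave, then closes ch.
func (s *Stream) Close() {
	s.once.Do(func() {
		close(s.done)
		s.mu.Lock()
		close(s.ch)
		s.mu.Unlock()
	})
}

func main() {
	s := NewStream(1)
	s.Send(Event{Kind: "first"}) // fills the buffer
	released := make(chan struct{})
	go func() {
		s.Send(Event{Kind: "second"}) // no room, no reader: dropped on Close
		close(released)
	}()
	s.Close()
	<-released
	s.Send(Event{Kind: "late"}) // no-op after Close
	for ev := range s.Events() {
		fmt.Println(ev.Kind) // only the buffered "first" event remains
	}
}
```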

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is the handle for one submitted subagent run.

func (*Task) AgentPath

func (t *Task) AgentPath() []string

AgentPath returns a copy of the task's path segment(s) (e.g. {"research/0001"}). Returning a copy protects the task's internal path from callers, who could otherwise corrupt the path observed on events and results.

func (*Task) Wait

func (t *Task) Wait() (Result, error)

Wait blocks until the task finishes and returns its result and error. The returned Result carries a fresh copy of AgentPath, so a caller mutating it cannot corrupt the task's stored path or a later Wait call's result.

type Tool

type Tool struct {
	// contains filtered or unexported fields
}

Tool is the agentcore.Tool the model calls to delegate to a subagent. It runs the named subagent on the Pool and returns the subagent's final text output. The subagent's events are merged onto the Pool's stream (AgentPath-tagged) for any observer of the Pool.

func NewTool

func NewTool(pool *Pool) *Tool

NewTool builds the subagent tool backed by pool. Register the available subagent Specs on the pool before use.

func (*Tool) Description

func (t *Tool) Description() string

Description returns the tool description shown to the model.

func (*Tool) Execute

func (t *Tool) Execute(ctx context.Context, raw json.RawMessage) (json.RawMessage, error)

Execute runs the subagent and returns its output as JSON. The subagent's events flow onto the Pool's merged stream, tagged by AgentPath.

func (*Tool) Name

func (t *Tool) Name() string

Name satisfies agentcore.Tool.

func (*Tool) Schema

func (t *Tool) Schema() map[string]any

Schema satisfies agentcore.Tool.
