Documentation ¶
Overview ¶
Package runner is the user-facing entry point of atto. A Runner resolves the session for a request, drives the agent.Agent loop and exposes the resulting events as an iterator.
Two implementations sit behind the same Runner interface. When New is called with a nil ActorSystem, it returns a single-process synchronous runner that does not require a goakt actor system. When an ActorSystem is supplied, it returns a goakt-backed runner whose per-session actors hydrate from a shared store.SessionStore. The user-facing API is identical.
Index ¶
- Variables
- func ClusterKinds() []gactor.Actor
- func Extension(opts ...ExtensionOption) extension.Extension
- func NewModelLLM(ctx context.Context, sys gactor.ActorSystem) (llm.LLM, error)
- func RemoteSerializables() []any
- type ExtensionOption
- func ExtensionWithLLM(l llm.LLM) ExtensionOption
- func ExtensionWithModelBaseBackoff(d time.Duration) ExtensionOption
- func ExtensionWithModelMaxBackoff(d time.Duration) ExtensionOption
- func ExtensionWithModelMaxRetries(n int) ExtensionOption
- func ExtensionWithStashBound(n int) ExtensionOption
- func ExtensionWithStore(s store.SessionStore) ExtensionOption
- type Option
- type Runner
Constants ¶
This section is empty.
Variables ¶
var ErrNilActorSystem = errors.New("atto/runner: actor system is nil")
ErrNilActorSystem is returned by NewModelLLM when sys is nil. The proxy needs the actor system to spawn the model actor and the per-call subscriber actor.
var ErrNoAgent = errors.New("atto/runner: no agent supplied")
ErrNoAgent is returned by New when called with a nil agent.
var ErrRunnerStopped = errors.New("atto/runner: runner stopped")
ErrRunnerStopped is returned from [Runner.Run] after [Runner.Stop] has been called.
var ErrRuntimeExtensionMissing = errors.New("atto/runner: actor system is missing the atto runtime extension")
ErrRuntimeExtensionMissing is returned by New and NewModelLLM when the supplied actor.ActorSystem has no atto runtime extension registered. Use Extension to construct one and pass it to actor.WithExtensions when creating the system.
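Because these variables are sentinel values created with errors.New, callers can presumably match them with errors.Is even after the error has been wrapped. The following is a minimal sketch of that pattern; errRunnerStopped, run and isStopped are local stand-ins introduced for illustration so the example compiles without the atto module.

```go
package main

import (
	"errors"
	"fmt"
)

// errRunnerStopped is a local stand-in for runner.ErrRunnerStopped.
var errRunnerStopped = errors.New("atto/runner: runner stopped")

// run is a hypothetical call site that wraps the sentinel with context.
func run(stopped bool) error {
	if stopped {
		return fmt.Errorf("session %q: %w", "s-1", errRunnerStopped)
	}
	return nil
}

// isStopped reports whether err is, or wraps, the sentinel.
func isStopped(err error) bool {
	return errors.Is(err, errRunnerStopped)
}

func main() {
	fmt.Println(isStopped(run(true)))
	fmt.Println(isStopped(run(false)))
}
```

Matching with errors.Is rather than == keeps the check working if an intermediate layer adds context with %w.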
Functions ¶
func ClusterKinds ¶
func ClusterKinds() []gactor.Actor
ClusterKinds returns the actor kinds atto needs registered on a goakt gactor.ClusterConfig when running in cluster mode. Without this registration, a node receiving a Spawn request for one of atto's actor types from another cluster member cannot instantiate the actor locally; cluster placement falls back to best-effort and session affinity is lost.
Pair this with RemoteSerializables when configuring goakt's remote layer; both registrations are required for a working cluster setup.
Typical wiring in cluster mode:
provider := static.NewDiscovery(&static.Config{Hosts: peers})
cluster := gactor.NewClusterConfig().
	WithDiscovery(provider).
	WithKinds(runner.ClusterKinds()...).
	WithDiscoveryPort(3320).
	WithPeersPort(3321).
	WithMinimumPeersQuorum(2)
rem := remote.NewConfig("0.0.0.0", 3330,
	remote.WithSerializables(runner.RemoteSerializables()...),
)
sys, _ := gactor.NewActorSystem("atto",
	gactor.WithRemote(rem),
	gactor.WithCluster(cluster),
	gactor.WithExtensions(runner.Extension(
		runner.ExtensionWithStore(sharedStore),
		runner.ExtensionWithLLM(model),
	)),
)
In single-process / non-cluster mode this helper is unused; the actor-backed runner spawns actors locally and never needs the kinds registry.
func Extension ¶
func Extension(opts ...ExtensionOption) extension.Extension
Extension returns the goakt extension that exposes atto's runtime configuration to the per-session and model actors. Register it on your actor system so the actors can resolve their dependencies on activation:
sys, _ := actor.NewActorSystem("agents",
	actor.WithExtensions(runner.Extension(
		runner.ExtensionWithStore(myStore),
		runner.ExtensionWithLLM(myLLM),
		runner.ExtensionWithStashBound(64),
	)),
)
Resolving dependencies on PreStart (rather than capturing them in a constructor) is what makes the actors transparently relocatable in cluster mode: when goakt re-spawns an actor on another node, the extension on that node supplies the same dependencies.
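The idea of resolving dependencies at start-up rather than in a constructor can be sketched without goakt. Everything below is a stand-in invented for illustration (node, sessionActor, preStart, the "atto" extension key); it is not the goakt or atto API, only the shape of the pattern: the actor value carries no dependencies, and whichever node activates it supplies them.

```go
package main

import (
	"errors"
	"fmt"
)

// sessionStore is a hypothetical stand-in for store.SessionStore.
type sessionStore interface{ Describe() string }

type sharedStore struct{ via string }

func (s sharedStore) Describe() string { return "shared store via " + s.via }

// node stands in for an actor system: each node registers its own
// extension value under a well-known key (the key "atto" is illustrative).
type node struct{ extensions map[string]any }

// sessionActor captures nothing at construction time.
type sessionActor struct{ store sessionStore }

var errExtensionMissing = errors.New("runtime extension missing")

// preStart looks the dependency up on whichever node activates the actor.
func (a *sessionActor) preStart(n *node) error {
	s, ok := n.extensions["atto"].(sessionStore)
	if !ok {
		return errExtensionMissing
	}
	a.store = s
	return nil
}

// activateOn simulates spawning a fresh actor on the named node.
func activateOn(name string) (string, error) {
	n := &node{extensions: map[string]any{"atto": sharedStore{via: name}}}
	a := &sessionActor{}
	if err := a.preStart(n); err != nil {
		return "", err
	}
	return a.store.Describe(), nil
}

func main() {
	for _, name := range []string{"node-a", "node-b"} {
		desc, err := activateOn(name)
		fmt.Println(desc, err)
	}
}
```

Because sessionActor holds no state from its constructor, the same zero value can be started on any node that has the extension registered; a node without it fails fast with an error instead of running with nil dependencies.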
func NewModelLLM ¶
func NewModelLLM(ctx context.Context, sys gactor.ActorSystem) (llm.LLM, error)
NewModelLLM returns an llm.LLM proxy that dispatches every completion through atto's internal/actor.ModelActor running on sys. The actor is spawned once under internal/actor.ModelActorName; subsequent invocations on the same system reuse the existing actor (goakt's Spawn is idempotent for live actors).
The proxy is the bridge between user-facing code that holds an llm.LLM and the actor pipeline that owns retry semantics, rate limiting, and (in cluster mode) placement strategy. Use it when constructing the agent for the goakt-backed runner:
sys, _ := actor.NewActorSystem("agents",
	actor.WithExtensions(runner.Extension(
		runner.ExtensionWithStore(store),
		runner.ExtensionWithLLM(rawLLM),
	)),
)
_ = sys.Start(ctx)
model, _ := runner.NewModelLLM(ctx, sys)
a := agent.NewLLM(agent.WithModel(model), agent.WithInstruction("..."))
r, _ := runner.New(sys, a)
The runtime extension supplies the underlying llm.LLM (via ExtensionWithLLM) and the retry policy. NewModelLLM returns ErrRuntimeExtensionMissing when the extension is not registered.
The protocol between proxy, subscriber, and model actor uses only pure-data messages — no context.Context, no Go channels, no callbacks — so the model actor is safe to relocate across cluster nodes.
func RemoteSerializables ¶
func RemoteSerializables() []any
RemoteSerializables returns the slice of message values that goakt's remote layer must know how to serialise across the network. Atto's internal protocol uses plain Go structs with no context.Context and no Go channels, so the messages are pure data and safe to ship over the wire; the registration tells goakt which concrete types may appear.
Pass the returned slice to github.com/tochemey/goakt/v4/remote.WithSerializables when constructing the remote config. Without this registration, a message sent from an actor on node A to an actor on node B cannot be decoded, and the call is dropped (or surfaces a confusing decode error in the goakt logs). Centralising the list here spares user code the bookkeeping bugs that come from missed additions.
In single-process / non-cluster mode this helper is unused.
Types ¶
type ExtensionOption ¶
type ExtensionOption func(*extensionConfig)
ExtensionOption configures the goakt extension returned by Extension.
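ExtensionOption follows Go's functional-options pattern: each option is a function that mutates a private config struct. The sketch below shows the general mechanism only. The field names, the defaults and the lower-case helper names are assumptions made for illustration, since the real extensionConfig is unexported and its contents are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// extensionConfig is a hypothetical stand-in; the real struct's fields
// are not part of the documented API.
type extensionConfig struct {
	maxRetries  int
	baseBackoff time.Duration
}

// ExtensionOption mirrors the documented signature.
type ExtensionOption func(*extensionConfig)

// withMaxRetries and withBaseBackoff are illustrative options.
func withMaxRetries(n int) ExtensionOption {
	return func(c *extensionConfig) { c.maxRetries = n }
}

func withBaseBackoff(d time.Duration) ExtensionOption {
	return func(c *extensionConfig) { c.baseBackoff = d }
}

// build starts from made-up defaults and applies options in order, so a
// later option overrides an earlier one that sets the same field.
func build(opts ...ExtensionOption) extensionConfig {
	cfg := extensionConfig{maxRetries: 3, baseBackoff: 100 * time.Millisecond}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := build(withMaxRetries(5))
	fmt.Println(cfg.maxRetries, cfg.baseBackoff)
}
```

Options that are not passed leave their defaults untouched, which is why callers can supply only the settings they care about.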
func ExtensionWithLLM ¶
func ExtensionWithLLM(l llm.LLM) ExtensionOption
ExtensionWithLLM configures the llm.LLM adapter that the model actor wraps.
func ExtensionWithModelBaseBackoff ¶
func ExtensionWithModelBaseBackoff(d time.Duration) ExtensionOption
ExtensionWithModelBaseBackoff sets the initial backoff between model retry attempts.
func ExtensionWithModelMaxBackoff ¶
func ExtensionWithModelMaxBackoff(d time.Duration) ExtensionOption
ExtensionWithModelMaxBackoff caps the backoff between model retry attempts.
func ExtensionWithModelMaxRetries ¶
func ExtensionWithModelMaxRetries(n int) ExtensionOption
ExtensionWithModelMaxRetries caps the number of times the model actor retries a transient error.
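To make the interaction between the three retry settings concrete, here is a minimal sketch of a delay schedule. It assumes exponential doubling from the base backoff up to the cap; the documentation names only the base, the cap and the retry limit, so the doubling curve (and the absence of jitter) is an assumption, and backoffFor and schedule are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// backoffFor returns the wait before retry number attempt (0-based),
// doubling from base and never exceeding maxDelay. The doubling curve is
// an assumption, not documented behaviour.
func backoffFor(attempt int, base, maxDelay time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay // capped; returning early also avoids overflow
		}
	}
	if d > maxDelay {
		return maxDelay
	}
	return d
}

// schedule lists the delay before each of maxRetries retry attempts.
func schedule(maxRetries int, base, maxDelay time.Duration) []time.Duration {
	var delays []time.Duration
	for i := 0; i < maxRetries; i++ {
		delays = append(delays, backoffFor(i, base, maxDelay))
	}
	return delays
}

func main() {
	fmt.Println(schedule(5, 100*time.Millisecond, time.Second))
}
```

With a 100 ms base, a 1 s cap and five retries, the first four delays double from the base and the fifth is clamped to the cap.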
func ExtensionWithStashBound ¶
func ExtensionWithStashBound(n int) ExtensionOption
ExtensionWithStashBound caps the number of pending GetContext requests a session actor may stash before responding with session.ErrSessionBacklogFull.
func ExtensionWithStore ¶
func ExtensionWithStore(s store.SessionStore) ExtensionOption
ExtensionWithStore configures the store.SessionStore backing the session actors.
type Option ¶
type Option func(*config)
Option configures a Runner at construction time.
func WithEventBufferSize ¶
WithEventBufferSize configures the buffered channel that the actor-backed runner uses to forward events from the worker goroutine to the iter.Seq2 consumer. The default is 64.
The synchronous runner emits events on the caller's goroutine and ignores this option.
func WithPassivationAfter ¶
WithPassivationAfter sets how long a session actor may stay idle before goakt passivates it. Passivation releases the in-memory session state; the next invocation re-spawns the actor and re-hydrates from the configured store. The default is 15 minutes.
Applied only in actor-backed mode.
func WithStashBound ¶
WithStashBound caps the number of pending invocations a single session may queue while a turn is in flight. Excess invocations are rejected with session.ErrSessionBacklogFull. The default is 32.
Effective only in synchronous mode (sys == nil at New). In actor-backed mode the bound is sourced from the Extension; this option is ignored.
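The reject-when-full behaviour of a stash bound can be illustrated with a buffered channel and a non-blocking send. This is a sketch of the general technique, not atto's internals; backlog, enqueue, accepted and errBacklogFull are hypothetical names, with errBacklogFull standing in for session.ErrSessionBacklogFull.

```go
package main

import (
	"errors"
	"fmt"
)

// errBacklogFull is a stand-in for session.ErrSessionBacklogFull.
var errBacklogFull = errors.New("session backlog full")

// backlog holds at most cap(pending) queued invocations.
type backlog struct{ pending chan string }

func newBacklog(bound int) *backlog {
	return &backlog{pending: make(chan string, bound)}
}

// enqueue never blocks: it either queues the input or rejects it.
func (b *backlog) enqueue(input string) error {
	select {
	case b.pending <- input:
		return nil
	default:
		return errBacklogFull
	}
}

// accepted offers n inputs to a backlog of the given bound while nothing
// is draining it, and reports how many were queued.
func accepted(bound, n int) int {
	b := newBacklog(bound)
	count := 0
	for i := 0; i < n; i++ {
		if err := b.enqueue(fmt.Sprintf("input-%d", i)); err == nil {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(accepted(2, 5))
}
```

Rejecting instead of blocking keeps a busy session from accumulating unbounded work; the caller decides whether to retry or surface the error.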
func WithStore ¶
func WithStore(s store.SessionStore) Option
WithStore configures the store.SessionStore used to persist session history and state. The default is the in-memory store.
type Runner ¶
type Runner interface {
	// Run drives one invocation of the configured root agent against
	// sessionID, supplying input as the user message. It returns an
	// iterator over the resulting events. The iterator terminates when
	// the invocation completes, errors or the supplied context is
	// cancelled.
	Run(ctx context.Context, sessionID string, input session.Message) iter.Seq2[*session.Event, error]
	// Stop releases any resources held by the runner. Calls to [Run]
	// after Stop yield [ErrRunnerStopped].
	Stop(ctx context.Context) error
}
Runner is the user-facing entry point. Implementations must be safe for concurrent use across distinct session IDs.
func New ¶
New constructs the default Runner.
When sys is nil the runner is the in-process synchronous implementation: invocations execute on the caller's goroutine and per-session serialisation is handled by an internal mutex. The configuration options (WithStore etc.) are read directly from the runner's option list.
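Per-session serialisation with a mutex can be pictured as one lock per session ID, so turns on the same session run one at a time while distinct sessions proceed independently. The text above says only that an internal mutex is used; the map-of-mutexes layout below, along with the names sessionLocks, withSession and countTurns, is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sessionLocks hands out one mutex per session ID.
type sessionLocks struct {
	mu    sync.Mutex // guards the map itself
	locks map[string]*sync.Mutex
}

func newSessionLocks() *sessionLocks {
	return &sessionLocks{locks: make(map[string]*sync.Mutex)}
}

// lockFor returns the mutex for id, creating it on first use.
func (s *sessionLocks) lockFor(id string) *sync.Mutex {
	s.mu.Lock()
	defer s.mu.Unlock()
	l, ok := s.locks[id]
	if !ok {
		l = &sync.Mutex{}
		s.locks[id] = l
	}
	return l
}

// withSession runs fn on the caller's goroutine while holding the
// session's lock, so turns for one session never overlap.
func (s *sessionLocks) withSession(id string, fn func()) {
	l := s.lockFor(id)
	l.Lock()
	defer l.Unlock()
	fn()
}

// countTurns runs n concurrent turns against one session and returns how
// many were applied to its (deliberately non-atomic) counter.
func countTurns(n int) int {
	locks := newSessionLocks()
	turns := 0
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			locks.withSession("session-1", func() { turns++ })
		}()
	}
	wg.Wait()
	return turns
}

func main() {
	fmt.Println(countTurns(50))
}
```

This sketch never removes entries from the map, so a long-lived process with many session IDs would need some form of cleanup.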
When sys is non-nil the runner is backed by goakt: per-session actors hydrate from the store.SessionStore supplied by the atto runtime extension, and per-invocation worker goroutines stream events through a buffered channel. The runtime extension MUST be registered on the actor system via Extension; New returns ErrRuntimeExtensionMissing otherwise. Runner-level options that duplicate extension settings (such as WithStore and WithStashBound) are then ignored, because the extension is the sole source of truth for cluster-relocatable actors. Options documented as specific to actor-backed mode (WithEventBufferSize, WithPassivationAfter) still apply.
The user-facing API is identical for both implementations.