worker

package
v0.9.3
Published: May 27, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package worker implements the AgentFactory worker protocol: registration with the platform, work polling, heartbeat reporting, and multi-worker fleet process management.

This package is public so downstream projects can import it for fleet lifecycle commands that route through the platform API proxy.

Constants

const (
	// WorkerStateRunning indicates the child process has been spawned and
	// has not yet exited.
	WorkerStateRunning = "running"
	// WorkerStateStopping indicates Stop or Scale has begun tearing the
	// child down (SIGTERM sent) but the child has not yet been reaped.
	WorkerStateStopping = "stopping"
	// WorkerStateExited indicates the child has exited (either because the
	// Fleet told it to or because it crashed).
	WorkerStateExited = "exited"
)

Worker process states reported by Fleet.Status.

Variables

var (
	// ErrRegistrationFailed indicates a failure during POST /api/workers/register
	// that is not covered by a more specific sentinel (e.g. ErrInvalidProvisioningToken).
	ErrRegistrationFailed = errors.New("worker registration failed")

	// ErrPollFailed indicates a failure during GET /api/workers/{id}/poll that
	// is not covered by a more specific sentinel.
	ErrPollFailed = errors.New("worker poll failed")

	// ErrHeartbeatFailed indicates a failure during POST /api/workers/{id}/heartbeat
	// that is not covered by a more specific sentinel.
	ErrHeartbeatFailed = errors.New("worker heartbeat failed")

	// ErrInvalidProvisioningToken is returned when the rsp_live_ provisioning
	// token is rejected by the coordinator (HTTP 401 on the register call).
	ErrInvalidProvisioningToken = errors.New("invalid provisioning token")

	// ErrRuntimeJWTExpired is returned when the short-lived runtime JWT has
	// expired (HTTP 401 on a poll or heartbeat call). The worker should
	// re-register to obtain a fresh token.
	ErrRuntimeJWTExpired = errors.New("runtime jwt expired")

	// ErrRuntimeJWTInvalid is returned when the runtime JWT is present but
	// refused by the coordinator (HTTP 403). This typically indicates the
	// worker has been revoked or the token is structurally invalid and a
	// fresh register will not help.
	ErrRuntimeJWTInvalid = errors.New("runtime jwt invalid")

	// ErrRateLimited is returned when the coordinator responds with HTTP 429.
	ErrRateLimited = errors.New("rate limited")

	// ErrServerError is returned for 5xx responses from the coordinator.
	ErrServerError = errors.New("server error")

	// ErrNotFound is returned when the coordinator responds with HTTP 404.
	ErrNotFound = errors.New("not found")
)

Sentinel errors for expected worker-protocol failure modes. Callers should test for these with errors.Is. These are intentionally duplicated from afclient so the worker package stays self-contained.
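A minimal sketch of the errors.Is pattern recommended above. The two sentinels are re-declared locally so the example compiles on its own; nextAction and wrapPoll are hypothetical helpers, not part of the package, and the action strings are illustrative only.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinels, declared here only so
// the sketch is self-contained.
var (
	ErrRuntimeJWTExpired = errors.New("runtime jwt expired")
	ErrRateLimited       = errors.New("rate limited")
)

// wrapPoll simulates a caller adding context to an error with %w.
func wrapPoll(err error) error {
	return fmt.Errorf("poll: %w", err)
}

// nextAction is a hypothetical helper that picks a reaction to an error.
// errors.Is sees through any number of %w wrapping layers.
func nextAction(err error) string {
	switch {
	case err == nil:
		return "continue"
	case errors.Is(err, ErrRuntimeJWTExpired):
		return "re-register"
	case errors.Is(err, ErrRateLimited):
		return "back off"
	default:
		return "log and retry"
	}
}

func main() {
	fmt.Println(nextAction(wrapPoll(ErrRuntimeJWTExpired)))
	fmt.Println(nextAction(ErrRateLimited))
}
```

Comparing with == instead of errors.Is would miss the wrapped case, which is why the sentinel documentation asks for errors.Is.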

Functions

func FleetPIDPath

func FleetPIDPath() (string, error)

FleetPIDPath returns the path to the fleet PID file. It honors $DONMAI_FLEET_PID_FILE (legacy: $AGENTFACTORY_FLEET_PID_FILE) when set; otherwise it derives the path from os.UserConfigDir as <config>/donmai/fleet.pids.
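The lookup order described above could be sketched roughly as follows. pidPathFrom is a hypothetical stand-in, not the package's implementation; it takes the environment and config-dir lookups as parameters so it can be exercised without touching the real environment. That the new variable wins when both are set is an assumption, since the documentation does not state the precedence.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// pidPathFrom is a hypothetical re-implementation of the documented lookup.
// Assumption: DONMAI_FLEET_PID_FILE takes precedence over the legacy name.
func pidPathFrom(getenv func(string) string, configDir func() (string, error)) (string, error) {
	for _, key := range []string{"DONMAI_FLEET_PID_FILE", "AGENTFACTORY_FLEET_PID_FILE"} {
		if p := getenv(key); p != "" {
			return p, nil
		}
	}
	dir, err := configDir()
	if err != nil {
		return "", fmt.Errorf("resolve config dir: %w", err)
	}
	return filepath.Join(dir, "donmai", "fleet.pids"), nil
}

func main() {
	// Wire in the real lookups from the standard library.
	p, err := pidPathFrom(os.Getenv, os.UserConfigDir)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(p)
}
```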

func ReadFleetPIDs

func ReadFleetPIDs() ([]int, error)

ReadFleetPIDs reads the fleet PID file. Returns an empty slice (nil error) when the file does not exist. Blank lines are skipped.

func RemoveFleetPIDFile

func RemoveFleetPIDFile() error

RemoveFleetPIDFile deletes the fleet PID file, if present. A missing file is not an error.

func WriteFleetPIDs

func WriteFleetPIDs(pids []int) error

WriteFleetPIDs writes the given PIDs, one per line, to the fleet PID file. The parent directory is created with 0o750 if missing and the file is written with 0o600.
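To illustrate the file format described for ReadFleetPIDs and WriteFleetPIDs (one PID per line, blank lines skipped, 0o750 directory, 0o600 file), here is a sketch that works on an explicit path. writePIDs, readPIDs and roundTrip are hypothetical helpers; in particular, returning an error for a non-numeric line is an assumption, since the documentation does not say how malformed lines are handled.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// writePIDs writes one PID per line, creating the parent directory if needed.
func writePIDs(path string, pids []int) error {
	if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil {
		return err
	}
	var b strings.Builder
	for _, pid := range pids {
		b.WriteString(strconv.Itoa(pid))
		b.WriteByte('\n')
	}
	return os.WriteFile(path, []byte(b.String()), 0o600)
}

// readPIDs returns an empty slice and a nil error when the file is missing.
func readPIDs(path string) ([]int, error) {
	f, err := os.Open(path)
	if errors.Is(err, fs.ErrNotExist) {
		return []int{}, nil
	}
	if err != nil {
		return nil, err
	}
	defer f.Close()

	pids := []int{}
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue // blank lines are skipped
		}
		pid, err := strconv.Atoi(line)
		if err != nil {
			// Assumption: malformed lines are reported rather than ignored.
			return nil, fmt.Errorf("parse pid %q: %w", line, err)
		}
		pids = append(pids, pid)
	}
	return pids, sc.Err()
}

// roundTrip writes pids to a file in a temporary directory and reads them back.
func roundTrip(pids []int) ([]int, error) {
	dir, err := os.MkdirTemp("", "fleet-pids-")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "donmai", "fleet.pids")
	if err := writePIDs(path, pids); err != nil {
		return nil, err
	}
	return readPIDs(path)
}

func main() {
	got, err := roundTrip([]int{101, 202})
	fmt.Println(got, err)
}
```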

Types

type AgentRuntimeProviderCapabilities

type AgentRuntimeProviderCapabilities struct {
	// SupportsMessageInjection indicates whether injectMessage() works for
	// this provider (stateful providers: Claude, A2A).
	SupportsMessageInjection bool `json:"supportsMessageInjection"`
	// SupportsSessionResume indicates whether resume() can continue a prior
	// session.
	SupportsSessionResume bool `json:"supportsSessionResume"`
	// SupportsToolPlugins indicates whether the provider can use MCP tool
	// plugins delivered via stdio servers (af_linear_*, af_code_*).
	SupportsToolPlugins bool `json:"supportsToolPlugins,omitempty"`
	// NeedsBaseInstructions indicates that the provider requires persistent
	// base instructions via AgentSpawnConfig.baseInstructions.
	NeedsBaseInstructions bool `json:"needsBaseInstructions,omitempty"`
	// NeedsPermissionConfig indicates that the provider requires structured
	// permission config via AgentSpawnConfig.permissionConfig.
	NeedsPermissionConfig bool `json:"needsPermissionConfig,omitempty"`
	// SupportsCodeIntelligenceEnforcement indicates that the provider supports
	// canUseTool-style code intelligence enforcement.
	SupportsCodeIntelligenceEnforcement bool `json:"supportsCodeIntelligenceEnforcement,omitempty"`
	// ToolPermissionFormat is the tool permission format used by this provider
	// (one of "claude", "codex", "spring-ai"). Empty string means default.
	ToolPermissionFormat string `json:"toolPermissionFormat,omitempty"`
	// EmitsSubagentEvents indicates whether the provider emits Anthropic-style
	// subagent events (e.g. Task tool progress). Used by the Topology view.
	// Only true for the Claude provider; Codex and Spring AI have no equivalent
	// emission today.
	EmitsSubagentEvents bool `json:"emitsSubagentEvents"`
	// HumanLabel is the human-readable display name for this provider family
	// (e.g. "Claude", "Codex", "Spring AI"). Used in UI and log messages.
	HumanLabel string `json:"humanLabel,omitempty"`
}

AgentRuntimeProviderCapabilities mirrors the TypeScript AgentProviderCapabilities type (packages/core/src/providers/types.ts) and is used for typed capability advertisement during worker registration (REN-1282).

The orchestrator prefers CapabilitiesTyped over the legacy Capabilities []string field when both are present; the string slice is retained for backward compatibility with coordinators that have not yet been updated.

Field names use JSON camelCase to match the TypeScript wire format.
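As a quick illustration of the wire format, the sketch below marshals a local copy of a few of the fields listed above. The capabilities type and encode helper are stand-ins defined only for this example; the struct tags are copied from the documented type. Fields tagged omitempty disappear when zero, while the untagged booleans are always emitted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// capabilities copies a subset of AgentRuntimeProviderCapabilities, with the
// same JSON tags, so the sketch is self-contained.
type capabilities struct {
	SupportsMessageInjection bool   `json:"supportsMessageInjection"`
	SupportsSessionResume    bool   `json:"supportsSessionResume"`
	SupportsToolPlugins      bool   `json:"supportsToolPlugins,omitempty"`
	EmitsSubagentEvents      bool   `json:"emitsSubagentEvents"`
	HumanLabel               string `json:"humanLabel,omitempty"`
}

// encode returns the JSON encoding of c as a string.
func encode(c capabilities) string {
	b, err := json.Marshal(c)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(encode(capabilities{SupportsMessageInjection: true, HumanLabel: "Claude"}))
	// Prints:
	// {"supportsMessageInjection":true,"supportsSessionResume":false,"emitsSubagentEvents":false,"humanLabel":"Claude"}
}
```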

type Client

type Client struct {
	// BaseURL is the coordinator base URL without a trailing slash.
	BaseURL string
	// ProvisioningToken is the long-lived rsp_live_ token used only for
	// the initial Register call.
	ProvisioningToken string
	// RuntimeJWT is the short-lived bearer token returned by Register,
	// used for Poll and Heartbeat.
	RuntimeJWT string
	// WorkerID is the coordinator-assigned worker identifier populated
	// after a successful Register.
	WorkerID string
	// HTTPClient is the underlying HTTP client. NewClient installs one
	// with a 30s timeout; callers may replace it.
	HTTPClient *http.Client
}

Client is the HTTP client for the AgentFactory worker protocol. A fresh Client is created with the long-lived provisioning token; after a successful Register call the coordinator-assigned WorkerID and runtime JWT are stored on the Client and used for subsequent Poll/Heartbeat calls.

Client is not safe for concurrent mutation: callers that share a Client across goroutines should not mutate WorkerID or RuntimeJWT after the initial Register. Concurrent reads of the HTTPClient are safe.

func NewClient

func NewClient(baseURL, provisioningToken string) *Client

NewClient creates a Client pointing at the given coordinator base URL and carrying the given provisioning token. The returned Client has an HTTPClient with a 30-second timeout.

func (*Client) Heartbeat

func (c *Client) Heartbeat(ctx context.Context, req HeartbeatRequest) (*HeartbeatResponse, error)

Heartbeat reports the worker's liveness and active agent count to the coordinator. It issues POST /api/workers/{WorkerID}/heartbeat with the runtime JWT in the Authorization header.

A 401 response maps to ErrRuntimeJWTExpired — callers should re-register to obtain a fresh token. 429 maps to ErrRateLimited, and any 5xx to ErrHeartbeatFailed.

func (*Client) HeartbeatLoop

func (c *Client) HeartbeatLoop(ctx context.Context, interval time.Duration, counter func() int) error

HeartbeatLoop sends a heartbeat on the given interval using counter() to determine ActiveAgentCount for each tick. It blocks until ctx is cancelled or the runtime JWT expires.

An ErrRuntimeJWTExpired response is returned to the caller so a fleet manager can re-register. Other heartbeat errors are logged at warn and the loop continues.

func (*Client) Poll

func (c *Client) Poll(ctx context.Context) (*PollResponse, error)

Poll fetches the next batch of work items assigned to this worker.

It issues GET /api/workers/{WorkerID}/poll with the runtime JWT in the Authorization header. An empty WorkItems slice is NOT an error; it simply means the coordinator has no pending work.

A 401 response maps to ErrRuntimeJWTExpired — callers should re-register to obtain a fresh token. 404 maps to ErrNotFound (unknown worker), 429 to ErrRateLimited, and any 5xx to ErrPollFailed.

func (*Client) PollLoop

func (c *Client) PollLoop(ctx context.Context, interval time.Duration, handler func(WorkItem) error) error

PollLoop drives Poll on the given interval, invoking handler for each WorkItem returned. It blocks until ctx is cancelled or an unrecoverable error occurs.

A handler error is logged at warn and does not stop the loop. An ErrRuntimeJWTExpired from Poll is returned to the caller (so a fleet manager can re-register). Other Poll errors are logged at warn and the loop continues.
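The control flow described for PollLoop might look roughly like the sketch below. It is not the package's code: pollLoop takes the poll function as a parameter so it can run without a coordinator, workItem is a trimmed stand-in, and returning ctx.Err() on cancellation is an assumption (the documentation only says the loop blocks until ctx is cancelled).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"time"
)

// Local stand-in for the package's sentinel.
var errRuntimeJWTExpired = errors.New("runtime jwt expired")

type workItem struct{ ID, Type string }

// pollLoop polls on every tick. Handler errors and ordinary poll errors are
// logged and skipped; an expired JWT ends the loop so the caller can re-register.
func pollLoop(ctx context.Context, interval time.Duration,
	poll func(context.Context) ([]workItem, error), handler func(workItem) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // assumption: cancellation surfaces as ctx.Err()
		case <-ticker.C:
		}
		items, err := poll(ctx)
		if errors.Is(err, errRuntimeJWTExpired) {
			return err
		}
		if err != nil {
			slog.Warn("poll failed", "error", err)
			continue
		}
		for _, it := range items {
			if err := handler(it); err != nil {
				slog.Warn("handler failed", "id", it.ID, "error", err)
			}
		}
	}
}

// runDemo scripts three polls: a transient error, two items, then JWT expiry.
func runDemo() (int, error) {
	calls, handled := 0, 0
	poll := func(context.Context) ([]workItem, error) {
		calls++
		switch calls {
		case 1:
			return nil, errors.New("transient network error")
		case 2:
			return []workItem{{ID: "a", Type: "session.start"}, {ID: "b", Type: "session.stop"}}, nil
		default:
			return nil, errRuntimeJWTExpired
		}
	}
	handler := func(it workItem) error {
		handled++
		if it.ID == "a" {
			return errors.New("handler rejected item")
		}
		return nil
	}
	err := pollLoop(context.Background(), time.Millisecond, poll, handler)
	return handled, err
}

func main() {
	handled, err := runDemo()
	fmt.Println(handled, err)
}
```

In the demo, the first poll error and the failing handler are both survived, and only the expired JWT is returned to the caller.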

func (*Client) Register

func (c *Client) Register(ctx context.Context, req RegisterRequest) (*RegisterResponse, error)

Register exchanges the Client's provisioning token for a worker identity and runtime JWT.

Register authenticates with the long-lived provisioning token carried on the Client (c.ProvisioningToken) and must be called before any Poll or Heartbeat calls. The request body is encoded as JSON from req.

On success, Register mutates the Client: c.WorkerID and c.RuntimeJWT are populated from the response so subsequent Poll/Heartbeat calls can use the runtime JWT. The returned *RegisterResponse carries the same values together with the coordinator's heartbeat cadence.

On failure, the Client's WorkerID and RuntimeJWT fields are left unchanged. A 401 response is mapped to ErrInvalidProvisioningToken, 429 to ErrRateLimited, and any 5xx to ErrRegistrationFailed. All other errors (transport, decode, context cancellation) are wrapped with a "register: " prefix and are otherwise returned unchanged.
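The register exchange and its status mapping can be sketched against a fake coordinator built with net/http/httptest. Everything below is a stand-in: client, register and demo are hypothetical names, the Bearer scheme for the provisioning token is an assumption, and the handling of statuses other than 200, 401, 429 and 5xx is not documented.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Local stand-ins for the package's sentinels.
var (
	errInvalidProvisioningToken = errors.New("invalid provisioning token")
	errRateLimited              = errors.New("rate limited")
	errRegistrationFailed       = errors.New("worker registration failed")
)

// registerResponse mirrors the documented wire fields.
type registerResponse struct {
	WorkerID                 string `json:"worker_id"`
	RuntimeJWT               string `json:"runtime_jwt"`
	HeartbeatIntervalSeconds int    `json:"heartbeat_interval_seconds"`
}

// client is a hypothetical, trimmed-down counterpart of Client.
type client struct {
	baseURL, provisioningToken string
	workerID, runtimeJWT       string
}

func (c *client) register(ctx context.Context, body any) error {
	buf, err := json.Marshal(body)
	if err != nil {
		return fmt.Errorf("register: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		c.baseURL+"/api/workers/register", bytes.NewReader(buf))
	if err != nil {
		return fmt.Errorf("register: %w", err)
	}
	// Assumption: the provisioning token is sent with the Bearer scheme.
	req.Header.Set("Authorization", "Bearer "+c.provisioningToken)
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("register: %w", err)
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusUnauthorized:
		return errInvalidProvisioningToken
	case resp.StatusCode == http.StatusTooManyRequests:
		return errRateLimited
	case resp.StatusCode >= 500:
		return errRegistrationFailed
	case resp.StatusCode != http.StatusOK:
		// Assumption: other statuses are not covered by the documentation.
		return fmt.Errorf("register: unexpected status %d", resp.StatusCode)
	}

	var out registerResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return fmt.Errorf("register: %w", err)
	}
	// Mutate the client only after the whole exchange has succeeded.
	c.workerID, c.runtimeJWT = out.WorkerID, out.RuntimeJWT
	return nil
}

// demo registers against a fake coordinator that always answers with status.
func demo(status int) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if status != http.StatusOK {
			w.WriteHeader(status)
			return
		}
		json.NewEncoder(w).Encode(registerResponse{WorkerID: "w-1", RuntimeJWT: "jwt", HeartbeatIntervalSeconds: 30})
	}))
	defer srv.Close()
	c := &client{baseURL: srv.URL, provisioningToken: "rsp_live_example"}
	err := c.register(context.Background(), map[string]any{"hostname": "demo", "pid": 1, "version": "dev"})
	return c.workerID, err
}

func main() {
	for _, status := range []int{200, 401, 429, 503} {
		id, err := demo(status)
		fmt.Printf("%d: workerID=%q err=%v\n", status, id, err)
	}
}
```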

type Fleet

type Fleet struct {

	// Env is the environment passed to every spawned child. When nil the
	// child inherits the parent's environment (exec.Cmd default). Tests
	// set this to inject helper-mode flags.
	Env []string
	// contains filtered or unexported fields
}

Fleet supervises a set of worker child processes. It is safe for concurrent use by a single owner (one goroutine calling Start/Scale/Stop plus optional readers calling Status).

func NewFleet

func NewFleet(binaryPath string, baseArgs []string) *Fleet

NewFleet constructs a Fleet that spawns binaryPath with baseArgs for each child. The caller supplies binaryPath (typically from os.Executable or exec.LookPath).

func (*Fleet) Scale

func (f *Fleet) Scale(ctx context.Context, target int) error

Scale grows or shrinks the fleet to target processes. When shrinking, it stops surplus children gracefully (SIGTERM, then SIGKILL after scaleGrace). After a successful scale the PID file is rewritten.

func (*Fleet) SetLogger

func (f *Fleet) SetLogger(l *slog.Logger)

SetLogger overrides the default slog logger. Primarily for tests.

func (*Fleet) Start

func (f *Fleet) Start(ctx context.Context, count int) error

Start spawns exactly count worker child processes. Returns an error if any spawn fails; partial state is left in place (callers should Stop if they want a clean rollback). After a successful Start the fleet PID file is written with the active PIDs.

func (*Fleet) Status

func (f *Fleet) Status() []WorkerProcess

Status returns a point-in-time snapshot of the supervised processes.

func (*Fleet) Stop

func (f *Fleet) Stop(_ context.Context, graceful time.Duration) error

Stop signals all supervised children with SIGTERM, waits up to graceful for them to exit, then SIGKILLs survivors. Clears the process table and removes the PID file.
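The SIGTERM-then-SIGKILL sequence can be sketched for a single child as follows. This is a Unix-only illustration, not the Fleet implementation: stopGracefully and stopDemo are hypothetical names, and the demo assumes a sleep binary is available on PATH.

```go
package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

// stopGracefully sends SIGTERM, waits up to grace for the child to exit, and
// falls back to SIGKILL. It reports whether the kill was needed.
func stopGracefully(cmd *exec.Cmd, grace time.Duration) (bool, error) {
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }() // reap the child in the background

	if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
		return false, err
	}
	select {
	case <-done:
		return false, nil // exited within the grace period
	case <-time.After(grace):
		if err := cmd.Process.Kill(); err != nil {
			return false, err
		}
		<-done // wait for the kernel to reap the killed child
		return true, nil
	}
}

// stopDemo starts a long-running child and stops it with a 2s grace period.
func stopDemo() (bool, error) {
	cmd := exec.Command("sleep", "30") // assumption: sleep exists on PATH
	if err := cmd.Start(); err != nil {
		return false, err
	}
	return stopGracefully(cmd, 2*time.Second)
}

func main() {
	killed, err := stopDemo()
	fmt.Println("killed:", killed, "err:", err)
}
```

Because sleep does not ignore SIGTERM, the demo child should exit within the grace period and the SIGKILL branch is not taken.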

type HeartbeatRequest

type HeartbeatRequest struct {
	// ActiveAgentCount is the number of agent sessions this worker is
	// currently running.
	ActiveAgentCount int `json:"active_agent_count"`
	// Status is an optional free-form status label (e.g. "idle", "busy",
	// "draining"). May be empty.
	Status string `json:"status,omitempty"`
}

HeartbeatRequest is the body of POST /api/workers/{id}/heartbeat. It reports the worker's current liveness signal back to the coordinator.

type HeartbeatResponse

type HeartbeatResponse struct {
	// Ack is true when the coordinator accepted the heartbeat. The field
	// is optional in the wire format and defaults to the zero value when
	// the coordinator returns an empty body.
	Ack bool `json:"ack,omitempty"`
}

HeartbeatResponse is the response body from POST /api/workers/{id}/heartbeat. It is intentionally minimal today; a non-error status code is the acknowledgement.

type PollRequest

type PollRequest struct {
	// MaxItems caps the number of work items the coordinator may return
	// in a single poll. Zero lets the server pick a default.
	MaxItems int `json:"max_items,omitempty"`
}

PollRequest describes optional query parameters for GET /api/workers/{id}/poll. The endpoint uses GET with no body today; this struct is declared for forward compatibility and is unused by the current client helpers.

type PollResponse

type PollResponse struct {
	// WorkItems is the batch of work items assigned to the worker. May be
	// empty when the coordinator has no pending work.
	WorkItems []WorkItem `json:"work_items"`
}

PollResponse is the response body from GET /api/workers/{id}/poll. It carries the batch of work items the coordinator has assigned to this worker since the last poll (possibly empty).

type RegisterRequest

type RegisterRequest struct {
	// Hostname is the machine hostname reported by the worker process.
	Hostname string `json:"hostname"`
	// PID is the operating system process id of the worker.
	PID int `json:"pid"`
	// Version is the worker binary version string (semver or git sha).
	Version string `json:"version"`
	// Capabilities is the list of capability tags this worker advertises
	// (e.g. "claude", "codex"). Empty when the worker has no special tags.
	//
	// Deprecated: prefer CapabilitiesTyped when available; this field is
	// retained for backward compatibility with older coordinators.
	Capabilities []string `json:"capabilities,omitempty"`
	// CapabilitiesTyped carries a fully-typed capability struct for this
	// worker's agent runtime provider. When present the orchestrator uses this
	// in preference to the untyped Capabilities slice. Nil when the worker
	// has not been updated to advertise typed capabilities.
	CapabilitiesTyped *AgentRuntimeProviderCapabilities `json:"capabilities_typed,omitempty"`
	// MaxAgents is the maximum number of concurrent agent sessions this
	// worker will run. Zero means unspecified/default.
	MaxAgents int `json:"max_agents,omitempty"`
}

RegisterRequest is the body of POST /api/workers/register. It is sent with the provisioning token (rsp_live_...) in the Authorization header and describes the worker that is coming online.

func (*RegisterRequest) ResolveCapabilities

func (r *RegisterRequest) ResolveCapabilities() (*AgentRuntimeProviderCapabilities, []string)

ResolveCapabilities returns a summary of the effective capability tags for this registration request, implementing the "prefer typed when present" migration path (REN-1282 / ADR-002).

If CapabilitiesTyped is non-nil the orchestrator should use it as the authoritative source. The legacy Capabilities slice is still returned for logging and backward-compatible coordinator behaviour.

Returns the typed struct (or nil) and the effective string tags to use. When CapabilitiesTyped is present the string tags are derived from the typed struct; otherwise the original Capabilities slice is returned unchanged.

type RegisterResponse

type RegisterResponse struct {
	// WorkerID is the coordinator-assigned identifier for this worker.
	WorkerID string `json:"worker_id"`
	// RuntimeJWT is the short-lived bearer token used for all subsequent
	// Poll and Heartbeat calls.
	RuntimeJWT string `json:"runtime_jwt"`
	// HeartbeatIntervalSeconds is the cadence at which the coordinator
	// expects heartbeats, expressed in seconds. Use HeartbeatInterval to
	// obtain a time.Duration.
	HeartbeatIntervalSeconds int `json:"heartbeat_interval_seconds"`
}

RegisterResponse is the response body from POST /api/workers/register. The coordinator returns a short-lived runtime JWT that must be used for subsequent Poll/Heartbeat calls, together with the assigned worker id and the cadence the worker should heartbeat at.

func (RegisterResponse) HeartbeatInterval

func (r RegisterResponse) HeartbeatInterval() time.Duration

HeartbeatInterval returns the heartbeat cadence as a time.Duration.
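A plausible shape for this conversion is shown below, using a local stand-in type. tickerInterval is a hypothetical guard added for illustration: time.NewTicker panics on a non-positive duration, so a caller feeding the result into a ticker may want a fallback when the coordinator omits the field.

```go
package main

import (
	"fmt"
	"time"
)

// registerResponse is a local stand-in carrying only the cadence field.
type registerResponse struct {
	HeartbeatIntervalSeconds int `json:"heartbeat_interval_seconds"`
}

// heartbeatInterval converts whole seconds into a time.Duration.
func (r registerResponse) heartbeatInterval() time.Duration {
	return time.Duration(r.HeartbeatIntervalSeconds) * time.Second
}

// tickerInterval is a hypothetical guard: it returns fallback when the
// coordinator sent zero or a negative cadence.
func tickerInterval(r registerResponse, fallback time.Duration) time.Duration {
	if d := r.heartbeatInterval(); d > 0 {
		return d
	}
	return fallback
}

func main() {
	resp := registerResponse{HeartbeatIntervalSeconds: 30}
	fmt.Println(resp.heartbeatInterval())                            // 30s
	fmt.Println(tickerInterval(registerResponse{}, 15*time.Second)) // 15s
}
```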

type WorkItem

type WorkItem struct {
	// ID is the coordinator-assigned identifier for the work item.
	ID string `json:"id"`
	// Type is the work item kind (e.g. "session.start", "session.stop").
	Type string `json:"type"`
	// Payload is the opaque, type-specific payload. Kept as
	// json.RawMessage so the worker package does not need to know the
	// shape of every work item kind.
	Payload json.RawMessage `json:"payload"`
	// CreatedAt is the server-side timestamp at which the work item was
	// created. Encoded as RFC3339/ISO8601 by the default time.Time JSON
	// marshaler.
	CreatedAt time.Time `json:"created_at"`
}

WorkItem is a single unit of work handed to a worker by the coordinator. The Payload is opaque to this package and is forwarded verbatim to the agent runner that knows how to interpret the given Type.
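Because Payload stays a json.RawMessage, a consumer can defer decoding until it has inspected Type. The sketch below shows that two-step decode with a local copy of the struct. The sessionStartPayload shape and its session_id field are invented for illustration; the real payload schemas are not described in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// workItem copies the documented WorkItem fields and tags.
type workItem struct {
	ID        string          `json:"id"`
	Type      string          `json:"type"`
	Payload   json.RawMessage `json:"payload"`
	CreatedAt time.Time       `json:"created_at"`
}

// sessionStartPayload is a hypothetical payload shape for "session.start".
type sessionStartPayload struct {
	SessionID string `json:"session_id"`
}

// describe decodes the envelope first, then the payload based on Type.
func describe(raw []byte) (string, error) {
	var it workItem
	if err := json.Unmarshal(raw, &it); err != nil {
		return "", fmt.Errorf("decode work item: %w", err)
	}
	switch it.Type {
	case "session.start":
		var p sessionStartPayload
		if err := json.Unmarshal(it.Payload, &p); err != nil {
			return "", fmt.Errorf("decode %s payload: %w", it.Type, err)
		}
		return "start " + p.SessionID, nil
	default:
		// Unknown kinds keep their payload untouched for another consumer.
		return "unhandled " + it.Type, nil
	}
}

func main() {
	raw := []byte(`{"id":"wi-1","type":"session.start","payload":{"session_id":"s-42"},"created_at":"2026-05-27T12:00:00Z"}`)
	fmt.Println(describe(raw))
}
```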

type WorkerProcess

type WorkerProcess struct {
	PID       int
	StartedAt time.Time
	State     string // "running" | "stopping" | "exited"
}

WorkerProcess is a snapshot of a supervised worker's state returned by Fleet.Status. It is a plain value — mutating it does not affect the Fleet. The name intentionally includes the "Worker" prefix so callers outside the worker package read it as a worker-owned process type.
