Documentation
Index ¶
- Variables
- func FuelUsedFromEnvelope(raw string) int64
- func StripBudgetFromOutbound(raw string) string
- func TenantFromEnvelope(raw string) string
- func TenantScope(ctx context.Context) string
- func WithTenant(ctx context.Context, slug string) context.Context
- func WithTenantObserver(ctx context.Context, o *TenantObserver) context.Context
- type AsyncEnvelope
- type FuelExhaustedError
- type TTLExhaustedError
- type TenantObserver
- type Unit
- func (pu *Unit) DecorateInput(input string, overrides []resonator.BranchValue) (string, error)
- func (pu *Unit) DriveDeferredResume(runID, stage string)
- func (pu *Unit) EnableMCPSessionCache(ttl time.Duration)
- func (pu *Unit) Exec(ctx context.Context, op operation.Operation) (event.Payload, bool, error)
- func (pu *Unit) ExecAI(ctx context.Context, op operation.Operation) (event.Payload, error)
- func (pu *Unit) ExecCompute(ctx context.Context, op operation.Operation) (event.Payload, error)
- func (pu *Unit) ExecCore(ctx context.Context, op operation.Operation) (event.Payload, error)
- func (pu *Unit) ExecHTTP(ctx context.Context, op operation.Operation) (event.Payload, error)
- func (pu *Unit) ExecHTTPAsync(ctx context.Context, op operation.Operation, env AsyncEnvelope, token string) (jobID string, err error)
- func (pu *Unit) ExecMCPHTTP(ctx context.Context, op operation.Operation) (payload event.Payload, err error)
- func (pu *Unit) Handle(prefix []byte, handler event.OpsHandler)
- func (pu *Unit) MakeMockResponse(op operation.Operation, errMsg string) event.Payload
- func (pu *Unit) MergeJSON(src string, dst string) (string, error)
- func (pu *Unit) NodeForOp(ctx context.Context, op operation.Operation) (*registry.Node, error)
- func (pu *Unit) OpsForStage(ctx context.Context, stage string) ([]operation.Operation, error)
- func (pu *Unit) OverlayResponse(env, output string, overrides []resonator.BranchValue) (string, error)
- func (pu *Unit) ResonatingOps(input string, ops []operation.Operation, hashSeed string) ([]operation.Operation, error)
- func (pu *Unit) Resume(ctx context.Context, runID, stage string) error
- func (pu *Unit) Run(ctx context.Context, raw string, stage string, resCh chan event.Payload) error
- func (pu *Unit) StageParse(stage string) (stack string, scope int, err error)
Constants ¶
This section is empty.
Variables ¶
var StagePartsRE = regexp.MustCompile(`(.*)\/+(\d+)$`)
StagePartsRE splits a stage string into its stack prefix and its trailing numeric scope, for example `website/canary/100`.
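As an illustration of what the pattern captures, the sketch below applies the same regular expression to a stage string. The helper name `splitStage` and its error message are hypothetical; this is not the package's StageParse, only a plausible reading of how the two capture groups map to a stack and a scope.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// stagePartsRE copies the pattern documented for StagePartsRE.
var stagePartsRE = regexp.MustCompile(`(.*)\/+(\d+)$`)

// splitStage is a hypothetical helper: group 1 is treated as the stack,
// group 2 as the numeric scope.
func splitStage(stage string) (stack string, scope int, err error) {
	m := stagePartsRE.FindStringSubmatch(stage)
	if m == nil {
		return "", 0, fmt.Errorf("stage %q has no trailing /<scope>", stage)
	}
	scope, err = strconv.Atoi(m[2])
	if err != nil {
		return "", 0, err
	}
	return m[1], scope, nil
}

func main() {
	stack, scope, err := splitStage("website/canary/100")
	fmt.Println(stack, scope, err) // website/canary 100 <nil>
}
```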
Functions ¶
func FuelUsedFromEnvelope ¶ added in v0.2.4
FuelUsedFromEnvelope reads the live fuel value from an envelope. Used at the UsageEvent construction site (server.go:1010) where the merged envelope is available but the per-request context is not.
func StripBudgetFromOutbound ¶ added in v0.2.4
StripBudgetFromOutbound deletes the chassis-internal budget fields from the response delivered to the inlet. Called by server.go just before forwarding the payload to the inlet client. Mirrors the existing `_txc.halt` / `_txc.goto` strip; exported so the server package (which owns the convergence between chassis emit and inlet write) can apply it after capturing the fuel value for UsageEvent.
func TenantFromEnvelope ¶ added in v0.2.5
TenantFromEnvelope reads the resolved tenant slug from an envelope's `_txc.tenant`. Mirrors FuelUsedFromEnvelope; used at the trace-usage emit sites (main + resume paths) to attribute a trace to its tenant — the slug admin tenant-scoping filters on.
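A minimal sketch of reading `_txc.tenant` from a raw envelope with the standard library. The function name `tenantFromEnvelope` is a stand-in, and returning an empty string for a missing field or malformed JSON is an assumption; the package's own implementation may parse the envelope differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tenantFromEnvelope is a hypothetical stand-in for the documented function.
// Assumption: an unreadable envelope or a missing field yields "".
func tenantFromEnvelope(raw string) string {
	var env struct {
		Txc struct {
			Tenant string `json:"tenant"`
		} `json:"_txc"`
	}
	if err := json.Unmarshal([]byte(raw), &env); err != nil {
		return ""
	}
	return env.Txc.Tenant
}

func main() {
	fmt.Println(tenantFromEnvelope(`{"_txc":{"tenant":"acme"},"body":{}}`)) // acme
}
```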
func TenantScope ¶ added in v0.2.6
TenantScope exposes the pinned request tenant slug to bundled ops outside this package (e.g. txco://sendmail's From-domain anti-spoof check). It is the trusted, immutable tenant pinned at first Run — NOT the mutable `_txc.tenant` envelope field, which a mid-pipeline op could overwrite.
func WithTenant ¶
WithTenant pins a tenant slug onto ctx for op resolution. Exposed for callers that drive Run/OpsForStage outside the normal ingress path (and for tests). Within the normal data plane, Run pins this itself from the envelope's `_txc.tenant`.
func WithTenantObserver ¶ added in v0.2.4
func WithTenantObserver(ctx context.Context, o *TenantObserver) context.Context
WithTenantObserver attaches a tenant observer to ctx for the processor's pin sites to record into.
Types ¶
type AsyncEnvelope ¶
type AsyncEnvelope struct {
OpContinuationID string `json:"op_continuation_id"`
CallbackURL string `json:"callback_url"`
RunID string `json:"run_id"`
RunContinuationID string `json:"run_continuation_id"`
Stack string `json:"stack"`
Stage string `json:"stage"`
Op string `json:"op"`
ExpiresAt string `json:"expires_at,omitempty"`
}
AsyncEnvelope is the `_txc` block handed to an async worker so it can post one op result back. The single-use bearer token is delivered out-of-band of the body (request header), not in the envelope.
type FuelExhaustedError ¶ added in v0.2.4
type FuelExhaustedError struct {
MaxFuel int64 `json:"max_fuel"`
FuelUsed int64 `json:"fuel_used"`
LastStage string `json:"last_stage"`
LastTransitions []string `json:"last_transitions"`
}
FuelExhaustedError is the structured terminal error for fuel exhaustion. Serialized through the same final-response path as a wall-clock cancel.
func (*FuelExhaustedError) AsJSON ¶ added in v0.2.4
func (e *FuelExhaustedError) AsJSON() string
func (*FuelExhaustedError) Error ¶ added in v0.2.4
func (e *FuelExhaustedError) Error() string
type TTLExhaustedError ¶ added in v0.2.4
type TTLExhaustedError struct {
MaxTTL int64 `json:"max_ttl"`
Consumed int64 `json:"consumed"`
LastStage string `json:"last_stage"`
LastTransitions []string `json:"last_transitions"`
}
TTLExhaustedError is the structured terminal error for hop-budget exhaustion. Distinct code from fuel exhaustion so operators can see "this is a loop" vs "this is expensive work" at a glance.
func (*TTLExhaustedError) AsJSON ¶ added in v0.2.4
func (e *TTLExhaustedError) AsJSON() string
func (*TTLExhaustedError) Error ¶ added in v0.2.4
func (e *TTLExhaustedError) Error() string
type TenantObserver ¶ added in v0.2.4
type TenantObserver struct {
// contains filtered or unexported fields
}
TenantObserver records the tenant a request is pinned to, so the server can attribute usage/billing from immutable pipeline state rather than the mutable response envelope. The envelope's `_txc.tenant` is display/debug/routing metadata an author-controlled stack can rewrite; billing must not trust it.
It mirrors the admission.Lease holder pattern: created in the server bus-loop goroutine that owns a request's lifetime, attached to the request context, and read back after the pipeline returns. The processor's tenant-pin sites (first pin in Run; the one-way `_sys`->concrete retenant) record the resolved slug via WithTenant. Last write wins — for a routed request that is the concrete tenant; for an unrouted one it stays `_sys`.
Nil-safe so non-server callers (tests, CLI) can ignore it.
func NewTenantObserver ¶ added in v0.2.4
func NewTenantObserver() *TenantObserver
NewTenantObserver returns a fresh observer with no tenant recorded yet.
func (*TenantObserver) Tenant ¶ added in v0.2.4
func (o *TenantObserver) Tenant() (slug string, ok bool)
Tenant returns the last-recorded tenant slug and whether any pin was observed. ok is false if the pipeline never pinned a tenant (e.g. a request that errored before Run pinned), letting the caller fall back.
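The nil-safe, last-write-wins behavior described for TenantObserver might look roughly like the following. The type `tenantObserver`, its `record` method, and the mutex are assumptions made for the sketch; the real type has unexported fields that are not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// tenantObserver is a hypothetical stand-in for TenantObserver.
type tenantObserver struct {
	mu   sync.Mutex
	slug string
	seen bool
}

// record stores the latest pinned slug; a nil receiver is a no-op so
// callers without an observer (tests, CLI) need no special casing.
func (o *tenantObserver) record(slug string) {
	if o == nil {
		return
	}
	o.mu.Lock()
	defer o.mu.Unlock()
	o.slug, o.seen = slug, true
}

// Tenant returns the last recorded slug and whether any pin was observed.
func (o *tenantObserver) Tenant() (string, bool) {
	if o == nil {
		return "", false
	}
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.slug, o.seen
}

func main() {
	obs := &tenantObserver{}
	obs.record("_sys") // first pin
	obs.record("acme") // one-way retenant; last write wins
	fmt.Println(obs.Tenant()) // acme true
}
```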
type Unit ¶
type Unit struct {
Conf config.Config
Logger *zap.Logger
Kv store.Store
RuntimeDB *sql.DB
AuthDB *sql.DB
Dbc *dbcache.DbCache
Mc *metrics.Metrics
Bus chan<- *event.Envelope
Reg registry.Registry
Mux *radix.Tree
HTTPClient *http.Client
// Runs is the continuation store wrapper. Used only on the barrier
// (async) path; nil-safe — the sync fast path never touches it.
Runs *continuation.Runs
CallbackBaseURL string
// Sink spawns resume traces from background goroutines (local
// async; future similar work). The same sink the web personality
// uses, exposed here so chassis-internal goroutines that outlive
// their originating request can write traces too. Nil-safe — when
// unset, local async still functions but the resume work is
// untraced (used in unit tests + non-server contexts).
Sink trace.Sink
// MCPSessions caches server-minted `Mcp-Session-Id` values per
// (tenant, endpoint) so hot MCP paths skip the init lifecycle
// (3 HTTPS round-trips → 1). Nil-safe — when unset, ExecMCPHTTP
// runs the full lifecycle every call (test default; backwards-
// compatible).
MCPSessions *mcpSessionCache
// Computes runs op:// computes resolved to "compute://<alg>/<digest>".
// nil-safe: ExecCompute fails loudly if a compute:// op fires while
// this is unset (no engine wired). Set when the chassis is built with
// a compute runtime.
Computes compute.Runner
// Usage is the usage sink. nil-safe. When set, each compute invocation
// emits a usage event (src="compute") alongside the per-request one.
Usage usage.Sink
// Secrets is the per-tenant secret-store Resolver. Non-nil when
// --secret-master-key is configured AND its file loads cleanly at
// boot; nil otherwise. PR 3 will consult this from the processor
// splice that populates op.Secrets between WHEN/SET/SELECT/WITH
// decoration and Exec. While Secrets is nil, any op with
// `secrets` in its WITH clause must fail loud with
// `secret_store_unavailable` rather than silently skip.
Secrets *secrets.Resolver
// Admission gates the one-time _sys -> concrete-tenant handoff: a
// suspended/disabled tenant is denied (402/403) before its stack
// runs. nil-safe — when unset every tenant is admitted (the test and
// non-server default). Set by the server from a dbcache-backed
// provider rebuilt on every reload.
Admission admission.Provider
// contains filtered or unexported fields
}
Unit is the chassis's per-process processor handle. RuntimeDB holds content (ops, stacks, versions, files, tenants) and is always present. AuthDB holds identity (actors, keys, memberships, invitations, browser sessions) and is nil on data-plane-only chassis where the admin personality isn't active.
func New ¶
func New(conf config.Config, logger *zap.Logger, reg registry.Registry, mc *metrics.Metrics, bus chan<- *event.Envelope, kv store.Store, runtimeDB, authDB *sql.DB, dbc *dbcache.DbCache, runs *continuation.Runs, guard egress.Guard, secretsResolver *secrets.Resolver) *Unit
New constructs a processor Unit. authDB may be nil on data-plane-only chassis. guard is the outbound op-dial policy consulted at the dial step (see egress).
func (*Unit) DecorateInput ¶
DecorateInput overrides input with preset values, but only if the target path doesn't already exist (set-if-absent semantics — used by SET PRE / SET POST).
Each override.Value flows through runtime.Resolve so future Value shapes (PathRef, FunctionCall) plumb through without touching this function. On a resolution error the call returns the partially-updated input plus the error; per the strict-by-default semantics in internal docs/todo-txcl-expressions.md §5, callers MUST treat the error as a rule-halt signal (silently dropping the SET would hide protocol errors).
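Set-if-absent semantics can be illustrated on top-level JSON keys. This is a simplified sketch: `setIfAbsent` is a hypothetical helper, it handles only top-level keys rather than nested paths, and it takes already-resolved values instead of going through runtime.Resolve.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// setIfAbsent writes each preset only when the key is not already present.
// On error it returns the input unchanged together with the error.
func setIfAbsent(input string, presets map[string]any) (string, error) {
	doc := map[string]any{}
	if err := json.Unmarshal([]byte(input), &doc); err != nil {
		return input, err
	}
	for k, v := range presets {
		if _, exists := doc[k]; !exists {
			doc[k] = v
		}
	}
	out, err := json.Marshal(doc) // map keys are emitted in sorted order
	if err != nil {
		return input, err
	}
	return string(out), nil
}

func main() {
	out, err := setIfAbsent(`{"region":"eu"}`, map[string]any{"region": "us", "tier": "free"})
	fmt.Println(out, err) // {"region":"eu","tier":"free"} <nil>
}
```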
func (*Unit) DriveDeferredResume ¶
DriveDeferredResume claims and resumes a deferred-join suspend on a fresh background context (the request ctx is gone). Self-contained and safe for both the suspend-time race guard and the worker-completion path: it re-checks resumability and lets ClaimResume pick the single winner.
func (*Unit) EnableMCPSessionCache ¶
EnableMCPSessionCache turns on session caching for hot MCP paths. The cache type is unexported (it's an implementation detail of the processor); this method is the construction surface that server.go uses at boot. Idempotent — calling twice replaces the underlying cache (dropping any in-flight entries; safe at boot time before any traffic flows).
func (*Unit) Exec ¶
Exec dispatches an operation to its transport and returns the produced payload. The bool return, authorControlled, reports whether the output came from an author-controlled producer (remote HTTP, sandboxed compute, an MCP tool, or a rule-author mock) as opposed to a trusted built-in core handler, the chassis-owned ai:// namespace, or a synthesized control output. Callers that merge this output into the envelope MUST pass it through sanitizeAuthorOutput when authorControlled is true, so an untrusted producer cannot forge reserved `_txc.*` control fields (tenant, computed auth, budget, …).
func (*Unit) ExecAI ¶ added in v0.2.4
ExecAI dispatches ai://<sub_op>. v1 supports only ai://chat.
The handler:
- Parses the sub-op (chat is the only one wired today).
- Decodes op.Meta (WITH-clause materialization) into a chat.Request.
- Resolves a backend via chat.Resolve (provider override or first-registered default; capability matching is its own follow-up).
- Materializes the backend's RequiredSecrets() through the per-tenant store with optional env-var fallback (AIChatEnvFallback config), reusing the existing chassis pattern: SecretBag, RecordSecretMaterialize, fuelCostSecretMaterialize fuel charge.
- Renders {{@path}} prompt templates over op.Input.
- Calls backend.Run exactly once (no tool loop in v1).
- Validates the response against WITH schema if present (binary ok / failed; no repair semantics).
- Emits a chat.completion trace event with provider, model, token counts, routing decision, retries, latency, and schema validation outcome.
- Builds the response envelope: top-level text + schema_validated_payload + chat.error (when set) + _txc.chat.* metadata. Token counts are recorded HERE for downstream observability/billing; they are NOT charged to the chassis fuel meter — provider compute is a separate dimension.
Error policy: chat-level failures (auth, provider HTTP/net, schema validation, missing secret, no backend) surface as a structured `chat.error` field on the response envelope so rule authors can handle uniformly via `WHEN @chat.error EXEC ...`. ExecAI returns a non-nil error only for malformed-shape conditions a rule author cannot work around (unrecognized sub-op, op.Meta failing JSON parse, an unset runtime dependency). This gives rule authors one error-handling pattern; structured codes (txco_chat_*) live on the envelope.
func (*Unit) ExecCompute ¶
ExecCompute runs a `compute://<alg>/<digest>` op: it resolves the content-addressed artifact and invokes it on its engine, returning the output as a JSON payload. The output is handed back to Exec's shared tail, so it flows the identical post-EXEC processing (EMIT overlay, per-scope merge, _txc.goto/halt) as http:// and txco:// — the compute transport is not a special case downstream.
func (*Unit) ExecHTTP ¶
ExecHTTP handles execution of http and https operations.
When op.Meta declares `secrets.*` refs (per internal docs/todo-secret-store.md §4), this is the only place cleartext crosses from op.Secrets (the in-process bag) into the outbound wire — applied as request headers or as JSON body field overlays. op.Input is NOT mutated, so the trace event still records the request body the operator authored; the wire body is a separate buffer that lives only for the duration of the HTTP call.
func (*Unit) ExecHTTPAsync ¶
func (pu *Unit) ExecHTTPAsync(ctx context.Context, op operation.Operation, env AsyncEnvelope, token string) (jobID string, err error)
ExecHTTPAsync invokes an async worker. Unlike ExecHTTP it wraps the op input as `{ "input": <op.Input>, "_txc": {…} }` (async-only — ExecHTTP is untouched) and carries the single-use callback token in the X-Txco-Continuation-Token request header. The worker is expected to answer 202 Accepted (optionally `{"job_id":"…"}`); the real result arrives later via the callback endpoint. Returns the worker-supplied job id (optional, may be "").
func (*Unit) ExecMCPHTTP ¶
func (pu *Unit) ExecMCPHTTP(ctx context.Context, op operation.Operation) (payload event.Payload, err error)
ExecMCPHTTP dispatches an MCP-over-HTTP `tools/call` op. Mirrors ExecHTTP structurally and shares pu.HTTPClient so the egress guard, per-op context, and trace pipeline all apply for free.
v0 is correct but not optimized: every op pays the full session lifecycle (initialize → notifications/initialized → tools/call). A session cache per (tenant, endpoint) is a v0.5 optimization that doesn't change this signature or the rule syntax.
func (*Unit) Handle ¶
func (pu *Unit) Handle(prefix []byte, handler event.OpsHandler)
Handle registers handler in the internal routing table, so that op names starting with prefix are passed to it.
func (*Unit) MakeMockResponse ¶
func (*Unit) MergeJSON ¶
MergeJSON iterates over the top-level keys of src, adds them to dst, and returns the merged document.
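A top-level merge of this kind might be sketched as below. `mergeTopLevel` is a hypothetical helper, and letting src overwrite a key that already exists in dst is an assumption; the description does not say how conflicts are resolved.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergeTopLevel copies every top-level key of src onto dst.
// Assumption: on a key conflict the src value wins.
func mergeTopLevel(src, dst string) (string, error) {
	var s, d map[string]json.RawMessage
	if err := json.Unmarshal([]byte(src), &s); err != nil {
		return "", fmt.Errorf("src: %w", err)
	}
	if err := json.Unmarshal([]byte(dst), &d); err != nil {
		return "", fmt.Errorf("dst: %w", err)
	}
	if d == nil { // dst was the JSON literal null
		d = map[string]json.RawMessage{}
	}
	for k, v := range s {
		d[k] = v
	}
	out, err := json.Marshal(d)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	out, err := mergeTopLevel(`{"b":2,"c":3}`, `{"a":1,"b":0}`)
	fmt.Println(out, err) // {"a":1,"b":2,"c":3} <nil>
}
```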
func (*Unit) NodeForOp ¶
NodeForOp finds the node that matches an operation description.
Now that gRPC is removed and HTTP rules dispatch directly to op.Resonator.Exec, this is only consulted for txco:// (local) operations that route by the fixed registry. Kept around because the ExecCore path may still want logical-name resolution in the future; today it's effectively a passthrough.
func (*Unit) OpsForStage ¶
OpsForStage looks up the operations for the given stage. If no rows match the requested stack, the lookup falls back along stack-prefix boundaries: a request at `website/canary/100` that finds nothing tries `website/100`, then the empty stack. This implements the overlay model: a sparse `website/canary` tree inherits any scope it doesn't explicitly override from the parent stack. Wildcard stacks (containing SQL LIKE metacharacters like `%`) skip the fallback walk; they already match across stacks at one level.
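The fallback walk can be pictured as a list of candidate stacks tried in order. In this sketch `stackCandidates` is a hypothetical helper; ending the walk at the empty stack is one reading of the description, and treating only `%` as a wildcard marker is a simplifying assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// stackCandidates lists the stacks to try, most specific first.
// Wildcard stacks are returned as-is and never walked.
func stackCandidates(stack string) []string {
	if strings.Contains(stack, "%") {
		return []string{stack}
	}
	out := []string{stack}
	for stack != "" {
		if i := strings.LastIndex(stack, "/"); i >= 0 {
			stack = stack[:i] // drop the last path segment
		} else {
			stack = "" // assumed final fallback: the empty stack
		}
		out = append(out, stack)
	}
	return out
}

func main() {
	fmt.Printf("%q\n", stackCandidates("website/canary")) // ["website/canary" "website" ""]
	fmt.Printf("%q\n", stackCandidates("web%"))           // ["web%"]
}
```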
func (*Unit) OverlayResponse ¶
func (pu *Unit) OverlayResponse(env, output string, overrides []resonator.BranchValue) (string, error)
OverlayResponse writes branch values onto a JSON document with overwrite semantics — the EMIT clause's runtime. Sibling to DecorateInput, which is set-if-absent.
Two distinct documents flow through here:
- env: the envelope to RESOLVE PathRef / FunctionCall args against. Typically op.Input — the envelope the op saw at dispatch, so EMIT's PathRefs like `@web.req.body` read live envelope state.
- output: the WRITE accumulator. EMIT's resolved values are sjson-set onto this; the merger picks it up after the op returns. Empty/missing becomes "{}" so callers don't need to special-case EMIT-only rules.
Splitting env from output is load-bearing: if both were the same document (the EMIT accumulator), `EMIT @x = @env.field` couldn't see the envelope at all — `@env.field` would resolve against the initially-empty accumulator and come back nil. The two-arg form is a fix for that.
Each override.Value flows through runtime.Resolve. Resolution errors halt and propagate up (strict-by-default); sjson write errors (path unwritable) keep the existing log-and-continue behavior because the path syntax is a programming bug rather than a value-computation failure.
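The split between the document that is read (env) and the document that is written (output) can be sketched on top-level keys. The helper `overlay` and its `refs` map (target key to source key in env) are hypothetical simplifications; the real function works on paths and resolves values through runtime.Resolve.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// overlay resolves each ref against env and writes it onto output,
// overwriting any existing value. A missing source halts with an error,
// loosely modeling the strict-by-default behavior.
func overlay(env, output string, refs map[string]string) (string, error) {
	var e map[string]json.RawMessage
	if err := json.Unmarshal([]byte(env), &e); err != nil {
		return "", fmt.Errorf("env: %w", err)
	}
	out := map[string]json.RawMessage{}
	if strings.TrimSpace(output) != "" { // empty output behaves like "{}"
		if err := json.Unmarshal([]byte(output), &out); err != nil {
			return "", fmt.Errorf("output: %w", err)
		}
		if out == nil {
			out = map[string]json.RawMessage{}
		}
	}
	for target, source := range refs {
		v, ok := e[source]
		if !ok {
			return "", fmt.Errorf("cannot resolve %q in env", source)
		}
		out[target] = v
	}
	b, err := json.Marshal(out)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	got, err := overlay(`{"user":"ada"}`, `{"who":"old","n":1}`, map[string]string{"who": "user"})
	fmt.Println(got, err) // {"n":1,"who":"ada"} <nil>
}
```

Had the refs been resolved against `output` instead of `env`, the lookup of `user` would fail here, which is the problem the two-document form avoids.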
func (*Unit) ResonatingOps ¶
func (pu *Unit) ResonatingOps(input string, ops []operation.Operation, hashSeed string) ([]operation.Operation, error)
ResonatingOps returns the subset of ops whose WHEN clause matches the input. If any matching op has Priority > 0, only the highest-priority op is returned; otherwise all matching ops are returned (parallel execution at this stage).
Clause evaluation order on a single rule: WHEN, PRE-SET, SELECT, POST-SET, WITH, PRIORITY, EXEC. PRE-SET, SELECT, and POST-SET control the event payload passed to dispatch; WITH carries per-call metadata; PRIORITY tie-breaks matching rules; EXEC is the dispatch target.
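The priority rule in the first paragraph can be sketched as a selection over ops that have already matched their WHEN clause. The `op` type and `selectOps` are stand-ins; breaking a priority tie by keeping the first op is an assumption made for the sketch (the real method also takes a hashSeed, whose role is not described here).

```go
package main

import "fmt"

// op is a minimal stand-in for operation.Operation.
type op struct {
	Name     string
	Priority int
}

// selectOps returns only the highest-priority op when any matching op has
// Priority > 0; otherwise it returns all matching ops unchanged.
func selectOps(matching []op) []op {
	best := -1
	for i, o := range matching {
		if o.Priority > 0 && (best < 0 || o.Priority > matching[best].Priority) {
			best = i
		}
	}
	if best >= 0 {
		return []op{matching[best]}
	}
	return matching
}

func main() {
	fmt.Println(selectOps([]op{{"a", 0}, {"b", 5}, {"c", 2}})) // [{b 5}]
	fmt.Println(selectOps([]op{{"a", 0}, {"d", 0}}))           // [{a 0} {d 0}]
}
```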
func (*Unit) Resume ¶
Resume merges all op-terminal outputs for (runID, stage) in ascending ordinal onto the stage's scope-entry envelope, then drives the opstack forward via advanceAfterScope (the same engine the sync path uses). Called by the winning continuation callback. Writes result.json on terminal completion; if the pipeline re-suspends at a later async barrier nothing is written here — that stage's callbacks drive it. A failed sibling op fails the stage (design: a failed sibling fails the scope). Runs in the caller's ctx, not the dead request ctx.
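The merge step can be pictured as follows: sort the op outputs by ordinal, then fold each onto the scope-entry envelope. `opResult` and `mergeStage` are hypothetical, the merge is limited to top-level keys, and letting later ordinals overwrite earlier ones is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// opResult is a stand-in for one op-terminal output of a stage.
type opResult struct {
	Ordinal int
	Output  string
	Failed  bool
}

// mergeStage folds outputs onto the entry envelope in ascending ordinal
// order. Any failed sibling fails the whole stage.
func mergeStage(entry string, results []opResult) (string, error) {
	sorted := append([]opResult(nil), results...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Ordinal < sorted[j].Ordinal })

	doc := map[string]json.RawMessage{}
	if err := json.Unmarshal([]byte(entry), &doc); err != nil {
		return "", err
	}
	for _, r := range sorted {
		if r.Failed {
			return "", fmt.Errorf("op %d failed: stage fails", r.Ordinal)
		}
		var out map[string]json.RawMessage
		if err := json.Unmarshal([]byte(r.Output), &out); err != nil {
			return "", err
		}
		for k, v := range out {
			doc[k] = v
		}
	}
	b, err := json.Marshal(doc)
	return string(b), err
}

func main() {
	got, err := mergeStage(`{"id":7}`, []opResult{
		{Ordinal: 2, Output: `{"x":"second"}`},
		{Ordinal: 1, Output: `{"x":"first","y":true}`},
	})
	fmt.Println(got, err) // {"id":7,"x":"second","y":true} <nil>
}
```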
func (*Unit) Run ¶
Run executes a request.
Snapshot semantics: the first call to Run captures the current dbcache in-memory *sql.DB and attaches it to ctx. All subsequent recursive Run calls (for goto / next-stage advancement within the same request) reuse that snapshot. A `txco apply` that completes mid-request swaps pu.Dbc.Db on the chassis side but leaves the captured pointer untouched — Go's GC keeps the old in-memory DB alive while any request holds a reference. So the opstack a request sees at stage 0 is the same opstack it sees at every later stage in the same request.
Tests that drive Run directly without a snapshot in ctx still work: OpsForStage falls back to pu.Dbc.Db when the ctx key is absent.