processor

package
v0.2.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2026 License: MPL-2.0 Imports: 53 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var StagePartsRE = regexp.MustCompile(`(.*)\/+(\d+)$`)

StagePartsRE Set compile limit for stage

Functions

func FuelUsedFromEnvelope added in v0.2.4

func FuelUsedFromEnvelope(raw string) int64

FuelUsedFromEnvelope reads the live fuel value from an envelope. Used at the UsageEvent construction site (server.go:1010) where the merged envelope is available but the per-request context is not.

func StripBudgetFromOutbound added in v0.2.4

func StripBudgetFromOutbound(raw string) string

StripBudgetFromOutbound deletes the chassis-internal budget fields from the response delivered to the inlet. Called by server.go just before forwarding the payload to the inlet client. Mirrors the existing `_txc.halt` / `_txc.goto` strip; exported so the server package (which owns the convergence between chassis emit and inlet write) can apply it after capturing the fuel value for UsageEvent.

func TenantFromEnvelope added in v0.2.5

func TenantFromEnvelope(raw string) string

TenantFromEnvelope reads the resolved tenant slug from an envelope's `_txc.tenant`. Mirrors FuelUsedFromEnvelope; used at the trace-usage emit sites (main + resume paths) to attribute a trace to its tenant — the slug admin tenant-scoping filters on.

func TenantScope added in v0.2.6

func TenantScope(ctx context.Context) string

TenantScope exposes the pinned request tenant slug to bundled ops outside this package (e.g. txco://sendmail's From-domain anti-spoof check). It is the trusted, immutable tenant pinned at first Run — NOT the mutable `_txc.tenant` envelope field, which a mid-pipeline op could overwrite.

func WithTenant

func WithTenant(ctx context.Context, slug string) context.Context

WithTenant pins a tenant slug onto ctx for op resolution. Exposed for callers that drive Run/OpsForStage outside the normal ingress path (and for tests). Within the normal data plane, Run pins this itself from the envelope's `_txc.tenant`.

func WithTenantObserver added in v0.2.4

func WithTenantObserver(ctx context.Context, o *TenantObserver) context.Context

WithTenantObserver attaches a tenant observer to ctx for the processor's pin sites to record into.

Types

type AsyncEnvelope

type AsyncEnvelope struct {
	OpContinuationID  string `json:"op_continuation_id"`
	CallbackURL       string `json:"callback_url"`
	RunID             string `json:"run_id"`
	RunContinuationID string `json:"run_continuation_id"`
	Stack             string `json:"stack"`
	Stage             string `json:"stage"`
	Op                string `json:"op"`
	ExpiresAt         string `json:"expires_at,omitempty"`
}

AsyncEnvelope is the `_txc` block handed to an async worker so it can post one op result back. The single-use bearer token is delivered out-of-band of the body (request header), not in the envelope.

type FuelExhaustedError added in v0.2.4

type FuelExhaustedError struct {
	MaxFuel         int64    `json:"max_fuel"`
	FuelUsed        int64    `json:"fuel_used"`
	LastStage       string   `json:"last_stage"`
	LastTransitions []string `json:"last_transitions"`
}

FuelExhaustedError is the structured terminal error for fuel exhaustion. Serialized through the same final-response path as a wall-clock cancel.

func (*FuelExhaustedError) AsJSON added in v0.2.4

func (e *FuelExhaustedError) AsJSON() string

func (*FuelExhaustedError) Error added in v0.2.4

func (e *FuelExhaustedError) Error() string

type TTLExhaustedError added in v0.2.4

type TTLExhaustedError struct {
	MaxTTL          int64    `json:"max_ttl"`
	Consumed        int64    `json:"consumed"`
	LastStage       string   `json:"last_stage"`
	LastTransitions []string `json:"last_transitions"`
}

TTLExhaustedError is the structured terminal error for hop-budget exhaustion. Distinct code from fuel exhaustion so operators can see "this is a loop" vs "this is expensive work" at a glance.

func (*TTLExhaustedError) AsJSON added in v0.2.4

func (e *TTLExhaustedError) AsJSON() string

func (*TTLExhaustedError) Error added in v0.2.4

func (e *TTLExhaustedError) Error() string

type TenantObserver added in v0.2.4

type TenantObserver struct {
	// contains filtered or unexported fields
}

TenantObserver records the tenant a request is pinned to, so the server can attribute usage/billing from immutable pipeline state rather than the mutable response envelope. The envelope's `_txc.tenant` is display/debug/routing metadata an author-controlled stack can rewrite; billing must not trust it.

It mirrors the admission.Lease holder pattern: created in the server bus-loop goroutine that owns a request's lifetime, attached to the request context, and read back after the pipeline returns. The processor's tenant-pin sites (first pin in Run; the one-way `_sys`->concrete retenant) record the resolved slug via WithTenant. Last write wins — for a routed request that is the concrete tenant; for an unrouted one it stays `_sys`.

Nil-safe so non-server callers (tests, CLI) can ignore it.

func NewTenantObserver added in v0.2.4

func NewTenantObserver() *TenantObserver

NewTenantObserver returns a fresh observer with no tenant recorded yet.

func (*TenantObserver) Tenant added in v0.2.4

func (o *TenantObserver) Tenant() (slug string, ok bool)

Tenant returns the last-recorded tenant slug and whether any pin was observed. ok is false if the pipeline never pinned a tenant (e.g. a request that errored before Run pinned), letting the caller fall back.

type Unit

type Unit struct {
	Conf      config.Config
	Logger    *zap.Logger
	Kv        store.Store
	RuntimeDB *sql.DB
	AuthDB    *sql.DB
	Dbc       *dbcache.DbCache
	Mc        *metrics.Metrics

	Bus        chan<- *event.Envelope
	Reg        registry.Registry
	Mux        *radix.Tree
	HTTPClient *http.Client

	// Runs is the continuation store wrapper. Used only on the barrier
	// (async) path; nil-safe — the sync fast path never touches it.
	Runs            *continuation.Runs
	CallbackBaseURL string

	// Sink spawns resume traces from background goroutines (local
	// async; future similar work). The same sink the web personality
	// uses, exposed here so chassis-internal goroutines that outlive
	// their originating request can write traces too. Nil-safe — when
	// unset, local async still functions but the resume work is
	// untraced (used in unit tests + non-server contexts).
	Sink trace.Sink

	// MCPSessions caches server-minted `Mcp-Session-Id` values per
	// (tenant, endpoint) so hot MCP paths skip the init lifecycle
	// (3 HTTPS round-trips → 1). Nil-safe — when unset, ExecMCPHTTP
	// runs the full lifecycle every call (test default; backwards-
	// compatible).
	MCPSessions *mcpSessionCache

	// Computes runs op:// computes resolved to "compute://<alg>/<digest>".
	// nil-safe: ExecCompute fails loudly if a compute:// op fires while
	// this is unset (no engine wired). Set when the chassis is built with
	// a compute runtime.
	Computes compute.Runner

	// Usage is the usage sink. nil-safe. When set, each compute invocation
	// emits a usage event (src="compute") alongside the per-request one.
	Usage usage.Sink

	// Secrets is the per-tenant secret-store Resolver. Non-nil when
	// --secret-master-key is configured AND its file loads cleanly at
	// boot; nil otherwise. PR 3 will consult this from the processor
	// splice that populates op.Secrets between WHEN/SET/SELECT/WITH
	// decoration and Exec. While Secrets is nil, any op with
	// `secrets` in its WITH clause must fail loud with
	// `secret_store_unavailable` rather than silently skip.
	Secrets *secrets.Resolver

	// Admission gates the one-time _sys -> concrete-tenant handoff: a
	// suspended/disabled tenant is denied (402/403) before its stack
	// runs. nil-safe — when unset every tenant is admitted (the test and
	// non-server default). Set by the server from a dbcache-backed
	// provider rebuilt on every reload.
	Admission admission.Provider
	// contains filtered or unexported fields
}

Unit is the chassis's per-process processor handle. RuntimeDB holds content (ops, stacks, versions, files, tenants) and is always present. AuthDB holds identity (actors, keys, memberships, invitations, browser sessions) and is nil on data-plane-only chassis where the admin personality isn't active.

func New

func New(conf config.Config, logger *zap.Logger, reg registry.Registry, mc *metrics.Metrics, bus chan<- *event.Envelope, kv store.Store, runtimeDB, authDB *sql.DB, dbc *dbcache.DbCache, runs *continuation.Runs, guard egress.Guard, secretsResolver *secrets.Resolver) *Unit

New Processor. authDB may be nil on data-plane-only chassis. guard is the outbound op-dial policy consulted at the dial step (see egress).

func (*Unit) DecorateInput

func (pu *Unit) DecorateInput(input string, overrides []resonator.BranchValue) (string, error)

DecorateInput overrides input with preset values, but only if the target path doesn't already exist (set-if-absent semantics — used by SET PRE / SET POST).

Each override.Value flows through runtime.Resolve so future Value shapes (PathRef, FunctionCall) plumb through without touching this function. On a resolution error the call returns the partially-updated input plus the error; per the strict-by-default semantics in internal docs/todo-txcl-expressions.md §5, callers MUST treat the error as a rule-halt signal (silently dropping the SET would hide protocol errors).

func (*Unit) DriveDeferredResume

func (pu *Unit) DriveDeferredResume(runID, stage string)

DriveDeferredResume claims and resumes a deferred-join suspend on a fresh background context (the request ctx is gone). Self-contained and safe for both the suspend-time race guard and the worker-completion path: it re-checks resumability and lets ClaimResume pick the single winner.

func (*Unit) EnableMCPSessionCache

func (pu *Unit) EnableMCPSessionCache(ttl time.Duration)

EnableMCPSessionCache turns on session caching for hot MCP paths. The cache type is unexported (it's an implementation detail of the processor); this method is the construction surface that server.go uses at boot. Idempotent — calling twice replaces the underlying cache (dropping any in-flight entries; safe at boot time before any traffic flows).

func (*Unit) Exec

func (pu *Unit) Exec(ctx context.Context, op operation.Operation) (event.Payload, bool, error)

Exec Execute an operation at this step Exec dispatches an operation to its transport and returns the produced payload. The bool return — authorControlled — reports whether the output came from an author-controlled producer (remote HTTP, sandboxed compute, an MCP tool, or a rule-author mock) as opposed to a trusted built-in core handler, the chassis-owned ai:// namespace, or a synthesized control output. Callers that merge this output into the envelope MUST sanitizeAuthorOutput it when authorControlled is true, so an untrusted producer cannot forge reserved `_txc.*` control fields (tenant, computed auth, budget, …).

func (*Unit) ExecAI added in v0.2.4

func (pu *Unit) ExecAI(ctx context.Context, op operation.Operation) (event.Payload, error)

ExecAI dispatches ai://<sub_op>. v1 supports only ai://chat.

The handler:

  1. Parses the sub-op (chat is the only one wired today).
  2. Decodes op.Meta (WITH-clause materialization) into a chat.Request.
  3. Resolves a backend via chat.Resolve (provider override or first-registered default; capability matching is its own follow-up).
  4. Materializes the backend's RequiredSecrets() through the per-tenant store with optional env-var fallback (AIChatEnvFallback config), reusing the existing chassis pattern: SecretBag, RecordSecretMaterialize, fuelCostSecretMaterialize fuel charge.
  5. Renders {{@path}} prompt templates over op.Input.
  6. Calls backend.Run exactly once (no tool loop in v1).
  7. Validates the response against WITH schema if present (binary ok / failed; no repair semantics).
  8. Emits a chat.completion trace event with provider, model, token counts, routing decision, retries, latency, and schema validation outcome.
  9. Builds the response envelope: top-level text + schema_validated_payload + chat.error (when set) + _txc.chat.* metadata. Token counts are recorded HERE for downstream observability/billing; they are NOT charged to the chassis fuel meter — provider compute is a separate dimension.

Error policy: chat-level failures (auth, provider HTTP/net, schema validation, missing secret, no backend) surface as a structured `chat.error` field on the response envelope so rule authors can handle uniformly via `WHEN @chat.error EXEC ...`. ExecAI returns a non-nil error only for malformed-shape conditions a rule author cannot work around (unrecognized sub-op, op.Meta failing JSON parse, an unset runtime dependency). This gives rule authors one error-handling pattern; structured codes (txco_chat_*) live on the envelope.

func (*Unit) ExecCompute

func (pu *Unit) ExecCompute(ctx context.Context, op operation.Operation) (event.Payload, error)

ExecCompute runs a `compute://<alg>/<digest>` op: it resolves the content-addressed artifact and invokes it on its engine, returning the output as a JSON payload. The output is handed back to Exec's shared tail, so it flows the identical post-EXEC processing (EMIT overlay, per-scope merge, _txc.goto/halt) as http:// and txco:// — the compute transport is not a special case downstream.

func (*Unit) ExecCore

func (pu *Unit) ExecCore(ctx context.Context, op operation.Operation) (event.Payload, error)

ExecCore on core execution (vs remote)

func (*Unit) ExecHTTP

func (pu *Unit) ExecHTTP(ctx context.Context, op operation.Operation) (event.Payload, error)

ExecHTTP Handles execution of http, https operations.

When op.Meta declares `secrets.*` refs (per internal docs/todo-secret-store.md §4), this is the only place cleartext crosses from op.Secrets (the in-process bag) into the outbound wire — applied as request headers or as JSON body field overlays. op.Input is NOT mutated, so the trace event still records the request body the operator authored; the wire body is a separate buffer that lives only for the duration of the HTTP call.

func (*Unit) ExecHTTPAsync

func (pu *Unit) ExecHTTPAsync(ctx context.Context, op operation.Operation, env AsyncEnvelope, token string) (jobID string, err error)

ExecHTTPAsync invokes an async worker. Unlike ExecHTTP it wraps the op input as `{ "input": <op.Input>, "_txc": {…} }` (async-only — ExecHTTP is untouched) and carries the single-use callback token in the X-Txco-Continuation-Token request header. The worker is expected to answer 202 Accepted (optionally `{"job_id":"…"}`); the real result arrives later via the callback endpoint. Returns the worker-supplied job id (optional, may be "").

func (*Unit) ExecMCPHTTP

func (pu *Unit) ExecMCPHTTP(ctx context.Context, op operation.Operation) (payload event.Payload, err error)

ExecMCPHTTP dispatches an MCP-over-HTTP `tools/call` op. Mirrors ExecHTTP structurally and shares pu.HTTPClient so the egress guard, per-op context, and trace pipeline all apply for free.

v0 is correct but not optimized: every op pays the full session lifecycle (initialize → notifications/initialized → tools/call). A session cache per (tenant, endpoint) is a v0.5 optimization that doesn't change this signature or the rule syntax.

func (*Unit) Handle

func (pu *Unit) Handle(prefix []byte, handler event.OpsHandler)

Handle Setup the internal routing table, passing off opnames starting with prefix to this handler

func (*Unit) MakeMockResponse

func (pu *Unit) MakeMockResponse(op operation.Operation, errMsg string) event.Payload

func (*Unit) MergeJSON

func (pu *Unit) MergeJSON(src string, dst string) (string, error)

MergeJSON iterate through top level keys, add them to other struct, return it

func (*Unit) NodeForOp

func (pu *Unit) NodeForOp(ctx context.Context, op operation.Operation) (*registry.Node, error)

NodeForOp Given an operation description, find matching node.

Now that gRPC is removed and HTTP rules dispatch directly to op.Resonator.Exec, this is only consulted for txco:// (local) operations that route by the fixed registry. Kept around because the ExecCore path may still want logical-name resolution in the future; today it's effectively a passthrough.

func (*Unit) OpsForStage

func (pu *Unit) OpsForStage(ctx context.Context, stage string) ([]operation.Operation, error)

OpsForStage lookup operations for given stage. If no rows match the requested stack, the lookup falls back along stack-prefix boundaries: a request at `website/canary/100` that finds nothing tries `website/100`, then “. This implements the overlay model — a sparse `website/canary` tree inherits any scope it doesn't explicitly override from the parent stack. Wildcard stacks (containing SQL LIKE metacharacters like `%`) skip the fallback walk; they already match across stacks at one level.

func (*Unit) OverlayResponse

func (pu *Unit) OverlayResponse(env, output string, overrides []resonator.BranchValue) (string, error)

OverlayResponse writes branch values onto a JSON document with overwrite semantics — the EMIT clause's runtime. Sibling to DecorateInput, which is set-if-absent.

Two distinct documents flow through here:

  • env: the envelope to RESOLVE PathRef / FunctionCall args against. Typically op.Input — the envelope the op saw at dispatch, so EMIT's PathRefs like `@web.req.body` read live envelope state.
  • output: the WRITE accumulator. EMIT's resolved values are sjson-set onto this; the merger picks it up after the op returns. Empty/missing becomes "{}" so callers don't need to special-case EMIT-only rules.

Splitting env from output is load-bearing: if both were the same document (the EMIT accumulator), `EMIT @x = @env.field` couldn't see the envelope at all — `@env.field` would resolve against the initially-empty accumulator and come back nil. The two-arg form is a fix for that.

Each override.Value flows through runtime.Resolve. Resolution errors halt and propagate up (strict-by-default); sjson write errors (path unwritable) keep the existing log-and-continue behavior because the path syntax is a programming bug rather than a value-computation failure.

func (*Unit) ResonatingOps

func (pu *Unit) ResonatingOps(input string, ops []operation.Operation, hashSeed string) ([]operation.Operation, error)

ResonatingOps returns the subset of ops whose WHEN clause matches the input. If any matching op has Priority > 0, only the highest-priority op is returned; otherwise all matching ops are returned (parallel execution at this stage).

Clause evaluation order on a single rule: WHEN, PRE-SET, SELECT, POST-SET, WITH, PRIORITY, EXEC. PRE-SET, SELECT, and POST-SET control the event payload passed to dispatch; WITH carries per-call metadata; PRIORITY tie-breaks matching rules; EXEC is the dispatch target.

func (*Unit) Resume

func (pu *Unit) Resume(ctx context.Context, runID, stage string) error

Resume merges all op-terminal outputs for (runID, stage) in ascending ordinal onto the stage's scope-entry envelope, then drives the opstack forward via advanceAfterScope (the same engine the sync path uses). Called by the winning continuation callback. Writes result.json on terminal completion; if the pipeline re-suspends at a later async barrier nothing is written here — that stage's callbacks drive it. A failed sibling op fails the stage (design: a failed sibling fails the scope). Runs in the caller's ctx, not the dead request ctx.

func (*Unit) Run

func (pu *Unit) Run(ctx context.Context, raw string, stage string, resCh chan event.Payload) error

Run Execute a request.

Snapshot semantics: the first call to Run captures the current dbcache in-memory *sql.DB and attaches it to ctx. All subsequent recursive Run calls (for goto / next-stage advancement within the same request) reuse that snapshot. A `txco apply` that completes mid-request swaps pu.Dbc.Db on the chassis side but leaves the captured pointer untouched — Go's GC keeps the old in-memory DB alive while any request holds a reference. So the opstack a request sees at stage 0 is the same opstack it sees at every later stage in the same request.

Tests that drive Run directly without a snapshot in ctx still work: OpsForStage falls back to pu.Dbc.Db when the ctx key is absent.

func (*Unit) StageParse

func (pu *Unit) StageParse(stage string) (stack string, scope int, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL