Documentation ¶
Index ¶
- Variables
- func WithTenant(ctx context.Context, slug string) context.Context
- type AsyncEnvelope
- type Unit
- func (pu *Unit) DecorateInput(input string, overrides []resonator.BranchValue) (string, error)
- func (pu *Unit) DriveDeferredResume(runID, stage string)
- func (pu *Unit) EnableMCPSessionCache(ttl time.Duration)
- func (pu *Unit) Exec(ctx context.Context, op operation.Operation) (event.Payload, error)
- func (pu *Unit) ExecCompute(ctx context.Context, op operation.Operation) (event.Payload, error)
- func (pu *Unit) ExecCore(ctx context.Context, op operation.Operation) (event.Payload, error)
- func (pu *Unit) ExecHTTP(ctx context.Context, op operation.Operation) (event.Payload, error)
- func (pu *Unit) ExecHTTPAsync(ctx context.Context, op operation.Operation, env AsyncEnvelope, token string) (jobID string, err error)
- func (pu *Unit) ExecMCPHTTP(ctx context.Context, op operation.Operation) (payload event.Payload, err error)
- func (pu *Unit) Handle(prefix []byte, handler event.OpsHandler)
- func (pu *Unit) MakeMockResponse(op operation.Operation, errMsg string) event.Payload
- func (pu *Unit) MergeJSON(src string, dst string) (string, error)
- func (pu *Unit) NodeForOp(ctx context.Context, op operation.Operation) (*registry.Node, error)
- func (pu *Unit) OpsForStage(ctx context.Context, stage string) ([]operation.Operation, error)
- func (pu *Unit) OverlayResponse(env, output string, overrides []resonator.BranchValue) (string, error)
- func (pu *Unit) ResonatingOps(input string, ops []operation.Operation, hashSeed string) ([]operation.Operation, error)
- func (pu *Unit) Resume(ctx context.Context, runID, stage string) error
- func (pu *Unit) Run(ctx context.Context, raw string, stage string, resCh chan event.Payload) error
- func (pu *Unit) StageParse(stage string) (stack string, scope int, err error)
Constants ¶
This section is empty.
Variables ¶
var StagePartsRE = regexp.MustCompile(`(.*)\/+(\d+)$`)
StagePartsRE matches a stage string of the form `<stack>/<scope>`, capturing the stack prefix and the trailing numeric scope.
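As a rough illustration of what the pattern captures, the sketch below applies the same expression to a stage string. `splitStage` is a hypothetical helper written for this example; it is not the package's StageParse, whose exact error handling is not documented here.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// stagePartsRE mirrors the pattern documented for StagePartsRE.
var stagePartsRE = regexp.MustCompile(`(.*)\/+(\d+)$`)

// splitStage is a hypothetical helper (an assumption for illustration):
// it returns the stack prefix and the trailing numeric scope of a stage.
func splitStage(stage string) (stack string, scope int, err error) {
	m := stagePartsRE.FindStringSubmatch(stage)
	if m == nil {
		return "", 0, fmt.Errorf("stage %q has no trailing /<scope>", stage)
	}
	scope, err = strconv.Atoi(m[2])
	if err != nil {
		return "", 0, err
	}
	return m[1], scope, nil
}

func main() {
	stack, scope, err := splitStage("website/canary/100")
	fmt.Println(stack, scope, err) // website/canary 100 <nil>
}
```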
Functions ¶
func WithTenant ¶
WithTenant pins a tenant slug onto ctx for op resolution. Exposed for callers that drive Run/OpsForStage outside the normal ingress path (and for tests). Within the normal data plane, Run pins this itself from the envelope's `_txc.tenant`.
Types ¶
type AsyncEnvelope ¶
type AsyncEnvelope struct {
OpContinuationID string `json:"op_continuation_id"`
CallbackURL string `json:"callback_url"`
RunID string `json:"run_id"`
RunContinuationID string `json:"run_continuation_id"`
Stack string `json:"stack"`
Stage string `json:"stage"`
Op string `json:"op"`
ExpiresAt string `json:"expires_at,omitempty"`
}
AsyncEnvelope is the `_txc` block handed to an async worker so it can post one op result back. The single-use bearer token is delivered out-of-band of the body (request header), not in the envelope.
type Unit ¶
type Unit struct {
Conf config.Config
Logger *zap.Logger
Kv store.Store
RuntimeDB *sql.DB
AuthDB *sql.DB
Dbc *dbcache.DbCache
Mc *metrics.Metrics
Bus chan<- *event.Envelope
Reg registry.Registry
Mux *radix.Tree
HTTPClient *http.Client
// Runs is the continuation store wrapper. Used only on the barrier
// (async) path; nil-safe — the sync fast path never touches it.
Runs *continuation.Runs
CallbackBaseURL string
// Sink spawns resume traces from background goroutines (local
// async; future similar work). The same sink the web personality
// uses, exposed here so chassis-internal goroutines that outlive
// their originating request can write traces too. Nil-safe — when
// unset, local async still functions but the resume work is
// untraced (used in unit tests + non-server contexts).
Sink trace.Sink
// MCPSessions caches server-minted `Mcp-Session-Id` values per
// (tenant, endpoint) so hot MCP paths skip the init lifecycle
// (3 HTTPS round-trips → 1). Nil-safe — when unset, ExecMCPHTTP
// runs the full lifecycle every call (test default; backwards-
// compatible).
MCPSessions *mcpSessionCache
// Computes runs op:// computes resolved to "compute://<alg>/<digest>".
// nil-safe: ExecCompute fails loudly if a compute:// op fires while
// this is unset (no engine wired). Set when the chassis is built with
// a compute runtime.
Computes compute.Runner
// Usage is the usage sink. nil-safe. When set, each compute invocation
// emits a usage event (src="compute") alongside the per-request one.
Usage usage.Sink
// Secrets is the per-tenant secret-store Resolver. Non-nil when
// --secret-master-key is configured AND its file loads cleanly at
// boot; nil otherwise. PR 3 will consult this from the processor
// splice that populates op.Secrets between WHEN/SET/SELECT/WITH
// decoration and Exec. While Secrets is nil, any op with
// `secrets` in its WITH clause must fail loud with
// `secret_store_unavailable` rather than silently skip.
Secrets *secrets.Resolver
// contains filtered or unexported fields
}
Unit is the chassis's per-process processor handle. RuntimeDB holds content (ops, stacks, versions, files, tenants) and is always present. AuthDB holds identity (actors, keys, memberships, invitations, browser sessions) and is nil on data-plane-only chassis where the admin personality isn't active.
func New ¶
func New(conf config.Config, logger *zap.Logger, reg registry.Registry, mc *metrics.Metrics, bus chan<- *event.Envelope, kv store.Store, runtimeDB, authDB *sql.DB, dbc *dbcache.DbCache, runs *continuation.Runs, guard egress.Guard, secretsResolver *secrets.Resolver) *Unit
New constructs a processor Unit. authDB may be nil on data-plane-only chassis. guard is the outbound op-dial policy consulted at the dial step (see egress).
func (*Unit) DecorateInput ¶
DecorateInput overrides input with preset values, but only if the target path doesn't already exist (set-if-absent semantics — used by SET PRE / SET POST).
Each override.Value flows through runtime.Resolve so future Value shapes (PathRef, FunctionCall) plumb through without touching this function. On a resolution error the call returns the partially-updated input plus the error; per the strict-by-default semantics in internal docs/todo-txcl-expressions.md §5, callers MUST treat the error as a rule-halt signal (silently dropping the SET would hide protocol errors).
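The set-if-absent rule can be sketched on top-level JSON keys. This is a simplified stand-in: the real method takes `[]resonator.BranchValue` and resolves each value through runtime.Resolve, neither of which is modelled here, and `setIfAbsent` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// setIfAbsent writes each preset onto input only when the key is missing.
// Simplification (assumption): top-level keys only, literal values only.
func setIfAbsent(input string, presets map[string]any) (string, error) {
	doc := map[string]any{}
	if err := json.Unmarshal([]byte(input), &doc); err != nil {
		return input, err
	}
	if doc == nil { // input was JSON null
		doc = map[string]any{}
	}
	for key, val := range presets {
		if _, exists := doc[key]; !exists {
			doc[key] = val
		}
	}
	out, err := json.Marshal(doc) // map keys are emitted in sorted order
	if err != nil {
		return input, err
	}
	return string(out), nil
}

func main() {
	out, err := setIfAbsent(`{"a":1}`, map[string]any{"a": 2, "b": 3})
	fmt.Println(out, err) // {"a":1,"b":3} <nil>
}
```

The existing `a` survives and only the missing `b` is added, which is the behaviour the text describes for SET PRE / SET POST.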
func (*Unit) DriveDeferredResume ¶
DriveDeferredResume claims and resumes a deferred-join suspend on a fresh background context (the request ctx is gone). Self-contained and safe for both the suspend-time race guard and the worker-completion path: it re-checks resumability and lets ClaimResume pick the single winner.
func (*Unit) EnableMCPSessionCache ¶
EnableMCPSessionCache turns on session caching for hot MCP paths. The cache type is unexported (it's an implementation detail of the processor); this method is the construction surface that server.go uses at boot. Idempotent — calling twice replaces the underlying cache (dropping any in-flight entries; safe at boot time before any traffic flows).
func (*Unit) ExecCompute ¶
ExecCompute runs a `compute://<alg>/<digest>` op: it resolves the content-addressed artifact and invokes it on its engine, returning the output as a JSON payload. The output is handed back to Exec's shared tail, so it flows the identical post-EXEC processing (EMIT overlay, per-scope merge, _txc.goto/halt) as http:// and txco:// — the compute transport is not a special case downstream.
func (*Unit) ExecHTTP ¶
ExecHTTP handles execution of http and https operations.
When op.Meta declares `secrets.*` refs (per internal docs/todo-secret-store.md §4), this is the only place cleartext crosses from op.Secrets (the in-process bag) into the outbound wire — applied as request headers or as JSON body field overlays. op.Input is NOT mutated, so the trace event still records the request body the operator authored; the wire body is a separate buffer that lives only for the duration of the HTTP call.
func (*Unit) ExecHTTPAsync ¶
func (pu *Unit) ExecHTTPAsync(ctx context.Context, op operation.Operation, env AsyncEnvelope, token string) (jobID string, err error)
ExecHTTPAsync invokes an async worker. Unlike ExecHTTP it wraps the op input as `{ "input": <op.Input>, "_txc": {…} }` (async-only — ExecHTTP is untouched) and carries the single-use callback token in the X-Txco-Continuation-Token request header. The worker is expected to answer 202 Accepted (optionally `{"job_id":"…"}`); the real result arrives later via the callback endpoint. Returns the worker-supplied job id (optional, may be "").
func (*Unit) ExecMCPHTTP ¶
func (pu *Unit) ExecMCPHTTP(ctx context.Context, op operation.Operation) (payload event.Payload, err error)
ExecMCPHTTP dispatches an MCP-over-HTTP `tools/call` op. Mirrors ExecHTTP structurally and shares pu.HTTPClient so the egress guard, per-op context, and trace pipeline all apply for free.
v0 is correct but not optimized: every op pays the full session lifecycle (initialize → notifications/initialized → tools/call). A session cache per (tenant, endpoint) is a v0.5 optimization that doesn't change this signature or the rule syntax.
func (*Unit) Handle ¶
func (pu *Unit) Handle(prefix []byte, handler event.OpsHandler)
Handle sets up the internal routing table, passing op names that start with prefix to handler.
func (*Unit) MakeMockResponse ¶
func (*Unit) MergeJSON ¶
MergeJSON iterates through the top-level keys of src, adds them to dst, and returns the merged document.
func (*Unit) NodeForOp ¶
NodeForOp finds the node matching the given operation description.
Now that gRPC is removed and HTTP rules dispatch directly to op.Resonator.Exec, this is only consulted for txco:// (local) operations that route by the fixed registry. Kept around because the ExecCore path may still want logical-name resolution in the future; today it's effectively a passthrough.
func (*Unit) OpsForStage ¶
OpsForStage looks up the operations for the given stage. If no rows match the requested stack, the lookup falls back along stack-prefix boundaries: a request at `website/canary/100` that finds nothing tries `website/100`, then the empty stack. This implements the overlay model: a sparse `website/canary` tree inherits any scope it doesn't explicitly override from the parent stack. Wildcard stacks (containing SQL LIKE metacharacters like `%`) skip the fallback walk; they already match across stacks at one level.
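The fallback order can be sketched as a list of candidate stacks to try in turn. This assumes the walk trims one `/`-separated segment at a time and ends at the empty stack, and it treats only `%` as a wildcard marker; `fallbackStacks` is a hypothetical helper, and how the real lookup handles other LIKE metacharacters is not stated here.

```go
package main

import (
	"fmt"
	"strings"
)

// fallbackStacks returns the stacks a lookup would try, most specific first.
// Assumptions: segments are trimmed from the right, the walk ends at "",
// and a stack containing '%' is a wildcard that skips the walk entirely.
func fallbackStacks(stack string) []string {
	if strings.Contains(stack, "%") {
		return []string{stack}
	}
	candidates := []string{stack}
	for stack != "" {
		if i := strings.LastIndex(stack, "/"); i >= 0 {
			stack = stack[:i]
		} else {
			stack = ""
		}
		candidates = append(candidates, stack)
	}
	return candidates
}

func main() {
	fmt.Printf("%q\n", fallbackStacks("website/canary")) // ["website/canary" "website" ""]
	fmt.Printf("%q\n", fallbackStacks("web%"))           // ["web%"]
}
```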
func (*Unit) OverlayResponse ¶
func (pu *Unit) OverlayResponse(env, output string, overrides []resonator.BranchValue) (string, error)
OverlayResponse writes branch values onto a JSON document with overwrite semantics — the EMIT clause's runtime. Sibling to DecorateInput, which is set-if-absent.
Two distinct documents flow through here:
- env: the envelope to RESOLVE PathRef / FunctionCall args against. Typically op.Input — the envelope the op saw at dispatch, so EMIT's PathRefs like `@web.req.body` read live envelope state.
- output: the WRITE accumulator. EMIT's resolved values are sjson-set onto this; the merger picks it up after the op returns. Empty/missing becomes "{}" so callers don't need to special-case EMIT-only rules.
Splitting env from output is load-bearing: if both were the same document (the EMIT accumulator), `EMIT @x = @env.field` couldn't see the envelope at all — `@env.field` would resolve against the initially-empty accumulator and come back nil. The two-arg form is a fix for that.
Each override.Value flows through runtime.Resolve. Resolution errors halt and propagate up (strict-by-default); sjson write errors (path unwritable) keep the existing log-and-continue behavior because the path syntax is a programming bug rather than a value-computation failure.
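To make the env/output split concrete, the sketch below resolves references against one document and writes onto another with overwrite semantics. It is a simplified model: `override`, `overlay`, and the top-level-key addressing are hypothetical, a missing reference resolves to nil as the text describes, and the real method works on `[]resonator.BranchValue` through runtime.Resolve.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// override is a hypothetical stand-in for one EMIT assignment.
type override struct {
	Key string // top-level key written on the output document
	Ref string // when non-empty, top-level key read from env
	Lit any    // literal value used when Ref is empty
}

// overlay resolves each override against env and overwrites output with it.
func overlay(env, output string, overrides []override) (string, error) {
	if output == "" {
		output = "{}" // EMIT-only rules start from an empty accumulator
	}
	var envDoc, outDoc map[string]any
	if err := json.Unmarshal([]byte(env), &envDoc); err != nil {
		return output, err
	}
	if err := json.Unmarshal([]byte(output), &outDoc); err != nil {
		return output, err
	}
	if outDoc == nil { // output was JSON null
		outDoc = map[string]any{}
	}
	for _, o := range overrides {
		val := o.Lit
		if o.Ref != "" {
			val = envDoc[o.Ref] // nil when the envelope lacks the key
		}
		outDoc[o.Key] = val // overwrite, unlike set-if-absent
	}
	merged, err := json.Marshal(outDoc)
	if err != nil {
		return output, err
	}
	return string(merged), nil
}

func main() {
	out, err := overlay(`{"field":"v"}`, "", []override{{Key: "x", Ref: "field"}})
	fmt.Println(out, err) // {"x":"v"} <nil>
}
```

Passing the accumulator as `env` in the call above would yield `{"x":null}`, which is the failure mode the two-argument form avoids.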
func (*Unit) ResonatingOps ¶
func (pu *Unit) ResonatingOps(input string, ops []operation.Operation, hashSeed string) ([]operation.Operation, error)
ResonatingOps returns the subset of ops whose WHEN clause matches the input. If any matching op has Priority > 0, only the highest-priority op is returned; otherwise all matching ops are returned (parallel execution at this stage).
Clause evaluation order on a single rule: WHEN, PRE-SET, SELECT, POST-SET, WITH, PRIORITY, EXEC. PRE-SET, SELECT, and POST-SET control the event payload passed to dispatch; WITH carries per-call metadata; PRIORITY tie-breaks matching rules; EXEC is the dispatch target.
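The priority rule applied after WHEN matching can be sketched in isolation. The `op` type and `selectByPriority` are hypothetical; keeping the first op on a priority tie is an assumption of this sketch, since the role of hashSeed in tie-breaking is not described here.

```go
package main

import "fmt"

// op is a hypothetical, trimmed-down stand-in for operation.Operation.
type op struct {
	Name     string
	Priority int
}

// selectByPriority takes ops that already matched their WHEN clause.
// If any has Priority > 0, only the highest-priority op is returned;
// otherwise every matching op is returned for parallel execution.
// Assumption: on equal priority the first one encountered wins.
func selectByPriority(matched []op) []op {
	best := -1
	for i, candidate := range matched {
		if candidate.Priority > 0 && (best < 0 || candidate.Priority > matched[best].Priority) {
			best = i
		}
	}
	if best < 0 {
		return matched
	}
	return []op{matched[best]}
}

func main() {
	fmt.Println(selectByPriority([]op{{"a", 0}, {"b", 5}, {"c", 2}})) // [{b 5}]
	fmt.Println(len(selectByPriority([]op{{"a", 0}, {"b", 0}})))      // 2
}
```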
func (*Unit) Resume ¶
Resume merges all op-terminal outputs for (runID, stage) in ascending ordinal onto the stage's scope-entry envelope, then drives the opstack forward via advanceAfterScope (the same engine the sync path uses). Called by the winning continuation callback. Writes result.json on terminal completion; if the pipeline re-suspends at a later async barrier nothing is written here — that stage's callbacks drive it. A failed sibling op fails the stage (design: a failed sibling fails the scope). Runs in the caller's ctx, not the dead request ctx.
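The merge step can be sketched as an ordinal-ordered fold over op outputs. This is an illustrative model only: `opResult` and `mergeStage` are hypothetical, merging is done on top-level keys with later ordinals overwriting earlier ones (an assumption), and the continuation store, claim logic, and stack advancement are left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// opResult is a hypothetical record of one op's terminal output.
type opResult struct {
	Ordinal int
	Output  string // JSON object produced by the op
	Failed  bool
}

// mergeStage folds outputs onto the scope-entry envelope in ascending ordinal.
// Any failed sibling fails the whole stage.
func mergeStage(entry string, results []opResult) (string, error) {
	doc := map[string]any{}
	if err := json.Unmarshal([]byte(entry), &doc); err != nil {
		return "", err
	}
	if doc == nil { // entry was JSON null
		doc = map[string]any{}
	}
	sorted := append([]opResult(nil), results...) // leave the caller's slice untouched
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Ordinal < sorted[j].Ordinal })
	for _, r := range sorted {
		if r.Failed {
			return "", fmt.Errorf("op at ordinal %d failed; stage fails", r.Ordinal)
		}
		var out map[string]any
		if err := json.Unmarshal([]byte(r.Output), &out); err != nil {
			return "", err
		}
		for key, val := range out {
			doc[key] = val
		}
	}
	merged, err := json.Marshal(doc)
	if err != nil {
		return "", err
	}
	return string(merged), nil
}

func main() {
	out, err := mergeStage(`{"a":1}`, []opResult{
		{Ordinal: 2, Output: `{"x":"late"}`},
		{Ordinal: 1, Output: `{"x":"early","b":2}`},
	})
	fmt.Println(out, err) // {"a":1,"b":2,"x":"late"} <nil>
}
```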
func (*Unit) Run ¶
Run executes a request.
Snapshot semantics: the first call to Run captures the current dbcache in-memory *sql.DB and attaches it to ctx. All subsequent recursive Run calls (for goto / next-stage advancement within the same request) reuse that snapshot. A `txco apply` that completes mid-request swaps pu.Dbc.Db on the chassis side but leaves the captured pointer untouched — Go's GC keeps the old in-memory DB alive while any request holds a reference. So the opstack a request sees at stage 0 is the same opstack it sees at every later stage in the same request.
Tests that drive Run directly without a snapshot in ctx still work: OpsForStage falls back to pu.Dbc.Db when the ctx key is absent.