plan

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 27, 2026 | License: MIT | Imports: 18 | Imported by: 0

Documentation

Overview

Package plan holds Prism's logical DAG: the Node interface every node type satisfies, the immutable DAG type with structural-sharing optimizer passes, the sequential / bounded-pool executor, and the DOT / text / JSON renderers used by `prism plan`.

P02 shipped only the minimal Node shape needed by SourceNode (no Schema method) and an optional SchemaProbe capability. P03 widens Node to the canonical interface from design/05-dag-executor.md so every node — including the twelve P03 stubs — can declare its output schema without executing. SourceNode satisfies the new Schema(in) hook by delegating to its existing OutputSchema().

Decision: see D028 — Node.Schema(in) is required, not optional.

Constants

This section is empty.

Variables

var DefaultPasses []Pass

DefaultPasses is the canonical pass list the executor would run if given no override. P07 populates it with the 5 passes from design/05-dag-executor.md, ordered per D047 (semantics-preserving passes first, sampling last). The slice is intentionally NOT pre-populated here — passes live in plan/passes/, which imports plan/, so registration happens via plan.SetDefaultPasses called from a small init shim in plan/passes/register.go. Optimize works correctly against either an empty or populated slice; tests cover both shapes.

Functions

func CacheKey

func CacheKey(n Node, ins []*table.Table) string

CacheKey is the deterministic cache key for one node execution. Computed as sha256 of (NodeID, Fingerprint, each input table's Hash()), hex-encoded. Two nodes with the same fingerprint but different IDs produce different keys because the ID is part of the digest — so the cache cannot accidentally share a result between two visually-equivalent nodes that the optimizer has chosen to keep distinct.
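The digest described above can be sketched as follows. This is an illustration, not the package's code: `cacheKey` is a hypothetical stand-in that takes the NodeID, the Fingerprint() string and each input's Hash() as plain strings, and the NUL separator between components is an assumption (the real byte encoding is not documented here).

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// cacheKey is a hypothetical stand-in for plan.CacheKey. It hashes the node
// ID, the node fingerprint and every input table hash, in that order.
func cacheKey(id, fingerprint string, inputHashes []string) string {
	h := sha256.New()
	// Assumption: a NUL byte terminates each component so that
	// ("ab", "c") and ("a", "bc") cannot produce the same digest.
	h.Write([]byte(id))
	h.Write([]byte{0})
	h.Write([]byte(fingerprint))
	h.Write([]byte{0})
	for _, ih := range inputHashes {
		h.Write([]byte(ih))
		h.Write([]byte{0})
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := cacheKey("filter_1", "filter:score>50", []string{"abc123"})
	b := cacheKey("filter_2", "filter:score>50", []string{"abc123"})
	fmt.Println(a != b) // true: same fingerprint, different IDs
	fmt.Println(len(a)) // 64: hex-encoded SHA-256 digest
}
```

Because the ID is hashed alongside the fingerprint, two structurally identical nodes that the optimizer keeps separate never share a cache slot.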

func RenderDOT

func RenderDOT(d *DAG, w io.Writer) error

RenderDOT emits a Graphviz-compatible representation of d to w.

Output shape:

digraph prism_plan {
  rankdir=LR;
  node [shape=box, style=rounded];
  "<id>" [label="<kind>\n<shortid>\n<summary>"];
  "<from>" -> "<to>";
}

Nodes and edges sort deterministically (lexicographic on NodeID and from/to pair respectively) so re-rendering identical DAGs produces byte-identical output — required for golden tests.

Approximate footprint: ~50 LOC per design/05-dag-executor.md.

func RenderJSON

func RenderJSON(d *DAG, w io.Writer) error

RenderJSON emits a stable JSON serialisation of d to w. The output is consumed by goldens (plan visualisation regression tests, future optimizer-pass before/after snapshots) so the field set and ordering are part of the contract — change with care.

func RenderText

func RenderText(d *DAG, w io.Writer) error

RenderText emits an indented, tree-style listing of d to w rooted at each sink, walking inputs depth-first. Two-space indent per depth level. Intended for terminal output — concise, copy-paste friendly.

func SetDefaultPasses

func SetDefaultPasses(ps []Pass)

SetDefaultPasses installs the canonical pass list. Called by the plan/passes package's init function. Exposed so external consumers can substitute a custom list before invoking Optimize.

Types

type Backend

type Backend interface {
	// Compile executes one node against its materialised input tables
	// and returns the resulting output table. ctx propagation is
	// best-effort — impls may honour cancellation between rows or only
	// at op boundaries.
	Compile(ctx context.Context, node Node, ins []*table.Table) (*table.Table, error)
}

Backend is the contract the compiler uses to execute one DAG node against its materialised input tables. plan/nodes never call into Pulse (or any specific compute engine) directly: they route Execute through whichever Backend the builder injected. Concrete impls live in compile/ (the in-memory backend ships in P04; future Pulse / DuckDB / Arrow backends drop in behind the same interface).

The interface lives in plan/ — not compile/ — because every plan node consumes it. Inverting the layering would force plan/nodes to import compile/ and risk an import cycle (compile/ already imports plan/ for the Node interface). See D032.

Nodes with no injected backend fall back to returning PRISM_COMPILE_001, preserving P03's stub semantics (see D033 for the injection mechanism).

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder constructs a DAG. AddNode + MarkRoot + MarkSink populate state; Build validates the result and produces an immutable *DAG.

The Builder is intentionally narrow — no Edge() method, because edges are already implicit in each Node's Inputs(). Build verifies every input target exists in the node set and at least one sink exists.

func NewBuilder

func NewBuilder() *Builder

NewBuilder returns an empty Builder.

func (*Builder) AddNode

func (b *Builder) AddNode(n Node) error

AddNode registers n. Returns an error if a node with the same id is already present (duplicates are always a bug in the spec → DAG translator).

func (*Builder) AddNodeUnchecked

func (b *Builder) AddNodeUnchecked(n Node)

AddNodeUnchecked is identical to AddNode minus duplicate detection. Test-only.

func (*Builder) Build

func (b *Builder) Build() (*DAG, error)

Build validates the staged graph and returns the immutable *DAG.

Validation:

  • Every node's Inputs() target must exist in the node set.
  • Every root node must have zero Inputs.
  • At least one sink must be marked.

Topology (cycles) is checked separately by DAG.TopoLevels, so callers can still build a graph that contains a cycle and inspect it with the debug renderers.
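The three checks can be sketched as a standalone function. This is a hypothetical illustration over a simplified `map[id]node` representation, not Builder's actual code; note that it deliberately accepts cyclic graphs, leaving them to TopoLevels.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// node is a hypothetical stand-in exposing only Inputs().
type node struct{ Inputs []string }

// validate mirrors the documented Build checks. Cycles are not detected here.
func validate(nodes map[string]node, roots, sinks []string) error {
	ids := make([]string, 0, len(nodes))
	for id := range nodes {
		ids = append(ids, id)
	}
	sort.Strings(ids) // deterministic error reporting
	// 1. Every input target must exist in the node set.
	for _, id := range ids {
		for _, in := range nodes[id].Inputs {
			if _, ok := nodes[in]; !ok {
				return fmt.Errorf("node %q: input %q is not in the node set", id, in)
			}
		}
	}
	// 2. Every root must have zero inputs.
	for _, r := range roots {
		if len(nodes[r].Inputs) != 0 {
			return fmt.Errorf("root %q has %d inputs", r, len(nodes[r].Inputs))
		}
	}
	// 3. At least one sink must be marked.
	if len(sinks) == 0 {
		return errors.New("no sink marked")
	}
	return nil
}

func main() {
	cyclic := map[string]node{"a": {Inputs: []string{"b"}}, "b": {Inputs: []string{"a"}}}
	fmt.Println(validate(cyclic, nil, []string{"a"})) // passes: cycles are TopoLevels' job
	fmt.Println(validate(cyclic, nil, nil))
}
```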

func (*Builder) BuildUnchecked

func (b *Builder) BuildUnchecked() *DAG

BuildUnchecked skips input/root validation so tests can hand-build cycles and assert TopoLevels detects them. Test-only.

func (*Builder) MarkRoot

func (b *Builder) MarkRoot(id NodeID) error

MarkRoot declares id as a source node. Build verifies the node has no Inputs(); the marker is only for fast iteration (Roots()).

func (*Builder) MarkSink

func (b *Builder) MarkSink(id NodeID) error

MarkSink declares id as a terminal node. Build verifies at least one sink is registered.

func (*Builder) NodeIDs

func (b *Builder) NodeIDs() map[NodeID]struct{}

NodeIDs returns a copy of the set of currently-registered ids. Used by the build subpackage to detect "the newest leaf" after a dataset registration.

type ChildDAG

type ChildDAG struct {
	// DAG is the sub-plan for this child (one layer or one panel).
	DAG *DAG
	// Tip is the sub-plan's sole sink — the table the encoder reads.
	Tip NodeID
	// Spec is the merged child spec (parent datasets / data already
	// folded in by BuildComposite).
	Spec *spec.Spec
}

ChildDAG carries one child's plan + the encoder-facing spec. The spec is forwarded so the encoder can read the child's mark, encoding, title etc. without re-walking the parent.

type CompositeDAG

type CompositeDAG struct {
	Kind     CompositeKind
	Rows     int
	Cols     int
	Children []ChildDAG
	Resolve  *spec.Resolve
}

CompositeDAG is the plan-stage representation of a composite spec (layer / concat / hconcat / vconcat). Per D049 + D050 each child owns its own sub-DAG; the executor handles each child independently via the existing plan.Execute entry point.

Layout metadata (Rows, Cols) is normalised by BuildComposite based on the composition kind:

  • Layer: Rows=1, Cols=1 (cells flatten into one Scene's layers).
  • HConcat: Rows=1, Cols=len(Children).
  • VConcat: Rows=len(Children), Cols=1.
  • Concat: treated as HConcat in v1 (D053).
  • Facet: Rows=0, Cols=0 placeholders (D054); the encoder fills in concrete dimensions after partitioning the upstream table.
  • Repeat: Rows=len(repeat.Row) or 1, Cols=len(repeat.Column) or 1 (D056); per-cell sub-DAGs land in Children in row-major order.
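The normalisation rules above can be summarised as a small function. `gridShape` and `cellIndex` are hypothetical helpers written for illustration; `repeatRows` and `repeatCols` stand in for len(repeat.Row) and len(repeat.Column), and the constants are redeclared locally so the sketch compiles on its own.

```go
package main

import "fmt"

type CompositeKind string

const (
	CompositeLayer   CompositeKind = "layer"
	CompositeConcat  CompositeKind = "concat"
	CompositeHConcat CompositeKind = "hconcat"
	CompositeVConcat CompositeKind = "vconcat"
	CompositeFacet   CompositeKind = "facet"
	CompositeRepeat  CompositeKind = "repeat"
)

// gridShape returns (Rows, Cols) following the documented per-kind rules.
func gridShape(kind CompositeKind, children, repeatRows, repeatCols int) (rows, cols int) {
	switch kind {
	case CompositeLayer:
		return 1, 1
	case CompositeHConcat, CompositeConcat: // Concat behaves as HConcat in v1 (D053)
		return 1, children
	case CompositeVConcat:
		return children, 1
	case CompositeRepeat:
		rows, cols = repeatRows, repeatCols
		if rows == 0 {
			rows = 1
		}
		if cols == 0 {
			cols = 1
		}
		return rows, cols
	default: // CompositeFacet: 0x0 placeholders the encoder fills in later
		return 0, 0
	}
}

// cellIndex maps a repeat cell to its slot in Children (row-major order).
func cellIndex(row, col, cols int) int { return row*cols + col }

func main() {
	r, c := gridShape(CompositeRepeat, 6, 2, 3)
	fmt.Println(r, c, cellIndex(1, 2, c)) // 2 3 5
}
```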

Resolve carries cross-layer scale / axis resolution and is only meaningful when Kind == CompositeLayer; concat ignores it (cross-panel shared scales arrive with facet in P09).

type CompositeKind

type CompositeKind string

CompositeKind discriminates the six composition primitives Prism supports as of P09. Selection-rooted composition stays deferred to P13.

const (
	// CompositeLayer is `spec.Layer[]` — N layers in one chart with
	// (optional) cross-layer scale resolution.
	CompositeLayer CompositeKind = "layer"
	// CompositeConcat is `spec.Concat[]` — flat array of side-by-side
	// panels. In v1 this is functionally identical to CompositeHConcat
	// (D053); the `columns` wrap parameter lands in a future phase.
	CompositeConcat CompositeKind = "concat"
	// CompositeHConcat is `spec.HConcat[]` — 1 row × N cols.
	CompositeHConcat CompositeKind = "hconcat"
	// CompositeVConcat is `spec.VConcat[]` — N rows × 1 col.
	CompositeVConcat CompositeKind = "vconcat"
	// CompositeFacet is `spec.Facet{row, column}` — small multiples
	// driven by distinct values of the row / column field(s) (D054).
	// The builder returns a single shared sub-DAG (the parent's
	// pipeline); the encoder partitions the resulting Table by
	// `(row_value, col_value)` tuples and emits one SceneCell per
	// partition. `len(Children) == 1` is the convention "single
	// pipeline, encoder fans out" for facet.
	CompositeFacet CompositeKind = "facet"
	// CompositeRepeat is `spec.Repeat{row, column}` — small multiples
	// driven by a field-list. Each cell substitutes its field name
	// into the child spec and builds an independent sub-DAG (D056),
	// so `len(Children) == rows * cols` for repeat.
	CompositeRepeat CompositeKind = "repeat"
)

type DAG

type DAG struct {
	// contains filtered or unexported fields
}

DAG is the immutable plan graph. Constructed via Builder; mutated only by optimizer passes that return new DAG instances with structural sharing (D017). All public accessors return sorted / stable views so downstream code (renderers, executor, tests) sees deterministic ordering across runs.

func Optimize

func Optimize(d *DAG, passes []Pass) (*DAG, error)

Optimize runs the pass list to a fixed point: the loop repeats until no pass mutates the DAG. Each iteration runs every pass once; passes can re-enable each other across iterations.

A bounded iteration cap guards against pathological pass interactions. Hitting the cap returns a generic error, not an AppError code — it indicates a developer bug, not a user-visible fault. Future profiling could surface this as a debug-only metric.

func (*DAG) Dependents

func (d *DAG) Dependents(id NodeID) []NodeID

Dependents returns the ids of nodes whose Inputs() include id, in stable sorted order. O(N) over the node set; fine for the small DAGs Prism builds. P07 may cache this if profiling motivates.

func (*DAG) Node

func (d *DAG) Node(id NodeID) (Node, bool)

Node looks up a node by id. The second return is false when no such node exists in the DAG.

func (*DAG) Nodes

func (d *DAG) Nodes() []NodeID

Nodes returns every node id in the DAG, sorted lexicographically. Determinism is important for goldens; using map iteration order would produce flaky DOT/JSON output.

func (*DAG) Roots

func (d *DAG) Roots() []NodeID

Roots returns the source node ids (no upstream). Sorted.

func (*DAG) Sinks

func (d *DAG) Sinks() []NodeID

Sinks returns the terminal node ids (the ones the Scene encoder reads in P05+). Sorted.

func (*DAG) Size

func (d *DAG) Size() int

Size returns the node count. Cheap; useful in tests and metrics.

func (*DAG) TopoLevels

func (d *DAG) TopoLevels() ([][]NodeID, error)

TopoLevels runs Kahn's algorithm over the DAG and returns the result as a slice of levels. Each level is a slice of NodeIDs whose upstream dependencies have all been scheduled in earlier levels; the executor consumes one level at a time.

Within a level, IDs are sorted lexicographically so test goldens stay deterministic. Across runs, the level shape is identical given identical inputs.

On a cyclic graph (Kahn cannot schedule every node), returns a PRISM_PLAN_001 AppError whose context carries one representative id from the cycle and the count of unscheduled nodes.

func (*DAG) WithNode

func (d *DAG) WithNode(n Node) *DAG

WithNode returns a new DAG with n added or replaced. Roots/sinks are copied verbatim; the caller is responsible for re-marking roots and sinks if the structure change requires it. All other node pointers are shared (structural sharing per D017).
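Structural sharing here means copying the id-to-pointer table, not the nodes themselves. The sketch below assumes a hypothetical DAG layout (`map[string]*Node` plus root and sink slices); the real struct's fields are unexported and undocumented.

```go
package main

import "fmt"

// Node and DAG are hypothetical stand-ins for the unexported plan types.
type Node struct{ ID, Op string }

type DAG struct {
	nodes map[string]*Node
	roots []string
	sinks []string
}

// WithNode returns a new DAG with n added or replaced. Only the map and the
// root/sink slices are copied; every other *Node is shared with d.
func (d *DAG) WithNode(n *Node) *DAG {
	nodes := make(map[string]*Node, len(d.nodes)+1)
	for id, p := range d.nodes {
		nodes[id] = p // same pointer as the old DAG
	}
	nodes[n.ID] = n
	return &DAG{
		nodes: nodes,
		roots: append([]string(nil), d.roots...), // copied verbatim
		sinks: append([]string(nil), d.sinks...),
	}
}

func main() {
	src := &Node{ID: "src", Op: "read"}
	d1 := &DAG{
		nodes: map[string]*Node{"src": src, "f": {ID: "f", Op: "filter a"}},
		roots: []string{"src"},
		sinks: []string{"f"},
	}
	d2 := d1.WithNode(&Node{ID: "f", Op: "filter b"})
	fmt.Println(d1.nodes["f"].Op, "|", d2.nodes["f"].Op) // filter a | filter b
	fmt.Println(d1.nodes["src"] == d2.nodes["src"])       // true: shared
}
```

Copying the map costs O(N) pointer copies per edit, which is cheap for the small DAGs Prism builds and keeps every earlier DAG valid for before/after comparisons.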

func (*DAG) WithRootAdded

func (d *DAG) WithRootAdded(id NodeID) *DAG

WithRootAdded returns a new DAG with id appended to the root list (idempotent — adding an id that is already a root is a no-op). Optimizer passes that replace a source-rooted subtree call this on the new replacement node so it inherits the original source's root marker.

func (*DAG) WithSinkAdded

func (d *DAG) WithSinkAdded(id NodeID) *DAG

WithSinkAdded returns a new DAG with id appended to the sink list (idempotent — adding an id that is already a sink is a no-op). Passes that collapse a chain whose tail node was a sink call this so the replacement node terminates the graph in the tail's place.

func (*DAG) WithoutNode

func (d *DAG) WithoutNode(id NodeID) *DAG

WithoutNode returns a new DAG with id removed. If id was a root or sink, it is removed from those lists too.

type ExecOpts

type ExecOpts struct {
	// Workers is the upper bound on goroutines per level. 0 = use
	// PRISM_QUERY_WORKERS if it is set to a positive value, else
	// runtime.NumCPU(); 1 = strictly sequential.
	Workers        int
	Cache          TableCache
	JoinMaxRows    int
	PerNodeTimeout time.Duration
	AbortOnError   bool
	OnNodeStart    func(NodeID)
	OnNodeDone     func(NodeID, time.Duration, error)
}

ExecOpts controls one Execute call. Zero values are sensible: Workers=0 consults PRISM_QUERY_WORKERS env, then falls back to runtime.NumCPU(); Cache=nil disables memoization; PerNodeTimeout=0 disables per-node timeouts; AbortOnError=false is partial-failure mode.

type ExecResult

type ExecResult struct {
	Tables  map[NodeID]*table.Table
	Errors  []NodeError
	Elapsed time.Duration
}

ExecResult carries the per-node tables that successfully materialised plus per-failed-node errors.

func Execute

func Execute(ctx context.Context, d *DAG, opts ExecOpts) (*ExecResult, error)

Execute runs d through the executor with the given options.

Workers resolution order (P07):

  1. ExecOpts.Workers > 0 → use as-is.
  2. ExecOpts.Workers == 0 AND PRISM_QUERY_WORKERS > 0 → env wins.
  3. Otherwise → runtime.NumCPU().

Workers == 1 is the sequential path (P03 contract).

Partial-failure policy (D006): a node whose Execute returns an error leaves its dependents un-runnable (the inputsReady check detects the missing table); sibling paths continue. AbortOnError switches to fail-fast: Execute returns the first NodeError immediately.

ctx cancellation is honoured at level boundaries.

type LRU

type LRU struct {
	// contains filtered or unexported fields
}

LRU is a thread-safe bounded TableCache backed by a doubly-linked list + map. Eviction is least-recently-used: each Get/Put moves the touched entry to the front; capacity overflow drops the tail.

Capacity defaults to limits.DefaultTableCacheSize when caller passes a non-positive value to NewLRU. Hits/Misses counters power the executor-level cache-hit test (see TestPrismTableCacheHit in plan/cache_test.go).

func NewLRU

func NewLRU(capacity int) *LRU

NewLRU constructs a bounded LRU. A capacity <= 0 falls back to limits.DefaultTableCacheSize, whose value honours an environment-variable override.

func (*LRU) Capacity

func (c *LRU) Capacity() int

Capacity returns the configured capacity. Test-only.

func (*LRU) Get

func (c *LRU) Get(key string) (*table.Table, bool)

Get returns the cached table for key, moving the entry to the front of the recency list. Hit / miss counters increment per call.

func (*LRU) Hits

func (c *LRU) Hits() int64

Hits returns the cumulative cache-hit count since construction. Atomic read so callers can sample from any goroutine.

func (*LRU) Len

func (c *LRU) Len() int

Len returns the current number of entries. Test-only.

func (*LRU) Misses

func (c *LRU) Misses() int64

Misses returns the cumulative cache-miss count since construction.

func (*LRU) Put

func (c *LRU) Put(key string, t *table.Table)

Put inserts (or updates) key → t at the front of the recency list. Inserting a new key past capacity evicts the tail entry; updating an existing key replaces its value in place and does not change the entry count.

type Labeled

type Labeled interface {
	// Kind returns a short type name like "FilterNode" or "JoinNode".
	Kind() string
	// Summary returns a one-line parameter summary used in DOT labels
	// (e.g. "expr: score > 50"). Empty is acceptable.
	Summary() string
}

Labeled is the optional capability a Node implements when it wants the renderers to print something richer than its Go type name. When a node does not satisfy this interface, the renderers fall back to its Go type name, obtained via reflection.

type Node

type Node interface {
	// ID returns the stable identifier for this node.
	ID() NodeID
	// Inputs returns the upstream NodeIDs this node depends on. A
	// SourceNode returns nil (it has no upstream).
	Inputs() []NodeID
	// Schema returns the node's output schema given its inputs'
	// schemas. Nodes that cannot compute their schema without
	// execution data (rare; Pivot is the only P03 example) return
	// the first input schema and document the gap.
	Schema(in []*encoding.Schema) (*encoding.Schema, error)
	// Execute runs the node and returns the materialised Table. The
	// `in` slice carries upstream Tables in declaration order; for a
	// SourceNode it is always nil. Stubbed P03 nodes return
	// PRISM_COMPILE_001 here.
	Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
	// Fingerprint returns a deterministic string capturing this node's
	// identity (op + parameters) used as a cache key component.
	Fingerprint() string
}

Node is the contract every DAG node satisfies. It is the only interface the executor knows about: schedulers, optimizer passes, renderers, and the cache key builder all consume Nodes through this surface.

Schema(in) lets callers reason about a node's output shape without executing it. This is required for DAG visualisation, for optimizer-pass eligibility, and for the stubbed P03 nodes whose Execute bodies return PRISM_COMPILE_001 until P04. The `in` slice carries upstream schemas in declaration order; for nodes with no upstream (SourceNode), callers pass nil.

Fingerprint is a deterministic string capturing this node's identity (op + parameters). The cache key builder combines it with each input Table's Hash() to form a content-addressable cache key.

type NodeError

type NodeError struct {
	Node NodeID
	Code string
	Err  error
}

NodeError is the per-failed-node detail entry.

func (*NodeError) Error

func (e *NodeError) Error() string

Error implements the error interface.

func (*NodeError) Unwrap

func (e *NodeError) Unwrap() error

Unwrap returns the inner error so errors.As / errors.Is work.

type NodeID

type NodeID string

NodeID is the stable identifier for one DAG node. Implementations choose the format (path basename, sha hash, monotonic counter); the executor only needs equality semantics. IDs are case-sensitive and must be unique within a single DAG.

type Pass

type Pass interface {
	Name() string
	Apply(*DAG) (*DAG, bool, error)
}

Pass is one optimizer transformation. Apply returns the (possibly new) DAG, a flag indicating whether anything changed, and an error. Passes MUST NOT mutate the input DAG — they return a fresh *DAG with structural sharing (WithNode / WithoutNode) for the changes they introduce.

type TableCache

type TableCache interface {
	Get(key string) (*table.Table, bool)
	Put(key string, t *table.Table)
}

TableCache is the interface every Prism table cache satisfies. P03 ships the interface only; the LRU implementation lands in P07. ExecOpts.Cache stays nil in P03.

Directories

Path     Synopsis
build    Package build translates a *spec.Spec into a *plan.DAG.
nodes    Package nodes holds the DAG node implementations.
passes   Package passes holds the optimizer passes.
