pulse

package module
v0.8.4
Published: May 19, 2026 License: MIT Imports: 23 Imported by: 0

README

Pulse

High-performance, self-describing tabular data processing engine written in Go. Ships as a Go library (github.com/frankbardon/pulse) and a CLI binary (cmd/pulse/). Library is primary; CLI is a thin adapter.

Pulse reads and writes .pulse files — a compact binary format with an inline schema, categorical dictionaries, and per-field descriptions. Import from CSV, TSV, NDJSON, JSON array, Parquet, Apache Arrow IPC, or Excel; run aggregations, filters, groupers, windows, features, statistical tests, and regressions; export back to any supported format.

LLM-native by design: every command supports --json, every operator declares its accepted types and streamability in the manifest, an embedded skill pack teaches agents how to operate Pulse, an embedded request-example library gives them runnable templates, and pulse mcp serves the full surface over Model Context Protocol.

Installation

From source
git clone https://github.com/frankbardon/pulse.git
cd pulse
make build
# Binary at ./bin/pulse
Go install
go install github.com/frankbardon/pulse/cmd/pulse@latest

Requires Go 1.22+.

Quickstart

Import a CSV into a .pulse file
pulse import csv --input data.csv --output data.pulse

Schema is inferred from a sample of rows (default 500). To supply an explicit schema:

# Generate a schema template from your data
pulse import schema-template data.csv > schema.json

# Edit schema.json — adjust types, add descriptions
pulse import csv --input data.csv --schema schema.json --output data.pulse
Inspect the file
pulse cohort inspect data.pulse --json

Returns field names, types, byte offsets, descriptions, and categorical dictionaries (truncated by default; pass --full-dict for the full mapping).

Run an aggregation

Create a request file (request.json):

{
  "cohort": {"filename": "data.pulse"},
  "aggregations": [
    {"type": "AGG_COUNT", "field": "id", "label": "total"},
    {"type": "AGG_AVERAGE", "field": "score", "label": "avg_score"}
  ]
}
pulse api process --request request.json --json
Filter, group, and aggregate
{
  "cohort": {"filename": "data.pulse"},
  "filterers": [
    {"type": "FILTER_RANGE", "field": "score", "values": ["80", "100"]}
  ],
  "groups": [
    {"type": "GROUP_CATEGORY", "field": "brand"}
  ],
  "aggregations": [
    {"type": "AGG_COUNT", "field": "id", "label": "count"},
    {"type": "AGG_AVERAGE", "field": "score", "label": "avg_score"},
    {"type": "AGG_STDDEV", "field": "score", "label": "stddev"}
  ]
}
Smart defaults

If you name a field but omit type, Pulse fills in a sensible operator from the schema type — AGG_SUM for numerics, AGG_FREQUENCY for categoricals, GROUP_RANGE (interval 10) for numerics, GROUP_CATEGORY for categoricals, GROUP_DATE ("day") for dates. Disable with --no-defaults or pulse.Options{DisableDefaults: true}. The full rule table lives in descriptor/defaults.go.
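To illustrate those rules, the sketch below encodes them as a plain lookup from a coarse field kind and a request slot to a default operator. The Kind, Slot, and Default types and the defaultFor function are hypothetical and exist only for this example. descriptor/defaults.go remains the real rule table, and it may distinguish more schema types than the three kinds used here.

```go
package main

import "fmt"

// Kind is a hypothetical coarse classification of a schema field type.
type Kind int

const (
	Numeric Kind = iota
	Categorical
	Date
)

// Slot is the request slot the operator is being defaulted for.
type Slot int

const (
	AggregationSlot Slot = iota
	GroupSlot
)

// Default is a hypothetical filled-in operator: its type name plus the
// parameter the rule supplies (interval for GROUP_RANGE, unit for GROUP_DATE).
type Default struct {
	Type  string
	Param string
}

// defaultFor mirrors the rules described above. It returns false when no
// default applies, in which case the request needs an explicit type.
func defaultFor(slot Slot, kind Kind) (Default, bool) {
	switch slot {
	case AggregationSlot:
		switch kind {
		case Numeric:
			return Default{Type: "AGG_SUM"}, true
		case Categorical:
			return Default{Type: "AGG_FREQUENCY"}, true
		}
	case GroupSlot:
		switch kind {
		case Numeric:
			return Default{Type: "GROUP_RANGE", Param: "10"}, true
		case Categorical:
			return Default{Type: "GROUP_CATEGORY"}, true
		case Date:
			return Default{Type: "GROUP_DATE", Param: "day"}, true
		}
	}
	return Default{}, false
}

func main() {
	d, _ := defaultFor(GroupSlot, Date)
	fmt.Println(d.Type, d.Param) // GROUP_DATE day
}
```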

Validate before executing
pulse api predict --request request.json --json

Returns the proposed schema, applied defaults, streamability, warnings (e.g., numeric aggregation on a categorical field), and structural errors without touching record data. Pass --strict to treat warnings as errors.

Streaming

For requests Pulse can stream (single-pass aggregations on non-decimal fields, no buffered ops), use --stream to get NDJSON output, one row per line:

pulse api process --request request.json --stream
pulse api compose --request batch.json --stream

Library equivalent: pulse.ProcessStream(ctx, req) returns a RowIter.

Parallel compose
pulse api compose --request batch.json --parallel 4 --json

Library equivalent: pulse.ComposeParallel(ctx, req, pulse.ComposeOptions{MaxWorkers: 4}). Results come back in slot order regardless of completion order. By default the first failure cancels the remaining work; --no-fail-fast collects all errors instead.

Natural-language ask

Beta — do not trust yet. The natural-language query path (pulse api ask --query, pulse_ask with query) is experimental. Parsing is heuristic, coverage is partial, and silent misinterpretation is possible. Inspect the resolved request before relying on the result, and prefer a structured request for production workloads.

pulse api ask --file data.pulse --query "average revenue by region, top 5"

Parses the query against the schema, validates, and executes in one round trip. Pass --predict to validate without executing, or --on-invalid suggest to get structured fixup hints instead of an error.

Export and convert
pulse export csv --input data.pulse --output results.csv
pulse export parquet --input data.pulse --output results.parquet

pulse convert data.csv data.parquet
pulse convert data.xlsx output.tsv --schema schema.json

Formats are auto-detected from file extensions; override them with --from / --to. convert does not write an intermediate .pulse file unless you pass --keep-pulse PATH.

Sample rows / facet a field
pulse api sample --input data.pulse --count 10
pulse api facet --input data.pulse --field brand
Synthetic data

Generate a deterministic synthetic cohort from a schema spec or from a previously-captured profile:

pulse synth from-schema --spec spec.json --output synth.pulse --seed 42
pulse profile create --input real.pulse --output profile.json --include-correlations
pulse synth from-profile --profile profile.json --output synth.pulse --rows 100000 --seed 42

Supports 12 distributions (normal, lognormal, poisson, exponential, pareto, bernoulli, weighted_categorical, regex, uniform, uniform_date, monotonic_from, constant), pairwise correlations, and value constraints. See the synthetic-data skill.
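"Deterministic" means that the same spec (or profile) and the same --seed reproduce the same rows. As a sketch of one listed distribution, weighted_categorical, here is a seeded sampler built on math/rand. The function is hypothetical, and Pulse's own sampler and random source may differ, so this code will not reproduce Pulse's actual output for a given seed.

```go
package main

import (
	"fmt"
	"math/rand"
)

// weightedCategorical draws n labels with the given relative weights,
// using a seeded source so the output is reproducible.
func weightedCategorical(labels []string, weights []float64, n int, seed int64) []string {
	rng := rand.New(rand.NewSource(seed))
	total := 0.0
	for _, w := range weights {
		total += w
	}
	out := make([]string, n)
	for i := range out {
		// Pick a point in [0, total) and walk the cumulative weights.
		x := rng.Float64() * total
		j := 0
		for ; j < len(weights)-1; j++ {
			if x < weights[j] {
				break
			}
			x -= weights[j]
		}
		out[i] = labels[j]
	}
	return out
}

func main() {
	labels := []string{"red", "green", "blue"}
	weights := []float64{0.6, 0.3, 0.1}
	a := weightedCategorical(labels, weights, 5, 42)
	b := weightedCategorical(labels, weights, 5, 42)
	fmt.Println(a, b) // identical slices: same seed, same draws
}
```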

CLI Reference

pulse
├── --json [--slim]                       Root manifest (self-description)
├── import
│   ├── csv|tsv|ndjson|jsonarray|         --input FILE --output FILE [--schema FILE]
│   │   parquet|arrow|excel
│   ├── auto SOURCE                       Managed import (auto-detect format)
│   ├── list                              List managed import handles
│   ├── drop HANDLE                       Drop a managed handle
│   ├── predict                           --input FILE [--schema FILE] --json
│   └── schema-template INPUT             Generate editable schema from source
├── export
│   ├── csv|tsv|ndjson|jsonarray|         --input FILE --output FILE
│   │   parquet|arrow|excel
│   └── predict                           --input FILE --format FORMAT --json
├── convert INPUT OUTPUT [--from F] [--to F] [--schema FILE] [--keep-pulse PATH]
│   └── predict INPUT OUTPUT --json
├── cohort
│   ├── inspect PATH [--json] [--full-dict]
│   └── filter --input FILE --output FILE --filter EXPR
├── api
│   ├── process --request FILE [--json] [--stream] [--no-defaults]
│   ├── compose --request FILE [--json] [--stream] [--parallel N] [--no-fail-fast]
│   ├── sample --input FILE --count N
│   ├── facet --input FILE --field NAME
│   ├── predict --request FILE --json [--strict]
│   └── ask [--file F] [--query Q] [--request FILE] [--predict] [--on-invalid suggest]
├── synth
│   ├── from-schema --spec FILE --output FILE [--rows N] [--seed N]
│   └── from-profile --profile FILE --output FILE --rows N [--seed N]
├── profile
│   └── create --input FILE --output FILE [--top-k N] [--include-correlations]
├── skills
│   ├── list [--json]
│   └── show NAME
├── examples
│   ├── search [--query Q] [--tag T ...] [--category C] [--json]
│   └── show NAME [--json]
├── errors
│   ├── lookup CODE [--json]
│   └── list [--domain D] [--query Q] [--json]
└── mcp [--data-dir DIR] [--bind-on-open]   Serve MCP over stdio

Every leaf supports --json for output wrapped in a descriptor.Envelope with format_version: "1.0", data, errors, and warnings.

MCP Usage

Pulse ships an MCP (Model Context Protocol) server that exposes the full library surface to AI clients (Claude Desktop, Claude Code, Cursor, any MCP-aware host). One binary, one command:

pulse mcp --data-dir /path/to/data

PULSE_DATA_DIR is honored if --data-dir is omitted. The server speaks JSON-RPC over stdio (logs to stderr).

Wiring into a host

For Claude Desktop, add to ~/Library/Application Support/Claude/claude_desktop_config.json:

{
  "mcpServers": {
    "pulse": {
      "command": "pulse",
      "args": ["mcp"],
      "env": {
        "PULSE_DATA_DIR": "/Users/me/data"
      }
    }
  }
}

For Claude Code:

claude mcp add pulse --env PULSE_DATA_DIR=/Users/me/data -- pulse mcp

Restart the host. Pulse tools appear in the tool list.

Tool surface
Tool Purpose
pulse_manifest Call first. Self-description: commands, operators, accepted types, tests, regressions, distributions, error codes, MCP tools, cohort field types. Cache once per session.
pulse_ask Preferred entry point. One-shot import → inspect → predict → execute. Accepts source (raw file) + query (natural language, beta — see below) or a structured request.
pulse_inspect Read header + schema (no record bytes). Also binds session-scoped field-name enums on action tools.
pulse_predict Validate a request against the schema without executing.
pulse_process Execute one pre-built request.
pulse_compose Execute a batch of requests in one round trip.
pulse_sample Return up to N rows for preview.
pulse_facet Distinct values for a single field.
pulse_import Import a tabular source into a managed .pulse handle (TTL-tracked, default 7d).
pulse_drop Drop a managed handle.
pulse_imports_list Enumerate managed handles with sidecar metadata.
pulse_examples_search Search the embedded request-example library by query, tags, category.
pulse_examples_get Fetch one runnable example by name.
pulse_errors_lookup Per-code Message + Fixup detail (kept out of the manifest for context economy).
pulse_skills_list / pulse_skills_get Embedded skill pack.
Resources and prompts
URI What
pulse://<path> One per .pulse file under the data directory. Read returns descriptor.InspectResult JSON.
pulse-skill://<name> One per embedded skill. Read returns the markdown body.

Two prompts (pulse-bootstrap, pulse-author-request) are registered for hosts that surface them as slash commands.

A typical agent session:

  1. Call pulse_manifest once and cache the result.
  2. Call pulse_ask with source + query (or cohort + query) for everything else.
  3. Reach for pulse_predict / pulse_sample / pulse_facet / pulse_compose only when you need finer control than the one-shot call affords.

Natural-language query is beta. Heuristic parsing only — silent misinterpretation is possible. Always check query_resolution and the resolved request in the response before trusting results. For production, author a structured request against the cached manifest and skip the query field.

Schema-bound enums

After a successful pulse_inspect (or after pulse_ask opens a cohort), the server registers session-scoped variants of the action tools whose JSON Schemas embed enums on field-name parameters, so clients pick field names from a typed list instead of free-texting them. This works on SSE and Streamable HTTP transports. On stdio the session does not support tool overrides, so the enums are advisory and pulse_predict remains the validation gate. Disable with --bind-on-open=false.
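For a sense of what a schema-bound parameter looks like, the sketch below builds a JSON Schema property for a field-name argument whose allowed values come from an inspected cohort. The property shape, the description text, and the fieldParamSchema helper are assumptions for illustration; the actual tool schemas are defined by the MCP server.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// fieldParamSchema returns a JSON Schema property restricting a field-name
// parameter to the cohort's known fields (hypothetical shape).
func fieldParamSchema(fieldNames []string) map[string]any {
	names := append([]string(nil), fieldNames...) // copy: don't mutate the caller's slice
	sort.Strings(names)                           // stable ordering for clients and diffs
	return map[string]any{
		"type":        "string",
		"description": "Field name from the opened cohort.",
		"enum":        names,
	}
}

func main() {
	s := fieldParamSchema([]string{"score", "brand", "id"})
	b, err := json.Marshal(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```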

The mcp-integration skill (pulse skills show mcp-integration) is the authoritative reference.

Embedding Pulse in a Go Application

Pulse is library-first. The CLI is a thin adapter over the public API.

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/frankbardon/pulse"
    pio "github.com/frankbardon/pulse/io"
    "github.com/frankbardon/pulse/io/csv"
    "github.com/frankbardon/pulse/types"
)

func main() {
    ctx := context.Background()

    p, err := pulse.New(pulse.Options{DataDir: "/var/data"})
    if err != nil {
        log.Fatal(err)
    }

    // Import a CSV.
    importJob := &pio.ImportJob{
        Source: csv.NewReader(nil, "input.csv"),
        Target: "dataset.pulse",
    }
    report, err := p.Import(ctx, importJob)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Imported %d rows\n", report.RowsImported)

    // Run an aggregation.
    resp, err := p.Process(ctx, &pulse.Request{
        Cohort: &types.Cohort{Filename: "dataset.pulse"},
        Aggregations: []*types.Aggregation{
            {Type: types.AGG_AVERAGE, Field: "score", Label: "avg"},
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(resp.Data)

    // Inspect a file.
    result, err := p.Inspect(ctx, "dataset.pulse")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Fields: %d\n", result.FieldCount)

    // Validate before execution.
    prediction, err := p.Predict(ctx, &pulse.Request{
        Cohort: &types.Cohort{Filename: "dataset.pulse"},
        Aggregations: []*types.Aggregation{
            {Type: types.AGG_SUM, Field: "revenue"},
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Warnings: %v\n", prediction.Warnings)
}
Public facade

pulse.Pulse exposes: New, Open, Process, ProcessStream, Compose, ComposeParallel, Ask, Import, ImportFile, Drop, Imports, SweepImports, ResolveImport, Export, Convert, Inspect, Predict, Sample, Facet, Synth, Profile, Manifest, plus example/error lookup helpers.

Custom filesystem

Pulse accepts any afero.Fs for testing or non-local backends:

import "github.com/spf13/afero"

// In-memory filesystem for testing; check the constructor error
// instead of discarding it.
p, err := pulse.New(pulse.Options{
    FS: afero.NewMemMapFs(),
})
if err != nil {
    log.Fatal(err)
}

fs.NewMemMap() returns a complete in-memory Config for hermetic tests — no disk I/O.

Operator catalogue

Counts as currently registered (the manifest is the source of truth — pulse --json --slim):

  • 16 aggregators (AGG_*): COUNT, SUM, AVERAGE, MIN, MAX, MEDIAN, STDDEV, RANGE, FREQUENCY, MODE, PERCENTILE, ZSCORE, KURTOSIS, …
  • 9 attributes (ATTR_*): ZSCORE, TSCORE, NORMALIZED, FORMULA, PERCENTILE, DATE_PART, …
  • 5 filterers (FILTER_*): INCLUDE, EXCLUDE, RANGE, EXPRESSION, NULL
  • 5 groupers (GROUP_*): CATEGORY, RANGE, ROUNDED, DATE, QUANTILE
  • 10 windows (WIN_*): LAG, LEAD, ROW_NUMBER, RANK, DENSE_RANK, RUNNING_SUM, RUNNING_AVG, MOVING_AVG, EWMA, PCT_CHANGE
  • 9 features (FEAT_*): LOG, SQRT, BUCKETIZE, ONE_HOT, FREQUENCY_ENCODE, TARGET_ENCODE, DATE_FEATURES, TRAIN_TEST_SPLIT, POLY
  • 20 statistical tests (TEST_*): tier-1 row tests (T, WELCH, CHISQ, ANOVA_F, ANOVA_WELCH, ANOVA_RM, KS, PAIRED_T, PROP_Z, PEARSON_R, SPEARMAN_R, KENDALL_TAU, MANN_WHITNEY_U, WILCOXON_SR, KRUSKAL_WALLIS, BROWN_FORSYTHE, FISHER_EXACT, SHAPIRO_WILK) and tier-2 post-tests (TUKEY_HSD, TREND, variants)
  • 3 regressions (REG_*): OLS, GLM, BAYES_LINEAR; combined with the Resample / Selection modifiers and FEAT_POLY composition, they cover 13 textbook regression names
  • 12 synth distributions

LLM Skill Pack

Pulse bundles 22 skill documents that teach LLM agents how to operate it. Skills are embedded via //go:embed — no external files.

Discovering skills
pulse skills list
pulse skills list --json
pulse skills show aggregation-guide
Bundled skills
Skill Purpose
getting-started Pulse vocabulary, MCP tool surface, file format, operator catalog
cohort-schema-design Field types, nullability, bit-packing, descriptions
aggregation-guide Aggregator selection (AGG_*) and filterer selection (FILTER_*)
attribute-composition ATTR_* derived columns: z-score, formula, percentile, date_part
grouper-design CATEGORY, RANGE, ROUNDED, DATE, QUANTILE
window-operations LAG/LEAD/RANK/MOVING_AVG/EWMA partitioning and frame semantics
feature-engineering Pre-filter FEAT_* operators for ML pipelines + leakage trap
statistical-testing Tier-1 row tests and tier-2 post-tests
regression-modeling OLS, GLM, Bayesian linear; modifiers; 13 textbook names mapped
synthetic-data Distributions, correlations, constraints
compose-requests Multi-request batching against one cohort
debugging-with-predict Iterating with pulse_predict / pulse api predict
error-code-reference Reading envelopes; calling pulse_errors_lookup
import-best-practices Schema inference, fail-closed semantics, PULSE_IMPORT_*
export-format-selection CSV / TSV / NDJSON / JSON array / Parquet / Arrow / Excel
financial-cohorts decimal128 semantics for money
mcp-integration MCP tool surface, schema-bound enums, session bootstrap
request-recipes Canonical request JSON skeletons keyed by intent
query-router-prompt System-prompt template for natural-language → AskRequest
contributor-workflow Recipes for extending Pulse
From Go
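package main

import (
    "fmt"

    "github.com/frankbardon/pulse/skills"
)

func main() {
    // Enumerate the embedded skill pack.
    for _, s := range skills.List() {
        fmt.Printf("%s: %s\n", s.Name, s.Description)
    }

    // Fetch one skill body by name. Feeding it to an agent is
    // application-specific; here it is simply printed.
    if content, ok := skills.Get("aggregation-guide"); ok {
        fmt.Println(content)
    }
}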

The root manifest (pulse --json) includes a skills[] array so agents can discover available skills in one call.

.pulse File Format

Binary, self-describing, fully transportable:

  • 9-byte header: magic bytes (PULSE\x00\x00\x00) + format version (0x01)
  • Schema block: field count, per-field descriptors (type, name, byte offset, bit position, source column index, optional description capped at 1000 bytes)
  • Dictionary blocks: one per categorical field (string-to-integer mapping stored inline)
  • Record data: fixed-width binary records, one per row

17 field types:

| Type | Bytes | Notes |
|---|---|---|
| u8, u16, u32, u64 | 1, 2, 4, 8 | Unsigned integers |
| f32, f64 | 4, 8 | IEEE 754 floats |
| date | 4 | Days since Unix epoch |
| packed_bool | 0 | Bit-packed; shares bytes with adjacent packed fields |
| nullable_bool | 0 | Tri-state; bit-packed |
| nullable_u4 | 0 | 4-bit unsigned, nullable; bit-packed |
| nullable_u8, nullable_u16 | 1, 2 | Nullable unsigned integers |
| categorical_u8, categorical_u16, categorical_u32 | 1, 2, 4 | Dictionary-encoded strings |
| decimal128 | 16 | Fixed precision/scale; banker's rounding |
| nullable_decimal128 | 16 | Nullable decimal128 |

Categorical width is auto-selected from the sample cardinality during import. Bit-packed types report ByteSize() == 0 because they share bytes with adjacent packed fields. The schema reader rejects unknown type bytes at parse time with ENCODING_INVALID.
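Dictionary encoding and width selection can be sketched as follows. The cut-offs used here (up to 256 distinct values in categorical_u8, up to 65,536 in categorical_u16, otherwise categorical_u32) are an assumption derived from the integer widths alone. Pulse may reserve codes or leave headroom, so treat both functions as hypothetical illustrations.

```go
package main

import "fmt"

// dictionary assigns dense integer codes to distinct strings in
// first-seen order and returns the code for each input value.
func dictionary(values []string) (dict map[string]uint32, codes []uint32) {
	dict = make(map[string]uint32)
	codes = make([]uint32, len(values))
	for i, v := range values {
		c, ok := dict[v]
		if !ok {
			c = uint32(len(dict))
			dict[v] = c
		}
		codes[i] = c
	}
	return dict, codes
}

// categoricalWidth picks the narrowest code width for a cardinality
// (illustrative thresholds).
func categoricalWidth(cardinality int) string {
	switch {
	case cardinality <= 1<<8:
		return "categorical_u8"
	case cardinality <= 1<<16:
		return "categorical_u16"
	default:
		return "categorical_u32"
	}
}

func main() {
	dict, codes := dictionary([]string{"acme", "globex", "acme", "initech"})
	fmt.Println(codes, categoricalWidth(len(dict))) // [0 1 0 2] categorical_u8
}
```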

Configuration

Three environment variables, all optional:

export PULSE_DATA_DIR=/path/to/data        # Base directory for .pulse cohort files
export PULSE_IMPORTS_DIR=imports           # Subdir for managed-import handles (default "imports")
export PULSE_IMPORT_TTL=7d                 # Default TTL for managed handles ("24h", "30m", "7d", "pin")

Embedders override per-instance via pulse.Options{DataDir, ImportsDir, ImportTTL, FS}. No config files.

Output Format Contract

All --json output is wrapped in a descriptor.Envelope:

{
  "format_version": "1.0",
  "data": { ... },
  "errors": [],
  "warnings": []
}

format_version is "1.0". Errors/warnings use {"code", "message", "details"} entries (empty array when absent, never null). Additive-only: new data fields don't bump the version; renames or removals do.

Development

Build
make build    # Binary at ./bin/pulse
make test     # go test ./...
make fmt      # gofmt
make vet      # go vet
make lint     # staticcheck (auto-installed via go run)
make cover    # Coverage report
make docs     # Build mdBook
make clean    # Remove artifacts

A .env at repo root is auto-loaded by the Makefile.

Run tests
go test ./...
go test ./processing/...
go test ./service/... -v -run TestProcess
go test ./encoding/... -fuzz FuzzPulseFileHeader -fuzztime 30s
Project structure
pulse/
├── pulse.go                Public facade
├── cmd/pulse/              CLI binary (thin adapter)
├── service/                Orchestration: wires processing to encoding
├── processing/             Aggregators, attributes, filterers, groupers
│   ├── window/             WIN_* operators
│   └── feature/            FEAT_* pre-filter feature engineers
├── encoding/               .pulse binary codec
├── io/                     Tabular ↔ .pulse adapters
│   └── csv|tsv|ndjson|jsonarray|arrow|parquet|excel/
├── fs/                     afero-based filesystem abstraction
├── errors/                 Typed error codes (CodedError system)
├── types/                  Request/response structs + streamability table
├── descriptor/             manifest, predict, inspect, envelope (no-execute)
├── skills/                 //go:embed markdown skill pack
├── examples/               //go:embed runnable request examples
├── synth/                  Synthetic data generator
├── docs/                   mdBook source (GitHub Pages)
└── internal/
    ├── cli/                CLI internals
    └── mcp/                MCP server (wraps pulse.Pulse)

Documentation: https://frankbardon.github.io/pulse/.

License

MIT. See LICENSE.

Documentation

Overview

Package pulse is a high-performance, self-describing tabular data processing engine.

Pulse ships as a CLI binary and as an embeddable Go library. The library is the primary deliverable; the CLI is a thin adapter over it.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregatorRegistration added in v0.7.0

type AggregatorRegistration struct {
	Name        types.AggregationType
	Description string
	Factory     processing.AggregatorFactory
	Streamable  bool
	Accepts     []encoding.FieldType
	Params      []ParamMeta
	// FieldInputs is the optional buffered-projection introspection
	// hook. See FieldInputsFunc. Omit to keep the operator opaque to
	// projection (runtime widens the field set when this operator
	// appears in a request).
	FieldInputs FieldInputsFunc
}

AggregatorRegistration installs a custom AGG_* operator. The factory must obey processing.AggregatorFactory: it builds a fresh Aggregator per Process call against the supplied Aggregation spec + schema.

When Streamable=true the factory MUST return a value that also implements processing.OnlineAggregator. Probe-validation at registration time enforces the contract via PULSE_EXTENSION_STREAMABLE_MISMATCH.

type AskRequest added in v0.5.0

type AskRequest struct {
	File    string         `json:"file,omitempty"`
	Request *types.Request `json:"request,omitempty"`
	// Query is an optional natural-language query string. When set, the
	// server parses it against the cohort's schema and synthesises a
	// types.Request. The parsed request fills only the slots the
	// supplied Request left empty; explicit fields in Request always
	// win on collision. When Query is empty, Ask behaves exactly as
	// before (structured Request only).
	Query     string `json:"query,omitempty"`
	OnInvalid string `json:"on_invalid,omitempty"`
	Predict   bool   `json:"predict,omitempty"`

	// Source is an optional source-file path (csv, tsv, ndjson,
	// jsonarray, parquet, arrow, excel, or .pulse). When set and
	// Request.Cohort is unspecified, Ask runs the managed-import
	// pipeline first (same semantics as pulse.ImportFile / pulse_import)
	// and uses the resulting handle as the cohort for the rest of the
	// call. The import is sliding-window TTL-tracked — subsequent Ask
	// / Process / Inspect calls against the same handle bump expiry.
	Source string `json:"source,omitempty"`

	// SourceFormat overrides extension-based format detection on the
	// Source path. Empty falls back to imports.FormatFromExt(Source).
	SourceFormat string `json:"source_format,omitempty"`

	// SourceTTL overrides the default 7-day TTL on the auto-import.
	// Accepts the same forms as pulse_import: Go duration ("24h",
	// "30m"), day suffix ("7d", "30d"), or "pin" for never-expire.
	SourceTTL string `json:"source_ttl,omitempty"`

	// SourceHandle overrides the managed handle name (defaults to the
	// source basename without extension).
	SourceHandle string `json:"source_handle,omitempty"`

	// SourceSheet selects an Excel sheet; ignored for non-Excel sources.
	SourceSheet string `json:"source_sheet,omitempty"`

	// SourceOverwrite replaces an existing managed handle of the same
	// name. Defaults to false (collision -> PULSE_IMPORT_HANDLE_EXISTS).
	SourceOverwrite bool `json:"source_overwrite,omitempty"`
}

AskRequest is the input to pulse.Ask — the unified one-shot facade that collapses (optional) import, inspect, predict, and execute into a single call.

Cohort selection precedence (first non-empty wins):

  1. Request.Cohort.Filename — explicit cohort the caller already has.
  2. File — back-compat shorthand for Request.Cohort.Filename when Request.Cohort is nil.
  3. Source — a tabular source file (csv, tsv, ndjson, jsonarray, parquet, arrow, excel) or an existing .pulse. Ask imports it via the managed pool (default TTL 7d) and uses the resulting handle. This is the path that collapses pulse_import + pulse_ask into a single call — preferred for one-shot analytical queries.

OnInvalid governs what Ask does when predict reports the request as invalid:

  • "" or "abort" — return a SERVICE_VALIDATION error.
  • "suggest" — return an AskResponse with Suggestions populated.

Predict=true skips execution even on a valid request (the LLM's "what would happen if I ran this?" probe).
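The precedence list and the OnInvalid modes amount to a short decision procedure, restated below with simplified local types. askInput, cohortChoice, resolveCohort, and onInvalidAction are hypothetical. The real Ask also performs the import, predict, and execute steps omitted here, and how it treats an unrecognized OnInvalid value is not documented in this section.

```go
package main

import (
	"errors"
	"fmt"
)

// askInput is a simplified stand-in for AskRequest.
type askInput struct {
	CohortFilename string // Request.Cohort.Filename ("" if Request or Cohort is nil)
	File           string
	Source         string
	OnInvalid      string
}

type cohortChoice struct {
	Path       string
	NeedImport bool // true when the cohort comes from Source via the managed pool
}

// resolveCohort applies the documented precedence: first non-empty wins.
func resolveCohort(in askInput) (cohortChoice, error) {
	switch {
	case in.CohortFilename != "":
		return cohortChoice{Path: in.CohortFilename}, nil
	case in.File != "":
		return cohortChoice{Path: in.File}, nil
	case in.Source != "":
		return cohortChoice{Path: in.Source, NeedImport: true}, nil
	}
	return cohortChoice{}, errors.New("no cohort: set Request.Cohort, File, or Source")
}

// onInvalidAction maps OnInvalid to the documented behaviour.
func onInvalidAction(mode string) (string, error) {
	switch mode {
	case "", "abort":
		return "error", nil // SERVICE_VALIDATION
	case "suggest":
		return "suggestions", nil
	}
	return "", fmt.Errorf("unknown on_invalid %q", mode) // assumption
}

func main() {
	c, _ := resolveCohort(askInput{File: "data.pulse", Source: "raw.csv"})
	fmt.Println(c.Path, c.NeedImport) // data.pulse false
}
```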

type AskResponse added in v0.5.0

type AskResponse struct {
	FormatVersion   string                    `json:"format_version"`
	Predict         *descriptor.PredictResult `json:"predict"`
	Process         *Response                 `json:"process,omitempty"`
	Suggestions     []errors.Fixup            `json:"suggestions,omitempty"`
	QueryResolution *QueryResolution          `json:"query_resolution,omitempty"`
	// Import is populated when AskRequest.Source triggered an
	// auto-import. Carries the same fields as ImportResult: managed
	// handle name, resulting path, format, row count, expiry. Nil
	// when no auto-import ran.
	Import   *ImportResult               `json:"import,omitempty"`
	Errors   []*descriptor.EnvelopeEntry `json:"errors"`
	Warnings []*descriptor.EnvelopeEntry `json:"warnings"`
}

AskResponse is the JSON-friendly envelope returned by pulse.Ask. It always carries the predict result; Process is populated only when execution ran; Suggestions is populated only on predict-invalid + OnInvalid="suggest".

FormatVersion mirrors the descriptor envelope version so callers can gate on a single value across endpoints.

type AttributeEmitType added in v0.7.0

type AttributeEmitType string

AttributeEmitType is a manifest hint declaring the dtype the attribute emits per record. Defaults to "float64" when empty.

const (
	AttributeEmitFloat64 AttributeEmitType = "float64"
	AttributeEmitBool    AttributeEmitType = "bool"
	AttributeEmitString  AttributeEmitType = "string"
)

type AttributeMode added in v0.7.0

type AttributeMode string

AttributeMode declares which streaming tier an attribute factory promises to satisfy.

const (
	// AttributeModeRowLocal — factory returns a processing.RowLocalAttribute.
	// Equivalent to built-in ATTR_FORMULA / ATTR_DATE_PART.
	AttributeModeRowLocal AttributeMode = "row_local"
	// AttributeModeTwoPass — factory returns a processing.TwoPassAttribute.
	// Equivalent to ATTR_ZSCORE / ATTR_TSCORE / ATTR_NORMALIZED.
	AttributeModeTwoPass AttributeMode = "two_pass"
	// AttributeModeBuffered — factory returns a plain
	// processing.AttributeComputer; no streaming. Equivalent to
	// ATTR_PERCENTILE.
	AttributeModeBuffered AttributeMode = "buffered"
)
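The two_pass tier exists because a statistic like a z-score needs a whole-column summary before any row can be emitted. A sketch of that shape over an in-memory slice: pass one accumulates count, sum, and sum of squares, and pass two emits per-row scores. zscores is a hypothetical function, not a processing.TwoPassAttribute. It uses the population standard deviation and returns 0 for a constant column, both of which are this sketch's conventions rather than documented ATTR_ZSCORE behaviour.

```go
package main

import (
	"fmt"
	"math"
)

// zscores computes (x - mean) / stddev in two passes over xs.
func zscores(xs []float64) []float64 {
	// Pass 1: column summary.
	var n, sum, sumSq float64
	for _, x := range xs {
		n++
		sum += x
		sumSq += x * x
	}
	out := make([]float64, len(xs))
	if n == 0 {
		return out
	}
	mean := sum / n
	variance := sumSq/n - mean*mean // population variance
	if variance <= 0 {
		return out // constant column: scores defined as 0 here
	}
	sd := math.Sqrt(variance)
	// Pass 2: per-row emission using the pass-1 summary.
	for i, x := range xs {
		out[i] = (x - mean) / sd
	}
	return out
}

func main() {
	fmt.Println(zscores([]float64{2, 4, 4, 4, 5, 5, 7, 9})) // [-1.5 -0.5 -0.5 -0.5 0 0 1 2]
}
```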

type AttributeRegistration added in v0.7.0

type AttributeRegistration struct {
	Name        types.AttributeType
	Description string
	Factory     processing.AttributeFactory
	Mode        AttributeMode
	Accepts     []encoding.FieldType
	Emits       AttributeEmitType
	Params      []ParamMeta
	// FieldInputs is the optional buffered-projection introspection
	// hook. See FieldInputsFunc.
	FieldInputs FieldInputsFunc
}

AttributeRegistration installs a custom ATTR_* operator.

type Cohort

type Cohort struct {
	// contains filtered or unexported fields
}

Cohort represents an opened .pulse file with its parsed schema. It wraps the service-layer Cohort to provide a clean public API.

func (*Cohort) Categorical

func (c *Cohort) Categorical(name string) (*encoding.Dictionary, bool)

Categorical returns the dictionary for a named categorical field. Returns nil, false if the field is not found or is not categorical.

func (*Cohort) Field

func (c *Cohort) Field(name string) *encoding.Field

Field returns a pointer to the named field, or nil if not found.

func (*Cohort) RecordCount added in v0.8.3

func (c *Cohort) RecordCount() (int64, error)

RecordCount returns the number of records in the cohort. For single-file cohorts this is derived from the byte length of the record region divided by the per-record size implied by the schema. For archive-backed cohorts the caller should sum per-shard RecordCount values from Shards() — the underlying service Cohort errors on RecordCount for archives because the byte-region path doesn't apply across shards.
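Both counting paths reduce to simple arithmetic: division for a single file, summation for an archive. A sketch with hypothetical inputs (the region length and record size would come from the parsed file and schema, the per-shard counts from Shards()). The divisibility check is a defensive addition of this sketch, not documented Pulse behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// recordCount divides the record region by the fixed record width.
func recordCount(regionLen, recordSize int64) (int64, error) {
	if recordSize <= 0 {
		return 0, errors.New("record size must be positive")
	}
	if regionLen%recordSize != 0 {
		return 0, fmt.Errorf("region of %d bytes is not a multiple of %d", regionLen, recordSize)
	}
	return regionLen / recordSize, nil
}

// archiveCount sums per-shard counts, as advised for archive-backed cohorts.
func archiveCount(shardCounts []int64) int64 {
	var total int64
	for _, c := range shardCounts {
		total += c
	}
	return total
}

func main() {
	n, err := recordCount(4800, 24) // e.g. 24-byte records
	fmt.Println(n, err)             // 200 <nil>
	fmt.Println(archiveCount([]int64{200, 150, 50})) // 400
}
```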

func (*Cohort) Schema

func (c *Cohort) Schema() *encoding.Schema

Schema returns the cohort's schema.

func (*Cohort) Shards added in v0.8.0

func (c *Cohort) Shards() []ShardEntry

Shards returns the shard manifest for an archive-backed cohort. Empty for single-file cohorts. The returned slice is a defensive copy.

type ComposeOptions added in v0.2.0

type ComposeOptions = service.ComposeOptions

ComposeOptions controls parallel execution. See service.ComposeOptions.

type ComposedRequest

type ComposedRequest = types.ComposedRequest

Type aliases re-exported from the types package so embedders can use pulse.Request instead of types.Request.

type DistributionRegistration added in v0.7.0

type DistributionRegistration struct {
	Name        string
	Description string
	// Factory is the sampler-construction callback. Its concrete
	// signature is finalised when the synth distribution overlay
	// lands; until then this field is held opaque to keep the API
	// shape stable.
	Factory any
	Params  []ParamMeta
}

DistributionRegistration installs a custom synthetic-data distribution kind. The factory shape is finalised alongside the synth wiring in a later phase; for now this registration is validated for naming + duplicates only and the Factory field is held opaque.

type ErrorFixup added in v0.5.0

type ErrorFixup = errors.Fixup

ErrorFixup is one repair template attached to an error code.

type ErrorMetadata added in v0.5.0

type ErrorMetadata = errors.LookupResult

ErrorMetadata is the depth-on-demand projection returned by ErrorLookup, ErrorsByDomain, and ErrorsSearch. Carries the code, domain, canonical Message, and materialised Fixup templates.

type Example added in v0.5.0

type Example = examples.Example

Example is the full record returned by ExampleGet — runnable request JSON plus the indexed metadata.

type ExampleSummary added in v0.5.0

type ExampleSummary = examples.ExampleSummary

ExampleSummary is the lightweight projection returned by ExamplesSearch.

type ExprFunction added in v0.7.0

type ExprFunction struct {
	Name        string
	Description string
	// Signature is a human-readable signature surfaced in the
	// manifest and in predict error messages when typecheck fails
	// (e.g. `rank_familiarity(value float64, total_pop bool) float64`).
	Signature string
	// Fn is the Go function. expr-lang accepts any func value;
	// the runtime resolves argument types against the call site.
	Fn any
	// Pure declares the function is side-effect-free and depends
	// only on its arguments. Reserved for a future memoization
	// optimisation; declaring it has no runtime effect today.
	Pure bool
}

ExprFunction describes a function callable from runtime expressions (ATTR_FORMULA / FILTER_EXPRESSION). At pulse.New time the Fn value is registered with the expr-lang engine under Name.

type Extensions added in v0.7.0

type Extensions struct {
	Aggregators        []AggregatorRegistration
	Attributes         []AttributeRegistration
	Filterers          []FiltererRegistration
	Groupers           []GrouperRegistration
	Windows            []WindowRegistration
	Features           []FeatureRegistration
	Tests              []TestRegistration
	SynthDistributions []DistributionRegistration

	// ExprFunctions are merged into the runtime expression environment
	// used by ATTR_FORMULA and FILTER_EXPRESSION (plus any future
	// expression hook). Each entry is callable from request expressions
	// under its declared Name.
	ExprFunctions []ExprFunction

	// LookupTables expose static keyed maps to the runtime expression
	// environment. Callers reference them as
	//   lookup("table_name", key1, key2, ...)
	// The built-in lookup() function is added to the expression env
	// whenever LookupTables is non-empty. Tables are read-only after
	// pulse.New.
	LookupTables map[string]LookupTable
}

Extensions bundles every per-category registration slot plus the expression-environment additions an embedder wants exposed to runtime ATTR_FORMULA and FILTER_EXPRESSION evaluation.

The zero value registers no extensions: pulse.New(Options{}) with the Extensions field unset behaves identically to a Pulse instance shipped without this surface. Registrations live in a separate namespace from the built-in registries, so they cannot collide with built-ins, and the runtime treats them identically to built-ins.

type FacetDiscrete added in v0.7.0

type FacetDiscrete = types.FacetDiscrete

FacetDiscrete is the per-value count list for discrete fields.

type FacetField added in v0.7.0

type FacetField = types.FacetField

FacetField wraps either a discrete or numeric per-field summary.

type FacetHistogram added in v0.7.0

type FacetHistogram = types.FacetHistogram

FacetHistogram is the fixed-width binning of a numeric field.

type FacetNumeric added in v0.7.0

type FacetNumeric = types.FacetNumeric

FacetNumeric is the streaming-stats summary for numeric fields.

type FacetRequest added in v0.7.0

type FacetRequest = types.FacetRequest

FacetRequest is the input to FacetSchema — multi-field, with optional filters, percentiles, histograms, and additive contribution counts.

type FacetResult added in v0.7.0

type FacetResult = types.FacetResult

FacetResult is the response from FacetSchema.

type FacetValueCount added in v0.7.0

type FacetValueCount = types.FacetValueCount

FacetValueCount is one (value, count) tuple inside FacetDiscrete.

type FeatureRegistration added in v0.7.0

type FeatureRegistration struct {
	Name        types.FeatureType
	Description string
	Factory     feature.Factory
	Streamable  bool
	Accepts     []encoding.FieldType
	Params      []ParamMeta
	// FieldInputs is the optional buffered-projection introspection
	// hook. See FieldInputsFunc.
	FieldInputs FieldInputsFunc
}

FeatureRegistration installs a custom FEAT_* operator. Set Streamable=true when the factory returns a feature.Computer that also implements feature.StreamingComputer.

type FieldInputsFunc added in v0.8.0

type FieldInputsFunc func(raw json.RawMessage) []string

FieldInputsFunc is the optional introspection callback an extension registration may supply so the buffered-projection extractor can determine which extra schema fields the operator reads beyond the spec's explicit Field/Field2/PartitionBy/etc. references. raw is the operator's Params block. Return value lists schema field names to include; nil/empty means "no extra fields."

Embedders that omit this hook leave their operator opaque to the projection extractor — the runtime then widens the buffered-decode field set to "every field" so the operator stays correct.

type FiltererRegistration added in v0.7.0

type FiltererRegistration struct {
	Name        types.FiltererType
	Description string
	Factory     processing.FiltererFactory
	Accepts     []encoding.FieldType
	Params      []ParamMeta
	// FieldInputs is the optional buffered-projection introspection
	// hook. See FieldInputsFunc. Filterers don't carry a Params
	// block today; the callback receives nil raw bytes and should
	// return the static set of extra fields the filterer reads
	// beyond Filterer.Field.
	FieldInputs FieldInputsFunc
}

FiltererRegistration installs a custom FILTER_* operator. Filterers are always row-local and streamable today: every built-in filterer evaluates one row at a time. The runtime therefore records custom filterers as streamable; a buffered-filterer registration shape lands when Pulse grows a buffered-filterer path.

type GrouperRegistration added in v0.7.0

type GrouperRegistration struct {
	Name        types.GroupType
	Description string
	Factory     processing.GrouperFactory
	Streamable  bool
	Accepts     []encoding.FieldType
	Params      []ParamMeta
	// FieldInputs is the optional buffered-projection introspection
	// hook. See FieldInputsFunc.
	FieldInputs FieldInputsFunc
}

GrouperRegistration installs a custom GROUP_* operator. Set Streamable=true when the factory returns a processing.Grouper that also implements processing.StreamingGrouper (KeyForRow).

type ImportEntry added in v0.5.0

type ImportEntry = imports.Entry

ImportEntry is a type alias re-exported from the imports package (imports.Entry) so embedders can use pulse.ImportEntry directly. Imports returns a slice of these.

type ImportResult added in v0.5.0

type ImportResult = imports.Result

ImportResult is a type alias re-exported from the imports package (imports.Result) so embedders can use pulse.ImportResult directly. ImportFile returns one on success.

type ImportSpec added in v0.5.0

type ImportSpec = imports.Spec

Type aliases re-exported from the imports package so embedders can use pulse.ImportSpec instead of imports.Spec.

type LoadMemberSetResult added in v0.8.3

type LoadMemberSetResult = processing.LoadMemberSetResult

LoadMemberSetResult mirrors processing.LoadMemberSetResult so callers can inspect drop counts after loading an include-set file.

func LoadMemberSetFromReader added in v0.8.3

func LoadMemberSetFromReader(r io.Reader, schema *encoding.Schema, fieldName string) (LoadMemberSetResult, error)

LoadMemberSetFromReader is the public alias for the underlying processing-package loader. It reads newline-delimited values from r and returns the best MemberSet impl for the named field on schema (bitset for categorical, uint64 map for integer / date, string map for decimal / fallback). Float fields are rejected.

type LookupTable added in v0.7.0

type LookupTable struct {
	Description string
	// Rows is the canonical static map. The map key is the joined
	// composite key (caller joins arguments with the table's
	// configured separator, conventionally "|").
	Rows map[string]float64
	// Lookup is the function-driven accessor. Returns the value
	// and true on hit; the second return is the miss signal. An
	// error from Lookup surfaces as PULSE_LOOKUP_MISS with the
	// embedder's message attached.
	Lookup func(keys ...string) (value float64, ok bool, err error)
}

LookupTable exposes a static keyed map to the runtime expression environment. Exactly one of Rows / Lookup must be non-nil.

Rows is the simple path — a composite-key map where the caller of lookup() supplies the joined key. Lookup is the escape hatch for embedders that compose keys, perform partial-match fallback, or pull from an external store.
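A sketch of how the two accessors could be resolved, assuming the arguments to lookup() are joined with the conventional "|" separator before indexing Rows. The `lookupTable` mirror and its `resolve` method are hypothetical; they illustrate the exactly-one-of rule and the Rows versus Lookup split, not the runtime's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// lookupTable mirrors the shape of pulse.LookupTable for this sketch.
type lookupTable struct {
	Rows   map[string]float64
	Lookup func(keys ...string) (float64, bool, error)
}

// resolve models lookup("table", k1, k2, ...): validate that exactly one
// accessor is set, then either join the keys with "|" and index Rows, or
// delegate to Lookup.
func (t lookupTable) resolve(keys ...string) (float64, bool, error) {
	if (t.Rows == nil) == (t.Lookup == nil) {
		return 0, false, errors.New("exactly one of Rows / Lookup must be non-nil")
	}
	if t.Rows != nil {
		v, ok := t.Rows[strings.Join(keys, "|")]
		return v, ok, nil
	}
	return t.Lookup(keys...)
}

func main() {
	static := lookupTable{Rows: map[string]float64{"CA|2024": 0.42}}
	fmt.Println(static.resolve("CA", "2024")) // 0.42 true <nil>

	// Escape hatch: a hypothetical partial-match fallback on the first key.
	fallback := lookupTable{Lookup: func(keys ...string) (float64, bool, error) {
		if len(keys) > 0 && keys[0] == "CA" {
			return 0.40, true, nil
		}
		return 0, false, nil
	}}
	fmt.Println(fallback.resolve("CA", "1999")) // 0.4 true <nil>
}
```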

type MemberSet added in v0.8.3

type MemberSet = processing.MemberSet

MemberSet is the public alias for processing.MemberSet — the read-only set type consumed by FilterToFileBySetAndExpr. Build one via LoadMemberSetFromReader (recommended for newline-delimited files) or by constructing a concrete impl directly.

type Options

type Options struct {
	// DataDir is the base directory for cohort files.
	// Defaults to PULSE_DATA_DIR if empty and FS is not set.
	DataDir string

	// FS is an optional custom filesystem.
	// When set, DataDir is ignored for filesystem construction.
	FS afero.Fs

	// DisableDefaults turns off the smart-defaults pass that infers
	// operator Type from the named field's schema type when the caller
	// omits it. Defaults to false (defaults enabled). Predict still
	// computes and reports DefaultsApplied independently — this flag
	// governs only what the runtime mutates on the live request.
	DisableDefaults bool

	// ImportsDir overrides the managed-imports directory. Defaults to
	// imports.DefaultImportsDir (resolved relative to the Pulse fs
	// root). Honoured before the PULSE_IMPORTS_DIR env var.
	ImportsDir string

	// ImportTTL overrides the default TTL applied to managed imports
	// when the caller does not pass one. Zero falls back to the
	// PULSE_IMPORT_TTL env var, then to imports.DefaultTTL. Negative
	// values pin imports (never expire) by default.
	ImportTTL time.Duration

	// ImportSourceFS is the afero.Fs used to read source files when
	// ImportFile / pulse_import receives an absolute path. Defaults to
	// afero.NewOsFs() on real OS installs, or to FS when FS is an
	// in-memory MemMapFs (so tests stay hermetic). Setting this
	// disables the default jail: the explicit fs IS the boundary.
	ImportSourceFS afero.Fs

	// ImportSourceJailRoot confines absolute source paths to a
	// directory tree. Empty string defaults to os.Getwd() at New
	// time — the natural sandbox for an MCP server or CLI invocation.
	// Ignored when ImportSourceFS is set explicitly.
	ImportSourceJailRoot string

	// Extensions registers domain-specific operators + expression
	// extensions that the runtime treats identically to built-ins.
	// Zero value disables the extension path entirely. See
	// extensions.go for the full surface.
	Extensions Extensions

	// ShardWorkers caps the per-shard parallel worker pool used when
	// Process operates on a shard archive (S6 of the sharding rollout).
	// Zero means runtime.NumCPU(); 1 forces strictly serial execution
	// (same byte-for-byte semantics as the pre-S6 path). Negative
	// values are rejected at New() time.
	//
	// The parallel reducer engages only when every operator in the
	// request is mergeable per processing.CanMergeRequest. Non-
	// mergeable requests (percentile aggregators, window operators,
	// tier-2 tests, two-pass attributes combined with groupers, ...)
	// fall through to the serial shardIter path with no worker
	// spawning. Worker count is also capped at the shard count — no
	// point spawning more workers than shards.
	//
	// Order semantics: partials merge in shard insertion order (zip
	// central-directory order). Associative+commutative aggregators
	// (count, sum, min, max, null_count, frequency, distinct_count,
	// mode) produce byte-equal results vs the serial path; Welford
	// mean / variance / stddev drift within ULP on well-conditioned
	// inputs (parallel formula, see processing.MergeOnline docstrings).
	ShardWorkers int

	// Strict promotes request-validation warnings into hard errors at
	// runtime. Today this covers the numeric-aggregation-on-categorical
	// check (PULSE_AGG_NOT_MEANINGFUL_FOR_CATEGORICAL); future runtime
	// validations follow the same flag. Defaults to false — Process
	// runs the request and emits warnings through the descriptor
	// Envelope (visible via --json at the CLI boundary). Predict's
	// PredictOptions.Strict remains independently controllable.
	Strict bool

	// ProjectBufferedFields enables buffered-decode field projection.
	// When true the runtime walks each request to compute the set of
	// schema fields the operators actually read (processing.NeededFields)
	// and skips map writes for fields outside that set. Per-record
	// memory drops proportional to the projection ratio. Decode CPU
	// is unchanged.
	//
	// Defaults to false to preserve byte-identical behavior with the
	// pre-projection codepath. Embedders with wide cohorts (many
	// fields) and narrow requests (few referenced) see the largest
	// win. Extension operators without a registered FieldInputs hook
	// widen the projection automatically, so enabling this flag is
	// always safe — the worst case degenerates to the full-decode
	// behaviour.
	ProjectBufferedFields bool
}

Options configures a Pulse instance.

type ParamMeta added in v0.7.0

type ParamMeta struct {
	Name        string `json:"name"`
	Description string `json:"description,omitempty"`
	JSONType    string `json:"json_type"`
	Required    bool   `json:"required,omitempty"`
	Default     any    `json:"default,omitempty"`
}

ParamMeta describes one operator-specific parameter for manifest emission. It is descriptive only — runtime parameter resolution happens inside each factory.

type Profile added in v0.2.0

type Profile = synth.Profile

Profile is the cohort statistical summary used by from-profile.

type ProfileOptions added in v0.2.0

type ProfileOptions = synth.ProfileOptions

ProfileOptions modulate which statistics the profiler captures.

type Pulse

type Pulse struct {
	// contains filtered or unexported fields
}

Pulse is the top-level library facade. It wraps the service layer and provides a clean API for embedding Pulse into Go programs.

func New

func New(opts Options) (*Pulse, error)

New creates a new Pulse instance with the given options.

func (*Pulse) AddShard added in v0.8.0

func (p *Pulse) AddShard(ctx context.Context, archivePath, shardPath string) error

AddShard validates the incoming single-file shard against the archive's canonical schema and appends it. Dict growth that the incoming shard introduces is reflected in the rewritten `_schema.pulse` payload before the new shard payload is appended. v1 reads the whole archive into memory and writes it back via temp+rename — semantically equivalent to true in-place append and crash-safe at the canonical-path level.

func (*Pulse) Ask added in v0.5.0

func (p *Pulse) Ask(ctx context.Context, req *AskRequest) (*AskResponse, error)

Ask is the unified facade. It optionally imports a source file (when AskRequest.Source is set), validates the request against the cohort schema, and executes it. On validation failure it either errors (OnInvalid="abort") or returns structured fixup suggestions (OnInvalid="suggest"). Setting Predict=true skips execution after a successful predict pass.

Cohort precedence: Request.Cohort > File > Source (auto-import). The auto-import path collapses pulse_import + pulse_ask into a single call — the preferred one-shot entry point for analytical queries on raw source files.

func (*Pulse) CompactShardArchive added in v0.8.0

func (p *Pulse) CompactShardArchive(ctx context.Context, archivePath string) error

CompactShardArchive rewrites the archive to eliminate orphaned bytes from prior in-place mutations and refreshes the canonical metadata (aggregate_record_count + shard_count). v1 AddShard / RemoveShard already use temp+rename (no orphan bytes in v1 archives), so Compact primarily serves to refresh canonical metadata that may have drifted if the archive was edited outside Pulse. The whole-archive rewrite pattern is the explicit reclaim path per the design contract §7.1.

func (*Pulse) Compose

func (p *Pulse) Compose(ctx context.Context, req *ComposedRequest) ([]*Response, error)

Compose executes multiple requests, returning a response for each.

func (*Pulse) ComposeParallel added in v0.2.0

func (p *Pulse) ComposeParallel(ctx context.Context, req *ComposedRequest, opts ComposeOptions) ([]*Response, error)

ComposeParallel runs every request in req concurrently across a bounded worker pool. Responses are returned in input order. Workers share the engine's read-only registries; each Process call constructs fresh stateful operators per request, so concurrent execution is safe.

Defaults: MaxWorkers = runtime.GOMAXPROCS(0), no per-request timeout, FailFast = true (set FailFast=false to collect every request's outcome instead of cancelling siblings on first error).

func (*Pulse) Convert

func (p *Pulse) Convert(ctx context.Context, job *pio.ConvertJob) (*pio.ConvertReport, error)

Convert chains import and export with no intermediate file on disk. The job's FS field is set to the Pulse instance's filesystem if not already set.

func (*Pulse) CreateShardArchive added in v0.8.0

func (p *Pulse) CreateShardArchive(ctx context.Context, archivePath string, shardPaths []string) error

CreateShardArchive writes a fresh Pulse shard archive at archivePath containing the supplied single-file shardPaths. The first shard seeds the canonical schema; remaining shards are validated via structural cohesion + the append-only dictionary prefix rule. The archive is written atomically (temp file + rename) so partial writes never appear at archivePath. See service.CreateShardArchive for the full error surface.

func (*Pulse) Drop added in v0.5.0

func (p *Pulse) Drop(ctx context.Context, handle string) error

Drop removes a managed import handle (and its sidecar) from the pool. Returns PULSE_IMPORT_SOURCE_MISSING when the handle is unknown.

func (*Pulse) ErrorLookup added in v0.5.0

func (p *Pulse) ErrorLookup(code string) (ErrorMetadata, bool)

ErrorLookup returns the metadata projection for a single error code. Case-sensitive exact match. Returns (ErrorMetadata{}, false) when the code is unknown.

The manifest carries only the alphabetized code-name list; per-code Message + Fixup detail lives behind this facade so per-session bootstrap stays lean. Use ErrorsByDomain / ErrorsSearch to enumerate in bulk.

func (*Pulse) ErrorsByDomain added in v0.5.0

func (p *Pulse) ErrorsByDomain(domain string) []ErrorMetadata

ErrorsByDomain returns every code's metadata in the named domain (CLI, DATA, ENCODING, PROCESSING, PULSE, SERVICE). Match is case-insensitive. Returns a non-nil empty slice when nothing matches; results are sorted alphabetically by code.

func (*Pulse) ErrorsSearch added in v0.5.0

func (p *Pulse) ErrorsSearch(query string) []ErrorMetadata

ErrorsSearch returns codes whose code name, Message, or Fixup hints contain the query (case-insensitive substring). Results are ranked by match source: Message (description) hits first, then Fixup hits, then code-name hits; ties resolve alphabetically by code. Returns a non-nil empty slice when nothing matches.
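The ranking rule can be sketched as a two-key sort: match-source rank first, code second. `errMeta` is a hypothetical, trimmed-down stand-in for ErrorMetadata, and each code is assigned only its best-ranked match source.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// errMeta is a hypothetical, trimmed-down stand-in for ErrorMetadata.
type errMeta struct {
	Code    string
	Message string
	Fixups  []string
}

// search ranks hits by where the query matched: Message (rank 0), any
// Fixup hint (rank 1), then the code name (rank 2). Ties sort by code.
// Always returns a non-nil slice.
func search(all []errMeta, query string) []errMeta {
	q := strings.ToLower(query)
	type hit struct {
		m    errMeta
		rank int
	}
	var hits []hit
	for _, m := range all {
		rank := -1
		switch {
		case strings.Contains(strings.ToLower(m.Message), q):
			rank = 0
		case containsAny(m.Fixups, q):
			rank = 1
		case strings.Contains(strings.ToLower(m.Code), q):
			rank = 2
		}
		if rank >= 0 {
			hits = append(hits, hit{m, rank})
		}
	}
	sort.Slice(hits, func(i, j int) bool {
		if hits[i].rank != hits[j].rank {
			return hits[i].rank < hits[j].rank
		}
		return hits[i].m.Code < hits[j].m.Code
	})
	out := make([]errMeta, 0, len(hits))
	for _, h := range hits {
		out = append(out, h.m)
	}
	return out
}

func containsAny(ss []string, q string) bool {
	for _, s := range ss {
		if strings.Contains(strings.ToLower(s), q) {
			return true
		}
	}
	return false
}

func main() {
	// Codes are borrowed from this page; messages and fixups are invented.
	all := []errMeta{
		{Code: "PULSE_SHARD_MISSING", Message: "named shard is not in the archive"},
		{Code: "PULSE_ARCHIVE_MAGIC_INVALID", Message: "file is not an archive", Fixups: []string{"drop the #shard anchor"}},
		{Code: "PULSE_LOOKUP_MISS", Message: "no row for key"},
	}
	for _, m := range search(all, "shard") {
		fmt.Println(m.Code)
	}
}
```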

func (*Pulse) ExampleGet added in v0.5.0

func (p *Pulse) ExampleGet(name string) (*Example, bool)

ExampleGet returns the example whose _meta.name matches name. The returned Body is the request JSON with the _meta block stripped so it can be handed directly to Process / Predict.

func (*Pulse) ExamplesSearch added in v0.5.0

func (p *Pulse) ExamplesSearch(query string, tags []string, category string) []ExampleSummary

ExamplesSearch returns summaries from the embedded request-example library matching the given filters. An empty filter is treated as "no constraint" for that dimension. Query is case-insensitive substring search across name, description, and operators; tags is ANDed; category is an exact match. Always returns a non-nil slice (possibly empty) for safe JSON marshaling.
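A sketch of the three filter dimensions, using a hypothetical `exampleSummary` struct in place of ExampleSummary. Tag matching is assumed to be exact and case-sensitive. Returning an empty non-nil slice matters because encoding/json marshals a nil slice as null but an empty one as [].

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// exampleSummary is a hypothetical stand-in for ExampleSummary.
type exampleSummary struct {
	Name        string   `json:"name"`
	Description string   `json:"description"`
	Category    string   `json:"category"`
	Operators   []string `json:"operators"`
	Tags        []string `json:"tags"`
}

// filterExamples applies each non-empty filter: query is a
// case-insensitive substring match over name, description, and
// operators; every requested tag must be present; category must match
// exactly.
func filterExamples(all []exampleSummary, query string, tags []string, category string) []exampleSummary {
	q := strings.ToLower(query)
	out := []exampleSummary{} // non-nil: marshals as [] rather than null
	for _, e := range all {
		if category != "" && e.Category != category {
			continue
		}
		if !hasAllTags(e.Tags, tags) {
			continue
		}
		if q != "" && !matchesQuery(e, q) {
			continue
		}
		out = append(out, e)
	}
	return out
}

func hasAllTags(have, want []string) bool {
	for _, w := range want {
		found := false
		for _, h := range have {
			if h == w {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

func matchesQuery(e exampleSummary, q string) bool {
	fields := append([]string{e.Name, e.Description}, e.Operators...)
	for _, f := range fields {
		if strings.Contains(strings.ToLower(f), q) {
			return true
		}
	}
	return false
}

func main() {
	emptyJSON, _ := json.Marshal(filterExamples(nil, "anything", nil, ""))
	var nilSlice []exampleSummary
	nilJSON, _ := json.Marshal(nilSlice)
	fmt.Println(string(emptyJSON), string(nilJSON)) // [] null
}
```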

func (*Pulse) Export

func (p *Pulse) Export(ctx context.Context, job *pio.ExportJob) (*pio.ExportReport, error)

Export converts a .pulse file into tabular output. The job's FS field is set to the Pulse instance's filesystem if not already set.

func (*Pulse) ExtractShard added in v0.8.0

func (p *Pulse) ExtractShard(ctx context.Context, archivePath, shardBasename string) (io.ReadCloser, error)

ExtractShard returns an io.ReadCloser over the named shard's standalone single-file `.pulse` bytes. Suitable for piping to `pulse inspect -` or writing back to disk.

func (*Pulse) Facet

func (p *Pulse) Facet(ctx context.Context, path string, field string) ([]string, error)

Facet returns distinct values for the named field in the cohort. Categorical fields short-circuit through the dictionary; numeric fields stream the file collecting distinct float values. For richer summaries (counts, null tallies, statistics, histograms, additive contributions) call FacetSchema instead.

func (*Pulse) FacetSchema added in v0.7.0

func (p *Pulse) FacetSchema(ctx context.Context, req *FacetRequest) (*FacetResult, error)

FacetSchema runs a multi-field rich facet against the cohort named in req.Cohort. Returns per-field summaries (discrete value counts or numeric statistics), with optional percentiles, fixed-width histograms, and "additive" contribution counts that report what each distinct value of an additive field would yield if it were added to the base filter.

Streamability: requests with no NumericPercentiles run in a single pass; requests with percentiles buffer the requested numeric fields' non-null values and sort once before percentile interpolation.

func (*Pulse) FilterToFile added in v0.8.1

func (p *Pulse) FilterToFile(ctx context.Context, src, dst, filterExpr string) (int64, error)

FilterToFile reads the .pulse cohort at src, evaluates filterExpr (FILTER_EXPRESSION semantics — same operators, identifiers, expr functions, and lookup tables available to pulse.Process with a FILTER_EXPRESSION filterer) against every record, and writes a new .pulse cohort at dst containing only matching records.

Dispatch matches the rest of the facade. Single-file inputs produce single-file outputs whose header + schema bytes are copied byte-for-byte from src. Shard archives produce shard archives that preserve the per-shard layout: one input shard maps to one output shard at the same basename, in central-directory (insertion) order, with per-shard header + schema + categorical-dictionary bytes copied verbatim. Empty shards survive so shard_count metadata stays stable; the canonical `_schema.pulse` trailer's aggregate_record_count is refreshed to the surviving total. The anchor form `archive.pulse#shard.pulse` resolves a single shard and writes a single-file output.

Returns the number of records written to dst (sum across shards for archive inputs).

func (*Pulse) FilterToFileBySetAndExpr added in v0.8.3

func (p *Pulse) FilterToFileBySetAndExpr(ctx context.Context, src, dst, includeField string, set MemberSet, filterExpr string) (int64, error)

FilterToFileBySetAndExpr applies an optional MemberSet membership test (record's value for includeField must be in set) combined with an optional FILTER_EXPRESSION (filterExpr) to every record in src, writing the survivors to dst. At least one of (set, filterExpr) must be supplied; if both are present they are AND-combined and the set is tested first so per-row work short-circuits on misses without paying the expr eval cost.

Input-shape dispatch is identical to FilterToFile (single-file, shard archive, anchor). The set must be built against the same canonical schema as src.

Returns the number of records written to dst (sum across shards for archive inputs).
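The set-first, AND-combined evaluation order of FilterToFileBySetAndExpr can be sketched as a predicate builder. This is a simplified model: a string-keyed map stands in for MemberSet, a Go closure stands in for the compiled FILTER_EXPRESSION, and treating a missing or non-string value as a set miss is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

type record map[string]any

// buildPredicate AND-combines an optional membership test on field with
// an optional expression. At least one must be supplied. The set is
// checked first, so a miss never pays for expression evaluation.
func buildPredicate(field string, set map[string]struct{}, expr func(record) bool) (func(record) bool, error) {
	if set == nil && expr == nil {
		return nil, errors.New("at least one of set or filter expression is required")
	}
	return func(r record) bool {
		if set != nil {
			v, ok := r[field].(string) // sketch assumption: missing/non-string = miss
			if !ok {
				return false
			}
			if _, hit := set[v]; !hit {
				return false
			}
		}
		return expr == nil || expr(r)
	}, nil
}

func main() {
	adults := func(r record) bool { return r["age"].(int) >= 18 }
	pred, err := buildPredicate("state", map[string]struct{}{"CA": {}, "OR": {}}, adults)
	if err != nil {
		panic(err)
	}
	rows := []record{{"state": "CA", "age": 30}, {"state": "NY", "age": 40}, {"state": "OR", "age": 12}}
	for _, r := range rows {
		fmt.Println(r["state"], pred(r))
	}
}
```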

func (*Pulse) Fs added in v0.2.0

func (p *Pulse) Fs() afero.Fs

Fs returns the underlying afero.Fs. Embedders (e.g. the MCP server) need this to enumerate .pulse files; processing methods route through service and never expose the filesystem directly.

func (*Pulse) Import

func (p *Pulse) Import(ctx context.Context, job *pio.ImportJob) (*pio.ImportReport, error)

Import converts tabular source data into a .pulse file. The job's FS field is set to the Pulse instance's filesystem if not already set.

func (*Pulse) ImportFile added in v0.5.0

func (p *Pulse) ImportFile(ctx context.Context, spec ImportSpec) (*ImportResult, error)

ImportFile auto-detects the source format, converts the source into a managed .pulse file under the imports pool, and returns the resulting handle. Pulse-native sources pass through unchanged (no copy, no sidecar). Sliding-window TTL applies to managed handles — every subsequent Inspect / Predict / Process / Sample / Facet / Ask against the handle bumps the expiry forward.

func (*Pulse) Imports added in v0.5.0

func (p *Pulse) Imports(ctx context.Context) ([]ImportEntry, error)

Imports returns a snapshot of the managed-imports pool. Sweep is not invoked; expired entries are flagged via Entry.Expired so callers can render them. Results are sorted by handle name.

func (*Pulse) Inspect

func (p *Pulse) Inspect(ctx context.Context, path string) (*descriptor.InspectResult, error)

Inspect reads a .pulse file header and schema, returning structured field information. It never reads record data.

func (*Pulse) ListShards added in v0.8.0

func (p *Pulse) ListShards(ctx context.Context, archivePath string) ([]ShardEntry, error)

ListShards returns the archive's shard manifest in central-directory order (which equals shard insertion order). Single-file cohorts return an empty slice.

func (*Pulse) Manifest added in v0.5.0

func (p *Pulse) Manifest(_ context.Context) *descriptor.Manifest

Manifest returns the root Pulse self-description. The manifest is deterministic and process-wide: it does not depend on cohort data or the filesystem. Callers can cache the result for the duration of a session.

func (*Pulse) Open

func (p *Pulse) Open(ctx context.Context, path string) (*Cohort, error)

Open reads a .pulse file and returns a Cohort with the parsed schema.

Anchor syntax: a path of the form "archive.pulse#shard.pulse" opens the archive, locates the named shard inside it, and returns a single-shard cohort whose schema comes from the shard's own header (not the canonical schema in `_schema.pulse`). The returned Cohort has an empty Shards slice; anchor-resolved shards stand alone for the purposes of facade methods. Anchors require an archive backing the path; using `#` against a single-file `.pulse` raises PULSE_ARCHIVE_MAGIC_INVALID. A literal `#` in a filename is not supported in v1.

Anchor parsing happens inside service.Service.Open as well, so the other facade methods (Process, Sample, Facet, ...) that receive an anchored Cohort path resolve consistently.
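Because a literal `#` in file names is unsupported, splitting on the first `#` is enough to recognise an anchor. A sketch using strings.Cut; `splitAnchor` is a hypothetical helper, and treating an empty archive or shard part as "not an anchor" is this sketch's choice rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// splitAnchor splits "archive.pulse#shard.pulse" at the first '#'.
// Plain paths, or anchors with an empty side, come back with ok=false
// (an assumption of this sketch; a real resolver might reject them).
func splitAnchor(path string) (archive, shard string, ok bool) {
	archive, shard, found := strings.Cut(path, "#")
	if !found || archive == "" || shard == "" {
		return path, "", false
	}
	return archive, shard, true
}

func main() {
	fmt.Println(splitAnchor("cohorts/sales.pulse#2026-05.pulse"))
	fmt.Println(splitAnchor("cohorts/single.pulse"))
}
```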

func (*Pulse) Predict

func (p *Pulse) Predict(ctx context.Context, req *Request) (*descriptor.PredictResult, error)

Predict validates a request against a .pulse file without executing it. It reads only the header and schema, never record data.

func (*Pulse) Process

func (p *Pulse) Process(ctx context.Context, req *Request) (*Response, error)

Process executes a single processing request against a cohort.

func (*Pulse) ProcessStream added in v0.2.0

func (p *Pulse) ProcessStream(ctx context.Context, req *Request) (RowIter, error)

ProcessStream executes a request and returns a pull-based row iterator over the result. Equivalent to Process for any request shape — same gates, same errors — but streaming consumers (HTTP responders, NDJSON writers, downstream pipelines) can drain rows one at a time without buffering the full result in their own memory.

Predict's Streamable flag reports whether the underlying execution avoids buffering inside the engine; ProcessStream wraps the result regardless, so the API is stable for non-streamable requests too.

func (*Pulse) Profile added in v0.2.0

func (p *Pulse) Profile(_ context.Context, path string, opts ProfileOptions) (*Profile, error)

Profile reads a .pulse file at path and returns a statistical summary suitable for from-profile synthesis. The profile retains no individual rows from the source data.

func (*Pulse) RemoveShard added in v0.8.0

func (p *Pulse) RemoveShard(ctx context.Context, archivePath, shardBasename string) error

RemoveShard rewrites the archive omitting the named shard. The canonical schema is preserved (dictionary entries are never shrunk). Returns PULSE_SHARD_MISSING when the named shard is not in the archive.

func (*Pulse) ResolveCanonicalSchema added in v0.8.3

func (p *Pulse) ResolveCanonicalSchema(ctx context.Context, src string) (*encoding.Schema, error)

ResolveCanonicalSchema returns the canonical encoding.Schema for the cohort at src without streaming records. Resolves single-file, shard-archive, and `archive#shard` anchor inputs identically to the rest of the facade.

Useful for callers that need to build an include-set before invoking FilterToFileBySetAndExpr: the set loader requires the schema to pick the best MemberSet impl (bitset / uint64 / string) for the field's type.

func (*Pulse) ResolveImport added in v0.5.0

func (p *Pulse) ResolveImport(ctx context.Context, handle string) (string, error)

ResolveImport returns the managed-pool path for a handle, or PULSE_IMPORT_SOURCE_MISSING when no such handle exists. Embedders who want to address a managed import by handle (instead of by path) pass the handle through this resolver and hand the returned path to Inspect / Process.

func (*Pulse) Sample

func (p *Pulse) Sample(ctx context.Context, path string, n int) ([]Record, error)

Sample returns up to n rows from the cohort as maps of field name to value.

func (*Pulse) Service added in v0.7.0

func (p *Pulse) Service() *service.Service

Service returns the underlying service handle. Exposed so tests (and advanced embedders) can inspect the installed extension registry, FS configuration, and orchestration state.

func (*Pulse) SweepImports added in v0.5.0

func (p *Pulse) SweepImports(ctx context.Context) ([]string, error)

SweepImports removes every expired managed handle and returns the list of swept handle names. Invoked opportunistically by ImportFile and exposed here for callers that want explicit control (CLI maintenance, periodic ticker, etc.).

func (*Pulse) Synth added in v0.2.0

func (p *Pulse) Synth(_ context.Context, spec *SynthSpec, output string, opts SynthOptions) (*SynthResult, error)

Synth materializes a synthetic .pulse file at output from spec. The generator is deterministic for a given (spec, opts.Seed) pair: same seed produces a byte-identical file.

func (*Pulse) VerifyShardArchive added in v0.8.0

func (p *Pulse) VerifyShardArchive(ctx context.Context, archivePath string) (*VerifyResult, error)

VerifyShardArchive opens the archive and re-validates every shard's header (magic + format_version), structural cohesion against the canonical schema, dictionary prefix rule, and cross-checks each shard's record count against the canonical aggregate. Returns a VerifyResult carrying any errors (PULSE_SHARD_HEADER_INVALID, PULSE_SHARD_SCHEMA_MISMATCH, PULSE_SHARD_DICT_DIVERGENCE) and any non-fatal warnings (PULSE_SHARD_DESCRIPTION_DIVERGENCE, aggregate drift). Returns a non-nil error only when the archive itself cannot be opened (archive corrupt, file missing, etc.); per-shard issues are reported through the result struct so the caller can render the full diagnosis.

type QueryResolution added in v0.5.0

type QueryResolution struct {
	// Query is the verbatim input the parser processed.
	Query string `json:"query"`

	// MatchedFields is every schema field name the parser resolved
	// against, in first-appearance order.
	MatchedFields []string `json:"matched_fields"`

	// Confidence is the aggregate parser confidence in [0, 1].
	// Product of per-step weights (verb match × per-field match),
	// floored at 0 on PULSE_QUERY_UNRESOLVED.
	Confidence float64 `json:"confidence"`
}

QueryResolution echoes the parser's decision when AskRequest.Query was set. Nil when no query parse fired.

type Record

type Record = map[string]any

Record is a row of field→value data returned by Sample.

type Request

type Request = types.Request

Type aliases re-exported from the types package so embedders can use pulse.Request instead of types.Request.

type Response

type Response = types.Response

Response is a type alias re-exported from the types package so embedders can use pulse.Response instead of types.Response.

type Row added in v0.2.0

type Row = service.Row

Row is a single result row in a processing stream.

type RowIter added in v0.2.0

type RowIter = service.RowIter

RowIter is a pull-based iterator over a processing result. Each call to Next returns the next row or (nil, false, nil) on exhaustion. Close releases underlying resources. Metadata returns the run metadata once available (always present after the iterator is drained).

type ShardEntry added in v0.8.0

type ShardEntry = service.ShardEntry

ShardEntry is one shard inside a Pulse shard archive. Re-exported from service so embedders can address pulse.ShardEntry directly.

type ShardInfo added in v0.8.0

type ShardInfo = descriptor.ShardInfo

ShardInfo is one shard entry as surfaced by Inspect / Predict. Re-exported from descriptor so embedders consuming the no-execute surface can address pulse.ShardInfo directly. Mirrors ShardEntry's shape (filename + record count); the two types are parallel because descriptor/ cannot import service/.

type SynthOptions added in v0.2.0

type SynthOptions = synth.Options

SynthOptions modulate the deterministic seed and other knobs.

type SynthResult added in v0.2.0

type SynthResult = synth.Result

SynthResult is the result of a successful Synth call.

type SynthSpec added in v0.2.0

type SynthSpec = synth.Spec

SynthSpec is the parsed synthesis request shape.

type TestRegistration added in v0.7.0

type TestRegistration struct {
	Name        types.TestType
	Description string
	Tier        TestTier
	RowFactory  processing.RowTestFactory
	PostFactory processing.PostTestFactory
	Streamable  bool
	Accepts     []encoding.FieldType
	Params      []ParamMeta
	// FieldInputs is the optional buffered-projection introspection
	// hook for tier-1 row tests. Tier-2 post-tests run on materialized
	// result rows rather than source records, so projection doesn't
	// apply — leave nil for tier-2 registrations.
	FieldInputs FieldInputsFunc
}

TestRegistration installs a custom TEST_* operator. Exactly one of RowFactory and PostFactory must be non-nil, and it must match Tier: RowFactory for TestTierRow, PostFactory for TestTierPost.

Streamable applies to tier-1 only and indicates whether the test can co-stream with online aggregators (no extra pass over the data).
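The factory rule can be checked before registration. The sketch below mirrors a subset of TestRegistration locally and enforces only what the documentation states: exactly one factory, matching the tier, with Streamable meaningful for tier 1 alone. The factory function types are hypothetical placeholders, because the real processing.RowTestFactory and processing.PostTestFactory signatures are not shown here, and Name is a plain string instead of types.TestType.

```go
package main

import (
	"errors"
	"fmt"
)

type TestTier string

const (
	TestTierRow  TestTier = "tier1"
	TestTierPost TestTier = "tier2"
)

// Hypothetical stand-ins; the real factory signatures are not on this page.
type (
	RowTestFactory  func() any
	PostTestFactory func() any
)

// registration mirrors a subset of TestRegistration's fields.
type registration struct {
	Name        string
	Tier        TestTier
	RowFactory  RowTestFactory
	PostFactory PostTestFactory
	Streamable  bool
}

// validate enforces the documented rule: exactly one factory, matching Tier.
func validate(r registration) error {
	switch r.Tier {
	case TestTierRow:
		if r.RowFactory == nil || r.PostFactory != nil {
			return fmt.Errorf("%s: tier1 requires RowFactory only", r.Name)
		}
	case TestTierPost:
		if r.PostFactory == nil || r.RowFactory != nil {
			return fmt.Errorf("%s: tier2 requires PostFactory only", r.Name)
		}
	default:
		return errors.New("unknown tier " + string(r.Tier))
	}
	return nil
}

// effectiveStreamable reflects the doc: Streamable only applies to tier 1.
func effectiveStreamable(r registration) bool {
	return r.Tier == TestTierRow && r.Streamable
}

func main() {
	good := registration{Name: "TEST_CUSTOM", Tier: TestTierRow, RowFactory: func() any { return nil }}
	bad := registration{Name: "TEST_BAD", Tier: TestTierPost, RowFactory: func() any { return nil }}
	fmt.Println(validate(good)) // <nil>
	fmt.Println(validate(bad))
}
```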

type TestTier added in v0.7.0

type TestTier string

TestTier selects which factory shape a TestRegistration provides. Tier-1 row tests fold per-row during the streaming aggregation pass; tier-2 post-tests consume the materialized result row set after windows.

const (
	TestTierRow  TestTier = "tier1"
	TestTierPost TestTier = "tier2"
)

type VerifyResult added in v0.8.0

type VerifyResult = service.VerifyResult

VerifyResult carries the structured outcome of VerifyShardArchive. Errors aggregate every fatal cohesion failure discovered while walking the archive's shards; Warnings carry non-fatal divergences (per-field description drift, aggregate-record-count mismatch). An empty Errors slice means the archive is structurally sound.
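Because Errors aggregates every fatal failure rather than stopping at the first, a caller can surface them all at once and treat Warnings as report-only. The sketch below assumes, hypothetically, that Errors and Warnings are string slices; the real element types of service.VerifyResult are not shown on this page. It uses errors.Join, available since Go 1.20 and therefore within Pulse's Go 1.22+ requirement.

```go
package main

import (
	"errors"
	"fmt"
)

// verifyResult is a hypothetical mirror of VerifyResult; modeling Errors and
// Warnings as string slices is an assumption.
type verifyResult struct {
	Errors   []string
	Warnings []string
}

// sound applies the documented rule: an empty Errors slice means sound.
func (r verifyResult) sound() bool { return len(r.Errors) == 0 }

// asError folds every fatal failure into one error (nil when sound), so a
// caller sees all cohesion failures at once rather than only the first.
func (r verifyResult) asError() error {
	errs := make([]error, 0, len(r.Errors))
	for _, e := range r.Errors {
		errs = append(errs, errors.New(e))
	}
	return errors.Join(errs...) // nil for an empty slice
}

func main() {
	res := verifyResult{Warnings: []string{"aggregate record count mismatch"}}
	for _, w := range res.Warnings {
		fmt.Println("warning:", w) // non-fatal: report and continue
	}
	fmt.Println("sound:", res.sound(), "err:", res.asError())
}
```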

type WindowRegistration added in v0.7.0

type WindowRegistration struct {
	Name        types.WindowType
	Description string
	Factory     window.WindowFactory
	Accepts     []encoding.FieldType
	Params      []ParamMeta
	// FieldInputs is the optional buffered-projection introspection
	// hook. See FieldInputsFunc.
	FieldInputs FieldInputsFunc
}

WindowRegistration installs a custom WIN_* operator. Window operators currently run buffered, so the runtime records every custom window as non-streamable. A streaming-window registration shape will follow once the runtime gains a streaming-window pipeline.
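Buffered execution means a window operator can look back over the already materialized result rows. The sketch below is a hypothetical WIN_*-style trailing moving average written against a plain slice; it does not use the real window.WindowFactory signature, which this page does not show.

```go
package main

import "fmt"

// movingAverage is a hypothetical WIN_*-style operator over a materialized
// column: each output averages up to `size` trailing inputs, with a partial
// window at the start.
func movingAverage(xs []float64, size int) []float64 {
	if size <= 0 {
		return nil
	}
	out := make([]float64, len(xs))
	var sum float64
	for i, x := range xs {
		sum += x
		if i >= size {
			sum -= xs[i-size] // drop the value that left the window
		}
		n := size
		if i+1 < size {
			n = i + 1 // partial window at the start
		}
		out[i] = sum / float64(n)
	}
	return out
}

func main() {
	fmt.Println(movingAverage([]float64{1, 2, 3, 4, 5}, 3)) // [1 1.5 2 3 4]
}
```

A trailing average like this one could also be computed incrementally with a ring buffer. Per the description above, however, the runtime records custom windows as non-streamable regardless of how the operator is written.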

Directories

Path                     Synopsis
cmd/pulse (command)      Package main is the entry point for the pulse CLI binary.
descriptor               Package descriptor provides self-description, manifest, and predict functionality for pulse.
encoding                 Package encoding handles the binary .pulse file format: reading, writing, and schema management.
errors                   Package errors provides structured error codes and error handling for pulse.
examples                 Package examples embeds the catalogue of runnable Pulse request JSON files and exposes a search/get API over them.
fs                       Package fs provides the filesystem abstraction layer for pulse storage backends.
imports                  Package imports manages tabular-source imports with a TTL-tracked on-disk pool.
internal/cli             Package cli provides internal CLI wiring and command construction for the pulse binary.
internal/mcp             Package mcp wraps the Pulse library facade in the Model Context Protocol surface.
internal/mcp/mcptools    Package mcptools holds the metadata table for the MCP tools registered by internal/mcp.
internal/query           Package query is the heuristic English-to-types.Request parser used by the pulse.Ask facade and the `pulse api ask` CLI leaf.
io                       Package io defines the I/O pipeline framework for Pulse: Reader/Writer interfaces, schema inference, and job types (ImportJob, ExportJob, ConvertJob).
io/arrow                 Package arrow provides Arrow IPC (Feather V2) import and export for the pulse I/O pipeline, plus shared Arrow<->Pulse type-mapping helpers used by both this package and io/parquet.
io/csv                   Package csv provides CSV format adapters for the Pulse I/O pipeline.
io/excel                 Package excel provides Excel import and export for the pulse I/O pipeline.
io/format                Package format dispatches tabular Reader construction by format identifier, sitting between the io/ interface definitions and the per-format leaf packages (io/csv, io/tsv, io/ndjson, io/jsonarray, io/parquet, io/arrow, io/excel).
io/jsonarray             Package jsonarray provides JSON-array import and export for the pulse I/O pipeline.
io/jsonshared            Package jsonshared holds value coercion helpers shared by the ndjson and jsonarray packages.
io/ndjson                Package ndjson provides NDJSON (newline-delimited JSON) import and export for the pulse I/O pipeline.
io/parquet               Package parquet provides Parquet import and export for the pulse I/O pipeline.
io/tsv                   Package tsv provides TSV import and export for the pulse I/O pipeline.
processing               Package processing provides the single dynamic processing engine for Pulse.
processing/arena         Package arena provides a bump-allocator backed by a single contiguous []byte.
processing/feature       Package feature implements the FEAT_* operators that run pre-filter to add derived columns to a record stream.
processing/regression    Package regression hosts the REG_* operators that fit a regression model against the filtered record set.
processing/window        Package window implements the WIN_* window operators for Pulse.
service                  Package service provides the orchestration layer for pulse operations.
skills                   Package skills provides the embedded skill pack for LLM-driven agents.
synth                    Package synth produces synthetic .pulse cohorts from either a schema declaration ("from-schema") or a statistical profile of a real cohort ("from-profile").
types                    Package types provides shared type definitions for pulse.
