pulse

package module
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: MIT Imports: 16 Imported by: 0

README

Pulse

High-performance, self-describing tabular data processing engine written in Go. Ships as a CLI binary and an embeddable Go library.

Pulse reads and writes .pulse files — a compact binary format with an inline schema, categorical dictionaries, and per-field descriptions. Import from CSV, TSV, NDJSON, Parquet, or Excel; run aggregations, filters, and groupings; export results back to any supported format.

Designed for LLM-native workflows: every command supports --json, a bundled skill pack teaches agents how to operate Pulse, and api predict validates requests before execution.

Installation

From source
git clone https://github.com/frankbardon/pulse.git
cd pulse
make build
# Binary at ./bin/pulse
Go install
go install github.com/frankbardon/pulse/cmd/pulse@latest

Requires Go 1.22+.

Quickstart

Import a CSV into a .pulse file
pulse import csv --input data.csv --output data.pulse

Schema is inferred automatically (up to 500 rows sampled). To supply an explicit schema:

# Generate a schema template from your data
pulse import schema-template data.csv > schema.json

# Edit schema.json — add descriptions, adjust types
# Then import with the schema
pulse import csv --input data.csv --schema schema.json --output data.pulse
Inspect the file
pulse cohort inspect data.pulse --json

Returns field names, types, byte offsets, descriptions, and categorical dictionaries.

Run an aggregation

Create a request file (request.json):

{
  "cohort": {"filename": "data.pulse"},
  "aggregations": [
    {"type": "AGG_COUNT", "field": "id", "label": "total"},
    {"type": "AGG_AVERAGE", "field": "score", "label": "avg_score"}
  ]
}
pulse api process --request request.json --json
Filter, group, and aggregate
{
  "cohort": {"filename": "data.pulse"},
  "filterers": [
    {"type": "FILTER_RANGE", "field": "score", "values": ["80", "100"]}
  ],
  "groups": [
    {"type": "GROUP_CATEGORY", "field": "brand"}
  ],
  "aggregations": [
    {"type": "AGG_COUNT", "field": "id", "label": "count"},
    {"type": "AGG_AVERAGE", "field": "score", "label": "avg_score"},
    {"type": "AGG_STDDEV", "field": "score", "label": "stddev"}
  ]
}
Validate before executing
pulse api predict --request request.json --json

Returns the proposed schema, warnings (e.g., numeric aggregation on a categorical field), and estimated row count without touching the data.

Export back to tabular
pulse export csv --input data.pulse --output results.csv
pulse export parquet --input data.pulse --output results.parquet
Convert between formats directly
pulse convert data.csv data.parquet
pulse convert data.xlsx output.tsv --schema schema.json

Format is auto-detected from file extensions. No intermediate .pulse file is written unless --keep-pulse=path is specified.

Sample rows
pulse api sample --input data.pulse --count 10
Get distinct values for a field
pulse api facet --input data.pulse --field brand

CLI Reference

pulse
├── --json                          Root manifest (self-description)
├── import
│   ├── csv|tsv|ndjson|parquet|excel  --input FILE --output FILE [--schema FILE]
│   ├── predict                       --input FILE [--schema FILE] --json
│   └── schema-template INPUT         Generate editable schema from source
├── export
│   ├── csv|tsv|ndjson|parquet|excel  --input FILE --output FILE
│   └── predict                       --input FILE --format FORMAT --json
├── convert INPUT OUTPUT [--schema FILE] [--keep-pulse PATH]
│   └── predict INPUT OUTPUT --json
├── cohort
│   ├── inspect PATH [--json] [--full-dict]
│   └── filter --input FILE --output FILE --filter EXPR
├── api
│   ├── process --request FILE [--json]
│   ├── compose --request FILE [--json]
│   ├── sample --input FILE --count N
│   ├── facet --input FILE --field NAME
│   └── predict --request FILE --json [--strict]
└── skills
    ├── list [--json]
    └── show NAME

Every leaf command supports --json for structured output wrapped in an envelope with format_version, data, errors, and warnings.

Embedding Pulse in a Go Application

Pulse is library-first. The CLI is a thin adapter over the public API.

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/frankbardon/pulse"
    pio "github.com/frankbardon/pulse/io"
    "github.com/frankbardon/pulse/io/csv"
    "github.com/frankbardon/pulse/types"
)

func main() {
    ctx := context.Background()

    // Create a Pulse instance.
    p, err := pulse.New(pulse.Options{DataDir: "/var/data"})
    if err != nil {
        log.Fatal(err)
    }

    // Import a CSV.
    importJob := &pio.ImportJob{
        Source: csv.NewReader(nil, "input.csv"),
        Target: "dataset.pulse",
    }
    report, err := p.Import(ctx, importJob)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Imported %d rows\n", report.RowsImported)

    // Run an aggregation.
    resp, err := p.Process(ctx, &pulse.Request{
        Cohort: &types.Cohort{Filename: "dataset.pulse"},
        Aggregations: []*types.Aggregation{
            {Type: types.AGG_AVERAGE, Field: "score", Label: "avg"},
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(resp.Data)

    // Inspect a file.
    result, err := p.Inspect(ctx, "dataset.pulse")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Fields: %d\n", result.FieldCount)

    // Validate a request before execution.
    prediction, err := p.Predict(ctx, &pulse.Request{
        Cohort: &types.Cohort{Filename: "dataset.pulse"},
        Aggregations: []*types.Aggregation{
            {Type: types.AGG_SUM, Field: "revenue"},
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Warnings: %v\n", prediction.Warnings)
}
Custom filesystem

Pulse accepts any afero.Fs for testing or non-local backends:

import "github.com/spf13/afero"

// In-memory filesystem for testing
p, _ := pulse.New(pulse.Options{
    FS: afero.NewMemMapFs(),
})

// Or a custom S3-backed filesystem
p, _ := pulse.New(pulse.Options{
    FS: myS3Fs,
})
Available types
import "github.com/frankbardon/pulse/types"

// Aggregations: AGG_COUNT, AGG_SUM, AGG_AVERAGE, AGG_MIN, AGG_MAX,
//               AGG_STDDEV, AGG_RANGE, AGG_FREQUENCY, AGG_ZSCORE

// Filters: FILTER_INCLUDE, FILTER_EXCLUDE, FILTER_RANGE, FILTER_EXPRESSION

// Groups: GROUP_CATEGORY, GROUP_ROUNDED

// Attributes: ATTR_ZSCORE, ATTR_TSCORE, ATTR_NORMALIZED,
//             ATTR_FORMULA, ATTR_PERCENTILE, ATTR_DATE_PART

// Windows: WIN_LAG, WIN_LEAD, WIN_ROW_NUMBER, WIN_RANK, WIN_DENSE_RANK,
//          WIN_RUNNING_SUM, WIN_RUNNING_AVG, WIN_MOVING_AVG,
//          WIN_EWMA, WIN_PCT_CHANGE

LLM Skill Pack

Pulse bundles 12 skill documents that teach LLM agents how to operate it. Skills are embedded in the binary via //go:embed — no external files needed.

Discovering skills
# List all bundled skills
pulse skills list

# List with metadata (for programmatic consumption)
pulse skills list --json

# Read a specific skill
pulse skills show aggregation-guide
Bundled skills
Skill Purpose
getting-started Pulse vocabulary, file format, CLI overview
cohort-schema-design Field types, nullability, descriptions
aggregation-guide When and how to use each aggregator
attribute-composition Derived attributes: z-score, formula, etc.
grouper-design GROUP_CATEGORY vs GROUP_ROUNDED
compose-requests Multi-request composition
debugging-with-predict Iterating with api predict
error-code-reference Error codes and recovery steps
import-best-practices Schema inference, fail-closed semantics
export-format-selection Choosing the right output format
Integrating with an LLM agent

The recommended workflow for an LLM agent using Pulse:

  1. Discover the surface: pulse --json returns the full manifest — commands, components, field types, and skills.
  2. Load relevant skills: based on the task, call pulse skills show <name> to inject domain guidance into the agent's context.
  3. Validate before execution: use pulse api predict to check a request for structural errors and warnings before running it.
  4. Execute: pulse api process --request req.json --json returns structured results.

From Go:

import "github.com/frankbardon/pulse/skills"

// At agent boot, load all skill metadata.
for _, s := range skills.List() {
    fmt.Printf("%s: %s\n", s.Name, s.Description)
}

// Inject a specific skill into the agent's context.
content, ok := skills.Get("aggregation-guide")
if ok {
    agent.AddContext(content)
}

The root manifest (pulse --json) includes a skills[] array so agents can discover available skills in a single call.

.pulse File Format

Binary, self-describing, fully transportable:

  • 9-byte header: magic bytes (PULSE\x00\x00\x00) + format version (0x01)
  • Schema block: field count, per-field descriptors (type, name, byte offset, bit position, CSV column index, optional description)
  • Dictionary blocks: one per categorical field (string-to-integer mapping stored inline)
  • Record data: fixed-width binary records, one per row

15 field types:

Type Bytes Notes
u8, u16, u32, u64 1, 2, 4, 8 Unsigned integers
f32, f64 4, 8 IEEE 754 floats
date 4 Days since Unix epoch
packed_bool 1 Single boolean
nullable_bool 1 Tri-state: true/false/null
nullable_u4 1 4-bit unsigned, nullable
nullable_u8, nullable_u16 1, 2 Nullable unsigned integers
categorical_u8, categorical_u16, categorical_u32 1, 2, 4 Dictionary-encoded strings

Categorical width is auto-selected from sample cardinality during import.

Configuration

Pulse uses a single environment variable:

export PULSE_DATA_DIR=/path/to/data

When set, the CLI resolves relative cohort paths against this directory. Not required — absolute paths always work.

No config files. No install command.

Development

Build
make build    # Binary at ./bin/pulse
make test     # go test ./...
make lint     # staticcheck (auto-installed via go run)
make cover    # Coverage report
make clean    # Remove artifacts
Run tests
# Full suite (17 packages, ~5 seconds)
go test ./...

# Single package
go test ./processing/...

# Verbose with specific test
go test ./service/... -v -run TestProcess

# Fuzz tests
go test ./encoding/... -fuzz FuzzPulseFileHeader -fuzztime 30s
Project structure
pulse/
├── pulse.go              Public facade: New, Open, Process, Import, Export, ...
├── cmd/pulse/            CLI binary (thin adapter)
├── encoding/             .pulse binary format: header, schema, records
├── processing/           Aggregators, attributes, filterers, groupers
├── service/              Orchestration layer
├── io/                   Bidirectional I/O pipeline
│   ├── csv/              CSV reader + writer
│   ├── tsv/              TSV reader + writer
│   ├── ndjson/           NDJSON reader + writer
│   ├── parquet/          Parquet reader + writer (Apache Arrow)
│   └── excel/            Excel reader + writer (Excelize)
├── fs/                   Filesystem abstraction (afero)
├── errors/               Typed error codes
├── types/                Request/response types
├── descriptor/           Self-description: manifest, predict, inspect
├── skills/               Embedded LLM skill pack
└── internal/cli/         CLI internals

License

MIT. See LICENSE.

Documentation

Overview

Package pulse is a high-performance, self-describing tabular data processing engine.

Pulse ships as a CLI binary and as an embeddable Go library. The library is the primary deliverable; the CLI is a thin adapter over it.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AskRequest added in v0.5.0

type AskRequest struct {
	File    string         `json:"file,omitempty"`
	Request *types.Request `json:"request,omitempty"`
	// Query is an optional natural-language query string. When set, the
	// server parses it against the cohort's schema and synthesises a
	// types.Request. The parsed request fills only the slots the
	// supplied Request left empty; explicit fields in Request always
	// win on collision. When Query is empty, Ask behaves exactly as
	// before (structured Request only).
	Query     string `json:"query,omitempty"`
	OnInvalid string `json:"on_invalid,omitempty"`
	Predict   bool   `json:"predict,omitempty"`

	// Source is an optional source-file path (csv, tsv, ndjson,
	// jsonarray, parquet, arrow, excel, or .pulse). When set and
	// Request.Cohort is unspecified, Ask runs the managed-import
	// pipeline first (same semantics as pulse.ImportFile / pulse_import)
	// and uses the resulting handle as the cohort for the rest of the
	// call. The import is sliding-window TTL-tracked — subsequent Ask
	// / Process / Inspect calls against the same handle bump expiry.
	Source string `json:"source,omitempty"`

	// SourceFormat overrides extension-based format detection on the
	// Source path. Empty falls back to imports.FormatFromExt(Source).
	SourceFormat string `json:"source_format,omitempty"`

	// SourceTTL overrides the default 7-day TTL on the auto-import.
	// Accepts the same forms as pulse_import: Go duration ("24h",
	// "30m"), day suffix ("7d", "30d"), or "pin" for never-expire.
	SourceTTL string `json:"source_ttl,omitempty"`

	// SourceHandle overrides the managed handle name (defaults to the
	// source basename without extension).
	SourceHandle string `json:"source_handle,omitempty"`

	// SourceSheet selects an Excel sheet; ignored for non-Excel sources.
	SourceSheet string `json:"source_sheet,omitempty"`

	// SourceOverwrite replaces an existing managed handle of the same
	// name. Defaults to false (collision -> PULSE_IMPORT_HANDLE_EXISTS).
	SourceOverwrite bool `json:"source_overwrite,omitempty"`
}

AskRequest is the input to pulse.Ask — the unified one-shot facade that collapses (optional) import, inspect, predict, and execute into a single call.

Cohort selection precedence (first non-empty wins):

  1. Request.Cohort.Filename — explicit cohort the caller already has.
  2. File — back-compat shorthand for Request.Cohort.Filename when Request.Cohort is nil.
  3. Source — a tabular source file (csv, tsv, ndjson, jsonarray, parquet, arrow, excel) or an existing .pulse. Ask imports it via the managed pool (default TTL 7d) and uses the resulting handle. This is the path that collapses pulse_import + pulse_ask into a single call — preferred for one-shot analytical queries.

OnInvalid governs what Ask does when predict reports the request as invalid:

  • "" or "abort" — return a SERVICE_VALIDATION error.
  • "suggest" — return an AskResponse with Suggestions populated.

Predict=true skips execution even on a valid request (the LLM's "what would happen if I ran this?" probe).

type AskResponse added in v0.5.0

type AskResponse struct {
	FormatVersion   string                    `json:"format_version"`
	Predict         *descriptor.PredictResult `json:"predict"`
	Process         *Response                 `json:"process,omitempty"`
	Suggestions     []errors.Fixup            `json:"suggestions,omitempty"`
	QueryResolution *QueryResolution          `json:"query_resolution,omitempty"`
	// Import is populated when AskRequest.Source triggered an
	// auto-import. Carries the same fields as ImportResult: managed
	// handle name, resulting path, format, row count, expiry. Nil
	// when no auto-import ran.
	Import   *ImportResult               `json:"import,omitempty"`
	Errors   []*descriptor.EnvelopeEntry `json:"errors"`
	Warnings []*descriptor.EnvelopeEntry `json:"warnings"`
}

AskResponse is the JSON-friendly envelope returned by pulse.Ask. It always carries the predict result; Process is populated only when execution ran; Suggestions is populated only on predict-invalid + OnInvalid="suggest".

FormatVersion mirrors the descriptor envelope version so callers can gate on a single value across endpoints.

type Cohort

type Cohort struct {
	// contains filtered or unexported fields
}

Cohort represents an opened .pulse file with its parsed schema. It wraps the service-layer Cohort to provide a clean public API.

func (*Cohort) Categorical

func (c *Cohort) Categorical(name string) (*encoding.Dictionary, bool)

Categorical returns the dictionary for a named categorical field. Returns nil, false if the field is not found or is not categorical.

func (*Cohort) Field

func (c *Cohort) Field(name string) *encoding.Field

Field returns a pointer to the named field, or nil if not found.

func (*Cohort) Schema

func (c *Cohort) Schema() *encoding.Schema

Schema returns the cohort's schema.

type ComposeOptions added in v0.2.0

type ComposeOptions = service.ComposeOptions

ComposeOptions controls parallel execution. See service.ComposeOptions.

type ComposedRequest

type ComposedRequest = types.ComposedRequest

Type aliases re-exported from the types package so embedders can use pulse.Request instead of types.Request.

type ErrorFixup added in v0.5.0

type ErrorFixup = errors.Fixup

ErrorFixup is one repair template attached to an error code.

type ErrorMetadata added in v0.5.0

type ErrorMetadata = errors.LookupResult

ErrorMetadata is the depth-on-demand projection returned by ErrorLookup, ErrorsByDomain, and ErrorsSearch. Carries the code, domain, canonical Message, and materialised Fixup templates.

type Example added in v0.5.0

type Example = examples.Example

Example is the full record returned by ExampleGet — runnable request JSON plus the indexed metadata.

type ExampleSummary added in v0.5.0

type ExampleSummary = examples.ExampleSummary

ExampleSummary is the lightweight projection returned by ExamplesSearch.

type ImportEntry added in v0.5.0

type ImportEntry = imports.Entry

Type aliases re-exported from the imports package so embedders can use pulse.ImportSpec instead of imports.Spec.

type ImportResult added in v0.5.0

type ImportResult = imports.Result

Type aliases re-exported from the imports package so embedders can use pulse.ImportSpec instead of imports.Spec.

type ImportSpec added in v0.5.0

type ImportSpec = imports.Spec

Type aliases re-exported from the imports package so embedders can use pulse.ImportSpec instead of imports.Spec.

type Options

type Options struct {
	// DataDir is the base directory for cohort files.
	// Defaults to PULSE_DATA_DIR if empty and FS is not set.
	DataDir string

	// FS is an optional custom filesystem.
	// When set, DataDir is ignored for filesystem construction.
	FS afero.Fs

	// DisableDefaults turns off the smart-defaults pass that infers
	// operator Type from the named field's schema type when the caller
	// omits it. Defaults to false (defaults enabled). Predict still
	// computes and reports DefaultsApplied independently — this flag
	// governs only what the runtime mutates on the live request.
	DisableDefaults bool

	// ImportsDir overrides the managed-imports directory. Defaults to
	// imports.DefaultImportsDir (resolved relative to the Pulse fs
	// root). Honoured before the PULSE_IMPORTS_DIR env var.
	ImportsDir string

	// ImportTTL overrides the default TTL applied to managed imports
	// when the caller does not pass one. Zero falls back to the
	// PULSE_IMPORT_TTL env var, then to imports.DefaultTTL. Negative
	// values pin imports (never expire) by default.
	ImportTTL time.Duration

	// ImportSourceFS is the afero.Fs used to read source files when
	// ImportFile / pulse_import receives an absolute path. Defaults to
	// afero.NewOsFs() on real OS installs, or to FS when FS is an
	// in-memory MemMapFs (so tests stay hermetic). Setting this
	// disables the default jail: the explicit fs IS the boundary.
	ImportSourceFS afero.Fs

	// ImportSourceJailRoot confines absolute source paths to a
	// directory tree. Empty string defaults to os.Getwd() at New
	// time — the natural sandbox for an MCP server or CLI invocation.
	// Ignored when ImportSourceFS is set explicitly.
	ImportSourceJailRoot string
}

Options configures a Pulse instance.

type Profile added in v0.2.0

type Profile = synth.Profile

Profile is the cohort statistical summary used by from-profile.

type ProfileOptions added in v0.2.0

type ProfileOptions = synth.ProfileOptions

ProfileOptions modulate which statistics the profiler captures.

type Pulse

type Pulse struct {
	// contains filtered or unexported fields
}

Pulse is the top-level library facade. It wraps the service layer and provides a clean API for embedding Pulse into Go programs.

func New

func New(opts Options) (*Pulse, error)

New creates a new Pulse instance with the given options.

func (*Pulse) Ask added in v0.5.0

func (p *Pulse) Ask(ctx context.Context, req *AskRequest) (*AskResponse, error)

Ask is the unified facade. It optionally imports a source file (when AskRequest.Source is set), validates the request against the cohort schema, and executes it. On validation failure it either errors (OnInvalid="abort") or returns structured fixup suggestions (OnInvalid="suggest"). Setting Predict=true skips execution after a successful predict pass.

Cohort precedence: Request.Cohort > File > Source (auto-import). The auto-import path collapses pulse_import + pulse_ask into a single call — the preferred one-shot entry point for analytical queries on raw source files.

func (*Pulse) Compose

func (p *Pulse) Compose(ctx context.Context, req *ComposedRequest) ([]*Response, error)

Compose executes multiple requests, returning a response for each.

func (*Pulse) ComposeParallel added in v0.2.0

func (p *Pulse) ComposeParallel(ctx context.Context, req *ComposedRequest, opts ComposeOptions) ([]*Response, error)

ComposeParallel runs every request in req concurrently across a bounded worker pool. Responses are returned in input order. Workers share the engine's read-only registries; each Process call constructs fresh stateful operators per request, so concurrent execution is safe.

Defaults: MaxWorkers = runtime.GOMAXPROCS(0), no per-request timeout, FailFast = true (set FailFast=false to collect every request's outcome instead of cancelling siblings on first error).

func (*Pulse) Convert

func (p *Pulse) Convert(ctx context.Context, job *pio.ConvertJob) (*pio.ConvertReport, error)

Convert chains import and export with no intermediate file on disk. The job's FS field is set to the Pulse instance's filesystem if not already set.

func (*Pulse) Drop added in v0.5.0

func (p *Pulse) Drop(ctx context.Context, handle string) error

Drop removes a managed import handle (and its sidecar) from the pool. Returns PULSE_IMPORT_SOURCE_MISSING when the handle is unknown.

func (*Pulse) ErrorLookup added in v0.5.0

func (p *Pulse) ErrorLookup(code string) (ErrorMetadata, bool)

ErrorLookup returns the metadata projection for a single error code. Case-sensitive exact match. Returns (ErrorMetadata{}, false) when the code is unknown.

The manifest carries only the alphabetized code-name list; per-code Message + Fixup detail lives behind this facade so per-session bootstrap stays lean. Use ErrorsByDomain / ErrorsSearch to enumerate in bulk.

func (*Pulse) ErrorsByDomain added in v0.5.0

func (p *Pulse) ErrorsByDomain(domain string) []ErrorMetadata

ErrorsByDomain returns every code's metadata in the named domain (CLI, DATA, ENCODING, PROCESSING, PULSE, SERVICE). Match is case-insensitive. Returns a non-nil empty slice when nothing matches; results are sorted alphabetically by code.

func (*Pulse) ErrorsSearch added in v0.5.0

func (p *Pulse) ErrorsSearch(query string) []ErrorMetadata

ErrorsSearch returns codes whose Message or Fixup hints contain the query (case-insensitive substring). Results are ranked by match source: description hits before fixup hits before code-name hits; ties resolve alphabetically. Returns a non-nil empty slice when nothing matches.

func (*Pulse) ExampleGet added in v0.5.0

func (p *Pulse) ExampleGet(name string) (*Example, bool)

ExampleGet returns the example whose _meta.name matches name. The returned Body is the request JSON with the _meta block stripped so it can be handed directly to Process / Predict.

func (*Pulse) ExamplesSearch added in v0.5.0

func (p *Pulse) ExamplesSearch(query string, tags []string, category string) []ExampleSummary

ExamplesSearch returns summaries from the embedded request-example library matching the given filters. An empty filter is treated as "no constraint" for that dimension. Query is case-insensitive substring search across name, description, and operators; tags is ANDed; category is an exact match. Always returns a non-nil slice (possibly empty) for safe JSON marshaling.

func (*Pulse) Export

func (p *Pulse) Export(ctx context.Context, job *pio.ExportJob) (*pio.ExportReport, error)

Export converts a .pulse file into tabular output. The job's FS field is set to the Pulse instance's filesystem if not already set.

func (*Pulse) Facet

func (p *Pulse) Facet(ctx context.Context, path string, field string) ([]string, error)

Facet returns distinct values for the named field in the cohort.

func (*Pulse) Fs added in v0.2.0

func (p *Pulse) Fs() afero.Fs

Fs returns the underlying afero.Fs. Embedders (e.g. the MCP server) need this to enumerate .pulse files; processing methods route through service and never expose the filesystem directly.

func (*Pulse) Import

func (p *Pulse) Import(ctx context.Context, job *pio.ImportJob) (*pio.ImportReport, error)

Import converts tabular source data into a .pulse file. The job's FS field is set to the Pulse instance's filesystem if not already set.

func (*Pulse) ImportFile added in v0.5.0

func (p *Pulse) ImportFile(ctx context.Context, spec ImportSpec) (*ImportResult, error)

ImportFile auto-detects the source format, converts the source into a managed .pulse file under the imports pool, and returns the resulting handle. Pulse-native sources pass through unchanged (no copy, no sidecar). Sliding-window TTL applies to managed handles — every subsequent Inspect / Predict / Process / Sample / Facet / Ask against the handle bumps the expiry forward.

func (*Pulse) Imports added in v0.5.0

func (p *Pulse) Imports(ctx context.Context) ([]ImportEntry, error)

Imports returns a snapshot of the managed-imports pool. Sweep is not invoked; expired entries are flagged via Entry.Expired so callers can render them. Results are sorted by handle name.

func (*Pulse) Inspect

func (p *Pulse) Inspect(ctx context.Context, path string) (*descriptor.InspectResult, error)

Inspect reads a .pulse file header and schema, returning structured field information. It never reads record data.

func (*Pulse) Manifest added in v0.5.0

func (p *Pulse) Manifest(_ context.Context) *descriptor.Manifest

Manifest returns the root Pulse self-description. The manifest is deterministic and process-wide: it does not depend on cohort data or the filesystem. Callers cache the result for a session.

func (*Pulse) Open

func (p *Pulse) Open(ctx context.Context, path string) (*Cohort, error)

Open reads a .pulse file and returns a Cohort with the parsed schema.

func (*Pulse) Predict

func (p *Pulse) Predict(ctx context.Context, req *Request) (*descriptor.PredictResult, error)

Predict validates a request against a .pulse file without executing it. It reads only the header and schema, never record data.

func (*Pulse) Process

func (p *Pulse) Process(ctx context.Context, req *Request) (*Response, error)

Process executes a single processing request against a cohort.

func (*Pulse) ProcessStream added in v0.2.0

func (p *Pulse) ProcessStream(ctx context.Context, req *Request) (RowIter, error)

ProcessStream executes a request and returns a pull-based row iterator over the result. Equivalent to Process for any request shape — same gates, same errors — but streaming consumers (HTTP responders, NDJSON writers, downstream pipelines) can drain rows one at a time without buffering the full result in their own memory.

Predict's Streamable flag reports whether the underlying execution avoids buffering inside the engine; ProcessStream wraps the result regardless, so the API is stable for non-streamable requests too.

func (*Pulse) Profile added in v0.2.0

func (p *Pulse) Profile(_ context.Context, path string, opts ProfileOptions) (*Profile, error)

Profile reads a .pulse file at path and returns a statistical summary suitable for from-profile synthesis. The profile retains no individual rows from the source data.

func (*Pulse) ResolveImport added in v0.5.0

func (p *Pulse) ResolveImport(ctx context.Context, handle string) (string, error)

ResolveImport returns the managed-pool path for a handle, or PULSE_IMPORT_SOURCE_MISSING when no such handle exists. Embedders who want to address a managed handle by name (instead of by path) run path through this resolver before passing it to Inspect/Process.

func (*Pulse) Sample

func (p *Pulse) Sample(ctx context.Context, path string, n int) ([]Record, error)

Sample returns up to n rows from the cohort as maps of field name to value.

func (*Pulse) SweepImports added in v0.5.0

func (p *Pulse) SweepImports(ctx context.Context) ([]string, error)

SweepImports removes every expired managed handle and returns the list of swept handle names. Invoked opportunistically by ImportFile and exposed here for callers that want explicit control (CLI maintenance, periodic ticker, etc.).

func (*Pulse) Synth added in v0.2.0

func (p *Pulse) Synth(_ context.Context, spec *SynthSpec, output string, opts SynthOptions) (*SynthResult, error)

Synth materializes a synthetic .pulse file at output from spec. The generator is deterministic for a given (spec, opts.Seed) pair: same seed produces a byte-identical file.

type QueryResolution added in v0.5.0

type QueryResolution struct {
	// Query is the verbatim input the parser processed.
	Query string `json:"query"`

	// MatchedFields is every schema field name the parser resolved
	// against, in first-appearance order.
	MatchedFields []string `json:"matched_fields"`

	// Confidence is the aggregate parser confidence in [0, 1].
	// Product of per-step weights (verb match × per-field match),
	// floored at 0 on PULSE_QUERY_UNRESOLVED.
	Confidence float64 `json:"confidence"`
}

QueryResolution echoes the parser's decision when AskRequest.Query was set. Nil when no query parse fired.

type Record

type Record = map[string]any

Record is a row of field→value data returned by Sample.

type Request

type Request = types.Request

Type aliases re-exported from the types package so embedders can use pulse.Request instead of types.Request.

type Response

type Response = types.Response

Type aliases re-exported from the types package so embedders can use pulse.Request instead of types.Request.

type Row added in v0.2.0

type Row = service.Row

Row is a single result row in a processing stream.

type RowIter added in v0.2.0

type RowIter = service.RowIter

RowIter is a pull-based iterator over a processing result. Each call to Next returns the next row or (nil, false, nil) on exhaustion. Close releases underlying resources. Metadata returns the run metadata once available (always present after the iterator is drained).

type SynthOptions added in v0.2.0

type SynthOptions = synth.Options

SynthOptions modulate the deterministic seed and other knobs.

type SynthResult added in v0.2.0

type SynthResult = synth.Result

SynthResult is the result of a successful Synth call.

type SynthSpec added in v0.2.0

type SynthSpec = synth.Spec

SynthSpec is the parsed synthesis request shape.

Directories

Path Synopsis
cmd
pulse command
Package main is the entry point for the pulse CLI binary.
Package main is the entry point for the pulse CLI binary.
Package descriptor provides self-description, manifest, and predict functionality for pulse.
Package descriptor provides self-description, manifest, and predict functionality for pulse.
Package encoding handles the binary .pulse file format: reading, writing, and schema management.
Package encoding handles the binary .pulse file format: reading, writing, and schema management.
Package errors provides structured error codes and error handling for pulse.
Package errors provides structured error codes and error handling for pulse.
Package examples embeds the catalogue of runnable Pulse request JSON files and exposes a search/get API over them.
Package examples embeds the catalogue of runnable Pulse request JSON files and exposes a search/get API over them.
Package fs provides the filesystem abstraction layer for pulse storage backends.
Package fs provides the filesystem abstraction layer for pulse storage backends.
Package imports manages tabular-source imports with a TTL-tracked on-disk pool.
Package imports manages tabular-source imports with a TTL-tracked on-disk pool.
internal
cli
Package cli provides internal CLI wiring and command construction for the pulse binary.
Package cli provides internal CLI wiring and command construction for the pulse binary.
mcp
Package mcp wraps the Pulse library facade in the Model Context Protocol surface.
Package mcp wraps the Pulse library facade in the Model Context Protocol surface.
mcp/mcptools
Package mcptools holds the metadata table for the MCP tools registered by internal/mcp.
Package mcptools holds the metadata table for the MCP tools registered by internal/mcp.
query
Package query is the heuristic English-to-types.Request parser used by the pulse.Ask facade and the `pulse api ask` CLI leaf.
Package query is the heuristic English-to-types.Request parser used by the pulse.Ask facade and the `pulse api ask` CLI leaf.
io
Package io defines the I/O pipeline framework for Pulse: Reader/Writer interfaces, schema inference, and job types (ImportJob, ExportJob, ConvertJob).
Package io defines the I/O pipeline framework for Pulse: Reader/Writer interfaces, schema inference, and job types (ImportJob, ExportJob, ConvertJob).
arrow
Package arrow provides Arrow IPC (Feather V2) import and export for the pulse I/O pipeline, plus shared Arrow<->Pulse type-mapping helpers used by both this package and io/parquet.
Package arrow provides Arrow IPC (Feather V2) import and export for the pulse I/O pipeline, plus shared Arrow<->Pulse type-mapping helpers used by both this package and io/parquet.
csv
Package csv provides CSV format adapters for the Pulse I/O pipeline.
Package csv provides CSV format adapters for the Pulse I/O pipeline.
excel
Package excel provides Excel import and export for the pulse I/O pipeline.
Package excel provides Excel import and export for the pulse I/O pipeline.
format
Package format dispatches tabular Reader construction by format identifier, sitting between the io/ interface definitions and the per-format leaf packages (io/csv, io/tsv, io/ndjson, io/jsonarray, io/parquet, io/arrow, io/excel).
Package format dispatches tabular Reader construction by format identifier, sitting between the io/ interface definitions and the per-format leaf packages (io/csv, io/tsv, io/ndjson, io/jsonarray, io/parquet, io/arrow, io/excel).
jsonarray
Package jsonarray provides JSON-array import and export for the pulse I/O pipeline.
Package jsonarray provides JSON-array import and export for the pulse I/O pipeline.
jsonshared
Package jsonshared holds value coercion helpers shared by the ndjson and jsonarray packages.
Package jsonshared holds value coercion helpers shared by the ndjson and jsonarray packages.
ndjson
Package ndjson provides NDJSON (newline-delimited JSON) import and export for the pulse I/O pipeline.
Package ndjson provides NDJSON (newline-delimited JSON) import and export for the pulse I/O pipeline.
parquet
Package parquet provides Parquet import and export for the pulse I/O pipeline.
Package parquet provides Parquet import and export for the pulse I/O pipeline.
tsv
Package tsv provides TSV import and export for the pulse I/O pipeline.
Package tsv provides TSV import and export for the pulse I/O pipeline.
Package processing provides the single dynamic processing engine for Pulse.
Package processing provides the single dynamic processing engine for Pulse.
arena
Package arena provides a bump-allocator backed by a single contiguous []byte.
Package arena provides a bump-allocator backed by a single contiguous []byte.
feature
Package feature implements the FEAT_* operators that run pre-filter to add derived columns to a record stream.
Package feature implements the FEAT_* operators that run pre-filter to add derived columns to a record stream.
window
Package window implements the WIN_* window operators for Pulse.
Package window implements the WIN_* window operators for Pulse.
Package service provides the orchestration layer for pulse operations.
Package service provides the orchestration layer for pulse operations.
Package skills provides the embedded skill pack for LLM-driven agents.
Package skills provides the embedded skill pack for LLM-driven agents.
Package synth produces synthetic .pulse cohorts from either a schema declaration ("from-schema") or a statistical profile of a real cohort ("from-profile").
Package synth produces synthetic .pulse cohorts from either a schema declaration ("from-schema") or a statistical profile of a real cohort ("from-profile").
Package types provides shared type definitions for pulse.
Package types provides shared type definitions for pulse.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL