service

package
v0.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package service provides the orchestration layer for pulse operations.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AskInput added in v0.5.0

type AskInput struct {
	// Request is the structured processing request. Required.
	Request *types.Request

	// OnInvalid controls behavior when predict reports the request as
	// invalid. "" / "abort" returns a SERVICE_VALIDATION error; "suggest"
	// returns a successful AskOutput with structured Suggestions populated.
	OnInvalid string

	// PredictOnly skips Process even when predict succeeds. Predict
	// always runs; the caller can read schema info, streamability,
	// suggestions, and defaults_applied from PredictResult.
	PredictOnly bool
}

AskInput captures the per-call options the facade hands the service. Mirrors the public pulse.AskRequest shape but reads the cohort path from the embedded request's Cohort so the service does not duplicate the path-resolution dance.

type AskOutput added in v0.5.0

type AskOutput struct {
	// Predict is always populated when Ask returns without error.
	Predict *descriptor.PredictResult

	// Process is the execution result. Nil when PredictOnly=true,
	// nil when predict reported the request invalid (OnInvalid="suggest").
	Process *types.Response

	// Suggestions enumerates structured Fixup entries derived from every
	// predict error code's metadata template, de-duplicated by
	// (Code, Action). Empty when there were no errors or when
	// OnInvalid != "suggest".
	Suggestions []errors.Fixup

	// Errors / Warnings echo the predict envelope's entries so callers
	// can present them without re-running predict. Never nil — empty slices
	// when there are none.
	Errors   []*descriptor.EnvelopeEntry
	Warnings []*descriptor.EnvelopeEntry
}

AskOutput is the service-level envelope returned by Service.Ask. The facade re-marshals into the public pulse.AskResponse — see pulse.go.

type Cohort

type Cohort struct {
	// contains filtered or unexported fields
}

Cohort represents an opened .pulse file with its parsed schema.

func (*Cohort) RecordCount

func (c *Cohort) RecordCount() (int64, error)

RecordCount returns the number of records in the cohort file. It reads the file and counts records based on the schema's per-record byte size.

func (*Cohort) Records

Records returns a streaming iterator over records in the cohort. Records are decoded lazily from disk — the full file is not materialized.

func (*Cohort) Schema

func (c *Cohort) Schema() *encoding.Schema

Schema returns the cohort's schema.

type ComposeOptions added in v0.2.0

type ComposeOptions struct {
	// MaxWorkers caps concurrent in-flight Process calls. Zero means
	// runtime.GOMAXPROCS(0). Negative values are clamped to 1.
	MaxWorkers int

	// PerRequestTimeout, if positive, derives a context.WithTimeout for
	// each request. Zero means no per-request timeout (the parent ctx's
	// deadline still applies).
	PerRequestTimeout time.Duration

	// FailFast cancels in-flight siblings on the first request error.
	// Default is true: surface errors quickly. Set false to collect every
	// request's outcome (errors aggregated into a single CodedError with
	// per-index detail).
	FailFast bool
}

ComposeOptions controls parallel execution of a ComposedRequest.

Order of responses is always preserved — slot-by-index — regardless of MaxWorkers or completion order.

type Row added in v0.2.0

type Row = map[string]any

Row is a single result row in a processing stream. Aliased so callers can write `service.Row` without leaking the underlying map type.

type RowIter added in v0.2.0

type RowIter interface {
	Next(ctx context.Context) (Row, bool, error)
	Close() error
	// Metadata returns the run metadata (total/filtered row counts,
	// cohort filename). Available after the iterator is exhausted; may
	// return nil before then. Streaming consumers that need metadata
	// before draining should call Process instead.
	Metadata() *types.ResponseMetadata
}

RowIter is a pull-based iterator over a processing result. Next reports (row, true, nil) for each available row, then (nil, false, nil) on exhaustion. Close releases any underlying file handles.

Implementations are NOT goroutine-safe. A single consumer per iterator.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the orchestration layer connecting filesystem, encoding, and processing.

func New

func New(fsConfig *fs.Config) *Service

New creates a new Service with the given filesystem configuration. Smart-defaults resolution runs by default; call DisableDefaults to opt out per instance (or pass pulse.Options{DisableDefaults: true} via the facade).

func (*Service) Ask added in v0.5.0

func (s *Service) Ask(ctx context.Context, in AskInput) (*AskOutput, error)

Ask is the unified pipeline that collapses inspect → predict → process into one call. It opens the cohort file, validates the request against the schema via Predict, and on success runs Process. The exposed pulse.Ask facade is a thin shim over this method.

OnInvalid governs predict-invalid behavior:

  • "" or "abort" — return a SERVICE_VALIDATION error wrapping the predict envelope.
  • "suggest" — return AskOutput with Suggestions populated; Process stays nil.

PredictOnly skips execution even on a valid request (the caller's "what would happen if I ran this?" probe).

func (*Service) Compose

func (s *Service) Compose(ctx context.Context, composed *types.ComposedRequest) ([]*types.Response, error)

Compose executes multiple requests, returning a response for each.

func (*Service) ComposeParallel added in v0.2.0

func (s *Service) ComposeParallel(
	ctx context.Context,
	composed *types.ComposedRequest,
	opts ComposeOptions,
) ([]*types.Response, error)

ComposeParallel runs every request in composed concurrently across a bounded worker pool. Responses are returned in the same order as composed.Requests; per-request errors are surfaced according to opts.

Registry factories return fresh stateful instances per request, so concurrent Process calls do not share aggregator/attribute state. Geo and decimal aggregators dispatch through buffered code paths that are also safe for concurrent invocation (no shared mutable state).

func (*Service) Extensions added in v0.7.0

func (s *Service) Extensions() *processing.ExtensionRegistry

Extensions returns the installed ExtensionRegistry, or nil when no extensions are registered. Exposed for descriptor/manifest emission and MCP schema-binding paths.

func (*Service) ExtensionsSnapshot added in v0.7.0

func (s *Service) ExtensionsSnapshot() *descriptor.ExtensionsSnapshot

ExtensionsSnapshot returns the descriptor-side projection of the registered extensions, or nil when no extensions are installed.

func (*Service) Facet

func (s *Service) Facet(ctx context.Context, path string, field string) ([]string, error)

Facet returns distinct values for the named field in the cohort. For categorical fields, it returns the dictionary values. For numeric fields, it returns string representations of all distinct values seen.

func (*Service) FacetSchema added in v0.7.0

func (s *Service) FacetSchema(ctx context.Context, req *types.FacetRequest) (*types.FacetResult, error)

FacetSchema is the rich multi-field facet implementation. The pulse.FacetSchema facade is a thin wrapper around this method.

Algorithm: one streaming pass over the cohort applies the base filterers, then for each requested field updates the appropriate accumulator (discrete dictionary count, online numeric stats, online histogram bin). Numeric percentiles force a buffered second-stage sort over the per-field non-null value slices. Additive contribution counts run a parallel discrete accumulator with the additive field's own filter clauses stripped from the base filter.

func (*Service) Open

func (s *Service) Open(_ context.Context, path string) (*Cohort, error)

Open reads a .pulse file and returns a Cohort with the parsed schema.

func (*Service) Process

func (s *Service) Process(ctx context.Context, req *types.Request) (*types.Response, error)

Process executes a single request against the specified cohort. Records are streamed from disk — the full file is never held in memory as raw bytes alongside the decoded records.

func (*Service) ProcessStream added in v0.2.0

func (s *Service) ProcessStream(ctx context.Context, req *types.Request) (RowIter, error)

ProcessStream executes a request and returns a pull-based row iterator over the result. The semantics match Process — same gates, same errors, same metadata — but the consumer can drain rows incrementally instead of receiving the whole []map[string]any up front.

Today the iterator buffers internally (calls Process and walks Data). The signature is forward-compatible with a future true-streaming path driven by the streaming Processor; consumers that adopt it now will pick up that change without a code change on their side.

func (*Service) Sample

func (s *Service) Sample(ctx context.Context, path string, n int) ([]map[string]any, error)

Sample returns up to n rows from the cohort as maps of field name to value. Streams from disk — stops reading as soon as n rows are collected.

func (*Service) SetDisableDefaults added in v0.5.0

func (s *Service) SetDisableDefaults(disabled bool)

SetDisableDefaults toggles the smart-defaults pass. Predict still computes and reports DefaultsApplied independently; this flag governs only what the runtime mutates before Process / Compose execution.

func (*Service) SetExtensions added in v0.7.0

func (s *Service) SetExtensions(r *processing.ExtensionRegistry)

SetExtensions installs an ExtensionRegistry containing embedder- registered operator overlays. The registry is read-only after this call; pass nil to clear. The processor consults this registry before falling through to built-in factories.

func (*Service) SetExtensionsSnapshot added in v0.7.0

func (s *Service) SetExtensionsSnapshot(snap *descriptor.ExtensionsSnapshot)

SetExtensionsSnapshot installs the descriptor-side projection of the registered extensions for manifest + predict consumption. Pass nil to clear; pulse.New populates this alongside SetExtensions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL