encoding

package
v0.10.2
Published: May 21, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package encoding handles the binary .pulse file format: reading, writing, and schema management.

Constants

const FormatVersion byte = 0x01

FormatVersion is the current .pulse format version.

const HeaderSize = 9

HeaderSize is the total byte size of the file header (magic + version).

const MaxDecimalPrecision = 38

MaxDecimalPrecision is the upper bound on decimal128 precision.

const MaxDescriptionBytes = 1000

MaxDescriptionBytes is the maximum allowed byte length for a field description.

const MinDecimalScale = 4

MinDecimalScale is the floor on division result scale (matches Arrow).

const ReservedSchemaName = "_schema.pulse"

ReservedSchemaName is the canonical entry name inside a Pulse shard archive that carries the cohort's canonical schema, dictionaries, and sharding metadata (aggregate record count, shard count). The name is reserved — inserting a shard with this basename raises PULSE_SHARD_RESERVED_NAME at write time. Read-side enumeration of shards EXCLUDES this entry.

Variables

var MagicBytes = [8]byte{'P', 'U', 'L', 'S', 'E', 0x00, 0x00, 0x00}

MagicBytes identifies a .pulse file. 8 bytes: "PULSE\x00\x00\x00"

var SchemaDocMagic = [4]byte{'S', 'H', 'R', 'D'}

SchemaDocMagic is the four-byte marker that introduces the sharding metadata extension appended to a `_schema.pulse` entry after the standard schema block. Readers detect the marker and parse the extension; absence means the metadata is unavailable (the canonical fallback is to peek each shard's header for record counts).

var ZipMagic = [4]byte{'P', 'K', 0x03, 0x04}

ZipMagic is the standard PKZip local-file-header magic. Pulse shard archives begin with these four bytes when written as zip containers; single-file .pulse cohorts begin with MagicBytes ("PULSE\x00\x00\x00"). Pulse.Open dispatches on the first four bytes to pick the read path.

Functions

func BitmapIsNull added in v0.9.0

func BitmapIsNull(bitmap []byte, i int) bool

BitmapIsNull reports whether the field at index i is marked null in bitmap. Bit ordering: field index i → byte i/8, bit i%8 (LSB-first within each byte). 1 = null, 0 = present.

func BitmapSetNull added in v0.9.0

func BitmapSetNull(bitmap []byte, i int)

BitmapSetNull marks the field at index i as null in bitmap. The caller must have pre-allocated bitmap to at least ceil((i+1)/8) bytes.
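
The bit ordering described for BitmapIsNull and BitmapSetNull can be sketched in a few lines. This is a minimal illustration, not the package's implementation; the names isNull, setNull, and bitmapLen are hypothetical.

```go
package main

import "fmt"

// isNull follows the documented ordering: field i lives in byte i/8,
// bit i%8 (LSB-first within the byte); a set bit means null.
func isNull(bitmap []byte, i int) bool {
	return bitmap[i/8]&(1<<uint(i%8)) != 0
}

// setNull marks field i as null. The caller must have sized bitmap
// to at least ceil((i+1)/8) bytes beforehand.
func setNull(bitmap []byte, i int) {
	bitmap[i/8] |= 1 << uint(i%8)
}

// bitmapLen returns ceil(fieldCount/8).
func bitmapLen(fieldCount int) int {
	return (fieldCount + 7) / 8
}

func main() {
	bm := make([]byte, bitmapLen(10)) // 10 fields need 2 bytes
	setNull(bm, 0)
	setNull(bm, 9)
	fmt.Printf("%08b %08b\n", bm[0], bm[1]) // 00000001 00000010
	fmt.Println(isNull(bm, 9), isNull(bm, 1)) // true false
}
```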

func EncodeDecimal128 added in v0.2.0

func EncodeDecimal128(d Decimal128) [16]byte

EncodeDecimal128 serializes a Decimal128 as 16 bytes of two's-complement little-endian integer.
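
As a rough illustration of the wire form, the sketch below encodes a big.Int mantissa as 16 bytes of two's-complement little-endian data and decodes it again. It assumes the mantissa already fits in 128 signed bits; encode128, decode128, and encodeInt64 are hypothetical helpers, not part of the package.

```go
package main

import (
	"fmt"
	"math/big"
)

// encode128 writes m as a 16-byte two's-complement little-endian integer.
// Assumption: m fits in 128 signed bits (FillBytes panics otherwise).
func encode128(m *big.Int) [16]byte {
	v := new(big.Int).Set(m)
	if v.Sign() < 0 {
		// Two's complement: adding 2^128 maps negatives into [2^127, 2^128).
		v.Add(v, new(big.Int).Lsh(big.NewInt(1), 128))
	}
	var be [16]byte
	v.FillBytes(be[:]) // big-endian, zero-padded
	var le [16]byte
	for i := range be {
		le[i] = be[15-i]
	}
	return le
}

// decode128 reverses encode128.
func decode128(le [16]byte) *big.Int {
	var be [16]byte
	for i := range le {
		be[i] = le[15-i]
	}
	v := new(big.Int).SetBytes(be[:])
	if le[15]&0x80 != 0 { // sign bit set: value is negative
		v.Sub(v, new(big.Int).Lsh(big.NewInt(1), 128))
	}
	return v
}

// encodeInt64 is a convenience wrapper for small mantissas.
func encodeInt64(i int64) [16]byte { return encode128(big.NewInt(i)) }

func main() {
	enc := encodeInt64(-1)
	fmt.Printf("%x\n", enc[:])
	fmt.Println(decode128(encodeInt64(-12345)))
}
```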

func IsArchive added in v0.8.0

func IsArchive(r io.ReaderAt, size int64) (bool, error)

IsArchive reports whether the first four bytes of r identify a zip container. It does no central-directory parsing — magic-byte check only — so it is cheap enough to call on every Open. Errors are returned for I/O failures; a short file returns (false, nil).
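
A magic-byte check of this kind might look like the following sketch. It is written against the standard library only; looksLikeZip and sniff are hypothetical names, and the treatment of a short read is an assumption modeled on the documented (false, nil) result.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

var zipMagic = [4]byte{'P', 'K', 0x03, 0x04}

// looksLikeZip compares the first four bytes of r with the zip
// local-file-header magic. A file shorter than four bytes is
// reported as (false, nil) rather than as an error.
func looksLikeZip(r io.ReaderAt, size int64) (bool, error) {
	if size < int64(len(zipMagic)) {
		return false, nil
	}
	var head [4]byte
	if _, err := r.ReadAt(head[:], 0); err != nil && err != io.EOF {
		return false, err
	}
	return head == zipMagic, nil
}

// sniff is a convenience wrapper for in-memory data.
func sniff(b []byte) (bool, error) {
	return looksLikeZip(bytes.NewReader(b), int64(len(b)))
}

func main() {
	fmt.Println(sniff([]byte("PK\x03\x04rest of archive"))) // true <nil>
	fmt.Println(sniff([]byte("PULSE\x00\x00\x00\x01")))     // false <nil>
	fmt.Println(sniff([]byte("PK")))                        // false <nil>
}
```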

func PromoteAdd added in v0.2.0

func PromoteAdd(p1, s1, p2, s2 uint8) (uint8, uint8)

PromoteAdd returns the (precision, scale) of an ADD/SUB result given two operand types per SQL:2016 / Arrow Decimal128 rules:

(p1, s1) ± (p2, s2) => (max(p1-s1, p2-s2) + max(s1, s2) + 1, max(s1, s2))

The result precision is clamped at MaxDecimalPrecision; callers that rely on the clamped value must check ClampedPrecision and emit PULSE_DECIMAL_OVERFLOW when overflow surfaces at runtime.

func PromoteDiv added in v0.2.0

func PromoteDiv(p1, s1, p2, s2 uint8) (uint8, uint8)

PromoteDiv returns the (precision, scale) of a DIV result.

(p1, s1) ÷ (p2, s2) => (p1 + s2 + 1, max(s1+s2, MinDecimalScale))

func PromoteMul added in v0.2.0

func PromoteMul(p1, s1, p2, s2 uint8) (uint8, uint8)

PromoteMul returns the (precision, scale) of a MUL result.

(p1, s1) × (p2, s2) => (p1 + p2, s1 + s2)
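
The three promotion formulas above can be worked through with a short sketch. It uses plain ints to sidestep uint8 wraparound, and it clamps only the ADD/SUB precision because that is the only clamp the documentation states; whether the package clamps MUL and DIV results is not specified here. All function names are hypothetical.

```go
package main

import "fmt"

const (
	maxPrecision = 38 // mirrors MaxDecimalPrecision
	minScale     = 4  // mirrors MinDecimalScale
)

func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// promoteAdd: (max(p1-s1, p2-s2) + max(s1, s2) + 1, max(s1, s2)),
// with the precision clamped at maxPrecision.
func promoteAdd(p1, s1, p2, s2 int) (int, int) {
	scale := maxInt(s1, s2)
	prec := maxInt(p1-s1, p2-s2) + scale + 1
	if prec > maxPrecision {
		prec = maxPrecision
	}
	return prec, scale
}

// promoteMul: (p1 + p2, s1 + s2). Left unclamped in this sketch.
func promoteMul(p1, s1, p2, s2 int) (int, int) {
	return p1 + p2, s1 + s2
}

// promoteDiv: (p1 + s2 + 1, max(s1+s2, minScale)). Left unclamped in this sketch.
func promoteDiv(p1, s1, p2, s2 int) (int, int) {
	return p1 + s2 + 1, maxInt(s1+s2, minScale)
}

func main() {
	fmt.Println(promoteAdd(10, 2, 8, 4)) // 13 4
	fmt.Println(promoteMul(10, 2, 8, 4)) // 18 6
	fmt.Println(promoteDiv(10, 2, 8, 1)) // 12 4
}
```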

func ReadBit

func ReadBit(r io.Reader, bitPos uint) (bool, error)

ReadBit reads a single bit from the byte at the current position in r. bitPos is 0-7 within that byte.

func ReadBitmap added in v0.9.0

func ReadBitmap(r io.Reader, byteLen int) ([]byte, error)

ReadBitmap reads a per-record null bitmap of the given byte length and returns the raw bytes. byteLen MUST be Schema.BitmapByteSize(); callers invoke this only when the schema has at least one nullable field.

func ReadDescription

func ReadDescription(r io.Reader) (string, error)

ReadDescription reads a field description from r. Format: u16 length + utf8 bytes.

func ReadFieldValue

func ReadFieldValue(r io.Reader, ft FieldType) (uint64, error)

ReadFieldValue reads a single field value from r, returning raw bits as uint64. For bit-packed types (U4, PackedBool), use ReadBit/ReadNibble instead.

func ReadHeader

func ReadHeader(r io.Reader) error

ReadHeader reads and validates the .pulse file header from r.

func ReadNibble

func ReadNibble(r io.Reader, high bool) (uint8, error)

ReadNibble reads a 4-bit value from a byte. If high is true, it reads bits 4-7; otherwise bits 0-3.

func RewriteShardCategoricals added in v0.8.0

func RewriteShardCategoricals(shardBytes []byte, targetSchema *Schema, remap map[int]DictRemap) ([]byte, error)

RewriteShardCategoricals re-emits a single-file .pulse shard with its schema block replaced by targetSchema and its records' categorical indices translated according to remap. The on-wire byte layout of every non-categorical field is preserved exactly.

targetSchema must be structurally identical to the shard's own schema (field count, names, types, byte offsets, bit positions) — only the per-categorical dictionaries differ.

When remap is empty (every incoming dict was a prefix of canonical), the function still re-emits the shard with targetSchema so the shard's standalone dictionary matches the archive's canonical (callers reading the shard via the `archive.pulse#shard.pulse` anchor see the union dictionary). Record bytes are copied verbatim.

Returns PULSE_SHARD_HEADER_INVALID if the input bytes are not a valid single-file Pulse cohort, and PULSE_SHARD_SCHEMA_MISMATCH if targetSchema is structurally incompatible with the shard's own schema.

func ValidatePrecisionScale added in v0.2.0

func ValidatePrecisionScale(precision, scale uint8) error

ValidatePrecisionScale returns nil when (precision, scale) form a legal decimal128 type spec (1 ≤ precision ≤ 38, 0 ≤ scale ≤ precision) and a non-nil error otherwise.

func WriteBit

func WriteBit(w io.Writer, bitPos uint, val bool) error

WriteBit writes a byte with a single bit set or cleared at bitPos.

func WriteBitmap added in v0.9.0

func WriteBitmap(w io.Writer, bitmap []byte) error

WriteBitmap writes a per-record null bitmap. The slice MUST be exactly Schema.BitmapByteSize() bytes; this helper is intentionally thin to keep the writer responsible for allocating and populating the buffer.

func WriteDecimal128 added in v0.2.0

func WriteDecimal128(w io.Writer, d Decimal128) error

WriteDecimal128 writes a Decimal128 to the .pulse record stream.

func WriteDescription

func WriteDescription(w io.Writer, desc string) error

WriteDescription writes a field description to w. Format: u16 length + utf8 bytes. Returns PULSE_IMPORT_DESCRIPTION_TOO_LONG if the UTF-8 byte length exceeds MaxDescriptionBytes.
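
The length-prefixed layout can be sketched as follows. The byte order of the u16 prefix is not stated for descriptions; little-endian is assumed here because the sharding extension is documented as little-endian. writeDesc, readDesc, and roundTrip are hypothetical names, and the plain error stands in for PULSE_IMPORT_DESCRIPTION_TOO_LONG.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

const maxDescriptionBytes = 1000 // mirrors MaxDescriptionBytes

// writeDesc emits a u16 byte length followed by the UTF-8 bytes.
func writeDesc(w io.Writer, desc string) error {
	if len(desc) > maxDescriptionBytes {
		return errors.New("description too long")
	}
	var n [2]byte
	binary.LittleEndian.PutUint16(n[:], uint16(len(desc))) // assumed byte order
	if _, err := w.Write(n[:]); err != nil {
		return err
	}
	_, err := io.WriteString(w, desc)
	return err
}

// readDesc reads the u16 length and then exactly that many bytes.
func readDesc(r io.Reader) (string, error) {
	var n [2]byte
	if _, err := io.ReadFull(r, n[:]); err != nil {
		return "", err
	}
	buf := make([]byte, binary.LittleEndian.Uint16(n[:]))
	if _, err := io.ReadFull(r, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

// roundTrip writes desc, then reads it back; it also reports the encoded size.
func roundTrip(desc string) (string, int, error) {
	var buf bytes.Buffer
	if err := writeDesc(&buf, desc); err != nil {
		return "", 0, err
	}
	size := buf.Len()
	got, err := readDesc(&buf)
	return got, size, err
}

func main() {
	fmt.Println(roundTrip("héllo")) // héllo 8 <nil>
}
```

Note that the limit applies to the UTF-8 byte length, not the rune count: "héllo" has five runes but occupies six bytes plus the two-byte prefix.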

func WriteFieldValue

func WriteFieldValue(w io.Writer, ft FieldType, val uint64) error

WriteFieldValue writes a single field value (as raw bits in uint64) to w. For bit-packed types (U4, PackedBool), use WriteBit/WriteNibble instead. For decimal128 (16-byte type), use the dedicated WriteDecimal128 helper; this function will reject those types with ENCODING_TYPE_MISMATCH.

func WriteHeader

func WriteHeader(w io.Writer) error

WriteHeader writes the .pulse file header to w.
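
A minimal sketch of the 9-byte header, assuming the version byte directly follows the 8 magic bytes (the order suggested by "magic + version"). The error values are placeholders, not the package's coded errors, and all names are hypothetical.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

var magic = [8]byte{'P', 'U', 'L', 'S', 'E', 0x00, 0x00, 0x00}

const (
	formatVersion byte = 0x01
	headerSize         = 9 // 8 magic bytes + 1 version byte
)

// writeHeader emits the magic followed by the version byte (assumed order).
func writeHeader(w io.Writer) error {
	var h [headerSize]byte
	copy(h[:8], magic[:])
	h[8] = formatVersion
	_, err := w.Write(h[:])
	return err
}

// readHeader consumes exactly headerSize bytes and validates them.
func readHeader(r io.Reader) error {
	var h [headerSize]byte
	if _, err := io.ReadFull(r, h[:]); err != nil {
		return fmt.Errorf("short header: %w", err)
	}
	if !bytes.Equal(h[:8], magic[:]) {
		return errors.New("bad magic")
	}
	if h[8] != formatVersion {
		return fmt.Errorf("unsupported format version 0x%02x", h[8])
	}
	return nil
}

// checkBytes is a convenience wrapper for in-memory data.
func checkBytes(b []byte) error { return readHeader(bytes.NewReader(b)) }

func main() {
	var buf bytes.Buffer
	if err := writeHeader(&buf); err != nil {
		panic(err)
	}
	fmt.Println(buf.Len(), checkBytes(buf.Bytes())) // 9 <nil>
	fmt.Println(checkBytes([]byte("PK\x03\x04 not a pulse file")))
}
```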

func WriteNibble

func WriteNibble(w io.Writer, high bool, val uint8) error

WriteNibble writes a 4-bit value into a byte. If high is true, it writes to bits 4-7; otherwise bits 0-3. The other nibble is zero.
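
The nibble placement used by ReadNibble and WriteNibble reduces to a shift and a mask. The sketch below works on a single byte rather than a stream; packNibble and unpackNibble are hypothetical names, and masking an out-of-range value is an assumption (the package may reject it instead).

```go
package main

import "fmt"

// packNibble places a 4-bit value in bits 4-7 (high) or bits 0-3 (low).
// The other nibble is left zero.
func packNibble(high bool, val uint8) byte {
	val &= 0x0F // assumption: silently mask values above 15
	if high {
		return val << 4
	}
	return val
}

// unpackNibble extracts bits 4-7 (high) or bits 0-3 (low) from b.
func unpackNibble(b byte, high bool) uint8 {
	if high {
		return b >> 4
	}
	return b & 0x0F
}

func main() {
	fmt.Printf("0x%02X\n", packNibble(true, 0xA))                    // 0xA0
	fmt.Println(unpackNibble(0xA5, true), unpackNibble(0xA5, false)) // 10 5
}
```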

func WriteSchema

func WriteSchema(w io.Writer, s *Schema) error

WriteSchema serializes a schema to w. Format:

u16 field_count
per field:
  u8 type
  u8 nullable (0 or 1)
  u16 name_length + utf8 name
  u32 byte_offset
  u8 bit_position
  u16 csv_column_idx
  u16 description_length + utf8 description
  (if decimal128) u8 precision + u8 scale
  (if categorical) dictionary block

func WriteSchemaDoc added in v0.8.0

func WriteSchemaDoc(w io.Writer, schema *Schema, agg uint64, shardCount uint16) error

WriteSchemaDoc emits a `_schema.pulse` payload: standard Pulse header, standard schema block, then the sharding metadata extension (magic "SHRD" + u64 aggregate_record_count + u16 shard_count, little-endian). The result is a valid single-file Pulse cohort with zero records, so legacy readers that don't know about the SHRD extension can still parse the header + schema and stop at the (empty) record region — the extension bytes live where records would otherwise begin and a header-only reader (`Inspect`) ignores them.
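
The sharding metadata extension is 14 bytes: the four-byte marker, a u64, and a u16, all little-endian. A sketch of encoding and detecting it follows; encodeShardExt and decodeShardExt are hypothetical names and handle only the extension, not the header or schema block that precede it.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

var shardMagic = []byte("SHRD")

// encodeShardExt builds magic + u64 aggregate_record_count + u16 shard_count.
func encodeShardExt(agg uint64, shardCount uint16) []byte {
	out := make([]byte, 14)
	copy(out[:4], shardMagic)
	binary.LittleEndian.PutUint64(out[4:12], agg)
	binary.LittleEndian.PutUint16(out[12:14], shardCount)
	return out
}

// decodeShardExt returns ok=false when the marker is absent, which the
// documentation treats as "metadata unavailable" rather than an error.
func decodeShardExt(b []byte) (agg uint64, shardCount uint16, ok bool) {
	if len(b) < 14 || !bytes.Equal(b[:4], shardMagic) {
		return 0, 0, false
	}
	return binary.LittleEndian.Uint64(b[4:12]), binary.LittleEndian.Uint16(b[12:14]), true
}

func main() {
	b := encodeShardExt(1000, 3)
	fmt.Println(len(b))            // 14
	fmt.Println(decodeShardExt(b)) // 1000 3 true
}
```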

Types

type Archive added in v0.8.0

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a parsed Pulse shard archive: zip central-directory cache plus the underlying ReaderAt. Callers Open or OpenAt entries by name. The zero value is not usable; construct via OpenArchive.

func OpenArchive added in v0.8.0

func OpenArchive(r io.ReaderAt, size int64) (*Archive, error)

OpenArchive parses the zip central directory at the tail of r and returns an Archive ready for entry enumeration. The ReaderAt is retained — the caller is responsible for its lifetime. Returns PULSE_ARCHIVE_MAGIC_INVALID when r does not start with the zip magic, and PULSE_ARCHIVE_CORRUPT when the EOCD or central directory cannot be parsed.

func (*Archive) Entries added in v0.8.0

func (a *Archive) Entries() []ArchiveEntry

Entries returns every entry in central-directory order (equals shard insertion order for Pulse-written archives). The slice is a fresh copy; callers may mutate freely.

func (*Archive) IsStored added in v0.8.0

func (a *Archive) IsStored(name string) bool

IsStored reports whether the named entry uses store-only (Method 0) compression. Pulse-written shards are always store-only; this helper is exposed for callers that want to confirm the contract before using OpenAt on a section reader.

func (*Archive) Open added in v0.8.0

func (a *Archive) Open(name string) (io.ReadCloser, error)

Open returns an io.ReadCloser over the named entry's payload, suitable for streaming reads. The caller must close the returned reader when done. Returns PULSE_SHARD_MISSING when no entry of that name exists in the central directory.

func (*Archive) OpenAt added in v0.8.0

func (a *Archive) OpenAt(name string) (*io.SectionReader, error)

OpenAt returns an io.SectionReader covering the named entry's stored payload. Only safe for store-only (Method 0) entries — Pulse always writes Method 0, but third-party archives that pass through this path may carry deflated entries; the caller should verify the entry's Method (see Archive.IsStored) before consuming the section reader. Returns PULSE_SHARD_MISSING for unknown names.

func (*Archive) PeekShardHeader added in v0.8.0

func (a *Archive) PeekShardHeader(name string) error

PeekShardHeader reads the first encoding.HeaderSize bytes of the named entry and validates them as a single-file Pulse header (magic + format version). Returns PULSE_SHARD_HEADER_INVALID on mismatch so callers can surface a structured error without parsing the full schema. Used by S2+ phases when peeking shard metadata on first read.

func (*Archive) PeekShardRecordCount added in v0.8.0

func (a *Archive) PeekShardRecordCount(name string) (int64, error)

PeekShardRecordCount opens the named entry, reads its Pulse header and schema block, validates both, and computes the record count from the remaining bytes divided by the schema's per-record size. Used by `Pulse.Open` to populate `ShardEntry.RecordCount` from the per-shard headers (the source of truth — `_schema.pulse`'s AggregateRecordCount is only a sanity check).

Returns PULSE_SHARD_HEADER_INVALID when the shard's bytes are not a valid single-file Pulse cohort (bad magic, unsupported version, truncated header or schema).

type ArchiveEntry added in v0.8.0

type ArchiveEntry struct {
	// Name is the zip entry name (basename inside the archive). For
	// flat archives this is the shard filename, e.g. "20190101.pulse".
	Name string

	// Size is the uncompressed payload size in bytes.
	Size int64

	// Offset is the byte offset of the payload within the archive.
	Offset int64
}

ArchiveEntry is a single payload inside a Pulse shard archive. Offset is the byte offset of the entry's compressed/stored payload within the archive (the equivalent of zip.File.DataOffset), NOT the offset of the local file header. Pulse stores entries uncompressed (Method 0), so Offset is a direct pointer into the shard's bytes.
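
The relationship between a store-only entry, its data offset, and a section reader can be demonstrated with the standard library's archive/zip alone. This sketch does not use the Archive type; buildStored, sectionFor, and roundTrip are hypothetical helpers.

```go
package main

import (
	"archive/zip"
	"bytes"
	"fmt"
	"io"
)

// buildStored writes a one-entry zip using Method 0 (store) and returns its bytes.
func buildStored(name string, payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := zip.NewWriter(&buf)
	w, err := zw.CreateHeader(&zip.FileHeader{Name: name, Method: zip.Store})
	if err != nil {
		return nil, err
	}
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// sectionFor returns a reader over the raw payload bytes of a stored entry.
// It refuses deflated entries, whose raw bytes are not the payload.
func sectionFor(archive []byte, name string) (*io.SectionReader, error) {
	ra := bytes.NewReader(archive)
	zr, err := zip.NewReader(ra, int64(len(archive)))
	if err != nil {
		return nil, err
	}
	for _, f := range zr.File {
		if f.Name != name {
			continue
		}
		if f.Method != zip.Store {
			return nil, fmt.Errorf("entry %q is not store-only", name)
		}
		off, err := f.DataOffset() // offset of the payload, not the local header
		if err != nil {
			return nil, err
		}
		return io.NewSectionReader(ra, off, int64(f.UncompressedSize64)), nil
	}
	return nil, fmt.Errorf("entry %q not found", name)
}

// roundTrip stores payload in an archive and reads it back through the section reader.
func roundTrip(payload string) (string, error) {
	archive, err := buildStored("20190101.pulse", []byte(payload))
	if err != nil {
		return "", err
	}
	sr, err := sectionFor(archive, "20190101.pulse")
	if err != nil {
		return "", err
	}
	b, err := io.ReadAll(sr)
	return string(b), err
}

func main() {
	fmt.Println(roundTrip("shard bytes"))
}
```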

type CohesionWarning added in v0.8.0

type CohesionWarning struct {
	Code    string
	Message string
	Details map[string]any
}

CohesionWarning is a structured non-fatal divergence emitted by the schema-cohesion validators. Its shape mirrors the descriptor envelope's {code, message, details} entries so callers (typically `service/shard add` and `pulse shard verify`) can forward warnings through the standard --json output without reshaping.

encoding/ stays free of descriptor/ imports — this is the local equivalent.

func ValidateStructuralCohesion added in v0.8.0

func ValidateStructuralCohesion(canonical, incoming *Schema) ([]CohesionWarning, error)

ValidateStructuralCohesion verifies that incoming's structural schema is byte-equal to canonical's. The check is field-count strict and per-field strict on:

  • Name
  • Type byte
  • ByteOffset
  • BitPosition
  • Categorical-width identity (a categorical_u8 cannot become a categorical_u16 across shards — the width is fixed at folder creation)

Descriptions are advisory and may diverge across shards; per-field divergence emits a PULSE_SHARD_DESCRIPTION_DIVERGENCE warning but does NOT fail validation. Any other mismatch returns a coded PULSE_SHARD_SCHEMA_MISMATCH error.

This validator does NOT compare dictionaries — dictionary cohesion is governed by the append-only prefix rule (see ValidateDictPrefixRule).

type Decimal128 added in v0.2.0

type Decimal128 struct {
	// contains filtered or unexported fields
}

Decimal128 is a fixed-point decimal value with up to 38 digits of precision. Logically it is a signed 128-bit two's-complement integer mantissa, paired with a per-field scale that places the implicit decimal point. Internally it is carried as *big.Int (always copied, never aliased) bounded to the decimal128 representable range [-(10^38 - 1), 10^38 - 1].

func DecodeDecimal128 added in v0.2.0

func DecodeDecimal128(buf [16]byte) Decimal128

DecodeDecimal128 deserializes 16 bytes of two's-complement little-endian integer into a Decimal128. Null state is now carried by the per-record bitmap (see encoding.Schema.HasBitmap), so this function does not flag null values.

func NewDecimal128FromBigInt added in v0.2.0

func NewDecimal128FromBigInt(m *big.Int) (Decimal128, error)

NewDecimal128FromBigInt builds a Decimal128 from a big.Int mantissa. Returns PULSE_DECIMAL_OVERFLOW if |m| >= 10^38.

func NewDecimal128FromInt added in v0.2.0

func NewDecimal128FromInt(i int64) Decimal128

NewDecimal128FromInt builds a Decimal128 with the given integer mantissa.

func ParseDecimal128 added in v0.2.0

func ParseDecimal128(s string) (Decimal128, uint8, error)

ParseDecimal128 parses a strict decimal string into a (Decimal128, scale) pair. The scale is inferred from the input — exactly the count of digits after the decimal point. Accepts an optional single leading sign char. Rejects whitespace, thousand separators, scientific notation, currency symbols.
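
A strict parser along these lines might be sketched as below. It returns the raw mantissa as a big.Int instead of a Decimal128, and it makes two assumptions the documentation does not settle: at least one digit is required before the point, and at least one after it when a point is present. parseDecimal is a hypothetical name.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
	"strings"
)

var errSyntax = errors.New("invalid decimal syntax")

// parseDecimal accepts [+-]digits[.digits] and nothing else.
// The scale is the number of digits after the decimal point.
func parseDecimal(s string) (*big.Int, uint8, error) {
	body, neg := s, false
	if len(body) > 0 && (body[0] == '+' || body[0] == '-') {
		neg = body[0] == '-'
		body = body[1:]
	}
	intPart, fracPart, hasPoint := body, "", false
	if i := strings.IndexByte(body, '.'); i >= 0 {
		intPart, fracPart, hasPoint = body[:i], body[i+1:], true
	}
	if intPart == "" || (hasPoint && fracPart == "") || len(fracPart) > 38 {
		return nil, 0, errSyntax
	}
	digits := intPart + fracPart
	for i := 0; i < len(digits); i++ {
		if digits[i] < '0' || digits[i] > '9' {
			return nil, 0, errSyntax // whitespace, separators, exponents, second '.'
		}
	}
	m, _ := new(big.Int).SetString(digits, 10)
	limit := new(big.Int).Exp(big.NewInt(10), big.NewInt(38), nil)
	if m.Cmp(limit) >= 0 {
		return nil, 0, errors.New("mantissa exceeds 38 digits")
	}
	if neg {
		m.Neg(m)
	}
	return m, uint8(len(fracPart)), nil
}

func main() {
	fmt.Println(parseDecimal("-123.450")) // -123450 3 <nil>
	fmt.Println(parseDecimal("1e5"))
}
```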

func ReadDecimal128 added in v0.2.0

func ReadDecimal128(r io.Reader) (Decimal128, error)

ReadDecimal128 reads 16 bytes of decimal128 from r and decodes them. Null state is carried by the per-record bitmap, not by the payload bytes, so this function has no null channel.

func ZeroDecimal128 added in v0.2.0

func ZeroDecimal128() Decimal128

ZeroDecimal128 returns a Decimal128 with mantissa 0.

func (Decimal128) Add added in v0.2.0

func (d Decimal128) Add(o Decimal128) (Decimal128, error)

Add returns d + o, treating both at the same scale. The result has the same scale; the caller is responsible for SQL:2016 precision propagation at the schema level.

func (Decimal128) Cmp added in v0.2.0

func (d Decimal128) Cmp(o Decimal128) int

Cmp compares two Decimal128 values with the same scale. Comparing values at different scales requires the caller to align scales first.

func (Decimal128) Div added in v0.2.0

func (d Decimal128) Div(o Decimal128, s1, s2, resultScale uint8) (Decimal128, error)

Div divides d by o using banker's rounding to produce a result at scale `resultScale` given the operands at scales s1 and s2. Returns PULSE_DECIMAL_DIVIDE_BY_ZERO when o is zero.

func (Decimal128) FitsPrecision added in v0.2.0

func (d Decimal128) FitsPrecision(precision uint8) bool

FitsPrecision reports whether the mantissa fits in `precision` digits.

func (Decimal128) Float64 added in v0.2.0

func (d Decimal128) Float64(scale uint8) float64

Float64 returns the decimal as a float64 at the given scale. Lossy for values that exceed float64 precision; callers preserving precision should keep using Decimal128 directly.

func (Decimal128) Mantissa added in v0.2.0

func (d Decimal128) Mantissa() *big.Int

Mantissa returns a copy of the unscaled integer mantissa.

func (Decimal128) Mul added in v0.2.0

func (d Decimal128) Mul(o Decimal128) (Decimal128, error)

Mul returns d * o. The product mantissa is the product of mantissas; callers are responsible for tracking scale propagation (s1 + s2).

func (Decimal128) Rescale added in v0.2.0

func (d Decimal128) Rescale(sourceScale, targetScale uint8) (Decimal128, error)

Rescale converts d at sourceScale to targetScale using banker's rounding. Returns PULSE_DECIMAL_OVERFLOW if the rescaled mantissa exceeds 10^38-1.
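
Banker's rounding (round half to even) during a scale reduction can be sketched on a raw big.Int mantissa as follows. The overflow check against 10^38-1 is omitted for brevity, and rescale, pow10, and rescaleInt64 are hypothetical names.

```go
package main

import (
	"fmt"
	"math/big"
)

func pow10(n int) *big.Int {
	return new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(n)), nil)
}

// rescale converts mantissa m from sourceScale to targetScale.
// Scaling up multiplies by a power of ten; scaling down divides and
// rounds half-to-even. No overflow check is performed in this sketch.
func rescale(m *big.Int, sourceScale, targetScale uint8) *big.Int {
	if targetScale >= sourceScale {
		return new(big.Int).Mul(m, pow10(int(targetScale-sourceScale)))
	}
	div := pow10(int(sourceScale - targetScale))
	q, r := new(big.Int).QuoRem(m, div, new(big.Int)) // truncates toward zero
	twice := new(big.Int).Lsh(new(big.Int).Abs(r), 1)
	cmp := twice.Cmp(div)
	// Round away from zero when past the midpoint, or exactly on it
	// with an odd truncated quotient.
	if cmp > 0 || (cmp == 0 && q.Bit(0) == 1) {
		if m.Sign() < 0 {
			q.Sub(q, big.NewInt(1))
		} else {
			q.Add(q, big.NewInt(1))
		}
	}
	return q
}

// rescaleInt64 is a convenience wrapper for small mantissas.
func rescaleInt64(m int64, sourceScale, targetScale uint8) int64 {
	return rescale(big.NewInt(m), sourceScale, targetScale).Int64()
}

func main() {
	fmt.Println(rescaleInt64(12345, 3, 2)) // 12.345 -> 12.34 (tie, 1234 is even)
	fmt.Println(rescaleInt64(12355, 3, 2)) // 12.355 -> 12.36 (tie, 1235 is odd)
	fmt.Println(rescaleInt64(-15, 1, 0))   // -1.5 -> -2
	fmt.Println(rescaleInt64(7, 0, 2))     // 7 -> 7.00
}
```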

func (Decimal128) Sign added in v0.2.0

func (d Decimal128) Sign() int

Sign returns -1, 0, or +1 depending on the mantissa sign.

func (Decimal128) Sqrt added in v0.2.0

func (d Decimal128) Sqrt(sourceScale, targetScale uint8) (Decimal128, error)

Sqrt returns sqrt(d) at the target scale, rounded half-to-even (banker's rounding), given the source value at sourceScale. The result is computed entirely in decimal arithmetic via big.Int.Sqrt and a half-to-even rounding step on the residual. Returns PULSE_DECIMAL_OVERFLOW if intermediate state cannot fit in the integer representation; returns PROCESSING_RUNTIME for negative inputs (sqrt of a negative decimal is undefined).

func (Decimal128) String added in v0.2.0

func (d Decimal128) String(scale uint8) string

String renders the decimal at the given scale. Trailing zeros are preserved; the leading sign is included only for negative values.
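
Rendering a mantissa at a given scale amounts to inserting a decimal point `scale` digits from the right. The sketch below assumes a leading "0" is printed for magnitudes below one (for example "0.005"); the documentation does not say so explicitly. formatDecimal and formatInt64 are hypothetical names.

```go
package main

import (
	"fmt"
	"math/big"
	"strings"
)

// formatDecimal renders mantissa m with the decimal point placed
// `scale` digits from the right. Trailing zeros are kept.
func formatDecimal(m *big.Int, scale uint8) string {
	digits := new(big.Int).Abs(m).String()
	// Left-pad so there is at least one digit before the point.
	if pad := int(scale) + 1 - len(digits); pad > 0 {
		digits = strings.Repeat("0", pad) + digits
	}
	out := digits
	if scale > 0 {
		cut := len(digits) - int(scale)
		out = digits[:cut] + "." + digits[cut:]
	}
	if m.Sign() < 0 {
		out = "-" + out
	}
	return out
}

// formatInt64 is a convenience wrapper for small mantissas.
func formatInt64(m int64, scale uint8) string {
	return formatDecimal(big.NewInt(m), scale)
}

func main() {
	fmt.Println(formatInt64(12345, 2)) // 123.45
	fmt.Println(formatInt64(1200, 2))  // 12.00
	fmt.Println(formatInt64(-5, 3))    // -0.005
	fmt.Println(formatInt64(42, 0))    // 42
}
```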

func (Decimal128) Sub added in v0.2.0

func (d Decimal128) Sub(o Decimal128) (Decimal128, error)

Sub returns d - o.

type DictRemap added in v0.8.0

type DictRemap = map[uint32]uint32

DictRemap is the per-field index translation produced by MergeDictUnion. The key is the incoming dictionary index; the value is the corresponding canonical (union) dictionary index. Fields whose incoming dictionary is already a prefix of the canonical (union) dictionary are absent from the returned map — callers can skip remapping their record bytes.

type Dictionary

type Dictionary struct {
	// contains filtered or unexported fields
}

Dictionary maps string values to sequential uint32 IDs. It is used for categorical field types to encode string categories as compact integer IDs in the binary record format.

func NewDictionary

func NewDictionary() *Dictionary

NewDictionary creates an empty dictionary.

func (*Dictionary) Add

func (d *Dictionary) Add(s string) (uint32, error)

Add inserts a string into the dictionary, returning its ID. If the string already exists, the existing ID is returned. There is no capacity limit with Add; use AddWithLimit to enforce one.

func (*Dictionary) AddWithLimit

func (d *Dictionary) AddWithLimit(s string, maxEntries uint32) (uint32, error)

AddWithLimit inserts a string, enforcing a maximum entry count. Returns PULSE_IMPORT_CATEGORICAL_OVERFLOW if the dictionary is full.

func (*Dictionary) Count

func (d *Dictionary) Count() int

Count returns the number of entries in the dictionary.

func (*Dictionary) IDFor

func (d *Dictionary) IDFor(s string) (uint32, bool)

IDFor looks up the ID for a string. Returns the ID and true if found, or 0 and false otherwise.

func (*Dictionary) ReadFrom

func (d *Dictionary) ReadFrom(r io.Reader) (int64, error)

ReadFrom deserializes a dictionary from r, replacing current contents. Format: u32 count + (u16 strlen + utf8 bytes) x count

Performance: a single byte buffer is grown to hold all string payloads across the dictionary, instead of allocating one []byte per entry. Each resolved string is copied once via the standard string([]byte) conversion (no unsafe), so we drop one allocation per entry while preserving the safety contract that callers can mutate the underlying buffer afterwards without affecting the stored strings.

func (*Dictionary) Resolve

func (d *Dictionary) Resolve(id uint32) string

Resolve returns the string for a given ID. Returns "" if the ID is out of range.
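
The behaviour described for Add, AddWithLimit, IDFor, and Resolve suggests a map plus an insertion-ordered slice. The following sketch shows one such structure; it is an assumption about shape, not the package's actual layout, and returning the existing ID for a known string even when the dictionary is full is likewise assumed.

```go
package main

import (
	"errors"
	"fmt"
)

// dict assigns sequential IDs in insertion order.
type dict struct {
	ids  map[string]uint32
	vals []string
}

func newDict() *dict { return &dict{ids: map[string]uint32{}} }

// addWithLimit returns the existing ID for a known string, or assigns
// the next ID unless maxEntries has been reached.
func (d *dict) addWithLimit(s string, maxEntries uint32) (uint32, error) {
	if id, ok := d.ids[s]; ok {
		return id, nil
	}
	if uint32(len(d.vals)) >= maxEntries {
		return 0, errors.New("dictionary full") // stand-in for PULSE_IMPORT_CATEGORICAL_OVERFLOW
	}
	id := uint32(len(d.vals))
	d.ids[s] = id
	d.vals = append(d.vals, s)
	return id, nil
}

// idFor reports the ID and whether the string is present.
func (d *dict) idFor(s string) (uint32, bool) {
	id, ok := d.ids[s]
	return id, ok
}

// resolve returns "" for an out-of-range ID.
func (d *dict) resolve(id uint32) string {
	if int(id) >= len(d.vals) {
		return ""
	}
	return d.vals[id]
}

func main() {
	d := newDict()
	fmt.Println(d.addWithLimit("red", 2))   // 0 <nil>
	fmt.Println(d.addWithLimit("blue", 2))  // 1 <nil>
	fmt.Println(d.addWithLimit("red", 2))   // 0 <nil>
	fmt.Println(d.addWithLimit("green", 2)) // 0 dictionary full
	fmt.Println(d.resolve(1), d.resolve(9) == "")
}
```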

func (*Dictionary) Values

func (d *Dictionary) Values() []string

Values returns a copy of all dictionary values in insertion order.

func (*Dictionary) WriteTo

func (d *Dictionary) WriteTo(w io.Writer) (int64, error)

WriteTo serializes the dictionary to w. Format: u32 count + (u16 strlen + utf8 bytes) x count

type Field

type Field struct {
	Name         string
	Type         FieldType
	Nullable     bool // true ⇒ field participates in per-record null bitmap
	ByteOffset   int
	BitPosition  int
	CsvColumnIdx int
	Description  string      // empty = synthesized at inspect time
	Dictionary   *Dictionary // non-nil only for categorical types

	// Precision is the decimal128 precision (1-38). Meaningful only when
	// Type is FieldTypeDecimal128.
	Precision uint8
	// Scale is the decimal128 scale (0-Precision). Meaningful only when
	// Type is FieldTypeDecimal128.
	Scale uint8
}

Field describes a single column in a .pulse schema.

type FieldFilter added in v0.8.0

type FieldFilter func(name string) bool

FieldFilter returns true for field names whose values should be written into the caller's maps. Used by ReadRecordWithWideProjected to skip map writes for fields the request doesn't read. A nil FieldFilter is equivalent to "keep every field."

type FieldType

type FieldType byte

FieldType identifies the data type stored in a schema field.

const (
	FieldTypeU8             FieldType = iota // 0
	FieldTypeU16                             // 1
	FieldTypeU32                             // 2
	FieldTypeU64                             // 3
	FieldTypeF32                             // 4
	FieldTypeF64                             // 5
	FieldTypeU4                              // 6  (4-bit, bit-packed)
	FieldTypeDate                            // 7
	FieldTypePackedBool                      // 8  (1-bit, bit-packed)
	FieldTypeCategoricalU8                   // 9
	FieldTypeCategoricalU16                  // 10
	FieldTypeCategoricalU32                  // 11
	FieldTypeDecimal128                      // 12

)

All 13 field types supported by the .pulse format. Nullability is orthogonal to type — any field can be marked nullable via encoding.Field.Nullable and the per-record null bitmap carries the actual null state.

func (FieldType) ByteSize

func (ft FieldType) ByteSize() int

ByteSize returns the number of bytes this field type occupies in a record. Bit-packed types (U4, PackedBool) share bytes with adjacent fields and return 0 here; their on-wire stride is handled by Schema.RecordByteSize.

func (FieldType) IsBitPacked added in v0.9.0

func (ft FieldType) IsBitPacked() bool

IsBitPacked reports whether the field type shares its on-wire byte with adjacent fields via bit packing (U4, PackedBool). Used by stride math and the schema layout pass to advance the bit cursor instead of the byte cursor.

func (FieldType) IsCategorical

func (ft FieldType) IsCategorical() bool

IsCategorical reports whether the field type is one of the categorical types.

func (FieldType) IsDecimal added in v0.2.0

func (ft FieldType) IsDecimal() bool

IsDecimal reports whether the field type is decimal128.

func (FieldType) IsKnown added in v0.2.0

func (ft FieldType) IsKnown() bool

IsKnown reports whether the byte value corresponds to a registered type. Used by the schema reader to reject files written by a future binary version that introduces unknown type bytes.

func (FieldType) IsNumeric added in v0.7.2

func (ft FieldType) IsNumeric() bool

IsNumeric reports whether the field type is a strict scalar number: the unsigned-integer family (u8/u16/u32/u64), the float family (f32/f64), and decimal128. Date and bit-packed integer encodings are excluded — see IsNumericForAnalytics for the analytics-layer predicate.

func (FieldType) IsNumericForAnalytics added in v0.7.2

func (ft FieldType) IsNumericForAnalytics() bool

IsNumericForAnalytics reports whether the field type carries a meaningful scalar value for numeric analytics (regression, sum/avg/stddev/min/max/variance aggregators). The set is broader than IsNumeric: bit-packed integer encodings (u4, packed_bool) and date are included because their stored representation is an ordinal / cardinal number the analytics layer can average, sum, or regress without an explicit ATTR_FORMULA cast.

Null exclusion is the reader's responsibility: the per-record null bitmap marks any field index as null at decode time so the downstream Record.NumericValue contract (returns ok=false on null) keeps the aggregation denominator honest.

func (FieldType) MaxCategoricalEntries

func (ft FieldType) MaxCategoricalEntries() uint32

MaxCategoricalEntries returns the maximum dictionary size for a categorical type. Returns 0 for non-categorical types.

func (FieldType) String

func (ft FieldType) String() string

String returns a human-readable name for the field type.

type RecordReader

type RecordReader struct {
	// contains filtered or unexported fields
}

RecordReader reads records one at a time from a binary stream. It reads directly from the io.Reader without buffering the entire file.

func NewRecordReader

func NewRecordReader(r io.Reader, schema *Schema) *RecordReader

NewRecordReader creates a RecordReader. The reader must be positioned immediately after the header and schema (i.e., at the first record byte).

func (*RecordReader) ReadRecord

func (rr *RecordReader) ReadRecord(values map[string]float64, nulls map[string]bool) error

ReadRecord reads a single record from the stream, populating the values and nulls maps. Returns io.EOF when no more records are available.

The caller provides pre-allocated maps to avoid per-record allocation. Maps are cleared at the start of each call.

Reuse contract: the maps are owned by the caller. ReadRecord does not retain references to them after returning. If the caller plans to reuse the same maps across calls (the typical pattern), they must consume the populated values BEFORE invoking ReadRecord again, because the next call clears and repopulates the maps in-place. If the caller needs to retain the values past the next call (e.g., collecting Records into a slice for later aggregation), it must pass distinct map instances per record OR copy the contents out before the next ReadRecord call.

To populate typed wide values for fields whose representation does not fit in float64 (decimal128), call ReadRecordWithWide instead and pass a third map.

func (*RecordReader) ReadRecordReused added in v0.2.0

func (rr *RecordReader) ReadRecordReused(rec ReusableRecord) error

ReadRecordReused reads one record into an existing ReusableRecord, reusing the record's internal maps. Returns io.EOF when the underlying reader is exhausted.

Hot path semantics:

  • Caller MUST consume the populated rec before the next call.
  • Fixed-size numeric fields are decoded via a single stack-resident [16]byte scratch + binary.LittleEndian, avoiding the per-field allocation of binary.Read's internal buffer.
  • Bit-packed and 16-byte fields fall back to the existing typed readers.

func (*RecordReader) ReadRecordWithWide added in v0.2.0

func (rr *RecordReader) ReadRecordWithWide(values map[string]float64, nulls map[string]bool, wide map[string]any) error

ReadRecordWithWide reads a record and populates a wide map with typed values for decimal128 fields. The wide map may be nil to skip wide population.

func (*RecordReader) ReadRecordWithWideProjected added in v0.8.0

func (rr *RecordReader) ReadRecordWithWideProjected(values map[string]float64, nulls map[string]bool, wide map[string]any, keep FieldFilter) error

ReadRecordWithWideProjected reads a record but only writes the fields for which keep(name) returns true into the caller's maps. Bytes for excluded fields are still consumed from the underlying reader so byte offsets stay aligned — projection saves map allocations, not decode work.

keep == nil falls back to the full-decode path.

type ReusableRecord added in v0.2.0

type ReusableRecord interface {
	SetNumeric(name string, value float64)
	SetNullField(name string)
	SetWideField(name string, value any)
	ClearForRow()
}

ReusableRecord is the subset of *processing.Record needed by the reuse fast path. Declared here as an interface so encoding/ does not depend on processing/. Implementations (processing.Record) MUST clear their own null/wide maps before this call returns successfully; the reader populates them in place but only on fields where the value applies.

type Schema

type Schema struct {
	Fields []Field
}

Schema holds all field descriptors for a .pulse file.

func MergeDictUnion added in v0.8.0

func MergeDictUnion(canonical, incoming *Schema) (*Schema, map[int]DictRemap, error)

MergeDictUnion is the relaxed dictionary cohesion check. For each categorical field it computes the union of canonical's and incoming's dictionaries: canonical entries first in their existing order, then any new entries from incoming in their order. Returns the extended canonical schema (deep copy when an extension is adopted; identical pointer when not) and a per-field-index map of DictRemap entries describing how to translate incoming record indices into canonical indices.

The structural validator (ValidateStructuralCohesion) must run FIRST — this validator assumes field positions, names, and categorical widths already match.

Errors:

  • PULSE_SHARD_DICT_WIDTH_OVERFLOW when the union would exceed the field's categorical width capacity.

MergeDictUnion is the default cohesion mode at insert time (CreateShardArchive / AddShard). The stricter prefix-only rule (ValidateDictPrefixRule) is retained for callers that want to surface divergence as an error instead of merging — used by VerifyShardArchive when checking archives written before the union-merge default landed and by embedders that prefer to align dictionaries upstream.
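
For a single categorical field, the union-and-remap step described above can be sketched on plain string slices. The width-capacity check is left out, and unionDict is a hypothetical name. A nil remap signals that every incoming index already equals its canonical index, which is the case when incoming is a prefix of the union.

```go
package main

import "fmt"

// unionDict appends unseen incoming entries after the canonical ones
// and returns incoming-index -> canonical-index translations.
func unionDict(canonical, incoming []string) (merged []string, remap map[uint32]uint32) {
	index := make(map[string]uint32, len(canonical)+len(incoming))
	merged = append([]string(nil), canonical...) // copy; canonical is not modified
	for i, v := range canonical {
		index[v] = uint32(i)
	}
	remap = make(map[uint32]uint32, len(incoming))
	identity := true
	for i, v := range incoming {
		id, ok := index[v]
		if !ok {
			id = uint32(len(merged))
			index[v] = id
			merged = append(merged, v)
		}
		remap[uint32(i)] = id
		if id != uint32(i) {
			identity = false
		}
	}
	if identity {
		return merged, nil // no record bytes need rewriting
	}
	return merged, remap
}

func main() {
	merged, remap := unionDict([]string{"a", "b"}, []string{"b", "c"})
	fmt.Println(merged, remap) // [a b c] map[0:1 1:2]

	merged, remap = unionDict([]string{"a", "b"}, []string{"a", "b", "c"})
	fmt.Println(merged, remap == nil) // [a b c] true
}
```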

func ReadSchema

func ReadSchema(r io.Reader) (*Schema, error)

ReadSchema deserializes a schema from r.

func ValidateDictPrefixRule added in v0.8.0

func ValidateDictPrefixRule(canonical, incoming *Schema) (*Schema, error)

ValidateDictPrefixRule enforces the append-only prefix rule for every categorical field shared by canonical and incoming. For each `categorical_*` field:

  • If incoming's dictionary is a prefix of canonical's, accept the shard as-is; no canonical change. (Older shard that never saw newer dictionary values.)
  • If canonical's dictionary is a prefix of incoming's, the canonical schema adopts the extension. The returned extendedCanonical carries the merged dictionaries and the caller is responsible for rewriting `_schema.pulse` to publish them before the new shard is placed (crash-safety ordering documented in placeholder §3.2).
  • Neither is a prefix of the other → PULSE_SHARD_DICT_DIVERGENCE.
  • An extension that would exceed the field's width capacity (256 for u8, 65 536 for u16, 2^32 for u32) → PULSE_SHARD_DICT_WIDTH_OVERFLOW.

The structural validator (ValidateStructuralCohesion) must run FIRST — this validator assumes field positions, names, and categorical widths already match.

The returned extendedCanonical is a deep copy when an extension is adopted, and is identically the canonical input otherwise. Callers can compare pointers to detect whether a rewrite is required.
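
The four outcomes of the prefix rule can be sketched for one field as a small classifier. The outcome constants stand in for the coded errors, the capacity argument is the width limit for the field, and all names are hypothetical.

```go
package main

import "fmt"

type outcome int

const (
	acceptAsIs     outcome = iota // incoming is a prefix of canonical
	adoptExtension                // canonical is a prefix of incoming
	divergence                    // stand-in for PULSE_SHARD_DICT_DIVERGENCE
	widthOverflow                 // stand-in for PULSE_SHARD_DICT_WIDTH_OVERFLOW
)

// isPrefix reports whether short is an element-wise prefix of long.
func isPrefix(short, long []string) bool {
	if len(short) > len(long) {
		return false
	}
	for i := range short {
		if short[i] != long[i] {
			return false
		}
	}
	return true
}

// checkPrefixRule classifies one categorical field's dictionaries.
func checkPrefixRule(canonical, incoming []string, capacity int) outcome {
	switch {
	case isPrefix(incoming, canonical):
		return acceptAsIs
	case isPrefix(canonical, incoming):
		if len(incoming) > capacity {
			return widthOverflow
		}
		return adoptExtension
	default:
		return divergence
	}
}

func main() {
	fmt.Println(checkPrefixRule([]string{"a", "b"}, []string{"a"}, 256) == acceptAsIs)
	fmt.Println(checkPrefixRule([]string{"a"}, []string{"a", "b"}, 256) == adoptExtension)
	fmt.Println(checkPrefixRule([]string{"a", "b"}, []string{"a", "c"}, 256) == divergence)
}
```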

func (*Schema) BitmapByteSize added in v0.9.0

func (s *Schema) BitmapByteSize() int

BitmapByteSize returns the number of bytes the null bitmap occupies per record, or 0 when no field is nullable.

func (*Schema) Categorical

func (s *Schema) Categorical(name string) (*Dictionary, bool)

Categorical returns the dictionary for a named categorical field. Returns nil, false if the field is not found or is not categorical.

func (*Schema) Field

func (s *Schema) Field(name string) *Field

Field returns a pointer to the named field, or nil if not found.

func (*Schema) HasBitmap added in v0.9.0

func (s *Schema) HasBitmap() bool

HasBitmap reports whether any field in the schema is marked nullable. When true, every record carries a trailing null bitmap of ceil(field_count/8) bytes after the payload; when false, records have no bitmap (legacy fixed-stride path, zero overhead).

func (*Schema) RecordByteSize added in v0.8.4

func (s *Schema) RecordByteSize() int

RecordByteSize returns the on-wire stride of one record under this schema. Bit-packed fields (U4, PackedBool) report ByteSize()==0 but the wire format still consumes one whole byte per such field. When the schema declares at least one nullable field, the trailing null bitmap of ceil(field_count/8) bytes is appended to every record and included in the stride.
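
The stride rule can be worked through on a simplified field description. The sketch follows the wording above (one whole byte per bit-packed field, plus a trailing bitmap when any field is nullable) and adds the record-count division that PeekShardRecordCount describes. fieldSpec, recordStride, and recordCount are hypothetical, and rejecting a non-multiple payload length is an assumption.

```go
package main

import "fmt"

// fieldSpec is a simplified stand-in for Field.
type fieldSpec struct {
	byteSize  int  // what FieldType.ByteSize would report; 0 for bit-packed types
	bitPacked bool // U4 or PackedBool
	nullable  bool
}

// recordStride sums per-field widths, counting one byte per bit-packed
// field, and appends ceil(field_count/8) bitmap bytes if any field is nullable.
func recordStride(fields []fieldSpec) int {
	stride, hasNullable := 0, false
	for _, f := range fields {
		if f.bitPacked {
			stride++
		} else {
			stride += f.byteSize
		}
		if f.nullable {
			hasNullable = true
		}
	}
	if hasNullable {
		stride += (len(fields) + 7) / 8
	}
	return stride
}

// recordCount divides the bytes that follow header and schema by the stride.
// ok is false when the payload is not a whole number of records (assumed policy).
func recordCount(payloadBytes int64, stride int) (count int64, ok bool) {
	if stride <= 0 || payloadBytes%int64(stride) != 0 {
		return 0, false
	}
	return payloadBytes / int64(stride), true
}

func main() {
	fields := []fieldSpec{
		{byteSize: 4},                 // u32
		{byteSize: 8},                 // f64
		{bitPacked: true},             // packed bool
		{byteSize: 1, nullable: true}, // nullable u8
	}
	stride := recordStride(fields)
	fmt.Println(stride)                   // 4 + 8 + 1 + 1 + 1 bitmap byte = 15
	fmt.Println(recordCount(150, stride)) // 10 true
}
```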

type SchemaDoc added in v0.8.0

type SchemaDoc struct {
	Schema               *Schema
	AggregateRecordCount uint64
	ShardCount           uint16
}

SchemaDoc carries the contents of a `_schema.pulse` entry: the canonical Schema plus the sharding metadata extension. AggregateRecordCount is the cached sum of every shard's record count at the time the archive was last written; ShardCount mirrors the zip central directory's entry count (excluding the reserved schema entry).

Both metadata fields are a SANITY CHECK, not the source of truth. Per the design contract, the per-shard headers are authoritative — callers populate live `ShardEntry.RecordCount` values by peeking each shard's header. AggregateRecordCount is exposed so cheap `pulse inspect` calls can report an approximate total without opening every shard.

func ReadSchemaDoc added in v0.8.0

func ReadSchemaDoc(r io.Reader) (*SchemaDoc, error)

ReadSchemaDoc parses a `_schema.pulse` payload. It always reads the Pulse header + standard schema block. The sharding metadata extension (magic "SHRD" + agg + shard count) is read when present and silently skipped when absent — older `_schema.pulse` artifacts or single-file cohorts addressed by mistake still parse cleanly, returning AggregateRecordCount=0 and ShardCount=0.
