Documentation ¶
Overview ¶
Package encoding handles the binary .pulse file format: reading, writing, and schema management.
Index ¶
- Constants
- Variables
- func EncodeDecimal128(d Decimal128) [16]byte
- func IsArchive(r io.ReaderAt, size int64) (bool, error)
- func NullDecimalSentinel() [16]byte
- func PromoteAdd(p1, s1, p2, s2 uint8) (uint8, uint8)
- func PromoteDiv(p1, s1, p2, s2 uint8) (uint8, uint8)
- func PromoteMul(p1, s1, p2, s2 uint8) (uint8, uint8)
- func ReadBit(r io.Reader, bitPos uint) (bool, error)
- func ReadDescription(r io.Reader) (string, error)
- func ReadFieldValue(r io.Reader, ft FieldType) (uint64, error)
- func ReadHeader(r io.Reader) error
- func ReadNibble(r io.Reader, high bool) (uint8, error)
- func RewriteShardCategoricals(shardBytes []byte, targetSchema *Schema, remap map[int]DictRemap) ([]byte, error)
- func ValidatePrecisionScale(precision, scale uint8) error
- func WriteBit(w io.Writer, bitPos uint, val bool) error
- func WriteDecimal128(w io.Writer, d Decimal128) error
- func WriteDecimal128Null(w io.Writer) error
- func WriteDescription(w io.Writer, desc string) error
- func WriteFieldValue(w io.Writer, ft FieldType, val uint64) error
- func WriteHeader(w io.Writer) error
- func WriteNibble(w io.Writer, high bool, val uint8) error
- func WriteSchema(w io.Writer, s *Schema) error
- func WriteSchemaDoc(w io.Writer, schema *Schema, agg uint64, shardCount uint16) error
- type Archive
- func (a *Archive) Entries() []ArchiveEntry
- func (a *Archive) IsStored(name string) bool
- func (a *Archive) Open(name string) (io.ReadCloser, error)
- func (a *Archive) OpenAt(name string) (io.SectionReader, error)
- func (a *Archive) PeekShardHeader(name string) error
- func (a *Archive) PeekShardRecordCount(name string) (int64, error)
- type ArchiveEntry
- type CohesionWarning
- type Decimal128
- func DecodeDecimal128(buf [16]byte) (Decimal128, bool)
- func NewDecimal128FromBigInt(m *big.Int) (Decimal128, error)
- func NewDecimal128FromInt(i int64) Decimal128
- func ParseDecimal128(s string) (Decimal128, uint8, error)
- func ReadDecimal128(r io.Reader) (Decimal128, bool, error)
- func ZeroDecimal128() Decimal128
- func (d Decimal128) Add(o Decimal128) (Decimal128, error)
- func (d Decimal128) Cmp(o Decimal128) int
- func (d Decimal128) Div(o Decimal128, s1, s2, resultScale uint8) (Decimal128, error)
- func (d Decimal128) FitsPrecision(precision uint8) bool
- func (d Decimal128) Float64(scale uint8) float64
- func (d Decimal128) Mantissa() *big.Int
- func (d Decimal128) Mul(o Decimal128) (Decimal128, error)
- func (d Decimal128) Rescale(sourceScale, targetScale uint8) (Decimal128, error)
- func (d Decimal128) Sign() int
- func (d Decimal128) Sqrt(sourceScale, targetScale uint8) (Decimal128, error)
- func (d Decimal128) String(scale uint8) string
- func (d Decimal128) Sub(o Decimal128) (Decimal128, error)
- type DictRemap
- type Dictionary
- func (d *Dictionary) Add(s string) (uint32, error)
- func (d *Dictionary) AddWithLimit(s string, maxEntries uint32) (uint32, error)
- func (d *Dictionary) Count() int
- func (d *Dictionary) IDFor(s string) (uint32, bool)
- func (d *Dictionary) ReadFrom(r io.Reader) (int64, error)
- func (d *Dictionary) Resolve(id uint32) string
- func (d *Dictionary) Values() []string
- func (d *Dictionary) WriteTo(w io.Writer) (int64, error)
- type Field
- type FieldFilter
- type FieldType
- func (ft FieldType) ByteSize() int
- func (ft FieldType) IsCategorical() bool
- func (ft FieldType) IsDecimal() bool
- func (ft FieldType) IsKnown() bool
- func (ft FieldType) IsNumeric() bool
- func (ft FieldType) IsNumericForAnalytics() bool
- func (ft FieldType) MaxCategoricalEntries() uint32
- func (ft FieldType) String() string
- type RecordReader
- func (rr *RecordReader) ReadRecord(values map[string]float64, nulls map[string]bool) error
- func (rr *RecordReader) ReadRecordReused(rec ReusableRecord) error
- func (rr *RecordReader) ReadRecordWithWide(values map[string]float64, nulls map[string]bool, wide map[string]any) error
- func (rr *RecordReader) ReadRecordWithWideProjected(values map[string]float64, nulls map[string]bool, wide map[string]any, ...) error
- type ReusableRecord
- type Schema
- type SchemaDoc
Constants ¶
const FormatVersion byte = 0x01
FormatVersion is the current .pulse format version.
const HeaderSize = 9
HeaderSize is the total byte size of the file header (magic + version).
const MaxDecimalPrecision = 38
MaxDecimalPrecision is the upper bound on decimal128 precision.
const MaxDescriptionBytes = 1000
MaxDescriptionBytes is the maximum allowed byte length for a field description.
const MinDecimalScale = 4
MinDecimalScale is the floor on division result scale (matches Arrow).
const ReservedSchemaName = "_schema.pulse"
ReservedSchemaName is the canonical entry name inside a Pulse shard archive that carries the cohort's canonical schema, dictionaries, and sharding metadata (aggregate record count, shard count). The name is reserved — inserting a shard with this basename raises PULSE_SHARD_RESERVED_NAME at write time. Read-side enumeration of shards EXCLUDES this entry.
Variables ¶
var MagicBytes = [8]byte{'P', 'U', 'L', 'S', 'E', 0x00, 0x00, 0x00}
MagicBytes identifies a .pulse file. It is 8 bytes long: "PULSE\x00\x00\x00".
var SchemaDocMagic = [4]byte{'S', 'H', 'R', 'D'}
SchemaDocMagic is the four-byte marker that introduces the sharding metadata extension appended to a `_schema.pulse` entry after the standard schema block. Readers detect the marker and parse the extension; absence means the metadata is unavailable (the canonical fallback is to peek each shard's header for record counts).
var ZipMagic = [4]byte{'P', 'K', 0x03, 0x04}
ZipMagic is the standard PKZip local-file-header magic. Pulse shard archives begin with these four bytes when written as zip containers; single-file .pulse cohorts begin with MagicBytes ("PULSE\x00\x00\x00"). Pulse.Open dispatches on the first four bytes to pick the read path.
Functions ¶
func EncodeDecimal128 ¶ added in v0.2.0
func EncodeDecimal128(d Decimal128) [16]byte
EncodeDecimal128 serializes a Decimal128 as 16 bytes of two's-complement little-endian integer.
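The byte layout described above can be illustrated with a minimal sketch built on math/big. The helper names (encodeMantissa, decodeMantissa, encodeInt64) are hypothetical and are not part of this package; the sketch assumes the mantissa already fits in a signed 128-bit range and does not model the NULL sentinel.

```go
package main

import (
	"fmt"
	"math/big"
)

// encodeMantissa is a hypothetical helper, not the package's function.
// It lays out m as a 16-byte two's-complement little-endian integer and
// assumes m already fits in a signed 128-bit range.
func encodeMantissa(m *big.Int) [16]byte {
	var out [16]byte
	v := new(big.Int).Set(m)
	if v.Sign() < 0 {
		// Two's complement of a negative value is 2^128 + m.
		v.Add(v, new(big.Int).Lsh(big.NewInt(1), 128))
	}
	be := v.Bytes() // big-endian magnitude without leading zeros
	for i, b := range be {
		out[len(be)-1-i] = b // reverse into little-endian order
	}
	return out
}

// decodeMantissa reverses encodeMantissa.
func decodeMantissa(buf [16]byte) *big.Int {
	be := make([]byte, 16)
	for i, b := range buf {
		be[15-i] = b
	}
	v := new(big.Int).SetBytes(be)
	if buf[15]&0x80 != 0 { // sign bit set: subtract 2^128
		v.Sub(v, new(big.Int).Lsh(big.NewInt(1), 128))
	}
	return v
}

// encodeInt64 is a convenience wrapper for small mantissas.
func encodeInt64(i int64) [16]byte { return encodeMantissa(big.NewInt(i)) }

func main() {
	enc := encodeInt64(-2)
	fmt.Printf("% x\n", enc[:])
	fmt.Println(decodeMantissa(enc))
}
```

Negative mantissas fill the high bytes with 0xff, which is why a reader can recover the sign from the top bit of the last byte.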
func IsArchive ¶ added in v0.8.0
IsArchive reports whether the first four bytes of r identify a zip container. It does no central-directory parsing — magic-byte check only — so it is cheap enough to call on every Open. Errors are returned for I/O failures; a short file returns (false, nil).
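As an illustration of the magic-byte check described above, the following sketch compares the first four bytes against the zip local-file-header magic. The function looksLikeZip and its wrapper are hypothetical stand-ins, written only from the behaviour stated here (no central-directory parsing, short input reported as false with a nil error).

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// zipMagic mirrors the documented ZipMagic value.
var zipMagic = [4]byte{'P', 'K', 0x03, 0x04}

// looksLikeZip is a hypothetical stand-in for a magic-byte-only check.
func looksLikeZip(r io.ReaderAt, size int64) (bool, error) {
	if size < int64(len(zipMagic)) {
		return false, nil // short file: not an archive, and not an error
	}
	var head [4]byte
	n, err := r.ReadAt(head[:], 0)
	if n < len(head) {
		return false, err // I/O failure while reading the magic
	}
	return head == zipMagic, nil
}

// checkBytes applies the check to in-memory data.
func checkBytes(b []byte) (bool, error) {
	return looksLikeZip(bytes.NewReader(b), int64(len(b)))
}

func main() {
	isZip, err := checkBytes([]byte{'P', 'K', 0x03, 0x04, 0x14, 0x00})
	fmt.Println(isZip, err)
	isZip, err = checkBytes([]byte("PULSE\x00\x00\x00"))
	fmt.Println(isZip, err)
}
```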
func NullDecimalSentinel ¶ added in v0.2.0
func NullDecimalSentinel() [16]byte
NullDecimalSentinel returns a copy of the canonical 16-byte NULL pattern for nullable_decimal128 fields.
func PromoteAdd ¶ added in v0.2.0
PromoteAdd returns the (precision, scale) of an ADD/SUB result given two operand types per SQL:2016 / Arrow Decimal128 rules:
(p1, s1) ± (p2, s2) => (max(p1-s1, p2-s2) + max(s1, s2) + 1, max(s1, s2))
The result precision is clamped at MaxDecimalPrecision; callers whose result was clamped must check ClampedPrecision and emit PULSE_DECIMAL_OVERFLOW when overflow surfaces at runtime.
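The promotion rule can be worked through with a small sketch. The function promoteAdd below is a hypothetical re-derivation from the formula, not the package's implementation; in particular, returning a third "clamped" value is an assumption made for illustration, since the documented function returns only (precision, scale).

```go
package main

import "fmt"

// maxDecimalPrecision mirrors the documented MaxDecimalPrecision constant.
const maxDecimalPrecision = 38

func max8(a, b uint8) uint8 {
	if a > b {
		return a
	}
	return b
}

// promoteAdd is a hypothetical helper applying
// (max(p1-s1, p2-s2) + max(s1, s2) + 1, max(s1, s2)).
// It assumes scale <= precision for both operands.
func promoteAdd(p1, s1, p2, s2 uint8) (precision, scale uint8, clamped bool) {
	scale = max8(s1, s2)
	intDigits := max8(p1-s1, p2-s2)         // digits left of the decimal point
	raw := int(intDigits) + int(scale) + 1 // one extra digit for the carry
	if raw > maxDecimalPrecision {
		return maxDecimalPrecision, scale, true
	}
	return uint8(raw), scale, false
}

func main() {
	fmt.Println(promoteAdd(10, 2, 8, 4))  // decimal(10,2) + decimal(8,4)
	fmt.Println(promoteAdd(38, 0, 38, 0)) // precision would be 39, so it clamps
}
```

For decimal(10,2) + decimal(8,4) the integer part needs max(8, 4) = 8 digits and the fraction needs 4, so the result type is decimal(13,4).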
func PromoteDiv ¶ added in v0.2.0
PromoteDiv returns the (precision, scale) of a DIV result.
(p1, s1) ÷ (p2, s2) => (p1 + s2 + 1, max(s1 + s2, MinDecimalScale))
func PromoteMul ¶ added in v0.2.0
PromoteMul returns the (precision, scale) of a MUL result.
(p1, s1) × (p2, s2) => (p1 + p2, s1 + s2)
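The DIV and MUL rules can be checked with the following sketch. Both functions are hypothetical transcriptions of the formulas as stated; the sketch assumes that the scale floor in the DIV rule is MinDecimalScale (4) and applies no precision clamping, because none is described for these two operations.

```go
package main

import "fmt"

// minDecimalScale mirrors the documented MinDecimalScale constant.
const minDecimalScale = 4

// promoteMul is a hypothetical helper applying (p1 + p2, s1 + s2).
func promoteMul(p1, s1, p2, s2 uint8) (uint8, uint8) {
	return p1 + p2, s1 + s2
}

// promoteDiv is a hypothetical helper applying
// (p1 + s2 + 1, max(s1 + s2, minDecimalScale)).
func promoteDiv(p1, s1, p2, s2 uint8) (uint8, uint8) {
	scale := s1 + s2
	if scale < minDecimalScale {
		scale = minDecimalScale // the floor on division result scale
	}
	return p1 + s2 + 1, scale
}

func main() {
	fmt.Println(promoteMul(10, 2, 8, 4)) // decimal(10,2) * decimal(8,4)
	fmt.Println(promoteDiv(10, 2, 8, 1)) // decimal(10,2) / decimal(8,1)
}
```

In the division example the summed scale is 3, so the floor raises the result scale to 4.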
func ReadBit ¶
ReadBit reads a single bit from the byte at the current position in r. bitPos is 0-7 within that byte.
func ReadDescription ¶
ReadDescription reads a field description from r. Format: u16 length + utf8 bytes.
func ReadFieldValue ¶
ReadFieldValue reads a single field value from r, returning raw bits as uint64. For packed types (PackedBool, NullableBool, NullableU4), use ReadBit/ReadNibble instead.
func ReadHeader ¶
ReadHeader reads and validates the .pulse file header from r.
func ReadNibble ¶
ReadNibble reads a 4-bit value from a byte. If high is true, it reads bits 4-7; otherwise bits 0-3.
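A short sketch of the bit and nibble addressing used by ReadBit and ReadNibble follows. The helpers bitAt and nibbleAt are hypothetical and operate on an already-read byte; the sketch assumes bit 0 is the least significant bit, which matches the "bits 4-7 are the high nibble" wording but is not stated explicitly.

```go
package main

import "fmt"

// bitAt reports whether bit bitPos (0 = least significant, an assumption)
// is set in b. Positions above 7 always report false.
func bitAt(b byte, bitPos uint) bool {
	return (b>>bitPos)&1 == 1
}

// nibbleAt returns bits 4-7 when high is true, otherwise bits 0-3.
func nibbleAt(b byte, high bool) uint8 {
	if high {
		return b >> 4
	}
	return b & 0x0F
}

func main() {
	const packed byte = 0xA5 // binary 1010 0101
	fmt.Println(nibbleAt(packed, true), nibbleAt(packed, false))
	fmt.Println(bitAt(packed, 0), bitAt(packed, 1), bitAt(packed, 7))
}
```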
func RewriteShardCategoricals ¶ added in v0.8.0
func RewriteShardCategoricals(shardBytes []byte, targetSchema *Schema, remap map[int]DictRemap) ([]byte, error)
RewriteShardCategoricals re-emits a single-file .pulse shard with its schema block replaced by targetSchema and its records' categorical indices translated according to remap. The on-wire byte layout of every non-categorical field is preserved exactly.
targetSchema must be structurally identical to the shard's own schema (field count, names, types, byte offsets, bit positions) — only the per-categorical dictionaries differ.
When remap is empty (every incoming dict was a prefix of canonical), the function still re-emits the shard with targetSchema so the shard's standalone dictionary matches the archive's canonical (callers reading the shard via the `archive.pulse#shard.pulse` anchor see the union dictionary). Record bytes are copied verbatim.
Returns PULSE_SHARD_HEADER_INVALID if the input bytes are not a valid single-file Pulse cohort, and PULSE_SHARD_SCHEMA_MISMATCH if targetSchema is structurally incompatible with the shard's own schema.
func ValidatePrecisionScale ¶ added in v0.2.0
ValidatePrecisionScale reports whether (precision, scale) form a legal decimal128 type spec (1 ≤ precision ≤ 38, 0 ≤ scale ≤ precision).
func WriteDecimal128 ¶ added in v0.2.0
func WriteDecimal128(w io.Writer, d Decimal128) error
WriteDecimal128 writes a Decimal128 to the .pulse record stream.
func WriteDecimal128Null ¶ added in v0.2.0
WriteDecimal128Null writes the nullable_decimal128 NULL sentinel.
func WriteDescription ¶
WriteDescription writes a field description to w. Format: u16 length + utf8 bytes. Returns PULSE_IMPORT_DESCRIPTION_TOO_LONG if the UTF-8 byte length exceeds MaxDescriptionBytes.
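The length-prefixed layout can be sketched as a round trip. The helpers writeDesc, readDesc and roundTrip are hypothetical; the sketch assumes the u16 length is little-endian (the byte order is only stated explicitly for the SHRD extension) and returns a plain error where the package returns PULSE_IMPORT_DESCRIPTION_TOO_LONG.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// maxDescriptionBytes mirrors the documented MaxDescriptionBytes constant.
const maxDescriptionBytes = 1000

// writeDesc emits a u16 byte length followed by the UTF-8 bytes.
func writeDesc(w io.Writer, desc string) error {
	if len(desc) > maxDescriptionBytes { // len counts bytes, not runes
		return errors.New("description too long")
	}
	var n [2]byte
	binary.LittleEndian.PutUint16(n[:], uint16(len(desc))) // byte order assumed
	if _, err := w.Write(n[:]); err != nil {
		return err
	}
	_, err := io.WriteString(w, desc)
	return err
}

// readDesc reads the length prefix, then exactly that many bytes.
func readDesc(r io.Reader) (string, error) {
	var n [2]byte
	if _, err := io.ReadFull(r, n[:]); err != nil {
		return "", err
	}
	buf := make([]byte, binary.LittleEndian.Uint16(n[:]))
	if _, err := io.ReadFull(r, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

// roundTrip writes desc, reports the encoded size, and reads it back.
func roundTrip(desc string) (string, int, error) {
	var b bytes.Buffer
	if err := writeDesc(&b, desc); err != nil {
		return "", 0, err
	}
	size := b.Len()
	got, err := readDesc(&b)
	return got, size, err
}

func main() {
	got, size, err := roundTrip("héllo")
	fmt.Println(got, size, err)
}
```

The limit applies to encoded bytes, so a description containing multi-byte characters reaches it before it reaches 1000 characters.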
func WriteFieldValue ¶
WriteFieldValue writes a single field value (as raw bits in uint64) to w. For packed types (PackedBool, NullableBool, NullableU4), use WriteBit/WriteNibble instead. For decimal128 / nullable_decimal128 (16-byte types), use the dedicated WriteDecimal128 helper; this function will reject those types with ENCODING_TYPE_MISMATCH.
func WriteHeader ¶
WriteHeader writes the .pulse file header to w.
func WriteNibble ¶
WriteNibble writes a 4-bit value into a byte. If high is true, it writes to bits 4-7; otherwise bits 0-3. The other nibble is zero.
func WriteSchema ¶
WriteSchema serializes a schema to w. Format:
    u16 field_count
    per field:
      u8  type
      u16 name_length + utf8 name
      u32 byte_offset
      u8  bit_position
      u16 csv_column_idx
      u16 description_length + utf8 description
      (if categorical) dictionary block
func WriteSchemaDoc ¶ added in v0.8.0
WriteSchemaDoc emits a `_schema.pulse` payload: standard Pulse header, standard schema block, then the sharding metadata extension (magic "SHRD" + u64 aggregate_record_count + u16 shard_count, little-endian). The result is a valid single-file Pulse cohort with zero records, so legacy readers that don't know about the SHRD extension can still parse the header + schema and stop at the (empty) record region — the extension bytes live where records would otherwise begin and a header-only reader (`Inspect`) ignores them.
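The sharding metadata extension is small enough to sketch in full. The helpers encodeShardExt and decodeShardExt are hypothetical and cover only the 14 extension bytes (magic, u64, u16, little-endian as stated above); the header and schema block that precede them are not modelled.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// schemaDocMagic mirrors the documented SchemaDocMagic value.
var schemaDocMagic = [4]byte{'S', 'H', 'R', 'D'}

// encodeShardExt builds magic + u64 aggregate_record_count + u16 shard_count.
func encodeShardExt(agg uint64, shardCount uint16) []byte {
	out := make([]byte, 14)
	copy(out[0:4], schemaDocMagic[:])
	binary.LittleEndian.PutUint64(out[4:12], agg)
	binary.LittleEndian.PutUint16(out[12:14], shardCount)
	return out
}

// decodeShardExt returns ok=false when the marker is absent or truncated,
// which a caller can treat as "metadata unavailable".
func decodeShardExt(b []byte) (agg uint64, shardCount uint16, ok bool) {
	if len(b) < 14 || !bytes.Equal(b[:4], schemaDocMagic[:]) {
		return 0, 0, false
	}
	return binary.LittleEndian.Uint64(b[4:12]), binary.LittleEndian.Uint16(b[12:14]), true
}

func main() {
	ext := encodeShardExt(1000, 3)
	fmt.Printf("% x\n", ext)
	fmt.Println(decodeShardExt(ext))
	fmt.Println(decodeShardExt(nil))
}
```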
Types ¶
type Archive ¶ added in v0.8.0
type Archive struct {
// contains filtered or unexported fields
}
Archive is a parsed Pulse shard archive: zip central-directory cache plus the underlying ReaderAt. Callers Open or OpenAt entries by name. The zero value is not usable; construct via OpenArchive.
func OpenArchive ¶ added in v0.8.0
OpenArchive parses the zip central directory at the tail of r and returns an Archive ready for entry enumeration. The ReaderAt is retained — the caller is responsible for its lifetime. Returns PULSE_ARCHIVE_MAGIC_INVALID when r does not start with the zip magic, and PULSE_ARCHIVE_CORRUPT when the EOCD or central directory cannot be parsed.
func (*Archive) Entries ¶ added in v0.8.0
func (a *Archive) Entries() []ArchiveEntry
Entries returns every entry in central-directory order (equals shard insertion order for Pulse-written archives). The slice is a fresh copy; callers may mutate freely.
func (*Archive) IsStored ¶ added in v0.8.0
IsStored reports whether the named entry uses store-only (Method 0) compression. Pulse-written shards are always store-only; this helper is exposed for callers that want to confirm the contract before using OpenAt on a section reader.
func (*Archive) Open ¶ added in v0.8.0
func (a *Archive) Open(name string) (io.ReadCloser, error)
Open returns an io.ReadCloser over the named entry's payload. Returns PULSE_SHARD_MISSING when no entry of that name exists in the central directory. Suitable for streaming reads; the returned reader must be drained or closed when done.
func (*Archive) OpenAt ¶ added in v0.8.0
func (a *Archive) OpenAt(name string) (io.SectionReader, error)
OpenAt returns an io.SectionReader covering the named entry's stored payload. Only safe for store-only (Method 0) entries — Pulse always writes Method 0, but third-party archives that pass through this path may carry deflated entries; the caller should verify the entry's Method (see Archive.IsStored) before consuming the section reader. Returns PULSE_SHARD_MISSING for unknown names.
func (*Archive) PeekShardHeader ¶ added in v0.8.0
PeekShardHeader reads the first encoding.HeaderSize bytes of the named entry and validates them as a single-file Pulse header (magic + format version). Returns PULSE_SHARD_HEADER_INVALID on mismatch so callers can surface a structured error without parsing the full schema. Used by S2+ phases when peeking shard metadata on first read.
func (*Archive) PeekShardRecordCount ¶ added in v0.8.0
PeekShardRecordCount opens the named entry, reads its Pulse header and schema block, validates both, and computes the record count from the remaining bytes divided by the schema's per-record size. Used by `Pulse.Open` to populate `ShardEntry.RecordCount` from the per-shard headers (the source of truth — `_schema.pulse`'s AggregateRecordCount is only a sanity check).
Returns PULSE_SHARD_HEADER_INVALID when the shard's bytes are not a valid single-file Pulse cohort (bad magic, unsupported version, truncated header or schema).
type ArchiveEntry ¶ added in v0.8.0
type ArchiveEntry struct {
// Name is the zip entry name (basename inside the archive). For
// flat archives this is the shard filename, e.g. "20190101.pulse".
Name string
// Size is the uncompressed payload size in bytes.
Size int64
// Offset is the byte offset of the payload within the archive.
Offset int64
}
ArchiveEntry is a single payload inside a Pulse shard archive. Offset is the byte offset of the entry's compressed/stored payload within the archive (the equivalent of zip.File.DataOffset), NOT the offset of the local file header. Pulse stores entries uncompressed (Method 0), so Offset is a direct pointer into the shard's bytes.
type CohesionWarning ¶ added in v0.8.0
CohesionWarning is a structured non-fatal divergence emitted by the schema-cohesion validators. Its shape mirrors the descriptor envelope's {code, message, details} entries so callers (typically `service/shard add` and `pulse shard verify`) can forward warnings through the standard --json output without reshaping.
encoding/ stays free of descriptor/ imports — this is the local equivalent.
func ValidateStructuralCohesion ¶ added in v0.8.0
func ValidateStructuralCohesion(canonical, incoming *Schema) ([]CohesionWarning, error)
ValidateStructuralCohesion verifies that incoming's structural schema is byte-equal to canonical's. The check is field-count strict and per-field strict on:
- Name
- Type byte
- ByteOffset
- BitPosition
- Categorical-width identity (a categorical_u8 cannot become a categorical_u16 across shards — the width is fixed at folder creation)
Descriptions are advisory and may diverge across shards; per-field divergence emits a PULSE_SHARD_DESCRIPTION_DIVERGENCE warning but does NOT fail validation. Any other mismatch returns a coded PULSE_SHARD_SCHEMA_MISMATCH error.
This validator does NOT compare dictionaries — dictionary cohesion is governed by the append-only prefix rule (see ValidateDictPrefixRule).
type Decimal128 ¶ added in v0.2.0
type Decimal128 struct {
// contains filtered or unexported fields
}
Decimal128 is a fixed-point decimal value with up to 38 digits of precision. Logically it is a signed 128-bit two's-complement integer mantissa, paired with a per-field scale that places the implicit decimal point. Internally it is carried as *big.Int (always copied, never aliased) bounded to the decimal128 representable range [-(10^38 - 1), 10^38 - 1].
func DecodeDecimal128 ¶ added in v0.2.0
func DecodeDecimal128(buf [16]byte) (Decimal128, bool)
DecodeDecimal128 deserializes 16 bytes of two's-complement little-endian integer into a Decimal128. The second return value reports whether the bytes match the NULL sentinel (true == NULL bit pattern).
func NewDecimal128FromBigInt ¶ added in v0.2.0
func NewDecimal128FromBigInt(m *big.Int) (Decimal128, error)
NewDecimal128FromBigInt builds a Decimal128 from a big.Int mantissa. Returns PULSE_DECIMAL_OVERFLOW if |m| >= 10^38.
func NewDecimal128FromInt ¶ added in v0.2.0
func NewDecimal128FromInt(i int64) Decimal128
NewDecimal128FromInt builds a Decimal128 with the given integer mantissa.
func ParseDecimal128 ¶ added in v0.2.0
func ParseDecimal128(s string) (Decimal128, uint8, error)
ParseDecimal128 parses a strict decimal string into a (Decimal128, scale) pair. The scale is inferred from the input — exactly the count of digits after the decimal point. Accepts an optional single leading sign char. Rejects whitespace, thousand separators, scientific notation, currency symbols.
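A strict parser along the lines described above might look like the following sketch. The function parseStrict is hypothetical and returns a *big.Int mantissa rather than a Decimal128; it also makes assumptions the text does not settle, namely that inputs such as "1." and ".5" are accepted and that at most one decimal point is allowed.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// parseStrict is a hypothetical strict decimal parser. The scale is the
// count of digits after the decimal point.
func parseStrict(s string) (*big.Int, uint8, error) {
	body, neg := s, false
	if len(body) > 0 && (body[0] == '+' || body[0] == '-') {
		neg = body[0] == '-'
		body = body[1:] // a single leading sign is allowed
	}
	digits := make([]byte, 0, len(body))
	scale, seenPoint := 0, false
	for i := 0; i < len(body); i++ {
		c := body[i]
		switch {
		case c >= '0' && c <= '9':
			digits = append(digits, c)
			if seenPoint {
				scale++
			}
		case c == '.' && !seenPoint:
			seenPoint = true
		default:
			// whitespace, separators, exponents, currency symbols, extra signs
			return nil, 0, errors.New("invalid character in decimal string")
		}
	}
	if len(digits) == 0 || scale > 38 {
		return nil, 0, errors.New("no digits, or too many fraction digits")
	}
	m, _ := new(big.Int).SetString(string(digits), 10)
	limit := new(big.Int).Exp(big.NewInt(10), big.NewInt(38), nil)
	if m.CmpAbs(limit) >= 0 {
		return nil, 0, errors.New("mantissa exceeds 38 digits")
	}
	if neg {
		m.Neg(m)
	}
	return m, uint8(scale), nil
}

// parseToString is a convenience wrapper for printing and testing.
func parseToString(s string) (string, uint8, bool) {
	m, scale, err := parseStrict(s)
	if err != nil {
		return "", 0, false
	}
	return m.String(), scale, true
}

func main() {
	fmt.Println(parseToString("-12.340")) // trailing zero counts toward scale
	fmt.Println(parseToString("1e5"))     // scientific notation is rejected
}
```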
func ReadDecimal128 ¶ added in v0.2.0
func ReadDecimal128(r io.Reader) (Decimal128, bool, error)
ReadDecimal128 reads 16 bytes of decimal128 from r and decodes them. Returns the value and a flag indicating whether the bit pattern matches the NULL sentinel.
func ZeroDecimal128 ¶ added in v0.2.0
func ZeroDecimal128() Decimal128
ZeroDecimal128 returns a Decimal128 with mantissa 0.
func (Decimal128) Add ¶ added in v0.2.0
func (d Decimal128) Add(o Decimal128) (Decimal128, error)
Add returns d + o, treating both at the same scale. The result has the same scale; the caller is responsible for SQL:2016 precision propagation at the schema level.
func (Decimal128) Cmp ¶ added in v0.2.0
func (d Decimal128) Cmp(o Decimal128) int
Cmp compares two Decimal128 values with the same scale. Comparing values at different scales requires the caller to align scales first.
func (Decimal128) Div ¶ added in v0.2.0
func (d Decimal128) Div(o Decimal128, s1, s2, resultScale uint8) (Decimal128, error)
Div divides d by o using banker's rounding to produce a result at scale `resultScale` given the operands at scales s1 and s2. Returns PULSE_DECIMAL_DIVIDE_BY_ZERO when o is zero.
func (Decimal128) FitsPrecision ¶ added in v0.2.0
func (d Decimal128) FitsPrecision(precision uint8) bool
FitsPrecision reports whether the mantissa fits in `precision` digits.
func (Decimal128) Float64 ¶ added in v0.2.0
func (d Decimal128) Float64(scale uint8) float64
Float64 returns the decimal as a float64 at the given scale. Lossy for values that exceed float64 precision; callers preserving precision should keep using Decimal128 directly.
func (Decimal128) Mantissa ¶ added in v0.2.0
func (d Decimal128) Mantissa() *big.Int
Mantissa returns a copy of the unscaled integer mantissa.
func (Decimal128) Mul ¶ added in v0.2.0
func (d Decimal128) Mul(o Decimal128) (Decimal128, error)
Mul returns d * o. The product mantissa is the product of mantissas; callers are responsible for tracking scale propagation (s1 + s2).
func (Decimal128) Rescale ¶ added in v0.2.0
func (d Decimal128) Rescale(sourceScale, targetScale uint8) (Decimal128, error)
Rescale converts d at sourceScale to targetScale using banker's rounding. Returns PULSE_DECIMAL_OVERFLOW if the rescaled mantissa exceeds 10^38-1.
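Banker's rounding on a scaled integer mantissa can be sketched as follows. The function rescale is a hypothetical illustration working directly on *big.Int; it omits the 10^38-1 overflow check that the documented method performs.

```go
package main

import (
	"fmt"
	"math/big"
)

func pow10(n uint8) *big.Int {
	return new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(n)), nil)
}

// rescale is a hypothetical helper: it moves mantissa m from scale src to
// scale dst, rounding half-to-even when digits are dropped.
func rescale(m *big.Int, src, dst uint8) *big.Int {
	if dst >= src {
		return new(big.Int).Mul(m, pow10(dst-src)) // exact: append zeros
	}
	div := pow10(src - dst)
	q, r := new(big.Int).QuoRem(m, div, new(big.Int)) // truncates toward zero
	twice := new(big.Int).Abs(r)
	twice.Lsh(twice, 1) // compare 2*|r| with the divisor
	cmp := twice.Cmp(div)
	if cmp > 0 || (cmp == 0 && q.Bit(0) == 1) {
		// More than half, or exactly half with an odd quotient:
		// step one unit away from zero.
		if m.Sign() < 0 {
			q.Sub(q, big.NewInt(1))
		} else {
			q.Add(q, big.NewInt(1))
		}
	}
	return q
}

// rescaleInt64 is a convenience wrapper for small mantissas.
func rescaleInt64(m int64, src, dst uint8) int64 {
	return rescale(big.NewInt(m), src, dst).Int64()
}

func main() {
	fmt.Println(rescaleInt64(125, 2, 1), rescaleInt64(135, 2, 1)) // 1.25 and 1.35 at scale 1
	fmt.Println(rescaleInt64(-125, 2, 1), rescaleInt64(5, 0, 2))
}
```

Ties go to the even neighbour, so 1.25 becomes 1.2 while 1.35 becomes 1.4.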
func (Decimal128) Sign ¶ added in v0.2.0
func (d Decimal128) Sign() int
Sign returns -1, 0, or +1 depending on the mantissa sign.
func (Decimal128) Sqrt ¶ added in v0.2.0
func (d Decimal128) Sqrt(sourceScale, targetScale uint8) (Decimal128, error)
Sqrt returns sqrt(d) at the target scale, rounded half-to-even (banker's rounding), given the source value at sourceScale. The result is computed entirely in decimal arithmetic: big.Int.Sqrt yields the floor of the root, and a half-to-even rounding step is then applied to the residual. Returns PULSE_DECIMAL_OVERFLOW if intermediate state cannot fit in the integer representation; returns PROCESSING_RUNTIME for negative inputs (sqrt of a negative decimal is undefined).
func (Decimal128) String ¶ added in v0.2.0
func (d Decimal128) String(scale uint8) string
String renders the decimal at the given scale. Trailing zeros are preserved; the leading sign is included only for negative values.
func (Decimal128) Sub ¶ added in v0.2.0
func (d Decimal128) Sub(o Decimal128) (Decimal128, error)
Sub returns d - o.
type DictRemap ¶ added in v0.8.0
DictRemap is the per-field index translation produced by MergeDictUnion. The key is the incoming dictionary index; the value is the corresponding canonical (union) dictionary index. Fields whose incoming dictionary is already a prefix of the canonical (union) dictionary are absent from the returned map — callers can skip remapping their record bytes.
type Dictionary ¶
type Dictionary struct {
// contains filtered or unexported fields
}
Dictionary maps string values to sequential uint32 IDs. It is used for categorical field types to encode string categories as compact integer IDs in the binary record format.
func (*Dictionary) Add ¶
func (d *Dictionary) Add(s string) (uint32, error)
Add inserts a string into the dictionary, returning its ID. If the string already exists, the existing ID is returned. There is no capacity limit with Add; use AddWithLimit to enforce one.
func (*Dictionary) AddWithLimit ¶
func (d *Dictionary) AddWithLimit(s string, maxEntries uint32) (uint32, error)
AddWithLimit inserts a string, enforcing a maximum entry count. Returns PULSE_IMPORT_CATEGORICAL_OVERFLOW if the dictionary is full.
func (*Dictionary) Count ¶
func (d *Dictionary) Count() int
Count returns the number of entries in the dictionary.
func (*Dictionary) IDFor ¶
func (d *Dictionary) IDFor(s string) (uint32, bool)
IDFor looks up the ID for a string. Returns the ID and true if found, or 0 and false otherwise.
func (*Dictionary) ReadFrom ¶
func (d *Dictionary) ReadFrom(r io.Reader) (int64, error)
ReadFrom deserializes a dictionary from r, replacing current contents. Format: u32 count + (u16 strlen + utf8 bytes) x count
Performance: a single byte buffer is grown to hold all string payloads across the dictionary, instead of allocating one []byte per entry. Each resolved string is copied once via the standard string([]byte) conversion (no unsafe), so we drop one allocation per entry while preserving the safety contract that callers can mutate the underlying buffer afterwards without affecting the stored strings.
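The dictionary wire format can be sketched as a pair of helpers. Both encodeDict and decodeDict are hypothetical and work on plain string slices; the sketch assumes little-endian length fields and does not reproduce the single-buffer optimisation described above.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// encodeDict emits u32 count, then (u16 strlen + utf8 bytes) per entry.
// Entries are assumed to be shorter than 65536 bytes.
func encodeDict(values []string) []byte {
	var buf bytes.Buffer
	var n4 [4]byte
	binary.LittleEndian.PutUint32(n4[:], uint32(len(values)))
	buf.Write(n4[:])
	for _, v := range values {
		var n2 [2]byte
		binary.LittleEndian.PutUint16(n2[:], uint16(len(v)))
		buf.Write(n2[:])
		buf.WriteString(v)
	}
	return buf.Bytes()
}

// decodeDict reads the same layout back in insertion order.
func decodeDict(r io.Reader) ([]string, error) {
	var n4 [4]byte
	if _, err := io.ReadFull(r, n4[:]); err != nil {
		return nil, err
	}
	count := binary.LittleEndian.Uint32(n4[:])
	var out []string
	for i := uint32(0); i < count; i++ {
		var n2 [2]byte
		if _, err := io.ReadFull(r, n2[:]); err != nil {
			return nil, err
		}
		s := make([]byte, binary.LittleEndian.Uint16(n2[:]))
		if _, err := io.ReadFull(r, s); err != nil {
			return nil, err
		}
		out = append(out, string(s)) // string() copies, so s can be reused
	}
	return out, nil
}

// decodeDictBytes is a convenience wrapper for in-memory data.
func decodeDictBytes(b []byte) ([]string, error) {
	return decodeDict(bytes.NewReader(b))
}

func main() {
	wire := encodeDict([]string{"red", "green"})
	fmt.Println(len(wire))
	fmt.Println(decodeDictBytes(wire))
}
```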
func (*Dictionary) Resolve ¶
func (d *Dictionary) Resolve(id uint32) string
Resolve returns the string for a given ID. Returns "" if the ID is out of range.
func (*Dictionary) Values ¶
func (d *Dictionary) Values() []string
Values returns a copy of all dictionary values in insertion order.
type Field ¶
type Field struct {
Name string
Type FieldType
ByteOffset int
BitPosition int
CsvColumnIdx int
Description string // empty = synthesized at inspect time
Dictionary *Dictionary // non-nil only for categorical types
// Precision is the decimal128 precision (1-38). Meaningful only when
// Type is FieldTypeDecimal128 or FieldTypeNullableDecimal128.
Precision uint8
// Scale is the decimal128 scale (0-Precision). Meaningful only when
// Type is FieldTypeDecimal128 or FieldTypeNullableDecimal128.
Scale uint8
}
Field describes a single column in a .pulse schema.
type FieldFilter ¶ added in v0.8.0
FieldFilter returns true for field names whose values should be written into the caller's maps. Used by ReadRecordWithWideProjected to skip map writes for fields the request doesn't read. A nil FieldFilter is equivalent to "keep every field."
type FieldType ¶
type FieldType byte
FieldType identifies the data type stored in a schema field.
const (
	FieldTypeU8 FieldType = iota // 0
	FieldTypeU16                 // 1
	FieldTypeU32                 // 2
	FieldTypeU64                 // 3
	FieldTypeF32                 // 4
	FieldTypeF64                 // 5
	FieldTypeNullableBool        // 6
	FieldTypeNullableU4          // 7
	FieldTypeNullableU8          // 8
	FieldTypeNullableU16         // 9
	FieldTypeDate                // 10
	FieldTypePackedBool          // 11
	FieldTypeCategoricalU8       // 12
	FieldTypeCategoricalU16      // 13
	FieldTypeCategoricalU32      // 14
	FieldTypeDecimal128          // 15
	FieldTypeNullableDecimal128  // 16
)
All 17 field types supported by the .pulse format.
func (FieldType) ByteSize ¶
ByteSize returns the number of bytes this field type occupies in a record. Packed types (PackedBool, NullableBool, NullableU4) share bytes with adjacent fields via bit packing and return 0 here.
func (FieldType) IsCategorical ¶
IsCategorical reports whether the field type is one of the categorical types.
func (FieldType) IsDecimal ¶ added in v0.2.0
IsDecimal reports whether the field type is a decimal128 variant.
func (FieldType) IsKnown ¶ added in v0.2.0
IsKnown reports whether the byte value corresponds to a registered type. Used by the schema reader to reject files written by a future binary version that introduces unknown type bytes.
func (FieldType) IsNumeric ¶ added in v0.7.2
IsNumeric reports whether the field type is a strict scalar number: the unsigned-integer family (u8/u16/u32/u64), the float family (f32/f64), and the decimal family (decimal128/nullable_decimal128). Date and bit-packed integer encodings are excluded — see IsNumericForAnalytics for the analytics-layer predicate.
func (FieldType) IsNumericForAnalytics ¶ added in v0.7.2
IsNumericForAnalytics reports whether the field type carries a meaningful scalar value for numeric analytics (regression, sum/avg/stddev/min/max/variance aggregators). The set is broader than IsNumeric: bit-packed integer encodings (nullable_u4, nullable_bool, packed_bool) and date are included because their stored representation is an ordinal / cardinal number the analytics layer can average, sum, or regress without an explicit ATTR_FORMULA cast.
Null exclusion is the reader's responsibility: nullable_u4 marks 0x0F as null at decode time so the downstream Record.NumericValue contract (returns ok=false on null) keeps the aggregation denominator honest.
func (FieldType) MaxCategoricalEntries ¶
MaxCategoricalEntries returns the maximum dictionary size for a categorical type. Returns 0 for non-categorical types.
type RecordReader ¶
type RecordReader struct {
// contains filtered or unexported fields
}
RecordReader reads records one at a time from a binary stream. It reads directly from the io.Reader without buffering the entire file.
func NewRecordReader ¶
func NewRecordReader(r io.Reader, schema *Schema) *RecordReader
NewRecordReader creates a RecordReader. The reader must be positioned immediately after the header and schema (i.e., at the first record byte).
func (*RecordReader) ReadRecord ¶
ReadRecord reads a single record from the stream, populating the values and nulls maps. Returns io.EOF when no more records are available.
The caller provides pre-allocated maps to avoid per-record allocation. Maps are cleared at the start of each call.
Reuse contract: the maps are owned by the caller. ReadRecord does not retain references to them after returning. If the caller plans to reuse the same maps across calls (the typical pattern), they must consume the populated values BEFORE invoking ReadRecord again, because the next call clears and repopulates the maps in-place. If the caller needs to retain the values past the next call (e.g., collecting Records into a slice for later aggregation), it must pass distinct map instances per record OR copy the contents out before the next ReadRecord call.
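The reuse contract can be demonstrated without the package itself. In the sketch below, fakeReader is a hypothetical stand-in for RecordReader that clears and repopulates the caller's map on every call, including the final call that returns io.EOF; collect shows what happens when the same map is retained versus copied.

```go
package main

import (
	"fmt"
	"io"
)

// fakeReader is a hypothetical stand-in that yields rows from memory.
type fakeReader struct {
	rows []map[string]float64
	next int
}

// readRecord clears values at the start of each call, then fills it.
func (f *fakeReader) readRecord(values map[string]float64) error {
	for k := range values {
		delete(values, k)
	}
	if f.next >= len(f.rows) {
		return io.EOF
	}
	for k, v := range f.rows[f.next] {
		values[k] = v
	}
	f.next++
	return nil
}

func demoReader() *fakeReader {
	return &fakeReader{rows: []map[string]float64{{"x": 1}, {"x": 2}}}
}

// collect keeps every record, either by copying or by aliasing the map.
func collect(f *fakeReader, copyOut bool) []map[string]float64 {
	var kept []map[string]float64
	values := make(map[string]float64)
	for {
		if err := f.readRecord(values); err != nil {
			break
		}
		if copyOut {
			snap := make(map[string]float64, len(values))
			for k, v := range values {
				snap[k] = v
			}
			kept = append(kept, snap) // safe: independent copy
		} else {
			kept = append(kept, values) // unsafe: every element is the same map
		}
	}
	return kept
}

func main() {
	fmt.Println(collect(demoReader(), false)) // aliased: contents were overwritten
	fmt.Println(collect(demoReader(), true))  // copied: each record survives
}
```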
To populate typed wide values for fields whose representation does not fit in float64 (decimal128), call ReadRecordWithWide instead and pass a third map.
func (*RecordReader) ReadRecordReused ¶ added in v0.2.0
func (rr *RecordReader) ReadRecordReused(rec ReusableRecord) error
ReadRecordReused reads one record into an existing ReusableRecord, reusing the record's internal maps. Returns io.EOF when the underlying reader is exhausted.
Hot path semantics:
- Caller MUST consume the populated rec before the next call.
- Fixed-size numeric fields are decoded via a single stack-resident [16]byte scratch + binary.LittleEndian, avoiding the per-field allocation of binary.Read's internal buffer.
- Bit-packed and 16-byte fields fall back to the existing typed readers.
func (*RecordReader) ReadRecordWithWide ¶ added in v0.2.0
func (rr *RecordReader) ReadRecordWithWide(values map[string]float64, nulls map[string]bool, wide map[string]any) error
ReadRecordWithWide reads a record and populates a wide map with typed values for decimal128 fields. The wide map may be nil to skip wide population.
func (*RecordReader) ReadRecordWithWideProjected ¶ added in v0.8.0
func (rr *RecordReader) ReadRecordWithWideProjected(values map[string]float64, nulls map[string]bool, wide map[string]any, keep FieldFilter) error
ReadRecordWithWideProjected reads a record but only writes the fields for which keep(name) returns true into the caller's maps. Bytes for excluded fields are still consumed from the underlying reader so byte offsets stay aligned — projection saves map allocations, not decode work.
keep == nil falls back to the full-decode path.
type ReusableRecord ¶ added in v0.2.0
type ReusableRecord interface {
SetNumeric(name string, value float64)
SetNullField(name string)
SetWideField(name string, value any)
ClearForRow()
}
ReusableRecord is the subset of *processing.Record needed by the reuse fast path. Declared here as an interface so encoding/ does not depend on processing/. Implementations (processing.Record) MUST clear their own null/wide maps before this call returns successfully; the reader populates them in place but only on fields where the value applies.
type Schema ¶
type Schema struct {
Fields []Field
}
Schema holds all field descriptors for a .pulse file.
func MergeDictUnion ¶ added in v0.8.0
MergeDictUnion is the relaxed dictionary cohesion check. For each categorical field it computes the union of canonical's and incoming's dictionaries: canonical entries first in their existing order, then any new entries from incoming in their order. Returns the extended canonical schema (deep copy when an extension is adopted; identical pointer when not) and a per-field-index map of DictRemap entries describing how to translate incoming record indices into canonical indices.
The structural validator (ValidateStructuralCohesion) must run FIRST — this validator assumes field positions, names, and categorical widths already match.
Errors:
- PULSE_SHARD_DICT_WIDTH_OVERFLOW when the union would exceed the field's categorical width capacity.
MergeDictUnion is the default cohesion mode at insert time (CreateShardArchive / AddShard). The stricter prefix-only rule (ValidateDictPrefixRule) is retained for callers that want to surface divergence as an error instead of merging — used by VerifyShardArchive when checking archives written before the union-merge default landed and by embedders that prefer to align dictionaries upstream.
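The union-and-remap step for a single categorical field can be sketched on plain string slices. The function mergeUnion is hypothetical; it models one field only, takes the width capacity as a parameter, returns a plain error in place of PULSE_SHARD_DICT_WIDTH_OVERFLOW, and assumes neither dictionary contains duplicates.

```go
package main

import (
	"errors"
	"fmt"
)

// mergeUnion is a hypothetical single-field helper. It appends incoming
// entries missing from canonical and maps incoming index -> union index.
// A nil remap means incoming is already a prefix of the union.
func mergeUnion(canonical, incoming []string, capacity int) (union []string, remap map[uint32]uint32, err error) {
	index := make(map[string]uint32, len(canonical))
	union = append(union, canonical...) // copy: canonical is left untouched
	for i, v := range canonical {
		index[v] = uint32(i)
	}
	remap = make(map[uint32]uint32, len(incoming))
	identity := true
	for i, v := range incoming {
		id, ok := index[v]
		if !ok {
			id = uint32(len(union)) // new entry goes after canonical's entries
			union = append(union, v)
			index[v] = id
		}
		remap[uint32(i)] = id
		if id != uint32(i) {
			identity = false
		}
	}
	if len(union) > capacity {
		return nil, nil, errors.New("dictionary width overflow")
	}
	if identity {
		remap = nil // every index maps to itself: no record rewrite needed
	}
	return union, remap, nil
}

func main() {
	union, remap, err := mergeUnion(
		[]string{"red", "green"}, []string{"green", "blue"}, 256)
	fmt.Println(union, remap, err)
}
```

Here the incoming index 0 ("green") translates to canonical index 1, and the new value "blue" is appended at index 2.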
func ReadSchema ¶
ReadSchema deserializes a schema from r.
func ValidateDictPrefixRule ¶ added in v0.8.0
ValidateDictPrefixRule enforces the append-only prefix rule for every categorical field shared by canonical and incoming. For each `categorical_*` field:
- If incoming's dictionary is a prefix of canonical's, accept the shard as-is; no canonical change. (Older shard that never saw newer dictionary values.)
- If canonical's dictionary is a prefix of incoming's, the canonical schema adopts the extension. The returned extendedCanonical carries the merged dictionaries and the caller is responsible for rewriting `_schema.pulse` to publish them before the new shard is placed (crash-safety ordering documented in placeholder §3.2).
- Neither is a prefix of the other → PULSE_SHARD_DICT_DIVERGENCE.
- An extension that would exceed the field's width capacity (256 for u8, 65 536 for u16, 2^32 for u32) → PULSE_SHARD_DICT_WIDTH_OVERFLOW.
The structural validator (ValidateStructuralCohesion) must run FIRST — this validator assumes field positions, names, and categorical widths already match.
The returned extendedCanonical is a deep copy when an extension is adopted, and is identically the canonical input otherwise. Callers can compare pointers to detect whether a rewrite is required.
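The four outcomes listed above can be sketched for a single field. The function prefixRule is hypothetical; it works on string slices, takes the width capacity as a parameter, and reports failures as plain errors whose messages reuse the documented code names.

```go
package main

import (
	"errors"
	"fmt"
)

// isPrefix reports whether short equals the first len(short) entries of long.
func isPrefix(short, long []string) bool {
	if len(short) > len(long) {
		return false
	}
	for i := range short {
		if short[i] != long[i] {
			return false
		}
	}
	return true
}

// prefixRule is a hypothetical single-field version of the rule.
// extended is true only when the canonical dictionary must be rewritten.
func prefixRule(canonical, incoming []string, capacity int) (result []string, extended bool, err error) {
	switch {
	case isPrefix(incoming, canonical):
		return canonical, false, nil // older shard: accept as-is
	case isPrefix(canonical, incoming):
		if len(incoming) > capacity {
			return nil, false, errors.New("PULSE_SHARD_DICT_WIDTH_OVERFLOW")
		}
		// Adopt the extension as a fresh copy.
		return append([]string(nil), incoming...), true, nil
	default:
		return nil, false, errors.New("PULSE_SHARD_DICT_DIVERGENCE")
	}
}

func main() {
	fmt.Println(prefixRule([]string{"a", "b"}, []string{"a"}, 256))
	fmt.Println(prefixRule([]string{"a"}, []string{"a", "b"}, 256))
	fmt.Println(prefixRule([]string{"a", "b"}, []string{"a", "c"}, 256))
}
```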
func (*Schema) Categorical ¶
func (s *Schema) Categorical(name string) (*Dictionary, bool)
Categorical returns the dictionary for a named categorical field. Returns nil, false if the field is not found or is not categorical.
func (*Schema) RecordByteSize ¶ added in v0.8.4
RecordByteSize returns the on-wire stride of one record under this schema. Bit-packed fields (PackedBool, NullableBool, NullableU4) report ByteSize()==0 but the wire format still consumes one whole byte per such field (synth/writer.go and io/import.go both advance the byte cursor by one for these types and the RecordReader bit/nibble helpers read one byte each). Summing ByteSize() directly therefore undercounts the stride on any schema that mixes byte-aligned and bit-packed fields; use this helper to keep stride math consistent with the wire writers and reader.
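The stride rule can be shown with a short sketch. The functions naiveSum and stride are hypothetical and take a list of ByteSize() results directly, with 0 standing for a bit-packed field; the example sizes assume u32 is 4 bytes, f64 is 8 bytes and decimal128 is 16 bytes.

```go
package main

import "fmt"

// naiveSum adds ByteSize() values as-is and so undercounts the stride.
func naiveSum(byteSizes []int) int {
	total := 0
	for _, n := range byteSizes {
		total += n
	}
	return total
}

// stride counts one whole byte for every field whose ByteSize() is 0.
func stride(byteSizes []int) int {
	total := 0
	for _, n := range byteSizes {
		if n == 0 {
			n = 1 // bit-packed field still consumes a byte on the wire
		}
		total += n
	}
	return total
}

func main() {
	// u32, packed_bool, f64, nullable_u4, decimal128
	sizes := []int{4, 0, 8, 0, 16}
	fmt.Println(naiveSum(sizes), stride(sizes))
}
```

Dividing a payload length by the naive sum rather than the stride would overstate the record count for this schema.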
type SchemaDoc ¶ added in v0.8.0
SchemaDoc carries the contents of a `_schema.pulse` entry: the canonical Schema plus the sharding metadata extension. AggregateRecordCount is the cached sum of every shard's record count at the time the archive was last written; ShardCount mirrors the zip central directory's entry count (excluding the reserved schema entry).
Both metadata fields are a SANITY CHECK, not the source of truth. Per the design contract, the per-shard headers are authoritative — callers populate live `ShardEntry.RecordCount` values by peeking each shard's header. AggregateRecordCount is exposed so cheap `pulse inspect` calls can report an approximate total without opening every shard.
func ReadSchemaDoc ¶ added in v0.8.0
ReadSchemaDoc parses a `_schema.pulse` payload. It always reads the Pulse header + standard schema block. The sharding metadata extension (magic "SHRD" + agg + shard count) is read when present and silently skipped when absent — older `_schema.pulse` artifacts or single-file cohorts addressed by mistake still parse cleanly, returning AggregateRecordCount=0 and ShardCount=0.