Documentation ¶
Overview ¶
Package s3store provides append-only, versioned data storage on S3 with Parquet data files and a change stream. Pure Go, cgo-free. No server, no broker, no coordinator — S3 is the only dependency.
Capability sketch:
- Write, WriteWithKey: encode []T as Parquet, PUT to S3, write an empty stream-ref file.
- Read, ReadIter, ReadRangeIter: list Parquet files matching a key pattern, decode into []T, optionally dedup latest-per-entity in memory.
- Poll, PollRecords: stream ref entries or records from the ref stream, starting after a caller-supplied offset.
Every snapshot read defaults to latest-per-key deduplication (configured via EntityKeyOf + VersionOf) and accepts WithHistory() to opt out.
Glob patterns are restricted to a deliberately narrow grammar: whole-segment "*", a single trailing "*" inside a value (e.g. "period=2026-03-*"), and half-open "FROM..TO" ranges. See validateKeyPattern for the full spec.
Deduplication is in-memory per partition; large key cardinalities can exceed available RAM.
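A compact write-then-read sketch. Record is a hypothetical parquet-tagged type, the partition layout is period/customer, and w / r are an already-constructed Writer[Record] / Reader[Record] (see StoreConfig and the constructors below for the real wiring):

type Record struct {
    SKU       string    `parquet:"sku"`
    Period    string    `parquet:"period"`
    Customer  string    `parquet:"customer"`
    Qty       int64     `parquet:"qty"`
    UpdatedAt time.Time `parquet:"updated_at"`
}

recs := []Record{{SKU: "A-1", Period: "2026-03", Customer: "acme", Qty: 7}}
if _, err := w.WriteWithKey(ctx, "period=2026-03/customer=acme", recs); err != nil {
    return err
}
// Latest version per entity, per partition, unless WithHistory() is passed.
out, err := r.Read(ctx, []string{"period=2026-03/customer=*"})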
Index ¶
- Constants
- Variables
- func PartitionKeysOf(entries []StreamEntry) []string
- type BackfillStats
- type CompressionCodec
- type ConsistencyLevel
- type HivePartition
- type Layout
- type MaterializedViewDef
- type MaterializedViewLookupDef
- type MaterializedViewReader
- type Offset
- type PollOption
- type ReadOption
- type Reader
- func (s *Reader[T]) OffsetAt(t time.Time) Offset
- func (s *Reader[T]) Poll(ctx context.Context, since Offset, maxEntries int32, opts ...PollOption) (out []StreamEntry, nextOffset Offset, err error)
- func (s *Reader[T]) PollRange(ctx context.Context, since, until time.Time) ([]StreamEntry, error)
- func (s *Reader[T]) PollRecords(ctx context.Context, since Offset, maxEntries int32, opts ...PollOption) (out []T, nextOffset Offset, err error)
- func (s *Reader[T]) Read(ctx context.Context, keyPatterns []string, opts ...ReadOption) (out []T, err error)
- func (s *Reader[T]) ReadEntriesIter(ctx context.Context, entries []StreamEntry, opts ...ReadOption) iter.Seq2[T, error]
- func (s *Reader[T]) ReadIter(ctx context.Context, keyPatterns []string, opts ...ReadOption) iter.Seq2[T, error]
- func (s *Reader[T]) ReadPartitionEntriesIter(ctx context.Context, entries []StreamEntry, opts ...ReadOption) iter.Seq2[HivePartition[T], error]
- func (s *Reader[T]) ReadPartitionIter(ctx context.Context, keyPatterns []string, opts ...ReadOption) iter.Seq2[HivePartition[T], error]
- func (s *Reader[T]) ReadPartitionRangeIter(ctx context.Context, since, until time.Time, opts ...ReadOption) iter.Seq2[HivePartition[T], error]
- func (s *Reader[T]) ReadRangeIter(ctx context.Context, since, until time.Time, opts ...ReadOption) iter.Seq2[T, error]
- func (r *Reader[T]) Target() S3Target
- type ReaderConfig
- type S3Target
- func (t S3Target) Bucket() string
- func (t S3Target) CommitTimeout() time.Duration
- func (t S3Target) Config() S3TargetConfig
- func (t S3Target) ConsistencyControl() ConsistencyLevel
- func (t S3Target) EffectiveMaxInflightRequests() int
- func (t S3Target) MaxClockSkew() time.Duration
- func (t S3Target) PartitionKeyParts() []string
- func (t S3Target) Prefix() string
- func (t S3Target) S3Client() *s3.Client
- func (t S3Target) SettleWindow() time.Duration
- func (t S3Target) Validate() error
- func (t S3Target) ValidateLookup() error
- type S3TargetConfig
- type Store
- type StoreConfig
- type StreamEntry
- type WriteOption
- type WriteResult
- type Writer
- func (s *Writer[T]) GroupByPartition(records []T) []HivePartition[T]
- func (w *Writer[T]) LookupCommit(ctx context.Context, partition, token string) (wr WriteResult, ok bool, err error)
- func (w *Writer[T]) PartitionKey(rec T) string
- func (w *Writer[T]) RestampRef(ctx context.Context, prior *WriteResult) (result *WriteResult, err error)
- func (w *Writer[T]) Target() S3Target
- func (s *Writer[T]) Write(ctx context.Context, records []T, opts ...WriteOption) (results []WriteResult, err error)
- func (s *Writer[T]) WriteWithKey(ctx context.Context, key string, records []T, opts ...WriteOption) (result *WriteResult, err error)
- type WriterConfig
Constants ¶
const CommitTimeoutAdvisory = 8 * time.Second
CommitTimeoutAdvisory is the soft floor below which loadTimingConfig emits a slog.Warn at construction. It bounds the retry envelope for the two PUTs that participate in the SettleWindow contract (ref + token-commit): each PUT can retry up to retryMaxAttempts (5) with jittered backoffs drawn from 100-300 / 300-500 / 500-800 / 500-800 ms (worst-case sum 2.4 s sleep per call), so two PUTs in worst case spend ~4.8 s in retry sleep alone, plus actual request time. 8 s gives ~1.67× safety on that retry envelope. Operators on tightly-clocked dev/test deployments can set CommitTimeout below this and ignore the warning; production deployments should size CommitTimeout above the advisory so transient S3 retries cannot cause the SettleWindow contract to be missed.
const CommitTimeoutFloor = time.Millisecond
CommitTimeoutFloor is the lower bound enforced by loadDurationConfig: values strictly less than this are rejected with a "below the floor" error. Set to 1 ms — strictly positive (zero is rejected as "would cause every write to exceed the timeout") and small enough that test deployments can pick sub-second values without further plumbing. Production deployments should pick a value well above CommitTimeoutAdvisory: loadTimingConfig emits a slog.Warn when the configured value is below the advisory floor (the retry envelope of the ref-PUT + token-commit-PUT pair).
The historical 1 s floor existed because HTTP-date `Last-Modified` is second-precision and the dropped timeliness check could not resolve sub-second gaps; with LastModified out of the protocol, the writer's local elapsed bound is honest at any strictly positive value.
const MaxClockSkewFloor = time.Duration(0)
MaxClockSkewFloor is the minimum MaxClockSkew value the library accepts. Zero is a valid claim on tightly-clocked deployments (NTP-synced nodes typically run within milliseconds of each other); negative would be incoherent.
Variables ¶
var ErrCommitAfterTimeout = errors.New("write committed after CommitTimeout")
ErrCommitAfterTimeout is wrapped into the error returned by WriteWithKey / Write when the elapsed wall-clock from refMicroTs (captured just before the ref PUT) to token-commit-PUT completion exceeded the configured CommitTimeout. The write IS durable — data file, ref, and `<token>.commit` are all in S3, and snapshot reads (Read / ReadIter / ReadPartitionIter / MaterializedViewReader.Lookup / BackfillMaterializedView) see it — but a stream reader's SettleWindow (= CommitTimeout + MaxClockSkew) may already have advanced past refMicroTs by the time the commit became observable, so a stream reader past that offset will never re-visit it.
The returned *WriteResult is non-nil and reflects the durable commit; snapshot consumers can ignore the error. Stream consumers that need observability inside the SettleWindow should call RestampRef with the returned WriteResult — that writes an additional ref with a fresh refMicroTs so the next poll picks it up. Same-token retry of WriteWithKey does NOT recover stream observability: its upfront HEAD finds the prior commit and returns the original WriteResult unchanged without writing any new ref.
Use errors.Is(err, ErrCommitAfterTimeout) to distinguish from transient PUT failures (which return a nil *WriteResult).
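A handling sketch for a writer whose consumers follow the stream; w, key, and recs are assumptions, and snapshot-only producers can simply ignore the error:

res, err := w.WriteWithKey(ctx, key, recs)
if errors.Is(err, ErrCommitAfterTimeout) {
    // Durable, but possibly already outside the readers' SettleWindow:
    // write a fresh ref so the next Poll observes it.
    res, err = w.RestampRef(ctx, res)
}
if err != nil {
    return err
}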
Functions ¶
func PartitionKeysOf ¶ added in v0.20.0
func PartitionKeysOf(entries []StreamEntry) []string
PartitionKeysOf extracts the distinct Hive partition keys from a slice of StreamEntries, returned in lex-ascending order. Useful for cross-store intersection workflows where each Store's entries must be filtered to a common partition set before passing to ReadEntriesIter / ReadPartitionEntriesIter:
entriesA, _ := storeA.PollRange(ctx, since, until)
entriesB, _ := storeB.PollRange(ctx, since, until)
common := intersect(
PartitionKeysOf(entriesA), PartitionKeysOf(entriesB),
)
// Filter each Store's entries by `common`, then pass to
// the matching Store's ReadEntriesIter.
Empty input returns nil. Returned slice is freshly allocated; the caller may mutate it without affecting `entries`.
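One possible shape for the intersect and filter steps referenced above — both helpers are illustrative, not part of the package:

func intersect(a, b []string) map[string]bool {
    inA := make(map[string]bool, len(a))
    for _, k := range a {
        inA[k] = true
    }
    common := make(map[string]bool, len(b))
    for _, k := range b {
        if inA[k] {
            common[k] = true
        }
    }
    return common
}

func filterByPartition(entries []StreamEntry, keep map[string]bool) []StreamEntry {
    out := make([]StreamEntry, 0, len(entries))
    for _, e := range entries {
        if keep[e.Key] {
            out = append(out, e)
        }
    }
    return out
}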
Types ¶
type BackfillStats ¶ added in v0.2.0
BackfillStats reports the work BackfillMaterializedView did: how many parquet objects it scanned, how many records it decoded, and how many marker PUTs it issued. Markers is per-object, not globally deduplicated — a marker path produced by N parquet files is counted N times (reflects S3 request cost, not unique marker count). Useful for progress logging in a migration job.
func BackfillMaterializedView ¶ added in v0.25.0
func BackfillMaterializedView[T any](ctx context.Context, target S3Target, def MaterializedViewDef[T], keyPatterns []string, until time.Time) (stats BackfillStats, err error)
BackfillMaterializedView scans existing parquet data and writes view markers for every record already present. The normal path is to wire materialized views via WriterConfig.MaterializedViews / Config.MaterializedViews before the first Write; BackfillMaterializedView is the relief valve for records written before the view existed.
keyPatterns use the grammar from validateKeyPattern, evaluated against target.PartitionKeyParts() (NOT the view's Columns) — backfill walks parquet data files, which are keyed by partition. "*" covers everything; shard across partitions to parallelize a migration. Overlapping patterns are deduplicated, so each parquet file is scanned at most once.
until is an exclusive upper bound on data-file LastModified. Typical use: until = deployTime_of_live_writer, so backfill covers historical gaps while the live writer covers everything from deploy onward. Pass time.Time{} (the zero value) to cover every file currently present (redundant with the live writer but harmless — PUT is idempotent).
On a data-file GET returning NoSuchKey (LIST-to-GET race, operator-driven prune, or lifecycle deletion), the file is skipped: a slog.Warn at level WARN names the path, the s3store.read.missing_data counter is incremented with method=backfill, and the backfill continues. Failing the whole backfill on one missing file would force a full restart of a long-running operator job.
Safe to run concurrently with a live writer (S3 PUT is idempotent) and safe to retry after a crash. Empty patterns slice is a no-op: (BackfillStats{}, nil). First malformed pattern fails with its index.
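A typical migration call. target, viewDef, and deployTime are assumptions — deployTime is the moment the live writer (with the view wired) started covering new writes:

stats, err := BackfillMaterializedView(ctx, target, viewDef, []string{"*"}, deployTime)
if err != nil {
    return err
}
slog.Info("matview backfill complete", "stats", stats)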
type CompressionCodec ¶ added in v0.4.0
type CompressionCodec string
CompressionCodec selects the parquet-level compression applied to every column on Write. String-valued for easy config/YAML wiring; mapped to the parquet-go codec at Store construction time. Zero value ("") resolves to snappy — the de-facto ecosystem default (Spark, Trino, Athena all emit snappy unless told otherwise).
const (
    // CompressionSnappy: fast encode/decode, ~2-3× ratio,
    // negligible CPU overhead. Default.
    CompressionSnappy CompressionCodec = "snappy"

    // CompressionZstd: better ratios than snappy at higher CPU
    // cost. Good for cold / archive data.
    CompressionZstd CompressionCodec = "zstd"

    // CompressionGzip: widely compatible, moderate CPU, decent
    // ratio. Mostly a legacy choice today.
    CompressionGzip CompressionCodec = "gzip"

    // CompressionUncompressed: no compression. Largest files;
    // only meaningful when the data is already high-entropy or
    // the CPU tradeoff matters more than S3 cost.
    CompressionUncompressed CompressionCodec = "uncompressed"
)
type ConsistencyLevel ¶ added in v0.14.0
type ConsistencyLevel string
ConsistencyLevel is the value passed in the Consistency-Control HTTP header on S3 requests that require stronger-than-default consistency. NetApp StorageGRID defines the header; AWS S3 and MinIO ignore it. Declared as a string-alias enum so typos are caught at compile time (via the named constants) while leaving room for backends that add new levels without a library update (ConsistencyLevel("future-level") still compiles and passes through verbatim).
Configured on S3TargetConfig.ConsistencyControl, applied uniformly to every correctness-critical S3 call routed through the target — data PUTs (idempotent and unconditional), ref PUTs, matview marker PUTs, GETs, HEADs, and every LIST (partition LIST, marker LIST, ref-stream LIST in Poll, scoped retry-LIST in findExistingRef). Setting it on the target rather than per-config struct enforces NetApp's "same consistency for paired operations" rule by construction. See the README's "StorageGRID consistency" section for the full matrix.
const (
    // ConsistencyDefault is the zero value: no header sent. The
    // bucket's default consistency applies. Correct on AWS S3 and
    // MinIO (strongly consistent out of the box); the per-op
    // opt-in for StorageGRID is one of the stronger levels below.
    ConsistencyDefault ConsistencyLevel = ""

    // ConsistencyAll requires every storage node to acknowledge.
    // Highest durability / visibility guarantee; highest latency.
    ConsistencyAll ConsistencyLevel = "all"

    // ConsistencyStrongGlobal ensures read-after-write and list-
    // after-write consistency across every site. Safe multi-site
    // choice for idempotency correctness.
    ConsistencyStrongGlobal ConsistencyLevel = "strong-global"

    // ConsistencyStrongSite ensures read-after-write and list-
    // after-write consistency within a single site. Cheaper than
    // ConsistencyStrongGlobal when the deployment is single-site
    // or all readers/writers co-locate.
    ConsistencyStrongSite ConsistencyLevel = "strong-site"

    // ConsistencyReadAfterNewWrite is StorageGRID's default: GET
    // of a newly-PUT object is strongly consistent, but LIST and
    // overwrite-PUT visibility is eventual. Insufficient for
    // Phase 3's scoped-LIST ref dedup — pick a stronger level if
    // idempotency guarantees matter on StorageGRID.
    ConsistencyReadAfterNewWrite ConsistencyLevel = "read-after-new-write"

    // ConsistencyAvailable is the weakest level: any replica may
    // answer. Use only when availability trumps correctness.
    ConsistencyAvailable ConsistencyLevel = "available"
)
type HivePartition ¶ added in v0.20.0
HivePartition is the canonical (Key, Rows) value type for per-partition reads and writes. Key is the partition-key string ("period=X/customer=Y") matching what WriteWithKey takes as input.
Read-side: ReadPartitionIter / ReadPartitionRangeIter yield one HivePartition per Hive partition with Rows already sort+dedup'd (per-partition latest-per-entity by default; opt out with WithHistory for replica-only collapse). Yield order is lex-ascending by Key. Within Rows, order depends on dedup configuration: when EntityKeyOf is set, records are in (entity, version) ascending order under both dedup paths (the WithHistory replica path still sorts by the same comparator). When EntityKeyOf is nil (no dedup configured), records emit in decode order: file lex order, then parquet row order within each file.
Write-side: GroupByPartition returns []HivePartition[T] lex-ordered by Key, with per-partition Rows in input (insertion) order. No sort or dedup runs on the write side — the writer doesn't have an EntityKeyOf to compare on.
Both sides share the deterministic-emission contract: same input produces byte-identical HivePartition slices across calls. See "Deterministic emission order across read and write paths" in CLAUDE.md.
type Layout ¶ added in v0.14.0
type Layout struct {
// Time is the layout string passed to time.Time.Format for
// any column matching a time.Time field on T. Examples:
// time.RFC3339, "2006-01-02" (date-only), "2006-01" (month).
// Empty + a time column on T → error at NewWriter.
//
// time.Time.Format uses the value's own zone; call .UTC()
// upstream if you want zone-stable keys.
Time string
}
Layout configures auto-projection formatting for non-string fields on T. Each field is the layout for one type family; empty means "auto-projection refuses this type — set the field, or write Of explicitly."
Layout choices are part of the marker S3 key — once published, changing the layout orphans every prior marker. Pick a stable format up front (use BackfillMaterializedView if you ever need to migrate).
type MaterializedViewDef ¶ added in v0.25.0
type MaterializedViewDef[T any] struct {
    // Name identifies the view under the target's
    // <Prefix>/_matview/ subtree. Required. Must be non-empty,
    // free of '/', and unique across MaterializedViewDef[T] entries
    // on the same writer.
    Name string

    // Columns lists the view's column names in the order they
    // appear in the S3 path. Same ordering rules as
    // MaterializedViewLookupDef.Columns (most-selective first for
    // LIST pruning).
    Columns []string

    // Of extracts the per-record column values, in Columns order,
    // as a slice. Returning (nil, nil) skips the record (no marker
    // emitted). A non-nil values slice must have length equal to
    // len(Columns); each entry must satisfy
    // validateHivePartitionValue.
    //
    // Of is optional: nil → reflection. The library walks T's
    // parquet-tagged fields and, for every Columns entry, picks
    // the matching field. String fields project directly;
    // time.Time fields require Layout.Time to be set. T may
    // carry parquet tags not in Columns — those are ignored.
    //
    // Provide a custom Of when the auto-projection isn't enough
    // (per-column time formats, derived values, computed strings):
    //
    //     Of: func(r Record) ([]string, error) {
    //         return []string{r.SKU, r.At.Format(time.RFC3339)}, nil
    //     }
    Of func(T) ([]string, error)

    // Layout configures how non-string parquet-tagged fields on T
    // are formatted into path segments when Of is nil. Honored
    // only by the auto-projection path; an explicit Of bypasses
    // Layout entirely.
    Layout Layout
}
MaterializedViewDef is the write-side definition of a materialized view. Pass a slice of these on WriterConfig.MaterializedViews / Config.MaterializedViews at construction; the writer iterates each registered view per record, builds the marker S3 key from Columns + Of's return, and PUTs one empty marker per distinct key.
The previous late-binding RegisterMaterializedView API is gone: views are part of the writer's construction, so registration cannot race with Write and "registered after first Write" is not a reachable state.
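A write-side definition sketch using auto-projection (no Of) over the hypothetical Record type from the Overview; the view name, columns, and layout are assumptions:

skuByDay := MaterializedViewDef[Record]{
    Name:    "sku-by-day",
    Columns: []string{"sku", "updated_at"}, // resolved against Record's parquet tags
    Layout:  Layout{Time: "2006-01-02"},    // part of the marker key — pick once, keep stable
}
cfg.MaterializedViews = []MaterializedViewDef[Record]{skuByDay} // cfg: the StoreConfig / WriterConfig under construction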
type MaterializedViewLookupDef ¶ added in v0.25.0
type MaterializedViewLookupDef[K comparable] struct {
    // Name identifies the view under the target's
    // <Prefix>/_matview/ subtree. Required. Must be non-empty
    // and free of '/'.
    Name string

    // Columns lists the view's column names in the order
    // they appear in the S3 path. Earlier columns form a narrower
    // LIST prefix when Lookup specifies them literally. Pick the
    // order based on how queries filter: most-selective first.
    Columns []string

    // From projects a Lookup-result values slice (one entry per
    // Columns position, in declared order) onto a typed K. Nil →
    // reflection: every parquet-tagged field on K must match
    // exactly one Columns entry. String fields are set directly;
    // time.Time fields are parsed via time.Parse(Layout.Time, ...).
    // Use a custom From when K has no parquet tags or when extra
    // validation is needed.
    //
    // values length is guaranteed to equal len(Columns); custom
    // From implementations may rely on positional access (e.g.
    // values[0] for Columns[0]).
    //
    // Errors are propagated by Lookup with the view name as
    // wrapping context.
    From func(values []string) (K, error)

    // Layout configures how non-string parquet-tagged fields on K
    // are parsed from path segments when From is nil. Honored
    // only by the auto-binding path; an explicit From bypasses
    // Layout entirely.
    //
    // For correctness, Layout.Time on the read side MUST match
    // MaterializedViewDef.Layout.Time on the write side — drift
    // produces silently wrong values or parse errors. Define one
    // constant and reuse it on both sides.
    Layout Layout
}
MaterializedViewLookupDef is the read-side definition of a materialized view. Build a MaterializedViewReader[K] from it via NewMaterializedViewReader when a service queries an existing materialized view (dashboard, query API).
Name + Columns identify the view in S3; From projects the per-marker column values back into a typed K. From is optional — when nil, the library reflects K's parquet-tagged string fields against Columns and assembles K from the values slice. Provide a custom From for K's that have no parquet tags or when extra validation/transformation is needed.
type MaterializedViewReader ¶ added in v0.25.0
type MaterializedViewReader[K comparable] struct {
    // contains filtered or unexported fields
}
MaterializedViewReader is the typed read-handle for a materialized view. Build via NewMaterializedViewReader from a S3Target + MaterializedViewLookupDef[K]. Lookup issues LIST only — no parquet reads.
func NewMaterializedViewReader ¶ added in v0.25.0
func NewMaterializedViewReader[K comparable](target S3Target, def MaterializedViewLookupDef[K]) (*MaterializedViewReader[K], error)
NewMaterializedViewReader builds a query handle for a materialized view. Validates Name + Columns and, when def.From is nil, K's parquet tags against Columns. With a custom From, only the def-level fields are validated — the caller is responsible for producing valid K's.
func (*MaterializedViewReader[K]) Lookup ¶ added in v0.25.0
func (i *MaterializedViewReader[K]) Lookup(ctx context.Context, patterns []string) (out []K, err error)
Lookup returns every K whose marker matches any of the given key patterns. Patterns use the grammar from validateKeyPattern, evaluated against Columns. Pass multiple patterns when the target set isn't a Cartesian product (e.g. (sku=A, customer=X) and (sku=B, customer=Y) but not the off-diagonal pairs); overlapping patterns are deduplicated at the marker-key level, so each distinct marker contributes at most one entry. With the default reflection binder (or any non-lossy custom From) that means no duplicate K's. A custom From that drops a column — e.g. Columns=[sku, customer] but K holds only SKU — collapses distinct markers to equal K's; the returned slice will contain duplicates in that case.
Read-after-write: the marker LIST inherits the target's ConsistencyControl, so every marker the writer has already published is visible. Unlike Poll there is no SettleWindow filter.
Results are unbounded — narrow the patterns if a view has millions of matching markers. Empty patterns slice returns (nil, nil); a malformed pattern fails with the offending view.
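A read-side sketch against the same hypothetical sku-by-day view; SKUDay is an assumed key type whose parquet tags mirror the view's Columns:

type SKUDay struct {
    SKU string    `parquet:"sku"`
    Day time.Time `parquet:"updated_at"`
}

view, err := NewMaterializedViewReader(target, MaterializedViewLookupDef[SKUDay]{
    Name:    "sku-by-day",
    Columns: []string{"sku", "updated_at"},
    Layout:  Layout{Time: "2006-01-02"}, // must match the write-side Layout.Time
})
if err != nil {
    return err
}
keys, err := view.Lookup(ctx, []string{"sku=A-1/updated_at=2026-03-*"})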
type Offset ¶
type Offset string
Offset represents a position in the stream. Use the empty string Offset("") as the unbounded sentinel: as `since` it means stream head; as the upper bound (via WithUntilOffset) it means walk to the live tip (now - SettleWindow) as of the call. To keep up with new writes, call again from the last offset. ReadRangeIter takes time.Time bounds and uses time.Time{} as its own unbounded sentinel.
type PollOption ¶ added in v0.14.0
type PollOption func(*pollOpts)
PollOption configures Poll / PollRecords. Separate from ReadOption (which serves the snapshot read paths) so each option type only carries knobs its read path actually honours — no "ignored on this path" footguns.
func WithUntilOffset ¶
func WithUntilOffset(until Offset) PollOption
WithUntilOffset bounds Poll / PollRecords from above: only entries with offset < until are returned (half-open range). Pair with Reader.OffsetAt to read records in a time window. Zero-value offset disables the bound.
type ReadOption ¶ added in v0.18.0
type ReadOption func(*readOpts)
ReadOption configures read-path behavior. Shared across snapshot reads (Read / ReadIter / ReadRangeIter); Poll and PollRecords have their own PollOption type — the option spaces don't overlap.
func WithHistory ¶
func WithHistory() ReadOption
WithHistory disables latest-per-key deduplication on any read path. Without it, reads are deduped by EntityKeyOf + VersionOf (latest per entity per partition); with it, every version of every record is returned.
When no dedup rule is configured (EntityKeyOf or VersionOf nil), dedup is a no-op regardless of this option.
func WithReadAheadBytes ¶ added in v0.14.0
func WithReadAheadBytes(n int64) ReadOption
WithReadAheadBytes caps the cumulative uncompressed parquet bytes that may sit decoded in the ReadIter / ReadRangeIter pipeline ahead of the current yield position. Zero (default) disables the cap; only WithReadAheadPartitions binds.
Composes with WithReadAheadPartitions — both are evaluated and whichever cap binds first holds the producer back. Useful when partition sizes are skewed: a tiny WithReadAheadPartitions(1) is too conservative for many small partitions but a larger value risks OOM on a few large ones; a byte cap auto-tunes across both.
The byte total is read from each parquet file's footer (total_uncompressed_size summed across row groups), so the cap is exact, not a heuristic. Decoded Go memory typically runs 1–2× the uncompressed size depending on data shape.
Per-partition guarantee: if a single partition's uncompressed size exceeds the cap, that one partition still decodes (the cap can't be enforced below the partition granularity without row-group-level streaming). The cap only prevents *additional* partitions from joining the buffer.
func WithReadAheadPartitions ¶ added in v0.11.0
func WithReadAheadPartitions(n int) ReadOption
WithReadAheadPartitions tells ReadIter / ReadRangeIter how many partitions to buffer ahead of the current yield position. Default (option not supplied) is 1 — minimum useful lookahead so decode of partition N+1 overlaps yield of partition N. Pass a larger value for more aggressive prefetch on consumers that do non-trivial per-record work; combine with WithReadAheadBytes to bound stacking of skewed-size partitions.
n=0 is the explicit-no-buffer mode: unbuffered handoff between decoder and yield loop. The decoder still works on partition N+1 concurrent with yield emitting N (the handoff just blocks the decoder briefly), but never two decoded partitions sit in memory at once. Useful when records are large and the consumer's per-record work is fast — the byte cap is then the only memory regulator.
Negative values are floored to 0.
Each buffered partition holds its decoded records in memory until the yield loop consumes them. Memory: O((n+1) partitions) — current + n prefetched.
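Combining both caps on a range read — the numbers are illustrative, not recommendations:

it := r.ReadRangeIter(ctx, since, until,
    WithReadAheadPartitions(4),  // prefetch up to four partitions ahead...
    WithReadAheadBytes(256<<20), // ...but stop stacking once ~256 MiB uncompressed is buffered
)
for rec, err := range it {
    if err != nil {
        return err
    }
    process(rec) // process is a placeholder for the consumer's work
}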
type Reader ¶ added in v0.14.0
type Reader[T any] struct {
    // contains filtered or unexported fields
}
Reader is the read-side half of a Store. Owns Read / ReadIter / ReadRangeIter / Poll / PollRecords / OffsetAt. Construct directly via NewReader in read-only services, or via NewReaderFromWriter / NewReaderFromStore for a (possibly narrower-T) Reader over a Writer's / Store's data.
func NewReader ¶ added in v0.14.0
func NewReader[T any](cfg ReaderConfig[T]) (*Reader[T], error)
NewReader constructs a Reader directly from ReaderConfig. Intended for read-only services that have no write-side config to supply (no PartitionKeyOf / Compression).
Validates EntityKeyOf and VersionOf are both set or both nil.
func NewReaderFromStore ¶ added in v0.14.0
func NewReaderFromStore[T, U any](s *Store[U], cfg ReaderConfig[T]) (*Reader[T], error)
NewReaderFromStore is NewReaderFromWriter with a Store[U] instead of a Writer[U] — extracts the embedded Writer and forwards. Common shape: one Store writes FullRec, a few NewReaderFromStore calls produce narrow Readers for hot-path reads without respecifying the shared config.
func NewReaderFromWriter ¶ added in v0.14.0
func NewReaderFromWriter[T, U any](w *Writer[U], cfg ReaderConfig[T]) (*Reader[T], error)
NewReaderFromWriter constructs a Reader[T] over the data a Writer[U] produces. T may equal U (same-shape read) or differ from U — typically a narrower struct that omits heavy write-only columns (parquet-go skips unlisted columns on decode). The Writer's Target overrides whatever Target cfg carries (or doesn't), so the resulting Reader inherits the writer's ConsistencyControl automatically; read-side knobs (EntityKeyOf, VersionOf) come from cfg.
Dedup closures (EntityKeyOf / VersionOf) on the Writer are typed over U and cannot be auto-transformed to T; the caller re-declares them in cfg when dedup is needed.
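A narrow-read sketch: FullRec is the write-side type behind w, HotRec an assumed subset of its columns, and the dedup closures are re-declared over HotRec:

type HotRec struct {
    SKU       string    `parquet:"sku"`
    UpdatedAt time.Time `parquet:"updated_at"`
    // heavy payload columns omitted — parquet-go skips them on decode
}

hot, err := NewReaderFromWriter(w, ReaderConfig[HotRec]{
    // Target is inherited from w, including its ConsistencyControl.
    EntityKeyOf: func(r HotRec) string { return r.SKU },
    VersionOf:   func(r HotRec) int64 { return r.UpdatedAt.UnixMicro() },
})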
func (*Reader[T]) OffsetAt ¶ added in v0.14.0
OffsetAt returns the stream offset corresponding to wall-clock time t: any ref written at or after t sorts >= the returned offset, any ref written before t sorts <. Pure computation — no S3 call. Pair with WithUntilOffset on Poll / PollRecords to read records within a time window — or use ReadRangeIter, which takes time.Time bounds directly.
func (*Reader[T]) Poll ¶ added in v0.14.0
func (s *Reader[T]) Poll(ctx context.Context, since Offset, maxEntries int32, opts ...PollOption) (out []StreamEntry, nextOffset Offset, err error)
Poll returns up to maxEntries stream entries (refs only) after the given offset, capped at now - SettleWindow to avoid races with in-flight writes. One LIST call against the ref stream plus one HEAD per ref against `<token>.commit` (collapsed by a per-poll cache when refs share a token); no parquet GETs. Returns (entries, nextOffset, error); checkpoint nextOffset and pass it as `since` on the next call.
Refs whose `<token>.commit` is missing (404) are skipped: by the time the ref clears the SettleWindow cutoff, the writer has either committed (200) or returned an error to its caller (no commit will land). Refs whose commit's `attemptid` doesn't match the ref's attempt-id are also skipped — they're orphans from a failed-mid-write retry under the same token.
Pass WithUntilOffset to bound the walk from above so long streams aren't scanned past the window of interest.
nextOffset advances over every ref the LIST visits, including ones the gate skips. Once a ref's refMicroTs is past the SettleWindow cutoff, its commit outcome is final — re-walking it on the next poll wouldn't surface anything new.
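A checkpointed polling loop; loadOffset, saveOffset, handle, and pollInterval stand in for whatever checkpoint store and processing the consumer uses:

offset := loadOffset() // Offset("") on first run → start at the stream head
for ctx.Err() == nil {
    entries, next, err := r.Poll(ctx, offset, 500)
    if err != nil {
        return err
    }
    for _, e := range entries {
        handle(e) // e.Key, e.DataPath, e.RowCount, ...
    }
    saveOffset(next) // checkpoint before advancing
    offset = next
    if len(entries) == 0 {
        time.Sleep(pollInterval) // caught up with now - SettleWindow
    }
}
return ctx.Err()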
func (*Reader[T]) PollRange ¶ added in v0.20.0
func (s *Reader[T]) PollRange(ctx context.Context, since, until time.Time) ([]StreamEntry, error)
PollRange drains the ref stream over the [since, until) wall-clock window in one call, returning every StreamEntry gated by the commit-marker (same gating Poll applies). Pages internally at s3ListMaxKeys; one Poll per page until exhausted.
Zero time.Time on either bound means unbounded: since=zero starts at the stream head, until=zero walks to the live tip (now - SettleWindow, captured at call entry so the upper bound stays stable under concurrent writes). Same time-bound semantics as ReadRangeIter.
Use to enumerate refs / partitions before deciding scope, intersect partition sets across Stores for a filtered zip, or feed metadata into custom decode workflows. Each StreamEntry carries the partition Key, DataPath, RefPath, InsertedAt, and RowCount — everything a caller needs to inspect or pass downstream.
Memory: O(refs in range × StreamEntry size). For huge ranges chunk by smaller windows or use ReadRangeIter (streaming) / PollRecords (cursor-based) instead.
Order: refs in time order (refMicroTs lex-ascending, same as Poll). Partition emission order on snapshot reads is lex-ascending by partition Key — re-group via HivePartition.Key if you want partition-grouped output.
func (*Reader[T]) PollRecords ¶ added in v0.14.0
func (s *Reader[T]) PollRecords(ctx context.Context, since Offset, maxEntries int32, opts ...PollOption) (out []T, nextOffset Offset, err error)
PollRecords returns typed records from the files referenced by up to maxEntries refs after the offset, plus the next offset for checkpointing. Cursor-based, CDC-style: caller resumes from the returned offset on the next call.
Replica-dedup only: records sharing (entity, version) collapse to one (rare retries / zombies); distinct versions of the same entity all flow through. Latest-per-entity dedup is NOT offered — meaningless on a cursor since the next batch may carry a newer version of the same entity. For latest-per-entity, use Read or ReadRangeIter (snapshot-style).
Records emit in partition-lex order, then per-partition (entity, version) order within each. Cross-partition temporal ordering within a batch is NOT preserved — refs are still walked in time order to advance nextOffset, but record decode runs through the same per-partition pipeline as ReadIter / ReadRangeIter, so consumers needing wall-clock ordering across partitions must re-sort by their own timestamp field. The "don't miss records" property (correct nextOffset advancement) is unaffected.
Per-partition replica-dedup precondition: EntityKeyOf must be fully determined by the partition key — same as ReadIter. Replicas of the same (entity, version) always live in the same Hive partition under the precondition, so per-partition replica dedup is correctness-equivalent to global replica dedup.
func (*Reader[T]) Read ¶ added in v0.14.0
func (s *Reader[T]) Read(ctx context.Context, keyPatterns []string, opts ...ReadOption) (out []T, err error)
Read returns all records whose data files match any of the given key patterns. Patterns use the grammar described in validateKeyPattern; pass multiple when the target set isn't a Cartesian product (e.g. (period=A, customer=X) and (period=B, customer=Y) but not the off-diagonal pairs).
When EntityKeyOf and VersionOf are configured, the result is deduplicated per Hive partition to the latest version per entity (pass WithHistory to opt out). Correctness requires EntityKeyOf to be fully determined by the partition key so no entity ever spans partitions — same precondition as ReadIter. Overlapping patterns are safe — each parquet file is fetched and decoded at most once.
Records emit in partition-lex order with per-partition (entity, version) order within each. All records are buffered before return — for unbounded reads, use ReadIter or ReadPartitionIter instead. Empty patterns slice returns (nil, nil); a malformed pattern fails with the offending index.
On NoSuchKey: Read fails (LIST-to-GET race is rare enough that surfacing it as an error is more honest than silently skipping, and the caller's retry resolves it).
func (*Reader[T]) ReadEntriesIter ¶ added in v0.20.0
func (s *Reader[T]) ReadEntriesIter(ctx context.Context, entries []StreamEntry, opts ...ReadOption) iter.Seq2[T, error]
ReadEntriesIter streams records from a pre-resolved []StreamEntry — typically the output of PollRange or Poll — as an iter.Seq2[T, error]. Skips the LIST + commit-gate phase that ReadIter / ReadRangeIter do internally; useful when a caller already enumerated refs (for inspection, filtering, or cross-store coordination) and wants to decode them without paying the resolution cost a second time.
Same per-partition dedup, byte-budget, and read-ahead semantics as ReadRangeIter. Tolerant of NoSuchKey: an operator-driven prune between resolution and decode is logged + counted via s3store.read.missing_data and the affected file is skipped, so the consumer keeps advancing.
Cross-store safety: the iter validates upfront that every entry's DataPath belongs to this Reader's prefix (<prefix>/data/...). Passing entries from a different Store yields a wrapped error before any S3 traffic — entries cannot be silently mis-routed across Stores. Cross-bucket misuse (same prefix, different bucket) bypasses this check but fails loudly via NoSuchKey because the receiving bucket doesn't have those exact keys (UUIDv7 in the data path makes accidental hits effectively impossible).
Empty entries slice yields nothing without error. Records emit in partition-lex order (the entries are re-grouped into the same per-partition pipeline ReadIter / ReadRangeIter use).
func (*Reader[T]) ReadIter ¶ added in v0.14.0
func (s *Reader[T]) ReadIter(ctx context.Context, keyPatterns []string, opts ...ReadOption) iter.Seq2[T, error]
ReadIter returns an iter.Seq2[T, error] yielding records one at a time, streaming partition-by-partition. Use when Read's O(records) memory is a problem.
Dedup is per-partition (uniform across every read path now): correct only when the partition key strictly determines every component of EntityKeyOf so no entity ever spans partitions. For layouts that don't satisfy this invariant, pass WithHistory and dedup yourself.
Partitions emit in lex order. Within a partition, record order depends on dedup configuration: when EntityKeyOf is set, records are in (entity, version) ascending order — last-wins on tied versions for default dedup, first-wins per (entity, version) group for WithHistory replica dedup. When EntityKeyOf is nil (no dedup configured), records emit in decode order: file lex order, then parquet row order within each file.
Memory: O(one partition's records) by default. Tune with WithReadAheadPartitions (default 1; overlap decode of N+1 with yield of N) and/or WithReadAheadBytes (uncompressed-size cap).
Breaking out of the for-range loop cancels in-flight downloads — no manual Close. Empty patterns slice yields nothing; a malformed pattern surfaces as the iter's first error.
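Typical consumption, with process as a placeholder; breaking out early is safe:

for rec, err := range r.ReadIter(ctx, []string{"period=2026-03/*"}) {
    if err != nil {
        return err
    }
    if !process(rec) {
        break // cancels in-flight downloads; nothing to Close
    }
}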
func (*Reader[T]) ReadPartitionEntriesIter ¶ added in v0.20.0
func (s *Reader[T]) ReadPartitionEntriesIter(ctx context.Context, entries []StreamEntry, opts ...ReadOption) iter.Seq2[HivePartition[T], error]
ReadPartitionEntriesIter is the partition-grouped variant of ReadEntriesIter: streams records from a pre-resolved []StreamEntry as one HivePartition[T] per Hive partition.
Use when the consumer already has the entries (typically from PollRange) AND wants per-partition batches for joins, group-bys, or zip-by-partition workflows across multiple Stores.
Same per-partition dedup, byte-budget, read-ahead, tolerant-NoSuchKey semantics as ReadPartitionRangeIter. Same upfront cross-store guard as ReadEntriesIter — every entry's DataPath must belong to this Reader's prefix or the iter yields a wrapped error before any S3 traffic.
Empty entries slice yields nothing without error. Partitions emit in lex order of HivePartition.Key.
func (*Reader[T]) ReadPartitionIter ¶ added in v0.20.0
func (s *Reader[T]) ReadPartitionIter(ctx context.Context, keyPatterns []string, opts ...ReadOption) iter.Seq2[HivePartition[T], error]
ReadPartitionIter is the partition-grouped variant of ReadIter: instead of yielding records one at a time, it yields one HivePartition[T] per Hive partition with all of that partition's records collected as Rows.
Use when the consumer needs to process a partition's records as a batch — joins, group-bys, per-partition aggregates — without re-grouping a flat record stream by hand.
Dedup, byte-budget, read-ahead, and missing-data semantics are identical to ReadIter:
- Per-partition dedup: correct only when the partition key strictly determines every component of EntityKeyOf so no entity ever spans partitions. Use WithHistory to opt out (replica-only collapse).
- Memory bounded by WithReadAheadPartitions / WithReadAheadBytes.
- NoSuchKey on a data file is strict (mirrors ReadIter): the iter surfaces the wrapped error so the caller's retry can resolve the LIST-to-GET race.
Breaking out of the for-range loop cancels in-flight downloads. Empty patterns slice yields nothing; a malformed pattern surfaces as the iter's first error.
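A per-partition aggregate sketch over the hypothetical Record type from the Overview (summing its Qty column):

totals := make(map[string]int64)
for part, err := range r.ReadPartitionIter(ctx, []string{"period=2026-03/*"}) {
    if err != nil {
        return err
    }
    for _, rec := range part.Rows {
        totals[part.Key] += rec.Qty
    }
}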
func (*Reader[T]) ReadPartitionRangeIter ¶ added in v0.20.0
func (s *Reader[T]) ReadPartitionRangeIter(ctx context.Context, since, until time.Time, opts ...ReadOption) iter.Seq2[HivePartition[T], error]
ReadPartitionRangeIter is the partition-grouped variant of ReadRangeIter: streams every record written in the [since, until) time window as one HivePartition[T] per Hive partition.
Same time-window resolution as ReadRangeIter: zero time.Time on either bound means unbounded — since=zero starts at the stream head, until=zero walks to the live tip (now - SettleWindow, captured at call entry so the upper bound stays stable under concurrent writes).
Same per-partition dedup as ReadRangeIter (default latest-per-entity per partition; pass WithHistory to opt out for replica-only). Memory bounded by WithReadAheadPartitions / WithReadAheadBytes.
Tolerant of NoSuchKey on data files (mirrors ReadRangeIter): missing files are skipped with a slog.Warn + missing-data counter so the consumer can keep advancing — operator-driven prunes leave refs pointing at deleted files and a strict failure here can't be resolved by a caller retry.
Does NOT expose per-batch offsets; consumer aborts cannot safely resume. Use PollRecords + manual partition grouping when you need to checkpoint between batches.
func (*Reader[T]) ReadRangeIter ¶ added in v0.14.0
func (s *Reader[T]) ReadRangeIter(ctx context.Context, since, until time.Time, opts ...ReadOption) iter.Seq2[T, error]
ReadRangeIter streams every record written in the [since, until) time window as an iter.Seq2[T, error]. Snapshot view of the ref stream over a wall-clock range — no offset cursor, no resume.
Zero time.Time on either bound means unbounded: since=zero starts at the stream head, until=zero walks to the live tip (now - SettleWindow, captured at call entry so the upper bound stays stable under concurrent writes). Pair with non-zero values for time-windowed reads:
start := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
end := time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC)
for r, err := range store.ReadRangeIter(ctx, start, end) { ... }
Same per-partition dedup as ReadIter (default latest-per-entity per partition; pass WithHistory to opt out). Memory bounded by WithReadAheadPartitions / WithReadAheadBytes.
The ref-LIST runs upfront before the first record yields — usually sub-100ms but huge windows can take seconds; chunk via since/until. Breaking out of the loop cancels in-flight downloads. Errors are yielded as (zero, err) and terminate the iter.
Does NOT expose per-batch offsets — consumer aborts cannot safely resume. Use PollRecords (Kafka-style cursor) when you need to checkpoint between batches.
type ReaderConfig ¶ added in v0.14.0
type ReaderConfig[T any] struct {
    Target      S3Target
    EntityKeyOf func(T) string
    VersionOf   func(T) int64
}
ReaderConfig is the narrower Config form for constructing a Reader directly. Holds the S3-wiring bundle (Target — a constructed S3Target) plus read-side-only knobs. Use NewReader(cfg) in read-only services that have no PartitionKeyOf / Compression config to supply.
Target is built once via NewS3Target and can be passed to both WriterConfig.Target and ReaderConfig.Target so the resulting Writer and Reader share the same MaxInflightRequests semaphore.
Dedup contract: EntityKeyOf and VersionOf are both set or both nil — NewReader rejects partial configurations. When set, records dedup to the latest-per-entity by VersionOf; when nil, every record flows through.
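A read-only service constructing its Reader directly; target comes from NewS3Target (below) and the closures assume the hypothetical Record type:

rdr, err := NewReader(ReaderConfig[Record]{
    Target:      target, // build once; share with a Writer if one exists in the same process
    EntityKeyOf: func(r Record) string { return r.SKU },
    VersionOf:   func(r Record) int64 { return r.UpdatedAt.UnixMicro() },
})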
type S3Target ¶ added in v0.5.0
type S3Target struct {
// contains filtered or unexported fields
}
S3Target is the constructed live handle to an s3parquet dataset. Built once from an S3TargetConfig via NewS3Target; all S3 operations go through this type so the per-target MaxInflightRequests semaphore caps net in-flight requests regardless of fan-out axis.
Pass the same S3Target value to WriterConfig.Target and ReaderConfig.Target so the Writer and Reader share the same semaphore. The struct is copied by value but every field is a reference type (chan, pointer, slice header) so copies share the underlying state.
Fields are unexported and immutable after construction: callers read via the accessor methods. Re-construct with a fresh NewS3Target if you need to change MaxInflightRequests or any other field.
func NewS3Target ¶ added in v0.5.0
func NewS3Target(ctx context.Context, cfg S3TargetConfig) (S3Target, error)
NewS3Target constructs a live S3Target from config. Calls cfg.ValidateLookup (Bucket, Prefix, S3Client) up-front so the GETs that follow have the wiring they need, then GETs the persisted timing-config objects at <Prefix>/_config/commit-timeout and <Prefix>/_config/max-clock-skew, parses each as a Go time.Duration string, and rejects values below the configured floors (CommitTimeoutFloor = 1 ms, MaxClockSkewFloor = 0) before stamping the values (and the derived SettleWindow) on the Target. Construction fails when ValidateLookup fails or when either object is missing, unparseable, or below its floor — operators must seed the dataset's prefix before any process can construct a Target against it (see README's "Initializing a new dataset").
Does not call Validate (PartitionKeyParts) — that's a Writer / Reader concern and is checked by NewWriter / NewReader, not by every Target consumer (NewMaterializedViewReader is read-only and doesn't need PartitionKeyParts).
Logs a warning when ConsistencyControl is non-empty but doesn't match a named ConsistencyLevel constant — typo guard with no effect on behaviour (the value is still sent verbatim).
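A construction sketch — bucket, prefix, and partition layout are placeholders, and the prefix must already hold the two _config objects:

target, err := NewS3Target(ctx, S3TargetConfig{
    Bucket:             "analytics-bucket",
    Prefix:             "sales",
    S3Client:           client, // *s3.Client from the AWS SDK v2
    PartitionKeyParts:  []string{"period", "customer"},
    ConsistencyControl: ConsistencyStrongGlobal, // no-op on AWS S3 / MinIO
})
if err != nil {
    return err // missing or invalid _config objects surface here, not at first use
}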
func (S3Target) CommitTimeout ¶ added in v0.15.0
CommitTimeout returns the value resolved at construction time from <Prefix>/_config/commit-timeout. Pure accessor — no I/O. Bounds the writer's ref-PUT-to-token-commit-completion budget for one Write: past CommitTimeout (measured from refMicroTs, captured just before the ref PUT, to the moment the token-commit PUT returns), the s3store.write.commit_after_timeout counter increments and Write returns an error (the commit still lands; the metric flags that the reader's SettleWindow tuned for this CommitTimeout may not yet have included this write in the stream window). Pre-ref work — parquet encoding, marker PUTs, data PUT — is deliberately outside the budget; only the ref-LIST-visible → token-commit-visible window can put the SettleWindow contract at risk.
func (S3Target) Config ¶ added in v0.14.0
func (t S3Target) Config() S3TargetConfig
Config returns a copy of the S3TargetConfig the target was built from. Use for introspection or passing to tools that expect the config form.
func (S3Target) ConsistencyControl ¶ added in v0.14.0
func (t S3Target) ConsistencyControl() ConsistencyLevel
ConsistencyControl returns the configured Consistency-Control header value applied to every routed S3 call.
func (S3Target) EffectiveMaxInflightRequests ¶ added in v0.14.0
EffectiveMaxInflightRequests forwards to S3TargetConfig.EffectiveMaxInflightRequests.
func (S3Target) MaxClockSkew ¶ added in v0.15.0
MaxClockSkew returns the value resolved at construction time from <Prefix>/_config/max-clock-skew. Pure accessor — no I/O. Operator's assumed bound on writer↔reader wall-clock divergence (refMicroTs in the ref filename is writer-stamped, so this is the skew SettleWindow has to absorb). Consumed by the reader's refCutoff via SettleWindow.
func (S3Target) PartitionKeyParts ¶ added in v0.14.0
PartitionKeyParts returns the configured Hive-partition keys.
func (S3Target) SettleWindow ¶ added in v0.15.0
SettleWindow returns the derived sum CommitTimeout + MaxClockSkew. Used by Poll's `refCutoff = now - SettleWindow`. Sized so the cutoff cannot overtake refs whose token-commit is still being written: CommitTimeout bounds the writer's ref-PUT + token-commit-PUT envelope (measured from refMicroTs onward; pre-ref work is outside the budget by design), MaxClockSkew bounds the writer↔reader wall-clock divergence applied to the writer-stamped refMicroTs. Pure accessor — no I/O.
func (S3Target) ValidateLookup ¶ added in v0.14.0
ValidateLookup forwards to S3TargetConfig.ValidateLookup.
type S3TargetConfig ¶ added in v0.14.0
type S3TargetConfig struct {
// Bucket is the S3 bucket name.
Bucket string
// Prefix is the key prefix under which data/ref/matview files
// live. Must be non-empty — a bare bucket root would collide
// with any other tenant of the bucket.
Prefix string
// S3Client is the AWS SDK v2 client to use. Endpoint, region,
// credentials, and path-style setting are used as-is.
S3Client *s3.Client
// PartitionKeyParts declares the Hive-partition key segments in
// the order they appear in the S3 path. Read/Write key
// patterns are validated against this order.
PartitionKeyParts []string
// MaxInflightRequests caps the number of S3 requests a single
// constructed S3Target may have outstanding at once. Enforced
// by a semaphore inside S3Target — every PUT/GET/HEAD/LIST
// acquires one slot before issuing and releases on completion,
// so the cap holds across every fan-out axis (partitions,
// files, patterns, markers) without per-axis tuning.
//
// Zero → default (32). The cap is per S3Target: one Writer +
// one Reader sharing the same constructed S3Target share the
// cap; two Targets do not.
//
// The AWS SDK v2's default HTTP transport leaves
// MaxConnsPerHost unlimited (Go default 0) and sets
// MaxIdleConnsPerHost to 100, so this library cap is what
// bounds parallelism for stock-configured clients — no
// transport tuning is needed at the defaults. Only if you've
// explicitly set a non-zero MaxConnsPerHost on your
// *s3.Client's transport does it need to be >=
// MaxInflightRequests, otherwise excess requests queue at the
// transport layer instead of running in parallel.
MaxInflightRequests int
// ConsistencyControl sets the Consistency-Control HTTP header
// applied to every correctness-critical S3 operation routed
// through this target — data PUTs (per-attempt-path, never
// overwriting), ref PUTs, token-commit PUTs, matview marker
// PUTs, data / config GETs, the upfront-dedup HEAD on
// `<token>.commit`, and every LIST (partition LIST on the
// read path, matview-marker LIST in MaterializedViewReader.Lookup,
// ref-stream LIST in Poll/PollRecords/ReadRangeIter).
//
// Zero value (ConsistencyDefault) is substituted at
// construction with ConsistencyStrongGlobal — the safe
// multi-site choice that lets sequential same-token
// retries observe a prior token-commit overwrite without
// surfacing as a redundant PUT. Single-site deployments can
// downgrade to ConsistencyStrongSite explicitly when the
// per-call cost matters. AWS S3 and MinIO ignore the header
// entirely; the substitution is a no-op for those backends.
// See the README's "Consistency levels × S3 operations"
// section for the full matrix.
//
// Setting the level on the target rather than on the Writer /
// Reader / MaterializedView configs enforces NetApp's "same
// consistency for paired operations" rule by construction:
// every operation routed through this target uses one and the
// same value.
ConsistencyControl ConsistencyLevel
// MeterProvider, when set, supplies the OTel meter used to
// record S3 op latencies, library method durations, semaphore
// wait/inflight depth, fan-out spans, and bytes/records/files
// counters. Zero value falls back to otel.GetMeterProvider() —
// users who configure OTel globally pick up metrics without
// touching this field; users who don't get the OTel global
// no-op (zero overhead).
//
// Bucket and Prefix from this config plus ConsistencyControl
// (when non-empty) are baked into every observation as
// constant attributes (s3store.bucket / s3store.prefix /
// s3store.consistency_level), so multi-Target deployments
// can be distinguished without per-call overhead.
MeterProvider metric.MeterProvider
}
S3TargetConfig is the user-facing config for an s3parquet dataset — pure data, struct-literal-friendly. Convert to a live S3Target via NewS3Target before passing to a Writer/Reader/ MaterializedViewReader/BackfillMaterializedView.
Embedded indirectly via WriterConfig.Target / ReaderConfig.Target (which carry an S3Target — the live form) so the four S3-wiring fields plus knobs live in exactly one place. Surfaced on Writer/Reader/Store via .Target() so read-only tools (NewMaterializedViewReader, BackfillMaterializedView) can address the same dataset without carrying T through their call graph.
func (S3TargetConfig) EffectiveMaxInflightRequests ¶ added in v0.14.0
func (c S3TargetConfig) EffectiveMaxInflightRequests() int
EffectiveMaxInflightRequests returns the configured MaxInflightRequests, or 32 when unset. 32 utilises typical S3 backends (~50 ms request latency → ~640 req/s sustained per Target) while staying comfortably below per-project concurrency caps published by managed S3 vendors (typically several hundred concurrent operations). Tune lower for small MinIO setups or very large parquet files (memory cost = 32 × largest parquet body).
func (S3TargetConfig) Validate ¶ added in v0.14.0
func (c S3TargetConfig) Validate() error
Validate runs the full check for constructors that operate on partitioned data: Bucket, Prefix, S3Client, PartitionKeyParts. Used by NewWriter, NewReader, BackfillMaterializedView — anything that reads/writes data files keyed by partition.
func (S3TargetConfig) ValidateLookup ¶ added in v0.14.0
func (c S3TargetConfig) ValidateLookup() error
ValidateLookup is the reduced check for constructors that only LIST / GET / PUT under a known prefix (no partition-key predicates): Bucket, Prefix, S3Client. Used by NewMaterializedViewReader — Lookup walks the <Prefix>/_matview/<name>/ subtree, which is keyed by the view's own Columns, not the config's PartitionKeyParts. A read-only analytics service can pass a minimally-populated S3TargetConfig and still build a working MaterializedViewReader.
type Store ¶
Store is the pure-Go entry point to an s3store. It composes an internal Writer + Reader: the two halves own their own state and methods; Store re-exposes everything via embedding so existing "one Store does both" callers keep working.
func New ¶
New constructs a Store from StoreConfig: builds an S3Target from the embedded S3TargetConfig, then projects the side-specific knobs onto WriterConfig[T] / ReaderConfig[T] and delegates to NewWriter + NewReader. The Writer and Reader share the one constructed S3Target — same MaxInflightRequests semaphore, same ConsistencyControl, same resolved CommitTimeout + MaxClockSkew.
Performs two S3 GETs at construction time — the persisted timing-config objects at <Prefix>/_config/commit-timeout and <Prefix>/_config/max-clock-skew — via NewS3Target. Construction fails when either object is missing, unparseable, or below its floor (CommitTimeoutFloor / MaxClockSkewFloor).
Services that only write or only read can skip this and call NewS3Target + NewWriter / NewReader directly with hand-built WriterConfig / ReaderConfig values.
type StoreConfig ¶ added in v0.18.0
type StoreConfig[T any] struct {
    S3TargetConfig

    // PartitionKeyOf extracts the Hive-partition key from a
    // record. Required for Write(). The returned string must
    // conform to the PartitionKeyParts layout ("part=value/part=value").
    PartitionKeyOf func(T) string

    // EntityKeyOf returns the logical entity identifier for a
    // record. When non-nil, Read and PollRecords deduplicate to
    // the record with the maximum VersionOf per entity. When
    // nil, every record is returned (pure stream semantics).
    EntityKeyOf func(T) string

    // VersionOf returns the monotonic version of a record for
    // dedup ordering. Required when EntityKeyOf is set; ignored
    // otherwise. Typical implementations return a domain
    // timestamp / sequence number from a record field
    // (`func(r T) int64 { return r.UpdatedAt.UnixMicro() }`).
    //
    // To use the library's writer-stamped insertedAt as the
    // version, configure InsertedAtField on the writer side and
    // reference the same field here:
    //
    //     VersionOf: func(r T) int64 { return r.InsertedAt.UnixMicro() }
    VersionOf func(record T) int64

    // InsertedAtField names a time.Time field on T that the writer
    // populates with its wall-clock time.Now() captured just before
    // parquet encoding. The field must carry a non-empty, non-"-"
    // parquet tag (e.g. `parquet:"inserted_at"`) — the value is a
    // real parquet column persisted on disk. Empty disables the
    // feature; no reflection cost when unset.
    //
    // On the read side the column shows up on T as a normal field
    // — no special handling. Use it from VersionOf when you want
    // the writer's stamp to drive dedup ordering:
    //
    //     VersionOf: func(r T) int64 { return r.InsertedAt.UnixMicro() }
    //
    // Forwarded to WriterConfig only.
    InsertedAtField string

    // Compression selects the parquet compression codec used on
    // Write. Zero value is snappy — matches the ecosystem default
    // and produces ~2-3× smaller files than the parquet-go raw
    // default (uncompressed) for no meaningful CPU cost on
    // decode. Set to CompressionUncompressed to opt out,
    // CompressionZstd / CompressionGzip to trade CPU for ratio.
    // New() validates this value and stores the resolved codec so
    // the hot-path Write doesn't reparse it.
    Compression CompressionCodec

    // MaterializedViews lists the secondary materialized views the
    // writer should maintain. See WriterConfig.MaterializedViews
    // for the full contract. Forwarded to WriterConfig only —
    // readers don't emit markers.
    MaterializedViews []MaterializedViewDef[T]

    // EncodeBufPoolMaxBytes caps the encode-pool buffer capacity
    // the Writer retains across writes. See
    // WriterConfig.EncodeBufPoolMaxBytes for the contract.
    // Forwarded to WriterConfig only — reader path doesn't pool
    // encode buffers. Zero or negative selects the default
    // (48 MiB).
    EncodeBufPoolMaxBytes int64
}
StoreConfig defines how a Store is set up. T is the record type, which must be encodable and decodable by parquet-go directly (struct fields tagged with `parquet:"..."`, primitive-friendly types). Types with fields parquet-go can't encode (e.g. decimal.Decimal, custom wrappers) need a companion parquet-layout struct and a translation step in the caller's package.
Embeds S3TargetConfig so the S3 wiring (Bucket, Prefix, S3Client, PartitionKeyParts, MaxInflightRequests, ConsistencyControl, MeterProvider) lives in exactly one place. New() reads the embedded value, builds an S3Target via NewS3Target, then projects the side-specific knobs onto WriterConfig[T] / ReaderConfig[T] (both of which take a built S3Target directly). Advanced users who only need one side can skip this type and call NewWriter / NewReader with a hand-built S3Target — see those constructors.
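A minimal configuration sketch, assuming a hypothetical Trade record type, an *s3.Client already built by the caller, and S3TargetConfig field names matching the list above (values are illustrative only):

type Trade struct {
	ID        string    `parquet:"id"`
	Period    string    `parquet:"period"`
	Customer  string    `parquet:"customer"`
	Amount    int64     `parquet:"amount"`
	UpdatedAt time.Time `parquet:"updated_at"`
}

cfg := s3store.StoreConfig[Trade]{
	S3TargetConfig: s3store.S3TargetConfig{
		Bucket:            "trades-bucket",
		Prefix:            "prod/trades",
		S3Client:          client, // *s3.Client built by the caller
		PartitionKeyParts: []string{"period", "customer"},
	},
	// One partition per (period, customer) pair; must line up with PartitionKeyParts.
	PartitionKeyOf: func(t Trade) string {
		return "period=" + t.Period + "/customer=" + t.Customer
	},
	// Latest-per-trade dedup on read: highest UpdatedAt wins per ID.
	EntityKeyOf: func(t Trade) string { return t.ID },
	VersionOf:   func(t Trade) int64 { return t.UpdatedAt.UnixMicro() },
}

Passing cfg to New yields the combined Writer + Reader pair; the sketch assumes New also returns an error to check.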
type StreamEntry ¶
type StreamEntry struct {
	Offset     Offset
	Key        string
	DataPath   string
	RefPath    string
	InsertedAt time.Time
	RowCount   int64
}
StreamEntry is a lightweight ref returned by Poll.
- Offset and RefPath carry the same underlying S3 key string. Offset is typed for cursor-advancing in Poll-style APIs; RefPath exposes the same value as an explicit S3 object path for callers that want to GET the ref directly. Mirrors the Offset / RefPath split on WriteResult.
- Key is the Hive-style partition key ("period=X/customer=Y") that the writer originally passed to WriteWithKey — useful for consumers to route records by partition without parsing DataPath.
- DataPath is the S3 object key of the data file this ref points at; GET it to fetch the parquet payload.
- InsertedAt is the writer's wall-clock capture immediately before the ref PUT (microsecond precision; same value embedded as `refMicroTs` in the ref filename). It approximates ref-LIST-visibility time. Slightly later than the InsertedAtField parquet column (stamped at pre-encode write-start so the column reflects logical record time, not commit-finalize time); the two values can drift by the encode + data-PUT duration.
- RowCount is the number of records in the data file this ref points at, recovered from the token-commit's `rowcount` user-metadata. Free for Poll to surface — the gate already HEADs the marker per ref/token (cached per poll cycle), so no extra round trip.
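As a consumption sketch (assuming ctx and an already-constructed Reader named reader, typed over a caller-defined record), a poll loop that seeds its cursor with OffsetAt might look like:

// Start one hour back; OffsetAt maps a wall-clock time to a stream offset.
since := reader.OffsetAt(time.Now().Add(-time.Hour))
for {
	entries, next, err := reader.Poll(ctx, since, 100)
	if err != nil {
		return err
	}
	if len(entries) == 0 {
		break // caught up; persist the cursor and resume later
	}
	for _, e := range entries {
		// Route by partition key without parsing DataPath; GET e.DataPath
		// directly if the raw parquet payload is wanted.
		log.Printf("key=%s rows=%d data=%s", e.Key, e.RowCount, e.DataPath)
	}
	since = next
}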
type WriteOption ¶ added in v0.13.0
type WriteOption func(*writeOpts)
WriteOption configures write-path behavior. Mirrors the ReadOption pattern so the write side has the same mental model: one option type, one accumulator, one place to add new knobs.
func WithIdempotencyToken ¶ added in v0.13.0
func WithIdempotencyToken(token string) WriteOption
WithIdempotencyToken marks a write as a retry-safe logical unit identified by token. Recovery on retry is automatic and works across arbitrarily long S3 outages:
- Each attempt writes data and ref to fresh per-attempt paths (id = {token}-{attemptID:32hex UUIDv7}). No data or ref PUT in the write path ever overwrites — sidesteps multi-site StorageGRID's eventual-consistency exposure on overwrites.
- On retry, the writer issues an upfront HEAD on `<dataPath>/<partition>/<token>.commit`. If a prior attempt's commit is in place, the writer reconstructs the original WriteResult from the marker's metadata (no body re-upload, same DataPath / RefPath / InsertedAt as the prior successful attempt) and returns.
- If the commit is missing (no prior attempt landed, or every prior attempt crashed before the commit PUT), the retry proceeds with a fresh attempt-id end-to-end.
Tokens are unique per (partition key, logical write), not globally — the same token may be reused across different partition keys without colliding. The upfront HEAD runs per-partition. Within one partition the token must remain unique per logical write.
**Reader-side dedup recommended.** Near-concurrent retry overlap (out of contract per the README's Concurrency contract, but bounded if it arises by accident) can produce two committed attempts whose records share (entity, version). Records are bit-identical (parquet encoding is deterministic; the writer captures InsertedAt once and the same value drives every column / filename / metadata field). Configure `EntityKeyOf` + `VersionOf` on the reader to collapse the duplicate records on read.
token must pass validateIdempotencyToken (non-empty, no "/", no "..", printable ASCII, <= 200 chars) — validation runs at resolve time so bad tokens fail at the call site rather than inside the write path.
An empty string is rejected (not silently downgraded to the auto-token path) so a caller wiring `WithIdempotencyToken(row.Token)` can't accidentally turn off idempotency by forgetting to populate the source. Callers whose token is genuinely optional should branch at construction:
opts := []s3store.WriteOption{...}
if row.Token != "" {
	opts = append(opts, s3store.WithIdempotencyToken(row.Token))
}
Mutually exclusive with WithIdempotencyTokenOf — combining the two surfaces an error at option-resolution time.
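A retry-safe single-partition write, sketched with a hypothetical outbox row that carries both the partition key and the token:

// row.Token stays stable across retries of this logical write; a repeat
// call with the same token returns the prior WriteResult instead of
// re-uploading the body.
result, err := w.WriteWithKey(ctx, row.PartitionKey, records,
	s3store.WithIdempotencyToken(row.Token),
)
if err != nil {
	return err // safe to call again later with the same token
}
log.Printf("committed %d rows at %s", result.RowCount, result.DataPath)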
func WithIdempotencyTokenOf ¶ added in v0.15.0
func WithIdempotencyTokenOf[T any](
	fn func(partitionRecords []T) (string, error),
) WriteOption
WithIdempotencyTokenOf is the per-partition variant of WithIdempotencyToken: fn is invoked once per partition with the records routed to that partition, and its return value drives that partition's idempotency token. Lets a single multi-partition Write retry with a different token per partition (e.g. one outbox row per partition).
Semantics on each partition match WithIdempotencyToken: the returned token anchors the per-attempt id and the `<dataPath>/<partition>/<token>.commit` marker, the upfront HEAD dedups same-token retries, and the token must pass validateIdempotencyToken.
On WriteWithKey (single partition) fn is invoked once with the full records slice — symmetric with the multi-partition case.
fn returning an error fails that partition's write; under Write's parallel fan-out a single failure propagates as "first error wins" with partial-success on already-committed partitions (same shape as any other write-path error).
Mutually exclusive with WithIdempotencyToken — passing both surfaces an error at option-resolution time. fn=nil is a no-op (the partition runs the non-idempotent auto-token path).
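For example, assuming a hypothetical schema where every record routed to one partition carries the same OutboxID, that ID can drive the per-partition token:

results, err := w.Write(ctx, records,
	s3store.WithIdempotencyTokenOf(func(partitionRecords []Trade) (string, error) {
		// Hypothetical schema: every record routed to one partition shares
		// the same outbox row, so its ID can serve as that partition's token.
		return partitionRecords[0].OutboxID, nil
	}),
)
if err != nil {
	return err
}
log.Printf("committed %d partitions", len(results))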
func WithInsertedAt ¶ added in v0.16.0
func WithInsertedAt(t time.Time) WriteOption
WithInsertedAt overrides the writer's default "insertion time" for this batch. The supplied value flows uniformly to every downstream surface — the parquet InsertedAtField column (when configured), the token-commit `insertedat` user-metadata, and WriteResult.InsertedAt — replacing the default capture of time.Now() taken just before parquet encoding.
Use cases:
- Caller-owned event time. The "real" insertion timestamp lives in an outbox row / external system; passing it here keeps the in-file column and result aligned with that source rather than the writer's wall-clock.
- Reproducible writes. Tests and replays that supply the same records + same InsertedAt + same compression codec produce byte-identical parquet bytes (the only non-determinism in the encode path is the InsertedAtField injection).
Truncated to microsecond precision at use site so a LookupCommit round-trip yields a time.Time that compares Equal to the value in the original WriteResult.
The zero value (time.Time{}) is treated as "not supplied" — the writer falls back to time.Now(). Callers who legitimately need to stamp the year-1 zero time should pass a value with a non-zero monotonic component, but no real workload needs this.
Idempotency-retry interaction: when the upfront HEAD on `<token>.commit` finds a prior commit, the returned WriteResult is reconstructed from the prior commit's metadata — WithInsertedAt on the retry attempt is ignored in favour of the original attempt's value. This preserves the contract that a same-token retry returns the original WriteResult unchanged.
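A sketch of the caller-owned event-time case, assuming the hypothetical outbox row's CreatedAt should drive the column, the commit metadata, and the result (key and records are in scope):

result, err := w.WriteWithKey(ctx, key, records,
	s3store.WithIdempotencyToken(row.Token),
	// Stamp the outbox row's creation time instead of the writer's wall
	// clock; the value flows into the InsertedAtField column, the commit
	// metadata, and result.InsertedAt (truncated to microseconds).
	s3store.WithInsertedAt(row.CreatedAt),
)
if err != nil {
	return err
}
log.Printf("stamped %s", result.InsertedAt)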
func WithOptimisticCommit ¶ added in v0.21.0
func WithOptimisticCommit() WriteOption
WithOptimisticCommit skips the per-write upfront HEAD on `<token>.commit` and detects prior commits via the commit PUT itself: the writer attaches `If-None-Match: *` so a backend that supports conditional PUT returns 412 PreconditionFailed when the commit already exists, or — on a backend with a bucket policy denying s3:PutOverwriteObject on the commit subtree — returns 403 AccessDenied. Either signal triggers a HEAD round-trip to recover the prior commit's metadata, and the caller receives the original WriteResult unchanged.
Trade-off:
- **Save** one HEAD per write (5–15ms on AWS, ~1× the request budget for that operation) on the steady-state path. At scale this is the dominant savings — a Write fanning out across N partitions is N HEADs lighter, ~20% fewer S3 requests per Write.
- **Pay** a multi-MB orphan parquet + orphan ref per retry-found-prior-commit (every same-token retry that lands after a successful commit). The orphans are invisible to readers via the commit gate (their attempt-id ≠ canonical), and operator-driven cleanup reclaims them (S3 lifecycle rule, manual prune).
Break-even sits around a 5% retry-found-prior rate: below that, the HEAD savings dominate; above that, the orphan + bandwidth cost outweighs the savings. Healthy CDC pipelines retry well under 1%, so the option is a clear win for high-throughput workloads near the request-rate ceiling. Bulk migrations with frequent orchestrator restarts may not benefit.
**Backend requirement.** The detection mechanism needs ONE of:
- Conditional PUT (`If-None-Match: *`) — atomic server-side precondition. Supported on AWS S3 since November 2024, MinIO recent versions, and StorageGRID 12.0+. Returns 412.
- Bucket policy denying `s3:PutOverwriteObject` on the `<prefix>/data/*/*.commit` subtree. Used on older StorageGRID versions where conditional PUT isn't available. Returns 403. The s3store README documents the policy + a boto3 setup snippet under "Optimistic commit setup".
On a backend that supports neither, the second commit PUT silently overwrites — the WriteResult returned to the caller will be the *new* attempt's, not the prior commit's, breaking the same-token-retry-returns-same-WriteResult contract. The option does not detect this at construction time; verify support before enabling.
Mutually compatible with WithIdempotencyToken / WithIdempotencyTokenOf. Has no effect on the auto-token (no-token) path: a freshly generated UUIDv7 is guaranteed to 404 by construction, so the upfront HEAD is already skipped there regardless of this option.
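On a backend verified to support one of the two signals, the option simply rides along with the token (key, records, and the hypothetical row are assumed to be in scope):

result, err := w.WriteWithKey(ctx, key, records,
	s3store.WithIdempotencyToken(row.Token),
	// Skip the upfront HEAD; a prior commit surfaces via the conditional
	// commit PUT (412) or the overwrite-deny bucket policy (403), and the
	// prior WriteResult is returned unchanged.
	s3store.WithOptimisticCommit(),
)
if err != nil {
	return err
}
log.Printf("ref %s", result.RefPath)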
type WriteResult ¶
type WriteResult struct {
	Offset     Offset
	DataPath   string
	RefPath    string
	InsertedAt time.Time

	// RowCount is the number of records persisted in this commit's
	// parquet file. On a fresh write this is len(records) for the
	// partition; on the retry-finds-prior-commit path it is recovered
	// from the token-commit's `rowcount` user-metadata so the value
	// matches the original attempt's parquet exactly. Surfaced so
	// retry-recovery callers — who don't have the original records
	// slice — can still report batch size without GETting the parquet.
	RowCount int64
}
WriteResult contains metadata about a completed write.
InsertedAt is the writer's pre-encode wall-clock at write-start (microsecond precision) by default — or the caller's WithInsertedAt override, when supplied — the same value stamped into the parquet's InsertedAtField column. Surfaced on the result so callers can log / persist it (e.g., into an outbox table) without parsing the data path or issuing a HEAD.
Under WithIdempotencyToken, a same-token retry whose upfront HEAD on `<token>.commit` finds a prior commit returns *that* commit's WriteResult (DataPath, RefPath, InsertedAt all reflect the prior attempt — InsertedAt is recovered from the token-commit's `insertedat` user-metadata, so it agrees with the prior attempt's column value byte-for-byte). Callers comparing two results from the same token across retries will therefore see identical values whenever a prior attempt's token-commit is still in place.
type Writer ¶ added in v0.14.0
type Writer[T any] struct {
	// contains filtered or unexported fields
}
Writer is the write-side half of a Store. Owns the write path (Write / WriteWithKey) and the materialized-view list that drives marker emission on Write.
Construct directly via NewWriter when a service only writes; embed in Store when it also reads.
func NewWriter ¶ added in v0.14.0
func NewWriter[T any](cfg WriterConfig[T]) (*Writer[T], error)
NewWriter constructs a Writer directly from WriterConfig. Use this in services that only write; use New(Config) when the same process also reads through a Reader/Store.
Validation mirrors the writer-side half of New: the Target must carry Bucket / Prefix / S3Client / PartitionKeyParts; Compression resolves to a codec (zero value → snappy); every MaterializedViewDef in cfg.MaterializedViews is shape-validated and Of must be non-nil. View names must be unique across the slice. PartitionKeyOf is optional at construction — Write errors if called without it, but WriteWithKey works regardless.
Constructor performs no S3 I/O. Idempotent retries are gated by the writer's upfront HEAD on `<dataPath>/<partition>/<token>.commit` (see WithIdempotencyToken): data and ref PUTs land at fresh per-attempt paths and never overwrite, and a prior attempt's commit marker reconstructs the original WriteResult unchanged. Cross-backend uniformity — no dependency on If-None-Match support or bucket-policy denies.
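A write-only construction sketch; tgt is an S3Target assumed to have been built via NewS3Target, and Trade is a hypothetical record type:

w, err := s3store.NewWriter(s3store.WriterConfig[Trade]{
	Target: tgt, // shared S3Target; no S3 I/O happens in the constructor
	PartitionKeyOf: func(t Trade) string {
		return "period=" + t.Period + "/customer=" + t.Customer
	},
})
if err != nil {
	return err
}

The returned Writer is then used for Write / WriteWithKey exactly as on a Store.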
func (*Writer[T]) GroupByPartition ¶ added in v0.20.0
func (s *Writer[T]) GroupByPartition(records []T) []HivePartition[T]
GroupByPartition splits records by their Hive partition key (PartitionKeyOf) and returns one HivePartition per distinct key in lex-ascending order of Key. Deterministic across calls — same input produces byte-identical output, mirroring the read-side emission-order invariant. Same HivePartition[T] type the ReadPartition* methods yield.
Use to preview partitioning without paying the write cost (logging, sharding decisions, dry-run validation) or as a building block for custom write strategies (filter some partitions, write the rest; route partitions to different Targets; etc.).
Empty records returns nil. Records whose PartitionKeyOf returns the same string land in the same HivePartition; per-partition record order is preserved (insertion order from the input slice).
Public contract: partition emission is lex-ordered. See "Deterministic emission order across read and write paths" in CLAUDE.md — GroupByPartition is the write-side counterpart of that invariant.
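A dry-run sketch that previews the split before paying for the write; only the Key field of HivePartition is touched:

for _, p := range w.GroupByPartition(records) {
	// Partitions arrive in lex-ascending Key order, deterministically.
	log.Printf("would write partition %s", p.Key)
}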
func (*Writer[T]) LookupCommit ¶ added in v0.15.0
func (w *Writer[T]) LookupCommit(
	ctx context.Context,
	partition, token string,
) (wr WriteResult, ok bool, err error)
LookupCommit returns the WriteResult of the canonical write committed under (partition, token), if any. One HEAD against `<dataPath>/<partition>/<token>.commit`; no LIST, no parquet re-encode, no parquet GET.
Sits on Writer because the use case is write-side: a service that stores the (partition, token) pair alongside its outbox row and, on retry, wants to know "did the prior attempt commit?" before re-fetching records and re-encoding parquet. WriteWithKey under the same token would do the same upfront HEAD internally, but only after re-encoding parquet — calling LookupCommit first lets retries skip the encode entirely on the recovery path. The HEAD + reconstruction primitives (headTokenCommit / reconstructWriteResult) are exactly the ones writeEncodedPayload's Step 2 calls; this method is the externalized form of that step.
On a hit, the returned WriteResult is byte-identical to what the original Write returned (DataPath / RefPath / Offset reconstructed from the path scheme + the marker's metadata; InsertedAt sourced from the `insertedat` user-metadata so it matches the parquet's InsertedAtField column value).
ok=false signals 404: no commit exists for that (partition, token). If the caller knows a Write was attempted for this token and ok=false, the write either crashed or returned an error to its original caller.
Validates token via validateIdempotencyToken so callers fail loudly on garbage rather than HEADing nonsensical keys.
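A recovery-path sketch: before re-fetching and re-encoding records on a retry, ask whether the prior attempt already committed (partition and token come from the caller's own bookkeeping, e.g. an outbox row; recordOutcome is a hypothetical caller hook):

prior, ok, err := w.LookupCommit(ctx, partition, token)
if err != nil {
	return err
}
if ok {
	// The prior attempt committed; reuse its WriteResult and skip the
	// record fetch + parquet encode entirely.
	return recordOutcome(prior)
}
// No commit found: fetch the records and retry the write under the same token.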
func (*Writer[T]) PartitionKey ¶ added in v0.14.0
PartitionKey applies the configured PartitionKeyOf to rec and returns the resulting partition key. Intended for callers that want the single-partition WriteWithKey path without having to re-implement the key format at the call site:
_, err := w.WriteWithKey(ctx, w.PartitionKey(recs[0]), recs)
Panics if PartitionKeyOf was not set at construction — the same nil-func-call semantics Write gets, just surfaced at a clearer site.
func (*Writer[T]) RestampRef ¶ added in v0.23.0
func (w *Writer[T]) RestampRef(
	ctx context.Context,
	prior *WriteResult,
) (result *WriteResult, err error)
RestampRef writes an additional ref for an already-committed write, with a fresh writer wall-clock refMicroTs. The data file and `<token>.commit` marker are reused unchanged — the only S3 op is a single zero-byte PUT at a new ref key. Use this to recover from ErrCommitAfterTimeout when stream consumers need the write inside their SettleWindow: the new ref's refMicroTs is current, so the next poll past now+SettleWindow is guaranteed to see it with the commit gate already satisfied.
The same-token-retry path is NOT a recovery for stream observability — its upfront HEAD finds the prior commit and returns the prior WriteResult unchanged without writing any new ref, so the original refMicroTs is unchanged and a stream consumer past that offset still misses it. RestampRef is the only primitive that brings a timed-out commit back into the SettleWindow.
The returned WriteResult differs from prior only in Offset / RefPath (the new ref's key, with a fresh refMicroTs). DataPath, InsertedAt, and RowCount are unchanged. The original ref is left in place, so stream readers see two emissions for the same data: the consumer is responsible for tolerating the duplicate, either by configuring EntityKeyOf+VersionOf dedup (which collapses to a single record per (entity, version)) or by accepting at-least-once at the consumer.
Constraints:
- prior must be the WriteResult of a commit produced by THIS Writer's store: prior.RefPath and prior.DataPath must lie under the Writer's _ref / data prefixes. A cross-Writer call surfaces an error rather than silently writing a ref that no reader can resolve through this Writer's commit gate.
- prior must reflect a real commit. RestampRef does not HEAD `<token>.commit` first — it trusts that prior came from a successful Write/WriteWithKey/LookupCommit return. If the commit was manually deleted out-of-band, the new ref will be filtered as an orphan by readers (no harm, just no effect).
Idempotency: each call writes a fresh ref at a new `<refMicroTs>-<token>-<attemptID>` key, so multiple calls produce multiple stream emissions. The token / attemptID are reused from prior unchanged — the canonical-attempt check in reader_poll.go gates ref→data lookup on attemptID matching `<token>.commit`'s metadata, so a fresh attemptID would render the new ref invisible.
Like Write/WriteWithKey, RestampRef can return a non-nil WriteResult alongside an ErrCommitAfterTimeout error: the new ref IS durable, but elapsed wall-clock from refMicroTs to ref-PUT-completion exceeded CommitTimeout, so a stream reader's SettleWindow may already have advanced past the new refMicroTs by the time the ref becomes LIST-visible. Recovery: call RestampRef again with the same prior. Detect via errors.Is(err, ErrCommitAfterTimeout).
The ref PUT runs under context.WithoutCancel(ctx): the body is a single zero-byte PUT, so caller-prompted cancellation isn't worth the risk of a partially-completed PUT (response in flight, ref already on S3) returning an error to a caller who would retry and emit yet another duplicate ref. Caller cancellation is observed only at the boundaries (parameter validation, paths-prefix check, parseRefKey).
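A recovery sketch for the timed-out-commit case, assuming key, records, and token are in scope and ErrCommitAfterTimeout is the package-level sentinel checked via errors.Is:

result, err := w.WriteWithKey(ctx, key, records, s3store.WithIdempotencyToken(token))
if err != nil && !errors.Is(err, s3store.ErrCommitAfterTimeout) {
	return err // a real failure; nothing to restamp
}
for errors.Is(err, s3store.ErrCommitAfterTimeout) {
	// The write (or previous restamp) is durable but may sit outside stream
	// consumers' SettleWindow; issue a fresh ref with a current refMicroTs,
	// reusing the same prior each attempt. Real code would bound attempts
	// and add backoff.
	_, err = w.RestampRef(ctx, result)
}
if err != nil {
	return err
}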
func (*Writer[T]) Target ¶ added in v0.14.0
Target returns the untyped S3Target this Writer is bound to. Use when constructing read-only tools (NewMaterializedViewReader, BackfillMaterializedView) against the same dataset without carrying the Writer's T into their call graph.
func (*Writer[T]) Write ¶ added in v0.14.0
func (s *Writer[T]) Write(
	ctx context.Context,
	records []T,
	opts ...WriteOption,
) (results []WriteResult, err error)
Write extracts the key from each record via PartitionKeyOf, groups by key, and writes one Parquet file + stream ref per key in parallel (bounded by Target.MaxInflightRequests, default 32). Returns one WriteResult per partition that completed, in sorted-key order regardless of completion order.
On failure, cancels remaining partitions and returns whatever results landed first, with the first real (non-cancel) error. Partial success is the accepted outcome — each partition's data+markers+ref sequence is self-contained.
An empty records slice is a no-op: (nil, nil) is returned so callers don't have to guard against batch-pipeline edge cases.
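A multi-partition sketch that keeps partial success in mind; results that did land are usable even when err is non-nil:

results, err := w.Write(ctx, records)
for _, r := range results {
	// Every returned result is a fully committed partition, sorted by key.
	log.Printf("committed %d rows -> %s", r.RowCount, r.DataPath)
}
if err != nil {
	return err // uncommitted partitions can simply be retried; nothing is deleted
}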
func (*Writer[T]) WriteWithKey ¶ added in v0.14.0
func (s *Writer[T]) WriteWithKey(
	ctx context.Context,
	key string,
	records []T,
	opts ...WriteOption,
) (result *WriteResult, err error)
WriteWithKey encodes records as Parquet, uploads to S3, writes the ref file, and lands the token-commit marker that flips visibility for both the snapshot and stream read paths atomically.
Each attempt writes to a per-attempt data path (`{partition}/{token}-{attemptID}.parquet`), so no data PUT ever overwrites — sidesteps multi-site StorageGRID's eventual-consistency exposure on data-file overwrites. token is the caller's WithIdempotencyToken value, or a writer-generated UUIDv7 (used as both token and attemptID) for non-idempotent writes. The trade is a per-attempt orphan triple (data + ref + possibly token-commit) on failure; the reader's commit-marker gate filters them out, and the operator-driven sweeper reclaims them.
On any failure mid-sequence the returned error is wrapped and nothing is deleted. This is the at-least-once contract: a failed Write may leave per-attempt orphans, never deletes one. Reads ignore the orphans because their token-commits either didn't land or name a different attempt-id.
Passing WithIdempotencyToken makes this call retry-safe across arbitrary outages: an upfront HEAD on `<token>.commit` returns any prior commit's WriteResult reconstructed from metadata (no body re-upload, no new PUTs). If no prior commit exists, the retry proceeds with a fresh attempt-id; recovery is automatic regardless of how long ago the original landed. **Concurrent writes that share the same token are out of contract** — see README's Concurrency contract section.
type WriterConfig ¶ added in v0.14.0
type WriterConfig[T any] struct {
	Target         S3Target
	PartitionKeyOf func(T) string
	Compression    CompressionCodec

	// MaterializedViews lists the secondary materialized views the
	// writer should maintain. Every Write iterates each entry,
	// calls Of per record, and PUTs one empty marker per distinct
	// (view, column-values) tuple in the batch under
	// <Prefix>/_matview/<Name>/. Validation runs at NewWriter:
	// Name non-empty + free of '/', Columns valid + unique,
	// Of non-nil, Names unique across the slice.
	//
	// Constructed at writer-creation time so registration cannot
	// race with Write and "registered after the first Write" is
	// not a reachable state. Use BackfillMaterializedView to
	// retroactively cover records written before a view existed.
	MaterializedViews []MaterializedViewDef[T]

	// InsertedAtField names a time.Time field on T that the writer
	// populates with its wall-clock time.Now() just before parquet
	// encoding, so the value becomes a real parquet column in every
	// written file. The field must carry a non-empty, non-"-"
	// parquet tag (e.g. `parquet:"inserted_at"`) — the value is
	// persisted on disk, not library-managed metadata. Empty
	// disables the feature; there is no reflection cost when unset.
	//
	// On the read side the column shows up on T like any other
	// parquet field — no special reader configuration needed.
	// Reference it from VersionOf to use the write-time stamp as
	// the dedup version:
	//
	//	VersionOf: func(r T) int64 { return r.InsertedAt.UnixMicro() }
	InsertedAtField string

	// EncodeBufPoolMaxBytes caps the *bytes.Buffer capacity that the
	// internal encode pool retains across writes. Buffers that grew
	// beyond this on a single Write are dropped on Put — preventing
	// one outlier batch from ballooning the pool's steady-state
	// footprint, at the cost of losing reuse on the next Write of
	// similar size.
	//
	// Set this slightly above the largest typical produced parquet
	// size for the workload. If too low, large writes pay
	// grow-from-zero plus copy-out on every Write (the encode
	// becomes net negative vs no pool — see BenchmarkEncode_Pooled
	// at sizes above the cap). If too high, idle pool retention
	// grows in proportion to GOMAXPROCS, multiplied by the number
	// of Writer instances in the process.
	//
	// The s3store.write.encode_buf_dropped counter increments every
	// time a buffer is dropped due to this cap; a non-zero rate
	// indicates the cap is undersized for the workload.
	//
	// Zero or negative selects the default (48 MiB).
	EncodeBufPoolMaxBytes int64
}
WriterConfig is the narrower Config form for constructing a Writer directly (without a Reader). Holds the S3-wiring bundle (Target — a constructed S3Target) plus write-side-only knobs. Use NewWriter(cfg) when a service writes but never reads.
Target is built once via NewS3Target and can be passed to both WriterConfig.Target and ReaderConfig.Target so the resulting Writer and Reader share the same MaxInflightRequests semaphore — net in-flight S3 requests across both halves stay bounded by one cap.
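A sharing sketch, assuming NewReader mirrors NewWriter's shape and that ReaderConfig carries the same Target field (partKey and tgt are assumed to be in scope):

// One S3Target shared by both halves, so the in-flight request cap is global.
w, err := s3store.NewWriter(s3store.WriterConfig[Trade]{Target: tgt, PartitionKeyOf: partKey})
if err != nil {
	return err
}
r, err := s3store.NewReader(s3store.ReaderConfig[Trade]{Target: tgt})
if err != nil {
	return err
}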
Source Files
¶
- commit.go
- concurrency.go
- consistency.go
- doc.go
- listing.go
- materialized_view.go
- materialized_view_backfill.go
- materialized_view_read.go
- materialized_view_write.go
- metrics.go
- paths.go
- patterns.go
- reader.go
- reader_dedup.go
- reader_iter.go
- reader_options.go
- reader_partition_iter.go
- reader_poll.go
- reader_read.go
- store.go
- target.go
- writer.go
- writer_options.go
- writer_write.go