s3store

Append-only, versioned data storage on S3 with a change stream. Pure Go, cgo-free. No server. No broker. No coordinator. Just a Go library and an S3 bucket.

What it does

  • Write typed records as Parquet, grouped into Hive-style partitions.
  • Stream changes via lightweight "ref" files — one empty S3 object per write, all metadata in the object key.
  • Read point-in-time deduplicated snapshots with glob support.
  • Iterate large reads with bounded-memory streaming.
  • Project secondary lookups with empty marker files (LIST-only Lookup).

Guarantees

No transactional coordinator — S3 is the only source of truth. The contract follows from that:

  • At-least-once on storage. A successful Write is durable; a retry after partial failure may produce duplicate data files and refs. The library never deletes data it has written.

  • Exactly-once at the consumer is opt-in. Two complementary mechanisms collapse the at-least-once duplicates:

    • EntityKeyOf + VersionOf on the reader collapses duplicates with the same (entity, version) to one record — the universal solution that works for any duplicate source.
    • WithIdempotencyToken on the writer makes sequential retries recover the prior commit unchanged via an upfront HEAD on <token>.commit — no re-upload, byte-identical WriteResult.

    Combine both for the strongest guarantee: tokens shrink the duplicate window at storage; reader dedup catches whatever slips through.

  • Read-after-write on snapshot reads. Read / ReadIter / ReadPartitionIter / MaterializedViewReader.Lookup / BackfillMaterializedView see new records the moment Write returns. The change-stream APIs (Poll, PollRecords, ReadRangeIter, ReadPartitionRangeIter) intentionally lag the tip by SettleWindow to tolerate S3 LIST propagation skew.

  • Read stability. Two consecutive snapshot reads with no intervening writes return the same records — the library never deletes or rewrites data on its own.

  • Stream replay stability. Refs (the change-stream offsets) are immutable: once a record is observed at offset N by Poll / PollRecords / ReadRangeIter / ReadPartitionRangeIter, replaying from offset 0 sees that same record at the same offset N, every time. Load-bearing for checkpointed pipelines — a consumer that processed up to offset 100 can crash, restart from offset 100, and resume without missing or duplicating. Holds because the ref filename embeds a writer-stamped refMicroTs (fixed-width lex-numeric), refs are per-attempt paths the library never rewrites or deletes, and MaxClockSkew + SettleWindow bound how late a ref can become visible relative to its stamped time — no new ref can be retroactively inserted at or before an already-observed offset.

  • Atomic per-file visibility via the token-commit marker. Every Write lands a single <token>.commit zero-byte object after the data and ref PUTs. Both read paths gate on its presence: snapshot reads (Read / ReadIter / ReadPartitionIter / MaterializedViewReader.Lookup) drop parquets without a sibling commit; the change-stream APIs (Poll / PollRecords / ReadRangeIter / ReadPartitionRangeIter / ReadEntriesIter / ReadPartitionEntriesIter) HEAD <token>.commit for each ref before yielding. A writer that crashes mid-sequence leaves an orphan parquet/ref pair that stays invisible to every read path — no in-flight states leak. A multi-partition Write is still not atomic across partitions; partition commits become visible one at a time. For per-partition workloads (typical CDC, read-modify-write under WithIdempotencyToken) that's sufficient. For workloads that compute deltas across partitions in one logical step, treat each partition's commit independently — read each via PollRecords / ReadRangeIter / ReadPartitionRangeIter so the read boundary lines up with committed refs, and checkpoint by offset rather than wall-clock.

Corollary: once a data file is in S3, it stays forever — even if the Write call returned an error. A crashed write can leave an orphan data file with no matching ref; it's still visible to snapshot reads, which is consistent with at-least-once. The library can't tell "committed" from "crashed-mid-write" without external transactional metadata, and can't know whether a reader has already observed a file — so automatic garbage collection isn't possible without breaking read stability. Cleanup of orphans is an operator decision (S3 lifecycle rules, or a manual prune with readers quiesced), not a library feature.

Concurrency contract: one in-flight write per token. Concurrent writes that share the same WithIdempotencyToken — or auto-token writes that happen to share an attempt-id, a non-occurrence under UUIDv7 — are out of contract. Sequential retries (a failed write followed by a same-token retry) are the design's primary use case and remain fully supported. The constraint buys back enough determinism that the writer's <token>.commit PUT can be tolerated as a no-overwrite write under sequential retries: the upfront HEAD short-circuits a same-token retry before any second commit PUT lands.

Writer wall-clock is in the protocol. Refs encode refMicroTs — the writer's microsecond wall-clock captured immediately before the ref PUT. The reader's refCutoff = now − SettleWindow therefore compares writer-stamped time against reader wall-clock; MaxClockSkew bounds writer↔reader skew. There is no dependency on the backend's LastModified. See CLAUDE.md "Backend assumptions" for the full treatment.

Detailed contract and configuration in Durability guarantees.

Install

go get github.com/ueisele/s3store@latest

s3store is pre-v1 — minor version bumps (v0.x.0) may carry breaking API changes. Pin an exact version in your go.mod (or commit your go.sum) to control when you pick them up. Requires Go 1.26.2+ (declared in go.mod).

Initializing a new dataset

Each dataset's timing config is persisted as two objects so writer and reader agree on the values by construction:

  • <Prefix>/_config/commit-timeout — the writer's budget for the contract-relevant tail of the write: the elapsed wall-clock from refMicroTs (captured just before the ref PUT) to token-commit-PUT completion. Pre-ref work (parquet encoding, marker PUTs, data PUT — the last of which scales with payload size) is not in the budget. A write whose ref→commit elapsed exceeds this budget returns an error to the caller — the token-commit still lands, so a same-token retry recovers via the upfront HEAD; the error surfaces that the stream reader's SettleWindow (derived from this same value) may already have advanced past the write's refMicroTs. See Tuning CommitTimeout below for the recommended range.
  • <Prefix>/_config/max-clock-skew — the operator's assumed bound on writer↔reader wall-clock divergence. Refs encode the writer's refMicroTs directly, so this skew is what the reader's refCutoff has to absorb. Read by the reader's poll cutoff; the writer doesn't read it.

SettleWindow = CommitTimeout + MaxClockSkew is derived, not persisted; it's the cutoff Poll / PollRecords / ReadRangeIter / ReadPartitionRangeIter apply to the live tip.
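
A quick worked check with the starter values from the boto3 snippet below: commit-timeout = 15s and max-clock-skew = 1s give SettleWindow = 16s, so a poll issued at 12:00:00 only surfaces refs stamped 11:59:44 or earlier; anything newer waits for a later poll.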

Operators seed both objects once when provisioning a new prefix; NewS3Target (and New(StoreConfig)) GET them at construction time and stamp the resolved values (plus the derived SettleWindow) on the Target. Construction fails with a hint when either object is missing.

Each body is a Go time.Duration string. The library has no fallback — a missing or unparseable object fails construction with a hint at the seeding step. Floors: commit-timeout ≥ 1ms (CommitTimeoutFloor — strictly positive; zero would cause every write to exceed the timeout); construction additionally emits a slog.Warn at level WARN when the configured value is below CommitTimeoutAdvisory (8s — see Tuning CommitTimeout below). max-clock-skew ≥ 0 (zero is valid on tightly-clocked deployments, negative is incoherent). The boto3 snippet below ships 15s / 1s as sensible starting values for a typical deployment with NTP-synced nodes — comfortably above the advisory floor with headroom for both transient retries and actual request latency. Tune higher when you can't rule out larger skew or sustained S3 SlowDown bursts; values below the advisory still pass construction but trigger the warn.

Once seeded, both values are immutable — changing either silently rewrites history (decreasing commit-timeout makes valid markers fail; increasing it resurrects timed-out writes; decreasing max-clock-skew may shift the cutoff so stream consumers temporarily skip refs they used to include). An operator who genuinely needs different values re-creates the store at a new prefix and migrates.

Seed both with the Python boto3 snippet below (one-time, before any process constructs a Target):

import boto3

s3 = boto3.client("s3", endpoint_url="https://s3.example.com")
s3.put_object(
    Bucket="my-bucket",
    Key="my-prefix/_config/commit-timeout",
    Body=b"15s",
    ContentType="text/plain",
)
s3.put_object(
    Bucket="my-bucket",
    Key="my-prefix/_config/max-clock-skew",
    Body=b"1s",
    ContentType="text/plain",
)

The integration-test fixture (fixture_test.go) provides a SeedTimingConfig helper that does the same thing, called once per test.

Tuning CommitTimeout

CommitTimeout bounds the elapsed wall-clock from refMicroTs (captured just before the ref PUT) to token-commit-PUT completion. Two PUTs participate in that window. Each S3 call is wrapped by the library's transient-error retry policy: retryMaxAttempts = 5 (1 initial + 4 retries) with jittered backoffs drawn from [100ms, 300ms) / [300ms, 500ms) / [500ms, 800ms) / [500ms, 800ms) — worst-case 2.4s of backoff sleep per call (best case 1.4s, average 1.9s), plus the per-attempt request time. The jitter decorrelates concurrently failing callers so a burst of 503 SlowDown responses from one prefix doesn't translate into a synchronised retry storm against the same prefix on the next round; the plateau at the fourth retry caps per-attempt sleep so the cumulative envelope stays manageable while the wider attempt count lowers the probability that a sustained SlowDown burst leaves an orphan parquet (data PUT succeeded but ref or token-commit failed terminally). Across the two PUTs, the worst-case retry-sleep envelope is ~4.8s, again before counting actual request time.

CommitTimeoutAdvisory is set to 8s — roughly 1.67× the worst-case retry envelope, with headroom for actual request latency. Values below this still pass construction (the hard floor is CommitTimeoutFloor = 1ms), but NewS3Target emits a slog.Warn at level WARN naming the configured value and the advisory floor:

Configured value Behaviour
< 1ms or ≤ 0 Construction fails — "below the floor".
≥ 1ms and < 8s Construction succeeds with a slog.Warn. Acceptable on tightly-clocked dev/test deployments where you control retry behaviour and want a tight SettleWindow.
≥ 8s No warning. Recommended for production.

Sizing guidance:

  • Production on AWS S3 / MinIO / single-site StorageGRID with NTP-synced nodes: 10–30s. Leaves headroom on transient retries and keeps SettleWindow = CommitTimeout + MaxClockSkew at a modest tens of seconds.
  • Multi-site StorageGRID: tune higher (30–120s) if your cross-site replication adds visible latency to PUT acknowledgement at strong-global.
  • Dev/test loops where you want sub-second SettleWindow: pick < 8s, accept the warning, run on a backend where retries are unlikely (local MinIO).

The advisory floor is a recommendation, not a contract — the library's correctness invariants don't depend on it. It exists so operators see a clear signal when their configured budget is below the worst-case retry envelope of the two SettleWindow-relevant PUTs.

Tuning the AWS SDK retryer for SlowDown

The library's retry layer (retryMaxAttempts = 5, jittered backoffs) is the outer envelope; the AWS SDK runs its own retryer underneath every S3 call. The SDK default (RetryMode: standard) uses a fixed 500-token retry quota that drains under sustained throttling — at that point the retry quota exceeded, 0 available, 5 requested lines appear in the library's masked-retry logs, and each SDK-denied retry surfaces the underlying 503 to the outer layer instead of absorbing it silently.

For workloads with sustained SlowDown pressure on a saturated prefix, switch the SDK to adaptive mode. The cleanest place is on the aws.Config you pass to s3.NewFromConfig — it then propagates to every client built from it:

import (
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

cfg, err := config.LoadDefaultConfig(ctx,
    config.WithRetryMode(aws.RetryModeAdaptive),
)
if err != nil { /* ... */ }
cli := s3.NewFromConfig(cfg)

Or override per-client when the aws.Config comes from elsewhere:

cli := s3.NewFromConfig(cfg, func(o *s3.Options) {
    o.RetryMode = aws.RetryModeAdaptive
})

Adaptive mode adds a client-side token-bucket rate-limiter on top of the standard retryer's quota: when the SDK observes throttle responses (5xx, 429 SlowDown) on a connection, it preemptively delays new requests before they hit the wire, smoothing bursts into the prefix's actual throughput. This is AWS's recommended configuration for sustained-SlowDown scenarios.

The library never wraps or replaces your *s3.Client, so the mode you set on it carries through every PUT/GET/HEAD/LIST. Diagnose whether the switch helped via the new s3store.s3.transient_error.count metric and the Transient retry causes (% of requests by error.type) dashboard panel: a drop in error.type="slowdown" after the change is the rate-limiter doing its job.

Trade-offs:

  • Latency variance grows under throttling. That is the intended behaviour — the rate-limiter adds delay so new requests don't pile onto a saturated prefix. If callers have tight per-call deadlines, raise them with the new floor in mind.
  • AWS marks adaptive as experimental (aws.RetryModeAdaptive carries a "subject to change" note in the SDK godoc). The semantics — preemptive client-side throttling on observed 503/429 — have been stable across recent SDK releases, but pin your aws-sdk-go-v2 version if you depend on the specific behaviour.
  • Setting matters per *s3.Client. If your process builds multiple clients (uncommon), set the mode on each — the library shares one client per S3Target but doesn't dictate how callers compose them.

Quick start

type CostRecord struct {
    CustomerID   string    `parquet:"customer_id"`
    ChargePeriod string    `parquet:"charge_period"`
    SKU          string    `parquet:"sku"`
    NetCost      float64   `parquet:"net_cost"`
    Currency     string    `parquet:"currency"`
    CalculatedAt time.Time `parquet:"calculated_at,timestamp(millisecond)"`
}

// New does two S3 GETs on the persisted timing-config objects at
// <Prefix>/_config/commit-timeout and <Prefix>/_config/max-clock-skew
// — seed both once via the snippet in "Initializing a new dataset"
// before this call.
store, err := s3store.New[CostRecord](ctx, s3store.StoreConfig[CostRecord]{
    S3TargetConfig: s3store.S3TargetConfig{
        Bucket:            "warehouse",
        Prefix:            "billing",
        S3Client:          s3Client,
        PartitionKeyParts: []string{"charge_period", "customer"},
    },
    PartitionKeyOf: func(r CostRecord) string {
        return fmt.Sprintf("charge_period=%s/customer=%s",
            r.ChargePeriod, r.CustomerID)
    },
    // Optional: enable latest-per-entity dedup on Read / PollRecords.
    // EntityKeyOf and VersionOf are required together — both or
    // neither. New rejects partial config.
    EntityKeyOf: func(r CostRecord) string {
        return r.CustomerID + "|" + r.SKU
    },
    VersionOf: func(r CostRecord) int64 {
        return r.CalculatedAt.UnixMicro()
    },
})
if err != nil {
    log.Fatal(err)
}

// Write — groups records by PartitionKeyOf, one Parquet file per group
_, err = store.Write(ctx, records)

// Snapshot read — deduplicated by VersionOf when EntityKeyOf is set
latest, err := store.Read(ctx, []string{"charge_period=2026-03-17/customer=abc"})

// Iterate over a range without buffering everything in memory
for r, err := range store.ReadIter(ctx, []string{"charge_period=2026-03-*/*"}) {
    if err != nil { return err }
    process(r)
}

// Stream changes since an offset (one S3 LIST, no GETs)
entries, next, err := store.Poll(ctx, since, 100)

s3store decodes directly into []CostRecord via the parquet struct tags on T (parquet-go's GenericReader[T]) — no ScanFunc or manual column-order bookkeeping. A field whose column is missing from a given file lands as Go's zero value.

Writer / Reader / Narrow-T reads

s3store.Store[T] is a composition of two public halves: a Writer[T] for the write path and a Reader[T] for the read path. Most users keep using New(StoreConfig) and get both through Store. Services that only write or only read can construct a single half with a narrower config:

// Build the shared S3 wiring once — Target is the untyped handle
// Writer, Reader, MaterializedViewReader, and BackfillMaterializedView all speak.
// NewS3Target does two S3 GETs (commit-timeout + max-clock-skew) at
// construction time; seed both via the snippet in "Initializing a new
// dataset" before this call.
target, err := s3store.NewS3Target(ctx, s3store.S3TargetConfig{
    Bucket:            "warehouse",
    Prefix:            "billing",
    S3Client:          s3Client,
    PartitionKeyParts: []string{"charge_period", "customer"},
})
if err != nil { log.Fatal(err) }

// Write-only service: no read-side knobs in config.
w, err := s3store.NewWriter[CostRecord](s3store.WriterConfig[CostRecord]{
    Target:         target,
    PartitionKeyOf: func(r CostRecord) string { /* ... */ },
})

// Read-only service: no PartitionKeyOf / Compression.
r, err := s3store.NewReader[CostRecord](s3store.ReaderConfig[CostRecord]{
    Target: target,
})

Narrow-T reads

When the on-disk record has a heavy write-only column you never read (a JSON blob, an audit log, an embedding vector), declare a narrower T' that omits it and open a Reader[T'] over the Writer's data. parquet-go skips unlisted columns on decode — the heavy bytes stay in S3.

type FullRec struct {
    Customer   string `parquet:"customer"`
    Amount     float64 `parquet:"amount"`
    ProcessLog string `parquet:"process_log"` // huge, write-only
}
type NarrowRec struct {
    Customer string  `parquet:"customer"`
    Amount   float64 `parquet:"amount"`
    // ProcessLog deliberately absent.
}

store, _ := s3store.New[FullRec](cfg)
view, _ := s3store.NewReaderFromStore[NarrowRec, FullRec](store,
    s3store.ReaderConfig[NarrowRec]{})

recs, _ := view.Read(ctx, []string{"*"}) // []NarrowRec — ProcessLog not fetched

The narrow ReaderConfig[T'] carries the read-side knobs (EntityKeyOf, VersionOf, ConsistencyControl); the constructor overwrites the Target field from the source Writer/Store so SettleWindow and the S3 wiring are inherited automatically. Dedup closures are typed over T', so you supply them explicitly for the narrow shape when needed.
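
When the narrow shape keeps a version-bearing column, the dedup pair can be re-supplied typed over T'. A minimal sketch; the FullCost / NarrowCost shapes and the fullStore handle are illustrative, not the types from the examples above:

type FullCost struct {
    Customer     string    `parquet:"customer"`
    NetCost      float64   `parquet:"net_cost"`
    AuditTrail   string    `parquet:"audit_trail"` // heavy, write-only
    CalculatedAt time.Time `parquet:"calculated_at,timestamp(millisecond)"`
}
type NarrowCost struct {
    Customer     string    `parquet:"customer"`
    NetCost      float64   `parquet:"net_cost"`
    CalculatedAt time.Time `parquet:"calculated_at,timestamp(millisecond)"`
}

// Target is inherited from fullStore by the constructor; only the
// read-side knobs are re-supplied, typed over the narrow shape.
view, err := s3store.NewReaderFromStore[NarrowCost, FullCost](fullStore,
    s3store.ReaderConfig[NarrowCost]{
        EntityKeyOf: func(r NarrowCost) string { return r.Customer },
        VersionOf:   func(r NarrowCost) int64 { return r.CalculatedAt.UnixMicro() },
    })
if err != nil {
    return err
}
latest, err := view.Read(ctx, []string{"*"}) // latest per customer; audit_trail never fetched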

The relationship between constructors:

Want Call
Both write and read, same T New(StoreConfig[T]) → *Store[T]
Write only, narrow config NewWriter(WriterConfig[T]) → *Writer[T]
Read only, narrow config NewReader(ReaderConfig[T]) → *Reader[T]
Same-/narrower-T' Reader over a Writer NewReaderFromWriter[T', U](writer, cfg)
Same-/narrower-T' Reader over a Store NewReaderFromStore[T', U](store, cfg)

Non-trivial Go types (decimal.Decimal, UUID wrappers, …)

parquet-go can't encode types like shopspring/decimal.Decimal or wrapper types with custom marshaling. The library takes no opinion on the translation — define a parquet-friendly shadow struct in your package and translate at the boundary:

// Domain type — used throughout your app
type Usage struct {
    InstanceID     string
    SkuID          string
    ProjectID      uuid.UUID
    Amount         decimal.Decimal
    CalculatedAt   time.Time
}

// File layout — parquet-friendly primitives only
type UsageFile struct {
    InstanceID   string    `parquet:"instance_id"`
    SkuID        string    `parquet:"sku_id"`
    ProjectID    string    `parquet:"project_id"`
    Amount       int64     `parquet:"amount,decimal(18,6)"` // scaled integer
    CalculatedAt time.Time `parquet:"calculated_at,timestamp(millisecond)"`
}

func toFile(u Usage) (UsageFile, error) {
    scaled := u.Amount.Shift(6)
    if !scaled.IsInteger() {
        return UsageFile{}, fmt.Errorf(
            "amount %s has more than 6 decimal places", u.Amount)
    }
    return UsageFile{
        InstanceID:   u.InstanceID,
        SkuID:        u.SkuID,
        ProjectID:    u.ProjectID.String(),
        Amount:       scaled.BigInt().Int64(),
        CalculatedAt: u.CalculatedAt,
    }, nil
}

func fromFile(f UsageFile) (Usage, error) {
    pid, err := uuid.Parse(f.ProjectID)
    if err != nil { return Usage{}, err }
    return Usage{
        InstanceID:   f.InstanceID,
        SkuID:        f.SkuID,
        ProjectID:    pid,
        Amount:       decimal.New(f.Amount, -6),
        CalculatedAt: f.CalculatedAt,
    }, nil
}

// Hand the library UsageFile, not Usage.
store, _ := s3store.New[UsageFile](s3store.StoreConfig[UsageFile]{ /* ... */ })

// Writes:
files := make([]UsageFile, len(usages))
for i, u := range usages {
    f, err := toFile(u); if err != nil { return err }
    files[i] = f
}
_, err := store.Write(ctx, files)

// Reads:
files, err := store.Read(ctx, []string{"..."})
usages := make([]Usage, len(files))
for i, f := range files {
    u, err := fromFile(f); if err != nil { return err }
    usages[i] = u
}

Parquet's DECIMAL(p, s) logical type is the preferred encoding for monetary values — most parquet readers (Spark, Trino, DuckDB) decode it as a real decimal so SUM(amount) just works. Use int64 backing for precision ≤ 18, [N]byte for more.

S3 layout

s3://warehouse/billing/
  data/
    charge_period=2026-03-17/
      customer=abc/
        job-2026-03-17-batch01-0190a8b4d3a87f4ab1c2d3e4f5a6b7c8.parquet
        job-2026-03-17-batch01.commit                                              ← atomic-visibility marker
  _ref/
    1710684000123456-job-2026-03-17-batch01-0190a8b4d3a87f4ab1c2d3e4f5a6b7c8;charge_period=2026-03-17%2Fcustomer=abc.ref
  • data/ holds Parquet files, partitioned Hive-style. Filenames are <token>-<UUIDv7>.parquet — under WithIdempotencyToken("job-…batch01") the token prefixes the basename; without a token, an auto-generated UUIDv7 stands in as the token (so the basename becomes <UUIDv7>-<UUIDv7>). Every attempt of a write lands on its own per-attempt path; no PUT in the write path overwrites.
  • <token>.commit siblings are zero-byte commit markers. A parquet without a paired commit is invisible to every read path.
  • _ref/ holds one empty file per ref. The filename encodes refMicroTs (writer wall-clock), the data file's <token>-<UUIDv7> id, and the URL-escaped partition key. Poll is a single S3 LIST over this prefix; per-ref HEAD on <token>.commit gates visibility.

Partition naming: column or path-only

PartitionKeyParts names can either match Parquet column names or be entirely separate — both patterns are supported.

Pattern A — partition name == Parquet column name (all the examples above). The partition key is a real record attribute, and PartitionKeyOf reads it directly from the struct. Because Write derives the path from the same record the Parquet file holds, the Hive value and the column value always agree, and external SQL engines pointed at the layout (DuckDB, Trino, Athena, …) see one consistent value either way. The one way to surface a mismatch is to call WriteWithKey with a key inconsistent with the record's fields — don't do that.

Pattern B — partition name separate from every Parquet column (classical Hive). The partition key is derived, not stored as a field:

// Parquet columns: ts, customer_id, amount — no "year"/"month".
PartitionKeyParts: []string{"year", "month"},
PartitionKeyOf: func(r Record) string {
    y, m, _ := r.Ts.Date()
    return fmt.Sprintf("year=%d/month=%02d", y, int(m))
},

Partition values live only in the path, saving storage. s3store ignores Hive paths on the typed read path — if you need year/month on records, reconstruct them from Ts. External SQL engines (DuckDB, Trino, Athena, …) still surface the Hive columns when pointed at the data path, so you can SELECT / WHERE on them from outside the library.

Pick A when the partition is a first-class attribute (customer, tenant); pick B when it's a derived time bucket. Mixing within one store is fine.

Glob grammar

The same grammar is used by every snapshot read entry point and validated once:

Pattern Accepted?
* (literal) — match everything
charge_period=2026-03-17/customer=abc — exact
charge_period=2026-03-*/customer=abc — trailing * in value
*/customer=abc — whole-segment *
charge_period=2026-03-01..2026-04-01/customer=abc — range FROM..TO
charge_period=2026-03-01../customer=abc — range, unbounded upper
charge_period=..2026-04-01/customer=abc — range, unbounded lower
charge_period=*-17/customer=abc — leading *
charge_period=2026-*-17/customer=abc — middle *
charge_period=[0-9]/customer=abc — char class
charge_period={2026,2027}/customer=abc — alternation

Truncated patterns (fewer segments than PartitionKeyParts) and mislabelled segments (part name in the wrong position) are also rejected.

Partition ranges

keyPart=FROM..TO matches any value v with FROM <= v < TO, lex order (half-open, mirroring WithUntilOffset). Either side may be empty for an unbounded end. Both sides are plain literals — no * or other glob syntax inside a range. A bare .. is rejected; use * to match everything.

Ranges enable partition pruning: the read path extracts the common prefix of FROM and TO as an S3 LIST prefix so only potentially-matching keys are enumerated.

Bounds are compared lexicographically — byte-wise Go string compare. The range matches characters, not numbers or dates. Partition values must be chosen so lex order matches intent:

  • ISO-8601 timestamps (2026-03-01T00, 2026-03-01) — correct by design.
  • Zero-padded fixed-width numbers (00042, not 42) — correct.
  • Unpadded numbers are a trap: customer=10..100 validates cleanly, but byte-wise order excludes "42" and "99" (both compare greater than "100", since '4' and '9' sort after '1'), so values that are numerically in range fall outside it and the range matches little beyond "10" itself. The validator catches the fully-reversed case (42..100 is rejected as from > to) but not this subtler one. If your values are numeric, pad them or switch to a string shape that sorts correctly (see the sketch below).
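
A zero-padding sketch for the numeric case, where customerNum is a hypothetical numeric field standing in for whatever your records carry:

// Fixed-width values make byte-wise order match numeric order:
// "00000042" < "00000100", whereas "42" > "100".
key := fmt.Sprintf("customer=%08d", customerNum)
// A range like customer=00000010..00000100 then behaves as the
// numeric range intends.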

Equal endpoints match nothing. FROM..FROM is valid grammar but yields zero rows (half-open [a, a) is empty), mirroring WithUntilOffset where since == until returns no records. Use an exact segment (keyPart=FROM) if that's what you meant.

.. is reserved: the write path rejects any partition value that contains .. (otherwise a value like a..b would be unaddressable — any pattern mentioning it would be parsed as a range). Escape or reshape the value on the way in if your domain needs a literal "..".

Access patterns

Write
// Groups records by PartitionKeyOf, one PUT per group
results, err := store.Write(ctx, records)

// Or skip the grouping step and write a pre-grouped batch
result, err := store.WriteWithKey(ctx, "charge_period=X/customer=Y", recs)

Writes are atomic at the file level: every attempt PUTs <token>.commit last, and both read paths gate on its presence. A writer that fails or crashes before the commit lands leaves an orphan <token>-<UUIDv7>.parquet on S3, but every read path filters it out — no in-flight states leak to readers. The library never deletes the orphan; operators clean up via lifecycle rule or manual prune (see Read stability and Idempotent writes).

For retry-safe writes (orchestrator failover, crash-and-resume), see Idempotent writes.

Write fans out per-partition work in parallel, bounded by the S3Target-level MaxInflightRequests semaphore (default 32). The semaphore is acquired by every PUT/GET/HEAD/LIST inside the target, so net in-flight S3 requests stay ≤ MaxInflightRequests regardless of fan-out axis — the partition fan-out balances naturally against marker fan-out within a partition (a write touching one wide partition with many markers drains the available slots on those marker PUTs; a write touching many partitions with one marker each spreads the slots across partitions). Each partition is self-contained — on error the function cancels still-running partitions and returns the results that already committed in sorted-key order, alongside the first real error. Callers can retry the failed partitions via WriteWithKey without re-writing the ones that succeeded.

The cap is per S3Target, not library-wide. A Writer and a Reader built from the same S3Target share the cap. Two Targets do not.

The AWS SDK v2's default HTTP transport leaves MaxConnsPerHost unlimited and sets MaxIdleConnsPerHost to 100, so the library cap is what bounds parallelism for stock-configured clients — no transport tuning needed. Only if you've explicitly set a non-zero MaxConnsPerHost on your *s3.Client's transport, make sure it's ≥ MaxInflightRequests (or excess requests queue at the transport instead of running in parallel).
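
If you do pin MaxConnsPerHost, one place to set it is the buildable HTTP client the SDK config accepts. A sketch that keeps the cap at or above the library default of 32:

import (
    "net/http"

    awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// Keep the transport's connection cap ≥ MaxInflightRequests so the
// library's semaphore, not the transport queue, bounds parallelism.
httpClient := awshttp.NewBuildableClient().WithTransportOptions(func(tr *http.Transport) {
    tr.MaxConnsPerHost = 64
})

cfg, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(httpClient))
if err != nil { /* ... */ }
cli := s3.NewFromConfig(cfg)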

Error handling

The shape of (result, err) tells you what's durable, not just whether the call succeeded:

result err Meaning
non-nil nil Committed normally.
non-nil ErrCommitAfterTimeout Committed, but ref→gate-visibility elapsed exceeded CommitTimeout. Data is durable for snapshot reads; a stream reader's SettleWindow may already have advanced past refMicroTs. RestampRef surfaces the same sentinel under the same semantics — the new ref is durable, but its refMicroTs may be past the cutoff before the ref becomes LIST-visible.
nil non-nil Nothing reached commit. Per-attempt orphans (data, possibly ref) may remain on S3 — operator-cleaned, invisible to readers via the commit gate.

For Write (multi-partition), err != nil with len(results) > 0 is partial success: every entry in results is durable; the missing partitions are not. Retry only the missing ones via WriteWithKey. The err itself can be ErrCommitAfterTimeout (one of the partitions in results committed late — caller can't tell which from the err alone, and doesn't need to: all are durable) or context.Canceled (caller-cancellation aborted some partitions while others had already passed the data PUT and pushed their commit through under context.WithoutCancel — those landed durably and are in results).
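
One way to act on the partial-success contract without peeking into WriteResult internals is to drive the fan-out yourself and retry only the keys that failed. A sketch; partitionKeyOf mirrors the store's configured closure, and retryLater is the caller's own scheduling hook:

// Group with the same key derivation the store was configured with,
// then write partition by partition so each failure is addressable.
groups := map[string][]CostRecord{}
for _, r := range records {
    k := partitionKeyOf(r)
    groups[k] = append(groups[k], r)
}

var failed []string
for key, recs := range groups {
    if _, err := store.WriteWithKey(ctx, key, recs); err != nil {
        failed = append(failed, key) // committed partitions stay durable
    }
}
if len(failed) > 0 {
    return retryLater(failed) // re-attempt only the missing partitions
}

Write's own fan-out does the same work in parallel; the serial loop here just keeps the failure bookkeeping explicit. Per-partition idempotency tokens (see Idempotent writes) compose naturally with this shape.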

ErrCommitAfterTimeout is the only exported sentinel. The library deliberately doesn't split transient vs persistent errors — a 403 may be IAM propagation or a permanent policy bug, and the classification isn't reliably knowable from one call. Validation and wiring errors (PartitionKeyOf is required, WithIdempotencyToken and WithIdempotencyTokenOf are mutually exclusive, parquet-encode type mismatches) surface immediately and don't need a sentinel to be recognized.

The recommended call-site pattern:

result, err := w.WriteWithKey(ctx, key, recs,
    s3store.WithIdempotencyToken(token))
switch {
case err == nil:
    persist(result) // committed normally

case errors.Is(err, s3store.ErrCommitAfterTimeout):
    // Data is durable. Persist the result like a success.
    // Stream consumers: optionally call w.RestampRef(ctx, result)
    // to write an additional ref with a fresh refMicroTs so the
    // next poll past now+SettleWindow picks the write up. The
    // original ref stays in place, so dedup-less stream readers
    // see the records twice — collapsed by EntityKeyOf+VersionOf
    // when configured. Same-token retry of WriteWithKey does NOT
    // recover stream observability (the upfront HEAD short-
    // circuits without writing a new ref).
    persist(result)

case errors.Is(err, context.Canceled),
     errors.Is(err, context.DeadlineExceeded):
    return err

default:
    // Transient PUT failure. With WithIdempotencyToken the retry's
    // upfront HEAD finds the prior commit if it actually landed
    // and returns its WriteResult unchanged; otherwise a fresh
    // attempt-id is used. Without the token, a retry produces a
    // duplicate parquet (correctness-safe via the commit gate,
    // but accumulates as orphans).
    return retryLater(token, recs)
}

The single most useful thing to do is always pass WithIdempotencyToken: it collapses every "did it actually commit?" branch on retry to one upfront HEAD, regardless of which step failed.

RestampRef itself can return ErrCommitAfterTimeout when the ref PUT is slow. The recovery is the same primitive again: each call writes a fresh ref at a new refMicroTs, so a retry whose PUT happens to land within CommitTimeout restores stream observability. Pass the original WriteResult (not the timed-out restamp's) — RestampRef only consumes (token, attemptID, hiveKey, DataPath, InsertedAt, RowCount) from prior, all of which are identical between the original and any restamp.

const maxRestamps = 4
for attempt := 0; attempt < maxRestamps; attempt++ {
    _, rerr := w.RestampRef(ctx, original)
    if rerr == nil {
        break // restamp landed inside the SettleWindow
    }
    if !errors.Is(rerr, s3store.ErrCommitAfterTimeout) {
        return rerr // genuine PUT failure — no ref landed
    }
    // Each ErrCommitAfterTimeout means a ref DID land but past
    // CommitTimeout; backoff and try again. Earlier refs stay on
    // S3 and contribute duplicates that EntityKeyOf+VersionOf
    // dedup collapses.
    time.Sleep(backoff(attempt))
}

Two things worth knowing:

  1. Each retry adds a stream emission. Without EntityKeyOf+VersionOf, a consumer past the original offset sees one record per landed ref. The cost of optimistic retry is paid in dedup work.
  2. Repeated ErrCommitAfterTimeout from both WriteWithKey and RestampRef is a signal, not noise. Both errors share a root cause (backend slower than CommitTimeout for ref→gate-visibility). After exhausting retries, the data is still durable for snapshot reads; the fix for stream observability is config (raise CommitTimeout + matching SettleWindow) or capacity, not more retries.

Stream — refs only
entries, newOffset, err := store.Poll(ctx, lastOffset, 100)

Each StreamEntry carries the ref's Offset (opaque cursor, pass back as since on the next call), the Key (partition key as written), and the DataPath (S3 key of the parquet file). No GETs are issued — the entire batch is one S3 LIST call over _ref/.
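
A checkpointed consumer loop over Poll might look like the sketch below; loadCheckpoint, saveCheckpoint, and handle stand in for whatever durable state and processing the consumer already has:

offset := loadCheckpoint() // opaque cursor persisted by the last run
for {
    entries, next, err := store.Poll(ctx, offset, 100)
    if err != nil {
        return err
    }
    if len(entries) == 0 {
        break // caught up to the settle-window cutoff
    }
    for _, e := range entries {
        handle(e.Key, e.DataPath) // partition key + parquet S3 key
    }
    offset = next
    saveCheckpoint(offset) // replay resumes here after a crash
}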

Stream — typed records
// When dedup is configured, the default is latest-per-key
// within each batch (Kafka compacted-topic semantics).
records, newOffset, err := store.PollRecords(ctx, lastOffset, 100)

// Opt out: every record in every referenced file, in ref order.
records, newOffset, err = store.PollRecords(ctx, lastOffset, 100,
    s3store.WithHistory())

Dedup runs on every read path when EntityKeyOf AND VersionOf are both set on the Reader (New / NewReader reject partial config). When neither is set, every record passes through in decode order. WithHistory() forces the no-dedup path explicitly.

Tie-break on equal max version (default dedup, no WithHistory). When two writes share the same version for the same entity key — common on an at-least-once retry that replays a batch with the same domain timestamp — the lex-later filename wins. Input is sorted by (entityKey, versionOf), stable on ties; files feed in lex order, so within a tied group the lex-later file's record is the last one and dedupLatest's pending advances onto it.

For the auto-token path the basename is <UUIDv7>-<UUIDv7> — UUIDv7 sorts by embedded millisecond time, so lex-later = wrote-later in practice. With WithIdempotencyToken the filename starts with the caller's token; the tie-break follows the token's lex order rather than wall-clock order. Use a time-sortable token format (e.g. {ISO-timestamp}-{suffix}) if you rely on chronological tie-breaking. Stable across repeated reads either way. To make ties impossible, ensure VersionOf strictly increases per write.
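
A time-sortable token shape, sketched with the caller's own jobCreatedAt and batchID; the timestamp is captured once when the logical write is created and reused verbatim on every retry, so token stability is preserved:

// RFC 3339 UTC timestamps sort lexicographically in chronological
// order, so equal-version ties break toward the later logical write.
token := fmt.Sprintf("%s-%s", jobCreatedAt.UTC().Format(time.RFC3339), batchID)
_, err := store.WriteWithKey(ctx, key, recs,
    s3store.WithIdempotencyToken(token))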

Replicas under WithHistory. Records that share (entity, version) describe the same logical write — a retry, zombie, or cross-node race — and are by definition equivalent. Both backends collapse them to one record; which physical instance is yielded is implementation-defined because the contract treats them as equivalent. If your writer can produce records that share (entity, version) but differ in content, that's a writer-side data quality issue and dedup can't paper over it.

Per-record "when was this inserted"

If a consumer needs the write time of every record, set StoreConfig.InsertedAtField to the name of a time.Time field on T. The field must carry a non-empty, non-"-" parquet tag — the writer populates it with its wall-clock time at write-start and persists the value as a real parquet column, so both read paths see the exact same timestamp on disk.

type Event struct {
    Customer   string    `parquet:"customer"`
    Amount     float64   `parquet:"amount"`
    InsertedAt time.Time `parquet:"inserted_at"` // writer populates this column
}

s3store.StoreConfig[Event]{
    // ...
    InsertedAtField: "InsertedAt",
}

recs, _ := store.Read(ctx, []string{"*"})
// recs[i].InsertedAt is the writer's wall-clock at write-start.

Works on every read path (Read, ReadIter, ReadPartitionIter, ReadRangeIter, ReadPartitionRangeIter, ReadEntriesIter, ReadPartitionEntriesIter, PollRecords) — the column round-trips like any other parquet field. The reader has no special handling. Zero reflection cost when unset.

To use the writer-stamped time as the dedup version, reference the field from VersionOf:

VersionOf: func(r Event) int64 { return r.InsertedAt.UnixMicro() }

Stream — time window

To read only records written within a time window (e.g. "yesterday's activity"), use ReadRangeIter with time.Time bounds. The range is half-open [since, until), matching Kafka offset semantics.

ReadRangeIter returns an iter.Seq2[T, error] backed by the same streaming pipeline that powers ReadIter — partition prefetch (WithReadAheadPartitions, default 1 = one partition lookahead), byte-budget streaming (WithReadAheadBytes, default 0 = uncapped), cross-file pipelining, per-partition dedup. Memory is bounded by whichever cap binds first:

// All records written on 2026-04-17 (UTC).
start := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
end   := time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC)
for r, err := range store.ReadRangeIter(ctx, start, end) {
    if err != nil { return err }
    // process r
}

Breaking out of the range loop cleanly cancels in-flight downloads. The ref LIST walk runs upfront before the first record yields — sub-100ms for typical windows, but for huge backfill windows chunk via since/until to stream incrementally instead of walking everything first. If you need to checkpoint offsets between batches (resume on cancel), use PollRecords (Kafka-style batched API) — ReadRangeIter does not surface per-batch offsets.
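
A chunked backfill sketch (backfillStart, backfillEnd, and process are the caller's); each day-sized call keeps the upfront ref LIST walk small:

// Walk [backfillStart, backfillEnd) one UTC day at a time so each
// ReadRangeIter call only LISTs a day's worth of refs before yielding.
for day := backfillStart; day.Before(backfillEnd); day = day.Add(24 * time.Hour) {
    until := day.Add(24 * time.Hour)
    if until.After(backfillEnd) {
        until = backfillEnd
    }
    for r, err := range store.ReadRangeIter(ctx, day, until) {
        if err != nil {
            return err
        }
        process(r)
    }
}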

Half-open boundary semantics:

  • A record written at 2026-04-17 23:59:59.999999 is included (offset < end).
  • A record written exactly at 2026-04-18 00:00:00.000000 is excluded (that instant belongs to the next window).
  • A record written exactly at 2026-04-17 00:00:00.000000 is included.

So to cover a full day, end is the start of the next day.

Timezone: ReadRangeIter compares in UTC microseconds internally (bounds are encoded from time.UnixMicro()). time.Date(..., time.UTC) gives UTC-day boundaries; time.Date(..., loc) gives local-day boundaries — both work, as long as since and until use the same timezone.

Pass time.Time{} (zero value) for since to start at the stream head, or for until to walk to the live tip (settle-window cutoff as of the call). The upper bound is snapshotted at call entry so the walk terminates even under sustained writes — writes landing after the call started are not picked up. To keep up with new writes, use PollRecords and checkpoint between batches.

For CDC / change-processing where every update matters and the caller checkpoints offsets between batches, use PollRecords (cursor-based, Kafka-style):

start := lastCheckpoint
for {
    records, next, err := store.PollRecords(ctx, start, 100,
        s3store.WithUntilOffset(store.OffsetAt(end)))
    if err != nil { return err }
    if len(records) == 0 { break }
    // process every record (no version collapse — see dedup below)
    start = next
    checkpoint(start)
}

Dedup asymmetry between the two stream APIs:

  • ReadRangeIter (range-bounded, snapshot-style): default latest-per-entity per partition, matching Read / ReadIter. You're reading "the state of this window"; collapsing to the latest version is what you want.
  • PollRecords (cursor-based, CDC-style): replica-dedup only — every distinct (entity, version) flows through. Per-batch latest-per-entity is meaningless on a cursor (the next batch may carry a newer version of the same entity), so it's not offered. WithHistory is accepted but is the default here.

OffsetAt is pure computation — no S3 call — and bridges time.Time to the offset cursor used by Poll / PollRecords / WithUntilOffset.

Snapshot
records, err := store.Read(ctx, []string{"charge_period=2026-03-17/customer=abc"})

Returns every record matching the glob, decoded directly into []T via parquet-go and the parquet tags on T. When dedup is configured (see Stream above), the result is the latest version per key; otherwise every version comes through.

Snapshot — streaming (ReadIter)

Read buffers the full result set before returning. For month-scale or otherwise unbounded ranges that's a memory problem; use ReadIter instead — it yields records one at a time via Go 1.23's iter.Seq2[T, error]:

for r, err := range store.ReadIter(ctx, []string{"charge_period=2026-03-*/*"}) {
    if err != nil { return err }
    aggregate(r)            // fold into an aggregation map and forget r
}
// breaking out of the loop cancels in-flight downloads — no Close

Available on both Store[T] and the underlying Reader[T].

Memory profile: O(one partition's records). The pipeline processes one partition at a time: files inside the partition download in parallel, the decoded batch is sort+dedup'd (latest-per-entity by default; replica-only on WithHistory()), records are yielded in (entity, version) ascending order, then the batch is dropped before the next partition starts. With EntityKeyOf unset the sort is skipped entirely and records emit in decode order (file lex order, then parquet row order). Month-scale reads go from O(month) to O(partition) peak memory, which is usually small enough for hourly/daily partitioning. If a single partition is large enough that even one pre-dedup batch is a problem, file an issue — we can follow up with a streaming fold that trades the code simplicity for peak memory proportional to unique entities rather than total records.

Pipeline shape: downloads are continuous and not partition-bound. A worker pool of MaxInflightRequests goroutines pulls files in partition+file order; cross-partition lookahead happens automatically. A separate decoder walks partitions in lex order — for each, it waits until all files are downloaded, parses each parquet footer to compute the partition's exact uncompressed size, gates on the budget, decodes, and pushes to the yield loop.

Two budget knobs, evaluated together — whichever binds first holds the producer back:

  • WithReadAheadPartitions(n): at most n decoded partitions sit buffered ahead of the yield position. Default 1 — one-partition lookahead so decode of N+1 overlaps yield of N. Values < 1 are floored to 1 (strict-serial decode is no longer offered as a public mode; use the byte cap for tighter bounds).
  • WithReadAheadBytes(n int64): at most n uncompressed bytes (read from each parquet file's footer — exact, not a heuristic) sit decoded in the buffer. Default 0 disables the byte cap.
// Many small partitions: prefetch generously by count.
for r, err := range store.ReadIter(ctx, []string{"*"},
    s3store.WithReadAheadPartitions(8)) { ... }

// Skewed partition sizes (mostly tiny + a few large): cap by bytes
// so prefetch self-throttles on the large ones. Decoded Go memory
// typically runs 1–2× the uncompressed size depending on data
// shape (string headers, slice/map pointer overhead).
for r, err := range store.ReadIter(ctx, []string{"*"},
    s3store.WithReadAheadBytes(2<<30)) { ... } // ≤ 2 GiB

// Combine — useful when both axes matter.
for r, err := range store.ReadIter(ctx, []string{"*"},
    s3store.WithReadAheadPartitions(8),
    s3store.WithReadAheadBytes(2<<30)) { ... }

Trade-off: peak memory ≈ WithReadAheadBytes (decoded uncompressed bytes) + at most max(MaxInflightRequests, largest_partition_file_count) compressed parquet bodies waiting to be decoded + one in-flight decode. Downloaders apply back-pressure when the compressed-body pool is full so the "downloaded but not yet decoded" backlog can't grow without bound; the floor at largest_partition_file_count guarantees the largest partition can always be downloaded in full (the decoder needs every file before the partition can be decoded). Speed-up is bounded by how much of your per-partition yield time would otherwise sit idle waiting for downloads.

Single oversized partition: if one partition's uncompressed total exceeds WithReadAheadBytes on its own, that partition still flows (the cap can't bind below partition granularity without row-group-level streaming). The cap only prevents additional partitions from joining the buffer.

Per-partition dedup contract — uniform across every read path: Read / ReadIter / ReadPartitionIter / ReadRangeIter / ReadPartitionRangeIter / ReadEntriesIter / ReadPartitionEntriesIter / PollRecords all dedup within one Hive partition at a time. Correct only when the partition key strictly determines every component of EntityKeyOf — i.e. no entity ever spans two partitions. For layouts where entities can move between partitions over time (e.g. a customer that switches region), pass WithHistory() and dedup yourself.
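
For the entity-spans-partitions case, a caller-side fold over WithHistory() is one way to get a global latest-per-entity. A sketch whose memory grows with unique entities rather than total records; the key and version derivations mirror the Quick start closures:

type versioned struct {
    rec     CostRecord
    version int64
}

latest := map[string]versioned{}
for r, err := range store.ReadIter(ctx, []string{"*"}, s3store.WithHistory()) {
    if err != nil {
        return err
    }
    k := r.CustomerID + "|" + r.SKU // same shape as EntityKeyOf
    v := r.CalculatedAt.UnixMicro() // same shape as VersionOf
    if cur, ok := latest[k]; !ok || v > cur.version {
        latest[k] = versioned{rec: r, version: v}
    }
}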

For typical time-series shapes (charge_period_start leads both PartitionKeyParts and the entity key) the contract holds. There is no global-dedup escape hatch: every snapshot read returns the same records (the per-partition pipeline is the single decode path), and peak memory stays bounded by WithReadAheadBytes / WithReadAheadPartitions.

Order: ReadIter visits partitions in lex order and downloads files within a partition in lex order. Within a partition the user-visible emission order is (entity, version) ascending when EntityKeyOf is set; when no dedup is configured the sort is skipped entirely and records emit in decode order (file lex order, then parquet row order).

Idempotent writes

s3store defaults to at-least-once on the write path: a retry after a partial failure re-runs the whole write and produces duplicate data, markers, and refs under fresh keys. For workloads where retries are common (orchestrator failover, network flakiness, crash-and-resume), WithIdempotencyToken makes the write retry-safe end-to-end.

const token = "job-2026-04-22T10:15:00Z-batch42"

_, err := store.WriteWithKey(ctx, key, records,
    s3store.WithIdempotencyToken(token))

On retry with the same token:

  • Per-attempt data path — every attempt lands at a fresh {token}-{UUIDv7}.parquet. Data and ref PUTs never overwrite by construction — sidesteps multi-site StorageGRID's eventual-consistency exposure on data-key overwrites; uniform behaviour across AWS S3, MinIO, and StorageGRID at any consistency level.
  • Upfront-HEAD dedup gate — before generating a fresh attempt-id, the writer issues a single HEAD on <dataPath>/<partition>/<token>.commit. 200 → reconstruct the prior WriteResult from the marker's user-metadata (attemptid, refmicrots, insertedat) and return without re-uploading anything. 404 → proceed with a fresh attempt-id end-to-end. The auto-token (no WithIdempotencyToken) path skips the HEAD: a freshly generated UUIDv7 is guaranteed to 404 by construction.
  • Recovery from arbitrarily long outages. Same token works forever: retries across S3 outages, orchestrator restarts, process crashes — all converge automatically. There's no ErrCommitWindowExpired, no generation-suffix juggling on the orchestrator side.

Cost: per-attempt orphans on failure. Every failed attempt under WithIdempotencyToken may leave a data file + ref on S3 as orphans (any attempt that crashed before the commit PUT). Reader paths filter them out — both snapshot and stream reads gate on the <token>.commit marker. The deferred operator-driven sweeper reclaims them. A retry storm with sane orchestrator backoff produces orphans bounded by (retry rate) × (parquet size) per token.

Per-partition tokens

When a single Write call spans several partitions and each partition has its own logical-write identifier (one outbox row per partition, distinct upstream batch IDs, etc.), use WithIdempotencyTokenOf instead of WithIdempotencyToken. The closure runs once per partition with the partition's records in hand and returns the token to use for that partition:

_, err := store.Write(ctx, records,
    s3store.WithIdempotencyTokenOf(func(part []Rec) (string, error) {
        // The partition is non-empty by construction (Write
        // groups by PartitionKeyOf and skips empty groups).
        // part[0] is sufficient for any per-partition derivation.
        return outboxRowID(part[0]), nil
    }))

Each partition's commit marker, upfront HEAD, and per-attempt id use the per-partition token; retries dedup independently per partition. Mutually exclusive with WithIdempotencyToken — combining them surfaces an error at option-resolution time. A non-nil error from the closure aborts that partition's write (under multi-partition fan-out, sibling partitions whose closure succeeded may still commit — partial-success contract). Tokens returned from the closure are validated per partition via the same rules as the static option (non-empty, no /, no .., printable ASCII, ≤200 chars).

Optimistic commit (skip the upfront HEAD)

Every idempotent write does an upfront HEAD on <token>.commit to detect a same-token retry. For workloads near the S3 request-rate ceiling — typically large per-call partition fan-outs at high frequency — that HEAD is the per-write cost worth optimising away. Pass WithOptimisticCommit() to swap the upfront HEAD for an on-collision HEAD: the writer skips the HEAD on the fresh path and detects a prior commit by the conditional commit PUT itself failing.

store.Write(ctx, records,
    s3store.WithIdempotencyToken(token),
    s3store.WithOptimisticCommit())

Mechanism. The token-commit PUT carries If-None-Match: * (modern S3 conditional-write semantics). When a prior commit exists:

  • Backends supporting conditional PUT (AWS S3 since November 2024, recent MinIO, StorageGRID 12.0+) return 412 Precondition Failed.
  • Backends behind a bucket policy denying s3:PutOverwriteObject on the commit subtree (older StorageGRID) return 403 Access Denied.

The writer recognises both, runs one HEAD on <token>.commit to recover the canonical attempt's metadata, and returns the prior commit's WriteResult unchanged — same DataPath, RefPath, Offset, InsertedAt, RowCount the caller would have got from the legacy upfront-HEAD path.

Trade-off. The fresh path saves one HEAD per write (5–15ms on AWS, ~20% of the per-Write request budget for a 3-PUT marker-less write). The retry-found-prior path leaves an orphan parquet + ref behind for every same-token retry — the data and ref PUTs from this attempt never get garbage-collected unless the operator runs a cleanup. Both orphans are invisible to readers via the commit gate (their attempt-id is not the canonical one named in <token>.commit metadata).

The break-even sits around a 5% retry-found-prior rate. Below that, the HEAD savings dominate; above that, the orphan and bandwidth cost outweighs the savings. Healthy CDC pipelines retry well under 1%, so the option is a clear win for high-throughput workloads near the request-rate ceiling. Bulk migrations with frequent orchestrator restarts (or any workload where retries are routine) probably don't benefit.

The companion metric is s3store.write.optimistic_commit.collisions — a counter incremented on each on-collision recovery. Chart it against s3store.method.calls{method="write"} to see your collision rate in production and confirm the option is paying off.

Mutual compatibility. WithOptimisticCommit composes with WithIdempotencyToken and WithIdempotencyTokenOf. It has no effect on auto-token writes (no caller-supplied token): the upfront HEAD is already skipped there because a freshly-generated UUIDv7 is guaranteed to 404 by construction.

Backend setup

The mechanism is portable across backends, but each one needs support for some form of "fail this PUT if the object already exists." Pick whichever path your backend supports.

AWS S3 / MinIO / StorageGRID 12.0+: conditional PUT (preferred). No setup. The library sends If-None-Match: * on every <token>.commit PUT under WithOptimisticCommit. Modern S3 APIs honour the precondition atomically server-side, returning 412 when the object exists. Verify your backend version supports it before enabling the option in production.

StorageGRID (legacy versions): bucket policy denying s3:PutOverwriteObject on the commit subtree. This works by rejecting any PUT that would overwrite an existing object whose key matches <prefix>/data/*/*.commit. Concurrent same-token writes (out of contract) are unaffected — the deny is post-completion, not a mutex — but for sequential retries the second PUT lands as 403 AccessDenied, which the library recognises and routes to the same recovery path.

The boto3 snippet below installs the deny on a single <prefix> (run it once per prefix you intend to use with WithOptimisticCommit against a deny-policy backend):

import json

import boto3

s3 = boto3.client("s3", endpoint_url="https://storagegrid.example.com")

bucket = "my-bucket"
prefix = "my-prefix"  # matches StoreConfig.Prefix

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DenyOverwriteOfTokenCommit",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:PutOverwriteObject",
            "Resource": (
                f"arn:aws:s3:::{bucket}/{prefix}/data/*/*.commit"
            ),
        }
    ],
}

s3.put_bucket_policy(
    Bucket=bucket,
    Policy=json.dumps(policy),
)

The deny scopes to <prefix>/data/*/*.commit — only the commit markers are protected. Data files (<token>-<UUIDv7>.parquet) and refs land under fresh per-attempt paths and never overwrite by construction, so they don't need the deny. Markers under <prefix>/_matview/... are intentionally overwriteable (byte-equivalent zero-byte content), so excluding them keeps matview writes working.

If you have multiple Prefix values on the same bucket, repeat the Statement entry per prefix or widen the Resource glob accordingly.

Backends without either mechanism. If the conditional PUT is silently ignored and no deny policy is in place, the library cannot detect a prior commit on the optimistic path. The second commit PUT would silently overwrite, and the caller would receive the new attempt's WriteResult instead of the prior commit's — breaking the same-token-retry-returns-same-result contract. The library does not detect this at construction time. Verify backend support before enabling WithOptimisticCommit.

Idempotency and reader dedup are complementary

Reader dedup (EntityKeyOf + VersionOf) and idempotency tokens cover different concerns; pick by what your data and your pipeline give you.

Reader dedup is primarily latest-version selection. When the same entity is written multiple times with different VersionOf values, dedup collapses to the latest version per entity. This is its main job, and tokens don't replace it.

Tokens collapse physical replicas of the same logical record. Sequential retries short-circuit at the upfront HEAD on <token>.commit. Out-of-contract near-concurrent retries are still absorbed at read time — the commit's attemptid arbitrates one canonical attempt; both read paths filter LIST/refs down to it. The reader sees one set of rows, not two. This works only under token stability: the same logical record must always derive the same token, across retries, restarts, and process boundaries. (On multi-site StorageGRID below strong-global the canonical-attempt choice can differ transiently across sites during overwrite propagation, but deterministic parquet encoding makes both attempts' records byte-equivalent — undetectable to the consumer.)

Reader dedup also handles replica collapse when token stability isn't achievable — token store lost, derivation differs across processes/replays, multiple writers emit the same logical record under independent tokens. No cross-token arbitration exists, so EntityKeyOf + VersionOf becomes the only collapse point.

| Config | Storage layer | Consumer layer |
| --- | --- | --- |
| No token, no dedup | at-least-once | at-least-once |
| No token, dedup configured | at-least-once | exactly-once (latest version per entity) |
| Stable token + dedup | at-least-once (per-attempt orphans on failure) | exactly-once (latest version per entity, across retry sessions) |
| Stable token alone | at-least-once (per-attempt orphans) | exactly-once per logical record; versions not collapsed |

Recommendation: enable reader dedup whenever the data has versions to collapse (the typical case). Tokens are orthogonal — they make retries cheap and absorb replica duplication when token stability holds; reader dedup picks up both replica and version collapse when stability doesn't.
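As a minimal sketch of that recommended baseline (the Usage fields referenced here, including an InsertedAt stamp, are illustrative and not part of the library):

cfg := s3store.StoreConfig[Usage]{
    // ... S3 wiring and PartitionKeyOf as elsewhere in this README ...

    // One logical entity per (customer, SKU, period).
    EntityKeyOf: func(u Usage) string {
        return u.CausingCustomer + "|" + u.SKUID + "|" + u.ChargePeriodStart.UTC().Format(time.RFC3339)
    },
    // Monotonic per entity; here the write-time stamp carried on the record.
    VersionOf: func(u Usage) int64 {
        return u.InsertedAt.UnixMicro()
    },
}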

Cross-backend uniformity

Per-attempt paths for data + refs plus the upfront HEAD on <token>.commit produce the same retry semantics on every supported backend:

  • AWS S3 / MinIO — read-after-new-write is strongly consistent out of the box. The upfront HEAD always observes any prior commit; data and ref PUTs always observe what was just written.
  • NetApp StorageGRID — ConsistencyControl defaults to strong-global (the safe multi-site choice). Single-site deployments can downgrade to strong-site explicitly when the per-call cost matters; see STORAGEGRID.md for the topology decision matrix. No s3:PutOverwriteObject bucket policy required — data and ref PUTs use per-attempt paths; the <token>.commit overwrite that arises only on declared-out-of-contract concurrent same-token retries is byte-equivalent record-wise (deterministic parquet encoding) regardless of which attempt's metadata wins.
Probing a token without writing (LookupCommit)

Writer.LookupCommit(ctx, partition, token) returns the prior WriteResult if <token>.commit already exists in the partition, or (WriteResult{}, false, nil) when it doesn't. A single HEAD against the marker — same primitive the write path uses for upfront-dedup, exposed for orchestrators that need to ask "did this logical step already commit?" without going through Write:

wr, ok, err := store.LookupCommit(ctx, partitionKey, token)
if err != nil {
    return s3store.WriteResult{}, err
}
if ok {
    // the prior attempt of this token committed; skip the work
    return wr, nil
}
// proceed with the write…

Useful when retries are driven by an external orchestrator that already knows the partition key and the token, and wants to short-circuit before re-encoding parquet.

Zombie writers and orchestrators

The library does not enforce single-writer-per-partition — that's a caller invariant. Two cases:

  • Same token. Sequential retries collapse via the upfront HEAD on <token>.commit. Concurrent writers (out of contract per Concurrency contract) both miss the prior commit on their upfront HEAD and both write their own per-attempt data + ref + commit; reader dedup via (entity, version) collapses the duplicate records.
  • Different tokens. Two distinct per-attempt triples (different paths, different refs); reader dedup via (entity, version) collapses them at read time.

For orchestrator-driven jobs: reuse the same token across failovers (persist it in your job state). A fresh token per restart is valid but creates real storage duplication.
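A hedged sketch of that recommendation; jobID and stepSeq stand in for whatever identifiers your orchestrator already persists, and are not library concepts:

// Derive the token from durable orchestrator state so every retry or
// failover of the same logical step produces the same string.
token := fmt.Sprintf("%s/%s/step-%d", jobID, partitionKey, stepSeq)

if _, err := store.Write(ctx, recs,
    s3store.WithIdempotencyToken(token),
); err != nil {
    return err // retry later with the same token
}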

Alternative: external outbox (Postgres / similar)

If you already run a transactional database alongside your writers, an outbox pattern often composes better than s3store's ref stream:

  1. On every successful write, INSERT into an outbox table with columns (token, partition_key, data_path, created_at) and a monotonic id.
  2. Consumers read by id.
  3. Unique constraint on token makes zombie/retry writes visible as constraint violations — Postgres is the authoritative dedup.

This moves the dedup primitive off S3, so on StorageGRID you can downgrade ConsistencyControl from the default strong-global to read-after-new-write (ConsistencyDefault) and still get exactly-once at the consumer layer — the outbox absorbs every storage-layer race. Any rare storage-layer duplicate from a weak-consistency race becomes a "ghost" file that isn't referenced by an outbox row — wasted bytes, not a visible duplicate. The s3store ref stream still gets written alongside the outbox; consumers just ignore it.
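A sketch of the outbox table and insert from steps 1–3, assuming Postgres via database/sql; the table and helper names are illustrative:

const createOutbox = `
CREATE TABLE IF NOT EXISTS s3store_outbox (
    id            BIGSERIAL   PRIMARY KEY,           -- monotonic consumer offset
    token         TEXT        NOT NULL UNIQUE,       -- zombie/retry writes surface as violations
    partition_key TEXT        NOT NULL,
    data_path     TEXT        NOT NULL,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now()
)`

// recordWrite runs after a successful store.Write with the same token.
func recordWrite(ctx context.Context, db *sql.DB, token, partitionKey, dataPath string) error {
    _, err := db.ExecContext(ctx,
        `INSERT INTO s3store_outbox (token, partition_key, data_path) VALUES ($1, $2, $3)`,
        token, partitionKey, dataPath)
    return err
}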

s3store does not ship this pattern; it is documented here as a valid alternative for callers who already run the transactional infrastructure.

Schema evolution

Reads tolerate missing columns out of the box: a field whose column isn't in a given parquet file lands as Go's zero value, never an error. That covers the common "added a column" case without any extra configuration. parquet-go matches columns to struct fields by the parquet: tag, so column order in the file doesn't matter and unknown columns are ignored.
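For example (the record type and column names are illustrative), growing T by one column needs no read-side change; old files simply decode the new field as its zero value:

// Files written while the type had only sku_id and amount remain readable
// after the type grows a region column:
type Record struct {
    SKUID  string `parquet:"sku_id"`
    Amount int64  `parquet:"amount"`
    Region string `parquet:"region"` // "" when decoded from files written before the column existed
}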

Renames, splits, and row-level computed derivations still require a migration tool — rewrite the affected files with the new shape.

Materialized views

When a query filters on a column that isn't a partition key (e.g. "list every customer that had usage of SKU X in period P"), scanning every data file is prohibitive at scale. A materialized view solves this by writing one empty S3 marker per distinct tuple of the columns you want to query. The query is a single LIST under the marker prefix — zero parquet reads.

Shape

A materialized view has two halves that are wired separately:

  • Write side (MaterializedViewDef[T]) lives in StoreConfig.MaterializedViews / WriterConfig.MaterializedViews. Of returns the per-record column values as a []string aligned to Columns (positional, in declared order). Of is optional — nil means the library reflects T's parquet tags + Columns once at NewWriter and emits markers without any caller code.
  • Read side (MaterializedViewLookupDef[K]) is consumed by s3store.NewMaterializedViewReader(target, lookupDef) to build a typed MaterializedViewReader[K] query handle. From projects each marker back into K. Same convention: nil means reflect K's parquet tags against Columns; a non-nil custom From overrides.
// Usage carries parquet tags for the partition columns:
//   SKUID             string    `parquet:"sku_id"`
//   CausingCustomer   string    `parquet:"causing_customer"`
//   ChargePeriodStart time.Time `parquet:"charge_period_start"`
//   ChargePeriodEnd   time.Time `parquet:"charge_period_end"`

cfg.MaterializedViews = []s3store.MaterializedViewDef[Usage]{{
    Name: "sku_period_idx",
    Columns: []string{
        "sku_id", "charge_period_start",
        "causing_customer", "charge_period_end",
    },
    // String fields auto-project. The two time.Time columns
    // share one Layout.Time format — RFC3339 here. No Of needed.
    Layout: s3store.Layout{Time: time.RFC3339},
}}

store, _ := s3store.New[Usage](cfg)

// Build the typed query handle. From is nil, so the library
// reflects SkuPeriodEntry's parquet tags to project each marker
// back into a typed struct. K can carry time.Time fields directly
// — Layout.Time on the read side mirrors the write side, parsing
// path segments back into time.Time via time.Parse.
type SkuPeriodEntry struct {
    SKUID             string    `parquet:"sku_id"`
    ChargePeriodStart time.Time `parquet:"charge_period_start"`
    CausingCustomer   string    `parquet:"causing_customer"`
    ChargePeriodEnd   time.Time `parquet:"charge_period_end"`
}

skuIdx, _ := s3store.NewMaterializedViewReader(store.Target(),
    s3store.MaterializedViewLookupDef[SkuPeriodEntry]{
        Name: "sku_period_idx",
        Columns: []string{
            "sku_id", "charge_period_start",
            "causing_customer", "charge_period_end",
        },
        Layout: s3store.Layout{Time: time.RFC3339}, // must match write-side
    })

When every Columns entry is already a parquet:"..."-tagged string field on T, both Of and Layout can be left zero:

cfg.MaterializedViews = []s3store.MaterializedViewDef[Usage]{{
    Name:    "sku_customer_idx",
    Columns: []string{"sku_id", "causing_customer"},
    // Of: nil — library auto-projects from Usage's parquet tags.
}}

When time.Time columns need different layouts per column, or column values come from somewhere other than parquet-tagged fields on T, fall back to writing Of explicitly:

cfg.MaterializedViews = []s3store.MaterializedViewDef[Usage]{{
    Name:    "mixed_idx",
    Columns: []string{"sku_id", "month", "at"},
    Of: func(u Usage) ([]string, error) {
        return []string{
            u.SKUID,
            u.ChargePeriodStart.Format("2006-01"),    // month bucketing
            u.ChargePeriodStart.Format(time.RFC3339), // full timestamp
        }, nil
    },
}}

Materialized views are wired at construction time, so registration cannot race with Write and "registered after the first Write" is not a reachable state. Use BackfillMaterializedView (below) to retroactively cover records written before a view existed.

MaterializedViewReader[K] is T-free — a read-only service can build the handle without depending on the writer's record type.

Every Write call iterates each registered view, collects a deduplicated set of marker paths across the batch, and PUTs one empty marker per distinct path under <Prefix>/_matview/<name>/<col>=<val>/.../m.matview. Duplicate writes are idempotent (same S3 key, same empty body).

Of and From: positional, aligned to Columns

Of returns []string and From takes []string, both positional to Columns. Two consequences:

  • Ordering discipline. A custom Of/From that disagrees with Columns (e.g. swapping the order of two values) writes the wrong markers — silently. The library checks length but not semantics. Refactor Columns and the function together, or rely on the auto-projection (nil) which can't get order wrong.
  • No helper needed for the default case. With nil Of / nil From, the library does the reflection itself. Custom closures only when transformation is needed (formatted timestamps on write, post-processing on read) or when K has no parquet tags.
Lookup
hits, err := skuIdx.Lookup(ctx, []string{
    "sku_id=SKU-123/charge_period_start=2026-03-01..2026-04-01/"+
    "causing_customer=*/charge_period_end=*",
})
// hits []SkuPeriodEntry

The pattern grammar is the same one Read accepts (exact, trailing-*, whole-segment *, FROM..TO range). Results are unbounded — narrow the pattern if a view has millions of matches.

Multi-pattern reads

The single-pattern grammar is Cartesian per segment — period=* combined with customer=abc matches the cross product of all periods × abc. When the caller has an arbitrary set of tuples (e.g. (period=2026-03, customer=abc), (period=2026-04, customer=def) but not the off-diagonal combinations), pass them as additional elements of the same []string patterns slice.

Every read entry point that takes patterns — Read, ReadIter, ReadPartitionIter, MaterializedViewReader.Lookup, BackfillMaterializedView — accepts the same []string shape:

// Read across non-Cartesian tuples.
recs, _ := store.Read(ctx, []string{
    "period=2026-03-17/customer=abc",
    "period=2026-03-18/customer=def",
})

// Materialized-view lookup over an arbitrary set of (col, col) tuples.
entries, _ := idx.Lookup(ctx, []string{
    "sku=s1/customer=abc",
    "sku=s4/customer=def",
})

// One-off migration across several partitions.
stats, _ := s3store.BackfillMaterializedView(ctx, target, def,
    []string{"period=2026-03-*/customer=*", "period=2026-04-01/customer=*"},
    until)

Execution model: LIST calls fan out across patterns with the Target's MaxInflightRequests cap, overlapping patterns are deduplicated at the key level, the GET+decode pool runs over the unioned set, and dedup (if configured) applies per Hive partition — an entity is kept as the latest version within its partition. Under the per-partition dedup precondition (EntityKeyOf fully determined by the partition key), an entity never spans two partitions, so multi-pattern unions still surface one latest pick per entity.

A single-element slice is the common case; the multi-pattern API simply lets the caller add more when they need a non-Cartesian set.

What's in scope for v1
  • Wire materialized views via StoreConfig.MaterializedViews; auto-write on Write + Lookup via the typed MaterializedViewReader[K] handle.
  • Read-after-write on Lookup: the marker PUT and the marker LIST both inherit ConsistencyControl from the shared S3Target, so a Lookup issued immediately after Write sees the new marker without a settle delay. On AWS S3 / MinIO that's native; on StorageGRID, set the level once on the target and it flows to marker PUT + LIST automatically (see STORAGEGRID.md).
  • Backfill as a standalone package function (below).
Backfill

The normal path is to wire a materialized view into StoreConfig.MaterializedViews before the first Write so every record produces markers. When that isn't possible — adding a view to a store that already has data — the typical shape is:

  1. Deploy the live app with the view in StoreConfig.MaterializedViews; every new Write emits markers from time T0 onward.
  2. Capture until := T0 — the watermark before which historical data is uncovered.
  3. Run a one-off migration job using the package-level s3store.BackfillMaterializedView that scans files with LastModified < until and PUTs the retroactive markers.
stats, err := s3store.BackfillMaterializedView(ctx,
    store.Target(),     // or construct via s3store.NewS3Target
    def,                // the same MaterializedViewDef the live app registered
    []string{"*"},      // patterns (PartitionKeyParts grammar)
    until,              // exclusive upper bound on LastModified (time.Time)
)
// stats.DataObjects / Records / Markers

BackfillMaterializedView is deliberately standalone — no Writer / Reader argument — so the migration job doesn't need the live app's full config. It runs through the same S3Target abstraction used by the read-side NewMaterializedViewReader, issuing both parquet GETs and marker PUTs via target.S3Client. Typically invoked from a dedicated binary (cmd/backfill-<name>/main.go in your repo): a ~30-line main that builds an S3 client + S3Target + the same MaterializedViewDef[T] the live app uses, then calls BackfillMaterializedView.
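A sketch of that dedicated binary, under the assumption that the live app's record type and view definition look like the sku_period_idx example above; bucket, prefix, and the cut-over date are placeholders:

package main

import (
    "context"
    "log/slog"
    "os"
    "time"

    awsconfig "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/ueisele/s3store"
)

// Usage mirrors the live app's record type (in a real repo, import it instead).
type Usage struct {
    SKUID             string    `parquet:"sku_id"`
    CausingCustomer   string    `parquet:"causing_customer"`
    ChargePeriodStart time.Time `parquet:"charge_period_start"`
    ChargePeriodEnd   time.Time `parquet:"charge_period_end"`
}

func main() {
    ctx := context.Background()

    awsCfg, err := awsconfig.LoadDefaultConfig(ctx)
    if err != nil {
        slog.Error("aws config", "err", err)
        os.Exit(1)
    }

    target, err := s3store.NewS3Target(ctx, s3store.S3TargetConfig{
        Bucket:            "my-bucket",
        Prefix:            "my-prefix",
        S3Client:          s3.NewFromConfig(awsCfg),
        PartitionKeyParts: []string{"period", "customer"},
    })
    if err != nil {
        slog.Error("target", "err", err)
        os.Exit(1)
    }

    // Same definition the live app registers in StoreConfig.MaterializedViews.
    def := s3store.MaterializedViewDef[Usage]{
        Name: "sku_period_idx",
        Columns: []string{
            "sku_id", "charge_period_start",
            "causing_customer", "charge_period_end",
        },
        Layout: s3store.Layout{Time: time.RFC3339},
    }

    // T0: the moment the live writer started emitting markers for this view.
    until := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)

    stats, err := s3store.BackfillMaterializedView(ctx, target, def, []string{"*"}, until)
    if err != nil {
        slog.Error("backfill", "err", err)
        os.Exit(1)
    }
    slog.Info("backfill done",
        "dataObjects", stats.DataObjects, "records", stats.Records, "markers", stats.Markers)
}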

The pattern is evaluated against the target's PartitionKeyParts (same grammar as Read), not against the view's Columns — backfill walks parquet data files, which are keyed by partition. A migration job can shard itself by partition (period=2026-01-* on one pod, period=2026-02-* on another) instead of running a single multi-hour call. The until bound lets the live writer and the migration job cooperate without overlap: live markers for everything from T0, backfill markers for everything before. Pass time.Time{} (the zero value) to disable the bound (covers every file currently present — harmless but redundant if the live writer has been up for a while, since PUT is idempotent).

Idempotent at the S3 level (same empty marker, same key), so a retry after cancel or crash is a no-op on work already done. Safe to run while the live writer keeps emitting markers for fresh records.

Backfilled markers are subject to the same read-after-write contract as live-write markers: on a strong-consistent backend (AWS, MinIO, or StorageGRID with ConsistencyControl set on the target) a Lookup issued after BackfillMaterializedView returns sees every marker it just wrote.

Not in v1 (deferred)
  • Delete materialized view — no general delete path on the store yet.
  • Verification / orphan cleanup tools — if Of changes semantically, stale markers remain. Backfill only adds missing markers; rebuild (delete-then-re-PUT) is a separate design.
Column ordering matters (for performance, not correctness)

Put columns you typically filter on first. They form the S3 LIST prefix, so a query that specifies them literally narrows the LIST. Trailing columns are always parsed out of the marker filename — correct but slower when there's nothing to prune on.

Column values are strings on disk

Column values land on disk as hive-encoded strings — Of returns []string, marker keys are strings, and the default binder accepts only string and time.Time fields on T / K. String fields project directly. time.Time fields are formatted (write) and parsed (read) via Layout.Time, which must be set on both sides to the same layout.

For other types (numbers, bools, custom enums), format them in a custom Of and post-process in a custom From — same way PartitionKeyOf already does for data paths. Keeps the read path a pure round-trip.
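A sketch of that round-trip for an integer column. The field names are illustrative, and the From signature shown (func([]string) (K, error), mirroring Of) is an assumption rather than confirmed API:

cfg.MaterializedViews = []s3store.MaterializedViewDef[Usage]{{
    Name:    "sku_amount_idx",
    Columns: []string{"sku_id", "amount_cents"},
    Of: func(u Usage) ([]string, error) {
        // format the non-string column on the way into the marker key
        return []string{u.SKUID, strconv.FormatInt(u.AmountCents, 10)}, nil
    },
}}

type SkuAmountEntry struct {
    SKUID       string
    AmountCents int64
}

idx, _ := s3store.NewMaterializedViewReader(store.Target(),
    s3store.MaterializedViewLookupDef[SkuAmountEntry]{
        Name:    "sku_amount_idx",
        Columns: []string{"sku_id", "amount_cents"},
        From: func(vals []string) (SkuAmountEntry, error) {
            // parse it back on the way out
            cents, err := strconv.ParseInt(vals[1], 10, 64)
            return SkuAmountEntry{SKUID: vals[0], AmountCents: cents}, err
        },
    })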

Compression

Write uses snappy by default — the same codec that Spark, DuckDB's parquet writer, Trino, and Athena emit out of the box. Snappy cuts file size 2-3× on typical data with negligible CPU cost. Change via StoreConfig.Compression:

s3store.StoreConfig[T]{
    // ...
    Compression: s3store.CompressionZstd,
}

Accepted values:

  • CompressionSnappy — fast, default.
  • CompressionZstd — better ratio, higher CPU; good for cold / archive data.
  • CompressionGzip — legacy, moderate CPU and ratio.
  • CompressionUncompressed — no compression; only if CPU cost dwarfs S3 cost.

Zero value (empty string) resolves to snappy, so leaving the field unset is safe. New() rejects any other string.

parquet-go auto-detects the codec on read, so switching compression per Write doesn't require any read-side config. External engines (DuckDB, Spark, Trino) do the same.

Settle window

Refs on S3 (chronological):
... 1000 1001 1002 1003 1004 1005 1006 1007
                                 ↑
                          now - SettleWindow
                              ──────→ don't read yet

Refs don't appear in LIST in refMicroTs order. refMicroTs is captured by the writer with time.Now() before the ref PUT issues, but the ref isn't LIST-visible until the PUT is acknowledged — and the PUT's duration varies per writer (per request, per network blip, per retry). Two concurrent writers that capture refMicroTs 10 ms apart can land in LIST in the opposite order if their PUTs took 50 ms vs 200 ms. This is not a consistency-model artifact; it happens on every backend including AWS S3, MinIO, and StorageGRID at strong-global. So Poll reads up to now - SettleWindow to give the slowest in-flight PUT enough time to land before its refMicroTs falls inside the cutoff.

SettleWindow = CommitTimeout + MaxClockSkew is derived from the two persisted timing knobs (see Initializing a new dataset for seeding). The two-knob shape decomposes what the cutoff actually has to cover:

  • CommitTimeout absorbs the per-PUT wall-clock duration drift described above. A successful write — by the writer-side enforcement check — has time.Since(refMicroTs) ≤ CommitTimeout at token-commit completion, which means its ref PUT (one of the two PUTs in the budget) also completed within CommitTimeout of refMicroTs. So if refCutoff = now - SettleWindow ever advances past a ref's refMicroTs, the ref has already had at least CommitTimeout to land in LIST and its <token>.commit to land for HEAD — both are visible to the reader (under read-after-new-write, which the library's backend assumptions require). The drift caused by parallel writers with varying PUT durations is exactly what CommitTimeout is sized to absorb; no separate "concurrent-writer drift" term is needed.
  • MaxClockSkew absorbs writer↔reader wall-clock divergence. Refs encode refMicroTs directly, so the reader's comparison refMicroTs ≤ now - SettleWindow is across two different machines' clocks. MaxClockSkew is the operator's declared bound on that divergence (zero on tightly-clocked NTP-synced fleets).

Together they bound the cutoff so it can't overtake a ref whose ref PUT or token-commit PUT might still be landing server-side. This gives you a single monotonic offset with no seen-set or dedup bookkeeping.
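In code terms (purely illustrative; the library computes this internally and none of these names are API):

settleWindow := commitTimeout + maxClockSkew // SettleWindow = CommitTimeout + MaxClockSkew

// Refs stamped after this instant are not yet eligible: their ref PUT or
// token-commit PUT may still be landing, or the writer's clock may be ahead.
refCutoff := time.Now().Add(-settleWindow).UnixMicro()

eligible := refMicroTs <= refCutoff // refMicroTs: the value embedded in the ref filename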

The formula assumes read-after-new-write on the ref LIST and the token-commit HEAD — the library's documented backend prerequisite (see CLAUDE.md "Backend assumptions"). On a backend that doesn't honor that, no SettleWindow value is sufficient; the cutoff math has nothing to lean on.

Enforcement: the writer's elapsed-time check

The "CommitTimeout absorbs the per-PUT drift" claim above holds only if every successful write actually fits inside CommitTimeout. The writer enforces that directly. After the token-commit PUT, the writer compares time.Since(time.UnixMicro(refMicroTs)) — elapsed wall-clock from the moment just before the ref PUT — against CommitTimeout. Pre-ref work (parquet encoding, marker PUTs, data PUT) is deliberately outside the budget: only the ref-LIST-visible → token-commit-visible window can put the SettleWindow contract at risk. If exceeded, the writer returns an error to the caller — the commit IS in place (data is durable; snapshot reads see the new records immediately), but a stream reader whose SettleWindow already advanced past refMicroTs may have emitted past the ref. The error tells the caller their write is at risk; a same-token retry recovers via the upfront HEAD with the original WriteResult. Either way the contract holds: a ref is permanently invisible (writer raised the error → caller retries, fresh refMicroTs) or it landed within budget (the absorber claim holds for it).

If any PUT in the sequence fails (markers, data, ref, or token-commit), the call returns a wrapped error. The library never deletes anything it has written; per-attempt paths mean a failed attempt's data and ref don't conflict with a retry, and both read paths gate on <token>.commit so an orphan parquet remains invisible until an operator-driven prune removes it. Under WithIdempotencyToken the retry runs the upfront HEAD first — if the failed attempt happened to land its commit, the retry returns its WriteResult unchanged; otherwise the retry generates a fresh attempt-id and proceeds end-to-end.

if _, err := store.Write(ctx, recs,
    s3store.WithIdempotencyToken(token),
); err != nil {
    // Some PUT didn't land, or the writer's elapsed exceeded
    // CommitTimeout. Retry with the same token — the upfront
    // HEAD on <token>.commit dedups the prior attempt if it
    // secretly succeeded; otherwise the retry writes a fresh
    // attempt end-to-end.
    return retry(...)
}

For exactly-once at the consumer, configure reader dedup (EntityKeyOf + VersionOf). Near-concurrent retry overlap (out of contract per the Concurrency contract but still bounded if it arises by accident) can leave two committed attempts whose records share (entity, version); dedup collapses them.

StorageGRID

s3store works on AWS S3, MinIO, and NetApp StorageGRID. AWS and MinIO need no configuration — both are strongly consistent on LIST and GET by default and ignore the Consistency-Control header. StorageGRID setup:

  • ConsistencyControl defaults to ConsistencyStrongGlobal — the safe multi-site choice. Single-site grids (or multi-site grids with strictly co-located reader/writer pairs) can downgrade explicitly to ConsistencyStrongSite to avoid the cross-site PUT cost.

No bucket policy required: data and ref PUTs use per-attempt paths, and the <token>.commit overwrite that arises only on out-of-contract concurrent same-token retries is record-wise byte-equivalent (deterministic parquet encoding). The s3:PutOverwriteObject deny that earlier versions required is no longer needed on the default write path; it remains relevant only as the legacy-StorageGRID fallback for WithOptimisticCommit (see above). See STORAGEGRID.md for the full topology decision matrix, operational notes on the 11.9 availability cliff, and the consistency-model reasoning that motivates the ConsistencyControl default.

Durability guarantees

The contract is at-least-once on both sides of the wire, plus read-after-write on every operation except Poll / PollRecords / ReadRangeIter / ReadPartitionRangeIter (which deliberately lag the live tip by SettleWindow to tolerate S3 LIST propagation skew — see Settle window).

Read-after-write

If Write (or WriteWithKey) returns success, every one of the following operations issued from any process against the same bucket sees the new records immediately — no sleep, no settle delay:

  • Reader.Read / ReadIter / ReadPartitionIter
  • MaterializedViewReader.Lookup
  • BackfillMaterializedView

Poll / PollRecords / ReadRangeIter / ReadPartitionRangeIter are the intentional exceptions: they apply the SettleWindow cutoff so near-tip refs stay hidden until S3 LIST propagation has had time to settle. A ref that's written inside the window will be returned by a subsequent poll issued after one SettleWindow has elapsed.

What you need to configure
  • AWS S3 / MinIO. Nothing. Both backends give strong read-after-write on LIST and GET by default and ignore the Consistency-Control header — the default ConsistencyStrongGlobal is a no-op there.
  • StorageGRID (NetApp). The default ConsistencyStrongGlobal is the safe multi-site choice and needs no further configuration. Single-site grids can downgrade to ConsistencyStrongSite explicitly to save the cross-site PUT cost. No bucket policy required; both options are documented in STORAGEGRID.md, which also explains why a weaker level (or asymmetric writer/reader levels) breaks the read-after-write contract.
Write

If Write (or WriteWithKey) returns nil, every record in the batch is durably stored in S3 and will be returned by subsequent Read, Poll, PollRecords, and MaterializedViewReader.Lookup for the lifetime of the data. The write path commits in order: marker PUTs → data PUT → ref PUT → <token>.commit PUT, returning success only after the token-commit lands and the writer's ref→commit elapsed (time.Since(time.UnixMicro(refMicroTs))) is within CommitTimeout.

If Write returns an error, state is indeterminate — the records may or may not be durable, and the commit may or may not have landed. The caller must retry. A retry after partial success may write some records twice; dedupe those on read via EntityKeyOf + VersionOf. Multi-group Write returns ([]WriteResult, error) — consult the slice for records that did commit before the error so a retry can skip them if needed.

To make retries dedup against prior successful attempts, pass WithIdempotencyToken — see Idempotent writes. Each attempt writes its data and ref to per-attempt paths; the upfront HEAD on <token>.commit dedups sequential retries (returning the prior WriteResult unchanged); the writer's elapsed-time check against CommitTimeout enforces the write-path budget, returning a wrapped error if the commit landed too late so the caller retries.

Context cancellation boundary

Write honours ctx cancellation through the data PUT (Step 5 of the write sequence). A cancel before the data PUT lands returns ctx.Err() and leaves either nothing on S3 or an orphan parquet — invisible to readers via the commit gate, dead weight on S3 until an operator-driven prune.

Once the data PUT lands successfully, the ref PUT and the <token>.commit PUT issue under context.WithoutCancel(ctx) — they run to completion regardless of caller cancellation. Without this boundary, a cancel between the data PUT and the commit PUT would leave the most expensive form of orphan: a multi-megabyte parquet that's invisible-but-durable when the work to make it visible was two zero-byte PUTs away. Both PUTs are bounded by the library's retry policy (retryMaxAttempts = 5 with cumulative worst-case ~2.4s of jittered backoff per call) plus the AWS SDK's per-request timeouts; in practice they complete in milliseconds. A caller that genuinely needs to abort must do so before the data PUT returns; once Write returns success, the cancellation that may have arrived during the no-cancel window is moot — the records are committed.

Read

A data-file GET that returns S3 NoSuchKey is operator-driven (lifecycle policy, manual prune, or external delete — the library itself never deletes data it has written). The library splits the response by path:

  • Strict — fail loudly. Read, ReadIter, and ReadPartitionIter propagate the NoSuchKey as a wrapped error. These paths LIST the partition tree first, so a missing file is genuinely a LIST-to-GET race (the file vanished in the millisecond window between LIST and GET); a caller retry resolves it because the next LIST won't include the deleted key.
  • Tolerant — skip and signal. PollRecords, ReadRangeIter, ReadPartitionRangeIter, ReadEntriesIter, ReadPartitionEntriesIter, and BackfillMaterializedView walk the ref stream / data tree on a long-running shape where a single missing file shouldn't poison the whole job. They log via slog.Warn (level WARN, key=path, method=poll_records / read_range_iter / read_partition_range_iter / read_entries_iter / read_partition_entries_iter / backfill) and increment the s3store.read.missing_data counter, then continue. The caller's slog handler decides what to do with the warning; metrics are picked up by any OTel-configured backend.

Every other GET error (throttle, network, auth, timeout) is still fatal on every path — silently dropping records on transient failure is worse than propagating.

Configuration

type StoreConfig[T any] struct {
    // S3 wiring (Bucket, Prefix, S3Client, PartitionKeyParts,
    // ConsistencyControl, MaxInflightRequests, MeterProvider) —
    // see the S3TargetConfig docstring for the full per-field
    // contract. Embedded so callers can build helpers against
    // *S3TargetConfig once and reuse them across StoreConfig,
    // WriterConfig (via NewS3Target), and ReaderConfig.
    S3TargetConfig

    // Required for Write
    PartitionKeyOf func(T) string  // derive key from record (Write)

    // Read-side dedup (used by every read path: Read / ReadIter /
    // ReadPartitionIter / ReadRangeIter / ReadPartitionRangeIter /
    // ReadEntriesIter / ReadPartitionEntriesIter / PollRecords).
    // Both or neither — explicit opt-in, no default. New rejects
    // partial config.
    EntityKeyOf func(T) string  // identifies a unique entity
    VersionOf   func(T) int64   // monotonic version per entity

    // CommitTimeout / MaxClockSkew are NOT StoreConfig fields. They're
    // persisted at <Prefix>/_config/commit-timeout and
    // <Prefix>/_config/max-clock-skew so writer and reader agree
    // by construction. Seed once via the boto3 snippet in
    // "Initializing a new dataset"; New(StoreConfig) GETs both
    // values at construction time and stamps them (plus the derived
    // SettleWindow = CommitTimeout + MaxClockSkew) on the Target.

    // Optional write-time column: if set, the writer populates
    // this field on T (must be `time.Time` with a non-empty,
    // non-"-" parquet tag like `parquet:"inserted_at"`) with
    // its wall-clock at write-start. The reader has no special
    // handling — the column round-trips on T like any parquet
    // field. Reference it from VersionOf to use the writer's
    // stamp as the dedup version:
    //   VersionOf: func(r T) int64 { return r.InsertedAt.UnixMicro() }
    InsertedAtField string

    // Parquet compression codec (default snappy).
    Compression CompressionCodec

    // Optional materialized views.
    MaterializedViews []MaterializedViewDef[T]
}

StoreConfig is the all-in-one form for New(StoreConfig[T]). Services that only write or only read can use the narrower WriterConfig[T] / ReaderConfig[T] directly with NewWriter / NewReader — see the Writer / Reader / Narrow-T reads section.
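A minimal construction sketch tying InsertedAtField to VersionOf. The Event type, its fields, the partition-key shape, and the assumption that InsertedAtField names the Go field are illustrative, not confirmed API:

type Event struct {
    ID         string    `parquet:"id"`
    Period     string    `parquet:"period"`
    Customer   string    `parquet:"customer"`
    InsertedAt time.Time `parquet:"inserted_at"` // stamped by the writer at write-start
}

store, err := s3store.New[Event](s3store.StoreConfig[Event]{
    S3TargetConfig: s3store.S3TargetConfig{
        Bucket:            "my-bucket",
        Prefix:            "events",
        S3Client:          s3Client,
        PartitionKeyParts: []string{"period", "customer"},
    },
    PartitionKeyOf: func(e Event) string {
        return "period=" + e.Period + "/customer=" + e.Customer
    },
    // Reader dedup: latest writer stamp wins per event ID.
    EntityKeyOf:     func(e Event) string { return e.ID },
    VersionOf:       func(e Event) int64 { return e.InsertedAt.UnixMicro() },
    InsertedAtField: "InsertedAt",
})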

Observability

The library emits OpenTelemetry metrics through the Metrics struct attached to every S3Target. Wiring is opt-in: pass a MeterProvider on S3TargetConfig, or — easier — set one globally with otel.SetMeterProvider(...) before constructing the target and the library picks it up automatically.

import (
    "go.opentelemetry.io/otel"
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

// One-time at process start.
otel.SetMeterProvider(sdkmetric.NewMeterProvider(
    sdkmetric.WithReader(prometheusReader), // or OTLP, etc.
))

target, err := s3store.NewS3Target(ctx, s3store.S3TargetConfig{
    Bucket: "my-bucket", Prefix: "events",
    S3Client: s3Client,
    PartitionKeyParts: []string{"period", "customer"},
    // MeterProvider not set → falls back to otel.GetMeterProvider().
})

Every observation carries s3store.bucket and s3store.prefix as constant attributes (so multi-target deployments can be split per target in dashboards), plus s3store.consistency_level when ConsistencyControl is non-empty. No bucket/key/partition-value attributes are added per call — cardinality stays bounded.

Instrument inventory

S3 ops (recorded at the wrapper layer in S3Target.put / get / existsLocked / listPage):

| Name | Kind | Unit |
| --- | --- | --- |
| s3store.s3.request.duration | histogram | s |
| s3store.s3.request.count | counter | 1 |
| s3store.s3.transient_error.count | counter | 1 |
| s3store.s3.request.attempts | histogram | 1 |
| s3store.s3.request.body.size | histogram | By |
| s3store.s3.response.body.size | histogram | By |

Per-op attributes: s3store.operation = put|get|head|list, s3store.outcome = success|error|canceled, plus error.type (precondition_failed|not_found|slowdown|server|client|transport|canceled|other) on non-success. s3store.s3.request.attempts carries only s3store.operation + s3store.outcome (it measures retry behavior, not terminal error class — keeping cardinality bounded).

s3store.s3.transient_error.count fires inside retry() on every transient failure (HTTP 5xx, 429 SlowDown, transport-layer) regardless of whether a retry follows or it's the terminal attempt. It closes the visibility gap on s3store.s3.request.count, which only carries error.type for terminal errors — so a 2-attempt success used to record only outcome=success, attempts=2 with no clue about what the failed attempt 1 saw. Labels on this counter: s3store.operation, error.type, and s3store.attempt (the index of the failed attempt — note the singular s3store.attempt here vs. the plural s3store.attempts on s3store.s3.request.count which carries the call's final attempt count). Query "share of requests hitting a transient cause X" as rate(s3store_s3_transient_error_count_total{error_type="slowdown"}[5m]) / rate(s3store_s3_request_count_total{s3store_attempts!="0"}[5m]).

Library methods (recorded at the public entry point of Write, WriteWithKey, LookupCommit, Read, ReadIter, ReadPartitionIter, ReadRangeIter, ReadPartitionRangeIter, ReadEntriesIter, ReadPartitionEntriesIter, Poll, PollRecords, MaterializedViewReader.Lookup, BackfillMaterializedView):

| Name | Kind | Unit |
| --- | --- | --- |
| s3store.method.duration | histogram | s |
| s3store.method.calls | counter | 1 |
| s3store.write.records | histogram | 1 |
| s3store.write.partitions | histogram | 1 |
| s3store.write.bytes | histogram | By |
| s3store.write.commit_after_timeout | counter | 1 |
| s3store.write.optimistic_commit.collisions | counter | 1 |
| s3store.read.records | histogram | 1 |
| s3store.read.bytes | histogram | By |
| s3store.read.files | histogram | 1 |
| s3store.read.partitions | histogram | 1 |
| s3store.read.missing_data | counter | 1 |
| s3store.read.malformed_refs | counter | 1 |
| s3store.read.commit_head | counter | 1 |
| s3store.read.commit_head_cache_hit | counter | 1 |

s3store.write.commit_after_timeout increments when the writer's elapsed time from refMicroTs (just before the ref PUT) to token-commit-PUT completion exceeded CommitTimeout. The write surfaces an error to the caller (commit is durable; the stream reader's SettleWindow may already have advanced past refMicroTs — a same-token retry recovers via the upfront HEAD). Pre-ref work (parquet encoding, marker PUTs, data PUT) is outside the budget by design.

s3store.write.optimistic_commit.collisions increments on each WithOptimisticCommit write where the conditional commit PUT was rejected because a prior <token>.commit already existed (412 PreconditionFailed under conditional PUT, or 403 AccessDenied under bucket-policy deny). The write recovers via a HEAD on the commit marker and returns the prior WriteResult unchanged; the orphan parquet + ref this attempt left behind are invisible to readers via the commit gate. Chart against s3store.method.calls{s3store.method="write"} to see the collision rate — past ~5% the option is a net loss vs. the upfront-HEAD path.

s3store.read.commit_head increments on every commit-marker HEAD that the read paths issue: snapshot reads HEAD only when ≥2 parquets share a token in one partition; stream reads HEAD once per uncached (partition, token) per poll; Writer.LookupCommit HEADs unconditionally. Carries s3store.method so dashboards can split by which path issued the HEAD. s3store.read.commit_head_cache_hit is the companion: increments when a stream-read HEAD was satisfied from the per-poll cache instead.

s3store.read.missing_data increments on NoSuchKey skips along the tolerant read paths (PollRecords, ReadRangeIter, ReadPartitionRangeIter, ReadEntriesIter, ReadPartitionEntriesIter, BackfillMaterializedView). Carries s3store.method so dashboards can split by which path produced the skip. Strict paths (Read, ReadIter, ReadPartitionIter) fail instead of recording.

s3store.read.malformed_refs increments when a ref object's filename fails to parse during a LIST on the ref stream. Skipped after a slog.Warn so consumers don't crash on a future schema or externally-written object — the counter makes the drift visible. Surfaced under s3store.method = poll because the LIST that hit it always runs in Poll, even when invoked indirectly by PollRecords / ReadRangeIter / ReadPartitionRangeIter.

s3store.read.partitions records the distinct Hive partitions touched per call on every method that funnels through the iter pipeline: Read / ReadIter / ReadPartitionIter / ReadRangeIter / ReadPartitionRangeIter / ReadEntriesIter / ReadPartitionEntriesIter / PollRecords. Not recorded for Poll (refs only — no decode), MaterializedViewReader.Lookup, Writer.LookupCommit, or BackfillMaterializedView. Mirrors s3store.write.partitions on the write side.

Write and WriteWithKey show up as distinct s3store.method values; Write's per-partition dispatch is internal and does not double-count.

Target-level state (per-Target semaphore):

| Name | Kind | Unit | What |
| --- | --- | --- | --- |
| s3store.target.inflight | up-down counter | 1 | currently holding a slot |
| s3store.target.waiting | up-down counter | 1 | currently blocked in acquire() (queue depth) |
| s3store.target.semaphore.wait.duration | histogram | s | wait time before a slot was granted |
| s3store.target.semaphore.acquires | counter | 1 | by outcome |

Fan-out (concurrency primitive used by writer/reader/matview work):

| Name | Kind | Unit | What |
| --- | --- | --- | --- |
| s3store.fanout.workers | histogram | 1 | worker goroutines per fan-out call |
| s3store.fanout.items | histogram | 1 | items per fan-out call |

Saturation = s3store.target.inflight / MaxInflightRequests. Sustained s3store.target.waiting > 0 means callers are queuing on the semaphore; raise MaxInflightRequests (cost: more concurrent S3 connections) or reduce upstream concurrency.

The AWS SDK v2 itself does not ship OTel metrics. If you also want per-SDK-attempt observation (e.g. to distinguish smithy retries from this library's outer retry() retries), attach a smithy middleware to your *s3.Client directly — the wrapper-level metrics above sit one layer outside that.

Tuning the Go runtime (GOMEMLIMIT / GOGC)

s3store is allocation-heavy on the read side: BenchmarkDecode in parquet_bench_test.go shows ~32× the input file size in bytes allocated per decode (a 20 MiB parquet body produces ~620 MB of garbage during deserialisation), with ~8 allocations per decoded record. Most of that lives inside parquet-go and isn't reachable from this library; the writer pool we ship cuts encode-side allocations ~70 % but doesn't move the decode floor.

For a service that GCs frequently — visible as long p99 latency spikes correlated with heap growth, or runtime/metrics /cpu/classes/gc/total:cpu-seconds above 10 % of total CPU — the two highest-leverage knobs are GOMEMLIMIT and GOGC. They are process-level Go-runtime settings, not s3store config; we document them here because the library's allocation profile is the reason they matter.

What they do

GOMEMLIMIT (Go 1.19+) — soft cap on heap. As live heap approaches it, GC runs more aggressively to stay under. Soft means Go can briefly exceed it; the cgroup OOM-killer remains the hard ceiling. Default is effectively unlimited (math.MaxInt64), so on a memory-constrained pod Go is unaware of the cgroup limit and can't pace GC against it.

GOGC (Go 1.5+) — relative trigger. GOGC=100 (default) means GC when heap is 2× live. GOGC=200 means 3× live. Higher values reduce GC frequency at the cost of more memory; lower values reduce memory at the cost of more GC. With GOMEMLIMIT set, the actual trigger becomes min(GOGC ratio, GOMEMLIMIT), so you can run with higher GOGC and rely on GOMEMLIMIT to prevent runaway.

Go does NOT auto-detect cgroup memory limits the way Go 1.25+ auto-detects CPU quota for GOMAXPROCS. You must set GOMEMLIMIT explicitly. If you want auto-detection, KimMachineGun/automemlimit reads the cgroup limit at startup — same shape as automaxprocs but for memory. Worth checking against current Go release notes for your version before assuming the situation hasn't changed.
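If you take the auto-detection route, the module is wired as a blank import (a sketch; the default ratio and behaviour are that project's, so check its docs for your version):

import (
    // Sets GOMEMLIMIT from the cgroup memory limit at init time,
    // leaving some headroom below the hard limit.
    _ "github.com/KimMachineGun/automemlimit"
)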

Suggested starting values

Set GOMEMLIMIT to ~80 % of the pod memory limit. The 20 % headroom covers non-heap memory (goroutine stacks, mmap, file handles, AWS-SDK connection pool, sidecar state if any).

| Pod memory limit | GOMEMLIMIT | GOGC |
| --- | --- | --- |
| 1Gi | 800MiB | 100–200 |
| 2Gi | 1600MiB | 200 |
| 4Gi | 3200MiB | 200–400 |
| 8Gi | 6400MiB | 400 |

Lower GOGC (more frequent GC) for tight memory; higher for spacious. Tune from this baseline against measured GC CPU and p99 latency under your real workload.

How to set them

In Kubernetes, both values are environment variables:

spec:
  containers:
  - name: my-service
    resources:
      limits:
        memory: "2Gi"
    env:
    - name: GOMEMLIMIT
      value: "1600MiB"
    - name: GOGC
      value: "200"

Or programmatically at startup (use environment variables preferentially — they're tunable without rebuild):

import "runtime/debug"

func main() {
    debug.SetMemoryLimit(1600 * 1024 * 1024)
    debug.SetGCPercent(200)
    // ...
}
Verifying the settings work

Three signals to chart:

  1. GC CPU fraction. What fraction of CPU is spent in GC? Goal: below 5 %; above 10 % means GC is stealing real work. With Go's runtime/metrics package wired into your meter:

    rate(go_cpu_classes_gc_total_cpu_seconds_total[5m])
      / rate(go_cpu_classes_total_cpu_seconds_total[5m])
    
  2. p99 latency on read paths. GC pauses surface here. Sub-millisecond pauses are the norm on Go 1.20+ when GOMEMLIMIT is sized correctly; visible spikes that correlate with heap growth indicate insufficient headroom.

  3. Heap headroom. go_memory_classes_heap_objects_bytes should sit at 60–80 % of GOMEMLIMIT in steady state. If it chronically pegs the limit, raise GOMEMLIMIT (and the pod limit if needed); if it stays well under, you can be more generous with GOGC.

Tuning loop
  1. Set GOMEMLIMIT to 80 % of pod memory limit. Always.
  2. Start with GOGC=200.
  3. Run under representative load. Watch GC CPU fraction and p99.
  4. GC CPU > 10 %: bump GOGC upward (less frequent GC). If heap usage stays well below GOMEMLIMIT, also raise GOMEMLIMIT toward 90 %.
  5. OOM kills: lower GOGC, lower GOMEMLIMIT. The cgroup limit should never be hit.
  6. p99 latency spiky: that's GC pause time. Try GOGC=off + GOMEMLIMIT-only — eliminates the ratio-triggered GC, only memory-pressure-triggered. Risky if your workload genuinely needs the ratio trigger; verify heap doesn't grow unbounded under sustained load before relying on this in production.
Gotchas
  • Don't set GOMEMLIMIT ≥ cgroup limit. Defeats the purpose; the OOM-killer hits before GC reacts.
  • Don't disable GC entirely with GOGC=off unless GOMEMLIMIT is also set. Without either trigger, the heap grows until OOM.
  • Sidecars consume the cgroup memory budget. envoy, fluent-bit, similar agents take their share. The pod limit covers them; your service's GOMEMLIMIT should account for sidecar usage, not assume the full pod limit.
  • GOMEMLIMIT is per-process. Multiple Go binaries in one pod each get their own limit; the sum should still fit the cgroup.
How this composes with the writer pool

The writer pool (commit 214052c and follow-ups) reduces allocation count and bytes per encode by ~50–70 %. Runtime tuning reduces GC pressure for the same allocation rate. They compose well: the pool produces less garbage per Write, and GOMEMLIMIT / GOGC let the runtime hold more between collections. For a write-heavy workload the pool is the bigger short-term lever; for a read-heavy workload, runtime tuning is — decode-side allocations live mostly inside parquet-go and aren't reachable from outside.

Migration from earlier versions

Breaking changes in the single-package collapse:

  • s3sql package and DuckDB removed. The SQL-on-Parquet half (DuckDB-backed Query returning *sql.Rows) is gone. If your workload needs SQL aggregation, use DuckDB directly (point its httpfs extension at the same bucket and read_parquet() over the data path) or pin an older release.

  • QueryOption renamed to ReadOption. The read-path option type now matches the verbs it configures (Read / ReadIter / ReadRangeIter). QueryOpts and WriteOpts were also unexported to readOpts / writeOpts — the structs are an internal implementation detail now, but []s3store.ReadOption and []s3store.WriteOption slices built from the exported With* functions continue to work unchanged.

  • WithIdempotentRead removed. The barrier was load-bearing only under single-writer-per-partition, and even there the upfront-HEAD-on-commit gate already short-circuits same-token retries via WithIdempotencyToken — recalculated bytes on a retry are discarded in favour of the prior commit's WriteResult. For retry-safe read-modify-write under WithIdempotencyToken, no read-side option is needed; for read-modify-write without tokens, callers must enforce single-writer themselves.

  • Config[T] renamed to StoreConfig[T]; S3 wiring moved to embedded S3TargetConfig. The flattened wiring fields (Bucket, Prefix, S3Client, PartitionKeyParts, ConsistencyControl, MaxInflightRequests) now live on the embedded S3TargetConfig value — single source of truth for the S3 wiring across StoreConfig, WriterConfig (via NewS3Target), and ReaderConfig. Side knobs (PartitionKeyOf, EntityKeyOf, VersionOf, Compression, InsertedAtField, MaterializedViews) stay at the top level.

    // Before
    s3store.Config[T]{
        Bucket: "...", Prefix: "...", S3Client: c,
        PartitionKeyParts:  []string{"period", "customer"},
        ConsistencyControl: s3store.ConsistencyStrongGlobal,
        PartitionKeyOf:     ...,
    }
    
    // After
    s3store.StoreConfig[T]{
        S3TargetConfig: s3store.S3TargetConfig{
            Bucket: "...", Prefix: "...", S3Client: c,
            PartitionKeyParts:  []string{"period", "customer"},
            ConsistencyControl: s3store.ConsistencyStrongGlobal,
        },
        PartitionKeyOf: ...,
    }
    

    Field-promotion still gives cfg.Bucket = "..." access for callers that mutate after construction.

  • s3parquet package collapsed into the root s3store package. The single-package layout is the new home for everything: drop the s3parquet/ import path and use github.com/ueisele/s3store directly. Every public symbol moved with the same name (Writer[T], Reader[T], S3Target, MaterializedViewDef[T], MaterializedViewReader[K], BackfillMaterializedView, etc.).

  • DuckDB-only umbrella fields removed from Config. TableAlias, VersionColumn, EntityKeyColumns, ExtraInitSQL, Store.Query, Store.Close, and Store.SQL are gone. Reader dedup uses EntityKeyOf + VersionOf (the parquet-side equivalent that already existed) — drop the SQL fields from your config and add the typed closures. Store no longer needs Close (no DuckDB connection to release).

  • internal/testutil package removed. The MinIO test fixture is now in fixture_test.go in root. Only the integration build tag and your own integration tests are affected.

  • DisableCleanup field removed from Config and WriterConfig. The library no longer DELETEs orphan data on marker / ref PUT failure paths — at-least-once on storage now means data files written to S3 stay until an operator-driven prune removes them (S3 lifecycle rule, or manual cleanup with readers quiesced). Drop the field from your config; service accounts that were granted s3:DeleteObject for the cleanup path can drop that permission too. Retries still work unchanged via the token + conditional-PUT path.

  • OnMissingData callback removed from Config and ReaderConfig; BackfillMaterializedView no longer takes an onMissingData parameter. Replaced with a built-in slog.Warn + s3store.read.missing_data OTel counter at the three tolerant call sites (PollRecords, ReadRangeIter, BackfillMaterializedView). The strict paths (Read, ReadIter) now fail with a wrapped NoSuchKey error instead of skip-and-notify — a caller retry resolves the LIST-to-GET race. Drop the callback from your config and the trailing argument from BackfillMaterializedView calls; configure your slog handler to route / count the warning, or alert on the counter via your metrics backend.

For anyone upgrading across multiple releases, the older breaking-change history (projections-in-config, consistency-on-target, S3Target-as-handle, idempotent-write, bloom-filter removal, s3sql-as-Reader, the Projection refactor, the Projection→MaterializedView rename, the package split) is preserved in git history — git log -- README.md.

Testing

# Unit tests — pure Go, no C compiler needed.
go test -count=1 ./...

# Integration tests — full round-trip against a MinIO container.
# Uses testcontainers; one container is shared across the
# invocation.
go test -tags=integration -timeout=10m -count=1 ./...

# Lint (gofmt + govet + project linters in one shot).
golangci-lint run ./...

Integration tests require Docker and pull a pinned pgsty/minio release on first run (see fixture_test.go). After upstream minio/minio was archived in Feb 2026, the community-maintained pgsty/minio fork continues the same code under AGPLv3; it's a drop-in for the testcontainers MinIO module. -count=1 is the Go idiom for "bypass the test cache" — without it, unchanged packages return cached results.

Benchmarks
go test -bench=. -benchmem -benchtime=2s -run=^$ -count=3 \
  -cpu=1 ./...

Three benches in parquet_bench_test.go, covering encode (no-pool baseline + pooled steady-state) and decode across five size tiers (few_KB / 100KB / 1MB / 10MB / 20MB) that match a typical workload distribution.

go test ./... and the integration-test gate both compile benchmark functions but never execute them — Go's runner only runs Benchmark* when -bench=<pattern> is passed. Benchmarks are a measurement tool you invoke deliberately, not a CI gate.

A few notes for fair runs:

  • -cpu=1 keeps sync.Pool measurements honest. The pool is per-P internally; at the default GOMAXPROCS, a fixed b.N divides across multiple P slots and obscures the pool-empty vs pool-warm comparison the benches are designed to surface.

  • -run=^$ skips test functions so the binary only runs benchmarks (saves ~1s of test setup per package).

  • -count=3 gives variance signal. For two-version comparisons, feed the output to benchstat:

    go test -bench=. -benchmem -count=10 -cpu=1 ./... | tee bench-new.txt
    benchstat bench-old.txt bench-new.txt
    

    Single-sample comparisons can be misleading on noisy hardware.

  • The Encode_Pooled allocations-per-op number is the most sensitive signal. A regression there indicates the pool is churning — most likely the EncodeBufPoolMaxBytes cap is below the workload's typical produced parquet size; check the s3store.write.encode_buf_dropped counter on a running process to confirm.

Releasing

Versions are git tags following SemVer. The README badges auto-update from the latest tag and pkg.go.dev — nothing to edit in this file.

# From a clean, pushed main:
git tag -a v0.2.0 -m "v0.2.0"
git push origin v0.2.0

Optionally mint GitHub release notes from the new tag:

gh release create v0.2.0 --title "v0.2.0" --generate-notes

--generate-notes auto-populates the body with commits since the previous tag.

Version-bump rules while pre-v1:

  • v0.x.0 → any new feature or API change, however small.
  • v0.x.y → bug fixes only, no API surface change.

Immutability: tags pushed to the public repo are cached immutably by Go's module proxy, so a bad tag can't be replaced — cut v0.x.(y+1) instead. Only delete a tag if nobody could have pulled it yet:

git tag -d v0.2.0
git push origin :refs/tags/v0.2.0

Reaching v1.0.0: when the API feels stable after real-world use, tag v1.0.0. After that, breaking changes require a v2.0.0 and a module path rename to github.com/ueisele/s3store/v2 — one-way door, so don't rush it.

Limitations

  • No SQL engine. Reads are typed Go (Read / ReadIter / ReadPartitionIter / ReadRangeIter / ReadPartitionRangeIter / ReadEntriesIter / ReadPartitionEntriesIter / PollRecords) — no aggregation, no joins. For SQL workloads, point DuckDB at the same bucket via its httpfs extension and read_parquet() over the data path.
  • S3 key limit: 1024 bytes. Long partition values reduce the budget.
  • Stream latency = poll interval + settle window. Not real-time.
  • Upsert-only compacted mode. There is no tombstone / key-delete mechanism — keys can only be updated, not removed.
  • Dedup is in-memory. Large key cardinality can OOM; partition the dataset finely enough that any single partition's distinct entities fit comfortably in RAM, or read history with WithHistory() and dedup yourself.
  • Schema evolution is limited to tolerant reads. "Column added to T that isn't in an old file" returns the Go zero value. Renames, splits, type changes, and row-level computed derivations require rewriting the affected files.

Appendix: S3 calls per method

Every method's S3-call breakdown, for capacity-planning, cost estimation, and operator dashboards. Counts are per call; multi-pattern / multi-partition variants fan out independently and deduplicate at the key level. All ops route through the per-target MaxInflightRequests semaphore, carry the configured ConsistencyControl header, and have their own retry budget (retryMaxAttempts = 5 with jittered backoffs drawn from 100-300 / 300-500 / 500-800 / 500-800 ms).

Write

Fresh write (auto-token, or WithIdempotencyToken / WithIdempotencyTokenOf with no prior commit):

| Op | Count | Key |
| --- | --- | --- |
| HEAD | 0–1 | <Prefix>/data/<partition>/<token>.commit (skipped on auto-token; 1× upfront on the idempotent path, returns 404) |
| PUT | M | <Prefix>/_matview/<name>/.../m.matview — one per distinct matview-marker key in the batch (M = 0 with no views registered) |
| PUT | 1 | <Prefix>/data/<partition>/<token>-<attemptID>.parquet (data) |
| PUT | 1 | <Prefix>/_ref/<refMicroTs>-<token>-<attemptID>;<hiveEsc>.ref (ref) |
| PUT | 1 | <Prefix>/data/<partition>/<token>.commit (token-commit, atomic visibility flip) |

Total: M+3 PUTs (auto-token) or 1 HEAD + M+3 PUTs (idempotent).

Idempotent retry (prior commit exists):

| Op | Count | Key |
| --- | --- | --- |
| HEAD | 1 | <Prefix>/data/<partition>/<token>.commit → 200, reconstruct WriteResult and return |

Total: 1 HEAD; no PUTs, no body re-upload.

Multi-partition Write (one logical call) fans out: each partition runs the full sequence in parallel, capped by MaxInflightRequests. Marker PUTs fan out across all partitions' combined marker set.

LookupCommit
| Op | Count | Key |
| --- | --- | --- |
| HEAD | 1 | <Prefix>/data/<partition>/<token>.commit |

200 → reconstructed WriteResult; 404 → ok=false; other → wrapped error.

Snapshot read

Read / ReadIter / ReadPartitionIter / ReadRangeIter (snapshot half) / ReadPartitionRangeIter (snapshot half) / ReadEntriesIter / ReadPartitionEntriesIter / BackfillMaterializedView share the same listing-and-gating skeleton:

| Op | Count | Key |
| --- | --- | --- |
| LIST | ≥1 per plan | <Prefix>/data/<plan-prefix>/ — one plan per pattern, paginated (~1000 keys/page); per-plan results unioned and key-deduplicated |
| HEAD | 0–T | <Prefix>/data/<partition>/<token>.commit — T = (partition, token) tuples with ≥2 parquets in the LIST (multi-attempt orphans). 0 in steady state |
| GET | N | <Prefix>/data/<partition>/<token>-<attemptID>.parquet — one per surviving parquet (post-commit-gate) |

Read materializes records into a slice; ReadIter / ReadRangeIter stream them via iterator with bounded memory (WithReadAheadPartitions / WithReadAheadBytes). ReadPartitionIter / ReadPartitionRangeIter yield one HivePartition[T] per partition (same LIST + GET shape; the partition is the unit of yield). ReadEntriesIter / ReadPartitionEntriesIter skip the LIST stage entirely — the caller passes a pre-resolved []StreamEntry (typically from Poll), so the GET stage runs directly. BackfillMaterializedView adds 1 PUT per derived marker per parquet (deduped within the parquet) on top of the GETs.

ReadRangeIter / ReadPartitionRangeIter walk the ref stream first (see Stream read below) to collect data-file paths, then run the GET stage above.

Snapshot read — MaterializedViewReader.Lookup

LIST-only. Marker objects are zero-byte; key parsing alone yields the view's column values.

Op Count Key
LIST ≥1 per plan <Prefix>/_matview/<name>/<plan-prefix>/

No GETs, no HEADs. The ref-stream LIST belongs to Poll; Lookup touches neither the data tree nor the ref stream.

Stream read — Poll
Op Count Key
LIST ≥1 <Prefix>/_ref/ — one paginated LIST starting at since, capped by maxEntries (≤1000 per page)
HEAD 0–U <Prefix>/data/<partition>/<token>.commit — U = unique (partition, token) tuples in the page; per-poll cache collapses repeats

No GETs. Returns refs only.

PollRecords = Poll + 1 GET per returned ref for the data file body (decoded via the same byte-budget pipeline as ReadIter).

ReadRangeIter / ReadPartitionRangeIter = one or more Poll cycles to walk the [since, until) window into a flat ref list, then 1 GET per data file through the iter pipeline.

ReadEntriesIter / ReadPartitionEntriesIter skip the ref-LIST walk entirely: the caller hands in []StreamEntry directly (typically from a prior Poll), so the only S3 ops are 1 GET per entry (plus the per-poll commit-cache HEADs, which the caller's earlier Poll already paid for).

License

See LICENSE.

Documentation

Overview

Package s3store provides append-only, versioned data storage on S3 with Parquet data files and a change stream. Pure Go, cgo-free. No server, no broker, no coordinator — S3 is the only dependency.

Capability sketch:

  • Write, WriteWithKey: encode []T as Parquet, PUT to S3, write an empty stream-ref file.
  • Read, ReadIter, ReadRangeIter: list Parquet files matching a key pattern, decode into []T, optionally dedup latest-per-entity in memory.
  • Poll, PollRecords: stream ref entries or records from the ref stream, starting after a caller-supplied offset.

Every snapshot read defaults to latest-per-key deduplication (configured via EntityKeyOf + VersionOf) and accepts WithHistory() to opt out.

Glob patterns are restricted to a deliberately narrow grammar: whole-segment "*", a single trailing "*" inside a value (e.g. "period=2026-03-*"), and half-open "FROM..TO" ranges. See validateKeyPattern for the full spec.

Deduplication is in-memory per partition; large key cardinalities can exceed available RAM.

Index

Constants

View Source
const CommitTimeoutAdvisory = 8 * time.Second

CommitTimeoutAdvisory is the soft floor below which loadTimingConfig emits a slog.Warn at construction. It bounds the retry envelope for the two PUTs that participate in the SettleWindow contract (ref + token-commit): each PUT can retry up to retryMaxAttempts (5) with jittered backoffs drawn from 100-300 / 300-500 / 500-800 / 500-800 ms (worst-case sum 2.4 s sleep per call), so two PUTs in the worst case spend ~4.8 s in retry sleep alone, plus actual request time. 8 s gives ~1.67× safety on that retry envelope. Operators on tightly-clocked dev/test deployments can set CommitTimeout below this and ignore the warning; production deployments should size CommitTimeout above the advisory so transient S3 retries cannot cause the SettleWindow contract to be missed.

View Source
const CommitTimeoutFloor = time.Millisecond

CommitTimeoutFloor is the lower bound enforced by loadDurationConfig: values strictly less than this are rejected with a "below the floor" error. Set to 1 ms — strictly positive (zero is rejected as "would cause every write to exceed the timeout") and small enough that test deployments can pick sub-second values without further plumbing. Production deployments should pick a value well above CommitTimeoutAdvisory: loadTimingConfig emits a slog.Warn when the configured value is below the advisory floor (the retry envelope of the ref-PUT + token-commit-PUT pair).

The historical 1 s floor existed because HTTP-date `Last-Modified` is second-precision and the dropped timeliness check could not resolve sub-second gaps; with LastModified out of the protocol, the writer's local elapsed bound is honest at any strictly positive value.

View Source
const MaxClockSkewFloor = time.Duration(0)

MaxClockSkewFloor is the minimum MaxClockSkew value the library accepts. Zero is a valid claim on tightly-clocked deployments (NTP-synced nodes typically run within milliseconds of each other); negative would be incoherent.

Variables

View Source
var ErrCommitAfterTimeout = errors.New(
	"write committed after CommitTimeout")

ErrCommitAfterTimeout is wrapped into the error returned by WriteWithKey / Write when the elapsed wall-clock from refMicroTs (captured just before the ref PUT) to token-commit-PUT completion exceeded the configured CommitTimeout. The write IS durable — data file, ref, and `<token>.commit` are all in S3, and snapshot reads (Read / ReadIter / ReadPartitionIter / MaterializedViewReader.Lookup / BackfillMaterializedView) see it — but a stream reader's SettleWindow (= CommitTimeout + MaxClockSkew) may already have advanced past refMicroTs by the time the commit became observable, so a stream reader past that offset will never re-visit it.

The returned *WriteResult is non-nil and reflects the durable commit; snapshot consumers can ignore the error. Stream consumers that need observability inside the SettleWindow should call RestampRef with the returned WriteResult — that writes an additional ref with a fresh refMicroTs so the next poll picks it up. Same-token retry of WriteWithKey does NOT recover stream observability: its upfront HEAD finds the prior commit and returns the original WriteResult unchanged without writing any new ref.

Use errors.Is(err, ErrCommitAfterTimeout) to distinguish from transient PUT failures (which return a nil *WriteResult).

Functions

func PartitionKeysOf added in v0.20.0

func PartitionKeysOf(entries []StreamEntry) []string

PartitionKeysOf extracts the distinct Hive partition keys from a slice of StreamEntries, returned in lex-ascending order. Useful for cross-store intersection workflows where each Store's entries must be filtered to a common partition set before passing to ReadEntriesIter / ReadPartitionEntriesIter:

entriesA, _ := storeA.PollRange(ctx, since, until)
entriesB, _ := storeB.PollRange(ctx, since, until)
common := intersect(
    PartitionKeysOf(entriesA), PartitionKeysOf(entriesB),
)
// Filter each Store's entries by `common`, then pass to
// the matching Store's ReadEntriesIter.

Empty input returns nil. Returned slice is freshly allocated; the caller may mutate it without affecting `entries`.

Types

type BackfillStats added in v0.2.0

type BackfillStats struct {
	DataObjects int
	Records     int
	Markers     int
}

BackfillStats reports the work BackfillMaterializedView did: how many parquet objects it scanned, how many records it decoded, and how many marker PUTs it issued. Markers is per-object, not globally deduplicated — a marker path produced by N parquet files is counted N times (reflects S3 request cost, not unique marker count). Useful for progress logging in a migration job.

func BackfillMaterializedView added in v0.25.0

func BackfillMaterializedView[T any](
	ctx context.Context,
	target S3Target,
	def MaterializedViewDef[T],
	keyPatterns []string,
	until time.Time,
) (stats BackfillStats, err error)

BackfillMaterializedView scans existing parquet data and writes view markers for every record already present. The normal path is to wire materialized views via WriterConfig.MaterializedViews / Config.MaterializedViews before the first Write; BackfillMaterializedView is the relief valve for records written before the view existed.

keyPatterns use the grammar from validateKeyPattern, evaluated against target.PartitionKeyParts() (NOT the view's Columns) — backfill walks parquet data files, which are keyed by partition. "*" covers everything; shard across partitions to parallelize a migration. Overlapping patterns are deduplicated, so each parquet file is scanned at most once.

until is an exclusive upper bound on data-file LastModified. Typical use: until = deployTime_of_live_writer, so backfill covers historical gaps while the live writer covers everything from deploy onward. Pass time.Time{} (the zero value) to cover every file currently present (redundant with the live writer but harmless — PUT is idempotent).

On a data-file GET returning NoSuchKey (LIST-to-GET race, operator-driven prune, or lifecycle deletion), the file is skipped: a slog.Warn names the path, the s3store.read.missing_data counter is incremented with method=backfill, and the backfill continues. Failing the whole backfill on one missing file would force a full restart of a long-running operator job.

Safe to run concurrently with a live writer (S3 PUT is idempotent) and safe to retry after a crash. Empty patterns slice is a no-op: (BackfillStats{}, nil). First malformed pattern fails with its index.
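
A minimal backfill sketch. The view definition, the Order record type (assumed to carry a parquet-tagged string field "sku"), and deployTime (the live writer's deploy timestamp) are caller-supplied illustrations, not part of the library:

skuView := MaterializedViewDef[Order]{
    Name:    "by_sku",
    Columns: []string{"sku"},
}
stats, err := BackfillMaterializedView(
    ctx, store.Target(), skuView, []string{"*"}, deployTime,
)
if err != nil {
    return err
}
log.Printf("backfill: %d objects scanned, %d records decoded, %d marker PUTs",
    stats.DataObjects, stats.Records, stats.Markers)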

type CompressionCodec added in v0.4.0

type CompressionCodec string

CompressionCodec selects the parquet-level compression applied to every column on Write. String-valued for easy config/YAML wiring; mapped to the parquet-go codec at Store construction time. Zero value ("") resolves to snappy — the de-facto ecosystem default (Spark, Trino, Athena all emit snappy unless told otherwise).

const (
	// CompressionSnappy: fast encode/decode, ~2-3× ratio,
	// negligible CPU overhead. Default.
	CompressionSnappy CompressionCodec = "snappy"
	// CompressionZstd: better ratios than snappy at higher CPU
	// cost. Good for cold / archive data.
	CompressionZstd CompressionCodec = "zstd"
	// CompressionGzip: widely compatible, moderate CPU, decent
	// ratio. Mostly a legacy choice today.
	CompressionGzip CompressionCodec = "gzip"
	// CompressionUncompressed: no compression. Largest files;
	// only meaningful when the data is already high-entropy or
	// the CPU tradeoff matters more than S3 cost.
	CompressionUncompressed CompressionCodec = "uncompressed"
)

type ConsistencyLevel added in v0.14.0

type ConsistencyLevel string

ConsistencyLevel is the value passed in the Consistency-Control HTTP header on S3 requests that require stronger-than-default consistency. NetApp StorageGRID defines the header; AWS S3 and MinIO ignore it. Declared as a string-alias enum so typos are caught at compile time (via the named constants) while leaving room for backends that add new levels without a library update (ConsistencyLevel("future-level") still compiles and passes through verbatim).

Configured on S3TargetConfig.ConsistencyControl, applied uniformly to every correctness-critical S3 call routed through the target — data PUTs (idempotent and unconditional), ref PUTs, matview marker PUTs, GETs, HEADs, and every LIST (partition LIST, marker LIST, ref-stream LIST in Poll, scoped retry-LIST in findExistingRef). Setting it on the target rather than per-config struct enforces NetApp's "same consistency for paired operations" rule by construction. See the README's "StorageGRID consistency" section for the full matrix.

const (
	// ConsistencyDefault is the zero value: no header sent. The
	// bucket's default consistency applies. Correct on AWS S3 and
	// MinIO (strongly consistent out of the box); the per-op
	// opt-in for StorageGRID is one of the stronger levels below.
	ConsistencyDefault ConsistencyLevel = ""

	// ConsistencyAll requires every storage node to acknowledge.
	// Highest durability / visibility guarantee; highest latency.
	ConsistencyAll ConsistencyLevel = "all"

	// ConsistencyStrongGlobal ensures read-after-write and list-
	// after-write consistency across every site. Safe multi-site
	// choice for idempotency correctness.
	ConsistencyStrongGlobal ConsistencyLevel = "strong-global"

	// ConsistencyStrongSite ensures read-after-write and list-
	// after-write consistency within a single site. Cheaper than
	// ConsistencyStrongGlobal when the deployment is single-site
	// or all readers/writers co-locate.
	ConsistencyStrongSite ConsistencyLevel = "strong-site"

	// ConsistencyReadAfterNewWrite is StorageGRID's default: GET
	// of a newly-PUT object is strongly consistent, but LIST and
	// overwrite-PUT visibility is eventual. Insufficient for
	// Phase 3's scoped-LIST ref dedup — pick a stronger level if
	// idempotency guarantees matter on StorageGRID.
	ConsistencyReadAfterNewWrite ConsistencyLevel = "read-after-new-write"

	// ConsistencyAvailable is the weakest level: any replica may
	// answer. Use only when availability trumps correctness.
	ConsistencyAvailable ConsistencyLevel = "available"
)

type HivePartition added in v0.20.0

type HivePartition[T any] struct {
	Key  string
	Rows []T
}

HivePartition is the canonical (Key, Rows) value type for per-partition reads and writes. Key is the partition-key string ("period=X/customer=Y") matching what WriteWithKey takes as input.

Read-side: ReadPartitionIter / ReadPartitionRangeIter yield one HivePartition per Hive partition with Rows already sort+dedup'd (per-partition latest-per-entity by default; opt out with WithHistory for replica-only collapse). Yield order is lex-ascending by Key. Within Rows, order depends on dedup configuration: when EntityKeyOf is set, records are in (entity, version) ascending order under both dedup paths (the WithHistory replica path still sorts by the same comparator). When EntityKeyOf is nil (no dedup configured), records emit in decode order: file lex order, then parquet row order within each file.

Write-side: GroupByPartition returns []HivePartition[T] lex-ordered by Key, with per-partition Rows in input (insertion) order. No sort or dedup runs on the write side — the writer doesn't have an EntityKeyOf to compare on.

Both sides share the deterministic-emission contract: same input produces byte-identical HivePartition slices across calls. See "Deterministic emission order across read and write paths" in CLAUDE.md.

type Layout added in v0.14.0

type Layout struct {
	// Time is the layout string passed to time.Time.Format for
	// any column matching a time.Time field on T. Examples:
	// time.RFC3339, "2006-01-02" (date-only), "2006-01" (month).
	// Empty + a time column on T → error at NewWriter.
	//
	// time.Time.Format uses the value's own zone; call .UTC()
	// upstream if you want zone-stable keys.
	Time string
}

Layout configures auto-projection formatting for non-string fields on T. Each field is the layout for one type family; empty means "auto-projection refuses this type — set the field, or write Of explicitly."

Layout choices are part of the marker S3 key — once published, changing the layout orphans every prior marker. Pick a stable format up front (use BackfillMaterializedView if you ever need to migrate).

type MaterializedViewDef added in v0.25.0

type MaterializedViewDef[T any] struct {
	// Name identifies the view under the target's
	// <Prefix>/_matview/ subtree. Required. Must be non-empty,
	// free of '/', and unique across MaterializedViewDef[T] entries
	// on the same writer.
	Name string

	// Columns lists the view's column names in the order they
	// appear in the S3 path. Same ordering rules as
	// MaterializedViewLookupDef.Columns (most-selective first for
	// LIST pruning).
	Columns []string

	// Of extracts the per-record column values, in Columns order,
	// as a slice. Returning (nil, nil) skips the record (no marker
	// emitted). A non-nil values slice must have length equal to
	// len(Columns); each entry must satisfy
	// validateHivePartitionValue.
	//
	// Of is optional: nil → reflection. The library walks T's
	// parquet-tagged fields and, for every Columns entry, picks
	// the matching field. String fields project directly;
	// time.Time fields require Layout.Time to be set. T may
	// carry parquet tags not in Columns — those are ignored.
	//
	// Provide a custom Of when the auto-projection isn't enough
	// (per-column time formats, derived values, computed strings):
	//
	//	Of: func(r Record) ([]string, error) {
	//	    return []string{r.SKU, r.At.Format(time.RFC3339)}, nil
	//	}
	Of func(T) ([]string, error)

	// Layout configures how non-string parquet-tagged fields on T
	// are formatted into path segments when Of is nil. Honored
	// only by the auto-projection path; an explicit Of bypasses
	// Layout entirely.
	Layout Layout
}

MaterializedViewDef is the write-side definition of a materialized view. Pass a slice of these on WriterConfig.MaterializedViews / Config.MaterializedViews at construction; the writer iterates each registered view per record, builds the marker S3 key from Columns + Of's return, and PUTs one empty marker per distinct key.

The previous late-binding RegisterMaterializedView API is gone: views are part of the writer's construction, so registration cannot race with Write and "registered after first Write" is not a reachable state.
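
For illustration, a write-side definition wired at construction — the view name, column names, and the Order type (with parquet-tagged "sku" string and "day" time.Time fields) are hypothetical:

views := []MaterializedViewDef[Order]{{
    Name:    "by_sku_day",
    Columns: []string{"sku", "day"},
    // Of left nil: auto-projection reflects Order's parquet-tagged
    // "sku" and "day" fields; "day" is formatted via Layout.Time.
    Layout: Layout{Time: "2006-01-02"},
}}

Pass views on WriterConfig.MaterializedViews / StoreConfig.MaterializedViews before the first Write.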

type MaterializedViewLookupDef added in v0.25.0

type MaterializedViewLookupDef[K comparable] struct {
	// Name identifies the view under the target's
	// <Prefix>/_matview/ subtree. Required. Must be non-empty
	// and free of '/'.
	Name string

	// Columns lists the view's column names in the order
	// they appear in the S3 path. Earlier columns form a narrower
	// LIST prefix when Lookup specifies them literally. Pick the
	// order based on how queries filter: most-selective first.
	Columns []string

	// From projects a Lookup-result values slice (one entry per
	// Columns position, in declared order) onto a typed K. Nil →
	// reflection: every parquet-tagged field on K must match
	// exactly one Columns entry. String fields are set directly;
	// time.Time fields are parsed via time.Parse(Layout.Time, ...).
	// Use a custom From when K has no parquet tags or when extra
	// validation is needed.
	//
	// values length is guaranteed to equal len(Columns); custom
	// From implementations may rely on positional access (e.g.
	// values[0] for Columns[0]).
	//
	// Errors are propagated by Lookup with the view name as
	// wrapping context.
	From func(values []string) (K, error)

	// Layout configures how non-string parquet-tagged fields on K
	// are parsed from path segments when From is nil. Honored
	// only by the auto-binding path; an explicit From bypasses
	// Layout entirely.
	//
	// For correctness, Layout.Time on the read side MUST match
	// MaterializedViewDef.Layout.Time on the write side — drift
	// produces silently wrong values or parse errors. Define one
	// constant and reuse it on both sides.
	Layout Layout
}

MaterializedViewLookupDef is the read-side definition of a materialized view. Build a MaterializedViewReader[K] from it via NewMaterializedViewReader when a service queries an existing materialized view (dashboard, query API).

Name + Columns identify the view in S3; From projects the per-marker column values back into a typed K. From is optional — when nil, the library reflects K's parquet-tagged string fields against Columns and assembles K from the values slice. Provide a custom From for K's that have no parquet tags or when extra validation/transformation is needed.

type MaterializedViewReader added in v0.25.0

type MaterializedViewReader[K comparable] struct {
	// contains filtered or unexported fields
}

MaterializedViewReader is the typed read-handle for a materialized view. Build via NewMaterializedViewReader from a S3Target + MaterializedViewLookupDef[K]. Lookup issues LIST only — no parquet reads.

func NewMaterializedViewReader added in v0.25.0

func NewMaterializedViewReader[K comparable](
	target S3Target, def MaterializedViewLookupDef[K],
) (*MaterializedViewReader[K], error)

NewMaterializedViewReader builds a query handle for a materialized view. Validates Name + Columns and, when def.From is nil, K's parquet tags against Columns. With a custom From, only the def-level fields are validated — the caller is responsible for producing valid K's.

func (*MaterializedViewReader[K]) Lookup added in v0.25.0

func (i *MaterializedViewReader[K]) Lookup(
	ctx context.Context, patterns []string,
) (out []K, err error)

Lookup returns every K whose marker matches any of the given key patterns. Patterns use the grammar from validateKeyPattern, evaluated against Columns. Pass multiple patterns when the target set isn't a Cartesian product (e.g. (sku=A, customer=X) and (sku=B, customer=Y) but not the off-diagonal pairs); overlapping patterns are deduplicated at the marker-key level, so each distinct marker contributes at most one entry. With the default reflection binder (or any non-lossy custom From) that means no duplicate K's. A custom From that drops a column — e.g. Columns=[sku, customer] but K holds only SKU — collapses distinct markers to equal K's; the returned slice will contain duplicates in that case.

Read-after-write: the marker LIST inherits the target's ConsistencyControl, so every marker the writer has already published is visible. Unlike Poll there is no SettleWindow filter.

Results are unbounded — narrow the patterns if a view has millions of matching markers. Empty patterns slice returns (nil, nil); a malformed pattern fails with the offending view.
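
A lookup sketch under assumed names — the "by_sku" view, its "sku" column, and the SKUKey type are illustrative:

type SKUKey struct {
    SKU string `parquet:"sku"`
}

mv, err := NewMaterializedViewReader(target, MaterializedViewLookupDef[SKUKey]{
    Name:    "by_sku",
    Columns: []string{"sku"},
})
if err != nil {
    return err
}
keys, err := mv.Lookup(ctx, []string{"sku=ABC-*"}) // trailing-* value pattern
if err != nil {
    return err
}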

type Offset

type Offset string

Offset represents a position in the stream. Use the empty string Offset("") as the unbounded sentinel: as `since` it means stream head; as the upper bound (via WithUntilOffset) it means walk to the live tip (now - SettleWindow) as of the call. To keep up with new writes, call again from the last offset. ReadRangeIter takes time.Time bounds and uses time.Time{} as its own unbounded sentinel.

type PollOption added in v0.14.0

type PollOption func(*pollOpts)

PollOption configures Poll / PollRecords. Separate from ReadOption (which serves the snapshot read paths) so each option type only carries knobs its read path actually honours — no "ignored on this path" footguns.

func WithUntilOffset

func WithUntilOffset(until Offset) PollOption

WithUntilOffset bounds Poll / PollRecords from above: only entries with offset < until are returned (half-open range). Pair with Reader.OffsetAt to read records in a time window. Zero-value offset disables the bound.

type ReadOption added in v0.18.0

type ReadOption func(*readOpts)

ReadOption configures read-path behavior. Shared across snapshot reads (Read / ReadIter / ReadRangeIter); Poll and PollRecords have their own PollOption type — the option spaces don't overlap.

func WithHistory

func WithHistory() ReadOption

WithHistory disables latest-per-key deduplication on any read path. Without it, reads are deduped by EntityKeyOf + VersionOf (latest per entity per partition); with it, every version of every record is returned.

When no dedup rule is configured (EntityKeyOf or VersionOf nil), dedup is a no-op regardless of this option.

func WithReadAheadBytes added in v0.14.0

func WithReadAheadBytes(n int64) ReadOption

WithReadAheadBytes caps the cumulative uncompressed parquet bytes that may sit decoded in the ReadIter / ReadRangeIter pipeline ahead of the current yield position. Zero (default) disables the cap; only WithReadAheadPartitions binds.

Composes with WithReadAheadPartitions — both are evaluated and whichever cap binds first holds the producer back. Useful when partition sizes are skewed: a tiny WithReadAheadPartitions(1) is too conservative for many small partitions but a larger value risks OOM on a few large ones; a byte cap auto-tunes across both.

The byte total is read from each parquet file's footer (total_uncompressed_size summed across row groups), so the cap is exact, not a heuristic. Decoded Go memory typically runs 1–2× the uncompressed size depending on data shape.

Per-partition guarantee: if a single partition's uncompressed size exceeds the cap, that one partition still decodes (the cap can't be enforced below the partition granularity without row-group-level streaming). The cap only prevents *additional* partitions from joining the buffer.

func WithReadAheadPartitions added in v0.11.0

func WithReadAheadPartitions(n int) ReadOption

WithReadAheadPartitions tells ReadIter / ReadRangeIter how many partitions to buffer ahead of the current yield position. Default (option not supplied) is 1 — minimum useful lookahead so decode of partition N+1 overlaps yield of partition N. Pass a larger value for more aggressive prefetch on consumers that do non-trivial per-record work; combine with WithReadAheadBytes to bound stacking of skewed-size partitions.

n=0 is the explicit-no-buffer mode: unbuffered handoff between decoder and yield loop. The decoder still works on partition N+1 concurrently with the yield loop emitting N (the handoff just blocks the decoder briefly), but two decoded partitions never sit in memory at once. Useful when records are large and the consumer's per-record work is fast — the byte cap is then the only memory regulator.

Negative values are floored to 0.

Each buffered partition holds its decoded records in memory until the yield loop consumes them. Memory: O((n+1) partitions) — current + n prefetched.
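
Combining both caps is the usual shape for skewed partition sizes; the key pattern, the 256 MiB figure, and the handle consumer below are illustrative only:

for rec, err := range reader.ReadIter(ctx,
    []string{"period=2026-04/*"},
    WithReadAheadPartitions(4),  // prefetch up to four partitions ahead of the yield position
    WithReadAheadBytes(256<<20), // but stop prefetching once ~256 MiB uncompressed is buffered
) {
    if err != nil {
        return err
    }
    handle(rec) // hypothetical per-record consumer
}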

type Reader added in v0.14.0

type Reader[T any] struct {
	// contains filtered or unexported fields
}

Reader is the read-side half of a Store. Owns Read / ReadIter / ReadRangeIter / Poll / PollRecords / OffsetAt. Construct directly via NewReader in read-only services, or via NewReaderFromWriter / NewReaderFromStore for a (possibly narrower-T) Reader over a Writer's / Store's data.

func NewReader added in v0.14.0

func NewReader[T any](cfg ReaderConfig[T]) (*Reader[T], error)

NewReader constructs a Reader directly from ReaderConfig. Intended for read-only services that have no write-side config to supply (no PartitionKeyOf / Compression).

Validates EntityKeyOf and VersionOf are both set or both nil.

func NewReaderFromStore added in v0.14.0

func NewReaderFromStore[T, U any](
	s *Store[U], cfg ReaderConfig[T],
) (*Reader[T], error)

NewReaderFromStore is NewReaderFromWriter with a Store[U] instead of a Writer[U] — extracts the embedded Writer and forwards. Common shape: one Store writes FullRec, a few NewReaderFromStore calls produce narrow Readers for hot-path reads without respecifying the shared config.

func NewReaderFromWriter added in v0.14.0

func NewReaderFromWriter[T, U any](
	w *Writer[U], cfg ReaderConfig[T],
) (*Reader[T], error)

NewReaderFromWriter constructs a Reader[T] over the data a Writer[U] produces. T may equal U (same-shape read) or differ from U — typically a narrower struct that omits heavy write-only columns (parquet-go skips unlisted columns on decode). The Writer's Target overrides whatever Target cfg carries (or doesn't), so the resulting Reader inherits the writer's ConsistencyControl automatically; read-side knobs (EntityKeyOf, VersionOf) come from cfg.

Dedup closures (EntityKeyOf / VersionOf) on the Writer are typed over U and cannot be auto-transformed to T; the caller re-declares them in cfg when dedup is needed.

func (*Reader[T]) OffsetAt added in v0.14.0

func (s *Reader[T]) OffsetAt(t time.Time) Offset

OffsetAt returns the stream offset corresponding to wall-clock time t: any ref written at or after t sorts >= the returned offset, any ref written before t sorts <. Pure computation — no S3 call. Pair with WithUntilOffset on Poll / PollRecords to read records within a time window — or use ReadRangeIter, which takes time.Time bounds directly.
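
A time-windowed poll sketch; reader, windowStart, and windowEnd are assumed caller values:

since := reader.OffsetAt(windowStart)
until := reader.OffsetAt(windowEnd)
entries, _, err := reader.Poll(ctx, since, 1000, WithUntilOffset(until))
if err != nil {
    return err
}
// entries: refs stamped inside the window, per the Poll + WithUntilOffset contract.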

func (*Reader[T]) Poll added in v0.14.0

func (s *Reader[T]) Poll(
	ctx context.Context,
	since Offset,
	maxEntries int32,
	opts ...PollOption,
) (out []StreamEntry, nextOffset Offset, err error)

Poll returns up to maxEntries stream entries (refs only) after the given offset, capped at now - SettleWindow to avoid races with in-flight writes. One LIST call against the ref stream plus one HEAD per ref against `<token>.commit` (collapsed by a per-poll cache when refs share a token); no parquet GETs. Returns (entries, nextOffset, error); checkpoint nextOffset and pass it as `since` on the next call.

Refs whose `<token>.commit` is missing (404) are skipped: by the time the ref clears the SettleWindow cutoff, the writer has either committed (200) or returned an error to its caller (no commit will land). Refs whose commit's `attemptid` doesn't match the ref's attempt-id are also skipped — they're orphans from a failed-mid-write retry under the same token.

Pass WithUntilOffset to bound the walk from above so long streams aren't scanned past the window of interest.

nextOffset advances over every ref the LIST visits, including ones the gate skips. Once a ref's refMicroTs is past the SettleWindow cutoff, its commit outcome is final — re-walking it on the next poll wouldn't surface anything new.
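
A checkpointed consumer loop might look like the sketch below; loadCheckpoint, saveCheckpoint, handleEntries, and pollInterval are caller-supplied:

offset := loadCheckpoint() // Offset("") = stream head on first run
for {
    entries, next, err := reader.Poll(ctx, offset, 500)
    if err != nil {
        return err
    }
    if len(entries) > 0 {
        handleEntries(entries) // process, then persist the new offset
    }
    saveCheckpoint(next)
    offset = next
    if len(entries) == 0 {
        time.Sleep(pollInterval) // caught up; wait before polling again
    }
}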

func (*Reader[T]) PollRange added in v0.20.0

func (s *Reader[T]) PollRange(
	ctx context.Context, since, until time.Time,
) ([]StreamEntry, error)

PollRange drains the ref stream over the [since, until) wall-clock window in one call, returning every StreamEntry gated by the commit-marker (same gating Poll applies). Pages internally at s3ListMaxKeys; one Poll per page until exhausted.

Zero time.Time on either bound means unbounded: since=zero starts at the stream head, until=zero walks to the live tip (now - SettleWindow, captured at call entry so the upper bound stays stable under concurrent writes). Same time-bound semantics as ReadRangeIter.

Use to enumerate refs / partitions before deciding scope, intersect partition sets across Stores for a filtered zip, or feed metadata into custom decode workflows. Each StreamEntry carries the partition Key, DataPath, RefPath, InsertedAt, and RowCount — everything a caller needs to inspect or pass downstream.

Memory: O(refs in range × StreamEntry size). For huge ranges chunk by smaller windows or use ReadRangeIter (streaming) / PollRecords (cursor-based) instead.

Order: refs in time order (refMicroTs lex-ascending, same as Poll). Partition emission order on snapshot reads is lex-ascending by partition Key — re-group via HivePartition.Key if you want partition-grouped output.

func (*Reader[T]) PollRecords added in v0.14.0

func (s *Reader[T]) PollRecords(
	ctx context.Context,
	since Offset,
	maxEntries int32,
	opts ...PollOption,
) (out []T, nextOffset Offset, err error)

PollRecords returns typed records from the files referenced by up to maxEntries refs after the offset, plus the next offset for checkpointing. Cursor-based, CDC-style: caller resumes from the returned offset on the next call.

Replica-dedup only: records sharing (entity, version) collapse to one (rare retries / zombies); distinct versions of the same entity all flow through. Latest-per-entity dedup is NOT offered — meaningless on a cursor since the next batch may carry a newer version of the same entity. For latest-per-entity, use Read or ReadRangeIter (snapshot-style).

Records emit in partition-lex order, then per-partition (entity, version) order within each. Cross-partition temporal ordering within a batch is NOT preserved — refs are still walked in time order to advance nextOffset, but record decode runs through the same per-partition pipeline as ReadIter / ReadRangeIter, so consumers needing wall-clock ordering across partitions must re-sort by their own timestamp field. The "don't miss records" property (correct nextOffset advancement) is unaffected.

Per-partition replica-dedup precondition: EntityKeyOf must be fully determined by the partition key — same as ReadIter. Replicas of the same (entity, version) always live in the same Hive partition under the precondition, so per-partition replica dedup is correctness-equivalent to global replica dedup.

func (*Reader[T]) Read added in v0.14.0

func (s *Reader[T]) Read(
	ctx context.Context, keyPatterns []string, opts ...ReadOption,
) (out []T, err error)

Read returns all records whose data files match any of the given key patterns. Patterns use the grammar described in validateKeyPattern; pass multiple when the target set isn't a Cartesian product (e.g. (period=A, customer=X) and (period=B, customer=Y) but not the off-diagonal pairs).

When EntityKeyOf and VersionOf are configured, the result is deduplicated per Hive partition to the latest version per entity (pass WithHistory to opt out). Correctness requires EntityKeyOf to be fully determined by the partition key so no entity ever spans partitions — same precondition as ReadIter. Overlapping patterns are safe — each parquet file is fetched and decoded at most once.

Records emit in partition-lex order with per-partition (entity, version) order within each. All records are buffered before return — for unbounded reads, use ReadIter or ReadPartitionIter instead. Empty patterns slice returns (nil, nil); a malformed pattern fails with the offending index.

On NoSuchKey: Read fails (LIST-to-GET race is rare enough that surfacing it as an error is more honest than silently skipping, and the caller's retry resolves it).
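
For example, a non-Cartesian target set reads with two explicit patterns (the partition values are illustrative):

rows, err := reader.Read(ctx, []string{
    "period=2026-03/customer=acme",
    "period=2026-04/customer=globex",
})
if err != nil {
    return err
}
// rows: latest-per-entity within each partition (default dedup).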

func (*Reader[T]) ReadEntriesIter added in v0.20.0

func (s *Reader[T]) ReadEntriesIter(
	ctx context.Context, entries []StreamEntry, opts ...ReadOption,
) iter.Seq2[T, error]

ReadEntriesIter streams records from a pre-resolved []StreamEntry — typically the output of PollRange or Poll — as an iter.Seq2[T, error]. Skips the LIST + commit-gate phase that ReadIter / ReadRangeIter do internally; useful when a caller already enumerated refs (for inspection, filtering, or cross-store coordination) and wants to decode them without paying the resolution cost a second time.

Same per-partition dedup, byte-budget, and read-ahead semantics as ReadRangeIter. Tolerant of NoSuchKey: an operator-driven prune between resolution and decode is logged + counted via s3store.read.missing_data and the affected file is skipped, so the consumer keeps advancing.

Cross-store safety: the iter validates upfront that every entry's DataPath belongs to this Reader's prefix (<prefix>/data/...). Passing entries from a different Store yields a wrapped error before any S3 traffic — entries cannot be silently mis-routed across Stores. Cross-bucket misuse (same prefix, different bucket) bypasses this check but fails loudly via NoSuchKey because the receiving bucket doesn't have those exact keys (UUIDv7 in the data path makes accidental hits effectively impossible).

Empty entries slice yields nothing without error. Records emit in partition-lex order (the entries are re-grouped into the same per-partition pipeline ReadIter / ReadRangeIter use).

func (*Reader[T]) ReadIter added in v0.14.0

func (s *Reader[T]) ReadIter(
	ctx context.Context, keyPatterns []string, opts ...ReadOption,
) iter.Seq2[T, error]

ReadIter returns an iter.Seq2[T, error] yielding records one at a time, streaming partition-by-partition. Use when Read's O(records) memory is a problem.

Dedup is per-partition (uniform across every read path now): correct only when the partition key strictly determines every component of EntityKeyOf so no entity ever spans partitions. For layouts that don't satisfy this invariant, pass WithHistory and dedup yourself.

Partitions emit in lex order. Within a partition, record order depends on dedup configuration: when EntityKeyOf is set, records are in (entity, version) ascending order — last-wins on tied versions for default dedup, first-wins per (entity, version) group for WithHistory replica dedup. When EntityKeyOf is nil (no dedup configured), records emit in decode order: file lex order, then parquet row order within each file.

Memory: O(one partition's records) by default. Tune with WithReadAheadPartitions (default 1; overlap decode of N+1 with yield of N) and/or WithReadAheadBytes (uncompressed-size cap).

Breaking out of the for-range loop cancels in-flight downloads — no manual Close. Empty patterns slice yields nothing; a malformed pattern surfaces as the iter's first error.

func (*Reader[T]) ReadPartitionEntriesIter added in v0.20.0

func (s *Reader[T]) ReadPartitionEntriesIter(
	ctx context.Context, entries []StreamEntry, opts ...ReadOption,
) iter.Seq2[HivePartition[T], error]

ReadPartitionEntriesIter is the partition-grouped variant of ReadEntriesIter: streams records from a pre-resolved []StreamEntry as one HivePartition[T] per Hive partition.

Use when the consumer already has the entries (typically from PollRange) AND wants per-partition batches for joins, group-bys, or zip-by-partition workflows across multiple Stores.

Same per-partition dedup, byte-budget, read-ahead, and tolerant-NoSuchKey semantics as ReadPartitionRangeIter. Same upfront cross-store guard as ReadEntriesIter — every entry's DataPath must belong to this Reader's prefix or the iter yields a wrapped error before any S3 traffic.

Empty entries slice yields nothing without error. Partitions emit in lex order of HivePartition.Key.

func (*Reader[T]) ReadPartitionIter added in v0.20.0

func (s *Reader[T]) ReadPartitionIter(
	ctx context.Context, keyPatterns []string, opts ...ReadOption,
) iter.Seq2[HivePartition[T], error]

ReadPartitionIter is the partition-grouped variant of ReadIter: instead of yielding records one at a time, it yields one HivePartition[T] per Hive partition with all of that partition's records collected as Rows.

Use when the consumer needs to process a partition's records as a batch — joins, group-bys, per-partition aggregates — without re-grouping a flat record stream by hand.

Dedup, byte-budget, read-ahead, and missing-data semantics are identical to ReadIter:

  • Per-partition dedup: correct only when the partition key strictly determines every component of EntityKeyOf so no entity ever spans partitions. Use WithHistory to opt out (replica-only collapse).
  • Memory bounded by WithReadAheadPartitions / WithReadAheadBytes.
  • NoSuchKey on a data file is strict (mirrors ReadIter): the iter surfaces the wrapped error so the caller's retry can resolve the LIST-to-GET race.

Breaking out of the for-range loop cancels in-flight downloads. Empty patterns slice yields nothing; a malformed pattern surfaces as the iter's first error.
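
Sketch of per-partition batch processing; the key pattern and the processBatch aggregate are caller-supplied illustrations:

for part, err := range reader.ReadPartitionIter(ctx, []string{"period=2026-04/*"}) {
    if err != nil {
        return err
    }
    processBatch(part.Key, part.Rows) // one call per Hive partition, Rows already dedup'd
}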

func (*Reader[T]) ReadPartitionRangeIter added in v0.20.0

func (s *Reader[T]) ReadPartitionRangeIter(
	ctx context.Context,
	since, until time.Time,
	opts ...ReadOption,
) iter.Seq2[HivePartition[T], error]

ReadPartitionRangeIter is the partition-grouped variant of ReadRangeIter: streams every record written in the [since, until) time window as one HivePartition[T] per Hive partition.

Same time-window resolution as ReadRangeIter: zero time.Time on either bound means unbounded — since=zero starts at the stream head, until=zero walks to the live tip (now - SettleWindow, captured at call entry so the upper bound stays stable under concurrent writes).

Same per-partition dedup as ReadRangeIter (default latest-per-entity per partition; pass WithHistory to opt out for replica-only). Memory bounded by WithReadAheadPartitions / WithReadAheadBytes.

Tolerant of NoSuchKey on data files (mirrors ReadRangeIter): missing files are skipped with a slog.Warn + missing-data counter so the consumer can keep advancing — operator-driven prunes leave refs pointing at deleted files and a strict failure here can't be resolved by a caller retry.

Does NOT expose per-batch offsets; consumer aborts cannot safely resume. Use PollRecords + manual partition grouping when you need to checkpoint between batches.

func (*Reader[T]) ReadRangeIter added in v0.14.0

func (s *Reader[T]) ReadRangeIter(
	ctx context.Context,
	since, until time.Time,
	opts ...ReadOption,
) iter.Seq2[T, error]

ReadRangeIter streams every record written in the [since, until) time window as an iter.Seq2[T, error]. Snapshot view of the ref stream over a wall-clock range — no offset cursor, no resume.

Zero time.Time on either bound means unbounded: since=zero starts at the stream head, until=zero walks to the live tip (now - SettleWindow, captured at call entry so the upper bound stays stable under concurrent writes). Pair with non-zero values for time-windowed reads:

start := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
end   := time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC)
for r, err := range store.ReadRangeIter(ctx, start, end) { ... }

Same per-partition dedup as ReadIter (default latest-per-entity per partition; pass WithHistory to opt out). Memory bounded by WithReadAheadPartitions / WithReadAheadBytes.

The ref-LIST runs upfront before the first record yields — usually sub-100ms but huge windows can take seconds; chunk via since/until. Breaking out of the loop cancels in-flight downloads. Errors are yielded as (zero, err) and terminate the iter.

Does NOT expose per-batch offsets — consumer aborts cannot safely resume. Use PollRecords (Kafka-style cursor) when you need to checkpoint between batches.

func (*Reader[T]) Target added in v0.14.0

func (r *Reader[T]) Target() S3Target

Target returns the untyped S3Target this Reader is bound to. Use when constructing a MaterializedViewReader[K] or running BackfillMaterializedView against the same dataset without carrying T through the caller's signatures.

type ReaderConfig added in v0.14.0

type ReaderConfig[T any] struct {
	Target      S3Target
	EntityKeyOf func(T) string
	VersionOf   func(T) int64
}

ReaderConfig is the narrower Config form for constructing a Reader directly. Holds the S3-wiring bundle (Target — a constructed S3Target) plus read-side-only knobs. Use NewReader(cfg) in read-only services that have no PartitionKeyOf / Compression config to supply.

Target is built once via NewS3Target and can be passed to both WriterConfig.Target and ReaderConfig.Target so the resulting Writer and Reader share the same MaxInflightRequests semaphore.

Dedup contract: EntityKeyOf and VersionOf are both set or both nil — NewReader rejects partial configurations. When set, records dedup to the latest-per-entity by VersionOf; when nil, every record flows through.
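
Construction in a read-only service might look like this — Order and its OrderID / UpdatedAt fields are a hypothetical record type, and target comes from NewS3Target:

reader, err := NewReader(ReaderConfig[Order]{
    Target:      target,
    EntityKeyOf: func(o Order) string { return o.OrderID },
    VersionOf:   func(o Order) int64 { return o.UpdatedAt.UnixMicro() },
})
if err != nil {
    return err
}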

type S3Target added in v0.5.0

type S3Target struct {
	// contains filtered or unexported fields
}

S3Target is the constructed live handle to an s3parquet dataset. Built once from an S3TargetConfig via NewS3Target; all S3 operations go through this type so the per-target MaxInflightRequests semaphore caps net in-flight requests regardless of fan-out axis.

Pass the same S3Target value to WriterConfig.Target and ReaderConfig.Target so the Writer and Reader share the same semaphore. The struct is copied by value but every field is a reference type (chan, pointer, slice header) so copies share the underlying state.

Fields are unexported and immutable after construction: callers read via the accessor methods. Re-construct with a fresh NewS3Target if you need to change MaxInflightRequests or any other field.

func NewS3Target added in v0.5.0

func NewS3Target(ctx context.Context, cfg S3TargetConfig) (S3Target, error)

NewS3Target constructs a live S3Target from config. Calls cfg.ValidateLookup (Bucket, Prefix, S3Client) up-front so the GETs that follow have the wiring they need, then GETs the persisted timing-config objects at <Prefix>/_config/commit-timeout and <Prefix>/_config/max-clock-skew, parses each as a Go time.Duration string, and rejects values below the configured floors (CommitTimeoutFloor = 1 ms, MaxClockSkewFloor = 0) before stamping the values (and the derived SettleWindow) on the Target. Construction fails when ValidateLookup fails or when either object is missing, unparseable, or below its floor — operators must seed the dataset's prefix before any process can construct a Target against it (see README's "Initializing a new dataset").

Does not call Validate (PartitionKeyParts) — that's a Writer / Reader concern and is checked by NewWriter / NewReader, not by every Target consumer (NewMaterializedViewReader is read-only and doesn't need PartitionKeyParts).

Logs a warning when ConsistencyControl is non-empty but doesn't match a named ConsistencyLevel constant — typo guard with no effect on behaviour (the value is still sent verbatim).

func (S3Target) Bucket added in v0.14.0

func (t S3Target) Bucket() string

Bucket returns the S3 bucket name.

func (S3Target) CommitTimeout added in v0.15.0

func (t S3Target) CommitTimeout() time.Duration

CommitTimeout returns the value resolved at construction time from <Prefix>/_config/commit-timeout. Pure accessor — no I/O. Bounds the writer's ref-PUT-to-token-commit-completion budget for one Write: past CommitTimeout (measured from refMicroTs, captured just before the ref PUT, to the moment the token-commit PUT returns), the s3store.write.commit_after_timeout counter increments and Write returns an error (the commit still lands; the metric flags that the reader's SettleWindow tuned for this CommitTimeout may not yet have included this write in the stream window). Pre-ref work — parquet encoding, marker PUTs, data PUT — is deliberately outside the budget; only the ref-LIST-visible → token-commit-visible window can put the SettleWindow contract at risk.

func (S3Target) Config added in v0.14.0

func (t S3Target) Config() S3TargetConfig

Config returns a copy of the S3TargetConfig the target was built from. Use for introspection or passing to tools that expect the config form.

func (S3Target) ConsistencyControl added in v0.14.0

func (t S3Target) ConsistencyControl() ConsistencyLevel

ConsistencyControl returns the configured Consistency-Control header value applied to every routed S3 call.

func (S3Target) EffectiveMaxInflightRequests added in v0.14.0

func (t S3Target) EffectiveMaxInflightRequests() int

EffectiveMaxInflightRequests forwards to S3TargetConfig.EffectiveMaxInflightRequests.

func (S3Target) MaxClockSkew added in v0.15.0

func (t S3Target) MaxClockSkew() time.Duration

MaxClockSkew returns the value resolved at construction time from <Prefix>/_config/max-clock-skew. Pure accessor — no I/O. Operator's assumed bound on writer↔reader wall-clock divergence (refMicroTs in the ref filename is writer-stamped, so this is the skew SettleWindow has to absorb). Consumed by the reader's refCutoff via SettleWindow.

func (S3Target) PartitionKeyParts added in v0.14.0

func (t S3Target) PartitionKeyParts() []string

PartitionKeyParts returns the configured Hive-partition keys.

func (S3Target) Prefix added in v0.14.0

func (t S3Target) Prefix() string

Prefix returns the dataset's key prefix.

func (S3Target) S3Client added in v0.14.0

func (t S3Target) S3Client() *s3.Client

S3Client returns the configured AWS SDK v2 client.

func (S3Target) SettleWindow added in v0.15.0

func (t S3Target) SettleWindow() time.Duration

SettleWindow returns the derived sum CommitTimeout + MaxClockSkew. Used by Poll's `refCutoff = now - SettleWindow`. Sized so the cutoff cannot overtake refs whose token-commit is still being written: CommitTimeout bounds the writer's ref-PUT + token-commit-PUT envelope (measured from refMicroTs onward; pre-ref work is outside the budget by design), MaxClockSkew bounds the writer↔reader wall-clock divergence applied to the writer-stamped refMicroTs. Pure accessor — no I/O.

func (S3Target) Validate added in v0.14.0

func (t S3Target) Validate() error

Validate forwards to S3TargetConfig.Validate.

func (S3Target) ValidateLookup added in v0.14.0

func (t S3Target) ValidateLookup() error

ValidateLookup forwards to S3TargetConfig.ValidateLookup.

type S3TargetConfig added in v0.14.0

type S3TargetConfig struct {
	// Bucket is the S3 bucket name.
	Bucket string

	// Prefix is the key prefix under which data/ref/matview files
	// live. Must be non-empty — a bare bucket root would collide
	// with any other tenant of the bucket.
	Prefix string

	// S3Client is the AWS SDK v2 client to use. Endpoint, region,
	// credentials, and path-style setting are used as-is.
	S3Client *s3.Client

	// PartitionKeyParts declares the Hive-partition key segments in
	// the order they appear in the S3 path. Read/Write key
	// patterns are validated against this order.
	PartitionKeyParts []string

	// MaxInflightRequests caps the number of S3 requests a single
	// constructed S3Target may have outstanding at once. Enforced
	// by a semaphore inside S3Target — every PUT/GET/HEAD/LIST
	// acquires one slot before issuing and releases on completion,
	// so the cap holds across every fan-out axis (partitions,
	// files, patterns, markers) without per-axis tuning.
	//
	// Zero → default (32). The cap is per S3Target: one Writer +
	// one Reader sharing the same constructed S3Target share the
	// cap; two Targets do not.
	//
	// The AWS SDK v2's default HTTP transport leaves
	// MaxConnsPerHost unlimited (Go default 0) and sets
	// MaxIdleConnsPerHost to 100, so this library cap is what
	// bounds parallelism for stock-configured clients — no
	// transport tuning is needed at the defaults. Only if you've
	// explicitly set a non-zero MaxConnsPerHost on your
	// *s3.Client's transport does it need to be >=
	// MaxInflightRequests, otherwise excess requests queue at the
	// transport layer instead of running in parallel.
	MaxInflightRequests int

	// ConsistencyControl sets the Consistency-Control HTTP header
	// applied to every correctness-critical S3 operation routed
	// through this target — data PUTs (per-attempt-path, never
	// overwriting), ref PUTs, token-commit PUTs, matview marker
	// PUTs, data / config GETs, the upfront-dedup HEAD on
	// `<token>.commit`, and every LIST (partition LIST on the
	// read path, matview-marker LIST in MaterializedViewReader.Lookup,
	// ref-stream LIST in Poll/PollRecords/ReadRangeIter).
	//
	// Zero value (ConsistencyDefault) is substituted at
	// construction with ConsistencyStrongGlobal — the safe
	// multi-site choice that lets sequential same-token
	// retries observe a prior token-commit overwrite without
	// surfacing as a redundant PUT. Single-site deployments can
	// downgrade to ConsistencyStrongSite explicitly when the
	// per-call cost matters. AWS S3 and MinIO ignore the header
	// entirely; the substitution is a no-op for those backends.
	// See the README's "Consistency levels × S3 operations"
	// section for the full matrix.
	//
	// Setting the level on the target rather than on the Writer /
	// Reader / MaterializedView configs enforces NetApp's "same
	// consistency for paired operations" rule by construction:
	// every operation routed through this target uses one and the
	// same value.
	ConsistencyControl ConsistencyLevel

	// MeterProvider, when set, supplies the OTel meter used to
	// record S3 op latencies, library method durations, semaphore
	// wait/inflight depth, fan-out spans, and bytes/records/files
	// counters. Zero value falls back to otel.GetMeterProvider() —
	// users who configure OTel globally pick up metrics without
	// touching this field; users who don't get the OTel global
	// no-op (zero overhead).
	//
	// Bucket and Prefix from this config plus ConsistencyControl
	// (when non-empty) are baked into every observation as
	// constant attributes (s3store.bucket / s3store.prefix /
	// s3store.consistency_level), so multi-Target deployments
	// can be distinguished without per-call overhead.
	MeterProvider metric.MeterProvider
}

S3TargetConfig is the user-facing config for an s3parquet dataset — pure data, struct-literal-friendly. Convert to a live S3Target via NewS3Target before passing to a Writer / Reader / MaterializedViewReader / BackfillMaterializedView.

Embedded indirectly via WriterConfig.Target / ReaderConfig.Target (which carry an S3Target — the live form) so the four S3-wiring fields plus knobs live in exactly one place. Surfaced on Writer/Reader/Store via .Target() so read-only tools (NewMaterializedViewReader, BackfillMaterializedView) can address the same dataset without carrying T through their call graph.

func (S3TargetConfig) EffectiveMaxInflightRequests added in v0.14.0

func (c S3TargetConfig) EffectiveMaxInflightRequests() int

EffectiveMaxInflightRequests returns the configured MaxInflightRequests, or 32 when unset. 32 keeps a typical S3 backend well utilised (~50 ms request latency → ~640 req/s sustained per Target) while staying comfortably below the per-project concurrency caps published by managed S3 vendors (typically several hundred concurrent operations). Tune lower for small MinIO setups or very large parquet files (memory cost = 32 × largest parquet body).

func (S3TargetConfig) Validate added in v0.14.0

func (c S3TargetConfig) Validate() error

Validate runs the full check for constructors that operate on partitioned data: Bucket, Prefix, S3Client, PartitionKeyParts. Used by NewWriter, NewReader, BackfillMaterializedView — anything that reads/writes data files keyed by partition.

func (S3TargetConfig) ValidateLookup added in v0.14.0

func (c S3TargetConfig) ValidateLookup() error

ValidateLookup is the reduced check for constructors that only LIST / GET / PUT under a known prefix (no partition-key predicates): Bucket, Prefix, S3Client. Used by NewMaterializedViewReader — Lookup walks the <Prefix>/_matview/<name>/ subtree, which is keyed by the view's own Columns, not the config's PartitionKeyParts. A read-only analytics service can pass a minimally-populated S3TargetConfig and still build a working MaterializedViewReader.

type Store

type Store[T any] struct {
	*Writer[T]
	*Reader[T]
}

Store is the pure-Go entry point to an s3store. It composes an internal Writer + Reader: the two halves own their own state and methods, Store re-exposes everything via embedding so existing "one Store does both" callers keep working.

func New

func New[T any](ctx context.Context, cfg StoreConfig[T]) (*Store[T], error)

New constructs a Store from StoreConfig: builds an S3Target from the embedded S3TargetConfig, then projects the side-specific knobs onto WriterConfig[T] / ReaderConfig[T] and delegates to NewWriter + NewReader. The Writer and Reader share the one constructed S3Target — same MaxInflightRequests semaphore, same ConsistencyControl, same resolved CommitTimeout + MaxClockSkew.

Performs two S3 GETs at construction time — the persisted timing-config objects at <Prefix>/_config/commit-timeout and <Prefix>/_config/max-clock-skew — via NewS3Target. Construction fails when either object is missing, unparseable, or below its floor (CommitTimeoutFloor / MaxClockSkewFloor).

Services that only write or only read can skip this and call NewS3Target + NewWriter / NewReader directly with hand-built WriterConfig / ReaderConfig values.
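
A construction sketch; the bucket, prefix, partition layout, and the Order record type are illustrative, and s3Client is an already-configured *s3.Client:

store, err := New(ctx, StoreConfig[Order]{
    S3TargetConfig: S3TargetConfig{
        Bucket:             "analytics-data",
        Prefix:             "orders/v1",
        S3Client:           s3Client,
        PartitionKeyParts:  []string{"period", "customer"},
        ConsistencyControl: ConsistencyStrongGlobal,
    },
    PartitionKeyOf: func(o Order) string {
        return "period=" + o.Period + "/customer=" + o.Customer
    },
    EntityKeyOf: func(o Order) string { return o.OrderID },
    VersionOf:   func(o Order) int64 { return o.UpdatedAt.UnixMicro() },
})
if err != nil {
    return err
}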

func (*Store[T]) Target added in v0.6.0

func (s *Store[T]) Target() S3Target

Target returns the untyped S3Target the Store was built with. The Writer and Reader share one S3Target value (set once in New) so the choice of which embedded half to delegate to is cosmetic.

type StoreConfig added in v0.18.0

type StoreConfig[T any] struct {
	S3TargetConfig

	// PartitionKeyOf extracts the Hive-partition key from a
	// record. Required for Write(). The returned string must
	// conform to the PartitionKeyParts layout ("part=value/part=value").
	PartitionKeyOf func(T) string

	// EntityKeyOf returns the logical entity identifier for a
	// record. When non-nil, Read and PollRecords deduplicate to
	// the record with the maximum VersionOf per entity. When
	// nil, every record is returned (pure stream semantics).
	EntityKeyOf func(T) string

	// VersionOf returns the monotonic version of a record for
	// dedup ordering. Required when EntityKeyOf is set; ignored
	// otherwise. Typical implementations return a domain
	// timestamp / sequence number from a record field
	// (`func(r T) int64 { return r.UpdatedAt.UnixMicro() }`).
	//
	// To use the library's writer-stamped insertedAt as the
	// version, configure InsertedAtField on the writer side and
	// reference the same field here:
	//
	//	VersionOf: func(r T) int64 { return r.InsertedAt.UnixMicro() }
	VersionOf func(record T) int64

	// InsertedAtField names a time.Time field on T that the writer
	// populates with its wall-clock time.Now() captured just before
	// parquet encoding. The field must carry a non-empty, non-"-"
	// parquet tag (e.g. `parquet:"inserted_at"`) — the value is a
	// real parquet column persisted on disk. Empty disables the
	// feature; no reflection cost when unset.
	//
	// On the read side the column shows up on T as a normal field
	// — no special handling. Use it from VersionOf when you want
	// the writer's stamp to drive dedup ordering:
	//
	//	VersionOf: func(r T) int64 { return r.InsertedAt.UnixMicro() }
	//
	// Forwarded to WriterConfig only.
	InsertedAtField string

	// Compression selects the parquet compression codec used on
	// Write. Zero value is snappy — matches the ecosystem default
	// and produces ~2-3× smaller files than the parquet-go raw
	// default (uncompressed) for no meaningful CPU cost on
	// decode. Set to CompressionUncompressed to opt out,
	// CompressionZstd / CompressionGzip to trade CPU for ratio.
	// New() validates this value and stores the resolved codec so
	// the hot-path Write doesn't reparse it.
	Compression CompressionCodec

	// MaterializedViews lists the secondary materialized views the
	// writer should maintain. See WriterConfig.MaterializedViews
	// for the full contract. Forwarded to WriterConfig only —
	// readers don't emit markers.
	MaterializedViews []MaterializedViewDef[T]

	// EncodeBufPoolMaxBytes caps the encode-pool buffer capacity
	// the Writer retains across writes. See
	// WriterConfig.EncodeBufPoolMaxBytes for the contract.
	// Forwarded to WriterConfig only — reader path doesn't pool
	// encode buffers. Zero or negative selects the default
	// (48 MiB).
	EncodeBufPoolMaxBytes int64
}

StoreConfig defines how a Store is set up. T is the record type, which must be encodable and decodable by parquet-go directly (struct fields tagged with `parquet:"..."`, primitive-friendly types). Types with fields parquet-go can't encode (e.g. decimal.Decimal, custom wrappers) need a companion parquet-layout struct and a translation step in the caller's package.

Embeds S3TargetConfig so the S3 wiring (Bucket, Prefix, S3Client, PartitionKeyParts, MaxInflightRequests, ConsistencyControl, MeterProvider) lives in exactly one place. New() reads the embedded value, builds an S3Target via NewS3Target, then projects the side-specific knobs onto WriterConfig[T] / ReaderConfig[T] (both of which take a built S3Target directly). Advanced users who only need one side can skip this type and call NewWriter / NewReader with a hand-built S3Target — see those constructors.
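
A wiring sketch, not a prescribed setup: the Order type, the newOrderStore name, and the two-part "period=…/customer=…" key (which assumes a matching PartitionKeyParts) are illustrative; targetCfg is a caller-built S3TargetConfig.

type Order struct {
    ID        string    `parquet:"id"`
    Customer  string    `parquet:"customer"`
    UpdatedAt time.Time `parquet:"updated_at"`
}

func newOrderStore(ctx context.Context, targetCfg s3store.S3TargetConfig) (*s3store.Store[Order], error) {
    return s3store.New(ctx, s3store.StoreConfig[Order]{
        S3TargetConfig: targetCfg, // Bucket / Prefix / S3Client / PartitionKeyParts, built by the caller
        PartitionKeyOf: func(o Order) string {
            return "period=" + o.UpdatedAt.Format("2006-01") + "/customer=" + o.Customer
        },
        EntityKeyOf: func(o Order) string { return o.ID },                   // dedup identity
        VersionOf:   func(o Order) int64 { return o.UpdatedAt.UnixMicro() }, // dedup ordering
    })
    // New fails when a _config object is missing, unparseable, or below its floor.
}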

type StreamEntry

type StreamEntry struct {
	Offset     Offset
	Key        string
	DataPath   string
	RefPath    string
	InsertedAt time.Time
	RowCount   int64
}

StreamEntry is a lightweight ref returned by Poll.

  • Offset and RefPath carry the same underlying S3 key string. Offset is typed for cursor-advancing in Poll-style APIs; RefPath exposes the same value as an explicit S3 object path for callers that want to GET the ref directly. Mirrors the Offset / RefPath split on WriteResult.
  • Key is the Hive-style partition key ("period=X/customer=Y") that the writer originally passed to WriteWithKey — useful for consumers to route records by partition without parsing DataPath.
  • DataPath is the S3 object key of the data file this ref points at; GET it to fetch the parquet payload.
  • InsertedAt is the writer's wall-clock capture immediately before the ref PUT (microsecond precision; same value embedded as `refMicroTs` in the ref filename). It approximates ref-LIST-visibility time. Slightly later than the InsertedAtField parquet column (stamped at pre-encode write-start so the column reflects logical record time, not commit-finalize time); the two values can drift by the encode + data-PUT duration.
  • RowCount is the number of records in the data file this ref points at, recovered from the token-commit's `rowcount` user-metadata. Free for Poll to surface — the gate already HEADs the marker per ref/token (cached per poll cycle), so no extra round trip.
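
A consumer-side sketch of handling one polled batch of entries; the routeEntries name and the choice to log rather than GET DataPath are illustrative, not part of the API.

func routeEntries(entries []s3store.StreamEntry) s3store.Offset {
    var checkpoint s3store.Offset
    for _, e := range entries {
        // Key is the writer's original partition key — route without parsing DataPath.
        log.Printf("partition %s: %d rows in %s (ref stamped %s)",
            e.Key, e.RowCount, e.DataPath, e.InsertedAt.Format(time.RFC3339Nano))
        checkpoint = e.Offset // persist only after the entry is fully processed
    }
    return checkpoint
}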

type WriteOption added in v0.13.0

type WriteOption func(*writeOpts)

WriteOption configures write-path behavior. Mirrors the ReadOption pattern so the write side has the same mental model: one option type, one accumulator, one place to add new knobs.

func WithIdempotencyToken added in v0.13.0

func WithIdempotencyToken(token string) WriteOption

WithIdempotencyToken marks a write as a retry-safe logical unit identified by token. Recovery on retry is automatic and works across arbitrarily long S3 outages:

  • Each attempt writes data and ref to fresh per-attempt paths (id = {token}-{attemptID:32hex UUIDv7}). No data or ref PUT in the write path ever overwrites — sidesteps multi-site StorageGRID's eventual-consistency exposure on overwrites.
  • On retry, the writer issues an upfront HEAD on `<dataPath>/<partition>/<token>.commit`. If a prior attempt's commit is in place, the writer reconstructs the original WriteResult from the marker's metadata (no body re-upload, same DataPath / RefPath / InsertedAt as the prior successful attempt) and returns.
  • If the commit is missing (no prior attempt landed, or every prior attempt crashed before the commit PUT), the retry proceeds with a fresh attempt-id end-to-end.

Token uniqueness is scoped to (partition key, logical write), not global — the same token may be reused across different partition keys without colliding. The upfront HEAD runs per partition. Within one partition the token must remain unique per logical write.

**Reader-side dedup recommended.** Near-concurrent retry overlap (out of contract per the README's Concurrency contract, but bounded if it arises by accident) can produce two committed attempts whose records share (entity, version). Records are bit-identical (parquet encoding is deterministic; the writer captures InsertedAt once and the same value drives every column / filename / metadata field). Configure `EntityKeyOf` + `VersionOf` on the reader to collapse the duplicate records on read.

token must pass validateIdempotencyToken (non-empty, no "/", no "..", printable ASCII, <= 200 chars) — validation runs at resolve time so bad tokens fail at the call site rather than inside the write path.

An empty string is rejected (not silently downgraded to the auto-token path) so a caller wiring `WithIdempotencyToken(row.Token)` can't accidentally turn off idempotency by forgetting to populate the source. Callers whose token is genuinely optional should branch at construction:

opts := []s3store.WriteOption{...}
if row.Token != "" {
    opts = append(opts, s3store.WithIdempotencyToken(row.Token))
}

Mutually exclusive with WithIdempotencyTokenOf — combining the two surfaces an error at option-resolution time.
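
An outbox-driven sketch of the retry-safe path; row.Token, row.Records, and the partition key are placeholders owned by the caller.

// Same token on every retry: the upfront HEAD on <token>.commit makes a repeat
// call return the original WriteResult without re-uploading anything.
res, err := w.WriteWithKey(ctx, "period=2026-05/customer=acme", row.Records,
    s3store.WithIdempotencyToken(row.Token))
if err != nil {
    return err // retry later with the same token — recovery is automatic
}
log.Printf("committed %d rows at %s", res.RowCount, res.DataPath)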

func WithIdempotencyTokenOf added in v0.15.0

func WithIdempotencyTokenOf[T any](
	fn func(partitionRecords []T) (string, error),
) WriteOption

WithIdempotencyTokenOf is the per-partition variant of WithIdempotencyToken: fn is invoked once per partition with the records routed to that partition, and its return value drives that partition's idempotency token. Lets a single multi-partition Write retry with a different token per partition (e.g. one outbox row per partition).

Semantics on each partition match WithIdempotencyToken: the returned token anchors the per-attempt id and the `<dataPath>/<partition>/<token>.commit` marker, the upfront HEAD dedups same-token retries, and the token must pass validateIdempotencyToken.

On WriteWithKey (single partition) fn is invoked once with the full records slice — symmetric with the multi-partition case.

fn returning an error fails that partition's write; under Write's parallel fan-out a single failure propagates as "first error wins" with partial-success on already-committed partitions (same shape as any other write-path error).

Mutually exclusive with WithIdempotencyToken — passing both surfaces an error at option-resolution time. fn=nil is a no-op (the partition runs the non-idempotent auto-token path).
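
A per-partition token sketch, assuming each record carries the ID of the outbox row that produced it (the Order type and its OutboxID field are placeholders):

results, err := w.Write(ctx, records,
    s3store.WithIdempotencyTokenOf(func(partitionRecords []Order) (string, error) {
        // One outbox row per partition: every record routed here shares the same ID.
        if len(partitionRecords) == 0 {
            return "", errors.New("no records for partition")
        }
        return partitionRecords[0].OutboxID, nil
    }))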

func WithInsertedAt added in v0.16.0

func WithInsertedAt(t time.Time) WriteOption

WithInsertedAt overrides the writer's default "insertion time" for this batch. The supplied value flows uniformly to every downstream surface — the parquet InsertedAtField column (when configured), the token-commit `insertedat` user-metadata, and WriteResult.InsertedAt — replacing the default capture of time.Now() taken just before parquet encoding.

Use cases:

  • Caller-owned event time. The "real" insertion timestamp lives in an outbox row / external system; passing it here keeps the in-file column and result aligned with that source rather than the writer's wall-clock.
  • Reproducible writes. Tests and replays that supply the same records + same InsertedAt + same compression codec produce byte-identical parquet bytes (the only non-determinism in the encode path is the InsertedAtField injection).

Truncated to microsecond precision at use site so a LookupCommit round-trip yields a time.Time that compares Equal to the value in the original WriteResult.

The zero value (time.Time{}) is treated as "not supplied" — the writer falls back to time.Now(). Callers who legitimately need to stamp the year-1 zero time should pass a value with a non-zero monotonic component, but no real workload needs this.

Idempotency-retry interaction: when the upfront HEAD on `<token>.commit` finds a prior commit, the returned WriteResult is reconstructed from the prior commit's metadata — WithInsertedAt on the retry attempt is ignored in favour of the original attempt's value. This preserves the contract that a same-token retry returns the original WriteResult unchanged.
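
A reproducibility sketch for tests: fixed records, a fixed InsertedAt, and the same codec produce byte-identical parquet across runs. key, records, and the chosen timestamp are placeholders; t is a *testing.T.

fixed := time.Date(2026, 5, 1, 12, 0, 0, 0, time.UTC)
res, err := w.WriteWithKey(ctx, key, records, s3store.WithInsertedAt(fixed))
if err != nil {
    t.Fatal(err)
}
if !res.InsertedAt.Equal(fixed) { // fixed is already microsecond-aligned, so Equal holds
    t.Fatalf("InsertedAt = %s, want %s", res.InsertedAt, fixed)
}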

func WithOptimisticCommit added in v0.21.0

func WithOptimisticCommit() WriteOption

WithOptimisticCommit skips the per-write upfront HEAD on `<token>.commit` and detects prior commits via the commit PUT itself: the writer attaches `If-None-Match: *` so a backend that supports conditional PUT returns 412 PreconditionFailed when the commit already exists, or — on a backend with a bucket policy denying s3:PutOverwriteObject on the commit subtree — returns 403 AccessDenied. Either signal triggers a HEAD round-trip to recover the prior commit's metadata, and the caller receives the original WriteResult unchanged.

Trade-off:

  • **Save** one HEAD per write (5–15ms on AWS, ~1× the request budget for that operation) on the steady-state path. At scale this is the dominant savings — a Write fanning out across N partitions is N HEADs lighter, ~20% fewer S3 requests per Write.
  • **Pay** a multi-MB orphan parquet + orphan ref per retry-found-prior-commit (every same-token retry that lands after a successful commit). The orphans are invisible to readers via the commit gate (their attempt-id ≠ canonical), and operator-driven cleanup reclaims them (S3 lifecycle rule, manual prune).

Break-even sits around a 5% retry-found-prior rate: below that, the HEAD savings dominate; above that, the orphan + bandwidth cost outweighs the savings. Healthy CDC pipelines retry well under 1%, so the option is a clear win for high-throughput workloads near the request-rate ceiling. Bulk migrations with frequent orchestrator restarts may not benefit.

**Backend requirement.** The detection mechanism needs ONE of:

  • Conditional PUT (`If-None-Match: *`) — atomic server-side precondition. Supported on AWS S3 since November 2024, MinIO recent versions, and StorageGRID 12.0+. Returns 412.
  • Bucket policy denying `s3:PutOverwriteObject` on the `<prefix>/data/*/*.commit` subtree. Used on older StorageGRID versions where conditional PUT isn't available. Returns 403. The s3store README documents the policy + a boto3 setup snippet under "Optimistic commit setup".

On a backend that supports neither, the second commit PUT silently overwrites — the WriteResult returned to the caller will be the *new* attempt's, not the prior commit's, breaking the same-token-retry-returns-same-WriteResult contract. The option does not detect this at construction time; verify support before enabling.

Mutually compatible with WithIdempotencyToken / WithIdempotencyTokenOf. Has no effect on the auto-token (no-token) path: a freshly generated UUIDv7 is guaranteed to 404 by construction, so the upfront HEAD is already skipped there regardless of this option.
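
A sketch of combining the two options on a backend with conditional-PUT support; key, records, and token are placeholders.

// Skip the steady-state HEAD; prior-commit detection moves to the commit PUT
// (412 via If-None-Match, or 403 via a deny policy on the commit subtree).
_, err := w.WriteWithKey(ctx, key, records,
    s3store.WithIdempotencyToken(token),
    s3store.WithOptimisticCommit())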

type WriteResult

type WriteResult struct {
	Offset     Offset
	DataPath   string
	RefPath    string
	InsertedAt time.Time
	// RowCount is the number of records persisted in this commit's
	// parquet file. On a fresh write this is len(records) for the
	// partition; on the retry-finds-prior-commit path it is recovered
	// from the token-commit's `rowcount` user-metadata so the value
	// matches the original attempt's parquet exactly. Surfaced so
	// retry-recovery callers — who don't have the original records
	// slice — can still report batch size without GETting the parquet.
	RowCount int64
}

WriteResult contains metadata about a completed write.

InsertedAt is the writer's pre-encode wall-clock at write-start (microsecond precision) by default — or the caller's WithInsertedAt override, when supplied — the same value stamped into the parquet's InsertedAtField column. Surfaced on the result so callers can log / persist it (e.g., into an outbox table) without parsing the data path or issuing a HEAD.

Under WithIdempotencyToken, a same-token retry whose upfront HEAD on `<token>.commit` finds a prior commit returns *that* commit's WriteResult (DataPath, RefPath, InsertedAt all reflect the prior attempt — InsertedAt is recovered from the token-commit's `insertedat` user-metadata, so it agrees with the prior attempt's column value byte-for-byte). Callers comparing two results from the same token across retries will therefore see identical values whenever a prior attempt's token-commit is still in place.

type Writer added in v0.14.0

type Writer[T any] struct {
	// contains filtered or unexported fields
}

Writer is the write-side half of a Store. Owns the write path (Write / WriteWithKey) and the materialized-view list that drives marker emission on Write.

Construct directly via NewWriter when a service only writes; embed in Store when it also reads.

func NewWriter added in v0.14.0

func NewWriter[T any](cfg WriterConfig[T]) (*Writer[T], error)

NewWriter constructs a Writer directly from WriterConfig. Use this in services that only write; use New(Config) when the same process also reads through a Reader/Store.

Validation mirrors the writer-side half of New: the Target must carry Bucket / Prefix / S3Client / PartitionKeyParts; Compression resolves to a codec (zero value → snappy); every MaterializedViewDef in cfg.MaterializedViews is shape-validated and Of must be non-nil. View names must be unique across the slice. PartitionKeyOf is optional at construction — Write errors if called without it, but WriteWithKey works regardless.

Constructor performs no S3 I/O. Idempotent retries are gated by the writer's upfront HEAD on `<dataPath>/<partition>/<token>.commit` (see WithIdempotencyToken): data and ref PUTs land at fresh per-attempt paths and never overwrite, and a prior attempt's commit marker reconstructs the original WriteResult unchanged. Cross-backend uniformity — no dependency on If-None-Match support or bucket-policy denies.

func (*Writer[T]) GroupByPartition added in v0.20.0

func (s *Writer[T]) GroupByPartition(records []T) []HivePartition[T]

GroupByPartition splits records by their Hive partition key (PartitionKeyOf) and returns one HivePartition per distinct key in lex-ascending order of Key. Deterministic across calls — same input produces byte-identical output, mirroring the read-side emission-order invariant. Same HivePartition[T] type the ReadPartition* methods yield.

Use to preview partitioning without paying the write cost (logging, sharding decisions, dry-run validation) or as a building block for custom write strategies (filter some partitions, write the rest; route partitions to different Targets; etc.).

Empty records returns nil. Records whose PartitionKeyOf returns the same string land in the same HivePartition; per-partition record order is preserved (insertion order from the input slice).

Public contract: partition emission is lex-ordered. See "Deterministic emission order across read and write paths" in CLAUDE.md — GroupByPartition is the write-side counterpart of that invariant.
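
A dry-run sketch: Key is documented above; the records field name on HivePartition is assumed here for illustration.

for _, p := range w.GroupByPartition(records) {
    log.Printf("would write %d records to %s", len(p.Records), p.Key)
}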

func (*Writer[T]) LookupCommit added in v0.15.0

func (w *Writer[T]) LookupCommit(
	ctx context.Context, partition, token string,
) (wr WriteResult, ok bool, err error)

LookupCommit returns the WriteResult of the canonical write committed under (partition, token), if any. One HEAD against `<dataPath>/<partition>/<token>.commit`; no LIST, no parquet re-encode, no parquet GET.

Sits on Writer because the use case is write-side: a service that stores the (partition, token) pair alongside its outbox row and, on retry, wants to know "did the prior attempt commit?" before re-fetching records and re-encoding parquet. WriteWithKey under the same token would do the same upfront HEAD internally, but only after re-encoding parquet — calling LookupCommit first lets retries skip the encode entirely on the recovery path. The HEAD + reconstruction primitives (headTokenCommit / reconstructWriteResult) are exactly the ones writeEncodedPayload's Step 2 calls; this method is the externalized form of that step.

On a hit, the returned WriteResult is byte-identical to what the original Write returned (DataPath / RefPath / Offset reconstructed from the path scheme + the marker's metadata; InsertedAt sourced from the `insertedat` user-metadata so it matches the parquet's InsertedAtField column value).

ok=false signals 404: no commit exists for that (partition, token). If the caller knows a Write was attempted for this token and ok=false, the write either crashed or returned an error to its original caller.

Validates token via validateIdempotencyToken so callers fail loudly on garbage rather than HEADing nonsensical keys.
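
A retry sketch that skips fetch and re-encode when the prior attempt already committed; markOutboxDone and loadRecords are hypothetical caller-side helpers, and (partition, token) come from the caller's outbox row.

wr, ok, err := w.LookupCommit(ctx, partition, token)
if err != nil {
    return err
}
if ok {
    return markOutboxDone(wr) // prior attempt committed — skip fetch and re-encode
}
records, err := loadRecords(ctx, token)
if err != nil {
    return err
}
_, err = w.WriteWithKey(ctx, partition, records, s3store.WithIdempotencyToken(token))
return err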

func (*Writer[T]) PartitionKey added in v0.14.0

func (w *Writer[T]) PartitionKey(rec T) string

PartitionKey applies the configured PartitionKeyOf to rec and returns the resulting partition key. Intended for callers that want the single-partition WriteWithKey path without having to re-implement the key format at the call site:

_, err := w.WriteWithKey(ctx, w.PartitionKey(recs[0]), recs)

Panics if PartitionKeyOf was not set at construction — the same nil-func-call semantics Write gets, just surfaced at a clearer site.

func (*Writer[T]) RestampRef added in v0.23.0

func (w *Writer[T]) RestampRef(
	ctx context.Context, prior *WriteResult,
) (result *WriteResult, err error)

RestampRef writes an additional ref for an already-committed write, with a fresh writer wall-clock refMicroTs. The data file and `<token>.commit` marker are reused unchanged — the only S3 op is a single zero-byte PUT at a new ref key. Use this to recover from ErrCommitAfterTimeout when stream consumers need the write inside their SettleWindow: the new ref's refMicroTs is current, so the next poll past now+SettleWindow is guaranteed to see it with the commit gate already satisfied.

The same-token-retry path is NOT a recovery for stream observability — its upfront HEAD finds the prior commit and returns the prior WriteResult unchanged without writing any new ref, so the original refMicroTs is unchanged and a stream consumer past that offset still misses it. RestampRef is the only primitive that brings a timed-out commit back into the SettleWindow.

The returned WriteResult differs from prior only in Offset / RefPath (the new ref's key, with a fresh refMicroTs). DataPath, InsertedAt, and RowCount are unchanged. The original ref is left in place, so stream readers see two emissions for the same data: the consumer is responsible for tolerating the duplicate, either by configuring EntityKeyOf+VersionOf dedup (which collapses to a single record per (entity, version)) or by accepting at-least-once at the consumer.

Constraints:

  • prior must be the WriteResult of a commit produced by THIS Writer's store: prior.RefPath and prior.DataPath must lie under the Writer's _ref / data prefixes. A cross-Writer call surfaces an error rather than silently writing a ref that no reader can resolve through this Writer's commit gate.
  • prior must reflect a real commit. RestampRef does not HEAD `<token>.commit` first — it trusts that prior came from a successful Write/WriteWithKey/LookupCommit return. If the commit was manually deleted out-of-band, the new ref will be filtered as an orphan by readers (no harm, just no effect).

Idempotency: each call writes a fresh ref at a new `<refMicroTs>-<token>-<attemptID>` key, so multiple calls produce multiple stream emissions. The token / attemptID are reused from prior unchanged — the canonical-attempt check in reader_poll.go gates ref→data lookup on attemptID matching `<token>.commit`'s metadata, so a fresh attemptID would render the new ref invisible.

Like Write/WriteWithKey, RestampRef can return a non-nil WriteResult alongside an ErrCommitAfterTimeout error: the new ref IS durable, but elapsed wall-clock from refMicroTs to ref-PUT-completion exceeded CommitTimeout, so a stream reader's SettleWindow may already have advanced past the new refMicroTs by the time the ref becomes LIST-visible. Recovery: call RestampRef again with the same prior. Detect via errors.Is(err, ErrCommitAfterTimeout).

The ref PUT runs under context.WithoutCancel(ctx): the body is a single zero-byte PUT, so honoring caller cancellation mid-PUT isn't worth the risk of a partially-completed PUT (response in flight, ref already on S3) returning an error to a caller who would retry and double the duplicate. Caller cancellation is observed only at the boundaries (parameter validation, paths-prefix check, parseRefKey).
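
A recovery sketch for the timeout case; key, records, and token are placeholders.

res, err := w.WriteWithKey(ctx, key, records, s3store.WithIdempotencyToken(token))
if errors.Is(err, s3store.ErrCommitAfterTimeout) && res != nil {
    // The write is durable but its ref may already sit behind consumers' SettleWindow.
    // A fresh ref brings it back inside the window; on another timeout, call again
    // with the same prior.
    _, err = w.RestampRef(ctx, res)
}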

func (*Writer[T]) Target added in v0.14.0

func (w *Writer[T]) Target() S3Target

Target returns the untyped S3Target this Writer is bound to. Use when constructing read-only tools (NewMaterializedViewReader, BackfillMaterializedView) against the same dataset without carrying the Writer's T into their call graph.

func (*Writer[T]) Write added in v0.14.0

func (s *Writer[T]) Write(
	ctx context.Context, records []T, opts ...WriteOption,
) (results []WriteResult, err error)

Write extracts the key from each record via PartitionKeyOf, groups by key, and writes one Parquet file + stream ref per key in parallel (bounded by Target.MaxInflightRequests, default 32). Returns one WriteResult per partition that completed, in sorted-key order regardless of completion order.

On failure, cancels remaining partitions and returns whatever results had already landed, along with the first real (non-cancel) error. Partial success is the accepted outcome — each partition's data+markers+ref sequence is self-contained.

An empty records slice is a no-op: (nil, nil) is returned so callers don't have to guard against batch-pipeline edge cases.
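
A partial-success sketch: every returned WriteResult reflects a partition that fully committed, so it is worth recording them even when the call errors.

results, err := w.Write(ctx, records)
for _, r := range results {
    // Committed partitions are durable even when err != nil.
    log.Printf("committed %d rows at %s", r.RowCount, r.DataPath)
}
if err != nil {
    // Retrying the whole batch without tokens re-writes the partitions that already
    // committed; pair with WithIdempotencyTokenOf or reader-side dedup to absorb that.
    return fmt.Errorf("write: %w", err)
}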

func (*Writer[T]) WriteWithKey added in v0.14.0

func (s *Writer[T]) WriteWithKey(
	ctx context.Context, key string, records []T, opts ...WriteOption,
) (result *WriteResult, err error)

WriteWithKey encodes records as Parquet, uploads to S3, writes the ref file, and lands the token-commit marker that flips visibility for both the snapshot and stream read paths atomically.

Each attempt writes to a per-attempt data path (`{partition}/{token}-{attemptID}.parquet`), so no data PUT ever overwrites — sidesteps multi-site StorageGRID's eventual-consistency exposure on data-file overwrites. token is the caller's WithIdempotencyToken value, or a writer-generated UUIDv7 (used as both token and attemptID) for non-idempotent writes. The trade is a per-attempt orphan triple (data + ref + possibly token-commit) on failure; the reader's commit-marker gate filters them out, and the operator-driven sweeper reclaims them.

On any failure mid-sequence the returned error is wrapped and nothing is deleted. This is the at-least-once contract: a failed Write may leave per-attempt orphans, never deletes one. Reads ignore the orphans because their token-commits either didn't land or name a different attempt-id.

Passing WithIdempotencyToken makes this call retry-safe across arbitrary outages: an upfront HEAD on `<token>.commit` returns any prior commit's WriteResult reconstructed from metadata (no body re-upload, no new PUTs). If no prior commit exists, the retry proceeds with a fresh attempt-id; recovery is automatic regardless of how long ago the original landed. **Concurrent writes that share the same token are out of contract** — see README's Concurrency contract section.

type WriterConfig added in v0.14.0

type WriterConfig[T any] struct {
	Target         S3Target
	PartitionKeyOf func(T) string
	Compression    CompressionCodec

	// MaterializedViews lists the secondary materialized views the
	// writer should maintain. Every Write iterates each entry,
	// calls Of per record, and PUTs one empty marker per distinct
	// (view, column-values) tuple in the batch under
	// <Prefix>/_matview/<Name>/. Validation runs at NewWriter:
	// Name non-empty + free of '/', Columns valid + unique,
	// Of non-nil, Names unique across the slice.
	//
	// Constructed at writer-creation time so registration cannot
	// race with Write and "registered after the first Write" is
	// not a reachable state. Use BackfillMaterializedView to
	// retroactively cover records written before a view existed.
	MaterializedViews []MaterializedViewDef[T]

	// InsertedAtField names a time.Time field on T that the writer
	// populates with its wall-clock time.Now() just before parquet
	// encoding, so the value becomes a real parquet column in every
	// written file. The field must carry a non-empty, non-"-"
	// parquet tag (e.g. `parquet:"inserted_at"`) — the value is
	// persisted on disk, not library-managed metadata. Empty
	// disables the feature; there is no reflection cost when unset.
	//
	// On the read side the column shows up on T like any other
	// parquet field — no special reader configuration needed.
	// Reference it from VersionOf to use the write-time stamp as
	// the dedup version:
	//
	//	VersionOf: func(r T) int64 { return r.InsertedAt.UnixMicro() }
	InsertedAtField string

	// EncodeBufPoolMaxBytes caps the *bytes.Buffer capacity that the
	// internal encode pool retains across writes. Buffers that grew
	// beyond this on a single Write are dropped on Put — preventing
	// one outlier batch from ballooning the pool's steady-state
	// footprint, at the cost of losing reuse on the next Write of
	// similar size.
	//
	// Set this slightly above the largest typical produced parquet
	// size for the workload. If too low, large writes pay
	// grow-from-zero plus copy-out on every Write (the encode
	// becomes net negative vs no pool — see BenchmarkEncode_Pooled
	// at sizes above the cap). If too high, idle pool retention
	// grows in proportion to GOMAXPROCS, multiplied by the number
	// of Writer instances in the process.
	//
	// The s3store.write.encode_buf_dropped counter increments every
	// time a buffer is dropped due to this cap; a non-zero rate
	// indicates the cap is undersized for the workload.
	//
	// Zero or negative selects the default (48 MiB).
	EncodeBufPoolMaxBytes int64
}

WriterConfig is the narrower Config form for constructing a Writer directly (without a Reader). Holds the S3-wiring bundle (Target — a constructed S3Target) plus write-side-only knobs. Use NewWriter(cfg) when a service writes but never reads.

Target is built once via NewS3Target and can be passed to both WriterConfig.Target and ReaderConfig.Target so the resulting Writer and Reader share the same MaxInflightRequests semaphore — net in-flight S3 requests across both halves stay bounded by one cap.
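
A write-only wiring sketch: target is a pre-built S3Target (via NewS3Target), and the Order type, single-part key, and "InsertedAt" field name are illustrative — the named field must carry a parquet tag such as `parquet:"inserted_at"`.

w, err := s3store.NewWriter(s3store.WriterConfig[Order]{
    Target:          target, // reuse the same value in a ReaderConfig to share the in-flight cap
    PartitionKeyOf:  func(o Order) string { return "period=" + o.UpdatedAt.Format("2006-01") },
    InsertedAtField: "InsertedAt",
    // Compression left at its zero value resolves to snappy.
})
if err != nil {
    return err
}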
