Documentation ¶
Overview ¶
Package backup implements the per-adapter logical-backup format defined in docs/design/2026_04_29_proposed_snapshot_logical_decoder.md (Phase 0) and reused by docs/design/2026_04_29_proposed_logical_backup.md (Phase 1).
This file owns the filename encoding rules for non-S3 segments. S3 object keys preserve their `/` separators (and so are not transformed by EncodeSegment); every other adapter scope encodes user-supplied bytes through this path.
Encoding rules (see "Filename encoding" in the Phase 0 doc):
- Bytes in the unreserved set [A-Za-z0-9._-] pass through.
- Every other byte is rendered as %HH (uppercase hex), like application/x-www-form-urlencoded but applied to every non-allowlisted byte.
- If the encoded result exceeds maxSegmentBytes (240), the segment is replaced with <sha256-hex-prefix-32>__<truncated-original> and the full original bytes must be recorded in KEYMAP.jsonl by the caller.
- Binary DynamoDB partition / sort keys take a separate "b64.<base64url>" path so a binary key never collides with a string key whose hex encoding happens to look like base64. EncodeBinarySegment emits that form.
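A minimal sketch of these rules in use (the import path is hypothetical; the printed values follow directly from the allowlist above):

package main

import (
    "fmt"

    "example.com/elastickv/backup" // import path assumed
)

func main() {
    // '/' is outside [A-Za-z0-9._-], so it is rendered as uppercase %2F.
    fmt.Println(backup.EncodeSegment([]byte("a/b"))) // a%2Fb

    // Unreserved bytes pass through untouched.
    fmt.Println(backup.EncodeSegment([]byte("report.2024-01"))) // report.2024-01

    // Binary DynamoDB keys take the "b64." path; IsBinarySegment detects it.
    seg := backup.EncodeBinarySegment([]byte{0x00, 0xFF})
    fmt.Println(backup.IsBinarySegment(seg)) // true
}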
Index ¶
- Constants
- Variables
- func DecodeSegment(seg string) ([]byte, error)
- func EncodeBinarySegment(raw []byte) string
- func EncodeDDBItemKey(tableName string, generation uint64, hashKey, rangeKey string) []byte
- func EncodeDDBTableMetaKey(tableName string) []byte
- func EncodeMsgDataKey(queueName string, gen uint64, messageID string) []byte
- func EncodeQueueMetaKey(queueName string) []byte
- func EncodeSegment(raw []byte) string
- func HasInlineTTL(value []byte) bool
- func IsBinarySegment(seg string) bool
- func IsBlobAtomicWriteOutOfSpace(err error) bool
- func IsBlobAtomicWriteRetriable(err error) bool
- func IsShaFallback(seg string) bool
- func LoadKeymap(r io.Reader) (map[string]KeymapRecord, error)
- func WriteManifest(w io.Writer, m Manifest) error
- type Adapter
- type Adapters
- type DDBEncoder
- func (d *DDBEncoder) Finalize() error
- func (d *DDBEncoder) HandleGSIRow(_, _ []byte) error
- func (d *DDBEncoder) HandleItem(key, value []byte) error
- func (d *DDBEncoder) HandleTableGen(_, _ []byte) error
- func (d *DDBEncoder) HandleTableMeta(key, value []byte) error
- func (d *DDBEncoder) WithBundleJSONL(on bool) *DDBEncoder
- func (d *DDBEncoder) WithWarnSink(fn func(event string, fields ...any)) *DDBEncoder
- type Exclusions
- type KeymapReader
- type KeymapRecord
- type KeymapWriter
- type Live
- type Manifest
- type RedisDB
- func (r *RedisDB) Finalize() error
- func (r *RedisDB) HandleHLL(userKey, value []byte) error
- func (r *RedisDB) HandleHashField(key, value []byte) error
- func (r *RedisDB) HandleHashMeta(key, value []byte) error
- func (r *RedisDB) HandleString(userKey, value []byte) error
- func (r *RedisDB) HandleTTL(userKey, value []byte) error
- func (r *RedisDB) WithWarnSink(fn func(event string, fields ...any)) *RedisDB
- type S3Encoder
- func (s *S3Encoder) Finalize() error
- func (s *S3Encoder) HandleBlob(key, value []byte) error
- func (s *S3Encoder) HandleBucketMeta(key, value []byte) error
- func (s *S3Encoder) HandleIgnored(_, _ []byte) error
- func (s *S3Encoder) HandleIncompleteUpload(prefix string, key, value []byte) error
- func (s *S3Encoder) HandleObjectManifest(key, value []byte) error
- func (s *S3Encoder) WithIncludeIncompleteUploads(on bool) *S3Encoder
- func (s *S3Encoder) WithIncludeOrphans(on bool) *S3Encoder
- func (s *S3Encoder) WithRenameCollisions(on bool) *S3Encoder
- func (s *S3Encoder) WithWarnSink(fn func(event string, fields ...any)) *S3Encoder
- type SQSEncoder
- func (s *SQSEncoder) Finalize() error
- func (s *SQSEncoder) HandleMessageData(key, value []byte) error
- func (s *SQSEncoder) HandleQueueGen(key, value []byte) error
- func (s *SQSEncoder) HandleQueueMeta(key, value []byte) error
- func (s *SQSEncoder) HandleSideRecord(prefix string, key, value []byte) error
- func (s *SQSEncoder) WithIncludeSideRecords(on bool) *SQSEncoder
- func (s *SQSEncoder) WithPreserveVisibility(on bool) *SQSEncoder
- func (s *SQSEncoder) WithWarnSink(fn func(event string, fields ...any)) *SQSEncoder
- type Source
Constants ¶
const (
    DDBTableMetaPrefix = "!ddb|meta|table|"
    DDBTableGenPrefix  = "!ddb|meta|gen|"
    DDBItemPrefix      = "!ddb|item|"
    DDBGSIPrefix       = "!ddb|gsi|"
)
Snapshot prefixes the DynamoDB encoder dispatches on. Mirror the live constants in kv/shard_key.go (DynamoTableMetaPrefix etc.) so a renamed prefix is caught by the dispatch tests below.
const (
    KindSHAFallback   = "sha-fallback"
    KindS3LeafData    = "s3-leaf-data"
    KindMetaCollision = "meta-suffix-rename"
)
KEYMAP.jsonl shape (one record per line):
{"encoded":"<encoded-segment>","original":"<base64url-no-padding>","kind":"sha-fallback"}
Records are written in encounter order (the order the encoder produced them) and never modified after write. The file is append-only; if the same encoded segment is written twice the reader keeps the last entry, but the encoder is expected not to emit duplicates within a single dump.
Records exist only for entries whose original bytes are NOT recoverable from the encoded filename alone:
- KindSHAFallback — segment is `<sha-prefix-32>__<truncated-original>` (filename length exceeded EncodeSegment's 240-byte ceiling).
- KindS3LeafData — S3 object renamed to `<obj>.elastickv-leaf-data` because both `<obj>` and `<obj>/...` existed in the same bucket.
- KindMetaCollision — user S3 object key happened to end in `.elastickv-meta.json`; renamed under --rename-collisions.
A consumer that does not care about reversing these to original bytes can ignore KEYMAP.jsonl entirely.
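For consumers that do want the original bytes, a sketch of loading the keymap up front (import path and dump layout are hypothetical):

package main

import (
    "fmt"
    "log"
    "os"

    "example.com/elastickv/backup" // import path assumed
)

func main() {
    f, err := os.Open("dump/redis/db_0/KEYMAP.jsonl") // dump layout assumed
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    recs, err := backup.LoadKeymap(f) // last record wins on duplicates
    if err != nil {
        log.Fatal(err)
    }
    for encoded, rec := range recs {
        orig, err := rec.Original() // decodes the base64url original bytes
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%s (%s) -> %q\n", encoded, rec.Kind, orig)
    }
}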
const (
    // PhasePhase0SnapshotDecode marks dumps produced by Phase 0a (offline
    // snapshot decoder).
    PhasePhase0SnapshotDecode = "phase0-snapshot-decode"

    // PhasePhase1LivePinned marks dumps produced by Phase 1 (live PIT
    // extraction with cluster-wide read_ts pinning).
    PhasePhase1LivePinned = "phase1-live-pinned"
)
const (
    // ChecksumAlgorithmSHA256 is the only checksum algorithm Phase 0a writes.
    // Phase 1 may add others later (e.g. blake3) under the same field.
    ChecksumAlgorithmSHA256 = "sha256"

    // ChecksumFormatSha256sum identifies the line-oriented sha256sum(1)
    // format used by the CHECKSUMS file. Operators verify with
    // `sha256sum -c CHECKSUMS` from the dump root.
    ChecksumFormatSha256sum = "sha256sum"

    // EncodedFilenameCharsetRFC3986 is the EncodeSegment charset used for
    // every non-S3-object filename in the dump.
    EncodedFilenameCharsetRFC3986 = "rfc3986-unreserved-plus-percent"

    // S3MetaSuffixDefault is the reserved suffix for the S3 sidecar
    // metadata file (`<obj>.elastickv-meta.json`).
    S3MetaSuffixDefault = ".elastickv-meta.json"

    // S3CollisionStrategyLeafDataSuffix renames the shorter of two
    // colliding S3 keys to `<obj>.elastickv-leaf-data` and records the
    // rename in KEYMAP.jsonl.
    S3CollisionStrategyLeafDataSuffix = "leaf-data-suffix"

    // DynamoDBLayoutPerItem emits one item per file
    // (`items/<pk>/<sk>.json`); the user's stated default.
    DynamoDBLayoutPerItem = "per-item"

    // DynamoDBLayoutJSONL bundles items into `items/data-<part>.jsonl`
    // (opt-in via --dynamodb-bundle-mode jsonl).
    DynamoDBLayoutJSONL = "jsonl"

    // KeySegmentMaxBytesDefault matches EncodeSegment's maxSegmentBytes.
    KeySegmentMaxBytesDefault uint32 = 240
)
const (
    RedisHashMetaPrefix      = "!hs|meta|"
    RedisHashFieldPrefix     = "!hs|fld|"
    RedisHashMetaDeltaPrefix = "!hs|meta|d|"
)
Snapshot key prefixes the hash encoder dispatches on. Mirror the live store/hash_helpers.go constants; a renamed prefix on the live side is caught by the dispatch tests.
const (
    RedisStringPrefix = "!redis|str|"
    RedisHLLPrefix    = "!redis|hll|"
    RedisTTLPrefix    = "!redis|ttl|"
)
Snapshot key prefixes the encoder dispatches on. Kept in sync with adapter/redis_compat_types.go so a renamed prefix in the live code is caught by the corresponding tests.
const (
    S3BucketMetaPrefix     = s3keys.BucketMetaPrefix
    S3BucketGenPrefix      = s3keys.BucketGenerationPrefix
    S3ObjectManifestPrefix = s3keys.ObjectManifestPrefix
    S3UploadMetaPrefix     = s3keys.UploadMetaPrefix
    S3UploadPartPrefix     = s3keys.UploadPartPrefix
    S3BlobPrefix           = s3keys.BlobPrefix
    S3GCUploadPrefix       = s3keys.GCUploadPrefix
    S3RoutePrefix          = s3keys.RoutePrefix
)
Snapshot prefixes the S3 encoder dispatches on. These alias the internal/s3keys/keys.go constants directly, so a renamed prefix on the live side surfaces at compile time.
const (
    SQSQueueMetaPrefix      = "!sqs|queue|meta|"
    SQSQueueGenPrefix       = "!sqs|queue|gen|"
    SQSQueueSeqPrefix       = "!sqs|queue|seq|"
    SQSQueueTombstonePrefix = "!sqs|queue|tombstone|"
    SQSMsgDataPrefix        = "!sqs|msg|data|"
    SQSMsgVisPrefix         = "!sqs|msg|vis|"
    SQSMsgByAgePrefix       = "!sqs|msg|byage|"
    SQSMsgDedupPrefix       = "!sqs|msg|dedup|"
    SQSMsgGroupPrefix       = "!sqs|msg|group|"
)
Snapshot key prefixes the SQS encoder dispatches on. Kept in sync with adapter/sqs_keys.go and adapter/sqs_messages.go (see SqsQueueMetaPrefix / SqsMsgDataPrefix); a renamed prefix in the live code is caught by the corresponding tests, which synthesise records with these literal byte strings.
const CurrentFormatVersion uint32 = 1
CurrentFormatVersion is the format major-version this code emits and accepts. Restore-side code MUST refuse `format_version > current`. A minor-version bump (e.g., adding optional fields) does not change this constant.
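The restore-side gate this implies is a single comparison; a sketch using the Manifest fields defined below (import path hypothetical):

package main

import (
    "fmt"

    "example.com/elastickv/backup" // import path assumed
)

// checkVersion sketches the restore-side gate: refuse any manifest newer
// than this code, and treat a zero version as corrupt.
func checkVersion(m backup.Manifest) error {
    if m.FormatVersion == 0 || m.FormatVersion > backup.CurrentFormatVersion {
        return backup.ErrUnsupportedFormatVersion
    }
    return nil
}

func main() {
    fmt.Println(checkVersion(backup.Manifest{FormatVersion: 1})) // <nil>
    fmt.Println(checkVersion(backup.Manifest{FormatVersion: 2})) // backup: manifest format_version unsupported
}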
const S3LeafDataSuffix = ".elastickv-leaf-data"
S3LeafDataSuffix is appended to the shorter of two colliding S3 keys when the longer key would force the shorter's path to be a directory. The rename is recorded in KEYMAP.jsonl.
const S3MetaSuffixReserved = ".elastickv-meta.json"
S3MetaSuffixReserved is the sidecar suffix per the design doc. A user S3 object key whose suffix matches this is rejected at dump time unless WithRenameCollisions is on.
Variables ¶
var (
    ErrDDBInvalidSchema = errors.New("backup: invalid !ddb|meta|table value")
    ErrDDBInvalidItem   = errors.New("backup: invalid !ddb|item value")
    ErrDDBMalformedKey  = errors.New("backup: malformed DynamoDB key")
)
ErrDDBInvalidSchema, ErrDDBInvalidItem, ErrDDBMalformedKey are the typed error classes for this encoder. Surface via errors.Is.
var (
    // ErrS3InvalidBucketMeta is returned when a !s3|bucket|meta value
    // fails JSON decoding.
    ErrS3InvalidBucketMeta = errors.New("backup: invalid !s3|bucket|meta value")

    // ErrS3InvalidManifest is returned when a !s3|obj|head value fails
    // JSON decoding.
    ErrS3InvalidManifest = errors.New("backup: invalid !s3|obj|head value")

    // ErrS3MalformedKey is returned when an S3 key cannot be parsed
    // for its structural components.
    ErrS3MalformedKey = errors.New("backup: malformed S3 key")

    // ErrS3MetaSuffixCollision is returned when a user object key
    // collides with the reserved S3MetaSuffixReserved suffix.
    ErrS3MetaSuffixCollision = errors.New("backup: user S3 object key collides with reserved sidecar suffix")

    // ErrS3IncompleteBlobChunks is returned when a manifest declares
    // N chunks for some part but the snapshot did not contain all N.
    // Without this guard a partial / racy snapshot would silently
    // emit a truncated body. Codex P1 #729.
    ErrS3IncompleteBlobChunks = errors.New("backup: incomplete blob chunks for manifest-declared part")
)
var ErrInvalidEncodedSegment = errors.New("backup: invalid encoded filename segment")
ErrInvalidEncodedSegment is returned by DecodeSegment when its input is neither a valid percent-encoded segment, a binary-prefixed segment, nor a SHA-fallback segment.
var ErrInvalidKeymapRecord = errors.New("backup: invalid KEYMAP.jsonl record")
ErrInvalidKeymapRecord is returned by Reader.Next when a line does not parse as a KeymapRecord (malformed JSON, missing field, malformed base64, etc.).
var ErrInvalidManifest = errors.New("backup: manifest invalid")
ErrInvalidManifest is returned by ReadManifest when the JSON parses but fails structural validation (missing required field, unknown phase, etc.).
var ErrRedisInvalidHashKey = cockroachdberr.New("backup: malformed !hs| key")
ErrRedisInvalidHashKey is returned when an !hs| key cannot be parsed for its userKeyLen+userKey segment (truncated, malformed, etc.).
var ErrRedisInvalidHashMeta = cockroachdberr.New("backup: invalid !hs|meta| value")
ErrRedisInvalidHashMeta is returned when the !hs|meta| value is not the expected 8-byte big-endian field count.
var ErrRedisInvalidStringValue = cockroachdberr.New("backup: invalid !redis|str| value")
ErrRedisInvalidStringValue is returned when a !redis|str| value uses the new magic-prefix format but its declared TTL section is truncated. Legacy (no-magic) values are accepted as opaque raw bytes.
var ErrRedisInvalidTTLValue = cockroachdberr.New("backup: invalid !redis|ttl| value")
ErrRedisInvalidTTLValue is returned when a !redis|ttl| value is not the expected 8-byte big-endian uint64 millisecond expiry.
var ErrSQSInvalidMessage = errors.New("backup: invalid !sqs|msg|data value")
ErrSQSInvalidMessage is returned for !sqs|msg|data values that lack the magic prefix or fail JSON decoding.
var ErrSQSInvalidQueueMeta = errors.New("backup: invalid !sqs|queue|meta value")
ErrSQSInvalidQueueMeta is returned for !sqs|queue|meta values that lack the magic prefix or fail JSON decoding.
var ErrSQSMalformedKey = errors.New("backup: malformed SQS key")
ErrSQSMalformedKey is returned when an SQS key cannot be parsed for the queue-name segment (e.g., the heuristic boundary detection found no transition byte).
var ErrShaFallbackNeedsKeymap = errors.New("backup: filename uses SHA fallback; consult KEYMAP.jsonl")
ErrShaFallbackNeedsKeymap is returned by DecodeSegment when its input is a SHA-fallback segment. The segment cannot be reversed to its original bytes from the filename alone — the caller must consult KEYMAP.jsonl.
var ErrUnsupportedFormatVersion = errors.New("backup: manifest format_version unsupported")
ErrUnsupportedFormatVersion is returned by ReadManifest when the on-disk format_version is greater than CurrentFormatVersion or zero.
Functions ¶
func DecodeSegment ¶
func DecodeSegment(seg string) ([]byte, error)
DecodeSegment is the inverse of EncodeSegment for percent-encoded and binary-prefixed inputs. SHA-fallback inputs return ErrShaFallbackNeedsKeymap so the caller knows to consult KEYMAP.jsonl rather than treat the partial suffix as the original key.
As a defensive measure DecodeSegment refuses inputs longer than maxSegmentBytes. EncodeSegment never produces such inputs, so any caller passing one is either reading a corrupted dump or has a bug; either way the percentDecode allocation should not run.
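A sketch of the intended caller pattern, combining DecodeSegment with a preloaded keymap (the resolve helper and import path are hypothetical):

package main

import (
    "errors"
    "fmt"
    "log"

    "example.com/elastickv/backup" // import path assumed
)

// resolve reverses one filename segment, consulting the keymap when
// DecodeSegment signals the SHA-fallback form.
func resolve(seg string, keymap map[string]backup.KeymapRecord) ([]byte, error) {
    raw, err := backup.DecodeSegment(seg)
    if errors.Is(err, backup.ErrShaFallbackNeedsKeymap) {
        rec, ok := keymap[seg]
        if !ok {
            return nil, fmt.Errorf("segment %q missing from KEYMAP.jsonl", seg)
        }
        return rec.Original()
    }
    return raw, err
}

func main() {
    raw, err := resolve("hello%2Fworld", nil) // percent-encoded segments decode directly
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%q\n", raw) // "hello/world"
}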
func EncodeBinarySegment ¶
func EncodeBinarySegment(raw []byte) string
EncodeBinarySegment encodes a DynamoDB B-attribute (binary) segment as "b64.<base64url-no-padding>" so that binary keys never collide with string keys whose hex-encoding happens to look like base64.
It short-circuits straight to the SHA fallback for inputs whose base64 expansion (~4/3 of the raw length, plus the 4-byte "b64." prefix) would always overflow maxSegmentBytes. As with EncodeSegment, this avoids an unnecessary large allocation when the result would have been discarded anyway.
func EncodeDDBItemKey ¶
func EncodeDDBItemKey(tableName string, generation uint64, hashKey, rangeKey string) []byte
EncodeDDBItemKey constructs a !ddb|item key for tests. Mirrors the live legacyDynamoItemKey constructor in adapter/dynamodb.go (string hash + range, simplest shape).
func EncodeDDBTableMetaKey ¶
func EncodeDDBTableMetaKey(tableName string) []byte
EncodeDDBTableMetaKey constructs a !ddb|meta|table key for tests.
func EncodeMsgDataKey ¶
func EncodeMsgDataKey(queueName string, gen uint64, messageID string) []byte
EncodeMsgDataKey constructs a !sqs|msg|data key for tests. Mirrors the live sqsMsgDataKey constructor in adapter/sqs_messages.go.
func EncodeQueueMetaKey ¶
func EncodeQueueMetaKey(queueName string) []byte
EncodeQueueMetaKey constructs a !sqs|queue|meta key for tests.
func EncodeSegment ¶
func EncodeSegment(raw []byte) string
EncodeSegment encodes a single user-supplied path segment for use as a filename component. It is the inverse of DecodeSegment for non-fallback inputs.
The encoding is deterministic: the same input bytes always produce the same segment.
Three structural short-circuits ensure DecodeSegment cannot misclassify a legitimate key:
- If `raw` is longer than maxSegmentBytes, even a fully-unreserved encoding (1:1) cannot fit, so we go straight to shaFallback. This also caps the percent-encode allocation at ~maxSegmentBytes, preventing OOM on adversarial input.
- If the percent-encoded form happens to match the SHA-fallback shape (32 hex chars followed by "__"), we promote it to a real SHA-fallback so DecodeSegment's structural detection cannot fabricate a wrong original.
- If the percent-encoded form starts with the binary "b64." prefix, we promote to SHA-fallback for the same reason: a plain string key like "b64.foo" would otherwise be decoded as base64 and produce different bytes on round-trip.
Both promoted-fallback paths leave the original in KEYMAP.jsonl (a correctness dependency, per the package doc), so exact-byte recovery is preserved.
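The caller-side half of that dependency might look like this sketch (encodeAndRecord is a hypothetical helper, not package API):

package main

import (
    "log"
    "os"

    "example.com/elastickv/backup" // import path assumed
)

// encodeAndRecord is the caller-side half of the dependency: whenever
// EncodeSegment fell back (or was promoted) to the SHA form, the original
// bytes must be persisted or they become unrecoverable.
func encodeAndRecord(raw []byte, km *backup.KeymapWriter) (string, error) {
    seg := backup.EncodeSegment(raw)
    if backup.IsShaFallback(seg) {
        if err := km.WriteOriginal(seg, raw, backup.KindSHAFallback); err != nil {
            return "", err
        }
    }
    return seg, nil
}

func main() {
    km := backup.NewKeymapWriter(os.Stdout)
    // "b64.note" hits the third short-circuit above: a plain string key
    // that merely looks binary is promoted to the SHA fallback.
    if _, err := encodeAndRecord([]byte("b64.note"), km); err != nil {
        log.Fatal(err)
    }
    if err := km.Close(); err != nil {
        log.Fatal(err)
    }
}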
func HasInlineTTL ¶
func HasInlineTTL(value []byte) bool
HasInlineTTL reports whether a !redis|str| value carries the new-format inline TTL header. Useful for tests asserting the producer's choice.
func IsBinarySegment ¶
func IsBinarySegment(seg string) bool
IsBinarySegment reports whether seg is a base64-url encoded binary segment emitted by EncodeBinarySegment.
func IsBlobAtomicWriteOutOfSpace ¶
func IsBlobAtomicWriteOutOfSpace(err error) bool
IsBlobAtomicWriteOutOfSpace reports whether err from writeFileAtomic (or any os.File write the master pipeline issues) was driven by a full disk. The platform-specific error codes (POSIX ENOSPC vs. Windows ERROR_DISK_FULL / ERROR_HANDLE_DISK_FULL) live in disk_full_{unix,windows}.go so retry/alarm logic in callers classifies disk-full uniformly across operating systems (Codex P2 round 9).
func IsBlobAtomicWriteRetriable ¶
func IsBlobAtomicWriteRetriable(err error) bool
IsBlobAtomicWriteRetriable reports whether err from writeFileAtomic is a retriable I/O failure. Today the only retriable signal is io.ErrShortWrite. ENOSPC (disk full) is intentionally NOT retriable here — the master pipeline must surface it to the operator rather than spin: a backup against a full disk has no business retrying. IsBlobAtomicWriteOutOfSpace is the explicit out-of-space probe so the pipeline can choose the right alarm wording.
func IsShaFallback ¶
func IsShaFallback(seg string) bool
IsShaFallback reports whether seg uses the SHA-prefix-and-truncated-original form. Such segments cannot be reversed without KEYMAP.jsonl.
func LoadKeymap ¶
func LoadKeymap(r io.Reader) (map[string]KeymapRecord, error)
LoadKeymap reads every record from r into an in-memory map keyed by encoded segment. The last record wins on duplicates. Suitable for scopes where the keymap fits comfortably in memory; for large scopes callers should use KeymapReader directly.
Types ¶
type Adapter ¶
type Adapter struct {
Tables []string `json:"tables,omitempty"`
Buckets []string `json:"buckets,omitempty"`
Databases []uint32 `json:"databases,omitempty"`
Queues []string `json:"queues,omitempty"`
}
Adapter holds the scope identifiers for one adapter. Field names are per-adapter to match the protocol's natural vocabulary.
type Adapters ¶
type Adapters struct {
DynamoDB *Adapter `json:"dynamodb,omitempty"`
S3 *Adapter `json:"s3,omitempty"`
Redis *Adapter `json:"redis,omitempty"`
SQS *Adapter `json:"sqs,omitempty"`
}
Adapters lists which scopes were dumped per adapter. The pointer values express two distinguishable on-disk states:
- nil -> the adapter was excluded from this dump (e.g. `--adapter dynamodb,s3` filtered it out). The corresponding JSON key is absent.
- non-nil pointer to Adapter{} -> the adapter was in scope but no scopes for it were emitted (no tables, no buckets, etc.). The JSON key is present with an empty object.
- non-nil pointer to a populated Adapter -> the listed scopes were emitted.
Storing pointers (rather than zero-value Adapter structs) is what keeps "excluded by filter" distinguishable from "included but empty" through json.Marshal — non-pointer fields would collapse both states into the same on-disk shape.
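A sketch of the two states through json.Marshal (import path hypothetical):

package main

import (
    "encoding/json"
    "fmt"
    "log"

    "example.com/elastickv/backup" // import path assumed
)

func main() {
    a := backup.Adapters{
        DynamoDB: &backup.Adapter{Tables: []string{"users"}}, // in scope, populated
        Redis:    &backup.Adapter{},                          // in scope, nothing emitted
        // S3 and SQS stay nil: excluded by filter.
    }
    out, err := json.Marshal(a)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(out))
    // {"dynamodb":{"tables":["users"]},"redis":{}}
}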
type DDBEncoder ¶
type DDBEncoder struct {
// contains filtered or unexported fields
}
DDBEncoder encodes the DynamoDB prefix family into the per-table layout described in docs/design/2026_04_29_proposed_snapshot_logical_decoder.md (Phase 0): one `_schema.json` per table and one `items/<pk>/<sk>.json` per item (default per-item layout).
Lifecycle: Handle* per record, Finalize once. Items arrive before the schema in lex order ('i' < 'm' under !ddb|), so the encoder buffers per-encoded-table-segment and emits at Finalize once the schema is known.
Wide-column GSI rows (!ddb|gsi|*) are NOT dumped: they are derivable from the base item set + schema, and replaying GSI rows on restore would conflict with the destination's own index maintenance.
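A sketch of uniform prefix dispatch into this encoder (dispatchDDB is a hypothetical helper; the scan loop feeding it is assumed):

package main

import (
    "bytes"
    "log"

    "example.com/elastickv/backup" // import path assumed
)

// dispatchDDB routes one snapshot record by prefix so the pipeline never
// special-cases a !ddb|* record.
func dispatchDDB(enc *backup.DDBEncoder, key, value []byte) error {
    switch {
    case bytes.HasPrefix(key, []byte(backup.DDBTableMetaPrefix)):
        return enc.HandleTableMeta(key, value)
    case bytes.HasPrefix(key, []byte(backup.DDBTableGenPrefix)):
        return enc.HandleTableGen(key, value)
    case bytes.HasPrefix(key, []byte(backup.DDBItemPrefix)):
        return enc.HandleItem(key, value)
    case bytes.HasPrefix(key, []byte(backup.DDBGSIPrefix)):
        return enc.HandleGSIRow(key, value)
    }
    return nil // not a DynamoDB record
}

func main() {
    enc := backup.NewDDBEncoder("out") // emits under out/dynamodb/
    // ... call dispatchDDB(enc, key, value) for every snapshot record ...
    if err := enc.Finalize(); err != nil {
        log.Fatal(err)
    }
}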
func NewDDBEncoder ¶
func NewDDBEncoder(outRoot string) *DDBEncoder
NewDDBEncoder constructs an encoder rooted at <outRoot>/dynamodb/.
func (*DDBEncoder) Finalize ¶
func (d *DDBEncoder) Finalize() error
Finalize emits each table's _schema.json and per-item JSON files. Tables with items but no schema (orphans) emit a warning and are skipped — preserving the spec's lenient handling for incomplete inputs. Real flush errors fail fast so corruption surfaces immediately rather than being attributed to a later table (Gemini MEDIUM #182).
func (*DDBEncoder) HandleGSIRow ¶
func (d *DDBEncoder) HandleGSIRow(_, _ []byte) error
HandleGSIRow drops GSI rows by default (they are derivable from the base item set + schema). Exposed as a no-op so the master pipeline can dispatch all !ddb|* prefixes uniformly without special-casing.
func (*DDBEncoder) HandleItem ¶
func (d *DDBEncoder) HandleItem(key, value []byte) error
HandleItem processes a !ddb|item|<encTable>|<gen>|<rest> record. The encoded table segment AND the item generation are parsed out of the key; the proto is buffered keyed by generation so Finalize can emit only the rows belonging to the schema's active generation.
Stale-generation rows (left behind by an in-flight delete/recreate before async cleanup finishes) would otherwise silently leak under the new schema and either resurrect deleted data or fail Finalize when primary-key names changed across generations — Codex P1 #237.
func (*DDBEncoder) HandleTableGen ¶
func (d *DDBEncoder) HandleTableGen(_, _ []byte) error
HandleTableGen drops the per-table generation counter (operational state, not user-visible).
func (*DDBEncoder) HandleTableMeta ¶
func (d *DDBEncoder) HandleTableMeta(key, value []byte) error
HandleTableMeta processes a !ddb|meta|table|<encodedTable> record. Strips the magic prefix, proto-unmarshals into DynamoTableSchema, and parks it on the per-table state.
func (*DDBEncoder) WithBundleJSONL ¶
func (d *DDBEncoder) WithBundleJSONL(on bool) *DDBEncoder
WithBundleJSONL switches per-table layout to `items/data-<part>.jsonl` (one item per line). Default is per-item files. The choice is recorded in MANIFEST.json (`dynamodb_layout`) by the master pipeline; the encoder itself only needs the flag to pick the on-disk shape.
Bundle mode is a follow-up: this PR ships per-item only. Calling WithBundleJSONL(true) makes Finalize return an error until the bundle path lands.
func (*DDBEncoder) WithWarnSink ¶
func (d *DDBEncoder) WithWarnSink(fn func(event string, fields ...any)) *DDBEncoder
WithWarnSink wires structured-warning emission (orphan items, schema-less tables, etc.).
type Exclusions ¶
type Exclusions struct {
IncludeIncompleteUploads bool `json:"include_incomplete_uploads"`
IncludeOrphans bool `json:"include_orphans"`
PreserveSQSVisibility bool `json:"preserve_sqs_visibility"`
IncludeSQSSideRecords bool `json:"include_sqs_side_records"`
}
Exclusions records the producer-side flags that affected which records were emitted. Restore tools log these so an operator can correlate a surprising dump shape with the producer invocation.
type KeymapReader ¶
type KeymapReader struct {
// contains filtered or unexported fields
}
KeymapReader iterates JSONL records line-by-line. Memory footprint is bounded by keymapBufSizeReader regardless of file size.
func NewKeymapReader ¶
func NewKeymapReader(r io.Reader) *KeymapReader
NewKeymapReader wraps r so the caller can iterate records via Next.
func (*KeymapReader) Next ¶
func (r *KeymapReader) Next() (KeymapRecord, bool, error)
Next decodes the next record. It returns (rec, true, nil) on success, (zero, false, nil) at end of stream, and (zero, false, err) on parse failure or I/O error. Once an error is returned the reader is sticky: subsequent calls return the same error.
The base64-encoded `original` field is validated at parse time rather than lazily: a malformed dump must surface on the first read of the affected line, not propagate silently until a much later rec.Original() call. Same error class either way.
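A typical streaming loop over a large keymap (import path hypothetical):

package main

import (
    "fmt"
    "log"
    "os"

    "example.com/elastickv/backup" // import path assumed
)

func main() {
    f, err := os.Open("KEYMAP.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    r := backup.NewKeymapReader(f)
    for {
        rec, ok, err := r.Next()
        if err != nil {
            log.Fatal(err) // sticky: retrying Next returns the same error
        }
        if !ok {
            break // end of stream
        }
        fmt.Println(rec.Encoded, rec.Kind)
    }
}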
type KeymapRecord ¶
type KeymapRecord struct {
// Encoded is the filename segment as it appears in the dump tree.
Encoded string `json:"encoded"`
// OriginalB64 is base64url-no-padding of the original key bytes.
OriginalB64 string `json:"original"`
// Kind classifies why this record exists; see Kind* constants.
Kind string `json:"kind"`
}
KeymapRecord is a single mapping from encoded filename component back to the original key bytes. Original bytes are arbitrary (binary safe), so they are encoded as base64url-no-padding for transport in JSON.
func (KeymapRecord) Original ¶
func (r KeymapRecord) Original() ([]byte, error)
Original returns the decoded original key bytes from r.OriginalB64.
type KeymapWriter ¶
type KeymapWriter struct {
// contains filtered or unexported fields
}
KeymapWriter appends records to a KEYMAP.jsonl stream. Concurrent calls to Write are serialised through the underlying bufio.Writer; the caller is expected to use a single writer per scope.
func NewKeymapWriter ¶
func NewKeymapWriter(w io.Writer) *KeymapWriter
NewKeymapWriter returns a writer that appends JSONL records to w. Close must be called to flush.
func (*KeymapWriter) Close ¶
func (w *KeymapWriter) Close() error
Close flushes any buffered records to the underlying writer.
func (*KeymapWriter) Count ¶
func (w *KeymapWriter) Count() int
Count returns the number of records written so far. Useful for the "omit empty KEYMAP file" decision after the dump completes.
func (*KeymapWriter) Write ¶
func (w *KeymapWriter) Write(rec KeymapRecord) error
Write appends one KeymapRecord. The record is JSON-serialised with a trailing newline (json.Encoder behavior), giving the JSONL contract.
func (*KeymapWriter) WriteOriginal ¶
func (w *KeymapWriter) WriteOriginal(encoded string, original []byte, kind string) error
WriteOriginal is a convenience wrapper that base64-encodes raw original bytes for the caller.
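A sketch of the full writer lifecycle, including the empty-file decision (import path hypothetical):

package main

import (
    "log"
    "os"

    "example.com/elastickv/backup" // import path assumed
)

func main() {
    f, err := os.Create("KEYMAP.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    km := backup.NewKeymapWriter(f)

    raw := []byte("some user key")
    if seg := backup.EncodeSegment(raw); backup.IsShaFallback(seg) {
        if err := km.WriteOriginal(seg, raw, backup.KindSHAFallback); err != nil {
            log.Fatal(err)
        }
    }

    if err := km.Close(); err != nil { // flush buffered records
        log.Fatal(err)
    }
    if err := f.Close(); err != nil {
        log.Fatal(err)
    }
    if km.Count() == 0 {
        os.Remove("KEYMAP.jsonl") // the "omit empty KEYMAP file" decision
    }
}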
type Live ¶
type Live struct {
// ReadTS is the pinned read_ts at which BackupScanner traversed the
// keyspace.
ReadTS uint64 `json:"read_ts"`
// PinTokenSHA256 is the hex SHA-256 of the pin_token issued by
// BeginBackup. Stored as a hash rather than the raw token so the
// manifest carries no auth-sensitive material.
PinTokenSHA256 string `json:"pin_token_sha256,omitempty"`
}
Live records the cluster-wide pinning information that produced a Phase 1 dump. Phase 0 dumps leave this nil.
type Manifest ¶
type Manifest struct {
FormatVersion uint32 `json:"format_version"`
Phase string `json:"phase"`
ElastickvVersion string `json:"elastickv_version,omitempty"`
ClusterID string `json:"cluster_id,omitempty"`
SnapshotIndex uint64 `json:"snapshot_index,omitempty"`
LastCommitTS uint64 `json:"last_commit_ts,omitempty"`
WallTimeISO string `json:"wall_time_iso"`
Source *Source `json:"source,omitempty"`
Live *Live `json:"live,omitempty"`
// Adapters and Exclusions are pointer types so ReadManifest can
// distinguish "section omitted entirely" (a corrupted or
// truncated dump that should fail validation) from "section
// present but populated with default values" (legitimate
// scope-everything-excluded). Codex P2 #146 (round 3).
Adapters *Adapters `json:"adapters"`
Exclusions *Exclusions `json:"exclusions"`
ChecksumAlgorithm string `json:"checksum_algorithm"`
ChecksumFormat string `json:"checksum_format"`
EncodedFilenameCharset string `json:"encoded_filename_charset"`
KeySegmentMaxBytes uint32 `json:"key_segment_max_bytes"`
S3MetaSuffix string `json:"s3_meta_suffix"`
S3CollisionStrategy string `json:"s3_collision_strategy"`
DynamoDBLayout string `json:"dynamodb_layout"`
}
Manifest is the on-disk MANIFEST.json structure. Field tags match the spec in docs/design/2026_04_29_proposed_snapshot_logical_decoder.md.
func NewPhase0SnapshotManifest ¶
NewPhase0SnapshotManifest seeds a manifest with the Phase 0a defaults. Callers fill in scope (Adapters), Source/wall time and exclusions before passing it to WriteManifest. Adapters and Exclusions are seeded to non-nil zero values so the resulting manifest passes the "section-present" validation; callers populating individual scopes reach in via the now-non-nil pointer.
type RedisDB ¶
type RedisDB struct {
// contains filtered or unexported fields
}
RedisDB encodes one logical Redis database (`redis/db_<n>/`). All operations are scoped to its outRoot; the caller wires per-database instances when the producer supports multiple databases (today only db_0 is meaningful, but the encoder is wired to take any non-negative index so a future multi-db dump does not silently collide on db_0).
Lifecycle:
    r := NewRedisDB(outRoot, dbIndex)
    for each snapshot record matching a redis prefix:
        r.Handle*(...)
    r.Finalize()
Handle* methods are NOT goroutine-safe; the decoder pipeline is inherently sequential per scope, so a mutex would only add cost.
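A sketch of sequential prefix dispatch into one RedisDB (dispatchRedis is a hypothetical helper; whether each handler takes the full key or the prefix-stripped user key is inferred from the parameter names below and should be treated as an assumption):

package main

import (
    "bytes"
    "log"

    "example.com/elastickv/backup" // import path assumed
)

// dispatchRedis is a hypothetical helper routing one record by prefix.
func dispatchRedis(r *backup.RedisDB, key, value []byte) error {
    switch {
    case bytes.HasPrefix(key, []byte(backup.RedisStringPrefix)):
        return r.HandleString(key[len(backup.RedisStringPrefix):], value)
    case bytes.HasPrefix(key, []byte(backup.RedisHLLPrefix)):
        return r.HandleHLL(key[len(backup.RedisHLLPrefix):], value)
    case bytes.HasPrefix(key, []byte(backup.RedisTTLPrefix)):
        return r.HandleTTL(key[len(backup.RedisTTLPrefix):], value)
    case bytes.HasPrefix(key, []byte(backup.RedisHashMetaPrefix)):
        // Delta keys (!hs|meta|d|...) land here too; HandleHashMeta
        // skips them itself (see below).
        return r.HandleHashMeta(key, value)
    case bytes.HasPrefix(key, []byte(backup.RedisHashFieldPrefix)):
        return r.HandleHashField(key, value)
    }
    return nil // not a Redis record
}

func main() {
    r := backup.NewRedisDB("out", 0) // NewRedisDB(outRoot, dbIndex) per the lifecycle above
    // ... call dispatchRedis(r, key, value) for every snapshot record ...
    if err := r.Finalize(); err != nil {
        log.Fatal(err)
    }
}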
func NewRedisDB ¶
NewRedisDB constructs a RedisDB rooted at <outRoot>/redis/db_<n>/. dbIndex selects <n>; today the producer always passes 0, but accepting the index as a parameter prevents a future multi-db dump from silently colliding on db_0.
func (*RedisDB) Finalize ¶
func (r *RedisDB) Finalize() error
Finalize flushes all open sidecar writers and emits warnings for any pending TTL records whose user key was never claimed by the wide-column encoders. Call exactly once after every snapshot record has been dispatched.
func (*RedisDB) HandleHLL ¶
func (r *RedisDB) HandleHLL(userKey, value []byte) error
HandleHLL processes one !redis|hll|<userKey> record. The value is the raw HLL sketch bytes, written byte-for-byte to hll/<encoded>.bin. TTL for HLL keys lives in !redis|ttl|<userKey> and is consumed by HandleTTL.
func (*RedisDB) HandleHashField ¶
func (r *RedisDB) HandleHashField(key, value []byte) error
HandleHashField processes one !hs|fld|<userKey><fieldName> record. The value is the raw field-value bytes (binary-safe).
Note: Redis hash field names are binary-safe and may legitimately be empty — `HSET k "" v` is a valid command and the live store emits a key shaped exactly `!hs|fld|<len><userKey>` with no trailing field bytes. We deliberately do NOT reject zero-length field names here so backup decoding succeeds on real data created via HSET with empty names. Codex P1 round 13 (PR #725).
func (*RedisDB) HandleHashMeta ¶
func (r *RedisDB) HandleHashMeta(key, value []byte) error
HandleHashMeta processes one !hs|meta|<userKey> record. The value is the 8-byte BE field count. We park the state for finalize-time flush and register the user key so a later !redis|ttl|<userKey> record routes back to this hash state.
Delta keys (!hs|meta|d|...) share the !hs|meta| string prefix, so a snapshot dispatcher that routes by "starts with RedisHashMetaPrefix" will land delta records here too. Phase 0a's output (an array of observed fields) doesn't need to apply the delta arithmetic — the !hs|fld|... records are the source of truth — so we silently skip delta keys instead of returning ErrRedisInvalidHashKey. Codex P1 round 14 (PR #725 #13).
func (*RedisDB) HandleString ¶
func (r *RedisDB) HandleString(userKey, value []byte) error
HandleString processes one !redis|str|<userKey> record. The value is the raw stored bytes; HandleString peels the magic-prefix TTL header (if present) and writes the user-visible value to strings/<encoded>.bin and the TTL — if any — to strings_ttl.jsonl.
func (*RedisDB) HandleTTL ¶
func (r *RedisDB) HandleTTL(userKey, value []byte) error
HandleTTL processes one !redis|ttl|<userKey> record. Routing depends on what HandleString/HandleHLL recorded for the same userKey:
- redisKindHLL -> hll_ttl.jsonl
- redisKindString -> strings_ttl.jsonl (legacy strings, whose TTL lives in !redis|ttl| rather than the inline magic-prefix header)
- redisKindUnknown -> counted in orphanTTLCount; reported via the warn sink on Finalize because Phase 0a's wide-column encoders have not landed yet.
type S3Encoder ¶
type S3Encoder struct {
// contains filtered or unexported fields
}
S3Encoder emits per-bucket _bucket.json + assembled object bodies + .elastickv-meta.json sidecars + KEYMAP.jsonl, per the Phase 0 design (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md).
Lifecycle: Handle* per record, Finalize once. Records arrive in snapshot lex order:
    !s3|blob|*        (b)  -- written to a per-(bucket,object) scratch chunk pool
    !s3|bucket|gen|*  (bg) -- ignored (operational counter)
    !s3|bucket|meta|* (bm) -- buffered until Finalize
    !s3|gc|upload|*   (g)  -- ignored (in-flight cleanup state)
    !s3|obj|head|*    (o)  -- buffered until Finalize
    !s3|upload|meta|* (um) -- excluded by default; opt in via WithIncludeIncompleteUploads
    !s3|upload|part|* (up) -- same
    !s3route|*        (r)  -- ignored (control plane)
Object body assembly happens at Finalize: for each object manifest, the encoder enumerates parts in PartNo order and chunks in ChunkNo order, concatenates the matching blob chunks (which were pre-spilled to scratch files as they arrived), and writes the assembled body to <outRoot>/s3/<bucket>/<object> with the metadata sidecar at <object>.elastickv-meta.json.
Memory: O(num_objects + num_buckets) buffered metadata. Per-blob payloads are streamed to disk as they arrive — never held in memory.
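A wiring sketch (NewS3Encoder's exact parameter list is assumed from the description below: output root plus caller-owned scratch root):

package main

import (
    "log"
    "os"

    "example.com/elastickv/backup" // import path assumed
)

func main() {
    scratch, err := os.MkdirTemp("", "elastickv-s3-scratch-") // caller owns the scratch root
    if err != nil {
        log.Fatal(err)
    }
    enc := backup.NewS3Encoder("out", scratch). // parameter list assumed
        WithIncludeIncompleteUploads(true).
        WithIncludeOrphans(false).
        WithRenameCollisions(true)
    // ... Handle* per record in snapshot lex order ...
    if err := enc.Finalize(); err != nil { // assembles bodies, removes the scratch tree
        log.Fatal(err)
    }
}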
func NewS3Encoder ¶
NewS3Encoder constructs an encoder rooted at <outRoot>/s3/. Blob chunks are spilled to <scratchRoot>/s3/ as they arrive and assembled into final object bodies at Finalize. The caller owns scratchRoot; it must exist and be writable. A common choice is os.TempDir() under the dump runner — the encoder removes its scratch subtree at Finalize.
func (*S3Encoder) Finalize ¶
func (s *S3Encoder) Finalize() error
Finalize assembles every object body, writes its sidecar, flushes per-bucket _bucket.json, and removes the scratch tree.
func (*S3Encoder) HandleBlob ¶
func (s *S3Encoder) HandleBlob(key, value []byte) error
HandleBlob spills a !s3|blob| record to a per-chunk scratch file and registers it under the (bucket, object, gen, uploadID, partNo, chunkNo, partVersion) routing key. EncodeSegment percent-encodes `/`, so a multi-segment object key like `../../tmp/pwn` collapses into one filename, but a literal `..` (or `.`) survives unchanged because `.` is RFC3986-unreserved. Without explicit validation, a crafted bucket+object pair like `bucket="..", object=".."` would resolve to filepath.Join(scratchRoot, "..", "..") = the parent of scratchRoot, letting writeFileAtomic land outside the decoder's controlled directory before safeJoinUnderRoot ever runs at output time; HandleBlob therefore rejects such segments before any scratch write. Codex P1 round 11.
func (*S3Encoder) HandleBucketMeta ¶
func (s *S3Encoder) HandleBucketMeta(key, value []byte) error
HandleBucketMeta decodes and parks a !s3|bucket|meta record.
func (*S3Encoder) HandleIgnored ¶
func (s *S3Encoder) HandleIgnored(_, _ []byte) error
HandleIgnored is a no-op for prefixes the encoder explicitly drops (!s3|bucket|gen|, !s3|gc|upload|, !s3route|). Exposed so the master pipeline can dispatch all !s3|* prefixes uniformly without special-casing.
func (*S3Encoder) HandleIncompleteUpload ¶
func (s *S3Encoder) HandleIncompleteUpload(prefix string, key, value []byte) error
HandleIncompleteUpload routes !s3|upload|meta|/!s3|upload|part| records to <bucket>/_incomplete_uploads/records.jsonl when the include flag is on; otherwise drops them.
The output writer is opened once per bucket on the first record and cached on s3BucketState. Re-opening per record (the prior implementation) used create/truncate semantics, so each call wiped the file and only the last record survived — Codex P2 #318 / Gemini HIGH+MEDIUM #318.
func (*S3Encoder) HandleObjectManifest ¶
func (s *S3Encoder) HandleObjectManifest(key, value []byte) error
HandleObjectManifest decodes and parks an !s3|obj|head record. The manifest's UploadID and Parts list drive the Finalize-time blob assembly.
func (*S3Encoder) WithIncludeIncompleteUploads ¶
func (s *S3Encoder) WithIncludeIncompleteUploads(on bool) *S3Encoder
WithIncludeIncompleteUploads routes !s3|upload|meta|/!s3|upload|part| records to s3/<bucket>/_incomplete_uploads/. Default is to skip them.
func (*S3Encoder) WithIncludeOrphans ¶
func (s *S3Encoder) WithIncludeOrphans(on bool) *S3Encoder
WithIncludeOrphans surfaces blob chunks that have no matching manifest under s3/<bucket>/_orphans/. Default skips them.
func (*S3Encoder) WithRenameCollisions ¶
func (s *S3Encoder) WithRenameCollisions(on bool) *S3Encoder
WithRenameCollisions opts in to renaming user objects that collide with the reserved S3MetaSuffixReserved suffix. Default rejects.
type SQSEncoder ¶
type SQSEncoder struct {
// contains filtered or unexported fields
}
SQSEncoder encodes the SQS prefix family into the per-queue layout described in docs/design/2026_04_29_proposed_snapshot_logical_decoder.md (Phase 0): one `_queue.json` per queue and one ordered `messages.jsonl`.
Lifecycle: per-snapshot pass calls Handle* for each record, then exactly one Finalize. Side-records (vis/byage/dedup/group/tombstone) are excluded by default; opt in with WithIncludeSideRecords. Visibility state on emitted messages is zeroed by default; opt in to preserve with WithPreserveVisibility.
The encoder buffers messages per queue in memory and sorts them at Finalize-time by (SendTimestampMillis, SequenceNumber, MessageID). This is acceptable for typical operational queues; queues with hundreds of millions of messages will need a future stream-and-merge variant.
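A wiring sketch with the default-valued options spelled out (import path hypothetical):

package main

import (
    "log"

    "example.com/elastickv/backup" // import path assumed
)

func main() {
    enc := backup.NewSQSEncoder("out"). // emits under out/sqs/
        WithIncludeSideRecords(false). // the documented Phase 0 default
        WithPreserveVisibility(false)  // default: restored messages start visible
    // ... HandleQueueMeta / HandleQueueGen / HandleMessageData /
    //     HandleSideRecord per snapshot record, then exactly one Finalize ...
    if err := enc.Finalize(); err != nil {
        log.Fatal(err)
    }
}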
func NewSQSEncoder ¶
func NewSQSEncoder(outRoot string) *SQSEncoder
NewSQSEncoder constructs an encoder rooted at <outRoot>/sqs/.
func (*SQSEncoder) Finalize ¶
func (s *SQSEncoder) Finalize() error
Finalize flushes every queue's _queue.json and messages.jsonl. Queues with buffered messages but no meta record (orphans) emit a warning and have their messages dropped — restoring orphan messages without a queue config would silently create a queue with default settings, which is rarely what the operator wants. However, if --include-sqs-side-records is on and this orphan queue has buffered side records (vis/byage/dedup/group/tombstone), those are still flushed under the encoded-prefix directory: the most common reason for a missing meta is a DeleteQueue that left tombstones, and dropping exactly those records is the opposite of what the operator asked for. Codex P2 round 8.
func (*SQSEncoder) HandleMessageData ¶
func (s *SQSEncoder) HandleMessageData(key, value []byte) error
HandleMessageData processes one !sqs|msg|data|<encQueue><gen><encMsgID> record. The encoded queue segment is parsed out of the key and used as the per-queue routing key; the message is buffered until Finalize so it can be sorted and emitted in send-order.
func (*SQSEncoder) HandleQueueGen ¶
func (s *SQSEncoder) HandleQueueGen(key, value []byte) error
HandleQueueGen processes one !sqs|queue|gen|<encoded> record. The value is a base-10 decimal string holding the queue's current generation (mirrors adapter/sqs_catalog.go's CreateQueue Put: the live cluster writes strconv.FormatUint(gen, 10)). Capturing activeGen lets flushQueue drop messages tagged with older generations — those are residual rows left by PurgeQueue / DeleteQueue that the reaper has not yet cleaned, and emitting them to messages.jsonl would resurrect purged messages on restore. Codex P1 round 10.
func (*SQSEncoder) HandleQueueMeta ¶
func (s *SQSEncoder) HandleQueueMeta(key, value []byte) error
HandleQueueMeta processes one !sqs|queue|meta|<encoded> record. Strips the magic prefix, decodes the JSON, projects to the dump-format sqsQueueMetaPublic, and parks it on the per-queue state.
func (*SQSEncoder) HandleSideRecord ¶
func (s *SQSEncoder) HandleSideRecord(prefix string, key, value []byte) error
HandleSideRecord buffers (vis|byage|dedup|group|tombstone) records when includeSideRecords is on; otherwise drops them silently (this is the documented Phase 0 default).
func (*SQSEncoder) WithIncludeSideRecords ¶
func (s *SQSEncoder) WithIncludeSideRecords(on bool) *SQSEncoder
WithIncludeSideRecords routes vis/byage/dedup/group/tombstone records into _internals/. Default is to exclude them — they are derivable from the queue config + message records and replaying them on restore can resurrect aborted state.
func (*SQSEncoder) WithPreserveVisibility ¶
func (s *SQSEncoder) WithPreserveVisibility(on bool) *SQSEncoder
WithPreserveVisibility passes the visibility-state fields (visible_at_millis, current_receipt_token, receive_count, first_receive_millis) through to the dump. Default is to zero them so the restored queue starts with every message visible.
func (*SQSEncoder) WithWarnSink ¶
func (s *SQSEncoder) WithWarnSink(fn func(event string, fields ...any)) *SQSEncoder
WithWarnSink wires a structured warning hook (same shape as RedisDB.WithWarnSink). Used for orphan messages and unresolvable side records.
type Source ¶
type Source struct {
// FSMPath is the absolute or relative path of the .fsm file the
// decoder consumed.
FSMPath string `json:"fsm_path"`
// FSMCRC32C is the CRC32C value the decoder verified against the
// .fsm file's footer (lowercase hex).
FSMCRC32C string `json:"fsm_crc32c,omitempty"`
}
Source records where a Phase 0 dump came from. Phase 1 dumps leave Source nil and populate Live instead.