backup


Documentation

Overview

Package backup implements the per-adapter logical-backup format defined in docs/design/2026_04_29_proposed_snapshot_logical_decoder.md (Phase 0) and reused by docs/design/2026_04_29_proposed_logical_backup.md (Phase 1).

This file owns the filename encoding rules for non-S3 segments. S3 object keys preserve their `/` separators (and so are not transformed by EncodeSegment); every other adapter scope encodes user-supplied bytes through this path.

Encoding rules (see "Filename encoding" in the Phase 0 doc; a minimal sketch follows the list):

  • Bytes in the unreserved set [A-Za-z0-9._-] pass through.
  • Every other byte is rendered as %HH (uppercase hex), like application/x-www-form-urlencoded but applied to every non-allowlisted byte.
  • If the encoded result exceeds maxSegmentBytes (240), the segment is replaced with <sha256-hex-prefix-32>__<truncated-original> and the full original bytes must be recorded in KEYMAP.jsonl by the caller.
  • Binary DynamoDB partition / sort keys take a separate "b64.<base64url>" path so a binary key never collides with a string key whose hex encoding happens to look like base64. EncodeBinarySegment emits that form.
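
A minimal sketch of the charset pass under these rules, using a hypothetical helper name (the real EncodeSegment layers the length guard and fallback promotions described below on top of this):

// encodeUnreserved illustrates only the charset rule: unreserved bytes
// [A-Za-z0-9._-] pass through; every other byte becomes %HH.
func encodeUnreserved(raw []byte) string {
	var b strings.Builder
	for _, c := range raw {
		switch {
		case c >= 'A' && c <= 'Z', c >= 'a' && c <= 'z',
			c >= '0' && c <= '9', c == '.', c == '_', c == '-':
			b.WriteByte(c)
		default:
			fmt.Fprintf(&b, "%%%02X", c) // uppercase hex, e.g. '/' -> "%2F"
		}
	}
	return b.String()
}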

Constants

View Source
const (
	DDBTableMetaPrefix = "!ddb|meta|table|"
	DDBTableGenPrefix  = "!ddb|meta|gen|"
	DDBItemPrefix      = "!ddb|item|"
	DDBGSIPrefix       = "!ddb|gsi|"
)

Snapshot prefixes the DynamoDB encoder dispatches on. These mirror the live constants in kv/shard_key.go (DynamoTableMetaPrefix etc.), so a renamed prefix is caught by the dispatch tests.

View Source
const (
	KindSHAFallback   = "sha-fallback"
	KindS3LeafData    = "s3-leaf-data"
	KindMetaCollision = "meta-suffix-rename"
)

KEYMAP.jsonl shape (one record per line):

{"encoded":"<encoded-segment>","original":"<base64url-no-padding>","kind":"sha-fallback"}

Records are written in encounter order (the order the encoder produced them) and never modified after write. The file is append-only; if the same encoded segment is written twice the reader keeps the last entry, but the encoder is expected not to emit duplicates within a single dump.

Records exist only for entries whose original bytes are NOT recoverable from the encoded filename alone:

  • KindSHAFallback — segment is `<sha-prefix-32>__<truncated-original>` (filename length exceeded EncodeSegment's 240-byte ceiling).
  • KindS3LeafData — S3 object renamed to `<obj>.elastickv-leaf-data` because both `<obj>` and `<obj>/...` existed in the same bucket.
  • KindMetaCollision — user S3 object key happened to end in `.elastickv-meta.json`; renamed under --rename-collisions.

A consumer that does not care about reversing these to original bytes can ignore KEYMAP.jsonl entirely.

View Source
const (
	// PhasePhase0SnapshotDecode marks dumps produced by Phase 0a (offline
	// snapshot decoder).
	PhasePhase0SnapshotDecode = "phase0-snapshot-decode"
	// PhasePhase1LivePinned marks dumps produced by Phase 1 (live PIT
	// extraction with cluster-wide read_ts pinning).
	PhasePhase1LivePinned = "phase1-live-pinned"
)
View Source
const (
	// ChecksumAlgorithmSHA256 is the only checksum algorithm Phase 0a writes.
	// Phase 1 may add others later (e.g. blake3) under the same field.
	ChecksumAlgorithmSHA256 = "sha256"
	// ChecksumFormatSha256sum identifies the line-oriented sha256sum(1)
	// format used by the CHECKSUMS file. Operators verify with
	// `sha256sum -c CHECKSUMS` from the dump root.
	ChecksumFormatSha256sum = "sha256sum"
	// EncodedFilenameCharsetRFC3986 is the EncodeSegment charset used for
	// every non-S3-object filename in the dump.
	EncodedFilenameCharsetRFC3986 = "rfc3986-unreserved-plus-percent"
	// S3MetaSuffixDefault is the reserved suffix for the S3 sidecar
	// metadata file (`<obj>.elastickv-meta.json`).
	S3MetaSuffixDefault = ".elastickv-meta.json"
	// S3CollisionStrategyLeafDataSuffix renames the shorter of two
	// colliding S3 keys to `<obj>.elastickv-leaf-data` and records the
	// rename in KEYMAP.jsonl.
	S3CollisionStrategyLeafDataSuffix = "leaf-data-suffix"
	// DynamoDBLayoutPerItem emits one item per file
	// (`items/<pk>/<sk>.json`); the user's stated default.
	DynamoDBLayoutPerItem = "per-item"
	// DynamoDBLayoutJSONL bundles items into `items/data-<part>.jsonl`
	// (opt-in via --dynamodb-bundle-mode jsonl).
	DynamoDBLayoutJSONL = "jsonl"
	// KeySegmentMaxBytesDefault matches EncodeSegment's maxSegmentBytes.
	KeySegmentMaxBytesDefault uint32 = 240
)
View Source
const (
	RedisHashMetaPrefix      = "!hs|meta|"
	RedisHashFieldPrefix     = "!hs|fld|"
	RedisHashMetaDeltaPrefix = "!hs|meta|d|"
)

Snapshot key prefixes the hash encoder dispatches on. These mirror the live store/hash_helpers.go constants; a renamed prefix on the live side is caught by the dispatch tests.

View Source
const (
	RedisStringPrefix = "!redis|str|"
	RedisHLLPrefix    = "!redis|hll|"
	RedisTTLPrefix    = "!redis|ttl|"
)

Snapshot key prefixes the encoder dispatches on. Kept in sync with adapter/redis_compat_types.go; a renamed prefix in the live code is caught by the corresponding tests.

View Source
const (
	S3BucketMetaPrefix     = s3keys.BucketMetaPrefix
	S3BucketGenPrefix      = s3keys.BucketGenerationPrefix
	S3ObjectManifestPrefix = s3keys.ObjectManifestPrefix
	S3UploadMetaPrefix     = s3keys.UploadMetaPrefix
	S3UploadPartPrefix     = s3keys.UploadPartPrefix
	S3BlobPrefix           = s3keys.BlobPrefix
	S3GCUploadPrefix       = s3keys.GCUploadPrefix
	S3RoutePrefix          = s3keys.RoutePrefix
)

Snapshot prefixes the S3 encoder dispatches on. These alias internal/s3keys/keys.go directly, so a renamed prefix surfaces at compile time; the dispatch tests cover the routing itself.

View Source
const (
	SQSQueueMetaPrefix      = "!sqs|queue|meta|"
	SQSQueueGenPrefix       = "!sqs|queue|gen|"
	SQSQueueSeqPrefix       = "!sqs|queue|seq|"
	SQSQueueTombstonePrefix = "!sqs|queue|tombstone|"
	SQSMsgDataPrefix        = "!sqs|msg|data|"
	SQSMsgVisPrefix         = "!sqs|msg|vis|"
	SQSMsgByAgePrefix       = "!sqs|msg|byage|"
	SQSMsgDedupPrefix       = "!sqs|msg|dedup|"
	SQSMsgGroupPrefix       = "!sqs|msg|group|"
)

Snapshot key prefixes the SQS encoder dispatches on. Kept in sync with adapter/sqs_keys.go and adapter/sqs_messages.go (see SqsQueueMetaPrefix / SqsMsgDataPrefix); a renamed prefix in the live code is caught here at dispatch time by the corresponding tests that synthesise records with these literal byte strings.

View Source
const CurrentFormatVersion uint32 = 1

CurrentFormatVersion is the format major-version this code emits and accepts. Restore-side code MUST refuse `format_version > current`. A minor-version bump (e.g., adding optional fields) does not change this constant.

View Source
const S3LeafDataSuffix = ".elastickv-leaf-data"

S3LeafDataSuffix is appended to the shorter of two colliding S3 keys when the longer key would force the shorter to become a directory on disk. The rename is recorded in KEYMAP.jsonl.

View Source
const S3MetaSuffixReserved = ".elastickv-meta.json"

S3MetaSuffixReserved is the sidecar suffix per the design doc. A user S3 object key whose suffix matches this is rejected at dump time unless WithRenameCollisions is on.

Variables

View Source
var (
	ErrDDBInvalidSchema = errors.New("backup: invalid !ddb|meta|table value")
	ErrDDBInvalidItem   = errors.New("backup: invalid !ddb|item value")
	ErrDDBMalformedKey  = errors.New("backup: malformed DynamoDB key")
)

ErrDDBInvalidSchema, ErrDDBInvalidItem, ErrDDBMalformedKey are the typed error classes for this encoder. Surface via errors.Is.

View Source
var (
	// ErrS3InvalidBucketMeta is returned when a !s3|bucket|meta value
	// fails JSON decoding.
	ErrS3InvalidBucketMeta = errors.New("backup: invalid !s3|bucket|meta value")
	// ErrS3InvalidManifest is returned when a !s3|obj|head value fails
	// JSON decoding.
	ErrS3InvalidManifest = errors.New("backup: invalid !s3|obj|head value")
	// ErrS3MalformedKey is returned when an S3 key cannot be parsed
	// for its structural components.
	ErrS3MalformedKey = errors.New("backup: malformed S3 key")
	// ErrS3MetaSuffixCollision is returned when a user object key
	// collides with the reserved S3MetaSuffixReserved suffix.
	ErrS3MetaSuffixCollision = errors.New("backup: user S3 object key collides with reserved sidecar suffix")
	// ErrS3IncompleteBlobChunks is returned when a manifest declares
	// N chunks for some part but the snapshot did not contain all N.
	// Without this guard a partial / racy snapshot would silently
	// emit a truncated body. Codex P1 #729.
	ErrS3IncompleteBlobChunks = errors.New("backup: incomplete blob chunks for manifest-declared part")
)
View Source
var ErrInvalidEncodedSegment = errors.New("backup: invalid encoded filename segment")

ErrInvalidEncodedSegment is returned by DecodeSegment when its input is neither a valid percent-encoded segment, a binary-prefixed segment, nor a SHA-fallback segment.

View Source
var ErrInvalidKeymapRecord = errors.New("backup: invalid KEYMAP.jsonl record")

ErrInvalidKeymapRecord is returned by Reader.Next when a line does not parse as a KeymapRecord (malformed JSON, missing field, malformed base64, etc.).

View Source
var ErrInvalidManifest = errors.New("backup: manifest invalid")

ErrInvalidManifest is returned by ReadManifest when the JSON parses but fails structural validation (missing required field, unknown phase, etc.).

View Source
var ErrRedisInvalidHashKey = cockroachdberr.New("backup: malformed !hs| key")

ErrRedisInvalidHashKey is returned when an !hs| key cannot be parsed for its userKeyLen+userKey segment (truncated, malformed, etc.).

View Source
var ErrRedisInvalidHashMeta = cockroachdberr.New("backup: invalid !hs|meta| value")

ErrRedisInvalidHashMeta is returned when the !hs|meta| value is not the expected 8-byte big-endian field count.

View Source
var ErrRedisInvalidStringValue = cockroachdberr.New("backup: invalid !redis|str| value")

ErrRedisInvalidStringValue is returned when a !redis|str| value uses the new magic-prefix format but its declared TTL section is truncated. Legacy (no-magic) values are accepted as opaque raw bytes.

View Source
var ErrRedisInvalidTTLValue = cockroachdberr.New("backup: invalid !redis|ttl| value")

ErrRedisInvalidTTLValue is returned when a !redis|ttl| value is not the expected 8-byte big-endian uint64 millisecond expiry.

View Source
var ErrSQSInvalidMessage = errors.New("backup: invalid !sqs|msg|data value")

ErrSQSInvalidMessage is returned for !sqs|msg|data values that lack the magic prefix or fail JSON decoding.

View Source
var ErrSQSInvalidQueueMeta = errors.New("backup: invalid !sqs|queue|meta value")

ErrSQSInvalidQueueMeta is returned for !sqs|queue|meta values that lack the magic prefix or fail JSON decoding.

View Source
var ErrSQSMalformedKey = errors.New("backup: malformed SQS key")

ErrSQSMalformedKey is returned when an SQS key cannot be parsed for the queue-name segment (e.g., the heuristic boundary detection found no transition byte).

View Source
var ErrShaFallbackNeedsKeymap = errors.New("backup: filename uses SHA fallback; consult KEYMAP.jsonl")

ErrShaFallbackNeedsKeymap is returned by DecodeSegment when its input is a SHA-fallback segment. The segment cannot be reversed to its original bytes from the filename alone — the caller must consult KEYMAP.jsonl.

View Source
var ErrUnsupportedFormatVersion = errors.New("backup: manifest format_version unsupported")

ErrUnsupportedFormatVersion is returned by ReadManifest when the on-disk format_version is greater than CurrentFormatVersion or zero.

Functions

func DecodeSegment

func DecodeSegment(seg string) ([]byte, error)

DecodeSegment is the inverse of EncodeSegment for percent-encoded and binary-prefixed inputs. SHA-fallback inputs return ErrShaFallbackNeedsKeymap so the caller knows to consult KEYMAP.jsonl rather than treat the partial suffix as the original key.

As a defensive measure DecodeSegment refuses inputs longer than maxSegmentBytes. EncodeSegment never produces such inputs, so any caller passing one is either reading a corrupted dump or has a bug; either way the percentDecode allocation should not run.
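
A sketch of the round-trip contract, assuming the standard errors and bytes packages:

seg := EncodeSegment(raw)
orig, err := DecodeSegment(seg)
switch {
case errors.Is(err, ErrShaFallbackNeedsKeymap):
	// seg is a SHA fallback; recover raw from KEYMAP.jsonl instead.
case err != nil:
	// corrupted dump or caller bug (see the length guard above).
default:
	// bytes.Equal(orig, raw) holds for non-fallback segments.
}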

func EncodeBinarySegment

func EncodeBinarySegment(raw []byte) string

EncodeBinarySegment encodes a DynamoDB B-attribute (binary) segment as "b64.<base64url-no-padding>" so that binary keys never collide with string keys whose hex-encoding happens to look like base64.

EncodeBinarySegment short-circuits to the SHA fallback for inputs whose base64 expansion (~4/3 of the raw length, plus the 4-byte "b64." prefix) would always overflow maxSegmentBytes. As with EncodeSegment, this avoids a large allocation whose result would be discarded anyway.
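
For illustration (byte values chosen arbitrarily):

EncodeBinarySegment([]byte{0x00, 0xFF}) // "b64.AP8"
// A plain string key that happens to look binary cannot collide:
// EncodeSegment promotes any "b64."-prefixed encoding to the SHA
// fallback (see EncodeSegment below).
EncodeSegment([]byte("b64.AP8")) // SHA-fallback form, never "b64.AP8"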

func EncodeDDBItemKey

func EncodeDDBItemKey(tableName string, generation uint64, hashKey, rangeKey string) []byte

EncodeDDBItemKey constructs a !ddb|item key for tests. Mirrors the live legacyDynamoItemKey constructor in adapter/dynamodb.go (string hash + range, simplest shape).

func EncodeDDBTableMetaKey

func EncodeDDBTableMetaKey(tableName string) []byte

EncodeDDBTableMetaKey constructs a !ddb|meta|table key for tests.

func EncodeMsgDataKey

func EncodeMsgDataKey(queueName string, gen uint64, messageID string) []byte

EncodeMsgDataKey constructs a !sqs|msg|data key for tests. Mirrors the live sqsMsgDataKey constructor in adapter/sqs_messages.go.

func EncodeQueueMetaKey

func EncodeQueueMetaKey(queueName string) []byte

EncodeQueueMetaKey constructs a !sqs|queue|meta key for tests.

func EncodeSegment

func EncodeSegment(raw []byte) string

EncodeSegment encodes a single user-supplied path segment for use as a filename component. It is the inverse of DecodeSegment for non-fallback inputs.

The encoding is deterministic: the same input bytes always produce the same segment.

Three structural short-circuits ensure DecodeSegment cannot misclassify a legitimate key:

  • If `raw` is longer than maxSegmentBytes, even a fully-unreserved encoding (1:1) cannot fit, so we go straight to shaFallback. This also caps the percent-encode allocation at ~maxSegmentBytes, preventing OOM on adversarial input.
  • If the percent-encoded form happens to match the SHA-fallback shape (32 hex chars followed by "__"), we promote it to a real SHA-fallback so DecodeSegment's structural detection cannot fabricate a wrong original.
  • If the percent-encoded form starts with the binary "b64." prefix, we promote to SHA-fallback for the same reason: a plain string key like "b64.foo" would otherwise be decoded as base64 and produce different bytes on round-trip.

Both promoted-fallback paths leave the original in KEYMAP.jsonl (a correctness dependency, per the package doc), so exact-byte recovery is preserved.

func HasInlineTTL

func HasInlineTTL(value []byte) bool

HasInlineTTL reports whether a !redis|str| value carries the new-format inline TTL header. Useful for tests asserting the producer's choice.

func IsBinarySegment

func IsBinarySegment(seg string) bool

IsBinarySegment reports whether seg is a base64-url encoded binary segment emitted by EncodeBinarySegment.

func IsBlobAtomicWriteOutOfSpace

func IsBlobAtomicWriteOutOfSpace(err error) bool

IsBlobAtomicWriteOutOfSpace reports whether err from writeFileAtomic (or any os.File write the master pipeline issues) was driven by a full disk. The platform-specific error codes (POSIX ENOSPC vs. Windows ERROR_DISK_FULL / ERROR_HANDLE_DISK_FULL) live in disk_full_{unix,windows}.go so retry/alarm logic in callers classifies disk-full uniformly across operating systems (Codex P2 round 9).

func IsBlobAtomicWriteRetriable

func IsBlobAtomicWriteRetriable(err error) bool

IsBlobAtomicWriteRetriable reports whether err from writeFileAtomic is a retriable I/O failure. Today the only retriable signal is io.ErrShortWrite. ENOSPC (disk full) is intentionally NOT retriable here — the master pipeline must surface it to the operator rather than spin: a backup against a full disk has no business retrying. IsBlobAtomicWriteOutOfSpace is the explicit out-of-space probe so the pipeline can choose the right alarm wording.
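
A sketch of the classification this split enables in the master pipeline; writeStep, alarm, and retryWrite are hypothetical hooks:

if err := writeStep(); err != nil {
	switch {
	case IsBlobAtomicWriteOutOfSpace(err):
		alarm("backup target disk full") // surface to the operator; never retry
	case IsBlobAtomicWriteRetriable(err):
		retryWrite() // today this means io.ErrShortWrite only
	default:
		return err
	}
}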

func IsShaFallback

func IsShaFallback(seg string) bool

IsShaFallback reports whether seg uses the SHA-prefix-and-truncated-original form. Such segments cannot be reversed without KEYMAP.jsonl.

func LoadKeymap

func LoadKeymap(r io.Reader) (map[string]KeymapRecord, error)

LoadKeymap reads every record from r into an in-memory map keyed by encoded segment. The last record wins on duplicates. Suitable for scopes where the keymap fits comfortably in memory; for large scopes callers should use KeymapReader directly.
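
A sketch of the in-memory path, with hypothetical scopeRoot and encodedSegment values:

f, err := os.Open(filepath.Join(scopeRoot, "KEYMAP.jsonl"))
if err != nil {
	return err
}
defer f.Close()
km, err := LoadKeymap(f)
if err != nil {
	return err
}
if rec, ok := km[encodedSegment]; ok {
	original, err := rec.Original() // exact original key bytes
	if err != nil {
		return err
	}
	use(original) // hypothetical consumer
}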

func WriteManifest

func WriteManifest(w io.Writer, m Manifest) error

WriteManifest serialises m as pretty-printed JSON to w.

Pretty-printing is deliberate — MANIFEST.json is operator-facing and is expected to be `cat`-ed and `jq`-ed during incident response.

Types

type Adapter

type Adapter struct {
	Tables    []string `json:"tables,omitempty"`
	Buckets   []string `json:"buckets,omitempty"`
	Databases []uint32 `json:"databases,omitempty"`
	Queues    []string `json:"queues,omitempty"`
}

Adapter holds the scope identifiers for one adapter. Field names are per-adapter to match the protocol's natural vocabulary.

type Adapters

type Adapters struct {
	DynamoDB *Adapter `json:"dynamodb,omitempty"`
	S3       *Adapter `json:"s3,omitempty"`
	Redis    *Adapter `json:"redis,omitempty"`
	SQS      *Adapter `json:"sqs,omitempty"`
}

Adapters lists which scopes were dumped per adapter. The pointer values express three distinguishable on-disk states:

  • nil -> the adapter was excluded from this dump (e.g. `--adapter dynamodb,s3` filtered it out). The corresponding JSON key is absent.
  • non-nil pointer to Adapter{} -> the adapter was in scope but no scopes for it were emitted (no tables, no buckets, etc.). The JSON key is present with an empty object.
  • non-nil pointer to a populated Adapter -> the listed scopes were emitted.

Storing pointers (rather than zero-value Adapter structs) is what keeps "excluded by filter" distinguishable from "included but empty" through json.Marshal — non-pointer fields would collapse both states into the same on-disk shape.
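
For illustration, the three states marshal as follows (every field carries omitempty, so a nil adapter's key is simply absent):

a := Adapters{
	DynamoDB: &Adapter{Tables: []string{"users"}}, // in scope, scopes emitted
	S3:       &Adapter{},                          // in scope, nothing emitted
	// Redis and SQS nil: excluded by filter; keys absent from the JSON
}
// json.Marshal(a) yields {"dynamodb":{"tables":["users"]},"s3":{}}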

type DDBEncoder

type DDBEncoder struct {
	// contains filtered or unexported fields
}

DDBEncoder encodes the DynamoDB prefix family into the per-table layout described in docs/design/2026_04_29_proposed_snapshot_logical_decoder.md (Phase 0): one `_schema.json` per table and one `items/<pk>/<sk>.json` per item (default per-item layout).

Lifecycle: Handle* per record, Finalize once. Items arrive before the schema in lex order ('i' < 'm' under !ddb|), so the encoder buffers per-encoded-table-segment and emits at Finalize once the schema is known.

Wide-column GSI rows (!ddb|gsi|*) are NOT dumped: they are derivable from the base item set + schema, and replaying GSI rows on restore would conflict with the destination's own index maintenance.
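
A sketch of the per-record dispatch the master pipeline performs; scan and warn are hypothetical, and bytes.HasPrefix does the routing:

enc := NewDDBEncoder(outRoot).WithWarnSink(warn)
for scan.Next() {
	key, value := scan.Record()
	var err error
	switch {
	case bytes.HasPrefix(key, []byte(DDBTableMetaPrefix)):
		err = enc.HandleTableMeta(key, value)
	case bytes.HasPrefix(key, []byte(DDBTableGenPrefix)):
		err = enc.HandleTableGen(key, value)
	case bytes.HasPrefix(key, []byte(DDBItemPrefix)):
		err = enc.HandleItem(key, value)
	case bytes.HasPrefix(key, []byte(DDBGSIPrefix)):
		err = enc.HandleGSIRow(key, value)
	}
	if err != nil {
		return err
	}
}
return enc.Finalize()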

func NewDDBEncoder

func NewDDBEncoder(outRoot string) *DDBEncoder

NewDDBEncoder constructs an encoder rooted at <outRoot>/dynamodb/.

func (*DDBEncoder) Finalize

func (d *DDBEncoder) Finalize() error

Finalize emits each table's _schema.json and per-item JSON files. Tables with items but no schema (orphans) emit a warning and are skipped — preserving the spec's lenient handling for incomplete inputs. Real flush errors fail fast so corruption surfaces immediately rather than being attributed to a later table (Gemini MEDIUM #182).

func (*DDBEncoder) HandleGSIRow

func (d *DDBEncoder) HandleGSIRow(_, _ []byte) error

HandleGSIRow drops GSI rows by default (they are derivable from the base item set + schema). Exposed as a no-op so the master pipeline can dispatch all !ddb|* prefixes uniformly without special-casing.

func (*DDBEncoder) HandleItem

func (d *DDBEncoder) HandleItem(key, value []byte) error

HandleItem processes a !ddb|item|<encTable>|<gen>|<rest> record. The encoded table segment AND the item generation are parsed out of the key; the proto is buffered keyed by generation so Finalize can emit only the rows belonging to the schema's active generation.

Stale-generation rows (left behind by an in-flight delete/recreate before async cleanup finishes) would otherwise silently leak under the new schema and either resurrect deleted data or fail Finalize when primary-key names changed across generations — Codex P1 #237.

func (*DDBEncoder) HandleTableGen

func (d *DDBEncoder) HandleTableGen(_, _ []byte) error

HandleTableGen drops the per-table generation counter (operational state, not user-visible).

func (*DDBEncoder) HandleTableMeta

func (d *DDBEncoder) HandleTableMeta(key, value []byte) error

HandleTableMeta processes a !ddb|meta|table|<encodedTable> record. Strips the magic prefix, proto-unmarshals into DynamoTableSchema, and parks it on the per-table state.

func (*DDBEncoder) WithBundleJSONL

func (d *DDBEncoder) WithBundleJSONL(on bool) *DDBEncoder

WithBundleJSONL switches per-table layout to `items/data-<part>.jsonl` (one item per line). Default is per-item files. The choice is recorded in MANIFEST.json (`dynamodb_layout`) by the master pipeline; the encoder itself only needs the flag to pick the on-disk shape.

Bundle mode is a follow-up: this PR ships per-item only. With WithBundleJSONL(true) set, Finalize returns an error until the bundle path lands.

func (*DDBEncoder) WithWarnSink

func (d *DDBEncoder) WithWarnSink(fn func(event string, fields ...any)) *DDBEncoder

WithWarnSink wires structured-warning emission (orphan items, schema-less tables, etc.).

type Exclusions

type Exclusions struct {
	IncludeIncompleteUploads bool `json:"include_incomplete_uploads"`
	IncludeOrphans           bool `json:"include_orphans"`
	PreserveSQSVisibility    bool `json:"preserve_sqs_visibility"`
	IncludeSQSSideRecords    bool `json:"include_sqs_side_records"`
}

Exclusions records the producer-side flags that affected which records were emitted. Restore tools log these so an operator can correlate a surprising dump shape with the producer invocation.

type KeymapReader

type KeymapReader struct {
	// contains filtered or unexported fields
}

KeymapReader iterates JSONL records line-by-line. Memory footprint is bounded by keymapBufSizeReader regardless of file size.

func NewKeymapReader

func NewKeymapReader(r io.Reader) *KeymapReader

NewKeymapReader wraps r so the caller can iterate records via Next.

func (*KeymapReader) Next

func (r *KeymapReader) Next() (KeymapRecord, bool, error)

Next decodes the next record. It returns (rec, true, nil) on success, (zero, false, nil) at end of stream, and (zero, false, err) on parse failure or I/O error. Once an error is returned the reader is sticky: subsequent calls return the same error.

The base64-encoded `original` field is validated at parse time rather than lazily: a malformed dump must surface on the first read of the affected line, not propagate silently until a much later rec.Original() call. Same error class either way.
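
The intended iteration shape:

r := NewKeymapReader(f)
for {
	rec, ok, err := r.Next()
	if err != nil {
		return err // sticky: every later Next returns the same error
	}
	if !ok {
		break // clean end of stream
	}
	handle(rec) // hypothetical per-record consumer
}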

type KeymapRecord

type KeymapRecord struct {
	// Encoded is the filename segment as it appears in the dump tree.
	Encoded string `json:"encoded"`
	// OriginalB64 is base64url-no-padding of the original key bytes.
	OriginalB64 string `json:"original"`
	// Kind classifies why this record exists; see Kind* constants.
	Kind string `json:"kind"`
}

KeymapRecord is a single mapping from encoded filename component back to the original key bytes. Original bytes are arbitrary (binary safe), so they are encoded as base64url-no-padding for transport in JSON.

func (KeymapRecord) Original

func (r KeymapRecord) Original() ([]byte, error)

Original returns the decoded original key bytes from r.OriginalB64.

type KeymapWriter

type KeymapWriter struct {
	// contains filtered or unexported fields
}

KeymapWriter appends records to a KEYMAP.jsonl stream. Concurrent calls to Write are serialised through the underlying bufio.Writer; the caller is expected to use a single writer per scope.

func NewKeymapWriter

func NewKeymapWriter(w io.Writer) *KeymapWriter

NewKeymapWriter returns a writer that appends JSONL records to w. Close must be called to flush.

func (*KeymapWriter) Close

func (w *KeymapWriter) Close() error

Close flushes any buffered records to the underlying writer.

func (*KeymapWriter) Count

func (w *KeymapWriter) Count() int

Count returns the number of records written so far. Useful for the "omit empty KEYMAP file" decision after the dump completes.

func (*KeymapWriter) Write

func (w *KeymapWriter) Write(rec KeymapRecord) error

Write appends one KeymapRecord. The record is JSON-serialised with a trailing newline (json.Encoder behavior), giving the JSONL contract.

func (*KeymapWriter) WriteOriginal

func (w *KeymapWriter) WriteOriginal(encoded string, original []byte, kind string) error

WriteOriginal is a convenience wrapper that base64-encodes raw original bytes for the caller.
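
A sketch of the producer-side flow, assuming f is the opened KEYMAP.jsonl file:

w := NewKeymapWriter(f)
if IsShaFallback(encoded) {
	if err := w.WriteOriginal(encoded, raw, KindSHAFallback); err != nil {
		return err
	}
}
if err := w.Close(); err != nil { // flush buffered lines
	return err
}
if w.Count() == 0 {
	// the caller may omit the empty KEYMAP.jsonl entirely
}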

type Live

type Live struct {
	// ReadTS is the pinned read_ts at which BackupScanner traversed the
	// keyspace.
	ReadTS uint64 `json:"read_ts"`
	// PinTokenSHA256 is the hex SHA-256 of the pin_token issued by
	// BeginBackup. Stored as a hash rather than the raw token so the
	// manifest carries no auth-sensitive material.
	PinTokenSHA256 string `json:"pin_token_sha256,omitempty"`
}

Live records the cluster-wide pinning information that produced a Phase 1 dump. Phase 0 dumps leave this nil.

type Manifest

type Manifest struct {
	FormatVersion    uint32  `json:"format_version"`
	Phase            string  `json:"phase"`
	ElastickvVersion string  `json:"elastickv_version,omitempty"`
	ClusterID        string  `json:"cluster_id,omitempty"`
	SnapshotIndex    uint64  `json:"snapshot_index,omitempty"`
	LastCommitTS     uint64  `json:"last_commit_ts,omitempty"`
	WallTimeISO      string  `json:"wall_time_iso"`
	Source           *Source `json:"source,omitempty"`
	Live             *Live   `json:"live,omitempty"`
	// Adapters and Exclusions are pointer types so ReadManifest can
	// distinguish "section omitted entirely" (a corrupted or
	// truncated dump that should fail validation) from "section
	// present but populated with default values" (legitimate
	// scope-everything-excluded). Codex P2 #146 (round 3).
	Adapters          *Adapters   `json:"adapters"`
	Exclusions        *Exclusions `json:"exclusions"`
	ChecksumAlgorithm string      `json:"checksum_algorithm"`
	ChecksumFormat    string      `json:"checksum_format"`

	EncodedFilenameCharset string `json:"encoded_filename_charset"`
	KeySegmentMaxBytes     uint32 `json:"key_segment_max_bytes"`
	S3MetaSuffix           string `json:"s3_meta_suffix"`
	S3CollisionStrategy    string `json:"s3_collision_strategy"`
	DynamoDBLayout         string `json:"dynamodb_layout"`
}

Manifest is the on-disk MANIFEST.json structure. Field tags match the spec in docs/design/2026_04_29_proposed_snapshot_logical_decoder.md.

func NewPhase0SnapshotManifest

func NewPhase0SnapshotManifest(now time.Time) Manifest

NewPhase0SnapshotManifest seeds a manifest with the Phase 0a defaults. Callers fill in scope (Adapters), Source/wall time and exclusions before passing it to WriteManifest. Adapters and Exclusions are seeded to non-nil zero values so the resulting manifest passes the "section-present" validation; callers populating individual scopes reach in via the now-non-nil pointer.
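
A sketch of the producer flow; the path and scope values are hypothetical:

m := NewPhase0SnapshotManifest(time.Now())
m.Source = &Source{FSMPath: "snapshots/000123.fsm"}
m.Adapters.Redis = &Adapter{Databases: []uint32{0}}
f, err := os.Create(filepath.Join(dumpRoot, "MANIFEST.json"))
if err != nil {
	return err
}
defer f.Close()
return WriteManifest(f, m)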

func ReadManifest

func ReadManifest(r io.Reader) (Manifest, error)

ReadManifest decodes and validates a MANIFEST.json from r. The returned error is wrapped as ErrUnsupportedFormatVersion or ErrInvalidManifest so callers can branch on errors.Is.
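
A sketch of the restore-side branching:

m, err := ReadManifest(f)
switch {
case errors.Is(err, ErrUnsupportedFormatVersion):
	return err // dump written by a newer format; refuse to restore
case errors.Is(err, ErrInvalidManifest):
	return err // structurally invalid; treat the dump as corrupt
case err != nil:
	return err // I/O or JSON syntax failure
}
restoreFrom(m) // hypothetical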

type RedisDB

type RedisDB struct {
	// contains filtered or unexported fields
}

RedisDB encodes one logical Redis database (`redis/db_<n>/`). All operations are scoped to its outRoot; the caller wires per-database instances when the producer supports multiple databases (today only db_0 is meaningful, but the encoder is wired to take any non-negative index so a future multi-db dump does not silently collide on db_0).

Lifecycle:

r := NewRedisDB(outRoot, dbIndex)
for each snapshot record matching a redis prefix: r.Handle*(...)
r.Finalize()

Handle* methods are NOT goroutine-safe; the decoder pipeline is inherently sequential per scope, so a mutex would only add cost.

func NewRedisDB

func NewRedisDB(outRoot string, dbIndex int) *RedisDB

NewRedisDB constructs a RedisDB rooted at <outRoot>/redis/db_<n>/. dbIndex selects <n>; today the producer always passes 0, but accepting the index as a parameter prevents a future multi-db dump from silently colliding on db_0.

func (*RedisDB) Finalize

func (r *RedisDB) Finalize() error

Finalize flushes all open sidecar writers and emits warnings for any pending TTL records whose user key was never claimed by the wide-column encoders. Call exactly once after every snapshot record has been dispatched.

func (*RedisDB) HandleHLL

func (r *RedisDB) HandleHLL(userKey, value []byte) error

HandleHLL processes one !redis|hll|<userKey> record. The value is the raw HLL sketch bytes, written byte-for-byte to hll/<encoded>.bin. TTL for HLL keys lives in !redis|ttl|<userKey> and is consumed by HandleTTL.

func (*RedisDB) HandleHashField

func (r *RedisDB) HandleHashField(key, value []byte) error

HandleHashField processes one !hs|fld|<userKey><fieldName> record. The value is the raw field-value bytes (binary-safe).

Note: Redis hash field names are binary-safe and may legitimately be empty — `HSET k "" v` is a valid command and the live store emits a key shaped exactly `!hs|fld|<len><userKey>` with no trailing field bytes. We deliberately do NOT reject zero-length field names here so backup decoding succeeds on real data created via HSET with empty names. Codex P1 round 13 (PR #725).

func (*RedisDB) HandleHashMeta

func (r *RedisDB) HandleHashMeta(key, value []byte) error

HandleHashMeta processes one !hs|meta|<userKey> record. The value is the 8-byte BE field count. We park the state for finalize-time flush and register the user key so a later !redis|ttl|<userKey> record routes back to this hash state.

Delta keys (!hs|meta|d|...) share the !hs|meta| string prefix, so a snapshot dispatcher that routes by "starts with RedisHashMetaPrefix" will land delta records here too. Phase 0a's output (an array of observed fields) doesn't need to apply the delta arithmetic — the !hs|fld|... records are the source of truth — so we silently skip delta keys instead of returning ErrRedisInvalidHashKey. Codex P1 round 14 (PR #725 #13).
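
A dispatcher can keep the routing explicit by testing the longer delta prefix first (a sketch; HandleHashMeta tolerates deltas either way):

switch {
case bytes.HasPrefix(key, []byte(RedisHashMetaDeltaPrefix)):
	// skip: !hs|fld| records are the source of truth for Phase 0a
case bytes.HasPrefix(key, []byte(RedisHashMetaPrefix)):
	err = db.HandleHashMeta(key, value)
case bytes.HasPrefix(key, []byte(RedisHashFieldPrefix)):
	err = db.HandleHashField(key, value)
}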

func (*RedisDB) HandleString

func (r *RedisDB) HandleString(userKey, value []byte) error

HandleString processes one !redis|str|<userKey> record. The value is the raw stored bytes; HandleString peels the magic-prefix TTL header (if present) and writes the user-visible value to strings/<encoded>.bin and the TTL — if any — to strings_ttl.jsonl.

func (*RedisDB) HandleTTL

func (r *RedisDB) HandleTTL(userKey, value []byte) error

HandleTTL processes one !redis|ttl|<userKey> record. Routing depends on what HandleString/HandleHLL recorded for the same userKey:

  • redisKindHLL -> hll_ttl.jsonl
  • redisKindString -> strings_ttl.jsonl (legacy strings, whose TTL lives in !redis|ttl| rather than the inline magic-prefix header)
  • redisKindUnknown -> counted in orphanTTLCount; reported via the warn sink on Finalize because Phase 0a's wide-column encoders have not landed yet.

func (*RedisDB) WithWarnSink

func (r *RedisDB) WithWarnSink(fn func(event string, fields ...any)) *RedisDB

WithWarnSink wires a structured-warning sink. The sink is called with stable event names ("redis_orphan_ttl", etc.) and key=value pairs.

type S3Encoder

type S3Encoder struct {
	// contains filtered or unexported fields
}

S3Encoder emits per-bucket _bucket.json + assembled object bodies + .elastickv-meta.json sidecars + KEYMAP.jsonl, per the Phase 0 design (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md).

Lifecycle: Handle* per record, Finalize once. Records arrive in snapshot lex order:

!s3|blob|*           (b)  -- written to a per-(bucket,object)
                            scratch chunk pool
!s3|bucket|gen|*     (bg) -- ignored (operational counter)
!s3|bucket|meta|*    (bm) -- buffered until Finalize
!s3|gc|upload|*      (g)  -- ignored (in-flight cleanup state)
!s3|obj|head|*       (o)  -- buffered until Finalize
!s3|upload|meta|*    (um) -- excluded by default; opt in via
                            WithIncludeIncompleteUploads
!s3|upload|part|*    (up) -- same
!s3route|*           (r)  -- ignored (control plane)

Object body assembly happens at Finalize: for each object manifest, the encoder enumerates parts in PartNo order and chunks in ChunkNo order, concatenates the matching blob chunks (which were pre-spilled to scratch files as they arrived), and writes the assembled body to <outRoot>/s3/<bucket>/<object> with the metadata sidecar at <object>.elastickv-meta.json.

Memory: O(num_objects + num_buckets) buffered metadata. Per-blob payloads are streamed to disk as they arrive — never held in memory.

func NewS3Encoder

func NewS3Encoder(outRoot, scratchRoot string) *S3Encoder

NewS3Encoder constructs an encoder rooted at <outRoot>/s3/. Blob chunks are spilled to <scratchRoot>/s3/ as they arrive and assembled into final object bodies at Finalize. The caller owns scratchRoot; it must exist and be writable. A common choice is os.TempDir() under the dump runner; the encoder removes its scratch subtree at Finalize.

func (*S3Encoder) Finalize

func (s *S3Encoder) Finalize() error

Finalize assembles every object body, writes its sidecar, flushes per-bucket _bucket.json, and removes the scratch tree.

func (*S3Encoder) HandleBlob

func (s *S3Encoder) HandleBlob(key, value []byte) error

HandleBlob spills a !s3|blob| record to a per-chunk scratch file and registers it under the (bucket, object, gen, uploadID, partNo, chunkNo, partVersion) routing key.

The bucket and object segments are validated explicitly before the scratch write. EncodeSegment percent-encodes `/`, so a multi-segment object key like `../../tmp/pwn` collapses into one filename, but a literal `..` (or `.`) survives unchanged because both `.` characters are RFC3986-unreserved. Without that validation, a crafted pair like `bucket="..", object=".."` would resolve to filepath.Join(scratchRoot, "..", ".."), the parent of scratchRoot, letting writeFileAtomic land outside the decoder's controlled directory before safeJoinUnderRoot ever runs at output time. Codex P1 round 11.

func (*S3Encoder) HandleBucketMeta

func (s *S3Encoder) HandleBucketMeta(key, value []byte) error

HandleBucketMeta decodes and parks a !s3|bucket|meta record.

func (*S3Encoder) HandleIgnored

func (s *S3Encoder) HandleIgnored(_, _ []byte) error

HandleIgnored is a no-op for prefixes the encoder explicitly drops (!s3|bucket|gen|, !s3|gc|upload|, !s3route|). Exposed so the master pipeline can dispatch all !s3|* prefixes uniformly without special-casing.

func (*S3Encoder) HandleIncompleteUpload

func (s *S3Encoder) HandleIncompleteUpload(prefix string, key, value []byte) error

HandleIncompleteUpload routes !s3|upload|meta|/!s3|upload|part| records to <bucket>/_incomplete_uploads/records.jsonl when the include flag is on; otherwise drops them.

The output writer is opened once per bucket on the first record and cached on s3BucketState. Re-opening per record (the prior implementation) used create/truncate semantics, so each call wiped the file and only the last record survived — Codex P2 #318 / Gemini HIGH+MEDIUM #318.

func (*S3Encoder) HandleObjectManifest

func (s *S3Encoder) HandleObjectManifest(key, value []byte) error

HandleObjectManifest decodes and parks an !s3|obj|head record. The manifest's UploadID and Parts list drive the Finalize-time blob assembly.

func (*S3Encoder) WithIncludeIncompleteUploads

func (s *S3Encoder) WithIncludeIncompleteUploads(on bool) *S3Encoder

WithIncludeIncompleteUploads routes !s3|upload|meta|/!s3|upload|part| records to s3/<bucket>/_incomplete_uploads/. Default is to skip them.

func (*S3Encoder) WithIncludeOrphans

func (s *S3Encoder) WithIncludeOrphans(on bool) *S3Encoder

WithIncludeOrphans surfaces blob chunks that have no matching manifest under s3/<bucket>/_orphans/. Default skips them.

func (*S3Encoder) WithRenameCollisions

func (s *S3Encoder) WithRenameCollisions(on bool) *S3Encoder

WithRenameCollisions opts in to renaming user objects that collide with the reserved S3MetaSuffixReserved suffix. Default rejects.

func (*S3Encoder) WithWarnSink

func (s *S3Encoder) WithWarnSink(fn func(event string, fields ...any)) *S3Encoder

WithWarnSink wires a structured warning sink.

type SQSEncoder

type SQSEncoder struct {
	// contains filtered or unexported fields
}

SQSEncoder encodes the SQS prefix family into the per-queue layout described in docs/design/2026_04_29_proposed_snapshot_logical_decoder.md (Phase 0): one `_queue.json` per queue and one ordered `messages.jsonl`.

Lifecycle: per-snapshot pass calls Handle* for each record, then exactly one Finalize. Side-records (vis/byage/dedup/group/tombstone) are excluded by default; opt in with WithIncludeSideRecords. Visibility state on emitted messages is zeroed by default; opt in to preserve with WithPreserveVisibility.

The encoder buffers messages per queue in memory and sorts them at Finalize-time by (SendTimestampMillis, SequenceNumber, MessageID). This is acceptable for typical operational queues; queues with hundreds of millions of messages will need a future stream-and-merge variant.
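
The Finalize-time ordering, sketched with hypothetical message-struct field names taken from the sort key above:

sort.Slice(msgs, func(i, j int) bool {
	a, b := msgs[i], msgs[j]
	if a.SendTimestampMillis != b.SendTimestampMillis {
		return a.SendTimestampMillis < b.SendTimestampMillis
	}
	if a.SequenceNumber != b.SequenceNumber {
		return a.SequenceNumber < b.SequenceNumber
	}
	return a.MessageID < b.MessageID
})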

func NewSQSEncoder

func NewSQSEncoder(outRoot string) *SQSEncoder

NewSQSEncoder constructs an encoder rooted at <outRoot>/sqs/.

func (*SQSEncoder) Finalize

func (s *SQSEncoder) Finalize() error

Finalize flushes every queue's _queue.json and messages.jsonl. Queues with buffered messages but no meta record (orphans) emit a warning and have their messages dropped — restoring orphan messages without a queue config would silently create a queue with default settings, which is rarely what the operator wants. However, if --include-sqs-side-records is on and this orphan queue has buffered side records (vis/byage/dedup/group/tombstone), those are still flushed under the encoded-prefix directory: the most common reason for a missing meta is a DeleteQueue that left tombstones, and dropping exactly those records is the opposite of what the operator asked for. Codex P2 round 8.

func (*SQSEncoder) HandleMessageData

func (s *SQSEncoder) HandleMessageData(key, value []byte) error

HandleMessageData processes one !sqs|msg|data|<encQueue><gen><encMsgID> record. The encoded queue segment is parsed out of the key and used as the per-queue routing key; the message is buffered until Finalize so it can be sorted and emitted in send-order.

func (*SQSEncoder) HandleQueueGen

func (s *SQSEncoder) HandleQueueGen(key, value []byte) error

HandleQueueGen processes one !sqs|queue|gen|<encoded> record. The value is a base-10 decimal string holding the queue's current generation (mirrors adapter/sqs_catalog.go's CreateQueue Put: the live cluster writes strconv.FormatUint(gen, 10)). Capturing activeGen lets flushQueue drop messages tagged with older generations — those are residual rows left by PurgeQueue / DeleteQueue that the reaper has not yet cleaned, and emitting them to messages.jsonl would resurrect purged messages on restore. Codex P1 round 10.

func (*SQSEncoder) HandleQueueMeta

func (s *SQSEncoder) HandleQueueMeta(key, value []byte) error

HandleQueueMeta processes one !sqs|queue|meta|<encoded> record. Strips the magic prefix, decodes the JSON, projects to the dump-format sqsQueueMetaPublic, and parks it on the per-queue state.

func (*SQSEncoder) HandleSideRecord

func (s *SQSEncoder) HandleSideRecord(prefix string, key, value []byte) error

HandleSideRecord buffers (vis|byage|dedup|group|tombstone) records when includeSideRecords is on; otherwise drops them silently (this is the documented Phase 0 default).

func (*SQSEncoder) WithIncludeSideRecords

func (s *SQSEncoder) WithIncludeSideRecords(on bool) *SQSEncoder

WithIncludeSideRecords routes vis/byage/dedup/group/tombstone records into _internals/. Default is to exclude them — they are derivable from the queue config + message records and replaying them on restore can resurrect aborted state.

func (*SQSEncoder) WithPreserveVisibility

func (s *SQSEncoder) WithPreserveVisibility(on bool) *SQSEncoder

WithPreserveVisibility passes the visibility-state fields (visible_at_millis, current_receipt_token, receive_count, first_receive_millis) through to the dump. Default is to zero them so the restored queue starts with every message visible.

func (*SQSEncoder) WithWarnSink

func (s *SQSEncoder) WithWarnSink(fn func(event string, fields ...any)) *SQSEncoder

WithWarnSink wires a structured warning hook (same shape as RedisDB.WithWarnSink). Used for orphan messages and unresolvable side records.

type Source

type Source struct {
	// FSMPath is the absolute or relative path of the .fsm file the
	// decoder consumed.
	FSMPath string `json:"fsm_path"`
	// FSMCRC32C is the CRC32C value the decoder verified against the
	// .fsm file's footer (lowercase hex).
	FSMCRC32C string `json:"fsm_crc32c,omitempty"`
}

Source records where a Phase 0 dump came from. Phase 1 dumps leave Source nil and populate Live instead.
