Documentation ¶
Overview ¶
Package s3db provides a concurrency-safe SQLite database backed by S3, using changeset logging and optimistic concurrency control. See DESIGN.md for the architecture.
Index ¶
- Variables
- func Init(ctx context.Context, store BlobStore, prefix, srcPath string) error
- func Push(ctx context.Context, store BlobStore, prefix, srcPath string, ...) error
- type BlobInfo
- type BlobStore
- type DB
- func (db *DB) Close() error
- func (db *DB) Compact(ctx context.Context) error
- func (db *DB) GC(ctx context.Context) error
- func (db *DB) Seq() int64
- func (db *DB) Stats() Stats
- func (db *DB) Update(ctx context.Context, fn func(*sqlite.Conn) error) error
- func (db *DB) View(ctx context.Context, fn func(*sqlite.Conn) error) error
- type ErrUnrecordedChanges
- type FileSystemBlobStore
- func (s *FileSystemBlobStore) Close() error
- func (s *FileSystemBlobStore) Delete(ctx context.Context, key string) error
- func (s *FileSystemBlobStore) DeletePrefix(ctx context.Context, prefix string) error
- func (s *FileSystemBlobStore) Get(ctx context.Context, key string) (io.ReadCloser, string, error)
- func (s *FileSystemBlobStore) GetRange(ctx context.Context, key string, start, end int64) (io.ReadCloser, error)
- func (s *FileSystemBlobStore) List(ctx context.Context, prefix string) ([]string, error)
- func (s *FileSystemBlobStore) Put(ctx context.Context, key string, body io.Reader, cond PutCondition) (string, error)
- func (s *FileSystemBlobStore) Root() string
- func (s *FileSystemBlobStore) Stat(ctx context.Context, key string) (BlobInfo, error)
- type MemBlobStore
- func (m *MemBlobStore) Delete(ctx context.Context, key string) error
- func (m *MemBlobStore) DeletePrefix(ctx context.Context, prefix string) error
- func (m *MemBlobStore) Get(ctx context.Context, key string) (io.ReadCloser, string, error)
- func (m *MemBlobStore) GetRange(ctx context.Context, key string, start, end int64) (io.ReadCloser, error)
- func (m *MemBlobStore) List(ctx context.Context, prefix string) ([]string, error)
- func (m *MemBlobStore) Put(ctx context.Context, key string, body io.Reader, cond PutCondition) (string, error)
- func (m *MemBlobStore) Stat(ctx context.Context, key string) (BlobInfo, error)
- type Migration
- type Option
- type PullInfo
- type PutCondition
- type S3BlobStore
- func (s *S3BlobStore) Delete(ctx context.Context, key string) error
- func (s *S3BlobStore) DeletePrefix(ctx context.Context, prefix string) error
- func (s *S3BlobStore) Get(ctx context.Context, key string) (io.ReadCloser, string, error)
- func (s *S3BlobStore) GetRange(ctx context.Context, key string, start, end int64) (io.ReadCloser, error)
- func (s *S3BlobStore) List(ctx context.Context, prefix string) ([]string, error)
- func (s *S3BlobStore) Put(ctx context.Context, key string, body io.Reader, cond PutCondition) (string, error)
- func (s *S3BlobStore) Stat(ctx context.Context, key string) (BlobInfo, error)
- type Stats
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNotFound is returned when a blob or manifest does not exist.
	ErrNotFound = errors.New("s3db: not found")

	// ErrPreconditionFailed is returned when a conditional write fails
	// because the precondition (If-Match or If-None-Match) was not met.
	// This indicates a concurrent writer modified the object.
	ErrPreconditionFailed = errors.New("s3db: precondition failed")

	// ErrConflict is returned when Update gives up after exhausting retries
	// due to repeated write contention.
	ErrConflict = errors.New("s3db: write conflict, retries exhausted")

	// ErrSchemaMismatch is returned when the caller's expected schema version
	// does not match the manifest's schema version.
	ErrSchemaMismatch = errors.New("s3db: schema version mismatch")

	// ErrSchemaTooNew is returned when the manifest's schema version is ahead
	// of the migrations this client knows about. Upgrade your code.
	ErrSchemaTooNew = errors.New("s3db: database schema is newer than this client supports")

	// ErrSeqMismatch is returned by Push when expectedSeq does not match
	// the current manifest seq, indicating the database has advanced
	// since the caller last pulled.
	ErrSeqMismatch = errors.New("s3db: sequence mismatch (concurrent write detected)")
)
Sentinel errors returned by the library.
var NoCondition = PutCondition{}
NoCondition is a PutCondition with no preconditions — an unconditional write.
Functions ¶
func Init ¶
Init creates a new database at the given prefix. If srcPath is non-empty, that SQLite file becomes the initial snapshot; otherwise an empty database is created.
Returns an error wrapping ErrPreconditionFailed if a database already exists at the prefix. Safe under concurrent callers — exactly one Init succeeds; others see ErrPreconditionFailed.
Init does NOT validate that srcPath's schema matches any migrations — it just uploads the bytes. If your application uses migrations, either Init with srcPath="" and let the first Open run them, or be aware that the resulting manifest will have schema_version=0.
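A minimal bootstrapping sketch, using the MemBlobStore documented below (the prefix is illustrative):

```go
store := s3db.NewMemBlobStore()

// Pass "" to start from an empty database, or a path to seed from a file.
if err := s3db.Init(ctx, store, "mydb/", ""); err != nil {
	if errors.Is(err, s3db.ErrPreconditionFailed) {
		// A database already exists at this prefix (or a concurrent Init won).
		return nil
	}
	return err
}
```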
func Push ¶
Push uploads srcPath as the new database state, replacing the current snapshot and clearing the log (like a manual compaction).
expectedSeq guards against overwriting concurrent writes: it must match the current manifest's Seq. Pass the Seq returned by Pull. If the database has advanced since you pulled, Push returns ErrSeqMismatch and you should re-pull, re-apply your edits, and retry.
To skip the seq check (DANGEROUS — any concurrent writes since your last pull will be silently discarded), pass expectedSeq = -1.
Push does not advance Seq (it's a snapshot replacement, not a logical write) and does not change SchemaVersion. The pushed file should match the current schema — Push does NOT validate this. Pushing a file with a different schema will break subsequent clients.
Types ¶
type BlobStore ¶
type BlobStore interface {
// Get retrieves the object at key. The caller is responsible for
// closing the returned reader. Returns ErrNotFound if the key does
// not exist.
Get(ctx context.Context, key string) (body io.ReadCloser, etag string, err error)
// GetRange retrieves bytes [start, end] inclusive of the object at key.
// The caller is responsible for closing the returned reader. Used for
// parallel snapshot downloads. Stores that don't support range requests
// may return the full object (the caller reads what it needs and
// discards the rest, which is wasteful but correct). Returns ErrNotFound
// if the key does not exist.
GetRange(ctx context.Context, key string, start, end int64) (body io.ReadCloser, err error)
// Stat returns metadata for the object at key without fetching the
// body. Returns ErrNotFound if the key does not exist.
Stat(ctx context.Context, key string) (info BlobInfo, err error)
// Put writes body to key, subject to cond. The body reader is drained
// by the store. Returns the new ETag on success, ErrPreconditionFailed
// if cond is not met.
//
// The body may be partially or fully consumed before an error is
// returned. Callers that need to retry must supply a fresh reader.
Put(ctx context.Context, key string, body io.Reader, cond PutCondition) (etag string, err error)
// List returns all keys with the given prefix, in lexicographic order.
List(ctx context.Context, prefix string) (keys []string, err error)
// Delete removes the object at key. Deleting a nonexistent key is not an error.
Delete(ctx context.Context, key string) error
// DeletePrefix removes all objects whose keys start with prefix.
// Deleting an empty prefix (no matches) is not an error.
DeletePrefix(ctx context.Context, prefix string) error
}
BlobStore is the storage abstraction. Production uses S3; tests use an in-memory fake. All operations are key-scoped — the store has no concept of the manifest, changesets, or snapshots.
Implementations must provide:
- Atomic Put (readers never see partial writes)
- Strong read-after-write consistency
- ETag-based compare-and-swap via PutCondition.IfMatch
- Write-if-not-exists via PutCondition.IfNoneMatch
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB is a concurrency-safe SQLite database backed by a BlobStore. It holds a single long-lived local SQLite connection synced to the remote state.
DB is safe for concurrent use — View and Update are serialized by an internal mutex. This is appropriate for Lambda (typically one goroutine per invocation) and prevents pointless intra-process CAS contention.
Create a DB with Open. Close it when done to release the local file and connection.
func Open ¶
Open connects to (or initializes) a database under the given prefix in the store. The prefix should end with "/" — e.g. "mydb/".
If no manifest exists at prefix, Open creates one with an empty SQLite database as the initial snapshot (schema_version=0, seq=0). This is safe under concurrent Opens: the creation uses If-None-Match, and the loser of that race falls back to reading the winner's manifest.
If WithMigrations is set, pending migrations are applied before Open returns. See DESIGN.md for the migration-as-forced-compaction model.
The returned DB holds a local SQLite file. If WithLocalPath is not set, a temp file is used and deleted on Close. For Lambda, set WithLocalPath to something under /tmp so the file persists across warm starts.
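A sketch of an Open call combining the options documented below; the exact option constructor signatures are assumptions inferred from their descriptions:

```go
// migrations is a []s3db.Migration with strictly increasing Version numbers.
db, err := s3db.Open(ctx, store, "mydb/",
	s3db.WithLocalPath("/tmp/mydb.sqlite"), // keep under /tmp on Lambda
	s3db.WithMigrations(migrations),
	s3db.WithMaxRetries(10),
)
if err != nil {
	return err
}
defer db.Close()
```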
func (*DB) Close ¶
Close releases the local SQLite connection and, if the local file was created by Open (no WithLocalPath), deletes it. The DB is unusable after Close.
func (*DB) Compact ¶
Compact builds a fresh snapshot from the current state and replaces the manifest with one that points at it and has an empty log. The sequence number is unchanged — compaction doesn't advance the logical version.
If a writer commits during compaction (manifest CAS fails), Compact retries: it re-syncs to include the new changeset, rebuilds the snapshot, and tries again. After maxRetries attempts it gives up with ErrConflict.
Compaction never blocks writers and never loses work. If it's abandoned mid-way, the uploaded snapshot blob is an orphan that GC will clean up.
Compact holds the DB mutex for its entire duration. For long-running compactions (large databases), consider calling it from a dedicated goroutine or scheduled Lambda rather than inline with user requests.
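One way to keep compaction off the request path is a dedicated goroutine; a sketch:

```go
go func() {
	ticker := time.NewTicker(10 * time.Minute)
	defer ticker.Stop()
	for range ticker.C {
		if err := db.Compact(context.Background()); err != nil {
			// ErrConflict here usually just means sustained write traffic.
			log.Printf("compact: %v", err)
		}
	}
}()
```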
func (*DB) GC ¶
GC deletes unreachable blobs: epoch prefixes whose changesets have all been subsumed by the current snapshot, and snapshots that are no longer referenced by the current manifest.
An epoch is deletable when none of its changesets appear in the current manifest's log. In practice this happens one or two compaction cycles after the epoch closes — two if a straggler write landed in the epoch during compaction. See DESIGN.md.
Orphaned blobs (uploaded but never committed due to crash or lost CAS) are cleaned up automatically as part of epoch prefix deletion — they live under the same prefix as committed changesets and get swept together.
GC is safe to run concurrently with writers. It only deletes prefixes that are not referenced by the current manifest, and the manifest is the source of truth — a writer cannot resurrect a deleted epoch (compaction opens a new epoch; writers never go back).
func (*DB) Seq ¶
Seq returns the sequence number the local DB is currently synced to. Equivalent to Stats().Seq but cheaper. Useful for diagnostics.
func (*DB) Stats ¶
Stats returns a snapshot of the database's current state. Useful for diagnostics, logging, and deciding when to trigger Compact/GC.
func (*DB) Update ¶
Update runs fn as a transaction against the current database state, then commits the resulting changeset via the CAS loop.
fn may be invoked multiple times if concurrent writers cause rebase conflicts. fn must therefore be idempotent with respect to side effects OUTSIDE the database — don't send emails or make external API calls inside fn. This is the same contract as a retry loop around a serializable transaction.
If fn performs only reads (no INSERTs/UPDATEs/DELETEs, or ones that change nothing), Update returns nil without touching the store.
Returns ErrConflict if CAS retries are exhausted, ErrSchemaMismatch or ErrSchemaTooNew if the manifest's schema version doesn't match this client's migrations, or fn's error if fn fails.
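A sketch of a typical Update. It assumes the *sqlite.Conn passed to fn is a zombiezen.com/go/sqlite connection (so the sqlitex helpers apply); the table and bind values are illustrative:

```go
err := db.Update(ctx, func(conn *sqlite.Conn) error {
	// Database work only: fn may run again if a rebase conflict forces a retry.
	return sqlitex.Execute(conn,
		"INSERT INTO events (id, body) VALUES (?, ?)",
		&sqlitex.ExecOptions{Args: []any{id, body}})
})
if errors.Is(err, s3db.ErrConflict) {
	// CAS retries exhausted; surface as a retryable error to the caller.
}
```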
func (*DB) View ¶
View runs fn against the current database state, inside a read-only transaction. It first syncs the local DB to the latest manifest, then invokes fn inside BEGIN/ROLLBACK. Any writes fn performs are discarded when View returns — use Update for writes.
The ctx's Done channel is plumbed to SQLite via SetInterrupt, so long-running queries will be interrupted if ctx is cancelled.
View holds the DB mutex for its entire duration, serializing with other View and Update calls on the same DB instance.
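A read sketched under the same assumption (zombiezen-style sqlitex helpers; the table name is illustrative):

```go
var count int64
err := db.View(ctx, func(conn *sqlite.Conn) error {
	return sqlitex.Execute(conn, "SELECT COUNT(*) FROM events", &sqlitex.ExecOptions{
		ResultFunc: func(stmt *sqlite.Stmt) error {
			count = stmt.ColumnInt64(0)
			return nil
		},
	})
})
```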
type ErrUnrecordedChanges ¶
type ErrUnrecordedChanges struct {
Rows int64 // how many rows were modified but not captured
PKlessTables []string // tables in the schema without a PRIMARY KEY
}
ErrUnrecordedChanges is returned when fn modified rows that were not captured in the changeset. This is almost always because the modified table lacks a PRIMARY KEY — SQLite's session extension requires one to track changes, and silently skips tables without one.
Fix: add a PRIMARY KEY to the affected table. If the table has no natural key, use "id INTEGER PRIMARY KEY" (which aliases the built-in rowid and costs nothing).
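Because the error type is exported, it can be inspected with errors.As; a sketch:

```go
var uerr *s3db.ErrUnrecordedChanges
if errors.As(err, &uerr) {
	log.Printf("update dropped %d row changes; tables missing a PRIMARY KEY: %v",
		uerr.Rows, uerr.PKlessTables)
}
```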
func (*ErrUnrecordedChanges) Error ¶
func (e *ErrUnrecordedChanges) Error() string
type FileSystemBlobStore ¶
type FileSystemBlobStore struct {
// contains filtered or unexported fields
}
FileSystemBlobStore implements BlobStore against a local filesystem directory. It is intended for local development, testing, and CLI workflows where running MinIO or hitting real S3 is overkill, and for embedding s3db in single-machine applications that want a real on-disk database with the same concurrency model.
All filesystem operations are scoped with os.Root, so a key cannot resolve outside the store directory via ".." components or symlinks — the boundary is enforced atomically by the OS, not by string checks.
Concurrency ¶
FileSystemBlobStore is safe for concurrent use by multiple goroutines AND multiple processes on the same machine. Cross-process safety is provided by an advisory file lock (flock(2) on Unix, LockFileEx on Windows) on a reserved file at <root>/.s3db.lock. The lock is held only for the duration of a Put or Delete; reads never take the lock (atomic rename guarantees readers see either the old or the new content, never a partial write).
Limitations — read this before using in production ¶
Advisory file locks are only reliable when a single OS kernel mediates all access to the directory. In practice that means:
SUPPORTED: local filesystems (ext4, xfs, btrfs, zfs, APFS, NTFS, tmpfs) accessed by one or more processes on one machine.
NOT SUPPORTED: any filesystem shared across machines.
- NFS (v3 and v4): flock is silently ignored, mapped to local-only, or converted to lease-based locking depending on mount options, server, and kernel version. Two clients can both believe they hold the lock and corrupt the manifest.
- CIFS/SMB: advisory locks are not reliably honored across clients.
- FUSE filesystems (sshfs, s3fs-fuse, etc.): lock support depends entirely on the implementation; most do not support it.
- Distributed/clustered filesystems (GlusterFS, CephFS, Lustre, GFS2): flock is often a no-op or local-node-only.
If you need multi-machine access, use S3BlobStore — that is the whole point of this library.
Other things to know:
- Durability: Put fsyncs the data file before the atomic rename, so a successful Put will not produce a torn or zero-length file after a crash. The parent directory entry is NOT fsynced, so a power loss immediately after Put may roll back the rename — readers would see the previous version of the key, not garbage. This is a weaker durability guarantee than S3.
- ETags: computed as the hex MD5 of the content, matching what S3 returns for single-part unencrypted uploads. ETags are recomputed on every Get and Stat by reading the file. This is cheap for the small objects s3db produces; if you Stat large blobs in a tight loop it will show up.
- Crash cleanup: a process that crashes mid-Put may leave a temporary file (basename prefix ".s3db-tmp-") in the target's directory. These are excluded from List and DeletePrefix and are harmless, but accumulate. Remove them by hand if they bother you.
- Key namespace: keys map directly to filesystem paths, so a key and a path-prefix of it cannot coexist (e.g. "a" and "a/b" — the filesystem cannot have a file and a directory at the same path). Put returns an error for the conflicting key; Get, GetRange, Stat, and Delete treat a path that is a directory as a nonexistent key. S3 has a flat key namespace and allows both. s3db never generates such keys, but callers using this type as a general-purpose store should not.
- Reserved names: the key ".s3db.lock" and any key whose basename starts with ".s3db-tmp-" are reserved and must not be used.
- Permissions: directories are created 0o700 and files 0o600 (owner-only), since the store typically holds a database. chmod the root if you need broader access; pre-existing directories keep their permissions.
Example ¶
store, err := s3db.NewFileSystemBlobStore("/var/lib/myapp/db")
if err != nil { ... }
defer store.Close()
db, err := s3db.Open(ctx, store, "mydb/")
func NewFileSystemBlobStore ¶
func NewFileSystemBlobStore(dir string) (*FileSystemBlobStore, error)
NewFileSystemBlobStore opens (or creates) a blob store rooted at dir. The directory and the lock file are created if they do not exist. Call Close when done to release the directory file descriptor.
func (*FileSystemBlobStore) Close ¶
func (s *FileSystemBlobStore) Close() error
Close releases the directory file descriptor held by the store. The store must not be used after Close. Close is optional — the descriptor is reclaimed by the OS on process exit — but recommended for long-running processes that open and discard many stores.
func (*FileSystemBlobStore) Delete ¶
func (s *FileSystemBlobStore) Delete(ctx context.Context, key string) error
func (*FileSystemBlobStore) DeletePrefix ¶
func (s *FileSystemBlobStore) DeletePrefix(ctx context.Context, prefix string) error
func (*FileSystemBlobStore) Get ¶
func (s *FileSystemBlobStore) Get(ctx context.Context, key string) (io.ReadCloser, string, error)
func (*FileSystemBlobStore) GetRange ¶
func (s *FileSystemBlobStore) GetRange(ctx context.Context, key string, start, end int64) (io.ReadCloser, error)
func (*FileSystemBlobStore) Put ¶
func (s *FileSystemBlobStore) Put(ctx context.Context, key string, body io.Reader, cond PutCondition) (string, error)
func (*FileSystemBlobStore) Root ¶
func (s *FileSystemBlobStore) Root() string
Root returns the absolute root directory backing this store.
type MemBlobStore ¶
type MemBlobStore struct {
// contains filtered or unexported fields
}
MemBlobStore is an in-memory BlobStore with real ETag-based CAS semantics. It is safe for concurrent use. It exists for testing and is not intended for production.
func NewMemBlobStore ¶
func NewMemBlobStore() *MemBlobStore
NewMemBlobStore returns an empty in-memory blob store.
func (*MemBlobStore) DeletePrefix ¶
func (m *MemBlobStore) DeletePrefix(ctx context.Context, prefix string) error
func (*MemBlobStore) Get ¶
func (m *MemBlobStore) Get(ctx context.Context, key string) (io.ReadCloser, string, error)
func (*MemBlobStore) GetRange ¶
func (m *MemBlobStore) GetRange(ctx context.Context, key string, start, end int64) (io.ReadCloser, error)
type Option ¶
type Option func(*options)
Option configures a DB at Open time.
func WithAutoCompact ¶
WithAutoCompact enables automatic compaction. When the log reaches threshold entries, Update triggers a compaction SYNCHRONOUSLY after a successful commit (inside the Update call, holding the DB lock). For large databases this adds noticeable latency to every Nth write.
Compaction errors do not affect the Update result — the write has already committed. Use WithCompactErrorHandler to observe them.
Default is 0 (disabled — call Compact explicitly).
func WithCompactErrorHandler ¶
WithCompactErrorHandler sets a callback invoked when auto-compaction fails. Without this, auto-compact errors are silently discarded (the Update that triggered compaction has already succeeded). Useful for logging/alerting when compaction is consistently failing.
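A sketch combining the two options; their exact signatures are assumptions based on the descriptions above:

```go
db, err := s3db.Open(ctx, store, "mydb/",
	s3db.WithAutoCompact(128), // compact synchronously once the log hits 128 entries
	s3db.WithCompactErrorHandler(func(err error) {
		// The triggering Update already committed; just observe the failure.
		log.Printf("auto-compact: %v", err)
	}),
)
```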
func WithGCGracePeriod ¶
WithGCGracePeriod sets how old an unreferenced snapshot must be before GC will delete it. This protects in-flight readers who loaded an old manifest just before a compaction replaced the snapshot they're about to download. Default is 5 minutes. Set to 0 to disable (not recommended if GC runs close in time to Compact).
func WithLocalPath ¶
WithLocalPath sets the path for the local SQLite file. If unset, a temp file is used and cleaned up on Close.
Note: the file is re-downloaded from the store on every Open — this option only controls WHERE the file lives during one Open→Close cycle. For Lambda warm-start caching, keep the *DB handle itself alive across invocations (Open once in init(), never Close) rather than relying on file reuse.
Two DB instances must not share the same localPath concurrently — there is no lock file and the results are undefined.
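The warm-start pattern described above, sketched for a Lambda handler (bucket and paths illustrative):

```go
var db *s3db.DB // package-level: survives warm starts

func init() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	store := s3db.NewS3BlobStore(s3.NewFromConfig(cfg), "my-bucket")
	db, err = s3db.Open(context.Background(), store, "mydb/",
		s3db.WithLocalPath("/tmp/mydb.sqlite"))
	if err != nil {
		log.Fatal(err)
	}
	// Deliberately never Closed: the handle is reused across invocations.
}
```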
func WithMaxRetries ¶
WithMaxRetries sets how many CAS attempts Update will make before returning ErrConflict. Default is 10. Each attempt is one round-trip to the store. Values less than 1 are clamped to 1.
func WithMigrations ¶
WithMigrations registers schema migrations to be run on Open. Each migration is a forced compaction — see DESIGN.md. Migrations must have strictly increasing Version numbers. The max Version becomes the client's expected schema version; Update will reject writes if the manifest's schema version doesn't match.
func WithSchemaUnchecked ¶
func WithSchemaUnchecked() Option
WithSchemaUnchecked makes Open adopt whatever schema_version the manifest has, rather than requiring it to match the provided migrations. Normally Open returns ErrSchemaTooNew if the database is ahead of your migrations, to prevent operating on a schema you don't understand.
Intended for ADMIN operations (Compact, GC, Stats) that don't touch user tables. These are schema-agnostic — VACUUM doesn't care about table layout, GC only looks at blob keys, Stats reads the manifest. The CLI tool uses this option for exactly these commands.
Do NOT use this for regular application code. View and Update WILL work (schema is adopted, not rejected), but your code is operating on tables whose layout you don't know. If a concurrent migrator bumps the schema AFTER your Open, subsequent operations will return ErrSchemaTooNew — the adopted version is frozen at Open time.
type PullInfo ¶
PullInfo describes the state of a pulled database file. Pass Seq to Push to guard against overwriting concurrent changes.
func Pull ¶
Pull downloads the current database state to destPath as a single SQLite file, with all changesets from the log applied on top of the snapshot. The resulting file is a fully-materialized, standalone database — you can open it with the sqlite3 CLI, DB Browser, or any SQLite tool.
The returned PullInfo.Seq should be passed to Push to detect concurrent writes. Think of it as the "base revision" for an edit/merge workflow.
Pull does NOT hold any locks or open connections after it returns.
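An edit/merge round trip, sketched. Pull's parameter order is an assumption (the index above elides part of Push's signature); paths are illustrative:

```go
info, err := s3db.Pull(ctx, store, "mydb/", "/tmp/edit.db")
if err != nil {
	return err
}

// Edit /tmp/edit.db offline with any SQLite tool, then push it back.

err = s3db.Push(ctx, store, "mydb/", "/tmp/edit.db", info.Seq)
if errors.Is(err, s3db.ErrSeqMismatch) {
	// The database advanced since the Pull: re-pull, re-apply, retry.
}
```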
type PutCondition ¶
type PutCondition struct {
// IfMatch, if non-empty, requires the current ETag of the object to equal
// this value. Use this for compare-and-swap.
IfMatch string
// IfNoneMatch, if true, requires the object to not exist. Use this for
// write-if-not-exists (e.g. claiming a unique sequence number).
IfNoneMatch bool
}
PutCondition specifies a precondition for Put. Only one of IfMatch or IfNoneMatch may be set; setting both is an error.
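Both conditions sketched against any BlobStore (the key here is illustrative, not one s3db itself uses):

```go
// Write-if-not-exists: exactly one concurrent caller succeeds.
etag, err := store.Put(ctx, "claims/42", strings.NewReader("owner-a"),
	s3db.PutCondition{IfNoneMatch: true})

// Compare-and-swap: succeeds only if the object still has the ETag we read.
_, err = store.Put(ctx, "claims/42", strings.NewReader("owner-b"),
	s3db.PutCondition{IfMatch: etag})
if errors.Is(err, s3db.ErrPreconditionFailed) {
	// Lost the race: re-read for a fresh ETag and retry with a fresh reader.
}
```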
type S3BlobStore ¶
type S3BlobStore struct {
// contains filtered or unexported fields
}
S3BlobStore implements BlobStore against AWS S3 (or S3-compatible services like MinIO). It requires S3 conditional writes (If-Match, If-None-Match on PutObject), which became generally available in late 2024.
func NewS3BlobStore ¶
func NewS3BlobStore(client *s3.Client, bucket string) *S3BlobStore
NewS3BlobStore wraps an s3.Client and bucket name. The client should be constructed with whatever config/credentials loader is appropriate for your environment — this package doesn't impose one.
Example:
cfg, _ := config.LoadDefaultConfig(ctx)
client := s3.NewFromConfig(cfg)
store := s3db.NewS3BlobStore(client, "my-bucket")
db, _ := s3db.Open(ctx, store, "mydb/")
func (*S3BlobStore) DeletePrefix ¶
func (s *S3BlobStore) DeletePrefix(ctx context.Context, prefix string) error
func (*S3BlobStore) Get ¶
func (s *S3BlobStore) Get(ctx context.Context, key string) (io.ReadCloser, string, error)
func (*S3BlobStore) GetRange ¶
func (s *S3BlobStore) GetRange(ctx context.Context, key string, start, end int64) (io.ReadCloser, error)
type Stats ¶
type Stats struct {
// Seq is the logical version the local DB is synced to. Each
// committed write increments it by exactly 1.
Seq int64
// SchemaVersion is the highest migration version applied.
SchemaVersion int
// SnapshotSize is the current snapshot's size in bytes.
// 0 if unknown (old manifest).
SnapshotSize int64
// LogEntries is the number of changesets not yet compacted.
LogEntries int
// LogBytes is the total size of uncompacted changesets in bytes.
// May undercount if any log entries have unknown size.
LogBytes int64
}
Stats describes the current state of the database. All fields are point-in-time snapshots from the last-seen manifest — other writers may have advanced the database since.
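A sketch of stats-driven maintenance (thresholds are illustrative, not recommendations):

```go
s := db.Stats()
if s.LogEntries > 100 || s.LogBytes > 8<<20 {
	if err := db.Compact(ctx); err != nil {
		log.Printf("compact: %v", err)
	}
	if err := db.GC(ctx); err != nil {
		log.Printf("gc: %v", err)
	}
}
```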
Source Files ¶
Directories ¶

| Path | Synopsis |
|---|---|
| cmd | |
| cmd/s3db (command) | Command s3db is a CLI for operating s3db databases from the shell. |
| examples | |
| examples/basic (command) | Package main demonstrates the core s3db operations: migrations, writes, reads, compaction, and GC. |
| examples/inmemory (command) | Package main demonstrates using s3db with the in-memory blob store — useful for local development, unit tests, and exploring the library without AWS credentials. |
| examples/lambda (command) | Package main shows the recommended pattern for using s3db from AWS Lambda. |
| internal | |
| internal/etag | Package etag provides ETag computation and normalization. |