litestream

package module
v0.3.4-rc1
Published: Apr 24, 2021 License: Apache-2.0 Imports: 27 Imported by: 7

README

Litestream

Litestream is a standalone streaming replication tool for SQLite. It runs as a background process and safely replicates changes incrementally to another file or S3. Litestream only communicates with SQLite through the SQLite API so it will not corrupt your database.

If you need support or have ideas for improving Litestream, please join the Litestream Slack or visit the GitHub Discussions. Please visit the Litestream web site for installation instructions and documentation.

If you find this project interesting, please consider starring the project on GitHub.

Acknowledgements

While the Litestream project does not accept external code patches, many of the most valuable contributions come in the form of testing, feedback, and documentation. These help harden the software and streamline usage for other users.

I want to give special thanks to individuals who invest much of their time and energy into the project to help make it better. Shout out to Michael Lynch for digging into issues and contributing to the documentation.

Open-source, not open-contribution

Similar to SQLite, Litestream is open source but closed to code contributions. This keeps the code base free of proprietary or licensed code but it also helps me continue to maintain and build Litestream.

As the author of BoltDB, I found that accepting and maintaining third-party patches contributed to my burnout and I eventually archived the project. Writing databases & low-level replication tools involves nuance, and simple one-line changes can have profound and unexpected effects on correctness and performance. Small contributions typically required hours of my time to properly test and validate them.

I am grateful for community involvement, bug reports, & feature requests. I do not wish to come off as anything but welcoming; however, I've made the decision to keep this project closed to contributions for my own mental health and the long-term viability of the project.

The documentation repository is MIT licensed and pull requests are welcome there.

Documentation

Constants

const (
	DefaultMonitorInterval    = 1 * time.Second
	DefaultCheckpointInterval = 1 * time.Minute
	DefaultMinCheckpointPageN = 1000
	DefaultMaxCheckpointPageN = 10000
)

Default DB settings.

const (
	WALHeaderChecksumOffset      = 24
	WALFrameHeaderChecksumOffset = 16
)

SQLite WAL constants

const (
	MetaDirSuffix = "-litestream"

	WALDirName  = "wal"
	WALExt      = ".wal"
	SnapshotExt = ".snapshot"

	GenerationNameLen = 16
)

Naming constants.

const (
	CheckpointModePassive  = "PASSIVE"
	CheckpointModeFull     = "FULL"
	CheckpointModeRestart  = "RESTART"
	CheckpointModeTruncate = "TRUNCATE"
)

SQLite checkpoint modes.

const (
	// WALHeaderSize is the size of the WAL header, in bytes.
	WALHeaderSize = 32

	// WALFrameHeaderSize is the size of the WAL frame header, in bytes.
	WALFrameHeaderSize = 24
)
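
Per the SQLite WAL file format, a WAL file is one header followed by frames, and each frame is a frame header plus one database page. The two sizes above are therefore enough to compute the file size for a given frame count. A minimal sketch, assuming a hypothetical helper named walSize and a caller-supplied page size (neither is part of the documented API):

```go
package main

import "fmt"

const (
	walHeaderSize      = 32 // mirrors WALHeaderSize
	walFrameHeaderSize = 24 // mirrors WALFrameHeaderSize
)

// walSize returns the size in bytes of a WAL file that holds n frames
// for a database with the given page size. Hypothetical helper, not
// part of the package.
func walSize(pageSize, n int) int64 {
	return int64(walHeaderSize) + int64(n)*int64(walFrameHeaderSize+pageSize)
}

func main() {
	// 1000 frames of 4096-byte pages.
	fmt.Println(walSize(4096, 1000)) // 4120032
}
```

The same arithmetic converts page-count thresholds such as DefaultMinCheckpointPageN into byte sizes that can be compared against the size of the WAL file on disk.
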
const (
	DefaultRetention              = 24 * time.Hour
	DefaultRetentionCheckInterval = 1 * time.Hour
)

Default file replica settings.

const BusyTimeout = 1 * time.Second

BusyTimeout is the timeout to wait for EBUSY from SQLite.

const DefaultRestoreParallelism = 8

DefaultRestoreParallelism is the default parallelism when downloading WAL files.

const MaxIndex = 0x7FFFFFFF

MaxIndex is the maximum possible WAL index. If this index is reached then a new generation will be started.

Variables

var (
	ErrNoGeneration     = errors.New("no generation available")
	ErrNoSnapshots      = errors.New("no snapshots available")
	ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch")
)

Litestream errors.

var Tracef = func(format string, a ...interface{}) {}

Tracef is used for low-level tracing.

Functions

func Checksum

func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)

Checksum computes a running SQLite checksum over a byte slice.

func FormatWALPath added in v0.2.0

func FormatWALPath(index int) string

FormatWALPath formats a WAL filename with a given index.

func FormatWALPathWithOffset added in v0.2.0

func FormatWALPathWithOffset(index int, offset int64) string

FormatWALPathWithOffset formats a WAL filename with a given index & offset.
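
This page does not spell out the filename layout. As an illustration only, the sketch below assumes zero-padded, eight-digit hexadecimal fields joined by an underscore and followed by WALExt. The lowercase function names are stand-ins for this example, not the package's functions.

```go
package main

import "fmt"

const walExt = ".wal" // mirrors WALExt

// formatWALPath builds a WAL filename from an index.
// Assumed layout: 8 hex digits followed by the extension.
func formatWALPath(index int) string {
	return fmt.Sprintf("%08x%s", index, walExt)
}

// formatWALPathWithOffset also encodes the byte offset within the WAL.
// Assumed layout: index and offset as 8 hex digits each, joined by "_".
func formatWALPathWithOffset(index int, offset int64) string {
	return fmt.Sprintf("%08x_%08x%s", index, offset, walExt)
}

func main() {
	fmt.Println(formatWALPath(255))                 // 000000ff.wal
	fmt.Println(formatWALPathWithOffset(255, 4096)) // 000000ff_00001000.wal
}
```

Fixed-width hexadecimal names sort lexicographically in index order, which is one reason a layout like this is convenient for directory listings.
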

func IsGenerationName

func IsGenerationName(s string) bool

IsGenerationName returns true if s is the correct length and is only lowercase hex characters.
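
The rule above is simple enough to sketch directly. The check below follows the description (length GenerationNameLen, lowercase hex only). The newGenerationName helper is hypothetical: it assumes a name can be produced by hex-encoding 8 random bytes, which this page does not confirm.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

const generationNameLen = 16 // mirrors GenerationNameLen

// isGenerationName reports whether s has the expected length and
// consists only of the characters 0-9 and a-f.
func isGenerationName(s string) bool {
	if len(s) != generationNameLen {
		return false
	}
	for _, ch := range s {
		isDigit := ch >= '0' && ch <= '9'
		isLowerHex := ch >= 'a' && ch <= 'f'
		if !isDigit && !isLowerHex {
			return false
		}
	}
	return true
}

// newGenerationName returns a random name that passes isGenerationName.
// Hypothetical helper: 8 random bytes encode to 16 lowercase hex digits.
func newGenerationName() (string, error) {
	b := make([]byte, generationNameLen/2)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

func main() {
	fmt.Println(isGenerationName("0123456789abcdef")) // true
	fmt.Println(isGenerationName("0123456789ABCDEF")) // false
	fmt.Println(isGenerationName("abc"))              // false

	name, err := newGenerationName()
	if err != nil {
		panic(err)
	}
	fmt.Println(name, isGenerationName(name))
}
```
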

func IsSnapshotPath

func IsSnapshotPath(s string) bool

IsSnapshotPath returns true if s is a path to a snapshot file.

func IsWALPath

func IsWALPath(s string) bool

IsWALPath returns true if s is a path to a WAL file.

func ParseSnapshotPath

func ParseSnapshotPath(s string) (index int, ext string, err error)

ParseSnapshotPath returns the index for the snapshot. Returns an error if the path is not a valid snapshot path.

func ParseWALPath

func ParseWALPath(s string) (index int, offset int64, ext string, err error)

ParseWALPath returns the index & offset for the WAL file. Returns an error if the path is not a valid WAL path.
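
A parser with this signature can be sketched with a regular expression. The example assumes a filename layout of an eight-digit hex index, an optional underscore plus eight-digit hex offset, and a ".wal" extension. That layout and the lowercase parseWALPath name are assumptions made for illustration; the real function may accept other extensions.

```go
package main

import (
	"fmt"
	"path/filepath"
	"regexp"
	"strconv"
)

// walPathRe matches the assumed layout "<index>[_<offset>].wal".
var walPathRe = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(\.wal)$`)

// parseWALPath extracts the index, optional offset, and extension
// from the final element of path s.
func parseWALPath(s string) (index int, offset int64, ext string, err error) {
	m := walPathRe.FindStringSubmatch(filepath.Base(s))
	if m == nil {
		return 0, 0, "", fmt.Errorf("invalid wal path: %q", s)
	}
	i64, err := strconv.ParseInt(m[1], 16, 64)
	if err != nil {
		return 0, 0, "", err
	}
	if m[2] != "" { // the offset part is optional
		if offset, err = strconv.ParseInt(m[2], 16, 64); err != nil {
			return 0, 0, "", err
		}
	}
	return int(i64), offset, m[3], nil
}

func main() {
	index, offset, ext, err := parseWALPath("wal/000000ff_00001000.wal")
	fmt.Printf("index=%d offset=%d ext=%q err=%v\n", index, offset, ext, err)

	_, _, _, err = parseWALPath("00000001.snapshot")
	fmt.Println(err != nil) // true
}
```
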

func RestoreReplica added in v0.3.1

func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) (err error)

RestoreReplica restores the database from a replica based on the options given. This method will restore into opt.OutputPath, if specified, or into the DB's original database path. It can optionally restore from a specific replica or generation or it will automatically choose the best one. Finally, a timestamp can be specified to restore the database to a specific point-in-time.

func SnapshotIndexAt added in v0.3.0

func SnapshotIndexAt(ctx context.Context, r Replica, generation string, timestamp time.Time) (int, error)

SnapshotIndexAt returns the highest index for a snapshot within a generation that occurs before timestamp. If timestamp is zero, returns the latest snapshot.

func SnapshotIndexByIndex added in v0.3.4

func SnapshotIndexByIndex(ctx context.Context, r Replica, generation string, index int) (int, error)

SnapshotIndexByIndex returns the highest index for a snapshot within a generation that occurs before a given index. If index is MaxInt32, returns the latest snapshot.

func ValidateReplica added in v0.3.0

func ValidateReplica(ctx context.Context, r Replica) error

ValidateReplica restores the most recent data from a replica and validates that the resulting database matches the current database.

func WALIndexAt added in v0.3.0

func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int, timestamp time.Time) (int, error)

WALIndexAt returns the highest index for a WAL file that occurs before maxIndex & timestamp. If timestamp is zero, returns the highest WAL index. Returns -1 if no WAL is found and maxIndex is MaxInt32.

Types

type DB

type DB struct {

	// Minimum threshold of WAL size, in pages, before a passive checkpoint.
	// A passive checkpoint will attempt a checkpoint but fail if there are
	// active transactions occurring at the same time.
	MinCheckpointPageN int

	// Maximum threshold of WAL size, in pages, before a forced checkpoint.
	// A forced checkpoint will block new transactions and wait for existing
	// transactions to finish before issuing a checkpoint and resetting the WAL.
	//
	// If zero, no checkpoints are forced. This can cause the WAL to grow
	// unbounded if there are always read transactions occurring.
	MaxCheckpointPageN int

	// Time between automatic checkpoints in the WAL. This is done to allow
	// more fine-grained WAL files so that restores can be performed with
	// better precision.
	CheckpointInterval time.Duration

	// Frequency at which to perform db sync.
	MonitorInterval time.Duration

	// List of replicas for the database.
	// Must be set before calling Open().
	Replicas []Replica
	// contains filtered or unexported fields
}

DB represents a managed instance of a SQLite database in the file system.

func NewDB

func NewDB(path string) *DB

NewDB returns a new instance of DB for a given path.

func (*DB) CRC64 added in v0.3.0

func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error)

CRC64 returns a CRC-64 ISO checksum of the database and its current position.

This function obtains a read lock so it prevents syncs from occurring until the operation is complete. The database will still be usable but it will be unable to checkpoint during this time.

If dst is set, the database file is copied to that location before checksum.

func (*DB) CalcRestoreTarget added in v0.3.1

func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (Replica, string, error)

CalcRestoreTarget returns a replica & generation to restore from based on opt criteria.

func (*DB) Checkpoint added in v0.3.0

func (db *DB) Checkpoint(mode string) (err error)

Checkpoint performs a checkpoint on the WAL file.

func (*DB) Close

func (db *DB) Close() (err error)

Close releases the read lock & closes the database. This method should only be called by tests as it causes the underlying database to be checkpointed.

func (*DB) CurrentGeneration

func (db *DB) CurrentGeneration() (string, error)

CurrentGeneration returns the name of the generation saved to the "generation" file in the metadata directory. Returns an empty string if none exists.

func (*DB) CurrentShadowWALIndex

func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, err error)

CurrentShadowWALIndex returns the current WAL index & total size.

func (*DB) CurrentShadowWALPath

func (db *DB) CurrentShadowWALPath(generation string) (string, error)

CurrentShadowWALPath returns the path to the last shadow WAL in a generation.

func (*DB) GenerationNamePath

func (db *DB) GenerationNamePath() string

GenerationNamePath returns the path of the name of the current generation.

func (*DB) GenerationPath

func (db *DB) GenerationPath(generation string) string

GenerationPath returns the path of a single generation. Panics if generation is blank.

func (*DB) MetaPath

func (db *DB) MetaPath() string

MetaPath returns the path to the database metadata.

func (*DB) Notify

func (db *DB) Notify() <-chan struct{}

Notify returns a channel that closes when the shadow WAL changes.
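
A channel that closes on change is a common Go broadcast idiom: every waiter holding the channel wakes at once, and a fresh channel is installed for the next round. The sketch below shows the idiom in isolation. The notifier type and its signal method are hypothetical stand-ins, not the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier hands out a channel that is closed on the next change.
type notifier struct {
	mu sync.Mutex
	ch chan struct{}
}

func newNotifier() *notifier {
	return &notifier{ch: make(chan struct{})}
}

// Notify returns the current channel. It is closed on the next signal.
func (n *notifier) Notify() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.ch
}

// signal wakes every current waiter and installs a fresh channel.
func (n *notifier) signal() {
	n.mu.Lock()
	defer n.mu.Unlock()
	close(n.ch)
	n.ch = make(chan struct{})
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	n := newNotifier()
	ch := n.Notify()
	fmt.Println(isClosed(ch)) // false

	n.signal()
	fmt.Println(isClosed(ch))         // true
	fmt.Println(isClosed(n.Notify())) // false
}
```

With this pattern a consumer has to call Notify again after each wake-up, because a closed channel never blocks again.
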

func (*DB) Open

func (db *DB) Open() (err error)

Open initializes the background monitoring goroutine.

func (*DB) PageSize

func (db *DB) PageSize() int

PageSize returns the page size of the underlying database. Only valid after database exists & Init() has successfully run.

func (*DB) Path

func (db *DB) Path() string

Path returns the path to the database.

func (*DB) Pos added in v0.2.0

func (db *DB) Pos() (Pos, error)

Pos returns the current position of the database.

func (*DB) Replica added in v0.2.0

func (db *DB) Replica(name string) Replica

Replica returns a replica by name.

func (*DB) SQLDB added in v0.3.0

func (db *DB) SQLDB() *sql.DB

SQLDB returns a reference to the underlying sql.DB connection.

func (*DB) ShadowWALDir added in v0.2.0

func (db *DB) ShadowWALDir(generation string) string

ShadowWALDir returns the path of the shadow wal directory. Panics if generation is blank.

func (*DB) ShadowWALPath

func (db *DB) ShadowWALPath(generation string, index int) string

ShadowWALPath returns the path of a single shadow WAL file. Panics if generation is blank or index is negative.

func (*DB) ShadowWALReader

func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error)

ShadowWALReader opens a reader for a shadow WAL file at a given position. If the reader is at the end of the file, it attempts to return the next file.

The caller should check Pos() & Size() on the returned reader to check offset.

func (*DB) Snapshots added in v0.2.0

func (db *DB) Snapshots(ctx context.Context) ([]*SnapshotInfo, error)

Snapshots returns a list of all snapshots across all replicas.

func (*DB) SoftClose

func (db *DB) SoftClose() (err error)

SoftClose closes everything but the underlying db connection. This method is available because the binary needs to avoid closing the database on exit to prevent autocheckpointing.

func (*DB) Sync

func (db *DB) Sync(ctx context.Context) (err error)

Sync copies pending data from the WAL to the shadow WAL.

func (*DB) UpdatedAt

func (db *DB) UpdatedAt() (time.Time, error)

UpdatedAt returns the last modified time of the database or WAL file.

func (*DB) WALPath

func (db *DB) WALPath() string

WALPath returns the path to the database's WAL file.

func (*DB) WALs added in v0.2.0

func (db *DB) WALs(ctx context.Context) ([]*WALInfo, error)

WALs returns a list of all WAL files across all replicas.

type FileReplica

type FileReplica struct {

	// Frequency to create new snapshots.
	SnapshotInterval time.Duration

	// Time to keep snapshots and related WAL files.
	// Database is snapshotted after interval, if needed, and older WAL files are discarded.
	Retention time.Duration

	// Time between checks for retention.
	RetentionCheckInterval time.Duration

	// Time between validation checks.
	ValidationInterval time.Duration

	// If true, replica monitors database for changes automatically.
	// Set to false if replica is being used synchronously (such as in tests).
	MonitorEnabled bool
	// contains filtered or unexported fields
}

FileReplica is a replica that replicates a DB to a local file path.

func NewFileReplica

func NewFileReplica(db *DB, name, dst string) *FileReplica

NewFileReplica returns a new instance of FileReplica.

func (*FileReplica) CalcPos added in v0.2.0

func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos, err error)

CalcPos returns the position for the replica for the current generation. Returns a zero value if there is no active generation.

func (*FileReplica) DB added in v0.3.0

func (r *FileReplica) DB() *DB

DB returns the parent database reference.

func (*FileReplica) EnforceRetention added in v0.3.0

func (r *FileReplica) EnforceRetention(ctx context.Context) (err error)

EnforceRetention forces a new snapshot once the retention interval has passed. Older snapshots and WAL files are then removed.

func (*FileReplica) GenerationDir added in v0.3.0

func (r *FileReplica) GenerationDir(generation string) string

GenerationDir returns the path to a generation's root directory.

func (*FileReplica) GenerationStats

func (r *FileReplica) GenerationStats(ctx context.Context, generation string) (stats GenerationStats, err error)

GenerationStats returns stats for a generation.

func (*FileReplica) Generations

func (r *FileReplica) Generations(ctx context.Context) ([]string, error)

Generations returns a list of available generation names.

func (*FileReplica) LastPos added in v0.2.0

func (r *FileReplica) LastPos() Pos

LastPos returns the last successfully replicated position.

func (*FileReplica) MaxSnapshotIndex added in v0.2.0

func (r *FileReplica) MaxSnapshotIndex(generation string) (int, error)

MaxSnapshotIndex returns the highest index for the snapshots.

func (*FileReplica) Name

func (r *FileReplica) Name() string

Name returns the name of the replica. Returns the type if no name set.

func (*FileReplica) Path added in v0.3.0

func (r *FileReplica) Path() string

Path returns the path the replica was initialized with.

func (*FileReplica) Snapshot added in v0.3.3

func (r *FileReplica) Snapshot(ctx context.Context) error

Snapshot copies the entire database to the replica path.

func (*FileReplica) SnapshotDir

func (r *FileReplica) SnapshotDir(generation string) string

SnapshotDir returns the path to a generation's snapshot directory.

func (*FileReplica) SnapshotPath

func (r *FileReplica) SnapshotPath(generation string, index int) string

SnapshotPath returns the path to a snapshot file.

func (*FileReplica) SnapshotReader

func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)

SnapshotReader returns a reader for snapshot data at the given generation/index. Returns os.ErrNotExist if no matching index is found.

func (*FileReplica) Snapshots added in v0.2.0

func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error)

Snapshots returns a list of available snapshots in the replica.

func (*FileReplica) Start

func (r *FileReplica) Start(ctx context.Context) (err error)

Start starts replication for a given generation.

func (*FileReplica) Stop

func (r *FileReplica) Stop(hard bool) (err error)

Stop cancels any outstanding replication and blocks until finished.

Performing a hard stop will close the DB file descriptor, which could release per-process locks. Hard stops should only be performed when stopping the entire process.

func (*FileReplica) Sync added in v0.2.0

func (r *FileReplica) Sync(ctx context.Context) (err error)

Sync replays data from the shadow WAL into the file replica.

func (*FileReplica) Type

func (r *FileReplica) Type() string

Type returns the type of replica.

func (*FileReplica) WALDir

func (r *FileReplica) WALDir(generation string) string

WALDir returns the path to a generation's WAL directory.

func (*FileReplica) WALPath

func (r *FileReplica) WALPath(generation string, index int) string

WALPath returns the path to a WAL file.

func (*FileReplica) WALReader

func (r *FileReplica) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)

WALReader returns a reader for WAL data at the given index. Returns os.ErrNotExist if no matching index is found.

func (*FileReplica) WALs added in v0.2.0

func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error)

WALs returns a list of available WAL files in the replica.

type GenerationStats

type GenerationStats struct {
	// Count of snapshot & WAL files.
	SnapshotN int
	WALN      int

	// Time range for the earliest snapshot & latest WAL file update.
	CreatedAt time.Time
	UpdatedAt time.Time
}

GenerationStats represents high level stats for a single generation.

func CalcReplicaRestoreTarget added in v0.3.1

func CalcReplicaRestoreTarget(ctx context.Context, r Replica, opt RestoreOptions) (generation string, stats GenerationStats, err error)

CalcReplicaRestoreTarget returns a generation to restore from.

type Pos

type Pos struct {
	Generation string // generation name
	Index      int    // wal file index
	Offset     int64  // offset within wal file
}

Pos is a position in the WAL for a generation.

func (Pos) IsZero

func (p Pos) IsZero() bool

IsZero returns true if p is the zero value.

func (Pos) String

func (p Pos) String() string

String returns a string representation.

type Replica

type Replica interface {
	// The name of the replica. Defaults to type if no name specified.
	Name() string

	// String identifier for the type of replica ("file", "s3", etc).
	Type() string

	// The parent database.
	DB() *DB

	// Starts replicating in a background goroutine.
	Start(ctx context.Context) error

	// Stops all replication processing. Blocks until processing stopped.
	Stop(hard bool) error

	// Performs a backup of outstanding WAL frames to the replica.
	Sync(ctx context.Context) error

	// Returns the last replication position.
	LastPos() Pos

	// Returns the computed position of the replica for a given generation.
	CalcPos(ctx context.Context, generation string) (Pos, error)

	// Returns a list of generation names for the replica.
	Generations(ctx context.Context) ([]string, error)

	// Returns basic information about a generation including the number of
	// snapshot & WAL files as well as the time range covered.
	GenerationStats(ctx context.Context, generation string) (GenerationStats, error)

	// Returns a list of available snapshots in the replica.
	Snapshots(ctx context.Context) ([]*SnapshotInfo, error)

	// Returns a list of available WAL files in the replica.
	WALs(ctx context.Context) ([]*WALInfo, error)

	// Returns a reader for snapshot data at the given generation/index.
	SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)

	// Returns a reader for WAL data at the given position.
	WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
}

Replica represents a remote destination to replicate the database & WAL.

type RestoreOptions

type RestoreOptions struct {
	// Target path to restore into.
	// If blank, the original DB path is used.
	OutputPath string

	// Specific replica to restore from.
	// If blank, all replicas are considered.
	ReplicaName string

	// Specific generation to restore from.
	// If blank, all generations are considered.
	Generation string

	// Specific index to restore from.
	// Set to math.MaxInt32 to ignore index.
	Index int

	// Point-in-time to restore database.
	// If zero, the database is restored to the most recent state available.
	Timestamp time.Time

	// Specifies how many WAL files are downloaded in parallel during restore.
	Parallelism int

	// Logging settings.
	Logger  *log.Logger
	Verbose bool
}

RestoreOptions represents options for restoring a database, such as with RestoreReplica().

func NewRestoreOptions added in v0.2.0

func NewRestoreOptions() RestoreOptions

NewRestoreOptions returns a new instance of RestoreOptions with defaults.
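
The defaults themselves are not listed on this page. Going by the field comments, a plausible reading is that the index filter starts disabled (math.MaxInt32) and parallelism starts at DefaultRestoreParallelism. The sketch below encodes that reading with a local restoreOptions type that copies a subset of the fields. Treat both default values as assumptions.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

const defaultRestoreParallelism = 8 // mirrors DefaultRestoreParallelism

// restoreOptions copies a subset of the RestoreOptions fields.
type restoreOptions struct {
	OutputPath  string
	Generation  string
	Index       int
	Timestamp   time.Time
	Parallelism int
}

// newRestoreOptions returns options with the assumed defaults:
// no index filter and the default download parallelism.
func newRestoreOptions() restoreOptions {
	return restoreOptions{
		Index:       math.MaxInt32,
		Parallelism: defaultRestoreParallelism,
	}
}

func main() {
	opt := newRestoreOptions()
	opt.OutputPath = "/tmp/restored.db" // example path only

	fmt.Println(opt.Index == math.MaxInt32) // true
	fmt.Println(opt.Parallelism)            // 8
	fmt.Println(opt.Timestamp.IsZero())     // true
}
```

Starting from a constructor matters here because the zero value of Index is 0, which is a valid index, so a bare struct literal would not mean "ignore the index".
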

type ShadowWALReader

type ShadowWALReader struct {
	// contains filtered or unexported fields
}

ShadowWALReader represents a reader for a shadow WAL file that tracks WAL position.

func (*ShadowWALReader) Close

func (r *ShadowWALReader) Close() error

Close closes the underlying WAL file handle.

func (*ShadowWALReader) N

func (r *ShadowWALReader) N() int64

N returns the remaining bytes in the reader.

func (*ShadowWALReader) Name added in v0.3.2

func (r *ShadowWALReader) Name() string

Name returns the filename of the underlying file.

func (*ShadowWALReader) Pos

func (r *ShadowWALReader) Pos() Pos

Pos returns the current WAL position.

func (*ShadowWALReader) Read

func (r *ShadowWALReader) Read(p []byte) (n int, err error)

Read reads bytes into p, updates the position, and returns the bytes read. Returns io.EOF at the end of the available section of the WAL.

type SnapshotInfo added in v0.2.0

type SnapshotInfo struct {
	Name       string
	Replica    string
	Generation string
	Index      int
	Size       int64
	CreatedAt  time.Time
}

SnapshotInfo represents file information about a snapshot.

func FilterSnapshotsAfter added in v0.3.0

func FilterSnapshotsAfter(a []*SnapshotInfo, t time.Time) []*SnapshotInfo

FilterSnapshotsAfter returns all snapshots that were created on or after t.

func FindMinSnapshotByGeneration added in v0.3.0

func FindMinSnapshotByGeneration(a []*SnapshotInfo, generation string) *SnapshotInfo

FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.

type WALInfo added in v0.2.0

type WALInfo struct {
	Name       string
	Replica    string
	Generation string
	Index      int
	Offset     int64
	Size       int64
	CreatedAt  time.Time
}

WALInfo represents file information about a WAL file.

Directories

Path Synopsis
cmd
