Documentation

Overview

Package wal implements a full write-ahead log over a directory of files, including snapshot management, replay iterators, and rotating log writers.

Example
package main

import (
	"context"
	"fmt"
	"io/fs"
	"log"
	"os"

	"entrogo.com/stuffedio/wal"
)

// appendToWAL opens the WAL in dir with writing allowed and appends each
// element of values as its own record. Empty snapshot loading and empty
// journal playing are requested explicitly, so existing content is not
// handed to any callback. When finalize is true the WAL is finalized before
// returning; otherwise the journal is left as is. Every failure is wrapped
// with the "append to WAL" prefix.
func appendToWAL(ctx context.Context, dir string, values []string, finalize bool) error {
	wl, err := wal.Open(ctx, dir,
		wal.WithAllowWrite(true),
		wal.WithEmptySnapshotLoader(true),
		wal.WithEmptyJournalPlayer(true),
	)
	if err != nil {
		return fmt.Errorf("append to WAL: %w", err)
	}
	defer wl.Close()

	for i := range values {
		record := []byte(values[i])
		if err := wl.Append(record); err != nil {
			return fmt.Errorf("append to WAL: %w", err)
		}
	}

	// Nothing more to do when the caller wants the journal left live.
	if !finalize {
		return nil
	}
	if err := wl.Finalize(); err != nil {
		return fmt.Errorf("append to WAL: %w", err)
	}
	return nil
}

// readWAL opens the WAL in dir without enabling writes and collects what it
// contains: every record passed to the snapshot loader is appended to
// snapshot, and every record passed to the journal player is appended to
// journal, each converted to a string.
//
// On failure both slices are returned as nil and the error wraps the
// underlying cause (via %w, like the other helpers here), so callers can
// inspect it with errors.Is or errors.As.
func readWAL(ctx context.Context, dir string) (snapshot, journal []string, err error) {
	w, err := wal.Open(ctx, dir,
		wal.WithSnapshotLoaderFunc(func(_ context.Context, b []byte) error {
			snapshot = append(snapshot, string(b))
			return nil
		}),
		wal.WithJournalPlayerFunc(func(_ context.Context, b []byte) error {
			journal = append(journal, string(b))
			return nil
		}),
	)
	if err != nil {
		// Discard anything collected before the failure; partial results are
		// not meaningful alongside a non-nil error.
		return nil, nil, fmt.Errorf("read WAL: %w", err)
	}
	defer w.Close()

	return snapshot, journal, nil
}

// makeSnapshot opens the WAL in dir with live journals excluded, gathers
// every record delivered to the snapshot loader and the journal player, and
// writes all of them into a new snapshot. It returns the string produced by
// CreateSnapshot (used by this example as the snapshot's file name).
//
// Every failure is wrapped with the "make snapshot" prefix. The WAL is
// closed on all return paths once it has been opened successfully.
func makeSnapshot(ctx context.Context, dir string) (string, error) {
	var values []string
	w, err := wal.Open(ctx, dir,
		wal.WithExcludeLiveJournal(true),
		wal.WithJournalPlayerFunc(func(_ context.Context, b []byte) error {
			values = append(values, string(b))
			return nil
		}),
		wal.WithSnapshotLoaderFunc(func(_ context.Context, b []byte) error {
			values = append(values, string(b))
			return nil
		}),
	)
	if err != nil {
		return "", fmt.Errorf("make snapshot: %w", err)
	}
	// Release the WAL's resources when done, matching appendToWAL and
	// readWAL; previously the handle was never closed here.
	defer w.Close()

	// Surface the reason up front if a snapshot is not currently possible.
	if err := w.CheckCanSnapshot(); err != nil {
		return "", fmt.Errorf("make snapshot: %w", err)
	}

	fname, err := w.CreateSnapshot(func(a wal.ValueAdder) error {
		for _, val := range values {
			if err := a.AddValue([]byte(val)); err != nil {
				return fmt.Errorf("snapshotter: %w", err)
			}
		}
		return nil
	})
	if err != nil {
		return "", fmt.Errorf("make snapshot: %w", err)
	}
	return fname, nil
}

// mustLogFiles writes the name of each entry found directly in dir to the
// standard logger, one entry per log call. If the directory cannot be read
// it reports the error through log.Fatalf, which terminates the program.
func mustLogFiles(dir string) {
	entries, err := fs.ReadDir(os.DirFS(dir), ".")
	if err != nil {
		log.Fatalf("Error reading dir %q: %v", dir, err)
	}
	for i := range entries {
		log.Print(entries[i].Name())
	}
}

// main walks through a full WAL life cycle in a temporary directory: append
// and finalize a first batch, read it back, snapshot it, append a second
// batch that is left live, and read everything back again.
func main() {
	ctx := context.Background()

	// The example works in a throwaway temp directory. Real use would pick a
	// more durable location (and would not delete it afterward).
	// Note: log.Fatalf exits the process, so the deferred removal below does
	// not run on the error paths.
	dir, err := os.MkdirTemp("", "walex-")
	if err != nil {
		log.Fatalf("Error making temp dir: %v", err)
	}
	defer os.RemoveAll(dir)

	// report prints a heading followed by the snapshot values and then the
	// journal values, one per line.
	report := func(heading string, snaps, entries []string) {
		fmt.Println(heading)
		for i := range snaps {
			fmt.Println("- Snapshot: " + snaps[i])
		}
		for i := range entries {
			fmt.Println("- Journal: " + entries[i])
		}
	}

	// First batch: written to an empty WAL and finalized.
	firstBatch := []string{
		"Message 1",
		"Message 2",
		"Message 3",
		"Message 4",
	}
	if err := appendToWAL(ctx, dir, firstBatch, true); err != nil {
		log.Fatalf("Error appending to empty WAL: %v", err)
	}

	// Read the first batch back before taking a snapshot.
	snapshotVals, journalVals, err := readWAL(ctx, dir)
	if err != nil {
		log.Fatalf("Error reading initial WAL: %v", err)
	}
	report("Read Initial:", snapshotVals, journalVals)

	// Create a snapshot from what a read-only open of the WAL yields.
	if _, err := makeSnapshot(ctx, dir); err != nil {
		log.Fatalf("Error creating snapshot: %v", err)
	}

	// Second batch: reopen for writing and append more entries, leaving the
	// journal "live" this time.
	secondBatch := []string{
		"Message 5",
		"Message 6",
	}
	if err := appendToWAL(ctx, dir, secondBatch, false); err != nil {
		log.Fatalf("Error appending to WAL after snapshot: %v", err)
	}

	// Read the whole WAL back; it now includes the snapshot and the latest
	// journal.
	finalSnapVals, finalJVals, err := readWAL(ctx, dir)
	if err != nil {
		log.Fatalf("Error reading final WAL: %v", err)
	}
	report("Read Final:", finalSnapVals, finalJVals)
}
Output:

Read Initial:
- Journal: Message 1
- Journal: Message 2
- Journal: Message 3
- Journal: Message 4
Read Final:
- Snapshot: Message 1
- Snapshot: Message 2
- Snapshot: Message 3
- Snapshot: Message 4
- Journal: Message 5
- Journal: Message 6

Index

Examples

Constants

View Source
const (
	DefaultJournalBase  = "journal"
	DefaultSnapshotBase = "snapshot"
	DefaultMaxIndices   = 10 * 1 << 10 // 10Ki
	DefaultMaxBytes     = 10 * 1 << 20 // 10Mi
	OldPrefix           = "_old__"
	PartPrefix          = "_partial__"
	FinalSuffix         = "final"
)

Variables

This section is empty.

Functions

func CheckIndexName

func CheckIndexName(name, base string, index uint64) error

CheckIndexName checks that the given file name contains the right base and index.

func Cleanup

func Cleanup(dir string) error

Cleanup cleans old and partial files. Be careful not to use this in the middle of a snapshot.

func IndexName

func IndexName(base string, idx uint64) string

IndexName returns a string for the given index value.

func OldFiles

func OldFiles(dir string) ([]string, error)

OldFiles returns a list of old files that have been moved out of the way (e.g., old journals after snapshotting).

func PartialFiles

func PartialFiles(dir string) ([]string, error)

PartialFiles returns a list of partial files. Be careful using this to clean up, as these might be actively written to. A partial snapshot can be deleted after a successful snapshot, for example. Live journals are not considered "partial".

Types

type FileMeta

type FileMeta struct {
	Name    string
	Base    string
	Index   uint64
	IsFinal bool
	IsOld   bool
}

FileMeta contains information about a file entry in the log directory.

func ParseIndexName

func ParseIndexName(name string) (*FileMeta, error)

ParseIndexName pulls the index from a file name. Should not have path components.

func (*FileMeta) String

func (m *FileMeta) String() string

String outputs friendly info for this file meta.

type Loader

type Loader interface {
	Load(context.Context, []byte) error
}

Loader is an interface whose Load method is called during snapshot item loading.

type LoaderFunc

type LoaderFunc func(context.Context, []byte) error

func (LoaderFunc) Load

func (f LoaderFunc) Load(ctx context.Context, b []byte) error

type Option

type Option func(*WAL)

Option describes a WAL creation option.

func WithAllowWrite

func WithAllowWrite(a bool) Option

WithAllowWrite lets the WAL go into "write" mode after it has been parsed. The default is to be read-only, and to create errors when attempting to do write operations on the file system. This makes it easy to not make mistakes, for example, when trying to collect journals to make a new snapshot. The WAL can be opened in read-only mode, a snapshot can be created, then it can be reopened in write mode and it will know to load that snapshot instead of replaying the entire set of journals. An empty snapshot loader can be given in that case to speed the loading process.

func WithEmptyJournalPlayer

func WithEmptyJournalPlayer(a bool) Option

WithEmptyJournalPlayer indicates that journals are to be scanned, not processed. This is a safety measure to avoid default behavior being unwanted: usually you want to process journal entries, but sometimes there is good reason to simply scan for the proper index and start appending.

func WithEmptySnapshotLoader

func WithEmptySnapshotLoader(a bool) Option

WithEmptySnapshotLoader indicates that not loading a snapshot is actually desired. This is to prevent mistakes: usually a snapshot adder is wanted, if there is a snapshot to be added.

func WithExcludeLiveJournal

func WithExcludeLiveJournal(e bool) Option

WithExcludeLiveJournal indicates that only "final" journals should be played. Implies writing disallowed. It is an error to specify this with WithAllowWrite.

func WithJournalBase

func WithJournalBase(p string) Option

WithJournalBase sets the journal base, otherwise uses DefaultJournalBase.

func WithJournalPlayer

func WithJournalPlayer(p Player) Option

WithJournalPlayer sets the journal player for all journal records after the latest snapshot. Clients provide this to allow journals to be replayed.

func WithJournalPlayerFunc

func WithJournalPlayerFunc(f PlayerFunc) Option

WithJournalPlayerFunc sets the journal player using a function.

func WithMaxJournalBytes

func WithMaxJournalBytes(m int64) Option

WithMaxJournalBytes sets the maximum number of bytes before a journal is rotated. Default is DefaultMaxBytes.

func WithMaxJournalIndices

func WithMaxJournalIndices(m int) Option

WithMaxJournalIndices sets the maximum number of indices in a journal file before it must be rotated.

func WithRequireEmpty

func WithRequireEmpty(e bool) Option

WithRequireEmpty can be used to indicate that this is a brand new WAL and it must have zero files in it when it is initially opened. Can be used with WithAllowWrite if desired. Implies that empty loaders and players are allowed.

func WithSnapshotBase

func WithSnapshotBase(p string) Option

WithSnapshotBase sets the snapshot base, otherwise uses DefaultSnapshotBase.

func WithSnapshotLoader

func WithSnapshotLoader(a Loader) Option

WithSnapshotLoader sets the record adder for all snapshot records. Clients provide one of these to allow snapshots to be loaded.

func WithSnapshotLoaderFunc

func WithSnapshotLoaderFunc(f LoaderFunc) Option

WithSnapshotLoaderFunc sets the snapshot loader using a function.

type Player

type Player interface {
	Play(context.Context, []byte) error
}

Player is an interface whose Play method is called during journal item replay.

type PlayerFunc

type PlayerFunc func(context.Context, []byte) error

func (PlayerFunc) Play

func (f PlayerFunc) Play(ctx context.Context, b []byte) error

type Snapshotter

type Snapshotter func(ValueAdder) error

Snapshotter receives a ValueAdder that it can use to provide values. If it returns a non-nil error, the snapshot will not be finalized.

type ValueAdder

type ValueAdder interface {
	AddValue([]byte) error
}

ValueAdder describes the interface passed to snapshot creation functions, allowing them to add entries to a snapshot while important scaffolding is handled behind the scenes.

type WAL

type WAL struct {
	sync.Mutex
	// contains filtered or unexported fields
}

WAL implements a write-ahead logger capable of replaying snapshots and journals, setting up a writer for appending to journals and rotating them when full, etc.

func Open

func Open(ctx context.Context, dir string, opts ...Option) (*WAL, error)

Open opens a directory and loads the WAL found in it, then provides a WAL that can be appended to over time.

func (*WAL) Append

func (w *WAL) Append(b []byte) (err error)

Append sends another record to the journal, and can trigger rotation of underlying files.

func (*WAL) CanSnapshot

func (w *WAL) CanSnapshot() bool

CanSnapshot indicates whether a snapshot can be taken, ignoring the actual reason. Calls CheckCanSnapshot underneath. If a reason error is desired, use CheckCanSnapshot.

func (*WAL) CheckCanSnapshot

func (w *WAL) CheckCanSnapshot() error

func (*WAL) Close

func (w *WAL) Close() error

Close cleans up any open resources. Live journals are left live, however, for next time. To ensure that live journals get finalized, call Finalize.

func (*WAL) CreateSnapshot

func (w *WAL) CreateSnapshot(s Snapshotter) (string, error)

CreateSnapshot creates a snapshot encoder for writing. Checks whether it can proceed first, returning an error if it is not sensible to create a snapshot. Call CanSnapshot first to look before you leap.

func (*WAL) CurrIndex

func (w *WAL) CurrIndex() uint64

CurrIndex returns the index number of the most recently read (or written) journal entry.

func (*WAL) Finalize

func (w *WAL) Finalize() error

Finalize causes an open live journal to become final, so it will not be appended to again. After completion, writes are disabled, but snapshots can be created.

func (*WAL) SnapshotUseful

func (w *WAL) SnapshotUseful() bool

SnapshotUseful indicates whether a non-live journal was loaded, e.g., after a snapshot file, or before a snapshot has been taken. Check this to determine whether taking a snapshot makes any sense from a "did we read anything new" perspective.

This only makes sense on load - it wouldn't be used on a live system that has live information in its memory, only on a system that excludes live journals. Returns false in any case where it makes no sense to snapshot.

Source Files