wal

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EnsureDir

func EnsureDir(path string) error

EnsureDir creates a directory if it doesn't exist.

func FileExists

func FileExists(path string) bool

FileExists checks if a file exists.

func FileSize

func FileSize(path string) (int64, error)

FileSize returns the size of a file in bytes.

Types

type BatchEntry

type BatchEntry struct {
	OpType OpType
	Data   []byte
}

BatchEntry is one (opType, data) pair for an atomic batch write. It is the exported input to AppendBatchAtomic — the storage layer's transaction-commit path builds these directly (the internal pendingEntry carries a done-channel the synchronous atomic path does not need).

type BatchedWAL

type BatchedWAL struct {
	// contains filtered or unexported fields
}

BatchedWAL wraps WAL with batching capabilities for better write performance

func NewBatchedWAL

func NewBatchedWAL(dataDir string, batchSize int, flushInterval time.Duration) (*BatchedWAL, error)

NewBatchedWAL creates a new batched WAL

func (*BatchedWAL) Append

func (bw *BatchedWAL) Append(opType OpType, data []byte) (uint64, error)

Append enqueues an entry and blocks until it is durable. Equivalent to Enqueue followed by Wait; retained for callers that don't need to release a lock between the two.

func (*BatchedWAL) AppendBatchAtomic

func (bw *BatchedWAL) AppendBatchAtomic(entries []BatchEntry) error

AppendBatchAtomic writes a batch of entries to the underlying WAL with a single fsync (all-or-none at the fsync boundary), bypassing the background- flush queue. The underlying WAL serializes this against the background flusher via its own mutex, so it is safe to call concurrently with ticker flushes. Used by Transaction.Commit for atomic commit durability.

func (*BatchedWAL) CheckpointLSN added in v0.5.0

func (bw *BatchedWAL) CheckpointLSN() uint64

CheckpointLSN drains every entry enqueued before the call — including any batch the background flusher has taken ownership of but not yet LSN-assigned — and returns the boundary LSN: all prior enqueues have LSN ≤ the returned value; anything enqueued after gets LSN > it (provided the caller serializes enqueuers against this call, as GraphStorage does via gs.mu). This is the batched-WAL boundary capture for the M-1 snapshot+TruncateUpTo checkpoint; a bare GetCurrentLSN would miss in-flight batches.

func (*BatchedWAL) Close

func (bw *BatchedWAL) Close() error

Close closes the batched WAL

func (*BatchedWAL) Enqueue

func (bw *BatchedWAL) Enqueue(opType OpType, data []byte) *Pending

Enqueue appends an entry to the batch buffer and returns a Pending handle WITHOUT waiting for durability. The caller MUST call Wait() on the returned handle before treating the write as durable. Entries become durable in enqueue order (batches flush FIFO), so a caller that enqueues under a lock preserves WAL order even after releasing that lock before Wait().

func (*BatchedWAL) GetCurrentLSN

func (bw *BatchedWAL) GetCurrentLSN() uint64

GetCurrentLSN returns the current LSN

func (*BatchedWAL) Replay

func (bw *BatchedWAL) Replay(handler func(*Entry) error) error

Replay replays WAL entries

func (*BatchedWAL) Truncate

func (bw *BatchedWAL) Truncate() error

Truncate truncates the WAL

func (*BatchedWAL) TruncateUpTo added in v0.5.0

func (bw *BatchedWAL) TruncateUpTo(lsn uint64) error

TruncateUpTo flushes pending entries, then delegates to the underlying WAL. Interleaving with a concurrent background flush is safe either way: AppendBatch holds the WAL's mutex for the whole batch, so an in-flight batch lands wholly before the rewrite (and is copied, its LSNs being > the checkpoint boundary) or wholly after (appended to the rewritten file).

type CompressedWAL

type CompressedWAL struct {
	// contains filtered or unexported fields
}

CompressedWAL is a Write-Ahead Log with snappy compression

func NewCompressedWAL

func NewCompressedWAL(dataDir string) (*CompressedWAL, error)

NewCompressedWAL creates a new compressed Write-Ahead Log

func (*CompressedWAL) Append

func (w *CompressedWAL) Append(opType OpType, data []byte) (uint64, error)

Append appends a new entry to the compressed WAL

func (*CompressedWAL) Close

func (w *CompressedWAL) Close() error

Close closes the WAL

func (*CompressedWAL) Flush

func (w *CompressedWAL) Flush() error

Flush flushes the WAL to disk

func (*CompressedWAL) GetCurrentLSN

func (w *CompressedWAL) GetCurrentLSN() uint64

GetCurrentLSN returns the current LSN

func (*CompressedWAL) GetStatistics

func (w *CompressedWAL) GetStatistics() CompressedWALStats

GetStatistics returns compression statistics

func (*CompressedWAL) ReadAll

func (w *CompressedWAL) ReadAll() ([]*Entry, error)

ReadAll reads all entries from the WAL (decompressing data)

func (*CompressedWAL) Replay

func (w *CompressedWAL) Replay(handler func(*Entry) error) error

Replay iterates through all WAL entries and calls the handler for each. This is used for recovery after restart.

func (*CompressedWAL) Truncate

func (w *CompressedWAL) Truncate() error

Truncate truncates the WAL (used after successful snapshot)

func (*CompressedWAL) TruncateUpTo added in v0.5.0

func (w *CompressedWAL) TruncateUpTo(lsn uint64) error

TruncateUpTo is the compressed-backend rewrite. Entries are re-encoded (snappy is deterministic, checksums recomputed over the compressed bytes) because readAllLocked returns decompressed payloads.

type CompressedWALStats

type CompressedWALStats struct {
	TotalWrites       uint64
	BytesUncompressed uint64
	BytesCompressed   uint64
	CompressionRatio  float64 // e.g., 0.75 = 75% compression
	SpaceSavings      float64 // MB saved
}

CompressedWALStats holds compression statistics

type Entry

type Entry struct {
	LSN       uint64 // Log Sequence Number
	OpType    OpType
	Data      []byte
	Checksum  uint32
	Timestamp int64
}

Entry represents a single WAL entry

type FileRotator

type FileRotator struct {
	// contains filtered or unexported fields
}

FileRotator handles atomic file rotation for WAL files. It ensures safe file replacement with recovery on failure.

func NewFileRotator

func NewFileRotator(path string, bufferSize int) *FileRotator

NewFileRotator creates a new file rotator for the given path. bufferSize controls the bufio.Writer buffer size (0 = default).

func (*FileRotator) Close

func (fr *FileRotator) Close() error

Close flushes, syncs, and closes the file.

func (*FileRotator) File

func (fr *FileRotator) File() *os.File

File returns the underlying file handle.

func (*FileRotator) Flush

func (fr *FileRotator) Flush() error

Flush flushes the buffered writer.

func (*FileRotator) Open

func (fr *FileRotator) Open() error

Open opens or creates the file for appending.

func (*FileRotator) Rotate

func (fr *FileRotator) Rotate() error

Rotate atomically replaces the current file with a new empty file. This is used after a snapshot to start a fresh WAL. On success, the rotator points to the new file. On failure, the rotator attempts to recover to the original file.

func (*FileRotator) Sync

func (fr *FileRotator) Sync() error

Sync flushes the buffer and syncs the file to disk.

func (*FileRotator) Writer

func (fr *FileRotator) Writer() *bufio.Writer

Writer returns the buffered writer.

type OpType

type OpType uint8

OpType represents the type of operation in the WAL

const (
	OpCreateNode OpType = iota
	OpUpdateNode
	OpDeleteNode
	OpCreateEdge
	OpUpdateEdge
	OpDeleteEdge
	OpCreatePropertyIndex
	OpDropPropertyIndex
	// Appended after the original set so existing WAL files (which encode
	// OpType as a single byte) keep replaying their stored ops correctly —
	// never renumber the values above.
	OpCreateVectorIndex
	OpDropVectorIndex
)

type Pending

type Pending struct {
	// contains filtered or unexported fields
}

Pending represents an entry that has been enqueued into the batch buffer but is not yet durable. Wait blocks until the entry's batch has been flushed and fsynced.

Splitting enqueue from wait lets a caller release a higher-level lock (e.g. storage's gs.mu) AFTER the entry is enqueued but BEFORE blocking on the fsync. Concurrent writers can then enqueue into the same batch — the group-commit path. Holding the higher-level lock across the wait (as a plain Append does) makes the batch unable to fill beyond one entry, defeating batching entirely. See Track P item (1).

func (*Pending) Wait

func (p *Pending) Wait() error

Wait blocks until the enqueued entry's batch has been flushed and fsynced, returning the flush error (if any).

type SafeWriter

type SafeWriter struct {
	// contains filtered or unexported fields
}

SafeWrite writes data with automatic flush on error.

func NewSafeWriter

func NewSafeWriter(w *bufio.Writer) *SafeWriter

NewSafeWriter creates a new safe writer.

func (*SafeWriter) Flush

func (sw *SafeWriter) Flush() error

Flush flushes the underlying writer.

func (*SafeWriter) Write

func (sw *SafeWriter) Write(p []byte) (int, error)

Write writes data to the buffer.

func (*SafeWriter) WriteAndFlush

func (sw *SafeWriter) WriteAndFlush(p []byte) error

WriteAndFlush writes data and immediately flushes.

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL is a Write-Ahead Log for durability

func NewWAL

func NewWAL(dataDir string) (*WAL, error)

NewWAL creates a new Write-Ahead Log

func (*WAL) Append

func (w *WAL) Append(opType OpType, data []byte) (uint64, error)

Append appends a new entry to the WAL

func (*WAL) AppendBatch

func (w *WAL) AppendBatch(entries []*pendingEntry) error

AppendBatch writes multiple buffered entries with a single fsync. Used by BatchedWAL.flush; delegates to AppendBatchAtomic so the write/flush/fsync logic lives in one place.

func (*WAL) AppendBatchAtomic

func (w *WAL) AppendBatchAtomic(entries []BatchEntry) error

AppendBatchAtomic writes all entries then a SINGLE flush + fsync, so the whole batch is durable all-or-none at the fsync boundary — the primitive a transaction commit needs for atomic durability. Synchronous: returns once durable (no done-channel). On a mid-batch write error it rolls back the LSN counter and returns the error.

func (*WAL) Close

func (w *WAL) Close() error

Close closes the WAL

func (*WAL) GetCurrentLSN

func (w *WAL) GetCurrentLSN() uint64

GetCurrentLSN returns the current LSN

func (*WAL) ReadAll

func (w *WAL) ReadAll() ([]*Entry, error)

ReadAll reads all entries from the WAL Returns all valid entries read before any corruption is detected. Corruption is logged but does not return an error to allow partial recovery.

func (*WAL) Replay

func (w *WAL) Replay(handler func(*Entry) error) error

Replay replays WAL entries to reconstruct state

func (*WAL) Truncate

func (w *WAL) Truncate() error

Truncate truncates the WAL (used after snapshot)

func (*WAL) TruncateUpTo added in v0.5.0

func (w *WAL) TruncateUpTo(lsn uint64) error

TruncateUpTo rewrites the WAL keeping only entries with LSN > lsn — the checkpoint primitive behind GraphStorage.CompactWAL (M-1, WAL remanence: purge a deleted tenant's records mid-flight without losing concurrent writers' entries). The caller is responsible for having captured a snapshot whose state covers every entry with LSN ≤ lsn.

currentLSN is intentionally NOT reset (unlike Truncate): kept entries retain their LSNs and new appends must continue past them to stay monotonic within the file.

Crash safety reuses Truncate's pattern: the rewrite lands in wal.log.new, fsynced, then atomically renamed over wal.log. A crash before the rename leaves the original file intact (the purge simply didn't happen yet); a stale .new from such a crash is overwritten by the next rewrite.

type WALAppender

type WALAppender interface {
	// Append appends a new entry to the WAL.
	// Returns the LSN (Log Sequence Number) assigned to the entry.
	Append(opType OpType, data []byte) (uint64, error)
}

WALAppender is the interface for appending entries to a WAL. This interface can be used by packages that need to write to WAL without depending on the concrete implementation.

type WALManager

type WALManager interface {
	// Truncate removes all entries from the WAL.
	// Typically called after a successful snapshot.
	Truncate() error

	// Close flushes any buffered data and closes the WAL.
	Close() error

	// GetCurrentLSN returns the current Log Sequence Number.
	GetCurrentLSN() uint64
}

WALManager is the interface for WAL lifecycle management.

type WALReader

type WALReader interface {
	// Replay iterates through all WAL entries and calls the handler for each.
	// Used for recovery after restart.
	Replay(handler func(*Entry) error) error
}

WALReader is the interface for reading entries from a WAL.

type WriteAheadLog

type WriteAheadLog interface {
	WALAppender
	WALReader
	WALManager
}

WriteAheadLog is the complete interface for a Write-Ahead Log implementation. All WAL implementations (WAL, BatchedWAL, CompressedWAL) implement this interface.

Directories

Path Synopsis
Package apply provides the structured-write-operation primitive and the fail-closed tenant gate that audits its application against storage.
Package apply provides the structured-write-operation primitive and the fail-closed tenant gate that audits its application against storage.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL