Documentation ¶
Index ¶
- func EnsureDir(path string) error
- func FileExists(path string) bool
- func FileSize(path string) (int64, error)
- type BatchEntry
- type BatchedWAL
- func (bw *BatchedWAL) Append(opType OpType, data []byte) (uint64, error)
- func (bw *BatchedWAL) AppendBatchAtomic(entries []BatchEntry) error
- func (bw *BatchedWAL) CheckpointLSN() uint64
- func (bw *BatchedWAL) Close() error
- func (bw *BatchedWAL) Enqueue(opType OpType, data []byte) *Pending
- func (bw *BatchedWAL) GetCurrentLSN() uint64
- func (bw *BatchedWAL) Replay(handler func(*Entry) error) error
- func (bw *BatchedWAL) Truncate() error
- func (bw *BatchedWAL) TruncateUpTo(lsn uint64) error
- type CompressedWAL
- func (w *CompressedWAL) Append(opType OpType, data []byte) (uint64, error)
- func (w *CompressedWAL) Close() error
- func (w *CompressedWAL) Flush() error
- func (w *CompressedWAL) GetCurrentLSN() uint64
- func (w *CompressedWAL) GetStatistics() CompressedWALStats
- func (w *CompressedWAL) ReadAll() ([]*Entry, error)
- func (w *CompressedWAL) Replay(handler func(*Entry) error) error
- func (w *CompressedWAL) Truncate() error
- func (w *CompressedWAL) TruncateUpTo(lsn uint64) error
- type CompressedWALStats
- type Entry
- type FileRotator
- type OpType
- type Pending
- type SafeWriter
- type WAL
- func (w *WAL) Append(opType OpType, data []byte) (uint64, error)
- func (w *WAL) AppendBatch(entries []*pendingEntry) error
- func (w *WAL) AppendBatchAtomic(entries []BatchEntry) error
- func (w *WAL) Close() error
- func (w *WAL) GetCurrentLSN() uint64
- func (w *WAL) ReadAll() ([]*Entry, error)
- func (w *WAL) Replay(handler func(*Entry) error) error
- func (w *WAL) Truncate() error
- func (w *WAL) TruncateUpTo(lsn uint64) error
- type WALAppender
- type WALManager
- type WALReader
- type WriteAheadLog
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BatchEntry ¶
BatchEntry is one (opType, data) pair for an atomic batch write. It is the exported input to AppendBatchAtomic — the storage layer's transaction-commit path builds these directly (the internal pendingEntry carries a done-channel the synchronous atomic path does not need).
type BatchedWAL ¶
type BatchedWAL struct {
	// contains filtered or unexported fields
}
BatchedWAL wraps WAL with batching capabilities for better write performance
func NewBatchedWAL ¶
NewBatchedWAL creates a new batched WAL
func (*BatchedWAL) Append ¶
func (bw *BatchedWAL) Append(opType OpType, data []byte) (uint64, error)
Append enqueues an entry and blocks until it is durable. Equivalent to Enqueue followed by Wait; retained for callers that don't need to release a lock between the two.
func (*BatchedWAL) AppendBatchAtomic ¶
func (bw *BatchedWAL) AppendBatchAtomic(entries []BatchEntry) error
AppendBatchAtomic writes a batch of entries to the underlying WAL with a single fsync (all-or-none at the fsync boundary), bypassing the background-flush queue. The underlying WAL serializes this against the background flusher via its own mutex, so it is safe to call concurrently with ticker flushes. Used by Transaction.Commit for atomic commit durability.
func (*BatchedWAL) CheckpointLSN ¶ added in v0.5.0
func (bw *BatchedWAL) CheckpointLSN() uint64
CheckpointLSN drains every entry enqueued before the call — including any batch the background flusher has taken ownership of but not yet LSN-assigned — and returns the boundary LSN: all prior enqueues have LSN ≤ the returned value; anything enqueued after gets LSN > it (provided the caller serializes enqueuers against this call, as GraphStorage does via gs.mu). This is the batched-WAL boundary capture for the M-1 snapshot+TruncateUpTo checkpoint; a bare GetCurrentLSN would miss in-flight batches.
func (*BatchedWAL) Enqueue ¶
func (bw *BatchedWAL) Enqueue(opType OpType, data []byte) *Pending
Enqueue appends an entry to the batch buffer and returns a Pending handle WITHOUT waiting for durability. The caller MUST call Wait() on the returned handle before treating the write as durable. Entries become durable in enqueue order (batches flush FIFO), so a caller that enqueues under a lock preserves WAL order even after releasing that lock before Wait().
func (*BatchedWAL) GetCurrentLSN ¶
func (bw *BatchedWAL) GetCurrentLSN() uint64
GetCurrentLSN returns the current LSN
func (*BatchedWAL) Replay ¶
func (bw *BatchedWAL) Replay(handler func(*Entry) error) error
Replay replays WAL entries
func (*BatchedWAL) TruncateUpTo ¶ added in v0.5.0
func (bw *BatchedWAL) TruncateUpTo(lsn uint64) error
TruncateUpTo flushes pending entries, then delegates to the underlying WAL. Interleaving with a concurrent background flush is safe either way: AppendBatch holds the WAL's mutex for the whole batch, so an in-flight batch lands wholly before the rewrite (and is copied, its LSNs being > the checkpoint boundary) or wholly after (appended to the rewritten file).
type CompressedWAL ¶
type CompressedWAL struct {
	// contains filtered or unexported fields
}
CompressedWAL is a Write-Ahead Log with snappy compression
func NewCompressedWAL ¶
func NewCompressedWAL(dataDir string) (*CompressedWAL, error)
NewCompressedWAL creates a new compressed Write-Ahead Log
func (*CompressedWAL) Append ¶
func (w *CompressedWAL) Append(opType OpType, data []byte) (uint64, error)
Append appends a new entry to the compressed WAL
func (*CompressedWAL) GetCurrentLSN ¶
func (w *CompressedWAL) GetCurrentLSN() uint64
GetCurrentLSN returns the current LSN
func (*CompressedWAL) GetStatistics ¶
func (w *CompressedWAL) GetStatistics() CompressedWALStats
GetStatistics returns compression statistics
func (*CompressedWAL) ReadAll ¶
func (w *CompressedWAL) ReadAll() ([]*Entry, error)
ReadAll reads all entries from the WAL (decompressing data)
func (*CompressedWAL) Replay ¶
func (w *CompressedWAL) Replay(handler func(*Entry) error) error
Replay iterates through all WAL entries and calls the handler for each. This is used for recovery after restart.
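A recovery handler built on Replay might look like the sketch below. It is self-contained, so the types are local copies with the documented shapes; `sliceLog` and `recoverNodes` are hypothetical, the payload is treated as a bare node ID purely for illustration, and stopping at the first handler error is an assumption about Replay's behavior rather than something the documentation states.

```go
package main

import "fmt"

type OpType uint8

const (
	OpCreateNode OpType = iota
	OpUpdateNode
	OpDeleteNode
)

// Entry mirrors the documented fields.
type Entry struct {
	LSN       uint64
	OpType    OpType
	Data      []byte
	Checksum  uint32
	Timestamp int64
}

// replayer has the one-method shape documented for WALReader.
type replayer interface {
	Replay(handler func(*Entry) error) error
}

// sliceLog is a hypothetical in-memory log used only for this sketch.
type sliceLog []*Entry

// Replay calls the handler in order and stops at the first error (assumption).
func (l sliceLog) Replay(handler func(*Entry) error) error {
	for _, e := range l {
		if err := handler(e); err != nil {
			return err
		}
	}
	return nil
}

// recoverNodes rebuilds a toy node set and reports the last LSN applied.
func recoverNodes(r replayer) (map[string]bool, uint64, error) {
	nodes := map[string]bool{}
	var last uint64
	err := r.Replay(func(e *Entry) error {
		switch e.OpType {
		case OpCreateNode:
			nodes[string(e.Data)] = true
		case OpDeleteNode:
			delete(nodes, string(e.Data))
		default:
			return fmt.Errorf("unhandled op %d at LSN %d", e.OpType, e.LSN)
		}
		last = e.LSN
		return nil
	})
	return nodes, last, err
}

func main() {
	log := sliceLog{
		{LSN: 1, OpType: OpCreateNode, Data: []byte("a")},
		{LSN: 2, OpType: OpCreateNode, Data: []byte("b")},
		{LSN: 3, OpType: OpDeleteNode, Data: []byte("a")},
	}
	nodes, last, err := recoverNodes(log)
	fmt.Println(len(nodes), nodes["b"], last, err)
}
```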
func (*CompressedWAL) Truncate ¶
func (w *CompressedWAL) Truncate() error
Truncate truncates the WAL (used after successful snapshot)
func (*CompressedWAL) TruncateUpTo ¶ added in v0.5.0
func (w *CompressedWAL) TruncateUpTo(lsn uint64) error
TruncateUpTo is the compressed-backend rewrite. Entries are re-encoded (snappy is deterministic, checksums recomputed over the compressed bytes) because readAllLocked returns decompressed payloads.
type CompressedWALStats ¶
type CompressedWALStats struct {
	TotalWrites       uint64
	BytesUncompressed uint64
	BytesCompressed   uint64
	CompressionRatio  float64 // e.g., 0.75 = 75% compression
	SpaceSavings      float64 // MB saved
}
CompressedWALStats holds compression statistics
type Entry ¶
type Entry struct {
	LSN       uint64 // Log Sequence Number
	OpType    OpType
	Data      []byte
	Checksum  uint32
	Timestamp int64
}
Entry represents a single WAL entry
type FileRotator ¶
type FileRotator struct {
	// contains filtered or unexported fields
}
FileRotator handles atomic file rotation for WAL files. It ensures safe file replacement with recovery on failure.
func NewFileRotator ¶
func NewFileRotator(path string, bufferSize int) *FileRotator
NewFileRotator creates a new file rotator for the given path. bufferSize controls the bufio.Writer buffer size (0 = default).
func (*FileRotator) Close ¶
func (fr *FileRotator) Close() error
Close flushes, syncs, and closes the file.
func (*FileRotator) File ¶
func (fr *FileRotator) File() *os.File
File returns the underlying file handle.
func (*FileRotator) Open ¶
func (fr *FileRotator) Open() error
Open opens or creates the file for appending.
func (*FileRotator) Rotate ¶
func (fr *FileRotator) Rotate() error
Rotate atomically replaces the current file with a new empty file. This is used after a snapshot to start a fresh WAL. On success, the rotator points to the new file. On failure, the rotator attempts to recover to the original file.
func (*FileRotator) Sync ¶
func (fr *FileRotator) Sync() error
Sync flushes the buffer and syncs the file to disk.
func (*FileRotator) Writer ¶
func (fr *FileRotator) Writer() *bufio.Writer
Writer returns the buffered writer.
type OpType ¶
type OpType uint8
OpType represents the type of operation in the WAL
const (
	OpCreateNode OpType = iota
	OpUpdateNode
	OpDeleteNode
	OpCreateEdge
	OpUpdateEdge
	OpDeleteEdge
	OpCreatePropertyIndex
	OpDropPropertyIndex

	// Appended after the original set so existing WAL files (which encode
	// OpType as a single byte) keep replaying their stored ops correctly.
	// Never renumber the values above.
	OpCreateVectorIndex
	OpDropVectorIndex
)
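Because the values come from iota, each constant's on-disk byte is fixed by its position in the block. The sketch below restates the constants to show the resulting numbers and adds `decodeOp`, a hypothetical helper (not part of the package) that rejects bytes beyond the last known op.

```go
package main

import "fmt"

type OpType uint8

const (
	OpCreateNode OpType = iota
	OpUpdateNode
	OpDeleteNode
	OpCreateEdge
	OpUpdateEdge
	OpDeleteEdge
	OpCreatePropertyIndex
	OpDropPropertyIndex
	// Later additions go at the end so earlier values never shift.
	OpCreateVectorIndex
	OpDropVectorIndex
)

// decodeOp is a hypothetical helper: it maps a stored byte back to an
// OpType and reports bytes that no known constant uses.
func decodeOp(b byte) (OpType, error) {
	if op := OpType(b); op <= OpDropVectorIndex {
		return op, nil
	}
	return 0, fmt.Errorf("unknown op byte %d", b)
}

func main() {
	// OpType has no String method here, so the numeric values print.
	fmt.Println(OpDropPropertyIndex, OpCreateVectorIndex, OpDropVectorIndex) // prints 7 8 9
}
```

Inserting a new constant in the middle would shift every later value and make old files replay as the wrong operations, which is why new ops are appended.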
type Pending ¶
type Pending struct {
	// contains filtered or unexported fields
}
Pending represents an entry that has been enqueued into the batch buffer but is not yet durable. Wait blocks until the entry's batch has been flushed and fsynced.
Splitting enqueue from wait lets a caller release a higher-level lock (e.g. storage's gs.mu) AFTER the entry is enqueued but BEFORE blocking on the fsync. Concurrent writers can then enqueue into the same batch — the group-commit path. Holding the higher-level lock across the wait (as a plain Append does) makes the batch unable to fill beyond one entry, defeating batching entirely. See Track P item (1).
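The enqueue-then-wait split can be illustrated with a toy model. Everything here is hypothetical (`toyBatcher`, `pending`, `store`); the flush is triggered by hand instead of by a background ticker, and no real I/O happens. The point is only that releasing the higher-level lock before Wait lets several entries share one flush.

```go
package main

import (
	"fmt"
	"sync"
)

// pending is a hypothetical stand-in for the Pending handle.
type pending struct{ done chan error }

// Wait blocks until the entry's batch has been flushed.
func (p *pending) Wait() error { return <-p.done }

// toyBatcher is a hypothetical in-memory stand-in for a batched WAL.
type toyBatcher struct {
	mu    sync.Mutex
	queue []*pending
}

// Enqueue adds an entry to the batch buffer without waiting.
func (b *toyBatcher) Enqueue() *pending {
	p := &pending{done: make(chan error, 1)}
	b.mu.Lock()
	b.queue = append(b.queue, p)
	b.mu.Unlock()
	return p
}

// flush simulates one write+fsync covering everything queued so far
// and returns how many entries shared it.
func (b *toyBatcher) flush() int {
	b.mu.Lock()
	batch := b.queue
	b.queue = nil
	b.mu.Unlock()
	for _, p := range batch {
		p.done <- nil
	}
	return len(batch)
}

// store plays the role of the storage layer; mu stands in for gs.mu.
type store struct {
	mu  sync.Mutex
	wal *toyBatcher
}

// write fixes the entry's order under the lock, then releases the lock
// BEFORE the caller blocks on durability.
func (s *store) write() *pending {
	s.mu.Lock()
	p := s.wal.Enqueue()
	s.mu.Unlock()
	return p
}

// groupCommitDemo enqueues n writes, flushes once, and waits on all of them.
func groupCommitDemo(n int) int {
	s := &store{wal: &toyBatcher{}}
	handles := make([]*pending, n)
	for i := range handles {
		handles[i] = s.write()
	}
	size := s.wal.flush()
	for _, p := range handles {
		if err := p.Wait(); err != nil {
			panic(err)
		}
	}
	return size
}

func main() {
	fmt.Println(groupCommitDemo(4)) // prints 4: one flush covered all four entries
}
```

Had `write` called Wait while still holding `s.mu`, the next writer could not enqueue until the flush completed, so every batch would hold exactly one entry.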
type SafeWriter ¶
type SafeWriter struct {
	// contains filtered or unexported fields
}
SafeWriter writes data with automatic flush on error.
func NewSafeWriter ¶
func NewSafeWriter(w *bufio.Writer) *SafeWriter
NewSafeWriter creates a new safe writer.
func (*SafeWriter) Write ¶
func (sw *SafeWriter) Write(p []byte) (int, error)
Write writes data to the buffer.
func (*SafeWriter) WriteAndFlush ¶
func (sw *SafeWriter) WriteAndFlush(p []byte) error
WriteAndFlush writes data and immediately flushes.
type WAL ¶
type WAL struct {
	// contains filtered or unexported fields
}
WAL is a Write-Ahead Log for durability
func (*WAL) AppendBatch ¶
AppendBatch writes multiple buffered entries with a single fsync. Used by BatchedWAL.flush; delegates to AppendBatchAtomic so the write/flush/fsync logic lives in one place.
func (*WAL) AppendBatchAtomic ¶
func (w *WAL) AppendBatchAtomic(entries []BatchEntry) error
AppendBatchAtomic writes all entries then a SINGLE flush + fsync, so the whole batch is durable all-or-none at the fsync boundary — the primitive a transaction commit needs for atomic durability. Synchronous: returns once durable (no done-channel). On a mid-batch write error it rolls back the LSN counter and returns the error.
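The LSN rollback on a mid-batch write error can be sketched with a toy in-memory log. All names below (`toyWAL`, `batchEntry`, `failAt`) are hypothetical, and the single flush + fsync is represented by a counter. Discarding the partially written entries is an assumption made so the toy stays consistent; the documentation only states that the LSN counter is rolled back.

```go
package main

import (
	"errors"
	"fmt"
)

// batchEntry is a local stand-in for one (opType, data) pair.
type batchEntry struct {
	op   uint8
	data []byte
}

var errWrite = errors.New("simulated write failure")

// toyWAL is a hypothetical in-memory log.
type toyWAL struct {
	lsn     uint64   // last assigned LSN
	written [][]byte // payloads accepted so far
	syncs   int      // number of simulated flush+fsync calls
	failAt  int      // index within a batch whose write fails; -1 = never
}

func (w *toyWAL) writeEntry(i int, e batchEntry) error {
	if i == w.failAt {
		return errWrite
	}
	w.written = append(w.written, e.data)
	return nil
}

// appendBatchAtomic writes every entry, then performs ONE sync.
// On a write error it restores the LSN counter and returns the error.
func (w *toyWAL) appendBatchAtomic(entries []batchEntry) error {
	startLSN, startLen := w.lsn, len(w.written)
	for i, e := range entries {
		w.lsn++
		if err := w.writeEntry(i, e); err != nil {
			w.lsn = startLSN
			w.written = w.written[:startLen] // toy-only assumption
			return err
		}
	}
	w.syncs++ // stands in for the single flush + fsync
	return nil
}

func main() {
	w := &toyWAL{failAt: -1}
	batch := []batchEntry{{0, []byte("a")}, {0, []byte("b")}, {0, []byte("c")}}
	fmt.Println(w.appendBatchAtomic(batch), w.lsn, w.syncs)

	w.failAt = 1 // second entry of the next batch fails
	fmt.Println(w.appendBatchAtomic(batch[:2]), w.lsn, w.syncs)
}
```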
func (*WAL) GetCurrentLSN ¶
GetCurrentLSN returns the current LSN
func (*WAL) ReadAll ¶
ReadAll reads all entries from the WAL. It returns all valid entries read before any corruption is detected. Corruption is logged but does not return an error, to allow partial recovery.
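The "valid prefix" behavior can be sketched as below. This is only an illustration: the package's on-disk format and checksum algorithm are not documented here, so CRC-32 (IEEE) over the payload is an assumption, and `validPrefix` and `newEntry` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"log"
)

// Entry is a trimmed local copy of the documented struct.
type Entry struct {
	LSN      uint64
	Data     []byte
	Checksum uint32
}

// newEntry builds an entry whose checksum matches its payload.
func newEntry(lsn uint64, data string) *Entry {
	return &Entry{LSN: lsn, Data: []byte(data), Checksum: crc32.ChecksumIEEE([]byte(data))}
}

// validPrefix returns the entries that precede the first checksum
// mismatch. The mismatch is logged, not returned as an error.
func validPrefix(entries []*Entry) []*Entry {
	for i, e := range entries {
		if crc32.ChecksumIEEE(e.Data) != e.Checksum {
			log.Printf("corruption at LSN %d; keeping %d entries", e.LSN, i)
			return entries[:i]
		}
	}
	return entries
}

func main() {
	entries := []*Entry{newEntry(1, "a"), newEntry(2, "b"), newEntry(3, "c")}
	entries[1].Data = []byte("tampered") // checksum no longer matches
	fmt.Println(len(validPrefix(entries)))
}
```

Note that entry 3 is dropped even though it is intact: once corruption is seen, nothing after it is trusted.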
func (*WAL) TruncateUpTo ¶ added in v0.5.0
TruncateUpTo rewrites the WAL keeping only entries with LSN > lsn — the checkpoint primitive behind GraphStorage.CompactWAL (M-1, WAL remanence: purge a deleted tenant's records mid-flight without losing concurrent writers' entries). The caller is responsible for having captured a snapshot whose state covers every entry with LSN ≤ lsn.
currentLSN is intentionally NOT reset (unlike Truncate): kept entries retain their LSNs and new appends must continue past them to stay monotonic within the file.
Crash safety reuses Truncate's pattern: the rewrite lands in wal.log.new, fsynced, then atomically renamed over wal.log. A crash before the rename leaves the original file intact (the purge simply didn't happen yet); a stale .new from such a crash is overwritten by the next rewrite.
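The write-to-`.new`, fsync, rename pattern can be sketched with the standard library. The line-oriented format (`<lsn> <payload>`) and the names `rewriteKeeping`, `copyNewer`, and `demoRewrite` are hypothetical; the real WAL uses its own binary encoding. The sketch also omits syncing the parent directory after the rename.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// copyNewer copies lines whose leading LSN is > lsn from srcPath into
// dstPath, then flushes and fsyncs dstPath.
func copyNewer(srcPath, dstPath string, lsn uint64) error {
	src, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	defer src.Close()

	dst, err := os.Create(dstPath) // truncates any stale .new file
	if err != nil {
		return err
	}
	defer dst.Close() // harmless if already closed below

	w := bufio.NewWriter(dst)
	sc := bufio.NewScanner(src)
	for sc.Scan() {
		line := sc.Text()
		field, _, _ := strings.Cut(line, " ")
		n, err := strconv.ParseUint(field, 10, 64)
		if err != nil {
			return err
		}
		if n > lsn {
			if _, err := w.WriteString(line + "\n"); err != nil {
				return err
			}
		}
	}
	if err := sc.Err(); err != nil {
		return err
	}
	if err := w.Flush(); err != nil {
		return err
	}
	if err := dst.Sync(); err != nil {
		return err
	}
	return dst.Close()
}

// rewriteKeeping replaces path with a copy holding only entries > lsn.
// Until the rename succeeds, the original file is untouched.
func rewriteKeeping(path string, lsn uint64) error {
	tmpPath := path + ".new"
	if err := copyNewer(path, tmpPath, lsn); err != nil {
		return err
	}
	return os.Rename(tmpPath, path)
}

// demoRewrite builds a three-entry toy log, truncates up to LSN 2,
// and returns what is left in the file.
func demoRewrite() (string, error) {
	dir, err := os.MkdirTemp("", "walsketch")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "wal.log")
	if err := os.WriteFile(path, []byte("1 a\n2 b\n3 c\n"), 0o644); err != nil {
		return "", err
	}
	if err := rewriteKeeping(path, 2); err != nil {
		return "", err
	}
	out, err := os.ReadFile(path)
	return string(out), err
}

func main() {
	out, err := demoRewrite()
	fmt.Printf("%q %v\n", out, err)
}
```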
type WALAppender ¶
type WALAppender interface {
	// Append appends a new entry to the WAL.
	// Returns the LSN (Log Sequence Number) assigned to the entry.
	Append(opType OpType, data []byte) (uint64, error)
}
WALAppender is the interface for appending entries to a WAL. This interface can be used by packages that need to write to WAL without depending on the concrete implementation.
type WALManager ¶
type WALManager interface {
	// Truncate removes all entries from the WAL.
	// Typically called after a successful snapshot.
	Truncate() error
	// Close flushes any buffered data and closes the WAL.
	Close() error
	// GetCurrentLSN returns the current Log Sequence Number.
	GetCurrentLSN() uint64
}
WALManager is the interface for WAL lifecycle management.
type WALReader ¶
type WALReader interface {
	// Replay iterates through all WAL entries and calls the handler for each.
	// Used for recovery after restart.
	Replay(handler func(*Entry) error) error
}
WALReader is the interface for reading entries from a WAL.
type WriteAheadLog ¶
type WriteAheadLog interface {
	WALAppender
	WALReader
	WALManager
}
WriteAheadLog is the complete interface for a Write-Ahead Log implementation. All WAL implementations (WAL, BatchedWAL, CompressedWAL) implement this interface.
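One way to use the split interfaces is to accept the narrowest one a caller needs and to pin conformance with a compile-time assertion. The sketch below redeclares the documented interfaces locally so it is self-contained; `memWAL` and `recordOp` are hypothetical, and starting LSNs at 1 is an assumption of the toy.

```go
package main

import "fmt"

type OpType uint8

type Entry struct {
	LSN    uint64
	OpType OpType
	Data   []byte
}

type WALAppender interface {
	Append(opType OpType, data []byte) (uint64, error)
}

type WALReader interface {
	Replay(handler func(*Entry) error) error
}

type WALManager interface {
	Truncate() error
	Close() error
	GetCurrentLSN() uint64
}

type WriteAheadLog interface {
	WALAppender
	WALReader
	WALManager
}

// memWAL is a hypothetical in-memory implementation for illustration.
type memWAL struct {
	entries []*Entry
	lsn     uint64
}

// Compile-time check: the build fails if memWAL stops satisfying the
// full interface.
var _ WriteAheadLog = (*memWAL)(nil)

func (m *memWAL) Append(opType OpType, data []byte) (uint64, error) {
	m.lsn++
	m.entries = append(m.entries, &Entry{LSN: m.lsn, OpType: opType, Data: data})
	return m.lsn, nil
}

func (m *memWAL) Replay(handler func(*Entry) error) error {
	for _, e := range m.entries {
		if err := handler(e); err != nil {
			return err
		}
	}
	return nil
}

// Truncate drops all entries and resets the LSN counter.
func (m *memWAL) Truncate() error {
	m.entries, m.lsn = nil, 0
	return nil
}

func (m *memWAL) Close() error          { return nil }
func (m *memWAL) GetCurrentLSN() uint64 { return m.lsn }

// recordOp depends only on WALAppender, so any backend can be passed in.
func recordOp(w WALAppender, op OpType, payload string) (uint64, error) {
	return w.Append(op, []byte(payload))
}

func main() {
	w := &memWAL{}
	lsn, err := recordOp(w, 0, "node-1")
	fmt.Println(lsn, w.GetCurrentLSN(), err)
}
```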
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| | Package apply provides the structured-write-operation primitive and the fail-closed tenant gate that audits its application against storage. |