walfs

Version: v0.0.1
Published: Jan 22, 2026 License: MIT Imports: 20 Imported by: 0

README

walfs

This package has been extracted from https://github.com/ankur-anand/unisondb.

A high-performance Write-Ahead Log (WAL) implementation in Go using memory-mapped I/O (mmap), designed for both writing at scale and reading at scale.

Key Features:

  • Memory-mapped I/O for low-overhead random access
  • 8-byte aligned entries to support efficient page caching
  • Built-in corruption detection using CRC32 and trailer markers
  • Designed for fast recovery and streaming-based replication
  • Automatic segment rotation and cleanup policies
  • Reference-counted readers with safe concurrent access

Use Cases

walfs is ideal for systems requiring durable, high-performance sequential data storage:

  • Databases: Write-ahead logs, transaction logs, event sourcing, time-series storage
  • Distributed Systems: Replication, change data capture (CDC), message queues, consensus protocols (Raft/Paxos)
  • Streaming: Real-time data pipelines, log aggregation, commit logs (Kafka-like)
  • Recovery: Point-in-time recovery, disaster recovery, state machine replication

Installation

go get github.com/ankur-anand/walfs

Quick Start

package main

import (
    "fmt"
    "log"

    "github.com/ankur-anand/walfs"
)

func main() {
    // Create a new WAL
    wal, err := walfs.NewWALog("./data", ".wal")
    if err != nil {
        log.Fatal(err)
    }
    defer wal.Close()

    // Write some data. Write also takes a caller-assigned log index;
    // this example uses 1 for the first record.
    data := []byte("Hello, WAL!")
    pos, err := wal.Write(data, 1)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Written at: %s\n", pos)

    // Read the data back
    readData, err := wal.Read(pos)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Read: %s\n", readData)
}

Examples

Batch Writing

The batch API allows you to write multiple records in a single operation.

package main

import (
    "fmt"
    "log"

    "github.com/ankur-anand/walfs"
)

func main() {
    // Create a new WAL
    wal, err := walfs.NewWALog("./data", ".wal")
    if err != nil {
        log.Fatal(err)
    }
    defer wal.Close()

    // Prepare multiple records to write as a batch
    records := [][]byte{
        []byte("Transaction 1: User login"),
        []byte("Transaction 2: Update profile"),
        []byte("Transaction 3: Add to cart"),
        []byte("Transaction 4: Process payment"),
        []byte("Transaction 5: Send confirmation"),
    }

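    // WriteBatch takes one caller-assigned log index per record.
    logIndexes := []uint64{1, 2, 3, 4, 5}

    // Write all records in a single batch operation
    positions, err := wal.WriteBatch(records, logIndexes)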
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Successfully wrote %d records\n", len(positions))
    for i, pos := range positions {
        fmt.Printf("Record %d written at: %s\n", i+1, pos)
    }

    // Read back the written records
    for i, pos := range positions {
        data, err := wal.Read(pos)
        if err != nil {
            log.Printf("Failed to read record %d: %v", i+1, err)
            continue
        }
        fmt.Printf("Read record %d: %s\n", i+1, data)
    }
}

  • Automatic segment rotation: if the batch does not fit in the current segment, WriteBatch rotates automatically and continues writing in the new segment.

Log Tailing (Continuous Reading)

You can continuously tail the WAL, similar to tail -f. It's useful for replication, streaming, or real-time processing scenarios.

package main

import (
    "context"
    "errors"
    "fmt"
    "io"
    "log"
    "time"

    "github.com/ankur-anand/walfs"
)

func main() {
    // Create or open existing WAL
    wal, err := walfs.NewWALog("./data", ".wal")
    if err != nil {
        log.Fatal(err)
    }
    defer wal.Close()

    // Start a goroutine to write data periodically
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        ticker := time.NewTicker(1 * time.Second)
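        defer ticker.Stop()

        // The caller assigns log indexes; this example uses a simple counter.
        var logIndex uint64

        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                logIndex++
                data := []byte(fmt.Sprintf("Log entry at %s", time.Now().Format(time.RFC3339)))
                pos, err := wal.Write(data, logIndex)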
                if err != nil {
                    log.Printf("Write error: %v", err)
                    continue
                }
                fmt.Printf("Written: %s (at %s)\n", data, pos)
            }
        }
    }()

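    // Tail the log continuously. The deferred closure closes whichever
    // reader is current when main returns, so swapped readers are not leaked.
    reader := wal.NewReader()
    defer func() { reader.Close() }()

    fmt.Println("Starting log tail...")
    for {
        data, pos, err := reader.Next()
        if errors.Is(err, io.EOF) {
            // Reached end of sealed segments, wait for new data
            time.Sleep(100 * time.Millisecond)

            // Create a new reader to pick up new segments. Resume after the
            // last record read so earlier records are not replayed.
            var next *walfs.Reader
            if lastPos := reader.LastRecordPosition(); lastPos.IsZero() {
                next = wal.NewReader()
            } else if next, err = wal.NewReaderAfter(lastPos); err != nil {
                log.Printf("Failed to create new reader: %v", err)
                return
            }
            reader.Close()
            reader = next
            continue
        }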
        if errors.Is(err, walfs.ErrNoNewData) {
            // No new data in active segment yet, wait a bit
            time.Sleep(100 * time.Millisecond)
            continue
        }
        if err != nil {
            log.Printf("Read error: %v", err)
            break
        }

        // Process the log entry
        fmt.Printf("Read: %s (from %s)\n", data, pos)
    }
}

Tailing from a Specific Position

You can also tail the WAL starting from a specific position, useful for resuming replication or processing from a known checkpoint.

package main

import (
    "errors"
    "fmt"
    "io"
    "log"
    "time"

    "github.com/ankur-anand/walfs"
)

func main() {
    // Create or open existing WAL
    wal, err := walfs.NewWALog("./data", ".wal")
    if err != nil {
        log.Fatal(err)
    }
    defer wal.Close()

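    // Write some initial data. The caller assigns log indexes, and every
    // write error is checked.
    var logIndex uint64
    write := func(data []byte) walfs.RecordPosition {
        logIndex++
        pos, err := wal.Write(data, logIndex)
        if err != nil {
            log.Fatal(err)
        }
        return pos
    }

    for i := 0; i < 5; i++ {
        write([]byte(fmt.Sprintf("Initial entry %d", i)))
    }

    // Get a checkpoint position (e.g., saved from previous run)
    checkpointPos := write([]byte("Checkpoint entry"))
    fmt.Printf("Checkpoint saved at: %s\n", checkpointPos)

    // Write more data after checkpoint
    for i := 0; i < 5; i++ {
        write([]byte(fmt.Sprintf("Post-checkpoint entry %d", i)))
    }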

    // Resume tailing from the checkpoint position
    // Use NewReaderAfter to start AFTER the checkpoint (skip the checkpoint record itself)
    reader, err := wal.NewReaderAfter(checkpointPos)
    if err != nil {
        log.Fatal(err)
    }
    // Close whichever reader is current when main returns.
    defer func() { reader.Close() }()

    fmt.Printf("Tailing from position: %s\n", checkpointPos)

    // Continuous tail loop
    for {
        data, pos, err := reader.Next()
        if errors.Is(err, io.EOF) {
            // Reached end of sealed segments
            time.Sleep(100 * time.Millisecond)

            // Resume after the last record read (or after the checkpoint if
            // nothing has been read yet) with a fresh reader.
            lastPos := reader.LastRecordPosition()
            if lastPos.IsZero() {
                lastPos = checkpointPos
            }
            next, err := wal.NewReaderAfter(lastPos)
            if err != nil {
                log.Printf("Failed to create new reader: %v", err)
                break
            }
            // Swap only after the new reader exists, so the deferred Close
            // never runs on a nil or already-closed reader.
            reader.Close()
            reader = next
            continue
        }
        if errors.Is(err, walfs.ErrNoNewData) {
            // No new data yet, wait a bit
            time.Sleep(100 * time.Millisecond)
            continue
        }
        if err != nil {
            log.Printf("Read error: %v", err)
            break
        }

        // Process the log entry
        fmt.Printf("Read: %s (from %s)\n", data, pos)

        // Optionally save the position as new checkpoint
        // saveCheckpoint(pos)
    }
}

Benchmarks

Performance characteristics on Apple M2 Pro (tested with Go 1.24.0):

Write Performance (NoSync mode)

| Data Size | Throughput    | Latency     | Allocations |
|-----------|---------------|-------------|-------------|
| 16B       | 252.90 MB/s   | 63.27 ns/op | 0 allocs/op |
| 1KB       | 2,590.33 MB/s | 395.3 ns/op | 0 allocs/op |
| 32KB      | 3,511.94 MB/s | 9.3 µs/op   | 0 allocs/op |
| 64KB      | 3,952.91 MB/s | 16.6 µs/op  | 0 allocs/op |
| 512KB     | 4,270.20 MB/s | 122.8 µs/op | 0 allocs/op |

Read Performance (NoSync mode)

| Data Size | Operation       | Throughput          | Latency     | Allocations |
|-----------|-----------------|---------------------|-------------|-------------|
| 16B       | Direct Read     | 3,047.17 MB/s       | 5.25 ns/op  | 0 allocs/op |
| 1KB       | Direct Read     | 195,042.14 MB/s     | 5.25 ns/op  | 0 allocs/op |
| 1KB       | Sequential Read | 100,798.86 MB/s     | 1.0 µs/op   | 4 allocs/op |
| 512KB     | Direct Read     | 99,683,253.02 MB/s  | 5.26 ns/op  | 0 allocs/op |
| 512KB     | Sequential Read | 29,715,205.69 MB/s  | 547.0 ns/op | 4 allocs/op |

Concurrent Performance (NoSync mode)

| Test                            | Throughput    | Latency     | Allocations |
|---------------------------------|---------------|-------------|-------------|
| Concurrent Write (2 goroutines) | 1,336.37 MB/s | 766.3 ns/op | 0 allocs/op |
| Concurrent R/W (8 goroutines)   | 4,761.33 MB/s | 215.1 ns/op | 8 B/op      |

Sync Impact (SyncAfterWrite mode)

| Data Size | NoSync Latency | SyncAfterWrite Latency | Overhead |
|-----------|----------------|------------------------|----------|
| 1KB       | 337.2 ns/op    | 44.7 µs/op             | ~132x    |
| 32KB      | 8.5 µs/op      | 65.3 µs/op             | ~7.6x    |
| 64KB      | 25.6 µs/op     | 86.4 µs/op             | ~3.4x    |

Notes:

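  • Actual performance will vary based on hardware, OS, and workload patterns
  • Direct Read returns a slice into the memory-mapped file instead of copying the payload, so its latency stays nearly constant across sizes and its MB/s figure does not measure memory bandwidth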

Run benchmarks yourself:

go test -bench=. -benchmem -benchtime=3s

Architecture

WAL Structure

The Write-Ahead Log (WAL) consists of multiple segments. Each segment is an individual file that stores a sequential series of log records. When a segment reaches its maximum size (default 16MB), the WAL automatically rotates to a new segment.

WALog
├── Segment 1 (000000001.wal)
├── Segment 2 (000000002.wal)
├── Segment 3 (000000003.wal) [Active]
└── ...

The WALog manages:

  • Automatic segment creation and rotation
  • Concurrent read access across all segments
  • Segment lifecycle (active → sealed → cleanup)
  • Recovery from crashes by scanning segment files

Segment File Structure

Each segment is divided into two main regions:

  1. Segment Header (64 bytes) - Metadata about the segment
  2. Records - Sequential log entries, each consisting of header + data + trailer
+----------------------+-----------------------------+-------------+
|   Segment Header     |      Record 1               |  Record 2   |
|     (64 bytes)       |  Header + Data + Trailer    |     ...     |
+----------------------+-----------------------------+-------------+

Segment Header (Metadata Section)

The first 64 bytes contain segment metadata:

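| Offset | Size | Field            | Description                                   |
|--------|------|------------------|-----------------------------------------------|
| 0      | 4    | Magic            | Magic number (0x5557414C, "UWAL")             |
| 4      | 4    | Version          | Metadata format version                       |
| 8      | 8    | CreatedAt        | Creation timestamp (nanoseconds)              |
| 16     | 8    | LastModifiedAt   | Last modification timestamp (nanoseconds)     |
| 24     | 8    | WriteOffset      | Offset where the next record will be written  |
| 32     | 8    | EntryCount       | Total number of records written               |
| 40     | 4    | Flags            | Segment state flags (Active, Sealed)          |
| 44     | 8    | FirstLogIndex    | Log index of the first record in the segment  |
| 52     | 4    | Reserved         | Reserved for future use                       |
| 56     | 4    | CRC              | CRC32 checksum of the first 56 bytes          |
| 60     | 4    | Padding/Reserved | Ensures 64-byte alignment                     |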
Record Format (Aligned)

Each record is written in its own 8-byte aligned frame. All components (header + data + trailer) are padded to the next multiple of 8 bytes.

Properties:

  • Each record is individually checksummed and terminates with a trailer marker
  • On recovery, scanning stops at the first invalid or torn record
  • The segment header includes a CRC for metadata validation

Record Layout:

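| Offset | Size      | Field   | Description                                             |
|--------|-----------|---------|---------------------------------------------------------|
| 0      | 4 bytes   | CRC     | CRC32 over the Length and Data fields                   |
| 4      | 4 bytes   | Length  | Size of the data payload in bytes                       |
| 8      | N bytes   | Data    | User payload                                            |
| 8 + N  | 8 bytes   | Trailer | Canary marker (0xDEADBEEFFEEDFACE)                      |
| 16 + N | ≥ 0 bytes | Padding | Zero padding to align the full frame to 8-byte boundary |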

Alignment Example:

For an 11-byte data payload:

  • Total = 8 (header) + 11 (data) + 8 (trailer) = 27 bytes
  • Final aligned frame = 32 bytes (with 5 bytes of padding)
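
The frame arithmetic and the recovery rule ("scanning stops at the first invalid or torn record") can be sketched together. This is a minimal illustration, not walfs code: it assumes little-endian fields, the IEEE CRC-32 polynomial, and an 8-byte trailer value of 0xDEADBEEFFEEDFACE; appendFrame and scan are hypothetical names.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

const (
	recHeaderSize  = 8                  // CRC (4 bytes) + Length (4 bytes)
	recTrailerSize = 8                  // canary marker
	frameAlign     = 8                  // frames start on 8-byte boundaries
	trailer uint64 = 0xDEADBEEFFEEDFACE // assumed trailer value
)

// alignUp rounds n up to the next multiple of frameAlign.
func alignUp(n int) int { return (n + frameAlign - 1) &^ (frameAlign - 1) }

// frameSize is the aligned on-disk size of a record with an n-byte payload.
func frameSize(n int) int { return alignUp(recHeaderSize + n + recTrailerSize) }

// appendFrame appends CRC32(Length|Data) | Length | Data | trailer | padding.
func appendFrame(dst, data []byte) []byte {
	n := len(data)
	frame := make([]byte, frameSize(n))
	binary.LittleEndian.PutUint32(frame[4:8], uint32(n))
	copy(frame[8:], data)
	binary.LittleEndian.PutUint32(frame[0:4], crc32.ChecksumIEEE(frame[4:8+n]))
	binary.LittleEndian.PutUint64(frame[8+n:], trailer)
	return append(dst, frame...)
}

// scan returns the payloads of all valid records from the start of buf,
// stopping at the first frame that fails a length, CRC, or trailer check.
func scan(buf []byte) [][]byte {
	var out [][]byte
	le := binary.LittleEndian
	for off := 0; off+recHeaderSize+recTrailerSize <= len(buf); {
		n := int(le.Uint32(buf[off+4 : off+8]))
		end := off + recHeaderSize + n + recTrailerSize
		if end > len(buf) {
			break // length points past the data: corrupt header
		}
		if crc32.ChecksumIEEE(buf[off+4:off+8+n]) != le.Uint32(buf[off:off+4]) {
			break // length or payload corrupted
		}
		if le.Uint64(buf[off+8+n:end]) != trailer {
			break // trailer missing: torn write
		}
		out = append(out, buf[off+8:off+8+n])
		off += alignUp(end - off)
	}
	return out
}

func main() {
	fmt.Println("frame size for 11 bytes:", frameSize(11)) // 8 + 11 + 8 = 27, padded to 32

	var seg []byte
	seg = appendFrame(seg, []byte("hello world"))
	seg = appendFrame(seg, []byte("second"))

	// Simulate a torn write: the third frame's trailer never reached disk.
	start := len(seg)
	seg = appendFrame(seg, []byte("torn"))
	copy(seg[start+8+4:start+8+4+8], make([]byte, 8))

	for _, rec := range scan(seg) {
		fmt.Printf("recovered: %s\n", rec)
	}
}
```

The CRC catches corrupted bytes, while the trailer catches a frame whose header and payload landed but whose tail did not; checking both lets recovery truncate at the last fully written record.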

License

walfs is released under the MIT License. See the repository for the full license text.

Acknowledgments

walfs draws inspiration from production-proven WAL implementations and incorporates lessons learned from real-world issues:

  • Trailer Marker Concept: Inspired by etcd issue #6191 - detecting torn writes with canary markers
  • Alignment Strategy: Influenced by BoltDB issue #548 - reducing partial writes across page boundaries

Documentation

Overview

Package walfs implements a Write-Ahead Log file system for durable, crash-recoverable storage.

Originally developed for unisondb, this package provides a general-purpose WAL that can be used by any database or system requiring durable append-only logging.

Index

Constants

const (
	StateOpen = iota
	StateClosing

	FlagActive uint32 = 1 << iota
	FlagSealed uint32 = 1 << 1
)
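
Because iota is the index of each constant specification in the block and does not reset at the blank line, FlagActive evaluates to 1 << 2 (4) while FlagSealed is 1 << 1 (2). The two flags are still distinct bits. The sketch below copies the block to confirm this; isActive and isSealed are hypothetical stand-ins for IsActive and IsSealed, assuming those are plain bit tests.

```go
package main

import "fmt"

// Copy of the documented constant block. iota keeps counting across the
// blank line, so FlagActive is 1 << 2.
const (
	StateOpen = iota
	StateClosing

	FlagActive uint32 = 1 << iota
	FlagSealed uint32 = 1 << 1
)

// Hypothetical bit tests; the real IsActive/IsSealed may differ.
func isActive(flags uint32) bool { return flags&FlagActive != 0 }
func isSealed(flags uint32) bool { return flags&FlagSealed != 0 }

func main() {
	fmt.Println(StateOpen, StateClosing, FlagActive, FlagSealed) // 0 1 4 2

	// Transition a segment's flags from active to sealed.
	flags := FlagActive
	flags = flags&^FlagActive | FlagSealed
	fmt.Println(isActive(flags), isSealed(flags))
}
```

These values matter when inspecting the Flags field of a raw segment header with external tools.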

Variables

var (
	ErrClosed              = errors.New("the Segment file is closed")
	ErrInvalidCRC          = errors.New("invalid crc, the data may be corrupted")
	ErrCorruptHeader       = errors.New("corrupt record header, invalid length")
	ErrIncompleteChunk     = errors.New("incomplete or torn write detected at record trailer")
	ErrSegmentSealed       = errors.New("cannot write to sealed segment")
	ErrSegmentReaderClosed = errors.New("segment reader is closed")
	ErrNoNewData           = errors.New("no new data yet")
	ErrSegmentFull         = errors.New("segment is full, cannot write more records")
)
var (
	ErrSegmentNotFound    = errors.New("segment not found")
	ErrOffsetOutOfBounds  = errors.New("start offset is beyond segment size")
	ErrOffsetBeforeHeader = errors.New("start offset is within reserved segment header")
	ErrFsync              = errors.New("fsync error")
	ErrRecordTooLarge     = errors.New("record size exceeds maximum segment capacity")
)
var (
	// NilRecordPosition is a sentinel value representing a nil RecordPosition.
	NilRecordPosition = RecordPosition{}
)

Functions

func EncodeRecordPositionTo

func EncodeRecordPositionTo(pos RecordPosition, buf []byte) []byte

EncodeRecordPositionTo serializes a RecordPosition into the provided buffer, which should be at least 12 bytes long. If it is shorter, a new 12-byte slice is allocated instead.
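
Encoded positions are what a consumer would persist as a replication checkpoint. A minimal sketch of a 12-byte round trip, assuming the layout is a 4-byte SegmentID followed by an 8-byte Offset in little-endian order (the actual byte order is not documented here, so real code should use Encode and DecodeRecordPosition). encodePositionTo and decodePosition are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

type SegmentID = uint32

// RecordPosition mirrors the documented struct.
type RecordPosition struct {
	SegmentID SegmentID
	Offset    int64
}

const encodedPosSize = 12 // 4-byte SegmentID + 8-byte Offset

// encodePositionTo reuses buf when it is long enough, otherwise allocates.
func encodePositionTo(pos RecordPosition, buf []byte) []byte {
	if len(buf) < encodedPosSize {
		buf = make([]byte, encodedPosSize)
	}
	buf = buf[:encodedPosSize]
	binary.LittleEndian.PutUint32(buf[0:4], pos.SegmentID)
	binary.LittleEndian.PutUint64(buf[4:12], uint64(pos.Offset))
	return buf
}

// decodePosition is the inverse of encodePositionTo.
func decodePosition(data []byte) (RecordPosition, error) {
	if len(data) < encodedPosSize {
		return RecordPosition{}, errors.New("record position: need 12 bytes")
	}
	return RecordPosition{
		SegmentID: binary.LittleEndian.Uint32(data[0:4]),
		Offset:    int64(binary.LittleEndian.Uint64(data[4:12])),
	}, nil
}

func main() {
	scratch := make([]byte, encodedPosSize) // reused across checkpoints
	enc := encodePositionTo(RecordPosition{SegmentID: 3, Offset: 4096}, scratch)
	pos, err := decodePosition(enc)
	fmt.Println(pos, err)
}
```

Passing a reusable buffer avoids one allocation per checkpoint, which is the point of the ...To variant.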

func IsActive

func IsActive(flags uint32) bool

func IsSealed

func IsSealed(flags uint32) bool

IsSealed reports whether the provided flags have the sealed bit set.

func SegmentFileName

func SegmentFileName(dirPath string, extName string, id SegmentID) string

SegmentFileName returns the file name of a Segment file.
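
The Architecture section shows segment files named like 000000001.wal. A minimal sketch of a name builder in that style, assuming a nine-digit zero-padded ID; the real SegmentFileName may format names differently, and segmentFileName here is hypothetical.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// segmentFileName zero-pads the ID to nine digits (an assumption based on
// the example names) and appends the extension.
func segmentFileName(dirPath, extName string, id uint32) string {
	return filepath.Join(dirPath, fmt.Sprintf("%09d%s", id, extName))
}

func main() {
	for id := uint32(1); id <= 3; id++ {
		fmt.Println(segmentFileName("./data", ".wal", id))
	}
}
```

Zero-padding keeps lexical and numeric order aligned (up to 999,999,999), so a plain directory listing returns segments in ID order.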

func SegmentIndexFileName

func SegmentIndexFileName(dirPath string, extName string, id SegmentID) string

SegmentIndexFileName returns the file name of the index for a segment.

func WithSegmentCustomMarker

func WithSegmentCustomMarker(marker uint32) func(*Segment)

WithSegmentCustomMarker sets the 4-byte marker written for new segments. Use WithSegmentCustomMarkerValidator to validate stored markers on open.

func WithSegmentCustomMarkerValidator

func WithSegmentCustomMarkerValidator(validator MarkerValidator) func(*Segment)

WithSegmentCustomMarkerValidator sets a validator for stored markers when opening segments.

func WithSegmentDirectorySyncer

func WithSegmentDirectorySyncer(syncer DirectorySyncer) func(*Segment)

WithSegmentDirectorySyncer sets the directory syncer used after destructive operations.

func WithSegmentSize

func WithSegmentSize(size int64) func(*Segment)

WithSegmentSize sets the size for the Segment.

func WithSyncOption

func WithSyncOption(opt MsyncOption) func(*Segment)

WithSyncOption sets the sync option for the Segment.

Types

type DeletionPredicate

type DeletionPredicate func(segID SegmentID) bool

DeletionPredicate is a function that determines if a segment ID is safe to delete.

type DirectorySyncFunc

type DirectorySyncFunc func(dir string) error

DirectorySyncFunc adapts a function to act as a DirectorySyncer.

func (DirectorySyncFunc) SyncDir

func (f DirectorySyncFunc) SyncDir(dir string) error

SyncDir implements DirectorySyncer.

type DirectorySyncer

type DirectorySyncer interface {
	SyncDir(dir string) error
}

DirectorySyncer syncs a directory path to stable storage.

type IndexEntry

type IndexEntry struct {
	Index uint64
	Pos   RecordPosition
}

IndexEntry represents an index to position mapping for batch operations.

type MarkerValidator

type MarkerValidator func(storedMarker uint32) error

type MsyncOption

type MsyncOption int
const (
	// MsyncNone skips msync after write.
	MsyncNone MsyncOption = iota

	// MsyncOnWrite calls msync (Flush) after every write.
	MsyncOnWrite
)

type NoopDecoder

type NoopDecoder struct{}

func (NoopDecoder) Decode

func (d NoopDecoder) Decode(data []byte) ([]byte, error)

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader represents a high-level sequential reader over a WALog. It reads across all available WAL segments in order, automatically advancing from one segment to the next. Reader is not safe for concurrent use.

func (*Reader) Close

func (r *Reader) Close()

Close closes all segment readers to release their references. IMPORTANT: This method MUST be called after the Reader is no longer needed.

func (*Reader) LastRecordPosition

func (r *Reader) LastRecordPosition() RecordPosition

LastRecordPosition returns the RecordPosition of the last successfully read entry.

func (*Reader) Next

func (r *Reader) Next() ([]byte, RecordPosition, error)

Next returns the next available WAL record data and its current position. IMPORTANT: The returned `[]byte` is a slice of a memory-mapped file, so data must not be retained or modified. If the data needs to be used beyond the lifetime of the segment, the caller MUST copy it. When readerCommitCheck is enabled on the WALog, it returns ErrNoNewData if the reader has reached the committed boundary.

func (*Reader) SeekNext

func (r *Reader) SeekNext() error

SeekNext advances the reader by one record, discarding the data.

type ReaderOption

type ReaderOption func(*Reader)

ReaderOption configures a Reader.

func WithDecoder

func WithDecoder(decoder RecordDecoder) ReaderOption

WithDecoder sets a custom decoder for the reader. When set, NextRecord() will use this decoder to convert raw bytes to decoded data.

type RecordDecoder

type RecordDecoder interface {
	// Decode transforms raw WAL bytes into the actual record payload.
	// The returned bytes may reference the input (zero-copy) or be newly allocated.
	Decode(data []byte) ([]byte, error)
}

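RecordDecoder transforms raw WAL bytes into record payloads. NoopDecoder returns the bytes unchanged; a custom decoder is needed when the WAL bytes contain a wrapper (e.g., a Raft log entry) that must be unwrapped.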
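
A minimal sketch of a custom decoder for the wrapped case. The wrapper format (an 8-byte little-endian term followed by the payload) is invented for illustration, and termPrefixDecoder and wrap are hypothetical; the interface is re-declared locally so the block compiles on its own. With walfs, such a decoder would be supplied via WithDecoder when creating a reader.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// RecordDecoder mirrors the documented interface.
type RecordDecoder interface {
	Decode(data []byte) ([]byte, error)
}

// termPrefixDecoder strips a hypothetical 8-byte term header. It returns a
// subslice (zero-copy), so the result is only valid while the input is.
type termPrefixDecoder struct{}

func (termPrefixDecoder) Decode(data []byte) ([]byte, error) {
	if len(data) < 8 {
		return nil, errors.New("wrapped record shorter than 8-byte header")
	}
	return data[8:], nil
}

// wrap builds a record in the hypothetical wrapped format.
func wrap(term uint64, payload []byte) []byte {
	out := make([]byte, 8+len(payload))
	binary.LittleEndian.PutUint64(out, term)
	copy(out[8:], payload)
	return out
}

func main() {
	var dec RecordDecoder = termPrefixDecoder{}
	payload, err := dec.Decode(wrap(5, []byte("set x=1")))
	fmt.Printf("%s %v\n", payload, err)
}
```

Returning a subslice keeps decoding allocation-free, but it inherits the mmap lifetime rule that applies to Next: copy the result if it must outlive the segment.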

type RecordPosition

type RecordPosition struct {
	SegmentID SegmentID
	Offset    int64
}

RecordPosition is the logical location of a record entry within a WAL Segment.

func DecodeRecordPosition

func DecodeRecordPosition(data []byte) (RecordPosition, error)

DecodeRecordPosition deserializes a byte slice into a RecordPosition.

func (RecordPosition) Encode

func (rp RecordPosition) Encode() []byte

Encode serializes the RecordPosition into a fixed-length byte slice.

func (RecordPosition) IsZero

func (rp RecordPosition) IsZero() bool

IsZero returns true if the RecordPosition is uninitialized, meaning both SegmentID and Offset are zero.

func (RecordPosition) String

func (rp RecordPosition) String() string

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

Segment represents a single WAL segment backed by a memory-mapped file.

func OpenSegmentFile

func OpenSegmentFile(dirPath, extName string, id uint32, opts ...func(*Segment)) (*Segment, error)

OpenSegmentFile opens an existing segment file, or creates a new one if none is present. If the segment file is sealed, its contents are not scanned while opening.

func (*Segment) ClearIndexFromMemory

func (seg *Segment) ClearIndexFromMemory()

ClearIndexFromMemory releases the in-memory index entries to free memory. This can be called after the index has been copied to an external data structure. Note: After calling this, IndexEntries() will return an empty slice. Only sealed segments can be cleared; active segments are ignored.

func (*Segment) Close

func (seg *Segment) Close() error

Close gracefully shuts down the segment by waiting for all active readers to complete. It then unmaps the segment file and closes the file descriptor.

func (*Segment) FirstLogIndex

func (seg *Segment) FirstLogIndex() uint64

func (*Segment) GetEntryCount

func (seg *Segment) GetEntryCount() int64

GetEntryCount returns the total entry count in segment.

func (*Segment) GetFlags

func (seg *Segment) GetFlags() uint32

GetFlags returns the flags stored in segment header.

func (*Segment) GetLastModifiedAt

func (seg *Segment) GetLastModifiedAt() int64

GetLastModifiedAt returns the last modified time of the segment.

func (*Segment) GetSegmentSize

func (seg *Segment) GetSegmentSize() int64

func (*Segment) HasActiveReaders

func (seg *Segment) HasActiveReaders() bool

HasActiveReaders returns true if there are any currently active readers on the segment.

func (*Segment) ID

func (seg *Segment) ID() SegmentID

ID returns the unique number of the Segment.

func (*Segment) IndexEntries

func (seg *Segment) IndexEntries() []SegmentIndexEntry

IndexEntries returns a copy of the index metadata for this segment.

func (*Segment) IsInMemorySealed

func (seg *Segment) IsInMemorySealed() bool

IsInMemorySealed returns true if the segment has been marked as sealed in memory.

func (*Segment) IsSealed

func (seg *Segment) IsSealed() bool

IsSealed returns true if the segment is sealed (on-disk flag).

func (*Segment) MSync

func (seg *Segment) MSync() error

func (*Segment) MarkForDeletion

func (seg *Segment) MarkForDeletion()

MarkForDeletion marks the segment as a candidate for deletion. If there are no active readers, cleanup runs immediately; otherwise, it is deferred until the last reference is released.
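
The deferred-cleanup rule above is a standard reference-counting pattern. A minimal sketch, assuming one mutex guards the count and the flags; refSegment and its methods are hypothetical and only flip a boolean where a real segment would unmap and delete its files.

```go
package main

import (
	"fmt"
	"sync"
)

// refSegment tracks reader references and deferred deletion.
type refSegment struct {
	mu      sync.Mutex
	refs    int
	marked  bool
	removed bool
}

// acquire takes a reader reference; it fails once the segment is removed.
func (s *refSegment) acquire() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.removed {
		return false
	}
	s.refs++
	return true
}

// release drops a reference and runs deferred cleanup if it was the last.
func (s *refSegment) release() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.refs--
	s.maybeCleanupLocked()
}

// markForDeletion cleans up now if unused, otherwise defers to release.
func (s *refSegment) markForDeletion() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.marked = true
	s.maybeCleanupLocked()
}

// maybeCleanupLocked must be called with s.mu held.
func (s *refSegment) maybeCleanupLocked() {
	if s.marked && s.refs == 0 && !s.removed {
		s.removed = true // a real segment would unmap and delete files here
	}
}

func main() {
	s := &refSegment{}
	s.acquire()
	s.markForDeletion()
	fmt.Println("removed while reader active:", s.removed)
	s.release()
	fmt.Println("removed after release:", s.removed)
}
```

This is why Reader.Close and SegmentReader.Close matter: a reader that is never closed keeps its segment's count above zero, and the segment is never reclaimed.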

func (*Segment) MarkSealedInMemory

func (seg *Segment) MarkSealedInMemory()

MarkSealedInMemory marks the segment as sealed in memory.

func (*Segment) NewReader

func (seg *Segment) NewReader() *SegmentReader

NewReader creates a new SegmentReader for reading from the segment.

func (*Segment) Read

func (seg *Segment) Read(offset int64) ([]byte, RecordPosition, error)

Read reads the record data at the specified offset within the segment. IMPORTANT: do not retain the returned data. It is a slice of the mmap'd file content holding the record payload, so it becomes invalid as soon as the segment is closed or unmapped.

func (*Segment) Remove

func (seg *Segment) Remove() error

Remove closes the segment and removes its underlying files (segment and index).

func (*Segment) SealSegment

func (seg *Segment) SealSegment() error

SealSegment seals the given segment.

func (*Segment) Sync

func (seg *Segment) Sync() error

Sync msyncs the memory-mapped file and then fsyncs the underlying file.

func (*Segment) TruncateTo

func (seg *Segment) TruncateTo(logIndex uint64) error

TruncateTo truncates the segment to the specified log index. All entries after the given log index will be discarded. If the log index is not found in this segment, it returns an error.

func (*Segment) WaitForIndexFlush

func (seg *Segment) WaitForIndexFlush()

WaitForIndexFlush blocks until any pending index flush operations complete.

func (*Segment) WillExceed

func (seg *Segment) WillExceed(dataSize int) bool

WillExceed returns true if writing a record of the given dataSize would overflow the segment's allocated (memory-mapped) size.

func (*Segment) Write

func (seg *Segment) Write(data []byte, logIndex uint64) (RecordPosition, error)

Write appends the provided bytes to the memory-mapped segment file and returns the RecordPosition at which the record was written.

func (*Segment) WriteBatch

func (seg *Segment) WriteBatch(records [][]byte, logIndexes []uint64) ([]RecordPosition, int, error)

WriteBatch writes multiple records to the segment in a single operation. It returns the RecordPositions of the successfully written records and the number written. If the segment fills up mid-batch, it returns the positions of the records that fit, the count written, and ErrSegmentFull; callers should retry the remaining records in a new segment.

func (*Segment) WriteOffset

func (seg *Segment) WriteOffset() int64

WriteOffset returns the current write offset of the segment.

type SegmentHeader

type SegmentHeader struct {
	// at 0
	Magic uint32
	// at 4
	Version uint32
	// at 8
	CreatedAt int64
	// at 16
	LastModifiedAt int64
	// at 24
	WriteOffset int64
	// at 32
	EntryCount int64
	// at 40
	Flags uint32

	// at 44 -51
	FirstLogIndex uint64

	// at 56 byte: - CRC32 of first 56 bytes
	CRC uint32
	// contains filtered or unexported fields
}

SegmentHeader encodes all the necessary information about the segment file at the top of the file. Its encoded size is 64 bytes.

type SegmentID

type SegmentID = uint32

type SegmentIndexEntry

type SegmentIndexEntry struct {
	SegmentID SegmentID
	Offset    int64
	Length    uint32
}

SegmentIndexEntry exposes a record's physical location within a WAL segment.

type SegmentReader

type SegmentReader struct {
	// contains filtered or unexported fields
}

SegmentReader is an iterator over records in a WAL segment. It maintains its own read offset and provides safe iteration over a Segment.

func (*SegmentReader) Close

func (r *SegmentReader) Close()

Close closes the SegmentReader and decrements the segment's reference count.

func (*SegmentReader) LastRecordPosition

func (r *SegmentReader) LastRecordPosition() RecordPosition

func (*SegmentReader) Next

func (r *SegmentReader) Next() ([]byte, RecordPosition, error)

Next reads the next record from the segment and also advances the read position. It returns the data, the record's position, or an error. Returns io.EOF if the segment is sealed and all data has been read. Returns ErrNoNewData if unsealed and no new data is available yet.

type ShardedIndex

type ShardedIndex struct {
	// contains filtered or unexported fields
}

ShardedIndex is a concurrent-safe sharded map for Raft index → WAL position.

func NewShardedIndex

func NewShardedIndex() *ShardedIndex

NewShardedIndex creates a new, empty sharded index.

func (*ShardedIndex) Clear

func (s *ShardedIndex) Clear()

Clear removes all entries from all shards.

func (*ShardedIndex) Delete

func (s *ShardedIndex) Delete(index uint64)

Delete removes an index from the map.

func (*ShardedIndex) DeleteRange

func (s *ShardedIndex) DeleteRange(min, max uint64) int64

DeleteRange removes all indices in [min, max] range.

func (*ShardedIndex) Get

func (s *ShardedIndex) Get(index uint64) (RecordPosition, bool)

Get retrieves the position for an index. Returns the position and true if found, zero value and false otherwise.

func (*ShardedIndex) GetFirstLast

func (s *ShardedIndex) GetFirstLast() (first, last uint64, ok bool)

GetFirstLast returns the first and last indices in the index. Returns (0, 0, false) if the index is empty.

func (*ShardedIndex) IsCurrentEntry

func (s *ShardedIndex) IsCurrentEntry(index uint64, segmentID SegmentID, offset int64) bool

IsCurrentEntry checks if the given position is the current one for an index. Returns true if the index exists and the position matches exactly.

func (*ShardedIndex) Len

func (s *ShardedIndex) Len() int64

Len returns the total number of entries across all shards.

func (*ShardedIndex) LenSlow

func (s *ShardedIndex) LenSlow() int64

LenSlow returns the total number of entries by summing all shards.

func (*ShardedIndex) Range

func (s *ShardedIndex) Range(fn func(index uint64, pos RecordPosition) bool)

Range iterates over all entries and calls fn for each. If fn returns false, iteration stops.

func (*ShardedIndex) Set

func (s *ShardedIndex) Set(index uint64, pos RecordPosition)

Set stores the position for an index.

func (*ShardedIndex) SetBatch

func (s *ShardedIndex) SetBatch(entries []IndexEntry)

SetBatch stores multiple index→position mappings.
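
A minimal sketch of the idea behind a sharded index: each log index is routed to one shard by modulo, and each shard has its own lock, so writers touching different shards do not contend. The shard count of 16, the modulo routing, and the shardedIndex type are all assumptions for illustration, not the walfs implementation.

```go
package main

import (
	"fmt"
	"sync"
)

type RecordPosition struct {
	SegmentID uint32
	Offset    int64
}

const numShards = 16 // assumed; the real shard count is not documented

type shard struct {
	mu sync.RWMutex
	m  map[uint64]RecordPosition
}

type shardedIndex struct {
	shards [numShards]shard
}

func newShardedIndex() *shardedIndex {
	s := &shardedIndex{}
	for i := range s.shards {
		s.shards[i].m = make(map[uint64]RecordPosition)
	}
	return s
}

func (s *shardedIndex) shardFor(index uint64) *shard { return &s.shards[index%numShards] }

func (s *shardedIndex) Set(index uint64, pos RecordPosition) {
	sh := s.shardFor(index)
	sh.mu.Lock()
	sh.m[index] = pos
	sh.mu.Unlock()
}

func (s *shardedIndex) Get(index uint64) (RecordPosition, bool) {
	sh := s.shardFor(index)
	sh.mu.RLock()
	defer sh.mu.RUnlock()
	pos, ok := sh.m[index]
	return pos, ok
}

// DeleteRange removes indices in [lo, hi] and returns how many were removed.
func (s *shardedIndex) DeleteRange(lo, hi uint64) int64 {
	if lo > hi {
		return 0
	}
	var removed int64
	for i := lo; ; i++ {
		sh := s.shardFor(i)
		sh.mu.Lock()
		if _, ok := sh.m[i]; ok {
			delete(sh.m, i)
			removed++
		}
		sh.mu.Unlock()
		if i == hi { // checked before i++ so hi == MaxUint64 cannot wrap
			break
		}
	}
	return removed
}

func main() {
	idx := newShardedIndex()
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			// Each worker writes a disjoint set of indices: w, w+4, w+8, ...
			for i := uint64(w); i < 100; i += 4 {
				idx.Set(i, RecordPosition{SegmentID: 1, Offset: int64(64 + 32*i)})
			}
		}(w)
	}
	wg.Wait()
	pos, ok := idx.Get(10)
	fmt.Println(pos, ok, idx.DeleteRange(50, 99))
}
```

This DeleteRange visits every integer in the range, which suits the contiguous log indexes of a WAL; a sparse index would rather scan each shard's map.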

type WALog

type WALog struct {
	// contains filtered or unexported fields
}

WALog manages the lifecycle of individual segments, including creation, rotation, recovery, and read/write operations.

func NewWALog

func NewWALog(dir string, ext string, opts ...WALogOptions) (*WALog, error)

NewWALog returns an initialized WALog that manages the segments in the provided dir with the given ext.

func (*WALog) BackupLastRotatedSegment

func (wl *WALog) BackupLastRotatedSegment(backupDir string) (string, error)

BackupLastRotatedSegment copies the most recently sealed WAL segment into backupDir and returns the backup path.

func (*WALog) BackupSegmentsAfter

func (wl *WALog) BackupSegmentsAfter(afterID SegmentID, backupDir string) (map[SegmentID]string, error)

BackupSegmentsAfter copies every sealed segment with ID > afterID into backupDir. Returns a map of SegmentID to the backup file path.

func (*WALog) BytesPerSyncCallCount

func (wl *WALog) BytesPerSyncCallCount() int64

BytesPerSyncCallCount returns how many times the bytes-per-sync flush has been triggered on the current active segment.

func (*WALog) CleanupStalePendingSegments

func (wl *WALog) CleanupStalePendingSegments()

CleanupStalePendingSegments scans pendingDeletion and segments maps. If a segment's file no longer exists on disk, it removes those entries from both maps.

func (*WALog) Close

func (wl *WALog) Close() error

Close gracefully shuts down all segments managed by the WALog.

func (*WALog) Commit

func (wl *WALog) Commit(pos RecordPosition)

Commit advances the committed position to the given position. When readerCommitCheck is enabled, readers cannot advance beyond this position. In Raft mode, call this after Raft consensus confirms the log entry. This method is safe for concurrent use.

func (*WALog) CommittedPosition

func (wl *WALog) CommittedPosition() RecordPosition

CommittedPosition returns the current committed position. Returns NilRecordPosition if no position has been committed yet.

func (*WALog) Current

func (wl *WALog) Current() *Segment

Current returns a pointer to the currently active WAL segment.

func (*WALog) LogIndex

func (wl *WALog) LogIndex() *ShardedIndex

LogIndex returns the shared sharded index mapping log index to record position.

func (*WALog) MarkSegmentsForDeletion

func (wl *WALog) MarkSegmentsForDeletion()

MarkSegmentsForDeletion identifies and queues WAL segments for deletion based on their age and segment count retention constraints.

func (*WALog) NewReader

func (wl *WALog) NewReader(opts ...ReaderOption) *Reader

NewReader returns a new Reader that sequentially reads all segments in the WALog, starting from the beginning (lowest SegmentID).

func (*WALog) NewReaderAfter

func (wl *WALog) NewReaderAfter(pos RecordPosition, opts ...ReaderOption) (*Reader, error)

NewReaderAfter returns a reader that starts after the given RecordPosition. It first creates a reader from that position, then skips one record.

func (*WALog) NewReaderWithStart

func (wl *WALog) NewReaderWithStart(pos RecordPosition, opts ...ReaderOption) (*Reader, error)

NewReaderWithStart returns a new Reader that begins reading from the specified position. If SegmentID is 0, the reader will begin from the very start of the WAL.

func (*WALog) PositionForIndex

func (wl *WALog) PositionForIndex(idx uint64) (RecordPosition, error)

PositionForIndex returns the RecordPosition for the given log index.

func (*WALog) QueuedSegmentsForDeletion

func (wl *WALog) QueuedSegmentsForDeletion() map[SegmentID]*Segment

func (*WALog) Read

func (wl *WALog) Read(pos RecordPosition) ([]byte, error)

Read returns the data from the provided record position if found. IMPORTANT: The returned `[]byte` is a slice of a memory-mapped file, so data must not be retained or modified. If the data needs to be used beyond the lifetime of the segment, the caller MUST copy it.

func (*WALog) RotateSegment

func (wl *WALog) RotateSegment() error

RotateSegment rotates the current segment and creates a new active segment.

func (*WALog) SegmentRotatedCount

func (wl *WALog) SegmentRotatedCount() int64

func (*WALog) Segments

func (wl *WALog) Segments() map[SegmentID]*Segment

Segments returns a snapshot (shallow) copy of all active segments managed by the WAL.

func (*WALog) StartPendingSegmentCleaner

func (wl *WALog) StartPendingSegmentCleaner(ctx context.Context,
	interval time.Duration,
	canDeleteFn func(segID SegmentID) bool,
)

StartPendingSegmentCleaner starts a background goroutine that periodically inspects segments pending deletion and attempts to remove them safely. If a segment still has active readers, it is marked for deletion and removed once the last reader releases it.

func (*WALog) Sync

func (wl *WALog) Sync() error

Sync flushes the current active segment's data to disk.

func (*WALog) Truncate

func (wl *WALog) Truncate(logIndex uint64) error

Truncate truncates the WAL to the specified log index. All entries after the given log index will be discarded. If logIndex is 0, all segments are deleted and the WAL is reset.

Callers must ensure that creating or advancing readers does not interfere with truncation. Active readers on segments that need to be deleted will cause the Truncate operation to fail.

func (*WALog) Write

func (wl *WALog) Write(data []byte, logIndex uint64) (RecordPosition, error)

Write appends the given data as a new record to the active segment. It returns RecordPosition indicating where the data was written.

func (*WALog) WriteBatch

func (wl *WALog) WriteBatch(records [][]byte, logIndexes []uint64) ([]RecordPosition, error)

WriteBatch appends multiple records to the active segment in a single batched operation. If the batch does not fit entirely in the current segment, it rotates to a new segment automatically and writes the remaining records there. It returns a slice of RecordPositions for all successfully written records.

type WALogOptions

type WALogOptions func(*WALog)

func WithAutoCleanupPolicy

func WithAutoCleanupPolicy(maxAge time.Duration, minSegments, maxSegments int, enable bool) WALogOptions

WithAutoCleanupPolicy configures the automatic segment cleanup policy for the WAL. maxAge: segments older than this duration are eligible for deletion. minSegments: minimum number of WAL segments to always retain, regardless of age. maxSegments: if the total number of segments exceeds this limit, older segments are deleted regardless of their age.

func WithBytesPerSync

func WithBytesPerSync(bytes int64) WALogOptions

WithBytesPerSync sets the number of bytes written after which an msync is triggered, which is useful for batching writes. A value of 0 disables this feature.
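
The Sync Impact benchmarks show that syncing after every small write costs orders of magnitude more than not syncing; a byte threshold trades that cost against how much unsynced data a crash can lose. A minimal sketch of the counting logic, assuming the counter resets after each sync; bytesPerSyncTracker is hypothetical.

```go
package main

import "fmt"

// bytesPerSyncTracker counts bytes written since the last sync and calls
// syncFn once the threshold is reached. A threshold of 0 disables it.
type bytesPerSyncTracker struct {
	threshold int64
	pending   int64
	syncs     int64
	syncFn    func() error
}

func (t *bytesPerSyncTracker) onWrite(n int) error {
	if t.threshold <= 0 {
		return nil // feature disabled
	}
	t.pending += int64(n)
	if t.pending < t.threshold {
		return nil
	}
	t.pending = 0
	t.syncs++
	return t.syncFn()
}

func main() {
	flushes := 0
	t := &bytesPerSyncTracker{threshold: 4096, syncFn: func() error { flushes++; return nil }}
	for i := 0; i < 10; i++ {
		if err := t.onWrite(1024); err != nil {
			fmt.Println(err)
			return
		}
	}
	fmt.Println(t.syncs, flushes, t.pending)
}
```

With a 4 KiB threshold and 1 KiB writes, only every fourth write pays the msync cost.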

func WithClearIndexOnFlush

func WithClearIndexOnFlush() WALogOptions

WithClearIndexOnFlush enables clearing segment's in-memory index after it's flushed to disk. This is useful when an external index is maintained.

func WithCustomMarker

func WithCustomMarker(marker uint32) WALogOptions

WithCustomMarker sets the 4-byte marker written to new segments.

func WithCustomMarkerValidator

func WithCustomMarkerValidator(validator MarkerValidator) WALogOptions

WithCustomMarkerValidator sets a validator for stored markers on open.

func WithDirectorySyncer

func WithDirectorySyncer(syncer DirectorySyncer) WALogOptions

WithDirectorySyncer overrides the directory syncer used for new segment files.

func WithMSyncEveryWrite

func WithMSyncEveryWrite(enabled bool) WALogOptions

WithMSyncEveryWrite enables msync() after every write operation.

func WithMaxSegmentSize

func WithMaxSegmentSize(size int64) WALogOptions

WithMaxSegmentSize sets the maximum size of a segment file.

func WithOnSegmentRotated

func WithOnSegmentRotated(fn func()) WALogOptions

WithOnSegmentRotated registers a callback that is called immediately after a WAL segment is rotated.

func WithReaderCommitCheck

func WithReaderCommitCheck() WALogOptions

WithReaderCommitCheck enables commit offset checking for readers. When enabled, readers will check against the committed position before advancing. If the reader's current position is at or beyond the committed position, Next() returns ErrNoNewData without advancing. Use Commit() to advance the committed position. This is used in Raft mode where writes are not visible until committed.
