walrus

package module
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: MIT Imports: 19 Imported by: 0

README

Walrus - Write-Ahead Log (WAL) Implementation


Walrus Logo

Go Version License Go Report Card GitHub Stars

Buy Me A Coffee

Walrus is a high-performance Write-Ahead Log (WAL) implementation in Go with ACID transactions, zero-copy reads, and segment-based architecture.

Features

  • ACID Transactions - Atomic commits with timeout and rollback
  • Zero-Copy Reads - Memory-mapped files for high performance
  • Segment-Based - Automatic rotation and cleanup
  • Thread-Safe - Safe for concurrent access
  • Batch Operations - High throughput batch writing
  • Context Support - Timeout and cancellation support
  • Custom Logging - Pluggable logger interface

Installation

go get github.com/l00pss/walrus

Quick Start

config := walrus.DefaultConfig()
wal := walrus.NewWAL("./wal_data", config).Unwrap()
defer wal.Close()

entry := walrus.Entry{
    Data:      []byte("Hello World!"),
    Term:      1,
    Timestamp: time.Now(),
}

index := wal.Append(entry).Unwrap()
readEntry := wal.Get(index).Unwrap()

Transactions

txID := wal.BeginTransaction(30 * time.Second).Unwrap()

wal.AddToTransaction(txID, entry1)
wal.AddToTransaction(txID, entry2)

indices := wal.CommitTransaction(txID).Unwrap()
// Or rollback: wal.RollbackTransaction(txID)

Batch Operations

entries := []walrus.Entry{
    {Data: []byte("Entry 1"), Term: 1, Timestamp: time.Now()},
    {Data: []byte("Entry 2"), Term: 1, Timestamp: time.Now()},
}

indices := wal.WriteBatch(entries).Unwrap()

Context Support

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

index := wal.AppendWithContext(ctx, entry).Unwrap()

Configuration

config := walrus.Config{
    SegmentSize:     64 * 1024 * 1024,  // 64MB per segment
    MaxSegments:     100,
    SyncAfterWrite:  true,
    BufferSize:      4096,
    ZeroCopy:        true,
    Format:          walrus.BINARY,
}

Benchmarks

Tested on Apple M1 Pro:

Operation Throughput Latency Allocations
Append 290,697 ops/sec 3.44 µs/op 14 allocs/op
Get 741,289 ops/sec 1.35 µs/op 9 allocs/op
Get (Zero-Copy) 813,008 ops/sec 1.23 µs/op 0 allocs/op
Get (Regular) 709,220 ops/sec 1.41 µs/op 7 allocs/op

Zero-copy mode provides ~15% faster reads with zero memory allocations.

go test -bench=. -benchmem

License

MIT

Documentation

Index

Constants

View Source
const (
	SegmentFileExtension = ".wal"
	SegmentFilePrefix    = "segment_"
	EntryLengthSize      = 4
)
View Source
const (
	// Layout: Index(8) + Term(8) + Timestamp(8) + DataLen(4) + TxIDLen(4) + Checksum(4) = 36 bytes
	EntryHeaderSize = 8 + 8 + 8 + 4 + 4 + 4
)

Variables

View Source
var (
	ErrChecksumMismatch      = errors.New("checksum mismatch: data may be corrupted")
	ErrEmptyData             = errors.New("entry data cannot be empty")
	ErrInvalidEntry          = errors.New("invalid entry")
	ErrWALCorrupted          = errors.New("WAL is corrupted")
	ErrWALClosed             = errors.New("WAL is closed")
	ErrIndexOutOfRange       = errors.New("index out of range")
	ErrSegmentNotFound       = errors.New("segment not found")
	ErrSegmentFull           = errors.New("segment is full")
	ErrUnsupportedFormat     = errors.New("unsupported encode format")
	ErrTransactionNotFound   = errors.New("transaction not found")
	ErrTransactionNotPending = errors.New("transaction is not in pending state")
	ErrTransactionExpired    = errors.New("transaction has expired")
	ErrTransactionTooLarge   = errors.New("transaction has too many entries")
	UnknownError             = errors.New("Unknown error")
)

Functions

func MkDirIfNotExist

func MkDirIfNotExist(dir string) result.Result[struct{}]

func NewWAL

func NewWAL(dir string, config Config) result.Result[*WAL]

func Open

func Open(dir string, config Config) result.Result[*WAL]

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

func (*Batch) Len

func (b *Batch) Len() int

func (*Batch) Reset

func (b *Batch) Reset()

func (*Batch) Write

func (b *Batch) Write(index uint64, data []byte)

type BinaryEncoder

type BinaryEncoder struct{}

func (BinaryEncoder) Decode

func (e BinaryEncoder) Decode(data []byte) result.Result[Entry]

func (BinaryEncoder) DecodeZeroCopy

func (e BinaryEncoder) DecodeZeroCopy(data []byte) result.Result[Entry]

func (BinaryEncoder) Encode

func (e BinaryEncoder) Encode(entry Entry) result.Result[[]byte]

func (BinaryEncoder) EncodeInPlace

func (e BinaryEncoder) EncodeInPlace(entry Entry, buffer []byte) result.Result[int]

func (BinaryEncoder) EstimateSize

func (e BinaryEncoder) EstimateSize(entry Entry) int

type Config

type Config struct {
	// contains filtered or unexported fields
}

func DefaultConfig

func DefaultConfig() Config

func (*Config) Validate

func (c *Config) Validate() result.Result[Config]

type Cursor

type Cursor struct {
	FirstIndex uint64
	LastIndex  uint64
}

func StartCursor

func StartCursor() Cursor

func (*Cursor) IsValid

func (c *Cursor) IsValid() bool

type Encoder

type Encoder interface {
	Encode(entry Entry) result.Result[[]byte]
	Decode(data []byte) result.Result[Entry]
	EncodeInPlace(entry Entry, buffer []byte) result.Result[int]
}

type Entry

type Entry struct {
	Index         uint64
	Term          uint64
	Data          []byte
	Checksum      uint32
	Timestamp     time.Time
	TransactionID TransactionID
}

type JSONEncoder

type JSONEncoder struct{}

func (JSONEncoder) Decode

func (e JSONEncoder) Decode(data []byte) result.Result[Entry]

func (JSONEncoder) DecodeZeroCopy

func (e JSONEncoder) DecodeZeroCopy(data []byte) result.Result[Entry]

func (JSONEncoder) Encode

func (e JSONEncoder) Encode(entry Entry) result.Result[[]byte]

func (JSONEncoder) EncodeInPlace

func (e JSONEncoder) EncodeInPlace(entry Entry, buffer []byte) result.Result[int]

func (JSONEncoder) EstimateSize

func (e JSONEncoder) EstimateSize(entry Entry) int

type LogFormat

type LogFormat int
const (
	BINARY  LogFormat = 0
	JSON    LogFormat = 1
	DEFAULT           = BINARY
)

type Logger

type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

Logger interface for WAL logging

type NoOpLogger

type NoOpLogger struct{}

NoOpLogger is a logger that does nothing (default)

func (NoOpLogger) Debug

func (n NoOpLogger) Debug(msg string, args ...any)

func (NoOpLogger) Error

func (n NoOpLogger) Error(msg string, args ...any)

func (NoOpLogger) Info

func (n NoOpLogger) Info(msg string, args ...any)

func (NoOpLogger) Warn

func (n NoOpLogger) Warn(msg string, args ...any)

type Permission

type Permission os.FileMode
const (
	DefaultFilePermission Permission = 0644
	DirectoryPermission   Permission = 0750
	FilePermission        Permission = 0640
)

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

func (*Segment) Close

func (s *Segment) Close() error

func (*Segment) ContainsIndex

func (s *Segment) ContainsIndex(entryIndex uint64) bool

func (*Segment) GetEntryByIndex

func (s *Segment) GetEntryByIndex(entryIndex uint64) ([]byte, error)

func (*Segment) GetEntryByIndexZeroCopy

func (s *Segment) GetEntryByIndexZeroCopy(entryIndex uint64) ([]byte, error)

func (*Segment) ReadAll

func (s *Segment) ReadAll() ([][]byte, error)

func (*Segment) ReadAt

func (s *Segment) ReadAt(offset int64) ([]byte, int64, error)

func (*Segment) ReadAtZeroCopy

func (s *Segment) ReadAtZeroCopy(offset int64) ([]byte, int64, error)

func (*Segment) Size

func (s *Segment) Size() (int64, error)

func (*Segment) Sync

func (s *Segment) Sync() error

func (*Segment) TrackEntry

func (s *Segment) TrackEntry(entryIndex uint64, offset int64)

func (*Segment) Write

func (s *Segment) Write(data []byte) (int, error)

type State

type State int
const (
	Initializing State = iota
	Ready
	Closed
)

type Status

type Status int
const (
	OK Status = iota
	Corrupted
)

type Transaction

type Transaction struct {
	ID        TransactionID
	State     TransactionState
	Entries   []Entry
	StartTime time.Time
	Timeout   time.Duration
	Batch     Batch
}

func (*Transaction) IsExpired

func (t *Transaction) IsExpired() bool

func (*Transaction) Reset

func (t *Transaction) Reset()

type TransactionID

type TransactionID string

type TransactionState

type TransactionState int
const (
	TransactionPending TransactionState = iota
	TransactionCommitted
	TransactionAborted
)

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

func (*WAL) AddToTransaction

func (w *WAL) AddToTransaction(txID TransactionID, entry Entry) result.Result[struct{}]

func (*WAL) Append

func (w *WAL) Append(entry Entry) result.Result[uint64]

func (*WAL) AppendWithContext

func (w *WAL) AppendWithContext(ctx context.Context, entry Entry) result.Result[uint64]

func (*WAL) BeginTransaction

func (w *WAL) BeginTransaction(timeout time.Duration) result.Result[TransactionID]

func (*WAL) CleanupExpiredTransactions

func (w *WAL) CleanupExpiredTransactions() int

func (*WAL) Close

func (w *WAL) Close() error

func (*WAL) CommitTransaction

func (w *WAL) CommitTransaction(txID TransactionID) result.Result[[]uint64]

func (*WAL) Get

func (w *WAL) Get(index uint64) result.Result[Entry]

func (*WAL) GetActiveTransactionCount

func (w *WAL) GetActiveTransactionCount() int

func (*WAL) GetFirstIndex

func (w *WAL) GetFirstIndex() uint64

func (*WAL) GetLastIndex

func (w *WAL) GetLastIndex() uint64

func (*WAL) GetRange

func (w *WAL) GetRange(start, end uint64) result.Result[[]Entry]

func (*WAL) GetRangeZeroCopy

func (w *WAL) GetRangeZeroCopy(start, end uint64) result.Result[[]Entry]

func (*WAL) GetTransactionState

func (w *WAL) GetTransactionState(txID TransactionID) result.Result[TransactionState]

func (*WAL) GetZeroCopy

func (w *WAL) GetZeroCopy(index uint64) result.Result[Entry]

func (*WAL) RollbackTransaction

func (w *WAL) RollbackTransaction(txID TransactionID) result.Result[struct{}]

func (*WAL) StartTransactionCleanup

func (w *WAL) StartTransactionCleanup()

func (*WAL) Truncate

func (w *WAL) Truncate(index uint64) result.Result[struct{}]

func (*WAL) WriteBatch

func (w *WAL) WriteBatch(entries []Entry) result.Result[[]uint64]

func (*WAL) WriteBatchWithContext

func (w *WAL) WriteBatchWithContext(ctx context.Context, entries []Entry) result.Result[[]uint64]

type ZeroCopyEncoder

type ZeroCopyEncoder interface {
	DecodeZeroCopy(data []byte) result.Result[Entry]
	EstimateSize(entry Entry) int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL