Documentation
¶
Overview ¶
Package walfs implements a Write-Ahead Log file system for durable, crash-recoverable storage.
Originally developed for unisondb, this package provides a general-purpose WAL that can be used by any database or system requiring durable append-only logging.
Index ¶
- Constants
- Variables
- func EncodeRecordPositionTo(pos RecordPosition, buf []byte) []byte
- func IsActive(flags uint32) bool
- func IsSealed(flags uint32) bool
- func SegmentFileName(dirPath string, extName string, id SegmentID) string
- func SegmentIndexFileName(dirPath string, extName string, id SegmentID) string
- func WithSegmentCustomMarker(marker uint32) func(*Segment)
- func WithSegmentCustomMarkerValidator(validator MarkerValidator) func(*Segment)
- func WithSegmentDirectorySyncer(syncer DirectorySyncer) func(*Segment)
- func WithSegmentSize(size int64) func(*Segment)
- func WithSyncOption(opt MsyncOption) func(*Segment)
- type DeletionPredicate
- type DirectorySyncFunc
- type DirectorySyncer
- type IndexEntry
- type MarkerValidator
- type MsyncOption
- type NoopDecoder
- type Reader
- type ReaderOption
- type RecordDecoder
- type RecordPosition
- type Segment
- func (seg *Segment) ClearIndexFromMemory()
- func (seg *Segment) Close() error
- func (seg *Segment) FirstLogIndex() uint64
- func (seg *Segment) GetEntryCount() int64
- func (seg *Segment) GetFlags() uint32
- func (seg *Segment) GetLastModifiedAt() int64
- func (seg *Segment) GetSegmentSize() int64
- func (seg *Segment) HasActiveReaders() bool
- func (seg *Segment) ID() SegmentID
- func (seg *Segment) IndexEntries() []SegmentIndexEntry
- func (seg *Segment) IsInMemorySealed() bool
- func (seg *Segment) IsSealed() bool
- func (seg *Segment) MSync() error
- func (seg *Segment) MarkForDeletion()
- func (seg *Segment) MarkSealedInMemory()
- func (seg *Segment) NewReader() *SegmentReader
- func (seg *Segment) Read(offset int64) ([]byte, RecordPosition, error)
- func (seg *Segment) Remove() error
- func (seg *Segment) SealSegment() error
- func (seg *Segment) Sync() error
- func (seg *Segment) TruncateTo(logIndex uint64) error
- func (seg *Segment) WaitForIndexFlush()
- func (seg *Segment) WillExceed(dataSize int) bool
- func (seg *Segment) Write(data []byte, logIndex uint64) (RecordPosition, error)
- func (seg *Segment) WriteBatch(records [][]byte, logIndexes []uint64) ([]RecordPosition, int, error)
- func (seg *Segment) WriteOffset() int64
- type SegmentHeader
- type SegmentID
- type SegmentIndexEntry
- type SegmentReader
- type ShardedIndex
- func (s *ShardedIndex) Clear()
- func (s *ShardedIndex) Delete(index uint64)
- func (s *ShardedIndex) DeleteRange(min, max uint64) int64
- func (s *ShardedIndex) Get(index uint64) (RecordPosition, bool)
- func (s *ShardedIndex) GetFirstLast() (first, last uint64, ok bool)
- func (s *ShardedIndex) IsCurrentEntry(index uint64, segmentID SegmentID, offset int64) bool
- func (s *ShardedIndex) Len() int64
- func (s *ShardedIndex) LenSlow() int64
- func (s *ShardedIndex) Range(fn func(index uint64, pos RecordPosition) bool)
- func (s *ShardedIndex) Set(index uint64, pos RecordPosition)
- func (s *ShardedIndex) SetBatch(entries []IndexEntry)
- type WALog
- func (wl *WALog) BackupLastRotatedSegment(backupDir string) (string, error)
- func (wl *WALog) BackupSegmentsAfter(afterID SegmentID, backupDir string) (map[SegmentID]string, error)
- func (wl *WALog) BytesPerSyncCallCount() int64
- func (wl *WALog) CleanupStalePendingSegments()
- func (wl *WALog) Close() error
- func (wl *WALog) Commit(pos RecordPosition)
- func (wl *WALog) CommittedPosition() RecordPosition
- func (wl *WALog) Current() *Segment
- func (wl *WALog) LogIndex() *ShardedIndex
- func (wl *WALog) MarkSegmentsForDeletion()
- func (wl *WALog) NewReader(opts ...ReaderOption) *Reader
- func (wl *WALog) NewReaderAfter(pos RecordPosition, opts ...ReaderOption) (*Reader, error)
- func (wl *WALog) NewReaderWithStart(pos RecordPosition, opts ...ReaderOption) (*Reader, error)
- func (wl *WALog) PositionForIndex(idx uint64) (RecordPosition, error)
- func (wl *WALog) QueuedSegmentsForDeletion() map[SegmentID]*Segment
- func (wl *WALog) Read(pos RecordPosition) ([]byte, error)
- func (wl *WALog) RotateSegment() error
- func (wl *WALog) SegmentRotatedCount() int64
- func (wl *WALog) Segments() map[SegmentID]*Segment
- func (wl *WALog) StartPendingSegmentCleaner(ctx context.Context, interval time.Duration, ...)
- func (wl *WALog) Sync() error
- func (wl *WALog) Truncate(logIndex uint64) error
- func (wl *WALog) Write(data []byte, logIndex uint64) (RecordPosition, error)
- func (wl *WALog) WriteBatch(records [][]byte, logIndexes []uint64) ([]RecordPosition, error)
- type WALogOptions
- func WithAutoCleanupPolicy(maxAge time.Duration, minSegments, maxSegments int, enable bool) WALogOptions
- func WithBytesPerSync(bytes int64) WALogOptions
- func WithClearIndexOnFlush() WALogOptions
- func WithCustomMarker(marker uint32) WALogOptions
- func WithCustomMarkerValidator(validator MarkerValidator) WALogOptions
- func WithDirectorySyncer(syncer DirectorySyncer) WALogOptions
- func WithMSyncEveryWrite(enabled bool) WALogOptions
- func WithMaxSegmentSize(size int64) WALogOptions
- func WithOnSegmentRotated(fn func()) WALogOptions
- func WithReaderCommitCheck() WALogOptions
Constants ¶
const ( StateOpen = iota StateClosing FlagActive uint32 = 1 << iota FlagSealed uint32 = 1 << 1 )
Variables ¶
var ( ErrClosed = errors.New("the Segment file is closed") ErrInvalidCRC = errors.New("invalid crc, the data may be corrupted") ErrCorruptHeader = errors.New("corrupt record header, invalid length") ErrIncompleteChunk = errors.New("incomplete or torn write detected at record trailer") ErrSegmentSealed = errors.New("cannot write to sealed segment") ErrSegmentReaderClosed = errors.New("segment reader is closed") ErrNoNewData = errors.New("no new data yet") ErrSegmentFull = errors.New("segment is full, cannot write more records") )
var ( ErrSegmentNotFound = errors.New("segment not found") ErrOffsetOutOfBounds = errors.New("start offset is beyond segment size") ErrOffsetBeforeHeader = errors.New("start offset is within reserved segment header") ErrFsync = errors.New("fsync error") ErrRecordTooLarge = errors.New("record size exceeds maximum segment capacity") )
var ( // NilRecordPosition is a sentinel value representing a nil RecordPosition. NilRecordPosition = RecordPosition{} )
Functions ¶
func EncodeRecordPositionTo ¶
func EncodeRecordPositionTo(pos RecordPosition, buf []byte) []byte
EncodeRecordPositionTo serializes a RecordPosition into the provided buffer. The buffer must be at least 12 bytes long. If it's shorter, a new 12-byte slice is allocated.
func SegmentFileName ¶
SegmentFileName returns the file name of a Segment file.
func SegmentIndexFileName ¶
SegmentIndexFileName returns the file name of the index for a segment.
func WithSegmentCustomMarker ¶
WithSegmentCustomMarker sets the 4-byte marker written for new segments. Use WithSegmentCustomMarkerValidator to validate stored markers on open.
func WithSegmentCustomMarkerValidator ¶
func WithSegmentCustomMarkerValidator(validator MarkerValidator) func(*Segment)
WithSegmentCustomMarkerValidator sets a validator for stored markers when opening segments.
func WithSegmentDirectorySyncer ¶
func WithSegmentDirectorySyncer(syncer DirectorySyncer) func(*Segment)
WithSegmentDirectorySyncer sets the directory syncer used after destructive operations.
func WithSegmentSize ¶
WithSegmentSize sets the size for the Segment.
func WithSyncOption ¶
func WithSyncOption(opt MsyncOption) func(*Segment)
WithSyncOption sets the sync option for the Segment.
Types ¶
type DeletionPredicate ¶
DeletionPredicate is a function that determines if a segment ID is safe to delete.
type DirectorySyncFunc ¶
DirectorySyncFunc adapts a function to act as a DirectorySyncer.
func (DirectorySyncFunc) SyncDir ¶
func (f DirectorySyncFunc) SyncDir(dir string) error
SyncDir implements DirectorySyncer.
type DirectorySyncer ¶
DirectorySyncer syncs a directory path to stable storage.
type IndexEntry ¶
type IndexEntry struct {
Index uint64
Pos RecordPosition
}
IndexEntry represents an index to position mapping for batch operations.
type MarkerValidator ¶
type MsyncOption ¶
type MsyncOption int
const ( // MsyncNone skips msync after write. MsyncNone MsyncOption = iota // MsyncOnWrite calls msync (Flush) after every write. MsyncOnWrite )
type NoopDecoder ¶
type NoopDecoder struct{}
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader represents a high-level sequential reader over a WALog. It reads across all available WAL segments in order, automatically advancing from one segment to the next. Reader is not safe for concurrent use.
func (*Reader) Close ¶
func (r *Reader) Close()
Close closes all segment readers to release their references. IMPORTANT: This method MUST be called after the Reader is no longer needed.
func (*Reader) LastRecordPosition ¶
func (r *Reader) LastRecordPosition() RecordPosition
LastRecordPosition returns the RecordPosition of the last successfully read entry.
func (*Reader) Next ¶
func (r *Reader) Next() ([]byte, RecordPosition, error)
Next returns the next available WAL record data and its current position. IMPORTANT: The returned `[]byte` is a slice of a memory-mapped file, so data must not be retained or modified. If the data needs to be used beyond the lifetime of the segment, the caller MUST copy it. When readerCommitCheck is enabled on the WALog, returns ErrNoNewData if the reader has reached the committed boundary.
type ReaderOption ¶
type ReaderOption func(*Reader)
ReaderOption configures a Reader.
func WithDecoder ¶
func WithDecoder(decoder RecordDecoder) ReaderOption
WithDecoder sets a custom decoder for the reader. When set, NextRecord() will use this decoder to convert raw bytes to decoded data.
type RecordDecoder ¶
type RecordDecoder interface {
// Decode transforms raw WAL bytes into the actual record payload.
// The returned bytes may reference the input (zero-copy) or be newly allocated.
Decode(data []byte) ([]byte, error)
}
- Wrapped: WAL bytes contain a wrapper (e.g., Raft log) that must be unwrapped.
type RecordPosition ¶
RecordPosition is the logical location of a record entry within a WAL Segment.
func DecodeRecordPosition ¶
func DecodeRecordPosition(data []byte) (RecordPosition, error)
DecodeRecordPosition deserializes a byte slice into a RecordPosition.
func (RecordPosition) Encode ¶
func (rp RecordPosition) Encode() []byte
Encode serializes the RecordPosition into a fixed-length byte slice.
func (RecordPosition) IsZero ¶
func (rp RecordPosition) IsZero() bool
IsZero returns true if the RecordPosition is uninitialized, meaning both SegmentID and Offset are zero.
func (RecordPosition) String ¶
func (rp RecordPosition) String() string
type Segment ¶
type Segment struct {
// contains filtered or unexported fields
}
Segment represents a single WAL segment backed by a memory-mapped file.
func OpenSegmentFile ¶
OpenSegmentFile opens an existing segment file or creates a new one if not present. If the segment file is sealed, its content is not scanned while opening.
func (*Segment) ClearIndexFromMemory ¶
func (seg *Segment) ClearIndexFromMemory()
ClearIndexFromMemory releases the in-memory index entries to free memory. This can be called after the index has been copied to an external data structure. Note: After calling this, IndexEntries() will return an empty slice. Only sealed segments can be cleared; active segments are ignored.
func (*Segment) Close ¶
Close gracefully shuts down the segment by waiting for all active readers to complete. It unmaps the segment file and closes the file descriptor.
func (*Segment) FirstLogIndex ¶
func (*Segment) GetEntryCount ¶
GetEntryCount returns the total entry count in segment.
func (*Segment) GetLastModifiedAt ¶
GetLastModifiedAt returns the last modified time of the segment.
func (*Segment) GetSegmentSize ¶
func (*Segment) HasActiveReaders ¶
HasActiveReaders returns true if there are any currently active readers on the segment.
func (*Segment) IndexEntries ¶
func (seg *Segment) IndexEntries() []SegmentIndexEntry
IndexEntries returns a copy of the index metadata for this segment.
func (*Segment) IsInMemorySealed ¶
IsInMemorySealed returns true if the segment has been marked as sealed in memory.
func (*Segment) MarkForDeletion ¶
func (seg *Segment) MarkForDeletion()
MarkForDeletion marks the segment as a candidate for deletion. If there are no active readers, it will immediately call cleanup. Otherwise, cleanup will be deferred until the last reference is released.
func (*Segment) MarkSealedInMemory ¶
func (seg *Segment) MarkSealedInMemory()
MarkSealedInMemory marks the segment as sealed in memory.
func (*Segment) NewReader ¶
func (seg *Segment) NewReader() *SegmentReader
NewReader creates a new SegmentReader for reading from the segment.
func (*Segment) Read ¶
func (seg *Segment) Read(offset int64) ([]byte, RecordPosition, error)
Read reads the record data at the specified offset within the segment. IMPORTANT: Do not retain the returned data. This method returns a slice of the mmap'd file content corresponding to the record payload, so the slice becomes invalid immediately after the segment is closed or unmapped.
func (*Segment) Remove ¶
Remove closes the segment and removes its underlying files (segment and index).
func (*Segment) SealSegment ¶
SealSegment seals the given segment.
func (*Segment) TruncateTo ¶
TruncateTo truncates the segment to the specified log index. All entries after the given log index will be discarded. If the log index is not found in this segment, it returns an error.
func (*Segment) WaitForIndexFlush ¶
func (seg *Segment) WaitForIndexFlush()
WaitForIndexFlush blocks until any pending index flush operations complete.
func (*Segment) WillExceed ¶
WillExceed returns true if writing a record of the given dataSize would overflow the segment's allocated (memory-mapped) size.
func (*Segment) Write ¶
func (seg *Segment) Write(data []byte, logIndex uint64) (RecordPosition, error)
Write writes the provided slice of bytes to the open mmap file. It appends data to the segment and returns the offset where the record was written in the given segment.
func (*Segment) WriteBatch ¶
func (seg *Segment) WriteBatch(records [][]byte, logIndexes []uint64) ([]RecordPosition, int, error)
WriteBatch writes multiple records to the segment in a single operation. Returns a slice of RecordPositions for successfully written records and the number written. If the segment fills up mid-batch, it returns positions for records that fit, the count of records written, and ErrSegmentFull. Callers should retry remaining records in a new segment.
func (*Segment) WriteOffset ¶
WriteOffset returns the current write offset of the segment.
type SegmentHeader ¶
type SegmentHeader struct {
// at 0
Magic uint32
// at 4
Version uint32
// at 8
CreatedAt int64
// at 16
LastModifiedAt int64
// at 24
WriteOffset int64
// at 32
EntryCount int64
// at 40
Flags uint32
// at 44 -51
FirstLogIndex uint64
// at 56 byte: - CRC32 of first 56 bytes
CRC uint32
// contains filtered or unexported fields
}
SegmentHeader encodes all the necessary information about the segment file at the top of the file. Its size is 64 bytes once encoded.
type SegmentIndexEntry ¶
SegmentIndexEntry exposes a record's physical location within a WAL segment.
type SegmentReader ¶
type SegmentReader struct {
// contains filtered or unexported fields
}
SegmentReader is an iterator over records in a WAL segment. It maintains its own read offset and provides safe iteration over a Segment.
func (*SegmentReader) Close ¶
func (r *SegmentReader) Close()
Close closes the SegmentReader and decrements the segment's reference count.
func (*SegmentReader) LastRecordPosition ¶
func (r *SegmentReader) LastRecordPosition() RecordPosition
func (*SegmentReader) Next ¶
func (r *SegmentReader) Next() ([]byte, RecordPosition, error)
Next reads the next record from the segment and also advances the read position. It returns the data, the record's position, or an error. Returns io.EOF if the segment is sealed and all data has been read. Returns ErrNoNewData if unsealed and no new data is available yet.
type ShardedIndex ¶
type ShardedIndex struct {
// contains filtered or unexported fields
}
ShardedIndex is a concurrent-safe sharded map for Raft index → WAL position.
func NewShardedIndex ¶
func NewShardedIndex() *ShardedIndex
NewShardedIndex creates a sharded index. The function takes no arguments, so the number of shards is chosen by the package rather than specified by the caller.
func (*ShardedIndex) Clear ¶
func (s *ShardedIndex) Clear()
Clear removes all entries from all shards.
func (*ShardedIndex) Delete ¶
func (s *ShardedIndex) Delete(index uint64)
Delete removes an index from the map.
func (*ShardedIndex) DeleteRange ¶
func (s *ShardedIndex) DeleteRange(min, max uint64) int64
DeleteRange removes all indices in [min, max] range.
func (*ShardedIndex) Get ¶
func (s *ShardedIndex) Get(index uint64) (RecordPosition, bool)
Get retrieves the position for an index. Returns the position and true if found, zero value and false otherwise.
func (*ShardedIndex) GetFirstLast ¶
func (s *ShardedIndex) GetFirstLast() (first, last uint64, ok bool)
GetFirstLast returns the first and last indices in the index. Returns (0, 0, false) if the index is empty.
func (*ShardedIndex) IsCurrentEntry ¶
func (s *ShardedIndex) IsCurrentEntry(index uint64, segmentID SegmentID, offset int64) bool
IsCurrentEntry checks if the given position is the current one for an index. Returns true if the index exists and the position matches exactly.
func (*ShardedIndex) Len ¶
func (s *ShardedIndex) Len() int64
Len returns the total number of entries across all shards.
func (*ShardedIndex) LenSlow ¶
func (s *ShardedIndex) LenSlow() int64
LenSlow returns the total number of entries by summing all shards.
func (*ShardedIndex) Range ¶
func (s *ShardedIndex) Range(fn func(index uint64, pos RecordPosition) bool)
Range iterates over all entries and calls fn for each. If fn returns false, iteration stops.
func (*ShardedIndex) Set ¶
func (s *ShardedIndex) Set(index uint64, pos RecordPosition)
Set stores the position for an index.
func (*ShardedIndex) SetBatch ¶
func (s *ShardedIndex) SetBatch(entries []IndexEntry)
SetBatch stores multiple index→position mappings.
type WALog ¶
type WALog struct {
// contains filtered or unexported fields
}
WALog manages the lifecycle of the individual segments, including creation, rotation, recovery, and read/write operations.
func NewWALog ¶
func NewWALog(dir string, ext string, opts ...WALogOptions) (*WALog, error)
NewWALog returns an initialized WALog that manages the segments in the provided dir with the given ext.
func (*WALog) BackupLastRotatedSegment ¶
BackupLastRotatedSegment copies the most recently sealed WAL segment into backupDir and returns the backup path.
func (*WALog) BackupSegmentsAfter ¶
func (wl *WALog) BackupSegmentsAfter(afterID SegmentID, backupDir string) (map[SegmentID]string, error)
BackupSegmentsAfter copies every sealed segment with ID > afterID into backupDir. Returns a map of SegmentID to the backup file path.
func (*WALog) BytesPerSyncCallCount ¶
BytesPerSyncCallCount returns how many times the bytes-per-sync sync was triggered on the current active segment.
func (*WALog) CleanupStalePendingSegments ¶
func (wl *WALog) CleanupStalePendingSegments()
CleanupStalePendingSegments scans pendingDeletion and segments maps. If a segment's file no longer exists on disk, it removes those entries from both maps.
func (*WALog) Commit ¶
func (wl *WALog) Commit(pos RecordPosition)
Commit advances the committed position to the given position. When readerCommitCheck is enabled, readers cannot advance beyond this position. In Raft mode, call this after Raft consensus confirms the log entry. This method is safe for concurrent use.
func (*WALog) CommittedPosition ¶
func (wl *WALog) CommittedPosition() RecordPosition
CommittedPosition returns the current committed position. Returns NilRecordPosition if no position has been committed yet.
func (*WALog) LogIndex ¶
func (wl *WALog) LogIndex() *ShardedIndex
LogIndex returns the shared sharded index mapping log index to record position.
func (*WALog) MarkSegmentsForDeletion ¶
func (wl *WALog) MarkSegmentsForDeletion()
MarkSegmentsForDeletion identifies and queues WAL segments for deletion based on their age and segment count retention constraints.
func (*WALog) NewReader ¶
func (wl *WALog) NewReader(opts ...ReaderOption) *Reader
NewReader returns a new Reader that sequentially reads all segments in the WALog, starting from the beginning (lowest SegmentID).
func (*WALog) NewReaderAfter ¶
func (wl *WALog) NewReaderAfter(pos RecordPosition, opts ...ReaderOption) (*Reader, error)
NewReaderAfter returns a reader that starts after the given RecordPosition. It first creates a reader from that position, then skips one record.
func (*WALog) NewReaderWithStart ¶
func (wl *WALog) NewReaderWithStart(pos RecordPosition, opts ...ReaderOption) (*Reader, error)
NewReaderWithStart returns a new Reader that begins reading from the specified position. If SegmentID is 0, the reader will begin from the very start of the WAL.
func (*WALog) PositionForIndex ¶
func (wl *WALog) PositionForIndex(idx uint64) (RecordPosition, error)
PositionForIndex returns the RecordPosition for the given log index.
func (*WALog) QueuedSegmentsForDeletion ¶
func (*WALog) Read ¶
func (wl *WALog) Read(pos RecordPosition) ([]byte, error)
Read returns the data from the provided record position if found. IMPORTANT: The returned `[]byte` is a slice of a memory-mapped file, so data must not be retained or modified. If the data needs to be used beyond the lifetime of the segment, the caller MUST copy it.
func (*WALog) RotateSegment ¶
RotateSegment rotates the current segment and creates a new active segment.
func (*WALog) SegmentRotatedCount ¶
func (*WALog) Segments ¶
Segments returns a snapshot (shallow) copy of all active segments managed by the WAL.
func (*WALog) StartPendingSegmentCleaner ¶
func (wl *WALog) StartPendingSegmentCleaner(ctx context.Context, interval time.Duration, canDeleteFn func(segID SegmentID) bool, )
StartPendingSegmentCleaner starts a background goroutine that periodically inspects segments marked for pending deletion and attempts to safely remove them. If a segment still has any current readers, it will be marked for deletion.
func (*WALog) Truncate ¶
Truncate truncates the WAL to the specified log index. All entries after the given log index will be discarded. If logIndex is 0, all segments are deleted and the WAL is reset.
Users must ensure that reader creation and advancement are not interfering with the truncation logic. Active readers on segments that need to be deleted will cause the Truncate operation to fail.
func (*WALog) Write ¶
func (wl *WALog) Write(data []byte, logIndex uint64) (RecordPosition, error)
Write appends the given data as a new record to the active segment. It returns RecordPosition indicating where the data was written.
func (*WALog) WriteBatch ¶
func (wl *WALog) WriteBatch(records [][]byte, logIndexes []uint64) ([]RecordPosition, error)
WriteBatch appends multiple records to the active segment in a single batched operation. If the batch cannot fit entirely in the current segment, it handles rotation automatically. Returns a slice of RecordPositions for all successfully written records.
type WALogOptions ¶
type WALogOptions func(*WALog)
func WithAutoCleanupPolicy ¶
func WithAutoCleanupPolicy(maxAge time.Duration, minSegments, maxSegments int, enable bool) WALogOptions
WithAutoCleanupPolicy configures the automatic segment cleanup policy for the WAL. maxAge: Segments older than this duration are eligible for deletion. minSegments: Minimum number of WAL segments to always retain, regardless of age. maxSegments: If the total number of segments exceeds this limit, older segments will be deleted irrespective of their age.
func WithBytesPerSync ¶
func WithBytesPerSync(bytes int64) WALogOptions
WithBytesPerSync sets the threshold in bytes after which an msync is triggered. Useful for batching writes. A value of 0 disables this feature.
func WithClearIndexOnFlush ¶
func WithClearIndexOnFlush() WALogOptions
WithClearIndexOnFlush enables clearing segment's in-memory index after it's flushed to disk. This is useful when an external index is maintained.
func WithCustomMarker ¶
func WithCustomMarker(marker uint32) WALogOptions
WithCustomMarker sets the 4-byte marker written to new segments.
func WithCustomMarkerValidator ¶
func WithCustomMarkerValidator(validator MarkerValidator) WALogOptions
WithCustomMarkerValidator sets a validator for stored markers on open.
func WithDirectorySyncer ¶
func WithDirectorySyncer(syncer DirectorySyncer) WALogOptions
WithDirectorySyncer overrides the directory syncer used for new segment files.
func WithMSyncEveryWrite ¶
func WithMSyncEveryWrite(enabled bool) WALogOptions
WithMSyncEveryWrite enables msync() after every write operation.
func WithMaxSegmentSize ¶
func WithMaxSegmentSize(size int64) WALogOptions
WithMaxSegmentSize sets the maximum size of the segment file.
func WithOnSegmentRotated ¶
func WithOnSegmentRotated(fn func()) WALogOptions
WithOnSegmentRotated registers a fn callback function that will be called immediately after a WAL segment is rotated.
func WithReaderCommitCheck ¶
func WithReaderCommitCheck() WALogOptions
WithReaderCommitCheck enables commit offset checking for readers. When enabled, readers will check against the committed position before advancing. If the reader's current position is at or beyond the committed position, Next() returns ErrNoNewData without advancing. Use Commit() to advance the committed position. This is used in Raft mode where writes are not visible until committed.