wal

package
v0.1.2 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 10, 2026 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

Package wal implements the Write-Ahead Log engine for BubbleFish Nexus.

Every payload is written to the WAL, with a CRC32 checksum and an fsync, BEFORE it enters the queue. The database is NEVER written to directly; all writes go through the queue. Temp files for WAL operations MUST be created in filepath.Dir(wal.path), NEVER in os.TempDir().
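The temp-file rule matters because os.Rename can only replace a file atomically when the source and target sit on the same filesystem, and os.TempDir() is frequently a different mount (often tmpfs). The sketch below shows the usual write-temp, fsync, rename pattern for rewriting a segment file. It is an illustration under that assumption, not the package's code; rewriteAtomically, demo, and the segment filename are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// rewriteAtomically replaces the file at path with data. The temp file is
// created in filepath.Dir(path), never os.TempDir(), so that os.Rename stays
// within one directory (and therefore one filesystem) and can swap the file in
// atomically. A cross-filesystem rename would fail instead.
func rewriteAtomically(path string, data []byte) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+".tmp-*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	// Clean up on every error path; after a successful rename the temp name no
	// longer exists, so this Remove fails harmlessly and is ignored.
	defer os.Remove(tmpName)

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	// Make the new contents durable before the rename publishes them.
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	// A production version would typically also fsync the directory so the
	// rename itself survives a crash; omitted here for portability.
	return os.Rename(tmpName, path)
}

// demo rewrites a throwaway segment and reports its new contents plus the
// number of files left in the directory (1 means no temp file leaked).
func demo() (string, int, error) {
	// A scratch directory stands in for the WAL directory in this demo only.
	dir, err := os.MkdirTemp("", "wal-demo-")
	if err != nil {
		return "", 0, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "segment-000001.wal") // hypothetical segment name
	if err := os.WriteFile(path, []byte("old\n"), 0o600); err != nil {
		return "", 0, err
	}
	if err := rewriteAtomically(path, []byte("new\n")); err != nil {
		return "", 0, err
	}
	got, err := os.ReadFile(path)
	if err != nil {
		return "", 0, err
	}
	entries, err := os.ReadDir(dir)
	if err != nil {
		return "", 0, err
	}
	return string(got), len(entries), nil
}

func main() {
	got, files, err := demo()
	fmt.Printf("%q files=%d err=%v\n", got, files, err)
}
```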

Index

Constants

const (
	// IntegrityModeCRC32 is the default integrity mode. Each WAL line is
	// JSON_BYTES<TAB>CRC32_HEX<NEWLINE>. No HMAC overhead.
	IntegrityModeCRC32 = "crc32"

	// IntegrityModeMAC adds HMAC-SHA256 tamper detection on top of CRC32.
	// Each WAL line is JSON_BYTES<TAB>CRC32_HEX<TAB>HMAC_HEX<NEWLINE>.
	// Reference: Tech Spec Section 4.1, Section 6.4.1.
	IntegrityModeMAC = "mac"
)
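The two line layouts above can be made concrete with a small encoder/verifier. This is a sketch under stated assumptions: the docs do not say which CRC32 polynomial, hex case, or HMAC input is used, so the code assumes the IEEE polynomial, 8 lowercase hex digits, and an HMAC computed over the JSON bytes only. encodeLine, decodeLine, and the error values are hypothetical names.

```go
package main

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"hash/crc32"
)

var (
	errFormat = errors.New("malformed WAL line")
	errCRC    = errors.New("crc32 mismatch")
	errMAC    = errors.New("hmac mismatch")
)

// crcHex formats the IEEE CRC32 of b as 8 lowercase hex digits (assumed encoding).
func crcHex(b []byte) string { return fmt.Sprintf("%08x", crc32.ChecksumIEEE(b)) }

// macOf computes HMAC-SHA256 over the JSON bytes only (assumed coverage).
func macOf(key, b []byte) []byte {
	m := hmac.New(sha256.New, key)
	m.Write(b)
	return m.Sum(nil)
}

// encodeLine builds JSON<TAB>CRC32_HEX<NEWLINE>, or, with a non-nil key,
// JSON<TAB>CRC32_HEX<TAB>HMAC_HEX<NEWLINE>.
func encodeLine(jsonBytes, key []byte) []byte {
	var b bytes.Buffer
	b.Write(jsonBytes)
	b.WriteString("\t" + crcHex(jsonBytes))
	if key != nil {
		b.WriteString("\t" + hex.EncodeToString(macOf(key, jsonBytes)))
	}
	b.WriteByte('\n')
	return b.Bytes()
}

// decodeLine checks the CRC first, then (in mac mode) the HMAC, and returns
// the JSON bytes. Compact JSON from encoding/json never contains a raw tab,
// so splitting on tabs is unambiguous.
func decodeLine(line, key []byte) ([]byte, error) {
	fields := bytes.Split(bytes.TrimSuffix(line, []byte("\n")), []byte("\t"))
	want := 2
	if key != nil {
		want = 3
	}
	if len(fields) != want {
		return nil, errFormat
	}
	payload := fields[0]
	if string(fields[1]) != crcHex(payload) {
		return nil, errCRC
	}
	if key != nil {
		got, err := hex.DecodeString(string(fields[2]))
		if err != nil {
			return nil, errFormat
		}
		// Constant-time comparison avoids leaking how many bytes matched.
		if !hmac.Equal(got, macOf(key, payload)) {
			return nil, errMAC
		}
	}
	return payload, nil
}

func main() {
	key := []byte("example-key-32-bytes-long-000000") // hypothetical key
	line := encodeLine([]byte(`{"payload_id":"p1"}`), key)
	payload, err := decodeLine(line, key)
	fmt.Printf("%q\n%s %v\n", line, payload, err)
}
```

A CRC alone catches accidental corruption but not deliberate edits, since anyone can recompute it; the keyed HMAC is what makes tampering detectable without the key.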
const (
	// StatusPending marks an entry that has not yet been delivered.
	StatusPending = "PENDING"
	// StatusDelivered marks an entry that was successfully written to the destination.
	StatusDelivered = "DELIVERED"
	// StatusPermanentFailure marks an entry that cannot be retried.
	StatusPermanentFailure = "PERMANENT_FAILURE"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Entry

type Entry struct {
	Version        int             `json:"version"`
	PayloadID      string          `json:"payload_id"`
	IdempotencyKey string          `json:"idempotency_key"`
	Status         string          `json:"status"`
	Timestamp      time.Time       `json:"timestamp"`
	Source         string          `json:"source"`
	Destination    string          `json:"destination"`
	Subject        string          `json:"subject"`
	ActorType      string          `json:"actor_type"`
	ActorID        string          `json:"actor_id"`
	Payload        json.RawMessage `json:"payload"`
}

Entry is a single WAL record. All fields except Payload are indexed at the WAL layer for routing and status tracking. Payload holds the full TranslatedPayload as raw JSON to avoid re-encoding on replay.
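To see what "raw JSON to avoid re-encoding" means on the wire, the sketch below copies the exported Entry fields into a standalone program and marshals one record. Because Payload is a json.RawMessage, encoding/json validates and compacts it but never decodes it into Go values. All field values are illustrative, and sampleEntry and wireFields are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Entry copies the exported fields of wal.Entry so this sketch compiles on its own.
type Entry struct {
	Version        int             `json:"version"`
	PayloadID      string          `json:"payload_id"`
	IdempotencyKey string          `json:"idempotency_key"`
	Status         string          `json:"status"`
	Timestamp      time.Time       `json:"timestamp"`
	Source         string          `json:"source"`
	Destination    string          `json:"destination"`
	Subject        string          `json:"subject"`
	ActorType      string          `json:"actor_type"`
	ActorID        string          `json:"actor_id"`
	Payload        json.RawMessage `json:"payload"`
}

// sampleEntry builds an entry with illustrative values. Version and Status are
// left zero because Append overwrites them.
func sampleEntry() Entry {
	return Entry{
		PayloadID:      "pl-123",
		IdempotencyKey: "idem-abc",
		Timestamp:      time.Date(2026, 4, 10, 12, 0, 0, 0, time.UTC),
		Source:         "example-source",
		Destination:    "example-destination",
		Subject:        "user-1",
		ActorType:      "agent",
		ActorID:        "agent-7",
		Payload:        json.RawMessage(`{"content":"hello"}`),
	}
}

// wireFields marshals e and pulls back the raw "payload" and "payload_id"
// values exactly as they appear in the encoded JSON.
func wireFields(e Entry) (payload, payloadID string, err error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", "", err
	}
	var m map[string]json.RawMessage
	if err := json.Unmarshal(b, &m); err != nil {
		return "", "", err
	}
	return string(m["payload"]), string(m["payload_id"]), nil
}

func main() {
	b, err := json.Marshal(sampleEntry())
	fmt.Println(string(b), err)
}
```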

type Option

type Option func(*WAL)

Option configures a WAL instance. Pass to Open.

func WithEncryption

func WithEncryption(key []byte) Option

WithEncryption configures WAL at-rest encryption with the provided key. The key should be 32 bytes (AES-256). Encryption support is a future phase; this option is accepted but not yet implemented. Reference: Tech Spec Section 6.4.2.

func WithIntegrity

func WithIntegrity(mode string, key []byte) Option

WithIntegrity configures WAL integrity mode. When mode is "mac", key must be a non-empty HMAC-SHA256 key (typically 32 bytes). The key is stored once and never re-read. Reference: Tech Spec Section 6.4.1.
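Option is a functional-options type, and WithIntegrity returns no error, so presumably the "mac requires a key" rule is enforced when Open applies the options. The sketch below mirrors that shape on a hypothetical walConfig struct (the real fields are unexported and undocumented); option, withIntegrity, and newConfig are illustrative names, and the crc32 default comes from the IntegrityModeCRC32 comment above. Copying the key reflects "stored once": later changes to the caller's slice cannot alter it.

```go
package main

import (
	"crypto/rand"
	"errors"
	"fmt"
)

// walConfig is a hypothetical stand-in for the WAL's unexported fields.
type walConfig struct {
	integrityMode string
	macKey        []byte
}

type option func(*walConfig)

// withIntegrity records the mode and a private copy of the key.
func withIntegrity(mode string, key []byte) option {
	return func(c *walConfig) {
		c.integrityMode = mode
		c.macKey = append([]byte(nil), key...)
	}
}

// newConfig applies options over the defaults, then validates the result.
func newConfig(opts ...option) (*walConfig, error) {
	c := &walConfig{integrityMode: "crc32"}
	for _, o := range opts {
		o(c)
	}
	switch c.integrityMode {
	case "crc32":
	case "mac":
		if len(c.macKey) == 0 {
			return nil, errors.New(`integrity mode "mac" requires a non-empty key`)
		}
	default:
		return nil, fmt.Errorf("unknown integrity mode %q", c.integrityMode)
	}
	return c, nil
}

func main() {
	// A 32-byte random key, the typical HMAC-SHA256 key size.
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	cfg, err := newConfig(withIntegrity("mac", key))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.integrityMode, len(cfg.macKey))
}
```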

func WithSecurityEvent

func WithSecurityEvent(fn SecurityEventFunc) Option

WithSecurityEvent registers a callback invoked on security events such as wal_tamper_detected. The callback is invoked synchronously during replay; it must not block.

type SecurityEventFunc

type SecurityEventFunc func(eventType string, attrs ...slog.Attr)

SecurityEventFunc is a callback invoked when a security-relevant event occurs inside the WAL (e.g. HMAC mismatch indicating tampering). The attrs slice contains structured fields for the event.
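Since the callback runs synchronously inside Replay and must not block, one common approach is a non-blocking send into a buffered channel that a separate goroutine drains. The sketch assumes that approach; newForwarder and securityEvent are hypothetical, and the "segment" attribute is illustrative. When the buffer is full the event is dropped and counted rather than stalling replay.

```go
package main

import (
	"fmt"
	"log/slog"
	"sync/atomic"
)

// SecurityEventFunc matches the documented callback signature.
type SecurityEventFunc func(eventType string, attrs ...slog.Attr)

// securityEvent is a hypothetical envelope for queued events.
type securityEvent struct {
	Type  string
	Attrs []slog.Attr
}

// newForwarder returns a callback that never blocks: it tries a buffered
// channel send and, if the buffer is full, drops the event and counts the drop.
// A separate goroutine (not shown) would drain the channel to alerting.
func newForwarder(buf int) (SecurityEventFunc, <-chan securityEvent, func() int64) {
	ch := make(chan securityEvent, buf)
	var dropped atomic.Int64
	fn := func(eventType string, attrs ...slog.Attr) {
		// Copy attrs in case the caller reuses its slice after we return.
		ev := securityEvent{Type: eventType, Attrs: append([]slog.Attr(nil), attrs...)}
		select {
		case ch <- ev:
		default:
			dropped.Add(1)
		}
	}
	return fn, ch, dropped.Load
}

func main() {
	fn, events, dropped := newForwarder(1)
	fn("wal_tamper_detected", slog.String("segment", "segment-000001.wal"))
	fn("wal_tamper_detected") // buffer already full: dropped, caller not blocked
	ev := <-events
	fmt.Println(ev.Type, len(ev.Attrs), dropped())
}
```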

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL is the write-ahead log engine. All state is held in struct fields; there are no package-level variables.

func Open

func Open(dir string, maxSizeMB int64, logger *slog.Logger, opts ...Option) (*WAL, error)

Open opens or creates a WAL rooted at dir. maxSizeMB sets the segment size, in megabytes, at which the WAL rotates to a new segment (default 50). Open panics if logger is nil. Options configure the integrity mode, encryption, and security event callbacks.
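The rotation rule can be sketched as a size check before each write. The docs only state the default of 50, so the details here are assumptions: non-positive maxSizeMB falls back to 50, 1 MB means 1<<20 bytes, and an empty segment always accepts a line so one oversized record cannot cause endless rotation. maxSegmentBytes and shouldRotate are hypothetical helpers.

```go
package main

import "fmt"

// defaultMaxSizeMB mirrors the documented default of 50.
const defaultMaxSizeMB = 50

// maxSegmentBytes converts maxSizeMB to bytes. Treating non-positive values as
// "use the default" and using 1 MB = 1<<20 bytes are assumptions.
func maxSegmentBytes(maxSizeMB int64) int64 {
	if maxSizeMB <= 0 {
		maxSizeMB = defaultMaxSizeMB
	}
	return maxSizeMB << 20
}

// shouldRotate reports whether writing a line of lineLen bytes would push the
// current segment (curSize bytes) past the limit. An empty segment always
// accepts the line, so one oversized record cannot trigger endless rotation.
func shouldRotate(curSize, lineLen, maxSizeMB int64) bool {
	return curSize > 0 && curSize+lineLen > maxSegmentBytes(maxSizeMB)
}

func main() {
	fmt.Println(maxSegmentBytes(0))             // 52428800
	fmt.Println(shouldRotate(1<<20-10, 20, 1)) // true
}
```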

func (*WAL) Append

func (w *WAL) Append(entry Entry) error

Append writes entry to the WAL. The entry status is forced to PENDING and version is set to walVersion. CRC32 is computed over the JSON bytes and appended after a tab before the newline. fsync is called before returning.

On any failure the caller must return an HTTP 500 to the client. The WAL invariant is: if Append returns nil, the entry is durable on disk.
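The Append contract (write, fsync, and only then report success) and the matching "return a 500" rule for callers fit together as in this sketch. appender is a hypothetical, stripped-down writer that uses the crc32-mode line layout; the ingest handler, the /ingest route, and the 200 success status are illustrative assumptions, not the package's HTTP API.

```go
package main

import (
	"encoding/json"
	"fmt"
	"hash/crc32"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"strings"
	"sync"
)

// appender is a hypothetical, stripped-down segment writer.
type appender struct {
	mu sync.Mutex
	f  *os.File
}

// Append encodes v, adds the CRC32 suffix, writes the line, and calls fsync
// before returning, so a nil error means the line reached stable storage.
func (a *appender) Append(v any) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	line := fmt.Appendf(b, "\t%08x\n", crc32.ChecksumIEEE(b))
	a.mu.Lock()
	defer a.mu.Unlock()
	if _, err := a.f.Write(line); err != nil {
		return err
	}
	return a.f.Sync()
}

// ingest is a hypothetical HTTP handler: anything short of a durable append
// is reported as HTTP 500 so the client knows to retry.
func ingest(w *appender) http.HandlerFunc {
	return func(rw http.ResponseWriter, r *http.Request) {
		var payload map[string]any
		if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
			http.Error(rw, "invalid JSON", http.StatusBadRequest)
			return
		}
		if err := w.Append(payload); err != nil {
			http.Error(rw, "write-ahead log append failed", http.StatusInternalServerError)
			return
		}
		rw.WriteHeader(http.StatusOK)
	}
}

// demo returns the status codes for one request against a healthy file and
// one after the file is closed, which makes Write fail.
func demo() (int, int, error) {
	dir, err := os.MkdirTemp("", "wal-append-")
	if err != nil {
		return 0, 0, err
	}
	defer os.RemoveAll(dir)
	f, err := os.OpenFile(filepath.Join(dir, "segment.wal"), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600)
	if err != nil {
		return 0, 0, err
	}
	h := ingest(&appender{f: f})

	send := func() int {
		rec := httptest.NewRecorder()
		h(rec, httptest.NewRequest(http.MethodPost, "/ingest", strings.NewReader(`{"content":"hi"}`)))
		return rec.Code
	}
	ok := send()
	f.Close() // simulate a failing disk
	return ok, send(), nil
}

func main() {
	ok, failed, err := demo()
	fmt.Println(ok, failed, err)
}
```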

func (*WAL) CRCFailures

func (w *WAL) CRCFailures() int64

CRCFailures returns the total number of CRC32 mismatches encountered during Replay calls. This counter is exposed to Prometheus in Phase 0D via bubblefish_wal_crc_failures_total.
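Counters like CRCFailures and IntegrityFailures are read by metrics scrapes while replay may be incrementing them, so a lock-free atomic is a natural fit. The sketch assumes sync/atomic's Int64 (Go 1.19+); replayStats and simulate are hypothetical. A Prometheus exporter could wrap such a getter with client_golang's NewCounterFunc, though how the package actually wires bubblefish_wal_crc_failures_total is not documented here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// replayStats is a hypothetical holder for the failure counters. atomic.Int64
// lets replay goroutines increment while a metrics scrape reads, without a mutex.
type replayStats struct {
	crcFailures       atomic.Int64
	integrityFailures atomic.Int64
}

func (s *replayStats) CRCFailures() int64       { return s.crcFailures.Load() }
func (s *replayStats) IntegrityFailures() int64 { return s.integrityFailures.Load() }

// simulate has workers goroutines each record perWorker CRC mismatches
// concurrently, then returns the final total.
func simulate(workers, perWorker int) int64 {
	var s replayStats
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				s.crcFailures.Add(1)
			}
		}()
	}
	wg.Wait()
	return s.CRCFailures()
}

func main() {
	fmt.Println(simulate(8, 1000)) // 8000
}
```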

func (*WAL) Close

func (w *WAL) Close() error

Close closes the current WAL segment. Safe to call multiple times.
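"Safe to call multiple times" is usually achieved by clearing the file handle under the mutex on the first call. In the sketch below, segmentWriter is a hypothetical stand-in, and syncing before closing is an assumption consistent with the WAL's durability goal; errors.Join (Go 1.20+) returns nil when both calls succeed.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// segmentWriter is a hypothetical stand-in holding the open segment.
type segmentWriter struct {
	mu sync.Mutex
	f  *os.File // nil after Close
}

// Close syncs and closes the segment the first time it is called. Clearing the
// field under the mutex makes every later call a no-op returning nil.
func (s *segmentWriter) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.f == nil {
		return nil
	}
	f := s.f
	s.f = nil
	return errors.Join(f.Sync(), f.Close())
}

// demo opens a scratch segment and closes it twice.
func demo() (first, second, setupErr error) {
	dir, err := os.MkdirTemp("", "wal-close-")
	if err != nil {
		return nil, nil, err
	}
	defer os.RemoveAll(dir)
	f, err := os.Create(filepath.Join(dir, "segment.wal"))
	if err != nil {
		return nil, nil, err
	}
	s := &segmentWriter{f: f}
	first = s.Close()
	second = s.Close()
	return first, second, nil
}

func main() {
	first, second, err := demo()
	fmt.Println(first, second, err)
}
```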

func (*WAL) CurrentSegment

func (w *WAL) CurrentSegment() string

CurrentSegment returns the filename (not full path) of the current WAL segment. Used by the /api/status admin endpoint.

func (*WAL) IntegrityFailures

func (w *WAL) IntegrityFailures() int64

IntegrityFailures returns the total number of HMAC mismatches encountered during Replay calls. Only non-zero when integrity=mac. Exposed to Prometheus via bubblefish_wal_integrity_failures_total.

func (*WAL) MarkDelivered

func (w *WAL) MarkDelivered(payloadID string) error

MarkDelivered atomically rewrites the WAL entry for payloadID with status=DELIVERED. This rewrites the entire segment file containing the entry, which is O(segment_size).

HOT-PATH CALLERS MUST USE MarkDeliveredBatch instead, which amortizes the rewrite to one operation per segment per batch. The singular variant exists for low-frequency callers (recovery tools) and tests.

func (*WAL) MarkDeliveredBatch

func (w *WAL) MarkDeliveredBatch(payloadIDs []string) error

MarkDeliveredBatch atomically rewrites WAL entries for all payloadIDs with status=DELIVERED in a single segment rewrite pass. This is O(N) for N entries instead of O(N²) when calling MarkDelivered N times individually.

The WAL mutex is held for the duration so concurrent Append calls are serialised. This is intentional: correctness over throughput. MarkDeliveredBatch is called off the hot write path.

func (*WAL) MarkPermanentFailure

func (w *WAL) MarkPermanentFailure(payloadID string) error

MarkPermanentFailure atomically rewrites the WAL entry for payloadID with status=PERMANENT_FAILURE. Called by the queue worker when all retries are exhausted. The entry will never be re-enqueued on replay.

func (*WAL) PendingCount

func (w *WAL) PendingCount() int64

PendingCount returns the current count of PENDING WAL entries. Incremented on Append and Replay, decremented on MarkDelivered/MarkPermanentFailure. Safe to call concurrently. Reference: Tech Spec Section 4.4.

func (*WAL) Replay

func (w *WAL) Replay(fn func(Entry)) error

Replay reads all WAL segments oldest-first, calling fn for each PENDING entry.

Crash safety: if two segments exist (crash during rotation), both are replayed. Duplicate idempotency keys across segments are deduplicated — fn is called at most once per key.

Corrupt entries (CRC mismatch, malformed JSON, partial lines) are skipped with a WARN log. Replay does NOT hold the WAL mutex; callers must not call Append concurrently with Replay.
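The replay rules (oldest segment first, PENDING only, skip corrupt or torn lines, at most one callback per idempotency key) can be sketched over in-memory segments. This handles only the crc32-mode layout, assumes IEEE CRC32 with lowercase hex, and assumes entries with an empty key are never treated as duplicates; entry, mkLine, and replaySegments are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"hash/crc32"
)

// entry keeps only the fields this sketch needs.
type entry struct {
	PayloadID      string `json:"payload_id"`
	IdempotencyKey string `json:"idempotency_key"`
	Status         string `json:"status"`
}

// mkLine encodes e in the crc32-mode line format (IEEE CRC32, lowercase hex assumed).
func mkLine(e entry) []byte {
	b, _ := json.Marshal(e) // cannot fail for this struct
	return fmt.Appendf(b, "\t%08x\n", crc32.ChecksumIEEE(b))
}

// replaySegments visits segments in the order given (oldest first), skips any
// line that is torn, fails its CRC, or is not valid JSON, and calls fn at most
// once per idempotency key for PENDING entries. It returns the skip count,
// which a real implementation would log at WARN.
func replaySegments(segments [][]byte, fn func(entry)) (skipped int) {
	seen := make(map[string]bool)
	for _, seg := range segments {
		for _, line := range bytes.Split(seg, []byte("\n")) {
			if len(line) == 0 {
				continue
			}
			payload, sum, ok := bytes.Cut(line, []byte("\t"))
			if !ok || string(sum) != fmt.Sprintf("%08x", crc32.ChecksumIEEE(payload)) {
				skipped++
				continue
			}
			var e entry
			if err := json.Unmarshal(payload, &e); err != nil {
				skipped++
				continue
			}
			if e.Status != "PENDING" {
				continue
			}
			// Assumption: entries without a key are never treated as duplicates.
			if e.IdempotencyKey != "" {
				if seen[e.IdempotencyKey] {
					continue
				}
				seen[e.IdempotencyKey] = true
			}
			fn(e)
		}
	}
	return skipped
}

func main() {
	older := append(mkLine(entry{"p1", "k1", "PENDING"}), mkLine(entry{"p2", "k2", "DELIVERED"})...)
	newer := append(mkLine(entry{"p1-dup", "k1", "PENDING"}), []byte("torn-partial-line")...)
	skipped := replaySegments([][]byte{older, newer}, func(e entry) { fmt.Println("replay", e.PayloadID) })
	fmt.Println("skipped", skipped) // prints "replay p1" then "skipped 1"
}
```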

func (*WAL) SampleDelivered

func (w *WAL) SampleDelivered(count int) ([]Entry, error)

SampleDelivered scans all WAL segments and returns up to count randomly sampled entries with status DELIVERED. This is a read-only operation used by the consistency checker to verify that delivered payloads exist in the destination. Corrupt or malformed entries are silently skipped.

If fewer than count DELIVERED entries exist, all of them are returned. Reference: Tech Spec Section 11.5.
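The docs say "randomly sampled" without naming a method. Reservoir sampling (Algorithm R) is one standard way to draw a uniform sample of up to count items in a single pass with O(count) memory, and it naturally returns everything when fewer than count items exist. It is a swapped-in technique here, not necessarily what SampleDelivered uses; record, sampleDelivered, and demoSample are hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
)

// record keeps only what the sampler needs.
type record struct {
	PayloadID string
	Status    string
}

// sampleDelivered returns a uniform random sample of up to count DELIVERED
// records using reservoir sampling (Algorithm R): one pass, O(count) memory,
// and every DELIVERED record has an equal chance of being kept.
func sampleDelivered(stream []record, count int, rng *rand.Rand) []record {
	if count <= 0 {
		return nil
	}
	var sample []record
	seen := 0
	for _, r := range stream {
		if r.Status != "DELIVERED" {
			continue
		}
		seen++
		if len(sample) < count {
			sample = append(sample, r)
			continue
		}
		// Keep the seen-th record with probability count/seen by overwriting a random slot.
		if j := rng.Intn(seen); j < count {
			sample[j] = r
		}
	}
	return sample
}

// demoSample builds n DELIVERED records interleaved with PENDING ones and samples them.
func demoSample(n, count int) []record {
	var stream []record
	for i := 0; i < n; i++ {
		stream = append(stream,
			record{PayloadID: fmt.Sprintf("d%d", i), Status: "DELIVERED"},
			record{PayloadID: fmt.Sprintf("p%d", i), Status: "PENDING"})
	}
	return sampleDelivered(stream, count, rand.New(rand.NewSource(42)))
}

func main() {
	fmt.Println(demoSample(10, 4))
}
```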

type WALUpdater

type WALUpdater interface {
	MarkDelivered(payloadID string) error
	MarkDeliveredBatch(payloadIDs []string) error
	MarkPermanentFailure(payloadID string) error
}

WALUpdater is implemented by WAL and consumed by the queue worker to mark entries after a destination write attempt. Callers MUST log errors from MarkDelivered/MarkDeliveredBatch at WARN level (not ERROR): a failure is non-fatal because destination idempotency prevents duplicate writes on replay. Callers MUST log errors from MarkPermanentFailure at ERROR level.
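A queue worker honoring this contract might look like the sketch below: successful IDs go through one MarkDeliveredBatch call (per the hot-path guidance above) and failures are logged at WARN, while MarkPermanentFailure errors are logged at ERROR. The interface is copied from the docs; failingUpdater, settle, and the log messages are hypothetical, and the comment about an entry staying PENDING is an inference, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
	"os"
)

// WALUpdater is copied from the package docs so the sketch is self-contained.
type WALUpdater interface {
	MarkDelivered(payloadID string) error
	MarkDeliveredBatch(payloadIDs []string) error
	MarkPermanentFailure(payloadID string) error
}

// failingUpdater is a hypothetical test double whose every call fails.
type failingUpdater struct{ err error }

func (f failingUpdater) MarkDelivered(string) error        { return f.err }
func (f failingUpdater) MarkDeliveredBatch([]string) error { return f.err }
func (f failingUpdater) MarkPermanentFailure(string) error { return f.err }

// settle records destination outcomes: successful IDs in one batch call,
// exhausted IDs one at a time, using the log levels the contract requires.
// It returns how many WARN and ERROR lines were emitted.
func settle(u WALUpdater, log *slog.Logger, delivered, exhausted []string) (warns, errs int) {
	if len(delivered) > 0 {
		if err := u.MarkDeliveredBatch(delivered); err != nil {
			// Non-fatal: destination idempotency absorbs the re-delivery on replay.
			log.Warn("wal: mark delivered failed", "count", len(delivered), "err", err)
			warns++
		}
	}
	for _, id := range exhausted {
		if err := u.MarkPermanentFailure(id); err != nil {
			// The entry presumably stays PENDING and would be re-enqueued on replay.
			log.Error("wal: mark permanent failure failed", "payload_id", id, "err", err)
			errs++
		}
	}
	return warns, errs
}

// demo runs settle against the failing double with one delivered batch and two exhausted IDs.
func demo() (int, int) {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	u := failingUpdater{err: errors.New("disk full")}
	return settle(u, logger, []string{"p1", "p2"}, []string{"p3", "p4"})
}

func main() {
	warns, errs := demo()
	fmt.Println("warn:", warns, "error:", errs) // warn: 1 error: 2
}
```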
