Overview
Package wal implements the Write-Ahead Log engine for BubbleFish Nexus.
Every payload is written to the WAL with CRC32 + fsync BEFORE it enters the queue. The database is NEVER written to directly; all writes go through the queue. Temp files for WAL operations MUST be in filepath.Dir(wal.path), NEVER os.TempDir(), so that the final rename stays on the same filesystem and remains atomic.
Index
- Constants
- type Entry
- type Option
- type SecurityEventFunc
- type WAL
- func (w *WAL) Append(entry Entry) error
- func (w *WAL) CRCFailures() int64
- func (w *WAL) Close() error
- func (w *WAL) CurrentSegment() string
- func (w *WAL) IntegrityFailures() int64
- func (w *WAL) MarkDelivered(payloadID string) error
- func (w *WAL) MarkDeliveredBatch(payloadIDs []string) error
- func (w *WAL) MarkPermanentFailure(payloadID string) error
- func (w *WAL) PendingCount() int64
- func (w *WAL) Replay(fn func(Entry)) error
- func (w *WAL) SampleDelivered(count int) ([]Entry, error)
- type WALUpdater
Constants
const (
	// IntegrityModeCRC32 is the default integrity mode. Each WAL line is
	// JSON_BYTES<TAB>CRC32_HEX<NEWLINE>. No HMAC overhead.
	IntegrityModeCRC32 = "crc32"
	// IntegrityModeMAC adds HMAC-SHA256 tamper detection on top of CRC32.
	// Each WAL line is JSON_BYTES<TAB>CRC32_HEX<TAB>HMAC_HEX<NEWLINE>.
	// Reference: Tech Spec Section 4.1, Section 6.4.1.
	IntegrityModeMAC = "mac"
)
const (
	// StatusPending marks an entry that has not yet been delivered.
	StatusPending = "PENDING"
	// StatusDelivered marks an entry that was successfully written to the destination.
	StatusDelivered = "DELIVERED"
	// StatusPermanentFailure marks an entry that cannot be retried.
	StatusPermanentFailure = "PERMANENT_FAILURE"
)
Variables
This section is empty.
Functions
This section is empty.
Types
type Entry
type Entry struct {
	Version        int             `json:"version"`
	PayloadID      string          `json:"payload_id"`
	IdempotencyKey string          `json:"idempotency_key"`
	Status         string          `json:"status"`
	Timestamp      time.Time       `json:"timestamp"`
	Source         string          `json:"source"`
	Destination    string          `json:"destination"`
	Subject        string          `json:"subject"`
	ActorType      string          `json:"actor_type"`
	ActorID        string          `json:"actor_id"`
	Payload        json.RawMessage `json:"payload"`
}
Entry is a single WAL record. All fields except Payload are indexed at the WAL layer for routing and status tracking. Payload holds the full TranslatedPayload as raw JSON to avoid re-encoding on replay.
type Option
type Option func(*WAL)
Option configures a WAL instance. Pass to Open.
func WithEncryption
WithEncryption configures WAL at-rest encryption with the provided key. The key should be 32 bytes (AES-256). Encryption support is a future phase; this option is accepted but not yet implemented. Reference: Tech Spec Section 6.4.2.
func WithIntegrity
WithIntegrity configures WAL integrity mode. When mode is "mac", key must be a non-empty HMAC-SHA256 key (typically 32 bytes). The key is stored once and never re-read. Reference: Tech Spec Section 6.4.1.
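Option follows the functional-options pattern. The sketch below shows how WithIntegrity-style options might be applied and then validated. It assumes validation happens after all options run, because an Option returns no error and cannot reject a bad key itself. config, option, withIntegrity and newConfig are hypothetical stand-ins for the WAL's unexported fields.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical stand-in for the WAL's unexported fields.
type config struct {
	integrityMode string
	macKey        []byte
}

type option func(*config)

func withIntegrity(mode string, key []byte) option {
	return func(c *config) {
		c.integrityMode = mode
		// Copy the key once so later mutation by the caller has no effect.
		c.macKey = append([]byte(nil), key...)
	}
}

// newConfig applies options over the defaults, then validates the result.
func newConfig(opts ...option) (*config, error) {
	c := &config{integrityMode: "crc32"} // documented default mode
	for _, opt := range opts {
		opt(c)
	}
	switch c.integrityMode {
	case "crc32":
	case "mac":
		if len(c.macKey) == 0 {
			return nil, errors.New("integrity=mac requires a non-empty key")
		}
	default:
		return nil, fmt.Errorf("unknown integrity mode %q", c.integrityMode)
	}
	return c, nil
}

func main() {
	if _, err := newConfig(withIntegrity("mac", nil)); err != nil {
		fmt.Println(err) // integrity=mac requires a non-empty key
	}
	c, _ := newConfig()
	fmt.Println(c.integrityMode) // crc32
}
```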
func WithSecurityEvent
func WithSecurityEvent(fn SecurityEventFunc) Option
WithSecurityEvent registers a callback invoked on security events such as wal_tamper_detected. The callback is invoked synchronously during replay; it must not block.
type SecurityEventFunc
SecurityEventFunc is a callback invoked when a security-relevant event occurs inside the WAL (e.g. HMAC mismatch indicating tampering). The attrs slice contains structured fields for the event.
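WithSecurityEvent requires that the callback not block during replay. One way to meet that requirement is to hand events to a buffered channel and drop and count them when the buffer is full. The callback signature func(event string, attrs []slog.Attr) is an assumption, since the SecurityEventFunc definition is not shown here. eventSink is hypothetical.

```go
package main

import (
	"fmt"
	"log/slog"
	"sync/atomic"
)

// securityEventFunc is an assumed shape for SecurityEventFunc.
type securityEventFunc func(event string, attrs []slog.Attr)

type securityEvent struct {
	name  string
	attrs []slog.Attr
}

// eventSink buffers events for a consumer goroutine.
type eventSink struct {
	events  chan securityEvent
	dropped atomic.Int64
}

func newEventSink(buf int) *eventSink {
	return &eventSink{events: make(chan securityEvent, buf)}
}

// Callback never blocks: select with a default case drops the event
// (and counts the drop) when the buffer is full.
func (s *eventSink) Callback(event string, attrs []slog.Attr) {
	select {
	case s.events <- securityEvent{name: event, attrs: attrs}:
	default:
		s.dropped.Add(1)
	}
}

func main() {
	sink := newEventSink(1)
	var fn securityEventFunc = sink.Callback
	fn("wal_tamper_detected", []slog.Attr{slog.String("segment", "wal-000001.jsonl")})
	fn("wal_tamper_detected", nil) // buffer full: dropped, not blocked
	ev := <-sink.events
	fmt.Println(ev.name, sink.dropped.Load()) // wal_tamper_detected 1
}
```

A separate goroutine would drain sink.events and forward them to an audit log. The drop counter makes lost events visible instead of stalling replay.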
type WAL
type WAL struct {
	// contains filtered or unexported fields
}
WAL is the write-ahead log engine. All state is held in struct fields; there are no package-level variables.
func Open
Open opens or creates a WAL rooted at dir. maxSizeMB sets the segment size, in megabytes, at which the WAL rotates to a new segment (default 50). Open panics if logger is nil. Options configure integrity mode and security event callbacks.
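A sketch of the rotation check implied by maxSizeMB. Assumptions: values <= 0 select the default of 50, a megabyte means 1<<20 bytes, and a segment rotates before a line would push it past the limit. segmentLimitBytes and needsRotation are hypothetical.

```go
package main

import "fmt"

const defaultMaxSizeMB = 50

// segmentLimitBytes converts maxSizeMB into a byte limit (assumed MiB).
func segmentLimitBytes(maxSizeMB int) int64 {
	if maxSizeMB <= 0 {
		maxSizeMB = defaultMaxSizeMB
	}
	return int64(maxSizeMB) << 20
}

// needsRotation reports whether appending lineLen bytes would push a
// non-empty segment past the limit. An empty segment always accepts the
// line, so a single oversized entry cannot trigger endless rotation.
func needsRotation(curSize, lineLen int64, maxSizeMB int) bool {
	return curSize > 0 && curSize+lineLen > segmentLimitBytes(maxSizeMB)
}

func main() {
	fmt.Println(segmentLimitBytes(0))            // 52428800
	fmt.Println(needsRotation(52428700, 200, 0)) // true
}
```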
func (*WAL) Append
func (w *WAL) Append(entry Entry) error
Append writes entry to the WAL. The entry's status is forced to PENDING and its version is set to walVersion. A CRC32 checksum is computed over the JSON bytes and written after a tab, before the trailing newline (in mac mode, another tab and the HMAC follow the CRC). fsync is called before Append returns.
On any failure the caller must return HTTP 500 to the client. The WAL invariant is: if Append returns nil, the entry is durable on disk.
func (*WAL) CRCFailures
func (w *WAL) CRCFailures() int64
CRCFailures returns the total number of CRC32 mismatches encountered during Replay calls. This counter is exposed to Prometheus in Phase 0D via bubblefish_wal_crc_failures_total.
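Counters such as CRCFailures and IntegrityFailures are naturally lock-free. The following sketch uses sync/atomic. walCounters is hypothetical and only mirrors the documented accessors.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// walCounters is a hypothetical holder for WAL failure counters.
type walCounters struct {
	crcFailures       atomic.Int64
	integrityFailures atomic.Int64
}

func (c *walCounters) CRCFailures() int64       { return c.crcFailures.Load() }
func (c *walCounters) IntegrityFailures() int64 { return c.integrityFailures.Load() }

func main() {
	var c walCounters
	var wg sync.WaitGroup
	// Simulate concurrent increments; Load is safe to call at any time.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.crcFailures.Add(1)
		}()
	}
	wg.Wait()
	fmt.Println(c.CRCFailures()) // 8
}
```

With the Prometheus Go client, a prometheus.NewCounterFunc whose function returns float64(w.CRCFailures()) is one way to expose such a counter. This documentation does not say how Phase 0D wires it.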
func (*WAL) CurrentSegment
func (w *WAL) CurrentSegment() string
CurrentSegment returns the filename (not the full path) of the current WAL segment. It is used by the /api/status admin endpoint.
func (*WAL) IntegrityFailures
func (w *WAL) IntegrityFailures() int64
IntegrityFailures returns the total number of HMAC mismatches encountered during Replay calls. It is non-zero only when integrity=mac. Exposed to Prometheus via bubblefish_wal_integrity_failures_total.
func (*WAL) MarkDelivered
func (w *WAL) MarkDelivered(payloadID string) error
MarkDelivered atomically rewrites the WAL entry for payloadID with status=DELIVERED. This rewrites the entire segment file containing the entry, so each call costs O(segment_size).
HOT-PATH CALLERS MUST USE MarkDeliveredBatch instead, which amortizes the rewrite to one operation per segment per batch. The singular variant exists for low-frequency callers (recovery tools) and tests.
func (*WAL) MarkDeliveredBatch
func (w *WAL) MarkDeliveredBatch(payloadIDs []string) error
MarkDeliveredBatch atomically rewrites the WAL entries for all payloadIDs with status=DELIVERED in a single rewrite pass per segment. Marking N entries this way is O(N); calling MarkDelivered N times rewrites the segment N times, which is O(N²) when the segment size grows with N.
The WAL mutex is held for the duration, so concurrent Append calls are serialised behind the rewrite. This is intentional: correctness over throughput. MarkDeliveredBatch is called off the hot write path.
func (*WAL) MarkPermanentFailure
func (w *WAL) MarkPermanentFailure(payloadID string) error
MarkPermanentFailure atomically rewrites the WAL entry for payloadID with status=PERMANENT_FAILURE. It is called by the queue worker when all retries are exhausted. The entry will never be re-enqueued on replay.
func (*WAL) PendingCount
func (w *WAL) PendingCount() int64
PendingCount returns the current count of PENDING WAL entries. It is incremented on Append and Replay and decremented on MarkDelivered/MarkPermanentFailure. Safe to call concurrently. Reference: Tech Spec Section 4.4.
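The cost difference between MarkDelivered and MarkDeliveredBatch comes from the number of passes over a segment. This in-memory sketch marks a whole batch in one pass using a set lookup. It returns how many entries left PENDING, which is the amount a pending counter like the one behind PendingCount would be decremented by. Operating on a slice rather than rewriting a file, and leaving non-PENDING entries untouched, are simplifying assumptions. markBatch is hypothetical.

```go
package main

import "fmt"

// Entry is trimmed to the fields the sketch needs.
type Entry struct {
	PayloadID string
	Status    string
}

const (
	StatusPending   = "PENDING"
	StatusDelivered = "DELIVERED"
)

// markBatch sets status on every PENDING entry whose PayloadID is in ids,
// in a single pass over seg: O(len(seg) + len(ids)) instead of one full
// pass per id. It returns how many entries left PENDING.
func markBatch(seg []Entry, ids []string, status string) int {
	want := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		want[id] = struct{}{}
	}
	moved := 0
	for i := range seg {
		if seg[i].Status != StatusPending {
			continue // already settled; leaves the call idempotent
		}
		if _, ok := want[seg[i].PayloadID]; ok {
			seg[i].Status = status
			moved++
		}
	}
	return moved
}

func main() {
	seg := []Entry{{"a", StatusPending}, {"b", StatusPending}, {"c", StatusPending}}
	moved := markBatch(seg, []string{"a", "c", "zzz"}, StatusDelivered)
	fmt.Println(moved, seg) // 2 [{a DELIVERED} {b PENDING} {c DELIVERED}]
}
```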
func (*WAL) Replay
func (w *WAL) Replay(fn func(Entry)) error
Replay reads all WAL segments oldest-first and calls fn for each PENDING entry.
Crash safety: if two segments exist (for example after a crash during rotation), both are replayed. Duplicate idempotency keys across segments are deduplicated: fn is called at most once per key.
Corrupt entries (CRC mismatch, malformed JSON, partial lines) are skipped with a WARN log. Replay does NOT hold the WAL mutex, so callers must not call Append concurrently with Replay.
func (*WAL) SampleDelivered
func (w *WAL) SampleDelivered(count int) ([]Entry, error)
SampleDelivered scans all WAL segments and returns up to count randomly sampled entries with status DELIVERED. This is a read-only operation used by the consistency checker to verify that delivered payloads exist in the destination. Corrupt or malformed entries are silently skipped.
If fewer than count DELIVERED entries exist, all of them are returned. Reference: Tech Spec Section 11.5.
type WALUpdater
type WALUpdater interface {
MarkDelivered(payloadID string) error
MarkDeliveredBatch(payloadIDs []string) error
MarkPermanentFailure(payloadID string) error
}
WALUpdater is implemented by WAL and consumed by the queue worker to mark entries after a destination write attempt. Callers MUST log errors from MarkDelivered/MarkDeliveredBatch at WARN level (not ERROR): a failure is non-fatal because destination idempotency prevents duplicate writes on replay. Callers MUST log errors from MarkPermanentFailure at ERROR level.