wal

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrorTypeValidation = "validation"
	ErrorTypeDatabase   = "database"
	ErrorTypeTimeout    = "timeout"
	ErrorTypeNotFound   = "not_found"
	ErrorTypeConflict   = "conflict"
	ErrorTypeInternal   = "internal"
	ErrorTypePermission = "permission"
	ErrorTypeRateLimit  = "rate_limit"
)

Common error types

Variables

View Source
var (
	// Core WAL errors
	ErrInvalidLSN       = errors.New("invalid LSN")
	ErrLSNOutOfRange    = errors.New("LSN out of range")
	ErrInvalidOperation = errors.New("invalid operation")
	ErrInvalidEntry     = errors.New("invalid WAL entry")

	// Connection and database errors
	ErrDatabaseConnection  = errors.New("database connection failed")
	ErrDatabaseTimeout     = errors.New("database operation timeout")
	ErrDatabaseUnavailable = errors.New("database temporarily unavailable")

	// Branch and project errors
	ErrBranchNotFound     = errors.New("branch not found")
	ErrProjectNotFound    = errors.New("project not found")
	ErrBranchExists       = errors.New("branch already exists")
	ErrProjectExists      = errors.New("project already exists")
	ErrInvalidBranchState = errors.New("invalid branch state")

	// Time travel errors
	ErrTimeTravelFailed      = errors.New("time travel operation failed")
	ErrInvalidTimestamp      = errors.New("invalid timestamp")
	ErrNoEntriesFound        = errors.New("no entries found for specified criteria")
	ErrMaterializationFailed = errors.New("state materialization failed")

	// Restore operation errors
	ErrRestoreFailed       = errors.New("restore operation failed")
	ErrUnsafeRestore       = errors.New("restore would cause data loss")
	ErrRestoreValidation   = errors.New("restore validation failed")
	ErrIncompatibleRestore = errors.New("incompatible restore operation")
)

WAL-specific error types for better error handling and recovery

View Source
var GlobalMetrics = NewMetrics()

Global metrics instance

Functions

func IsRetryable

func IsRetryable(err error) bool

IsRetryable checks if an error is retryable

func IsType

func IsType(err error, errorType string) bool

IsType checks if an error is of a specific type

func ShouldRetry

func ShouldRetry(err error, attempt int, maxAttempts int) bool

Recovery helpers

Types

type Alert

type Alert struct {
	Level      AlertLevel
	Title      string
	Message    string
	Timestamp  time.Time
	Data       map[string]interface{}
	Resolved   bool
	ResolvedAt time.Time
}

Alert represents a system alert

type AlertLevel

type AlertLevel string

AlertLevel defines alert severity

const (
	AlertLevelInfo     AlertLevel = "info"
	AlertLevelWarning  AlertLevel = "warning"
	AlertLevelError    AlertLevel = "error"
	AlertLevelCritical AlertLevel = "critical"
)

type AlertThresholds

type AlertThresholds struct {
	MaxErrorRate           float64       // Maximum acceptable error rate (0.0-1.0)
	MaxLatency             time.Duration // Maximum acceptable latency
	MaxConsecutiveFailures int           // Maximum consecutive health check failures
	MinSuccessRate         float64       // Minimum success rate (0.0-1.0)
}

AlertThresholds defines when to trigger alerts

type Branch

type Branch struct {
	ID         string    `bson:"_id" json:"id"`
	ProjectID  string    `bson:"project_id" json:"project_id"`
	Name       string    `bson:"name" json:"name"`
	ParentID   string    `bson:"parent_id,omitempty" json:"parent_id,omitempty"`
	HeadLSN    int64     `bson:"head_lsn" json:"head_lsn"`
	BaseLSN    int64     `bson:"base_lsn" json:"base_lsn"`
	CreatedAt  time.Time `bson:"created_at" json:"created_at"`
	CreatedLSN int64     `bson:"created_lsn" json:"created_lsn"`
	IsDeleted  bool      `bson:"is_deleted" json:"is_deleted"`
}

Branch represents a WAL-based branch

type BranchCacheStats

type BranchCacheStats struct {
	Items   int
	Hits    int64
	Misses  int64
	Expired int64
}

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache provides intelligent caching for WAL operations

func NewCache

func NewCache(config CacheConfig) *Cache

NewCache creates a new WAL cache

func (*Cache) AddRecentEntry

func (c *Cache) AddRecentEntry(lsn int64, entry interface{})

AddRecentEntry adds an entry to the recent entries cache

func (*Cache) Clear

func (c *Cache) Clear()

Clear removes all cached data

func (*Cache) Close

func (c *Cache) Close()

Close stops the cache and cleanup routines

func (*Cache) GetBranchMetadata

func (c *Cache) GetBranchMetadata(branchID string) (interface{}, bool)

GetBranchMetadata retrieves cached branch metadata

func (*Cache) GetMaterializedState

func (c *Cache) GetMaterializedState(collection string, lsn int64) (map[string]interface{}, bool)

GetMaterializedState retrieves a cached materialized state

func (*Cache) GetQueryResult

func (c *Cache) GetQueryResult(queryKey string) (interface{}, bool)

GetQueryResult retrieves a cached query result

func (*Cache) GetRecentEntry

func (c *Cache) GetRecentEntry(lsn int64) (interface{}, bool)

GetRecentEntry retrieves a recent entry

func (*Cache) GetStats

func (c *Cache) GetStats() CacheStats

GetStats returns cache performance statistics

func (*Cache) InvalidateBranch

func (c *Cache) InvalidateBranch(branchID string)

InvalidateBranch removes cached data for a branch

func (*Cache) InvalidateState

func (c *Cache) InvalidateState(collection string)

InvalidateState removes cached states for a collection

func (*Cache) SetBranchMetadata

func (c *Cache) SetBranchMetadata(branchID string, metadata interface{})

SetBranchMetadata caches branch metadata

func (*Cache) SetMaterializedState

func (c *Cache) SetMaterializedState(collection string, lsn int64, state map[string]interface{})

SetMaterializedState caches a materialized state

func (*Cache) SetQueryResult

func (c *Cache) SetQueryResult(queryKey string, result interface{})

SetQueryResult caches a query result

type CacheConfig

type CacheConfig struct {
	MaxStateMemory   int64         // Maximum memory for state cache (bytes)
	MaxRecentEntries int           // Maximum recent entries to keep
	QueryCacheTTL    time.Duration // Query cache time-to-live
	BranchCacheTTL   time.Duration // Branch cache time-to-live
	CleanupInterval  time.Duration // How often to run cleanup
	EnableMetrics    bool          // Enable cache metrics
}

CacheConfig configures cache behavior

type CacheStats

type CacheStats struct {
	StateCache   LRUStats
	QueryCache   QueryCacheStats
	BranchCache  BranchCacheStats
	RecentHits   int64
	RecentMisses int64
}

CacheStats provides cache performance statistics

type CachedBranch

type CachedBranch struct {
	BranchID   string
	Metadata   interface{}
	CreatedAt  time.Time
	ExpiresAt  time.Time
	AccessTime time.Time
}

CachedBranch represents cached branch metadata

type CachedQuery

type CachedQuery struct {
	Key        string
	Result     interface{}
	CreatedAt  time.Time
	ExpiresAt  time.Time
	AccessTime time.Time
	HitCount   int64
}

CachedQuery represents a cached query result

type CachedState

type CachedState struct {
	Collection string
	LSN        int64
	State      map[string]interface{}
	AccessTime time.Time
	Size       int64
}

CachedState represents a cached materialized state

type Entry

type Entry struct {
	ID          primitive.ObjectID     `bson:"_id,omitempty" json:"id,omitempty"`
	LSN         int64                  `bson:"lsn" json:"lsn"`
	Timestamp   time.Time              `bson:"timestamp" json:"timestamp"`
	ProjectID   string                 `bson:"project_id" json:"project_id"`
	BranchID    string                 `bson:"branch_id" json:"branch_id"`
	Operation   OperationType          `bson:"operation" json:"operation"`
	Collection  string                 `bson:"collection,omitempty" json:"collection,omitempty"`
	DocumentID  string                 `bson:"document_id,omitempty" json:"document_id,omitempty"`
	Document    bson.Raw               `bson:"document,omitempty" json:"-"`
	OldDocument bson.Raw               `bson:"old_document,omitempty" json:"-"`
	Metadata    map[string]interface{} `bson:"metadata,omitempty" json:"metadata,omitempty"`
}

Entry represents a single WAL entry

type ErrorContext

type ErrorContext struct {
	Operation string                 `json:"operation"`
	LSN       int64                  `json:"lsn,omitempty"`
	ProjectID string                 `json:"project_id,omitempty"`
	BranchID  string                 `json:"branch_id,omitempty"`
	Details   map[string]interface{} `json:"details,omitempty"`
}

Error context for debugging

func NewErrorContext

func NewErrorContext(operation string) *ErrorContext

func (*ErrorContext) WithBranch

func (ctx *ErrorContext) WithBranch(branchID string) *ErrorContext

func (*ErrorContext) WithDetail

func (ctx *ErrorContext) WithDetail(key string, value interface{}) *ErrorContext

func (*ErrorContext) WithLSN

func (ctx *ErrorContext) WithLSN(lsn int64) *ErrorContext

func (*ErrorContext) WithProject

func (ctx *ErrorContext) WithProject(projectID string) *ErrorContext

type HealthCheck

type HealthCheck struct {
	Name        string
	Description string
	Check       func() error
	Interval    time.Duration
	Timeout     time.Duration
	Critical    bool // If true, failure means system unhealthy
}

HealthCheck defines a health check function

type LRUCache

type LRUCache struct {
	// contains filtered or unexported fields
}

LRUCache implements a Least Recently Used cache

func NewLRUCache

func NewLRUCache(capacity int64) *LRUCache

NewLRUCache creates a new LRU cache

func (*LRUCache) Clear

func (lru *LRUCache) Clear()

func (*LRUCache) Get

func (lru *LRUCache) Get(key string) (map[string]interface{}, bool)

LRUCache methods

func (*LRUCache) GetStats

func (lru *LRUCache) GetStats() LRUStats

func (*LRUCache) InvalidateByPrefix

func (lru *LRUCache) InvalidateByPrefix(prefix string)

func (*LRUCache) Set

func (lru *LRUCache) Set(key string, state *CachedState)

type LRUItem

type LRUItem struct {
	// contains filtered or unexported fields
}

LRUItem represents an item in the LRU cache

type LRUStats

type LRUStats struct {
	Size     int64
	Capacity int64
	Items    int
	Hits     int64
	Misses   int64
}

type LatencyTracker

type LatencyTracker struct {
	// contains filtered or unexported fields
}

LatencyTracker tracks operation latencies with moving averages

type Metrics

type Metrics struct {
	// Operation counters
	AppendOps   int64 `json:"append_ops"`
	QueryOps    int64 `json:"query_ops"`
	MaterialOps int64 `json:"material_ops"`
	BranchOps   int64 `json:"branch_ops"`
	RestoreOps  int64 `json:"restore_ops"`

	// Error counters
	AppendErrors     int64 `json:"append_errors"`
	QueryErrors      int64 `json:"query_errors"`
	MaterialErrors   int64 `json:"material_errors"`
	ConnectionErrors int64 `json:"connection_errors"`

	// Performance metrics
	AvgAppendLatency   time.Duration `json:"avg_append_latency"`
	AvgQueryLatency    time.Duration `json:"avg_query_latency"`
	AvgMaterialLatency time.Duration `json:"avg_material_latency"`

	// Current state
	CurrentLSN        int64     `json:"current_lsn"`
	ActiveBranches    int       `json:"active_branches"`
	ActiveProjects    int       `json:"active_projects"`
	LastOperationTime time.Time `json:"last_operation_time"`
	// contains filtered or unexported fields
}

Metrics holds WAL operation metrics

func NewMetrics

func NewMetrics() *Metrics

NewMetrics creates a new metrics collector

func (*Metrics) GetSnapshot

func (m *Metrics) GetSnapshot() Metrics

GetSnapshot returns a read-only snapshot of current metrics

func (*Metrics) GetSuccessRate

func (m *Metrics) GetSuccessRate() map[string]float64

GetSuccessRate returns the success rate for each operation type

func (*Metrics) RecordAppend

func (m *Metrics) RecordAppend(latency time.Duration, success bool)

RecordAppend records a WAL append operation

func (*Metrics) RecordBranchOp

func (m *Metrics) RecordBranchOp()

RecordBranchOp records a branch operation

func (*Metrics) RecordConnectionError

func (m *Metrics) RecordConnectionError()

RecordConnectionError records a connection error

func (*Metrics) RecordMaterialization

func (m *Metrics) RecordMaterialization(latency time.Duration, success bool)

RecordMaterialization records a materialization operation

func (*Metrics) RecordQuery

func (m *Metrics) RecordQuery(latency time.Duration, success bool)

RecordQuery records a query operation

func (*Metrics) RecordRestoreOp

func (m *Metrics) RecordRestoreOp()

RecordRestoreOp records a restore operation

func (*Metrics) Reset

func (m *Metrics) Reset()

Reset resets all metrics (useful for testing)

func (*Metrics) UpdateActiveBranches

func (m *Metrics) UpdateActiveBranches(count int)

UpdateActiveBranches updates the active branch count

func (*Metrics) UpdateActiveProjects

func (m *Metrics) UpdateActiveProjects(count int)

UpdateActiveProjects updates the active project count

func (*Metrics) UpdateCurrentLSN

func (m *Metrics) UpdateCurrentLSN(lsn int64)

UpdateCurrentLSN updates the current LSN

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor provides health monitoring and alerting for WAL operations

func NewMonitor

func NewMonitor(metrics *Metrics, config MonitorConfig) *Monitor

NewMonitor creates a new WAL monitor

func (*Monitor) AddHealthCheck

func (m *Monitor) AddHealthCheck(check HealthCheck)

AddHealthCheck adds a custom health check

func (*Monitor) GetActiveAlerts

func (m *Monitor) GetActiveAlerts() []Alert

GetActiveAlerts returns currently active alerts

func (*Monitor) GetHealthStatus

func (m *Monitor) GetHealthStatus() map[string]interface{}

GetHealthStatus returns detailed health information

func (*Monitor) IsHealthy

func (m *Monitor) IsHealthy() bool

IsHealthy returns current health status

func (*Monitor) ResolveAlert

func (m *Monitor) ResolveAlert(title string)

ResolveAlert resolves an alert by title

func (*Monitor) Start

func (m *Monitor) Start()

Start begins monitoring

func (*Monitor) Stop

func (m *Monitor) Stop()

Stop stops monitoring

func (*Monitor) TriggerAlert

func (m *Monitor) TriggerAlert(level AlertLevel, title, message string, data map[string]interface{})

TriggerAlert creates a new alert

type MonitorConfig

type MonitorConfig struct {
	HealthCheckInterval   time.Duration
	MetricsReportInterval time.Duration
	AlertThresholds       AlertThresholds
	EnableLogging         bool
	EnableMetricsExport   bool
}

MonitorConfig configures monitoring behavior

type OperationType

type OperationType string

OperationType represents the type of WAL operation

const (
	OpInsert        OperationType = "insert"
	OpUpdate        OperationType = "update"
	OpDelete        OperationType = "delete"
	OpCreateBranch  OperationType = "create_branch"
	OpDeleteBranch  OperationType = "delete_branch"
	OpCreateProject OperationType = "create_project"
	OpDeleteProject OperationType = "delete_project"
)

type Project

type Project struct {
	ID           string    `bson:"_id" json:"id"`
	Name         string    `bson:"name" json:"name"`
	MainBranchID string    `bson:"main_branch_id" json:"main_branch_id"`
	CreatedAt    time.Time `bson:"created_at" json:"created_at"`
	UseWAL       bool      `bson:"use_wal" json:"use_wal"`
}

Project represents a WAL-enabled project

type QueryCacheStats

type QueryCacheStats struct {
	Items   int
	Hits    int64
	Misses  int64
	Expired int64
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages WAL operations

func NewService

func NewService(db *mongo.Database) (*Service, error)

NewService creates a new WAL service

func (*Service) Append

func (s *Service) Append(entry *Entry) (int64, error)

Append adds a new entry to the WAL

func (*Service) GetBranchEntries

func (s *Service) GetBranchEntries(branchID, collection string, startLSN, endLSN int64) ([]*Entry, error)

GetBranchEntries retrieves all entries for a specific branch and collection

func (*Service) GetCurrentLSN

func (s *Service) GetCurrentLSN() int64

GetCurrentLSN returns the current LSN value

func (*Service) GetDocumentHistory

func (s *Service) GetDocumentHistory(branchID, collection, documentID string, startLSN, endLSN int64) ([]*Entry, error)

GetDocumentHistory retrieves WAL entries for a specific document

func (*Service) GetEntries

func (s *Service) GetEntries(filter bson.M, opts ...*options.FindOptions) ([]*Entry, error)

GetEntries retrieves WAL entries within an LSN range

func (*Service) GetEntriesByTimestamp

func (s *Service) GetEntriesByTimestamp(projectID string, timestamp time.Time) ([]*Entry, error)

GetEntriesByTimestamp retrieves entries up to a specific timestamp

func (*Service) GetEntry

func (s *Service) GetEntry(lsn int64) (*Entry, error)

GetEntry retrieves a single WAL entry by LSN

func (*Service) GetMetrics

func (s *Service) GetMetrics() Metrics

GetMetrics returns a snapshot of current metrics

func (*Service) GetProjectEntries

func (s *Service) GetProjectEntries(projectID, collection string, startLSN, endLSN int64) ([]*Entry, error)

GetProjectEntries retrieves all entries for a project within an LSN range

func (*Service) GetSuccessRates

func (s *Service) GetSuccessRates() map[string]float64

GetSuccessRates returns success rates for operations

type WALError

type WALError struct {
	Type      string                 `json:"type"`
	Message   string                 `json:"message"`
	Details   map[string]interface{} `json:"details,omitempty"`
	Cause     error                  `json:"-"`
	Retryable bool                   `json:"retryable"`
}

WALError provides structured error information

func NewConflictError

func NewConflictError(message string, details map[string]interface{}) *WALError

Conflict errors (not retryable without user intervention)

func NewDatabaseError

func NewDatabaseError(message string, cause error) *WALError

Database errors (retryable)

func NewNotFoundError

func NewNotFoundError(resource string, id string) *WALError

Not found errors (not retryable)

func NewTimeoutError

func NewTimeoutError(message string, cause error) *WALError

Timeout errors (retryable)

func NewValidationError

func NewValidationError(message string, details map[string]interface{}) *WALError

Validation errors

func NewWALError

func NewWALError(errorType, message string, cause error, retryable bool) *WALError

NewWALError creates a new structured WAL error

func (*WALError) Error

func (e *WALError) Error() string

func (*WALError) Unwrap

func (e *WALError) Unwrap() error

func (*WALError) WithDetail

func (e *WALError) WithDetail(key string, value interface{}) *WALError

WithDetail adds detail information to the error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL