Documentation
¶
Index ¶
- Constants
- Variables
- func IsRetryable(err error) bool
- func IsType(err error, errorType string) bool
- func ShouldRetry(err error, attempt int, maxAttempts int) bool
- type Alert
- type AlertLevel
- type AlertThresholds
- type Branch
- type BranchCacheStats
- type Cache
- func (c *Cache) AddRecentEntry(lsn int64, entry interface{})
- func (c *Cache) Clear()
- func (c *Cache) Close()
- func (c *Cache) GetBranchMetadata(branchID string) (interface{}, bool)
- func (c *Cache) GetMaterializedState(collection string, lsn int64) (map[string]interface{}, bool)
- func (c *Cache) GetQueryResult(queryKey string) (interface{}, bool)
- func (c *Cache) GetRecentEntry(lsn int64) (interface{}, bool)
- func (c *Cache) GetStats() CacheStats
- func (c *Cache) InvalidateBranch(branchID string)
- func (c *Cache) InvalidateState(collection string)
- func (c *Cache) SetBranchMetadata(branchID string, metadata interface{})
- func (c *Cache) SetMaterializedState(collection string, lsn int64, state map[string]interface{})
- func (c *Cache) SetQueryResult(queryKey string, result interface{})
- type CacheConfig
- type CacheStats
- type CachedBranch
- type CachedQuery
- type CachedState
- type Entry
- type ErrorContext
- type HealthCheck
- type LRUCache
- type LRUItem
- type LRUStats
- type LatencyTracker
- type Metrics
- func (m *Metrics) GetSnapshot() Metrics
- func (m *Metrics) GetSuccessRate() map[string]float64
- func (m *Metrics) RecordAppend(latency time.Duration, success bool)
- func (m *Metrics) RecordBranchOp()
- func (m *Metrics) RecordConnectionError()
- func (m *Metrics) RecordMaterialization(latency time.Duration, success bool)
- func (m *Metrics) RecordQuery(latency time.Duration, success bool)
- func (m *Metrics) RecordRestoreOp()
- func (m *Metrics) Reset()
- func (m *Metrics) UpdateActiveBranches(count int)
- func (m *Metrics) UpdateActiveProjects(count int)
- func (m *Metrics) UpdateCurrentLSN(lsn int64)
- type Monitor
- func (m *Monitor) AddHealthCheck(check HealthCheck)
- func (m *Monitor) GetActiveAlerts() []Alert
- func (m *Monitor) GetHealthStatus() map[string]interface{}
- func (m *Monitor) IsHealthy() bool
- func (m *Monitor) ResolveAlert(title string)
- func (m *Monitor) Start()
- func (m *Monitor) Stop()
- func (m *Monitor) TriggerAlert(level AlertLevel, title, message string, data map[string]interface{})
- type MonitorConfig
- type OperationType
- type Project
- type QueryCacheStats
- type Service
- func (s *Service) Append(entry *Entry) (int64, error)
- func (s *Service) GetBranchEntries(branchID, collection string, startLSN, endLSN int64) ([]*Entry, error)
- func (s *Service) GetCurrentLSN() int64
- func (s *Service) GetDocumentHistory(branchID, collection, documentID string, startLSN, endLSN int64) ([]*Entry, error)
- func (s *Service) GetEntries(filter bson.M, opts ...*options.FindOptions) ([]*Entry, error)
- func (s *Service) GetEntriesByTimestamp(projectID string, timestamp time.Time) ([]*Entry, error)
- func (s *Service) GetEntry(lsn int64) (*Entry, error)
- func (s *Service) GetMetrics() Metrics
- func (s *Service) GetProjectEntries(projectID, collection string, startLSN, endLSN int64) ([]*Entry, error)
- func (s *Service) GetSuccessRates() map[string]float64
- type WALError
- func NewConflictError(message string, details map[string]interface{}) *WALError
- func NewDatabaseError(message string, cause error) *WALError
- func NewNotFoundError(resource string, id string) *WALError
- func NewTimeoutError(message string, cause error) *WALError
- func NewValidationError(message string, details map[string]interface{}) *WALError
- func NewWALError(errorType, message string, cause error, retryable bool) *WALError
Constants ¶
const (
	ErrorTypeValidation = "validation"
	ErrorTypeDatabase   = "database"
	ErrorTypeTimeout    = "timeout"
	ErrorTypeNotFound   = "not_found"
	ErrorTypeConflict   = "conflict"
	ErrorTypeInternal   = "internal"
	ErrorTypePermission = "permission"
	ErrorTypeRateLimit  = "rate_limit"
)
Common error types
Variables ¶
var (
	// Core WAL errors
	ErrInvalidLSN       = errors.New("invalid LSN")
	ErrLSNOutOfRange    = errors.New("LSN out of range")
	ErrInvalidOperation = errors.New("invalid operation")
	ErrInvalidEntry     = errors.New("invalid WAL entry")

	// Connection and database errors
	ErrDatabaseConnection = errors.New("database connection failed")
	ErrDatabaseTimeout    = errors.New("database operation timeout")

	// Branch and project errors
	ErrBranchNotFound     = errors.New("branch not found")
	ErrProjectNotFound    = errors.New("project not found")
	ErrBranchExists       = errors.New("branch already exists")
	ErrProjectExists      = errors.New("project already exists")
	ErrInvalidBranchState = errors.New("invalid branch state")

	// Time travel errors
	ErrTimeTravelFailed      = errors.New("time travel operation failed")
	ErrInvalidTimestamp      = errors.New("invalid timestamp")
	ErrNoEntriesFound        = errors.New("no entries found for specified criteria")
	ErrMaterializationFailed = errors.New("state materialization failed")

	// Restore operation errors
	ErrRestoreFailed       = errors.New("restore operation failed")
	ErrUnsafeRestore       = errors.New("restore would cause data loss")
	ErrRestoreValidation   = errors.New("restore validation failed")
	ErrIncompatibleRestore = errors.New("incompatible restore operation")
)
WAL-specific sentinel errors for better error handling and recovery
var GlobalMetrics = NewMetrics()
Global metrics instance
Functions ¶
Types ¶
type Alert ¶
type Alert struct {
Level AlertLevel
Title string
Message string
Timestamp time.Time
Data map[string]interface{}
Resolved bool
ResolvedAt time.Time
}
Alert represents a system alert
type AlertLevel ¶
type AlertLevel string
AlertLevel defines alert severity
const (
	AlertLevelInfo     AlertLevel = "info"
	AlertLevelWarning  AlertLevel = "warning"
	AlertLevelError    AlertLevel = "error"
	AlertLevelCritical AlertLevel = "critical"
)
type AlertThresholds ¶
type AlertThresholds struct {
MaxErrorRate float64 // Maximum acceptable error rate (0.0-1.0)
MaxLatency time.Duration // Maximum acceptable latency
MaxConsecutiveFailures int // Maximum consecutive health check failures
MinSuccessRate float64 // Minimum success rate (0.0-1.0)
}
AlertThresholds defines when to trigger alerts
type Branch ¶
type Branch struct {
ID string `bson:"_id" json:"id"`
ProjectID string `bson:"project_id" json:"project_id"`
Name string `bson:"name" json:"name"`
ParentID string `bson:"parent_id,omitempty" json:"parent_id,omitempty"`
HeadLSN int64 `bson:"head_lsn" json:"head_lsn"`
BaseLSN int64 `bson:"base_lsn" json:"base_lsn"`
CreatedAt time.Time `bson:"created_at" json:"created_at"`
CreatedLSN int64 `bson:"created_lsn" json:"created_lsn"`
IsDeleted bool `bson:"is_deleted" json:"is_deleted"`
}
Branch represents a WAL-based branch
type BranchCacheStats ¶
type Cache ¶
type Cache struct {
// contains filtered or unexported fields
}
Cache provides intelligent caching for WAL operations
func (*Cache) AddRecentEntry ¶
AddRecentEntry adds an entry to the recent entries cache
func (*Cache) GetBranchMetadata ¶
GetBranchMetadata retrieves cached branch metadata
func (*Cache) GetMaterializedState ¶
GetMaterializedState retrieves a cached materialized state
func (*Cache) GetQueryResult ¶
GetQueryResult retrieves a cached query result
func (*Cache) GetRecentEntry ¶
GetRecentEntry retrieves a recent entry
func (*Cache) GetStats ¶
func (c *Cache) GetStats() CacheStats
GetStats returns cache performance statistics
func (*Cache) InvalidateBranch ¶
InvalidateBranch removes cached data for a branch
func (*Cache) InvalidateState ¶
InvalidateState removes cached states for a collection
func (*Cache) SetBranchMetadata ¶
SetBranchMetadata caches branch metadata
func (*Cache) SetMaterializedState ¶
SetMaterializedState caches a materialized state
func (*Cache) SetQueryResult ¶
SetQueryResult caches a query result
type CacheConfig ¶
type CacheConfig struct {
MaxStateMemory int64 // Maximum memory for state cache (bytes)
MaxRecentEntries int // Maximum recent entries to keep
QueryCacheTTL time.Duration // Query cache time-to-live
BranchCacheTTL time.Duration // Branch cache time-to-live
CleanupInterval time.Duration // How often to run cleanup
EnableMetrics bool // Enable cache metrics
}
CacheConfig configures cache behavior
type CacheStats ¶
type CacheStats struct {
StateCache LRUStats
QueryCache QueryCacheStats
BranchCache BranchCacheStats
RecentHits int64
RecentMisses int64
}
CacheStats provides cache performance statistics
type CachedBranch ¶
type CachedBranch struct {
BranchID string
Metadata interface{}
CreatedAt time.Time
ExpiresAt time.Time
AccessTime time.Time
}
CachedBranch represents cached branch metadata
type CachedQuery ¶
type CachedQuery struct {
Key string
Result interface{}
CreatedAt time.Time
ExpiresAt time.Time
AccessTime time.Time
HitCount int64
}
CachedQuery represents a cached query result
type CachedState ¶
type CachedState struct {
Collection string
LSN int64
State map[string]interface{}
AccessTime time.Time
Size int64
}
CachedState represents a cached materialized state
type Entry ¶
type Entry struct {
ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"`
LSN int64 `bson:"lsn" json:"lsn"`
Timestamp time.Time `bson:"timestamp" json:"timestamp"`
ProjectID string `bson:"project_id" json:"project_id"`
BranchID string `bson:"branch_id" json:"branch_id"`
Operation OperationType `bson:"operation" json:"operation"`
Collection string `bson:"collection,omitempty" json:"collection,omitempty"`
DocumentID string `bson:"document_id,omitempty" json:"document_id,omitempty"`
Document bson.Raw `bson:"document,omitempty" json:"-"`
OldDocument bson.Raw `bson:"old_document,omitempty" json:"-"`
Metadata map[string]interface{} `bson:"metadata,omitempty" json:"metadata,omitempty"`
}
Entry represents a single WAL entry
type ErrorContext ¶
type ErrorContext struct {
Operation string `json:"operation"`
LSN int64 `json:"lsn,omitempty"`
ProjectID string `json:"project_id,omitempty"`
BranchID string `json:"branch_id,omitempty"`
Details map[string]interface{} `json:"details,omitempty"`
}
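ErrorContext describes the operation being performed (operation name plus optional LSN, project ID, branch ID, and details) for debugging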
func NewErrorContext ¶
func NewErrorContext(operation string) *ErrorContext
func (*ErrorContext) WithBranch ¶
func (ctx *ErrorContext) WithBranch(branchID string) *ErrorContext
func (*ErrorContext) WithDetail ¶
func (ctx *ErrorContext) WithDetail(key string, value interface{}) *ErrorContext
func (*ErrorContext) WithLSN ¶
func (ctx *ErrorContext) WithLSN(lsn int64) *ErrorContext
func (*ErrorContext) WithProject ¶
func (ctx *ErrorContext) WithProject(projectID string) *ErrorContext
type HealthCheck ¶
type HealthCheck struct {
Name string
Description string
Check func() error
Interval time.Duration
Timeout time.Duration
Critical bool // If true, failure means system unhealthy
}
HealthCheck defines a health check: a named check function together with its interval, timeout, and whether a failure is critical
type LRUCache ¶
type LRUCache struct {
// contains filtered or unexported fields
}
LRUCache implements a Least Recently Used cache
func (*LRUCache) InvalidateByPrefix ¶
func (*LRUCache) Set ¶
func (lru *LRUCache) Set(key string, state *CachedState)
type LRUItem ¶
type LRUItem struct {
// contains filtered or unexported fields
}
LRUItem represents an item in the LRU cache
type LatencyTracker ¶
type LatencyTracker struct {
// contains filtered or unexported fields
}
LatencyTracker tracks operation latencies with moving averages
type Metrics ¶
type Metrics struct {
// Operation counters
AppendOps int64 `json:"append_ops"`
QueryOps int64 `json:"query_ops"`
MaterialOps int64 `json:"material_ops"`
BranchOps int64 `json:"branch_ops"`
RestoreOps int64 `json:"restore_ops"`
// Error counters
AppendErrors int64 `json:"append_errors"`
QueryErrors int64 `json:"query_errors"`
MaterialErrors int64 `json:"material_errors"`
ConnectionErrors int64 `json:"connection_errors"`
// Performance metrics
AvgAppendLatency time.Duration `json:"avg_append_latency"`
AvgQueryLatency time.Duration `json:"avg_query_latency"`
AvgMaterialLatency time.Duration `json:"avg_material_latency"`
// Current state
CurrentLSN int64 `json:"current_lsn"`
ActiveBranches int `json:"active_branches"`
ActiveProjects int `json:"active_projects"`
LastOperationTime time.Time `json:"last_operation_time"`
// contains filtered or unexported fields
}
Metrics holds WAL operation metrics
func (*Metrics) GetSnapshot ¶
GetSnapshot returns a read-only snapshot of current metrics
func (*Metrics) GetSuccessRate ¶
GetSuccessRate returns the success rate for each operation type
func (*Metrics) RecordAppend ¶
RecordAppend records a WAL append operation
func (*Metrics) RecordBranchOp ¶
func (m *Metrics) RecordBranchOp()
RecordBranchOp records a branch operation
func (*Metrics) RecordConnectionError ¶
func (m *Metrics) RecordConnectionError()
RecordConnectionError records a connection error
func (*Metrics) RecordMaterialization ¶
RecordMaterialization records a materialization operation
func (*Metrics) RecordQuery ¶
RecordQuery records a query operation
func (*Metrics) RecordRestoreOp ¶
func (m *Metrics) RecordRestoreOp()
RecordRestoreOp records a restore operation
func (*Metrics) UpdateActiveBranches ¶
UpdateActiveBranches updates the active branch count
func (*Metrics) UpdateActiveProjects ¶
UpdateActiveProjects updates the active project count
func (*Metrics) UpdateCurrentLSN ¶
UpdateCurrentLSN updates the current LSN
type Monitor ¶
type Monitor struct {
// contains filtered or unexported fields
}
Monitor provides health monitoring and alerting for WAL operations
func NewMonitor ¶
func NewMonitor(metrics *Metrics, config MonitorConfig) *Monitor
NewMonitor creates a new WAL monitor
func (*Monitor) AddHealthCheck ¶
func (m *Monitor) AddHealthCheck(check HealthCheck)
AddHealthCheck adds a custom health check
func (*Monitor) GetActiveAlerts ¶
GetActiveAlerts returns currently active alerts
func (*Monitor) GetHealthStatus ¶
GetHealthStatus returns detailed health information
func (*Monitor) ResolveAlert ¶
ResolveAlert resolves an alert by title
func (*Monitor) TriggerAlert ¶
func (m *Monitor) TriggerAlert(level AlertLevel, title, message string, data map[string]interface{})
TriggerAlert creates a new alert
type MonitorConfig ¶
type MonitorConfig struct {
HealthCheckInterval time.Duration
MetricsReportInterval time.Duration
AlertThresholds AlertThresholds
EnableLogging bool
EnableMetricsExport bool
}
MonitorConfig configures monitoring behavior
type OperationType ¶
type OperationType string
OperationType represents the type of WAL operation
const (
	OpInsert        OperationType = "insert"
	OpUpdate        OperationType = "update"
	OpDelete        OperationType = "delete"
	OpCreateBranch  OperationType = "create_branch"
	OpDeleteBranch  OperationType = "delete_branch"
	OpCreateProject OperationType = "create_project"
	OpDeleteProject OperationType = "delete_project"
)
type Project ¶
type Project struct {
ID string `bson:"_id" json:"id"`
Name string `bson:"name" json:"name"`
MainBranchID string `bson:"main_branch_id" json:"main_branch_id"`
CreatedAt time.Time `bson:"created_at" json:"created_at"`
UseWAL bool `bson:"use_wal" json:"use_wal"`
}
Project represents a WAL-enabled project
type QueryCacheStats ¶
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages WAL operations
func NewService ¶
NewService creates a new WAL service
func (*Service) GetBranchEntries ¶
func (s *Service) GetBranchEntries(branchID, collection string, startLSN, endLSN int64) ([]*Entry, error)
GetBranchEntries retrieves all entries for a specific branch and collection within an LSN range
func (*Service) GetCurrentLSN ¶
GetCurrentLSN returns the current LSN value
func (*Service) GetDocumentHistory ¶
func (s *Service) GetDocumentHistory(branchID, collection, documentID string, startLSN, endLSN int64) ([]*Entry, error)
GetDocumentHistory retrieves WAL entries for a specific document
func (*Service) GetEntries ¶
GetEntries retrieves WAL entries matching the given filter, applying any supplied find options
func (*Service) GetEntriesByTimestamp ¶
GetEntriesByTimestamp retrieves entries up to a specific timestamp
func (*Service) GetMetrics ¶
GetMetrics returns a snapshot of current metrics
func (*Service) GetProjectEntries ¶
func (s *Service) GetProjectEntries(projectID, collection string, startLSN, endLSN int64) ([]*Entry, error)
GetProjectEntries retrieves all entries for a project within an LSN range
func (*Service) GetSuccessRates ¶
GetSuccessRates returns success rates for operations
type WALError ¶
type WALError struct {
Type string `json:"type"`
Message string `json:"message"`
Details map[string]interface{} `json:"details,omitempty"`
Cause error `json:"-"`
Retryable bool `json:"retryable"`
}
WALError provides structured error information
func NewConflictError ¶
NewConflictError creates a conflict error (not retryable without user intervention)
func NewDatabaseError ¶
NewDatabaseError creates a database error (retryable)
func NewNotFoundError ¶
NewNotFoundError creates a not-found error (not retryable)
func NewTimeoutError ¶
NewTimeoutError creates a timeout error (retryable)
func NewValidationError ¶
NewValidationError creates a validation error
func NewWALError ¶
NewWALError creates a new structured WAL error
func (*WALError) WithDetail ¶
WithDetail adds detail information to the error