Documentation
¶
Index ¶
- Constants
- Variables
- func Is(err, target error) bool
- func IsFatal(err error) bool
- func IsNoLeaderError(err error) bool
- func IsNoQuorumError(err error) bool
- func IsNotLeaderError(err error) bool
- func IsRetryable(err error) bool
- func IsStaleTermError(err error) bool
- func NewNoLeaderError() *errors.ForgeError
- func NewNoQuorumError(required, available int) *errors.ForgeError
- func NewNotLeaderError(nodeID string, leaderID string) *errors.ForgeError
- func NewStaleTermError(current, received uint64) *errors.ForgeError
- func NewTimeoutError(operation string) *errors.ForgeError
- type AdminAPIConfig
- type AdvancedConfig
- type AppendEntriesRequest
- type AppendEntriesResponse
- type BadgerOptions
- type BatchOp
- type BatchOpType
- type BoltOptions
- type ClusterInfo
- type ClusterManager
- type Command
- type Config
- type ConfigOption
- func WithBindAddress(addr string, port int) ConfigOption
- func WithClusterID(id string) ConfigOption
- func WithConfig(cfg Config) ConfigOption
- func WithDiscoveryType(discoveryType string) ConfigOption
- func WithMTLS(caFile string) ConfigOption
- func WithNodeID(id string) ConfigOption
- func WithPeers(peers []PeerConfig) ConfigOption
- func WithRequireConfig(require bool) ConfigOption
- func WithStoragePath(path string) ConfigOption
- func WithStorageType(storageType string) ConfigOption
- func WithTLS(certFile, keyFile string) ConfigOption
- func WithTransportType(transportType string) ConfigOption
- type ConsensusEvent
- type ConsensusEventType
- type ConsensusService
- type ConsensusStats
- type Discovery
- type DiscoveryConfig
- type DiscoveryEvent
- type DiscoveryEventType
- type ElectionConfig
- type EntryType
- type EventsConfig
- type HealthCheck
- type HealthConfig
- type HealthStatus
- type HealthStatusEvent
- type InstallSnapshotRequest
- type InstallSnapshotResponse
- type KeyValue
- type LeaderElectedEvent
- type LogEntry
- type LoggingConfig
- type MembershipChangedEvent
- type Message
- type MessageType
- type MetricsConfig
- type NodeChangeEvent
- type NodeChangeType
- type NodeInfo
- type NodeRole
- type NodeStatus
- type ObservabilityConfig
- type PeerConfig
- type QuorumStatusEvent
- type RaftConfig
- type RaftNode
- type RaftStats
- type RequestVoteRequest
- type RequestVoteResponse
- type ResilienceConfig
- type RoleChangedEvent
- type SecurityConfig
- type Snapshot
- type SnapshotEvent
- type SnapshotMetadata
- type StateMachine
- type Storage
- type StorageConfig
- type TracingConfig
- type Transport
- type TransportConfig
Constants ¶
const ( ErrCodeNotLeader = "CONSENSUS_NOT_LEADER" ErrCodeNoLeader = "CONSENSUS_NO_LEADER" ErrCodeNotStarted = "CONSENSUS_NOT_STARTED" ErrCodeAlreadyStarted = "CONSENSUS_ALREADY_STARTED" ErrCodeNodeNotFound = "CONSENSUS_NODE_NOT_FOUND" ErrCodeClusterNotFound = "CONSENSUS_CLUSTER_NOT_FOUND" ErrCodeNoQuorum = "CONSENSUS_NO_QUORUM" ErrCodeInvalidTerm = "CONSENSUS_INVALID_TERM" ErrCodeStaleTerm = "CONSENSUS_STALE_TERM" ErrCodeLogInconsistent = "CONSENSUS_LOG_INCONSISTENT" ErrCodeSnapshotFailed = "CONSENSUS_SNAPSHOT_FAILED" ErrCodeCompactionFailed = "CONSENSUS_COMPACTION_FAILED" ErrCodeElectionTimeout = "CONSENSUS_ELECTION_TIMEOUT" ErrCodeInvalidPeer = "CONSENSUS_INVALID_PEER" ErrCodePeerExists = "CONSENSUS_PEER_EXISTS" ErrCodePeerNotFound = "CONSENSUS_PEER_NOT_FOUND" ErrCodeInsufficientPeers = "CONSENSUS_INSUFFICIENT_PEERS" ErrCodeNoAvailablePeers = "CONSENSUS_NO_AVAILABLE_PEERS" ErrCodeFailoverFailed = "CONSENSUS_FAILOVER_FAILED" ErrCodeOperationFailed = "CONSENSUS_OPERATION_FAILED" ErrCodeReplicationFailed = "CONSENSUS_REPLICATION_FAILED" ErrCodeTimeout = "CONSENSUS_TIMEOUT" ErrCodeConnectionFailed = "CONSENSUS_CONNECTION_FAILED" ErrCodeStaleRead = "CONSENSUS_STALE_READ" ErrCodeRateLimitExceeded = "CONSENSUS_RATE_LIMIT_EXCEEDED" ErrCodeAuthenticationFailed = "CONSENSUS_AUTHENTICATION_FAILED" ErrCodePoolExhausted = "CONSENSUS_POOL_EXHAUSTED" ErrCodeConnectionTimeout = "CONSENSUS_CONNECTION_TIMEOUT" )
Consensus error codes, used as the Code field of the sentinel errors listed below.
Variables ¶
var ( // ErrNotLeader indicates the node is not the leader ErrNotLeader = &errors.ForgeError{Code: ErrCodeNotLeader, Message: "node is not the leader"} // ErrNoLeader indicates there is no leader ErrNoLeader = &errors.ForgeError{Code: ErrCodeNoLeader, Message: "no leader available"} // ErrNotStarted indicates the service is not started ErrNotStarted = &errors.ForgeError{Code: ErrCodeNotStarted, Message: "consensus service not started"} // ErrAlreadyStarted indicates the service is already started ErrAlreadyStarted = &errors.ForgeError{Code: ErrCodeAlreadyStarted, Message: "consensus service already started"} // ErrNodeNotFound indicates a node was not found ErrNodeNotFound = &errors.ForgeError{Code: ErrCodeNodeNotFound, Message: "node not found"} // ErrClusterNotFound indicates a cluster was not found ErrClusterNotFound = &errors.ForgeError{Code: ErrCodeClusterNotFound, Message: "cluster not found"} ErrStorageUnavailable = &errors.ForgeError{Code: ErrCodeStorageUnavailable, Message: "storage unavailable"} ErrTransportUnavailable = &errors.ForgeError{Code: ErrCodeTransportUnavailable, Message: "transport unavailable"} ErrDiscoveryUnavailable = &errors.ForgeError{Code: ErrCodeDiscoveryUnavailable, Message: "discovery service unavailable"} // ErrNoQuorum indicates no quorum is available ErrNoQuorum = &errors.ForgeError{Code: ErrCodeNoQuorum, Message: "no quorum available"} // ErrInvalidTerm indicates an invalid term ErrInvalidTerm = &errors.ForgeError{Code: ErrCodeInvalidTerm, Message: "invalid term"} // ErrStaleTerm indicates a stale term ErrStaleTerm = &errors.ForgeError{Code: ErrCodeStaleTerm, Message: "stale term"} // ErrLogInconsistent indicates log inconsistency ErrLogInconsistent = &errors.ForgeError{Code: ErrCodeLogInconsistent, Message: "log inconsistent"} // ErrSnapshotFailed indicates snapshot operation failed ErrSnapshotFailed = &errors.ForgeError{Code: ErrCodeSnapshotFailed, Message: "snapshot operation failed"} // ErrCompactionFailed indicates compaction 
failed ErrCompactionFailed = &errors.ForgeError{Code: ErrCodeCompactionFailed, Message: "compaction failed"} // ErrElectionTimeout indicates election timeout ErrElectionTimeout = &errors.ForgeError{Code: ErrCodeElectionTimeout, Message: "election timeout"} // ErrInvalidPeer indicates an invalid peer ErrInvalidPeer = &errors.ForgeError{Code: ErrCodeInvalidPeer, Message: "invalid peer"} // ErrPeerExists indicates peer already exists ErrPeerExists = &errors.ForgeError{Code: ErrCodePeerExists, Message: "peer already exists"} // ErrPeerNotFound indicates peer not found ErrPeerNotFound = &errors.ForgeError{Code: ErrCodePeerNotFound, Message: "peer not found"} // ErrInsufficientPeers indicates insufficient peers ErrInsufficientPeers = &errors.ForgeError{Code: ErrCodeInsufficientPeers, Message: "insufficient peers"} // ErrNoAvailablePeers indicates no available peers for operation ErrNoAvailablePeers = &errors.ForgeError{Code: ErrCodeNoAvailablePeers, Message: "no available peers"} // ErrFailoverFailed indicates failover operation failed ErrFailoverFailed = &errors.ForgeError{Code: ErrCodeFailoverFailed, Message: "failover operation failed"} // ErrOperationFailed indicates a generic operation failure ErrOperationFailed = &errors.ForgeError{Code: ErrCodeOperationFailed, Message: "operation failed"} // ErrReplicationFailed indicates replication operation failed ErrReplicationFailed = &errors.ForgeError{Code: ErrCodeReplicationFailed, Message: "replication failed"} // ErrTimeout indicates operation timeout ErrTimeout = &errors.ForgeError{Code: ErrCodeTimeout, Message: "operation timeout"} // ErrConnectionFailed indicates connection failure ErrConnectionFailed = &errors.ForgeError{Code: ErrCodeConnectionFailed, Message: "connection failed"} // ErrStaleRead indicates a stale read attempt ErrStaleRead = &errors.ForgeError{Code: ErrCodeStaleRead, Message: "stale read"} // ErrRateLimitExceeded indicates rate limit exceeded ErrRateLimitExceeded = &errors.ForgeError{Code: 
ErrCodeRateLimitExceeded, Message: "rate limit exceeded"} // ErrAuthenticationFailed indicates authentication failure ErrAuthenticationFailed = &errors.ForgeError{Code: ErrCodeAuthenticationFailed, Message: "authentication failed"} // ErrPoolExhausted indicates connection pool exhausted ErrPoolExhausted = &errors.ForgeError{Code: ErrCodePoolExhausted, Message: "connection pool exhausted"} // ErrConnectionTimeout indicates connection timeout ErrConnectionTimeout = &errors.ForgeError{Code: ErrCodeConnectionTimeout, Message: "connection timeout"} // ErrInvalidConfig indicates invalid configuration ErrInvalidConfig = errors.ErrInvalidConfigSentinel )
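Sentinel errors for use with errors.Is. NOTE: ErrStorageUnavailable, ErrTransportUnavailable and ErrDiscoveryUnavailable (which also lack doc comments) use the codes ErrCodeStorageUnavailable, ErrCodeTransportUnavailable and ErrCodeDiscoveryUnavailable. Those codes do not appear in the Constants listing above, so that listing appears to be incomplete.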
Functions ¶
func IsNoLeaderError ¶
func IsNoLeaderError(err error) bool
IsNoLeaderError reports whether err is a no-leader error.
func IsNoQuorumError ¶
func IsNoQuorumError(err error) bool
IsNoQuorumError reports whether err is a no-quorum error.
func IsNotLeaderError ¶
func IsNotLeaderError(err error) bool
IsNotLeaderError reports whether err is a not-leader error.
func IsRetryable ¶
func IsRetryable(err error) bool
IsRetryable reports whether err is retryable.
func IsStaleTermError ¶
func IsStaleTermError(err error) bool
IsStaleTermError reports whether err is a stale-term error.
func NewNoLeaderError ¶
func NewNoLeaderError() *errors.ForgeError
NewNoLeaderError creates a no-leader error.
func NewNoQuorumError ¶
func NewNoQuorumError(required, available int) *errors.ForgeError
NewNoQuorumError creates a no-quorum error annotated with the required and available counts.
func NewNotLeaderError ¶
func NewNotLeaderError(nodeID string, leaderID string) *errors.ForgeError
NewNotLeaderError creates a not-leader error annotated with the node ID and the leader ID.
func NewStaleTermError ¶
func NewStaleTermError(current, received uint64) *errors.ForgeError
NewStaleTermError creates a stale-term error annotated with the current and received terms.
func NewTimeoutError ¶
func NewTimeoutError(operation string) *errors.ForgeError
NewTimeoutError creates a timeout error for the named operation.
Types ¶
type AdminAPIConfig ¶
type AdminAPIConfig struct {
Enabled bool `yaml:"enabled" json:"enabled" default:"true"`
PathPrefix string `yaml:"path_prefix" json:"path_prefix" default:"/consensus"`
EnableAuth bool `yaml:"enable_auth" json:"enable_auth" default:"false"`
APIKey string `yaml:"api_key" json:"api_key"`
}
AdminAPIConfig contains admin API configuration
type AdvancedConfig ¶
type AdvancedConfig struct {
EnableAutoSnapshot bool `yaml:"enable_auto_snapshot" json:"enable_auto_snapshot" default:"true"`
EnableAutoCompaction bool `yaml:"enable_auto_compaction" json:"enable_auto_compaction" default:"true"`
CompactionInterval time.Duration `yaml:"compaction_interval" json:"compaction_interval" default:"1h"`
MaxMemoryUsage int64 `yaml:"max_memory_usage" json:"max_memory_usage" default:"1073741824"` // 1GB
GCInterval time.Duration `yaml:"gc_interval" json:"gc_interval" default:"5m"`
EnableReadIndex bool `yaml:"enable_read_index" json:"enable_read_index" default:"true"`
EnableLeasedReads bool `yaml:"enable_leased_reads" json:"enable_leased_reads" default:"true"`
}
AdvancedConfig contains advanced settings
type AppendEntriesRequest ¶
type AppendEntriesRequest struct {
Term uint64 `json:"term"`
LeaderID string `json:"leader_id"`
PrevLogIndex uint64 `json:"prev_log_index"`
PrevLogTerm uint64 `json:"prev_log_term"`
Entries []LogEntry `json:"entries"`
LeaderCommit uint64 `json:"leader_commit"`
}
AppendEntriesRequest represents an AppendEntries RPC request
type AppendEntriesResponse ¶
type AppendEntriesResponse struct {
Term uint64 `json:"term"`
Success bool `json:"success"`
MatchIndex uint64 `json:"match_index"`
NodeID string `json:"node_id"`
}
AppendEntriesResponse represents an AppendEntries RPC response
type BadgerOptions ¶
type BadgerOptions struct {
ValueLogMaxEntries uint32 `yaml:"value_log_max_entries" json:"value_log_max_entries" default:"1000000"`
MemTableSize int64 `yaml:"mem_table_size" json:"mem_table_size" default:"67108864"` // 64MB
NumMemTables int `yaml:"num_mem_tables" json:"num_mem_tables" default:"5"`
NumLevelZeroTables int `yaml:"num_level_zero_tables" json:"num_level_zero_tables" default:"5"`
NumCompactors int `yaml:"num_compactors" json:"num_compactors" default:"4"`
}
BadgerOptions contains BadgerDB-specific options
type BatchOp ¶
type BatchOp struct {
Type BatchOpType
Key []byte
Value []byte
}
BatchOp represents a batch operation
type BatchOpType ¶
type BatchOpType int
BatchOpType represents the type of batch operation
const ( // BatchOpSet is a set operation BatchOpSet BatchOpType = iota // BatchOpDelete is a delete operation BatchOpDelete )
type BoltOptions ¶
type BoltOptions struct {
NoSync bool `yaml:"no_sync" json:"no_sync" default:"false"`
NoGrowSync bool `yaml:"no_grow_sync" json:"no_grow_sync" default:"false"`
InitialMmapSize int `yaml:"initial_mmap_size" json:"initial_mmap_size" default:"0"`
Timeout time.Duration `yaml:"timeout" json:"timeout" default:"1s"`
}
BoltOptions contains BoltDB-specific options
type ClusterInfo ¶
type ClusterInfo struct {
ID string `json:"id"`
Leader string `json:"leader"`
Term uint64 `json:"term"`
Nodes []NodeInfo `json:"nodes"`
TotalNodes int `json:"total_nodes"`
ActiveNodes int `json:"active_nodes"`
HasQuorum bool `json:"has_quorum"`
CommitIndex uint64 `json:"commit_index"`
LastApplied uint64 `json:"last_applied"`
}
ClusterInfo represents information about the cluster
type ClusterManager ¶
type ClusterManager interface {
// GetNodes returns all nodes in the cluster
GetNodes() []NodeInfo
// GetNode returns information about a specific node
GetNode(nodeID string) (*NodeInfo, error)
// AddNode adds a node to the cluster
AddNode(nodeID, address string, port int) error
// RemoveNode removes a node from the cluster
RemoveNode(nodeID string) error
// UpdateNode updates node information
UpdateNode(nodeID string, info NodeInfo) error
// GetLeader returns the current leader node
GetLeader() *NodeInfo
// HasQuorum returns true if the cluster has quorum
HasQuorum() bool
// GetClusterSize returns the size of the cluster
GetClusterSize() int
// GetHealthyNodes returns the number of healthy nodes
GetHealthyNodes() int
}
ClusterManager defines the interface for cluster management
type Config ¶
type Config struct {
// Node configuration
NodeID string `yaml:"node_id" json:"node_id"`
ClusterID string `yaml:"cluster_id" json:"cluster_id"`
BindAddr string `yaml:"bind_addr" json:"bind_addr" default:"0.0.0.0"`
BindPort int `yaml:"bind_port" json:"bind_port" default:"7000"`
// Peers - initial cluster members
Peers []PeerConfig `yaml:"peers" json:"peers"`
// Raft configuration
Raft RaftConfig `yaml:"raft" json:"raft"`
// Transport configuration
Transport TransportConfig `yaml:"transport" json:"transport"`
// Discovery configuration
Discovery DiscoveryConfig `yaml:"discovery" json:"discovery"`
// Storage configuration
Storage StorageConfig `yaml:"storage" json:"storage"`
// Election configuration
Election ElectionConfig `yaml:"election" json:"election"`
// Health check configuration
Health HealthConfig `yaml:"health" json:"health"`
// Observability configuration
Observability ObservabilityConfig `yaml:"observability" json:"observability"`
// Security configuration
Security SecurityConfig `yaml:"security" json:"security"`
// Resilience configuration
Resilience ResilienceConfig `yaml:"resilience" json:"resilience"`
// Admin API configuration
AdminAPI AdminAPIConfig `yaml:"admin_api" json:"admin_api"`
// Events configuration
Events EventsConfig `yaml:"events" json:"events"`
// Advanced settings
Advanced AdvancedConfig `yaml:"advanced" json:"advanced"`
// Internal flag
RequireConfig bool `yaml:"-" json:"-"`
}
Config contains all consensus configuration
type ConfigOption ¶
type ConfigOption func(*Config)
ConfigOption is a functional option for Config
func WithBindAddress ¶
func WithBindAddress(addr string, port int) ConfigOption
WithBindAddress sets the bind address and port
func WithConfig ¶
func WithConfig(cfg Config) ConfigOption
WithConfig replaces the entire config; it is kept for backward compatibility.
func WithDiscoveryType ¶
func WithDiscoveryType(discoveryType string) ConfigOption
WithDiscoveryType sets the discovery type
func WithMTLS ¶
func WithMTLS(caFile string) ConfigOption
WithMTLS enables mutual TLS with the given CA file
func WithPeers ¶
func WithPeers(peers []PeerConfig) ConfigOption
WithPeers sets the initial peer list
func WithRequireConfig ¶
func WithRequireConfig(require bool) ConfigOption
WithRequireConfig sets whether the configuration must be provided by the ConfigManager.
func WithStoragePath ¶
func WithStoragePath(path string) ConfigOption
WithStoragePath sets the storage path
func WithStorageType ¶
func WithStorageType(storageType string) ConfigOption
WithStorageType sets the storage type
func WithTLS ¶
func WithTLS(certFile, keyFile string) ConfigOption
WithTLS enables TLS with the given cert and key files
func WithTransportType ¶
func WithTransportType(transportType string) ConfigOption
WithTransportType sets the transport type
type ConsensusEvent ¶
type ConsensusEvent struct {
Type ConsensusEventType `json:"type"`
NodeID string `json:"node_id"`
ClusterID string `json:"cluster_id"`
Data map[string]interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
ConsensusEvent represents a consensus event
type ConsensusEventType ¶
type ConsensusEventType string
ConsensusEventType represents the type of consensus event
const ( // Node lifecycle events ConsensusEventNodeStarted ConsensusEventType = "consensus.node.started" ConsensusEventNodeStopped ConsensusEventType = "consensus.node.stopped" ConsensusEventNodeJoined ConsensusEventType = "consensus.node.joined" ConsensusEventNodeLeft ConsensusEventType = "consensus.node.left" ConsensusEventNodeFailed ConsensusEventType = "consensus.node.failed" ConsensusEventNodeRecovered ConsensusEventType = "consensus.node.recovered" // Leadership events ConsensusEventLeaderElected ConsensusEventType = "consensus.leader.elected" ConsensusEventLeaderStepDown ConsensusEventType = "consensus.leader.stepdown" ConsensusEventLeaderTransfer ConsensusEventType = "consensus.leader.transfer" ConsensusEventLeaderLost ConsensusEventType = "consensus.leader.lost" // Role change events ConsensusEventRoleChanged ConsensusEventType = "consensus.role.changed" ConsensusEventBecameFollower ConsensusEventType = "consensus.role.follower" ConsensusEventBecameCandidate ConsensusEventType = "consensus.role.candidate" ConsensusEventBecameLeader ConsensusEventType = "consensus.role.leader" // Cluster events ConsensusEventClusterFormed ConsensusEventType = "consensus.cluster.formed" ConsensusEventClusterUpdated ConsensusEventType = "consensus.cluster.updated" ConsensusEventQuorumAchieved ConsensusEventType = "consensus.cluster.quorum.achieved" ConsensusEventQuorumLost ConsensusEventType = "consensus.cluster.quorum.lost" ConsensusEventMembershipChanged ConsensusEventType = "consensus.cluster.membership.changed" // Log events ConsensusEventLogAppended ConsensusEventType = "consensus.log.appended" ConsensusEventLogCommitted ConsensusEventType = "consensus.log.committed" ConsensusEventLogCompacted ConsensusEventType = "consensus.log.compacted" ConsensusEventLogTruncated ConsensusEventType = "consensus.log.truncated" // Snapshot events ConsensusEventSnapshotStarted ConsensusEventType = "consensus.snapshot.started" ConsensusEventSnapshotCompleted ConsensusEventType = 
"consensus.snapshot.completed" ConsensusEventSnapshotFailed ConsensusEventType = "consensus.snapshot.failed" ConsensusEventSnapshotRestored ConsensusEventType = "consensus.snapshot.restored" // Health events ConsensusEventHealthy ConsensusEventType = "consensus.health.healthy" ConsensusEventUnhealthy ConsensusEventType = "consensus.health.unhealthy" ConsensusEventDegraded ConsensusEventType = "consensus.health.degraded" ConsensusEventRecovering ConsensusEventType = "consensus.health.recovering" // Configuration events ConsensusEventConfigUpdated ConsensusEventType = "consensus.config.updated" ConsensusEventConfigReloaded ConsensusEventType = "consensus.config.reloaded" )
type ConsensusService ¶
type ConsensusService interface {
// Start starts the consensus service
Start(ctx context.Context) error
// Stop stops the consensus service
Stop(ctx context.Context) error
// IsLeader returns true if this node is the leader
IsLeader() bool
// GetLeader returns the current leader node ID
GetLeader() string
// GetRole returns the current role of this node
GetRole() NodeRole
// GetTerm returns the current term
GetTerm() uint64
// Apply applies a command to the state machine
Apply(ctx context.Context, cmd Command) error
// Read performs a consistent read operation
Read(ctx context.Context, query interface{}) (interface{}, error)
// GetStats returns consensus statistics
GetStats() ConsensusStats
// HealthCheck performs a health check
HealthCheck(ctx context.Context) error
// GetHealthStatus returns detailed health status
GetHealthStatus(ctx context.Context) HealthStatus
// GetClusterInfo returns cluster information
GetClusterInfo() ClusterInfo
// AddNode adds a node to the cluster
AddNode(ctx context.Context, nodeID, address string, port int) error
// RemoveNode removes a node from the cluster
RemoveNode(ctx context.Context, nodeID string) error
// TransferLeadership transfers leadership to another node
TransferLeadership(ctx context.Context, targetNodeID string) error
// StepDown causes the leader to step down
StepDown(ctx context.Context) error
// Snapshot creates a snapshot
Snapshot(ctx context.Context) error
// UpdateConfig updates the configuration
UpdateConfig(ctx context.Context, config Config) error
}
ConsensusService defines the interface for the consensus service
type ConsensusStats ¶
type ConsensusStats struct {
NodeID string `json:"node_id"`
ClusterID string `json:"cluster_id"`
Role NodeRole `json:"role"`
Status NodeStatus `json:"status"`
Term uint64 `json:"term"`
LeaderID string `json:"leader_id"`
CommitIndex uint64 `json:"commit_index"`
LastApplied uint64 `json:"last_applied"`
LastLogIndex uint64 `json:"last_log_index"`
LastLogTerm uint64 `json:"last_log_term"`
ClusterSize int `json:"cluster_size"`
HealthyNodes int `json:"healthy_nodes"`
HasQuorum bool `json:"has_quorum"`
ElectionsTotal int64 `json:"elections_total"`
ElectionsFailed int64 `json:"elections_failed"`
OperationsTotal int64 `json:"operations_total"`
OperationsFailed int64 `json:"operations_failed"`
OperationsPerSec float64 `json:"operations_per_sec"`
AverageLatencyMs float64 `json:"average_latency_ms"`
ErrorRate float64 `json:"error_rate"`
LogEntries int64 `json:"log_entries"`
SnapshotsTotal int64 `json:"snapshots_total"`
LastSnapshotTime time.Time `json:"last_snapshot_time"`
Uptime time.Duration `json:"uptime"`
StartTime time.Time `json:"start_time"`
}
ConsensusStats represents consensus statistics
type Discovery ¶
type Discovery interface {
// Start starts the discovery service
Start(ctx context.Context) error
// Stop stops the discovery service
Stop(ctx context.Context) error
// Register registers this node with the discovery service
Register(ctx context.Context, node NodeInfo) error
// Unregister unregisters this node from the discovery service
Unregister(ctx context.Context) error
// GetNodes returns all discovered nodes
GetNodes(ctx context.Context) ([]NodeInfo, error)
// Watch watches for node changes
Watch(ctx context.Context) (<-chan NodeChangeEvent, error)
}
Discovery defines the interface for service discovery
type DiscoveryConfig ¶
type DiscoveryConfig struct {
Type string `yaml:"type" json:"type" default:"static"` // static, dns, consul, etcd, kubernetes
Endpoints []string `yaml:"endpoints" json:"endpoints"`
Namespace string `yaml:"namespace" json:"namespace" default:"default"`
ServiceName string `yaml:"service_name" json:"service_name" default:"forge-consensus"`
RefreshInterval time.Duration `yaml:"refresh_interval" json:"refresh_interval" default:"30s"`
Timeout time.Duration `yaml:"timeout" json:"timeout" default:"10s"`
TTL time.Duration `yaml:"ttl" json:"ttl" default:"60s"`
EnableWatch bool `yaml:"enable_watch" json:"enable_watch" default:"true"`
}
DiscoveryConfig contains service discovery configuration
type DiscoveryEvent ¶
type DiscoveryEvent struct {
Type DiscoveryEventType `json:"type"`
NodeID string `json:"node_id"`
Address string `json:"address"`
Port int `json:"port"`
Timestamp time.Time `json:"timestamp"`
}
DiscoveryEvent represents a discovery event
type DiscoveryEventType ¶
type DiscoveryEventType string
DiscoveryEventType represents the type of discovery event
const ( // DiscoveryEventTypeJoin indicates a node joined DiscoveryEventTypeJoin DiscoveryEventType = "join" // DiscoveryEventTypeLeave indicates a node left DiscoveryEventTypeLeave DiscoveryEventType = "leave" // DiscoveryEventTypeUpdate indicates a node updated DiscoveryEventTypeUpdate DiscoveryEventType = "update" )
type ElectionConfig ¶
type ElectionConfig struct {
Enabled bool `yaml:"enabled" json:"enabled" default:"true"`
RandomizedTimeout bool `yaml:"randomized_timeout" json:"randomized_timeout" default:"true"`
PreVote bool `yaml:"pre_vote" json:"pre_vote" default:"true"`
PriorityElection bool `yaml:"priority_election" json:"priority_election" default:"false"`
Priority int `yaml:"priority" json:"priority" default:"0"`
StepDownOnRemove bool `yaml:"step_down_on_remove" json:"step_down_on_remove" default:"true"`
LeaderStickiness time.Duration `yaml:"leader_stickiness" json:"leader_stickiness" default:"10s"`
}
ElectionConfig contains leader election configuration
type EventsConfig ¶
type EventsConfig struct {
Enabled bool `yaml:"enabled" json:"enabled" default:"true"`
EmitLeaderChange bool `yaml:"emit_leader_change" json:"emit_leader_change" default:"true"`
EmitNodeEvents bool `yaml:"emit_node_events" json:"emit_node_events" default:"true"`
EmitClusterEvents bool `yaml:"emit_cluster_events" json:"emit_cluster_events" default:"true"`
}
EventsConfig contains events configuration
type HealthCheck ¶
type HealthCheck struct {
Name string `json:"name"`
Healthy bool `json:"healthy"`
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
CheckedAt time.Time `json:"checked_at"`
}
HealthCheck represents a single health check result
type HealthConfig ¶
type HealthConfig struct {
Enabled bool `yaml:"enabled" json:"enabled" default:"true"`
CheckInterval time.Duration `yaml:"check_interval" json:"check_interval" default:"10s"`
Timeout time.Duration `yaml:"timeout" json:"timeout" default:"5s"`
UnhealthyThreshold int `yaml:"unhealthy_threshold" json:"unhealthy_threshold" default:"3"`
HealthyThreshold int `yaml:"healthy_threshold" json:"healthy_threshold" default:"2"`
}
HealthConfig contains health check configuration
type HealthStatus ¶
type HealthStatus struct {
Healthy bool `json:"healthy"`
Status string `json:"status"` // "healthy", "degraded", "unhealthy"
Leader bool `json:"leader"`
HasQuorum bool `json:"has_quorum"`
TotalNodes int `json:"total_nodes"`
ActiveNodes int `json:"active_nodes"`
Details []HealthCheck `json:"details"`
LastCheck time.Time `json:"last_check"`
Checks map[string]interface{} `json:"checks"`
}
HealthStatus represents the health status of the consensus system
type HealthStatusEvent ¶
type HealthStatusEvent struct {
Status string `json:"status"` // "healthy", "unhealthy", "degraded"
Details string `json:"details,omitempty"`
ChecksFailed []string `json:"checks_failed,omitempty"`
}
HealthStatusEvent contains data for health status events
type InstallSnapshotRequest ¶
type InstallSnapshotRequest struct {
Term uint64 `json:"term"`
LeaderID string `json:"leader_id"`
LastIncludedIndex uint64 `json:"last_included_index"`
LastIncludedTerm uint64 `json:"last_included_term"`
Offset uint64 `json:"offset"`
Data []byte `json:"data"`
Done bool `json:"done"`
}
InstallSnapshotRequest represents an InstallSnapshot RPC request
type InstallSnapshotResponse ¶
InstallSnapshotResponse represents an InstallSnapshot RPC response
type LeaderElectedEvent ¶
type LeaderElectedEvent struct {
LeaderID string `json:"leader_id"`
Term uint64 `json:"term"`
VotesReceived int `json:"votes_received"`
TotalVotes int `json:"total_votes"`
ElectionDuration time.Duration `json:"election_duration"`
}
LeaderElectedEvent contains data for leader election events
type LogEntry ¶
type LogEntry struct {
Index uint64 `json:"index"`
Term uint64 `json:"term"`
Type EntryType `json:"type"`
Data []byte `json:"data"`
Created time.Time `json:"created"`
}
LogEntry represents a log entry in the replicated log
type LoggingConfig ¶
type LoggingConfig struct {
Level string `yaml:"level" json:"level" default:"info"`
EnableStructured bool `yaml:"enable_structured" json:"enable_structured" default:"true"`
LogRaftDetails bool `yaml:"log_raft_details" json:"log_raft_details" default:"false"`
}
LoggingConfig contains logging configuration
type MembershipChangedEvent ¶
type MembershipChangedEvent struct {
Action string `json:"action"` // "added", "removed", "updated"
NodeID string `json:"affected_node_id"`
OldMembers []string `json:"old_members"`
NewMembers []string `json:"new_members"`
}
MembershipChangedEvent contains data for membership change events
type Message ¶
type Message struct {
Type MessageType `json:"type"`
From string `json:"from"`
To string `json:"to"`
Payload interface{} `json:"payload"`
Timestamp int64 `json:"timestamp"`
}
Message represents a message sent between nodes
type MessageType ¶
type MessageType string
MessageType represents the type of message
const ( // MessageTypeAppendEntries is for AppendEntries RPC MessageTypeAppendEntries MessageType = "append_entries" // MessageTypeAppendEntriesResponse is for AppendEntries response MessageTypeAppendEntriesResponse MessageType = "append_entries_response" // MessageTypeRequestVote is for RequestVote RPC MessageTypeRequestVote MessageType = "request_vote" // MessageTypeRequestVoteResponse is for RequestVote response MessageTypeRequestVoteResponse MessageType = "request_vote_response" // MessageTypeInstallSnapshot is for InstallSnapshot RPC MessageTypeInstallSnapshot MessageType = "install_snapshot" // MessageTypeInstallSnapshotResponse is for InstallSnapshot response MessageTypeInstallSnapshotResponse MessageType = "install_snapshot_response" // MessageTypeHeartbeat is for heartbeat messages MessageTypeHeartbeat MessageType = "heartbeat" )
type MetricsConfig ¶
type MetricsConfig struct {
Enabled bool `yaml:"enabled" json:"enabled" default:"true"`
CollectionInterval time.Duration `yaml:"collection_interval" json:"collection_interval" default:"15s"`
Namespace string `yaml:"namespace" json:"namespace" default:"forge_consensus"`
EnableDetailedMetrics bool `yaml:"enable_detailed_metrics" json:"enable_detailed_metrics" default:"true"`
}
MetricsConfig contains metrics configuration
type NodeChangeEvent ¶
type NodeChangeEvent struct {
Type NodeChangeType `json:"type"`
Node NodeInfo `json:"node"`
}
NodeChangeEvent represents a node change event
type NodeChangeType ¶
type NodeChangeType string
NodeChangeType represents the type of node change
const ( // NodeChangeTypeAdded indicates a node was added NodeChangeTypeAdded NodeChangeType = "added" // NodeChangeTypeRemoved indicates a node was removed NodeChangeTypeRemoved NodeChangeType = "removed" // NodeChangeTypeUpdated indicates a node was updated NodeChangeTypeUpdated NodeChangeType = "updated" )
type NodeInfo ¶
type NodeInfo struct {
ID string `json:"id"`
Address string `json:"address"`
Port int `json:"port"`
Role NodeRole `json:"role"`
Status NodeStatus `json:"status"`
Term uint64 `json:"term"`
LastHeartbeat time.Time `json:"last_heartbeat"`
Metadata map[string]interface{} `json:"metadata"`
}
NodeInfo represents information about a node
type NodeStatus ¶
type NodeStatus string
NodeStatus represents the status of a node
const ( // StatusActive indicates the node is active and healthy StatusActive NodeStatus = "active" // StatusInactive indicates the node is inactive StatusInactive NodeStatus = "inactive" // StatusSuspected indicates the node is suspected of failure StatusSuspected NodeStatus = "suspected" // StatusFailed indicates the node has failed StatusFailed NodeStatus = "failed" )
type ObservabilityConfig ¶
// ObservabilityConfig groups the metrics, tracing and logging settings.
type ObservabilityConfig struct {
	// Metrics configures metrics collection (see MetricsConfig).
	Metrics MetricsConfig `yaml:"metrics" json:"metrics"`
	// Tracing configures tracing (see TracingConfig).
	Tracing TracingConfig `yaml:"tracing" json:"tracing"`
	// Logging configures logging (see LoggingConfig).
	Logging LoggingConfig `yaml:"logging" json:"logging"`
}
ObservabilityConfig contains observability configuration
type PeerConfig ¶
// PeerConfig identifies a cluster peer, for example as passed to WithPeers.
// Its ID/Address/Port triple matches the arguments of Transport.AddPeer.
type PeerConfig struct {
	// ID is the peer's node ID.
	ID      string `yaml:"id" json:"id"`
	// Address is the peer's network address (presumably a host without
	// the port, since Port is separate — confirm).
	Address string `yaml:"address" json:"address"`
	// Port is the peer's port.
	Port    int    `yaml:"port" json:"port"`
}
PeerConfig represents a cluster peer
type QuorumStatusEvent ¶
// QuorumStatusEvent carries the result of a quorum check.
type QuorumStatusEvent struct {
	// HasQuorum reports whether quorum is held; presumably equivalent to
	// HealthyNodes >= RequiredForQuorum — confirm with the event producer.
	HasQuorum         bool `json:"has_quorum"`
	// TotalNodes is the number of nodes considered by the check.
	TotalNodes        int  `json:"total_nodes"`
	// HealthyNodes is the number of those nodes counted as healthy.
	HealthyNodes      int  `json:"healthy_nodes"`
	// RequiredForQuorum is the number of healthy nodes needed for quorum
	// (a majority quorum would be TotalNodes/2 + 1 — TODO confirm).
	RequiredForQuorum int  `json:"required_for_quorum"`
}
QuorumStatusEvent contains data for quorum status events
type RaftConfig ¶
// RaftConfig holds Raft timing, log-retention and behavior settings. The
// defaults noted below are the values in the `default` struct tags; the code
// that applies those tags is not part of this declaration.
//
// NOTE(review): the default LeaderLeaseTimeout (500ms) is shorter than the
// default HeartbeatInterval (1s). If a leader's lease is refreshed only by
// heartbeat acknowledgements, it could expire between heartbeats. Confirm
// against the Raft implementation.
type RaftConfig struct {
	// HeartbeatInterval: default 1s.
	HeartbeatInterval    time.Duration `yaml:"heartbeat_interval" json:"heartbeat_interval" default:"1s"`
	// ElectionTimeoutMin and ElectionTimeoutMax bound the election timeout
	// (defaults 5s and 10s). Presumably a random value in this range is
	// chosen per election — confirm. Nothing in this type enforces
	// ElectionTimeoutMin <= ElectionTimeoutMax.
	ElectionTimeoutMin   time.Duration `yaml:"election_timeout_min" json:"election_timeout_min" default:"5s"`
	ElectionTimeoutMax   time.Duration `yaml:"election_timeout_max" json:"election_timeout_max" default:"10s"`
	// SnapshotInterval (default 30m) and SnapshotThreshold (default 10000)
	// govern snapshotting; how the two are combined is not visible here.
	SnapshotInterval     time.Duration `yaml:"snapshot_interval" json:"snapshot_interval" default:"30m"`
	SnapshotThreshold    uint64        `yaml:"snapshot_threshold" json:"snapshot_threshold" default:"10000"`
	// LogCacheSize: default 1024 (the unit is not specified here).
	LogCacheSize         int           `yaml:"log_cache_size" json:"log_cache_size" default:"1024"`
	// MaxAppendEntries: default 64; presumably the maximum number of
	// entries per AppendEntries request — confirm.
	MaxAppendEntries     int           `yaml:"max_append_entries" json:"max_append_entries" default:"64"`
	// TrailingLogs: default 10000.
	TrailingLogs         uint64        `yaml:"trailing_logs" json:"trailing_logs" default:"10000"`
	// ReplicationBatchSize: default 100.
	ReplicationBatchSize int           `yaml:"replication_batch_size" json:"replication_batch_size" default:"100"`
	// LeaderLeaseTimeout: default 500ms (see the NOTE above).
	LeaderLeaseTimeout   time.Duration `yaml:"leader_lease_timeout" json:"leader_lease_timeout" default:"500ms"`
	// PreVote: default true. RequestVoteRequest carries a matching PreVote
	// flag.
	PreVote              bool          `yaml:"pre_vote" json:"pre_vote" default:"true"`
	// CheckQuorum: default true.
	CheckQuorum          bool          `yaml:"check_quorum" json:"check_quorum" default:"true"`
	// DisablePipeline: default false.
	DisablePipeline      bool          `yaml:"disable_pipeline" json:"disable_pipeline" default:"false"`
}
RaftConfig contains Raft-specific configuration
type RaftNode ¶
// RaftNode is the contract for a single Raft participant. It covers
// lifecycle (Start/Stop), state inspection, proposals, peer registration,
// and the handlers for the three Raft RPCs (AppendEntries, RequestVote,
// InstallSnapshot).
type RaftNode interface {
	// Start starts the Raft node.
	Start(ctx context.Context) error
	// Stop stops the Raft node.
	Stop(ctx context.Context) error
	// IsLeader returns true if this node is the leader.
	IsLeader() bool
	// GetLeader returns the current leader ID.
	GetLeader() string
	// GetTerm returns the current term.
	GetTerm() uint64
	// GetRole returns the current role.
	GetRole() NodeRole
	// Apply applies a log entry. It takes an already-formed LogEntry,
	// whereas Propose takes raw command bytes.
	Apply(ctx context.Context, entry LogEntry) error
	// Propose proposes a new command to the cluster.
	Propose(ctx context.Context, command []byte) error
	// GetCommitIndex returns the current commit index.
	GetCommitIndex() uint64
	// AppendEntries handles the AppendEntries RPC.
	AppendEntries(ctx context.Context, req *AppendEntriesRequest) (*AppendEntriesResponse, error)
	// RequestVote handles the RequestVote RPC.
	RequestVote(ctx context.Context, req *RequestVoteRequest) (*RequestVoteResponse, error)
	// InstallSnapshot handles the InstallSnapshot RPC.
	InstallSnapshot(ctx context.Context, req *InstallSnapshotRequest) (*InstallSnapshotResponse, error)
	// GetStats returns Raft statistics.
	GetStats() RaftStats
	// AddPeer adds a peer to the Raft node. Unlike Transport.AddPeer, it
	// takes only a peer ID and reports no error.
	AddPeer(peerID string)
}
RaftNode defines the interface for Raft node operations
type RaftStats ¶
// RaftStats is the statistics value returned by RaftNode.GetStats.
type RaftStats struct {
	// NodeID identifies the reporting node.
	NodeID        string    `json:"node_id"`
	// Role, Term, Leader and CommitIndex presumably mirror GetRole,
	// GetTerm, GetLeader and GetCommitIndex at the time GetStats was
	// called — confirm with the implementation.
	Role          NodeRole  `json:"role"`
	Term          uint64    `json:"term"`
	Leader        string    `json:"leader"`
	CommitIndex   uint64    `json:"commit_index"`
	// LastApplied, LastLogIndex and LastLogTerm report log positions.
	LastApplied   uint64    `json:"last_applied"`
	LastLogIndex  uint64    `json:"last_log_index"`
	LastLogTerm   uint64    `json:"last_log_term"`
	// VotedFor is presumably empty when no vote was cast in Term — confirm.
	VotedFor      string    `json:"voted_for"`
	// LastHeartbeat is the time of the last recorded heartbeat.
	LastHeartbeat time.Time `json:"last_heartbeat"`
}
RaftStats represents Raft node statistics
type RequestVoteRequest ¶
// RequestVoteRequest is the argument of the RequestVote RPC handled by
// RaftNode.RequestVote.
type RequestVoteRequest struct {
	// Term is the term the candidate is campaigning in (per the Raft
	// RequestVote definition).
	Term         uint64 `json:"term"`
	// CandidateID identifies the node requesting the vote.
	CandidateID  string `json:"candidate_id"`
	// LastLogIndex and LastLogTerm describe the candidate's last log
	// entry, presumably for the Raft log up-to-date check — confirm.
	LastLogIndex uint64 `json:"last_log_index"`
	LastLogTerm  uint64 `json:"last_log_term"`
	// PreVote marks a pre-vote request (see RaftConfig.PreVote).
	PreVote      bool   `json:"pre_vote"`
}
RequestVoteRequest represents a RequestVote RPC request
type RequestVoteResponse ¶
// RequestVoteResponse is the result of the RequestVote RPC.
type RequestVoteResponse struct {
	// Term is the responder's current term (per the Raft RequestVote
	// definition).
	Term        uint64 `json:"term"`
	// VoteGranted reports whether the vote was granted.
	VoteGranted bool   `json:"vote_granted"`
	// NodeID identifies the responding node.
	NodeID      string `json:"node_id"`
}
RequestVoteResponse represents a RequestVote RPC response
type ResilienceConfig ¶
// ResilienceConfig configures retries and circuit breaking. The defaults
// noted below are the values in the `default` struct tags.
type ResilienceConfig struct {
	// EnableRetry (default true) and MaxRetries (default 3). Whether
	// MaxRetries counts retries after the first attempt or total attempts
	// is not visible here.
	EnableRetry             bool          `yaml:"enable_retry" json:"enable_retry" default:"true"`
	MaxRetries              int           `yaml:"max_retries" json:"max_retries" default:"3"`
	// RetryDelay (default 100ms), RetryBackoffFactor (default 2.0) and
	// MaxRetryDelay (default 5s) presumably describe exponential backoff
	// capped at MaxRetryDelay. Confirm the exact formula with the retry code.
	RetryDelay              time.Duration `yaml:"retry_delay" json:"retry_delay" default:"100ms"`
	RetryBackoffFactor      float64       `yaml:"retry_backoff_factor" json:"retry_backoff_factor" default:"2.0"`
	MaxRetryDelay           time.Duration `yaml:"max_retry_delay" json:"max_retry_delay" default:"5s"`
	// EnableCircuitBreaker (default true), CircuitBreakerThreshold
	// (default 5) and CircuitBreakerTimeout (default 30s). Whether the
	// threshold counts consecutive or total failures is not visible here.
	EnableCircuitBreaker    bool          `yaml:"enable_circuit_breaker" json:"enable_circuit_breaker" default:"true"`
	CircuitBreakerThreshold int           `yaml:"circuit_breaker_threshold" json:"circuit_breaker_threshold" default:"5"`
	CircuitBreakerTimeout   time.Duration `yaml:"circuit_breaker_timeout" json:"circuit_breaker_timeout" default:"30s"`
}
ResilienceConfig contains resilience configuration
type RoleChangedEvent ¶
// RoleChangedEvent carries data for role change events.
//
// NOTE(review): OldRole and NewRole are plain strings, whereas RaftStats and
// NodeInfo use the NodeRole type. Presumably these hold NodeRole values
// converted to string — confirm with the event producer.
type RoleChangedEvent struct {
	// OldRole is the role before the change.
	OldRole string `json:"old_role"`
	// NewRole is the role after the change.
	NewRole string `json:"new_role"`
	// Term is presumably the term in which the change happened — confirm.
	Term    uint64 `json:"term"`
	// Reason is optional and omitted from JSON when empty.
	Reason  string `json:"reason,omitempty"`
}
RoleChangedEvent contains data for role change events
type SecurityConfig ¶
// SecurityConfig configures TLS/mTLS and payload encryption. The defaults
// noted below are the values in the `default` struct tags; fields without
// one start at their zero value.
type SecurityConfig struct {
	// EnableTLS: default false.
	EnableTLS        bool   `yaml:"enable_tls" json:"enable_tls" default:"false"`
	// EnableMTLS: default false.
	EnableMTLS       bool   `yaml:"enable_mtls" json:"enable_mtls" default:"false"`
	// CertFile and KeyFile are the certificate and private key file paths
	// (as set by WithTLS(certFile, keyFile)).
	CertFile         string `yaml:"cert_file" json:"cert_file"`
	KeyFile          string `yaml:"key_file" json:"key_file"`
	// CAFile is the CA file path (as set by WithMTLS(caFile)).
	CAFile           string `yaml:"ca_file" json:"ca_file"`
	// SkipVerify: default false. NOTE(review): if this disables peer
	// certificate verification, it should be limited to testing.
	SkipVerify       bool   `yaml:"skip_verify" json:"skip_verify" default:"false"`
	// EnableEncryption: default false.
	EnableEncryption bool   `yaml:"enable_encryption" json:"enable_encryption" default:"false"`
	// EncryptionKey has no default.
	// NOTE(review): it is a plain string with ordinary yaml/json tags, so
	// marshalling this config (logging, admin/debug endpoints, etc.) emits
	// the key verbatim. Consider excluding it from serialized output or
	// redacting it.
	EncryptionKey    string `yaml:"encryption_key" json:"encryption_key"`
}
SecurityConfig contains security configuration
type Snapshot ¶
// Snapshot is a point-in-time snapshot of the state machine plus its
// metadata. Apart from Data, its fields and JSON keys match SnapshotMetadata.
type Snapshot struct {
	// Index and Term locate the snapshot in the Raft log (presumably the
	// last entry included in the snapshot — confirm).
	Index    uint64    `json:"index"`
	Term     uint64    `json:"term"`
	// Data is the serialized state; encoding/json encodes []byte as base64.
	Data     []byte    `json:"data"`
	// Size is stored separately from Data; nothing in this type ties it
	// to len(Data).
	Size     int64     `json:"size"`
	// Created is the creation time.
	Created  time.Time `json:"created"`
	// Checksum's algorithm and encoding are not specified here.
	Checksum string    `json:"checksum"`
}
Snapshot represents a point-in-time snapshot of the state machine
type SnapshotEvent ¶
// SnapshotEvent carries data for snapshot events. Duration and Error are
// omitted from JSON when zero/empty.
type SnapshotEvent struct {
	// Index, Term and Size describe the snapshot, as in SnapshotMetadata.
	Index    uint64        `json:"index"`
	Term     uint64        `json:"term"`
	Size     int64         `json:"size"`
	// Duration is encoded to JSON as an integer nanosecond count, because
	// time.Duration is an int64.
	Duration time.Duration `json:"duration,omitempty"`
	// Error is a message string rather than an error value, presumably so
	// the event serializes; empty presumably means success.
	Error    string        `json:"error,omitempty"`
}
SnapshotEvent contains data for snapshot events
type SnapshotMetadata ¶
// SnapshotMetadata describes a snapshot without its payload. Its fields and
// JSON keys match Snapshot minus Data.
type SnapshotMetadata struct {
	// Index and Term locate the snapshot in the Raft log.
	Index    uint64    `json:"index"`
	Term     uint64    `json:"term"`
	// Size is the recorded snapshot size (unit not specified here).
	Size     int64     `json:"size"`
	// Created is the creation time.
	Created  time.Time `json:"created"`
	// Checksum's algorithm and encoding are not specified here.
	Checksum string    `json:"checksum"`
}
SnapshotMetadata contains metadata about a snapshot
type StateMachine ¶
// StateMachine is the replicated state machine driven by log entries.
// Unlike RaftNode.Apply, its Apply takes no context.
type StateMachine interface {
	// Apply applies a log entry to the state machine.
	Apply(entry LogEntry) error
	// Snapshot creates a snapshot of the current state.
	Snapshot() (*Snapshot, error)
	// Restore restores the state machine from a snapshot.
	Restore(snapshot *Snapshot) error
	// Query performs a read-only query. Both the query and the result are
	// untyped (interface{}); their concrete types are implementation-defined.
	Query(query interface{}) (interface{}, error)
}
StateMachine defines the interface for the replicated state machine
type Storage ¶
// Storage is a byte-keyed, byte-valued persistent store.
//
// NOTE(review): the interface has both Stop(ctx) and Close(). Their
// relationship (e.g. whether Stop implies Close) is not visible here.
type Storage interface {
	// Start starts the storage backend.
	Start(ctx context.Context) error
	// Stop stops the storage backend.
	Stop(ctx context.Context) error
	// Set stores a key-value pair.
	Set(key, value []byte) error
	// Get retrieves a value by key. How a missing key is reported (nil
	// value vs. error) is not specified here — TODO confirm.
	Get(key []byte) ([]byte, error)
	// Delete deletes a key.
	Delete(key []byte) error
	// Batch executes a batch of operations (see BatchOp). Whether the batch
	// is applied atomically is not specified here — confirm.
	Batch(ops []BatchOp) error
	// GetRange retrieves the key-value pairs between start and end. Whether
	// end is inclusive is not specified here — TODO confirm.
	GetRange(start, end []byte) ([]KeyValue, error)
	// Close closes the storage backend.
	Close() error
}
Storage defines the interface for persistent storage
type StorageConfig ¶
// StorageConfig configures the persistent storage backend. The defaults
// noted below are the values in the `default` struct tags.
type StorageConfig struct {
	// Type selects the backend: badger, boltdb, pebble or postgres
	// (default badger). Only Badger and Bolt have dedicated option structs
	// in this type.
	Type           string        `yaml:"type" json:"type" default:"badger"`
	// Path: default "./data/consensus". This is a relative path, so it is
	// presumably resolved against the process working directory.
	Path           string        `yaml:"path" json:"path" default:"./data/consensus"`
	// SyncWrites: default true.
	SyncWrites     bool          `yaml:"sync_writes" json:"sync_writes" default:"true"`
	// MaxBatchSize (default 1000) and MaxBatchDelay (default 10ms) bound
	// write batching; presumably a batch is flushed on whichever limit is
	// reached first — confirm.
	MaxBatchSize   int           `yaml:"max_batch_size" json:"max_batch_size" default:"1000"`
	MaxBatchDelay  time.Duration `yaml:"max_batch_delay" json:"max_batch_delay" default:"10ms"`
	// CompactOnStart: default false.
	CompactOnStart bool          `yaml:"compact_on_start" json:"compact_on_start" default:"false"`
	// BadgerDB-specific options (presumably used only when Type is "badger").
	BadgerOptions  BadgerOptions `yaml:"badger_options" json:"badger_options"`
	// BoltDB-specific options (presumably used only when Type is "boltdb").
	BoltOptions    BoltOptions   `yaml:"bolt_options" json:"bolt_options"`
}
StorageConfig contains storage backend configuration
type TracingConfig ¶
// TracingConfig controls tracing. The defaults noted below are the values in
// the `default` struct tags.
type TracingConfig struct {
	// Enabled: default false.
	Enabled     bool    `yaml:"enabled" json:"enabled" default:"false"`
	// ServiceName: default "forge-consensus". It is hyphenated, unlike the
	// metrics default namespace "forge_consensus".
	ServiceName string  `yaml:"service_name" json:"service_name" default:"forge-consensus"`
	// SampleRate: default 0.1; presumably a fraction in [0, 1] — confirm.
	SampleRate  float64 `yaml:"sample_rate" json:"sample_rate" default:"0.1"`
}
TracingConfig contains tracing configuration
type Transport ¶
// Transport carries messages between cluster nodes. Outgoing messages are
// untyped (Send takes interface{}), while incoming ones arrive as Message
// values on the Receive channel.
type Transport interface {
	// Start starts the transport.
	Start(ctx context.Context) error
	// Stop stops the transport.
	Stop(ctx context.Context) error
	// Send sends a message to a peer. target is presumably a node ID
	// registered through AddPeer — confirm.
	Send(ctx context.Context, target string, message interface{}) error
	// Receive returns a receive-only channel of incoming messages.
	Receive() <-chan Message
	// AddPeer registers a peer by node ID, address and port.
	AddPeer(nodeID, address string, port int) error
	// RemovePeer removes a peer.
	RemovePeer(nodeID string) error
	// GetAddress returns the local address. Its format (e.g. whether it
	// includes the port) is not specified here.
	GetAddress() string
}
Transport defines the interface for network transport
type TransportConfig ¶
// TransportConfig configures the network transport layer. The defaults noted
// below are the values in the `default` struct tags.
type TransportConfig struct {
	// Type selects the transport: grpc or tcp (default grpc).
	Type               string        `yaml:"type" json:"type" default:"grpc"`
	// MaxMessageSize is in bytes; default 4194304 (4 MiB).
	MaxMessageSize     int           `yaml:"max_message_size" json:"max_message_size" default:"4194304"`
	// Timeout: default 10s. Presumably an operation timeout, distinct from
	// ConnectionTimeout — confirm.
	Timeout            time.Duration `yaml:"timeout" json:"timeout" default:"10s"`
	// KeepAlive (default true) with KeepAliveInterval (default 30s) and
	// KeepAliveTimeout (default 10s).
	KeepAlive          bool          `yaml:"keep_alive" json:"keep_alive" default:"true"`
	KeepAliveInterval  time.Duration `yaml:"keep_alive_interval" json:"keep_alive_interval" default:"30s"`
	KeepAliveTimeout   time.Duration `yaml:"keep_alive_timeout" json:"keep_alive_timeout" default:"10s"`
	// MaxConnections: default 100.
	MaxConnections     int           `yaml:"max_connections" json:"max_connections" default:"100"`
	// ConnectionTimeout: default 5s.
	ConnectionTimeout  time.Duration `yaml:"connection_timeout" json:"connection_timeout" default:"5s"`
	// IdleTimeout: default 5m.
	IdleTimeout        time.Duration `yaml:"idle_timeout" json:"idle_timeout" default:"5m"`
	// EnableCompression (default true) and CompressionLevel (default 6).
	// The valid level range depends on the codec, which is not specified
	// here.
	EnableCompression  bool          `yaml:"enable_compression" json:"enable_compression" default:"true"`
	CompressionLevel   int           `yaml:"compression_level" json:"compression_level" default:"6"`
	// EnableMultiplexing: default true.
	EnableMultiplexing bool          `yaml:"enable_multiplexing" json:"enable_multiplexing" default:"true"`
}
TransportConfig contains transport layer configuration