internal

package
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
// Consensus error codes.
//
// Each constant is an untyped string constant whose value starts with
// "CONSENSUS_". Every code in this group is used as the Code field of the
// identically named Err* sentinel declared in this package (for example
// ErrCodeNotLeader is the Code of ErrNotLeader).
const (
	ErrCodeNotLeader            = "CONSENSUS_NOT_LEADER"
	ErrCodeNoLeader             = "CONSENSUS_NO_LEADER"
	ErrCodeNotStarted           = "CONSENSUS_NOT_STARTED"
	ErrCodeAlreadyStarted       = "CONSENSUS_ALREADY_STARTED"
	ErrCodeNodeNotFound         = "CONSENSUS_NODE_NOT_FOUND"
	ErrCodeClusterNotFound      = "CONSENSUS_CLUSTER_NOT_FOUND"
	ErrCodeStorageUnavailable   = "CONSENSUS_STORAGE_UNAVAILABLE"
	ErrCodeTransportUnavailable = "CONSENSUS_TRANSPORT_UNAVAILABLE"
	ErrCodeDiscoveryUnavailable = "CONSENSUS_DISCOVERY_UNAVAILABLE"
	ErrCodeNoQuorum             = "CONSENSUS_NO_QUORUM"
	ErrCodeInvalidTerm          = "CONSENSUS_INVALID_TERM"
	ErrCodeStaleTerm            = "CONSENSUS_STALE_TERM"
	ErrCodeLogInconsistent      = "CONSENSUS_LOG_INCONSISTENT"
	ErrCodeSnapshotFailed       = "CONSENSUS_SNAPSHOT_FAILED"
	ErrCodeCompactionFailed     = "CONSENSUS_COMPACTION_FAILED"
	ErrCodeElectionTimeout      = "CONSENSUS_ELECTION_TIMEOUT"
	ErrCodeInvalidPeer          = "CONSENSUS_INVALID_PEER"
	ErrCodePeerExists           = "CONSENSUS_PEER_EXISTS"
	ErrCodePeerNotFound         = "CONSENSUS_PEER_NOT_FOUND"
	ErrCodeInsufficientPeers    = "CONSENSUS_INSUFFICIENT_PEERS"
	ErrCodeNoAvailablePeers     = "CONSENSUS_NO_AVAILABLE_PEERS"
	ErrCodeFailoverFailed       = "CONSENSUS_FAILOVER_FAILED"
	ErrCodeOperationFailed      = "CONSENSUS_OPERATION_FAILED"
	ErrCodeReplicationFailed    = "CONSENSUS_REPLICATION_FAILED"
	ErrCodeTimeout              = "CONSENSUS_TIMEOUT"
	ErrCodeConnectionFailed     = "CONSENSUS_CONNECTION_FAILED"
	ErrCodeStaleRead            = "CONSENSUS_STALE_READ"
	ErrCodeRateLimitExceeded    = "CONSENSUS_RATE_LIMIT_EXCEEDED"
	ErrCodeAuthenticationFailed = "CONSENSUS_AUTHENTICATION_FAILED"
	ErrCodePoolExhausted        = "CONSENSUS_POOL_EXHAUSTED"
	ErrCodeConnectionTimeout    = "CONSENSUS_CONNECTION_TIMEOUT"
)

Consensus error codes

Variables

View Source
// Sentinel errors for use with errors.Is.
//
// Every sentinel except ErrInvalidConfig is a package-level
// *errors.ForgeError built from one ErrCode* constant and a fixed message.
// ErrInvalidConfig re-exports errors.ErrInvalidConfigSentinel instead.
// NOTE(review): "errors" here is the package that declares ForgeError, not
// the standard library package of the same name — confirm the import path.
var (
	// ErrNotLeader indicates the node is not the leader.
	ErrNotLeader = &errors.ForgeError{Code: ErrCodeNotLeader, Message: "node is not the leader"}

	// ErrNoLeader indicates there is no leader.
	ErrNoLeader = &errors.ForgeError{Code: ErrCodeNoLeader, Message: "no leader available"}

	// ErrNotStarted indicates the service is not started.
	ErrNotStarted = &errors.ForgeError{Code: ErrCodeNotStarted, Message: "consensus service not started"}

	// ErrAlreadyStarted indicates the service is already started.
	ErrAlreadyStarted = &errors.ForgeError{Code: ErrCodeAlreadyStarted, Message: "consensus service already started"}

	// ErrNodeNotFound indicates a node was not found.
	ErrNodeNotFound = &errors.ForgeError{Code: ErrCodeNodeNotFound, Message: "node not found"}

	// ErrClusterNotFound indicates a cluster was not found.
	ErrClusterNotFound = &errors.ForgeError{Code: ErrCodeClusterNotFound, Message: "cluster not found"}

	// ErrStorageUnavailable indicates storage is unavailable.
	ErrStorageUnavailable = &errors.ForgeError{Code: ErrCodeStorageUnavailable, Message: "storage unavailable"}

	// ErrTransportUnavailable indicates transport is unavailable.
	ErrTransportUnavailable = &errors.ForgeError{Code: ErrCodeTransportUnavailable, Message: "transport unavailable"}

	// ErrDiscoveryUnavailable indicates the discovery service is unavailable.
	ErrDiscoveryUnavailable = &errors.ForgeError{Code: ErrCodeDiscoveryUnavailable, Message: "discovery service unavailable"}

	// ErrNoQuorum indicates no quorum is available.
	ErrNoQuorum = &errors.ForgeError{Code: ErrCodeNoQuorum, Message: "no quorum available"}

	// ErrInvalidTerm indicates an invalid term.
	ErrInvalidTerm = &errors.ForgeError{Code: ErrCodeInvalidTerm, Message: "invalid term"}

	// ErrStaleTerm indicates a stale term.
	ErrStaleTerm = &errors.ForgeError{Code: ErrCodeStaleTerm, Message: "stale term"}

	// ErrLogInconsistent indicates a log inconsistency.
	ErrLogInconsistent = &errors.ForgeError{Code: ErrCodeLogInconsistent, Message: "log inconsistent"}

	// ErrSnapshotFailed indicates a snapshot operation failed.
	ErrSnapshotFailed = &errors.ForgeError{Code: ErrCodeSnapshotFailed, Message: "snapshot operation failed"}

	// ErrCompactionFailed indicates compaction failed.
	ErrCompactionFailed = &errors.ForgeError{Code: ErrCodeCompactionFailed, Message: "compaction failed"}

	// ErrElectionTimeout indicates an election timeout.
	ErrElectionTimeout = &errors.ForgeError{Code: ErrCodeElectionTimeout, Message: "election timeout"}

	// ErrInvalidPeer indicates an invalid peer.
	ErrInvalidPeer = &errors.ForgeError{Code: ErrCodeInvalidPeer, Message: "invalid peer"}

	// ErrPeerExists indicates the peer already exists.
	ErrPeerExists = &errors.ForgeError{Code: ErrCodePeerExists, Message: "peer already exists"}

	// ErrPeerNotFound indicates the peer was not found.
	ErrPeerNotFound = &errors.ForgeError{Code: ErrCodePeerNotFound, Message: "peer not found"}

	// ErrInsufficientPeers indicates insufficient peers.
	ErrInsufficientPeers = &errors.ForgeError{Code: ErrCodeInsufficientPeers, Message: "insufficient peers"}

	// ErrNoAvailablePeers indicates no peers are available for the operation.
	ErrNoAvailablePeers = &errors.ForgeError{Code: ErrCodeNoAvailablePeers, Message: "no available peers"}

	// ErrFailoverFailed indicates a failover operation failed.
	ErrFailoverFailed = &errors.ForgeError{Code: ErrCodeFailoverFailed, Message: "failover operation failed"}

	// ErrOperationFailed indicates a generic operation failure.
	ErrOperationFailed = &errors.ForgeError{Code: ErrCodeOperationFailed, Message: "operation failed"}

	// ErrReplicationFailed indicates a replication operation failed.
	ErrReplicationFailed = &errors.ForgeError{Code: ErrCodeReplicationFailed, Message: "replication failed"}

	// ErrTimeout indicates an operation timeout.
	ErrTimeout = &errors.ForgeError{Code: ErrCodeTimeout, Message: "operation timeout"}

	// ErrConnectionFailed indicates a connection failure.
	ErrConnectionFailed = &errors.ForgeError{Code: ErrCodeConnectionFailed, Message: "connection failed"}

	// ErrStaleRead indicates a stale read attempt.
	ErrStaleRead = &errors.ForgeError{Code: ErrCodeStaleRead, Message: "stale read"}

	// ErrRateLimitExceeded indicates the rate limit was exceeded.
	ErrRateLimitExceeded = &errors.ForgeError{Code: ErrCodeRateLimitExceeded, Message: "rate limit exceeded"}

	// ErrAuthenticationFailed indicates an authentication failure.
	ErrAuthenticationFailed = &errors.ForgeError{Code: ErrCodeAuthenticationFailed, Message: "authentication failed"}

	// ErrPoolExhausted indicates the connection pool is exhausted.
	ErrPoolExhausted = &errors.ForgeError{Code: ErrCodePoolExhausted, Message: "connection pool exhausted"}

	// ErrConnectionTimeout indicates a connection timeout.
	ErrConnectionTimeout = &errors.ForgeError{Code: ErrCodeConnectionTimeout, Message: "connection timeout"}

	// ErrInvalidConfig indicates invalid configuration. It is the sentinel
	// exported by the errors package rather than a consensus-specific value.
	ErrInvalidConfig = errors.ErrInvalidConfigSentinel
)

Sentinel errors for use with errors.Is

Functions

func Is

func Is(err, target error) bool

Is is a helper function that wraps errors.Is from the standard library.

func IsFatal

func IsFatal(err error) bool

IsFatal returns true if the error is fatal and should cause a shutdown.

func IsNoLeaderError

func IsNoLeaderError(err error) bool

IsNoLeaderError returns true if the error is a no leader error

func IsNoQuorumError

func IsNoQuorumError(err error) bool

IsNoQuorumError returns true if the error is a no quorum error

func IsNotLeaderError

func IsNotLeaderError(err error) bool

IsNotLeaderError returns true if the error is a not leader error

func IsRetryable

func IsRetryable(err error) bool

IsRetryable returns true if the error is retryable

func IsStaleTermError

func IsStaleTermError(err error) bool

IsStaleTermError returns true if the error is a stale term error

func NewNoLeaderError

func NewNoLeaderError() *errors.ForgeError

NewNoLeaderError creates a no leader error

func NewNoQuorumError

func NewNoQuorumError(required, available int) *errors.ForgeError

NewNoQuorumError creates a no quorum error with context

func NewNotLeaderError

func NewNotLeaderError(nodeID string, leaderID string) *errors.ForgeError

NewNotLeaderError creates a not leader error with context

func NewStaleTermError

func NewStaleTermError(current, received uint64) *errors.ForgeError

NewStaleTermError creates a stale term error with context

func NewTimeoutError

func NewTimeoutError(operation string) *errors.ForgeError

NewTimeoutError creates a timeout error

Types

type AdminAPIConfig

// AdminAPIConfig contains admin API configuration.
//
// The fields map to the yaml/json keys "enabled", "path_prefix",
// "enable_auth" and "api_key". The default tags declare Enabled=true,
// PathPrefix="/consensus" and EnableAuth=false; APIKey declares no default.
// NOTE(review): how the default tags are applied is not visible here, and
// APIKey is presumably only consulted when EnableAuth is true — confirm.
type AdminAPIConfig struct {
	Enabled    bool   `yaml:"enabled" json:"enabled" default:"true"`
	PathPrefix string `yaml:"path_prefix" json:"path_prefix" default:"/consensus"`
	EnableAuth bool   `yaml:"enable_auth" json:"enable_auth" default:"false"`
	APIKey     string `yaml:"api_key" json:"api_key"`
}

AdminAPIConfig contains admin API configuration

type AdvancedConfig

// AdvancedConfig contains advanced settings.
//
// Declared defaults (via the default tags): auto snapshot and auto compaction
// enabled, CompactionInterval "1h", MaxMemoryUsage 1073741824 (annotated as
// 1GB, so presumably a byte count — confirm), GCInterval "5m", and both
// read-index and leased reads enabled.
// NOTE(review): the component that interprets these settings is not shown.
type AdvancedConfig struct {
	EnableAutoSnapshot   bool          `yaml:"enable_auto_snapshot" json:"enable_auto_snapshot" default:"true"`
	EnableAutoCompaction bool          `yaml:"enable_auto_compaction" json:"enable_auto_compaction" default:"true"`
	CompactionInterval   time.Duration `yaml:"compaction_interval" json:"compaction_interval" default:"1h"`
	MaxMemoryUsage       int64         `yaml:"max_memory_usage" json:"max_memory_usage" default:"1073741824"` // 1GB
	GCInterval           time.Duration `yaml:"gc_interval" json:"gc_interval" default:"5m"`
	EnableReadIndex      bool          `yaml:"enable_read_index" json:"enable_read_index" default:"true"`
	EnableLeasedReads    bool          `yaml:"enable_leased_reads" json:"enable_leased_reads" default:"true"`
}

AdvancedConfig contains advanced settings

type AppendEntriesRequest

// AppendEntriesRequest represents an AppendEntries RPC request.
//
// It carries the sender's Term and LeaderID, the index and term of the entry
// preceding the new ones (PrevLogIndex, PrevLogTerm), the Entries themselves
// and the sender's LeaderCommit index.
// NOTE(review): the field names match the AppendEntries arguments in the
// Raft paper; presumably they carry the same meaning — confirm against the
// implementation.
type AppendEntriesRequest struct {
	Term         uint64     `json:"term"`
	LeaderID     string     `json:"leader_id"`
	PrevLogIndex uint64     `json:"prev_log_index"`
	PrevLogTerm  uint64     `json:"prev_log_term"`
	Entries      []LogEntry `json:"entries"`
	LeaderCommit uint64     `json:"leader_commit"`
}

AppendEntriesRequest represents an AppendEntries RPC request

type AppendEntriesResponse

// AppendEntriesResponse represents an AppendEntries RPC response.
//
// It reports the responder's Term and NodeID, a Success flag and a
// MatchIndex.
// NOTE(review): MatchIndex is presumably the highest log index known to be
// replicated on the responder — confirm against the implementation.
type AppendEntriesResponse struct {
	Term       uint64 `json:"term"`
	Success    bool   `json:"success"`
	MatchIndex uint64 `json:"match_index"`
	NodeID     string `json:"node_id"`
}

AppendEntriesResponse represents an AppendEntries RPC response

type BadgerOptions

// BadgerOptions contains BadgerDB-specific options.
//
// Declared defaults (via the default tags): ValueLogMaxEntries 1000000,
// MemTableSize 67108864 (annotated as 64MB), NumMemTables 5,
// NumLevelZeroTables 5 and NumCompactors 4.
// NOTE(review): these presumably map onto the BadgerDB options of the same
// names; the mapping is not visible here.
type BadgerOptions struct {
	ValueLogMaxEntries uint32 `yaml:"value_log_max_entries" json:"value_log_max_entries" default:"1000000"`
	MemTableSize       int64  `yaml:"mem_table_size" json:"mem_table_size" default:"67108864"` // 64MB
	NumMemTables       int    `yaml:"num_mem_tables" json:"num_mem_tables" default:"5"`
	NumLevelZeroTables int    `yaml:"num_level_zero_tables" json:"num_level_zero_tables" default:"5"`
	NumCompactors      int    `yaml:"num_compactors" json:"num_compactors" default:"4"`
}

BadgerOptions contains BadgerDB-specific options

type BatchOp

// BatchOp represents a batch operation.
//
// Type selects the kind of operation (see BatchOpType); Key and Value are raw
// byte slices. The struct carries no serialization tags.
// NOTE(review): Value is presumably ignored for BatchOpDelete — confirm.
type BatchOp struct {
	Type  BatchOpType
	Key   []byte
	Value []byte
}

BatchOp represents a batch operation

type BatchOpType

// BatchOpType represents the type of batch operation. It is an int-based
// enumeration used by BatchOp.Type.
type BatchOpType int

BatchOpType represents the type of batch operation

// Batch operation kinds. Values are assigned with iota, so BatchOpSet is 0
// and BatchOpDelete is 1; reordering the list would change those values.
const (
	// BatchOpSet is a set operation
	BatchOpSet BatchOpType = iota
	// BatchOpDelete is a delete operation
	BatchOpDelete
)

type BoltOptions

// BoltOptions contains BoltDB-specific options.
//
// Declared defaults (via the default tags): NoSync false, NoGrowSync false,
// InitialMmapSize 0 and Timeout "1s".
// NOTE(review): these presumably map onto the BoltDB options of the same
// names; the mapping is not visible here.
type BoltOptions struct {
	NoSync          bool          `yaml:"no_sync" json:"no_sync" default:"false"`
	NoGrowSync      bool          `yaml:"no_grow_sync" json:"no_grow_sync" default:"false"`
	InitialMmapSize int           `yaml:"initial_mmap_size" json:"initial_mmap_size" default:"0"`
	Timeout         time.Duration `yaml:"timeout" json:"timeout" default:"1s"`
}

BoltOptions contains BoltDB-specific options

type ClusterInfo

// ClusterInfo represents information about the cluster.
//
// It bundles the cluster ID, the Leader (a string, presumably the leader's
// node ID — confirm), the current Term, the member list in Nodes, the
// TotalNodes and ActiveNodes counts, a HasQuorum flag, and the CommitIndex
// and LastApplied log positions. ConsensusService.GetClusterInfo returns
// this type.
type ClusterInfo struct {
	ID          string     `json:"id"`
	Leader      string     `json:"leader"`
	Term        uint64     `json:"term"`
	Nodes       []NodeInfo `json:"nodes"`
	TotalNodes  int        `json:"total_nodes"`
	ActiveNodes int        `json:"active_nodes"`
	HasQuorum   bool       `json:"has_quorum"`
	CommitIndex uint64     `json:"commit_index"`
	LastApplied uint64     `json:"last_applied"`
}

ClusterInfo represents information about the cluster

type ClusterManager

// ClusterManager defines the interface for cluster management.
//
// Unlike ConsensusService, none of these methods take a context.Context.
// Lookups and mutations that can fail return an error; GetLeader returns a
// *NodeInfo with no error.
// NOTE(review): GetLeader presumably returns nil when no leader is known —
// confirm against the implementation.
type ClusterManager interface {
	// GetNodes returns all nodes in the cluster.
	GetNodes() []NodeInfo

	// GetNode returns information about a specific node.
	GetNode(nodeID string) (*NodeInfo, error)

	// AddNode adds a node to the cluster.
	AddNode(nodeID, address string, port int) error

	// RemoveNode removes a node from the cluster.
	RemoveNode(nodeID string) error

	// UpdateNode updates node information.
	UpdateNode(nodeID string, info NodeInfo) error

	// GetLeader returns the current leader node.
	GetLeader() *NodeInfo

	// HasQuorum returns true if the cluster has quorum.
	HasQuorum() bool

	// GetClusterSize returns the size of the cluster.
	GetClusterSize() int

	// GetHealthyNodes returns the number of healthy nodes.
	GetHealthyNodes() int
}

ClusterManager defines the interface for cluster management

type Command

// Command represents a command to be applied to the state machine.
//
// Type is a free-form string identifying the command and Payload is an
// untyped key/value map; both are serialized under the JSON keys "type" and
// "payload". ConsensusService.Apply takes a Command.
// NOTE(review): the set of valid Type values is not defined in this view.
type Command struct {
	Type    string                 `json:"type"`
	Payload map[string]interface{} `json:"payload"`
}

Command represents a command to be applied to the state machine

type Config

// Config contains all consensus configuration.
//
// It combines node identity and bind settings, the initial peer list, and
// one nested section per subsystem. All fields are mapped to yaml/json keys
// except RequireConfig, which is excluded from both formats with "-".
// The default tags declare BindAddr "0.0.0.0" and BindPort 7000.
// NOTE(review): how the default tags are applied is not visible here.
type Config struct {
	// Node configuration: identity and listen address.
	NodeID    string `yaml:"node_id" json:"node_id"`
	ClusterID string `yaml:"cluster_id" json:"cluster_id"`
	BindAddr  string `yaml:"bind_addr" json:"bind_addr" default:"0.0.0.0"`
	BindPort  int    `yaml:"bind_port" json:"bind_port" default:"7000"`

	// Peers - initial cluster members
	Peers []PeerConfig `yaml:"peers" json:"peers"`

	// Raft configuration
	Raft RaftConfig `yaml:"raft" json:"raft"`

	// Transport configuration
	Transport TransportConfig `yaml:"transport" json:"transport"`

	// Discovery configuration
	Discovery DiscoveryConfig `yaml:"discovery" json:"discovery"`

	// Storage configuration
	Storage StorageConfig `yaml:"storage" json:"storage"`

	// Election configuration
	Election ElectionConfig `yaml:"election" json:"election"`

	// Health check configuration
	Health HealthConfig `yaml:"health" json:"health"`

	// Observability configuration
	Observability ObservabilityConfig `yaml:"observability" json:"observability"`

	// Security configuration
	Security SecurityConfig `yaml:"security" json:"security"`

	// Resilience configuration
	Resilience ResilienceConfig `yaml:"resilience" json:"resilience"`

	// Admin API configuration
	AdminAPI AdminAPIConfig `yaml:"admin_api" json:"admin_api"`

	// Events configuration
	Events EventsConfig `yaml:"events" json:"events"`

	// Advanced settings
	Advanced AdvancedConfig `yaml:"advanced" json:"advanced"`

	// Internal flag; never read from or written to yaml/json.
	RequireConfig bool `yaml:"-" json:"-"`
}

Config contains all consensus configuration

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type ConfigOption

// ConfigOption is a functional option for Config: a function that receives a
// *Config and mutates it in place. The With* constructors in this package
// return ConfigOption values.
type ConfigOption func(*Config)

ConfigOption is a functional option for Config

func WithBindAddress

func WithBindAddress(addr string, port int) ConfigOption

WithBindAddress sets the bind address and port

func WithClusterID

func WithClusterID(id string) ConfigOption

WithClusterID sets the cluster ID

func WithConfig

func WithConfig(cfg Config) ConfigOption

WithConfig sets the entire config (for backward compatibility)

func WithDiscoveryType

func WithDiscoveryType(discoveryType string) ConfigOption

WithDiscoveryType sets the discovery type

func WithMTLS

func WithMTLS(caFile string) ConfigOption

WithMTLS enables mutual TLS with the given CA file

func WithNodeID

func WithNodeID(id string) ConfigOption

WithNodeID sets the node ID

func WithPeers

func WithPeers(peers []PeerConfig) ConfigOption

WithPeers sets the initial peer list

func WithRequireConfig

func WithRequireConfig(require bool) ConfigOption

WithRequireConfig sets whether config is required from ConfigManager

func WithStoragePath

func WithStoragePath(path string) ConfigOption

WithStoragePath sets the storage path

func WithStorageType

func WithStorageType(storageType string) ConfigOption

WithStorageType sets the storage type

func WithTLS

func WithTLS(certFile, keyFile string) ConfigOption

WithTLS enables TLS with the given cert and key files

func WithTransportType

func WithTransportType(transportType string) ConfigOption

WithTransportType sets the transport type

type ConsensusEvent

// ConsensusEvent represents a consensus event.
//
// Type identifies the event (see the ConsensusEvent* constants), NodeID and
// ClusterID identify its origin, Data is an untyped key/value payload and
// Timestamp records a time.Time for the event.
// NOTE(review): the shape of Data per event type is not defined here; the
// *Event structs in this package are presumably its typed counterparts.
type ConsensusEvent struct {
	Type      ConsensusEventType     `json:"type"`
	NodeID    string                 `json:"node_id"`
	ClusterID string                 `json:"cluster_id"`
	Data      map[string]interface{} `json:"data"`
	Timestamp time.Time              `json:"timestamp"`
}

ConsensusEvent represents a consensus event

type ConsensusEventType

// ConsensusEventType represents the type of consensus event. It is a
// string-based type used by ConsensusEvent.Type.
type ConsensusEventType string

ConsensusEventType represents the type of consensus event

// Consensus event types. Every value is a dot-separated string that starts
// with "consensus." followed by a category (node, leader, role, cluster, log,
// snapshot, health, config) and an event name.
const (
	// Node lifecycle events
	ConsensusEventNodeStarted   ConsensusEventType = "consensus.node.started"
	ConsensusEventNodeStopped   ConsensusEventType = "consensus.node.stopped"
	ConsensusEventNodeJoined    ConsensusEventType = "consensus.node.joined"
	ConsensusEventNodeLeft      ConsensusEventType = "consensus.node.left"
	ConsensusEventNodeFailed    ConsensusEventType = "consensus.node.failed"
	ConsensusEventNodeRecovered ConsensusEventType = "consensus.node.recovered"

	// Leadership events
	ConsensusEventLeaderElected  ConsensusEventType = "consensus.leader.elected"
	ConsensusEventLeaderStepDown ConsensusEventType = "consensus.leader.stepdown"
	ConsensusEventLeaderTransfer ConsensusEventType = "consensus.leader.transfer"
	ConsensusEventLeaderLost     ConsensusEventType = "consensus.leader.lost"

	// Role change events
	ConsensusEventRoleChanged     ConsensusEventType = "consensus.role.changed"
	ConsensusEventBecameFollower  ConsensusEventType = "consensus.role.follower"
	ConsensusEventBecameCandidate ConsensusEventType = "consensus.role.candidate"
	ConsensusEventBecameLeader    ConsensusEventType = "consensus.role.leader"

	// Cluster events
	ConsensusEventClusterFormed     ConsensusEventType = "consensus.cluster.formed"
	ConsensusEventClusterUpdated    ConsensusEventType = "consensus.cluster.updated"
	ConsensusEventQuorumAchieved    ConsensusEventType = "consensus.cluster.quorum.achieved"
	ConsensusEventQuorumLost        ConsensusEventType = "consensus.cluster.quorum.lost"
	ConsensusEventMembershipChanged ConsensusEventType = "consensus.cluster.membership.changed"

	// Log events
	ConsensusEventLogAppended  ConsensusEventType = "consensus.log.appended"
	ConsensusEventLogCommitted ConsensusEventType = "consensus.log.committed"
	ConsensusEventLogCompacted ConsensusEventType = "consensus.log.compacted"
	ConsensusEventLogTruncated ConsensusEventType = "consensus.log.truncated"

	// Snapshot events
	ConsensusEventSnapshotStarted   ConsensusEventType = "consensus.snapshot.started"
	ConsensusEventSnapshotCompleted ConsensusEventType = "consensus.snapshot.completed"
	ConsensusEventSnapshotFailed    ConsensusEventType = "consensus.snapshot.failed"
	ConsensusEventSnapshotRestored  ConsensusEventType = "consensus.snapshot.restored"

	// Health events
	ConsensusEventHealthy    ConsensusEventType = "consensus.health.healthy"
	ConsensusEventUnhealthy  ConsensusEventType = "consensus.health.unhealthy"
	ConsensusEventDegraded   ConsensusEventType = "consensus.health.degraded"
	ConsensusEventRecovering ConsensusEventType = "consensus.health.recovering"

	// Configuration events
	ConsensusEventConfigUpdated  ConsensusEventType = "consensus.config.updated"
	ConsensusEventConfigReloaded ConsensusEventType = "consensus.config.reloaded"
)

type ConsensusService

// ConsensusService defines the interface for the consensus service.
//
// Lifecycle, mutation and health operations take a context.Context as their
// first argument; the plain state accessors (IsLeader, GetLeader, GetRole,
// GetTerm, GetStats, GetClusterInfo) do not.
// NOTE(review): which errors each method returns is not specified here;
// presumably the sentinel errors of this package (e.g. ErrNotLeader,
// ErrNotStarted) — confirm against the implementation.
type ConsensusService interface {
	// Start starts the consensus service.
	Start(ctx context.Context) error

	// Stop stops the consensus service.
	Stop(ctx context.Context) error

	// IsLeader returns true if this node is the leader.
	IsLeader() bool

	// GetLeader returns the current leader node ID.
	GetLeader() string

	// GetRole returns the current role of this node.
	GetRole() NodeRole

	// GetTerm returns the current term.
	GetTerm() uint64

	// Apply applies a command to the state machine.
	Apply(ctx context.Context, cmd Command) error

	// Read performs a consistent read operation. Both the query and the
	// result are untyped; their concrete types are defined by the
	// implementation.
	Read(ctx context.Context, query interface{}) (interface{}, error)

	// GetStats returns consensus statistics.
	GetStats() ConsensusStats

	// HealthCheck performs a health check.
	HealthCheck(ctx context.Context) error

	// GetHealthStatus returns detailed health status.
	GetHealthStatus(ctx context.Context) HealthStatus

	// GetClusterInfo returns cluster information.
	GetClusterInfo() ClusterInfo

	// AddNode adds a node to the cluster.
	AddNode(ctx context.Context, nodeID, address string, port int) error

	// RemoveNode removes a node from the cluster.
	RemoveNode(ctx context.Context, nodeID string) error

	// TransferLeadership transfers leadership to another node.
	TransferLeadership(ctx context.Context, targetNodeID string) error

	// StepDown causes the leader to step down.
	StepDown(ctx context.Context) error

	// Snapshot creates a snapshot.
	Snapshot(ctx context.Context) error

	// UpdateConfig updates the configuration.
	UpdateConfig(ctx context.Context, config Config) error
}

ConsensusService defines the interface for the consensus service

type ConsensusStats

// ConsensusStats represents consensus statistics.
//
// It groups node identity and state (NodeID, ClusterID, Role, Status, Term,
// LeaderID), log positions (CommitIndex, LastApplied, LastLogIndex,
// LastLogTerm), cluster membership counts and quorum state, cumulative
// election/operation/snapshot counters, derived rates (OperationsPerSec,
// AverageLatencyMs, ErrorRate) and timing information (LastSnapshotTime,
// Uptime, StartTime). ConsensusService.GetStats returns this type.
// NOTE(review): the scale of ErrorRate (fraction vs. percentage) and the
// window used for the rates are not visible here.
type ConsensusStats struct {
	NodeID           string        `json:"node_id"`
	ClusterID        string        `json:"cluster_id"`
	Role             NodeRole      `json:"role"`
	Status           NodeStatus    `json:"status"`
	Term             uint64        `json:"term"`
	LeaderID         string        `json:"leader_id"`
	CommitIndex      uint64        `json:"commit_index"`
	LastApplied      uint64        `json:"last_applied"`
	LastLogIndex     uint64        `json:"last_log_index"`
	LastLogTerm      uint64        `json:"last_log_term"`
	ClusterSize      int           `json:"cluster_size"`
	HealthyNodes     int           `json:"healthy_nodes"`
	HasQuorum        bool          `json:"has_quorum"`
	ElectionsTotal   int64         `json:"elections_total"`
	ElectionsFailed  int64         `json:"elections_failed"`
	OperationsTotal  int64         `json:"operations_total"`
	OperationsFailed int64         `json:"operations_failed"`
	OperationsPerSec float64       `json:"operations_per_sec"`
	AverageLatencyMs float64       `json:"average_latency_ms"`
	ErrorRate        float64       `json:"error_rate"`
	LogEntries       int64         `json:"log_entries"`
	SnapshotsTotal   int64         `json:"snapshots_total"`
	LastSnapshotTime time.Time     `json:"last_snapshot_time"`
	Uptime           time.Duration `json:"uptime"`
	StartTime        time.Time     `json:"start_time"`
}

ConsensusStats represents consensus statistics

type Discovery

// Discovery defines the interface for service discovery.
//
// Every method takes a context.Context. Register receives the NodeInfo to
// publish, while Unregister takes no node argument. Watch returns a
// receive-only channel of NodeChangeEvent values.
// NOTE(review): when the Watch channel is closed (e.g. on ctx cancellation)
// is not specified here — confirm against the implementation.
type Discovery interface {
	// Start starts the discovery service.
	Start(ctx context.Context) error

	// Stop stops the discovery service.
	Stop(ctx context.Context) error

	// Register registers this node with the discovery service.
	Register(ctx context.Context, node NodeInfo) error

	// Unregister unregisters this node from the discovery service.
	Unregister(ctx context.Context) error

	// GetNodes returns all discovered nodes.
	GetNodes(ctx context.Context) ([]NodeInfo, error)

	// Watch watches for node changes.
	Watch(ctx context.Context) (<-chan NodeChangeEvent, error)
}

Discovery defines the interface for service discovery

type DiscoveryConfig

// DiscoveryConfig contains service discovery configuration.
//
// Type selects the backend; the inline comment lists static, dns, consul,
// etcd and kubernetes. Declared defaults (via the default tags): Type
// "static", Namespace "default", ServiceName "forge-consensus",
// RefreshInterval "30s", Timeout "10s", TTL "60s" and EnableWatch true.
// Endpoints declares no default.
// NOTE(review): which fields each backend honours is not visible here.
type DiscoveryConfig struct {
	Type            string        `yaml:"type" json:"type" default:"static"` // static, dns, consul, etcd, kubernetes
	Endpoints       []string      `yaml:"endpoints" json:"endpoints"`
	Namespace       string        `yaml:"namespace" json:"namespace" default:"default"`
	ServiceName     string        `yaml:"service_name" json:"service_name" default:"forge-consensus"`
	RefreshInterval time.Duration `yaml:"refresh_interval" json:"refresh_interval" default:"30s"`
	Timeout         time.Duration `yaml:"timeout" json:"timeout" default:"10s"`
	TTL             time.Duration `yaml:"ttl" json:"ttl" default:"60s"`
	EnableWatch     bool          `yaml:"enable_watch" json:"enable_watch" default:"true"`
}

DiscoveryConfig contains service discovery configuration

type DiscoveryEvent

// DiscoveryEvent represents a discovery event.
//
// Type is one of the DiscoveryEventType values; NodeID, Address and Port
// identify the affected node and Timestamp records a time.Time for the event.
type DiscoveryEvent struct {
	Type      DiscoveryEventType `json:"type"`
	NodeID    string             `json:"node_id"`
	Address   string             `json:"address"`
	Port      int                `json:"port"`
	Timestamp time.Time          `json:"timestamp"`
}

DiscoveryEvent represents a discovery event

type DiscoveryEventType

// DiscoveryEventType represents the type of discovery event. It is a
// string-based type used by DiscoveryEvent.Type.
type DiscoveryEventType string

DiscoveryEventType represents the type of discovery event

// Discovery event types, with the string values "join", "leave" and "update".
const (
	// DiscoveryEventTypeJoin indicates a node joined
	DiscoveryEventTypeJoin DiscoveryEventType = "join"
	// DiscoveryEventTypeLeave indicates a node left
	DiscoveryEventTypeLeave DiscoveryEventType = "leave"
	// DiscoveryEventTypeUpdate indicates a node updated
	DiscoveryEventTypeUpdate DiscoveryEventType = "update"
)

type ElectionConfig

// ElectionConfig contains leader election configuration.
//
// Declared defaults (via the default tags): Enabled true, RandomizedTimeout
// true, PreVote true, PriorityElection false, Priority 0, StepDownOnRemove
// true and LeaderStickiness "10s".
// NOTE(review): Priority is presumably only meaningful when PriorityElection
// is enabled — confirm against the election implementation.
type ElectionConfig struct {
	Enabled           bool          `yaml:"enabled" json:"enabled" default:"true"`
	RandomizedTimeout bool          `yaml:"randomized_timeout" json:"randomized_timeout" default:"true"`
	PreVote           bool          `yaml:"pre_vote" json:"pre_vote" default:"true"`
	PriorityElection  bool          `yaml:"priority_election" json:"priority_election" default:"false"`
	Priority          int           `yaml:"priority" json:"priority" default:"0"`
	StepDownOnRemove  bool          `yaml:"step_down_on_remove" json:"step_down_on_remove" default:"true"`
	LeaderStickiness  time.Duration `yaml:"leader_stickiness" json:"leader_stickiness" default:"10s"`
}

ElectionConfig contains leader election configuration

type EntryType

// EntryType represents the type of log entry. It is an int-based enumeration
// used by LogEntry.Type.
type EntryType int

EntryType represents the type of log entry

// Log entry types. Values are assigned with iota in declaration order
// (EntryNormal=0, EntryConfig=1, EntryBarrier=2, EntryNoop=3). LogEntry.Type
// is serialized under the JSON key "type", so reordering this list would
// change the encoded numbers.
const (
	// EntryNormal is a normal command entry
	EntryNormal EntryType = iota
	// EntryConfig is a configuration change entry
	EntryConfig
	// EntryBarrier is a barrier entry for read consistency
	EntryBarrier
	// EntryNoop is a no-op entry
	EntryNoop
)

type EventsConfig

// EventsConfig contains events configuration.
//
// Enabled is the master switch and the three Emit* flags select event groups
// (leader changes, node events, cluster events). All four declare a default
// of true via their default tags.
// NOTE(review): whether Enabled=false overrides the Emit* flags is not
// visible here — confirm against the emitter.
type EventsConfig struct {
	Enabled           bool `yaml:"enabled" json:"enabled" default:"true"`
	EmitLeaderChange  bool `yaml:"emit_leader_change" json:"emit_leader_change" default:"true"`
	EmitNodeEvents    bool `yaml:"emit_node_events" json:"emit_node_events" default:"true"`
	EmitClusterEvents bool `yaml:"emit_cluster_events" json:"emit_cluster_events" default:"true"`
}

EventsConfig contains events configuration

type HealthCheck

// HealthCheck represents a single health check result.
//
// Name identifies the check, Healthy is its outcome and CheckedAt records a
// time.Time for the check. Message and Error carry the omitempty JSON option,
// so they are left out of the encoded form when empty.
type HealthCheck struct {
	Name      string    `json:"name"`
	Healthy   bool      `json:"healthy"`
	Message   string    `json:"message,omitempty"`
	Error     string    `json:"error,omitempty"`
	CheckedAt time.Time `json:"checked_at"`
}

HealthCheck represents a single health check result

type HealthConfig

// HealthConfig contains health check configuration.
//
// Declared defaults (via the default tags): Enabled true, CheckInterval
// "10s", Timeout "5s", UnhealthyThreshold 3 and HealthyThreshold 2.
// NOTE(review): the thresholds are presumably counts of consecutive failed or
// passed checks before the state flips — confirm against the health checker.
type HealthConfig struct {
	Enabled            bool          `yaml:"enabled" json:"enabled" default:"true"`
	CheckInterval      time.Duration `yaml:"check_interval" json:"check_interval" default:"10s"`
	Timeout            time.Duration `yaml:"timeout" json:"timeout" default:"5s"`
	UnhealthyThreshold int           `yaml:"unhealthy_threshold" json:"unhealthy_threshold" default:"3"`
	HealthyThreshold   int           `yaml:"healthy_threshold" json:"healthy_threshold" default:"2"`
}

HealthConfig contains health check configuration

type HealthStatus

// HealthStatus represents the health status of the consensus system.
//
// Healthy is the overall verdict and Status its textual form (the inline
// comment lists "healthy", "degraded" and "unhealthy"). Leader and HasQuorum
// are flags, TotalNodes and ActiveNodes are counts, and LastCheck records a
// time.Time. Per-check results are exposed twice: as a typed slice in Details
// and as an untyped map in Checks. ConsensusService.GetHealthStatus returns
// this type.
// NOTE(review): the relationship between Details and Checks is not visible
// here — confirm whether both are always populated.
type HealthStatus struct {
	Healthy     bool                   `json:"healthy"`
	Status      string                 `json:"status"` // "healthy", "degraded", "unhealthy"
	Leader      bool                   `json:"leader"`
	HasQuorum   bool                   `json:"has_quorum"`
	TotalNodes  int                    `json:"total_nodes"`
	ActiveNodes int                    `json:"active_nodes"`
	Details     []HealthCheck          `json:"details"`
	LastCheck   time.Time              `json:"last_check"`
	Checks      map[string]interface{} `json:"checks"`
}

HealthStatus represents the health status of the consensus system

type HealthStatusEvent

// HealthStatusEvent contains data for health status events.
//
// Status is a string (the inline comment lists "healthy", "unhealthy" and
// "degraded"). Details and ChecksFailed carry the omitempty JSON option, so
// they are left out of the encoded form when empty.
type HealthStatusEvent struct {
	Status       string   `json:"status"` // "healthy", "unhealthy", "degraded"
	Details      string   `json:"details,omitempty"`
	ChecksFailed []string `json:"checks_failed,omitempty"`
}

HealthStatusEvent contains data for health status events

type InstallSnapshotRequest

// InstallSnapshotRequest represents an InstallSnapshot RPC request.
//
// It carries the sender's Term and LeaderID, the index and term of the last
// entry covered by the snapshot (LastIncludedIndex, LastIncludedTerm), and a
// chunk of snapshot bytes in Data positioned at Offset, with Done as a flag.
// NOTE(review): the field names match the InstallSnapshot arguments in the
// Raft paper; presumably Done marks the final chunk — confirm against the
// implementation.
type InstallSnapshotRequest struct {
	Term              uint64 `json:"term"`
	LeaderID          string `json:"leader_id"`
	LastIncludedIndex uint64 `json:"last_included_index"`
	LastIncludedTerm  uint64 `json:"last_included_term"`
	Offset            uint64 `json:"offset"`
	Data              []byte `json:"data"`
	Done              bool   `json:"done"`
}

InstallSnapshotRequest represents an InstallSnapshot RPC request

type InstallSnapshotResponse

// InstallSnapshotResponse represents an InstallSnapshot RPC response. It
// carries only the responder's Term and NodeID; unlike AppendEntriesResponse
// it has no success flag.
type InstallSnapshotResponse struct {
	Term   uint64 `json:"term"`
	NodeID string `json:"node_id"`
}

InstallSnapshotResponse represents an InstallSnapshot RPC response

type KeyValue

// KeyValue represents a key-value pair. Both Key and Value are raw byte
// slices, and the struct carries no serialization tags.
type KeyValue struct {
	Key   []byte
	Value []byte
}

KeyValue represents a key-value pair

type LeaderElectedEvent

// LeaderElectedEvent contains data for leader election events.
//
// It records the elected LeaderID, the Term, the vote tally (VotesReceived
// and TotalVotes) and the ElectionDuration.
// NOTE(review): whether TotalVotes counts votes cast or eligible voters is
// not visible here.
type LeaderElectedEvent struct {
	LeaderID         string        `json:"leader_id"`
	Term             uint64        `json:"term"`
	VotesReceived    int           `json:"votes_received"`
	TotalVotes       int           `json:"total_votes"`
	ElectionDuration time.Duration `json:"election_duration"`
}

LeaderElectedEvent contains data for leader election events

type LogEntry

// LogEntry represents a log entry in the replicated log.
//
// Index and Term locate the entry in the log, Type classifies it (see
// EntryType), Data is its opaque byte payload and Created records a
// time.Time for the entry. AppendEntriesRequest.Entries is a slice of
// LogEntry.
type LogEntry struct {
	Index   uint64    `json:"index"`
	Term    uint64    `json:"term"`
	Type    EntryType `json:"type"`
	Data    []byte    `json:"data"`
	Created time.Time `json:"created"`
}

LogEntry represents a log entry in the replicated log

type LoggingConfig

// LoggingConfig contains logging configuration.
//
// Declared defaults (via the default tags): Level "info", EnableStructured
// true and LogRaftDetails false.
// NOTE(review): the accepted Level values are not defined in this view.
type LoggingConfig struct {
	Level            string `yaml:"level" json:"level" default:"info"`
	EnableStructured bool   `yaml:"enable_structured" json:"enable_structured" default:"true"`
	LogRaftDetails   bool   `yaml:"log_raft_details" json:"log_raft_details" default:"false"`
}

LoggingConfig contains logging configuration

type MembershipChangedEvent

// MembershipChangedEvent contains data for membership change events.
//
// Action is a string (the inline comment lists "added", "removed" and
// "updated"); OldMembers and NewMembers hold the member lists before and
// after the change. Note that NodeID is serialized as "affected_node_id",
// not "node_id" as in the other structs of this package.
type MembershipChangedEvent struct {
	Action     string   `json:"action"` // "added", "removed", "updated"
	NodeID     string   `json:"affected_node_id"`
	OldMembers []string `json:"old_members"`
	NewMembers []string `json:"new_members"`
}

MembershipChangedEvent contains data for membership change events

type Message

// Message represents a message sent between nodes.
//
// Type identifies the payload kind (see MessageType), From and To are
// strings naming the sender and recipient, and Payload is untyped.
// Timestamp is an int64 rather than a time.Time.
// NOTE(review): the unit of Timestamp and the concrete Payload type for each
// MessageType are not visible here — confirm against the transport code.
type Message struct {
	Type      MessageType `json:"type"`
	From      string      `json:"from"`
	To        string      `json:"to"`
	Payload   interface{} `json:"payload"`
	Timestamp int64       `json:"timestamp"`
}

Message represents a message sent between nodes

type MessageType

// MessageType represents the type of message. It is a string-based type used
// by Message.Type.
type MessageType string

MessageType represents the type of message

// Message types. Each of the three RPCs (AppendEntries, RequestVote,
// InstallSnapshot) has a request value and a matching "_response" value;
// heartbeat has a single value.
const (
	// MessageTypeAppendEntries is for AppendEntries RPC
	MessageTypeAppendEntries MessageType = "append_entries"
	// MessageTypeAppendEntriesResponse is for AppendEntries response
	MessageTypeAppendEntriesResponse MessageType = "append_entries_response"
	// MessageTypeRequestVote is for RequestVote RPC
	MessageTypeRequestVote MessageType = "request_vote"
	// MessageTypeRequestVoteResponse is for RequestVote response
	MessageTypeRequestVoteResponse MessageType = "request_vote_response"
	// MessageTypeInstallSnapshot is for InstallSnapshot RPC
	MessageTypeInstallSnapshot MessageType = "install_snapshot"
	// MessageTypeInstallSnapshotResponse is for InstallSnapshot response
	MessageTypeInstallSnapshotResponse MessageType = "install_snapshot_response"
	// MessageTypeHeartbeat is for heartbeat messages
	MessageTypeHeartbeat MessageType = "heartbeat"
)

type MetricsConfig

// MetricsConfig groups the metrics settings. Every field is mapped to a YAML
// and a JSON key and carries a `default` struct tag.
// NOTE(review): the code that applies the `default` tags is not visible here.
type MetricsConfig struct {
	// Enabled turns metrics on or off; the default tag is "true".
	Enabled               bool          `yaml:"enabled" json:"enabled" default:"true"`
	// CollectionInterval is the period between metric collections; the
	// default tag is "15s".
	CollectionInterval    time.Duration `yaml:"collection_interval" json:"collection_interval" default:"15s"`
	// Namespace is presumably the prefix under which metrics are published;
	// the default tag is "forge_consensus".
	Namespace             string        `yaml:"namespace" json:"namespace" default:"forge_consensus"`
	// EnableDetailedMetrics presumably enables an extended metric set; the
	// default tag is "true".
	EnableDetailedMetrics bool          `yaml:"enable_detailed_metrics" json:"enable_detailed_metrics" default:"true"`
}

MetricsConfig contains metrics configuration

type NodeChangeEvent

// NodeChangeEvent reports a single change to a node: the kind of change and
// the NodeInfo of the node concerned. The node is embedded by value.
type NodeChangeEvent struct {
	// Type is the kind of change (added, removed or updated).
	Type NodeChangeType `json:"type"`
	// Node describes the node the change applies to.
	Node NodeInfo       `json:"node"`
}

NodeChangeEvent represents a node change event

type NodeChangeType

// NodeChangeType is a string-based enumeration of node change kinds; the
// known values are the NodeChangeType* constants.
type NodeChangeType string

NodeChangeType represents the type of node change

// Known NodeChangeType values.
const (
	// NodeChangeTypeAdded reports that a node was added.
	NodeChangeTypeAdded NodeChangeType = "added"
	// NodeChangeTypeRemoved reports that a node was removed.
	NodeChangeTypeRemoved NodeChangeType = "removed"
	// NodeChangeTypeUpdated reports that a node was updated.
	NodeChangeTypeUpdated NodeChangeType = "updated"
)

type NodeInfo

// NodeInfo describes one node: its identity, network location, role, status,
// term, last heartbeat time and free-form metadata.
type NodeInfo struct {
	// ID is the node's identifier.
	ID            string                 `json:"id"`
	// Address is the node's network address; the port is held separately in Port.
	Address       string                 `json:"address"`
	// Port is the node's network port.
	Port          int                    `json:"port"`
	// Role is the node's consensus role (follower, candidate or leader).
	Role          NodeRole               `json:"role"`
	// Status is the node's health status.
	Status        NodeStatus             `json:"status"`
	// Term is the term recorded for this node.
	Term          uint64                 `json:"term"`
	// LastHeartbeat is the time of the most recent heartbeat recorded for the node.
	LastHeartbeat time.Time              `json:"last_heartbeat"`
	// Metadata holds arbitrary key/value data; a nil map serializes as JSON null.
	Metadata      map[string]interface{} `json:"metadata"`
}

NodeInfo represents information about a node

type NodeRole

// NodeRole is a string-based enumeration of the roles a node can hold in the
// consensus system; the known values are RoleFollower, RoleCandidate and
// RoleLeader.
type NodeRole string

NodeRole represents the role of a node in the consensus system

// Known NodeRole values.
const (
	// RoleFollower marks a node that is a follower.
	RoleFollower NodeRole = "follower"
	// RoleCandidate marks a node that is a candidate in an election.
	RoleCandidate NodeRole = "candidate"
	// RoleLeader marks the node that is the leader.
	RoleLeader NodeRole = "leader"
)

type NodeStatus

// NodeStatus is a string-based enumeration of node health states; the known
// values are the Status* constants.
type NodeStatus string

NodeStatus represents the status of a node

// Known NodeStatus values.
const (
	// StatusActive marks a node that is active and healthy.
	StatusActive NodeStatus = "active"
	// StatusInactive marks a node that is inactive.
	StatusInactive NodeStatus = "inactive"
	// StatusSuspected marks a node that is suspected of having failed.
	StatusSuspected NodeStatus = "suspected"
	// StatusFailed marks a node that has failed.
	StatusFailed NodeStatus = "failed"
)

type ObservabilityConfig

// ObservabilityConfig bundles the metrics, tracing and logging settings under
// the "metrics", "tracing" and "logging" keys respectively.
type ObservabilityConfig struct {
	// Metrics holds the metrics settings.
	Metrics MetricsConfig `yaml:"metrics" json:"metrics"`
	// Tracing holds the tracing settings.
	Tracing TracingConfig `yaml:"tracing" json:"tracing"`
	// Logging holds the logging settings.
	Logging LoggingConfig `yaml:"logging" json:"logging"`
}

ObservabilityConfig contains observability configuration

type PeerConfig

// PeerConfig is the configuration entry for one cluster peer: an identifier
// plus the address and port at which it can be reached. None of the fields
// declares a `default` tag.
type PeerConfig struct {
	// ID is the peer's identifier.
	ID      string `yaml:"id" json:"id"`
	// Address is the peer's network address; the port is held separately in Port.
	Address string `yaml:"address" json:"address"`
	// Port is the peer's network port.
	Port    int    `yaml:"port" json:"port"`
}

PeerConfig represents a cluster peer

type QuorumStatusEvent

// QuorumStatusEvent is the payload of a quorum status event: whether quorum
// is held and the node counts behind that verdict.
type QuorumStatusEvent struct {
	// HasQuorum reports whether the cluster currently has quorum.
	HasQuorum         bool `json:"has_quorum"`
	// TotalNodes is the number of nodes in the cluster.
	TotalNodes        int  `json:"total_nodes"`
	// HealthyNodes is the number of nodes counted as healthy.
	HealthyNodes      int  `json:"healthy_nodes"`
	// RequiredForQuorum is the number of nodes needed for quorum
	// (presumably a majority of TotalNodes — TODO confirm with the producer).
	RequiredForQuorum int  `json:"required_for_quorum"`
}

QuorumStatusEvent contains data for quorum status events

type RaftConfig

// RaftConfig groups the Raft-specific tuning parameters. Every field is
// mapped to a YAML and a JSON key and carries a `default` struct tag.
// NOTE(review): the code that applies the `default` tags and validates the
// relationships between these values is not visible in this file; the
// per-field descriptions below are inferred from the field names and should
// be confirmed against the Raft implementation.
type RaftConfig struct {
	// HeartbeatInterval is presumably the period between leader heartbeats;
	// the default tag is "1s".
	HeartbeatInterval    time.Duration `yaml:"heartbeat_interval" json:"heartbeat_interval" default:"1s"`
	// ElectionTimeoutMin is presumably the lower bound of the election
	// timeout range; the default tag is "5s".
	ElectionTimeoutMin   time.Duration `yaml:"election_timeout_min" json:"election_timeout_min" default:"5s"`
	// ElectionTimeoutMax is presumably the upper bound of the election
	// timeout range; the default tag is "10s".
	ElectionTimeoutMax   time.Duration `yaml:"election_timeout_max" json:"election_timeout_max" default:"10s"`
	// SnapshotInterval is presumably the period between snapshot attempts;
	// the default tag is "30m".
	SnapshotInterval     time.Duration `yaml:"snapshot_interval" json:"snapshot_interval" default:"30m"`
	// SnapshotThreshold is presumably the number of log entries that triggers
	// a snapshot; the default tag is "10000".
	SnapshotThreshold    uint64        `yaml:"snapshot_threshold" json:"snapshot_threshold" default:"10000"`
	// LogCacheSize is the size of the log cache (unit presumably entries);
	// the default tag is "1024".
	LogCacheSize         int           `yaml:"log_cache_size" json:"log_cache_size" default:"1024"`
	// MaxAppendEntries is presumably the maximum number of entries sent in
	// one AppendEntries request; the default tag is "64".
	MaxAppendEntries     int           `yaml:"max_append_entries" json:"max_append_entries" default:"64"`
	// TrailingLogs is presumably the number of log entries retained after a
	// snapshot; the default tag is "10000".
	TrailingLogs         uint64        `yaml:"trailing_logs" json:"trailing_logs" default:"10000"`
	// ReplicationBatchSize is the replication batch size; the default tag
	// is "100".
	ReplicationBatchSize int           `yaml:"replication_batch_size" json:"replication_batch_size" default:"100"`
	// LeaderLeaseTimeout is the leader lease duration; the default tag is
	// "500ms".
	LeaderLeaseTimeout   time.Duration `yaml:"leader_lease_timeout" json:"leader_lease_timeout" default:"500ms"`
	// PreVote presumably enables a pre-vote phase before elections; the
	// default tag is "true".
	PreVote              bool          `yaml:"pre_vote" json:"pre_vote" default:"true"`
	// CheckQuorum presumably makes the leader verify it still has quorum;
	// the default tag is "true".
	CheckQuorum          bool          `yaml:"check_quorum" json:"check_quorum" default:"true"`
	// DisablePipeline presumably turns off pipelined replication; the
	// default tag is "false".
	DisablePipeline      bool          `yaml:"disable_pipeline" json:"disable_pipeline" default:"false"`
}

RaftConfig contains Raft-specific configuration

type RaftNode

// RaftNode is the contract a Raft node implementation must satisfy. It covers
// lifecycle (Start/Stop), state inspection (leader, term, role, commit index,
// stats), writing (Apply/Propose), the three inbound RPC handlers, and peer
// registration.
type RaftNode interface {
	// Start starts the Raft node. ctx is passed to the implementation.
	Start(ctx context.Context) error

	// Stop stops the Raft node. ctx is passed to the implementation.
	Stop(ctx context.Context) error

	// IsLeader reports whether this node is the leader.
	IsLeader() bool

	// GetLeader returns the ID of the current leader.
	// NOTE(review): the value returned when no leader is known is not
	// specified here — presumably the empty string.
	GetLeader() string

	// GetTerm returns the current term.
	GetTerm() uint64

	// GetRole returns the node's current role.
	GetRole() NodeRole

	// Apply applies a log entry. The entry is passed by value.
	Apply(ctx context.Context, entry LogEntry) error

	// Propose proposes a new command, given as raw bytes, to the cluster.
	Propose(ctx context.Context, command []byte) error

	// GetCommitIndex returns the current commit index.
	GetCommitIndex() uint64

	// AppendEntries handles an incoming AppendEntries RPC and returns the response.
	AppendEntries(ctx context.Context, req *AppendEntriesRequest) (*AppendEntriesResponse, error)

	// RequestVote handles an incoming RequestVote RPC and returns the response.
	RequestVote(ctx context.Context, req *RequestVoteRequest) (*RequestVoteResponse, error)

	// InstallSnapshot handles an incoming InstallSnapshot RPC and returns the response.
	InstallSnapshot(ctx context.Context, req *InstallSnapshotRequest) (*InstallSnapshotResponse, error)

	// GetStats returns a RaftStats value describing the node.
	GetStats() RaftStats

	// AddPeer registers a peer with the Raft node by ID. It returns nothing,
	// so a failure cannot be reported to the caller.
	AddPeer(peerID string)
}

RaftNode defines the interface for Raft node operations

type RaftStats

// RaftStats is the statistics record of a Raft node: identity, role, term,
// known leader, log positions, vote and last heartbeat time.
type RaftStats struct {
	// NodeID identifies the node the statistics belong to.
	NodeID        string    `json:"node_id"`
	// Role is the node's role.
	Role          NodeRole  `json:"role"`
	// Term is the node's term.
	Term          uint64    `json:"term"`
	// Leader is the leader recorded by the node (presumably a node ID).
	Leader        string    `json:"leader"`
	// CommitIndex is the node's commit index.
	CommitIndex   uint64    `json:"commit_index"`
	// LastApplied is the index of the last applied entry.
	LastApplied   uint64    `json:"last_applied"`
	// LastLogIndex is the index of the last log entry.
	LastLogIndex  uint64    `json:"last_log_index"`
	// LastLogTerm is the term of the last log entry.
	LastLogTerm   uint64    `json:"last_log_term"`
	// VotedFor is the candidate the node voted for (presumably a node ID,
	// empty when no vote was cast — TODO confirm).
	VotedFor      string    `json:"voted_for"`
	// LastHeartbeat is the time of the last heartbeat.
	LastHeartbeat time.Time `json:"last_heartbeat"`
}

RaftStats represents Raft node statistics

type RequestVoteRequest

// RequestVoteRequest is the request body of a RequestVote RPC. It carries the
// candidate's term, identity and last log position, plus a pre-vote flag.
type RequestVoteRequest struct {
	// Term is the candidate's term.
	Term         uint64 `json:"term"`
	// CandidateID identifies the candidate requesting the vote.
	CandidateID  string `json:"candidate_id"`
	// LastLogIndex is the index of the candidate's last log entry.
	LastLogIndex uint64 `json:"last_log_index"`
	// LastLogTerm is the term of the candidate's last log entry.
	LastLogTerm  uint64 `json:"last_log_term"`
	// PreVote presumably marks the request as a pre-vote rather than a
	// binding vote — confirm against the handler.
	PreVote      bool   `json:"pre_vote"`
}

RequestVoteRequest represents a RequestVote RPC request

type RequestVoteResponse

// RequestVoteResponse is the response body of a RequestVote RPC.
type RequestVoteResponse struct {
	// Term is the term reported by the responder.
	Term        uint64 `json:"term"`
	// VoteGranted reports whether the vote was granted.
	VoteGranted bool   `json:"vote_granted"`
	// NodeID identifies a node (presumably the responder — TODO confirm).
	NodeID      string `json:"node_id"`
}

RequestVoteResponse represents a RequestVote RPC response

type ResilienceConfig

// ResilienceConfig groups the retry and circuit-breaker settings. Every field
// is mapped to a YAML and a JSON key and carries a `default` struct tag.
// NOTE(review): the code that applies the `default` tags and implements the
// retry/backoff policy is not visible in this file.
type ResilienceConfig struct {
	// EnableRetry turns retries on or off; the default tag is "true".
	EnableRetry             bool          `yaml:"enable_retry" json:"enable_retry" default:"true"`
	// MaxRetries is the maximum number of retries; the default tag is "3".
	MaxRetries              int           `yaml:"max_retries" json:"max_retries" default:"3"`
	// RetryDelay is the retry delay (presumably the initial delay before
	// backoff is applied); the default tag is "100ms".
	RetryDelay              time.Duration `yaml:"retry_delay" json:"retry_delay" default:"100ms"`
	// RetryBackoffFactor is presumably the multiplier applied to the delay
	// between successive retries; the default tag is "2.0".
	RetryBackoffFactor      float64       `yaml:"retry_backoff_factor" json:"retry_backoff_factor" default:"2.0"`
	// MaxRetryDelay is the upper bound on the retry delay; the default tag
	// is "5s".
	MaxRetryDelay           time.Duration `yaml:"max_retry_delay" json:"max_retry_delay" default:"5s"`
	// EnableCircuitBreaker turns the circuit breaker on or off; the default
	// tag is "true".
	EnableCircuitBreaker    bool          `yaml:"enable_circuit_breaker" json:"enable_circuit_breaker" default:"true"`
	// CircuitBreakerThreshold is presumably the failure count that opens the
	// circuit breaker; the default tag is "5".
	CircuitBreakerThreshold int           `yaml:"circuit_breaker_threshold" json:"circuit_breaker_threshold" default:"5"`
	// CircuitBreakerTimeout is the circuit breaker timeout (presumably how
	// long it stays open); the default tag is "30s".
	CircuitBreakerTimeout   time.Duration `yaml:"circuit_breaker_timeout" json:"circuit_breaker_timeout" default:"30s"`
}

ResilienceConfig contains resilience configuration

type RoleChangedEvent

// RoleChangedEvent is the payload of a role change event: the role before and
// after the change, the term, and an optional reason.
type RoleChangedEvent struct {
	// OldRole is the role before the change. It is a plain string rather
	// than a NodeRole (presumably it holds a NodeRole value — TODO confirm).
	OldRole string `json:"old_role"`
	// NewRole is the role after the change, also a plain string.
	NewRole string `json:"new_role"`
	// Term is the term associated with the change.
	Term    uint64 `json:"term"`
	// Reason optionally explains the change; it is omitted from JSON when empty.
	Reason  string `json:"reason,omitempty"`
}

RoleChangedEvent contains data for role change events

type SecurityConfig

// SecurityConfig groups the TLS and encryption settings. Every field is
// mapped to a YAML and a JSON key; the boolean fields also carry a `default`
// tag of "false", while the string fields declare no default.
// NOTE(review): EncryptionKey is a plain string with a JSON key, so it will
// be included whenever this struct is serialized — confirm that is intended.
type SecurityConfig struct {
	// EnableTLS turns TLS on or off; the default tag is "false".
	EnableTLS        bool   `yaml:"enable_tls" json:"enable_tls" default:"false"`
	// EnableMTLS turns mutual TLS on or off; the default tag is "false".
	EnableMTLS       bool   `yaml:"enable_mtls" json:"enable_mtls" default:"false"`
	// CertFile is presumably the path to the certificate file.
	CertFile         string `yaml:"cert_file" json:"cert_file"`
	// KeyFile is presumably the path to the private key file.
	KeyFile          string `yaml:"key_file" json:"key_file"`
	// CAFile is presumably the path to the CA certificate file.
	CAFile           string `yaml:"ca_file" json:"ca_file"`
	// SkipVerify presumably disables certificate verification; the default
	// tag is "false".
	SkipVerify       bool   `yaml:"skip_verify" json:"skip_verify" default:"false"`
	// EnableEncryption turns encryption on or off; the default tag is "false".
	EnableEncryption bool   `yaml:"enable_encryption" json:"enable_encryption" default:"false"`
	// EncryptionKey is the encryption key (format not established here).
	EncryptionKey    string `yaml:"encryption_key" json:"encryption_key"`
}

SecurityConfig contains security configuration

type Snapshot

// Snapshot is a point-in-time snapshot of the state machine: the log position
// it corresponds to, the snapshot bytes, and descriptive attributes.
type Snapshot struct {
	// Index is the log index the snapshot corresponds to.
	Index    uint64    `json:"index"`
	// Term is the term the snapshot corresponds to.
	Term     uint64    `json:"term"`
	// Data is the snapshot content. encoding/json encodes a []byte as a
	// base64 string.
	Data     []byte    `json:"data"`
	// Size is the snapshot size (presumably in bytes — TODO confirm).
	Size     int64     `json:"size"`
	// Created is the time the snapshot was created.
	Created  time.Time `json:"created"`
	// Checksum is a checksum string for the snapshot.
	// NOTE(review): the algorithm and encoding are not established here.
	Checksum string    `json:"checksum"`
}

Snapshot represents a point-in-time snapshot of the state machine

type SnapshotEvent

// SnapshotEvent is the payload of a snapshot event. Duration and Error are
// optional and are left out of the JSON output when they hold their zero value.
type SnapshotEvent struct {
	// Index is the log index of the snapshot.
	Index    uint64        `json:"index"`
	// Term is the term of the snapshot.
	Term     uint64        `json:"term"`
	// Size is the snapshot size (presumably in bytes — TODO confirm).
	Size     int64         `json:"size"`
	// Duration is how long the snapshot operation took; omitted when zero.
	// encoding/json writes a time.Duration as an integer nanosecond count.
	Duration time.Duration `json:"duration,omitempty"`
	// Error is an error message; omitted when empty.
	Error    string        `json:"error,omitempty"`
}

SnapshotEvent contains data for snapshot events

type SnapshotMetadata

// SnapshotMetadata describes a snapshot without carrying its content. It has
// the same fields and JSON keys as Snapshot except for Data.
type SnapshotMetadata struct {
	// Index is the log index the snapshot corresponds to.
	Index    uint64    `json:"index"`
	// Term is the term the snapshot corresponds to.
	Term     uint64    `json:"term"`
	// Size is the snapshot size (presumably in bytes — TODO confirm).
	Size     int64     `json:"size"`
	// Created is the time the snapshot was created.
	Created  time.Time `json:"created"`
	// Checksum is a checksum string for the snapshot (algorithm not
	// established here).
	Checksum string    `json:"checksum"`
}

SnapshotMetadata contains metadata about a snapshot

type StateMachine

// StateMachine is the contract for the replicated state machine: it consumes
// log entries, can be snapshotted and restored, and answers read-only queries.
// None of the methods takes a context.
type StateMachine interface {
	// Apply applies a log entry to the state machine. The entry is passed
	// by value.
	Apply(entry LogEntry) error

	// Snapshot creates a snapshot of the current state and returns it.
	Snapshot() (*Snapshot, error)

	// Restore replaces the state machine's state with the given snapshot.
	Restore(snapshot *Snapshot) error

	// Query performs a read-only query. Both the query and the result are
	// untyped; their concrete types are defined by the implementation.
	Query(query interface{}) (interface{}, error)
}

StateMachine defines the interface for the replicated state machine

type Storage

// Storage is the contract for a persistent key-value storage backend. Keys
// and values are raw byte slices. Only Start and Stop take a context.
// NOTE(review): the interface has both Stop and Close; how they relate (and
// whether both must be called) is not specified here.
type Storage interface {
	// Start starts the storage backend.
	Start(ctx context.Context) error

	// Stop stops the storage backend.
	Stop(ctx context.Context) error

	// Set stores value under key.
	Set(key, value []byte) error

	// Get retrieves the value stored under key.
	// NOTE(review): the result for a missing key (nil value, error, or both)
	// is not specified here.
	Get(key []byte) ([]byte, error)

	// Delete removes key.
	Delete(key []byte) error

	// Batch executes a batch of operations (atomicity is not specified here).
	Batch(ops []BatchOp) error

	// GetRange retrieves the key-value pairs between start and end.
	// NOTE(review): whether the bounds are inclusive is not specified here.
	GetRange(start, end []byte) ([]KeyValue, error)

	// Close closes the storage backend.
	Close() error
}

Storage defines the interface for persistent storage

type StorageConfig

// StorageConfig groups the storage backend settings: the generic options
// first, followed by backend-specific option structs for BadgerDB and BoltDB.
// NOTE(review): the code that applies the `default` tags is not visible here,
// and the trailing comment on Type lists pebble and postgres, for which no
// dedicated option struct appears in this type.
type StorageConfig struct {
	// Type selects the storage backend; the default tag is "badger".
	Type           string        `yaml:"type" json:"type" default:"badger"` // badger, boltdb, pebble, postgres
	// Path is the storage location; the default tag is "./data/consensus".
	Path           string        `yaml:"path" json:"path" default:"./data/consensus"`
	// SyncWrites presumably forces writes to be synced to disk; the default
	// tag is "true".
	SyncWrites     bool          `yaml:"sync_writes" json:"sync_writes" default:"true"`
	// MaxBatchSize is the maximum batch size; the default tag is "1000".
	MaxBatchSize   int           `yaml:"max_batch_size" json:"max_batch_size" default:"1000"`
	// MaxBatchDelay is presumably the longest a batch waits before being
	// flushed; the default tag is "10ms".
	MaxBatchDelay  time.Duration `yaml:"max_batch_delay" json:"max_batch_delay" default:"10ms"`
	// CompactOnStart presumably triggers a compaction at startup; the
	// default tag is "false".
	CompactOnStart bool          `yaml:"compact_on_start" json:"compact_on_start" default:"false"`
	// BadgerDB specific
	BadgerOptions BadgerOptions `yaml:"badger_options" json:"badger_options"`
	// BoltDB specific
	BoltOptions BoltOptions `yaml:"bolt_options" json:"bolt_options"`
}

StorageConfig contains storage backend configuration

type TracingConfig

// TracingConfig groups the tracing settings. Every field is mapped to a YAML
// and a JSON key and carries a `default` struct tag.
// NOTE(review): the code that applies the `default` tags is not visible here.
type TracingConfig struct {
	// Enabled turns tracing on or off; the default tag is "false".
	Enabled     bool    `yaml:"enabled" json:"enabled" default:"false"`
	// ServiceName is the service name used for tracing; the default tag is
	// "forge-consensus".
	ServiceName string  `yaml:"service_name" json:"service_name" default:"forge-consensus"`
	// SampleRate is the sampling rate; the default tag is "0.1" (presumably a
	// fraction between 0 and 1 — TODO confirm).
	SampleRate  float64 `yaml:"sample_rate" json:"sample_rate" default:"0.1"`
}

TracingConfig contains tracing configuration

type Transport

// Transport is the contract for the network layer between nodes: lifecycle,
// outbound sends, an inbound message channel, and peer management.
type Transport interface {
	// Start starts the transport.
	Start(ctx context.Context) error

	// Stop stops the transport.
	Stop(ctx context.Context) error

	// Send sends a message to the peer named by target. The message is
	// untyped; target is presumably a node ID — TODO confirm.
	Send(ctx context.Context, target string, message interface{}) error

	// Receive returns a receive-only channel on which incoming Message
	// values are delivered.
	Receive() <-chan Message

	// AddPeer registers a peer by node ID, address and port.
	AddPeer(nodeID, address string, port int) error

	// RemovePeer removes the peer with the given node ID.
	RemovePeer(nodeID string) error

	// GetAddress returns the local address.
	GetAddress() string
}

Transport defines the interface for network transport

type TransportConfig

// TransportConfig groups the transport layer settings. Every field is mapped
// to a YAML and a JSON key and carries a `default` struct tag.
// NOTE(review): the code that applies the `default` tags is not visible here.
type TransportConfig struct {
	// Type selects the transport implementation; the default tag is "grpc".
	Type               string        `yaml:"type" json:"type" default:"grpc"`                            // grpc, tcp
	// MaxMessageSize is the maximum message size in bytes; the default tag
	// is "4194304" (4 * 1024 * 1024).
	MaxMessageSize     int           `yaml:"max_message_size" json:"max_message_size" default:"4194304"` // 4MB
	// Timeout is the transport timeout; the default tag is "10s".
	Timeout            time.Duration `yaml:"timeout" json:"timeout" default:"10s"`
	// KeepAlive turns keep-alive on or off; the default tag is "true".
	KeepAlive          bool          `yaml:"keep_alive" json:"keep_alive" default:"true"`
	// KeepAliveInterval is the keep-alive interval; the default tag is "30s".
	KeepAliveInterval  time.Duration `yaml:"keep_alive_interval" json:"keep_alive_interval" default:"30s"`
	// KeepAliveTimeout is the keep-alive timeout; the default tag is "10s".
	KeepAliveTimeout   time.Duration `yaml:"keep_alive_timeout" json:"keep_alive_timeout" default:"10s"`
	// MaxConnections is the maximum number of connections; the default tag
	// is "100".
	MaxConnections     int           `yaml:"max_connections" json:"max_connections" default:"100"`
	// ConnectionTimeout is the connection timeout; the default tag is "5s".
	ConnectionTimeout  time.Duration `yaml:"connection_timeout" json:"connection_timeout" default:"5s"`
	// IdleTimeout is the idle timeout; the default tag is "5m".
	IdleTimeout        time.Duration `yaml:"idle_timeout" json:"idle_timeout" default:"5m"`
	// EnableCompression turns compression on or off; the default tag is "true".
	EnableCompression  bool          `yaml:"enable_compression" json:"enable_compression" default:"true"`
	// CompressionLevel is the compression level; the default tag is "6"
	// (the valid range depends on the compressor, which is not shown here).
	CompressionLevel   int           `yaml:"compression_level" json:"compression_level" default:"6"`
	// EnableMultiplexing turns multiplexing on or off; the default tag is "true".
	EnableMultiplexing bool          `yaml:"enable_multiplexing" json:"enable_multiplexing" default:"true"`
}

TransportConfig contains transport layer configuration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL