consensus

package
v0.8.6
Warning

This package is not in the latest version of its module.

Published: Jan 3, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

README

Forge Consensus Extension

Production-grade distributed consensus extension for Forge that enables high availability, leader election, and distributed coordination using the Raft consensus algorithm.

Features

Core Capabilities
  • Raft Consensus: Industry-standard Raft algorithm implementation
  • Leader Election: Automatic leader election with configurable timeouts
  • Log Replication: Consistent log replication across all nodes
  • Snapshot & Compaction: Automatic snapshotting and log compaction
  • Membership Changes: Dynamic cluster membership with joint consensus
Transport Layer
  • Multiple Transports: gRPC (production) and TCP support
  • Connection Pooling: Efficient connection management
  • TLS/mTLS: Secure communication with mutual TLS
  • Compression: Optional message compression for reduced bandwidth
Service Discovery
  • Static Configuration: Simple peer list for small clusters
  • DNS Discovery: DNS-based service discovery
  • Consul Integration: HashiCorp Consul service discovery
  • Kubernetes: Native Kubernetes service discovery
  • etcd: etcd-based discovery support
Storage Backends
  • BadgerDB: High-performance embedded key-value store (default)
  • BoltDB: Reliable embedded database
  • Pebble: RocksDB-inspired LSM storage
  • PostgreSQL: Distributed PostgreSQL backend (experimental)
Observability
  • Prometheus Metrics: Comprehensive metrics for monitoring
  • Health Checks: Multi-level health checking
  • Distributed Tracing: OpenTelemetry tracing support
  • Structured Logging: Contextual logging with correlation IDs
Resilience
  • Circuit Breakers: Automatic failure detection and recovery
  • Retry Logic: Exponential backoff with jitter
  • Timeouts: Configurable timeouts at all layers
  • Failover: Automatic leader failover
Security
  • TLS/mTLS: Transport layer security
  • Authentication: Node authentication
  • Authorization: Role-based access control (RBAC)
  • Encryption: Optional data encryption at rest
Administration
  • REST API: Complete admin API for cluster management
  • Health Endpoints: Health and readiness checks
  • Metrics Endpoints: Real-time metrics
  • CLI Tools: Command-line tools for administration

Quick Start

Installation
go get github.com/xraph/forge/extensions/consensus
Basic Usage
package main

import (
    "context"
    "log"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/consensus"
)

func main() {
    // Create Forge app
    app := forge.New()
    
    // Add consensus extension
    app.RegisterExtension(consensus.NewExtension(
        consensus.WithNodeID("node-1"),
        consensus.WithClusterID("my-cluster"),
        consensus.WithBindAddress("0.0.0.0", 7000),
        consensus.WithPeers([]consensus.PeerConfig{
            {ID: "node-1", Address: "localhost", Port: 7000},
            {ID: "node-2", Address: "localhost", Port: 7001},
            {ID: "node-3", Address: "localhost", Port: 7002},
        }),
    ))
    
    // Start app
    if err := app.Start(context.Background()); err != nil {
        log.Fatal(err)
    }
    
    // Get consensus service
    consensusService, err := forge.Resolve[*consensus.Service](app.Container(), "consensus")
    if err != nil {
        log.Fatalf("Failed to resolve consensus service: %v", err)
    }
    
    // Check if leader
    if consensusService.IsLeader() {
        log.Println("This node is the leader!")
    }
    
    // Apply a command (succeeds only on the leader; followers return a not-leader error)
    cmd := consensus.Command{
        Type: "set",
        Payload: map[string]interface{}{
            "key":   "user:123",
            "value": "John Doe",
        },
    }
    
    if err := consensusService.Apply(context.Background(), cmd); err != nil {
        log.Printf("Failed to apply command: %v", err)
    }
    
    // Block forever
    select {}
}
Configuration File
# config.yaml
extensions:
  consensus:
    node_id: "node-1"
    cluster_id: "production-cluster"
    bind_addr: "0.0.0.0"
    bind_port: 7000
    
    peers:
      - id: "node-1"
        address: "10.0.1.10"
        port: 7000
      - id: "node-2"
        address: "10.0.1.11"
        port: 7000
      - id: "node-3"
        address: "10.0.1.12"
        port: 7000
    
    raft:
      heartbeat_interval: 1s
      election_timeout_min: 5s
      election_timeout_max: 10s
      snapshot_interval: 30m
      snapshot_threshold: 10000
    
    transport:
      type: grpc
      enable_compression: true
      max_message_size: 4194304  # 4MB
      timeout: 10s
    
    storage:
      type: badger
      path: ./data/consensus
      sync_writes: true
    
    discovery:
      type: kubernetes
      namespace: default
      service_name: forge-consensus
    
    security:
      enable_tls: true
      cert_file: /etc/certs/server.crt
      key_file: /etc/certs/server.key
      ca_file: /etc/certs/ca.crt
      enable_mtls: true
    
    admin_api:
      enabled: true
      path_prefix: /consensus
    
    observability:
      metrics:
        enabled: true
        collection_interval: 15s
      tracing:
        enabled: true
        sample_rate: 0.1
      logging:
        level: info
        log_raft_details: false
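
The raft timing settings above interact: followers normally wait a randomized election timeout before starting an election, and the heartbeat interval has to be comfortably shorter than that timeout. The sketch below illustrates this under the assumption that the timeout is drawn uniformly between election_timeout_min and election_timeout_max, which is the usual Raft approach but is not confirmed by this page. The function names validateTimings and randomElectionTimeout are hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// validateTimings checks the ordering that Raft timing settings usually need.
// Hypothetical helper: the extension may validate its config differently.
func validateTimings(heartbeat, minTimeout, maxTimeout time.Duration) error {
	switch {
	case heartbeat <= 0:
		return errors.New("heartbeat_interval must be positive")
	case minTimeout <= heartbeat:
		return errors.New("election_timeout_min must exceed heartbeat_interval")
	case maxTimeout < minTimeout:
		return errors.New("election_timeout_max must not be below election_timeout_min")
	}
	return nil
}

// randomElectionTimeout picks a duration uniformly from [minTimeout, maxTimeout].
// Randomizing the timeout makes it unlikely that two followers start an election at once.
func randomElectionTimeout(minTimeout, maxTimeout time.Duration) time.Duration {
	if maxTimeout <= minTimeout {
		return minTimeout
	}
	return minTimeout + time.Duration(rand.Int63n(int64(maxTimeout-minTimeout)+1))
}

func main() {
	// Values taken from the config.yaml example.
	heartbeat, minTimeout, maxTimeout := time.Second, 5*time.Second, 10*time.Second
	if err := validateTimings(heartbeat, minTimeout, maxTimeout); err != nil {
		fmt.Println("invalid timing config:", err)
		return
	}
	fmt.Println("next election timeout:", randomElectionTimeout(minTimeout, maxTimeout))
}
```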

Usage Examples

Leader Election

The consensus extension automatically handles leader election. You can check leadership status:

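// consensusService is the *consensus.Service resolved in Basic Usage.
// When holding the *consensus.Extension instead, call its Service() method.
service := consensusService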

// Check if this node is the leader
if service.IsLeader() {
    log.Println("I am the leader")
}

// Get current leader ID
leaderID := service.GetLeader()
log.Printf("Current leader: %s", leaderID)

// Get current role
role := service.GetRole() // "follower", "candidate", or "leader"
log.Printf("My role: %s", role)
Applying Commands

Only the leader can apply commands to the state machine:

cmd := consensus.Command{
    Type: "create_user",
    Payload: map[string]interface{}{
        "id":    "123",
        "name":  "Alice",
        "email": "alice@example.com",
    },
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := service.Apply(ctx, cmd)
if consensus.IsNotLeaderError(err) {
    // Redirect to leader
    leaderID := service.GetLeader()
    log.Printf("Not leader. Redirect to: %s", leaderID)
} else if err != nil {
    log.Printf("Failed to apply command: %v", err)
}
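
Transient failures, such as an election in progress, are often handled by retrying Apply with exponential backoff and jitter. The following is a minimal sketch of that pattern using only the standard library. The names applyWithRetry, backoffCeiling, and errTransient are hypothetical, and the apply function stands in for a call to service.Apply. In real code the retryable predicate could wrap the package's IsRetryable helper, whose exact signature is not shown on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// errTransient is a local stand-in for a retryable consensus error.
var errTransient = errors.New("transient failure")

// backoffCeiling returns the upper bound of the delay before retry number
// attempt (0-based): base doubled once per attempt, capped at limit.
func backoffCeiling(attempt int, base, limit time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		return limit
	}
	return d
}

// applyWithRetry calls apply until it succeeds, fails with a non-retryable
// error, exhausts its attempts, or ctx is cancelled.
func applyWithRetry(ctx context.Context, attempts int, base, limit time.Duration,
	apply func(context.Context) error, retryable func(error) bool) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = apply(ctx); err == nil || !retryable(err) {
			return err
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		// Full jitter: sleep a random duration in [0, ceiling).
		var delay time.Duration
		if c := backoffCeiling(i, base, limit); c > 0 {
			delay = time.Duration(rand.Int63n(int64(c)))
		}
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

// demoRetry simulates an operation that fails twice before succeeding.
func demoRetry() (calls int, err error) {
	apply := func(context.Context) error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}
	err = applyWithRetry(context.Background(), 5, time.Millisecond, 10*time.Millisecond,
		apply, func(e error) bool { return errors.Is(e, errTransient) })
	return calls, err
}

func main() {
	calls, err := demoRetry()
	fmt.Printf("calls=%d err=%v\n", calls, err)
}
```

A not-leader error is usually better handled by redirecting to the leader, as shown above, than by retrying on the same node.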
Reading Data

Reads can be performed on any node (eventual consistency) or restricted to the leader (strong consistency):

// Read from any node (eventual consistency)
result, err := service.Read(ctx, "user:123")

// For strong consistency, read only when this node is the leader
if service.IsLeader() {
    result, err = service.Read(ctx, "user:123")
}
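
The choice between the two read modes can be wrapped in a small helper. The sketch below assumes only the IsLeader and Read signatures listed in the API index. The reader interface, readWithConsistency, errNotLeader, and fakeNode are hypothetical names used for illustration. Note that in Raft generally, a node that has just lost leadership can briefly still report itself as leader, so a local leadership check alone is not a full linearizability guarantee. Whether the package's Read adds further safeguards is not stated here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// reader is the subset of the consensus service used by this sketch.
type reader interface {
	IsLeader() bool
	Read(ctx context.Context, query any) (any, error)
}

// errNotLeader is a local stand-in, not the package's ErrNotLeader.
var errNotLeader = errors.New("strong read requested on a non-leader node")

// readWithConsistency serves the read locally unless a strong read is
// requested on a node that does not currently believe it is the leader.
func readWithConsistency(ctx context.Context, r reader, query any, strong bool) (any, error) {
	if strong && !r.IsLeader() {
		return nil, errNotLeader
	}
	return r.Read(ctx, query)
}

// fakeNode is an in-memory stand-in for a consensus node.
type fakeNode struct {
	leader bool
	data   map[string]string
}

func (f fakeNode) IsLeader() bool { return f.leader }

func (f fakeNode) Read(_ context.Context, query any) (any, error) {
	key, ok := query.(string)
	if !ok {
		return nil, errors.New("query must be a string key")
	}
	v, ok := f.data[key]
	if !ok {
		return nil, errors.New("key not found")
	}
	return v, nil
}

// demoRead runs one read against a fake node with the given leadership state.
func demoRead(leader, strong bool) (any, error) {
	node := fakeNode{leader: leader, data: map[string]string{"user:123": "John Doe"}}
	return readWithConsistency(context.Background(), node, "user:123", strong)
}

func main() {
	fmt.Println(demoRead(false, false)) // eventual read on a follower
	fmt.Println(demoRead(false, true))  // strong read on a follower is refused
	fmt.Println(demoRead(true, true))   // strong read on the leader
}
```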
Cluster Management

Add or remove nodes dynamically:

// Add a new node (only leader can do this)
err := service.AddNode(ctx, "node-4", "10.0.1.13", 7000)

// Remove a node
err = service.RemoveNode(ctx, "node-4")

// Get cluster information
info := service.GetClusterInfo()
fmt.Printf("Cluster ID: %s\n", info.ID)
fmt.Printf("Leader: %s\n", info.Leader)
fmt.Printf("Total Nodes: %d\n", info.TotalNodes)
fmt.Printf("Active Nodes: %d\n", info.ActiveNodes)
fmt.Printf("Has Quorum: %v\n", info.HasQuorum)
Leadership Transfer

Transfer leadership to another node:

// Transfer leadership to node-2
err := service.TransferLeadership(ctx, "node-2")

// Or step down as leader (triggers new election)
err = service.StepDown(ctx)
Snapshots

Create snapshots manually or rely on automatic snapshots:

// Manual snapshot
err := service.Snapshot(ctx)

// Automatic snapshots are configured via:
// - snapshot_interval: time between snapshots
// - snapshot_threshold: number of log entries before snapshot
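
How the two automatic snapshot settings combine is not spelled out above. A plausible reading, sketched here as an assumption rather than the extension's confirmed behavior, is that a snapshot is taken when either limit is reached first. The function name shouldSnapshot is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// shouldSnapshot reports whether a snapshot is due, assuming "whichever comes
// first" semantics: enough time has passed OR enough log entries accumulated.
func shouldSnapshot(sinceLast time.Duration, entriesSince uint64,
	interval time.Duration, threshold uint64) bool {
	return sinceLast >= interval || entriesSince >= threshold
}

func main() {
	// Values from the config.yaml example: 30m interval, 10000 entries.
	interval, threshold := 30*time.Minute, uint64(10000)

	fmt.Println(shouldSnapshot(time.Minute, 500, interval, threshold))
	fmt.Println(shouldSnapshot(time.Minute, 10000, interval, threshold))
	fmt.Println(shouldSnapshot(31*time.Minute, 0, interval, threshold))
}
```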
Health Checks
// Simple health check
err := service.HealthCheck(ctx)
if err != nil {
    log.Printf("Unhealthy: %v", err)
}

// Detailed health status
status := service.GetHealthStatus(ctx)
fmt.Printf("Status: %s\n", status.Status)
fmt.Printf("Leader: %v\n", status.Leader)
fmt.Printf("Has Quorum: %v\n", status.HasQuorum)
fmt.Printf("Active Nodes: %d/%d\n", status.ActiveNodes, status.TotalNodes)

for _, check := range status.Details {
    fmt.Printf("  %s: %v - %s\n", check.Name, check.Healthy, check.Message)
}
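
A common use of the simple health check is to back a liveness or readiness probe in the application's own HTTP server. The sketch below adapts any function with the HealthCheck(ctx) error shape to a net/http handler. The handler name, the /healthz path, the 2 second timeout, and the choice of 503 for failures are assumptions for illustration. The extension's own Admin API health endpoint is separate from this.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// errNoQuorum is a local stand-in for a failing health check.
var errNoQuorum = errors.New("no quorum")

// healthHandler adapts a HealthCheck-style function to an HTTP endpoint:
// 200 when the check passes, 503 with the error text when it fails.
func healthHandler(check func(context.Context) error) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
		defer cancel()
		if err := check(ctx); err != nil {
			http.Error(w, "unhealthy: "+err.Error(), http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
		fmt.Fprintln(w, "ok")
	})
}

// statusFor runs one in-process probe against a check that returns checkErr
// and reports the resulting HTTP status code.
func statusFor(checkErr error) int {
	h := healthHandler(func(context.Context) error { return checkErr })
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
	return rec.Code
}

func main() {
	fmt.Println("healthy node:", statusFor(nil))
	fmt.Println("unhealthy node:", statusFor(errNoQuorum))
}
```

In an application, the function passed to healthHandler would be service.HealthCheck.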
Middleware Integration

Use consensus middleware to enforce leadership:

import (
    "github.com/xraph/forge/extensions/consensus/middleware"
)

// Get leadership checker
checker, _ := forge.Resolve[*consensus.LeadershipChecker](app.Container(), "consensus:leadership")
leadershipMW := middleware.NewLeadershipMiddleware(checker, logger)

// Require leader for write endpoints
router.POST("/api/users", leadershipMW.RequireLeader()(createUserHandler))

// Add leader information to all responses
router.Use(leadershipMW.AddLeaderHeader())

// Route reads to any node, writes to leader only
router.Use(leadershipMW.ReadOnlyRouting())

// Enforce consistency levels
router.GET("/api/users/:id", 
    leadershipMW.ConsistencyMiddleware(middleware.ConsistencyStrong)(getUserHandler))
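
To make the idea behind RequireLeader concrete, here is a minimal sketch of a leader-only guard written against the standard net/http package rather than the Forge router. It is not the package's middleware. The leaderChecker interface mirrors the documented IsLeader and GetLeader methods of LeadershipChecker, while the X-Consensus-Leader header name and the 503 status are hypothetical choices.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// leaderChecker mirrors the two LeadershipChecker methods this sketch needs.
type leaderChecker interface {
	IsLeader() bool
	GetLeader() string
}

// requireLeader rejects requests on non-leader nodes and tells the client
// which node currently leads, so it can retry there.
func requireLeader(c leaderChecker, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !c.IsLeader() {
			w.Header().Set("X-Consensus-Leader", c.GetLeader()) // hypothetical header name
			http.Error(w, "this node is not the leader", http.StatusServiceUnavailable)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// staticChecker is a fixed-answer stand-in for a real leadership checker.
type staticChecker struct {
	leader   bool
	leaderID string
}

func (s staticChecker) IsLeader() bool    { return s.leader }
func (s staticChecker) GetLeader() string { return s.leaderID }

// tryWrite sends one POST through the guard and returns the status code and
// the leader hint header.
func tryWrite(isLeader bool) (int, string) {
	created := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusCreated)
	})
	h := requireLeader(staticChecker{leader: isLeader, leaderID: "node-2"}, created)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/api/users", nil))
	return rec.Code, rec.Header().Get("X-Consensus-Leader")
}

func main() {
	fmt.Println(tryWrite(true))
	fmt.Println(tryWrite(false))
}
```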

Admin API

The consensus extension provides a REST API for administration:

Health Check
curl http://localhost:8080/consensus/health
Cluster Status
curl http://localhost:8080/consensus/status
List Nodes
curl http://localhost:8080/consensus/nodes
Get Leader
curl http://localhost:8080/consensus/leader
Metrics
curl http://localhost:8080/consensus/metrics
Add Node
curl -X POST http://localhost:8080/consensus/add-node \
  -H "Content-Type: application/json" \
  -d '{
    "node_id": "node-4",
    "address": "10.0.1.13",
    "port": 7000
  }'
Remove Node
curl -X POST http://localhost:8080/consensus/remove-node \
  -H "Content-Type: application/json" \
  -d '{"node_id": "node-4"}'
Transfer Leadership
curl -X POST http://localhost:8080/consensus/transfer-leadership \
  -H "Content-Type: application/json" \
  -d '{"target_node_id": "node-2"}'
Create Snapshot
curl -X POST http://localhost:8080/consensus/snapshot
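
The same endpoints can be called from Go. The sketch below posts to add-node with the JSON fields used in the curl example. The addNodeRequest type and addNode function are hypothetical, and treating any 2xx status as success is an assumption, since the response format is not documented here. The demo runs against a local test server so that it is self-contained.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// addNodeRequest mirrors the JSON body from the curl example.
type addNodeRequest struct {
	NodeID  string `json:"node_id"`
	Address string `json:"address"`
	Port    int    `json:"port"`
}

// addNode posts the request to <baseURL>/consensus/add-node.
func addNode(ctx context.Context, client *http.Client, baseURL string, req addNodeRequest) error {
	body, err := json.Marshal(req)
	if err != nil {
		return err
	}
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
		baseURL+"/consensus/add-node", bytes.NewReader(body))
	if err != nil {
		return err
	}
	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(httpReq)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		msg, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return fmt.Errorf("add-node failed: %s: %s", resp.Status, bytes.TrimSpace(msg))
	}
	return nil
}

// demoAddNode calls addNode against a throwaway test server and returns the
// path and body that the server received.
func demoAddNode() (path, body string, err error) {
	got := make(chan [2]string, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		raw, _ := io.ReadAll(r.Body)
		got <- [2]string{r.URL.Path, string(raw)}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	req := addNodeRequest{NodeID: "node-4", Address: "10.0.1.13", Port: 7000}
	if err := addNode(context.Background(), srv.Client(), srv.URL, req); err != nil {
		return "", "", err
	}
	g := <-got
	return g[0], g[1], nil
}

func main() {
	path, body, err := demoAddNode()
	fmt.Println(path, body, err)
}
```

Against a real node, baseURL would be something like http://localhost:8080, matching the curl examples.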

Monitoring

Prometheus Metrics

The extension exports comprehensive Prometheus metrics:

# Node metrics
forge_consensus_cluster_size
forge_consensus_cluster_healthy_nodes
forge_consensus_is_leader
forge_consensus_has_quorum
forge_consensus_term

# Operations metrics
forge_consensus_operations_total
forge_consensus_operations_failed
forge_consensus_operations_per_sec
forge_consensus_operation_duration_seconds

# Election metrics
forge_consensus_leader_elections_total
forge_consensus_leader_election_duration_seconds

# Log metrics
forge_consensus_commit_index
forge_consensus_last_applied
forge_consensus_log_size
forge_consensus_log_entries

# Snapshot metrics
forge_consensus_snapshots_total
forge_consensus_snapshot_duration_seconds
Grafana Dashboard

A complete Grafana dashboard is available at examples/grafana/consensus-dashboard.json.

Production Considerations

Cluster Size
  • Minimum: 3 nodes for fault tolerance
  • Recommended: 5 nodes for production
  • Maximum: 7-9 nodes (more nodes = slower consensus)
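
The sizing guidance follows from majority arithmetic: a cluster of n voting nodes needs floor(n/2) + 1 of them to agree, so it survives floor((n-1)/2) failures. The short sketch below computes both values. The function names are illustrative and not part of the package.

```go
package main

import "fmt"

// quorum returns the number of nodes that form a majority of n voting nodes.
func quorum(n int) int { return n/2 + 1 }

// faultTolerance returns how many nodes can fail while a majority remains.
func faultTolerance(n int) int { return (n - 1) / 2 }

func main() {
	for _, n := range []int{3, 4, 5, 7, 9} {
		fmt.Printf("nodes=%d quorum=%d tolerated failures=%d\n",
			n, quorum(n), faultTolerance(n))
	}
}
```

Going from 3 to 4 nodes raises the quorum from 2 to 3 without tolerating any more failures, which is why odd cluster sizes are preferred.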
Performance Tuning
raft:
  # Faster heartbeats for quicker failure detection
  heartbeat_interval: 500ms
  
  # Shorter election timeout for faster recovery
  election_timeout_min: 2s
  election_timeout_max: 4s
  
  # Larger batch size for higher throughput
  replication_batch_size: 500
  max_append_entries: 128

transport:
  # Enable compression for reduced bandwidth
  enable_compression: true
  compression_level: 6
  
  # Larger message size for bulk operations
  max_message_size: 16777216  # 16MB

storage:
  # Increase batch size for higher write throughput
  max_batch_size: 5000
  max_batch_delay: 5ms
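
The max_batch_size and max_batch_delay settings suggest a size-or-time batching policy: writes are grouped until the batch is full or the oldest pending write has waited long enough. The sketch below shows that general pattern with channels. It is an assumption about what the settings mean, not the storage layer's actual code, and the function names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// batch groups items from in and emits a batch when it holds maxSize items or
// when maxDelay has passed since the first item of the batch arrived.
// Any partial batch is flushed when in is closed.
func batch(in <-chan string, maxSize int, maxDelay time.Duration) <-chan []string {
	out := make(chan []string)
	go func() {
		defer close(out)
		var (
			cur     []string
			timer   *time.Timer
			timeout <-chan time.Time // nil while no batch is pending
		)
		flush := func() {
			if timer != nil {
				timer.Stop()
			}
			timeout = nil
			if len(cur) > 0 {
				out <- cur
				cur = nil
			}
		}
		for {
			select {
			case item, ok := <-in:
				if !ok {
					flush()
					return
				}
				if len(cur) == 0 {
					// First item of a new batch starts the delay clock.
					timer = time.NewTimer(maxDelay)
					timeout = timer.C
				}
				cur = append(cur, item)
				if len(cur) >= maxSize {
					flush()
				}
			case <-timeout:
				flush()
			}
		}
	}()
	return out
}

// demoBatchSizes feeds five items through a batcher limited to two items per
// batch, with a delay long enough that only the size limit triggers.
func demoBatchSizes() []int {
	in := make(chan string, 5)
	for _, s := range []string{"a", "b", "c", "d", "e"} {
		in <- s
	}
	close(in)

	var sizes []int
	for b := range batch(in, 2, time.Hour) {
		sizes = append(sizes, len(b))
	}
	return sizes
}

func main() {
	fmt.Println(demoBatchSizes())
}
```

Larger batches raise throughput at the cost of latency for the first write in each batch, which is the trade-off the two settings control.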
High Availability
  1. Deploy across availability zones: Distribute nodes across different AZs
  2. Use persistent storage: Ensure data is persisted to disk
  3. Monitor health: Set up health checks and alerting
  4. Backup snapshots: Regularly backup snapshot data
  5. Test failover: Regularly test node failures
Security Hardening
security:
  # Always enable TLS in production
  enable_tls: true
  enable_mtls: true
  
  # Use strong cipher suites
  # Rotate certificates regularly
  
  # Enable encryption at rest
  enable_encryption: true
  encryption_key: "${CONSENSUS_ENCRYPTION_KEY}"

Troubleshooting

No Leader Elected

Symptoms: Cluster has no leader, writes fail

Causes:

  • Network partitions
  • Majority of nodes down
  • Clock skew between nodes
  • Incorrect peer configuration

Solutions:

  • Check network connectivity between nodes
  • Ensure a majority of nodes is healthy: at least floor(N/2) + 1 (2 of 3, 3 of 5)
  • Synchronize clocks with NTP
  • Verify peer configuration
Split Brain

Symptoms: Multiple nodes think they are leader

Causes:

  • Network partition
  • Inconsistent peer configuration

Prevention:

  • Use an odd number of nodes (3, 5, 7)
  • Deploy across availability zones
  • Use proper network policies
High Latency

Symptoms: Slow command application

Causes:

  • Large log entries
  • Slow disk I/O
  • Network latency
  • Too many nodes

Solutions:

  • Reduce message size
  • Use faster disks (SSD/NVMe)
  • Reduce cluster size
  • Enable compression
  • Increase snapshot frequency
Log Growth

Symptoms: Disk space filling up

Solutions:

  • Reduce snapshot_interval
  • Lower snapshot_threshold
  • Enable automatic compaction
  • Monitor disk usage

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Forge Consensus Extension                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │   Raft Core  │  │   Election   │  │   Cluster    │          │
│  │              │◄─┤   Manager    │◄─┤   Manager    │          │
│  └──────┬───────┘  └──────────────┘  └──────────────┘          │
│         │                                                         │
│  ┌──────▼───────┐  ┌──────────────┐  ┌──────────────┐          │
│  │   Storage    │  │  Transport   │  │  Discovery   │          │
│  │   Layer      │  │   Layer      │  │   Service    │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
│         │                  │                  │                  │
│  ┌──────▼──────────────────▼──────────────────▼────────┐        │
│  │           State Machine & Event Emitter              │        │
│  └──────────────────────────────────────────────────────┘        │
│         │                                                         │
│  ┌──────▼──────────────────────────────────────────────┐        │
│  │    Health • Metrics • Observability • Admin API      │        │
│  └──────────────────────────────────────────────────────┘        │
└─────────────────────────────────────────────────────────────────┘

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for guidelines.

License

MIT License - see LICENSE for details.

Documentation


Constants

const (
	RoleFollower  = internal.RoleFollower
	RoleCandidate = internal.RoleCandidate
	RoleLeader    = internal.RoleLeader
)

Constants - Node Roles.

const (
	StatusActive    = internal.StatusActive
	StatusInactive  = internal.StatusInactive
	StatusSuspected = internal.StatusSuspected
	StatusFailed    = internal.StatusFailed
)

Constants - Node Status.

const (
	EntryNormal  = internal.EntryNormal
	EntryConfig  = internal.EntryConfig
	EntryBarrier = internal.EntryBarrier
	EntryNoop    = internal.EntryNoop
)

Constants - Entry Types.

const (
	MessageTypeAppendEntries   = internal.MessageTypeAppendEntries
	MessageTypeRequestVote     = internal.MessageTypeRequestVote
	MessageTypeInstallSnapshot = internal.MessageTypeInstallSnapshot
	MessageTypeHeartbeat       = internal.MessageTypeHeartbeat
)

Constants - Message Types.

const (
	NodeChangeTypeAdded   = internal.NodeChangeTypeAdded
	NodeChangeTypeRemoved = internal.NodeChangeTypeRemoved
	NodeChangeTypeUpdated = internal.NodeChangeTypeUpdated
)

Constants - Node Change Types.

const (
	BatchOpSet    = internal.BatchOpSet
	BatchOpDelete = internal.BatchOpDelete
)

Constants - Batch Op Types.

const (
	ConsensusEventNodeStarted       = internal.ConsensusEventNodeStarted
	ConsensusEventNodeStopped       = internal.ConsensusEventNodeStopped
	ConsensusEventNodeJoined        = internal.ConsensusEventNodeJoined
	ConsensusEventNodeLeft          = internal.ConsensusEventNodeLeft
	ConsensusEventNodeFailed        = internal.ConsensusEventNodeFailed
	ConsensusEventNodeRecovered     = internal.ConsensusEventNodeRecovered
	ConsensusEventLeaderElected     = internal.ConsensusEventLeaderElected
	ConsensusEventLeaderStepDown    = internal.ConsensusEventLeaderStepDown
	ConsensusEventLeaderTransfer    = internal.ConsensusEventLeaderTransfer
	ConsensusEventLeaderLost        = internal.ConsensusEventLeaderLost
	ConsensusEventRoleChanged       = internal.ConsensusEventRoleChanged
	ConsensusEventBecameFollower    = internal.ConsensusEventBecameFollower
	ConsensusEventBecameCandidate   = internal.ConsensusEventBecameCandidate
	ConsensusEventBecameLeader      = internal.ConsensusEventBecameLeader
	ConsensusEventClusterFormed     = internal.ConsensusEventClusterFormed
	ConsensusEventClusterUpdated    = internal.ConsensusEventClusterUpdated
	ConsensusEventQuorumAchieved    = internal.ConsensusEventQuorumAchieved
	ConsensusEventQuorumLost        = internal.ConsensusEventQuorumLost
	ConsensusEventMembershipChanged = internal.ConsensusEventMembershipChanged
	ConsensusEventLogAppended       = internal.ConsensusEventLogAppended
	ConsensusEventLogCommitted      = internal.ConsensusEventLogCommitted
	ConsensusEventLogCompacted      = internal.ConsensusEventLogCompacted
	ConsensusEventLogTruncated      = internal.ConsensusEventLogTruncated
	ConsensusEventSnapshotStarted   = internal.ConsensusEventSnapshotStarted
	ConsensusEventSnapshotCompleted = internal.ConsensusEventSnapshotCompleted
	ConsensusEventSnapshotFailed    = internal.ConsensusEventSnapshotFailed
	ConsensusEventSnapshotRestored  = internal.ConsensusEventSnapshotRestored
	ConsensusEventHealthy           = internal.ConsensusEventHealthy
	ConsensusEventUnhealthy         = internal.ConsensusEventUnhealthy
	ConsensusEventDegraded          = internal.ConsensusEventDegraded
	ConsensusEventRecovering        = internal.ConsensusEventRecovering
	ConsensusEventConfigUpdated     = internal.ConsensusEventConfigUpdated
	ConsensusEventConfigReloaded    = internal.ConsensusEventConfigReloaded
)

Constants - Event Types.

Variables

var (
	ErrNotLeader            = internal.ErrNotLeader
	ErrNoLeader             = internal.ErrNoLeader
	ErrNotStarted           = internal.ErrNotStarted
	ErrAlreadyStarted       = internal.ErrAlreadyStarted
	ErrNodeNotFound         = internal.ErrNodeNotFound
	ErrClusterNotFound      = internal.ErrClusterNotFound
	ErrStorageUnavailable   = internal.ErrStorageUnavailable
	ErrTransportUnavailable = internal.ErrTransportUnavailable
	ErrDiscoveryUnavailable = internal.ErrDiscoveryUnavailable
	ErrNoQuorum             = internal.ErrNoQuorum
	ErrInvalidTerm          = internal.ErrInvalidTerm
	ErrStaleTerm            = internal.ErrStaleTerm
	ErrLogInconsistent      = internal.ErrLogInconsistent
	ErrSnapshotFailed       = internal.ErrSnapshotFailed
	ErrCompactionFailed     = internal.ErrCompactionFailed
	ErrElectionTimeout      = internal.ErrElectionTimeout
	ErrInvalidPeer          = internal.ErrInvalidPeer
	ErrPeerExists           = internal.ErrPeerExists
	ErrPeerNotFound         = internal.ErrPeerNotFound
	ErrInsufficientPeers    = internal.ErrInsufficientPeers
	ErrInvalidConfig        = internal.ErrInvalidConfig
)

Error exports.

var (
	NewNotLeaderError = internal.NewNotLeaderError
	NewNoLeaderError  = internal.NewNoLeaderError
	NewTimeoutError   = internal.NewTimeoutError
	NewNoQuorumError  = internal.NewNoQuorumError
	NewStaleTermError = internal.NewStaleTermError
	IsNotLeaderError  = internal.IsNotLeaderError
	IsNoLeaderError   = internal.IsNoLeaderError
	IsNoQuorumError   = internal.IsNoQuorumError
	IsStaleTermError  = internal.IsStaleTermError
	IsRetryable       = internal.IsRetryable
	IsFatal           = internal.IsFatal
)

Error helper functions.

var (
	DefaultConfig     = internal.DefaultConfig
	WithNodeID        = internal.WithNodeID
	WithClusterID     = internal.WithClusterID
	WithBindAddress   = internal.WithBindAddress
	WithPeers         = internal.WithPeers
	WithTransportType = internal.WithTransportType
	WithDiscoveryType = internal.WithDiscoveryType
	WithStorageType   = internal.WithStorageType
	WithStoragePath   = internal.WithStoragePath
	WithTLS           = internal.WithTLS
	WithMTLS          = internal.WithMTLS
	WithConfig        = internal.WithConfig
	WithRequireConfig = internal.WithRequireConfig
)

Config functions.

Functions

func NewExtension

func NewExtension(opts ...internal.ConfigOption) forge.Extension

NewExtension creates a new consensus extension with functional options.

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new consensus extension with a complete config.

Types

type AdminAPIConfig

type AdminAPIConfig = internal.AdminAPIConfig

type AdvancedConfig

type AdvancedConfig = internal.AdvancedConfig

type AppendEntriesRequest

type AppendEntriesRequest = internal.AppendEntriesRequest

type AppendEntriesResponse

type AppendEntriesResponse = internal.AppendEntriesResponse

type BadgerOptions

type BadgerOptions = internal.BadgerOptions

type BatchOp

type BatchOp = internal.BatchOp

type BatchOpType

type BatchOpType = internal.BatchOpType

type BoltOptions

type BoltOptions = internal.BoltOptions

type ClusterInfo

type ClusterInfo = internal.ClusterInfo

type ClusterManager

type ClusterManager = internal.ClusterManager

type Command

type Command = internal.Command

type Config

type Config = internal.Config

Config exports.

type ConfigOption

type ConfigOption = internal.ConfigOption

type ConsensusEvent

type ConsensusEvent = internal.ConsensusEvent

type ConsensusEventType

type ConsensusEventType = internal.ConsensusEventType

Event exports.

type ConsensusService

type ConsensusService = internal.ConsensusService

type ConsensusStats

type ConsensusStats = internal.ConsensusStats

type Discovery

type Discovery = internal.Discovery

type DiscoveryConfig

type DiscoveryConfig = internal.DiscoveryConfig

type ElectionConfig

type ElectionConfig = internal.ElectionConfig

type EntryType

type EntryType = internal.EntryType

type EventsConfig

type EventsConfig = internal.EventsConfig

type Extension

type Extension struct {
	*forge.BaseExtension
	// contains filtered or unexported fields
}

Extension implements forge.Extension for distributed consensus.

func (*Extension) Apply

func (e *Extension) Apply(ctx context.Context, cmd Command) error

Apply applies a command to the consensus system.

func (*Extension) Dependencies

func (e *Extension) Dependencies() []string

Dependencies returns extension dependencies.

func (*Extension) GetClusterInfo

func (e *Extension) GetClusterInfo() ClusterInfo

GetClusterInfo returns cluster information.

func (*Extension) GetLeader

func (e *Extension) GetLeader() string

GetLeader returns the current leader node ID.

func (*Extension) GetRole

func (e *Extension) GetRole() NodeRole

GetRole returns the current node role.

func (*Extension) GetStats

func (e *Extension) GetStats() ConsensusStats

GetStats returns consensus statistics.

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks the health of the consensus system.

func (*Extension) IsLeader

func (e *Extension) IsLeader() bool

IsLeader returns true if this node is the leader.

func (*Extension) Metrics

func (e *Extension) Metrics() map[string]any

Metrics returns observable metrics (implements ObservableExtension).

func (*Extension) Read

func (e *Extension) Read(ctx context.Context, query any) (any, error)

Read performs a consistent read operation.

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the extension with the app.

func (*Extension) Reload

func (e *Extension) Reload(ctx context.Context) error

Reload reloads the extension configuration (implements HotReloadableExtension).

func (*Extension) Service

func (e *Extension) Service() *Service

Service returns the consensus service (for advanced usage).

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start starts the consensus extension.

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop stops the consensus extension.

type HealthCheck

type HealthCheck = internal.HealthCheck

type HealthConfig

type HealthConfig = internal.HealthConfig

type HealthStatus

type HealthStatus = internal.HealthStatus

type HealthStatusEvent

type HealthStatusEvent = internal.HealthStatusEvent

type InstallSnapshotRequest

type InstallSnapshotRequest = internal.InstallSnapshotRequest

type InstallSnapshotResponse

type InstallSnapshotResponse = internal.InstallSnapshotResponse

type KeyValue

type KeyValue = internal.KeyValue

type LeaderElectedEvent

type LeaderElectedEvent = internal.LeaderElectedEvent

type LeadershipChecker

type LeadershipChecker struct {
	// contains filtered or unexported fields
}

LeadershipChecker provides middleware support for checking leadership.

func NewLeadershipChecker

func NewLeadershipChecker(service ConsensusService) *LeadershipChecker

NewLeadershipChecker creates a new leadership checker.

func (*LeadershipChecker) GetLeader

func (lc *LeadershipChecker) GetLeader() string

GetLeader returns the current leader node ID.

func (*LeadershipChecker) IsLeader

func (lc *LeadershipChecker) IsLeader() bool

IsLeader returns true if this node is the leader.

func (*LeadershipChecker) RequireLeader

func (lc *LeadershipChecker) RequireLeader() error

RequireLeader returns an error if this node is not the leader.

func (*LeadershipChecker) RequireQuorum

func (lc *LeadershipChecker) RequireQuorum() error

RequireQuorum returns an error if there is no quorum.

type LogEntry

type LogEntry = internal.LogEntry

type LoggingConfig

type LoggingConfig = internal.LoggingConfig

type MembershipChangedEvent

type MembershipChangedEvent = internal.MembershipChangedEvent

type Message

type Message = internal.Message

type MessageType

type MessageType = internal.MessageType

type MetricsConfig

type MetricsConfig = internal.MetricsConfig

type NodeChangeEvent

type NodeChangeEvent = internal.NodeChangeEvent

type NodeChangeType

type NodeChangeType = internal.NodeChangeType

type NodeInfo

type NodeInfo = internal.NodeInfo

type NodeRole

type NodeRole = internal.NodeRole

Type exports.

type NodeStatus

type NodeStatus = internal.NodeStatus

type ObservabilityConfig

type ObservabilityConfig = internal.ObservabilityConfig

type PeerConfig

type PeerConfig = internal.PeerConfig

type QuorumStatusEvent

type QuorumStatusEvent = internal.QuorumStatusEvent

type RaftConfig

type RaftConfig = internal.RaftConfig

type RaftNode

type RaftNode = internal.RaftNode

type RaftStats

type RaftStats = internal.RaftStats

type RequestVoteRequest

type RequestVoteRequest = internal.RequestVoteRequest

type RequestVoteResponse

type RequestVoteResponse = internal.RequestVoteResponse

type ResilienceConfig

type ResilienceConfig = internal.ResilienceConfig

type RoleChangedEvent

type RoleChangedEvent = internal.RoleChangedEvent

type SecurityConfig

type SecurityConfig = internal.SecurityConfig

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service implements internal.ConsensusService.

func NewConsensusService

func NewConsensusService(config internal.Config, logger forge.Logger, metrics forge.Metrics) (*Service, error)

NewConsensusService creates a new consensus service.

func (*Service) AddNode

func (s *Service) AddNode(ctx context.Context, nodeID, address string, port int) error

AddNode adds a node to the cluster.

func (*Service) Apply

func (s *Service) Apply(ctx context.Context, cmd Command) error

Apply applies a command to the state machine.

func (*Service) GetClusterInfo

func (s *Service) GetClusterInfo() ClusterInfo

GetClusterInfo returns cluster information.

func (*Service) GetClusterManager

func (s *Service) GetClusterManager() ClusterManager

GetClusterManager returns the cluster manager.

func (*Service) GetDiscovery

func (s *Service) GetDiscovery() Discovery

GetDiscovery returns the discovery service.

func (*Service) GetHealthStatus

func (s *Service) GetHealthStatus(ctx context.Context) HealthStatus

GetHealthStatus returns detailed health status.

func (*Service) GetLeader

func (s *Service) GetLeader() string

GetLeader returns the current leader node ID.

func (*Service) GetRaftNode

func (s *Service) GetRaftNode() RaftNode

GetRaftNode returns the Raft node.

func (*Service) GetRole

func (s *Service) GetRole() NodeRole

GetRole returns the current role of this node.

func (*Service) GetStateMachine

func (s *Service) GetStateMachine() StateMachine

GetStateMachine returns the state machine.

func (*Service) GetStats

func (s *Service) GetStats() internal.ConsensusStats

GetStats returns consensus statistics.

func (*Service) GetStorage

func (s *Service) GetStorage() Storage

GetStorage returns the storage backend.

func (*Service) GetTerm

func (s *Service) GetTerm() uint64

GetTerm returns the current term.

func (*Service) GetTransport

func (s *Service) GetTransport() Transport

GetTransport returns the transport.

func (*Service) HealthCheck

func (s *Service) HealthCheck(ctx context.Context) error

HealthCheck performs a health check.

func (*Service) IsLeader

func (s *Service) IsLeader() bool

IsLeader returns true if this node is the leader.

func (*Service) Read

func (s *Service) Read(ctx context.Context, query any) (any, error)

Read performs a consistent read operation.

func (*Service) RemoveNode

func (s *Service) RemoveNode(ctx context.Context, nodeID string) error

RemoveNode removes a node from the cluster.

func (*Service) Snapshot

func (s *Service) Snapshot(ctx context.Context) error

Snapshot creates a snapshot.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start starts the consensus service.

func (*Service) StepDown

func (s *Service) StepDown(ctx context.Context) error

StepDown causes the leader to step down.

func (*Service) Stop

func (s *Service) Stop(ctx context.Context) error

Stop stops the consensus service.

func (*Service) TransferLeadership

func (s *Service) TransferLeadership(ctx context.Context, targetNodeID string) error

TransferLeadership transfers leadership to another node.

func (*Service) UpdateConfig

func (s *Service) UpdateConfig(ctx context.Context, config Config) error

UpdateConfig updates the configuration.

type Snapshot

type Snapshot = internal.Snapshot

type SnapshotEvent

type SnapshotEvent = internal.SnapshotEvent

type StateMachine

type StateMachine = internal.StateMachine

type Storage

type Storage = internal.Storage

type StorageConfig

type StorageConfig = internal.StorageConfig

type TracingConfig

type TracingConfig = internal.TracingConfig

type Transport

type Transport = internal.Transport

type TransportConfig

type TransportConfig = internal.TransportConfig
