elector

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 22, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package elector integrates s3lect leader election into the Netsy node lifecycle. It manages the s3lect Elector, the dedicated HTTPS election health server, and wires leadership changes into the local node state.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type HeartbeatForwarder

type HeartbeatForwarder interface {
	SendHeartbeat(context.Context, *proto.NodeState) (*emptypb.Empty, error)
}

HeartbeatForwarder forwards heartbeats to another subsystem. When the node is both Elector and Primary, heartbeats received by the Elector are forwarded to the Primary so that its replica health tracker is updated using the same server-side code path.

type Metrics

type Metrics struct {
	RegisteredNodes         prometheus.Gauge
	HealthyNodes            prometheus.Gauge
	DegradedNodes           prometheus.Gauge
	PrimaryElections        *prometheus.CounterVec
	PrimaryElectionFailures *prometheus.CounterVec
	PrimaryElectionDuration *prometheus.HistogramVec
	PrimaryElectionContacts *prometheus.CounterVec
	AutoDeregistrations     prometheus.Counter
}

Metrics holds Elector-scoped Prometheus metrics. These are registered through a RoleGroup and disappear from scrape output when the node is not the Elector.

func NewMetrics

func NewMetrics() *Metrics

NewMetrics creates all Elector-scoped Prometheus metrics.

func (*Metrics) Collectors

func (m *Metrics) Collectors() []prometheus.Collector

Collectors returns all Elector-scoped collectors for registration with a RoleGroup.

type NodeEntry

type NodeEntry struct {
	NodeID                 string
	MemberID               uint64
	ClientAdvertiseAddress string
	PeerAdvertiseAddress   string
	LastHeartbeat          time.Time
	DegradedAt             time.Time
	HealthState            nodestate.HealthState
	PrimaryState           nodestate.PrimaryState
	LatestRevision         int64
	StartTime              int64
}

NodeEntry represents a registered node in the Elector's node map.

type NodeMap

type NodeMap struct {
	// contains filtered or unexported fields
}

NodeMap is the Elector's authoritative in-memory map of registered nodes.

func NewNodeMap

func NewNodeMap(logger *slog.Logger) *NodeMap

NewNodeMap creates a new empty NodeMap.

func (*NodeMap) Add

func (m *NodeMap) Add(entry NodeEntry)

Add adds or updates a node entry in the map.

func (*NodeMap) All

func (m *NodeMap) All() []NodeEntry

All returns copies of all node entries.

func (*NodeMap) ClearDeregistered

func (m *NodeMap) ClearDeregistered()

ClearDeregistered clears the deregistered set after bootstrap completes.

func (*NodeMap) Count

func (m *NodeMap) Count() int

Count returns the number of registered nodes.

func (*NodeMap) ForEach

func (m *NodeMap) ForEach(fn func(NodeEntry))

ForEach calls fn for each node entry while holding the read lock. The callback receives a copy of the entry. Do not call mutating NodeMap methods from within fn; instead, collect the node IDs you need inside fn and act on those entries after ForEach returns.

func (*NodeMap) Get

func (m *NodeMap) Get(nodeID string) (entry NodeEntry, ok bool)

Get returns a copy of a node entry. The second return value indicates whether the node was found.

func (*NodeMap) IsDeregistered

func (m *NodeMap) IsDeregistered(nodeID string) bool

IsDeregistered reports whether the given node was deregistered during the current bootstrap cycle.

func (*NodeMap) MarkDeregistered

func (m *NodeMap) MarkDeregistered(nodeID string)

MarkDeregistered records that a node was deregistered during bootstrap, preventing the bootstrap loader from overwriting the deregistration with stale data from object storage.
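The interplay between MarkDeregistered, IsDeregistered, and ClearDeregistered during a bootstrap load might look like the sketch below. The nodeMap type and loadSnapshot function are hypothetical and trim each entry down to a node ID and an address; only the method names come from the documentation above.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeMap is a hypothetical stand-in for NodeMap's node and deregistration
// bookkeeping; entries are trimmed to nodeID -> client address.
type nodeMap struct {
	mu           sync.Mutex
	nodes        map[string]string
	deregistered map[string]bool
}

func newNodeMap() *nodeMap {
	return &nodeMap{nodes: map[string]string{}, deregistered: map[string]bool{}}
}

func (m *nodeMap) Add(nodeID, addr string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.nodes[nodeID] = addr
}

func (m *nodeMap) Remove(nodeID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.nodes, nodeID)
}

func (m *nodeMap) MarkDeregistered(nodeID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.deregistered[nodeID] = true
}

func (m *nodeMap) IsDeregistered(nodeID string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.deregistered[nodeID]
}

func (m *nodeMap) ClearDeregistered() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.deregistered = map[string]bool{}
}

// loadSnapshot applies possibly stale registrations read from object storage,
// skipping nodes deregistered while the load was in flight. The check and the
// Add are separate calls here; a production loader may need them to be atomic.
func loadSnapshot(m *nodeMap, snapshot map[string]string) {
	for nodeID, addr := range snapshot {
		if m.IsDeregistered(nodeID) {
			continue
		}
		m.Add(nodeID, addr)
	}
	m.ClearDeregistered()
}

func main() {
	m := newNodeMap()
	// A DeregisterNode call for n2 lands before the bootstrap load finishes.
	m.Remove("n2")
	m.MarkDeregistered("n2")
	loadSnapshot(m, map[string]string{"n1": "10.0.0.1:2379", "n2": "10.0.0.2:2379"})
	_, present := m.nodes["n2"]
	fmt.Println(len(m.nodes), present) // 1 false
}
```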

func (*NodeMap) Ready

func (m *NodeMap) Ready() bool

Ready reports whether the bootstrap load has completed.

func (*NodeMap) Remove

func (m *NodeMap) Remove(nodeID string)

Remove removes a node entry from the map.

func (*NodeMap) Reset

func (m *NodeMap) Reset()

Reset clears all nodes and marks the map as not ready. This is called when the node loses Elector leadership.

func (*NodeMap) SetHealthState

func (m *NodeMap) SetHealthState(nodeID string, health nodestate.HealthState) bool

SetHealthState updates the health state for a node. When transitioning to Degraded, DegradedAt is set to the current time. It returns false if the node is not registered.

func (*NodeMap) SetReady

func (m *NodeMap) SetReady()

SetReady marks the node map as ready after bootstrap completes.

func (*NodeMap) UpdateHeartbeat

func (m *NodeMap) UpdateHeartbeat(nodeID string, t time.Time, health nodestate.HealthState, primary nodestate.PrimaryState, latestRevision int64, startTime int64) bool

UpdateHeartbeat updates a node's heartbeat timestamp and state fields. It returns false if the node is not registered.

type RevisionSource

type RevisionSource interface {
	LatestRevision() (int64, error)
}

RevisionSource provides the latest revision for election tie-breaking.
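Implementing RevisionSource only requires a LatestRevision method, so a fixed-value test double is enough to satisfy it. The comparison in pickCandidate is purely illustrative: the documentation says revisions are used for tie-breaking but does not specify the ordering, so the "highest revision, then smallest node ID" rule and the candidate type are assumptions.

```go
package main

import "fmt"

// RevisionSource matches the interface documented above.
type RevisionSource interface {
	LatestRevision() (int64, error)
}

// staticRevision is a hypothetical test double that reports a fixed revision.
type staticRevision struct {
	rev int64
	err error
}

func (s staticRevision) LatestRevision() (int64, error) { return s.rev, s.err }

// Compile-time check that staticRevision satisfies RevisionSource.
var _ RevisionSource = staticRevision{}

// candidate is a hypothetical view of a node competing in an election.
type candidate struct {
	NodeID   string
	Revision int64
}

// pickCandidate prefers the highest revision and breaks exact ties by the
// smallest NodeID. This ordering is an illustration, not Netsy's documented rule.
func pickCandidate(cs []candidate) (candidate, bool) {
	if len(cs) == 0 {
		return candidate{}, false
	}
	best := cs[0]
	for _, c := range cs[1:] {
		if c.Revision > best.Revision || (c.Revision == best.Revision && c.NodeID < best.NodeID) {
			best = c
		}
	}
	return best, true
}

func main() {
	var local RevisionSource = staticRevision{rev: 42}
	rev, err := local.LatestRevision()
	if err != nil {
		panic(err)
	}
	best, _ := pickCandidate([]candidate{{"n3", 40}, {"n2", rev}, {"n1", rev}})
	fmt.Println(best.NodeID) // n1
}
```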

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner manages the s3lect Elector, the dedicated HTTPS election health server, and wires leadership change notifications into the local node state.

func New

func New(
	logger *slog.Logger,
	cfg *config.Config,
	state *nodestate.State,
	store storage.ObjectStorage,
	serverCert *tls.Certificate,
	localStartTime int64,
	localDB RevisionSource,
	peers *peerclient.Manager,
	retryMetrics *metrics.RetryMetrics,
) (*Runner, error)

New creates a Runner that is ready to start. It configures the s3lect Elector with peer mode enabled and registers leadership callbacks that update the local node state.

func (*Runner) ElectorServer

func (r *Runner) ElectorServer() *Server

ElectorServer returns the Elector gRPC server implementation so that it can be registered on an externally managed gRPC server.

func (*Runner) GetLocalClusterState

func (r *Runner) GetLocalClusterState(ctx context.Context) (*proto.ClusterState, error)

GetLocalClusterState returns the cluster state directly from the in-process Elector server. It is used when this node is the current Elector.

func (*Runner) IsLeader

func (r *Runner) IsLeader() bool

IsLeader reports whether this node is currently the Elector.

func (*Runner) LeaderAddr

func (r *Runner) LeaderAddr() string

LeaderAddr returns the advertise address of the current Elector.

func (*Runner) LeaderID

func (r *Runner) LeaderID() string

LeaderID returns the node ID of the current Elector.

func (*Runner) RegisterLocalNode

func (r *Runner) RegisterLocalNode(ctx context.Context, req *proto.RegisterNodeRequest) (*proto.RegisterNodeResponse, error)

RegisterLocalNode registers the local node directly with the in-process Elector server. It is used when this node is the current Elector.

func (*Runner) SetMetrics

func (r *Runner) SetMetrics(m *Metrics)

SetMetrics sets the Elector metrics on the underlying server. This is called after construction because the Runner is created before the metrics registry is available.

func (*Runner) Start

func (r *Runner) Start(ctx context.Context) error

Start launches the election health server and the s3lect election loop.

func (*Runner) Stop

func (r *Runner) Stop(ctx context.Context)

Stop gracefully stops the s3lect elector and the health server. If this node currently holds leadership, the elector resigns it.

func (*Runner) WaitForFirstElection

func (r *Runner) WaitForFirstElection(ctx context.Context) (*s3lect.LeadershipStatus, error)

WaitForFirstElection blocks until the first election cycle completes, then updates ClusterState with the current Elector. When this node is the leader, it ensures the elector state is set to ElectorLeader before returning. This covers both s3lect paths: "Acquired" (fresh leadership) and "Confirmed" (stale record).

func (*Runner) WaitUntilReady

func (r *Runner) WaitUntilReady(ctx context.Context) error

WaitUntilReady blocks until the local Elector server has completed its bootstrap pass and can safely serve registration and cluster-state requests.

type Server

type Server struct {
	proto.UnimplementedElectorServer
	// contains filtered or unexported fields
}

Server implements the proto.ElectorServer gRPC interface. It is only active when the local node is the Elector (leader).

func NewServer

func NewServer(
	logger *slog.Logger,
	clusterID string,
	store storage.ObjectStorage,
	state *nodestate.State,
	heartbeatInterval time.Duration,
	deregTimeout time.Duration,
	degradationCount int,
	localNodeID string,
	localStartTime int64,
	localDB RevisionSource,
	quorum int,
	primaryPriorTimeout time.Duration,
	peers *peerclient.Manager,
	m *Metrics,
	retryMetrics *metrics.RetryMetrics,
) *Server

NewServer creates a new Elector gRPC server.

func (*Server) Bootstrap

func (s *Server) Bootstrap(ctx context.Context) error

Bootstrap loads the node map from object storage when this node acquires Elector leadership. It reads members.json and node registration files, populating the in-memory node map. If members.json does not exist (first Elector), it creates one from discovered node registration files.
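The Bootstrap flow described above can be sketched against an in-memory store. memStore, the nodes/ key prefix, the registration JSON fields, and the members.json layout (a JSON object mapping node ID to member ID) are all assumptions, as is numbering members 1..n in key order when members.json is missing.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"
)

// memStore is a hypothetical in-memory stand-in for storage.ObjectStorage.
type memStore map[string][]byte

// List returns keys with the given prefix in sorted order.
func (s memStore) List(prefix string) []string {
	var keys []string
	for k := range s {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	return keys
}

// registration is a hypothetical registration-file format under "nodes/".
type registration struct {
	NodeID        string `json:"node_id"`
	ClientAddress string `json:"client_address"`
}

// bootEntry is a trimmed stand-in for NodeEntry.
type bootEntry struct {
	MemberID      uint64
	ClientAddress string
}

// bootstrap rebuilds the node map from the registration files and members.json,
// creating members.json from the registrations when it does not exist yet.
func bootstrap(s memStore) (map[string]bootEntry, error) {
	var regs []registration
	for _, key := range s.List("nodes/") {
		var r registration
		if err := json.Unmarshal(s[key], &r); err != nil {
			return nil, fmt.Errorf("decode %s: %w", key, err)
		}
		regs = append(regs, r)
	}

	members := map[string]uint64{}
	if raw, ok := s["members.json"]; ok {
		if err := json.Unmarshal(raw, &members); err != nil {
			return nil, fmt.Errorf("decode members.json: %w", err)
		}
	} else {
		// First Elector: derive members.json from the discovered registrations.
		for i, r := range regs {
			members[r.NodeID] = uint64(i + 1)
		}
		out, err := json.Marshal(members)
		if err != nil {
			return nil, err
		}
		s["members.json"] = out
	}

	nodes := make(map[string]bootEntry, len(regs))
	for _, r := range regs {
		id, ok := members[r.NodeID]
		if !ok {
			continue // sketch only: unmapped registrations are skipped
		}
		nodes[r.NodeID] = bootEntry{MemberID: id, ClientAddress: r.ClientAddress}
	}
	return nodes, nil
}

func main() {
	s := memStore{
		"nodes/n1.json": []byte(`{"node_id":"n1","client_address":"10.0.0.1:2379"}`),
		"nodes/n2.json": []byte(`{"node_id":"n2","client_address":"10.0.0.2:2379"}`),
	}
	nodes, err := bootstrap(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(nodes), nodes["n2"].MemberID, string(s["members.json"]))
	// 2 2 {"n1":1,"n2":2}
}
```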

func (*Server) DeregisterNode

func (s *Server) DeregisterNode(ctx context.Context, req *proto.DeregisterNodeRequest) (_ *emptypb.Empty, err error)

DeregisterNode removes a node from the Elector's node map and deletes its registration file. The durable member_id mapping in members.json is preserved for future re-registration.
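The asymmetry between the registration file (deleted) and the members.json mapping (kept) can be sketched with plain maps. The elector type, the nodes/<id>.json path layout, and the error returned for unknown node IDs are hypothetical.

```go
package main

import "fmt"

// elector is a hypothetical, lock-free stand-in for the Elector's state.
type elector struct {
	nodes   map[string]uint64 // live node map: node ID -> member ID
	members map[string]uint64 // durable members.json contents
	objects map[string][]byte // registration files (path layout is an assumption)
}

// deregister drops the node from the live map and deletes its registration
// file, but deliberately leaves members untouched so that a later
// registration of the same node ID gets its old member ID back. Whether the
// real server rejects unknown node IDs is not documented.
func (e *elector) deregister(nodeID string) error {
	if _, ok := e.nodes[nodeID]; !ok {
		return fmt.Errorf("node %q is not registered", nodeID)
	}
	delete(e.nodes, nodeID)
	delete(e.objects, "nodes/"+nodeID+".json")
	return nil
}

func main() {
	e := &elector{
		nodes:   map[string]uint64{"n1": 1, "n2": 2},
		members: map[string]uint64{"n1": 1, "n2": 2},
		objects: map[string][]byte{"nodes/n1.json": nil, "nodes/n2.json": nil},
	}
	if err := e.deregister("n2"); err != nil {
		panic(err)
	}
	_, stillFiled := e.objects["nodes/n2.json"]
	fmt.Println(len(e.nodes), stillFiled, e.members["n2"]) // 1 false 2
}
```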

func (*Server) GetClusterState

func (s *Server) GetClusterState(_ context.Context, _ *emptypb.Empty) (resp *proto.ClusterState, err error)

GetClusterState returns the current cluster state as known by the Elector.

func (*Server) GetMemberList

GetMemberList returns all registered nodes from the Elector's in-memory node map. Only callable when this node is the Elector leader and the node map bootstrap has completed.

func (*Server) RegisterNode

func (s *Server) RegisterNode(ctx context.Context, req *proto.RegisterNodeRequest) (resp *proto.RegisterNodeResponse, err error)

RegisterNode registers a node with the Elector, allocating or reusing a member_id. It returns the assigned member_id and the current cluster state.
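Allocating or reusing a member_id can be sketched as a lookup in the durable node-ID mapping followed by allocation above the current maximum. The registry type and the "max + 1" policy are assumptions; a real implementation would also need to persist the updated mapping to members.json before replying, which this sketch omits.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the durable members.json mapping.
type registry struct {
	mu      sync.Mutex
	members map[string]uint64 // node ID -> member ID
}

// assignMemberID returns the existing member ID for nodeID if one is recorded,
// and otherwise allocates one above the current maximum. The "max + 1" policy
// is an assumption, not Netsy's documented allocator.
func (r *registry) assignMemberID(nodeID string) (uint64, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if id, ok := r.members[nodeID]; ok {
		return id, true // reused
	}
	var highest uint64
	for _, id := range r.members {
		if id > highest {
			highest = id
		}
	}
	next := highest + 1
	r.members[nodeID] = next
	return next, false
}

func main() {
	r := &registry{members: map[string]uint64{}}
	a, _ := r.assignMemberID("n1")
	b, _ := r.assignMemberID("n2")
	again, reused := r.assignMemberID("n1")
	fmt.Println(a, b, again, reused) // 1 2 1 true
}
```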

func (*Server) SendHeartbeat

func (s *Server) SendHeartbeat(ctx context.Context, req *proto.NodeState) (_ *emptypb.Empty, err error)

SendHeartbeat receives a NodeState heartbeat from a Node, updating the node map with the latest heartbeat timestamp and reported state. When a HeartbeatForwarder is set (i.e. this node is also the Primary), the heartbeat is forwarded so the Primary's replica tracker is updated using the same server-side code path.

func (*Server) SetHeartbeatForwarder

func (s *Server) SetHeartbeatForwarder(f HeartbeatForwarder)

SetHeartbeatForwarder sets or clears the forwarder that receives a copy of every heartbeat processed by the Elector. Pass nil to clear.
