Documentation ¶
Overview ¶
Package elector integrates s3lect leader election into the Netsy node lifecycle. It manages the s3lect Elector, the dedicated HTTPS election health server, and wires leadership changes into the local node state.
Index ¶
- type HeartbeatForwarder
- type Metrics
- type NodeEntry
- type NodeMap
- func (m *NodeMap) Add(entry NodeEntry)
- func (m *NodeMap) All() []NodeEntry
- func (m *NodeMap) ClearDeregistered()
- func (m *NodeMap) Count() int
- func (m *NodeMap) ForEach(fn func(NodeEntry))
- func (m *NodeMap) Get(nodeID string) (entry NodeEntry, ok bool)
- func (m *NodeMap) IsDeregistered(nodeID string) bool
- func (m *NodeMap) MarkDeregistered(nodeID string)
- func (m *NodeMap) Ready() bool
- func (m *NodeMap) Remove(nodeID string)
- func (m *NodeMap) Reset()
- func (m *NodeMap) SetHealthState(nodeID string, health nodestate.HealthState) bool
- func (m *NodeMap) SetReady()
- func (m *NodeMap) UpdateHeartbeat(nodeID string, t time.Time, health nodestate.HealthState, ...) bool
- type RevisionSource
- type Runner
- func (r *Runner) ElectorServer() *Server
- func (r *Runner) GetLocalClusterState(ctx context.Context) (*proto.ClusterState, error)
- func (r *Runner) IsLeader() bool
- func (r *Runner) LeaderAddr() string
- func (r *Runner) LeaderID() string
- func (r *Runner) RegisterLocalNode(ctx context.Context, req *proto.RegisterNodeRequest) (*proto.RegisterNodeResponse, error)
- func (r *Runner) SetMetrics(m *Metrics)
- func (r *Runner) Start(ctx context.Context) error
- func (r *Runner) Stop(ctx context.Context)
- func (r *Runner) WaitForFirstElection(ctx context.Context) (*s3lect.LeadershipStatus, error)
- func (r *Runner) WaitUntilReady(ctx context.Context) error
- type Server
- func (s *Server) Bootstrap(ctx context.Context) error
- func (s *Server) DeregisterNode(ctx context.Context, req *proto.DeregisterNodeRequest) (_ *emptypb.Empty, err error)
- func (s *Server) GetClusterState(_ context.Context, _ *emptypb.Empty) (resp *proto.ClusterState, err error)
- func (s *Server) GetMemberList(_ context.Context, _ *proto.GetMemberListRequest) (*proto.GetMemberListResponse, error)
- func (s *Server) RegisterNode(ctx context.Context, req *proto.RegisterNodeRequest) (resp *proto.RegisterNodeResponse, err error)
- func (s *Server) SendHeartbeat(ctx context.Context, req *proto.NodeState) (_ *emptypb.Empty, err error)
- func (s *Server) SetHeartbeatForwarder(f HeartbeatForwarder)
Types ¶
type HeartbeatForwarder ¶
type HeartbeatForwarder interface {
SendHeartbeat(context.Context, *proto.NodeState) (*emptypb.Empty, error)
}
HeartbeatForwarder forwards heartbeats to another subsystem. When the node is both Elector and Primary, heartbeats received by the Elector are forwarded to the Primary so that its replica health tracker is updated using the same server-side code path.
type Metrics ¶
type Metrics struct {
RegisteredNodes prometheus.Gauge
HealthyNodes prometheus.Gauge
DegradedNodes prometheus.Gauge
PrimaryElections *prometheus.CounterVec
PrimaryElectionFailures *prometheus.CounterVec
PrimaryElectionDuration *prometheus.HistogramVec
PrimaryElectionContacts *prometheus.CounterVec
AutoDeregistrations prometheus.Counter
}
Metrics holds Elector-scoped Prometheus metrics. These are registered through a RoleGroup and disappear from scrape output when the node is not the Elector.
func NewMetrics ¶
func NewMetrics() *Metrics
NewMetrics creates all Elector-scoped Prometheus metrics.
func (*Metrics) Collectors ¶
func (m *Metrics) Collectors() []prometheus.Collector
Collectors returns all Elector-scoped collectors for registration with a RoleGroup.
type NodeEntry ¶
type NodeEntry struct {
NodeID string
MemberID uint64
ClientAdvertiseAddress string
PeerAdvertiseAddress string
LastHeartbeat time.Time
DegradedAt time.Time
HealthState nodestate.HealthState
PrimaryState nodestate.PrimaryState
LatestRevision int64
StartTime int64
}
NodeEntry represents a registered node in the Elector's node map.
type NodeMap ¶
type NodeMap struct {
// contains filtered or unexported fields
}
NodeMap is the Elector's authoritative in-memory map of registered nodes.
func NewNodeMap ¶
NewNodeMap creates a new empty NodeMap.
func (*NodeMap) ClearDeregistered ¶
func (m *NodeMap) ClearDeregistered()
ClearDeregistered clears the deregistered set after bootstrap completes.
func (*NodeMap) ForEach ¶
ForEach calls fn for each node entry while holding the read lock. The callback receives a copy of the entry. Avoid calling mutating NodeMap methods from within fn; instead, collect the relevant node IDs inside fn and act on those entries after iteration completes.
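The collect-then-act pattern can be sketched as follows. The nodeMap and nodeEntry types are simplified, hypothetical stand-ins rather than the package's actual implementation, and the sketch assumes the map is guarded by a sync.RWMutex, which the documentation implies but does not state.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// nodeEntry is a hypothetical, trimmed-down stand-in for NodeEntry.
type nodeEntry struct {
	NodeID        string
	LastHeartbeat time.Time
}

// nodeMap is a hypothetical stand-in for NodeMap, assumed to use an RWMutex.
type nodeMap struct {
	mu    sync.RWMutex
	nodes map[string]nodeEntry
}

// ForEach calls fn with a copy of each entry while holding the read lock.
func (m *nodeMap) ForEach(fn func(nodeEntry)) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, e := range m.nodes {
		fn(e) // e is a copy; changing it does not affect the map
	}
}

// Remove takes the write lock, so it must not be called from inside fn.
func (m *nodeMap) Remove(nodeID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.nodes, nodeID)
}

// removeStale collects IDs during iteration and mutates only afterwards.
func removeStale(m *nodeMap, cutoff time.Time) []string {
	var stale []string
	m.ForEach(func(e nodeEntry) {
		if e.LastHeartbeat.Before(cutoff) {
			stale = append(stale, e.NodeID)
		}
	})
	for _, id := range stale {
		m.Remove(id)
	}
	return stale
}

// demo builds a two-node map and removes the node with the old heartbeat.
func demo() (removed []string, remaining int) {
	now := time.Now()
	m := &nodeMap{nodes: map[string]nodeEntry{
		"a": {NodeID: "a", LastHeartbeat: now},
		"b": {NodeID: "b", LastHeartbeat: now.Add(-time.Hour)},
	}}
	removed = removeStale(m, now.Add(-time.Minute))
	return removed, len(m.nodes)
}

func main() {
	fmt.Println(demo())
}
```

With an RWMutex, calling Remove from inside fn would try to take the write lock while the read lock is still held, which blocks forever. Deferring the mutation avoids that.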
func (*NodeMap) Get ¶
Get returns a copy of a node entry. The second return value indicates whether the node was found.
func (*NodeMap) IsDeregistered ¶
IsDeregistered reports whether the given node was deregistered during the current bootstrap cycle.
func (*NodeMap) MarkDeregistered ¶
MarkDeregistered records that a node was deregistered during bootstrap, preventing the bootstrap loader from overwriting the deregistration with stale data from object storage.
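The interaction between MarkDeregistered, IsDeregistered, and ClearDeregistered might look roughly like the sketch below. Everything here is a hypothetical stand-in: the nodeMap fields, the addFromStorage helper, and the addresses are assumptions made for illustration, not the package's real internals.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeMap is a hypothetical stand-in holding only what this sketch needs.
type nodeMap struct {
	mu           sync.Mutex
	nodes        map[string]string   // nodeID -> advertise address
	deregistered map[string]struct{} // nodes deregistered during bootstrap
}

func newNodeMap() *nodeMap {
	return &nodeMap{
		nodes:        make(map[string]string),
		deregistered: make(map[string]struct{}),
	}
}

// MarkDeregistered records that nodeID was deregistered during bootstrap.
func (m *nodeMap) MarkDeregistered(nodeID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.deregistered[nodeID] = struct{}{}
}

// IsDeregistered reports whether nodeID is in the deregistered set.
func (m *nodeMap) IsDeregistered(nodeID string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.deregistered[nodeID]
	return ok
}

// ClearDeregistered empties the set once bootstrap has finished.
func (m *nodeMap) ClearDeregistered() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.deregistered = make(map[string]struct{})
}

// addFromStorage is a hypothetical loader step. It checks the deregistered
// set and inserts under one lock, so stale data cannot resurrect a node.
func (m *nodeMap) addFromStorage(nodeID, addr string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, gone := m.deregistered[nodeID]; gone {
		return false
	}
	m.nodes[nodeID] = addr
	return true
}

// demo deregisters n2 mid-bootstrap, then replays stale storage contents.
func demo() (loadedN1, loadedN2, stillMarked bool) {
	m := newNodeMap()
	m.MarkDeregistered("n2")
	loadedN1 = m.addFromStorage("n1", "10.0.0.1:2379")
	loadedN2 = m.addFromStorage("n2", "10.0.0.2:2379")
	m.ClearDeregistered()
	return loadedN1, loadedN2, m.IsDeregistered("n2")
}

func main() {
	fmt.Println(demo())
}
```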
func (*NodeMap) Reset ¶
func (m *NodeMap) Reset()
Reset clears all nodes and marks the map as not ready. This is called when the node loses Elector leadership.
func (*NodeMap) SetHealthState ¶
func (m *NodeMap) SetHealthState(nodeID string, health nodestate.HealthState) bool
SetHealthState updates the health state for a node. When transitioning to Degraded, DegradedAt is set to the current time. It returns false if the node is not registered.
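A minimal sketch of the transition rule described above, using hypothetical stand-in types. The healthState constants and the injectable clock are assumptions for illustration. The sketch also assumes that DegradedAt is stamped only when a node enters Degraded from another state and is left untouched otherwise; the documentation does not say what happens on recovery.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// healthState is a hypothetical stand-in for nodestate.HealthState.
type healthState int

const (
	healthy healthState = iota
	degraded
)

// nodeEntry is a trimmed-down stand-in for NodeEntry.
type nodeEntry struct {
	HealthState healthState
	DegradedAt  time.Time
}

type nodeMap struct {
	mu    sync.Mutex
	nodes map[string]*nodeEntry
	now   func() time.Time // injectable clock, an assumption for testability
}

// SetHealthState updates a node's health and stamps DegradedAt on the
// transition into degraded. It returns false for unregistered nodes.
func (m *nodeMap) SetHealthState(nodeID string, h healthState) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	e, ok := m.nodes[nodeID]
	if !ok {
		return false
	}
	if h == degraded && e.HealthState != degraded {
		e.DegradedAt = m.now()
	}
	e.HealthState = h
	return true
}

// demo marks a node degraded twice and checks the timestamp is kept.
func demo() (unknownOK, knownOK, stampKept bool) {
	tick := time.Unix(0, 0)
	m := &nodeMap{
		nodes: map[string]*nodeEntry{"n1": {}},
		now: func() time.Time {
			tick = tick.Add(time.Second)
			return tick
		},
	}
	unknownOK = m.SetHealthState("missing", degraded)
	knownOK = m.SetHealthState("n1", degraded)
	first := m.nodes["n1"].DegradedAt
	m.SetHealthState("n1", degraded) // already degraded: no new timestamp
	second := m.nodes["n1"].DegradedAt
	return unknownOK, knownOK, !first.IsZero() && first.Equal(second)
}

func main() {
	fmt.Println(demo())
}
```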
func (*NodeMap) SetReady ¶
func (m *NodeMap) SetReady()
SetReady marks the node map as ready after bootstrap completes.
func (*NodeMap) UpdateHeartbeat ¶
func (m *NodeMap) UpdateHeartbeat(nodeID string, t time.Time, health nodestate.HealthState, primary nodestate.PrimaryState, latestRevision int64, startTime int64) bool
UpdateHeartbeat updates a node's heartbeat timestamp and state fields. It returns false if the node is not registered.
type RevisionSource ¶
RevisionSource provides the latest revision for election tie-breaking.
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner manages the s3lect Elector, the dedicated HTTPS election health server, and wires leadership change notifications into the local node state.
func New ¶
func New( logger *slog.Logger, cfg *config.Config, state *nodestate.State, store storage.ObjectStorage, serverCert *tls.Certificate, localStartTime int64, localDB RevisionSource, peers *peerclient.Manager, retryMetrics *metrics.RetryMetrics, ) (*Runner, error)
New creates a Runner that is ready to start. It configures the s3lect Elector with peer-mode enabled and registers leadership callbacks that update the local node state.
func (*Runner) ElectorServer ¶
ElectorServer returns the Elector gRPC server so that callers can register it with an external gRPC server.
func (*Runner) GetLocalClusterState ¶
GetLocalClusterState returns the cluster state directly from the in-process Elector server. This is used when the local node is itself the current Elector.
func (*Runner) LeaderAddr ¶
LeaderAddr returns the advertise address of the current Elector.
func (*Runner) RegisterLocalNode ¶
func (r *Runner) RegisterLocalNode(ctx context.Context, req *proto.RegisterNodeRequest) (*proto.RegisterNodeResponse, error)
RegisterLocalNode registers the local node directly with the in-process Elector server. This is used when the local node is itself the current Elector.
func (*Runner) SetMetrics ¶
SetMetrics sets the Elector metrics on the underlying server. This is called after construction because the Runner is created before the metrics registry is available.
func (*Runner) Stop ¶
Stop gracefully stops the s3lect elector and health server. The elector resigns leadership if currently held.
func (*Runner) WaitForFirstElection ¶
WaitForFirstElection blocks until the first election cycle completes, then updates ClusterState with the current Elector. When this node is the leader, it ensures the elector state is set to ElectorLeader before returning. This covers both the "Acquired" (fresh) and "Confirmed" (stale record) s3lect paths.
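The blocking behaviour can be illustrated with a generic "wait for the first event or for cancellation" pattern. This is only a sketch of that pattern: the firstElection type, its report method, and the leadershipStatus fields are hypothetical and do not reflect how Runner or s3lect are actually implemented.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// leadershipStatus is a hypothetical stand-in for s3lect.LeadershipStatus.
type leadershipStatus struct {
	IsLeader bool
	LeaderID string
}

// firstElection lets callers block until the first result is reported.
type firstElection struct {
	once   sync.Once
	done   chan struct{}
	status leadershipStatus
}

func newFirstElection() *firstElection {
	return &firstElection{done: make(chan struct{})}
}

// report records an election outcome; only the first call unblocks waiters.
func (f *firstElection) report(s leadershipStatus) {
	f.once.Do(func() {
		f.status = s
		close(f.done)
	})
}

// wait blocks until the first outcome is reported or ctx is cancelled.
func (f *firstElection) wait(ctx context.Context) (*leadershipStatus, error) {
	select {
	case <-f.done:
		s := f.status
		return &s, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// demo shows one successful wait and one wait that times out.
func demo() (leaderID string, timedOut bool) {
	f := newFirstElection()
	go f.report(leadershipStatus{IsLeader: true, LeaderID: "node-1"})
	s, err := f.wait(context.Background())
	if err != nil {
		return "", false
	}

	g := newFirstElection() // no election is ever reported on this one
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, err = g.wait(ctx)
	return s.LeaderID, err != nil
}

func main() {
	fmt.Println(demo())
}
```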
type Server ¶
type Server struct {
proto.UnimplementedElectorServer
// contains filtered or unexported fields
}
Server implements the proto.ElectorServer gRPC interface. It is only active when the local node is the Elector (leader).
func NewServer ¶
func NewServer( logger *slog.Logger, clusterID string, store storage.ObjectStorage, state *nodestate.State, heartbeatInterval time.Duration, deregTimeout time.Duration, degradationCount int, localNodeID string, localStartTime int64, localDB RevisionSource, quorum int, primaryPriorTimeout time.Duration, peers *peerclient.Manager, m *Metrics, retryMetrics *metrics.RetryMetrics, ) *Server
NewServer creates a new Elector gRPC server.
func (*Server) Bootstrap ¶
Bootstrap loads the node map from object storage when this node acquires Elector leadership. It reads members.json and node registration files, populating the in-memory node map. If members.json does not exist (first Elector), it creates one from discovered node registration files.
func (*Server) DeregisterNode ¶
func (s *Server) DeregisterNode(ctx context.Context, req *proto.DeregisterNodeRequest) (_ *emptypb.Empty, err error)
DeregisterNode removes a node from the Elector's node map and deletes its registration file. The durable member_id mapping in members.json is preserved for future re-registration.
func (*Server) GetClusterState ¶
func (s *Server) GetClusterState(_ context.Context, _ *emptypb.Empty) (resp *proto.ClusterState, err error)
GetClusterState returns the current cluster state as known by the Elector.
func (*Server) GetMemberList ¶
func (s *Server) GetMemberList(_ context.Context, _ *proto.GetMemberListRequest) (*proto.GetMemberListResponse, error)
GetMemberList returns all registered nodes from the Elector's in-memory node map. It can only be called when this node is the Elector leader and the node map bootstrap has completed.
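The leader-and-ready precondition could be expressed as a guard like the one below. This is a sketch under assumptions: the server type, its fields, and the plain error values are hypothetical, and the real implementation presumably returns gRPC status errors instead.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical error values; the real server likely uses gRPC status codes.
var (
	errNotLeader = errors.New("not the Elector leader")
	errNotReady  = errors.New("node map bootstrap not complete")
)

// server is a hypothetical stand-in for the Elector server.
type server struct {
	mu       sync.RWMutex
	isLeader bool
	ready    bool
	nodes    []string
}

// memberList returns a copy of the node list, but only once this node is
// the leader and the node map has been marked ready.
func (s *server) memberList() ([]string, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if !s.isLeader {
		return nil, errNotLeader
	}
	if !s.ready {
		return nil, errNotReady
	}
	return append([]string(nil), s.nodes...), nil
}

// demo walks through follower, bootstrapping leader, and ready leader.
func demo() (asFollower, whileBootstrapping error, count int) {
	s := &server{nodes: []string{"n1", "n2"}}
	_, asFollower = s.memberList()

	s.isLeader = true
	_, whileBootstrapping = s.memberList()

	s.ready = true
	members, _ := s.memberList()
	return asFollower, whileBootstrapping, len(members)
}

func main() {
	fmt.Println(demo())
}
```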
func (*Server) RegisterNode ¶
func (s *Server) RegisterNode(ctx context.Context, req *proto.RegisterNodeRequest) (resp *proto.RegisterNodeResponse, err error)
RegisterNode registers a node with the Elector, allocating or reusing a member_id. It returns the assigned member_id and the current cluster state.
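The "allocate or reuse" behaviour, together with DeregisterNode preserving the durable mapping, can be sketched as follows. The registry type is hypothetical, and the monotonically increasing counter is an assumed allocation strategy; the documentation does not specify how new member_id values are chosen.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the Elector's registration state.
type registry struct {
	mu      sync.Mutex
	members map[string]uint64 // durable nodeID -> member_id (members.json)
	active  map[string]bool   // currently registered nodes
	nextID  uint64            // assumed allocator: last member_id handed out
}

func newRegistry() *registry {
	return &registry{
		members: make(map[string]uint64),
		active:  make(map[string]bool),
	}
}

// register reuses a known member_id or allocates the next one.
func (r *registry) register(nodeID string) uint64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	id, known := r.members[nodeID]
	if !known {
		r.nextID++
		id = r.nextID
		r.members[nodeID] = id
	}
	r.active[nodeID] = true
	return id
}

// deregister drops the node from the active set but keeps its member_id.
func (r *registry) deregister(nodeID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.active, nodeID)
}

// demo registers two nodes, deregisters the first, and registers it again.
func demo() (first, second, again uint64) {
	r := newRegistry()
	first = r.register("node-a")
	second = r.register("node-b")
	r.deregister("node-a")
	again = r.register("node-a")
	return first, second, again
}

func main() {
	fmt.Println(demo())
}
```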
func (*Server) SendHeartbeat ¶
func (s *Server) SendHeartbeat(ctx context.Context, req *proto.NodeState) (_ *emptypb.Empty, err error)
SendHeartbeat receives a NodeState heartbeat from a Node, updating the node map with the latest heartbeat timestamp and reported state. When a HeartbeatForwarder is set (i.e. this node is also the Primary), the heartbeat is forwarded so the Primary's replica tracker is updated using the same server-side code path.
func (*Server) SetHeartbeatForwarder ¶
func (s *Server) SetHeartbeatForwarder(f HeartbeatForwarder)
SetHeartbeatForwarder sets or clears the forwarder that receives a copy of every heartbeat processed by the Elector. Pass nil to clear.
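A rough sketch of how setting and clearing a forwarder affects heartbeat handling. The nodeState and empty types stand in for proto.NodeState and emptypb.Empty, and countingPrimary is a hypothetical forwarder. Whether a forwarding error should fail the heartbeat is an assumption made here, not something the documentation states.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// nodeState and empty are hypothetical stand-ins for proto.NodeState and
// emptypb.Empty.
type nodeState struct{ NodeID string }
type empty struct{}

// heartbeatForwarder mirrors the shape of the HeartbeatForwarder interface.
type heartbeatForwarder interface {
	SendHeartbeat(context.Context, *nodeState) (*empty, error)
}

// countingPrimary is a hypothetical forwarder that counts heartbeats.
type countingPrimary struct{ received int }

func (p *countingPrimary) SendHeartbeat(_ context.Context, _ *nodeState) (*empty, error) {
	p.received++
	return &empty{}, nil
}

// server is a hypothetical stand-in for the Elector server.
type server struct {
	mu        sync.Mutex
	forwarder heartbeatForwarder
	seen      map[string]int // heartbeats recorded per node
}

// SetHeartbeatForwarder sets the forwarder, or clears it when f is nil.
func (s *server) SetHeartbeatForwarder(f heartbeatForwarder) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.forwarder = f
}

// SendHeartbeat records the heartbeat, then forwards it if a forwarder is set.
func (s *server) SendHeartbeat(ctx context.Context, req *nodeState) (*empty, error) {
	s.mu.Lock()
	s.seen[req.NodeID]++
	f := s.forwarder // copy under the lock, call outside it
	s.mu.Unlock()

	if f != nil {
		// Assumption: a forwarding failure is surfaced to the caller.
		if _, err := f.SendHeartbeat(ctx, req); err != nil {
			return nil, err
		}
	}
	return &empty{}, nil
}

// demo sends three heartbeats: before, during, and after forwarding is set.
func demo() (forwarded, recorded int) {
	ctx := context.Background()
	s := &server{seen: make(map[string]int)}
	p := &countingPrimary{}
	hb := &nodeState{NodeID: "n1"}

	s.SendHeartbeat(ctx, hb) // no forwarder yet
	s.SetHeartbeatForwarder(p)
	s.SendHeartbeat(ctx, hb) // forwarded to p
	s.SetHeartbeatForwarder(nil)
	s.SendHeartbeat(ctx, hb) // forwarder cleared

	return p.received, s.seen["n1"]
}

func main() {
	fmt.Println(demo())
}
```

Copying the forwarder under the lock and invoking it afterwards keeps a slow forwarder from blocking concurrent calls to SetHeartbeatForwarder.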