Documentation
¶
Overview ¶
Package s3lect provides peer health server functionality for leader election.
The health server can be used in two ways:
As a standalone HTTPS server: server := s3lect.NewHealthServer(s3lect.HealthServerConfig{...}) server.Start(ctx)
As an HTTP handler embedded in existing servers: mux.HandleFunc(elector.GetConfig().PeerHealthPath, s3lect.NewLeadershipHandler(elector, logger))
Index ¶
- Variables
- func NewLeadershipHandler(elector Elector, logger *slog.Logger) http.HandlerFunc
- type Elector
- type ElectorConfig
- type HealthServer
- type HealthServerConfig
- type LeaderRecord
- type LeaderRecordMetadata
- type LeadershipStatus
- type MockStorage
- type S3Elector
- func (e *S3Elector) EnablePeerMode(caCert []byte) error
- func (e *S3Elector) GetConfig() *ElectorConfig
- func (e *S3Elector) GetLeadershipStatus() *LeadershipStatus
- func (e *S3Elector) IsLeader() bool
- func (e *S3Elector) LeaderID() string
- func (e *S3Elector) Start(ctx context.Context) error
- func (e *S3Elector) Stop() error
- func (e *S3Elector) UpdateConfig(newConfig ElectorConfig) error
- func (e *S3Elector) WaitForLeadership(ctx context.Context) error
- func (e *S3Elector) WaitForNextElection(ctx context.Context, since time.Time) (*LeadershipStatus, error)
- type S3ElectorOptions
- type Storage
Constants ¶
This section is empty.
Variables ¶
var ( ErrStorageNotFound = errors.New("object not found") ErrStoragePrecondition = errors.New("precondition failed") )
Functions ¶
func NewLeadershipHandler ¶
func NewLeadershipHandler(elector Elector, logger *slog.Logger) http.HandlerFunc
NewLeadershipHandler creates an HTTP handler for embedding in existing servers. This allows you to add the leadership health endpoint to an existing HTTP server.
Example:
mux := http.NewServeMux()
mux.HandleFunc(elector.GetConfig().PeerHealthPath, s3lect.NewLeadershipHandler(elector, logger))
server := &http.Server{Handler: mux, ...}
Types ¶
type Elector ¶
type Elector interface {
// Start begins the leader election process
Start(ctx context.Context) error
// Stop stops the leader election process
Stop() error
// IsLeader returns true if this instance is currently the leader
IsLeader() bool
// WaitForLeadership blocks until this instance becomes leader
WaitForLeadership(ctx context.Context) error
// WaitForNextElection blocks until an election cycle completes after the given timestamp
// If the timestamp is zero/empty, blocks until the first-ever election cycle completes
// Returns the leadership status after the election completes
WaitForNextElection(ctx context.Context, since time.Time) (*LeadershipStatus, error)
// LeaderID returns the current leader's identity
LeaderID() string
// GetLeadershipStatus returns detailed leadership status
GetLeadershipStatus() *LeadershipStatus
// EnablePeerMode enables peer mode with the provided CA certificate
EnablePeerMode(caCert []byte) error
// UpdateConfig allows dynamic reconfiguration of the elector
UpdateConfig(newConfig ElectorConfig) error
// GetConfig returns the current configuration
GetConfig() *ElectorConfig
}
Elector handles leader election for a distributed system
type ElectorConfig ¶
type ElectorConfig struct {
// LockfilePath is the S3 object key/path for the leader lockfile (e.g., "leader/my-group.json")
LockfilePath string
// ServerID is this instance's unique identifier
ServerID string
// ServerAddr is the address to advertise for peer health checks
ServerAddr string
// FrequentInterval is how often to check for leadership during transitions (default: 5s)
FrequentInterval time.Duration
// InfrequentInterval is how often to check for leadership during stable periods (default: 30s)
InfrequentInterval time.Duration
// LeaderTimeout is how long to wait before considering leader dead (default: 15s)
LeaderTimeout time.Duration
// PeerMode enables HTTP-based leader health checks to reduce S3 operations
PeerMode bool
// PeerHealthPath is the HTTP path for the peer leader health check endpoint (default: "/health/leadership")
PeerHealthPath string
// PeerTimeout is timeout for peer health check requests (default: 3s)
PeerTimeout time.Duration
// PeerCACert is the CA certificate for validating peer HTTPS connections
PeerCACert []byte
// OnAcquireLeadership is called after successfully claiming leadership in S3.
// If this hook returns an error, leadership will be automatically resigned.
// This is useful for critical operations like attaching network interfaces.
OnAcquireLeadership func(ctx context.Context) error
// OnLoseLeadership is called after losing leadership.
// Errors are logged but do not prevent leadership transition (best effort cleanup).
// This is useful for detaching network interfaces or stopping leader-only services.
OnLoseLeadership func(ctx context.Context) error
}
ElectorConfig contains configuration for leader election
func (*ElectorConfig) Validate ¶
func (c *ElectorConfig) Validate() error
Validate validates the elector configuration
type HealthServer ¶
type HealthServer struct {
// contains filtered or unexported fields
}
HealthServer provides the peer health endpoint for leader election
func NewHealthServer ¶
func NewHealthServer(config HealthServerConfig) (*HealthServer, error)
NewHealthServer creates a new standalone peer health server. This creates a dedicated HTTPS server for the leadership health endpoint.
Example:
cert, _ := tls.X509KeyPair(certPEM, keyPEM)
server, err := s3lect.NewHealthServer(s3lect.HealthServerConfig{
BindAddress: "0.0.0.0:8993",
Certificate: cert,
Elector: elector,
Logger: logger,
})
server.Start(ctx)
type HealthServerConfig ¶
type HealthServerConfig struct {
BindAddress string
Certificate tls.Certificate
Elector Elector
Logger *slog.Logger
}
HealthServerConfig contains configuration for the health server
type LeaderRecord ¶
type LeaderRecord struct {
LeaderID string `json:"leaderID,omitempty"`
LeaderAddr string `json:"leaderAddr,omitempty"`
LastUpdated time.Time `json:"lastUpdated"`
Metadata LeaderRecordMetadata `json:"-"`
}
LeaderRecord represents the leader information stored in S3
type LeaderRecordMetadata ¶
type LeaderRecordMetadata struct {
ETag string
}
LeaderRecordMetadata represents the S3 metadata of the LeaderRecord file
type LeadershipStatus ¶
type LeadershipStatus struct {
ServerID string `json:"serverID"`
LockfilePath string `json:"lockfilePath"`
IsLeader bool `json:"isLeader"`
LeaderID string `json:"leaderID"`
LeaderAddr string `json:"leaderAddr"`
LeaderLastSeen time.Time `json:"leaderLastSeen"`
LastElectionTime time.Time `json:"lastElectionTime"`
ConsecutiveFails int `json:"consecutiveFails"`
}
LeadershipStatus contains detailed information about current leadership
type MockStorage ¶
type MockStorage struct {
// contains filtered or unexported fields
}
MockStorage implements Storage interface using local filesystem for testing
func NewMock ¶
func NewMock() *MockStorage
NewMock creates a new mock storage instance with a temporary directory
func NewMockStorage ¶
func NewMockStorage(baseDir string) (*MockStorage, error)
NewMockStorage creates a new mock storage instance
func (*MockStorage) Cleanup ¶
func (m *MockStorage) Cleanup() error
Cleanup removes all files from mock storage
func (*MockStorage) Get ¶
Get retrieves an object from mock storage Returns ErrStorageNotFound if object does not exist
func (*MockStorage) PutIfMatch ¶
PutIfMatch stores an object only if the ETag matches (optimistic locking) Empty etag means object must not exist Returns ErrStoragePrecondition if ETag doesn't match
type S3Elector ¶
type S3Elector struct {
// contains filtered or unexported fields
}
S3Elector implements leader election using S3 as the coordination mechanism
func NewS3Elector ¶
func NewS3Elector(opts S3ElectorOptions) (*S3Elector, error)
NewS3Elector creates a new S3-based leader elector
func (*S3Elector) EnablePeerMode ¶
EnablePeerMode enables peer mode with the provided CA certificate
func (*S3Elector) GetConfig ¶
func (e *S3Elector) GetConfig() *ElectorConfig
GetConfig returns the current configuration
func (*S3Elector) GetLeadershipStatus ¶
func (e *S3Elector) GetLeadershipStatus() *LeadershipStatus
GetLeadershipStatus returns detailed leadership status
func (*S3Elector) UpdateConfig ¶
func (e *S3Elector) UpdateConfig(newConfig ElectorConfig) error
UpdateConfig allows dynamic reconfiguration of the elector
func (*S3Elector) WaitForLeadership ¶
WaitForLeadership blocks until this instance becomes leader
func (*S3Elector) WaitForNextElection ¶ added in v1.0.1
func (e *S3Elector) WaitForNextElection(ctx context.Context, since time.Time) (*LeadershipStatus, error)
WaitForNextElection blocks until an election cycle completes after the given timestamp If the timestamp is zero/empty, blocks until the first-ever election cycle completes Returns the leadership status after the election completes
type S3ElectorOptions ¶
type S3ElectorOptions struct {
Config *ElectorConfig
Storage Storage
Logger *slog.Logger
}
S3ElectorOptions contains options for creating a new S3Elector
type Storage ¶
type Storage interface {
// Get retrieves an object from storage and returns its contents and its etag
// Returns ErrStorageNotFound if object does not exist
Get(ctx context.Context, key string) (data []byte, etag string, err error)
// PutIfMatch stores an object only if the ETag matches (empty string = must not exist)
// Returns ErrStoragePrecondition if ETag doesn't match
PutIfMatch(ctx context.Context, key string, data []byte, etag string) error
}
Storage provides an abstraction for object storage operations