server

package
v1.0.0 Latest
Published: Jul 26, 2025 License: MIT Imports: 24 Imported by: 0

README

README for the server Package

Overview

The server package provides the gRPC server implementation for the RaftLock distributed locking service. It acts as the primary entry point for clients, handling API requests, ensuring operational integrity, and coordinating with the underlying Raft consensus module to provide strongly consistent, fault-tolerant locking.

This package orchestrates all high-level server functionalities, including request validation, rate limiting, leader redirection, connection management, and state monitoring. It is designed to be configurable and observable for production environments.

Key Features

  • gRPC API Endpoint: Exposes the complete distributed locking functionality through a robust, type-safe gRPC interface.
  • Leader Redirection: Redirects write requests received by a follower to the current Raft leader, so clients can connect to any node.
  • Request Validation: Enforces strict validation on all incoming requests for fields like lock IDs, client IDs, TTLs, and metadata to ensure data integrity and prevent malformed operations.
  • Concurrency and Rate Limiting: Manages server load through configurable limits on concurrent requests and a token-bucket rate limiter to prevent abuse.
  • Graceful Shutdown: Ensures a clean shutdown process, completing in-flight requests and closing resources within a configurable timeout.
  • Comprehensive Health Checks: Provides a Health endpoint for load balancers and monitoring systems to check the server's operational status.
  • Connection Management: Tracks active client connections for observability and debugging.
  • Pluggable Dependencies: Uses interfaces for core dependencies like logging (Logger) and metrics (ServerMetrics), allowing for easy integration with external systems.

Architecture

The server package is built around the raftLockServer, which integrates several key components to manage client requests and interact with the Raft consensus layer.

graph TD
    CLIENT[Client Request] --> GRPC[gRPC Server]
    
    subgraph "RaftLock Server"
        GRPC -- unaryInterceptor --> VALIDATOR[Request Validator]
        VALIDATOR --> LIMITER[Rate Limiter]
        LIMITER --> SERVER_CORE[raftLockServer Core Logic]
        
        SERVER_CORE --> TRACKER[ProposalTracker]
        SERVER_CORE --> RAFT[Raft Node]
        
        CONN_MGR[Connection Manager]
        GRPC -- Manages --> CONN_MGR
    end
    
    RAFT -- Proposes/Applies --> LOCK_MGR[Lock Manager]
    TRACKER -- Resolves --> SERVER_CORE
    
    subgraph "External Dependencies"
        METRICS[Metrics]
        LOGGER[Logger]
        CLOCK[Clock]
    end

    SERVER_CORE -- Records --> METRICS
    SERVER_CORE -- Logs --> LOGGER
    SERVER_CORE -- Uses --> CLOCK
    
    SERVER_CORE --> RESPONSE[gRPC Response]
    RESPONSE --> CLIENT
    
    classDef server fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    classDef external fill:#f1f8e9,stroke:#558b2f,stroke-width:2px
    
    class CLIENT,GRPC,RESPONSE,VALIDATOR,LIMITER,SERVER_CORE,TRACKER,CONN_MGR server
    class RAFT,LOCK_MGR,METRICS,LOGGER,CLOCK external

  • gRPC Server: The public-facing endpoint that receives all client API calls. It uses a unary interceptor to apply common middleware.
  • Request Validator: The first step in the middleware chain, ensuring all request parameters are well-formed and within defined limits before further processing.
  • Rate Limiter: Protects the server from excessive traffic by enforcing a requests-per-second limit.
  • raftLockServer Core: The central component that implements the RaftLockServer interface. It handles business logic, determines if the node is the leader, and prepares commands for Raft consensus.
  • ProposalTracker: Manages the lifecycle of commands submitted to Raft. It tracks a proposal from submission until the corresponding log entry is applied, ensuring the client receives a response.
  • ConnectionManager: Monitors active client connections, tracking metadata like connection time and last activity for observability.

Quick Start

The RaftLockServerBuilder provides a fluent API for configuring and constructing a server instance.

  1. Use the Builder to Configure the Server: The builder pattern helps ensure all required fields are set.

    package main
    
    import (
        "context"
        "log"
        "os"
        "os/signal"
        "syscall"
        "time"
    
        "github.com/jathurchan/raftlock/raft"
        "github.com/jathurchan/raftlock/server"
        "github.com/jathurchan/raftlock/types"
    )
    
    func main() {
        nodeID := types.NodeID("node-1")
        dataDir := "/tmp/raftlock-1"
    
        // Define all peers in the cluster, including the current node.
        peers := map[types.NodeID]raft.PeerConfig{
            "node-1": {ID: "node-1", Address: "localhost:8081"},
            "node-2": {ID: "node-2", Address: "localhost:8082"},
            "node-3": {ID: "node-3", Address: "localhost:8083"},
        }
    
        // Use the builder to construct the server
        s, err := server.NewRaftLockServerBuilder().
            WithNodeID(nodeID).
            WithPeers(peers).
            WithDataDir(dataDir).
            WithClientAPIAddress("localhost:9090"). // Address for client connections
            WithTimeouts(30*time.Second, 10*time.Second, 5*time.Second).
            WithLimits(4*1024*1024, 4*1024*1024, 1000).
            Build()
    
        if err != nil {
            log.Fatalf("Failed to build server: %v", err)
        }
    
        // Start the server
        ctx := context.Background()
        if err := s.Start(ctx); err != nil {
            log.Fatalf("Failed to start server: %v", err)
        }
    
        log.Printf("Server %s started, listening for clients on localhost:9090", nodeID)
    
        // Wait for a shutdown signal from the OS (Ctrl+C or SIGTERM).
        // context.Background() is never canceled, so waiting on it directly
        // would block forever; derive a context that the signal cancels.
        sigCtx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
        defer stop()
        <-sigCtx.Done()
    
        // Stop the server gracefully
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        if err := s.Stop(shutdownCtx); err != nil {
            log.Fatalf("Server shutdown failed: %v", err)
        }
    
        log.Println("Server stopped gracefully")
    }
    
  2. Use RaftLockServerQuickBuild for Simplicity: For tests or simple setups, this helper function configures a server with common defaults.

    // QuickBuild provides a simpler alternative to the full builder
    s, err := server.RaftLockServerQuickBuild(
        nodeID,
        "localhost:8081", // Raft peer address
        peers,
        dataDir,
    )
    

Configuration (config.go)

The server's behavior is controlled by the RaftLockServerConfig struct. While the builder provides sensible defaults, you can customize these settings:

  • NodeID: The unique ID for this node. (Required)
  • ListenAddress: The address for internal Raft peer-to-peer communication.
  • ClientAPIAddress: The address for the public client-facing gRPC API.
  • Peers: A map of all nodes in the Raft cluster. (Required)
  • DataDir: The directory for storing Raft logs and snapshots. (Required)
  • RequestTimeout: The default timeout for client requests.
  • ShutdownTimeout: The grace period for the server to shut down.
  • MaxConcurrentReqs: The maximum number of requests to process simultaneously.
  • EnableRateLimit: A boolean to enable or disable the rate limiter.
  • RateLimit / RateLimitBurst / RateLimitWindow: Parameters for the token bucket rate limiter.
  • HealthCheckInterval: The frequency of internal health checks.
  • Logger, Metrics, Clock, Serializer: Interfaces for injecting custom dependencies.

Error Handling (errors.go)

The server package defines several custom error types to provide clear, actionable feedback to clients and operators:

  • ErrServerNotStarted / ErrServerStopped: Returned when requests are received before the server is running or after it has shut down.
  • ErrNotLeader / ErrNoLeader: Indicates that a write operation was sent to a non-leader node or that no leader is currently elected.
  • LeaderRedirectError: A special error type returned by followers that contains the address of the current leader, allowing clients to redirect their requests.
  • ValidationError: Returned when a request fails validation. It includes the field, value, and a message explaining the failure.
  • ServerError: A generic error for internal issues, which can wrap an underlying cause.

The ErrorToProtoError function translates these Go errors into detailed pb.ErrorDetail messages for gRPC responses.

Monitoring and Observability (metrics.go)

The ServerMetrics interface provides a comprehensive set of hooks to monitor the server's health and performance. You can implement this interface to integrate with systems like Prometheus, Datadog, or InfluxDB.

Key metrics include:

  • RPC-level counters: IncrGRPCRequest, ObserveRequestLatency
  • Raft-related counters: IncrRaftProposal, ObserveRaftProposalLatency, IncrLeaderRedirect
  • Error counters: IncrValidationError, IncrClientError, IncrServerError
  • Gauges: SetServerState, SetActiveConnections, SetRaftTerm, SetRaftCommitIndex

A NoOpServerMetrics implementation is provided for environments where metrics are not needed.

Documentation

Index

Constants

const (

	// DefaultListenAddress is the default address for the server's client-facing gRPC endpoint.
	DefaultListenAddress = "0.0.0.0:8080"

	// DefaultRequestTimeout is the default timeout for processing individual client requests.
	DefaultRequestTimeout = 30 * time.Second

	// DefaultShutdownTimeout is the default timeout for graceful server shutdown.
	DefaultShutdownTimeout = 10 * time.Second

	// DefaultMaxRequestSize is the default maximum size for incoming client gRPC requests (4MB).
	DefaultMaxRequestSize = 4 * 1024 * 1024

	// DefaultMaxResponseSize is the default maximum size for outgoing client gRPC responses (4MB).
	DefaultMaxResponseSize = 4 * 1024 * 1024

	// DefaultMaxConcurrentRequests is the default maximum number of concurrent client requests the server will process.
	DefaultMaxConcurrentRequests = 1000

	// DefaultRateLimit is the default number of requests per second per client.
	DefaultRateLimit = 100

	// DefaultRateLimitBurst is the default burst size for rate limiting.
	DefaultRateLimitBurst = 200

	// DefaultRateLimitWindow is the default time window for rate limiting calculations.
	DefaultRateLimitWindow = time.Second

	// DefaultHealthCheckInterval is the default interval for internal server health checks.
	DefaultHealthCheckInterval = 30 * time.Second

	// DefaultHealthCheckTimeout is the default timeout for individual internal health checks.
	DefaultHealthCheckTimeout = 5 * time.Second

	// DefaultRedirectTimeout is the default timeout when redirecting a client request to the Raft leader.
	DefaultRedirectTimeout = 5 * time.Second

	// DefaultRaftTickInterval is the default interval at which the server application should call Raft.Tick().
	// This should generally align with raft.NominalTickInterval.
	DefaultRaftTickInterval = 100 * time.Millisecond

	// DefaultLockManagerTickInterval is the default interval for LockManager's periodic tasks (e.g., expirations).
	DefaultLockManagerTickInterval = 1 * time.Second

	// DefaultMetricsReportInterval is the default interval for periodic metrics reporting or flushing.
	DefaultMetricsReportInterval = 10 * time.Second

	// DefaultGRPCMaxRecvMsgSize is the default maximum size for incoming gRPC messages from clients (16MB).
	DefaultGRPCMaxRecvMsgSize = 16 * 1024 * 1024

	// DefaultGRPCMaxSendMsgSize is the default maximum size for outgoing gRPC messages to clients (16MB).
	DefaultGRPCMaxSendMsgSize = 16 * 1024 * 1024

	// DefaultGRPCKeepaliveTime is the default interval for the server to send keepalive pings to idle client connections.
	// This helps detect dead connections and keep active ones alive through intermediaries.
	DefaultGRPCKeepaliveTime = 30 * time.Second

	// DefaultGRPCKeepaliveTimeout is the default timeout for the server to wait for a keepalive acknowledgment from clients.
	// If a client doesn't respond within this time after a ping, the connection may be considered dead.
	DefaultGRPCKeepaliveTimeout = 5 * time.Second

	// DefaultGRPCMinClientPingInterval is the minimum interval a client should wait before sending pings.
	DefaultGRPCMinClientPingInterval = 10 * time.Second

	// MaxLockIDLength is the maximum allowed length for lock IDs.
	MaxLockIDLength = 256

	// MaxClientIDLength is the maximum allowed length for client IDs.
	MaxClientIDLength = 256

	// MaxMetadataEntries is the maximum number of metadata entries allowed per lock.
	MaxMetadataEntries = 32

	// MaxMetadataKeyLength is the maximum length for metadata keys.
	MaxMetadataKeyLength = 64

	// MaxMetadataValueLength is the maximum length for metadata values.
	MaxMetadataValueLength = 256

	// MinLockTTL is the minimum allowed TTL for locks.
	MinLockTTL = 1 * time.Second

	// MaxLockTTL is the maximum allowed TTL for locks.
	MaxLockTTL = 24 * time.Hour

	// MinWaitTimeout is the minimum allowed wait timeout for queue operations.
	MinWaitTimeout = 1 * time.Second

	// MaxWaitTimeout is the maximum allowed wait timeout for queue operations.
	MaxWaitTimeout = 10 * time.Minute

	// MinBackoffAdvice is the minimum backoff duration advised before retrying a lock acquisition.
	MinBackoffAdvice = 1 * time.Second

	// MaxBackoffAdvice is the maximum backoff duration advised before retrying a lock acquisition.
	MaxBackoffAdvice = 30 * time.Second

	// DefaultBackoffInitial is the initial backoff delay when retrying a lock.
	DefaultBackoffInitial = 50 * time.Millisecond

	// DefaultBackoffMax is the maximum backoff for retrying a lock when no info is available.
	DefaultBackoffMax = 1 * time.Second

	// DefaultBackoffMultiplier controls exponential growth of retry delays.
	DefaultBackoffMultiplier = 1.5

	// DefaultBackoffJitterFactor controls randomization to avoid thundering herd.
	DefaultBackoffJitterFactor = 0.2

	// MinPriority is the minimum allowed priority value for wait queue operations.
	MinPriority = -1000

	// MaxPriority is the maximum allowed priority value for wait queue operations.
	MaxPriority = 1000

	// DefaultPriority is the default priority for wait queue operations if not specified.
	DefaultPriority = 0

	// DefaultPageLimit is the default number of items returned in paginated responses.
	DefaultPageLimit = 100

	// MaxPageLimit is the maximum number of items that can be requested in a single page.
	MaxPageLimit = 1000

	// MaxRequestIDLength is the maximum allowed length for request IDs (used for idempotency).
	MaxRequestIDLength = 128

	// DefaultMetricsRetentionPeriod is how long to keep historical metrics data if applicable.
	DefaultMetricsRetentionPeriod = 1 * time.Hour
	// DefaultMetricsSampleRate is the rate at which detailed metrics are sampled if applicable.
	DefaultMetricsSampleRate = 0.1 // 10% sampling

	// ErrMsgInvalidLockID is the error message template for invalid lock IDs.
	ErrMsgInvalidLockID = "lock_id must be a non-empty string with length <= %d characters"
	// ErrMsgInvalidClientID is the error message template for invalid client IDs.
	ErrMsgInvalidClientID = "client_id must be a non-empty string with length <= %d characters"
	// ErrMsgInvalidTTL is the error message template for invalid TTL values.
	ErrMsgInvalidTTL = "ttl must be between %v and %v"
	// ErrMsgInvalidTimeout is the error message template for invalid timeout values.
	ErrMsgInvalidTimeout = "timeout must be between %v and %v"
	// ErrMsgInvalidPriority is the error message template for invalid priority values.
	ErrMsgInvalidPriority = "priority must be between %d and %d"
	// ErrMsgTooManyMetadata is the error message for too many metadata entries.
	ErrMsgTooManyMetadata = "metadata cannot have more than %d entries"
	// ErrMsgMetadataKeyTooLong is the error message for metadata keys that are too long.
	ErrMsgMetadataKeyTooLong = "metadata key length cannot exceed %d characters"
	// ErrMsgMetadataValueTooLong is the error message for metadata values that are too long.
	ErrMsgMetadataValueTooLong = "metadata value length cannot exceed %d characters"

	// DefaultProposalMaxPendingAge is the default max duration a proposal can remain pending before expiration.
	DefaultProposalMaxPendingAge = 5 * time.Minute

	// DefaultProposalCleanupInterval is the default interval for proposal tracker cleanup tasks.
	DefaultProposalCleanupInterval = 30 * time.Second
)
const (
	MethodAcquire          = "Acquire"
	MethodRelease          = "Release"
	MethodRenew            = "Renew"
	MethodGetLockInfo      = "GetLockInfo"
	MethodGetLocks         = "GetLocks"
	MethodEnqueueWaiter    = "EnqueueWaiter"
	MethodCancelWait       = "CancelWait"
	MethodGetBackoffAdvice = "GetBackoffAdvice"
	MethodGetStatus        = "GetStatus"
	MethodHealth           = "Health"
)

gRPC method names for metrics collection and logging

const (
	QueueTypeGRPCRequests  = "grpc_requests"
	QueueTypeRaftProposals = "raft_proposals"
	QueueTypeLockWaiters   = "lock_waiters"
)

Queue types for metrics and logging

const (
	ErrorTypeMissingField    = "missing_field"
	ErrorTypeInvalidFormat   = "invalid_format"
	ErrorTypeOutOfRange      = "out_of_range"
	ErrorTypeTooLong         = "too_long"
	ErrorTypeInternalError   = "internal_error"
	ErrorTypeRaftUnavailable = "raft_unavailable"
	ErrorTypeTimeout         = "timeout"
	ErrorTypeRateLimit       = "rate_limit_exceeded" // More specific for when limit is hit
)

Error types for metrics and logging (used with ServerMetrics.IncrValidationError/IncrServerError)

Variables

var (
	// ErrServerNotStarted indicates the server has not been started or is not yet ready.
	ErrServerNotStarted = errors.New("server: server not started or not ready")

	// ErrServerAlreadyStarted indicates an attempt to start an already running server.
	ErrServerAlreadyStarted = errors.New("server: server already started")

	// ErrServerStopped indicates the server has been stopped and cannot process requests.
	ErrServerStopped = errors.New("server: server stopped")

	// ErrNotLeader indicates this server is not the current Raft leader and cannot process the request.
	ErrNotLeader = errors.New("server: this node is not the Raft leader")

	// ErrNoLeader indicates that no leader is currently elected or known in the Raft cluster.
	ErrNoLeader = errors.New("server: no leader available in the cluster")

	// ErrRequestTooLarge indicates the client's request exceeds the configured maximum allowed size.
	ErrRequestTooLarge = errors.New("server: request too large")

	// ErrResponseTooLarge indicates an internally generated response exceeds the maximum allowed size.
	// This is typically an internal server issue.
	ErrResponseTooLarge = errors.New("server: response too large")

	// ErrRateLimited indicates the request was rejected due to rate limiting policies.
	ErrRateLimited = errors.New("server: request rate limited")

	// ErrShutdownTimeout indicates the server's graceful shutdown process timed out.
	ErrShutdownTimeout = errors.New("server: shutdown timed out")

	// ErrInvalidRequest indicates the request is malformed or contains invalid parameters
	// not caught by more specific validation errors.
	ErrInvalidRequest = errors.New("server: invalid request")

	// ErrRaftUnavailable indicates the Raft consensus system is not available or has errors.
	ErrRaftUnavailable = errors.New("server: raft consensus system unavailable")

	// ErrLockManagerUnavailable indicates the core lock management logic is unavailable or has errors.
	ErrLockManagerUnavailable = errors.New("server: lock manager unavailable")
)

Functions

func ErrorToProtoError

func ErrorToProtoError(err error) *pb.ErrorDetail

ErrorToProtoError converts Go errors to protobuf ErrorDetail messages. This function maps common server errors to appropriate ErrorCode values and provides structured error information for clients.

func ValidateAddress

func ValidateAddress(addr string) error

ValidateAddress checks if a server address is in valid host:port format.
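
For readers who want a feel for what a host:port check involves, here is a minimal sketch built on the standard library's net.SplitHostPort. It is not the package's implementation: `validateHostPort` is a hypothetical function, and rejecting an empty host or port 0 is an assumption about what "valid" means.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// validateHostPort reports whether addr looks like host:port with a
// non-empty host and a numeric port in the range 1-65535.
func validateHostPort(addr string) error {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return fmt.Errorf("invalid address %q: %w", addr, err)
	}
	if host == "" {
		return fmt.Errorf("invalid address %q: empty host", addr)
	}
	n, err := strconv.Atoi(port)
	if err != nil || n < 1 || n > 65535 {
		return fmt.Errorf("invalid address %q: port must be a number in 1-65535", addr)
	}
	return nil
}

func main() {
	for _, addr := range []string{"localhost:8080", "[::1]:9090", "localhost", ":8080", "localhost:http"} {
		fmt.Printf("%-16s -> %v\n", addr, validateHostPort(addr))
	}
}
```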

Types

type ConnectionInfo

type ConnectionInfo struct {
	RemoteAddr   string    // Client's remote address
	ConnectedAt  time.Time // Time the connection was established
	LastActive   time.Time // Last time a request was received
	RequestCount int64     // Total number of requests from this connection
}

ConnectionInfo holds metadata about a gRPC client connection.

type ConnectionManager

type ConnectionManager interface {
	// Registers a new connection
	OnConnect(remoteAddr string)

	// Removes an existing connection
	OnDisconnect(remoteAddr string)

	// Updates activity for a connection
	OnRequest(remoteAddr string)

	// Returns the number of active connections
	GetActiveConnections() int

	// Returns a snapshot of all connections
	GetAllConnectionInfo() map[string]ConnectionInfo
}

ConnectionManager manages gRPC client connections and their lifecycle.

func NewConnectionManager

func NewConnectionManager(
	metrics ServerMetrics,
	logger logger.Logger,
	clock raft.Clock,
) ConnectionManager

NewConnectionManager returns a new ConnectionManager that uses the provided metrics, logger, and clock. If no clock is given, it falls back to raft.StandardClock.

type LeaderRedirectError

type LeaderRedirectError struct {
	LeaderAddress string
	LeaderID      string
}

LeaderRedirectError contains information needed to redirect a client to the current leader. It is used when a write request is sent to a follower node.

func NewLeaderRedirectError

func NewLeaderRedirectError(leaderAddress, leaderID string) *LeaderRedirectError

NewLeaderRedirectError creates a new LeaderRedirectError.

func (*LeaderRedirectError) Error

func (e *LeaderRedirectError) Error() string

Error implements the error interface, providing a human-readable message.

type NoOpServerMetrics

type NoOpServerMetrics struct{}

NoOpServerMetrics provides a no-operation implementation of ServerMetrics. All methods are empty and safe for concurrent use.

func (*NoOpServerMetrics) IncrClientError

func (n *NoOpServerMetrics) IncrClientError(method string, errorCode pb.ErrorCode)

func (*NoOpServerMetrics) IncrConcurrentRequests

func (n *NoOpServerMetrics) IncrConcurrentRequests(method string, delta int)

func (*NoOpServerMetrics) IncrGRPCRequest

func (n *NoOpServerMetrics) IncrGRPCRequest(method string, success bool)

func (*NoOpServerMetrics) IncrHealthCheck

func (n *NoOpServerMetrics) IncrHealthCheck(healthy bool)

func (*NoOpServerMetrics) IncrLeaderRedirect

func (n *NoOpServerMetrics) IncrLeaderRedirect(method string)

func (*NoOpServerMetrics) IncrLockExpiration

func (n *NoOpServerMetrics) IncrLockExpiration()

func (*NoOpServerMetrics) IncrQueueOverflow

func (n *NoOpServerMetrics) IncrQueueOverflow(queueType string)

func (*NoOpServerMetrics) IncrRaftProposal

func (n *NoOpServerMetrics) IncrRaftProposal(operation types.LockOperation, success bool)

func (*NoOpServerMetrics) IncrRetry

func (n *NoOpServerMetrics) IncrRetry(method string)

func (*NoOpServerMetrics) IncrServerError

func (n *NoOpServerMetrics) IncrServerError(method string, errorType string)

func (*NoOpServerMetrics) IncrValidationError

func (n *NoOpServerMetrics) IncrValidationError(method string, errorType string)

func (*NoOpServerMetrics) ObserveQueueLength

func (n *NoOpServerMetrics) ObserveQueueLength(queueType string, length int)

func (*NoOpServerMetrics) ObserveRaftProposalLatency

func (n *NoOpServerMetrics) ObserveRaftProposalLatency(
	operation types.LockOperation,
	latency time.Duration,
)

func (*NoOpServerMetrics) ObserveRequestLatency

func (n *NoOpServerMetrics) ObserveRequestLatency(method string, latency time.Duration)

func (*NoOpServerMetrics) ObserveRequestSize

func (n *NoOpServerMetrics) ObserveRequestSize(method string, sizeBytes int)

func (*NoOpServerMetrics) ObserveResponseSize

func (n *NoOpServerMetrics) ObserveResponseSize(method string, sizeBytes int)

func (*NoOpServerMetrics) Reset

func (n *NoOpServerMetrics) Reset()

func (*NoOpServerMetrics) SetActiveConnections

func (n *NoOpServerMetrics) SetActiveConnections(count int)

func (*NoOpServerMetrics) SetRaftCommitIndex

func (n *NoOpServerMetrics) SetRaftCommitIndex(index types.Index)

func (*NoOpServerMetrics) SetRaftTerm

func (n *NoOpServerMetrics) SetRaftTerm(term types.Term)

func (*NoOpServerMetrics) SetServerState

func (n *NoOpServerMetrics) SetServerState(isLeader bool, isHealthy bool)

type ProposalTracker

type ProposalTracker interface {
	// Track begins monitoring a new proposal.
	// The proposal.ID must be unique.
	// The proposal.ResultCh is used to send back the outcome and
	// should be a buffered channel to prevent blocking the tracker.
	Track(proposal *types.PendingProposal) error

	// HandleAppliedCommand is called when a Raft log entry has been successfully
	// applied to the state machine. It resolves the associated pending proposal.
	HandleAppliedCommand(applyMsg types.ApplyMsg)

	// HandleSnapshotApplied is invoked when a snapshot is applied to the state machine.
	// This invalidates any pending proposals that fall within the snapshot's covered range.
	HandleSnapshotApplied(snapshotIndex types.Index, snapshotTerm types.Term)

	// ClientCancel is called when a client's context is canceled before the proposal
	// is committed and applied. It cleans up the corresponding pending proposal and
	// signals any waiting goroutines.
	// Returns true if the proposal was found and canceled; false otherwise.
	ClientCancel(proposalID types.ProposalID, reason error) bool

	// GetPendingCount returns the total number of proposals currently pending
	// Raft commitment and application.
	GetPendingCount() int64

	// GetStats returns aggregated metrics and statistics related to proposal tracking.
	GetStats() types.ProposalStats

	// GetPendingProposal retrieves a *copy* of the specified pending proposal, if it exists.
	// Useful for observation and debugging; does not alter internal tracker state.
	GetPendingProposal(proposalID types.ProposalID) (types.PendingProposal, bool)

	// Cleanup performs manual removal of expired or stale pending proposals.
	// Returns the number of proposals removed.
	Cleanup() int

	// Close gracefully shuts down the tracker, terminating background processes
	// and failing any unresolved proposals.
	// Returns an error if the shutdown process encounters issues.
	Close() error
}

ProposalTracker defines the interface for managing Raft proposals, tracking their lifecycle, and resolving outcomes as they are applied or invalidated.

func NewProposalTracker

func NewProposalTracker(logger logger.Logger, opts ...ProposalTrackerOption) ProposalTracker

NewProposalTracker creates a new ProposalTracker that uses the given logger and applies any supplied options.

type ProposalTrackerOption

type ProposalTrackerOption func(*proposalTracker)

ProposalTrackerOption defines a functional option for configuring a proposalTracker instance.

func WithCleanupInterval

func WithCleanupInterval(interval time.Duration) ProposalTrackerOption

WithCleanupInterval sets the frequency at which the background cleanup process runs. A non-positive interval disables the periodic cleanup goroutine.

func WithClock

func WithClock(clock raft.Clock) ProposalTrackerOption

WithClock injects a custom clock implementation used for time-based operations such as expiration.

func WithMaxPendingAge

func WithMaxPendingAge(age time.Duration) ProposalTrackerOption

WithMaxPendingAge configures the maximum allowed age for a proposal before it is considered expired and eligible for removal during cleanup. A non-positive value disables automatic expiration.

type RaftLockServer

type RaftLockServer interface {
	pb.RaftLockServer

	// Start initializes and runs the gRPC server and all underlying components,
	// including the Raft node and background workers.
	//
	// Returns an error if initialization fails (e.g., port conflict, Raft failure).
	Start(ctx context.Context) error

	// Stop gracefully shuts down the server and all components.
	// The provided context can set a deadline for shutdown.
	Stop(ctx context.Context) error

	// IsLeader reports whether this node is currently the Raft leader.
	// Only the leader can handle write operations.
	IsLeader() bool

	// GetLeaderAddress returns the address of the current Raft leader.
	// Returns an empty string if the leader is unknown.
	GetLeaderAddress() string

	// GetNodeID returns the unique Raft node ID of this server.
	GetNodeID() string

	// Metrics returns current metrics for observability and monitoring.
	Metrics() ServerMetrics
}

RaftLockServer defines the interface for a Raft-backed distributed lock service. It provides strong consistency guarantees by leveraging the Raft consensus algorithm.

The server handles client requests, validates them, redirects to the leader if needed, and interfaces with the Raft-based lock manager.

func NewRaftLockServer

func NewRaftLockServer(config RaftLockServerConfig) (RaftLockServer, error)

NewRaftLockServer creates a new RaftLockServer instance using the provided configuration.

func RaftLockServerQuickBuild

func RaftLockServerQuickBuild(
	nodeID types.NodeID,
	listenAddr string,
	peers map[types.NodeID]raft.PeerConfig,
	dataDir string,
) (RaftLockServer, error)

RaftLockServerQuickBuild is a convenience function for building a server with minimal configuration.

type RaftLockServerBuilder

type RaftLockServerBuilder struct {
	// contains filtered or unexported fields
}

RaftLockServerBuilder helps construct a RaftLockServer with validated configuration and sane defaults.

func NewRaftLockServerBuilder

func NewRaftLockServerBuilder() *RaftLockServerBuilder

NewRaftLockServerBuilder returns a RaftLockServerBuilder preloaded with default configuration values.

func (*RaftLockServerBuilder) Build

Build constructs a RaftLockServer using the current builder state. Returns an error if required fields are missing or configuration is invalid.

func (*RaftLockServerBuilder) WithClientAPIAddress

func (b *RaftLockServerBuilder) WithClientAPIAddress(address string) *RaftLockServerBuilder

WithClientAPIAddress sets the address for the client-facing API. If not set, a default address is used.

func (*RaftLockServerBuilder) WithDataDir

func (b *RaftLockServerBuilder) WithDataDir(dataDir string) *RaftLockServerBuilder

WithDataDir sets the directory for storing persistent data. This must be set explicitly.

func (*RaftLockServerBuilder) WithHealthCheck

func (b *RaftLockServerBuilder) WithHealthCheck(
	interval, timeout time.Duration,
) *RaftLockServerBuilder

WithHealthCheck sets the health check interval and timeout. Values <= 0 use the default values.

func (*RaftLockServerBuilder) WithLeaderRedirect

func (b *RaftLockServerBuilder) WithLeaderRedirect(enabled bool) *RaftLockServerBuilder

WithLeaderRedirect enables or disables automatic leader redirection.

func (*RaftLockServerBuilder) WithLimits

func (b *RaftLockServerBuilder) WithLimits(
	maxRequestSize, maxResponseSize, maxConcurrentReqs int,
) *RaftLockServerBuilder

WithLimits sets request size and concurrency limits. Values <= 0 leave the defaults unchanged.

func (*RaftLockServerBuilder) WithListenAddress

func (b *RaftLockServerBuilder) WithListenAddress(address string) *RaftLockServerBuilder

WithListenAddress sets the gRPC server's listening address. If not set, a default address is used.

func (*RaftLockServerBuilder) WithLogger

WithLogger sets the server logger. If nil, a no-op logger is used.

func (*RaftLockServerBuilder) WithMetrics

WithMetrics sets the metrics collector. If nil, a no-op implementation is used.

func (*RaftLockServerBuilder) WithNodeID

WithNodeID sets the Raft node's unique identifier. This must be set explicitly.

func (*RaftLockServerBuilder) WithPeers

WithPeers sets the Raft cluster configuration. The map must include an entry for this node's NodeID. This must be set explicitly.

func (*RaftLockServerBuilder) WithRaftConfig

func (b *RaftLockServerBuilder) WithRaftConfig(raftConfig raft.Config) *RaftLockServerBuilder

WithRaftConfig overrides the default Raft configuration. RaftConfig.ID and RaftConfig.Peers should match NodeID and Peers in RaftLockServerConfig.

func (*RaftLockServerBuilder) WithRateLimit

func (b *RaftLockServerBuilder) WithRateLimit(
	enabled bool,
	rateLimit, burst int,
	window time.Duration,
) *RaftLockServerBuilder

WithRateLimit configures rate limiting. If rate limiting is enabled, values <= 0 fall back to the defaults.

func (*RaftLockServerBuilder) WithSerializer

func (b *RaftLockServerBuilder) WithSerializer(serializer lock.Serializer) *RaftLockServerBuilder

WithSerializer sets the lock.Serializer used for encoding/decoding lock commands. If nil, the default JSON serializer is used.

func (*RaftLockServerBuilder) WithTimeouts

func (b *RaftLockServerBuilder) WithTimeouts(
	requestTimeout, shutdownTimeout, redirectTimeout time.Duration,
) *RaftLockServerBuilder

WithTimeouts sets timeouts for request handling, shutdown, and leader redirection. Values <= 0 leave the defaults unchanged.
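
The "values <= 0 leave the defaults unchanged" convention used by WithLimits and WithTimeouts can be sketched with a small stand-in builder. The `timeouts` and `builder` types and the default durations below are hypothetical and chosen only for illustration; they are not the package's actual implementation or defaults.

```go
package main

import (
	"fmt"
	"time"
)

// timeouts is a hypothetical stand-in for the timeout fields of
// RaftLockServerConfig.
type timeouts struct {
	Request  time.Duration
	Shutdown time.Duration
	Redirect time.Duration
}

// builder is a hypothetical stand-in for RaftLockServerBuilder.
type builder struct{ cfg timeouts }

// newBuilder starts from illustrative defaults (assumed values).
func newBuilder() *builder {
	return &builder{cfg: timeouts{
		Request:  30 * time.Second,
		Shutdown: 10 * time.Second,
		Redirect: 5 * time.Second,
	}}
}

// withTimeouts overrides only strictly positive values, so zero or negative
// arguments keep whatever default is already in place.
func (b *builder) withTimeouts(request, shutdown, redirect time.Duration) *builder {
	if request > 0 {
		b.cfg.Request = request
	}
	if shutdown > 0 {
		b.cfg.Shutdown = shutdown
	}
	if redirect > 0 {
		b.cfg.Redirect = redirect
	}
	return b
}

func main() {
	// Only the request timeout is overridden; the other two keep their defaults.
	b := newBuilder().withTimeouts(2*time.Second, 0, -1)
	fmt.Println(b.cfg.Request, b.cfg.Shutdown, b.cfg.Redirect)
}
```

Returning the builder from each setter is what allows calls to be chained before a final Build.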

type RaftLockServerConfig

type RaftLockServerConfig struct {
	// NodeID uniquely identifies this node in the Raft cluster.
	NodeID types.NodeID

	// ListenAddress is the gRPC server's bind address (e.g., "0.0.0.0:8080").
	ListenAddress string

	// ClientAPIAddress is the network address exposed to external clients
	// for accessing the public API (e.g., "127.0.0.1:9090").
	ClientAPIAddress string

	// Peers maps all known Raft nodes by NodeID, including this node itself.
	Peers map[types.NodeID]raft.PeerConfig

	// RaftConfig holds the Raft protocol-specific configuration.
	RaftConfig raft.Config

	// DataDir is the path to store Raft state (logs, snapshots, etc.).
	DataDir string

	RequestTimeout    time.Duration // Max time to handle a client request
	ShutdownTimeout   time.Duration // Max time allowed for graceful shutdown
	MaxRequestSize    int           // Maximum size of incoming requests (in bytes)
	MaxResponseSize   int           // Maximum size of outgoing responses (in bytes)
	MaxConcurrentReqs int           // Max number of requests processed in parallel

	EnableRateLimit bool          // Whether rate limiting is enforced
	RateLimit       int           // Requests per second allowed per client
	RateLimitBurst  int           // Burst capacity for client requests
	RateLimitWindow time.Duration // Time window used for rate calculation

	Logger     logger.Logger
	Metrics    ServerMetrics
	Clock      raft.Clock
	Serializer lock.Serializer

	HealthCheckInterval time.Duration // Frequency of internal health checks
	HealthCheckTimeout  time.Duration // Timeout for individual health checks

	EnableLeaderRedirect bool          // Redirect write requests to the leader when this node is not the leader
	RedirectTimeout      time.Duration // Timeout when attempting leader redirection
}

RaftLockServerConfig holds the configuration settings for a RaftLock server instance.

func DefaultRaftLockServerConfig

func DefaultRaftLockServerConfig() RaftLockServerConfig

DefaultRaftLockServerConfig returns a RaftLockServerConfig pre-populated with safe defaults. Callers must explicitly set NodeID, Peers, DataDir, and RaftConfig.ID.

func (*RaftLockServerConfig) Validate

func (c *RaftLockServerConfig) Validate() error

Validate checks the server configuration and returns an error if it is invalid.

type RaftLockServerConfigError

type RaftLockServerConfigError struct {
	Message string
}

RaftLockServerConfigError represents a validation error in RaftLockServerConfig.

func NewRaftLockServerConfigError

func NewRaftLockServerConfigError(msg string) *RaftLockServerConfigError

NewRaftLockServerConfigError returns a new RaftLockServerConfigError with the given message.

func (*RaftLockServerConfigError) Error

func (e *RaftLockServerConfigError) Error() string

Error implements the error interface.
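
A rough sketch of how validation and a typed configuration error might fit together is shown below. `miniConfig`, `configError`, the specific checks, and the error message format are assumptions made for illustration; the real Validate may check more fields and word its errors differently.

```go
package main

import (
	"errors"
	"fmt"
)

// configError mirrors the shape of RaftLockServerConfigError (a single
// Message field). It is a local stand-in, not the package's type.
type configError struct{ Message string }

// Error's output format is an assumption.
func (e *configError) Error() string { return "config error: " + e.Message }

// miniConfig is a hypothetical subset of RaftLockServerConfig that uses
// plain strings instead of the package's NodeID and PeerConfig types.
type miniConfig struct {
	NodeID  string
	DataDir string
	Peers   map[string]string
}

// validate checks the fields the documentation says must be set explicitly.
func (c *miniConfig) validate() error {
	if c.NodeID == "" {
		return &configError{Message: "NodeID must be set"}
	}
	if c.DataDir == "" {
		return &configError{Message: "DataDir must be set"}
	}
	if _, ok := c.Peers[c.NodeID]; !ok {
		return &configError{Message: "Peers must include this node's NodeID"}
	}
	return nil
}

// isConfigError reports whether err is, or wraps, a *configError.
func isConfigError(err error) bool {
	var ce *configError
	return errors.As(err, &ce)
}

func main() {
	// Peers is missing, so validation fails with a typed error.
	cfg := miniConfig{NodeID: "node-1", DataDir: "/var/lib/raftlock"}
	err := cfg.validate()
	fmt.Println(isConfigError(err), err)
}
```

Using a dedicated error type lets callers separate configuration mistakes from runtime failures with errors.As instead of matching on message text.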

type RateLimiter

type RateLimiter interface {
	Allow() bool
	Wait(ctx context.Context) error
}

RateLimiter defines the interface for request rate limiting.
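
The two methods suit different call sites: Allow fits handlers that should reject excess traffic immediately, while Wait fits callers that can block. The sketch below restates the interface locally and uses a hypothetical test double (`fixedLimiter`) and sentinel error (`errLimited`); neither exists in the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// rateLimiter restates the documented RateLimiter interface locally so the
// example compiles on its own.
type rateLimiter interface {
	Allow() bool
	Wait(ctx context.Context) error
}

// errLimited is a hypothetical sentinel error for rejected requests.
var errLimited = errors.New("rate limit exceeded")

// fixedLimiter is a hypothetical test double that admits a fixed number of
// calls. It is not safe for concurrent use.
type fixedLimiter struct{ remaining int }

func (f *fixedLimiter) Allow() bool {
	if f.remaining <= 0 {
		return false
	}
	f.remaining--
	return true
}

// Wait never blocks in this double: it fails if the context is already done
// or if no capacity is left.
func (f *fixedLimiter) Wait(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if !f.Allow() {
		return errLimited
	}
	return nil
}

// handle rejects the request immediately when the limiter has no capacity.
func handle(rl rateLimiter, req string) (string, error) {
	if !rl.Allow() {
		return "", errLimited
	}
	return "handled " + req, nil
}

func main() {
	rl := &fixedLimiter{remaining: 1}
	fmt.Println(handle(rl, "a")) // admitted
	fmt.Println(handle(rl, "b")) // rejected
}
```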

type RequestValidator

type RequestValidator interface {
	// ValidateAcquireRequest validates an AcquireRequest.
	ValidateAcquireRequest(req *pb.AcquireRequest) error

	// ValidateReleaseRequest validates a ReleaseRequest.
	ValidateReleaseRequest(req *pb.ReleaseRequest) error

	// ValidateRenewRequest validates a RenewRequest.
	ValidateRenewRequest(req *pb.RenewRequest) error

	// ValidateGetLockInfoRequest validates a GetLockInfoRequest.
	ValidateGetLockInfoRequest(req *pb.GetLockInfoRequest) error

	// ValidateGetLocksRequest validates a GetLocksRequest.
	ValidateGetLocksRequest(req *pb.GetLocksRequest) error

	// ValidateEnqueueWaiterRequest validates an EnqueueWaiterRequest.
	ValidateEnqueueWaiterRequest(req *pb.EnqueueWaiterRequest) error

	// ValidateCancelWaitRequest validates a CancelWaitRequest.
	ValidateCancelWaitRequest(req *pb.CancelWaitRequest) error

	// ValidateBackoffAdviceRequest validates a BackoffAdviceRequest.
	ValidateBackoffAdviceRequest(req *pb.BackoffAdviceRequest) error

	// ValidateGetStatusRequest validates a GetStatusRequest.
	ValidateGetStatusRequest(req *pb.GetStatusRequest) error

	// ValidateHealthRequest validates a HealthRequest.
	ValidateHealthRequest(req *pb.HealthRequest) error
}

RequestValidator defines the interface for validating incoming gRPC requests to the RaftLock server. Each method should return an error if the request is invalid.

func NewRequestValidator

func NewRequestValidator(logger logger.Logger) RequestValidator

NewRequestValidator creates a new default request validator.

type ServerError

type ServerError struct {
	Operation string // The operation being performed when the error occurred.
	Cause     error  // The underlying error, if any.
	Message   string // A high-level message describing the server error.
}

ServerError represents a generic internal server error, potentially wrapping an underlying cause.

func NewServerError

func NewServerError(operation string, cause error, message string) *ServerError

NewServerError creates a new ServerError.

func (*ServerError) Error

func (e *ServerError) Error() string

Error implements the error interface, providing context about the operation and cause.

func (*ServerError) Unwrap

func (e *ServerError) Unwrap() error

Unwrap provides compatibility with Go's errors.Is and errors.As by returning the cause.
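
Because Unwrap returns the cause, callers can inspect a wrapped error chain with the standard errors package. The sketch below uses a local `serverError` type with the same fields as ServerError; the Error message format and the `errNoLeader` sentinel are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// serverError mirrors the documented ServerError fields. It is a local
// stand-in so the example is self-contained.
type serverError struct {
	Operation string
	Cause     error
	Message   string
}

// Error's format is an assumption; the real output may differ.
func (e *serverError) Error() string {
	if e.Cause != nil {
		return fmt.Sprintf("%s: %s: %v", e.Operation, e.Message, e.Cause)
	}
	return fmt.Sprintf("%s: %s", e.Operation, e.Message)
}

// Unwrap exposes the cause to errors.Is and errors.As.
func (e *serverError) Unwrap() error { return e.Cause }

// errNoLeader is a hypothetical sentinel error.
var errNoLeader = errors.New("no leader elected")

// propose simulates a failed operation wrapped with extra context.
func propose() error {
	inner := &serverError{Operation: "Acquire", Cause: errNoLeader, Message: "proposal failed"}
	return fmt.Errorf("handling request: %w", inner)
}

// causedByNoLeader walks the chain looking for the sentinel.
func causedByNoLeader(err error) bool { return errors.Is(err, errNoLeader) }

// operationOf extracts the operation name if a serverError is in the chain.
func operationOf(err error) string {
	var se *serverError
	if errors.As(err, &se) {
		return se.Operation
	}
	return ""
}

func main() {
	err := propose()
	fmt.Println(causedByNoLeader(err), operationOf(err))
}
```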

type ServerMetrics

type ServerMetrics interface {
	// IncrGRPCRequest increments the count for an RPC method invocation.
	// 'method' should match a RaftLock RPC (e.g., "Acquire", "Release").
	// 'success' should reflect overall success from the client’s perspective.
	IncrGRPCRequest(method string, success bool)

	// IncrLeaderRedirect increments the count of requests redirected to the current leader.
	// Typically applies to write operations when the node is not the Raft leader.
	IncrLeaderRedirect(method string)

	// IncrRetry increments a counter when a client retries a failed or redirected request.
	IncrRetry(method string)

	// IncrRaftProposal increments proposal submission counters.
	// 'operation' should reflect the type of lock operation (Acquire, Release, Renew).
	// 'success' indicates whether the proposal was committed by Raft.
	IncrRaftProposal(operation types.LockOperation, success bool)

	// IncrValidationError increments validation failure counters.
	// 'method' is the RPC where validation failed.
	// 'errorType' is a string like "invalid_ttl", "missing_field", or "bad_lock_id".
	IncrValidationError(method string, errorType string)

	// IncrClientError increments counts for errors caused by invalid client input or actions.
	// This typically maps to RaftLock ErrorCodes in the 100–199 or 300–399 range.
	IncrClientError(method string, errorCode pb.ErrorCode)

	// IncrServerError increments counts for internal server-side errors.
	// 'errorType' might include values like "internal_error", "raft_unavailable", "timeout".
	IncrServerError(method string, errorType string)

	// IncrQueueOverflow increments a counter when a queue exceeds its configured capacity.
	IncrQueueOverflow(queueType string)

	// IncrLockExpiration increments a counter when a lock expires due to TTL.
	IncrLockExpiration()

	// ObserveRequestLatency records end-to-end latency for a gRPC method call.
	// This includes validation, Raft proposal (if applicable), and response formatting.
	ObserveRequestLatency(method string, latency time.Duration)

	// ObserveRaftProposalLatency records the latency from submitting a Raft proposal
	// to it being either committed or rejected.
	ObserveRaftProposalLatency(operation types.LockOperation, latency time.Duration)

	// ObserveQueueLength tracks the current size of internal queues (e.g., gRPC, Raft).
	// Typical queue types: "grpc_requests", "raft_proposals", "wait_queue".
	ObserveQueueLength(queueType string, length int)

	// ObserveRequestSize records the raw size of incoming gRPC requests in bytes.
	ObserveRequestSize(method string, sizeBytes int)

	// ObserveResponseSize records the raw size of outgoing gRPC responses in bytes.
	ObserveResponseSize(method string, sizeBytes int)

	// IncrConcurrentRequests adjusts the count of concurrently active requests.
	// Use delta +1 at request start, -1 when completed.
	IncrConcurrentRequests(method string, delta int)

	// IncrHealthCheck increments the count of health check invocations.
	// 'healthy' reflects the result of the check.
	IncrHealthCheck(healthy bool)

	// SetServerState sets leadership and health state gauges.
	// 'isLeader' reflects current Raft role; 'isHealthy' reflects readiness for requests.
	SetServerState(isLeader bool, isHealthy bool)

	// SetActiveConnections sets the number of live gRPC connections to this server.
	SetActiveConnections(count int)

	// SetRaftTerm sets the current Raft term for this node, useful for tracking elections.
	SetRaftTerm(term types.Term)

	// SetRaftCommitIndex sets the commit index last acknowledged by this Raft node.
	SetRaftCommitIndex(index types.Index)

	// Reset clears all metric counters and resets gauges.
	// Useful primarily in unit or integration tests.
	Reset()
}

ServerMetrics defines observability hooks for RaftLock server operations. Metrics cover gRPC lifecycle, Raft proposals, validation, error types, and system health. All methods must be safe for concurrent use.
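
As an illustration of the concurrency requirement and the +1/-1 convention for IncrConcurrentRequests, here is a minimal in-memory sketch covering only two of the hooks. `memMetrics`, `instrument`, and the key format are hypothetical; a real ServerMetrics implementation has to provide every method of the interface.

```go
package main

import (
	"fmt"
	"sync"
)

// memMetrics is a hypothetical in-memory collector guarded by a mutex so
// that its methods are safe for concurrent use.
type memMetrics struct {
	mu       sync.Mutex
	requests map[string]int // keyed by method and outcome
	inFlight map[string]int // currently active requests per method
}

func newMemMetrics() *memMetrics {
	return &memMetrics{requests: map[string]int{}, inFlight: map[string]int{}}
}

// requestKey builds the counter key; the format is an arbitrary choice.
func requestKey(method string, success bool) string {
	if success {
		return method + ":ok"
	}
	return method + ":error"
}

func (m *memMetrics) IncrGRPCRequest(method string, success bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.requests[requestKey(method, success)]++
}

func (m *memMetrics) IncrConcurrentRequests(method string, delta int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.inFlight[method] += delta
}

func (m *memMetrics) requestCount(method string, success bool) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.requests[requestKey(method, success)]
}

func (m *memMetrics) active(method string) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.inFlight[method]
}

// instrument applies +1 at request start and -1 on completion, then records
// the outcome of the handler.
func instrument(m *memMetrics, method string, handler func() error) error {
	m.IncrConcurrentRequests(method, 1)
	defer m.IncrConcurrentRequests(method, -1)
	err := handler()
	m.IncrGRPCRequest(method, err == nil)
	return err
}

func main() {
	m := newMemMetrics()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = instrument(m, "Acquire", func() error { return nil })
		}()
	}
	wg.Wait()
	fmt.Println(m.requestCount("Acquire", true), m.active("Acquire"))
}
```

Pairing the increment with a deferred decrement keeps the in-flight gauge accurate even when the handler returns early with an error.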

func NewNoOpServerMetrics

func NewNoOpServerMetrics() ServerMetrics

NewNoOpServerMetrics creates a new no-operation metrics implementation.

type ServerOperationalState

type ServerOperationalState string

ServerOperationalState defines the possible operational states of the server.

const (
	// ServerStateStarting indicates the server is in the process of starting up.
	ServerStateStarting ServerOperationalState = "starting"
	// ServerStateRunning indicates the server is running and accepting requests.
	ServerStateRunning ServerOperationalState = "running"
	// ServerStateStopping indicates the server is in the process of shutting down.
	ServerStateStopping ServerOperationalState = "stopping"
	// ServerStateStopped indicates the server has been stopped.
	ServerStateStopped ServerOperationalState = "stopped"
)

type TokenBucketRateLimiter

type TokenBucketRateLimiter struct {
	// contains filtered or unexported fields
}

TokenBucketRateLimiter implements rate limiting using a token bucket algorithm.

func NewTokenBucketRateLimiter

func NewTokenBucketRateLimiter(
	maxRequests, burst int,
	window time.Duration,
	logger logger.Logger,
) *TokenBucketRateLimiter

NewTokenBucketRateLimiter creates a new token bucket rate limiter.

func (*TokenBucketRateLimiter) Allow

func (rl *TokenBucketRateLimiter) Allow() bool

Allow returns true if a request can proceed immediately.

func (*TokenBucketRateLimiter) Wait

Wait blocks until a request can proceed or the context is cancelled.
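
For readers unfamiliar with the algorithm, the following is a minimal token bucket sketch, not the package's TokenBucketRateLimiter. It assumes that `maxRequests` tokens are added per `window`, that the bucket holds at most `burst` tokens, and that both `maxRequests` and `window` are positive; the real type may interpret its parameters differently. The `fakeClock` helper is hypothetical and exists only to make the demonstration deterministic.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// tokenBucket is an illustrative limiter with an injectable clock.
type tokenBucket struct {
	mu       sync.Mutex
	tokens   float64
	capacity float64
	perNano  float64 // tokens credited per elapsed nanosecond
	last     time.Time
	now      func() time.Time
}

func newTokenBucket(maxRequests, burst int, window time.Duration, now func() time.Time) *tokenBucket {
	return &tokenBucket{
		tokens:   float64(burst), // start with a full bucket
		capacity: float64(burst),
		perNano:  float64(maxRequests) / float64(window),
		last:     now(),
		now:      now,
	}
}

// refill credits tokens for elapsed time, capped at capacity.
// The caller must hold tb.mu.
func (tb *tokenBucket) refill() {
	t := tb.now()
	tb.tokens += float64(t.Sub(tb.last)) * tb.perNano
	if tb.tokens > tb.capacity {
		tb.tokens = tb.capacity
	}
	tb.last = t
}

// Allow consumes one token if available and reports whether it did.
func (tb *tokenBucket) Allow() bool {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	tb.refill()
	if tb.tokens < 1 {
		return false
	}
	tb.tokens--
	return true
}

// Wait polls Allow once per millisecond until it succeeds or ctx is done.
func (tb *tokenBucket) Wait(ctx context.Context) error {
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for {
		if tb.Allow() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// fakeClock is a hypothetical manual clock for deterministic examples.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	clk := &fakeClock{}
	tb := newTokenBucket(10, 2, time.Second, clk.Now)
	fmt.Println(tb.Allow(), tb.Allow(), tb.Allow()) // burst of 2, then empty
	clk.Advance(500 * time.Millisecond)             // refills up to capacity
	fmt.Println(tb.Allow())
}
```

Polling in Wait keeps the sketch short; a production limiter would more likely compute the time until the next token and sleep for exactly that long.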

type ValidationError

type ValidationError struct {
	Field   string // The name of the field that failed validation.
	Value   any    // The value of the field that caused the error.
	Message string // A descriptive message explaining the validation failure.
}

ValidationError represents a request validation error with details about the specific field.

func NewValidationError

func NewValidationError(field string, value any, message string) *ValidationError

NewValidationError creates a new ValidationError.

func (*ValidationError) Error

func (e *ValidationError) Error() string

Error implements the error interface, providing a structured validation error message.
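
A validator method can return this kind of error so that callers learn which field was rejected. In the sketch below, `acquireRequest` is a hypothetical stand-in for pb.AcquireRequest, and the field names, TTL bounds, and message format are all assumptions rather than the package's actual rules.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// validationError mirrors the documented ValidationError fields.
type validationError struct {
	Field   string
	Value   any
	Message string
}

// The message layout is an assumption; the real Error output may differ.
func (e *validationError) Error() string {
	return fmt.Sprintf("validation failed for %q (value %v): %s", e.Field, e.Value, e.Message)
}

// acquireRequest is a hypothetical stand-in for pb.AcquireRequest.
type acquireRequest struct {
	LockID   string
	ClientID string
	TTL      time.Duration
}

// Illustrative bounds only; the package's real limits are not shown here.
const (
	minTTL = time.Second
	maxTTL = 5 * time.Minute
)

// validateAcquire returns a *validationError naming the first bad field.
func validateAcquire(req *acquireRequest) error {
	if req == nil {
		return &validationError{Field: "request", Message: "must not be nil"}
	}
	if req.LockID == "" {
		return &validationError{Field: "lock_id", Value: req.LockID, Message: "must not be empty"}
	}
	if req.ClientID == "" {
		return &validationError{Field: "client_id", Value: req.ClientID, Message: "must not be empty"}
	}
	if req.TTL < minTTL || req.TTL > maxTTL {
		return &validationError{Field: "ttl", Value: req.TTL, Message: "out of allowed range"}
	}
	return nil
}

// failedField returns the rejected field name, or "" when err does not
// contain a *validationError.
func failedField(err error) string {
	var ve *validationError
	if errors.As(err, &ve) {
		return ve.Field
	}
	return ""
}

func main() {
	err := validateAcquire(&acquireRequest{LockID: "orders", ClientID: "worker-1"})
	fmt.Println(failedField(err))
	fmt.Println(err)
}
```

A field name recovered this way could also feed the errorType argument of ServerMetrics.IncrValidationError, although how the server derives that label is not specified here.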
