Documentation
¶
Index ¶
- Constants
- Variables
- func ErrorToProtoError(err error) *pb.ErrorDetail
- func ValidateAddress(addr string) error
- type ConnectionInfo
- type ConnectionManager
- type LeaderRedirectError
- type NoOpServerMetrics
- func (n *NoOpServerMetrics) IncrClientError(method string, errorCode pb.ErrorCode)
- func (n *NoOpServerMetrics) IncrConcurrentRequests(method string, delta int)
- func (n *NoOpServerMetrics) IncrGRPCRequest(method string, success bool)
- func (n *NoOpServerMetrics) IncrHealthCheck(healthy bool)
- func (n *NoOpServerMetrics) IncrLeaderRedirect(method string)
- func (n *NoOpServerMetrics) IncrLockExpiration()
- func (n *NoOpServerMetrics) IncrQueueOverflow(queueType string)
- func (n *NoOpServerMetrics) IncrRaftProposal(operation types.LockOperation, success bool)
- func (n *NoOpServerMetrics) IncrRetry(method string)
- func (n *NoOpServerMetrics) IncrServerError(method string, errorType string)
- func (n *NoOpServerMetrics) IncrValidationError(method string, errorType string)
- func (n *NoOpServerMetrics) ObserveQueueLength(queueType string, length int)
- func (n *NoOpServerMetrics) ObserveRaftProposalLatency(operation types.LockOperation, latency time.Duration)
- func (n *NoOpServerMetrics) ObserveRequestLatency(method string, latency time.Duration)
- func (n *NoOpServerMetrics) ObserveRequestSize(method string, sizeBytes int)
- func (n *NoOpServerMetrics) ObserveResponseSize(method string, sizeBytes int)
- func (n *NoOpServerMetrics) Reset()
- func (n *NoOpServerMetrics) SetActiveConnections(count int)
- func (n *NoOpServerMetrics) SetRaftCommitIndex(index types.Index)
- func (n *NoOpServerMetrics) SetRaftTerm(term types.Term)
- func (n *NoOpServerMetrics) SetServerState(isLeader bool, isHealthy bool)
- type ProposalTracker
- type ProposalTrackerOption
- type RaftLockServer
- type RaftLockServerBuilder
- func (b *RaftLockServerBuilder) Build() (RaftLockServer, error)
- func (b *RaftLockServerBuilder) WithClientAPIAddress(address string) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithDataDir(dataDir string) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithHealthCheck(interval, timeout time.Duration) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithLeaderRedirect(enabled bool) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithLimits(maxRequestSize, maxResponseSize, maxConcurrentReqs int) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithListenAddress(address string) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithLogger(logger logger.Logger) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithMetrics(metrics ServerMetrics) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithNodeID(nodeID types.NodeID) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithPeers(peers map[types.NodeID]raft.PeerConfig) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithRaftConfig(raftConfig raft.Config) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithRateLimit(enabled bool, rateLimit, burst int, window time.Duration) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithSerializer(serializer lock.Serializer) *RaftLockServerBuilder
- func (b *RaftLockServerBuilder) WithTimeouts(requestTimeout, shutdownTimeout, redirectTimeout time.Duration) *RaftLockServerBuilder
- type RaftLockServerConfig
- type RaftLockServerConfigError
- type RateLimiter
- type RequestValidator
- type ServerError
- type ServerMetrics
- type ServerOperationalState
- type TokenBucketRateLimiter
- type ValidationError
Constants ¶
const ( // DefaultListenAddress is the default address for the server's client-facing gRPC endpoint. DefaultListenAddress = "0.0.0.0:8080" // DefaultRequestTimeout is the default timeout for processing individual client requests. DefaultRequestTimeout = 30 * time.Second // DefaultShutdownTimeout is the default timeout for graceful server shutdown. DefaultShutdownTimeout = 10 * time.Second // DefaultMaxRequestSize is the default maximum size for incoming client gRPC requests (4MB). DefaultMaxRequestSize = 4 * 1024 * 1024 // DefaultMaxResponseSize is the default maximum size for outgoing client gRPC responses (4MB). DefaultMaxResponseSize = 4 * 1024 * 1024 // DefaultMaxConcurrentRequests is the default maximum number of concurrent client requests the server will process. DefaultMaxConcurrentRequests = 1000 // DefaultRateLimit is the default number of requests per second per client. DefaultRateLimit = 100 // DefaultRateLimitBurst is the default burst size for rate limiting. DefaultRateLimitBurst = 200 // DefaultRateLimitWindow is the default time window for rate limiting calculations. DefaultRateLimitWindow = time.Second // DefaultHealthCheckInterval is the default interval for internal server health checks. DefaultHealthCheckInterval = 30 * time.Second // DefaultHealthCheckTimeout is the default timeout for individual internal health checks. DefaultHealthCheckTimeout = 5 * time.Second // DefaultRedirectTimeout is the default timeout when redirecting a client request to the Raft leader. DefaultRedirectTimeout = 5 * time.Second // DefaultRaftTickInterval is the default interval at which the server application should call Raft.Tick(). // This should generally align with raft.NominalTickInterval. DefaultRaftTickInterval = 100 * time.Millisecond // DefaultLockManagerTickInterval is the default interval for LockManager's periodic tasks (e.g., expirations). 
DefaultLockManagerTickInterval = 1 * time.Second // DefaultMetricsReportInterval is the default interval for periodic metrics reporting or flushing. DefaultMetricsReportInterval = 10 * time.Second // DefaultGRPCMaxRecvMsgSize is the default maximum size for incoming gRPC messages from clients (16MB). DefaultGRPCMaxRecvMsgSize = 16 * 1024 * 1024 // DefaultGRPCMaxSendMsgSize is the default maximum size for outgoing gRPC messages to clients (16MB). DefaultGRPCMaxSendMsgSize = 16 * 1024 * 1024 // DefaultGRPCKeepaliveTime is the default interval for the server to send keepalive pings to idle client connections. // This helps detect dead connections and keep active ones alive through intermediaries. DefaultGRPCKeepaliveTime = 30 * time.Second // DefaultGRPCKeepaliveTimeout is the default timeout for the server to wait for a keepalive acknowledgment from clients. // If a client doesn't respond within this time after a ping, the connection may be considered dead. DefaultGRPCKeepaliveTimeout = 5 * time.Second // DefaultGRPCMinClientPingInterval is the minimum interval a client should wait before sending pings. DefaultGRPCMinClientPingInterval = 10 * time.Second // MaxLockIDLength is the maximum allowed length for lock IDs. MaxLockIDLength = 256 // MaxClientIDLength is the maximum allowed length for client IDs. MaxClientIDLength = 256 // MaxMetadataEntries is the maximum number of metadata entries allowed per lock. MaxMetadataEntries = 32 // MaxMetadataKeyLength is the maximum length for metadata keys. MaxMetadataKeyLength = 64 // MaxMetadataValueLength is the maximum length for metadata values. MaxMetadataValueLength = 256 // MinLockTTL is the minimum allowed TTL for locks. MinLockTTL = 1 * time.Second // MaxLockTTL is the maximum allowed TTL for locks. MaxLockTTL = 24 * time.Hour // MinWaitTimeout is the minimum allowed wait timeout for queue operations. MinWaitTimeout = 1 * time.Second // MaxWaitTimeout is the maximum allowed wait timeout for queue operations. 
MaxWaitTimeout = 10 * time.Minute // MinBackoffAdvice is the minimum backoff duration advised before retrying a lock acquisition. MinBackoffAdvice = 1 * time.Second // MaxBackoffAdvice is the maximum backoff duration advised before retrying a lock acquisition. MaxBackoffAdvice = 30 * time.Second // DefaultBackoffInitial is the initial backoff delay when retrying a lock. DefaultBackoffInitial = 50 * time.Millisecond // DefaultBackoffMax is the maximum backoff for retrying a lock when no info is available. DefaultBackoffMax = 1 * time.Second // DefaultBackoffMultiplier controls exponential growth of retry delays. DefaultBackoffMultiplier = 1.5 // DefaultBackoffJitterFactor controls randomization to avoid thundering herd. DefaultBackoffJitterFactor = 0.2 // MinPriority is the minimum allowed priority value for wait queue operations. MinPriority = -1000 // MaxPriority is the maximum allowed priority value for wait queue operations. MaxPriority = 1000 // DefaultPriority is the default priority for wait queue operations if not specified. DefaultPriority = 0 // DefaultPageLimit is the default number of items returned in paginated responses. DefaultPageLimit = 100 // MaxPageLimit is the maximum number of items that can be requested in a single page. MaxPageLimit = 1000 // MaxRequestIDLength is the maximum allowed length for request IDs (used for idempotency). MaxRequestIDLength = 128 // DefaultMetricsRetentionPeriod is how long to keep historical metrics data if applicable. DefaultMetricsRetentionPeriod = 1 * time.Hour // DefaultMetricsSampleRate is the rate at which detailed metrics are sampled if applicable. DefaultMetricsSampleRate = 0.1 // 10% sampling // ErrMsgInvalidLockID is the error message template for invalid lock IDs. ErrMsgInvalidLockID = "lock_id must be a non-empty string with length <= %d characters" // ErrMsgInvalidClientID is the error message template for invalid client IDs. 
ErrMsgInvalidClientID = "client_id must be a non-empty string with length <= %d characters" // ErrMsgInvalidTTL is the error message template for invalid TTL values. ErrMsgInvalidTTL = "ttl must be between %v and %v" // ErrMsgInvalidTimeout is the error message template for invalid timeout values. ErrMsgInvalidTimeout = "timeout must be between %v and %v" // ErrMsgInvalidPriority is the error message template for invalid priority values. ErrMsgInvalidPriority = "priority must be between %d and %d" // ErrMsgTooManyMetadata is the error message for too many metadata entries. ErrMsgTooManyMetadata = "metadata cannot have more than %d entries" // ErrMsgMetadataKeyTooLong is the error message for metadata keys that are too long. ErrMsgMetadataKeyTooLong = "metadata key length cannot exceed %d characters" // ErrMsgMetadataValueTooLong is the error message for metadata values that are too long. ErrMsgMetadataValueTooLong = "metadata value length cannot exceed %d characters" // DefaultProposalMaxPendingAge is the default max duration a proposal can remain pending before expiration. DefaultProposalMaxPendingAge = 5 * time.Minute // DefaultProposalCleanupInterval is the default interval for proposal tracker cleanup tasks. DefaultProposalCleanupInterval = 30 * time.Second )
const ( MethodAcquire = "Acquire" MethodRelease = "Release" MethodRenew = "Renew" MethodGetLockInfo = "GetLockInfo" MethodGetLocks = "GetLocks" MethodEnqueueWaiter = "EnqueueWaiter" MethodCancelWait = "CancelWait" MethodGetBackoffAdvice = "GetBackoffAdvice" MethodGetStatus = "GetStatus" MethodHealth = "Health" )
gRPC method names for metrics collection and logging
const ( QueueTypeGRPCRequests = "grpc_requests" QueueTypeRaftProposals = "raft_proposals" QueueTypeLockWaiters = "lock_waiters" )
Queue types for metrics and logging
const ( ErrorTypeMissingField = "missing_field" ErrorTypeInvalidFormat = "invalid_format" ErrorTypeOutOfRange = "out_of_range" ErrorTypeTooLong = "too_long" ErrorTypeInternalError = "internal_error" ErrorTypeTimeout = "timeout" ErrorTypeRateLimit = "rate_limit_exceeded" // More specific for when limit is hit )
Error types for metrics and logging (used with ServerMetrics.IncrValidationError/IncrServerError)
Variables ¶
var ( // ErrServerNotStarted indicates the server has not been started or is not yet ready. ErrServerNotStarted = errors.New("server: server not started or not ready") // ErrServerAlreadyStarted indicates an attempt to start an already running server. ErrServerAlreadyStarted = errors.New("server: server already started") // ErrServerStopped indicates the server has been stopped and cannot process requests. ErrServerStopped = errors.New("server: server stopped") // ErrNotLeader indicates this server is not the current Raft leader and cannot process the request. ErrNotLeader = errors.New("server: this node is not the Raft leader") // ErrNoLeader indicates that no leader is currently elected or known in the Raft cluster. ErrNoLeader = errors.New("server: no leader available in the cluster") // ErrRequestTooLarge indicates the client's request exceeds the configured maximum allowed size. ErrRequestTooLarge = errors.New("server: request too large") // ErrResponseTooLarge indicates an internally generated response exceeds the maximum allowed size. // This is typically an internal server issue. ErrResponseTooLarge = errors.New("server: response too large") // ErrRateLimited indicates the request was rejected due to rate limiting policies. ErrRateLimited = errors.New("server: request rate limited") // ErrShutdownTimeout indicates the server's graceful shutdown process timed out. ErrShutdownTimeout = errors.New("server: shutdown timed out") // ErrInvalidRequest indicates the request is malformed or contains invalid parameters // not caught by more specific validation errors. ErrInvalidRequest = errors.New("server: invalid request") ErrRaftUnavailable = errors.New("server: raft consensus system unavailable") ErrLockManagerUnavailable = errors.New("server: lock manager unavailable") )
Functions ¶
func ErrorToProtoError ¶
func ErrorToProtoError(err error) *pb.ErrorDetail
ErrorToProtoError converts Go errors to protobuf ErrorDetail messages. This function maps common server errors to appropriate ErrorCode values and provides structured error information for clients.
func ValidateAddress ¶
ValidateAddress checks if a server address is in valid host:port format.
Types ¶
type ConnectionInfo ¶
type ConnectionInfo struct {
RemoteAddr string // Client's remote address
ConnectedAt time.Time // Time the connection was established
LastActive time.Time // Last time a request was received
RequestCount int64 // Total number of requests from this connection
}
ConnectionInfo holds metadata about a gRPC client connection.
type ConnectionManager ¶
type ConnectionManager interface {
// Registers a new connection
OnConnect(remoteAddr string)
// Removes an existing connection
OnDisconnect(remoteAddr string)
// Updates activity for a connection
OnRequest(remoteAddr string)
// Returns the number of active connections
GetActiveConnections() int
// Returns a snapshot of all connections
GetAllConnectionInfo() map[string]ConnectionInfo
}
ConnectionManager manages gRPC client connections and their lifecycle.
func NewConnectionManager ¶
func NewConnectionManager( metrics ServerMetrics, logger logger.Logger, clock raft.Clock, ) ConnectionManager
NewConnectionManager returns a new ConnectionManager that uses the provided metrics, logger, and clock. It falls back to raft.StandardClock if no clock is given.
type LeaderRedirectError ¶
LeaderRedirectError contains information needed to redirect a client to the current leader. It is used when a write request is sent to a follower node.
func NewLeaderRedirectError ¶
func NewLeaderRedirectError(leaderAddress, leaderID string) *LeaderRedirectError
NewLeaderRedirectError creates a new LeaderRedirectError.
func (*LeaderRedirectError) Error ¶
func (e *LeaderRedirectError) Error() string
Error implements the error interface, providing a human-readable message.
type NoOpServerMetrics ¶
type NoOpServerMetrics struct{}
NoOpServerMetrics provides a no-operation implementation of ServerMetrics. All methods are empty and safe for concurrent use.
func (*NoOpServerMetrics) IncrClientError ¶
func (n *NoOpServerMetrics) IncrClientError(method string, errorCode pb.ErrorCode)
func (*NoOpServerMetrics) IncrConcurrentRequests ¶
func (n *NoOpServerMetrics) IncrConcurrentRequests(method string, delta int)
func (*NoOpServerMetrics) IncrGRPCRequest ¶
func (n *NoOpServerMetrics) IncrGRPCRequest(method string, success bool)
func (*NoOpServerMetrics) IncrHealthCheck ¶
func (n *NoOpServerMetrics) IncrHealthCheck(healthy bool)
func (*NoOpServerMetrics) IncrLeaderRedirect ¶
func (n *NoOpServerMetrics) IncrLeaderRedirect(method string)
func (*NoOpServerMetrics) IncrLockExpiration ¶
func (n *NoOpServerMetrics) IncrLockExpiration()
func (*NoOpServerMetrics) IncrQueueOverflow ¶
func (n *NoOpServerMetrics) IncrQueueOverflow(queueType string)
func (*NoOpServerMetrics) IncrRaftProposal ¶
func (n *NoOpServerMetrics) IncrRaftProposal(operation types.LockOperation, success bool)
func (*NoOpServerMetrics) IncrRetry ¶
func (n *NoOpServerMetrics) IncrRetry(method string)
func (*NoOpServerMetrics) IncrServerError ¶
func (n *NoOpServerMetrics) IncrServerError(method string, errorType string)
func (*NoOpServerMetrics) IncrValidationError ¶
func (n *NoOpServerMetrics) IncrValidationError(method string, errorType string)
func (*NoOpServerMetrics) ObserveQueueLength ¶
func (n *NoOpServerMetrics) ObserveQueueLength(queueType string, length int)
func (*NoOpServerMetrics) ObserveRaftProposalLatency ¶
func (n *NoOpServerMetrics) ObserveRaftProposalLatency( operation types.LockOperation, latency time.Duration, )
func (*NoOpServerMetrics) ObserveRequestLatency ¶
func (n *NoOpServerMetrics) ObserveRequestLatency(method string, latency time.Duration)
func (*NoOpServerMetrics) ObserveRequestSize ¶
func (n *NoOpServerMetrics) ObserveRequestSize(method string, sizeBytes int)
func (*NoOpServerMetrics) ObserveResponseSize ¶
func (n *NoOpServerMetrics) ObserveResponseSize(method string, sizeBytes int)
func (*NoOpServerMetrics) Reset ¶
func (n *NoOpServerMetrics) Reset()
func (*NoOpServerMetrics) SetActiveConnections ¶
func (n *NoOpServerMetrics) SetActiveConnections(count int)
func (*NoOpServerMetrics) SetRaftCommitIndex ¶
func (n *NoOpServerMetrics) SetRaftCommitIndex(index types.Index)
func (*NoOpServerMetrics) SetRaftTerm ¶
func (n *NoOpServerMetrics) SetRaftTerm(term types.Term)
func (*NoOpServerMetrics) SetServerState ¶
func (n *NoOpServerMetrics) SetServerState(isLeader bool, isHealthy bool)
type ProposalTracker ¶
type ProposalTracker interface {
// Track begins monitoring a new proposal.
// The proposal.ID must be unique.
// The proposal.ResultCh is used to send back the outcome and
// should be a buffered channel to prevent blocking the tracker.
Track(proposal *types.PendingProposal) error
// HandleAppliedCommand is called when a Raft log entry has been successfully
// applied to the state machine. It resolves the associated pending proposal.
HandleAppliedCommand(applyMsg types.ApplyMsg)
// HandleSnapshotApplied is invoked when a snapshot is applied to the state machine.
// This invalidates any pending proposals that fall within the snapshot's covered range.
HandleSnapshotApplied(snapshotIndex types.Index, snapshotTerm types.Term)
// ClientCancel is called when a client's context is canceled before the proposal
// is committed and applied. It cleans up the corresponding pending proposal and
// signals any waiting goroutines.
// Returns true if the proposal was found and canceled; false otherwise.
ClientCancel(proposalID types.ProposalID, reason error) bool
// GetPendingCount returns the total number of proposals currently pending
// Raft commitment and application.
GetPendingCount() int64
// GetStats returns aggregated metrics and statistics related to proposal tracking.
GetStats() types.ProposalStats
// GetPendingProposal retrieves a *copy* of the specified pending proposal, if it exists.
// Useful for observation and debugging; does not alter internal tracker state.
GetPendingProposal(proposalID types.ProposalID) (types.PendingProposal, bool)
// Cleanup performs manual removal of expired or stale pending proposals.
// Returns the number of proposals removed.
Cleanup() int
// Close gracefully shuts down the tracker, terminating background processes
// and failing any unresolved proposals.
// Returns an error if the shutdown process encounters issues.
Close() error
}
ProposalTracker defines the interface for managing Raft proposals, tracking their lifecycle, and resolving outcomes as they are applied or invalidated.
func NewProposalTracker ¶
func NewProposalTracker(logger logger.Logger, opts ...ProposalTrackerOption) ProposalTracker
NewProposalTracker creates a new ProposalTracker that uses the given logger and applies any supplied options.
type ProposalTrackerOption ¶
type ProposalTrackerOption func(*proposalTracker)
ProposalTrackerOption defines a functional option for configuring a proposalTracker instance.
func WithCleanupInterval ¶
func WithCleanupInterval(interval time.Duration) ProposalTrackerOption
WithCleanupInterval sets the frequency at which the background cleanup process runs. A non-positive interval disables the periodic cleanup goroutine.
func WithClock ¶
func WithClock(clock raft.Clock) ProposalTrackerOption
WithClock injects a custom clock implementation used for time-based operations such as expiration.
func WithMaxPendingAge ¶
func WithMaxPendingAge(age time.Duration) ProposalTrackerOption
WithMaxPendingAge configures the maximum allowed age for a proposal before it is considered expired and eligible for removal during cleanup. A non-positive value disables automatic expiration.
type RaftLockServer ¶
type RaftLockServer interface {
pb.RaftLockServer
// Start initializes and runs the gRPC server and all underlying components,
// including the Raft node and background workers.
//
// Returns an error if initialization fails (e.g., port conflict, Raft failure).
Start(ctx context.Context) error
// Stop gracefully shuts down the server and all components.
// The provided context can set a deadline for shutdown.
Stop(ctx context.Context) error
// IsLeader reports whether this node is currently the Raft leader.
// Only the leader can handle write operations.
IsLeader() bool
// GetLeaderAddress returns the address of the current Raft leader.
// Returns an empty string if the leader is unknown.
GetLeaderAddress() string
// GetNodeID returns the unique Raft node ID of this server.
GetNodeID() string
// Metrics returns current metrics for observability and monitoring.
Metrics() ServerMetrics
}
RaftLockServer defines the interface for a Raft-backed distributed lock service. It provides strong consistency guarantees by leveraging the Raft consensus algorithm.
The server handles client requests, validates them, redirects to the leader if needed, and interfaces with the Raft-based lock manager.
func NewRaftLockServer ¶
func NewRaftLockServer(config RaftLockServerConfig) (RaftLockServer, error)
NewRaftLockServer creates a new RaftLockServer instance using the provided configuration.
func RaftLockServerQuickBuild ¶
func RaftLockServerQuickBuild( nodeID types.NodeID, listenAddr string, peers map[types.NodeID]raft.PeerConfig, dataDir string, ) (RaftLockServer, error)
RaftLockServerQuickBuild is a convenience function for building a server with minimal configuration.
type RaftLockServerBuilder ¶
type RaftLockServerBuilder struct {
// contains filtered or unexported fields
}
RaftLockServerBuilder helps construct a RaftLockServer with validated configuration and sane defaults.
func NewRaftLockServerBuilder ¶
func NewRaftLockServerBuilder() *RaftLockServerBuilder
NewRaftLockServerBuilder returns a RaftLockServerBuilder preloaded with default configuration values.
func (*RaftLockServerBuilder) Build ¶
func (b *RaftLockServerBuilder) Build() (RaftLockServer, error)
Build constructs a RaftLockServer using the current builder state. Returns an error if required fields are missing or configuration is invalid.
func (*RaftLockServerBuilder) WithClientAPIAddress ¶
func (b *RaftLockServerBuilder) WithClientAPIAddress(address string) *RaftLockServerBuilder
WithClientAPIAddress sets the address for the client-facing API. If not set, a default address is used.
func (*RaftLockServerBuilder) WithDataDir ¶
func (b *RaftLockServerBuilder) WithDataDir(dataDir string) *RaftLockServerBuilder
WithDataDir sets the directory for storing persistent data. This must be set explicitly.
func (*RaftLockServerBuilder) WithHealthCheck ¶
func (b *RaftLockServerBuilder) WithHealthCheck( interval, timeout time.Duration, ) *RaftLockServerBuilder
WithHealthCheck sets the health check interval and timeout. Values <= 0 use the default values.
func (*RaftLockServerBuilder) WithLeaderRedirect ¶
func (b *RaftLockServerBuilder) WithLeaderRedirect(enabled bool) *RaftLockServerBuilder
WithLeaderRedirect enables or disables automatic leader redirection.
func (*RaftLockServerBuilder) WithLimits ¶
func (b *RaftLockServerBuilder) WithLimits( maxRequestSize, maxResponseSize, maxConcurrentReqs int, ) *RaftLockServerBuilder
WithLimits sets request size and concurrency limits. Values <= 0 leave the defaults unchanged.
func (*RaftLockServerBuilder) WithListenAddress ¶
func (b *RaftLockServerBuilder) WithListenAddress(address string) *RaftLockServerBuilder
WithListenAddress sets the gRPC server's listening address. If not set, a default address is used.
func (*RaftLockServerBuilder) WithLogger ¶
func (b *RaftLockServerBuilder) WithLogger(logger logger.Logger) *RaftLockServerBuilder
WithLogger sets the server logger. If nil, a no-op logger is used.
func (*RaftLockServerBuilder) WithMetrics ¶
func (b *RaftLockServerBuilder) WithMetrics(metrics ServerMetrics) *RaftLockServerBuilder
WithMetrics sets the metrics collector. If nil, a no-op implementation is used.
func (*RaftLockServerBuilder) WithNodeID ¶
func (b *RaftLockServerBuilder) WithNodeID(nodeID types.NodeID) *RaftLockServerBuilder
WithNodeID sets the Raft node's unique identifier. This must be set explicitly.
func (*RaftLockServerBuilder) WithPeers ¶
func (b *RaftLockServerBuilder) WithPeers( peers map[types.NodeID]raft.PeerConfig, ) *RaftLockServerBuilder
WithPeers sets the Raft cluster configuration. The map must include an entry for this node's NodeID. This must be set explicitly.
func (*RaftLockServerBuilder) WithRaftConfig ¶
func (b *RaftLockServerBuilder) WithRaftConfig(raftConfig raft.Config) *RaftLockServerBuilder
WithRaftConfig overrides the default Raft configuration. RaftConfig.ID and RaftConfig.Peers should match NodeID and Peers in RaftLockServerConfig.
func (*RaftLockServerBuilder) WithRateLimit ¶
func (b *RaftLockServerBuilder) WithRateLimit( enabled bool, rateLimit, burst int, window time.Duration, ) *RaftLockServerBuilder
WithRateLimit configures rate limiting. When rate limiting is enabled, values <= 0 fall back to the defaults.
func (*RaftLockServerBuilder) WithSerializer ¶
func (b *RaftLockServerBuilder) WithSerializer(serializer lock.Serializer) *RaftLockServerBuilder
WithSerializer sets the lock.Serializer used for encoding/decoding lock commands. If nil, the default JSON serializer is used.
func (*RaftLockServerBuilder) WithTimeouts ¶
func (b *RaftLockServerBuilder) WithTimeouts( requestTimeout, shutdownTimeout, redirectTimeout time.Duration, ) *RaftLockServerBuilder
WithTimeouts sets timeouts for request handling, shutdown, and leader redirection. Values <= 0 leave the defaults unchanged.
type RaftLockServerConfig ¶
type RaftLockServerConfig struct {
// NodeID uniquely identifies this node in the Raft cluster.
NodeID types.NodeID
// ListenAddress is the gRPC server's bind address (e.g., "0.0.0.0:8080").
ListenAddress string
// ClientAPIAddress is the network address exposed to external clients
// for accessing the public API (e.g., "127.0.0.1:9090").
ClientAPIAddress string
// Peers maps all known Raft nodes by NodeID, including this node itself.
Peers map[types.NodeID]raft.PeerConfig
// RaftConfig holds the Raft protocol-specific configuration.
RaftConfig raft.Config
// DataDir is the path to store Raft state (logs, snapshots, etc.).
DataDir string
RequestTimeout time.Duration // Max time to handle a client request
ShutdownTimeout time.Duration // Max time allowed for graceful shutdown
MaxRequestSize int // Maximum size of incoming requests (in bytes)
MaxResponseSize int // Maximum size of outgoing responses (in bytes)
MaxConcurrentReqs int // Max number of requests processed in parallel
EnableRateLimit bool // Whether rate limiting is enforced
RateLimit int // Requests per second allowed per client
RateLimitBurst int // Burst capacity for client requests
RateLimitWindow time.Duration // Time window used for rate calculation
Logger logger.Logger
Metrics ServerMetrics
Clock raft.Clock
Serializer lock.Serializer
HealthCheckInterval time.Duration // Frequency of internal health checks
HealthCheckTimeout time.Duration // Timeout for individual health checks
EnableLeaderRedirect bool // Redirect write requests to the leader when this node is not the leader
RedirectTimeout time.Duration // Timeout when attempting leader redirection
}
RaftLockServerConfig holds the configuration settings for a RaftLock server instance.
func DefaultRaftLockServerConfig ¶
func DefaultRaftLockServerConfig() RaftLockServerConfig
DefaultRaftLockServerConfig returns a RaftLockServerConfig pre-populated with safe defaults. Callers must explicitly set NodeID, Peers, DataDir, and RaftConfig.ID.
func (*RaftLockServerConfig) Validate ¶
func (c *RaftLockServerConfig) Validate() error
Validate checks if the server configuration is valid.
type RaftLockServerConfigError ¶
type RaftLockServerConfigError struct {
Message string
}
RaftLockServerConfigError represents a validation error in RaftLockServerConfig.
func NewRaftLockServerConfigError ¶
func NewRaftLockServerConfigError(msg string) *RaftLockServerConfigError
NewRaftLockServerConfigError returns a new RaftLockServerConfigError instance.
func (*RaftLockServerConfigError) Error ¶
func (e *RaftLockServerConfigError) Error() string
Error implements the error interface.
type RateLimiter ¶
RateLimiter defines the interface for request rate limiting.
type RequestValidator ¶
type RequestValidator interface {
// ValidateAcquireRequest validates an AcquireRequest.
ValidateAcquireRequest(req *pb.AcquireRequest) error
// ValidateReleaseRequest validates a ReleaseRequest.
ValidateReleaseRequest(req *pb.ReleaseRequest) error
// ValidateRenewRequest validates a RenewRequest.
ValidateRenewRequest(req *pb.RenewRequest) error
// ValidateGetLockInfoRequest validates a GetLockInfoRequest.
ValidateGetLockInfoRequest(req *pb.GetLockInfoRequest) error
// ValidateGetLocksRequest validates a GetLocksRequest.
ValidateGetLocksRequest(req *pb.GetLocksRequest) error
// ValidateEnqueueWaiterRequest validates an EnqueueWaiterRequest.
ValidateEnqueueWaiterRequest(req *pb.EnqueueWaiterRequest) error
// ValidateCancelWaitRequest validates a CancelWaitRequest.
ValidateCancelWaitRequest(req *pb.CancelWaitRequest) error
// ValidateBackoffAdviceRequest validates a BackoffAdviceRequest.
ValidateBackoffAdviceRequest(req *pb.BackoffAdviceRequest) error
// ValidateGetStatusRequest validates a GetStatusRequest.
ValidateGetStatusRequest(req *pb.GetStatusRequest) error
// ValidateHealthRequest validates a HealthRequest.
ValidateHealthRequest(req *pb.HealthRequest) error
}
RequestValidator defines the interface for validating incoming gRPC requests to the RaftLock server. Each method should return an error if the request is invalid.
func NewRequestValidator ¶
func NewRequestValidator(logger logger.Logger) RequestValidator
NewRequestValidator creates a new default request validator.
type ServerError ¶
type ServerError struct {
Operation string // The operation being performed when the error occurred.
Cause error // The underlying error, if any.
Message string // A high-level message describing the server error.
}
ServerError represents a generic internal server error, potentially wrapping an underlying cause.
func NewServerError ¶
func NewServerError(operation string, cause error, message string) *ServerError
NewServerError creates a new ServerError.
func (*ServerError) Error ¶
func (e *ServerError) Error() string
Error implements the error interface, providing context about the operation and cause.
func (*ServerError) Unwrap ¶
func (e *ServerError) Unwrap() error
Unwrap provides compatibility with Go's errors.Is and errors.As by returning the cause.
type ServerMetrics ¶
// ServerMetrics defines observability hooks for RaftLock server operations.
// Metrics cover gRPC lifecycle, Raft proposals, validation, error types, and
// system health. All methods must be safe for concurrent use.
type ServerMetrics interface {
// IncrGRPCRequest increments the count for an RPC method invocation.
// 'method' should match a RaftLock RPC (e.g., "Acquire", "Release").
// 'success' should reflect overall success from the client's perspective.
IncrGRPCRequest(method string, success bool)
// IncrLeaderRedirect increments the count of requests redirected to the current leader.
// Typically applies to write operations when the node is not the Raft leader.
IncrLeaderRedirect(method string)
// IncrRetry increments a counter when a client retries a failed or redirected request.
// 'method' is the RPC being retried.
IncrRetry(method string)
// IncrRaftProposal increments proposal submission counters.
// 'operation' should reflect the type of lock operation (Acquire, Release, Renew).
// 'success' indicates whether the proposal was committed by Raft.
IncrRaftProposal(operation types.LockOperation, success bool)
// IncrValidationError increments validation failure counters.
// 'method' is the RPC where validation failed.
// 'errorType' is a string like "invalid_ttl", "missing_field", or "bad_lock_id".
IncrValidationError(method string, errorType string)
// IncrClientError increments counts for errors caused by invalid client input or actions.
// This typically maps to RaftLock ErrorCodes in the 100–199 or 300–399 range.
IncrClientError(method string, errorCode pb.ErrorCode)
// IncrServerError increments counts for internal server-side errors.
// 'errorType' might include values like "internal_error", "raft_unavailable", "timeout".
IncrServerError(method string, errorType string)
// IncrQueueOverflow increments a counter when a queue exceeds its configured capacity.
// 'queueType' names the queue that overflowed.
IncrQueueOverflow(queueType string)
// IncrLockExpiration increments a counter when a lock expires due to TTL.
IncrLockExpiration()
// ObserveRequestLatency records end-to-end latency for a gRPC method call.
// This includes validation, Raft proposal (if applicable), and response formatting.
ObserveRequestLatency(method string, latency time.Duration)
// ObserveRaftProposalLatency records the latency from submitting a Raft proposal
// to it being either committed or rejected.
ObserveRaftProposalLatency(operation types.LockOperation, latency time.Duration)
// ObserveQueueLength tracks the current size of internal queues (e.g., gRPC, Raft).
// Typical queue types: "grpc_requests", "raft_proposals", "wait_queue".
ObserveQueueLength(queueType string, length int)
// ObserveRequestSize records the raw size of incoming gRPC requests in bytes.
ObserveRequestSize(method string, sizeBytes int)
// ObserveResponseSize records the raw size of outgoing gRPC responses in bytes.
ObserveResponseSize(method string, sizeBytes int)
// IncrConcurrentRequests adjusts the count of concurrently active requests.
// Despite the Incr prefix, 'delta' is signed: use +1 at request start and
// -1 when the request completes.
IncrConcurrentRequests(method string, delta int)
// IncrHealthCheck increments the count of health check invocations.
// 'healthy' reflects the result of the check.
IncrHealthCheck(healthy bool)
// SetServerState sets leadership and health state gauges.
// 'isLeader' reflects current Raft role; 'isHealthy' reflects readiness for requests.
SetServerState(isLeader bool, isHealthy bool)
// SetActiveConnections sets the number of live gRPC connections to this server.
SetActiveConnections(count int)
// SetRaftTerm sets the current Raft term for this node, useful for tracking elections.
SetRaftTerm(term types.Term)
// SetRaftCommitIndex sets the commit index last acknowledged by this Raft node.
SetRaftCommitIndex(index types.Index)
// Reset clears all metric counters and resets gauges.
// Useful primarily in unit or integration tests.
Reset()
}
ServerMetrics defines observability hooks for RaftLock server operations. Metrics cover gRPC lifecycle, Raft proposals, validation, error types, and system health. All methods must be safe for concurrent use.
func NewNoOpServerMetrics ¶
func NewNoOpServerMetrics() ServerMetrics
NewNoOpServerMetrics creates a new no-operation metrics implementation.
type ServerOperationalState ¶
type ServerOperationalState string
ServerOperationalState defines the possible operational states of the server.
const ( // ServerStateStarting indicates the server is in the process of starting up. ServerStateStarting ServerOperationalState = "starting" // ServerStateRunning indicates the server is running and accepting requests. ServerStateRunning ServerOperationalState = "running" // ServerStateStopping indicates the server is in the process of shutting down. ServerStateStopping ServerOperationalState = "stopping" // ServerStateStopped indicates the server has been stopped. ServerStateStopped ServerOperationalState = "stopped" )
type TokenBucketRateLimiter ¶
type TokenBucketRateLimiter struct {
// contains filtered or unexported fields
}
TokenBucketRateLimiter implements rate limiting using a token bucket algorithm.
func NewTokenBucketRateLimiter ¶
func NewTokenBucketRateLimiter( maxRequests, burst int, window time.Duration, logger logger.Logger, ) *TokenBucketRateLimiter
NewTokenBucketRateLimiter creates a new token bucket rate limiter.
func (*TokenBucketRateLimiter) Allow ¶
func (rl *TokenBucketRateLimiter) Allow() bool
Allow returns true if a request can proceed immediately.
type ValidationError ¶
// ValidationError represents a request validation error with details about the
// specific field that failed.
type ValidationError struct {
Field string // The name of the field that failed validation.
Value any // The value of that field that caused the error; may be of any type.
Message string // A descriptive message explaining the validation failure.
}
ValidationError represents a request validation error with details about the specific field.
func NewValidationError ¶
func NewValidationError(field string, value any, message string) *ValidationError
NewValidationError creates a new ValidationError.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
Error implements the error interface, providing a structured validation error message.