Documentation ¶
Overview ¶
Package resilience provides cross-process state management for long-running CLI operations. It enables resumable operations by persisting state to disk, with file locking for safe concurrent access, so that the circuit breaker, rate limiter, and bulkhead can coordinate across separate CLI invocations.
Index ¶
- Constants
- type Bulkhead
- type BulkheadConfig
- type BulkheadState
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type Config
- type GatingHooks
- func (h *GatingHooks) OnOperationEnd(ctx context.Context, op basecamp.OperationInfo, err error, ...)
- func (h *GatingHooks) OnOperationGate(ctx context.Context, op basecamp.OperationInfo) (context.Context, error)
- func (h *GatingHooks) OnOperationStart(ctx context.Context, op basecamp.OperationInfo) context.Context
- func (h *GatingHooks) OnRequestEnd(ctx context.Context, info basecamp.RequestInfo, result basecamp.RequestResult)
- func (h *GatingHooks) OnRequestStart(ctx context.Context, info basecamp.RequestInfo) context.Context
- func (h *GatingHooks) OnRetry(ctx context.Context, info basecamp.RequestInfo, attempt int, err error)
- type RateLimiter
- func (rl *RateLimiter) Allow() (bool, error)
- func (rl *RateLimiter) Reset() error
- func (rl *RateLimiter) RetryAfterRemaining() (time.Duration, error)
- func (rl *RateLimiter) SetRetryAfter(until time.Time) error
- func (rl *RateLimiter) SetRetryAfterDuration(d time.Duration) error
- func (rl *RateLimiter) Tokens() (float64, error)
- type RateLimiterConfig
- type RateLimiterState
- type State
- type Store
Constants ¶
const (
CircuitClosed = "closed"
CircuitOpen = "open"
CircuitHalfOpen = "half_open"
)
Circuit breaker state constants.
const (
// StateFileName is the default state file name.
StateFileName = "state.json"
// DefaultDirName is the subdirectory within the cache dir.
DefaultDirName = "resilience"
)
const LockTimeout = 100 * time.Millisecond
LockTimeout is the maximum time to wait for acquiring the file lock. If exceeded, operations proceed without locking (fail-open) to avoid CLI hangs.
const (
// StateVersion is the current state schema version.
StateVersion = 1
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bulkhead ¶
type Bulkhead struct {
// contains filtered or unexported fields
}
Bulkhead implements the bulkhead pattern with cross-process persistence. It uses PID-based tracking to limit concurrent operations across CLI invocations.
func NewBulkhead ¶
func NewBulkhead(store *Store, config BulkheadConfig) *Bulkhead
NewBulkhead creates a new bulkhead with the given config.
func (*Bulkhead) Acquire ¶
Acquire tries to acquire a slot in the bulkhead. Returns true if the slot was acquired, false if the bulkhead is full.
func (*Bulkhead) Available ¶
Available returns the number of available slots, clamped to [0, MaxConcurrent] even if Count exceeds MaxConcurrent (which can happen under fail-open or after a config change).
func (*Bulkhead) ForceCleanup ¶
ForceCleanup removes stale slots (PIDs of processes that are no longer running) without acquiring a slot.
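The slot accounting described above can be sketched as follows. This is a minimal, in-memory illustration under stated assumptions: the `bulkhead` type, its `isAlive` liveness predicate, and the rule that a PID re-acquiring its own slot succeeds are all hypothetical, and the real Bulkhead persists its PID list through Store rather than holding it in memory.

```go
package main

import "fmt"

// bulkhead is a hypothetical in-memory stand-in for the persisted PID list.
type bulkhead struct {
	maxConcurrent int
	pids          []int
	isAlive       func(pid int) bool // assumed liveness check
}

// cleanup drops PIDs whose processes are no longer running (cf. ForceCleanup).
func (b *bulkhead) cleanup() {
	live := b.pids[:0]
	for _, pid := range b.pids {
		if b.isAlive(pid) {
			live = append(live, pid)
		}
	}
	b.pids = live
}

// acquire reclaims stale slots, then reserves one for pid if any is free.
func (b *bulkhead) acquire(pid int) bool {
	b.cleanup()
	for _, p := range b.pids {
		if p == pid {
			return true // assumption: the caller already holds a slot
		}
	}
	if len(b.pids) >= b.maxConcurrent {
		return false
	}
	b.pids = append(b.pids, pid)
	return true
}

// available never goes negative, even if more PIDs than slots were recorded.
func (b *bulkhead) available() int {
	if n := b.maxConcurrent - len(b.pids); n > 0 {
		return n
	}
	return 0
}

func main() {
	dead := map[int]bool{101: true}
	b := &bulkhead{
		maxConcurrent: 2,
		pids:          []int{100, 101},
		isAlive:       func(pid int) bool { return !dead[pid] },
	}
	fmt.Println(b.acquire(200)) // true: PID 101 is dead, so its slot is reclaimed
	fmt.Println(b.acquire(300)) // false: PIDs 100 and 200 hold both slots
	fmt.Println(b.available())  // 0
}
```

Reclaiming dead PIDs inside acquire is what keeps a crashed CLI process from holding a slot forever.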
type BulkheadConfig ¶
type BulkheadConfig struct {
// MaxConcurrent is the maximum number of concurrent requests across all processes.
// Default: 10
MaxConcurrent int
}
BulkheadConfig configures the bulkhead pattern for concurrent request limiting.
func (BulkheadConfig) WithMaxConcurrent ¶
func (bh BulkheadConfig) WithMaxConcurrent(n int) BulkheadConfig
WithMaxConcurrent sets the maximum concurrent requests for the bulkhead.
type BulkheadState ¶
type BulkheadState struct {
// ActivePIDs is the list of process IDs currently holding permits.
// Stale/dead PIDs are cleaned up by bulkhead operations, not when the state is loaded.
ActivePIDs []int `json:"active_pids"`
}
BulkheadState tracks concurrent request limiting across processes. The bulkhead pattern limits concurrent requests to prevent resource exhaustion.
func (*BulkheadState) AddPID ¶
func (b *BulkheadState) AddPID(pid int)
AddPID adds a PID to the active list if not already present.
func (*BulkheadState) Count ¶
func (b *BulkheadState) Count() int
Count returns the number of active permits.
func (*BulkheadState) HasPID ¶
func (b *BulkheadState) HasPID(pid int) bool
HasPID returns true if the given PID holds a permit.
func (*BulkheadState) RemovePID ¶
func (b *BulkheadState) RemovePID(pid int)
RemovePID removes a PID from the active list.
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern with cross-process persistence.
func NewCircuitBreaker ¶
func NewCircuitBreaker(store *Store, config CircuitBreakerConfig) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker with the given config.
func (*CircuitBreaker) Allow ¶
func (cb *CircuitBreaker) Allow() (bool, error)
Allow checks if a request should be allowed. Returns true if the request can proceed, false if it should be rejected. In the half-open state, it atomically reserves an attempt slot to prevent a thundering herd.
Optimization: in the closed state Allow only reads the state (no disk write); in the open and half-open states it may write.
Stale attempt cleanup: if half-open attempts are stuck at the maximum (for example, because a crashed process never called RecordSuccess or RecordFailure), they are automatically reset after StaleAttemptTimeout. This prevents a process crash from blocking the circuit permanently.
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure() error
RecordFailure records a failed request.
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess() error
RecordSuccess records a successful request.
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset() error
Reset resets the circuit breaker to the closed state.
func (*CircuitBreaker) State ¶
func (cb *CircuitBreaker) State() (string, error)
State returns the current circuit breaker state.
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
// FailureThreshold is the number of consecutive failures before opening.
// Default: 5
FailureThreshold int
// SuccessThreshold is the number of consecutive successes in half-open
// state before closing the circuit.
// Default: 2
SuccessThreshold int
// OpenTimeout is how long to wait before transitioning from open to half-open.
// Default: 30 seconds
OpenTimeout time.Duration
// HalfOpenMaxRequests is the max concurrent requests allowed in half-open state.
// Default: 1
HalfOpenMaxRequests int
// StaleAttemptTimeout is how long to wait before considering half-open attempts
// as stale (from crashed processes). This should be longer than the expected
// duration of slow/large operations to avoid resetting legitimate in-flight requests.
// Default: 2 minutes (4x OpenTimeout)
StaleAttemptTimeout time.Duration
}
CircuitBreakerConfig configures the circuit breaker pattern.
func (CircuitBreakerConfig) WithFailureThreshold ¶
func (cb CircuitBreakerConfig) WithFailureThreshold(n int) CircuitBreakerConfig
WithFailureThreshold sets the failure threshold for the circuit breaker.
func (CircuitBreakerConfig) WithOpenTimeout ¶
func (cb CircuitBreakerConfig) WithOpenTimeout(d time.Duration) CircuitBreakerConfig
WithOpenTimeout sets the open timeout for the circuit breaker.
func (CircuitBreakerConfig) WithSuccessThreshold ¶
func (cb CircuitBreakerConfig) WithSuccessThreshold(n int) CircuitBreakerConfig
WithSuccessThreshold sets the success threshold for the circuit breaker.
type CircuitBreakerState ¶
type CircuitBreakerState struct {
// State is the current circuit state: "closed", "open", or "half_open".
// - closed: normal operation, requests flow through
// - open: circuit tripped, requests fail fast
// - half_open: testing if service recovered, limited requests allowed
State string `json:"state"`
// Failures is the count of consecutive failures in the current window.
Failures int `json:"failures"`
// Successes is the count of consecutive successes (used in half_open state).
Successes int `json:"successes"`
// HalfOpenAttempts tracks in-flight requests during half-open state.
// Incremented atomically when a request is allowed, decremented on completion.
// Used to enforce HalfOpenMaxRequests limit across concurrent processes.
HalfOpenAttempts int `json:"half_open_attempts,omitempty"`
// HalfOpenLastAttemptAt is when the last half-open attempt was reserved.
// Used to detect stale attempts from crashed processes that never completed.
HalfOpenLastAttemptAt time.Time `json:"half_open_last_attempt_at"`
// LastFailureAt is when the most recent failure occurred.
LastFailureAt time.Time `json:"last_failure_at"`
// OpenedAt is when the circuit transitioned to open state.
OpenedAt time.Time `json:"opened_at"`
}
CircuitBreakerState tracks the circuit breaker pattern state. The circuit breaker prevents cascading failures by stopping requests once consecutive failures reach a threshold (FailureThreshold).
func (*CircuitBreakerState) IsClosed ¶
func (c *CircuitBreakerState) IsClosed() bool
IsClosed returns true if the circuit is in closed (normal) state.
func (*CircuitBreakerState) IsHalfOpen ¶
func (c *CircuitBreakerState) IsHalfOpen() bool
IsHalfOpen returns true if the circuit is in half-open (testing) state.
func (*CircuitBreakerState) IsOpen ¶
func (c *CircuitBreakerState) IsOpen() bool
IsOpen returns true if the circuit is open (failing fast).
type Config ¶
type Config struct {
// CircuitBreaker configures the circuit breaker pattern.
CircuitBreaker CircuitBreakerConfig
// RateLimiter configures the token bucket rate limiter.
RateLimiter RateLimiterConfig
// Bulkhead configures concurrent request limiting.
Bulkhead BulkheadConfig
}
Config holds configuration for all resilience primitives.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with sensible defaults for the Basecamp API.
func (*Config) WithBulkhead ¶
func (c *Config) WithBulkhead(bh BulkheadConfig) *Config
WithBulkhead returns a copy of the config with custom bulkhead settings.
func (*Config) WithCircuitBreaker ¶
func (c *Config) WithCircuitBreaker(cb CircuitBreakerConfig) *Config
WithCircuitBreaker returns a copy of the config with custom circuit breaker settings.
func (*Config) WithRateLimiter ¶
func (c *Config) WithRateLimiter(rl RateLimiterConfig) *Config
WithRateLimiter returns a copy of the config with custom rate limiter settings.
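The two builder styles differ: the With* methods on the sub-configs use value receivers, while the Config methods take a pointer and return a copy. The sketch below re-declares trimmed versions of the documented types so it compiles on its own; `defaultConfig` fills in the per-field defaults from the field comments, and whether DefaultConfig returns exactly these values is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed local copies of the documented config types.
type CircuitBreakerConfig struct {
	FailureThreshold    int
	SuccessThreshold    int
	OpenTimeout         time.Duration
	HalfOpenMaxRequests int
	StaleAttemptTimeout time.Duration
}

// Value receiver: cb is already a copy, so the caller's value is untouched.
func (cb CircuitBreakerConfig) WithFailureThreshold(n int) CircuitBreakerConfig {
	cb.FailureThreshold = n
	return cb
}

type RateLimiterConfig struct {
	MaxTokens, RefillRate, TokensPerRequest float64
}

type BulkheadConfig struct {
	MaxConcurrent int
}

type Config struct {
	CircuitBreaker CircuitBreakerConfig
	RateLimiter    RateLimiterConfig
	Bulkhead       BulkheadConfig
}

// defaultConfig uses the per-field defaults from the field comments.
func defaultConfig() *Config {
	return &Config{
		CircuitBreaker: CircuitBreakerConfig{
			FailureThreshold:    5,
			SuccessThreshold:    2,
			OpenTimeout:         30 * time.Second,
			HalfOpenMaxRequests: 1,
			StaleAttemptTimeout: 2 * time.Minute,
		},
		RateLimiter: RateLimiterConfig{MaxTokens: 50, RefillRate: 10, TokensPerRequest: 1},
		Bulkhead:    BulkheadConfig{MaxConcurrent: 10},
	}
}

// Pointer receiver that copies first. A shallow copy suffices because
// Config holds only value types (no slices, maps, or pointers).
func (c *Config) WithBulkhead(bh BulkheadConfig) *Config {
	cp := *c
	cp.Bulkhead = bh
	return &cp
}

func main() {
	base := defaultConfig()
	custom := base.WithBulkhead(BulkheadConfig{MaxConcurrent: 4})
	custom.CircuitBreaker = custom.CircuitBreaker.WithFailureThreshold(3)
	fmt.Println(base.Bulkhead.MaxConcurrent, custom.Bulkhead.MaxConcurrent)                   // 10 4
	fmt.Println(base.CircuitBreaker.FailureThreshold, custom.CircuitBreaker.FailureThreshold) // 5 3
}
```

Because neither style mutates its receiver, a shared default can be customized per command without affecting other callers.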
type GatingHooks ¶
type GatingHooks struct {
// contains filtered or unexported fields
}
GatingHooks implements basecamp.GatingHooks to provide resilience patterns for SDK operations. It gates requests through circuit breaker, rate limiter, and bulkhead before they execute.
func NewGatingHooks ¶
func NewGatingHooks(cb *CircuitBreaker, rl *RateLimiter, bh *Bulkhead) *GatingHooks
NewGatingHooks creates a new GatingHooks with the given primitives.
func NewGatingHooksFromConfig ¶
func NewGatingHooksFromConfig(store *Store, cfg *Config) *GatingHooks
NewGatingHooksFromConfig creates a GatingHooks using the provided config and store.
func (*GatingHooks) OnOperationEnd ¶
func (h *GatingHooks) OnOperationEnd(ctx context.Context, op basecamp.OperationInfo, err error, duration time.Duration)
OnOperationEnd is called when a semantic SDK operation completes. It releases the bulkhead slot and records success or failure with the circuit breaker.
func (*GatingHooks) OnOperationGate ¶
func (h *GatingHooks) OnOperationGate(ctx context.Context, op basecamp.OperationInfo) (context.Context, error)
OnOperationGate is called before OnOperationStart. It checks the rate limiter, the bulkhead, and the circuit breaker, in that order, before allowing the operation to proceed.
Gate order is important: the rate limiter and bulkhead are checked BEFORE the circuit breaker because the circuit breaker atomically reserves a half-open slot. If the circuit breaker were checked first and the rate limiter then rejected the request, the reserved half-open slot would leak (it would never be released).
Tradeoff: this ordering means rate limiter tokens are consumed even if the bulkhead is full or the circuit is open. This is acceptable for a CLI tool, where occasional token waste is preferable to leaked half-open slots.
Returns a context that should be used for the operation and an error if the operation should be rejected.
func (*GatingHooks) OnOperationStart ¶
func (h *GatingHooks) OnOperationStart(ctx context.Context, op basecamp.OperationInfo) context.Context
OnOperationStart is called when a semantic SDK operation begins.
func (*GatingHooks) OnRequestEnd ¶
func (h *GatingHooks) OnRequestEnd(ctx context.Context, info basecamp.RequestInfo, result basecamp.RequestResult)
OnRequestEnd is called after an HTTP request completes. It honors Retry-After headers from 429/503 responses to back off the rate limiter.
func (*GatingHooks) OnRequestStart ¶
func (h *GatingHooks) OnRequestStart(ctx context.Context, info basecamp.RequestInfo) context.Context
OnRequestStart is called before an HTTP request is sent.
func (*GatingHooks) OnRetry ¶
func (h *GatingHooks) OnRetry(ctx context.Context, info basecamp.RequestInfo, attempt int, err error)
OnRetry is called before a retry attempt.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements the token bucket algorithm with cross-process persistence.
func NewRateLimiter ¶
func NewRateLimiter(store *Store, config RateLimiterConfig) *RateLimiter
NewRateLimiter creates a new rate limiter with the given config.
func (*RateLimiter) Allow ¶
func (rl *RateLimiter) Allow() (bool, error)
Allow checks if a request is allowed. Returns true if the request can proceed, false if it should be rejected. On success, it consumes TokensPerRequest tokens from the bucket.
func (*RateLimiter) Reset ¶
func (rl *RateLimiter) Reset() error
Reset resets the rate limiter to a full bucket.
func (*RateLimiter) RetryAfterRemaining ¶
func (rl *RateLimiter) RetryAfterRemaining() (time.Duration, error)
RetryAfterRemaining returns the remaining duration of the Retry-After block, or 0 if there is no active block.
func (*RateLimiter) SetRetryAfter ¶
func (rl *RateLimiter) SetRetryAfter(until time.Time) error
SetRetryAfter blocks requests until the given time, typically after a 429 (Too Many Requests) response.
func (*RateLimiter) SetRetryAfterDuration ¶
func (rl *RateLimiter) SetRetryAfterDuration(d time.Duration) error
SetRetryAfterDuration blocks requests for the given duration, starting from now.
func (*RateLimiter) Tokens ¶
func (rl *RateLimiter) Tokens() (float64, error)
Tokens returns the current number of available tokens. This also persists any initialization or refill that occurs.
type RateLimiterConfig ¶
type RateLimiterConfig struct {
// MaxTokens is the maximum number of tokens in the bucket.
// Default: 50
MaxTokens float64
// RefillRate is how many tokens are added per second.
// Default: 10
RefillRate float64
// TokensPerRequest is how many tokens each request consumes.
// Default: 1
TokensPerRequest float64
}
RateLimiterConfig configures the token bucket rate limiter.
func (RateLimiterConfig) WithMaxTokens ¶
func (rl RateLimiterConfig) WithMaxTokens(n float64) RateLimiterConfig
WithMaxTokens sets the maximum tokens for the rate limiter.
func (RateLimiterConfig) WithRefillRate ¶
func (rl RateLimiterConfig) WithRefillRate(n float64) RateLimiterConfig
WithRefillRate sets the refill rate for the rate limiter.
type RateLimiterState ¶
type RateLimiterState struct {
// Tokens is the current number of available tokens.
Tokens float64 `json:"tokens"`
// LastRefillAt is when tokens were last refilled.
LastRefillAt time.Time `json:"last_refill_at"`
// RetryAfterUntil is set when we receive a 429 with Retry-After header.
// No requests should be made until this time passes.
RetryAfterUntil time.Time `json:"retry_after_until"`
}
RateLimiterState tracks the token bucket rate limiter state. Uses a token bucket algorithm that refills over time.
func (*RateLimiterState) BlockedFor ¶
func (r *RateLimiterState) BlockedFor() time.Duration
BlockedFor returns how long until the Retry-After window expires. Returns zero if not blocked.
func (*RateLimiterState) IsBlocked ¶
func (r *RateLimiterState) IsBlocked() bool
IsBlocked returns true if we're within a Retry-After window.
type State ¶
type State struct {
// Version is the schema version for future migrations.
Version int `json:"version"`
// CircuitBreaker tracks the circuit breaker state.
CircuitBreaker CircuitBreakerState `json:"circuit_breaker"`
// RateLimiter tracks the token bucket state.
RateLimiter RateLimiterState `json:"rate_limiter"`
// Bulkhead tracks concurrent request limiting.
Bulkhead BulkheadState `json:"bulkhead"`
// UpdatedAt is when the state was last modified.
UpdatedAt time.Time `json:"updated_at"`
}
State represents the persisted resilience state shared across CLI processes. This enables circuit breaker, rate limiter, and bulkhead patterns to coordinate across concurrent basecamp invocations.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store handles reading and writing resilience state with file locking. It provides atomic operations safe for concurrent access across processes.
func NewStore ¶
NewStore creates a new resilience state store. If dir is empty, it uses the default location (~/.cache/basecamp/resilience/).
func (*Store) Clear ¶
Clear removes the state file. If the lock cannot be acquired, proceeds without locking (fail-open).
func (*Store) Load ¶
Load reads the state from disk with proper locking. Returns an empty state if the file doesn't exist. If the lock cannot be acquired, proceeds without locking (fail-open).
func (*Store) Save ¶
Save writes the state to disk atomically with proper locking. If the lock cannot be acquired, proceeds without locking (fail-open).
func (*Store) Update ¶
Update atomically loads, modifies, and saves the state. The updateFn receives the current state and should modify it in place. This is the preferred way to update state as it holds the lock throughout the entire read-modify-write cycle. If the lock cannot be acquired, proceeds without locking (fail-open).