resilience

package resilience, v0.2.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: Mar 1, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package resilience provides cross-process state management for long-running CLI operations. It enables resumable operations by persisting state to disk with proper file locking for safe concurrent access.

Index

Constants

const (
	CircuitClosed   = "closed"
	CircuitOpen     = "open"
	CircuitHalfOpen = "half_open"
)

Circuit breaker state constants.

const (
	// StateFileName is the default state file name.
	StateFileName = "state.json"

	// DefaultDirName is the subdirectory within the cache dir.
	DefaultDirName = "resilience"
)

const LockTimeout = 100 * time.Millisecond

LockTimeout is the maximum time to wait for acquiring the file lock. If exceeded, operations proceed without locking (fail-open) to avoid CLI hangs.

const (
	// StateVersion is the current state schema version.
	StateVersion = 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Bulkhead

type Bulkhead struct {
	// contains filtered or unexported fields
}

Bulkhead implements the bulkhead pattern with cross-process persistence. It uses PID-based tracking to limit concurrent operations across CLI invocations.

func NewBulkhead

func NewBulkhead(store *Store, config BulkheadConfig) *Bulkhead

NewBulkhead creates a new bulkhead with the given config.

func (*Bulkhead) Acquire

func (b *Bulkhead) Acquire() (bool, error)

Acquire tries to acquire a slot in the bulkhead. Returns true if the slot was acquired, false if the bulkhead is full.

func (*Bulkhead) Available

func (b *Bulkhead) Available() (int, error)

Available returns the number of available slots, clamped to the range [0, MaxConcurrent]. The result is never negative, even when Count exceeds MaxConcurrent, which can happen after a fail-open write or when MaxConcurrent is lowered.
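The clamping contract of Available reduces to simple arithmetic. A minimal sketch, with `availableSlots` as a hypothetical helper rather than the package's code:

```go
package main

import "fmt"

// availableSlots (hypothetical) mirrors the documented contract of
// Bulkhead.Available: free slots clamped to [0, maxConcurrent], even when
// inUse exceeds maxConcurrent.
func availableSlots(inUse, maxConcurrent int) int {
	free := maxConcurrent - inUse
	if free < 0 {
		return 0 // over-subscribed: report zero, never a negative count
	}
	if free > maxConcurrent {
		return maxConcurrent
	}
	return free
}

func main() {
	fmt.Println(availableSlots(3, 10))  // 7
	fmt.Println(availableSlots(12, 10)) // 0
}
```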

func (*Bulkhead) ForceCleanup

func (b *Bulkhead) ForceCleanup() error

ForceCleanup removes stale slots (those held by PIDs that are no longer running) without acquiring a slot.

func (*Bulkhead) InUse

func (b *Bulkhead) InUse() (int, error)

InUse returns the number of slots currently in use.

func (*Bulkhead) Release

func (b *Bulkhead) Release() error

Release releases the slot held by this process.

func (*Bulkhead) Reset

func (b *Bulkhead) Reset() error

Reset clears all slots (useful for cleanup or testing).
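A rough model of how PID-based tracking could limit concurrency across processes. This is a sketch under assumptions: `pidBulkhead` is hypothetical, it keeps the PID list in memory instead of persisting it through a `Store`, process liveness is injected as a function, and treating a re-acquire by the same PID as success is inferred from AddPID being idempotent.

```go
package main

import "fmt"

// pidBulkhead is a hypothetical in-memory model of the PID-based bulkhead.
type pidBulkhead struct {
	maxConcurrent int
	activePIDs    []int
	isAlive       func(pid int) bool // assumption: e.g. a signal-0 probe on Unix
}

// cleanup drops PIDs whose processes are no longer running.
func (b *pidBulkhead) cleanup() {
	live := b.activePIDs[:0]
	for _, pid := range b.activePIDs {
		if b.isAlive(pid) {
			live = append(live, pid)
		}
	}
	b.activePIDs = live
}

func (b *pidBulkhead) has(pid int) bool {
	for _, p := range b.activePIDs {
		if p == pid {
			return true
		}
	}
	return false
}

// acquire reclaims stale slots, then reserves one for pid if any is free.
func (b *pidBulkhead) acquire(pid int) bool {
	b.cleanup()
	if b.has(pid) {
		return true // already holding a slot
	}
	if len(b.activePIDs) >= b.maxConcurrent {
		return false
	}
	b.activePIDs = append(b.activePIDs, pid)
	return true
}

// release removes pid from the active list.
func (b *pidBulkhead) release(pid int) {
	for i, p := range b.activePIDs {
		if p == pid {
			b.activePIDs = append(b.activePIDs[:i], b.activePIDs[i+1:]...)
			return
		}
	}
}

func main() {
	alive := map[int]bool{101: true, 102: true, 103: true, 104: true}
	b := &pidBulkhead{maxConcurrent: 2, isAlive: func(pid int) bool { return alive[pid] }}

	fmt.Println(b.acquire(101)) // true
	fmt.Println(b.acquire(102)) // true
	fmt.Println(b.acquire(103)) // false: both slots held by live processes
	alive[101] = false          // process 101 crashes without releasing
	fmt.Println(b.acquire(103)) // true: 101's stale slot is reclaimed
	b.release(102)
	fmt.Println(b.acquire(104)) // true
}
```

Pruning dead PIDs before each check is what lets a crashed CLI invocation give its slot back without ever calling Release.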

type BulkheadConfig

type BulkheadConfig struct {
	// MaxConcurrent is the maximum number of concurrent requests across all processes.
	// Default: 10
	MaxConcurrent int
}

BulkheadConfig configures the bulkhead pattern for concurrent request limiting.

func (BulkheadConfig) WithMaxConcurrent

func (bh BulkheadConfig) WithMaxConcurrent(n int) BulkheadConfig

WithMaxConcurrent sets the maximum concurrent requests for the bulkhead.

type BulkheadState

type BulkheadState struct {
	// ActivePIDs is the list of process IDs currently holding permits.
	// Stale/dead PIDs are cleaned up by bulkhead operations, not when the state is loaded.
	ActivePIDs []int `json:"active_pids"`
}

BulkheadState tracks concurrent request limiting across processes. The bulkhead pattern limits concurrent requests to prevent resource exhaustion.

func (*BulkheadState) AddPID

func (b *BulkheadState) AddPID(pid int)

AddPID adds a PID to the active list if not already present.

func (*BulkheadState) Count

func (b *BulkheadState) Count() int

Count returns the number of active permits.

func (*BulkheadState) HasPID

func (b *BulkheadState) HasPID(pid int) bool

HasPID returns true if the given PID holds a permit.

func (*BulkheadState) RemovePID

func (b *BulkheadState) RemovePID(pid int)

RemovePID removes a PID from the active list.

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern with cross-process persistence.

func NewCircuitBreaker

func NewCircuitBreaker(store *Store, config CircuitBreakerConfig) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker with the given config.

func (*CircuitBreaker) Allow

func (cb *CircuitBreaker) Allow() (bool, error)

Allow checks if a request should be allowed. Returns true if the request can proceed, false if it should be rejected. In half-open state, atomically reserves an attempt slot to prevent thundering herd.

Optimization: in the closed state Allow only reads the state (no disk write); in the open and half-open states it may write.

Stale attempt cleanup: if the half-open attempts are stuck at HalfOpenMaxRequests (for example, because a crashed process never called RecordSuccess or RecordFailure), they are reset automatically once StaleAttemptTimeout has elapsed since the last reserved attempt. This prevents a process crash from blocking the circuit permanently.

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure() error

RecordFailure records a failed request.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess() error

RecordSuccess records a successful request.

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset() error

Reset resets the circuit breaker to closed state.

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() (string, error)

State returns the current circuit breaker state.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	// FailureThreshold is the number of consecutive failures before opening.
	// Default: 5
	FailureThreshold int

	// SuccessThreshold is the number of consecutive successes in half-open
	// state before closing the circuit.
	// Default: 2
	SuccessThreshold int

	// OpenTimeout is how long to wait before transitioning from open to half-open.
	// Default: 30 seconds
	OpenTimeout time.Duration

	// HalfOpenMaxRequests is the max concurrent requests allowed in half-open state.
	// Default: 1
	HalfOpenMaxRequests int

	// StaleAttemptTimeout is how long to wait before considering half-open attempts
	// as stale (from crashed processes). This should be longer than the expected
	// duration of slow/large operations to avoid resetting legitimate in-flight requests.
	// Default: 2 minutes (4x OpenTimeout)
	StaleAttemptTimeout time.Duration
}

CircuitBreakerConfig configures the circuit breaker pattern.

func (CircuitBreakerConfig) WithFailureThreshold

func (cb CircuitBreakerConfig) WithFailureThreshold(n int) CircuitBreakerConfig

WithFailureThreshold sets the failure threshold for the circuit breaker.

func (CircuitBreakerConfig) WithOpenTimeout

WithOpenTimeout sets the open timeout for the circuit breaker.

func (CircuitBreakerConfig) WithSuccessThreshold

func (cb CircuitBreakerConfig) WithSuccessThreshold(n int) CircuitBreakerConfig

WithSuccessThreshold sets the success threshold for the circuit breaker.

type CircuitBreakerState

type CircuitBreakerState struct {
	// State is the current circuit state: "closed", "open", or "half_open".
	// - closed: normal operation, requests flow through
	// - open: circuit tripped, requests fail fast
	// - half_open: testing if service recovered, limited requests allowed
	State string `json:"state"`

	// Failures is the count of consecutive failures in the current window.
	Failures int `json:"failures"`

	// Successes is the count of consecutive successes (used in half_open state).
	Successes int `json:"successes"`

	// HalfOpenAttempts tracks in-flight requests during half-open state.
	// Incremented atomically when a request is allowed, decremented on completion.
	// Used to enforce HalfOpenMaxRequests limit across concurrent processes.
	HalfOpenAttempts int `json:"half_open_attempts,omitempty"`

	// HalfOpenLastAttemptAt is when the last half-open attempt was reserved.
	// Used to detect stale attempts from crashed processes that never completed.
	HalfOpenLastAttemptAt time.Time `json:"half_open_last_attempt_at"`

	// LastFailureAt is when the most recent failure occurred.
	LastFailureAt time.Time `json:"last_failure_at"`

	// OpenedAt is when the circuit transitioned to open state.
	OpenedAt time.Time `json:"opened_at"`
}

CircuitBreakerState tracks the circuit breaker pattern state. The circuit breaker prevents cascading failures by rejecting requests once FailureThreshold consecutive failures have occurred.

func (*CircuitBreakerState) IsClosed

func (c *CircuitBreakerState) IsClosed() bool

IsClosed returns true if the circuit is in closed (normal) state.

func (*CircuitBreakerState) IsHalfOpen

func (c *CircuitBreakerState) IsHalfOpen() bool

IsHalfOpen returns true if the circuit is in half-open (testing) state.

func (*CircuitBreakerState) IsOpen

func (c *CircuitBreakerState) IsOpen() bool

IsOpen returns true if the circuit is open (failing fast).

type Config

type Config struct {
	// CircuitBreaker configures the circuit breaker pattern.
	CircuitBreaker CircuitBreakerConfig

	// RateLimiter configures the token bucket rate limiter.
	RateLimiter RateLimiterConfig

	// Bulkhead configures concurrent request limiting.
	Bulkhead BulkheadConfig
}

Config holds configuration for all resilience primitives.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible defaults for the Basecamp API.

func (*Config) WithBulkhead

func (c *Config) WithBulkhead(bh BulkheadConfig) *Config

WithBulkhead returns a copy of the config with custom bulkhead settings.

func (*Config) WithCircuitBreaker

func (c *Config) WithCircuitBreaker(cb CircuitBreakerConfig) *Config

WithCircuitBreaker returns a copy of the config with custom circuit breaker settings.

func (*Config) WithRateLimiter

func (c *Config) WithRateLimiter(rl RateLimiterConfig) *Config

WithRateLimiter returns a copy of the config with custom rate limiter settings.

type GatingHooks

type GatingHooks struct {
	// contains filtered or unexported fields
}

GatingHooks implements basecamp.GatingHooks to provide resilience patterns for SDK operations. It gates each operation through the rate limiter, bulkhead, and circuit breaker before it executes.

func NewGatingHooks

func NewGatingHooks(cb *CircuitBreaker, rl *RateLimiter, bh *Bulkhead) *GatingHooks

NewGatingHooks creates a new GatingHooks with the given primitives.

func NewGatingHooksFromConfig

func NewGatingHooksFromConfig(store *Store, cfg *Config) *GatingHooks

NewGatingHooksFromConfig creates a GatingHooks using the provided config and store.

func (*GatingHooks) OnOperationEnd

func (h *GatingHooks) OnOperationEnd(ctx context.Context, op basecamp.OperationInfo, err error, duration time.Duration)

OnOperationEnd is called when a semantic SDK operation completes. It releases the bulkhead slot and records the success or failure with the circuit breaker.

func (*GatingHooks) OnOperationGate

func (h *GatingHooks) OnOperationGate(ctx context.Context, op basecamp.OperationInfo) (context.Context, error)

OnOperationGate is called before OnOperationStart. It checks rate limiter, bulkhead, and circuit breaker before allowing the operation to proceed.

Gate order is important: the rate limiter and bulkhead are checked BEFORE the circuit breaker because the circuit breaker atomically reserves a half-open slot. If the circuit breaker were checked first and the rate limiter then rejected the operation, the reserved half-open slot would leak (it would never be released).

Tradeoff: this ordering means rate limiter tokens are consumed even when the bulkhead is full or the circuit is open. This is acceptable for a CLI tool, where occasional token waste is preferable to half-open slot leaks.

Returns a context that should be used for the operation and an error if the operation should be rejected.
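The gate ordering can be sketched with three minimal interfaces. The interfaces, fakes, and sentinel errors are hypothetical; two behaviors are also assumptions of this sketch rather than statements about GatingHooks: primitive errors fail open, and the bulkhead slot is handed back when the circuit breaker rejects.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical minimal interfaces standing in for the three primitives.
type limiter interface{ Allow() (bool, error) }

type slots interface {
	Acquire() (bool, error)
	Release() error
}

type breaker interface{ Allow() (bool, error) }

var (
	errRateLimited  = errors.New("rate limited")
	errBulkheadFull = errors.New("bulkhead full")
	errCircuitOpen  = errors.New("circuit open")
)

// gate checks the rate limiter and bulkhead before the circuit breaker, so a
// half-open probe slot is reserved only after every other gate has passed.
// Assumption: an error from a primitive lets the request through (fail-open).
func gate(rl limiter, bh slots, cb breaker) error {
	if ok, err := rl.Allow(); err == nil && !ok {
		return errRateLimited
	}
	if ok, err := bh.Acquire(); err == nil && !ok {
		return errBulkheadFull
	}
	if ok, err := cb.Allow(); err == nil && !ok {
		_ = bh.Release() // assumption: hand the bulkhead slot back
		return errCircuitOpen
	}
	return nil
}

// fakeGate is a test double that satisfies all three interfaces.
type fakeGate struct {
	ok              bool
	calls, released int
}

func (f *fakeGate) Allow() (bool, error)   { f.calls++; return f.ok, nil }
func (f *fakeGate) Acquire() (bool, error) { f.calls++; return f.ok, nil }
func (f *fakeGate) Release() error         { f.released++; return nil }

func main() {
	rl, bh, cb := &fakeGate{ok: false}, &fakeGate{ok: true}, &fakeGate{ok: true}
	err := gate(rl, bh, cb)
	fmt.Println(err, cb.calls) // rate limited 0: no probe slot was reserved
}
```

Because the circuit breaker is consulted last, an early rejection never leaves a half-open slot reserved.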

func (*GatingHooks) OnOperationStart

func (h *GatingHooks) OnOperationStart(ctx context.Context, op basecamp.OperationInfo) context.Context

OnOperationStart is called when a semantic SDK operation begins.

func (*GatingHooks) OnRequestEnd

func (h *GatingHooks) OnRequestEnd(ctx context.Context, info basecamp.RequestInfo, result basecamp.RequestResult)

OnRequestEnd is called after an HTTP request completes. It honors Retry-After headers from 429/503 responses to back off the rate limiter.

func (*GatingHooks) OnRequestStart

func (h *GatingHooks) OnRequestStart(ctx context.Context, info basecamp.RequestInfo) context.Context

OnRequestStart is called before an HTTP request is sent.

func (*GatingHooks) OnRetry

func (h *GatingHooks) OnRetry(ctx context.Context, info basecamp.RequestInfo, attempt int, err error)

OnRetry is called before a retry attempt.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements the token bucket algorithm with cross-process persistence.

func NewRateLimiter

func NewRateLimiter(store *Store, config RateLimiterConfig) *RateLimiter

NewRateLimiter creates a new rate limiter with the given config.

func (*RateLimiter) Allow

func (rl *RateLimiter) Allow() (bool, error)

Allow checks whether a request is allowed. It returns true if the request can proceed and false if it should be rejected. When the request is allowed, it consumes TokensPerRequest tokens from the bucket.

func (*RateLimiter) Reset

func (rl *RateLimiter) Reset() error

Reset resets the rate limiter to a full bucket.

func (*RateLimiter) RetryAfterRemaining

func (rl *RateLimiter) RetryAfterRemaining() (time.Duration, error)

RetryAfterRemaining returns the remaining duration of the Retry-After block, or 0 if there is no active block.

func (*RateLimiter) SetRetryAfter

func (rl *RateLimiter) SetRetryAfter(until time.Time) error

SetRetryAfter blocks requests until the given time, typically after a 429 response carrying a Retry-After header.

func (*RateLimiter) SetRetryAfterDuration

func (rl *RateLimiter) SetRetryAfterDuration(d time.Duration) error

SetRetryAfterDuration blocks requests for the given duration, measured from now.

func (*RateLimiter) Tokens

func (rl *RateLimiter) Tokens() (float64, error)

Tokens returns the current number of available tokens. This also persists any initialization or refill that occurs.

type RateLimiterConfig

type RateLimiterConfig struct {
	// MaxTokens is the maximum number of tokens in the bucket.
	// Default: 50
	MaxTokens float64

	// RefillRate is how many tokens are added per second.
	// Default: 10
	RefillRate float64

	// TokensPerRequest is how many tokens each request consumes.
	// Default: 1
	TokensPerRequest float64
}

RateLimiterConfig configures the token bucket rate limiter.

func (RateLimiterConfig) WithMaxTokens

func (rl RateLimiterConfig) WithMaxTokens(n float64) RateLimiterConfig

WithMaxTokens sets the maximum tokens for the rate limiter.

func (RateLimiterConfig) WithRefillRate

func (rl RateLimiterConfig) WithRefillRate(n float64) RateLimiterConfig

WithRefillRate sets the refill rate for the rate limiter.

type RateLimiterState

type RateLimiterState struct {
	// Tokens is the current number of available tokens.
	Tokens float64 `json:"tokens"`

	// LastRefillAt is when tokens were last refilled.
	LastRefillAt time.Time `json:"last_refill_at"`

	// RetryAfterUntil is set when we receive a 429 with Retry-After header.
	// No requests should be made until this time passes.
	RetryAfterUntil time.Time `json:"retry_after_until"`
}

RateLimiterState tracks the token bucket rate limiter state. The bucket refills continuously at RefillRate tokens per second, up to MaxTokens.

func (*RateLimiterState) BlockedFor

func (r *RateLimiterState) BlockedFor() time.Duration

BlockedFor returns how long until the Retry-After window expires. Returns zero if not blocked.

func (*RateLimiterState) IsBlocked

func (r *RateLimiterState) IsBlocked() bool

IsBlocked returns true if we're within a Retry-After window.

type State

type State struct {
	// Version is the schema version for future migrations.
	Version int `json:"version"`

	// CircuitBreaker tracks the circuit breaker state.
	CircuitBreaker CircuitBreakerState `json:"circuit_breaker"`

	// RateLimiter tracks the token bucket state.
	RateLimiter RateLimiterState `json:"rate_limiter"`

	// Bulkhead tracks concurrent request limiting.
	Bulkhead BulkheadState `json:"bulkhead"`

	// UpdatedAt is when the state was last modified.
	UpdatedAt time.Time `json:"updated_at"`
}

State represents the persisted resilience state shared across CLI processes. This enables circuit breaker, rate limiter, and bulkhead patterns to coordinate across concurrent basecamp invocations.

func NewState

func NewState() *State

NewState returns a new State with default values. Note: RateLimiterState.LastRefillAt is left as zero so that the first refill call will initialize Tokens to MaxTokens based on the config.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store handles reading and writing resilience state with file locking. It provides atomic operations safe for concurrent access across processes.

func NewStore

func NewStore(dir string) *Store

NewStore creates a new resilience state store. If dir is empty, it uses the default location (~/.cache/basecamp/resilience/).

func (*Store) Clear

func (s *Store) Clear() error

Clear removes the state file. If the lock cannot be acquired, proceeds without locking (fail-open).

func (*Store) Dir

func (s *Store) Dir() string

Dir returns the state directory path.

func (*Store) Exists

func (s *Store) Exists() bool

Exists returns true if a state file exists.

func (*Store) Load

func (s *Store) Load() (*State, error)

Load reads the state from disk with proper locking. Returns an empty state if the file doesn't exist. If the lock cannot be acquired, proceeds without locking (fail-open).

func (*Store) Path

func (s *Store) Path() string

Path returns the full path to the state file.

func (*Store) Save

func (s *Store) Save(state *State) error

Save writes the state to disk atomically with proper locking. If the lock cannot be acquired, proceeds without locking (fail-open).

func (*Store) Update

func (s *Store) Update(updateFn func(*State) error) error

Update atomically loads, modifies, and saves the state. The updateFn receives the current state and should modify it in place. This is the preferred way to update state as it holds the lock throughout the entire read-modify-write cycle. If the lock cannot be acquired, proceeds without locking (fail-open).
