Documentation ¶
Index ¶
- Constants
- func DefaultSelectServer(key string, servers []string) (string, error)
- func JumpSelectServer(key string, servers []string) (string, error)
- type BatchCommands
- type BatchExecutor
- type Client
- func (c *Client) AllPoolStats() []ServerPoolStats
- func (c *Client) Close()
- func (c *Client) Execute(ctx context.Context, req *meta.Request) (*meta.Response, error)
- func (c *Client) ExecuteBatch(ctx context.Context, reqs []*meta.Request) ([]*meta.Response, error)
- func (c *Client) Stats(ctx context.Context, args ...string) ([]ServerStats, error)
- type Commands
- func (c *Commands) Add(ctx context.Context, item Item) error
- func (c *Commands) Delete(ctx context.Context, key string) error
- func (c *Commands) Get(ctx context.Context, key string) (Item, error)
- func (c *Commands) Increment(ctx context.Context, key string, delta int64, ttl time.Duration) (int64, error)
- func (c *Commands) Set(ctx context.Context, item Item) error
- type Config
- type Connection
- func (c *Connection) Close() error
- func (c *Connection) Execute(ctx context.Context, req *meta.Request) (*meta.Response, error)
- func (c *Connection) ExecuteBatch(ctx context.Context, reqs []*meta.Request) ([]*meta.Response, error)
- func (c *Connection) ExecuteStats(ctx context.Context, args ...string) (map[string]string, error)
- func (c *Connection) Ping() error
- type Dialer
- type Executor
- type Item
- type Pool
- type PoolStats
- type Querier
- type Resource
- type SelectServerFunc
- type ServerPool
- type ServerPoolStats
- type ServerStats
- type Servers
- type StaticServers
- type StatsExecutor
Constants ¶
const NoTTL = 0
NoTTL represents an infinite TTL (no expiration). Use this constant when you want items to persist indefinitely in memcache.
Variables ¶
This section is empty.
Functions ¶
func DefaultSelectServer ¶
DefaultSelectServer uses a CRC32 hash for consistent server selection. For a single server, it returns that server directly. For multiple servers, it takes the CRC32 hash of the key modulo the number of servers. It returns an error if no servers are available.
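The selection rule described above can be sketched with the standard library alone. This is an illustration, not the package's source: `crc32Select` is a hypothetical name, and the IEEE polynomial is an assumption, since the documentation does not say which CRC32 table is used.

```go
package main

import (
	"errors"
	"fmt"
	"hash/crc32"
)

// crc32Select is a hypothetical stand-in for a CRC32-modulo selector.
func crc32Select(key string, servers []string) (string, error) {
	switch len(servers) {
	case 0:
		return "", errors.New("no servers available")
	case 1:
		// Single server: no hashing needed.
		return servers[0], nil
	}
	// Assumption: IEEE polynomial; the real implementation may differ.
	sum := crc32.ChecksumIEEE([]byte(key))
	return servers[sum%uint32(len(servers))], nil
}

func main() {
	servers := []string{"10.0.0.1:11211", "10.0.0.2:11211", "10.0.0.3:11211"}
	for _, key := range []string{"user:123", "session:abc"} {
		addr, err := crc32Select(key, servers)
		fmt.Println(key, addr, err)
	}
}
```

Because the index depends on `len(servers)`, changing the number of servers remaps most keys under this scheme.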
func JumpSelectServer ¶
JumpSelectServer uses Jump Hash for consistent server selection. Jump Hash provides better distribution and moves fewer keys when servers are added or removed. For a single server, it returns that server directly. It returns an error if no servers are available.
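To make the "fewer key movements" property concrete, here is a sketch of the jump consistent hash algorithm (Lamping and Veach). The names `jumpHash` and `jumpSelect` are hypothetical, and hashing the string key with FNV-1a is an assumption; the documentation does not say how the package derives the 64-bit input.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// jumpHash maps a 64-bit key to a bucket in [0, numBuckets).
func jumpHash(key uint64, numBuckets int32) int32 {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}

// jumpSelect hashes the key (FNV-1a here, an assumption) and picks a server.
func jumpSelect(key string, servers []string) (string, error) {
	if len(servers) == 0 {
		return "", errors.New("no servers available")
	}
	h := fnv.New64a()
	h.Write([]byte(key))
	return servers[jumpHash(h.Sum64(), int32(len(servers)))], nil
}

func main() {
	three := []string{"a:11211", "b:11211", "c:11211"}
	four := append(append([]string(nil), three...), "d:11211")
	moved := 0
	for i := 0; i < 1000; i++ {
		key := fmt.Sprintf("key:%d", i)
		before, _ := jumpSelect(key, three)
		after, _ := jumpSelect(key, four)
		if before != after {
			moved++
		}
	}
	fmt.Printf("%d of 1000 keys moved after adding a fourth server\n", moved)
}
```

When a server is appended, a key either stays where it was or moves to the new server, so roughly one key in four is expected to move when going from three servers to four.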
Types ¶
type BatchCommands ¶
type BatchCommands struct {
	// contains filtered or unexported fields
}
BatchCommands provides batch operations using a BatchExecutor. This struct is explicitly designed for batch operations and requires an executor that implements BatchExecutor.
func NewBatchCommands ¶
func NewBatchCommands(executor BatchExecutor) *BatchCommands
NewBatchCommands creates a new BatchCommands instance. The executor must implement BatchExecutor (e.g., ServerPool or Client).
func (*BatchCommands) MultiDelete ¶
func (b *BatchCommands) MultiDelete(ctx context.Context, keys []string) error
MultiDelete removes multiple items in a single batch operation. It returns an error on the first failure.
type BatchExecutor ¶
type BatchExecutor interface {
	Executor
	ExecuteBatch(ctx context.Context, reqs []*meta.Request) ([]*meta.Response, error)
}
BatchExecutor is an optional interface that Executors can implement to support efficient batch operations using pipelining. If the executor doesn't implement this, Commands will fall back to individual Execute calls.
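The fallback described above is the usual Go optional-interface pattern: a type assertion at call time. The sketch below uses simplified stand-in interfaces (`executor`, `batchExecutor`) that take strings instead of `context.Context` and `*meta.Request`; all names in it are hypothetical.

```go
package main

import "fmt"

// Simplified stand-ins for Executor and BatchExecutor.
type executor interface {
	Execute(req string) (string, error)
}

type batchExecutor interface {
	executor
	ExecuteBatch(reqs []string) ([]string, error)
}

// executeAll uses the batch path when the executor offers one and
// otherwise falls back to one Execute call per request.
func executeAll(e executor, reqs []string) ([]string, error) {
	if be, ok := e.(batchExecutor); ok {
		return be.ExecuteBatch(reqs)
	}
	out := make([]string, 0, len(reqs))
	for _, r := range reqs {
		resp, err := e.Execute(r)
		if err != nil {
			return nil, err
		}
		out = append(out, resp)
	}
	return out, nil
}

type single struct{}

func (single) Execute(req string) (string, error) { return "single:" + req, nil }

type pipelined struct{ single }

func (pipelined) ExecuteBatch(reqs []string) ([]string, error) {
	out := make([]string, len(reqs))
	for i, r := range reqs {
		out[i] = "batch:" + r
	}
	return out, nil
}

func main() {
	reqs := []string{"a", "b"}
	fmt.Println(executeAll(single{}, reqs))
	fmt.Println(executeAll(pipelined{}, reqs))
}
```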
type Client ¶
type Client struct {
	*Commands // Embedded command operations
	// contains filtered or unexported fields
}
Client is a memcache client that implements the Querier interface using a connection pool.
func NewClient ¶
NewClient creates a new memcache client with the given servers and configuration. For a single server, use: NewClient(NewStaticServers("host:port"), config)
Example ¶
Example demonstrating how to use circuit breakers with the memcache client
servers := memcache.NewStaticServers("localhost:11211", "localhost:11212")

// Create client with circuit breakers for each server
client, err := memcache.NewClient(servers, memcache.Config{
	MaxSize: 10,
	CircuitBreakerSettings: &gobreaker.Settings{
		Name:        "",               // Name will be set to server address
		MaxRequests: 3,                // maxRequests in half-open state
		Interval:    time.Minute,      // interval to reset failure counts
		Timeout:     10 * time.Second, // timeout before transitioning to half-open
		ReadyToTrip: func(counts gobreaker.Counts) bool {
			failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
			return counts.Requests >= 10 && failureRatio >= 0.6
		},
		OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) {
			fmt.Printf("Circuit breaker %s: %s -> %s\n", name, from, to)
		},
	},
})
if err != nil {
	panic(err)
}
defer client.Close()

ctx := context.Background()

// Perform operations - circuit breaker protects against failing servers
_ = client.Set(ctx, memcache.Item{Key: "user:123", Value: []byte("John")})

// Check circuit breaker states
stats := client.AllPoolStats()
for _, serverStats := range stats {
	fmt.Printf("Server: %s\n", serverStats.Addr)
	fmt.Printf(" Circuit Breaker: %s\n", serverStats.CircuitBreakerState)
	fmt.Printf(" Total Connections: %d\n", serverStats.PoolStats.TotalConns)
	fmt.Printf(" Active Connections: %d\n", serverStats.PoolStats.ActiveConns)
}
func (*Client) AllPoolStats ¶
func (c *Client) AllPoolStats() []ServerPoolStats
AllPoolStats returns stats for all server pools.
func (*Client) Close ¶
func (c *Client) Close()
Close closes the client and destroys all connections in all pools.
func (*Client) ExecuteBatch ¶
ExecuteBatch executes multiple requests with automatic server routing. Requests are grouped by server and executed concurrently using pipelined requests. Returns responses in the same order as requests.
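One way to group requests by server, run the groups concurrently, and still return responses in request order is to carry each request's original index through the grouping. The sketch below shows that idea with plain strings; `pick`, `routeBatch`, and the `run` callback are hypothetical names, and it assumes a non-empty server list and one response per request.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sync"
)

// pick is a placeholder selector (CRC32 modulo); servers must be non-empty.
func pick(key string, servers []string) string {
	return servers[crc32.ChecksumIEEE([]byte(key))%uint32(len(servers))]
}

// routeBatch groups keys by server, runs one goroutine per server, and
// writes each response back at the index of its originating request.
func routeBatch(keys, servers []string, run func(server string, keys []string) []string) []string {
	groups := make(map[string][]int) // server -> indices into keys
	for i, k := range keys {
		s := pick(k, servers)
		groups[s] = append(groups[s], i)
	}
	out := make([]string, len(keys))
	var wg sync.WaitGroup
	for server, idxs := range groups {
		wg.Add(1)
		go func(server string, idxs []int) {
			defer wg.Done()
			batch := make([]string, len(idxs))
			for j, i := range idxs {
				batch[j] = keys[i]
			}
			// Each goroutine writes to distinct indices, so no lock is needed.
			for j, resp := range run(server, batch) {
				out[idxs[j]] = resp
			}
		}(server, idxs)
	}
	wg.Wait()
	return out
}

func main() {
	keys := []string{"k1", "k2", "k3", "k4"}
	servers := []string{"s1:11211", "s2:11211"}
	out := routeBatch(keys, servers, func(server string, ks []string) []string {
		resps := make([]string, len(ks))
		for i, k := range ks {
			resps[i] = server + " handled " + k
		}
		return resps
	})
	for i, r := range out {
		fmt.Println(keys[i], "->", r)
	}
}
```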
type Commands ¶
type Commands struct {
	// contains filtered or unexported fields
}
Commands provides memcache command operations. This struct can be used independently with a custom ExecuteFunc, or embedded in Client for full resilience features.
func NewCommands ¶
NewCommands creates a new Commands instance with the given execute function.
func (*Commands) Increment ¶
func (c *Commands) Increment(ctx context.Context, key string, delta int64, ttl time.Duration) (int64, error)
Increment increments a counter key by the given delta. Creates the key with the delta value if it doesn't exist. This uses auto-vivify (N flag) with initial value (J flag) set to the delta, so the returned value is correct even on first call. TTL of 0 means infinite TTL.
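For readers unfamiliar with the meta protocol, the flags named above belong to the meta arithmetic (`ma`) command. The sketch below only renders a request line of that shape; `buildIncrement` is a hypothetical helper, the flag order is arbitrary, and the exact bytes this package sends are an assumption. It takes an unsigned delta and a TTL in seconds for simplicity.

```go
package main

import (
	"fmt"
	"strings"
)

// buildIncrement renders a meta arithmetic ("ma") request line.
func buildIncrement(key string, delta uint64, ttlSeconds int) string {
	var b strings.Builder
	fmt.Fprintf(&b, "ma %s", key)
	fmt.Fprintf(&b, " N%d", ttlSeconds) // N: auto-vivify on miss with this TTL
	fmt.Fprintf(&b, " J%d", delta)      // J: initial value if the key is created
	fmt.Fprintf(&b, " D%d", delta)      // D: delta applied to an existing value
	b.WriteString(" v\r\n")             // v: return the resulting value
	return b.String()
}

func main() {
	fmt.Printf("%q\n", buildIncrement("hits", 5, 0))
}
```

Setting J equal to the delta is what makes the first call return the delta: on a miss the server creates the item with the initial value and returns it.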
type Config ¶
type Config struct {
	// MaxSize is the maximum number of connections in the pool.
	// Required: must be > 0.
	MaxSize int32
	// MaxConnLifetime is the maximum duration a connection can be reused.
	// Zero means no limit.
	MaxConnLifetime time.Duration
	// MaxConnIdleTime is the maximum duration a connection can be idle before being closed.
	// Zero means no limit.
	MaxConnIdleTime time.Duration
	// HealthCheckInterval is how often to check idle connections for health.
	// Zero disables health checks.
	HealthCheckInterval time.Duration
	// Timeout is the default timeout for memcache operations (read/write).
	// This is used when the context passed to Execute/ExecuteBatch has no deadline.
	// Zero means no timeout (not recommended for production).
	// Recommended: 100ms-1s depending on your latency requirements.
	Timeout time.Duration
	// ConnectTimeout is the timeout for establishing new connections.
	// This includes TCP handshake and TLS handshake if applicable.
	// If zero, uses Timeout value.
	// Set this higher than Timeout if TLS connections take longer to establish.
	ConnectTimeout time.Duration
	// Dialer is the net.Dialer used to create new connections.
	// If nil, the default net.Dialer is used.
	Dialer Dialer
	// NewPool is the connection pool factory function.
	// If nil, uses the default puddle-based pool.
	// To use channel pool: NewPool: memcache.NewChannelPool
	NewPool func(constructor func(ctx context.Context) (*Connection, error), maxSize int32) (Pool, error)
	// SelectServer picks which server to use for a key.
	// Receives the key and current server list from Servers.List().
	// If nil, uses JumpSelectServer (Jump Hash-based).
	// Alternative: DefaultSelectServer (CRC32-based, ~20ns faster).
	SelectServer SelectServerFunc
	// CircuitBreakerSettings configures the circuit breaker for each server pool.
	// If nil, no circuit breaker is used.
	// The Name field in the settings will be overridden with the server address.
	CircuitBreakerSettings *gobreaker.Settings
}
Config holds configuration for the memcache client connection pool.
type Connection ¶
type Connection struct {
	Reader *bufio.Reader
	Writer *bufio.Writer
	// contains filtered or unexported fields
}
Connection wraps a network connection with buffered reader and writer for efficient I/O.
func NewConnection ¶
func NewConnection(conn net.Conn, timeout time.Duration) *Connection
NewConnection creates a connection with an optional default timeout. The timeout is used when the context passed to Execute has no deadline. Zero timeout means no timeout.
func (*Connection) Close ¶
func (c *Connection) Close() error
func (*Connection) Execute ¶
Execute implements the Executor interface. Executes a single request and returns the response. Uses context deadline if present, otherwise uses the connection's default timeout.
func (*Connection) ExecuteBatch ¶
func (c *Connection) ExecuteBatch(ctx context.Context, reqs []*meta.Request) ([]*meta.Response, error)
ExecuteBatch implements the BatchExecutor interface. Executes multiple requests in a pipeline using the NoOp marker strategy. Sends all requests followed by a NoOp command, then reads responses until the NoOp response.
Returns responses in the same order as requests. Individual request errors are captured in Response.Error (protocol errors). I/O errors or connection failures are returned as Go errors.
Deadline handling: The deadline is extended before reading each response to prevent timeout due to cumulative time across multiple responses (inspired by Grafana PR #16).
func (*Connection) ExecuteStats ¶
ExecuteStats implements the StatsExecutor interface. Executes the stats command and returns the stats as a map.
func (*Connection) Ping ¶
func (c *Connection) Ping() error
Ping performs a simple health check on a connection using the noop command.
type Executor ¶
Executor executes a memcache request for a given key. The key is provided separately to allow server selection based on the key.
type Pool ¶
type Pool interface {
	// Acquire gets a connection from the pool, creating one if necessary.
	// Blocks until a connection is available or context is canceled.
	Acquire(ctx context.Context) (Resource, error)
	// AcquireAllIdle acquires all idle connections from the pool.
	// Used for health checks and maintenance.
	AcquireAllIdle() []Resource
	// Close closes the pool and all connections.
	Close()
	// Stats returns a snapshot of pool statistics.
	Stats() PoolStats
}
Pool manages a pool of connections.
func NewChannelPool ¶
func NewChannelPool(constructor func(ctx context.Context) (*Connection, error), maxSize int32) (Pool, error)
NewChannelPool creates a new channel-based connection pool. This is an alternative pool implementation, optimized for performance.
func NewPuddlePool ¶
func NewPuddlePool(constructor func(ctx context.Context) (*Connection, error), maxSize int32) (Pool, error)
NewPuddlePool creates a new puddle-based connection pool. This is the default pool implementation.
type PoolStats ¶
type PoolStats struct {
	// Lifetime counters (uint64 - 8 bytes each)
	AcquireCount      uint64 // Total acquire attempts
	AcquireWaitCount  uint64 // Acquires that had to wait
	CreatedConns      uint64 // Total connections created
	DestroyedConns    uint64 // Total connections destroyed
	AcquireErrors     uint64 // Failed acquire attempts
	AcquireWaitTimeNs uint64 // Total nanoseconds spent waiting
	// Current state gauges (int32 - 4 bytes each)
	TotalConns  int32 // Total connections in pool (active + idle)
	IdleConns   int32 // Idle connections available
	ActiveConns int32 // Connections currently in use
	// contains filtered or unexported fields
}
PoolStats contains statistics about a connection pool. All fields are safe for concurrent access.
The struct is laid out to fit within a single cache line (64 bytes), with fields ordered from largest to smallest.
For Prometheus integration, expose these as:
- Gauges: TotalConns, IdleConns, ActiveConns
- Counters: AcquireCount, AcquireWaitCount, CreatedConns, DestroyedConns, AcquireErrors
- Histogram: AcquireWaitDuration (use AcquireWaitCount and AcquireWaitTimeNs to calculate)
type Resource ¶
type Resource interface {
	// Value returns the underlying connection.
	Value() *Connection
	// Release returns the connection to the pool for reuse.
	Release()
	// ReleaseUnused returns the connection to the pool without marking it as used.
	// Used for health checks that don't actually use the connection.
	ReleaseUnused()
	// Destroy closes the connection and removes it from the pool.
	Destroy()
	// CreationTime returns when the connection was created.
	CreationTime() time.Time
	// IdleDuration returns how long the connection has been idle.
	IdleDuration() time.Duration
}
Resource represents a connection resource from the pool.
type SelectServerFunc ¶
SelectServerFunc picks which server to use for a given key. It receives the key and the current list of server addresses. Returns empty string and error if no server can be selected.
type ServerPool ¶
type ServerPool struct {
	// contains filtered or unexported fields
}
ServerPool wraps a connection pool and a circuit breaker together with the address of the server they belong to.
func NewServerPool ¶
func NewServerPool(addr string, config Config) (*ServerPool, error)
func (*ServerPool) Address ¶
func (sp *ServerPool) Address() string
func (*ServerPool) Execute ¶
Execute executes a single request-response cycle with proper connection management. It handles acquiring a connection, sending the request, reading the response, and releasing/destroying the connection based on error conditions. The request is wrapped with the server's circuit breaker.
func (*ServerPool) ExecuteBatch ¶
func (sp *ServerPool) ExecuteBatch(ctx context.Context, reqs []*meta.Request) ([]*meta.Response, error)
ExecuteBatch executes multiple requests in a pipeline using the NoOp marker strategy. Sends all requests followed by a NoOp command, then reads responses until the NoOp response. This leverages memcached's FIFO guarantee for optimal performance.
Returns responses in the same order as requests. Individual request errors are captured in Response.Error (protocol errors). I/O errors or connection failures are returned as Go errors.
The batch execution is wrapped with the circuit breaker to track success/failure.
func (*ServerPool) Stats ¶
func (sp *ServerPool) Stats() ServerPoolStats
type ServerPoolStats ¶
type ServerPoolStats struct {
	Addr                 string
	PoolStats            PoolStats
	CircuitBreakerState  gobreaker.State
	CircuitBreakerCounts gobreaker.Counts
}
ServerPoolStats contains stats for a single server pool.
type ServerStats ¶
type ServerStats struct {
	Addr  string            // Server address
	Stats map[string]string // Server statistics (name -> value)
	Error error             // Error if stats request failed
}
ServerStats contains statistics from a single memcache server.
type Servers ¶
type Servers interface {
	// List returns the current list of server addresses.
	// The returned slice must not be modified by the caller.
	List() []string
}
Servers provides the list of memcache server addresses. Implementations must be safe for concurrent use.
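A custom implementation that can change its list at runtime might look like the sketch below. `dynamicServers` and its `Set` method are hypothetical; the point is that `Set` swaps in a private copy under a lock, so slices already handed out by `List` are never mutated.

```go
package main

import (
	"fmt"
	"sync"
)

// dynamicServers is a hypothetical Servers implementation whose address
// list can be replaced while other goroutines call List.
type dynamicServers struct {
	mu    sync.RWMutex
	addrs []string
}

// Set replaces the list with a private copy of addrs.
func (d *dynamicServers) Set(addrs []string) {
	cp := append([]string(nil), addrs...)
	d.mu.Lock()
	d.addrs = cp
	d.mu.Unlock()
}

// List returns the current list; callers must treat it as read-only.
func (d *dynamicServers) List() []string {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.addrs
}

func main() {
	d := &dynamicServers{}
	d.Set([]string{"10.0.0.1:11211", "10.0.0.2:11211"})

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = d.List() // concurrent readers
		}()
	}
	d.Set([]string{"10.0.0.3:11211"}) // concurrent update
	wg.Wait()
	fmt.Println(d.List())
}
```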
type StaticServers ¶
type StaticServers struct {
	// contains filtered or unexported fields
}
StaticServers is a simple implementation that returns a fixed list of server addresses.
func NewStaticServers ¶
func NewStaticServers(addrs ...string) *StaticServers
NewStaticServers creates a new StaticServers with the given addresses.
func (*StaticServers) List ¶
func (s *StaticServers) List() []string
List returns the list of server addresses.
Directories ¶
| Path | Synopsis |
|---|---|
| internal | |
| | Package meta provides a low-level wire protocol implementation for the Memcached Meta Protocol (version 1.6+). |
| references | |
| implementations/golang/memcache | Package memcache provides a client for the memcached cache server. |
| spec | |
| experiments | command |