memcache

package module
v0.0.0-...-4464274
Published: Dec 8, 2025 License: MIT Imports: 13 Imported by: 0

README

memcache

A modern memcache client for Go implementing the meta protocol.

Work in Progress: This is an active development project. The low-level meta protocol implementation is stable, and the high-level client includes production-ready features like multi-server support, circuit breakers, and connection pooling.

Features

Low-Level Meta Protocol (meta package)
  • Meta protocol implementation (get, set, delete, arithmetic, debug)
  • Pipelined request batching
  • Error handling with connection state management
High-Level Client
  • Multi-server support with Jump Hash (default) or CRC32-based server selection
  • Circuit breakers using gobreaker for fault tolerance
  • Connection pooling with health checks and lifecycle management
  • jackc/puddle pool (default) and optional channel-based pool
  • Pool statistics for monitoring connection health and usage
  • Reusable Commands struct for building custom clients
  • Context support for timeouts and cancellation
  • Type-safe operations

Installation

go get github.com/pior/memcache

Quick Start

Using the High-Level Client
import (
    "context"
    "fmt"
    "time"

    "github.com/pior/memcache"
)

// Create client with static servers
servers := memcache.NewStaticServers("localhost:11211", "localhost:11212")
client, _ := memcache.NewClient(servers, memcache.Config{
    MaxSize:             10,
    MaxConnLifetime:     5 * time.Minute,
    MaxConnIdleTime:     1 * time.Minute,
    HealthCheckInterval: 30 * time.Second,
})
defer client.Close()

ctx := context.Background()

// Set with TTL
_ = client.Set(ctx, memcache.Item{
    Key:   "mykey",
    Value: []byte("hello world"),
    TTL:   1 * time.Hour,
})

// Get
item, _ := client.Get(ctx, "mykey")
if item.Found {
    fmt.Printf("Value: %s\n", item.Value)
}

// Increment counter
count, _ := client.Increment(ctx, "counter", 1, memcache.NoTTL)
fmt.Printf("Count: %d\n", count)

// Delete
_ = client.Delete(ctx, "mykey")

Using the Meta Protocol Directly
import (
    "bufio"
    "fmt"
    "net"

    "github.com/pior/memcache/meta"
)

// Create connection
conn, _ := net.Dial("tcp", "localhost:11211")
defer conn.Close()

// Write request
req := meta.NewRequest(meta.CmdSet, "mykey", []byte("hello world"), []meta.Flag{
    {Type: meta.FlagTTL, Token: "3600"},
})
meta.WriteRequest(conn, req)

// Read response
r := bufio.NewReader(conn)
resp, _ := meta.ReadResponse(r)
if resp.Status == meta.StatusHD {
    fmt.Println("Stored!")
}

Multi-Server Support

The client supports multiple memcache servers with automatic server selection:

// Static server list
servers := memcache.NewStaticServers(
    "cache1.example.com:11211",
    "cache2.example.com:11211",
    "cache3.example.com:11211",
)

client, _ := memcache.NewClient(servers, memcache.Config{
    MaxSize: 10,
    // Optional: Custom server selection (default is Jump Hash-based)
    // Alternative: memcache.DefaultSelectServer for CRC32 (~20ns faster)
    SelectServer: memcache.JumpSelectServer,
})

The client uses Jump Hash consistent hashing by default for key distribution across servers. Alternatively, DefaultSelectServer provides CRC32-based hashing (~20ns faster but with potentially worse distribution).

Server Selection Algorithms

Choose the appropriate server selection algorithm based on your requirements:

JumpSelectServer (Jump Hash) - Default
  • Algorithm: Jump Hash algorithm for optimal distribution
  • Performance: ~42-56 ns/op
  • Pros: Better load balancing, fewer key movements during scaling, more uniform distribution
  • Cons: Higher computational cost
  • Best for: Most deployments, especially large-scale or dynamic scaling scenarios
DefaultSelectServer (CRC32-based)
  • Algorithm: Simple CRC32 hash modulo number of servers
  • Performance: ~19 ns/op (~20ns faster than Jump Hash)
  • Pros: Fast, low computational overhead, deterministic
  • Cons: Can have clustering issues with non-uniform key distributions
  • Best for: Performance-critical applications where distribution quality is less important
// Use Jump Hash for better distribution in large deployments
client, _ := memcache.NewClient(servers, memcache.Config{
    SelectServer: memcache.JumpSelectServer,
})
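
To make the trade-off between the two strategies concrete, here is a minimal standard-library sketch of both approaches. It is not the package's implementation: the function names (`crc32Select`, `jumpSelect`, `jumpHash`, `movedKeys`) are hypothetical, and hashing the key with FNV-1a before Jump Hash is an assumption of this sketch. The `jumpHash` function follows the published Jump Consistent Hash algorithm.

```go
package main

import (
	"errors"
	"fmt"
	"hash/crc32"
	"hash/fnv"
)

var errNoServers = errors.New("no servers available")

// crc32Select picks a server with CRC32(key) modulo the server count.
func crc32Select(key string, servers []string) (string, error) {
	if len(servers) == 0 {
		return "", errNoServers
	}
	idx := crc32.ChecksumIEEE([]byte(key)) % uint32(len(servers))
	return servers[idx], nil
}

// jumpHash maps a 64-bit key to a bucket in [0, numBuckets) using the
// Jump Consistent Hash algorithm.
func jumpHash(key uint64, numBuckets int) int {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int(b)
}

// jumpSelect hashes the key with FNV-1a (an assumption of this sketch)
// and then picks a server with jumpHash.
func jumpSelect(key string, servers []string) (string, error) {
	if len(servers) == 0 {
		return "", errNoServers
	}
	h := fnv.New64a()
	h.Write([]byte(key))
	return servers[jumpHash(h.Sum64(), len(servers))], nil
}

// movedKeys counts how many of n sample keys change server when the
// server list changes from before to after.
func movedKeys(sel func(string, []string) (string, error), before, after []string, n int) int {
	moved := 0
	for i := 0; i < n; i++ {
		key := fmt.Sprintf("key-%d", i)
		a, _ := sel(key, before)
		b, _ := sel(key, after)
		if a != b {
			moved++
		}
	}
	return moved
}

func main() {
	three := []string{"cache1:11211", "cache2:11211", "cache3:11211"}
	four := append(append([]string{}, three...), "cache4:11211")
	fmt.Println("crc32 moved:", movedKeys(crc32Select, three, four, 10000))
	fmt.Println("jump  moved:", movedKeys(jumpSelect, three, four, 10000))
}
```

With Jump Hash, a key either stays on its server or moves to the newly added one when the list grows by one entry, whereas the modulo approach can reshuffle keys among existing servers as well.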

Circuit Breakers

Protect your application from cascading failures with built-in circuit breakers:

client, _ := memcache.NewClient(servers, memcache.Config{
    MaxSize: 10,
    CircuitBreakerSettings: &gobreaker.Settings{
        MaxRequests: 3,                // maxRequests in half-open state
        Interval:    time.Minute,      // interval to reset failure counts
        Timeout:     10 * time.Second, // timeout before transitioning to half-open
        ReadyToTrip: func(counts gobreaker.Counts) bool {
            failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
            return counts.Requests >= 10 && failureRatio >= 0.6
        },
    },
})

// Monitor circuit breaker state
stats := client.AllPoolStats()
for _, serverStats := range stats {
    fmt.Printf("Server: %s, Circuit: %s\n",
        serverStats.Addr,
        serverStats.CircuitBreakerState)

    // Access circuit breaker metrics
    counts := serverStats.CircuitBreakerCounts
    fmt.Printf("  Requests: %d, Failures: %d\n",
        counts.Requests,
        counts.TotalFailures)
}
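
The `ReadyToTrip` rule above can be checked in isolation. The sketch below reproduces the same predicate with a local `counts` struct that mirrors only the two `gobreaker.Counts` fields it reads; the struct and the `readyToTrip` name are stand-ins for illustration, not part of either library.

```go
package main

import "fmt"

// counts mirrors the two gobreaker.Counts fields used by the predicate.
type counts struct {
	Requests      uint32
	TotalFailures uint32
}

// readyToTrip applies the same rule as the configuration above: trip only
// after at least 10 requests in the current interval, and only when at
// least 60% of them failed.
func readyToTrip(c counts) bool {
	failureRatio := float64(c.TotalFailures) / float64(c.Requests)
	return c.Requests >= 10 && failureRatio >= 0.6
}

func main() {
	samples := []counts{
		{Requests: 9, TotalFailures: 9},   // too few requests to judge
		{Requests: 10, TotalFailures: 5},  // 50% failures
		{Requests: 10, TotalFailures: 6},  // 60% failures
		{Requests: 20, TotalFailures: 12}, // 60% failures
	}
	for _, c := range samples {
		fmt.Printf("%+v -> trip=%v\n", c, readyToTrip(c))
	}
}
```

The minimum-request guard keeps a handful of early failures from opening the circuit before there is enough traffic to compute a meaningful ratio.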

Connection Pooling

Optional Channel-Based Pool
client, _ := memcache.NewClient(servers, memcache.Config{
    MaxSize: 10,
    NewPool: memcache.NewChannelPool,
})

Pool Statistics

Monitor connection pool health and usage:

stats := client.AllPoolStats()
for _, serverStats := range stats {
    poolStats := serverStats.PoolStats

    fmt.Printf("Server: %s\n", serverStats.Addr)
    fmt.Printf("  Total Connections: %d\n", poolStats.TotalConns)
    fmt.Printf("  Idle Connections: %d\n", poolStats.IdleConns)
    fmt.Printf("  Active Connections: %d\n", poolStats.ActiveConns)
    fmt.Printf("  Connections Created: %d\n", poolStats.CreatedConns)
    fmt.Printf("  Acquire Errors: %d\n", poolStats.AcquireErrors)

    // Circuit breaker state
    fmt.Printf("  Circuit State: %s\n", serverStats.CircuitBreakerState)
}

Reusable Commands

The Commands struct provides a reusable, composable way to execute memcache operations:

// Define a custom executor. NewCommands takes an Executor, so the
// dial-per-request logic lives in a type with an Execute method.
type dialExecutor struct {
    addr string
}

// Execute dials a new connection for every request (no pooling).
func (e dialExecutor) Execute(ctx context.Context, req *meta.Request) (*meta.Response, error) {
    conn, err := net.Dial("tcp", e.addr)
    if err != nil {
        return nil, err
    }
    defer conn.Close()

    connection := memcache.NewConnection(conn, 0)
    return connection.Execute(ctx, req)
}

// Create Commands with the custom executor
commands := memcache.NewCommands(dialExecutor{addr: "localhost:11211"})

// Use commands
item, _ := commands.Get(ctx, "mykey")
_ = commands.Set(ctx, memcache.Item{Key: "key", Value: []byte("value")})

This allows you to build custom clients with different execution strategies while reusing the command logic.

Testing

# Run all tests
go test ./...

# Run with race detector
go test -race ./...

# Run benchmarks
go test -bench=. ./...

# Run with channel pool
go test -bench=BenchmarkPool ./...

Dependencies

Core dependencies:

  • github.com/sony/gobreaker/v2 - Circuit breaker implementation
  • github.com/jackc/puddle/v2 - Default pool implementation

Command-line tools (in cmd/) have their own go.mod files with separate dependencies.

Requirements

  • Go 1.24+
  • Memcached 1.6+ (with meta protocol support)

License

MIT License - See LICENSE file for details.

Status

This project is under active development. The meta protocol implementation and core client features (multi-server, circuit breakers, pooling) are production-ready. The API is stabilizing but breaking changes may occur before v1.0.

Contributions and feedback are welcome!

Documentation

Constants

const NoTTL = 0

NoTTL represents an infinite TTL (no expiration). Use this constant when you want items to persist indefinitely in memcache.

Functions

func DefaultSelectServer

func DefaultSelectServer(key string, servers []string) (string, error)

DefaultSelectServer uses CRC32 hash for consistent server selection. For a single server, it returns that server directly. For multiple servers, it uses CRC32 hash modulo the number of servers. Returns error if no servers are available.

func JumpSelectServer

func JumpSelectServer(key string, servers []string) (string, error)

JumpSelectServer uses Jump Hash for consistent server selection. Jump Hash provides better distribution and fewer key movements when servers are added/removed. For a single server, it returns that server directly. Returns error if no servers are available.

Types

type BatchCommands

type BatchCommands struct {
	// contains filtered or unexported fields
}

BatchCommands provides batch operations using a BatchExecutor. This struct is explicitly designed for batch operations and requires an executor that implements BatchExecutor.

func NewBatchCommands

func NewBatchCommands(executor BatchExecutor) *BatchCommands

NewBatchCommands creates a new BatchCommands instance. The executor must implement BatchExecutor (e.g., ServerPool or Client).

func (*BatchCommands) MultiDelete

func (b *BatchCommands) MultiDelete(ctx context.Context, keys []string) error

MultiDelete removes multiple items in a single batch operation. Returns error on first failure.

func (*BatchCommands) MultiGet

func (b *BatchCommands) MultiGet(ctx context.Context, keys []string) ([]Item, error)

MultiGet retrieves multiple items in a single batch operation. Returns items in the same order as the keys, with Found=false for missing items.

func (*BatchCommands) MultiSet

func (b *BatchCommands) MultiSet(ctx context.Context, items []Item) error

MultiSet stores multiple items in a single batch operation. Returns error on first failure.

type BatchExecutor

type BatchExecutor interface {
	Executor
	ExecuteBatch(ctx context.Context, reqs []*meta.Request) ([]*meta.Response, error)
}

BatchExecutor is an optional interface that Executors can implement to support efficient batch operations using pipelining. If the executor doesn't implement this, Commands will fall back to individual Execute calls.

type Client

type Client struct {
	*Commands // Embedded command operations
	// contains filtered or unexported fields
}

Client is a memcache client that implements the Querier interface using a connection pool.

func NewClient

func NewClient(servers Servers, config Config) (*Client, error)

NewClient creates a new memcache client with the given servers and configuration. For a single server, use: NewClient(NewStaticServers("host:port"), config)

Example

Example demonstrating how to use circuit breakers with the memcache client

servers := memcache.NewStaticServers("localhost:11211", "localhost:11212")

// Create client with circuit breakers for each server
client, err := memcache.NewClient(servers, memcache.Config{
	MaxSize: 10,
	CircuitBreakerSettings: &gobreaker.Settings{
		Name:        "",               // Name will be set to server address
		MaxRequests: 3,                // maxRequests in half-open state
		Interval:    time.Minute,      // interval to reset failure counts
		Timeout:     10 * time.Second, // timeout before transitioning to half-open
		ReadyToTrip: func(counts gobreaker.Counts) bool {
			failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
			return counts.Requests >= 10 && failureRatio >= 0.6
		},
		OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) {
			fmt.Printf("Circuit breaker %s: %s -> %s\n", name, from, to)
		},
	},
})
if err != nil {
	panic(err)
}
defer client.Close()

ctx := context.Background()

// Perform operations - circuit breaker protects against failing servers
_ = client.Set(ctx, memcache.Item{Key: "user:123", Value: []byte("John")})

// Check circuit breaker states
stats := client.AllPoolStats()
for _, serverStats := range stats {
	fmt.Printf("Server: %s\n", serverStats.Addr)
	fmt.Printf("  Circuit Breaker: %s\n", serverStats.CircuitBreakerState)
	fmt.Printf("  Total Connections: %d\n", serverStats.PoolStats.TotalConns)
	fmt.Printf("  Active Connections: %d\n", serverStats.PoolStats.ActiveConns)
}

func (*Client) AllPoolStats

func (c *Client) AllPoolStats() []ServerPoolStats

AllPoolStats returns stats for all server pools.

func (*Client) Close

func (c *Client) Close()

Close closes the client and destroys all connections in all pools.

func (*Client) Execute

func (c *Client) Execute(ctx context.Context, req *meta.Request) (*meta.Response, error)

func (*Client) ExecuteBatch

func (c *Client) ExecuteBatch(ctx context.Context, reqs []*meta.Request) ([]*meta.Response, error)

ExecuteBatch executes multiple requests with automatic server routing. Requests are grouped by server and executed concurrently using pipelined requests. Returns responses in the same order as requests.
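
The group-by-server, run-concurrently, preserve-order behaviour described here can be sketched with the standard library alone. This is an illustration under stated assumptions, not the client's code: `pick`, `executeBatch`, and the `run` callback are hypothetical names, requests are reduced to plain keys, and error handling is omitted.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sync"
)

// pick is a stand-in server selector (CRC32 modulo the server count).
func pick(key string, servers []string) string {
	return servers[crc32.ChecksumIEEE([]byte(key))%uint32(len(servers))]
}

// executeBatch groups keys by server, runs one batch per server
// concurrently, and writes each response back at the index of the request
// that produced it, so the output order matches the input order.
func executeBatch(keys, servers []string, run func(server string, keys []string) []string) []string {
	groups := make(map[string][]int) // server -> indices into keys
	for i, k := range keys {
		s := pick(k, servers)
		groups[s] = append(groups[s], i)
	}

	results := make([]string, len(keys))
	var wg sync.WaitGroup
	for server, idxs := range groups {
		wg.Add(1)
		go func(server string, idxs []int) {
			defer wg.Done()
			batch := make([]string, len(idxs))
			for n, i := range idxs {
				batch[n] = keys[i]
			}
			// Each goroutine writes to a disjoint set of indices,
			// so no lock is needed around results.
			for n, resp := range run(server, batch) {
				results[idxs[n]] = resp
			}
		}(server, idxs)
	}
	wg.Wait()
	return results
}

func main() {
	servers := []string{"cache1:11211", "cache2:11211"}
	fake := func(server string, keys []string) []string {
		out := make([]string, len(keys))
		for i, k := range keys {
			out[i] = k + "@" + server
		}
		return out
	}
	for _, r := range executeBatch([]string{"a", "b", "c", "d"}, servers, fake) {
		fmt.Println(r)
	}
}
```

Remembering the original index of every request is what allows the per-server batches to finish in any order while the caller still sees responses aligned with its request slice.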

func (*Client) Stats

func (c *Client) Stats(ctx context.Context, args ...string) ([]ServerStats, error)

Stats retrieves statistics from all memcache servers. Sends a stats request to each server and collects the responses. Returns a slice of ServerStats, one per server. Individual server errors are returned in ServerStats.Error, not as a Go error.

type Commands

type Commands struct {
	// contains filtered or unexported fields
}

Commands provides memcache command operations. This struct can be used independently with a custom Executor, or embedded in Client for full resilience features.

func NewCommands

func NewCommands(executor Executor) *Commands

NewCommands creates a new Commands instance with the given Executor.

func (*Commands) Add

func (c *Commands) Add(ctx context.Context, item Item) error

Add stores an item in memcache only if the key doesn't already exist.

func (*Commands) Delete

func (c *Commands) Delete(ctx context.Context, key string) error

Delete removes an item from memcache.

func (*Commands) Get

func (c *Commands) Get(ctx context.Context, key string) (Item, error)

Get retrieves a single item from memcache.

func (*Commands) Increment

func (c *Commands) Increment(ctx context.Context, key string, delta int64, ttl time.Duration) (int64, error)

Increment increments a counter key by the given delta. Creates the key with the delta value if it doesn't exist. This uses auto-vivify (N flag) with initial value (J flag) set to the delta, so the returned value is correct even on first call. TTL of 0 means infinite TTL.
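
The create-on-first-call semantics can be modelled with an in-memory map. The sketch below is only a behavioural model of what the description says (a missing key starts at the delta); `counterStore` and `increment` are hypothetical names and no memcache server or protocol flag is involved.

```go
package main

import "fmt"

// counterStore is an in-memory stand-in for counters held in memcache.
type counterStore map[string]int64

// increment adds delta to the counter. A missing key is created with the
// delta as its initial value, mirroring the auto-vivify behaviour
// described above, so the first call already returns delta.
func (s counterStore) increment(key string, delta int64) int64 {
	v, ok := s[key]
	if !ok {
		s[key] = delta
		return delta
	}
	s[key] = v + delta
	return s[key]
}

func main() {
	store := counterStore{}
	fmt.Println(store.increment("counter", 5)) // key is created: 5
	fmt.Println(store.increment("counter", 5)) // key exists: 10
}
```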

func (*Commands) Set

func (c *Commands) Set(ctx context.Context, item Item) error

Set stores an item in memcache.

type Config

type Config struct {
	// MaxSize is the maximum number of connections in the pool.
	// Required: must be > 0.
	MaxSize int32

	// MaxConnLifetime is the maximum duration a connection can be reused.
	// Zero means no limit.
	MaxConnLifetime time.Duration

	// MaxConnIdleTime is the maximum duration a connection can be idle before being closed.
	// Zero means no limit.
	MaxConnIdleTime time.Duration

	// HealthCheckInterval is how often to check idle connections for health.
	// Zero disables health checks.
	HealthCheckInterval time.Duration

	// Timeout is the default timeout for memcache operations (read/write).
	// This is used when the context passed to Execute/ExecuteBatch has no deadline.
	// Zero means no timeout (not recommended for production).
	// Recommended: 100ms-1s depending on your latency requirements.
	Timeout time.Duration

	// ConnectTimeout is the timeout for establishing new connections.
	// This includes TCP handshake and TLS handshake if applicable.
	// If zero, uses Timeout value.
	// Set this higher than Timeout if TLS connections take longer to establish.
	ConnectTimeout time.Duration

	// Dialer is the net.Dialer used to create new connections.
	// If nil, the default net.Dialer is used.
	Dialer Dialer

	// NewPool is the connection pool factory function.
	// If nil, uses the default puddle-based pool.
	// To use channel pool: NewPool: memcache.NewChannelPool
	NewPool func(constructor func(ctx context.Context) (*Connection, error), maxSize int32) (Pool, error)

	// SelectServer picks which server to use for a key.
	// Receives the key and current server list from Servers.List().
	// If nil, uses JumpSelectServer (Jump Hash-based).
	// Alternative: DefaultSelectServer (CRC32-based, ~20ns faster).
	SelectServer SelectServerFunc

	// CircuitBreakerSettings configures the circuit breaker for each server pool.
	// If nil, no circuit breaker is used.
	// The Name field in the settings will be overridden with the server address.
	CircuitBreakerSettings *gobreaker.Settings
}

Config holds configuration for the memcache client connection pool.

type Connection

type Connection struct {
	Reader *bufio.Reader
	Writer *bufio.Writer
	// contains filtered or unexported fields
}

Connection wraps a network connection with buffered reader and writer for efficient I/O.

func NewConnection

func NewConnection(conn net.Conn, timeout time.Duration) *Connection

NewConnection creates a connection with an optional default timeout. The timeout is used when the context passed to Execute has no deadline. Zero timeout means no timeout.

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) Execute

func (c *Connection) Execute(ctx context.Context, req *meta.Request) (*meta.Response, error)

Execute implements the Executor interface. Executes a single request and returns the response. Uses context deadline if present, otherwise uses the connection's default timeout.

func (*Connection) ExecuteBatch

func (c *Connection) ExecuteBatch(ctx context.Context, reqs []*meta.Request) ([]*meta.Response, error)

ExecuteBatch implements the BatchExecutor interface. Executes multiple requests in a pipeline using the NoOp marker strategy. Sends all requests followed by a NoOp command, then reads responses until the NoOp response.

Returns responses in the same order as requests. Individual request errors are captured in Response.Error (protocol errors). I/O errors or connection failures are returned as Go errors.

Deadline handling: The deadline is extended before reading each response to prevent timeout due to cumulative time across multiple responses (inspired by Grafana PR #16).

func (*Connection) ExecuteStats

func (c *Connection) ExecuteStats(ctx context.Context, args ...string) (map[string]string, error)

ExecuteStats implements the StatsExecutor interface. Executes the stats command and returns the stats as a map.

func (*Connection) Ping

func (c *Connection) Ping() error

Ping performs a simple health check on a connection using the noop command.

type Dialer

type Dialer interface {
	DialContext(ctx context.Context, network, address string) (net.Conn, error)
}

type Executor

type Executor interface {
	Execute(ctx context.Context, req *meta.Request) (*meta.Response, error)
}

Executor executes a single memcache request and returns its response. The request is the only input besides the context, so an implementation that routes by key has to take the key from the request itself.

type Item

type Item struct {
	Key   string
	Value []byte
	TTL   time.Duration
	Found bool // indicates whether the key was found in cache
}

type Pool

type Pool interface {
	// Acquire gets a connection from the pool, creating one if necessary.
	// Blocks until a connection is available or context is canceled.
	Acquire(ctx context.Context) (Resource, error)

	// AcquireAllIdle acquires all idle connections from the pool.
	// Used for health checks and maintenance.
	AcquireAllIdle() []Resource

	// Close closes the pool and all connections.
	Close()

	// Stats returns a snapshot of pool statistics.
	Stats() PoolStats
}

Pool manages a pool of connections.

func NewChannelPool

func NewChannelPool(constructor func(ctx context.Context) (*Connection, error), maxSize int32) (Pool, error)

NewChannelPool creates a new channel-based connection pool. This is an alternative pool implementation, optimized for performance.

func NewPuddlePool

func NewPuddlePool(constructor func(ctx context.Context) (*Connection, error), maxSize int32) (Pool, error)

NewPuddlePool creates a new puddle-based connection pool. This is the default pool implementation.

type PoolStats

type PoolStats struct {
	// Lifetime counters (uint64 - 8 bytes each)
	AcquireCount      uint64 // Total acquire attempts
	AcquireWaitCount  uint64 // Acquires that had to wait
	CreatedConns      uint64 // Total connections created
	DestroyedConns    uint64 // Total connections destroyed
	AcquireErrors     uint64 // Failed acquire attempts
	AcquireWaitTimeNs uint64 // Total nanoseconds spent waiting

	// Current state gauges (int32 - 4 bytes each)
	TotalConns  int32 // Total connections in pool (active + idle)
	IdleConns   int32 // Idle connections available
	ActiveConns int32 // Connections currently in use
	// contains filtered or unexported fields
}

PoolStats contains statistics about a connection pool. All fields are safe for concurrent access.

Struct is optimized to fit within a single cache line (64 bytes). Fields are ordered largest to smallest for optimal memory layout.

For Prometheus integration, expose these as:

  • Gauges: TotalConns, IdleConns, ActiveConns
  • Counters: AcquireCount, AcquireWaitCount, CreatedConns, DestroyedConns, AcquireErrors
  • Histogram: AcquireWaitDuration (use AcquireWaitCount and AcquireWaitTimeNs to calculate)

type Querier

type Querier interface {
	Get(ctx context.Context, key string) (Item, error)
	Set(ctx context.Context, item Item) error
	Add(ctx context.Context, item Item) error
	Delete(ctx context.Context, key string) error
	Increment(ctx context.Context, key string, delta int64, ttl time.Duration) (int64, error)
}

type Resource

type Resource interface {
	// Value returns the underlying connection.
	Value() *Connection

	// Release returns the connection to the pool for reuse.
	Release()

	// ReleaseUnused returns the connection to the pool without marking it as used.
	// Used for health checks that don't actually use the connection.
	ReleaseUnused()

	// Destroy closes the connection and removes it from the pool.
	Destroy()

	// CreationTime returns when the connection was created.
	CreationTime() time.Time

	// IdleDuration returns how long the connection has been idle.
	IdleDuration() time.Duration
}

Resource represents a connection resource from the pool.

type SelectServerFunc

type SelectServerFunc func(key string, servers []string) (string, error)

SelectServerFunc picks which server to use for a given key. It receives the key and the current list of server addresses. Returns empty string and error if no server can be selected.

type ServerPool

type ServerPool struct {
	// contains filtered or unexported fields
}

ServerPool wraps a connection pool and a circuit breaker together with the server address they belong to.

func NewServerPool

func NewServerPool(addr string, config Config) (*ServerPool, error)

func (*ServerPool) Address

func (sp *ServerPool) Address() string

func (*ServerPool) Execute

func (sp *ServerPool) Execute(ctx context.Context, req *meta.Request) (*meta.Response, error)

Execute executes a single request-response cycle with proper connection management. It handles acquiring a connection, sending the request, reading the response, and releasing/destroying the connection based on error conditions. The request is wrapped with the server's circuit breaker.

func (*ServerPool) ExecuteBatch

func (sp *ServerPool) ExecuteBatch(ctx context.Context, reqs []*meta.Request) ([]*meta.Response, error)

ExecuteBatch executes multiple requests in a pipeline using the NoOp marker strategy. Sends all requests followed by a NoOp command, then reads responses until the NoOp response. This leverages memcached's FIFO guarantee for optimal performance.

Returns responses in the same order as requests. Individual request errors are captured in Response.Error (protocol errors). I/O errors or connection failures are returned as Go errors.

The batch execution is wrapped with the circuit breaker to track success/failure.

func (*ServerPool) Stats

func (sp *ServerPool) Stats() ServerPoolStats

type ServerPoolStats

type ServerPoolStats struct {
	Addr                 string
	PoolStats            PoolStats
	CircuitBreakerState  gobreaker.State
	CircuitBreakerCounts gobreaker.Counts
}

ServerPoolStats contains stats for a single server pool.

type ServerStats

type ServerStats struct {
	Addr  string            // Server address
	Stats map[string]string // Server statistics (name -> value)
	Error error             // Error if stats request failed
}

ServerStats contains statistics from a single memcache server.

type Servers

type Servers interface {
	// List returns the current list of server addresses.
	// The returned slice must not be modified by the caller.
	List() []string
}

Servers provides the list of memcache server addresses. Implementations must be safe for concurrent use.
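
Since the interface only requires a concurrency-safe `List`, a server list that changes at runtime is straightforward to model. The sketch below is a hypothetical `dynamicServers` type, not something the package ships; it swaps in a fresh slice on every update so that slices already handed out by `List` are never mutated.

```go
package main

import (
	"fmt"
	"sync"
)

// dynamicServers holds a server list that can be replaced at runtime.
// It has the List method required by the Servers interface above.
type dynamicServers struct {
	mu    sync.RWMutex
	addrs []string
}

// List returns the current server addresses. Callers must not modify
// the returned slice.
func (d *dynamicServers) List() []string {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.addrs
}

// Update replaces the server list with a private copy of addrs, so slices
// previously returned by List stay unchanged.
func (d *dynamicServers) Update(addrs ...string) {
	cp := append([]string(nil), addrs...)
	d.mu.Lock()
	d.addrs = cp
	d.mu.Unlock()
}

func main() {
	servers := &dynamicServers{}
	servers.Update("cache1.example.com:11211", "cache2.example.com:11211")
	fmt.Println(servers.List())

	servers.Update("cache3.example.com:11211")
	fmt.Println(servers.List())
}
```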

type StaticServers

type StaticServers struct {
	// contains filtered or unexported fields
}

StaticServers is a simple implementation that returns a fixed list of server addresses.

func NewStaticServers

func NewStaticServers(addrs ...string) *StaticServers

NewStaticServers creates a new StaticServers with the given addresses.

func (*StaticServers) List

func (s *StaticServers) List() []string

List returns the list of server addresses.

type StatsExecutor

type StatsExecutor interface {
	ExecuteStats(ctx context.Context, args ...string) (map[string]string, error)
}

StatsExecutor is an optional interface for executing the stats command. The stats command has a different response format than regular meta commands.

Directories

Path Synopsis
internal
Package meta provides a low-level wire protocol implementation for the Memcached Meta Protocol (version 1.6+).
references
implementations/golang/memcache
Package memcache provides a client for the memcached cache server.
spec
experiments command
