cachegrid

package module
v0.0.0-...-7c62740 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2026 License: MIT Imports: 21 Imported by: 0

README

CacheGrid

A high-performance distributed cache for Go. Embed it as a library or run it as a standalone server.

Go Reference Go Report Card

Features

  • Sub-microsecond reads — 76ns Get, 441ns Set (benchmarked)
  • Distributed — Gossip-based cluster with consistent hashing
  • Three cache modes — Partitioned, Replicated, NearCache
  • Distributed locks — With fencing tokens and auto-release
  • Rate limiting — Token bucket and sliding window algorithms
  • Pub/Sub — Subscribe to cache events with glob patterns
  • Tag-based invalidation — Group keys by tags, invalidate in bulk
  • Namespaces — Multi-tenant key isolation
  • HTTP middleware — Rate limiting and response caching for net/http
  • Standalone server — REST API with Docker and Kubernetes support
  • Prometheus metrics/metrics endpoint out of the box
  • Zero external dependencies at runtime — No Redis, no Memcached

Install

go get github.com/skshohagmiah/cachegrid

Quick Start

Embedded (Go Library)
package main

import (
    "fmt"
    "time"

    "github.com/skshohagmiah/cachegrid"
)

func main() {
    cache, _ := cachegrid.New(cachegrid.Config{
        MaxMemoryMB: 256,
        DefaultTTL:  5 * time.Minute,
    })
    defer cache.Shutdown()

    // Set and Get
    cache.Set("user:123", map[string]string{"name": "Alice"}, 5*time.Minute)

    var user map[string]string
    if cache.Get("user:123", &user) {
        fmt.Println(user["name"]) // Alice
    }
}
Distributed Cluster
cache, _ := cachegrid.New(cachegrid.Config{
    ListenAddr:  ":7946",
    Peers:       []string{"10.0.0.2:7946", "10.0.0.3:7946"},
    Mode:        cachegrid.Partitioned,
    MaxMemoryMB: 256,
})
defer cache.Shutdown()

Nodes discover each other via gossip and automatically partition keys across the cluster.

Standalone Server
docker run -d -p 6380:6380 -p 7946:7946 cachegrid

curl -X PUT localhost:6380/cache/user:123 \
  -H "X-TTL: 5m" -d '{"name": "Alice"}'

curl localhost:6380/cache/user:123

API Reference

Cache Operations
// Basic CRUD
cache.Set("key", value, 5*time.Minute)
cache.Get("key", &dest)
cache.Delete("key")
cache.Exists("key")
cache.TTL("key")

// Cache-aside pattern
cache.GetOrSet("key", &dest, ttl, func() (interface{}, error) {
    return fetchFromDB()
})

// Bulk operations
cache.MSet(map[string]cachegrid.Item{
    "a": {Value: 1, TTL: time.Minute},
    "b": {Value: 2, TTL: time.Minute},
})
results := cache.MGet("a", "b")

// Atomic counters
cache.Incr("views", 1)
cache.Decr("stock", 1)
Tags and Namespaces
// Tag-based invalidation
cache.SetWithTags("user:1:profile", profile, ttl, []string{"user:1"})
cache.SetWithTags("user:1:settings", settings, ttl, []string{"user:1"})
cache.InvalidateTag("user:1") // deletes both keys

// Namespaces
tenant := cache.WithNamespace("acme")
tenant.Set("config", val, time.Hour)  // stored as "acme:config"
tenant.Get("config", &dest)
Distributed Locks
lock, err := cache.Lock("order:456", cachegrid.LockOptions{
    TTL:        30 * time.Second,
    RetryCount: 10,
    RetryDelay: 100 * time.Millisecond,
})
if err != nil {
    return err
}
defer lock.Release()

// Extend for long operations
lock.Extend(60 * time.Second)

// Fencing token for safe writes
token := lock.Token()

// Non-blocking attempt
lock, ok := cache.TryLock("resource", 30*time.Second)
Rate Limiting
// Token bucket
allowed, state := cache.RateLimit("api:user:1", cachegrid.RateLimitOptions{
    Limit:  100,
    Window: time.Minute,
})
// state.Remaining, state.Limit, state.ResetsAt

// Sliding window
allowed, state := cache.RateLimitSliding("api:user:1", cachegrid.SlidingWindowOptions{
    Limit:  100,
    Window: time.Minute,
})
Pub/Sub
sub := cache.Subscribe("user:*")
defer sub.Close()

go func() {
    for event := range sub.Events() {
        fmt.Printf("%s %s\n", event.Type, event.Key)
    }
}()

// Event hooks
cache.OnHit(func(key string) { /* metrics */ })
cache.OnMiss(func(key string) { /* metrics */ })
cache.OnEvict(func(key string, value interface{}) { /* cleanup */ })
HTTP Middleware
mux := http.NewServeMux()

// Rate limit by client IP (100 req/min)
handler := cachegrid.HTTPRateLimit(cache, 100, time.Minute)(mux)

// Cache GET responses
handler = cachegrid.HTTPCache(cache, 30*time.Second)(mux)

REST API (Standalone Mode)

Method Endpoint Description
GET /cache/{key} Get value
PUT /cache/{key} Set value (X-TTL, X-Tags headers)
DELETE /cache/{key} Delete key
HEAD /cache/{key} Check existence
POST /cache/_mget Bulk get ({"keys": [...]})
POST /locks/{key} Acquire lock ({"ttl": "30s"})
DELETE /locks/{key} Release lock (X-Lock-Token header)
PUT /locks/{key} Extend lock
POST /ratelimit/{key} Check rate limit
GET /subscribe/{pattern} SSE event stream
GET /health Health check
GET /metrics Prometheus metrics
GET /cluster/nodes Cluster state

Configuration

Embedded
cachegrid.Config{
    NumShards:       256,           // must be power of 2
    MaxMemoryMB:     256,           // 0 = unlimited
    DefaultTTL:      5*time.Minute, // 0 = no expiry
    SweeperInterval: time.Second,   // expired key cleanup interval

    // Cluster (optional — omit for local-only mode)
    ListenAddr:   ":7946",
    Peers:        []string{"10.0.0.2:7946"},
    Mode:         cachegrid.Partitioned, // or Replicated, NearCache
    NodeName:     "node-1",              // defaults to hostname
    GRPCPort:     7947,                  // inter-node RPC
    HTTPPort:     6380,                  // standalone HTTP API
    VirtualNodes: 150,                   // hash ring granularity
}
Standalone (Environment Variables)
Variable Default Description
CACHEGRID_SHARDS 256 Number of shards
CACHEGRID_MAX_MEMORY_MB 0 Memory limit
CACHEGRID_DEFAULT_TTL 0 Default TTL (e.g., 5m)
CACHEGRID_NODE_NAME hostname Node identifier
CACHEGRID_LISTEN_ADDR `` Gossip bind address
CACHEGRID_SEEDS `` Comma-separated seed nodes
CACHEGRID_MODE partitioned partitioned, replicated, nearcache
CACHEGRID_RPC_PORT 7947 Inter-node RPC port
CACHEGRID_HTTP_PORT 6380 HTTP API port

Deployment

Docker Compose
docker compose up -d   # starts 3-node cluster
curl localhost:6380/health
curl localhost:6381/health
curl localhost:6382/health
Kubernetes
kubectl apply -f kubernetes.yaml

Creates a 3-replica StatefulSet with headless service for gossip discovery.

Cache Modes

Mode Read Write Use Case
Partitioned From owner To owner Large datasets, even distribution
Replicated Local (fast) Fan-out to all Read-heavy, small datasets
NearCache Local first, then owner Local + owner Hot keys, read-heavy

Benchmarks

BenchmarkGetHit-10          15,733,113    75.85 ns/op     69 B/op    3 allocs/op
BenchmarkGetMiss-10         51,526,194    23.16 ns/op     16 B/op    1 allocs/op
BenchmarkSet-10              2,504,900   441.30 ns/op    353 B/op    6 allocs/op
BenchmarkExists-10          17,432,041    68.02 ns/op     15 B/op    1 allocs/op
BenchmarkIncr-10             7,588,982   156.50 ns/op    304 B/op    7 allocs/op
BenchmarkConcurrentRead-10  10,314,302   122.10 ns/op     87 B/op    4 allocs/op
BenchmarkConcurrentMixed-10  8,313,844   147.90 ns/op    138 B/op    5 allocs/op

Run locally: go test -bench=. -benchmem

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      Public API (cachegrid)                 │
│  Set · Get · Delete · Lock · RateLimit · Subscribe · ...    │
├──────────────┬──────────────┬───────────────┬───────────────┤
│  internal/   │  internal/   │  internal/    │  internal/    │
│  cache       │  cluster     │  transport    │  lock         │
│  (shards,    │  (hashring,  │  (TCP+msgpack │  (fencing,    │
│   LRU,       │   gossip,    │   RPC)        │   auto-       │
│   eviction)  │   state)     │               │   release)    │
├──────────────┼──────────────┼───────────────┼───────────────┤
│  internal/   │  internal/   │  internal/    │               │
│  ratelimit   │  pubsub      │  server       │               │
│  (token      │  (broker,    │  (HTTP REST,  │               │
│   bucket,    │   events,    │   SSE,        │               │
│   sliding    │   hooks)     │   metrics)    │               │
│   window)    │              │               │               │
└──────────────┴──────────────┴───────────────┴───────────────┘

Project Structure

cachegrid/
├── cache.go, config.go, errors.go      # Core cache + configuration
├── distributed.go                       # Cluster routing logic
├── lock.go, ratelimit.go               # Distributed locks + rate limiting
├── pubsub.go, tags.go, namespace.go    # Events, tags, namespaces
├── middleware.go                        # HTTP middleware helpers
├── *_test.go                           # 121 tests across all features
├── internal/
│   ├── cache/       # Sharded store, LRU eviction, serialization
│   ├── cluster/     # Hash ring, gossip membership, cluster state
│   ├── transport/   # TCP+msgpack inter-node RPC
│   ├── lock/        # Lock engine with fencing tokens
│   ├── ratelimit/   # Token bucket + sliding window
│   ├── pubsub/      # Event broker + subscriptions
│   └── server/      # HTTP server, handlers, middleware
├── cmd/cachegrid/   # Standalone server binary
├── Dockerfile
├── docker-compose.yml
└── kubernetes.yaml

Contributing

See CONTRIBUTING.md for guidelines.

License

MIT License. See LICENSE for details.

Documentation

Index

Constants

View Source
const (
	EventSet    = pubsub.EventSet
	EventDelete = pubsub.EventDelete
	EventExpire = pubsub.EventExpire
	EventEvict  = pubsub.EventEvict
)

Event type constants.

Variables

View Source
var (
	ErrNotFound            = errors.New("cachegrid: key not found")
	ErrExpired             = errors.New("cachegrid: key expired")
	ErrKeyEmpty            = errors.New("cachegrid: key must not be empty")
	ErrSerializationFailed = errors.New("cachegrid: serialization failed")
	ErrShutdown            = errors.New("cachegrid: cache is shut down")
	ErrNodeNotFound        = errors.New("cachegrid: node not found in cluster")
	ErrRemoteCall          = errors.New("cachegrid: remote call failed")
	ErrLockNotAcquired     = errors.New("cachegrid: lock could not be acquired")
	ErrLockNotHeld         = errors.New("cachegrid: lock not held or expired")
	ErrRateLimited         = errors.New("cachegrid: rate limit exceeded")
)

Functions

func HTTPCache

func HTTPCache(c *Cache, ttl time.Duration) func(http.Handler) http.Handler

HTTPCache returns an HTTP middleware that caches responses.

func HTTPRateLimit

func HTTPRateLimit(c *Cache, limit int64, window time.Duration) func(http.Handler) http.Handler

HTTPRateLimit returns an HTTP middleware that rate limits by client IP.

Types

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache is a sharded in-memory cache with LRU eviction and TTL support. It optionally forms a distributed cluster via gossip and RPC.

func New

func New(config Config) (*Cache, error)

New creates a new Cache with the given configuration.

func (*Cache) Broker

func (c *Cache) Broker() *pubsub.Broker

Broker returns the pub/sub broker.

func (*Cache) ClusterState

func (c *Cache) ClusterState() *cluster.ClusterState

ClusterState returns the cluster state, or nil if running in local mode.

func (*Cache) Decr

func (c *Cache) Decr(key string, delta int64) (int64, error)

Decr atomically decrements a counter.

func (*Cache) Delete

func (c *Cache) Delete(key string)

Delete removes a key from the cache.

func (*Cache) Exists

func (c *Cache) Exists(key string) bool

Exists checks if a key exists and is not expired.

func (*Cache) Get

func (c *Cache) Get(key string, dest interface{}) bool

Get retrieves a value and deserializes it into dest.

func (*Cache) GetOrSet

func (c *Cache) GetOrSet(key string, dest interface{}, ttl time.Duration, fn func() (interface{}, error)) error

GetOrSet retrieves a value or computes and stores it on miss.

func (*Cache) HandleDelete

func (c *Cache) HandleDelete(key string) bool

func (*Cache) HandleExists

func (c *Cache) HandleExists(key string) bool

func (*Cache) HandleGet

func (c *Cache) HandleGet(key string) ([]byte, bool)

func (*Cache) HandleIncr

func (c *Cache) HandleIncr(key string, delta int64) (int64, error)

func (*Cache) HandleLockAcquire

func (c *Cache) HandleLockAcquire(key string, ttl time.Duration) (string, uint64, bool)

func (*Cache) HandleLockExtend

func (c *Cache) HandleLockExtend(key string, token string, ttl time.Duration) bool

func (*Cache) HandleLockRelease

func (c *Cache) HandleLockRelease(key string, token string) bool

func (*Cache) HandleMGet

func (c *Cache) HandleMGet(keys []string) map[string][]byte

func (*Cache) HandleSet

func (c *Cache) HandleSet(key string, value []byte, ttl time.Duration, tags []string)

func (*Cache) Incr

func (c *Cache) Incr(key string, delta int64) (int64, error)

Incr atomically increments a counter.

func (*Cache) InvalidateTag

func (c *Cache) InvalidateTag(tag string)

InvalidateTag deletes all keys associated with the given tag.

func (*Cache) Len

func (c *Cache) Len() int

Len returns the total number of items across all shards.

func (*Cache) Lock

func (c *Cache) Lock(key string, opts LockOptions) (*LockHandle, error)

Lock acquires a distributed lock with retry logic.

func (*Cache) LockEngine

func (c *Cache) LockEngine() *lock.Engine

LockEngine returns the lock engine.

func (*Cache) MGet

func (c *Cache) MGet(keys ...string) map[string][]byte

MGet retrieves multiple keys.

func (*Cache) MSet

func (c *Cache) MSet(items map[string]Item) error

MSet stores multiple key-value pairs.

func (*Cache) OnDelete

func (c *Cache) OnDelete(fn func(key string))

OnDelete registers a callback for delete operations.

func (*Cache) OnEvict

func (c *Cache) OnEvict(fn func(key string, value interface{}))

OnEvict registers a callback for evictions.

func (*Cache) OnHit

func (c *Cache) OnHit(fn func(key string))

OnHit registers a callback for cache hits.

func (*Cache) OnMiss

func (c *Cache) OnMiss(fn func(key string))

OnMiss registers a callback for cache misses.

func (*Cache) OnNodeJoin

func (c *Cache) OnNodeJoin(node *cluster.NodeInfo)

func (*Cache) OnNodeLeave

func (c *Cache) OnNodeLeave(node *cluster.NodeInfo)

func (*Cache) OnNodeUpdate

func (c *Cache) OnNodeUpdate(node *cluster.NodeInfo)

func (*Cache) OnSet

func (c *Cache) OnSet(fn func(key string, value interface{}))

OnSet registers a callback for set operations.

func (*Cache) RateLimit

func (c *Cache) RateLimit(key string, opts RateLimitOptions) (bool, RateLimitState)

RateLimit checks a token bucket rate limiter for the given key.

func (*Cache) RateLimitSliding

func (c *Cache) RateLimitSliding(key string, opts SlidingWindowOptions) (bool, RateLimitState)

RateLimitSliding checks a sliding window rate limiter for the given key.

func (*Cache) Ring

func (c *Cache) Ring() *cluster.Ring

Ring returns the hash ring, or nil if running in local mode.

func (*Cache) Set

func (c *Cache) Set(key string, value interface{}, ttl time.Duration) error

Set stores a value with the given TTL.

func (*Cache) SetWithTags

func (c *Cache) SetWithTags(key string, value interface{}, ttl time.Duration, tags []string) error

SetWithTags stores a value with associated tags.

func (*Cache) Shutdown

func (c *Cache) Shutdown() error

Shutdown gracefully stops all subsystems.

func (*Cache) Subscribe

func (c *Cache) Subscribe(pattern string) *Subscription

Subscribe returns a subscription for events matching the glob pattern.

func (*Cache) TTL

func (c *Cache) TTL(key string) time.Duration

TTL returns the remaining time-to-live for a key.

func (*Cache) TryLock

func (c *Cache) TryLock(key string, ttl time.Duration) (*LockHandle, bool)

TryLock attempts a single non-blocking lock acquisition.

func (*Cache) WithNamespace

func (c *Cache) WithNamespace(namespace string) *NamespacedCache

WithNamespace returns a namespaced view of the cache. All keys will be prefixed with "namespace:".

type CacheEvent

type CacheEvent = pubsub.Event

CacheEvent is a public cache event.

type Config

type Config struct {
	// NumShards is the number of internal shards. Must be a power of 2.
	// Default: 256.
	NumShards int

	// MaxMemoryMB is the maximum memory in megabytes across all shards.
	// 0 means unlimited.
	MaxMemoryMB int64

	// DefaultTTL is the default expiration for entries when no TTL is specified.
	// 0 means no expiry by default.
	DefaultTTL time.Duration

	// SweeperInterval controls how often the background goroutine
	// checks for expired entries. Default: 1 second.
	SweeperInterval time.Duration

	// ListenAddr is the gossip protocol listen address (e.g., ":7946").
	// If empty, the cache runs in local-only mode.
	ListenAddr string

	// Peers is the list of seed nodes for cluster discovery.
	Peers []string

	// Mode is the cache distribution strategy.
	Mode Mode

	// NodeName is a unique identifier for this node. If empty, hostname is used.
	NodeName string

	// GRPCPort is the port for inter-node gRPC/RPC communication. Default: 7947.
	GRPCPort int

	// HTTPPort is the HTTP server port for standalone mode. Default: 6380.
	HTTPPort int

	// VirtualNodes is the number of virtual nodes per physical node on the hash ring.
	// Default: 150.
	VirtualNodes int
}

Config holds configuration for a Cache instance.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults.

type EventType

type EventType = pubsub.EventType

EventType re-exports the internal event type.

type Item

type Item struct {
	Value interface{}
	TTL   time.Duration
}

Item represents a cache entry for bulk operations.

type LockHandle

type LockHandle struct {
	// contains filtered or unexported fields
}

LockHandle represents a held distributed lock.

func (*LockHandle) Extend

func (h *LockHandle) Extend(ttl time.Duration) error

Extend extends the lock TTL.

func (*LockHandle) Release

func (h *LockHandle) Release() error

Release releases the lock.

func (*LockHandle) Token

func (h *LockHandle) Token() uint64

Token returns the fencing token for this lock.

type LockOptions

type LockOptions struct {
	TTL        time.Duration // lock auto-release time (required)
	RetryCount int           // number of retries (0 = no retry)
	RetryDelay time.Duration // delay between retries
}

LockOptions configures lock acquisition behavior.

type Mode

type Mode int

Mode defines the cache distribution strategy.

const (
	Partitioned Mode = iota
	Replicated
	NearCache
)

type NamespacedCache

type NamespacedCache struct {
	// contains filtered or unexported fields
}

NamespacedCache wraps a Cache and prefixes all keys with a namespace.

func (*NamespacedCache) Decr

func (nc *NamespacedCache) Decr(key string, delta int64) (int64, error)

Decr decrements a counter in the namespaced cache.

func (*NamespacedCache) Delete

func (nc *NamespacedCache) Delete(key string)

Delete removes a key from the namespaced cache.

func (*NamespacedCache) Exists

func (nc *NamespacedCache) Exists(key string) bool

Exists checks if a key exists in the namespaced cache.

func (*NamespacedCache) Get

func (nc *NamespacedCache) Get(key string, dest interface{}) bool

Get retrieves a value from the namespaced cache.

func (*NamespacedCache) GetOrSet

func (nc *NamespacedCache) GetOrSet(key string, dest interface{}, ttl time.Duration, fn func() (interface{}, error)) error

GetOrSet retrieves or computes a value in the namespaced cache.

func (*NamespacedCache) Incr

func (nc *NamespacedCache) Incr(key string, delta int64) (int64, error)

Incr increments a counter in the namespaced cache.

func (*NamespacedCache) InvalidateTag

func (nc *NamespacedCache) InvalidateTag(tag string)

InvalidateTag invalidates all keys with the given tag in the namespace.

func (*NamespacedCache) Set

func (nc *NamespacedCache) Set(key string, value interface{}, ttl time.Duration) error

Set stores a value in the namespaced cache.

func (*NamespacedCache) SetWithTags

func (nc *NamespacedCache) SetWithTags(key string, value interface{}, ttl time.Duration, tags []string) error

SetWithTags stores a value with tags in the namespaced cache.

func (*NamespacedCache) TTL

func (nc *NamespacedCache) TTL(key string) time.Duration

TTL returns the TTL for a key in the namespaced cache.

type RateLimitOptions

type RateLimitOptions struct {
	Limit  int64
	Window time.Duration
}

RateLimitOptions configures token bucket rate limiting.

type RateLimitState

type RateLimitState struct {
	Allowed   bool
	Remaining int64
	Limit     int64
	ResetsAt  time.Time
}

RateLimitState is the public rate limit response.

type SlidingWindowOptions

type SlidingWindowOptions struct {
	Limit  int64
	Window time.Duration
}

SlidingWindowOptions configures sliding window rate limiting.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription wraps an internal subscription.

func (*Subscription) Close

func (s *Subscription) Close()

Close unsubscribes and closes the event channel.

func (*Subscription) Events

func (s *Subscription) Events() <-chan CacheEvent

Events returns the channel to receive events on.

Directories

Path Synopsis
cmd
cachegrid command
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL