Documentation
¶
Overview ¶
Package state provides shared state management for distributed agent coordination.
The StateStore interface enables key-value storage with TTL, watch notifications, and distributed locking across various backends (NATS JetStream KV, in-memory).
Key Features ¶
- Key-value operations: Get, Put, Delete with optional TTL
- Watch: Subscribe to changes on key patterns
- Distributed locks: Acquire/release with automatic expiry
- Multiple backends: NATS JetStream KV (production), in-memory (testing)
Usage ¶
// Production: NATS JetStream KV
bus, _ := bus.NewNATSBus(bus.NATSConfig{URL: "nats://localhost:4222"})
store, _ := state.NewNATSStore(state.NATSStoreConfig{
Conn: bus.Conn(),
Bucket: "agent-state",
})
// Testing: In-memory (alternative to the NATS store above)
store := state.NewMemoryStore()
// Key-value operations
store.Put("config.timeout", []byte("30s"), time.Hour)
val, _ := store.Get("config.timeout")
// Watch for changes
ch, _ := store.Watch("config.*")
for kv := range ch {
fmt.Printf("Key %s changed: %s\n", kv.Key, kv.Value)
}
// Distributed locking
lock, _ := store.Lock("resource.mutex", 30*time.Second)
defer lock.Unlock()
// ... critical section ...
Index ¶
- Variables
- func MatchPattern(pattern, key string) bool
- func ValidateKey(key string) error
- func ValidateTTL(ttl time.Duration) error
- type KeyValue
- type Lock
- type MemoryStore
- func (s *MemoryStore) Close() error
- func (s *MemoryStore) Delete(key string) error
- func (s *MemoryStore) Get(key string) ([]byte, error)
- func (s *MemoryStore) GetKeyValue(key string) (*KeyValue, error)
- func (s *MemoryStore) Keys(pattern string) ([]string, error)
- func (s *MemoryStore) Lock(key string, ttl time.Duration) (Lock, error)
- func (s *MemoryStore) Put(key string, value []byte, ttl time.Duration) error
- func (s *MemoryStore) Watch(pattern string) (<-chan *KeyValue, error)
- type NATSStore
- func (s *NATSStore) Close() error
- func (s *NATSStore) Delete(key string) error
- func (s *NATSStore) Get(key string) ([]byte, error)
- func (s *NATSStore) GetKeyValue(key string) (*KeyValue, error)
- func (s *NATSStore) Keys(pattern string) ([]string, error)
- func (s *NATSStore) Lock(key string, ttl time.Duration) (Lock, error)
- func (s *NATSStore) Put(key string, value []byte, ttl time.Duration) error
- func (s *NATSStore) Watch(pattern string) (<-chan *KeyValue, error)
- type NATSStoreConfig
- type Operation
- type StateStore
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNotFound    = errors.New("key not found")
	ErrClosed      = errors.New("store closed")
	ErrLockHeld    = errors.New("lock already held")
	ErrLockNotHeld = errors.New("lock not held")
	ErrLockExpired = errors.New("lock expired")
	ErrInvalidKey  = errors.New("invalid key")
	ErrInvalidTTL  = errors.New("invalid TTL")
	ErrWatchClosed = errors.New("watch closed")
)
Common errors.
Functions ¶
func MatchPattern ¶
MatchPattern reports whether a key matches a pattern. It supports a * wildcard at the end of the pattern (e.g., "config.*" matches "config.foo").
Types ¶
type KeyValue ¶
type KeyValue struct {
// Key is the entry key.
Key string
// Value is the entry value.
Value []byte
// Revision is a monotonic version number.
Revision uint64
// Operation indicates the type of change.
Operation Operation
// Created is when the key was first created.
Created time.Time
// Modified is when the key was last modified.
Modified time.Time
}
KeyValue represents a key-value entry with metadata.
type Lock ¶
type Lock interface {
// Unlock releases the lock.
// Returns ErrLockNotHeld if already released.
Unlock() error
// Refresh extends the lock TTL.
// Returns ErrLockExpired if the lock has expired.
Refresh() error
// Key returns the lock key.
Key() string
}
Lock represents a distributed lock.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore implements StateStore using in-memory storage. Useful for testing and single-process scenarios.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore creates a new in-memory state store.
func (*MemoryStore) Get ¶
func (s *MemoryStore) Get(key string) ([]byte, error)
Get retrieves a value by key.
func (*MemoryStore) GetKeyValue ¶
func (s *MemoryStore) GetKeyValue(key string) (*KeyValue, error)
GetKeyValue retrieves the full KeyValue entry.
func (*MemoryStore) Keys ¶
func (s *MemoryStore) Keys(pattern string) ([]string, error)
Keys returns all keys matching a pattern.
type NATSStore ¶
type NATSStore struct {
// contains filtered or unexported fields
}
NATSStore implements StateStore using NATS JetStream KV.
func NewNATSStore ¶
func NewNATSStore(cfg NATSStoreConfig) (*NATSStore, error)
NewNATSStore creates a new NATS JetStream KV store.
func (*NATSStore) GetKeyValue ¶
GetKeyValue retrieves the full KeyValue entry.
type NATSStoreConfig ¶
type NATSStoreConfig struct {
// Conn is the NATS connection to use.
Conn *nats.Conn
// Bucket is the KV bucket name.
Bucket string
// TTL is the default TTL for entries (0 = no default).
TTL time.Duration
// History is the number of revisions to keep per key.
// Default: 1
History int
// MaxValueSize is the maximum value size in bytes.
// Default: 1MB
MaxValueSize int32
}
NATSStoreConfig holds NATS KV store configuration.
func DefaultNATSStoreConfig ¶
func DefaultNATSStoreConfig() NATSStoreConfig
DefaultNATSStoreConfig returns configuration with sensible defaults.
type StateStore ¶
type StateStore interface {
// Get retrieves a value by key.
// Returns ErrNotFound if the key does not exist.
Get(key string) ([]byte, error)
// GetKeyValue retrieves the full KeyValue entry.
// Returns ErrNotFound if the key does not exist.
GetKeyValue(key string) (*KeyValue, error)
// Put stores a value with an optional TTL.
// If ttl is 0, the key never expires.
Put(key string, value []byte, ttl time.Duration) error
// Delete removes a key.
// Returns nil if the key does not exist.
Delete(key string) error
// Keys returns all keys matching a pattern.
// Pattern supports * wildcard at the end (e.g., "config.*").
Keys(pattern string) ([]string, error)
// Watch watches for changes to keys matching a pattern.
// Pattern supports * wildcard at the end (e.g., "config.*").
// The channel is closed when the watch ends or store closes.
Watch(pattern string) (<-chan *KeyValue, error)
// Lock acquires a distributed lock with the given TTL.
// Returns ErrLockHeld if the lock is already held.
Lock(key string, ttl time.Duration) (Lock, error)
// Close shuts down the store and releases resources.
Close() error
}
StateStore provides distributed key-value storage with locking.