state

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package state provides shared state management for distributed agent coordination.

The StateStore interface enables key-value storage with TTL, watch notifications, and distributed locking across various backends (NATS JetStream KV, in-memory).

Key Features

  • Key-value operations: Get, Put, Delete with optional TTL
  • Watch: Subscribe to changes on key patterns
  • Distributed locks: Acquire/release with automatic expiry
  • Multiple backends: NATS JetStream KV (production), in-memory (testing)

Usage

// Production: NATS JetStream KV
bus, _ := bus.NewNATSBus(bus.NATSConfig{URL: "nats://localhost:4222"})
store, _ := state.NewNATSStore(state.NATSStoreConfig{
    Conn:   bus.Conn(),
    Bucket: "agent-state",
})

// Testing: In-memory
store := state.NewMemoryStore()

// Key-value operations
store.Put("config.timeout", []byte("30s"), time.Hour)
val, _ := store.Get("config.timeout")

// Watch for changes
ch, _ := store.Watch("config.*")
for kv := range ch {
    fmt.Printf("Key %s changed: %s\n", kv.Key, kv.Value)
}

// Distributed locking
lock, _ := store.Lock("resource.mutex", 30*time.Second)
defer lock.Unlock()
// ... critical section ...

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFound    = errors.New("key not found")
	ErrClosed      = errors.New("store closed")
	ErrLockHeld    = errors.New("lock already held")
	ErrLockNotHeld = errors.New("lock not held")
	ErrLockExpired = errors.New("lock expired")
	ErrInvalidKey  = errors.New("invalid key")
	ErrInvalidTTL  = errors.New("invalid TTL")
	ErrWatchClosed = errors.New("watch closed")
)

Common errors.

Functions

func MatchPattern

func MatchPattern(pattern, key string) bool

MatchPattern checks if a key matches a pattern. Supports * wildcard at the end (e.g., "config.*" matches "config.foo").

func ValidateKey

func ValidateKey(key string) error

ValidateKey checks if a key is valid.

func ValidateTTL

func ValidateTTL(ttl time.Duration) error

ValidateTTL checks if a TTL is valid.

Types

type KeyValue

type KeyValue struct {
	// Key is the entry key.
	Key string

	// Value is the entry value.
	Value []byte

	// Revision is a monotonic version number.
	Revision uint64

	// Operation indicates the type of change.
	Operation Operation

	// Created is when the key was first created.
	Created time.Time

	// Modified is when the key was last modified.
	Modified time.Time
}

KeyValue represents a key-value entry with metadata.

type Lock

type Lock interface {
	// Unlock releases the lock.
	// Returns ErrLockNotHeld if already released.
	Unlock() error

	// Refresh extends the lock TTL.
	// Returns ErrLockExpired if the lock has expired.
	Refresh() error

	// Key returns the lock key.
	Key() string
}

Lock represents a distributed lock.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore implements StateStore using in-memory storage. Useful for testing and single-process scenarios.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore creates a new in-memory state store.

func (*MemoryStore) Close

func (s *MemoryStore) Close() error

Close shuts down the store.

func (*MemoryStore) Delete

func (s *MemoryStore) Delete(key string) error

Delete removes a key.

func (*MemoryStore) Get

func (s *MemoryStore) Get(key string) ([]byte, error)

Get retrieves a value by key.

func (*MemoryStore) GetKeyValue

func (s *MemoryStore) GetKeyValue(key string) (*KeyValue, error)

GetKeyValue retrieves the full KeyValue entry.

func (*MemoryStore) Keys

func (s *MemoryStore) Keys(pattern string) ([]string, error)

Keys returns all keys matching a pattern.

func (*MemoryStore) Lock

func (s *MemoryStore) Lock(key string, ttl time.Duration) (Lock, error)

Lock acquires a distributed lock.

func (*MemoryStore) Put

func (s *MemoryStore) Put(key string, value []byte, ttl time.Duration) error

Put stores a value with optional TTL.

func (*MemoryStore) Watch

func (s *MemoryStore) Watch(pattern string) (<-chan *KeyValue, error)

Watch watches for changes to keys matching a pattern.

type NATSStore

type NATSStore struct {
	// contains filtered or unexported fields
}

NATSStore implements StateStore using NATS JetStream KV.

func NewNATSStore

func NewNATSStore(cfg NATSStoreConfig) (*NATSStore, error)

NewNATSStore creates a new NATS JetStream KV store.

func (*NATSStore) Close

func (s *NATSStore) Close() error

Close shuts down the store.

func (*NATSStore) Delete

func (s *NATSStore) Delete(key string) error

Delete removes a key.

func (*NATSStore) Get

func (s *NATSStore) Get(key string) ([]byte, error)

Get retrieves a value by key.

func (*NATSStore) GetKeyValue

func (s *NATSStore) GetKeyValue(key string) (*KeyValue, error)

GetKeyValue retrieves the full KeyValue entry.

func (*NATSStore) Keys

func (s *NATSStore) Keys(pattern string) ([]string, error)

Keys returns all keys matching a pattern.

func (*NATSStore) Lock

func (s *NATSStore) Lock(key string, ttl time.Duration) (Lock, error)

Lock acquires a distributed lock.

func (*NATSStore) Put

func (s *NATSStore) Put(key string, value []byte, ttl time.Duration) error

Put stores a value with optional TTL.

func (*NATSStore) Watch

func (s *NATSStore) Watch(pattern string) (<-chan *KeyValue, error)

Watch watches for changes to keys matching a pattern.

type NATSStoreConfig

type NATSStoreConfig struct {
	// Conn is the NATS connection to use.
	Conn *nats.Conn

	// Bucket is the KV bucket name.
	Bucket string

	// TTL is the default TTL for entries (0 = no default).
	TTL time.Duration

	// History is the number of revisions to keep per key.
	// Default: 1
	History int

	// MaxValueSize is the maximum value size in bytes.
	// Default: 1MB
	MaxValueSize int32
}

NATSStoreConfig holds NATS KV store configuration.

func DefaultNATSStoreConfig

func DefaultNATSStoreConfig() NATSStoreConfig

DefaultNATSStoreConfig returns configuration with sensible defaults.

type Operation

type Operation int

Operation represents the type of change to a key.

const (
	// OpPut indicates a key was created or updated.
	OpPut Operation = iota
	// OpDelete indicates a key was deleted.
	OpDelete
)

func (Operation) String

func (o Operation) String() string

String returns the operation name.

type StateStore

type StateStore interface {
	// Get retrieves a value by key.
	// Returns ErrNotFound if the key does not exist.
	Get(key string) ([]byte, error)

	// GetKeyValue retrieves the full KeyValue entry.
	// Returns ErrNotFound if the key does not exist.
	GetKeyValue(key string) (*KeyValue, error)

	// Put stores a value with an optional TTL.
	// If ttl is 0, the key never expires.
	Put(key string, value []byte, ttl time.Duration) error

	// Delete removes a key.
	// Returns nil if the key does not exist.
	Delete(key string) error

	// Keys returns all keys matching a pattern.
	// Pattern supports * wildcard at the end (e.g., "config.*").
	Keys(pattern string) ([]string, error)

	// Watch watches for changes to keys matching a pattern.
	// Pattern supports * wildcard at the end (e.g., "config.*").
	// The channel is closed when the watch ends or store closes.
	Watch(pattern string) (<-chan *KeyValue, error)

	// Lock acquires a distributed lock with the given TTL.
	// Returns ErrLockHeld if the lock is already held.
	Lock(key string, ttl time.Duration) (Lock, error)

	// Close shuts down the store and releases resources.
	Close() error
}

StateStore provides distributed key-value storage with locking.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL