Documentation ¶
Overview ¶
Package space is the Isopace coordination layer: a keyed tuple space for decoupling producers and consumers (queues, work hand-off, request/response rendezvous) across the components of a deployment.
The Space interface follows the classic associative-memory primitives: Out writes an entry under a key, In takes (removes and returns) one entry and blocks until one is available, Rd reads (copies) one without removing it, and Inp/Rdp are their non-blocking probes. A key holds a FIFO bag, so a key is a queue and Out/In is producer/consumer hand-off. Blocking calls take a context, so waits are cancelable and bounded.
Backends ¶
- Local is the in-process backend (a map guarded by a mutex, with a broadcast that wakes blocked waiters). It implements Space.
- Store is durable store-and-forward: a []byte queue backed by a crash-safe append-only log that replays on open, for messages that must survive a restart (e.g. queued for an endpoint that is currently down). Its API returns errors because disk I/O can fail, so it is a sibling of Space rather than an implementation of it.
A distributed backend (for example NATS / JetStream) is intended as a drop-in adapter that implements Space without the core module taking on the dependency, which keeps Isopace stdlib-only. It would live in a separate, optionally imported module.
Index ¶
- Variables
- type Local
- func (l *Local) Close() error
- func (l *Local) In(ctx context.Context, key string) (any, error)
- func (l *Local) Inp(key string) (any, bool)
- func (l *Local) Len(key string) int
- func (l *Local) Out(key string, value any)
- func (l *Local) Rd(ctx context.Context, key string) (any, error)
- func (l *Local) Rdp(key string) (any, bool)
- type Space
- type Store
- func (s *Store) Close() error
- func (s *Store) Compact() error
- func (s *Store) Len(key string) int
- func (s *Store) Peek(key string) (value []byte, ok bool)
- func (s *Store) Pull(ctx context.Context, key string) ([]byte, error)
- func (s *Store) Pullp(key string) (value []byte, ok bool, err error)
- func (s *Store) Push(key string, value []byte) error
- type StoreOption
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("space: closed")
ErrClosed is returned by blocking operations on a closed space.
Functions ¶
This section is empty.
Types ¶
type Local ¶
type Local struct {
// contains filtered or unexported fields
}
Local is the in-process Space backend: a map of FIFO queues guarded by a mutex. Blocked In/Rd callers wait on a broadcast channel that Out and Close close-and-replace, so a waiter is woken without any per-call goroutine and the wait is fully cancelable through its context.
func (*Local) Close ¶
func (l *Local) Close() error
Close marks the space closed and unblocks all waiters with ErrClosed. It is idempotent.
type Space ¶
type Space interface {
// Out writes value under key. After the space is closed it is a no-op.
Out(key string, value any)
// In blocks until an entry is available under key, then removes and returns
// it. It returns the context error if ctx is cancelled or its deadline
// passes, or ErrClosed if the space is closed while waiting.
In(ctx context.Context, key string) (any, error)
// Rd is like In but returns the head entry without removing it, so the same
// entry can be read by several callers and later taken with In.
Rd(ctx context.Context, key string) (any, error)
// Inp is the non-blocking form of In: ok is false if no entry is present.
Inp(key string) (value any, ok bool)
// Rdp is the non-blocking form of Rd.
Rdp(key string) (value any, ok bool)
// Len reports how many entries are queued under key.
Len(key string) int
// Close releases the space and unblocks waiters with ErrClosed.
Close() error
}
Space is a keyed tuple space. A key holds a FIFO bag of entries, so a key is a queue: Out enqueues, In dequeues (take), and Rd peeks the head (read) without removing it. The blocking forms (In, Rd) wait until an entry is available, bounded by the supplied context; the probe forms (Inp, Rdp) never block.
Implementations must be safe for concurrent use.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is a durable store-and-forward queue: a keyed FIFO of []byte payloads backed by a crash-safe append-only log. Push (the counterpart of Out) appends an "out" record and Pull (the counterpart of In) appends a tombstone, so the queue state is reconstructed by replaying the log in Open. It is the persistence behind a switch that must hold messages for an endpoint that is currently down and forward them after a restart.
Because disk I/O can fail, Store's API returns errors and so is a sibling of Space rather than an implementation of it. Payloads are held in memory for fast access; the log is the durable backing. Store is safe for concurrent use.
func Open ¶
func Open(path string, opts ...StoreOption) (*Store, error)
Open opens (creating if needed) the store backed by the log at path, replaying it to rebuild the queue state.
func (*Store) Compact ¶
func (s *Store) Compact() error
Compact rewrites the log with only the live entries, discarding tombstones and taken payloads, then atomically replaces the old log. Sequence numbers are preserved, so it is safe to call at any time.
func (*Store) Pull ¶
func (s *Store) Pull(ctx context.Context, key string) ([]byte, error)
Pull removes and returns the head payload under key, blocking until one is available or ctx ends. The tombstone is durably written before the entry leaves memory, so a crash never resurrects a delivered entry.
type StoreOption ¶
type StoreOption func(*Store)
StoreOption configures a Store.
func WithSync ¶
func WithSync(on bool) StoreOption
WithSync controls whether each log write is flushed to disk with fsync (default true). Disabling it trades durability for throughput and is intended for tests or non-critical queues.
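StoreOption follows the functional-options pattern. The self-contained sketch below shows how such an option is typically applied over a default; the lowercase store, storeOption, withSync and open names and the sync field are hypothetical stand-ins, since Store's fields are unexported and not shown on this page.

```go
package main

import "fmt"

// store is a hypothetical stand-in for Store with a single setting.
type store struct {
	path string
	sync bool
}

// storeOption mirrors the shape of StoreOption: a function that mutates
// the store under construction.
type storeOption func(*store)

// withSync mirrors WithSync: it overrides the fsync default.
func withSync(on bool) storeOption {
	return func(s *store) { s.sync = on }
}

// open sets the defaults first, then applies options in order, so a
// later option wins over an earlier one.
func open(path string, opts ...storeOption) *store {
	s := &store{path: path, sync: true} // fsync on by default
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	fmt.Println(open("queue.log").sync)                  // prints "true"
	fmt.Println(open("queue.log", withSync(false)).sync) // prints "false"
}
```

With the real package, the equivalent call would presumably be Open(path, WithSync(false)), using the signatures listed above.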