space

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2026 License: AGPL-3.0 Imports: 9 Imported by: 0

Documentation

Overview

Package space is the Isopace coordination layer: a keyed tuple space for decoupling producers and consumers (queues, work hand-off, request/response rendezvous) across the components of a deployment.

The Space interface follows the classic associative-memory primitives: Out writes an entry under a key, In takes (removes and returns) one entry and blocks until one is available, Rd reads (copies) one without removing it, and Inp/Rdp are their non-blocking probes. A key holds a FIFO bag, so a key is a queue and Out/In is producer/consumer hand-off. Blocking calls take a context, so waits are cancelable and bounded.

Backends

  • Local is the in-process backend (a map guarded by a mutex, with a broadcast that wakes blocked waiters). It implements Space.
  • Store is durable store-and-forward: a []byte queue backed by a crash-safe append-only log that replays on open, for messages that must survive a restart (e.g. queued for an endpoint that is currently down). Its API returns errors because disk I/O can fail, so it is a sibling of Space rather than an implementation of it.

A distributed backend — NATS / JetStream — is a drop-in adapter that implements Space without the core module taking on the dependency, keeping Isopace stdlib-only. It would live in a separate, optionally-imported module.

Index

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("space: closed")

ErrClosed is returned by blocking operations on a closed space.

Functions

This section is empty.

Types

type Local

type Local struct {
	// contains filtered or unexported fields
}

Local is the in-process Space backend: a map of FIFO queues guarded by a mutex. Blocked In/Rd callers wait on a broadcast channel that Out and Close close-and-replace, so a waiter is woken without any per-call goroutine and the wait is fully cancelable through its context.

func NewLocal

func NewLocal() *Local

NewLocal returns an empty in-process space.

func (*Local) Close

func (l *Local) Close() error

Close marks the space closed and unblocks all waiters with ErrClosed. It is idempotent.

func (*Local) In

func (l *Local) In(ctx context.Context, key string) (any, error)

In removes and returns the head entry under key, blocking until one exists.

func (*Local) Inp

func (l *Local) Inp(key string) (any, bool)

Inp removes and returns the head entry under key, or ok=false if none.

func (*Local) Len

func (l *Local) Len(key string) int

Len reports the number of entries queued under key.

func (*Local) Out

func (l *Local) Out(key string, value any)

Out enqueues value under key and wakes any waiters.

func (*Local) Rd

func (l *Local) Rd(ctx context.Context, key string) (any, error)

Rd returns the head entry under key without removing it, blocking until one exists.

func (*Local) Rdp

func (l *Local) Rdp(key string) (any, bool)

Rdp returns the head entry under key without removing it, or ok=false if none.

type Space

type Space interface {
	// Out writes value under key. After the space is closed it is a no-op.
	Out(key string, value any)

	// In blocks until an entry is available under key, then removes and returns
	// it. It returns the context error if ctx is cancelled or its deadline
	// passes, or ErrClosed if the space is closed while waiting.
	In(ctx context.Context, key string) (any, error)

	// Rd is like In but returns the head entry without removing it, so the same
	// entry can be read by several callers and later taken with In.
	Rd(ctx context.Context, key string) (any, error)

	// Inp is the non-blocking form of In: ok is false if no entry is present.
	Inp(key string) (value any, ok bool)

	// Rdp is the non-blocking form of Rd.
	Rdp(key string) (value any, ok bool)

	// Len reports how many entries are queued under key.
	Len(key string) int

	// Close releases the space and unblocks waiters with ErrClosed.
	Close() error
}

Space is a keyed tuple space. A key holds a FIFO bag of entries, so a key is a queue: Out enqueues, In dequeues (take), and Rd peeks the head (read) without removing it. The blocking forms (In, Rd) wait until an entry is available, bounded by the supplied context; the probe forms (Inp, Rdp) never block.

Implementations must be safe for concurrent use.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is a durable store-and-forward queue: a keyed FIFO of []byte payloads backed by a crash-safe append-only log. Out (Push) appends an "out" record and In (Pull) appends a tombstone, so the queue state is reconstructed by replaying the log on Open. It is the persistence behind a switch that must hold messages for an endpoint that is currently down and forward them after a restart.

Because disk I/O can fail, Store's API returns errors and so is a sibling of Space rather than an implementation of it. Payloads are held in memory for fast access; the log is the durable backing. Store is safe for concurrent use.

func Open

func Open(path string, opts ...StoreOption) (*Store, error)

Open opens (creating if needed) the store backed by the log at path, replaying it to rebuild the queue state.

func (*Store) Close

func (s *Store) Close() error

Close flushes and closes the log and unblocks waiters with ErrClosed.

func (*Store) Compact

func (s *Store) Compact() error

Compact rewrites the log with only the live entries, discarding tombstones and taken payloads, then atomically replaces the old log. Sequence numbers are preserved, so it is safe to call at any time.

func (*Store) Len

func (s *Store) Len(key string) int

Len reports the number of entries queued under key.

func (*Store) Peek

func (s *Store) Peek(key string) (value []byte, ok bool)

Peek returns a copy of the head payload under key without removing it.

func (*Store) Pull

func (s *Store) Pull(ctx context.Context, key string) ([]byte, error)

Pull removes and returns the head payload under key, blocking until one is available or ctx ends. The tombstone is durably written before the entry leaves memory, so a crash never resurrects a delivered entry.

func (*Store) Pullp

func (s *Store) Pullp(key string) (value []byte, ok bool, err error)

Pullp is the non-blocking form of Pull: ok is false if the queue is empty.

func (*Store) Push

func (s *Store) Push(key string, value []byte) error

Push enqueues a durable copy of value under key.

type StoreOption

type StoreOption func(*Store)

StoreOption configures a Store.

func WithSync

func WithSync(on bool) StoreOption

WithSync controls whether each log write is flushed to disk with fsync (default true). Disabling it trades durability for throughput and is intended for tests or non-critical queues.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL