syncx

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package syncx provides generic synchronization primitives that extend the standard sync package with type-safe, zero-cast APIs built on Go generics.

Per-key locking: KeyedMutex[K] provides per-key mutual exclusion with automatic entry cleanup via reference counting. KeyedLocker[K] extends this with read/write semantics. Both use RAII-style unlock functions.

Concurrent queues: BlockingQueue[T] and RingQueue[T] offer context-aware blocking, close semantics, and non-blocking try variants.

Cache-aside: ReadThrough[K,V] wraps a Cache backend with per-key stampede protection via double-checked locking.

Object pooling: Pool[T] is a type-safe wrapper around sync.Pool with optional reset.

Concurrency patterns: Dispatcher[K,V] routes keyed work to fixed goroutine slots by hash. SingleFlight[K,V] deduplicates concurrent calls for the same key. Group[T] collects results from multiple goroutines in submission order with panic recovery. Race[T] returns the first successful result from concurrent functions; if all fail, returns the last error.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrQueueClosed = Error("syncx.queue.closed")
	ErrQueueFull   = Error("syncx.queue.full")
	ErrQueueEmpty  = Error("syncx.queue.empty")
)

Queue sentinel errors.

View Source
var (
	ErrDispatcherClosed = Error("syncx.dispatcher.closed")
)

Dispatcher sentinel errors.

Functions

func Race added in v0.2.0

func Race[T any](ctx context.Context, fns ...func(ctx context.Context) (T, error)) (T, error)

Race runs fns concurrently and returns the first successful result. If all fns fail, Race returns the last error written by any goroutine (goroutine scheduling order, not submission order). The provided ctx is passed to each fn; once one fn succeeds, the derived context passed to remaining fns is canceled. Race waits for all goroutines to finish before returning to avoid leaks. If fns is empty, Race returns the zero value and nil.

Types

type BlockingQueue

type BlockingQueue[T any] struct {
	// contains filtered or unexported fields
}

BlockingQueue is a generic concurrent queue with blocking and non-blocking modes. It uses a ring buffer backed by sync.Cond for efficient blocking. After Close(), Pop continues returning remaining items then returns ErrQueueClosed. After CloseNow(), all operations immediately return ErrQueueClosed.

func NewBlockingQueue

func NewBlockingQueue[T any](capacity int) *BlockingQueue[T]

NewBlockingQueue creates a BlockingQueue with the given capacity. Panics if capacity < 1.

func (*BlockingQueue[T]) Close

func (q *BlockingQueue[T]) Close()

Close closes the queue in drain mode. Push returns ErrQueueClosed. Pop continues returning remaining items, then returns ErrQueueClosed when empty. Idempotent — calling Close or CloseNow again is a no-op.

func (*BlockingQueue[T]) CloseNow

func (q *BlockingQueue[T]) CloseNow()

CloseNow closes the queue immediately, discarding all remaining items. Both Push and Pop immediately return ErrQueueClosed. Idempotent — calling Close or CloseNow again is a no-op.

func (*BlockingQueue[T]) Len

func (q *BlockingQueue[T]) Len() int

Len returns the number of items currently in the queue.

func (*BlockingQueue[T]) Peek

func (q *BlockingQueue[T]) Peek() (T, bool)

Peek returns the front item without removing it. Returns false if the queue is empty.

func (*BlockingQueue[T]) Pop

func (q *BlockingQueue[T]) Pop(ctx context.Context) (T, error)

Pop removes and returns the front item, blocking if empty. After Close(), returns remaining items, then ErrQueueClosed. After CloseNow(), immediately returns ErrQueueClosed.

func (*BlockingQueue[T]) Push

func (q *BlockingQueue[T]) Push(ctx context.Context, item T) error

Push adds an item to the queue, blocking if full. Returns ErrQueueClosed if the queue is closed. Returns the context error if ctx is canceled.

func (*BlockingQueue[T]) TryPop

func (q *BlockingQueue[T]) TryPop() (T, error)

TryPop removes and returns the front item without blocking. Returns ErrQueueEmpty if empty, ErrQueueClosed if closed (and empty in drain mode).

func (*BlockingQueue[T]) TryPush

func (q *BlockingQueue[T]) TryPush(item T) error

TryPush adds an item without blocking. Returns ErrQueueFull if full, ErrQueueClosed if closed.

type Cache

type Cache[K comparable, V any] interface {
	// Get retrieves a value by key. The second return value reports whether
	// the key was found.
	Get(key K) (V, bool)
	// Set stores a value under the given key.
	Set(key K, value V)
}

Cache is the minimal interface that ReadThrough delegates storage to. Implementations must be safe for concurrent use from multiple goroutines. TTL, eviction, and capacity management are the responsibility of the implementation; ReadThrough only calls Get and Set.

type Dispatcher

type Dispatcher[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Dispatcher routes tasks to fixed goroutines by key hash. Same key always goes to the same slot goroutine, guaranteeing serial execution.

func NewDispatcher

func NewDispatcher[K comparable, V any](slots int, handler func(K, V) error, opts ...DispatcherOption[K, V]) *Dispatcher[K, V]

NewDispatcher creates a dispatcher with the given number of slots.

  • slots: number of worker goroutines (must be >= 1)
  • handler: function called for each (key, value) pair in the assigned slot goroutine

handler and onError must not panic. A panic in either propagates out of the slot goroutine and crashes the program. Callers that need panic isolation should wrap their own handler with a recover.

Panics if slots <= 0 or handler is nil.

func (*Dispatcher[K, V]) Close

func (d *Dispatcher[K, V]) Close()

Close signals all slots to stop and waits for pending tasks to complete. Idempotent — calling Close again is a no-op.

func (*Dispatcher[K, V]) Submit

func (d *Dispatcher[K, V]) Submit(key K, value V) error

Submit submits a task to the slot goroutine assigned to key. Blocks until the slot has capacity or the dispatcher is closed. Use TrySubmit for non-blocking semantics. Returns ErrDispatcherClosed if the dispatcher is closed.

func (*Dispatcher[K, V]) TrySubmit

func (d *Dispatcher[K, V]) TrySubmit(key K, value V) bool

TrySubmit attempts to submit a task without blocking. Returns false if the slot's buffer is full or the dispatcher is closed.

type DispatcherOption

type DispatcherOption[K comparable, V any] func(*Dispatcher[K, V])

DispatcherOption configures a Dispatcher.

func WithBuffer

func WithBuffer[K comparable, V any](n int) DispatcherOption[K, V]

WithBuffer sets the per-slot queue capacity (must be >= 1). The default capacity is 1 (handoff semantics: Submit blocks until the slot goroutine takes the item). Use WithBuffer to increase the buffer size beyond the default. Panics if n < 1.

func WithOnError

func WithOnError[K comparable, V any](fn func(K, V, error)) DispatcherOption[K, V]

WithOnError sets a callback invoked when the handler returns an error. The slot goroutine continues processing subsequent tasks after the callback returns.

type Error

type Error = errorx.Sentinel[syncxTag]

Error is the sentinel error type for the syncx package. It can be used by callers to define additional syncx-domain errors that are distinguishable from errors in other packages.

type Group

type Group[T any] struct {
	// contains filtered or unexported fields
}

Group collects typed results from multiple goroutines. Unlike errgroup which returns only the first error, Group collects all (T, error) pairs, making it suitable for concurrent query aggregation where all results are needed.

func NewGroup

func NewGroup[T any](limit int) *Group[T]

NewGroup creates a new Group with the given concurrency limit. If limit <= 0, there is no limit (all goroutines run concurrently).

func (*Group[T]) Go

func (g *Group[T]) Go(fn func() (T, error))

Go spawns a goroutine to execute fn and collects its result. If a concurrency limit was set, Go blocks until a slot is available. Results are collected internally and returned by Group.Wait in submission order.

Panics if called after Group.Wait.

func (*Group[T]) Wait

func (g *Group[T]) Wait() []Result[T]

Wait blocks until all goroutines complete and returns all results. Results are ordered by submission (the order Go was called).

Panics if called after a previous Wait (double Wait is not allowed).

type KeyedLocker

type KeyedLocker[K comparable] struct {
	// contains filtered or unexported fields
}

KeyedLocker provides per-key read/write locking. Different keys can be locked concurrently; the same key is subject to standard read/write mutual exclusion. Entries are created on demand and removed when their reference count drops to zero, so there is no memory leak for unbounded key spaces.

func NewKeyedLocker

func NewKeyedLocker[K comparable]() *KeyedLocker[K]

NewKeyedLocker returns an initialized KeyedLocker.

func (*KeyedLocker[K]) Len

func (kl *KeyedLocker[K]) Len() int

Len returns the number of active entries currently tracked.

func (*KeyedLocker[K]) Lock

func (kl *KeyedLocker[K]) Lock(key K) func()

Lock acquires an exclusive write lock for key and returns an unlock function. The caller must invoke the returned function exactly once to release the lock.

func (*KeyedLocker[K]) RLock

func (kl *KeyedLocker[K]) RLock(key K) func()

RLock acquires a shared read lock for key and returns an unlock function. The caller must invoke the returned function exactly once to release the lock.

type KeyedMutex

type KeyedMutex[K comparable] struct {
	// contains filtered or unexported fields
}

KeyedMutex provides per-key mutual exclusion. Different keys can be locked concurrently; the same key is serialized. Entries are created on demand and removed when their reference count drops to zero, so there is no memory leak for unbounded key spaces.

func NewKeyedMutex

func NewKeyedMutex[K comparable]() *KeyedMutex[K]

NewKeyedMutex returns an initialized KeyedMutex.

func (*KeyedMutex[K]) Len

func (km *KeyedMutex[K]) Len() int

Len returns the number of active entries currently tracked.

func (*KeyedMutex[K]) Lock

func (km *KeyedMutex[K]) Lock(key K) func()

Lock acquires the mutex for key and returns an unlock function. The caller must invoke the returned function exactly once to release the lock.

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

Pool is a type-safe wrapper around sync.Pool. It eliminates the need for manual type assertions by encoding the element type in the generic parameter.

The zero value is not usable; construct with NewPool.

func NewPool

func NewPool[T any](create func() T, reset ...func(T)) *Pool[T]

NewPool creates a new Pool.

  • create: required, called when the pool is empty to create a new object.
  • reset: optional (at most one), called on Put to reset the object state before returning to the pool.

Panics if create is nil or more than one reset function is provided.

func (*Pool[T]) Get

func (p *Pool[T]) Get() T

Get retrieves an object from the pool, calling create if the pool is empty. The returned object may have been previously used; callers must initialize or reset the object before use if its fields must be in a known state.

func (*Pool[T]) Put

func (p *Pool[T]) Put(x T)

Put returns an object to the pool, calling reset (if set) before storing.

type ReadThrough

type ReadThrough[K comparable, V any] struct {
	// contains filtered or unexported fields
}

ReadThrough wraps a Cache with per-key stampede protection using KeyedMutex. On a cache miss it calls the caller-provided loader, then populates the cache before returning. Loader errors are not cached.

Callers should prefer immutable value types for V. When V is a mutable reference type (pointer, slice, or map), the caller is responsible for ensuring that cached values are not modified after being returned.

The zero value is not usable; construct with NewReadThrough.

func NewReadThrough

func NewReadThrough[K comparable, V any](
	c Cache[K, V],
	loader func(ctx context.Context, key K) (V, error),
) *ReadThrough[K, V]

NewReadThrough creates a ReadThrough that delegates cache storage to c and loads missing values through loader. Panics if c or loader is nil.

func (*ReadThrough[K, V]) Get

func (rt *ReadThrough[K, V]) Get(ctx context.Context, key K) (V, error)

Get returns the value for key, loading and caching it on miss. On cache hit the loader is not invoked. On miss the loader is called under per-key exclusive lock with double-check, so concurrent callers for the same key block until the first load completes. Loader errors are propagated to the caller and are not cached.

The per-key lock acquisition is not context-aware: if ctx is already canceled when the lock is contended, the goroutine still blocks until the current holder releases it. Context cancellation only affects the loader call itself.

type Result

type Result[T any] struct {
	Value T
	Err   error
}

Result holds the outcome of a single goroutine spawned by Group.

type RingQueue

type RingQueue[T any] struct {
	// contains filtered or unexported fields
}

RingQueue is a generic concurrent ring queue that discards the oldest item when full. Push and PushEvict never block — if the queue is full, the oldest item is evicted. Pop blocks when empty, supporting context cancellation. After Close(), Pop continues returning remaining items then returns ErrQueueClosed. After CloseNow(), all operations immediately return ErrQueueClosed or are silently discarded.

func NewRingQueue

func NewRingQueue[T any](capacity int) *RingQueue[T]

NewRingQueue creates a RingQueue with the given capacity. Panics if capacity < 1.

func (*RingQueue[T]) Close

func (q *RingQueue[T]) Close()

Close closes the queue in drain mode. Push and PushEvict silently discard items. Pop continues returning remaining items, then returns ErrQueueClosed. Idempotent — calling Close or CloseNow again is a no-op.

func (*RingQueue[T]) CloseNow

func (q *RingQueue[T]) CloseNow()

CloseNow closes the queue immediately, discarding all remaining items. Pop immediately returns ErrQueueClosed. Idempotent — calling Close or CloseNow again is a no-op.

func (*RingQueue[T]) Len

func (q *RingQueue[T]) Len() int

Len returns the number of items currently in the queue.

func (*RingQueue[T]) Peek

func (q *RingQueue[T]) Peek() (T, bool)

Peek returns the front item without removing it. Returns false if the queue is empty.

func (*RingQueue[T]) Pop

func (q *RingQueue[T]) Pop(ctx context.Context) (T, error)

Pop removes and returns the front item, blocking if empty. After Close(), returns remaining items, then ErrQueueClosed. After CloseNow(), immediately returns ErrQueueClosed.

func (*RingQueue[T]) Push

func (q *RingQueue[T]) Push(item T)

Push adds an item to the queue. If the queue is full, the oldest item is silently discarded. After Close/CloseNow, the item is silently discarded.

func (*RingQueue[T]) PushEvict

func (q *RingQueue[T]) PushEvict(item T) (T, bool)

PushEvict adds an item and returns the evicted item if the queue was full. Returns (zero, false) if no item was evicted. After Close/CloseNow, the item is silently discarded and (zero, false) is returned.

func (*RingQueue[T]) TryPop

func (q *RingQueue[T]) TryPop() (T, error)

TryPop removes and returns the front item without blocking. Returns ErrQueueEmpty if empty, ErrQueueClosed if closed.

type SingleFlight

type SingleFlight[K comparable, V any] struct {
	// contains filtered or unexported fields
}

SingleFlight deduplicates concurrent calls for the same key. Multiple goroutines requesting the same key simultaneously share a single execution.

func NewSingleFlight

func NewSingleFlight[K comparable, V any]() *SingleFlight[K, V]

NewSingleFlight creates a new SingleFlight instance.

func (*SingleFlight[K, V]) Do

func (sf *SingleFlight[K, V]) Do(key K, fn func() (V, error)) (v V, shared bool, err error)

Do executes fn for the given key if no in-flight call exists for that key. If a call is already in progress, it waits and shares the result.

  • shared=false: this call executed fn
  • shared=true: this call shared another goroutine's result

func (*SingleFlight[K, V]) Forget

func (sf *SingleFlight[K, V]) Forget(key K)

Forget removes the key from the in-flight map, allowing future Do calls for this key to execute fn instead of sharing an in-flight result.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL