Documentation ¶
Overview ¶
Package syncx provides generic synchronization primitives that extend the standard sync package with type-safe, zero-cast APIs built on Go generics.
Per-key locking: KeyedMutex[K] provides per-key mutual exclusion with automatic entry cleanup via reference counting. KeyedLocker[K] extends this with read/write semantics. Both use RAII-style unlock functions.
Concurrent queues: BlockingQueue[T] and RingQueue[T] offer context-aware blocking, close semantics, and non-blocking try variants.
Cache-aside: ReadThrough[K,V] wraps a Cache backend with per-key stampede protection via double-checked locking.
Object pooling: Pool[T] is a type-safe wrapper around sync.Pool with optional reset.
Concurrency patterns: Dispatcher[K,V] routes keyed work to fixed goroutine slots by hash. SingleFlight[K,V] deduplicates concurrent calls for the same key. Group[T] collects results from multiple goroutines in submission order with panic recovery. Race[T] returns the first successful result from concurrent functions; if all fail, returns the last error.
Index ¶
- Variables
- func Race[T any](ctx context.Context, fns ...func(ctx context.Context) (T, error)) (T, error)
- type BlockingQueue
- func (q *BlockingQueue[T]) Close()
- func (q *BlockingQueue[T]) CloseNow()
- func (q *BlockingQueue[T]) Len() int
- func (q *BlockingQueue[T]) Peek() (T, bool)
- func (q *BlockingQueue[T]) Pop(ctx context.Context) (T, error)
- func (q *BlockingQueue[T]) Push(ctx context.Context, item T) error
- func (q *BlockingQueue[T]) TryPop() (T, error)
- func (q *BlockingQueue[T]) TryPush(item T) error
- type Cache
- type Dispatcher
- type DispatcherOption
- type Error
- type Group
- type KeyedLocker
- type KeyedMutex
- type Pool
- type ReadThrough
- type Result
- type RingQueue
- func (q *RingQueue[T]) Close()
- func (q *RingQueue[T]) CloseNow()
- func (q *RingQueue[T]) Len() int
- func (q *RingQueue[T]) Peek() (T, bool)
- func (q *RingQueue[T]) Pop(ctx context.Context) (T, error)
- func (q *RingQueue[T]) Push(item T)
- func (q *RingQueue[T]) PushEvict(item T) (T, bool)
- func (q *RingQueue[T]) TryPop() (T, error)
- type SingleFlight
Constants ¶
This section is empty.
Variables ¶
var (
	ErrQueueClosed = Error("syncx.queue.closed")
	ErrQueueFull   = Error("syncx.queue.full")
	ErrQueueEmpty  = Error("syncx.queue.empty")
)
Queue sentinel errors.
var (
ErrDispatcherClosed = Error("syncx.dispatcher.closed")
)
Dispatcher sentinel errors.
Functions ¶
func Race ¶ added in v0.2.0
Race runs fns concurrently and returns the first successful result. If every fn fails, Race returns the last error written by any goroutine, which depends on goroutine scheduling rather than submission order. Each fn receives a context derived from ctx; once one fn succeeds, that derived context is canceled for the remaining fns. Race waits for all goroutines to finish before returning, so none are leaked. If fns is empty, Race returns the zero value and a nil error.
Types ¶
type BlockingQueue ¶
type BlockingQueue[T any] struct {
	// contains filtered or unexported fields
}
BlockingQueue is a generic concurrent queue with blocking and non-blocking modes. It uses a ring buffer backed by sync.Cond for efficient blocking. After Close(), Pop continues returning remaining items then returns ErrQueueClosed. After CloseNow(), all operations immediately return ErrQueueClosed.
func NewBlockingQueue ¶
func NewBlockingQueue[T any](capacity int) *BlockingQueue[T]
NewBlockingQueue creates a BlockingQueue with the given capacity. Panics if capacity < 1.
func (*BlockingQueue[T]) Close ¶
func (q *BlockingQueue[T]) Close()
Close closes the queue in drain mode. Push returns ErrQueueClosed. Pop continues returning remaining items, then returns ErrQueueClosed when empty. Idempotent — calling Close or CloseNow again is a no-op.
func (*BlockingQueue[T]) CloseNow ¶
func (q *BlockingQueue[T]) CloseNow()
CloseNow closes the queue immediately, discarding all remaining items. Both Push and Pop immediately return ErrQueueClosed. Idempotent — calling Close or CloseNow again is a no-op.
func (*BlockingQueue[T]) Len ¶
func (q *BlockingQueue[T]) Len() int
Len returns the number of items currently in the queue.
func (*BlockingQueue[T]) Peek ¶
func (q *BlockingQueue[T]) Peek() (T, bool)
Peek returns the front item without removing it. Returns false if the queue is empty.
func (*BlockingQueue[T]) Pop ¶
func (q *BlockingQueue[T]) Pop(ctx context.Context) (T, error)
Pop removes and returns the front item, blocking if empty. After Close(), returns remaining items, then ErrQueueClosed. After CloseNow(), immediately returns ErrQueueClosed.
func (*BlockingQueue[T]) Push ¶
func (q *BlockingQueue[T]) Push(ctx context.Context, item T) error
Push adds an item to the queue, blocking if full. Returns ErrQueueClosed if the queue is closed. Returns the context error if ctx is canceled.
func (*BlockingQueue[T]) TryPop ¶
func (q *BlockingQueue[T]) TryPop() (T, error)
TryPop removes and returns the front item without blocking. Returns ErrQueueEmpty if the queue is open and empty. Returns ErrQueueClosed after CloseNow, or after Close once the remaining items have been drained.
func (*BlockingQueue[T]) TryPush ¶
func (q *BlockingQueue[T]) TryPush(item T) error
TryPush adds an item without blocking. Returns ErrQueueFull if full, ErrQueueClosed if closed.
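The difference between drain-mode Close and CloseNow is easiest to see on the non-blocking methods. The following is a simplified sketch under stated assumptions: tryQueue, its error values, and its slice-backed storage are hypothetical, and the blocking Push/Pop paths (sync.Cond, context handling) are deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical sentinels standing in for ErrQueueClosed, ErrQueueFull and
// ErrQueueEmpty.
var (
	errClosed = errors.New("queue closed")
	errFull   = errors.New("queue full")
	errEmpty  = errors.New("queue empty")
)

// tryQueue is an illustrative, slice-backed queue with only the non-blocking
// operations. It is not the BlockingQueue implementation.
type tryQueue[T any] struct {
	mu       sync.Mutex
	items    []T
	capacity int
	closed   bool
}

func newTryQueue[T any](capacity int) *tryQueue[T] {
	return &tryQueue[T]{capacity: capacity}
}

func (q *tryQueue[T]) TryPush(item T) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return errClosed
	}
	if len(q.items) == q.capacity {
		return errFull
	}
	q.items = append(q.items, item)
	return nil
}

func (q *tryQueue[T]) TryPop() (T, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	var zero T
	if len(q.items) == 0 {
		if q.closed {
			return zero, errClosed // closed and fully drained
		}
		return zero, errEmpty
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, nil
}

// Close switches to drain mode: remaining items stay poppable.
func (q *tryQueue[T]) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
}

// CloseNow discards remaining items unless the queue is already closed.
func (q *tryQueue[T]) CloseNow() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return // a second close of either kind is a no-op
	}
	q.closed = true
	q.items = nil
}

func main() {
	q := newTryQueue[string](2)
	fmt.Println(q.TryPush("a"), q.TryPush("b"), q.TryPush("c"))
	q.Close()
	fmt.Println(q.TryPush("d"))
	for {
		v, err := q.TryPop()
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Println(v)
	}
}
```

In this sketch the items pushed before Close remain available to TryPop, while a queue closed with CloseNow reports the closed error on the first pop.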
type Cache ¶
type Cache[K comparable, V any] interface {
	// Get retrieves a value by key. The second return value reports whether
	// the key was found.
	Get(key K) (V, bool)
	// Set stores a value under the given key.
	Set(key K, value V)
}
Cache is the minimal interface that ReadThrough delegates storage to. Implementations must be safe for concurrent use from multiple goroutines. TTL, eviction, and capacity management are the responsibility of the implementation; ReadThrough only calls Get and Set.
type Dispatcher ¶
type Dispatcher[K comparable, V any] struct {
	// contains filtered or unexported fields
}
Dispatcher routes tasks to fixed goroutines by key hash. Same key always goes to the same slot goroutine, guaranteeing serial execution.
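A rough sketch of the routing idea follows. It is an illustration only: the documentation does not say how a generic comparable key is hashed, so the sketch restricts keys to strings and uses FNV-1a from the standard library. slotFor, task, and dispatch are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// slotFor maps a key to a slot index. The hash function is an assumption;
// the package may hash keys differently.
func slotFor(key string, slots int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(slots))
}

// task is a hypothetical (key, value) pair.
type task struct {
	key   string
	value int
}

// dispatch starts one goroutine per slot, routes every task by key hash, and
// returns the values each key was handled with, in handling order.
func dispatch(slots int, tasks []task) map[string][]int {
	var (
		mu   sync.Mutex
		seen = make(map[string][]int)
		wg   sync.WaitGroup
	)
	chans := make([]chan task, slots)
	for i := range chans {
		chans[i] = make(chan task, 1)
		wg.Add(1)
		go func(in <-chan task) {
			defer wg.Done()
			for t := range in { // one goroutine per slot: serial per key
				mu.Lock()
				seen[t.key] = append(seen[t.key], t.value)
				mu.Unlock()
			}
		}(chans[i])
	}
	for _, t := range tasks {
		chans[slotFor(t.key, slots)] <- t
	}
	for _, c := range chans {
		close(c)
	}
	wg.Wait()
	return seen
}

func main() {
	var tasks []task
	for v := 0; v < 5; v++ {
		tasks = append(tasks, task{"alice", v}, task{"bob", v})
	}
	got := dispatch(4, tasks)
	fmt.Println(got["alice"], got["bob"]) // [0 1 2 3 4] [0 1 2 3 4]
}
```

Since a key always lands on the same channel and each channel has a single consumer, the values for one key are handled in submission order even though different keys may run in parallel.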
func NewDispatcher ¶
func NewDispatcher[K comparable, V any](slots int, handler func(K, V) error, opts ...DispatcherOption[K, V]) *Dispatcher[K, V]
NewDispatcher creates a dispatcher with the given number of slots.
- slots: number of worker goroutines (must be >= 1)
- handler: function called for each (key, value) pair in the assigned slot goroutine
handler and the callback set with WithOnError must not panic. A panic in either propagates out of the slot goroutine and crashes the program. Callers that need panic isolation should wrap their handler with a recover.
Panics if slots <= 0 or handler is nil.
func (*Dispatcher[K, V]) Close ¶
func (d *Dispatcher[K, V]) Close()
Close signals all slots to stop and waits for pending tasks to complete. Idempotent — calling Close again is a no-op.
func (*Dispatcher[K, V]) Submit ¶
func (d *Dispatcher[K, V]) Submit(key K, value V) error
Submit submits a task to the slot goroutine assigned to key. Blocks until the slot has capacity or the dispatcher is closed. Use TrySubmit for non-blocking semantics. Returns ErrDispatcherClosed if the dispatcher is closed.
func (*Dispatcher[K, V]) TrySubmit ¶
func (d *Dispatcher[K, V]) TrySubmit(key K, value V) bool
TrySubmit attempts to submit a task without blocking. Returns false if the slot's buffer is full or the dispatcher is closed.
type DispatcherOption ¶
type DispatcherOption[K comparable, V any] func(*Dispatcher[K, V])
DispatcherOption configures a Dispatcher.
func WithBuffer ¶
func WithBuffer[K comparable, V any](n int) DispatcherOption[K, V]
WithBuffer sets the per-slot queue capacity. The default capacity is 1 (handoff semantics: Submit blocks until the slot goroutine takes the item). Panics if n < 1.
func WithOnError ¶
func WithOnError[K comparable, V any](fn func(K, V, error)) DispatcherOption[K, V]
WithOnError sets a callback invoked when the handler returns an error. The slot goroutine continues processing subsequent tasks after the callback returns.
type Error ¶
Error is the sentinel error type for the syncx package. It can be used by callers to define additional syncx-domain errors that are distinguishable from errors in other packages.
type Group ¶
type Group[T any] struct {
	// contains filtered or unexported fields
}
Group collects typed results from multiple goroutines. Unlike errgroup which returns only the first error, Group collects all (T, error) pairs, making it suitable for concurrent query aggregation where all results are needed.
func NewGroup ¶
NewGroup creates a new Group with the given concurrency limit. If limit <= 0, there is no limit (all goroutines run concurrently).
func (*Group[T]) Go ¶
Go spawns a goroutine to execute fn and collects its result. If a concurrency limit was set, Go blocks until a slot is available. Results are collected internally and returned by Group.Wait in submission order.
Panics if called after Group.Wait.
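The collection pattern described for Group can be sketched as follows. This is an illustration, not the package's code: result and collect are hypothetical names, the shape of the real Result type is not shown on this page, and the concurrency limit is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// result is a hypothetical (value, error) pair.
type result[T any] struct {
	Val T
	Err error
}

// collect runs every fn in its own goroutine and returns one result per fn,
// indexed by submission order. A panic in fn is converted into an error.
func collect[T any](fns ...func() (T, error)) []result[T] {
	out := make([]result[T], len(fns))
	var wg sync.WaitGroup
	for i, fn := range fns {
		wg.Add(1)
		go func(i int, fn func() (T, error)) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					out[i].Err = fmt.Errorf("panic: %v", r)
				}
			}()
			// Each goroutine writes only to its own index, so no lock is needed.
			out[i].Val, out[i].Err = fn()
		}(i, fn)
	}
	wg.Wait()
	return out
}

func main() {
	res := collect(
		func() (int, error) { return 1, nil },
		func() (int, error) { return 0, errors.New("query failed") },
		func() (int, error) { panic("bad input") },
	)
	for i, r := range res {
		fmt.Println(i, r.Val, r.Err)
	}
}
```

Pre-sizing the output slice and writing by index is one simple way to keep submission order regardless of which goroutine finishes first.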
type KeyedLocker ¶
type KeyedLocker[K comparable] struct {
	// contains filtered or unexported fields
}
KeyedLocker provides per-key read/write locking. Different keys can be locked concurrently; the same key is subject to standard read/write mutual exclusion. Entries are created on demand and removed when their reference count drops to zero, so there is no memory leak for unbounded key spaces.
func NewKeyedLocker ¶
func NewKeyedLocker[K comparable]() *KeyedLocker[K]
NewKeyedLocker returns an initialized KeyedLocker.
func (*KeyedLocker[K]) Len ¶
func (kl *KeyedLocker[K]) Len() int
Len returns the number of active entries currently tracked.
func (*KeyedLocker[K]) Lock ¶
func (kl *KeyedLocker[K]) Lock(key K) func()
Lock acquires an exclusive write lock for key and returns an unlock function. The caller must invoke the returned function exactly once to release the lock.
func (*KeyedLocker[K]) RLock ¶
func (kl *KeyedLocker[K]) RLock(key K) func()
RLock acquires a shared read lock for key and returns an unlock function. The caller must invoke the returned function exactly once to release the lock.
type KeyedMutex ¶
type KeyedMutex[K comparable] struct {
	// contains filtered or unexported fields
}
KeyedMutex provides per-key mutual exclusion. Different keys can be locked concurrently; the same key is serialized. Entries are created on demand and removed when their reference count drops to zero, so there is no memory leak for unbounded key spaces.
func NewKeyedMutex ¶
func NewKeyedMutex[K comparable]() *KeyedMutex[K]
NewKeyedMutex returns an initialized KeyedMutex.
func (*KeyedMutex[K]) Len ¶
func (km *KeyedMutex[K]) Len() int
Len returns the number of active entries currently tracked.
func (*KeyedMutex[K]) Lock ¶
func (km *KeyedMutex[K]) Lock(key K) func()
Lock acquires the mutex for key and returns an unlock function. The caller must invoke the returned function exactly once to release the lock.
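One common way to get per-key locking with reference-counted cleanup is sketched below. It is a hypothetical reconstruction from the description above, not the package's source: keyedMutex, entry, and countTo are illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// entry pairs a mutex with the number of goroutines holding or awaiting it.
type entry struct {
	mu   sync.Mutex
	refs int
}

// keyedMutex is an illustrative per-key mutex with reference counting.
type keyedMutex[K comparable] struct {
	mu      sync.Mutex
	entries map[K]*entry
}

func newKeyedMutex[K comparable]() *keyedMutex[K] {
	return &keyedMutex[K]{entries: make(map[K]*entry)}
}

// Lock acquires the mutex for key and returns the matching unlock function.
func (km *keyedMutex[K]) Lock(key K) func() {
	km.mu.Lock()
	e, ok := km.entries[key]
	if !ok {
		e = &entry{}
		km.entries[key] = e
	}
	e.refs++ // registered before blocking, so the entry cannot be removed
	km.mu.Unlock()

	e.mu.Lock()
	return func() {
		e.mu.Unlock()
		km.mu.Lock()
		e.refs--
		if e.refs == 0 {
			delete(km.entries, key) // last user cleans up the entry
		}
		km.mu.Unlock()
	}
}

// Len reports the number of entries currently tracked.
func (km *keyedMutex[K]) Len() int {
	km.mu.Lock()
	defer km.mu.Unlock()
	return len(km.entries)
}

// countTo increments a shared counter from n goroutines under one key.
func countTo(n int) (int, int) {
	km := newKeyedMutex[string]()
	counter := 0
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			unlock := km.Lock("counter")
			defer unlock()
			counter++
		}()
	}
	wg.Wait()
	return counter, km.Len()
}

func main() {
	fmt.Println(countTo(100)) // 100 0
}
```

Returning the unlock function as a closure ties the release to the exact entry that was locked, which is what makes the "invoke exactly once" contract easy to follow with defer.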
type Pool ¶
type Pool[T any] struct {
	// contains filtered or unexported fields
}
Pool is a type-safe wrapper around sync.Pool. It eliminates the need for manual type assertions by encoding the element type in the generic parameter.
The zero value is not usable; construct with NewPool.
func NewPool ¶
NewPool creates a new Pool.
- create: required, called when the pool is empty to create a new object.
- reset: optional (at most one), called on Put to reset the object state before returning to the pool.
Panics if create is nil or more than one reset function is provided.
type ReadThrough ¶
type ReadThrough[K comparable, V any] struct {
	// contains filtered or unexported fields
}
ReadThrough wraps a Cache with per-key stampede protection using KeyedMutex. On a cache miss it calls the caller-provided loader, then populates the cache before returning. Loader errors are not cached.
Callers should prefer immutable value types for V. When V is a mutable reference type (pointer, slice, or map), the caller is responsible for ensuring that cached values are not modified after being returned.
The zero value is not usable; construct with NewReadThrough.
func NewReadThrough ¶
func NewReadThrough[K comparable, V any](
	c Cache[K, V],
	loader func(ctx context.Context, key K) (V, error),
) *ReadThrough[K, V]
NewReadThrough creates a ReadThrough that delegates cache storage to c and loads missing values through loader. Panics if c or loader is nil.
func (*ReadThrough[K, V]) Get ¶
func (rt *ReadThrough[K, V]) Get(ctx context.Context, key K) (V, error)
Get returns the value for key, loading and caching it on miss. On cache hit the loader is not invoked. On miss the loader is called under per-key exclusive lock with double-check, so concurrent callers for the same key block until the first load completes. Loader errors are propagated to the caller and are not cached.
The per-key lock acquisition is not context-aware: if ctx is already canceled when the lock is contended, the goroutine still blocks until the current holder releases it. Context cancellation only affects the loader call itself.
type RingQueue ¶
type RingQueue[T any] struct {
	// contains filtered or unexported fields
}
RingQueue is a generic concurrent ring queue that discards the oldest item when full. Push and PushEvict never block: if the queue is full, the oldest item is evicted. Pop blocks when empty and supports context cancellation. After Close(), Pop continues returning remaining items, then returns ErrQueueClosed. After CloseNow(), Pop immediately returns ErrQueueClosed. After either call, pushed items are silently discarded.
func NewRingQueue ¶
NewRingQueue creates a RingQueue with the given capacity. Panics if capacity < 1.
func (*RingQueue[T]) Close ¶
func (q *RingQueue[T]) Close()
Close closes the queue in drain mode. Push and PushEvict silently discard items. Pop continues returning remaining items, then returns ErrQueueClosed. Idempotent — calling Close or CloseNow again is a no-op.
func (*RingQueue[T]) CloseNow ¶
func (q *RingQueue[T]) CloseNow()
CloseNow closes the queue immediately, discarding all remaining items. Pop immediately returns ErrQueueClosed. Idempotent — calling Close or CloseNow again is a no-op.
func (*RingQueue[T]) Peek ¶
func (q *RingQueue[T]) Peek() (T, bool)
Peek returns the front item without removing it. Returns false if the queue is empty.
func (*RingQueue[T]) Pop ¶
func (q *RingQueue[T]) Pop(ctx context.Context) (T, error)
Pop removes and returns the front item, blocking if empty. After Close(), returns remaining items, then ErrQueueClosed. After CloseNow(), immediately returns ErrQueueClosed.
func (*RingQueue[T]) Push ¶
func (q *RingQueue[T]) Push(item T)
Push adds an item to the queue. If the queue is full, the oldest item is silently discarded. After Close/CloseNow, the item is silently discarded.
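The evict-oldest behavior can be illustrated with a small fixed-size ring buffer. The sketch is hypothetical and leaves out closing and the blocking Pop. Its pushEvict returns the evicted item so the demo can print it; whether the package's PushEvict reports evictions the same way is an assumption, since that method is not described on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is an illustrative fixed-capacity buffer that overwrites its oldest
// item when full.
type ring[T any] struct {
	mu   sync.Mutex
	buf  []T
	head int // index of the oldest item
	size int // number of stored items
}

func newRing[T any](capacity int) *ring[T] {
	return &ring[T]{buf: make([]T, capacity)}
}

// pushEvict stores item and never blocks. If the buffer was full, it returns
// the item that was evicted and true.
func (r *ring[T]) pushEvict(item T) (T, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	var evicted T
	if r.size == len(r.buf) {
		// Full: the write position coincides with head, so overwrite the
		// oldest slot and advance head.
		evicted = r.buf[r.head]
		r.buf[r.head] = item
		r.head = (r.head + 1) % len(r.buf)
		return evicted, true
	}
	r.buf[(r.head+r.size)%len(r.buf)] = item
	r.size++
	return evicted, false
}

// tryPop removes and returns the oldest item, or false if the ring is empty.
func (r *ring[T]) tryPop() (T, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	var zero T
	if r.size == 0 {
		return zero, false
	}
	item := r.buf[r.head]
	r.buf[r.head] = zero // drop the reference held by the buffer
	r.head = (r.head + 1) % len(r.buf)
	r.size--
	return item, true
}

func main() {
	r := newRing[int](3)
	for i := 1; i <= 5; i++ {
		if old, ok := r.pushEvict(i); ok {
			fmt.Println("evicted", old) // evicted 1, then evicted 2
		}
	}
	for {
		v, ok := r.tryPop()
		if !ok {
			break
		}
		fmt.Println(v) // 3, 4, 5
	}
}
```

With capacity 3 and five pushes, the two oldest items are dropped and the remaining three come out in insertion order.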
type SingleFlight ¶
type SingleFlight[K comparable, V any] struct {
	// contains filtered or unexported fields
}
SingleFlight deduplicates concurrent calls for the same key. Multiple goroutines requesting the same key simultaneously share a single execution.
func NewSingleFlight ¶
func NewSingleFlight[K comparable, V any]() *SingleFlight[K, V]
NewSingleFlight creates a new SingleFlight instance.
func (*SingleFlight[K, V]) Do ¶
func (sf *SingleFlight[K, V]) Do(key K, fn func() (V, error)) (v V, shared bool, err error)
Do executes fn for the given key if no in-flight call exists for that key. If a call is already in progress, it waits and shares the result.
- shared=false: this call executed fn
- shared=true: this call shared another goroutine's result
func (*SingleFlight[K, V]) Forget ¶
func (sf *SingleFlight[K, V]) Forget(key K)
Forget removes the key from the in-flight map, allowing future Do calls for this key to execute fn instead of sharing an in-flight result.
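A minimal sketch of call deduplication with a shared flag is shown below. It follows the general single-flight pattern rather than the package's source: singleFlight, call, and the demo values are hypothetical, and panics inside fn are not handled.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// call tracks one in-flight execution and its outcome.
type call[V any] struct {
	wg  sync.WaitGroup
	val V
	err error
}

// singleFlight is an illustrative deduplicator.
type singleFlight[K comparable, V any] struct {
	mu    sync.Mutex
	calls map[K]*call[V]
}

func newSingleFlight[K comparable, V any]() *singleFlight[K, V] {
	return &singleFlight[K, V]{calls: make(map[K]*call[V])}
}

// Do runs fn unless a call for key is already in flight, in which case it
// waits for that call and reports shared=true.
func (sf *singleFlight[K, V]) Do(key K, fn func() (V, error)) (V, bool, error) {
	sf.mu.Lock()
	if c, ok := sf.calls[key]; ok {
		sf.mu.Unlock()
		c.wg.Wait()
		return c.val, true, c.err
	}
	c := &call[V]{}
	c.wg.Add(1)
	sf.calls[key] = c
	sf.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	sf.mu.Lock()
	if sf.calls[key] == c { // skip if Forget or a newer call replaced it
		delete(sf.calls, key)
	}
	sf.mu.Unlock()
	return c.val, false, c.err
}

// Forget drops the in-flight record so the next Do executes fn again.
func (sf *singleFlight[K, V]) Forget(key K) {
	sf.mu.Lock()
	delete(sf.calls, key)
	sf.mu.Unlock()
}

func main() {
	sf := newSingleFlight[string, int]()
	var executions, sharedCount int64
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, shared, _ := sf.Do("config", func() (int, error) {
				atomic.AddInt64(&executions, 1)
				time.Sleep(50 * time.Millisecond) // simulate slow work
				return 42, nil
			})
			if shared {
				atomic.AddInt64(&sharedCount, 1)
			}
		}()
	}
	wg.Wait()
	// How many callers share a result depends on timing.
	fmt.Println("executions:", executions, "shared:", sharedCount)
}
```

Results are shared only among calls that overlap in time; once the in-flight call finishes, the next Do for the same key runs fn again, which is the main difference from a cache.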