Documentation ¶
Overview ¶
Package pool implements shard-based concurrent command execution.
ShardPool[T] spawns one dispatcher and one or more workers per shard. Each shard guarantees serial ordering for commands that target the same aggregate while allowing parallel execution across different aggregates.
- Shard.dispatchCommands — Sole owner of versionMap; dispatches commands and handles version corrections on validation failures (single-worker only)
- Shard.workerLoop — Executes commands in parallel; sends results on completion
- version management — Incremented per dispatch, conditionally decremented on validation errors only when workersPerShard == 1
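The version rule above can be sketched in isolation. This is a minimal illustration, not the package's code: versionTracker, onDispatch, and onValidationError are hypothetical names, and the map keyed by aggregate ID is an assumed shape for versionMap.

```go
package main

// versionTracker is a hypothetical stand-in for the dispatcher-owned versionMap.
// Only the dispatcher goroutine touches it, so no locking is needed.
type versionTracker struct {
	workersPerShard int
	versions        map[string]uint64 // assumed shape: aggregate ID -> last dispatched version
}

func newVersionTracker(workersPerShard int) *versionTracker {
	return &versionTracker{
		workersPerShard: workersPerShard,
		versions:        make(map[string]uint64),
	}
}

// onDispatch always increments and returns the version assigned to the command.
func (v *versionTracker) onDispatch(aggregateID string) uint64 {
	v.versions[aggregateID]++
	return v.versions[aggregateID]
}

// onValidationError rolls the version back by one, but only in
// single-worker mode (workersPerShard == 1). Otherwise it does nothing.
func (v *versionTracker) onValidationError(aggregateID string) {
	if v.workersPerShard != 1 {
		return
	}
	if v.versions[aggregateID] > 0 {
		v.versions[aggregateID]--
	}
}
```

With workersPerShard == 1, two dispatches followed by one validation error leave the aggregate at version 1. With more workers the correction is skipped and the version stays where the dispatcher left it.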
Shutdown is two-phase: signal all dispatchers to stop, drain jobQueues until workers finish, then close all channels. Coordination uses sync.WaitGroup.
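One way the shutdown ordering might look is sketched below. miniShard and all of its fields are hypothetical. The sketch also assumes that closing jobQueue is how workers learn there is no more work, which the package may handle differently.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// miniShard is a hypothetical, stripped-down shard used only to show shutdown ordering.
type miniShard struct {
	processed   int64         // updated atomically by workers
	stop        chan struct{} // closed to tell the dispatcher to exit
	commands    chan int      // unbuffered: a send returns once the dispatcher has read it
	jobQueue    chan int
	dispatchers sync.WaitGroup
	workers     sync.WaitGroup
}

func newMiniShard(workers int) *miniShard {
	s := &miniShard{
		stop:     make(chan struct{}),
		commands: make(chan int),
		jobQueue: make(chan int, 16),
	}
	s.dispatchers.Add(1)
	go s.dispatch()
	for i := 0; i < workers; i++ {
		s.workers.Add(1)
		go s.work()
	}
	return s
}

func (s *miniShard) dispatch() {
	defer s.dispatchers.Done()
	for {
		select {
		case <-s.stop:
			return
		case cmd := <-s.commands:
			s.jobQueue <- cmd // hand the command to a worker
		}
	}
}

func (s *miniShard) work() {
	defer s.workers.Done()
	for range s.jobQueue { // exits once jobQueue is closed and empty
		atomic.AddInt64(&s.processed, 1)
	}
}

func (s *miniShard) processedCount() int64 { return atomic.LoadInt64(&s.processed) }

// shutdown follows the two-phase idea: stop producers and drain, then close the rest.
func (s *miniShard) shutdown() {
	// Phase 1: signal the dispatcher, wait for it, then let workers drain jobQueue.
	close(s.stop)
	s.dispatchers.Wait()
	close(s.jobQueue) // safe: the only sender has exited
	s.workers.Wait()
	// Phase 2: close the remaining channels now that nothing reads or writes them.
	close(s.commands)
}
```

Waiting for the dispatcher before closing jobQueue matters here: closing a channel that still has an active sender would panic on the next send.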
Shard[T] encapsulates dispatcher and worker coordination for a subset of aggregates. The dispatcher (dispatchCommands) is the sole owner of versionMap and decides command dispatch order. Workers (workerLoop) execute commands in parallel.
- incrementVersion — Called by dispatcher during dispatch; always increments
- decrementVersion — Called by dispatcher on corrections; only when workersPerShard == 1
- executeJob — Called by workers; sends corrections only when workersPerShard == 1
- sendResult — Non-blocking result send; drops if receiver gone
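The non-blocking send described for sendResult is commonly written in Go as a select with a default case. The following is a sketch of that idiom under assumed names and signature. The package's actual sendResult is unexported and may differ.

```go
package main

// sendResult tries to deliver res without blocking and reports whether it
// was delivered. If no receiver is ready and the buffer (if any) is full,
// the result is dropped. Hypothetical signature, for illustration only.
//
// Note: sending on a closed channel still panics, even inside select, so
// this pattern assumes the channel is not closed while results may arrive.
func sendResult[R any](results chan<- R, res R) bool {
	select {
	case results <- res:
		return true
	default:
		return false // receiver gone or not ready: drop instead of blocking the worker
	}
}
```

Dropping instead of blocking keeps a worker from stalling forever on a caller that has stopped listening, for example after a timeout.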
Serial ordering per aggregate is guaranteed by consistent hashing at the router level: each aggregate always routes to the same shard, and therefore passes through the same dispatcher in sequence.
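The routing property can be illustrated with a much simpler stand-in than a full consistent-hash ring: a stable hash of the aggregate ID taken modulo the shard count. shardFor is a hypothetical helper and is not claimed to be the router's actual algorithm.

```go
package main

import "hash/fnv"

// shardFor maps an aggregate ID to a shard index in [0, numShards).
// The same ID always yields the same index, which is the property the
// per-aggregate ordering guarantee relies on. Hash-modulo is used here for
// brevity; unlike a hash ring, it remaps most keys when numShards changes.
func shardFor(aggregateID string, numShards int) int {
	h := fnv.New32a()
	h.Write([]byte(aggregateID)) // Write on an FNV hash never returns an error
	return int(h.Sum32() % uint32(numShards))
}
```

Because every command for a given aggregate lands on one shard, only that shard's dispatcher ever orders them.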
Types ¶
type Shard ¶
type Shard[T any] struct {
	// contains filtered or unexported fields
}
func (*Shard[T]) CommandChan ¶
func (s *Shard[T]) CommandChan() chan *models.CommandEnvelope[T]
func (*Shard[T]) SetOnDispatched ¶
func (s *Shard[T]) SetOnDispatched(fn func())
ForTesting: SetOnDispatched sets a callback that is invoked each time the dispatcher reads a command from commandChan, at which point the channel slot is free for new senders. Do not call it in production code.
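A test might use such a hook to observe dispatcher reads without sleeping. The sketch below uses a hypothetical stand-in type (hookedDispatcher) rather than the real Shard[T], since constructing a Shard needs unexported pieces not shown here. Only the callback pattern is the point.

```go
package main

// hookedDispatcher is a hypothetical miniature of a dispatcher with a test hook.
type hookedDispatcher struct {
	commandChan  chan string
	onDispatched func() // test-only callback, nil in normal operation
}

func (d *hookedDispatcher) setOnDispatched(fn func()) { d.onDispatched = fn }

// run reads commands until commandChan is closed, firing the hook after each read.
func (d *hookedDispatcher) run() {
	for range d.commandChan {
		if d.onDispatched != nil {
			d.onDispatched()
		}
	}
}

// countDispatches sends n commands and returns how many times the hook fired.
func countDispatches(n int) int {
	d := &hookedDispatcher{commandChan: make(chan string)}
	dispatched := make(chan struct{}, n) // buffered so the hook never blocks
	d.setOnDispatched(func() { dispatched <- struct{}{} })

	done := make(chan struct{})
	go func() {
		d.run()
		close(done)
	}()

	for i := 0; i < n; i++ {
		d.commandChan <- "cmd"
	}
	close(d.commandChan)
	<-done // dispatcher has exited, so every hook call has completed
	return len(dispatched)
}
```

Setting the hook before the dispatcher goroutine starts avoids a data race on the callback field.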