pool

package
v0.6.2
Published: May 1, 2026 | License: MIT | Imports: 6 | Imported by: 0

Documentation

Overview

Package pool implements shard-based concurrent command execution.

ShardPool[T] spawns one dispatcher and multiple workers per shard. Each shard ensures serial ordering for commands targeting the same aggregate while allowing parallel execution across different aggregates.

  • Shard.dispatchCommands — Sole owner of versionMap; dispatches commands and handles version corrections on validation failures (single-worker only)
  • Shard.workerLoop — Executes commands in parallel; sends results on completion
  • version management — Incremented per dispatch, conditionally decremented on validation errors only when workersPerShard == 1
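The version rule in the list above can be sketched as follows. This is only an illustration: versionTracker, increment, and correct are hypothetical names, not the package's unexported code, and the rollback logic is an assumption about what a "correction" does. The tracker is deliberately not synchronized, mirroring the idea that a single dispatcher goroutine owns the map.

```go
package main

import "fmt"

// versionTracker is a hypothetical stand-in for the dispatcher-owned versionMap.
// It is not safe for concurrent use: only one goroutine may touch it.
type versionTracker struct {
	versions        map[string]int
	workersPerShard int
}

func newVersionTracker(workersPerShard int) *versionTracker {
	return &versionTracker{
		versions:        make(map[string]int),
		workersPerShard: workersPerShard,
	}
}

// increment bumps the aggregate's version at dispatch time and returns the new value.
func (v *versionTracker) increment(aggregateID string) int {
	v.versions[aggregateID]++
	return v.versions[aggregateID]
}

// correct rolls the version back after a validation failure, but only in
// single-worker mode. It reports whether a correction was applied.
func (v *versionTracker) correct(aggregateID string) bool {
	if v.workersPerShard != 1 {
		return false // multi-worker shards never decrement
	}
	if v.versions[aggregateID] > 0 {
		v.versions[aggregateID]--
	}
	return true
}

func main() {
	single := newVersionTracker(1)
	single.increment("order-1")
	single.increment("order-1")
	single.correct("order-1")
	fmt.Println(single.versions["order-1"]) // prints 1

	multi := newVersionTracker(4)
	multi.increment("order-1")
	multi.correct("order-1")
	fmt.Println(multi.versions["order-1"]) // prints 1 (no decrement applied)
}
```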

Shutdown is two-phase: first, all dispatchers are signaled to stop; second, the jobQueues are drained until the workers finish, after which all channels are closed. sync.WaitGroup is used for coordination.
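A minimal sketch of a two-phase shutdown coordinated with sync.WaitGroup is shown below. It is not the package's implementation: runAndShutdown, the stop channel, and the choice to close the job queue so that workers can drain it are all assumptions made for illustration, and the real ordering of channel closes may differ.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAndShutdown (hypothetical) starts one dispatcher and several workers,
// submits cmds, shuts down in two phases, and returns how many jobs ran.
func runAndShutdown(cmds []int, workers int) int {
	commandChan := make(chan int, len(cmds))
	jobQueue := make(chan int, len(cmds))
	stop := make(chan struct{})

	var dispatcherWG, workerWG sync.WaitGroup
	var processed int64

	dispatcherWG.Add(1)
	go func() {
		defer dispatcherWG.Done()
		for {
			select {
			case c := <-commandChan:
				jobQueue <- c
			case <-stop:
				// Forward commands that were already accepted, then exit.
				for {
					select {
					case c := <-commandChan:
						jobQueue <- c
					default:
						return
					}
				}
			}
		}
	}()

	for i := 0; i < workers; i++ {
		workerWG.Add(1)
		go func() {
			defer workerWG.Done()
			for range jobQueue { // exits once jobQueue is closed and empty
				atomic.AddInt64(&processed, 1)
			}
		}()
	}

	for _, c := range cmds {
		commandChan <- c
	}

	// Phase 1: signal the dispatcher to stop and wait for it to exit.
	close(stop)
	dispatcherWG.Wait()

	// Phase 2: let workers drain the remaining jobs, then close what is left.
	close(jobQueue)
	workerWG.Wait()
	close(commandChan)

	return int(atomic.LoadInt64(&processed))
}

func main() {
	fmt.Println(runAndShutdown([]int{1, 2, 3, 4, 5}, 3)) // prints 5
}
```

Waiting for the dispatcher before closing the job queue matters in this sketch: closing the queue while a dispatcher can still send on it would panic.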

Shard[T] encapsulates dispatcher and worker coordination for a subset of aggregates. The dispatcher (dispatchCommands) is the sole owner of versionMap and decides command dispatch order. Workers (workerLoop) execute commands in parallel.

  • incrementVersion — Called by dispatcher during dispatch; always increments
  • decrementVersion — Called by dispatcher on corrections; only when workersPerShard == 1
  • executeJob — Called by workers; sends corrections only when workersPerShard == 1
  • sendResult — Non-blocking result send; drops if receiver gone
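The non-blocking send described for sendResult is commonly written in Go as a select with a default case. The sketch below shows that idiom only; trySend is a hypothetical helper, and the actual sendResult signature and result type are not documented here.

```go
package main

import "fmt"

// trySend (hypothetical) attempts to deliver a result without blocking.
// It returns false when no receiver is ready and no buffer slot is free,
// in which case the result is dropped.
func trySend(ch chan<- string, result string) bool {
	select {
	case ch <- result:
		return true
	default:
		return false
	}
}

func main() {
	buffered := make(chan string, 1)
	fmt.Println(trySend(buffered, "ok"))      // prints true
	fmt.Println(trySend(buffered, "dropped")) // prints false (buffer full)

	unbuffered := make(chan string)
	fmt.Println(trySend(unbuffered, "no receiver")) // prints false
}
```

Dropping instead of blocking keeps a worker from stalling forever when the caller has already given up on the result, at the cost of losing that result.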

Serial ordering per aggregate is guaranteed by consistent hashing at the router level: each aggregate always routes to the same shard, and therefore to the same dispatcher sequence.
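To illustrate why stable routing yields per-aggregate ordering, the sketch below maps an aggregate ID to a shard index. Note the substitution: it uses a plain FNV hash modulo the shard count rather than a full consistent-hashing ring, and shardFor is a hypothetical function, not the router's API. It assumes numShards is greater than zero.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor (hypothetical) returns a stable shard index for an aggregate ID.
// The same ID always yields the same index for a fixed numShards.
func shardFor(aggregateID string, numShards int) int {
	h := fnv.New32a()
	h.Write([]byte(aggregateID)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numShards))
}

func main() {
	// "order-1" appears twice and lands on the same shard both times.
	for _, id := range []string{"order-1", "order-2", "order-1"} {
		fmt.Printf("%s -> shard %d\n", id, shardFor(id, 4))
	}
}
```

Because every command for a given aggregate reaches the same shard, that shard's single dispatcher sees those commands in arrival order.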

Types

type Shard

type Shard[T any] struct {
	// contains filtered or unexported fields
}

func (*Shard[T]) CommandChan

func (s *Shard[T]) CommandChan() chan *models.CommandEnvelope[T]

func (*Shard[T]) SetOnDispatched

func (s *Shard[T]) SetOnDispatched(fn func())

ForTesting: SetOnDispatched sets a callback that is invoked each time the dispatcher reads a command from commandChan, at which point the channel slot is free for new senders. Do not call in production code.

type ShardPool

type ShardPool[T any] struct {
	// contains filtered or unexported fields
}

func New

func New[T any](
	executor *exec.CommandExecutor[T],
	numShards int,
	queueDepth int,
	workersPerShard int,
) *ShardPool[T]

func (*ShardPool[T]) Drain

func (p *ShardPool[T]) Drain(ctx context.Context) error

func (*ShardPool[T]) Shards

func (p *ShardPool[T]) Shards() []*Shard[T]
