Documentation ¶
Overview ¶
Package strategy provides built-in assignment strategy implementations.
Assignment strategies determine how partitions are distributed across workers. The package includes three built-in strategies:
- WeightedConsistentHash: Weighted consistent hashing with extreme partition handling and soft load caps (recommended for weighted workloads)
- ConsistentHash: Standard consistent hashing with virtual nodes (recommended for equal-weight partitions)
- RoundRobin: Simple round-robin distribution
Strategy Selection Guide ¶
WeightedConsistentHash:
- Use when partitions have significantly different processing costs
- Balances cache affinity with load distribution
- Handles extreme partitions (2x+ average weight) via round-robin
- Applies soft load caps to prevent worker overload
- Configuration: virtual nodes, hash seed, overload threshold, extreme threshold
ConsistentHash:
- Use when all partitions have equal or similar weights
- Maximizes cache affinity during scaling events
- Minimal configuration: virtual nodes, hash seed
RoundRobin:
- Use for simple, stateless workloads
- Guarantees an even distribution by partition count
- No cache affinity preservation
Custom strategies can be implemented by satisfying the types.AssignmentStrategy interface.
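As a rough illustration of a custom strategy, the sketch below implements the Assign signature documented on this page. It is self-contained, so Partition and AssignmentStrategy are local stand-ins: the real types.Partition fields and any further methods required by types.AssignmentStrategy are assumptions here, and FirstWorker is a hypothetical strategy invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Partition is a local stand-in for types.Partition (field names are assumed).
type Partition struct {
	Keys   []string
	Weight int64
}

// AssignmentStrategy mirrors the Assign signature shown on this page.
// The real types.AssignmentStrategy interface may declare more methods.
type AssignmentStrategy interface {
	Assign(workers []string, partitions []Partition) (map[string][]Partition, error)
}

// errNoWorkers stands in for strategy.ErrNoWorkers.
var errNoWorkers = errors.New("no workers available for assignment")

// FirstWorker is a hypothetical strategy that gives every partition to the
// lexicographically smallest worker ID. It is only useful as a template.
type FirstWorker struct{}

func (FirstWorker) Assign(workers []string, partitions []Partition) (map[string][]Partition, error) {
	if len(workers) == 0 {
		return nil, errNoWorkers
	}
	// Copy before sorting so the caller's slice is not reordered.
	sorted := append([]string(nil), workers...)
	sort.Strings(sorted)

	out := make(map[string][]Partition, len(sorted))
	for _, w := range sorted {
		out[w] = nil // every worker appears in the result, even when idle
	}
	out[sorted[0]] = append(out[sorted[0]], partitions...)
	return out, nil
}

// Compile-time check that FirstWorker satisfies the stand-in interface.
var _ AssignmentStrategy = FirstWorker{}

func main() {
	got, err := FirstWorker{}.Assign(
		[]string{"worker-1", "worker-0"},
		[]Partition{{Keys: []string{"a"}}, {Keys: []string{"b"}}},
	)
	fmt.Println(len(got["worker-0"]), len(got["worker-1"]), err)
}
```

Sorting a copy of the worker list keeps the result deterministic regardless of the order in which workers are passed in.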
Index ¶
- Variables
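- type ConsistentHash
- func NewConsistentHash(opts ...ConsistentHashOption) *ConsistentHash
- func (ch *ConsistentHash) Assign(workers []string, partitions []types.Partition) (map[string][]types.Partition, error)
- type ConsistentHashOption
- func WithHashSeed(seed uint64) ConsistentHashOption
- func WithVirtualNodes(nodes int) ConsistentHashOption
- type RoundRobin
- func NewRoundRobin() *RoundRobin
- func (rr *RoundRobin) Assign(workers []string, partitions []types.Partition) (map[string][]types.Partition, error)
- type WeightedConsistentHash
- func NewWeightedConsistentHash(opts ...WeightedConsistentHashOption) *WeightedConsistentHash
- func (wch *WeightedConsistentHash) Assign(workers []string, partitions []types.Partition) (map[string][]types.Partition, error)
- type WeightedConsistentHashOption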
- func WithDefaultWeight(weight int64) WeightedConsistentHashOption
- func WithExtremeThreshold(threshold float64) WeightedConsistentHashOption
- func WithMinPartitionCount(factor float64) WeightedConsistentHashOption
- func WithOverloadThreshold(threshold float64) WeightedConsistentHashOption
- func WithWeightedHashSeed(seed uint64) WeightedConsistentHashOption
- func WithWeightedLogger(logger types.Logger) WeightedConsistentHashOption
- func WithWeightedVirtualNodes(nodes int) WeightedConsistentHashOption
Constants ¶
This section is empty.
Variables ¶
var ErrNoWorkers = errors.New("no workers available for assignment")
ErrNoWorkers indicates that no workers were provided for assignment.
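Callers will typically want to tell this sentinel apart from other failures. A minimal sketch using errors.Is follows; ErrNoWorkers is redeclared locally so the snippet runs on its own, and assign and isNoWorkers are hypothetical helpers standing in for a strategy's Assign method and the caller's check.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoWorkers is a local stand-in for strategy.ErrNoWorkers.
var ErrNoWorkers = errors.New("no workers available for assignment")

// assign is a hypothetical placeholder for any strategy's Assign method.
func assign(workers []string) (map[string][]string, error) {
	if len(workers) == 0 {
		// Wrapping with %w keeps the sentinel detectable by errors.Is.
		return nil, fmt.Errorf("assign: %w", ErrNoWorkers)
	}
	return map[string][]string{}, nil
}

// isNoWorkers reports whether err is, or wraps, ErrNoWorkers.
func isNoWorkers(err error) bool {
	return errors.Is(err, ErrNoWorkers)
}

func main() {
	if _, err := assign(nil); isNoWorkers(err) {
		fmt.Println("no workers yet; retry once a worker registers")
	}
}
```

Comparing with errors.Is rather than == keeps the check working if an intermediate layer wraps the error.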
Functions ¶
This section is empty.
Types ¶
type ConsistentHash ¶
type ConsistentHash struct {
// contains filtered or unexported fields
}
ConsistentHash implements consistent hashing with virtual nodes.
func NewConsistentHash ¶
func NewConsistentHash(opts ...ConsistentHashOption) *ConsistentHash
NewConsistentHash creates a new consistent hash strategy.
The strategy uses a hash ring with virtual nodes to distribute partitions evenly across workers while minimizing partition movement during scaling. It achieves more than 80% cache affinity during rebalancing.
Parameters:
- opts: Optional configuration (WithVirtualNodes, WithHashSeed)
Returns:
- *ConsistentHash: Initialized consistent hash strategy
Example:
// Named strat so the variable does not shadow the strategy package.
strat := strategy.NewConsistentHash(
	strategy.WithVirtualNodes(300),
)
js, err := jetstream.New(conn)
if err != nil { /* handle */ }
mgr, err := parti.NewManager(&cfg, js, src, strat)
if err != nil { /* handle */ }
func (*ConsistentHash) Assign ¶
func (ch *ConsistentHash) Assign(workers []string, partitions []types.Partition) (map[string][]types.Partition, error)
Assign calculates partition assignments using consistent hashing.
The algorithm:
- Build hash ring with virtual nodes for each worker
- Place each partition on ring based on hash of partition keys
- Assign partition to nearest clockwise virtual node
- Apply weight balancing if partition weights differ
Parameters:
- workers: List of worker IDs (e.g., ["worker-0", "worker-1"])
- partitions: List of partitions to assign
Returns:
- map[string][]types.Partition: Map from workerID to assigned partitions
- error: Assignment error (e.g., no workers available)
Example:
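// strat is a strategy value, e.g. one returned by strategy.NewConsistentHash.
assignments, err := strat.Assign(
	[]string{"worker-0", "worker-1"},
	partitions,
)
if err != nil { /* handle */ }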
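The ring-building and clockwise-lookup steps listed above can be sketched as follows. This is not the package's implementation: the FNV-1a hash, the "worker#i" virtual node naming, and the helpers hashKey, buildRing, and lookup are all assumptions chosen for illustration, and the weight-balancing step is omitted.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashKey hashes a string with FNV-1a. The real strategy's hash function and
// seed handling are not documented here, so this choice is an assumption.
func hashKey(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

// vnode is one virtual node on the ring.
type vnode struct {
	hash   uint64
	worker string
}

// buildRing places vnodes virtual nodes per worker on the ring, sorted by hash.
func buildRing(workers []string, vnodes int) []vnode {
	ring := make([]vnode, 0, len(workers)*vnodes)
	for _, w := range workers {
		for i := 0; i < vnodes; i++ {
			ring = append(ring, vnode{hashKey(fmt.Sprintf("%s#%d", w, i)), w})
		}
	}
	sort.Slice(ring, func(i, j int) bool { return ring[i].hash < ring[j].hash })
	return ring
}

// lookup returns the worker that owns the first virtual node clockwise from
// the key's hash, or "" when the ring is empty.
func lookup(ring []vnode, key string) string {
	if len(ring) == 0 {
		return ""
	}
	h := hashKey(key)
	i := sort.Search(len(ring), func(i int) bool { return ring[i].hash >= h })
	if i == len(ring) {
		i = 0 // wrap around to the start of the ring
	}
	return ring[i].worker
}

func main() {
	keys := []string{"orders.0", "orders.1", "orders.2", "orders.3", "orders.4", "orders.5"}
	before := buildRing([]string{"worker-0", "worker-1"}, 150)
	after := buildRing([]string{"worker-0", "worker-1", "worker-2"}, 150)

	moved := 0
	for _, k := range keys {
		if lookup(before, k) != lookup(after, k) {
			moved++
		}
	}
	fmt.Printf("%d of %d partitions moved after adding worker-2\n", moved, len(keys))
}
```

In this sketch, a partition that changes owner when worker-2 joins can only move to worker-2, which is the property that keeps partition movement low during scaling.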
type ConsistentHashOption ¶
type ConsistentHashOption func(*ConsistentHash)
ConsistentHashOption configures a ConsistentHash strategy.
func WithHashSeed ¶
func WithHashSeed(seed uint64) ConsistentHashOption
WithHashSeed sets a custom hash seed for consistent hashing.
Parameters:
- seed: Hash seed value
Returns:
- ConsistentHashOption: Configuration option
func WithVirtualNodes ¶
func WithVirtualNodes(nodes int) ConsistentHashOption
WithVirtualNodes sets the number of virtual nodes per worker.
Higher values provide better distribution but increase memory usage. Recommended range: 100-300 (default: 150). Values below 1 are clamped to 1.
Parameters:
- nodes: Number of virtual nodes per worker (minimum: 1)
Returns:
- ConsistentHashOption: Configuration option
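The option functions follow Go's functional options pattern. The sketch below shows one way the documented default (150) and the clamp to a minimum of 1 could be expressed; ring, option, withVirtualNodes, withHashSeed, and newRing are hypothetical names, since the actual struct fields are unexported and not shown on this page.

```go
package main

import "fmt"

// ring is a hypothetical stand-in for the unexported fields of ConsistentHash.
type ring struct {
	virtualNodes int
	seed         uint64
}

// option plays the role of ConsistentHashOption.
type option func(*ring)

// withVirtualNodes clamps values below 1 to 1, as the documentation describes.
func withVirtualNodes(n int) option {
	return func(r *ring) {
		if n < 1 {
			n = 1
		}
		r.virtualNodes = n
	}
}

// withHashSeed stores a custom seed.
func withHashSeed(seed uint64) option {
	return func(r *ring) { r.seed = seed }
}

// newRing starts from the documented default of 150 virtual nodes and then
// applies the options in order, so later options win.
func newRing(opts ...option) *ring {
	r := &ring{virtualNodes: 150}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	fmt.Println(newRing().virtualNodes)                                        // 150
	fmt.Println(newRing(withVirtualNodes(0)).virtualNodes)                     // 1
	fmt.Println(newRing(withVirtualNodes(300), withHashSeed(42)).virtualNodes) // 300
}
```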
type RoundRobin ¶
type RoundRobin struct{}
RoundRobin implements simple round-robin partition assignment.
func NewRoundRobin ¶
func NewRoundRobin() *RoundRobin
NewRoundRobin creates a new round-robin strategy.
The strategy distributes partitions evenly across workers in a simple round-robin fashion. This provides predictable assignment but does not preserve cache affinity during scaling.
Returns:
- *RoundRobin: Initialized round-robin strategy
Example:
// Named strat so the variable does not shadow the strategy package.
strat := strategy.NewRoundRobin()
js, err := jetstream.New(conn)
if err != nil { /* handle */ }
mgr, err := parti.NewManager(&cfg, js, src, strat)
if err != nil { /* handle */ }
func (*RoundRobin) Assign ¶
func (rr *RoundRobin) Assign(workers []string, partitions []types.Partition) (map[string][]types.Partition, error)
Assign calculates partition assignments using round-robin distribution.
The algorithm:
- Sort workers and partitions for deterministic assignment
- Distribute partitions evenly in round-robin fashion
Parameters:
- workers: List of worker IDs (e.g., ["worker-0", "worker-1"])
- partitions: List of partitions to assign
Returns:
- map[string][]types.Partition: Map from workerID to assigned partitions
- error: Assignment error (e.g., no workers available)
Example:
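// strat is a strategy value, e.g. one returned by strategy.NewRoundRobin.
assignments, err := strat.Assign(
	[]string{"worker-0", "worker-1"},
	partitions,
)
if err != nil { /* handle */ }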
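The two steps of the round-robin algorithm can be sketched in a few lines. For brevity this version represents partitions as plain string IDs rather than types.Partition, and roundRobin and errNoWorkers are local, hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// errNoWorkers stands in for strategy.ErrNoWorkers.
var errNoWorkers = errors.New("no workers available for assignment")

// roundRobin sorts both inputs for determinism, then deals partitions to
// workers in turn. Partitions are plain IDs here (a simplification).
func roundRobin(workers, partitions []string) (map[string][]string, error) {
	if len(workers) == 0 {
		return nil, errNoWorkers
	}
	// Sort copies so the caller's slices are left untouched.
	ws := append([]string(nil), workers...)
	ps := append([]string(nil), partitions...)
	sort.Strings(ws)
	sort.Strings(ps)

	out := make(map[string][]string, len(ws))
	for i, p := range ps {
		w := ws[i%len(ws)]
		out[w] = append(out[w], p)
	}
	return out, nil
}

func main() {
	out, _ := roundRobin(
		[]string{"worker-1", "worker-0"},
		[]string{"p2", "p0", "p1"},
	)
	fmt.Println(out["worker-0"], out["worker-1"]) // [p0 p2] [p1]
}
```

Because assignment depends on a partition's position in the sorted list, adding or removing a worker changes the owner of most partitions, which is why this approach does not preserve cache affinity.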
type WeightedConsistentHash ¶
type WeightedConsistentHash struct {
// contains filtered or unexported fields
}
WeightedConsistentHash implements weighted consistent hashing with extreme partition handling.
func NewWeightedConsistentHash ¶
func NewWeightedConsistentHash(opts ...WeightedConsistentHashOption) *WeightedConsistentHash
NewWeightedConsistentHash creates a new weighted consistent hash strategy.
Parameters:
- opts: Optional configuration (WithWeightedVirtualNodes, WithWeightedHashSeed, WithOverloadThreshold, WithExtremeThreshold, WithMinPartitionCount, WithDefaultWeight, WithWeightedLogger)
Returns:
- *WeightedConsistentHash: Initialized weighted consistent hash strategy ready for use.
func (*WeightedConsistentHash) Assign ¶
func (wch *WeightedConsistentHash) Assign(workers []string, partitions []types.Partition) (map[string][]types.Partition, error)
Assign calculates partition assignments using weighted consistent hashing with extreme partition handling.
The algorithm balances two competing goals:
- Cache affinity - Keep partitions on the same workers across rebalancing (via consistent hashing)
- Load balance - Prevent workers from being overloaded by heavy partitions
Algorithm Overview:
- Validation - Check for workers and normalize partition weights
- Equal-weight fast path - When all partitions have the same weight, use pure consistent hashing
- Two-phase weighted assignment:
  a. Extreme partitions - Distribute heavy partitions (weight > avgWeight * extremeThreshold) round-robin
  b. Normal partitions - Assign remaining partitions using consistent hashing with soft load cap
The soft load cap (avgWeight * overloadThreshold) allows some imbalance to preserve cache affinity, but reassigns partitions to the lightest worker when the cap is exceeded.
Parameters:
- workers: List of worker IDs to assign partitions to
- partitions: List of partitions to distribute across workers
Returns:
- map[string][]types.Partition: Worker ID → assigned partitions
- error: ErrNoWorkers if workers list is empty, nil otherwise
Example:
strategy := NewWeightedConsistentHash(
	WithOverloadThreshold(1.3), // Allow 30% overload
	WithExtremeThreshold(2.0),  // Partitions 2x average are "extreme"
)
assignments, err := strategy.Assign(workers, partitions)
if err != nil {
	log.Fatal(err)
}
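To make the two-phase idea concrete, here is a simplified sketch rather than the package's actual code. It makes several assumptions: a hash-modulo pick (preferred) replaces the real hash ring, the soft cap is taken as the average load per worker multiplied by the overload threshold, the equal-weight fast path is skipped, and part, preferred, lightest, and assignWeighted are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// part is a simplified stand-in for types.Partition.
type part struct {
	ID     string
	Weight int64
}

// preferred picks a stable worker for a partition ID. A hash modulo is used
// here in place of the real hash ring (assumption, for brevity).
func preferred(id string, workers []string) string {
	h := fnv.New32a()
	h.Write([]byte(id))
	return workers[int(h.Sum32()%uint32(len(workers)))]
}

// lightest returns the worker with the smallest accumulated load.
func lightest(load map[string]int64, workers []string) string {
	best := workers[0]
	for _, w := range workers[1:] {
		if load[w] < load[best] {
			best = w
		}
	}
	return best
}

// assignWeighted sketches the two phases: extreme partitions go round-robin,
// the rest go to their preferred worker unless that would exceed the soft cap.
func assignWeighted(workers []string, parts []part, overload, extreme float64) map[string][]part {
	out := make(map[string][]part)
	if len(workers) == 0 || len(parts) == 0 {
		return out
	}
	ws := append([]string(nil), workers...)
	sort.Strings(ws)

	var total int64
	for _, p := range parts {
		total += p.Weight
	}
	avgWeight := float64(total) / float64(len(parts))
	// Assumption: the cap is based on the average load per worker.
	softCap := float64(total) / float64(len(ws)) * overload

	load := make(map[string]int64)
	place := func(w string, p part) {
		out[w] = append(out[w], p)
		load[w] += p.Weight
	}

	// Phase a: extreme partitions are dealt round-robin.
	next := 0
	var normal []part
	for _, p := range parts {
		if float64(p.Weight) > avgWeight*extreme {
			place(ws[next%len(ws)], p)
			next++
		} else {
			normal = append(normal, p)
		}
	}
	// Phase b: normal partitions keep their preferred worker while under the cap.
	for _, p := range normal {
		w := preferred(p.ID, ws)
		if float64(load[w]+p.Weight) > softCap {
			w = lightest(load, ws)
		}
		place(w, p)
	}
	return out
}

func main() {
	parts := []part{{"hot-a", 100}, {"hot-b", 100}}
	for i := 0; i < 8; i++ {
		parts = append(parts, part{fmt.Sprintf("cold-%d", i), 1})
	}
	out := assignWeighted([]string{"worker-0", "worker-1"}, parts, 1.3, 2.0)
	for _, w := range []string{"worker-0", "worker-1"} {
		fmt.Println(w, out[w])
	}
}
```

With these inputs the two heavy partitions land on different workers, while the light ones follow their hash-preferred worker because neither worker approaches the cap.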
type WeightedConsistentHashOption ¶
type WeightedConsistentHashOption func(*WeightedConsistentHash)
WeightedConsistentHashOption configures a WeightedConsistentHash strategy.
func WithDefaultWeight ¶
func WithDefaultWeight(weight int64) WeightedConsistentHashOption
WithDefaultWeight sets the default weight applied when a partition reports zero weight.
func WithExtremeThreshold ¶
func WithExtremeThreshold(threshold float64) WeightedConsistentHashOption
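WithExtremeThreshold sets the multiplier used to classify extreme partitions: a partition is treated as extreme when its weight exceeds avgWeight * threshold (e.g., 2.0 means more than twice the average weight).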
func WithMinPartitionCount ¶
func WithMinPartitionCount(factor float64) WeightedConsistentHashOption
WithMinPartitionCount sets the minimum fraction of the average partition count that a worker must accept before load shedding occurs. The factor ranges from 0.0 to 1.0 (e.g., 0.3 means 30% of the average count).
func WithOverloadThreshold ¶
func WithOverloadThreshold(threshold float64) WeightedConsistentHashOption
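WithOverloadThreshold sets the multiplier for the soft load cap (avgWeight * overloadThreshold). For example, 1.3 allows a worker to exceed the average by up to 30% before partitions are reassigned to the lightest worker.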
func WithWeightedHashSeed ¶
func WithWeightedHashSeed(seed uint64) WeightedConsistentHashOption
WithWeightedHashSeed sets a custom hash seed for consistent hashing.
func WithWeightedLogger ¶
func WithWeightedLogger(logger types.Logger) WeightedConsistentHashOption
WithWeightedLogger sets the logger used for configuration warnings and debug diagnostics.
func WithWeightedVirtualNodes ¶
func WithWeightedVirtualNodes(nodes int) WeightedConsistentHashOption
WithWeightedVirtualNodes sets the number of virtual nodes per worker.