Documentation ¶
Overview ¶
Package parti provides a Go library for NATS-based work partitioning with stable worker IDs and leader-based coordination.
Parti enables distributed systems to dynamically assign work partitions across worker instances without requiring external coordination services. It provides stable worker identities, cache-affinity-aware rebalancing, and adaptive stabilization for different scaling scenarios.
Quick Start ¶
Basic usage with default settings:
import (
    "context"
    "log"

    "github.com/nats-io/nats.go/jetstream"

    "github.com/arloliu/parti"
    "github.com/arloliu/parti/source"
    "github.com/arloliu/parti/strategy"
)

cfg := parti.Config{
    WorkerIDPrefix: "worker",
    WorkerIDMin:    0,
    WorkerIDMax:    999,
}
partitions := []parti.Partition{{ID: "0"}, {ID: "1"}, {ID: "2"}}
src := source.NewStatic(partitions)
js, _ := jetstream.New(natsConn) // natsConn is an established *nats.Conn
assignmentStrategy := strategy.NewConsistentHash()
mgr, _ := parti.NewManager(&cfg, js, src, assignmentStrategy)
ctx := context.Background()
if err := mgr.Start(ctx); err != nil {
    log.Fatal(err)
}
defer mgr.Stop(context.Background())
Key Features ¶
- Stable Worker IDs: Workers claim stable IDs for consistent assignment during rolling updates
- Leader-Based Assignment: One worker calculates assignments without external coordination
- Adaptive Rebalancing: Different stabilization windows for cold start (30s) vs planned scale (10s)
- Cache Affinity: Preserves >80% partition locality during rebalancing
- Weighted Assignment: Supports partition weights for load balancing (see the sketch after this list)
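The weighted-assignment sketch below is illustrative only: it assumes Partition exposes a numeric Weight field, which this page does not document, so treat the field name as an assumption and check the types subpackage for the actual definition.

// Hypothetical sketch: the Weight field name is an assumption and is
// not confirmed by this page. A partition with Weight 2 counts as two
// units of load when the strategy balances assignments.
partitions := []parti.Partition{
    {ID: "0", Weight: 1},
    {ID: "1", Weight: 1},
    {ID: "2", Weight: 2},
}
src := source.NewStatic(partitions)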
Architecture ¶
Workers progress through a state machine:
INIT → CLAIMING_ID → ELECTION → WAITING_ASSIGNMENT → STABLE
The leader monitors heartbeats, detects topology changes, and publishes new assignments. All workers watch for assignment updates and trigger callbacks when their partitions change.
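For example, a worker can block until it reaches the STABLE state and then inspect its role, using the Manager methods documented below:

if err := <-mgr.WaitState(parti.StateStable, 30*time.Second); err != nil {
    log.Printf("worker %s did not stabilize: %v", mgr.WorkerID(), err)
}
log.Printf("state=%v leader=%v", mgr.State(), mgr.IsLeader())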
Advanced Usage ¶
Custom strategy with options:
import (
    "context"

    "github.com/nats-io/nats.go/jetstream"

    "github.com/arloliu/parti"
    "github.com/arloliu/parti/source"
    "github.com/arloliu/parti/strategy"
)
assignmentStrategy := strategy.NewConsistentHash(
strategy.WithVirtualNodes(300),
)
hooks := &parti.Hooks{
OnAssignmentChanged: func(ctx context.Context, oldPartitions, newPartitions []parti.Partition) error {
// Handle full assignment change; derive added/removed by diffing old vs new if needed.
return nil
},
}
partitions := []parti.Partition{{ID: "0"}, {ID: "1"}, {ID: "2"}}
src := source.NewStatic(partitions)
js, _ := jetstream.New(natsConn)
mgr, _ := parti.NewManager(&cfg, js, src, assignmentStrategy,
parti.WithHooks(hooks),
)
See the examples/ directory for complete working examples.
Index ¶
- Constants
- Variables
- func SetDefaults(cfg *Config)
- type AlertLevel
- type Assignment
- type AssignmentStrategy
- type CompositeConsumerUpdater
- type Config
- type DegradedAlertConfig
- type DegradedBehaviorConfig
- type ElectionAgent
- type HandoffClaim
- type HandoffClaimState
- type HandoffConfig
- type Hooks
- type KVBucketConfig
- type Logger
- type Manager
- func (m *Manager) CurrentAssignment() Assignment
- func (m *Manager) InspectHandoffClaims(ctx context.Context) ([]HandoffClaim, error)
- func (m *Manager) IsInRecoveryGrace() bool
- func (m *Manager) IsLeader() bool
- func (m *Manager) RefreshPartitions(ctx context.Context) error
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) State() State
- func (m *Manager) Stop(ctx context.Context) error
- func (m *Manager) WaitState(expectedState State, timeout time.Duration) <-chan error
- func (m *Manager) WorkerID() string
- type MetricsCollector
- type Option
- func WithElectionAgent(agent ElectionAgent) Option
- func WithHandoffMetricsRecorder(mr handoff.MetricsRecorder) Option
- func WithHooks(hooks *Hooks) Option
- func WithLogger(logger Logger) Option
- func WithMetrics(metrics MetricsCollector) Option
- func WithWorkerConsumerUpdater(updater WorkerConsumerUpdater) Option
- type Partition
- type PartitionSource
- type PartitionSourceUpdater
- type PartitionUpdater
- type State
- type WatchablePartitionSource
- type WorkerConsumerUpdater
Constants ¶
const (
    StateInit              = types.StateInit
    StateClaimingID        = types.StateClaimingID
    StateElection          = types.StateElection
    StateWaitingAssignment = types.StateWaitingAssignment
    StateStable            = types.StateStable
    StateScaling           = types.StateScaling
    StateRebalancing       = types.StateRebalancing
    StateEmergency         = types.StateEmergency
    StateDegraded          = types.StateDegraded
    StateShutdown          = types.StateShutdown
)
Re-export State constants from the internal types package.
const (
    HandoffStateUnknown = types.HandoffStateUnknown
    HandoffStateStable  = types.HandoffStateStable
    HandoffStatePrepare = types.HandoffStatePrepare
    HandoffStateCommit  = types.HandoffStateCommit
)
Re-export HandoffState constants.
Variables ¶
var (
    ErrInvalidConfig              = types.ErrInvalidConfig
    ErrNATSConnectionRequired     = types.ErrNATSConnectionRequired
    ErrPartitionSourceRequired    = types.ErrPartitionSourceRequired
    ErrAssignmentStrategyRequired = types.ErrAssignmentStrategyRequired
    ErrAlreadyStarted             = types.ErrAlreadyStarted
    ErrNotStarted                 = types.ErrNotStarted
    ErrNotImplemented             = types.ErrNotImplemented
    ErrNoWorkersAvailable         = types.ErrNoWorkersAvailable
    ErrInvalidWorkerID            = types.ErrInvalidWorkerID
    ErrElectionFailed             = types.ErrElectionFailed
    ErrConnectivity               = types.ErrConnectivity
    ErrDegraded                   = types.ErrDegraded
    ErrIDClaimFailed              = types.ErrIDClaimFailed
    ErrAssignmentFailed           = types.ErrAssignmentFailed
)
Re-export sentinel errors from the internal types package.
These are stable, comparable values intended for use with errors.Is(). Keeping them in the root package provides a convenient public API while allowing internal packages to depend on the `types` subpackage without importing the root package.
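For example, callers can branch on specific failures with errors.Is:

if err := mgr.Start(ctx); err != nil {
    switch {
    case errors.Is(err, parti.ErrElectionFailed):
        log.Println("leader election failed; will retry startup")
    case errors.Is(err, parti.ErrIDClaimFailed):
        log.Println("no stable worker ID could be claimed")
    default:
        log.Printf("startup failed: %v", err)
    }
    _ = mgr.Stop(context.Background()) // Start requires Stop on error
}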
Functions ¶
func SetDefaults ¶
func SetDefaults(cfg *Config)
SetDefaults applies default values to zero-valued configuration fields. If a field is zero-valued, it will be set to the corresponding default value.
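Example (only zero-valued fields are filled in):

cfg := parti.Config{WorkerIDPrefix: "ingest"}
parti.SetDefaults(&cfg)
// Zero-valued fields now hold their defaults, e.g. WorkerIDMax=999 and
// HeartbeatInterval=2s; WorkerIDPrefix stays "ingest".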
Types ¶
type AlertLevel ¶
type AlertLevel int
AlertLevel represents the severity level of degraded mode alerts.
const (
    // AlertLevelInfo indicates informational alerts (least severe).
    AlertLevelInfo AlertLevel = iota
    // AlertLevelWarn indicates warning alerts.
    AlertLevelWarn
    // AlertLevelError indicates error alerts.
    AlertLevelError
    // AlertLevelCritical indicates critical alerts (most severe).
    AlertLevelCritical
)
func (AlertLevel) String ¶
func (l AlertLevel) String() string
String returns the string representation of the alert level.
Returns:
- string: Alert level name ("Info", "Warn", "Error", "Critical", or "Unknown")
type Assignment ¶
type Assignment = types.Assignment
Re-export types from the internal types package.
This file provides a stable, backward-compatible public API for the library's core types and interfaces. It uses type aliases to re-export definitions from the `types` subpackage, which contains the actual implementations.
This pattern solves the "import cycle" problem by allowing internal packages to depend on `types` without depending on the root `parti` package, while still providing a convenient `parti.State`, `parti.Logger`, etc. for users.
type AssignmentStrategy ¶
type AssignmentStrategy = types.AssignmentStrategy
Re-export interfaces from the internal types package for convenience.
type CompositeConsumerUpdater ¶ added in v1.6.0
type CompositeConsumerUpdater struct {
// contains filtered or unexported fields
}
CompositeConsumerUpdater combines multiple WorkerConsumerUpdater instances. When UpdateWorkerConsumer is called, it fans out to all registered updaters.
Use cases:
- Register both WorkerConsumer (per-partition) and BroadcastConsumer (fan-out)
- Multiple BroadcastConsumer instances for different stream/subject patterns
Error handling:
- Calls all updaters even if some fail
- Returns a combined error with all failures
func NewCompositeConsumerUpdater ¶ added in v1.6.0
func NewCompositeConsumerUpdater(updaters ...WorkerConsumerUpdater) *CompositeConsumerUpdater
NewCompositeConsumerUpdater creates a composite from multiple updaters. All provided updaters will receive partition updates when UpdateWorkerConsumer is called.
Example:
wc, _ := subscription.NewWorkerConsumer(js, wcConfig, handler1)
bc, _ := subscription.NewBroadcastConsumer(js, bcConfig, handler2)
composite := parti.NewCompositeConsumerUpdater(wc, bc)
mgr, _ := parti.NewManager(cfg, js, src, strategy,
parti.WithWorkerConsumerUpdater(composite),
)
func (*CompositeConsumerUpdater) Add ¶ added in v1.6.0
func (c *CompositeConsumerUpdater) Add(updaters ...WorkerConsumerUpdater)
Add appends additional updaters to the composite. This is useful for dynamically registering consumers after creation.
func (*CompositeConsumerUpdater) Len ¶ added in v1.6.0
func (c *CompositeConsumerUpdater) Len() int
Len returns the number of registered updaters.
func (*CompositeConsumerUpdater) UpdateWorkerConsumer ¶ added in v1.6.0
func (c *CompositeConsumerUpdater) UpdateWorkerConsumer(ctx context.Context, workerID string, partitions []Partition) error
UpdateWorkerConsumer implements WorkerConsumerUpdater. Calls UpdateWorkerConsumer on all registered updaters, collecting any errors. All updaters are called even if some fail.
type Config ¶
type Config struct {
// WorkerIDPrefix is the prefix for worker IDs (e.g., "worker" produces "worker-0", "worker-1").
WorkerIDPrefix string `yaml:"workerIdPrefix" default:"worker" validate:"required"`
// WorkerIDMin is the minimum stable ID number (inclusive).
// Set to 0 for most use cases.
WorkerIDMin int `yaml:"workerIdMin" default:"0" validate:"gte=0"`
// WorkerIDMax is the maximum stable ID number (inclusive).
// Determines the maximum number of concurrent workers: (WorkerIDMax - WorkerIDMin + 1).
// For example, WorkerIDMin=0 and WorkerIDMax=999 allows up to 1000 workers.
WorkerIDMax int `yaml:"workerIdMax" default:"999" validate:"gtefield=WorkerIDMin"`
// WorkerIDTTL is how long a worker ID claim remains valid in the key-value store.
// Must be greater than HeartbeatInterval to prevent premature expiration.
// Recommended: 3-5x HeartbeatInterval.
WorkerIDTTL time.Duration `yaml:"workerIdTtl" default:"30s" validate:"gt=0,gtefield=HeartbeatTTL"`
// HeartbeatInterval is how often workers publish heartbeat messages.
// Shorter intervals provide faster failure detection but increase network traffic.
// Recommended: 2-5 seconds.
HeartbeatInterval time.Duration `yaml:"heartbeatInterval" default:"2s" validate:"gt=0"`
// HeartbeatTTL is how long heartbeat messages remain valid before a worker is considered failed.
// Must be greater than HeartbeatInterval.
// Recommended: 3x HeartbeatInterval.
HeartbeatTTL time.Duration `yaml:"heartbeatTtl" default:"6s" validate:"gt=0"`
// ColdStartWindow is the stabilization period when starting workers from zero.
// During this window, partition assignment is delayed to allow all initial workers to join.
// Recommended: 30 seconds.
ColdStartWindow time.Duration `yaml:"coldStartWindow" default:"30s" validate:"gt=0,gtefield=PlannedScaleWindow"`
// PlannedScaleWindow is the stabilization period during rolling updates or planned scaling.
// Shorter than ColdStartWindow to minimize disruption during controlled changes.
// Recommended: 10 seconds.
PlannedScaleWindow time.Duration `yaml:"plannedScaleWindow" default:"10s" validate:"gt=0"`
// EmergencyGracePeriod is the minimum time a worker must be missing before
// triggering emergency rebalance. Prevents false positives from transient
// network issues or brief connectivity loss.
//
// Default: 0 (auto-calculated as 1.5 * HeartbeatInterval)
// Recommended: 1.5-2.0 * HeartbeatInterval
// Constraint: Must be <= HeartbeatTTL
EmergencyGracePeriod time.Duration `yaml:"emergencyGracePeriod" validate:"ltefield=HeartbeatTTL"`
// RestartDetectionRatio determines when a restart is classified as cold start vs planned.
// If (failed workers / total workers) > ratio, it's treated as a cold start.
// For example, 0.5 means if >50% of workers fail simultaneously, use ColdStartWindow.
// Recommended: 0.5.
RestartDetectionRatio float64 `yaml:"restartDetectionRatio" default:"0.5" validate:"gte=0,lte=1"`
// OperationTimeout is the timeout for KV operations (get, put, delete).
// Recommended: 10 seconds.
OperationTimeout time.Duration `yaml:"operationTimeout" default:"10s" validate:"gt=0"`
// ElectionTimeout is the maximum time to wait for leader election to complete.
// Recommended: 5 seconds.
ElectionTimeout time.Duration `yaml:"electionTimeout" default:"5s" validate:"gt=0"`
// StartupTimeout is the maximum time to wait for the manager to fully start.
// Includes worker ID claiming, leader election, and initial partition assignment.
// Recommended: 30 seconds.
StartupTimeout time.Duration `yaml:"startupTimeout" default:"30s" validate:"gt=0"`
// ShutdownTimeout is the maximum time to wait for graceful shutdown.
// Includes releasing worker ID, stopping heartbeats, and cleanup operations.
// Recommended: 10 seconds.
ShutdownTimeout time.Duration `yaml:"shutdownTimeout" default:"10s" validate:"gt=0"`
// RebalanceCooldown is the minimum time between rebalancing operations.
//
// Enforces rate limiting BEFORE stabilization windows to prevent thrashing
// during rapid topology changes. If a rebalance was completed <RebalanceCooldown
// ago, new topology changes are deferred until the interval expires.
//
// Default: 10 seconds
// Recommendation: Should be <= PlannedScaleWindow for proper coordination
//
// Note: This was renamed from MinRebalanceInterval in v0.x for semantic clarity.
RebalanceCooldown time.Duration `yaml:"rebalanceCooldown" default:"10s" validate:"gt=0,ltefield=PlannedScaleWindow"`
// KVBuckets controls NATS JetStream KV bucket configuration.
KVBuckets KVBucketConfig `yaml:"kvBuckets"`
// DegradedAlert controls alert emission during degraded mode operation.
DegradedAlert DegradedAlertConfig `yaml:"degradedAlert"`
// DegradedBehavior controls when the manager enters and exits degraded mode.
DegradedBehavior DegradedBehaviorConfig `yaml:"degradedBehavior"`
// EnableTwoPhaseHandoff gates the manager-side two-phase handoff coordinator.
//
// When false (default), assignment changes are applied directly via the
// WorkerConsumerUpdater in a simple "remove/add" sequence managed by the
// manager without KV claims. This keeps the control plane minimal but can
// permit a brief duplicate-consumption window during reassignment.
//
// When true, the manager initializes a modular handoff coordinator which
// can implement a prepare/commit protocol to make ownership transitions
// atomic, single-owner, auditable, and crash-resumable. This feature is
// wired behind a clean interface and can be enabled/disabled without
// scattering conditional logic throughout the manager code path.
EnableTwoPhaseHandoff bool `yaml:"enableTwoPhaseHandoff" default:"false"`
// Handoff controls two-phase handoff tuning knobs.
// Only applied when EnableTwoPhaseHandoff is true.
Handoff HandoffConfig `yaml:"handoff"`
}
Config is the configuration for the Manager.
All duration fields accept standard Go duration strings like "30s", "5m", "1h".
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults.
Returns:
- Config: Configuration with default values
func TestConfig ¶
func TestConfig() Config
TestConfig returns a configuration optimized for fast test execution.
Test timings are 10-100x faster than production defaults to enable rapid iteration without sacrificing test coverage. Use DefaultConfig() for production deployments.
Returns:
- Config: Configuration with fast timings for tests
Example:
cfg := parti.TestConfig()
cfg.WorkerIDPrefix = "test-worker"
manager, err := parti.NewManager(&cfg, js, src, assignmentStrategy)
func (*Config) Validate ¶
func (c *Config) Validate() error
Validate checks configuration constraints and returns an error for invalid values.
Hard Validation Rules:
- HeartbeatTTL >= 2 * HeartbeatInterval (allow 1 missed heartbeat)
- WorkerIDTTL >= 3 * HeartbeatInterval (stable ID renewal)
- WorkerIDTTL >= HeartbeatTTL (ID must outlive heartbeat)
- RebalanceCooldown > 0 (prevent thrashing)
- ColdStartWindow >= PlannedScaleWindow (cold start is slower)
- RebalanceCooldown <= PlannedScaleWindow (rate limit coordination)
- RebalanceCooldown <= ColdStartWindow (rate limit coordination)
- EmergencyGracePeriod <= HeartbeatTTL (detection window)
Returns:
- error: Validation error with clear explanation, nil if valid
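For example, a configuration that violates the heartbeat rule fails validation:

cfg := parti.DefaultConfig()
cfg.HeartbeatTTL = cfg.HeartbeatInterval // violates HeartbeatTTL >= 2*HeartbeatInterval
if err := cfg.Validate(); err != nil {
    log.Printf("invalid config: %v", err)
}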
func (*Config) ValidateWithWarnings ¶
ValidateWithWarnings checks configuration and logs warnings for non-recommended values.
This is called after Validate() in NewManager() to provide operator guidance.
Parameters:
- logger: Logger instance for warning output
type DegradedAlertConfig ¶
type DegradedAlertConfig struct {
// InfoThreshold is the duration in degraded mode before emitting Info-level alerts.
// Default: 30 seconds.
InfoThreshold time.Duration `yaml:"infoThreshold" default:"30s" validate:"gt=0"`
// WarnThreshold is the duration in degraded mode before emitting Warn-level alerts.
// Default: 2 minutes.
WarnThreshold time.Duration `yaml:"warnThreshold" default:"2m" validate:"gt=0,gtefield=InfoThreshold"`
// ErrorThreshold is the duration in degraded mode before emitting Error-level alerts.
// Default: 5 minutes.
ErrorThreshold time.Duration `yaml:"errorThreshold" default:"5m" validate:"gt=0,gtefield=WarnThreshold"`
// CriticalThreshold is the duration in degraded mode before emitting Critical-level alerts.
// Default: 10 minutes.
CriticalThreshold time.Duration `yaml:"criticalThreshold" default:"10m" validate:"gt=0,gtefield=ErrorThreshold"`
// AlertInterval is the time between repeated alerts at the same severity level.
// Default: 1 minute.
AlertInterval time.Duration `yaml:"alertInterval" default:"1m" validate:"gt=0"`
}
DegradedAlertConfig controls alert emission during degraded mode operation.
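For example, to escalate more slowly than the defaults while keeping one-minute repeats (values are illustrative):

cfg.DegradedAlert = parti.DegradedAlertConfig{
    InfoThreshold:     1 * time.Minute,
    WarnThreshold:     5 * time.Minute,
    ErrorThreshold:    10 * time.Minute,
    CriticalThreshold: 20 * time.Minute,
    AlertInterval:     1 * time.Minute,
}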
type DegradedBehaviorConfig ¶
type DegradedBehaviorConfig struct {
// EnterThreshold is how long NATS connectivity errors must persist before entering degraded mode.
// Provides hysteresis to prevent flapping during transient issues.
// Default: 10 seconds.
EnterThreshold time.Duration `yaml:"enterThreshold" default:"10s" validate:"gte=0"`
// ExitThreshold is how long NATS connectivity must be stable before exiting degraded mode.
// Should be shorter than EnterThreshold to recover quickly.
// Default: 5 seconds.
ExitThreshold time.Duration `yaml:"exitThreshold" default:"5s" validate:"gte=0"`
// KVErrorThreshold is the number of consecutive KV operation errors that trigger degraded mode.
// Default: 5 errors.
KVErrorThreshold int `yaml:"kvErrorThreshold" default:"5" validate:"gte=0"`
// KVErrorWindow is the time window for counting consecutive KV errors.
// Errors outside this window are not counted.
// Default: 30 seconds.
KVErrorWindow time.Duration `yaml:"kvErrorWindow" default:"30s" validate:"gte=0"`
// RecoveryGracePeriod is the minimum time the leader must wait after recovering from
// degraded mode before declaring missing workers as failed (emergency rebalance).
// Prevents false emergencies when workers recover slightly slower than the leader.
// Default: 15 seconds.
RecoveryGracePeriod time.Duration `yaml:"recoveryGracePeriod" default:"15s" validate:"gte=0"`
}
DegradedBehaviorConfig controls when the manager enters and exits degraded mode.
func DegradedBehaviorPreset ¶
func DegradedBehaviorPreset(preset string) (DegradedBehaviorConfig, error)
DegradedBehaviorPreset returns a preconfigured DegradedBehaviorConfig based on the preset name.
Supported presets:
- "conservative": Slower to enter degraded, safer for production (30s enter, 10s exit, 10 errors)
- "balanced": Default behavior, good for most use cases (10s enter, 5s exit, 5 errors)
- "aggressive": Faster to enter degraded, better for development (5s enter, 3s exit, 3 errors)
Parameters:
- preset: Preset name ("conservative", "balanced", or "aggressive")
Returns:
- DegradedBehaviorConfig: Preconfigured behavior settings
- error: ErrInvalidPreset if preset name is not recognized
Example:
cfg, err := DegradedBehaviorPreset("conservative")
if err != nil {
log.Fatal(err)
}
type ElectionAgent ¶
type ElectionAgent = types.ElectionAgent
Re-export interfaces from the internal types package for convenience.
type HandoffClaim ¶
type HandoffClaim struct {
PartitionID string `json:"partition_id"`
Owner string `json:"owner"`
PendingOwner string `json:"pending_owner,omitempty"`
State HandoffClaimState `json:"state"`
Epoch int64 `json:"epoch"`
LastUpdated time.Time `json:"last_updated"`
TTLSeconds int64 `json:"ttl_seconds"`
ConflictCount int64 `json:"conflict_count,omitempty"`
}
HandoffClaim is a read-only view of a partition handoff claim stored in the two-phase handoff KV bucket. It is intentionally decoupled from internal implementation details and suitable for diagnostics and tests.
func InspectHandoffClaims ¶
func InspectHandoffClaims(ctx context.Context, js jetstream.JetStream, bucket string) ([]HandoffClaim, error)
InspectHandoffClaims opens the provided JetStream KV bucket and returns all current handoff claims stored under the "claims/" prefix.
This helper is public to allow tests to inspect claims without a Manager instance, given a JetStream context and bucket name.
Parameters:
- ctx: Context for cancellation
- js: JetStream context used to open the bucket
- bucket: KV bucket name where handoff claims are stored
Returns:
- []HandoffClaim: Decoded claims
- error: Error opening the bucket or reading/decoding entries
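Example (using the default bucket name from KVBucketConfig):

claims, err := parti.InspectHandoffClaims(ctx, js, "parti-handoff")
if err != nil {
    log.Fatalf("inspect claims: %v", err)
}
for _, c := range claims {
    log.Printf("partition=%s owner=%s pending=%s state=%s epoch=%d",
        c.PartitionID, c.Owner, c.PendingOwner, c.State, c.Epoch)
}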
type HandoffClaimState ¶
type HandoffClaimState string
HandoffClaimState represents the logical state of a handoff claim.
Values:
- "stable": No handoff in progress
- "prepare": Target worker declared intent to take over
- "commit": Ownership switch in-progress, awaiting final stabilization
- "abort": Handoff aborted (future use)
- "unknown": Parsing fallback
const (
    HandoffClaimStable  HandoffClaimState = "stable"
    HandoffClaimPrepare HandoffClaimState = "prepare"
    HandoffClaimCommit  HandoffClaimState = "commit"
    HandoffClaimAbort   HandoffClaimState = "abort"
    HandoffClaimUnknown HandoffClaimState = "unknown"
)
type HandoffConfig ¶
type HandoffConfig struct {
// SweepInterval controls how often stale/expired claims are opportunistically
// swept during Apply calls. If zero or negative, a sweep is attempted on every Apply.
SweepInterval time.Duration `yaml:"sweepInterval" default:"30s" validate:"gte=0"`
// MaxRetries controls bounded CAS retries for claim updates. Zero uses default (3).
MaxRetries int `yaml:"maxRetries" default:"3" validate:"gte=0"`
// BaseBackoff is the initial backoff for CAS retry with exponential backoff.
BaseBackoff time.Duration `yaml:"baseBackoff" default:"50ms" validate:"gte=0"`
// MaxBackoff caps the exponential backoff duration.
MaxBackoff time.Duration `yaml:"maxBackoff" default:"500ms" validate:"gte=0,gtefield=BaseBackoff"`
// Jitter is a fractional value [0.0, 1.0] to randomize backoff durations.
Jitter float64 `yaml:"jitter" default:"0.2" validate:"gte=0,lte=1"`
// DelayAfterPrepare introduces an artificial delay after the prepare phase completes
// and before the consumer updater is invoked. Useful for making intermediate states
// observable in tests and demonstrations. Ignored if <= 0.
DelayAfterPrepare time.Duration `yaml:"delayAfterPrepare" default:"0" validate:"gte=0"`
// DelayBeforeStable introduces an artificial delay after entering the commit state
// and before finalizing to stable. Useful for external observation of the commit state.
// Ignored if <= 0.
DelayBeforeStable time.Duration `yaml:"delayBeforeStable" default:"0" validate:"gte=0"`
}
HandoffConfig controls two-phase handoff coordinator behavior.
These settings are only used when EnableTwoPhaseHandoff is true.
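A sketch enabling two-phase handoff with mild retry tuning (the values are illustrative, within the documented ranges):

cfg.EnableTwoPhaseHandoff = true
cfg.Handoff = parti.HandoffConfig{
    SweepInterval: 30 * time.Second,
    MaxRetries:    5,
    BaseBackoff:   50 * time.Millisecond,
    MaxBackoff:    500 * time.Millisecond,
    Jitter:        0.2,
}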
type KVBucketConfig ¶
type KVBucketConfig struct {
// StableIDBucket is the bucket name for stable worker ID claims.
StableIDBucket string `yaml:"stableIdBucket" default:"parti-stableid" validate:"required"`
// ElectionBucket is the bucket name for leader election.
ElectionBucket string `yaml:"electionBucket" default:"parti-election" validate:"required"`
// HeartbeatBucket is the bucket name for worker heartbeats.
HeartbeatBucket string `yaml:"heartbeatBucket" default:"parti-heartbeat" validate:"required"`
// AssignmentBucket is the bucket name for partition assignments.
AssignmentBucket string `yaml:"assignmentBucket" default:"parti-assignment" validate:"required"`
// AssignmentTTL is how long assignments remain in KV (0 = no expiration).
// Assignments should persist across leader changes for version continuity.
// Recommended: 0 (no TTL) or very long (e.g., 1 hour).
AssignmentTTL time.Duration `yaml:"assignmentTtl" default:"0" validate:"gte=0"`
// HandoffBucket is the bucket name for two-phase handoff ownership claims.
// Used only when EnableTwoPhaseHandoff is true. Stores per-partition claim
// records that track prepare/commit/stable transitions to ensure atomic
// ownership changes and crash-resumable state. Kept separate from assignment
// data to allow independent TTL and operational policies (sweeps, compaction).
// Recommended: distinct bucket to isolate churn from stable assignment data.
HandoffBucket string `yaml:"handoffBucket" default:"parti-handoff"`
// HandoffTTL is how long a handoff claim remains valid in KV after last
// update. A short TTL bounds stale claim accumulation if a leader crashes
// mid-handoff; surviving leaders can safely recreate missing claims. Must be
// longer than expected multi-phase handoff duration (including retries) but
// significantly shorter than AssignmentTTL to permit natural cleanup.
// Recommended: 2-5 minutes in production; fast tests may use seconds.
HandoffTTL time.Duration `yaml:"handoffTtl" default:"2m"`
}
KVBucketConfig configures NATS JetStream KV bucket names and TTLs.
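For example, namespacing buckets per application avoids collisions on a shared NATS cluster (names are illustrative):

cfg.KVBuckets = parti.KVBucketConfig{
    StableIDBucket:   "myapp-stableid",
    ElectionBucket:   "myapp-election",
    HeartbeatBucket:  "myapp-heartbeat",
    AssignmentBucket: "myapp-assignment",
    HandoffBucket:    "myapp-handoff",
    HandoffTTL:       2 * time.Minute,
}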
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates workers in a distributed system for partition-based work distribution.
Manager is the main entry point of the Parti library. It handles:
- Stable worker ID claiming using NATS KV
- Leader election for assignment coordination
- Partition assignment calculation and distribution
- Heartbeat publishing and failure detection
- Graceful rebalancing during scaling events
Thread Safety:
- All public methods are safe for concurrent use
- State transitions are atomic and linearizable
- Assignment updates are copy-on-write
Lifecycle:
- Create with NewManager()
- Call Start() to claim ID and begin coordination
- Use hooks to react to assignment changes
- Call Stop() for graceful shutdown
Testing: Consumers can define minimal interfaces for mocking:
type WorkCoordinator interface {
Start(ctx context.Context) error
WorkerID() string
}
func NewManager ¶
func NewManager(cfg *Config, js jetstream.JetStream, source PartitionSource, strategy AssignmentStrategy, opts ...Option) (*Manager, error)
NewManager creates a new Manager instance with the provided configuration.
The Manager coordinates workers in a distributed system using NATS for:
- Stable worker ID claiming (via NATS KV)
- Leader election for assignment coordination
- Partition assignment distribution
- Heartbeat publication for health monitoring
Returns a concrete *Manager struct following the "accept interfaces, return structs" principle. Consumers can define their own interfaces for testing if needed.
Internal components (calculator, heartbeat, election, claimer) are initialized with NoOp implementations, ensuring they are never nil.
Parameters:
- cfg: Configuration for the manager
- js: JetStream context for NATS interaction
- source: Source of partitions to distribute
- strategy: Strategy for assigning partitions to workers
- opts: Optional configuration options
Returns:
- *Manager: Initialized manager instance
- error: Validation error if configuration is invalid
Example:
cfg := parti.Config{WorkerIDPrefix: "worker", WorkerIDMax: 999}
src := source.NewStatic(partitions)
curStrategy := strategy.NewConsistentHash()
js, _ := jetstream.New(natsConn)
mgr, err := parti.NewManager(&cfg, js, src, curStrategy)
if err != nil {
log.Fatal(err)
}
func (*Manager) CurrentAssignment ¶
func (m *Manager) CurrentAssignment() Assignment
CurrentAssignment returns the current partition assignment for this worker.
Returns:
- Assignment: The current assignment. Returns empty assignment if none received.
func (*Manager) InspectHandoffClaims ¶
func (m *Manager) InspectHandoffClaims(ctx context.Context) ([]HandoffClaim, error)
InspectHandoffClaims returns all current handoff claims from the configured handoff KV bucket for this Manager instance.
The method is best-effort and intended for integration tests and operational diagnostics. It requires two-phase handoff to be enabled and the handoff bucket to exist.
Parameters:
- ctx: Context for cancellation
Returns:
- []HandoffClaim: Decoded claim entries under the claims/ prefix
- error: Any failure opening the bucket or listing/decoding entries
func (*Manager) IsInRecoveryGrace ¶
func (m *Manager) IsInRecoveryGrace() bool
IsInRecoveryGrace returns true if the manager is currently in the recovery grace period.
This is part of the StateProvider interface and allows components like Calculator to check recovery grace status without circular dependencies.
Returns:
- bool: true if in recovery grace period
func (*Manager) IsLeader ¶
func (m *Manager) IsLeader() bool
IsLeader returns whether this manager is currently the leader.
Returns:
- bool: true if this manager is the leader, false otherwise.
func (*Manager) RefreshPartitions ¶
func (m *Manager) RefreshPartitions(ctx context.Context) error
RefreshPartitions triggers a partition discovery refresh.
This method forces the partition source to be re-queried and, if the worker is the leader, triggers an immediate rebalance with the updated partition list. Non-leader workers will receive the updated assignments automatically.
Use this when:
- Partitions are added/removed dynamically (e.g., Kafka topics, Redis shards)
- You want to redistribute work after manual partition changes
- Your partition source has changed but workers haven't detected it yet
Parameters:
- ctx: Context for operation timeout
Returns:
- error: Refresh error, or ErrNotStarted if manager isn't running
Example:
// After adding new partitions to your partition source
if err := manager.RefreshPartitions(ctx); err != nil {
log.Printf("Failed to refresh partitions: %v", err)
}
func (*Manager) Start ¶
func (m *Manager) Start(ctx context.Context) error
Start initializes and runs the manager.
Blocks until worker ID is claimed and the initial assignment is received. The manager lifecycle runs independently from the startup context - ctx is only used to control the startup timeout, not the manager's operational lifetime.
If a WorkerConsumerUpdater was provided via WithWorkerConsumerUpdater, the initial assignment is applied (best-effort, asynchronously) to the worker's durable JetStream consumer immediately after it is fetched. Subsequent assignment changes will also trigger UpdateWorkerConsumer before Hooks.OnAssignmentChanged is invoked, enabling hot-reload of FilterSubjects without restarting pull loops.
IMPORTANT: On error, caller MUST call Stop(ctx) to clean up resources:
- Stops ID renewal goroutine
- Releases claimed stable worker ID
- Cancels background operations
- Prevents goroutine and resource leaks
Parameters:
- ctx: Context for startup timeout control (not manager lifetime)
Returns:
- error: Startup error or context cancellation
Example usage:
mgr, _ := parti.NewManager(cfg, js, source, strategy)
if err := mgr.Start(ctx); err != nil {
// Cleanup on startup failure
_ = mgr.Stop(context.Background())
return err
}
func (*Manager) State ¶
func (m *Manager) State() State
State returns the current state of the manager.
Returns:
- State: The current state (e.g., StateInit, StateStable).
func (*Manager) Stop ¶
func (m *Manager) Stop(ctx context.Context) error
Stop gracefully shuts down the manager.
Safe to call multiple times - subsequent calls will return ErrNotStarted.
Parameters:
- ctx: Context for shutdown timeout
Returns:
- error: Shutdown error or timeout
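Example (bounding shutdown independently of any cancelled request context):

shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := mgr.Stop(shutdownCtx); err != nil {
    log.Printf("shutdown error: %v", err)
}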
func (*Manager) WaitState ¶
func (m *Manager) WaitState(expectedState State, timeout time.Duration) <-chan error
WaitState waits for the manager to reach the expected state within the timeout period.
This method is useful for testing and synchronization scenarios where you need to wait for the manager to reach a specific state before proceeding.
The method returns a read-only channel that will receive exactly one value:
- nil if the expected state is reached within the timeout
- context.DeadlineExceeded if the timeout expires before reaching the state
The channel is closed after sending the result, allowing safe use in select statements.
Parameters:
- expectedState: The state to wait for
- timeout: Maximum duration to wait for the state
Returns:
- <-chan error: A channel that receives the result (nil on success, error on timeout)
Example:
// Wait for manager to reach Stable state
errCh := manager.WaitState(StateStable, 10*time.Second)
if err := <-errCh; err != nil {
log.Printf("Failed to reach Stable state: %v", err)
}
// Using with select for multiple operations
select {
case err := <-manager.WaitState(StateStable, 5*time.Second):
if err != nil {
return fmt.Errorf("timeout waiting for stable state: %w", err)
}
case <-ctx.Done():
return ctx.Err()
}
// Waiting for multiple managers
for i, mgr := range managers {
if err := <-mgr.WaitState(StateStable, 10*time.Second); err != nil {
return fmt.Errorf("manager %d failed: %w", i, err)
}
}
type MetricsCollector ¶
type MetricsCollector = types.MetricsCollector
Re-export interfaces from the internal types package for convenience.
type Option ¶
type Option func(*managerOptions)
Option configures a Manager with optional dependencies.
func WithElectionAgent ¶
func WithElectionAgent(agent ElectionAgent) Option
WithElectionAgent sets a custom election agent.
Parameters:
- agent: ElectionAgent implementation
Returns:
- Option: Functional option for NewManager
Example:
agent := myElectionAgent
js, _ := jetstream.New(conn)
mgr, err := parti.NewManager(&cfg, js, src, strategy.NewConsistentHash(), parti.WithElectionAgent(agent))
if err != nil { /* handle */ }
func WithHandoffMetricsRecorder ¶
func WithHandoffMetricsRecorder(mr handoff.MetricsRecorder) Option
WithHandoffMetricsRecorder sets a specialized metrics recorder for the internal two-phase handoff coordinator.
Intended primarily for tests to assert CAS conflicts, phase timings, and sweeper behavior. In production, leave unset to use the default no-op or a future global wiring.
Parameters:
- mr: handoff.MetricsRecorder implementation
Returns:
- Option: Functional option for NewManager
func WithHooks ¶
func WithHooks(hooks *Hooks) Option
WithHooks sets lifecycle event hooks.
Parameters:
- hooks: Hooks structure with callback functions
Returns:
- Option: Functional option for NewManager
Example:
hooks := &parti.Hooks{
OnAssignmentChanged: func(ctx context.Context, old, new []parti.Partition) error {
// derive added/removed by diffing old vs new if needed
return nil
},
}
js, _ := jetstream.New(conn)
mgr, err := parti.NewManager(&cfg, js, src, strategy.NewConsistentHash(), parti.WithHooks(hooks))
if err != nil { /* handle */ }
func WithLogger ¶
func WithLogger(logger Logger) Option
WithLogger sets a logger.
Parameters:
- logger: Logger implementation (compatible with zap.SugaredLogger)
Returns:
- Option: Functional option for NewManager
Example:
logger := zap.NewExample().Sugar()
js, _ := jetstream.New(conn)
mgr, err := parti.NewManager(&cfg, js, src, strategy.NewConsistentHash(), parti.WithLogger(logger))
if err != nil { /* handle */ }
func WithMetrics ¶
func WithMetrics(metrics MetricsCollector) Option
WithMetrics sets a metrics collector.
Parameters:
- metrics: MetricsCollector implementation
Returns:
- Option: Functional option for NewManager
Example:
metrics := myPrometheusCollector
js, _ := jetstream.New(conn)
mgr, err := parti.NewManager(&cfg, js, src, strategy.NewConsistentHash(), parti.WithMetrics(metrics))
if err != nil { /* handle */ }
func WithWorkerConsumerUpdater ¶
func WithWorkerConsumerUpdater(updater WorkerConsumerUpdater) Option
WithWorkerConsumerUpdater injects a WorkerConsumerUpdater used by Manager to apply the worker's current assignment to a single durable JetStream consumer.
Invocation Points:
- Immediately after initial assignment (async, best-effort)
- After each subsequent assignment change
This option enables fully manager-driven consumer reconciliation; hooks.OnAssignmentChanged can then be reserved for metrics or side effects instead of subscription wiring.
Parameters:
- updater: Implementation that maps assignments to consumer FilterSubjects
Returns:
- Option: Functional option for NewManager
Example:
js, _ := jetstream.New(nc)
helper, _ := subscription.NewWorkerConsumer(js, subscription.WorkerConsumerConfig{ /* ... */ }, handler)
mgr, err := parti.NewManager(cfg, js, src, strategy, parti.WithWorkerConsumerUpdater(helper))
if err != nil { /* handle */ }
type Partition ¶
type Partition = types.Partition
Re-export types from the internal types package.
type PartitionSource ¶
type PartitionSource = types.PartitionSource
Re-export interfaces from the internal types package for convenience.
type PartitionSourceUpdater ¶ added in v1.3.0
type PartitionSourceUpdater = types.PartitionSourceUpdater
Re-export interfaces from the internal types package for convenience.
type PartitionUpdater ¶ added in v1.3.0
type PartitionUpdater = types.PartitionUpdater
Re-export interfaces from the internal types package for convenience.
type State ¶
type State = types.State
Re-export types from the internal types package.
type WatchablePartitionSource ¶ added in v1.3.0
type WatchablePartitionSource = types.WatchablePartitionSource
Re-export interfaces from the internal types package for convenience.
type WorkerConsumerUpdater ¶
type WorkerConsumerUpdater interface {
// UpdateWorkerConsumer applies the given partition assignment to the worker's durable consumer.
//
// See interface documentation for semantics and concurrency guarantees.
//
// Parameters:
// - ctx: Context for cancellation and deadline
// - workerID: Stable worker ID claimed by Manager
// - partitions: Complete assignment slice (may be empty for zero subjects)
//
// Returns:
// - error: Non-nil only on unrecoverable configuration or API failure after retries
UpdateWorkerConsumer(ctx context.Context, workerID string, partitions []Partition) error
}
WorkerConsumerUpdater applies partition assignments to a worker-level durable JetStream consumer.
Semantics:
- Single durable consumer per worker (named <ConsumerPrefix>-<workerID>)
- Complete partition set provided each call (NOT a delta)
- Must be idempotent: identical subject set re-applied => no change
- SHOULD implement internal retries/backoff for transient JetStream errors
- MUST return error only for unrecoverable misconfiguration (e.g., invalid stream)
Concurrency: Implementations SHOULD be safe for concurrent calls.
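A minimal conforming sketch that only logs assignments; a real implementation would reconcile the durable consumer's FilterSubjects instead:

type loggingUpdater struct {
    logger *log.Logger
}

// UpdateWorkerConsumer receives the complete partition set on every call,
// so re-applying an identical set is naturally a no-op for this logger.
func (u *loggingUpdater) UpdateWorkerConsumer(ctx context.Context, workerID string, partitions []parti.Partition) error {
    ids := make([]string, 0, len(partitions))
    for _, p := range partitions {
        ids = append(ids, p.ID)
    }
    u.logger.Printf("worker %s assigned %d partitions: %v", workerID, len(ids), ids)
    return nil
}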
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| consumer | Package consumer provides unified JetStream consumer types for different consumption patterns. |
| examples | |
| examples/basic (command) | Package main demonstrates basic Parti usage with default settings. |
| examples/kv-watcher (command) | |
| internal | |
| internal/assignment | Package assignment provides partition assignment calculation and distribution. |
| internal/election | Package election provides leader election implementations for parti. |
| internal/heartbeat | Package heartbeat provides periodic health monitoring for workers through NATS KV. |
| internal/hooks | Package hooks provides hook implementations for the parti library. |
| internal/logging | Package logging provides logging utilities for the Parti library. |
| internal/testutil | Package testutil provides shared test utilities and fixtures for integration tests. |
| jsutil | Package jsutil provides utilities for working with NATS JetStream streams and consumers. |
| kvutil | Package kvutil provides utilities for working with NATS JetStream KeyValue stores. |
| partition | Package partition provides static partition-based NATS publishing and consuming. |
| scripts | |
| scripts/gap_timeline (command) | |
| scripts/inspect_consumers (command) | |
| scripts/trace_visualizer (command) | |
| source | Partition sources discover available partitions for assignment. |
| strategy | Package strategy provides built-in assignment strategy implementations. |
| subscription | Package subscription provides durable JetStream consumer management utilities. |
| test | |
| test/cmd/nats-server (command) | This server runs in a separate process to isolate NATS server memory overhead from parti library measurements. |
| test/simulation/cmd/simulation (command) | |
| test/simulation/internal/logging | Package logging provides logging utilities for the simulation. |
| testing | Package testing provides test utilities for the Parti library. |
| types | This package contains shared types that are used across multiple packages in the Parti library. |