Documentation
Index ¶
- type Simulator
- type Stage
- type StageConfig
- type StageMetrics
- func (m *StageMetrics) GetStats() map[string]any
- func (m *StageMetrics) RecordDropped()
- func (m *StageMetrics) RecordDroppedBurst(items int)
- func (m *StageMetrics) RecordGenerated()
- func (m *StageMetrics) RecordGeneratedBurst(items int)
- func (m *StageMetrics) RecordOutput()
- func (m *StageMetrics) RecordProcessing()
- func (m *StageMetrics) Stop()
- type StageStats
- type StateEntry
Types ¶
type Simulator ¶
type Simulator struct {
// Duration specifies how long the simulation should run.
// If set to a positive value, the simulation will automatically stop
// after this duration. Mutually exclusive with MaxGeneratedItems.
Duration time.Duration
// MaxGeneratedItems is the maximum number of items to generate before stopping.
// If set to a positive value, the simulation will stop once this many items
// have been generated by the first stage. Mutually exclusive with Duration.
MaxGeneratedItems int
// Stages contains all the processing stages in the pipeline, ordered
// from first (generator) to last (final stage).
Stages []*Stage
// Mu protects access to the Stages slice and other shared state
Mu sync.RWMutex
// Ctx provides cancellation context for all stages
Ctx context.Context
// Cancel function to stop all stages gracefully
Cancel context.CancelFunc
// Quit channel is closed when the simulation completes
Quit chan struct{}
// Wg tracks all running goroutines for proper cleanup
Wg sync.WaitGroup
}
Simulator represents a concurrent pipeline simulator that orchestrates multiple processing stages in a data flow pipeline.
The simulator manages the lifecycle of all stages, coordinates data flow between them, and collects comprehensive performance metrics. It supports both time-based and item-count-based termination conditions.
func NewSimulator ¶
func NewSimulator() *Simulator
NewSimulator creates a new simulator instance with proper initialization.
Returns a simulator with:
- A cancellable context for graceful shutdown
- A quit channel for completion signaling
- Proper synchronization primitives
The simulator is ready to have stages added and configured.
func (*Simulator) AddStage ¶
func (s *Simulator) AddStage(stage *Stage) error
AddStage adds a new stage to the pipeline with validation.
The stage is added to the end of the pipeline. The first stage added should be a generator (IsGenerator: true), while subsequent stages should be processors (IsGenerator: false).
Args:
- stage: The stage to add to the pipeline
Returns:
- error: nil if successful, or an error describing the validation failure
Validation rules:
- Stage cannot be nil
- Stage name cannot be empty
- Stage name must be unique within the pipeline
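The validation rules above can be sketched as follows. This is a minimal illustration, not the package's implementation: the `stage` and `pipeline` types and the `addStage` method are hypothetical stand-ins for Stage, Simulator, and AddStage, and the error messages are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// stage is a hypothetical stand-in for the package's Stage type.
type stage struct {
	Name string
}

// pipeline is a hypothetical stand-in for Simulator.
type pipeline struct {
	stages []*stage
}

// addStage applies the three documented validation rules before appending
// the stage to the end of the pipeline.
func (p *pipeline) addStage(s *stage) error {
	if s == nil {
		return errors.New("stage cannot be nil")
	}
	if s.Name == "" {
		return errors.New("stage name cannot be empty")
	}
	for _, existing := range p.stages {
		if existing.Name == s.Name {
			return fmt.Errorf("stage name %q already exists", s.Name)
		}
	}
	p.stages = append(p.stages, s)
	return nil
}

func main() {
	p := &pipeline{}
	fmt.Println(p.addStage(&stage{Name: "generator"})) // accepted
	fmt.Println(p.addStage(&stage{Name: "generator"})) // rejected: duplicate name
	fmt.Println(p.addStage(nil))                       // rejected: nil stage
}
```

Checking for duplicates with a linear scan keeps the sketch short; for pipelines with only a handful of stages that is usually sufficient.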
func (*Simulator) Done ¶
func (s *Simulator) Done() <-chan struct{}
Done returns a channel that is closed when the simulation completes.
This channel can be used in a select statement to wait for simulation completion alongside other events, such as timeouts. It is useful for implementing custom completion handling or integrating with other systems.
Returns:
- <-chan struct{}: A channel that closes when simulation is done
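A completion channel of this kind is typically consumed in a select statement. The sketch below assumes a hypothetical `runner` type that models only the quit channel, and a hypothetical `waitOrTimeout` helper; neither is part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// runner is a hypothetical stand-in for Simulator; only the quit channel is modelled.
type runner struct {
	quit chan struct{}
}

// done exposes the quit channel as receive-only, as Done is documented to do.
func (r *runner) done() <-chan struct{} { return r.quit }

// waitOrTimeout reports whether done was closed within timeoutMs milliseconds.
func waitOrTimeout(done <-chan struct{}, timeoutMs int) bool {
	select {
	case <-done:
		return true
	case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
		return false
	}
}

func main() {
	r := &runner{quit: make(chan struct{})}
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated work
		close(r.quit)                     // closing wakes every receiver
	}()
	fmt.Println("finished:", waitOrTimeout(r.done(), 1000))
}
```

Because the channel is closed rather than sent on, any number of goroutines can wait on it, and receives after completion return immediately.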
func (*Simulator) GetStages ¶
func (s *Simulator) GetStages() []*Stage
GetStages returns a copy of all stages in the pipeline.
This method provides thread-safe access to the stages slice. The returned slice is a copy, so modifications won't affect the running simulation.
Returns:
- []*Stage: A copy of all stages in the pipeline
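Returning a copy under a read lock can be sketched as below. The `stage` and `pipeline` types and the `snapshot` method are hypothetical stand-ins; the package's own implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// stage is a hypothetical stand-in for Stage.
type stage struct{ Name string }

// pipeline is a hypothetical stand-in for Simulator.
type pipeline struct {
	mu     sync.RWMutex
	stages []*stage
}

// snapshot returns a copy of the stages slice taken under a read lock, so
// concurrent readers do not block each other.
func (p *pipeline) snapshot() []*stage {
	p.mu.RLock()
	defer p.mu.RUnlock()
	out := make([]*stage, len(p.stages))
	copy(out, p.stages)
	return out
}

func main() {
	p := &pipeline{stages: []*stage{{Name: "gen"}, {Name: "proc"}}}
	s := p.snapshot()
	s[0] = nil                    // changes only the copy
	fmt.Println(p.stages[0].Name) // the pipeline's own slice is untouched
}
```

Note that in this sketch the copy is shallow: the slice is new, but its elements still point at the same stage values. Whether GetStages behaves the same way is an assumption based on its `[]*Stage` return type.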
func (*Simulator) PrintStats ¶
func (s *Simulator) PrintStats()
func (*Simulator) Start ¶
func (s *Simulator) Start() error
Start begins the simulation and blocks until completion.
This method initializes all stages, starts their goroutines, and waits for the simulation to complete based on the configured termination condition (Duration or MaxGeneratedItems).
The simulation will automatically stop when:
- The configured Duration has elapsed (if Duration > 0)
- The configured MaxGeneratedItems have been generated (if MaxGeneratedItems > 0)
- Stop() is called explicitly
Returns:
- error: nil if successful, or an error describing the failure
Panics:
- If both Duration and MaxGeneratedItems are set to positive values
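The two termination conditions and the panic rule can be illustrated with a small loop. `runUntil` is a hypothetical helper, not part of the package; the one-millisecond tick and the early return when neither limit is set are assumptions made to keep the sketch finite.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runUntil "generates" one item per millisecond tick and returns the number
// generated once durationMs elapses or maxItems is reached.
func runUntil(durationMs, maxItems int) int {
	if durationMs > 0 && maxItems > 0 {
		panic("Duration and MaxGeneratedItems are mutually exclusive")
	}
	if durationMs <= 0 && maxItems <= 0 {
		return 0 // assumption: return at once instead of waiting for an explicit stop
	}

	ctx := context.Background()
	var cancel context.CancelFunc = func() {}
	if durationMs > 0 {
		ctx, cancel = context.WithTimeout(ctx, time.Duration(durationMs)*time.Millisecond)
	}
	defer cancel()

	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	generated := 0
	for {
		select {
		case <-ctx.Done():
			return generated // time-based termination
		case <-ticker.C:
			generated++
			if maxItems > 0 && generated >= maxItems {
				return generated // count-based termination
			}
		}
	}
}

func main() {
	fmt.Println("count-limited:", runUntil(0, 5))
	fmt.Println("time-limited run finished:", runUntil(20, 0) >= 0)
}
```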
func (*Simulator) Stop ¶
func (s *Simulator) Stop()
Stop gracefully terminates the simulation by canceling the context.
This method signals all stages to stop processing and initiates a graceful shutdown. All goroutines will receive the cancellation signal and clean up their resources.
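Cancellation-driven shutdown of this kind is commonly built from a cancellable context plus a WaitGroup. The sketch below uses a hypothetical `startWorkers` helper to show the pattern; it is not the package's code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// startWorkers launches n goroutines that block until the context is
// cancelled. The returned function cancels the context, waits for every
// goroutine to exit, and reports how many observed the cancellation.
func startWorkers(n int) (stop func() int) {
	ctx, cancel := context.WithCancel(context.Background())
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		exited int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ctx.Done() // a real stage would also select on its input channel
			mu.Lock()
			exited++
			mu.Unlock()
		}()
	}
	return func() int {
		cancel()  // signal every worker
		wg.Wait() // wait for all of them to clean up
		return exited
	}
}

func main() {
	stop := startWorkers(3)
	fmt.Println("workers exited:", stop())
}
```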
func (*Simulator) WaitForStats ¶
func (s *Simulator) WaitForStats()
WaitForStats blocks until the simulation completes and then prints statistics.
This is a convenience method that combines waiting for completion with automatic statistics reporting. Equivalent to calling <-s.Done() followed by s.PrintStats().
type Stage ¶
type Stage struct {
Name string
Input chan any
Output chan any
Sem chan struct{}
Config *StageConfig
Metrics *StageMetrics
IsFinal bool
MaxGeneratedItems int
Stop func()
// contains filtered or unexported fields
}
Stage represents a processing stage in the pipeline
func NewStage ¶
func NewStage(name string, config *StageConfig) *Stage
NewStage creates a new stage with the given configuration
func (*Stage) GetMetrics ¶
func (s *Stage) GetMetrics() *StageMetrics
type StageConfig ¶
type StageConfig struct {
// Rate at which items are generated (generator only)
InputRate time.Duration
// Custom item generator function
ItemGenerator func() any
// Handles load spikes and burst patterns
// Generates input bursts at intervals
InputBurst func() []any
// Total number of bursts to inject
BurstCountTotal int
// Interval between bursts
BurstInterval time.Duration
// Number of goroutines per stage
RoutineNum int
// Channel buffer size per stage
BufferSize int
// Simulated delay per item
WorkerDelay time.Duration
// Number of times to retry on error
RetryCount int
// Drop input if channel is full
DropOnBackpressure bool
// Whether the stage is a generator
IsGenerator bool
// Core processing function
// Worker function that processes each item
WorkerFunc func(item any) (any, error)
// Context for cancellation and deadlines
Ctx context.Context
}
StageConfig holds the configuration for a pipeline stage
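Two of these options, DropOnBackpressure and RetryCount, map onto well-known Go patterns. The sketch below shows one plausible reading of each; `trySend`, `withRetry`, and `errTransient` are hypothetical names, and treating RetryCount as the number of additional attempts after the first is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical error used to simulate a failing worker.
var errTransient = errors.New("transient failure")

// trySend forwards item to out. With dropOnBackpressure set, a full buffer
// causes the item to be dropped and false to be returned; otherwise the send
// blocks until space is available.
func trySend(out chan any, item any, dropOnBackpressure bool) bool {
	if !dropOnBackpressure {
		out <- item
		return true
	}
	select {
	case out <- item:
		return true
	default:
		return false
	}
}

// withRetry calls work once and then up to retryCount more times while it fails.
func withRetry(work func(any) (any, error), item any, retryCount int) (any, error) {
	var (
		res any
		err error
	)
	for attempt := 0; attempt <= retryCount; attempt++ {
		if res, err = work(item); err == nil {
			return res, nil
		}
	}
	return nil, err
}

func main() {
	out := make(chan any, 1)             // comparable to a BufferSize of 1
	fmt.Println(trySend(out, "a", true)) // true: buffer has room
	fmt.Println(trySend(out, "b", true)) // false: buffer is full, item dropped

	calls := 0
	flaky := func(item any) (any, error) {
		calls++
		if calls < 3 {
			return nil, errTransient
		}
		return item, nil
	}
	res, err := withRetry(flaky, 42, 2)
	fmt.Println(res, err, calls) // 42 <nil> 3
}
```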
func DefaultConfig ¶
func DefaultConfig() *StageConfig
DefaultConfig returns a new StageConfig with sensible defaults.
type StageMetrics ¶
type StageMetrics struct {
// Counters
ProcessedItems uint64
DroppedItems uint64
OutputItems uint64
// State
StartTime time.Time
EndTime time.Time
// Generator stats
GeneratedItems uint64
// contains filtered or unexported fields
}
StageMetrics tracks performance metrics for a stage
func NewStageMetrics ¶
func NewStageMetrics() *StageMetrics
NewStageMetrics creates a new metrics collector
func (*StageMetrics) GetStats ¶
func (m *StageMetrics) GetStats() map[string]any
GetStats returns a map of current metrics
func (*StageMetrics) RecordDropped ¶
func (m *StageMetrics) RecordDropped()
RecordDropped records a dropped item
func (*StageMetrics) RecordDroppedBurst ¶
func (m *StageMetrics) RecordDroppedBurst(items int)
RecordDroppedBurst records a dropped burst of the given number of items.
func (*StageMetrics) RecordGenerated ¶
func (m *StageMetrics) RecordGenerated()
RecordGenerated records a generated item
func (*StageMetrics) RecordGeneratedBurst ¶
func (m *StageMetrics) RecordGeneratedBurst(items int)
RecordGeneratedBurst records a generated burst of the given number of items.
func (*StageMetrics) RecordOutput ¶
func (m *StageMetrics) RecordOutput()
RecordOutput records a successful output
func (*StageMetrics) RecordProcessing ¶
func (m *StageMetrics) RecordProcessing()
RecordProcessing records the processing of an item
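Since the Record methods are called from several goroutines per stage, counters like these are usually updated atomically. The sketch below shows one way to do that; the `counters` type, its methods, and the map keys are hypothetical, and whether StageMetrics uses sync/atomic or a mutex internally is not stated here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters is a hypothetical stand-in for StageMetrics.
type counters struct {
	processed uint64
	dropped   uint64
}

// recordProcessing counts a single processed item.
func (c *counters) recordProcessing() { atomic.AddUint64(&c.processed, 1) }

// recordDroppedBurst counts a whole burst of dropped items at once.
func (c *counters) recordDroppedBurst(items int) {
	atomic.AddUint64(&c.dropped, uint64(items))
}

// stats returns a point-in-time view of the counters; the keys are assumptions.
func (c *counters) stats() map[string]any {
	return map[string]any{
		"processed_items": atomic.LoadUint64(&c.processed),
		"dropped_items":   atomic.LoadUint64(&c.dropped),
	}
}

// recordConcurrently records one processed item from each of n goroutines.
func recordConcurrently(c *counters, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.recordProcessing()
		}()
	}
	wg.Wait()
}

func main() {
	c := &counters{}
	recordConcurrently(c, 100)
	c.recordDroppedBurst(5)
	fmt.Println(c.stats())
}
```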
type StageStats ¶
type StageStats struct {
StageName string
ProcessedItems uint64
OutputItems uint64
Throughput float64
DroppedItems uint64
DropRate float64
GeneratedItems uint64
ThruDiffPct float64
ProcDiffPct float64
IsGenerator bool
IsFinal bool
}
StageStats represents the statistics for a single stage
type StateEntry ¶ added in v1.0.2
type StateEntry struct {
Stats map[tracker.GoroutineId]*tracker.GoroutineStats
Label string
}