analyze

package
v0.84.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 22, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

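Package analyze provides pipeline constraint analysis. The Analyzer identifies the bottleneck stage from interval telemetry and recommends worker allocation; it is read-only and does not actuate (shadow mode). Actuation is handled separately by the Rebalancer (added in v0.79.0), which consumes Analyzer snapshots and moves workers between stages.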

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Analyzer

type Analyzer struct {
	// contains filtered or unexported fields
}

Analyzer periodically evaluates pipeline stages and logs the identified constraint along with worker allocation recommendations. Read-only.

func NewAnalyzer

func NewAnalyzer(interval time.Duration, opts ...Option) *Analyzer

NewAnalyzer creates an analyzer that evaluates every interval. Panics if interval <= 0.

func (*Analyzer) AddStage

func (a *Analyzer) AddStage(spec StageSpec)

AddStage registers a stage for analysis. Must be called before Run. Panics if Name is empty, Stats is nil, or Run has started.

func (*Analyzer) CurrentSnapshot

func (a *Analyzer) CurrentSnapshot() *Snapshot

CurrentSnapshot returns the most recent analysis, or nil before the first interval.

func (*Analyzer) Run

func (a *Analyzer) Run(ctx context.Context)

Run blocks, analyzing every interval until ctx is canceled. Panics if called twice.

func (*Analyzer) SetDrum added in v0.80.0

func (a *Analyzer) SetDrum(name string)

SetDrum manually overrides the constraint stage, bypassing automatic identification: the analyzer reports this stage as the constraint with confidence 1.0 on every interval. Must be called before Run; panics if called after Run has started.
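
The precedence described for SetDrum can be pictured as a small selection step: a manual override, when present, wins over whatever automatic identification produced and is reported with confidence 1.0. The helper below is hypothetical and only illustrates that rule; how the analyzer stores the override internally is not documented.

```go
package main

import "fmt"

// pickConstraint is a hypothetical helper, not part of the package.
// A non-empty override replaces the detected constraint and is
// reported with full confidence.
func pickConstraint(override, detected string, detectedConf float64) (string, float64) {
	if override != "" {
		return override, 1.0
	}
	return detected, detectedConf
}

func main() {
	name, conf := pickConstraint("", "parse", 0.72)
	fmt.Println(name, conf) // no override: detection result is used

	name, conf = pickConstraint("write", "parse", 0.72)
	fmt.Println(name, conf) // override set: detection result is ignored
}
```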

type Option

type Option func(*Analyzer)

Option configures an Analyzer.

func WithLogger

func WithLogger(l *log.Logger) Option

WithLogger sets the logger for analyzer output.

type Rebalancer added in v0.79.0

type Rebalancer struct {
	// contains filtered or unexported fields
}

Rebalancer consumes Analyzer snapshots and moves workers between stages. Moves at most one worker per interval. Reverts if throughput regresses.
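
The two rules above (at most one move per interval, revert on regression) can be sketched as a single step function. Everything here is hypothetical: the rebalancer type, how donor and receiver are chosen, and the regression test (throughput below the value recorded at the time of the move) are assumptions made for illustration.

```go
package main

import "fmt"

// move records the most recent worker transfer.
type move struct{ from, to string }

// rebalancer is a hypothetical stand-in, not the package's Rebalancer.
type rebalancer struct {
	workers  map[string]int
	last     *move
	baseline float64 // throughput observed when the last move was made
}

// step runs once per interval and changes at most one worker.
func (r *rebalancer) step(throughput float64, donor, receiver string) string {
	if r.last != nil {
		m := *r.last
		r.last = nil
		if throughput < r.baseline {
			// The previous move hurt throughput: undo it and do nothing else.
			r.workers[m.to]--
			r.workers[m.from]++
			return "reverted"
		}
	}
	if donor == "" || receiver == "" || donor == receiver || r.workers[donor] <= 1 {
		return "held"
	}
	r.workers[donor]--
	r.workers[receiver]++
	r.baseline = throughput
	r.last = &move{from: donor, to: receiver}
	return "moved"
}

func main() {
	r := &rebalancer{workers: map[string]int{"parse": 3, "write": 1}}
	fmt.Println(r.step(100, "parse", "write"), r.workers)
	fmt.Println(r.step(90, "parse", "write"), r.workers) // throughput dropped
}
```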

func NewRebalancer added in v0.79.0

func NewRebalancer(analyzer *Analyzer, opts ...RebalancerOption) *Rebalancer

NewRebalancer creates a rebalancer that consumes analyzer snapshots.

func (*Rebalancer) AddStage added in v0.79.0

func (r *Rebalancer) AddStage(sc StageControl)

AddStage registers a stage for rebalancing. Must be called before Run.

func (*Rebalancer) Disable added in v0.79.0

func (r *Rebalancer) Disable()

Disable stops actuation. Workers stay where they are.

func (*Rebalancer) Enable added in v0.79.0

func (r *Rebalancer) Enable()

Enable resumes actuation.

func (*Rebalancer) Enabled added in v0.79.0

func (r *Rebalancer) Enabled() bool

Enabled returns whether the rebalancer is currently actuating.

func (*Rebalancer) FormatStatus added in v0.79.0

func (r *Rebalancer) FormatStatus() string

FormatStatus returns a human-readable status line.

func (*Rebalancer) Run added in v0.79.0

func (r *Rebalancer) Run(ctx context.Context, interval time.Duration)

Run blocks, checking analyzer snapshots every interval and actuating.

type RebalancerOption added in v0.79.0

type RebalancerOption func(*Rebalancer)

RebalancerOption configures a Rebalancer.

func WithCooldown added in v0.79.0

func WithCooldown(n int) RebalancerOption

WithCooldown sets the number of intervals to wait after a move.

func WithKillSwitch added in v0.79.0

func WithKillSwitch(fn func() bool) RebalancerOption

WithKillSwitch sets a function that, when returning true, disables actuation.
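
Enable/Disable, WithCooldown, and WithKillSwitch all answer the same question each interval: may the rebalancer actuate now? One way to picture their interaction is a gate checked once per interval. The gate type is hypothetical, and the ordering shown (disabled or kill switch first, then cooldown) is an assumption, as is the choice not to count down the cooldown while actuation is off.

```go
package main

import "fmt"

// gate is a hypothetical stand-in for the rebalancer's actuation checks.
type gate struct {
	enabled  bool
	kill     func() bool // returns true to disable actuation
	cooldown int         // intervals to wait after a move
	wait     int         // intervals remaining in the current cooldown
}

// allow is called once per interval and reports whether a move may happen.
func (g *gate) allow() bool {
	if !g.enabled || (g.kill != nil && g.kill()) {
		return false
	}
	if g.wait > 0 {
		g.wait--
		return false
	}
	return true
}

// moved starts a new cooldown after a worker has been moved.
func (g *gate) moved() { g.wait = g.cooldown }

func main() {
	tripped := false
	g := &gate{enabled: true, cooldown: 2, kill: func() bool { return tripped }}

	fmt.Println(g.allow()) // no cooldown pending
	g.moved()
	fmt.Println(g.allow(), g.allow(), g.allow()) // two intervals of cooldown, then open

	tripped = true
	fmt.Println(g.allow()) // kill switch wins
}
```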

func WithRebalancerLogger added in v0.79.0

func WithRebalancerLogger(l *log.Logger) RebalancerOption

WithRebalancerLogger sets the logger.

type Snapshot

type Snapshot struct {
	At         time.Time
	Constraint string          // empty if none identified
	Confidence float64         // 0.0-1.0
	Stages     []StageSnapshot // ordered by registration

	// DrumStarvationCount tracks consecutive intervals the identified
	// constraint was classified as [StateStarved] (high idle ratio,
	// queue draining). A non-zero value is a Step 2 violation — the
	// drum is being wasted. Reset when the drum stops being starved.
	DrumStarvationCount int
}

Snapshot is the analyzer's output for one interval. Published via atomic.Pointer. Callers receive a deep copy — safe to read and retain without synchronization.
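
A minimal sketch of this publish-and-copy pattern, using local stand-in types rather than the package's own: the writer stores a pointer atomically, and the reader copies the struct and its slice before returning. Since the documented StageSnapshot and StageAnalysis contain only value fields, copying the slice is enough for the copy to be deep; that holds for the stand-in type here as well.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// stageSnapshot and snapshot are simplified stand-ins for the package types.
type stageSnapshot struct {
	Name    string
	Workers int
}

type snapshot struct {
	Constraint string
	Stages     []stageSnapshot
}

// publisher is hypothetical; it holds the latest snapshot.
type publisher struct {
	cur atomic.Pointer[snapshot]
}

// publish replaces the current snapshot; readers never see a partial write.
func (p *publisher) publish(s *snapshot) { p.cur.Store(s) }

// current returns a deep copy, or nil if nothing has been published yet.
func (p *publisher) current() *snapshot {
	s := p.cur.Load()
	if s == nil {
		return nil
	}
	cp := *s
	cp.Stages = append([]stageSnapshot(nil), s.Stages...)
	return &cp
}

func main() {
	var p publisher
	fmt.Println(p.current() == nil) // nothing published yet

	p.publish(&snapshot{Constraint: "parse", Stages: []stageSnapshot{{Name: "parse", Workers: 2}}})

	got := p.current()
	got.Stages[0].Workers = 99 // mutating the copy does not affect the published value
	fmt.Println(p.current().Stages[0].Workers)
}
```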

type StageAnalysis

type StageAnalysis struct {
	State           StageState
	Utilization     float64
	IdleRatio       float64
	BlockedRatio    float64
	QueueGrowth     float64
	ErrorRate       float64
	Goodput         float64 // successful completions/sec
	ArrivalRate     float64 // submitted items/sec
	CurrentWorkers  int
	Recommendation  int    // suggested workers; 0 = no recommendation
	RecommendReason string // human-readable explanation
}

StageAnalysis holds the analysis of a single stage for one interval.

type StageControl added in v0.79.0

type StageControl struct {
	Name       string
	SetWorkers func(int) (int, error)
	Stats      func() toc.Stats
	Policy     WorkerPolicy
}

StageControl provides actuation and observation for a stage.
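
For orientation, here is one way a caller might back the SetWorkers field with a worker pool. The pool type and its limit are hypothetical, and the meaning of the returned int (the worker count actually applied) is an assumption; the documentation above gives only the signature. The Stats and Policy fields are omitted because toc.Stats is not described here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pool is a hypothetical worker pool owned by the caller.
type pool struct {
	mu      sync.Mutex
	workers int
	limit   int
}

// SetWorkers matches the func(int) (int, error) shape of StageControl.SetWorkers.
// It returns the worker count actually applied (an assumed convention).
func (p *pool) SetWorkers(n int) (int, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n < 1 {
		return p.workers, errors.New("pool: need at least one worker")
	}
	if n > p.limit {
		n = p.limit
	}
	p.workers = n
	return n, nil
}

// stageControl is a trimmed stand-in for StageControl.
type stageControl struct {
	Name       string
	SetWorkers func(int) (int, error)
}

func main() {
	p := &pool{workers: 2, limit: 4}
	sc := stageControl{Name: "parse", SetWorkers: p.SetWorkers}

	got, err := sc.SetWorkers(8) // request exceeds the pool limit
	fmt.Println(got, err)
}
```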

type StageSnapshot added in v0.78.0

type StageSnapshot struct {
	Name     string
	Analysis StageAnalysis
}

StageSnapshot pairs a stage name with its analysis.

type StageSpec

type StageSpec struct {
	Name       string
	Stats      func() toc.Stats
	MinWorkers int  // default 1
	MaxWorkers int  // 0 = unlimited
	Scalable   bool // false = don't recommend changes
}

StageSpec describes a stage for analysis.
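
The field comments above imply a clamping rule for recommendations: MinWorkers defaults to 1, MaxWorkers of 0 means no upper bound, and a non-scalable stage gets no recommendation (0, per StageAnalysis.Recommendation). The sketch below applies those rules with a local stand-in type. The clampRecommendation helper is hypothetical, and returning 0 when the clamped value equals the current count is an assumption.

```go
package main

import "fmt"

// stageSpec is a stand-in for StageSpec without the Stats field.
type stageSpec struct {
	Name       string
	MinWorkers int  // default 1
	MaxWorkers int  // 0 = unlimited
	Scalable   bool // false = don't recommend changes
}

// clampRecommendation returns a worker count within the spec's bounds,
// or 0 to signal "no recommendation".
func clampRecommendation(spec stageSpec, current, want int) int {
	if !spec.Scalable {
		return 0
	}
	lo := spec.MinWorkers
	if lo <= 0 {
		lo = 1 // documented default
	}
	if want < lo {
		want = lo
	}
	if spec.MaxWorkers > 0 && want > spec.MaxWorkers {
		want = spec.MaxWorkers
	}
	if want == current {
		return 0 // nothing to change (assumed behavior)
	}
	return want
}

func main() {
	spec := stageSpec{Name: "parse", MaxWorkers: 4, Scalable: true}
	fmt.Println(clampRecommendation(spec, 2, 9)) // capped at MaxWorkers
	fmt.Println(clampRecommendation(spec, 2, 0)) // raised to the default minimum
}
```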

type StageState

type StageState int

StageState classifies a stage's operational state from interval signals.

const (
	StateUnknown   StageState = iota // insufficient data
	StateHealthy                     // normal operation
	StateStarved                     // high idle, waiting for input
	StateBlocked                     // high output-blocked, downstream-limited
	StateSaturated                   // high busy, low idle/blocked — constraint candidate
	StateBroken                      // elevated errors
)
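
To make the state descriptions concrete, the following sketch classifies a stage from the ratios reported in StageAnalysis. The thresholds and the order of the checks are hypothetical, chosen only to illustrate the comments on each constant; the package's actual classification rules are not documented here.

```go
package main

import "fmt"

// stageState mirrors the documented StageState constants.
type stageState int

const (
	stateUnknown stageState = iota
	stateHealthy
	stateStarved
	stateBlocked
	stateSaturated
	stateBroken
)

// classify maps interval signals to a state.
// All thresholds below are illustrative assumptions.
func classify(samples int, utilization, idle, blocked, errorRate float64) stageState {
	switch {
	case samples == 0:
		return stateUnknown // insufficient data
	case errorRate > 0.10:
		return stateBroken // elevated errors
	case blocked > 0.50:
		return stateBlocked // downstream-limited
	case idle > 0.50:
		return stateStarved // waiting for input
	case utilization > 0.85:
		return stateSaturated // constraint candidate
	default:
		return stateHealthy
	}
}

func main() {
	fmt.Println(classify(100, 0.95, 0.02, 0.03, 0.0) == stateSaturated)
	fmt.Println(classify(100, 0.20, 0.70, 0.10, 0.0) == stateStarved)
}
```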

func (StageState) String

func (s StageState) String() string

type WorkerPolicy added in v0.79.0

type WorkerPolicy struct {
	Min       int  // minimum workers (default 1)
	Max       int  // maximum workers (0 = unlimited)
	DonateOK  bool // can workers be taken from this stage
	ReceiveOK bool // can workers be added to this stage
}

WorkerPolicy constrains how the rebalancer treats a stage.
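
Read together, the four fields define when a stage may give up or take on one worker. The helpers below spell that out under stated assumptions: a stage can donate only if DonateOK is set and it stays at or above Min afterwards, and can receive only if ReceiveOK is set and it stays at or below Max afterwards. The canDonate and canReceive names are hypothetical.

```go
package main

import "fmt"

// workerPolicy mirrors the documented WorkerPolicy fields.
type workerPolicy struct {
	Min       int  // minimum workers (default 1)
	Max       int  // maximum workers (0 = unlimited)
	DonateOK  bool // can workers be taken from this stage
	ReceiveOK bool // can workers be added to this stage
}

// canDonate reports whether one worker may be taken from a stage.
func canDonate(p workerPolicy, workers int) bool {
	lo := p.Min
	if lo <= 0 {
		lo = 1 // documented default
	}
	return p.DonateOK && workers-1 >= lo
}

// canReceive reports whether one worker may be added to a stage.
func canReceive(p workerPolicy, workers int) bool {
	return p.ReceiveOK && (p.Max == 0 || workers+1 <= p.Max)
}

func main() {
	donor := workerPolicy{Min: 2, DonateOK: true}
	receiver := workerPolicy{Max: 4, ReceiveOK: true}

	fmt.Println(canDonate(donor, 3), canDonate(donor, 2))
	fmt.Println(canReceive(receiver, 3), canReceive(receiver, 4))
}
```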
