Documentation
¶
Index ¶
- Variables
- func ExportCSV(results []ExperimentResult, dir string) error
- func ExportJSON(results []ExperimentResult, dir string) error
- type AlgorithmConfig
- type BatchItem
- type ConsumedMessage
- type ExperimentResult
- type Harness
- func (h *Harness) Broker() *broker.Broker
- func (h *Harness) Close() error
- func (h *Harness) ConsumeBatch(ctx context.Context, topic, group string, partition int) ([]*ConsumedMessage, error)
- func (h *Harness) CreateTopic(ctx context.Context, topic string, cfg broker.TopicConfig) error
- func (h *Harness) DeleteTopic(ctx context.Context, topic string) error
- func (h *Harness) GetMetricsSnapshot(ctx context.Context) (*MetricsSnapshot, error)
- func (h *Harness) GetPartitionLoads(ctx context.Context) ([]float64, error)
- func (h *Harness) IsExternal() bool
- func (h *Harness) Publish(ctx context.Context, topic string, key, value []byte, priority uint8) error
- func (h *Harness) PublishBatch(ctx context.Context, topic string, items []BatchItem) (int, error)
- func (h *Harness) SetManagedTopic(topic string)
- type HarnessConfig
- type MetricsCollector
- func (c *MetricsCollector) RecordConsume(latency time.Duration)
- func (c *MetricsCollector) RecordConsumeError()
- func (c *MetricsCollector) RecordConsumeMultiStage(enqueueTime, produceTime, appendTime int64)
- func (c *MetricsCollector) RecordConsumeWithPriority(priority uint8, latency time.Duration)
- func (c *MetricsCollector) RecordLoadSample(loads []float64)
- func (c *MetricsCollector) RecordLoadStdDev(sd float64)
- func (c *MetricsCollector) RecordProduce()
- func (c *MetricsCollector) RecordPublishError()
- func (c *MetricsCollector) RecordPublishThrottled()
- func (c *MetricsCollector) Reset()
- func (c *MetricsCollector) Snapshot(scenario, algorithm, system, mode string) ExperimentResult
- func (c *MetricsCollector) Start()
- func (c *MetricsCollector) Stop()
- type MetricsSnapshot
- type Mode
- type PriorityStats
- type Runner
Constants ¶
This section is empty.
Variables ¶
var ErrNoMessage = errors.New("no message available")
Functions ¶
func ExportCSV ¶
func ExportCSV(results []ExperimentResult, dir string) error
ExportCSV writes results as a flat CSV table.
func ExportJSON ¶
func ExportJSON(results []ExperimentResult, dir string) error
ExportJSON writes results as a structured JSON file.
Types ¶
type AlgorithmConfig ¶
type AlgorithmConfig struct {
Name string
NewBalancer func(k int) broker.Balancer
NewScheduler func() broker.Scheduler
Mode broker.ScheduleMode
WithOptimizer bool
}
AlgorithmConfig describes a balancer+scheduler combination to benchmark.
Mode picks the partition storage layout the scheduler depends on. FIFO-based schedulers read sequentially from the WAL; priority and DQN schedulers pop from the per-partition PriorityIndex, which only exists when the partition is created with a non-FIFO mode.
NewBalancer receives the actual partition count k so that partition-aware balancers (PSA, DQN) are initialised with the correct topology.
func AllAlgorithms ¶
func AllAlgorithms() []AlgorithmConfig
func KafkaBaseline ¶
func KafkaBaseline() AlgorithmConfig
KafkaBaseline returns a stub config used for labeling Kafka results.
type ConsumedMessage ¶
type ExperimentResult ¶
type ExperimentResult struct {
Scenario string `json:"scenario"`
Algorithm string `json:"algorithm"`
System string `json:"system"`
Mode string `json:"mode"`
Throughput float64 `json:"throughput_msg_per_sec"`
LatencyP50 time.Duration `json:"latency_p50_ns"`
LatencyP95 time.Duration `json:"latency_p95_ns"`
LatencyP99 time.Duration `json:"latency_p99_ns"`
LoadStdDev float64 `json:"load_stddev"`
Produced int64 `json:"produced"`
Consumed int64 `json:"consumed"`
PublishErrors int64 `json:"publish_errors"`
MessagesThrottled int64 `json:"messages_throttled"`
ConsumeErrors int64 `json:"consume_errors"`
Duration time.Duration `json:"duration_ns"`
LatencyEnqueueToFlushP50 time.Duration `json:"latency_enqueue_to_flush_p50_ns"`
LatencyFlushToAppendP50 time.Duration `json:"latency_flush_to_append_p50_ns"`
LatencyAppendToConsumeP50 time.Duration `json:"latency_append_to_consume_p50_ns"`
LatencyAppendToConsumeP95 time.Duration `json:"latency_append_to_consume_p95_ns"`
// LatencyByPriority breaks down latency per priority level (0=highest,
// 9=lowest). Meaningful only when the scenario uses mixed priorities and
// the system under test supports priority scheduling.
LatencyByPriority [10]PriorityStats `json:"latency_by_priority"`
}
ExperimentResult holds the aggregated metrics for a single experiment run.
type Harness ¶
type Harness struct {
// contains filtered or unexported fields
}
Harness abstracts the target system (NyaQueue in-process, NyaQueue gRPC, NyaQueue HTTP, or Kafka).
func NewHarness ¶
func NewHarness(ctx context.Context, cfg HarnessConfig) (*Harness, error)
NewHarness creates and starts the target system.
func (*Harness) ConsumeBatch ¶
func (*Harness) CreateTopic ¶
func (*Harness) DeleteTopic ¶
DeleteTopic removes a topic. Used between runs to ensure each experiment starts with a clean topic state.
func (*Harness) GetMetricsSnapshot ¶
func (h *Harness) GetMetricsSnapshot(ctx context.Context) (*MetricsSnapshot, error)
GetMetricsSnapshot retrieves partition loads and load stddev from the broker.
func (*Harness) GetPartitionLoads ¶
GetPartitionLoads retrieves partition loads from the broker regardless of mode.
func (*Harness) IsExternal ¶
func (*Harness) Publish ¶
func (h *Harness) Publish(ctx context.Context, topic string, key, value []byte, priority uint8) error
Publish sends a message. For Kafka mode, priority is ignored.
func (*Harness) PublishBatch ¶
func (*Harness) SetManagedTopic ¶
type HarnessConfig ¶
type HarnessConfig struct {
Mode Mode
BrokerConfig broker.Config
DataDir string
Algorithm AlgorithmConfig
NumPartitions int // passed to NewBalancer so partition-aware balancers use the correct K
KafkaBrokers []string
BrokerAddr string // gRPC address
HTTPBrokerAddr string // HTTP address; falls back to BrokerAddr when empty
}
HarnessConfig describes how to create a harness.
type MetricsCollector ¶
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector gathers per-message latencies, per-priority latencies, partition-load samples and error counters during an experiment run. Latency histograms are sharded by consumer partition to minimize contention.
func NewMetricsCollector ¶
func NewMetricsCollector() *MetricsCollector
func (*MetricsCollector) RecordConsume ¶
func (c *MetricsCollector) RecordConsume(latency time.Duration)
RecordConsume records aggregate latency (no priority tracking). Use RecordConsumeWithPriority when the priority level is known.
func (*MetricsCollector) RecordConsumeError ¶
func (c *MetricsCollector) RecordConsumeError()
func (*MetricsCollector) RecordConsumeMultiStage ¶
func (c *MetricsCollector) RecordConsumeMultiStage(enqueueTime, produceTime, appendTime int64)
RecordConsumeMultiStage records the breakdown of latency phases. enqueueTime: client timestamp, produceTime: broker receive, appendTime: WAL write.
func (*MetricsCollector) RecordConsumeWithPriority ¶
func (c *MetricsCollector) RecordConsumeWithPriority(priority uint8, latency time.Duration)
RecordConsumeWithPriority records both the aggregate latency and per-priority latency for the given message. priority must be in [0, 9].
func (*MetricsCollector) RecordLoadSample ¶
func (c *MetricsCollector) RecordLoadSample(loads []float64)
RecordLoadSample appends one stddev value computed across partitions at a single point in time. Snapshot averages these to get the run-level imbalance.
func (*MetricsCollector) RecordLoadStdDev ¶
func (c *MetricsCollector) RecordLoadStdDev(sd float64)
RecordLoadStdDev records a pre-computed load stddev from the broker.
func (*MetricsCollector) RecordProduce ¶
func (c *MetricsCollector) RecordProduce()
func (*MetricsCollector) RecordPublishError ¶
func (c *MetricsCollector) RecordPublishError()
func (*MetricsCollector) RecordPublishThrottled ¶
func (c *MetricsCollector) RecordPublishThrottled()
func (*MetricsCollector) Reset ¶
func (c *MetricsCollector) Reset()
Reset clears latency histograms and per-priority samples so that reconnection downtime latency does not pollute P99 measurements. Throughput counters are intentionally preserved.
func (*MetricsCollector) Snapshot ¶
func (c *MetricsCollector) Snapshot(scenario, algorithm, system, mode string) ExperimentResult
func (*MetricsCollector) Start ¶
func (c *MetricsCollector) Start()
func (*MetricsCollector) Stop ¶
func (c *MetricsCollector) Stop()
type MetricsSnapshot ¶
MetricsSnapshot holds partition loads and pre-computed load stddev from the broker.
type PriorityStats ¶
type Runner ¶
type Runner struct {
Scenarios []benchmarks.Scenario
Algorithms []AlgorithmConfig
Modes []Mode
KafkaBrokers []string
Duration time.Duration // override scenario duration if > 0
BrokerAddr string // gRPC broker address for external mode
HTTPBrokerAddr string // HTTP broker address for external mode (default: BrokerAddr)
// contains filtered or unexported fields
}
Runner orchestrates experiment runs across scenarios and algorithms.