experiments

package
v0.0.0-...-8b14664 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2026 License: MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrNoMessage is a sentinel error with the text "no message available".
// NOTE(review): presumably returned by consume paths when nothing is ready to
// be read — confirm against callers. Compare with errors.Is.
var ErrNoMessage = errors.New("no message available")

Functions

func ExportCSV

func ExportCSV(results []ExperimentResult, dir string) error

ExportCSV writes results as a flat CSV table.

func ExportJSON

func ExportJSON(results []ExperimentResult, dir string) error

ExportJSON writes results as a structured JSON file.

Types

type AlgorithmConfig

// AlgorithmConfig describes one balancer+scheduler combination to benchmark.
type AlgorithmConfig struct {
	// Name identifies the combination.
	// NOTE(review): presumably the label reported in results — confirm.
	Name          string
	// NewBalancer builds the balancer; k is the actual partition count, so
	// partition-aware balancers are initialised with the correct topology.
	NewBalancer   func(k int) broker.Balancer
	// NewScheduler builds the scheduler for this combination.
	NewScheduler  func() broker.Scheduler
	// Mode picks the partition storage layout the scheduler depends on.
	Mode          broker.ScheduleMode
	// WithOptimizer toggles an optimizer for this combination.
	// NOTE(review): exact effect is not visible here — confirm.
	WithOptimizer bool
}

AlgorithmConfig describes a balancer+scheduler combination to benchmark.

Mode picks the partition storage layout the scheduler depends on. FIFO-based schedulers read sequentially from the WAL; priority and DQN schedulers pop from the per-partition PriorityIndex, which only exists when the partition is created with a non-FIFO mode.

NewBalancer receives the actual partition count k so that partition-aware balancers (PSA, DQN) are initialised with the correct topology.

func AllAlgorithms

func AllAlgorithms() []AlgorithmConfig

func KafkaBaseline

func KafkaBaseline() AlgorithmConfig

KafkaBaseline returns a stub config used for labeling Kafka results.

type BatchItem

// BatchItem is a single entry of the items slice passed to
// Harness.PublishBatch. Its fields mirror the key, value and priority
// parameters of Harness.Publish.
type BatchItem struct {
	// Key is the message key.
	Key      []byte
	// Value is the message payload.
	Value    []byte
	// Priority is the message priority.
	// NOTE(review): presumably 0 = highest, 9 = lowest, as on
	// ConsumedMessage — confirm.
	Priority uint8
}

type ConsumedMessage

// ConsumedMessage is one message as returned by Harness.ConsumeBatch.
// Fields the target system cannot supply are left at zero, so a zero
// Priority is ambiguous between "highest" and "not available".
type ConsumedMessage struct {
	// Value is the message payload.
	Value       []byte
	// Offset is the message offset.
	// NOTE(review): presumably the position within the consumed partition — confirm.
	Offset      int64
	Priority    uint8 // 0 = highest, 9 = lowest; 0 when not available (Kafka)
	ProduceTime int64 // broker receive timestamp (unix nanos), 0 if unavailable
	AppendTime  int64 // WAL write timestamp (unix nanos), 0 if unavailable
}

type ExperimentResult

// ExperimentResult holds the aggregated metrics for a single experiment run.
// It is the return type of MetricsCollector.Snapshot and the element type
// accepted by ExportCSV and ExportJSON. Per the JSON tags, durations are
// emitted in nanoseconds and throughput in messages per second.
type ExperimentResult struct {
	// Identification of the run; these correspond to the scenario, algorithm,
	// system and mode arguments of MetricsCollector.Snapshot.
	Scenario          string        `json:"scenario"`
	Algorithm         string        `json:"algorithm"`
	System            string        `json:"system"`
	Mode              string        `json:"mode"`
	// Throughput is in messages per second (see JSON tag).
	Throughput        float64       `json:"throughput_msg_per_sec"`
	// Aggregate latency percentiles.
	LatencyP50        time.Duration `json:"latency_p50_ns"`
	LatencyP95        time.Duration `json:"latency_p95_ns"`
	LatencyP99        time.Duration `json:"latency_p99_ns"`
	// LoadStdDev is the partition-load standard deviation for the run.
	LoadStdDev        float64       `json:"load_stddev"`
	// Message and error counters.
	Produced          int64         `json:"produced"`
	Consumed          int64         `json:"consumed"`
	PublishErrors     int64         `json:"publish_errors"`
	MessagesThrottled int64         `json:"messages_throttled"`
	ConsumeErrors     int64         `json:"consume_errors"`
	// Duration is the run duration.
	// NOTE(review): presumably the span between the collector's Start and Stop — confirm.
	Duration          time.Duration `json:"duration_ns"`

	// Per-stage latency percentiles: enqueue→flush, flush→append and
	// append→consume, as named by the fields and JSON tags.
	LatencyEnqueueToFlushP50  time.Duration `json:"latency_enqueue_to_flush_p50_ns"`
	LatencyFlushToAppendP50   time.Duration `json:"latency_flush_to_append_p50_ns"`
	LatencyAppendToConsumeP50 time.Duration `json:"latency_append_to_consume_p50_ns"`
	LatencyAppendToConsumeP95 time.Duration `json:"latency_append_to_consume_p95_ns"`

	// LatencyByPriority breaks down latency per priority level (0=highest,
	// 9=lowest). Meaningful only when the scenario uses mixed priorities and
	// the system under test supports priority scheduling.
	LatencyByPriority [10]PriorityStats `json:"latency_by_priority"`
}

ExperimentResult holds the aggregated metrics for a single experiment run.

type Harness

// Harness abstracts the target system under test (NyaQueue in-process,
// NyaQueue gRPC, NyaQueue HTTP, or Kafka). Construct it with NewHarness.
type Harness struct {
	// contains filtered or unexported fields
}

Harness abstracts the target system (NyaQueue in-process, NyaQueue gRPC, NyaQueue HTTP, or Kafka).

func NewHarness

func NewHarness(ctx context.Context, cfg HarnessConfig) (*Harness, error)

NewHarness creates and starts the target system.

func (*Harness) Broker

func (h *Harness) Broker() *broker.Broker

Broker returns the in-process broker (nil for Kafka mode).

func (*Harness) Close

func (h *Harness) Close() error

func (*Harness) ConsumeBatch

func (h *Harness) ConsumeBatch(ctx context.Context, topic, group string, partition int) ([]*ConsumedMessage, error)

func (*Harness) CreateTopic

func (h *Harness) CreateTopic(ctx context.Context, topic string, cfg broker.TopicConfig) error

func (*Harness) DeleteTopic

func (h *Harness) DeleteTopic(ctx context.Context, topic string) error

DeleteTopic removes a topic. Used between runs to ensure each experiment starts with a clean topic state.

func (*Harness) GetMetricsSnapshot

func (h *Harness) GetMetricsSnapshot(ctx context.Context) (*MetricsSnapshot, error)

GetMetricsSnapshot retrieves partition loads and load stddev from the broker.

func (*Harness) GetPartitionLoads

func (h *Harness) GetPartitionLoads(ctx context.Context) ([]float64, error)

GetPartitionLoads retrieves partition loads from the broker regardless of mode.

func (*Harness) IsExternal

func (h *Harness) IsExternal() bool

func (*Harness) Publish

func (h *Harness) Publish(ctx context.Context, topic string, key, value []byte, priority uint8) error

Publish sends a message. For Kafka mode, priority is ignored.

func (*Harness) PublishBatch

func (h *Harness) PublishBatch(ctx context.Context, topic string, items []BatchItem) (int, error)

func (*Harness) SetManagedTopic

func (h *Harness) SetManagedTopic(topic string)

type HarnessConfig

// HarnessConfig describes how NewHarness should create a harness.
type HarnessConfig struct {
	// Mode selects how the experiment communicates with the broker.
	Mode           Mode
	// BrokerConfig is the broker configuration.
	// NOTE(review): presumably only used when the broker runs in-process — confirm.
	BrokerConfig   broker.Config
	// DataDir is the data directory.
	// NOTE(review): presumably where the in-process broker stores its data — confirm.
	DataDir        string
	// Algorithm is the balancer+scheduler combination to run.
	Algorithm      AlgorithmConfig
	NumPartitions  int // passed to NewBalancer so partition-aware balancers use the correct K
	// KafkaBrokers lists Kafka broker addresses.
	// NOTE(review): presumably only read in ModeKafka — confirm.
	KafkaBrokers   []string
	BrokerAddr     string // gRPC address
	HTTPBrokerAddr string // HTTP address; falls back to BrokerAddr when empty
}

HarnessConfig describes how to create a harness.

type MetricsCollector

// MetricsCollector gathers per-message latencies, per-priority latencies,
// partition-load samples and error counters during an experiment run.
// Construct it with NewMetricsCollector.
type MetricsCollector struct {
	// contains filtered or unexported fields
}

MetricsCollector gathers per-message latencies, per-priority latencies, partition-load samples and error counters during an experiment run. Latency histograms are sharded by consumer partition to minimize contention.

func NewMetricsCollector

func NewMetricsCollector() *MetricsCollector

func (*MetricsCollector) RecordConsume

func (c *MetricsCollector) RecordConsume(latency time.Duration)

RecordConsume records aggregate latency (no priority tracking). Use RecordConsumeWithPriority when the priority level is known.

func (*MetricsCollector) RecordConsumeError

func (c *MetricsCollector) RecordConsumeError()

func (*MetricsCollector) RecordConsumeMultiStage

func (c *MetricsCollector) RecordConsumeMultiStage(enqueueTime, produceTime, appendTime int64)

RecordConsumeMultiStage records the breakdown of latency phases. All three arguments are timestamps: enqueueTime is the client-side timestamp, produceTime is the broker receive time, and appendTime is the WAL write time.

func (*MetricsCollector) RecordConsumeWithPriority

func (c *MetricsCollector) RecordConsumeWithPriority(priority uint8, latency time.Duration)

RecordConsumeWithPriority records both the aggregate latency and per-priority latency for the given message. priority must be in [0, 9].

func (*MetricsCollector) RecordLoadSample

func (c *MetricsCollector) RecordLoadSample(loads []float64)

RecordLoadSample appends one stddev value computed across partitions at a single point in time. Snapshot averages these to get the run-level imbalance.

func (*MetricsCollector) RecordLoadStdDev

func (c *MetricsCollector) RecordLoadStdDev(sd float64)

RecordLoadStdDev records a pre-computed load stddev from the broker.

func (*MetricsCollector) RecordProduce

func (c *MetricsCollector) RecordProduce()

func (*MetricsCollector) RecordPublishError

func (c *MetricsCollector) RecordPublishError()

func (*MetricsCollector) RecordPublishThrottled

func (c *MetricsCollector) RecordPublishThrottled()

func (*MetricsCollector) Reset

func (c *MetricsCollector) Reset()

Reset clears latency histograms and per-priority samples so that reconnection downtime latency does not pollute P99 measurements. Throughput counters are intentionally preserved.

func (*MetricsCollector) Snapshot

func (c *MetricsCollector) Snapshot(scenario, algorithm, system, mode string) ExperimentResult

func (*MetricsCollector) Start

func (c *MetricsCollector) Start()

func (*MetricsCollector) Stop

func (c *MetricsCollector) Stop()

type MetricsSnapshot

// MetricsSnapshot holds partition loads and a pre-computed load stddev from
// the broker. It is returned by Harness.GetMetricsSnapshot.
type MetricsSnapshot struct {
	// PartitionLoads holds the partition loads.
	// NOTE(review): presumably one entry per partition — confirm.
	PartitionLoads []float64
	// LoadStdDev is the pre-computed load standard deviation.
	LoadStdDev     float64
	// HasStdDev accompanies LoadStdDev.
	// NOTE(review): presumably true only when the broker actually supplied
	// LoadStdDev — confirm.
	HasStdDev      bool
}

MetricsSnapshot holds partition loads and pre-computed load stddev from the broker.

type Mode

// Mode selects how the experiment communicates with the broker.
type Mode int

Mode selects how the experiment communicates with the broker.

// Available Mode values. Numbering starts at zero via iota, so the zero value
// of Mode is ModeInProcess.
const (
	// ModeInProcess is the zero value.
	// NOTE(review): presumably runs the broker inside the experiment process — confirm.
	ModeInProcess Mode = iota
	// ModeGRPC — NOTE(review): presumably talks to an external broker over gRPC; confirm.
	ModeGRPC
	// ModeHTTP — NOTE(review): presumably talks to an external broker over HTTP; confirm.
	ModeHTTP
	// ModeKafka — NOTE(review): presumably targets Kafka instead of NyaQueue; confirm.
	ModeKafka
)

func (Mode) String

func (m Mode) String() string

type PriorityStats

// PriorityStats holds latency statistics for one priority level; it is the
// element type of ExperimentResult.LatencyByPriority. Per the JSON tags,
// percentiles are emitted in nanoseconds.
type PriorityStats struct {
	// P50 is the median latency.
	P50   time.Duration `json:"p50_ns"`
	// P99 is the 99th-percentile latency.
	P99   time.Duration `json:"p99_ns"`
	// Count is a sample count.
	// NOTE(review): presumably the number of latency samples behind the percentiles — confirm.
	Count int64         `json:"count"`
}

type Runner

// Runner orchestrates experiment runs; RunAll executes every
// (scenario, algorithm, mode) combination drawn from the fields below.
type Runner struct {
	// Scenarios are the benchmark scenarios to run.
	Scenarios      []benchmarks.Scenario
	// Algorithms are the balancer+scheduler combinations to run.
	Algorithms     []AlgorithmConfig
	// Modes are the communication modes to run.
	Modes          []Mode
	// KafkaBrokers lists Kafka broker addresses.
	// NOTE(review): presumably only used for ModeKafka runs — confirm.
	KafkaBrokers   []string
	Duration       time.Duration // override scenario duration if > 0
	BrokerAddr     string        // gRPC broker address for external mode
	HTTPBrokerAddr string        // HTTP broker address for external mode (default: BrokerAddr)
	// contains filtered or unexported fields
}

Runner orchestrates experiment runs across scenarios and algorithms.

func (*Runner) RunAll

func (r *Runner) RunAll(ctx context.Context) ([]ExperimentResult, error)

RunAll executes every (scenario, algorithm, mode) combination and returns results.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL