package module
Version: v1.0.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2020 License: Apache-2.0 Imports: 18 Imported by: 0


Image wrench - Workload-optimized and Re-engineered bench

Icon attribution: heb@Wikimedia Commons (mail) [CC BY-SA 3.0]

This is a fork of https://github.com/tylertreat/bench. For changes see here.

Code to run benchmarks against several Message Queues. Currently supported are the following providers

  • Kafka (two different client libraries)
  • NATS
  • NATS Streaming
  • Redis
  • No-Op - it does just test the performance of the tool itself.

For easier usage this code ships with several executables that can be found in the cmd folder. The most important one is benchmark to execute just that.


$ benchmark --help
Usage of benchmark:
  -broker string
        Type of msg bus to connect. Currently supported values are "kafka", "redis", "nats" and "nats-streaming" (default "kafka")
  -burst uint
        Burst rate for limiter. Only relevant if requestRate > 0
  -compression string
        Type of compression to be used in producer if available. Currently only works with Kafka. (default "none")
  -configFile string
        TOML file with configuration for this run. If this option is used all other options are ignored.
  -dataMode string
        Data mode to fill the packets. Can be "zero" (padding with zero bytes up to "payloadSize"), "random", "real" (default. Requires also --dfeedFile to be defined) and "parsed". (default "real")
  -duration uint
        Duration of benchmark in seconds (default 30)
  -hosts string
        Comma-separated hostnames to test against (default "localhost")
  -kafkaVersion string
        Version of Kafka to connect to. Default value "latest" will use latest known version of underlying connector library. (default "latest")
  -numPubs uint
        Publisher connections to split msg/sec upon (default 10)
  -numSubs uint
        Subscriber connections (default 10)
  -numTopics uint
        Shard data on that many topics (default 1)
  -partitions int
        Number of partitions per topic (default 1)
  -payloadSize uint
        Size of message payload (default 90)
        Enable replication
  -requestRate int
        Messages/second to be sent. 0 will send no requests, -1 runs full throttle. (default 1000000)
  -requestRateFile string
        File in CSV format containing duration and requestRate.
  -requestRateMultiplier uint
        Multiply all request rates by a constant factor. (default 1)
  -requestTimeMultiplier uint
        Multiply all request rate durations by a constant factor. (default 1)
  -subIp string
        The ip address where subscribers are running
  -subPort string
        The port on which subscribers are listening
  -topic string
        The topic used on the message bus (default "benchmark")

To run a benchmark at a fixed rate for a fixed amount of time use

benchmark --broker <broker> --requestRate <message rate> --duration <duration in seconds>

to run at a request rate in a CSV file (format: duration in seconds,messages per second) use

benchmark --broker <broker> --requestRateFile <csv file>

It is possible to run only publishers (--numSubs 0) or only subscribers (--numPubs 0). If they are run on separate machines it is necessary to provide --subPort <port> to the subscriber process and --subIp <ip of subscriber machine> --subPort <port> to the producer process so that they can handle clock skew. The subscriber process will then open a port on the given port and waits for the producer process to publish its clock. Failure to do so may result in benchmark crash.

For convenience all configuration parameters can be provided in a TOML file via --configFile parameter. If no config file is provided it will (over-)write a config file with the current settings for easier repeatability.


The benchmark process that runs the subscribers will create a raw latency file (binary format) for each subscriber. To convert these binary files into an easier-to-handle CSV-format latencyreader can be used

latencyreader <filename(s)>

The tool can handle an arbitrary number of input files and converts them to CSV.


This tool can either take the output of latencyreader or read the files directly themselves and creates aggregates over time.

$ latencyaggregator --help
Usage of latencyaggregator:
  -aggregation-interval uint
        Aggregation interval in seconds. (default 1)
  -i uint
        Aggregation interval in seconds. (default 1)
  -s    Output a summary at the end.
        Output a summary at the end.
  -v    Output more detailed stats.
        Output more detailed stats.

Please use the following reference if you want to cite this project:

Manuel Coenen, Christoph Wagner, Alexander Echler and Sebastian Frischbier. 2019. Poster: Benchmarking Financial Data Feed Systems. In DEBS ’19: The 13th ACM International Conference on Distributed and Event-based Systems (DEBS ’19), June 24–28, 2019, Darmstadt, Germany. ACM, New York, NY, USA, 2 pages. https://doi.org/10.1145/3328905.3332506



package wrench provides a generic framework for performing latency benchmarks.



This section is empty.


View Source
var Logarithmic = Percentiles{}/* 168 elements not displayed */

Logarithmic percentile scale.


func DetermineClockSkew

func DetermineClockSkew(o *config.Options) error

DetermineClockSkew checks the options on whether this instance is a) a subscriber and then opens a listening port to receive the time from a publisher or b) a publisher and tries to connect to the subscriber to publish its time


type BasePublisher

type BasePublisher struct {

	// ID of the Publisher instance
	ID uint64

BasePublisher provides methods common to all Publisher implementations

func (*BasePublisher) GetID

func (b *BasePublisher) GetID() uint64

GetID returns the Id of the Publisher

type BaseSubscriber

type BaseSubscriber struct {
	ID uint64
	// contains filtered or unexported fields

BaseSubscriber contains all shared methods for Subscriber implementations

func (*BaseSubscriber) BaseSetup

func (s *BaseSubscriber) BaseSetup(o *config.Options)

BaseSetup will initialize all fields of BaseSubscriber struct

func (*BaseSubscriber) BaseTeardown

func (s *BaseSubscriber) BaseTeardown()

BaseTeardown should be called from Subscriber impl's teardown method to finish tasks in BaseSubscriber

func (*BaseSubscriber) CheckForCompletion

func (s *BaseSubscriber) CheckForCompletion() error

CheckForCompletion checks if all messages have been fetched

func (*BaseSubscriber) RecordLatency

func (s *BaseSubscriber) RecordLatency(msg *[]byte)

RecordLatency will calculate the latency from the timestamp inside the provided byte-slice and put it to hdrhistogram. It also returns the calculated latency for further usage.

func (*BaseSubscriber) Summarize

func (s *BaseSubscriber) Summarize() *Summary

Summarize can/should be called to create a Summary object

type Benchmark

type Benchmark struct {
	// contains filtered or unexported fields

Benchmark performs a system benchmark by attempting to issue requests at a specified rate and capturing the latency distribution. The request rate is divided across the number of configured connections.

func NewBenchmark

func NewBenchmark(factory ConnectorFactory, o *config.Options) *Benchmark

NewBenchmark creates a Benchmark which runs a system benchmark using the given ConnectorFactory. The requestRate argument specifies the number of requests per second to issue. This value is divided across the number of connections specified, so if requestRate is 50,000 and connections is 10, each connection will attempt to issue 5,000 requests per second. A zero value permanently pauses requests and -1 disables rate limiting entirely. The duration argument specifies how long to run the benchmark.

func (*Benchmark) Run

func (b *Benchmark) Run() (*Summary, error)

Run the benchmark and return a summary of the results. An error is returned if something went wrong along the way.

type ConnectorFactory

type ConnectorFactory interface {

ConnectorFactory combines PublisherFactory and SubscriberFactory

type Latency

type Latency struct {
	Offset  uint32
	Latency uint32

Latency holds data about a latency in µs at a given offset in µs

type Percentiles

type Percentiles []float64

Percentiles is a list of percentiles to include in a latency distribution, e.g. 10.0, 50.0, 99.0, 99.99, etc.

type PubSummary

type PubSummary struct {
	NumPubs                     uint64
	AvgPublishRate              uint64
	SuccessTotal                uint64
	ErrorTotal                  uint64
	TimeElapsed                 time.Duration
	TotalAvgThroughput          float64
	PublishRates                []config.PublishRateChange
	AvgThroughputPerPublishRate []float64
	BytesTotal                  uint64
	BytesAvgThroughput          float64

PubSummary holds all statistic summary data for publishers

func (*PubSummary) String

func (s *PubSummary) String() string

String returns a stringified version of the Summary.

type Publisher

type Publisher interface {
	// Setup prepares the Publisher for benchmarking.
	Setup() error

	// Publish performs a request to the system under test.
	Publish(payload *[]byte) error

	// Teardown is called upon benchmark completion.
	Teardown() error

	// GetID returns the Id of the Publisher
	GetID() uint64

Publisher issues requests for a particular system under test.

type PublisherFactory

type PublisherFactory interface {
	// GetPublisher returns a new Publisher, called for each Benchmark
	// connection.
	GetPublisher(number uint64) Publisher

PublisherFactory creates new Publishers.

type SubSummary

type SubSummary struct {
	NumSubs            uint64
	ReceivedTotal      uint64
	TimeElapsed        time.Duration
	AvgReceiveRate     float64
	SuccessHistogram   *hdrhistogram.Histogram
	BytesTotal         uint64
	BytesAvgThroughput float64

SubSummary holds all statistic summary data for subscribers

func (*SubSummary) String

func (s *SubSummary) String() string

String returns a stringified version of the Summary.

type Subscriber

type Subscriber interface {
	// setup prepares the Subscriber for benchmarking.
	Setup(opts *config.Options) error

	// Start receiving
	CheckForCompletion() error

	// Teardown is called upon benchmark completion.
	Teardown() error

	// Summarize the results
	Summarize() *Summary

Subscriber reads messages from the system under test.

type SubscriberFactory

type SubscriberFactory interface {
	// GetSubscriber returns a new Subscriber
	GetSubscriber(number uint64) Subscriber

SubscriberFactory creates new Subscriber instances

type Summary

type Summary struct {
	Pub *PubSummary
	Sub *SubSummary

Summary contains the results of a Benchmark run.

func (*Summary) GenerateLatencyDistribution

func (s *Summary) GenerateLatencyDistribution(percentiles Percentiles, file string) error

GenerateLatencyDistribution generates a text file containing the specified latency distribution in a format plottable by http://hdrhistogram.github.io/HdrHistogram/plotFiles.html. Percentiles is a list of percentiles to include, e.g. 10.0, 50.0, 99.0, 99.99, etc. If percentiles is nil, it defaults to a logarithmic percentile scale. If a request rate was specified for the benchmark, this will also generate an uncorrected distribution file which does not account for coordinated omission.

func (*Summary) String

func (s *Summary) String() string

String returns a stringified version of the Summary.

func (*Summary) WriteRequestRatesAndThroughput

func (s *Summary) WriteRequestRatesAndThroughput(file string)

WriteRequestRatesAndThroughput writes a CSV file containing the request rate schedule and the actual measured throughputs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL