loadtest

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: Dec 19, 2023 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

This file maintains data structures that correspond to those provided by CometBFT.

Constants

const (
	SelectSuppliedEndpoints   = "supplied"   // Select only the supplied endpoint(s) for load testing (the default).
	SelectDiscoveredEndpoints = "discovered" // Select newly discovered endpoints only (excluding supplied endpoints).
	SelectAnyEndpoints        = "any"        // Select from any of supplied and/or discovered endpoints.
)
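The three selection methods can be read as set operations over the supplied and discovered endpoint lists. The following is a minimal sketch under that reading; `selectEndpoints` and the lowercase constants are hypothetical local names, not part of the package, and the real selection logic may differ.

```go
package main

import "fmt"

// Local copies of the documented selection-method values.
const (
	selectSupplied   = "supplied"
	selectDiscovered = "discovered"
	selectAny        = "any"
)

// selectEndpoints is a hypothetical helper (not part of the package API) that
// picks target endpoints according to the selection method.
func selectEndpoints(method string, supplied, discovered []string) ([]string, error) {
	isSupplied := make(map[string]bool, len(supplied))
	for _, e := range supplied {
		isSupplied[e] = true
	}
	// newOnly holds discovered endpoints that were not also supplied.
	var newOnly []string
	for _, e := range discovered {
		if !isSupplied[e] {
			newOnly = append(newOnly, e)
		}
	}
	switch method {
	case selectSupplied:
		return supplied, nil
	case selectDiscovered:
		return newOnly, nil
	case selectAny:
		return append(append([]string{}, supplied...), newOnly...), nil
	default:
		return nil, fmt.Errorf("unknown endpoint selection method: %q", method)
	}
}

func main() {
	supplied := []string{"ws://a:26657/websocket"}
	discovered := []string{"ws://a:26657/websocket", "ws://b:26657/websocket"}
	for _, m := range []string{selectSupplied, selectDiscovered, selectAny} {
		eps, err := selectEndpoints(m, supplied, discovered)
		fmt.Println(m, eps, err)
	}
}
```

The endpoint URLs in `main` are placeholders chosen for illustration.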
const CLIVersion = "v0.1.0"

CLIVersion must be manually updated as new versions are released.

const (
	KVStoreClientIDLen int = 5 // Allows for 6,471,002 random client IDs (62C5)
)

The CometBFT common.RandStr method generates human-readable (alphanumeric) strings from a set of 62 characters. With the KVStore client we aim to generate unique client IDs as well as unique keys for all transactions; the values are less important.
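The figure quoted in the `KVStoreClientIDLen` comment matches the binomial coefficient C(62, 5), which counts unordered selections of 5 distinct characters. The number of ordered 5-character strings with repetition allowed is 62^5, so the quoted figure can be read as a conservative lower bound. A short arithmetic check, with `binomial` and `power` as local helper names:

```go
package main

import "fmt"

// binomial returns C(n, k) using exact integer arithmetic.
func binomial(n, k uint64) uint64 {
	result := uint64(1)
	for i := uint64(1); i <= k; i++ {
		// After this step result equals C(n-k+i, i), so the division is exact.
		result = result * (n - k + i) / i
	}
	return result
}

// power returns base raised to exp.
func power(base, exp uint64) uint64 {
	result := uint64(1)
	for i := uint64(0); i < exp; i++ {
		result *= base
	}
	return result
}

func main() {
	fmt.Println(binomial(62, 5)) // unordered picks of 5 distinct characters
	fmt.Println(power(62, 5))    // ordered 5-character strings, repeats allowed
}
```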

Variables

This section is empty.

Functions

func ExecuteStandalone

func ExecuteStandalone(cfg Config) error

ExecuteStandalone will run a standalone (non-coordinator/worker) load test.

func RegisterClientFactory

func RegisterClientFactory(name string, factory ClientFactory) error

RegisterClientFactory allows us to programmatically register different client factories to easily switch between different ones at runtime.
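Registering factories by name amounts to keeping a name-to-factory table that can be consulted at runtime. The sketch below illustrates the idea only; `registry`, `register`, `lookup`, and `txFactory` are hypothetical names, and rejecting duplicate names is an assumed policy rather than documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// txFactory is a placeholder for the package's ClientFactory interface.
type txFactory interface{}

// registry is a hypothetical name-to-factory table; the package's internal
// bookkeeping is not documented here and may differ.
type registry struct {
	mtx       sync.RWMutex
	factories map[string]txFactory
}

func newRegistry() *registry {
	return &registry{factories: make(map[string]txFactory)}
}

// register stores f under name. Rejecting duplicates is an assumed policy.
func (r *registry) register(name string, f txFactory) error {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	if _, exists := r.factories[name]; exists {
		return fmt.Errorf("client factory %q is already registered", name)
	}
	r.factories[name] = f
	return nil
}

// lookup returns the factory registered under name, if any.
func (r *registry) lookup(name string) (txFactory, bool) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	f, ok := r.factories[name]
	return f, ok
}

func main() {
	reg := newRegistry()
	fmt.Println(reg.register("my-abci-app", struct{}{})) // first registration succeeds
	fmt.Println(reg.register("my-abci-app", struct{}{})) // duplicate is rejected
	_, ok := reg.lookup("kvstore")
	fmt.Println(ok)
}
```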

func Run

func Run(cli *CLIConfig)

Run must be executed from your `main` function in your Go code. This can be used to fast-track the construction of your own load testing tool for your CometBFT ABCI application.

Types

type AggregateStats

type AggregateStats struct {
	TotalTxs         int     // The total number of transactions sent.
	TotalTimeSeconds float64 // The total time taken to send `TotalTxs` transactions.
	TotalBytes       int64   // The cumulative number of bytes sent as transactions.

	// Computed statistics
	AvgTxRate   float64 // The rate at which transactions were submitted (tx/sec).
	AvgDataRate float64 // The rate at which data was transmitted in transactions (bytes/sec).
}

func (*AggregateStats) Compute

func (s *AggregateStats) Compute()
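Going by the field comments, the computed statistics are the totals divided by the elapsed time. A minimal sketch of that arithmetic, using a local copy of the struct (`aggregateStats`) and assuming the rates stay at zero when no time has elapsed:

```go
package main

import "fmt"

// aggregateStats is a local copy of the documented AggregateStats fields.
type aggregateStats struct {
	TotalTxs         int
	TotalTimeSeconds float64
	TotalBytes       int64
	AvgTxRate        float64
	AvgDataRate      float64
}

// compute derives the average rates, leaving them at zero when no time has
// elapsed so that the division is always well defined (an assumption).
func (s *aggregateStats) compute() {
	s.AvgTxRate, s.AvgDataRate = 0, 0
	if s.TotalTimeSeconds > 0 {
		s.AvgTxRate = float64(s.TotalTxs) / s.TotalTimeSeconds
		s.AvgDataRate = float64(s.TotalBytes) / s.TotalTimeSeconds
	}
}

func main() {
	s := aggregateStats{TotalTxs: 1000, TotalTimeSeconds: 10, TotalBytes: 250000}
	s.compute()
	fmt.Printf("%.1f tx/sec, %.1f bytes/sec\n", s.AvgTxRate, s.AvgDataRate)
}
```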

func (*AggregateStats) String

func (s *AggregateStats) String() string

type CLIConfig

type CLIConfig struct {
	AppName              string
	AppShortDesc         string
	AppLongDesc          string
	DefaultClientFactory string
}

CLIConfig allows developers to customize their own load testing tool.

type ChannelStatus

type ChannelStatus struct {
	ID                byte
	SendQueueCapacity JSONStrInt
	SendQueueSize     JSONStrInt
	Priority          JSONStrInt
	RecentlySent      JSONStrInt64
}

type Client

type Client interface {
	// GenerateTx must generate a raw transaction to be sent to the relevant
	// broadcast_tx method for a given endpoint.
	GenerateTx() ([]byte, error)
}

Client generates transactions to be sent to a specific endpoint.

type ClientFactory

type ClientFactory interface {
	// ValidateConfig must check whether the given configuration is valid for
	// our specific client factory.
	ValidateConfig(cfg Config) error

	// NewClient must instantiate a new load testing client, or produce an error
	// if that process fails.
	NewClient(cfg Config) (Client, error)
}

ClientFactory produces load testing clients.
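To show how the two interfaces fit together, here is a self-contained sketch of a custom client and factory. The interfaces are reproduced locally in lowercase, `config` is a cut-down stand-in for `Config`, and `counterClient`, `counterFactory`, and the 20-byte minimum are hypothetical choices made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a cut-down stand-in for the package's Config struct.
type config struct{ Size int }

// client and clientFactory mirror the documented interfaces.
type client interface{ GenerateTx() ([]byte, error) }

type clientFactory interface {
	ValidateConfig(cfg config) error
	NewClient(cfg config) (client, error)
}

// minTxSize guarantees that any uint64 counter (at most 20 digits) fits.
const minTxSize = 20

// counterClient emits a zero-padded counter as each transaction.
type counterClient struct {
	size int
	next uint64
}

func (c *counterClient) GenerateTx() ([]byte, error) {
	tx := fmt.Sprintf("%0*d", c.size, c.next)
	c.next++
	return []byte(tx), nil
}

type counterFactory struct{}

func (counterFactory) ValidateConfig(cfg config) error {
	if cfg.Size < minTxSize {
		return errors.New("transaction size must be at least 20 bytes")
	}
	return nil
}

func (f counterFactory) NewClient(cfg config) (client, error) {
	if err := f.ValidateConfig(cfg); err != nil {
		return nil, err
	}
	return &counterClient{size: cfg.Size}, nil
}

// Compile-time check that counterFactory satisfies the interface.
var _ clientFactory = counterFactory{}

func main() {
	c, err := counterFactory{}.NewClient(config{Size: 32})
	if err != nil {
		panic(err)
	}
	tx, _ := c.GenerateTx()
	fmt.Printf("%s (%d bytes)\n", tx, len(tx))
}
```

Validating in `ValidateConfig` and again in `NewClient` keeps the client from being constructed with a size it cannot honour.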

type Config

type Config struct {
	ClientFactory        string   `json:"client_factory"`         // Which client factory should we use for load testing?
	Connections          int      `json:"connections"`            // The number of WebSockets connections to make to each target endpoint.
	Time                 int      `json:"time"`                   // The total time, in seconds, for which to run the load test.
	SendPeriod           int      `json:"send_period"`            // The period (in seconds) at which to send batches of transactions.
	Rate                 int      `json:"rate"`                   // The number of transactions to generate, per send period.
	Size                 int      `json:"size"`                   // The desired size of each generated transaction, in bytes.
	Count                int      `json:"count"`                  // The maximum number of transactions to send. Set to -1 for unlimited.
	BroadcastTxMethod    string   `json:"broadcast_tx_method"`    // The broadcast_tx method to use (can be "sync", "async" or "commit").
	Endpoints            []string `json:"endpoints"`              // A list of the CometBFT node endpoints to which to connect for this load test.
	EndpointSelectMethod string   `json:"endpoint_select_method"` // The method by which to select endpoints for load testing.
	ExpectPeers          int      `json:"expect_peers"`           // The minimum number of peers to expect before starting a load test. Set to 0 by default (no minimum).
	MaxEndpoints         int      `json:"max_endpoints"`          // The maximum number of endpoints to use for load testing. Set to 0 by default (no maximum).
	MinConnectivity      int      `json:"min_connectivity"`       // The minimum number of peers to which each peer must be connected before starting the load test. Set to 0 by default (no minimum).
	PeerConnectTimeout   int      `json:"peer_connect_timeout"`   // The maximum time to wait (in seconds) for all peers to connect, if ExpectPeers > 0.
	StatsOutputFile      string   `json:"stats_output_file"`      // Where to store the final aggregate statistics file (in CSV format).
	NoTrapInterrupts     bool     `json:"no_trap_interrupts"`     // Should we avoid trapping Ctrl+Break? Only relevant for standalone execution mode.
}

Config represents the configuration for a single client (i.e. standalone or worker).
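The JSON tags on `Config` determine how a configuration document maps onto the struct. The sketch below decodes a sample document into a local struct that mirrors a subset of the documented fields; the values in `sampleJSON` and the `parseConfig` helper are illustrative assumptions, not defaults taken from the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// config mirrors a subset of the documented Config fields and JSON tags.
type config struct {
	ClientFactory     string   `json:"client_factory"`
	Connections       int      `json:"connections"`
	Time              int      `json:"time"`
	SendPeriod        int      `json:"send_period"`
	Rate              int      `json:"rate"`
	Size              int      `json:"size"`
	Count             int      `json:"count"`
	BroadcastTxMethod string   `json:"broadcast_tx_method"`
	Endpoints         []string `json:"endpoints"`
}

// sampleJSON holds illustrative values only.
const sampleJSON = `{
  "client_factory": "kvstore",
  "connections": 1,
  "time": 60,
  "send_period": 1,
  "rate": 1000,
  "size": 250,
  "count": -1,
  "broadcast_tx_method": "async",
  "endpoints": ["ws://localhost:26657/websocket"]
}`

// parseConfig decodes a JSON document into the local config struct.
func parseConfig(data []byte) (config, error) {
	var cfg config
	err := json.Unmarshal(data, &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseConfig([]byte(sampleJSON))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```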

func (Config) MaxTxsPerEndpoint

func (c Config) MaxTxsPerEndpoint() uint64

MaxTxsPerEndpoint estimates the maximum number of transactions that this configuration would generate for a single endpoint.
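A back-of-the-envelope bound can be derived from the field descriptions alone: one batch of `Rate` transactions per `SendPeriod` over `Time` seconds, capped by `Count` when it is not -1. The sketch below follows that reading; `maxTxs` is a hypothetical helper, it ignores `Connections`, and it is not claimed to be the formula the package uses.

```go
package main

import "fmt"

// maxTxs is a hypothetical upper bound for one connection: one batch of
// `rate` transactions per send period, capped by `count` when count >= 0.
func maxTxs(timeSec, sendPeriodSec, rate, count int) uint64 {
	if timeSec <= 0 || sendPeriodSec <= 0 || rate <= 0 {
		return 0
	}
	// Round up so that a partial final period still counts as a batch.
	batches := (timeSec + sendPeriodSec - 1) / sendPeriodSec
	total := uint64(batches) * uint64(rate)
	if count >= 0 && uint64(count) < total {
		return uint64(count)
	}
	return total
}

func main() {
	fmt.Println(maxTxs(60, 1, 1000, -1))   // unlimited count
	fmt.Println(maxTxs(60, 1, 1000, 5000)) // capped by count
}
```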

func (Config) ToJSON

func (c Config) ToJSON() string

func (Config) Validate

func (c Config) Validate() error

type ConnectionStatus

type ConnectionStatus struct {
	Duration    JSONDuration
	SendMonitor FlowStatus
	RecvMonitor FlowStatus
	Channels    []ChannelStatus
}

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator is a WebSockets server that allows workers to connect to it to obtain configuration information. It does nothing but coordinate load testing amongst the workers.

func NewCoordinator

func NewCoordinator(cfg *Config, coordCfg *CoordinatorConfig) *Coordinator

func (*Coordinator) ReceiveWorkerUpdate

func (c *Coordinator) ReceiveWorkerUpdate(msg workerMsg)

func (*Coordinator) RegisterRemoteWorker

func (c *Coordinator) RegisterRemoteWorker(rw *remoteWorker) error

func (*Coordinator) Run

func (c *Coordinator) Run() error

Run will execute the coordinator's operations in a blocking manner, returning any error that causes one of the workers or the coordinator to fail.

func (*Coordinator) UnregisterRemoteWorker

func (c *Coordinator) UnregisterRemoteWorker(id string, err error)

type CoordinatorConfig

type CoordinatorConfig struct {
	BindAddr             string `json:"bind_addr"`       // The "host:port" to which to bind the coordinator node to listen for incoming workers.
	ExpectWorkers        int    `json:"expect_workers"`  // The number of workers to expect before starting the load test.
	WorkerConnectTimeout int    `json:"connect_timeout"` // The number of seconds to wait for all workers to connect.
	ShutdownWait         int    `json:"shutdown_wait"`   // The number of seconds to wait at shutdown (while keeping the HTTP server running - primarily to allow Prometheus to keep polling).
	LoadTestID           int    `json:"load_test_id"`    // An integer greater than 0 that will be exposed via a Prometheus gauge while the load test is underway.
}

CoordinatorConfig holds the configuration options specific to a coordinator node.

func (CoordinatorConfig) ToJSON

func (c CoordinatorConfig) ToJSON() string

func (CoordinatorConfig) Validate

func (c CoordinatorConfig) Validate() error

type DefaultNodeInfo

type DefaultNodeInfo struct {
	ProtocolVersion ProtocolVersion `json:"protocol_version"`

	DefaultNodeID string `json:"id"`          // authenticated identifier
	ListenAddr    string `json:"listen_addr"` // accepting incoming

	Network  string   `json:"network"`  // network/chain ID
	Version  string   `json:"version"`  // major.minor.revision
	Channels HexBytes `json:"channels"` // channels this node knows about

	Moniker string               `json:"moniker"` // arbitrary moniker
	Other   DefaultNodeInfoOther `json:"other"`   // other application specific data
}

DefaultNodeInfo is the basic node information exchanged between two peers during the CometBFT P2P handshake.

type DefaultNodeInfoOther

type DefaultNodeInfoOther struct {
	TxIndex    string `json:"tx_index"`
	RPCAddress string `json:"rpc_address"`
}

DefaultNodeInfoOther is the miscellaneous application-specific data.

type FlowStatus

type FlowStatus struct {
	Start    time.Time    // Transfer start time
	Bytes    JSONStrInt64 // Total number of bytes transferred
	Samples  JSONStrInt64 // Total number of samples taken
	InstRate JSONStrInt64 // Instantaneous transfer rate
	CurRate  JSONStrInt64 // Current transfer rate (EMA of InstRate)
	AvgRate  JSONStrInt64 // Average transfer rate (Bytes / Duration)
	PeakRate JSONStrInt64 // Maximum instantaneous transfer rate
	BytesRem JSONStrInt64 // Number of bytes remaining in the transfer
	Duration JSONDuration // Time period covered by the statistics
	Idle     JSONDuration // Time since the last transfer of at least 1 byte
	TimeRem  JSONDuration // Estimated time to completion
	Progress Percent      // Overall transfer progress
	Active   bool         // Flag indicating an active transfer
}

FlowStatus represents the current Monitor status. All transfer rates are in bytes per second rounded to the nearest byte.

type HexBytes

type HexBytes []byte

HexBytes enables HEX-encoding for encoding/json.

func (HexBytes) MarshalJSON

func (bz HexBytes) MarshalJSON() ([]byte, error)

func (*HexBytes) UnmarshalJSON

func (bz *HexBytes) UnmarshalJSON(data []byte) error
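A byte-slice type can opt into hex encoding by implementing the `json.Marshaler` and `json.Unmarshaler` interfaces. The sketch below uses a local type `hexBytes`; emitting uppercase hex is an assumption, and the `encode` and `decode` helpers are added purely for illustration.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
	"strings"
)

// hexBytes is a local stand-in for the documented HexBytes type.
type hexBytes []byte

// MarshalJSON writes the bytes as a JSON string of uppercase hex digits.
func (bz hexBytes) MarshalJSON() ([]byte, error) {
	return json.Marshal(strings.ToUpper(hex.EncodeToString(bz)))
}

// UnmarshalJSON reads a JSON string of hex digits (either case).
func (bz *hexBytes) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	decoded, err := hex.DecodeString(s)
	if err != nil {
		return err
	}
	*bz = decoded
	return nil
}

// encode and decode are small helpers wrapping the JSON round trip.
func encode(bz hexBytes) (string, error) {
	out, err := json.Marshal(bz)
	return string(out), err
}

func decode(s string) (hexBytes, error) {
	var bz hexBytes
	err := json.Unmarshal([]byte(s), &bz)
	return bz, err
}

func main() {
	s, err := encode(hexBytes{0xde, 0xad, 0xbe, 0xef})
	fmt.Println(s, err)
	bz, err := decode(`"deadbeef"`)
	fmt.Println(bz, err)
}
```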

type JSONDuration

type JSONDuration time.Duration

JSONDuration is a time.Duration whose JSON representation is a string containing an integer value.

func (*JSONDuration) UnmarshalJSON

func (i *JSONDuration) UnmarshalJSON(data []byte) error

type JSONStrInt

type JSONStrInt int

JSONStrInt is an integer whose JSON representation is a string.

func (*JSONStrInt) UnmarshalJSON

func (i *JSONStrInt) UnmarshalJSON(data []byte) error
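Types such as JSONStrInt exist because some RPC fields carry integers as JSON strings (for example `"n_peers": "3"`). A minimal sketch of such an unmarshaler, using the local names `strInt`, `netInfo`, and `parseNetInfo`, which are assumptions made for this example:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// strInt is a local stand-in for an integer encoded as a JSON string.
type strInt int

// UnmarshalJSON first decodes the JSON string, then parses its contents.
func (i *strInt) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	v, err := strconv.Atoi(s)
	if err != nil {
		return err
	}
	*i = strInt(v)
	return nil
}

// netInfo mirrors two of the documented NetInfo fields.
type netInfo struct {
	Listening bool   `json:"listening"`
	NPeers    strInt `json:"n_peers"`
}

// parseNetInfo decodes a net_info-style result object.
func parseNetInfo(data []byte) (netInfo, error) {
	var ni netInfo
	err := json.Unmarshal(data, &ni)
	return ni, err
}

func main() {
	ni, err := parseNetInfo([]byte(`{"listening":true,"n_peers":"3"}`))
	fmt.Println(ni.Listening, int(ni.NPeers), err)
}
```

With this approach a bare JSON number in the same position is rejected, since the first decoding step expects a string.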

type JSONStrInt64

type JSONStrInt64 int64

JSONStrInt64 is an int64 whose JSON representation is a string.

func (*JSONStrInt64) UnmarshalJSON

func (i *JSONStrInt64) UnmarshalJSON(data []byte) error

type JSONStrUint64

type JSONStrUint64 uint64

JSONStrUint64 is a uint64 whose JSON representation is a string.

func (*JSONStrUint64) UnmarshalJSON

func (i *JSONStrUint64) UnmarshalJSON(data []byte) error

type KVStoreClient

type KVStoreClient struct {
	// contains filtered or unexported fields
}

KVStoreClient generates arbitrary transactions (random key=value pairs) to be sent to the kvstore ABCI application. The keys are structured as follows:

`[client_id][tx_id]=[tx_id]`

where each value (`client_id` and `tx_id`) is padded with 0s to meet the transaction size requirement.
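The layout described above can be sketched as follows. This follows the textual description only: `makeTx` is a hypothetical helper, the even split of the remaining bytes between key and value is an assumption, and the package's actual generator may build transactions differently.

```go
package main

import "fmt"

// makeTx builds "[client_id][tx_id]=[tx_id]" with both tx_id fields
// zero-padded so that the whole transaction is exactly size bytes long.
func makeTx(clientID string, txID uint64, size int) ([]byte, error) {
	rest := size - len(clientID) - 1 // bytes left after the client ID and "="
	if rest < 2 {
		return nil, fmt.Errorf("size %d is too small for client ID %q", size, clientID)
	}
	keyWidth := rest / 2
	valWidth := rest - keyWidth
	tx := fmt.Sprintf("%s%0*d=%0*d", clientID, keyWidth, txID, valWidth, txID)
	// Padding never truncates, so an oversized tx ID shows up as extra length.
	if len(tx) != size {
		return nil, fmt.Errorf("tx ID %d does not fit in %d bytes", txID, size)
	}
	return []byte(tx), nil
}

func main() {
	tx, err := makeTx("aB3xZ", 42, 40)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s (%d bytes)\n", tx, len(tx))
}
```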

func (*KVStoreClient) GenerateTx

func (c *KVStoreClient) GenerateTx() ([]byte, error)

type KVStoreClientFactory

type KVStoreClientFactory struct{}

KVStoreClientFactory creates load testing clients to interact with the built-in CometBFT kvstore ABCI application.

func NewKVStoreClientFactory

func NewKVStoreClientFactory() *KVStoreClientFactory

func (*KVStoreClientFactory) NewClient

func (f *KVStoreClientFactory) NewClient(cfg Config) (Client, error)

func (*KVStoreClientFactory) ValidateConfig

func (f *KVStoreClientFactory) ValidateConfig(cfg Config) error

type NetInfo

type NetInfo struct {
	Listening bool       `json:"listening"`
	Listeners []string   `json:"listeners"`
	NPeers    JSONStrInt `json:"n_peers"`
	Peers     []Peer     `json:"peers"`
}

NetInfo corresponds to the JSON-RPC response format produced by the CometBFT v0.34.x net_info RPC API.

type Peer

type Peer struct {
	NodeInfo         DefaultNodeInfo  `json:"node_info"`
	IsOutbound       bool             `json:"is_outbound"`
	ConnectionStatus ConnectionStatus `json:"connection_status"`
	RemoteIP         string           `json:"remote_ip"`
}

Peer represents a network peer.

type Percent

type Percent uint32

Percent represents a percentage in increments of 1/1000th of a percent.
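With increments of 1/1000th of a percent, a stored value of 100000 corresponds to 100%. A small sketch of the conversion in both directions, using the local names `percent`, `percentOf`, and `float`, which are assumptions for this example:

```go
package main

import "fmt"

// percent is a local stand-in: one unit equals 1/1000th of a percent.
type percent uint32

// float converts the fixed-point value to a floating-point percentage.
func (p percent) float() float64 { return float64(p) / 1000 }

// percentOf returns done/total as a percent, rounded to the nearest unit.
func percentOf(done, total int64) percent {
	if total <= 0 || done < 0 {
		return 0
	}
	return percent(float64(done)/float64(total)*1e5 + 0.5)
}

func main() {
	p := percentOf(1, 4)
	fmt.Printf("%d units = %.3f%%\n", uint32(p), p.float())
}
```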

type ProtocolVersion

type ProtocolVersion struct {
	P2P   JSONStrUint64 `json:"p2p"`
	Block JSONStrUint64 `json:"block"`
	App   JSONStrUint64 `json:"app"`
}

ProtocolVersion contains the protocol versions for the software.

type RPCError

type RPCError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Data    string `json:"data,omitempty"`
}

type RPCRequest

type RPCRequest struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      int             `json:"id,omitempty"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{}
}

RPCRequest corresponds to the JSON-RPC request data format accepted by CometBFT v0.34.x.

type RPCResponse

type RPCResponse struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      int             `json:"id,omitempty"`
	Result  json.RawMessage `json:"result,omitempty"`
	Error   *RPCError       `json:"error,omitempty"`
}
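Keeping `Result` as a `json.RawMessage` lets a caller inspect the envelope (and any error) before committing to a concrete result type. A minimal sketch using local copies of the structs; `decodeResult` is a hypothetical helper, not a function of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rpcError and rpcResponse are local copies of the documented structs.
type rpcError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Data    string `json:"data,omitempty"`
}

type rpcResponse struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      int             `json:"id,omitempty"`
	Result  json.RawMessage `json:"result,omitempty"`
	Error   *rpcError       `json:"error,omitempty"`
}

// decodeResult unpacks the envelope, surfaces any RPC error, and only then
// decodes the raw result into out.
func decodeResult(data []byte, out interface{}) error {
	var resp rpcResponse
	if err := json.Unmarshal(data, &resp); err != nil {
		return err
	}
	if resp.Error != nil {
		return fmt.Errorf("rpc error %d: %s", resp.Error.Code, resp.Error.Message)
	}
	return json.Unmarshal(resp.Result, out)
}

func main() {
	var status struct {
		Listening bool `json:"listening"`
	}
	err := decodeResult([]byte(`{"jsonrpc":"2.0","id":1,"result":{"listening":true}}`), &status)
	fmt.Println(status.Listening, err)
}
```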

type Transactor

type Transactor struct {
	// contains filtered or unexported fields
}

Transactor represents a single wire-level connection to a CometBFT RPC endpoint, and is responsible for sending transactions to that endpoint.

func NewTransactor

func NewTransactor(remoteAddr string, config *Config) (*Transactor, error)

NewTransactor initiates a WebSockets connection to the given remote address, which must be a valid WebSockets URL, e.g. "ws://host:port/websocket".

func (*Transactor) Cancel

func (t *Transactor) Cancel()

Cancel will indicate to the transactor that it must stop, but does not wait until it has completely stopped. To wait, call the Transactor.Wait() method.

func (*Transactor) GetTxBytes

func (t *Transactor) GetTxBytes() int64

GetTxBytes returns the cumulative total number of bytes (as transactions) sent thus far by this transactor.

func (*Transactor) GetTxCount

func (t *Transactor) GetTxCount() int

GetTxCount returns the total number of transactions sent thus far by this transactor.

func (*Transactor) GetTxRate

func (t *Transactor) GetTxRate() float64

GetTxRate returns the average number of transactions per second sent by this transactor over the duration of its operation.

func (*Transactor) SetProgressCallback

func (t *Transactor) SetProgressCallback(id int, interval time.Duration, callback func(int, int, int64))

func (*Transactor) Start

func (t *Transactor) Start()

Start kicks off the transactor's operations in separate goroutines (one for reading from the WebSockets endpoint, and one for writing to it).

func (*Transactor) Wait

func (t *Transactor) Wait() error

Wait will block until the transactor terminates.
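The Start, Cancel, and Wait methods describe a common lifecycle: start work in the background, signal it to stop without blocking, then block until it has finished. The sketch below shows that pattern with channels; `worker` is a hypothetical stand-in and makes no network connections, so it says nothing about how Transactor is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in illustrating the Start/Cancel/Wait pattern.
type worker struct {
	stop     chan struct{}
	stopOnce sync.Once
	done     chan struct{}
	err      error
	sent     int
}

func newWorker() *worker {
	return &worker{stop: make(chan struct{}), done: make(chan struct{})}
}

// Start launches the send loop in its own goroutine and returns immediately.
func (w *worker) Start() {
	go func() {
		defer close(w.done)
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-w.stop:
				return
			case <-ticker.C:
				w.sent++ // stands in for sending one batch
			}
		}
	}()
}

// Cancel signals the loop to stop but does not wait for it; it is safe to
// call more than once.
func (w *worker) Cancel() { w.stopOnce.Do(func() { close(w.stop) }) }

// Wait blocks until the loop has exited and returns its error, if any.
func (w *worker) Wait() error {
	<-w.done
	return w.err
}

func main() {
	w := newWorker()
	w.Start()
	time.Sleep(5 * time.Millisecond)
	w.Cancel()
	fmt.Println("wait returned:", w.Wait())
}
```

Reading `sent` is only safe after `Wait` returns, because closing `done` is what orders the goroutine's writes before the caller's reads.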

type TransactorGroup

type TransactorGroup struct {
	// contains filtered or unexported fields
}

TransactorGroup allows us to encapsulate the management of a group of transactors.

func NewTransactorGroup

func NewTransactorGroup() *TransactorGroup

func (*TransactorGroup) Add

func (g *TransactorGroup) Add(remoteAddr string, config *Config) error

Add will instantiate a new Transactor with the given parameters. If instantiation fails it'll automatically shut down and close all other transactors, returning the error.

func (*TransactorGroup) AddAll

func (g *TransactorGroup) AddAll(cfg *Config) error

func (*TransactorGroup) Cancel

func (g *TransactorGroup) Cancel()

Cancel signals to all transactors to stop their operations.

func (*TransactorGroup) SetLogger

func (g *TransactorGroup) SetLogger(logger logging.Logger)

func (*TransactorGroup) SetProgressCallback

func (g *TransactorGroup) SetProgressCallback(interval time.Duration, callback func(*TransactorGroup, int, int64))

func (*TransactorGroup) Start

func (g *TransactorGroup) Start()

Start will iterate through all transactors and start them.

func (*TransactorGroup) Wait

func (g *TransactorGroup) Wait() error

Wait will wait for all transactors to complete, returning the first error we encounter.
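Waiting on a group while reporting only the first error can be sketched as a loop that waits on every member and remembers the first non-nil result. In the example below, `waitAll`, `succeed`, and `failWith` are hypothetical helpers, and "first" is taken to mean first in iteration order, which is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// waitAll calls every wait function, so that all members finish, and returns
// the first non-nil error in iteration order.
func waitAll(waiters []func() error) error {
	var firstErr error
	for _, wait := range waiters {
		if err := wait(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// succeed and failWith build stand-ins for a member's Wait method.
func succeed() func() error { return func() error { return nil } }

func failWith(msg string) func() error {
	return func() error { return errors.New(msg) }
}

func main() {
	err := waitAll([]func() error{succeed(), failWith("first"), failWith("second")})
	fmt.Println(err)
}
```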

func (*TransactorGroup) WriteAggregateStats

func (g *TransactorGroup) WriteAggregateStats(filename string) error

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a WebSockets client that interacts with the Coordinator node to (1) fetch its configuration, (2) execute a load test, and (3) report back to the coordinator node regularly on its progress.

func NewWorker

func NewWorker(cfg *WorkerConfig) (*Worker, error)

func (*Worker) Config

func (w *Worker) Config() Config

func (*Worker) ID

func (w *Worker) ID() string

func (*Worker) Run

func (w *Worker) Run() error

Run executes the primary event loop for this worker.

type WorkerConfig

type WorkerConfig struct {
	ID                  string `json:"id"`              // A unique ID for this worker instance. Will show up in the metrics reported by the coordinator for this worker.
	CoordAddr           string `json:"coord_addr"`      // The address at which to find the coordinator node.
	CoordConnectTimeout int    `json:"connect_timeout"` // The maximum amount of time, in seconds, to allow for the coordinator to become available.
}

WorkerConfig holds the configuration options specific to a worker node.

func (WorkerConfig) ToJSON

func (c WorkerConfig) ToJSON() string

func (WorkerConfig) Validate

func (c WorkerConfig) Validate() error
