Version: v0.2.1
Published: Jun 26, 2021 License: GPL-3.0 Imports: 18 Imported by: 0



Package payloadComputation provides a collection of functions and statistical tests to be performed on trace file data



View Source
const Giga = 1024 * 1024 * 1024

Giga prefix (1024³; strictly the binary prefix Gi, not the SI prefix G)

View Source
const Mega = 1024 * 1024

Mega prefix (1024²; strictly the binary prefix Mi, not the SI prefix M)


View Source
var ErrInvSnapInterval = errors.New("snapshot interval larger than trace file count")
View Source
var ErrOneSetEmpty = errors.New("cannot compute, at least one of the sets is empty")


func ComputeCorrelation

func ComputeCorrelation(ctx context.Context, workerCount int, refereceTrace []float64, traces [][]float64) ([]float64, error)

ComputeCorrelation computes the correlation between refereceTrace and each entry of traces using NormalizedCrossCorrelationBig, parallelized over workerCount workers

func GetAvailablePayloads

func GetAvailablePayloads() []string

GetAvailablePayloads returns a slice with all valid payload names that may be passed to GetWorkerPayloadCreator

func NormalizedCrossCorrelateFloat64

func NormalizedCrossCorrelateFloat64(a []float64, b []float64) (float64, error)

NormalizedCrossCorrelateFloat64 implements MATLAB's xcorr(a,b,0,'normalized')

func NormalizedCrossCorrelationBig

func NormalizedCrossCorrelationBig(a []float64, b []float64) (*big.Float, error)

NormalizedCrossCorrelationBig implements MATLAB's xcorr(a,b,0,'normalized'). Before normalization, all computations are done in big.Float to preserve precision over the full value range
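The quantity both functions compute is the zero-lag normalized cross-correlation, dot(a,b) / sqrt(dot(a,a)·dot(b,b)). A minimal sketch of the big.Float variant, assuming that interpretation (the real function additionally validates inputs and returns errors):

```go
package main

import (
	"fmt"
	"math/big"
)

// ncc mirrors the idea behind NormalizedCrossCorrelationBig: the
// zero-lag correlation xcorr(a, b, 0, 'normalized'), with all sums
// accumulated as big.Float before the final normalization.
func ncc(a, b []float64) *big.Float {
	ab, aa, bb := new(big.Float), new(big.Float), new(big.Float)
	t := new(big.Float)
	for i := range a {
		ab.Add(ab, t.Mul(big.NewFloat(a[i]), big.NewFloat(b[i])))
		aa.Add(aa, t.Mul(big.NewFloat(a[i]), big.NewFloat(a[i])))
		bb.Add(bb, t.Mul(big.NewFloat(b[i]), big.NewFloat(b[i])))
	}
	denom := new(big.Float).Sqrt(new(big.Float).Mul(aa, bb))
	return new(big.Float).Quo(ab, denom)
}

func main() {
	a := []float64{1, 2, 3, 4}
	fmt.Println(ncc(a, a)) // identical signals correlate to 1
}
```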


type ComputationRuntime

type ComputationRuntime struct {
	//number of compute workers to spawn; increase if not CPU bound
	ComputeWorkers int
	//controls the buffer (unit: trace files) available to FeederWorkers; increase to fill RAM for max performance
	BufferSizeInGB int
	//number of files after which a snapshotDeltaShard is created
	SnapshotInterval int
	//constructor for the WorkerPayload that should be computed
	WorkerPayloadCreator WorkerPayloadCreator
	//called each time the next snapshot is created. Increasing order of snapshots is guaranteed.
	SnapshotSaver SnapshotSaverFunc
	//for detailed status information, useful for debugging but not for normal operation
	DebugLog *log.Logger
	//for status information that is useful during normal operation but could be omitted
	InfoLog *log.Logger
	//for critical warnings and errors that must not be omitted
	ErrLog *log.Logger

	//prometheus metrics
	MetricsRegistry  *prometheus.Registry
	MetrMaxTestValue prometheus.Gauge

	MetrXCorrAgainstFixedPrefix             prometheus.Gauge
	MetrXCorrAgainstRandomPrefix            prometheus.Gauge
	MetrInputFileCount                      prometheus.Gauge
	MetrReadFilesCount                      prometheus.Counter
	MetrProcessedFilesCount                 prometheus.Counter
	MetrInputBufferFreeSlots                prometheus.Gauge
	MetrQQBufferFreeSlots                   prometheus.Gauge
	MetrSnapshotterWaitQueueSize            prometheus.Gauge
	MetrSnapshotterDeltaShardQueueFreeSlots prometheus.Gauge
	// contains filtered or unexported fields
}

ComputationRuntime configures resource usage and the payload computation to be performed

func NewComputationRuntime

func NewComputationRuntime(computeWorkers, bufferSizeInGB, snapshotInterval int, wpc WorkerPayloadCreator, ss SnapshotSaverFunc,
	debugLog, infoLog, errLog *log.Logger) (*ComputationRuntime, error)

func (*ComputationRuntime) Run

func (config *ComputationRuntime) Run(ctx context.Context, traceSource traceSource.TraceBlockReader, traceParser wfm.TraceParser) (WorkerPayload, error)

Run performs the parallel computation of the payload denoted by config.WorkerPayloadCreator on the data defined by traceSource and traceParser. According to config.SnapshotInterval, config.SnapshotSaver is called with periodic snapshots/intermediate results.

type Plotable

type Plotable interface {
	//Plot values according to the implementation and store to writer
	Plot(values []float64, writer io.Writer) error
}

type SnapshotSaverFunc

type SnapshotSaverFunc func(result []float64, rawSnapshot WorkerPayload, snapshotIDX int) error

type WelchTTest

type WelchTTest struct {
	// contains filtered or unexported fields
}

func (*WelchTTest) Decode

func (bmv *WelchTTest) Decode(r io.Reader) error

Decode decodes a WelchTTest that has been encoded with Encode

func (*WelchTTest) DeepCopy

func (bmv *WelchTTest) DeepCopy() WorkerPayload

func (*WelchTTest) Encode

func (bmv *WelchTTest) Encode(w io.Writer) error

Encode applies gob to each field of bmv

func (*WelchTTest) Finalize

func (bmv *WelchTTest) Finalize() ([]float64, error)

func (*WelchTTest) MaxSubroutines

func (bmv *WelchTTest) MaxSubroutines() int

func (*WelchTTest) Merge

func (bmv *WelchTTest) Merge(other WorkerPayload) error

func (*WelchTTest) Name

func (bmv *WelchTTest) Name() string

func (*WelchTTest) Plot

func (bmv *WelchTTest) Plot(values []float64, writer io.Writer) error

Plot creates a line plot for values with a trace length adaptive threshold line and stores the result in writer. The threshold values are from

func (*WelchTTest) Reset

func (bmv *WelchTTest) Reset()

func (*WelchTTest) Update

func (bmv *WelchTTest) Update(fixed, random [][]float64)

func (*WelchTTest) WriteToCSV

func (bmv *WelchTTest) WriteToCSV(w io.Writer) error

type WorkerPayload

type WorkerPayload interface {
	//Name returns a descriptive name for the performed computation
	Name() string
	//MaxSubroutines returns an (approximation) for the maximal amount of subroutines used by this payload.
	//It is intended as a hint of how many parallel instances should be spawned
	MaxSubroutines() int
	//Update processes and adds fixed and random to the internal state
	Update(fixed, random [][]float64)
	//Finalize returns the result of the payload computation based on the current state
	Finalize() ([]float64, error)
	//Merge updates the state of this WorkerPayload with the one of other (equal to calling Update on all data
	//that has been added to other)
	Merge(other WorkerPayload) error
	//Reset the internal state to its initial values, equal to the state of an object created by the
	//constructor called with the same arguments
	Reset()
	//DeepCopy returns a copy of this worker payload and all of its internal state
	DeepCopy() WorkerPayload
	Encode(w io.Writer) error
	Decode(r io.Reader) error
}

The WorkerPayload interface is an abstraction for computations to be performed on trace data. Conceptually, the computation is split into two functions: Update, which adds new data and may change the state, and Finalize, which produces the result of the computation and must be IDEMPOTENT, i.e. must not change the state of the object. The Merge function allows running multiple instances in parallel while still producing the total result
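The Update/Merge/Finalize contract can be illustrated with a toy payload that computes a per-datapoint mean. This is not one of the package's payloads, just a minimal demonstration that merging two instances which split the data yields the same result as one instance that saw everything:

```go
package main

import "fmt"

// meanPayload is a toy illustration of the WorkerPayload contract:
// Update accumulates state, Finalize is idempotent, and Merge folds
// another instance's state in as if Update had seen its data.
type meanPayload struct {
	sum   []float64
	count int
}

func newMeanPayload(datapointsPerTrace int) *meanPayload {
	return &meanPayload{sum: make([]float64, datapointsPerTrace)}
}

func (m *meanPayload) Update(traces [][]float64) {
	for _, t := range traces {
		for i, v := range t {
			m.sum[i] += v
		}
		m.count++
	}
}

func (m *meanPayload) Merge(other *meanPayload) {
	for i, v := range other.sum {
		m.sum[i] += v
	}
	m.count += other.count
}

func (m *meanPayload) Finalize() []float64 {
	out := make([]float64, len(m.sum))
	for i, v := range m.sum {
		out[i] = v / float64(m.count) // reads state only: idempotent
	}
	return out
}

func main() {
	data := [][]float64{{1, 2}, {3, 4}, {5, 6}}
	// One instance sees everything ...
	all := newMeanPayload(2)
	all.Update(data)
	// ... while two instances split the work and are merged.
	a, b := newMeanPayload(2), newMeanPayload(2)
	a.Update(data[:1])
	b.Update(data[1:])
	a.Merge(b)
	fmt.Println(all.Finalize(), a.Finalize())
}
```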

func NewWelchTTest

func NewWelchTTest(datapointsPerTrace int) WorkerPayload

NewWelchTTest creates a new WelchTTest instance. All calls to Update must contain exactly datapointsPerTrace entries per trace; otherwise the call panics

type WorkerPayloadCreator

type WorkerPayloadCreator func(datapointsPerTrace int) WorkerPayload

WorkerPayloadCreator is the common constructor type for WorkerPayload

func GetWorkerPayloadCreator

func GetWorkerPayloadCreator(name string) (WorkerPayloadCreator, error)

GetWorkerPayloadCreator returns the WorkerPayloadCreator that is registered for name or an error if name is not found
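The GetAvailablePayloads/GetWorkerPayloadCreator pair suggests a name-to-constructor registry. Whether the package uses exactly this mechanism is an assumption; the sketch below shows the common Go pattern with a placeholder payload type and hypothetical helper names:

```go
package main

import (
	"fmt"
	"sort"
)

// Local stand-ins for the package types, so the sketch is self-contained.
type WorkerPayload interface{ Name() string }
type WorkerPayloadCreator func(datapointsPerTrace int) WorkerPayload

type ttest struct{ points int }

func (t *ttest) Name() string { return "ttest" }

// registry maps payload names to their constructors.
var registry = map[string]WorkerPayloadCreator{
	"ttest": func(n int) WorkerPayload { return &ttest{points: n} },
}

// getCreator mirrors GetWorkerPayloadCreator: lookup by name or error.
func getCreator(name string) (WorkerPayloadCreator, error) {
	c, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("no payload registered for %q", name)
	}
	return c, nil
}

// available mirrors GetAvailablePayloads: all registered names.
func available() []string {
	names := make([]string, 0, len(registry))
	for n := range registry {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println(available())
	c, err := getCreator("ttest")
	if err != nil {
		panic(err)
	}
	fmt.Println(c(100).Name())
}
```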

type WorkerPayloadPool

type WorkerPayloadPool struct {
	// contains filtered or unexported fields
}

WorkerPayloadPool wraps sync.Pool for WorkerPayload

func NewWorkerPayloadPool

func NewWorkerPayloadPool(creator WorkerPayloadCreator, datapointsPerTrace int) *WorkerPayloadPool

NewWorkerPayloadPool creates a new pool of worker payloads with the same constructor arguments

func (*WorkerPayloadPool) Get

func (wpp *WorkerPayloadPool) Get() WorkerPayload

Get returns a WorkerPayload with wiped state (either newly allocated or Reset() called)

func (*WorkerPayloadPool) Put

func (wpp *WorkerPayloadPool) Put(wp WorkerPayload)

Put wipes the state in wp and makes it available for others.
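The Get/Put semantics described above follow the standard sync.Pool wrapping pattern. A self-contained sketch with a simple resettable accumulator in place of a WorkerPayload (the real pool stores whatever the WorkerPayloadCreator builds):

```go
package main

import (
	"fmt"
	"sync"
)

// accumulator stands in for a WorkerPayload with per-datapoint state.
type accumulator struct{ sum []float64 }

func (a *accumulator) reset() {
	for i := range a.sum {
		a.sum[i] = 0
	}
}

// accumulatorPool mirrors WorkerPayloadPool: Get hands out a value with
// wiped state (freshly allocated or recycled), Put wipes before reuse.
type accumulatorPool struct{ p sync.Pool }

func newAccumulatorPool(datapointsPerTrace int) *accumulatorPool {
	return &accumulatorPool{p: sync.Pool{
		New: func() any { return &accumulator{sum: make([]float64, datapointsPerTrace)} },
	}}
}

func (ap *accumulatorPool) Get() *accumulator { return ap.p.Get().(*accumulator) }

func (ap *accumulatorPool) Put(a *accumulator) {
	a.reset() // wipe before sharing, mirroring WorkerPayloadPool.Put
	ap.p.Put(a)
}

func main() {
	pool := newAccumulatorPool(3)
	a := pool.Get()
	a.sum[0] = 42
	pool.Put(a)
	b := pool.Get() // possibly the recycled a, but guaranteed wiped
	fmt.Println(b.sum[0])
}
```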
