buffer

package
v0.0.0-...-e2be882 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2024 License: GPL-3.0 Imports: 12 Imported by: 0

README

TODO : get this from go-ex-machina

Documentation

Index

Constants

View Source
const Delimiter = "|"

Variables

This section is empty.

Functions

func Avg

func Avg(b TimeWindowView) float64

Avg returns the avg value

func Rat

func Rat(b TimeWindowView) float64

Rat returns the avg value

Types

type BatchWindow

type BatchWindow struct {
	Window *IntervalWindow `json:"Window"`
	// contains filtered or unexported fields
}

BatchWindow is a window struct that gathers the passed `x` elements from an IntervalWindow.

func NewBatchWindow

func NewBatchWindow(id string, dim int, duration time.Duration, size int) (*BatchWindow, <-chan []StatsMessage)

NewBatchWindow creates a new batch window.

func (*BatchWindow) Close

func (bw *BatchWindow) Close() error

Close closes the channel

func (*BatchWindow) Push

func (bw *BatchWindow) Push(t time.Time, v ...float64)

Push adds an element to the given time Index. It will return true, if there was a new bucket completed at the last operation

func (*BatchWindow) WithEcho

func (bw *BatchWindow) WithEcho() *BatchWindow

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket groups together stats collectors with the same Index

func NewBucket

func NewBucket(id int64, dim int) Bucket

NewBucket creates a new bucket with a collector of the given dimensions.

func (Bucket) Flush

func (b Bucket) Flush() ([]Stats, []Stats)

Flush flushes the current stats of the bucket.

func (Bucket) Index

func (b Bucket) Index() int64

Index returns the bucket Index.

func (*Bucket) Push

func (b *Bucket) Push(index int64, v ...float64) bool

Push adds an element to the bucket for the given Index. it returns true if the bucket has the right Index, false otherwise. This allows to build higher level abstractions i.e. Window etc ...

func (Bucket) Size

func (b Bucket) Size() int

Size returns the number of elements in the bucket.

func (Bucket) String

func (b Bucket) String() string

func (Bucket) Values

func (b Bucket) Values() StatsCollector

Values returns the bucket StatsCollector for the bucket.

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer defines a simple float buffer that acts like a constant size queue

func NewBuffer

func NewBuffer(size int) *Buffer

NewBuffer creates a new buffer.

func (*Buffer) GetAsFloats

func (b *Buffer) GetAsFloats(reverse bool) []float64

GetAsFloats returns the buffer elements in the order they were added.

func (*Buffer) GetAsStrings

func (b *Buffer) GetAsStrings(reverse bool) []string

GetAsStrings returns the buffer elements in the order they were added.

func (*Buffer) Push

func (b *Buffer) Push(x interface{}) (interface{}, bool)

Push adds an element to the buffer.

func (*Buffer) Size

func (b *Buffer) Size() int

type Data

type Data struct {
	Count int     `json:"count"`
	Mean  float64 `json:"value"`
	Ratio float64 `json:"ratio"`
	First float64 `json:"first"`
	Last  float64 `json:"last"`
	StDev float64 `json:"std"`
	EMA   float64 `json:"ema"`
}

type HMM

type HMM struct {
	Config []HMMConfig                     `json:"config"`
	State  map[Sequence]map[Sequence]State `json:"state"`
	Status Status                          `json:"status"`
	// contains filtered or unexported fields
}

HMM counts occurrences in a sequence of strings. It implements effectively several hidden markov model of the n-grams lengths provided in the Config.

func HMMFromState

func HMMFromState(hmm HMM) *HMM

HMMFromState creates a new model from a previous one.

func NewMultiHMM

func NewMultiHMM(config ...HMMConfig) *HMM

NewMultiHMM creates a new State.

func (*HMM) Add

func (hmm *HMM) Add(s string) Status

Add adds a string to the State sequence. TODO : reverse the logic by accepting the result instead of the input. (This should allow us to filter out irrelevant data and save space, Note : State is expensive in terms of memory storage )

func (*HMM) Load

func (hmm *HMM) Load(filename string) error

func (*HMM) Predict

func (hmm *HMM) Predict(key Sequence) map[Sequence]Predictions

func (*HMM) Save

func (hmm *HMM) Save(filename string) error

type HMMConfig

type HMMConfig struct {
	IgnoreValues []string `json:"ignore"`
	LookBack     int      `json:"lookback"`
	LookAhead    int      `json:"lookahead"`
}

HMMConfig defines the configuration for the hidden markov model analysis.

func NewHMMConfig

func NewHMMConfig(back, ahead int, ignoreValues ...string) HMMConfig

type HistoryWindow

type HistoryWindow struct {
	Window TimeWindow `json:"Window"`
	// contains filtered or unexported fields
}

HistoryWindow keeps the last x buckets based on the Window interval given

func NewHistoryWindow

func NewHistoryWindow(duration time.Duration, size int) HistoryWindow

NewHistoryWindow creates a new history Window.

func (HistoryWindow) Buffer

func (h HistoryWindow) Buffer(index int, extract func(b TimeWindowView) float64) ([]float64, error)

Buffer evaluates the elements of the current buffer

func (HistoryWindow) Extract

func (h HistoryWindow) Extract(index int, extract func(b TimeWindowView) float64) ([]float64, []float64, error)

Extract extracts the bucket values as a series

func (HistoryWindow) Get

func (h HistoryWindow) Get(transform TimeBucketTransform) []interface{}

Get returns the transformed bucket value at the corresponding Index. TODO : change the signature to act like json.Decode etc... so that we control the appending of properties on our own

func (HistoryWindow) Polynomial

func (h HistoryWindow) Polynomial(index int, extract func(b TimeWindowView) float64, degree int, trace bool) ([]float64, []float64, []float64, error)

Polynomial evaluates the polynomial regression for the given polynomial degree and based on the value extracted from the TimeBucket scaled at the corresponding time duration.

func (HistoryWindow) Push

func (h HistoryWindow) Push(t time.Time, v ...float64) (TimeBucket, bool)

Push adds an element to the given time Index. It will return true, if there was a new bucket completed at the last operation

func (HistoryWindow) Raw

func (h HistoryWindow) Raw() WindowStatus

func (HistoryWindow) String

func (h HistoryWindow) String() string

func (HistoryWindow) Values

func (h HistoryWindow) Values(index int, extract func(b TimeWindowView) float64) ([]float64, error)

Values returns the values of the time window in a slice

type IntervalWindow

type IntervalWindow struct {
	ID       string
	First    time.Time     `json:"first"`
	Last     time.Time     `json:"last"`
	Duration time.Duration `json:"duration"`
	Dim      int           `json:"dimensions"`
	// contains filtered or unexported fields
}

IntervalWindow defines a struct that returns the stats for the given interval. It gathers events for the given interval and flushes them at the predefined duration. If no events are gathered the window will return a StatsMessage with the flag OK set to false. The WithEcho method can control the window behaviour to flush a valid StatsMessage even without an event, in this case the value of the last StatsMessage will be echoed with count of `0` WithLimit sets a limit for the window in terms of number of event WithInterval sets the interval for the window based on the time dimension The window can either operate as 'live' , with a limit or with an interval (TODO : enforce)

func NewIntervalWindow

func NewIntervalWindow(id string, dim int, duration time.Duration) (*IntervalWindow, <-chan StatsMessage)

NewIntervalWindow creates a new IntervalWindow with the given Duration.

func (*IntervalWindow) Close

func (iw *IntervalWindow) Close() error

Close closes the channel

func (*IntervalWindow) Flush

func (iw *IntervalWindow) Flush()

Flush flushes the current bucket contents

func (*IntervalWindow) Push

func (iw *IntervalWindow) Push(t time.Time, v ...float64)

Push adds an element to the interval Window.

func (*IntervalWindow) WithEcho

func (iw *IntervalWindow) WithEcho() *IntervalWindow

func (*IntervalWindow) WithInterval

func (iw *IntervalWindow) WithInterval(interval int) *IntervalWindow

WithInterval sets a duration limit on the time dimension for a window to flush

func (*IntervalWindow) WithLimit

func (iw *IntervalWindow) WithLimit(limit int) *IntervalWindow

WithLimit sets a limit on the number of events for a window to flush

type MultiBuffer

type MultiBuffer struct {
	// contains filtered or unexported fields
}

MultiBuffer defines a simple float slice buffer that acts like a constant size queue

func NewMultiBuffer

func NewMultiBuffer(size int) *MultiBuffer

NewMultiBuffer creates a new buffer.

func (*MultiBuffer) Get

func (b *MultiBuffer) Get() [][]float64

Get returns the buffer elements in the order they were added.

func (*MultiBuffer) GetReverse

func (b *MultiBuffer) GetReverse() [][]float64

GetReverse returns the buffer elements in the reverse order they were added.

func (*MultiBuffer) Last

func (b *MultiBuffer) Last() []float64

Last returns the last element in the buffer.

func (*MultiBuffer) Len

func (b *MultiBuffer) Len() int

Len returns the current length of the buffer.

func (*MultiBuffer) Push

func (b *MultiBuffer) Push(x ...float64) ([]float64, bool)

Push adds an element to the buffer.

type Prediction

type Prediction struct {
	// ID is a unique numeric id related to the prediction
	ID int64
	// Value for the prediction . Essentially the concatenated string of the predicted sequence
	Value Sequence

	// Probability for the bucket prediction
	Probability float64
	// EMP is the exponential moving probability e.g. on-the-fly calculated probability with integrated exponential decay
	EMP float64
	// contains filtered or unexported fields
}

Prediction defines a prediction result with the computed Probability

func NewPrediction

func NewPrediction(s Sequence, st State) *Prediction

NewPrediction TODO : maybe better to choose a uuid, for now the unix second should be enough

func (*Prediction) String

func (p *Prediction) String() string

type PredictionList

type PredictionList []*Prediction

PredictionList is a sortable list of predictions

func (PredictionList) Len

func (p PredictionList) Len() int

func (PredictionList) Less

func (p PredictionList) Less(i, j int) bool

func (PredictionList) Swap

func (p PredictionList) Swap(i, j int)

type Predictions

type Predictions struct {
	Key Sequence
	// Values are the prediction details for each prediction
	Values PredictionList
	// Sample is the number of previous incidents of the source sequence that generated the bucket probability matrix
	Sample int
	// Groups is the number of groups / combinations of source sequences encountered of the given length.
	// TODO :assess the statistical significance of this
	Groups int
	// Count is the Count of invocations for this model
	Count int
	// Label is a string acting as metadata for the prediction
	Label string
}

type Ring

type Ring struct {
	// contains filtered or unexported fields
}

Ring is a ring buffer keeping the last x elements TODO : use container/ring

func NewRing

func NewRing(size int) *Ring

NewRing creates a new ring with the given buffer size.

func (*Ring) Get

func (r *Ring) Get(transform Transform) []interface{}

Get returns an ordered slice of the ring elements

func (*Ring) Push

func (r *Ring) Push(v interface{}) bool

Push adds an element to the ring.

func (*Ring) Size

func (r *Ring) Size() int

Size returns the number of non-nil elements within the ring.

func (Ring) String

func (r Ring) String() string

type Sequence

type Sequence string

Sequence defines a sequence of strings.

func NewSequence

func NewSequence(s []string) Sequence

NewSequence creates a new sequence from a string

func (Sequence) Values

func (s Sequence) Values() []string

Values returns the hidden values of the sequence. We are doing this work-around to be able to use a slice of strings as a key in the Predictions map

type StackOfStats

type StackOfStats []Stats

func (StackOfStats) ToData

func (sos StackOfStats) ToData() []Data

type State

type State struct {
	Count int     `json:"count"`
	EMP   float64 `json:"emp"`
}

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

Stats is a set of statistical properties of a set of numbers.

func MockStats

func MockStats(count int, sum float64, first float64, last float64) Stats

func NewStats

func NewStats() *Stats

NewStats creates a new Stats.

func (Stats) Avg

func (s Stats) Avg() float64

Avg returns the average value of the set.

func (Stats) Count

func (s Stats) Count() int

Count returns the number of elements.

func (Stats) Diff

func (s Stats) Diff() float64

Diff returns the difference of max and min.

func (Stats) EMA

func (s Stats) EMA() float64

EMA is the exponential moving average of the set.

func (*Stats) Push

func (s *Stats) Push(v float64)

Push adds another element to the set.

func (Stats) Range

func (s Stats) Range() (float64, float64)

Range returns the range of the diff.

func (Stats) Ratio

func (s Stats) Ratio() float64

Ratio returns the percentage of the diff.

func (Stats) SampleStDev

func (s Stats) SampleStDev() float64

SampleStDev is the sample standard deviation of the set.

func (Stats) SampleVariance

func (s Stats) SampleVariance() float64

SampleVariance is the sample variance of the set.

func (Stats) Size

func (s Stats) Size() int

Size returns the number of non-zero elements.

func (Stats) StDev

func (s Stats) StDev() float64

StDev is the standard deviation of the set.

func (Stats) Sum

func (s Stats) Sum() float64

Sum returns the sum value of the set.

func (Stats) Variance

func (s Stats) Variance() float64

Variance is the mathematical variance of the set.

type StatsCollector

type StatsCollector struct {
	// contains filtered or unexported fields
}

StatsCollector is a collection of Stats variables. This enabled multi-dimensional tracking.

func NewStatsCollector

func NewStatsCollector(dim int) *StatsCollector

NewStatsCollector creates a new Stats collector.

func (*StatsCollector) Push

func (sc *StatsCollector) Push(v ...float64)

Push pushes each value to the corresponding dimension.

func (*StatsCollector) Size

func (sc *StatsCollector) Size() int

Size returns the size of the bucket.

func (StatsCollector) Stats

func (sc StatsCollector) Stats() []*Stats

func (StatsCollector) String

func (sc StatsCollector) String() string

type StatsMessage

type StatsMessage struct {
	OK       bool          `json:"ok"`
	First    time.Time     `json:"first"`
	Init     time.Time     `json:"init"`
	Last     time.Time     `json:"last"`
	ID       string        `json:"id"`
	Duration time.Duration `json:"Duration"`
	Dim      int           `json:"Dimensions"`
	Stats    []Stats       `json:"-"`
	Data     []Data        `json:"data"`
}

StatsMessage defines a stats message instance. It gathers stats and metadata for the aggregation of the provided values.

type Status

type Status struct {
	Count   int                           `json:"count"`
	Samples map[Sequence]map[Sequence]int `json:"sample"`
}

Status reflects the bucket Status of the HMM it is the inverse map of the state

type TimeBucket

type TimeBucket struct {
	Bucket
	Time time.Time
}

TimeBucket is a wrapper for a bucket of a TimeWindow, that carries also the time Index.

type TimeBucketTransform

type TimeBucketTransform func(bucket TimeBucket) interface{}

TimeBucketTransform is a operation acting on an TimeWindow slice.

func StatsWindow

func StatsWindow(dim int) TimeBucketTransform

StatsWindow is a predefined TimeBucketTransform function that will gather the stats for the given dimensions.

func WindowDensity

func WindowDensity() TimeBucketTransform

type TimeWindow

type TimeWindow struct {
	Index    int64 `json:"Index"`
	Duration int64 `json:"Duration"`
	// contains filtered or unexported fields
}

TimeWindow is a Window indexed by the bucket time.

func NewTimeWindow

func NewTimeWindow(duration time.Duration) TimeWindow

NewTimeWindow creates a new TimeWindow with the given Duration.

func (*TimeWindow) Bucket

func (tw *TimeWindow) Bucket() TimeBucket

Bucket returns the current active bucket stats from the window.

func (*TimeWindow) Next

func (tw *TimeWindow) Next(iterations int64) time.Time

Next returns the next timestamp for the coming Window.

func (*TimeWindow) Push

func (tw *TimeWindow) Push(t time.Time, v ...float64) (TimeBucket, bool)

Push adds an element to the time Window. It will return true, if the last addition caused a bucket to close.

func (TimeWindow) String

func (tw TimeWindow) String() string

type TimeWindowView

type TimeWindowView struct {
	Time    time.Time `json:"time"`
	Count   int       `json:"Count"`
	Value   float64   `json:"value"`
	EMADiff float64   `json:"ema_diff"`
	Diff    float64   `json:"diff"`
	Ratio   float64   `json:"ratio"`
	StdDev  float64   `json:"std"`
	Density int       `json:"density"`
}

TimeWindowView is a time specific but static snapshot on top of a StatsCollector. It allows to retrieve buckets of Stats from a streaming data set. the bucket indexing is based on the time.

func NewView

func NewView(bucket TimeBucket, index int) TimeWindowView

NewView creates a new time view from a time bucket.

type Transform

type Transform func(bucket interface{}) interface{}

Transform is a operation acting on an object and returning another one. Because users know what objects they work with, this abstraction makes sense for the bucket scope. Otherwise it's clear that this method is too generic to be used in another context.

type Window

type Window struct {
	// contains filtered or unexported fields
}

Window is a helper struct allowing grouping together Buckets of StatsCollectors for the given Index.

func NewWindow

func NewWindow(size int64, dim int) *Window

NewWindow creates a new Window of the given Window size e.g. the Index range for each bucket.

func (*Window) Bucket

func (w *Window) Bucket() Bucket

Bucket returns the current active bucket for the window.

func (*Window) Current

func (w *Window) Current() int64

Current returns the bucket Index the Window accumulates data on.

func (*Window) Next

func (w *Window) Next() int64

Next is the next Index at which a new bucket will be created

func (*Window) Push

func (w *Window) Push(index int64, value ...float64) (int64, Bucket, bool)

Push adds an element to a Window at the given Index. returns if the Window closed, e.g. if last element initiated a new bucket. (Note that based on this logic we ll only know when a Window closed only on the initiation of a new one) (Note that the Index must be increasing for this logic to work) NOTE : This is not a hashmap implementation !

func (Window) String

func (w Window) String() string

type WindowStatus

type WindowStatus struct {
	LastIndex  int64
	TimeValues []interface{}
	TimeSize   int
	Values     []string
	Size       int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL