Documentation
Overview
Package varopt contains an implementation of VarOpt, an unbiased weighted sampling algorithm described in the paper "Stream sampling for variance-optimal estimation of subset sums" https://arxiv.org/pdf/0803.0473.pdf (2008), by Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup.
VarOpt is a reservoir-type sampler that maintains a fixed-size sample and provides a mechanism for merging unequal-weight samples.
This package also includes a simple reservoir sampling algorithm, often useful in conjunction with weighted reservoir sampling, which implements Algorithm R from "Random sampling with a reservoir", https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R (1985), by Jeffrey Vitter.
See https://github.com/lightstep/varopt/blob/master/README.md for more detail.
Index
- Variables
- type Varopt
- func New[T any](capacity int, rnd *rand.Rand) *Varopt[T]
- func (s *Varopt[T]) Add(item T, weight float64) (T, error)
- func (s *Varopt[T]) Capacity() int
- func (s *Varopt[T]) CopyFrom(from *Varopt[T])
- func (s *Varopt[T]) Get(i int) (T, float64)
- func (s *Varopt[T]) GetOriginalWeight(i int) float64
- func (v *Varopt[T]) Init(capacity int, rnd *rand.Rand)
- func (s *Varopt[T]) Reset()
- func (s *Varopt[T]) Size() int
- func (s *Varopt[T]) Tau() float64
- func (s *Varopt[T]) TotalCount() int
- func (s *Varopt[T]) TotalWeight() float64
Examples
- New
- Varopt.GetOriginalWeight
Constants
This section is empty.
Variables
var ErrInvalidWeight = fmt.Errorf("Negative, Zero, Inf or NaN weight")
Functions
This section is empty.
Types
type Varopt
type Varopt[T any] struct {
	// Large-weight items stored in a min-heap.
	L internal.SampleHeap[T]

	// Light-weight items.
	T []internal.Vsample[T]

	// Temporary buffer.
	X []internal.Vsample[T]
	// contains filtered or unexported fields
}
Varopt implements the algorithm from "Stream sampling for variance-optimal estimation of subset sums" by Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup (2008):
https://arxiv.org/pdf/0803.0473.pdf
func New
func New[T any](capacity int, rnd *rand.Rand) *Varopt[T]
New returns a new Varopt sampler with the given capacity (i.e., reservoir size) and random number generator.
Example
package main

import (
	"fmt"
	"math"
	"math/rand"

	"github.com/lightstep/varopt"
)

type packet struct {
	size     int
	color    string
	protocol string
}

func main() {
	const totalPackets = 1e6
	const sampleRatio = 0.01

	colors := []string{"red", "green", "blue"}
	protocols := []string{"http", "tcp", "udp"}

	sizeByColor := map[string]int{}
	sizeByProtocol := map[string]int{}
	trueTotalWeight := 0.0

	rnd := rand.New(rand.NewSource(32491))
	sampler := varopt.New[packet](totalPackets*sampleRatio, rnd)

	for i := 0; i < totalPackets; i++ {
		packet := packet{
			size:     1 + rnd.Intn(100000),
			color:    colors[rnd.Intn(len(colors))],
			protocol: protocols[rnd.Intn(len(protocols))],
		}
		sizeByColor[packet.color] += packet.size
		sizeByProtocol[packet.protocol] += packet.size
		trueTotalWeight += float64(packet.size)

		sampler.Add(packet, float64(packet.size))
	}

	estSizeByColor := map[string]float64{}
	estSizeByProtocol := map[string]float64{}
	estTotalWeight := 0.0

	for i := 0; i < sampler.Size(); i++ {
		packet, weight := sampler.Get(i)
		estSizeByColor[packet.color] += weight
		estSizeByProtocol[packet.protocol] += weight
		estTotalWeight += weight
	}

	// Compute mean absolute percentage error for colors
	colorMape := 0.0
	for _, c := range colors {
		colorMape += math.Abs(float64(sizeByColor[c])-estSizeByColor[c]) / float64(sizeByColor[c])
	}
	colorMape /= float64(len(colors))

	// Compute mean absolute percentage error for protocols
	protocolMape := 0.0
	for _, p := range protocols {
		protocolMape += math.Abs(float64(sizeByProtocol[p])-estSizeByProtocol[p]) / float64(sizeByProtocol[p])
	}
	protocolMape /= float64(len(protocols))

	// Compute total sum error percentage
	fmt.Printf("Total sum error %.2g%%\n", 100*math.Abs(estTotalWeight-trueTotalWeight)/trueTotalWeight)
	fmt.Printf("Color mean absolute percentage error %.2f%%\n", 100*colorMape)
	fmt.Printf("Protocol mean absolute percentage error %.2f%%\n", 100*protocolMape)
}
Output:
Total sum error 2.4e-11%
Color mean absolute percentage error 0.73%
Protocol mean absolute percentage error 1.62%
func (*Varopt[T]) Add
func (s *Varopt[T]) Add(item T, weight float64) (T, error)
Add considers a new observation for the sample with the given weight. If an item is ejected from the sample as a result, that item is returned to allow re-use of memory.
An error is returned if the weight is negative, zero, infinite, or NaN (see ErrInvalidWeight).
func (*Varopt[T]) Capacity
func (s *Varopt[T]) Capacity() int
Capacity returns the size of the reservoir. This is the maximum size of the sample.
func (*Varopt[T]) CopyFrom added in v1.4.0
func (s *Varopt[T]) CopyFrom(from *Varopt[T])
CopyFrom copies the fields of `from` into this Varopt[T].
func (*Varopt[T]) Get
func (s *Varopt[T]) Get(i int) (T, float64)
Get returns the i'th sample item and its adjusted weight. To obtain the item's original weight (i.e., the weight that was passed to Add), use GetOriginalWeight(i).
func (*Varopt[T]) GetOriginalWeight
func (s *Varopt[T]) GetOriginalWeight(i int) float64
GetOriginalWeight returns the original weight of the i'th sample item, as passed to Add. This is useful for computing a frequency from the adjusted sample weight.
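The frequency computation follows from the sampler being unbiased. In the notation of this sketch (ours, not the package's), let w_i be the weight passed to Add for item i and \hat{w}_i its adjusted weight from Get, taking \hat{w}_i = 0 when the item is not in the sample. For any subset S of the input items:

```latex
\mathbb{E}[\hat{w}_i] = w_i
\quad\Longrightarrow\quad
\mathbb{E}\!\left[\frac{\hat{w}_i}{w_i}\right] = 1,
\qquad
\widehat{N}(S) = \sum_{i \in S \cap \mathrm{sample}} \frac{\hat{w}_i}{w_i},
\qquad
\mathbb{E}\big[\widehat{N}(S)\big] = |S|.
```

With hypothetical numbers, an item added with weight w_i = 4 that survives with adjusted weight \hat{w}_i = 20 counts as 20 / 4 = 5 input items. When the input weight is an inverse probability, w_i = 1/p, the ratio reduces to \hat{w}_i p.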
Example
This example shows how to use Varopt sampling with inverse-probability weights to estimate frequencies. Weighting each point by the inverse of its second's share of all points makes the expected number of sample points per second uniform.
While the expected number of sample points per second is uniform, the output sample weights are still expected to reproduce the original frequencies.
package main

import (
	"fmt"
	"math"
	"math/rand"

	"github.com/lightstep/varopt"
)

type curve struct {
	color  string
	mean   float64
	stddev float64
}

type testPoint struct {
	color  int
	xvalue float64
}

var colors = []curve{
	{color: "red", mean: 10, stddev: 15},
	{color: "green", mean: 30, stddev: 10},
	{color: "blue", mean: 50, stddev: 20},
}

// This example shows how to use Varopt sampling to estimate
// frequencies with the use of inverse probability weights. The use
// of inverse probability creates a uniform expected value, in this
// case of the number of sample points per second.
//
// While the expected number of sample points per second is uniform,
// the output sample weights are expected to match the original
// frequencies.
func main() {
	// Number of points.
	const totalCount = 1e6

	// Relative size of the sample.
	const sampleRatio = 0.01

	// Ensure this test is deterministic.
	rnd := rand.New(rand.NewSource(104729))

	// Construct a timeseries consisting of three colored signals,
	// for x=0 to x=60 seconds.
	var points []testPoint

	// origCounts stores the original signals at second granularity.
	origCounts := make([][]int, len(colors))
	for i := range colors {
		origCounts[i] = make([]int, 60)
	}

	// Construct the signals by choosing a random color, then
	// using its Gaussian to compute a timestamp.
	for len(points) < totalCount {
		choose := rnd.Intn(len(colors))
		series := colors[choose]
		xvalue := rnd.NormFloat64()*series.stddev + series.mean

		// Keep x in [0, 60) so that floor(x) is a valid index.
		if xvalue < 0 || xvalue >= 60 {
			continue
		}
		origCounts[choose][int(math.Floor(xvalue))]++
		points = append(points, testPoint{
			color:  choose,
			xvalue: xvalue,
		})
	}

	// Compute the total number of points per second. This will be
	// used to establish the per-second probability.
	xcount := make([]int, 60)
	for _, point := range points {
		xcount[int(math.Floor(point.xvalue))]++
	}

	// Compute the sample using the inverse probability as a weight.
	// This makes the expected number of sample points per second
	// uniform.
	sampleSize := int(sampleRatio * float64(totalCount))
	sampler := varopt.New[testPoint](sampleSize, rnd)
	for _, point := range points {
		second := int(math.Floor(point.xvalue))
		prob := float64(xcount[second]) / float64(totalCount)
		sampler.Add(point, 1/prob)
	}

	// sampleCounts stores the reconstructed signals.
	sampleCounts := make([][]float64, len(colors))
	for i := range colors {
		sampleCounts[i] = make([]float64, 60)
	}

	// pointCounts stores the number of points per second.
	pointCounts := make([]int, 60)

	// Reconstruct the signals using the output sample weights.
	// The effective count of each sample point is its output
	// weight divided by its original weight.
	for i := 0; i < sampler.Size(); i++ {
		point, weight := sampler.Get(i)
		origWeight := sampler.GetOriginalWeight(i)
		second := int(math.Floor(point.xvalue))
		sampleCounts[point.color][second] += (weight / origWeight)
		pointCounts[second]++
	}

	// Compute standard deviation of sample points per second.
	sum := 0.0
	mean := float64(sampleSize) / 60
	for s := 0; s < 60; s++ {
		e := float64(pointCounts[s]) - mean
		sum += e * e
	}
	stddev := math.Sqrt(sum / (60 - 1))

	fmt.Printf("Samples per second mean %.2f\n", mean)
	fmt.Printf("Samples per second standard deviation %.2f\n", stddev)

	// Compute mean absolute percentage error between sampleCounts
	// and origCounts for each signal.
	for c := range colors {
		mae := 0.0
		for s := 0; s < 60; s++ {
			mae += math.Abs(sampleCounts[c][s]-float64(origCounts[c][s])) / float64(origCounts[c][s])
		}
		mae /= 60
		fmt.Printf("Mean absolute percentage error (%s) = %.2f%%\n", colors[c].color, mae*100)
	}
}
Output:
Samples per second mean 166.67
Samples per second standard deviation 13.75
Mean absolute percentage error (red) = 25.16%
Mean absolute percentage error (green) = 14.30%
Mean absolute percentage error (blue) = 14.23%
func (*Varopt[T]) Init added in v1.4.0
func (v *Varopt[T]) Init(capacity int, rnd *rand.Rand)
Init initializes a Varopt[T] in-place, avoiding an allocation compared with New().
func (*Varopt[T]) Reset added in v1.2.0
func (s *Varopt[T]) Reset()
Reset returns the sampler to its initial state, maintaining its capacity and random number source.
func (*Varopt[T]) Size
func (s *Varopt[T]) Size() int
Size returns the current number of items in the sample. If the reservoir is full, this returns Capacity().
func (*Varopt[T]) Tau
func (s *Varopt[T]) Tau() float64
Tau returns the current large-weight threshold. Weights larger than Tau() carry their exact weight in the sample. See the VarOpt paper for details.
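In the paper's formulation, once more than k items have been added (k being the capacity), the threshold is the value \tau that satisfies the following equation over the input weights w_1 through w_n; an item with w_i \ge \tau is kept with probability 1, and a lighter item is kept with probability w_i / \tau and, if kept, carries adjusted weight \tau:

```latex
\sum_{i=1}^{n} \min\!\left(1, \frac{w_i}{\tau}\right) = k,
\qquad
p_i = \min\!\left(1, \frac{w_i}{\tau}\right).
```

As a hypothetical worked case, take weights 10, 1, 1, 1, 1 and k = 3. Then \tau = 2, since \min(1, 10/2) + 4 \cdot (1/2) = 1 + 2 = 3. The weight-10 item carries its exact weight, and two of the four weight-1 items survive with adjusted weight 2 each, so the adjusted weights sum to 10 + 2 + 2 = 14, the total input weight.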
func (*Varopt[T]) TotalCount
func (s *Varopt[T]) TotalCount() int
TotalCount returns the number of calls to Add().
func (*Varopt[T]) TotalWeight
func (s *Varopt[T]) TotalWeight() float64
TotalWeight returns the sum of weights that were passed to Add().