Documentation ¶
Index ¶
- Variables
- type CirBuf
- func (cb *CirBuf[T]) FilterItems(filter func(item T, index int) bool) []T
- func (cb *CirBuf[T]) ForEach(fn func(item T) bool)
- func (cb *CirBuf[T]) GetAll() ([]T, int)
- func (cb *CirBuf[T]) GetAt(index int) (T, bool)
- func (cb *CirBuf[T]) GetFirst() (T, int, bool)
- func (cb *CirBuf[T]) GetLast() (T, int, bool)
- func (cb *CirBuf[T]) GetRange(start int, end int) ([]T, int, bool)
- func (cb *CirBuf[T]) GetTotalCountAndHeadOffset() (int, int)
- func (cb *CirBuf[T]) IsEmpty() bool
- func (cb *CirBuf[T]) IsFull() bool
- func (cb *CirBuf[T]) Read() (T, bool)
- func (cb *CirBuf[T]) Size() int
- func (cb *CirBuf[T]) Write(element T) *T
- func (cb *CirBuf[T]) WriteAt(element T, index int) error
- type SetOnceConfig
- type SyncMap
- func (sm *SyncMap[K, T]) Delete(key K)
- func (sm *SyncMap[K, T]) ForEach(fn func(K, T))
- func (sm *SyncMap[K, T]) Get(key K) T
- func (sm *SyncMap[K, T]) GetEx(key K) (T, bool)
- func (sm *SyncMap[K, T]) GetOrCreate(key K, createFn func() T) (T, bool)
- func (sm *SyncMap[K, T]) Keys() []K
- func (sm *SyncMap[K, T]) Len() int
- func (sm *SyncMap[K, T]) Set(key K, value T)
- type TimeSampleAligner
- func (tsa *TimeSampleAligner) AddSample(ts int64) (int, error)
- func (tsa *TimeSampleAligner) GetFirstTimestamp() int64
- func (tsa *TimeSampleAligner) GetLogicalTimeFromRealTimestamp(ts int64) int
- func (tsa *TimeSampleAligner) GetMaxLogicalTime() int
- func (tsa *TimeSampleAligner) GetRealTimestampFromLogical(logical int) int64
- func (tsa *TimeSampleAligner) GetTimestamps() (int, []int64)
- type VersionedMap
- func (vm *VersionedMap[K, V]) ForEach(fn func(key K, value V, version int64))
- func (vm *VersionedMap[K, V]) Get(key K) (V, int64, bool)
- func (vm *VersionedMap[K, V]) GetSinceVersion(version int64) (map[K]V, int64)
- func (vm *VersionedMap[K, V]) Keys() []K
- func (vm *VersionedMap[K, V]) Set(key K, value V)
- func (vm *VersionedMap[K, V]) SetAndIncVersion(key K, value V)
- func (vm *VersionedMap[K, V]) SetVersion(version int64)
Constants ¶
This section is empty.
Variables ¶
var ErrSampleSkipped = errors.New("sample skipped due to timing skew")
var ErrSampleTooClose = errors.New("sample too close to previous timestamp")
Functions ¶
This section is empty.
Types ¶
type CirBuf ¶
type CirBuf[T any] struct {
	Lock       *sync.Mutex
	MaxSize    int
	TotalCount int
	HeadOffset int
	Buf        []T
	Head       int
	Tail       int
}
CirBuf is a generic circular buffer implementation that is thread-safe. It dynamically grows until it reaches MaxSize and can reclaim memory when emptied.
func MakeCirBuf ¶
MakeCirBuf creates a new circular buffer with the specified maximum size. The buffer is initially empty and will grow dynamically as elements are added.
func (*CirBuf[T]) FilterItems ¶
FilterItems returns a slice of items for which the provided filter function returns true. The filter function takes the item and its absolute index (TotalCount-based) and returns a boolean. This is useful for filtering items based on custom criteria, such as timestamp.
func (*CirBuf[T]) ForEach ¶ added in v0.9.0
ForEach iterates through all elements in the buffer in order from oldest to newest, calling the provided function on each element. If the function returns false, iteration stops early. The buffer is locked during the entire iteration.
func (*CirBuf[T]) GetAll ¶
GetAll returns a slice containing all elements in the buffer in order from oldest to newest. This does not remove elements from the buffer. It also returns the HeadOffset, which is the offset of the first element in the buffer.
func (*CirBuf[T]) GetAt ¶ added in v0.9.0
GetAt returns the element at the specified absolute index and a boolean indicating whether the element exists. The index is absolute (based on TotalCount), not relative to the current buffer position. Returns the zero value of T and false if the index is out of bounds.
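The absolute-index convention used by GetAt and FilterItems can be illustrated with a small model. This is a sketch only, not the package's implementation: the `window` type and its `getAt` and `filterItems` methods are hypothetical, and the sketch assumes that the absolute index of a retained element equals the head offset plus its position among the retained elements.

```go
package main

import "fmt"

// window is a hypothetical model of the buffer's retained contents.
// items holds the elements oldest-first; headOffset is the absolute
// index of items[0] (assumption: absolute index = headOffset + position).
type window[T any] struct {
	items      []T
	headOffset int
}

// getAt resolves an absolute index to a retained element.
// It returns the zero value and false when the index is out of bounds.
func (w window[T]) getAt(abs int) (T, bool) {
	rel := abs - w.headOffset
	if rel < 0 || rel >= len(w.items) {
		var zero T
		return zero, false
	}
	return w.items[rel], true
}

// filterItems passes each element and its absolute index to keep and
// collects the elements for which keep returns true.
func (w window[T]) filterItems(keep func(item T, index int) bool) []T {
	var out []T
	for i, it := range w.items {
		if keep(it, w.headOffset+i) {
			out = append(out, it)
		}
	}
	return out
}

func main() {
	// Two older elements (absolute indices 0 and 1) are no longer retained.
	w := window[string]{items: []string{"c", "d", "e"}, headOffset: 2}

	v, ok := w.getAt(3)
	fmt.Println(v, ok) // d true

	_, ok = w.getAt(1)
	fmt.Println(ok) // false

	recent := w.filterItems(func(_ string, index int) bool { return index >= 3 })
	fmt.Println(recent) // [d e]
}
```

Under this model, callers can remember an absolute index across writes and later learn from the boolean result whether the element is still retained.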
func (*CirBuf[T]) GetFirst ¶ added in v0.9.0
GetFirst returns the first element in the buffer, its offset, and a boolean indicating whether the buffer has any elements. If the buffer is empty, the zero value of T, 0, and false are returned.
func (*CirBuf[T]) GetLast ¶
GetLast returns the last element in the buffer, its offset, and a boolean indicating whether the buffer has any elements. If the buffer is empty, the zero value of T, 0, and false are returned.
func (*CirBuf[T]) GetTotalCountAndHeadOffset ¶
func (*CirBuf[T]) Read ¶
Read removes and returns the oldest element from the circular buffer. If the buffer is empty, the zero value of T and false are returned.
func (*CirBuf[T]) Write ¶
func (cb *CirBuf[T]) Write(element T) *T
Write adds an element to the circular buffer. If the buffer is full, the oldest element will be overwritten. Returns a pointer to the element that was kicked out, or nil if no element was kicked out.
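The overwrite-on-full behaviour of Write, together with Read, can be sketched as follows. The `ring` type below is a hypothetical, simplified stand-in for illustration: it has a fixed capacity, takes no lock, does not grow dynamically, and assumes a capacity greater than zero.

```go
package main

import "fmt"

// ring is a hypothetical, simplified circular buffer (not the package's
// CirBuf): fixed capacity, no locking, no dynamic growth.
type ring[T any] struct {
	buf  []T
	head int // position of the oldest element
	size int // number of stored elements
}

// newRing allocates a ring with room for maxSize elements (maxSize > 0).
func newRing[T any](maxSize int) *ring[T] {
	return &ring[T]{buf: make([]T, maxSize)}
}

// write appends v. When the ring is full, the oldest element is
// overwritten and a pointer to a copy of it is returned; otherwise nil.
func (r *ring[T]) write(v T) *T {
	if r.size == len(r.buf) {
		evicted := r.buf[r.head]
		r.buf[r.head] = v
		r.head = (r.head + 1) % len(r.buf)
		return &evicted
	}
	r.buf[(r.head+r.size)%len(r.buf)] = v
	r.size++
	return nil
}

// read removes and returns the oldest element, or the zero value and
// false when the ring is empty.
func (r *ring[T]) read() (T, bool) {
	var zero T
	if r.size == 0 {
		return zero, false
	}
	v := r.buf[r.head]
	r.buf[r.head] = zero // drop the reference so it can be collected
	r.head = (r.head + 1) % len(r.buf)
	r.size--
	return v, true
}

func main() {
	r := newRing[int](3)
	for i := 1; i <= 3; i++ {
		r.write(i) // fills the ring, nothing is evicted
	}
	if ev := r.write(4); ev != nil {
		fmt.Println("evicted:", *ev) // evicted: 1
	}
	v, ok := r.read()
	fmt.Println(v, ok) // 2 true
}
```

Returning a pointer lets the caller distinguish "nothing evicted" (nil) from an evicted zero value.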
func (*CirBuf[T]) WriteAt ¶ added in v0.9.0
WriteAt writes an element at a specific index in the buffer. Returns an error if the index is before the current buffer range. If the index is beyond the current range, fills with zero values up to that index. If the index is within the current range, overwrites the element at that position.
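The three cases described for WriteAt can be sketched on a slice-backed model. Everything here is an illustrative assumption: the `window` type, the `writeAt` method, and the `errBeforeRange` value are hypothetical, and the sketch ignores MaxSize, eviction, and locking.

```go
package main

import (
	"errors"
	"fmt"
)

// errBeforeRange is a hypothetical error value; the error actually
// returned by the package is not shown in its documentation.
var errBeforeRange = errors.New("index is before the current buffer range")

// window models the retained elements, oldest-first; headOffset is the
// absolute index of items[0].
type window[T any] struct {
	items      []T
	headOffset int
}

// writeAt stores v at the absolute index abs.
//   - abs before the current range: error
//   - abs beyond the current range: pad with zero values, then store
//   - abs inside the current range: overwrite in place
func (w *window[T]) writeAt(v T, abs int) error {
	rel := abs - w.headOffset
	if rel < 0 {
		return errBeforeRange
	}
	for len(w.items) <= rel {
		var zero T
		w.items = append(w.items, zero)
	}
	w.items[rel] = v
	return nil
}

func main() {
	w := &window[int]{items: []int{10, 11}, headOffset: 5}

	err := w.writeAt(99, 4) // before the range
	fmt.Println(err != nil) // true

	_ = w.writeAt(12, 6) // inside the range: overwrites 11
	_ = w.writeAt(14, 8) // beyond the range: index 7 is zero-filled
	fmt.Println(w.items) // [10 12 0 14]
}
```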
type SetOnceConfig ¶ added in v0.8.1
type SetOnceConfig[T any] struct {
	// contains filtered or unexported fields
}
SetOnceConfig provides a thread-safe way to set a configuration value exactly once, falling back to a default configuration if nil is provided.
func NewSetOnceConfig ¶ added in v0.8.1
func NewSetOnceConfig[T any](defaultCfg T) *SetOnceConfig[T]
NewSetOnceConfig creates a new SetOnceConfig with the provided default configuration. The default configuration is stored immediately and is available via Get().
func (*SetOnceConfig[T]) Get ¶ added in v0.8.1
func (soc *SetOnceConfig[T]) Get() T
Get returns the current configuration value. It is always safe to call because the default is set during NewSetOnceConfig.
func (*SetOnceConfig[T]) SetOnce ¶ added in v0.8.1
func (soc *SetOnceConfig[T]) SetOnce(cfg *T) bool
SetOnce attempts to set the configuration value exactly once, overriding the default. If cfg is nil, the default configuration is kept. Returns true if the configuration was set, false if it was already set.
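A set-once value with a default can be sketched with a mutex and a flag. This is not the package's implementation: the `onceConfig` type, its methods, and the `config` struct are hypothetical. The sketch also assumes that a call with a nil cfg still counts as the single allowed set and returns true, which the documentation does not spell out.

```go
package main

import (
	"fmt"
	"sync"
)

// config is a hypothetical configuration type used for the example.
type config struct{ Retries int }

// onceConfig is a hypothetical stand-in for SetOnceConfig.
type onceConfig[T any] struct {
	mu    sync.Mutex
	value T
	isSet bool
}

// newOnceConfig stores the default so get is usable immediately.
func newOnceConfig[T any](def T) *onceConfig[T] {
	return &onceConfig[T]{value: def}
}

// get returns the current value (the default until setOnce succeeds).
func (c *onceConfig[T]) get() T {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// setOnce applies cfg the first time it is called and reports whether
// this call was the one that took effect. Assumption: a nil cfg keeps
// the default but still consumes the single allowed set.
func (c *onceConfig[T]) setOnce(cfg *T) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.isSet {
		return false
	}
	c.isSet = true
	if cfg != nil {
		c.value = *cfg
	}
	return true
}

func main() {
	c := newOnceConfig(config{Retries: 3})
	fmt.Println(c.get().Retries)                // 3
	fmt.Println(c.setOnce(&config{Retries: 5})) // true
	fmt.Println(c.setOnce(&config{Retries: 9})) // false
	fmt.Println(c.get().Retries)                // 5
}
```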
type SyncMap ¶
type SyncMap[K comparable, T any] struct {
	// contains filtered or unexported fields
}
func MakeSyncMap ¶
func MakeSyncMap[K comparable, T any]() *SyncMap[K, T]
func (*SyncMap[K, T]) ForEach ¶ added in v0.9.0
func (sm *SyncMap[K, T]) ForEach(fn func(K, T))
ForEach iterates over all key/value pairs in the map, calling the provided function for each pair.
func (*SyncMap[K, T]) GetOrCreate ¶
GetOrCreate gets a value by key. If the key doesn't exist, it calls the provided function to create a new value, sets it in the map, and returns it. Returns the value and a boolean indicating if the key was found (true) or created (false).
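The get-or-create contract, including the meaning of the returned boolean, can be sketched with a mutex-guarded map. The `syncMap` type and its helpers are hypothetical. The sketch assumes the create function runs while the lock is held, so at most one value is ever created per key.

```go
package main

import (
	"fmt"
	"sync"
)

// syncMap is a hypothetical stand-in for SyncMap.
type syncMap[K comparable, V any] struct {
	mu sync.Mutex
	m  map[K]V
}

func newSyncMap[K comparable, V any]() *syncMap[K, V] {
	return &syncMap[K, V]{m: make(map[K]V)}
}

// getOrCreate returns the value for key. The boolean is true when the
// key already existed and false when the value was just created.
// Assumption: create is called with the lock held.
func (s *syncMap[K, V]) getOrCreate(key K, create func() V) (V, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if v, ok := s.m[key]; ok {
		return v, true
	}
	v := create()
	s.m[key] = v
	return v, false
}

func main() {
	m := newSyncMap[string, int]()
	calls := 0
	create := func() int { calls++; return 42 }

	v, found := m.getOrCreate("answer", create)
	fmt.Println(v, found) // 42 false

	v, found = m.getOrCreate("answer", create)
	fmt.Println(v, found) // 42 true

	fmt.Println(calls) // 1
}
```

Holding the lock during creation keeps the sketch simple, but the create function must not call back into the same map, or it would deadlock.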
func (*SyncMap[K, T]) Keys ¶
func (sm *SyncMap[K, T]) Keys() []K
Keys returns a slice of all keys in the map.
type TimeSampleAligner ¶ added in v0.9.0
type TimeSampleAligner struct {
// contains filtered or unexported fields
}
TimeSampleAligner converts real-world timestamps (which may have skew, gaps, or timing drift) into clean logical time indices for frontend consumption.
Key features:
- Maps incoming samples to logical time slots (0, 1, 2, 3...)
- Maintains timing accuracy within ±1000ms of real timestamps
- Handles clock drift and power-sleep gaps gracefully
- Uses a ring buffer for efficient memory management
- Provides both logical indices and real timestamps for graphing
Algorithm overview:
- Tracks running time skew between expected and actual sample timing
- When skew exceeds ±1000ms, corrects by either filling gaps or dropping samples
- For gaps (skew >= +1000ms): interpolates synthetic timestamps and advances logical time
- For samples arriving too fast (skew <= -1000ms): drops sample and resets skew
- Maintains invariant: abs(timeSkew) < 1000ms for timing accuracy
Usage pattern:
	aligner := MakeTimeSampleAligner(maxSamples)
	logicalTime, err := aligner.AddSample(timestampMs)
	baseLogical, timestamps := aligner.GetTimestamps()
	// Frontend can use logical indices 0,1,2... while mapping back to real timestamps
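The gap-fill and drop rules from the algorithm overview can be illustrated with a deliberately simplified model. This is not the package's algorithm. It assumes a nominal interval of 1000 ms per logical slot, measures skew against the first timestamp instead of tracking a running skew, and stores every slot in an unbounded slice instead of a ring buffer. The `aligner` type, `addSample`, `slotMs`, and `errSkipped` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// slotMs is the assumed nominal sample interval in milliseconds.
const slotMs = 1000

// errSkipped is a hypothetical error for dropped samples.
var errSkipped = errors.New("sample skipped due to timing skew")

// aligner is a hypothetical, simplified model: stamps[i] is the real or
// synthetic timestamp stored for logical slot i.
type aligner struct {
	stamps []int64
}

// addSample maps ts (in ms) to a logical slot.
//   - skew <= -slotMs: the sample arrived too early and is dropped
//   - skew >= +slotMs: missing slots are filled with timestamps
//     interpolated linearly between the last stored sample and ts
//   - otherwise: the sample takes the next slot
func (a *aligner) addSample(ts int64) (int, error) {
	n := len(a.stamps) // next free logical slot
	if n == 0 {
		a.stamps = append(a.stamps, ts)
		return 0, nil
	}
	expected := a.stamps[0] + int64(n)*slotMs
	skew := ts - expected
	if skew <= -slotMs {
		return -1, errSkipped
	}
	if skew >= slotMs {
		gaps := int(skew / slotMs)
		last := a.stamps[n-1]
		for i := 1; i <= gaps; i++ {
			synthetic := last + (ts-last)*int64(i)/int64(gaps+1)
			a.stamps = append(a.stamps, synthetic)
		}
	}
	a.stamps = append(a.stamps, ts)
	return len(a.stamps) - 1, nil
}

func main() {
	a := &aligner{}
	for _, ts := range []int64{0, 1000, 4000, 4100, 4200} {
		slot, err := a.addSample(ts)
		fmt.Println(ts, slot, err)
	}
	fmt.Println(a.stamps)
}
```

In this run the sample at 4000 ms arrives two slots late, so slots 2 and 3 receive synthetic timestamps and the sample lands in slot 4. The sample at 4200 ms is more than one slot ahead of schedule and is dropped.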
func MakeTimeSampleAligner ¶ added in v0.9.0
func MakeTimeSampleAligner(maxSamples int) *TimeSampleAligner
func (*TimeSampleAligner) AddSample ¶ added in v0.9.0
func (tsa *TimeSampleAligner) AddSample(ts int64) (int, error)
func (*TimeSampleAligner) GetFirstTimestamp ¶ added in v0.9.0
func (tsa *TimeSampleAligner) GetFirstTimestamp() int64
func (*TimeSampleAligner) GetLogicalTimeFromRealTimestamp ¶ added in v0.9.0
func (tsa *TimeSampleAligner) GetLogicalTimeFromRealTimestamp(ts int64) int
func (*TimeSampleAligner) GetMaxLogicalTime ¶ added in v0.9.0
func (tsa *TimeSampleAligner) GetMaxLogicalTime() int
func (*TimeSampleAligner) GetRealTimestampFromLogical ¶ added in v0.9.0
func (tsa *TimeSampleAligner) GetRealTimestampFromLogical(logical int) int64
func (*TimeSampleAligner) GetTimestamps ¶ added in v0.9.0
func (tsa *TimeSampleAligner) GetTimestamps() (int, []int64)
type VersionedMap ¶ added in v0.9.0
type VersionedMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}
func MakeVersionedMap ¶ added in v0.9.0
func MakeVersionedMap[K comparable, V any]() *VersionedMap[K, V]
func (*VersionedMap[K, V]) ForEach ¶ added in v0.9.0
func (vm *VersionedMap[K, V]) ForEach(fn func(key K, value V, version int64))
func (*VersionedMap[K, V]) Get ¶ added in v0.9.0
func (vm *VersionedMap[K, V]) Get(key K) (V, int64, bool)
func (*VersionedMap[K, V]) GetSinceVersion ¶ added in v0.9.0
func (vm *VersionedMap[K, V]) GetSinceVersion(version int64) (map[K]V, int64)
func (*VersionedMap[K, V]) Keys ¶ added in v0.9.0
func (vm *VersionedMap[K, V]) Keys() []K
func (*VersionedMap[K, V]) Set ¶ added in v0.9.0
func (vm *VersionedMap[K, V]) Set(key K, value V)
func (*VersionedMap[K, V]) SetAndIncVersion ¶ added in v0.9.0
func (vm *VersionedMap[K, V]) SetAndIncVersion(key K, value V)
func (*VersionedMap[K, V]) SetVersion ¶ added in v0.9.0
func (vm *VersionedMap[K, V]) SetVersion(version int64)