Documentation
¶
Overview ¶
Package syncbuffer provides functionality for writing byte streams and keeping readers of those streams roughly in sync with each other.
Example ¶
package main
import (
"log"
"sync"
"time"
"github.com/ny0m/syncbuffer"
)
func main() {
// New buffer with space for 2 items that adds messages every millisecond.
sb := syncbuffer.NewSyncBuffer(time.Second/2, 2)
var wg sync.WaitGroup
wg.Add(1)
// Start streaming somewhere.
go func() {
defer wg.Done()
s := syncbuffer.NewStreamer(sb)
for p := range s.Stream() {
log.Println(p)
}
}()
for i := 0; i < 10; i++ {
// Add will block until a millisecond has passed.
sb.Add([]byte{byte(i)})
}
// Close the buffer and stop its streamers.
sb.Close()
// Ensure that the program waits for the streamer to finish.
wg.Wait()
}
Output: [0] [1] [2] [3] [4] [5] [6] [7] [8] [9]
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
RingBuffer stores a fixed-size list of any type of items. If the buffer is full, the oldest item is overwritten.
func NewRingBuffer ¶
func NewRingBuffer(size int) *RingBuffer
NewRingBuffer returns a ring buffer of the given size.
func (*RingBuffer) Add ¶
func (r *RingBuffer) Add(item interface{})
Add appends an item to the buffer. It may overwrite the oldest item in the buffer.
func (*RingBuffer) OldestCursor ¶
func (r *RingBuffer) OldestCursor() int
OldestCursor returns the cursor of the oldest item in the buffer.
func (*RingBuffer) ReadFrom ¶
func (r *RingBuffer) ReadFrom(cursor int) ([]interface{}, int)
ReadFrom returns all items newer than the given cursor, and a new cursor, which successive reads should use to query the buffer to keep data contiguous. If the given cursor is out of bounds, ReadFrom will return a nil slice. If the item at cursor is no longer available, ReadFrom will return the current entire buffer.
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer allows items to be read from its parent buffer.
func NewStreamer ¶
func NewStreamer(sb *SyncBuffer) *Streamer
NewStreamer returns a thing that allows data to be read from the parent buffer. Streamer.Close must be called when the streamer is done with; however, closing the parent buffer will have the same effect from the streamer's perspective.
type SyncBuffer ¶
type SyncBuffer struct {
// contains filtered or unexported fields
}
SyncBuffer is a fixed-size buffer that allows writes at a period specified by its frequency. Writes via Add, and reads via a Streamer are thread-safe.
func NewSyncBuffer ¶
func NewSyncBuffer(freq time.Duration, size int) *SyncBuffer
NewSyncBuffer returns a buffer of the given size that writes items at a period defined by the given frequency. Close must be called on the buffer after it's done with.
func (*SyncBuffer) Add ¶
func (sb *SyncBuffer) Add(item []byte)
Add then adds the given item to the buffer, overwriting the oldest item if the buffer is full. Will block for at least the period defined by the buffer's frequency.
func (*SyncBuffer) Close ¶
func (sb *SyncBuffer) Close()
Close cleans up the buffer's internal goroutines and closes all streamers.