syncbuffer

Version: v0.0.2
Published: Apr 20, 2023 License: MIT Imports: 2 Imported by: 0

README

syncbuffer

Syncbuffer's structs are useful for applications where multiple separate readers need to be kept roughly synchronised with a source; for example, audio or video streaming.

Example

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/ny0m/syncbuffer"
)

func main() {
    // New buffer with space for 2 items that adds messages every millisecond.
    sb := syncbuffer.NewSyncBuffer(time.Millisecond, 2)
    
    var wg sync.WaitGroup
    wg.Add(1)
    
    // Start streaming somewhere.
    go func() {
        defer wg.Done()
    
        s := syncbuffer.NewStreamer(sb)
        for p := range s.Stream() {
            fmt.Println(p)
        }
    }()
    
    time.Sleep(time.Millisecond)
    
    for i := 0; i < 10; i++ {
        // Add will block until a millisecond has passed.
        sb.Add([]byte{byte(i)})
    }
    
    // Close the buffer and stop its streamers.
    sb.Close()
    
    // Ensure that the program waits for the streamer to finish.
    wg.Wait()
    
    // Output:
    // [0]
    // [1]
    // [2]
    // [3]
    // [4]
    // [5]
    // [6]
    // [7]
    // [8]
    // [9]
}

Documentation

Overview

Package syncbuffer provides functionality for writing byte streams and keeping readers of those streams roughly in sync with each other.

Example
package main

import (
	"log"
	"sync"
	"time"

	"github.com/ny0m/syncbuffer"
)

func main() {
	// New buffer with space for 2 items that accepts one message every half second.
	sb := syncbuffer.NewSyncBuffer(time.Second/2, 2)

	var wg sync.WaitGroup
	wg.Add(1)

	// Start streaming somewhere.
	go func() {
		defer wg.Done()

		s := syncbuffer.NewStreamer(sb)
		for p := range s.Stream() {
			log.Println(p)
		}
	}()

	for i := 0; i < 10; i++ {
		// Add will block until half a second has passed.
		sb.Add([]byte{byte(i)})
	}

	// Close the buffer and stop its streamers.
	sb.Close()

	// Ensure that the program waits for the streamer to finish.
	wg.Wait()

}
Output:

[0]
[1]
[2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]

Types

type Error

type Error string

Error is a string-based error type, which allows errors to be declared as constants.

func (Error) Error

func (e Error) Error() string

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer stores a fixed-size list of items of any type. If the buffer is full, adding an item overwrites the oldest one.

func NewRingBuffer

func NewRingBuffer(size int) *RingBuffer

NewRingBuffer returns a ring buffer of the given size.

func (*RingBuffer) Add

func (r *RingBuffer) Add(item interface{})

Add appends an item to the buffer. It may overwrite the oldest item in the buffer.

func (*RingBuffer) OldestCursor

func (r *RingBuffer) OldestCursor() int

OldestCursor returns the cursor of the oldest item in the buffer.

func (*RingBuffer) ReadFrom

func (r *RingBuffer) ReadFrom(cursor int) ([]interface{}, int)

ReadFrom returns all items newer than the given cursor, and a new cursor, which successive reads should use to query the buffer to keep data contiguous. If the given cursor is out of bounds, ReadFrom will return a nil slice. If the item at cursor is no longer available, ReadFrom will return the current entire buffer.
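The overwrite and cursor behaviour described above can be illustrated with a small stand-in. This is a sketch under stated assumptions, not the package's implementation: the ring type and its methods are hypothetical, a cursor is assumed to be a monotonically increasing count of items added, and readFrom is assumed to return the items at that position or later.

```go
package main

import "fmt"

// ring is a simplified, hypothetical stand-in for RingBuffer.
type ring struct {
	items []interface{}
	next  int // number of items ever added; also the position of the next write
}

func newRing(size int) *ring { return &ring{items: make([]interface{}, size)} }

// add stores item, overwriting the oldest slot once the ring is full.
func (r *ring) add(item interface{}) {
	r.items[r.next%len(r.items)] = item
	r.next++
}

// oldest returns the cursor of the oldest item still held.
func (r *ring) oldest() int {
	if r.next < len(r.items) {
		return 0
	}
	return r.next - len(r.items)
}

// readFrom returns the items from cursor onwards and the cursor to use for
// the next read. An out-of-bounds cursor yields a nil slice.
func (r *ring) readFrom(cursor int) ([]interface{}, int) {
	if cursor < 0 || cursor > r.next {
		return nil, cursor
	}
	if cursor < r.oldest() {
		// The requested item was overwritten: return everything still held.
		cursor = r.oldest()
	}
	out := make([]interface{}, 0, r.next-cursor)
	for i := cursor; i < r.next; i++ {
		out = append(out, r.items[i%len(r.items)])
	}
	return out, r.next
}

func main() {
	r := newRing(2)
	r.add("a")
	r.add("b")
	r.add("c") // overwrites "a"

	items, cur := r.readFrom(0)
	fmt.Println(items, cur) // [b c] 3

	r.add("d")
	items, cur = r.readFrom(cur)
	fmt.Println(items, cur) // [d] 4
}
```

Passing the returned cursor into the next read is what keeps successive reads contiguous: the second call sees only the item added since the first.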

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer allows items to be read from its parent buffer.

func NewStreamer

func NewStreamer(sb *SyncBuffer) *Streamer

NewStreamer returns a Streamer that allows data to be read from the parent buffer. Streamer.Close must be called once the streamer is no longer needed; however, closing the parent buffer has the same effect from the streamer's perspective.

func (*Streamer) Close

func (st *Streamer) Close()

Close stops the streamer.

func (*Streamer) Stream

func (st *Streamer) Stream() chan []byte

Stream returns a channel that emits items from the parent buffer as soon as they become available.
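A stream that ends either when the reader stops or when the source is closed can be sketched with plain channels. This is an illustration of that shutdown behaviour only, not the package's implementation: the stream function, src, and done are hypothetical names, with closing src standing in for closing the parent buffer and closing done standing in for Streamer.Close.

```go
package main

import "fmt"

// stream is a hypothetical helper. It forwards items from src to the returned
// channel until src is closed or done is closed, then closes its output.
func stream(src <-chan []byte, done <-chan struct{}) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for {
			select {
			case <-done:
				return
			case item, ok := <-src:
				if !ok {
					return // source closed
				}
				select {
				case out <- item:
				case <-done:
					return
				}
			}
		}
	}()
	return out
}

func main() {
	src := make(chan []byte)
	done := make(chan struct{})

	go func() {
		for i := 0; i < 3; i++ {
			src <- []byte{byte(i)}
		}
		close(src) // plays the role of closing the parent buffer
	}()

	// The range loop ends once the output channel is closed.
	for p := range stream(src, done) {
		fmt.Println(p)
	}
	// Prints [0], [1] and [2], one per line.
}
```

Ranging over the returned channel, as the README example does with s.Stream(), terminates cleanly in both shutdown cases.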

type SyncBuffer

type SyncBuffer struct {
	// contains filtered or unexported fields
}

SyncBuffer is a fixed-size buffer that accepts writes at a fixed period, given by its freq duration. Writes via Add and reads via a Streamer are thread-safe.

func NewSyncBuffer

func NewSyncBuffer(freq time.Duration, size int) *SyncBuffer

NewSyncBuffer returns a buffer of the given size that accepts one write per period, where the period is the given freq duration. Close must be called on the buffer once it is no longer needed.

func (*SyncBuffer) Add

func (sb *SyncBuffer) Add(item []byte)

Add adds the given item to the buffer, overwriting the oldest item if the buffer is full. It blocks for at least the period defined by the buffer's frequency.

func (*SyncBuffer) Close

func (sb *SyncBuffer) Close()

Close cleans up the buffer's internal goroutines and closes all streamers.
