rtrb

package
v0.1.13
Published: Sep 14, 2025 | License: Apache-2.0 | Imports: 3 | Imported by: 0

README

LocklessGenericRingBuffer

LocklessGenericRingBuffer is a single-producer, multi-reader lockless ring buffer that uses the generics introduced in Go 1.18. Instead of passing untyped interface{} values or byte arrays, you can pass structs between goroutines in a type-safe manner.

What is a lockless ring buffer?

A ring buffer, also known as a circular buffer, is a fixed-size buffer that can be efficiently appended to and read from. This implementation lets a single goroutine write to the buffer while multiple goroutines read from it concurrently, all without locks, which avoids lock contention and keeps latency low.
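
To make the idea concrete, here is a minimal sketch of a lock-free ring buffer with one producer and one consumer. It is not this package's implementation (which supports multiple consumers). The names spscRing, newSPSCRing, write, read, and sumThroughRing are hypothetical, and the spin-with-Gosched waiting strategy is an assumption chosen for brevity.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// spscRing is a hypothetical single-producer, single-consumer ring buffer.
// It only illustrates the technique; it is not this package's code.
type spscRing[T any] struct {
	buf  []T
	mask uint32 // len(buf)-1, valid because the length is a power of 2
	head uint32 // next position to write; advanced only by the producer
	tail uint32 // next position to read; advanced only by the consumer
}

func newSPSCRing[T any](size uint32) *spscRing[T] {
	if size == 0 || size&(size-1) != 0 {
		panic("size must be a power of 2")
	}
	return &spscRing[T]{buf: make([]T, size), mask: size - 1}
}

// write waits until a slot is free, stores the value, then publishes it
// by advancing head with an atomic store.
func (r *spscRing[T]) write(v T) {
	head := atomic.LoadUint32(&r.head)
	for head-atomic.LoadUint32(&r.tail) == uint32(len(r.buf)) {
		runtime.Gosched() // buffer full: yield until the consumer catches up
	}
	r.buf[head&r.mask] = v
	atomic.StoreUint32(&r.head, head+1)
}

// read waits until a value is available, copies it out, then frees the
// slot by advancing tail with an atomic store.
func (r *spscRing[T]) read() T {
	tail := atomic.LoadUint32(&r.tail)
	for atomic.LoadUint32(&r.head) == tail {
		runtime.Gosched() // buffer empty: yield until the producer writes
	}
	v := r.buf[tail&r.mask]
	atomic.StoreUint32(&r.tail, tail+1)
	return v
}

// sumThroughRing sends 1..n through the ring from one goroutine and sums
// the values in another.
func sumThroughRing(n int) int {
	ring := newSPSCRing[int](8)
	done := make(chan int)
	go func() {
		sum := 0
		for i := 0; i < n; i++ {
			sum += ring.read()
		}
		done <- sum
	}()
	for i := 1; i <= n; i++ {
		ring.write(i)
	}
	return <-done
}

func main() {
	fmt.Println(sumThroughRing(100)) // prints 5050
}
```

Because each index has exactly one writer, atomic loads and stores are enough to hand slots back and forth; no mutex is needed.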

Benefits

  • Zero Heap Allocations
  • Cache friendly, as the underlying structures are contiguous in memory
  • Faster than channels for highly contended workloads (see Benchmarks)
  • Zero Dependencies

Requirements

  • Go 1.18 or later

Getting started

Note: writers and consumers are NOT thread-safe. Write from a single goroutine only, and use each consumer from a single goroutine only.

Install
go get github.com/GavinClarke0/lockless-generic-ring-buffer
Create and Consume
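// The buffer size must be a power of 2. The second argument is maxReaders,
// the maximum number of consumers (10 is an arbitrary example value).
var buffer, _ = CreateBuffer[int](16, 10)

messages := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
consumer, _ := buffer.CreateConsumer()

// Write every message from the single producer goroutine.
for _, value := range messages {
	buffer.Write(value)
}

// Read back one value per message written.
for range messages {
	_ = consumer.Get()
}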
Remove a Consumer
var consumer, _ = buffer.CreateConsumer()
consumer.Remove()

Benchmarks

Comparison against channels

Benchmarks were run on an M1 MacBook Air (16 GB RAM).

Note: the benchmarks exclude the time taken to create the consumers/channels.

BenchmarkConsumerSequentialReadWriteLarge-8           20          55602675 ns/op               0 B/op          0 allocs/op
BenchmarkChannelsSequentialReadWriteLarge-8            8         133155344 ns/op               0 B/op          0 allocs/op
BenchmarkConsumerSequentialReadWriteMedium-8        1063           1123298 ns/op               0 B/op          0 allocs/op
BenchmarkChannelsSequentialReadWriteMedium-8         451           2650842 ns/op               0 B/op          0 allocs/op
BenchmarkConsumerSequentialReadWriteSmall-8        99393             12099 ns/op               0 B/op          0 allocs/op
BenchmarkChannelsSequentialReadWriteSmall-8        41755             28758 ns/op               0 B/op          0 allocs/op
BenchmarkConsumerConcurrentReadWriteLarge-8            5         223985800 ns/op             345 B/op          2 allocs/op
BenchmarkChannelsConcurrentReadWriteLarge-8            2         858931292 ns/op             144 B/op          2 allocs/op
BenchmarkConsumerConcurrentReadWriteMedium-8         278           4554057 ns/op             217 B/op          2 allocs/op
BenchmarkChannelsConcurrentReadWriteMedium-8          90          17578294 ns/op             169 B/op          2 allocs/op
BenchmarkConsumerConcurrentReadWriteSmall-8        36378             33837 ns/op              96 B/op          2 allocs/op
BenchmarkChannelsConcurrentReadWriteSmall-8        25004             47466 ns/op              97 B/op          2 allocs/op

In the sequential benchmarks, the ring buffer is about 2.4x faster than channels at all three sizes.

In the concurrent benchmarks, where operations can block, it is about 3.8x faster than channels for the large and medium workloads and about 1.4x faster for the small workload.
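
The speedups can be recomputed from the ns/op column of the results above. The short program below does that arithmetic; the speedup helper and the row labels are hypothetical names used only for illustration.

```go
package main

import "fmt"

// speedup returns how many times faster the ring buffer is than channels,
// given the ns/op of each (lower ns/op means faster).
func speedup(channelNs, ringNs float64) float64 {
	return channelNs / ringNs
}

func main() {
	// ns/op values copied from the benchmark output above.
	rows := []struct {
		name              string
		ringNs, channelNs float64
	}{
		{"SequentialLarge", 55602675, 133155344},
		{"SequentialMedium", 1123298, 2650842},
		{"SequentialSmall", 12099, 28758},
		{"ConcurrentLarge", 223985800, 858931292},
		{"ConcurrentMedium", 4554057, 17578294},
		{"ConcurrentSmall", 33837, 47466},
	}
	for _, r := range rows {
		fmt.Printf("%-17s %.1fx\n", r.name, speedup(r.channelNs, r.ringNs))
	}
}
```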

Testing

To run current tests: go test

To detect race conditions, run go test -race. As of the latest commit (January 24, 2021), the current test cases pass under the race detector.

Note that this does not mean the code is free of race conditions. Additional tests are needed, especially for creating and removing consumers in concurrent environments.

Documentation

Constants

This section is empty.

Variables

var (
	MaxConsumerError  = errors.New("max amount of consumers reached cannot create any more")
	InvalidBufferSize = errors.New("buffer must be of size 2^n")
)

Defines the error values.

Functions

This section is empty.

Types

type Consumer

type Consumer[T any] struct {
	// contains filtered or unexported fields
}

Consumer is the consumer struct.

func (*Consumer[T]) Get

func (consumer *Consumer[T]) Get() T

Get retrieves a value from the ring buffer, blocking until data is available.

func (*Consumer[T]) Remove

func (consumer *Consumer[T]) Remove()

Remove destroys the current consumer.

type RingBuffer

type RingBuffer[T any] struct {
	// contains filtered or unexported fields
}

RingBuffer is a concurrency-safe ring buffer that supports multiple consumers.

func CreateBuffer

func CreateBuffer[T any](size uint32, maxReaders uint32) (RingBuffer[T], error)

CreateBuffer initializes a ring buffer. size must be a power of 2; maxReaders specifies the maximum number of consumers.
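
The documentation does not say why size must be a power of 2. A common reason in ring buffers is that wrap-around can then use a bit mask instead of a modulo operation. The sketch below shows that check and mapping under that assumption; isPowerOfTwo and slot are hypothetical helpers, not part of this package.

```go
package main

import "fmt"

// isPowerOfTwo reports whether n equals 2^k for some k >= 0.
// A power of 2 has exactly one bit set, so n&(n-1) clears it to zero.
func isPowerOfTwo(n uint32) bool {
	return n != 0 && n&(n-1) == 0
}

// slot maps an ever-increasing position onto a buffer index.
// For a power-of-2 size, pos&(size-1) equals pos%size.
func slot(pos, size uint32) uint32 {
	return pos & (size - 1)
}

func main() {
	fmt.Println(isPowerOfTwo(16), isPowerOfTwo(24)) // prints: true false
	fmt.Println(slot(17, 16), 17%16)                // prints: 1 1
}
```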

func (*RingBuffer[T]) CreateConsumer

func (buffer *RingBuffer[T]) CreateConsumer() (Consumer[T], error)

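CreateConsumer creates a new consumer. It allocates an unused consumer slot using atomic operations, so consumers can be created safely from concurrent goroutines.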
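
One way atomic slot allocation of this kind can work is a compare-and-swap scan over a fixed table of flags. The sketch below illustrates that general technique only; it is an assumption, not this package's code, and slotTable, claim, release, claimConcurrently, and errNoFreeSlot are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errNoFreeSlot is a hypothetical stand-in for a "max consumers" error.
var errNoFreeSlot = errors.New("no free consumer slot")

// slotTable is a hypothetical fixed table of slots: 0 = free, 1 = taken.
type slotTable struct {
	inUse []uint32
}

// claim scans for a free slot and takes it with compare-and-swap, so two
// goroutines can never win the same slot.
func (s *slotTable) claim() (int, error) {
	for i := range s.inUse {
		if atomic.CompareAndSwapUint32(&s.inUse[i], 0, 1) {
			return i, nil
		}
	}
	return -1, errNoFreeSlot
}

// release marks a slot as free again.
func (s *slotTable) release(i int) {
	atomic.StoreUint32(&s.inUse[i], 0)
}

// claimConcurrently starts n goroutines that each try to claim one slot
// and returns how many of them succeeded.
func claimConcurrently(s *slotTable, n int) int {
	var wg sync.WaitGroup
	var succeeded int32
	for g := 0; g < n; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := s.claim(); err == nil {
				atomic.AddInt32(&succeeded, 1)
			}
		}()
	}
	wg.Wait()
	return int(succeeded)
}

func main() {
	table := &slotTable{inUse: make([]uint32, 4)}
	fmt.Println(claimConcurrently(table, 10)) // prints 4: one winner per slot
}
```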

func (*RingBuffer[T]) Write

func (buffer *RingBuffer[T]) Write(value T)

Write writes a value to the buffer (concurrency-safe).
