disruptor

package module
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2025 License: MIT Imports: 8 Imported by: 0

README

LMAX Disruptor written in Go

The Disruptor was originally a library written in Java that provided a concurrent ring buffer data structure of the same name, developed at LMAX Exchange.

This repo is yet-another port of the disruptor in Go. It is performant and free of heap allocation when running.

If for some reason you have Go code that needs to process messages at sub-microsecond latency, where shaving every nanosecond counts, then consider the disruptor pattern. Example situations:

  • Financial trading systems (high-frequency trading)
  • Real-time game servers (authoritative server logic)
  • High-performance network packet processing (within a user-space application)
  • Real-time data analytics/stream processing (very low latency pipelines)

Key Considerations When Choosing a Disruptor Over Channels

  • Benchmark in your specific scenario: Don't assume a disruptor is always better. Benchmark your application with both channels and your disruptor implementation to see if the latency reduction is actually significant and justifies the added complexity.
  • Complexity: The disruptor is generally more complex to understand than Go channels. Make sure the performance gain outweighs the added complexity in development and maintenance.
  • Memory Management: Disruptors often rely on pre-allocated buffers and ring buffer structures. Understand the memory implications and ensure you manage memory effectively, especially in long-running applications.
  • Garbage Collection: While you are using Go, be mindful that even with a disruptor, GC can still run and introduce pauses.

Comparisons to other Go ports

There is already an existing port (smarty-prototypes/go-disruptor), but the key advantages of this library over that are:

  • Better encapsulation: The user does not need to create and interact with the ring buffer directly (unless using WriteBatch and/or BatchReadFunc).
  • Batching support: WriteBatch and BatchReadFunc allow the user to efficiently batch items in/out of the ring buffer, e.g. via SIMD code.
  • Generics support: This library takes advantage of Go generics to simplify using the disruptor.

Nonetheless, these advantages come with slightly higher latency, on the order of O(1 ns). Take this into consideration when deciding which LMAX port to use.

Benchmarks

Benchmarks of 128-byte message throughput for smarty-prototypes/go-disruptor, five-vee/disruptor, and buffered Go channels. The producer and consumer run in their own goroutine. The buffer size is 1 << 22.

(Ran on my Macbook Air M3.)

$ go test -benchmem -run=^$ -bench . github.com/five-vee/go-disruptor/benchmarks
goos: darwin
goarch: arm64
pkg: github.com/five-vee/go-disruptor/benchmarks
cpu: Apple M3
BenchmarkDisruptor_22-8         112661708               10.26 ns/op            0 B/op          0 allocs/op
BenchmarkSmartystreets_22-8     131429239                9.208 ns/op           0 B/op          0 allocs/op
BenchmarkChannel_22-8           43865726                31.71 ns/op            0 B/op          0 allocs/op
PASS
ok      github.com/five-vee/go-disruptor/benchmarks     6.219s

Features

  • Support single producer and single consumer.
  • Support multiple producers. [^1]
  • Support multiple consumers.
  • Support different waiting strategies.
  • Support modifying the buffer directly.
  • Support consumer dependencies.
  • go.pkg.dev documentation.
  • Support producer and consumer batching.

[^1]: At the moment, multiple producers is explicitly not supported due to follow the single writer principle. I.e. a single writer can write messages faster than multiple writers.

Documentation

Overview

Package disruptor provides an implementation of the LMAX Disruptor.

If for some reason you have Go code that needs to process messages at sub-microsecond latency, where shaving every nanosecond counts, then consider the disruptor pattern.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrCapacity is the error corresponding to wrong capacity.
	ErrCapacity = fmt.Errorf("capacity must be a power of two")

	// ErrMissingReaderGroup is the error corresponding to missing
	// reader group(s).
	ErrMissingReaderGroup = fmt.Errorf("missing reader group(s)")

	// ErrEmptyReaderGroup is the error corresponding to an empty
	// reader group.
	ErrEmptyReaderGroup = fmt.Errorf("reader group is empty")
)

Functions

This section is empty.

Types

type Builder

type Builder[T any] struct {
	// contains filtered or unexported fields
}

Builder builds a disruptor.

func NewBuilder

func NewBuilder[T any](capacity int64) *Builder[T]

NewBuilder returns a builder of a disruptor.

func (*Builder[T]) Build

func (b *Builder[T]) Build() (*Disruptor[T], error)

Build builds the disruptor.

func (*Builder[T]) WithReaderGroup

func (b *Builder[T]) WithReaderGroup(group ...ReaderFunc) *Builder[T]

WithReaderGroup represents a group of readers. If this is the first time WithReaderGroup is called, the reader group is the descendant of the Writer. Otherwise, the reader group is a descendant of the reader group of the previously passed in WithReaderGroup().

func (*Builder[T]) WithReaderYield added in v0.1.3

func (b *Builder[T]) WithReaderYield(yield func()) *Builder[T]

WithReaderYield overrides how ReadLoop yields when the buffer is empty.

func (*Builder[T]) WithWriterYield added in v0.1.3

func (b *Builder[T]) WithWriterYield(yield func(spins int)) *Builder[T]

WithWriterYield overrides how Write/WriteBatch yields when the buffer is full. yield receives the number of times yield has been called so far in a Write/WriteBatch call.

type Disruptor

type Disruptor[T any] struct {
	// contains filtered or unexported fields
}

Disruptor supports a single writer and multiple readers.

func (*Disruptor[T]) Close

func (d *Disruptor[T]) Close()

Close stops the disruptor.

func (*Disruptor[T]) LoopRead

func (d *Disruptor[T]) LoopRead()

LoopRead continuously reads messages and passes them to a provided reader(s). Blocks until the ring buffer is closed and empty.

func (*Disruptor[T]) Write

func (d *Disruptor[T]) Write(f func(item *T))

Write adds an item to the disruptor. f writes in-place into the ring buffer.

func (*Disruptor[T]) WriteBatch

func (d *Disruptor[T]) WriteBatch(n int64, f func(ptrs [2]*T, lens [2]int))

WriteBatch adds n items to the disruptor. f is essentially a function that accepts a two sub-slices of the internal ring buffer:

1. First sub-slice is to the end of the ring buffer, 2. Secondsub-slice is from the beginning of the ring buffer.

It is possible the 2nd sub-slice is empty if n doesn't wrap around the ring buffer, i.e. length == 0.

Use WriteBatch over Write only if the complexity is needed and if the overhead of sub-slicing is much smaller than the time saved by batching, e.g. when working with SIMD code to write large numbers of items into the disruptor.

type ReaderFunc

type ReaderFunc interface {
	// contains filtered or unexported methods
}

ReaderFunc represents a reader function.

func BatchReaderFunc

func BatchReaderFunc[T any](f func(ptrs [2]*T, lens [2]int)) ReaderFunc

BatchReaderFunc returns a ReaderFunc that reads in batches. f is essentially a function that accepts a sub-slice of the internal ring buffer and is called twice:

1. First sub-slice is to the end of the ring buffer, 2. Secondsub-slice is from the beginning of the ring buffer.

It is possible the 2nd sub-slice is empty if n doesn't wrap around the ring buffer, i.e. length == 0.

Use BatchReaderFunc over SingleReaderFunc only if the complexity is needed and if the overhead of sub-slicing is much smaller than the time saved by batching, e.g. when working with SIMD code to read large numbers of items from the disruptor.

func SingleReaderFunc

func SingleReaderFunc[T any](f func(*T)) ReaderFunc

SingleReaderFunc returns a ReaderFunc that reads one at a time.

Directories

Path Synopsis
internal
pad

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL