seqflow

package module
v1.1.0
Published: Mar 14, 2026 | License: MIT | Imports: 9 | Imported by: 1

README

seqflow

High-performance lock-free Disruptor for Go

What is seqflow

The LMAX Disruptor is a high-performance inter-thread messaging framework born in the financial trading world. Its core idea: everything is driven by sequences — producers claim slots by advancing a sequence number, consumers read data by tracking that number. No locks, no queues, just atomic progression of integers.

seqflow = Sequence-driven Flow. A Go implementation of the Disruptor pattern with mechanism-level optimizations for Go's runtime.

How it Works

sequenceDiagram
    participant P as Producer
    participant D as Disruptor
    participant RB as RingBuffer
    participant L as Listener
    participant HA as Handler A
    participant HB as Handler B

    Note over P,D: 1. Claim slots atomically
    P->>D: Reserve(n)
    D-->>P: upper sequence number

    Note over P,RB: 2. Write data (zero-copy pointer)
    P->>RB: rb.Set(seq, event)

    Note over P,D: 3. Publish — atomic Store-Release
    P->>D: Commit(lower, upper)

    Note over D,L: 4. Listener detects committed sequence
    L->>D: barrier.Load()
    D-->>L: committedSeq

    Note over L,HA: 5. DAG dispatch: A first (no deps)
    L->>HA: Handle(lower, upper)
    HA-->>L: seq.Store(upper)

    Note over L,HB: 6. B waits for A via compositeBarrier
    L->>HB: Handle(lower, upper)
    HB-->>L: seq.Store(upper)

    Note over D,P: 7. Terminal barrier advances — frees slots
    L-->>D: terminalBarrier.Load()
    D-->>P: capacity freed

Install

go get github.com/gocronx/seqflow

Benchmark

Apple M4 / darwin arm64 / Go 1.22+ / 0 allocs across all tests

Single Writer

| Scenario | seqflow | channel | Speedup |
|---|---|---|---|
| 1 slot per op | 2.1 ns | 21 ns | 10x |
| 16 slots per op | 0.13 ns | 22 ns/msg | 160x |

Multi Writer (4 goroutines)

| Scenario | seqflow | channel | Speedup |
|---|---|---|---|
| 1 slot per op | 39 ns | 100 ns | 2.6x |
| 16 slots per op | 2.3 ns | 103 ns/msg | 45x |

Why is batching so fast? Reserve(16) claims 16 slots with a single atomic operation, whereas a channel must send 16 times and acquire its lock on every send.
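The batching effect can be illustrated with a small standard-library sketch. It is not seqflow's implementation: `sequencer`, `claim`, and `claimConcurrently` are hypothetical names, and the capacity check against consumers is left out. It only shows how one atomic add can hand a writer a whole range of sequence numbers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sequencer is a hypothetical shared cursor, not a seqflow type.
type sequencer struct {
	cursor atomic.Int64 // last claimed sequence; -1 means nothing claimed yet
}

func newSequencer() *sequencer {
	s := &sequencer{}
	s.cursor.Store(-1)
	return s
}

// claim reserves n consecutive sequences with one atomic add and returns
// the inclusive range [lower, upper].
func (s *sequencer) claim(n int64) (lower, upper int64) {
	upper = s.cursor.Add(n)
	return upper - n + 1, upper
}

// claimConcurrently starts `writers` goroutines that each claim `batches`
// ranges of n slots, then returns the final cursor value.
func claimConcurrently(writers, batches int, n int64) int64 {
	s := newSequencer()
	var wg sync.WaitGroup
	for w := 0; w < writers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := 0; b < batches; b++ {
				s.claim(n)
			}
		}()
	}
	wg.Wait()
	return s.cursor.Load()
}

func main() {
	s := newSequencer()
	lower, upper := s.claim(16)
	fmt.Println(lower, upper)                  // 0 15
	fmt.Println(claimConcurrently(4, 100, 16)) // 6399
}
```

Because every claim is a single add, the ranges handed to different writers never overlap, whatever the interleaving.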

Raw output
BenchmarkSeqflow_SingleWriter_Reserve1-10       2.131 ns/op    0 B/op    0 allocs/op
BenchmarkSeqflow_SingleWriter_Reserve16-10      0.1341 ns/op   0 B/op    0 allocs/op
BenchmarkSeqflow_MultiWriter4_Reserve1-10       38.58 ns/op    0 B/op    0 allocs/op
BenchmarkSeqflow_MultiWriter4_Reserve16-10      2.306 ns/op    0 B/op    0 allocs/op
BenchmarkChannel_SingleWriter-10                21.45 ns/op    0 B/op    0 allocs/op
BenchmarkChannel_SingleWriter_Batch16-10        355.3 ns/op    0 B/op    0 allocs/op  (22.2 ns/msg)
BenchmarkChannel_MultiWriter4-10                100.5 ns/op    0 B/op    0 allocs/op
BenchmarkChannel_MultiWriter4_Batch16-10        1649 ns/op     0 B/op    0 allocs/op  (103 ns/msg)

Quick Start

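// Event, decodeHandler, processHandler, and storeHandler are defined by the caller.
d, err := seqflow.New[Event](
    seqflow.WithCapacity(1024),
    seqflow.WithHandler("decode", decodeHandler),
    seqflow.WithHandler("process", processHandler, seqflow.DependsOn("decode")),
    seqflow.WithHandler("store", storeHandler, seqflow.DependsOn("process")),
)
if err != nil {
    log.Fatal(err)
}

go d.Listen() // Listen blocks, so run it in its own goroutine

rb := d.RingBuffer()
for i := int64(0); i < 10; i++ {
    upper, err := d.Reserve(1) // claim one slot
    if err != nil {
        log.Fatal(err)
    }
    rb.Set(upper, Event{Value: i}) // write into the claimed slot
    d.Commit(upper, upper)         // publish the slot to consumers
}

// Wait until the handlers have processed every committed event, then stop.
d.Drain(context.Background())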

Concepts

RingBuffer[T]: Generic ring buffer with power-of-2 capacity. Get() returns a pointer to the slot (zero-copy).
Reserve / Commit: The producer claims slots, writes data, then commits. Data becomes visible to consumers only after the commit.
Handler: Consumer callback. Receives a (lower, upper) sequence range for batch processing.
DAG Topology: Declare dependencies between handlers. Supports pipelines, diamonds, fan-out, and any other DAG.
Producer → [A] → [B] → [D]
                → [C] ↗
seqflow.WithHandler("A", h1),
seqflow.WithHandler("B", h2, seqflow.DependsOn("A")),
seqflow.WithHandler("C", h3, seqflow.DependsOn("A")),
seqflow.WithHandler("D", h4, seqflow.DependsOn("B", "C")),
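A dependency declaration like the diamond above has to be checked for unknown names and cycles before handlers can run. The following is a minimal sketch of such a check using Kahn's algorithm. It is an illustration only, not seqflow's actual validation code: `topoOrder` is a hypothetical helper, and the two error values are local stand-ins for the documented ErrUnknownDependency and ErrCyclicDependency.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinel errors documented by the package.
var (
	errUnknownDependency = errors.New("unknown dependency")
	errCyclicDependency  = errors.New("cyclic dependency detected")
)

// topoOrder returns the handler names so that each handler comes after all
// of its dependencies. order is the registration order; deps maps a handler
// name to the names it depends on.
func topoOrder(order []string, deps map[string][]string) ([]string, error) {
	known := make(map[string]bool, len(order))
	for _, name := range order {
		known[name] = true
	}
	indegree := make(map[string]int, len(order))
	dependents := make(map[string][]string, len(order))
	for _, name := range order {
		for _, dep := range deps[name] {
			if !known[dep] {
				return nil, fmt.Errorf("%w: %q needs %q", errUnknownDependency, name, dep)
			}
			indegree[name]++
			dependents[dep] = append(dependents[dep], name)
		}
	}
	var ready, sorted []string
	for _, name := range order {
		if indegree[name] == 0 {
			ready = append(ready, name) // no dependencies: can run first
		}
	}
	for len(ready) > 0 {
		name := ready[0]
		ready = ready[1:]
		sorted = append(sorted, name)
		for _, next := range dependents[name] {
			indegree[next]--
			if indegree[next] == 0 {
				ready = append(ready, next)
			}
		}
	}
	if len(sorted) != len(order) {
		return nil, errCyclicDependency // some handlers never became ready
	}
	return sorted, nil
}

func main() {
	order := []string{"A", "B", "C", "D"}
	deps := map[string][]string{"B": {"A"}, "C": {"A"}, "D": {"B", "C"}}
	sorted, err := topoOrder(order, deps)
	fmt.Println(sorted, err) // [A B C D] <nil>

	_, err = topoOrder([]string{"X", "Y"}, map[string][]string{"X": {"Y"}, "Y": {"X"}})
	fmt.Println(errors.Is(err, errCyclicDependency)) // true
}
```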

Examples

| Example | Description |
|---|---|
| basic | Single producer, single consumer |
| batch | Batch reserve: claim 16 slots in one atomic op |
| multiwriter | 4 concurrent producers with shared sequencer |
| diamond | DAG: decode → risk + calc → store |
| fanout | Fan-out: one event → 3 independent consumers |
| metrics | Custom metrics collector |
go run github.com/gocronx/seqflow/example/basic

Options

| Option | Description | Default |
|---|---|---|
| WithCapacity(n) | Buffer size (power of 2) | 1024 |
| WithWriterCount(n) | Concurrent writers. >1 enables shared sequencer | 1 |
| WithWaitStrategy(s) | Backpressure strategy | SleepingStrategy |
| WithMetrics(m) | Optional metrics collector | nil |

Wait Strategies

| Strategy | Latency | CPU | Use case |
|---|---|---|---|
| BusySpinStrategy | Lowest | Highest | Dedicated cores |
| YieldingStrategy | Low | Medium | Shared CPU |
| SleepingStrategy | Medium | Low | Default |
| BlockingStrategy | High | Lowest | Low-frequency |

Shutdown

d.Close()    // stop immediately
d.Drain(ctx) // drain remaining events, then stop

Close and Drain are mutually exclusive: call one or the other, not both. The second call returns ErrClosed.

Design

  • Single package — no cross-package interface dispatch overhead
  • Cache-line padding — atomic sequences aligned to CPU cache lines, prevents false sharing
  • Pre-computed remaining capacity — Reserve fast path: 1 comparison + 2 add/sub
  • Zero interface dispatch — single-writer fields embedded directly in Disruptor struct
  • Zero atomic loads on fast path — Close poisons capacity counter, no atomic.Load in hot loop
  • Optional metrics — nil check in hot path, zero overhead when disabled
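The cache-line padding idea from the list above can be sketched with the standard library alone. The layout below is an assumption for illustration, not seqflow's actual struct: `paddedSequence`, `paddedSize`, and `valueOffset` are hypothetical names, and `cacheLineBytes` mirrors the documented CacheLineBytes constant.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

const cacheLineBytes = 64 // mirrors the documented CacheLineBytes constant

// paddedSequence surrounds the counter with padding. With 64 bytes before
// and 56 bytes after it, any 64-byte line that holds value lies entirely
// inside this struct, so a neighbouring variable cannot share that line.
type paddedSequence struct {
	_     [cacheLineBytes]byte
	value atomic.Int64
	_     [cacheLineBytes - 8]byte
}

// paddedSize reports the total size of the padded struct in bytes.
func paddedSize() uintptr {
	var s paddedSequence
	return unsafe.Sizeof(s)
}

// valueOffset reports where the counter sits inside the struct.
func valueOffset() uintptr {
	var s paddedSequence
	return unsafe.Offsetof(s.value)
}

func main() {
	fmt.Println(paddedSize(), valueOffset()) // 128 64

	// Two adjacent sequences: their counters are 128 bytes apart, so a
	// producer and a consumer updating them do not invalidate each other's line.
	var seqs [2]paddedSequence
	seqs[0].value.Store(1)
	seqs[1].value.Store(2)
	fmt.Println(seqs[0].value.Load(), seqs[1].value.Load()) // 1 2
}
```

The cost is memory: each 8-byte counter occupies 128 bytes in this layout, which is usually acceptable for the handful of sequences a disruptor maintains.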

Documentation

Constants

const CacheLineBytes = 64

CacheLineBytes is the CPU cache line size, in bytes, for the current architecture.

Variables

var (
	ErrInvalidReservation  = errors.New("seqflow: invalid reservation size (zero or exceeds capacity)")
	ErrCapacityUnavailable = errors.New("seqflow: capacity unavailable")
	ErrClosed              = errors.New("seqflow: disruptor is closed")
	ErrInvalidCapacity     = errors.New("seqflow: capacity must be a positive power of 2")
	ErrNoHandlers          = errors.New("seqflow: at least one handler is required")
	ErrDuplicateHandler    = errors.New("seqflow: duplicate handler name")
	ErrUnknownDependency   = errors.New("seqflow: unknown dependency in DependsOn")
	ErrCyclicDependency    = errors.New("seqflow: cyclic dependency detected")
)

Functions

This section is empty.

Types

type BlockingStrategy

type BlockingStrategy struct {
	// contains filtered or unexported fields
}

BlockingStrategy is a blocking strategy that uses sync.Cond to minimize CPU usage, at the cost of higher latency.

func NewBlockingStrategy

func NewBlockingStrategy() *BlockingStrategy

func (*BlockingStrategy) Gate

func (s *BlockingStrategy) Gate(int64)

func (*BlockingStrategy) Idle

func (s *BlockingStrategy) Idle(int64)

func (*BlockingStrategy) Reserve

func (s *BlockingStrategy) Reserve(int64)

func (*BlockingStrategy) Signal

func (s *BlockingStrategy) Signal()

type BusySpinStrategy

type BusySpinStrategy struct{}

BusySpinStrategy busy-waits without yielding the CPU. It suits ultra-low-latency workloads that run on dedicated CPU cores.

func NewBusySpinStrategy

func NewBusySpinStrategy() *BusySpinStrategy

func (*BusySpinStrategy) Gate

func (s *BusySpinStrategy) Gate(int64)

func (*BusySpinStrategy) Idle

func (s *BusySpinStrategy) Idle(int64)

func (*BusySpinStrategy) Reserve

func (s *BusySpinStrategy) Reserve(int64)

func (*BusySpinStrategy) Signal

func (s *BusySpinStrategy) Signal()

type Disruptor

type Disruptor[T any] struct {
	// contains filtered or unexported fields
}

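Disruptor is the top-level container for a sequence-driven flow.

Core optimizations:

  1. Pre-computed remaining capacity: the fast path needs only 1 comparison.
  2. Zero interface dispatch: single-writer fields are embedded directly in the struct.
  3. Zero atomic loads: closing poisons remainingCapacity, which forces callers onto the slow path.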

func New

func New[T any](opts ...Option) (*Disruptor[T], error)

New creates a Disruptor instance.

func (*Disruptor[T]) Close

func (d *Disruptor[T]) Close() error

Close stops all consumers immediately, without waiting for the buffer to drain.

func (*Disruptor[T]) Commit

func (d *Disruptor[T]) Commit(lower, upper int64)

Commit makes reserved slots visible to consumers. Single-writer fast path: it writes directly to the embedded atomic.Int64, with zero pointer chasing.

func (*Disruptor[T]) Drain

func (d *Disruptor[T]) Drain(ctx context.Context) error

Drain waits until the terminal consumers have processed all committed events, then stops.

func (*Disruptor[T]) Listen

func (d *Disruptor[T]) Listen()

Listen blocks the current goroutine and runs all consumer handlers.

func (*Disruptor[T]) Reserve

func (d *Disruptor[T]) Reserve(count uint32) (int64, error)

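Reserve reserves slots in the ring buffer.

Single-writer fast path (the most common case):

  • 1 nil check (branch predicted)
  • 1 comparison (remainingCapacity)
  • 2 additions/subtractions
  • Zero atomic operations, zero interface dispatch, zero error construction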
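The fast path described above can be modelled in a few lines. This is a simplified, single-goroutine sketch under stated assumptions, not the library's code: `singleWriter`, its fields, and the `consumed` callback are hypothetical, and the cross-goroutine visibility that a real Close needs is not modelled. It shows how a cached remaining-capacity counter keeps the common case to one comparison and two add/sub operations, and how poisoning that counter pushes later calls into the slow path.

```go
package main

import "fmt"

// singleWriter is a hypothetical model of a single-producer sequencer.
type singleWriter struct {
	capacity  int64
	next      int64        // last sequence handed out; starts at -1
	remaining int64        // slots known to be free without asking consumers
	consumed  func() int64 // slow path only: last sequence consumers finished
	closed    bool
}

func newSingleWriter(capacity int64, consumed func() int64) *singleWriter {
	return &singleWriter{capacity: capacity, next: -1, remaining: capacity, consumed: consumed}
}

// reserve returns the upper sequence of the claimed range, or false if the
// slots are not available.
func (w *singleWriter) reserve(n int64) (int64, bool) {
	if n <= w.remaining { // fast path: 1 comparison
		w.remaining -= n // 2 add/sub
		w.next += n
		return w.next, true
	}
	return w.reserveSlow(n)
}

// reserveSlow recomputes the free space from the consumers' progress.
func (w *singleWriter) reserveSlow(n int64) (int64, bool) {
	if w.closed {
		return 0, false
	}
	free := w.capacity - (w.next - w.consumed()) // capacity minus slots in flight
	if n > free {
		return 0, false
	}
	w.remaining = free - n
	w.next += n
	return w.next, true
}

// close poisons remaining so every later reserve misses the fast path and
// sees the closed flag in the slow path.
func (w *singleWriter) close() {
	w.closed = true
	w.remaining = -1
}

func main() {
	consumed := int64(-1)
	w := newSingleWriter(4, func() int64 { return consumed })
	fmt.Println(w.reserve(3)) // 2 true
	fmt.Println(w.reserve(2)) // 0 false (only 1 slot free)
	consumed = 2              // consumers caught up
	fmt.Println(w.reserve(2)) // 4 true
	w.close()
	fmt.Println(w.reserve(1)) // 0 false
}
```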

func (*Disruptor[T]) RingBuffer

func (d *Disruptor[T]) RingBuffer() *RingBuffer[T]

RingBuffer returns the underlying ring buffer.

func (*Disruptor[T]) TryReserve

func (d *Disruptor[T]) TryReserve(count uint32) (int64, error)

TryReserve attempts a reservation without blocking.

type Handler

type Handler interface {
	Handle(lower, upper int64)
}

Handler is the consumer callback interface. It receives the batch range of available sequences.
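A handler typically loops over the sequence range it is given. The sketch below is self-contained, so it redeclares the Handler interface locally and uses a plain slice as a stand-in for the ring buffer. `Event`, `sumHandler`, and its fields are hypothetical, and the range is assumed to be inclusive on both ends, as the Quick Start's Commit(upper, upper) for a single slot suggests.

```go
package main

import "fmt"

// Handler mirrors the documented interface so the sketch compiles on its own.
type Handler interface {
	Handle(lower, upper int64)
}

// Event is a hypothetical payload type.
type Event struct{ Value int64 }

// sumHandler is a hypothetical consumer that adds up Event.Value per batch.
type sumHandler struct {
	slots   []Event // stand-in for the ring buffer; length must be a power of 2
	total   int64
	batches int
}

// Handle processes every sequence in [lower, upper] in one call.
func (h *sumHandler) Handle(lower, upper int64) {
	mask := int64(len(h.slots) - 1)
	for seq := lower; seq <= upper; seq++ {
		h.total += h.slots[seq&mask].Value // map the sequence to its slot
	}
	h.batches++
}

// Compile-time check that sumHandler satisfies Handler.
var _ Handler = (*sumHandler)(nil)

func main() {
	h := &sumHandler{slots: make([]Event, 8)}
	for i := range h.slots {
		h.slots[i].Value = int64(i)
	}
	h.Handle(0, 3) // 0+1+2+3
	h.Handle(4, 9) // 4+5+6+7, then wraps to slots 0 and 1
	fmt.Println(h.total, h.batches) // 29 2
}
```

With the real package, the loop body would read each event through the ring buffer's Get method instead of indexing a slice.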

type HandlerOption

type HandlerOption func(*handlerNode)

HandlerOption configures a handler registration.

func DependsOn

func DependsOn(names ...string) HandlerOption

DependsOn declares dependencies on other named handlers.

type Metrics

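type Metrics interface {
	// ReserveCount reports the number of producer Reserve calls.
	ReserveCount(count int64)
	// CommitCount reports the number of producer Commit calls.
	CommitCount(count int64)
	// ReserveWaitCount reports how many times Reserve entered the slow path.
	ReserveWaitCount(count int64)
	// HandleCount reports the number of batches processed by the named handler.
	HandleCount(name string, count int64)
	// HandleEvents reports the total number of events processed by the named handler.
	HandleEvents(name string, count int64)
	// IdleCount reports how many times the named handler idled waiting for data.
	IdleCount(name string, count int64)
	// GateCount reports how many times the named handler was blocked at the gate.
	GateCount(name string, count int64)
	// BufferUsage reports a snapshot of buffer usage.
	BufferUsage(used, capacity int64)
}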

Metrics collects optional performance metrics. When it is nil, the hot path skips every metrics call behind a nil check, so there is zero overhead.
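A custom collector only has to satisfy the eight methods above. The following sketch redeclares the interface locally so that it compiles on its own. `counterMetrics` and its key names are hypothetical, and it assumes each `count` argument is a delta to add, which the documentation does not state explicitly.

```go
package main

import (
	"fmt"
	"sync"
)

// Metrics mirrors the documented interface.
type Metrics interface {
	ReserveCount(count int64)
	CommitCount(count int64)
	ReserveWaitCount(count int64)
	HandleCount(name string, count int64)
	HandleEvents(name string, count int64)
	IdleCount(name string, count int64)
	GateCount(name string, count int64)
	BufferUsage(used, capacity int64)
}

// counterMetrics is a hypothetical collector that keeps running totals in a map.
type counterMetrics struct {
	mu     sync.Mutex
	counts map[string]int64
}

func newCounterMetrics() *counterMetrics {
	return &counterMetrics{counts: make(map[string]int64)}
}

// add treats delta as an increment (an assumption, see the lead-in).
func (m *counterMetrics) add(key string, delta int64) {
	m.mu.Lock()
	m.counts[key] += delta
	m.mu.Unlock()
}

// get returns the current total for key.
func (m *counterMetrics) get(key string) int64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.counts[key]
}

func (m *counterMetrics) ReserveCount(c int64)              { m.add("reserve", c) }
func (m *counterMetrics) CommitCount(c int64)               { m.add("commit", c) }
func (m *counterMetrics) ReserveWaitCount(c int64)          { m.add("reserve_wait", c) }
func (m *counterMetrics) HandleCount(name string, c int64)  { m.add("batches/"+name, c) }
func (m *counterMetrics) HandleEvents(name string, c int64) { m.add("events/"+name, c) }
func (m *counterMetrics) IdleCount(name string, c int64)    { m.add("idle/"+name, c) }
func (m *counterMetrics) GateCount(name string, c int64)    { m.add("gate/"+name, c) }

// BufferUsage stores the latest snapshot instead of accumulating.
func (m *counterMetrics) BufferUsage(used, capacity int64) {
	m.mu.Lock()
	m.counts["buffer_used"], m.counts["buffer_capacity"] = used, capacity
	m.mu.Unlock()
}

// Compile-time check that counterMetrics satisfies Metrics.
var _ Metrics = (*counterMetrics)(nil)

func main() {
	m := newCounterMetrics()
	m.ReserveCount(1)
	m.HandleEvents("decode", 16)
	m.HandleEvents("decode", 4)
	m.BufferUsage(20, 1024)
	fmt.Println(m.get("reserve"), m.get("events/decode"), m.get("buffer_used")) // 1 20 20
}
```

A mutex-guarded map keeps the sketch short. A collector meant for the hot path would more likely use one atomic counter per metric.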

type NoopMetrics

type NoopMetrics struct{}

NoopMetrics is a no-op implementation of Metrics.

func (NoopMetrics) BufferUsage

func (NoopMetrics) BufferUsage(used, capacity int64)

func (NoopMetrics) CommitCount

func (NoopMetrics) CommitCount(int64)

func (NoopMetrics) GateCount

func (NoopMetrics) GateCount(string, int64)

func (NoopMetrics) HandleCount

func (NoopMetrics) HandleCount(string, int64)

func (NoopMetrics) HandleEvents

func (NoopMetrics) HandleEvents(string, int64)

func (NoopMetrics) IdleCount

func (NoopMetrics) IdleCount(string, int64)

func (NoopMetrics) ReserveCount

func (NoopMetrics) ReserveCount(int64)

func (NoopMetrics) ReserveWaitCount

func (NoopMetrics) ReserveWaitCount(int64)

type Option

type Option func(*config)

Option configures a Disruptor.

func WithCapacity

func WithCapacity(n uint32) Option

WithCapacity sets the ring buffer capacity (must be a power of 2).

func WithHandler

func WithHandler(name string, h Handler, opts ...HandlerOption) Option

WithHandler registers a named handler together with its optional dependencies.

func WithMetrics

func WithMetrics(m Metrics) Option

WithMetrics sets the metrics collector.

func WithWaitStrategy

func WithWaitStrategy(ws WaitStrategy) Option

WithWaitStrategy sets the backpressure strategy.

func WithWriterCount

func WithWriterCount(n uint8) Option

WithWriterCount sets the number of concurrent producers.

type RingBuffer

type RingBuffer[T any] struct {
	// contains filtered or unexported fields
}

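RingBuffer is a generic, pre-allocated ring buffer whose capacity must be a power of 2. Get returns a pointer to the buffer slot (zero-copy). The pointer is valid only until the producer wraps around and overwrites that slot.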
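The power-of-2 requirement is what allows a sequence number to be mapped to a slot with a bit mask instead of a modulo. The sketch below illustrates that idea and the pointer-validity caveat. It is not the package's RingBuffer: `ring`, `newRing`, `get`, and `set` are hypothetical names.

```go
package main

import "fmt"

// ring is a hypothetical, minimal generic ring buffer.
type ring[T any] struct {
	slots []T
	mask  int64 // capacity - 1; only valid when capacity is a power of 2
}

// isPowerOfTwo reports whether exactly one bit of n is set.
func isPowerOfTwo(n uint32) bool { return n != 0 && n&(n-1) == 0 }

// newRing pre-allocates all slots; it rejects capacities that are not powers of 2.
func newRing[T any](capacity uint32) (*ring[T], bool) {
	if !isPowerOfTwo(capacity) {
		return nil, false
	}
	return &ring[T]{slots: make([]T, capacity), mask: int64(capacity) - 1}, true
}

// get returns a pointer into the backing array, so nothing is copied.
func (r *ring[T]) get(seq int64) *T { return &r.slots[seq&r.mask] }

// set copies v into the slot that seq maps to.
func (r *ring[T]) set(seq int64, v T) { r.slots[seq&r.mask] = v }

func main() {
	r, _ := newRing[int](8)
	r.set(3, 30)
	p := r.get(3)
	fmt.Println(*p) // 30

	// Sequence 11 maps to the same slot as 3 (11 & 7 == 3), so the value
	// behind p changes once the producer wraps around.
	r.set(11, 110)
	fmt.Println(*p) // 110

	_, ok := newRing[int](12)
	fmt.Println(ok) // false
}
```

This is why a consumer should finish with a slot, or copy the value out, before its sequence is released back to producers.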

func NewRingBuffer

func NewRingBuffer[T any](capacity uint32) (*RingBuffer[T], error)

NewRingBuffer creates a ring buffer with the given capacity (must be a power of 2).

func (*RingBuffer[T]) Capacity

func (rb *RingBuffer[T]) Capacity() uint32

Capacity returns the capacity of the ring buffer.

func (*RingBuffer[T]) Get

func (rb *RingBuffer[T]) Get(seq int64) *T

Get returns a pointer to the element at the given sequence position.

func (*RingBuffer[T]) Set

func (rb *RingBuffer[T]) Set(seq int64, value T)

Set writes a value at the given sequence position.

type SequenceBarrier

type SequenceBarrier interface {
	// Load returns the highest sequence number that is safe to read, starting from the given lower bound.
	Load(lower int64) int64
}

SequenceBarrier abstracts reading the committed or processed position from one or more sequences.
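When a handler depends on several upstream handlers, the highest sequence it may safely read is the minimum of their positions. The sketch below shows that idea against a local copy of the interface. `sequence` and `minBarrier` are hypothetical types; the README diagram mentions a compositeBarrier, but its actual implementation is not shown in this documentation.

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// SequenceBarrier mirrors the documented interface.
type SequenceBarrier interface {
	Load(lower int64) int64
}

// sequence is a hypothetical single position, such as one handler's progress.
type sequence struct{ v atomic.Int64 }

func newSequence() *sequence {
	s := &sequence{}
	s.v.Store(-1) // nothing processed yet
	return s
}

// Load ignores the lower bound and returns the current position.
func (s *sequence) Load(int64) int64 { return s.v.Load() }

// Store publishes a new position.
func (s *sequence) Store(v int64) { s.v.Store(v) }

// minBarrier is a hypothetical composite: it is only as far along as the
// slowest upstream barrier.
type minBarrier []SequenceBarrier

// Load returns the smallest upstream position. An empty barrier returns
// math.MaxInt64, so callers are expected to supply at least one upstream.
func (b minBarrier) Load(lower int64) int64 {
	lowest := int64(math.MaxInt64)
	for _, up := range b {
		if v := up.Load(lower); v < lowest {
			lowest = v
		}
	}
	return lowest
}

func main() {
	handlerB, handlerC := newSequence(), newSequence()
	gateForD := minBarrier{handlerB, handlerC} // D depends on B and C

	handlerB.Store(10)
	handlerC.Store(7)
	fmt.Println(gateForD.Load(0)) // 7

	handlerC.Store(12)
	fmt.Println(gateForD.Load(0)) // 10
}
```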

type SleepingStrategy

type SleepingStrategy struct{}

SleepingStrategy is the default strategy. Gate uses Gosched (work is about to arrive), while Idle and Reserve use Sleep.

func NewSleepingStrategy

func NewSleepingStrategy() *SleepingStrategy

func (*SleepingStrategy) Gate

func (s *SleepingStrategy) Gate(int64)

func (*SleepingStrategy) Idle

func (s *SleepingStrategy) Idle(int64)

func (*SleepingStrategy) Reserve

func (s *SleepingStrategy) Reserve(int64)

func (*SleepingStrategy) Signal

func (s *SleepingStrategy) Signal()

type WaitStrategy

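type WaitStrategy interface {
	// Gate is called when data has been committed but the upstream handler group has not finished.
	Gate(count int64)
	// Idle is called when there is no data to process.
	Idle(count int64)
	// Reserve is called when the ring buffer is full and the producer is waiting.
	Reserve(count int64)
	// Signal is called on producer Commit to wake blocked consumers.
	Signal()
}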

WaitStrategy controls the backpressure behavior of producers and consumers.
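Because WaitStrategy is an interface, a custom strategy can presumably be supplied through WithWaitStrategy. The sketch below yields for the first few waits and then sleeps, which sits between YieldingStrategy and SleepingStrategy. `spinThenSleep` is a hypothetical type, and the code assumes that `count` is the number of consecutive waits so far, which the documentation does not spell out.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

// WaitStrategy mirrors the documented interface.
type WaitStrategy interface {
	Gate(count int64)
	Idle(count int64)
	Reserve(count int64)
	Signal()
}

// spinThenSleep is a hypothetical hybrid strategy.
type spinThenSleep struct {
	spinLimit int64         // waits below this count only yield
	sleep     time.Duration // pause used once the limit is reached
	sleeps    atomic.Int64  // how often the strategy slept, for observation
}

func newSpinThenSleep(spinLimit int64, sleep time.Duration) *spinThenSleep {
	return &spinThenSleep{spinLimit: spinLimit, sleep: sleep}
}

// wait yields while the wait is young and sleeps once it drags on.
func (s *spinThenSleep) wait(count int64) {
	if count < s.spinLimit {
		runtime.Gosched()
		return
	}
	s.sleeps.Add(1)
	time.Sleep(s.sleep)
}

// Gate always yields: the data is already committed, so work is imminent.
func (s *spinThenSleep) Gate(int64) { runtime.Gosched() }

// Idle is used when no data is available.
func (s *spinThenSleep) Idle(count int64) { s.wait(count) }

// Reserve is used when the producer waits for free slots.
func (s *spinThenSleep) Reserve(count int64) { s.wait(count) }

// Signal is a no-op: nothing blocks on a condition, so there is nobody to wake.
func (s *spinThenSleep) Signal() {}

// Compile-time check that spinThenSleep satisfies WaitStrategy.
var _ WaitStrategy = (*spinThenSleep)(nil)

func main() {
	s := newSpinThenSleep(3, 50*time.Microsecond)
	for i := int64(0); i < 5; i++ {
		s.Idle(i) // yields for counts 0..2, sleeps for counts 3 and 4
	}
	fmt.Println(s.sleeps.Load()) // 2
}
```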

type YieldingStrategy

type YieldingStrategy struct{}

YieldingStrategy yields the processor while waiting. It suits low-latency workloads that share CPUs.

func NewYieldingStrategy

func NewYieldingStrategy() *YieldingStrategy

func (*YieldingStrategy) Gate

func (s *YieldingStrategy) Gate(int64)

func (*YieldingStrategy) Idle

func (s *YieldingStrategy) Idle(int64)

func (*YieldingStrategy) Reserve

func (s *YieldingStrategy) Reserve(int64)

func (*YieldingStrategy) Signal

func (s *YieldingStrategy) Signal()

Directories

Path Synopsis
example
basic command
Basic example: single producer → single consumer
batch command
Batch reserve example: claim multiple slots in one atomic operation
diamond command
Diamond DAG example:
fanout command
Fan-out example: one event dispatched to multiple independent consumers
metrics command
Metrics example: custom metrics collector for monitoring Disruptor internals
multiwriter command
Multi-writer example: 4 goroutines writing concurrently to one Disruptor
