Documentation ¶
Index ¶
- Constants
- Variables
- type BlockingStrategy
- type BusySpinStrategy
- type Disruptor
- func (d *Disruptor[T]) Close() error
- func (d *Disruptor[T]) Commit(lower, upper int64)
- func (d *Disruptor[T]) Drain(ctx context.Context) error
- func (d *Disruptor[T]) Listen()
- func (d *Disruptor[T]) Reserve(count uint32) (int64, error)
- func (d *Disruptor[T]) RingBuffer() *RingBuffer[T]
- func (d *Disruptor[T]) TryReserve(count uint32) (int64, error)
- type Handler
- type HandlerOption
- type Metrics
- type NoopMetrics
- func (NoopMetrics) BufferUsage(used, capacity int64)
- func (NoopMetrics) CommitCount(int64)
- func (NoopMetrics) GateCount(string, int64)
- func (NoopMetrics) HandleCount(string, int64)
- func (NoopMetrics) HandleEvents(string, int64)
- func (NoopMetrics) IdleCount(string, int64)
- func (NoopMetrics) ReserveCount(int64)
- func (NoopMetrics) ReserveWaitCount(int64)
- type Option
- type RingBuffer
- type SequenceBarrier
- type SleepingStrategy
- type WaitStrategy
- type YieldingStrategy
Constants ¶
View Source
const CacheLineBytes = 64
CacheLineBytes 是当前架构的 CPU 缓存行大小(字节)
Variables ¶
View Source
var (
	ErrInvalidReservation = errors.New("seqflow: invalid reservation size (zero or exceeds capacity)")
	ErrClosed             = errors.New("seqflow: disruptor is closed")
	ErrInvalidCapacity    = errors.New("seqflow: capacity must be a positive power of 2")
	ErrNoHandlers         = errors.New("seqflow: at least one handler is required")
	ErrDuplicateHandler   = errors.New("seqflow: duplicate handler name")
	ErrUnknownDependency  = errors.New("seqflow: unknown dependency in DependsOn")
	ErrCyclicDependency   = errors.New("seqflow: cyclic dependency detected")
)
Functions ¶
This section is empty.
Types ¶
type BlockingStrategy ¶
type BlockingStrategy struct {
// contains filtered or unexported fields
}
BlockingStrategy 阻塞策略,使用 sync.Cond 最小化 CPU 占用,延迟较高。
func NewBlockingStrategy ¶
func NewBlockingStrategy() *BlockingStrategy
func (*BlockingStrategy) Gate ¶
func (s *BlockingStrategy) Gate(int64)
func (*BlockingStrategy) Idle ¶
func (s *BlockingStrategy) Idle(int64)
func (*BlockingStrategy) Reserve ¶
func (s *BlockingStrategy) Reserve(int64)
func (*BlockingStrategy) Signal ¶
func (s *BlockingStrategy) Signal()
type BusySpinStrategy ¶
type BusySpinStrategy struct{}
BusySpinStrategy 忙等策略,不让出 CPU。适用于极低延迟、独占 CPU 核心的场景。
func NewBusySpinStrategy ¶
func NewBusySpinStrategy() *BusySpinStrategy
func (*BusySpinStrategy) Gate ¶
func (s *BusySpinStrategy) Gate(int64)
func (*BusySpinStrategy) Idle ¶
func (s *BusySpinStrategy) Idle(int64)
func (*BusySpinStrategy) Reserve ¶
func (s *BusySpinStrategy) Reserve(int64)
func (*BusySpinStrategy) Signal ¶
func (s *BusySpinStrategy) Signal()
type Disruptor ¶
type Disruptor[T any] struct {
// contains filtered or unexported fields
}
Disruptor 是序列驱动流的顶层容器。
核心优化:
- 预计算剩余容量:快路径仅 1 次比较
- 零接口分发:单写者字段直接嵌入结构体
- 零原子读:关闭时毒化 remainingCapacity 迫使进入慢路径
func (*Disruptor[T]) Reserve ¶
func (d *Disruptor[T]) Reserve(count uint32) (int64, error)
Reserve 在环形缓冲区中预留槽位。
单写者快路径(最常见场景):
- 1 次 nil 检查(分支预测命中)
- 1 次比较(remainingCapacity)
- 2 次加减法
- 零原子操作,零接口分发,零 error 构造
func (*Disruptor[T]) RingBuffer ¶
func (d *Disruptor[T]) RingBuffer() *RingBuffer[T]
RingBuffer 返回底层环形缓冲区
type Metrics ¶
type Metrics interface {
// ReserveCount 生产者 Reserve 调用次数
ReserveCount(count int64)
// CommitCount 生产者 Commit 调用次数
CommitCount(count int64)
// ReserveWaitCount Reserve 进入慢路径的次数
ReserveWaitCount(count int64)
// HandleCount 指定 handler 处理批次数
HandleCount(name string, count int64)
// HandleEvents 指定 handler 处理事件总数
HandleEvents(name string, count int64)
// IdleCount 指定 handler 空闲等待次数
IdleCount(name string, count int64)
// GateCount 指定 handler 被 gate 阻塞次数
GateCount(name string, count int64)
// BufferUsage 缓冲区使用率快照
BufferUsage(used, capacity int64)
}
Metrics 收集可选的性能指标。当为 nil 时,热路径中通过 nil 检查跳过所有指标调用,零开销。
type NoopMetrics ¶
type NoopMetrics struct{}
NoopMetrics 是 Metrics 的空实现
func (NoopMetrics) BufferUsage ¶
func (NoopMetrics) BufferUsage(used, capacity int64)
func (NoopMetrics) CommitCount ¶
func (NoopMetrics) CommitCount(int64)
func (NoopMetrics) GateCount ¶
func (NoopMetrics) GateCount(string, int64)
func (NoopMetrics) HandleCount ¶
func (NoopMetrics) HandleCount(string, int64)
func (NoopMetrics) HandleEvents ¶
func (NoopMetrics) HandleEvents(string, int64)
func (NoopMetrics) IdleCount ¶
func (NoopMetrics) IdleCount(string, int64)
func (NoopMetrics) ReserveCount ¶
func (NoopMetrics) ReserveCount(int64)
func (NoopMetrics) ReserveWaitCount ¶
func (NoopMetrics) ReserveWaitCount(int64)
type Option ¶
type Option func(*config)
Option 配置 Disruptor
func WithHandler ¶
func WithHandler(name string, h Handler, opts ...HandlerOption) Option
WithHandler 注册一个命名的 handler 及其可选依赖
type RingBuffer ¶
type RingBuffer[T any] struct {
// contains filtered or unexported fields
}
RingBuffer 是泛型的预分配环形缓冲区,容量必须是 2 的幂。 Get 返回指向缓冲区槽位的指针(零拷贝),该指针仅在生产者回绕覆写该槽位前有效。
func NewRingBuffer ¶
func NewRingBuffer[T any](capacity uint32) (*RingBuffer[T], error)
NewRingBuffer 创建指定容量的环形缓冲区(必须是 2 的幂)
type SequenceBarrier ¶
SequenceBarrier 抽象了从一个或多个序列读取已提交/已处理位置的操作。
type SleepingStrategy ¶
type SleepingStrategy struct{}
SleepingStrategy 默认策略。Gate 使用 Gosched(工作即将到来),Idle/Reserve 使用 Sleep。
func NewSleepingStrategy ¶
func NewSleepingStrategy() *SleepingStrategy
func (*SleepingStrategy) Gate ¶
func (s *SleepingStrategy) Gate(int64)
func (*SleepingStrategy) Idle ¶
func (s *SleepingStrategy) Idle(int64)
func (*SleepingStrategy) Reserve ¶
func (s *SleepingStrategy) Reserve(int64)
func (*SleepingStrategy) Signal ¶
func (s *SleepingStrategy) Signal()
type WaitStrategy ¶
type WaitStrategy interface {
// Gate 当数据已提交但上游 Handler 组未完成时调用
Gate(count int64)
// Idle 当没有数据可处理时调用
Idle(count int64)
// Reserve 当环形缓冲区已满、生产者等待时调用
Reserve(count int64)
// Signal 在生产者 Commit 时调用,用于唤醒阻塞的消费者
Signal()
}
WaitStrategy 控制生产者和消费者的背压行为
type YieldingStrategy ¶
type YieldingStrategy struct{}
YieldingStrategy 让出处理器策略。适用于低延迟、共享 CPU 的场景。
func NewYieldingStrategy ¶
func NewYieldingStrategy() *YieldingStrategy
func (*YieldingStrategy) Gate ¶
func (s *YieldingStrategy) Gate(int64)
func (*YieldingStrategy) Idle ¶
func (s *YieldingStrategy) Idle(int64)
func (*YieldingStrategy) Reserve ¶
func (s *YieldingStrategy) Reserve(int64)
func (*YieldingStrategy) Signal ¶
func (s *YieldingStrategy) Signal()
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| example | |
| example/basic (command) | Basic example: single producer → single consumer |
| example/batch (command) | Batch reserve example: claim multiple slots in one atomic operation |
| example/diamond (command) | Diamond DAG example: |
| example/fanout (command) | Fan-out example: one event dispatched to multiple independent consumers |
| example/metrics (command) | Metrics example: custom metrics collector for monitoring Disruptor internals |
| example/multiwriter (command) | Multi-writer example: 4 goroutines writing concurrently to one Disruptor |
Click to show internal directories.
Click to hide internal directories.