Documentation
¶
Index ¶
Constants ¶
const ( READY = 0 // 模块的状态之就绪态 RUNNING = 1 // 模块的状态之运行态 StartErrorFormat = "start model [%s] error" CloseErrorFormat = "close model [%s] error" )
Variables ¶
var (
ClosedError = errors.New("the queue has been closed")
)
Functions ¶
This section is empty.
Types ¶
type ChanBlockStrategy ¶
type ChanBlockStrategy struct {
// contains filtered or unexported fields
}
ChanBlockStrategy chan阻塞策略
func NewChanBlockStrategy ¶
func NewChanBlockStrategy() *ChanBlockStrategy
type ConditionBlockStrategy ¶
type ConditionBlockStrategy struct {
// contains filtered or unexported fields
}
ConditionBlockStrategy condition 阻塞策略
func NewConditionBlockStrategy ¶
func NewConditionBlockStrategy() *ConditionBlockStrategy
type EventHandler ¶
type EventHandler[T any] interface { // OnEvent 用户侧实现,事件处理方法 OnEvent(t T) }
EventHandler 事件处理器接口 整个无锁队列中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理 使用泛型,通过编译阶段确定事件类型,提高性能
type Lockfree ¶
type Lockfree[T any] struct { // contains filtered or unexported fields }
Lockfree 包装类,内部包装了生产者和消费者
func NewLockfree ¶
func NewLockfree[T any](capacity int, handler EventHandler[T], blocks blockStrategy) *Lockfree[T]
NewLockfree 自定义创建消费端的Disruptor capacity:buffer的容量大小,类似于chan的大小,但要求必须是2^n,即2的指数倍,如果不是的话会被修改 handler:消费端的事件处理器 blocks:读取阻塞时的处理策略
type OSYieldBlockStrategy ¶
type OSYieldBlockStrategy struct {
}
OSYieldBlockStrategy 操作系统调度策略
func NewOSYieldWaitStrategy ¶
func NewOSYieldWaitStrategy() *OSYieldBlockStrategy
type ProcYieldBlockStrategy ¶
type ProcYieldBlockStrategy struct {
// contains filtered or unexported fields
}
ProcYieldBlockStrategy CPU空指令策略
func NewProcYieldBlockStrategy ¶
func NewProcYieldBlockStrategy(cycle uint32) *ProcYieldBlockStrategy
type Producer ¶
type Producer[T any] struct { // contains filtered or unexported fields }
Producer 生产者 核心方法是Write,通过调用Write方法可以将对象写入到队列中
func (*Producer[T]) Write ¶
Write 对象写入核心逻辑 首先会从序号产生器中获取一个序号,该序号由atomic自增,不会重复; 然后通过&运算获取该序号应该放入的位置pos; 通过循环的方式,判断对应pos位置是否可以写入内容,这个判断是通过available数组判断的; 如果无法写入则持续循环等待,直到可以写入为止,此处基于一种思想即写入的实时性,写入操作不需要等太久,因此此处是没有阻塞的, 仅仅通过调度让出的方式,进行一部分cpu让渡,防止持续占用cpu资源 获取到写入资格后将内容写入到ringbuffer,同时更新available数组,并且调用release,以便于释放消费端的阻塞等待
func (*Producer[T]) WriteByCursor ¶
WriteByCursor 根据游标写入内容,wc是调用 WriteTimeout 方法返回false时对应的写入位置, 该位置有严格的含义,不要随意填值,否则会造成整个队列异常 函数返回值:是否写入成功和是否存在error,若返回false表示写入失败,可以继续调用重复写入
func (*Producer[T]) WriteTimeout ¶
WriteTimeout 在写入的基础上设定一个时间,如果时间到了仍然没有写入则会放弃本次写入,返回写入的位置和false 使用方需要调用 WriteByCursor 来继续写入该位置,因为位置已经被占用,是必须要写的,不能跳跃性写入 在指定时间内写入成功会返回true 三个返回项:写入位置、是否写入成功及是否有error
func (*Producer[T]) WriteWindow ¶
WriteWindow 写入窗口 描述当前可写入的状态,如果不能写入则返回零值,如果可以写入则返回写入窗口大小 由于执行时不加锁,所以该结果是不可靠的,仅用于在并发环境很高的情况下,进行丢弃行为
type SchedBlockStrategy ¶
type SchedBlockStrategy struct {
}
SchedBlockStrategy 调度等待策略 调用runtime.Gosched()方法使当前 g 主动让出 cpu 资源。
type SleepBlockStrategy ¶
type SleepBlockStrategy struct {
// contains filtered or unexported fields
}
SleepBlockStrategy 休眠等待策略 调用 Sleep 方法使当前 g 主动让出 cpu 资源。 sleep poll 参考值: 轮询时长为 10us 时,cpu 开销约 2-3% 左右。 轮询时长为 5us 时,cpu 开销约在 10% 左右。 轮询时长小于 5us 时,cpu 开销接近 100% 满载。
func NewSleepBlockStrategy ¶
func NewSleepBlockStrategy(wait time.Duration) *SleepBlockStrategy












