Documentation ¶
Index ¶
Constants ¶
View Source
const ( MasterRunner = true AssistRunner = false ErrTag = "PC-ErrTag" DebugTag = "PC-DebugTag" PanicTag = "PC-PanicTag" CacheType = ContainerType("cache") ChannelType = ContainerType("channel") )
View Source
const (
// (秒)
DefaultReadTimeout = 3
)
Variables ¶
Functions ¶
This section is empty.
Types ¶
type CBaseInfo ¶
type CBaseInfo struct { // 记录内部日志信息 Record func(tag string, msg interface{}) // contains filtered or unexported fields }
基本属性
type Config ¶
type Config struct { // * channel型(ChannelType):基于缓冲channel队列实现的。 // * cache型(CacheType):基于redis型队列实现的。 Type ContainerType // 消费信息的函数 // 信息体最终落到此函数处理 // 由用户自定义函数实体内容 ConsumeFunc func(IMessage) // 消息队列长度 // 如果为channel型,此变量为int类型。请自行处理不一致。 MsgLen int64 // 空闲存活时间(针对AssistRunner),单位为秒(s) // 如果是redis型的,此值等同于ReadTimeout。 AssistIdleKeepAlive int64 // 记录内部日志信息 Record func(tag string, msg interface{}) // redis 队列名字(唯一标识) // 针对redis型 CacheListKey string // 把redis队列中的字符串信息解析成相应的信息结构体 // 针对redis型 Unmarshal func([]byte) (IMessage, error) // 把信息结构体序列化成字符串,以便保存到redis队列中 // 针对redis型 Marshal func(IMessage) ([]byte, error) // redis 操作实例 实现接口IRedis // 针对redis型 CacheInstance ICache }
初始配置项
type Container ¶
type Container struct { CBaseInfo // contains filtered or unexported fields }
容器 实现基于缓冲channel队列的生产/消费模式
1、Produce(msg interface{}) 生产信息,把消息放入消息列表中。 2、Consume() 消费消息。
开启主线程一直消费消息,如果消息过多时(消息队列满),则会开启协助协程消费消息。 协助协程将会在消息队列持续为空一段的时间后关闭.
func NewContainerPC ¶
新建生产/消费模式容器
func (*Container) SetAssistIdleKeepAlive ¶
设置空闲存活时间
type ContainerRedis ¶
type ContainerRedis struct { CBaseInfo // 把Cache队列中的字符串信息解析成相应的信息结构体 Unmarshal func([]byte) (IMessage, error) // 把信息结构体序列化成字符串,以便保存到Cache(如:redis)队列中 Marshal func(IMessage) ([]byte, error) // 读取信息列表超时时间(秒) // 如果列表没有元素会阻塞列表直到等待超时 ReadTimeout int64 // 信息队列长度。 // 如果为零,代表程序不限制(则以Cache(如:redis)的队列最大限制为准) // 但不保证真正Cache(如:redis)队列长度一定会小于此值。 // 另外,此值关系到是否有协助协程产生。 MsgLen int64 // cache 队列名字(唯一标识) CacheListKey string // contains filtered or unexported fields }
以Cache(如:redis)队列作为信息队列
func NewContainerCachePC ¶
func NewContainerCachePC(cacheInstance ICache, consumeFunc func(IMessage), unmarshal func([]byte) (IMessage, error), marshal func(IMessage) ([]byte, error)) (*ContainerRedis, error)
新建生产/消费模式容器
func (*ContainerRedis) NumGoroutine ¶
func (cr *ContainerRedis) NumGoroutine() (master, assistActive int64)
type ContainerType ¶
type ContainerType string
type ICache ¶
type ICache interface { // BLPOP key1 timeout(秒) // 移出并获取列表的第一个元素, // 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 BLPop(key string, timeout int64) (map[string]string, error) // 在列表尾部中添加一个或多个值 RPush(key string, values ...interface{}) (int64, error) // 获取列表长度 LLen(key string) (int64, error) }
缓存Cache接口
type IContainer ¶
type IContainer interface { // 生产消息 Produce(msg IMessage) error // 消费消息 Consume() error // 消费消息的协程数目 NumGoroutine() (master, assistActive int64) }
容器接口
生产/消费模式 通过调用`Consume()`可以产生一个主要消费协程。主协程将一直存在,在没有消息体处理的时候进入阻塞等待。
可以通过调用`Consume()`的次数来控制产生主协程的数目。
当消息体队列的已满,则会产生协助协程消费消息体。协助协程在消息体猛涨时候出现,在没有消息体处理的时候
阻塞等待一定时间后将被销毁。
1、Produce(msg interface{}) 生产信息,把消息放入消息列表中。 2、Consume() 消费消息。
Source Files ¶
Click to show internal directories.
Click to hide internal directories.