Documentation ¶
Index ¶
- func BytesCompare(a, b []byte, want Symbol) bool
- func NewSyncMap() *syncMap
- func PutItem(item *Item)
- type BigSet
- type Cache
- type Item
- type NotifyHandler
- type Pubsub
- func (pubsub *Pubsub) AddRoute(req *ph.AddRouteReq, c interface{}) error
- func (pubsub *Pubsub) Notify(req *conn.NotifyReq, handler NotifyHandler)
- func (pubsub *Pubsub) Pull(ctx context.Context, req *ph.PullReq) (*ph.PullResp, error)
- func (pubsub *Pubsub) RemoveRoute(req *ph.RemoveRouteReq, c interface{}) error
- func (pubsub *Pubsub) Scan(service, topic string, f func(interface{}) error)
- func (pubsub *Pubsub) Size(service, topic string) int
- type Subscribers
- func (t *Subscribers) AddRoute(req *ph.AddRouteReq) error
- func (t *Subscribers) Count() int32
- func (t *Subscribers) Decr() int32
- func (t *Subscribers) DelRoute(req *ph.RemoveRouteReq) error
- func (t *Subscribers) Deregister(c interface{})
- func (t *Subscribers) Destory()
- func (t *Subscribers) GetCache(ctx context.Context, req *ph.PullReq) (*ph.PullResp, error)
- func (t *Subscribers) Incr() int32
- func (t *Subscribers) Notify(nr *conn.NotifyReq, notifyHandler NotifyHandler)
- func (t *Subscribers) Pull(ctx context.Context, req *ph.PullReq) (*ph.PullResp, error)
- func (t *Subscribers) Register(c interface{})
- func (t *Subscribers) Scan(handler func(c interface{}) error)
- type Symbol
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BytesCompare ¶
BytesCompare 比较两个byte数组,如果a, b满足want指定的结果则返回true,其它返回false
Types ¶
type BigSet ¶
type BigSet struct {
// contains filtered or unexported fields
}
BigSet the collection of items provides concurrent safe additions, deletions, and traversals
func (*BigSet) Delete ¶
func (bs *BigSet) Delete(c interface{})
Delete elements, delete in mapItem, and add delQueue List is deleted if delQueue reaches a certain length
func (*BigSet) Put ¶
func (bs *BigSet) Put(c interface{})
Put elements, map and list need to be added Does not add or return an error if it already exists
type Cache ¶
type Cache struct {
// contains filtered or unexported fields
}
Cache 使用循环队列缓存pull拉取回来的消息 外部在调用`getResp`或`putResp`的时候需要先使用cache.mu进行加锁 buf按照前闭后开的方式存储消息,也就是[front, rear) 其中 buf[front-1] 存储用户拉取消息时的offset
type NotifyHandler ¶
type Pubsub ¶
type Pubsub struct {
// contains filtered or unexported fields
}
Pubsub use save the topic object for service+topic
func (*Pubsub) AddRoute ¶
func (pubsub *Pubsub) AddRoute(req *ph.AddRouteReq, c interface{}) error
Put find a topic object from topicMap, create new a topic object if not exist then send addroute request
func (*Pubsub) Notify ¶
func (pubsub *Pubsub) Notify(req *conn.NotifyReq, handler NotifyHandler)
Notify call topic the function of notify
func (*Pubsub) RemoveRoute ¶
func (pubsub *Pubsub) RemoveRoute(req *ph.RemoveRouteReq, c interface{}) error
Delete delete a topic object from topicMap if the num of topic size is zero then send remove request
type Subscribers ¶
type Subscribers struct {
// contains filtered or unexported fields
}
Subscribers ensure concurrency security on add route and remove route pull data asynchronously and store cache manage conn set
func NewSubscribers ¶
NewSubscribers create new a topic object
func (*Subscribers) AddRoute ¶
func (t *Subscribers) AddRoute(req *ph.AddRouteReq) error
AddRoute send addroute requst if the frist time topic is created guaranteed topic refers to adding a route once
func (*Subscribers) Count ¶
func (t *Subscribers) Count() int32
func (*Subscribers) Decr ¶
func (t *Subscribers) Decr() int32
func (*Subscribers) DelRoute ¶
func (t *Subscribers) DelRoute(req *ph.RemoveRouteReq) error
DelRoute send removeroute requst if the addroute is done
func (*Subscribers) Deregister ¶
func (t *Subscribers) Deregister(c interface{})
func (*Subscribers) Incr ¶
func (t *Subscribers) Incr() int32
func (*Subscribers) Notify ¶
func (t *Subscribers) Notify(nr *conn.NotifyReq, notifyHandler NotifyHandler)
Notify all conns and pull message
func (*Subscribers) Register ¶
func (t *Subscribers) Register(c interface{})
func (*Subscribers) Scan ¶
func (t *Subscribers) Scan(handler func(c interface{}) error)