pubsub

package
v0.0.0-...-2aa8555 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2021 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BytesCompare

func BytesCompare(a, b []byte, want Symbol) bool

BytesCompare 比较两个byte数组,如果a, b满足want指定的结果则返回true,其它返回false

func NewSyncMap

func NewSyncMap() *syncMap

NewSyncMap create a new map

func PutItem

func PutItem(item *Item)

PutItem put item to pool

Types

type BigSet

type BigSet struct {
	// contains filtered or unexported fields
}

BigSet the collection of items provides concurrent safe additions, deletions, and traversals

func NewBigSet

func NewBigSet(discardThreshold int) *BigSet

NewBigSet new a object

func (*BigSet) Delete

func (bs *BigSet) Delete(c interface{})

Delete elements, delete in mapItem, and add delQueue List is deleted if delQueue reaches a certain length

func (*BigSet) Destory

func (bs *BigSet) Destory()

Destory bigset

func (*BigSet) Put

func (bs *BigSet) Put(c interface{})

Put elements, map and list need to be added Does not add or return an error if it already exists

func (*BigSet) Scan

func (bs *BigSet) Scan(handlers func(c interface{}) error)

Scan Iterates through the handlers callback function

func (*BigSet) TotalSize

func (bs *BigSet) TotalSize() int

TotalSize Total number of current elements: the number of items that are valid and the number of items that are invalid for deletion

func (*BigSet) ValidSize

func (bs *BigSet) ValidSize() int

ValidSize valid number of current elements

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache 使用循环队列缓存pull拉取回来的消息 外部在调用`getResp`或`putResp`的时候需要先使用cache.mu进行加锁 buf按照前闭后开的方式存储消息,也就是[front, rear) 其中 buf[front-1] 存储用户拉取消息时的offset

type Item

type Item struct {
	Prev *Item
	Next *Item
	// contains filtered or unexported fields
}

Item list a node

func GetItem

func GetItem(c interface{}) *Item

GetItem get a item

func NewItem

func NewItem() *Item

NewItem create a new item

type NotifyHandler

type NotifyHandler func(c interface{}, topic string, last []byte)

type Pubsub

type Pubsub struct {
	// contains filtered or unexported fields
}

Pubsub use save the topic object for service+topic

func NewPubsub

func NewPubsub(config *conf.Pubsub, ctx context.Context, pushcli *pushcli.PushCli) *Pubsub

NewPubsub create new a pubsub object

func (*Pubsub) AddRoute

func (pubsub *Pubsub) AddRoute(req *ph.AddRouteReq, c interface{}) error

Put find a topic object from topicMap, create new a topic object if not exist then send addroute request

func (*Pubsub) Notify

func (pubsub *Pubsub) Notify(req *conn.NotifyReq, handler NotifyHandler)

Notify call topic the function of notify

func (*Pubsub) Pull

func (pubsub *Pubsub) Pull(ctx context.Context, req *ph.PullReq) (*ph.PullResp, error)

Pull call topic the function of pull

func (*Pubsub) RemoveRoute

func (pubsub *Pubsub) RemoveRoute(req *ph.RemoveRouteReq, c interface{}) error

Delete delete a topic object from topicMap if the num of topic size is zero then send remove request

func (*Pubsub) Scan

func (pubsub *Pubsub) Scan(service, topic string, f func(interface{}) error)

Scan call topic the function of scan

func (*Pubsub) Size

func (pubsub *Pubsub) Size(service, topic string) int

Size return the topic object is size

type Subscribers

type Subscribers struct {
	// contains filtered or unexported fields
}

Subscribers ensure concurrency security on add route and remove route pull data asynchronously and store cache manage conn set

func NewSubscribers

func NewSubscribers(ctx context.Context, cf *conf.Topic, pushcli *pushcli.PushCli) *Subscribers

NewSubscribers create new a topic object

func (*Subscribers) AddRoute

func (t *Subscribers) AddRoute(req *ph.AddRouteReq) error

AddRoute send addroute requst if the frist time topic is created guaranteed topic refers to adding a route once

func (*Subscribers) Count

func (t *Subscribers) Count() int32

func (*Subscribers) Decr

func (t *Subscribers) Decr() int32

func (*Subscribers) DelRoute

func (t *Subscribers) DelRoute(req *ph.RemoveRouteReq) error

DelRoute send removeroute requst if the addroute is done

func (*Subscribers) Deregister

func (t *Subscribers) Deregister(c interface{})

func (*Subscribers) Destory

func (t *Subscribers) Destory()

TODO Destory

func (*Subscribers) GetCache

func (t *Subscribers) GetCache(ctx context.Context, req *ph.PullReq) (*ph.PullResp, error)

GetCache

func (*Subscribers) Incr

func (t *Subscribers) Incr() int32

func (*Subscribers) Notify

func (t *Subscribers) Notify(nr *conn.NotifyReq, notifyHandler NotifyHandler)

Notify all conns and pull message

func (*Subscribers) Pull

func (t *Subscribers) Pull(ctx context.Context, req *ph.PullReq) (*ph.PullResp, error)

Pull messages from cache or pushd

func (*Subscribers) Register

func (t *Subscribers) Register(c interface{})

func (*Subscribers) Scan

func (t *Subscribers) Scan(handler func(c interface{}) error)

type Symbol

type Symbol byte
const (
	Equal Symbol = iota
	Greater
	Less
	GreaterOrEqual
	LessOrEqual
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL