Documentation
¶
Index ¶
- Constants
- Variables
- func DateTime() string
- func GetServiceNames() []string
- func Now() time.Time
- func NowString() string
- func RegisterPacketMaker(name string, f PacketMaker)
- func RegisterService(s Service)
- func StartClock()
- func StopClock()
- func WallClock() *datetime.Clock
- type ConcurrentEndpointMap
- func (m *ConcurrentEndpointMap) Clear()
- func (m *ConcurrentEndpointMap) Foreach(action func(Endpoint) bool)
- func (m *ConcurrentEndpointMap) Get(node NodeID) Endpoint
- func (m *ConcurrentEndpointMap) Has(node NodeID) bool
- func (m *ConcurrentEndpointMap) IsEmpty() bool
- func (m *ConcurrentEndpointMap) IterBuffered() <-chan Tuple
- func (m *ConcurrentEndpointMap) Keys() []NodeID
- func (m *ConcurrentEndpointMap) Pop(node NodeID) Endpoint
- func (m *ConcurrentEndpointMap) Put(node NodeID, endpoint Endpoint)
- func (m *ConcurrentEndpointMap) PutIfAbsent(node NodeID, endpoint Endpoint)
- func (m *ConcurrentEndpointMap) Remove(node NodeID)
- func (m *ConcurrentEndpointMap) Size() int
- func (m *ConcurrentEndpointMap) Values() []Endpoint
- type Endpoint
- type EndpointFlag
- type EndpointHashMap
- func (m *EndpointHashMap) Clear()
- func (m *EndpointHashMap) Foreach(action func(Endpoint) bool)
- func (m *EndpointHashMap) Get(node NodeID) Endpoint
- func (m *EndpointHashMap) Has(node NodeID) bool
- func (m *EndpointHashMap) IsEmpty() bool
- func (m *EndpointHashMap) Keys() []NodeID
- func (m *EndpointHashMap) Pop(node NodeID) Endpoint
- func (m *EndpointHashMap) Put(node NodeID, endpoint Endpoint)
- func (m *EndpointHashMap) PutIfAbsent(node NodeID, endpoint Endpoint)
- func (m *EndpointHashMap) Remove(node NodeID)
- func (m *EndpointHashMap) Size() int
- func (m *EndpointHashMap) Values() []Endpoint
- type EndpointMap
- type EndpointMapShard
- type IPacket
- type MessageEndpoint
- type NodeID
- type NodeIDSet
- type PacketFlag
- type PacketHandler
- type PacketMaker
- type Service
- type ServiceContext
- func (c *ServiceContext) ASend(pkt IPacket) error
- func (c *ServiceContext) AddFinalizer(action func())
- func (c *ServiceContext) Close()
- func (c *ServiceContext) InboundQueue() chan<- IPacket
- func (c *ServiceContext) Instance() Service
- func (c *ServiceContext) MessageQueue() <-chan IPacket
- func (c *ServiceContext) QuitDone() <-chan struct{}
- func (c *ServiceContext) RunID() string
- func (c *ServiceContext) Send(pkt IPacket)
- func (c *ServiceContext) SetInstance(inst Service)
- func (c *ServiceContext) StartTime() time.Time
- type State
- type Tuple
Constants ¶
View Source
const ( NodeServiceShift = 16 NodeTypeShift = 31 NodeServiceMask = 0x00FF0000 NodeInstanceMask = 0x0000FFFF )
View Source
const ( SERVICE_ALL = 0xFF // 所有服务 INSTANCE_ALL = 0xFFFF // 所有实例 )
View Source
const ( StateInit = 0 StateStarting = 1 StateRunning = 2 StateShuttingDown = 3 StateTerminated = 4 )
View Source
const ClockEpoch int64 = 1577836800 // 2020-01-01 00:00:00 UTC
epoch of clock
View Source
const EndpointMapShardCount = 16
View Source
const VERSION = "0.3.8"
版本号
Variables ¶
View Source
var ErrContextInboundQueueFull = errors.New("context inbound queue full")
Functions ¶
func RegisterPacketMaker ¶ added in v0.3.8
func RegisterPacketMaker(name string, f PacketMaker)
Types ¶
type ConcurrentEndpointMap ¶ added in v0.1.32
type ConcurrentEndpointMap struct {
// contains filtered or unexported fields
}
线程安全的EndpointMap,适合数据量很大的场景
func NewConcurrentEndpointMap ¶ added in v0.1.32
func NewConcurrentEndpointMap() *ConcurrentEndpointMap
func (*ConcurrentEndpointMap) Clear ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Clear()
func (*ConcurrentEndpointMap) Foreach ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Foreach(action func(Endpoint) bool)
action应该对map是read-only
func (*ConcurrentEndpointMap) Get ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Get(node NodeID) Endpoint
func (*ConcurrentEndpointMap) Has ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Has(node NodeID) bool
func (*ConcurrentEndpointMap) IsEmpty ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) IsEmpty() bool
func (*ConcurrentEndpointMap) IterBuffered ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) IterBuffered() <-chan Tuple
IterBuffered returns a buffered iterator which could be used in a for range loop.
func (*ConcurrentEndpointMap) Keys ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Keys() []NodeID
func (*ConcurrentEndpointMap) Pop ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Pop(node NodeID) Endpoint
Pop removes an element from the map and returns it
func (*ConcurrentEndpointMap) Put ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Put(node NodeID, endpoint Endpoint)
func (*ConcurrentEndpointMap) PutIfAbsent ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) PutIfAbsent(node NodeID, endpoint Endpoint)
func (*ConcurrentEndpointMap) Remove ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Remove(node NodeID)
func (*ConcurrentEndpointMap) Size ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Size() int
func (*ConcurrentEndpointMap) Values ¶ added in v0.1.32
func (m *ConcurrentEndpointMap) Values() []Endpoint
type Endpoint ¶
type Endpoint interface {
MessageEndpoint
// 原始连接对象
RawConn() net.Conn
// 发送/接收计数数据
Stats() *stats.Stats
ErrorChan() <-chan error
// 开启read/write线程
Go(EndpointFlag)
// 设置加解密
SetEncryptPair(cipher.BlockCryptor, cipher.BlockCryptor)
}
网络端点
type EndpointFlag ¶
type EndpointFlag uint32
开启reader/writer标记
const ( EndpointReader EndpointFlag = 0x01 // 只开启reader EndpointWriter EndpointFlag = 0x02 // 只开启writer EndpointReadWriter EndpointFlag = 0x03 // 开启reader和writer )
type EndpointHashMap ¶ added in v0.1.32
type EndpointHashMap struct {
// contains filtered or unexported fields
}
线程安全的EndpointMap,适合数据量不是很大的场景
func NewEndpointHashMap ¶ added in v0.1.32
func NewEndpointHashMap() *EndpointHashMap
func (*EndpointHashMap) Clear ¶ added in v0.1.32
func (m *EndpointHashMap) Clear()
func (*EndpointHashMap) Foreach ¶ added in v0.1.32
func (m *EndpointHashMap) Foreach(action func(Endpoint) bool)
action应该对map是read-only
func (*EndpointHashMap) Get ¶ added in v0.1.32
func (m *EndpointHashMap) Get(node NodeID) Endpoint
func (*EndpointHashMap) Has ¶ added in v0.1.32
func (m *EndpointHashMap) Has(node NodeID) bool
func (*EndpointHashMap) IsEmpty ¶ added in v0.1.32
func (m *EndpointHashMap) IsEmpty() bool
func (*EndpointHashMap) Keys ¶ added in v0.1.32
func (m *EndpointHashMap) Keys() []NodeID
func (*EndpointHashMap) Pop ¶ added in v0.1.32
func (m *EndpointHashMap) Pop(node NodeID) Endpoint
Pop removes an element from the map and returns it
func (*EndpointHashMap) Put ¶ added in v0.1.32
func (m *EndpointHashMap) Put(node NodeID, endpoint Endpoint)
func (*EndpointHashMap) PutIfAbsent ¶ added in v0.1.32
func (m *EndpointHashMap) PutIfAbsent(node NodeID, endpoint Endpoint)
func (*EndpointHashMap) Remove ¶ added in v0.1.32
func (m *EndpointHashMap) Remove(node NodeID)
func (*EndpointHashMap) Size ¶ added in v0.1.32
func (m *EndpointHashMap) Size() int
func (*EndpointHashMap) Values ¶ added in v0.1.32
func (m *EndpointHashMap) Values() []Endpoint
type EndpointMap ¶
type EndpointMap interface {
Size() int
Has(node NodeID) bool
Get(node NodeID) Endpoint
IsEmpty() bool
Keys() []NodeID
Values() []Endpoint
Foreach(func(Endpoint) bool)
Put(node NodeID, endpoint Endpoint)
PutIfAbsent(node NodeID, endpoint Endpoint)
Remove(node NodeID)
Pop(node NodeID) Endpoint
Clear()
}
线程安全的Endpoint字典
type EndpointMapShard ¶ added in v0.1.32
func (*EndpointMapShard) Clear ¶ added in v0.1.32
func (s *EndpointMapShard) Clear()
func (*EndpointMapShard) Range ¶ added in v0.1.32
func (s *EndpointMapShard) Range(action func(Endpoint) bool)
type IPacket ¶
type IPacket interface {
// 消息命令,即如何执行消息
Command() int32
SetCommand(int32)
// 消息ID,即如何解析消息body
MsgID() uint32
SetMsgID(uint32)
// 会话序号
Seq() uint16
SetSeq(uint16)
// 消息Flag标记
Flags() PacketFlag
SetFlags(PacketFlag)
// 消息目标节点
Node() NodeID
SetNode(NodeID)
// 引用节点
Refers() []NodeID
SetRefers([]NodeID)
AddRefers(...NodeID)
// 消息错误码
Errno() int32
SetErrno(ec int32)
// 绑定的endpoint
Endpoint() MessageEndpoint
SetEndpoint(MessageEndpoint)
// 消息body,仅支持int32/int64/float64/string/bytes/proto.Message类型
Body() interface{}
SetBody(v interface{})
// 编码/解码
EncodeToBytes() []byte
DecodeTo(msg proto.Message) error
AutoDecode() error
// 响应ack消息
ReplyAny(body interface{}) error
Reply(ack proto.Message) error
// 响应错误码
Refuse(errno int32) error
// 拷贝和重置
Clone() IPacket
Reset()
// pool回收
Recycle()
}
定义应用层消息接口
type MessageEndpoint ¶
type MessageEndpoint interface {
// 节点ID
NodeID() NodeID
SetNodeID(NodeID)
// 远端地址
RemoteAddr() string
// 发送消息
SendPacket(IPacket) error
// 关闭读/写
Close() error
ForceClose(error)
IsRunning() bool
// 绑定自定义数据
SetUserData(interface{})
UserData() interface{}
}
绑定到消息上的endpoint
type NodeID ¶
type NodeID uint32
节点ID 一个32位整数表示的节点号,用以标识一个service(最高位为0),或者一个客户端session(最高位为1) 如果是服务编号:8位服务编号,16位服务实例编号
服务实例二进制布局 -------------------------------------- | reserved | service | instance | -------------------------------------- 32 24 16 0
type PacketFlag ¶
type PacketFlag uint16
消息标志位
const ( // 低8位用于表达一些传输flag PFlagCompressed PacketFlag = 0x01 // 压缩 PFlagEncrypted PacketFlag = 0x02 // 加密 PFlagError PacketFlag = 0x04 // 错误标记 PFlagRPC PacketFlag = 0x08 // 远程调用 PFlagHashCmd PacketFlag = 0x10 // Cmd是字符串hash // 高8位用于server之间通信使用 PFlagMulticast PacketFlag = 0x0100 // 组播消息 PFlagRoute PacketFlag = 0x0200 // 路由消息 )
func (PacketFlag) Clear ¶ added in v0.1.21
func (g PacketFlag) Clear(n PacketFlag) PacketFlag
func (PacketFlag) Has ¶ added in v0.1.21
func (g PacketFlag) Has(n PacketFlag) bool
type PacketMaker ¶ added in v0.3.6
type PacketMaker func() IPacket
func GetPacketMaker ¶ added in v0.3.8
func GetPacketMaker(name string) PacketMaker
type Service ¶
type Service interface {
Type() uint8
Name() string
// 节点ID
NodeID() NodeID
SetNodeID(id NodeID)
// 当前状态
State() int32
SetState(int32)
// 上下文对象
Context() *ServiceContext
// 初始化
Init(context.Context, *ServiceContext) error
// 启动服务
Startup(context.Context) error
}
一个Service代表一个服务
type ServiceContext ¶
type ServiceContext struct {
// contains filtered or unexported fields
}
服务的上下文
func NewServiceContext ¶
func NewServiceContext(queueSize int) *ServiceContext
func (*ServiceContext) ASend ¶ added in v0.2.5
func (c *ServiceContext) ASend(pkt IPacket) error
非阻塞发送
func (*ServiceContext) AddFinalizer ¶ added in v0.1.28
func (c *ServiceContext) AddFinalizer(action func())
func (*ServiceContext) InboundQueue ¶ added in v0.1.6
func (c *ServiceContext) InboundQueue() chan<- IPacket
消息队列,仅接收
func (*ServiceContext) Instance ¶ added in v0.1.2
func (c *ServiceContext) Instance() Service
service实例
func (*ServiceContext) MessageQueue ¶
func (c *ServiceContext) MessageQueue() <-chan IPacket
消息队列,仅消费
func (*ServiceContext) QuitDone ¶ added in v0.1.24
func (c *ServiceContext) QuitDone() <-chan struct{}
等待close完成
func (*ServiceContext) Send ¶ added in v0.1.5
func (c *ServiceContext) Send(pkt IPacket)
投递一条消息到context
func (*ServiceContext) SetInstance ¶ added in v0.2.9
func (c *ServiceContext) SetInstance(inst Service)
func (*ServiceContext) StartTime ¶ added in v0.1.24
func (c *ServiceContext) StartTime() time.Time
type State ¶ added in v0.1.19
type State int32
service state
func (*State) IsShuttingDown ¶ added in v0.1.19
func (*State) IsTerminated ¶ added in v0.1.19
Source Files
¶
Click to show internal directories.
Click to hide internal directories.