Documentation
¶
Index ¶
- Constants
- type Client
- type HandlerRegistrar
- type Message
- type MessageHandler
- type MsgHandlerAdapter
- type MsgHandlerSelector
- type NetListenHandler
- type ProcessMessageHandler
- type Queue
- type QueueBroadcastor
- type QueueMessageHandler
- type ReadMessageHandler
- type Server
- type ServerOption
- func WithDefaultQueueSize(defaultQueueSize int) ServerOption
- func WithHeartbeatDuration(heartbeatDuration time.Duration) ServerOption
- func WithMsgHandlerSelector(msgHandlerSelector MsgHandlerSelector) ServerOption
- func WithNetwork(network string) ServerOption
- func WithSocketWriteDeadline(socketWriteDeadline time.Duration) ServerOption
- func WithTopicQueueSize(topicQueueSize map[string]int) ServerOption
- type TimeoutReadWriteCloser
- type TopicQueue
Constants ¶
const ( //请求授权 REQ_AUTH string = "REQ_AUTH" //响应授权 RES_AUTH string = "RES_AUTH" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface {
// Close 关闭连接
Close() error
// Submit 发起请求,不需要等待服务器返回处理结果(并行安全),
// expireAfterNowMillis用于设置消息的过期时间,单位为ms。过期时间 = 服务端接收消息的时间 + expireAfterNowMillis,
// 当expireAfterNowMillis=0时,提交的消息无过期时间。
Submit(topic string, payload []byte, expireAfterNowMillis uint32) error
// SubmitWithAsyncCallback 发起请求,不等待服务器返回处理结果,通过callback异步处理服务端响应(并行不安全),
// expireAfterNowMillis用于设置消息的过期时间,单位为ms。过期时间 = 服务端接收消息的时间 + expireAfterNowMillis,
// 当expireAfterNowMillis=0时,提交的消息无过期时间。
// callback 异步处理服务端响应。
SubmitWithAsyncCallback(topic string, payload []byte, expireAfterNowMillis uint32, callback func(msg *common.Msg, occuredErr error)) error
// SubmitWaitReply 发起请求,需要等待服务器返回处理结果(并行不安全)
// expireAfterNowMillis用于设置消息的过期时间,单位为ms。过期时间 = 服务端接收消息的时间 + expireAfterNowMillis
// 当expireAfterNowMillis=0时,提交的消息无过期时间
SubmitWaitReply(topic string, payload []byte, expireAfterNowMillis uint32) (*common.Msg, error)
// Query 查询topic对应的消息,不删除消息(并行不安全)
Query(topic string, payload []byte) (*common.Msg, error)
// QueryPop查询topic对应的消息,并删除消息(并行不安全)
QueryPop(topic string, payload []byte) (*common.Msg, error)
// QueryStream 发起流请求,通过Receive接收服务器返回处理结果,一个消息只能被一个client查询(并行不安全)
QueryStream(topic string, payload []byte) (receive func() (*common.Msg, error), err error)
// QueryStreamGroup 发起流请求,通过Receive接收服务器返回处理结果,每个消息可以被多个client查询(并行不安全)
QueryStreamGroup(topic string, payload []byte) (receive func() (*common.Msg, error), err error)
// Auth 请求授权
Auth(payload []byte) error
SetTimeOut(timeout time.Duration)
}
func NewClientByConn ¶ added in v1.0.0
func NewClientByConn(conn TimeoutReadWriteCloser, clientId uint32) Client
type HandlerRegistrar ¶
type HandlerRegistrar interface {
// RegisterGlobalPreHandler 注册全局的消息处理器,优先执行。
RegisterGlobalPreHandler(handlers ...MessageHandler)
// RegisterHandler 具体的消息处理器,在全局的消息处理器后执行。
RegisterHandler(topic string, handlers ...MessageHandler)
GetHandlers(topic string) []MessageHandler
}
HandlerRegistrar 消息处理器,注册中心。
type MessageHandler ¶
type MessageHandler interface {
// OnReceive 处理接收到的消息。
// 如果返回的newMessage为nil,则继续执行后续的Handler。
// 如果newMessage的Type为RESPONSE,则直接将newMessage通知客户端,终止后续handler。
// 如果newMessage不为nil,则将newMessage视为新的消息(或指令),放入队列。
OnReceive(requestMsg *Message) (newMessage *Message)
// OnResponse 当服务端向客户端返回消息时,执行。当存在多个MessageHandler时,按照注册时的反序执行。
OnResponse(responseMsg *Message, requestMsg *Message) (replyToClientMsg *Message)
}
MessageHandler 消息处理
var DefaultMsgHandler MessageHandler = &defaultMsgHandler{}
DefaultMsgHandler 默认消息处理,什么都不做。
var SuccessResponseMsgHandler MessageHandler = &successResponseMsgHandler{}
SuccessResponseMsgHandler POST请求,默认返回成功。
func NewSimpleAuthMsgHandler ¶
func NewSimpleAuthMsgHandler(authorizeLogic func(msg *Message) bool, authCodeGen func(msg *Message) string) MessageHandler
NewSimpleAuthMsgHandler 授权设备接入。 创建设备接入授权Handler(简单版本,可仿造该版本随意定制)。 authorizeLogic:决定是否授权。 authCodeGen:生成授权码。
type MsgHandlerAdapter ¶ added in v1.1.0
type MsgHandlerAdapter struct {
}
MsgHandlerAdapter 实现了默认的OnResponse方法。
func (MsgHandlerAdapter) OnResponse ¶ added in v1.1.0
func (h MsgHandlerAdapter) OnResponse(responseMsg *Message, requestMsg *Message) (replyToClientMsg *Message)
type MsgHandlerSelector ¶ added in v1.1.1
type NetListenHandler ¶
type NetListenHandler struct {
flowprocess.TaskHandlerAdapter
// contains filtered or unexported fields
}
Node0,监听端口
func (*NetListenHandler) Handle ¶
func (h *NetListenHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error)
type ProcessMessageHandler ¶
type ProcessMessageHandler struct {
flowprocess.TaskHandlerAdapter
// contains filtered or unexported fields
}
Node2 处理和回复消息
func (*ProcessMessageHandler) Handle ¶
func (h *ProcessMessageHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) error
type Queue ¶
type Queue interface {
Len() int
PushFront(msg *Message)
PushBack(msg *Message)
PopFront() (*Message, bool)
PopBack() (*Message, bool)
Front() (*Message, bool)
Back() (*Message, bool)
Wait() <-chan struct{}
Size() int
}
func NewListQueue ¶ added in v1.1.0
func NewOrderQueue ¶ added in v1.1.0
type QueueBroadcastor ¶ added in v1.1.0
type QueueBroadcastor struct {
// contains filtered or unexported fields
}
QueueBroadcastor 将sourceQueue里的数据,广播给多个接收者。例如: sourceQueue --msgs(goroutine0)--|---msgs---> [splitQueue1] --msgs(goroutine1)--> broadcast to Target1
|---msgs---> [splitQueue2] --msgs(goroutine2)--> broadcast to Target2 |---msgs---> [splitQueue3] --msgs(goroutine3)--> broadcast to Target3
func NewQueueBroadcastor ¶ added in v1.1.0
type QueueMessageHandler ¶
type QueueMessageHandler struct {
flowprocess.TaskHandlerAdapter
// contains filtered or unexported fields
}
Node3 将消息放入队列, 单goroutine运行,一般不用加锁
func NewQueueMessageHandler ¶ added in v1.1.0
func NewQueueMessageHandler(server *server) *QueueMessageHandler
func (*QueueMessageHandler) Handle ¶
func (h *QueueMessageHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) error
type ReadMessageHandler ¶
type ReadMessageHandler struct {
flowprocess.TaskHandlerAdapter
// contains filtered or unexported fields
}
Node1,组装消息
func (*ReadMessageHandler) Handle ¶
func (h *ReadMessageHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error)
type Server ¶
type Server interface {
HandlerRegistrar
Start() error
// Submit 在Server启动后,直接提交消息
Submit(msg *common.Msg) (*common.Msg, error)
Close() error
}
func NewServer ¶
func NewServer(address string, options ...ServerOption) Server
type ServerOption ¶
type ServerOption func(s *server)
func WithDefaultQueueSize ¶
func WithDefaultQueueSize(defaultQueueSize int) ServerOption
WithDefaultQueueSize 配置默认的topicQueueSize
func WithHeartbeatDuration ¶ added in v1.0.0
func WithHeartbeatDuration(heartbeatDuration time.Duration) ServerOption
WithHeartbeatDuration 心跳周期
func WithMsgHandlerSelector ¶ added in v1.1.1
func WithMsgHandlerSelector(msgHandlerSelector MsgHandlerSelector) ServerOption
func WithNetwork ¶ added in v1.1.0
func WithNetwork(network string) ServerOption
WithNetwork 必须是 "tcp", "tcp4", "tcp6", "unix", "unixpacket".
func WithSocketWriteDeadline ¶
func WithSocketWriteDeadline(socketWriteDeadline time.Duration) ServerOption
WithSocketWriteDeadline 配置端口超时
func WithTopicQueueSize ¶
func WithTopicQueueSize(topicQueueSize map[string]int) ServerOption
WithTopicQueueSize 配置topicQueueSize
