Documentation ¶
Index ¶
- Constants
- Variables
- func Encode(m *Msg) []byte
- func NewQueue(name, bindKey string, topic *Topic) *queue
- type Channel
- type ClientErr
- type DelayMsg
- type Dispatcher
- func (d *Dispatcher) GetChannel(key string) *Channel
- func (d *Dispatcher) GetExistTopic(name string) (*Topic, error)
- func (d *Dispatcher) GetTopic(name string) *Topic
- func (d *Dispatcher) GetTopics() []*Topic
- func (d *Dispatcher) RemoveChannel(key string)
- func (d *Dispatcher) RemoveTopic(name string)
- func (d *Dispatcher) Run()
- func (d *Dispatcher) Set(name string, configure *topicConfigure) error
- type FatalClientErr
- type Msg
- type MsgIndex
- type QueueMeta
- type RecvMsgData
- type RespMsgData
- type Server
- type TcpConn
- func (c *TcpConn) ACK(params [][]byte) error
- func (c *TcpConn) DEAD(params [][]byte) error
- func (c *TcpConn) DECLAREQUEUE(params [][]byte) error
- func (c *TcpConn) Handle()
- func (c *TcpConn) MPUB(params [][]byte) error
- func (c *TcpConn) PING() error
- func (c *TcpConn) POP(params [][]byte) error
- func (c *TcpConn) PUB(params [][]byte) error
- func (c *TcpConn) PUBLISH(params [][]byte) error
- func (c *TcpConn) RespErr(err error) error
- func (c *TcpConn) RespMsg(msg *Msg) error
- func (c *TcpConn) RespOk() error
- func (c *TcpConn) RespPing() error
- func (c *TcpConn) RespRes(res []byte) error
- func (c *TcpConn) SET(params [][]byte) error
- func (c *TcpConn) SUBSCRIBE(params [][]byte) error
- func (c *TcpConn) Send(respType int16, respData []byte) error
- type TcpServ
- type Topic
- type TopicMeta
Constants ¶
const ( DEFAULT_KEY = "default" DEAD_QUEUE_FLAG = "dead" ROUTE_KEY_MATCH_FULL = 1 ROUTE_KEY_MATCH_FUZZY = 2 )
const ( MSG_STATUS_DEFAULT = iota // 消息默认状态 MSG_STATUS_READ // 消费已被客户端读取,当不确定是否已经没连接是否发送成功 MSG_STATUS_WAIT // 消息已成功发送到客户端,等待客户端确认 MSG_STATUS_FIN // 已得到客户端确认,可移除消息 MSG_STATUS_EXPIRE // 未得到客户端确认,已超时 MSG_MAX_DELAY = 259200 // 最大延迟时间 MSG_MAX_TTR = 30 // 最大超时时间 MSG_MAX_RETRY = 5 // 消息最大重试次数 )
const ( RespResult = 101 RespError = 102 )
const GROW_SIZE = 10 * 1024 * 1024
const MSG_FIX_LENGTH = 1 + 2 + 4
flag(1-byte) + status(2-bytes) + msg_len(4-bytes) + msg(n-bytes)
const REWRITE_SIZE = 100 * 1024 * 1024
Variables ¶
var ( ErrParams = "E_INVALID_PARAMS" ErrDelay = "E_INVALID_DELAY" ErrReadConn = "E_INVALID_READ" ErrPopMsg = "E_INVALID_POP" ErrAckMsg = "E_INVALID_ACK" ErrJson = "E_INVALID_JSON" ErrPushNum = "E_INVALID_PUSHNUM" ErrPush = "E_INVALID_PUSH" ErrDead = "E_INVALID_DEAD" ErrSet = "E_INVALID_SET" ErrDeclare = "E_INVALID_DECLARE" ErrSubscribe = "E_INVALID_SUBSCRIBE" ErrUnkownCmd = "E_INVALID_CMD" ErrTopicEmpty = "E_INVALID_TOPIC" ErrBindKeyEmpty = "E_INVALID_BINDKEY" ErrChannelEmpty = "E_INVALID_CHANNEL" ErrPublish = "E_INVALID_PUBLISH" ErrResp = "E_INVALID_RESPONSE" )
var ( ErrMessageNotExist = errors.New("no message") ErrMessageNotExpire = errors.New("no message expire") )
Functions ¶
Types ¶
type Channel ¶
func NewChannel ¶
type ClientErr ¶
type ClientErr struct {
// contains filtered or unexported fields
}
func NewClientErr ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
func NewDispatcher ¶
func NewDispatcher(cfg *config.Config) *Dispatcher
func (*Dispatcher) GetChannel ¶
func (d *Dispatcher) GetChannel(key string) *Channel
GetChannel get channel create channel if is not exist
func (*Dispatcher) GetExistTopic ¶
func (d *Dispatcher) GetExistTopic(name string) (*Topic, error)
GetExistTopic get topic returns error when it is not exist
func (*Dispatcher) GetTopic ¶
func (d *Dispatcher) GetTopic(name string) *Topic
GetTopic get topic create topic if it is not exist
func (*Dispatcher) RemoveChannel ¶
func (d *Dispatcher) RemoveChannel(key string)
RemoveChannel remove channel by channel.key
func (*Dispatcher) RemoveTopic ¶
func (d *Dispatcher) RemoveTopic(name string)
RemoveTopic remove topic by topic.name
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run()
func (*Dispatcher) Set ¶
func (d *Dispatcher) Set(name string, configure *topicConfigure) error
Set config
type FatalClientErr ¶
type FatalClientErr struct {
// contains filtered or unexported fields
}
func NewFatalClientErr ¶
func NewFatalClientErr(code, desc string) *FatalClientErr
func (*FatalClientErr) Error ¶
func (err *FatalClientErr) Error() string
type Msg ¶
type Msg struct { Id uint64 `json:"id"` Retry uint16 `json:"retry"` Delay uint32 `json:"delay"` Expire uint64 `json:"expire"` Body []byte `json:"body"` }
Msg 消息结构
type MsgIndex ¶
func NewMsgIndex ¶
type RecvMsgData ¶
type RespMsgData ¶
type TcpConn ¶
type TcpConn struct {
// contains filtered or unexported fields
}
func (*TcpConn) DECLAREQUEUE ¶
DECLAREQUEUE declare queue queue <topic_name> <bind_key>\n
func (*TcpConn) MPUB ¶
MPUB <topic_name> <num>\n <msg.len> <[]byte({"delay":1,"body":"xxx","topic":"xxx","routeKey":"xxx"})> <msg.len> <[]byte({"delay":1,"body":"xxx","topic":"xxx","routeKey":"xxx"})>
func (*TcpConn) PUB ¶
PUB <topic_name> <route_key> <delay-time>\n [ 4-byte size in bytes ][ N-byte binary data ]
func (*TcpConn) PUBLISH ¶
PUBLISH message to channel publish <channel_name>\n <message_len> <message>
type TopicMeta ¶
type TopicMeta struct { MsgTTR int `json:"msg_ttr"` MsgRetry int `json:"msg_retry"` Mode int `json:"mode"` IsAutoAck bool `json:"is_auto_ack"` PopNum int64 `json:"pop_num"` PushNum int64 `json:"push_num"` DeadNum int64 `json:"dead_num"` Queues []QueueMeta `json:"queues"` DeadQueues []QueueMeta `json:"dead_queues"` }