Documentation ¶
Overview ¶
扫(确认消息):
- sfid为扫文件编号,soffset为当前扫偏移量,两个字段表示当前扫描到哪个文件哪个位置
- sfid为0,sfid加1,查看文件是否存在,存在则映射
- 根据soffset和roffset读取内容,更新soffset
- 扫描完毕,删除数据文件,删除读步骤的map表记录
跳跃表 这个是干嘛用的?估计是遗留下来的代码?原本可能是存储引擎
Index ¶
- Constants
- Variables
- func Encode(m *Msg) []byte
- func Get(c *gin.Context, key string) string
- func GetDefault(c *gin.Context, key string, def string) string
- func GetDefaultInt(c *gin.Context, key string, def int) int
- func GetInt(c *gin.Context, key string) int
- func GetInt64(c *gin.Context, key string) int64
- func GinLogger(s *HttpServ) gin.HandlerFunc
- func GinRecovery(s *HttpServ, stack bool) gin.HandlerFunc
- func JsonData(c *gin.Context, data interface{})
- func JsonErr(c *gin.Context, err error)
- func JsonMsg(c *gin.Context, code int, msg string)
- func JsonSuccess(c *gin.Context, msg string)
- func LoadConfigFromFile(cfgFile string) (*configs.GnodeConfig, error)
- func NewGnodeConfig() *configs.GnodeConfig
- func NewQueue(name, bindKey string, ctx *Context, topic *Topic) *queue
- func NewSkiplist(level int) *skiplist
- func Post(c *gin.Context, key string) string
- type Channel
- type Client
- type ClientErr
- type Context
- type DelayMsg
- type Dispatcher
- func (d *Dispatcher) GetChannel(key string) *Channel
- func (d *Dispatcher) GetExistTopic(name string) (*Topic, error)
- func (d *Dispatcher) GetTopic(name string) *Topic
- func (d *Dispatcher) GetTopics() []*Topic
- func (d *Dispatcher) LogError(msg ...interface{})
- func (d *Dispatcher) LogInfo(msg ...interface{})
- func (d *Dispatcher) LogWarn(msg ...interface{})
- func (d *Dispatcher) RemoveChannel(key string)
- func (d *Dispatcher) RemoveTopic(name string)
- func (d *Dispatcher) Run()
- type FatalClientErr
- type Gnode
- type HttpApi
- func (h *HttpApi) Ack(c *gin.Context)
- func (h *HttpApi) Config(c *gin.Context)
- func (h *HttpApi) DeclareQueue(c *gin.Context)
- func (h *HttpApi) ExitTopic(c *gin.Context)
- func (h *HttpApi) GetAllTopicStat(c *gin.Context)
- func (h *HttpApi) GetQueuesByTopic(c *gin.Context)
- func (h *HttpApi) GetTopicStat(c *gin.Context)
- func (h *HttpApi) Ping(c *gin.Context)
- func (h *HttpApi) Pop(c *gin.Context)
- func (h *HttpApi) Push(c *gin.Context)
- func (h *HttpApi) SetIsAutoAck(c *gin.Context)
- type HttpServ
- type Msg
- type MsgIndex
- type QueueMeta
- type RClient
- type RecvMsgData
- type RespMsgData
- type TcpConn
- func (c *TcpConn) ACK(params [][]byte) error
- func (c *TcpConn) DEAD(params [][]byte) error
- func (c *TcpConn) DECLAREQUEUE(params [][]byte) error
- func (c *TcpConn) Handle()
- func (c *TcpConn) LogError(msg ...interface{})
- func (c *TcpConn) LogInfo(msg ...interface{})
- func (c *TcpConn) LogWarn(msg ...interface{})
- func (c *TcpConn) MPUB(params [][]byte) error
- func (c *TcpConn) PING()
- func (c *TcpConn) POP(params [][]byte) error
- func (c *TcpConn) PUB(params [][]byte) error
- func (c *TcpConn) PUBLISH(params [][]byte) error
- func (c *TcpConn) RespErr(err error) bool
- func (c *TcpConn) RespMsg(msg *Msg) bool
- func (c *TcpConn) RespRes(msg string) bool
- func (c *TcpConn) SET(params [][]byte) error
- func (c *TcpConn) SUBSCRIBE(params [][]byte) error
- func (c *TcpConn) Send(respType int16, respData []byte) error
- type TcpServ
- type Topic
- type TopicMeta
Constants ¶
const ( RESP_SUCCESS = iota // 响应成功 RESP_FAILED // 响应失败 )
const ( MSG_STATUS_DEFAULT = iota // 消息默认状态 MSG_STATUS_READ // 消息已被客户端读取,不确定是否已经读取成功 MSG_STATUS_WAIT // 消息已成功发送到客户端,等待客户端确认 MSG_STATUS_FIN // 已得到客户端确认,可移除消息 MSG_STATUS_EXPIRE // 未得到客户端确认,已超时 MSG_MAX_DELAY = 259200 // 最大延迟时间 MSG_MAX_TTR = 30 // 最大超时时间 MSG_MAX_RETRY = 5 // 消息最大重试次数 )
const ( RESP_MESSAGE = 101 RESP_ERROR = 102 RESP_RESULT = 103 RESP_CHANNEL = 104 RESP_PING = 105 )
const ( DEFAULT_KEY = "default" DEAD_QUEUE_FLAG = "dead" ROUTE_KEY_MATCH_FULL = 1 ROUTE_KEY_MATCH_FUZZY = 2 )
const GROW_SIZE = 10 * 1024 * 1024
const MSG_FIX_LENGTH = 7
header: flag(1 byte)+status(2 byte)+msg_len(4 byte)+msg(? byte)
const REWRITE_SIZE = 100 * 1024 * 1024
Variables ¶
var ( ErrParams = "E_INVALID_PARAMS" ErrDelay = "E_INVALID_DELAY" ErrReadConn = "E_INVALID_READ" ErrPopMsg = "E_INVALID_POP" ErrAckMsg = "E_INVALID_ACK" ErrJson = "E_INVALID_JSON" ErrPushNum = "E_INVALID_PUSHNUM" ErrPush = "E_INVALID_PUSH" ErrDead = "E_INVALID_DEAD" ErrSet = "E_INVALID_SET" ErrDeclare = "E_INVALID_DECLARE" ErrSubscribe = "E_INVALID_SUBSCRIBE" ErrUnkownCmd = "E_INVALID_CMD" ErrTopicEmpty = "E_INVALID_TOPIC" ErrBindKeyEmpty = "E_INVALID_BINDKEY" ErrChannelEmpty = "E_INVALID_CHANNEL" ErrPublish = "E_INVALID_PUBLISH" ErrResp = "E_INVALID_RESPONSE" )
var ( ErrMessageNotExist = errors.New("no message") ErrMessageNotExpire = errors.New("no message expire") )
var ( ErrNotMultiple = errors.New("this topic is not a multiple topic") ErrHaveRegistered = errors.New("this client have registered") )
var (
ErrEmpty = errors.New("skiplist is empty")
)
var (
ErrQueueClosed = errors.New("queue has been cloesd")
)
局部错误变量
Functions ¶
func GinRecovery ¶
func GinRecovery(s *HttpServ, stack bool) gin.HandlerFunc
GinRecovery recover掉项目可能出现的panic,并使用zap记录相关日志
func JsonSuccess ¶
func LoadConfigFromFile ¶
func LoadConfigFromFile(cfgFile string) (*configs.GnodeConfig, error)
func NewSkiplist ¶
func NewSkiplist(level int) *skiplist
Types ¶
type ClientErr ¶
type ClientErr struct {
// contains filtered or unexported fields
}
func NewClientErr ¶
type Context ¶
type Context struct { Gnode *Gnode // 上下文保存的 节点对象 Dispatcher *Dispatcher Conf *configs.GnodeConfig // 配置 Logger *logs.Dispatcher // 日志适配器 }
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
dispatcher 调度器,负责管理topic
func (*Dispatcher) GetChannel ¶
func (d *Dispatcher) GetChannel(key string) *Channel
get channel create channel if is not exist
func (*Dispatcher) GetExistTopic ¶
func (d *Dispatcher) GetExistTopic(name string) (*Topic, error)
get topic returns error when it is not exist
func (*Dispatcher) GetTopic ¶
func (d *Dispatcher) GetTopic(name string) *Topic
get topic by name create a new topic if it is not exist
func (*Dispatcher) LogError ¶
func (d *Dispatcher) LogError(msg ...interface{})
func (*Dispatcher) LogInfo ¶
func (d *Dispatcher) LogInfo(msg ...interface{})
func (*Dispatcher) LogWarn ¶
func (d *Dispatcher) LogWarn(msg ...interface{})
func (*Dispatcher) RemoveChannel ¶
func (d *Dispatcher) RemoveChannel(key string)
remove channel by channel.key
func (*Dispatcher) RemoveTopic ¶
func (d *Dispatcher) RemoveTopic(name string)
remove topic by topic.name
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run()
调度器的阻塞运行 直到收到来自 Gnode 的退出通知 调用 d.exit() 通知所有子goroutine退出
type FatalClientErr ¶
type FatalClientErr struct {
// contains filtered or unexported fields
}
func NewFatalClientErr ¶
func NewFatalClientErr(code, desc string) *FatalClientErr
func (*FatalClientErr) Error ¶
func (err *FatalClientErr) Error() string
type Gnode ¶
type Gnode struct {
// contains filtered or unexported fields
}
func New ¶
func New(cfg *configs.GnodeConfig) *Gnode
type HttpApi ¶
type HttpApi struct {
// contains filtered or unexported fields
}
func (*HttpApi) Ack ¶
curl http://127.0.0.1:9504/ack?msgId=xxx&topic=xxx&bindKey=xxx
func (*HttpApi) Config ¶
curl "http://127.0.0.1:9504/config?topic=xxx&isAuthoAck=1&mode=1&msgTTR=30&msgRetry=5" 配置topic
func (*HttpApi) DeclareQueue ¶
curl "http://127.0.0.1:9504/declareQueue?topic=xxx&bindKey=kkk" 声明队列
func (*HttpApi) ExitTopic ¶
退出topic curl http://127.0.0.1:9504/exitTopic?topic=xxx http://127.0.0.1:9504/exitTopic?topic=xxx
func (*HttpApi) GetAllTopicStat ¶
获取所有topic统计信息 curl http://127.0.0.1:9504/getAllTopicStat http://127.0.0.1:9504/getAllTopicStat
func (*HttpApi) GetQueuesByTopic ¶
func (*HttpApi) GetTopicStat ¶
获取指定topic统计信息 curl "http://127.0.0.1:9504/getTopicStat?topic=ketang"
func (*HttpApi) Pop ¶
curl "http://127.0.0.1:9504/pop?topic=xxx&bindKey=xxx" 消费任务
func (*HttpApi) Push ¶
curl http://127.0.0.1:9504/push -X POST -d 'data={"body":"this is a job","topic":"xxx","delay":20,"route_key":"xxx"}' 推送消息
func (*HttpApi) SetIsAutoAck ¶
设置主题自动确认消息 curl http://127.0.0.1:9504/setIsAutoAck?topic=xxx http://127.0.0.1:9504/setIsAutoAck?topic=xxx
type HttpServ ¶
type HttpServ struct {
// contains filtered or unexported fields
}
func NewHttpServ ¶
type Msg ¶
type Msg struct { Id uint64 `json:"id"` Retry uint16 `json:"retry"` Delay uint32 `json:"delay"` Expire uint64 `json:"expire"` Body []byte `json:"body"` }
消息结构
type MsgIndex ¶
func NewMsgIndex ¶
type RecvMsgData ¶
type RecvMsgData struct { Body string `json:"body"` Topic string `json:"topic"` Delay int `json:"delay"` RouteKey string `json:"route_key"` }
TODO 对外暴露的结构
type RespMsgData ¶
type RespMsgData struct { Id string `json:"id"` Body string `json:"body"` Retry uint16 `json:"retry_count"` }
TODO 对外暴露的结构
type TcpConn ¶
type TcpConn struct {
// contains filtered or unexported fields
}
func (*TcpConn) DECLAREQUEUE ¶
declare queue queue <topic_name> <bind_key>\n
func (*TcpConn) MPUB ¶
mpub <topic_name> <num>\n <msg.len> <[]byte({"delay":1,"body":"xxx","topic":"xxx","routeKey":"xxx"})> <msg.len> <[]byte({"delay":1,"body":"xxx","topic":"xxx","routeKey":"xxx"})>
func (*TcpConn) PUB ¶
pub <topic_name> <route_key> <delay-time>\n [ 4-byte size in bytes ][ N-byte binary data ]
func (*TcpConn) PUBLISH ¶
publish message to channel publish <channel_name>\n <message_len> <message>
func (*TcpConn) SET ¶
设置topic信息,目前只有isAutoAck选项 set <topic_name> <isAutoAck> <mode> <msg_ttr> <msg_retry>\n