gnode

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2022 License: MIT Imports: 31 Imported by: 0

Documentation

Overview

扫(确认消息):

  • sfid为扫文件编号,soffset为当前扫偏移量,两个字段表示当前扫描到哪个文件哪个位置
  • sfid为0,sfid加1,查看文件是否存在,存在则映射
  • 根据soffset和roffset读取内容,更新soffset
  • 扫描完毕,删除数据文件,删除读步骤的map表记录

跳跃表 这个是干嘛用的?估计是遗留下来的代码?原本可能是存储引擎

Index

Constants

View Source
const (
	RESP_SUCCESS = iota // 响应成功
	RESP_FAILED         // 响应失败
)
View Source
const (
	MSG_STATUS_DEFAULT = iota // 消息默认状态
	MSG_STATUS_READ           // 消息已被客户端读取,不确定是否已经读取成功
	MSG_STATUS_WAIT           // 消息已成功发送到客户端,等待客户端确认
	MSG_STATUS_FIN            // 已得到客户端确认,可移除消息
	MSG_STATUS_EXPIRE         // 未得到客户端确认,已超时

	MSG_MAX_DELAY = 259200 // 最大延迟时间
	MSG_MAX_TTR   = 30     // 最大超时时间
	MSG_MAX_RETRY = 5      // 消息最大重试次数
)
View Source
const (
	RESP_MESSAGE = 101
	RESP_ERROR   = 102
	RESP_RESULT  = 103
	RESP_CHANNEL = 104
	RESP_PING    = 105
)
View Source
const (
	DEFAULT_KEY           = "default"
	DEAD_QUEUE_FLAG       = "dead"
	ROUTE_KEY_MATCH_FULL  = 1
	ROUTE_KEY_MATCH_FUZZY = 2
)
View Source
const GROW_SIZE = 10 * 1024 * 1024
View Source
const MSG_FIX_LENGTH = 7

header: flag(1 byte)+status(2 byte)+msg_len(4 byte)+msg(? byte)

View Source
const REWRITE_SIZE = 100 * 1024 * 1024

Variables

View Source
var (
	ErrParams       = "E_INVALID_PARAMS"
	ErrDelay        = "E_INVALID_DELAY"
	ErrReadConn     = "E_INVALID_READ"
	ErrPopMsg       = "E_INVALID_POP"
	ErrAckMsg       = "E_INVALID_ACK"
	ErrJson         = "E_INVALID_JSON"
	ErrPushNum      = "E_INVALID_PUSHNUM"
	ErrPush         = "E_INVALID_PUSH"
	ErrDead         = "E_INVALID_DEAD"
	ErrSet          = "E_INVALID_SET"
	ErrDeclare      = "E_INVALID_DECLARE"
	ErrSubscribe    = "E_INVALID_SUBSCRIBE"
	ErrUnkownCmd    = "E_INVALID_CMD"
	ErrTopicEmpty   = "E_INVALID_TOPIC"
	ErrBindKeyEmpty = "E_INVALID_BINDKEY"
	ErrChannelEmpty = "E_INVALID_CHANNEL"
	ErrPublish      = "E_INVALID_PUBLISH"
	ErrResp         = "E_INVALID_RESPONSE"
)
View Source
var (
	ErrMessageNotExist  = errors.New("no message")
	ErrMessageNotExpire = errors.New("no message expire")
)
View Source
var (
	ErrNotMultiple    = errors.New("this topic is not a multiple topic")
	ErrHaveRegistered = errors.New("this client have registered")
)
View Source
var (
	ErrEmpty = errors.New("skiplist is empty")
)
View Source
var (
	ErrQueueClosed = errors.New("queue has been cloesd")
)

局部错误变量

Functions

func Encode

func Encode(m *Msg) []byte

消息编码 expire(8-bytes) + id(8-bytes) + retry(2-bytes) + body(n-bytes)

func Get

func Get(c *gin.Context, key string) string

func GetDefault

func GetDefault(c *gin.Context, key string, def string) string

func GetDefaultInt

func GetDefaultInt(c *gin.Context, key string, def int) int

func GetInt

func GetInt(c *gin.Context, key string) int

func GetInt64

func GetInt64(c *gin.Context, key string) int64

func GinLogger

func GinLogger(s *HttpServ) gin.HandlerFunc

GinLogger 接收gin框架默认的日志

func GinRecovery

func GinRecovery(s *HttpServ, stack bool) gin.HandlerFunc

GinRecovery recover掉项目可能出现的panic,并使用zap记录相关日志

func JsonData

func JsonData(c *gin.Context, data interface{})

func JsonErr

func JsonErr(c *gin.Context, err error)

func JsonMsg

func JsonMsg(c *gin.Context, code int, msg string)

func JsonSuccess

func JsonSuccess(c *gin.Context, msg string)

func LoadConfigFromFile

func LoadConfigFromFile(cfgFile string) (*configs.GnodeConfig, error)

func NewGnodeConfig

func NewGnodeConfig() *configs.GnodeConfig

新建配置

func NewQueue

func NewQueue(name, bindKey string, ctx *Context, topic *Topic) *queue

新建一个队列

func NewSkiplist

func NewSkiplist(level int) *skiplist

func Post

func Post(c *gin.Context, key string) string

Types

type Channel

type Channel struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TODO 一种抽象的通道

func NewChannel

func NewChannel(key string, ctx *Context) *Channel

func (*Channel) LogError

func (c *Channel) LogError(msg ...interface{})

func (*Channel) LogInfo

func (c *Channel) LogInfo(msg ...interface{})

func (*Channel) LogWarn

func (c *Channel) LogWarn(msg ...interface{})

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(addr string, weight int) (*Client, error)

func (*Client) Push

func (c *Client) Push(topic, delay, content string) error

生产消息

func (*Client) Recv

func (c *Client) Recv() (int, []byte)

type ClientErr

type ClientErr struct {
	// contains filtered or unexported fields
}

func NewClientErr

func NewClientErr(code, desc string) *ClientErr

func (*ClientErr) Error

func (err *ClientErr) Error() string

type Context

type Context struct {
	Gnode      *Gnode // 上下文保存的 节点对象
	Dispatcher *Dispatcher
	Conf       *configs.GnodeConfig // 配置
	Logger     *logs.Dispatcher     // 日志适配器
}

type DelayMsg

type DelayMsg struct {
	Msg      *Msg     `json:"msg"`
	BindKeys []string `json:"bind_key"`
}

延迟消息结构

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

dispatcher 调度器,负责管理topic

func NewDispatcher

func NewDispatcher(ctx *Context) *Dispatcher

新建并初始化 调度器

func (*Dispatcher) GetChannel

func (d *Dispatcher) GetChannel(key string) *Channel

get channel create channel if is not exist

func (*Dispatcher) GetExistTopic

func (d *Dispatcher) GetExistTopic(name string) (*Topic, error)

get topic returns error when it is not exist

func (*Dispatcher) GetTopic

func (d *Dispatcher) GetTopic(name string) *Topic

get topic by name create a new topic if it is not exist

func (*Dispatcher) GetTopics

func (d *Dispatcher) GetTopics() []*Topic

get all topics

func (*Dispatcher) LogError

func (d *Dispatcher) LogError(msg ...interface{})

func (*Dispatcher) LogInfo

func (d *Dispatcher) LogInfo(msg ...interface{})

func (*Dispatcher) LogWarn

func (d *Dispatcher) LogWarn(msg ...interface{})

func (*Dispatcher) RemoveChannel

func (d *Dispatcher) RemoveChannel(key string)

remove channel by channel.key

func (*Dispatcher) RemoveTopic

func (d *Dispatcher) RemoveTopic(name string)

remove topic by topic.name

func (*Dispatcher) Run

func (d *Dispatcher) Run()

调度器的阻塞运行 直到收到来自 Gnode 的退出通知 调用 d.exit() 通知所有子goroutine退出

type FatalClientErr

type FatalClientErr struct {
	// contains filtered or unexported fields
}

func NewFatalClientErr

func NewFatalClientErr(code, desc string) *FatalClientErr

func (*FatalClientErr) Error

func (err *FatalClientErr) Error() string

type Gnode

type Gnode struct {
	// contains filtered or unexported fields
}

func New

func New(cfg *configs.GnodeConfig) *Gnode

func (*Gnode) Exit

func (gn *Gnode) Exit()

通知并等待所有 goroutine 退出

func (*Gnode) Run

func (gn *Gnode) Run()

开始运行

type HttpApi

type HttpApi struct {
	// contains filtered or unexported fields
}

func (*HttpApi) DeclareQueue

func (h *HttpApi) DeclareQueue(c *gin.Context)

curl "http://127.0.0.1:9504/declareQueue?topic=xxx&bindKey=kkk" 声明队列

func (*HttpApi) GetAllTopicStat

func (h *HttpApi) GetAllTopicStat(c *gin.Context)

获取所有topic统计信息 curl http://127.0.0.1:9504/getAllTopicStat http://127.0.0.1:9504/getAllTopicStat

func (*HttpApi) GetQueuesByTopic

func (h *HttpApi) GetQueuesByTopic(c *gin.Context)

func (*HttpApi) GetTopicStat

func (h *HttpApi) GetTopicStat(c *gin.Context)

获取指定topic统计信息 curl "http://127.0.0.1:9504/getTopicStat?topic=ketang"

func (*HttpApi) Ping

func (h *HttpApi) Ping(c *gin.Context)

心跳接口

func (*HttpApi) Pop

func (h *HttpApi) Pop(c *gin.Context)

curl "http://127.0.0.1:9504/pop?topic=xxx&bindKey=xxx" 消费任务

func (*HttpApi) Push

func (h *HttpApi) Push(c *gin.Context)

curl http://127.0.0.1:9504/push -X POST -d 'data={"body":"this is a job","topic":"xxx","delay":20,"route_key":"xxx"}' 推送消息

func (*HttpApi) SetIsAutoAck

func (h *HttpApi) SetIsAutoAck(c *gin.Context)

设置主题自动确认消息 curl http://127.0.0.1:9504/setIsAutoAck?topic=xxx http://127.0.0.1:9504/setIsAutoAck?topic=xxx

type HttpServ

type HttpServ struct {
	// contains filtered or unexported fields
}

func NewHttpServ

func NewHttpServ(ctx *Context) *HttpServ

func (*HttpServ) LogDebug

func (s *HttpServ) LogDebug(msg interface{})

func (*HttpServ) LogError

func (s *HttpServ) LogError(msg interface{})

func (*HttpServ) LogInfo

func (s *HttpServ) LogInfo(msg interface{})

func (*HttpServ) LogWarn

func (s *HttpServ) LogWarn(msg interface{})

func (*HttpServ) Run

func (s *HttpServ) Run()

type Msg

type Msg struct {
	Id     uint64 `json:"id"`
	Retry  uint16 `json:"retry"`
	Delay  uint32 `json:"delay"`
	Expire uint64 `json:"expire"`
	Body   []byte `json:"body"`
}

消息结构

func Decode

func Decode(data []byte) *Msg

消息解码 expire(8-bytes) + id(8-bytes) + retry(2-bytes) + body(n-bytes)

type MsgIndex

type MsgIndex struct {
	Fid    int
	Offset int
}

func NewMsgIndex

func NewMsgIndex(fid, offset int) *MsgIndex

type QueueMeta

type QueueMeta struct {
	Num         int64  `sjon:"queue_num"`    // 队列长度
	Name        string `sjon:"queue_name"`   // 队列名字
	BindKey     string `json:"bind_key"`     // 队列绑定的唯一值
	WriteOffset int64  `json:"write_offset"` // 写指针
	ReadOffset  int64  `json:"read_offset"`  // 读指针
	ScanOffset  int64  `json:"scan_offset"`  // 扫描指针
}

type RClient added in v0.6.0

type RClient struct {
	// contains filtered or unexported fields
}

type RecvMsgData

type RecvMsgData struct {
	Body     string `json:"body"`
	Topic    string `json:"topic"`
	Delay    int    `json:"delay"`
	RouteKey string `json:"route_key"`
}

TODO 对外暴露的结构

type RespMsgData

type RespMsgData struct {
	Id    string `json:"id"`
	Body  string `json:"body"`
	Retry uint16 `json:"retry_count"`
}

TODO 对外暴露的结构

type TcpConn

type TcpConn struct {
	// contains filtered or unexported fields
}

func (*TcpConn) ACK

func (c *TcpConn) ACK(params [][]byte) error

确认消息 ack <message_id> <topic> <bind_key>\n

func (*TcpConn) DEAD

func (c *TcpConn) DEAD(params [][]byte) error

死信队列消费 dead <topic_name> <bind_key>\n

func (*TcpConn) DECLAREQUEUE

func (c *TcpConn) DECLAREQUEUE(params [][]byte) error

declare queue queue <topic_name> <bind_key>\n

func (*TcpConn) Handle

func (c *TcpConn) Handle()

<cmd_name> <param_1> ... <param_n>\n

func (*TcpConn) LogError

func (c *TcpConn) LogError(msg ...interface{})

func (*TcpConn) LogInfo

func (c *TcpConn) LogInfo(msg ...interface{})

func (*TcpConn) LogWarn

func (c *TcpConn) LogWarn(msg ...interface{})

func (*TcpConn) MPUB

func (c *TcpConn) MPUB(params [][]byte) error

mpub <topic_name> <num>\n <msg.len> <[]byte({"delay":1,"body":"xxx","topic":"xxx","routeKey":"xxx"})> <msg.len> <[]byte({"delay":1,"body":"xxx","topic":"xxx","routeKey":"xxx"})>

func (*TcpConn) PING

func (c *TcpConn) PING()

func (*TcpConn) POP

func (c *TcpConn) POP(params [][]byte) error

消费消息 pop <topic_name> <bind_key>\n

func (*TcpConn) PUB

func (c *TcpConn) PUB(params [][]byte) error

pub <topic_name> <route_key> <delay-time>\n [ 4-byte size in bytes ][ N-byte binary data ]

func (*TcpConn) PUBLISH

func (c *TcpConn) PUBLISH(params [][]byte) error

publish message to channel publish <channel_name>\n <message_len> <message>

func (*TcpConn) RespErr

func (c *TcpConn) RespErr(err error) bool

func (*TcpConn) RespMsg

func (c *TcpConn) RespMsg(msg *Msg) bool

func (*TcpConn) RespRes

func (c *TcpConn) RespRes(msg string) bool

func (*TcpConn) SET

func (c *TcpConn) SET(params [][]byte) error

设置topic信息,目前只有isAutoAck选项 set <topic_name> <isAutoAck> <mode> <msg_ttr> <msg_retry>\n

func (*TcpConn) SUBSCRIBE

func (c *TcpConn) SUBSCRIBE(params [][]byte) error

subscribe channel subscribe <channel_name> \n

func (*TcpConn) Send

func (c *TcpConn) Send(respType int16, respData []byte) error

type TcpServ

type TcpServ struct {
	// contains filtered or unexported fields
}

func NewTcpServ

func NewTcpServ(ctx *Context) *TcpServ

func (*TcpServ) LogDebug

func (s *TcpServ) LogDebug(msg interface{})

func (*TcpServ) LogError

func (s *TcpServ) LogError(msg interface{})

func (*TcpServ) LogInfo

func (s *TcpServ) LogInfo(msg interface{})

func (*TcpServ) LogWarn

func (s *TcpServ) LogWarn(msg interface{})

func (*TcpServ) Run

func (s *TcpServ) Run()

type Topic

type Topic struct {
	sync.Mutex
	// contains filtered or unexported fields
}

topic 消息主题,即消息类型,每一条消息都有所属topic,topic会维护多个queue topic 的所有数据会存放在 他自身拥有的 Dispatcher 的DB对象的一个桶中(topic.name 为该桶的 key)

func NewTopic

func NewTopic(name string, ctx *Context) *Topic

func (*Topic) LogDebug

func (t *Topic) LogDebug(msg interface{})

func (*Topic) LogError

func (t *Topic) LogError(msg interface{})

func (*Topic) LogInfo

func (t *Topic) LogInfo(msg interface{})

func (*Topic) LogTrace

func (t *Topic) LogTrace(msg interface{})

func (*Topic) LogWarn

func (t *Topic) LogWarn(msg interface{})

type TopicMeta

type TopicMeta struct {
	Mode        int         `json:"mode"`
	PopNum      int64       `json:"pop_num"`
	PushNum     int64       `json:"push_num"`
	DeadNum     int64       `json:"dead_num"`
	IsAutoAck   bool        `json:"is_auto_ack"`
	Queues      []QueueMeta `json:"queues"`
	DeataQueues []QueueMeta `json:"dead_queues"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL