server

package
v0.0.0-...-1df5a02 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2022 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DEFAULT_KEY           = "default"
	DEAD_QUEUE_FLAG       = "dead"
	ROUTE_KEY_MATCH_FULL  = 1
	ROUTE_KEY_MATCH_FUZZY = 2
)
View Source
const (
	MSG_STATUS_DEFAULT = iota // 消息默认状态
	MSG_STATUS_READ           // 消费已被客户端读取,当不确定是否已经没连接是否发送成功
	MSG_STATUS_WAIT           // 消息已成功发送到客户端,等待客户端确认
	MSG_STATUS_FIN            // 已得到客户端确认,可移除消息
	MSG_STATUS_EXPIRE         // 未得到客户端确认,已超时

	MSG_MAX_DELAY = 259200 // 最大延迟时间
	MSG_MAX_TTR   = 30     // 最大超时时间
	MSG_MAX_RETRY = 5      // 消息最大重试次数
)
View Source
const (
	RespResult = 101
	RespError  = 102
)
View Source
const GROW_SIZE = 10 * 1024 * 1024
View Source
const MSG_FIX_LENGTH = 1 + 2 + 4

flag(1-byte) + status(2-bytes) + msg_len(4-bytes) + msg(n-bytes)

View Source
const REWRITE_SIZE = 100 * 1024 * 1024

Variables

View Source
var (
	ErrParams       = "E_INVALID_PARAMS"
	ErrDelay        = "E_INVALID_DELAY"
	ErrReadConn     = "E_INVALID_READ"
	ErrPopMsg       = "E_INVALID_POP"
	ErrAckMsg       = "E_INVALID_ACK"
	ErrJson         = "E_INVALID_JSON"
	ErrPushNum      = "E_INVALID_PUSHNUM"
	ErrPush         = "E_INVALID_PUSH"
	ErrDead         = "E_INVALID_DEAD"
	ErrSet          = "E_INVALID_SET"
	ErrDeclare      = "E_INVALID_DECLARE"
	ErrSubscribe    = "E_INVALID_SUBSCRIBE"
	ErrUnkownCmd    = "E_INVALID_CMD"
	ErrTopicEmpty   = "E_INVALID_TOPIC"
	ErrBindKeyEmpty = "E_INVALID_BINDKEY"
	ErrChannelEmpty = "E_INVALID_CHANNEL"
	ErrPublish      = "E_INVALID_PUBLISH"
	ErrResp         = "E_INVALID_RESPONSE"
)
View Source
var (
	ErrMessageNotExist  = errors.New("no message")
	ErrMessageNotExpire = errors.New("no message expire")
)

Functions

func Encode

func Encode(m *Msg) []byte

Encode 消息编码 expire(8-bytes) + id(8-bytes) + retry(2-bytes) + body(n-bytes)

func NewQueue

func NewQueue(name, bindKey string, topic *Topic) *queue

Types

type Channel

type Channel struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel(key string) *Channel

type ClientErr

type ClientErr struct {
	// contains filtered or unexported fields
}

func NewClientErr

func NewClientErr(code, desc string) *ClientErr

func (*ClientErr) Error

func (err *ClientErr) Error() string

type DelayMsg

type DelayMsg struct {
	Msg      *Msg     `json:"msg"`
	BindKeys []string `json:"bind_key"`
}

DelayMsg 延迟消息结构

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(cfg *config.Config) *Dispatcher

func (*Dispatcher) GetChannel

func (d *Dispatcher) GetChannel(key string) *Channel

GetChannel get channel create channel if is not exist

func (*Dispatcher) GetExistTopic

func (d *Dispatcher) GetExistTopic(name string) (*Topic, error)

GetExistTopic get topic returns error when it is not exist

func (*Dispatcher) GetTopic

func (d *Dispatcher) GetTopic(name string) *Topic

GetTopic get topic create topic if it is not exist

func (*Dispatcher) GetTopics

func (d *Dispatcher) GetTopics() []*Topic

GetTopics get all topics

func (*Dispatcher) RemoveChannel

func (d *Dispatcher) RemoveChannel(key string)

RemoveChannel remove channel by channel.key

func (*Dispatcher) RemoveTopic

func (d *Dispatcher) RemoveTopic(name string)

RemoveTopic remove topic by topic.name

func (*Dispatcher) Run

func (d *Dispatcher) Run()

func (*Dispatcher) Set

func (d *Dispatcher) Set(name string, configure *topicConfigure) error

Set config

type FatalClientErr

type FatalClientErr struct {
	// contains filtered or unexported fields
}

func NewFatalClientErr

func NewFatalClientErr(code, desc string) *FatalClientErr

func (*FatalClientErr) Error

func (err *FatalClientErr) Error() string

type Msg

type Msg struct {
	Id     uint64 `json:"id"`
	Retry  uint16 `json:"retry"`
	Delay  uint32 `json:"delay"`
	Expire uint64 `json:"expire"`
	Body   []byte `json:"body"`
}

Msg 消息结构

func Decode

func Decode(data []byte) *Msg

Decode 消息解码 expire(8-bytes) + id(8-bytes) + retry(2-bytes) + body(n-bytes)

type MsgIndex

type MsgIndex struct {
	Fid    int
	Offset int
}

func NewMsgIndex

func NewMsgIndex(fid, offset int) *MsgIndex

type QueueMeta

type QueueMeta struct {
	Num         int64  `json:"queue_num"`
	Name        string `json:"queue_name"`
	BindKey     string `json:"bind_key"`
	WriteOffset int64  `json:"write_offset"`
	ReadOffset  int64  `json:"read_offset"`
	ScanOffset  int64  `json:"scan_offset"`
}

type RecvMsgData

type RecvMsgData struct {
	Body     string `json:"body"`
	Topic    string `json:"topic"`
	Delay    int    `json:"delay"`
	RouteKey string `json:"route_key"`
}

type RespMsgData

type RespMsgData struct {
	Id    string `json:"id"`
	Body  string `json:"body"`
	Retry uint16 `json:"retry_count"`
}

type Server

type Server struct {
	Logger log.Logger
	// contains filtered or unexported fields
}

func NewServer

func NewServer(cfg *config.Config) *Server

func (*Server) Exit

func (s *Server) Exit()

func (*Server) Run

func (s *Server) Run() error

type TcpConn

type TcpConn struct {
	// contains filtered or unexported fields
}

func (*TcpConn) ACK

func (c *TcpConn) ACK(params [][]byte) error

ACK 确认消息 ack <message_id> <topic> <bind_key>\n

func (*TcpConn) DEAD

func (c *TcpConn) DEAD(params [][]byte) error

DEAD 死信队列消费 dead <topic_name> <bind_key>\n

func (*TcpConn) DECLAREQUEUE

func (c *TcpConn) DECLAREQUEUE(params [][]byte) error

DECLAREQUEUE declare queue queue <topic_name> <bind_key>\n

func (*TcpConn) Handle

func (c *TcpConn) Handle()

Handle <cmd_name> <param_1> ... <param_n>\n

func (*TcpConn) MPUB

func (c *TcpConn) MPUB(params [][]byte) error

MPUB <topic_name> <num>\n <msg.len> <[]byte({"delay":1,"body":"xxx","topic":"xxx","routeKey":"xxx"})> <msg.len> <[]byte({"delay":1,"body":"xxx","topic":"xxx","routeKey":"xxx"})>

func (*TcpConn) PING

func (c *TcpConn) PING() error

func (*TcpConn) POP

func (c *TcpConn) POP(params [][]byte) error

POP 消费消息 pop <topic_name> <bind_key>\n

func (*TcpConn) PUB

func (c *TcpConn) PUB(params [][]byte) error

PUB <topic_name> <route_key> <delay-time>\n [ 4-byte size in bytes ][ N-byte binary data ]

func (*TcpConn) PUBLISH

func (c *TcpConn) PUBLISH(params [][]byte) error

PUBLISH message to channel publish <channel_name>\n <message_len> <message>

func (*TcpConn) RespErr

func (c *TcpConn) RespErr(err error) error

func (*TcpConn) RespMsg

func (c *TcpConn) RespMsg(msg *Msg) error

func (*TcpConn) RespOk

func (c *TcpConn) RespOk() error

func (*TcpConn) RespPing

func (c *TcpConn) RespPing() error

func (*TcpConn) RespRes

func (c *TcpConn) RespRes(res []byte) error

func (*TcpConn) SET

func (c *TcpConn) SET(params [][]byte) error

SET topic isAutoAck mode msgTTR msgRetry

func (*TcpConn) SUBSCRIBE

func (c *TcpConn) SUBSCRIBE(params [][]byte) error

SUBSCRIBE subscribe channel subscribe <channel_name> \n

func (*TcpConn) Send

func (c *TcpConn) Send(respType int16, respData []byte) error

type TcpServ

type TcpServ struct {
	// contains filtered or unexported fields
}

func NewTcpServ

func NewTcpServ(cfg *config.Config) *TcpServ

func (*TcpServ) Run

func (s *TcpServ) Run()

type Topic

type Topic struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(name string, cfg *config.Config) *Topic

func (*Topic) Serialize

func (t *Topic) Serialize() error

type TopicMeta

type TopicMeta struct {
	MsgTTR    int  `json:"msg_ttr"`
	MsgRetry  int  `json:"msg_retry"`
	Mode      int  `json:"mode"`
	IsAutoAck bool `json:"is_auto_ack"`

	PopNum  int64 `json:"pop_num"`
	PushNum int64 `json:"push_num"`
	DeadNum int64 `json:"dead_num"`

	Queues     []QueueMeta `json:"queues"`
	DeadQueues []QueueMeta `json:"dead_queues"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL