common

package
v0.0.0-...-e775907 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2020 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Redis key names used by the package.
const (
	// RedisKeyPrefix is the prefix for redis keys.
	RedisKeyPrefix = "dmq"
	// IdIncrKey is the key of the auto-increment ID.
	IdIncrKey = "message:id:generate:incr"
)

// Message consumption statuses, numbered consecutively from 1.
const (
	MessageStatusWaiting = iota + 1 // default status
	MessageStatusDoing              // being consumed
	MessageStatusDone               // consumption finished
	MessageStatusFailed             // consumption failed
)
View Source
// Error codes, numbered consecutively starting at 100 (ErrorCodeDefault).
// Every entry takes the value 100 plus its position in this block, so new
// codes must be appended at the end to keep existing values stable.
const (
	ErrorCodeDefault = 100 + iota
	ErrorCodeParseParamsFailed
	ErrorCodeValidateFailed
	ErrorCodeUnknownProduct
	ErrorCodeUnknownCommand
	ErrorCodeJsonMarshal
	ErrorCodeRedisSave
	ErrorCodeFoundBucketsFailed
	ErrorCodeFoundPointFailed
	ErrorCodeRemoveBucketsFailed
	ErrorCodeResponseCodeNot200
	ErrorCodePreRequestFailed
	ErrorCodeRequestFailed
	ErrorCodeGetStatusFailed
	ErrorCodeGetMessageFailed
	ErrorCodeRedisLoadLuaFailed
	ErrorCodeGetPendingFailed
)
View Source
// Lua scripts used by the package. The script bodies are elided in this
// rendering; only their byte lengths are shown.
const (
	// GetTimePointLuaScript is the Lua script for getting a time point.
	GetTimePointLuaScript = `` /* 365-byte string literal not displayed */

	// SaveMessageLuaScript is the Lua script for saving a message.
	SaveMessageLuaScript = `` /* 1026-byte string literal not displayed */

)

Variables

View Source
// RedisCli is the package-level redis client.
var RedisCli *redis.Client

// GetTimePointSha and SaveMessageSha are package-level strings.
// NOTE(review): presumably the SHA digests of GetTimePointLuaScript and
// SaveMessageLuaScript once loaded by LoadLuaScript — confirm against its body.
var GetTimePointSha, SaveMessageSha string
View Source
// Config is the global variable holding the configuration information.
var Config baseConfig

Functions

func AutoClearExpirePending

func AutoClearExpirePending()

定时任务清除已经被删除的未处理的消息ID

func AutoSplitLog

func AutoSplitLog(logType string)

自动切割Log

func ClearConsumerPending

func ClearConsumerPending(consumer string)

清除过期的未处理的消息

func CopyFile

func CopyFile(src, dst string) (int64, error)

拷贝文件

func ExitWithNotice

func ExitWithNotice(notice Notice)

输出错误信息并退出

func FileExists

func FileExists(filePath string) bool

判断文件/目录是否存在

func GenerateId

func GenerateId(i int64, number int64, count int64) uint64

生成ID

func GetBucketName

func GetBucketName(bucket string, pointName string) string

获取消息bucket名字

func GetClientIP

func GetClientIP(r *http.Request) string

获取客户端IP

func GetConfigCmdKey

func GetConfigCmdKey(cmd string) string

获取配置信息的cmd map key

func GetConsumerFullUrl

func GetConsumerFullUrl(host string, path string, msgId uint64) string

获取消费者下游接口

func GetIdBaseNumber

func GetIdBaseNumber(count int64) int64

获取生成ID的基数

func GetMessageDetailKey

func GetMessageDetailKey(msgId uint64) string

消息是否存在的hash key

func GetMessageIdListName

func GetMessageIdListName(bucketName string) string

获取储存消息的list名字

func GetMessagePendingKey

func GetMessagePendingKey(consumer string) string

未处理的消息列表key

func GetMessageStatusHashField

func GetMessageStatusHashField(consumerName string) string

获取消息状态的hash field key

func GetMessageStatusHashName

func GetMessageStatusHashName(id uint64) string

获取消息状态的hash key（按小时区分）

func GetPointGroup

func GetPointGroup(project string) string

获取时间点分组的key 按照项目分组

func GetPointName

func GetPointName(project string, timestamp uint64) string

获取消息触发时间点名字

func GetYamlConfig

func GetYamlConfig(configFile string, mc interface{}) error

获取配置信息

func LoadLuaScript

func LoadLuaScript() error

提前加载lua script到redis里

func RecordError

func RecordError(err error)

记录错误信息

func StringHash

func StringHash(data string) string

获取字符串hash

Types

type ConsumerConfig

// ConsumerConfig is the configuration of a consumer. Each field is mapped
// from the YAML key named in its struct tag.
type ConsumerConfig struct {
	Name       string `yaml:"name"`        // consumer name
	Host       string `yaml:"host"`        // NOTE(review): presumably the host passed to GetConsumerFullUrl — confirm
	Path       string `yaml:"path"`        // NOTE(review): presumably the path passed to GetConsumerFullUrl — confirm
	Timeout    uint   `yaml:"timeout"`     // timeout; unit is not visible here — TODO confirm
	RetryTimes uint   `yaml:"retry_times"` // retry count; exact retry semantics not visible here
	Interval   uint   `yaml:"interval"`    // interval; unit and purpose not visible here — TODO confirm
}

消费者配置

type Message

// Message describes a single message; each field is encoded under the JSON
// key named in its struct tag.
type Message struct {
	Id         uint64 `json:"id"`          // message ID
	Cmd        string `json:"cmd"`         // command
	Timestamp  uint64 `json:"timestamp"`   // execution time (unit not visible here)
	Params     string `json:"params"`      // command parameters
	Project    string `json:"project"`     // project
	Bucket     string `json:"bucket"`      // message bucket
	CreateTime uint64 `json:"create_time"` // creation time (unit not visible here)
	RequestId  string `json:"request_id"`  // request ID
}

func (*Message) GetBucketMessages

func (m *Message) GetBucketMessages(bucket string) []Message

从redis获取bucket对应的任务

func (*Message) GetMessageDetail

func (m *Message) GetMessageDetail() error

获取消息详情

func (*Message) GetPendingMessageIdList

func (m *Message) GetPendingMessageIdList(consumer string, start string, end string) (map[string]interface{}, error)

查看没有消费的消息IdList

func (*Message) GetPointBuckets

func (m *Message) GetPointBuckets(point string) ([]string, error)

获取时间点的buckets

func (*Message) GetTimePoint

func (m *Message) GetTimePoint() (string, error)

获取最近的时间点并删除，通过 lua script 保证原子性

func (*Message) Save

func (m *Message) Save() error

保存消息

func (*Message) SetMessageStatus

func (m *Message) SetMessageStatus(consumerName string, status int, removeFromPending bool) bool

设置消息消费状态

func (*Message) Status

func (m *Message) Status(consumerName string) (string, error)

获取消息消费状态

type Notice

// Notice is a value type exposing Code() int and Error() string, so it
// satisfies both the error and ThrowAble interfaces. Values are built with
// ThrowNotice(code, err). Its fields are unexported and not shown here.
type Notice struct {
	// contains filtered or unexported fields
}

func ThrowNotice

func ThrowNotice(code int, err error) Notice

func (Notice) Code

func (n Notice) Code() int

func (Notice) Error

func (n Notice) Error() string

type ProductConfig

// ProductConfig is the configuration of a producer. Each field is mapped
// from the YAML key named in its struct tag.
type ProductConfig struct {
	Project string   `yaml:"project"`  // project name
	AllowIp []string `yaml:"allow_ip"` // NOTE(review): presumably the IPs allowed to produce for this project — confirm
}

生产者

type ThrowAble

// ThrowAble is an error that additionally carries an integer code.
type ThrowAble interface {
	error
	// Code returns the integer code associated with the error.
	Code() int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL