server

package
v1.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2023 License: Apache-2.0 Imports: 52 Imported by: 0

Documentation

Index

Constants

View Source
// Event name constants.
const (
	// EventMsgOffline is the event name for offline messages.
	EventMsgOffline = "msg.offline"
	// EventMsgNotify is the event name for message notifications (all messages are notified to third-party programs).
	EventMsgNotify = "msg.notify"
	// EventOnlineStatus is the event name for user online status.
	EventOnlineStatus = "user.onlinestatus"
)

Variables

View Source
// Package-level error values.
// NOTE(review): both are built with fmt.Errorf without any formatting verbs;
// errors.New is the conventional constructor for fixed-message errors.
var (
	// ErrChannelNotFound is the error with message "channel not found".
	ErrChannelNotFound = fmt.Errorf("channel not found")
	// ErrParamInvalid is the error with message "param invalid".
	ErrParamInvalid    = fmt.Errorf("param invalid")
)
View Source
var (
	VERSION = "4.0.0" // server version
)

Functions

func GetCommunityTopicParentChannelID

func GetCommunityTopicParentChannelID(channelID string) string

GetCommunityTopicParentChannelID 获取社区话题频道的父频道ID

func GetFakeChannelIDWith

func GetFakeChannelIDWith(fromUID, toUID string) string

GetFakeChannelIDWith GetFakeChannelIDWith

func MarshalMessage

func MarshalMessage(version uint8, m *Message) []byte

MarshalMessage MarshalMessage

func UnmarshalMessage

func UnmarshalMessage(data []byte, m *Message) error

UnmarshalMessage UnmarshalMessage

Types

type APIServer

type APIServer struct {
	wklog.Log
	// contains filtered or unexported fields
}

APIServer ApiServer

func NewAPIServer

func NewAPIServer(s *Server) *APIServer

NewAPIServer new一个api server

func (*APIServer) Start

func (s *APIServer) Start()

Start 开始

func (*APIServer) Stop

func (s *APIServer) Stop()

Stop 停止服务

type Channel

type Channel struct {
	*wkstore.ChannelInfo

	wklog.Log
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel(channelInfo *wkstore.ChannelInfo, s *Server) *Channel

NewChannel NewChannel

func (*Channel) AddAllowlist

func (c *Channel) AddAllowlist(uids []string)

AddAllowlist 添加白名单

func (*Channel) AddDenylist

func (c *Channel) AddDenylist(uids []string)

AddDenylist 添加黑名单

func (*Channel) AddSubscriber

func (c *Channel) AddSubscriber(uid string)

AddSubscriber Add subscribers

func (*Channel) AddSubscribers

func (c *Channel) AddSubscribers(uids []string)

func (*Channel) AddTmpSubscriber

func (c *Channel) AddTmpSubscriber(uid string)

func (*Channel) AddTmpSubscribers

func (c *Channel) AddTmpSubscribers(uids []string)

func (*Channel) Allow

func (c *Channel) Allow(uid string) (bool, wkproto.ReasonCode)

Allow reports whether uid is allowed to send messages: sending is allowed if uid is in the allowlist or is not in the denylist.

func (*Channel) GetAllSubscribers

func (c *Channel) GetAllSubscribers() []string

GetAllSubscribers 获取所有订阅者

func (*Channel) GetAllTmpSubscribers

func (c *Channel) GetAllTmpSubscribers() []string

func (*Channel) IsDenylist

func (c *Channel) IsDenylist(uid string) bool

IsDenylist 是否在黑名单内

func (*Channel) IsSubscriber

func (c *Channel) IsSubscriber(uid string) bool

IsSubscriber 是否已订阅

func (*Channel) IsTmpSubscriber

func (c *Channel) IsTmpSubscriber(uid string) bool

IsTmpSubscriber 是否是临时订阅者

func (*Channel) LoadData

func (c *Channel) LoadData() error

LoadData load data

func (*Channel) Put

func (c *Channel) Put(messages []*Message, customSubscribers []string, fromUID string, fromDeviceFlag wkproto.DeviceFlag, fromDeviceID string) error

func (*Channel) RealSubscribers

func (c *Channel) RealSubscribers(customSubscribers []string) ([]string, error)

real subscribers

func (*Channel) RemoveAllSubscriber

func (c *Channel) RemoveAllSubscriber()

RemoveAllSubscriber 移除所有订阅者

func (*Channel) RemoveAllTmpSubscriber

func (c *Channel) RemoveAllTmpSubscriber()

RemoveAllTmpSubscriber 移除所有临时订阅者

func (*Channel) RemoveAllowlist

func (c *Channel) RemoveAllowlist(uids []string)

RemoveAllowlist 移除白名单

func (*Channel) RemoveDenylist

func (c *Channel) RemoveDenylist(uids []string)

RemoveDenylist 移除黑名单

func (*Channel) RemoveSubscriber

func (c *Channel) RemoveSubscriber(uid string)

RemoveSubscriber 移除订阅者

func (*Channel) RemoveSubscribers

func (c *Channel) RemoveSubscribers(uids []string)

func (*Channel) RemoveTmSubscriber

func (c *Channel) RemoveTmSubscriber(uid string)

func (*Channel) RemoveTmpSubscribers

func (c *Channel) RemoveTmpSubscribers(uids []string)

func (*Channel) SetAllowlist

func (c *Channel) SetAllowlist(uids []string)

SetAllowlist SetAllowlist

func (*Channel) SetDenylist

func (c *Channel) SetDenylist(uids []string)

SetDenylist SetDenylist

type ChannelAPI

type ChannelAPI struct {
	wklog.Log
	// contains filtered or unexported fields
}

ChannelAPI ChannelAPI

func NewChannelAPI

func NewChannelAPI(s *Server) *ChannelAPI

NewChannelAPI 创建API

func (*ChannelAPI) Route

func (ch *ChannelAPI) Route(r *wkhttp.WKHttp)

Route Route

type ChannelCreateReq

// ChannelCreateReq is the request for creating a channel.
type ChannelCreateReq struct {
	ChannelInfoReq
	Subscribers []string `json:"subscribers"` // subscribers
}

ChannelCreateReq 频道创建请求

func (ChannelCreateReq) Check

func (r ChannelCreateReq) Check() error

Check 检查请求参数

type ChannelDeleteReq

// ChannelDeleteReq is the request for deleting a channel.
type ChannelDeleteReq struct {
	ChannelID   string `json:"channel_id"`   // channel ID
	ChannelType uint8  `json:"channel_type"` // channel type
}

ChannelDeleteReq 删除频道请求

type ChannelInfoReq

// ChannelInfoReq carries the channel identity and its channel-info flags.
type ChannelInfoReq struct {
	ChannelID   string `json:"channel_id"`   // channel ID
	ChannelType uint8  `json:"channel_type"` // channel type
	Large       int    `json:"large"`        // whether this is a very large group
	Ban         int    `json:"ban"`          // whether the channel is banned (once banned, nobody in the channel can send messages, except system accounts)
}

ChannelInfoReq ChannelInfoReq

func (ChannelInfoReq) ToChannelInfo

func (c ChannelInfoReq) ToChannelInfo() *wkstore.ChannelInfo

type ChannelInfoResp

// ChannelInfoResp carries the channel-info flags.
type ChannelInfoResp struct {
	Large int `json:"large"` // whether this is a very large group
	Ban   int `json:"ban"`   // whether the channel is banned (once banned, nobody in the channel can send messages, except system accounts)
}

func (ChannelInfoResp) ToChannelInfo

func (c ChannelInfoResp) ToChannelInfo() *wkstore.ChannelInfo

type ChannelManager

type ChannelManager struct {
	wklog.Log
	// contains filtered or unexported fields
}

ChannelManager 频道管理

func NewChannelManager

func NewChannelManager(s *Server) *ChannelManager

NewChannelManager 创建一个频道管理者

func (*ChannelManager) CreateOrUpdatePersonChannel

func (cm *ChannelManager) CreateOrUpdatePersonChannel(uid string) error

CreateOrUpdatePersonChannel 创建或更新个人频道

func (*ChannelManager) CreateTmpChannel

func (cm *ChannelManager) CreateTmpChannel(channelID string, channelType uint8, subscribers []string) error

CreateTmpChannel 创建临时频道

func (*ChannelManager) DeleteChannel

func (cm *ChannelManager) DeleteChannel(channelID string, channelType uint8) error

DeleteChannel 删除频道

func (*ChannelManager) DeleteChannelFromCache

func (cm *ChannelManager) DeleteChannelFromCache(channelID string, channelType uint8)

DeleteChannelFromCache DeleteChannelFromCache

func (*ChannelManager) GetChannel

func (cm *ChannelManager) GetChannel(channelID string, channelType uint8) (*Channel, error)

GetChannel 获取频道

func (*ChannelManager) GetPersonChannel

func (cm *ChannelManager) GetPersonChannel(channelID string, channelType uint8) (*Channel, error)

GetPersonChannel 获取个人频道

func (*ChannelManager) GetTmpChannel

func (cm *ChannelManager) GetTmpChannel(channelID string, channelType uint8) (*Channel, error)

GetTmpChannel 获取临时频道

type ConnInfo

// ConnInfo describes a single client connection.
type ConnInfo struct {
	ID           int64     `json:"id"`            // connection ID
	UID          string    `json:"uid"`           // user uid
	IP           string    `json:"ip"`            // client IP
	Port         int       `json:"port"`          // client port
	LastActivity time.Time `json:"last_activity"` // time of the last activity
	Uptime       string    `json:"uptime"`        // start time
	Idle         string    `json:"idle"`          // client idle time
	PendingBytes int       `json:"pending_bytes"` // number of bytes waiting to be sent
	InMsgs       int64     `json:"in_msgs"`       // number of inbound messages
	OutMsgs      int64     `json:"out_msgs"`      // number of outbound messages
	InBytes      int64     `json:"in_bytes"`      // number of inbound bytes
	OutBytes     int64     `json:"out_bytes"`     // number of outbound bytes
	Device       string    `json:"device"`        // device
	DeviceID     string    `json:"device_id"`     // device ID
	Version      uint8     `json:"version"`       // client protocol version
}

type ConnManager

type ConnManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewConnManager

func NewConnManager(s *Server) *ConnManager

func (*ConnManager) AddConn

func (c *ConnManager) AddConn(conn wknet.Conn)

func (*ConnManager) ExistConnsWithUID

func (c *ConnManager) ExistConnsWithUID(uid string) bool

func (*ConnManager) GetConn

func (c *ConnManager) GetConn(id int64) wknet.Conn

func (*ConnManager) GetConnCountWith

func (c *ConnManager) GetConnCountWith(uid string, deviceFlag wkproto.DeviceFlag) (int, int)

GetConnCountWith 获取设备的在线数量和用户所有设备的在线数量

func (*ConnManager) GetConnsWith

func (c *ConnManager) GetConnsWith(uid string, deviceFlag wkproto.DeviceFlag) []wknet.Conn

func (*ConnManager) GetConnsWithUID

func (c *ConnManager) GetConnsWithUID(uid string) []wknet.Conn

func (*ConnManager) GetOnlineConns

func (c *ConnManager) GetOnlineConns(uids []string) []wknet.Conn

GetOnlineConns 传一批uids 返回其中在线用户的连接

func (*ConnManager) RemoveConn

func (c *ConnManager) RemoveConn(conn wknet.Conn)

func (*ConnManager) RemoveConnWithID

func (c *ConnManager) RemoveConnWithID(id int64)

type Connz

// Connz is a page of connection information together with its query parameters.
type Connz struct {
	Connections []*ConnInfo `json:"connections"` // connections
	Now         time.Time   `json:"now"`         // query time
	Total       int         `json:"total"`       // total number of connections
	Offset      int         `json:"offset"`      // offset position
	Limit       int         `json:"limit"`       // limit on the number of entries
}

type ConnzAPI

type ConnzAPI struct {
	wklog.Log
	// contains filtered or unexported fields
}

func NewConnzAPI

func NewConnzAPI(s *Server) *ConnzAPI

func (*ConnzAPI) HandleConnz

func (co *ConnzAPI) HandleConnz(c *wkhttp.Context)

func (*ConnzAPI) Route

func (co *ConnzAPI) Route(r *wkhttp.WKHttp)

type ConversationAPI

type ConversationAPI struct {
	wklog.Log
	// contains filtered or unexported fields
}

ConversationAPI ConversationAPI

func NewConversationAPI

func NewConversationAPI(s *Server) *ConversationAPI

NewConversationAPI NewConversationAPI

func (*ConversationAPI) Route

func (s *ConversationAPI) Route(r *wkhttp.WKHttp)

Route 路由

type ConversationManager

type ConversationManager struct {
	wklog.Log
	// contains filtered or unexported fields
}

ConversationManager ConversationManager

func NewConversationManager

func NewConversationManager(s *Server) *ConversationManager

NewConversationManager NewConversationManager

func (*ConversationManager) AddOrUpdateConversation

func (cm *ConversationManager) AddOrUpdateConversation(uid string, conversation *wkstore.Conversation)

func (*ConversationManager) DeleteConversation

func (cm *ConversationManager) DeleteConversation(uids []string, channelID string, channelType uint8) error

DeleteConversation 删除最近会话

func (*ConversationManager) FlushConversations

func (cm *ConversationManager) FlushConversations()

FlushConversations 同步最近会话

func (*ConversationManager) GetConversation

func (cm *ConversationManager) GetConversation(uid string, channelID string, channelType uint8) *wkstore.Conversation

func (*ConversationManager) GetConversations

func (cm *ConversationManager) GetConversations(uid string, version int64, larges []*wkproto.Channel) []*wkstore.Conversation

GetConversations GetConversations

func (*ConversationManager) PushMessage

func (cm *ConversationManager) PushMessage(message *Message, subscribers []string)

PushMessage PushMessage

func (*ConversationManager) SetConversationUnread

func (cm *ConversationManager) SetConversationUnread(uid string, channelID string, channelType uint8, unread int, messageSeq uint32) error

SetConversationUnread set unread data from conversation

func (*ConversationManager) Start

func (cm *ConversationManager) Start()

Start Start

func (*ConversationManager) Stop

func (cm *ConversationManager) Stop()

Stop Stop

type Datasource

type Datasource struct {
	// contains filtered or unexported fields
}

Datasource Datasource

func (*Datasource) GetBlacklist

func (d *Datasource) GetBlacklist(channelID string, channelType uint8) ([]string, error)

GetBlacklist 获取频道的黑名单

func (*Datasource) GetChannelInfo

func (d *Datasource) GetChannelInfo(channelID string, channelType uint8) (*wkstore.ChannelInfo, error)

func (*Datasource) GetSubscribers

func (d *Datasource) GetSubscribers(channelID string, channelType uint8) ([]string, error)

GetSubscribers 获取频道的订阅者

func (*Datasource) GetSystemUIDs

func (d *Datasource) GetSystemUIDs() ([]string, error)

GetSystemUIDs 获取系统账号

func (*Datasource) GetWhitelist

func (d *Datasource) GetWhitelist(channelID string, channelType uint8) ([]string, error)

GetWhitelist 获取频道的白名单

type DeliveryManager

type DeliveryManager struct {
	wklog.Log
	// contains filtered or unexported fields
}

func NewDeliveryManager

func NewDeliveryManager(s *Server) *DeliveryManager

type Dispatch

type Dispatch struct {
	wklog.Log
	// contains filtered or unexported fields
}

func NewDispatch

func NewDispatch(s *Server) *Dispatch

func (*Dispatch) Start

func (d *Dispatch) Start() error

func (*Dispatch) Stop

func (d *Dispatch) Stop() error

type Event

// Event is an event identifier paired with its data.
type Event struct {
	Event string      `json:"event"` // event identifier
	Data  interface{} `json:"data"`  // event data
}

Event Event

func (*Event) String

func (e *Event) String() string

type FrameWorkPool

type FrameWorkPool struct {
}

func NewFrameWorkPool

func NewFrameWorkPool() *FrameWorkPool

func (*FrameWorkPool) Submit

func (f *FrameWorkPool) Submit(task func())

type IDatasource

// IDatasource is the data source; third-party applications can provide it.
type IDatasource interface {
	// GetSubscribers gets the subscribers.
	GetSubscribers(channelID string, channelType uint8) ([]string, error)
	// GetBlacklist gets the blacklist.
	GetBlacklist(channelID string, channelType uint8) ([]string, error)
	// GetWhitelist gets the whitelist.
	GetWhitelist(channelID string, channelType uint8) ([]string, error)
	// GetSystemUIDs gets the set of system-account uids; system accounts can send messages to anyone.
	GetSystemUIDs() ([]string, error)
	// GetChannelInfo gets the channel information.
	GetChannelInfo(channelID string, channelType uint8) (*wkstore.ChannelInfo, error)
}

IDatasource 数据源第三方应用可以提供

func NewDatasource

func NewDatasource(s *Server) IDatasource

NewDatasource 创建一个数据源

type Message

// Message embeds a wkproto.RecvPacket and adds routing fields.
type Message struct {
	*wkproto.RecvPacket
	ToUID       string   // recipient
	Subscribers []string // subscribers; if this field is set, the message is sent only to the specified subscribers
	// contains filtered or unexported fields
}

func (*Message) Decode

func (m *Message) Decode(msg []byte) error

func (*Message) DeepCopy

func (m *Message) DeepCopy() (*Message, error)

func (*Message) Encode

func (m *Message) Encode() []byte

func (*Message) GetMessageID

func (m *Message) GetMessageID() int64

func (*Message) GetSeq

func (m *Message) GetSeq() uint32

func (*Message) SetSeq

func (m *Message) SetSeq(seq uint32)

type MessageAPI

type MessageAPI struct {
	wklog.Log
	// contains filtered or unexported fields
}

MessageAPI MessageAPI

func NewMessageAPI

func NewMessageAPI(s *Server) *MessageAPI

NewMessageAPI NewMessageAPI

func (*MessageAPI) Route

func (m *MessageAPI) Route(r *wkhttp.WKHttp)

Route route

type MessageHeader

// MessageHeader is the message header carrying per-message flags.
type MessageHeader struct {
	NoPersist int `json:"no_persist"` // whether the message is not persisted
	RedDot    int `json:"red_dot"`    // whether to show the red dot
	SyncOnce  int `json:"sync_once"`  // the message is synchronized or consumed only once
}

MessageHeader Message header

type MessageOfflineNotify

// MessageOfflineNotify is a MessageResp extended with the target uids of an offline notification.
type MessageOfflineNotify struct {
	MessageResp
	ToUIDs          []string `json:"to_uids"`
	Compress        string   `json:"compress,omitempty"`         // compression applied to ToUIDs: empty means not compressed, "gzip" means gzip compression
	CompresssToUIDs []byte   `json:"compress_to_uids,omitempty"` // the compressed to_uids
	SourceID        int64    `json:"source_id,omitempty"`        // source node ID
}

type MessageResp

// MessageResp is the message returned in responses.
type MessageResp struct {
	Header       MessageHeader `json:"header"`        // message header
	Setting      uint8         `json:"setting"`       // setting
	MessageID    int64         `json:"message_id"`    // server-side message ID (globally unique)
	MessageIDStr string        `json:"message_idstr"` // server-side message ID (globally unique), as a string field
	ClientMsgNo  string        `json:"client_msg_no"` // unique client message number
	MessageSeq   uint32        `json:"message_seq"`   // message sequence number (unique per user, increasing in order)
	FromUID      string        `json:"from_uid"`      // sender UID
	ChannelID    string        `json:"channel_id"`    // channel ID
	ChannelType  uint8         `json:"channel_type"`  // channel type
	Topic        string        `json:"topic"`         // topic ID
	Timestamp    int32         `json:"timestamp"`     // server message timestamp (10 digits, in seconds)
	Payload      []byte        `json:"payload"`       // message content
}

MessageResp 消息返回

type MessageRespSlice

type MessageRespSlice []*MessageResp

MessageRespSlice MessageRespSlice

func (MessageRespSlice) Len

func (m MessageRespSlice) Len() int

func (MessageRespSlice) Less

func (m MessageRespSlice) Less(i, j int) bool

func (MessageRespSlice) Swap

func (m MessageRespSlice) Swap(i, j int)

type MessageSendReq

// MessageSendReq is the request for sending a message.
type MessageSendReq struct {
	Header      MessageHeader `json:"header"`        // message header
	ClientMsgNo string        `json:"client_msg_no"` // client message number (for identical numbers the client displays only one message)
	FromUID     string        `json:"from_uid"`      // sender UID
	ChannelID   string        `json:"channel_id"`    // channel ID
	ChannelType uint8         `json:"channel_type"`  // channel type
	Subscribers []string      `json:"subscribers"`   // subscribers; if this field is set, the message is sent only to the specified subscribers
	Payload     []byte        `json:"payload"`       // message content
}

MessageSendReq 消息发送请求

func (MessageSendReq) Check

func (m MessageSendReq) Check() error

Check 检查输入

type Mode

// Mode is the string type of the run mode.
type Mode string
const (
	// DebugMode is the debug mode.
	DebugMode Mode = "debug"
	// ReleaseMode is the production (release) mode.
	ReleaseMode Mode = "release"
	// BenchMode is the stress-test mode.
	BenchMode Mode = "bench"
	// TestMode indicates gin mode is test.
	// NOTE(review): unlike the constants above, TestMode is declared without an
	// explicit type, so it is an untyped string constant rather than a Mode —
	// confirm this is intentional before relying on it as a Mode value.
	TestMode = "test"
)

type MonitorAPI

type MonitorAPI struct {
	wklog.Log
	// contains filtered or unexported fields
}

func NewMonitorAPI

func NewMonitorAPI(s *Server) *MonitorAPI

NewMonitorAPI NewMonitorAPI

func (*MonitorAPI) Route

func (m *MonitorAPI) Route(r *wkhttp.WKHttp)

Route 监控相关路由配置

type MonitorServer

type MonitorServer struct {
	wklog.Log
	// contains filtered or unexported fields
}

func NewMonitorServer

func NewMonitorServer(s *Server) *MonitorServer

func (*MonitorServer) Start

func (m *MonitorServer) Start()

func (*MonitorServer) Stop

func (m *MonitorServer) Stop() error

type OnlinestatusResp

// OnlinestatusResp reports the online status of one user device.
type OnlinestatusResp struct {
	UID        string `json:"uid"`         // uid of the online user
	DeviceFlag uint8  `json:"device_flag"` // device flag: 0 = APP, 1 = web
	Online     int    `json:"online"`      // whether the user is online
}

type Options

// Options groups the server's configuration settings.
// Defaults mentioned below are those stated by the original field comments.
type Options struct {
	ID       int64  // node ID
	Mode     Mode   // mode: debug = testing, release = production, bench = stress testing
	HTTPAddr string // listen address of the HTTP API; default 0.0.0.0:5001
	Addr     string // TCP listen address, e.g. tcp://0.0.0.0:5100
	RootDir  string // root directory
	DataDir  string // data directory
	GinMode  string // mode of the gin framework
	WSAddr   string // websocket listen address, e.g. ws://0.0.0.0:5200 or wss://0.0.0.0:5200
	Logger   struct {
		Dir     string // log storage directory
		Level   zapcore.Level
		LineNum bool // whether to show code line numbers
	}
	Monitor struct {
		On   bool   // whether monitoring is enabled
		Addr string // monitor address; default 0.0.0.0:5300
	}
	External struct {
		IP          string // public IP; if not configured it is obtained via ifconfig.io
		TCPAddr     string // the node's TCP address, publicly exposed, used for long-connection communication with apps; format: ip:port
		WSSAddr     string // the node's wss address, publicly exposed, used for long-connection communication with web clients; format: proto://ip:port
		MonitorAddr string // externally accessible monitor address
	}
	Channel struct {
		CacheCount                int  // number of channels to cache
		CreateIfNoExist           bool // whether to create the channel if it does not exist
		SubscriberCompressOfCount int  // subscriber-array size at which compression starts (when the subscriber array is too large during offline push, set this to compress it; default 0 means no compression)

	}
	TmpChannel struct {
		Suffix     string // suffix of temporary channels
		CacheCount int    // number of temporary channels to cache
	}
	Webhook struct {
		HTTPAddr                    string        // HTTP address of the webhook; data is notified to the third party through this address; format: http://xxxxx
		GRPCAddr                    string        // gRPC address of the webhook; if set, the address configured in HTTPAddr is no longer called; format: ip:port
		MsgNotifyEventPushInterval  time.Duration // push interval of message-notify events; by default a push is started every 500 milliseconds
		MsgNotifyEventCountPerPush  int           // limit on the number of messages per webhook message-notify push; by default at most 100 per request
		MsgNotifyEventRetryMaxCount int           // maximum number of retries when a message-notify push fails; default 5, after which the messages are dropped
	}
	Datasource struct {
		Addr          string // data source address
		ChannelInfoOn bool   // whether fetching channel information is enabled
	}
	Conversation struct {
		On           bool          // whether recent conversations are enabled
		CacheExpire  time.Duration // cache expiry of recent conversations
		SyncInterval time.Duration // sync interval of recent conversations
		SyncOnce     int           // save once after this many recent conversations have changed
		UserMaxCount int           // maximum number of recent conversations per user; default 500
	}

	Proto wkproto.Protocol // WuKongIM protocol

	Version string

	UnitTest       bool // whether unit testing is enabled
	HandlePoolSize int

	ConnIdleTime    time.Duration // connection idle time; a connection with no data transfer for longer than this is closed
	TimingWheelTick time.Duration // timing-wheel tick interval; must be 1ms or more
	TimingWheelSize int64         // timing-wheel size

	UserMsgQueueMaxSize int // maximum size of a user's message queue; beyond this size the user is rate-limited; 0 means unlimited

	TokenAuthOn bool // whether token verification is enabled; if not configured it is decided from Mode: false by default in debug mode, true in release mode

	EventPoolSize int // size of the event goroutine pool; this pool mainly handles IM notification events such as webhook and online/offline; default 1024

	WhitelistOffOfPerson int
	DeliveryMsgPoolSize  int // size of the message-delivery goroutine pool; its goroutines mainly deliver messages to online users; default 10240

	MessageRetry struct {
		Interval     time.Duration // message retry interval; if no ack is received within this interval after sending, the message is resent after it
		MaxCount     int           // maximum number of message retries
		ScanInterval time.Duration // how often the timeout queue is scanned for messages that need a retry
	}

	SlotNum int // number of slots
	// contains filtered or unexported fields
}

func NewOptions

func NewOptions() *Options

func NewServerOptions

func NewServerOptions() *Options

NewServerOptions NewServerOptions

func NewTestOptions

func NewTestOptions(logLevel ...zapcore.Level) *Options

func (*Options) ConfigureWithViper

func (o *Options) ConfigureWithViper(vp *viper.Viper)

func (*Options) GetCustomerServiceVisitorUID

func (o *Options) GetCustomerServiceVisitorUID(channelID string) (string, bool)

获取客服频道的访客id

func (*Options) HasDatasource

func (o *Options) HasDatasource() bool

HasDatasource 是否有配置数据源

func (*Options) IsFakeChannel

func (o *Options) IsFakeChannel(channelID string) bool

IsFakeChannel 是否是fake频道

func (*Options) IsTmpChannel

func (o *Options) IsTmpChannel(channelID string) bool

IsTmpChannel 是否是临时频道

func (*Options) WebhookGRPCOn

func (o *Options) WebhookGRPCOn() bool

WebhookGRPCOn 是否配置了webhook grpc地址

func (*Options) WebhookOn

func (o *Options) WebhookOn() bool

WebhookOn WebhookOn

type Processor

type Processor struct {
	wklog.Log
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(s *Server) *Processor

type PullMode

type PullMode int // pull mode
const (
	PullModeDown PullMode = iota // pull downward
	PullModeUp                   // pull upward
)

type Queue

type Queue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Queue Queue

func NewQueue

func NewQueue() *Queue

NewQueue 创建队列

func (*Queue) Close

func (e *Queue) Close()

Close Close

func (*Queue) Len

func (e *Queue) Len() int

Len 获取队列长度

func (*Queue) Pop

func (e *Queue) Pop() (v interface{})

Pop 取出队列,(阻塞模式)

func (*Queue) Push

func (e *Queue) Push(v interface{})

Push Push

func (*Queue) TryPop

func (e *Queue) TryPop() (v interface{}, ok bool)

TryPop 试着取出队列(非阻塞模式)返回ok == false 表示空

func (*Queue) Wait

func (e *Queue) Wait()

Wait 等待队列消费完成

type RetryQueue

type RetryQueue struct {
	// contains filtered or unexported fields
}

RetryQueue 重试队列

func NewRetryQueue

func NewRetryQueue(s *Server) *RetryQueue

NewRetryQueue NewRetryQueue

func (*RetryQueue) Start

func (r *RetryQueue) Start()

Start 开始运行重试

func (*RetryQueue) Stop

func (r *RetryQueue) Stop()

type RouteAPI

type RouteAPI struct {
	wklog.Log
	// contains filtered or unexported fields
}

这个主要为了模拟proxy模式。

func NewRouteAPI

func NewRouteAPI(s *Server) *RouteAPI

NewRouteAPI NewRouteAPI

func (*RouteAPI) Route

func (a *RouteAPI) Route(r *wkhttp.WKHttp)

Route Route

type Server

type Server struct {
	wklog.Log
	// contains filtered or unexported fields
}

func New

func New(opts *Options) *Server

func NewTestServer

func NewTestServer(ots ...*Options) *Server

NewTestServer NewTestServer

func (*Server) GetConnInfos added in v1.0.9

func (s *Server) GetConnInfos(sortOpt SortOpt, offset, limit int) []wknet.Conn

func (*Server) Init

func (s *Server) Init(env svc.Environment) error

func (*Server) Schedule

func (s *Server) Schedule(interval time.Duration, f func()) *timingwheel.Timer

Schedule 延迟任务

func (*Server) Start

func (s *Server) Start() error

func (*Server) Stop

func (s *Server) Stop() error

type SortOpt

// SortOpt is the string type of a connection sort option.
// The original comments do not distinguish the *Desc variants; "descending"
// below is taken from the constant names.
type SortOpt string
const (
	ByID     SortOpt = "id"     // sort by connection id
	ByIDDesc SortOpt = "idDesc" // sort by connection id (descending variant)

	ByInMsg            SortOpt = "inMsg"            // sort by received messages
	ByInMsgDesc        SortOpt = "inMsgDesc"        // sort by received messages (descending variant)
	ByOutMsg           SortOpt = "outMsg"           // sort by sent messages
	ByOutMsgDesc       SortOpt = "outMsgDesc"       // sort by sent messages (descending variant)
	ByInBytes          SortOpt = "inBytes"          // sort by received bytes
	ByInBytesDesc      SortOpt = "inBytesDesc"      // sort by received bytes (descending variant)
	ByOutBytes         SortOpt = "outBytes"         // sort by sent bytes
	ByOutBytesDesc     SortOpt = "outBytesDesc"     // sort by sent bytes (descending variant)
	ByPendingBytes     SortOpt = "pendingBytes"     // sort by bytes waiting to be sent
	ByPendingBytesDesc SortOpt = "pendingBytesDesc" // sort by bytes waiting to be sent (descending variant)
	ByUptime           SortOpt = "uptime"           // sort by start time
	ByUptimeDesc       SortOpt = "uptimeDesc"       // sort by start time (descending variant)
)

type SystemUIDManager

type SystemUIDManager struct {
	// contains filtered or unexported fields
}

SystemUIDManager System uid management

func NewSystemUIDManager

func NewSystemUIDManager(s *Server) *SystemUIDManager

NewSystemUIDManager NewSystemUIDManager

func (*SystemUIDManager) AddSystemUIDs

func (s *SystemUIDManager) AddSystemUIDs(uids []string) error

AddSystemUIDs adds system uids.

func (*SystemUIDManager) LoadIfNeed

func (s *SystemUIDManager) LoadIfNeed() error

LoadIfNeed LoadIfNeed

func (*SystemUIDManager) RemoveSystemUIDs

func (s *SystemUIDManager) RemoveSystemUIDs(uids []string) error

RemoveSystemUIDs removes system uids.

func (*SystemUIDManager) SystemUID

func (s *SystemUIDManager) SystemUID(uid string) bool

SystemUID Is it a system account?

type TestConn

type TestConn struct {
	// contains filtered or unexported fields
}

func NewTestConn

func NewTestConn() *TestConn

func (*TestConn) Authed

func (t *TestConn) Authed() bool

func (*TestConn) Close

func (t *TestConn) Close() error

func (*TestConn) GetID

func (t *TestConn) GetID() uint32

func (*TestConn) OutboundBuffered

func (t *TestConn) OutboundBuffered() int

func (*TestConn) RemoteAddr

func (t *TestConn) RemoteAddr() net.Addr

func (*TestConn) SetAuthed

func (t *TestConn) SetAuthed(v bool)

func (*TestConn) SetID

func (t *TestConn) SetID(id uint32)

func (*TestConn) SetReadDeadline

func (t *TestConn) SetReadDeadline(tm time.Time) error

func (*TestConn) SetVersion

func (t *TestConn) SetVersion(version uint8)

func (*TestConn) SetWriteDeadline

func (t *TestConn) SetWriteDeadline(tm time.Time) error

func (*TestConn) Version

func (t *TestConn) Version() uint8

func (*TestConn) Write

func (t *TestConn) Write(buf []byte) (n int, err error)

func (*TestConn) WriteChan

func (t *TestConn) WriteChan() chan []byte

type UpdateTokenReq

// UpdateTokenReq is the request for updating a user's token.
type UpdateTokenReq struct {
	UID         string              `json:"uid"`          // the user's unique uid
	Token       string              `json:"token"`        // the user's token
	DeviceFlag  wkproto.DeviceFlag  `json:"device_flag"`  // device flag: 0 = app, 1 = web
	DeviceLevel wkproto.DeviceLevel `json:"device_level"` // device level: 0 = secondary device, 1 = primary device
}

UpdateTokenReq 更新token请求

func (UpdateTokenReq) Check

func (u UpdateTokenReq) Check() error

Check 检查输入

type UserAPI

type UserAPI struct {
	wklog.Log
	// contains filtered or unexported fields
}

UserAPI 用户相关API

func NewUserAPI

func NewUserAPI(s *Server) *UserAPI

NewUserAPI NewUserAPI

func (*UserAPI) Route

func (u *UserAPI) Route(r *wkhttp.WKHttp)

Route 用户相关路由配置

type Varz

// Varz is a snapshot of server-level variables and counters.
type Varz struct {
	ServerID    string  `json:"server_id"`   // server ID
	ServerName  string  `json:"server_name"` // server name
	Version     string  `json:"version"`     // server version
	Connections int     `json:"connections"` // current number of connections
	Uptime      string  `json:"uptime"`      // time online
	Mem         int64   `json:"mem"`         // memory
	CPU         float64 `json:"cpu"`         // cpu

	InMsgs      int64 `json:"in_msgs"`      // number of inbound messages
	OutMsgs     int64 `json:"out_msgs"`     // number of outbound messages
	InBytes     int64 `json:"in_bytes"`     // number of inbound bytes
	OutBytes    int64 `json:"out_bytes"`    // number of outbound bytes
	SlowClients int64 `json:"slow_clients"` // number of slow clients

	//
	TCPAddr     string `json:"tcp_addr"`     // tcp address
	WSAddr      string `json:"ws_addr"`      // ws address
	MonitorAddr string `json:"monitor_addr"` // monitor address
	MonitorOn   int    `json:"monitor_on"`   // whether monitoring is enabled
	Commit      string `json:"commit"`       // git commit id
	CommitDate  string `json:"commit_date"`  // git commit date
	TreeState   string `json:"tree_state"`   // git tree state

	Conns []*ConnInfo `json:"conns,omitempty"` // connection information

}

type VarzAPI

type VarzAPI struct {
	wklog.Log
	// contains filtered or unexported fields
}

func NewVarzAPI

func NewVarzAPI(s *Server) *VarzAPI

func (*VarzAPI) HandleVarz

func (v *VarzAPI) HandleVarz(c *wkhttp.Context)

func (*VarzAPI) Route

func (v *VarzAPI) Route(r *wkhttp.WKHttp)

type Webhook

type Webhook struct {
	wklog.Log
	// contains filtered or unexported fields
}

func NewWebhook

func NewWebhook(s *Server) *Webhook

func (*Webhook) Offline

func (w *Webhook) Offline(uid string, deviceFlag wkproto.DeviceFlag, id int64, onlineCount int, totalOnlineCount int)

Offline 用户离线。id 为用户在当前系统中的socket id;onlineCount 与 totalOnlineCount 为剩余在线数量(原注释中的 left 参数已不在函数签名中)

func (*Webhook) Online

func (w *Webhook) Online(uid string, deviceFlag wkproto.DeviceFlag, id int64, onlineCount int, totalOnlineCount int)

Online 用户在线

func (*Webhook) Start

func (w *Webhook) Start()

func (*Webhook) Stop

func (w *Webhook) Stop()

func (*Webhook) TriggerEvent

func (w *Webhook) TriggerEvent(event *Event)

TriggerEvent 触发事件

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL