comet

package
v0.0.0-...-9fc3494 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 5, 2019 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitTCP

func InitTCP(server *Server, addrs []string, accept int) (err error)

InitTCP listen all tcp.bind and start accept connections.

func InitWebsocket

func InitWebsocket(server *Server, addrs []string, accept int) (err error)

InitWebsocket listen all tcp.bind and start accept connections.

func InitWebsocketWithTLS

func InitWebsocketWithTLS(server *Server, addrs []string, certFile, privateFile string, accept int) (err error)

InitWebsocketWithTLS init websocket with tls.

func InitWhitelist

func InitWhitelist(c *conf.Whitelist) (err error)

InitWhitelist a whitelist struct.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

func NewBucket

func NewBucket(c *conf.Bucket) (b *Bucket)

NewBucket new a bucket struct. store the key with im channel.

func (*Bucket) Broadcast

func (b *Bucket) Broadcast(p *grpc.Proto, op int32)

Broadcast push msgs to all channels in the bucket. 把消息 p 推送到对本 Bucket 内所有关注房间 op 的 channels 中。

func (*Bucket) BroadcastRoom

func (b *Bucket) BroadcastRoom(arg *grpc.BroadcastRoomReq)

BroadcastRoom broadcast a message to specified room

一个 Bucket 会开多个 goroutine 来并发的做单房间的广播消息推送,每个 goroutine 监听一个管道。 推送单房间广播消息时,可以随机发往任何管道,交由对应的 goroutine 来处理。 这里为了保证负载均衡,使用了 原子锁 + 递增 方式,本质就是轮循选择一个 goroutine 。

这里 BroadcastRoom() 函数就是简单选择一个管道,然后把消息发给它,后面 go roomproc() 中会取出并处理。

func (*Bucket) ChangeRoom

func (b *Bucket) ChangeRoom(nrid string, ch *Channel) (err error)

ChangeRoom change ro room user 更换房间。

func (*Bucket) Channel

func (b *Bucket) Channel(key string) (ch *Channel)

Channel get a channel by sub key. 根据 user key 获取对应 Channel

func (*Bucket) ChannelCount

func (b *Bucket) ChannelCount() int

ChannelCount channel count in the bucket 当前 bucket 内用户总数。

func (*Bucket) Del

func (b *Bucket) Del(dch *Channel)

func (*Bucket) DelRoom

func (b *Bucket) DelRoom(room *Room)

DelRoom delete a room by roomid. 从 b.rooms 中删除 room.ID 并执行关闭 room.Close()。

func (*Bucket) IPCount

func (b *Bucket) IPCount() (res map[string]struct{})

IPCount get ip count. 获取 bucket 内用户的独立 IP 总数。

func (*Bucket) Put

func (b *Bucket) Put(rid string, ch *Channel) (err error)

func (*Bucket) Room

func (b *Bucket) Room(rid string) (room *Room)

Room get a room by roomid. 根据房间 ID 获取房间对象。

func (*Bucket) RoomCount

func (b *Bucket) RoomCount() int

RoomCount room count in the bucket 当前 bucket 内房间总数。

func (*Bucket) Rooms

func (b *Bucket) Rooms() (res map[string]struct{})

Rooms get all room id where online number > 0.

func (*Bucket) RoomsCount

func (b *Bucket) RoomsCount() (res map[string]int32)

RoomsCount get all room id where online number > 0. 获取每个房间内的用户数目。

func (*Bucket) UpRoomsCount

func (b *Bucket) UpRoomsCount(roomCountMap map[string]int32)

UpRoomsCount update all room count

type Channel

type Channel struct {
	Room *Room // user 归属房间

	CliProto Ring // cliProto 是一个 Ring Buffer,保存 Room 广播或是 client 直接发送过来的消息体

	Writer bufio.Writer // 客户端连接 conn 的 写 封装
	Reader bufio.Reader // 客户端连接 conn 的 读 封装

	Next *Channel // 双向链表 rlink
	Prev *Channel // 双向链表 llink

	Mid int64  // user id
	Key string // user 在 logic service 的 key
	IP  string // user ip
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel(cli, svr int) *Channel

NewChannel new a channel.

func (*Channel) Close

func (c *Channel) Close()

Close close the channel.

func (*Channel) NeedPush

func (c *Channel) NeedPush(op int32) bool

NeedPush verify if in watch. 判断 user 是否正在关注操作 op

func (*Channel) Push

func (c *Channel) Push(p *grpc.Proto) (err error)

Push server push message. 给当前 user 推送消息

func (*Channel) Ready

func (c *Channel) Ready() *grpc.Proto

Ready check the channel ready or close?

func (*Channel) Signal

func (c *Channel) Signal()

Signal send signal to the channel, protocol ready.

func (*Channel) UnWatch

func (c *Channel) UnWatch(accepts ...int32)

UnWatch unwatch an operation 移除 user 关注操作 ops

func (*Channel) Watch

func (c *Channel) Watch(accepts ...int32)

Watch watch a operation. 设置 user 关注的操作 ops

type Ring

type Ring struct {
	// contains filtered or unexported fields
}

Ring ring proto buffer.

func NewRing

func NewRing(num int) *Ring

NewRing new a ring buffer.

func (*Ring) Get

func (r *Ring) Get() (proto *grpc.Proto, err error)

func (*Ring) GetAdv

func (r *Ring) GetAdv()

GetAdv incr read index. 读游标 +1

func (*Ring) Init

func (r *Ring) Init(num int)

Init init ring.

func (*Ring) Reset

func (r *Ring) Reset()

Reset reset ring. 重置读写游标

func (*Ring) Set

func (r *Ring) Set() (proto *grpc.Proto, err error)

Set get a proto to write. 取待写入对象

func (*Ring) SetAdv

func (r *Ring) SetAdv()

SetAdv incr write index. 写游标 +1

type Room

type Room struct {
	ID string // 房间号

	Online    int32 // 房间的 channel 数量,即房间的在线用户的多少 // dirty read is ok
	AllOnline int32
	// contains filtered or unexported fields
}

func NewRoom

func NewRoom(id string) (r *Room)

NewRoom new a room struct, store channel room info.

func (*Room) Close

func (r *Room) Close()

Close close the room.

func (*Room) Del

func (r *Room) Del(ch *Channel) bool

Del delete channel from the room.

func (*Room) OnlineNum

func (r *Room) OnlineNum() int32

OnlineNum the room all online.

func (*Room) Push

func (r *Room) Push(p *grpc.Proto)

Push push msg to the room, if chan full discard it. 房间消息广播

func (*Room) Put

func (r *Room) Put(ch *Channel) (err error)

Put put channel into the room. 加入 Room

type Round

type Round struct {
	// contains filtered or unexported fields
}

Round userd for connection round-robin get a reader/writer/timer for split big lock.

func NewRound

func NewRound(c *conf.Config) (r *Round)

NewRound new a round struct.

func (*Round) Reader

func (r *Round) Reader(rn int) *bytes.Pool

Reader get a reader memory buffer. 取 Reader Pool ,给定数字 rn 会以取余方式选择其中一个,用于分散锁竞争压力,增加并发量。

func (*Round) Timer

func (r *Round) Timer(rn int) *time.Timer

Timer get a timer. 取 Timer Pool ,给定数字 rn 会以取余方式选择其中一个,用于分散锁竞争压力,增加并发量。

func (*Round) Writer

func (r *Round) Writer(rn int) *bytes.Pool

Writer get a writer memory buffer pool. 取 Writer Pool ,给定数字 rn 会以取余方式选择其中一个,用于分散锁竞争压力,增加并发量。

type RoundOptions

type RoundOptions struct {
	Timer        int // 每次要分配多少个用于 time.Timer 的 Pool
	TimerSize    int // 每个 time.Timer 一开始能接收的 TimerData 数量
	Reader       int // 每次要分配多少个用于 Reader bytes 的 Pool
	ReadBuf      int // 每个 Reader bytes Pool 有多少个 Buffer
	ReadBufSize  int // 每个 Reader bytes Pool 的 Buffer 能有多大的空间
	Writer       int // 每次要分配多少个用于 Writer bytes 的 Pool
	WriteBuf     int // 每个 Writer bytes Pool 有多少个 Buffer
	WriteBufSize int // 每个 Writer bytes Pool 的 Buffer 能有多大的空间
}

RoundOptions round options.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is comet server.

func NewServer

func NewServer(c *conf.Config) *Server

NewServer returns a new Server.

func (*Server) Bucket

func (s *Server) Bucket(subKey string) *Bucket

Bucket get the bucket by subkey.

用 CityHash32 计算 user key 的哈希值,然后对 bucket 总数取余算得 idx ,来取出 user key 对应的 bucket。 这种方法能够尽可能将 user 分散到不同的 bucket 中,随着 bucket 的增加,每个 bucket 内的 user 总数是降低的, 这样便可以在高并发时,避免在同一个 bucket 上的频繁锁竞争。

注,一个 user key 会始终映射到相同的 bucket 里,除非发生 bucket 的扩容,这个另做讨论。

func (*Server) Buckets

func (s *Server) Buckets() []*Bucket

Buckets return all buckets.

func (*Server) Close

func (s *Server) Close() (err error)

Close close the server.

func (*Server) Connect

func (s *Server) Connect(c context.Context, p *model.Proto, cookie string) (mid int64, key, rid string, accepts []int32, heartbeat time.Duration, err error)

Connect connected a connection. 告知 logic service 有人想要进入某个房间。

func (*Server) Disconnect

func (s *Server) Disconnect(c context.Context, mid int64, key string) (err error)

Disconnect disconnected a connection. client 连接中断,告知 logic service 需清理此人的状态信息。

func (*Server) Heartbeat

func (s *Server) Heartbeat(ctx context.Context, mid int64, key string) (err error)

Heartbeat heartbeat a connection session. 告知 logic service 要刷新某人的在线状态。

func (*Server) Operate

func (s *Server) Operate(ctx context.Context, p *model.Proto, ch *Channel, b *Bucket) error

Operate operate. 处理 Proto 相关逻辑。

func (*Server) RandServerHearbeat

func (s *Server) RandServerHearbeat() time.Duration

RandServerHearbeat rand server heartbeat. 返回 [10 min, 30 min] 之间的一个随机值,被用作 comet - logic 之间上报 user 心跳的时间间隔

func (*Server) Receive

func (s *Server) Receive(ctx context.Context, mid int64, p *model.Proto) (err error)

Receive receive a message.

func (*Server) RenewOnline

func (s *Server) RenewOnline(ctx context.Context, serverID string, rommCount map[string]int32) (allRoom map[string]int32, err error)

RenewOnline renew room online.

func (*Server) ServeTCP

func (s *Server) ServeTCP(conn *net.TCPConn, rp, wp *bytes.Pool, tr *xtime.Timer)

ServeTCP serve a tcp connection.

ServeTCP() 函数负责从连接 conn 中读取 client 发来的数据,写入到从 ring buffer 中,然后触发 ch.signal 通知 dispatchTCP() 协程去取消息。

1. 初始化 I/O 对象,定时器 ... 2. 鉴权 3. 启动 dispatchTCP() 协程异步处理消息 4. 从 conn 中读取消息内容写入到 ring buffer 中,并通过 ch.signal 通知 dispatchTCP() 协程去取消息 5. ...

func (*Server) ServeWebsocket

func (s *Server) ServeWebsocket(conn net.Conn, rp, wp *bytes.Pool, tr *xtime.Timer)

ServeWebsocket serve a websocket connection.

type Whitelist

type Whitelist struct {
	// contains filtered or unexported fields
}

Whitelist .

func (*Whitelist) Contains

func (w *Whitelist) Contains(mid int64) (ok bool)

Contains whitelist contains a mid or not.

func (*Whitelist) Printf

func (w *Whitelist) Printf(format string, v ...interface{})

Printf calls l.Output to print to the logger.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL