ws

package
v1.3.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2024 License: MIT Imports: 24 Imported by: 0

README

Read this in other languages: English, 中文.

Introduction to the ws Module

  • Interface-oriented programming, automatic connection management
  • Built-in support for multiple platforms, devices, versions, and character sets
  • Supports asynchronous messaging and synchronous RPC calls
  • Extremely efficient binary-level protocol for minimal data transfer
  • Message body object pool
  • Zero-copy message data
  • Multi-language support (Go/JavaScript/TypeScript/C++)

Module Usage

plug and play

Using GoLang to write a simple server:

  1. Define request and response packet protocol IDs for the server and client.
const C2S_REQ  = 2
const S2C_RESP = 3
  1. Register a server-side connection reception route.
ws.InitServerWithOpt(ServerOption{[]HubOption{HubShardOption(4)}}) 
  1. Register a server-side message reception handler, which sends a response packet after processing.
ws.RegisterHandler(C2S_REQ, func(ctx context.Context, connection IConnection, message IMessage) error {
    log.Info(ctx, "server recv: %v, %v", message.GetProtocolId(), string(message.GetData()))
    packet := GetPoolMessage(S2C_RESP)
    packet.SetData([]byte("server response"))
    connection.SendMsg(ctx, packet, nil)
    return nil
})
  1. Create a listening service and start it.
http.HandleFunc("/join", func(w http.ResponseWriter, r *http.Request) {
    connMeta := ws.ConnectionMeta{
        UserId:   r.URL.Query().Get("uid"),
    }
    _, err := ws.Accept(ctx, w, r, connMeta, 
        ws.ConnEstablishHandlerOption(func(ctx context.Context, conn IConnection) {
            log.Info(ctx, "server conn establish: %v, %p", conn.Id(), conn)
        }),
        ws.ConnClosingHandlerOption(func(ctx context.Context, conn IConnection) {
            log.Info(ctx, "server conn closing: %v, %p", conn.Id(), conn)
        }),
        ws.ConnClosedHandlerOption(func(ctx context.Context, conn IConnection) {
            log.Info(ctx, "server conn closed: %v, %p", conn.Id(), conn)
        }))
    if err != nil {
        log.Error(ctx, "Accept client connection failed. error: %v", err)
        return
    }
})
http.ListenAndServe(":8003", nil)

Using GoLang to write a simple client:

  1. Register a client-side connection reception route.
ws.InitClient()
  1. Register a client-side message reception handler for receiving messages from the server.
ws.RegisterHandler(S2C_RESP, func(ctx context.Context, connection IConnection, message IMessage) error {
    log.Info(ctx, "client recv: %v, %v", message.GetProtocolId(), string(message.GetData()))
    return nil
})
  1. Connect to the established server.
url := "ws://127.0.0.1:8003/join?uid=100"
conn, _ := ws.DialConnect(context.Background(), url, http.Header{},
    ws.ClientIdOption("server1"),
    ws.ConnEstablishHandlerOption(func(ctx context.Context, conn IConnection) {
        log.Info(ctx, "client conn establish: %v, %p", conn.Id(), conn)
    }),
    ws.ConnClosingHandlerOption(func(ctx context.Context, conn IConnection) {
        log.Info(ctx, "client conn closing: %v, %p", conn.Id(), conn)
    }),
    ws.ConnClosedHandlerOption(func(ctx context.Context, conn IConnection) {
        log.Info(ctx, "client conn closed: %v, %p", conn.Id(), conn)
    }),
)
log.Info(ctx, "%v", conn)
  1. In the callback after the connection is established, use ConnEstablishHandlerOption to send messages to the server.
packet := ws.GetPoolMessage(C2S_REQ)
packet.SetData([]byte("client request"))
conn.SendMsg(context.Background(), packet, nil)
  1. Example of sending request-response RPC calls based on WebSocket (ws).
packet := GetPoolMessage(C2S_REQ)
packet.SetData([]byte("client rpc req info"))
resp, err := conn.SendRequestMsg(context.Background(), packet, nil)
if err == nil {
    log.Info(ctx, "client recv: sn: %v, data: %v", resp.GetSn(), string(resp.GetData()))
}
  1. Example of sending request-response RPC calls with a timeout based on WebSocket (ws).
timeoutCtx, _ := context.WithTimeout(ctx, time.Second*5)
packet := GetPoolMessage(C2S_REQ)
packet.SetData([]byte("client rpc req info timeout"))
resp, err := conn.SendRequestMsg(timeoutCtx, packet, nil)
if err == nil {
    log.Info(ctx, "client recv: sn: %v, data: %v", resp.GetSn(), string(resp.GetData()))
} else {
    log.Error(ctx, "client recv err: %v", err)
}

Advanced Usage

About protobuf

Protobuf definitions generate corresponding source code. The Git repository already includes the generated results, so this step can be skipped.

The .pb files only define top-level related structure definitions; the framework communication protocol does not use protobuf implementation. Business message structures can choose to be implemented using protobuf or JSON.

If protobuf is used, the framework can support object pool functionality.

protoc --go_out=. ws/msg.proto
Available Callable Interfaces

Following the principle of interface-oriented design, implementation is separated from definition. The def.go file contains all the functions and interfaces that users need to use.

Other Language Clients

JavaScript Client Usage

Supports lib mode and CommonJS mode.

https://www.npmjs.com/package/google-protobuf

npm i google-protobuf

//lib js  (msg_pb_libs.js+google-protobuf.js)
protoc --js_out=library=msg_pb_libs,binary:ws/js  ws/msg.proto

//commonjs  (msg_pb_dist.js or msg_pb_dist.min.js)
cd ws
protoc --js_out=import_style=commonjs,binary:js  msg.proto

cd js
npm i -g browserify
npm i -g minifier
browserify msg_pb.js <custom_pb.js> -o  msg_pb_dist.js
minify msg_pb_dist.js   //msg_pb_dist.min.js

http://127.0.0.1:8003/js/demo.html
TypeScript Client Usage in CommonJS Mode
npm i protobufjs
npm i -g protobufjs-cli

cd ts

pbjs -t static-module -w commonjs -o dist/msg_pb.js ../msg.proto
pbts -o msg_pb.d.ts dist/msg_pb.js

tsc -p tsconfig.json
node demo.js //const WebSocket = require("ws");

npm i -g browserify

browserify dist/msg_pb.js dist/wsc.js dist/demo.js  -o dist/bundle.js

http://127.0.0.1:8003/ts/demo.html
C++ Client
#1. unzip cpp/protobuf.zip (download from https://github.com/protocolbuffers/protobuf/releases  sourcecode: protobuf-cpp-3.21.12.zip then build)
#2. gen compatible protobuf cpp code
cpp\protobuf\bin\protoc --cpp_out=cpp/QWS msg.proto

#build sln

More Comprehensive Demo Examples

InitServerWithOpt(ServerOption{[]HubOption{HubShardOption(4)}}) //server invoke 
ctx := context.Background()

const (
    C2S_REQ  = 2
    S2C_RESP = 3
)

//server reg handler
RegisterHandler(C2S_REQ, func(ctx context.Context, connection IConnection, message IMessage) error {
    log.Info(ctx, "server recv: %v, %v", message.GetProtocolId(), string(message.GetData()))
    packet := GetPoolMessage(S2C_RESP)
    packet.SetData([]byte("server response"))
    connection.SendMsg(ctx, packet, nil)
    return nil
})

http.HandleFunc("/join", func(w http.ResponseWriter, r *http.Request) {
    connMeta := ConnectionMeta{
        UserId:   r.URL.Query().Get("uid"),
        Typed:    0,
        DeviceId: "",
        Version:  0,
        Charset:  0,
    }
    _, err := Accept(ctx, w, r, connMeta, DebugOption(true),
        SrvUpgraderCompressOption(true),
        CompressionLevelOption(2),
        ConnEstablishHandlerOption(func(ctx context.Context, conn IConnection) {
            log.Info(ctx, "server conn establish: %v, %p", conn.Id(), conn)
        }),
        ConnClosingHandlerOption(func(ctx context.Context, conn IConnection) {
            log.Info(ctx, "server conn closing: %v, %p", conn.Id(), conn)
        }),
        ConnClosedHandlerOption(func(ctx context.Context, conn IConnection) {
            log.Info(ctx, "server conn closed: %v, %p", conn.Id(), conn)
        }))
    if err != nil {
        log.Error(ctx, "Accept client connection failed. error: %v", err)
        return
    }
})
http.ListenAndServe(":8003", nil)

GoLang client-side Demo Examples

InitClient()        
                                            //client invoke 
const (
    C2S_REQ  = 2
    S2C_RESP = 3
)

//client reg handler
RegisterHandler(S2C_RESP, func(ctx context.Context, connection IConnection, message IMessage) error {
    log.Info(ctx, "client recv: %v, %v", message.GetProtocolId(), string(message.GetData()))
    return nil
})

//client connect
uid := "100"
url := "ws://127.0.0.1:8003/join?uid=" + uid
conn, _ := DialConnect(context.Background(), url, http.Header{},
    DebugOption(true),
    ClientIdOption("server1"),
    ClientDialWssOption(url, false),
    ClientDialCompressOption(true),
    CompressionLevelOption(2),
    ConnEstablishHandlerOption(func(ctx context.Context, conn IConnection) {
        log.Info(ctx, "client conn establish: %v, %p", conn.Id(), conn)
    }),
    ConnClosingHandlerOption(func(ctx context.Context, conn IConnection) {
        log.Info(ctx, "client conn closing: %v, %p", conn.Id(), conn)
    }),
    ConnClosedHandlerOption(func(ctx context.Context, conn IConnection) {
        log.Info(ctx, "client conn closed: %v, %p", conn.Id(), conn)
    }),
)

log.Info(ctx, "%v", conn)
time.Sleep(time.Second * 5)

packet := GetPoolMessage(C2S_REQ)
packet.SetData([]byte("client request"))
conn.SendMsg(context.Background(), packet, nil)

Documentation

Index

Constants

View Source
const (
	CONN_KIND_CLIENT = 0
	CONN_KIND_SERVER = 1
)
View Source
const (
	CHARSET_UTF8 = 0
	CHARSET_GBK  = 1
)

Variables

View Source
var (
	P_BASE_name = map[int32]string{
		0:          "none",
		2147483647: "s2c_err_displace",
	}
	P_BASE_value = map[string]int32{
		"none":             0,
		"s2c_err_displace": 2147483647,
	}
)

Enum value maps for P_BASE.

View Source
var ErrWsRpcResponseTimeout = errors.New("rpc cancel or timeout")
View Source
var ErrWsRpcWaitChanClosed = errors.New("sn channel is closed")
View Source
var File_ws_msg_proto protoreflect.FileDescriptor

Functions

func AutoReDialConnect added in v1.1.7

func AutoReDialConnect(ctx context.Context, sUrl string, header http.Header, cancelAutoConn chan interface{}, connInterval time.Duration,
	opts ...ConnOption)

func InitClient added in v1.0.15

func InitClient()

client invoke 客户端调用

func InitServer added in v1.0.15

func InitServer()

server invoke 服务端调用

func InitServerWithOpt added in v1.1.21

func InitServerWithOpt(serverOpt ServerOption)

func RegisterDataMsgType added in v1.1.4

func RegisterDataMsgType(protocolId uint32, pMsg IDataMessage)

注册数据消息类型[Data],功能可选,当需要使用框架提供的池功能时使用

func RegisterHandler

func RegisterHandler(protocolId uint32, h MsgHandler)

注册消息处理器

Types

type ConnOption

type ConnOption func(*Connection)

连接动态参数选项

func ClientAutoReconHandlerOption added in v1.1.22

func ClientAutoReconHandlerOption(handler EventHandler) ConnOption

func ClientDialCompressOption added in v1.1.5

func ClientDialCompressOption(compress bool) ConnOption

func ClientDialConnFailedHandlerOption added in v1.1.13

func ClientDialConnFailedHandlerOption(handler EventHandler) ConnOption

func ClientDialHandshakeTimeoutOption added in v1.1.5

func ClientDialHandshakeTimeoutOption(handshakeTimeout time.Duration) ConnOption

func ClientDialOption added in v1.1.5

func ClientDialOption(dialer *websocket.Dialer) ConnOption

func ClientDialRetryOption added in v1.1.5

func ClientDialRetryOption(retryNum int, retryInterval time.Duration) ConnOption

func ClientDialWssOption added in v1.1.5

func ClientDialWssOption(sUrl string, secureWss bool) ConnOption

func ClientIdOption added in v1.1.5

func ClientIdOption(id string) ConnOption

客户端专用 默认使用时间戳来记录客户端所连服务器的id

func CompressionLevelOption added in v1.1.5

func CompressionLevelOption(compressionLevel int) ConnOption

func ConnClosedHandlerOption added in v1.1.8

func ConnClosedHandlerOption(handler EventHandler) ConnOption

func ConnClosingHandlerOption added in v1.1.8

func ConnClosingHandlerOption(handler EventHandler) ConnOption

func ConnEstablishHandlerOption added in v1.1.8

func ConnEstablishHandlerOption(handler EventHandler) ConnOption

callback

func DebugOption added in v1.1.8

func DebugOption(debug bool) ConnOption

func MaxMessageBytesSizeOption added in v1.2.0

func MaxMessageBytesSizeOption(size uint32) ConnOption

func NetMaxFailureRetryOption added in v1.1.5

func NetMaxFailureRetryOption(maxFailureRetry int) ConnOption

func NetReadWaitOption added in v1.1.5

func NetReadWaitOption(readWait time.Duration) ConnOption

func NetTemporaryWaitOption added in v1.1.5

func NetTemporaryWaitOption(temporaryWait time.Duration) ConnOption

func NetWriteWaitOption added in v1.1.5

func NetWriteWaitOption(writeWait time.Duration) ConnOption

func RecvPingHandlerOption added in v1.1.8

func RecvPingHandlerOption(handler EventHandler) ConnOption

func RecvPongHandlerOption added in v1.1.8

func RecvPongHandlerOption(handler EventHandler) ConnOption

func SendBufferOption

func SendBufferOption(bufferSize int) ConnOption

func SrvCheckOriginOption added in v1.1.5

func SrvCheckOriginOption(checkOrigin func(r *http.Request) bool) ConnOption

func SrvPullChannelsOption

func SrvPullChannelsOption(pullChannelIds []int) ConnOption

为每种消息拉取逻辑分别注册不同的通道

func SrvUpgraderCompressOption added in v1.1.5

func SrvUpgraderCompressOption(compress bool) ConnOption

func SrvUpgraderOption

func SrvUpgraderOption(upgrader *websocket.Upgrader) ConnOption

服务端特有 upgrader定制

type ConnType

type ConnType int8

func (ConnType) String

func (t ConnType) String() string

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

websocket连接封装

func (*Connection) Charset

func (c *Connection) Charset() int

func (*Connection) ClientIp added in v1.1.10

func (c *Connection) ClientIp() string

func (*Connection) ConnType added in v1.2.0

func (c *Connection) ConnType() ConnType

func (*Connection) DeviceId added in v1.1.14

func (c *Connection) DeviceId() string

func (*Connection) DisplaceClientByIp added in v1.1.23

func (c *Connection) DisplaceClientByIp(ctx context.Context, displaceIp string)

func (*Connection) GetCommDataValue

func (c *Connection) GetCommDataValue(key string) (interface{}, bool)

连接数据存储结构

func (*Connection) GetPullChannel

func (c *Connection) GetPullChannel(pullChannelId int) (chan struct{}, bool)

func (*Connection) Id

func (c *Connection) Id() string

func (*Connection) IncrCommDataValueBy

func (c *Connection) IncrCommDataValueBy(key string, delta int)

func (*Connection) IsDisplaced

func (c *Connection) IsDisplaced() bool

func (*Connection) IsStopped

func (c *Connection) IsStopped() bool

func (*Connection) KickClient

func (c *Connection) KickClient(displace bool)

func (*Connection) KickServer

func (c *Connection) KickServer()

func (*Connection) RefreshDeadline

func (c *Connection) RefreshDeadline()

func (*Connection) RemoveCommDataValue

func (c *Connection) RemoveCommDataValue(key string)

func (*Connection) SendMsg

func (c *Connection) SendMsg(ctx context.Context, payload IMessage, sc SendCallback) (err error)

func (*Connection) SendPullNotify

func (c *Connection) SendPullNotify(ctx context.Context, pullChannelId int) (err error)

func (*Connection) SendRequestMsg added in v1.3.3

func (c *Connection) SendRequestMsg(ctx context.Context, reqMsg IMessage, sc SendCallback) (respMsg IMessage, err error)

func (*Connection) SendResponseMsg added in v1.3.3

func (c *Connection) SendResponseMsg(ctx context.Context, respMsg IMessage, reqSn uint32, sc SendCallback) (err error)

func (*Connection) SetCommDataValue

func (c *Connection) SetCommDataValue(key string, value interface{})

func (*Connection) SignalPullSend added in v1.3.3

func (c *Connection) SignalPullSend(ctx context.Context, pullChannelId int) (err error)

通知指定消息通道转发消息

func (*Connection) Type

func (c *Connection) Type() int

func (*Connection) UserId

func (c *Connection) UserId() string

func (*Connection) Version

func (c *Connection) Version() int

type ConnectionMeta

type ConnectionMeta struct {
	UserId   string //userId
	Typed    int    //客户端类型枚举
	DeviceId string //设备ID
	Version  int    //版本
	Charset  int    //客户端使用的字符集
	// contains filtered or unexported fields
}

func (*ConnectionMeta) BuildConnId

func (m *ConnectionMeta) BuildConnId() string

type EventHandler added in v1.1.8

type EventHandler func(context.Context, IConnection)

客户端事件处理函数 ConnEstablishHandlerOption sync(阻塞主流程) ConnClosingHandlerOption sync(阻塞主流程) ConnClosedHandlerOption async

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

连接管理器

func (*Hub) ConnectionIds

func (h *Hub) ConnectionIds() []string

func (*Hub) Find

func (h *Hub) Find(id string) (IConnection, error)

func (*Hub) RangeConnsByFunc

func (h *Hub) RangeConnsByFunc(f func(string, IConnection) bool)

type HubOption added in v1.1.21

type HubOption func(IHub)

func HubShardOption added in v1.1.21

func HubShardOption(cnt uint16) HubOption

type IConnection added in v1.1.21

type IConnection interface {
	Id() string
	ConnType() ConnType
	UserId() string
	Type() int
	DeviceId() string
	Version() int
	Charset() int
	ClientIp() string
	IsStopped() bool
	IsDisplaced() bool
	RefreshDeadline()
	SendMsg(ctx context.Context, payload IMessage, sc SendCallback) error
	SendRequestMsg(ctx context.Context, reqMsg IMessage, sc SendCallback) (IMessage, error)
	SendResponseMsg(ctx context.Context, respMsg IMessage, reqSn uint32, sc SendCallback) error

	KickClient(displace bool)                                  //server side invoke
	KickServer()                                               //client side invoke
	DisplaceClientByIp(ctx context.Context, displaceIp string) //server side invoke

	GetPullChannel(pullChannelId int) (chan struct{}, bool)
	SendPullNotify(ctx context.Context, pullChannelId int) error       //deprecated, change to SignalPull
	SignalPullSend(ctx context.Context, pullChannelId int) (err error) //signal pull msg to send, see Puller.PullSend

	GetCommDataValue(key string) (interface{}, bool)
	SetCommDataValue(key string, value interface{})
	RemoveCommDataValue(key string)
	IncrCommDataValueBy(key string, delta int)
}

func DialConnect added in v1.1.5

func DialConnect(ctx context.Context, sUrl string, header http.Header, opts ...ConnOption) (IConnection, error)

type IDataMessage added in v1.1.4

type IDataMessage interface {
	proto.Message
	Reset()
}

P_MESSAGE.Data类型的接口

type IHub added in v1.1.21

type IHub interface {
	Find(string) (IConnection, error)
	RangeConnsByFunc(func(string, IConnection) bool)
	ConnectionIds() []string
	// contains filtered or unexported methods
}
var ClientConnHub IHub //服务端管理的来自客户端的连接
var ServerConnHub IHub //客户端管理的连向服务端的连接

type IMessage added in v1.1.21

type IMessage interface {
	GetProtocolId() uint32
	GetSn() uint32
	GetData() []byte
	SetData(data []byte)
	DataMsg() IDataMessage
}

func GetPoolMessage added in v1.1.3

func GetPoolMessage(protocolId uint32) IMessage

pool message 对象池消息

func NewMessage added in v1.1.3

func NewMessage(protocolId uint32) IMessage

normal message 普通消息

type Message added in v1.1.3

type Message struct {
	// contains filtered or unexported fields
}

不能手动创建,必须使用 NewMessage() 或 GetPoolMessage()

func (*Message) DataMsg added in v1.1.4

func (t *Message) DataMsg() IDataMessage

func (*Message) GetData added in v1.3.0

func (t *Message) GetData() []byte

func (*Message) GetProtocolId added in v1.3.0

func (t *Message) GetProtocolId() uint32

func (*Message) GetSn added in v1.3.3

func (t *Message) GetSn() uint32

func (*Message) SetData added in v1.3.0

func (t *Message) SetData(data []byte)

type MsgHandler added in v1.1.21

type MsgHandler func(context.Context, IConnection, IMessage) error

客户端消息处理函数对象 use RegisterHandler(protocolId, MsgHandler)

type P_BASE added in v1.1.25

type P_BASE int32

基础协议

const (
	P_BASE_none             P_BASE = 0          //保留字段
	P_BASE_s2c_err_displace P_BASE = 2147483647 //被顶号
)

func (P_BASE) Descriptor added in v1.1.25

func (P_BASE) Descriptor() protoreflect.EnumDescriptor

func (P_BASE) Enum added in v1.1.25

func (x P_BASE) Enum() *P_BASE

func (P_BASE) EnumDescriptor deprecated added in v1.1.25

func (P_BASE) EnumDescriptor() ([]byte, []int)

Deprecated: Use P_BASE.Descriptor instead.

func (P_BASE) Number added in v1.1.25

func (x P_BASE) Number() protoreflect.EnumNumber

func (P_BASE) String added in v1.1.25

func (x P_BASE) String() string

func (P_BASE) Type added in v1.1.25

func (P_BASE) Type() protoreflect.EnumType

type P_DISPLACE added in v1.1.11

type P_DISPLACE struct {
	OldIp []byte `protobuf:"bytes,1,opt,name=old_ip,json=oldIp,proto3" json:"old_ip,omitempty"`
	NewIp []byte `protobuf:"bytes,2,opt,name=new_ip,json=newIp,proto3" json:"new_ip,omitempty"`
	Ts    int64  `protobuf:"varint,3,opt,name=ts,proto3" json:"ts,omitempty"`
	// contains filtered or unexported fields
}

func (*P_DISPLACE) Descriptor deprecated added in v1.1.11

func (*P_DISPLACE) Descriptor() ([]byte, []int)

Deprecated: Use P_DISPLACE.ProtoReflect.Descriptor instead.

func (*P_DISPLACE) GetNewIp added in v1.1.11

func (x *P_DISPLACE) GetNewIp() []byte

func (*P_DISPLACE) GetOldIp added in v1.1.11

func (x *P_DISPLACE) GetOldIp() []byte

func (*P_DISPLACE) GetTs added in v1.1.11

func (x *P_DISPLACE) GetTs() int64

func (*P_DISPLACE) ProtoMessage added in v1.1.11

func (*P_DISPLACE) ProtoMessage()

func (*P_DISPLACE) ProtoReflect added in v1.1.11

func (x *P_DISPLACE) ProtoReflect() protoreflect.Message

func (*P_DISPLACE) Reset added in v1.1.11

func (x *P_DISPLACE) Reset()

func (*P_DISPLACE) String added in v1.1.11

func (x *P_DISPLACE) String() string

type Puller added in v1.1.23

type Puller interface {
	PullSend()
}

func NewDefaultPuller added in v1.1.23

func NewDefaultPuller(conn IConnection, pullChannelId int, firstPullFunc, pullFunc func(context.Context, IConnection)) Puller

type SendCallback

type SendCallback func(ctx context.Context, c IConnection, err error)

消息发送回调接口

type ServerOption added in v1.1.21

type ServerOption struct {
	HubOpts []HubOption
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL