greatws

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2023 License: Apache-2.0 Imports: 38 Imported by: 2

README

greatws

支持海量连接的websocket库,callback写法

特性

  • 支持 epoll/kqueue
  • 低内存占用
  • 高tps

暂不支持

  • ssl
  • windows
  • io-uring

警告⚠️

早期阶段,暂时不建议生产使用

例子-服务端


type echoHandler struct{}

func (e *echoHandler) OnOpen(c *greatws.Conn) {
	// fmt.Printf("OnOpen: %p\n", c)
}

func (e *echoHandler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
	if err := c.WriteTimeout(op, msg, 3*time.Second); err != nil {
		fmt.Println("write fail:", err)
	}
	// if err := c.WriteMessage(op, msg); err != nil {
	// 	slog.Error("write fail:", err)
	// }
}

func (e *echoHandler) OnClose(c *greatws.Conn, err error) {
	errMsg := ""
	if err != nil {
		errMsg = err.Error()
	}
	slog.Error("OnClose:", errMsg)
}

type handler struct {
	m *greatws.MultiEventLoop
}

func (h *handler) echo(w http.ResponseWriter, r *http.Request) {
	c, err := greatws.Upgrade(w, r,
		greatws.WithServerReplyPing(),
		// greatws.WithServerDecompression(),
		greatws.WithServerIgnorePong(),
		greatws.WithServerCallback(&echoHandler{}),
		// greatws.WithServerEnableUTF8Check(),
		greatws.WithServerReadTimeout(5*time.Second),
		greatws.WithServerMultiEventLoop(h.m),
	)
	if err != nil {
		slog.Error("Upgrade fail:", "err", err.Error())
	}
	_ = c
}

func main() {

	var h handler

	h.m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(1000), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
	h.m.Start()
	fmt.Printf("apiname:%s\n", h.m.GetApiName())

	mux := &http.ServeMux{}
	mux.HandleFunc("/autobahn", h.echo)

	rawTCP, err := net.Listen("tcp", ":9001")
	if err != nil {
		fmt.Println("Listen fail:", err)
		return
	}
}

Documentation

Overview

Copyright 2021-2023 antlabs. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2021-2023 antlabs. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2021-2023 antlabs. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	EVENT_EPOLL evFlag = 1 << iota
	EVENT_IOURING
)
View Source
const (
	Continuation = opcode.Continuation
	Text         = opcode.Text
	Binary       = opcode.Binary

	Close = opcode.Close
	Ping  = opcode.Ping
	Pong  = opcode.Pong
)

Variables

View Source
var (
	// conn已经被关闭
	ErrClosed = errors.New("closed")

	ErrWrongStatusCode      = errors.New("Wrong status code")
	ErrUpgradeFieldValue    = errors.New("The value of the upgrade field is not 'websocket'")
	ErrConnectionFieldValue = errors.New("The value of the connection field is not 'upgrade'")
	ErrSecWebSocketAccept   = errors.New("The value of Sec-WebSocketAaccept field is invalid")

	ErrHostCannotBeEmpty   = errors.New("Host cannot be empty")
	ErrSecWebSocketKey     = errors.New("The value of SEC websocket key field is wrong")
	ErrSecWebSocketVersion = errors.New("The value of SEC websocket version field is wrong, not 13")

	ErrHTTPProtocolNotSupported = errors.New("HTTP protocol not supported")

	ErrOnlyGETSupported     = errors.New("error:Only get methods are supported")
	ErrMaxControlFrameSize  = errors.New("error:max control frame size > 125, need <= 125")
	ErrRsv123               = errors.New("error:rsv1 or rsv2 or rsv3 has a value")
	ErrOpcode               = errors.New("error:wrong opcode")
	ErrNOTBeFragmented      = errors.New("error:since control message MUST NOT be fragmented")
	ErrFrameOpcode          = errors.New("error:since all data frames after the initial data frame must have opcode 0.")
	ErrTextNotUTF8          = errors.New("error:text is not utf8 data")
	ErrClosePayloadTooSmall = errors.New("error:close payload too small")
	ErrCloseValue           = errors.New("error:close value is wrong") // close值不对
	ErrEmptyClose           = errors.New("error:close value is empty") // close的值是空的
	ErrWriteClosed          = errors.New("write close")
)
View Source
var (
	ErrNotFoundHijacker = errors.New("not found Hijacker")
)

Functions

func GetPayloadBytes

func GetPayloadBytes(n int) (rv *[]byte)

func PutPayloadBytes

func PutPayloadBytes(bytes *[]byte)

func StringToBytes

func StringToBytes(s string) (b []byte)

StringToBytes 没有内存开销的转换

Types

type Callback

type Callback interface {
	OnOpen(*Conn)
	OnMessage(*Conn, Opcode, []byte)
	OnClose(*Conn, error)
}

type ClientOption

type ClientOption func(*DialOption)

func WithClientBindHTTPHeader

func WithClientBindHTTPHeader(h *http.Header) ClientOption

6.获取http header

func WithClientCallback

func WithClientCallback(cb Callback) ClientOption

1. callback 配置客户端callback

func WithClientCallbackFunc

func WithClientCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) ClientOption

0. CallbackFunc

func WithClientCompression

func WithClientCompression() ClientOption

4.配置压缩

func WithClientDecompressAndCompress

func WithClientDecompressAndCompress() ClientOption

5.配置压缩和解压缩

func WithClientDecompression

func WithClientDecompression() ClientOption

10 配置解压缩

func WithClientDelayWriteInitBufferSize

func WithClientDelayWriteInitBufferSize(n int32) ClientOption

15.2 配置延迟包的初始化buffer大小

func WithClientDialTimeout

func WithClientDialTimeout(t time.Duration) ClientOption

3.配置握手时的timeout

func WithClientDisableBufioClearHack

func WithClientDisableBufioClearHack() ClientOption

func WithClientEnableUTF8Check

func WithClientEnableUTF8Check() ClientOption

func WithClientHTTPHeader

func WithClientHTTPHeader(h http.Header) ClientOption

2.配置http.Header

func WithClientIgnorePong

func WithClientIgnorePong() ClientOption

6 配置忽略pong消息

func WithClientMaxDelayWriteDuration

func WithClientMaxDelayWriteDuration(d time.Duration) ClientOption

13. 配置延迟发送 配置延迟最大发送时间

func WithClientMaxDelayWriteNum

func WithClientMaxDelayWriteNum(n int32) ClientOption

14.2 配置最大延迟个数.client

func WithClientOnCloseFunc

func WithClientOnCloseFunc(onClose func(c *Conn, err error)) ClientOption

17.2 配置客户端OnClose

func WithClientOnMessageFunc

func WithClientOnMessageFunc(cb OnMessageFunc) ClientOption

仅仅配置OnMessae函数

func WithClientReadTimeout

func WithClientReadTimeout(t time.Duration) ClientOption

16.2 .设置客户端读超时时间

func WithClientReplyPing

func WithClientReplyPing() ClientOption

配置自动回应ping frame, 当收到ping, 回一个pong

func WithClientTCPDelay

func WithClientTCPDelay() ClientOption

2. 设置TCP_NODELAY 设置客户端TCP_NODELAY

func WithClientTLSConfig

func WithClientTLSConfig(tls *tls.Config) ClientOption

1.配置tls.config

func WithClientWindowsMultipleTimesPayloadSize

func WithClientWindowsMultipleTimesPayloadSize(mt float32) ClientOption

type CloseErrMsg

type CloseErrMsg struct {
	Code StatusCode
	Msg  string
}

func (CloseErrMsg) Error

func (c CloseErrMsg) Error() string

type Config

type Config struct {
	Callback
	// contains filtered or unexported fields
}

type Conn

type Conn struct {
	*Config // 配置
	// contains filtered or unexported fields
}

func Dial

func Dial(rawUrl string, opts ...ClientOption) (*Conn, error)

https://datatracker.ietf.org/doc/html/rfc6455#section-4.1 又是一顿if else, 咬文嚼字

func DialConf

func DialConf(rawUrl string, conf *DialOption) (*Conn, error)

func Upgrade

func Upgrade(w http.ResponseWriter, r *http.Request, opts ...ServerOption) (c *Conn, err error)

func (*Conn) Close

func (c *Conn) Close()

func (*Conn) Write

func (c *Conn) Write(b []byte) (n int, err error)

func (*Conn) WriteFrameOnlyIoUring

func (c *Conn) WriteFrameOnlyIoUring(fw *fixedwriter.FixedWriter, payload []byte, fin bool, rsv1 bool, isMask bool, code opcode.Opcode, maskValue uint32) (err error)

func (*Conn) WriteMessage

func (c *Conn) WriteMessage(op Opcode, writeBuf []byte) (err error)

func (*Conn) WriteTimeout

func (c *Conn) WriteTimeout(op Opcode, data []byte, t time.Duration) (err error)

type ConnOption

type ConnOption struct {
	Config
}

type DefCallback

type DefCallback struct{}

1. 默认的OnOpen, OnMessage, OnClose都是空函数

func (*DefCallback) OnClose

func (defcallback *DefCallback) OnClose(_ *Conn, _ error)

func (*DefCallback) OnMessage

func (defcallback *DefCallback) OnMessage(_ *Conn, _ Opcode, _ []byte)

func (*DefCallback) OnOpen

func (defcallback *DefCallback) OnOpen(_ *Conn)

type DialOption

type DialOption struct {
	Header http.Header

	Config
	// contains filtered or unexported fields
}

func ClientOptionToConf

func ClientOptionToConf(opts ...ClientOption) *DialOption

func (*DialOption) Dial

func (d *DialOption) Dial() (c *Conn, err error)

type EvOption

type EvOption func(e *MultiEventLoop)

func WithEventLoops

func WithEventLoops(num int) EvOption

开启几个事件循环

func WithIoUring

func WithIoUring() EvOption

是否使用io_uring, 支持linux系统,需要内核版本6.2.0以上(以后只会在>=6.2.0的版本上测试)

func WithLogLevel

func WithLogLevel(level slog.Level) EvOption

func WithMaxEventNum

func WithMaxEventNum(num int) EvOption

设置每个事件循环的最大事件数量

func WithMinBusinessGoNum

func WithMinBusinessGoNum(num int) EvOption

最小业务goroutine数量

type EventLoop

type EventLoop struct {
	// contains filtered or unexported fields
}

func CreateEventLoop

func CreateEventLoop(setSize int, flag evFlag) (e *EventLoop, err error)

初始化函数

func (*EventLoop) GetApiName

func (el *EventLoop) GetApiName() string

func (*EventLoop) Loop

func (el *EventLoop) Loop()

func (*EventLoop) Shutdown

func (e *EventLoop) Shutdown(ctx context.Context) error

柔性关闭所有的连接

func (*EventLoop) StartLoop

func (el *EventLoop) StartLoop()

type MultiEventLoop

type MultiEventLoop struct {
	*slog.Logger
	// contains filtered or unexported fields
}

func NewMultiEventLoop

func NewMultiEventLoop(opts ...EvOption) (e *MultiEventLoop, err error)

创建一个多路事件循环

func NewMultiEventLoopMust

func NewMultiEventLoopMust(opts ...EvOption) *MultiEventLoop

func (*MultiEventLoop) GetApiName

func (m *MultiEventLoop) GetApiName() string

func (*MultiEventLoop) GetCurConnNum

func (m *MultiEventLoop) GetCurConnNum() int64

获取当前连接数

func (*MultiEventLoop) GetCurTaskNum

func (m *MultiEventLoop) GetCurTaskNum() int64

获取当前运行的任务数

func (*MultiEventLoop) Start

func (m *MultiEventLoop) Start()

启动多路事件循环

type OnCloseFunc

type OnCloseFunc func(*Conn, error)

3. 只设置OnClose, 和OnMessage互斥

func (OnCloseFunc) OnClose

func (o OnCloseFunc) OnClose(c *Conn, err error)

func (OnCloseFunc) OnMessage

func (o OnCloseFunc) OnMessage(_ *Conn, _ Opcode, _ []byte)

func (OnCloseFunc) OnOpen

func (o OnCloseFunc) OnOpen(_ *Conn)

type OnMessageFunc

type OnMessageFunc func(*Conn, Opcode, []byte)

2. 只设置OnMessage, 和OnClose互斥

func (OnMessageFunc) OnClose

func (o OnMessageFunc) OnClose(_ *Conn, _ error)

func (OnMessageFunc) OnMessage

func (o OnMessageFunc) OnMessage(c *Conn, op Opcode, data []byte)

func (OnMessageFunc) OnOpen

func (o OnMessageFunc) OnOpen(_ *Conn)

type OnOpenFunc

type OnOpenFunc func(*Conn)

type Opcode

type Opcode = opcode.Opcode

type ServerOption

type ServerOption func(*ConnOption)

func WithServerCallback

func WithServerCallback(cb Callback) ServerOption

配置服务端回调函数

func WithServerCallbackFunc

func WithServerCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) ServerOption

配置服务端回调函数

func WithServerDecompressAndCompress

func WithServerDecompressAndCompress() ServerOption

1.配置压缩和解压缩

func WithServerDecompression

func WithServerDecompression() ServerOption

func WithServerDelayWriteInitBufferSize

func WithServerDelayWriteInitBufferSize(n int32) ServerOption

15.1 配置延迟包的初始化buffer大小

func WithServerDisableBufioClearHack

func WithServerDisableBufioClearHack() ServerOption

11 关闭bufio clear hack优化

func WithServerEnableUTF8Check

func WithServerEnableUTF8Check() ServerOption

3.关闭utf8检查

func WithServerIgnorePong

func WithServerIgnorePong() ServerOption

func WithServerMaxDelayWriteDuration

func WithServerMaxDelayWriteDuration(d time.Duration) ServerOption

13. 配置延迟发送 配置延迟最大发送时间

func WithServerMaxDelayWriteNum

func WithServerMaxDelayWriteNum(n int32) ServerOption

14.1 配置最大延迟个数.server

func WithServerMultiEventLoop

func WithServerMultiEventLoop(m *MultiEventLoop) ServerOption

last 配置event

func WithServerOnCloseFunc

func WithServerOnCloseFunc(onClose func(c *Conn, err error)) ServerOption

17。 只配置OnClose 17.1 配置服务端OnClose

func WithServerOnMessageFunc

func WithServerOnMessageFunc(cb OnMessageFunc) ServerOption

4.仅仅配置OnMessae函数 仅仅配置OnMessae函数

func WithServerReadTimeout

func WithServerReadTimeout(t time.Duration) ServerOption

16. 配置读超时时间

16.1 .设置服务端读超时时间

func WithServerReplyPing

func WithServerReplyPing() ServerOption

5. 配置自动回应ping frame, 当收到ping, 回一个pong

func WithServerSubprotocols

func WithServerSubprotocols(subprotocols []string) ServerOption

2. 设置服务端支持的子协议

func WithServerTCPDelay

func WithServerTCPDelay() ServerOption

设置TCP_NODELAY 为false, 开启nagle算法 设置服务端TCP_NODELAY

func WithServerWindowsMultipleTimesPayloadSize

func WithServerWindowsMultipleTimesPayloadSize(mt float32) ServerOption

7. 设置几倍payload的缓冲区 只有解析方式是窗口的时候才有效 如果为1.0就是1024 + 14, 如果是2.0就是2048 + 14

type StatusCode

type StatusCode int16

https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1 这里记录了各种状态码的含义

const (
	// NormalClosure 正常关闭
	NormalClosure StatusCode = 1000
	// EndpointGoingAway 对端正在消失
	EndpointGoingAway StatusCode = 1001
	// ProtocolError 表示对端由于协议错误正在终止连接
	ProtocolError StatusCode = 1002
	// DataCannotAccept 收到一个不能接受的数据类型
	DataCannotAccept StatusCode = 1003
	// NotConsistentMessageType 表示对端正在终止连接, 消息类型不一致
	NotConsistentMessageType StatusCode = 1007
	// TerminatingConnection 表示对端正在终止连接, 没有好用的错误, 可以用这个错误码表示
	TerminatingConnection StatusCode = 1008
	// TooBigMessage  消息太大, 不能处理, 关闭连接
	TooBigMessage StatusCode = 1009
	// NoExtensions 只用于客户端, 服务端返回扩展消息
	NoExtensions StatusCode = 1010
	// ServerTerminating 服务端遇到意外情况, 中止请求
	ServerTerminating StatusCode = 1011
)

func (StatusCode) String

func (s StatusCode) String() string

type UpgradeServer

type UpgradeServer struct {
	// contains filtered or unexported fields
}

func NewUpgrade

func NewUpgrade(opts ...ServerOption) *UpgradeServer

func (*UpgradeServer) Upgrade

func (u *UpgradeServer) Upgrade(w http.ResponseWriter, r *http.Request) (c *Conn, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL