websocket

Version: v0.1.1
Published: Oct 9, 2021 License: MIT Imports: 23 Imported by: 6

README

websocket

WebSocket library for fasthttp and net/http.

Check out the examples for inspiration.

Install

go get github.com/dgrr/websocket

Why another WebSocket package?

Other WebSocket packages do not allow concurrent read/write operations on servers, and they do not provide low-level access to WebSocket frame crafting. Those packages try to emulate the Go standard library API by implementing the io.Reader and io.Writer interfaces on their connections. io.Writer can be a good fit, but io.Reader is not, given that WebSocket is an asynchronous protocol by nature (arguably, all protocols are).

WebSocket servers can become cumbersome when we want to handle a lot of clients asynchronously. For example, to broadcast an internally generated message with other WebSocket packages, we need to do something like the following:

// websocketPackage stands for any other WebSocket package.

type MyWebSocketService struct {
	clients sync.Map
}

type BlockingConn struct {
	lck sync.Mutex
	c   websocketPackage.Conn
}

func (ws *MyWebSocketService) Broadcaster() {
	for msg := range messageProducerChannel {
		// the connections are stored as keys (see Handle)
		ws.clients.Range(func(k, _ interface{}) bool {
			c := k.(*BlockingConn)
			c.lck.Lock() // oh, we need to block, otherwise we can break the program
			err := c.c.Write(msg)
			c.lck.Unlock()

			if err != nil {
				// we have an error, what can we do? Log it?
				// if the connection has been closed we'll receive that on
				// the Read call, so the connection will close automatically.
			}

			return true
		})
	}
}

func (ws *MyWebSocketService) Handle(request, response) {
	c, err := websocketPackage.Upgrade(request, response)
	if err != nil {
		// then it's clearly an error! Report back
		return
	}

	bc := &BlockingConn{
		c: c,
	}

	ws.clients.Store(bc, struct{}{})

	// even though I just want to write, I need to block somehow
	for {
		_, err := bc.c.Read()
		if err != nil {
			// handle the error
			break
		}
	}

	ws.clients.Delete(bc)
}

First, we need to store every client upon connection, and whenever we want to send data we need to iterate over the stored clients and send the message to each one. If we get an error while writing, we need to handle that client's error. And what if writes happen at the same time in two different goroutines? Then we need a sync.Mutex and must block until the write finishes.

To solve most of those problems, websocket uses channels and separate goroutines, one for reading and another for writing, following the Go proverb:

Do not communicate by sharing memory; instead, share memory by communicating.

Following the fasthttp philosophy, this library tries to take as much advantage of Go's concurrency model as possible while keeping your code safe for concurrent use.

To see an example of what this package can do that others can't, check out the broadcast example.
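The idea of a dedicated writer goroutine fed by a channel can be sketched with the standard library alone. This is not the library's actual implementation: the conn type, its out channel and the Send method are hypothetical names chosen for illustration, and a bytes.Buffer stands in for the network connection.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// conn is a hypothetical connection wrapper: one goroutine owns the
// underlying writer, and everybody else sends messages through a channel.
type conn struct {
	out  chan []byte
	done chan struct{}
}

// newConn starts the writer goroutine for w.
func newConn(w io.Writer) *conn {
	c := &conn{out: make(chan []byte, 16), done: make(chan struct{})}
	go func() {
		defer close(c.done)
		for msg := range c.out {
			// Only this goroutine touches w, so no mutex is needed.
			// A real implementation would report write errors to a callback.
			_, _ = w.Write(msg)
		}
	}()
	return c
}

// Send queues msg for writing. It is safe to call from many goroutines.
func (c *conn) Send(msg []byte) { c.out <- msg }

// Close stops the writer once all queued messages have been written.
func (c *conn) Close() {
	close(c.out)
	<-c.done
}

// broadcast sends n one-byte messages from n goroutines and returns
// everything that reached the writer.
func broadcast(n int) string {
	var buf bytes.Buffer
	c := newConn(&buf)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Send([]byte("x"))
		}()
	}
	wg.Wait()
	c.Close()

	return buf.String()
}

func main() {
	fmt.Println(len(broadcast(100))) // 100
}
```

Because the senders never touch the writer directly, the callers need neither a mutex nor a blocking read loop.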

Server

How can I launch a server?

It's quite easy. You only need to create a Server, set your callbacks by calling the Handle* methods and then specify your fasthttp handler as Server.Upgrade.

package main

import (
	"fmt"
	
	"github.com/valyala/fasthttp"
	"github.com/dgrr/websocket"
)

func main() {
	ws := websocket.Server{}
	ws.HandleData(OnMessage)
	
	fasthttp.ListenAndServe(":8080", ws.Upgrade)
}

func OnMessage(c *websocket.Conn, isBinary bool, data []byte) {
	fmt.Printf("Received data from %s: %s\n", c.RemoteAddr(), data)
}

How can I launch a server if I use net/http?

package main

import (
	"fmt"
	"net/http"
	
	"github.com/dgrr/websocket"
)

func main() {
	ws := websocket.Server{}
	ws.HandleData(OnMessage)
	
	http.HandleFunc("/", ws.NetUpgrade)
	http.ListenAndServe(":8080", nil)
}

func OnMessage(c *websocket.Conn, isBinary bool, data []byte) {
	fmt.Printf("Received data from %s: %s\n", c.RemoteAddr(), data)
}

How can I handle pings?

Pings are handled automatically by the library, but you can get the content of those pings by setting a callback with HandlePing.

For example, let's try to get the round trip time to a client by using the PING frame. The website http2.gofiber.io uses this method to measure the round trip time displayed at the bottom of the webpage.

package main

import (
	"sync"
	"encoding/binary"
	"log"
	"time"

	"github.com/valyala/fasthttp"
	"github.com/dgrr/websocket"
)

// Struct to keep the clients connected
//
// it should be safe to access the clients concurrently from Open and Close.
type RTTMeasure struct {
	clients sync.Map
}

// just trigger the ping sender
func (rtt *RTTMeasure) Start() {
	time.AfterFunc(time.Second * 2, rtt.sendPings)
}

func (rtt *RTTMeasure) sendPings() {
	var data [8]byte

	binary.BigEndian.PutUint64(data[:], uint64(
		time.Now().UnixNano()),
	)

	rtt.clients.Range(func(_, v interface{}) bool {
		c := v.(*websocket.Conn)
		c.Ping(data[:])
		return true
	})

	rtt.Start()
}

// register a connection when it's open
func (rtt *RTTMeasure) RegisterConn(c *websocket.Conn) {
	rtt.clients.Store(c.ID(), c)
	log.Printf("Client %s connected\n", c.RemoteAddr())
}

// remove the connection when receiving the close
func (rtt *RTTMeasure) RemoveConn(c *websocket.Conn, err error) {
	rtt.clients.Delete(c.ID())
	log.Printf("Client %s disconnected\n", c.RemoteAddr())
}

func main() {
	rtt := RTTMeasure{}

	ws := websocket.Server{}
	ws.HandleOpen(rtt.RegisterConn)
	ws.HandleClose(rtt.RemoveConn)
	ws.HandlePong(OnPong)

	// schedule the timer
	rtt.Start()

	fasthttp.ListenAndServe(":8080", ws.Upgrade)
}

// handle the pong message
func OnPong(c *websocket.Conn, data []byte) {
	if len(data) == 8 {
		n := binary.BigEndian.Uint64(data)
		ts := time.Unix(0, int64(n))

		log.Printf("RTT with %s is %s\n", c.RemoteAddr(), time.Now().Sub(ts))
	}
}

websocket vs gorilla vs nhooyr vs gobwas

Features                                | websocket | Gorilla      | Nhooyr          | gobwas
Concurrent R/W                          | Yes       | No           | No. Only writes | No
Passes Autobahn Test Suite              | Mostly    | Yes          | Yes             | Mostly
Receive fragmented message              | Yes       | Yes          | Yes             | Yes
Send close message                      | Yes       | Yes          | Yes             | Yes
Send pings and receive pongs            | Yes       | Yes          | Yes             | Yes
Get the type of a received data message | Yes       | Yes          | Yes             | Yes
Compression Extensions                  | No        | Experimental | Yes             | No (?)
Read message using io.Reader            | No        | Yes          | No              | No (?)
Write message using io.WriteCloser      | Yes       | Yes          | No              | No (?)

Stress tests

The following stress tests were performed without timeouts.

Executing tcpkali --ws -c 100 -m 'hello world!!13212312!' -r 10k localhost:8081 shows the following results:

Websocket:
Total data sent:     267.7 MiB (280678466 bytes)
Total data received: 229.5 MiB (240626600 bytes)
Bandwidth per channel: 4.167⇅ Mbps (520.9 kBps)
Aggregate bandwidth: 192.357↓, 224.375↑ Mbps
Packet rate estimate: 247050.1↓, 61842.9↑ (1↓, 1↑ TCP MSS/op)
Test duration: 10.0075 s.
Websocket for net/http:
Total data sent:     267.3 MiB (280320124 bytes)
Total data received: 228.3 MiB (239396374 bytes)
Bandwidth per channel: 4.156⇅ Mbps (519.5 kBps)
Aggregate bandwidth: 191.442↓, 224.168↑ Mbps
Packet rate estimate: 188107.1↓, 52240.7↑ (1↓, 1↑ TCP MSS/op)
Test duration: 10.0039 s.

The results for fasthttp and net/http should be quite close; the only difference is the way each of them upgrades the connection.

Gorilla:
Total data sent:     260.2 MiB (272886768 bytes)
Total data received: 109.3 MiB (114632982 bytes)
Bandwidth per channel: 3.097⇅ Mbps (387.1 kBps)
Aggregate bandwidth: 91.615↓, 218.092↑ Mbps
Packet rate estimate: 109755.3↓, 66807.4↑ (1↓, 1↑ TCP MSS/op)
Test duration: 10.01 s.
Nhooyr (it is unclear why the numbers are this low):
Total data sent:     224.3 MiB (235184096 bytes)
Total data received: 41.2 MiB (43209780 bytes)
Bandwidth per channel: 2.227⇅ Mbps (278.3 kBps)
Aggregate bandwidth: 34.559↓, 188.097↑ Mbps
Packet rate estimate: 88474.0↓, 55256.1↑ (1↓, 1↑ TCP MSS/op)
Test duration: 10.0027 s.
Gobwas:
Total data sent:     265.8 MiB (278718160 bytes)
Total data received: 117.8 MiB (123548959 bytes)
Bandwidth per channel: 3.218⇅ Mbps (402.2 kBps)
Aggregate bandwidth: 98.825↓, 222.942↑ Mbps
Packet rate estimate: 148231.6↓, 72106.1↑ (1↓, 1↑ TCP MSS/op)
Test duration: 10.0015 s.

The source files are in this folder.

Documentation

Constants

const (
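	// StatusNone indicates a normal closure: nothing went wrong.
	StatusNone StatusCode = 1000
	// StatusGoAway indicates that the endpoint is going away, e.g. a server
	// shutting down or a browser navigating away from the page.
	StatusGoAway = 1001
	// StatusProtocolError indicates a problem with the peer's way to communicate.
	StatusProtocolError = 1002
	// StatusNotAcceptable indicates that the endpoint received a type of data
	// it cannot accept.
	StatusNotAcceptable = 1003
	// StatusReserved is reserved by the RFC and has no defined meaning yet.
	StatusReserved = 1004
	// StatusNotConsistent indicates that a message contained data that is not
	// consistent with its type (e.g. non-UTF-8 data within a text message).
	StatusNotConsistent = 1007
	// StatusViolation indicates that a received message violates the
	// endpoint's policy.
	StatusViolation = 1008
	// StatusTooBig indicates a payload bigger than expected.
	StatusTooBig = 1009
	// StatuseExtensionsNeeded indicates that the client expected the server to
	// negotiate one or more extensions, but the server did not.
	StatuseExtensionsNeeded = 1010
	// StatusUnexpected indicates that the server encountered an unexpected
	// condition that prevented it from fulfilling the request.
	StatusUnexpected = 1011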
)
const DefaultPayloadSize = 1 << 20

DefaultPayloadSize defines the default maximum payload size, 1 MiB (used when none was defined).

Variables

var (
	// ErrCannotUpgrade shows up when an error occurred when upgrading a connection.
	ErrCannotUpgrade = errors.New("cannot upgrade connection")
)

Functions

func ReleaseFrame

func ReleaseFrame(fr *Frame)

ReleaseFrame puts fr Frame into the global pool.

func UpgradeAsClient

func UpgradeAsClient(c net.Conn, url string, r *fasthttp.Request) error

UpgradeAsClient will upgrade the connection as a client

This function should be used with connections that intend to use a plain framing.

r can be nil.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client holds a WebSocket connection.

The client is NOT concurrently safe. It is intended to be used with the Frame struct.

func ClientWithHeaders

func ClientWithHeaders(c net.Conn, url string, req *fasthttp.Request) (*Client, error)

ClientWithHeaders returns a Client using an existing connection and sending custom headers.

func Dial

func Dial(url string) (*Client, error)

Dial establishes a websocket connection as client.

The url parameter must follow the WebSocket URL format, e.g. ws://host:port/path

func DialTLS

func DialTLS(url string, cnf *tls.Config) (*Client, error)

DialTLS establishes a websocket connection as client with the given tls.Config. The config is used when the URL scheme is wss://.

func DialWithHeaders

func DialWithHeaders(url string, req *fasthttp.Request) (*Client, error)

DialWithHeaders establishes a websocket connection as client sending a personalized request.

func MakeClient

func MakeClient(c net.Conn, url string) (*Client, error)

MakeClient returns a Client using an existing connection.

url must be a complete URL, e.g. http://localhost:8080/ws

func (*Client) Close

func (c *Client) Close() error

Close gracefully closes the websocket connection.

func (*Client) ReadFrame

func (c *Client) ReadFrame(fr *Frame) (int, error)

ReadFrame reads a frame from the connection.

func (*Client) Write

func (c *Client) Write(b []byte) (int, error)

Write writes the content `b` as text.

To send binary content use WriteBinary.

func (*Client) WriteBinary added in v0.0.5

func (c *Client) WriteBinary(b []byte) (int, error)

WriteBinary writes the content `b` as binary.

To send text content use Write.

func (*Client) WriteFrame

func (c *Client) WriteFrame(fr *Frame) (int, error)

WriteFrame writes the frame into the WebSocket connection.

type CloseHandler

type CloseHandler func(c *Conn, err error)

CloseHandler fires when a connection has been closed.

type Code

type Code uint8

Code to send.

const (
	// CodeContinuation defines the continuation code
	CodeContinuation Code = 0x0
	// CodeText defines the text code
	CodeText Code = 0x1
	// CodeBinary defines the binary code
	CodeBinary Code = 0x2
	// CodeClose defines the close code
	CodeClose Code = 0x8
	// CodePing defines the ping code
	CodePing Code = 0x9
	// CodePong defines the pong code
	CodePong Code = 0xA
)

func (Code) String

func (code Code) String() string

type Conn added in v0.0.3

type Conn struct {

	// ReadTimeout ...
	ReadTimeout time.Duration

	// WriteTimeout ...
	WriteTimeout time.Duration

	// MaxPayloadSize prevents huge memory allocation.
	//
	// By default MaxPayloadSize is DefaultPayloadSize.
	MaxPayloadSize uint64
	// contains filtered or unexported fields
}

Conn represents a WebSocket connection on the server side.

This handler is compatible with io.Writer.

func (*Conn) Close added in v0.0.3

func (c *Conn) Close() error

func (*Conn) CloseDetail added in v0.0.6

func (c *Conn) CloseDetail(status StatusCode, reason string)

func (*Conn) ID added in v0.0.3

func (c *Conn) ID() uint64

ID returns a unique identifier for the connection.

func (*Conn) LocalAddr added in v0.0.3

func (c *Conn) LocalAddr() net.Addr

LocalAddr returns local address.

func (*Conn) Ping added in v0.0.3

func (c *Conn) Ping(data []byte)

func (*Conn) RemoteAddr added in v0.0.3

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr returns peer remote address.

func (*Conn) SetUserValue added in v0.0.3

func (c *Conn) SetUserValue(key string, value interface{})

SetUserValue stores the given value under key.

func (*Conn) UserValue added in v0.0.3

func (c *Conn) UserValue(key string) interface{}

UserValue returns the value associated with key.

func (*Conn) Write added in v0.0.3

func (c *Conn) Write(data []byte) (int, error)

func (*Conn) WriteFrame added in v0.0.3

func (c *Conn) WriteFrame(fr *Frame)

type Error

type Error struct {
	Status StatusCode
	Reason string
}

func (Error) Error

func (e Error) Error() string

type ErrorHandler

type ErrorHandler func(c *Conn, err error)

ErrorHandler fires when an unknown error happens.

type Frame

type Frame struct {
	// contains filtered or unexported fields
}

Frame is the unit used to transfer messages between endpoints using the websocket protocol.

func AcquireFrame

func AcquireFrame() *Frame

AcquireFrame gets Frame from the global pool.
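AcquireFrame and ReleaseFrame follow the acquire/release pooling pattern. A minimal sketch of that pattern, assuming a sync.Pool underneath (the page does not show how the library implements its pool); the frame type, acquire and release are hypothetical stand-ins:

```go
package main

import (
	"fmt"
	"sync"
)

// frame is a hypothetical stand-in for a pooled object.
type frame struct {
	payload []byte
}

// reset clears the frame but keeps the allocated payload capacity.
func (f *frame) reset() { f.payload = f.payload[:0] }

var framePool = sync.Pool{
	New: func() interface{} { return &frame{} },
}

// acquire returns an empty frame from the pool.
func acquire() *frame { return framePool.Get().(*frame) }

// release resets f and puts it back; f must not be used afterwards.
func release(f *frame) {
	f.reset()
	framePool.Put(f)
}

func main() {
	f := acquire()
	f.payload = append(f.payload, "hello"...)
	fmt.Println(len(f.payload)) // 5
	release(f)

	g := acquire()              // may or may not be the same object as f
	fmt.Println(len(g.payload)) // 0
}
```

The important rule for callers is the same as with any pool: once a frame has been released, do not keep references to it or to its payload.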

func (*Frame) Code

func (fr *Frame) Code() Code

Code returns the code set in fr.

func (*Frame) CopyTo

func (fr *Frame) CopyTo(fr2 *Frame)

CopyTo copies the frame `fr` to `fr2`

func (*Frame) HasRSV1

func (fr *Frame) HasRSV1() bool

HasRSV1 checks if RSV1 bit is set.

func (*Frame) HasRSV2

func (fr *Frame) HasRSV2() bool

HasRSV2 checks if RSV2 bit is set.

func (*Frame) HasRSV3

func (fr *Frame) HasRSV3() bool

HasRSV3 checks if RSV3 bit is set.

func (*Frame) IsClose

func (fr *Frame) IsClose() bool

IsClose returns true if Code is CodeClose.

func (*Frame) IsContinuation

func (fr *Frame) IsContinuation() bool

IsContinuation returns true if the Frame code is Continuation
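Continuation frames are how RFC 6455 splits one message into several fragments. The following sketch shows the reassembly rules in isolation; the fragment type and the assemble function are hypothetical and do not reflect how the library stores frames internally.

```go
package main

import (
	"errors"
	"fmt"
)

// fragment is a hypothetical, simplified view of a data frame.
type fragment struct {
	fin     bool
	opcode  byte // 0x0 continuation, 0x1 text, 0x2 binary
	payload []byte
}

// assemble joins the fragments of one message. The first fragment carries
// the message type, the rest must be continuation frames, and only the
// last one has FIN set.
func assemble(frags []fragment) (isBinary bool, data []byte, err error) {
	for i, f := range frags {
		first, last := i == 0, i == len(frags)-1
		switch {
		case first && f.opcode != 0x1 && f.opcode != 0x2:
			return false, nil, errors.New("message must start with a text or binary frame")
		case !first && f.opcode != 0x0:
			return false, nil, errors.New("expected a continuation frame")
		case f.fin != last:
			return false, nil, errors.New("FIN must be set on the last fragment only")
		}
		if first {
			isBinary = f.opcode == 0x2
		}
		data = append(data, f.payload...)
	}
	return isBinary, data, nil
}

func main() {
	isBinary, data, err := assemble([]fragment{
		{fin: false, opcode: 0x1, payload: []byte("Hel")},
		{fin: true, opcode: 0x0, payload: []byte("lo")},
	})
	fmt.Println(isBinary, string(data), err) // false Hello <nil>
}
```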

func (*Frame) IsControl

func (fr *Frame) IsControl() bool

IsControl reports whether the Frame is a control frame, that is, a Close, Ping or Pong frame.

func (*Frame) IsFin

func (fr *Frame) IsFin() bool

IsFin checks if FIN bit is set.

func (*Frame) IsMasked

func (fr *Frame) IsMasked() bool

IsMasked checks if Mask bit is set.
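The FIN, RSV, opcode and mask flags all live in the first two bytes of a frame (RFC 6455, section 5.2). As a rough illustration of what these accessors inspect, here is a standalone decoder; the header type and the parseHeader and isControl functions are hypothetical, not part of the package.

```go
package main

import "fmt"

// header holds the flags found in the first two bytes of a frame.
type header struct {
	fin, rsv1, rsv2, rsv3, masked bool
	opcode                        byte
}

// parseHeader decodes the flag bits of the first two header bytes.
func parseHeader(b0, b1 byte) header {
	return header{
		fin:    b0&0x80 != 0,
		rsv1:   b0&0x40 != 0,
		rsv2:   b0&0x20 != 0,
		rsv3:   b0&0x10 != 0,
		opcode: b0 & 0x0F,
		masked: b1&0x80 != 0,
	}
}

// isControl reports whether opcode is a control opcode (Close, Ping, Pong):
// control opcodes have the most significant opcode bit set.
func isControl(opcode byte) bool { return opcode&0x08 != 0 }

func main() {
	// 0x81 0x85 starts a final, masked text frame with a 5-byte payload.
	h := parseHeader(0x81, 0x85)
	fmt.Println(h.fin, h.opcode, h.masked)      // true 1 true
	fmt.Println(isControl(0x9), isControl(0x1)) // true false
}
```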

func (*Frame) IsPing

func (fr *Frame) IsPing() bool

IsPing returns true if Code is CodePing.

func (*Frame) IsPong

func (fr *Frame) IsPong() bool

IsPong returns true if Code is CodePong.

func (*Frame) Len

func (fr *Frame) Len() (length uint64)

Len returns the length of the payload based on the header bits.

If you want to know the actual payload length, use PayloadLen.
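For reference, RFC 6455 encodes the payload length in the header in one of three ways: directly in 7 bits, or as a 16-bit or 64-bit big-endian extension. The sketch below decodes that field from raw bytes; payloadLen and errShort are hypothetical names, and the code is independent of the library's Frame type.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errShort = errors.New("short header")

// payloadLen decodes the payload length announced in a frame header.
// b must start at the first byte of the frame.
func payloadLen(b []byte) (uint64, error) {
	if len(b) < 2 {
		return 0, errShort
	}
	n := b[1] & 0x7F // drop the mask bit
	switch {
	case n < 126:
		// lengths up to 125 fit in the 7 bits themselves
		return uint64(n), nil
	case n == 126:
		// 126 announces a 16-bit extended length
		if len(b) < 4 {
			return 0, errShort
		}
		return uint64(binary.BigEndian.Uint16(b[2:4])), nil
	default:
		// 127 announces a 64-bit extended length
		if len(b) < 10 {
			return 0, errShort
		}
		return binary.BigEndian.Uint64(b[2:10]), nil
	}
}

func main() {
	short, _ := payloadLen([]byte{0x81, 0x05})
	medium, _ := payloadLen([]byte{0x82, 0x7E, 0x01, 0x00})
	fmt.Println(short, medium) // 5 256
}
```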

func (*Frame) Mask

func (fr *Frame) Mask()

Mask performs the masking of the current payload
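Masking in RFC 6455 (section 5.3) is a byte-wise XOR of the payload with a 4-byte key, which also means that unmasking is the very same operation. A minimal sketch, where mask is a hypothetical free function rather than the Frame method:

```go
package main

import "fmt"

// mask XORs payload in place with the 4-byte key. Applying it twice
// restores the original bytes.
func mask(key [4]byte, payload []byte) {
	for i := range payload {
		payload[i] ^= key[i%4]
	}
}

func main() {
	key := [4]byte{0x37, 0xfa, 0x21, 0x3d}
	data := []byte("Hello")

	mask(key, data)
	fmt.Printf("% x\n", data) // 7f 9f 4d 51 58

	mask(key, data)           // unmasking is the same operation
	fmt.Println(string(data)) // Hello
}
```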

func (*Frame) MaskKey

func (fr *Frame) MaskKey() []byte

MaskKey returns mask key.

Returns a zero-padded key if the frame doesn't have a mask.

func (*Frame) Payload

func (fr *Frame) Payload() []byte

Payload returns the frame payload.

func (*Frame) PayloadLen

func (fr *Frame) PayloadLen() int

PayloadLen returns the actual payload length

func (*Frame) PayloadSize

func (fr *Frame) PayloadSize() uint64

PayloadSize returns the max payload size

func (*Frame) ReadFrom

func (fr *Frame) ReadFrom(rd io.Reader) (int64, error)

ReadFrom fills fr reading from rd.

func (*Frame) Reset

func (fr *Frame) Reset()

Reset resets all Frame values to the default.

func (*Frame) SetBinary

func (fr *Frame) SetBinary()

SetBinary sets CodeBinary in Code field.

func (*Frame) SetClose

func (fr *Frame) SetClose()

SetClose sets CodeClose in Code field.

func (*Frame) SetCode

func (fr *Frame) SetCode(code Code)

SetCode sets code bits.

func (*Frame) SetContinuation

func (fr *Frame) SetContinuation()

SetContinuation sets CodeContinuation in Code field.

func (*Frame) SetFin

func (fr *Frame) SetFin()

SetFin sets FIN bit.

func (*Frame) SetMask

func (fr *Frame) SetMask(b []byte)

SetMask sets the first 4 bytes of b as the mask key and enables the mask bit.

func (*Frame) SetPayload

func (fr *Frame) SetPayload(b []byte)

SetPayload sets b as the frame's payload.

func (*Frame) SetPayloadSize

func (fr *Frame) SetPayloadSize(size uint64)

SetPayloadSize sets max payload size

func (*Frame) SetPing

func (fr *Frame) SetPing()

SetPing sets CodePing in Code field.

func (*Frame) SetPong

func (fr *Frame) SetPong()

SetPong sets CodePong in Code field.

func (*Frame) SetRSV1

func (fr *Frame) SetRSV1()

SetRSV1 sets RSV1 bit.

func (*Frame) SetRSV2

func (fr *Frame) SetRSV2()

SetRSV2 sets RSV2 bit.

func (*Frame) SetRSV3

func (fr *Frame) SetRSV3()

SetRSV3 sets RSV3 bit.

func (*Frame) SetStatus

func (fr *Frame) SetStatus(status StatusCode)

SetStatus sets status code.

Status code is usually used in Close request.

func (*Frame) SetText

func (fr *Frame) SetText()

SetText sets CodeText in Code field.

func (*Frame) Status

func (fr *Frame) Status() (status StatusCode)

Status returns StatusCode.

func (*Frame) String

func (fr *Frame) String() string

String returns a representation of Frame in a human-readable string format.

func (*Frame) Unmask

func (fr *Frame) Unmask()

Unmask performs the unmasking of the current payload

func (*Frame) UnsetMask

func (fr *Frame) UnsetMask()

UnsetMask only drops the mask bit.

func (*Frame) Write

func (fr *Frame) Write(b []byte) (int, error)

Write appends b to the frame's payload.

func (*Frame) WriteTo

func (fr *Frame) WriteTo(wr io.Writer) (n int64, err error)

WriteTo writes the frame into wr.

type FrameHandler

type FrameHandler func(c *Conn, fr *Frame)

FrameHandler receives the raw frame. This handler is optional; if none is specified, the server runs a default handler.

If the user specifies a FrameHandler, then it is going to receive all incoming frames.

type MessageHandler

type MessageHandler func(c *Conn, isBinary bool, data []byte)

MessageHandler receives the payload content of a data frame indicating whether the content is binary or not.

type OpenHandler

type OpenHandler func(c *Conn)

OpenHandler handles when a connection is open.

type PingHandler

type PingHandler func(c *Conn, data []byte)

PingHandler handles the data from a ping frame.

type PongHandler

type PongHandler func(c *Conn, data []byte)

PongHandler receives the data from a pong frame.

type RequestHandler

type RequestHandler func(conn *Conn)

RequestHandler is the websocket connection handler.

type Server

type Server struct {
	// UpgradeHandler allows the user to handle RequestCtx when upgrading for fasthttp.
	//
	// If UpgradeHandler returns false the connection won't be upgraded.
	UpgradeHandler UpgradeHandler

	// UpgradeNetHandler allows the user to handle the request when upgrading for net/http.
	//
	// If UpgradeNetHandler returns false, the connection won't be upgraded.
	UpgradeNetHandler UpgradeNetHandler

	// Protocols are the supported protocols.
	Protocols []string

	// Origin is used to limit the clients coming from the defined origin
	Origin string
	// contains filtered or unexported fields
}

Server represents the WebSocket server.

Server is only in charge of upgrading the connection; it is not a server per se.

func (*Server) HandleClose

func (s *Server) HandleClose(closeHandler CloseHandler)

HandleClose sets a callback for handling connection close.

func (*Server) HandleData

func (s *Server) HandleData(msgHandler MessageHandler)

HandleData sets the MessageHandler.

func (*Server) HandleError

func (s *Server) HandleError(errHandler ErrorHandler)

HandleError sets a callback that fires when an unknown error happens.

func (*Server) HandleFrame

func (s *Server) HandleFrame(frameHandler FrameHandler)

HandleFrame sets a callback for handling all the incoming Frames.

If none is specified, the server will run a default handler.

func (*Server) HandleOpen

func (s *Server) HandleOpen(openHandler OpenHandler)

HandleOpen sets a callback for handling opening connections.

func (*Server) HandlePing

func (s *Server) HandlePing(pingHandler PingHandler)

HandlePing sets a callback for handling the data of the ping frames.

The server replies to PING frames itself, so the handler MUST NOT send a reply to any control frame.

func (*Server) HandlePong

func (s *Server) HandlePong(pongHandler PongHandler)

HandlePong sets a callback for handling the data of the pong frames.

func (*Server) NetUpgrade added in v0.0.9

func (s *Server) NetUpgrade(resp http.ResponseWriter, req *http.Request)

NetUpgrade upgrades the websocket connection for net/http.

func (*Server) Upgrade

func (s *Server) Upgrade(ctx *fasthttp.RequestCtx)

Upgrade upgrades websocket connections.
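Upgrading relies on the opening handshake of RFC 6455, in which the server answers the client's Sec-WebSocket-Key header with a derived Sec-WebSocket-Accept value. The sketch below shows only that derivation and is not taken from the library's source; acceptKey and websocketGUID are hypothetical names.

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// websocketGUID is the fixed GUID defined by RFC 6455.
const websocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// acceptKey computes the Sec-WebSocket-Accept value for a client key:
// base64(SHA-1(key + GUID)).
func acceptKey(clientKey string) string {
	sum := sha1.Sum([]byte(clientKey + websocketGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	// Sample key taken from RFC 6455.
	fmt.Println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="))
	// s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
}
```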

type StatusCode

type StatusCode uint16

StatusCode is sent when closing a connection.

The following constants have been defined by the RFC.

func (StatusCode) String

func (status StatusCode) String() string
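On the wire, RFC 6455 carries the status code as the first two bytes of a Close frame's payload, in big-endian order, optionally followed by a UTF-8 reason. A small sketch of that layout; closePayload and parseClose are hypothetical helpers that use a plain uint16 instead of the package's StatusCode type:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// closePayload builds the body of a Close frame: status, then reason.
func closePayload(status uint16, reason string) []byte {
	b := make([]byte, 2+len(reason))
	binary.BigEndian.PutUint16(b, status)
	copy(b[2:], reason)
	return b
}

// parseClose splits a Close frame body back into status and reason.
// ok is false when the body is too short to hold a status code.
func parseClose(b []byte) (status uint16, reason string, ok bool) {
	if len(b) < 2 {
		return 0, "", false
	}
	return binary.BigEndian.Uint16(b), string(b[2:]), true
}

func main() {
	p := closePayload(1001, "bye")
	fmt.Printf("% x\n", p) // 03 e9 62 79 65

	status, reason, _ := parseClose(p)
	fmt.Println(status, reason) // 1001 bye
}
```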

type UpgradeHandler

type UpgradeHandler func(*fasthttp.RequestCtx) bool

UpgradeHandler is a middleware callback that determines whether the WebSocket connection should be upgraded or not. If UpgradeHandler returns false, the connection is not upgraded.

type UpgradeNetHandler added in v0.0.9

type UpgradeNetHandler func(resp http.ResponseWriter, req *http.Request) bool

UpgradeNetHandler is like UpgradeHandler but for net/http.
