etp

package module
v3.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2024 License: MIT Imports: 10 Imported by: 1

README

etp

GoDoc Build and test codecov Go Report Card

ETP - event transport protocol on WebSocket, simple and powerful.

Highly inspired by socket.io. There was no good implementation of it in Go, which is why etp exists.

The package is based on github.com/nhooyr/websocket.

Install

go get -u github.com/txix-open/etp/v3

Features:

  • Rooms for server
  • Javascript client
  • Concurrent event emitting
  • Context based API
  • Event acknowledgment (for sync communication)

Internals

  • WebSocket message
    • websocket.MessageText (not binary)
    • <eventName>||<ackId>||<eventData>
  • Each event is handled concurrently in a separate goroutine
  • Message limit is 1MB by default

Complete example

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/txix-open/etp/v3"
	"github.com/txix-open/etp/v3/msg"
)

// main is a self-contained demo: it starts an etp server on :8080, connects
// a client to it, exercises ping, fire-and-forget emit, emit with
// acknowledgment and the unknown-event fallback, then shuts the server down.
func main() {
	srv := etp.NewServer(etp.WithServerAcceptOptions(&etp.AcceptOptions{
		InsecureSkipVerify: true, // completely ignores CORS checks; enable for development only
	}))

	// Callback invoked for every new connection.
	srv.OnConnect(func(conn *etp.Conn) {
		// The original HTTP upgrade request is available on the connection.
		fmt.Printf("id: %d, url: %s, connected\n", conn.Id(), conn.HttpRequest().URL)
		srv.Rooms().Join(conn, "goodClients") // the room is left automatically when the connection is closed

		conn.Data().Set("key", "value") // attach arbitrary data to the connection
	})

	// Callback invoked when a connection is closed.
	srv.OnDisconnect(func(conn *etp.Conn, err error) {
		fmt.Println("disconnected", conn.Id(), err, etp.IsNormalClose(err))
	})

	// Callback invoked for any error that occurs while serving.
	srv.OnError(func(conn *etp.Conn, err error) {
		connId := uint64(0)
		// Be careful: conn can be nil when the protocol upgrade fails,
		// i.e. before a WebSocket connection has been established.
		if conn != nil {
			connId = conn.Id()
		}
		fmt.Println("error", connId, err)
	})

	// Business event handler; each event is handled in its own goroutine.
	// The returned bytes are what the client below receives from EmitWithAck.
	srv.On("hello", etp.HandlerFunc(func(ctx context.Context, conn *etp.Conn, event msg.Event) []byte {
		fmt.Printf("hello event received: %s, %s\n", event.Name, event.Data)
		return []byte("hello handled")
	}))

	// Fallback handler for events that have no registered handler.
	srv.OnUnknownEvent(etp.HandlerFunc(func(ctx context.Context, conn *etp.Conn, event msg.Event) []byte {
		fmt.Printf("unknown event received, %s, %s\n", event.Name, event.Data)
		_ = conn.Close() // a connection may be closed at any time; the error is ignored in this demo
		return nil
	}))

	mux := http.NewServeMux()
	mux.Handle("GET /ws", srv)
	go func() {
		err := http.ListenAndServe(":8080", mux)
		if err != nil {
			panic(err)
		}
	}()
	// Crude wait for the listener to come up before dialing; acceptable for a demo only.
	time.Sleep(1 * time.Second)

	cli := etp.NewClient().
		OnDisconnect(func(conn *etp.Conn, err error) { // the client exposes the same On* handlers as the server
			fmt.Println("client disconnected", conn.Id(), err)
		})
	err := cli.Dial(context.Background(), "ws://localhost:8080/ws")
	if err != nil {
		panic(err)
	}

	// Check that the connection is still alive.
	err = cli.Ping(context.Background())
	if err != nil {
		panic(err)
	}

	// Fire-and-forget emit.
	err = cli.Emit(context.Background(), "hello", []byte("fire and forget"))
	if err != nil {
		panic(err)
	}

	// Emit an event and wait for the server response.
	// Setting a timeout while waiting for an acknowledgment is highly recommended.
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	resp, err := cli.EmitWithAck(ctx, "hello", []byte("wait acknowledgment"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("hello response: '%s'\n", resp)

	// Event data may be empty. No handler is registered for this name,
	// so the server routes it to the OnUnknownEvent fallback above.
	err = cli.Emit(ctx, "unknown event", nil)
	if err != nil {
		panic(err)
	}

	time.Sleep(15 * time.Second)

	// Disconnect all clients.
	srv.Shutdown()
}

V3 Migration

  • Internal message format is the same as v2
  • Each event is now handled in a separate goroutine (completely async)
  • Significantly reduced the code base; removed redundant interfaces
  • Fixed some memory leaks and potential deadlocks
  • Main package github.com/txix-open/isp-etp-go/v2 -> github.com/txix-open/etp/v3
  • OnDefault -> OnUnknownEvent
  • The On* API is the same for both etp.Client and etp.Server
  • WAS
srv.On("event", func (conn etp.Conn, data []byte) {
    log.Println("Received " + testEvent + ":" + string(data))
}).
  • BECOME
srv.On("hello", etp.HandlerFunc(func(ctx context.Context, conn *etp.Conn, event msg.Event) []byte {
    fmt.Printf("hello event received: %s, %s\n", event.Name, event.Data)
    return []byte("hello handled")
}))

The second parameter is now an interface (etp.Handler); wrap a plain function with etp.HandlerFunc to satisfy it.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrClientClosed = errors.New("client closed")
)

Functions

func IsNormalClose

func IsNormalClose(err error) bool

Types

type AcceptOptions

type AcceptOptions = websocket.AcceptOptions

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts ...ClientOption) *Client

func (*Client) Close

func (c *Client) Close() error

func (*Client) Dial

func (c *Client) Dial(ctx context.Context, url string) error

func (*Client) Emit

func (c *Client) Emit(ctx context.Context, event string, data []byte) error

func (*Client) EmitWithAck

func (c *Client) EmitWithAck(ctx context.Context, event string, data []byte) ([]byte, error)

func (*Client) On

func (c *Client) On(event string, handler Handler) *Client

func (*Client) OnConnect

func (c *Client) OnConnect(handler ConnectHandler) *Client

func (*Client) OnDisconnect

func (c *Client) OnDisconnect(handler DisconnectHandler) *Client

func (*Client) OnError

func (c *Client) OnError(handler ErrorHandler) *Client

func (*Client) OnUnknownEvent

func (c *Client) OnUnknownEvent(handler Handler) *Client

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

type ClientOption

type ClientOption func(*clientOptions)

func WithClientDialOptions

func WithClientDialOptions(opts *DialOptions) ClientOption

func WithClientReadLimit

func WithClientReadLimit(limit int64) ClientOption

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

func (*Conn) Close

func (c *Conn) Close() error

func (*Conn) Data added in v3.1.0

func (c *Conn) Data() *store.Store

func (*Conn) Emit

func (c *Conn) Emit(ctx context.Context, event string, data []byte) error

func (*Conn) EmitWithAck

func (c *Conn) EmitWithAck(ctx context.Context, event string, data []byte) ([]byte, error)

func (*Conn) HttpRequest

func (c *Conn) HttpRequest() *http.Request

func (*Conn) Id

func (c *Conn) Id() uint64

func (*Conn) Ping

func (c *Conn) Ping(ctx context.Context) error

type ConnectHandler

type ConnectHandler func(conn *Conn)

type DialOptions

type DialOptions = websocket.DialOptions

type DisconnectHandler

type DisconnectHandler func(conn *Conn, err error)

type ErrorHandler

type ErrorHandler func(conn *Conn, err error)

type Handler

type Handler interface {
	Handle(ctx context.Context, conn *Conn, event msg.Event) []byte
}

type HandlerFunc

type HandlerFunc func(ctx context.Context, conn *Conn, event msg.Event) []byte

func (HandlerFunc) Handle

func (h HandlerFunc) Handle(ctx context.Context, conn *Conn, event msg.Event) []byte

type Rooms

type Rooms struct {
	// contains filtered or unexported fields
}

func (*Rooms) AllConns

func (s *Rooms) AllConns() []*Conn

func (*Rooms) Clear

func (s *Rooms) Clear(rooms ...string)

func (*Rooms) Get

func (s *Rooms) Get(connId uint64) (*Conn, bool)

func (*Rooms) Join

func (s *Rooms) Join(conn *Conn, rooms ...string)

func (*Rooms) LeaveByConnId

func (s *Rooms) LeaveByConnId(id uint64, rooms ...string)

func (*Rooms) Len

func (s *Rooms) Len(room string) int

func (*Rooms) Rooms

func (s *Rooms) Rooms() []string

func (*Rooms) ToBroadcast

func (s *Rooms) ToBroadcast(rooms ...string) []*Conn

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts ...ServerOption) *Server

func (*Server) On

func (s *Server) On(event string, handler Handler) *Server

func (*Server) OnConnect

func (s *Server) OnConnect(handler ConnectHandler) *Server

func (*Server) OnDisconnect

func (s *Server) OnDisconnect(handler DisconnectHandler) *Server

func (*Server) OnError

func (s *Server) OnError(handler ErrorHandler) *Server

func (*Server) OnUnknownEvent

func (s *Server) OnUnknownEvent(handler Handler) *Server

func (*Server) Rooms

func (s *Server) Rooms() *Rooms

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Server) Shutdown

func (s *Server) Shutdown()

type ServerOption

type ServerOption func(*serverOptions)

func WithServerAcceptOptions

func WithServerAcceptOptions(opts *AcceptOptions) ServerOption

func WithServerReadLimit

func WithServerReadLimit(limit int64) ServerOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL