wsrpc

package module
v0.0.0-...-27d6e55 Latest
Published: May 8, 2023 License: MIT Imports: 12 Imported by: 0

README

Web Socket RPC

A simple web socket API framework for sending JSON-RPC requests. The framework is similar to, but not necessarily compliant with, the JSON-RPC 2.0 standard.

Web sockets are the primary transport, but the framework allows the client to fall back on long polling when web sockets are not usable.

A client library is available at: http://github.com/modfin/wsrpc-js

Implementing the server

By design, the framework resembles regular HTTP frameworks to make it more intuitive. You have access to common resources like:

  • Middlewares
  • Context
  • Registering Handlers
  • Sticky Headers
  • Cookies
  • The raw HTTP request
Create the router
    import "github.com/modfin/wsrpc"

    ...

    router := wsrpc.NewRouter()
Apply middleware you wish to use or skip.

Middleware can be appended to the router to affect all handlers on the router. It can also be passed in a specific handler registration to affect only that handler.

func someMiddleware(ctx wsrpc.Context, next wsrpc.NextFunc) error {
    // Run the next middleware or the handler, then log any error it returns.
    err := next(ctx)
    if err != nil {
        log.Println(err)
    }

    return err
}

    ...


func main() {
    ...
    router.Use(someMiddleware)
    ...
}
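
To make the interplay between router-level and handler-level middleware concrete, here is a minimal sketch of how such a chain could be composed. It does not import wsrpc: NextFunc and Middleware are local stand-ins that use context.Context in place of wsrpc.Context, and chain and tracer are hypothetical helpers. The ordering shown (router-level middleware outermost) is an assumption, not something this README states.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for wsrpc.NextFunc and wsrpc.Middleware (assumption:
// context.Context replaces wsrpc.Context so the sketch runs on its own).
type NextFunc func(ctx context.Context) error
type Middleware func(ctx context.Context, next NextFunc) error

// chain wraps handler so that mws[0] runs outermost and handler innermost.
func chain(handler NextFunc, mws ...Middleware) NextFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		mw, next := mws[i], handler
		handler = func(ctx context.Context) error { return mw(ctx, next) }
	}
	return handler
}

// tracer returns a middleware that records when it is entered and left.
func tracer(name string, trace *[]string) Middleware {
	return func(ctx context.Context, next NextFunc) error {
		*trace = append(*trace, name+" in")
		err := next(ctx)
		*trace = append(*trace, name+" out")
		return err
	}
}

// run executes a handler behind two middlewares and returns the call order.
func run() []string {
	var trace []string
	handler := func(ctx context.Context) error {
		trace = append(trace, "handler")
		return nil
	}
	// Assumed order: router-level first, handler-level second.
	h := chain(handler, tracer("router", &trace), tracer("handler-level", &trace))
	_ = h(context.Background())
	return trace
}

func main() {
	fmt.Println(run())
	// [router in handler-level in handler handler-level out router out]
}
```

Each middleware decides whether to call next at all, which is how an authentication check can stop a request before it reaches the handler.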
Extracting parameters

Parameters are passed to the handler context's request object as a raw JSON message.

func someCallHandler(ctx wsrpc.Context) error {
    ...
    var num int64
    err := json.Unmarshal(ctx.Request().Params, &num)
    if err != nil {
        return err
    }

    ...

}
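
Because the parameters arrive as a raw JSON message, structured input can be decoded and validated in one small helper before the handler does any work. The sketch below uses only encoding/json; rangeParams and decodeRange are hypothetical names chosen for illustration, and in a real handler the raw message would come from ctx.Request().Params.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// rangeParams is a hypothetical parameter object used only for illustration.
type rangeParams struct {
	From int64 `json:"from"`
	To   int64 `json:"to"`
}

// decodeRange unmarshals the raw params and validates them before use.
func decodeRange(raw json.RawMessage) (rangeParams, error) {
	var p rangeParams
	if err := json.Unmarshal(raw, &p); err != nil {
		return p, fmt.Errorf("invalid params: %w", err)
	}
	if p.From > p.To {
		return p, errors.New("invalid params: from must not exceed to")
	}
	return p, nil
}

func main() {
	p, err := decodeRange(json.RawMessage(`{"from": 1, "to": 5}`))
	fmt.Println(p, err) // {1 5} <nil>

	_, err = decodeRange(json.RawMessage(`{"from": 9, "to": 5}`))
	fmt.Println(err) // invalid params: from must not exceed to
}
```

Returning the error from the handler leaves it to the router and any middleware to report it to the client.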
Extracting and setting headers

Headers are passed to the handler context's request object as key-value objects.

func someCallHandler(ctx wsrpc.Context) error {
    ...
	num, ok := ctx.Request().Header.Get("state").Int()
	if !ok {
		return errors.New("missing valid request state")
	}
    ...

}
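
Headers are a map[string]interface{}, so a numeric header that was decoded from JSON is stored as a float64. The sketch below shows what a typed lookup has to deal with. The headers type mirrors the documented Headers type, while intHeader is a hypothetical stand-in for illustration only; it is not how Header.Get(...).Int() is implemented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// headers mirrors the documented Headers type (map[string]interface{}).
type headers map[string]interface{}

// intHeader is a hypothetical helper that reports the integer stored under
// key, and false when the key is missing or the value is not a whole number.
func intHeader(h headers, key string) (int, bool) {
	switch v := h[key].(type) {
	case float64: // encoding/json decodes JSON numbers into float64
		if v != float64(int(v)) {
			return 0, false
		}
		return int(v), true
	case int: // set directly from Go code
		return v, true
	default:
		return 0, false
	}
}

func main() {
	var h headers
	if err := json.Unmarshal([]byte(`{"state": 3, "token": "abc"}`), &h); err != nil {
		panic(err)
	}
	fmt.Println(intHeader(h, "state"))   // 3 true
	fmt.Println(intHeader(h, "token"))   // 0 false
	fmt.Println(intHeader(h, "missing")) // 0 false
}
```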
Working with the Context

The wsrpc Context used by handlers implements the standard Go context.Context interface in addition to its own features.

Context cancellations cascade from the top-level connection down to each of the connection's batches of jobs, and finally down to each job in those batches. This allows us to cancel any action based on the handler's context.

func someStreamHandler(ctx wsrpc.Context, ch *wsrpc.ResponseChannel) error {
	countdown := 3
	
	for countdown > 0 {
		select {
			case <-ctx.Done():
				log.Println("context cancelled")
				return nil
			default:
		}
		
		countdown -= 1
	}
	
	return nil
}
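
The cascade described above is the standard behaviour of derived contexts in Go. This sketch reproduces the connection, batch and job hierarchy with the standard library only. The three levels and the cascade function are illustrative names; how wsrpc builds its contexts internally is not shown here.

```go
package main

import (
	"context"
	"fmt"
)

// cascade derives connection -> batch -> job contexts, cancels only the
// connection, and returns the error observed on the job context.
func cascade() error {
	conn, cancelConn := context.WithCancel(context.Background())
	batch, cancelBatch := context.WithCancel(conn)
	defer cancelBatch()
	job, cancelJob := context.WithCancel(batch)
	defer cancelJob()

	cancelConn() // e.g. the client disconnected
	<-job.Done() // the job sees the cancellation without being told directly
	return job.Err()
}

func main() {
	fmt.Println(cascade()) // context canceled
}
```

A handler therefore only needs to watch its own ctx.Done() to react to a closed connection.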
The raw HTTP request

If we need something from the original HTTP request, e.g. the original request URL, we can access it through the handler context.

func someCallHandler(ctx wsrpc.Context) error {
...
	connectionUrl := ctx.HttpRequest().URL.String()
...
}
Sending responses to the client

Responses can be sent by setting the Result field on the context's response object. The response is automatically sent to the client when the handler returns.

If you wish to send a response right away, e.g. in a streaming handler with multiple responses, there is a channel available to stream handlers.

func someCallHandler(ctx wsrpc.Context) error {
...
	ctx.Response().Result, err = json.Marshal(num * num)
	if err != nil {
		return err
	}
...
}

func someStreamHandler(ctx wsrpc.Context, ch *wsrpc.ResponseChannel) error {
...
		rsp := ctx.NewResponse()
		
		var err error 
		rsp.Result, err = json.Marshal(42)
		if err != nil {
			return err
		}
		
		err = ch.Write(rsp)
		if err != nil {
			return err
		}
...
}
Registering Handlers

There are two handler funcs available: one for simple call-and-return-once jobs, and one for streaming jobs where the server may respond an unknown number of times. Keep in mind that a handler is expected to function in both web socket and long poll mode. Make sure your handlers are reentrant for long polls.

// type CallHandler func(ctx Context) (err error)
// type StreamHandler func(ctx Context, ch *ResponseChannel) (err error)

func main() {
    ...
    router.SetHandler("answerLife", myCallHandler)

    router.SetStream("countdown", myStreamHandler)
    ...

}

func myCallHandler(ctx wsrpc.Context) error {
    var err error
    ctx.Response().Result, err = json.Marshal(42)
    if err != nil {
        return err
    }
    
    return nil
}

func myStreamHandler(ctx wsrpc.Context, ch *wsrpc.ResponseChannel) (err error) {
    countdown := 3

    for countdown > 0 {
        rsp := ctx.NewResponse()
        rsp.Result, err = json.Marshal(countdown)
        if err != nil {
           return err
        }
     
        err = ch.Write(rsp)
        if err != nil {
            return err
        }

        countdown -= 1
    }

    return nil
}
A small reference setup
package main

import (
	"encoding/json"
	"errors"
	"log"
	"math/rand"

	"github.com/modfin/wsrpc"
)

func main() {
	router := wsrpc.NewRouter()

    router.Use(someMiddleware)
	
	router.SetHandler("cubify", someCallHandler, func(ctx wsrpc.Context, next wsrpc.NextFunc) error {
		if rand.Int() < 53200 {
			return errors.New("user is not authenticated")
		}
		
		return next(ctx)
	})
	
	router.SetStream("countdown", someStreamHandler)
	
	err := router.Start(":8080")
	if err != nil {
		log.Fatal(err)
	}
}

func someMiddleware(ctx wsrpc.Context, next wsrpc.NextFunc) error {
	// Call the next middleware or the handler exactly once and log any error.
	err := next(ctx)
	if err != nil {
		log.Println(err)
		return err
	}

	return nil
}

func someCallHandler(ctx wsrpc.Context) error {
	var num int64
	err := json.Unmarshal(ctx.Request().Params, &num)
	if err != nil {
		return err
	}

	ctx.Response().Result, err = json.Marshal(num * num)
	if err != nil {
		return err
	}

	return nil
}

func someStreamHandler(ctx wsrpc.Context, ch *wsrpc.ResponseChannel) error {
	countdown, ok := ctx.Request().Header.Get("countdown").Int()
	if !ok {
		return errors.New("missing countdown")
	}

	for countdown > 0 {
		select {
			case <-ctx.Done():
				log.Println("context cancelled")
				return nil
			default:
		}

		rsp := ctx.NewResponse()

		var err error
		rsp.Result, err = json.Marshal(countdown)
		if err != nil {
			return err
		}

		err = ch.Write(rsp)
		if err != nil {
			return err
		}

		countdown -= 1
	}

	return nil
}

General guidelines

Server
  • Registered handlers are responsible for validating the provided input
  • A stream handler's channel goes straight to the client, so mind your output

Issues

  • The error handling could probably be done with middleware, alternatively a logger could be attached
  • A data race occurs for both wrapped channel types when a stream handler is called with long polling.

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrChanClosed is returned when writing to or reading from a closed channel
	ErrChanClosed = errors.New("channel closed")
)
var (
	// ErrContextCancelled is returned when detecting a cancelled context
	ErrContextCancelled = errors.New("context cancelled")
)

Functions

This section is empty.

Types

type CallHandler

type CallHandler func(ctx Context) (err error)

CallHandler is used to register a handler for RPCs which require exactly one response.

type Config

type Config struct {
	// Specifies the allowed origins that can initiate a websocket connection
	Origins []string
}
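
To illustrate what an origin allow-list means in practice, here is a minimal sketch of an exact-match check against the Origin header of an incoming request. originAllowed is a hypothetical helper and the example origins are made up. The matching rules wsrpc actually applies to Config.Origins (wildcards, case sensitivity, behaviour for an empty list) are not documented on this page, so treat this as an assumption.

```go
package main

import (
	"fmt"
	"net/http"
)

// originAllowed reports whether origin exactly matches an allowed entry.
// Assumption: plain string equality, with no wildcard support.
func originAllowed(allowed []string, origin string) bool {
	for _, a := range allowed {
		if a == origin {
			return true
		}
	}
	return false
}

func main() {
	allowed := []string{"https://app.example.com"} // hypothetical origin

	hdr := http.Header{}
	hdr.Set("Origin", "https://app.example.com")

	fmt.Println(originAllowed(allowed, hdr.Get("Origin")))      // true
	fmt.Println(originAllowed(allowed, "https://other.example")) // false
}
```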

type Context

type Context interface {
	context.Context

	Request() *Request
	Response() *Response
	NewResponse() *Response
	HttpRequest() *http.Request
	WithValue(key interface{}, value interface{}) Context
}

Context is passed to request handlers with data that might be needed to handle the request. It fulfills the context.Context interface and adds a few wsrpc-specific methods as well.

type Error

type Error struct {
	Code    int             `json:"code"`
	Message string          `json:"message"`
	Data    json.RawMessage `json:"data"`
	// contains filtered or unexported fields
}

Error contains all the necessary info when handling or returning an error in the wsrpc server.

func EOF

func EOF() *Error

EOF is used to denote end of contents in stream requests.

func IdIsRequiredError

func IdIsRequiredError() *Error

IdIsRequiredError returns the error used when an id is missing from a request.

func MethodNotFoundError

func MethodNotFoundError(method string) *Error

MethodNotFoundError returns the error used when a nonexistent method is requested.

func ServerError

func ServerError(outpErr error) *Error

ServerError repackages any regular error message into a wsrpc error which can be passed to a response.
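
The idea of repackaging an ordinary error can be sketched with the standard library alone. In the example below, rpcError mirrors only the exported fields documented for Error, toRPCError is a hypothetical helper, and hypotheticalServerCode is a placeholder value. The codes and message format that ServerError really produces are not documented on this page.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// rpcError mirrors the exported fields documented for wsrpc.Error.
type rpcError struct {
	Code    int             `json:"code"`
	Message string          `json:"message"`
	Data    json.RawMessage `json:"data"`
}

func (e *rpcError) Error() string { return e.Message }

// hypotheticalServerCode is a placeholder, not a value taken from wsrpc.
const hypotheticalServerCode = 500

var errDown = errors.New("db down")

// toRPCError passes an existing rpcError through (even when wrapped) and
// repackages any other error with the placeholder code.
func toRPCError(err error) *rpcError {
	var rpcErr *rpcError
	if errors.As(err, &rpcErr) {
		return rpcErr
	}
	return &rpcError{Code: hypotheticalServerCode, Message: err.Error()}
}

func main() {
	wrapped := toRPCError(fmt.Errorf("lookup failed: %w", errDown))
	out, err := json.Marshal(wrapped)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// {"code":500,"message":"lookup failed: db down","data":null}
}
```

Using errors.As means a middleware can tell structured errors apart from ordinary ones without string matching.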

func TypeNotFoundError

func TypeNotFoundError(t RequestType) *Error

TypeNotFoundError returns the error used when an invalid request type is requested.

func (Error) Error

func (r Error) Error() string

Error returns the error as a string, so Error satisfies the standard error interface.

type Headers

type Headers map[string]interface{}

Headers is the wsrpc equivalent of HTTP headers. They work much the same way.

func NewHeader

func NewHeader() Headers

NewHeader generates a new headers object.

func (Headers) Get

func (h Headers) Get(key string) *kv.KV

Get collects a key value pair from a headers object.

func (Headers) Set

func (h Headers) Set(key string, value interface{})

Set puts a key value pair on the headers object.

type InfChannel

type InfChannel struct {
	// contains filtered or unexported fields
}

InfChannel is used to pass data of unspecified types inside the wsrpc server. Unlike its sibling ResponseChannel, it is not meant to be made available inside a request handler.

func NewInfChannel

func NewInfChannel() *InfChannel

NewInfChannel returns a new InfChannel

func (*InfChannel) Close

func (c *InfChannel) Close()

Close closes the channel.

type Middleware

type Middleware func(c Context, next NextFunc) error

Middleware is the function type for middlewares. It allows chaining of multiple actions before and/or after calling the handler.

type NextFunc

type NextFunc = func(ctx Context) error

NextFunc is used to call the next middleware/handler when registering middleware.

type Request

type Request struct {
	Id     int             `json:"id"` // Legacy
	JobId  uuid.UUID       `json:"jobId"`
	Method string          `json:"method"`
	Type   RequestType     `json:"type"`
	Params json.RawMessage `json:"params"`
	Header Headers         `json:"header"`
}

Request contains information about the job requested by the client.

func (*Request) MarshalJSON

func (r *Request) MarshalJSON() ([]byte, error)

func (*Request) UnmarshalJSON

func (r *Request) UnmarshalJSON(data []byte) error

type RequestTarget

type RequestTarget Request

type RequestType

type RequestType string

RequestType is used to denote what kind of job is being requested

const (
	// TypeStream refers to jobs of streaming character.
	TypeStream RequestType = `STREAM`
	// TypeCall refers to jobs of an on-demand, request-response character.
	TypeCall RequestType = `CALL`
)

type Response

type Response struct {
	Id     int             `json:"id"` // Legacy
	JobId  uuid.UUID       `json:"jobId"`
	Result json.RawMessage `json:"result"`
	Header Headers         `json:"header"`
	Error  *Error          `json:"error"`
}

Response is serialized and passed back to the requester.

type ResponseChannel

type ResponseChannel struct {
	// contains filtered or unexported fields
}

ResponseChannel is passed to stream handlers to allow the implementer an easy way of sending responses back to the requester.

func NewResponseChannel

func NewResponseChannel(size int) *ResponseChannel

NewResponseChannel returns a new ResponseChannel.

func (*ResponseChannel) Close

func (c *ResponseChannel) Close()

Close closes the ResponseChannel.

func (*ResponseChannel) Closed

func (c *ResponseChannel) Closed() bool

Closed checks whether or not a ResponseChannel is closed.

func (*ResponseChannel) Write

func (c *ResponseChannel) Write(msg *Response) (err error)

Write sends a message through the ResponseChannel. Stream handlers are configured to pass this message back to the requester if possible.
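
A channel wrapper with Write, Close and Closed has to cope with writes that arrive after the channel was closed, since a plain Go channel panics in that case. The sketch below shows one common way to guard against that with a mutex. guardedChan and its errors are hypothetical and carry strings instead of *Response. This is not the ResponseChannel implementation, and rejecting writes when the buffer is full is a simplification chosen to keep the example non-blocking.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errClosed = errors.New("channel closed")
	errFull   = errors.New("buffer full")
)

// guardedChan is a hypothetical closable channel wrapper.
type guardedChan struct {
	mu     sync.Mutex
	ch     chan string
	closed bool
}

func newGuardedChan(size int) *guardedChan {
	return &guardedChan{ch: make(chan string, size)}
}

// Write queues msg, or returns an error instead of panicking or blocking.
func (c *guardedChan) Write(msg string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errClosed
	}
	select {
	case c.ch <- msg:
		return nil
	default:
		return errFull
	}
}

// Close closes the channel once; further calls are no-ops.
func (c *guardedChan) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.closed {
		c.closed = true
		close(c.ch)
	}
}

// Closed reports whether Close has been called.
func (c *guardedChan) Closed() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.closed
}

func main() {
	c := newGuardedChan(1)
	fmt.Println(c.Write("first"))  // <nil>
	fmt.Println(c.Write("second")) // buffer full
	c.Close()
	c.Close() // safe to call twice
	fmt.Println(c.Write("third"), c.Closed()) // channel closed true
}
```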

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router is used to demux incoming RPCs to the appropriate handlers.

func NewRouter

func NewRouter() *Router

NewRouter returns a new router with default settings.

func NewRouterFromConfig

func NewRouterFromConfig(cfg *Config) *Router

NewRouterFromConfig returns a new router from a custom Config.

func (*Router) ServeHTTP

func (r *Router) ServeHTTP(w http.ResponseWriter, req *http.Request)

ServeHTTP is responsible for interpreting incoming HTTP requests and, if appropriate, upgrading the connection to web sockets. In either case it attempts to run the requested handler func.
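
Any type with a ServeHTTP method of this shape satisfies http.Handler, so it can be mounted on a standard mux next to other routes instead of being started on its own. The sketch below demonstrates the pattern with stubRouter, a stand-in that only writes a fixed body, so it runs without the library. The /rpc and /healthz paths are hypothetical, and whether the wsrpc client library expects a particular path is not covered here.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// stubRouter stands in for *wsrpc.Router; only ServeHTTP matters here.
type stubRouter struct{}

func (stubRouter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	fmt.Fprint(w, "rpc endpoint")
}

// newMux mounts the RPC handler under /rpc next to a plain health check.
func newMux(rpc http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/rpc", rpc)
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "ok")
	})
	return mux
}

// get performs an in-memory GET against h and returns the response body.
func get(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	mux := newMux(stubRouter{})
	fmt.Println(get(mux, "/rpc"))     // rpc endpoint
	fmt.Println(get(mux, "/healthz")) // ok
}
```

In a real server the mux would be passed to http.ListenAndServe, with the router from NewRouter in place of the stub.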

func (*Router) SetErrorPostProc

func (r *Router) SetErrorPostProc(fn func(error))

SetErrorPostProc registers a function that is called on errors that are propagated out of the system. Outside of middlewares, this is the last chance to handle the error.

func (*Router) SetErrorPreProc

func (r *Router) SetErrorPreProc(fn func(error) error)

SetErrorPreProc registers a function that is called on an error before it is propagated back out.

func (*Router) SetHandler

func (r *Router) SetHandler(method string, handler CallHandler, middleware ...Middleware)

SetHandler registers a call handler func.

func (*Router) SetStream

func (r *Router) SetStream(method string, handler StreamHandler, middleware ...Middleware)

SetStream registers a stream handler func.

func (*Router) Start

func (r *Router) Start(address string) error

Start is used to start a web server on the supplied address.

func (*Router) Use

func (r *Router) Use(middleware ...Middleware)

Use applies middleware to the router.

type StreamHandler

type StreamHandler func(ctx Context, ch *ResponseChannel) (err error)

StreamHandler is used to register handlers which need to be able to send an unknown number of responses for any given request.
