http

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

This package contains a few utilities to make it easier to make and send http requests and handle the http responses. Another important part of this package is simple to use wrapper over Gorilla websockets so that both the client and server loops can be written in a uniform way.

Utilities for uniform websocket server and clients handling

Gorilla websockets is an amazing package to bring websocket functionality to your application. It is rather barebones (but extremely robust). We want some extra properties from our Websockets like:

  1. Typed messages
  2. Customized pings/pongs with tuneable timeouts
  3. Custom validation before upgrades to websocket
  4. More

A simple websocket example

Let us look at an example (available at cmd/timews/main.go). We want to build a very simple websocket endpoint that sends out the current time periodically seconds to connected subscribers. The subscribers can also publish a message that will be broadcast to all other connected subscribers (via a simple GET request). We would need two endpoints for this:

  • /subscribe: This endpoint lets a client connect to the websocket endpoint and subscribe to messages.
  • /publish: The publish endpoint is used by clients to broadcast an arbitrary message to all connected clients.

Let us start with the main function and setup these routes. This example uses the gorilla mux router to obtain request variables but any library accepting http.Handler should do.

package main
import (
	"fmt"
	"log"
	"net/http"
	"github.com/gorilla/mux"
	gohttp "github.com/github/goutils/http"
)

func main() {
	r := mux.NewRouter()

	// Publish Handler
	r.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "Publishing Custom Message")
	})

	// Subscribe Handler
	r.HandleFunc("/subscribe", func (w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "Subscribing to get time")
	})

	srv := http.Server{Handler: r}
	log.Fatal(srv.ListenAndServe())
}

Here the subscription is a normal http handler that returns a response. However, we want this to be a websocket subscription handler. So we need a couple helpers here:

1. A WSConn type to handle the lifecycle of this connection:

// Our handler is the place to put all our "state" for this connection type
type TimeHandler struct {
	// empty for now
}

// The Validate method gates the subscribe request to see if it should be upgraded
// and if so creates the right connection type to wrap the connection
// This examples allows all upgrades and is only needed to specify the kind of
// connection type to use - in this case TimeConn.
func (t *TimeHandler) Validate(w http.ResponseWriter, r *http.Request) (out *TimeConn, isValid bool) {
	return &TimeConn{handler: t}, true
}

// Our TimeConn allows us to override any connection instance specific behaviours
type TimeConn struct {
  gohttp.JSONConn
  handler *TimeHandler
}

We can now change our subscription http handler to:

timeHandler := NewTimeHandler()
r.HandleFunc("/subscribe", gohttp.WSServe(timeHandler, nil))

Now that we have a basic structure, we will use a conc.FanOut type to keep track of the list of subscribers. Update the TimeHandler to:

type TimeHandler struct {
	Fanout *conc.FanOut[conc.Message[any]]
}

// ... along with a corresponding New method
func NewTimeHandler() *TimeHandler {
	return &TimeHandler{Fanout: conc.NewFanOut[conc.Message[any]](nil)}
}

The TimeHandler ensures that (in its Validate method) a new TimeConn is created to manage the connection lifecycle. We will now register the TimeConn's "output" channel into the FanOut:

func (t *TimeConn) OnStart(conn *websocket.Conn) error {
	t.JSONConn.OnStart(conn)
	writer := t.JSONConn.Writer

	log.Println("Got a new connection.....")
	// Register the writer channel into the fanout
	t.handler.Fanout.Add(writer.SendChan(), nil, false)
	return nil
}

Similarly when a connection closes we want to de-register its output channel from the fanout:

func (t *TimeConn) OnClose() {
	writer := t.JSONConn.Writer

	// Removal can be synchronous or asynchronous - we want to ensure it is done
	// synchronously so another publish (if one came in) wont be attempted on a closed channel
	<- t.handler.Fanout.Remove(writer.SendChan(), true)
	t.JSONConn.OnClose()
}

Optional but we will disable timeouts from disconnecting our connection as we do not want to implement any client side logic (yet):

func (t *TimeConn) OnTimeout() bool {
	return false
}

That's all there is. Create a websocket connection to ws://localhost/subscribe. Easiest way is to use the tool websocat (https://github.com/vi/websocat) and:

websocat ws://localhost/subscribe

You will note that nothing is printed. That is because nothing is being published. Let us update our main method to send messages on the Fanout:

func main() {
	r := mux.NewRouter()
	timeHandler := NewTimeHandler()
	r.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
		msg := r.URL.Query().Get("msg")
		timeHandler.Fanout.Send(conc.Message[any]{Value: fmt.Sprintf("%s: %s", time.Now().String(), msg)})
		fmt.Fprintf(w, "Published Message Successfully")
	})

	// Send the time every 1 second
	go func() {
		t := time.NewTicker(1 * time.Second)
		defer t.Stop()
		for {
			<-t.C
			timeHandler.Fanout.Send(conc.Message[any]{Value: time.Now().String()})
		}
	}()

	r.HandleFunc("/subscribe", gohttp.WSServe(timeHandler, nil))
	srv := http.Server{Handler: r}
	log.Fatal(srv.ListenAndServe())
}

We have also updated the /publish handler to send custom messages on the fanout.

Now our subscriptions will show the time as well as custom publishes (curl http://localhost/publish?msg=YOUR_CUSTOM_MESSAGE)

Mesage reading example

The above example was quite simplistic. One immediate improvement is to also read messages from subscribers and broadcast them to all other subscribers. This can be implemented with the HandleMessage method:

func (t *TimeConn) HandleMessage(msg any) error {
	log.Println("Received Message To Handle: ", msg)
	// sending to all listeners
	t.handler.Fanout.Send(conc.Message[any]{Value: msg})
	return nil
}

Note that since the TimeConn type composes/extends the JSONConn type, messages of type JSON are automatically read by the JSONConn.

Custom Typed Messages

Since TimeConn in the running example extended JSONConn, messages were implicitly read as JSON. Not so implicitly! WSConn interface offers a way to reading messages into any typed structure we desire. See the ReadMessage method in the WSConn interface.

Detailed example - TBD

WS Client Example

So far we have only seen the server side and used the websocat cli utility to subscribe. Here we will write a simple client side utility to replace websocat and also test our server using the same helpers.

1. Create a connection first

// create a (gorilla) websocker dialer
dialer := *websocket.DefaultDialer

// and dial - ignoring errors for now
conn, _, _ := dialer.Dial(u, header)

2. Create a WSConn handler

Just like in the server example create a WSConn type to handle the client side of the connection. This will also be similar to our server side conn with minor differences:

type TimeClientConn struct {
	gohttp.JSONConn
}

// Handle each message by just printing it
func (t *TimeClientConn) HandleMessage(msg any) error {
	log.Println("Received Message To Handle: ", msg)
	return nil
}

3. Associate WSConn with websocket.Conn

var timeconn TimeClientConn
gohttp.WSHandleConn(conn, &timeconn, nil)

4. As you start the server (in cmd/timews/main.go) and the client (cmd/timewsclient/main.go) you will see the client handle the messages from the server like:

```
2024/05/22 22:20:08 Starting JSONConn connection: lu2qgslo5e
2024/05/22 22:20:09 Received Message To Handle:  2024-05-22 22:20:09.360056 -0700 PDT m=+37.002593626
2024/05/22 22:20:10 Received Message To Handle:  2024-05-22 22:20:10.360081 -0700 PDT m=+38.002662293
2024/05/22 22:20:11 Received Message To Handle:  2024-05-22 22:20:11.359567 -0700 PDT m=+39.002191418
2024/05/22 22:20:12 Received Message To Handle:  2024-05-22 22:20:12.359239 -0700 PDT m=+40.001906418
2024/05/22 22:20:13 Received Message To Handle:  2024-05-22 22:20:13.359018 -0700 PDT m=+41.001728293
2024/05/22 22:20:14 Received Message To Handle:  2024-05-22 22:20:14.35917 -0700 PDT m=+42.001922418
2024/05/22 22:20:15 Received Message To Handle:  2024-05-22 22:20:15.359876 -0700 PDT m=+43.002670918
2024/05/22 22:20:16 Received Message To Handle:  2024-05-22 22:20:16.359953 -0700 PDT m=+44.002790543
```

Index

Examples

Constants

This section is empty.

Variables

View Source
var DefaultHttpClient *http.Client
View Source
var HighQPSHttpClient *http.Client
View Source
var LowQPSHttpClient *http.Client
View Source
var MediumQPSHttpClient *http.Client

Functions

func CORS added in v0.0.98

func CORS(next http.Handler) http.Handler

A very sample http handler func that disables CORS for local development.

func Call

func Call(req *http.Request, client *http.Client) (response interface{}, err error)

Makes a http with the tiven request and the http client. This is a wrapper over the standard library caller that creates a Client (if not provided), performs the request reads the entire body adn optionally converts the payload to an appropriate type based on the response' Content-Type header (for now only application/json is supported.

func ErrorToHttpCode

func ErrorToHttpCode(err error) int

func HTTPErrorCode

func HTTPErrorCode(err error) int

func JsonGet added in v0.0.98

func JsonGet(url string, onReq func(req *http.Request)) (interface{}, *http.Response, error)

A simple wrapper for performing JSON Get requests. The url is the full url once all query params have been added. The onReq callback allows customization of the http requests before it is sent.

func JsonToQueryString added in v0.0.98

func JsonToQueryString(json map[string]any) string

Helper method to convert a map into a json query string

Example
input := map[string]any{"a": 1, "b": 2}
queryStr := JsonToQueryString(input)
fmt.Println(queryStr)
Output:

a=1&b=2

func MakeUrl

func MakeUrl(host, path string, args string) (url string)

Creates a URL on a host, path and with optional query parameters

func NewBytesRequest

func NewBytesRequest(method string, endpoint string, body []byte) (req *http.Request, err error)

Wraps the NewRequest helper to create request to set the body from a byte array.

func NewJsonRequest

func NewJsonRequest(method string, endpoint string, body map[string]any) (req *http.Request, err error)

Wraps the NewRequest helper to create a request with the payload marshalled as JSON.

func NewRequest

func NewRequest(method string, endpoint string, bodyReader io.Reader) (req *http.Request, err error)

Creates a new http request with the given method, endpoint and a bodyready that provides the content the request body.

func NormalizeWsUrl added in v0.0.90

func NormalizeWsUrl(httpOrWsUrl string) string

Returns a normalized WS url equivalent for a given http url.

Example
fmt.Println(NormalizeWsUrl("http://google.com"))
fmt.Println(NormalizeWsUrl("https://github.com"))
Output:

ws://google.com
wss://github.com

func SendJsonResponse

func SendJsonResponse(writer http.ResponseWriter, resp interface{}, err error)

func WSConnJSONReaderWriter

func WSConnJSONReaderWriter(conn *websocket.Conn) (reader *conc.Reader[gut.StrMap], writer *conc.Writer[conc.Message[gut.StrMap]])

func WSConnWriteError

func WSConnWriteError(wsConn *websocket.Conn, err error) error

func WSConnWriteMessage

func WSConnWriteMessage(wsConn *websocket.Conn, msg interface{}) error

func WSHandleConn

func WSHandleConn[I any, S WSConn[I]](conn *websocket.Conn, ctx S, config *WSConnConfig)

Once a websocket connection is established (either by the server or by the client), this method handles the lifecycle of the connection by taking care of (healthceck) pings, handling closures, handling received messages.

Example
{
}
Output:

func WSServe

func WSServe[I any, S WSConn[I]](handler WSHandler[I, S], config *WSConnConfig) http.HandlerFunc

Returns a http.HandlerFunc that takes care of upgrading the request to a Websocket connection and handling its lifecycle by delegating important activities to the WSHandler type This method is often used to create a handler for particular routes on http routers.

The handler parameter is responsible for validating (eg authenticating/authorizing) the request to ensure an upgrade is allowed as well as handling messages received on the upgraded connection.

Example
r := mux.NewRouter()
r.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "Publishing Custom Message")
})

r.HandleFunc("/subscribe", WSServe(&JSONHandler{}, nil))
srv := http.Server{Handler: r}
log.Fatal(srv.ListenAndServe())
Output:

Types

type BiDirStreamConfig

type BiDirStreamConfig struct {
	// Our connection can send pings at repeated intervals as a form of
	// healthcheck.  This property is a way to specify that duration.
	PingPeriod time.Duration

	// If no data (ping or otherwise) has been received from the remote
	// side within this duration then it is an indication for the handler to treat
	// this as a closed/timedout connection.  The handler an chose to terminate the connection
	// at this point by handling the OnTimeout method on the Conn interface.
	PongPeriod time.Duration
}

Configuration for a bidirectional "stream"

func DefaultBiDirStreamConfig

func DefaultBiDirStreamConfig() *BiDirStreamConfig

Creates a bidirectional stream config with default values for ping and pong durations.

type BiDirStreamConn

type BiDirStreamConn[I any] interface {
	// Called to send the next ping message.
	SendPing() error

	// Optional Name of the connection
	Name() string

	// Optional connection id
	ConnId() string

	// Called to handle the next message from the input stream on the ws conn.
	HandleMessage(msg I) error

	// Called to handle or suppress an error
	OnError(err error) error

	// Called when the connection closes.
	OnClose()

	// Called when data has not been received within the PongPeriod.
	OnTimeout() bool
}

type HTTPError

type HTTPError struct {
	Code    int
	Message string
}

Representing HTTP specific errors

func (*HTTPError) Error

func (t *HTTPError) Error() string

type JSONConn

type JSONConn struct {
	// The output writer as a channel to send outgoing messages on
	Writer *conc.Writer[conc.Message[any]]

	// Name of this connection (for clarity)
	NameStr string

	// A connection ID to identify this connection
	ConnIdStr string

	// Keeps track of the current Ping count to send with the ping
	PingId int64
}

An implementation of the WSConn interface that exchanges JSON message paylods

func (*JSONConn) ConnId

func (j *JSONConn) ConnId() string

Returns the (possibly auto-generated) Connection Id

func (*JSONConn) DebugInfo

func (j *JSONConn) DebugInfo() any

Basic debug information.

func (*JSONConn) HandleMessage

func (j *JSONConn) HandleMessage(msg any) error

Called to handle the next message from the input stream on the ws conn.

func (*JSONConn) Name

func (j *JSONConn) Name() string

Gets the name of this connection

func (*JSONConn) OnClose

func (j *JSONConn) OnClose()

Called when the connection closes.

func (*JSONConn) OnError

func (j *JSONConn) OnError(err error) error

func (*JSONConn) OnStart

func (j *JSONConn) OnStart(conn *websocket.Conn) error

This (callback) method is called when the websocket connection is upgraded but before the websocket event loop begins (in the WSHandleConn method)

func (*JSONConn) OnTimeout

func (j *JSONConn) OnTimeout() bool

Handle read timeouts. By default returns true to disconnect and close on a timeout.

func (*JSONConn) ReadMessage

func (j *JSONConn) ReadMessage(conn *websocket.Conn) (out any, err error)

Reads the next message from the websocket connection as a JSON payload

func (*JSONConn) SendPing

func (j *JSONConn) SendPing() error

Called to send the next ping message.

type JSONHandler

type JSONHandler struct {
}

func (*JSONHandler) Validate

func (j *JSONHandler) Validate(w http.ResponseWriter, r *http.Request) (out WSConn[any], isValid bool)

type URLWaiter added in v0.0.90

type URLWaiter struct {
	Method             string
	Url                string
	Headers            map[string]string
	Payload            map[string]any
	DelayBetweenChecks time.Duration

	// Func versions of above so we can do something dynamcially on each iteration
	RequestFunc  func(iter int, prevError error) (*http.Request, error)
	ValidateFunc func(req *http.Request, resp *http.Response) error
}

A simple utility that waits for a url to return a successful response before proceeding. This can be used for things like waiting for a database or another service to become available before performing other activities.

func (*URLWaiter) Run added in v0.0.90

func (u *URLWaiter) Run() (success bool, iter int, err error)

Runs the "waiter".

Returns a tuple of:

	success - whether the waited-upon eventually became active.
	iter    - How many iterations where run before a success/failure
 	err     - Error encountered if the "wait" was a failure

type WSConn

type WSConn[I any] interface {
	BiDirStreamConn[I]

	// Reads the next message from the ws conn.
	ReadMessage(w *websocket.Conn) (I, error)

	// Callback to be called when the WS connection is started
	OnStart(conn *websocket.Conn) error
}

Represents a bidirectional websocket connection

type WSConnConfig

type WSConnConfig struct {
	*BiDirStreamConfig
	Upgrader websocket.Upgrader
}

Extends BiDirStreamConfig to include Websocket specific configrations

func DefaultWSConnConfig

func DefaultWSConnConfig() *WSConnConfig

This method creates a WSConnConfig with a default websocket Upgrader

type WSHandler

type WSHandler[I any, S WSConn[I]] interface {
	// Called to validate an http request to see if it is upgradeable to a ws conn
	Validate(w http.ResponseWriter, r *http.Request) (S, bool)
}

Handlers validate a http request and decide whether they can/should be upgraded to create and begin a websocket connection (WSConn)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL