gotalk

package module
v1.3.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2020 License: MIT Imports: 26 Imported by: 51

README

gotalk

Gotalk exists to make it easy for programs to talk with one another over the internet, like a web app coordinating with a web server, or a bunch of programs dividing work amongst each other.

GitHub tag (latest SemVer) PkgGoDev Go Report Card

Gotalk...

  • is an efficient, easily debuggable multiplexing data transfer protocol
  • is transport agnostic: works on any byte stream
  • offers a high-level, easy-to-get-started API for WebSockets
  • enables arbitrary number of requests & responses over a single persistent connection
  • includes a small built-in JavaScript library
  • provides a small and focused Go API

Go API Documentation on godoc.org →

Usage

Gotalk is a simple go module - import it into your program and go build:

import "github.com/rsms/gotalk"

To use a specific version, run go get github.com/rsms/gotalk@v1.0.1 (substituting the version number for the version you desire.)

Examples can be found in the examples directory. Build them with go build:

$ cd examples/websocket-chat
$ go build
$ ./websocket-chat
Listening on http://localhost:1235/

Here's a minimal but complete example program: (examples/websocket-minimal)

package main
import (
  "net/http"
  "github.com/rsms/gotalk"
)
func main() {
  gotalk.Handle("echo", func(in string) (string, error) {
    return in, nil
  })
  http.Handle("/gotalk/", gotalk.WebSocketHandler())
  http.Handle("/", http.FileServer(http.Dir(".")))
  print("Listening on http://localhost:1234/\n")
  panic(http.ListenAndServe("localhost:1234", nil))
}
Developing Gotalk & contributing

See CONTRIBUTING.md

Other implementations

Introduction

A terribly boring amateur comic strip

Gotalk takes the natural approach of bidirectional and concurrent communication — any peer have the ability to expose "operations" as well as asking other peers to perform operations. The traditional restrictions of who can request and who can respond usually associated with a client-server model is nowhere to be found in Gotalk.

Gotalk in a nutshell

Bidirectional — There's no discrimination on capabilities depending on who connected or who accepted. Both "servers" and "clients" can expose operations as well as send requests to the other side.

Concurrent — Requests, results, and notifications all share a single connection without blocking each other by means of pipelining. There's no serialization on request-result or even for a single large message, as the Gotalk protocol is frame-based and multiplexes messages over a single connection. This means you can perform several requests at once without having to think about queueing or blocking.

Diagram of how Gotalk uses connection pipelining

Simple — Gotalk has a simple and opinionated API with very few components. You expose an operation via "handle" and send requests via "request".

Debuggable — The Gotalk protocol's wire format is ASCII-based for easy on-the-wire inspection of data. For example, here's a protocol message representing an operation request: r0001005hello00000005world. The Gotalk protocol can thus be operated over any reliable byte transport.

Practical — Gotalk includes a JavaScript implementation for Web Sockets alongside the full-featured Go implementation, making it easy to build real-time web applications. The Gotalk source code also includes a number of easily-readable examples.

By example

There are a few examples in the examples directory demonstrating Gotalk. But let's explore a simple program right now — here's a little something written in Go which demonstrates the use of an operation named "greet":

func server() {
  gotalk.Handle("greet", func(in GreetIn) (GreetOut, error) {
    return GreetOut{"Hello " + in.Name}, nil
  })
  if err := gotalk.Serve("tcp", "localhost:1234"); err != nil {
    log.Fatalln(err)
  }
}

func client() {
  s, err := gotalk.Connect("tcp", "localhost:1234")
  if err != nil {
    log.Fatalln(err)
  }
  greeting := &GreetOut{}
  if err := s.Request("greet", GreetIn{"Rasmus"}, greeting); err != nil {
    log.Fatalln(err)
  }
  log.Printf("greeting: %+v\n", greeting)
  s.Close()
}

Let's look at the above example in more detail, broken apart to see what's going on.

We begin by importing the gotalk library together with log which we use for printing to the console:

package main
import (
  "log"
  "github.com/rsms/gotalk"
)

We define two types: Expected input (request parameters) and output (result) for our "greet" operation:

type GreetIn struct {
  Name string `json:"name"`
}
type GreetOut struct {
  Greeting string `json:"greeting"`
}

Registers a process-global request handler for an operation called "greet" accepting parameters of type GreetIn, returning results of type GreetOut:

func server() {
  gotalk.Handle("greet", func(in GreetIn) (GreetOut, error) {
    return GreetOut{"Hello " + in.Name}, nil
  })

Finally at the bottom of our server function we call gotalk.Serve, which starts a local TCP server on port 1234:

  if err := gotalk.Serve("tcp", "localhost:1234"); err != nil {
    log.Fatalln(err)
  }
}

In out client function we start by connecting to the server:

func client() {
  s, err := gotalk.Connect("tcp", "localhost:1234")
  if err != nil {
    log.Fatalln(err)
  }

Finally we send a request for "greet" and print the result:

  greeting := GreetOut{}
  if err := s.Request("greet", GreetIn{"Rasmus"}, &greeting); err != nil {
    log.Fatalln(err)
  }
  log.Printf("greeting: %+v\n", greeting)

  s.Close()
}

Output:

greeting: {Greeting:Hello Rasmus}

Gotalk in the web browser

Gotalk is implemented not only in the full-fledged Go package, but also in a JavaScript library. This allows writing web apps talking Gotalk via Web Sockets possible.

// server.go:
package main
import (
  "net/http"
  "github.com/rsms/gotalk"
)
func main() {
  gotalk.Handle("echo", func(in string) (string, error) {
    return in, nil
  })
  http.Handle("/gotalk/", gotalk.WebSocketHandler())
  http.Handle("/", http.FileServer(http.Dir(".")))
  err := http.ListenAndServe("localhost:1234", nil)
  if err != nil {
    panic(err)
  }
}

In our html document, we begin by registering any operations we can handle:

<!-- index.html -->
<body>
<script type="text/javascript" src="/gotalk/gotalk.js"></script>
<script>
gotalk.handle('greet', function (params, result) {
  result({ greeting: 'Hello ' + params.name });
});
</script>

Notice how we load a JavaScript from "/gotalk/gotalk.js" — a gotalk web socket server embeds a matching web browser JS library which it returns from {path where gotalk web socket is mounted}/gotalk.js. It uses Etag cache validation, so you shouldn't need to think about "cache busting" the URL.

We can't "listen & accept" connections in a web browser, but we can "connect" so we do just that:

<!-- index.html -->
<body>
<script type="text/javascript" src="/gotalk/gotalk.js"></script>
<script>
gotalk.handle('greet', function (params, result) {
  result({ greeting: 'Hello ' + params.name });
});

var s = gotalk.connection().on('open', function () {
  // do something useful
}).on('close', function (err) {
  if (err.isGotalkProtocolError) return console.error(err);
});
</script>

This is enough for enabling the server to do things in the browser ...

But you probably want to have the browser send requests to the server, so let's send a "echo" request just as our connection opens:

var s = gotalk.connection().on('open', function () {
  s.request("echo", "Hello world", function (err, result) {
    if (err) return console.error('echo failed:', err);
    console.log('echo result:', result);
  });
});

We could rewrite our code like this to allow some UI component to send a request:

var s = gotalk.connection();

button.addEventListener('click', function () {
  s.request("echo", "Hello world", function (err, result) {
    if (err) return console.error('echo failed:', err);
    console.log('echo result:', result);
  });
});

The request will fail with an error "socket is closed" if the user clicks our button while the connection isn't open.

There are two ways to open a connection on a socket: Sock.prototype.open which simply opens a connection, and Sock.prototype.openKeepAlive which keeps the connection open, reconnecting as needed with exponential back-off and internet reachability knowledge. gotalk.connection() is a short-hand for creating a new Sock with gotalk.defaultHandlers and then calling openKeepAlive on it.

Protocol and wire format

The wire format is designed to be human-readable and flexible; it's byte-based and can be efficiently implemented in a number of environments ranging from HTTP and WebSocket in a web browser to raw TCP in Go or C. The protocol provides only a small set of operations on which more elaborate operations can be modeled by the user.

This document describes protocol version 1

Here's a complete description of the protocol:

conversation    = ProtocolVersion Message*
message         = SingleRequest | StreamRequest
                | SingleResult | StreamResult
                | ErrorResult | RetryResult
                | Notification | ProtocolError

ProtocolVersion = <hexdigit> <hexdigit>

SingleRequest   = "r" requestID operation payload
StreamRequest   = "s" requestID operation payload StreamReqPart*
StreamReqPart   = "p" requestID payload
SingleResult    = "R" requestID payload
StreamResult    = "S" requestID payload StreamResult*
ErrorResult     = "E" requestID payload
RetryResult     = "e" requestID wait payload
Notification    = "n" name payload
Heartbeat       = "h" load time
ProtocolError   = "f" code

requestID       = <byte> <byte> <byte> <byte>

operation       = text3
name            = text3
wait            = hexUInt8
code            = hexUInt8
time            = hexUInt8
load            = hexUInt4

text3           = text3Size text3Value
text3Size       = hexUInt3
text3Value      = <<byte>{text3Size} as utf8 text>

payload         = payloadSize payloadData
payloadSize     = hexUInt8
payloadData     = <byte>{payloadSize}

hexUInt3        = <hexdigit> <hexdigit> <hexdigit>
hexUInt4        = <hexdigit> <hexdigit> <hexdigit> <hexdigit>
hexUInt8        = <hexdigit> <hexdigit> <hexdigit> <hexdigit>
                  <hexdigit> <hexdigit> <hexdigit> <hexdigit>
Handshake

A conversation begins with the protocol version:

01  -- ProtocolVersion 1

If the version of the protocol spoken by the other end is not supported by the reader, a ProtocolError message is sent with code 1 and the connection is terminated. Otherwise, any messages are read and/or written.

Single-payload requests and results

This is a "single-payload" request ...

+------------------ SingleRequest
|   +---------------- requestID   "0001"
|   |      +--------- operation   "echo" (text3Size 4, text3Value "echo")
|   |      |       +- payloadSize 25
|   |      |       |
r0001004echo00000019{"message":"Hello World"}

... and a corresponding "single-payload" result:

+------------------ SingleResult
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 25
|   |       |
R000100000019{"message":"Hello World"}

Each request is identified by exactly three bytes—the requestID—which is requestor-specific and has no purpose beyond identity, meaning the value is never interpreted. 4 bytes can express 4 294 967 296 different values, meaning we can send up to 4 294 967 295 requests while another request is still being served. Should be enough.

These "single" requests & results are the most common protocol messages, and as their names indicates, their payloads follow immediately after the header. For large payloads this can become an issue when dealing with many concurrent requests over a single connection, for which there's a more complicated "streaming" request & result type which we will explore later on.

Faults

There are two types of replies indicating a fault: ErrorResult for requestor faults and RetryResult for responder faults.

If a request is faulty, like missing some required input data or sent over an unauthorized connection, an "error" is send as the reply instead of a regular result:

+------------------ ErrorResult
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 38
|   |       |
E000100000026{"error":"Unknown operation \"echo\""}

A request that produces an error should not be retried as-is, similar to the 400-class of errors of the HTTP protocol.

In the scenario a fault occurs on the responder side, like suffering a temporary internal error or is unable to complete the request because of resource starvation, a RetryResult is sent as the reply to a request:

+-------------------- RetryResult
|   +------------------ requestID   "0001"
|   |       +---------- wait        0
|   |       |       +-- payloadSize 20
|   |       |       |
e00010000000000000014"service restarting"

In this case — where wait is zero — the requestor is free to retry the request at its convenience.

However in some scenarios the responder might require the requestor to wait for some time before retrying the request, in which case the wait property has a non-zero value:

+-------------------- RetryResult
|   +------------------ requestID   "0001"
|   |       +---------- wait        5000 ms
|   |       |       +-- payloadSize 20
|   |       |       |
e00010000138800000014"request rate limit"

In this case the requestor must not retry the request until at least 5000 milliseconds has passed.

If the protocol communication itself experiences issues—e.g. an illegal message is received—a ProtocolError is written and the connection is closed.

ProtocolError codes:

Code Meaning Comments
0 Abnormal Closed because of an abnormal condition (e.g. server fault, etc)
1 Unsupported protocol The other side does not support the callers protocol
2 Invalid message An invalid message was transmitted
3 Timeout The other side closed the connection because communicating took too long

Example of a peer which does not support the version of the protocol spoken by the sender:

+-------- ProtocolError
|       +-- code 1
|       |
f00000001
Streaming requests and results

For more complicated scenarios there are "streaming-payload" requests and results at our disposal. This allows transmitting of large amounts of data without the need for large buffers. For example this could be used to forward audio data to audio playback hardware, or to transmit a large file off of slow media like a tape drive or hard-disk drive.

Because transmitting a streaming request or result does not occupy "the line" (single-payloads are transmitted serially), they can also be useful when there are many concurrent requests happening over a single connection.

Here's an example of a "streaming-payload" request ...

+------------------ StreamRequest
|   +---------------- requestID   "0001"
|   |      +--------- operation   "echo" (text3Size 4, text3Value "echo")
|   |      |       +- payloadSize 11
|   |      |       |
s0001004echo0000000b{"message":

+------------------ streamReqPart
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 14
|   |       |
p00010000000e"Hello World"}

+------------------ streamReqPart
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 0 (end of stream)
|   |       |
p000100000000

... followed by a "streaming-payload" result:

+------------------ StreamResult (1st part)
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 11
|   |       |
S00010000000b{"message":

+------------------ StreamResult (2nd part)
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 14
|   |       |
S00010000000e"Hello World"}

+------------------ StreamResult
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 0 (end of stream)
|   |       |
S000100000000

Streaming requests occupy resources on the responder's side for the duration of the "stream session". Therefore handling of streaming requests should be limited and "RetryResult" used to throttle requests:

+-------------------- RetryResult
|   +------------------ requestID   "0001"
|   |       +---------- wait        5000 ms
|   |       |       +-- payloadSize 19
|   |       |       |
e00010000138800000013"stream rate limit"

This means that the requestor must not send any new requests until wait time has passed.

Notifications

When there's no expectation on a response, Gotalk provides a "notification" message type:

+---------------------- Notification
|              +--------- name        "chat message" (text3Size 12, text3Value "chat message")
|              |       +- payloadSize 46
|              |       |
n00cchat message0000002e{"message":"Hi","from":"nthn","room":"gonuts"}

Notifications are never replied to nor can they cause "error" results. Applications needing acknowledgement of notification delivery might consider using a request instead.

Heartbeats

Because most responders will limit the time it waits for reads, a heartbeat message is send at a certain interval. When a heartbeat is sent is up to the implementation.

A heartbeat contains the sender's local time in the form of an unsigned 32-bit UNIX timestamp. This is enought to cover usage until 2106. I really hope gotalk is nowhere to be found in 2106.

It also contains an optional "load" value, indicating how pressured, or under what load, the sender is. 0 means "idle" and 65535 (0xffff) means "omg I think I'm dying." This can be used to distribute work to less loaded responders in a load-balancing setup.

+------------------ Heartbeat
|   +--------- load 2
|   |       +- time 2015-02-08 22:09:30 UTC
|   |       |
h000254d7de9a
Notes

Requests and results does not need to match on the "single" vs "streaming" detail — it's perfectly fine to send a streaming request and read a single response, or send a single response just to receive a streaming result. The payload type is orthogonal to the message type, with the exception of an error response which is always a "single-payload" message, carrying any information about the error in its payload. Note however that the current version of the Go package does not provide a high-level API for mixed-kind request-response handling.

For transports which might need "heartbeats" to stay alive, like some raw TCP connections over the internet, the suggested way to implement this is by notifications, e.g. send a "heartbeat" notification at a certain interval while no requests are being sent. The Gotalk protocol does not include a "heartbeat" feature because of this reason, as well as the fact that some transports (like web socket) already provide "heartbeat" features.

Documentation

Overview

Gotalk is a complete muli-peer real-time messaging library. See https://github.com/rsms/gotalk#readme for a more in-depth explanation of what Gotalk is and what it can do for you.

Most commonly Gotalk is used for rich web app development, as an alternative to HTTP APIs, when the web app mainly runs client-side rather than uses a traditional "request a new page" style.

WebSocket example

Here is an example of a minimal but fully functional web server with Gotalk over websocket:

package main
import (
	"github.com/rsms/gotalk"
	"net/http"
)
type Message struct {
	Author string
	Body   string
}
func main() {
	// This function handles requests for "test/message".
	gotalk.Handle("test/message", func(input string) (*Message, error) {
		// It can return any Go type. Here we return a structure and no error.
		return &Message{Author: "Bob", Body: input}, nil
	})
	// mount Gotalk at "/gotalk/"
	http.Handle("/gotalk/", gotalk.NewWebSocketServer())
	// mount a file server to handle all other requests
	http.Handle("/", http.FileServer(http.Dir(".")))
	panic(http.ListenAndServe("localhost:1234", nil))
}

Here is a matching HTML document; a very basic web app:

<!DOCTYPE HTML>
<html lang="en">
	<head>
		<meta charset="utf-8">
		<!-- load the built-in JS library -->
		<script type="text/javascript" src="/gotalk/gotalk.js"></script>
	</head>
	<body style="white-space:pre;font-family:monospace"><button>Send request</button>
	<script>
	// create a connection (automatically reconnects as needed)
	let c = gotalk.connection()
		.on('open', async() => log(`connection opened\n`))
		.on('close', reason => log(`connection closed (reason: ${reason})\n`))
	// make out button send a request
	document.body.firstChild.onclick = async () => {
		let res = await c.requestp('test/message', 'hello ' + new Date())
		log(`reply: ${JSON.stringify(res, null, 2)}\n`)
	}
	function log(message) {
		document.body.appendChild(document.createTextNode(message))
	}
	</script>
	</body>
</html>

API layers

Gotalk can be thought of as being composed by four layers:

  1. Request-response API with automatic Go data encoding/decoding
  2. Request management, connection management
  3. Transport (TCP, pipes, unix sockets, HTTP, WebSocket, etc)
  4. Framed & streaming byte-based message protocol

You can make use of only some parts. For example you could write and read structured data in files using the message protocol and basic file I/O, or use the high-level request-response API with some custom transport.

Index

Constants

View Source
const (
	MsgTypeSingleReq     = MsgType('r')
	MsgTypeStreamReq     = MsgType('s')
	MsgTypeStreamReqPart = MsgType('p')
	MsgTypeSingleRes     = MsgType('R')
	MsgTypeStreamRes     = MsgType('S')
	MsgTypeErrorRes      = MsgType('E')
	MsgTypeRetryRes      = MsgType('e')
	MsgTypeNotification  = MsgType('n')
	MsgTypeHeartbeat     = MsgType('h')
	MsgTypeProtocolError = MsgType('f')
)

Protocol message types

View Source
const (
	ProtocolErrorAbnormal    = 0
	ProtocolErrorUnsupported = 1
	ProtocolErrorInvalidMsg  = 2
	ProtocolErrorTimeout     = 3
)

ProtocolError codes

View Source
const JSLibSHA1Base64 = "YLDgrGNN3M7gn9qZDeBmFusYWIQ="
View Source
const JSLibString = "" /* 18406-byte string literal not displayed */
View Source
const ProtocolVersion = uint8(1)

Version of this protocol

View Source
const Unlimited = uint32(0xFFFFFFFF)

Unlimited can be used with Limits.BufferRequests and Limits.StreamRequests

View Source
const Version = "1.3.7"

Current version of gotalk

Variables

View Source
var (
	ErrUnexpectedStreamingRes = errors.New("unexpected streaming response")
	ErrSockClosed             = errors.New("socket closed")
)

Returned by (Sock)BufferRequest when a streaming response is recieved

View Source
var (
	ErrAbnormal    = errors.New("abnormal condition")
	ErrUnsupported = errors.New("unsupported protocol")
	ErrInvalidMsg  = errors.New("invalid protocol message")
	ErrTimeout     = errors.New("timeout")
)
View Source
var DefaultHandlers = &Handlers{}

Default handlers, manipulated by the package-level handle functions like HandleBufferRequest

View Source
var DefaultLimits = &Limits{
	ReadTimeout:    30 * time.Second,
	BufferRequests: Unlimited,
	StreamRequests: 0,
	BufferMinWait:  500 * time.Millisecond,
	BufferMaxWait:  5000 * time.Millisecond,
	StreamMinWait:  500 * time.Millisecond,
	StreamMaxWait:  5000 * time.Millisecond,
}

DefaultLimits does not limit buffer requests, and disables stream requests.

View Source
var HeartbeatMsgMaxLoad = 0xffff

Maximum value of a heartbeat's "load"

View Source
var NoLimits = &Limits{
	BufferRequests: Unlimited,
	StreamRequests: Unlimited,
}

NoLimits does not limit buffer requests or stream requests, nor does it have a read timeout.

Functions

func DefaultLoggerFunc added in v1.3.4

func DefaultLoggerFunc(s *Sock, format string, args ...interface{})

DefaultLoggerFunc forwards the message to Go's "log" package; log.Printf(format, args...)

func FormatRequestID

func FormatRequestID(n uint32) []byte

Returns a 4-byte representation of a 32-bit integer, suitable an integer-based request ID.

func Handle

func Handle(op string, fn interface{})

Handle operation with automatic JSON encoding of values.

`fn` must conform to one of the following signatures:

func(*Sock, string, interface{}) (interface{}, error) -- takes socket, op and parameters
func(*Sock, interface{}) (interface{}, error)         -- takes socket and parameters
func(*Sock) (interface{}, error)                      -- takes no parameters
func(interface{}) (interface{}, error)                -- takes parameters, but no socket
func() (interface{},error)                            -- takes no socket or parameters

Where optionally the `interface{}` return value can be omitted, i.e:

func(*Sock, string, interface{}) error
func(*Sock, interface{}) error
func(*Sock) error
func(interface{}) error
func() error

If `op` is empty, handle all requests which doesn't have a specific handler registered.

func HandleBufferNotification

func HandleBufferNotification(name string, fn BufferNoteHandler)

Handle notifications of a certain name with raw input buffers. If `name` is empty, handle all notifications which doesn't have a specific handler registered.

func HandleBufferRequest

func HandleBufferRequest(op string, fn BufferReqHandler)

Handle operation with raw input and output buffers. If `op` is empty, handle all requests which doesn't have a specific handler registered.

func HandleNotification

func HandleNotification(name string, fn interface{})

Handle notifications of a certain name with automatic JSON encoding of values.

`fn` must conform to one of the following signatures:

func(s *Sock, name string, v interface{}) -- takes socket, name and parameters
func(name string, v interface{})          -- takes name and parameters, but no socket
func(v interface{})                       -- takes only parameters
func()                                    -- takes nothing

If `name` is empty, handle all notifications which doesn't have a specific handler registered.

func HandleStreamRequest

func HandleStreamRequest(op string, fn StreamReqHandler)

Handle operation by reading and writing directly from/to the underlying stream. If `op` is empty, handle all requests which doesn't have a specific handler registered.

func MakeHeartbeatMsg

func MakeHeartbeatMsg(load uint16, b []byte) []byte

Create a slice of bytes representing a heartbeat message

func MakeMsg

func MakeMsg(t MsgType, id, name3 string, wait, size uint32) []byte

Create a slice of bytes representing a message (w/o any payload)

func Pipe

func Pipe(handlers *Handlers, limits *Limits) (*Sock, *Sock, error)

Creates two sockets which are connected to eachother without any resource limits. If `handlers` is nil, DefaultHandlers are used. If `limits` is nil, DefaultLimits are used.

func ReadVersion

func ReadVersion(s io.Reader) (uint8, error)

Read the version the other end implements. Returns an error if this side's protocol is incompatible with the other side's version.

func Serve

func Serve(how, addr string, acceptHandler SockHandler) error

Start a `how` server accepting connections at `addr`

func TLSAddRootCerts

func TLSAddRootCerts(certFile string) error

TLSAddRootCerts is a convenience for adding root (CA) certificates from a PEM file to the cert pool used by gotalk's TLS functions and returned by TLSCertPool()

func TLSCertPool

func TLSCertPool() *x509.CertPool

TLSCertPool returns the root CA pool. This is normally the same as returned by crypto/x509.SystemCertPool and can be modified, i.e. by adding your own development CA certs. All gotalk TLS functions that creates tls.Config uses this.

func WriteVersion

func WriteVersion(s io.Writer) (int, error)

Write the version this protocol implements to `s`

Types

type BufferNoteHandler

type BufferNoteHandler func(s *Sock, name string, payload []byte)

type BufferReqHandler

type BufferReqHandler func(s *Sock, op string, payload []byte) ([]byte, error)

If a handler panics, it's assumed that the effect of the panic was isolated to the active request. Panic is recovered, a stack trace is logged, and connection is closed.

type Handlers

type Handlers struct {
	// contains filtered or unexported fields
}

The Handlers struct contains request and notifications handlers. Create a new set of handlers by simply creating a zero struct: `&Handlers{}`

func NewHandlers

func NewHandlers() *Handlers

NewHandlers creates a new Handlers struct. DEPRECATED: use `&Handlers{}` instead

func (*Handlers) FindBufferRequestHandler

func (h *Handlers) FindBufferRequestHandler(op string) BufferReqHandler

Look up a single-buffer handler for operation `op`. Returns `nil` if not found.

func (*Handlers) FindNotificationHandler

func (h *Handlers) FindNotificationHandler(name string) BufferNoteHandler

Look up a handler for notification `name`. Returns `nil` if not found.

func (*Handlers) FindStreamRequestHandler

func (h *Handlers) FindStreamRequestHandler(op string) StreamReqHandler

Look up a stream handler for operation `op`. Returns `nil` if not found.

func (*Handlers) Handle

func (h *Handlers) Handle(op string, fn interface{})

Handle operation with automatic JSON encoding of values.

`fn` must conform to one of the following signatures:

func(*Sock, string, interface{}) (interface{}, error) -- takes socket, op and parameters
func(*Sock, interface{}) (interface{}, error)         -- takes socket and parameters
func(*Sock) (interface{}, error)                      -- takes no parameters
func(interface{}) (interface{}, error)                -- takes parameters, but no socket
func() (interface{},error)                            -- takes no socket or parameters

Where optionally the `interface{}` return value can be omitted, i.e:

func(*Sock, string, interface{}) error
func(*Sock, interface{}) error
func(*Sock) error
func(interface{}) error
func() error

If `op` is empty, handle all requests which doesn't have a specific handler registered.

func (*Handlers) HandleBufferNotification

func (h *Handlers) HandleBufferNotification(name string, fn BufferNoteHandler)

Handle notifications of a certain name with raw input buffers. If `name` is empty, handle all notifications which doesn't have a specific handler registered.

func (*Handlers) HandleBufferRequest

func (h *Handlers) HandleBufferRequest(op string, fn BufferReqHandler)

Handle operation with raw input and output buffers. If `op` is empty, handle all requests which doesn't have a specific handler registered.

func (*Handlers) HandleNotification

func (h *Handlers) HandleNotification(name string, fn interface{})

Handle notifications of a certain name with automatic JSON encoding of values.

`fn` must conform to one of the following signatures:

func(s *Sock, name string, v interface{}) -- takes socket, name and parameters
func(name string, v interface{})          -- takes name and parameters, but no socket
func(v interface{})                       -- takes only parameters
func()                                    -- takes nothing

If `name` is empty, handle all notifications which doesn't have a specific handler registered.

func (*Handlers) HandleStreamRequest

func (h *Handlers) HandleStreamRequest(op string, fn StreamReqHandler)

Handle operation by reading and writing directly from/to the underlying stream. If `op` is empty, handle all requests which doesn't have a specific handler registered.

func (*Handlers) NewSubHandlers added in v1.3.5

func (h *Handlers) NewSubHandlers() *Handlers

NewSubHandlers returns a new Handlers object which wraps the receiver. It can be used to override or extend h, without modifying h. For example, it could be used to expose an extra set of operations to certain sockets, like signed-in users.

type Limits

type Limits struct {
	ReadTimeout time.Duration // timeout for reading messages from the network (0=no limit)

	BufferRequests uint32 // max number of concurrent buffer requests
	StreamRequests uint32 // max number of concurrent buffer requests

	BufferMinWait time.Duration // minimum time to wait when BufferRequests has been reached
	BufferMaxWait time.Duration // max time to wait when BufferRequests has been reached

	StreamMinWait time.Duration // minimum time to wait when StreamRequests has been reached
	StreamMaxWait time.Duration // max time to wait when StreamRequests has been reached
}

func NewLimits

func NewLimits(bufferRequestLimit uint32, streamRequestLimit uint32) *Limits

Create new Limits based on DefaultLimits It's usually easier to just construct Limits{} manually. This function is here mainly for backwards API compatibility with earlier Gotalk.

type LoggerFunc added in v1.3.4

type LoggerFunc func(s *Sock, format string, args ...interface{})

LoggerFunc is the signature of log functions. s is a related socket, or nil if none apply to the error.

var (
	// ErrorLogger is called when an error occurs during writing or reading (rare)
	ErrorLogger LoggerFunc = DefaultLoggerFunc

	// HandlerErrorLogger is called when a handler function either panics or returns an error
	HandlerErrorLogger LoggerFunc = DefaultLoggerFunc
)

type MsgType

type MsgType byte

Protocol message type

func ReadMsg

func ReadMsg(s io.Reader, b []byte) (t MsgType, id, name3 string, wait, size uint32, err error)

Read a message from `s` If t is MsgTypeHeartbeat, wait==load, size==time

type Request

type Request struct {
	MsgType
	Op   string
	Data []byte
}

func NewRequest

func NewRequest(op string, buf []byte) *Request

Creates a new single request

type Response

type Response struct {
	MsgType
	Data []byte
	Wait time.Duration // only valid when IsRetry()==true
}

func (*Response) Error

func (r *Response) Error() string

Returns a string describing the error, when IsError()==true

func (*Response) IsError

func (r *Response) IsError() bool

True if this response is a requestor error (ErrorResult)

func (*Response) IsRetry

func (r *Response) IsRetry() bool

True if response is a "server can't handle it right now, please retry" (RetryResult)

func (*Response) IsStreaming

func (r *Response) IsStreaming() bool

True if this is part of a streaming response (StreamResult)

type Server

type Server struct {
	// Handlers associated with this server. Accepted sockets inherit the value.
	*Handlers

	// Limits. Accepted sockets are subject to the same limits.
	*Limits

	// Function to be invoked just after a new socket connection has been accepted and
	// protocol handshake has sucessfully completed. At this point the socket is ready
	// to be used. However the function will be called in the socket's "read" goroutine,
	// meaning no messages will be received on the socket until this function returns.
	AcceptHandler SockHandler

	// Template value for accepted sockets. Defaults to 0 (no automatic heartbeats)
	HeartbeatInterval time.Duration

	// Template value for accepted sockets. Defaults to nil
	OnHeartbeat func(load int, t time.Time)

	// Transport
	Listener net.Listener
}

Accepts socket connections

func Listen

func Listen(how, addr string) (*Server, error)

Start a `how` server listening for connections at `addr`. You need to call Accept() on the returned socket to start accepting connections. `how` and `addr` are passed to `net.Listen()` and thus any values accepted by net.Listen are valid. The returned server has Handlers=DefaultHandlers and Limits=DefaultLimits set, which you can change if you want.

func ListenTLS

func ListenTLS(how, addr string, certFile, keyFile string) (*Server, error)

Start a `how` server listening for connections at `addr` with TLS certificates. You need to call Accept() on the returned socket to start accepting connections. `how` and `addr` are passed to `net.Listen()` and thus any values accepted by net.Listen are valid. The returned server has Handlers=DefaultHandlers and Limits=DefaultLimits set, which you can change if you want.

func ListenTLSCustom

func ListenTLSCustom(how, addr string, config *tls.Config) (*Server, error)

Start a `how` server listening for connections at `addr` with custom TLS configuration. You need to call Accept() on the returned socket to start accepting connections. `how` and `addr` are passed to `net.Listen()` and thus any values accepted by net.Listen are valid. The returned server has Handlers=DefaultHandlers and Limits=DefaultLimits set, which you can change if you want.

func NewServer

func NewServer(h *Handlers, limits *Limits, l net.Listener) *Server

Create a new server already listening on `l`

func (*Server) Accept

func (s *Server) Accept() error

Accept connections. Blocks until Close() is called or an error occurs.

func (*Server) Addr

func (s *Server) Addr() string

Address this server is listening at

func (*Server) Close

func (s *Server) Close() error

Stop listening for and accepting connections

func (*Server) EnableUnixSocketGC

func (s *Server) EnableUnixSocketGC()

Unix sockets must be unlink()ed before being reused again. If you don't manage this yourself already, this function provides a limited but quick way to deal with cleanup by installing a signal handler.

type Sock

type Sock struct {
	// Handlers associated with this socket
	Handlers *Handlers

	// Associate some application-specific data with this socket
	UserData interface{}

	// Enable streaming requests and set the limit for how many streaming requests this socket
	// can handle at the same time. Setting this to `0` disables streaming requests alltogether
	// (the default) while setting this to a large number might be cause for security concerns
	// as a malicious peer could send many "start stream" messages, but never sending
	// any "end stream" messages, slowly exhausting memory.
	StreamReqLimit int

	// A function to be called when the socket closes.
	// If the socket was closed because of a protocol error, `code` is >=0 and represents a
	// ProtocolError* constant.
	CloseHandler func(s *Sock, code int)

	// Automatically retry requests which can be retried
	AutoRetryRequests bool

	// HeartbeatInterval controls how much time a socket waits between sending its heartbeats.
	// If this is 0, automatic sending of heartbeats is disabled.
	// Defaults to 20 seconds when created with NewSock.
	HeartbeatInterval time.Duration

	// If not nil, this function is invoked when a heartbeat is recevied
	OnHeartbeat func(load int, t time.Time)
	// contains filtered or unexported fields
}

func Connect

func Connect(how, addr string) (*Sock, error)

Connect to a server via `how` at `addr`. Unless there's an error, the returned socket is already reading in a different goroutine and is ready to be used.

func ConnectTLS

func ConnectTLS(how, addr string, config *tls.Config) (*Sock, error)

Connect to a server via `how` at `addr` over TLS. Unless there's an error, the returned socket is already reading in a different goroutine and is ready to be used.

func NewSock

func NewSock(h *Handlers) *Sock

func (*Sock) Addr

func (s *Sock) Addr() string

Address of this socket

func (*Sock) Adopt

func (s *Sock) Adopt(r io.ReadWriteCloser)

Adopt an I/O stream, which should already be in a "connected" state. After adopting a new connection, you should call Handshake to perform the protocol handshake, followed by Read to read messages.

func (*Sock) BufferNotify

func (s *Sock) BufferNotify(name string, buf []byte) error

Send a single-buffer notification

func (*Sock) BufferRequest

func (s *Sock) BufferRequest(op string, buf []byte) ([]byte, error)

Send a single-buffer request, wait for and return the response. Automatically retries the request if needed.

func (*Sock) Close

func (s *Sock) Close() error

Close this socket. It is safe for multiple goroutines to call this concurrently.

func (*Sock) CloseError

func (s *Sock) CloseError(protocolErrorCode int32) error

Close this socket because of a protocol error (ProtocolErrorXXX)

func (*Sock) Conn

func (s *Sock) Conn() io.ReadWriteCloser

Access the socket's underlying connection

func (*Sock) Connect

func (s *Sock) Connect(how, addr string, limits *Limits) error

Connect to a server via `how` at `addr`

func (*Sock) ConnectReader

func (s *Sock) ConnectReader(r io.ReadWriteCloser, limits *Limits) error

Take control over reader r, perform initial handshake and begin communication on a background goroutine.

func (*Sock) ConnectTLS

func (s *Sock) ConnectTLS(how, addr string, limits *Limits, config *tls.Config) error

Connect to a server via `how` at `addr` over TLS. tls.Config is optional; passing nil is equivalent to &tls.Config{}

func (*Sock) Handshake

func (s *Sock) Handshake() error

Before reading any messages over a socket, handshake must happen. This function will block until the handshake either succeeds or fails.

func (*Sock) IsClosed added in v1.3.2

func (s *Sock) IsClosed() bool

IsClosed returns true if Close() has been called. It is safe for multiple goroutines to call this concurrently.

func (*Sock) Notify

func (s *Sock) Notify(name string, v interface{}) error

Send a single-value request where the value is JSON-encoded

func (*Sock) Read

func (s *Sock) Read(limits *Limits) error

After completing a succesful handshake, call this function to read messages received to this socket. Does not return until the socket is closed. If HeartbeatInterval > 0 this method also sends automatic heartbeats.

func (*Sock) Request

func (s *Sock) Request(op string, in interface{}, out interface{}) error

Send a single-value request where the input and output values are JSON-encoded

func (*Sock) SendHeartbeat

func (s *Sock) SendHeartbeat(load float32, buf []byte) error

func (*Sock) SendRequest

func (s *Sock) SendRequest(r *Request, reschan chan Response) error

Send a single-buffer request. A response should be received from reschan.

func (*Sock) Shutdown added in v1.2.1

func (s *Sock) Shutdown(wg *sync.WaitGroup, timeout time.Duration) error

Shut down this socket, giving it timeout time to complete any ongoing work.

timeout should be a short duration as its used for I/O read and write timeout; any work in handlers does not account to the timeout (and is unlimited.) timeout is ignored if the underlying Conn() does not implement SetReadDeadline or SetWriteDeadline.

This method returns immediately. Once all work is complete, calls s.Close() and wg.Done().

This method should not be used with web socket connections. Instead, call Close() from your http.Server.RegisterOnShutdown handler.

func (*Sock) StreamRequest

func (s *Sock) StreamRequest(op string) (*StreamRequest, chan Response)

Send a multi-buffer streaming request

func (*Sock) String added in v1.2.0

func (s *Sock) String() string

String returns a name that uniquely identifies the socket during its lifetime

type SockHandler

type SockHandler func(*Sock)

type StreamReqHandler

type StreamReqHandler func(s *Sock, name string, rch chan []byte, out io.WriteCloser) error

EOS when <-rch==nil

type StreamRequest

type StreamRequest struct {
	// contains filtered or unexported fields
}

func (*StreamRequest) End

func (r *StreamRequest) End() error

func (*StreamRequest) Write

func (r *StreamRequest) Write(b []byte) error

type WebSocket added in v1.2.0

type WebSocket struct {
	Sock

	// A function to be called when the socket closes. See Socket.CloseHandler for details.
	CloseHandler func(s *WebSocket, code int)
}

WebSocket is a type of gotalk.Sock used for web socket connections, managed by a WebSocketServer.

func (*WebSocket) Conn added in v1.2.0

func (s *WebSocket) Conn() *WebSocketConnection

Conn returns the underlying web socket connection.

Accessing the web socket connection inside a handler function: Handler functions can opt in to receive a pointer to a Sock but not a WebSocket. This makes handlers more portable, testable and the implementation becomes simpler. However, sometimes you might need to access the web socket connection anyhow:

gotalk.Handle("foo", func(s *gotalk.Sock, m FooMessage) error {
  ws := s.Conn().(*gotalk.WebSocketConnection)
  // do something with ws
  return nil
})

func (*WebSocket) Context added in v1.2.0

func (s *WebSocket) Context() context.Context

Context returns the http request's context. The returned context is never nil.

func (*WebSocket) Request added in v1.2.0

func (s *WebSocket) Request() *http.Request

Request returns the http request upgraded to the WebSocket

type WebSocketConnection added in v1.3.1

type WebSocketConnection = websocket.Conn

WebSocketConnection is an alias for the websocket connection type, to spare the use from having to import golang.org/x/net/websocket

type WebSocketServer

type WebSocketServer struct {
	// Handlers describe what this server is capable of responding to.
	// Initially set to gotalk.DefaultHandlers by NewWebSocketServer().
	//
	// Handler can be assigned a new set of handlers at any time.
	// Whenever a new socket is connected, it references the current value of Handlers, therefore
	// changes to Handlers has an effect for newly connected sockets only.
	*Handlers

	// Limits control resource limits.
	// Initially set to gotalk.DefaultLimits by NewWebSocketServer().
	*Limits

	// OnConnect is an optional handler to be invoked when a new socket is connected.
	// This handler is only called for sockets which passed the protocol handshake.
	// If you want to deny a connection, simply call s.Close() on the socket in this handler.
	//
	// Gotalk checks if Origin header is a valid URL by default but does nothing else in terms of
	// origin validation. You might want to verify s.Conn().Config().Origin in OnConnect.
	OnConnect func(s *WebSocket)

	// HeartbeatInterval is not used directly by WebSocketServer but assigned to every new socket
	// that is connected. The default initial value (0) means "no automatic heartbeats" (disabled.)
	//
	// Note that automatic heartbeats are usually not a good idea for web sockets for two reasons:
	//
	//   a) You usually want to keep as few connections open as possible; letting them time out is
	//      often desired (heartbeats prevent connection timeout.)
	//
	//   b) Automatic timeout uses more resources
	//
	HeartbeatInterval time.Duration

	// OnHeartbeat is an optional callback for heartbeat confirmation messages.
	// Not used directly by WebSocketServer but assigned to every new socket that is connected.
	OnHeartbeat func(load int, t time.Time)

	// Underlying websocket server (will become a function in gotalk 2)
	Server *websocket.Server

	// DEPRECATED use OnConnect instead
	OnAccept SockHandler
	// contains filtered or unexported fields
}

WebSocketServer conforms to http.HandlerFunc and is used to serve Gotalk over HTTP or HTTPS

func NewWebSocketServer added in v1.2.0

func NewWebSocketServer() *WebSocketServer

NewWebSocketServer creates a web socket server which is a http.Handler

func WebSocketHandler

func WebSocketHandler() *WebSocketServer

DEPREACTED use NewWebSocketServer

func (*WebSocketServer) ServeHTTP

func (s *WebSocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

Directories

Path Synopsis
examples
limits
Demonstrates request limiting
Demonstrates request limiting
pipe
A simple example demonstrating the use of gotalk.Pipe()
A simple example demonstrating the use of gotalk.Pipe()
read-timeout
Demonstrates read timeout (or "idle" timeout)
Demonstrates read timeout (or "idle" timeout)
stream
Demonstrates using streaming requests and results Demonstrates
Demonstrates using streaming requests and results Demonstrates
tcp
Demonstrates how to use gotalk over direct TCP connections
Demonstrates how to use gotalk over direct TCP connections
tls
demonstrates how to use gotalk over encrypted connections (TLS)
demonstrates how to use gotalk over encrypted connections (TLS)
websocket-chat
Multi-room chat app implemented in gotalk
Multi-room chat app implemented in gotalk
websocket-minimal
A minimal gotalk web app
A minimal gotalk web app

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL