frisbee

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 28, 2021 License: Apache-2.0 Imports: 14 Imported by: 0

README

Frisbee

Build Status Go Report Card go-doc

This is the Go library for Frisbee, a bring-your-own protocol messaging framework designed for performance and stability.

This library requires Go1.15 or later.

Important note about releases and stability

This repository generally follows Semantic Versioning. However, this library is currently in Alpha and is still considered experimental. Breaking changes of the library will not trigger a new major release. The same is true for selected other new features explicitly marked as EXPERIMENTAL in CHANGELOG.md.

Usage

Documentation and example usage is available at https://loopholelabs.io/docs/frisbee.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/Loophole-Labs/frisbee. For more contribution information check out the contribution guide.

License

The Frisbee project is available as open source under the terms of the Apache License, Version 2.0.

Code of Conduct

Everyone interacting in the Frisbee project’s codebases, issue trackers, chat rooms and mailing lists is expected to follow the CNCF Code of Conduct.

Project Managed By:

Documentation

Overview

Package frisbee is the core package for using the frisbee messaging framework. The frisbee framework is a messaging framework designed around the aspect of "bring your own protocol", and can be used by simply defining your message types and their accompanying logic.

This package provides methods for defining message types and logic, as well as functionality for implementing frisbee servers and clients. Useful features like automatic heartbeats and automatic reconnections are provided as well.

In depth documentation and examples can be found at https://loopholelabs.io/docs/frisbee

All exported functions and methods are safe to be used concurrently unless specified otherwise.

An Echo Example

As a starting point, a very basic echo server:

package main

import (
	"github.com/loophole-labs/frisbee"
	"github.com/rs/zerolog/log"
	"os"
	"os/signal"
)

const PING = uint32(1)
const PONG = uint32(2)

func handlePing(_ *frisbee.Async, incomingMessage frisbee.Message, incomingContent []byte) (outgoingMessage *frisbee.Message, outgoingContent []byte, action frisbee.Action) {
	if incomingMessage.ContentLength > 0 {
		log.Printf("Server Received Message: %s", incomingContent)
		outgoingMessage = &frisbee.Message{
			From:          incomingMessage.From,
			To:            incomingMessage.To,
			Id:            incomingMessage.Id,
			Operation:     PONG,
			ContentLength: incomingMessage.ContentLength,
		}
		outgoingContent = incomingContent
	}

	return
}

func main() {
	router := make(frisbee.ServerRouter)
	router[PING] = handlePing
	exit := make(chan os.Signal)
	signal.Notify(exit, os.Interrupt)

	s := frisbee.NewServer(":8192", router)
	err := s.Start()
	if err != nil {
		panic(err)
	}

	<-exit
	err = s.Shutdown()
	if err != nil {
		panic(err)
	}
}

And an accompanying echo client:

package main

import (
	"fmt"
	"github.com/loophole-labs/frisbee"
	"github.com/rs/zerolog/log"
	"os"
	"os/signal"
	"time"
)

const PING = uint32(1)
const PONG = uint32(2)

func handlePong(incomingMessage frisbee.Message, incomingContent []byte) (outgoingMessage *frisbee.Message, outgoingContent []byte, action frisbee.Action) {
	if incomingMessage.ContentLength > 0 {
		log.Printf("Client Received Message: %s", string(incomingContent))
	}
	return
}

func main() {
	router := make(frisbee.ClientRouter)
	router[PONG] = handlePong
	exit := make(chan os.Signal)
	signal.Notify(exit, os.Interrupt)

	c := frisbee.NewClient("127.0.0.1:8192", router)
	err := c.ConnectAsync()
	if err != nil {
		panic(err)
	}

	go func() {
		i := 0
		for {
			message := []byte(fmt.Sprintf("ECHO MESSAGE: %d", i))
			err := c.WriteMessage(&frisbee.Message{
				To:            0,
				From:          0,
				Id:            uint32(i),
				Operation:     PING,
				ContentLength: uint64(len(message)),
			}, &message)
			if err != nil {
				panic(err)
			}
			i++
			time.Sleep(time.Second)
		}
	}()

	<-exit
	err = c.Close()
	if err != nil {
		panic(err)
	}
}

(Examples taken from https://github.com/Loophole-Labs/frisbee-examples/)

This example is a simple echo client/server, where the client will repeatedly send messages to the server, and the server will echo them back. Its purpose is to describe the flow of messages from Frisbee Client to Server, as well as give an example of how a Frisbee application must be implemented.

Index

Examples

Constants

View Source
const (
	// CONNECTED is used to specify that the connection is functioning normally
	CONNECTED = int32(iota)

	// CLOSED is used to specify that the connection has been closed (possibly due to an error)
	CLOSED

	// PAUSED is used in the event of a read or write error and puts the connection into a paused state,
	// this is then used by the reconnection logic to resume the connection
	PAUSED
)

These are states that frisbee connections can be in:

View Source
const (
	// DIAL is the error returned by the frisbee connection if dialing the server fails
	DIAL errors.ErrorContext = "error while dialing connection"

	// WRITECONN is the error returned by the frisbee client or server if writing to a frisbee connection fails
	WRITECONN errors.ErrorContext = "error while writing to frisbee connection"

	// READCONN is the error returned by the frisbee client or server if reading from a frisbee connection fails
	READCONN errors.ErrorContext = "error while reading from frisbee connection"

	// WRITE is the error returned by a frisbee connection if a write to the underlying TCP connection fails
	WRITE errors.ErrorContext = "error while writing to buffer"

	// PUSH is the error returned by a frisbee connection if a push to the message queue fails
	PUSH errors.ErrorContext = "error while pushing packet to message queue"

	// POP is the error returned by a frisbee connection if a pop from the message queue fails
	POP errors.ErrorContext = "error while popping packet from message queue"

	// ACCEPT is the error returned by a frisbee server if the underlying TCP server is unable to accept a new connection
	ACCEPT errors.ErrorContext = "error while accepting connections"
)

These are various frisbee error contexts that can be returned by the client or server:

View Source
const (
	// NONE is used to do nothing (default)
	NONE = Action(iota)

	// CLOSE is used to close the frisbee connection
	CLOSE

	// SHUTDOWN is used to shutdown the frisbee client or server
	SHUTDOWN
)

These are various frisbee actions, used to modify the state of the client or server from a router function:

View Source
const (
	// HEARTBEAT is used to send heartbeats from the client to the server (and measure round trip time)
	HEARTBEAT = uint32(iota)
	NEWSTREAM
	STREAMCLOSE
	RESERVED4
	RESERVED5
	RESERVED6
	RESERVED7
	RESERVED8
	RESERVED9
)

These are internal reserved message types, and are the reason you cannot use 0-9 in the ClientRouter or the ServerRouter:

View Source
const DefaultBufferSize = 1 << 19

Variables

View Source
var (
	InvalidContentLength     = errors.New("invalid content length")
	ConnectionClosed         = errors.New("connection closed")
	ConnectionPaused         = errors.New("connection paused")
	ConnectionNotInitialized = errors.New("connection not initialized")
	InvalidBufferContents    = errors.New("invalid buffer contents")
	InvalidBufferLength      = errors.New("invalid buffer length")
)

These are various frisbee errors that can be returned by the client or server:

View Source
var DefaultLogger = zerolog.New(os.Stdout)

DefaultLogger is the default logger used within frisbee

Functions

This section is empty.

Types

type Action

type Action int

Action is an ENUM used to modify the state of the client or server from a router function

NONE: used to do nothing (default)
CLOSE: close the frisbee connection
SHUTDOWN: shutdown the frisbee client or server

type Async added in v0.1.3

type Async struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Async is the underlying asynchronous frisbee connection which has extremely efficient read and write logic and can handle the specific frisbee requirements. This is not meant to be used on its own, and instead is meant to be used by frisbee client and server implementations

func ConnectAsync added in v0.1.3

func ConnectAsync(network string, addr string, keepAlive time.Duration, logger *zerolog.Logger, TLSConfig *tls.Config) (*Async, error)

ConnectAsync creates a new TCP connection (using net.Dial) and wraps it in a frisbee connection

func NewAsync added in v0.1.3

func NewAsync(c net.Conn, logger *zerolog.Logger) (conn *Async)

NewAsync takes an existing net.Conn object and wraps it in a frisbee connection

func (*Async) Close added in v0.1.3

func (c *Async) Close() error

Close closes the frisbee connection gracefully

func (*Async) Error added in v0.1.3

func (c *Async) Error() error

Error returns the error that caused the frisbee.Async to close or go into a paused state

func (*Async) Flush added in v0.1.3

func (c *Async) Flush() error

Flush allows for synchronous messaging by flushing the message buffer and instantly sending messages

func (*Async) LocalAddr added in v0.1.3

func (c *Async) LocalAddr() net.Addr

LocalAddr returns the local address of the underlying net.Conn

func (*Async) Logger added in v0.1.3

func (c *Async) Logger() *zerolog.Logger

Logger returns the underlying logger of the frisbee connection

func (*Async) NewStream added in v0.1.3

func (c *Async) NewStream(id uint64) *Stream

func (*Async) Raw added in v0.1.3

func (c *Async) Raw() net.Conn

Raw shuts off all of frisbee's underlying functionality and converts the frisbee connection into a normal TCP connection (net.Conn)

func (*Async) ReadMessage added in v0.1.3

func (c *Async) ReadMessage() (*Message, *[]byte, error)

ReadMessage is a blocking function that will wait until a frisbee message is available and then return it (and its content). In the event that the connection is closed, ReadMessage will return an error.

func (*Async) RemoteAddr added in v0.1.3

func (c *Async) RemoteAddr() net.Addr

RemoteAddr returns the remote address of the underlying net.Conn

func (*Async) SetDeadline added in v0.1.3

func (c *Async) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadline on the underlying net.Conn

func (*Async) SetReadDeadline added in v0.1.3

func (c *Async) SetReadDeadline(t time.Time) error

SetReadDeadline sets the read deadline on the underlying net.Conn

func (*Async) SetWriteDeadline added in v0.1.3

func (c *Async) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the write deadline on the underlying net.Conn

func (*Async) StreamChannel added in v0.1.3

func (c *Async) StreamChannel() <-chan *Stream

StreamChannel returns a channel that can be listened to for new stream connections

func (*Async) WriteBufferSize added in v0.1.3

func (c *Async) WriteBufferSize() int

WriteBufferSize returns the size of the underlying message buffer (used for internal message handling and for heartbeat logic)

func (*Async) WriteMessage added in v0.1.3

func (c *Async) WriteMessage(message *Message, content *[]byte) error

WriteMessage takes a frisbee.Message and some (optional) accompanying content, and queues it up to send asynchronously.

If message.ContentLength == 0, then the content array must be nil. Otherwise, it is required that message.ContentLength == len(content).

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client connects to a frisbee Server and can send and receive frisbee messages

func NewClient

func NewClient(addr string, router ClientRouter, opts ...Option) *Client

NewClient returns an uninitialized frisbee Client with the registered ClientRouter. The ConnectAsync method must then be called to dial the server and initialize the connection

Example
package main

import (
	"github.com/loophole-labs/frisbee"
	"github.com/rs/zerolog"
	"os"
)

func main() {
	router := make(frisbee.ClientRouter)

	router[0] = func(incomingMessage frisbee.Message, incomingContent []byte) (outgoingMessage *frisbee.Message, outgoingContent []byte, action frisbee.Action) {
		return
	}

	logger := zerolog.New(os.Stdout)

	frisbee.NewClient("127.0.0.1:8080", router, frisbee.WithLogger(&logger))
}
Output:

func (*Client) Close

func (c *Client) Close() error

Close closes the frisbee client and kills all the goroutines

func (*Client) Closed added in v0.1.3

func (c *Client) Closed() bool

Closed checks whether this client has been closed

func (*Client) Connect

func (c *Client) Connect() error

Connect actually connects to the given frisbee server, and starts the reactor goroutines to receive and handle incoming messages.

func (*Client) Error added in v0.1.3

func (c *Client) Error() error

Error checks whether this client has an error

func (*Client) Logger

func (c *Client) Logger() *zerolog.Logger

Logger returns the client's logger (useful for ClientRouter functions)

func (*Client) NewStreamConn added in v0.1.3

func (c *Client) NewStreamConn(id uint64) *Stream

NewStreamConn creates a new Stream from the underlying frisbee.Async

func (*Client) Raw

func (c *Client) Raw() (net.Conn, error)

Raw converts the frisbee client into a normal net.Conn object, and returns it. This is especially useful in proxying and streaming scenarios.

func (*Client) StreamChannel added in v0.1.3

func (c *Client) StreamChannel() <-chan *Stream

StreamChannel returns a channel that can be listened on to retrieve stream connections as they're created

func (*Client) WriteMessage added in v0.1.3

func (c *Client) WriteMessage(message *Message, content *[]byte) error

WriteMessage sends a frisbee Message from the client to the server

type ClientRouter

type ClientRouter map[uint32]ClientRouterFunc

ClientRouter maps frisbee message types to specific handler functions (of type ClientRouterFunc)

type ClientRouterFunc

type ClientRouterFunc func(incomingMessage Message, incomingContent []byte) (outgoingMessage *Message, outgoingContent []byte, action Action)

ClientRouterFunc defines a message handler for a specific frisbee message

type Conn

type Conn interface {
	Close() error
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	SetDeadline(time.Time) error
	SetReadDeadline(time.Time) error
	SetWriteDeadline(time.Time) error
	WriteMessage(*Message, *[]byte) error
	ReadMessage() (*Message, *[]byte, error)
	Logger() *zerolog.Logger
	Error() error
	Raw() net.Conn
}

type Message

type Message protocol.Message

Message is the structured frisbee message, and contains the following:

type MessageV0 struct {
	From          uint32 // 4 Bytes
	To            uint32 // 4 Bytes
	Id            uint32 // 4 Bytes
	Operation     uint32 // 4 Bytes
	ContentLength uint64 // 8 Bytes
}

These fields can be used however the user sees fit, however ContentLength must match the length of the content being delivered with the frisbee message (see the Async.WriteMessage function for more details).

type Option

type Option func(opts *Options)

Option is used to generate frisbee client and server options internally

func WithHeartbeat

func WithHeartbeat(heartbeat time.Duration) Option

WithHeartbeat sets the minimum time between heartbeat messages. By default, messages are only sent if no messages have been sent since the last heartbeat message - to change this behaviour you can disable heartbeats (by passing in -1), and implementing your own logic.

func WithKeepAlive

func WithKeepAlive(keepAlive time.Duration) Option

WithKeepAlive allows users to define TCP keepalive options for the frisbee client or server (use -1 to disable)

func WithLogger

func WithLogger(logger *zerolog.Logger) Option

WithLogger sets the logger for the frisbee client or server

func WithOptions

func WithOptions(options Options) Option

WithOptions allows users to pass in an Options struct to configure a frisbee client or server

func WithTLS added in v0.1.3

func WithTLS(tlsConfig *tls.Config) Option

WithTLS sets the TLS configuration for Frisbee. By default no TLS configuration is used, and Frisbee will use unencrypted TCP connections. If the Frisbee Server is using TLS, then you must pass in a TLS config (even an empty one `&tls.Config{}`) for the Frisbee Client.

type Options

type Options struct {
	KeepAlive time.Duration
	Heartbeat time.Duration
	Logger    *zerolog.Logger
	TLSConfig *tls.Config
}

Options is used to provide the frisbee client and server with configuration options.

Default Values:

options := Options {
	KeepAlive: time.Minute * 3,
	Logger: &DefaultLogger,
	Heartbeat: time.Second * 5,
}

type Server

type Server struct {

	// OnOpened is a function run by the server whenever a connection is opened
	OnOpened func(server *Server, c *Async) Action

	// OnClosed is a function run by the server whenever a connection is closed
	OnClosed func(server *Server, c *Async, err error) Action

	// OnShutdown is run by the server before it shuts down
	OnShutdown func(server *Server)

	// PreWrite is run by the server before a write is done (useful for metrics)
	PreWrite func(server *Server)
	// contains filtered or unexported fields
}

Server accepts connections from frisbee Clients and can send and receive frisbee messages

func NewServer

func NewServer(addr string, router ServerRouter, opts ...Option) *Server

NewServer returns an uninitialized frisbee Server with the registered ServerRouter. The Start method must then be called to start the server and listen for connections

Example
package main

import (
	"github.com/loophole-labs/frisbee"
	"github.com/rs/zerolog"
	"os"
)

func main() {
	router := make(frisbee.ServerRouter)

	router[0] = func(c *frisbee.Async, incomingMessage frisbee.Message, incomingContent []byte) (outgoingMessage *frisbee.Message, outgoingContent []byte, action frisbee.Action) {
		return
	}

	logger := zerolog.New(os.Stdout)

	frisbee.NewServer("127.0.0.1:8080", router, frisbee.WithLogger(&logger))
}
Output:

func (*Server) Logger

func (s *Server) Logger() *zerolog.Logger

Logger returns the server's logger (useful for ServerRouter functions)

func (*Server) Shutdown

func (s *Server) Shutdown() error

Shutdown shuts down the frisbee server and kills all the goroutines and active connections

func (*Server) Start

func (s *Server) Start() error

Start will start the frisbee server and its reactor goroutines to receive and handle incoming connections. If the OnClosed, OnOpened, OnShutdown, or PreWrite functions have not been defined, it will use default null functions for these.

type ServerRouter

type ServerRouter map[uint32]ServerRouterFunc

ServerRouter maps frisbee message types to specific handler functions (of type ServerRouterFunc)

type ServerRouterFunc

type ServerRouterFunc func(c *Async, incomingMessage Message, incomingContent []byte) (outgoingMessage *Message, outgoingContent []byte, action Action)

ServerRouterFunc defines a message handler for a specific frisbee message

type Stream added in v0.1.3

type Stream struct {
	*Async
	// contains filtered or unexported fields
}

func (*Stream) Close added in v0.1.3

func (s *Stream) Close() error

func (*Stream) Closed added in v0.1.3

func (s *Stream) Closed() bool

func (*Stream) ID added in v0.1.3

func (s *Stream) ID() uint64

func (*Stream) Read added in v0.1.3

func (s *Stream) Read(p []byte) (int, error)

Read is a function that will read buffer messages into a byte slice. In the event that the connection is closed, Read will return an error.

func (*Stream) ReadFrom added in v0.1.3

func (s *Stream) ReadFrom(r io.Reader) (n int64, err error)

ReadFrom is a function that will send NEWSTREAM messages from an io.Reader until EOF or an error occurs In the event that the connection is closed, ReadFrom will return an error.

func (*Stream) Write added in v0.1.3

func (s *Stream) Write(p []byte) (int, error)

Write takes a byte slice and sends a NEWSTREAM message

func (*Stream) WriteTo added in v0.1.3

func (s *Stream) WriteTo(w io.Writer) (n int64, err error)

WriteTo is a function that will read buffer messages into an io.Writer until EOF or an error occurs In the event that the connection is closed, WriteTo will return an error.

type Sync added in v0.1.3

type Sync struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Sync is the underlying synchronous frisbee connection which has extremely efficient read and write logic and can handle the specific frisbee requirements. This is not meant to be used on its own, and instead is meant to be used by frisbee client and server implementations

func ConnectSync added in v0.1.3

func ConnectSync(network string, addr string, keepAlive time.Duration, logger *zerolog.Logger, TLSConfig *tls.Config) (*Sync, error)

ConnectSync creates a new TCP connection (using net.Dial) and wraps it in a frisbee connection

func NewSync added in v0.1.3

func NewSync(c net.Conn, logger *zerolog.Logger) (conn *Sync)

NewSync takes an existing net.Conn object and wraps it in a frisbee connection

func (*Sync) Close added in v0.1.3

func (c *Sync) Close() error

Close closes the frisbee connection gracefully

func (*Sync) Error added in v0.1.3

func (c *Sync) Error() error

Error returns the error that caused the frisbee.Sync to close or go into a paused state

func (*Sync) LocalAddr added in v0.1.3

func (c *Sync) LocalAddr() net.Addr

LocalAddr returns the local address of the underlying net.Conn

func (*Sync) Logger added in v0.1.3

func (c *Sync) Logger() *zerolog.Logger

Logger returns the underlying logger of the frisbee connection

func (*Sync) Raw added in v0.1.3

func (c *Sync) Raw() net.Conn

Raw shuts off all of frisbee's underlying functionality and converts the frisbee connection into a normal TCP connection (net.Conn)

func (*Sync) ReadMessage added in v0.1.3

func (c *Sync) ReadMessage() (*Message, *[]byte, error)

ReadMessage is a blocking function that will wait until a frisbee message is available and then return it (and its content). In the event that the connection is closed, ReadMessage will return an error.

func (*Sync) RemoteAddr added in v0.1.3

func (c *Sync) RemoteAddr() net.Addr

RemoteAddr returns the remote address of the underlying net.Conn

func (*Sync) SetDeadline added in v0.1.3

func (c *Sync) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadline on the underlying net.Conn

func (*Sync) SetReadDeadline added in v0.1.3

func (c *Sync) SetReadDeadline(t time.Time) error

SetReadDeadline sets the read deadline on the underlying net.Conn

func (*Sync) SetWriteDeadline added in v0.1.3

func (c *Sync) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the write deadline on the underlying net.Conn

func (*Sync) WriteMessage added in v0.1.3

func (c *Sync) WriteMessage(message *Message, content *[]byte) error

WriteMessage takes a frisbee.Message and some (optional) accompanying content, sends it synchronously.

If message.ContentLength == 0, then the content array must be nil. Otherwise, it is required that message.ContentLength == len(content).

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL