roletalk

package module
v1.0.6 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 2, 2022 License: MIT Imports: 25 Imported by: 0

README

Roletalk: Microservice communication framework for the Go language

Roletalk is an asynchronous, event-driven peer-to-peer communication framework for microservices, designed for scalability, simplicity and efficiency.

Essentially, it is a peer-to-service framework that lets you create multiple services (roles) on a single peer and communicate with them easily.

Roletalk internally uses WebSocket as a TCP framing layer for data transfer, with minimal network overhead. A wire-compatible Node.js framework is currently available.

Installation

Download:

$ go get github.com/xshkut/roletalk-go

Import in your code:

import "github.com/xshkut/roletalk-go"

API

The API reference is available on GoDoc.

Use case

Choose roletalk if you need to build a brokerless architecture. This approach brings several advantages: minimal RTT (round-trip time), less network traffic, and no SPOF (single point of failure). In some scenarios a broker can also become a bottleneck.

Roletalk lets you implement flexible communication patterns:

  • Publish-Subscribe,
  • Remote Procedure Call (RPC),
  • Event (single message with optional payload),
  • Pipeline (sequential distributed data processing),
  • Bus (many-to-many communication),
  • Survey (request-reply to multiple peers).

Features

• Client as service. It does not matter which side establishes a connection: both the listener and the dialer are services. This lets you deploy services without exposing them to the Internet, for example inside a private network. Such instances must, of course, connect to listeners they can reach over the network.

• Automatic service recognition. Just connect peers together and each of them will learn the others' roles (services), IDs, names and meta info. If a peer's role gets disabled or enabled, every connected peer is informed immediately and rebuilds its internal state.

• Redundant connections support. There can be multiple connections between two peers, and all communication is load-balanced between them. Combined with auto-reconnection, this helps keep two peers connected when one peer's IP address changes.

• Binary streams. Transfer large or unknown amounts of data via streams.

• Round-robin client-side load balancing between units implementing a role (service).

• No heavy internal message conversions (JSON/XML serialization or parsing). Just binary to UTF-8/int and vice versa.

• Optional TLS on the transport layer.

• Scalable and simple built-in authentication: each peer can have zero or more ID: KEY combinations.

Concept

Structure

The Roletalk concept consists of:

  • Peer - local node in your peer-to-peer architecture. A Peer can have Units, Roles and Destinations.
  • Unit - remote node connected to the Peer.
  • Connection - communication link between the Peer and a Unit. Each Unit can have one or more connections. If there are redundant connections, communication is load-balanced between them by picking one that is not busy, or a random one if none is vacant. When the last connection is aborted, the Unit is closed and removed from the Peer.
  • Role - service registered on the Peer. A Role can have handlers for each communication type and can be active or inactive. When a role changes its state, all connected units are informed so they can rebuild their state.
  • Destination - role registered on Units. A Destination includes all connected units that serve the corresponding role. If the last Unit disconnects or disables the role, the Destination is closed.
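The connection choice described in the list above can be sketched as follows. This is only an illustration under stated assumptions: the `conn` type, its `busy` flag and `pickConn` are hypothetical names, not part of the roletalk API, and the library's real bookkeeping may differ.

```go
package main

import (
	"fmt"
	"math/rand"
)

// conn is a hypothetical stand-in for one link between a Peer and a Unit.
type conn struct {
	id   int
	busy bool
}

// pickConn returns the first connection that is not busy. If every
// connection is busy it falls back to a random one. It returns nil
// when the slice is empty.
func pickConn(conns []*conn) *conn {
	if len(conns) == 0 {
		return nil
	}
	for _, c := range conns {
		if !c.busy {
			return c
		}
	}
	return conns[rand.Intn(len(conns))]
}

func main() {
	conns := []*conn{{id: 1, busy: true}, {id: 2, busy: false}}
	fmt.Println(pickConn(conns).id) // prints 2, the only idle connection
}
```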

All communication is performed via the Destination's methods and is load-balanced among its Units unless a Unit is explicitly specified. A Unit is chosen for each message / request / stream.
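A minimal sketch of per-message unit selection, assuming a simple rotating cursor. The `unit` and `destination` types here are hypothetical models, not the library's types, and the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// unit is a hypothetical stand-in for a connected remote peer.
type unit struct{ name string }

// destination keeps the units serving one role plus a round-robin cursor.
type destination struct {
	units []*unit
	next  int
}

// pick returns the explicitly requested unit when one is given. Otherwise it
// returns the next unit in round-robin order, or nil if none is connected.
func (d *destination) pick(explicit *unit) *unit {
	if explicit != nil {
		return explicit
	}
	if len(d.units) == 0 {
		return nil
	}
	u := d.units[d.next%len(d.units)]
	d.next = (d.next + 1) % len(d.units)
	return u
}

func main() {
	a, b := &unit{"a"}, &unit{"b"}
	d := &destination{units: []*unit{a, b}}
	for i := 0; i < 3; i++ {
		fmt.Println(d.pick(nil).name) // a, then b, then a again
	}
	fmt.Println(d.pick(b).name) // b: an explicit unit bypasses the rotation
}
```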

Communication

Roletalk defines three types of communication:

  • Message - one-way act of communication. Use it when no delivery acknowledgement is needed. A successfully sent message means only that it has been written to the underlying socket.
  • Request - request in the common meaning. A request can only be rejected or replied to. It returns an error when the Unit rejects it, the timeout is exceeded, or the Unit disconnects after the request was sent.
  • Stream - one-way stream of binary data. Streams can be Readable or Writable. If the Peer opens a Readable (Destination.NewReader()), the Unit handles a Writable (Role.OnWriter()), and vice versa. A stream session begins with a Request. After the Unit replies to the request, data is transferred over the connection used for the reply. If that connection is aborted, the stream is destroyed.

Incoming messages are wrapped in a context (MessageContext): an object with the payload and meta info for all types of incoming messages (message, request, request for stream).

All communication carries two basic properties that identify which handler to call: Role and Event (the name of the action; other frameworks call it method, path or action). When a Peer receives a message from a Unit, it forwards the message to the corresponding role, or rejects it if no such role is registered on the Peer. If the role has no handlers for the event, the message is rejected; otherwise the handlers are called.
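The role and event lookup described above might look like the sketch below. The `registry` and `handler` types and both error values are assumptions made for illustration; the real framework passes context objects to handlers rather than plain strings.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical, simplified handler signature.
type handler func(payload string) string

// registry maps role name -> event name -> handler.
type registry map[string]map[string]handler

var (
	errNoRole  = errors.New("role is not served by this peer")
	errNoEvent = errors.New("role has no handler for this event")
)

// dispatch finds the handler for (role, event) and calls it,
// rejecting the message when either lookup fails.
func (r registry) dispatch(role, event, payload string) (string, error) {
	events, ok := r[role]
	if !ok {
		return "", errNoRole
	}
	h, ok := events[event]
	if !ok {
		return "", errNoEvent
	}
	return h(payload), nil
}

func main() {
	r := registry{"math": {"double": func(p string) string { return p + p }}}

	out, err := r.dispatch("math", "double", "ab")
	fmt.Println(out, err) // abab <nil>

	_, err = r.dispatch("math", "halve", "ab")
	fmt.Println(err) // role has no handler for this event
}
```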

Data types

Roletalk uses six data types:

  • Binary - []byte
  • Null - nil
  • Bool - bool
  • String - string
  • Number - float64
  • Object - []byte of JSON-stringified object

All communication (except stream sessions) can use data of any type. The data type should either be fixed when designing the microservice specification or be determined at runtime through the context (for example, OriginData().T).
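As a rough illustration, a Go value could be mapped to one of the six type tags like this. The numeric values match the Datatype constants listed in this package's documentation; the `datatype` type and `tagFor` function themselves are hypothetical, and treating only float64 as Number is an assumption (the library may convert other numeric types).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// datatype mirrors the six protocol type tags.
type datatype byte

const (
	typeBinary datatype = 0
	typeNull   datatype = 1
	typeBool   datatype = 2
	typeString datatype = 3
	typeNumber datatype = 4
	typeJSON   datatype = 5
)

// tagFor picks a type tag for a Go value. Anything that is not one of the
// five primitive cases is treated as an object to be JSON-stringified.
func tagFor(v interface{}) datatype {
	switch v.(type) {
	case nil:
		return typeNull
	case []byte:
		return typeBinary
	case bool:
		return typeBool
	case string:
		return typeString
	case float64:
		return typeNumber
	default:
		return typeJSON
	}
}

func main() {
	fmt.Println(tagFor("hi"), tagFor(3.5), tagFor(nil)) // 3 4 1

	obj := map[string]int{"n": 1}
	raw, err := json.Marshal(obj) // objects travel as JSON bytes
	if err != nil {
		panic(err)
	}
	fmt.Println(tagFor(obj), string(raw)) // 5 {"n":1}
}
```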

Acquaintance

One additional feature of roletalk is acquaintance. It could be considered redundant, but it fits the framework's use case nicely.

The process is simple. Suppose listener PEER_A is connected to listener PEER_B. After some PEER_C connects to PEER_A, PEER_A sends PEER_C the info {ID, ADDRESS, ROLES} of PEER_B. If PEER_C is FRIENDLY, has at least one destination among the received ROLES, and is not already connected to a peer with the provided ID, it starts a connection to the ADDRESS.

Acquaintance is enabled by default, but you can set the Friendly option to false to disable it.
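The decision a peer makes on receiving an introduction can be sketched as a small predicate. The `intro` and `peerState` types are hypothetical and only model the three conditions stated above.

```go
package main

import "fmt"

// intro is a hypothetical model of the {ID, ADDRESS, ROLES} info
// one peer sends about another.
type intro struct {
	ID      string
	Address string
	Roles   []string
}

// peerState models what the receiving peer knows about itself.
type peerState struct {
	friendly     bool
	destinations map[string]bool // roles this peer wants to talk to
	connected    map[string]bool // IDs of already connected units
}

// shouldConnect reports whether the peer should dial the introduced address:
// it must be friendly, not yet connected to that ID, and interested in at
// least one of the offered roles.
func (p *peerState) shouldConnect(in intro) bool {
	if !p.friendly || p.connected[in.ID] {
		return false
	}
	for _, role := range in.Roles {
		if p.destinations[role] {
			return true
		}
	}
	return false
}

func main() {
	c := &peerState{
		friendly:     true,
		destinations: map[string]bool{"auth": true},
		connected:    map[string]bool{"peer-a": true},
	}
	fmt.Println(c.shouldConnect(intro{ID: "peer-b", Address: "ws://10.0.0.2:9000", Roles: []string{"auth"}})) // true
	fmt.Println(c.shouldConnect(intro{ID: "peer-a", Address: "ws://10.0.0.1:9000", Roles: []string{"auth"}})) // false
}
```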

Security

To achieve strong MITM-protection use HTTPS.

The framework comes with a simple built-in authentication mechanism: preshared ID: KEY pairs. Peers that are supposed to connect to each other should have at least one common ID: KEY pair.

The authentication process between two peers can be described in a few simple steps:

  1. Each peer sends a list of its auth IDs and a randomly generated CHALLENGE to the other. If no auth ID: KEY pairs are specified, the peer sends auth confirmation.

  2. Each peer checks whether its locally registered list of IDs intersects with the one received from the remote peer. If no ID matches, it sends an error and closes the connection with the corresponding code.

  3. Each peer chooses a random ID from the intersection and hashes the received CHALLENGE with the KEY for that ID. It sends the hashed RESPONSE value together with that ID.

  4. Each peer receives a RESPONSE with an ID and checks whether its own CHALLENGE, hashed with the KEY for the received ID, equals the received RESPONSE. If so, it sends auth confirmation. Otherwise it rejects the auth and closes the connection.

  5. If both peers confirm each other's RESPONSEs, the handshake is complete and the connection takes part in further communication. If authentication exceeds the timeout, or either peer sends an error and/or closes the connection, the handshake fails.
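The steps above can be sketched for one direction of the handshake as follows. The README does not say which hash function is used, so HMAC-SHA256 here is purely an assumption, as are the `keys` type and all function names. The sketch also takes the first common ID instead of a random one to stay deterministic.

```go
package main

import (
	"crypto/hmac"
	"crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
)

// keys maps auth ID -> preshared KEY.
type keys map[string][]byte

// newChallenge returns 32 random bytes (step 1).
func newChallenge() ([]byte, error) {
	c := make([]byte, 32)
	_, err := rand.Read(c)
	return c, err
}

// commonID returns the first remote ID that is also registered locally (step 2).
func commonID(local keys, remoteIDs []string) (string, error) {
	for _, id := range remoteIDs {
		if _, ok := local[id]; ok {
			return id, nil
		}
	}
	return "", errors.New("no common auth ID")
}

// respond computes the RESPONSE for a received CHALLENGE (step 3).
// HMAC-SHA256 is an assumed construction, not the documented one.
func respond(key, challenge []byte) []byte {
	mac := hmac.New(sha256.New, key)
	mac.Write(challenge)
	return mac.Sum(nil)
}

// verify checks a received RESPONSE against our own CHALLENGE (step 4).
func verify(local keys, id string, challenge, response []byte) bool {
	key, ok := local[id]
	if !ok {
		return false
	}
	return hmac.Equal(respond(key, challenge), response)
}

func main() {
	alice := keys{"team": []byte("s3cret")}
	bob := keys{"team": []byte("s3cret"), "other": []byte("x")}

	challenge, err := newChallenge() // alice -> bob, with alice's ID list
	if err != nil {
		panic(err)
	}
	id, err := commonID(bob, []string{"team"})
	if err != nil {
		panic(err)
	}
	resp := respond(bob[id], challenge)             // bob -> alice
	fmt.Println(verify(alice, id, challenge, resp)) // true
}
```

In the real handshake both peers run these steps simultaneously, each verifying the other's RESPONSE before the connection is used.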

Contribution

Project is MIT-licensed.

Feel free to open issues and fork.

If you have any ideas or remarks you are welcome to contact the author.

Documentation

Overview

Package roletalk is an asynchronous peer-to-peer communication framework for microservices with its own transport protocol. It was designed for scalability, simplicity and efficiency.

For a detailed overview and use cases, please refer to readme.md in the project's repository.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConnectOptions

type ConnectOptions struct {
	DoNotReconnect bool // set true if the connection should not reconnect after an abort
	DoNotAcquaint  bool // set true if the connection should neither be introduced to remote peers nor be acquainted with them
	InsecureTLS    bool // set true if TLS errors should be ignored
}

ConnectOptions specifies options for an outgoing connection.

type Datatype

type Datatype byte

Datatype represents type of data defined by roletalk communication protocol. Can be checked with corresponding constants. Implements Stringer (https://golang.org/pkg/fmt/#Stringer) interface

const (
	//DatatypeBinary represents []byte
	DatatypeBinary Datatype = 0

	//DatatypeNull represents nil
	DatatypeNull Datatype = 1

	//DatatypeBool represents bool
	DatatypeBool Datatype = 2

	//DatatypeString represents string
	DatatypeString Datatype = 3

	//DatatypeNumber represents float64
	DatatypeNumber Datatype = 4

	//DatatypeJSON represents []byte of JSON stringified object
	DatatypeJSON Datatype = 5
)

func (Datatype) String

func (d Datatype) String() string

type Destination

type Destination struct {
	// contains filtered or unexported fields
}

Destination represents a role (a service name) of remote peers (units). Destination is used as a gateway for outgoing communication. It implements round-robin load balancing between units. To communicate with a specific remote peer (unit), set the Unit field of EmitOptions.

func (*Destination) HasUnit

func (dest *Destination) HasUnit(unit *Unit) bool

HasUnit returns true if provided unit serves a role with Destination's name

func (*Destination) Name

func (dest *Destination) Name() string

Name returns Destination's name

func (*Destination) NewReader

func (dest *Destination) NewReader(event string, opts EmitOptions) (res *MessageContext, r *Readable, err error)

NewReader requests a binary stream session and returns its readable end. It returns an error if the remote peer rejected the request or the request timed out.

func (*Destination) NewWriter

func (dest *Destination) NewWriter(event string, opts EmitOptions) (res *MessageContext, writable *Writable, err error)

NewWriter requests a binary stream session and returns its writable end. It returns an error if the remote peer rejected the request or the request timed out.

func (*Destination) OnClose

func (dest *Destination) OnClose(f func())

OnClose adds handler function f, which runs synchronously with other close handlers in FIFO order when the last Unit gets disconnected.

func (*Destination) OnUnit

func (dest *Destination) OnUnit(f func(unit *Unit))

OnUnit adds handler function f, which runs synchronously with other unit handlers in FIFO order when the Destination gets a new unit.

func (*Destination) Ready

func (dest *Destination) Ready() bool

Ready indicates whether Destination has connected units

func (*Destination) Request

func (dest *Destination) Request(event string, opts EmitOptions) (res *MessageContext, err error)

Request emits request message to remote peer (Unit). Returns error if remote peer rejected the request or request timed out, otherwise returns response context

func (*Destination) Send

func (dest *Destination) Send(event string, opts EmitOptions) error

Send sends one-way message to remote peer (Unit). Returns error if message has not been written to underlying connection

func (*Destination) Units

func (dest *Destination) Units() []*Unit

Units returns slice of all connected units serving corresponding role

type EmitOptions

type EmitOptions struct {
	Data            interface{}
	Unit            *Unit
	Timeout         time.Duration
	IgnoreUnitClose bool
}

EmitOptions determines the Data to send and additional transfer options. All fields are optional. Specify Unit to choose the unit to send data to. Timeout sets the time limit for a reply (it is ignored for the Send and Broadcast methods). If IgnoreUnitClose is true, the request is not rejected internally when the target unit disconnects, but the timeout still applies.
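How Timeout and IgnoreUnitClose interact can be illustrated with a plain select over channels. This is a sketch under assumptions, not the library's implementation: `awaitReply`, its channels and both error values are hypothetical. A zero timeout fires immediately in this sketch; what the library does with a zero Timeout is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errTimeout    = errors.New("request timed out")
	errUnitClosed = errors.New("unit disconnected")
)

// awaitReply waits for a reply, for the unit to close, or for the timeout,
// whichever comes first. With ignoreUnitClose set, only a reply or the
// timeout can end the wait.
func awaitReply(reply <-chan string, unitClosed <-chan struct{}, timeout time.Duration, ignoreUnitClose bool) (string, error) {
	if ignoreUnitClose {
		unitClosed = nil // receiving from a nil channel blocks forever
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case r := <-reply:
		return r, nil
	case <-unitClosed:
		return "", errUnitClosed
	case <-timer.C:
		return "", errTimeout
	}
}

func main() {
	closed := make(chan struct{})
	close(closed) // simulate a unit that has already disconnected

	_, err := awaitReply(make(chan string), closed, time.Second, false)
	fmt.Println(err) // unit disconnected

	_, err = awaitReply(make(chan string), closed, 20*time.Millisecond, true)
	fmt.Println(err) // request timed out
}
```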

type MessageContext

type MessageContext struct {
	Data interface{} // Payload of the message. Feel free to change it as needed
	// contains filtered or unexported fields
}

MessageContext is the context for all types of incoming messages, including requests for readable and writable streams. The Data field may be changed by middleware. To get the original data, call OriginData(). Note: MessageContext has no relation to type Context (https://golang.org/pkg/context/#Context)

func (*MessageContext) Conn

func (ctx *MessageContext) Conn() *websocket.Conn

Conn returns the underlying connection that was used to transfer the message.

func (*MessageContext) Event

func (ctx *MessageContext) Event() string

Event returns the message's event name

func (*MessageContext) OriginData

func (ctx *MessageContext) OriginData() OriginData

OriginData returns unchanged context's data.

func (*MessageContext) Role

func (ctx *MessageContext) Role() string

Role returns the role which message is addressed to

func (*MessageContext) String

func (ctx *MessageContext) String() string

func (*MessageContext) Unit

func (ctx *MessageContext) Unit() *Unit

Unit returns Unit who sent the message

type MessageHandler

type MessageHandler func(im *MessageContext)

MessageHandler is function which handles incoming messages.

type MetaInfo

type MetaInfo struct {
	Os       string `json:"os"`
	Runtime  string `json:"runtime"`
	Uptime   int64  `json:"uptime"`
	Time     int64  `json:"time"`
	Protocol string `json:"protocol"`
}

MetaInfo represents meta info of remote peer

type OriginData

type OriginData struct {
	T    Datatype
	Data []byte
}

OriginData represents unchanged received context's data.

type Peer

type Peer struct {
	Name     string
	Friendly bool //Friendly means that Peer will follow acquaint messages from remote peers (Units) and connect to them if it isn't connected yet
	// contains filtered or unexported fields
}

Peer represents the local node in your peer-to-peer architecture. Create Peer with NewPeer() only. It is recommended to use Singleton() instance instead.

func NewPeer

func NewPeer(opts PeerOptions) *Peer

NewPeer creates Peer and initializes its internal state

func Singleton

func Singleton() *Peer

Singleton returns singleton Peer instance. It is used to share single Peer between multiple places of your code

func (*Peer) AddKey

func (peer *Peer) AddKey(id, key string)

AddKey adds the provided authentication key with the corresponding id. Only peers with matching ids proceed to further authentication. A Peer without keys approves any remote peer (Unit).

func (*Peer) Close

func (peer *Peer) Close()

Close all listeners and connections

func (*Peer) Connect

func (peer *Peer) Connect(urlStr string, opts ...ConnectOptions) (unit *Unit, err error)

Connect establishes connection to remote peer and creates Unit

func (*Peer) Destination

func (peer *Peer) Destination(name string) *Destination

Destination returns the Destination with the provided name, creating it first if it does not exist. A Destination represents the corresponding roles of remote peers.

func (*Peer) ID

func (peer *Peer) ID() string

ID returns the peer's unique identifier. It is created when the peer is constructed.

func (*Peer) InvolveConn

func (peer *Peer) InvolveConn(c *websocket.Conn) (*Unit, error)

InvolveConn accepts websocket.Conn for authentication and further communication

func (*Peer) ListDestinations

func (peer *Peer) ListDestinations() []string

ListDestinations returns all registered destinations

func (*Peer) ListRoles

func (peer *Peer) ListRoles() []string

ListRoles returns all registered roles

func (*Peer) Listen

func (peer *Peer) Listen(address string) (net.Addr, error)

Listen starts listening for incoming connections. It creates a new http.Server and blocks until the server is listening.

func (*Peer) OnRole

func (peer *Peer) OnRole(f func(role *Role))

OnRole adds role handler function f, which runs synchronously with other role handlers in FIFO order when the Peer gets a new Role.

func (*Peer) OnUnit

func (peer *Peer) OnUnit(f func(unit *Unit))

OnUnit adds unit handler function f, which runs synchronously with other unit handlers in FIFO order when the Peer gets a new Unit.

func (*Peer) Role

func (peer *Peer) Role(name string) *Role

Role creates or retrieves Role with provided name

func (*Peer) ServeHTTP

func (peer *Peer) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Peer) Unit

func (peer *Peer) Unit(id string) *Unit

Unit returns a reference to the Unit with the provided id, or nil if no such unit exists or it is not connected.

func (*Peer) Units

func (peer *Peer) Units() []*Unit

Units returns all connected units of the Peer

func (*Peer) WaitForClose

func (peer *Peer) WaitForClose()

WaitForClose waits until all units (their underlying connections) and listeners are closed. It can be used to prevent main() from returning.

type PeerOptions

type PeerOptions struct {
	Name     string
	Friendly bool
}

PeerOptions provide options to create Peer

type Readable

type Readable struct {
	// contains filtered or unexported fields
}

Readable implements io.Reader.

func (*Readable) Destroy

func (r *Readable) Destroy(err error) error

Destroy sends err to the other end and closes the stream.

func (*Readable) Read

func (r *Readable) Read(p []byte) (n int, err error)

type ReadableRequestHandler

type ReadableRequestHandler func(im *ReaderRequestContext)

ReadableRequestHandler is function which handles incoming requests.

type ReaderRequestContext

type ReaderRequestContext struct {
	*RequestContext
	// contains filtered or unexported fields
}

ReaderRequestContext is context for incoming request to establish binary stream readable on this end

func (*ReaderRequestContext) Reply

func (ctx *ReaderRequestContext) Reply(data interface{}) (*Readable, error)

Reply stops middleware flow and responds to the message. If data argument is provided, it overwrites im.Data

func (*ReaderRequestContext) Then

func (ctx *ReaderRequestContext) Then(cb func(ctx *ReaderRequestContext))

Then binds middleware to message context. Middleware runs in LIFO order

type RequestContext

type RequestContext struct {
	*MessageContext

	Res interface{}
	Err error
	// contains filtered or unexported fields
}

RequestContext is context for incoming requests

func (*RequestContext) OriginData

func (ctx *RequestContext) OriginData() OriginData

OriginData returns unchanged received context's data.

func (*RequestContext) Reject

func (ctx *RequestContext) Reject(data interface{}) error

Reject responds to the request with an error; data can be an error, a string or nil. If the data argument is nil, the Err field is used for the rejection.

func (*RequestContext) Reply

func (ctx *RequestContext) Reply(data interface{}) error

Reply stops the middleware flow and responds to the message. If the data argument is provided, it overrides the Data field.

func (*RequestContext) Then

func (ctx *RequestContext) Then(cb func(ctx *RequestContext))

Then binds middleware to message context. Middleware runs in LIFO order
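One way to read "LIFO order" is as a stack of callbacks that is unwound from the top until it is empty or a reply stops the flow. The sketch below models only that reading; the `ctx` type and its methods are hypothetical simplifications, not the library's RequestContext.

```go
package main

import "fmt"

// ctx is a hypothetical, simplified request context.
type ctx struct {
	data    string
	stack   []func(*ctx)
	replied bool
}

// then pushes a middleware callback onto the stack.
func (c *ctx) then(cb func(*ctx)) { c.stack = append(c.stack, cb) }

// reply marks the request as answered, which stops the middleware flow.
func (c *ctx) reply() { c.replied = true }

// run pops callbacks from the top of the stack (last in, first out)
// until the stack is empty or a callback has replied.
func (c *ctx) run() {
	for len(c.stack) > 0 && !c.replied {
		cb := c.stack[len(c.stack)-1]
		c.stack = c.stack[:len(c.stack)-1]
		cb(c)
	}
}

func main() {
	c := &ctx{}
	c.then(func(c *ctx) { c.data += "1" }) // registered first, runs last
	c.then(func(c *ctx) { c.data += "2" }) // registered last, runs first
	c.run()
	fmt.Println(c.data) // 21
}
```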

type RequestHandler

type RequestHandler func(im *RequestContext)

RequestHandler is function which handles incoming requests.

type Role

type Role struct {
	// contains filtered or unexported fields
}

Role represents a service on the local Peer. It should handle incoming messages, requests and stream requests for certain functionality

func (*Role) Active

func (role *Role) Active() bool

Active is used to check the role's state; returns true if Peer serves the role.

func (*Role) Disable

func (role *Role) Disable()

Disable stops the peer from serving the role and immediately hides the role from all connected units.

func (*Role) Enable

func (role *Role) Enable()

Enable makes the peer start serving the role and immediately shows the role to all connected units.

func (*Role) Name

func (role *Role) Name() string

Name returns Role's name

func (*Role) OnMessage

func (role *Role) OnMessage(event string, handler func(im *MessageContext))

OnMessage registers a message handler for the provided event. It does not support wildcard or regexp matching. Providing an empty string as the event sets the handler for all messages regardless of the event.

func (*Role) OnReader

func (role *Role) OnReader(event string, handler func(ctx *ReaderRequestContext))

OnReader registers a readable stream handler for the provided event. It does not support wildcard or regexp matching. Providing an empty string as the event sets the handler for all requests regardless of the event.

func (*Role) OnRequest

func (role *Role) OnRequest(event string, handler func(im *RequestContext))

OnRequest registers a request handler for the provided event. It does not support wildcard or regexp matching. Providing an empty string as the event sets the handler for all requests regardless of the event.

func (*Role) OnStatusChange

func (role *Role) OnStatusChange(fnc func())

OnStatusChange registers handler for role status change (when it gets activated or deactivated)

func (*Role) OnWriter

func (role *Role) OnWriter(event string, handler func(ctx *WriterRequestContext))

OnWriter registers a writable stream handler for the provided event. It does not support wildcard or regexp matching. Providing an empty string as the event sets the handler for all requests regardless of the event.

type Unit

type Unit struct {
	// contains filtered or unexported fields
}

Unit represents remote peer

func (*Unit) Close

func (unit *Unit) Close()

Close all underlying connections

func (*Unit) Connected

func (unit *Unit) Connected() bool

Connected returns true if the unit's underlying connections have not all been closed. If all connections are closed, or there was at least a moment when all of them were closed, Connected returns false.

Explanation: when all connections of a unit are closed, the Peer gets rid of the unit, but you could still hold a reference to it. When the unit reconnects, a new instance of type Unit is created, possibly with a different ID. So the Connected() method can be used to check whether a unit is still attached to the Peer. That is the reason why Unit has no communication methods.

func (*Unit) Friendly

func (unit *Unit) Friendly() bool

Friendly indicates whether remote peer handles acquaint messages

func (*Unit) GetRoles

func (unit *Unit) GetRoles() []string

GetRoles returns list of roles the unit serves

func (*Unit) HasRole

func (unit *Unit) HasRole(name string) bool

HasRole returns true if unit serves the role with provided name

func (*Unit) ID

func (unit *Unit) ID() string

ID returns remote peer's id

func (*Unit) Meta

func (unit *Unit) Meta() MetaInfo

Meta returns unit's meta data

func (*Unit) Name

func (unit *Unit) Name() string

Name returns remote peer's name

func (*Unit) OnClose

func (unit *Unit) OnClose(f func(err error))

OnClose adds handler function f, which runs synchronously with other close handlers of the destination in FIFO order when the Destination loses its last unit.

type Writable

type Writable struct {
	// contains filtered or unexported fields
}

Writable implements io.WriteCloser.

func (*Writable) Close

func (w *Writable) Close() error

Close ends the stream successfully.

func (*Writable) Destroy

func (w *Writable) Destroy(err error) error

Destroy sends err to the other end and closes the stream.

func (*Writable) Write

func (w *Writable) Write(p []byte) (n int, err error)

type WritableRequestHandler

type WritableRequestHandler func(im *WriterRequestContext)

WritableRequestHandler is function which handles incoming requests.

type WriterRequestContext

type WriterRequestContext struct {
	*RequestContext
	// contains filtered or unexported fields
}

WriterRequestContext is context for incoming request to establish binary stream writable on this end

func (*WriterRequestContext) Reply

func (ctx *WriterRequestContext) Reply(data interface{}) (*Writable, error)

Reply stops middleware flow and responds to the message. If data argument is provided, it overwrites im.Data

func (*WriterRequestContext) Then

func (ctx *WriterRequestContext) Then(cb func(ctx *WriterRequestContext))

Then binds middleware to message context. Middleware runs in LIFO order
