livenet

package module
v0.0.0-...-81393df Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 2, 2018 License: MIT Imports: 18 Imported by: 0

README

LiveNet

Demo of gRPC streaming for liveness detection in a fully connected network.

Proof of concept for a fully connected network that uses gRPC streams for constant messaging without exchanging meta information between RPC requests. By detecting if the stream is closed, each host on the network can determine if the connection is live or not. Hosts can reconnect at anytime to repair the state of the network.

This system also demonstrates the use of a message multiplexer so that a single stream between hosts is used. This model hijacks the gRPC server handler definitions, but the hosts implement an actor model so each message is treated as an independent event.

LiveNet Architecture

As you can see, the LiveNet architecture implements two streams between each peer on the network. The first stream, implemented via the Remote struct allows the localhost to send requests to the remote peer and and get replies back. The second stream is connected when the remote peer connects to the stream server and sends requests, to which replies are sent back. All four streams are completely independent.

Questions

  1. How long can a stream be kept open and messages passed?
  2. Does the stream on the remote need to be guarded by a mutex?
  3. Can we implement a client service using streaming as well?

Running LiveNet

Install the LiveNet command as follows:

$ go get github.com/bbengfort/livenet/...

Create a configuration file per host, config.json, as follows:

{
  "tick": "500ms",
  "log_level": 2,
  "peers": [
    {
      "pid": 1,
      "name": "alpha",
      "ip_address": "localhost",
      "port": 3264
    },
    {
      "pid": 2,
      "name": "bravo",
      "ip_address": "localhost",
      "port": 3265
    },
    {
      "pid": 3,
      "name": "charlie",
      "ip_address": "localhost",
      "port": 3266
    }
  ]
}

Then run each server with the livenet serve command, specify -c to supply the path to the configuration file (looks for config.json by default). You can also specify the name of the localhost with the -n flag, by default the name is the hostname of the machine.

The LiveNet server will send heartbeat messages every 500ms - 1 second to all of its peers, and every 8 minutes or so will print a status message about the connections.

Documentation

Index

Constants

View Source
const (
	DefaultTick     = 500 * time.Millisecond
	DefaultLogLevel = LogCaution
)

Default configuration values

View Source
const (
	LogTrace uint8 = iota
	LogDebug
	LogInfo
	LogCaution
	LogStatus
	LogWarn
	LogSilent
)

Levels for implementing the debug and trace message functionality.

View Source
const CautionThreshold = 50

CautionThreshold for issuing caution logs after accumulating cautions.

View Source
const PackageVersion = "0.2"

PackageVersion of LiveNet

Variables

This section is empty.

Functions

func LogLevel

func LogLevel() string

LogLevel returns a string representation of the current level

func SetLogLevel

func SetLogLevel(level uint8)

SetLogLevel modifies the log level for messages at runtime. Ensures that the highest level that can be set is the trace level.

func SetLogger

func SetLogger(l *log.Logger)

SetLogger sets the logger for writing output to. Can set to a noplog to remove all log messages (or set the log level to silent).

Types

type Callback

type Callback func(Event) error

Callback is a function that can receive events.

type Config

type Config struct {
	Name     string       `json:"name,omitempty"`      // unique name of local replica (hostname by default)
	Seed     int64        `json:"seed"`                // random seed to initialize with
	Tick     string       `json:"tick"`                // click tick rate for timing (parseable duration)
	Uptime   string       `json:"uptime,omitempty"`    // run for a specified duration then shutdown
	LogLevel int          `json:"log_level,omitempty"` // verbosity of logging, lower is more verbose
	Peers    []peers.Peer `json:"peers"`               // all hosts on the LiveNet
}

Config implements a simple configuration object that can be loaded from a JSON file and defines the LiveNet network.

func (*Config) Dump

func (c *Config) Dump(path string) (err error)

Dump the configuration to the path on disk

func (*Config) GetLogLevel

func (c *Config) GetLogLevel() uint8

GetLogLevel returns the uint8 parsed logging verbosity

func (*Config) GetName

func (c *Config) GetName() (string, error)

GetName returns the configured name or the hostname, required.

func (*Config) GetPeer

func (c *Config) GetPeer() (peers.Peer, error)

GetPeer returns the local peer configuration or error if no peer found.

func (*Config) GetRemotes

func (c *Config) GetRemotes(actor Dispatcher) ([]*Remote, error)

GetRemotes returns all peer configurations for remote hosts on the network, excluding the local peer configuration.

func (*Config) GetTick

func (c *Config) GetTick() (tick time.Duration, err error)

GetTick returns the parsed duration from the tick configuration

func (*Config) GetUptime

func (c *Config) GetUptime() (uptime time.Duration, err error)

GetUptime returns the parsed duration from the uptime configuration. If no uptime is specified then a duration of 0 and no error is returned.

func (*Config) Load

func (c *Config) Load(path string) (err error)

Load the configuration from the path on disk

func (*Config) SetLogLevel

func (c *Config) SetLogLevel()

SetLogLevel from the configuration if specified, e.g. > 0

func (*Config) SetSeed

func (c *Config) SetSeed()

SetSeed if the seed is specified, e.g. > 0

type Dispatcher

type Dispatcher interface {
	Dispatch(e Event) error
	DispatchMessage(msg *pb.Envelope, source interface{}) error
	DispatchError(err error, source interface{})
}

Dispatcher is an object that listens for events and handles them.

type Event

type Event interface {
	Type() EventType
	Source() interface{}
	Value() interface{}
}

Event represents actions that occur during consensus. Listeners can register callbacks with event handlers for specific event types.

type EventType

type EventType uint16

EventType is an enumeration of the kind of events that can occur.

const (
	ErrorEvent EventType = iota
	MessageEvent
	TimeoutEvent
	HeartbeatTimeout
	StatusTimeout
)

Event types represented in LiveNet

func (EventType) String

func (t EventType) String() string

String returns the name of event types

type MessageCounts

type MessageCounts struct {
	// contains filtered or unexported fields
}

MessageCounts is a simple data structure for keeping track of how many messages are sent, received, and dropped from a connection.

func (*MessageCounts) Drop

func (c *MessageCounts) Drop()

Drop increments the dropped messages count

func (*MessageCounts) DropR

func (c *MessageCounts) DropR() float64

DropR returns the ratio of dropped to sent messages

func (*MessageCounts) Recv

func (c *MessageCounts) Recv()

Recv increments the received messages count

func (*MessageCounts) RecvR

func (c *MessageCounts) RecvR() float64

RecvR returns the ratio of received to sent messages

func (*MessageCounts) Reset

func (c *MessageCounts) Reset()

Reset the message counts back to zero

func (*MessageCounts) Sent

func (c *MessageCounts) Sent()

Sent increments the sent messages count

func (*MessageCounts) String

func (c *MessageCounts) String() string

type Remote

type Remote struct {
	sync.RWMutex
	peers.Peer
	// contains filtered or unexported fields
}

Remote implements a streaming connection to a remote peer on the network.

func NewRemote

func NewRemote(p peers.Peer, a Dispatcher) *Remote

NewRemote creates a new remote associated with the actor

func (*Remote) Send

func (r *Remote) Send(msg *pb.Envelope) error

Send a message to the remote

func (*Remote) Status

func (r *Remote) Status() string

Status returns a string with information about the remote connection.

type Server

type Server struct {
	peers.Peer
	// contains filtered or unexported fields
}

Server implements a LiveNet host that connects to all peers on the network via gRPC streams. It can send a variety of messages but primarily sends routine heartbeats to the other servers.

func New

func New(config *Config) (server *Server, err error)

New is the entry point to the LiveNet service for a single machine, it instantiates a LiveNet sever for the specified network and configuration.

func (*Server) Close

func (s *Server) Close() error

Close the event handler and shutdown the server gracefully.

func (*Server) Dispatch

func (s *Server) Dispatch(e Event) error

Dispatch an event to be serialized by the event channel.

func (*Server) DispatchError

func (s *Server) DispatchError(err error, source interface{})

DispatchError sends error messages that will stop the server and the event loop, returning an error and closing the process.

func (*Server) DispatchMessage

func (s *Server) DispatchMessage(msg *pb.Envelope, source interface{}) error

DispatchMessage creates an event for the specified message type

func (*Server) Handle

func (s *Server) Handle(e Event) error

Handle events by passing the event to the specified event handlers.

func (*Server) Heartbeat

func (s *Server) Heartbeat()

Heartbeat sends a routine liveness message to other peers.

func (*Server) Listen

func (s *Server) Listen() error

Listen for messages from peers and clients and run the event loop.

func (*Server) Post

func (s *Server) Post(stream pb.LiveNet_PostServer) (err error)

Post implements the LiveNet stream server, listening for stream connections from clients and remote hosts and dispatching each message as an event. Every message received on the stream is responded to before a new message event is dispatched on receive. This ensures that messages are ordered with respect to those that are sent from the client.

Post and Dispatch together also implements the simple multiplexer based on message type. Post sends events of the specified type, which gets handled by the specific event handler.

func (*Server) Status

func (s *Server) Status()

Status reports the liveness status to the console.

Directories

Path Synopsis
cmd
Package pb is a generated protocol buffer package.
Package pb is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL