pulse

package
v0.0.0-...-fdefbbc
Published: Nov 29, 2021 License: MIT Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	MAGIC_BYTES = []byte("qbpulse") // Magic bytes allow clients to verify they are speaking the same protocol

)
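A minimal sketch of how a receiver might use the magic bytes to reject foreign datagrams. The page does not say where the magic bytes sit in a message; this sketch assumes they are a prefix, and the names `magicBytes`, `hasMagic`, and `stripMagic` are hypothetical, not part of the package.

```go
package main

import (
	"bytes"
	"fmt"
)

// magicBytes mirrors the documented MAGIC_BYTES value.
var magicBytes = []byte("qbpulse")

// hasMagic reports whether a datagram starts with the magic bytes.
// Assumption: the magic bytes are a prefix of every message.
func hasMagic(packet []byte) bool {
	return bytes.HasPrefix(packet, magicBytes)
}

// stripMagic returns the payload that follows the magic bytes.
// The second result is false when the prefix is missing.
func stripMagic(packet []byte) ([]byte, bool) {
	if !hasMagic(packet) {
		return nil, false
	}
	return packet[len(magicBytes):], true
}

func main() {
	payload, ok := stripMagic([]byte("qbpulseping"))
	fmt.Println(string(payload), ok)

	_, ok = stripMagic([]byte("other"))
	fmt.Println(ok)
}
```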

Functions

func HttpAPI

func HttpAPI(p *Pulse, port int, env string)

HttpAPI serves a REST API that reports status updates for the nodes being tracked.

func IdentifierToAddr

func IdentifierToAddr(iden Identifier) (ipAddr, port string)

func PulseServer

func PulseServer(ctx context.Context, addr Identifier, wg sync.WaitGroup) (net.Addr, error)

PulseServer creates a pulse server that listens on the address given by addr and responds to pulse messages. It spins up two goroutines: one listens for UDP messages, and the other waits on ctx.Done.
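The two-goroutine layout described above can be sketched as follows. This is not the package's implementation: `servePulse`, `ping`, and `roundTrip` are hypothetical names, the server simply echoes each datagram, and the address is a plain string rather than an Identifier. The sketch takes a `*sync.WaitGroup` rather than the by-value `sync.WaitGroup` in the documented signature, because a copied WaitGroup does not share its counter with the caller.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"sync"
	"time"
)

// servePulse is a hypothetical stand-in for PulseServer.
func servePulse(ctx context.Context, addr string, wg *sync.WaitGroup) (net.Addr, error) {
	conn, err := net.ListenPacket("udp", addr)
	if err != nil {
		return nil, err
	}
	wg.Add(2)

	// Goroutine 1: echo every datagram back to its sender.
	go func() {
		defer wg.Done()
		buf := make([]byte, 1024)
		for {
			n, peer, err := conn.ReadFrom(buf)
			if err != nil {
				return // stop on any read error, including a closed socket
			}
			conn.WriteTo(buf[:n], peer)
		}
	}()

	// Goroutine 2: close the socket on cancellation, which unblocks ReadFrom.
	go func() {
		defer wg.Done()
		<-ctx.Done()
		conn.Close()
	}()

	return conn.LocalAddr(), nil
}

// ping sends msg to addr over UDP and returns the reply.
func ping(addr net.Addr, msg string) (string, error) {
	conn, err := net.Dial("udp", addr.String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	conn.SetDeadline(time.Now().Add(2 * time.Second))
	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	buf := make([]byte, 1024)
	n, err := conn.Read(buf)
	if err != nil {
		return "", err
	}
	return string(buf[:n]), nil
}

// roundTrip starts a server on a free loopback port, pings it once,
// then cancels the context and waits for both goroutines to exit.
func roundTrip(msg string) (string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	addr, err := servePulse(ctx, "127.0.0.1:0", &wg)
	if err != nil {
		cancel()
		return "", err
	}
	reply, err := ping(addr, msg)
	cancel()
	wg.Wait()
	return reply, err
}

func main() {
	reply, err := roundTrip("qbpulse")
	fmt.Println(reply, err)
}
```

Closing the socket from the second goroutine is what lets the listener exit cleanly: a blocked ReadFrom returns an error as soon as the connection is closed.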

func RequestPulse

func RequestPulse(ctx context.Context, node *Node, resp chan interface{})

func SendPulse

func SendPulse(ctx context.Context, node *Node, nStream chan FailureMessage, wg sync.WaitGroup)

SendPulse requests a pulse from a node. If failure is suspected, it picks three random nodes to ask about the suspect's liveness, and retries up to maxRetry times, waiting the average RTT between messages. If neither method yields a pulse from the suspect, it marks the node as dead and sends a FailureMessage on the nStream channel.
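The escalation described above can be sketched as a pure function. This is an illustration under stated assumptions, not the package's code: `probe`, `pickThree` (here taking a peer list), and `checkNode` are hypothetical, the order of the indirect inquiry and the direct retries is a guess, and the RTT wait between retries and the send on nStream are left out.

```go
package main

import (
	"fmt"
	"math/rand"
)

type State int

const (
	Alive State = iota
	Dead
	Pending
	Suspect
)

// probe is a hypothetical callback that reports whether target answered.
type probe func(target string) bool

// pickThree returns up to three distinct peers chosen at random.
func pickThree(peers []string) []string {
	shuffled := append([]string(nil), peers...)
	rand.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})
	if len(shuffled) > 3 {
		shuffled = shuffled[:3]
	}
	return shuffled
}

// checkNode probes target directly, then asks up to three random peers,
// then retries the direct probe maxRetry times before giving up.
func checkNode(target string, peers []string, maxRetry int,
	direct probe, viaPeer func(peer, target string) bool) State {
	if direct(target) {
		return Alive
	}
	// Failure suspected: ask random peers whether they can reach target.
	for _, peer := range pickThree(peers) {
		if viaPeer(peer, target) {
			return Alive
		}
	}
	// Retry directly; a real implementation would wait between attempts.
	for i := 0; i < maxRetry; i++ {
		if direct(target) {
			return Alive
		}
	}
	return Dead
}

func main() {
	peers := []string{"10.0.0.2:3005", "10.0.0.3:3005", "10.0.0.4:3005", "10.0.0.5:3005"}
	silent := func(string) bool { return false }
	noWitness := func(peer, target string) bool { return false }
	fmt.Println(checkNode("10.0.0.1:3005", peers, 3, silent, noWitness) == Dead)
}
```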

Types

type Cli

type Cli struct {
	// contains filtered or unexported fields
}

func (*Cli) Run

func (c *Cli) Run()

type Coordinator

type Coordinator interface {
	// Add an IP Address: ipAddr, Port: port to the monitor list and start asking for pulses
	AddPulser(ipAddr, port string, maxRetry int) error

	// Remove IP Address: ipAddr, Port: port from the monitor list and stop asking for pulses
	RemovePulser(ipAddr, port string) error

	// Collectively stop monitoring all pulsers
	StopAllPulser()

	// Collectively start all pulsers added to monitor list
	StartAllPulser() error

	// Get the current status of a specific Node identified by Identifier
	Status(id Identifier) (Status, error)

	// Get the current status of all Nodes
	StatusAll() ([]*Status, error)
}

Coordinator is a node that requests pulse responses from a map of nodes.

type FailureMessage

type FailureMessage struct {
	Id             Identifier
	State          State
	RetryAttempts  uint8
	GossipPulse    []Identifier
	InitialConnect time.Time
	LastConnected  time.Time
}

FailureMessage is the message sent on the notify channel when a node is determined to have failed.
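A consumer of the failure channel might look like the sketch below. The types are copied locally so the example runs on its own; `collectFailures` is a hypothetical helper, and the sketch assumes the channel is eventually closed, which the page does not confirm.

```go
package main

import (
	"fmt"
	"time"
)

type Identifier string

type State int

const (
	Alive State = iota
	Dead
	Pending
	Suspect
)

// FailureMessage mirrors the documented struct.
type FailureMessage struct {
	Id             Identifier
	State          State
	RetryAttempts  uint8
	GossipPulse    []Identifier
	InitialConnect time.Time
	LastConnected  time.Time
}

// collectFailures drains the channel until it is closed and returns
// the identifiers of every node reported as Dead.
func collectFailures(failures <-chan FailureMessage) []Identifier {
	var dead []Identifier
	for msg := range failures {
		if msg.State == Dead {
			dead = append(dead, msg.Id)
		}
	}
	return dead
}

func main() {
	failures := make(chan FailureMessage, 2)
	failures <- FailureMessage{Id: "10.0.0.1:3005", State: Dead, RetryAttempts: 3, LastConnected: time.Now()}
	failures <- FailureMessage{Id: "10.0.0.2:3005", State: Suspect}
	close(failures)

	fmt.Println(collectFailures(failures))
}
```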

type FullNode

type FullNode interface {
	Pulser
	Coordinator
	Gossip()
}

Pulser and Coordinator functions are independent, but can also be used together as a full node.

type Identifier

type Identifier string

Identifier is in ipv4:port format (e.g. 212.189.35.68:3005).

func AddrToIdentifier

func AddrToIdentifier(ipAddr, port string) Identifier
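Converting between an Identifier and its address parts could be done with the standard library, as in this sketch. The lowercase `addrToIdentifier` and `identifierToAddr` are hypothetical stand-ins for AddrToIdentifier and IdentifierToAddr; unlike the documented IdentifierToAddr, this version also returns an error for malformed input.

```go
package main

import (
	"fmt"
	"net"
)

// Identifier is in ipv4:port format, as documented.
type Identifier string

// addrToIdentifier joins an IP address and a port into an Identifier.
func addrToIdentifier(ipAddr, port string) Identifier {
	return Identifier(net.JoinHostPort(ipAddr, port))
}

// identifierToAddr splits an Identifier back into IP address and port.
// It returns an error when the port separator is missing.
func identifierToAddr(iden Identifier) (ipAddr, port string, err error) {
	return net.SplitHostPort(string(iden))
}

func main() {
	id := addrToIdentifier("212.189.35.68", "3005")
	fmt.Println(id)

	ip, port, err := identifierToAddr(id)
	fmt.Println(ip, port, err)
}
```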

type Node

type Node struct {
	IpAddr         string
	Port           string
	Status         State
	Tracking       bool
	MaxRetry       uint8
	Delay          uint8
	RTT            float32
	InitialConnect time.Time
	LastConnected  time.Time
	// contains filtered or unexported fields
}

func CreateNode

func CreateNode(ipAddr, port string, maxRetry, delay uint8) *Node

Create a node to track

func PickThree

func PickThree() []*Node

type Pulse

type Pulse struct {
	Id string
	// contains filtered or unexported fields
}

Pulse implements the FullNode interface. It can act as a Coordinator or a Pulser independently, or as a full node with the gossip protocol.

func Initialize

func Initialize(capacity int) (*Pulse, chan FailureMessage, error)

func (*Pulse) AddPulser

func (p *Pulse) AddPulser(ipAddr, port string, maxRetry, delay uint8, wg sync.WaitGroup) error

AddPulser adds the pulser to the map of monitored nodes and immediately starts sending pulses. maxRetry is the number of times to re-send the UDP message before declaring the node dead; delay is the number of seconds to wait between messages.

func (*Pulse) Gossip

func (p *Pulse) Gossip()

func (*Pulse) RemovePulser

func (p *Pulse) RemovePulser(ipAddr, port string) error

RemovePulser removes the pulser from the map of monitored nodes and immediately stops sending pulses.

func (*Pulse) StartAllPulser

func (p *Pulse) StartAllPulser() error

Collectively start all pulsers added to monitor list

func (*Pulse) StartPulseRes

func (p *Pulse) StartPulseRes(ctx context.Context, cancel context.CancelFunc, ipAddr, port string) error

Start responding to pulse messages

func (*Pulse) Status

func (p *Pulse) Status(id Identifier) (*Status, error)

Get the current status of a specific Node identified by Identifier

func (*Pulse) StatusAll

func (p *Pulse) StatusAll() ([]*Status, error)

Get the current status of all Nodes

func (*Pulse) StopAllPulser

func (p *Pulse) StopAllPulser()

func (*Pulse) StopPulseRes

func (p *Pulse) StopPulseRes()

type PulseResponse

type PulseResponse struct {
	Id       uint8
	Addr     string
	Message  string
	Optional interface{}
}

Response struct to send upon receiving a pulse request

type Pulser

type Pulser interface {
	// Starts responding to the pulse requests on IP Address: ipAddr and Port: port
	StartPulseRes(ipAddr, port string) error

	// Stops responding to pulse requests
	StopPulseRes()
}

Pulser is a node that responds to pulse requests

type State

type State int
const (
	// Received a pulse directly or indirectly within a time bound
	Alive State = iota
	// Received no pulse directly within a time bound or received gossip message declaring dead
	Dead
	// State assumed when a node is first discovered and contacted
	Pending
	// Suspect state, awaiting time bound to officially announce as dead
	Suspect
)
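Because State is a plain int, printing it yields a number. The page does not show whether the package defines a String method, so the one below is a hypothetical addition that shows how the four values could be made readable in logs.

```go
package main

import "fmt"

type State int

const (
	Alive State = iota
	Dead
	Pending
	Suspect
)

// String returns a readable name for the state. This method is an
// assumption for illustration; the package may not define it.
func (s State) String() string {
	switch s {
	case Alive:
		return "Alive"
	case Dead:
		return "Dead"
	case Pending:
		return "Pending"
	case Suspect:
		return "Suspect"
	default:
		return fmt.Sprintf("State(%d)", int(s))
	}
}

func main() {
	for _, s := range []State{Alive, Dead, Pending, Suspect} {
		fmt.Printf("%d=%s\n", int(s), s)
	}
}
```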

type Status

type Status struct {
	Id            Identifier
	State         State
	RTT           float32
	LastConnected time.Time
}
