Documentation ¶
Index ¶
- Variables
- func HttpAPI(p *Pulse, port int, env string)
- func IdentifierToAddr(iden Identifier) (ipAddr, port string)
- func PulseServer(ctx context.Context, addr Identifier, wg sync.WaitGroup) (net.Addr, error)
- func RequestPulse(ctx context.Context, node *Node, resp chan interface{})
- func SendPulse(ctx context.Context, node *Node, nStream chan FailureMessage, ...)
- type Cli
- type Coordinator
- type FailureMessage
- type FullNode
- type Identifier
- type Node
- type Pulse
- func (p *Pulse) AddPulser(ipAddr, port string, maxRetry, delay uint8, wg sync.WaitGroup) error
- func (p *Pulse) Gossip()
- func (p *Pulse) RemovePulser(ipAddr, port string) error
- func (p *Pulse) StartAllPulser() error
- func (p *Pulse) StartPulseRes(ctx context.Context, cancel context.CancelFunc, ipAddr, port string) error
- func (p *Pulse) Status(id Identifier) (*Status, error)
- func (p *Pulse) StatusAll() ([]*Status, error)
- func (p *Pulse) StopAllPulser()
- func (p *Pulse) StopPulseRes()
- type PulseResponse
- type Pulser
- type State
- type Status
Constants ¶
This section is empty.
Variables ¶
var (
MAGIC_BYTES = []byte("qbpulse") // Magic bytes allow clients to verify they are speaking the same protocol
)
Functions ¶
func IdentifierToAddr ¶
func IdentifierToAddr(iden Identifier) (ipAddr, port string)
func PulseServer ¶
Create a Pulse server that listens on addr Identifier and responds to pulse messages Spins up two go routines, one for listening for UDP messages, and the other to wait on ctx.Done
func RequestPulse ¶
func SendPulse ¶
Request a pulse signal to a node, if failure is suspected, choose 3 random nodes to inquire about liveliness Retry maxRetry times while waiting for average RTT between each messages. If both these methods fail to produce pulse from the suspect, mark the node as dead and send a Failure message to nStream channel.
Types ¶
type Coordinator ¶
type Coordinator interface { // Add an IP Address: ipAddr, Port: port to the monitor list and start asking for pulses AddPulser(ipAddr, port string, maxRetry int) error // Remove IP Address: ipAddr, Port: port from the monitor list and stop asking for pulses RemovePulser(ipAddr, port string) error // Collectively stop monitoring all pulsers StopAllPulser() // Collectively start all pulsers added to monitor list StartAllPulser() error // Get the current status of a specific Node identified by Identifier Status(id Identifier) (Status, error) // Get the current status of all Nodes StatusAll() ([]*Status, error) }
Coordinator is a node that requests pulse response from a map of nodes
type FailureMessage ¶
type FailureMessage struct { Id Identifier State State RetryAttempts uint8 GossipPulse []Identifier InitialConnect time.Time LastConnected time.Time }
Message struct to send to notify channel when a node is determined to have failed
type FullNode ¶
type FullNode interface { Pulser Coordinator Gossip() }
Pulser and Coordinator functions are indepedent, but can also be used together as a Full node
type Identifier ¶
type Identifier string
Identifier is ipv4:port format (eg. 212.189.35.68:3005)
func AddrToIdentifier ¶
func AddrToIdentifier(ipAddr, port string) Identifier
type Node ¶
type Node struct { IpAddr string Port string Status State Tracking bool MaxRetry uint8 Delay uint8 RTT float32 InitialConnect time.Time LastConnected time.Time // contains filtered or unexported fields }
func CreateNode ¶
Create a node to track
type Pulse ¶
type Pulse struct { Id string // contains filtered or unexported fields }
Pulse implements the FullNode interface, It can act as a Coordinator or Pulser indenpendently, Or it can act as a Full Node with gossip protocol
func Initialize ¶
func Initialize(capacity int) (*Pulse, chan FailureMessage, error)
func (*Pulse) AddPulser ¶
Add the pulser to the map of nodes to monitor and immediately start sending pulses maxRetry: the number of times to re-send the udp message before declaring the node dead delay: the number of seconds to delay between each message
func (*Pulse) RemovePulser ¶
Remove the pulser to the map of nodes to monitor and immediately stop sending pulses
func (*Pulse) StartAllPulser ¶
Collectively start all pulsers added to monitor list
func (*Pulse) StartPulseRes ¶
func (p *Pulse) StartPulseRes(ctx context.Context, cancel context.CancelFunc, ipAddr, port string) error
Start responding to pulse messages
func (*Pulse) Status ¶
func (p *Pulse) Status(id Identifier) (*Status, error)
Get the current status of a specific Node identified by Identifier
func (*Pulse) StopAllPulser ¶
func (p *Pulse) StopAllPulser()
func (*Pulse) StopPulseRes ¶
func (p *Pulse) StopPulseRes()
type PulseResponse ¶
Response struct to send upon receiving a pulse request
type Pulser ¶
type Pulser interface { // Starts responding to the pulse requests on IP Address: ipAddr and Port: port StartPulseRes(ipAddr, port string) error // Stops responding to pulse requests StopPulseRes() }
Pulser is a node that responds to pulse requests
type State ¶
type State int
const ( // Received a pulse directly or indirectly within a time bound Alive State = iota // Received no pulse directly within a time bound or received gossip message declaring dead Dead // State assumed when a node is first discovered and contacted Pending // Suspect state, awaiting time bound to officially announce as dead Suspect )