cluster

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2022 License: MIT Imports: 12 Imported by: 0

README

cluster

Reworked version of Dinghy

Cluster implements leader election using part of the raft protocol. It might be useful if you have several workers but only want one of them at a time doing things.

package main

import (
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"strings"

	"github.com/dmitry-kovalev/cluster"
)

func main() {
	addr := flag.String("addr", "localhost:8899", "The address to listen on.")
	nodesList := flag.String("nodes", "localhost:8899,localhost:8898,localhost:8897", "Comma separated list of host:port")
	flag.Parse()

	nodes := strings.Split(*nodesList, ",")

	onLeader := func() error {
		fmt.Println("leader")
		return nil
	}
	onFollower := func() error {
		fmt.Println("me follower")
		return nil
	}

	cl, err := cluster.New(
		nodes,
		onLeader,
		onFollower,
		&cluster.LogLogger{Logger: log.New(os.Stderr, "logger: ", log.Lshortfile)},
		cluster.DefaultElectionTickRange,
		cluster.DefaultHeartbeatTickRange,
	)
	if err != nil {
		log.Fatal(err)
	}
	for _, route := range cl.Routes() {
		http.HandleFunc(route.Path, route.Handler)
	}
	go func() {
		if err := cl.Start(); err != nil {
			log.Fatal(err)
		}
	}()
	log.Fatal(http.ListenAndServe(*addr, nil))
}

You can test cluster with several nodes using docker-compose

$ export LOCAL_IP=192.168.1.138
$ docker compose up
[+] Running 4/4
 ⠿ Network cluster_default    Created                                                                                                                                                              3.8s
 ⠿ Container cluster_node3_1  Created                                                                                                                                                              0.1s
 ⠿ Container cluster_node1_1  Created                                                                                                                                                              0.1s
 ⠿ Container cluster_node2_1  Created                                                                                                                                                              0.1s
Attaching to node1_1, node2_1, node3_1
node2_1  | 14:19:40 [INFO] [cluster] Creating cluster node with nodes: [192.168.1.138:8899 192.168.1.138:8898 192.168.1.138:8897]
node2_1  | 14:19:40 [INFO] [cluster] Starting cluster node
node2_1  | 14:19:40 [INFO] [cluster] Current state is {"id":3626187060,"leader_id":0,"state":"follower","term":1,"voted_for":0}
node2_1  | 14:19:40 [INFO] [cluster] Entering follower state, leader id 0
node2_1  | 14:19:40 [INFO] Me follower
node3_1  | 14:19:41 [INFO] [cluster] Creating cluster node with nodes: [192.168.1.138:8899 192.168.1.138:8898 192.168.1.138:8897]
node3_1  | 14:19:41 [INFO] [cluster] Starting cluster node
node3_1  | 14:19:41 [INFO] [cluster] Current state is {"id":3776595137,"leader_id":0,"state":"follower","term":1,"voted_for":0}
node3_1  | 14:19:41 [INFO] [cluster] Entering follower state, leader id 0
node3_1  | 14:19:41 [INFO] Me follower
node2_1  | 14:19:42 [INFO] [cluster] Follower heartbeat timeout, transitioning to candidate
node2_1  | 14:19:42 [INFO] [cluster] Current state is {"id":3626187060,"leader_id":0,"state":"candidate","term":2,"voted_for":0}
node2_1  | 14:19:42 [INFO] [cluster] Entering candidate state
node2_1  | 14:19:42 [INFO] [cluster] Requesting vote
node2_1  | 14:19:42 [INFO] [cluster] Got RequestVote request, voting for candidate id 3626187060 {"id":3626187060,"leader_id":0,"state":"candidate","term":2,"voted_for":0}
node3_1  | 14:19:42 [INFO] [cluster] got RequestVote request from newer term, stepping down {Term:2 CandidateID:3626187060 NodeID:3626187060} {"id":3776595137,"leader_id":0,"state":"follower","term":1,"voted_for":0}
node3_1  | 14:19:42 [INFO] [cluster] Got RequestVote request, voting for candidate id 3626187060 {"id":3776595137,"leader_id":0,"state":"follower","term":2,"voted_for":0}
node2_1  | 14:19:42 [INFO] [cluster] Election won with 2 votes, becoming leader {"id":3626187060,"leader_id":0,"state":"candidate","term":2,"voted_for":3626187060}
node2_1  | 14:19:42 [INFO] [cluster] Candidate state changed to leader
node2_1  | 14:19:42 [INFO] [cluster] Current state is {"id":3626187060,"leader_id":3626187060,"state":"leader","term":2,"voted_for":3626187060}
node2_1  | 14:19:42 [INFO] [cluster] Entering leader state
node2_1  | 14:19:42 [INFO] I'm leader
node1_1  | 14:19:43 [INFO] [cluster] Creating cluster node with nodes: [192.168.1.138:8899 192.168.1.138:8898 192.168.1.138:8897]
node1_1  | 14:19:43 [INFO] [cluster] Starting cluster node
node1_1  | 14:19:43 [INFO] [cluster] Current state is {"id":3070019158,"leader_id":0,"state":"follower","term":1,"voted_for":0}
node1_1  | 14:19:43 [INFO] [cluster] Entering follower state, leader id 0
node1_1  | 14:19:43 [INFO] Me follower
node1_1  | 14:19:43 [INFO] [cluster] Got AppendEntries request from newer term, stepping down {Term:2 LeaderID:3626187060 NodeID:3626187060} {"id":3070019158,"leader_id":0,"state":"follower","term":1,"voted_for":0}

Documentation

Overview

Package cluster is a leader election mechanism that follows the raft protocol https://raft.github.io/ using http as the transport, up until the use of replicated log. For a more complete raft implementation use https://godoc.org/github.com/coreos/etcd/raft or https://github.com/hashicorp/raft

Leader Election

The process begins with all nodes in FOLLOWER state and waiting for an election timeout. This timeout is recommended to be randomized between 150ms and 300ms. In order to reduce the amount of traffic this will be increased to ElectionTickRange.

After the election timeout, the FOLLOWER becomes a CANDIDATE and begins an election term. It starts by voting for itself and then sends out a RequestVote to all other nodes.

If the receiving nodes haven't voted yet, they will then vote for the candidate with a successful request. The current node will reset it's election timeout and when a candidate has a majority vote it will become a LEADER.

At this point the LEADER will begin sending an AppendEntries request to all other nodes at a rate specified by the heartbeat timeout. The heartbeat timeout should be shorter than the election timeout, preferably by a factor of 10. Followers respond to the AppendEntries, and this term will continue until a follower stops receiving a heartbeat and becomes a candidate.

There is a case where two nodes can be come candidates at the same time, which is referred to a split vote. In this case two nodes will start an election for the same term and each reaches a single follower node before the other. In this case each candidate will have two votes with no more available for the term and no majority. A new election will happen and finally a candidate will become a LEADER. This scenario can happen with an even number of nodes.

Index

Constants

View Source
const (
	// RouteAppendEntries for append entries requests.
	RouteAppendEntries = "/appendentries"
	// RouteID for id requests.
	RouteID = "/id"
	// RouteRequestVote for request vote requests.
	RouteRequestVote = "/requestvote"
	// RouteStatus will render the current nodes full state.
	RouteStatus = "/status"
	// RouteStepDown to force a node to step down.
	RouteStepDown = "/stepdown"
)
View Source
const (
	// UnknownLeaderID is set when a new election is in progress.
	UnknownLeaderID = uint32(0)
	// NoVote is set to represent the node has not voted.
	NoVote = uint32(0)
	// StateCandidate represents the raft candidate state
	StateCandidate = iota
	// StateFollower represents the raft follower state
	StateFollower
	// StateLeader represents the raft leader state
	StateLeader
)

Variables

View Source
var (
	// DefaultOnLeader is a no op function to execute when a node becomes a leader.
	DefaultOnLeader = func() error { return nil }
	// DefaultOnFollower is a no op function to execute when a node becomes a follower.
	DefaultOnFollower = func() error { return nil }
	// DefaultRoutePrefix is what is prefixed for the cluster routes. (/cluster)
	DefaultRoutePrefix = "/cluster"
	// ErrTooFewVotes happens on a RequestVote when the candidate receives less than the
	// majority of votes.
	ErrTooFewVotes = errors.New("too few votes")
	// ErrNewElectionTerm if during RequestVote there is a higher term found.
	ErrNewElectionTerm = errors.New("newer election term")
	// ErrLeader is returned when an operation can't be completed on a
	// leader node.
	ErrLeader = errors.New("node is the leader")
	// ErrNotLeader is returned when an operation can't be completed on a
	// follower or candidate node.
	ErrNotLeader = errors.New("node is not the leader")
)
View Source
var (
	// DefaultElectionTickRange will set the range of numbers for the election timeout. For example
	// a value of 1500 will first hash the input Addr to a number from 0 to 1500 and then
	// add that 1500 to give a result between 1500 and 3000
	DefaultElectionTickRange = 4000
	// DefaultHeartbeatTickRange will set the range of numbers for the heartbeat timeout.
	DefaultHeartbeatTickRange = 1000
)

Functions

This section is empty.

Types

type AppendEntriesRequest

type AppendEntriesRequest struct {
	Term     int    `json:"term"`
	LeaderID uint32 `json:"leader_id"`
	NodeID   uint32 `json:"node_id"`
}

AppendEntriesRequest represents AppendEntries requests. Replication logging is ignored.

type ApplyFunc

type ApplyFunc func() error

ApplyFunc is for on leader and on follower events.

type Cluster

type Cluster struct {

	// Nodes is a list of all nodes for consensus.
	Nodes []string
	// OnLeader is an optional function to execute when becoming a leader.
	OnLeader func() error
	// OnFollower is an optional function to execute when becoming a follower.
	OnFollower func() error
	// State for holding the raft state.
	State *State
	// contains filtered or unexported fields
}

Cluster manages the raft FSM and executes OnLeader and OnFollower events.

func New

func New(nodes []string, onLeader, onFollower ApplyFunc, l Logger, eMS, hMS int) (*Cluster, error)

New initializes a new cluster. Start is required to be run to begin leader election.

func (*Cluster) AppendEntriesHandler

func (c *Cluster) AppendEntriesHandler() func(w http.ResponseWriter, r *http.Request)

AppendEntriesHandler (POST) handles append entry requests

func (*Cluster) AppendEntriesRequest

func (c *Cluster) AppendEntriesRequest() (int, error)

AppendEntriesRequest will broadcast an AppendEntries request to peers. In the raft protocol this deals with appending and processing the replication log, however for leader election this is unused. It returns the current term with any errors.

func (*Cluster) BroadcastRequest

func (c *Cluster) BroadcastRequest(peers []string, method, route string, body []byte, timeoutMS int) []*http.Response

BroadcastRequest will send a request to all other nodes in the system.

func (*Cluster) IDHandler

func (c *Cluster) IDHandler() func(w http.ResponseWriter, r *http.Request)

IDHandler (GET) returns the nodes id.

func (*Cluster) RequestVoteHandler

func (c *Cluster) RequestVoteHandler() func(w http.ResponseWriter, r *http.Request)

RequestVoteHandler (POST) handles requests for votes

func (*Cluster) RequestVoteRequest

func (c *Cluster) RequestVoteRequest() (int, error)

RequestVoteRequest will broadcast a request for votes in order to update node to either a follower or leader. If this candidate becomes leader error will return nil. The latest known term is always returned (this could be a newer term from another peer).

func (*Cluster) Routes

func (c *Cluster) Routes() []*Route

Routes create the routes required for leader election.

func (*Cluster) Start

func (c *Cluster) Start() error

Start begins the leader election process.

func (*Cluster) StatusHandler

func (c *Cluster) StatusHandler() func(w http.ResponseWriter, r *http.Request)

StatusHandler (GET) returns the nodes full state.

func (*Cluster) StepDownHandler

func (c *Cluster) StepDownHandler() func(w http.ResponseWriter, r *http.Request)

StepDownHandler (POST) will force the node to step down to a follower state.

func (*Cluster) Stop

func (c *Cluster) Stop() error

Stop will stop any running event loop.

type DiscardLogger

type DiscardLogger struct {
}

DiscardLogger is a noop logger.

func (*DiscardLogger) Debugf

func (d *DiscardLogger) Debugf(format string, v ...interface{})

Debugf noop

func (*DiscardLogger) Errorf

func (d *DiscardLogger) Errorf(format string, v ...interface{})

Errorf noop

func (*DiscardLogger) Errorln

func (d *DiscardLogger) Errorln(v ...interface{})

Errorln noop

func (*DiscardLogger) Infof

func (d *DiscardLogger) Infof(format string, v ...interface{})

Infof noop

func (*DiscardLogger) Printf

func (d *DiscardLogger) Printf(format string, v ...interface{})

Printf noop

func (*DiscardLogger) Println

func (d *DiscardLogger) Println(v ...interface{})

Println noop

func (*DiscardLogger) Tracef added in v0.1.1

func (d *DiscardLogger) Tracef(format string, v ...interface{})

Tracef noop

type LogLogger

type LogLogger struct {
	Logger *log.Logger
}

LogLogger uses the std lib logger.

func (*LogLogger) Debugf

func (d *LogLogger) Debugf(format string, v ...interface{})

Debugf std lib

func (*LogLogger) Errorf

func (d *LogLogger) Errorf(format string, v ...interface{})

Errorf std lib

func (*LogLogger) Errorln

func (d *LogLogger) Errorln(v ...interface{})

Errorln std lib

func (*LogLogger) Infof

func (d *LogLogger) Infof(format string, v ...interface{})

Infof std lib

func (*LogLogger) Printf

func (d *LogLogger) Printf(format string, v ...interface{})

Printf std lib

func (*LogLogger) Println

func (d *LogLogger) Println(v ...interface{})

Println std lib

func (*LogLogger) Tracef added in v0.1.1

func (d *LogLogger) Tracef(format string, v ...interface{})

Debugf std lib

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
	Println(v ...interface{})
	Infof(format string, v ...interface{})
	Debugf(format string, v ...interface{})
	Tracef(format string, v ...interface{})
	Errorf(format string, v ...interface{})
	Errorln(v ...interface{})
}

Logger is for logging to a writer. This is not the raft replication log.

type Route

type Route struct {
	Path    string
	Handler func(http.ResponseWriter, *http.Request)
}

Route holds path and handler information.

type State

type State struct {
	// contains filtered or unexported fields
}

State encapsulates the current nodes raft state.

func NewState

func NewState(id uint32, electionTimeoutMS, heartbeatTimeoutMS int) *State

NewState initializes a new raft state.

func (*State) AppendEntriesEvent

func (s *State) AppendEntriesEvent(event ...*AppendEntriesRequest) chan *AppendEntriesRequest

AppendEntriesEvent returns a channel for any successful append entries events.

func (*State) ElectionTick

func (s *State) ElectionTick() <-chan time.Time

ElectionTick returns a channel with a new random election tick.

func (*State) HeartbeatReset

func (s *State) HeartbeatReset(reset ...bool) <-chan struct{}

HeartbeatReset will signal a reset. This works with a listener for HeartbeatTick.

func (*State) HeartbeatTick

func (s *State) HeartbeatTick() <-chan time.Time

HeartbeatTick returns a channel with a heartbeat timeout set to heartbeatTimeoutMS.

func (*State) HeartbeatTickRandom

func (s *State) HeartbeatTickRandom() <-chan time.Time

HeartbeatTickRandom returns a channel with a random heartbeat timeout. 500ms is added to the minimum heartbeatTimeoutMS to compensate for possible network latency.

func (*State) ID

func (s *State) ID() uint32

ID returns the nodes id.

func (*State) LeaderID

func (s *State) LeaderID(id ...uint32) uint32

LeaderID will return the states current leader id or if an argument is passed in will set the current LeaderID.

func (*State) State

func (s *State) State(state ...int) int

State will return the states current state or if an argument is passed in will set the state

func (*State) StateChanged

func (s *State) StateChanged() chan int

StateChanged returns a channel for any state changes that occur.

func (*State) StateString

func (s *State) StateString(state int) string

StateString returns the current state as a string.

func (*State) StepDown

func (s *State) StepDown(term int)

StepDown will step down the state by resetting to the given term and emitting a state change.

func (*State) String

func (s *State) String() string

String will return the current states fields for debugging.

func (*State) Term

func (s *State) Term(term ...int) int

Term will return the states current term or if an argument is passed in will set the term

func (*State) VotedFor

func (s *State) VotedFor(votedFor ...uint32) uint32

VotedFor will return the states current vote or if an argument is passed in will set the vote

type Status

type Status struct {
	ID       uint32 `json:"id"`
	LeaderID uint32 `json:"leader_id"`
	State    string `json:"state"`
	Term     int    `json:"term"`
	VotedFor uint32 `json:"voted_for"`
}

Status is used to show the current states status.

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL