swim

package module
v0.0.0-...-5713405 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2019 License: Apache-2.0 Imports: 20 Imported by: 0

README

SWIM: Scalabe Weakly-consistent Infection-style Process Group Membership Protocol

Build Status License Language Coverage Status

SWIM Paper

많은 분산 P2P(peer-to-peer) 어플리케이션은 모든 참여하는 Process에 대해 weakly-consistent한 Process 그룹 구성원 정보가 필요하다. SWIM은 대규모 프로세스 그룹에서 그룹 구성원 정보 서비스를 제공하는 범용 소프트웨어 모듈이다. SWIM은 전통적인 heart-beating 프로토콜의 unscalability를 극복하는 것을 목표로 한다. 전통적인 heart-beating protocol과는 다르게, SWIM은 membership protocol에서 failure detection과 membership 업데이트 기능을 분리하였다.

Basic SWIM Approach

SWIM은 크게 2개의 컴포넌트로 구성된다.

  • A Failure Detector Component: 멤버들의 failure를 감지
  • Dissemination Component: 최근에 fail, join, or left한 멤버들의 정보를 전파
SWIM Failure Detector

SWIM failure Detector 알고리즘은 2개의 파라미터를 사용한다: T(protocol period)k(the size of failure detection subgroups)

T 마다 Failure Detection을 Figure 1 같이 수행한다. Figure 1은 임의의 노드 M_i를 대상으로 Failure Detection 알고리즘을 보여준다.

  1. M_iT시간 마다 membership List에서 랜덤으로 member(M_j)를 하나 고르고, M_j 에게 ping을 보낸다.

  2. M_i가 ack timeout 시간(T 보다 작은 시간) 동안 M_jack를 기다린다.

    1. ack 메세지가 올 경우 종료
    2. ack 메세지가 오지 않을 경우 3 수행
  3. M_i는 mebership List에서 k개의 member를 고르고, ping-req(M_j)를 보낸다.

  4. ping-req(M_j)메세지를 받은 노드들은 M_j에게 ping을 보내고, ack를 받으면 M_i에게 전달한다.

  5. T 가 끝날때 쯤, M_iM_j로 부터 혹은 k개의 member로 부터 ack메세지를 받았는지 확인하고, 메세지가 없을 경우 M_j는 fail되었다 판단하여 Dissemination component에게 memberlist update를 요청한다.

ack timeout은 네트워크 안에서의 round-trip을 기반으로 결정한다(평균 혹은 99%를 커버 하도록), T는 최소 round-trip보다 3배 이상 이어야 한다(논문에서는 평균 round-trip시간을 timeout으로 설정하였고, T는 평균 round-trip시간보다 훨씬 높은 값으로 설정하였다)

각 Message의 data는 unique sequence number를 가지고 있다. ping, ping-req, ack 메세지의 사이즈는 constant며 group의 size와는 관련이 없다.

M_iM_j에게 k번의 ping을 날리지 않고 indirect하게 M_i의 다른 member에게 요청하는 이유는, M_iM_j사이의 네트워크의 혼잡성을 피하기 위함이다.

Dissemination Component and Dynamic Membership

Group member(M_j)의 fail을 감지하면, 노드(M_i1)는 Group의 다른 member에게 failed(M_j) 를 multicast한다. failed(M_j)를 받은 member들은 local membership List에서 M_j를 제거한다.

새로운 노드가 들어오거나 자발적으로 나가게 되면, 해당 정보를 비슷한 방식으로 multicast한다. 그러나 어떤 노드가 Group에 들어오기 위해서는 그룹의 최소한 하나의 member를 알아야 한다. 그 방법은 다음과 같다.

  • Group이 well known server와 관련이 있으면, 모든 join은 해당 address를 통해 이루어 질 수 있다.
  • Join 메세지를 broadcast하고 각 group의 member들은 확률적으로 reply 한다.
  • Group별로Join메세지를 전용으로 처리하는 Static coordinator를 둔다.

A More Robust and Efficient SWIM

일반적인 multicast 방식의 Dissemination은 패킷의 전달을 보장하지 않는 best-effort 방식이다. 왜냐하면, 패킷의 손실로 group member 간의 상관관계가 없는 임의의 member가 전달될 수 있기 때문이다.

하지만 SWIM에서는 multicast 방식이 아니라 Infection style을 사용한다. Infection style은 multicast 방식보다 데이터 전파가 많이 되지만, SWIM에서는 failure detection protocol의 membership update 정보를 ping, ping-req, ack 메시지에 같이 넣어서 보내는 piggyback 방식을 사용하여 기존 dissemination에서 발생하는 extra packet을 없앨 수 있게했다.

SWIM은 이렇게 failure detection과 membership update dissemination component를 분리하여 message overhead를 constant expected 하게 했고 이것은 packet 손실에 있어서 robustness하고 low latency를 제공한다.

Infection-style dissemination mechanism

Dissemination Component는 Failure Detector Protocol로부터 ping, ack 메시지를 받을 때마다 piggyback한다. 이 때 piggyback을 통해서 Dissemination Component는 불필요한 패킷들을 네트워크 내에서 날리지 않는다. SWIM에서 발생하는 패킷은 ping, ping-req, ack 메시지들에 의해서만 발생한다. 그리고 이것을 우리는 infection-style dissemination mechanism이라고 부른다.

Implementation Hint

각 그룹의 멤버 M_i는 버퍼를 가지고 있는데

  • 버퍼에는 최근에 발생한 membership update 정보들을 담고 있다.
  • 그리고 버퍼에 자신(M_i)이 주위 멤버들에게 piggyback한 횟수를 담고 있는 local count의 정보를 담는다. 그리고 이 local count로 자신이 다음에 누구에게 piggyback 할지를 결정할 수 있다.
  • 버퍼의 사이즈가 한 멤버가 ping (또는 ack)을 piggyback 할 수 있는 최대 횟수보다 크다면(충분히 크다면) 다음으로 piggyback 할 멤버는 local count가 작은 멤버로 한다.

그리고 두 개의 그룹 멤버 리스트를 가지고 있다.

  • 하나에는 아직 그룹에서 “failed” 되지 않은 멤버들을 담고 있고
  • 다른 하나에는 현재 “failed” 된 멤버들을 담고 있다.
Suspicion mechanism

만약 프로세스 M이 오버헤드가 큰 작업을 하고 있다면, ping을 받아도 오버헤드가 큰 작업 때문에 제 시간에 응답할 수 없을 것이다. 그렇다면 그룹 내의 M을 제외한 다른 프로세스들은 ping에 대한 M의 응답을 받지 못했기 때문에 정상 프로세스(non-faulty)를 잘못됨(faulty)으로 처리하고 네트워크에서 제외시킬 수 있다.

이렇게 non-faulty 프로세스를 faulty 프로세스로 잘못 처리하는 것을 방지하기 위해서 SWIM은 Suspicion 메커니즘을 도입했다. Suspicion 메커니즘은 non-faulty 프로세스가 ping을 제시간에 응답을 보내지 못하였을 때 바로 네트워크에서 제외시키는 것이 아니라 해당 프로세스를 “suspected”라고 체크하는 것이다. 그리고 해당 프로세스가 “suspected”가 되었다고 Dissemination Component를 통해서 전파한다.

이렇게 “suspected”로 선언된 프로세스가 일정시간이 지나도 ping에 대한 응답을 보내지 못했다면 그 때 비로소 “faulty”로 해당 프로세스를 처리하고 네트워크에서 제외시킨다. 그런데 일정시간이 지나기 전에 프로세스가 ping에 대한 응답을 준다면 “suspected” 된 프로세스의 상태를 “alive”로 바꾸고 해당 프로세스가 “alive” 되었다는 사실을 네트워크에 전파한다.

How Suspicion Protocol Works

Suspicion 프로토콜은 다음과 같이 동작한다. 예를 들어 현재 SWIM failure detector protocol period T 시간동안 멤버 M_i가 다른 멤버 M_jping 타겟으로 정하고 보냈다고 생각하자. 이때 M_iM_j에 대한 ack를 받지 못해도, M_iM_j를 바로 “failed”라고 선언하지 않는다. 대신 M_j를 “Suspected” 멤버라고 로컬 멤버십 리스트에 마크한다. 그리고

{Suspect M_j: M_j suspects M_j}

와 같은 형태의 메시지가 Dissemination Component를 통해서 infection-style로 전파된다. 그리고 전파된 메시지를 받은 멤버 M_iM_j를 “Suspected”로 마크한다. 그렇지만 “Suspected” 멤버는 여전히 정상적인(non-faulty) 멤버들과 똑같이 SWIM failure detector protocol에서 ping 타겟으로 삼을 수 있다.

protocol period T시간동안 M_iM_j에게 ping을 보내는데 성공했다면 로컬 멤버십 리스트의 M_j에 대한 “Suspected” 마크를 지운다. 그리고

{Alive M_j: M_l knows M_j is alive}

와 같은 형태의 메시지를 Dissemination Component를 통해서 전파한다. 그리고 이런 Alive 메시지를 다른 멤버들이 받으면 똑같이 M_j 의 “Suspected” 마크를 지운다.

M_j 입장에서 다른 멤버로부터 M_j를 “Suspected”하고 있다는 메시지를 받으면 M_j는 “Alive” 메시지를 다른 멤버들에게 전파할 수 있다.

멤버십 리스트에서 “Suspected” 멤버 리스트는 일정 시간이 지나면 expired된다. 예를 들어 M_hM_j를 “Suspected” 리스트에 가지고 있을 때, 일정 시간이 지날 동안 M_j로부터 “Alive” 메시지를 받지 못한다면 M_hM_j를 잘못된(faulty) 멤버라고 생각하고 로컬 멤버십 리스트에서 제외한다. 그리고 M_j

{Confirm M_j: M_h declares M_j as faulty}

와 같은 형태의 메시지를 Dissemination Component를 통해서 전파한다. 이와 같은 Suspicion 프로토콜을 통해서 failure detection의 false positives의 비율을 낮출 수 있다.

Message Override Rule

위의 예시에서 살펴보면 “Alive” 메시지는 “Suspect” 메시지를 오버라이드할 수 있다. 마찬가지로 “Confirm” 메시지는 “Suspect”메시지와 “Alive” 메시지를 오버라이드 할 수 있다. 그런데 메시지 오버라이드 규칙을 정하기위해서 메시지에 메시지 종류뿐만아니라 incarnation number도 추가하여야하는데 이는 멤버의 라이프사이클 동안 여러번 메시지를 받을 수 있기 때문이다.

첫 번째로 받은 “Alive” 메시지와 두 번째로 받은 “Alive” 메시지를 구분해야 하는데 이를 incarnation number을 통해서 하게된다. incarnation number는 글로벌하다. 각 멤버 M_iincarnation number은 그룹에 처음 들어오게되면 0으로 초기화된다. 그리고 Dissemination Component로부터 M_i 자신이 현재 incarnation에서 “Suspected” 되고 있다면 M_iincarnation number를 증가시키고 “Alive” 메시지를 Dissemination Component를 통해서 그룹에 전파하게된다.

이와 같이 “Suspect”, “Alive”, “Confirm” 메시지는 각 멤버의 incarnation number, 멤버의 id를 포함하고 있다. 그래서 메시지 별 선호 규칙과 그것이 어떻게 멤버십 리스트에 영향을 미치는지는 다음과 같다.

* {Alive Ml, inc=i} overrides
    - {Suspect Ml, inc=j}, i>j
    - {Alive Ml, inc=j}, i>j

* {Suspect Ml, inc=i} overrides
    - {Suspect Ml, inc=j}, i>j
    - {Alive Ml, inc=j}, ij

* {Confirm Ml, inc=i} overrides
    - {Alive Ml, inc=j}, any j
    - {Alive Ml, inc=j}, any j

이와 같은 방식으로 각 메시지가 도착했을 때 각 멤버의 상태를 수정한다.

Round-Robin Probe Target Selection

SWIM Failure detector 알고리즘은 그룹 내에서 첫 번째로 문제가 생긴 멤버를 찾아 낼 때 큰 딜레이가 생길 수 있고, 심한 경우에는 문제가 생긴 멤버를 ping 타겟으로 삼지 못해 멤버십의 멤버로 찾아내지 못할 수도 있다. 이러한 문제를 해결하기 위해서 각 멤버 M_i는 현재 멤버십에서 알고 있는 멤버들의 리스트를 들고 있고 이 중에서 ping 타겟을 랜덤으로 뽑는 것이 아니라 round-robin 방식으로 뽑는다. 전체 리스트를 한 바퀴 다 돌면 M_i는 다시 리스트를 랜덤으로 재배열한다.

이런 방식으로 ping을 보내게 되면 M_i이 가지고 있는 멤버십 리스트를 한바퀴 다 돌 때 모든 멤버들을 한 번씩은 호출하게 된다. 그래서 문제가 생긴 멤버가 생기더라도 그 멤버를 찾는데 걸리는 시간의 worst case를 M_i이 가지고 있는 멤버의 수로 한정할 수 있다.

License

Project source code files are made available under the Apache License, Version 2.0 (Apache-2.0), located in the LICENSE.

CLA Hub

To get started, sign the Contributor License Agreement.

Documentation

Index

Constants

View Source
const (
	AVAILABLE status = iota
	DIE
)
View Source
const (
	InitialPriority = 0
)

Variables

View Source
var ErrCallbackCollectIntervalNotSpecified = errors.New("Error callback collect interval should be specified")
View Source
var ErrCreatingSuspicion = errors.New("error occurred while creating suspicion")
View Source
var ErrEmptyMemberID = errors.New("MemberID is empty")
View Source
var ErrIndProbeFailed = errors.New("error when indirect-probe failed")
View Source
var ErrInvalidMbrStatsMsgType = errors.New("error invalid mbrStatsMsg type")
View Source
var ErrInvalidMessage = errors.New("Error invalid message")
View Source
var ErrInvalidPayloadType = errors.New("invalid indirect-ping response payload type")
View Source
var ErrMemberUnknownState = errors.New("error member is in unknown state")
View Source
var ErrPingFailed = errors.New("error ping failed")
View Source
var ErrPopInvalidType = errors.New("pop invalid typed item")
View Source
var ErrSendTimeout = errors.New("Error send timeout")
View Source
var ErrStoreEmpty = errors.New("empty store")
View Source
var ErrUnreachable = errors.New("Error this shouldn't reach")

Functions

func GenerateKey

func GenerateKey(keyPath string, SecLv int) (heimdall.PriKey, heimdall.PubKey)

if the key already exists, create a new one, or load the existing key.

func GenerateNewKey

func GenerateNewKey(keyPath string, SecLv int) (heimdall.PriKey, heimdall.PubKey)

func GetNodeID

func GetNodeID(keyPath string) string

func LoadKeyPair

func LoadKeyPair(keyPath string) (heimdall.PriKey, heimdall.PubKey)

func NewPacketListener

func NewPacketListener(ip net.IP, port int) (*net.UDPConn, error)

func ParseHostPort

func ParseHostPort(address string) (net.IP, uint16, error)

Types

type AliveMessage

type AliveMessage struct {
	MemberMessage
}

type Awareness

type Awareness struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Awareness manages health of the local node. This related to Lifeguard L1 "self-awareness" concept. Expect to receive replies to probe messages we sent Start timeouts low, increase in response to absence of replies

func NewAwareness

func NewAwareness(max int) *Awareness

func (*Awareness) ApplyDelta

func (a *Awareness) ApplyDelta(delta int)

ApplyDelta with given delta applies it to the score in thread-safe manner score must be bound from 0 to max value

func (*Awareness) GetHealthScore

func (a *Awareness) GetHealthScore() int

func (*Awareness) ScaleTimeout

func (a *Awareness) ScaleTimeout(timeout time.Duration) time.Duration

ScaleTimeout takes the given duration and scales it based on the current score. Less healthyness will lead to longer timeouts.

type Config

type Config struct {

	// The maximum number of times the same piggyback data can be queried
	MaxlocalCount int

	// The maximum number of node-self-awareness counter
	MaxNsaCounter int

	// T is the the period of the probe
	T int

	// Timeout of ack after ping to a member
	AckTimeOut int

	// K is the number of members to send indirect ping
	K int

	// my address and port
	BindAddress string
	BindPort    int
}

type DefaultMessageEndpoint

type DefaultMessageEndpoint struct {
	// contains filtered or unexported fields
}

MessageEndpoint basically do receiving packet and determine which logic executed based on the packet.

func (*DefaultMessageEndpoint) Listen

func (m *DefaultMessageEndpoint) Listen()

Listen is a log running goroutine that pulls packet from the transport and pass it for processing

func (*DefaultMessageEndpoint) Send

func (m *DefaultMessageEndpoint) Send(addr string, msg pb.Message) error

Send asynchronously send message to member of addr, don't wait until response come back, after response came back, callback function executed, Send can be used in the case of gossip message to other members

func (*DefaultMessageEndpoint) Shutdown

func (m *DefaultMessageEndpoint) Shutdown()

func (*DefaultMessageEndpoint) SyncSend

func (m *DefaultMessageEndpoint) SyncSend(addr string, msg pb.Message) (pb.Message, error)

SyncSend synchronously send message to member of addr, waits until response come back, whether it is timeout or send failed, SyncSend can be used in the case of pinging to other members. if @timeout is provided then set send timeout to given parameters, if not then calculate timeout based on the its awareness

type IndProbeResponse

type IndProbeResponse struct {
	// contains filtered or unexported fields
}

func (IndProbeResponse) Ok

func (r IndProbeResponse) Ok() bool

type Item

type Item struct {
	// contains filtered or unexported fields
}

An Item is something we manage in a priority queue.

type MbrStatsMsgStore

type MbrStatsMsgStore interface {
	Len() int
	Push(pbk pb.MbrStatsMsg)
	Get() (pb.MbrStatsMsg, error)
	IsEmpty() bool
}

type Member

type Member struct {
	// Id of member
	ID MemberID

	// Ip address
	Addr net.IP

	// Port
	Port uint16

	// Current member status from my point of view
	Status Status

	// Time last status change happened
	LastStatusChange time.Time

	// Incarnation helps to keep the most fresh information about member status in the system
	// which tells that suspect member confirming that it is alive, and only when suspect
	// got suspicion message, that member can increments incarnation
	Incarnation uint32

	// Suspicion manages the suspect timer and helps to accelerate the timeout
	// as member self got more independent confirmations that a target member is suspect.
	Suspicion *Suspicion
}

Struct of Member

func (*Member) Address

func (m *Member) Address() string

Convert member addr and port to string

func (*Member) GetID

func (m *Member) GetID() MemberID

Get

func (*Member) GetIDString

func (m *Member) GetIDString() string

type MemberID

type MemberID struct {
	ID string
}

func NewMemberID

func NewMemberID(s string) MemberID

type MemberMap

type MemberMap struct {
	// contains filtered or unexported fields
}

func NewMemberMap

func NewMemberMap(config *SuspicionConfig) *MemberMap

func (*MemberMap) Alive

func (m *MemberMap) Alive(aliveMessage AliveMessage) (bool, error)

Process Alive message

Return 2 parameter bool, error bool means Changed in MemberList

1. If aliveMessage Id is empty return false and ErrEmptyMemberID 2. if aliveMessage Id is not in MemberList then Create Member and Add MemberList 3. if aliveMessage Id is in MemberList and existingMember's Incarnation is bigger than AliveMessage's Incarnation Than return false, ErrIncarnation 4. if aliveMessage Id is in MemberList and AliveMessage's Incarnation is bigger than existingMember's Incarnation Than Update Member and Return true, nil

func (*MemberMap) GetMembers

func (m *MemberMap) GetMembers() []Member

func (*MemberMap) Merge

func (memberMap *MemberMap) Merge(membership *pb.Message_Membership)

func (*MemberMap) Reset

func (m *MemberMap) Reset()

Delete all dead node, Reset waiting list,

func (*MemberMap) SelectKRandomMemberID

func (m *MemberMap) SelectKRandomMemberID(k int) []Member

Select K random member (length of returning member can be lower than k).

func (*MemberMap) Suspect

func (m *MemberMap) Suspect(msg SuspectMessage) (bool, error)

Suspect handle suspectMessage, if this function update member states return true otherwise false

type MemberMessage

type MemberMessage struct {
	ID          string
	Addr        net.IP
	Port        uint16
	Incarnation uint32
}

type MessageEndpoint

type MessageEndpoint interface {
	Listen()
	SyncSend(addr string, msg pb.Message) (pb.Message, error)
	Send(addr string, msg pb.Message) error
	Shutdown()
}

func NewMessageEndpoint

func NewMessageEndpoint(config MessageEndpointConfig, transport UDPTransport, messageHandler MessageHandler) (MessageEndpoint, error)

type MessageEndpointConfig

type MessageEndpointConfig struct {
	EncryptionEnabled bool
	SendTimeout       time.Duration

	// callbackCollect Interval indicate time interval to clean up old
	// callback function
	CallbackCollectInterval time.Duration
}

type MessageHandler

type MessageHandler interface {
	// contains filtered or unexported methods
}

handler interface to handle received message

type Packet

type Packet struct {
	// Buffer is the contents of the packet.
	Buf []byte

	// From has the address of the peer. This is an actual net.Addr so we
	// can expose some concrete details about incoming packets.
	Addr net.Addr

	// Timestamp is the time when the packet was received.
	Timestamp time.Time
}

Packet is used to provide some metadata about incoming packets from peers over a packet connection

type PacketTransport

type PacketTransport struct {
	// contains filtered or unexported fields
}

PacketTransport implements Transport interface, which is used ONLY for connectionless UDP packet operations

func NewPacketTransport

func NewPacketTransport(config *PacketTransportConfig) (*PacketTransport, error)

func (*PacketTransport) PacketCh

func (t *PacketTransport) PacketCh() <-chan *Packet

func (*PacketTransport) Shutdown

func (t *PacketTransport) Shutdown() error

func (*PacketTransport) WriteTo

func (t *PacketTransport) WriteTo(b []byte, addr string) (time.Time, error)

type PacketTransportConfig

type PacketTransportConfig struct {
	// BindAddrs is representing an address to use for UDP communication
	BindAddress string

	// BindPort is the port to listen on, for the address specified above
	BindPort int
}

PacketTransportConfig is used to configure a udp transport

type PriorityMbrStatsMsgStore

type PriorityMbrStatsMsgStore struct {
	// contains filtered or unexported fields
}

PiggyBackStore stores piggyback data in the priority queue and returns data with smallest local count.

func NewPriorityMbrStatsMsgStore

func NewPriorityMbrStatsMsgStore(maxLocalCount int) *PriorityMbrStatsMsgStore

macLocalCount is the max priority value

func (*PriorityMbrStatsMsgStore) Get

Return the mbrStatsMsg with the smallest local count in the list, increment the local count and sort it again, not delete the data.

func (*PriorityMbrStatsMsgStore) IsEmpty

func (p *PriorityMbrStatsMsgStore) IsEmpty() bool

func (*PriorityMbrStatsMsgStore) Len

func (p *PriorityMbrStatsMsgStore) Len() int

Return current size of data

func (*PriorityMbrStatsMsgStore) Push

Initially, set the local count to zero. If the queue size is max, delete the data with the highest localcount and insert it.

type PriorityQueue

type PriorityQueue []*Item

A PriorityQueue implements heap.Interface and holds Items.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

Return length of priority queue

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

We want Pop to give us the lowest, not highest, priority so we use lower than here.

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

Pop item from last

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

We want Pop to give us the lowest, not highest, priority so we use lower than here.

type ProbeResponse

type ProbeResponse struct {
	// contains filtered or unexported fields
}

func (ProbeResponse) Ok

func (r ProbeResponse) Ok() bool

type SWIM

type SWIM struct {
	// contains filtered or unexported fields
}

func New

func New(config *Config, suspicionConfig *SuspicionConfig, messageEndpointConfig MessageEndpointConfig, member *Member) *SWIM

func (*SWIM) Gossip

func (s *SWIM) Gossip(msg []byte)

Gossip message to p2p network.

func (*SWIM) Join

func (s *SWIM) Join(peerAddresses []string) error

Dial to the all peerAddresses and exchange memberList.

func (*SWIM) ShutDown

func (s *SWIM) ShutDown()

Shutdown the running swim.

func (*SWIM) Start

func (s *SWIM) Start()

Start SWIM protocol.

type Status

type Status int

Status of members

const (
	// Unknown status of a member
	Unknown Status = iota

	// Alive status
	Alive

	// Suspicious status of whether a member is dead or not
	Suspected

	// Dead status
	Dead
)

type SuspectMessage

type SuspectMessage struct {
	MemberMessage
	ConfirmerID string
}

Suspect message struct

type Suspicion

type Suspicion struct {
	// contains filtered or unexported fields
}

Suspicion manages the suspect timer and helps to accelerate the timeout as member self got more independent confirmations that a target member is suspect.

func NewSuspicion

func NewSuspicion(confirmer MemberID, k int, min time.Duration, max time.Duration, timeoutHandler func()) (*Suspicion, error)

NewSuspicion returns a timer started with the max value, and according to Lifeguard L2 (Dynamic Suspicion timeout) each unique confirmation will drive the timer to min value

func (*Suspicion) Confirm

func (s *Suspicion) Confirm(confirmer MemberID) bool

Confirm register new member who also determined the given suspected member as suspect. This returns true if this confirmer is new, and false if it was a duplicate information or if we've got enough confirmations to hit the value of timer to minimum

type SuspicionConfig

type SuspicionConfig struct {
	// contains filtered or unexported fields
}

type Task

type Task func() (interface{}, error)

type TaskResponse

type TaskResponse struct {
	// contains filtered or unexported fields
}

type TaskRunner

type TaskRunner struct {
	// contains filtered or unexported fields
}

func NewTaskRunner

func NewTaskRunner(task Task, ctx context.Context) *TaskRunner

func (*TaskRunner) Start

func (t *TaskRunner) Start() TaskResponse

type UDPTransport

type UDPTransport interface {

	// WriteTo is a packet-oriented interface that fires off the given
	// payload to the given address in a connectionless fashion. This should
	// return a time stamp that's as close as possible to when the packet
	// was transmitted to help make accurate RTT measurements during probes.
	//
	// We also treat the address here as a
	// string, similar to Dial, so it's network neutral, so this usually is
	// in the form of "host:port".
	WriteTo(b []byte, addr string) (time.Time, error)

	// PacketCh returns a channel that can be read to receive incoming
	// packets from other peers.
	PacketCh() <-chan *Packet

	// Shutdown all running go-routine inside UDPTransport
	Shutdown() error
}

UDP Transport is used to udp abstract over communicating with other peers.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL