raft

package module
v0.0.0-...-42b9e0f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

README

raft

Documentation

Index

Constants

View Source
const (
	AppendEntriesRequestType = MessageRequestType(iota + 1)
	AppendEntriesResponseType
	VoteRequestType
	VoteResponseType
	InstallSnapshotRequestType
	InstallSnapshotResponseType
	TimeoutNowRequestType
	TimeoutNowResponseType
	HeartbeatRequestType
	HeartbeatResponseType
	ClusterRequestType
	ClusterResponseType
	FsmReadRequestType
	FsmReadResponseType
)

Variables

View Source
var (
	ErrLogNotFound = fmt.Errorf("log not found")
	ErrNotFound    = fmt.Errorf("data not found")
)
View Source
var (
	FutureWaitTimeoutErr = fmt.Errorf("future wait timeout")
)

Functions

This section is empty.

Types

type Addresses

type Addresses interface {
	Local() (local string, err error)
	Members() (addresses []string, err error)
}

func DesignatedAddresses

func DesignatedAddresses(local string, members ...string) Addresses

type AppendEntriesRequest

type AppendEntriesRequest struct {
	RPCHeader
	Term              uint64
	PrevLogEntry      uint64
	PrevLogTerm       uint64
	LeaderCommitIndex uint64
	Key               []byte
	Entries           []*Log
}

func (*AppendEntriesRequest) Decode

func (request *AppendEntriesRequest) Decode(msg MessageReader) (err error)

func (*AppendEntriesRequest) Encode

func (request *AppendEntriesRequest) Encode() (writer MessageWriter, err error)

type AppendEntriesResponse

type AppendEntriesResponse struct {
	RPCHeader
	Term           uint64
	LastLog        uint64
	Succeed        bool
	NoRetryBackoff bool
}

func (*AppendEntriesResponse) Decode

func (response *AppendEntriesResponse) Decode(msg MessageReader) (err error)

func (*AppendEntriesResponse) Encode

func (response *AppendEntriesResponse) Encode() (writer MessageWriter, err error)

type ClusterRequest

type ClusterRequest struct {
	RPCHeader
	Command  []byte
	Argument []byte
}

func (*ClusterRequest) Decode

func (request *ClusterRequest) Decode(msg MessageReader) (err error)

func (*ClusterRequest) Encode

func (request *ClusterRequest) Encode() (writer MessageWriter, err error)

type ClusterResponse

type ClusterResponse struct {
	RPCHeader
	Result []byte
	Error  []byte
}

func (*ClusterResponse) Decode

func (response *ClusterResponse) Decode(msg MessageReader) (err error)

func (*ClusterResponse) Encode

func (response *ClusterResponse) Encode() (writer MessageWriter, err error)

type Configuration

type Configuration struct {
	Servers []Server
}

type FSM

type FSM interface {
	Apply(data []byte) (result []byte, err error)
	Read(command []byte, argument []byte) (result []byte, err error)
	Snapshot() (FSMSnapshot, error)
	Restore(snapshot io.ReadCloser) error
	Close() (err error)
}

type FSMSnapshot

type FSMSnapshot interface {
	Persist(sink SnapshotSink) error
	Release()
}

type FileSnapshotSink

type FileSnapshotSink struct {
	// contains filtered or unexported fields
}

func (*FileSnapshotSink) Cancel

func (sink *FileSnapshotSink) Cancel() (err error)

func (*FileSnapshotSink) Close

func (sink *FileSnapshotSink) Close() (err error)

func (*FileSnapshotSink) Id

func (sink *FileSnapshotSink) Id() (id string)

func (*FileSnapshotSink) Write

func (sink *FileSnapshotSink) Write(p []byte) (n int, err error)

type FsmReadRequest

type FsmReadRequest struct {
	RPCHeader
	Command  []byte
	Argument []byte
}

func (*FsmReadRequest) Decode

func (request *FsmReadRequest) Decode(msg MessageReader) (err error)

func (*FsmReadRequest) Encode

func (request *FsmReadRequest) Encode() (writer MessageWriter, err error)

type FsmReadResponse

type FsmReadResponse struct {
	RPCHeader
	Result []byte
	Error  []byte
}

func (*FsmReadResponse) Decode

func (response *FsmReadResponse) Decode(msg MessageReader) (err error)

func (*FsmReadResponse) Encode

func (response *FsmReadResponse) Encode() (writer MessageWriter, err error)

type Future

type Future[R any] interface {
	Wait(ctx context.Context) (result R, err error)
}

type Handler

type Handler struct {
}

func (*Handler) Handle

func (handler *Handler) Handle(request *AppendEntriesRequest) (response *AppendEntriesResponse)

func (*Handler) Key

func (handler *Handler) Key() (key string)

type Handlers

type Handlers struct {
	// contains filtered or unexported fields
}

func (*Handlers) Dispatch

func (handlers *Handlers) Dispatch(request *AppendEntriesRequest) (response *AppendEntriesResponse)

type HeartbeatPayload

type HeartbeatPayload struct {
	PrevLogEntry      uint64
	PrevLogTerm       uint64
	LeaderCommitIndex uint64
}

type HeartbeatRequest

type HeartbeatRequest struct {
	RPCHeader
	Term uint64
	Key  []byte
	// contains filtered or unexported fields
}

func (*HeartbeatRequest) Decode

func (request *HeartbeatRequest) Decode(msg MessageReader) (err error)

func (*HeartbeatRequest) Encode

func (request *HeartbeatRequest) Encode() (writer MessageWriter, err error)

type HeartbeatResponse

type HeartbeatResponse struct {
	RPCHeader
}

func (*HeartbeatResponse) Decode

func (response *HeartbeatResponse) Decode(msg MessageReader) (err error)

func (*HeartbeatResponse) Encode

func (response *HeartbeatResponse) Encode() (writer MessageWriter, err error)

type InstallSnapshotRequest

type InstallSnapshotRequest struct {
	RPCHeader
	Term               uint64
	LastLogIndex       uint64
	LastLogTerm        uint64
	ConfigurationIndex uint64
	Size               uint64
	Leader             []byte
	Configuration      []byte
	Snapshot           io.Reader
}

func (*InstallSnapshotRequest) Decode

func (request *InstallSnapshotRequest) Decode(msg MessageReader) (err error)

func (*InstallSnapshotRequest) Encode

func (request *InstallSnapshotRequest) Encode() (writer MessageWriter, err error)

type InstallSnapshotResponse

type InstallSnapshotResponse struct {
	RPCHeader
	Term    uint64
	Succeed bool
}

func (*InstallSnapshotResponse) Decode

func (response *InstallSnapshotResponse) Decode(msg MessageReader) (err error)

func (*InstallSnapshotResponse) Encode

func (response *InstallSnapshotResponse) Encode() (writer MessageWriter, err error)

type LeaderOptions

type LeaderOptions struct {
	HeartbeatTimeout time.Duration
	ElectionTimeout  time.Duration
	CommitTimeout    time.Duration
	// LeaseTimeout
	// 用于控制“租约”的持续时间
	// 作为领导者而无法联系法定人数
	// 个节点。如果我们在没有联系的情况下达到这个间隔,我们将
	// 辞去领导职务。
	LeaseTimeout time.Duration
}

func (LeaderOptions) Verify

func (options LeaderOptions) Verify() (err error)

type Log

type Log struct {
	Index      uint64
	Term       uint64
	Type       LogType
	Data       []byte
	Extensions []byte
	AppendedAt time.Time
	Committed  bool
}

type LogMeta

type LogMeta interface {
	Term() (term uint64, err error)
	SetTerm(term uint64) (err error)
	Set(key []byte, val []byte) (err error)
	Get(key []byte) (val []byte, err error)
}

type LogType

type LogType uint8
const (
	LogCommand LogType = iota
	LogNoop
	LogConfiguration
)

func (LogType) String

func (lt LogType) String() string

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message +---------------------------------------------------------+-----------+-----------+ | Header | Body | Trunk | +---------------------+-----------------+-----------------+-----------+-----------+ | 4(BigEndian) | 2(BigEndian) | 2(BigEndian) | n | reader | +---------------------+-----------------+-----------------+-----------+-----------+ | Len(data) | request type | kind | data | snappy | +---------------------+-----------------+-----------------+-----------+-----------+

func (*Message) Bytes

func (msg *Message) Bytes() (p []byte)

func (*Message) ReadFrom

func (msg *Message) ReadFrom(r io.Reader) (n int64, err error)

func (*Message) RequestType

func (msg *Message) RequestType() (typ MessageRequestType)

func (*Message) Trunk

func (msg *Message) Trunk() (trunk *Trunk, has bool)

func (*Message) WriteTo

func (msg *Message) WriteTo(w io.Writer) (n int64, err error)

type MessageReader

type MessageReader interface {
	ReadFrom(r io.Reader) (n int64, err error)
	RequestType() (typ MessageRequestType)
	Bytes() (p []byte)
	Trunk() (trunk *Trunk, has bool)
}

func NewMessageReader

func NewMessageReader() (msg MessageReader)

type MessageRequestType

type MessageRequestType uint16

type MessageWriter

type MessageWriter interface {
	WriteTo(w io.Writer) (n int64, err error)
}

func NewMessageWriter

func NewMessageWriter(requestType MessageRequestType, data []byte) (msg MessageWriter)

func NewMessageWriterWithTrunk

func NewMessageWriterWithTrunk(requestType MessageRequestType, data []byte, sink io.Reader) (msg MessageWriter)

type Node

type Node struct {
	// contains filtered or unexported fields
}

func (*Node) Apply

func (node *Node) Apply(ctx context.Context, key []byte, body []byte, timeout time.Duration) (result []byte, err error)

func (*Node) Close

func (node *Node) Close(ctx context.Context) (err error)

func (*Node) Run

func (node *Node) Run(ctx context.Context) (err error)

type Options

type Options struct {
	Id        string
	Addresses Addresses
	Nonvoter  bool
	TLS       TLS
	Leader    LeaderOptions
	Snapshot  SnapshotOptions
	Store     Store
	FSM       FSM
	Transport Transport
}

func (Options) Verify

func (options Options) Verify() (err error)

type OptionsBuilder

type OptionsBuilder struct {
	// contains filtered or unexported fields
}

func NewOptionsBuilder

func NewOptionsBuilder() (builder *OptionsBuilder)

func (*OptionsBuilder) Addresses

func (builder *OptionsBuilder) Addresses(addresses Addresses)

func (*OptionsBuilder) Fsm

func (builder *OptionsBuilder) Fsm(fsm FSM)

func (*OptionsBuilder) Id

func (builder *OptionsBuilder) Id(id string)

func (*OptionsBuilder) LeaderCommitTimeout

func (builder *OptionsBuilder) LeaderCommitTimeout(timeout time.Duration)

func (*OptionsBuilder) LeaderElectionTimeout

func (builder *OptionsBuilder) LeaderElectionTimeout(timeout time.Duration)

func (*OptionsBuilder) LeaderHeartbeatTimeout

func (builder *OptionsBuilder) LeaderHeartbeatTimeout(timeout time.Duration)

func (*OptionsBuilder) LeaderLeaseTimeout

func (builder *OptionsBuilder) LeaderLeaseTimeout(timeout time.Duration)

func (*OptionsBuilder) Nonvoter

func (builder *OptionsBuilder) Nonvoter(yes bool)

func (*OptionsBuilder) SnapshotInterval

func (builder *OptionsBuilder) SnapshotInterval(interval time.Duration)

func (*OptionsBuilder) SnapshotNotRestoreOnStart

func (builder *OptionsBuilder) SnapshotNotRestoreOnStart(yes bool)

func (*OptionsBuilder) SnapshotStore

func (builder *OptionsBuilder) SnapshotStore(store SnapshotStore)

func (*OptionsBuilder) SnapshotThreshold

func (builder *OptionsBuilder) SnapshotThreshold(threshold uint64)

func (*OptionsBuilder) SnapshotTrailingLogs

func (builder *OptionsBuilder) SnapshotTrailingLogs(trailingLogs uint64)

func (*OptionsBuilder) Store

func (builder *OptionsBuilder) Store(store Store)

func (*OptionsBuilder) TLS

func (builder *OptionsBuilder) TLS(v TLS)

func (*OptionsBuilder) Transport

func (builder *OptionsBuilder) Transport(transport Transport)

type Promise

type Promise[R any] struct {
	// contains filtered or unexported fields
}

func (*Promise[R]) Failed

func (p *Promise[R]) Failed(err error)

func (*Promise[R]) Succeed

func (p *Promise[R]) Succeed(result R)

func (*Promise[R]) Wait

func (p *Promise[R]) Wait(ctx context.Context) (result R, err error)

type RPC

type RPC interface {
	Encode() (writer MessageWriter, err error)
	Decode(msg MessageReader) (err error)
}

type RPCHeader

type RPCHeader struct {
	// Id is the ServerId of the node sending the RPC Request or Response
	Id []byte
	// Addr is the ServerAddr of the node sending the RPC Request or Response
	Addr []byte
}

type Raft

type Raft interface {
	// Run start node
	//
	// check cluster was serving, when serving, then call leader to add this node,
	// when not serving, then boot a cluster.
	Run(ctx context.Context) (err error)
	// Close shutdown node
	//
	// check leader, when this node is leader, then just shutdown,
	// when this nod is not leader, then call leader to remove this node and shutdown.
	Close(ctx context.Context) (err error)
	// Apply make fsm to apply a log
	Apply(ctx context.Context, key []byte, body []byte, timeout time.Duration) (result []byte, err error)
}

func New

func New(options Options) (r Raft, err error)

type Server

type Server struct {
	Suffrage ServerSuffrage
	Id       string
	Address  string
}

type ServerSuffrage

type ServerSuffrage int
const (
	Voter ServerSuffrage = iota + 1
	Nonvoter
)

func (ServerSuffrage) String

func (s ServerSuffrage) String() string

type SnapshotMeta

type SnapshotMeta struct {
	Id                 string
	Index              uint64
	Term               uint64
	Configuration      Configuration
	ConfigurationIndex uint64
	Size               uint64
	CRC                []byte
	CreateAT           time.Time
}

func (*SnapshotMeta) Decode

func (meta *SnapshotMeta) Decode(r io.Reader) (err error)

func (*SnapshotMeta) Encode

func (meta *SnapshotMeta) Encode() (p []byte)

func (*SnapshotMeta) WriteTo

func (meta *SnapshotMeta) WriteTo(writer io.Writer) (n int64, err error)

type SnapshotMetas

type SnapshotMetas []*SnapshotMeta

func (SnapshotMetas) Len

func (metas SnapshotMetas) Len() int

func (SnapshotMetas) Less

func (metas SnapshotMetas) Less(i, j int) bool

func (SnapshotMetas) Swap

func (metas SnapshotMetas) Swap(i, j int)

type SnapshotOptions

type SnapshotOptions struct {
	Store SnapshotStore
	// TrailingLogs控制快照后留下的日志数量。这是为了让我们可以快速回放跟踪者的日志,而不是被迫发送整个快照。此处传递的值是使用的初始设置。这可以在运行期间使用ReloadConfig进行调整。
	TrailingLogs uint64
	// SnapshotThreshold控制在执行快照之前必须有多少未完成的日志。这是为了通过重放一小组日志来防止过度快照。此处传递的值是使用的初始设置。这可以在运行期间使用ReloadConfig进行调整。
	Threshold                uint64
	Interval                 time.Duration
	NoSnapshotRestoreOnStart bool
}

func (SnapshotOptions) Verify

func (options SnapshotOptions) Verify() (err error)

type SnapshotSink

type SnapshotSink interface {
	io.WriteCloser
	Id() string
	Cancel() error
}

type SnapshotStore

type SnapshotStore interface {
	Create(index uint64, term uint64, configuration Configuration, configurationIndex uint64) (sink SnapshotSink, err error)
	List() (metas []*SnapshotMeta, err error)
	Open(id string) (meta *SnapshotMeta, reader io.ReadCloser, err error)
}

func FileSnapshotStore

func FileSnapshotStore(dir string) (store SnapshotStore, err error)

type State

type State uint32
const (
	Shutdown State = iota
	Follower
	Candidate
	Leader
)

func (State) String

func (s State) String() string

type Store

type Store interface {
	AcquireIndex() (term uint64, index uint64, err error)
	Meta() (meta LogMeta)
	FirstIndex() (index uint64, err error)
	LastIndex() (index uint64, err error)
	Read(index uint64) (log *Log, err error)
	Write(logs ...*Log) (err error)
	Commit(logs ...*Log) (err error)
	Remove(logs ...*Log) (err error)
	Close()
}

func FileStore

func FileStore(dir string, caches int) (store Store, err error)

type TLS

type TLS interface {
	Config() (server *tls.Config, client *tls.Config, err error)
}

func TLSFiles

func TLSFiles(ca, serverCert, serverKey, clientCert, clientKey string) TLS

type TimeoutNowRequest

type TimeoutNowRequest struct {
	RPCHeader
}

func (*TimeoutNowRequest) Decode

func (request *TimeoutNowRequest) Decode(msg MessageReader) (err error)

func (*TimeoutNowRequest) Encode

func (request *TimeoutNowRequest) Encode() (writer MessageWriter, err error)

type TimeoutNowResponse

type TimeoutNowResponse struct {
	RPCHeader
}

func (*TimeoutNowResponse) Decode

func (response *TimeoutNowResponse) Decode(msg MessageReader) (err error)

func (*TimeoutNowResponse) Encode

func (response *TimeoutNowResponse) Encode() (writer MessageWriter, err error)

type Transport

type Transport interface {
	Dial(address string) (conn net.Conn, err error)
	Listen(address string) (ln net.Listener, err error)
}

func TcpTransport

func TcpTransport() Transport

func TcpTransportWithTLS

func TcpTransportWithTLS(serverTLS *tls.Config, clientTLS *tls.Config) Transport

type Trunk

type Trunk struct {
	// contains filtered or unexported fields
}

func (*Trunk) NextBlock

func (trunk *Trunk) NextBlock() (p []byte, err error)

func (*Trunk) Read

func (trunk *Trunk) Read(p []byte) (n int, err error)

func (*Trunk) SetLimiter

func (trunk *Trunk) SetLimiter(limiter uint64)

type VoteRequest

type VoteRequest struct {
	RPCHeader
	Term               uint64
	LastLogIndex       uint64
	LastLogTerm        uint64
	LeadershipTransfer bool
}

func (*VoteRequest) Decode

func (request *VoteRequest) Decode(msg MessageReader) (err error)

func (*VoteRequest) Encode

func (request *VoteRequest) Encode() (writer MessageWriter, err error)

type VoteResponse

type VoteResponse struct {
	RPCHeader
	Term    uint64
	Granted bool
}

func (*VoteResponse) Decode

func (response *VoteResponse) Decode(msg MessageReader) (err error)

func (*VoteResponse) Encode

func (response *VoteResponse) Encode() (writer MessageWriter, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL