raftx

package module
v0.0.3
Warning

This package is not in the latest version of its module.

Published: Jan 15, 2025 License: Apache-2.0 Imports: 5 Imported by: 1

README

RaftX

RaftX is an extension of the classic Raft protocol, integrating the advantages of Multi-Paxos, ZAB (Zookeeper Atomic Broadcast), and Raft protocols. RaftX features rapid election, concurrent proposals, data synchronization, data rollback, and volatile data synchronization, making it suitable for high-concurrency and large-scale distributed system scenarios.


Table of Contents

  1. Introduction
  2. Feature Overview
  3. System Architecture
  4. Installation Guide
  5. Usage
  6. Application Scenarios
  7. License

Introduction

In distributed systems, consensus algorithms are typically used to coordinate the state among multiple nodes to ensure data consistency. The traditional Raft protocol is easy to understand but has certain limitations in scenarios with high concurrency, many nodes, and large-scale data synchronization. Therefore, RaftX extends Raft by incorporating the concurrent proposal capability of Multi-Paxos and the data synchronization feature of ZAB, providing a more efficient and stable distributed consensus solution.


Feature Overview

1. Rapid Election Mechanism
  • Elections follow Raft's majority rule, extended so that a Leader is elected within a single term.
  • Avoids the repeated rounds of re-election that circular (split) voting can cause in standard Raft.
  • In typical clusters of 3 to 15 nodes, an election completes in under one second.
  • Even in clusters with dozens of nodes, an election normally completes within about 2 seconds; most of that time is spent on message exchange between nodes.
2. Concurrent Proposals and Log Synchronization
  • Like Multi-Paxos, RaftX handles multiple proposals concurrently and allows logs and transactions to be committed out of order, which raises throughput in high-concurrency scenarios.
3. Data Synchronization and Rollback
  • Like the data synchronization feature of ZAB, both Leaders and Followers automatically detect and synchronize missing data.
  • Automatically rolls back data during cluster failures or failed proposals to maintain data consistency across nodes.
4. Support for Distributed Volatile Data
Volatile data is data that does not require persistent storage, typically short-lived data that can be discarded soon after it expires. It has a short lifecycle, is updated frequently, and can sometimes tolerate loss.
The volatile data feature is not part of the RaftX protocol itself; it is an extension built on RaftX's network architecture and a partial synchronization protocol. Because it keeps no persistent data log, it cannot, unlike RaftX's persistent data, fully synchronize all data to a newly added node at any time. Instead, synchronization depends on the length of the log cache pool, which is currently set to 1048576 and therefore records the latest 1048576 data log entries. If a node reconnects after a disconnection and has missed fewer than 1048576 entries, all of them can be synchronized back; if it has missed more, only the latest 1048576 entries are synchronized.
Because volatile data is not persisted, it is very efficient to read and write, which makes it valuable for time-sensitive operations in a cluster.
Volatile data can be read in the same two ways as RaftX data: from the cluster or locally. Reads from the cluster return strongly consistent results. Local reads are only eventually consistent and may return stale (dirty) data, but they are about as fast as reading a local cache. Which path to use depends on the consistency and latency requirements of the specific business scenario.
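The choice between the two read paths can be wrapped in a small helper. This sketch assumes only the two method signatures documented later on this page (`GetMemValue` and `GetLocalMemValue`); the `memReader` interface, the `readMem` helper, and the `fakeNode` stand-in are hypothetical and exist so the example runs without a cluster. Because Go interfaces are satisfied implicitly, a real `raftx.Raftx` value should fit `memReader` as well.

```go
package main

import (
	"errors"
	"fmt"
)

// memReader lists the two volatile-data read methods documented for Raftx.
type memReader interface {
	GetMemValue(key []byte) ([]byte, error) // via the cluster: strongly consistent
	GetLocalMemValue(key []byte) []byte     // local node: eventually consistent, may be stale
}

// readMem is a hypothetical convenience that picks a read path.
func readMem(r memReader, key []byte, strong bool) ([]byte, error) {
	if strong {
		return r.GetMemValue(key)
	}
	return r.GetLocalMemValue(key), nil
}

// fakeNode stands in for a real node: the cluster view holds the latest
// value while the local replica lags behind.
type fakeNode struct{ cluster, local map[string][]byte }

func (f fakeNode) GetMemValue(key []byte) ([]byte, error) {
	v, ok := f.cluster[string(key)]
	if !ok {
		return nil, errors.New("not found")
	}
	return v, nil
}

func (f fakeNode) GetLocalMemValue(key []byte) []byte { return f.local[string(key)] }

func main() {
	n := fakeNode{
		cluster: map[string][]byte{"user:1": []byte("online")},
		local:   map[string][]byte{"user:1": []byte("offline")}, // not yet synchronized
	}
	strongV, _ := readMem(n, []byte("user:1"), true)
	localV, _ := readMem(n, []byte("user:1"), false)
	fmt.Printf("strong=%s local=%s\n", strongV, localV)
}
```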
Volatile data plays a significant role in collaborative cluster operations and can be applied to numerous distributed services, such as distributed locks, distributed logging systems, and distributed messaging notifications.
Characteristics of Volatile Data
  • Efficient Data Collaboration
    • Designed for fast read/write and high-concurrency scenarios, volatile data optimizes memory access efficiency to support substantial real-time interaction demands.
  • Ordered Operations
    • Ensures that create, update, and delete operations occur in the same order on each node, which is critical for maintaining global view consistency and correctness in a distributed environment.
  • TTL (time to live) for Data in Memory
    • Each volatile data item can have a set TTL, after which it expires and is removed from memory automatically.
  • Event Listening and Triggering
    • Supports event listening and triggering for data operations like create, update, and delete.
  • Strong Consistency
    • Ensures strong consistency for writes and for reads served through the cluster; local reads trade this for speed and are only eventually consistent.

System Architecture

+-----------------------------------------+
|                Raftx Api                |
+-----------------------------------------+
           |                 |
           v                 v
+-------------------+   +-------------------+
|       Node 1      |   |       Node 2      |
| (Leader/Follower) |   | (Leader/Follower) |
+-------------------+   +-------------------+
           |                 |
           v                 v
+-------------------+   +-------------------+
|       Node 3      |   |       Node 4      |
| (Leader/Follower) |   | (Leader/Follower) |
+-------------------+   +-------------------+
Component Description
  1. Leader: Responsible for receiving client requests, proposing and synchronizing logs.
  2. Follower: Executes the Leader's data proposals, commits, and rollbacks, and proxies client requests, among other duties.
  3. Raftx Api: Sends requests to the cluster, retrieves data, or submits state updates.
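The request routing between these roles can be modeled in a few lines. This toy, in-process sketch is illustrative only: the `node` type and its fields are hypothetical, and replication, commit, and rollback are deliberately omitted. It shows only that a Follower proxies a client command to the Leader, which alone appends it to the proposal log.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a hypothetical, in-process model of a cluster member.
type node struct {
	id     string
	leader *node    // nil when this node is the leader
	log    [][]byte // proposals accepted by the leader
}

// Command mirrors the role split described above: a follower forwards,
// the leader appends.
func (n *node) Command(cmd []byte) error {
	if n.leader != nil {
		return n.leader.Command(cmd) // follower: proxy to the leader
	}
	if len(cmd) == 0 {
		return errors.New("empty command")
	}
	n.log = append(n.log, cmd)
	return nil
}

func main() {
	leader := &node{id: "node1"}
	follower := &node{id: "node2", leader: leader}
	if err := follower.Command([]byte("set x=1")); err != nil {
		panic(err)
	}
	fmt.Println(len(leader.log), len(follower.log)) // 1 0
}
```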

Installation Guide

Dependencies
  • Operating System: Linux, macOS, Windows
  • Programming Language: Go 1.22 or higher
  • Dependent Tools: Git
Project Installation
go get github.com/donnie4w/raftx
API Call Example
config := &raft.Config{ListenAddr: ":20001", PeerAddr: []string{"localhost:20001", "localhost:20002", "localhost:20003"}}
node := raftx.NewRaftx(config) // NewRaftx takes a *raft.Config
if err := node.Open(); err != nil {
	log.Fatal(err)
}

Application Scenarios

  1. High-Concurrency Distributed Storage Systems

    • Suitable for databases, KV stores, and other systems requiring rapid proposals and concurrent log synchronization.
  2. Real-Time Status Synchronization Systems

    • Synchronizes short-lived volatile data such as user online status and location information.
  3. Large-Scale Cluster Elections

    • Fits distributed systems needing rapid Leader election.
  4. URL Systems and CDN

    • Quickly shares volatile data between clusters to improve system response speed and availability.

License

RaftX uses the Apache-2.0 license.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Raftx

type Raftx interface {
	// LastCommitTxId returns the ID of the last transaction that has been committed to the state machine.
	LastCommitTxId() int64

	// LastTransactionId returns the ID of the latest transaction, whether it has been committed or not.
	LastTransactionId() int64

	// GetMetrics retrieves monitoring metrics related to the current state and performance of the Raftx node.
	GetMetrics() *raft.Metrics

	// Command submits a new proposal for consensus to be applied to the state machine.
	Command(cmd []byte) error

	// AddNode adds a new member node with the specified address to the Raftx cluster.
	AddNode(address string) error

	// RemoveNode removes a member node with the specified address from the Raftx cluster.
	// Returns true if the removal was successful; otherwise, false.
	RemoveNode(address string) bool

	// GetNodes returns a map containing information about all nodes in the cluster, including their addresses and terms.
	GetNodes() map[string]int64

	// TakeSnapshot captures a snapshot of the state machine within a given range of transactions, which can be used to truncate the log.
	TakeSnapshot(fromTransactionId, toTransactionId int64) ([]byte, error)

	// GetTerm returns the current term of the Raftx node.
	GetTerm() int64

	// GetLeaderID returns the ID and term of the current leader in the cluster.
	GetLeaderID() (string, int64)

	// GetState returns the current state of the Raftx node (Follower, Candidate, Leader).
	GetState() raft.STATE

	// ReSetNodeId resets the node ID and returns the previous ID.
	ReSetNodeId(id int64) int64

	// RestoreSnapshot restores the state machine from a provided snapshot, allowing recovery without replaying all logs.
	RestoreSnapshot(data []byte) error

	// Stop gracefully halts the Raftx node service, ensuring all operations are completed before shutdown.
	Stop() error

	// Open starts the Raftx node service, initializing any necessary resources and configurations.
	Open() error

	// GetNodeTime returns two int64 values representing the node's start time and the current service time.
	// The start time is the Unix timestamp indicating when the node was started.
	// The service time is the duration in seconds that the node has been running since its start time.
	//
	// Returns:
	//   - startTime: An int64 value representing the Unix timestamp of when the node was started.
	//   - serviceTime: An int64 value representing the number of seconds the node has been running.
	GetNodeTime() (startTime int64, serviceTime int64)

	// GetValue retrieves a value from the state machine using the provided key.
	// If the key does not exist, an empty byte slice and an error are returned.
	GetValue(key []byte) (value []byte, err error)

	// GetLocalValue retrieves a value locally from the state machine using the provided key.
	// This bypasses consensus and is intended for read-only operations.
	GetLocalValue(key []byte) (value []byte, err error)

	// GetValueList retrieves a list of values corresponding to the provided keys from the state machine.
	// If a key does not exist, it will not appear in the result.
	GetValueList(key [][]byte) (result [][2][]byte, err error)

	// GetLocalValueList retrieves a list of values corresponding to the provided keys locally from the state machine.
	// This bypasses consensus and is intended for read-only operations.
	GetLocalValueList(key [][]byte) (result [][2][]byte, err error)

	// GetMemValue retrieves a value from the volatile memory storage using the provided key.
	// This method accesses data that may not yet have been committed to the state machine.
	GetMemValue(key []byte) (value []byte, err error)

	// GetLocalMemValue retrieves a value locally from the volatile memory storage using the provided key.
	// This bypasses consensus and is intended for read-only operations on volatile data.
	GetLocalMemValue(key []byte) (value []byte)

	// GetMemValueList retrieves a list of values corresponding to the provided keys from the volatile memory storage.
	// This method accesses data that may not yet have been committed to the state machine.
	GetMemValueList(key [][]byte) (result [][2][]byte, err error)

	// GetMemMultiValue retrieves all values associated with the provided key from the volatile memory storage.
	// This method accesses data that may not yet have been committed to the state machine.
	GetMemMultiValue(key []byte) (value [][]byte, err error)

	// GetMultiValueList retrieves a list of values corresponding to the provided keys from the volatile memory storage.
	// This method accesses data that may not yet have been committed to the state machine.
	GetMultiValueList(keys [][]byte) (result [][2][]byte, err error)

	// GetLocalMemValueList retrieves a list of values corresponding to the provided keys locally from the volatile memory storage.
	// This bypasses consensus and is intended for read-only operations on volatile data.
	GetLocalMemValueList(key [][]byte) (result [][2][]byte)

	// GetLocalMemMultiValue retrieves, locally, all values associated with the provided key from the volatile memory storage.
	GetLocalMemMultiValue(key []byte) (value [][]byte)

	// GetLocalMemMultiValueList retrieves, locally, the values corresponding to the provided keys from the volatile memory storage.
	GetLocalMemMultiValueList(keys [][]byte) (result [][2][]byte)

	// MemCommand applies a command to the volatile memory storage.
	// This method does not go through the consensus process but directly modifies the in-memory state.
	MemCommand(key, value []byte, ttl uint64, ptype raft.MTYPE) (err error)

	// MemLen returns the number of active MemBean items currently stored in the FSM.
	MemLen() int64

	// MemWatch listens for changes to the volatile data associated with a specified key.
	//
	// Parameters:
	//   - key: The key to watch, provided as a byte slice.
	//   - watchFunc: A callback function that gets invoked when changes are detected.
	//       - key: The changed key, passed as a byte slice to the callback function.
	//       - value: The latest value of the key, passed as a byte slice to the callback function.
	//       - watchType: Indicates the type of change (created, deleted, modified).
	//   - isFnSync: Specifies whether the callback function should be executed synchronously.
	//               If true, the callback is executed immediately upon an event; if false, it may execute asynchronously.
	//   - watchTypes: An optional variadic parameter list specifying particular types of changes to watch (e.g., only creation and deletion).
	//                 If not provided, defaults to watching all types of changes.
	//
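	// Note: When using MemWatch, make sure to call the corresponding unwatch method (such as MemUnWatch or MemUnWatchWithType) once listening is no longer needed, to avoid potential memory leaks or other performance problems.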
	MemWatch(key []byte, watchFunc func(key, value []byte, watchType raft.WatchType), isFnSync bool, watchTypes ...raft.WatchType)

	// MemUnWatch removes all listeners for the specified key.
	//
	// Parameters:
	//   - key: The key to stop watching, provided as a byte slice.
	//
	// Note: After this method is called, changes to the key no longer trigger any callback.
	MemUnWatch(key []byte)

	// MemUnWatchWithType removes listeners for specific types of changes on the specified key.
	//
	// Parameters:
	//   - key: The key to stop watching, provided as a byte slice.
	//   - wt: The specific type of change to stop listening for.
	//
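	// Note: Unlike MemUnWatch, this method cancels listening only for the specified change type. To stop listening for all change types,
	//      use MemUnWatch or call this method once for each change type.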
	MemUnWatchWithType(key []byte, wt raft.WatchType)

	// Running reports whether the Raftx service is running properly.
	Running() bool

	// WaitRun waits until the Raftx service is ready to run.
	WaitRun() error
}

Raftx is an interface that defines the behavior of a node participating in a distributed consensus algorithm based on the Raftx protocol. It encapsulates operations for managing cluster membership, log replication, leader election, snapshotting, and state transitions.
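Callers rarely need the whole interface. A common Go idiom is to declare a small consumer-side interface with just the methods you use; any `raftx.Raftx` value satisfies it implicitly. The sketch below uses `LastTransactionId` and `LastCommitTxId` to report how far commits trail the latest transaction. The `txProgress` interface, `commitGap` helper, and `fakeProgress` stand-in are hypothetical, and reading the gap as a count of pending transactions assumes transaction IDs are consecutive, which the documentation does not state.

```go
package main

import "fmt"

// txProgress holds just two methods of the Raftx interface.
type txProgress interface {
	LastCommitTxId() int64
	LastTransactionId() int64
}

// commitGap returns the ID distance between the latest transaction and the
// last committed one (never negative). It equals the number of uncommitted
// transactions only if IDs are consecutive (an assumption).
func commitGap(p txProgress) int64 {
	d := p.LastTransactionId() - p.LastCommitTxId()
	if d < 0 {
		return 0
	}
	return d
}

// fakeProgress lets the sketch run without a cluster.
type fakeProgress struct{ last, committed int64 }

func (f fakeProgress) LastCommitTxId() int64    { return f.committed }
func (f fakeProgress) LastTransactionId() int64 { return f.last }

func main() {
	fmt.Println(commitGap(fakeProgress{last: 120, committed: 117})) // 3
}
```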

func NewRaftx

func NewRaftx(config *raft.Config) Raftx

NewRaftx is a factory function that initializes a new Raftx instance using the provided configuration.

type SimpleRaftx

type SimpleRaftx struct {
	// contains filtered or unexported fields
}

SimpleRaftx implements the Raftx interface and serves as a concrete implementation of a Raftx node. It manages the core logic for consensus, including log replication, leader election, and snapshot management.

func (*SimpleRaftx) AddNode

func (r *SimpleRaftx) AddNode(address string) error

AddNode adds a new member node with the specified address to the Raftx cluster.

func (*SimpleRaftx) Command

func (r *SimpleRaftx) Command(cmd []byte) error

Command submits a new proposal for consensus to be applied to the state machine. It checks the rate limiter before submitting the command.

func (*SimpleRaftx) GetLeaderID

func (r *SimpleRaftx) GetLeaderID() (string, int64)

GetLeaderID returns the ID and term of the current leader in the cluster.

func (*SimpleRaftx) GetLocalMemMultiValue

func (r *SimpleRaftx) GetLocalMemMultiValue(key []byte) (value [][]byte)

GetLocalMemMultiValue retrieves, locally, all values associated with the provided key from the volatile memory storage.

func (*SimpleRaftx) GetLocalMemMultiValueList

func (r *SimpleRaftx) GetLocalMemMultiValueList(keys [][]byte) (result [][2][]byte)

GetLocalMemMultiValueList retrieves, locally, the values corresponding to the provided keys from the volatile memory storage.

func (*SimpleRaftx) GetLocalMemValue

func (r *SimpleRaftx) GetLocalMemValue(key []byte) (value []byte)

GetLocalMemValue retrieves a value locally from the volatile memory storage using the provided key. This bypasses consensus and is intended for read-only operations on volatile data.

func (*SimpleRaftx) GetLocalMemValueList

func (r *SimpleRaftx) GetLocalMemValueList(keys [][]byte) (result [][2][]byte)

GetLocalMemValueList retrieves a list of values corresponding to the provided keys locally from the volatile memory storage. This bypasses consensus and is intended for read-only operations on volatile data.

func (*SimpleRaftx) GetLocalValue

func (r *SimpleRaftx) GetLocalValue(key []byte) (value []byte, err error)

GetLocalValue retrieves a value locally from the state machine using the provided key. This bypasses consensus and is intended for read-only operations.

func (*SimpleRaftx) GetLocalValueList

func (r *SimpleRaftx) GetLocalValueList(keys [][]byte) (result [][2][]byte, err error)

GetLocalValueList retrieves a list of values corresponding to the provided keys locally from the state machine. This bypasses consensus and is intended for read-only operations.

func (*SimpleRaftx) GetMemMultiValue

func (r *SimpleRaftx) GetMemMultiValue(key []byte) (value [][]byte, err error)

GetMemMultiValue retrieves all values associated with the provided key from the volatile memory storage. This method accesses data that may not yet have been committed to the state machine.

func (*SimpleRaftx) GetMemValue

func (r *SimpleRaftx) GetMemValue(key []byte) (value []byte, err error)

GetMemValue retrieves a value from the volatile memory storage using the provided key. This method accesses data that may not yet have been committed to the state machine.

func (*SimpleRaftx) GetMemValueList

func (r *SimpleRaftx) GetMemValueList(keys [][]byte) (result [][2][]byte, err error)

GetMemValueList retrieves a list of values corresponding to the provided keys from the volatile memory storage. This method accesses data that may not yet have been committed to the state machine.

func (*SimpleRaftx) GetMetrics

func (r *SimpleRaftx) GetMetrics() *raft.Metrics

GetMetrics retrieves monitoring metrics related to the current state and performance of the Raftx node. Note: This method currently returns nil as no specific metrics are implemented.

func (*SimpleRaftx) GetMultiValueList

func (r *SimpleRaftx) GetMultiValueList(keys [][]byte) (result [][2][]byte, err error)

GetMultiValueList retrieves a list of values corresponding to the provided keys from the volatile memory storage. This method accesses data that may not yet have been committed to the state machine.

func (*SimpleRaftx) GetNodeTime

func (r *SimpleRaftx) GetNodeTime() (startTime int64, serviceTime int64)

GetNodeTime returns two int64 values: the node's start time as a Unix timestamp, and its service time, the number of seconds the node has been running since it started.

func (*SimpleRaftx) GetNodes

func (r *SimpleRaftx) GetNodes() map[string]int64

GetNodes returns a map containing information about all nodes in the cluster, including their addresses and terms.

func (*SimpleRaftx) GetState

func (r *SimpleRaftx) GetState() raft.STATE

GetState returns the current state of the Raftx node (Follower, Candidate, Leader).

func (*SimpleRaftx) GetTerm

func (r *SimpleRaftx) GetTerm() int64

GetTerm returns the current term of the Raftx node.

func (*SimpleRaftx) GetValue

func (r *SimpleRaftx) GetValue(key []byte) (value []byte, err error)

GetValue retrieves a value from the state machine using the provided key. If the key does not exist, an empty byte slice and an error are returned.

func (*SimpleRaftx) GetValueList

func (r *SimpleRaftx) GetValueList(keys [][]byte) (result [][2][]byte, err error)

GetValueList retrieves a list of values corresponding to the provided keys from the state machine. If a key does not exist, it will not appear in the result.

func (*SimpleRaftx) LastCommitTxId

func (r *SimpleRaftx) LastCommitTxId() int64

LastCommitTxId returns the ID of the last transaction that has been committed to the state machine.

func (*SimpleRaftx) LastTransactionId

func (r *SimpleRaftx) LastTransactionId() int64

LastTransactionId returns the ID of the latest transaction, whether it has been committed or not.

func (*SimpleRaftx) MemCommand

func (r *SimpleRaftx) MemCommand(key, value []byte, ttl uint64, ptype raft.MTYPE) (err error)

MemCommand applies a command to the volatile memory storage. This method does not go through the consensus process but directly modifies the in-memory state.

func (*SimpleRaftx) MemLen

func (r *SimpleRaftx) MemLen() int64

func (*SimpleRaftx) MemUnWatch

func (r *SimpleRaftx) MemUnWatch(key []byte)

func (*SimpleRaftx) MemUnWatchWithType

func (r *SimpleRaftx) MemUnWatchWithType(key []byte, wt raft.WatchType)

func (*SimpleRaftx) MemWatch

func (r *SimpleRaftx) MemWatch(key []byte, watchFunc func(key, value []byte, watchType raft.WatchType), isSync bool, watchTypes ...raft.WatchType)

func (*SimpleRaftx) Open

func (r *SimpleRaftx) Open() error

Open starts the Raftx node service, initializing any necessary resources and configurations.

func (*SimpleRaftx) ReSetNodeId

func (r *SimpleRaftx) ReSetNodeId(id int64) (prev int64)

ReSetNodeId resets the node ID and returns the previous ID.

func (*SimpleRaftx) RemoveNode

func (r *SimpleRaftx) RemoveNode(address string) bool

RemoveNode removes a member node with the specified address from the Raftx cluster. Returns true if the removal was successful; otherwise, false.

func (*SimpleRaftx) RestoreSnapshot

func (r *SimpleRaftx) RestoreSnapshot(data []byte) error

RestoreSnapshot restores the state machine from a provided snapshot, allowing recovery without replaying all logs.

func (*SimpleRaftx) Running

func (r *SimpleRaftx) Running() bool

func (*SimpleRaftx) Stop

func (r *SimpleRaftx) Stop() error

Stop gracefully halts the Raftx node service, ensuring all operations are completed before shutdown.

func (*SimpleRaftx) TakeSnapshot

func (r *SimpleRaftx) TakeSnapshot(fromTransactionId, toTransactionId int64) ([]byte, error)

TakeSnapshot captures a snapshot of the state machine within a given range of transactions, which can be used to truncate the log.

func (*SimpleRaftx) WaitRun

func (r *SimpleRaftx) WaitRun() error
