Documentation
¶
Index ¶
- Variables
- func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
- func GetHashID(key []byte) []byte
- func IDToString(id []byte) string
- func NewInode(id string, addr string) *cm.Node
- type Config
- type GrpcTransport
- func (g *GrpcTransport) CheckPredecessor(node *cm.Node) error
- func (g *GrpcTransport) DeleteKey(node *cm.Node, key []byte) error
- func (g *GrpcTransport) DeleteKeys(node *cm.Node, keys [][]byte) error
- func (g *GrpcTransport) FindSuccessor(node *cm.Node, id []byte) (*cm.Node, error)
- func (g *GrpcTransport) GetKey(node *cm.Node, key []byte) (*cm.GetResponse, error)
- func (g *GrpcTransport) GetPredecessor(node *cm.Node) (*cm.Node, error)
- func (g *GrpcTransport) GetServer() *grpc.Server
- func (g *GrpcTransport) GetSuccessor(node *cm.Node) (*cm.Node, error)
- func (g *GrpcTransport) Notify(node, pred *cm.Node) error
- func (g *GrpcTransport) RequestKeys(node *cm.Node, from, to []byte) ([]*cm.KV, error)
- func (g *GrpcTransport) SetKey(node *cm.Node, key, value []byte) error
- func (g *GrpcTransport) SetPredecessor(node *cm.Node, pred *cm.Node) error
- func (g *GrpcTransport) SetSuccessor(node *cm.Node, succ *cm.Node) error
- func (g *GrpcTransport) Start() error
- func (g *GrpcTransport) Stop() error
- type Message
- type Node
- func (n *Node) CheckPredecessor(ctx context.Context, id *cm.ID) (*cm.ER, error)
- func (n *Node) Delete(key []byte) error
- func (n *Node) Find(key []byte) (*cm.Node, error)
- func (n *Node) FindSuccessor(ctx context.Context, id *cm.ID) (*cm.Node, error)
- func (node *Node) FingerTableString() string
- func (n *Node) Get(key []byte) ([]byte, error)
- func (n *Node) GetConfig() *Config
- func (n *Node) GetPredecessor(ctx context.Context, r *cm.ER) (*cm.Node, error)
- func (n *Node) GetShutdownCh() chan struct{}
- func (n *Node) GetStorage() Storage
- func (n *Node) GetSuccessor(ctx context.Context, r *cm.ER) (*cm.Node, error)
- func (n *Node) Notify(ctx context.Context, node *cm.Node) (*cm.ER, error)
- func (n *Node) Set(key []byte, value []byte) error
- func (n *Node) SetPredecessor(ctx context.Context, pred *cm.Node) (*cm.ER, error)
- func (n *Node) SetSuccessor(ctx context.Context, succ *cm.Node) (*cm.ER, error)
- func (n *Node) Stop()
- func (n *Node) XDelete(ctx context.Context, req *cm.DeleteRequest) (*cm.DeleteResponse, error)
- func (n *Node) XGet(ctx context.Context, req *cm.GetRequest) (*cm.GetResponse, error)
- func (n *Node) XMultiDelete(ctx context.Context, req *cm.MultiDeleteRequest) (*cm.DeleteResponse, error)
- func (n *Node) XRequestKeys(ctx context.Context, req *cm.RequestKeysRequest) (*cm.RequestKeysResponse, error)
- func (n *Node) XSet(ctx context.Context, req *cm.SetRequest) (*cm.SetResponse, error)
- type Storage
- type Transport
- type TxStorage
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func Dial ¶
func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
func IDToString ¶
IDToString converts a []byte to a big.Int string, useful for debugging/logging.
Types ¶
type Config ¶
type Config struct {
Id string
Addr string
ServerOpts []grpc.ServerOption
DialOpts []grpc.DialOption
Hash func() hash.Hash // Hash function to use
HashSize int
StabilizeMin time.Duration // Minimum stabilization time
StabilizeMax time.Duration // Maximum stabilization time
Timeout time.Duration
MaxIdle time.Duration
}
一个Node一个Config
func DefaultConfig ¶
func DefaultConfig() *Config
type GrpcTransport ¶
type GrpcTransport struct {
*cm.UnimplementedChordServer
// contains filtered or unexported fields
}
func NewGrpcTransport ¶
func NewGrpcTransport(cnf *Config) (*GrpcTransport, error)
func NewGrpcTransport(config *Config) (cm.ChordClient, error) {
func (*GrpcTransport) CheckPredecessor ¶
func (g *GrpcTransport) CheckPredecessor(node *cm.Node) error
func (*GrpcTransport) DeleteKey ¶
func (g *GrpcTransport) DeleteKey(node *cm.Node, key []byte) error
func (*GrpcTransport) DeleteKeys ¶
func (g *GrpcTransport) DeleteKeys(node *cm.Node, keys [][]byte) error
func (*GrpcTransport) FindSuccessor ¶
FindSuccessor the successor ID of a remote node.
func (*GrpcTransport) GetKey ¶
func (g *GrpcTransport) GetKey(node *cm.Node, key []byte) (*cm.GetResponse, error)
func (*GrpcTransport) GetPredecessor ¶
GetPredecessor the successor ID of a remote node.
func (*GrpcTransport) GetServer ¶
func (g *GrpcTransport) GetServer() *grpc.Server
func (*GrpcTransport) GetSuccessor ¶
GetSuccessor the successor ID of a remote node.
func (*GrpcTransport) RequestKeys ¶
func (*GrpcTransport) SetKey ¶
func (g *GrpcTransport) SetKey(node *cm.Node, key, value []byte) error
func (*GrpcTransport) SetPredecessor ¶
func (*GrpcTransport) SetSuccessor ¶
func (*GrpcTransport) Start ¶
func (g *GrpcTransport) Start() error
type Node ¶
type Node struct {
*cm.Node
*cm.UnimplementedChordServer
// contains filtered or unexported fields
}
func NewNode ¶
NewNode creates a new Chord node. Returns error if node alreadyexists in the chord ring
func (*Node) CheckPredecessor ¶
func (*Node) FindSuccessor ¶
func (*Node) FingerTableString ¶
FingerTableString takes a node and converts it's finger table into a string.
func (*Node) GetPredecessor ¶
func (*Node) GetShutdownCh ¶
func (n *Node) GetShutdownCh() chan struct{}
func (*Node) GetStorage ¶
func (*Node) GetSuccessor ¶
ctx context.Context上下文 GetSuccessor gets the successor on the node..
func (*Node) Notify ¶
已验证逻辑,transfer_Keys部分存疑 Notify notifies Chord that Node(Client) thinks it is our predecessor Notify(n0): n0通知n它的存在,若此时n没有前序节点或,n0比n现有的前序节点更加靠近n,则n将其设置为前序节点。
func (*Node) SetPredecessor ¶
SetPredecessor sets the predecessor on the node..
func (*Node) SetSuccessor ¶
SetSuccessor sets the successor on the node..
func (*Node) XDelete ¶
func (n *Node) XDelete(ctx context.Context, req *cm.DeleteRequest) (*cm.DeleteResponse, error)
func (*Node) XGet ¶
func (n *Node) XGet(ctx context.Context, req *cm.GetRequest) (*cm.GetResponse, error)
获取key对应的数据
func (*Node) XMultiDelete ¶
func (n *Node) XMultiDelete(ctx context.Context, req *cm.MultiDeleteRequest) (*cm.DeleteResponse, error)
func (*Node) XRequestKeys ¶
func (n *Node) XRequestKeys(ctx context.Context, req *cm.RequestKeysRequest) (*cm.RequestKeysResponse, error)
func (*Node) XSet ¶
func (n *Node) XSet(ctx context.Context, req *cm.SetRequest) (*cm.SetResponse, error)
type Transport ¶
type Transport interface {
Start() error
Stop() error
//RPC
GetSuccessor(*cm.Node) (*cm.Node, error)
FindSuccessor(*cm.Node, []byte) (*cm.Node, error)
GetPredecessor(*cm.Node) (*cm.Node, error)
Notify(*cm.Node, *cm.Node) error
CheckPredecessor(*cm.Node) error
SetPredecessor(*cm.Node, *cm.Node) error
SetSuccessor(*cm.Node, *cm.Node) error
//Storage
GetKey(*cm.Node, []byte) (*cm.GetResponse, error)
SetKey(*cm.Node, []byte, []byte) error
DeleteKey(*cm.Node, []byte) error
RequestKeys(*cm.Node, []byte, []byte) ([]*cm.KV, error)
DeleteKeys(*cm.Node, [][]byte) error
}
要实现一个 RPC 框架,只需要把以下三点实现了就基本完成了:
Call ID 映射:可以直接使用函数字符串,也可以使用整数 ID。映射表一般就是一个哈希表。 序列化反序列化:可以自己写,也可以使用 Protobuf 或者 FlatBuffers 之类的。 网络传输库:可以自己写 Socket,或者用 Asio,ZeroMQ,Netty 之类。
Transport enables a node to talk to the other nodes in the ring