rpc

package
v0.0.0-...-020e20f Latest
Warning

This package is not in the latest version of its module.

Published: Jun 1, 2022 License: Apache-2.0 Imports: 18 Imported by: 0

README

DH-RPC

Traditional key exchange, as in TLS or SSL, relies on a CA to keep the exchange safe. DH-RPC uses a DHT instead: the main idea is to remove CA certificates from the whole system by using a DHT for naming and key exchange.

DH-RPC is a secp256k1-ECDH-AES encrypted P2P RPC framework for decentralized applications, written in Go.

SQLess is built on DH-RPC, including:

  • Byzantine Fault Tolerance consensus protocol
  • Consistent Secure DHT
  • DB API
  • Metric collection
  • Block sync

Usage

Alice Client:

// Init Key Management System
route.InitKMS(PubKeyStoreFile)

// Register Node public key, addr to Tracker
reqA := &proto.PingReq{
    Node: AliceNode,
}
respA := new(proto.PingResp)
rpc.NewCaller().CallNode(Tracker.NodeID, "DHT.Ping", reqA, respA)

pc := rpc.NewPersistentCaller(BobNodeID)
respSimple := new(string)
pc.Call("Test.Talk", "Hi there", respSimple)
fmt.Printf("Response msg: %s", *respSimple)

Bob Server:

// RPC logic
// TestService is the service registered with the RPC server
type TestService struct {}

func (s *TestService) Talk(msg string, ret *string) error {
	fmt.Println(msg)
	resp := fmt.Sprintf("got msg %s", msg)
	*ret = resp
	return nil
}

// Init Key Management System
route.InitKMS(PubKeyStoreFile)

// Register the Test service
server, err := rpc.NewServerWithService(rpc.ServiceMap{
    "Test": &TestService{},
})
if err != nil {
    log.Fatal(err)
}

// Init RPC server with an empty master key, which is not recommended
if err := server.InitRPCServer("0.0.0.0:2120", PrivateKeyFile, []byte("")); err != nil {
    log.Fatal(err)
}

// Start Node RPC server
server.Serve()

For the tracker code, see the Example section below.

Features

  • 100% compatible with Go net/rpc standard.
  • ID based routing and Key exchange built on Secure Enhanced DHT.
  • Uses MessagePack for serialization, which supports most types without writing Marshal and Unmarshal methods.
  • Crypto Schema
    • Use Elliptic Curve Secp256k1 for Asymmetric Encryption
    • ECDH for Key Exchange
    • PKCS#7 for padding
    • AES-256-CFB for Symmetric Encryption
    • Private key protected by master key
    • Anonymous connections are also supported
  • DHT persistence layer has 2 implementations:
    • SQLite3 based simple traditional DHT
    • BFT based 2PC strongly consistent DHT
  • Connection pool based on Yamux, which multiplexes thousands of connections over one TCP connection.

Stack

  • DH-RPC = TLS - Cert + DHT
    • RPC Layer: compatible with golang net/rpc
    • Naming Layer: Consistent Secure DHT
    • Pooling Layer: session pool built on Yamux
    • Multiplex Layer: Yamux by Hashicorp
    • Transport Security Layer: Enhanced TLS
    • Network Layer: TCP, with KCP as a possible later option
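Because the RPC layer follows the Go net/rpc conventions, a service such as TestService from the Usage section has the same shape as a plain net/rpc service. The sketch below uses only the standard library, with an in-memory `net.Pipe` standing in for the encrypted, DHT-routed transport; the helper name `talkOverPipe` is hypothetical.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// TestService follows the net/rpc method shape:
// func (t *T) Method(args A, reply *R) error
type TestService struct{}

// Talk replies with an acknowledgement of the received message.
func (s *TestService) Talk(msg string, ret *string) error {
	*ret = fmt.Sprintf("got msg %s", msg)
	return nil
}

// talkOverPipe registers TestService on a net/rpc server, serves it on one
// end of an in-memory pipe, and calls Test.Talk from the other end.
func talkOverPipe(msg string) (string, error) {
	server := rpc.NewServer()
	if err := server.RegisterName("Test", &TestService{}); err != nil {
		return "", err
	}

	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var reply string
	if err := client.Call("Test.Talk", msg, &reply); err != nil {
		return "", err
	}
	return reply, nil
}

func main() {
	reply, err := talkOverPipe("Hi there")
	if err != nil {
		panic(err)
	}
	fmt.Println(reply) // prints "got msg Hi there"
}
```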

How it works

As we all know, an Elliptic Curve public key is computed from the private key:

ECPubKey := ECPrivKey.Pub()

A DH-RPC NodeID is generated by hashing the NodePublicKey together with a Uint256 Nonce:

NodeID := sha256(blake2b-512(NodePublicKey + Uint256Nonce))
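The double-hash construction can be sketched as follows. Two things here are assumptions rather than the package's behaviour: SHA-512 is swapped in for blake2b-512 so the example needs only the standard library (blake2b lives in golang.org/x/crypto/blake2b), and the nonce is treated as 32 raw bytes appended after the public key. The function name `nodeID` is hypothetical, and the resulting IDs will not match real DH-RPC NodeIDs.

```go
package main

import (
	"crypto/sha256"
	"crypto/sha512"
	"encoding/hex"
	"fmt"
)

// nodeID hashes publicKey||nonce with a 512-bit inner hash, then SHA-256.
// ASSUMPTION: SHA-512 stands in for blake2b-512, and the byte layout of
// the key and nonce is illustrative only.
func nodeID(publicKey []byte, nonce [32]byte) string {
	buf := make([]byte, 0, len(publicKey)+len(nonce))
	buf = append(buf, publicKey...)
	buf = append(buf, nonce[:]...)

	inner := sha512.Sum512(buf)      // stand-in for blake2b-512
	outer := sha256.Sum256(inner[:]) // final 256-bit ID
	return hex.EncodeToString(outer[:])
}

func main() {
	pub := []byte("example-public-key-bytes") // placeholder, not a real key
	var nonce [32]byte
	nonce[31] = 1
	fmt.Println(nodeID(pub, nonce))
}
```

Since the nonce is part of the hash input, changing it yields a different NodeID for the same public key.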

The DHT holds the NodeID:PublicKey and NodeID:Addr maps. After the TCP connection is established, an RPC connection performs ECDH to derive the shared secret.

GenECDHSharedSecret(APub, BPriv) == GenECDHSharedSecret(BPub, APriv)
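The symmetry above can be checked with the standard library's crypto/ecdh package. This is a sketch under one loud assumption: the standard library has no secp256k1, so P-256 is used as a stand-in curve. The helper names `sharedSecret` and `demoECDH` are hypothetical and are not the package's GenECDHSharedSecret.

```go
package main

import (
	"bytes"
	"crypto/ecdh"
	"crypto/rand"
	"fmt"
)

// sharedSecret derives the ECDH secret from a local private key and a
// remote public key.
func sharedSecret(localPriv *ecdh.PrivateKey, remotePub *ecdh.PublicKey) ([]byte, error) {
	return localPriv.ECDH(remotePub)
}

// demoECDH generates two key pairs and reports whether both sides derive
// the same secret.
func demoECDH() (bool, error) {
	curve := ecdh.P256() // ASSUMPTION: stand-in for secp256k1

	alice, err := curve.GenerateKey(rand.Reader)
	if err != nil {
		return false, err
	}
	bob, err := curve.GenerateKey(rand.Reader)
	if err != nil {
		return false, err
	}

	// Alice combines her private key with Bob's public key, and vice versa.
	ab, err := sharedSecret(alice, bob.PublicKey())
	if err != nil {
		return false, err
	}
	ba, err := sharedSecret(bob, alice.PublicKey())
	if err != nil {
		return false, err
	}
	return bytes.Equal(ab, ba), nil
}

func main() {
	same, err := demoECDH()
	if err != nil {
		panic(err)
	}
	fmt.Println("secrets match:", same) // prints "secrets match: true"
}
```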

The main procedure is described in the sequence chart below:

[Figure: sequence chart of the DH-RPC connection procedure]

So anyone who tries to impersonate NodeB by overwriting its address or public key in the DHT, without holding NodeB's private key, will fail to derive the correct shared secret.

Example

The example below uses one tracker and two nodes.

Tracker Code

package main

import (
	"os"

	"github.com/SQLess/SQLess/conf"
	"github.com/SQLess/SQLess/consistent"
	"github.com/SQLess/SQLess/route"
	"github.com/SQLess/SQLess/rpc"
	"github.com/SQLess/SQLess/utils/log"
)

func main() {
	//log.SetLevel(log.DebugLevel)
	conf.GConf, _ = conf.LoadConfig(os.Args[1])
	log.Debugf("GConf: %#v", conf.GConf)

	// Init Key Management System
	route.InitKMS(conf.GConf.PubKeyStoreFile)

	// Creating DHT RPC with simple persistence layer
	dht, err := route.NewDHTService(conf.GConf.DHTFileName, new(consistent.KMSStorage), true)
	if err != nil {
		log.Fatalf("init dht failed: %v", err)
	}

	// Register DHT service
	server, err := rpc.NewServerWithService(rpc.ServiceMap{route.DHTRPCName: dht})
	if err != nil {
		log.Fatal(err)
	}

	// Init RPC server with an empty master key, which is not recommended
	addr := conf.GConf.ListenAddr
	masterKey := []byte("")
	if err = server.InitRPCServer(addr, conf.GConf.PrivateKeyFile, masterKey); err != nil {
		log.Fatal(err)
	}
	server.Serve()
}

Node Code

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"

	"github.com/SQLess/SQLess/conf"
	"github.com/SQLess/SQLess/proto"
	"github.com/SQLess/SQLess/route"
	"github.com/SQLess/SQLess/rpc"
	"github.com/SQLess/SQLess/utils/log"
)

// TestService is the service registered with the RPC server
type TestService struct {
}

func NewTestService() *TestService {
	return &TestService{}
}

func (s *TestService) Talk(msg string, ret *string) error {
	fmt.Println(msg)
	resp := fmt.Sprintf("got %s", msg)
	*ret = resp
	return nil
}

func main() {
	//log.SetLevel(log.DebugLevel)
	conf.GConf, _ = conf.LoadConfig(os.Args[1])
	log.Debugf("GConf: %#v", conf.GConf)

	// Init Key Management System
	route.InitKMS(conf.GConf.PubKeyStoreFile)

	// Register the Test service
	server, err := rpc.NewServerWithService(rpc.ServiceMap{
		"Test": NewTestService(),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Init RPC server with an empty master key, which is not recommended
	addr := conf.GConf.ListenAddr
	masterKey := []byte("")
	if err = server.InitRPCServer(addr, conf.GConf.PrivateKeyFile, masterKey); err != nil {
		log.Fatal(err)
	}

	// Start Node RPC server
	go server.Serve()

	// Register Node public key, addr to Tracker(BP)
	for _, n := range conf.GConf.KnownNodes {
		client := rpc.NewCaller()
		reqA := &proto.PingReq{
			Node: n,
		}
		respA := new(proto.PingResp)
		err = client.CallNode(conf.GConf.BP.NodeID, "DHT.Ping", reqA, respA)
		if err != nil {
			log.Fatal(err)
		}
		log.Debugf("respA: %v", respA)
	}

	// Read target node and connect to it
	scanner := bufio.NewScanner(os.Stdin)
	fmt.Print("Input target node ID: ")
	scanner.Scan()
	if scanner.Err() == nil {
		target := proto.NodeID(strings.TrimSpace(scanner.Text()))
		pc := rpc.NewPersistentCaller(target)
		log.Debugf("connecting to %s", scanner.Text())

		fmt.Print("Input msg: ")
		for scanner.Scan() {
			input := scanner.Text()
			log.Debugf("get input %s", input)
			repSimple := new(string)
			err = pc.Call("Test.Talk", input, repSimple)
			if err != nil {
				log.Fatal(err)
			}
			log.Infof("resp msg: %s", *repSimple)
		}
	}
}

Start tracker and node1, node2

$ ./runTracker.sh &
$ ./runNode2.sh &
$ ./runNode1.sh
Input target node ID: 000005aa62048f85da4ae9698ed59c14ec0d48a88a07c15a32265634e7e64ade #node2
Input msg: abcdefg

Documentation

Overview

Package rpc provides an RPC implementation over node-oriented connections.


Constants

This section is empty.

Variables

var (
	Dial   = naconn.Dial
	DialEx = naconn.DialEx
	Accept = naconn.Accept
)

The following variables define a method set to Dial/Accept node-oriented connections for this RPC package.

TODO(leventeliu): allow to config other node-oriented connection dialer/accepter.

Functions

func AcceptNAConn

func AcceptNAConn(ctx context.Context, conn net.Conn) (net.Conn, error)

AcceptNAConn accepts connection as a naconn.NAConn.

It is the default accept function of the RPC server, and also the only accept function for connections from a NAConnPool.

Corresponding dialer is naconn.Dial/naconn.DialEx.

func AcceptRawConn

func AcceptRawConn(ctx context.Context, conn net.Conn) (net.Conn, error)

AcceptRawConn accepts raw connection without encryption or node-oriented mechanism.

Corresponding dialer is net.Dial.

func NewClient

func NewClient(stream io.ReadWriteCloser) (client *rpc.Client)

NewClient returns a new Client with stream.

NOTE(leventeliu): ownership of stream is passed through:

io.Closer -> rpc.ClientCodec -> *rpc.Client

Closing the *rpc.Client will cause io.Closer invoked.
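The ownership chain can be observed with the standard net/rpc client, whose `rpc.NewClient` also takes an `io.ReadWriteCloser`. The sketch below is an illustration with the standard library's gob codec, not this package's codec; `trackedConn` and `closePropagates` are hypothetical names introduced to record the Close call.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"sync/atomic"
)

// trackedConn records whether Close was called on the wrapped connection.
type trackedConn struct {
	net.Conn
	closed atomic.Bool
}

// Close marks the connection as closed, then closes the underlying conn.
func (c *trackedConn) Close() error {
	c.closed.Store(true)
	return c.Conn.Close()
}

// closePropagates reports whether closing the RPC client also closes the
// stream that was handed to it.
func closePropagates() (bool, error) {
	local, remote := net.Pipe()
	defer remote.Close()

	stream := &trackedConn{Conn: local}
	client := rpc.NewClient(stream) // the client now owns stream
	if err := client.Close(); err != nil {
		return false, err
	}
	return stream.closed.Load(), nil
}

func main() {
	ok, err := closePropagates()
	if err != nil {
		panic(err)
	}
	fmt.Println("stream closed by client:", ok) // prints "stream closed by client: true"
}
```

The practical consequence is that callers should not close the stream themselves once it has been passed to the client.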

func ServeDirect

func ServeDirect(
	ctx context.Context, server *rpc.Server, stream io.ReadWriteCloser, remote *proto.RawNodeID,
)

ServeDirect serves data stream directly.

Types

type AcceptConn

type AcceptConn func(ctx context.Context, conn net.Conn) (net.Conn, error)

AcceptConn defines the function type which accepts a raw connection as a specific type of connection.

func NewAcceptCryptoConnFunc

func NewAcceptCryptoConnFunc(handler etls.CipherHandler) AcceptConn

NewAcceptCryptoConnFunc returns an AcceptConn function which accepts a raw connection and uses the cipher handler to handle it as an etls.CryptoConn.

Corresponding dialer is crypto/etls.Dial.

type Caller

type Caller struct {
	// contains filtered or unexported fields
}

Caller is a wrapper for session pooling and RPC calling.

func NewCaller

func NewCaller() *Caller

NewCaller returns a new Caller.

func NewCallerWithPool

func NewCallerWithPool(pool NOClientPool) *Caller

NewCallerWithPool returns a new Caller with the pool.

func (*Caller) CallNode

func (c *Caller) CallNode(node proto.NodeID, method string, args, reply interface{}) (err error)

CallNode calls node method.

func (*Caller) CallNodeWithContext

func (c *Caller) CallNodeWithContext(
	ctx context.Context, node proto.NodeID, method string, args, reply interface{}) (err error,
)

CallNodeWithContext calls node method with context.

type Client

type Client interface {
	Call(serviceMethod string, args interface{}, reply interface{}) error
	Go(serviceMethod string, args interface{}, reply interface{}, done chan *rpc.Call) *rpc.Call
	Close() error
}

Client defines the RPC client interface.
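The method set of this interface matches the standard library's `*rpc.Client`, which a compile-time assertion can confirm. The sketch below redeclares the interface locally so it is self-contained, and uses `Go` for an asynchronous call; the `Arith` service and the `doubleAsync` helper are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Client mirrors the interface listed above.
type Client interface {
	Call(serviceMethod string, args interface{}, reply interface{}) error
	Go(serviceMethod string, args interface{}, reply interface{}, done chan *rpc.Call) *rpc.Call
	Close() error
}

// Compile-time check: the standard *rpc.Client satisfies Client.
var _ Client = (*rpc.Client)(nil)

// Arith is a hypothetical service used only for this example.
type Arith struct{}

// Double writes 2*n into out.
func (a *Arith) Double(n int, out *int) error {
	*out = 2 * n
	return nil
}

// doubleAsync issues Arith.Double through the Client interface using Go,
// then waits on the returned call's Done channel.
func doubleAsync(n int) (int, error) {
	server := rpc.NewServer()
	if err := server.Register(&Arith{}); err != nil {
		return 0, err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn)

	var c Client = rpc.NewClient(clientConn)
	defer c.Close()

	var out int
	call := c.Go("Arith.Double", n, &out, nil) // nil: net/rpc allocates the done channel
	<-call.Done
	return out, call.Error
}

func main() {
	out, err := doubleAsync(21)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // prints 42
}
```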

func DialToNodeWithPool

func DialToNodeWithPool(pool NOClientPool, nodeID proto.NodeID, isAnonymous bool) (Client, error)

DialToNodeWithPool tries to use a connection from the pool; if that fails, it connects to the node with nodeID.

type ClientPool

type ClientPool struct {
	// contains filtered or unexported fields
}

ClientPool is the struct type of connection pool.

func (*ClientPool) Close

func (p *ClientPool) Close() error

Close closes all FreeLists in the pool.

func (*ClientPool) Get

func (p *ClientPool) Get(id proto.NodeID) (cli Client, err error)

Get returns the existing freelist for the node; if none exists, it makes a best effort to create one.

func (*ClientPool) GetEx

func (p *ClientPool) GetEx(id proto.NodeID, isAnonymous bool) (cli Client, err error)

GetEx returns a client with a one-off connection if it's anonymous; otherwise it returns the existing freelist with Get.

func (*ClientPool) Len

func (p *ClientPool) Len() (total int)

Len returns the connection count in the pool.

func (*ClientPool) Remove

func (p *ClientPool) Remove(id proto.NodeID)

Remove removes the node's freelist from the pool.

type LastErrSetter

type LastErrSetter interface {
	SetLastErr(error)
}

LastErrSetter defines the extension method used to set the client's last error.

type NOClientPool

type NOClientPool interface {
	Get(remote proto.NodeID) (Client, error)
	GetEx(remote proto.NodeID, isAnonymous bool) (Client, error)
	Close() error
}

NOClientPool defines the node-oriented client pool interface.

type NodeAwareServerCodec

type NodeAwareServerCodec struct {
	rpc.ServerCodec
	NodeID *proto.RawNodeID
	Ctx    context.Context
}

NodeAwareServerCodec wraps a normal rpc.ServerCodec and injects the node ID during request processing.

func NewNodeAwareServerCodec

func NewNodeAwareServerCodec(ctx context.Context, codec rpc.ServerCodec, nodeID *proto.RawNodeID) *NodeAwareServerCodec

NewNodeAwareServerCodec returns a new NodeAwareServerCodec with a normal rpc.ServerCodec and a proto.RawNodeID.

func (*NodeAwareServerCodec) ReadRequestBody

func (nc *NodeAwareServerCodec) ReadRequestBody(body interface{}) (err error)

ReadRequestBody overrides the default rpc.ServerCodec behaviour and injects the remote node ID into the request.
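The wrapping technique can be sketched with the standard library: embed an `rpc.ServerCodec`, override `ReadRequestBody`, and stamp the decoded request with an identity tied to the connection. Everything named here is an assumption for illustration: `senderCodec`, the `senderSetter` interface, and `EchoReq` are hypothetical, a plain string replaces `*proto.RawNodeID`, and net/rpc/jsonrpc is used only because it exports a server codec, whereas the feature list names MessagePack. How the real codec writes the node ID into a request is not shown in this documentation.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

// senderSetter is a hypothetical interface for requests that accept an
// injected sender ID.
type senderSetter interface{ SetSender(id string) }

// EchoReq is a hypothetical request carrying a sender field.
type EchoReq struct {
	Sender string
	Msg    string
}

// SetSender overwrites whatever sender the client claimed.
func (r *EchoReq) SetSender(id string) { r.Sender = id }

// EchoService replies with "<sender>: <msg>".
type EchoService struct{}

func (s *EchoService) Echo(req *EchoReq, reply *string) error {
	*reply = req.Sender + ": " + req.Msg
	return nil
}

// senderCodec wraps an rpc.ServerCodec and stamps each decoded request
// with the ID associated with the connection.
type senderCodec struct {
	rpc.ServerCodec
	sender string
}

// ReadRequestBody decodes the body with the wrapped codec, then injects
// the connection's sender ID if the request type supports it.
func (c *senderCodec) ReadRequestBody(body interface{}) error {
	if err := c.ServerCodec.ReadRequestBody(body); err != nil {
		return err
	}
	// body is nil when net/rpc discards a request it cannot dispatch;
	// the type assertion simply fails in that case.
	if s, ok := body.(senderSetter); ok {
		s.SetSender(c.sender)
	}
	return nil
}

// echoWithInjectedSender sends a request claiming one sender over a
// connection the server associates with another.
func echoWithInjectedSender(claimed, actual, msg string) (string, error) {
	server := rpc.NewServer()
	if err := server.RegisterName("Echo", &EchoService{}); err != nil {
		return "", err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeCodec(&senderCodec{
		ServerCodec: jsonrpc.NewServerCodec(serverConn),
		sender:      actual,
	})

	client := jsonrpc.NewClient(clientConn)
	defer client.Close()

	var reply string
	err := client.Call("Echo.Echo", &EchoReq{Sender: claimed, Msg: msg}, &reply)
	return reply, err
}

func main() {
	reply, err := echoWithInjectedSender("mallory", "node-A", "hi")
	if err != nil {
		panic(err)
	}
	fmt.Println(reply) // prints "node-A: hi"
}
```

Injecting the identity on the server side means a handler sees the ID tied to the connection rather than a value the client chose to send.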

type PCaller

type PCaller interface {
	Call(method string, request interface{}, reply interface{}) (err error)
	Close()
	Target() string
	New() PCaller // returns new instance of current caller
}

PCaller defines the generic interface shared by PersistentCaller and RawCaller.

type PersistentCaller

type PersistentCaller struct {

	//TargetAddr string
	TargetID proto.NodeID
	sync.Mutex
	// contains filtered or unexported fields
}

PersistentCaller is a wrapper for session pooling and RPC calling.

func NewPersistentCaller

func NewPersistentCaller(target proto.NodeID) *PersistentCaller

NewPersistentCaller returns a new PersistentCaller.

IMPORTANT: If a PersistentCaller is first used for DHT.Ping, which runs over an anonymous ETLS connection, it should not be used for any RPC other than DHT.Ping.

func NewPersistentCallerWithPool

func NewPersistentCallerWithPool(pool NOClientPool, target proto.NodeID) *PersistentCaller

NewPersistentCallerWithPool returns a new PersistentCaller that uses the given pool.

IMPORTANT: If a PersistentCaller is first used for DHT.Ping, which runs over an anonymous ETLS connection, it should not be used for any RPC other than DHT.Ping.

func (*PersistentCaller) Call

func (c *PersistentCaller) Call(method string, args interface{}, reply interface{}) (err error)

Call invokes the named function, waits for it to complete, and returns its error status.

func (*PersistentCaller) Close

func (c *PersistentCaller) Close()

Close closes the stream and RPC client.

func (*PersistentCaller) New

func (c *PersistentCaller) New() PCaller

New returns a brand new persistent caller.

func (*PersistentCaller) ResetClient

func (c *PersistentCaller) ResetClient() (err error)

ResetClient resets client.

func (*PersistentCaller) Target

func (c *PersistentCaller) Target() string

Target returns the request target for logging purpose.

type ServeStream

type ServeStream func(
	ctx context.Context, server *rpc.Server, stream io.ReadWriteCloser, remote *proto.RawNodeID,
)

ServeStream defines the data stream serving function type which serves RPC at the given io.ReadWriteCloser.

type Server

type Server struct {
	Listener net.Listener
	// contains filtered or unexported fields
}

Server is the RPC server struct.

func NewServer

func NewServer() *Server

NewServer returns a new Server.

func NewServerWithServeFunc

func NewServerWithServeFunc(f ServeStream) *Server

NewServerWithServeFunc returns a new Server that uses f to serve streams.

func NewServerWithService

func NewServerWithService(serviceMap ServiceMap) (server *Server, err error)

NewServerWithService returns a new Server and registers the services in serviceMap.

func (*Server) InitRPCServer

func (s *Server) InitRPCServer(
	addr string,
	privateKeyPath string,
	masterKey []byte,
) (err error)

InitRPCServer loads the private key, initializes the crypto transfer layer, and registers RPC services. If any error is returned, treat it as fatal.

func (*Server) RegisterService

func (s *Server) RegisterService(name string, service interface{}) error

RegisterService registers service with a Service name, used by Client RPC.

func (*Server) Serve

func (s *Server) Serve()

Serve starts the Server main loop.

func (*Server) SetListener

func (s *Server) SetListener(l net.Listener)

SetListener sets the service loop listener, which is used by the Serve main loop.

func (*Server) Stop

func (s *Server) Stop()

Stop stops the Server main loop.

func (*Server) WithAcceptConnFunc

func (s *Server) WithAcceptConnFunc(f AcceptConn) *Server

WithAcceptConnFunc resets the AcceptConn function of server.

type ServiceMap

type ServiceMap map[string]interface{}

ServiceMap maps service name to service instance.

Directories

Path Synopsis
Package mux provides a RPC implementation with connection multiplexing.
