dht

package module
v0.0.0-...-a0ce14d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2022 License: MIT Imports: 22 Imported by: 0

README

DHT Go Reference Go Report Card Build Status

A Kademlia DHT implementation for Go with a focus on performance and ease of use. It is not seek to conform to any existing standards or implementations.

  • implements a 160 bit keyspace
  • replication factor (K) of 20
  • wire protocol using flatbuffers
  • SO_REUSEPORT to concurrently handle requests on the same port
  • asynchronous api
  • supports values larger than MTU

Usage

In order to start a cluster of nodes, you will first need a bootstrap node that all other nodes can connect to first. To start a bootstrap node:

func main() {
    cfg := &dht.Config{
        ListenAddress: "127.0.0.1:9000", // udp address to bind to
        Listeners: 4,                    // number of socket listeners, defaults to GOMAXPROCS
        Timeout: time.Minute / 2         // request timeout, defaults to 1 minute
    }

    dht, err := dht.New(cfg)
    if err != nil {
        log.Fatal(err)
    }

    log.Println("bootstrap node started!")
}

Once a bootstrap node is up and runing, you can add other nodes to the network:

func main() {
    cfg := &dht.Config{
        ListenAddress: "127.0.0.1:9001", // udp address to bind to
        BootstrapAddresses: []string{
            "127.0.0.1:9000",
        },
        Listeners: 4,                    // number of socket listeners, defaults to GOMAXPROCS
        Timeout: time.Minute / 2         // request timeout, defaults to 1 minute
    }

    dht, err := dht.New(cfg)
    if err != nil {
        log.Fatal(err)
    }

    log.Println("node started!")
}

From any node you can then store values as follows:

func main() {
    ...

    // helper function to construct a sha1 hash that
    // will be used as the values key
    myKey := dht.Key("my-awesome-key")
    myValue := []byte("my-even-more-awesome-value")

    // stores a value for a given amount of time
    dht.Store(myKey, myValue, time.Hour, func(err error) {
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("successfully stored key: %s -> %s", string(myKey), string(myValue))
    })
}

Once your value is stored, you can retreive it from the network as follows:

func main() {
    ...

    // finds the value. please note it is not safe to use the value outside
    // of the provided callback unless it is copied
    dht.Find(myKey, func(value []byte, err error) {
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("successfully retrieved key: %s -> %s !\n", string(myKey), string(value))
    })
}

OS Tuning

For most linux distros, socket send and receive buffers are set very low. This will almost certainly result in large amounts of packet loss at higher throughput levels as these buffers get overrun.

How large these buffers will need to be will be dependent on your workload, so you should experiment to find the correct value.

You can temporarily increase the of the read and write buffers via sysctl:

# set the rmem and wmem buffers to ~128 MB each
$ sysctl -w net.core.rmem_max=536870912 && sysctl -w net.core.rmem_default=134217728 && sysctl -w net.core.wmem_max=536870912 && sysctl -w net.core.wmem_default=134217728

Development

To re-generate the flatbuffer definitions for the wire protocol:

$ make generate

To run tests:

$ go test -v -race

To run benchmarks:

$ go test -v -bench=.

Implemented

  • routing
  • storage (custom)
  • storage (in-memory)
  • ping
  • store
  • findNode
  • findValue
  • benchmarks
  • node join/leave
  • user defined storage
  • multiple values per store request
  • handles packets larger than MTU
  • multiple values per key
  • batch socket reads and writes
  • peer refresh
  • key refresh
  • latency based route selection

Future Improvements

  • io_uring socket handler
  • storage (persistent)
  • NAT traversal
  • support SO_REUSEPORT on mac/windows
  • configurable logging
  • ntp time

Documentation

Index

Constants

View Source
const (
	// PacketHeaderSize the size of the header we use to reconstruct data
	PacketHeaderSize = KEY_BYTES + 4

	// MaxEventSize the maximum size of an event packet size
	MaxEventSize = 65024

	// MaxPacketSize the size of packets we will send according to MTU,
	// minus a 8 bytes for the UDP header
	MaxPacketSize = 1472

	// MaxPayloadSize the maximum payload of our packet. The max packet size,
	// minus 24 bytes for our fragment header
	MaxPayloadSize = MaxPacketSize - PacketHeaderSize
)
View Source
const (
	// K number of nodes in a bucket
	K = 20
	// KEY_BITS number of bits in a key
	KEY_BITS = 160
	// KEY_BYTES number of bytes in a key
	KEY_BYTES = KEY_BITS / 8
)

Variables

View Source
var (
	// ErrRequestTimeout returned when a pending request has not recevied a response before the TTL period
	ErrRequestTimeout = errors.New("request timeout")
)

Functions

func Key

func Key(k any) []byte

Key creates a new 20 byte key hasehed with sha1 from a string, byte slice or int

Types

type Config

type Config struct {
	// LocalID the id of this node. If not specified, a random id will be generated
	LocalID []byte
	// ListenAddress the udp ip and port to listen on
	ListenAddress string
	// BootstrapAddresses the udp ip and port of the bootstrap nodes
	BootstrapAddresses []string
	// Listeners the number of threads that will listen on the designated udp port
	Listeners int
	// Timeout the amount of time before a peer is declared unresponsive and removed
	Timeout time.Duration
	// Storage implementation to use for storing key value pairs
	Storage Storage
	// SocketBufferSize sets the size of the udp sockets send and receive buffer
	SocketBufferSize int
	// SocketBatchSize the batch size of udp messages that will be written to the underlying socket
	SocketBatchSize int
	// SocketBatchInterval the period with which the current batch of udp messages will be written to the underlying socket if not full
	SocketBatchInterval time.Duration
	// Logging enables basic logging
	Logging bool
}

Config configuration parameters for the dht

type DHT

type DHT struct {
	// contains filtered or unexported fields
}

DHT represents the distributed hash table

func New

func New(cfg *Config) (*DHT, error)

New creates a new dht

func (*DHT) Close

func (d *DHT) Close() error

Close shuts down the dht

func (*DHT) Find

func (d *DHT) Find(key []byte, callback func(value []byte, err error), opts ...*FindOption)

Find finds a value on the network if it exists. If the key being queried has multiple values, the callback will be invoked for each result Any returned value will not be safe to use outside of the callback, so you should copy it if its needed elsewhere

func (*DHT) Store

func (d *DHT) Store(key, value []byte, ttl time.Duration, callback func(err error))

Store a value on the network. If the value fails to store, the provided callback will be returned with the error

type FindOption

type FindOption struct {
	// contains filtered or unexported fields
}

FindOption for configuring find requests

func ValuesFrom

func ValuesFrom(from time.Time) *FindOption

ValuesFrom filters results to only those that were created after a given timestmap this is useful for repeat queries where duplicates ideally should be avoided

type Storage

type Storage interface {
	Get(key []byte, from time.Time) ([]*Value, bool)
	Set(key, value []byte, created time.Time, ttl time.Duration) bool
	Iterate(cb func(value *Value) bool)
}

Storage defines the storage interface used by the DLT

type Value

type Value struct {
	Key     []byte
	Value   []byte
	TTL     time.Duration
	Created time.Time
	// contains filtered or unexported fields
}

Value represents the value to be stored

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL