dht

package module
v2.0.0-...-03adce6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2023 License: MIT Imports: 50 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ProtocolIPFS is the protocol identifier for the main Amino DHT network.
	// If the DHT is configured with this protocol, you must configure backends
	// for IPNS, Public Key, and provider records (ipns, pk, and providers
	// namespaces). Configuration validation will fail if backends are missing.
	ProtocolIPFS protocol.ID = "/ipfs/kad/1.0.0"

	// ProtocolFilecoin is the protocol identifier for Filecoin mainnet. If this
	// protocol is configured, the DHT won't automatically add support for any
	// of the above record types.
	ProtocolFilecoin protocol.ID = "/fil/kad/testnetnet/kad/1.0.0"
)
View Source
const ServiceName = "libp2p.DHT"

ServiceName is used to scope incoming streams for the resource manager.

Variables

This section is empty.

Functions

func AddrFilterIdentity

func AddrFilterIdentity(maddrs []ma.Multiaddr) []ma.Multiaddr

AddrFilterIdentity is an AddressFilter that does not apply any filtering and just returns that passed-in multi addresses without modification.

func AddrFilterPrivate

func AddrFilterPrivate(maddrs []ma.Multiaddr) []ma.Multiaddr

AddrFilterPrivate filters out any multiaddresses that are private. It evaluates the manet.IsPublicAddr on each multiaddress, and if it returns true, the multiaddress will be in the result set.

func AddrFilterPublic

func AddrFilterPublic(maddrs []ma.Multiaddr) []ma.Multiaddr

AddrFilterPublic filters out any multiaddresses that are public. It evaluates the manet.IsIPLoopback on each multiaddress, and if it returns true, the multiaddress will be in the result set.

func DefaultBootstrapPeers

func DefaultBootstrapPeers() []peer.AddrInfo

DefaultBootstrapPeers returns hard-coded public DHT bootstrap peers operated by Protocol Labs. You can configure your own set of bootstrap peers by overwriting the corresponding Config field.

func DefaultRoutingTable

func DefaultRoutingTable(nodeID kadt.PeerID) (routing.RoutingTableCpl[kadt.Key, kadt.PeerID], error)

DefaultRoutingTable returns a triert.TrieRT routing table. This routing table cannot be initialized in DefaultConfig because it requires information about the local peer.

func RoutingQuorum

func RoutingQuorum(n int) routing.Option

RoutingQuorum accepts the desired quorum that is required to terminate the search query. The quorum value must not be negative but can be 0 in which case we continue the query until we have exhausted the keyspace. If no quorum is specified, the [Config.DefaultQuorum] value will be used.

Types

type AddressFilter

type AddressFilter func([]ma.Multiaddr) []ma.Multiaddr

type Backend

type Backend interface {
	// Store stores the given value such that it can be retrieved via Fetch
	// with the same key parameter. It returns the written record. The key
	// that will be handed into the Store won't contain the namespace prefix. For
	// example, if we receive a request for /ipns/$binary_id, key will be set to
	// $binary_id. The backend implementation is free to decide how to store the
	// data in the datastore. However, it makes sense to prefix the record with
	// the namespace that this Backend operates in.
	Store(ctx context.Context, key string, value any) (any, error)

	// Fetch returns the record for the given path or a [ds.ErrNotFound] if it
	// wasn't found or another error if any occurred. key won't contain the
	// namespace prefix.
	Fetch(ctx context.Context, key string) (any, error)

	// Validate validates the given values and returns the index of the "best"
	// value or an error and -1 if all values are invalid. If the method is used
	// with a single value, it will return 0 and no error if it is valid or an
	// error and -1 if it is invalid. For multiple values, it will select the
	// "best" value based on user-defined logic and return its index in the
	// original values list. If we receive a request for /ipns/$binary_id, the
	// key parameter will be set to $binary_id. Decisions about which value is
	// the "best" from the given list must be stable. So if there are multiple
	// equally good values, the implementation must always return the same
	// index - for example, always the first good or last good value.
	Validate(ctx context.Context, key string, values ...any) (int, error)
}

A Backend implementation handles requests for certain record types from other peers. A Backend always belongs to a certain namespace. In this case a namespace is equivalent to a type of record that this DHT supports. In the case of IPFS, the DHT supports the "ipns", "pk", and "providers" namespaces and therefore uses three different backends. Depending on the request's key the DHT invokes the corresponding backend Store and Fetch methods. A key has the structure "/$namespace/$path". The DHT parses uses the $namespace part to decide which Backend to use. The $path part is then passed to the Backend's Store and Fetch methods as the "key" parameter. Backends for different namespace may or may not operate on the same underlying datastore.

To support additional record types, users would implement this Backend interface and register it for a custom namespace with the DHT Config by adding it to the [Config.Backend] map. Any PUT_VALUE/GET_VALUE requests would start to support the new record type. The requirement is though that all "any" types must be *recpb.Record types. The below interface cannot enforce that type because provider records are handled slightly differently. For example, with provider records, the return values are not assigned to the pb.Message.Record field but to the pb.Message.ProviderPeers field.

This repository defines default Backends for the "ipns", "pk", and "providers" namespaces. They can be instantiated with NewBackendIPNS, NewBackendPublicKey, and NewBackendProvider respectively.

type Config

type Config struct {
	// Clock
	Clock clock.Clock

	// Mode defines if the DHT should operate as a server or client or switch
	// between both automatically (see ModeOpt).
	Mode ModeOpt

	// Query holds the configuration used for queries managed by the DHT.
	Query *QueryConfig

	// BucketSize determines the number of closer peers to return
	BucketSize int

	// BootstrapPeers is the list of peers that should be used to bootstrap
	// into the DHT network.
	BootstrapPeers []peer.AddrInfo

	// ProtocolID represents the DHT [protocol] we can query with and respond to.
	//
	// [protocol]: https://docs.libp2p.io/concepts/fundamentals/protocols/
	ProtocolID protocol.ID

	// RoutingTable holds a reference to the specific routing table
	// implementation that this DHT should use. If this field is nil, the
	// [triert.TrieRT] routing table will be used. This field will be nil
	// in the default configuration because a routing table requires information
	// about the local node.
	RoutingTable kadt.RoutingTable

	// The Backends field holds a map of key namespaces to their corresponding
	// backend implementation. For example, if we received an IPNS record, the
	// key will have the form "/ipns/$binary_id". We will forward the handling
	// of this record to the corresponding backend behind the "ipns" key in this
	// map. A backend does record validation and handles the storage of the
	// record. If this map stays empty, it will be populated with the default
	// IPNS ([NewBackendIPNS]), PublicKey ([NewBackendPublicKey]), and
	// Providers ([NewBackendProvider]) backends.
	//
	// Backends that implement the [io.Closer] interface will get closed when
	// the DHT is closed.
	Backends map[string]Backend

	// Datastore will be used to construct the default backends. If this is nil,
	// an in-memory leveldb from [InMemoryDatastore] will be used for all
	// backends.
	// If you want to use individual datastores per backend, you will need to
	// construct them individually and register them with the above Backends
	// map. Note that if you configure the DHT to use [ProtocolIPFS] it is
	// required to register backends for the ipns, pk, and providers namespaces.
	//
	// This datastore must be thread-safe.
	Datastore Datastore

	// Logger can be used to configure a custom structured logger instance.
	// By default go.uber.org/zap is used (wrapped in ipfs/go-log).
	Logger *slog.Logger

	// TimeoutStreamIdle is the duration we're reading from a stream without
	// receiving before closing/resetting it. The timeout gets reset every time
	// we have successfully read a message from the stream.
	TimeoutStreamIdle time.Duration

	// AddressFilter is used to filter the addresses we put into the peer store and
	// also fetch from the peer store and serve to other peers. It is mainly
	// used to filter out private addresses.
	AddressFilter AddressFilter

	// MeterProvider provides access to named Meter instances. It's used to,
	// e.g., expose prometheus metrics. Check out the [opentelemetry docs]:
	//
	// [opentelemetry docs]: https://opentelemetry.io/docs/specs/otel/metrics/api/#meterprovider
	MeterProvider metric.MeterProvider

	// TracerProvider provides Tracers that are used by instrumentation code to
	// trace computational workflows. Check out the [opentelemetry docs]:
	//
	// [opentelemetry docs]: https://opentelemetry.io/docs/concepts/signals/traces/#tracer-provider
	TracerProvider trace.TracerProvider
}

Config contains all the configuration options for a DHT. Use DefaultConfig to build up your own configuration struct. The DHT constructor New uses the below method *Config.Validate to test for violations of configuration invariants.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a configuration struct that can be used as-is to instantiate a fully functional DHT client. All fields that are nil require some additional information to instantiate. The default values for these fields come from separate top-level methods prefixed with Default.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration struct it is called on. It returns an error if any configuration issue was detected and nil if this is a valid configuration.

type ConfigurationError

type ConfigurationError struct {
	Component string
	Err       error
}

A ConfigurationError is returned when a component's configuration is found to be invalid or unusable.

func (*ConfigurationError) Error

func (e *ConfigurationError) Error() string

func (*ConfigurationError) Unwrap

func (e *ConfigurationError) Unwrap() error

type DHT

type DHT struct {
	// contains filtered or unexported fields
}

DHT is an implementation of Kademlia with S/Kademlia modifications. It is used to implement the base Routing module.

func New

func New(h host.Host, cfg *Config) (*DHT, error)

New constructs a new DHT for the given underlying host and with the given configuration. Use DefaultConfig to construct a configuration.

func (*DHT) AddAddresses

func (d *DHT) AddAddresses(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error

AddAddresses suggests peers and their associated addresses to be added to the routing table. Addresses will be added to the peerstore with the supplied time to live.

func (*DHT) Bootstrap

func (d *DHT) Bootstrap(ctx context.Context) error

func (*DHT) Close

func (d *DHT) Close() error

Close cleans up all resources associated with this DHT.

func (*DHT) FindPeer

func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error)

func (*DHT) FindProvidersAsync

func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo

func (*DHT) GetValue

func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error)

func (*DHT) Provide

func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error

func (*DHT) PutValue

func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ...routing.Option) error

PutValue satisfies the [routing.Routing] interface and will add the given value to the k-closest nodes to keyStr. The parameter keyStr should have the format `/$namespace/$binary_id`. Namespace examples are `pk` or `ipns`. To identify the closest peers to keyStr, that complete string will be SHA256 hashed.

func (*DHT) SearchValue

func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing.Option) (<-chan []byte, error)

SearchValue will search in the DHT for keyStr. keyStr must have the form `/$namespace/$binary_id`

type Datastore

type Datastore interface {
	ds.Datastore
	ds.BatchingFeature
	ds.TxnFeature
}

Datastore is an interface definition that gathers the datastore requirements. The DHT requires the datastore to support batching and transactions. Example datastores that implement both features are leveldb and badger. leveldb can also be used in memory - this is used as the default datastore.

func InMemoryDatastore

func InMemoryDatastore() (Datastore, error)

InMemoryDatastore returns an in-memory leveldb datastore.

type ModeOpt

type ModeOpt string

ModeOpt describes in which mode this DHT process should operate in. Possible options are client, server, and any variant that switches between both automatically based on public reachability. The DHT receives reachability updates from libp2p via the EvtLocalReachabilityChanged event. A DHT that operates in client mode won't register a stream handler for incoming requests and therefore won't store, e.g., any provider or IPNS records. A DHT in server mode, on the other hand, does all of that.

The unexported "mode" type, on the other hand, captures the current state that the DHT is in. This can either be client or server.

const (
	// ModeOptClient configures the DHT to only operate in client mode
	// regardless of potential public reachability.
	ModeOptClient ModeOpt = "client"

	// ModeOptServer configures the DHT to always operate in server mode
	// regardless of potentially not being publicly reachable.
	ModeOptServer ModeOpt = "server"

	// ModeOptAutoClient configures the DHT to start operating in client mode
	// and if publicly reachability is detected to switch to server mode.
	ModeOptAutoClient ModeOpt = "auto-client"

	// ModeOptAutoServer configures the DHT to start operating in server mode,
	// and if it is detected that we don't have public reachability switch
	// to client mode.
	ModeOptAutoServer ModeOpt = "auto-server"
)

type ProvidersBackend

type ProvidersBackend struct {
	// contains filtered or unexported fields
}

ProvidersBackend implements the Backend interface and handles provider record requests for the "/providers/" namespace.

func NewBackendProvider

func NewBackendProvider(pstore peerstore.Peerstore, dstore ds.Datastore, cfg *ProvidersBackendConfig) (be *ProvidersBackend, err error)

NewBackendProvider initializes a new backend for the "providers" namespace that can store and fetch provider records from the given datastore. The values passed into ProvidersBackend.Store must be of type peer.AddrInfo. The values returned from ProvidersBackend.Fetch will be of type [*providerSet] (unexported). The cfg parameter can be nil, in which case the DefaultProviderBackendConfig will be used.

func (*ProvidersBackend) Close

func (p *ProvidersBackend) Close() error

Close is here to implement the io.Closer interface. This will get called when the DHT "shuts down"/closes.

func (*ProvidersBackend) Fetch

func (p *ProvidersBackend) Fetch(ctx context.Context, key string) (any, error)

Fetch implements the Backend interface. In the case of a ProvidersBackend this method returns a [providerSet] (unexported) that contains all peer IDs and known multiaddresses for the given key. The key parameter should be of the form "/providers/$binary_multihash".

func (*ProvidersBackend) StartGarbageCollection

func (p *ProvidersBackend) StartGarbageCollection()

StartGarbageCollection starts the garbage collection loop. The garbage collection interval can be configured with [ProvidersBackendConfig.GCInterval]. The garbage collection loop can only be started a single time. Use [StopGarbageCollection] to stop the garbage collection loop.

func (*ProvidersBackend) StopGarbageCollection

func (p *ProvidersBackend) StopGarbageCollection()

StopGarbageCollection stops the garbage collection loop started with [StartGarbageCollection]. If garbage collection is not running, this method is a no-op.

func (*ProvidersBackend) Store

func (p *ProvidersBackend) Store(ctx context.Context, key string, value any) (any, error)

Store implements the Backend interface. In the case of a ProvidersBackend this method accepts a peer.AddrInfo as a value and stores it in the configured datastore.

func (*ProvidersBackend) Validate

func (p *ProvidersBackend) Validate(ctx context.Context, key string, values ...any) (int, error)

Validate verifies that the given values are of type peer.AddrInfo. Then it decides based on the number of attached multi addresses which value is "better" than the other. If there is a tie, Validate will return the index of the earliest occurrence.

type ProvidersBackendConfig

type ProvidersBackendConfig struct {

	// ProvideValidity specifies for how long provider records are valid
	ProvideValidity time.Duration

	// AddressTTL specifies for how long we will keep around provider multi
	// addresses in the peerstore's address book. If such multiaddresses are
	// present we send them alongside the peer ID to the requesting peer. This
	// prevents the necessity for a second look for the multiaddresses on the
	// requesting peers' side.
	AddressTTL time.Duration

	// CacheSize specifies the LRU cache size
	CacheSize int

	// GCInterval defines how frequently garbage collection should run
	GCInterval time.Duration

	// Logger is the logger to use
	Logger *slog.Logger

	// Tele holds a reference to the telemetry struct to capture metrics and
	// traces.
	Tele *Telemetry

	// AddressFilter is a filter function that any addresses that we attempt to
	// store or fetch from the peerstore's address book need to pass through.
	// If you're manually configuring this backend, make sure to align the
	// filter with the one configured in [Config.AddressFilter].
	AddressFilter AddressFilter
	// contains filtered or unexported fields
}

ProvidersBackendConfig is used to construct a ProvidersBackend. Use DefaultProviderBackendConfig to get a default configuration struct and then modify it to your liking.

func DefaultProviderBackendConfig

func DefaultProviderBackendConfig() (*ProvidersBackendConfig, error)

DefaultProviderBackendConfig returns a default ProvidersBackend configuration. Use this as a starting point and modify it. If a nil configuration is passed to NewBackendProvider, this default configuration here is used.

type QueryConfig

type QueryConfig struct {
	// Concurrency defines the maximum number of in-flight queries that may be waiting for message responses at any one time.
	Concurrency int

	// Timeout defines the time to wait before terminating a query that is not making progress
	Timeout time.Duration

	// RequestConcurrency defines the maximum number of concurrent requests that each query may have in flight.
	// The maximum number of concurrent requests is equal to [RequestConcurrency] multiplied by [Concurrency].
	RequestConcurrency int

	// RequestTimeout defines the time to wait before terminating a request to a node that has not responded.
	RequestTimeout time.Duration

	// DefaultQuorum specifies the minimum number of identical responses before
	// a SearchValue/GetValue operation returns. The responses must not only be
	// identical, but the responses must also correspond to the "best" records
	// we have observed in the network during the SearchValue/GetValue
	// operation. A DefaultQuorum of 0 means that we search the network until
	// we have exhausted the keyspace.
	DefaultQuorum int
}

QueryConfig contains the configuration options for queries managed by a DHT.

func DefaultQueryConfig

func DefaultQueryConfig() *QueryConfig

DefaultQueryConfig returns the default query configuration options for a DHT.

func (*QueryConfig) Validate

func (cfg *QueryConfig) Validate() error

Validate checks the configuration options and returns an error if any have invalid values.

type RecordBackend

type RecordBackend struct {
	// contains filtered or unexported fields
}

func NewBackendIPNS

func NewBackendIPNS(ds ds.TxnDatastore, kb peerstore.KeyBook, cfg *RecordBackendConfig) (be *RecordBackend, err error)

NewBackendIPNS initializes a new backend for the "ipns" namespace that can store and fetch IPNS records from the given datastore. The stored and returned records must be of type *recpb.Record. The cfg parameter can be nil, in which case the DefaultRecordBackendConfig will be used.

func NewBackendPublicKey

func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) (be *RecordBackend, err error)

NewBackendPublicKey initializes a new backend for the "pk" namespace that can store and fetch public key records from the given datastore. The stored and returned records must be of type *recpb.Record. The cfg parameter can be nil, in which case the DefaultRecordBackendConfig will be used.

func (*RecordBackend) Fetch

func (r *RecordBackend) Fetch(ctx context.Context, key string) (any, error)

func (*RecordBackend) Store

func (r *RecordBackend) Store(ctx context.Context, key string, value any) (any, error)

func (*RecordBackend) Validate

func (r *RecordBackend) Validate(ctx context.Context, key string, values ...any) (int, error)

type RecordBackendConfig

type RecordBackendConfig struct {
	MaxRecordAge time.Duration
	Logger       *slog.Logger
	Tele         *Telemetry
	// contains filtered or unexported fields
}

func DefaultRecordBackendConfig

func DefaultRecordBackendConfig() (*RecordBackendConfig, error)

type Telemetry

type Telemetry struct {
	Tracer                 trace.Tracer
	ReceivedMessages       metric.Int64Counter
	ReceivedMessageErrors  metric.Int64Counter
	ReceivedBytes          metric.Int64Histogram
	InboundRequestLatency  metric.Float64Histogram
	OutboundRequestLatency metric.Float64Histogram
	SentMessages           metric.Int64Counter
	SentMessageErrors      metric.Int64Counter
	SentRequests           metric.Int64Counter
	SentRequestErrors      metric.Int64Counter
	SentBytes              metric.Int64Histogram
	LRUCache               metric.Int64Counter
	NetworkSize            metric.Int64Counter
}

Telemetry is the struct that holds a reference to all metrics and the tracer. Initialize this struct with NewTelemetry. Make sure to also register the [MeterProviderOpts] with your custom or the global metric.MeterProvider.

To see the documentation for each metric below, check out NewTelemetry and the metric.WithDescription() calls when initializing each metric.

func NewTelemetry

func NewTelemetry(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) (*Telemetry, error)

NewTelemetry initializes a Telemetry struct with the given meter and tracer providers. It constructs the different metric counters and histograms. The histograms have custom boundaries. Therefore, the given metric.MeterProvider should have the custom view registered that [MeterProviderOpts] returns.

func NewWithGlobalProviders

func NewWithGlobalProviders() (*Telemetry, error)

NewWithGlobalProviders uses the global meter and tracer providers from opentelemetry. Check out the documentation of [MeterProviderOpts] for implications of using this constructor.

Directories

Path Synopsis
internal
coord/brdcst
Package brdcst contains state machines that implement algorithms for broadcasting records into the DHT network.
Package brdcst contains state machines that implement algorithms for broadcasting records into the DHT network.
coord/internal/tiny
Package tiny implements Kademlia types suitable for tiny test networks
Package tiny implements Kademlia types suitable for tiny test networks
Package kadt contains the kademlia types for interacting with go-kademlia.
Package kadt contains the kademlia types for interacting with go-kademlia.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL