exchange

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2022 License: BSD-3-Clause Imports: 73 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// MaxStreamOpenAttempts is the number of times we try opening a stream with a given peer before giving up
	MaxStreamOpenAttempts = 5
	// MinAttemptDuration is the minimum amount of time we should wait before trying again
	MinAttemptDuration = 1 * time.Second
	// MaxAttemptDuration is maximum delay we should wait before trying again
	MaxAttemptDuration = 5 * time.Minute
)
View Source
const DefaultHashFunction = uint64(mh.SHA2_256)

DefaultHashFunction used for generating CIDs of imported data

View Source
const FilQueryProtocolID = protocol.ID("/fil/retrieval/qry/1.0.0")

FilQueryProtocolID is the protocol for querying information about retrieval deal parameters from Filecoin storage miners

View Source
const HeyProtocol = "/myel/pop/hey/1.0"

HeyProtocol identifies the supply greeter protocol

View Source
const KIndex = "idx"

KIndex is the datastore key for persisting the index of a workdag

View Source
const PopQueryProtocolID = protocol.ID("/myel/pop/query/1.0")

PopQueryProtocolID is the protocol for exchanging information about retrieval deal parameters from retrieval providers

View Source
const PopRequestProtocolID = protocol.ID("/myel/pop/request/1.0")

PopRequestProtocolID is the protocol for requesting caches to store new content

View Source
const RequestTopic = "/myel/pop/request/"

RequestTopic listens for peers looking for content blocks

Variables

View Source
var DefaultDispatchOptions = DispatchOptions{
	BackoffMin:      5 * time.Second,
	BackoffAttempts: 4,
	RF:              6,
}

DefaultDispatchOptions provides useful defaults We can change these if the content requires a long transfer time

View Source
var ErrNoStrategy = errors.New("no strategy")

ErrNoStrategy is returned when we try querying content without a read strategy

View Source
var ErrRefAlreadyExists = errors.New("ref already exists")

ErrRefAlreadyExists is used when trying to set a ref when one is already stored

View Source
var ErrRefNotFound = errors.New("ref not found")

ErrRefNotFound is returned when a given ref is not in the store

View Source
var ErrUserDeniedOffer = errors.New("user denied offer")

ErrUserDeniedOffer is returned when a user denies an offer

View Source
var Regions = map[string]Region{
	"Global":       global,
	"Asia":         asia,
	"Africa":       africa,
	"SouthAmerica": southAmerica,
	"NorthAmerica": northAmerica,
	"Europe":       europe,
	"Oceania":      oceania,
}

Regions is a list of preset regions

Functions

func KeyFromPath

func KeyFromPath(p string) string

KeyFromPath returns a key name from a file path

func NewDataTransfer

func NewDataTransfer(ctx context.Context, h host.Host, gs graphsync.GraphExchange, ds datastore.Batching, dsprefix string, dir string) (datatransfer.Manager, error)

NewDataTransfer packages together all the things needed for a new manager to work

func OpenStream

func OpenStream(ctx context.Context, h host.Host, p peer.ID, protos []protocol.ID) (network.Stream, error)

OpenStream is a generic method for opening streams with a backoff.

func PeekOfferID

func PeekOfferID(r io.Reader, buf *bytes.Buffer) (string, error)

PeekOfferID decodes the ID field only and returns the value while copying the bytes in a buffer

func SelectCheapest

func SelectCheapest(after int, t time.Duration) func(OfferExecutor) OfferWorker

SelectCheapest waits for a given amount of offers or delay whichever comes first and selects the cheapest then continues receiving offers while the transfer executes. If the transfer fails it will select the next cheapest given the buffered offers

func SelectFirstLowerThan

func SelectFirstLowerThan(amount abi.TokenAmount) func(oe OfferExecutor) OfferWorker

SelectFirstLowerThan returns the first offer which price is lower than given amount it keeps collecting offers below price threshold to fallback on before completing execution

func TransportConfigurer

func TransportConfigurer(idx *Index, isg IdxStoreGetter, pid peer.ID) datatransfer.TransportConfigurer

TransportConfigurer configurers the graphsync transport to use a custom blockstore per content

Types

type DataRef

type DataRef struct {
	PayloadCID  cid.Cid
	PayloadSize int64
	Keys        [][]byte
	Freq        int64
	BucketID    int64
	// contains filtered or unexported fields
}

DataRef encapsulates information about a content committed for storage

func (DataRef) Has

func (d DataRef) Has(key string) bool

func (*DataRef) MarshalCBOR

func (t *DataRef) MarshalCBOR(w io.Writer) error

func (*DataRef) UnmarshalCBOR

func (t *DataRef) UnmarshalCBOR(r io.Reader) error

type DealExecParams

type DealExecParams struct {
	Accepted   bool
	Selector   ipld.Node
	TotalFunds abi.TokenAmount
}

DealExecParams are params to apply when executing a selected deal Can be used to assign different parameters than the defaults in the offer while respecting the offer conditions otherwise it will fail

type DealParam

type DealParam func(*DealExecParams)

DealParam is a functional paramter to set a value on DealExecParams

func DealFunds

func DealFunds(amount abi.TokenAmount) DealParam

DealFunds sets a total amount to load in payment channel when executing an offer

func DealSel

func DealSel(sel ipld.Node) DealParam

DealSel sets a Deal Selector parameter when executing an offer

type DealRef

type DealRef struct {
	ID    deal.ID
	Offer deal.Offer
}

DealRef is the reference to an ongoing deal

type DealSelection

type DealSelection struct {
	Offer deal.Offer
	// contains filtered or unexported fields
}

DealSelection sends the selected offer with a channel to expect confirmation on

func (DealSelection) Exec

func (ds DealSelection) Exec(pms ...DealParam)

Exec accepts execution for an offer

func (DealSelection) Next

func (ds DealSelection) Next()

Next declines an offer and moves on to the next one if available

type DispatchOption

type DispatchOption = func(*DispatchOptions)

DispatchOption is a functional option for customizing DispatchOptions

type DispatchOptions

type DispatchOptions struct {
	BackoffMin      time.Duration
	BackoffAttempts int
	RF              int
	StoreID         multistore.StoreID
	Peers           []peer.ID // optional peers to target
}

DispatchOptions exposes parameters to affect the duration of a Dispatch operation

type Entry

type Entry struct {
	// Key is string name of the entry
	Key string `json:"key"`
	// Value is the CID of the represented content
	Value cid.Cid `json:"value"`
	// Size is the original file size. Not encoded in the DAG
	Size int64 `json:"size"`
}

Entry represents a link to an item in the DAG map

type EntrySlice

type EntrySlice []Entry

EntrySlice is a slice of entries exposing method for sorting

func (EntrySlice) Len

func (es EntrySlice) Len() int

func (EntrySlice) Less

func (es EntrySlice) Less(a, b int) bool

func (EntrySlice) Swap

func (es EntrySlice) Swap(a, b int)

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange is a financially incentivized IPLD block exchange powered by Filecoin and IPFS

func New

func New(ctx context.Context, h host.Host, ds datastore.Batching, opts Options) (*Exchange, error)

New creates a long running exchange process from a libp2p host, an IPFS datastore and some optional modules which are provided by default

func (*Exchange) DataTransfer

func (e *Exchange) DataTransfer() datatransfer.Manager

DataTransfer returns the data transfer manager instance for this exchange

func (*Exchange) Deals

func (e *Exchange) Deals() *deal.Mgr

Deals returns the deal manager

func (*Exchange) FilecoinAPI

func (e *Exchange) FilecoinAPI() filecoin.API

FilecoinAPI returns the FilecoinAPI instance for this exchange may be nil so check with IsFilecoinOnline first

func (*Exchange) FindAndRetrieve

func (e *Exchange) FindAndRetrieve(ctx context.Context, root cid.Cid) error

FindAndRetrieve starts a new transaction for fetching an entire dag on the market. It handles everything from content routing to offer selection and blocks until done. It is used in the replication protocol for retrieving new content to serve. It also sets the new received content in the index.

func (*Exchange) Index

func (e *Exchange) Index() *Index

Index returns the exchange data index

func (*Exchange) IsFilecoinOnline

func (e *Exchange) IsFilecoinOnline() bool

IsFilecoinOnline returns whether we are connected to a Filecoin blockchain gateway

func (*Exchange) Payments

func (e *Exchange) Payments() payments.Manager

Payments returns the payment manager

func (*Exchange) R

func (e *Exchange) R() *Replication

R exposes replication scheme methods

func (*Exchange) Retrieval

func (e *Exchange) Retrieval() retrieval.Manager

Retrieval exposes the retrieval manager module

func (*Exchange) Tx

func (e *Exchange) Tx(ctx context.Context, opts ...TxOption) *Tx

Tx returns a new transaction. The caller must also call tx.Close to cleanup and perist the new blocks retrieved or created by the transaction.

func (*Exchange) Wallet

func (e *Exchange) Wallet() wallet.Driver

Wallet returns the wallet API

type GossipRouting

type GossipRouting struct {
	// contains filtered or unexported fields
}

GossipRouting is a content routing service to find content providers using pubsub gossip routing

func NewGossipRouting

func NewGossipRouting(h host.Host, ps *pubsub.PubSub, meta MessageTracker, rgs []Region) *GossipRouting

NewGossipRouting creates a new GossipRouting service

func (*GossipRouting) AddAddrs

func (gr *GossipRouting) AddAddrs(p peer.ID, addrs []ma.Multiaddr)

AddAddrs adds a new peer into the host peerstore

func (*GossipRouting) Addrs

func (gr *GossipRouting) Addrs() ([]ma.Multiaddr, error)

Addrs returns the host's p2p addresses

func (*GossipRouting) NewQueryStream

func (gr *GossipRouting) NewQueryStream(dest peer.ID, protos []protocol.ID) (*QueryStream, error)

NewQueryStream creates a new query stream using the provided peer.ID to handle the Query protocols

func (*GossipRouting) Query

func (gr *GossipRouting) Query(ctx context.Context, root cid.Cid, sel ipld.Node) error

Query asks the gossip network of providers if anyone can provide the blocks we're looking for it blocks execution until our conditions are satisfied

func (*GossipRouting) QueryProvider

func (gr *GossipRouting) QueryProvider(p peer.AddrInfo, root cid.Cid, sel ipld.Node) (deal.Offer, error)

QueryProvider asks a provider directly for retrieval conditions

func (*GossipRouting) SetReceiver

func (gr *GossipRouting) SetReceiver(fn ReceiveOffer)

SetReceiver sets a callback to receive offers from gossip routers

func (*GossipRouting) StartProviding

func (gr *GossipRouting) StartProviding(ctx context.Context, fn ResponseFunc) error

StartProviding opens up our gossip subscription and sets our stream handler

type GossipTracer

type GossipTracer struct {
	// contains filtered or unexported fields
}

GossipTracer tracks messages we've seen so we can relay responses back to the publisher

func NewGossipTracer

func NewGossipTracer() *GossipTracer

NewGossipTracer creates a new instance of GossipTracer

func (*GossipTracer) Published

func (gt *GossipTracer) Published(mid string) bool

Published checks if we were the publisher of a message

func (*GossipTracer) Sender

func (gt *GossipTracer) Sender(mid string) (peer.ID, error)

Sender returns the peer who sent us a message

func (*GossipTracer) Trace

func (gt *GossipTracer) Trace(evt *pb.TraceEvent)

Trace gets triggered for every internal gossip sub operation

type Hey

type Hey struct {
	Regions   []RegionCode
	IndexRoot *cid.Cid // If the node has an empty index the root will be nil
}

Hey is the greeting message which takes in network info

func (*Hey) MarshalCBOR

func (t *Hey) MarshalCBOR(w io.Writer) error

func (*Hey) UnmarshalCBOR

func (t *Hey) UnmarshalCBOR(r io.Reader) error

type HeyEvt

type HeyEvt struct {
	Peer      peer.ID
	IndexRoot *cid.Cid // nil index root means empty index i.e. brand new node
}

HeyEvt is emitted when a Hey is received and accessible via the libp2p event bus subscription

type IdxStoreGetter

type IdxStoreGetter interface {
	GetStore(cid.Cid) *multistore.Store
}

IdxStoreGetter returns the store used for retrieving a given index root

type Index

type Index struct {

	// We still need to keep a map in memory
	Refs map[string]*DataRef
	// contains filtered or unexported fields
}

Index contains the information about which objects are currently stored the key is a CID.String(). It also implements a Least Frequently Used cache eviction mechanism to maintain storage withing given bounds inspired by https://github.com/dgrijalva/lfu-go. Content is garbage collected during eviction.

func NewIndex

func NewIndex(ds datastore.Batching, bstore blockstore.Blockstore, opts ...IndexOption) (*Index, error)

NewIndex creates a new Index instance, loading entries into a doubly linked list for faster read and writes

func (*Index) Available

func (idx *Index) Available() uint64

Available returns the storage capacity still available or 0 if full a margin set by lower bound (lb) provides leeway for the eviction algorithm

func (*Index) Bstore

func (idx *Index) Bstore() blockstore.Blockstore

Bstore returns the lower level blockstore storing the hamt

func (*Index) CleanBlockStore

func (idx *Index) CleanBlockStore(ctx context.Context) error

CleanBlockStore removes blocks from blockstore which CIDs are not in index

func (*Index) DropInterest

func (idx *Index) DropInterest(k cid.Cid) error

DropInterest removes a ref from the interest list

func (*Index) DropRef

func (idx *Index) DropRef(k cid.Cid) error

DropRef removes all content linked to a root CID and associated Refs

func (*Index) Flush

func (idx *Index) Flush() error

Flush persists the Refs to the store, callers must take care of the mutex context is not actually used downstream so we use a TODO()

func (*Index) GC

func (idx *Index) GC() error

GC removes tagged CIDs

func (*Index) GetRef

func (idx *Index) GetRef(k cid.Cid) (*DataRef, error)

GetRef gets a ref in the index for a given root CID and increments the LFU list registering a Read

func (*Index) InterestLen

func (idx *Index) InterestLen() int

InterestLen returns the number of interesting refs in our index

func (*Index) Interesting

func (idx *Index) Interesting() (map[*DataRef]byte, error)

Interesting returns a bucket of most interesting refs in the index that could be retrieved to improve the local index

func (*Index) Len

func (idx *Index) Len() int

Len returns the number of roots this index is currently storing

func (*Index) ListRefs

func (idx *Index) ListRefs() ([]*DataRef, error)

ListRefs returns all the content refs currently stored on this node as well as their read frequencies

func (*Index) LoadInterest

func (idx *Index) LoadInterest(r cid.Cid, store cbor.IpldStore) error

LoadInterest loads potential new content in a different doubly linked list in this situation the most popular content is at the back of the list

func (*Index) LoadRoot

func (idx *Index) LoadRoot(r cid.Cid, store cbor.IpldStore) (*hamt.Node, error)

LoadRoot loads a new HAMT root node from a given CID, it can be used to load a node from a different root than the current one for example

func (*Index) PeekRef

func (idx *Index) PeekRef(k cid.Cid) (*DataRef, error)

PeekRef returns a ref from the index without actually registering a read in the LFU

func (*Index) Root

func (idx *Index) Root() cid.Cid

Root returns the HAMT root CID

func (*Index) SetRef

func (idx *Index) SetRef(ref *DataRef) error

SetRef adds a ref in the index and increments the LFU queue

func (*Index) UpdateRef

func (idx *Index) UpdateRef(ref *DataRef) error

UpdateRef updates a ref in the index

type IndexEvt

type IndexEvt struct {
	Root cid.Cid
}

IndexEvt is emitted when a new index is loaded in the replication service

type IndexOption

type IndexOption func(*Index)

IndexOption customizes the behavior of the index

func WithBounds

func WithBounds(up, lo uint64) IndexOption

WithBounds sets the upper and lower bounds of the LFU store

func WithDeleteFunc

func WithDeleteFunc(fn func(DataRef)) IndexOption

WithDeleteFunc sets a deleteFunc callback

func WithSetFunc

func WithSetFunc(fn func(DataRef)) IndexOption

WithSetFunc sets a setFunc callback

type MessageTracker

type MessageTracker interface {
	// Published checks if we are actually the peer expecting this offer
	Published(string) bool
	// Sender returns the peer we think this message should be forwarded to
	Sender(string) (peer.ID, error)
}

MessageTracker returns metadata about messages so we know if they're destined to this host or should be forwarded

type Method

type Method uint64

Method is the replication request method

const (
	// Dispatch is an initial request from a content plublisher
	Dispatch Method = iota
	// FetchIndex is a request from one content provider to another to retrieve their index
	FetchIndex
)

type OfferExecutor

type OfferExecutor interface {
	Execute(deal.Offer, DealExecParams) TxResult
	Confirm(deal.Offer) DealExecParams
	Finish(TxResult)
}

OfferExecutor exposes the methods required to execute offers

type OfferWorker

type OfferWorker interface {
	Start()
	PushFront(deal.Offer)
	PushBack(deal.Offer)
	Close() []deal.Offer
}

OfferWorker is a generic interface to manage the lifecycle of offers

func SelectFirst

func SelectFirst(oe OfferExecutor) OfferWorker

SelectFirst executes the first offer received and buffers other offers during the duration of the transfer. If the transfer hard fails it tries continuing with the following offer and so on.

type Options

type Options struct {
	// Blockstore is used by default for graphsync and metadata storage
	// content should be stored on a multistore for proper isolation.
	Blockstore blockstore.Blockstore
	// MultiStore should be used to interface with content like importing files to store with the exchange
	// or exporting files to disk etc.
	MultiStore *multistore.MultiStore
	// PubSub allows passing a different pubsub instance with alternative routing algorithms. Default is Gossip.
	PubSub *pubsub.PubSub
	// GraphSync is used as Transport for DataTransfer, if you're providing a DataTransfer manager instance
	//  you don't need to set it.
	GraphSync graphsync.GraphExchange
	// DataTransfer is a single manager instance used across every retrieval operation.
	DataTransfer datatransfer.Manager
	// Wallet is a minimal interface for signing things
	Wallet wallet.Driver
	// RepoPath is where to persist any file to disk. It's actually only used for the DataTransfer CID list
	// recommend passing the same path as the datastore.
	RepoPath string
	// FilecoinRPCEndpoint is the websocket url to connect to a remote Lotus node.
	FilecoinRPCEndpoint string
	// FilecoinRPCHeader provides any required header depending on the Lotus server policy.
	FilecoinRPCHeader http.Header
	// FilecoinAPI can be passed directly instead of providing an endpoint. This can be useful in case you are.
	// in an environment which already may have the API instance.
	FilecoinAPI filecoin.API
	// GossipTracer is provided if you are using an external PubSub instance.
	GossipTracer *GossipTracer
	// Regions is the geographic region this exchange should serve. Defaults to Global only.
	Regions []Region
	// Capacity is the maximum storage capacity in bytes this exchange can handle. Once we capacity is reached,
	// least frequently used content is evicted to make more room for new content.
	// Default is 10GB.
	Capacity uint64
	// ReplInterval is the replication interval after which a worker will try to retrieve fresh new content
	// on the network
	ReplInterval time.Duration
	// WatchQueriesFunc is an optional function that will return any queries received as a provider
	WatchQueriesFunc func(deal.Query)
	// WatchEvictionFunc is an optional function that will yield the root cid for any evicted content.
	WatchEvictionFunc func(DataRef)
	// WatchAdditionFunc is an optional callback notifying when content is added to the index.
	WatchAdditionFunc func(DataRef)
	// PPB is the price per byte the exchange sets when offering a deal.
	PPB big.Int
}

Options are optional modules for the exchange. We fill each field with a default instance when not provided

type PRecord

type PRecord struct {
	Provider   peer.ID
	PayloadCID cid.Cid
}

PRecord is a provider <> cid mapping for recording who is storing what content

type Peer

type Peer struct {
	Regions []RegionCode
	Latency time.Duration
}

Peer contains information recorded while interacted with a peer

type PeerMgr

type PeerMgr struct {
	// contains filtered or unexported fields
}

PeerMgr is in charge of maintaining an optimal network of peers to coordinate with

func NewPeerMgr

func NewPeerMgr(h host.Host, idx *Index, regions []Region) *PeerMgr

NewPeerMgr prepares a new PeerMgr instance

func (*PeerMgr) Peers

func (pm *PeerMgr) Peers(n int, rl []Region, ignore map[peer.ID]bool, init []peer.ID) []peer.ID

Peers returns n active peers for a given list of regions and peers to ignore

func (*PeerMgr) Run

func (pm *PeerMgr) Run(ctx context.Context) error

type QueryStream

type QueryStream struct {
	// contains filtered or unexported fields
}

QueryStream wraps convenience methods for writing and reading CBOR messages from a stream.

func (*QueryStream) Close

func (qs *QueryStream) Close() error

Close the underlying stream

func (*QueryStream) OtherPeer

func (qs *QueryStream) OtherPeer() peer.ID

OtherPeer returns the peer ID of the other peer at the end of the stream

func (*QueryStream) ReadOffer

func (qs *QueryStream) ReadOffer() (deal.Offer, error)

ReadOffer reads and decodes a CBOR encoded offer message.

func (*QueryStream) ReadQuery

func (qs *QueryStream) ReadQuery() (deal.Query, error)

ReadQuery reads and decodes a CBOR encoded Query from a stream buffer.

func (*QueryStream) ReadQueryResponse

func (qs *QueryStream) ReadQueryResponse() (deal.QueryResponse, error)

ReadQueryResponse reads and decodes a QueryResponse CBOR message from a stream buffer.

func (*QueryStream) WriteOffer

func (qs *QueryStream) WriteOffer(o deal.Offer) error

WriteOffer encodes and writes an Offer message to byte stream.

func (*QueryStream) WriteQuery

func (qs *QueryStream) WriteQuery(q deal.Query) error

WriteQuery encodes and writes a CBOR Query message to a stream.

func (*QueryStream) WriteQueryResponse

func (qs *QueryStream) WriteQueryResponse(qr deal.QueryResponse) error

WriteQueryResponse encodes and writes a CBOR QueryResponse message to a stream.

type ReceiveOffer

type ReceiveOffer func(deal.Offer)

ReceiveOffer is a callback for receiving a new offer

type ReceiveResponse

type ReceiveResponse func(peer.AddrInfo, deal.QueryResponse)

ReceiveResponse is fired every time we get a response

type Region

type Region struct {
	// The official region name should be unique to avoid clashing with other regions.
	Name string
	// Code is a compressed identifier for the region.
	Code RegionCode
	// Bootstrap is a list of peers that can be dialed to find other peers in that region
	Bootstrap []string
}

Region represents a CDN subnetwork.

func ParseRegions

func ParseRegions(list []string) []Region

ParseRegions converts region names to region structs

func RegionFromTopic

func RegionFromTopic(topic string) Region

RegionFromTopic formats a topic string into a Region struct

type RegionCode

type RegionCode uint64

RegionCode defines a subnetwork code

const (
	// GlobalRegion region is a free global network for anyone to try the network
	GlobalRegion RegionCode = iota
	// AsiaRegion is a specific region to connect caches in the Asian area
	AsiaRegion
	// AfricaRegion is a specific region in the African geographic area
	AfricaRegion
	// SouthAmericaRegion is a specific region
	SouthAmericaRegion
	// NorthAmericaRegion is a specific region to connect caches in the North American area
	NorthAmericaRegion
	// EuropeRegion is a specific region to connect caches in the European area
	EuropeRegion
	// OceaniaRegion is a specific region
	OceaniaRegion
	// CustomRegion is a user defined region
	CustomRegion = math.MaxUint64
)

type Replication

type Replication struct {
	// contains filtered or unexported fields
}

Replication manages the network replication scheme, it keeps track of read and write requests and decides whether to join a replication scheme or not

func NewReplication

func NewReplication(h host.Host, idx *Index, dt datatransfer.Manager, rtv RoutedRetriever, opts Options) (*Replication, error)

NewReplication starts the exchange replication management system

func (*Replication) AddStore

func (r *Replication) AddStore(k cid.Cid, sid multistore.StoreID) error

AddStore assigns a store for a given root cid and store ID

func (*Replication) AuthorizePull

func (r *Replication) AuthorizePull(k cid.Cid, p peer.ID)

AuthorizePull adds a peer to a set giving authorization to pull content without payment We assume that this authorizes the peer to pull as many links from the root CID as they can It runs on the client side to authorize caches

func (*Replication) Dispatch

func (r *Replication) Dispatch(root cid.Cid, size uint64, opt DispatchOptions) (chan PRecord, error)

Dispatch to the network until we have propagated the content to enough peers

func (*Replication) GetStore

func (r *Replication) GetStore(k cid.Cid) *multistore.Store

GetStore returns the store used for a given root index

func (*Replication) NewRequestStream

func (r *Replication) NewRequestStream(dest peer.ID) (*RequestStream, error)

NewRequestStream opens a multi stream with the given peer and sets up the interface to write requests to it

func (*Replication) RmStore

func (r *Replication) RmStore(k cid.Cid)

RmStore cleans up the store when it is not needed anymore

func (*Replication) Start

func (r *Replication) Start(ctx context.Context) error

Start initiates listeners to update our scheme if new peers join

func (*Replication) ValidatePull

func (r *Replication) ValidatePull(
	isRestart bool,
	chid datatransfer.ChannelID,
	receiver peer.ID,
	voucher datatransfer.Voucher,
	baseCid cid.Cid,
	selector ipld.Node) (datatransfer.VoucherResult, error)

ValidatePull returns a stubbed result for a pull validation

func (*Replication) ValidatePush

func (r *Replication) ValidatePush(
	isRestart bool,
	chid datatransfer.ChannelID,
	sender peer.ID,
	voucher datatransfer.Voucher,
	baseCid cid.Cid,
	selector ipld.Node) (datatransfer.VoucherResult, error)

ValidatePush returns a stubbed result for a push validation

type ReplicationResponse

type ReplicationResponse struct {
	Proof *cid.Cid
}

ReplicationResponse is sent by the recipient to express if they do or do not have content

func (*ReplicationResponse) MarshalCBOR

func (t *ReplicationResponse) MarshalCBOR(w io.Writer) error

func (*ReplicationResponse) UnmarshalCBOR

func (t *ReplicationResponse) UnmarshalCBOR(r io.Reader) error

type Request

type Request struct {
	Method     Method
	PayloadCID cid.Cid
	Size       uint64
	Seed       []byte // is added to seed the content hash when proving a peer has the content
}

Request describes the content to pull

func (*Request) MarshalCBOR

func (t *Request) MarshalCBOR(w io.Writer) error

func (Request) Type

Type defines Request as a datatransfer voucher for pulling the data from the request

func (*Request) UnmarshalCBOR

func (t *Request) UnmarshalCBOR(r io.Reader) error

type RequestStream

type RequestStream struct {
	// contains filtered or unexported fields
}

RequestStream allows reading and writing CBOR encoded messages to a stream

func (*RequestStream) Close

func (rs *RequestStream) Close() error

Close the stream

func (*RequestStream) OtherPeer

func (rs *RequestStream) OtherPeer() peer.ID

OtherPeer returns the peer ID of the peer at the other end of the stream

func (*RequestStream) ReadRequest

func (rs *RequestStream) ReadRequest() (Request, error)

ReadRequest reads and decodes a CBOR encoded Request message from a stream buffer

func (*RequestStream) ReadResponse

func (rs *RequestStream) ReadResponse() (ReplicationResponse, error)

ReadResponse from the replication protocol stream

func (*RequestStream) WriteRequest

func (rs *RequestStream) WriteRequest(m Request) error

WriteRequest encodes and writes a Request message to a stream

func (*RequestStream) WriteResponse

func (rs *RequestStream) WriteResponse(r ReplicationResponse) error

WriteResponse to the replication protocol stream

type ResponseFunc

type ResponseFunc func(context.Context, peer.ID, Region, deal.Query) (deal.Offer, error)

ResponseFunc takes a Query and returns an Offer or an error if request is declined

type RoutedRetriever

type RoutedRetriever interface {
	FindAndRetrieve(context.Context, cid.Cid) error
}

RoutedRetriever is a generic interface providing a method to find and retrieve content on the exchange

type SelectionStrategy

type SelectionStrategy func(OfferExecutor) OfferWorker

SelectionStrategy is a function that returns an OfferWorker with a defined strategy for selecting offers over a given session

type Status

type Status map[string]Entry

Status represents our staged values

func (Status) String

func (s Status) String() string

type StoreConfigurableTransport

type StoreConfigurableTransport interface {
	UseStore(datatransfer.ChannelID, ipld.LinkSystem) error
}

StoreConfigurableTransport defines the methods needed to configure a data transfer transport use a unique store for a given request

type Tx

type Tx struct {

	// Err exposes any error reported by the session during use
	Err error
	// contains filtered or unexported fields
}

Tx is an exchange transaction which may contain multiple DAGs to be exchanged with a set of connected peers

func (*Tx) AppendPaths

func (tx *Tx) AppendPaths(paths ...string)

AppendPaths used to combine multiple queries

func (*Tx) ApplyOffer

func (tx *Tx) ApplyOffer(offer deal.Offer)

ApplyOffer allows executing a transaction based on an existing offer without querying the routing service By the default the offer is added at the front of the queue. If there is already an offer in progress it will thus execute after it or if not will execute immediately.

func (*Tx) Close

func (tx *Tx) Close() error

Close removes any listeners and stream handlers related to a session If the transaction was not committed, any staged content will be deleted

func (*Tx) Commit

func (tx *Tx) Commit(dopts ...DispatchOption) error

Commit sends the transaction on the exchange

func (*Tx) Confirm

func (tx *Tx) Confirm(of deal.Offer) DealExecParams

Confirm takes an offer and blocks to wait for user confirmation before returning true or false

func (*Tx) Done

func (tx *Tx) Done() <-chan TxResult

Done returns a channel that receives any resulting error from the latest operation

func (*Tx) Entries

func (tx *Tx) Entries() ([]Entry, error)

Entries returns all the entries in the root map of this transaction sorted by key

func (*Tx) Execute

func (tx *Tx) Execute(of deal.Offer, p DealExecParams) TxResult

Execute starts a retrieval operation for a given offer and returns the deal ID for that operation

func (*Tx) Finish

func (tx *Tx) Finish(res TxResult)

Finish tells the tx all operations have been completed

func (*Tx) GetFile

func (tx *Tx) GetFile(k string) (files.Node, error)

GetFile retrieves a file associated with the given key from the cache

func (*Tx) IsLocal

func (tx *Tx) IsLocal(key string) bool

IsLocal tells us if this node is storing the content of this transaction or if it needs to retrieve it

func (*Tx) Keys

func (tx *Tx) Keys() ([]string, error)

Keys lists the keys for all the entries in the root map of this transaction

func (*Tx) Ongoing

func (tx *Tx) Ongoing() <-chan DealRef

Ongoing exposes the ongoing channel to get the reference of any in progress deals

func (*Tx) Put

func (tx *Tx) Put(key string, value cid.Cid, size int64) error

Put a DAG for a given key in the transaction

func (*Tx) Query

func (tx *Tx) Query(paths ...string) error

Query the discovery service for offers

func (*Tx) QueryOffer

func (tx *Tx) QueryOffer(info peer.AddrInfo, sel ipld.Node) (deal.Offer, error)

QueryOffer allows querying directly from a given peer

func (*Tx) Ref

func (tx *Tx) Ref() *DataRef

Ref returns the DataRef associated with this transaction

func (*Tx) Root

func (tx *Tx) Root() cid.Cid

Root returns the current root CID of the transaction

func (*Tx) RootFor

func (tx *Tx) RootFor(key string) (cid.Cid, error)

RootFor returns the root of a given key @TODO: improve scaling and performance for accessing subroots

func (*Tx) SetAddress

func (tx *Tx) SetAddress(addr address.Address)

SetAddress to use for funding the retriebal

func (*Tx) Size

func (tx *Tx) Size() int64

Size returns the current size of content cached by the transaction

func (*Tx) Status

func (tx *Tx) Status() (Status, error)

Status returns a list of the current entries

func (*Tx) Store

func (tx *Tx) Store() *multistore.Store

Store exposes the underlying store

func (*Tx) StoreID

func (tx *Tx) StoreID() multistore.StoreID

StoreID exposes the ID of the underlying store

func (*Tx) Triage

func (tx *Tx) Triage() (DealSelection, error)

Triage allows manually triaging the next selection

func (*Tx) WatchDispatch

func (tx *Tx) WatchDispatch(fn func(r PRecord))

WatchDispatch registers a function to be called every time the content is received by a peer

type TxOption

type TxOption func(*Tx)

TxOption sets optional fields on a Tx struct

func WithCodec

func WithCodec(codec uint64) TxOption

WithCodec changes the codec to use for aggregating the entries in a single root

func WithRoot

func WithRoot(r cid.Cid) TxOption

WithRoot assigns a new root to the transaction if we are working with a DAG that wasn't created during this transaction

func WithSize

func WithSize(size int64) TxOption

WithSize allows overriding the size manually if we know it ahead of time

func WithStrategy

func WithStrategy(strategy SelectionStrategy) TxOption

WithStrategy starts a new strategy worker to handle incoming discovery results

func WithTriage

func WithTriage() TxOption

WithTriage allows a transaction to manually prompt for external confirmation before executing an offer

type TxResult

type TxResult struct {
	Err   error
	Size  uint64 // Size is the total amount of bytes exchanged during this transaction
	Spent abi.TokenAmount
	PayCh address.Address
}

TxResult returns metadata about the transaction including a potential error if something failed

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL