streamer

package
v0.0.32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2021 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	RequesterDeadline       = 5 * time.Minute
	MaxPackSize       int64 = 100000000
)
View Source
var (
	ErrUnknownMsgType = fmt.Errorf("unknown message type")
	ErrNopeReceived   = fmt.Errorf("nope received")
)
View Source
var (
	ErrNoProviderFound        = fmt.Errorf("no provider found")
	ErrEndObjMustExistLocally = fmt.Errorf("end object must already exist in the local repo")
)
View Source
var (
	ObjectStreamerProtocolID = protocol.ID("/object/1.0")
)

Functions

func GetCommitWithAncestors

func GetCommitWithAncestors(
	ctx context.Context,
	c dht3.Streamer,
	repoGetter repo.GetLocalRepoFunc,
	args dht3.GetAncestorArgs) (packfiles []io.ReadSeekerCloser, err error)

GetCommitWithAncestors gets a commit and its ancestors that do not exist in the local repository.

It stops fetching ancestors when it finds an ancestor matching the given end commit hash. If EndHash is set, the EndHash commit is expected to exist locally. It will skip the start commit if it exists locally but will try to add its parent to the internal wantlist, allowing it to find ancestors that may not have been fetched before. Packfiles returned are expected to be closed by the caller. If ResultCB is set, packfiles will be passed to the callback and not returned. If ResultCB returns an error, the method exits with that error. Use ErrExit to exit with a nil error.

func GetTaggedCommitWithAncestors

func GetTaggedCommitWithAncestors(
	ctx context.Context,
	st dht3.Streamer,
	repoGetter repo.GetLocalRepoFunc,
	args dht3.GetAncestorArgs) (packfiles []io.ReadSeekerCloser, err error)

GetTaggedCommitWithAncestors gets the ancestors of the commit pointed to by the given tag that do not exist in the local repository.

  • If the start tag points to another tag, the function is recursively called on the nested tag.
  • If the start tag does not point to a commit or a tag, the tag's packfile is returned.
  • If EndHash is set, it must be an already existing tag pointing to a commit or a tag. If it points to a tag, the same rule is applied to that tag recursively.
  • If EndHash is set, it will stop fetching ancestors when it finds an ancestor matching the commit pointed to by the end hash tag.
  • Packfiles returned are expected to be closed by the caller.
  • If ResultCB is set, packfiles will be passed to the callback as soon as they are received.
  • If ResultCB is set, an empty slice will be returned by the method.
  • If ResultCB returns an error, the method exits with that error. Use ErrExit to exit with a nil error.

func MakeHaveCacheKey

func MakeHaveCacheKey(repoName string, hash plumb.Hash) string

MakeHaveCacheKey returns a key for storing HaveCache entries.

Types

type BasicObjectRequester

type BasicObjectRequester struct {
	OnWantResponseHandler func(network.Stream) error
	OnSendResponseHandler func(network.Stream) (io.ReadSeekerCloser, error)
	// contains filtered or unexported fields
}

BasicObjectRequester manages object download sessions between multiple providers

func NewBasicObjectRequester

func NewBasicObjectRequester(args RequestArgs) *BasicObjectRequester

NewBasicObjectRequester creates an instance of BasicObjectRequester

func (*BasicObjectRequester) AddProviderStream

func (r *BasicObjectRequester) AddProviderStream(streams ...network.Stream)

AddProviderStream adds provider streams

func (*BasicObjectRequester) Do

func (r *BasicObjectRequester) Do(ctx context.Context) (result *PackResult, err error)

Do starts the object request protocol

func (*BasicObjectRequester) DoWant

func (r *BasicObjectRequester) DoWant(ctx context.Context) (err error)

DoWant sends 'WANT' messages to providers, then caches the streams of providers that responded with a 'HAVE' message.

func (*BasicObjectRequester) GetProviderStreams

func (r *BasicObjectRequester) GetProviderStreams() []network.Stream

GetProviderStreams returns the provider's streams

func (*BasicObjectRequester) OnSendResponse

func (r *BasicObjectRequester) OnSendResponse(s network.Stream) (io.ReadSeekerCloser, error)

OnSendResponse handles incoming packfile data from remote peer. If the remote peer responds with 'NOPE', it will be logged in the nope cache.

func (*BasicObjectRequester) OnWantResponse

func (r *BasicObjectRequester) OnWantResponse(s network.Stream) error

OnWantResponse handles a remote peer's response to a WANT message. If the remote stream responds with 'HAVE', it will be cached. If the remote stream responds with 'NOPE', it will be logged in the nope cache.

func (*BasicObjectRequester) Write

func (r *BasicObjectRequester) Write(ctx context.Context, prov peer.AddrInfo, pid protocol.ID, data []byte) (network.Stream, error)

Write writes a message to a provider

func (*BasicObjectRequester) WriteToStream

func (r *BasicObjectRequester) WriteToStream(str network.Stream, data []byte) error

WriteToStream writes a message to a stream

type BasicObjectStreamer

type BasicObjectStreamer struct {
	OnWantHandler    WantSendHandler
	OnSendHandler    WantSendHandler
	RepoGetter       repo.GetLocalRepoFunc
	PackObject       plumbing.CommitPacker
	MakeRequester    MakeObjectRequester
	PackObjectGetter plumbing.PackObjectFinder
	// contains filtered or unexported fields
}

BasicObjectStreamer implements Streamer. It provides a mechanism for announcing or transferring repository objects to/from the DHT.

func NewStreamer

func NewStreamer(dht dht3.DHT, cfg *config.AppConfig) *BasicObjectStreamer

NewStreamer creates an instance of BasicObjectStreamer

func (*BasicObjectStreamer) GetCommit

func (c *BasicObjectStreamer) GetCommit(
	ctx context.Context,
	repoName string,
	hash []byte) (io.ReadSeekerCloser, *object.Commit, error)

GetCommit gets a single commit by hash. It returns the packfile, the commit object and error.

func (*BasicObjectStreamer) GetCommitWithAncestors

func (c *BasicObjectStreamer) GetCommitWithAncestors(ctx context.Context,
	args dht3.GetAncestorArgs) (packfiles []io.ReadSeekerCloser, err error)

GetCommitWithAncestors gets a commit and also its ancestors that do not exist in the local repository.

It will stop fetching ancestors when it finds an ancestor matching the given end hash. If EndHash is set, the EndHash commit is expected to exist locally. Packfiles returned are expected to be closed by the caller. If ResultCB is set, packfiles will be passed to the callback and not returned. If ResultCB returns an error, the method exits with that error. Use ErrExit to exit with a nil error.

func (*BasicObjectStreamer) GetProviders

func (c *BasicObjectStreamer) GetProviders(ctx context.Context, repoName string, objKey []byte) ([]peer.AddrInfo, error)

GetProviders finds providers that may be able to provide an object.

It finds providers that have announced their ability to provide an object. It also finds providers that have announced their ability to provide a repository - these providers are used as a fallback in cases where an object may exist in a repository but was not announced.

TODO: In the future, we should sort the providers by rank such that hosts, popular remotes and well-behaved providers are prioritized.

func (*BasicObjectStreamer) GetTag

func (c *BasicObjectStreamer) GetTag(
	ctx context.Context,
	repoName string,
	hash []byte) (io.ReadSeekerCloser, *object.Tag, error)

GetTag gets a single annotated tag by hash.

It returns the packfile, the tag object and error.

func (*BasicObjectStreamer) GetTaggedCommitWithAncestors

func (c *BasicObjectStreamer) GetTaggedCommitWithAncestors(ctx context.Context,
	args dht3.GetAncestorArgs) (packfiles []io.ReadSeekerCloser, err error)

GetTaggedCommitWithAncestors gets the ancestors of the commit pointed to by the given tag that do not exist in the local repository.

  • If EndHash is set, it must be an already existing tag pointing to a commit.
  • If EndHash is set, it will stop fetching ancestors when it finds an ancestor matching the commit pointed to by the end hash tag.
  • Packfiles returned are expected to be closed by the caller.
  • If ResultCB is set, packfiles will be passed to the callback as soon as they are received.
  • If ResultCB is set, an empty slice will be returned by the method.
  • If ResultCB returns an error, the method exits with that error. Use ErrExit to exit with a nil error.

func (*BasicObjectStreamer) Handler

func (c *BasicObjectStreamer) Handler(s network.Stream)

Handler handles the lifecycle of the object streaming protocol

func (*BasicObjectStreamer) OnRequest

func (c *BasicObjectStreamer) OnRequest(s network.Stream) (bool, error)

OnRequest handles incoming commit object requests

func (*BasicObjectStreamer) OnSendRequest

func (c *BasicObjectStreamer) OnSendRequest(repo string, hash []byte, s network.Stream) error

OnSendRequest handles incoming "SEND" requests.

func (*BasicObjectStreamer) OnWantRequest

func (c *BasicObjectStreamer) OnWantRequest(repo string, hash []byte, s network.Stream) error

OnWantRequest handles incoming "WANT" requests

func (*BasicObjectStreamer) SetProviderTracker

func (c *BasicObjectStreamer) SetProviderTracker(t dht3.ProviderTracker)

SetProviderTracker overwrites the default provider tracker.

type MakeObjectRequester

type MakeObjectRequester func(args RequestArgs) ObjectRequester

MakeObjectRequester describes a function type for creating an object requester

type ObjectRequester

type ObjectRequester interface {
	Write(ctx context.Context, prov peer.AddrInfo, pid protocol.ID, data []byte) (network.Stream, error)
	WriteToStream(str network.Stream, data []byte) error
	DoWant(ctx context.Context) (err error)
	Do(ctx context.Context) (result *PackResult, err error)
	GetProviderStreams() []network.Stream
	OnWantResponse(s network.Stream) error
	OnSendResponse(s network.Stream) (io.ReadSeekerCloser, error)
	AddProviderStream(streams ...network.Stream)
}

type PackResult

type PackResult struct {
	Pack       io.ReadSeekerCloser
	RemotePeer peer.ID
}

type RequestArgs

type RequestArgs struct {

	// Host is the libp2p network host
	Host host.Host

	// Providers are addresses of providers
	Providers []peer.AddrInfo

	// ReposDir is the root directory for all repos
	ReposDir string

	// RepoName is the name of the repo to query object from
	RepoName string

	// Key is the requested object key
	Key []byte

	// Log is the app logger
	Log logger.Logger

	// ProviderTracker is used for recording and tracking provider behaviour
	ProviderTracker dht2.ProviderTracker
}

RequestArgs contain arguments for NewBasicObjectRequester function.

type WantSendHandler

type WantSendHandler func(repo string, hash []byte, s network.Stream) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL