peering

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 1, 2022 License: MIT Imports: 19 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ModuleName = "PEERING"

	ConnectionRefuseTimeout = 10 * time.Second
	MaxRetries              = 1
	DefaultWorkers          = 50
)
var (
	PrunedErrorDistribution = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "peering",
		Name:      "pruned_error_distribution",
		Help:      "Filter peers in Peer Queue by errors that were tracked",
	},
		[]string{"controldist"},
	)
	ErrorAttemptDistribution = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "peering",
		Name:      "iteration_attempts_by_category_distribution",
		Help:      "Filter attempts in Peer Queue by errors that were tracked",
	},
		[]string{"controlAttemptdist"},
	)
	PeersAttemptedInLastIteration = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "peering",
		Name:      "peers_attempted_last_iteration",
		Help:      "The number of discovered peers with the crawler",
	})
	PeerstoreIterTime = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "peering",
		Name:      "peerstore_iteration_time_secs",
		Help:      "The time that the crawler takes to connect the entire peerstore in secs",
	})
	IterForcingNextConnTime = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "peering",
		Name:      "iteration_forcing_next_conn_time",
		Help:      "The time reported by the peer that forced the new peerstore iteration",
	})
)

List of metrics that we are going to export

var (
	PeerStratModuleName = "PRUNING"

	// Default Delays
	DeprecationTime       = 1024 * time.Minute // Minutes that must pass after the first negative connection before a peer is deprecated.
	DefaultNegDelay       = 12 * time.Hour     // Default delay applied to deprecated peers.
	DefaultPossitiveDelay = 6 * time.Hour      // Default delay applied after each positive attempt.
	StartExpD             = 2 * time.Minute    // Starting delay that will serve for the Exponential Delay.
	// Control variables
	MinIterTime = 15 * time.Second // Minimum time that has to pass before iterating again.

)
var (

	// define the types of delays in string
	PositiveDelayType           string = "Positive"
	NegativeWithHopeDelayType   string = "NegativeWithHope"
	NegativeWithNoHopeDelayType string = "NegativeWithNoHope"
	ZeroDelayType               string = "Zero"
	Minus1DelayType             string = "Minus1"
	TimeoutDelayType            string = "Timeout"

	MaxDelayTime time.Duration = time.Duration(math.Pow(2, 11) * float64(time.Minute))

	// define the initial delay we apply in each of the types
	InitialDelayTime = map[string]time.Duration{
		PositiveDelayType:           128 * time.Minute,
		NegativeWithHopeDelayType:   2 * time.Minute,
		NegativeWithNoHopeDelayType: 256 * time.Minute,
		ZeroDelayType:               0 * time.Hour,
		Minus1DelayType:             -1000 * time.Hour,
		TimeoutDelayType:            32 * time.Minute,
	}
)

Functions

func ErrorToDelayType

func ErrorToDelayType(errString string) string

ErrorToDelayType: Transforms an error into a DelayType. @param errString: the error string to analyze. @return the category type in string format.
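
As an illustration, the classification could be sketched as below. This is only a sketch: it assumes plain substring matching against the error messages listed in the delay type descriptions of this package, and classifyError is a hypothetical stand-in, not the package's ErrorToDelayType.

```go
package main

import (
	"fmt"
	"strings"
)

// Delay type names as declared in the package variables.
const (
	negativeWithHope   = "NegativeWithHope"
	negativeWithNoHope = "NegativeWithNoHope"
	timeoutType        = "Timeout"
)

// classifyError is a hypothetical stand-in for ErrorToDelayType.
// Assumption: errors are classified by substring matching.
func classifyError(errString string) string {
	noHope := []string{
		"no route to host",
		"unreachable network",
		"peer id mismatch",
		"dial to self attempted",
	}
	for _, s := range noHope {
		if strings.Contains(errString, s) {
			return negativeWithNoHope
		}
	}
	if strings.Contains(errString, "i/o timeout") {
		return timeoutType
	}
	// "connection refused", "dial backoff" and any unknown error end up here.
	return negativeWithHope
}

func main() {
	fmt.Println(classifyError("dial tcp 10.0.0.1:9000: i/o timeout"))
	fmt.Println(classifyError("connect: no route to host"))
	fmt.Println(classifyError("connection refused"))
}
```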

func ResetMapValues

func ResetMapValues(inputMap sync.Map) sync.Map

ResetMapValues: Iterates over a string-to-int map and resets all values to 0. @param inputMap: the map to reset. @return the reset map.

Types

type BaseDelay

type BaseDelay struct {
	DelayDegree int    // number of times we have delayed
	Type        string // type of delay we apply (positive, negativewithhope...)
}

All of our delay types include this base: they all hold the same data, and only the delay calculation differs.

func NewBaseDelay

func NewBaseDelay(inputType string) *BaseDelay

NewBaseDelay: Constructor. It returns a pointer so that the methods are promoted directly to the structs that embed it. @param inputType: the type of delay we want to set (as a string).

func (*BaseDelay) AddDegree

func (bd *BaseDelay) AddDegree()

AddDegree: This method will add 1 to the delaydegree.

func (*BaseDelay) GetDegree

func (bd *BaseDelay) GetDegree() int

GetDegree: @return the delaydegree.

func (BaseDelay) GetType

func (bd BaseDelay) GetType() string

GetType: @return the type in string format.

func (*BaseDelay) SetDegree

func (bd *BaseDelay) SetDegree(newDegree int)

SetDegree: This method will set the delaydegree to newDegree.
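
The effect of embedding the base as a pointer can be sketched as follows. The lowercase types are local stand-ins written for this example, not the package's own types.

```go
package main

import "fmt"

// baseDelay mirrors the documented BaseDelay fields.
type baseDelay struct {
	DelayDegree int
	Type        string
}

func (bd *baseDelay) AddDegree()     { bd.DelayDegree++ }
func (bd *baseDelay) GetDegree() int { return bd.DelayDegree }

// zeroDelay embeds *baseDelay, so AddDegree and GetDegree are
// promoted to it without any wrapper code.
type zeroDelay struct {
	*baseDelay
}

func main() {
	d := zeroDelay{&baseDelay{Type: "Zero"}}
	shared := d // copies the pointer, so both values share one counter

	d.AddDegree()
	fmt.Println(shared.GetDegree()) // prints 1
}
```

Because the embedded field is a pointer, copying the outer struct by value (as the value receivers on the delay types do) still updates the same degree counter.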

type ConnectionAttemptStatus

type ConnectionAttemptStatus struct {
	Peer       models.Peer // TODO: right now just sending the entire info about the peer, (recheck after Peer struct subdivision)
	Attempts   int32       // attempts made on the given peer
	Timestamp  time.Time   // Timestamp of when the attempt was made
	Successful bool        // Whether the connection attempt was successfully done or not
	RecError   error       // if the connection attempt reported any error, nil otherwise

}

ConnectionAttemptStatus is the struct that compiles the data of an active connection attempt made by the host. The struct is shared between peering and strategy.

type DelayObject

type DelayObject interface {
	CalculateDelay() time.Duration
	AddDegree()
	GetType() string
	SetDegree(int)
	GetDegree() int
}

DelayObject is the interface that defines the methods every delay type must implement.

func ReturnAccordingDelayObject

func ReturnAccordingDelayObject(delayType string) DelayObject

ReturnAccordingDelayObject: @param delayType: string representing a type of delay. @return the delay object that corresponds to that type.

type Minus1Delay

type Minus1Delay struct {
	*BaseDelay
}

Minus1Delay: Delay type applied to new peers coming from the Discovery5 service. These peers are always scheduled to be connected first.

func NewMinus1Delay

func NewMinus1Delay() Minus1Delay

func (Minus1Delay) CalculateDelay

func (d Minus1Delay) CalculateDelay() time.Duration

CalculateDelay: This method will calculate the delay to be applied based on degree. @return the delay in time.Duration format.

type NegativeDelay

type NegativeDelay struct {
	*BaseDelay
}

NegativeDelay: Delay type applied to peers that had any sort of error. The delays increase exponentially. The child types apply a different delay type, which only changes the baseDelay time.

func NewNegativeDelay

func NewNegativeDelay(inputType string) *NegativeDelay

func (NegativeDelay) CalculateDelay

func (d NegativeDelay) CalculateDelay() time.Duration

CalculateDelay: This method will calculate the delay to be applied based on degree. @return the delay in time.Duration format.

type NegativeWithHopeDelay

type NegativeWithHopeDelay struct {
	*NegativeDelay
}

NegativeWithHopeDelay: Applied in case of "connection reset by peer", "connection refused", "context deadline exceeded", "dial backoff", "metadata error", and by default. Usually peers that have returned an error but could possibly be identified. baseDelay = 2 minutes.

func NewNegativeWithHopeDelay

func NewNegativeWithHopeDelay() NegativeWithHopeDelay

type NegativeWithNoHopeDelay

type NegativeWithNoHopeDelay struct {
	*NegativeDelay
}

NegativeWithNoHopeDelay: Applied in case of "no route to host", "unreachable network", "peer id mismatch", "dial to self attempted". Usually peers that have returned an error and are probably no longer running. baseDelay = 256 minutes.

func NewNegativeWithNoHopeDelay

func NewNegativeWithNoHopeDelay() NegativeWithNoHopeDelay

func NewTimeoutDelay

func NewTimeoutDelay() NegativeWithNoHopeDelay

type PeerQueue

type PeerQueue struct {
	PeerList []*PrunedPeer
	PeerMap  sync.Map
	// contains filtered or unexported fields
}

PeerQueue: Auxiliary peer array and map that keep the list of peers sorted by connection time while still allowing the values of each peer to be modified quickly.

func NewPeerQueue

func NewPeerQueue() PeerQueue

NewPeerQueue: Constructor of a NewPeerQueue. @return new PeerQueue.

func (*PeerQueue) AddPeer

func (c *PeerQueue) AddPeer(pPeer *PrunedPeer)

AddPeer: Adds a peer to the peerqueue. @param pPeer: the pruned peer to add.

func (*PeerQueue) DelayDistribution

func (c *PeerQueue) DelayDistribution() sync.Map

DelayDistribution: @return the distribution of the delays in a map.

func (*PeerQueue) GetPeer

func (c *PeerQueue) GetPeer(peerID string) (*PrunedPeer, bool)

GetPeer: Retrieves the info of the requested peer. @param peerID: string of the peerID that we want to find. @return pointer to the pruned peer. @return bool, true if the peer exists, false if it does not.

func (*PeerQueue) IsPeerAlready

func (c *PeerQueue) IsPeerAlready(peerID string) bool

IsPeerAlready: Checks whether a peer is already in the queue. @param peerID: string of the peerID that we want to find. @return true if the peer is already in the queue, false if not.

func (PeerQueue) Len

func (c PeerQueue) Len() int

Len is part of sort.Interface. We use the peer list to get the length of the array.

func (PeerQueue) Less

func (c PeerQueue) Less(i, j int) bool

Less is part of sort.Interface. We use c.PeerList.NextConnection as the value to sort by.

func (*PeerQueue) SortPeerList

func (c *PeerQueue) SortPeerList()

SortPeerList: Sorts the PeerQueue array so that the peers with the earliest next connection time come first.

func (*PeerQueue) Swap

func (c *PeerQueue) Swap(i, j int)

Swap is part of sort.Interface.

func (*PeerQueue) UpdatePeerListFromPeerStore

func (c *PeerQueue) UpdatePeerListFromPeerStore(peerstore *db.PeerStore) error

UpdatePeerListFromPeerStore: This method refreshes the peerqueue from the peerstore by adding the peers that were not yet in the peerqueue. @param peerstore: db to read from.

type PeeringOption

type PeeringOption func(*PeeringService) error

func WithPeeringStrategy

func WithPeeringStrategy(strategy PeeringStrategy) PeeringOption
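
PeeringOption has the shape of the functional options pattern. A minimal sketch of how such options are usually applied in a constructor is given below. All lowercase names are hypothetical stand-ins, and whether NewPeeringService validates options this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// strategy is a reduced stand-in for PeeringStrategy.
type strategy interface {
	Type() string
}

// service is a reduced stand-in for PeeringService.
type service struct {
	strategy strategy
}

// option mirrors the shape of PeeringOption.
type option func(*service) error

// withStrategy returns an option that sets the strategy on the service.
func withStrategy(s strategy) option {
	return func(svc *service) error {
		if s == nil {
			return errors.New("given strategy is nil")
		}
		svc.strategy = s
		return nil
	}
}

// newService applies every option in order and stops at the first error.
func newService(opts ...option) (*service, error) {
	svc := &service{}
	for _, opt := range opts {
		if err := opt(svc); err != nil {
			return nil, err
		}
	}
	return svc, nil
}

// demoStrategy is a toy strategy used only for this example.
type demoStrategy struct{}

func (demoStrategy) Type() string { return "demo" }

func main() {
	svc, err := newService(withStrategy(demoStrategy{}))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(svc.strategy.Type())
}
```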

type PeeringService

type PeeringService struct {
	PeerStore *db.PeerStore

	// Control Flags
	Timeout    time.Duration
	MaxRetries int
	// contains filtered or unexported fields
}

PeeringService is the main service that connects to the peers of the given peerstore using the given Host. It uses the specified peering strategy, which may differ depending on the testing or desired purpose of the run.

func NewPeeringService

func NewPeeringService(
	ctx context.Context,
	h *hosts.BasicLibp2pHost,
	peerstore *db.PeerStore,
	opts ...PeeringOption) (PeeringService, error)

NewPeeringService: Constructor of the PeeringService.

func (*PeeringService) Run

func (c *PeeringService) Run()

Run: Main peering event selector. For every next peer received from the strategy, it attempts the connection and records the status of that attempt. It notifies the strategy of any connection or disconnection recorded.

func (*PeeringService) ServeMetrics

func (c *PeeringService) ServeMetrics()

ServeMetrics: This method will serve the global peerstore values to the local prometheus instance.

type PeeringStrategy

type PeeringStrategy interface {
	// one channel to give the next peer, one to request the second one
	Run() chan models.Peer
	Type() string
	// Peering Strategy interaction
	NextPeer()
	NewConnectionAttempt(ConnectionAttemptStatus)
	NewConnectionEvent(hosts.ConnectionEvent)
	NewIdentificationEvent(hosts.IdentificationEvent)
	// Prometheus Export Calls
	LastIterTime() float64
	IterForcingNextConnTime() string
	AttemptedPeersSinceLastIter() int64
	ControlDistribution() sync.Map
	GetErrorAttemptDistribution() sync.Map
}

PeeringStrategy is the common interface that any Peering Strategy should follow. TODO: still waiting to be defined to make it official.
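
The request and delivery flow between the service and a strategy can be sketched with two channels, as below. This is a toy model under assumptions: toyStrategy, peer and collect are hypothetical, only Run and NextPeer are modelled, and the real strategy selects over more event channels.

```go
package main

import "fmt"

// peer is a reduced stand-in for models.Peer.
type peer struct {
	ID string
}

// toyStrategy hands out peers one at a time: NextPeer requests a peer,
// and the channel returned by Run delivers it.
type toyStrategy struct {
	peers   []peer
	request chan struct{}
	out     chan peer
}

func newToyStrategy(peers []peer) *toyStrategy {
	return &toyStrategy{
		peers:   peers,
		request: make(chan struct{}),
		out:     make(chan peer),
	}
}

// Run starts the delivery loop and returns the peer stream.
func (s *toyStrategy) Run() chan peer {
	go func() {
		defer close(s.out)
		for _, p := range s.peers {
			<-s.request // wait until the service asks for a peer
			s.out <- p
		}
	}()
	return s.out
}

// NextPeer notifies the strategy that a new peer has been requested.
func (s *toyStrategy) NextPeer() { s.request <- struct{}{} }

// collect plays the role of the peering service loop.
func collect(s *toyStrategy) []string {
	stream := s.Run()
	var ids []string
	for range s.peers {
		s.NextPeer()
		p := <-stream
		ids = append(ids, p.ID)
	}
	return ids
}

func main() {
	fmt.Println(collect(newToyStrategy([]peer{{"a"}, {"b"}, {"c"}})))
}
```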

type PositiveDelay

type PositiveDelay struct {
	*BaseDelay // include it as pointer to have the methods added directly
}

func NewPositiveDelay

func NewPositiveDelay() PositiveDelay

NewPositiveDelay: Constructor. @return a PositiveDelay object.

func (PositiveDelay) CalculateDelay

func (d PositiveDelay) CalculateDelay() time.Duration

CalculateDelay: This method will calculate the delay to be applied based on degree. @return the delay in time.Duration format.

type PrunedPeer

type PrunedPeer struct {
	PeerID                   string
	DelayObj                 DelayObject // define the delay to connect based on error
	BaseConnectionTimestamp  time.Time   // define the first event. To calculate the next connection we sum this with delay.
	BaseDeprecationTimestamp time.Time   // this + DeprecationTime defines when we are ready to deprecate
}

TODO: think about including a sync.RWMutex in case we upgrade to workers

func NewPrunedPeer

func NewPrunedPeer(peerID string, inputType string) *PrunedPeer

func (*PrunedPeer) ConnEventHandler

func (c *PrunedPeer) ConnEventHandler(recErr string) string

ConnEventHandler: Function that selects the actuation method for each of the possible errors while actively dialing peers. @param recErr: the recorded error in string format.

func (*PrunedPeer) Deprecable

func (c *PrunedPeer) Deprecable() bool

Deprecable: This method evaluates if the peer is in time to be deprecated. @return true (in time to be deprecated) / false (not ready to be deprecated).

func (*PrunedPeer) IsReadyForConnection

func (c *PrunedPeer) IsReadyForConnection() bool

IsReadyForConnection: This method evaluates if the given peer is ready to be connected. @return true if it is time to connect to the peer, false otherwise.

func (*PrunedPeer) NextConnection

func (c *PrunedPeer) NextConnection() time.Time

func (*PrunedPeer) UpdateDelay

func (c *PrunedPeer) UpdateDelay(newDelayType string)

UpdateDelay: This method re-evaluates the delay when a new positive or negative delay event happens.

type PruningStrategy

type PruningStrategy struct {
	PeerStore *db.PeerStore

	// List of peers sorted by the amount of time that we have to wait
	PeerQueue PeerQueue

	PeerQueueIterations      int
	ErrorAttemptDistribution sync.Map
	// contains filtered or unexported fields
}

PruningStrategy is a Peering Strategy that applies penalties to peers that have shown no activity when we attempt to connect to them. Combined with the Deprecated flag in the models.Peer struct, it produces more accurate metrics on export by pruning peers that are no longer active.

func NewPruningStrategy

func NewPruningStrategy(ctx context.Context, network string, peerstore *db.PeerStore) (PruningStrategy, error)

NewPruningStrategy: Pruning strategy constructor that offers a models.Peer stream to the peering service. The peers provided on the stream are ready to be connected. @param ctx: parent context. @param network: the network, as a string. @param peerstore: db.PeerStore. @return peering strategy interface with the pruning service. @return error.

func (*PruningStrategy) AttemptedPeersSinceLastIter

func (c *PruningStrategy) AttemptedPeersSinceLastIter() int64

func (*PruningStrategy) ControlDistribution

func (c *PruningStrategy) ControlDistribution() sync.Map

func (*PruningStrategy) GetErrorAttemptDistribution

func (c *PruningStrategy) GetErrorAttemptDistribution() sync.Map

func (*PruningStrategy) IterForcingNextConnTime

func (c *PruningStrategy) IterForcingNextConnTime() string

func (*PruningStrategy) LastIterTime

func (c *PruningStrategy) LastIterTime() float64

LastIterTime: @return the last iteration time of the peerqueue.

func (*PruningStrategy) NewConnectionAttempt

func (c *PruningStrategy) NewConnectionAttempt(connAttStat ConnectionAttemptStatus)

NewConnectionAttempt: Notifies the peerstore iterator that a new ConnStatus has been received. After it, the peerstore iterator will aggregate the extra info. @param connAttStat: the object containing the data from the attempt

func (*PruningStrategy) NewConnectionEvent

func (c *PruningStrategy) NewConnectionEvent(connEvent hosts.ConnectionEvent)

NewConnectionEvent: Notifies the peerstore iterator that a new Connection has been received. It puts the connection metadata in the connNot channel so that the select loop can process the metadata of the received connection.

func (*PruningStrategy) NewIdentificationEvent

func (c *PruningStrategy) NewIdentificationEvent(newIdent hosts.IdentificationEvent)

NewIdentificationEvent: This method inserts a new identification item in the identification event notifier channel. @param newIdent: the object containing data about the event.

func (*PruningStrategy) NextPeer

func (c *PruningStrategy) NextPeer()

NextPeer: Notifies the peerstore iterator that a new peer has been requested. After it, the peerstore iterator will put the new peer in the PeerStreamChan.

func (*PruningStrategy) Run

func (c *PruningStrategy) Run() chan models.Peer

Run: Initializes the models.Peer stream on the returned models.Peer channel. It locally stores an auxiliary map with an array that keeps track of the next connection time. @return models.Peer channel with the next peer to connect.

func (PruningStrategy) Type

func (c PruningStrategy) Type() string

Type: Returns the strategy type that has been set. @return string with the name of the pruning strategy.

type TimeoutDelay

type TimeoutDelay struct {
	*NegativeDelay
}

TimeoutDelay: Applied in case of "i/o timeout", that is, only to peers that have returned a timeout error. baseDelay = 32 minutes (the TimeoutDelayType entry in InitialDelayTime).

type ZeroDelay

type ZeroDelay struct {
	*BaseDelay
}

ZeroDelay: It could be applied to specific error cases where we apply a delay of 0 minutes.

func NewZeroDelay

func NewZeroDelay() ZeroDelay

func (ZeroDelay) CalculateDelay

func (d ZeroDelay) CalculateDelay() time.Duration

CalculateDelay: This method will calculate the delay to be applied based on degree. @return the delay in time.Duration format.
