topology

package
v0.0.0-...-da72ffe Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2025 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

pkg/topology/actors.go

pkg/topology/errors.go

pkg/topology/metrics.go

pkg/topology/metrics_integration.go

pkg/topology/topology.go

Index

Constants

This section is empty.

Variables

View Source
// Custom errors for the topology package.
//
// Every value below is a sentinel created with errors.New, so callers can
// match a returned error against it with errors.Is. The message attached to
// each sentinel is the string literal in its initializer.
var (
	ErrPeerNotFound               = errors.New("peer not found in topology")
	ErrPeerAlreadyExists          = errors.New("peer already exists in topology")
	ErrInvalidMessageType         = errors.New("invalid message type")
	ErrInsufficientPermissions    = errors.New("insufficient permissions")
	ErrPeerNotInPendingPeers      = errors.New("peer not found in pendingPeers")
	ErrFailedToRemovePeer         = errors.New("failed to remove peer from topology")
	ErrFailedToProcessActorPacket = errors.New("failed to process actor packet")
	ErrUnknownActorPacketStatus   = errors.New("unknown ActorPacket status")
	ErrInvalidSignature           = errors.New("invalid signature")
	ErrTimeoutExceeded            = errors.New("wait operation timed out")
)

Custom errors for the topology package.

Functions

This section is empty.

Types

type Actor

// Actor represents both the node's own actor information and that of
// connected peers.
//
// An Actor carries a peer ID, an address, a list of multiaddrs, the roles
// assigned to it, the transports, protocols and signers it supports, a
// consensus public key, a public key, and an optional pointer to a
// share.Actor.
type Actor struct {
	ID                  peer.ID
	Address             types.Address
	Addresses           []multiaddr.Multiaddr
	Roles               []types.Role
	SupportedTransports []types.TransportType
	SupportedProtocols  []types.ProtocolType
	SupportedSigners    []types.SignerType
	ConsensusPublicKey  kyber.Point
	PublicKey           libp2pCrypto.PubKey
	ConsensusActor      *share.Actor // Optional: Link to share.Actor for consensus participation
}

Actor represents both the node's own actor information and that of connected peers.

type ActorAddedCallback

// ActorAddedCallback is a function type that is called when an actor is
// added. It receives a pointer to the Actor and returns an error.
type ActorAddedCallback func(actor *Actor) error

ActorAddedCallback is a function type that is called when an actor is added.

type ActorRemovedCallback

// ActorRemovedCallback is a function type that is called when an actor is
// removed. It receives a pointer to the Actor and returns an error.
type ActorRemovedCallback func(actor *Actor) error

ActorRemovedCallback is a function type that is called when an actor is removed.

type Actors

// Actors manages actor information within the topology.
type Actors struct {
	// contains filtered or unexported fields
}

Actors manages actor information within the topology.

func NewActors

func NewActors(logger logger.Logger, account share.Account, network *networking.Network, rbacMgr *rbac.Manager, collector share.Collector, consensusSet *share.ActorSet, onPeerEvent func()) (*Actors, error)

NewActors creates a new Actors instance and registers event handlers. The onPeerEvent function is called whenever a peer is added or removed.

func (*Actors) AddPeer

func (a *Actors) AddPeer(peerID peer.ID, address types.Address, addresses []multiaddr.Multiaddr, roles []types.Role, transports []types.TransportType, protocols []types.ProtocolType, signers []types.SignerType, pubKey libp2pCrypto.PubKey) error

AddPeer adds a peer to the topology with verified information.

func (*Actors) AddPeerRole

func (a *Actors) AddPeerRole(peerID peer.ID, role types.Role) error

AddPeerRole assigns a new role to an existing peer.

func (*Actors) GetPeer

func (a *Actors) GetPeer(peerID peer.ID) (*Actor, error)

GetPeer retrieves a peer's information from the topology.

func (*Actors) GetPeers

func (a *Actors) GetPeers() ([]*Actor, error)

GetPeers returns a list of all peers in the topology.

func (*Actors) GetPeersByRole

func (a *Actors) GetPeersByRole(role types.Role) ([]*Actor, error)

GetPeersByRole retrieves all peers assigned a specific role.

func (*Actors) GetPeersByRoles

func (a *Actors) GetPeersByRoles(roles ...types.Role) ([]*Actor, error)

GetPeersByRoles retrieves all peers assigned any of the specified roles.

func (*Actors) GetPeersCount

func (a *Actors) GetPeersCount() (int, error)

GetPeersCount returns a count of all peers in the topology.

func (*Actors) GetRolesOfPeer

func (a *Actors) GetRolesOfPeer(peerID peer.ID) ([]types.Role, error)

GetRolesOfPeer retrieves all roles associated with a specific peer.

func (*Actors) HandlePeerConnected

func (a *Actors) HandlePeerConnected(ctx context.Context, peerInfo peer.AddrInfo) error

HandlePeerConnected is invoked when a new peer connects.

func (*Actors) HandlePeerDisconnected

func (a *Actors) HandlePeerDisconnected(ctx context.Context, peerID peer.ID) error

HandlePeerDisconnected is invoked when a peer disconnects.

func (*Actors) HasPeer

func (a *Actors) HasPeer(peerID peer.ID) bool

HasPeer checks if a peer is already in the topology or pending.

func (*Actors) PeerHasRole

func (a *Actors) PeerHasRole(peerID peer.ID, role types.Role) (bool, error)

PeerHasRole checks if a specific peer possesses a particular role.

func (*Actors) RegisterActorAddedCallback

func (a *Actors) RegisterActorAddedCallback(cb ActorAddedCallback)

RegisterActorAddedCallback allows external packages to register a callback that is invoked whenever an actor is added.

func (*Actors) RegisterActorRemovedCallback

func (a *Actors) RegisterActorRemovedCallback(cb ActorRemovedCallback)

RegisterActorRemovedCallback allows external packages to register a callback that is invoked whenever an actor is removed.

func (*Actors) RemovePeer

func (a *Actors) RemovePeer(ctx context.Context, peerID peer.ID, force bool) error

RemovePeer removes a peer from the topology.

func (*Actors) RemovePeerRole

func (a *Actors) RemovePeerRole(peerID peer.ID, role types.Role) error

RemovePeerRole removes a specific role from an existing peer.

func (*Actors) SendActorResponsePacket

func (a *Actors) SendActorResponsePacket(ctx context.Context, peerID peer.ID, ap *packets.ActorPacket) error

SendActorResponsePacket sends an ActorPacket (e.g., approved) to a peer.

type Topology

// Topology orchestrates the overall network topology and peer management.
type Topology struct {
	// contains filtered or unexported fields
}

Topology orchestrates the overall network topology and peer management.

func NewTopology

func NewTopology(ctx context.Context, logger logger.Logger, account share.Account, network *networking.Network, rbacMgr *rbac.Manager, collector *metrics.Collector, actorSet *share.ActorSet) (*Topology, error)

NewTopology creates a new Topology instance from the given context, logger, account, network reference, RBAC manager, metrics collector, and actor set.

func (*Topology) ActorSet

func (t *Topology) ActorSet() *share.ActorSet

ActorSet returns the share.ActorSet instance managed by Topology. share.ActorSet is in charge of consensus-related actors. TODO: Rename to ConsensusSet.

func (*Topology) Actors

func (t *Topology) Actors() *Actors

Actors returns the Actors instance managed by Topology.

func (*Topology) HandleActorPacket

func (t *Topology) HandleActorPacket(ctx context.Context, msg *packets.NetworkPacket, sender peer.ID) error

HandleActorPacket handles incoming ActorPacket based on its Status.

func (*Topology) RegisterHandlers

func (t *Topology) RegisterHandlers() error

RegisterHandlers registers the Topology's packet handlers with the networking layer.

func (*Topology) Shutdown

func (t *Topology) Shutdown()

Shutdown gracefully shuts down the topology.

func (*Topology) WaitForPeer

func (t *Topology) WaitForPeer(ctx context.Context, targetPeerID peer.ID, timeout time.Duration) error

WaitForPeer waits until the specified peer is available in the topology or the context times out.

func (*Topology) WaitForPeers

func (t *Topology) WaitForPeers(ctx context.Context, timeout time.Duration) error

WaitForPeers waits until at least one peer is available in the topology or the context times out.

func (*Topology) WaitForPeersWithRole

func (t *Topology) WaitForPeersWithRole(ctx context.Context, role types.Role, timeout time.Duration) error

WaitForPeersWithRole waits until at least one peer with the specified role is available or the context times out.

func (*Topology) WaitForPeersWithRoles

func (t *Topology) WaitForPeersWithRoles(ctx context.Context, roles []types.Role, timeout time.Duration) error

WaitForPeersWithRoles waits until at least one peer with any of the specified roles is available or the context times out.

type TopologyMetrics

// TopologyMetrics holds all the metrics instruments for the Topology
// management.
//
// Fields are grouped by concern in the comments below. Each field is an
// instrument from the metric package: an Int64Counter, an Int64UpDownCounter,
// or a Float64Histogram.
type TopologyMetrics struct {
	// Actor management metrics
	ActorsAddedTotal          metric.Int64Counter
	ActorsRemovedTotal        metric.Int64Counter
	ActorVerificationsTotal   metric.Int64Counter
	ActorVerificationFailures metric.Int64Counter
	ActorVerificationLatency  metric.Float64Histogram

	// Pending peer metrics
	PendingPeersTotal        metric.Int64Counter
	PendingPeersTimeoutTotal metric.Int64Counter

	// Role-based metrics
	ActorsByRoleCount metric.Int64UpDownCounter

	// Connection metrics
	ActorConnectionsTotal    metric.Int64Counter
	ActorDisconnectionsTotal metric.Int64Counter

	// Consensus-related metrics
	ConsensusActorsTotal metric.Int64UpDownCounter

	// Health metrics
	TopologyChangeLatency metric.Float64Histogram
	// Wait operation metrics
	WaitOperationsTotal  metric.Int64Counter
	WaitOperationSuccess metric.Int64Counter
	WaitOperationTimeout metric.Int64Counter
	WaitOperationLatency metric.Float64Histogram
}

TopologyMetrics holds all the metrics instruments for the Topology management.

func InitializeMetrics

func InitializeMetrics(meter metric.Meter) (*TopologyMetrics, error)

InitializeMetrics initializes all topology metrics using the provided meter.

func InitializeTopologyMetrics

func InitializeTopologyMetrics(meter metric.Meter) (*TopologyMetrics, error)

InitializeTopologyMetrics initializes the metrics instruments for Topology.

func (*TopologyMetrics) RecordActorAdded

func (m *TopologyMetrics) RecordActorAdded(ctx context.Context, count int64, role string)

RecordActorAdded increments the ActorsAddedTotal counter.

func (*TopologyMetrics) RecordActorConnection

func (m *TopologyMetrics) RecordActorConnection(ctx context.Context, count int64)

RecordActorConnection increments the ActorConnectionsTotal counter.

func (*TopologyMetrics) RecordActorDisconnection

func (m *TopologyMetrics) RecordActorDisconnection(ctx context.Context, count int64)

RecordActorDisconnection increments the ActorDisconnectionsTotal counter.

func (*TopologyMetrics) RecordActorRemoved

func (m *TopologyMetrics) RecordActorRemoved(ctx context.Context, count int64, role string)

RecordActorRemoved increments the ActorsRemovedTotal counter.

func (*TopologyMetrics) RecordActorVerification

func (m *TopologyMetrics) RecordActorVerification(ctx context.Context, count int64)

RecordActorVerification increments the ActorVerificationsTotal counter.

func (*TopologyMetrics) RecordActorVerificationFailure

func (m *TopologyMetrics) RecordActorVerificationFailure(ctx context.Context, count int64)

RecordActorVerificationFailure increments the ActorVerificationFailures counter.

func (*TopologyMetrics) RecordActorVerificationLatency

func (m *TopologyMetrics) RecordActorVerificationLatency(ctx context.Context, duration time.Duration)

RecordActorVerificationLatency records the latency of actor verification operations.

func (*TopologyMetrics) RecordConsensusActor

func (m *TopologyMetrics) RecordConsensusActor(ctx context.Context, delta int64)

RecordConsensusActor updates the ConsensusActorsTotal counter.

func (*TopologyMetrics) RecordPendingPeer

func (m *TopologyMetrics) RecordPendingPeer(ctx context.Context, count int64)

RecordPendingPeer increments the PendingPeersTotal counter.

func (*TopologyMetrics) RecordPendingPeerTimeout

func (m *TopologyMetrics) RecordPendingPeerTimeout(ctx context.Context, count int64)

RecordPendingPeerTimeout increments the PendingPeersTimeoutTotal counter.

func (*TopologyMetrics) RecordTopologyChangeLatency

func (m *TopologyMetrics) RecordTopologyChangeLatency(ctx context.Context, duration time.Duration)

RecordTopologyChangeLatency records the latency of topology change operations.

func (*TopologyMetrics) RecordWaitOperation

func (m *TopologyMetrics) RecordWaitOperation(ctx context.Context, count int64, waitType string)

RecordWaitOperation increments the WaitOperationsTotal counter.

func (*TopologyMetrics) RecordWaitOperationLatency

func (m *TopologyMetrics) RecordWaitOperationLatency(ctx context.Context, duration time.Duration, waitType string)

RecordWaitOperationLatency records the latency of wait operations.

func (*TopologyMetrics) RecordWaitOperationSuccess

func (m *TopologyMetrics) RecordWaitOperationSuccess(ctx context.Context, count int64, waitType string)

RecordWaitOperationSuccess increments the WaitOperationSuccess counter.

func (*TopologyMetrics) RecordWaitOperationTimeout

func (m *TopologyMetrics) RecordWaitOperationTimeout(ctx context.Context, count int64, waitType string)

RecordWaitOperationTimeout increments the WaitOperationTimeout counter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL