messaging

package
v0.0.0-...-1d04b4a
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 | License: Apache-2.0 | Imports: 19 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func FilterByTransport

func FilterByTransport(addrs []multiaddr.Multiaddr, transport string) []multiaddr.Multiaddr

FilterByTransport filters multiaddrs by transport protocol.

Parameters:

  • addrs: List of multiaddrs to filter
  • transport: Transport protocol to filter by ("tcp", "quic")

Returns:

  • Filtered list of multiaddrs containing the specified transport

func FormatMultiaddr

func FormatMultiaddr(addr multiaddr.Multiaddr, peerID peer.ID) string

FormatMultiaddr formats a peer ID and multiaddr into a full multiaddr string.

Parameters:

  • addr: Base multiaddr (e.g., "/ip4/127.0.0.1/udp/4001/quic-v1")
  • peerID: Peer ID to append

Returns:

  • Full multiaddr string with peer ID
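
To illustrate the output shape, here is a minimal sketch that works on plain strings instead of multiaddr.Multiaddr and peer.ID. The helper name formatWithPeerID is hypothetical, and the "/p2p/" component is assumed from the example addresses elsewhere on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// formatWithPeerID is a hypothetical, string-based stand-in for
// FormatMultiaddr: it appends a /p2p/<peerID> component to a base address.
func formatWithPeerID(base, peerID string) string {
	return strings.TrimSuffix(base, "/") + "/p2p/" + peerID
}

func main() {
	fmt.Println(formatWithPeerID("/ip4/127.0.0.1/udp/4001/quic-v1", "QmPeerID"))
}
```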

func HasProtocol

func HasProtocol(addr multiaddr.Multiaddr, protocol string) bool

HasProtocol checks if a multiaddr contains a specific protocol.

Parameters:

  • addr: Multiaddr to check
  • protocol: Protocol name to look for (e.g., "tcp", "quic", "quic-v1")

Returns:

  • true if the protocol is present in the multiaddr
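
The protocol check and the transport filter described above can be sketched on textual addresses. This is a simplified illustration, not the package's implementation: hasProtocol and filterByTransport are hypothetical names, addresses are plain strings, and the match is assumed to be an exact comparison against each slash-separated component (so "quic" does not match "quic-v1" here).

```go
package main

import (
	"fmt"
	"strings"
)

// hasProtocol reports whether the textual address contains the named protocol
// as one of its slash-separated components. Comparing components as strings
// is a simplification; a real multiaddr is walked protocol by protocol.
func hasProtocol(addr, protocol string) bool {
	for _, part := range strings.Split(addr, "/") {
		if part == protocol {
			return true
		}
	}
	return false
}

// filterByTransport keeps only the addresses that contain the given transport,
// preserving their original order.
func filterByTransport(addrs []string, transport string) []string {
	var out []string
	for _, a := range addrs {
		if hasProtocol(a, transport) {
			out = append(out, a)
		}
	}
	return out
}

func main() {
	addrs := []string{
		"/ip4/127.0.0.1/tcp/4001",
		"/ip4/127.0.0.1/udp/4001/quic-v1",
	}
	fmt.Println(filterByTransport(addrs, "tcp"))
	fmt.Println(hasProtocol(addrs[1], "quic-v1"))
}
```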

func ParseMultiaddrs

func ParseMultiaddrs(addrs []multiaddr.Multiaddr) (peer.AddrInfo, error)

ParseMultiaddrs parses a list of multiaddrs and extracts peer information. All addresses must refer to the same peer.

Parameters:

  • addrs: List of multiaddrs (e.g., ["/ip4/127.0.0.1/udp/4001/quic-v1/p2p/QmPeerID", ...])

Returns:

  • peer.AddrInfo containing peer ID and addresses
  • Error if parsing fails or peer IDs mismatch
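
The peer ID mismatch check can be illustrated with a string-based sketch. Everything here is an assumption for illustration: addrInfo stands in for peer.AddrInfo, parseAddrs is a hypothetical name, and the peer ID is taken to be the trailing /p2p/<id> component shown in the example above.

```go
package main

import (
	"fmt"
	"strings"
)

// addrInfo is a hypothetical, string-based stand-in for peer.AddrInfo.
type addrInfo struct {
	ID    string
	Addrs []string
}

// parseAddrs splits each address at its trailing /p2p/<id> component and
// requires every address to name the same peer.
func parseAddrs(addrs []string) (addrInfo, error) {
	var info addrInfo
	for _, a := range addrs {
		i := strings.LastIndex(a, "/p2p/")
		if i < 0 {
			return addrInfo{}, fmt.Errorf("address %q has no /p2p/ component", a)
		}
		base, id := a[:i], a[i+len("/p2p/"):]
		if id == "" {
			return addrInfo{}, fmt.Errorf("address %q has an empty peer ID", a)
		}
		if info.ID != "" && info.ID != id {
			return addrInfo{}, fmt.Errorf("peer ID mismatch: %q vs %q", info.ID, id)
		}
		info.ID = id
		info.Addrs = append(info.Addrs, base)
	}
	if info.ID == "" {
		return addrInfo{}, fmt.Errorf("no addresses given")
	}
	return info, nil
}

func main() {
	info, err := parseAddrs([]string{
		"/ip4/127.0.0.1/udp/4001/quic-v1/p2p/QmPeerID",
		"/ip4/127.0.0.1/tcp/4001/p2p/QmPeerID",
	})
	fmt.Println(info.ID, info.Addrs, err)
}
```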

func ReadAccountsSyncServerMessageSkippingHeartbeats

func ReadAccountsSyncServerMessageSkippingHeartbeats(stream accountSyncStream) (*accountspb.AccountSyncServerMessage, error)

ReadAccountsSyncServerMessageSkippingHeartbeats reads the next non-heartbeat AccountSyncServerMessage from an already-open AccountSync stream.

Heartbeat frames are silently consumed; the read deadline is reset on each one to prevent the stream from timing out during long diff computations on the server.

All other payload types are returned as-is — inspect with:

msg.GetBatchAck()  → AccountBatchAck       (server acked one ART batch)
msg.GetResponse()  → AccountSyncResponse   (one page of missing accounts)
msg.GetEnd()       → AccountSyncEndOfStream (completion signal — stop looping)

Call this in a loop on the same stream after sending the final AccountNonceSyncRequest (is_last=true), and stop when msg.GetEnd() is non-nil or an error is returned.
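
The read loop can be sketched without the generated protobuf types. In this illustration frame, frameReader, and sliceReader are hypothetical stand-ins for AccountSyncServerMessage and the stream; only the control flow (skip heartbeats, stop at the end frame or on error) follows the description above.

```go
package main

import (
	"errors"
	"fmt"
)

// frame is a hypothetical stand-in for AccountSyncServerMessage.
type frame struct {
	Heartbeat bool
	BatchAck  string
	Response  string
	End       bool
}

// frameReader is a hypothetical stand-in for the open stream.
type frameReader interface {
	Read() (*frame, error)
}

// sliceReader replays canned frames so the sketch runs without a network.
type sliceReader struct{ frames []*frame }

func (r *sliceReader) Read() (*frame, error) {
	if len(r.frames) == 0 {
		return nil, errors.New("stream closed")
	}
	f := r.frames[0]
	r.frames = r.frames[1:]
	return f, nil
}

// readSkippingHeartbeats returns the next non-heartbeat frame.
func readSkippingHeartbeats(r frameReader) (*frame, error) {
	for {
		f, err := r.Read()
		if err != nil {
			return nil, err
		}
		if f.Heartbeat {
			continue // a real stream would also reset its read deadline here
		}
		return f, nil
	}
}

// drain loops until the end frame arrives, counting acks and collecting pages.
func drain(r frameReader) (acks int, pages []string, err error) {
	for {
		f, err := readSkippingHeartbeats(r)
		if err != nil {
			return acks, pages, err
		}
		switch {
		case f.End:
			return acks, pages, nil
		case f.BatchAck != "":
			acks++
		case f.Response != "":
			pages = append(pages, f.Response)
		}
	}
}

func main() {
	r := &sliceReader{frames: []*frame{
		{Heartbeat: true}, {BatchAck: "batch-0"}, {Heartbeat: true},
		{Response: "page-0"}, {Response: "page-1"}, {End: true},
	}}
	acks, pages, err := drain(r)
	fmt.Println(acks, pages, err)
}
```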

func SelectTransportAddr

func SelectTransportAddr(addrs []multiaddr.Multiaddr, version uint16) (multiaddr.Multiaddr, error)

SelectTransportAddr selects the appropriate multiaddr based on version and transport priority.

Version-based selection:

  • V1 (version == 1): Returns first TCP address only
  • V2 (version >= 2): Prioritizes QUIC, falls back to TCP if no QUIC available

Parameters:

  • addrs: List of multiaddrs to select from
  • version: Protocol version (1 = TCP only, 2+ = QUIC preferred)

Returns:

  • Selected multiaddr
  • Error if no suitable transport is available
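
The version rules above can be sketched on textual addresses. This is an illustration under stated assumptions, not the package's code: selectAddr and hasProto are hypothetical names, both "quic" and "quic-v1" are treated as QUIC, and version 0 is rejected because the documentation does not say how it is handled.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// hasProto reports whether a textual address has the named component.
func hasProto(addr, name string) bool {
	for _, part := range strings.Split(addr, "/") {
		if part == name {
			return true
		}
	}
	return false
}

// selectAddr applies the documented rules: V1 returns the first TCP address,
// V2+ prefers the first QUIC address and falls back to the first TCP address.
func selectAddr(addrs []string, version uint16) (string, error) {
	var tcp, quic string
	for _, a := range addrs {
		if quic == "" && (hasProto(a, "quic") || hasProto(a, "quic-v1")) {
			quic = a
		}
		if tcp == "" && hasProto(a, "tcp") {
			tcp = a
		}
	}
	switch {
	case version >= 2 && quic != "":
		return quic, nil
	case version >= 1 && tcp != "":
		return tcp, nil
	}
	return "", errors.New("no suitable transport address")
}

func main() {
	addrs := []string{
		"/ip4/127.0.0.1/tcp/4001",
		"/ip4/127.0.0.1/udp/4001/quic-v1",
	}
	for _, v := range []uint16{1, 2} {
		a, err := selectAddr(addrs, v)
		fmt.Println(v, a, err)
	}
}
```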

func SelectTransportAddrWithFallback

func SelectTransportAddrWithFallback(addrs []multiaddr.Multiaddr, version uint16) (primary, fallback multiaddr.Multiaddr, err error)

SelectTransportAddrWithFallback selects the primary address for the given version and, for V2, a fallback address as well.

This is a helper that returns both addresses; the caller is expected to try the primary first and the fallback only if the primary fails.

Parameters:

  • addrs: List of multiaddrs
  • version: Protocol version

Returns:

  • primary: Primary address to try first
  • fallback: Fallback address to try if primary fails (nil if no fallback)
  • Error if no suitable transport

func SendAccountsSyncAllChunksWithHeartbeat

func SendAccountsSyncAllChunksWithHeartbeat(
	ctx context.Context,
	version uint16,
	host host.Host,
	peerInfo peer.AddrInfo,
	protocolID protocol.ID,
	chunks <-chan *accountspb.AccountNonceSyncRequest,
	handlers AccountSyncHandlers,
) error

SendAccountsSyncAllChunksWithHeartbeat uploads all ART chunks to the server on a single persistent stream and dispatches every server→client frame via handlers.

Protocol:

For each non-final chunk (is_last=false):
  write chunk → read BatchAck (heartbeats skipped) → handlers.OnBatchAck → next chunk

For the final chunk (is_last=true):
  write chunk → read frames until EndOfStream:
    batch_ack  → handlers.OnBatchAck, loop
    response   → handlers.OnResponse, loop
    end        → handlers.OnEnd, return nil

All chunks share ONE stream so the server's sessionLockedART accumulates every batch before the diff computation begins.
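
The protocol above can be sketched against an in-memory stream. All types here (chunk, frame, handlers, stream, fakeStream) are hypothetical stand-ins for the generated protobuf types and the libp2p stream, and treating an unexpected frame after a non-final chunk as an error is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

type chunk struct{ IsLast bool }

type frame struct {
	Heartbeat, BatchAck, End bool
	Response                 string
}

type handlers struct {
	OnBatchAck func() error
	OnResponse func(page string) error
	OnEnd      func() error
}

type stream interface {
	Write(chunk) error
	Read() (frame, error)
}

// next returns the next non-heartbeat frame.
func next(s stream) (frame, error) {
	for {
		f, err := s.Read()
		if err != nil || !f.Heartbeat {
			return f, err
		}
	}
}

// sendAllChunks writes every chunk on one stream and dispatches server frames.
func sendAllChunks(s stream, chunks <-chan chunk, h handlers) error {
	for c := range chunks {
		if err := s.Write(c); err != nil {
			return err
		}
		if !c.IsLast {
			// Non-final chunk: exactly one batch ack is expected.
			f, err := next(s)
			if err != nil {
				return err
			}
			if !f.BatchAck {
				return errors.New("expected a batch ack")
			}
			if err := h.OnBatchAck(); err != nil {
				return err
			}
			continue
		}
		// Final chunk: read frames until the end-of-stream frame.
		for {
			f, err := next(s)
			if err != nil {
				return err
			}
			switch {
			case f.BatchAck:
				err = h.OnBatchAck()
			case f.Response != "":
				err = h.OnResponse(f.Response)
			case f.End:
				return h.OnEnd()
			}
			if err != nil {
				return err
			}
		}
	}
	return errors.New("chunk channel closed before a final chunk")
}

// fakeStream queues a heartbeat and an ack per chunk, plus one page and the
// end frame after the final chunk.
type fakeStream struct{ queue []frame }

func (s *fakeStream) Write(c chunk) error {
	s.queue = append(s.queue, frame{Heartbeat: true}, frame{BatchAck: true})
	if c.IsLast {
		s.queue = append(s.queue, frame{Response: "page-0"}, frame{End: true})
	}
	return nil
}

func (s *fakeStream) Read() (frame, error) {
	if len(s.queue) == 0 {
		return frame{}, errors.New("stream closed")
	}
	f := s.queue[0]
	s.queue = s.queue[1:]
	return f, nil
}

func main() {
	chunks := make(chan chunk, 2)
	chunks <- chunk{}
	chunks <- chunk{IsLast: true}
	close(chunks)
	acks := 0
	err := sendAllChunks(&fakeStream{}, chunks, handlers{
		OnBatchAck: func() error { acks++; return nil },
		OnResponse: func(p string) error { fmt.Println("response:", p); return nil },
		OnEnd:      func() error { fmt.Println("end of stream"); return nil },
	})
	fmt.Println("acks:", acks, "err:", err)
}
```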

func SendDataSyncProtoDelimitedWithHeartbeat

func SendDataSyncProtoDelimitedWithHeartbeat(
	ctx context.Context,
	version uint16,
	host host.Host,
	peerInfo peer.AddrInfo,
	protocolID protocol.ID,
	request proto.Message,
	response *datasyncpb.DataSyncResponse,
) error

SendDataSyncProtoDelimitedWithHeartbeat is a heartbeat-aware variant of SendProtoDelimited, designed for the DataSync protocol.

func SendMessage

func SendMessage(
	ctx context.Context,
	host host.Host,
	peerAddr string,
	protocolID protocol.ID,
	payload []byte,
) ([]byte, error)

SendMessage sends raw bytes to a peer using the specified protocol. It handles connection establishment, stream creation, message exchange, and cleanup.

Parameters:

  • ctx: Context for cancellation and timeout
  • host: The libp2p host to send from
  • peerAddr: Multiaddr string of the peer (e.g., "/ip4/127.0.0.1/udp/4001/quic-v1/p2p/QmPeerID")
  • protocolID: The protocol ID to use for communication
  • payload: Raw bytes to send

Returns:

  • Response bytes from the peer
  • Error if any step fails

func SendMessageToPeer

func SendMessageToPeer(
	ctx context.Context,
	host host.Host,
	peerID peer.ID,
	protocolID protocol.ID,
	payload []byte,
) ([]byte, error)

SendMessageToPeer sends raw bytes to an already-connected peer. Use this when you've already established a connection to avoid redundant connection attempts.

Parameters:

  • ctx: Context for cancellation and timeout
  • host: The libp2p host to send from
  • peerID: The peer ID to send to
  • protocolID: The protocol ID to use for communication
  • payload: Raw bytes to send

Returns:

  • Response bytes from the peer
  • Error if any step fails

func SendPoTSProtoDelimitedWithHeartbeat

func SendPoTSProtoDelimitedWithHeartbeat(
	ctx context.Context,
	version uint16,
	host host.Host,
	peerInfo peer.AddrInfo,
	protocolID protocol.ID,
	request proto.Message,
	response *potspb.PoTSResponse,
) error

SendPoTSProtoDelimitedWithHeartbeat is a heartbeat-aware variant of SendProtoDelimited, designed for the PoTS (Proof of Time Sync) protocol.

func SendProtoDelimited

func SendProtoDelimited(
	ctx context.Context,
	version uint16,
	host host.Host,
	peerInfo peer.AddrInfo,
	protocolID protocol.ID,
	request proto.Message,
	response proto.Message,
) error

SendProtoDelimited sends a length-delimited protobuf message and receives a length-delimited response. This uses the pbstream package for proper protobuf stream handling.

Transport selection based on version:

  • V1 (version == 1): Uses TCP transport only
  • V2 (version >= 2): Prioritizes QUIC transport, automatically falls back to TCP if QUIC fails

Parameters:

  • ctx: Context for cancellation and timeout
  • version: Protocol version (1 = TCP only, 2+ = QUIC with TCP fallback)
  • host: The libp2p host to send from
  • peerInfo: Peer information including ID and multiple multiaddrs
  • protocolID: The protocol ID to use for communication
  • request: Protobuf message to send
  • response: Protobuf message to unmarshal response into

Returns:

  • Error if any step fails

func SendProtoDelimitedWithHeartbeat

func SendProtoDelimitedWithHeartbeat(
	ctx context.Context,
	version uint16,
	host host.Host,
	peerInfo peer.AddrInfo,
	protocolID protocol.ID,
	request proto.Message,
	response *priorsyncpb.PriorSyncMessage,
) error

SendProtoDelimitedWithHeartbeat is a heartbeat-aware variant of SendProtoDelimited, designed for the PriorSync protocol.

func StringToMultiaddr

func StringToMultiaddr(addr []string) ([]multiaddr.Multiaddr, error)

StringToMultiaddr converts a list of multiaddr strings into multiaddrs.

Types

type AccountSyncHandlers

type AccountSyncHandlers struct {
	// OnBatchAck is called for each AccountBatchAck (one per uploaded ART batch).
	OnBatchAck func(*accountspb.AccountBatchAck) error
	// OnResponse is called for each AccountSyncResponse page of missing accounts.
	// Pages may arrive out of order; use page_index to reorder if needed.
	OnResponse func(*accountspb.AccountSyncResponse) error
	// OnEnd is called exactly once when AccountSyncEndOfStream arrives.
	// Use total_pages to verify all response pages were received before the
	// function returns.
	OnEnd func(*accountspb.AccountSyncEndOfStream) error
}

AccountSyncHandlers holds per-frame callbacks for SendAccountsSyncAllChunksWithHeartbeat. Heartbeat frames are silently consumed; no callback is needed for them. Returning a non-nil error from any handler aborts the read loop immediately.
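
Because pages may arrive out of order, an OnResponse and OnEnd pair will typically buffer pages and verify the count at the end. The sketch below shows one way to do that with hypothetical types: page stands in for AccountSyncResponse, pageCollector is not part of this package, and page_index is assumed to be zero-based.

```go
package main

import "fmt"

// page is a hypothetical stand-in for AccountSyncResponse.
type page struct {
	Index    int
	Accounts []string
}

// pageCollector buffers pages by index until the end of the stream.
type pageCollector struct {
	pages map[int]page
}

func newPageCollector() *pageCollector {
	return &pageCollector{pages: make(map[int]page)}
}

// onResponse stores one page and rejects duplicates.
func (c *pageCollector) onResponse(p page) error {
	if _, dup := c.pages[p.Index]; dup {
		return fmt.Errorf("duplicate page %d", p.Index)
	}
	c.pages[p.Index] = p
	return nil
}

// onEnd checks that pages 0..totalPages-1 all arrived and returns them in order.
func (c *pageCollector) onEnd(totalPages int) ([]page, error) {
	if len(c.pages) != totalPages {
		return nil, fmt.Errorf("got %d pages, expected %d", len(c.pages), totalPages)
	}
	var ordered []page
	for i := 0; i < totalPages; i++ {
		p, ok := c.pages[i]
		if !ok {
			return nil, fmt.Errorf("missing page %d of %d", i, totalPages)
		}
		ordered = append(ordered, p)
	}
	return ordered, nil
}

func main() {
	c := newPageCollector()
	// Pages arrive out of order.
	for _, idx := range []int{2, 0, 1} {
		if err := c.onResponse(page{Index: idx}); err != nil {
			panic(err)
		}
	}
	ordered, err := c.onEnd(3)
	for _, p := range ordered {
		fmt.Println("page", p.Index)
	}
	fmt.Println("err:", err)
}
```

Returning a non-nil error from onResponse or onEnd maps onto the abort behaviour described above.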
