README

go-libp2p-pubsub


This repo contains the canonical pubsub implementation for libp2p. We currently provide three message router options:

  • Floodsub, which is the baseline flooding protocol.
  • Randomsub, which is a simple probabilistic router that propagates to random subsets of peers.
  • Gossipsub, which is a more advanced router with mesh formation and gossip propagation. See spec and implementation for more details.

PSA: The Hardening Extensions for Gossipsub (Gossipsub V1.1) can be found under development at https://github.com/libp2p/go-libp2p-pubsub/pull/263

Repo Lead Maintainer

@vyzo

This repo follows the Repo Lead Maintainer Protocol

Table of Contents

Install

go get github.com/libp2p/go-libp2p-pubsub

Usage

To be used for messaging in p2p instrastructure (as part of libp2p) such as IPFS, Ethereum, other blockchains, etc.

Example

https://github.com/libp2p/go-libp2p-examples/tree/master/pubsub

Documentation

See the libp2p specs for high level documentation and godoc for API documentation.

In this repo, you will find
.
├── LICENSE
├── README.md
# Regular Golang repo set up
├── codecov.yml
├── pb
├── go.mod
├── go.sum
├── doc.go
# PubSub base
├── pubsub.go
├── blacklist.go
├── notify.go
├── comm.go
├── discovery.go
├── sign.go
├── subscription.go
├── topic.go
├── trace.go
├── tracer.go
├── validation.go
# Floodsub router
├── floodsub.go
# Randomsub router
├── randomsub.go
# Gossipsub router
├── gossipsub.go
├── score.go
├── score_params.go
└── mcache.go
Tracing

The pubsub system supports tracing, which collects all events pertaining to the internals of the system. This allows you to recreate the complete message flow and state of the system for analysis purposes.

To enable tracing, instantiate the pubsub system using the WithEventTracer option; the option accepts a tracer with three available implementations in-package (trace to json, pb, or a remote peer). If you want to trace using a remote peer, you can do so using the traced daemon from go-libp2p-pubsub-tracer. The package also includes a utility program, tracestat, for analyzing the traces collected by the daemon.

For instance, to capture the trace as a json file, you can use the following option:

pubsub.NewGossipSub(..., pubsub.NewEventTracer(pubsub.NewJSONTracer("/path/to/trace.json")))

To capture the trace as a protobuf, you can use the following option:

pubsub.NewGossipSub(..., pubsub.NewEventTracer(pubsub.NewPBTracer("/path/to/trace.pb")))

Finally, to use the remote tracer, you can use the following incantations:

// assuming that your tracer runs in x.x.x.x and has a peer ID of QmTracer
pi, err := peer.AddrInfoFromP2pAddr(ma.StringCast("/ip4/x.x.x.x/tcp/4001/p2p/QmTracer"))
if err != nil {
  panic(err)
}

tracer, err := pubsub.NewRemoteTracer(ctx, host, pi)
if err != nil {
  panic(err)
}

ps, err := pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))

Contribute

Contributions welcome. Please check out the issues.

Check out our contributing document for more information on how we work, and about contributing in general. Please be aware that all interactions related to multiformats are subject to the IPFS Code of Conduct.

Small note: If editing the README, please conform to the standard-readme specification.

License

The go-libp2p-pubsub project is dual-licensed under Apache 2.0 and MIT terms:

Expand ▾ Collapse ▴

Documentation

Overview

    The pubsub package provides facilities for the Publish/Subscribe pattern of message propagation, also known as overlay multicast. The implementation provides topic-based pubsub, with pluggable routing algorithms.

    The main interface to the library is the PubSub object. You can construct this object with the following constructors:

    - NewFloodSub creates an instance that uses the floodsub routing algorithm.

    - NewGossipSub creates an instance that uses the gossipsub routing algorithm.

    - NewRandomSub creates an instance that uses the randomsub routing algorithm.

    In addition, there is a generic constructor that creates a pubsub instance with a custom PubSubRouter interface. This procedure is currently reserved for internal use within the package.

    Once you have constructed a PubSub instance, you need to establish some connections to your peers; the implementation relies on ambient peer discovery, leaving bootstrap and active peer discovery up to the client.

    To publish a message to some topic, use Publish; you don't need to be subscribed to the topic in order to publish.

    To subscribe to a topic, use Subscribe; this will give you a subscription interface from which new messages can be pumped.

    Index

    Constants

    View Source
    const (
    	FloodSubID              = protocol.ID("/floodsub/1.0.0")
    	FloodSubTopicSearchSize = 5
    )
    View Source
    const (
    	// GossipSubID_v10 is the protocol ID for version 1.0.0 of the GossipSub protocol.
    	// It is advertised along with GossipSubID_v11 for backwards compatibility.
    	GossipSubID_v10 = protocol.ID("/meshsub/1.0.0")
    
    	// GossipSubID_v11 is the protocol ID for version 1.1.0 of the GossipSub protocol.
    	// See the spec for details about how v1.1.0 compares to v1.0.0:
    	// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
    	GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
    )
    View Source
    const (
    	DefaultDecayInterval = time.Second
    	DefaultDecayToZero   = 0.01
    )
    View Source
    const (
    	// StrictSign produces signatures and expects and verifies incoming signatures
    	StrictSign = msgSigning | msgVerification
    	// StrictNoSign does not produce signatures and drops and penalises incoming messages that carry one
    	StrictNoSign = msgVerification
    	// LaxSign produces signatures and validates incoming signatures iff one is present
    	// Deprecated: it is recommend to either strictly enable, or strictly disable, signatures.
    	LaxSign = msgSigning
    	// LaxNoSign does not produce signatures and validates incoming signatures iff one is present
    	// Deprecated: it is recommend to either strictly enable, or strictly disable, signatures.
    	LaxNoSign = 0
    )
    View Source
    const (
    	// ValidationAccept is a validation decision that indicates a valid message that should be accepted and
    	// delivered to the application and forwarded to the network.
    	ValidationAccept = ValidationResult(0)
    	// ValidationReject is a validation decision that indicates an invalid message that should not be
    	// delivered to the application or forwarded to the application. Furthermore the peer that forwarded
    	// the message should be penalized by peer scoring routers.
    	ValidationReject = ValidationResult(1)
    	// ValidationIgnore is a validation decision that indicates a message that should be ignored: it will
    	// be neither delivered to the application nor forwarded to the network. However, in contrast to
    	// ValidationReject, the peer that forwarded the message must not be penalized by peer scoring routers.
    	ValidationIgnore = ValidationResult(2)
    )
    View Source
    const DefaultMaxMessageSize = 1 << 20

      DefaultMaximumMessageSize is 1mb.

      View Source
      const (
      	RandomSubID = protocol.ID("/randomsub/1.0.0")
      )
      View Source
      const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")
      View Source
      const SignPrefix = "libp2p-pubsub:"

      Variables

      View Source
      var (
      
      	// DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling
      	DiscoveryPollInitialDelay = 0 * time.Millisecond
      	// DiscoveryPollInterval is approximately how long the discovery system waits in between checks for whether the
      	// more peers are needed for any topic
      	DiscoveryPollInterval = 1 * time.Second
      )
      View Source
      var (
      
      	// GossipSubD sets the optimal degree for a GossipSub topic mesh. For example, if GossipSubD == 6,
      	// each peer will want to have about six peers in their mesh for each topic they're subscribed to.
      	// GossipSubD should be set somewhere between GossipSubDlo and GossipSubDhi.
      	GossipSubD = 6
      
      	// GossipSubDlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
      	// If we have fewer than GossipSubDlo peers, we will attempt to graft some more into the mesh at
      	// the next heartbeat.
      	GossipSubDlo = 5
      
      	// GossipSubDhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
      	// If we have more than GossipSubDhi peers, we will select some to prune from the mesh at the next heartbeat.
      	GossipSubDhi = 12
      
      	// GossipSubDscore affects how peers are selected when pruning a mesh due to over subscription.
      	// At least GossipSubDscore of the retained peers will be high-scoring, while the remainder are
      	// chosen randomly.
      	GossipSubDscore = 4
      
      	// GossipSubDout sets the quota for the number of outbound connections to maintain in a topic mesh.
      	// When the mesh is pruned due to over subscription, we make sure that we have outbound connections
      	// to at least GossipSubDout of the survivor peers. This prevents sybil attackers from overwhelming
      	// our mesh with incoming connections.
      	//
      	// GossipSubDout must be set below GossipSubDlo, and must not exceed GossipSubD / 2.
      	GossipSubDout = 2
      
      	// GossipSubHistoryLength controls the size of the message cache used for gossip.
      	// The message cache will remember messages for GossipSubHistoryLength heartbeats.
      	GossipSubHistoryLength = 5
      
      	// GossipSubHistoryGossip controls how many cached message ids we will advertise in
      	// IHAVE gossip messages. When asked for our seen message IDs, we will return
      	// only those from the most recent GossipSubHistoryGossip heartbeats. The slack between
      	// GossipSubHistoryGossip and GossipSubHistoryLength allows us to avoid advertising messages
      	// that will be expired by the time they're requested.
      	//
      	// GossipSubHistoryGossip must be less than or equal to GossipSubHistoryLength to
      	// avoid a runtime panic.
      	GossipSubHistoryGossip = 3
      
      	// GossipSubDlazy affects how many peers we will emit gossip to at each heartbeat.
      	// We will send gossip to at least GossipSubDlazy peers outside our mesh. The actual
      	// number may be more, depending on GossipSubGossipFactor and how many peers we're
      	// connected to.
      	GossipSubDlazy = 6
      
      	// GossipSubGossipFactor affects how many peers we will emit gossip to at each heartbeat.
      	// We will send gossip to GossipSubGossipFactor * (total number of non-mesh peers), or
      	// GossipSubDlazy, whichever is greater.
      	GossipSubGossipFactor = 0.25
      
      	// GossipSubGossipRetransmission controls how many times we will allow a peer to request
      	// the same message id through IWANT gossip before we start ignoring them. This is designed
      	// to prevent peers from spamming us with requests and wasting our resources.
      	GossipSubGossipRetransmission = 3
      
      	// GossipSubHeartbeatInitialDelay is the short delay before the heartbeat timer begins
      	// after the router is initialized.
      	GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
      
      	// GossipSubHeartbeatInterval controls the time between heartbeats.
      	GossipSubHeartbeatInterval = 1 * time.Second
      
      	// GossipSubFanoutTTL controls how long we keep track of the fanout state. If it's been
      	// GossipSubFanoutTTL since we've published to a topic that we're not subscribed to,
      	// we'll delete the fanout map for that topic.
      	GossipSubFanoutTTL = 60 * time.Second
      
      	// GossipSubPrunePeers controls the number of peers to include in prune Peer eXchange.
      	// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
      	// send them signed peer records for up to GossipSubPrunePeers other peers that we
      	// know of.
      	GossipSubPrunePeers = 16
      
      	// GossipSubPruneBackoff controls the backoff time for pruned peers. This is how long
      	// a peer must wait before attempting to graft into our mesh again after being pruned.
      	// When pruning a peer, we send them our value of GossipSubPruneBackoff so they know
      	// the minimum time to wait. Peers running older versions may not send a backoff time,
      	// so if we receive a prune message without one, we will wait at least GossipSubPruneBackoff
      	// before attempting to re-graft.
      	GossipSubPruneBackoff = time.Minute
      
      	// GossipSubConnectors controls the number of active connection attempts for peers obtained through PX.
      	GossipSubConnectors = 8
      
      	// GossipSubMaxPendingConnections sets the maximum number of pending connections for peers attempted through px.
      	GossipSubMaxPendingConnections = 128
      
      	// GossipSubConnectionTimeout controls the timeout for connection attempts.
      	GossipSubConnectionTimeout = 30 * time.Second
      
      	// GossipSubDirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers
      	// that are not currently connected.
      	GossipSubDirectConnectTicks uint64 = 300
      
      	// GossipSubDirectConnectInitialDelay is the initial delay before opening connections to direct peers
      	GossipSubDirectConnectInitialDelay = time.Second
      
      	// GossipSubOpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
      	// with opportunistic grafting. Every GossipSubOpportunisticGraftTicks we will attempt to select some
      	// high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
      	// below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
      	GossipSubOpportunisticGraftTicks uint64 = 60
      
      	// GossipSubOpportunisticGraftPeers is the number of peers to opportunistically graft.
      	GossipSubOpportunisticGraftPeers = 2
      
      	// If a GRAFT comes before GossipSubGraftFloodThreshold has elapsed since the last PRUNE,
      	// then there is an extra score penalty applied to the peer through P7.
      	GossipSubGraftFloodThreshold = 10 * time.Second
      
      	// GossipSubMaxIHaveLength is the maximum number of messages to include in an IHAVE message.
      	// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
      	// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
      	// default if your system is pushing more than 5000 messages in GossipSubHistoryGossip heartbeats;
      	// with the defaults this is 1666 messages/s.
      	GossipSubMaxIHaveLength = 5000
      
      	// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
      	GossipSubMaxIHaveMessages = 10
      
      	// Time to wait for a message requested through IWANT following an IHAVE advertisement.
      	// If the message is not received within this window, a broken promise is declared and
      	// the router may apply bahavioural penalties.
      	GossipSubIWantFollowupTime = 3 * time.Second
      )
      View Source
      var (
      	DefaultPeerGaterRetainStats     = 6 * time.Hour
      	DefaultPeerGaterQuiet           = time.Minute
      	DefaultPeerGaterDuplicateWeight = 0.125
      	DefaultPeerGaterIgnoreWeight    = 1.0
      	DefaultPeerGaterRejectWeight    = 16.0
      	DefaultPeerGaterThreshold       = 0.33
      	DefaultPeerGaterGlobalDecay     = ScoreParameterDecay(2 * time.Minute)
      	DefaultPeerGaterSourceDecay     = ScoreParameterDecay(time.Hour)
      )
      View Source
      var (
      	TimeCacheDuration = 120 * time.Second
      
      	// ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
      	// subscription has been cancelled.
      	ErrSubscriptionCancelled = errors.New("subscription cancelled")
      )
      View Source
      var (
      	// GossipSubConnTagBumpMessageDelivery is the amount to add to the connection manager
      	// tag that tracks message deliveries. Each time a peer is the first to deliver a
      	// message within a topic, we "bump" a tag by this amount, up to a maximum
      	// of GossipSubConnTagMessageDeliveryCap.
      	// Note that the delivery tags decay over time, decreasing by GossipSubConnTagDecayAmount
      	// at every GossipSubConnTagDecayInterval.
      	GossipSubConnTagBumpMessageDelivery = 1
      
      	// GossipSubConnTagDecayInterval is the decay interval for decaying connection manager tags.
      	GossipSubConnTagDecayInterval = 10 * time.Minute
      
      	// GossipSubConnTagDecayAmount is subtracted from decaying tag values at each decay interval.
      	GossipSubConnTagDecayAmount = 1
      
      	// GossipSubConnTagMessageDeliveryCap is the maximum value for the connection manager tags that
      	// track message deliveries.
      	GossipSubConnTagMessageDeliveryCap = 15
      )
      View Source
      var ErrTooManySubscriptions = errors.New("too many subscriptions")

        ErrTooManySubscriptions may be returned by a SubscriptionFilter to signal that there are too many subscriptions to process.

        View Source
        var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

          ErrTopicClosed is returned if a Topic is utilized after it has been closed

          View Source
          var MinTraceBatchSize = 16
          View Source
          var (
          	RandomSubD = 6
          )
          View Source
          var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words.
          

          Functions

          func DefaultMsgIdFn

          func DefaultMsgIdFn(pmsg *pb.Message) string

            DefaultMsgIdFn returns a unique ID of the passed Message

            func FilterSubscriptions

            func FilterSubscriptions(subs []*pb.RPC_SubOpts, filter func(string) bool) []*pb.RPC_SubOpts

              FilterSubscriptions filters (and deduplicates) a list of subscriptions. filter should return true if a topic is of interest.

              func ScoreParameterDecay

              func ScoreParameterDecay(decay time.Duration) float64

                ScoreParameterDecay computes the decay factor for a parameter, assuming the DecayInterval is 1s and that the value decays to zero if it drops below 0.01

                func ScoreParameterDecayWithBase

                func ScoreParameterDecayWithBase(decay time.Duration, base time.Duration, decayToZero float64) float64

                  ScoreParameterDecay computes the decay factor for a parameter using base as the DecayInterval

                  Types

                  type AcceptStatus

                  type AcceptStatus int
                  const (
                  	// AcceptAll signals to accept the incoming RPC for full processing
                  	AcceptNone AcceptStatus = iota
                  	// AcceptControl signals to accept the incoming RPC only for control message processing by
                  	// the router. Included payload messages will _not_ be pushed to the validation queue.
                  	AcceptControl
                  	// AcceptNone signals to drop the incoming RPC
                  	AcceptAll
                  )

                  type BackoffConnectorFactory

                  type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)

                    BackoffConnectorFactory creates a BackoffConnector that is attached to a given host

                    type Blacklist

                    type Blacklist interface {
                    	Add(peer.ID) bool
                    	Contains(peer.ID) bool
                    }

                      Blacklist is an interface for peer blacklisting.

                      func NewMapBlacklist

                      func NewMapBlacklist() Blacklist

                        NewMapBlacklist creates a new MapBlacklist

                        func NewTimeCachedBlacklist

                        func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error)

                          NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration

                          type CacheEntry

                          type CacheEntry struct {
                          	// contains filtered or unexported fields
                          }

                          type DiscoverOpt

                          type DiscoverOpt func(*discoverOptions) error

                          func WithDiscoverConnector

                          func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt

                            WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers

                            func WithDiscoveryOpts

                            func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt

                              WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem

                              type EventTracer

                              type EventTracer interface {
                              	Trace(evt *pb.TraceEvent)
                              }

                                Generic event tracer interface

                                type EventType

                                type EventType int
                                const (
                                	PeerJoin EventType = iota
                                	PeerLeave
                                )

                                type ExtendedPeerScoreInspectFn

                                type ExtendedPeerScoreInspectFn = func(map[peer.ID]*PeerScoreSnapshot)

                                type FloodSubRouter

                                type FloodSubRouter struct {
                                	// contains filtered or unexported fields
                                }

                                func (*FloodSubRouter) AcceptFrom

                                func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus

                                func (*FloodSubRouter) AddPeer

                                func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID)

                                func (*FloodSubRouter) Attach

                                func (fs *FloodSubRouter) Attach(p *PubSub)

                                func (*FloodSubRouter) EnoughPeers

                                func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool

                                func (*FloodSubRouter) HandleRPC

                                func (fs *FloodSubRouter) HandleRPC(rpc *RPC)

                                func (*FloodSubRouter) Join

                                func (fs *FloodSubRouter) Join(topic string)

                                func (*FloodSubRouter) Leave

                                func (fs *FloodSubRouter) Leave(topic string)

                                func (*FloodSubRouter) Protocols

                                func (fs *FloodSubRouter) Protocols() []protocol.ID

                                func (*FloodSubRouter) Publish

                                func (fs *FloodSubRouter) Publish(msg *Message)

                                func (*FloodSubRouter) RemovePeer

                                func (fs *FloodSubRouter) RemovePeer(p peer.ID)

                                type GossipSubRouter

                                type GossipSubRouter struct {
                                
                                	// overly parameter "constants"
                                	// these are pulled from their global value or else the race detector is angry on travis
                                	// it also allows us to change them per peer in tests, which is a plus
                                	D, Dlo, Dhi, Dscore, Dout, Dlazy int
                                	// contains filtered or unexported fields
                                }

                                  GossipSubRouter is a router that implements the gossipsub protocol. For each topic we have joined, we maintain an overlay through which messages flow; this is the mesh map. For each topic we publish to without joining, we maintain a list of peers to use for injecting our messages in the overlay with stable routes; this is the fanout map. Fanout peer lists are expired if we don't publish any messages to their topic for GossipSubFanoutTTL.

                                  func (*GossipSubRouter) AcceptFrom

                                  func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus

                                  func (*GossipSubRouter) AddPeer

                                  func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)

                                  func (*GossipSubRouter) Attach

                                  func (gs *GossipSubRouter) Attach(p *PubSub)

                                  func (*GossipSubRouter) EnoughPeers

                                  func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool

                                  func (*GossipSubRouter) HandleRPC

                                  func (gs *GossipSubRouter) HandleRPC(rpc *RPC)

                                  func (*GossipSubRouter) Join

                                  func (gs *GossipSubRouter) Join(topic string)

                                  func (*GossipSubRouter) Leave

                                  func (gs *GossipSubRouter) Leave(topic string)

                                  func (*GossipSubRouter) Protocols

                                  func (gs *GossipSubRouter) Protocols() []protocol.ID

                                  func (*GossipSubRouter) Publish

                                  func (gs *GossipSubRouter) Publish(msg *Message)

                                  func (*GossipSubRouter) RemovePeer

                                  func (gs *GossipSubRouter) RemovePeer(p peer.ID)

                                  type JSONTracer

                                  type JSONTracer struct {
                                  	// contains filtered or unexported fields
                                  }

                                    JSONTracer is a tracer that writes events to a file, encoded in ndjson.

                                    func NewJSONTracer

                                    func NewJSONTracer(file string) (*JSONTracer, error)

                                      NewJsonTracer creates a new JSONTracer writing traces to file.

                                      func OpenJSONTracer

                                      func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error)

                                        OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.

                                        func (*JSONTracer) Close

                                        func (t *JSONTracer) Close()

                                        func (*JSONTracer) Trace

                                        func (t *JSONTracer) Trace(evt *pb.TraceEvent)

                                        type MapBlacklist

                                        type MapBlacklist map[peer.ID]struct{}

                                          MapBlacklist is a blacklist implementation using a perfect map

                                          func (MapBlacklist) Add

                                          func (b MapBlacklist) Add(p peer.ID) bool

                                          func (MapBlacklist) Contains

                                          func (b MapBlacklist) Contains(p peer.ID) bool

                                          type Message

                                          type Message struct {
                                          	*pb.Message
                                          	ReceivedFrom  peer.ID
                                          	ValidatorData interface{}
                                          }

                                          func (*Message) GetFrom

                                          func (m *Message) GetFrom() peer.ID

                                          type MessageCache

                                          type MessageCache struct {
                                          	// contains filtered or unexported fields
                                          }

                                          func NewMessageCache

                                          func NewMessageCache(gossip, history int) *MessageCache

                                            NewMessageCache creates a sliding window cache that remembers messages for as long as `history` slots.

                                            When queried for messages to advertise, the cache only returns messages in the last `gossip` slots.

                                            The `gossip` parameter must be smaller or equal to `history`, or this function will panic.

                                            The slack between `gossip` and `history` accounts for the reaction time between when a message is advertised via IHAVE gossip, and the peer pulls it via an IWANT command.

                                            func (*MessageCache) Get

                                            func (mc *MessageCache) Get(mid string) (*pb.Message, bool)

                                            func (*MessageCache) GetForPeer

                                            func (mc *MessageCache) GetForPeer(mid string, p peer.ID) (*pb.Message, int, bool)

                                            func (*MessageCache) GetGossipIDs

                                            func (mc *MessageCache) GetGossipIDs(topic string) []string

                                            func (*MessageCache) Put

                                            func (mc *MessageCache) Put(msg *pb.Message)

                                            func (*MessageCache) SetMsgIdFn

                                            func (mc *MessageCache) SetMsgIdFn(msgID MsgIdFunction)

                                            func (*MessageCache) Shift

                                            func (mc *MessageCache) Shift()

                                            type MessageSignaturePolicy

                                            type MessageSignaturePolicy uint8

                                              MessageSignaturePolicy describes if signatures are produced, expected, and/or verified.

                                              type MsgIdFunction

                                              type MsgIdFunction func(pmsg *pb.Message) string

                                                MsgIdFunction returns a unique ID for the passed Message, and PubSub can be customized to use any implementation of this function by configuring it with the Option from WithMessageIdFn.

                                                type Option

                                                type Option func(*PubSub) error

                                                func WithBlacklist

                                                func WithBlacklist(b Blacklist) Option

                                                  WithBlacklist provides an implementation of the blacklist; the default is a MapBlacklist

                                                  func WithDirectConnectTicks

                                                  func WithDirectConnectTicks(t uint64) Option

                                                    WithDirectConnectTicks is a gossipsub router option that sets the number of heartbeat ticks between attempting to reconnect direct peers that are not currently connected. A "tick" is based on the heartbeat interval, which is 1s by default. The default value for direct connect ticks is 300.

                                                    func WithDirectPeers

                                                    func WithDirectPeers(pis []peer.AddrInfo) Option

                                                      WithDirectPeers is a gossipsub router option that specifies peers with direct peering agreements. These peers are connected outside of the mesh, with all (valid) message unconditionally forwarded to them. The router will maintain open connections to these peers. Note that the peering agreement should be reciprocal with direct peers symmetrically configured at both ends.

                                                      func WithDiscovery

                                                      func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option

                                                        WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub

                                                        func WithEventTracer

                                                        func WithEventTracer(tracer EventTracer) Option

                                                          WithEventTracer provides a tracer for the pubsub system

                                                          func WithFloodPublish

                                                          func WithFloodPublish(floodPublish bool) Option

                                                            WithFloodPublish is a gossipsub router option that enables flood publishing. When this is enabled, published messages are forwarded to all peers with score >= to publishThreshold

                                                            func WithMaxMessageSize

                                                            func WithMaxMessageSize(maxMessageSize int) Option

                                                              WithMaxMessageSize sets the global maximum message size for pubsub wire messages. The default value is 1MiB (DefaultMaxMessageSize).

                                                              Observe the following warnings when setting this option.

                                                              WARNING #1: Make sure to change the default protocol prefixes for floodsub (FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining the public default network, which uses the default max message size, and therefore will cause messages to be dropped.

                                                              WARNING #2: Reducing the default max message limit is fine, if you are certain that your application messages will not exceed the new limit. However, be wary of increasing the limit, as pubsub networks are naturally write-amplifying, i.e. for every message we receive, we send D copies of the message to our peers. If those messages are large, the bandwidth requirements will grow linearly. Note that propagation is sent on the uplink, which traditionally is more constrained than the downlink. Instead, consider out-of-band retrieval for large messages, by sending a CID (Content-ID) or another type of locator, such that messages can be fetched on-demand, rather than being pushed proactively. Under this design, you'd use the pubsub layer as a signalling system, rather than a data delivery system.

                                                              func WithMessageAuthor

                                                              func WithMessageAuthor(author peer.ID) Option

                                                                WithMessageAuthor sets the author for outbound messages to the given peer ID (defaults to the host's ID). If message signing is enabled, the private key must be available in the host's peerstore.

                                                                func WithMessageIdFn

                                                                func WithMessageIdFn(fn MsgIdFunction) Option

                                                                  WithMessageIdFn is an option to customize the way a message ID is computed for a pubsub message. The default ID function is DefaultMsgIdFn (concatenate source and seq nr.), but it can be customized to e.g. the hash of the message.

                                                                  func WithMessageSignaturePolicy

                                                                  func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option

                                                                    WithMessageSignaturePolicy sets the mode of operation for producing and verifying message signatures.

                                                                    func WithMessageSigning

                                                                    func WithMessageSigning(enabled bool) Option

                                                                      WithMessageSigning enables or disables message signing (enabled by default). Deprecated: signature verification without message signing, or message signing without verification, are not recommended.

                                                                      func WithNoAuthor

                                                                      func WithNoAuthor() Option

                                                                        WithNoAuthor omits the author and seq-number data of messages, and disables the use of signatures. Not recommended to use with the default message ID function, see WithMessageIdFn.

                                                                        func WithPeerExchange

                                                                        func WithPeerExchange(doPX bool) Option

                                                                          WithPeerExchange is a gossipsub router option that enables Peer eXchange on PRUNE. This should generally be enabled in bootstrappers and well connected/trusted nodes used for bootstrapping.

                                                                          func WithPeerGater

                                                                          func WithPeerGater(params *PeerGaterParams) Option

                                                                            WithPeerGater is a gossipsub router option that enables reactive validation queue management. The Gater is activated if the ratio of throttled/validated messages exceeds the specified threshold. Once active, the Gater probabilistically throttles peers _before_ they enter the validation queue, performing Random Early Drop. The throttle decision is randomized, with the probability of allowing messages to enter the validation queue controlled by the statistical observations of the performance of all peers in the IP address of the gated peer. The Gater deactivates if there is no validation throttlinc occurring for the specified quiet interval.

                                                                            func WithPeerOutboundQueueSize

                                                                            func WithPeerOutboundQueueSize(size int) Option

                                                                              WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer We start dropping messages to a peer if the outbound queue if full

                                                                              func WithPeerScore

                                                                              func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option

                                                                                WithPeerScore is a gossipsub router option that enables peer scoring.

                                                                                func WithPeerScoreInspect

                                                                                func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option

                                                                                  WithPeerScoreInspect is a gossipsub router option that enables peer score debugging. When this option is enabled, the supplied function will be invoked periodically to allow the application to inspect or dump the scores for connected peers. The supplied function can have one of two signatures:

                                                                                  - PeerScoreInspectFn, which takes a map of peer IDs to score.
                                                                                  - ExtendedPeerScoreInspectFn, which takes a map of peer IDs to
                                                                                    PeerScoreSnapshots and allows inspection of individual score
                                                                                    components for debugging peer scoring.
                                                                                  

                                                                                  This option must be passed _after_ the WithPeerScore option.

                                                                                  func WithStrictSignatureVerification

                                                                                  func WithStrictSignatureVerification(required bool) Option

                                                                                    WithStrictSignatureVerification is an option to enable or disable strict message signing. When enabled (which is the default), unsigned messages will be discarded. Deprecated: signature verification without message signing, or message signing without verification, are not recommended.

                                                                                    func WithSubscriptionFilter

                                                                                    func WithSubscriptionFilter(subFilter SubscriptionFilter) Option

                                                                                      WithSubscriptionFilter is a pubsub option that specifies a filter for subscriptions in topics of interest.

                                                                                      func WithValidateQueueSize

                                                                                      func WithValidateQueueSize(n int) Option

                                                                                        WithValidateQueueSize sets the buffer of validate queue. Defaults to 32. When queue is full, validation is throttled and new messages are dropped.

                                                                                        func WithValidateThrottle

                                                                                        func WithValidateThrottle(n int) Option

                                                                                          WithValidateThrottle sets the upper bound on the number of active validation goroutines across all topics. The default is 8192.

                                                                                          func WithValidateWorkers

                                                                                          func WithValidateWorkers(n int) Option

                                                                                            WithValidateWorkers sets the number of synchronous validation worker goroutines. Defaults to NumCPU.

                                                                                            The synchronous validation workers perform signature validation, apply inline user validators, and schedule asynchronous user validators. You can adjust this parameter to devote less cpu time to synchronous validation.

                                                                                            type PBTracer

                                                                                            type PBTracer struct {
                                                                                            	// contains filtered or unexported fields
                                                                                            }

                                                                                              PBTracer is a tracer that writes events to a file, as delimited protobufs.

                                                                                              func NewPBTracer

                                                                                              func NewPBTracer(file string) (*PBTracer, error)

                                                                                              func OpenPBTracer

                                                                                              func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error)

                                                                                                OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.

                                                                                                func (*PBTracer) Close

                                                                                                func (t *PBTracer) Close()

                                                                                                func (*PBTracer) Trace

                                                                                                func (t *PBTracer) Trace(evt *pb.TraceEvent)

                                                                                                type PeerEvent

                                                                                                type PeerEvent struct {
                                                                                                	Type EventType
                                                                                                	Peer peer.ID
                                                                                                }

                                                                                                type PeerGaterParams

                                                                                                type PeerGaterParams struct {
                                                                                                	// when the ratio of throttled/validated messages exceeds this threshold, the gater turns on
                                                                                                	Threshold float64
                                                                                                	// (linear) decay parameter for gater counters
                                                                                                	GlobalDecay float64 // global counter decay
                                                                                                	SourceDecay float64 // per IP counter decay
                                                                                                	// decay interval
                                                                                                	DecayInterval time.Duration
                                                                                                	// counter zeroing threshold
                                                                                                	DecayToZero float64
                                                                                                	// how long to retain stats
                                                                                                	RetainStats time.Duration
                                                                                                	// quiet interval before turning off the gater; if there are no validation throttle events
                                                                                                	// for this interval, the gater turns off
                                                                                                	Quiet time.Duration
                                                                                                	// weight of duplicate message deliveries
                                                                                                	DuplicateWeight float64
                                                                                                	// weight of ignored messages
                                                                                                	IgnoreWeight float64
                                                                                                	// weight of rejected messages
                                                                                                	RejectWeight float64
                                                                                                
                                                                                                	// priority topic delivery weights
                                                                                                	TopicDeliveryWeights map[string]float64
                                                                                                }

                                                                                                  PeerGaterParams groups together parameters that control the operation of the peer gater

                                                                                                  func DefaultPeerGaterParams

                                                                                                  func DefaultPeerGaterParams() *PeerGaterParams

                                                                                                    DefaultPeerGaterParams creates a new PeerGaterParams struct using default values

                                                                                                    func NewPeerGaterParams

                                                                                                    func NewPeerGaterParams(threshold, globalDecay, sourceDecay float64) *PeerGaterParams

                                                                                                      NewPeerGaterParams creates a new PeerGaterParams struct, using the specified threshold and decay parameters and default values for all other parameters.

                                                                                                      func (*PeerGaterParams) WithTopicDeliveryWeights

                                                                                                      func (p *PeerGaterParams) WithTopicDeliveryWeights(w map[string]float64) *PeerGaterParams

                                                                                                        WithTopicDeliveryWeights is a fluid setter for the priority topic delivery weights

                                                                                                        type PeerScoreInspectFn

                                                                                                        type PeerScoreInspectFn = func(map[peer.ID]float64)

                                                                                                        type PeerScoreParams

                                                                                                        type PeerScoreParams struct {
                                                                                                        	// Score parameters per topic.
                                                                                                        	Topics map[string]*TopicScoreParams
                                                                                                        
                                                                                                        	// Aggregate topic score cap; this limits the total contribution of topics towards a positive
                                                                                                        	// score. It must be positive (or 0 for no cap).
                                                                                                        	TopicScoreCap float64
                                                                                                        
                                                                                                        	// P5: Application-specific peer scoring
                                                                                                        	AppSpecificScore  func(p peer.ID) float64
                                                                                                        	AppSpecificWeight float64
                                                                                                        
                                                                                                        	// P6: IP-colocation factor.
                                                                                                        	// The parameter has an associated counter which counts the number of peers with the same IP.
                                                                                                        	// If the number of peers in the same IP exceeds IPColocationFactorThreshold, then the value
                                                                                                        	// is the square of the difference, ie (PeersInSameIP - IPColocationThreshold)^2.
                                                                                                        	// If the number of peers in the same IP is less than the threshold, then the value is 0.
                                                                                                        	// The weight of the parameter MUST be negative, unless you want to disable for testing.
                                                                                                        	// Note: In order to simulate many IPs in a managable manner when testing, you can set the weight to 0
                                                                                                        	//       thus disabling the IP colocation penalty.
                                                                                                        	IPColocationFactorWeight    float64
                                                                                                        	IPColocationFactorThreshold int
                                                                                                        	IPColocationFactorWhitelist map[string]struct{}
                                                                                                        
                                                                                                        	// P7: behavioural pattern penalties.
                                                                                                        	// This parameter has an associated counter which tracks misbehaviour as detected by the
                                                                                                        	// router. The router currently applies penalties for the following behaviors:
                                                                                                        	// - attempting to re-graft before the prune backoff time has elapsed.
                                                                                                        	// - not following up in IWANT requests for messages advertised with IHAVE.
                                                                                                        	//
                                                                                                        	// The value of the parameter is the square of the counter over the threshold, which decays with
                                                                                                        	// BehaviourPenaltyDecay.
                                                                                                        	// The weight of the parameter MUST be negative (or zero to disable).
                                                                                                        	BehaviourPenaltyWeight, BehaviourPenaltyThreshold, BehaviourPenaltyDecay float64
                                                                                                        
                                                                                                        	// the decay interval for parameter counters.
                                                                                                        	DecayInterval time.Duration
                                                                                                        
                                                                                                        	// counter value below which it is considered 0.
                                                                                                        	DecayToZero float64
                                                                                                        
                                                                                                        	// time to remember counters for a disconnected peer.
                                                                                                        	RetainScore time.Duration
                                                                                                        }

                                                                                                        type PeerScoreSnapshot

                                                                                                        type PeerScoreSnapshot struct {
                                                                                                        	Score              float64
                                                                                                        	Topics             map[string]*TopicScoreSnapshot
                                                                                                        	AppSpecificScore   float64
                                                                                                        	IPColocationFactor float64
                                                                                                        	BehaviourPenalty   float64
                                                                                                        }

                                                                                                        type PeerScoreThresholds

                                                                                                        type PeerScoreThresholds struct {
                                                                                                        	// GossipThreshold is the score threshold below which gossip propagation is supressed;
                                                                                                        	// should be negative.
                                                                                                        	GossipThreshold float64
                                                                                                        
                                                                                                        	// PublishThreshold is the score threshold below which we shouldn't publish when using flood
                                                                                                        	// publishing (also applies to fanout and floodsub peers); should be negative and <= GossipThreshold.
                                                                                                        	PublishThreshold float64
                                                                                                        
                                                                                                        	// GraylistThreshold is the score threshold below which message processing is supressed altogether,
                                                                                                        	// implementing an effective graylist according to peer score; should be negative and <= PublisThreshold.
                                                                                                        	GraylistThreshold float64
                                                                                                        
                                                                                                        	// AcceptPXThreshold is the score threshold below which PX will be ignored; this should be positive
                                                                                                        	// and limited to scores attainable by bootstrappers and other trusted nodes.
                                                                                                        	AcceptPXThreshold float64
                                                                                                        
                                                                                                        	// OpportunisticGraftThreshold is the median mesh score threshold before triggering opportunistic
                                                                                                        	// grafting; this should have a small positive value.
                                                                                                        	OpportunisticGraftThreshold float64
                                                                                                        }

                                                                                                        type PubOpt

                                                                                                        type PubOpt func(pub *PublishOptions) error

                                                                                                        func WithReadiness

                                                                                                        func WithReadiness(ready RouterReady) PubOpt

                                                                                                          WithReadiness returns a publishing option for only publishing when the router is ready. This option is not useful unless PubSub is also using WithDiscovery

                                                                                                          type PubSub

                                                                                                          type PubSub struct {
                                                                                                          	// contains filtered or unexported fields
                                                                                                          }

                                                                                                            PubSub is the implementation of the pubsub system.

                                                                                                            func NewFloodSub

                                                                                                            func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

                                                                                                              NewFloodSub returns a new PubSub object using the FloodSubRouter.

                                                                                                              func NewFloodsubWithProtocols

                                                                                                              func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)

                                                                                                                NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps.

                                                                                                                func NewGossipSub

                                                                                                                func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

                                                                                                                  NewGossipSub returns a new PubSub object using GossipSubRouter as the router.

                                                                                                                  func NewPubSub

                                                                                                                  func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)

                                                                                                                    NewPubSub returns a new PubSub management object.

                                                                                                                    func NewRandomSub

                                                                                                                    func NewRandomSub(ctx context.Context, h host.Host, size int, opts ...Option) (*PubSub, error)

                                                                                                                      NewRandomSub returns a new PubSub object using RandomSubRouter as the router.

                                                                                                                      func (*PubSub) BlacklistPeer

                                                                                                                      func (p *PubSub) BlacklistPeer(pid peer.ID)

                                                                                                                        BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.

                                                                                                                        func (*PubSub) GetTopics

                                                                                                                        func (p *PubSub) GetTopics() []string

                                                                                                                          GetTopics returns the topics this node is subscribed to.

                                                                                                                          func (*PubSub) Join

                                                                                                                          func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error)

                                                                                                                            Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if the Topic handle already exists.

                                                                                                                            func (*PubSub) ListPeers

                                                                                                                            func (p *PubSub) ListPeers(topic string) []peer.ID

                                                                                                                              ListPeers returns a list of peers we are connected to in the given topic.

                                                                                                                              func (*PubSub) Publish

                                                                                                                              func (p *PubSub) Publish(topic string, data []byte, opts ...PubOpt) error

                                                                                                                                Publish publishes data to the given topic.

                                                                                                                                Deprecated: use pubsub.Join() and topic.Publish() instead

                                                                                                                                func (*PubSub) RegisterTopicValidator

                                                                                                                                func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error

                                                                                                                                  RegisterTopicValidator registers a validator for topic. By default validators are asynchronous, which means they will run in a separate goroutine. The number of active goroutines is controlled by global and per topic validator throttles; if it exceeds the throttle threshold, messages will be dropped.

                                                                                                                                  func (*PubSub) Subscribe

                                                                                                                                  func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)

                                                                                                                                    Subscribe returns a new Subscription for the given topic. Note that subscription is not an instanteneous operation. It may take some time before the subscription is processed by the pubsub main loop and propagated to our peers.

                                                                                                                                    Deprecated: use pubsub.Join() and topic.Subscribe() instead

                                                                                                                                    func (*PubSub) SubscribeByTopicDescriptor

                                                                                                                                    func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error)

                                                                                                                                      SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor.

                                                                                                                                      Deprecated: use pubsub.Join() and topic.Subscribe() instead

                                                                                                                                      func (*PubSub) UnregisterTopicValidator

                                                                                                                                      func (p *PubSub) UnregisterTopicValidator(topic string) error

                                                                                                                                        UnregisterTopicValidator removes a validator from a topic. Returns an error if there was no validator registered with the topic.

                                                                                                                                        type PubSubNotif

                                                                                                                                        type PubSubNotif PubSub

                                                                                                                                        func (*PubSubNotif) ClosedStream

                                                                                                                                        func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream)

                                                                                                                                        func (*PubSubNotif) Connected

                                                                                                                                        func (p *PubSubNotif) Connected(n network.Network, c network.Conn)

                                                                                                                                        func (*PubSubNotif) Disconnected

                                                                                                                                        func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn)

                                                                                                                                        func (*PubSubNotif) Initialize

                                                                                                                                        func (p *PubSubNotif) Initialize()

                                                                                                                                        func (*PubSubNotif) Listen

                                                                                                                                        func (p *PubSubNotif) Listen(n network.Network, _ ma.Multiaddr)

                                                                                                                                        func (*PubSubNotif) ListenClose

                                                                                                                                        func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr)

                                                                                                                                        func (*PubSubNotif) OpenedStream

                                                                                                                                        func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream)

                                                                                                                                        type PubSubRouter

                                                                                                                                        type PubSubRouter interface {
                                                                                                                                        	// Protocols returns the list of protocols supported by the router.
                                                                                                                                        	Protocols() []protocol.ID
                                                                                                                                        	// Attach is invoked by the PubSub constructor to attach the router to a
                                                                                                                                        	// freshly initialized PubSub instance.
                                                                                                                                        	Attach(*PubSub)
                                                                                                                                        	// AddPeer notifies the router that a new peer has been connected.
                                                                                                                                        	AddPeer(peer.ID, protocol.ID)
                                                                                                                                        	// RemovePeer notifies the router that a peer has been disconnected.
                                                                                                                                        	RemovePeer(peer.ID)
                                                                                                                                        	// EnoughPeers returns whether the router needs more peers before it's ready to publish new records.
                                                                                                                                        	// Suggested (if greater than 0) is a suggested number of peers that the router should need.
                                                                                                                                        	EnoughPeers(topic string, suggested int) bool
                                                                                                                                        	// AcceptFrom is invoked on any incoming message before pushing it to the validation pipeline
                                                                                                                                        	// or processing control information.
                                                                                                                                        	// Allows routers with internal scoring to vet peers before committing any processing resources
                                                                                                                                        	// to the message and implement an effective graylist and react to validation queue overload.
                                                                                                                                        	AcceptFrom(peer.ID) AcceptStatus
                                                                                                                                        	// HandleRPC is invoked to process control messages in the RPC envelope.
                                                                                                                                        	// It is invoked after subscriptions and payload messages have been processed.
                                                                                                                                        	HandleRPC(*RPC)
                                                                                                                                        	// Publish is invoked to forward a new message that has been validated.
                                                                                                                                        	Publish(*Message)
                                                                                                                                        	// Join notifies the router that we want to receive and forward messages in a topic.
                                                                                                                                        	// It is invoked after the subscription announcement.
                                                                                                                                        	Join(topic string)
                                                                                                                                        	// Leave notifies the router that we are no longer interested in a topic.
                                                                                                                                        	// It is invoked after the unsubscription announcement.
                                                                                                                                        	Leave(topic string)
                                                                                                                                        }

                                                                                                                                          PubSubRouter is the message router component of PubSub.

                                                                                                                                          type PublishOptions

                                                                                                                                          type PublishOptions struct {
                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                          }

                                                                                                                                          type RPC

                                                                                                                                          type RPC struct {
                                                                                                                                          	pb.RPC
                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                          }

                                                                                                                                          type RandomSubRouter

                                                                                                                                          type RandomSubRouter struct {
                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                          }

                                                                                                                                            RandomSubRouter is a router that implements a random propagation strategy. For each message, it selects the square root of the network size peers, with a min of RandomSubD, and forwards the message to them.

                                                                                                                                            func (*RandomSubRouter) AcceptFrom

                                                                                                                                            func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus

                                                                                                                                            func (*RandomSubRouter) AddPeer

                                                                                                                                            func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)

                                                                                                                                            func (*RandomSubRouter) Attach

                                                                                                                                            func (rs *RandomSubRouter) Attach(p *PubSub)

                                                                                                                                            func (*RandomSubRouter) EnoughPeers

                                                                                                                                            func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool

                                                                                                                                            func (*RandomSubRouter) HandleRPC

                                                                                                                                            func (rs *RandomSubRouter) HandleRPC(rpc *RPC)

                                                                                                                                            func (*RandomSubRouter) Join

                                                                                                                                            func (rs *RandomSubRouter) Join(topic string)

                                                                                                                                            func (*RandomSubRouter) Leave

                                                                                                                                            func (rs *RandomSubRouter) Leave(topic string)

                                                                                                                                            func (*RandomSubRouter) Protocols

                                                                                                                                            func (rs *RandomSubRouter) Protocols() []protocol.ID

                                                                                                                                            func (*RandomSubRouter) Publish

                                                                                                                                            func (rs *RandomSubRouter) Publish(msg *Message)

                                                                                                                                            func (*RandomSubRouter) RemovePeer

                                                                                                                                            func (rs *RandomSubRouter) RemovePeer(p peer.ID)

                                                                                                                                            type RelayCancelFunc

                                                                                                                                            type RelayCancelFunc func()

                                                                                                                                            type RemoteTracer

                                                                                                                                            type RemoteTracer struct {
                                                                                                                                            	// contains filtered or unexported fields
                                                                                                                                            }

                                                                                                                                              RemoteTracer is a tracer that sends trace events to a remote peer

                                                                                                                                              func NewRemoteTracer

                                                                                                                                              func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error)

                                                                                                                                                NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi

                                                                                                                                                func (*RemoteTracer) Close

                                                                                                                                                func (t *RemoteTracer) Close()

                                                                                                                                                func (*RemoteTracer) Trace

                                                                                                                                                func (t *RemoteTracer) Trace(evt *pb.TraceEvent)

                                                                                                                                                type RouterReady

                                                                                                                                                type RouterReady func(rt PubSubRouter, topic string) (bool, error)

                                                                                                                                                  RouterReady is a function that decides if a router is ready to publish

                                                                                                                                                  func MinTopicSize

                                                                                                                                                  func MinTopicSize(size int) RouterReady

                                                                                                                                                    MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size. The router ultimately decides the whether it is ready or not, the given size is just a suggestion.

                                                                                                                                                    type SubOpt

                                                                                                                                                    type SubOpt func(sub *Subscription) error

                                                                                                                                                    type Subscription

                                                                                                                                                    type Subscription struct {
                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                    }

                                                                                                                                                      Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given Topic.

                                                                                                                                                      func (*Subscription) Cancel

                                                                                                                                                      func (sub *Subscription) Cancel()

                                                                                                                                                        Cancel closes the subscription. If this is the last active subscription then pubsub will send an unsubscribe announcement to the network.

                                                                                                                                                        func (*Subscription) Next

                                                                                                                                                        func (sub *Subscription) Next(ctx context.Context) (*Message, error)

                                                                                                                                                          Next returns the next message in our subscription

                                                                                                                                                          func (*Subscription) Topic

                                                                                                                                                          func (sub *Subscription) Topic() string

                                                                                                                                                            Topic returns the topic string associated with the Subscription

                                                                                                                                                            type SubscriptionFilter

                                                                                                                                                            type SubscriptionFilter interface {
                                                                                                                                                            	// CanSubscribe returns true if the topic is of interest and we can subscribe to it
                                                                                                                                                            	CanSubscribe(topic string) bool
                                                                                                                                                            
                                                                                                                                                            	// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
                                                                                                                                                            	// It should filter only the subscriptions of interest and my return an error if (for instance)
                                                                                                                                                            	// there are too many subscriptions.
                                                                                                                                                            	FilterIncomingSubscriptions(peer.ID, []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error)
                                                                                                                                                            }

                                                                                                                                                              SubscriptionFilter is a function that tells us whether we are interested in allowing and tracking subscriptions for a given topic.

                                                                                                                                                              The filter is consulted whenever a subscription notification is received by another peer; if the filter returns false, then the notification is ignored.

                                                                                                                                                              The filter is also consulted when joining topics; if the filter returns false, then the Join operation will result in an error.

                                                                                                                                                              func NewAllowlistSubscriptionFilter

                                                                                                                                                              func NewAllowlistSubscriptionFilter(topics ...string) SubscriptionFilter

                                                                                                                                                                NewAllowlistSubscriptionFilter creates a subscription filter that only allows explicitly specified topics for local subscriptions and incoming peer subscriptions.

                                                                                                                                                                func NewRegexpSubscriptionFilter

                                                                                                                                                                func NewRegexpSubscriptionFilter(rx *regexp.Regexp) SubscriptionFilter

                                                                                                                                                                  NewRegexpSubscriptionFilter creates a subscription filter that only allows topics that match a regular expression for local subscriptions and incoming peer subscriptions.

                                                                                                                                                                  Warning: the user should take care to match start/end of string in the supplied regular expression, otherwise the filter might match unwanted topics unexpectedly.

                                                                                                                                                                  func WrapLimitSubscriptionFilter

                                                                                                                                                                  func WrapLimitSubscriptionFilter(filter SubscriptionFilter, limit int) SubscriptionFilter

                                                                                                                                                                    WrapLimitSubscriptionFilter wraps a subscription filter with a hard limit in the number of subscriptions allowed in an RPC message.

                                                                                                                                                                    type TimeCachedBlacklist

                                                                                                                                                                    type TimeCachedBlacklist struct {
                                                                                                                                                                    	sync.RWMutex
                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                    }

                                                                                                                                                                      TimeCachedBlacklist is a blacklist implementation using a time cache

                                                                                                                                                                      func (*TimeCachedBlacklist) Add

                                                                                                                                                                      func (b *TimeCachedBlacklist) Add(p peer.ID) bool

                                                                                                                                                                        Add returns a bool saying whether Add of peer was successful

                                                                                                                                                                        func (*TimeCachedBlacklist) Contains

                                                                                                                                                                        func (b *TimeCachedBlacklist) Contains(p peer.ID) bool

                                                                                                                                                                        type Topic

                                                                                                                                                                        type Topic struct {
                                                                                                                                                                        	// contains filtered or unexported fields
                                                                                                                                                                        }

                                                                                                                                                                          Topic is the handle for a pubsub topic

                                                                                                                                                                          func (*Topic) Close

                                                                                                                                                                          func (t *Topic) Close() error

                                                                                                                                                                            Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions. Does not error if the topic is already closed.

                                                                                                                                                                            func (*Topic) EventHandler

                                                                                                                                                                            func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error)

                                                                                                                                                                              EventHandler creates a handle for topic specific events Multiple event handlers may be created and will operate independently of each other

                                                                                                                                                                              func (*Topic) ListPeers

                                                                                                                                                                              func (t *Topic) ListPeers() []peer.ID

                                                                                                                                                                                ListPeers returns a list of peers we are connected to in the given topic.

                                                                                                                                                                                func (*Topic) Publish

                                                                                                                                                                                func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error

                                                                                                                                                                                  Publish publishes data to topic.

                                                                                                                                                                                  func (*Topic) Relay

                                                                                                                                                                                  func (t *Topic) Relay() (RelayCancelFunc, error)

                                                                                                                                                                                    Relay enables message relaying for the topic and returns a reference cancel function. Subsequent calls increase the reference counter. To completely disable the relay, all references must be cancelled.

                                                                                                                                                                                    func (*Topic) SetScoreParams

                                                                                                                                                                                    func (t *Topic) SetScoreParams(p *TopicScoreParams) error

                                                                                                                                                                                      SetScoreParams sets the topic score parameters if the pubsub router supports peer scoring

                                                                                                                                                                                      func (*Topic) String

                                                                                                                                                                                      func (t *Topic) String() string

                                                                                                                                                                                        String returns the topic associated with t

                                                                                                                                                                                        func (*Topic) Subscribe

                                                                                                                                                                                        func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error)

                                                                                                                                                                                          Subscribe returns a new Subscription for the topic. Note that subscription is not an instanteneous operation. It may take some time before the subscription is processed by the pubsub main loop and propagated to our peers.

                                                                                                                                                                                          type TopicEventHandler

                                                                                                                                                                                          type TopicEventHandler struct {
                                                                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                                                                          }

                                                                                                                                                                                            TopicEventHandler is used to manage topic specific events. No Subscription is required to receive events.

                                                                                                                                                                                            func (*TopicEventHandler) Cancel

                                                                                                                                                                                            func (t *TopicEventHandler) Cancel()

                                                                                                                                                                                              Cancel closes the topic event handler

                                                                                                                                                                                              func (*TopicEventHandler) NextPeerEvent

                                                                                                                                                                                              func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error)

                                                                                                                                                                                                NextPeerEvent returns the next event regarding subscribed peers Guarantees: Peer Join and Peer Leave events for a given peer will fire in order. Unless a peer both Joins and Leaves before NextPeerEvent emits either event all events will eventually be received from NextPeerEvent.

                                                                                                                                                                                                type TopicEventHandlerOpt

                                                                                                                                                                                                type TopicEventHandlerOpt func(t *TopicEventHandler) error

                                                                                                                                                                                                type TopicOpt

                                                                                                                                                                                                type TopicOpt func(t *Topic) error

                                                                                                                                                                                                type TopicOptions

                                                                                                                                                                                                type TopicOptions struct{}

                                                                                                                                                                                                type TopicScoreParams

                                                                                                                                                                                                type TopicScoreParams struct {
                                                                                                                                                                                                	// The weight of the topic.
                                                                                                                                                                                                	TopicWeight float64
                                                                                                                                                                                                
                                                                                                                                                                                                	// P1: time in the mesh
                                                                                                                                                                                                	// This is the time the peer has ben grafted in the mesh.
                                                                                                                                                                                                	// The value of of the parameter is the time/TimeInMeshQuantum, capped by TimeInMeshCap
                                                                                                                                                                                                	// The weight of the parameter MUST be positive (or zero to disable).
                                                                                                                                                                                                	TimeInMeshWeight  float64
                                                                                                                                                                                                	TimeInMeshQuantum time.Duration
                                                                                                                                                                                                	TimeInMeshCap     float64
                                                                                                                                                                                                
                                                                                                                                                                                                	// P2: first message deliveries
                                                                                                                                                                                                	// This is the number of message deliveries in the topic.
                                                                                                                                                                                                	// The value of the parameter is a counter, decaying with FirstMessageDeliveriesDecay, and capped
                                                                                                                                                                                                	// by FirstMessageDeliveriesCap.
                                                                                                                                                                                                	// The weight of the parameter MUST be positive (or zero to disable).
                                                                                                                                                                                                	FirstMessageDeliveriesWeight, FirstMessageDeliveriesDecay float64
                                                                                                                                                                                                	FirstMessageDeliveriesCap                                 float64
                                                                                                                                                                                                
                                                                                                                                                                                                	// P3: mesh message deliveries
                                                                                                                                                                                                	// This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of
                                                                                                                                                                                                	// message validation; deliveries during validation also count and are retroactively applied
                                                                                                                                                                                                	// when validation succeeds.
                                                                                                                                                                                                	// This window accounts for the minimum time before a hostile mesh peer trying to game the score
                                                                                                                                                                                                	// could replay back a valid message we just sent them.
                                                                                                                                                                                                	// It effectively tracks first and near-first deliveries, ie a message seen from a mesh peer
                                                                                                                                                                                                	// before we have forwarded it to them.
                                                                                                                                                                                                	// The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay.
                                                                                                                                                                                                	// If the counter exceeds the threshold, its value is 0.
                                                                                                                                                                                                	// If the counter is below the MeshMessageDeliveriesThreshold, the value is the square of
                                                                                                                                                                                                	// the deficit, ie (MessageDeliveriesThreshold - counter)^2
                                                                                                                                                                                                	// The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh.
                                                                                                                                                                                                	// The weight of the parameter MUST be negative (or zero to disable).
                                                                                                                                                                                                	MeshMessageDeliveriesWeight, MeshMessageDeliveriesDecay      float64
                                                                                                                                                                                                	MeshMessageDeliveriesCap, MeshMessageDeliveriesThreshold     float64
                                                                                                                                                                                                	MeshMessageDeliveriesWindow, MeshMessageDeliveriesActivation time.Duration
                                                                                                                                                                                                
                                                                                                                                                                                                	// P3b: sticky mesh propagation failures
                                                                                                                                                                                                	// This is a sticky penalty that applies when a peer gets pruned from the mesh with an active
                                                                                                                                                                                                	// mesh message delivery penalty.
                                                                                                                                                                                                	// The weight of the parameter MUST be negative (or zero to disable)
                                                                                                                                                                                                	MeshFailurePenaltyWeight, MeshFailurePenaltyDecay float64
                                                                                                                                                                                                
                                                                                                                                                                                                	// P4: invalid messages
                                                                                                                                                                                                	// This is the number of invalid messages in the topic.
                                                                                                                                                                                                	// The value of the parameter is the square of the counter, decaying with
                                                                                                                                                                                                	// InvalidMessageDeliveriesDecay.
                                                                                                                                                                                                	// The weight of the parameter MUST be negative (or zero to disable).
                                                                                                                                                                                                	InvalidMessageDeliveriesWeight, InvalidMessageDeliveriesDecay float64
                                                                                                                                                                                                }

                                                                                                                                                                                                type TopicScoreSnapshot

                                                                                                                                                                                                type TopicScoreSnapshot struct {
                                                                                                                                                                                                	TimeInMesh               time.Duration
                                                                                                                                                                                                	FirstMessageDeliveries   float64
                                                                                                                                                                                                	MeshMessageDeliveries    float64
                                                                                                                                                                                                	InvalidMessageDeliveries float64
                                                                                                                                                                                                }

                                                                                                                                                                                                type ValidationResult

                                                                                                                                                                                                type ValidationResult int

                                                                                                                                                                                                  ValidationResult represents the decision of an extended validator

                                                                                                                                                                                                  type Validator

                                                                                                                                                                                                  type Validator func(context.Context, peer.ID, *Message) bool

                                                                                                                                                                                                    Validator is a function that validates a message with a binary decision: accept or reject.

                                                                                                                                                                                                    type ValidatorEx

                                                                                                                                                                                                    type ValidatorEx func(context.Context, peer.ID, *Message) ValidationResult

                                                                                                                                                                                                      ValidatorEx is an extended validation function that validates a message with an enumerated decision

                                                                                                                                                                                                      type ValidatorOpt

                                                                                                                                                                                                      type ValidatorOpt func(addVal *addValReq) error

                                                                                                                                                                                                        ValidatorOpt is an option for RegisterTopicValidator.

                                                                                                                                                                                                        func WithValidatorConcurrency

                                                                                                                                                                                                        func WithValidatorConcurrency(n int) ValidatorOpt

                                                                                                                                                                                                          WithValidatorConcurrency is an option that sets the topic validator throttle. This controls the number of active validation goroutines for the topic; the default is 1024.

                                                                                                                                                                                                          func WithValidatorInline

                                                                                                                                                                                                          func WithValidatorInline(inline bool) ValidatorOpt

                                                                                                                                                                                                            WithValidatorInline is an option that sets the validation disposition to synchronous: it will be executed inline in validation front-end, without spawning a new goroutine. This is suitable for simple or cpu-bound validators that do not block.

                                                                                                                                                                                                            func WithValidatorTimeout

                                                                                                                                                                                                            func WithValidatorTimeout(timeout time.Duration) ValidatorOpt

                                                                                                                                                                                                              WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. By default there is no timeout in asynchronous validators.

                                                                                                                                                                                                              Directories

                                                                                                                                                                                                              Path Synopsis