pubsub

Version: v0.1.3
Published: Apr 3, 2020 License: MIT Imports: 31 Imported by: 0

README

go-libp2p-pubsub

A pubsub system with flooding and gossiping variants.

This is the canonical pubsub implementation for libp2p.

We currently provide three implementations:

  • floodsub, which is the baseline flooding protocol.
  • gossipsub, which is a more advanced router with mesh formation and gossip propagation. See spec and implementation for more details.
  • randomsub, which is a simple probabilistic router that propagates to random subsets of peers.

Install

go get github.com/libp2p/go-libp2p-pubsub

Usage

Use this package for messaging in p2p infrastructure built on libp2p, such as IPFS, Ethereum, and other blockchains.

Implementations

See libp2p/specs/pubsub#Implementations.

Documentation

See the libp2p specs for high level documentation and godoc for API documentation.

Tracing

The pubsub system supports tracing, which collects all events pertaining to the internals of the system. This allows you to recreate the complete message flow and state of the system for analysis purposes.

To enable tracing, instantiate the pubsub system using the WithEventTracer option. The option accepts a tracer, and the package ships three implementations: JSONTracer (writes to a JSON file), PBTracer (writes to a protobuf file), and RemoteTracer (sends events to a remote peer). If you want to trace using a remote peer, you can do so using the traced daemon from go-libp2p-pubsub-tracer. That package also includes a utility program, tracestat, for analyzing the traces collected by the daemon.

For instance, to capture the trace as a json file, you can use the following option:

tracer, err := pubsub.NewJSONTracer("/path/to/trace.json") // check err before using tracer
ps, err := pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))

To capture the trace as a protobuf, you can use the following option:

tracer, err := pubsub.NewPBTracer("/path/to/trace.pb") // check err before using tracer
ps, err := pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))

Finally, to use the remote tracer, you can use the following incantations:

// assuming that your tracer runs in x.x.x.x and has a peer ID of QmTracer
pi, err := peer.AddrInfoFromP2pAddr(ma.StringCast("/ip4/x.x.x.x/tcp/4001/p2p/QmTracer"))
if err != nil {
  panic(err)
}

tracer, err := pubsub.NewRemoteTracer(ctx, host, pi)
if err != nil {
  panic(err)
}

ps, err := pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))

Contribute

Contributions welcome. Please check out the issues.

Check out our contributing document for more information on how we work, and about contributing in general. Please be aware that all interactions related to multiformats are subject to the IPFS Code of Conduct.

Small note: If editing the README, please conform to the standard-readme specification.

License

MIT © Jeromy Johnson


The last gx published version of this module was: 0.11.16: QmfB4oDUTiaGEqT13P1JqCEhqW7cB1wpKtq3PP4BN8PhQd

Documentation

Overview

The pubsub package provides facilities for the Publish/Subscribe pattern of message propagation, also known as overlay multicast. The implementation provides topic-based pubsub, with pluggable routing algorithms.

The main interface to the library is the PubSub object. You can construct this object with the following constructors:

- NewFloodSub creates an instance that uses the floodsub routing algorithm.

- NewGossipSub creates an instance that uses the gossipsub routing algorithm.

- NewRandomSub creates an instance that uses the randomsub routing algorithm.

In addition, there is a generic constructor that creates a pubsub instance with a custom PubSubRouter interface. This procedure is currently reserved for internal use within the package.

Once you have constructed a PubSub instance, you need to establish some connections to your peers; the implementation relies on ambient peer discovery, leaving bootstrap and active peer discovery up to the client.

To publish a message to some topic, use Publish; you don't need to be subscribed to the topic in order to publish.

To subscribe to a topic, use Subscribe; this will give you a subscription interface from which new messages can be pumped.

Constants

const (
	FloodSubID              = protocol.ID("/floodsub/1.0.0")
	FloodSubTopicSearchSize = 5
)
const DandelionEpochInterval = 5 * time.Second
const DandelionStemCheckInterval = 500 * time.Millisecond
const DandelionStemExpireSpan = 60 * time.Second
const DefaultMaxMessageSize = 1 << 20

DefaultMaxMessageSize is 1 MiB.

const (
	GossipSubID = protocol.ID("/meshsub/1.0.0")
)
const (
	RandomSubID = protocol.ID("/randomsub/1.0.0")
)
const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")
const SignPrefix = "libp2p-pubsub:"

Variables

var (

	// DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling
	DiscoveryPollInitialDelay = 0 * time.Millisecond
	// DiscoveryPollInterval is approximately how long the discovery system waits between checks for whether
	// more peers are needed for any topic
	DiscoveryPollInterval = 1 * time.Second
)
var (
	// overlay parameters
	GossipSubD   = 6
	GossipSubDlo = 4
	GossipSubDhi = 12

	// gossip parameters
	GossipSubHistoryLength = 5
	GossipSubHistoryGossip = 3

	// heartbeat interval
	GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
	GossipSubHeartbeatInterval     = 1 * time.Second

	// fanout ttl
	GossipSubFanoutTTL = 60 * time.Second
)
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

ErrTopicClosed is returned if a Topic is utilized after it has been closed

var MinTraceBatchSize = 16
var (
	RandomSubD = 6
)
var (
	TimeCacheDuration = 120 * time.Second
)
var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words.

Functions

func DefaultMsgIdFn

func DefaultMsgIdFn(pmsg *pb.Message) string

DefaultMsgIdFn returns a unique ID of the passed Message.

func RandStringBytes

func RandStringBytes(n int) string

Types

type BackoffConnectorFactory

type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)

BackoffConnectorFactory creates a BackoffConnector that is attached to a given host

type Blacklist

type Blacklist interface {
	Add(peer.ID)
	Contains(peer.ID) bool
}

Blacklist is an interface for peer blacklisting.

func NewMapBlacklist

func NewMapBlacklist() Blacklist

NewMapBlacklist creates a new MapBlacklist

func NewTimeCachedBlacklist

func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error)

NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration

type CacheEntry

type CacheEntry struct {
	// contains filtered or unexported fields
}

type DiscoverOpt

type DiscoverOpt func(*discoverOptions) error

func WithDiscoverConnector

func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt

WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers

func WithDiscoveryOpts

func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt

WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem

type EventTracer

type EventTracer interface {
	Trace(evt *pb.TraceEvent)
}

Generic event tracer interface

type EventType

type EventType int
const (
	PeerJoin EventType = iota
	PeerLeave
)

type FloodSubRouter

type FloodSubRouter struct {
	// contains filtered or unexported fields
}

func (*FloodSubRouter) AddPeer

func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID)

func (*FloodSubRouter) Attach

func (fs *FloodSubRouter) Attach(p *PubSub)

func (*FloodSubRouter) EnoughPeers

func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool

func (*FloodSubRouter) HandleRPC

func (fs *FloodSubRouter) HandleRPC(rpc *RPC)

func (*FloodSubRouter) Join

func (fs *FloodSubRouter) Join(topic string)

func (*FloodSubRouter) Leave

func (fs *FloodSubRouter) Leave(topic string)

func (*FloodSubRouter) Protocols

func (fs *FloodSubRouter) Protocols() []protocol.ID

func (*FloodSubRouter) Publish

func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message)

func (*FloodSubRouter) RemovePeer

func (fs *FloodSubRouter) RemovePeer(p peer.ID)

type GossipSubRouter

type GossipSubRouter struct {
	// contains filtered or unexported fields
}

GossipSubRouter is a router that implements the gossipsub protocol. For each topic we have joined, we maintain an overlay through which messages flow; this is the mesh map. For each topic we publish to without joining, we maintain a list of peers to use for injecting our messages in the overlay with stable routes; this is the fanout map. Fanout peer lists are expired if we don't publish any messages to their topic for GossipSubFanoutTTL.

func (*GossipSubRouter) AddPeer

func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)

func (*GossipSubRouter) Attach

func (gs *GossipSubRouter) Attach(p *PubSub)

func (*GossipSubRouter) EnoughPeers

func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool

func (*GossipSubRouter) HandleRPC

func (gs *GossipSubRouter) HandleRPC(rpc *RPC)

func (*GossipSubRouter) Join

func (gs *GossipSubRouter) Join(topic string)

func (*GossipSubRouter) Leave

func (gs *GossipSubRouter) Leave(topic string)

func (*GossipSubRouter) Protocols

func (gs *GossipSubRouter) Protocols() []protocol.ID

func (*GossipSubRouter) Publish

func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message)

func (*GossipSubRouter) RemovePeer

func (gs *GossipSubRouter) RemovePeer(p peer.ID)

type JSONTracer

type JSONTracer struct {
	// contains filtered or unexported fields
}

JSONTracer is a tracer that writes events to a file, encoded in ndjson.

func NewJSONTracer

func NewJSONTracer(file string) (*JSONTracer, error)

NewJSONTracer creates a new JSONTracer writing traces to file.

func OpenJSONTracer

func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error)

OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.

func (*JSONTracer) Close

func (t *JSONTracer) Close()

func (*JSONTracer) Trace

func (t *JSONTracer) Trace(evt *pb.TraceEvent)

type MapBlacklist

type MapBlacklist map[peer.ID]struct{}

MapBlacklist is a blacklist implementation using a perfect map

func (MapBlacklist) Add

func (b MapBlacklist) Add(p peer.ID)

func (MapBlacklist) Contains

func (b MapBlacklist) Contains(p peer.ID) bool

type Message

type Message struct {
	*pb.Message
	ReceivedFrom  peer.ID
	ValidatorData interface{}
}

func (*Message) GetFrom

func (m *Message) GetFrom() peer.ID

type MessageCache

type MessageCache struct {
	// contains filtered or unexported fields
}

func NewMessageCache

func NewMessageCache(gossip, history int) *MessageCache

NewMessageCache creates a sliding window cache that remembers messages for as long as `history` slots.

When queried for messages to advertise, the cache only returns messages in the last `gossip` slots.

The `gossip` parameter must be smaller than or equal to `history`, or this function will panic.

The slack between `gossip` and `history` accounts for the reaction time between when a message is advertised via IHAVE gossip, and the peer pulls it via an IWANT command.
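To make the relationship between `gossip` and `history` concrete, here is a simplified model of a sliding-window cache. It is a sketch under stated assumptions, not the package's MessageCache: `windowCache` and its methods are hypothetical, it stores only message IDs, and it ignores topics.

```go
package main

import "fmt"

// windowCache is a simplified model of a sliding-window message cache.
// slots[0] is the newest slot; the last slot is the oldest.
type windowCache struct {
	slots  [][]string // message IDs per slot
	gossip int        // number of newest slots that are advertised
}

func newWindowCache(gossip, history int) *windowCache {
	if gossip > history {
		panic("gossip must be smaller than or equal to history")
	}
	return &windowCache{slots: make([][]string, history), gossip: gossip}
}

// put records a message ID in the newest slot.
func (c *windowCache) put(id string) {
	c.slots[0] = append(c.slots[0], id)
}

// gossipIDs returns the IDs held in the newest `gossip` slots.
func (c *windowCache) gossipIDs() []string {
	var ids []string
	for _, slot := range c.slots[:c.gossip] {
		ids = append(ids, slot...)
	}
	return ids
}

// has reports whether id is still anywhere in the history window.
func (c *windowCache) has(id string) bool {
	for _, slot := range c.slots {
		for _, got := range slot {
			if got == id {
				return true
			}
		}
	}
	return false
}

// shift ages every slot by one position and discards the oldest.
func (c *windowCache) shift() {
	copy(c.slots[1:], c.slots[:len(c.slots)-1])
	c.slots[0] = nil
}

func main() {
	c := newWindowCache(1, 2)
	c.put("m1")
	c.shift()
	c.put("m2")
	// m1 is no longer advertised but can still be served on request.
	fmt.Println(c.gossipIDs(), c.has("m1"))
	c.shift()
	// After a second shift m1 has left the history window entirely.
	fmt.Println(c.has("m1"))
}
```

With `gossip` at 1 and `history` at 2, a message is advertised for one slot and remains retrievable for one more, which is the slack the paragraph above refers to.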

func (*MessageCache) Get

func (mc *MessageCache) Get(mid string) (*pb.Message, bool)

func (*MessageCache) GetGossipIDs

func (mc *MessageCache) GetGossipIDs(topic string) []string

func (*MessageCache) Put

func (mc *MessageCache) Put(msg *pb.Message)

func (*MessageCache) SetMsgIdFn

func (mc *MessageCache) SetMsgIdFn(msgID MsgIdFunction)

func (*MessageCache) Shift

func (mc *MessageCache) Shift()

type MsgIdFunction

type MsgIdFunction func(pmsg *pb.Message) string

MsgIdFunction returns a unique ID for the passed Message, and PubSub can be customized to use any implementation of this function by configuring it with the Option from WithMessageIdFn.

type Option

type Option func(*PubSub) error

func WithBlacklist

func WithBlacklist(b Blacklist) Option

WithBlacklist provides an implementation of the blacklist; the default is a MapBlacklist

func WithDiscovery

func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option

WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub

func WithEventTracer

func WithEventTracer(tracer EventTracer) Option

WithEventTracer provides a tracer for the pubsub system

func WithMaxMessageSize

func WithMaxMessageSize(maxMessageSize int) Option

WithMaxMessageSize sets the global maximum message size for pubsub wire messages. The default value is 1MiB (DefaultMaxMessageSize).

Observe the following warnings when setting this option.

WARNING #1: Make sure to change the default protocol prefixes for floodsub (FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining the public default network, which uses the default max message size, and therefore will cause messages to be dropped.

WARNING #2: Reducing the default max message limit is fine, if you are certain that your application messages will not exceed the new limit. However, be wary of increasing the limit, as pubsub networks are naturally write-amplifying, i.e. for every message we receive, we send D copies of the message to our peers. If those messages are large, the bandwidth requirements will grow linearly. Note that propagation is sent on the uplink, which traditionally is more constrained than the downlink. Instead, consider out-of-band retrieval for large messages, by sending a CID (Content-ID) or another type of locator, such that messages can be fetched on-demand, rather than being pushed proactively. Under this design, you'd use the pubsub layer as a signalling system, rather than a data delivery system.
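The write-amplification argument in WARNING #2 can be checked with a rough calculation. The sketch below assumes every received message is forwarded to exactly D peers and ignores protocol overhead and duplicate suppression; `uplinkBytesPerSecond` is a hypothetical helper and the message rate is an illustrative figure.

```go
package main

import "fmt"

// uplinkBytesPerSecond estimates outbound bandwidth when each received
// message of msgSize bytes is forwarded to d peers.
func uplinkBytesPerSecond(msgSize, msgsPerSecond, d int) int {
	return msgSize * msgsPerSecond * d
}

func main() {
	const d = 6     // documented GossipSubD default
	const rate = 10 // assumed messages per second, for illustration only

	small := uplinkBytesPerSecond(1<<10, rate, d) // 1 KiB messages
	large := uplinkBytesPerSecond(1<<20, rate, d) // 1 MiB messages (the default limit)

	fmt.Println("1 KiB messages:", small, "bytes/s")
	fmt.Println("1 MiB messages:", large, "bytes/s")
}
```

Under these assumptions, 1 KiB messages cost about 60 KiB/s of uplink while 1 MiB messages cost about 60 MiB/s, which is why sending a locator and fetching the payload out of band tends to scale better.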

func WithMessageAuthor

func WithMessageAuthor(author peer.ID) Option

WithMessageAuthor sets the author for outbound messages to the given peer ID (defaults to the host's ID). If message signing is enabled, the private key must be available in the host's peerstore.

func WithMessageIdFn

func WithMessageIdFn(fn MsgIdFunction) Option

WithMessageIdFn is an option to customize the way a message ID is computed for a pubsub message. The default ID function is DefaultMsgIdFn (concatenate source and seq nr.), but it can be customized to e.g. the hash of the message.
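The difference between the two ID strategies mentioned above can be seen in a small standalone sketch. The `message` struct is a hypothetical stand-in for pb.Message with only the fields needed here; a real function would have the MsgIdFunction signature and be passed to WithMessageIdFn.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// message is a hypothetical stand-in for pb.Message.
type message struct {
	From  []byte
	Seqno []byte
	Data  []byte
}

// sourceSeqID follows the documented default strategy: concatenate the
// source and the sequence number.
func sourceSeqID(m *message) string {
	return string(m.From) + string(m.Seqno)
}

// contentHashID derives the ID from the payload alone, so identical
// payloads share an ID regardless of sender or sequence number.
func contentHashID(m *message) string {
	sum := sha256.Sum256(m.Data)
	return hex.EncodeToString(sum[:])
}

func main() {
	a := &message{From: []byte("peerA"), Seqno: []byte{0, 1}, Data: []byte("hello")}
	b := &message{From: []byte("peerA"), Seqno: []byte{0, 2}, Data: []byte("hello")}

	// Same payload, different sequence numbers.
	fmt.Println("source+seqno IDs equal:", sourceSeqID(a) == sourceSeqID(b))
	fmt.Println("content-hash IDs equal:", contentHashID(a) == contentHashID(b))
}
```

A content-based ID deduplicates identical payloads across publishers, whereas the source and sequence number scheme treats every publish as distinct; which one fits depends on the application.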

func WithMessageSigning

func WithMessageSigning(enabled bool) Option

WithMessageSigning enables or disables message signing (enabled by default).

func WithPeerOutboundQueueSize

func WithPeerOutboundQueueSize(size int) Option

WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer. We start dropping messages to a peer if the outbound queue is full.
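Dropping on a full queue is commonly expressed in Go as a non-blocking send on a buffered channel. The sketch below illustrates that pattern only; `trySend` is a hypothetical helper, and the queue size of 2 is chosen for the demonstration.

```go
package main

import "fmt"

// trySend enqueues msg without blocking and reports whether it was
// accepted. A full queue means the message is dropped.
func trySend(queue chan<- string, msg string) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan string, 2) // hypothetical outbound queue size
	for _, m := range []string{"a", "b", "c"} {
		// Nothing drains the queue here, so the third send is dropped.
		fmt.Println(m, "accepted:", trySend(queue, m))
	}
}
```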

func WithStrictSignatureVerification

func WithStrictSignatureVerification(required bool) Option

WithStrictSignatureVerification is an option to enable or disable strict message signing. When enabled (which is the default), unsigned messages will be discarded.

func WithValidateQueueSize

func WithValidateQueueSize(n int) Option

WithValidateQueueSize sets the buffer size of the validation queue. Defaults to 32. When the queue is full, validation is throttled and new messages are dropped.

func WithValidateThrottle

func WithValidateThrottle(n int) Option

WithValidateThrottle sets the upper bound on the number of active validation goroutines across all topics. The default is 8192.

func WithValidateWorkers

func WithValidateWorkers(n int) Option

WithValidateWorkers sets the number of synchronous validation worker goroutines. Defaults to NumCPU.

The synchronous validation workers perform signature validation, apply inline user validators, and schedule asynchronous user validators. You can adjust this parameter to devote less cpu time to synchronous validation.

type PBTracer

type PBTracer struct {
	// contains filtered or unexported fields
}

PBTracer is a tracer that writes events to a file, as delimited protobufs.

func NewPBTracer

func NewPBTracer(file string) (*PBTracer, error)

func OpenPBTracer

func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error)

OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.

func (*PBTracer) Close

func (t *PBTracer) Close()

func (*PBTracer) Trace

func (t *PBTracer) Trace(evt *pb.TraceEvent)

type PeerEvent

type PeerEvent struct {
	Type EventType
	Peer peer.ID
}

type PubOpt

type PubOpt func(pub *PublishOptions) error

func WithReadiness

func WithReadiness(ready RouterReady) PubOpt

WithReadiness returns a publishing option for only publishing when the router is ready. This option is not useful unless PubSub is also using WithDiscovery.

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub is the implementation of the pubsub system.

func NewFloodSub

func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

NewFloodSub returns a new PubSub object using the FloodSubRouter.

func NewFloodsubWithProtocols

func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)

NewFloodsubWithProtocols returns a new floodsub-enabled PubSub object using the protocols specified in ps.

func NewGossipSub

func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

NewGossipSub returns a new PubSub object using GossipSubRouter as the router.

func NewPubSub

func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)

NewPubSub returns a new PubSub management object.

func NewRandomSub

func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

NewRandomSub returns a new PubSub object using RandomSubRouter as the router.

func (*PubSub) BlacklistPeer

func (p *PubSub) BlacklistPeer(pid peer.ID)

BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.

func (*PubSub) GetTopics

func (p *PubSub) GetTopics() []string

GetTopics returns the topics this node is subscribed to.

func (*PubSub) Join

func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error)

Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if the Topic handle already exists.

func (*PubSub) ListPeers

func (p *PubSub) ListPeers(topic string) []peer.ID

ListPeers returns a list of peers we are connected to in the given topic.

func (*PubSub) NewEpoch

func (p *PubSub) NewEpoch()

func (*PubSub) Publish

func (p *PubSub) Publish(topic string, data []byte) error

Publish publishes data to the given topic.

func (*PubSub) RegisterTopicValidator

func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error

RegisterTopicValidator registers a validator for topic. By default validators are asynchronous, which means they will run in a separate goroutine. The number of active goroutines is controlled by global and per topic validator throttles; if it exceeds the throttle threshold, messages will be dropped.

func (*PubSub) Subscribe deprecated

func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)

Subscribe returns a new Subscription for the given topic. Note that subscription is not an instantaneous operation. It may take some time before the subscription is processed by the pubsub main loop and propagated to our peers.

Deprecated: use pubsub.Join() and topic.Subscribe() instead

func (*PubSub) SubscribeByTopicDescriptor deprecated

func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error)

SubscribeByTopicDescriptor lets you subscribe to a topic using a pb.TopicDescriptor.

Deprecated: use pubsub.Join() and topic.Subscribe() instead

func (*PubSub) UnregisterTopicValidator

func (p *PubSub) UnregisterTopicValidator(topic string) error

UnregisterTopicValidator removes a validator from a topic. Returns an error if there was no validator registered with the topic.

type PubSubNotif

type PubSubNotif PubSub

func (*PubSubNotif) ClosedStream

func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream)

func (*PubSubNotif) Connected

func (p *PubSubNotif) Connected(n network.Network, c network.Conn)

func (*PubSubNotif) Disconnected

func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn)

func (*PubSubNotif) Listen

func (p *PubSubNotif) Listen(n network.Network, _ ma.Multiaddr)

func (*PubSubNotif) ListenClose

func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr)

func (*PubSubNotif) OpenedStream

func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream)

type PubSubRouter

type PubSubRouter interface {
	// Protocols returns the list of protocols supported by the router.
	Protocols() []protocol.ID
	// Attach is invoked by the PubSub constructor to attach the router to a
	// freshly initialized PubSub instance.
	Attach(*PubSub)
	// AddPeer notifies the router that a new peer has been connected.
	AddPeer(peer.ID, protocol.ID)
	// RemovePeer notifies the router that a peer has been disconnected.
	RemovePeer(peer.ID)
	// EnoughPeers returns whether the router needs more peers before it's ready to publish new records.
	// Suggested (if greater than 0) is a suggested number of peers that the router should need.
	EnoughPeers(topic string, suggested int) bool
	// HandleRPC is invoked to process control messages in the RPC envelope.
	// It is invoked after subscriptions and payload messages have been processed.
	HandleRPC(*RPC)
	// Publish is invoked to forward a new message that has been validated.
	Publish(peer.ID, *pb.Message)
	// Join notifies the router that we want to receive and forward messages in a topic.
	// It is invoked after the subscription announcement.
	Join(topic string)
	// Leave notifies the router that we are no longer interested in a topic.
	// It is invoked after the unsubscription announcement.
	Leave(topic string)
}

PubSubRouter is the message router component of PubSub.

type PublishOptions

type PublishOptions struct {
	// contains filtered or unexported fields
}

type RPC

type RPC struct {
	pb.RPC
	// contains filtered or unexported fields
}

type RandomSubRouter

type RandomSubRouter struct {
	// contains filtered or unexported fields
}

RandomSubRouter is a router that implements a random propagation strategy. For each message, it selects RandomSubD peers and forwards the message to them.
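Selecting a random subset of peers can be sketched with a shuffle followed by a truncation. This is an illustration of the strategy only, not the router's code; `pickRandom` is a hypothetical helper and peers are represented as plain strings.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickRandom returns up to d peers chosen uniformly at random without
// modifying the input slice.
func pickRandom(peers []string, d int) []string {
	shuffled := append([]string(nil), peers...)
	rand.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})
	if d > len(shuffled) {
		d = len(shuffled)
	}
	return shuffled[:d]
}

func main() {
	const d = 6 // documented RandomSubD default
	peers := []string{"p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9", "p10"}
	targets := pickRandom(peers, d)
	fmt.Println("forwarding to", len(targets), "peers:", targets)
}
```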

func (*RandomSubRouter) AddPeer

func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)

func (*RandomSubRouter) Attach

func (rs *RandomSubRouter) Attach(p *PubSub)

func (*RandomSubRouter) EnoughPeers

func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool

func (*RandomSubRouter) HandleRPC

func (rs *RandomSubRouter) HandleRPC(rpc *RPC)

func (*RandomSubRouter) Join

func (rs *RandomSubRouter) Join(topic string)

func (*RandomSubRouter) Leave

func (rs *RandomSubRouter) Leave(topic string)

func (*RandomSubRouter) Protocols

func (rs *RandomSubRouter) Protocols() []protocol.ID

func (*RandomSubRouter) Publish

func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message)

func (*RandomSubRouter) RemovePeer

func (rs *RandomSubRouter) RemovePeer(p peer.ID)

type RemoteTracer

type RemoteTracer struct {
	// contains filtered or unexported fields
}

RemoteTracer is a tracer that sends trace events to a remote peer

func NewRemoteTracer

func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error)

NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi

func (*RemoteTracer) Close

func (t *RemoteTracer) Close()

func (*RemoteTracer) Trace

func (t *RemoteTracer) Trace(evt *pb.TraceEvent)

type RouterReady

type RouterReady func(rt PubSubRouter, topic string) (bool, error)

RouterReady is a function that decides if a router is ready to publish

func MinTopicSize

func MinTopicSize(size int) RouterReady

MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size. The router ultimately decides whether it is ready or not; the given size is just a suggestion.
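The readiness check is a closure that forwards the suggested size to the router. The sketch below reproduces that shape with hypothetical, trimmed-down types: `router` keeps only the EnoughPeers method of PubSubRouter, and `fixedRouter` is a toy router invented for the demonstration.

```go
package main

import "fmt"

// router is a hypothetical stand-in for PubSubRouter with only the
// method this sketch needs.
type router interface {
	EnoughPeers(topic string, suggested int) bool
}

// routerReady mirrors the shape of RouterReady.
type routerReady func(rt router, topic string) (bool, error)

// minTopicSize returns a readiness check that passes size to the router
// as a suggestion. The router makes the final decision.
func minTopicSize(size int) routerReady {
	return func(rt router, topic string) (bool, error) {
		return rt.EnoughPeers(topic, size), nil
	}
}

// fixedRouter is a toy router that knows a static peer count per topic.
type fixedRouter map[string]int

func (r fixedRouter) EnoughPeers(topic string, suggested int) bool {
	return r[topic] >= suggested
}

func main() {
	rt := fixedRouter{"news": 3}
	ready := minTopicSize(2)

	ok, _ := ready(rt, "news")
	fmt.Println("news ready:", ok)

	ok, _ = ready(rt, "chat")
	fmt.Println("chat ready:", ok)
}
```

With the real API, the value returned by MinTopicSize would be wrapped in WithReadiness and passed to Topic.Publish.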

type StemMessage

type StemMessage struct {
	// contains filtered or unexported fields
}

type SubOpt

type SubOpt func(sub *Subscription) error

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given Topic.

func (*Subscription) Cancel

func (sub *Subscription) Cancel()

Cancel closes the subscription. If this is the last active subscription then pubsub will send an unsubscribe announcement to the network.

func (*Subscription) Next

func (sub *Subscription) Next(ctx context.Context) (*Message, error)

Next returns the next message in our subscription

func (*Subscription) Topic

func (sub *Subscription) Topic() string

Topic returns the topic string associated with the Subscription

type TimeCachedBlacklist

type TimeCachedBlacklist struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TimeCachedBlacklist is a blacklist implementation using a time cache

func (*TimeCachedBlacklist) Add

func (b *TimeCachedBlacklist) Add(p peer.ID)

func (*TimeCachedBlacklist) Contains

func (b *TimeCachedBlacklist) Contains(p peer.ID) bool

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic is the handle for a pubsub topic

func (*Topic) Close

func (t *Topic) Close() error

Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions. Does not error if the topic is already closed.

func (*Topic) EventHandler

func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error)

EventHandler creates a handle for topic-specific events. Multiple event handlers may be created and will operate independently of each other.

func (*Topic) ListPeers

func (t *Topic) ListPeers() []peer.ID

ListPeers returns a list of peers we are connected to in the given topic.

func (*Topic) Publish

func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error

Publish publishes data to topic.

func (*Topic) Subscribe

func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error)

Subscribe returns a new Subscription for the topic. Note that subscription is not an instantaneous operation. It may take some time before the subscription is processed by the pubsub main loop and propagated to our peers.

type TopicEventHandler

type TopicEventHandler struct {
	// contains filtered or unexported fields
}

TopicEventHandler is used to manage topic specific events. No Subscription is required to receive events.

func (*TopicEventHandler) Cancel

func (t *TopicEventHandler) Cancel()

Cancel closes the topic event handler

func (*TopicEventHandler) NextPeerEvent

func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error)

NextPeerEvent returns the next event regarding subscribed peers. Guarantees: Peer Join and Peer Leave events for a given peer will fire in order. Unless a peer both joins and leaves before NextPeerEvent emits either event, all events will eventually be received from NextPeerEvent.
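One way to obtain the stated guarantee is to keep at most one pending event per peer and let opposite events cancel each other. The sketch below models that idea and is an assumption about a possible design, not a description of the package internals; `eventLog` and `add` are hypothetical names.

```go
package main

import "fmt"

type eventType int

const (
	peerJoin eventType = iota
	peerLeave
)

// eventLog holds at most one pending, undelivered event per peer.
type eventLog map[string]eventType

// add records evt for peer. A pending event of the opposite type is
// cancelled instead, so a join followed by a leave (or the reverse)
// before delivery produces no event at all.
func (l eventLog) add(peer string, evt eventType) {
	pending, ok := l[peer]
	if !ok {
		l[peer] = evt
		return
	}
	if pending != evt {
		delete(l, peer)
	}
}

func main() {
	pending := eventLog{}
	pending.add("alice", peerJoin)
	pending.add("bob", peerJoin)
	pending.add("bob", peerLeave) // cancels bob's undelivered join

	// Only alice's join is still waiting to be delivered.
	fmt.Println("pending events:", len(pending))
}
```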

type TopicEventHandlerOpt

type TopicEventHandlerOpt func(t *TopicEventHandler) error

type TopicOpt

type TopicOpt func(t *Topic) error

type TopicOptions

type TopicOptions struct{}

type Validator

type Validator func(context.Context, peer.ID, *Message) bool

Validator is a function that validates a message.

type ValidatorOpt

type ValidatorOpt func(addVal *addValReq) error

ValidatorOpt is an option for RegisterTopicValidator.

func WithValidatorConcurrency

func WithValidatorConcurrency(n int) ValidatorOpt

WithValidatorConcurrency is an option that sets the topic validator throttle. This controls the number of active validation goroutines for the topic; the default is 1024.

func WithValidatorInline

func WithValidatorInline(inline bool) ValidatorOpt

WithValidatorInline is an option that sets the validation disposition to synchronous: the validator will be executed inline in the validation front-end, without spawning a new goroutine. This is suitable for simple or cpu-bound validators that do not block.

func WithValidatorTimeout

func WithValidatorTimeout(timeout time.Duration) ValidatorOpt

WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. By default there is no timeout in asynchronous validators.
