subpub

package
v1.3.0
Published: Sep 28, 2022 License: AGPL-3.0 Imports: 30 Imported by: 0

README

SubPub

SubPub is a simplified PubSub protocol built on libp2p.

Minimal usage example:

package subpub_test

import (
	"context"

	"go.vocdoni.io/dvote/ipfssync/subpub"
	"go.vocdoni.io/dvote/log"
	"go.vocdoni.io/dvote/util"
)

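func Example() {
	log.Init("info", "stdout")

	// Both unicasts and broadcasts are delivered on this channel.
	messages := make(chan *subpub.Message)
	// Shared secret: only peers holding the same groupKey can join.
	groupKey := []byte("test")
	port := 6543
	// Hex-encoded private key that identifies this node.
	privKey := util.RandomHex(32)

	// The final argument (private) is false, so no libp2p private network is created.
	sp := subpub.NewSubPub(privKey, groupKey, int32(port), false)
	sp.Start(context.Background(), messages)
}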
  • NewSubPub() and Start() create a libp2p Host with a PubSub service attached that uses the GossipSub router.
  • SendBroadcast() uses this PubSub service to route broadcast messages to peers.
  • SendUnicast() sends a unicast packet directly to a single peer.
  • The topic used for p2p discovery is derived from a hash of the groupKey.
  • Incoming messages (both unicasts and broadcasts) are passed to the messages channel given to Start().
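
The README states only that the topic is derived from a hash of the groupKey; it does not name the hash function or the encoding. The following is a minimal sketch of that idea, assuming SHA-256 and hex encoding (both are assumptions, not confirmed by the source). The helper name topicFromGroupKey is hypothetical and is not part of the subpub API.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// topicFromGroupKey is a hypothetical helper: it hashes the shared
// groupKey and returns the digest as a hex string usable as a topic name.
// SHA-256 and hex encoding are assumptions made for this sketch.
func topicFromGroupKey(groupKey []byte) string {
	sum := sha256.Sum256(groupKey)
	return hex.EncodeToString(sum[:])
}

func main() {
	// Peers that share the same groupKey compute the same topic.
	fmt.Println(topicFromGroupKey([]byte("test")))
}
```

Because the topic is a deterministic function of the groupKey, two nodes configured with the same key end up on the same discovery topic without exchanging any other configuration.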

Documentation

Overview

Example
package main

import (
	"context"

	"go.vocdoni.io/dvote/ipfssync/subpub"
	"go.vocdoni.io/dvote/log"
	"go.vocdoni.io/dvote/util"
)

func main() {
	log.Init("info", "stdout")

	messages := make(chan *subpub.Message)
	groupKey := []byte("test")
	port := 6543
	privKey := util.RandomHex(32)

	sp := subpub.NewSubPub(privKey, groupKey, int32(port), false)
	sp.Start(context.Background(), messages)
}

Constants

const (
	IPv4 = 4
	IPv6 = 6
)

const GossipBufSize = 128

GossipBufSize is the number of incoming messages to buffer for each topic.

const UnicastBufSize = 128

UnicastBufSize is the number of unicast incoming messages to buffer.
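
To illustrate what a buffer of 128 messages means in practice, here is a small sketch using a plain buffered Go channel. The names bufSize and fill are hypothetical. The source does not say whether the library blocks or drops messages once a buffer is full; the non-blocking send below is only one possible policy, chosen to make the capacity visible.

```go
package main

import "fmt"

// bufSize mirrors the value of GossipBufSize and UnicastBufSize.
const bufSize = 128

// fill attempts n non-blocking sends on ch and returns how many were
// accepted. Sends that find the buffer full are skipped (an assumed
// policy for this sketch, not necessarily what subpub does).
func fill(ch chan<- int, n int) int {
	accepted := 0
	for i := 0; i < n; i++ {
		select {
		case ch <- i:
			accepted++
		default:
			// Buffer is full and nobody is receiving.
		}
	}
	return accepted
}

func main() {
	ch := make(chan int, bufSize)
	fmt.Println(fill(ch, 200)) // prints 128
}
```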

Variables

This section is empty.

Functions

This section is empty.

Types

type Gossip

type Gossip struct {
	// Messages is a channel of messages received from other peers in the topic
	Messages chan *Message
	// contains filtered or unexported fields
}

Gossip represents a subscription to a single PubSub topic. Messages can be published to the topic with Gossip.Publish, and received messages are pushed to the Messages channel.

func JoinGossip

func JoinGossip(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, topic string) (*Gossip, error)

JoinGossip tries to subscribe to the PubSub topic, returning a Gossip on success.

func (*Gossip) Publish

func (g *Gossip) Publish(message []byte) error

Publish sends a message to the pubsub topic.

type Message

type Message struct {
	Data []byte
	Peer string
}

type SubPub

type SubPub struct {
	Key         ecdsa.PrivateKey
	GroupKey    [32]byte
	Topic       string
	NoBootStrap bool
	BootNodes   []string
	PubKey      string
	Private     bool
	NodeID      string
	Port        int32
	Host        host.Host
	MaxDHTpeers int

	Gossip      *Gossip       // Gossip deals with broadcasts
	Streams     sync.Map      // this is a thread-safe map[libpeer.ID]bufioWithMutex
	UnicastMsgs chan *Message // UnicastMsgs passes unicasts around
	Messages    chan *Message // both unicasts and broadcasts end up being passed to Messages

	DiscoveryPeriod time.Duration

	// These are used in testing
	OnPeerAdd    func(id libpeer.ID)
	OnPeerRemove func(id libpeer.ID)
	// contains filtered or unexported fields
}

SubPub is a simplified PubSub protocol built on libp2p.
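
The struct comments say that unicasts (UnicastMsgs) and broadcasts (Gossip.Messages) both end up on Messages. One common way to get that behaviour in Go is a fan-in over two channels. The sketch below shows the pattern under that assumption; the merge function is hypothetical, the local Message type only mirrors the documented struct, and the library's actual internals may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// Message mirrors the documented subpub.Message struct.
type Message struct {
	Data []byte
	Peer string
}

// merge is a hypothetical fan-in helper: it forwards every message from
// both input channels to a single output channel, and closes the output
// once both inputs have been closed and drained.
func merge(unicast, broadcast <-chan *Message) <-chan *Message {
	out := make(chan *Message)
	var wg sync.WaitGroup
	forward := func(in <-chan *Message) {
		defer wg.Done()
		for m := range in {
			out <- m
		}
	}
	wg.Add(2)
	go forward(unicast)
	go forward(broadcast)
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	unicast := make(chan *Message, 1)
	broadcast := make(chan *Message, 1)
	unicast <- &Message{Data: []byte("direct"), Peer: "peerA"}
	broadcast <- &Message{Data: []byte("to all"), Peer: "peerB"}
	close(unicast)
	close(broadcast)

	// Arrival order across the two sources is not guaranteed.
	for m := range merge(unicast, broadcast) {
		fmt.Printf("%s: %s\n", m.Peer, m.Data)
	}
}
```

A consumer of the merged channel cannot tell from the channel alone whether a message was a unicast or a broadcast, which matches the documented Message struct carrying only Data and Peer.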

func NewSubPub

func NewSubPub(hexKey string, groupKey []byte, port int32, private bool) *SubPub

NewSubPub creates a new SubPub instance. The private key identifies the node on the p2p network (its public key is derived from it). The groupKey is a secret shared among the PubSub participants; only those who hold the key are able to join. If private is enabled, a libp2p private network is created using the groupKey as the shared secret (experimental), and the default bootnodes will not work.

func (*SubPub) AddPeer

func (ps *SubPub) AddPeer(peer string) error

AddPeer creates a new libp2p peer connection (transport layer).

func (*SubPub) Address

func (s *SubPub) Address() string

func (*SubPub) Close

func (ps *SubPub) Close() error

Close terminates the SubPub networking stack.

func (*SubPub) Listen

func (ps *SubPub) Listen(receiver chan<- *Message)

func (*SubPub) ReadMessage

func (ps *SubPub) ReadMessage(r *bufio.Reader) (*Message, error)

func (*SubPub) SendBroadcast

func (s *SubPub) SendBroadcast(msg Message) error

func (*SubPub) SendMessage

func (ps *SubPub) SendMessage(w *bufio.Writer, msg []byte) error

SendMessage encrypts a message and writes it to the given buffered writer.
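
The page does not document the cipher or the wire format used by SendMessage and ReadMessage. The sketch below shows the general shape of "encrypt, then write to a bufio.Writer" and the matching read side, under loudly stated assumptions: AES-256-GCM from the standard library stands in for whatever cipher the library uses, frames are length-prefixed with a 4-byte big-endian header, and the key is the SHA-256 of the shared secret. The names writeEncrypted, readEncrypted, roundTrip and maxFrame are all hypothetical.

```go
package main

import (
	"bufio"
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"crypto/sha256"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// maxFrame is a hypothetical upper bound on frame size.
const maxFrame = 1 << 20

// newGCM builds an AES-256-GCM AEAD (an assumed cipher for this sketch).
func newGCM(key [32]byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key[:])
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// writeEncrypted seals msg and writes length || nonce || ciphertext.
func writeEncrypted(w *bufio.Writer, key [32]byte, msg []byte) error {
	gcm, err := newGCM(key)
	if err != nil {
		return err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return err
	}
	sealed := gcm.Seal(nonce, nonce, msg, nil) // nonce is prepended
	if len(sealed) > maxFrame {
		return errors.New("message too large")
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(sealed)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	if _, err := w.Write(sealed); err != nil {
		return err
	}
	return w.Flush()
}

// readEncrypted reads one frame and returns the decrypted payload.
func readEncrypted(r *bufio.Reader, key [32]byte) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxFrame {
		return nil, errors.New("frame too large")
	}
	sealed := make([]byte, n)
	if _, err := io.ReadFull(r, sealed); err != nil {
		return nil, err
	}
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("frame too short")
	}
	nonce, ct := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ct, nil)
}

// roundTrip writes msg into an in-memory buffer and reads it back.
func roundTrip(secret, msg []byte) ([]byte, error) {
	key := sha256.Sum256(secret)
	var buf bytes.Buffer
	if err := writeEncrypted(bufio.NewWriter(&buf), key, msg); err != nil {
		return nil, err
	}
	return readEncrypted(bufio.NewReader(&buf), key)
}

func main() {
	out, err := roundTrip([]byte("test"), []byte("hello"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Flushing the bufio.Writer after each frame matters in this pattern: without it, a short message can sit in the buffer indefinitely and never reach the peer.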

func (*SubPub) SendUnicast

func (s *SubPub) SendUnicast(address string, msg Message) error

func (*SubPub) Start

func (ps *SubPub) Start(ctx context.Context, receiver chan *Message)

Start connects the SubPub networking stack and begins passing incoming messages to the receiver channel.

func (*SubPub) String

func (ps *SubPub) String() string

func (*SubPub) Unicast

func (ps *SubPub) Unicast(address string, message []byte) error
