peerstream

package
v0.3.11
Published: Jan 12, 2016 License: MIT, MIT Imports: 10 Imported by: 0

README

go-peerstream p2p multi-multiplexing

Package peerstream is a peer-to-peer networking library that multiplexes connections to many hosts. It tries to simplify the complexity of:

  • accepting incoming connections over multiple listeners
  • dialing outgoing connections over multiple transports
  • multiplexing multiple connections per-peer
  • multiplexing multiple different servers or protocols
  • handling backpressure correctly
  • handling stream multiplexing (we use SPDY, but maybe QUIC some day)
  • providing a simple interface to the user
Godoc: https://godoc.org/github.com/jbenet/go-peerstream

See this working example (example/example.go):

package main

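import (
  "fmt"
  "io"
  "net"
  "os"

  ps "github.com/jbenet/go-peerstream"
  // Assumption: the SPDY transport from go-stream-muxer. NewSwarm takes a
  // smux.Transport (see its signature in the Documentation section).
  spdy "github.com/jbenet/go-stream-muxer/spdystream"
)

func main() {
  // create a new Swarm over the chosen stream-multiplexing transport
  swarm := ps.NewSwarm(spdy.Transport)
  defer swarm.Close()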

  // tell swarm what to do with new incoming streams.
  // EchoHandler just echoes back anything they write.
  swarm.SetStreamHandler(ps.EchoHandler)

  // Okay, let's try listening on some transports
  l1, err := net.Listen("tcp", "localhost:8001")
  if err != nil {
    panic(err)
  }

  l2, err := net.Listen("tcp", "localhost:8002")
  if err != nil {
    panic(err)
  }

  // tell swarm to accept incoming connections on these
  // listeners. Swarm will start accepting new connections.
  // AddListener returns (*Listener, error); only the error is needed here.
  if _, err := swarm.AddListener(l1); err != nil {
    panic(err)
  }
  if _, err := swarm.AddListener(l2); err != nil {
    panic(err)
  }

  // ok, let's try some outgoing connections
  nc1, err := net.Dial("tcp", "localhost:8001")
  if err != nil {
    panic(err)
  }

  nc2, err := net.Dial("tcp", "localhost:8002")
  if err != nil {
    panic(err)
  }

  // add them to the swarm
  c1, err := swarm.AddConn(nc1)
  if err != nil {
    panic(err)
  }
  c2, err := swarm.AddConn(nc2)
  if err != nil {
    panic(err)
  }

  // Swarm treats listeners as sources of new connections and does
  // not distinguish between outgoing or incoming connections.
  // It provides the net.Conn to the StreamHandler so you can
  // distinguish between them however you wish.

  // now let's try opening some streams!
  // You can specify what connection you want to use
  s1, err := swarm.NewStreamWithConn(c1)
  if err != nil {
    panic(err)
  }

  // Or, you can specify a SelectConn function that picks among all
  // connections (it calls NewStreamWithConn under the hood)
  s2, err := swarm.NewStreamSelectConn(func(conns []*ps.Conn) *ps.Conn {
    if len(conns) > 0 {
      return conns[0]
    }
    return nil
  })
  if err != nil {
    panic(err)
  }

  // Or, you can bind connections to ConnGroup ids. You can bind a conn to
  // multiple groups. And, if conn wasn't in swarm, it calls swarm.AddConn.
  // You can use any Go value as a group, as long as it can serve as a
  // map key (a `KeyType`, as in maps).
  swarm.AddConnToGroup(c2, 1)

  // And then use that group to select a connection. Swarm will use any
  // connection it finds in that group, using a SelectConn you can rebind:
  //   swarm.SetGroupSelectConn(1, SelectConn)
  //   swarm.SetDefaultGroupSelectConn(SelectConn)
  s3, err := swarm.NewStreamWithGroup(1)
  if err != nil {
    panic(err)
  }

  // Why groups? Because with many connections, many transports, and
  // many Servers (or Protocols), we can use the Swarm to associate
  // different StreamHandlers per group, and to let us create NewStreams
  // on a given group.

  // Ok, we have streams. now what. Use them! Our Streams are basically
  // streams from github.com/docker/spdystream, so they work the same
  // way:

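  for i, stream := range []*ps.Stream{s1, s2, s3} {
    str := "stream %d ready:"
    fmt.Fprintf(stream, str, i)

    // EchoHandler sends the bytes back; print only what was actually read
    buf := make([]byte, len(str))
    n, err := stream.Read(buf)
    if err != nil {
      panic(err)
    }
    fmt.Println(string(buf[:n]))
  }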

  go io.Copy(os.Stdout, s1)
  go io.Copy(os.Stdout, s2)
  go io.Copy(os.Stdout, s3)
  io.Copy(io.MultiWriter(s1, s2, s3), os.Stdin)
}

// log writes a line to stderr. Fprintln avoids treating s as a format string.
func log(s string) {
  fmt.Fprintln(os.Stderr, s)
}

Documentation

Overview

Package peerstream is a peer-to-peer networking library that multiplexes connections to many hosts. It tries to simplify the complexity of:

  • accepting incoming connections over **multiple** listeners
  • dialing outgoing connections over **multiple** transports
  • multiplexing **multiple** connections per-peer
  • multiplexing **multiple** different servers or protocols
  • handling backpressure correctly
  • handling stream multiplexing (we use SPDY, but maybe QUIC some day)
  • providing a **simple** interface to the user

Variables

var AcceptConcurrency = 200

AcceptConcurrency is how many connections can simultaneously be in the process of being accepted. Handshakes can sometimes occur as part of this process, so it may take some time. It is important to rate limit, lest a malicious influx of connections cause our node to consume all its resources accepting new connections.
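The rate-limiting idea described above can be sketched with a buffered channel used as a counting semaphore. This is only an illustration under assumptions, not the package's actual implementation: `limiter`, `newLimiter`, and `maxConcurrent` are hypothetical names, and the "handshake" is simulated by a counter update.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limiter bounds how many jobs (e.g. handshakes) may run at once.
// Hypothetical helper; peerstream's internals may differ.
type limiter struct {
	slots chan struct{}
}

func newLimiter(n int) *limiter {
	return &limiter{slots: make(chan struct{}, n)}
}

// run blocks until a slot is free, runs fn, then releases the slot.
func (l *limiter) run(fn func()) {
	l.slots <- struct{}{}
	defer func() { <-l.slots }()
	fn()
}

// maxConcurrent pushes total jobs through a limiter of size limit and
// reports the highest number of jobs seen running at the same time.
func maxConcurrent(limit, total int) int32 {
	l := newLimiter(limit)
	var cur, peak int32
	var wg sync.WaitGroup
	for i := 0; i < total; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.run(func() {
				n := atomic.AddInt32(&cur, 1)
				// Raise the recorded peak if this job pushed it higher.
				for {
					p := atomic.LoadInt32(&peak)
					if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
						break
					}
				}
				atomic.AddInt32(&cur, -1)
			})
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	// With a limit of 3, no more than 3 jobs ever overlap.
	fmt.Println(maxConcurrent(3, 50) <= 3)
}
```

A larger limit admits more simultaneous handshakes at the cost of more resources held by connections that have not finished being accepted.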

var ErrGroupNotFound = errors.New("group not found")

ErrGroupNotFound signals no such group exists

var ErrInvalidConnSelected = errors.New("invalid selected connection")

ErrInvalidConnSelected signals that a connection selected with a SelectConn function is invalid. This may be due to the Conn not being part of the original set given to the function, or the value being nil.

var ErrNoConnections = errors.New("no connections")

ErrNoConnections signals that no connections are available
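The conditions behind these two errors can be sketched as a small validation step around a selector function. The types below are local stand-ins for the package's `Conn` and `SelectConn`, and `pick` is a hypothetical helper; when exactly the package itself returns each error is an assumption here.

```go
package main

import (
	"errors"
	"fmt"
)

// Conn and SelectConn are local stand-ins for the package's types.
type Conn struct{ id int }

type SelectConn func([]*Conn) *Conn

var (
	ErrInvalidConnSelected = errors.New("invalid selected connection")
	ErrNoConnections       = errors.New("no connections")
)

// connInConns reports whether c is one of conns (pointer identity).
func connInConns(c *Conn, conns []*Conn) bool {
	for _, other := range conns {
		if c == other {
			return true
		}
	}
	return false
}

// pick is a hypothetical helper: it runs sel and rejects a result that
// is nil or that was not part of the set handed to sel.
func pick(sel SelectConn, conns []*Conn) (*Conn, error) {
	if len(conns) == 0 {
		return nil, ErrNoConnections
	}
	c := sel(conns)
	if c == nil || !connInConns(c, conns) {
		return nil, ErrInvalidConnSelected
	}
	return c, nil
}

func main() {
	conns := []*Conn{{id: 1}, {id: 2}}
	stranger := &Conn{id: 99}

	// A selector that returns a connection outside the given set.
	_, err := pick(func([]*Conn) *Conn { return stranger }, conns)
	fmt.Println(err == ErrInvalidConnSelected)

	// No connections to choose from at all.
	_, err = pick(func([]*Conn) *Conn { return nil }, nil)
	fmt.Println(err == ErrNoConnections)
}
```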

var GarbageCollectTimeout = 5 * time.Second

GarbageCollectTimeout governs the periodic connection closer.

var SelectRandomConn = func(conns []*Conn) *Conn {
	if len(conns) == 0 {
		return nil
	}

	return conns[rand.Intn(len(conns))]
}

Functions

func CloseHandler

func CloseHandler(s *Stream)

func ConnInConns

func ConnInConns(c1 *Conn, conns []*Conn) bool

func EchoHandler

func EchoHandler(s *Stream)

func NoOpConnHandler

func NoOpConnHandler(c *Conn)

func NoOpStreamHandler

func NoOpStreamHandler(s *Stream)

Types

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn is a Swarm-associated connection.

func ConnsWithGroup

func ConnsWithGroup(g Group, conns []*Conn) []*Conn

ConnsWithGroup narrows down a set of connections to those in a given group.

func (*Conn) AddGroup

func (c *Conn) AddGroup(g Group)

AddGroup assigns given Group to Conn

func (*Conn) Close

func (c *Conn) Close() error

Close closes this connection

func (*Conn) Conn

func (c *Conn) Conn() smux.Conn

Conn returns the underlying transport connection we use. Warning: modifying this object is undefined.

func (*Conn) GoClose added in v0.3.10

func (c *Conn) GoClose()

GoClose spawns a goroutine to close the connection if and only if the connection is not already being closed, and returns immediately.
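One way to get the "close at most once, without blocking the caller" behavior is an atomic compare-and-swap guard in front of the goroutine. The sketch below is an assumption about how such a method could look, not the package's code; `conn`, `goClose`, and `closeCount` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// conn is a hypothetical stand-in that counts how often it was closed.
type conn struct {
	closing int32          // set to 1 once a close has been started
	closes  int32          // number of times close actually ran
	done    sync.WaitGroup // lets the example wait for the goroutine
}

func (c *conn) close() { atomic.AddInt32(&c.closes, 1) }

// goClose starts close in a goroutine only if no close has started yet,
// and returns without waiting for it to finish.
func (c *conn) goClose() {
	if !atomic.CompareAndSwapInt32(&c.closing, 0, 1) {
		return // someone else is already closing this conn
	}
	c.done.Add(1)
	go func() {
		defer c.done.Done()
		c.close()
	}()
}

// closeCount calls goClose the given number of times and reports how
// many times the underlying close ran.
func closeCount(calls int) int32 {
	c := &conn{}
	for i := 0; i < calls; i++ {
		c.goClose()
	}
	c.done.Wait()
	return atomic.LoadInt32(&c.closes)
}

func main() {
	fmt.Println(closeCount(5)) // prints 1
}
```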

func (*Conn) Groups

func (c *Conn) Groups() []Group

Groups returns the Groups this Conn belongs to

func (*Conn) InGroup

func (c *Conn) InGroup(g Group) bool

InGroup returns whether this Conn belongs to a Group

func (*Conn) NetConn

func (c *Conn) NetConn() net.Conn

NetConn returns the underlying net.Conn

func (*Conn) NewStream

func (c *Conn) NewStream() (*Stream, error)

NewStream returns a new stream associated with this Conn

func (*Conn) Streams

func (c *Conn) Streams() []*Stream

func (*Conn) String

func (c *Conn) String() string

String returns a string representation of the Conn

func (*Conn) Swarm

func (c *Conn) Swarm() *Swarm

Swarm returns the Swarm associated with this Conn

type ConnHandler

type ConnHandler func(s *Conn)

ConnHandler is a function which receives a Conn. It allows clients to set a function to receive newly accepted connections. It works like StreamHandler, but is usually less useful, as most services will only use Streams. It is safe to pass or store the *Conn elsewhere. Note: the ConnHandler is called sequentially, so spawn goroutines or pass the Conn. See EchoHandler.

type Group

type Group interface{}

Group is an object used to associate a group of Streams, Connections, and Listeners. It can be anything; it is meant to work like a KeyType in maps.

type Groupable

type Groupable interface {
	// Groups returns the groups this object belongs to
	Groups() []Group

	// InGroup returns whether this object belongs to a Group
	InGroup(g Group) bool

	// AddGroup adds this object to a group
	AddGroup(g Group)
}

Groupable is an interface for a set of objects that can be assigned groups: Streams, Connections, and Listeners. Objects inherit groups (e.g. a Stream inherits the groups of its parent Connection, and in turn that of its Listener).
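Because Group is meant to behave like a map key, one simple way to satisfy the Groupable method set is to keep memberships in a map. The sketch below is illustrative only: `groupSet` is a hypothetical type, and `Group` is redeclared locally so the example is self-contained.

```go
package main

import "fmt"

// Group mirrors the documented `type Group interface{}`.
type Group interface{}

// groupSet is a hypothetical helper that stores memberships in a map.
type groupSet struct {
	m map[Group]struct{}
}

// AddGroup adds this object to a group.
func (gs *groupSet) AddGroup(g Group) {
	if gs.m == nil {
		gs.m = make(map[Group]struct{})
	}
	gs.m[g] = struct{}{}
}

// InGroup returns whether this object belongs to a Group.
func (gs *groupSet) InGroup(g Group) bool {
	_, ok := gs.m[g]
	return ok
}

// Groups returns the groups this object belongs to (in no fixed order).
func (gs *groupSet) Groups() []Group {
	out := make([]Group, 0, len(gs.m))
	for g := range gs.m {
		out = append(out, g)
	}
	return out
}

func main() {
	var gs groupSet
	gs.AddGroup(1)
	gs.AddGroup("bulk-transfer")
	fmt.Println(gs.InGroup(1), gs.InGroup("bulk-transfer"), gs.InGroup(2))
}
```

Two things follow from using interface values as keys: `1` and `int64(1)` are different groups because their dynamic types differ, and a value of a non-comparable type (a slice, for example) panics at run time when used as a key.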

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

func ListenersWithGroup

func ListenersWithGroup(g Group, ls []*Listener) []*Listener

ListenersWithGroup narrows down a set of listeners to those in given group.

func (*Listener) AcceptErrors

func (l *Listener) AcceptErrors() <-chan error

AcceptErrors returns a channel of the errors that we **might** get on listener close

func (*Listener) AddGroup

func (l *Listener) AddGroup(g Group)

AddGroup assigns given Group to Listener

func (*Listener) Close

func (l *Listener) Close() error

func (*Listener) Groups

func (l *Listener) Groups() []Group

Groups returns the groups this Listener belongs to

func (*Listener) InGroup

func (l *Listener) InGroup(g Group) bool

InGroup returns whether this Listener belongs to a Group

func (*Listener) NetListener

func (l *Listener) NetListener() net.Listener

NetListener is the underlying net.Listener

func (*Listener) String

func (l *Listener) String() string

String returns a string representation of the Listener

type Notifiee

type Notifiee interface {
	Connected(*Conn)      // called when a connection opened
	Disconnected(*Conn)   // called when a connection closed
	OpenedStream(*Stream) // called when a stream opened
	ClosedStream(*Stream) // called when a stream closed
}

Notifiee is an interface for an object wishing to receive notifications from a Swarm
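A Notifiee can be as small as a set of counters. The sketch below redeclares `Conn`, `Stream`, and `Notifiee` locally as stand-ins so it compiles on its own; `counter` is a hypothetical type. With the real package, a value like this would be registered through the Swarm's Notify method.

```go
package main

import (
	"fmt"
	"sync"
)

// Conn, Stream, and Notifiee are local stand-ins for the package's types.
type Conn struct{}
type Stream struct{}

type Notifiee interface {
	Connected(*Conn)
	Disconnected(*Conn)
	OpenedStream(*Stream)
	ClosedStream(*Stream)
}

// counter is a hypothetical Notifiee that tracks how many connections
// and streams are currently open.
type counter struct {
	mu      sync.Mutex
	conns   int
	streams int
}

func (c *counter) add(conns, streams int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.conns += conns
	c.streams += streams
}

func (c *counter) Connected(*Conn)      { c.add(1, 0) }
func (c *counter) Disconnected(*Conn)   { c.add(-1, 0) }
func (c *counter) OpenedStream(*Stream) { c.add(0, 1) }
func (c *counter) ClosedStream(*Stream) { c.add(0, -1) }

// Compile-time check that counter satisfies the interface.
var _ Notifiee = (*counter)(nil)

func main() {
	c := &counter{}
	c.Connected(&Conn{})
	c.OpenedStream(&Stream{})
	c.OpenedStream(&Stream{})
	c.ClosedStream(&Stream{})
	fmt.Println(c.conns, c.streams) // prints "1 1"
}
```

The mutex is there because notifications may arrive from several goroutines; whether the Swarm actually delivers them concurrently is not stated in this documentation.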

type SelectConn

type SelectConn func([]*Conn) *Conn

SelectConn selects a connection out of a list. It allows delegation of decision making to clients. Clients can make SelectConn functions that check connection qualities -- like latency and bandwidth -- or pick from a logical set of connections.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is an io.{Read,Write,Close}r to a remote counterpart. It wraps a smux.Stream, and links it to a Conn and groups

func StreamsWithGroup

func StreamsWithGroup(g Group, streams []*Stream) []*Stream

StreamsWithGroup narrows down a set of streams to those in given group.

func (*Stream) AddGroup

func (s *Stream) AddGroup(g Group)

AddGroup assigns given Group to Stream

func (*Stream) Close

func (s *Stream) Close() error

func (*Stream) Conn

func (s *Stream) Conn() *Conn

Conn returns the Conn associated with this Stream

func (*Stream) Groups

func (s *Stream) Groups() []Group

Groups returns the Groups this Stream belongs to

func (*Stream) InGroup

func (s *Stream) InGroup(g Group) bool

InGroup returns whether this stream belongs to a Group

func (*Stream) Read

func (s *Stream) Read(p []byte) (n int, err error)

func (*Stream) Stream

func (s *Stream) Stream() smux.Stream

Stream returns the underlying smux.Stream

func (*Stream) String

func (s *Stream) String() string

String returns a string representation of the Stream

func (*Stream) Swarm

func (s *Stream) Swarm() *Swarm

Swarm returns the Swarm associated with this Stream

func (*Stream) Write

func (s *Stream) Write(p []byte) (n int, err error)

type StreamHandler

type StreamHandler func(s *Stream)

StreamHandler is a function which receives a Stream. It allows clients to set a function to receive newly created streams, and decide whether to continue adding them. It works sort of like an http.HandleFunc. Note: the StreamHandler is called sequentially, so spawn goroutines or pass the Stream. See EchoHandler.
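Since handlers are called sequentially, a handler that does real work should hand the stream to a goroutine and return. The sketch below shows that shape using only the standard library: `handler` stands in for StreamHandler and takes an io.ReadWriteCloser instead of a *Stream, and net.Pipe stands in for a real stream. All names are hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// handler is a stand-in for StreamHandler over a generic stream type.
type handler func(s io.ReadWriteCloser)

// echoAsync returns immediately; the echo loop runs in its own
// goroutine and closes the stream when it ends.
func echoAsync(s io.ReadWriteCloser) {
	go func() {
		defer s.Close()
		io.Copy(s, s) // echo until the peer closes or an error occurs
	}()
}

// roundTrip sends msg to h over an in-memory pipe and returns the reply.
func roundTrip(h handler, msg string) (string, error) {
	local, remote := net.Pipe()
	defer local.Close()

	h(remote) // must not block, or the write below would never start

	if _, err := local.Write([]byte(msg)); err != nil {
		return "", err
	}
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(local, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	reply, err := roundTrip(echoAsync, "hello")
	fmt.Println(reply, err)
}
```

A handler that ran the copy loop inline would hold up every later stream until the first one finished, which is the situation the note above warns about.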

type Swarm

type Swarm struct {
	// contains filtered or unexported fields
}

func NewSwarm

func NewSwarm(t smux.Transport) *Swarm

func (*Swarm) AddConn

func (s *Swarm) AddConn(netConn net.Conn) (*Conn, error)

AddConn gives the Swarm ownership of net.Conn. The Swarm will open a SPDY session and begin listening for Streams. Returns the resulting Swarm-associated peerstream.Conn. Idempotent: if the Connection has already been added, this is a no-op.

func (*Swarm) AddConnToGroup

func (s *Swarm) AddConnToGroup(conn *Conn, g Group)

AddConnToGroup assigns given Group to conn

func (*Swarm) AddListener

func (s *Swarm) AddListener(l net.Listener) (*Listener, error)

AddListener adds net.Listener to the Swarm, and immediately begins accepting incoming connections.

func (*Swarm) Close

func (s *Swarm) Close() error

Close shuts down the Swarm, and its listeners.

func (*Swarm) ConnHandler

func (s *Swarm) ConnHandler() ConnHandler

ConnHandler returns the Swarm's current ConnHandler. This is a threadsafe (atomic) operation

func (*Swarm) Conns

func (s *Swarm) Conns() []*Conn

Conns returns all the connections associated with this Swarm.

func (*Swarm) ConnsWithGroup

func (s *Swarm) ConnsWithGroup(g Group) []*Conn

ConnsWithGroup returns all the connections with a given Group

func (*Swarm) Dump

func (s *Swarm) Dump() string

Dump returns a string with all the internal state

func (*Swarm) Listeners

func (s *Swarm) Listeners() []*Listener

Listeners returns all the listeners associated with this Swarm.

func (*Swarm) NewStream

func (s *Swarm) NewStream() (*Stream, error)

NewStream opens a new Stream on the best available connection, as selected by current swarm.SelectConn.

func (*Swarm) NewStreamSelectConn

func (s *Swarm) NewStreamSelectConn(selConn SelectConn) (*Stream, error)

NewStreamSelectConn opens a new Stream on a connection selected by selConn.

func (*Swarm) NewStreamWithConn

func (s *Swarm) NewStreamWithConn(conn *Conn) (*Stream, error)

NewStreamWithConn opens a new Stream on the given connection.

func (*Swarm) NewStreamWithGroup

func (s *Swarm) NewStreamWithGroup(group Group) (*Stream, error)

NewStreamWithGroup opens a new Stream on an available connection in the given group. Uses the current swarm.SelectConn to pick between multiple connections.

func (*Swarm) NewStreamWithNetConn

func (s *Swarm) NewStreamWithNetConn(netConn net.Conn) (*Stream, error)

NewStreamWithNetConn opens a new Stream on given net.Conn. Calls s.AddConn(netConn).

func (*Swarm) Notify

func (s *Swarm) Notify(n Notifiee)

Notify signs up Notifiee to receive signals when events happen

func (*Swarm) SelectConn

func (s *Swarm) SelectConn() SelectConn

SelectConn returns the Swarm's current connection selector. SelectConn is used in order to select the best of a set of possible connections. The default chooses one at random. This is a threadsafe (atomic) operation

func (*Swarm) SetConnHandler

func (s *Swarm) SetConnHandler(ch ConnHandler)

SetConnHandler assigns the conn handler in the swarm. Unlike the StreamHandler, the ConnHandler has less responsibility for the Connection. The Swarm is still its client. This handler is only a notification. This is a threadsafe (atomic) operation

func (*Swarm) SetSelectConn

func (s *Swarm) SetSelectConn(cs SelectConn)

SetSelectConn assigns the connection selector in the swarm. If cs is nil, it will use SelectRandomConn. This is a threadsafe (atomic) operation
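The getter and setter pair, with the nil fallback described above, can be sketched with a read-write mutex. This is one possible way to make the pair safe for concurrent use, not a statement about how the package implements it; `swarm`, `newSwarm`, and `selectRandomConn` are local, hypothetical names, the last one modeled on the SelectRandomConn variable listed under Variables.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// Conn and SelectConn are local stand-ins for the package's types.
type Conn struct{ id int }

type SelectConn func([]*Conn) *Conn

// selectRandomConn is modeled on the documented SelectRandomConn.
func selectRandomConn(conns []*Conn) *Conn {
	if len(conns) == 0 {
		return nil
	}
	return conns[rand.Intn(len(conns))]
}

// swarm is a hypothetical holder for the current selector.
type swarm struct {
	mu  sync.RWMutex
	sel SelectConn
}

func newSwarm() *swarm { return &swarm{sel: selectRandomConn} }

// SetSelectConn stores cs, falling back to the random selector on nil.
func (s *swarm) SetSelectConn(cs SelectConn) {
	if cs == nil {
		cs = selectRandomConn
	}
	s.mu.Lock()
	s.sel = cs
	s.mu.Unlock()
}

// SelectConn returns the selector currently in use.
func (s *swarm) SelectConn() SelectConn {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.sel
}

func main() {
	s := newSwarm()
	conns := []*Conn{{id: 1}, {id: 2}, {id: 3}}

	// Install a selector that always picks the last connection.
	s.SetSelectConn(func(cs []*Conn) *Conn { return cs[len(cs)-1] })
	fmt.Println(s.SelectConn()(conns).id)

	// Passing nil restores the random default; with a single
	// candidate the result is deterministic.
	s.SetSelectConn(nil)
	fmt.Println(s.SelectConn()(conns[:1]).id)
}
```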

func (*Swarm) SetStreamHandler

func (s *Swarm) SetStreamHandler(sh StreamHandler)

SetStreamHandler assigns the stream handler in the swarm. The handler assumes responsibility for closing the stream. This need not happen at the end of the handler, leaving the stream open (to be used and closed later) is fine. It is also fine to keep a pointer to the Stream. This is a threadsafe (atomic) operation

func (*Swarm) StopNotify

func (s *Swarm) StopNotify(n Notifiee)

StopNotify unregisters Notifiee from receiving signals

func (*Swarm) StreamHandler

func (s *Swarm) StreamHandler() StreamHandler

StreamHandler returns the Swarm's current StreamHandler. This is a threadsafe (atomic) operation

func (*Swarm) Streams

func (s *Swarm) Streams() []*Stream

Streams returns all the streams associated with this Swarm.

func (*Swarm) StreamsWithGroup

func (s *Swarm) StreamsWithGroup(g Group) []*Stream

StreamsWithGroup returns all the streams with a given Group

func (*Swarm) String

func (s *Swarm) String() string

String returns a string with various internal stats
