core

package module
v0.0.0-...-dce34b2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2020 License: MIT Imports: 35 Imported by: 0

README

libp2p-mpi/core

standard-readme compliant

How does it work?

Interfaces

All of the interfaces used by the main mpi interface are defined in the type_definition.go file.

Using mpi.SetInitFunctions() you can set the init functions for all other interfaces.

ExtHost

ExtHost is an extended go-libp2p host interface that implements functions to manage peerstores for each interpreter.

Store

The Store interface is an ipfs interface to store interpreters.

Remote

The Remote interface implements the connection between two peers and peer resetting.

Interface

The Interface interface implements the interactions between a SlaveComm interface and a local interpreter.

SlaveComm

The SlaveComm interface handles the interactions between the Remotes and a local Interface.

MasterComm

The MasterComm interface is a wrap-around of the SlaveComm interface.

standardFunctionsCloser

The standardInterface interface is used in all other classes to handle the functions in the standardFunctionsCloser interface.

Peer reset

The peer reset algorithm of libp2p-mpi is also defined in the type_definition.go file in the ResetReader function:

// ResetReader replays every message in sent through sendToRemote, in order,
// and returns the handler to use for messages arriving from the remote.
//
// The returned handler drops the first `received` messages it is given and
// forwards every later message to pushToComm. When received is zero or
// negative nothing is dropped.
func ResetReader(received int, sent []interface{}, sendToRemote func(interface{}), pushToComm func(string)) (readFromRemote func(string)) {
  // Re-send the full history before any incoming message is handled.
  for i := range sent {
    sendToRemote(sent[i])
  }

  // Number of incoming messages that still have to be skipped.
  remaining := received

  readFromRemote = func(msg string) {
    if remaining <= 0 {
      pushToComm(msg)
      return
    }

    remaining--
  }

  return readFromRemote
}

This function takes the following arguments:

  • the number of messages already received (received) to know how many messages to discard from the remote,
  • the list of all sent messages (sent) to re-send them,
  • the function that handles sending messages to the remote (sendToRemote),
  • and the function that pushes messages back to the comm (pushToComm).

It returns a function that handles new messages from the remote (readFromRemote).

Initialization messages that have to be sent at remote reset are prepended to the list of sent messages (sent) before it is passed to the ResetReader function.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	HostHeader = "Host"
	HostLogger = log.Logger(HostHeader)

	LookUpInterval = 45 * time.Second
)
View Source
var (
	InterfaceHeader = "Interface"
	InterfaceLogger = log.Logger(InterfaceHeader)

	HeaderNotUnderstood  = errors.New("Header not understood")
	CommandNotUnderstood = errors.New("Command not understood")
	//NotMatserComm = errors.New("Not the MasterComm")
	NotEnoughFields = errors.New("Not enough field")
	EmptyString     = errors.New("Received an empty string")

	InterfaceLogHeader     = "Log"
	InterfaceSendHeader    = "Send"
	InterfaceResetHeader   = "Reset"
	InterfaceRequestHeader = "Req"

	LogSubFormat = "%s %d/%d"
)
View Source
var (
	IpfsHeader = "IpfsStore"
	IpfsLogger = log.Logger(IpfsHeader)

	ModePerm os.FileMode = 0777

	InstalledHeader = "installed/"
	FailedHeader    = "failed/"
)
View Source
var (
	MasterCommHeader = "MasterComm"
	MasterLogger     = log.Logger(MasterCommHeader)

	ResetCooldown = 2 * time.Second
)
View Source
var (
	MpiHeader = "Mpi"
	MpiLogger = log.Logger(MpiHeader)

	ThrottleDuration = 50 * time.Millisecond
)
View Source
var (
	RemoteHeader = "Remote"
	RemoteLogger = log.Logger(RemoteHeader)

	//ResetHandShakeHeader = "ResetHandShake"
	PingRespHeader  = "PingResp"
	HandShakeHeader = "HandShake"
	MessageHeader   = "Msg"
	CloseHeader     = "Close"
	PingHeader      = "Ping"
	ResetHeader     = "Reset"

	StandardTimeout      = 2 * time.Second        //Will be increase later
	StandardPingInterval = 200 * time.Millisecond //Will be increase later

	NilStreamError = errors.New("nil stream")
	ErrorInterval  = 4 * time.Second
)
View Source
var (
	StreamHeader = "SelfStream"

	StreamEnded = errors.New("Stream closed")

	StandardCheckingInterval = 300 * time.Millisecond
)
View Source
var (
	SlaveCommHeader = "SlaveComm"
	SlaveLogger     = log.Logger(SlaveCommHeader)
)

Functions

func ListIpAdresses

func ListIpAdresses() ([]maddr.Multiaddr, error)

func NewChannelBool

func NewChannelBool() *safeChannelBool

func NewChannelString

func NewChannelString() *safeChannelString

func NewNewLogger

func NewNewLogger(quiet bool) func(string, int, int) (func(string), error)

func NewSafeWaitgroupTwice

func NewSafeWaitgroupTwice(n int, m int) *safeWaitgroupTwice

func NewStandardInterface

func NewStandardInterface(additionalHandler ...func() error) standardFunctionsCloser

func ResetReader

func ResetReader(received int, sent []interface{}, sendToRemote func(interface{}), pushToComm func(string)) (readFromRemote func(string))

Types

type BasicExtHost

type BasicExtHost struct {
	Ctx            context.Context
	Host           host.Host
	StreamHandlers sync.Map
	Routing        *discovery.RoutingDiscovery
	PeerStores     map[protocol.ID]peerstore.Peerstore
	Standard       standardFunctionsCloser
}

func (*BasicExtHost) Addrs

func (h *BasicExtHost) Addrs() []maddr.Multiaddr

func (*BasicExtHost) Check

func (h *BasicExtHost) Check() bool

func (*BasicExtHost) Close

func (h *BasicExtHost) Close() error

func (*BasicExtHost) ConnManager

func (h *BasicExtHost) ConnManager() connmgr.ConnManager

func (*BasicExtHost) Connect

func (h *BasicExtHost) Connect(ctx context.Context, pi peer.AddrInfo) error

func (*BasicExtHost) EventBus

func (h *BasicExtHost) EventBus() event.Bus

func (*BasicExtHost) ID

func (h *BasicExtHost) ID() peer.ID

func (*BasicExtHost) Listen

func (h *BasicExtHost) Listen(pid protocol.ID, rendezvous string)

func (*BasicExtHost) Mux

func (h *BasicExtHost) Mux() protocol.Switch

func (*BasicExtHost) Network

func (h *BasicExtHost) Network() network.Network

func (*BasicExtHost) NewPeer

func (h *BasicExtHost) NewPeer(base protocol.ID) (peer.ID, error)

func (*BasicExtHost) NewStream

func (h *BasicExtHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error)

func (*BasicExtHost) Peerstore

func (h *BasicExtHost) Peerstore() peerstore.Peerstore

func (*BasicExtHost) PeerstoreProtocol

func (h *BasicExtHost) PeerstoreProtocol(base protocol.ID) (peerstore.Peerstore, error)

func (*BasicExtHost) Raise

func (h *BasicExtHost) Raise(err error)

func (*BasicExtHost) RemoveStreamHandler

func (h *BasicExtHost) RemoveStreamHandler(pid protocol.ID)

func (*BasicExtHost) SelfStream

func (h *BasicExtHost) SelfStream(pid ...protocol.ID) (SelfStream, error)

func (*BasicExtHost) SetCloseHandler

func (h *BasicExtHost) SetCloseHandler(handler func())

func (*BasicExtHost) SetErrorHandler

func (h *BasicExtHost) SetErrorHandler(handler func(error))

func (*BasicExtHost) SetStreamHandler

func (h *BasicExtHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler)

func (*BasicExtHost) SetStreamHandlerMatch

func (h *BasicExtHost) SetStreamHandlerMatch(pid protocol.ID, match func(string) bool, handler network.StreamHandler)

type BasicFunctionsCloser

type BasicFunctionsCloser struct {
	Mutex             sync.Mutex
	Ended             bool
	EndHandler        *func()
	AdditionalHandler []func() error
	ErrorHandler      *func(error)
}

func (*BasicFunctionsCloser) Check

func (b *BasicFunctionsCloser) Check() bool

func (*BasicFunctionsCloser) Close

func (b *BasicFunctionsCloser) Close() error

func (*BasicFunctionsCloser) Raise

func (b *BasicFunctionsCloser) Raise(err error)

func (*BasicFunctionsCloser) SetCloseHandler

func (b *BasicFunctionsCloser) SetCloseHandler(handler func())

func (*BasicFunctionsCloser) SetErrorHandler

func (b *BasicFunctionsCloser) SetErrorHandler(handler func(error))

type BasicMasterComm

type BasicMasterComm struct {
	Mutex    sync.Mutex
	Param    Param
	Ctx      context.Context
	Comm     SlaveComm
	Standard standardFunctionsCloser
}

func (*BasicMasterComm) Check

func (c *BasicMasterComm) Check() bool

func (*BasicMasterComm) Close

func (c *BasicMasterComm) Close() error

func (*BasicMasterComm) Raise

func (c *BasicMasterComm) Raise(err error)

func (*BasicMasterComm) Reset

func (c *BasicMasterComm) Reset(i int, slaveId int)

func (*BasicMasterComm) SetCloseHandler

func (c *BasicMasterComm) SetCloseHandler(handler func())

func (*BasicMasterComm) SetErrorHandler

func (c *BasicMasterComm) SetErrorHandler(handler func(error))

func (*BasicMasterComm) SlaveComm

func (c *BasicMasterComm) SlaveComm() SlaveComm

type BasicMpi

type BasicMpi struct {
	ToClose    sync.Map
	Ctx        context.Context
	Pid        protocol.ID
	Maxsize    uint64
	Path       string
	Ipfs_store string
	MpiHost    ExtHost
	MpiStore   Store
	Id         safeInt
	Standard   standardFunctionsCloser

	NewSlaveComm       func(context.Context, ExtHost, io.ReadWriteCloser, protocol.ID, Param, Interface, []Remote) (SlaveComm, error)
	NewMasterSlaveComm func(context.Context, ExtHost, protocol.ID, Param, Interface, []Remote) (SlaveComm, error)
	NewMasterComm      func(context.Context, SlaveComm, Param) (MasterComm, error)
	NewInterface       func(ctx context.Context, file string, n int, i int, args ...string) (Interface, error)
	NewRemote          func(context.Context, int) (Remote, error)
	NewLogger          func(string, int, int) (func(string), error)
}

func (*BasicMpi) Add

func (m *BasicMpi) Add(file string) error

func (*BasicMpi) Check

func (m *BasicMpi) Check() bool

func (*BasicMpi) Close

func (m *BasicMpi) Close() error

func (*BasicMpi) Del

func (m *BasicMpi) Del(f string) error

func (*BasicMpi) Get

func (m *BasicMpi) Get(maxsize uint64) error

func (*BasicMpi) Host

func (m *BasicMpi) Host() ExtHost

func (*BasicMpi) Raise

func (m *BasicMpi) Raise(err error)

func (*BasicMpi) SetCloseHandler

func (m *BasicMpi) SetCloseHandler(handler func())

func (*BasicMpi) SetErrorHandler

func (m *BasicMpi) SetErrorHandler(handler func(error))

func (*BasicMpi) SetInitFunctions

func (m *BasicMpi) SetInitFunctions(
	newSlaveComm func(context.Context, ExtHost, io.ReadWriteCloser, protocol.ID, Param, Interface, []Remote) (SlaveComm, error),
	newMasterSlaveComm func(context.Context, ExtHost, protocol.ID, Param, Interface, []Remote) (SlaveComm, error),
	newMasterComm func(context.Context, SlaveComm, Param) (MasterComm, error),
	newInterface func(context.Context, string, int, int, ...string) (Interface, error),
	newRemote func(context.Context, int) (Remote, error),
	newLogger func(string, int, int) (func(string), error))

func (*BasicMpi) Start

func (m *BasicMpi) Start(file string, n int, args ...string) (err error)

func (*BasicMpi) Store

func (m *BasicMpi) Store() Store

type BasicRemote

type BasicRemote struct {
	Mutex sync.Mutex

	ReadChan      *safeChannelString
	HandshakeChan *safeChannelBool
	SendChan      *safeChannelString

	Ctx          context.Context
	Id           int
	ResetHandler *func(int, int)
	PingInterval time.Duration
	PingTimeout  time.Duration
	Sent         []interface{}
	Rw           io.ReadWriteCloser
	Received     int
	Standard     standardFunctionsCloser
}

func (*BasicRemote) Check

func (r *BasicRemote) Check() bool

func (*BasicRemote) Close

func (r *BasicRemote) Close() error

func (*BasicRemote) CloseRemote

func (r *BasicRemote) CloseRemote()

func (*BasicRemote) Get

func (r *BasicRemote) Get() string

func (*BasicRemote) Raise

func (r *BasicRemote) Raise(err error)

func (*BasicRemote) RequestReset

func (r *BasicRemote) RequestReset(i int, slaveId int)

func (*BasicRemote) Reset

func (r *BasicRemote) Reset(stream io.ReadWriteCloser, slaveId int, msgs ...interface{})

func (*BasicRemote) Send

func (r *BasicRemote) Send(msg string)

func (*BasicRemote) SendHandshake

func (r *BasicRemote) SendHandshake()

func (*BasicRemote) SetCloseHandler

func (r *BasicRemote) SetCloseHandler(handler func())

func (*BasicRemote) SetErrorHandler

func (r *BasicRemote) SetErrorHandler(handler func(error))

func (*BasicRemote) SetPingInterval

func (r *BasicRemote) SetPingInterval(interval time.Duration)

func (*BasicRemote) SetPingTimeout

func (r *BasicRemote) SetPingTimeout(timeoutDuration time.Duration)

func (*BasicRemote) SetResetHandler

func (r *BasicRemote) SetResetHandler(handler func(int, int))

func (*BasicRemote) SlaveId

func (r *BasicRemote) SlaveId() int

func (*BasicRemote) Stream

func (r *BasicRemote) Stream() io.ReadWriteCloser

func (*BasicRemote) WaitHandshake

func (r *BasicRemote) WaitHandshake()

type BasicSlaveComm

type BasicSlaveComm struct {
	SlaveIdMutex sync.Mutex
	RemotesMutex sync.Mutex

	Param Param

	Ctx      context.Context
	Inter    Interface
	CommHost ExtHost
	Base     protocol.ID
	Remotes  []Remote
	Standard standardFunctionsCloser
}

func (*BasicSlaveComm) Check

func (c *BasicSlaveComm) Check() bool

func (*BasicSlaveComm) Close

func (c *BasicSlaveComm) Close() error

func (*BasicSlaveComm) Connect

func (c *BasicSlaveComm) Connect(i int, addr peer.ID, msgs ...interface{}) error

func (*BasicSlaveComm) Host

func (c *BasicSlaveComm) Host() ExtHost

func (*BasicSlaveComm) Interface

func (c *BasicSlaveComm) Interface() Interface

func (*BasicSlaveComm) Protocol

func (c *BasicSlaveComm) Protocol() protocol.ID

func (*BasicSlaveComm) Raise

func (c *BasicSlaveComm) Raise(err error)

func (*BasicSlaveComm) Remote

func (c *BasicSlaveComm) Remote(idx int) Remote

func (*BasicSlaveComm) RequestReset

func (c *BasicSlaveComm) RequestReset(i int)

func (*BasicSlaveComm) SetCloseHandler

func (c *BasicSlaveComm) SetCloseHandler(handler func())

func (*BasicSlaveComm) SetErrorHandler

func (c *BasicSlaveComm) SetErrorHandler(handler func(error))

func (*BasicSlaveComm) Start

func (c *BasicSlaveComm) Start()

type CloseableBuffer

type CloseableBuffer struct {
	WritePipe         *io.PipeWriter
	ReadPipe          *io.PipeReader
	WritePipeReversed *io.PipeWriter
	ReadPipeReversed  *io.PipeReader
	WriteTimeout      time.Duration
	ReadTimeout       time.Duration
	Ended             bool
	Mutex             sync.Mutex
	Pid               protocol.ID
}

func (*CloseableBuffer) Close

func (b *CloseableBuffer) Close() error

func (*CloseableBuffer) Conn

func (b *CloseableBuffer) Conn() network.Conn

func (*CloseableBuffer) Protocol

func (b *CloseableBuffer) Protocol() protocol.ID

func (*CloseableBuffer) Read

func (b *CloseableBuffer) Read(p []byte) (int, error)

func (*CloseableBuffer) Reset

func (b *CloseableBuffer) Reset() error

func (*CloseableBuffer) Reverse

func (b *CloseableBuffer) Reverse() (SelfStream, error)

func (*CloseableBuffer) SetDeadline

func (b *CloseableBuffer) SetDeadline(t time.Time) error

func (*CloseableBuffer) SetProtocol

func (b *CloseableBuffer) SetProtocol(pid protocol.ID)

func (*CloseableBuffer) SetReadDeadline

func (b *CloseableBuffer) SetReadDeadline(t time.Time) error

func (*CloseableBuffer) SetWriteDeadline

func (b *CloseableBuffer) SetWriteDeadline(t time.Time) error

func (*CloseableBuffer) Stat

func (b *CloseableBuffer) Stat() network.Stat

func (*CloseableBuffer) Write

func (b *CloseableBuffer) Write(p []byte) (int, error)

type Config

type Config struct {
	Url            string
	Path           string
	Ipfs_store     string
	Maxsize        uint64
	Base           string
	BootstrapPeers addrList
}

type ExtHost

type ExtHost interface {
	host.Host

	PeerstoreProtocol(protocol.ID) (peerstore.Peerstore, error)
	NewPeer(protocol.ID) (peer.ID, error)
	Listen(protocol.ID, string)
	SelfStream(...protocol.ID) (SelfStream, error)
	// contains filtered or unexported methods
}

func NewHost

func NewHost(ctx context.Context, bootstrapPeers ...maddr.Multiaddr) (ExtHost, error)

type Interface

type Interface interface {
	Start()
	SetLogger(func(string))
	SetResetHandler(func(int))
	SetMessageHandler(func(int, string))
	SetRequestHandler(func(int))
	Push(string) error
	// contains filtered or unexported methods
}

func NewInterface

func NewInterface(ctx context.Context, file string, n int, i int, args ...string) (Interface, error)

type IpfsShell

type IpfsShell struct {
	Shell      *shell.Shell
	Store      []string
	Accessible []object
	Failed     []string
	Path       string
	IpfsStore  string
	Standard   standardFunctionsCloser
}

func (*IpfsShell) Add

func (s *IpfsShell) Add(f string)

func (*IpfsShell) Check

func (s *IpfsShell) Check() bool

func (*IpfsShell) Close

func (s *IpfsShell) Close() error

func (*IpfsShell) Del

func (s *IpfsShell) Del(f string, failed bool) error

func (*IpfsShell) Dowload

func (s *IpfsShell) Dowload(f string) error

func (*IpfsShell) Get

func (s *IpfsShell) Get(maxSize uint64) (string, error)

func (*IpfsShell) Has

func (s *IpfsShell) Has(f string) bool

func (*IpfsShell) List

func (s *IpfsShell) List() []string

func (*IpfsShell) Occupied

func (s *IpfsShell) Occupied() (uint64, error)

func (*IpfsShell) Raise

func (s *IpfsShell) Raise(err error)

func (*IpfsShell) SetCloseHandler

func (s *IpfsShell) SetCloseHandler(handler func())

func (*IpfsShell) SetErrorHandler

func (s *IpfsShell) SetErrorHandler(handler func(error))

type MasterComm

type MasterComm interface {
	SlaveComm() SlaveComm
	Reset(idx int, ith_time int)
	// contains filtered or unexported methods
}

func NewMasterComm

func NewMasterComm(ctx context.Context, slaveComm SlaveComm, param Param) (_ MasterComm, err error)

type Mpi

type Mpi interface {
	SetInitFunctions(
		newSlaveComm func(context.Context, ExtHost, io.ReadWriteCloser, protocol.ID, Param, Interface, []Remote) (SlaveComm, error),
		newMasterSlaveComm func(context.Context, ExtHost, protocol.ID, Param, Interface, []Remote) (SlaveComm, error),
		newMasterComm func(context.Context, SlaveComm, Param) (MasterComm, error),
		newInterface func(context.Context, string, int, int, ...string) (Interface, error),
		newRemote func(context.Context, int) (Remote, error),
		newLogger func(string, int, int) (func(string), error),
	)

	Add(string) error
	Del(string) error
	Get(uint64) error

	Host() ExtHost
	Store() Store
	Start(string, int, ...string) error
	// contains filtered or unexported methods
}

func NewMpi

func NewMpi(ctx context.Context, config Config, host ExtHost, store Store) (Mpi, error)

type Param

type Param struct {
	Init     bool
	Idx      int
	N        int
	Id       string
	SlaveIds []int
	Addrs    []peer.ID
}

func ParamFromString

func ParamFromString(msg string) (_ Param, err error)

func (*Param) String

func (p *Param) String() string

type Remote

type Remote interface {
	SlaveId() int
	SetResetHandler(func(int, int))
	RequestReset(int, int)
	CloseRemote()
	SetPingInterval(time.Duration)
	SetPingTimeout(time.Duration)
	Stream() io.ReadWriteCloser
	Reset(stream io.ReadWriteCloser, slaveId int, msgs ...interface{})
	Get() string
	WaitHandshake()
	Send(string)
	SendHandshake()
	// contains filtered or unexported methods
}

func NewRemote

func NewRemote(ctx context.Context, slaveId int) (Remote, error)

type SelfStream

type SelfStream interface {
	Reverse() (SelfStream, error)

	network.Stream
}

func NewStream

func NewStream(pid protocol.ID) SelfStream

type SlaveComm

type SlaveComm interface {
	Protocol() protocol.ID
	RequestReset(int)
	Start()
	Host() ExtHost
	Interface() Interface
	Remote(int) Remote
	Connect(int, peer.ID, ...interface{}) error
	// contains filtered or unexported methods
}

func NewMasterSlaveComm

func NewMasterSlaveComm(ctx context.Context, host ExtHost, base protocol.ID, param Param, inter Interface, Remotes []Remote) (_ SlaveComm, err error)

func NewSlaveComm

func NewSlaveComm(ctx context.Context, host ExtHost, zeroRw io.ReadWriteCloser, base protocol.ID, param Param, inter Interface, remotes []Remote) (_ SlaveComm, err error)

type StdInterface

type StdInterface struct {
	Ctx            context.Context
	Stdin          io.Writer
	MessageHandler *func(int, string)
	RequestHandler *func(int)
	ResetHandler   *func(int)
	Logger         *func(string)
	Idx            int
	Cmd            *exec.Cmd
	Standard       standardFunctionsCloser
}

func (*StdInterface) Check

func (s *StdInterface) Check() bool

func (*StdInterface) Close

func (s *StdInterface) Close() error

func (*StdInterface) Push

func (s *StdInterface) Push(msg string) error

func (*StdInterface) Raise

func (s *StdInterface) Raise(err error)

func (*StdInterface) SetCloseHandler

func (s *StdInterface) SetCloseHandler(handler func())

func (*StdInterface) SetErrorHandler

func (s *StdInterface) SetErrorHandler(handler func(error))

func (*StdInterface) SetLogger

func (s *StdInterface) SetLogger(handler func(string))

func (*StdInterface) SetMessageHandler

func (s *StdInterface) SetMessageHandler(handler func(int, string))

func (*StdInterface) SetRequestHandler

func (s *StdInterface) SetRequestHandler(handler func(int))

func (*StdInterface) SetResetHandler

func (s *StdInterface) SetResetHandler(handler func(int))

func (*StdInterface) Start

func (s *StdInterface) Start()

type Store

type Store interface {
	Add(string)
	List() []string
	Has(string) bool
	Del(name string, failed bool) error
	Dowload(string) error
	Occupied() (uint64, error)
	Get(uint64) (string, error)
	// contains filtered or unexported methods
}

func NewStore

func NewStore(url string, path string, ipfs_store string) (Store, error)

type StreamHandlerMatcher

type StreamHandlerMatcher struct {
	Match   func(string) bool
	Handler network.StreamHandler
}

Directories

Path Synopsis
messagestore module
store module
interfaces Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL