tentacle

v0.0.0-...-34879f3

This package is not in the latest version of its module.

Published: Dec 15, 2022 License: MIT Imports: 17 Imported by: 0

README

tentacle-go

Tentacle-go implementation

The tentacle framework has been running in production for a long time. This project is a verification project that tests the feasibility of a multi-language implementation of the tentacle framework. It tries to keep the implementation as performant as possible, but wider-ranging performance optimization is out of scope.

Development status

At present, the example can communicate normally with the Rust version.

Usage

Example
$ go build example/tentacle_example/simple.go
$ ./simple server

On another terminal:

$ ./simple

Now you can see some data interaction information on the terminal.

Communicate with the Rust version implementation
$ git clone https://github.com/nervosnetwork/tentacle.git
$ RUST_LOG=simple=info,tentacle=debug cargo run --example simple --features molc -- server

On another terminal:

$ go build example/tentacle_example/simple.go
$ ./simple

Alternatively, you can run the Go server and connect to it with the Rust client.

API

Most of the API is similar to the Rust version, mainly so that it can be compared with the production version, but the Go implementation has been streamlined and adapted to the Go language.

Documentation


Constants

const (
	// All tries to open all protocols; target is nil
	All uint8 = iota
	// Single tries to open one protocol; target is a ProtocolID/SessionID
	Single
	// Multi tries to open several protocols; target is a []ProtocolID/[]SessionID
	Multi
)
const (
	// SessionClose indicates that a session was closed
	SessionClose uint = iota
	// SessionOpen indicates that a session was opened
	SessionOpen
	// ListenClose indicates that a listener was closed
	ListenClose
	// ListenStarted indicates that a listener was started
	ListenStarted
)
const (
	// DialerError indicates an error when dialing a remote
	DialerError uint = iota
	// ListenError indicates an error when listening
	ListenError
	// ProtocolSelectError indicates that protocol selection failed
	ProtocolSelectError
	// ProtocolError indicates a protocol error during interaction
	ProtocolError
	// SessionTimeout indicates that, after the connection was initialized, the session did not open any protocol;
	// a fd attack is suspected
	SessionTimeout
	// MuxerError indicates a multiplex protocol error
	MuxerError
	// ProtocolHandleError indicates a protocol handle error, which will cause memory leaks or abnormal CPU usage
	ProtocolHandleError
)
const (
	// RepeatedConnection indicates a connection to an already connected peer; inner type is SessionID
	RepeatedConnection uint = iota
	// PeerIDNotMatch indicates that the peer id did not match when dialing a remote
	PeerIDNotMatch
	// HandshakeError indicates a handshake error
	HandshakeError
	// TransportError indicates a transport error
	TransportError
	// IoError indicates an IO error
	IoError
)

Variables

var DefaultBeforeReceive = func(b []byte) []byte {
	return b
}

DefaultBeforeReceive is used by default and does nothing.

var DefaultBeforeSend = func(b []byte) []byte {
	return b
}

DefaultBeforeSend is used by default and does nothing.

var DefaultCodec = func(conn net.Conn) Codec {
	return msgio.Combine(msgio.NewWriter(conn), msgio.NewReader(conn))
}

DefaultCodec is used by default; it is a LengthDelimitedCodec.
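To illustrate what a length-delimited codec does, here is a minimal standard-library sketch. It assumes a 4-byte big-endian length prefix in front of every message; the exact wire format used by msgio should be checked against its own documentation. The names `writeFrame`, `readFrame`, and `roundTrip` are hypothetical helpers, not part of this package.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame writes msg prefixed with its length.
// The 4-byte big-endian prefix is an assumption made for this sketch.
func writeFrame(w io.Writer, msg []byte) error {
	var header [4]byte
	binary.BigEndian.PutUint32(header[:], uint32(len(msg)))
	if _, err := w.Write(header[:]); err != nil {
		return err
	}
	_, err := w.Write(msg)
	return err
}

// readFrame reads one message that was written by writeFrame.
func readFrame(r io.Reader) ([]byte, error) {
	var header [4]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		return nil, err
	}
	msg := make([]byte, binary.BigEndian.Uint32(header[:]))
	if _, err := io.ReadFull(r, msg); err != nil {
		return nil, err
	}
	return msg, nil
}

// roundTrip writes every message into one buffer and reads them back,
// showing that message boundaries survive a byte stream.
func roundTrip(msgs ...string) []string {
	var buf bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&buf, []byte(m)); err != nil {
			panic(err)
		}
	}
	var out []string
	for {
		msg, err := readFrame(&buf)
		if err != nil {
			return out // io.EOF once the buffer is drained
		}
		out = append(out, string(msg))
	}
}

func main() {
	fmt.Println(roundTrip("hello", "tentacle"))
}
```

A custom CodecFn would wrap a net.Conn in the same way and expose the framing through ReadMsg and WriteMsg.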

var DefaultNameFn = func(id ProtocolID) string {
	return fmt.Sprintf("/p2p/%s", strconv.Itoa(int(id)))
}

DefaultNameFn returns the default protocol name.
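As a quick illustration of the names this produces, the sketch below copies the body of DefaultNameFn into a standalone program. `ProtocolID` is redeclared locally as a stand-in for the package type, and `customName` (with its "/myapp/" prefix) is a hypothetical alternative that could be passed to the builder's Name method.

```go
package main

import (
	"fmt"
	"strconv"
)

// ProtocolID is a local stand-in for the package's ProtocolID type.
type ProtocolID uint

// defaultName mirrors the body of DefaultNameFn shown above.
func defaultName(id ProtocolID) string {
	return fmt.Sprintf("/p2p/%s", strconv.Itoa(int(id)))
}

// customName is a hypothetical naming function with an application prefix.
func customName(id ProtocolID) string {
	return fmt.Sprintf("/myapp/%d", id)
}

func main() {
	fmt.Println(defaultName(1)) // /p2p/1
	fmt.Println(customName(1))  // /myapp/1
}
```

Both peers need to derive the same name for a protocol, otherwise protocol selection fails.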

var ErrBrokenPipe = errors.New("BrokenPipe")

ErrBrokenPipe indicates that the service has been shut down.

var ErrDialTimeout = errors.New("dial timeout")

ErrDialTimeout dial timeout

var ErrHandshakeTimeout = errors.New("handshake timeout")

ErrHandshakeTimeout secio handshake timeout

var ErrListenerTimeout = errors.New("listen timeout")

ErrListenerTimeout listen timeout

var ErrNotSupport = errors.New("Protocol doesn't support")

ErrNotSupport indicates that the protocol is not supported.

var ErrProtocolNotMatch = errors.New("Don't support this protocol")

ErrProtocolNotMatch indicates that the server does not support this protocol.

var ErrVersionNotMatch = errors.New("Can't find the same version")

ErrVersionNotMatch indicates that the node does not support this version.

Functions

func ExtractPeerID

func ExtractPeerID(addr ma.Multiaddr) (secio.PeerID, error)

ExtractPeerID gets the peer id from a multiaddr.

func SelectVersion

func SelectVersion(local, remote []string) (string, error)

SelectVersion chooses the highest version supported by both sides; it assumes that the slices are sorted.
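The selection rule can be sketched as a two-pointer walk from the high end of both slices. This is only an illustration of the idea, not the package's implementation: `selectHighestCommon` and `errNoCommonVersion` are hypothetical names, and the sketch assumes both slices are sorted in ascending string order.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoCommonVersion is a hypothetical error for this sketch.
var errNoCommonVersion = errors.New("no common version")

// selectHighestCommon returns the highest version present in both slices.
// Both slices are assumed to be sorted in ascending string order.
func selectHighestCommon(local, remote []string) (string, error) {
	i, j := len(local)-1, len(remote)-1
	for i >= 0 && j >= 0 {
		switch {
		case local[i] == remote[j]:
			return local[i], nil
		case local[i] > remote[j]:
			i-- // local[i] is higher than anything left on the remote side
		default:
			j-- // remote[j] is higher than anything left on the local side
		}
	}
	return "", errNoCommonVersion
}

func main() {
	v, err := selectHighestCommon(
		[]string{"0.0.1", "0.0.2", "0.1.0"},
		[]string{"0.0.2", "0.0.3"},
	)
	fmt.Println(v, err)
}
```

A function with this shape matches the SelectFn signature, so a custom rule can be plugged in through the builder's SelectVersion method.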

Types

type BeforeReceive

type BeforeReceive func([]byte) []byte

BeforeReceive applies unified processing to messages before the user receives them.

type BeforeSend

type BeforeSend func([]byte) []byte

BeforeSend applies unified processing to messages before they are sent.
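A matching pair of hooks might look like the following sketch. The two function types are redeclared locally so the example runs on its own, and the one-byte `frameTag` marker is a made-up transformation chosen only for illustration. It assumes both peers install the corresponding hooks.

```go
package main

import "fmt"

// Local stand-ins for the package's hook types.
type BeforeSend func([]byte) []byte
type BeforeReceive func([]byte) []byte

// frameTag is a hypothetical marker byte used only in this sketch.
const frameTag byte = 0x01

// tagOnSend prepends the marker to every outgoing message.
var tagOnSend BeforeSend = func(b []byte) []byte {
	out := make([]byte, 0, len(b)+1)
	out = append(out, frameTag)
	return append(out, b...)
}

// stripOnReceive removes the marker from incoming messages when present.
var stripOnReceive BeforeReceive = func(b []byte) []byte {
	if len(b) > 0 && b[0] == frameTag {
		return b[1:]
	}
	return b
}

func main() {
	wire := tagOnSend([]byte("ping"))
	fmt.Println(len(wire), string(stripOnReceive(wire)))
}
```

The same pattern applies to compression or checksums: whatever BeforeSend adds, BeforeReceive on the other side has to undo.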

type Codec

type Codec interface {
	// ReadMsg reads the next message from the Reader
	ReadMsg() ([]byte, error)
	// WriteMsg writes the passed msg
	WriteMsg([]byte) error
	io.Closer
}

Codec is used on a protocol stream to encode and decode messages.

type CodecFn

type CodecFn func(net.Conn) Codec

CodecFn generates a codec.

type DialerErrorInner

type DialerErrorInner struct {
	Tag   uint
	Inner interface{}
	// remote addr
	Addr ma.Multiaddr
}

DialerErrorInner describes an error that occurred while dialing a remote.

func (*DialerErrorInner) Name

func (d *DialerErrorInner) Name() string

Name returns the error tag name.

func (*DialerErrorInner) String

func (d *DialerErrorInner) String() string

type ListenErrorInner

type ListenErrorInner struct {
	Tag   uint
	Inner interface{}
	// Listen address
	Addr ma.Multiaddr
}

ListenErrorInner describes an error that occurred while listening.

func (*ListenErrorInner) Name

func (d *ListenErrorInner) Name() string

Name returns the error tag name.

func (*ListenErrorInner) String

func (d *ListenErrorInner) String() string

type MetaBuilder

type MetaBuilder struct {
	// contains filtered or unexported fields
}

MetaBuilder is a builder for ProtocolMeta.

func DefaultMeta

func DefaultMeta() *MetaBuilder

DefaultMeta makes a default builder.

func (*MetaBuilder) BeforeReceive

func (m *MetaBuilder) BeforeReceive(beforeRecv BeforeReceive) *MetaBuilder

BeforeReceive sets the unified processing applied to messages before the user receives them.

func (*MetaBuilder) BeforeSend

func (m *MetaBuilder) BeforeSend(beforeSend BeforeSend) *MetaBuilder

BeforeSend sets the unified processing applied to messages before they are sent.

func (*MetaBuilder) Build

func (m *MetaBuilder) Build() ProtocolMeta

Build combines the configuration of this builder to create a ProtocolMeta.

func (*MetaBuilder) Codec

func (m *MetaBuilder) Codec(codec CodecFn) *MetaBuilder

Codec defines the protocol codec; the default is LengthDelimitedCodec.

func (*MetaBuilder) ID

func (m *MetaBuilder) ID(pid ProtocolID) *MetaBuilder

ID defines the protocol id.

It is just an internal index of the system that identifies the open/close and message transfer for the specified protocol.

func (*MetaBuilder) Name

func (m *MetaBuilder) Name(name NameFn) *MetaBuilder

Name defines the protocol name; the default is "/p2p/protocol_id".

It is used when interacting with the remote service to determine whether the protocol is supported.

If the protocol is not found, the protocol connection (only the substream, not the session) is closed and a `ProtocolSelectError` event is returned.

func (*MetaBuilder) ProtoSpawn

func (m *MetaBuilder) ProtoSpawn(protoSpawn ProtocolSpawn) *MetaBuilder

ProtoSpawn defines the spawn process of the protocol's read part.

It is mutually exclusive with the protocol handle.

func (*MetaBuilder) SelectVersion

func (m *MetaBuilder) SelectVersion(selectfn SelectFn) *MetaBuilder

SelectVersion sets the protocol version selection rule; the default is `SelectVersion`.

func (*MetaBuilder) ServiceHandle

func (m *MetaBuilder) ServiceHandle(service ServiceProtocol) *MetaBuilder

ServiceHandle defines the protocol's service handle; the default is nil.

func (*MetaBuilder) SessionHandle

func (m *MetaBuilder) SessionHandle(sessionFn SessionProtocolFn) *MetaBuilder

SessionHandle defines the protocol's session handle; the default is nil.

func (*MetaBuilder) SupportVersions

func (m *MetaBuilder) SupportVersions(versions []string) *MetaBuilder

SupportVersions defines the versions the protocol supports; the default is `[]string{"0.0.1"}`.

It is used when interacting with the remote service to confirm that both parties open the same version of the protocol.

If no common version is found, the protocol connection (only the substream, not the session) is closed and a `ProtocolSelectError` event is returned.

type MuxerErrorInner

type MuxerErrorInner struct {
	Context *SessionContext
	Err     error
}

MuxerErrorInner multiplex protocol error

func (*MuxerErrorInner) String

func (s *MuxerErrorInner) String() string

type NameFn

type NameFn = func(ProtocolID) string

NameFn defines the protocol name; the default is "/p2p/protocol_id".

It is used when interacting with the remote service to determine whether the protocol is supported.

If the protocol is not found, the protocol connection (only the substream, not the session) is closed and a `ProtocolSelectError` event is returned.

type ProtocolContext

type ProtocolContext struct {
	*ServiceContext
	Pid ProtocolID
}

ProtocolContext context with current protocol

type ProtocolContextRef

type ProtocolContextRef struct {
	*ProtocolContext
	*SessionContext
}

ProtocolContextRef context with current protocol and session

func (*ProtocolContextRef) QuickSendMessage

func (c *ProtocolContextRef) QuickSendMessage(data []byte)

QuickSendMessage sends a message to the current protocol of the current session on the quick channel.

func (*ProtocolContextRef) SendMessage

func (c *ProtocolContextRef) SendMessage(data []byte)

SendMessage sends a message to the current protocol of the current session.

type ProtocolErrorInner

type ProtocolErrorInner struct {
	SID SessionID
	PID ProtocolID
	// Codec error
	Err error
}

ProtocolErrorInner protocol error during interaction

func (*ProtocolErrorInner) String

func (p *ProtocolErrorInner) String() string

type ProtocolHandleErrorInner

type ProtocolHandleErrorInner struct {
	PID ProtocolID
	// If SID == 0, the session that caused this error could not be located
	SID SessionID
}

ProtocolHandleErrorInner is inner msg of this error

func (*ProtocolHandleErrorInner) String

func (p *ProtocolHandleErrorInner) String() string

type ProtocolID

type ProtocolID uint

ProtocolID define the protocol id

type ProtocolInfo

type ProtocolInfo struct {
	// contains filtered or unexported fields
}

ProtocolInfo is the handshake message of open protocol

type ProtocolMeta

type ProtocolMeta struct {
	// contains filtered or unexported fields
}

ProtocolMeta define the minimum data required for a custom protocol

type ProtocolSelectErrorInner

type ProtocolSelectErrorInner struct {
	// Protocol name. If empty, the failure was a timeout or another network problem;
	// if set, the remote does not support this protocol
	Name string
	// Session context
	Context *SessionContext
}

ProtocolSelectErrorInner protocol select fail

func (*ProtocolSelectErrorInner) String

func (p *ProtocolSelectErrorInner) String() string

type ProtocolSpawn

type ProtocolSpawn interface {
	// Spawn is called when the protocol is opened.
	// It is assumed that the user will use a go statement internally to run the Reader in a separate goroutine
	Spawn(ctx *SessionContext, control *Service, read SubstreamReadPart)
}

When negotiation is complete and the protocol is opened, the implementation is called, which lets users implement the protocol's read processing themselves.

Implementing this interface makes it possible to stream-read directly from the underlying substream.

This interface implementation and the callback implementation are mutually exclusive. This is checked during construction, and if both exist, it panics.
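The read loop that a Spawn implementation is expected to start can be sketched as below. The SubstreamReadPart interface is redeclared locally with the methods listed in this documentation; the `ctx` and `control` parameters of Spawn are left out to keep the sketch self-contained. `fakeReader`, `spawnReader`, and `collect` are hypothetical, and it is an assumption that NextMsg returns an error once the substream ends.

```go
package main

import (
	"fmt"
	"io"
)

// Local stand-ins for the package types.
type ProtocolID uint

type SubstreamReadPart interface {
	NextMsg() (msg []byte, err error)
	ProtocolID() ProtocolID
	Version() string
}

// fakeReader is a hypothetical in-memory reader used only for this sketch.
type fakeReader struct{ msgs [][]byte }

func (f *fakeReader) NextMsg() ([]byte, error) {
	if len(f.msgs) == 0 {
		return nil, io.EOF // assumption: an error marks the end of the stream
	}
	msg := f.msgs[0]
	f.msgs = f.msgs[1:]
	return msg, nil
}
func (f *fakeReader) ProtocolID() ProtocolID { return 1 }
func (f *fakeReader) Version() string        { return "0.0.1" }

// spawnReader drains read in its own goroutine, forwards each message to out,
// and closes out when the reader reports an error.
func spawnReader(read SubstreamReadPart, out chan<- string) {
	go func() {
		defer close(out)
		for {
			msg, err := read.NextMsg()
			if err != nil {
				return
			}
			out <- string(msg)
		}
	}()
}

// collect feeds msgs through a fakeReader and gathers what the goroutine forwards.
func collect(msgs ...string) []string {
	reader := &fakeReader{}
	for _, m := range msgs {
		reader.msgs = append(reader.msgs, []byte(m))
	}
	out := make(chan string)
	spawnReader(reader, out)
	var got []string
	for m := range out {
		got = append(got, m)
	}
	return got
}

func main() {
	fmt.Println(collect("a", "b", "c"))
}
```

Starting the goroutine inside Spawn keeps the call itself short, which matches the assumption stated in the interface comment.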

type SelectFn

type SelectFn func([]string, []string) (string, error)

SelectFn is the function for protocol version select

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the user-facing handle of the service.

func (*Service) CloseProtocol

func (s *Service) CloseProtocol(sid SessionID, pid ProtocolID) error

CloseProtocol tries to close a protocol; if the protocol is already closed, it does nothing.

func (*Service) Dial

func (s *Service) Dial(addr ma.Multiaddr, target TargetProtocol) error

Dial initiates a connection request to the address; if the service is shut down or the address is not supported, it returns an error.

func (*Service) Disconnect

func (s *Service) Disconnect(id SessionID) error

Disconnect a connection

func (*Service) FilterBroadcast

func (s *Service) FilterBroadcast(target TargetSession, pid ProtocolID, data []byte) error

FilterBroadcast send data to the specified protocol for the specified sessions.

func (*Service) IsShutdown

func (s *Service) IsShutdown() bool

IsShutdown reports whether the service has been shut down.

func (*Service) Key

func (s *Service) Key() secio.PrivKey

Key get local private key

func (*Service) Listen

func (s *Service) Listen(addr ma.Multiaddr) (ma.Multiaddr, error)

Listen creates a new listener, blocks until the listen completes, and returns the listen address.

func (*Service) ListenAsync

func (s *Service) ListenAsync(addr ma.Multiaddr) error

ListenAsync tries to create a new listener; if the service is shut down or the address is not supported, it returns an error.

func (*Service) OpenProtocol

func (s *Service) OpenProtocol(sid SessionID, pid ProtocolID) error

OpenProtocol tries to open a protocol; if the protocol is already open, it does nothing.

func (*Service) OpenProtocols

func (s *Service) OpenProtocols(sid SessionID, target TargetProtocol) error

OpenProtocols tries to open protocols; if a protocol is already open, it does nothing.

func (*Service) QuickFilterBroadcast

func (s *Service) QuickFilterBroadcast(target TargetSession, pid ProtocolID, data []byte) error

QuickFilterBroadcast send data to the specified protocol for the specified sessions on quick channel

func (*Service) QuickSendMessageTo

func (s *Service) QuickSendMessageTo(id SessionID, pid ProtocolID, data []byte) error

QuickSendMessageTo send message on quick channel

func (*Service) RemoveServiceNotify

func (s *Service) RemoveServiceNotify(pid ProtocolID, token uint64) error

RemoveServiceNotify remove a service notify token

func (*Service) RemoveSessionNotify

func (s *Service) RemoveSessionNotify(sid SessionID, pid ProtocolID, token uint64) error

RemoveSessionNotify remove a session notify token

func (*Service) SendMessageTo

func (s *Service) SendMessageTo(id SessionID, pid ProtocolID, data []byte) error

SendMessageTo send message

func (*Service) SetServiceNotify

func (s *Service) SetServiceNotify(pid ProtocolID, interval time.Duration, token uint64) error

SetServiceNotify set a service notify token

func (*Service) SetSessionNotify

func (s *Service) SetSessionNotify(sid SessionID, pid ProtocolID, interval time.Duration, token uint64) error

SetSessionNotify set a session notify token

func (*Service) Shutdown

func (s *Service) Shutdown() error

Shutdown shuts down the service in this order: 1. close all listeners; 2. try to close all sessions' protocol streams; 3. try to close all sessions; 4. close the service.

type ServiceBuilder

type ServiceBuilder struct {
	// contains filtered or unexported fields
}

ServiceBuilder builder for Service

func DefaultServiceBuilder

func DefaultServiceBuilder() *ServiceBuilder

DefaultServiceBuilder create a default empty builder

func (*ServiceBuilder) Build

func (s *ServiceBuilder) Build(handle ServiceHandle) *Service

Build combine the configuration of this builder with service handle to create a Service.

func (*ServiceBuilder) ChannelSize

func (s *ServiceBuilder) ChannelSize(size uint) *ServiceBuilder

ChannelSize sets the size of each channel used in tentacle.

Default is 128

func (*ServiceBuilder) Forever

func (s *ServiceBuilder) Forever(forever bool) *ServiceBuilder

Forever controls whether the service keeps running when it has no tasks. By default, the service is turned off when it has no tasks; set this to true to keep it running.

func (*ServiceBuilder) InsertProtocol

func (s *ServiceBuilder) InsertProtocol(protocol ProtocolMeta) *ServiceBuilder

InsertProtocol insert a custom protocol

func (*ServiceBuilder) KeyPair

func (s *ServiceBuilder) KeyPair(key secio.PrivKey) *ServiceBuilder

KeyPair enable encrypted communication mode.

If you do not need encrypted communication, you do not need to call this method

func (*ServiceBuilder) MaxConnectionNumber

func (s *ServiceBuilder) MaxConnectionNumber(num uint) *ServiceBuilder

MaxConnectionNumber sets the limit on the maximum number of open connections (file descriptors). If it is not limited, the service tries to serve as many connections as possible until it exhausts system resources (OS error); it then closes the listener and no longer accepts new connection requests, while the established connections keep working.

Default is 65535

func (*ServiceBuilder) TCPBind

func (s *ServiceBuilder) TCPBind(addr ma.Multiaddr) *ServiceBuilder

TCPBind is used to bind all TCP sessions to the listen port.

func (*ServiceBuilder) TimeOut

func (s *ServiceBuilder) TimeOut(timeout time.Duration) *ServiceBuilder

TimeOut sets the timeout for handshake and connect. Default is 10 seconds.

func (*ServiceBuilder) WsBind

func (s *ServiceBuilder) WsBind(addr ma.Multiaddr) *ServiceBuilder

WsBind is used to bind all WebSocket sessions to the listen port.

func (*ServiceBuilder) YamuxConfig

func (s *ServiceBuilder) YamuxConfig(config *yamux.Config) *ServiceBuilder

YamuxConfig sets the yamux configuration for the service.

type ServiceContext

type ServiceContext struct {
	Listens []ma.Multiaddr
	Key     secio.PrivKey
	// contains filtered or unexported fields
}

ServiceContext context with current service

func (*ServiceContext) CloseProtocol

func (s *ServiceContext) CloseProtocol(sid SessionID, pid ProtocolID)

CloseProtocol tries to close a protocol; if the protocol is already closed, it does nothing.

func (*ServiceContext) Dial

func (s *ServiceContext) Dial(addr ma.Multiaddr, target TargetProtocol) error

Dial initiates a connection request to the address; if the address is not supported, it returns an error.

func (*ServiceContext) Disconnect

func (s *ServiceContext) Disconnect(id SessionID)

Disconnect a connection

func (*ServiceContext) FilterBroadcast

func (s *ServiceContext) FilterBroadcast(target TargetSession, pid ProtocolID, data []byte)

FilterBroadcast send data to the specified protocol for the specified sessions.

func (*ServiceContext) ListenAsync

func (s *ServiceContext) ListenAsync(addr ma.Multiaddr) error

ListenAsync tries to create a new listener; if the address is not supported, it returns an error.

func (*ServiceContext) OpenProtocol

func (s *ServiceContext) OpenProtocol(sid SessionID, pid ProtocolID)

OpenProtocol tries to open a protocol; if the protocol is already open, it does nothing.

func (*ServiceContext) OpenProtocols

func (s *ServiceContext) OpenProtocols(sid SessionID, target TargetProtocol)

OpenProtocols tries to open protocols; if a protocol is already open, it does nothing.

func (*ServiceContext) QuickFilterBroadcast

func (s *ServiceContext) QuickFilterBroadcast(target TargetSession, pid ProtocolID, data []byte)

QuickFilterBroadcast send data to the specified protocol for the specified sessions on quick channel

func (*ServiceContext) QuickSendMessageTo

func (s *ServiceContext) QuickSendMessageTo(id SessionID, pid ProtocolID, data []byte)

QuickSendMessageTo send message on quick channel

func (*ServiceContext) RemoveServiceNotify

func (s *ServiceContext) RemoveServiceNotify(pid ProtocolID, token uint64)

RemoveServiceNotify remove a service notify token

func (*ServiceContext) RemoveSessionNotify

func (s *ServiceContext) RemoveSessionNotify(sid SessionID, pid ProtocolID, token uint64)

RemoveSessionNotify remove a session notify token

func (*ServiceContext) SendMessageTo

func (s *ServiceContext) SendMessageTo(id SessionID, pid ProtocolID, data []byte)

SendMessageTo send message

func (*ServiceContext) SetServiceNotify

func (s *ServiceContext) SetServiceNotify(pid ProtocolID, interval time.Duration, token uint64)

SetServiceNotify set a service notify token

func (*ServiceContext) SetSessionNotify

func (s *ServiceContext) SetSessionNotify(sid SessionID, pid ProtocolID, interval time.Duration, token uint64)

SetSessionNotify set a session notify token

func (*ServiceContext) Shutdown

func (s *ServiceContext) Shutdown()

Shutdown shuts down the service in this order: 1. close all listeners; 2. try to close all sessions' protocol streams; 3. try to close all sessions; 4. close the service.

type ServiceError

type ServiceError struct {
	Tag   uint
	Event interface{}
}

ServiceError error generated by the Service

func (*ServiceError) Name

func (s *ServiceError) Name() string

Name returns the error tag name.

func (*ServiceError) String

func (s *ServiceError) String() string

type ServiceEvent

type ServiceEvent struct {
	Tag   uint
	Event interface{}
}

ServiceEvent event generated by the Service

func (*ServiceEvent) Name

func (s *ServiceEvent) Name() string

Name returns the event tag name.

func (*ServiceEvent) String

func (s *ServiceEvent) String() string

type ServiceHandle

type ServiceHandle interface {
	// Handling runtime errors
	HandleError(*ServiceContext, ServiceError)
	// Handling session establishment and disconnection events
	HandleEvent(*ServiceContext, ServiceEvent)
}

ServiceHandle is a handle to do something by service

Behavior

This handle exists from the moment the Service is created.

It mainly handles Service-level errors thrown at runtime, such as listening errors.

Session establishment and disconnection messages are also observed here.
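Inside HandleEvent, the Tag field is what distinguishes the kinds of event. The sketch below redeclares the event constants and the ServiceEvent struct locally, in the same order as listed under Constants, and dispatches on the tag. `describe` is a hypothetical helper, and the sketch deliberately does not inspect the Event payload, since its concrete types are not documented here.

```go
package main

import "fmt"

// Local copies of the event tags, in the order listed under Constants.
const (
	SessionClose uint = iota
	SessionOpen
	ListenClose
	ListenStarted
)

// ServiceEvent is a local stand-in for the package type.
type ServiceEvent struct {
	Tag   uint
	Event interface{}
}

// describe maps an event tag to a short human-readable description.
func describe(ev ServiceEvent) string {
	switch ev.Tag {
	case SessionClose:
		return "session closed"
	case SessionOpen:
		return "session opened"
	case ListenClose:
		return "listener closed"
	case ListenStarted:
		return "listener started"
	default:
		return "unknown event"
	}
}

func main() {
	fmt.Println(describe(ServiceEvent{Tag: SessionOpen}))
	fmt.Println(describe(ServiceEvent{Tag: ListenStarted}))
}
```

HandleError can dispatch on ServiceError.Tag in the same way, using the error constants instead.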

type ServiceProtocol

type ServiceProtocol interface {
	// This function is called when the service starts.
	//
	// The service handle will only be called once
	Init(*ProtocolContext)
	// Called when opening protocol
	Connected(ctx *ProtocolContextRef, version string)
	// Called when closing protocol
	Disconnected(*ProtocolContextRef)
	// Called when the corresponding protocol message is received
	Received(ctx *ProtocolContextRef, data []byte)
	// Called when the Service receives the notify task
	Notify(ctx *ProtocolContext, token uint64)
}

ServiceProtocol is Service level protocol handle

Behavior

Defines the behavior of each custom protocol in each state.

The runtime behaves differently depending on whether the user defines a service handle or a session-exclusive handle.

The important difference is that the service handle may hold state, whereas the session-exclusive handle is "stateless" by comparison: it can only retain information between the opening and closing of one protocol stream.

Opening and closing the protocol creates and cleans up the session-exclusive handle, whereas the service handle keeps its state until the service is closed.
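The difference in lifetime can be made concrete with a small simulation. Everything here is hypothetical and heavily simplified: `serviceHandler` stands for one long-lived service handle, `sessionHandler` for a handle created per protocol stream by a factory that plays the role of SessionProtocolFn, and both only count received messages.

```go
package main

import "fmt"

// serviceHandler stands for a service-level handle shared by all sessions.
type serviceHandler struct{ received int }

func (h *serviceHandler) Received(data []byte) { h.received++ }

// sessionHandler stands for a session-exclusive handle.
type sessionHandler struct{ received int }

func (h *sessionHandler) Received(data []byte) { h.received++ }

// simulate opens one protocol stream per entry in messagesPerSession and
// delivers that many messages to both kinds of handle.
func simulate(messagesPerSession []int) (serviceTotal int, perSession []int) {
	svc := &serviceHandler{}
	newSession := func() *sessionHandler { return &sessionHandler{} }
	for _, n := range messagesPerSession {
		sess := newSession() // created when the protocol opens
		for i := 0; i < n; i++ {
			svc.Received(nil)
			sess.Received(nil)
		}
		perSession = append(perSession, sess.received)
		// sess is dropped here, as when the protocol closes
	}
	return svc.received, perSession
}

func main() {
	total, per := simulate([]int{1, 2, 3})
	fmt.Println(total, per) // 6 [1 2 3]
}
```

The service handle sees the running total across all sessions, while each session handle only ever sees its own stream.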

type SessionContext

type SessionContext struct {
	// Sid session id
	Sid SessionID
	// Outbound or Inbound
	Ty SessionType
	// remote addr
	RemoteAddr ma.Multiaddr
	// remote pubkey, may be nil when secio is not used
	RemotePub secio.PubKey
	// contains filtered or unexported fields
}

SessionContext context with current session

type SessionID

type SessionID uint

SessionID index of session

type SessionProtocol

type SessionProtocol interface {
	// Called when opening protocol
	Connected(ctx *ProtocolContextRef, version string)
	// Called when closing protocol
	Disconnected(*ProtocolContextRef)
	// Called when the corresponding protocol message is received
	Received(ctx *ProtocolContextRef, data []byte)
	// Called when the session receives the notify task
	Notify(ctx *ProtocolContextRef, token uint64)
}

SessionProtocol is Session level protocol handle

type SessionProtocolFn

type SessionProtocolFn func() SessionProtocol

SessionProtocolFn generate SessionProtocol

type SessionTimeoutInner

type SessionTimeoutInner struct {
	Context *SessionContext
}

SessionTimeoutInner is reported when, after the connection is initialized, the session does not open any protocol; a fd attack is suspected.

func (*SessionTimeoutInner) String

func (s *SessionTimeoutInner) String() string

type SessionType

type SessionType uint8

SessionType is either Outbound or Inbound. Outbound means that you are the active party, that is, the client side. Inbound means that you are the passive recipient, that is, the server side.

func (*SessionType) Name

func (t *SessionType) Name() string

Name type name

func (*SessionType) String

func (t *SessionType) String() string

type SubstreamReadPart

type SubstreamReadPart interface {
	// Get next message
	NextMsg() (msg []byte, err error)
	// Get protocol id
	ProtocolID() ProtocolID
	// Get protocol version
	Version() string
}

SubstreamReadPart is the read part for the protocol side

type TargetProtocol

type TargetProtocol struct {
	// must use All/Single/Multi
	Tag    uint8
	Target interface{}
}

TargetProtocol specifies which protocols to open when dialing.

type TargetSession

type TargetSession struct {
	// must use All/Single/Multi
	Tag    uint8
	Target interface{}
}

TargetSession selects the sessions to send a message to.
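How a tagged target selects sessions can be sketched as follows. The constants, `SessionID`, and `TargetSession` are redeclared locally to match the definitions on this page; `matches` is a hypothetical helper, and the library's own matching logic may differ.

```go
package main

import "fmt"

// Local stand-ins matching the definitions on this page.
type SessionID uint

const (
	All uint8 = iota
	Single
	Multi
)

type TargetSession struct {
	// must use All/Single/Multi
	Tag    uint8
	Target interface{}
}

// matches reports whether id is selected by target.
// A Target of the wrong dynamic type selects nothing.
func matches(target TargetSession, id SessionID) bool {
	switch target.Tag {
	case All:
		return true
	case Single:
		want, ok := target.Target.(SessionID)
		return ok && want == id
	case Multi:
		ids, ok := target.Target.([]SessionID)
		if !ok {
			return false
		}
		for _, want := range ids {
			if want == id {
				return true
			}
		}
		return false
	default:
		return false
	}
}

func main() {
	multi := TargetSession{Tag: Multi, Target: []SessionID{1, 3}}
	for id := SessionID(1); id <= 3; id++ {
		fmt.Println(id, matches(multi, id))
	}
}
```

Because Target is an interface{}, the tag and the dynamic type have to agree; TargetProtocol follows the same convention with ProtocolID values.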
