transport

package
v1.0.3
Published: May 16, 2024 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 2 more Imports: 29 Imported by: 16

README


Background

tRPC frameworks support multiple network protocols, such as TCP and UDP. With UDP, each packet corresponds to exactly one RPC request or response. With stream-oriented protocols such as TCP, the framework needs an additional packet-splitting mechanism to find message boundaries. To isolate these differences between network protocols, tRPC-Go provides a transport abstraction.

Principle

In tRPC-Go:

  • The client transport is responsible for establishing a connection with the peer and providing advanced features such as multiplexing;
  • The server transport is responsible for listening on a socket, accepting incoming connections, and processing the requests that arrive on each connection.

For streaming RPC, tRPC-Go also provides a set of transport abstractions.

Below we will introduce the design and implementation of these transports in turn.

ClientTransport

The interface definition of ClientTransport is as follows:

type ClientTransport interface {
    RoundTrip(ctx context.Context, req []byte, opts ...RoundTripOption) (rsp []byte, err error)
}

The RoundTrip method sends a request and receives the response. It supports multiple connection modes, such as connection pooling and multiplexing, as well as the high-performance network library tnet. These settings are passed as RoundTripOption values, for example:

rsp, err := transport.RoundTrip(ctx, req,
    transport.WithDialNetwork("tcp"),
    transport.WithDialAddress(":8888"),
    transport.WithMultiplexed(true))
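
The options above follow Go's functional-options pattern: each With... function returns a closure that mutates an options struct. The sketch below illustrates the pattern with local stand-in types. The lower-case names, the trimmed-down struct, and the "tcp" default are assumptions made for illustration, not the package's actual implementation.

```go
package main

import "fmt"

// roundTripOptions is a hypothetical, trimmed-down stand-in for
// transport.RoundTripOptions.
type roundTripOptions struct {
	Network           string
	Address           string
	EnableMultiplexed bool
}

// roundTripOption has the same shape as transport.RoundTripOption:
// a function that modifies the options struct.
type roundTripOption func(*roundTripOptions)

func withDialNetwork(network string) roundTripOption {
	return func(o *roundTripOptions) { o.Network = network }
}

func withDialAddress(address string) roundTripOption {
	return func(o *roundTripOptions) { o.Address = address }
}

func withMultiplexed(enable bool) roundTripOption {
	return func(o *roundTripOptions) { o.EnableMultiplexed = enable }
}

// applyOptions starts from an assumed default ("tcp") and applies each
// option in order, so later options override earlier ones.
func applyOptions(opts ...roundTripOption) roundTripOptions {
	o := roundTripOptions{Network: "tcp"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withDialAddress(":8888"), withMultiplexed(true))
	fmt.Printf("%+v\n", o)
}
```

Because every option is an ordinary function value, callers can build option slices conditionally and pass them with opts... at the call site.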

ServerTransport

The interface of ServerTransport is defined as follows:

type ServerTransport interface {
    ListenAndServe(ctx context.Context, opts ...ListenServeOption) error
}

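Just as the client has RoundTripOption, the server has two kinds of options. ServerTransportOption is passed to NewServerTransport and sets transport-wide properties such as the connection idle timeout, the keep-alive period, and port reuse. ListenServeOption is passed to ListenAndServe and sets per-service properties such as asynchronous processing, the idle timeout, and TLS certificates.

st := transport.NewServerTransport(transport.WithReusePort(true))
err := st.ListenAndServe(ctx, transport.WithServerAsync(true))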

ClientStreamTransport

ClientStreamTransport is used to send/receive streaming requests. Because the stream is created by the client, it provides the Init method to initialize the stream, such as establishing a network connection with the peer.

type ClientStreamTransport interface {
    Send(ctx context.Context, req []byte, opts ...RoundTripOption) error
    Recv(ctx context.Context, opts ...RoundTripOption) ([]byte, error)
    Init(ctx context.Context, opts ...RoundTripOption) error
    Close(ctx context.Context)
}

The client stream transport uses the same RoundTripOption as the ordinary RPC transport, and its underlying connection also supports multiplexing, etc.

ServerStreamTransport

ServerStreamTransport is used for server-side processing of streaming requests. When the server receives the client's Init packet, it creates a new goroutine to run the user's business logic, and the original network packet receiving goroutine is responsible for dispatching the received packets to the new goroutine.
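
The dispatch model described above can be sketched with plain channels. In this sketch the packet type, the stream IDs, and the buffer size of 8 are all hypothetical; it shows only the idea of one receiving loop feeding per-stream goroutines, not the package's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// packet is a hypothetical decoded stream frame.
type packet struct {
	streamID uint32
	init     bool   // true for the packet that opens a stream
	data     string // payload of a non-init packet
}

// dispatch plays the role of the network receiving goroutine: on an Init
// packet it starts a new goroutine for the stream, and it forwards every
// later packet to the goroutine that owns that stream.
func dispatch(packets []packet, handle func(id uint32, in <-chan string)) {
	streams := make(map[uint32]chan string)
	var wg sync.WaitGroup
	for _, p := range packets {
		if p.init {
			ch := make(chan string, 8)
			streams[p.streamID] = ch
			wg.Add(1)
			go func(id uint32) {
				defer wg.Done()
				handle(id, ch) // the user's business logic runs here
			}(p.streamID)
			continue
		}
		// Packets for unknown streams are dropped in this sketch.
		if ch, ok := streams[p.streamID]; ok {
			ch <- p.data
		}
	}
	for _, ch := range streams {
		close(ch) // signal end of input to every stream goroutine
	}
	wg.Wait()
}

// collect runs dispatch with a handler that records what each stream saw.
func collect(packets []packet) map[uint32][]string {
	var mu sync.Mutex
	got := make(map[uint32][]string)
	dispatch(packets, func(id uint32, in <-chan string) {
		for msg := range in {
			mu.Lock()
			got[id] = append(got[id], msg)
			mu.Unlock()
		}
	})
	return got
}

func main() {
	got := collect([]packet{
		{streamID: 1, init: true},
		{streamID: 2, init: true},
		{streamID: 1, data: "a"},
		{streamID: 2, data: "x"},
		{streamID: 1, data: "b"},
	})
	fmt.Println(got[1], got[2])
}
```

Each stream has a single channel and a single consumer, so messages within one stream keep their arrival order even though different streams run concurrently.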

type ServerStreamTransport interface {
    ServerTransport
    Send(ctx context.Context, req []byte) error
    Close(ctx context.Context)
}

Note that ServerStreamTransport embeds ServerTransport, which is used to listen on the port and create the corresponding network goroutine. Therefore, the ListenServeOption of ordinary RPC is also applicable to the streaming server.

Split Package

A tRPC packet is composed of a frame header, a packet header, and a packet body. When the server receives a request or the client receives a response (this also applies to streaming requests), the raw data stream must be split into individual packets, which are then handed to the corresponding processing logic. codec.FramerBuilder and codec.Framer are used to split the data stream.

On the client side, you can set the framer builder through WithClientFramerBuilder. On the server side, you can set it through WithServerFramerBuilder.
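
To make the splitting step concrete, here is a minimal sketch of a builder/framer pair for a made-up wire format: a 4-byte big-endian length followed by the body. The local framer and framerBuilder interfaces are assumed stand-ins for codec.Framer and codec.FramerBuilder, and the format is not the tRPC frame layout.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// framer and framerBuilder are local stand-ins whose shapes are assumed
// for this sketch.
type framer interface {
	ReadFrame() ([]byte, error)
}

type framerBuilder interface {
	New(r io.Reader) framer
}

// lengthPrefixedFramer reads frames of the form: 4-byte length, then body.
type lengthPrefixedFramer struct{ r io.Reader }

func (f *lengthPrefixedFramer) ReadFrame() ([]byte, error) {
	var head [4]byte
	if _, err := io.ReadFull(f.r, head[:]); err != nil {
		return nil, err // io.EOF here means a clean end of stream
	}
	// A production framer should reject lengths above a configured maximum.
	body := make([]byte, binary.BigEndian.Uint32(head[:]))
	if _, err := io.ReadFull(f.r, body); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF // header promised a body that never came
		}
		return nil, err
	}
	return body, nil
}

type lengthPrefixedBuilder struct{}

func (lengthPrefixedBuilder) New(r io.Reader) framer {
	return &lengthPrefixedFramer{r: r}
}

// encode prepends the 4-byte length header to body.
func encode(body string) []byte {
	buf := make([]byte, 4+len(body))
	binary.BigEndian.PutUint32(buf, uint32(len(body)))
	copy(buf[4:], body)
	return buf
}

// splitAll reads frames from stream until it is exhausted.
func splitAll(stream []byte) ([]string, error) {
	var fb framerBuilder = lengthPrefixedBuilder{}
	f := fb.New(bytes.NewReader(stream))
	var frames []string
	for {
		frame, err := f.ReadFrame()
		if err == io.EOF {
			return frames, nil
		}
		if err != nil {
			return frames, err
		}
		frames = append(frames, string(frame))
	}
}

func main() {
	stream := append(encode("hello"), encode("tRPC")...)
	frames, err := splitAll(stream)
	fmt.Println(frames, err)
}
```

The builder exists so that the transport can create one framer per connection: the framer holds per-connection read state, while the builder itself stays stateless and shareable.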

Documentation

Overview

Package transport is the network transport layer. It is only used for basic binary data network communication without any business logic. By default, there is only one pluggable ServerTransport and ClientTransport.

Constants

const (
	Connected    = false // UDP which isolates packets from non-same path
	NotConnected = true  // UDP which allows returning packets from non-same path
)

ConnectionMode of UDP.

const (
	// EnvGraceRestart is the flag of graceful restart.
	EnvGraceRestart = "TRPC_IS_GRACEFUL"

	// EnvGraceFirstFd is the fd of graceful first listener.
	EnvGraceFirstFd = "TRPC_GRACEFUL_1ST_LISTENFD"

	// EnvGraceRestartFdNum is the number of fd for graceful restart.
	EnvGraceRestartFdNum = "TRPC_GRACEFUL_LISTENFD_NUM"

	// EnvGraceRestartPPID is the PPID of graceful restart.
	EnvGraceRestartPPID = "TRPC_GRACEFUL_PPID"
)

Variables

var (
	// LocalAddrContextKey is the local address context key.
	LocalAddrContextKey = &contextKey{"local-addr"}

	// RemoteAddrContextKey is the remote address context key.
	RemoteAddrContextKey = &contextKey{"remote-addr"}
)

var DefaultClientStreamTransport = NewClientStreamTransport()

DefaultClientStreamTransport is the default client stream transport.

var DefaultClientTransport = NewClientTransport()

DefaultClientTransport is the default client transport.

var DefaultServerStreamTransport = NewServerStreamTransport()

DefaultServerStreamTransport is the default ServerStreamTransport.

var DefaultServerTransport = NewServerTransport(WithReusePort(true))

DefaultServerTransport is the default implementation of ServerTransport.

Functions

func GetFramerBuilder

func GetFramerBuilder(name string) codec.FramerBuilder

GetFramerBuilder gets the FramerBuilder by name.

func GetPassedListener

func GetPassedListener(network, address string) (interface{}, error)

GetPassedListener gets the inherited listener from parent process by network and address.

func ListenAndServe

func ListenAndServe(opts ...ListenServeOption) error

ListenAndServe wraps and starts the default server transport.

func RegisterClientStreamTransport

func RegisterClientStreamTransport(name string, t ClientStreamTransport)

RegisterClientStreamTransport registers a named ClientStreamTransport.

func RegisterClientTransport

func RegisterClientTransport(name string, t ClientTransport)

RegisterClientTransport registers a named ClientTransport.
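
The Register and Get functions in this package form a name-to-implementation registry. The following sketch shows that idea with local types. The clientTransport interface is simplified (no options parameter), and the mutex-guarded map and the echoTransport type are assumptions for illustration rather than the package's internals.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// clientTransport is a simplified local stand-in for ClientTransport.
type clientTransport interface {
	RoundTrip(ctx context.Context, req []byte) (rsp []byte, err error)
}

var (
	mu         sync.RWMutex
	transports = make(map[string]clientTransport)
)

// register stores t under name, replacing any previous entry.
func register(name string, t clientTransport) {
	mu.Lock()
	defer mu.Unlock()
	transports[name] = t
}

// get returns the transport registered under name, or nil if none exists.
func get(name string) clientTransport {
	mu.RLock()
	defer mu.RUnlock()
	return transports[name]
}

// echoTransport is a hypothetical transport that answers locally
// instead of touching the network.
type echoTransport struct{}

func (echoTransport) RoundTrip(ctx context.Context, req []byte) ([]byte, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return append([]byte("echo: "), req...), nil
}

// call looks up a transport by name and performs one round trip.
func call(name, req string) (string, error) {
	t := get(name)
	if t == nil {
		return "", fmt.Errorf("transport %q is not registered", name)
	}
	rsp, err := t.RoundTrip(context.Background(), []byte(req))
	return string(rsp), err
}

func main() {
	register("echo", echoTransport{})
	fmt.Println(call("echo", "ping"))
}
```

A registry like this lets a plugin swap in its own transport at init time without the caller changing any code beyond the name it asks for.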

func RegisterFramerBuilder

func RegisterFramerBuilder(name string, fb codec.FramerBuilder)

RegisterFramerBuilder registers a named codec.FramerBuilder.

func RegisterServerStreamTransport

func RegisterServerStreamTransport(name string, t ServerStreamTransport)

RegisterServerStreamTransport registers a named ServerStreamTransport.

func RegisterServerTransport

func RegisterServerTransport(name string, t ServerTransport)

RegisterServerTransport registers a named ServerTransport.

func RemoteAddrFromContext

func RemoteAddrFromContext(ctx context.Context) net.Addr

RemoteAddrFromContext gets remote address from context.
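
LocalAddrContextKey and RemoteAddrContextKey are pointer-typed context keys, and RemoteAddrFromContext reads the value stored under one of them. The sketch below reproduces that pattern with local names. The helper withRemoteAddr and the nil return on a missing value are assumptions for illustration.

```go
package main

import (
	"context"
	"fmt"
	"net"
)

// contextKey is an unexported type, so keys built from it cannot collide
// with keys defined in other packages.
type contextKey struct{ name string }

// remoteAddrKey is a local stand-in for RemoteAddrContextKey.
var remoteAddrKey = &contextKey{"remote-addr"}

// withRemoteAddr is a hypothetical helper that stores addr in the context,
// as a server transport might do before invoking the handler.
func withRemoteAddr(ctx context.Context, addr net.Addr) context.Context {
	return context.WithValue(ctx, remoteAddrKey, addr)
}

// remoteAddrFrom returns the stored address, or nil if none was stored.
func remoteAddrFrom(ctx context.Context) net.Addr {
	addr, ok := ctx.Value(remoteAddrKey).(net.Addr)
	if !ok {
		return nil
	}
	return addr
}

// demo stores an address, reads it back, and also checks an empty context.
func demo() (stored string, missingIsNil bool) {
	addr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8888}
	ctx := withRemoteAddr(context.Background(), addr)
	return remoteAddrFrom(ctx).String(), remoteAddrFrom(context.Background()) == nil
}

func main() {
	fmt.Println(demo())
}
```

Using a pointer to a private struct as the key means equality is by identity, so two keys with the same name string are still distinct.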

func RoundTrip

func RoundTrip(ctx context.Context, req []byte, opts ...RoundTripOption) ([]byte, error)

RoundTrip wraps and starts the default client transport.

func SaveListener

func SaveListener(listener interface{}) error

SaveListener saves the listener.

Types

type ClientStreamTransport

type ClientStreamTransport interface {
	// Send sends stream messages.
	Send(ctx context.Context, req []byte, opts ...RoundTripOption) error
	// Recv receives stream messages.
	Recv(ctx context.Context, opts ...RoundTripOption) ([]byte, error)
	// Init inits the stream.
	Init(ctx context.Context, opts ...RoundTripOption) error
	// Close closes stream transport, return connection to the resource pool.
	Close(ctx context.Context)
}

ClientStreamTransport is the client stream transport interface. It's compatible with common RPC transport.

func GetClientStreamTransport

func GetClientStreamTransport(name string) ClientStreamTransport

GetClientStreamTransport returns ClientStreamTransport by name.

func NewClientStreamTransport

func NewClientStreamTransport(opts ...ClientStreamTransportOption) ClientStreamTransport

NewClientStreamTransport creates a new ClientStreamTransport.

type ClientStreamTransportOption

type ClientStreamTransportOption func(*cstOptions)

ClientStreamTransportOption sets properties of ClientStreamTransport.

func WithMaxConcurrentStreams

func WithMaxConcurrentStreams(n int) ClientStreamTransportOption

WithMaxConcurrentStreams sets the maximum concurrent streams in each TCP connection.

func WithMaxIdleConnsPerHost

func WithMaxIdleConnsPerHost(n int) ClientStreamTransportOption

WithMaxIdleConnsPerHost sets the maximum idle connections per host.

type ClientTransport

type ClientTransport interface {
	RoundTrip(ctx context.Context, req []byte, opts ...RoundTripOption) (rsp []byte, err error)
}

ClientTransport defines the client transport layer interface.

func GetClientTransport

func GetClientTransport(name string) ClientTransport

GetClientTransport gets the ClientTransport.

func NewClientTransport

func NewClientTransport(opt ...ClientTransportOption) ClientTransport

NewClientTransport creates a new ClientTransport.

type ClientTransportOption

type ClientTransportOption func(*ClientTransportOptions)

ClientTransportOption modifies the ClientTransportOptions.

func WithDisableEncodeTransInfoBase64

func WithDisableEncodeTransInfoBase64() ClientTransportOption

WithDisableEncodeTransInfoBase64 returns a ClientTransportOption which disables base64 encoding of the transinfo value in HTTP.

type ClientTransportOptions

type ClientTransportOptions struct {
	DisableHTTPEncodeTransInfoBase64 bool
}

ClientTransportOptions is the client transport options.

type CloseHandler

type CloseHandler interface {
	HandleClose(ctx context.Context) error
}

CloseHandler handles the logic that runs after a connection is closed.

type ConnectionMode

type ConnectionMode bool

ConnectionMode is the connection mode, either Connected or NotConnected.

type Framer

type Framer = codec.Framer

Framer is the alias of codec.Framer.

type FramerBuilder

type FramerBuilder = codec.FramerBuilder

FramerBuilder is the alias of codec.FramerBuilder.

type Handler

type Handler interface {
	Handle(ctx context.Context, req []byte) (rsp []byte, err error)
}

Handler processes each packet that the server transport receives.
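
As an illustration of the Handler contract (bytes in, bytes out), the sketch below defines a local copy of the interface and a function adapter in the style of http.HandlerFunc. The adapter handlerFunc and the upper-casing handler are hypothetical and are not part of this package.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
)

// handler is a local copy of the Handler interface shown above.
type handler interface {
	Handle(ctx context.Context, req []byte) (rsp []byte, err error)
}

// handlerFunc is a hypothetical adapter that lets an ordinary function
// satisfy handler.
type handlerFunc func(ctx context.Context, req []byte) ([]byte, error)

func (f handlerFunc) Handle(ctx context.Context, req []byte) ([]byte, error) {
	return f(ctx, req)
}

// upper is a toy business handler: it upper-cases the request body.
var upper handler = handlerFunc(func(ctx context.Context, req []byte) ([]byte, error) {
	if len(req) == 0 {
		return nil, errors.New("empty request")
	}
	return bytes.ToUpper(req), nil
})

// serveOne stands in for the transport calling the handler with one
// already-framed request.
func serveOne(h handler, req string) (string, error) {
	rsp, err := h.Handle(context.Background(), []byte(req))
	return string(rsp), err
}

func main() {
	fmt.Println(serveOne(upper, "ping"))
}
```

With the real package, a value implementing Handler would be passed to the transport through WithHandler.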

type ListenFd

type ListenFd struct {
	Fd      uintptr
	Name    string
	Network string
	Address string
}

ListenFd is the listener fd.

func GetListenersFds

func GetListenersFds() []*ListenFd

GetListenersFds gets listener fds.

type ListenServeOption

type ListenServeOption func(*ListenServeOptions)

ListenServeOption modifies the ListenServeOptions.

func WithCopyFrame

func WithCopyFrame(copyFrame bool) ListenServeOption

WithCopyFrame returns a ListenServeOption which sets whether to copy frames. In streaming RPC, stream processing is asynchronous even when the server uses sync mode, so frames need to be copied to avoid being overwritten.

func WithDisableKeepAlives

func WithDisableKeepAlives(disable bool) ListenServeOption

WithDisableKeepAlives returns a ListenServeOption which disables keep-alives.

func WithHandler

func WithHandler(handler Handler) ListenServeOption

WithHandler returns a ListenServeOption which sets business Handler.

func WithListenAddress

func WithListenAddress(address string) ListenServeOption

WithListenAddress returns a ListenServeOption which sets the listening address.

func WithListenNetwork

func WithListenNetwork(network string) ListenServeOption

WithListenNetwork returns a ListenServeOption which sets listen network.

func WithListener

func WithListener(lis net.Listener) ListenServeOption

WithListener returns a ListenServeOption which allows users to use their customized listener for specific accept/read/write logics.

func WithMaxRoutines

func WithMaxRoutines(routines int) ListenServeOption

WithMaxRoutines returns a ListenServeOption which sets the maximum number of async goroutines. It's recommended to reserve twice the expected number of goroutines, but no fewer than MAXPROCS. The default value is (1<<31 - 1). This option takes effect only when async mode is enabled; it is ignored in sync mode.
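
One common way to cap the number of handler goroutines is a counting semaphore built from a buffered channel. The sketch below shows that technique only. The function runBounded is hypothetical, and the package may well use a different mechanism, such as a goroutine pool.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded handles every request on its own goroutine but never lets more
// than maxRoutines handlers run at once. It returns the peak concurrency
// that was observed.
func runBounded(requests []string, maxRoutines int, handle func(string)) int32 {
	if maxRoutines < 1 {
		maxRoutines = 1 // a zero-capacity semaphore would block forever
	}
	sem := make(chan struct{}, maxRoutines)
	var wg sync.WaitGroup
	var running, peak int32
	for _, req := range requests {
		sem <- struct{}{} // blocks while maxRoutines handlers are in flight
		wg.Add(1)
		go func(req string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot last
			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if this goroutine raised it
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			handle(req)
			atomic.AddInt32(&running, -1)
		}(req)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	reqs := []string{"a", "b", "c", "d", "e"}
	peak := runBounded(reqs, 2, func(string) { time.Sleep(10 * time.Millisecond) })
	fmt.Println("peak concurrency:", peak)
}
```

With a cap in place, a burst of requests makes the receiving loop wait for a free slot instead of creating an unbounded number of goroutines.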

func WithServeTLS

func WithServeTLS(certFile, keyFile, caFile string) ListenServeOption

WithServeTLS returns a ListenServeOption which sets the TLS-related files: the server certificate, the server key, and the CA certificate.

func WithServerAsync

func WithServerAsync(serverAsync bool) ListenServeOption

WithServerAsync returns a ListenServeOption which sets whether the server handles requests asynchronously. When other frameworks call tRPC, they may use long-lived connections; without async mode the tRPC server cannot process the requests on such a connection concurrently, which can lead to timeouts. This option takes effect for each TCP connection.

func WithServerFramerBuilder

func WithServerFramerBuilder(fb codec.FramerBuilder) ListenServeOption

WithServerFramerBuilder returns a ListenServeOption which sets the server framer builder.

func WithServerIdleTimeout

func WithServerIdleTimeout(timeout time.Duration) ListenServeOption

WithServerIdleTimeout returns a ListenServeOption which sets the server idle timeout.

func WithServiceName

func WithServiceName(name string) ListenServeOption

WithServiceName returns a ListenServeOption which sets the service name.

func WithStopListening added in v1.0.3

func WithStopListening(ch <-chan struct{}) ListenServeOption

WithStopListening returns a ListenServeOption which notifies the transport to stop listening.
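
The stop channel can be pictured as follows: a watcher goroutine closes the listener once the channel is closed, which makes the accept loop return. This is a sketch under assumptions, using only the standard library. The function serveUntil is hypothetical and does not show how the package treats connections that are already open.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// serveUntil accepts connections on ln until stop is closed. Closing stop is
// treated as a normal shutdown, so the resulting Accept error is swallowed.
func serveUntil(ln net.Listener, stop <-chan struct{}) error {
	done := make(chan struct{})
	defer close(done) // lets the watcher exit if Accept fails on its own
	go func() {
		select {
		case <-stop:
			ln.Close() // unblocks the pending Accept call
		case <-done:
		}
	}()
	for {
		conn, err := ln.Accept()
		if err != nil {
			select {
			case <-stop:
				return nil // we were asked to stop listening
			default:
				return err // a genuine accept failure
			}
		}
		// A real server would hand conn to a serving goroutine here.
		conn.Close()
	}
}

// demo starts a loopback listener, signals stop, and checks that the
// listener is closed afterwards.
func demo() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	stop := make(chan struct{})
	errc := make(chan error, 1)
	go func() { errc <- serveUntil(ln, stop) }()
	close(stop)
	if err := <-errc; err != nil {
		return err
	}
	if _, err := ln.Accept(); !errors.Is(err, net.ErrClosed) {
		return errors.New("listener is still open after stop")
	}
	return nil
}

func main() {
	fmt.Println("demo error:", demo())
}
```

A receive-only channel suits this purpose because closing it broadcasts the signal to every transport that was given the same channel.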

func WithWritev

func WithWritev(writev bool) ListenServeOption

WithWritev returns a ListenServeOption which enables writev.

type ListenServeOptions

type ListenServeOptions struct {
	ServiceName   string
	Address       string
	Network       string
	Handler       Handler
	FramerBuilder codec.FramerBuilder
	Listener      net.Listener

	CACertFile  string        // ca certification file
	TLSCertFile string        // server certification file
	TLSKeyFile  string        // server key file
	Routines    int           // size of goroutine pool
	ServerAsync bool          // whether enable server async
	Writev      bool          // whether enable writev in server
	CopyFrame   bool          // whether copy frame
	IdleTimeout time.Duration // idle timeout of connection

	// DisableKeepAlives, if true, disables keep-alives and only use the
	// connection for a single request.
	// This used for rpc transport layer like http, it's unrelated to
	// the TCP keep-alives.
	DisableKeepAlives bool

	// StopListening is used to instruct the server transport to stop listening.
	StopListening <-chan struct{}
}

ListenServeOptions is the server options on start.

type RequestType

type RequestType = codec.RequestType

RequestType is the client request type, such as SendAndRecv or SendOnly.

const (
	SendAndRecv RequestType = codec.SendAndRecv // send and receive
	SendOnly    RequestType = codec.SendOnly    // send only
)

Request types.

type RoundTripOption

type RoundTripOption func(*RoundTripOptions)

RoundTripOption modifies the RoundTripOptions.

func WithClientFramerBuilder

func WithClientFramerBuilder(builder codec.FramerBuilder) RoundTripOption

WithClientFramerBuilder returns a RoundTripOption which sets FramerBuilder.

func WithConnectionMode

func WithConnectionMode(connMode ConnectionMode) RoundTripOption

WithConnectionMode returns a RoundTripOption which sets UDP connection mode.

func WithDialAddress

func WithDialAddress(address string) RoundTripOption

WithDialAddress returns a RoundTripOption which sets dial address.

func WithDialNetwork

func WithDialNetwork(network string) RoundTripOption

WithDialNetwork returns a RoundTripOption which sets dial network.

func WithDialPassword

func WithDialPassword(password string) RoundTripOption

WithDialPassword returns a RoundTripOption which sets dial password.

func WithDialPool

func WithDialPool(pool connpool.Pool) RoundTripOption

WithDialPool returns a RoundTripOption which sets dial pool.

func WithDialTLS

func WithDialTLS(certFile, keyFile, caFile, serverName string) RoundTripOption

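WithDialTLS returns a RoundTripOption which sets the TLS-related settings: the client certificate file, the client key file, the CA certificate file, and the server name.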

func WithDialTimeout

func WithDialTimeout(dur time.Duration) RoundTripOption

WithDialTimeout returns a RoundTripOption which sets dial timeout.

func WithDisableConnectionPool

func WithDisableConnectionPool() RoundTripOption

WithDisableConnectionPool returns a RoundTripOption which disables connection pool.

func WithLocalAddr

func WithLocalAddr(addr string) RoundTripOption

WithLocalAddr returns a RoundTripOption which sets the local address. By default, when there are multiple NICs, the local address is selected at random.

func WithMsg

func WithMsg(msg codec.Msg) RoundTripOption

WithMsg returns a RoundTripOption which sets msg.

func WithMultiplexed

func WithMultiplexed(enable bool) RoundTripOption

WithMultiplexed returns a RoundTripOption which enables or disables multiplexing.

func WithMultiplexedPool

func WithMultiplexedPool(p multiplexed.Pool) RoundTripOption

WithMultiplexedPool returns a RoundTripOption which sets the multiplexed pool. This function also enables multiplexing.

func WithProtocol

func WithProtocol(s string) RoundTripOption

WithProtocol returns a RoundTripOption which sets protocol name, such as trpc.

func WithReqType

func WithReqType(reqType RequestType) RoundTripOption

WithReqType returns a RoundTripOption which sets request type.

type RoundTripOptions

type RoundTripOptions struct {
	Address               string // IP:Port. Note: address has been resolved from naming service.
	Password              string
	Network               string // tcp/udp
	LocalAddr             string // a random selected local address when accept a connection.
	DialTimeout           time.Duration
	Pool                  connpool.Pool // client connection pool
	ReqType               RequestType   // SendAndRecv, SendOnly
	FramerBuilder         codec.FramerBuilder
	ConnectionMode        ConnectionMode
	DisableConnectionPool bool // disable connection pool
	EnableMultiplexed     bool // enable multiplexed
	Multiplexed           multiplexed.Pool
	Msg                   codec.Msg
	Protocol              string // protocol type

	CACertFile    string // CA certificate file
	TLSCertFile   string // client certificate file
	TLSKeyFile    string // client key file
	TLSServerName string // the name when client verifies the server, default as HTTP hostname
}

RoundTripOptions is the options for one roundtrip.

type ServerStreamTransport

type ServerStreamTransport interface {
	// ServerTransport is used to keep compatibility with common RPC transport.
	ServerTransport
	// Send sends messages.
	Send(ctx context.Context, req []byte) error
	// Close is called when server encounters an error and cleans up.
	Close(ctx context.Context)
}

ServerStreamTransport is the server stream transport interface. It's compatible with common RPC transport.

func GetServerStreamTransport

func GetServerStreamTransport(name string) ServerStreamTransport

GetServerStreamTransport returns ServerStreamTransport by name.

func NewServerStreamTransport

func NewServerStreamTransport(opt ...ServerTransportOption) ServerStreamTransport

NewServerStreamTransport creates a new ServerTransport, which is wrapped in serverStreamTransport as the return ServerStreamTransport interface.

type ServerTransport

type ServerTransport interface {
	ListenAndServe(ctx context.Context, opts ...ListenServeOption) error
}

ServerTransport defines the server transport layer interface.

func GetServerTransport

func GetServerTransport(name string) ServerTransport

GetServerTransport gets the ServerTransport.

func NewServerTransport

func NewServerTransport(opt ...ServerTransportOption) ServerTransport

NewServerTransport creates a new ServerTransport.

type ServerTransportOption

type ServerTransportOption func(*ServerTransportOptions)

ServerTransportOption modifies the ServerTransportOptions.

func WithIdleTimeout

func WithIdleTimeout(timeout time.Duration) ServerTransportOption

WithIdleTimeout returns a ServerTransportOption which sets the server connection idle timeout.

func WithKeepAlivePeriod

func WithKeepAlivePeriod(d time.Duration) ServerTransportOption

WithKeepAlivePeriod returns a ServerTransportOption which sets the period to keep TCP connection alive. It's not available for TLS, since a TLS connection is not a net.TCPConn.

func WithRecvMsgChannelSize

func WithRecvMsgChannelSize(size int) ServerTransportOption

WithRecvMsgChannelSize returns a ServerTransportOption which sets the size of receive buf of ServerTransport TCP.

func WithRecvUDPPacketBufferSize

func WithRecvUDPPacketBufferSize(size int) ServerTransportOption

WithRecvUDPPacketBufferSize returns a ServerTransportOption which sets the pre-allocated buffer size of ServerTransport UDP.

func WithRecvUDPRawSocketBufSize

func WithRecvUDPRawSocketBufSize(size int) ServerTransportOption

WithRecvUDPRawSocketBufSize returns a ServerTransportOption which sets the size of the operating system's receive buffer associated with the UDP connection.

func WithReusePort

func WithReusePort(reuse bool) ServerTransportOption

WithReusePort returns a ServerTransportOption which sets whether port reuse is enabled.

func WithSendMsgChannelSize

func WithSendMsgChannelSize(size int) ServerTransportOption

WithSendMsgChannelSize returns a ServerTransportOption which sets the size of sendCh of ServerTransport TCP.

type ServerTransportOptions

type ServerTransportOptions struct {
	RecvMsgChannelSize      int
	SendMsgChannelSize      int
	RecvUDPPacketBufferSize int
	RecvUDPRawSocketBufSize int
	IdleTimeout             time.Duration
	KeepAlivePeriod         time.Duration
	ReusePort               bool
}

ServerTransportOptions is options of the server transport.

Directories

Path            Synopsis
internal/frame  Package frame contains transport-level frame utilities.
tnet            Package tnet provides tRPC-Go transport implementation for tnet networking framework.
tnet/multiplex  Package multiplex implements a connection pool that supports connection multiplexing.