server

package
v1.8.29 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2024 License: Apache-2.0 Imports: 46 Imported by: 882

Documentation

Index

Constants

View Source
// Names in the X-RPCX-* family, one per rpcx message attribute (version,
// message type, heartbeat flag, oneway flag, status type, serialize type,
// message ID, service path, service method, metadata, error message).
// NOTE(review): presumably these are the HTTP header names used when an
// rpcx request is carried over HTTP (see HTTPRequest2RpcxRequest) — confirm.
const (
	XVersion           = "X-RPCX-Version"
	XMessageType       = "X-RPCX-MessageType"
	XHeartbeat         = "X-RPCX-Heartbeat"
	XOneway            = "X-RPCX-Oneway"
	XMessageStatusType = "X-RPCX-MessageStatusType"
	XSerializeType     = "X-RPCX-SerializeType"
	XMessageID         = "X-RPCX-MessageID"
	XServicePath       = "X-RPCX-ServicePath"
	XServiceMethod     = "X-RPCX-ServiceMethod"
	XMeta              = "X-RPCX-Meta"
	XErrorMessage      = "X-RPCX-ErrorMessage"
)
View Source
// JSON-RPC error codes.
// -32700, -32600, -32601, -32602 and -32603 are the codes predefined by the
// JSON-RPC 2.0 specification; -32001 lies in the range the specification
// reserves for implementation-defined server errors (-32000 to -32099).
// NOTE(review): presumably these are the values placed in JSONRPCError.Code — confirm.
const (
	// CodeUnknownJSONRPCError should be used for all non coded errors.
	CodeUnknownJSONRPCError = -32001
	// CodeParseJSONRPCError is used when invalid JSON was received by the server.
	CodeParseJSONRPCError = -32700
	// CodeInvalidjsonrpcRequest is used when the JSON sent is not a valid jsonrpcRequest object.
	CodeInvalidjsonrpcRequest = -32600
	// CodeMethodNotFound should be returned by the handler when the method does
	// not exist / is not available.
	CodeMethodNotFound = -32601
	// CodeInvalidParams should be returned by the handler when method
	// parameter(s) were invalid.
	CodeInvalidParams = -32602
	// CodeInternalJSONRPCError is not currently returned but defined for completeness.
	CodeInternalJSONRPCError = -32603
)
View Source
const (
	// ReaderBuffsize is used for bufio reader.
	ReaderBuffsize = 1024
	// WriterBuffsize is used for bufio writer.
	WriterBuffsize = 1024
)
View Source
const (
	// RPCXHandlerMetrics is the name of HandlerViewer
	RPCXHandlerMetrics = "rpcx_handler"
)
View Source
const (
	// RPCXProcessTimeMetrics is the name of ProcessTimeViewer
	RPCXProcessTimeMetrics = "rpcx_processtime"
)
View Source
const (
	// RPCXRequestRateMetrics is the name of RequestRateViewer
	RPCXRequestRateMetrics = "rpcx_request_rate"
)

Variables

View Source
// Sentinel errors of the server package.
var (
	// ErrServerClosed is the sentinel error for a server that has been closed.
	// NOTE(review): the message carries an "http:" prefix even though this is
	// the rpcx server package — confirm whether that is intentional.
	ErrServerClosed  = errors.New("http: Server closed")
	// ErrReqReachLimit reports that a request reached the rate limit.
	ErrReqReachLimit = errors.New("request reached rate limit")
)

ErrServerClosed is returned by the Server's serving methods, such as Serve, after a call to Shutdown or Close.

View Source
// Context keys defined by the server package. Each key is a distinct
// *contextKey pointer, so it cannot collide with keys from other packages.
var (
	// RemoteConnContextKey is a context key. Services can use it with
	// ctx.Value(RemoteConnContextKey) to access the connection the request arrived on.
	// The associated value will be of type net.Conn.
	RemoteConnContextKey = &contextKey{"remote-conn"}
	// StartRequestContextKey (named "start-parse-request") records a start time.
	// NOTE(review): presumably the time at which parsing of the request began — confirm.
	StartRequestContextKey = &contextKey{"start-parse-request"}
	// StartSendRequestContextKey (named "start-send-request") records a start time.
	// NOTE(review): presumably the time at which sending of a request began — confirm.
	StartSendRequestContextKey = &contextKey{"start-send-request"}
	// TagContextKey is used to record extra info in handling services. Its value is a map[string]interface{}
	TagContextKey = &contextKey{"service-tag"}
	// HttpConnContextKey is used to store the HTTP connection.
	HttpConnContextKey = &contextKey{"http-conn"}
)
View Source
var ErrNotAccept = errors.New("server refused the connection")

Functions

func HTTPRequest2RpcxRequest

func HTTPRequest2RpcxRequest(r *http.Request) (*protocol.Message, error)

HTTPRequest2RpcxRequest converts a http request to a rpcx request.

func NewHandlerViewer added in v1.8.18

func NewHandlerViewer(s *Server) viewer.Viewer

NewHandlerViewer returns the HandlerViewer instance

func NewProcessTimeViewer added in v1.8.18

func NewProcessTimeViewer(s *Server) viewer.Viewer

NewProcessTimeViewer returns the ProcessTimeViewer instance

func NewRequestRateViewer added in v1.8.18

func NewRequestRateViewer(s *Server) viewer.Viewer

NewRequestRateViewer returns the RequestRateViewer instance

func RegisterMakeListener

func RegisterMakeListener(network string, ml MakeListener)

RegisterMakeListener registers a MakeListener for network.

Types

type CMuxPlugin added in v1.6.3

type CMuxPlugin interface {
	MuxMatch(m cmux.CMux)
}

type CORSOptions added in v1.4.1

type CORSOptions = cors.Options

func AllowAllCORSOptions added in v1.4.1

func AllowAllCORSOptions() *CORSOptions

AllowAllCORSOptions returns CORS options that allow all access.

type Context added in v1.6.9

type Context struct {
	// contains filtered or unexported fields
}

Context represents an rpcx FastCall context.

func NewContext added in v1.6.9

func NewContext(ctx *share.Context, conn net.Conn, req *protocol.Message, async bool) *Context

NewContext creates a server.Context for Handler.

func (*Context) Bind added in v1.6.9

func (ctx *Context) Bind(v interface{}) error

Bind parses the body data and stores the result to v.

func (*Context) DeleteKey added in v1.6.12

func (ctx *Context) DeleteKey(key interface{})

DeleteKey deletes the key-value pair identified by key.

func (*Context) Get added in v1.6.9

func (ctx *Context) Get(key interface{}) interface{}

Get returns value for key.

func (*Context) Metadata added in v1.6.9

func (ctx *Context) Metadata() map[string]string

Metadata returns the metadata.

func (*Context) Payload added in v1.6.9

func (ctx *Context) Payload() []byte

Payload returns the payload.

func (*Context) ServiceMethod added in v1.6.9

func (ctx *Context) ServiceMethod() string

ServiceMethod returns the ServiceMethod.

func (*Context) ServicePath added in v1.6.9

func (ctx *Context) ServicePath() string

ServicePath returns the ServicePath.

func (*Context) SetValue added in v1.6.9

func (ctx *Context) SetValue(key, val interface{})

SetValue sets the kv pair.

func (*Context) Write added in v1.6.9

func (ctx *Context) Write(v interface{}) error

func (*Context) WriteError added in v1.6.9

func (ctx *Context) WriteError(err error) error

type DownloadFileHandler added in v1.6.2

type DownloadFileHandler func(conn net.Conn, args *share.DownloadFileArgs)

DownloadFileHandler handles downloading a file. The handler must close the connection after it has finished.

type FileTransfer added in v1.6.2

type FileTransfer struct {
	Addr          string
	AdvertiseAddr string
	// contains filtered or unexported fields
}

FileTransfer supports transferring files from clients. It registers a file transfer service and listens on the given port. Clients invoke this service to get a token, then send the token and the file to this port.

func NewFileTransfer added in v1.6.2

func NewFileTransfer(addr string, handler FileTransferHandler, downloadFileHandler DownloadFileHandler, waitNum int) *FileTransfer

NewFileTransfer creates a FileTransfer with given parameters.

func (*FileTransfer) Start added in v1.6.2

func (s *FileTransfer) Start() error

func (*FileTransfer) Stop added in v1.6.2

func (s *FileTransfer) Stop() error

type FileTransferHandler added in v1.6.2

type FileTransferHandler func(conn net.Conn, args *share.FileTransferArgs)

FileTransferHandler handles uploading a file. The handler must close the connection after it has finished.

type FileTransferService added in v1.6.2

type FileTransferService struct {
	FileTransfer *FileTransfer
}

func (*FileTransferService) DownloadFile added in v1.6.2

func (*FileTransferService) TransferFile added in v1.6.2

type Handler added in v1.6.9

type Handler func(ctx *Context) error

type HandlerViewer added in v1.8.18

type HandlerViewer struct {
	// contains filtered or unexported fields
}

HandlerViewer collects metrics of rpcx.

func (*HandlerViewer) Name added in v1.8.18

func (vr *HandlerViewer) Name() string

func (*HandlerViewer) Serve added in v1.8.18

func (vr *HandlerViewer) Serve(w http.ResponseWriter, _ *http.Request)

func (*HandlerViewer) SetStatsMgr added in v1.8.18

func (vr *HandlerViewer) SetStatsMgr(smgr *viewer.StatsMgr)

func (*HandlerViewer) View added in v1.8.18

func (vr *HandlerViewer) View() *charts.Line

type HeartbeatPlugin added in v1.6.2

type HeartbeatPlugin interface {
	HeartbeatRequest(ctx context.Context, req *protocol.Message) error
}

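HeartbeatPlugin is the plugin interface for the HeartbeatRequest extension point; its method receives the context and the heartbeat request message.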

type ID added in v1.4.1

// ID identifies a jsonrpcRequest. Only one of Name or Number is set.
type ID struct {
	// Name is the string form of the identifier; when it is the empty
	// string the number form (Number) is used instead.
	Name   string
	// Number is the numeric form of the identifier.
	Number int64
}

ID is a jsonrpcRequest identifier. Only one of either the Name or Number members will be set, using the number form if the Name is the empty string.

func (*ID) MarshalJSON added in v1.4.1

func (id *ID) MarshalJSON() ([]byte, error)

func (*ID) String added in v1.4.1

func (id *ID) String() string

String returns a string representation of the ID. The representation is unambiguous: string forms are quoted, and number forms are preceded by a #.

func (*ID) UnmarshalJSON added in v1.4.1

func (id *ID) UnmarshalJSON(data []byte) error

type JSONRPCError added in v1.4.1

type JSONRPCError struct {
	// Code is an error code indicating the type of failure.
	Code int64 `json:"code"`
	// Message is a short description of the error.
	Message string `json:"message"`
	// Data is optional structured data containing additional information about the error.
	Data *json.RawMessage `json:"data"`
}

JSONRPCError represents a structured error in a JSON-RPC response.

func (*JSONRPCError) JSONRPCError added in v1.4.1

func (err *JSONRPCError) JSONRPCError() string

type MakeListener

type MakeListener func(s *Server, address string) (ln net.Listener, err error)

MakeListener defines a listener generator.

type OptionFn

type OptionFn func(*Server)

OptionFn configures options of server.

func WithAsyncWrite added in v1.7.5

func WithAsyncWrite() OptionFn

WithAsyncWrite sets AsyncWrite to true.

func WithCustomPool added in v1.7.12

func WithCustomPool(pool WorkerPool) OptionFn

WithCustomPool uses a custom goroutine pool.

func WithPool added in v1.7.5

func WithPool(maxWorkers, maxCapacity int, options ...pond.Option) OptionFn

WithPool sets goroutine pool.

func WithReadTimeout

func WithReadTimeout(readTimeout time.Duration) OptionFn

WithReadTimeout sets readTimeout.

func WithTCPKeepAlivePeriod added in v1.6.2

func WithTCPKeepAlivePeriod(period time.Duration) OptionFn

WithTCPKeepAlivePeriod sets tcp keepalive period.

func WithTLSConfig

func WithTLSConfig(cfg *tls.Config) OptionFn

WithTLSConfig sets tls.Config.

func WithWriteTimeout

func WithWriteTimeout(writeTimeout time.Duration) OptionFn

WithWriteTimeout sets writeTimeout.

type Plugin

type Plugin interface{}

Plugin is the server plugin interface.

type PluginContainer

// PluginContainer manages a set of plugins and exposes one Do* method per
// extension point. Each Do* method has the same parameter list as the method
// of the correspondingly named plugin interface in this package.
// NOTE(review): presumably each Do* method dispatches to every added plugin
// that implements the matching interface — confirm against the implementation.
type PluginContainer interface {
	// Plugin management: add, remove and list plugins.
	Add(plugin Plugin)
	Remove(plugin Plugin)
	All() []Plugin

	// Service registration extension points (see RegisterPlugin and
	// RegisterFunctionPlugin).
	DoRegister(name string, rcvr interface{}, metadata string) error
	DoRegisterFunction(serviceName, fname string, fn interface{}, metadata string) error
	DoUnregister(name string) error

	// Connection extension points (see PostConnAcceptPlugin and
	// PostConnClosePlugin).
	DoPostConnAccept(net.Conn) (net.Conn, bool)
	DoPostConnClose(net.Conn) bool

	// Request-reading extension points (see PreReadRequestPlugin,
	// PostReadRequestPlugin and PostHTTPRequestPlugin).
	DoPreReadRequest(ctx context.Context) error
	DoPostReadRequest(ctx context.Context, r *protocol.Message, e error) error
	DoPostHTTPRequest(ctx context.Context, r *http.Request, params httprouter.Params) error

	// Request-handling and call extension points (see PreHandleRequestPlugin,
	// PreCallPlugin and PostCallPlugin).
	DoPreHandleRequest(ctx context.Context, req *protocol.Message) error
	DoPreCall(ctx context.Context, serviceName, methodName string, args interface{}) (interface{}, error)
	DoPostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}, err error) (interface{}, error)

	// Response-writing extension points (see PreWriteResponsePlugin and
	// PostWriteResponsePlugin).
	DoPreWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error
	DoPostWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error

	// Request-writing extension points (see PreWriteRequestPlugin and
	// PostWriteRequestPlugin).
	DoPreWriteRequest(ctx context.Context) error
	DoPostWriteRequest(ctx context.Context, r *protocol.Message, e error) error

	// Heartbeat extension point (see HeartbeatPlugin).
	DoHeartbeatRequest(ctx context.Context, req *protocol.Message) error

	// MuxMatch has the same signature as CMuxPlugin.MuxMatch.
	MuxMatch(m cmux.CMux)
}

PluginContainer represents a plugin container that defines all methods to manage plugins. And it also defines all extension points.

type PostCallPlugin added in v1.6.2

type PostCallPlugin interface {
	PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}, err error) (interface{}, error)
}

type PostConnAcceptPlugin

type PostConnAcceptPlugin interface {
	HandleConnAccept(net.Conn) (net.Conn, bool)
}

PostConnAcceptPlugin represents a connection-accept plugin. If HandleConnAccept returns false, subsequent PostConnAcceptPlugins should not continue to handle this conn, and the conn has been closed.

type PostConnClosePlugin added in v1.4.1

type PostConnClosePlugin interface {
	HandleConnClose(net.Conn) bool
}

PostConnClosePlugin represents client connection close plugin.

type PostHTTPRequestPlugin added in v1.8.8

type PostHTTPRequestPlugin interface {
	PostHTTPRequest(ctx context.Context, r *http.Request, params httprouter.Params) error
}

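PostHTTPRequestPlugin is the plugin interface for the PostHTTPRequest extension point; its method receives the context, the *http.Request, and the httprouter.Params.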

type PostReadRequestPlugin

type PostReadRequestPlugin interface {
	PostReadRequest(ctx context.Context, r *protocol.Message, e error) error
}

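PostReadRequestPlugin is the plugin interface for the PostReadRequest extension point; its method receives the context, a *protocol.Message, and an error.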

type PostWriteRequestPlugin

type PostWriteRequestPlugin interface {
	PostWriteRequest(ctx context.Context, r *protocol.Message, e error) error
}

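PostWriteRequestPlugin is the plugin interface for the PostWriteRequest extension point; its method receives the context, a *protocol.Message, and an error.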

type PostWriteResponsePlugin

type PostWriteResponsePlugin interface {
	PostWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error
}

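PostWriteResponsePlugin is the plugin interface for the PostWriteResponse extension point; its method receives the context, two *protocol.Message values, and an error.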

type PreCallPlugin added in v1.6.2

type PreCallPlugin interface {
	PreCall(ctx context.Context, serviceName, methodName string, args interface{}) (interface{}, error)
}

type PreHandleRequestPlugin added in v1.4.1

type PreHandleRequestPlugin interface {
	PreHandleRequest(ctx context.Context, r *protocol.Message) error
}

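PreHandleRequestPlugin is the plugin interface for the PreHandleRequest extension point; its method receives the context and the request message.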

type PreReadRequestPlugin

type PreReadRequestPlugin interface {
	PreReadRequest(ctx context.Context) error
}

PreReadRequestPlugin is the plugin interface for the PreReadRequest extension point; its method receives only the context.

type PreWriteRequestPlugin

type PreWriteRequestPlugin interface {
	PreWriteRequest(ctx context.Context) error
}

PreWriteRequestPlugin is the plugin interface for the PreWriteRequest extension point; its method receives only the context.

type PreWriteResponsePlugin

type PreWriteResponsePlugin interface {
	PreWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error
}

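PreWriteResponsePlugin is the plugin interface for the PreWriteResponse extension point; its method receives the context, two *protocol.Message values, and an error.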

type ProcessTimeViewer added in v1.8.18

type ProcessTimeViewer struct {
	// contains filtered or unexported fields
}

ProcessTimeViewer collects metrics of rpcx.

func (*ProcessTimeViewer) Name added in v1.8.18

func (vr *ProcessTimeViewer) Name() string

func (*ProcessTimeViewer) Serve added in v1.8.18

func (*ProcessTimeViewer) SetStatsMgr added in v1.8.18

func (vr *ProcessTimeViewer) SetStatsMgr(smgr *viewer.StatsMgr)

func (*ProcessTimeViewer) View added in v1.8.18

func (vr *ProcessTimeViewer) View() *charts.Line

type RegisterFunctionPlugin

type RegisterFunctionPlugin interface {
	RegisterFunction(serviceName, fname string, fn interface{}, metadata string) error
}

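RegisterFunctionPlugin is the plugin interface for the RegisterFunction extension point; its method receives the service name, the function name, the function, and a metadata string.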

type RegisterPlugin

type RegisterPlugin interface {
	Register(name string, rcvr interface{}, metadata string) error
	Unregister(name string) error
}

RegisterPlugin is the plugin interface for the Register and Unregister extension points.

type RequestRateViewer added in v1.8.18

type RequestRateViewer struct {
	// contains filtered or unexported fields
}

RequestRateViewer collects metrics of rpcx.

func (*RequestRateViewer) Name added in v1.8.18

func (vr *RequestRateViewer) Name() string

func (*RequestRateViewer) Serve added in v1.8.18

func (*RequestRateViewer) SetStatsMgr added in v1.8.18

func (vr *RequestRateViewer) SetStatsMgr(smgr *viewer.StatsMgr)

func (*RequestRateViewer) View added in v1.8.18

func (vr *RequestRateViewer) View() *charts.Line

type Reset

type Reset interface {
	Reset()
}

Reset defines Reset method for pooled object.

type Server

// Server is an rpcx server. Its exported fields are configuration switches
// and hooks.
type Server struct {
	DisableHTTPGateway bool // disable http invoke or not.
	DisableJSONRPC     bool // disable json rpc or not.
	EnableProfile      bool // enable profile and statsview or not
	AsyncWrite         bool // set to true if your server only serves a few clients

	// Plugins is the plugin container of this server.
	Plugins PluginContainer

	// AuthFunc can be used to authenticate a request; it receives the
	// context, the request message and a token.
	AuthFunc func(ctx context.Context, req *protocol.Message, token string) error

	// HandleServiceError is used to get all service errors. You can use it to write logs or for other purposes.
	HandleServiceError func(error)

	// ServerErrorFunc is a customized error handler; you can use it to return customized error strings to clients.
	// If not set, err.Error() is used.
	ServerErrorFunc func(res *protocol.Message, err error) string

	// ViewManager is the ViewManager attached to this server.
	// NOTE(review): presumably only populated when EnableProfile is set — confirm.
	ViewManager *ViewManager
	// contains filtered or unexported fields
}

Server is an rpcx server that uses TCP or UDP.

func NewServer

func NewServer(options ...OptionFn) *Server

NewServer returns a server.

func (*Server) ActiveClientConn added in v1.4.1

func (s *Server) ActiveClientConn() []net.Conn

ActiveClientConn returns active connections.

func (*Server) AddHandler added in v1.6.9

func (s *Server) AddHandler(servicePath, serviceMethod string, handler func(*Context) error)

func (*Server) Address

func (s *Server) Address() net.Addr

Address returns listened address.

func (*Server) Close

func (s *Server) Close() error

Close immediately closes all active net.Listeners.

func (*Server) EnableFileTransfer added in v1.6.2

func (s *Server) EnableFileTransfer(serviceName string, fileTransfer *FileTransfer)

EnableFileTransfer supports filetransfer service in this server.

func (*Server) EnableStreamService added in v1.6.2

func (s *Server) EnableStreamService(serviceName string, streamService *StreamService)

EnableStreamService supports stream service in this server.

func (*Server) Register

func (s *Server) Register(rcvr interface{}, metadata string) error

Register publishes in the server the set of methods of the receiver value that satisfy the following conditions:

  • exported method of exported type
  • three arguments: the first is of type context.Context, and the second and third are of exported type
  • the third argument is a pointer
  • one return value, of type error

It returns an error if the receiver is not an exported type or has no suitable methods. It also logs the error. The client accesses each method using a string of the form "Type.Method", where Type is the receiver's concrete type.

func (*Server) RegisterFunction

func (s *Server) RegisterFunction(servicePath string, fn interface{}, metadata string) error

RegisterFunction publishes a function that satisfies the following conditions:

  • three arguments: the first is of type context.Context, and the second and third are of exported type
  • the third argument is a pointer
  • one return value, of type error

The client accesses function using a string of the form "servicePath.Method".

func (*Server) RegisterFunctionName

func (s *Server) RegisterFunctionName(servicePath string, name string, fn interface{}, metadata string) error

RegisterFunctionName is like RegisterFunction but uses the provided name for the function instead of the function's concrete type.

func (*Server) RegisterName

func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error

RegisterName is like Register but uses the provided name for the type instead of the receiver's concrete type.

func (*Server) RegisterOnRestart added in v1.6.2

func (s *Server) RegisterOnRestart(f func(s *Server))

RegisterOnRestart registers a function to call on Restart.

func (*Server) RegisterOnShutdown

func (s *Server) RegisterOnShutdown(f func(s *Server))

RegisterOnShutdown registers a function to call on Shutdown. This can be used to gracefully shutdown connections.

func (*Server) Restart added in v1.6.2

func (s *Server) Restart(ctx context.Context) error

Restart restarts this server gracefully. It starts a new rpcx server on the same port with the SO_REUSEPORT socket option, and shuts down this rpcx server gracefully.

func (*Server) SendMessage

func (s *Server) SendMessage(conn net.Conn, servicePath, serviceMethod string, metadata map[string]string, data []byte) error

SendMessage sends a request to the specified client. The client is designated by conn, which can be obtained from the context in services:

ctx.Value(RemoteConnContextKey)

servicePath, serviceMethod, metadata can be set to zero values.

func (*Server) Serve

func (s *Server) Serve(network, address string) (err error)

Serve starts the server and listens for RPC requests. It blocks until it receives connections from clients.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)

ServeHTTP implements an http.Handler that answers RPC requests.

func (*Server) ServeListener added in v1.6.2

func (s *Server) ServeListener(network string, ln net.Listener) (err error)

ServeListener listens for RPC requests on the given listener. It blocks until it receives connections from clients.

func (*Server) ServeWS added in v1.6.2

func (s *Server) ServeWS(conn *websocket.Conn)

func (*Server) SetCORS added in v1.4.1

func (s *Server) SetCORS(options *CORSOptions)

SetCORS sets CORS options. for example:

cors.Options{
	AllowedOrigins:   []string{"foo.com"},
	AllowedMethods:   []string{http.MethodGet, http.MethodPost, http.MethodDelete},
	AllowCredentials: true,
}

func (*Server) Shutdown added in v1.4.1

func (s *Server) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the server without interrupting any active connections. Shutdown works by first closing the listener, then closing all idle connections, and then waiting indefinitely for connections to return to idle and then shut down. If the provided context expires before the shutdown is complete, Shutdown returns the context's error, otherwise it returns any error returned from closing the Server's underlying Listener.

func (*Server) UnregisterAll added in v1.4.1

func (s *Server) UnregisterAll() error

UnregisterAll unregisters all services. You can call this method when you want to shutdown/upgrade this node.

type StreamAcceptor added in v1.6.2

type StreamAcceptor func(ctx context.Context, args *share.StreamServiceArgs) bool

StreamAcceptor decides whether to accept a connection from a client. You can use it to validate clients and determine whether to accept or drop the connection.

type StreamHandler added in v1.6.2

type StreamHandler func(conn net.Conn, args *share.StreamServiceArgs)

StreamHandler handles a streaming connection with client.

type StreamService added in v1.6.2

type StreamService struct {
	Addr          string
	AdvertiseAddr string
	// contains filtered or unexported fields
}

StreamService supports streaming between clients and the server. It registers a streaming service and listens on the given port. Clients invoke this service to get a token, then send the token and begin to stream.

func NewStreamService added in v1.6.2

func NewStreamService(addr string, streamHandler StreamHandler, acceptor StreamAcceptor, waitNum int) *StreamService

NewStreamService creates a stream service.

func (*StreamService) Start added in v1.6.2

func (s *StreamService) Start() error

func (*StreamService) Stop added in v1.6.2

func (s *StreamService) Stop() error

func (*StreamService) Stream added in v1.6.2

type VersionTag added in v1.4.1

type VersionTag struct{}

VersionTag is a special 0 sized struct that encodes as the jsonrpc version tag. It will fail during decode if it is not the correct version tag in the stream.

func (VersionTag) MarshalJSON added in v1.4.1

func (VersionTag) MarshalJSON() ([]byte, error)

func (VersionTag) UnmarshalJSON added in v1.4.1

func (VersionTag) UnmarshalJSON(data []byte) error

type ViewManager added in v1.8.17

// ViewManager groups a stats manager, a context with its cancel function,
// and a list of viewers.
type ViewManager struct {
	// Smgr is the stats manager.
	// NOTE(review): presumably shared with the viewers via SetStatsMgr — confirm.
	Smgr   *viewer.StatsMgr
	// Ctx and Cancel are a context and a cancel function.
	// NOTE(review): presumably Cancel cancels Ctx and is invoked by Stop — confirm.
	Ctx    context.Context
	Cancel context.CancelFunc
	// Views holds viewers.
	// NOTE(review): presumably the viewers added through Register — confirm.
	Views  []viewer.Viewer
	// contains filtered or unexported fields
}

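ViewManager manages a set of viewers: Register adds views, Start runs an HTTP server and begins collecting metrics, and Stop shuts that server down gracefully.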

func NewViewManager added in v1.8.17

func NewViewManager(ln net.Listener, s *Server) *ViewManager

NewViewManager creates a new ViewManager instance

func (*ViewManager) Register added in v1.8.17

func (vm *ViewManager) Register(views ...viewer.Viewer)

Register registers views to the ViewManager

func (*ViewManager) Start added in v1.8.17

func (vm *ViewManager) Start() error

Start runs an HTTP server and begins collecting metrics.

func (*ViewManager) Stop added in v1.8.17

func (vm *ViewManager) Stop()

Stop shuts down the HTTP server gracefully.

type WorkerPool added in v1.7.12

// WorkerPool is the goroutine-pool interface accepted by WithCustomPool.
// NOTE(review): the method set appears to mirror the pond worker pool used
// by WithPool — confirm before relying on identical semantics.
type WorkerPool interface {
	// Submit hands a task to the pool.
	Submit(task func())
	// StopAndWaitFor takes a deadline duration.
	// NOTE(review): presumably stops the pool and waits at most deadline for
	// submitted tasks to finish — confirm.
	StopAndWaitFor(deadline time.Duration)
	// Stop stops the pool.
	// NOTE(review): presumably without waiting for tasks to finish — confirm.
	Stop()
	// StopAndWait stops the pool.
	// NOTE(review): presumably waits for submitted tasks to finish — confirm.
	StopAndWait()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL