Documentation ¶
Index ¶
- Constants
- Variables
- func Append(buf []byte, more ...byte) []byte
- func AppendString(buf []byte, more string) []byte
- func AsyncExecute(f func())
- func AsyncResponse() bool
- func BatchRecv() bool
- func BatchSend() bool
- func BeforeRecv(h func(net.Conn) error)
- func BeforeSend(h func(net.Conn) error)
- func Free(buf []byte)
- func Handle(m string, h HandlerFunc, argv ...interface{})
- func HandleConnected(onConnected func(*Client))
- func HandleDisconnected(onDisConnected func(*Client))
- func HandleMalloc(f func(int) []byte)
- func HandleNotFound(h HandlerFunc)
- func HandleSessionMiss(onSessionMiss func(c *Client, m *Message))
- func LogDebugInfo()
- func Malloc(size int) []byte
- func MaxBodyLen() int
- func ReadTimeout() time.Duration
- func Realloc(buf []byte, size int) []byte
- func RecvBufferSize() int
- func SendBufferSize() int
- func SendQueueSize() int
- func SetAsyncExecutor(executor func(f func()))
- func SetAsyncResponse(async bool)
- func SetBatchRecv(batch bool)
- func SetBatchSend(batch bool)
- func SetDebug(enable bool)
- func SetHandler(h Handler)
- func SetLogTag(tag string)
- func SetMaxBodyLen(l int)
- func SetReadTimeout(timeout time.Duration)
- func SetReaderWrapper(wrapper func(conn net.Conn) io.Reader)
- func SetRecvBufferSize(size int)
- func SetSendBufferSize(size int)
- func SetSendQueueSize(size int)
- func SetWriteTimeout(timeout time.Duration)
- func Use(h HandlerFunc)
- func UseCoder(coder MessageCoder)
- func WriteTimeout() time.Duration
- type Allocator
- type BufferPool
- func (bp *BufferPool) Append(buf []byte, more ...byte) []byte
- func (bp *BufferPool) AppendString(buf []byte, more string) []byte
- func (bp *BufferPool) Free(buf []byte)
- func (bp *BufferPool) LogDebugInfo()
- func (bp *BufferPool) Malloc(size int) []byte
- func (bp *BufferPool) Realloc(buf []byte, size int) []byte
- type Client
- func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout time.Duration, ...) error
- func (c *Client) CallWith(ctx context.Context, method string, req interface{}, rsp interface{}, ...) error
- func (c *Client) CheckState() error
- func (c *Client) Delete(key interface{})
- func (c *Client) Get(key interface{}) (interface{}, bool)
- func (c *Client) Keepalive(interval time.Duration)
- func (c *Client) NewMessage(cmd byte, method string, v interface{}, args ...interface{}) *Message
- func (c *Client) Ping()
- func (c *Client) Pong()
- func (c *Client) PushMsg(msg *Message, timeout time.Duration) error
- func (c *Client) Restart() error
- func (c *Client) Set(key interface{}, value interface{})
- func (c *Client) SetState(running bool)
- func (c *Client) Stop()
- type ClientPool
- type Context
- func (ctx *Context) Abort()
- func (ctx *Context) Arg() interface{}
- func (ctx *Context) Bind(v interface{}) error
- func (ctx *Context) Deadline() (deadline time.Time, ok bool)
- func (ctx *Context) Done() <-chan struct{}
- func (ctx *Context) Err() error
- func (ctx *Context) Error(v interface{}) error
- func (ctx *Context) Get(key interface{}) (interface{}, bool)
- func (ctx *Context) Next()
- func (ctx *Context) Release()
- func (ctx *Context) Set(key interface{}, value interface{})
- func (ctx *Context) Value(key interface{}) interface{}
- func (ctx *Context) Values() map[interface{}]interface{}
- func (ctx *Context) Write(v interface{}) error
- func (ctx *Context) WriteWithTimeout(v interface{}, timeout time.Duration) error
- type DialerFunc
- type Handler
- type HandlerFunc
- type Header
- type Message
- func (m *Message) BodyLen() int
- func (m *Message) Cmd() byte
- func (m *Message) Data() []byte
- func (m *Message) Error() error
- func (m *Message) Get(key interface{}) (interface{}, bool)
- func (m *Message) IsAsync() bool
- func (m *Message) IsError() bool
- func (m *Message) IsFlagBitSet(index int) bool
- func (m *Message) Len() int
- func (m *Message) Method() string
- func (m *Message) MethodLen() int
- func (m *Message) Payback()
- func (m *Message) Release()
- func (m *Message) ResetAttrs()
- func (m *Message) Seq() uint64
- func (m *Message) Set(key interface{}, value interface{})
- func (m *Message) SetAsync(isAsync bool)
- func (m *Message) SetBodyLen(l int)
- func (m *Message) SetCmd(cmd byte)
- func (m *Message) SetError(isError bool)
- func (m *Message) SetFlagBit(index int, value bool) error
- func (m *Message) SetMethodLen(l int)
- func (m *Message) SetSeq(seq uint64)
- func (m *Message) Values() map[interface{}]interface{}
- type MessageCoder
- type NativeAllocator
- type Server
- func (s *Server) ForEach(h func(*Client))
- func (s *Server) ForEachWithFilter(h func(*Client), filter func(*Client) bool)
- func (s *Server) NewMessage(cmd byte, method string, v interface{}, args ...interface{}) *Message
- func (s *Server) Run(addr string) error
- func (s *Server) Serve(ln net.Listener) error
- func (s *Server) Shutdown(ctx context.Context) error
- func (s *Server) Stop() error
- type WebsocketConn
Constants ¶
const ( // TimeZero represents zero time. TimeZero time.Duration = 0 // TimeForever represents forever time. TimeForever time.Duration = 1<<63 - 1 )
const ( // CmdNone is invalid CmdNone byte = 0 // CmdRequest the other side should respond to a request message CmdRequest byte = 1 // CmdResponse the other side should not respond to a response message CmdResponse byte = 2 // CmdNotify the other side should not respond to a notify message CmdNotify byte = 3 // CmdPing . CmdPing byte = 4 // CmdPong . CmdPong byte = 5 )
const ( // HeaderIndexBodyLenBegin . HeaderIndexBodyLenBegin = 0 // HeaderIndexBodyLenEnd . HeaderIndexBodyLenEnd = 4 // HeaderIndexReserved . HeaderIndexReserved = 4 // HeaderIndexCmd . HeaderIndexCmd = 5 // HeaderIndexFlag . HeaderIndexFlag = 6 // HeaderIndexMethodLen . HeaderIndexMethodLen = 7 // HeaderIndexSeqBegin . HeaderIndexSeqBegin = 8 // HeaderIndexSeqEnd . HeaderIndexSeqEnd = 16 // HeaderFlagMaskError . HeaderFlagMaskError byte = 0x01 // HeaderFlagMaskAsync . HeaderFlagMaskAsync byte = 0x02 )
const ( // HeadLen represents Message head length. HeadLen int = 16 // MaxMethodLen limits Message method length. MaxMethodLen int = 127 // DefaultMaxBodyLen limits Message body length. DefaultMaxBodyLen int = 1024*1024*64 - 16 )
Variables ¶
var ( // ErrClientTimeout represents a timeout error because of timer or context. ErrClientTimeout = errors.New("timeout") // ErrClientInvalidTimeoutZero represents an error of 0 time parameter. ErrClientInvalidTimeoutZero = errors.New("invalid timeout, should not be 0") // ErrClientInvalidTimeoutLessThanZero represents an error of less than 0 time parameter. ErrClientInvalidTimeoutLessThanZero = errors.New("invalid timeout, should not be < 0") // ErrClientInvalidTimeoutZeroWithNonNilCallback represents an error with 0 time parameter but with non-nil callback. ErrClientInvalidTimeoutZeroWithNonNilCallback = errors.New("invalid timeout 0 with non-nil callback") // ErrClientOverstock represents an error of Client's send queue is full. ErrClientOverstock = errors.New("timeout: rpc Client's send queue is full") // ErrClientReconnecting represents an error that Client is reconnecting. ErrClientReconnecting = errors.New("client reconnecting") // ErrClientStopped represents an error that Client is stopped. ErrClientStopped = errors.New("client stopped") // ErrClientInvalidPoolDialers represents an error of empty dialer array. ErrClientInvalidPoolDialers = errors.New("invalid dialers: empty array") )
client error
var ( // ErrInvalidRspMessage represents an error of invalid message CMD. ErrInvalidRspMessage = errors.New("invalid response message cmd") // ErrMethodNotFound represents an error of method not found. ErrMethodNotFound = errors.New("method not found") // ErrInvalidFlagBitIndex represents an error of invalid flag bit index. ErrInvalidFlagBitIndex = errors.New("invalid index, should be 0-7") )
message error
var ( // PingMessage . PingMessage = newMessage(CmdPing, "", nil, false, false, 0, nil, nil, nil) // PongMessage . PongMessage = newMessage(CmdPong, "", nil, false, false, 0, nil, nil, nil) )
var ( // ErrContextResponseToNotify represents an error of responding to a notify message. ErrContextResponseToNotify = errors.New("should not response to a context with notify message") )
context error
var ( // ErrTimeout represents an error of timeout. ErrTimeout = errors.New("timeout") )
general errors
Functions ¶
func AppendString ¶
AppendString exports default package method.
func BeforeRecv ¶
BeforeRecv registers default handler which will be called before Recv.
func BeforeSend ¶
BeforeSend registers default handler which will be called before Send.
func Handle ¶
func Handle(m string, h HandlerFunc, argv ...interface{})
Handle registers default method/router handler.
If a Boolean value of "true" is passed, the handler will be called asynchronously in a new goroutine; otherwise the handler will be called synchronously in the client's reading goroutine, one by one.
func HandleConnected ¶
func HandleConnected(onConnected func(*Client))
HandleConnected registers default handler which will be called when client connected.
func HandleDisconnected ¶
func HandleDisconnected(onDisConnected func(*Client))
HandleDisconnected registers default handler which will be called when client disconnected.
func HandleMalloc ¶
HandleMalloc registers default buffer maker.
func HandleNotFound ¶
func HandleNotFound(h HandlerFunc)
HandleNotFound registers default "" method/router handler. It will be called when the method/router is not found.
func HandleSessionMiss ¶
HandleSessionMiss registers default handler which will be called when async message seq not found.
func MaxBodyLen ¶
func MaxBodyLen() int
func RecvBufferSize ¶
func RecvBufferSize() int
RecvBufferSize returns default client's read buffer size.
func SendBufferSize ¶
func SendBufferSize() int
SendBufferSize returns default client's send buffer size.
func SendQueueSize ¶
func SendQueueSize() int
SendQueueSize returns default client's send queue channel capacity.
func SetAsyncExecutor ¶
func SetAsyncExecutor(executor func(f func()))
SetAsyncExecutor sets the default executor, which AsyncExecute uses to execute a func.
func SetAsyncResponse ¶
func SetAsyncResponse(async bool)
SetAsyncResponse sets default AsyncResponse flag.
func SetMaxBodyLen ¶
func SetMaxBodyLen(l int)
func SetReadTimeout ¶
SetReadTimeout sets client's read timeout.
func SetReaderWrapper ¶
SetReaderWrapper registers default reader wrapper for net.Conn.
func SetRecvBufferSize ¶
func SetRecvBufferSize(size int)
SetRecvBufferSize sets default client's read buffer size.
func SetSendBufferSize ¶
func SetSendBufferSize(size int)
SetSendBufferSize sets default client's send buffer size.
func SetSendQueueSize ¶
func SetSendQueueSize(size int)
SetSendQueueSize sets default client's send queue channel capacity.
func SetWriteTimeout ¶
SetWriteTimeout sets client's write timeout.
func UseCoder ¶
func UseCoder(coder MessageCoder)
UseCoder registers default message coding middleware, coder.Encode will be called before message send, coder.Decode will be called after message recv.
Types ¶
type Allocator ¶
type BufferPool ¶
type BufferPool struct { Debug bool // contains filtered or unexported fields }
BufferPool .
func (*BufferPool) AppendString ¶
func (bp *BufferPool) AppendString(buf []byte, more string) []byte
AppendString .
func (*BufferPool) LogDebugInfo ¶
func (bp *BufferPool) LogDebugInfo()
type Client ¶
type Client struct { Conn net.Conn Codec codec.Codec Handler Handler Reader io.Reader Dialer DialerFunc // contains filtered or unexported fields }
Client represents an arpc Client. There may be multiple outstanding Calls or Notifys associated with a single Client, and a Client may be used by multiple goroutines simultaneously.
func NewClient ¶
func NewClient(dialer DialerFunc, args ...interface{}) (*Client, error)
NewClient creates a Client.
func (*Client) Call ¶
func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout time.Duration, args ...interface{}) error
Call makes an rpc call with a timeout. Call will block waiting for the server's response until timeout.
func (*Client) CallWith ¶
func (c *Client) CallWith(ctx context.Context, method string, req interface{}, rsp interface{}, args ...interface{}) error
CallWith uses context to make rpc call. CallWith blocks to wait for a response from the server until it times out.
func (*Client) NewMessage ¶
NewMessage creates a Message by client's seq, handler and codec.
func (*Client) Set ¶
func (c *Client) Set(key interface{}, value interface{})
Set sets key-value pair.
type ClientPool ¶
type ClientPool struct {
// contains filtered or unexported fields
}
ClientPool represents an arpc Client Pool.
func NewClientPool ¶
func NewClientPool(dialer DialerFunc, size int, args ...interface{}) (*ClientPool, error)
NewClientPool creates a ClientPool.
func NewClientPoolFromDialers ¶
func NewClientPoolFromDialers(dialers []DialerFunc, args ...interface{}) (*ClientPool, error)
NewClientPoolFromDialers creates a ClientPool by multiple dialers.
func (*ClientPool) Get ¶
func (pool *ClientPool) Get(index int) *Client
Get returns a Client by index.
func (*ClientPool) Next ¶
func (pool *ClientPool) Next() *Client
Next returns a Client by round robin.
type Context ¶
type Context struct { Client *Client // contains filtered or unexported fields }
Context represents an arpc Call's context.
func (*Context) Abort ¶
func (ctx *Context) Abort()
Abort stops the one-by-one-calling of middlewares and method/router handler.
func (*Context) Bind ¶
Bind parses the body data and stores the result in the value pointed to by v.
func (*Context) Next ¶
func (ctx *Context) Next()
Next calls next middleware or method/router handler.
func (*Context) Set ¶
func (ctx *Context) Set(key interface{}, value interface{})
Set sets key-value pair.
func (*Context) Value ¶
func (ctx *Context) Value(key interface{}) interface{}
Value returns the value associated with this context for key, implements stdlib's Context.
func (*Context) Values ¶
func (ctx *Context) Values() map[interface{}]interface{}
Values returns values.
type DialerFunc ¶
DialerFunc defines the dialer used by arpc Client to connect to the server.
type Handler ¶
type Handler interface { // Clone returns a copy of Handler. Clone() Handler // LogTag returns log tag value. LogTag() string // SetLogTag sets log tag. SetLogTag(tag string) // HandleConnected registers handler which will be called when client connected. HandleConnected(onConnected func(*Client)) // OnConnected will be called when client is connected. OnConnected(c *Client) // HandleDisconnected registers handler which will be called when client is disconnected. HandleDisconnected(onDisConnected func(*Client)) // OnDisconnected will be called when client is disconnected. OnDisconnected(c *Client) // MaxReconnectTimes returns client's max reconnect times. MaxReconnectTimes() int // SetMaxReconnectTimes sets client's max reconnect times for. SetMaxReconnectTimes(n int) // HandleSessionMiss registers handler which will be called when async message seq not found. HandleSessionMiss(onSessionMiss func(c *Client, m *Message)) // OnSessionMiss will be called when async message seq not found. OnSessionMiss(c *Client, m *Message) // HandleContextDone registers handler which will be called when message dropped. HandleContextDone(onContextDone func(ctx *Context)) // OnContextDone will be called when message is dropped. OnContextDone(ctx *Context) // BeforeRecv registers handler which will be called before Recv. BeforeRecv(h func(net.Conn) error) // BeforeSend registers handler which will be called before Send. BeforeSend(h func(net.Conn) error) // BatchRecv returns BatchRecv flag. BatchRecv() bool // SetBatchRecv sets BatchRecv flag. SetBatchRecv(batch bool) // BatchSend returns BatchSend flag. BatchSend() bool // SetBatchSend sets BatchSend flag. SetBatchSend(batch bool) // AsyncWrite returns AsyncWrite flag. AsyncWrite() bool // SetAsyncWrite sets AsyncWrite flag. SetAsyncWrite(async bool) // AsyncResponse returns AsyncResponse flag. AsyncResponse() bool // SetAsyncResponse sets AsyncResponse flag. 
SetAsyncResponse(async bool) // WrapReader wraps net.Conn to Read data with io.Reader. WrapReader(conn net.Conn) io.Reader // SetReaderWrapper registers reader wrapper for net.Conn. SetReaderWrapper(wrapper func(conn net.Conn) io.Reader) // Send writes buffer data to a connection. Send(c net.Conn, buffer []byte) (int, error) // SendN writes multiple buffer data to a connection. SendN(conn net.Conn, buffers net.Buffers) (int, error) // RecvBufferSize returns client's recv buffer size. RecvBufferSize() int // SetRecvBufferSize sets client's recv buffer size. SetRecvBufferSize(size int) // SendBufferSize returns client's send buffer size. SendBufferSize() int // SetSendBufferSize sets client's send buffer size. SetSendBufferSize(size int) // ReadTimeout returns client's read timeout. ReadTimeout() time.Duration // SetReadTimeout sets client's read timeout. SetReadTimeout(timeout time.Duration) // WriteTimeout returns client's write timeout. WriteTimeout() time.Duration // SetWriteTimeout sets client's write timeout. SetWriteTimeout(timeout time.Duration) // SendQueueSize returns client's send queue channel capacity. SendQueueSize() int // SetSendQueueSize sets client's send queue channel capacity. SetSendQueueSize(size int) // MaxBodyLen returns max body length of a message. MaxBodyLen() int // SetMaxBodyLen sets max body length of a message. SetMaxBodyLen(l int) // Use registers method/router handler middleware. Use(h HandlerFunc) // UseCoder registers message coding middleware, // coder.Encode will be called before message send, // coder.Decode will be called after message recv. UseCoder(coder MessageCoder) // Coders returns coding middlewares. Coders() []MessageCoder // Handle registers method/router handler. // // If pass a Boolean value of "true", the handler will be called asynchronously in a new goroutine, // Else the handler will be called synchronously in the client's reading goroutine one by one. 
Handle(m string, h HandlerFunc, argv ...interface{}) // HandleNotFound registers "" method/router handler, // It will be called when method/router is not found. HandleNotFound(h HandlerFunc) // OnMessage finds method/router middlewares and handler, then calls them one by one. OnMessage(c *Client, m *Message) // Malloc makes a buffer by size. Malloc(size int) []byte // HandleMalloc registers buffer maker. HandleMalloc(f func(size int) []byte) // Append appends bytes to buffer. Append(b []byte, more ...byte) []byte // HandleAppend registers buffer appender. HandleAppend(f func(b []byte, more ...byte) []byte) // Free releases a buffer. Free([]byte) // HandleFree registers buffer releaser. HandleFree(f func(buf []byte)) Context() (context.Context, context.CancelFunc) SetContext(ctx context.Context, cancel context.CancelFunc) Cancel() // NewMessage creates a Message. NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, codec codec.Codec, values map[interface{}]interface{}) *Message // NewMessageWithBuffer creates a message with the buffer and manages the message by the pool. // The buffer arg should be managed by a pool if EnablePool(true) . NewMessageWithBuffer(buffer []byte) *Message // SetAsyncExecutor sets executor. SetAsyncExecutor(executor func(f func())) // AsyncExecute executes a func AsyncExecute(f func()) // SetRpcOnly sets whether communication type is RPC only. SetRpcOnly(b bool) // RpcOnly returns whether communication type is RPC only. RpcOnly() bool }
Handler defines net message handler interface.
var DefaultHandler Handler = NewHandler()
DefaultHandler is the default Handler used by arpc
type HandlerFunc ¶
type HandlerFunc func(*Context)
HandlerFunc defines message handler of arpc middleware and method/router.
type Message ¶
type Message struct { Buffer []byte // contains filtered or unexported fields }
Message represents an arpc Message.
func NewMessage ¶
func NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, h Handler, codec codec.Codec, values map[interface{}]interface{}) *Message
NewMessage creates a Message.
func (*Message) IsFlagBitSet ¶
IsFlagBitSet returns flag bit value.
func (*Message) Release ¶
func (m *Message) Release()
Release decrements the reference count.
func (*Message) ResetAttrs ¶
func (m *Message) ResetAttrs()
ResetAttrs resets reserved/cmd/flag/methodLen to 0.
func (*Message) Set ¶
func (m *Message) Set(key interface{}, value interface{})
Set sets key-value pair.
func (*Message) SetFlagBit ¶
SetFlagBit sets flag bit value by index.
func (*Message) SetMethodLen ¶
SetMethodLen sets method length.
type MessageCoder ¶
type MessageCoder interface { // Encode wrap message before send to client Encode(*Client, *Message) *Message // Decode unwrap message between recv and handle Decode(*Client, *Message) *Message }
MessageCoder defines Message coding middleware interface.
type Server ¶
type Server struct { Accepted int64 CurrLoad int64 MaxLoad int64 Codec codec.Codec Handler Handler Listener net.Listener // contains filtered or unexported fields }
Server represents an arpc Server.
func (*Server) ForEachWithFilter ¶
func (*Server) NewMessage ¶
NewMessage creates a Message.
type WebsocketConn ¶
type WebsocketConn interface {
HandleWebsocket(func())
}
WebsocketConn defines websocket-conn interface.