Documentation ¶
Index ¶
- Constants
- Variables
- func ConnIDFromContext(ctx context.Context) int64
- func NewContextWithConnID(ctx context.Context, connID int64) context.Context
- func NewContextWithMessage(ctx context.Context, msg Packet) context.Context
- func NewContextWithServer(ctx context.Context, s *Server) context.Context
- func PutRPCPacket(p *RPCPacket)
- func SetTimestamp(ctx context.Context, t int64, tags ...interface{}) context.Context
- type Channel
- func (c *Channel) AddPendingTimer(ctx *timer.Context)
- func (c *Channel) CancelTimer(ctx *timer.Context)
- func (c *Channel) Close() error
- func (c *Channel) ConnID() int64
- func (c *Channel) ContextValue(k interface{}) interface{}
- func (c *Channel) LastActive() int64
- func (c *Channel) LocalAddr() net.Addr
- func (c *Channel) Name() string
- func (c *Channel) RemoteAddr() net.Addr
- func (c *Channel) RunAfter(delay time.Duration, interval time.Duration, cb func(time.Time, WriteCloser), ...) *timer.Context
- func (c *Channel) SetContextValue(k, v interface{})
- func (c *Channel) SetLastActive(heart int64)
- func (c *Channel) SetName(name string)
- func (c *Channel) Start()
- func (c *Channel) Write(ctx context.Context, packet Packet) (int, error)
- type ClientChannel
- type Codec
- type ErrUndefined
- type HandlePoolType
- type Handler
- type HandlerFunc
- type Hashable
- type LineCodec
- type LinePacket
- type MessageHandler
- type OnTimeOut
- type Option
- func CustomCodecOption(codec func() Codec) Option
- func HandlerBufferSizeOption(indicator int) Option
- func MetricsTags(kvs ...interface{}) Option
- func OnCloseOption(cb func(WriteCloser)) Option
- func OnConnectOption(cb func(WriteCloser) bool) Option
- func OnMessageOption(cb func(Packet, WriteCloser), poolType ...HandlePoolType) Option
- func ReadTimeoutOption(t time.Duration) Option
- func TimerBufferSizeOption(indicator int) Option
- func WithLogger(l *log.Logger) Option
- func WorkerSizeOption(workerSz int) Option
- func WriteTimeoutOption(t time.Duration) Option
- func WriterBufferSizeOption(indicator int) Option
- type Packet
- type RPCCodec
- type RPCNegoPacket
- type RPCPacket
- func (rp *RPCPacket) AddHeader(key []byte, value []byte)
- func (rp *RPCPacket) ForeachHeader(cb func(key, value []byte) error) error
- func (rp *RPCPacket) GetHeader(key []byte) ([]byte, bool)
- func (rp *RPCPacket) GetHeaderUint32(key []byte) (uint32, bool)
- func (rp *RPCPacket) GetHeaderUint64(key []byte) (uint64, bool)
- func (rp *RPCPacket) Serialize() ([]byte, error)
- func (rp *RPCPacket) Timestamp() int64
- func (rp *RPCPacket) Type() int32
- type RPCPacketHeader
- type Server
- func (s *Server) Broadcast(ctx context.Context, msg Packet, except func(connid int64) bool)
- func (s *Server) Conn(id int64) (*ServerChannel, bool)
- func (s *Server) Register(msgType int32, handler HandlerFunc, handleType HandlePoolType)
- func (s *Server) Start(l net.Listener) error
- func (s *Server) Stop()
- func (s *Server) Unicast(ctx context.Context, id int64, msg Packet) (int, error)
- type ServerChannel
- type Stats
- type WorkerPool
- type WriteCloser
Constants ¶
const ( PacketTypeRequest = 0 PacketTypeResponse = 1 PacketTypeHint = 2 )
const ( RPCDefaultMaxHeaderLen = 10 * 1024 * 1024 RPCDefaultMaxBodyLen = 10 * 1024 * 1024 RPCDefaultMaxPeerIDLen = 1024 )
RPC body/header limit
const (
HintTypeKeepalive = 0
)
const (
RPCNegoMessageType = 0xFFFF
)
RPC Nego msg type
Variables ¶
var ( ErrRPCHeaderSizeLimit = errors.New("rpc header size max error") ErrRPCBodySizeLimit = errors.New("rpc body size max error") ErrRPCPeerIDSizeLimit = errors.New("rpc peerid size max error") )
rpc codec errors
var ( ErrWouldBlock = errors.New("would block") ErrServerClosed = errors.New("server has been closed") )
Error codes returned by failures dealing with server or connection.
var Buffer bufferPool
Functions ¶
func ConnIDFromContext ¶
func NewContextWithConnID ¶
func NewContextWithMessage ¶
NewContextWithMessage returns a new Context that carries message.
func NewContextWithServer ¶
NewContextWithServer returns a new Context that carries server.
func PutRPCPacket ¶
func PutRPCPacket(p *RPCPacket)
Types ¶
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel represents a server connection to a TCP server; it implements Conn.
func NewChannel ¶
NewChannel returns a new Channel
func (*Channel) AddPendingTimer ¶
AddPendingTimer adds a new timer ID to client connection.
func (*Channel) CancelTimer ¶
CancelTimer cancels a timer with the specified ID.
func (*Channel) Close ¶
Close gracefully closes the server connection. It blocks until all sub-goroutines have completed and returned.
func (*Channel) ContextValue ¶
func (c *Channel) ContextValue(k interface{}) interface{}
ContextValue gets extra data from server connection.
func (*Channel) LastActive ¶
LastActive returns the heart beats of server connection.
func (*Channel) RemoteAddr ¶
RemoteAddr returns the remote address of server connection.
func (*Channel) SetContextValue ¶
func (c *Channel) SetContextValue(k, v interface{})
SetContextValue sets extra data to server connection.
func (*Channel) SetLastActive ¶
SetLastActive sets the heart beats of server connection.
type ClientChannel ¶
type ClientChannel struct { *Channel // contains filtered or unexported fields }
func NewClientChannel ¶
func NewClientChannel(c net.Conn, opt ...Option) *ClientChannel
type Codec ¶
type Codec interface { Decode(*bufio.Reader) (Packet, error) Encode(Packet, io.Writer) (int, error) }
网络读包 (network packet reading)
type ErrUndefined ¶
type ErrUndefined int32
ErrUndefined for undefined message type.
func (ErrUndefined) Error ¶
func (e ErrUndefined) Error() string
type HandlePoolType ¶
type HandlePoolType int8
HandlePoolType handle pool type
const ( HandleNoPooled HandlePoolType = iota HandlePooledRandom HandlePooledStick HandlePoolNewRoutine )
handler type
type Handler ¶
type Handler interface {
Handle(context.Context, WriteCloser)
}
Handler takes the responsibility to handle incoming messages.
type HandlerFunc ¶
type HandlerFunc func(context.Context, WriteCloser)
HandlerFunc serves as an adapter to allow the use of ordinary functions as handlers.
func (HandlerFunc) Handle ¶
func (f HandlerFunc) Handle(ctx context.Context, c WriteCloser)
Handle calls f(ctx, c)
type Hashable ¶
type Hashable interface {
HashCode() int32
}
Hashable is an interface for hashable objects.
type LinePacket ¶
type LinePacket struct {
Payload []byte
}
func (*LinePacket) Serialize ¶
func (lp *LinePacket) Serialize() ([]byte, error)
func (*LinePacket) Timestamp ¶
func (lp *LinePacket) Timestamp() int64
func (*LinePacket) Type ¶
func (lp *LinePacket) Type() int32
type MessageHandler ¶
type MessageHandler struct {
// contains filtered or unexported fields
}
MessageHandler is a combination of message and its handler function.
type OnTimeOut ¶
type OnTimeOut struct { Callback func(time.Time, WriteCloser) Ctx context.Context AskForWorkers bool }
OnTimeOut represents a timed task.
func NewOnTimeOut ¶
NewOnTimeOut returns OnTimeOut.
type Option ¶
type Option func(*options)
Option sets server options.
func CustomCodecOption ¶
CustomCodecOption returns an Option that will apply a custom Codec.
func HandlerBufferSizeOption ¶
HandlerBufferSizeOption returns an Option that sets the size of the handler buffered channel.
func OnCloseOption ¶
func OnCloseOption(cb func(WriteCloser)) Option
OnCloseOption returns an Option that sets the callback to call when a client is closed.
func OnConnectOption ¶
func OnConnectOption(cb func(WriteCloser) bool) Option
OnConnectOption returns an Option that sets the callback to call when a new client connects.
func OnMessageOption ¶
func OnMessageOption(cb func(Packet, WriteCloser), poolType ...HandlePoolType) Option
OnMessageOption returns an Option that sets the callback to call when a new message arrives.
func ReadTimeoutOption ¶
ReadTimeoutOption returns an Option that sets the socket read timeout.
func TimerBufferSizeOption ¶
TimerBufferSizeOption returns an Option that sets the size of the timer buffered channel.
func WorkerSizeOption ¶
WorkerSizeOption returns an Option that sets the number of goroutines in the WorkerPool.
func WriteTimeoutOption ¶
WriteTimeoutOption returns an Option that sets the socket write timeout.
func WriterBufferSizeOption ¶
WriterBufferSizeOption returns an Option that sets the size of the writer buffered channel.
type RPCCodec ¶
type RPCNegoPacket ¶
func (*RPCNegoPacket) Serialize ¶
func (rnp *RPCNegoPacket) Serialize() ([]byte, error)
func (*RPCNegoPacket) Timestamp ¶
func (rnp *RPCNegoPacket) Timestamp() int64
func (*RPCNegoPacket) Type ¶
func (rnp *RPCNegoPacket) Type() int32
type RPCPacket ¶
type RPCPacket struct { ID int64 Code int32 Header []RPCPacketHeader Payload []byte Tp int32 Flags int32 // timestamp T int64 }
func GetRPCPacket ¶
func GetRPCPacket() *RPCPacket
func (*RPCPacket) ForeachHeader ¶
type RPCPacketHeader ¶
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func ServerFromContext ¶
func (*Server) Conn ¶
func (s *Server) Conn(id int64) (*ServerChannel, bool)
Conn returns a server connection with specified ID.
func (*Server) Register ¶
func (s *Server) Register(msgType int32, handler HandlerFunc, handleType HandlePoolType)
type ServerChannel ¶
type ServerChannel struct {
*Channel
}
func NewServerChannel ¶
func NewServerChannel(id int64, s *Server, c net.Conn) *ServerChannel
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a pool of go-routines running functions.
func WorkerPoolInstance ¶
func WorkerPoolInstance() *WorkerPool
WorkerPoolInstance returns the global pool.
func (*WorkerPool) Close ¶
func (wp *WorkerPool) Close()
Close closes the pool, stopping it from executing functions.
func (*WorkerPool) Put ¶
func (wp *WorkerPool) Put(code uint32, cb func()) error
Put appends a function to some worker's channel.