tao

package module
v0.0.0-...-2c485cb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2020 License: Apache-2.0 Imports: 22 Imported by: 0

README

Tao

Light-weight TCP Asynchronous gOlang framework 轻量级TCP异步框架,Go语言实现 1.6.0

GitHub stars GitHub forks GitHub license GoDoc

Requirements

  • Golang 1.9 and above

Installation

go get -u -v github.com/leesper/tao

Usage

A Chat Server Example in 50 Lines
package main

import (
	"fmt"
	"net"

	"github.com/leesper/holmes"
	"github.com/leesper/tao"
	"github.com/leesper/tao/examples/chat"
)

// ChatServer is the chatting server.
type ChatServer struct {
	*tao.Server
}

// NewChatServer returns a ChatServer.
func NewChatServer() *ChatServer {
	onConnectOption := tao.OnConnectOption(func(conn tao.WriteCloser) bool {
		holmes.Infoln("on connect")
		return true
	})
	onErrorOption := tao.OnErrorOption(func(conn tao.WriteCloser) {
		holmes.Infoln("on error")
	})
	onCloseOption := tao.OnCloseOption(func(conn tao.WriteCloser) {
		holmes.Infoln("close chat client")
	})
	return &ChatServer{
		tao.NewServer(onConnectOption, onErrorOption, onCloseOption),
	}
}

func main() {
	defer holmes.Start().Stop()

	tao.Register(chat.ChatMessage, chat.DeserializeMessage, chat.ProcessMessage)

	l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "0.0.0.0", 12345))
	if err != nil {
		holmes.Fatalln("listen error", err)
	}
	chatServer := NewChatServer()
	err = chatServer.Start(l)
	if err != nil {
		holmes.Fatalln("start error", err)
	}
}

Changelog

v1.6.0
  1. Bugfix: writeLoop() drains all pending messages before exit;
    writeLoop()函数退出前将所有的网络数据包发送完毕;
  2. Renaming getter methods according to Effective Go;
    根据Effective Go重命名getter方法;
  3. Bugfix: timer task expired forever due to system clock affected by NTP;
    修复因为受NTP协议校正系统时钟偏差的影响,导致定时任务永远过期的bug;
  4. Bugfix: asyncWrite() do not return error if called after ServerConn or ClientConn closes;
    修复网络连接关闭后调用asyncWrite()不返回错误的bug;
  5. Providing WorkerSizeOption() for tuning the size of worker go-routine pool;
    提供WorkerSizeOption()来调节工作者线程池大小;
  6. Providing BufferSizeOption() for tuning the size of buffered channel;
    提供BufferSizeOption()来调节缓冲通道大小;
  7. Providing ReconnectOption() for activating ClientConn's reconnecting mechanism;
    提供ReconnectOption()来启动ClientConn的断线重连机制;
  8. Providing CustomCodecOption() for setting self-defined codec;
    提供CustomCodecOption() 来设置自定义编解码器;
  9. Providing TLSCredsOption() for running a TLS server;
    提供TLSCredsOption()来运行TLS服务器;
  10. Providing OnConnectOption(), OnMessageOption(), OnCloseOption() and OnErrorOption() for setting callbacks of the four situations respectively;
    提供OnConnectOption(), OnMessageOption(), OnCloseOption() 和 OnErrorOption()来设置四种情况下的回调函数;
  11. Use the standard sync.Map instead of map guarded by rwmutex;
    使用标准库中的sync.Map替换使用rwmutex保护的map;
v1.5.0
  1. A Golang-style redesigning of the overall framework, a reduce about 500+ lines of codes;
    按照Go语言风格重新设计的整体框架,精简500多行代码;
  2. Providing new Server, ClientConn and ServerConn struct and a WriteCloser interface;
    提供Server,ClientConn和ServerConn三种新结构和WriteCloser新接口;
  3. Using standard context package to manage and spread request-scoped data acrossing go-routines;
    使用标准库中的context包在多个Go线程中管理和传播与请求有关的数据;
  4. Graceful stopping, all go-routines are related by context, and they will be noticed and exit when server stops or connection closes;
    优雅停机,所有的Go线程都通过上下文进行关联,当服务器停机或连接关闭时它们都会收到通知并执行退出;
  5. Providing new type HandlerFunc func(context.Context, WriteCloser) for defining message handlers;
    提供新的HandlerFunc类型来定义消息处理器;
  6. Developers can now use NewContextWithMessage() and MessageFromContext() to put and get message they are about to use in handler function's context, this also leads to a more clarified design;
    开发者现在可以通过NewContextWithMessage()和MessageFromContext()函数来在上下文中存取他们将在处理器函数中使用的消息,这样的设计更简洁;
  7. Go-routine functions readLoop(), writeLoop() and handleLoop() are all optimized to serve both ServerConn and ClientConn, serveral dead-lock bugs such as blocking on channels are fixed;
    优化Go线程函数readLoop(),writeLoop()和handleLoop()使得它们能同时为ServerConn和ClientConn服务,修复了多个“通道阻塞”的死锁问题;
  8. Reconnecting mechanism of ClientConn is redesigned and optimized;
    重新设计和优化ClientConn的断线重连机制;
v1.4.0
  1. bugfix:TLS重连失败问题;
    bugfix: failed to reconnect the TLS connection;
  2. bugfix:ConnectionMap死锁问题;
    bugfix: ConnectionMap dead-lock problem;
  3. 优化TCP网络连接的关闭过程;
    Optimize the closing process of TCP connection;
  4. 优化服务器的关闭过程;
    Optimize the closing process of server;
  5. 更优雅的消息处理注册接口;
    More elegant message handler register interface;
v1.3.0
  1. bugfix:修复断线重连状态不一致问题;
    bugfix: fixed inconsistent status caused by reconnecting;
  2. bugfix:修复ServerConnection和TimingWheel在连接关闭时并发访问导致崩溃问题;
    bugfix: fixed a corruption caused by concurrent accessing between ServerConnection and TimingWheel during connection closing;
  3. 无锁且线程安全的TimingWheel,优化CPU占用率;
    Lock-free and thread-safe TimingWheel, optimized occupancy rate;
  4. bugfix:修复TLS配置文件读取函数;
    bugfix: Fixed errors when loading TLS config;
  5. 添加消息相关的Context结构;简化消息注册机制,直接注册处理函数到HandlerMap;
    A message-related Context struct added; Register handler functions in HandlerMap directly to simplify message registration mechanism;
  6. 合并NewClientConnection()和NewTLSClientConnection(),提供一致的API;
    Combine NewTLSConnection() into NewClientConnection(), providing a consistent API;
  7. 工作者线程池改造成单例模式;
    Make WorkerPool a singleton pattern;
  8. 使用Holmes日志库代替glog;
    Using Holmes logging package instead of glog;
  9. 添加metrics.go:基于expvar标准包导出服务器关键信息;
    Add metrics.go: exporting critical server information based on expvar standard pacakge;
  10. 编写中文版框架设计原理文档,英文版正在翻译中;
    A document about framework designing principles in Chinese, English version under developed;
v1.2.0
  1. 更优雅的消息注册接口;
    More elegant message register interface;
  2. TCPConnection的断线重连机制;
    TCPConnection reconnecting upon closing;
  3. bugfix:协议未注册时不关闭客户端连接;
    bugfix: Don't close client when messages not registered;
  4. bugfix:在readLoop()协程中处理心跳时间戳更新;
    bugfix: Updating heart-beat timestamp in readLoop() go-routine;
  5. bugfix:Message接口使用Serialize()替代之前的MarshalBinary(),以免框架使用者使用gob.Encoder/Decoder的时候栈溢出;
    bugfix: Use Serialize() instead of MarshalBinary() in Message interface, preventing stack overflows when framework users use gob.Encoder/Decoder;
  6. bugfix:当应用层数据长度大于0时才对其进行序列化;
    bugfix: Serialize application data when its length greater than 0;
  7. 新API:SetCodec(),允许TCPConnection自定义编解码器;
    New API: SetCodec() allowing TCPConnection defines its own codec;
  8. 新API:SetDBInitializer(),允许框架使用者定义数据访问接口;
    New API: SetDBInitializer() allowing framework users define data access interface;
  9. 允许框架使用者在TCPConnection上设置自定义数据;
    Allowing framework users setting custom data on TCPConnection;
  10. 为新客户端连接的启动单独开辟一个对应的go协程;
    Allocating a corresponding go-routine for newly-connected clients respectively;
  11. bugfix:写事件循环在连接关闭时将信道中的数据全部发送出去;
    bugfix: writeLoop() flushes all packets left in channel when performing closing;
  12. bugfix:服务器和客户端连接等待所有go协程关闭后再退出;
    bugfix: Servers and client connections wait for the exits of all go-routines before shutting down;
  13. 重构Server和Connection,采用针对接口编程的设计;
    Refactoring Server and Connection, adopting a programming-by-interface design;
  14. 设置500毫秒读超时,防止readLoop()发生阻塞;
    Setting 500ms read-timeout prevents readLoop() from blocking;
v1.1.0
  1. 添加注释,提高代码可读性;
    Add comments, make it more readable;
  2. 限制服务器的最大并发连接数(默认1000);
    Server max connections limit (default to 1000);
  3. 新API:NewTLSTCPServer() 创建传输层安全的TCP服务器;
    New API: NewTLSTCPServer() for creating TLS-supported TCP server;
  4. 新特性:SetOnScheduleCallback() 由框架使用者来定义计划任务(比如心跳);
    New Feature: SetOnScheduleCallback() make scheduled task managed by framwork users(such as heart beat);
  5. 新特性:支持默认的消息编解码器TypeLengthValueCodec,并允许框架使用者开发自定义编解码器;
    Support TypeLengthValueCodec by default, while allowing framework users develop their own codecs;
v1.0.0
  1. 完全异步的读,写以及消息处理;
    Completely asynchronous reading, writing and message handling;
  2. 工作者协程池;
    Worker go-routine pool;
  3. 并发数据结构和原子数据类型;
    Concurrent data structure and atomic data types;
  4. 毫秒精度的定时器功能;
    Millisecond-precision timer function;
  5. 传输层安全支持;
    Transport layer security support;
  6. 应用层心跳协议;
    Application-level heart-beating protocol;

More Documentation

  1. Tao - Go语言实现的TCP网络编程框架
  2. English(TBD)

Documentation

Overview

Package tao implements a light-weight TCP network programming framework.

Server represents a TCP server with various ServerOption supported.

1. Provides custom codec by CustomCodecOption; 2. Provides TLS server by TLSCredsOption; 3. Provides callback on connected by OnConnectOption; 4. Provides callback on meesage arrived by OnMessageOption; 5. Provides callback on closed by OnCloseOption; 6. Provides callback on error occurred by OnErrorOption;

ServerConn represents a connection on the server side.

ClientConn represents a connection connect to other servers. You can make it reconnectable by passing ReconnectOption when creating.

AtomicInt64, AtomicInt32 and AtomicBoolean are providing concurrent-safe atomic types in a Java-like style while ConnMap is a go-routine safe map for connection management.

Every handler function is defined as func(context.Context, WriteCloser). Usually a meesage and a net ID are shifted within the Context, developers can retrieve them by calling the following functions.

func NewContextWithMessage(ctx context.Context, msg Message) context.Context
func MessageFromContext(ctx context.Context) Message
func NewContextWithNetID(ctx context.Context, netID int64) context.Context
func NetIDFromContext(ctx context.Context) int64

Programmers are free to define their own request-scoped data and put them in the context, but must be sure that the data is safe for multiple go-routines to access.

Every message must define according to the interface and a deserialization function:

  type Message interface {
	 MessageNumber() int32
   Serialize() ([]byte, error)
  }

  func Deserialize(data []byte) (message Message, err error)

There is a TypeLengthValueCodec defined, but one can also define his/her own codec:

  type Codec interface {
	  Decode(net.Conn) (Message, error)
	  Encode(Message) ([]byte, error)
  }

TimingWheel is a safe timer for running timed callbacks on connection.

WorkerPool is a go-routine pool for running message handlers, you can fetch one by calling func WorkerPoolInstance() *WorkerPool.

Index

Constants

View Source
const (
	// MessageTypeBytes is the length of type header.
	MessageTypeBytes = 4
	// MessageLenBytes is the length of length header.
	MessageLenBytes = 4
	// MessageMaxBytes is the maximum bytes allowed for application data.
	MessageMaxBytes = 1 << 23 // 8M
)
View Source
const (
	MaxConnections = 1000
	BufferSize128  = 128
	BufferSize256  = 256
	BufferSize512  = 512
	BufferSize1024 = 1024
)

definitions about some constants.

View Source
const (
	// HeartBeat is the default heart beat message number.
	HeartBeat = 0
)

Variables

View Source
var (
	ErrParameter     = errors.New("parameter error")
	ErrNilKey        = errors.New("nil key")
	ErrNilValue      = errors.New("nil value")
	ErrWouldBlock    = errors.New("would block")
	ErrNotHashable   = errors.New("not hashable")
	ErrNilData       = errors.New("nil data")
	ErrBadData       = errors.New("more than 8M data")
	ErrNotRegistered = errors.New("handler not registered")
	ErrServerClosed  = errors.New("server has been closed")
)

Error codes returned by failures dealing with server or connection.

Functions

func HandleHeartBeat

func HandleHeartBeat(ctx context.Context, c WriteCloser)

HandleHeartBeat updates connection heart beat timestamp.

func LoadTLSConfig

func LoadTLSConfig(certFile, keyFile string, isSkipVerify bool) (*tls.Config, error)

LoadTLSConfig returns a TLS configuration with the specified cert and key file.

func MonitorOn

func MonitorOn(port int)

MonitorOn starts up an HTTP monitor on port.

func NetIDFromContext

func NetIDFromContext(ctx context.Context) int64

NetIDFromContext returns a net ID from a Context.

func NewContextWithMessage

func NewContextWithMessage(ctx context.Context, msg Message) context.Context

NewContextWithMessage returns a new Context that carries message.

func NewContextWithNetID

func NewContextWithNetID(ctx context.Context, netID int64) context.Context

NewContextWithNetID returns a new Context that carries net ID.

func Register

func Register(msgType int32, unmarshaler func([]byte) (Message, error), handler func(context.Context, WriteCloser))

Register registers the unmarshal and handle functions for msgType. If no unmarshal function provided, the message will not be parsed. If no handler function provided, the message will not be handled unless you set a default one by calling SetOnMessageCallback. If Register being called twice on one msgType, it will panics.

Types

type AtomicBoolean

type AtomicBoolean int32

AtomicBoolean provides atomic boolean type.

func NewAtomicBoolean

func NewAtomicBoolean(initialValue bool) *AtomicBoolean

NewAtomicBoolean returns an atomic boolean type.

func (*AtomicBoolean) CompareAndSet

func (a *AtomicBoolean) CompareAndSet(oldValue, newValue bool) bool

CompareAndSet compares boolean with expected value, if equals as expected then sets the updated value, this operation performs atomically.

func (*AtomicBoolean) Get

func (a *AtomicBoolean) Get() bool

Get returns the value of boolean atomically.

func (*AtomicBoolean) GetAndSet

func (a *AtomicBoolean) GetAndSet(newValue bool) bool

GetAndSet sets new value and returns the old atomically.

func (*AtomicBoolean) Set

func (a *AtomicBoolean) Set(newValue bool)

Set sets the value of boolean atomically.

func (*AtomicBoolean) String

func (a *AtomicBoolean) String() string

type AtomicInt32

type AtomicInt32 int32

AtomicInt32 provides atomic int32 type.

func NewAtomicInt32

func NewAtomicInt32(initialValue int32) *AtomicInt32

NewAtomicInt32 returns an atomoic int32 type.

func (*AtomicInt32) AddAndGet

func (a *AtomicInt32) AddAndGet(delta int32) int32

AddAndGet adds the value by delta and then gets the value, this operation performs atomically.

func (*AtomicInt32) CompareAndSet

func (a *AtomicInt32) CompareAndSet(expect, update int32) bool

CompareAndSet compares int64 with expected value, if equals as expected then sets the updated value, this operation performs atomically.

func (*AtomicInt32) DecrementAndGet

func (a *AtomicInt32) DecrementAndGet() int32

DecrementAndGet decrements the value by 1 and then gets the value, this operation performs atomically.

func (*AtomicInt32) Get

func (a *AtomicInt32) Get() int32

Get returns the value of int32 atomically.

func (*AtomicInt32) GetAndAdd

func (a *AtomicInt32) GetAndAdd(delta int32) int32

GetAndAdd gets the old value and then add by delta, this operation performs atomically.

func (*AtomicInt32) GetAndDecrement

func (a *AtomicInt32) GetAndDecrement() int32

GetAndDecrement gets the old value and then decrement by 1, this operation performs atomically.

func (*AtomicInt32) GetAndIncrement

func (a *AtomicInt32) GetAndIncrement() int32

GetAndIncrement gets the old value and then increment by 1, this operation performs atomically.

func (*AtomicInt32) GetAndSet

func (a *AtomicInt32) GetAndSet(newValue int32) (oldValue int32)

GetAndSet sets new value and returns the old atomically.

func (*AtomicInt32) IncrementAndGet

func (a *AtomicInt32) IncrementAndGet() int32

IncrementAndGet increments the value by 1 and then gets the value, this operation performs atomically.

func (*AtomicInt32) Set

func (a *AtomicInt32) Set(newValue int32)

Set sets the value of int32 atomically.

func (*AtomicInt32) String

func (a *AtomicInt32) String() string

type AtomicInt64

type AtomicInt64 int64

AtomicInt64 provides atomic int64 type.

func NewAtomicInt64

func NewAtomicInt64(initialValue int64) *AtomicInt64

NewAtomicInt64 returns an atomic int64 type.

func (*AtomicInt64) AddAndGet

func (a *AtomicInt64) AddAndGet(delta int64) int64

AddAndGet adds the value by delta and then gets the value, this operation performs atomically.

func (*AtomicInt64) CompareAndSet

func (a *AtomicInt64) CompareAndSet(expect, update int64) bool

CompareAndSet compares int64 with expected value, if equals as expected then sets the updated value, this operation performs atomically.

func (*AtomicInt64) DecrementAndGet

func (a *AtomicInt64) DecrementAndGet() int64

DecrementAndGet decrements the value by 1 and then gets the value, this operation performs atomically.

func (*AtomicInt64) Get

func (a *AtomicInt64) Get() int64

Get returns the value of int64 atomically.

func (*AtomicInt64) GetAndAdd

func (a *AtomicInt64) GetAndAdd(delta int64) int64

GetAndAdd gets the old value and then add by delta, this operation performs atomically.

func (*AtomicInt64) GetAndDecrement

func (a *AtomicInt64) GetAndDecrement() int64

GetAndDecrement gets the old value and then decrement by 1, this operation performs atomically.

func (*AtomicInt64) GetAndIncrement

func (a *AtomicInt64) GetAndIncrement() int64

GetAndIncrement gets the old value and then increment by 1, this operation performs atomically.

func (*AtomicInt64) GetAndSet

func (a *AtomicInt64) GetAndSet(newValue int64) int64

GetAndSet sets new value and returns the old atomically.

func (*AtomicInt64) IncrementAndGet

func (a *AtomicInt64) IncrementAndGet() int64

IncrementAndGet increments the value by 1 and then gets the value, this operation performs atomically.

func (*AtomicInt64) Set

func (a *AtomicInt64) Set(newValue int64)

Set sets the value of int64 atomically.

func (*AtomicInt64) String

func (a *AtomicInt64) String() string

type ClientConn

type ClientConn struct {
	// contains filtered or unexported fields
}

ClientConn represents a client connection to a TCP server.

func NewClientConn

func NewClientConn(netid int64, c net.Conn, opt ...ServerOption) *ClientConn

NewClientConn returns a new client connection which has not started to serve requests yet.

func (*ClientConn) AddPendingTimer

func (cc *ClientConn) AddPendingTimer(timerID int64)

AddPendingTimer adds a new timer ID to client connection.

func (*ClientConn) CancelTimer

func (cc *ClientConn) CancelTimer(timerID int64)

CancelTimer cancels a timer with the specified ID.

func (*ClientConn) Close

func (cc *ClientConn) Close()

Close gracefully closes the client connection. It blocked until all sub go-routines are completed and returned.

func (*ClientConn) ContextValue

func (cc *ClientConn) ContextValue(k interface{}) interface{}

ContextValue gets extra data from client connection.

func (*ClientConn) HeartBeat

func (cc *ClientConn) HeartBeat() int64

HeartBeat gets the heart beats of client connection.

func (*ClientConn) LocalAddr

func (cc *ClientConn) LocalAddr() net.Addr

LocalAddr returns the local address of server connection.

func (*ClientConn) Name

func (cc *ClientConn) Name() string

Name gets the name of client connection.

func (*ClientConn) NetID

func (cc *ClientConn) NetID() int64

NetID returns the net ID of client connection.

func (*ClientConn) RemoteAddr

func (cc *ClientConn) RemoteAddr() net.Addr

RemoteAddr returns the remote address of server connection.

func (*ClientConn) RunAfter

func (cc *ClientConn) RunAfter(duration time.Duration, callback func(time.Time, WriteCloser)) int64

RunAfter runs a callback right after the specified duration ellapsed.

func (*ClientConn) RunAt

func (cc *ClientConn) RunAt(timestamp time.Time, callback func(time.Time, WriteCloser)) int64

RunAt runs a callback at the specified timestamp.

func (*ClientConn) RunEvery

func (cc *ClientConn) RunEvery(interval time.Duration, callback func(time.Time, WriteCloser)) int64

RunEvery runs a callback on every interval time.

func (*ClientConn) SetContextValue

func (cc *ClientConn) SetContextValue(k, v interface{})

SetContextValue sets extra data to client connection.

func (*ClientConn) SetHeartBeat

func (cc *ClientConn) SetHeartBeat(heart int64)

SetHeartBeat sets the heart beats of client connection.

func (*ClientConn) SetName

func (cc *ClientConn) SetName(name string)

SetName sets the name of client connection.

func (*ClientConn) Start

func (cc *ClientConn) Start()

Start starts the client connection, creating go-routines for reading, writing and handlng.

func (*ClientConn) Write

func (cc *ClientConn) Write(message Message) error

Write writes a message to the client.

type Codec

type Codec interface {
	Decode(net.Conn) (Message, error)
	Encode(Message) ([]byte, error)
}

Codec is the interface for message coder and decoder. Application programmer can define a custom codec themselves.

type ErrUndefined

type ErrUndefined int32

ErrUndefined for undefined message type.

func (ErrUndefined) Error

func (e ErrUndefined) Error() string

type Handler

type Handler interface {
	Handle(context.Context, interface{})
}

Handler takes the responsibility to handle incoming messages.

type HandlerFunc

type HandlerFunc func(context.Context, WriteCloser)

HandlerFunc serves as an adapter to allow the use of ordinary functions as handlers.

func GetHandlerFunc

func GetHandlerFunc(msgType int32) HandlerFunc

GetHandlerFunc returns the corresponding handler function for msgType.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, c WriteCloser)

Handle calls f(ctx, c)

type Hashable

type Hashable interface {
	HashCode() int32
}

Hashable is a interface for hashable object.

type HeartBeatMessage

type HeartBeatMessage struct {
	Timestamp int64
}

HeartBeatMessage for application-level keeping alive.

func (HeartBeatMessage) MessageNumber

func (hbm HeartBeatMessage) MessageNumber() int32

MessageNumber returns message number.

func (HeartBeatMessage) Serialize

func (hbm HeartBeatMessage) Serialize() ([]byte, error)

Serialize serializes HeartBeatMessage into bytes.

type Message

type Message interface {
	MessageNumber() int32
	Serialize() ([]byte, error)
}

Message represents the structured data that can be handled.

func DeserializeHeartBeat

func DeserializeHeartBeat(data []byte) (message Message, err error)

DeserializeHeartBeat deserializes bytes into Message.

func MessageFromContext

func MessageFromContext(ctx context.Context) Message

MessageFromContext extracts a message from a Context.

type MessageHandler

type MessageHandler struct {
	// contains filtered or unexported fields
}

MessageHandler is a combination of message and its handler function.

type OnTimeOut

type OnTimeOut struct {
	Callback func(time.Time, WriteCloser)
	Ctx      context.Context
}

OnTimeOut represents a timed task.

func NewOnTimeOut

func NewOnTimeOut(ctx context.Context, cb func(time.Time, WriteCloser)) *OnTimeOut

NewOnTimeOut returns OnTimeOut.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is a server to serve TCP requests.

func NewServer

func NewServer(opt ...ServerOption) *Server

NewServer returns a new TCP server which has not started to serve requests yet.

func ServerFromContext

func ServerFromContext(ctx context.Context) (*Server, bool)

ServerFromContext returns the server within the context.

func (*Server) Broadcast

func (s *Server) Broadcast(msg Message)

Broadcast broadcasts message to all server connections managed.

func (*Server) Conn

func (s *Server) Conn(id int64) (*ServerConn, bool)

Conn returns a server connection with specified ID.

func (*Server) ConnsSize

func (s *Server) ConnsSize() int

ConnsSize returns connections size.

func (*Server) Sched

func (s *Server) Sched(dur time.Duration, sched func(time.Time, WriteCloser))

Sched sets a callback to invoke every duration.

func (*Server) Start

func (s *Server) Start(l net.Listener) error

Start starts the TCP server, accepting new clients and creating service go-routine for each. The service go-routines read messages and then call the registered handlers to handle them. Start returns when failed with fatal errors, the listener willl be closed when returned.

func (*Server) Stop

func (s *Server) Stop()

Stop gracefully closes the server, it blocked until all connections are closed and all go-routines are exited.

func (*Server) Unicast

func (s *Server) Unicast(id int64, msg Message) error

Unicast unicasts message to a specified conn.

type ServerConn

type ServerConn struct {
	// contains filtered or unexported fields
}

ServerConn represents a server connection to a TCP server, it implments Conn.

func NewServerConn

func NewServerConn(id int64, s *Server, c net.Conn) *ServerConn

NewServerConn returns a new server connection which has not started to serve requests yet.

func (*ServerConn) AddPendingTimer

func (sc *ServerConn) AddPendingTimer(timerID int64)

AddPendingTimer adds a timer ID to server Connection.

func (*ServerConn) CancelTimer

func (sc *ServerConn) CancelTimer(timerID int64)

CancelTimer cancels a timer with the specified ID.

func (*ServerConn) Close

func (sc *ServerConn) Close()

Close gracefully closes the server connection. It blocked until all sub go-routines are completed and returned.

func (*ServerConn) ContextValue

func (sc *ServerConn) ContextValue(k interface{}) interface{}

ContextValue gets extra data from server connection.

func (*ServerConn) HeartBeat

func (sc *ServerConn) HeartBeat() int64

HeartBeat returns the heart beats of server connection.

func (*ServerConn) LocalAddr

func (sc *ServerConn) LocalAddr() net.Addr

LocalAddr returns the local address of server connection.

func (*ServerConn) Name

func (sc *ServerConn) Name() string

Name returns the name of server connection.

func (*ServerConn) NetID

func (sc *ServerConn) NetID() int64

NetID returns net ID of server connection.

func (*ServerConn) RemoteAddr

func (sc *ServerConn) RemoteAddr() net.Addr

RemoteAddr returns the remote address of server connection.

func (*ServerConn) RunAfter

func (sc *ServerConn) RunAfter(duration time.Duration, callback func(time.Time, WriteCloser)) int64

RunAfter runs a callback right after the specified duration ellapsed.

func (*ServerConn) RunAt

func (sc *ServerConn) RunAt(timestamp time.Time, callback func(time.Time, WriteCloser)) int64

RunAt runs a callback at the specified timestamp.

func (*ServerConn) RunEvery

func (sc *ServerConn) RunEvery(interval time.Duration, callback func(time.Time, WriteCloser)) int64

RunEvery runs a callback on every interval time.

func (*ServerConn) SetContextValue

func (sc *ServerConn) SetContextValue(k, v interface{})

SetContextValue sets extra data to server connection.

func (*ServerConn) SetHeartBeat

func (sc *ServerConn) SetHeartBeat(heart int64)

SetHeartBeat sets the heart beats of server connection.

func (*ServerConn) SetName

func (sc *ServerConn) SetName(name string)

SetName sets name of server connection.

func (*ServerConn) Start

func (sc *ServerConn) Start()

Start starts the server connection, creating go-routines for reading, writing and handlng.

func (*ServerConn) Write

func (sc *ServerConn) Write(message Message) error

Write writes a message to the client.

type ServerOption

type ServerOption func(*options)

ServerOption sets server options.

func BufferSizeOption

func BufferSizeOption(indicator int) ServerOption

BufferSizeOption returns a ServerOption that is the size of buffered channel, for example an indicator of BufferSize256 means a size of 256.

func CustomCodecOption

func CustomCodecOption(codec Codec) ServerOption

CustomCodecOption returns a ServerOption that will apply a custom Codec.

func OnCloseOption

func OnCloseOption(cb func(WriteCloser)) ServerOption

OnCloseOption returns a ServerOption that will set callback to call when client closed.

func OnConnectOption

func OnConnectOption(cb func(WriteCloser) bool) ServerOption

OnConnectOption returns a ServerOption that will set callback to call when new client connected.

func OnErrorOption

func OnErrorOption(cb func(WriteCloser)) ServerOption

OnErrorOption returns a ServerOption that will set callback to call when error occurs.

func OnMessageOption

func OnMessageOption(cb func(Message, WriteCloser)) ServerOption

OnMessageOption returns a ServerOption that will set callback to call when new message arrived.

func ReconnectOption

func ReconnectOption() ServerOption

ReconnectOption returns a ServerOption that will make ClientConn reconnectable.

func TLSCredsOption

func TLSCredsOption(config *tls.Config) ServerOption

TLSCredsOption returns a ServerOption that will set TLS credentials for server connections.

func WorkerSizeOption

func WorkerSizeOption(workerSz int) ServerOption

WorkerSizeOption returns a ServerOption that will set the number of go-routines in WorkerPool.

type TimingWheel

type TimingWheel struct {
	// contains filtered or unexported fields
}

TimingWheel manages all the timed task.

func NewTimingWheel

func NewTimingWheel(ctx context.Context) *TimingWheel

NewTimingWheel returns a *TimingWheel ready for use.

func (*TimingWheel) AddTimer

func (tw *TimingWheel) AddTimer(when time.Time, interv time.Duration, to *OnTimeOut) int64

AddTimer adds new timed task.

func (*TimingWheel) CancelTimer

func (tw *TimingWheel) CancelTimer(timerID int64)

CancelTimer cancels a timed task with specified timer ID.

func (*TimingWheel) Size

func (tw *TimingWheel) Size() int

Size returns the number of timed tasks.

func (*TimingWheel) Stop

func (tw *TimingWheel) Stop()

Stop stops the TimingWheel.

func (*TimingWheel) TimeOutChannel

func (tw *TimingWheel) TimeOutChannel() chan *OnTimeOut

TimeOutChannel returns the timeout channel.

type TypeLengthValueCodec

type TypeLengthValueCodec struct{}

TypeLengthValueCodec defines a special codec. Format: type-length-value |4 bytes|4 bytes|n bytes <= 8M|

func (TypeLengthValueCodec) Decode

func (codec TypeLengthValueCodec) Decode(raw net.Conn) (Message, error)

Decode decodes the bytes data into Message

func (TypeLengthValueCodec) Encode

func (codec TypeLengthValueCodec) Encode(msg Message) ([]byte, error)

Encode encodes the message into bytes data.

type UnmarshalFunc

type UnmarshalFunc func([]byte) (Message, error)

UnmarshalFunc unmarshals bytes into Message.

func GetUnmarshalFunc

func GetUnmarshalFunc(msgType int32) UnmarshalFunc

GetUnmarshalFunc returns the corresponding unmarshal function for msgType.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a pool of go-routines running functions.

func WorkerPoolInstance

func WorkerPoolInstance() *WorkerPool

WorkerPoolInstance returns the global pool.

func (*WorkerPool) Close

func (wp *WorkerPool) Close()

Close closes the pool, stopping it from executing functions.

func (*WorkerPool) Put

func (wp *WorkerPool) Put(k interface{}, cb func()) error

Put appends a function to some worker's channel.

func (*WorkerPool) Size

func (wp *WorkerPool) Size() int

Size returns the size of pool.

type WriteCloser

type WriteCloser interface {
	Write(Message) error
	Close()
}

WriteCloser is the interface that groups Write and Close methods.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL