gev

Version: v0.3.1
Published: Sep 2, 2020 License: MIT Imports: 22 Imported by: 0

gev is a lightweight, fast, non-blocking TCP network library based on the Reactor pattern.

It supports custom protocols, so high-performance servers can be built quickly and easily.

Features

  • High-performance event loop based on epoll and kqueue
  • Support multi-core and multi-threading
  • Dynamic expansion of read and write buffers implemented by Ring Buffer
  • Asynchronous read and write
  • SO_REUSEPORT port reuse support
  • Automatically clean up idle connections
  • Support WebSocket/Protobuf
  • Support for scheduled tasks, delayed tasks
  • Support for custom protocols

Network model

gev uses only a few goroutines: one listens for new connections, and the others (work goroutines) handle read and write events for connected clients. The number of work goroutines is configurable and defaults to the number of CPU cores on the host.
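
The default described above can be sketched as follows. This is a minimal illustration, not gev's actual code: resolveNumLoops is a hypothetical helper, and treating any non-positive value as "use the default" is an assumption based on the -1 default of the loops flag in the examples.

```go
package main

import (
	"fmt"
	"runtime"
)

// resolveNumLoops is a hypothetical helper (not part of gev). It returns the
// requested number of work goroutines and falls back to the number of logical
// CPUs when the request is not positive.
func resolveNumLoops(requested int) int {
	if requested <= 0 {
		return runtime.NumCPU()
	}
	return requested
}

func main() {
	fmt.Println("default:", resolveNumLoops(-1))
	fmt.Println("explicit:", resolveNumLoops(4))
}
```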

Performance Test

Test environment: Ubuntu 18.04 | 4 virtual CPUs | 4.0 GiB

Throughput Test

Chart: GOMAXPROCS=1 (single thread), 1 work goroutine

Chart: GOMAXPROCS=4, 4 work goroutines

Other Tests

Speed Test

A simple performance comparison with similar libraries, using the same benchmarking method as the evio project.

  • gnet
  • eviop
  • evio
  • net (StdLib)

Chart: GOMAXPROCS=1, 1 work goroutine

Chart: GOMAXPROCS=1, 4 work goroutines

Chart: GOMAXPROCS=4, 4 work goroutines

Install

go get -u github.com/FTwOoO/gev

Getting started

echo demo
package main

import (
	"flag"
	"strconv"

	"github.com/FTwOoO/gev"
	"github.com/FTwOoO/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	//log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	//log.Println("OnMessage")
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	//log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}

Handler is an interface that programs must implement.

type Handler interface {
	OnConnect(c *connection.Connection)
	OnMessage(c *connection.Connection, ctx interface{}, data []byte) []byte
	OnClose(c *connection.Connection)
}

func NewServer(handler Handler, opts ...Option) (server *Server, err error)

OnMessage is called back when a complete data frame arrives. The handler receives the data, runs its business logic, and returns the data to send back.

When data arrives, gev does not call OnMessage immediately; it first calls the protocol's UnPacket function. The execution logic is roughly as follows:

ctx, receivedData := c.protocol.UnPacket(c, buffer)
if ctx != nil || len(receivedData) != 0 {
	sendData := c.OnMessage(c, ctx, receivedData)
	if len(sendData) > 0 {
		return c.protocol.Packet(c, sendData)
	}
}
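
To make the flow above concrete, here is a self-contained sketch that runs without gev. It swaps the ring buffer for a bytes.Buffer, and lineProtocol, dispatch, and run are hypothetical names invented for this illustration; only the unpack, handle, pack order is taken from the snippet above.

```go
package main

import (
	"bytes"
	"fmt"
)

// protocol mirrors the UnPacket/Packet pair, but works on a plain
// bytes.Buffer instead of gev's ring buffer (an assumption of this sketch).
type protocol interface {
	UnPacket(buf *bytes.Buffer) (ctx interface{}, payload []byte)
	Packet(data []byte) []byte
}

// lineProtocol is a hypothetical protocol: one frame per '\n'-terminated line.
type lineProtocol struct{}

func (lineProtocol) UnPacket(buf *bytes.Buffer) (interface{}, []byte) {
	i := bytes.IndexByte(buf.Bytes(), '\n')
	if i < 0 {
		return nil, nil // incomplete frame: leave the bytes buffered
	}
	line := make([]byte, i)
	copy(line, buf.Next(i+1)) // consume the line and its terminator
	return nil, line
}

func (lineProtocol) Packet(data []byte) []byte {
	return append(append([]byte{}, data...), '\n')
}

// dispatch unpacks first and calls onMessage only when a frame (or a ctx
// value) was produced, then packs whatever the handler returned.
func dispatch(p protocol, buf *bytes.Buffer, onMessage func(ctx interface{}, data []byte) []byte) []byte {
	ctx, received := p.UnPacket(buf)
	if ctx != nil || len(received) != 0 {
		if out := onMessage(ctx, received); len(out) > 0 {
			return p.Packet(out)
		}
	}
	return nil
}

// run feeds chunks to the buffer one at a time, as if each were a separate
// read from the socket, and records the reply produced after each chunk.
func run(chunks ...string) []string {
	echo := func(_ interface{}, data []byte) []byte { return data }
	var buf bytes.Buffer
	var replies []string
	for _, chunk := range chunks {
		buf.WriteString(chunk)
		replies = append(replies, string(dispatch(lineProtocol{}, &buf, echo)))
	}
	return replies
}

func main() {
	for _, reply := range run("hel", "lo\n") {
		fmt.Printf("%q\n", reply)
	}
}
```

The first chunk produces no reply because the frame is incomplete; the handler runs only after the second chunk completes the line.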

Protocol

The UnPacket function checks whether the data in the ring buffer forms a complete data frame. If it does, UnPacket unpacks the frame and returns the payload. If it does not, UnPacket returns immediately.

The return values of UnPacket, (interface{}, []byte), are passed to OnMessage as its ctx interface{} and data []byte arguments. ctx carries any special information produced while parsing the data frame in UnPacket (complex frame protocols need this), and data carries the payload.

type Protocol interface {
	UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
	Packet(c *Connection, data []byte) []byte
}

type DefaultProtocol struct{}

func (d *DefaultProtocol) UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
	ret := buffer.Bytes()
	buffer.RetrieveAll()
	return nil, ret
}

func (d *DefaultProtocol) Packet(c *Connection, data []byte) []byte {
	return data
}

As shown above, gev provides a default Protocol implementation that takes all the data in the receive buffer (the ring buffer). In practice, an application usually has its own data frame protocol, which gev accepts as a plug-in: pass it as a variadic option when creating the Server.

s, err := gev.NewServer(handler, gev.Protocol(&ExampleProtocol{}))

See the Protocol example for details.
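
As an illustration of what a custom frame protocol has to do, the sketch below implements length-prefixed framing on a bytes.Buffer. It is not taken from gev: the 4-byte big-endian header and the names pack, unpack, and deliverInTwoParts are assumptions chosen for this example.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const headerLen = 4 // assumed frame header: 4-byte big-endian payload length

// pack prepends the length header to the payload.
func pack(payload []byte) []byte {
	frame := make([]byte, headerLen+len(payload))
	binary.BigEndian.PutUint32(frame, uint32(len(payload)))
	copy(frame[headerLen:], payload)
	return frame
}

// unpack returns one payload if buf holds a complete frame and consumes it.
// If the frame is incomplete it returns ok == false and leaves buf untouched.
func unpack(buf *bytes.Buffer) (payload []byte, ok bool) {
	data := buf.Bytes()
	if len(data) < headerLen {
		return nil, false
	}
	n := int(binary.BigEndian.Uint32(data))
	if len(data) < headerLen+n {
		return nil, false
	}
	payload = make([]byte, n)
	copy(payload, data[headerLen:headerLen+n])
	buf.Next(headerLen + n)
	return payload, true
}

// deliverInTwoParts simulates a frame arriving in two TCP reads, split at cut.
// early reports whether unpack succeeded before the second part arrived.
func deliverInTwoParts(msg string, cut int) (early bool, got string) {
	frame := pack([]byte(msg))
	var buf bytes.Buffer
	buf.Write(frame[:cut])
	_, early = unpack(&buf)
	buf.Write(frame[cut:])
	payload, _ := unpack(&buf)
	return early, string(payload)
}

func main() {
	early, got := deliverInTwoParts("hello", 6)
	fmt.Println(early, got)
}
```

A real Protocol implementation would do the same check against the ring buffer in UnPacket and add the header in Packet.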

There is also a Send method for sending data. Send does not write the data immediately; it queues the data on the event loop, which then performs the send.

See the Server timing push example for details.

func (c *Connection) Send(buffer []byte) error
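
The queue-then-send behaviour can be pictured with a toy event loop. This is a sketch under stated assumptions, not gev's implementation: loop, conn, send, and sendAll are hypothetical stand-ins, and a channel of functions replaces the real epoll/kqueue loop.

```go
package main

import (
	"fmt"
	"strings"
)

// loop is a toy event loop: a single goroutine runs queued functions in order.
type loop struct {
	tasks chan func()
	done  chan struct{}
}

func newLoop() *loop {
	l := &loop{tasks: make(chan func(), 16), done: make(chan struct{})}
	go func() {
		defer close(l.done)
		for f := range l.tasks {
			f()
		}
	}()
	return l
}

// queueInLoop hands f to the loop goroutine instead of running it directly.
func (l *loop) queueInLoop(f func()) { l.tasks <- f }

// stop closes the queue and waits for pending tasks to finish.
func (l *loop) stop() {
	close(l.tasks)
	<-l.done
}

// conn stands in for a connection; out collects what would go on the wire.
type conn struct {
	loop *loop
	out  strings.Builder
}

// send copies data and queues the write; the caller never writes directly.
func (c *conn) send(data []byte) {
	buf := append([]byte(nil), data...)
	c.loop.queueInLoop(func() { c.out.Write(buf) })
}

// sendAll sends each message through the loop and returns the written bytes.
func sendAll(msgs ...string) string {
	l := newLoop()
	c := &conn{loop: l}
	for _, m := range msgs {
		c.send([]byte(m))
	}
	l.stop()
	return c.out.String()
}

func main() {
	fmt.Println(sendAll("hello ", "from ", "the loop"))
}
```

Because only the loop goroutine touches the output, writes from many callers stay ordered without extra locking around the write itself.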

ShutdownWrite sets the connected status to false and closes the connection.

See the Maximum connections example for details.

func (c *Connection) ShutdownWrite() error

RingBuffer is a circular buffer implementation that expands dynamically.
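
For readers unfamiliar with the idea, here is a minimal growable circular buffer. It is a sketch only: the ring type and its methods are hypothetical and do not reflect the API or growth policy of gev's ringbuffer package (doubling the capacity is an assumption).

```go
package main

import "fmt"

// ring is a minimal growable circular buffer of bytes.
type ring struct {
	buf  []byte
	r    int // index of the next byte to read
	size int // number of unread bytes
}

func newRing(capacity int) *ring { return &ring{buf: make([]byte, capacity)} }

// write appends p, doubling the capacity until p fits.
func (rb *ring) write(p []byte) {
	if len(p) == 0 {
		return
	}
	if need := rb.size + len(p); need > len(rb.buf) {
		newCap := len(rb.buf)
		if newCap == 0 {
			newCap = 1
		}
		for newCap < need {
			newCap *= 2
		}
		grown := make([]byte, newCap)
		rb.copyOut(grown) // linearize unread bytes at the start of the new slice
		rb.buf, rb.r = grown, 0
	}
	w := (rb.r + rb.size) % len(rb.buf) // write position
	n := copy(rb.buf[w:], p)            // fill up to the end of the slice
	copy(rb.buf, p[n:])                 // wrap around for the remainder
	rb.size += len(p)
}

// copyOut copies up to len(dst) unread bytes into dst without consuming them.
func (rb *ring) copyOut(dst []byte) int {
	n := len(dst)
	if n > rb.size {
		n = rb.size
	}
	first := copy(dst[:n], rb.buf[rb.r:])
	copy(dst[first:n], rb.buf)
	return n
}

// read consumes up to len(dst) bytes into dst and returns the count.
func (rb *ring) read(dst []byte) int {
	n := rb.copyOut(dst)
	if n > 0 {
		rb.r = (rb.r + n) % len(rb.buf)
		rb.size -= n
	}
	return n
}

func main() {
	rb := newRing(4)
	rb.write([]byte("abc"))
	rb.read(make([]byte, 2))  // consume "ab"
	rb.write([]byte("de"))    // wraps around the end of the slice
	rb.write([]byte("fghij")) // exceeds the capacity, so the buffer grows
	out := make([]byte, 16)
	n := rb.read(out)
	fmt.Println(string(out[:n]))
}
```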

WebSocket

The WebSocket protocol is built on top of TCP, so gev does not need to build it in. Instead, support is provided as a plugin in the plugins/websocket directory.

code
type Protocol struct {
	upgrade *ws.Upgrader
}

func New(u *ws.Upgrader) *Protocol {
	return &Protocol{upgrade: u}
}

func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
	upgraded := c.Context()
	if upgraded == nil {
		var err error
		out, _, err = p.upgrade.Upgrade(buffer)
		if err != nil {
			log.Println("Websocket Upgrade :", err)
			return
		}
		c.SetContext(true)
	} else {
		header, err := ws.VirtualReadHeader(buffer)
		if err != nil {
			log.Println(err)
			return
		}
		if buffer.VirtualLength() >= int(header.Length) {
			buffer.VirtualFlush()

			payload := make([]byte, int(header.Length))
			_, _ = buffer.Read(payload)

			if header.Masked {
				ws.Cipher(payload, header.Mask, 0)
			}

			ctx = &header
			out = payload
		} else {
			buffer.VirtualRevert()
		}
	}
	return
}

func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
	return data
}

The full implementation is in the plugin. For source code that uses it, see the WebSocket example.
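
The ws.Cipher call in UnPacket handles masked client frames. Assuming it follows the masking rule from RFC 6455 (each payload byte is XORed with the mask byte at index i mod 4), the transform can be sketched as below. unmask is a hypothetical name for this illustration, not the plugin's function; the sample bytes are the masked "Hello" frame payload from the RFC.

```go
package main

import "fmt"

// unmask applies the RFC 6455 masking transform in place: each payload byte
// is XORed with the mask byte at index i mod 4. Applying it twice restores
// the original bytes, so the same function both masks and unmasks.
func unmask(payload []byte, mask [4]byte) {
	for i := range payload {
		payload[i] ^= mask[i%4]
	}
}

func main() {
	mask := [4]byte{0x37, 0xfa, 0x21, 0x3d}
	payload := []byte{0x7f, 0x9f, 0x4d, 0x51, 0x58}
	unmask(payload, mask)
	fmt.Println(string(payload))
}
```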

Example

echo server
package main

import (
	"flag"
	"strconv"

	"github.com/FTwOoO/gev"
	"github.com/FTwOoO/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	//log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	//log.Println("OnMessage")
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	//log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}
Automatically clean up idle connections
package main

import (
	"flag"
	"strconv"
	"time"

	"github.com/FTwOoO/gev"
	"github.com/FTwOoO/gev/connection"
	"github.com/FTwOoO/gev/log"
)

type example struct {
}

func (s *example) OnConnect(c *connection.Connection) {
	log.Info(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Infof("OnMessage from : %s", c.PeerAddr())
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Info("OnClose: ", c.PeerAddr())
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.IdleTime(5*time.Second))
	if err != nil {
		panic(err)
	}

	s.Start()
}
Maximum connections
package main

import (
	"log"

	"github.com/FTwOoO/gev"
	"github.com/FTwOoO/gev/connection"
	"github.com/FTwOoO/gev/toolkit/sync/atomic"
)

// Server example
type Server struct {
	clientNum     atomic.Int64
	maxConnection int64
	server        *gev.Server
}

// New server
func New(ip, port string, maxConnection int64) (*Server, error) {
	var err error
	s := new(Server)
	s.maxConnection = maxConnection
	s.server, err = gev.NewServer(s,
		gev.Address(ip+":"+port))
	if err != nil {
		return nil, err
	}

	return s, nil
}

// Start server
func (s *Server) Start() {
	s.server.Start()
}

// Stop server
func (s *Server) Stop() {
	s.server.Stop()
}

// OnConnect callback
func (s *Server) OnConnect(c *connection.Connection) {
	s.clientNum.Add(1)
	log.Println(" OnConnect : ", c.PeerAddr())

	if s.clientNum.Get() > s.maxConnection {
		_ = c.ShutdownWrite()
		log.Println("Refused connection")
		return
	}
}

// OnMessage callback
func (s *Server) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Println("OnMessage")
	out = data
	return
}

// OnClose callback
func (s *Server) OnClose(c *connection.Connection) {
	s.clientNum.Add(-1)
	log.Println("OnClose")
}

func main() {
	s, err := New("", "1833", 1)
	if err != nil {
		panic(err)
	}
	defer s.Stop()

	s.Start()
}
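
The counting logic in the example above can be isolated into a small helper. The sketch below uses the standard sync/atomic package rather than gev's toolkit, and limiter, acquire, and release are hypothetical names. It is a variation on the example, not a copy: here a refused caller rolls back its own increment, so release is called only for admitted connections.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// limiter caps the number of concurrently admitted connections.
type limiter struct {
	current int64
	max     int64
}

// acquire reserves a slot and reports whether the caller may proceed.
// On refusal the reservation is rolled back immediately.
func (l *limiter) acquire() bool {
	if atomic.AddInt64(&l.current, 1) > l.max {
		atomic.AddInt64(&l.current, -1)
		return false
	}
	return true
}

// release frees a slot that was previously acquired.
func (l *limiter) release() { atomic.AddInt64(&l.current, -1) }

func main() {
	l := &limiter{max: 1}
	fmt.Println(l.acquire()) // first connection is admitted
	fmt.Println(l.acquire()) // second connection is refused
	l.release()
	fmt.Println(l.acquire()) // a slot is free again
}
```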
Server timing push
package main

import (
	"container/list"
	"log"
	"sync"
	"time"

	"github.com/FTwOoO/gev"
	"github.com/FTwOoO/gev/connection"
)

// Server example
type Server struct {
	conn   *list.List
	mu     sync.RWMutex
	server *gev.Server
}

// New server
func New(ip, port string) (*Server, error) {
	var err error
	s := new(Server)
	s.conn = list.New()
	s.server, err = gev.NewServer(s,
		gev.Address(ip+":"+port))
	if err != nil {
		return nil, err
	}

	return s, nil
}

// Start server
func (s *Server) Start() {
	s.server.RunEvery(1*time.Second, s.RunPush)
	s.server.Start()
}

// Stop server
func (s *Server) Stop() {
	s.server.Stop()
}

// RunPush push message
func (s *Server) RunPush() {
	var next *list.Element

	s.mu.RLock()
	defer s.mu.RUnlock()

	for e := s.conn.Front(); e != nil; e = next {
		next = e.Next()

		c := e.Value.(*connection.Connection)
		_ = c.Send([]byte("hello\n"))
	}
}

// OnConnect callback
func (s *Server) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())

	s.mu.Lock()
	e := s.conn.PushBack(c)
	s.mu.Unlock()
	c.SetContext(e)
}

// OnMessage callback
func (s *Server) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Println("OnMessage")
	out = data
	return
}

// OnClose callback
func (s *Server) OnClose(c *connection.Connection) {
	log.Println("OnClose")
	e := c.Context().(*list.Element)

	s.mu.Lock()
	s.conn.Remove(e)
	s.mu.Unlock()
}

func main() {
	s, err := New("", "1833")
	if err != nil {
		panic(err)
	}
	defer s.Stop()

	s.Start()
}
WebSocket
package main

import (
	"flag"
	"log"
	"math/rand"
	"strconv"

	"github.com/FTwOoO/gev"
	"github.com/FTwOoO/gev/connection"
	"github.com/FTwOoO/gev/plugins/websocket/ws"
	"github.com/FTwOoO/gev/plugins/websocket/ws/util"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, data []byte) (messageType ws.MessageType, out []byte) {
	log.Println("OnMessage:", string(data))
	messageType = ws.MessageBinary
	switch rand.Int() % 3 {
	case 0:
		out = data
	case 1:
		msg, err := util.PackData(ws.MessageText, data)
		if err != nil {
			panic(err)
		}
		if err := c.Send(msg); err != nil {
			msg, err := util.PackCloseData(err.Error())
			if err != nil {
				panic(err)
			}
			if e := c.Send(msg); e != nil {
				panic(e)
			}
		}
	case 2:
		msg, err := util.PackCloseData("close")
		if err != nil {
			panic(err)
		}
		if e := c.Send(msg); e != nil {
			panic(e)
		}
	}
	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := NewWebSocketServer(handler, &ws.Upgrader{},
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}
package main

import (
	"github.com/FTwOoO/gev"
	"github.com/FTwOoO/gev/plugins/websocket"
	"github.com/FTwOoO/gev/plugins/websocket/ws"
)

// NewWebSocketServer creates a WebSocket server.
func NewWebSocketServer(handler websocket.WebSocketHandler, u *ws.Upgrader, opts ...gev.Option) (server *gev.Server, err error) {
	opts = append(opts, gev.Protocol(websocket.New(u)))
	return gev.NewServer(websocket.NewHandlerWrap(u, handler), opts...)
}
protobuf
package main

import (
	"flag"
	"log"
	"strconv"

	"github.com/FTwOoO/gev"
	"github.com/FTwOoO/gev/connection"
	pb "github.com/FTwOoO/gev/example/protobuf/proto"
	"github.com/FTwOoO/gev/plugins/protobuf"
	"github.com/golang/protobuf/proto"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	msgType := ctx.(string)

	switch msgType {
	case "msg1":
		msg := &pb.Msg1{}
		if err := proto.Unmarshal(data, msg); err != nil {
			log.Println(err)
		}
		log.Println(msgType, msg)
	case "msg2":
		msg := &pb.Msg2{}
		if err := proto.Unmarshal(data, msg); err != nil {
			log.Println(err)
		}
		log.Println(msgType, msg)
	default:
		log.Println("unknown msg type")
	}

	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.Protocol(&protobuf.Protocol{}))
	if err != nil {
		panic(err)
	}

	log.Println("server start")
	s.Start()
}
package main

import (
	"bufio"
	"fmt"
	"log"
	"math/rand"
	"net"
	"os"

	pb "github.com/FTwOoO/gev/example/protobuf/proto"
	"github.com/FTwOoO/gev/plugins/protobuf"
	"github.com/golang/protobuf/proto"
)

func main() {
	conn, e := net.Dial("tcp", ":1833")
	if e != nil {
		log.Fatal(e)
	}
	defer conn.Close()

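	var buffer []byte
	// Create the reader once; a new bufio.Reader per iteration can drop input
	// that the previous reader had already buffered.
	reader := bufio.NewReader(os.Stdin)
	for {
		fmt.Print("Text to send: ")
		text, readErr := reader.ReadString('\n')
		if readErr != nil {
			// EOF or read failure: text may be empty, so do not slice it.
			log.Fatal(readErr)
		}
		name := text[:len(text)-1]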

		switch rand.Int() % 2 {
		case 0:
			msg := &pb.Msg1{
				Name: name,
				Id:   1,
			}

			data, err := proto.Marshal(msg)
			if err != nil {
				panic(err)
			}
			buffer = protobuf.PackMessage("msg1", data)
		case 1:
			msg := &pb.Msg2{
				Name:  name,
				Alias: "big " + name,
				Id:    2,
			}

			data, err := proto.Marshal(msg)
			if err != nil {
				panic(err)
			}
			buffer = protobuf.PackMessage("msg2", data)
		}

		_, err := conn.Write(buffer)
		if err != nil {
			panic(err)
		}
	}
}

Thanks

Thanks to JetBrains for the free open source license.

Documentation

Constants

This section is empty.

Variables

var ErrConnectionClosed = errors.New("connection closed")

Functions

This section is empty.

Types

type CloseCallback added in v0.2.1

type CloseCallback func(c *Connection)

CloseCallback is the callback invoked when a connection closes.

type Connection added in v0.2.1

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents a TCP connection.

func NewConnection added in v0.2.1

func NewConnection(fd int, loop *EventLoop, sa *unix.Sockaddr, tw *timingwheel.TimingWheel, idleTime time.Duration, readCb ReadCallback, closeCb CloseCallback) *Connection

func (*Connection) Close added in v0.2.1

func (c *Connection) Close() error

Close closes the connection.

func (*Connection) Connected added in v0.2.1

func (c *Connection) Connected() bool

Connected reports whether the connection is established.

func (*Connection) Context added in v0.2.1

func (c *Connection) Context() interface{}

Context returns the connection's Context.

func (*Connection) GetDecoderFlags added in v0.2.8

func (c *Connection) GetDecoderFlags() int

func (*Connection) GetLoop added in v0.2.2

func (c *Connection) GetLoop() *EventLoop

func (*Connection) HandleEvent added in v0.2.1

func (c *Connection) HandleEvent(fd int, events poller.Event)

HandleEvent is for internal use; it is called back by the event loop.

func (*Connection) PeerAddr added in v0.2.1

func (c *Connection) PeerAddr() string

PeerAddr returns the client's address.

func (*Connection) Send added in v0.2.1

func (c *Connection) Send(data []byte)

func (*Connection) SendBuffer added in v0.3.0

func (c *Connection) SendBuffer(buffer *bytes.Buffer)

func (*Connection) SetContext added in v0.2.1

func (c *Connection) SetContext(ctx interface{})

SetContext sets the connection's Context.

func (*Connection) SetDecoderFlags added in v0.2.8

func (c *Connection) SetDecoderFlags(v int)

func (*Connection) ShutdownWrite added in v0.2.1

func (c *Connection) ShutdownWrite() error

ShutdownWrite closes the write side and waits until all data in the receive buffer has been read.

type EventLoop added in v0.2.1

type EventLoop struct {
	// contains filtered or unexported fields
}

func NewLoop added in v0.2.1

func NewLoop() (*EventLoop, error)

func (*EventLoop) AddSocketAndEnableRead added in v0.2.1

func (l *EventLoop) AddSocketAndEnableRead(fd int, s Socket) error

AddSocketAndEnableRead adds a Socket to the event loop and registers it for readable events.

func (*EventLoop) DeleteFd added in v0.2.1

func (l *EventLoop) DeleteFd(fd int)

func (*EventLoop) EnableRead added in v0.2.1

func (l *EventLoop) EnableRead(fd int) error

EnableRead registers only readable events.

func (*EventLoop) EnableReadWrite added in v0.2.1

func (l *EventLoop) EnableReadWrite(fd int) error

EnableReadWrite registers both readable and writable events.

func (*EventLoop) GetBytesBufferPool added in v0.2.1

func (c *EventLoop) GetBytesBufferPool() *bytesbuffer.Pool

For use within a single event loop. Each event loop runs serially, so there are no concurrency issues.

func (*EventLoop) GetBytesPool added in v0.2.1

func (c *EventLoop) GetBytesPool() *pbytes.Pool

For use within a single event loop. Each event loop runs serially, so there are no concurrency issues.

func (*EventLoop) IterSocketsIn added in v0.2.5

func (l *EventLoop) IterSocketsIn(ids []string, f func(s Socket))

func (*EventLoop) PacketBuf added in v0.2.1

func (l *EventLoop) PacketBuf() []byte

PacketBuf is for internal use. It is a temporary buffer that avoids repeatedly taking memory from the pool and returning it.

func (*EventLoop) QueueInLoop added in v0.2.1

func (l *EventLoop) QueueInLoop(f func())

QueueInLoop queues a func to be executed in the event loop.

func (*EventLoop) RunLoop added in v0.2.1

func (l *EventLoop) RunLoop()

RunLoop starts the event loop.

func (*EventLoop) Stop added in v0.2.1

func (l *EventLoop) Stop() error

Stop shuts down the event loop.

type HandleConnFunc added in v0.2.1

type HandleConnFunc func(fd int, sa *unix.Sockaddr)

HandleConnFunc handles a new connection.

type Handler

type Handler interface {
	OnConnect(c *Connection)
	OnMessage(c *Connection, buffer *ringbuffer.RingBuffer)
	OnClose(c *Connection)
}

type Listener added in v0.2.1

type Listener struct {
	// contains filtered or unexported fields
}

Listener listens for TCP connections.

func NewListener added in v0.2.1

func NewListener(network, addr string, reusePort bool, loop *EventLoop, handlerConn HandleConnFunc) (*Listener, error)

func (*Listener) Close added in v0.2.1

func (l *Listener) Close() error

func (*Listener) Fd added in v0.2.1

func (l *Listener) Fd() int

func (*Listener) HandleEvent added in v0.2.1

func (l *Listener) HandleEvent(fd int, events poller.Event)

HandleEvent is for internal use; the event loop calls it back to process events.

type Option

type Option func(*Options)

func Address

func Address(a string) Option

Address sets the server's listen address.

func IdleTime

func IdleTime(t time.Duration) Option

IdleTime sets the maximum idle time (in seconds).

func Network

func Network(n string) Option

Network sets the network type [tcp]; only tcp is supported for now.

func NumLoops

func NumLoops(n int) Option

NumLoops sets the number of work event loops.

func ReusePort

func ReusePort(reusePort bool) Option

ReusePort sets SO_REUSEPORT.

type Options

type Options struct {
	Network   string
	Address   string
	NumLoops  int
	ReusePort bool

	IdleTime time.Duration
	// contains filtered or unexported fields
}

Options holds the server configuration.
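
The Option type above follows the functional options pattern. The sketch below shows how such options are typically defined and applied. The lowercase names and the default values are assumptions for illustration, not gev's source; only the shape func(*Options) comes from the documentation.

```go
package main

import "fmt"

// options and option mirror the documented shape `type Option func(*Options)`.
// The field set and the defaults below are assumptions for illustration.
type options struct {
	network  string
	address  string
	numLoops int
}

type option func(*options)

// address returns an option that overrides the listen address.
func address(a string) option { return func(o *options) { o.address = a } }

// numLoops returns an option that overrides the number of work loops.
func numLoops(n int) option { return func(o *options) { o.numLoops = n } }

// newOptions starts from defaults and applies each option in order, so a
// later option wins over an earlier one that sets the same field.
func newOptions(opts ...option) options {
	o := options{network: "tcp", address: ":1833", numLoops: -1}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := newOptions(address(":9000"), numLoops(4))
	fmt.Printf("%+v\n", o)
}
```

Callers pass only the settings they care about, which is why NewServer can accept any number of options after the handler.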

type ReadCallback added in v0.2.1

type ReadCallback func(c *Connection, buffer *ringbuffer.RingBuffer)

ReadCallback is the callback invoked when data is readable.

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(handler Handler, opts ...Option) (server *Server, err error)

func (*Server) ForEventLoops added in v0.2.3

func (s *Server) ForEventLoops(F func(e *EventLoop))

func (*Server) Options

func (s *Server) Options() Options

func (*Server) Start

func (s *Server) Start()

func (*Server) Stop

func (s *Server) Stop()

type Socket added in v0.2.1

type Socket interface {
	HandleEvent(fd int, events poller.Event)
	Close() error
}

Directories

Path         Synopsis
plugins
toolkit
pool         Package pool contains helpers for pooling structures distinguishable by size.
pool/pbufio  Package pbufio contains tools for pooling bufio.Reader and bufio.Writers.
pool/pbytes  Package pbytes contains tools for pooling byte pool.
