gev

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2019 License: MIT Imports: 10 Imported by: 0

README

gev

Github Actions Go Report Card Codacy Badge GoDoc LICENSE Code Size

[English]

gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库。

特点

  • 基于 epoll 和 kqueue 实现的高性能事件循环
  • 支持多核多线程
  • 动态扩容 Ring Buffer 实现的读写缓冲区
  • 异步读写
  • SO_REUSEPORT 端口重用支持

网络模型

gev 只使用极少的 goroutine:一个 goroutine 负责监听客户端连接,其他 goroutine(work 协程)负责处理已连接客户端的读写事件。work 协程的数量可以配置,默认与运行主机的 CPU 数量相同。

image

性能测试

测试环境 Ubuntu18.04 | 4 Virtual CPUs | 4.0 GiB

吞吐量测试

限制 GOMAXPROCS=1(单线程),1 个 work 协程

image

限制 GOMAXPROCS=4,4 个 work 协程

image

其他测试
速度测试

和同类库的简单性能比较, 压测方式与 evio 项目相同。

  • gnet
  • eviop
  • evio
  • net (标准库)

限制 GOMAXPROCS=1,1 个 work 协程

image

限制 GOMAXPROCS=1,4 个 work 协程

image

限制 GOMAXPROCS=4,4 个 work 协程

image

安装

go get -u github.com/Allenxuxu/gev

快速入门

package main

import (
	"log"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/ringbuffer"
)

// example is the stateless handler type that main passes to gev.NewServer.
type example struct{}

// OnConnect logs the peer address of the connection it is given.
func (s *example) OnConnect(c *connection.Connection) {
	peer := c.PeerAddr()
	log.Println(" OnConnect : ", peer)
}

// OnMessage implements an echo: it returns whatever PeekAll exposes from the
// receive buffer and then calls RetrieveAll on that buffer.
//
// PeekAll yields two slices; the second is appended to the first only when it
// is non-empty (presumably the wrapped-around part of the ring — confirm
// against the ringbuffer package).
func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) {
	log.Println("OnMessage")

	head, tail := buffer.PeekAll()
	// Deferred so RetrieveAll still runs after the reply has been assembled,
	// matching the peek -> append -> retrieve order.
	defer buffer.RetrieveAll()

	if len(tail) == 0 {
		return head
	}
	return append(head, tail...)
}

// OnClose implements the OnClose method of the Handler interface; it only
// writes a log line and does not use the connection.
func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)

	s, err := gev.NewServer(handler,
		gev.Address(":1833"),
		gev.NumLoops(2),
		gev.ReusePort(true))
	if err != nil {
		panic(err)
	}

	s.Start()
}

Handler 是一个接口,我们的程序必须实现它。

// Handler is the interface a program must implement and hand to NewServer.
type Handler interface {
	// OnConnect receives the connection c.
	// NOTE(review): presumably called once per accepted client — confirm
	// against the gev source.
	OnConnect(c *connection.Connection)
	// OnMessage is called back by gev when a message arrives. Returning a
	// slice sends that data to the client.
	OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) []byte
	// OnClose receives the connection c.
	// NOTE(review): presumably called when the connection is closed — confirm
	// against the gev source.
	OnClose(c *connection.Connection)
}

func NewServer(handler Handler, opts ...Option) (server *Server, err error) {

在消息到来时,gev 会回调 OnMessage ,在这个函数中可以通过返回一个切片来发送数据给客户端。

func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte)

Connection 还提供 Send 方法来发送数据。Send 并不会立刻发送数据,而是先添加到 event loop 的任务队列中,然后唤醒 event loop 去发送。

更详细的使用方式可以参考示例:服务端定时推送

func (c *Connection) Send(buffer []byte) error

Connection 的 ShutdownWrite 方法会关闭写端,从而断开连接。

更详细的使用方式可以参考示例:限制最大连接数

func (c *Connection) ShutdownWrite() error

RingBuffer 是一个动态扩容的循环缓冲区实现。

示例

echo server
package main

import (
	"flag"
	"strconv"
	"log"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/ringbuffer"
)

// example is the stateless handler type that main passes to gev.NewServer.
type example struct{}

// OnConnect logs the peer address of the connection it is given.
func (s *example) OnConnect(c *connection.Connection) {
	remote := c.PeerAddr()
	log.Println(" OnConnect : ", remote)
}
// OnMessage echoes the buffered bytes back: the two slices returned by PeekAll
// are joined into the reply (the second only when non-empty), after which
// RetrieveAll is called on the buffer. Per-message logging is intentionally
// left out here.
func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) {
	head, tail := buffer.PeekAll()

	reply := head
	if len(tail) != 0 {
		reply = append(reply, tail...)
	}

	buffer.RetrieveAll()
	return reply
}

// OnClose implements the OnClose method of the Handler interface. It takes
// the connection parameter the interface requires; without it *example does
// not satisfy Handler and cannot be passed to gev.NewServer. The connection
// itself is unused: the method only writes a log line.
func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}
限制最大连接数
package main

import (
	"log"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/ringbuffer"
	"github.com/Allenxuxu/toolkit/sync/atomic"
)

// Server wraps a gev server and refuses clients once more than maxConnection
// are connected.
type Server struct {
	// clientNum is incremented in OnConnect and decremented in OnClose.
	clientNum     atomic.Int64
	// maxConnection is the limit clientNum is compared against in OnConnect.
	maxConnection int64
	// server is the underlying gev server created by New.
	server        *gev.Server
}

// New creates a Server listening on ip:port that registers itself as the gev
// handler and allows at most maxConnection clients. It returns the error from
// gev.NewServer unchanged when construction fails.
func New(ip, port string, maxConnection int64) (*Server, error) {
	s := &Server{maxConnection: maxConnection}

	inner, err := gev.NewServer(s, gev.Address(ip+":"+port))
	if err != nil {
		return nil, err
	}
	s.server = inner

	return s, nil
}

// Start starts the underlying gev server.
func (s *Server) Start() {
	s.server.Start()
}

// Stop stops the underlying gev server.
func (s *Server) Stop() {
	s.server.Stop()
}

// OnConnect counts the new client and logs its peer address. If the count now
// exceeds maxConnection, the connection's write side is shut down and the
// refusal is logged.
func (s *Server) OnConnect(c *connection.Connection) {
	s.clientNum.Add(1)
	log.Println(" OnConnect : ", c.PeerAddr())

	if s.clientNum.Get() <= s.maxConnection {
		return
	}

	// Over the limit. The ShutdownWrite error is deliberately discarded.
	_ = c.ShutdownWrite()
	log.Println("Refused connection")
}
// OnMessage echoes the buffered bytes back: it joins the two slices returned
// by PeekAll (the second only when non-empty) into the reply and then calls
// RetrieveAll on the buffer.
func (s *Server) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) {
	log.Println("OnMessage")

	head, tail := buffer.PeekAll()
	// Deferred so RetrieveAll still runs after the reply has been assembled.
	defer buffer.RetrieveAll()

	if len(tail) == 0 {
		return head
	}
	return append(head, tail...)
}

// OnClose implements the OnClose method of the Handler interface. It takes
// the connection parameter the interface requires; without it *Server does
// not satisfy Handler and cannot be passed to gev.NewServer in New.
//
// It decrements the client counter that OnConnect incremented and writes a
// log line. The connection itself is unused.
func (s *Server) OnClose(c *connection.Connection) {
	s.clientNum.Add(-1)
	log.Println("OnClose")
}

// main runs a connection-limited server on port 1833 that allows a single
// client. Construction failure panics; Stop is deferred so it runs once Start
// returns.
func main() {
	srv, err := New("", "1833", 1)
	if err != nil {
		panic(err)
	}

	defer srv.Stop()
	srv.Start()
}
服务端定时推送
package main

import (
	"container/list"
	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/ringbuffer"
	"log"
	"sync"
	"time"
)

// Server wraps a gev server and keeps a list of its connections so that
// RunPush can send to each of them.
type Server struct {
	// conn holds the connections: OnConnect pushes them, OnClose removes them.
	conn   *list.List
	// mu guards conn.
	mu     sync.RWMutex
	// server is the underlying gev server created by New.
	server *gev.Server
}

// New creates a Server listening on ip:port with an empty connection list and
// registers it as the gev handler. It returns the error from gev.NewServer
// unchanged when construction fails.
func New(ip, port string) (*Server, error) {
	s := &Server{conn: list.New()}

	inner, err := gev.NewServer(s, gev.Address(ip+":"+port))
	if err != nil {
		return nil, err
	}
	s.server = inner

	return s, nil
}

// Start registers RunPush as a task that runs every second and then starts
// the underlying gev server.
func (s *Server) Start() {
	const pushInterval = time.Second

	s.server.RunEvery(pushInterval, s.RunPush)
	s.server.Start()
}

// Stop stops the underlying gev server.
func (s *Server) Stop() {
	s.server.Stop()
}

// RunPush sends "hello\n" to every connection in the list while holding the
// read lock. Errors returned by Send are deliberately discarded.
func (s *Server) RunPush() {
	s.mu.RLock()
	defer s.mu.RUnlock()

	e := s.conn.Front()
	for e != nil {
		// Capture the successor before touching the current element.
		following := e.Next()

		peer := e.Value.(*connection.Connection)
		_ = peer.Send([]byte("hello\n"))

		e = following
	}
}

// OnConnect logs the peer address, appends the connection to the list under
// the write lock, and stores the resulting list element as the connection's
// context so that OnClose can remove it later.
func (s *Server) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())

	s.mu.Lock()
	elem := s.conn.PushBack(c)
	s.mu.Unlock()

	// Set outside the critical section; the lock protects only the list.
	c.SetContext(elem)
}
// OnMessage echoes the buffered bytes back: it joins the two slices returned
// by PeekAll (the second only when non-empty) into the reply and then calls
// RetrieveAll on the buffer.
func (s *Server) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) {
	log.Println("OnMessage")

	head, tail := buffer.PeekAll()

	reply := head
	if len(tail) != 0 {
		reply = append(reply, tail...)
	}

	buffer.RetrieveAll()
	return reply
}

// OnClose logs the event and removes the connection's list element, which
// OnConnect stored as the connection context, under the write lock. The type
// assertion is unchecked and panics if the context is not a *list.Element.
func (s *Server) OnClose(c *connection.Connection) {
	log.Println("OnClose")

	elem := c.Context().(*list.Element)

	s.mu.Lock()
	s.conn.Remove(elem)
	s.mu.Unlock()
}

// main runs the push server on port 1833. Construction failure panics; Stop
// is deferred so it runs once Start returns.
func main() {
	srv, err := New("", "1833")
	if err != nil {
		panic(err)
	}

	defer srv.Stop()
	srv.Start()
}

参考

本项目受 evio 启发,参考 muduo 实现。

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Handler

// Handler is the callback interface registered with a Server through
// NewServer.
type Handler interface {
	// OnConnect receives the connection c.
	// NOTE(review): presumably called once per accepted client — confirm
	// against the gev source.
	OnConnect(c *connection.Connection)
	// OnMessage is called back by gev when a message arrives. Returning a
	// slice sends that data to the client.
	OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) []byte
	// OnClose receives the connection c.
	// NOTE(review): presumably called when the connection is closed — confirm
	// against the gev source.
	OnClose(c *connection.Connection)
}

Handler Server 注册接口

type Option

// Option is a function that modifies an Options value. Address, Network,
// NumLoops and ReusePort each return one, and NewServer accepts any number.
type Option func(*Options)

Option ...

func Address

func Address(a string) Option

Address server 监听地址

func Network

func Network(n string) Option

Network 设置网络类型,暂时只支持 tcp。

func NumLoops

func NumLoops(n int) Option

NumLoops work eventloop 的数量

func ReusePort

func ReusePort(reusePort bool) Option

ReusePort 设置 SO_REUSEPORT

type Options

// Options holds the server configuration.
type Options struct {
	// Network is the network type; only "tcp" is supported for now.
	Network   string
	// Address is the address the server listens on.
	Address   string
	// NumLoops is the number of work event loops.
	NumLoops  int
	// ReusePort controls whether SO_REUSEPORT is set.
	ReusePort bool
	// contains filtered or unexported fields
}

Options 服务配置

type Server

// Server is the gev server. Create one with NewServer.
type Server struct {
	// contains filtered or unexported fields
}

Server gev Server

func NewServer

func NewServer(handler Handler, opts ...Option) (server *Server, err error)

NewServer 创建 Server

func (*Server) RunAfter

func (s *Server) RunAfter(d time.Duration, f func()) *timingwheel.Timer

RunAfter 延时任务

Example
handler := new(example)

// Build a TCP server on port 1833 with eight event loops and SO_REUSEPORT.
s, err := NewServer(handler,
	Network("tcp"),
	Address(":1833"),
	NumLoops(8),
	ReusePort(true))
if err != nil {
	panic(err)
}

// Run the server on its own goroutine so the example can continue, and stop
// it when the example returns.
go s.Start()
defer s.Stop()

// Schedule a delayed task to run after one second.
s.RunAfter(time.Second, func() {
	fmt.Println("RunAfter")
})

// Wait longer than the delay so the task has run before the deferred Stop.
time.Sleep(2500 * time.Millisecond)
Output:

RunAfter

func (*Server) RunEvery

func (s *Server) RunEvery(d time.Duration, f func()) *timingwheel.Timer

RunEvery 定时任务

Example
handler := new(example)

// Build a TCP server on port 1833 with eight event loops and SO_REUSEPORT.
s, err := NewServer(handler,
	Network("tcp"),
	Address(":1833"),
	NumLoops(8),
	ReusePort(true))
if err != nil {
	panic(err)
}

// Run the server on its own goroutine so the example can continue, and stop
// it when the example returns.
go s.Start()
defer s.Stop()

// Schedule a periodic task with a one-second interval.
t := s.RunEvery(time.Second, func() {
	fmt.Println("EveryFunc")
})

// 4.5 seconds covers the four firings listed in the expected output.
time.Sleep(4500 * time.Millisecond)
// Stop the timer, then wait again: the expected output has no further lines.
t.Stop()
time.Sleep(4500 * time.Millisecond)
Output:

EveryFunc
EveryFunc
EveryFunc
EveryFunc

func (*Server) Start

func (s *Server) Start()

Start 启动 Server

func (*Server) Stop

func (s *Server) Stop()

Stop 关闭 Server

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL