gaio

package module
v1.2.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2021 License: MIT Imports: 13 Imported by: 7

README

gaio

GoDoc MIT licensed Build Status Go Report Card Coverage Statusd

gaio

Introduction

中文介绍

For a typical golang network program, you would first conn := lis.Accept() to get a connection and go func(net.Conn) to start a goroutine for handling the incoming data, then you would buf:=make([]byte, 4096) to allocate some buffer and finally waits on conn.Read(buf).

For a server holding >10K connections with frequent short messages(e.g. < 512B), cost for context switching is much more expensive than receiving message(a context switch needs at least 1000 CPU cycles or 600ns on 2.1GHz).

And by eliminating one goroutine per one connection scheme with Edge-Triggered IO Multiplexing, the 2KB(R)+2KB(W) per connection goroutine stack can be saved. By using internal swap buffer, buf:=make([]byte, 4096) can be saved(at the cost of performance).

gaio is an proactor pattern networking library satisfy both memory constraints and performance goals.

Features

  1. Tested in High Frequency Trading for handling HTTP requests for 30K~40K RPS on a single HVM server.
  2. Designed for >C10K concurrent connections, maximized parallelism, and nice single connection throughput.
  3. Read(ctx, conn, buffer) can be called with nil buffer to make use of internal swap buffer.
  4. Non-intrusive design, this library works with net.Listener and net.Conn. (with syscall.RawConn support), easy to be integrated into your existing software.
  5. Amortized context switching cost for tiny messages, able to handle frequent chat message exchanging.
  6. Application can decide when to delegate net.Conn to gaio, for example, you can delegate net.Conn to gaio after some handshaking procedure, or having some net.TCPConn settings done.
  7. Application can decide when to submit read or write requests, per-connection back-pressure can be propagated to peer to slow down sending. This features is particular useful to transmit data from A to B via gaio, which B is slower than A.
  8. Tiny, around 1000 LOC, easy to debug.
  9. Support for Linux, BSD.

Conventions

  1. Once you submit an async read/write requests with related net.Conn to gaio.Watcher, this conn will be delegated to gaio.Watcher at first submit. Future use of this conn like conn.Read or conn.Write will return error, but TCP properties set by SetReadBuffer(), SetWriteBuffer(), SetLinger(), SetKeepAlive(), SetNoDelay() will be inherited.
  2. If you decide not to use this connection anymore, you could call Watcher.Free(net.Conn) to close socket and free related resources immediately.
  3. If you forget to call Watcher.Free(net.Conn), runtime garbage collector will cleanup related system resources if nowhere in the system holds the net.Conn.
  4. If you forget to call Watcher.Close(), runtime garbage collector will cleanup ALL related system resources if nowhere in the system holds this Watcher.
  5. For connection Load-Balance, you can create multiple gaio.Watcher with your own strategy to distribute net.Conn.
  6. For acceptor Load-Balance, you can use go-reuseport as the listener.
  7. For read requests submitted with 'nil' buffer, the returning []byte from Watcher.WaitIO() is SAFE to use before next call to Watcher.WaitIO() returned.

TL;DR

package main

import (
        "log"
        "net"

        "github.com/xtaci/gaio"
)

// this goroutine will wait for all io events, and sents back everything it received
// in async way
func echoServer(w *gaio.Watcher) {
        for {
                // loop wait for any IO events
                results, err := w.WaitIO()
                if err != nil {
                        log.Println(err)
                        return
                }

                for _, res := range results {
                        switch res.Operation {
                        case gaio.OpRead: // read completion event
                                if res.Error == nil {
                                        // send back everything, we won't start to read again until write completes.
                                        // submit an async write request
                                        w.Write(nil, res.Conn, res.Buffer[:res.Size])
                                }
                        case gaio.OpWrite: // write completion event
                                if res.Error == nil {
                                        // since write has completed, let's start read on this conn again
                                        w.Read(nil, res.Conn, res.Buffer[:cap(res.Buffer)])
                                }
                        }
                }
        }
}

func main() {
        w, err := gaio.NewWatcher()
        if err != nil {
              log.Fatal(err)
        }
        defer w.Close()
	
        go echoServer(w)

        ln, err := net.Listen("tcp", "localhost:0")
        if err != nil {
                log.Fatal(err)
        }
        log.Println("echo server listening on", ln.Addr())

        for {
                conn, err := ln.Accept()
                if err != nil {
                        log.Println(err)
                        return
                }
                log.Println("new client", conn.RemoteAddr())

                // submit the first async read IO request
                err = w.Read(nil, conn, make([]byte, 128))
                if err != nil {
                        log.Println(err)
                        return
                }
        }
}

More examples
Push server package main
package main

import (
        "fmt"
        "log"
        "net"
        "time"

        "github.com/xtaci/gaio"
)

func main() {
        // by simply replace net.Listen with reuseport.Listen, everything is the same as in push-server
        // ln, err := reuseport.Listen("tcp", "localhost:0")
        ln, err := net.Listen("tcp", "localhost:0")
        if err != nil {
                log.Fatal(err)
        }

        log.Println("pushing server listening on", ln.Addr(), ", use telnet to receive push")

        // create a watcher
        w, err := gaio.NewWatcher()
        if err != nil {
                log.Fatal(err)
        }

        // channel
        ticker := time.NewTicker(time.Second)
        chConn := make(chan net.Conn)
        chIO := make(chan gaio.OpResult)

        // watcher.WaitIO goroutine
        go func() {
                for {
                        results, err := w.WaitIO()
                        if err != nil {
                                log.Println(err)
                                return
                        }

                        for _, res := range results {
                                chIO <- res
                        }
                }
        }()

        // main logic loop, like your program core loop.
        go func() {
                var conns []net.Conn
                for {
                        select {
                        case res := <-chIO: // receive IO events from watcher
                                if res.Error != nil {
                                        continue
                                }
                                conns = append(conns, res.Conn)
                        case t := <-ticker.C: // receive ticker events
                                push := []byte(fmt.Sprintf("%s\n", t))
                                // all conn will receive the same 'push' content
                                for _, conn := range conns {
                                        w.Write(nil, conn, push)
                                }
                                conns = nil
                        case conn := <-chConn: // receive new connection events
                                conns = append(conns, conn)
                        }
                }
        }()

        // this loop keeps on accepting connections and send to main loop
        for {
                conn, err := ln.Accept()
                if err != nil {
                        log.Println(err)
                        return
                }
                chConn <- conn
        }
}

Documentation

For complete documentation, see the associated Godoc.

Benchmarks

Test Case Throughput test with 64KB buffer
Description A client keep on sending 64KB bytes to server, server keeps on reading and sending back whatever it received, the client keeps on receiving whatever the server sent back until all bytes received successfully
Command go test -v -run=^$ -bench Echo
Macbook Pro 1695.27 MB/s 518 B/op 4 allocs/op
Linux AMD64 1883.23 MB/s 518 B/op 4 allocs/op
Raspberry Pi4 354.59 MB/s 334 B/op 4 allocs/op
Test Case 8K concurrent connection echo test
Description Start 8192 clients, each client send 1KB data to server, server keeps on reading and sending back whatever it received, the client keeps on receiving whatever the server sent back until all bytes received successfully.
Command go test -v -run=8k
Macbook Pro 1.09s
Linux AMD64 0.94s
Raspberry Pi4 2.09s
Regression

regression

X -> number of concurrent connections, Y -> time of completion in seconds

Best-fit values	 
Slope	8.613e-005 ± 5.272e-006
Y-intercept	0.08278 ± 0.03998
X-intercept	-961.1
1/Slope	11610
 
95% Confidence Intervals	 
Slope	7.150e-005 to 0.0001008
Y-intercept	-0.02820 to 0.1938
X-intercept	-2642 to 287.1
 
Goodness of Fit	 
R square	0.9852
Sy.x	0.05421
 
Is slope significantly non-zero?	 
F	266.9
DFn,DFd	1,4
P Value	< 0.0001
Deviation from horizontal?	Significant
 
Data	 
Number of XY pairs	6
Equation	Y = 8.613e-005*X + 0.08278

License

gaio source code is available under the MIT License.

References

Status

Stable

Documentation

Overview

Package gaio is an Async-IO library for Golang.

gaio acts in proactor mode, https://en.wikipedia.org/wiki/Proactor_pattern. User submit async IO operations and waits for IO-completion signal.

Index

Constants

View Source
const (
	EV_READ  = 0x1
	EV_WRITE = 0x2
)

Variables

View Source
var (
	// ErrUnsupported means the watcher cannot support this type of connection
	ErrUnsupported = errors.New("unsupported connection type")
	// ErrNoRawConn means the connection has not implemented SyscallConn
	ErrNoRawConn = errors.New("net.Conn does implement net.RawConn")
	// ErrWatcherClosed means the watcher is closed
	ErrWatcherClosed = errors.New("watcher closed")
	// ErrPollerClosed suggest that poller has closed
	ErrPollerClosed = errors.New("poller closed")
	// ErrConnClosed means the user called Free() on related connection
	ErrConnClosed = errors.New("connection closed")
	// ErrDeadline means the specific operation has exceeded deadline before completion
	ErrDeadline = errors.New("operation exceeded deadline")
	// ErrEmptyBuffer means the buffer is nil
	ErrEmptyBuffer = errors.New("empty buffer")
	// ErrCPUID indicates the given cpuid is invalid
	ErrCPUID = errors.New("no such core")
)

Functions

This section is empty.

Types

type OpResult

type OpResult struct {
	// Operation Type
	Operation OpType
	// User context associated with this requests
	Context interface{}
	// Related net.Conn to this result
	Conn net.Conn
	// Buffer points to user's supplied buffer or watcher's internal swap buffer
	Buffer []byte
	// IsSwapBuffer marks true if the buffer internal one
	IsSwapBuffer bool
	// Number of bytes sent or received, Buffer[:Size] is the content sent or received.
	Size int
	// IO error,timeout error
	Error error
}

OpResult is the result of an aysnc-io

type OpType added in v1.0.5

type OpType int

OpType defines Operation Type

const (
	// OpRead means the aiocb is a read operation
	OpRead OpType = iota
	// OpWrite means the aiocb is a write operation
	OpWrite
)

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher will monitor events and process async-io request(s),

func NewWatcher added in v1.0.11

func NewWatcher() (*Watcher, error)

NewWatcher creates a management object for monitoring file descriptors with default internal buffer size - 64KB

func NewWatcherSize added in v1.1.7

func NewWatcherSize(bufsize int) (*Watcher, error)

NewWatcherSize creates a management object for monitoring file descriptors. 'bufsize' sets the internal swap buffer size for Read() with nil, 2 slices with'bufsize' will be allocated for performance.

func (Watcher) Close

func (w Watcher) Close() (err error)

Close stops monitoring on events for all connections

func (Watcher) Free added in v1.1.0

func (w Watcher) Free(conn net.Conn) error

Free let the watcher to release resources related to this conn immediately, like socket file descriptors.

func (Watcher) Read

func (w Watcher) Read(ctx interface{}, conn net.Conn, buf []byte) error

Read submits an async read request on 'fd' with context 'ctx', using buffer 'buf'. 'buf' can be set to nil to use internal buffer. 'ctx' is the user-defined value passed through the gaio watcher unchanged.

func (Watcher) ReadFull added in v1.2.7

func (w Watcher) ReadFull(ctx interface{}, conn net.Conn, buf []byte, deadline time.Time) error

ReadFull submits an async read request on 'fd' with context 'ctx', using buffer 'buf', and expects to fill the buffer before 'deadline'. 'ctx' is the user-defined value passed through the gaio watcher unchanged. 'buf' can't be nil in ReadFull.

func (Watcher) ReadTimeout added in v1.0.6

func (w Watcher) ReadTimeout(ctx interface{}, conn net.Conn, buf []byte, deadline time.Time) error

ReadTimeout submits an async read request on 'fd' with context 'ctx', using buffer 'buf', and expects to read some bytes into the buffer before 'deadline'. 'ctx' is the user-defined value passed through the gaio watcher unchanged.

func (Watcher) SetLoopAffinity added in v1.2.10

func (w Watcher) SetLoopAffinity(cpuid int) (err error)

Set Loop Affinity for syscall.Read/syscall.Write

func (Watcher) SetPollerAffinity added in v1.2.10

func (w Watcher) SetPollerAffinity(cpuid int) (err error)

Set Poller Affinity for Epoll/Kqueue

func (Watcher) WaitIO added in v1.0.5

func (w Watcher) WaitIO() (r []OpResult, err error)

WaitIO blocks until any read/write completion, or error. An internal 'buf' returned or 'r []OpResult' are safe to use BEFORE next call to WaitIO().

func (Watcher) Write

func (w Watcher) Write(ctx interface{}, conn net.Conn, buf []byte) error

Write submits an async write request on 'fd' with context 'ctx', using buffer 'buf'. 'ctx' is the user-defined value passed through the gaio watcher unchanged.

func (Watcher) WriteTimeout added in v1.0.6

func (w Watcher) WriteTimeout(ctx interface{}, conn net.Conn, buf []byte, deadline time.Time) error

WriteTimeout submits an async write request on 'fd' with context 'ctx', using buffer 'buf', and expects to complete writing the buffer before 'deadline', 'buf' can be set to nil to use internal buffer. 'ctx' is the user-defined value passed through the gaio watcher unchanged.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL