iouring

package module
Version: v0.0.0-...-1de0d3d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2021 License: MIT Imports: 13 Imported by: 0

README

What is io_uring

io_uring

io_uring-wahtsnew

LWN io_uring

Lord of the io_uring

【译】高性能异步 IO —— io_uring (Effecient IO with io_uring)

Go 与异步 IO - io_uring 的思考

Features

  • register a file set for io_uring instance
  • support file IO
  • support socket IO
  • support IO timeout
  • link request
  • set timer
  • add request extra info, could get it from the result
  • set logger
  • register buffers and IO with buffers
  • support SQPoll

OS Requirements

  • Linux Kernel >= 5.6

Installation

go get github.com/iceber/iouring-go

doc

Quickstart

package main

import (
        "fmt"
        "os"

        "github.com/iceber/iouring-go"
)

var str = "io with iouring"

func main() {
        iour, err := iouring.New(1)
        if err != nil {
                panic(fmt.Sprintf("new IOURing error: %v", err))
        }
        defer iour.Close()

        file, err := os.Create("./tmp.txt")
        if err != nil {
                panic(err)
        }

        ch := make(chan iouring.Result, 1)

        prepRequest := iouring.Write(int(file.Fd()), []byte(str))
        if _, err := iour.SubmitRequest(prepRequest, ch); err != nil {
                panic(err)
        }

        result := <-ch
        i, err := result.ReturnInt()
        if err != nil {
                fmt.Println("write error: ", err)
                return
        }

        fmt.Printf("write byte: %d\n", i)
}

Request With Extra Info

prepRequest := iouring.Write(int(file.Fd()), []byte(str)).WithInfo(file.Name())

request, err := iour.SubmitRequest(prepRequest, nil)
if err != nil {
    panic(err)
}

<- request.Done()
info, ok := request.GetRequestInfo().(string)

Cancel Request

prepR := iouring.Timeout(5 * time.Second)
request, err := iour.SubmitRequest(prepR, nil)
if err != nil {
    panic(err)
}

if _, err := request.Cancel(); err != nil{
    fmt.Printf("cancel request error: %v\n", err)
    return
}

<- request.Done()
if err := request.Err(); err != nil{
    if err == iouring.ErrRequestCanceled{
        fmt.Println("request is canceled"0
        return
    }
    fmt.Printf("request error: %v\n", err)
    return
}

Submit multitude request

var offset uint64
buf1 := make([]byte, 1024)
prep1:= iouring.Pread(fd, buf1, offset)

offset += 1024
buf2 := make([]byte, 1024)
prep2:= iouring.Pread(fd, buf1, offset)

requests, err := iour.SubmitRequests([]iouring.PrepRequest{prep1,prep2}, nil)
if err != nil{
    panic(err)
}
<- requests.Done()
fmt.Println("requests are completed")

requests is concurrent execution

var offset uint64
buf := make([]byte, 1024)
prep1 := iouring.Pread(fd, buf1, offset)
prep2 := iouring.Write(int(os.Stdout.Fd()), buf)

iour.SubmitLinkRequests([]iouring.PrepRequest{prep1, prep2}, nil)

Examples

cat

concurrent-cat

cp

request-with-timeout

link-request

link-with-timeout

timer

echo

echo-with-callback

TODO

  • add tests
  • arguments type (eg. int and int32)
  • set logger

Documentation

Index

Constants

View Source
const (
	OpNop uint8 = iota
	OpReadv
	OpWritev
	OpFsync
	OpReadFixed
	OpWriteFixed
	OpPollAdd
	OpPollRemove
	OpSyncFileRange
	OpSendmsg
	OpRecvmsg
	OpTimeout
	OpTimeoutRemove
	OpAccept
	OpAsyncCancel
	OpLinkTimeout
	OpConnect
	OpFallocate
	OpOpenat
	OpClose
	OpFilesUpdate
	OpStatx
	OpRead
	OpWrite
	OpFadvise
	OpMadvise
	OpSend
	OpRecv
	OpOpenat2
	OpEpollCtl
	OpSplice
	OpProvideBuffers
	OpRemoveBuffers
	OpTee
	OpShutdown
)

iouring operations

View Source
const (
	RequestCanceledSuccessfully = 0
	RequestMaybeCanceled        = 1
)

cancel operation return value

View Source
const (
	TimeoutExpiration = 0
	CountCompletion   = 1
)

timeout operation return value

Variables

View Source
var (
	ErrIOURingClosed = errors.New("iouring closed")

	ErrRequestCanceled     = errors.New("request is canceled")
	ErrRequestNotFound     = errors.New("request is not found")
	ErrRequestCompleted    = errors.New("request has already been completed")
	ErrRequestNotCompleted = errors.New("request is not completed")
	ErrNoRequestCallback   = errors.New("no request callback")

	ErrUnregisteredFile = errors.New("file is unregistered")
)

Functions

This section is empty.

Types

type CompletionQueue

type CompletionQueue struct {
	// contains filtered or unexported fields
}

type FileRegister

type FileRegister interface {
	GetFileIndex(fd int32) (int, bool)
	RegisterFile(fd int32) error
	RegisterFiles(fds []int32) error
	UnregisterFile(fd int32) error
	UnregisterFiles(fds []int32) error
}

type IOURing

type IOURing struct {
	Flags    uint32
	Features uint32
	// contains filtered or unexported fields
}

IOURing contains iouring_syscall submission and completion queue. It's safe for concurrent use by multiple goroutines.

func New

func New(entries uint, opts ...IOURingOption) (*IOURing, error)

New return a IOURing instance by IOURingOptions

func (*IOURing) Close

func (iour *IOURing) Close() error

Close IOURing

func (*IOURing) FileRegister

func (iour *IOURing) FileRegister() FileRegister

func (*IOURing) GetFixedFileIndex

func (iour *IOURing) GetFixedFileIndex(file *os.File) (int, bool)

func (*IOURing) IsClosed

func (iour *IOURing) IsClosed() (closed bool)

IsClosed IOURing is closed

func (*IOURing) Pread

func (iour *IOURing) Pread(file *os.File, b []byte, offset uint64, ch chan<- Result) (Request, error)

func (*IOURing) Pwrite

func (iour *IOURing) Pwrite(file *os.File, b []byte, offset uint64, ch chan<- Result) (Request, error)

func (*IOURing) Read

func (iour *IOURing) Read(file *os.File, b []byte, ch chan<- Result) (Request, error)

func (*IOURing) RegisterBuffers

func (iour *IOURing) RegisterBuffers(bs [][]byte) error

func (*IOURing) RegisterFile

func (iour *IOURing) RegisterFile(file *os.File) error

func (*IOURing) RegisterFiles

func (iour *IOURing) RegisterFiles(files []*os.File) error

func (*IOURing) Size

func (iour *IOURing) Size() int

Size iouring submission queue size

func (*IOURing) SubmitHardLinkRequests

func (iour *IOURing) SubmitHardLinkRequests(requests []PrepRequest, ch chan<- Result) (RequestSet, error)

func (*IOURing) SubmitLinkRequests

func (iour *IOURing) SubmitLinkRequests(requests []PrepRequest, ch chan<- Result) (RequestSet, error)

func (*IOURing) SubmitRequest

func (iour *IOURing) SubmitRequest(request PrepRequest, ch chan<- Result) (Request, error)

SubmitRequest by Request function and io result is notified via channel return request id, can be used to cancel a request

func (*IOURing) SubmitRequests

func (iour *IOURing) SubmitRequests(requests []PrepRequest, ch chan<- Result) (RequestSet, error)

SubmitRequests by Request functions and io results are notified via channel

func (*IOURing) UnRegisterBuffers

func (iour *IOURing) UnRegisterBuffers() error

func (*IOURing) UnregisterFile

func (iour *IOURing) UnregisterFile(file *os.File) error

func (*IOURing) UnregisterFiles

func (iour *IOURing) UnregisterFiles(files []*os.File) error

func (*IOURing) Write

func (iour *IOURing) Write(file *os.File, b []byte, ch chan<- Result) (Request, error)

type IOURingOption

type IOURingOption func(*IOURing)

func WithAsync

func WithAsync() IOURingOption

func WithAttachWQ

func WithAttachWQ(iour *IOURing) IOURingOption

WithAttachWQ new iouring instance being create will share the asynchronous worker thread backend of the specified io_uring ring, rather than create a new separate thread pool

func WithCQSize

func WithCQSize(size uint32) IOURingOption

WithCQSize create the completion queue with size entries size must bue greater than entries

func WithDisableRing

func WithDisableRing() IOURingOption

WithDisableRing the io_uring ring starts in a disabled state In this state, restrictions can be registered, but submissions are not allowed Available since 5.10

func WithDrain

func WithDrain() IOURingOption

WithDrain every SQE will not be started before previously submitted SQEs have completed

func WithParams

func WithParams(params *iouring_syscall.IOURingParams) IOURingOption

WithParams use params

func WithSQPoll

func WithSQPoll() IOURingOption

WithSQPoll a kernel thread is created to perform submission queue polling In Version 5.10 and later, allow using this as non-root, if the user has the CAP_SYS_NICE capability

func WithSQPollThreadCPU

func WithSQPollThreadCPU(cpu uint32) IOURingOption

WithSQPollThreadCPU the poll thread will be bound to the cpu set, only meaningful when WithSQPoll option

func WithSQPollThreadIdle

func WithSQPollThreadIdle(idle time.Duration) IOURingOption

type PrepRequest

type PrepRequest func(sqe *iouring_syscall.SubmissionQueueEntry, userData *UserData)

func Accept

func Accept(sockfd int) PrepRequest

func Accept4

func Accept4(sockfd int, flags int) PrepRequest

func Close

func Close(fd int) PrepRequest

func Connect

func Connect(sockfd int, sa syscall.Sockaddr) (PrepRequest, error)

func CountCompletionEvent

func CountCompletionEvent(n uint64) PrepRequest

func EpollCtl

func EpollCtl(epfd int, op int, fd int, event *syscall.EpollEvent) PrepRequest

func Fallocate

func Fallocate(fd int, mode uint32, off int64, length int64) PrepRequest

func Fdatasync

func Fdatasync(fd int) PrepRequest

func Fsync

func Fsync(fd int) PrepRequest

func Madvise

func Madvise(b []byte, advice int) PrepRequest

func Nop

func Nop() PrepRequest

func Openat

func Openat(dirfd int, path string, flags uint32, mode uint32) (PrepRequest, error)

func Openat2

func Openat2(dirfd int, path string, how *unix.OpenHow) (PrepRequest, error)

func Pread

func Pread(fd int, b []byte, offset uint64) PrepRequest

func Preadv

func Preadv(fd int, bs [][]byte, offset uint64) PrepRequest

func Pwrite

func Pwrite(fd int, b []byte, offset uint64) PrepRequest

func Pwritev

func Pwritev(fd int, bs [][]byte, offset int64) PrepRequest

func Read

func Read(fd int, b []byte) PrepRequest

func Readv

func Readv(fd int, bs [][]byte) PrepRequest

func Recv

func Recv(sockfd int, b []byte, flags int) PrepRequest

func Recvmsg

func Recvmsg(sockfd int, p, oob []byte, to syscall.Sockaddr, flags int) (PrepRequest, error)

func RemoveTimeout

func RemoveTimeout(id uint64) PrepRequest

func Send

func Send(sockfd int, b []byte, flags int) PrepRequest

func Sendmsg

func Sendmsg(sockfd int, p, oob []byte, to syscall.Sockaddr, flags int) (PrepRequest, error)

func Statx

func Statx(dirfd int, path string, flags uint32, mask int, stat *unix.Statx_t) (PrepRequest, error)

func Timeout

func Timeout(t time.Duration) PrepRequest

func TimeoutWithTime

func TimeoutWithTime(t time.Time) (PrepRequest, error)

func Write

func Write(fd int, b []byte) PrepRequest

func Writev

func Writev(fd int, bs [][]byte) PrepRequest

func (PrepRequest) WithCallback

func (prepReq PrepRequest) WithCallback(callback RequestCallback) PrepRequest

func (PrepRequest) WithDrain

func (prepReq PrepRequest) WithDrain() PrepRequest

func (PrepRequest) WithInfo

func (prepReq PrepRequest) WithInfo(info interface{}) PrepRequest

WithInfo request with extra info

func (PrepRequest) WithTimeout

func (prepReq PrepRequest) WithTimeout(timeout time.Duration) []PrepRequest

type Request

type Request interface {
	Result

	Cancel() (Request, error)
	Done() <-chan struct{}

	GetRes() (int, error)
	// Can Only be used in ResultResolver
	SetResult(r0, r1 interface{}, err error) error
}

type RequestCallback

type RequestCallback func(result Result) error

type RequestSet

type RequestSet interface {
	Len() int
	Done() <-chan struct{}
	Requests() []Request
	ErrResults() []Result
}

type Result

type Result interface {
	Fd() int
	Opcode() uint8
	GetRequestBuffer() (b0, b1 []byte)
	GetRequestBuffers() [][]byte
	GetRequestInfo() interface{}
	FreeRequestBuffer()

	Err() error
	ReturnValue0() interface{}
	ReturnValue1() interface{}
	ReturnFd() (int, error)
	ReturnInt() (int, error)

	Callback() error
}

type ResultResolver

type ResultResolver func(req Request)

type SubmissionQueue

type SubmissionQueue struct {
	// contains filtered or unexported fields
}

type UserData

type UserData struct {
	// contains filtered or unexported fields
}

func (*UserData) Hold

func (data *UserData) Hold(vars ...interface{})

func (*UserData) SetRequestBuffer

func (data *UserData) SetRequestBuffer(b0, b1 []byte)

func (*UserData) SetRequestBuffers

func (data *UserData) SetRequestBuffers(bs [][]byte)

func (*UserData) SetRequestCallback

func (data *UserData) SetRequestCallback(callback RequestCallback)

func (*UserData) SetRequestInfo

func (data *UserData) SetRequestInfo(info interface{})

func (*UserData) SetResultResolver

func (data *UserData) SetResultResolver(resolver ResultResolver)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL