rpcx

package module
v1.1.3-beta.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2019 License: MIT Imports: 12 Imported by: 0

README

rpcx

Easy to use and developer friendly RPC library

go report license Maintenance PRs Welcome Ask Me Anything !

Difference Between Other RPC Libraries

  • Once a Callable is established, both the local and remote sides operate in full-duplex mode.
  • No more client or server roles: Callables can call, and be called by, each other.
  • Lots of middlewares, and developers can write their own custom middleware!
  • It works without any proto files; just define functions as usual.
  • Based on asynchronous I/O (liblpc) while providing synchronous call semantics.
  • High performance: event-driven, msgpack (for serialization), context reuse, goroutine pool...

Usage

import "github.com/gen-iot/rpcx"

Overview

  • Developer friendly
  • Middlewares: dump,proxy,recover,validate...

Getting Started

RPC Function Signatures

Remember: the ctx parameter and the err return value are always required.

  1. Both in and out
func Function(ctx rpcx.Context, in InType)(out OutType, err error)
  2. Only err
func Function(ctx rpcx.Context)(err error)
  3. out and err
func Function(ctx rpcx.Context) (out OutType,err error)
  4. in and err
func Function(ctx rpcx.Context, in InType)(err error)
Create RPC
rpc, err := rpcx.New()
Close RPC
err := rpc.Close()
Register RPC Function
rpc, err := rpcx.New()
std.AssertError(err, "new rpc")
rpc.RegFuncWithName(
    "hello", // manual specified func name
    func(ctx rpcx.Context, msg string) (string, error) {
        return "hello from server", nil
    })
Connect To RPC
sockAddr, err := liblpc.ResolveTcpAddr("127.0.0.1")
std.AssertError(err,"resolve addr")
callable, err := rpc.NewClientCallable(sockAddr, nil)
std.AssertError(err, "new client callable")
callable.Start()
Add Exist Conn To RPC
// `fd` must be a valid file descriptor
callable := rpc.NewConnCallable(fd, nil)
callable.Start()
Invoke RPC Functions

Suppose there is a remote function:

func hello(ctx rpcx.Context)(string,error)

Now call it

out := new(string)
err := callable.Call3(time.Second*5, "hello", out)
std.AssertError(err, "call 'hello'")
fmt.Println("result:",*out)

Callable Calls

All Call variants (Call and Call0–Call6) support a timeout and middlewares.

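| Function | Header | In  | Out |
|----------|--------|-----|-----|
| Call     | no     | no  | no  |
| Call0    | yes    | no  | no  |
| Call1    | no     | yes | no  |
| Call2    | yes    | yes | no  |
| Call3    | no     | no  | yes |
| Call4    | yes    | no  | yes |
| Call5    | no     | yes | yes |
| Call6    | yes    | yes | yes |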

Middleware

Middlewares work like AOP (aspect-oriented programming), as known from Java.

📌Import middlewares before use

import "github.com/gen-iot/rpcx/middleware"
RPC Core

📌Middlewares applied to the RPC core affect the whole RPC context.

rpc, err := rpcx.New()
std.AssertError(err, "new rpc")
rpc.Use(
    middleware.Recover(true), // recover 
    middleware.ValidateStruct( // validator
        middleware.ValidateInOut,
        std.NewValidator(std.LANG_EN)),
)
RPC Functions
rpc, err := rpcx.New()
std.AssertError(err, "new rpc")
rpc.RegFuncWithName("hello",
    func(ctx rpcx.Context, msg string) (string, error) {
        return "hello from server", nil
    },
    middleware.LoginRequred(),  // require logined
    middleware.MustAdmin(),       // require admin role
)

Callable
err := callable.Call(time.Second*5, "hello",
    middleware.Recover(true), // recover 
)

More Examples

try examples

License

Released under the MIT License

Documentation

Index

Constants

View Source
const RpcLoopDefaultBufferSize = 1024 * 1024 * 4

Variables

View Source
var Debug = true

noinspection GoUnusedGlobalVariable

View Source
var ErrNeedMore = errors.New("codec want read more bytes")

Functions

This section is empty.

Types

type Callable

type Callable interface {
	liblpc.BucketEntry
	liblpc.UserDataStorage

	Start()

	Call(timeout time.Duration, name string, mids ...MiddlewareFunc) error
	Call0(timeout time.Duration, name string, headers RpcMsgHeader, mids ...MiddlewareFunc) (ackHeader RpcMsgHeader, err error)

	Call1(timeout time.Duration, name string, in interface{}, mids ...MiddlewareFunc) error
	Call2(timeout time.Duration, name string, headers RpcMsgHeader, in interface{}, mids ...MiddlewareFunc) (ackHeader RpcMsgHeader, err error)

	Call3(timeout time.Duration, name string, out interface{}, mids ...MiddlewareFunc) error
	Call4(timeout time.Duration, name string, headers RpcMsgHeader, out interface{}, mids ...MiddlewareFunc) (ackHeader RpcMsgHeader, err error)

	Call5(timeout time.Duration, name string, in, out interface{}, mids ...MiddlewareFunc) error
	Call6(timeout time.Duration, name string, headers RpcMsgHeader, in, out interface{}, mids ...MiddlewareFunc) (ackHeader RpcMsgHeader, err error)

	Perform(timeout time.Duration, ctx Context)

	SetOnReady(cb CallableCallback)
	SetOnClose(cb CallableCallback)

	BindTimeWheel(timeWheel *liblpc.TimeWheel)
	NotifyTimeWheel()
}

type CallableCallback

type CallableCallback func(callable Callable, err error)

type ClientCallableOnConnect

type ClientCallableOnConnect func(callable Callable, err error)

type Context

type Context interface {
	Callable() Callable

	Id() string

	SetMethod(string)
	Method() string

	LocalFuncDesc() FuncDesc

	SetRequestHeader(header RpcMsgHeader)
	RequestHeader() RpcMsgHeader

	SetResponseHeader(header RpcMsgHeader)
	ResponseHeader() RpcMsgHeader

	SetRequest(in interface{})
	Request() (in interface{})

	SetResponse(out interface{})
	Response() (out interface{})

	RequestType() reflect.Type
	ResponseType() reflect.Type

	SetError(err error)
	Error() error

	SetWriter(w Writer)
	Writer() Writer

	AddDefer(deferFunc func())

	liblpc.UserDataStorage
}

type FuncDesc

type FuncDesc uint8
const (
	ReqHasData FuncDesc = 0x01
	RspHasData FuncDesc = 0x02
)

type HandleFunc

type HandleFunc func(ctx Context)

type MiddlewareFunc

type MiddlewareFunc func(next HandleFunc) HandleFunc

type RPC

type RPC struct {
	// contains filtered or unexported fields
}

func New

func New() (*RPC, error)

func (*RPC) Close

func (this *RPC) Close() error

func (*RPC) Len

func (this *RPC) Len() int

func (*RPC) Loop

func (this *RPC) Loop() liblpc.EventLoop

func (*RPC) NewClientCallable

func (this *RPC) NewClientCallable(
	addr *liblpc.SyscallSockAddr,
	userData interface{},
	m ...MiddlewareFunc) (Callable, error)

func (*RPC) NewConnCallable

func (this *RPC) NewConnCallable(fd int, userData interface{}, m ...MiddlewareFunc) Callable

func (*RPC) PreUse

func (this *RPC) PreUse(m ...MiddlewareFunc)

func (*RPC) RegFunc

func (this *RPC) RegFunc(f interface{}, m ...MiddlewareFunc)

func (*RPC) RegFuncWithName

func (this *RPC) RegFuncWithName(fname string, f interface{}, m ...MiddlewareFunc)

func (*RPC) Run

func (this *RPC) Run(ctx context.Context)

func (*RPC) Start

func (this *RPC) Start(ctx context.Context)

func (*RPC) Use

func (this *RPC) Use(m ...MiddlewareFunc)

type RpcMsgHeader

type RpcMsgHeader = map[string]string

type SignalCallable

type SignalCallable struct {
	Callable
	// contains filtered or unexported fields
}

func NewSignalCallable

func NewSignalCallable(call Callable) *SignalCallable

func (*SignalCallable) Close

func (this *SignalCallable) Close() error

func (*SignalCallable) CloseSignal

func (this *SignalCallable) CloseSignal() <-chan error

Usage: err, ok := <-CloseSignal(); if !ok, return; if ok, check err.

func (*SignalCallable) ReadySignal

func (this *SignalCallable) ReadySignal() <-chan error

Usage: err, ok := <-ReadySignal(); if !ok, return; if ok, check err.

type TimeWheelEntryImpl

type TimeWheelEntryImpl struct {
	io.Closer
	// contains filtered or unexported fields
}

func (*TimeWheelEntryImpl) BindTimeWheel

func (this *TimeWheelEntryImpl) BindTimeWheel(timeWheel *liblpc.TimeWheel)

Not thread-safe.

func (*TimeWheelEntryImpl) GetRefCounter

func (this *TimeWheelEntryImpl) GetRefCounter() *int32

func (*TimeWheelEntryImpl) NotifyTimeWheel

func (this *TimeWheelEntryImpl) NotifyTimeWheel()

type Writer

type Writer interface {
	Write(data []byte, inLoop bool)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL