arpc

package
v0.0.0-...-012d1c6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2021 License: Apache-2.0 Imports: 11 Imported by: 0

README

arpc asynchronous rpc

异步RPC通信框架,主要功能

  • 1:消息的粘包,以及编解码处理,使用私有协议
  • 2:消息路由的注册
  • 3:服务注册与发现
  • 4:客户端RPC调用
    • 消息重传
    • 客户端Load Balancer
    • 同步调用,异步调用支持

TODO

  • 需要提供一种机制,保证底层socket读写失败能通知上层处理异常
  • RPC Call时,超时不再是必须参数,需要底层能通过socket断开连接时,主动触发rpc失败回调
  • Retry机制梳理
  • registry支持Namespace,Zone等信息,Namespace可用于支持多环境,Zone可用于支持多区域,客户端选举时,可优先选举相同区域的,相同区域不存在再选择其他区域,以达到异地多活的效果
  • reigstry是否需要支持鉴权,如何支持?
  • registry改名为naming service?
  • selector优化,更加丰富的策略以及失败处理
  • 优雅下线

其他资料

Documentation

Index

Constants

View Source
const (
	HFAck         HeadFlag = 0  // 标识是否是消息应答,bool
	HFStatus               = 1  // 返回状态信息,空表示OK,string
	HFContentType          = 2  // 编码协议,0表示使用默认双方约定的协议,int
	HFSeqID                = 3  // RPC唯一ID,改用uint64,0表示无效,系统只会产生低32位ID,高32位可以自定义使用,比如proxy用于存储connID
	HFMsgID                = 4  // 静态唯一消息ID,非零值,负数表示系统消息,正数表示用户消息
	HFNameMethod           = 5  // 编码格式,以/开始表示Method,否则表示消息名
	HFService              = 6  // 调用服务名,string,和method合并成1个值?
	HFHeadMap              = 7  // 自定义消息头,map[string]string
	HFExtra                = 8  // 扩展字段,key<16
	HFMax                  = 15 // 最大可用位数
)
View Source
const (
	HFExtraTraceID   = 0
	HFExtraSpanID    = 1
	HFExtraRemoteIP  = 2
	HFExtraUserID    = 3
	HFExtraProjectID = 4
)

预定义extra枚举,外部可以自行定义

View Source
const (
	DefaultTTL = time.Second * 15
)
View Source
const HFExtraMask = ^uint16(1<<HFExtra - 1)
View Source
const HFExtraMax = 6

使用2个字节作为Flag标识,目前系统已经使用了9个,还剩7个可以自定义,取值范围[0-6] 服务器集群内经常使用的有TraceID,SpanID,RemoteIP,UserID,ProjectID等 客户端与服务器通信经常使用的有,Auth,Checksum

Variables

View Source
var (
	// 可以外部启动前指定修改
	IDMin = -100  // 最小消息ID,用于指定系统ID范围
	IDMax = 65535 // 最大消息ID
)
View Source
var (
	ErrNotSupport      = errors.New("not support")
	ErrNoCodec         = errors.New("no codec")
	ErrNoHandler       = errors.New("no handler")
	ErrInvalidID       = errors.New("invalid id")
	ErrInvalidHandler  = errors.New("invalid handler")
	ErrInvalidResponse = errors.New("invalid response")
	ErrInvalidFuture   = errors.New("invalid future")
	ErrTimeout         = errors.New("timeout")
	ErrNotFoundID      = errors.New("not found id")
	ErrInvalidParam    = errors.New("invalid param")
)

Functions

func DecodeBody

func DecodeBody(pkg Packet, msg interface{}) error

util help

func IsValidID

func IsValidID(id int) bool

func NewSequenceID

func NewSequenceID() uint64

func SetContextFactory

func SetContextFactory(fn ContextFactory)

func SetIDProvider

func SetIDProvider(p IDProvider)

func SetPacketFactory

func SetPacketFactory(fn PacketFactory)

func SetRouter

func SetRouter(r Router)

SetRouter 设置全局Router

func Use

func Use(middleware HandlerFunc)

Use 设置全局Middleware

Types

type Client

type Client interface {
	Init(opts ...Option) error
	Send(service string, req interface{}, opts ...MiscOption) error
	Call(service string, req interface{}, rsp interface{}, opts ...MiscOption) error
}

发送的消息可以是Packet,也可以是结构体指针, 如果是Packet,则需要自己确保填充ID,Method等信息 如果是结构体指针,底层会自动创建Packet,并填充ID,Method,Name等信息

Send函数:

发送消息,不关系返回结果

Call函数: RPC调用:分为三种形式 1:异步调用,rsp为回调函数

a:函数原型为:func(rsp *Response) error 或者func(ctx Context, rsp *Response) error

2:同步调用

a:rsp为需要返回的消息结构体指针,例如&EchoRsp{},底层会自动创建Feature

3:多个连续调用:限制要求,任意一次调用出错则全部失败,包括业务逻辑返回值

	例如: 需要连续请求A,B,C三个协议,但三个协议都返回后才能继续执行
	可以使用下面方法实现:
	a:外部手动创建一个Future名为f
	b:分别请求A,B,C协议,并使用f作为参数,底层会自动为Future调用Add
	c:调用f.Wait()方法,可以同步也可以异步
 需要特别注意:如果外部创建Future,则必须自己手动托管Wait调用

type Context

type Context interface {
	Init(conn anet.Conn, msg Packet)    // 初始化
	Free()                              // 释放,可用于pool回收
	Get(key string) (interface{}, bool) // 根据key获取数据
	Set(key string, val interface{})    // 根据key设置数据
	Data() interface{}                  // 自定义数据
	SetData(v interface{})              // 设置数据
	Error() error                       // 错误信息,比如Timeout
	SetError(err error)                 // 设置错误
	Handler() HandlerFunc               // 获取最终要执行的回调,可能为nil
	SetHandler(h HandlerFunc)           // 设置Handler
	SetMiddleware(h HandlerChain)       // 设置Middleware
	Conn() anet.Conn                    // 原始Socket
	Message() Packet                    // 消息
	Response() Packet                   // 应答消息
	SetResponse(rsp Packet)             // 按需设置Response
	Send(msg interface{}) error         // 发送消息,不关心返回结果
	Abort(err error)                    // 手动中止调用
	Next() error                        // 调用下一个,返回错误则自动中止
}

Handler 上下文

func NewContext

func NewContext() Context

type ContextFactory

type ContextFactory func() Context

type Future

type Future interface {
	Add()
	Done(err error)
	Wait() error
}

Future 用于异步RPC调用时,阻塞当前调用 需要能够支持同时多个,其中任意一次调用失败则全失败 调用Done时如果err不为nil,则会立刻唤醒Wait,并返回错误

type HandlerChain

type HandlerChain []HandlerFunc

type HandlerFunc

type HandlerFunc func(ctx Context) error

Handler 消息回调处理函数

type HeadFlag

type HeadFlag uint

type IDProvider

type IDProvider interface {
	SetBindName(flag bool)
	Register(name string, id int) error
	GetID(name string) int
	GetName(id int) string
	Fill(packet Packet, msg interface{}) error
}

IDProvider 用于消息名和ID一一映射 纯粹的RPC调用并不需要填充消息ID 但如果是以MsgID作为唯一标识的情况下,需要提供MsgID才能保证客户端能够查询到消息回调 在测试环境下,可以使用名字作为唯一标识,使用SetBindName设置绑定开关,默认false

func GetIDProvider

func GetIDProvider() IDProvider

type MessageID

type MessageID interface {
	MsgID() int
}

MessageID 用于通过反射识别消息是否提供了消息ID,从而避免通过Name映射查询ID 接口函数可以使用工具自动生成代码

type MiscOption

type MiscOption func(o *MiscOptions)

用于Send,Call,Register

type MiscOptions

type MiscOptions struct {
	selector.Options               // 用于调用Call时,指定服务发现策略
	Method           string        // 调用方法名
	ID               int           // 消息ID,非零值
	RetryNum         int           // 重试次数
	RetryCB          RetryFunc     // 重试回调函数
	TTL              time.Duration // 超时时间
	Future           Future        // 异步等待
	Response         interface{}   // callback
	Extra            interface{}   // 自定义扩展数据
}

func (*MiscOptions) Init

func (o *MiscOptions) Init(opts ...MiscOption)

type Option

type Option func(o *Options)

用于创建Server和Client

type Options

type Options struct {
	Context   context.Context   //
	Registry  registry.Registry //
	Tran      anet.Tran         //
	Name      string            // 服务名
	Id        string            // 服务ID
	Version   string            // 服务版本
	Address   string            // Listen使用
	Advertise string            // 注册服务使用
	Selector  selector.Selector // client load balance
	Proxy     string            // 代理服务名,空代表不使用代理
}

type Packet

type Packet interface {
	Reset()
	IsAck() bool
	SetAck(ack bool)
	Code() int
	Status() string
	SetStatus(code int, info string)
	ContentType() int
	SetContentType(ct int)
	SeqID() uint64
	SetSeqID(id uint64)
	MsgID() int
	SetMsgID(id int)
	Name() string
	SetName(name string)
	Method() string
	SetMethod(string)
	Service() string
	SetService(service string)
	Extra(key uint) string
	SetExtra(key uint, value string) error
	Head(key string) string
	SetHead(key string, value string)
	Body() interface{}
	SetBody(interface{})
	Codec() codec.Codec
	SetCodec(codec.Codec)
	Buffer() *buffer.Buffer
	SetBuffer(b *buffer.Buffer)
	// 不需要序列化字段
	Internal() interface{}
	SetInternal(interface{})
	// 编解码接口
	Encode(data *buffer.Buffer) error
	Decode(data *buffer.Buffer) error
}

私有通信协议 编码格式:Flag[2byte]+Head+Body Flag: 固定两个字节,每位标识对应的head是否有数据 Head:

	1:系统依赖必须的字段,类型固定:比如Ack,Status,ContentType,SequenceID,ID,Name,Method,Service,
	2:系统非必须但很常用,类型string:比如TraceID,SpanID,RemoteIP,UserID,Project,Auth,Checksum
 3:Key-Value类型Head:

Body:

	需要根据ContentType进行编解码,需要根据MsgID等信息查询到具体类型,因此解码需要分成两个接口
 body需要是个指针类型

Ack:是否是应答消息 Status:错误信息,status line格式,例如 "200 OK" ContentType使用枚举形式,默认protobuf和json SeqID:唯一序列号,用于RPC调用,全局唯一 MsgID:消息静态唯一ID,不超过65535 Name :消息名 Method:调用方法名 Service:服务类型,用于消息路由,也可以不使用此字段,而是自行根据消息ID分段或者自行编码 Extra: 扩展字段,使用者可自行定义含义, 使用int索引定位,不能超过7 Head:附加参数,kv结构,更加灵活,但是消耗也会更多,key要求不能含有|

有几个特殊字段,不需要进行编码通信,仅仅用于系统内部调度 Internal:用于系统扩展,可以透传任意数据

func NewPacket

func NewPacket() Packet

type PacketFactory

type PacketFactory func() Packet

type RetryFunc

type RetryFunc func(req Packet) time.Duration

重试回调函数,每次需要返回新的TTL,小于等于0则终止重试

type Router

type Router interface {
	Use(middleware ...HandlerFunc)
	Handle(ctx Context) error
	Register(cb interface{}, opts ...MiscOption) error
}

消息路由 消息类型上分为两种: 一:客户端请求消息,Ack为false,回调函数通常静态注册 二:服务器应答消息,Ack为true, 回调函数通常由调用Call时注册 应用场景上分两种: 一:普通的消息注册与查询 二:代理请求,通常只需要根据规则转发,通常使用全局静态函数,但是需要上下文参数

消息处理支持中间件,可用于异常处理,消息统计过滤,全局代理也可以使用中间件进行处理

func GetRouter

func GetRouter() Router

GetRouter 获取全局Router

type Server

type Server interface {
	Init(opts ...Option) error
	Start() error
	Stop() error
}

type Status

type Status struct {
	Code int
	Info string
}

状态信息,使用空格分开,不单纯使用Code,因为字符串能返回一些调试信息 空表示OK Code 与http状态码保存一致

func (*Status) Decode

func (s *Status) Decode(text string)

func (*Status) Encode

func (s *Status) Encode() string

Directories

Path Synopsis
filter
rc4
xor

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL