rpc

package
v0.0.0-...-cc3858d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: MIT Imports: 20 Imported by: 6

Documentation

Index

Constants

View Source
// Message-tag constants: RpcMsgTagKey is the string key "RpcMsgTag", and the
// RpcMsgTag_* constants are the integer tag values 0 through 3.
// NOTE(review): presumably a tag value is stored under RpcMsgTagKey in a Meta
// to mark the call mode (common, sync, async with response, async without
// response) — confirm against the call sites.
const (
	RpcMsgTagKey            = "RpcMsgTag"
	RpcMsgTag_Common        = 0
	RpcMsgTag_Sync          = 1
	RpcMsgTag_AsyncWaitResp = 2
	RpcMsgTag_AsyncNoResp   = 3 // push
)

Variables

View Source
// Sentinel errors exported by this package. Each is a fixed value created
// with errors.New, so callers can test for it with errors.Is.
var (
	ErrInvalidReqData  = errors.New("invalid request data")
	ErrWaitReplyExceed = errors.New("wait reply exceed")
	ErrRpcCallTimeout  = errors.New("rpc call timeout")
	ErrWriteClosed     = errors.New("write: client closed")
)
View Source
var File_rpc_message_proto protoreflect.FileDescriptor

Functions

func AsyncCallWithResp

func AsyncCallWithResp(ctx context.Context, cc *ClientConn, req proto.Message,
	outRsp proto.Message, outRet chan *AsyncCallResult, passData any, timeout time.Duration, md ...*Meta) (err error)

异步调用,带有回应

func AsyncCallWithoutResp

func AsyncCallWithoutResp(ctx context.Context, cc *ClientConn, req proto.Message, md ...*Meta) (err error)

异步调用,不需要回应

func CheckInternalIPForRpcAddr

func CheckInternalIPForRpcAddr(rpcAddr string) (string, error)

func GetInnerIPs

func GetInnerIPs() ([]string, error)

获取本机所有内网IP

func GetOneInnerIP

func GetOneInnerIP() (string, error)

获取本机内网IP

func SyncCall

func SyncCall(ctx context.Context, cc *ClientConn, req proto.Message, outRsp proto.Message, timeout time.Duration, md ...*Meta) (err error)

同步调用

Types

type AsyncCallResult

// AsyncCallResult describes the outcome of an asynchronous call. It is the
// element type of CallOption.AsyncRetChan and of the outRet channel taken by
// AsyncCallWithResp.
type AsyncCallResult struct {
	Err         error // error that occurred, if any
	Rsp         any   // a proto.Message or a []byte
	RspMd       *Meta // metadata of the response
	PassThrough any   // CallOption.PassThrough
}

type CallOption

// CallOption carries per-call settings; ClientConn.Invoke accepts zero or
// more *CallOption values as its trailing arguments.
type CallOption struct {
	Async        bool                  // whether the call is asynchronous; the default is a synchronous call
	Timeout      time.Duration         // effective for both synchronous and asynchronous calls
	AsyncRetChan chan *AsyncCallResult // output of an asynchronous call; set to nil if the response is not of interest
	ReqMd        *Meta                 // other request data
	PassThrough  any                   // pass-through data for an asynchronous call
}

type CallResult

// CallResult groups the outcome of a call: an error, the response data and
// the response metadata.
type CallResult struct {
	Err     error // error that occurred, if any
	RspData any   // a proto.Message or a []byte
	RspMd   *Meta // metadata of the response
}

type ClientConn

type ClientConn struct {
	// contains filtered or unexported fields
}

func NewClientConn

func NewClientConn(network, address string, opt *ClientOptions) (*ClientConn, error)

func (*ClientConn) Close

func (c *ClientConn) Close() error

func (*ClientConn) Invoke

func (c *ClientConn) Invoke(ctx context.Context, service, method string, req any, outRsp proto.Message, opts ...*CallOption) (rspMd *Meta, rspData []byte, err error)

req: 支持proto.Message和[]byte。outRsp: req对应的response,调用者明确指定具体的proto.Message对象,该方法不会帮助创建response对象。rspData: 同步调用且outRsp为nil时,以[]byte返回rspData。默认是同步调用。

type ClientConnInterface

// ClientConnInterface is the single-method interface for issuing a call to a
// service method.
type ClientConnInterface interface {
	// Invoke issues a call to the given service and method.
	//
	// req: may be a proto.Message or a []byte.
	// outRsp: the response that corresponds to req; the caller must supply the
	// concrete proto.Message object explicitly, this method does not create
	// the response object for the caller.
	// rspData: for a synchronous call with a nil outRsp, the response is
	// returned as a []byte in rspData.
	// Calls are synchronous by default.
	Invoke(ctx context.Context, service, method string, req any, outRsp proto.Message, opts ...*CallOption) (rspMd *Meta, rspData []byte, err error)
}

type ClientOptions

// ClientOptions is the option set accepted by NewClientConn (and by NewRpc as
// cliOpt). It holds three durations, a close callback that receives the
// netpoll.Connection, and the message marshaler/unmarshaler to use.
type ClientOptions struct {
	// NOTE(review): the name looks like a misspelling of "DialTimeout";
	// presumably this is the connection dial timeout — confirm. The field name
	// is part of the public API and is left unchanged.
	DailTimeout    time.Duration
	ReadTimeout    time.Duration
	WriteTimeout   time.Duration
	OnClientClose  func(netpoll.Connection) error
	MsgUnmarshaler Unmarshaler
	MsgMarshaler   Marshaler
}

type Marshaler

// Marshaler encodes a proto.Message into bytes. It is the type of the
// MsgMarshaler field in ClientOptions and ServerOptions.
type Marshaler interface {
	// Marshal returns the encoded form of m, or an error.
	Marshal(m proto.Message) ([]byte, error)
}

type Meta

// Meta is a metadata container: Kvs maps a string key to a *MetaValues.
// The struct tag on Kvs is elided in this rendering.
type Meta struct {
	Kvs map[string]*MetaValues `` /* 133-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*Meta) AddInt

func (md *Meta) AddInt(key string, valInt int64)

func (*Meta) AddStr

func (md *Meta) AddStr(key string, valStr string)

func (*Meta) Del

func (md *Meta) Del(key string)

func (*Meta) Descriptor deprecated

func (*Meta) Descriptor() ([]byte, []int)

Deprecated: Use Meta.ProtoReflect.Descriptor instead.

func (*Meta) GetInt

func (md *Meta) GetInt(key string) int64

func (*Meta) GetKvs

func (x *Meta) GetKvs() map[string]*MetaValues

func (*Meta) GetStr

func (md *Meta) GetStr(key string) string

func (*Meta) IntValues

func (md *Meta) IntValues(key string) []int64

func (*Meta) ProtoMessage

func (*Meta) ProtoMessage()

func (*Meta) ProtoReflect

func (x *Meta) ProtoReflect() protoreflect.Message

func (*Meta) Reset

func (x *Meta) Reset()

func (*Meta) SetInt

func (md *Meta) SetInt(key string, valInt int64)

func (*Meta) SetStr

func (md *Meta) SetStr(key string, valStr string)

func (*Meta) StrValues

func (md *Meta) StrValues(key string) []string

func (*Meta) String

func (x *Meta) String() string

type MetaValues

// MetaValues is the value type of Meta.Kvs: a list of int64 values and a list
// of string values, tagged as protobuf fields 1 (packed varint) and 2 (bytes).
type MetaValues struct {
	Ints []int64  `protobuf:"varint,1,rep,packed,name=ints,proto3" json:"ints,omitempty"`
	Strs []string `protobuf:"bytes,2,rep,name=strs,proto3" json:"strs,omitempty"`
	// contains filtered or unexported fields
}

func (*MetaValues) Descriptor deprecated

func (*MetaValues) Descriptor() ([]byte, []int)

Deprecated: Use MetaValues.ProtoReflect.Descriptor instead.

func (*MetaValues) GetInts

func (x *MetaValues) GetInts() []int64

func (*MetaValues) GetStrs

func (x *MetaValues) GetStrs() []string

func (*MetaValues) ProtoMessage

func (*MetaValues) ProtoMessage()

func (*MetaValues) ProtoReflect

func (x *MetaValues) ProtoReflect() protoreflect.Message

func (*MetaValues) Reset

func (x *MetaValues) Reset()

func (*MetaValues) String

func (x *MetaValues) String() string

type MethodDesc

// MethodDesc pairs a method name with its MethodHandler. ServiceDesc.Methods
// is a slice of MethodDesc.
type MethodDesc struct {
	MethodName string
	Handler    MethodHandler
}

type MethodHandler

// MethodHandler takes a service value and returns a proto.Message together
// with a MsgHandler.
// NOTE(review): presumably the returned message is an empty request object
// for the method and the MsgHandler is bound to srv — confirm.
type MethodHandler func(srv any) (proto.Message, MsgHandler)

type MsgHandler

// MsgHandler takes a context and a request message and returns a response
// message and an error.
type MsgHandler func(ctx context.Context, req proto.Message) (proto.Message, error)

type RPCReg

// RPCReg is a registration callback: it receives a ServiceRegistrar and a node
// name and returns an error. RpcServer.RegisterService takes an RPCReg.
type RPCReg func(serv ServiceRegistrar, nodeName string) error

type RPCReq

// RPCReq is a request callback: it receives a context and a *ClientConn and
// returns a response message and an error. RpcClient.Call takes an RPCReq.
type RPCReq func(ctx context.Context, cc *ClientConn) (resp proto.Message, err error)

type RpcClient

// RpcClient is the client-side interface of the package.
type RpcClient interface {
	// Call takes a service name and an RPCReq callback and returns a response
	// message and an error.
	Call(serviceName string, cb RPCReq) (resp proto.Message, err error)
}

type RpcContext

// RpcContext is the value passed to an RpcHandler. Its exported fields fall
// into two groups: request-side fields (Conn, Seq, MethodName, ReqMd, ReqMsg,
// Handler) and reply-side fields (Reply, ReplyErr, ReplyMd).
type RpcContext struct {
	Conn       *SvrWriter
	Seq        uint32
	MethodName string
	ReqMd      *Meta
	ReqMsg     proto.Message
	Handler    MsgHandler

	Reply    proto.Message
	ReplyErr error
	ReplyMd  *Meta
	// contains filtered or unexported fields
}

func (*RpcContext) SerializeResponse

func (rc *RpcContext) SerializeResponse()

type RpcHandler

// RpcHandler is a function that receives an *RpcContext and returns nothing.
// It is the type of ServerOptions.HandlerFunc.
type RpcHandler func(rc *RpcContext)

type RpcImp

type RpcImp struct {
	// contains filtered or unexported fields
}

func NewRpc

func NewRpc(pctx context.Context, rpcAddr, serviceGroup string, etcdConf *etcd.EtcdOptions, serverOpt *ServerOptions, cliOpt *ClientOptions) (*RpcImp, error)

func (*RpcImp) Call

func (imp *RpcImp) Call(serviceName string, cb RPCReq) (proto.Message, error)

func (*RpcImp) CallAll

func (imp *RpcImp) CallAll(serviceName string, cb RPCReq) ([]proto.Message, error)

func (*RpcImp) RegisterService

func (imp *RpcImp) RegisterService(serviceName string, unique bool, cb func(rpcSrv ServiceRegistrar, nodeName string) error) error

向etcd注册服务, unique=true:注册已存在的服务则返回错误

func (*RpcImp) Run

func (imp *RpcImp) Run() error

func (*RpcImp) Stop

func (imp *RpcImp) Stop() error

func (*RpcImp) UnregisterService

func (imp *RpcImp) UnregisterService(serviceName string) error

取消注册服务

type RpcRequestMessage

// RpcRequestMessage is the protobuf message for a request. Per its struct
// tags it carries a fixed32 sequence number, a service name, a method name,
// a *Meta and a payload of raw bytes (fields 1 through 5).
type RpcRequestMessage struct {
	Seq         uint32 `protobuf:"fixed32,1,opt,name=seq,proto3" json:"seq,omitempty"`
	ServiceName string `protobuf:"bytes,2,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
	MethodName  string `protobuf:"bytes,3,opt,name=methodName,proto3" json:"methodName,omitempty"`
	Md          *Meta  `protobuf:"bytes,4,opt,name=md,proto3" json:"md,omitempty"`
	Payload     []byte `protobuf:"bytes,5,opt,name=payload,proto3" json:"payload,omitempty"`
	// contains filtered or unexported fields
}

func (*RpcRequestMessage) Descriptor deprecated

func (*RpcRequestMessage) Descriptor() ([]byte, []int)

Deprecated: Use RpcRequestMessage.ProtoReflect.Descriptor instead.

func (*RpcRequestMessage) GetMd

func (x *RpcRequestMessage) GetMd() *Meta

func (*RpcRequestMessage) GetMethodName

func (x *RpcRequestMessage) GetMethodName() string

func (*RpcRequestMessage) GetPayload

func (x *RpcRequestMessage) GetPayload() []byte

func (*RpcRequestMessage) GetSeq

func (x *RpcRequestMessage) GetSeq() uint32

func (*RpcRequestMessage) GetServiceName

func (x *RpcRequestMessage) GetServiceName() string

func (*RpcRequestMessage) ProtoMessage

func (*RpcRequestMessage) ProtoMessage()

func (*RpcRequestMessage) ProtoReflect

func (x *RpcRequestMessage) ProtoReflect() protoreflect.Message

func (*RpcRequestMessage) Reset

func (x *RpcRequestMessage) Reset()

func (*RpcRequestMessage) String

func (x *RpcRequestMessage) String() string

type RpcResponseMessage

// RpcResponseMessage is the protobuf message for a response. Per its struct
// tags it carries a fixed32 sequence number, an int32 error code, an error
// string, a *Meta and a payload of raw bytes (fields 1 through 5).
// NOTE(review): presumably Seq matches the Seq of the originating
// RpcRequestMessage — confirm.
type RpcResponseMessage struct {
	Seq     uint32 `protobuf:"fixed32,1,opt,name=seq,proto3" json:"seq,omitempty"`
	Ecode   int32  `protobuf:"varint,2,opt,name=ecode,proto3" json:"ecode,omitempty"`
	Error   string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"`
	Md      *Meta  `protobuf:"bytes,4,opt,name=md,proto3" json:"md,omitempty"`
	Payload []byte `protobuf:"bytes,5,opt,name=payload,proto3" json:"payload,omitempty"`
	// contains filtered or unexported fields
}

func (*RpcResponseMessage) Descriptor deprecated

func (*RpcResponseMessage) Descriptor() ([]byte, []int)

Deprecated: Use RpcResponseMessage.ProtoReflect.Descriptor instead.

func (*RpcResponseMessage) GetEcode

func (x *RpcResponseMessage) GetEcode() int32

func (*RpcResponseMessage) GetError

func (x *RpcResponseMessage) GetError() string

func (*RpcResponseMessage) GetMd

func (x *RpcResponseMessage) GetMd() *Meta

func (*RpcResponseMessage) GetPayload

func (x *RpcResponseMessage) GetPayload() []byte

func (*RpcResponseMessage) GetSeq

func (x *RpcResponseMessage) GetSeq() uint32

func (*RpcResponseMessage) ParserError

func (msg *RpcResponseMessage) ParserError() error

func (*RpcResponseMessage) ProtoMessage

func (*RpcResponseMessage) ProtoMessage()

func (*RpcResponseMessage) ProtoReflect

func (x *RpcResponseMessage) ProtoReflect() protoreflect.Message

func (*RpcResponseMessage) Reset

func (x *RpcResponseMessage) Reset()

func (*RpcResponseMessage) String

func (x *RpcResponseMessage) String() string

type RpcServer

// RpcServer is the server-side registration interface of the package.
type RpcServer interface {
	// RegisterService takes a service name and an RPCReg callback and returns
	// an error.
	RegisterService(serviceName string, cb RPCReg) error
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opt *ServerOptions, ctx context.Context) (*Server, error)

func (*Server) RegisterService

func (s *Server) RegisterService(sd *ServiceDesc, ss any)

func (*Server) Run

func (s *Server) Run() error

func (*Server) Stop

func (s *Server) Stop(ctx context.Context) error

type ServerInterface

// ServerInterface is a ServiceRegistrar that can additionally be run and
// stopped.
type ServerInterface interface {
	ServiceRegistrar
	// Run returns an error.
	// NOTE(review): presumably starts serving and blocks — confirm.
	Run() error
	// Stop takes a context and returns an error.
	Stop(context.Context) error
}

type ServerOptions

// ServerOptions is the option set accepted by NewServer (and by NewRpc as
// serverOpt).
type ServerOptions struct {
	ListenAddr     string // in some cases this cannot be the same as rpcAddr (e.g. a domain name or a k8s service), so it is controlled separately
	PollerNum      int
	BufferSize     int // size of a new connection's read LinkBuffer, netpoll default=8k
	PollOpts       []netpoll.Option
	HandlerFunc    RpcHandler
	MsgUnmarshaler Unmarshaler
	MsgMarshaler   Marshaler
	WriteChanSize  int
}

type ServiceDesc

// ServiceDesc describes a service: its name, a HandlerType value and the list
// of its methods. It is the descriptor passed to
// ServiceRegistrar.RegisterService.
// NOTE(review): HandlerType is presumably a typed nil pointer to the service
// interface, as in grpc-go's ServiceDesc — confirm.
type ServiceDesc struct {
	ServiceName string
	HandlerType any
	Methods     []MethodDesc
}

type ServiceRegistrar

// ServiceRegistrar is the single-method interface for registering a service.
// It is embedded in ServerInterface and is the first argument of an RPCReg.
type ServiceRegistrar interface {
	// RegisterService takes a service descriptor and an implementation value.
	// It returns nothing.
	RegisterService(desc *ServiceDesc, impl any)
}

type SvrWriter

type SvrWriter struct {
	// contains filtered or unexported fields
}

func (*SvrWriter) IsActive

func (w *SvrWriter) IsActive() bool

func (*SvrWriter) IsClosed

func (w *SvrWriter) IsClosed() bool

type Unmarshaler

// Unmarshaler decodes bytes into a proto.Message. It is the type of the
// MsgUnmarshaler field in ClientOptions and ServerOptions.
type Unmarshaler interface {
	// Unmarshal takes the bytes b and the message m and returns an error.
	Unmarshal(b []byte, m proto.Message) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL