atreus

package
v0.0.0-...-2ebd195 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 29, 2022 License: MIT Imports: 59 Imported by: 0

README

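Introduction

atreus is a scaffold built on top of gRPC.

0x01 Error handling (conventions)

Server-side error returns

Server-side errors must be returned as shown below. The first argument is one of the official gRPC status codes; the second is a custom error defined in the project's own code.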

return status.Error(codes.Internal, pyerrors.InternalError)
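Client-side error returns

Error types that count toward the circuit breaker's failure statistics (timeouts, server errors, and so on):

  • codes.Unknown: unexpected error (recover interceptor)
  • codes.DeadlineExceeded: server-side ctx timeout (timeout interceptor)
  • codes.ResourceExhausted: dropped by server-side rate limiting (limiter interceptor)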

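Error types that do not count (logic errors and the like):

  • codes.InvalidArgument: invalid argument (ACL interceptor)
  • codes.Unauthenticated: not authenticated (auth interceptor)
  • codes.InvalidArgument: invalid argument (validator interceptor)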
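The two lists above amount to a small classification rule. The sketch below is only an illustration: `Code` and its constants are local stand-ins for `google.golang.org/grpc/codes` so that the example builds with the standard library alone, and treating every unlisted code as "not counted" is an assumption rather than documented behavior.

```go
package main

import "fmt"

// Code is a local stand-in for google.golang.org/grpc/codes.Code
// (an assumption made to keep the sketch free of external modules).
type Code int

const (
	Unknown Code = iota
	DeadlineExceeded
	ResourceExhausted
	InvalidArgument
	Unauthenticated
)

// countsTowardBreaker reports whether an error carrying code c should be
// counted as a failure by the circuit breaker, following the lists above.
func countsTowardBreaker(c Code) bool {
	switch c {
	case Unknown, DeadlineExceeded, ResourceExhausted:
		return true // timeouts and server-side failures
	default:
		return false // logic errors; unlisted codes are assumed not to count
	}
}

func main() {
	for _, c := range []Code{Unknown, DeadlineExceeded, ResourceExhausted, InvalidArgument, Unauthenticated} {
		fmt.Println(c, countsTowardBreaker(c))
	}
}
```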
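How to generate errors

Generate all errors according to the following recommendations:

  1. Use only the functions defined in github.com/pkg/errors to create (wrap) errors.
  2. Inside interceptors, generate errors only in the ways below; do not construct ad hoc error values:
    • Build the error with the Error function of the google.golang.org/grpc/status package, for example status.Error(codes.InvalidArgument, err.Error()). The first argument is a standard gRPC status code; the second is the business error, which can be either the error string to show to the user or an atreus global error such as errcode.MethodNotAllowed.Error().
    • Return an atreus global error directly. For example, when the server-side rate-limiting interceptor throttles a request, return the system default error errcode.ServiceUnavailable; when the server panics, return ecode.ServerErr; when the client drops a call because the circuit breaker is open, return ecode.ServiceUnavailable.
    • As noted above, the way an error is generated affects the circuit breaker's error count, so take particular care.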

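0x02 Parameter validation: steps for integrating go.validator with proto

Using the github.com/mwitkow/go-proto-validators/protoc-gen-govalidators package
  1. Set GOPATH; in this example the local GOPATH is /root/go/

  2. Download the https://github.com/protocolbuffers/protobuf project

  3. Copy the protobuf/src/* directory to the following path under GOPATH: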

cp src/ ${GOPATH}/src/github.com/google/protobuf/src -r
cp src/ /root/go/src/github.com/google/protobuf/ -r
  4. Download the protoc-gen-govalidators package:
go get github.com/mwitkow/go-proto-validators/protoc-gen-govalidators
  5. Write the proto file; be sure to import validator.proto and add validator rules to the message fields
syntax = "proto3";

// protoc -I=. *.proto --go_out=plugins=grpc:.

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

import "github.com/mwitkow/go-proto-validators/validator.proto";

package proto;

//a common RPC name with the Service suffix
service GreeterService {
    rpc SayHello (HelloRequest) returns (HelloReply) {}
    //rpc ErrorSayHello(HelloRequest) returns  (HelloReply) {}
}

message HelloRequest {
    string name = 1 [(validator.field) = {regex: "^[a-z]{2,5}$"}];
}

message HelloReply {
    string message = 1;
}
  6. Generate the pb.govalidator.pb.go file to finish:
protoc    --proto_path=${GOPATH}/src   --proto_path=${GOPATH}/src/github.com/google/protobuf/src   --proto_path=.   --go_out=.   --govalidators_out=. --go_out=plugins=grpc:./   *.proto
Using github.com/envoyproxy/protoc-gen-validate
  1. Install the protoc-gen-validate tool
go get -d github.com/envoyproxy/protoc-gen-validate
cd ${GOPATH}/src/github.com/envoyproxy/protoc-gen-validate
make build

Output of a successful installation:

GOBIN=/root/go/src/github.com/envoyproxy/protoc-gen-validate/bin go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.27.1
protoc -I . \
        --plugin=protoc-gen-go=/root/go//bin/protoc-gen-go \
        --go_opt=paths=source_relative \
        --go_out="Mvalidate/validate.proto=github.com/envoyproxy/protoc-gen-validate/validate,Mgoogle/protobuf/any.proto=google.golang.org/protobuf/types/known/anypb,Mgoogle/protobuf/duration.proto=google.golang.org/protobuf/types/known/durationpb,Mgoogle/protobuf/struct.proto=google.golang.org/protobuf/types/known/structpb,Mgoogle/protobuf/timestamp.proto=google.golang.org/protobuf/types/known/timestamppb,Mgoogle/protobuf/wrappers.proto=google.golang.org/protobuf/types/known/wrapperspb,Mgoogle/protobuf/descriptor.proto=google.golang.org/protobuf/types/descriptorpb:." validate/validate.proto
go install .
go: downloading github.com/lyft/protoc-gen-star v0.5.3
go: downloading github.com/iancoleman/strcase v0.2.0
  2. Write the proto file, as follows:
syntax = "proto3";

// protoc -I=. *.proto --go_out=plugins=grpc:.

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";


import "validate/validate.proto";

package proto;

//a common RPC name with the Service suffix
service GreeterService {
    rpc SayHello (HelloRequest) returns (HelloReply) {}
    //rpc ErrorSayHello(HelloRequest) returns  (HelloReply) {}
}

message HelloRequest {
    string name = 1  [(validate.rules).string = {
                      pattern:   "^[a-z]{2,5}$",
                      max_bytes: 256,
                   }];
}

message HelloReply {
    string message = 1;
}
  3. Compile the proto file to generate pb.gopb.validator.go; done.
protoc   -I .   -I ${GOPATH}/src   -I ${GOPATH}/src/github.com/envoyproxy/protoc-gen-validate     --validate_out="lang=go:." --go_out=plugins=grpc:./   *.proto
  4. The available rules are listed at https://github.com/envoyproxy/protoc-gen-validate#constraint-rules
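To make the two constraints on HelloRequest.name concrete, here is a hand-written sketch of the checks they describe (pattern `^[a-z]{2,5}$` and max_bytes 256). It is not the code the plugin generates; `validateName`, `namePattern` and `nameMaxBytes` are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// namePattern mirrors the pattern rule from the proto file above.
var namePattern = regexp.MustCompile(`^[a-z]{2,5}$`)

// nameMaxBytes mirrors the max_bytes rule from the proto file above.
const nameMaxBytes = 256

// validateName applies both rules by hand. len on a Go string counts
// bytes, which matches the meaning of max_bytes.
func validateName(name string) error {
	if len(name) > nameMaxBytes {
		return errors.New("name: longer than 256 bytes")
	}
	if !namePattern.MatchString(name) {
		return errors.New("name: does not match ^[a-z]{2,5}$")
	}
	return nil
}

func main() {
	fmt.Println(validateName("tom"))    // accepted
	fmt.Println(validateName("Tom"))    // rejected: upper-case letter
	fmt.Println(validateName("abcdef")) // rejected: six letters
}
```

With this pattern the max_bytes rule can never be the one that fails, since anything longer than five bytes already fails the pattern; the sketch keeps both checks only because the proto file declares both.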

Documentation

Constants

const (
	DefaultAtreusReqIDKey  = "atreus-requestid"
	DefaultAtreusReqIDVal  = "atreus-requestid-value"
	DefaultAtreusReqIDName = "atreus-reqid-name"
)
const (
	DEFAULT_ATREUS_SERVICE_NAME = "atreus_svc"
	DEFAULT_TIME_TO_QUIT        = 5 * time.Second
)
const (
	MAX_STACK_SIZE = 2048
)

Variables

var DefaultAtreusReqIDSKey = globalReqIDKey{}

Functions

func CloneClientOutgoingData

func CloneClientOutgoingData(ctx context.Context) metadata.MD

metadata API for client

func CloneServerIncomingData

func CloneServerIncomingData(ctx context.Context) metadata.MD

metadata API for server

func ConvertNormalError

func ConvertNormalError(svrErr error) (gst *status.Status)

ConvertNormalError takes the error from a service reply and tries to convert it to a gRPC *status.Status.

func GetClientIP

func GetClientIP(ctx context.Context) (string, error)

GetClientIP returns the IP address of the caller.

func GetGlobalReqIDFromContext

func GetGlobalReqIDFromContext(ctx context.Context) string
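The page does not show how the request ID is stored. Since DefaultAtreusReqIDSKey is a value of an unexported struct type, one plausible reading is the usual Go pattern of a struct-typed context key. The sketch below illustrates that pattern only; `reqIDKey`, `withReqID` and `reqIDFromContext` are hypothetical names, and the real package may instead carry the ID in gRPC metadata.

```go
package main

import (
	"context"
	"fmt"
)

// reqIDKey is a hypothetical unexported key type. A struct-typed key
// cannot collide with context keys defined by other packages.
type reqIDKey struct{}

// withReqID returns a child context that carries id.
func withReqID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, reqIDKey{}, id)
}

// reqIDFromContext returns the stored ID, or "" when none is present.
func reqIDFromContext(ctx context.Context) string {
	id, _ := ctx.Value(reqIDKey{}).(string)
	return id
}

// demo returns the ID read from a context that carries one and from a
// context that does not.
func demo() (found, missing string) {
	ctx := withReqID(context.Background(), "req-123")
	return reqIDFromContext(ctx), reqIDFromContext(context.Background())
}

func main() {
	found, missing := demo()
	fmt.Printf("%q %q\n", found, missing)
}
```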

func IsContextError

func IsContextError(err error) bool

IsContextError reports whether err is a ctx error (DeadlineExceeded || Canceled).
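A check of this kind can be written with the standard library alone. The following is a minimal sketch, assuming the intent is "err is, or wraps, one of the two context errors"; the package's actual implementation is not shown here and may compare differently. `isContextError` is a hypothetical name.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// isContextError reports whether err is, or wraps, context.DeadlineExceeded
// or context.Canceled.
func isContextError(err error) bool {
	return errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)
}

// demo checks an expired context, a wrapped cancellation and an unrelated error.
func demo() []bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	<-ctx.Done() // wait until the deadline has passed

	return []bool{
		isContextError(ctx.Err()),
		isContextError(fmt.Errorf("call failed: %w", context.Canceled)),
		isContextError(errors.New("boom")),
	}
}

func main() {
	fmt.Println(demo()) // [true true false]
}
```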

func Recovery

func Recovery(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error)

Recovery interceptor: it must be placed at position 0 of the interceptor chain.

func TimingOld

func TimingOld(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error)

Timing interceptor (the last interceptor in the chain).

func ToErrEcode

func ToErrEcode(gst *status.Status) errcode.Codes

ToErrEcode converts a gRPC status to errcode.Codes (external interface).

func TransContextErr2GrpcErr

func TransContextErr2GrpcErr(err error) error

TransContextErr2GrpcErr converts a ctx error into a standard gRPC error.

Types

type AtreusReqId

type AtreusReqId string

func NewAtreusReqId

func NewAtreusReqId(name string) AtreusReqId

func (*AtreusReqId) Validate

func (d *AtreusReqId) Validate() bool

Validate checks the reqid.

type Client

type Client struct {
	Logger        *zap.Logger
	Conf          *config.AtreusCliConfig // client configuration
	Lock          *sync.RWMutex
	DialOpts      []grpc.DialOption             // gRPC client options
	InnerHandlers []grpc.UnaryClientInterceptor // gRPC interceptor slice

	CliResolver dis.ServiceResolverWrapper

	//sony breaker
	CbBreakerMap    map[string]*gobreaker.CircuitBreaker
	CbBreakerConfig gobreaker.Settings // a single global configuration for now

	//retry
	MaxRetry int

	RpcPersistClient *grpc.ClientConn
	// contains filtered or unexported fields
}

Client is the client wrapper struct.

func NewClient

func NewClient(config *config.AtreusCliConfig) (*Client, error)

func (*Client) AddCliOpt

func (c *Client) AddCliOpt(opts ...grpc.DialOption) *Client

func (*Client) BuildUnaryInterceptorChain2

func (c *Client) BuildUnaryInterceptorChain2() grpc.UnaryClientInterceptor

BuildUnaryInterceptorChain2 builds the chained client interceptor.

func (*Client) CircuitBreaker

func (c *Client) CircuitBreaker() grpc.UnaryClientInterceptor

Client-side circuit breaker interceptor.

func (*Client) ClientCallTimeout

func (c *Client) ClientCallTimeout(timeout time.Duration) grpc.UnaryClientInterceptor

Client-side call timeout handling.
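The general shape of a timeout interceptor is to derive a context with a deadline and pass it to the next call. The sketch below shows that idea with the standard library only; `invoker` is a simplified, hypothetical stand-in for grpc.UnaryInvoker, and nothing here is taken from the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// invoker is a simplified stand-in for grpc.UnaryInvoker (an assumption).
type invoker func(ctx context.Context) error

// withTimeout wraps next so that every call runs under a deadline of d.
func withTimeout(d time.Duration, next invoker) invoker {
	return func(ctx context.Context) error {
		ctx, cancel := context.WithTimeout(ctx, d)
		defer cancel() // release the timer as soon as the call returns
		return next(ctx)
	}
}

// sleepCall simulates a remote call that takes wait to complete and
// gives up early when ctx ends first.
func sleepCall(wait time.Duration) invoker {
	return func(ctx context.Context) error {
		select {
		case <-time.After(wait):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demo runs one call that exceeds its deadline and one that does not.
func demo() (slowErr, fastErr error) {
	slow := withTimeout(20*time.Millisecond, sleepCall(time.Second))
	fast := withTimeout(time.Second, sleepCall(0))
	return slow(context.Background()), fast(context.Background())
}

func main() {
	slowErr, fastErr := demo()
	fmt.Println(slowErr, fastErr)
}
```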

func (*Client) ClientValidator

func (c *Client) ClientValidator() grpc.UnaryClientInterceptor

Client-side parameter validation.

func (*Client) DoClientRetry

func (c *Client) DoClientRetry(optFuncs ...retrys.CallOption) grpc.UnaryClientInterceptor

Client-side retry.
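The options accepted through retrys.CallOption are not documented on this page, so the following is only a generic sketch of a bounded retry loop: call again while the error is considered retryable, up to a maximum number of attempts. `retry`, `errTransient` and the absence of any backoff are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical error standing in for a retryable failure.
var errTransient = errors.New("transient failure")

// retry invokes call until it succeeds, returns a non-retryable error, or
// maxAttempts is reached. It returns the number of attempts made and the
// last error. No backoff is applied in this sketch.
func retry(maxAttempts int, retryable func(error) bool, call func() error) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = call()
		if err == nil || !retryable(err) {
			return attempt, err
		}
	}
	return maxAttempts, err
}

// demo retries a call that fails twice and then succeeds.
func demo() (int, error) {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}
	isTransient := func(err error) bool { return errors.Is(err, errTransient) }
	return retry(5, isTransient, flaky)
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, err) // 3 <nil>
}
```

Only errors that are safe to repeat should be treated as retryable; logic errors such as an invalid argument will fail the same way on every attempt.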

func (*Client) IsBreakerNeedError

func (c *Client) IsBreakerNeedError(err error) bool

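IsBreakerNeedError checks whether an error is acceptable: based on the error returned by the server, it decides which errors enter the circuit breaker calculation. See https://grpc.github.io/grpc/core/md_doc_statuscodes.html and https://github.com/sony/gobreaker/blob/master/gobreaker.go#L113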

func (*Client) IsBreakerNeedErrorV2

func (c *Client) IsBreakerNeedErrorV2(err error) bool

If the framework uses native gRPC errors, the error must be converted with status.Code(err).

func (*Client) OpenTracingForClient

func (c *Client) OpenTracingForClient() grpc.UnaryClientInterceptor

Tracing interceptor for both client and server (jaeger).

func (*Client) Recovery

func (c *Client) Recovery() grpc.UnaryClientInterceptor

Client-side recovery interceptor.

func (*Client) Timing

func (c *Client) Timing() grpc.UnaryClientInterceptor

Timing is an interceptor that logs the processing time of each call (for the client).

func (*Client) TransError

func (c *Client) TransError() grpc.UnaryClientInterceptor

TransError unifies client-side error handling: it converts the err returned by the server (status.Status) into errcode.Codes, because the circuit breaker needs the errcode.Codes type.

func (*Client) Use

func (c *Client) Use(handlers ...grpc.UnaryClientInterceptor) *Client

Use adds global interceptors to the gRPC client; it accepts any number of grpc.UnaryClientInterceptor values.

type Graceful

type Graceful struct {
	Logger *zap.Logger
}

func (*Graceful) RenewListener

func (g *Graceful) RenewListener(new_bindaddr string) (net.Listener, error)

RenewListener inherits an existing listener or creates a new one.

type GracefulGrpcAppserver

type GracefulGrpcAppserver struct {
	AtreusServer *Server
	Addr         string
	Listener     net.Listener // the listener that belongs to this GracefulGrpcAppserver
	Logger       *zap.Logger
}

func NewGracefulGrpcAppserver

func NewGracefulGrpcAppserver(srv *Server, new_bindaddr string) (*GracefulGrpcAppserver, error)

NewGracefulGrpcAppserver starts up as a GracefulGrpcAppserver and creates its Listener.

func (*GracefulGrpcAppserver) RunServer

func (g *GracefulGrpcAppserver) RunServer() error

RunServer starts the service through GracefulGrpcAppserver, in place of starting it through Server.

type Limiter

type Limiter interface {
	Allow(method string) bool
}

Limiter is the common rate limiter interface.

type Server

type Server struct {
	Logger   *zap.Logger
	Conf     *config.AtreusSvcConfig //TODO:hot loading
	ConfLock *sync.RWMutex

	EtcdClient          *etcdv3.Client
	InnerHandlers       []grpc.UnaryServerInterceptor  // unary interceptor slice
	InnerStreamHandlers []grpc.StreamServerInterceptor // stream interceptor slice
	ServiceReg          dis.ServiceRegisterWrapper
	//auth
	Auther *auth.Authenticator // generic authentication interface
	//limiter
	Limiters *XRateLimiter
	//context
	Ctx context.Context
	//acl
	CallerIp []string
	//max retry limit
	MaxRetry int
	//Log sampling
	Sampling float64
	Proba    *xmath.XProbability

	//wrapper Server
	RpcServer *grpc.Server // the underlying native Server
	// contains filtered or unexported fields
}

Server is the core gRPC server struct (wrapper).

func NewServer

func NewServer(conf *config.AtreusSvcConfig, opt ...grpc.ServerOption) *Server

func (*Server) Authorize

func (s *Server) Authorize() grpc.UnaryServerInterceptor

Authorize: unary interceptor for server-side authentication.

func (*Server) BuildUnaryInterceptorChain

func (s *Server) BuildUnaryInterceptorChain(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor

Example usage: opt = append(opt, grpc.UnaryInterceptor(s.BuildUnaryInterceptorChain(Interceptor1, Interceptor2, Interceptor3, Interceptor4)))
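Chaining means folding several interceptors into one so that the first runs outermost and the real handler runs innermost. The sketch below shows the folding with simplified local types: `handler` and `interceptor` are stand-ins for grpc.UnaryHandler and grpc.UnaryServerInterceptor with the *grpc.UnaryServerInfo argument left out, and `chain` and `tag` are hypothetical helpers, not the package's code.

```go
package main

import (
	"context"
	"fmt"
)

// handler and interceptor are simplified stand-ins for grpc.UnaryHandler
// and grpc.UnaryServerInterceptor (assumptions for this sketch).
type handler func(ctx context.Context, req any) (any, error)
type interceptor func(ctx context.Context, req any, next handler) (any, error)

// chain folds interceptors into one. interceptors[0] runs first (outermost)
// and the final handler runs last (innermost).
func chain(interceptors ...interceptor) interceptor {
	return func(ctx context.Context, req any, final handler) (any, error) {
		next := final
		for i := len(interceptors) - 1; i >= 0; i-- {
			ic, inner := interceptors[i], next
			next = func(ctx context.Context, req any) (any, error) {
				return ic(ctx, req, inner)
			}
		}
		return next(ctx, req)
	}
}

// tag records when an interceptor is entered and left.
func tag(name string, trace *[]string) interceptor {
	return func(ctx context.Context, req any, next handler) (any, error) {
		*trace = append(*trace, name+":in")
		resp, err := next(ctx, req)
		*trace = append(*trace, name+":out")
		return resp, err
	}
}

// demo runs a two-interceptor chain and returns the reply and call order.
func demo() (any, []string) {
	var trace []string
	c := chain(tag("a", &trace), tag("b", &trace))
	resp, _ := c(context.Background(), "ping", func(ctx context.Context, req any) (any, error) {
		trace = append(trace, "handler")
		return "pong", nil
	})
	return resp, trace
}

func main() {
	resp, trace := demo()
	fmt.Println(resp, trace) // pong [a:in b:in handler b:out a:out]
}
```

Because the first interceptor wraps everything after it, a recovery interceptor placed at position 0 can catch panics raised by any later interceptor as well as by the handler.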

func (*Server) BuildUnaryInterceptorChain2

func (s *Server) BuildUnaryInterceptorChain2(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)

func (*Server) ExitWithSignalHandler

func (s *Server) ExitWithSignalHandler()

func (*Server) GetServer

func (s *Server) GetServer() *grpc.Server

GetServer returns the gRPC server for registering services.

func (*Server) Limit

func (s *Server) Limit(limiter Limiter) grpc.UnaryServerInterceptor

Limit returns a new unary server interceptor that performs request rate limiting.

func (*Server) LimitStream

func (s *Server) LimitStream(limiter Limiter) grpc.StreamServerInterceptor

LimitStream returns a new stream server interceptor that performs rate limiting on the request.

func (*Server) Metrics2Prometheus

func (s *Server) Metrics2Prometheus() grpc.UnaryServerInterceptor

func (*Server) OpenTracingForServer

func (s *Server) OpenTracingForServer() grpc.UnaryServerInterceptor

Must implement the public `metadata.TextMapReader` interface: https://pkg.go.dev/github.com/opentracing/opentracing-go#TextMapReader

func (*Server) Recovery

func (s *Server) Recovery() grpc.UnaryServerInterceptor

NICE: use recovery as a Server interceptor; it prints the stack trace of the crash. Call it like this when initializing the Server: s.Use(s.Recovery(), ...)
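A recovery interceptor typically combines defer, recover and a stack dump. The following sketch shows that combination with simplified local types. `handler`, `withRecovery` and `errServer` are hypothetical; truncating the stack to 2048 bytes mirrors the MAX_STACK_SIZE constant listed above, but whether the package uses the constant this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"runtime/debug"
)

// maxStackSize mirrors MAX_STACK_SIZE; using it as a log cap is an assumption.
const maxStackSize = 2048

// errServer is a hypothetical stand-in for the package's server error.
var errServer = errors.New("internal server error")

// handler is a simplified stand-in for a unary RPC handler.
type handler func(req string) (string, error)

// withRecovery converts a panic in next into errServer and logs the
// (truncated) stack through logf.
func withRecovery(next handler, logf func(format string, args ...any)) handler {
	return func(req string) (resp string, err error) {
		defer func() {
			if r := recover(); r != nil {
				stack := debug.Stack()
				if len(stack) > maxStackSize {
					stack = stack[:maxStackSize]
				}
				logf("panic recovered: %v\n%s", r, stack)
				resp, err = "", errServer
			}
		}()
		return next(req)
	}
}

// demo runs a panicking handler and returns how many log lines were
// written together with the error seen by the caller.
func demo() (int, error) {
	logged := 0
	h := withRecovery(
		func(string) (string, error) { panic("boom") },
		func(string, ...any) { logged++ },
	)
	_, err := h("ping")
	return logged, err
}

func main() {
	logged, err := demo()
	fmt.Println(logged, err) // 1 internal server error
}
```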

func (*Server) ReloadConfig

func (s *Server) ReloadConfig() error

func (*Server) RetryChecking

func (s *Server) RetryChecking() grpc.UnaryServerInterceptor

Server-side retry check: verifies that the retry count carried in ctx is within the server's limit.

func (*Server) Run

func (s *Server) Run() error

Run starts the service.

func (*Server) Serve

func (s *Server) Serve(lis net.Listener) error

func (*Server) ServerDealTimeout

func (s *Server) ServerDealTimeout(timeout time.Duration) grpc.UnaryServerInterceptor

Server-side call timeout handling.

func (*Server) ServerStat

func (s *Server) ServerStat() grpc.UnaryServerInterceptor

ServerStat exposes CPU data through an interceptor: the data is collected on every RPC call and returned to the client.

INFO 05/18-06:44:36.358 grpc-access-log ret=0 path=/testproto.Greeter/SayHello ts=0.000648521 args=name:"tom" age:23  ip=127.0.0.1:8081
get reply: {hello tom from 127.0.0.1:8081 false} map[cpu_usage:[36] serverinfo:[enjoy]]

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown exits gracefully.

func (*Server) SrcIpFilter

func (s *Server) SrcIpFilter() grpc.UnaryServerInterceptor

func (*Server) SrvValidator

func (s *Server) SrvValidator() grpc.UnaryServerInterceptor

func (*Server) Timing

func (s *Server) Timing() grpc.UnaryServerInterceptor

Server-side interceptor that measures call latency.

func (*Server) TransError

func (s *Server) TransError() grpc.UnaryServerInterceptor

Unified server-side error handling.

func (*Server) Use

func (s *Server) Use(handlers ...grpc.UnaryServerInterceptor) *Server

Use attaches global unary interceptors to the server.

func (*Server) UseStreamInterceptor

func (s *Server) UseStreamInterceptor(handlers ...grpc.UnaryServerInterceptor) *Server

TODO: add a stream interceptor chain

func (*Server) XRequestId

func (s *Server) XRequestId() grpc.UnaryServerInterceptor

type WMetadata

type WMetadata struct {
	metadata.MD
}

func (*WMetadata) Add

func (m *WMetadata) Add(key string, value string)

func (WMetadata) Copy

func (m WMetadata) Copy() *WMetadata

Copy returns a copy of m.

func (*WMetadata) Del

func (m *WMetadata) Del(key string)

func (*WMetadata) FromIncoming

func (m *WMetadata) FromIncoming(ctx context.Context) bool

FromIncoming reads the metadata from the server-side ctx.

func (*WMetadata) FromOutgoing

func (m *WMetadata) FromOutgoing(ctx context.Context) bool

FromOutgoing reads the metadata from the client-side ctx.

func (*WMetadata) Get

func (m *WMetadata) Get(key string) string

func (*WMetadata) GetArr

func (m *WMetadata) GetArr(key string) []string

func (*WMetadata) Set

func (m *WMetadata) Set(key string, value string)

func (*WMetadata) ToIncoming

func (m *WMetadata) ToIncoming(ctx context.Context) context.Context

ToIncoming injects the data on the server side.

func (*WMetadata) ToOutgoing

func (m *WMetadata) ToOutgoing(ctx context.Context) context.Context

ToOutgoing injects the data on the client side.
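To show how the map-style methods of WMetadata could behave, here is a standard-library model of the same method set. It is a sketch under assumptions: `mdMap` imitates the shape of metadata.MD, keys are lower-cased as gRPC metadata does, and Get returning the first value (or an empty string) is a guess, since the page does not document it.

```go
package main

import (
	"fmt"
	"strings"
)

// mdMap imitates the shape of grpc's metadata.MD: key -> list of values.
type mdMap map[string][]string

// wrapper is a hypothetical stand-in for WMetadata.
type wrapper struct {
	mdMap
}

func newWrapper() *wrapper { return &wrapper{mdMap: mdMap{}} }

// key normalizes a metadata key to lower case.
func key(k string) string { return strings.ToLower(k) }

// Set replaces all values of k with v.
func (w *wrapper) Set(k, v string) { w.mdMap[key(k)] = []string{v} }

// Add appends v to the values of k.
func (w *wrapper) Add(k, v string) { w.mdMap[key(k)] = append(w.mdMap[key(k)], v) }

// Del removes k and all of its values.
func (w *wrapper) Del(k string) { delete(w.mdMap, key(k)) }

// GetArr returns every value stored under k.
func (w *wrapper) GetArr(k string) []string { return w.mdMap[key(k)] }

// Get returns the first value of k, or "" when k is absent (an assumption).
func (w *wrapper) Get(k string) string {
	if vs := w.GetArr(k); len(vs) > 0 {
		return vs[0]
	}
	return ""
}

// Copy returns a deep copy, so later changes do not leak between the two.
func (w *wrapper) Copy() *wrapper {
	out := newWrapper()
	for k, vs := range w.mdMap {
		out.mdMap[k] = append([]string(nil), vs...)
	}
	return out
}

func main() {
	m := newWrapper()
	m.Set("Trace-ID", "abc")
	m.Add("trace-id", "def")
	c := m.Copy()
	m.Del("trace-id")
	fmt.Println(m.Get("trace-id") == "", c.Get("TRACE-ID"), c.GetArr("trace-id"))
}
```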

type XRateLimiter

type XRateLimiter struct {
	RateStore  map[string]*rate.Limiter // rate limiting per RPC method
	LogTime    int64
	Rate       rate.Limit
	BucketSize int
}

XRateLimiter provides a basic rate limiter built on xrate.

func NewXRateLimiter

func NewXRateLimiter(rates rate.Limit, size int) *XRateLimiter

func (*XRateLimiter) Allow

func (x *XRateLimiter) Allow(method string) bool

Allow returns true when the request is rate-limited and dropped, and false when the request is let through.
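The sketch below models per-method limiting with one token bucket per RPC method. It uses a small hand-written bucket instead of golang.org/x/time/rate so that it builds with the standard library alone, and it takes an injectable clock to keep the example deterministic. Note that it follows the return convention documented above, where true means the request is limited. `methodLimiter` and its fields are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// bucket is the token state of one RPC method.
type bucket struct {
	tokens float64
	last   time.Time
}

// methodLimiter keeps one token bucket per RPC method (hypothetical type).
type methodLimiter struct {
	mu      sync.Mutex
	rate    float64 // tokens added per second
	burst   float64 // bucket capacity
	now     func() time.Time
	buckets map[string]*bucket
}

func newMethodLimiter(rate float64, burst int, now func() time.Time) *methodLimiter {
	return &methodLimiter{rate: rate, burst: float64(burst), now: now, buckets: map[string]*bucket{}}
}

// Allow follows the convention documented above: it returns true when the
// request must be dropped and false when it may proceed.
func (l *methodLimiter) Allow(method string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	t := l.now()
	b, ok := l.buckets[method]
	if !ok {
		b = &bucket{tokens: l.burst, last: t}
		l.buckets[method] = b
	}
	// Refill according to the elapsed time, capped at the bucket size.
	b.tokens += t.Sub(b.last).Seconds() * l.rate
	if b.tokens > l.burst {
		b.tokens = l.burst
	}
	b.last = t

	if b.tokens < 1 {
		return true // limited: no token left
	}
	b.tokens--
	return false
}

// demo drives the limiter with a fake clock: rate 1 per second, burst 2.
func demo() []bool {
	clock := time.Unix(0, 0)
	l := newMethodLimiter(1, 2, func() time.Time { return clock })
	out := []bool{l.Allow("/A"), l.Allow("/A"), l.Allow("/A"), l.Allow("/B")}
	clock = clock.Add(time.Second) // one token is refilled for /A
	return append(out, l.Allow("/A"))
}

func main() {
	fmt.Println(demo()) // [false false true false false]
}
```

Callers that expect the more common convention, where Allow returning true means "go ahead", would need to invert the result.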
