grpc

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2023 License: MPL-2.0 Imports: 30 Imported by: 1

README

RPC简介

技术部的RPC框架,融合技术部的核心科技,带来如飞一般的体验。

相关配置信息说明

server配置项说明

// 服务器配置信息
type ServerConfig struct {
    Network           string            `json:"network"`           // 网络为rpc监听网络,默认值为tcp
    Addr              string            `json:"address"`           // 地址是rpc监听地址,默认值为0.0.0.0:9000
    Timeout           utils.Duration    `json:"timeout"`           // 超时是每个rpc调用的上下文超时。
    IdleTimeout       utils.Duration    `json:"idleTimeout"`       // IdleTimeout是一段持续时间,在这段时间内可以通过发送GoAway关闭空闲连接。 空闲持续时间是自最近一次未完成RPC的数量变为零或建立连接以来定义的。
    MaxLifeTime       utils.Duration    `json:"maxLife"`           // MaxLifeTime是连接通过发送GoAway关闭之前可能存在的最长时间的持续时间。 将向+/- 10%的随机抖动添加到MaxConnectionAge中以分散连接风暴.
    ForceCloseWait    utils.Duration    `json:"closeWait"`         // ForceCloseWait是MaxLifeTime之后的附加时间,在此之后将强制关闭连接。
    KeepAliveInterval utils.Duration    `json:"keepaliveInterval"` // 如果服务器没有看到任何活动,则KeepAliveInterval将在此时间段之后,对客户端进行ping操作以查看传输是否仍然有效。
    KeepAliveTimeout  utils.Duration    `json:"keepaliveTimeout"`  // 进行keepalive检查ping之后,服务器将等待一段时间的超时,并且即使在关闭连接后也看不到活动。
    RateLimit         *ratelimit.Config `json:"limit"`             // 限流
    EnableLog         bool              `json:"enableLog"`         // 是否打开日记
}

对应zk中的信息:

服务短基础信息地址为: /system/base/app/9999
{
  "network":"tcp",
  "address":"127.0.0.1:9090",
  "timeout":"2s",
  "idleTimeout":"2s",
  "maxLife":"2s",
  "closeWait":"2s",
  "keepaliveInterval":"2s",
  "keepaliveTimeout":"2s",
  "enableLog":true
}

注意:9999为具体app中的systemId

Client 配置项说明

// ClientConfig是rpc客户端配置.
type ClientConfig struct {
    Dial                utils.Duration           `json:"dial"`
    Timeout             utils.Duration           `json:"timeout"`
    Method              map[string]*ClientConfig `json:"method"`
    NonBlock            bool                     `json:"nonBlock"`
    KeepAliveInterval   utils.Duration           `json:"keepAliveInterval"`
    KeepAliveTimeout    utils.Duration           `json:"keepAliveTimeout"`
    PermitWithoutStream bool                     `json:"permitWithoutStream"`
    EnableLog           bool                     `json:"enableLog"`
}

对应zk中的配置信息:

客户端基础信息地址为: /system/base/rpc/9999
{
  "dial":"10s",
  "timeout":"10s",
  "nonBlock":false,
  "keepAliveInterval":"10s",
  "keepAliveTimeout":"10s",
  "keepAliveWithoutStream":true,
  "enableLog":true
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func FromError

func FromError(svrErr error) (gst *status.Status)

转换服务回复错误并尝试将其转换为grpc.Status.

func NewConn

func NewConn(target string, conf *ClientConfig, caller []string, opt ...grpc.DialOption) (*grpc.ClientConn, error)

NewConn 创建rpc连接.

func ToMetaCode

func ToMetaCode(gst *status.Status) metacode.Codes

将grpc.status转换为metacode.Codes

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client 客户端是框架的客户端实例,它包含ctx,opt和拦截器。 使用NewClient()创建Client的实例.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aluka-7/configuration"
	rpc "github.com/aluka-7/grpc"
	"github.com/aluka-7/grpc/testproto"
	"github.com/rs/zerolog/log"
	"google.golang.org/grpc"
)

const systemId = "10000"

func main() {
	conf := configuration.DefaultEngine()
	conn, _ := rpc.Engine(systemId, conf).ClientConn("1000", func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption) (ret error) {
		_ctx, cancel := context.WithTimeout(ctx, time.Second*5)
		defer cancel()
		ret = invoker(_ctx, method, req, reply, cc, opts...)
		return
	})
	defer conn.Close()

	c := testproto.NewGreeterClient(conn)
	name := "2233"
	rp, err := c.SayHello(context.Background(), &testproto.HelloRequest{Name: name, Age: 18})
	if err != nil {
		log.Err(err).Msg("could not greet")
		return
	}
	fmt.Println("rp", *rp)
}
Output:

func NewClient

func NewClient(conf *ClientConfig, opt ...grpc.DialOption) *Client

NewClient 返回带有默认客户端拦截器的新的空白Client实例. opt可用于添加rpc拨号选项.

func (*Client) Dial

func (c *Client) Dial(ctx context.Context, target string, caller []string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)

func (*Client) DialTLS

func (c *Client) DialTLS(ctx context.Context, target string, file string, name string, caller []string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)

DialTLS 通过tls传输创建到给定目标的客户端连接.

func (*Client) SetConfig

func (c *Client) SetConfig(conf *ClientConfig) (err error)

SetConfig 热重载客户端配置

func (*Client) Use

func (c *Client) Use(handlers ...grpc.UnaryClientInterceptor) *Client

Use Use将全局拦截器附加到客户端。 例如:这是断路器或错误管理拦截器的正确位置。

func (*Client) UseOpt

func (c *Client) UseOpt(opts ...grpc.DialOption) *Client

UseOpt 将全局 rpc DialOption 附加到客户端.

type ClientConfig

type ClientConfig struct {
	Dial                utils.Duration           `json:"dial"`
	Timeout             utils.Duration           `json:"timeout"`
	Method              map[string]*ClientConfig `json:"method"`
	NonBlock            bool                     `json:"nonBlock"`
	KeepAliveInterval   utils.Duration           `json:"keepAliveInterval"`
	KeepAliveTimeout    utils.Duration           `json:"keepAliveTimeout"`
	PermitWithoutStream bool                     `json:"permitWithoutStream"`
	EnableLog           bool                     `json:"enableLog"`
}

ClientConfig rpc客户端配置.

type RpcClientConfig

type RpcClientConfig struct {
	*ClientConfig
	Target string `json:"target"`
}

type RpcEngine

type RpcEngine interface {
	Server(monitor bool, app, group, path string, handlers ...grpc.UnaryServerInterceptor) (*Server, *RpcServerConfig)
	ClientConn(systemId string, handlers ...grpc.UnaryClientInterceptor) (conn *grpc.ClientConn, cc *RpcClientConfig)
}

func Engine

func Engine(systemId string, cfg configuration.Configuration) RpcEngine

type RpcServerConfig

type RpcServerConfig struct {
	*ServerConfig
	Tag []trace.Tag `json:"tag"`
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server 是框架的服务器端实例,它包含RpcServer,拦截器和拦截器。 通过使用NewServer()创建Server的实例。

Example
package main

import (
	"context"
	"io"
	"time"

	"github.com/aluka-7/configuration"
	rpc "github.com/aluka-7/grpc"
	"github.com/aluka-7/grpc/testproto"
	"google.golang.org/grpc"
)

type helloServer struct {
}

func (s *helloServer) SayHello(ctx context.Context, in *testproto.HelloRequest) (*testproto.HelloReply, error) {
	return &testproto.HelloReply{Message: "Hello " + in.Name, Success: true}, nil
}

func (s *helloServer) StreamHello(ss testproto.Greeter_StreamHelloServer) error {
	for i := 0; i < 3; i++ {
		in, err := ss.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		ret := &testproto.HelloReply{Message: "Hello " + in.Name, Success: true}
		err = ss.Send(ret)
		if err != nil {
			return err
		}
	}
	return nil

}

const systemId = "10000"

func main() {
	conf := configuration.DefaultEngine()
	s, _ := rpc.Engine(systemId, conf).Server("base", "app", systemId)
	// apply server interceptor middleware
	s.Use(func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		_ctx, cancel := context.WithTimeout(ctx, time.Second*10)
		defer cancel()
		resp, err := handler(_ctx, req)
		return resp, err
	})
	testproto.RegisterGreeterServer(s.Server(), &helloServer{})
	s.Start()
}
Output:

func NewServer

func NewServer(conf *ServerConfig, opt ...grpc.ServerOption) (s *Server)

NewServer 带有默认服务器拦截器的新的空白Server实例。

func (*Server) RegisterValidation

func (s *Server) RegisterValidation(key string, fn validator.Func) error

RegisterValidation 将验证功能添加到由键表示的验证者的验证者映射中 注意:如果密钥已经存在,则先前的验证功能将被替换。 注意:此方法不是线程安全的,因此应在进行任何验证之前先将它们全部注册

func (*Server) Run

func (s *Server) Run(addr string) error

Run 运行create tcp侦听器,并启动goroutine为每个传入请求提供服务。 除非调用Stop或GracefulStop,否则Run将返回非nil错误。

func (*Server) RunUnix

func (s *Server) RunUnix(file string) error

RunUnix 创建一个unix侦听器并启动goroutine来处理每个传入的请求. 除非调用Stop或GracefulStop,否则RunUnix将返回非nil错误.

func (*Server) Serve

func (s *Server) Serve(lis net.Listener) error

Serve在侦听器lis上接受传入连接,从而为每个连接创建一个新的ServerTransport和服务goroutine。 除非调用Stop或GracefulStop,否则Serve将返回非nil错误.

func (*Server) Server

func (s *Server) Server() *grpc.Server

Server 返回用于注册服务的rpc服务器.

func (*Server) SetConfig

func (s *Server) SetConfig(conf *ServerConfig) (err error)

SetConfig 热重载服务器配置

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) (err error)

Shutdown可以正常停止服务器。 它停止服务器接受新的连接和RPC,并阻止直到所有未完成的RPC完成或到达上下文截止日期为止.

func (*Server) Start

func (s *Server) Start() (*Server, error)

Start 开始使用配置的listen addr创建一个新的goroutine运行服务器,如果发生任何错误,它将惊慌返回服务器本身.

func (*Server) StartWithAddr

func (s *Server) StartWithAddr() (*Server, net.Addr, error)

StartWithAddr 使用配置的监听地址创建一个新的goroutine运行服务器,如果发生任何错误,它将崩溃 返回服务器本身和实际的监听地址(如果配置的监听端口为零,则操作系统将分配一个未使用的端口)

func (*Server) Use

func (s *Server) Use(handlers ...grpc.UnaryServerInterceptor) *Server

Use 将全局拦截器附加到服务器. 例如:这是速率限制器或错误管理拦截器的正确位置.

type ServerConfig

type ServerConfig struct {
	Network           string         `json:"network"`           // 网络为rpc监听网络,默认值为 tcp
	Addr              string         `json:"address"`           // 地址是rpc监听地址,默认值为 0.0.0.0:9000
	Timeout           utils.Duration `json:"timeout"`           // 超时是每个rpc调用的上下文超时。
	IdleTimeout       utils.Duration `json:"idleTimeout"`       // IdleTimeout 是一段持续时间,在这段时间内可以通过发送 GoAway 关闭空闲连接。 空闲持续时间是自最近一次未完成RPC的数量变为零或建立连接以来定义的。
	MaxLifeTime       utils.Duration `json:"maxLife"`           // MaxLifeTime 是连接通过发送GoAway关闭之前可能存在的最长时间的持续时间。 将向+/- 10%的随机抖动添加到MaxConnectionAge中以分散连接风暴.
	ForceCloseWait    utils.Duration `json:"closeWait"`         // ForceCloseWait 是 MaxLifeTime 之后的附加时间,在此之后将强制关闭连接。
	KeepAliveInterval utils.Duration `json:"keepaliveInterval"` // 如果服务器没有看到任何活动,则 KeepAliveInterval 将在此时间段之后,对客户端进行ping操作以查看传输是否仍然有效。
	KeepAliveTimeout  utils.Duration `json:"keepaliveTimeout"`  // 进行 keepalive 检查 ping 之后,服务器将等待一段时间的超时,并且即使在关闭连接后也看不到活动。
	EnableLog         bool           `json:"enableLog"`         // 是否打开日志
}

ServerConfig 服务器配置信息

type TimeoutCallOption

type TimeoutCallOption struct {
	*grpc.EmptyCallOption
	Timeout time.Duration
}

TimeoutCallOption 超时选项.

func WithTimeoutCallOption

func WithTimeoutCallOption(timeout time.Duration) *TimeoutCallOption

WithTimeoutCallOption 可以覆盖ctx中的超时和配置文件中的超时

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL