natsrpc

package module
v0.0.0-...-73261c1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2026 License: MIT Imports: 12 Imported by: 0

README

nats_rpc

nats_rpc 是一个独立的 Go 库,负责基于 NATS 实现:

  • RPC request/reply
  • notify publish/subscribe
  • 基于 protobuf 的统一信封协议
  • 可注册的消息号与 protobuf 类型映射
  • 可选的 typed router
  • 基于 NATS KV 的服务注册与服务发现(最终一致)

核心库不依赖 GoFrame,也不依赖当前仓库的私有 pbmsg 协议。

目录

  • proto/natsrpc/v1: 公开 protobuf 协议
  • registry: msg_id <-> proto.Message 注册中心
  • 根包 natsrpc: client/server/router/subscription/options
  • adapters/goframe: GoFrame 适配层

核心思路

业务协议仍然使用 protobuf。

NATS 上传输的统一信封也使用 protobuf:

  • msg_id: 业务消息号
  • rpc_id: 请求流水号
  • body: protobuf 序列化后的业务消息体
  • code/message: 远端错误信息
  • trace_id: 链路追踪标识
  • headers: 扩展头

快速示例

reg := registry.New()
client := natsrpc.NewClient(nc, natsrpc.WithRegistry(reg))
server := natsrpc.NewServer(nc, natsrpc.WithRegistry(reg))
router := natsrpc.NewRouter(reg)

说明

当前版本先完成独立库抽取,主项目原有调用点尚未整体切换到这个新库。

服务发现(NATS KV)

新增 Discovery 能力:

  • 服务端注册实例(service + instance_id + subject)
  • 心跳续租与下线注销
  • 客户端按 service 发现并发送(支持轮询挑选实例)
  • 可直接按 instance_id 定向发送

默认鉴权器为 AllowAllAuthenticator(不鉴权),也支持注入自定义 ServiceAuthenticator。

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrNoServiceInstances = errors.New("no service instances available")

Functions

This section is empty.

Types

type AllowAllAuthenticator

type AllowAllAuthenticator struct{}

func (AllowAllAuthenticator) AuthorizeRegister

func (AllowAllAuthenticator) AuthorizeRegister(context.Context, ServiceInstance) error

func (AllowAllAuthenticator) AuthorizeResolve

func (AllowAllAuthenticator) AuthorizeResolve(context.Context, string) error

type AsyncErrorHandler

type AsyncErrorHandler func(ctx context.Context, err error)

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(conn *nats.Conn, opts ...Option) *Client

func (*Client) Call

func (c *Client) Call(ctx context.Context, subject string, req proto.Message, rsp proto.Message) error

func (*Client) CallService

func (c *Client) CallService(ctx context.Context, service string, req proto.Message, rsp proto.Message) error

func (*Client) Notify

func (c *Client) Notify(ctx context.Context, subject string, msgID uint32, message proto.Message) error

func (*Client) NotifyInstance

func (c *Client) NotifyInstance(ctx context.Context, service, instanceID string, msgID uint32, message proto.Message) error

func (*Client) NotifyMessage

func (c *Client) NotifyMessage(ctx context.Context, subject string, message proto.Message) error

func (*Client) NotifyService

func (c *Client) NotifyService(ctx context.Context, service string, msgID uint32, message proto.Message) error

func (*Client) NotifyServiceMessage

func (c *Client) NotifyServiceMessage(ctx context.Context, service string, message proto.Message) error

func (*Client) PublishEnvelope

func (c *Client) PublishEnvelope(ctx context.Context, subject string, env *natsrpcv1.Envelope) error

func (*Client) Request

func (c *Client) Request(ctx context.Context, subject string, msgID uint32, req proto.Message, rsp proto.Message) error

func (*Client) RequestEnvelope

func (c *Client) RequestEnvelope(ctx context.Context, subject string, env *natsrpcv1.Envelope) (*natsrpcv1.Envelope, error)

func (*Client) RequestInstance

func (c *Client) RequestInstance(ctx context.Context, service, instanceID string, msgID uint32, req proto.Message, rsp proto.Message) error

func (*Client) RequestService

func (c *Client) RequestService(ctx context.Context, service string, msgID uint32, req proto.Message, rsp proto.Message) error

type Discovery

type Discovery struct {
	// contains filtered or unexported fields
}

func NewDiscovery

func NewDiscovery(conn *nats.Conn, opts ...DiscoveryOption) (*Discovery, error)

func (*Discovery) Deregister

func (d *Discovery) Deregister(ctx context.Context, service, instanceID string) error

func (*Discovery) FindInstance

func (d *Discovery) FindInstance(ctx context.Context, service, instanceID string) (ServiceInstance, error)

func (*Discovery) Heartbeat

func (d *Discovery) Heartbeat(ctx context.Context, service, instanceID string, ttl time.Duration) error

func (*Discovery) List

func (d *Discovery) List(ctx context.Context, service string) ([]ServiceInstance, error)

func (*Discovery) Pick

func (d *Discovery) Pick(ctx context.Context, service string) (ServiceInstance, error)

func (*Discovery) Register

func (d *Discovery) Register(ctx context.Context, instance ServiceInstance, ttl time.Duration) error

type DiscoveryOption

type DiscoveryOption func(*discoveryConfig)

func WithDiscoveryBucket

func WithDiscoveryBucket(bucket string) DiscoveryOption

func WithServiceAuthenticator

func WithServiceAuthenticator(auth ServiceAuthenticator) DiscoveryOption

type ErrorEncoder

type ErrorEncoder func(err error) (code int32, message string)

type NotifyHandler

type NotifyHandler func(ctx context.Context, env *natsrpcv1.Envelope) error

type NotifyMessageHandler

type NotifyMessageHandler func(ctx context.Context, message proto.Message) error

type Option

type Option func(*config)

func WithAsyncErrorHandler

func WithAsyncErrorHandler(handler AsyncErrorHandler) Option

func WithDiscovery

func WithDiscovery(discovery *Discovery) Option

func WithErrorEncoder

func WithErrorEncoder(encoder ErrorEncoder) Option

func WithRegistry

func WithRegistry(reg *registry.Registry) Option

func WithTimeout

func WithTimeout(timeout time.Duration) Option

func WithTracePropagator

func WithTracePropagator(trace TracePropagator) Option

type RPCHandler

type RPCHandler func(ctx context.Context, env *natsrpcv1.Envelope) (*natsrpcv1.Envelope, error)

type RPCMessageHandler

type RPCMessageHandler func(ctx context.Context, request proto.Message) (proto.Message, error)

type RemoteError

type RemoteError struct {
	Code    int32
	Message string
}

func (*RemoteError) Error

func (e *RemoteError) Error() string

type Router

type Router struct {
	// contains filtered or unexported fields
}
Example (NotifyOneWay)
reg := registry.New()
r := NewRouter(reg)

received := ""
_ = r.RegisterNotify(10, &wrapperspb.StringValue{}, func(_ context.Context, message proto.Message) error {
	received = message.(*wrapperspb.StringValue).Value
	return nil
})

notify := &natsrpcv1.Envelope{MsgId: 10, Body: mustMarshalForExample(&wrapperspb.StringValue{Value: "hello-notify"})}
_ = r.HandleNotify(context.Background(), notify)

fmt.Println(received)
Output:
hello-notify
Example (RpcRequestReply)
reg := registry.New()
r := NewRouter(reg)

_ = r.RegisterRPC(1, &wrapperspb.StringValue{}, 2, &wrapperspb.Int32Value{}, func(_ context.Context, request proto.Message) (proto.Message, error) {
	msg := request.(*wrapperspb.StringValue)
	return &wrapperspb.Int32Value{Value: int32(len(msg.Value))}, nil
})

req := &natsrpcv1.Envelope{MsgId: 1, RpcId: 101, Body: mustMarshalForExample(&wrapperspb.StringValue{Value: "abcd"})}
resp, _ := r.HandleRPC(context.Background(), req)

out := &wrapperspb.Int32Value{}
_ = proto.Unmarshal(resp.Body, out)
fmt.Println(out.Value)
Output:
4

func NewRouter

func NewRouter(reg *registry.Registry) *Router

func (*Router) HandleNotify

func (r *Router) HandleNotify(ctx context.Context, env *natsrpcv1.Envelope) error

func (*Router) HandleRPC

func (r *Router) HandleRPC(ctx context.Context, env *natsrpcv1.Envelope) (*natsrpcv1.Envelope, error)

func (*Router) RegisterNotify

func (r *Router) RegisterNotify(msgID uint32, prototype proto.Message, handler NotifyMessageHandler) error

func (*Router) RegisterNotifyMessage

func (r *Router) RegisterNotifyMessage(prototype proto.Message, handler NotifyMessageHandler) error

func (*Router) RegisterRPC

func (r *Router) RegisterRPC(requestID uint32, requestPrototype proto.Message, responseID uint32, responsePrototype proto.Message, handler RPCMessageHandler) error

func (*Router) RegisterRPCMessage

func (r *Router) RegisterRPCMessage(requestPrototype proto.Message, responsePrototype proto.Message, handler RPCMessageHandler) error

func (*Router) Registry

func (r *Router) Registry() *registry.Registry

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(conn *nats.Conn, opts ...Option) *Server

func (*Server) Close

func (s *Server) Close() error

func (*Server) QueueSubscribeNotify

func (s *Server) QueueSubscribeNotify(ctx context.Context, subject, queue string, handler NotifyHandler) (*Subscription, error)

func (*Server) QueueSubscribeRPC

func (s *Server) QueueSubscribeRPC(ctx context.Context, subject, queue string, handler RPCHandler) (*Subscription, error)

func (*Server) RegisterService

func (s *Server) RegisterService(ctx context.Context, discovery *Discovery, instance ServiceInstance, ttl time.Duration, heartbeatInterval time.Duration) (*ServiceRegistration, error)

func (*Server) SubscribeNotify

func (s *Server) SubscribeNotify(ctx context.Context, subject string, handler NotifyHandler) (*Subscription, error)

func (*Server) SubscribeRPC

func (s *Server) SubscribeRPC(ctx context.Context, subject string, handler RPCHandler) (*Subscription, error)

type ServiceAuthenticator

type ServiceAuthenticator interface {
	AuthorizeRegister(ctx context.Context, instance ServiceInstance) error
	AuthorizeResolve(ctx context.Context, service string) error
}

type ServiceInstance

type ServiceInstance struct {
	Service     string            `json:"service"`
	InstanceID  string            `json:"instance_id"`
	Subject     string            `json:"subject"`
	Version     string            `json:"version,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
	HeartbeatAt time.Time         `json:"heartbeat_at"`
	ExpiresAt   time.Time         `json:"expires_at,omitempty"`
}

type ServiceRegistration

type ServiceRegistration struct {
	// contains filtered or unexported fields
}

func (*ServiceRegistration) Stop

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

type TracePropagator

type TracePropagator interface {
	Inject(ctx context.Context, env *natsrpcv1.Envelope)
	Extract(ctx context.Context, env *natsrpcv1.Envelope) (context.Context, error)
}

Directories

Path Synopsis
adapters
goframe module
proto

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL