node

package
v0.0.0-...-7441410 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2026 License: Apache-2.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const TickInterval = 5 * time.Millisecond

Variables

View Source
var (
	AddrLocal   = Addr(0)
	AddrRemote  = Addr(-1)
	AddrInvalid = Addr(-2)
)
View Source
var (
	ErrServiceNotExist      = NewError(ErrServiceNotFound, "service not exist")
	ErrNodeMessageChanFull  = NewError(ErrTransport, "node message channel full")
	ErrRequestTimeoutRemote = NewError(ErrTimeout, "session timeout from remote")
	ErrRequestTimeoutLocal  = NewError(ErrTimeout, "session timeout from local")
	ErrRequestCancelled     = NewError(ErrCancelled, "request cancelled by context")
)
View Source
var Config = &nodeConfig{
	CurNodeMap: map[string]bool{},
}
View Source
var ErrRemoteDisconnected = NewError(ErrTransport, "remote disconnected")

Functions

func AddNode

func AddNode(b host.IBuilder, registerFactory func() *RegisterOption)

func CheckedServiceNilPtr

func CheckedServiceNilPtr[T any, U consService[T]]() any

func GetRegisteredService

func GetRegisteredService() []string

GetRegisteredService 获取所有自动注册的服务名称列表。 可直接用于 ElementOption.Services。

func IsCode

func IsCode(err error, code ErrorCode) bool

func NewError

func NewError(code ErrorCode, msg string) error

func NewService

func NewService(name string) (int32, error)

func NodeTracer

func NodeTracer() trace.Tracer

NodeTracer 返回节点包的 OTel Tracer。 调用方需提前通过 otel.SetTracerProvider() 配置全局 TracerProvider。

func Register

func Register[T any, U consService[T]](name string, setup ...func(host.IBuilder))

Register 自动注册服务,在服务包的 init() 中调用。

Kind 自动递增分配,Name 为服务名。 可选 setup 回调在 RegisterService 时执行,用于绑定配置等构建期操作。

用法:

func init() {
    node.Register[MyService, *MyService]("MyService")
}

带配置绑定:

func init() {
    node.Register[MyService, *MyService]("MyService", func(b host.IBuilder) {
        host.AddOption[MyConfig](b, "MyService")
    })
}

func RegisterService

func RegisterService(b host.IBuilder, opts ...func(*RegisterOption))

RegisterService 自动注册所有服务到节点。 替代手动 AddNode + 逐个 CheckedServiceRegisterInfoName 的方式。 可选 opts 用于设置 RegisterOption 的其他字段(如 Preprocessor 等)。

func StartService

func StartService(sAddr int32, arg any) bool

StartService 快速启动一个服务,保证异步调用到 Service 的 Start,由用户保证完整、正确启动

func StopService

func StopService(sAddr int32) bool

StopService 关闭一个服务,阻塞执行

func WrapError

func WrapError(code ErrorCode, op string, err error) error

Types

type Addr

type Addr int64

func NewNodeAddr

func NewNodeAddr(host string, port int) (Addr, error)

func (Addr) GetIPString

func (ss Addr) GetIPString() string

func (Addr) GetPort

func (ss Addr) GetPort() int

func (Addr) IsLocalhost

func (ss Addr) IsLocalhost() bool

func (Addr) String

func (ss Addr) String() string

type AddrUpdater

type AddrUpdater struct {
	// contains filtered or unexported fields
}

func NewNodeAddrUpdater

func NewNodeAddrUpdater(nAddr Addr, updateFunc func(chan<- Addr)) *AddrUpdater

func (*AddrUpdater) GetNodeAddr

func (ss *AddrUpdater) GetNodeAddr() Addr

func (*AddrUpdater) Start

func (ss *AddrUpdater) Start()

type ElementOption

type ElementOption struct {
	Order    int      `snow:"Order"`    // 节点顺序,节点服务的查找按 Order 值从小到大依次进行
	Host     string   `snow:"Host"`     // 节点主机名,可以是 IP,若当前节点的 Host 为空,则节点 Tcp 监听 LocalIP
	Port     int      `snow:"Port"`     // 节点 Tcp 端口
	HttpPort int      `snow:"HttpPort"` // 节点 Http 端口
	UseHttps bool     `snow:"UseHttps"` // 节点是否为 Https
	Services []string `snow:"Services"` // 节点包含的服务,若为当前节点则代表要启动的服务;服务按照顺序启动,逆序关闭
}

type Error

type Error struct {
	Code  ErrorCode
	Op    string
	Msg   string
	Cause error
}

Error 是统一错误包装,支持 errors.Is/errors.As。

func (*Error) Error

func (e *Error) Error() string

func (*Error) ErrorCode

func (e *Error) ErrorCode() string

ErrorCode 返回稳定错误码,供日志与告警聚合使用。

func (*Error) Is

func (e *Error) Is(target error) bool

func (*Error) Unwrap

func (e *Error) Unwrap() error

type ErrorCode

type ErrorCode string

ErrorCode 用于稳定的错误聚合与告警统计。

const (
	ErrUnknown         ErrorCode = "UNKNOWN"
	ErrTimeout         ErrorCode = "TIMEOUT"
	ErrServiceNotFound ErrorCode = "SERVICE_NOT_FOUND"
	ErrCodec           ErrorCode = "CODEC"
	ErrTransport       ErrorCode = "TRANSPORT"
	ErrCancelled       ErrorCode = "CANCELLED"
	ErrDraining        ErrorCode = "DRAINING"
	ErrInvalidArgument ErrorCode = "INVALID_ARGUMENT"
	ErrInternal        ErrorCode = "INTERNAL"
)

func CodeOf

func CodeOf(err error) ErrorCode

type ICodec

type ICodec interface {
	// Marshal 将对象编码为字节串。
	Marshal(v any) ([]byte, error)
	// Unmarshal 将字节串解码到对象。
	Unmarshal(data []byte, v any) error
	// Name 返回编解码器名称(如 "json"、"msgpack"),用于日志与调试。
	Name() string
}

ICodec RPC 消息体编解码接口,用于 TCP 二进制协议的参数序列化。 HTTP RPC 始终使用 JSON(因 Content-Type 语义绑定),不受此接口影响。

type IMetricCollector

type IMetricCollector interface {
	// Gauge 仪表,设置值
	Gauge(name string, val int64)
	// Counter 计数器,累加值
	Counter(name string, val uint64)
	// Histogram 直方图,累加,但值为浮点数,可为正负
	Histogram(name string, val float64)
}

type INodeAddr

type INodeAddr interface {
	IsLocalhost() bool
	GetIPString() string
	String() string
}

type IPromise

type IPromise interface {
	Then(f any) IPromise
	Catch(f func(error)) IPromise
	Final(f func()) IPromise
	// WithContext 绑定显式 Context,覆盖默认的 Service 生命周期 Context。
	// 用于自定义超时(context.WithTimeout)、上游取消传播、或附加 trace 信息。
	// 未调用时,框架自动使用发起方 Service 的 Context 作为父级。
	WithContext(ctx context.Context) IPromise
	Done()
}

type IProxy

type IProxy interface {
	Call(name string, args ...any) IPromise
	GetNodeAddr() INodeAddr
	Avail() bool
}

type IRpcContext

type IRpcContext interface {
	// Context 返回本次 RPC 调用关联的 context.Context,
	// 可用于派生下游调用的超时/取消,或传递 trace 信息。
	Context() context.Context
	GetRemoteNodeAddr() INodeAddr
	GetRemoteServiceAddr() int32
	Catch(f func(error)) IRpcContext
	Return(args ...any)
	Error(error)
}

type IServiceDiscovery

type IServiceDiscovery interface {
	// Resolve 根据服务名解析目标节点地址。
	// 返回 AddrInvalid 或 error 时框架回退到静态表查找。
	Resolve(serviceName string) (INodeAddr, error)
	// Deregister 停机时注销自身(由 Node.Stop 调用)。传入当前节点地址和服务列表。
	Deregister(nodeAddr INodeAddr, services []string)
}

IServiceDiscovery 服务发现接口,用于运行时动态解析服务名到节点地址。 未配置时框架回退到静态配置表(Config.Nodes),二者可共存。

type ITimeWheelHandle

type ITimeWheelHandle interface {
	Stop()
}

type JsonCodec

type JsonCodec struct{}

JsonCodec 基于 xjson(json-iterator)的默认编解码器。

func (JsonCodec) Marshal

func (JsonCodec) Marshal(v any) ([]byte, error)

func (JsonCodec) Name

func (JsonCodec) Name() string

func (JsonCodec) Unmarshal

func (JsonCodec) Unmarshal(data []byte, v any) error

type MsgpackCodec

type MsgpackCodec struct{}

MsgpackCodec 基于 vmihailenco/msgpack/v5 的二进制编解码器。 相比 JSON,序列化体积更小、编解码速度更快。 使用时通过 RegisterOption.Codec 注入。

func (MsgpackCodec) Marshal

func (MsgpackCodec) Marshal(v any) ([]byte, error)

func (MsgpackCodec) Name

func (MsgpackCodec) Name() string

func (MsgpackCodec) Unmarshal

func (MsgpackCodec) Unmarshal(data []byte, v any) error

type Node

type Node struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Node) Construct

func (ss *Node) Construct(host host.IHost, logger *logging.Logger[Node], nodeOpt *option.Option[*Option], registerOpt *option.Option[*RegisterOption])

func (*Node) IsDraining

func (ss *Node) IsDraining() bool

func (*Node) Start

func (ss *Node) Start(ctx context.Context, wg *xsync.TimeoutWaitGroup)

func (*Node) Stop

func (ss *Node) Stop(ctx context.Context, wg *xsync.TimeoutWaitGroup)

type NodeInfo

type NodeInfo struct {
	Uid     int64            `json:"uid"`
	Sid     int64            `json:"sid"`
	Name    string           `json:"name"`
	Host    string           `json:"host"`
	Port    int              `json:"port"`
	Version *version.Version `json:"version"`
}

type Option

type Option struct {
	LocalIP              string                    `snow:"LocalIP"`              // 内网 ip,用于判断 RPC 连接是否是本地
	ProfileListenHost    string                    `snow:"ProfileListenHost"`    // Profile 监听地址,为空表示不监听
	ProfileListenMinPort int                       `snow:"ProfileListenMinPort"` // Profile 监听动态最小端口
	ProfileListenMaxPort int                       `snow:"ProfileListenMaxPort"` // Profile 监听动态最大端口,包含;若使用固定端口,则应该与最小端口一致
	HttpKeepAliveSeconds int                       `snow:"HttpKeepAliveSeconds"` // 节点 Http 服务保活时间
	HttpTimeoutSeconds   int                       `snow:"HttpTimeoutSeconds"`   // 节点 Http 服务超时时间
	HttpDebug            bool                      `snow:"HttpDebug"`            // 节点 Http 是否为调试模式
	StopDrainTimeoutSec  int                       `snow:"StopDrainTimeoutSec"`  // Drain 等待在途请求最大秒数
	StopDrainPollMs      int                       `snow:"StopDrainPollMs"`      // Drain 轮询间隔毫秒
	BootName             string                    `snow:"BootName"`             // 启动节点名
	Nodes                map[string]*ElementOption `snow:"Nodes"`                // 当前关注的节点信息
}

type RegisterOption

type RegisterOption struct {
	ServiceRegisterInfos     []*ServiceRegisterInfo
	ServiceDependencies      map[string][]string // key 依赖 value,A:[B] 表示 A 依赖 B
	ClientHandlePreprocessor xnet.IPreprocessor
	ServerHandlePreprocessor xnet.IPreprocessor
	PostInitializer          func()
	MetricCollector          IMetricCollector
	// Codec TCP RPC 参数编解码器,默认 JsonCodec(基于 xjson)。
	// HTTP RPC 始终使用 JSON,不受此字段影响。
	Codec ICodec
	// ServiceDiscovery 可选的服务发现实现(etcd / Consul / ZooKeeper 等)。
	// 配置后 CreateProxy 优先通过 Discovery 解析服务地址;
	// 未配置或解析失败时回退到静态配置表。
	ServiceDiscovery IServiceDiscovery
	// Tracer 可选的 OTel Tracer,nil 表示不开启链路追踪(零开销)。
	// 调用方需提前通过 otel.SetTracerProvider() 配置全局 TracerProvider,
	// 或直接传入 NodeTracer() 返回的 Tracer。
	Tracer trace.Tracer
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func (*Service) After

func (ss *Service) After(delay time.Duration, f func()) ITimeWheelHandle

After 定时器,延迟 delay 时间后执行函数 f,非线程安全

func (*Service) AfterStop

func (ss *Service) AfterStop()

func (*Service) Closed

func (ss *Service) Closed() bool

func (*Service) Construct

func (ss *Service) Construct(logger *logging.Logger[Service])

Construct 注入构造函数

func (*Service) CreateEmptyProxy

func (ss *Service) CreateEmptyProxy() IProxy

CreateEmptyProxy 创建空代理,线程安全

func (*Service) CreateHttpProxy

func (ss *Service) CreateHttpProxy(httpUrl, name string) IProxy

func (*Service) CreateProxy

func (ss *Service) CreateProxy(name string) IProxy

CreateProxy 根据服务名自动创建代理,线程安全

func (*Service) CreateProxyByNodeAddr

func (ss *Service) CreateProxyByNodeAddr(nAddr INodeAddr, sAddr int32) IProxy

CreateProxyByNodeAddr 根据节点地址及服务地址创建代理,线程安全

func (*Service) CreateProxyByNodeKind

func (ss *Service) CreateProxyByNodeKind(nAddr INodeAddr, name string) IProxy

CreateProxyByNodeKind 根据节点地址及服务名创建代理,线程安全

func (*Service) CreateProxyByUpdaterKind

func (ss *Service) CreateProxyByUpdaterKind(nAddrUpdater *AddrUpdater, name string) IProxy

CreateProxyByUpdaterKind 根据节点地址更新器及服务名创建代理,线程安全

func (*Service) Debugf

func (ss *Service) Debugf(format string, args ...any)

func (*Service) EnableHttpRPCForward

func (ss *Service) EnableHttpRPCForward(srvAddresses []int32)

EnableHttpRPCForward 设置 HTTP RPC 转发可用,此时队列中的 RPC 会开始执行转发,非线程安全

func (*Service) EnableHttpRpc

func (ss *Service) EnableHttpRpc()

EnableHttpRpc 设置 HTTP RPC 可用,此时队列中的 RPC 会开始执行,非线程安全

func (*Service) EnableRpc

func (ss *Service) EnableRpc()

EnableRpc 设置 RPC 可用,此时队列中的 RPC 会开始执行,非线程安全

func (*Service) Entry

func (ss *Service) Entry(ctx IRpcContext, funcName string, argGetter func(ft reflect.Type) ([]reflect.Value, error)) func()

func (*Service) Errorf

func (ss *Service) Errorf(format string, args ...any)

func (*Service) Fatalf

func (ss *Service) Fatalf(format string, args ...any)

func (*Service) Fork

func (ss *Service) Fork(tag string, f func()) bool

Fork 将函数放入主线程并在下一帧中执行,线程安全

func (*Service) GetAddr

func (ss *Service) GetAddr() int32

GetAddr 获取服务地址,线程安全

func (*Service) GetKind

func (ss *Service) GetKind() int32

GetKind 获取服务类型,线程安全

func (*Service) GetMillisecond

func (ss *Service) GetMillisecond() int64

GetMillisecond 获取当前时间毫秒,非线程安全

func (*Service) GetName

func (ss *Service) GetName() string

GetName 获取服务名称,线程安全

func (*Service) GetSecond

func (ss *Service) GetSecond() int64

GetSecond 获取当前时间秒,非线程安全

func (*Service) GetTime

func (ss *Service) GetTime() time.Time

GetTime 获取当前时间,非线程安全

func (*Service) InFlightRequests

func (ss *Service) InFlightRequests() int64

func (*Service) Infof

func (ss *Service) Infof(format string, args ...any)

func (*Service) Paused

func (ss *Service) Paused() bool

func (*Service) RpcReload

func (ss *Service) RpcReload(ctx IRpcContext)

RpcReload 重载服务配置 RPC 的默认实现

func (*Service) RpcStatus

func (ss *Service) RpcStatus(ctx IRpcContext)

RpcStatus 获取服务状态 RPC 的默认实现

func (*Service) SetAllowedRPC

func (ss *Service) SetAllowedRPC(names []string)

SetAllowedRPC 设置允许调用的 RPC 函数,不含 "Rpc" 头,非线程安全

func (*Service) Start

func (ss *Service) Start(_ any)

func (*Service) Stop

func (ss *Service) Stop(_ *sync.WaitGroup)

func (*Service) Tick

func (ss *Service) Tick(interval, delay time.Duration, f func()) ITimeWheelHandle

Tick 定时器,延迟 delay 时间并在后续以 interval 时间间隔执行函数 f,间隔时间与函数执行时间无关,非线程安全

func (*Service) TickAfter

func (ss *Service) TickAfter(interval time.Duration, f func(func())) ITimeWheelHandle

TickAfter 定时器,延迟 interval 时间后执行函数 f,若传入 f 的函数参数被执行,则延迟 interval 后继续执行函数 f,非线程安全

func (*Service) TickDelayRandom

func (ss *Service) TickDelayRandom(interval time.Duration, f func()) ITimeWheelHandle

TickDelayRandom 定时器,延迟 [0, interval) 随机时间并在后续以 interval 时间间隔执行函数 f,间隔时间与函数执行时间无关,非线程安全

func (*Service) Tracef

func (ss *Service) Tracef(format string, args ...any)

func (*Service) Warnf

func (ss *Service) Warnf(format string, args ...any)

type ServiceName

type ServiceName = string

type ServiceRegisterInfo

type ServiceRegisterInfo struct {
	Kind int32
	Name string
	Type reflect.Type
}

func CheckedServiceRegisterInfo

func CheckedServiceRegisterInfo[T any, U consService[T]](kind int32) *ServiceRegisterInfo

func CheckedServiceRegisterInfoName

func CheckedServiceRegisterInfoName[T any, U consService[T]](kind int32, name string) *ServiceRegisterInfo

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL