Documentation
¶
Index ¶
- Constants
- Variables
- func AddNode(b host.IBuilder, registerFactory func() *RegisterOption)
- func CheckedServiceNilPtr[T any, U consService[T]]() any
- func GetRegisteredService() []string
- func IsCode(err error, code ErrorCode) bool
- func NewError(code ErrorCode, msg string) error
- func NewService(name string) (int32, error)
- func NodeTracer() trace.Tracer
- func Register[T any, U consService[T]](name string, setup ...func(host.IBuilder))
- func RegisterService(b host.IBuilder, opts ...func(*RegisterOption))
- func StartService(sAddr int32, arg any) bool
- func StopService(sAddr int32) bool
- func WrapError(code ErrorCode, op string, err error) error
- type Addr
- type AddrUpdater
- type ElementOption
- type Error
- type ErrorCode
- type ICodec
- type IMetricCollector
- type INodeAddr
- type IPromise
- type IProxy
- type IRpcContext
- type IServiceDiscovery
- type ITimeWheelHandle
- type JsonCodec
- type MsgpackCodec
- type Node
- type NodeInfo
- type Option
- type RegisterOption
- type Service
- func (ss *Service) After(delay time.Duration, f func()) ITimeWheelHandle
- func (ss *Service) AfterStop()
- func (ss *Service) Closed() bool
- func (ss *Service) Construct(logger *logging.Logger[Service])
- func (ss *Service) CreateEmptyProxy() IProxy
- func (ss *Service) CreateHttpProxy(httpUrl, name string) IProxy
- func (ss *Service) CreateProxy(name string) IProxy
- func (ss *Service) CreateProxyByNodeAddr(nAddr INodeAddr, sAddr int32) IProxy
- func (ss *Service) CreateProxyByNodeKind(nAddr INodeAddr, name string) IProxy
- func (ss *Service) CreateProxyByUpdaterKind(nAddrUpdater *AddrUpdater, name string) IProxy
- func (ss *Service) Debugf(format string, args ...any)
- func (ss *Service) EnableHttpRPCForward(srvAddresses []int32)
- func (ss *Service) EnableHttpRpc()
- func (ss *Service) EnableRpc()
- func (ss *Service) Entry(ctx IRpcContext, funcName string, ...) func()
- func (ss *Service) Errorf(format string, args ...any)
- func (ss *Service) Fatalf(format string, args ...any)
- func (ss *Service) Fork(tag string, f func()) bool
- func (ss *Service) GetAddr() int32
- func (ss *Service) GetKind() int32
- func (ss *Service) GetMillisecond() int64
- func (ss *Service) GetName() string
- func (ss *Service) GetSecond() int64
- func (ss *Service) GetTime() time.Time
- func (ss *Service) InFlightRequests() int64
- func (ss *Service) Infof(format string, args ...any)
- func (ss *Service) Paused() bool
- func (ss *Service) RpcReload(ctx IRpcContext)
- func (ss *Service) RpcStatus(ctx IRpcContext)
- func (ss *Service) SetAllowedRPC(names []string)
- func (ss *Service) Start(_ any)
- func (ss *Service) Stop(_ *sync.WaitGroup)
- func (ss *Service) Tick(interval, delay time.Duration, f func()) ITimeWheelHandle
- func (ss *Service) TickAfter(interval time.Duration, f func(func())) ITimeWheelHandle
- func (ss *Service) TickDelayRandom(interval time.Duration, f func()) ITimeWheelHandle
- func (ss *Service) Tracef(format string, args ...any)
- func (ss *Service) Warnf(format string, args ...any)
- type ServiceName
- type ServiceRegisterInfo
Constants ¶
const TickInterval = 5 * time.Millisecond
Variables ¶
var ( AddrLocal = Addr(0) AddrRemote = Addr(-1) AddrInvalid = Addr(-2) )
var ( ErrServiceNotExist = NewError(ErrServiceNotFound, "service not exist") ErrNodeMessageChanFull = NewError(ErrTransport, "node message channel full") ErrRequestTimeoutRemote = NewError(ErrTimeout, "session timeout from remote") ErrRequestTimeoutLocal = NewError(ErrTimeout, "session timeout from local") ErrRequestCancelled = NewError(ErrCancelled, "request cancelled by context") )
var Config = &nodeConfig{ CurNodeMap: map[string]bool{}, }
var ErrRemoteDisconnected = NewError(ErrTransport, "remote disconnected")
Functions ¶
func AddNode ¶
func AddNode(b host.IBuilder, registerFactory func() *RegisterOption)
func CheckedServiceNilPtr ¶
func GetRegisteredService ¶
func GetRegisteredService() []string
GetRegisteredService 获取所有自动注册的服务名称列表。 可直接用于 ElementOption.Services。
func NewService ¶
func NodeTracer ¶
NodeTracer 返回节点包的 OTel Tracer。 调用方需提前通过 otel.SetTracerProvider() 配置全局 TracerProvider。
func Register ¶
Register 自动注册服务,在服务包的 init() 中调用。
Kind 自动递增分配,Name 为服务名。 可选 setup 回调在 RegisterService 时执行,用于绑定配置等构建期操作。
用法:
func init() {
node.Register[MyService, *MyService]("MyService")
}
带配置绑定:
func init() {
node.Register[MyService, *MyService]("MyService", func(b host.IBuilder) {
host.AddOption[MyConfig](b, "MyService")
})
}
func RegisterService ¶
func RegisterService(b host.IBuilder, opts ...func(*RegisterOption))
RegisterService 自动注册所有服务到节点。 替代手动 AddNode + 逐个 CheckedServiceRegisterInfoName 的方式。 可选 opts 用于设置 RegisterOption 的其他字段(如 Preprocessor 等)。
func StartService ¶
StartService 快速启动一个服务,保证异步调用到 Service 的 Start,由用户保证完整、正确启动
Types ¶
type AddrUpdater ¶
type AddrUpdater struct {
// contains filtered or unexported fields
}
func NewNodeAddrUpdater ¶
func NewNodeAddrUpdater(nAddr Addr, updateFunc func(chan<- Addr)) *AddrUpdater
func (*AddrUpdater) GetNodeAddr ¶
func (ss *AddrUpdater) GetNodeAddr() Addr
func (*AddrUpdater) Start ¶
func (ss *AddrUpdater) Start()
type ElementOption ¶
type ElementOption struct {
Order int `snow:"Order"` // 节点顺序,节点服务的查找按 Order 值从小到大依次进行
Host string `snow:"Host"` // 节点主机名,可以是 IP,若当前节点的 Host 为空,则节点 Tcp 监听 LocalIP
Port int `snow:"Port"` // 节点 Tcp 端口
HttpPort int `snow:"HttpPort"` // 节点 Http 端口
UseHttps bool `snow:"UseHttps"` // 节点是否为 Https
Services []string `snow:"Services"` // 节点包含的服务,若为当前节点则代表要启动的服务;服务按照顺序启动,逆序关闭
}
type ErrorCode ¶
type ErrorCode string
ErrorCode 用于稳定的错误聚合与告警统计。
const ( ErrUnknown ErrorCode = "UNKNOWN" ErrTimeout ErrorCode = "TIMEOUT" ErrServiceNotFound ErrorCode = "SERVICE_NOT_FOUND" ErrCodec ErrorCode = "CODEC" ErrTransport ErrorCode = "TRANSPORT" ErrCancelled ErrorCode = "CANCELLED" ErrDraining ErrorCode = "DRAINING" ErrInvalidArgument ErrorCode = "INVALID_ARGUMENT" ErrInternal ErrorCode = "INTERNAL" )
type ICodec ¶
type ICodec interface {
// Marshal 将对象编码为字节串。
Marshal(v any) ([]byte, error)
// Unmarshal 将字节串解码到对象。
Unmarshal(data []byte, v any) error
// Name 返回编解码器名称(如 "json"、"msgpack"),用于日志与调试。
Name() string
}
ICodec RPC 消息体编解码接口,用于 TCP 二进制协议的参数序列化。 HTTP RPC 始终使用 JSON(因 Content-Type 语义绑定),不受此接口影响。
type IMetricCollector ¶
type IPromise ¶
type IPromise interface {
Then(f any) IPromise
Catch(f func(error)) IPromise
Final(f func()) IPromise
// WithContext 绑定显式 Context,覆盖默认的 Service 生命周期 Context。
// 用于自定义超时(context.WithTimeout)、上游取消传播、或附加 trace 信息。
// 未调用时,框架自动使用发起方 Service 的 Context 作为父级。
WithContext(ctx context.Context) IPromise
Done()
}
type IRpcContext ¶
type IServiceDiscovery ¶
type IServiceDiscovery interface {
// Resolve 根据服务名解析目标节点地址。
// 返回 AddrInvalid 或 error 时框架回退到静态表查找。
Resolve(serviceName string) (INodeAddr, error)
// Deregister 停机时注销自身(由 Node.Stop 调用)。传入当前节点地址和服务列表。
Deregister(nodeAddr INodeAddr, services []string)
}
IServiceDiscovery 服务发现接口,用于运行时动态解析服务名到节点地址。 未配置时框架回退到静态配置表(Config.Nodes),二者可共存。
type ITimeWheelHandle ¶
type ITimeWheelHandle interface {
Stop()
}
type MsgpackCodec ¶
type MsgpackCodec struct{}
MsgpackCodec 基于 vmihailenco/msgpack/v5 的二进制编解码器。 相比 JSON,序列化体积更小、编解码速度更快。 使用时通过 RegisterOption.Codec 注入。
func (MsgpackCodec) Name ¶
func (MsgpackCodec) Name() string
type Option ¶
type Option struct {
LocalIP string `snow:"LocalIP"` // 内网 ip,用于判断 RPC 连接是否是本地
ProfileListenHost string `snow:"ProfileListenHost"` // Profile 监听地址,为空表示不监听
ProfileListenMinPort int `snow:"ProfileListenMinPort"` // Profile 监听动态最小端口
ProfileListenMaxPort int `snow:"ProfileListenMaxPort"` // Profile 监听动态最大端口,包含;若使用固定端口,则应该与最小端口一致
HttpKeepAliveSeconds int `snow:"HttpKeepAliveSeconds"` // 节点 Http 服务保活时间
HttpTimeoutSeconds int `snow:"HttpTimeoutSeconds"` // 节点 Http 服务超时时间
HttpDebug bool `snow:"HttpDebug"` // 节点 Http 是否为调试模式
StopDrainTimeoutSec int `snow:"StopDrainTimeoutSec"` // Drain 等待在途请求最大秒数
StopDrainPollMs int `snow:"StopDrainPollMs"` // Drain 轮询间隔毫秒
BootName string `snow:"BootName"` // 启动节点名
Nodes map[string]*ElementOption `snow:"Nodes"` // 当前关注的节点信息
}
type RegisterOption ¶
type RegisterOption struct {
ServiceRegisterInfos []*ServiceRegisterInfo
ServiceDependencies map[string][]string // key 依赖 value,A:[B] 表示 A 依赖 B
ClientHandlePreprocessor xnet.IPreprocessor
ServerHandlePreprocessor xnet.IPreprocessor
PostInitializer func()
MetricCollector IMetricCollector
// Codec TCP RPC 参数编解码器,默认 JsonCodec(基于 xjson)。
// HTTP RPC 始终使用 JSON,不受此字段影响。
Codec ICodec
// ServiceDiscovery 可选的服务发现实现(etcd / Consul / ZooKeeper 等)。
// 配置后 CreateProxy 优先通过 Discovery 解析服务地址;
// 未配置或解析失败时回退到静态配置表。
ServiceDiscovery IServiceDiscovery
// Tracer 可选的 OTel Tracer,nil 表示不开启链路追踪(零开销)。
// 调用方需提前通过 otel.SetTracerProvider() 配置全局 TracerProvider,
// 或直接传入 NodeTracer() 返回的 Tracer。
Tracer trace.Tracer
}
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func (*Service) After ¶
func (ss *Service) After(delay time.Duration, f func()) ITimeWheelHandle
After 定时器,延迟 delay 时间后执行函数 f,非线程安全
func (*Service) CreateEmptyProxy ¶
CreateEmptyProxy 创建空代理,线程安全
func (*Service) CreateHttpProxy ¶
func (*Service) CreateProxy ¶
CreateProxy 根据服务名自动创建代理,线程安全
func (*Service) CreateProxyByNodeAddr ¶
CreateProxyByNodeAddr 根据节点地址及服务地址创建代理,线程安全
func (*Service) CreateProxyByNodeKind ¶
CreateProxyByNodeKind 根据节点地址及服务名创建代理,线程安全
func (*Service) CreateProxyByUpdaterKind ¶
func (ss *Service) CreateProxyByUpdaterKind(nAddrUpdater *AddrUpdater, name string) IProxy
CreateProxyByUpdaterKind 根据节点地址更新器及服务名创建代理,线程安全
func (*Service) EnableHttpRPCForward ¶
EnableHttpRPCForward 设置 HTTP RPC 转发可用,此时队列中的 RPC 会开始执行转发,非线程安全
func (*Service) EnableHttpRpc ¶
func (ss *Service) EnableHttpRpc()
EnableHttpRpc 设置 HTTP RPC 可用,此时队列中的 RPC 会开始执行,非线程安全
func (*Service) EnableRpc ¶
func (ss *Service) EnableRpc()
EnableRpc 设置 RPC 可用,此时队列中的 RPC 会开始执行,非线程安全
func (*Service) GetMillisecond ¶
GetMillisecond 获取当前时间毫秒,非线程安全
func (*Service) InFlightRequests ¶
func (*Service) RpcReload ¶
func (ss *Service) RpcReload(ctx IRpcContext)
RpcReload 重载服务配置 RPC 的默认实现
func (*Service) RpcStatus ¶
func (ss *Service) RpcStatus(ctx IRpcContext)
RpcStatus 获取服务状态 RPC 的默认实现
func (*Service) SetAllowedRPC ¶
SetAllowedRPC 设置允许调用的 RPC 函数,不含 "Rpc" 头,非线程安全
func (*Service) Tick ¶
func (ss *Service) Tick(interval, delay time.Duration, f func()) ITimeWheelHandle
Tick 定时器,延迟 delay 时间并在后续以 interval 时间间隔执行函数 f,间隔时间与函数执行时间无关,非线程安全
func (*Service) TickAfter ¶
func (ss *Service) TickAfter(interval time.Duration, f func(func())) ITimeWheelHandle
TickAfter 定时器,延迟 interval 时间后执行函数 f,若传入 f 的函数参数被执行,则延迟 interval 后继续执行函数 f,非线程安全
func (*Service) TickDelayRandom ¶
func (ss *Service) TickDelayRandom(interval time.Duration, f func()) ITimeWheelHandle
TickDelayRandom 定时器,延迟 [0, interval) 随机时间并在后续以 interval 时间间隔执行函数 f,间隔时间与函数执行时间无关,非线程安全
type ServiceName ¶
type ServiceName = string
type ServiceRegisterInfo ¶
func CheckedServiceRegisterInfo ¶
func CheckedServiceRegisterInfo[T any, U consService[T]](kind int32) *ServiceRegisterInfo
func CheckedServiceRegisterInfoName ¶
func CheckedServiceRegisterInfoName[T any, U consService[T]](kind int32, name string) *ServiceRegisterInfo