Documentation
¶
Index ¶
- Constants
- func IsRecoverable(err error) bool
- func IsTemporaryError(err error) bool
- func RetryWithBackoff(operation func() error) error
- func WithTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
- type AtomicDuration
- type BackPressure
- type CompressedMessage
- type CompressionType
- type Compressor
- type FlowController
- type GzipCompressor
- type Handler
- func (h *Handler) Close() error
- func (h *Handler) GetFeatures() uint32
- func (h *Handler) GetMetrics() *Metrics
- func (h *Handler) GetQueueMetrics() (send, receive QueueMetrics)
- func (h *Handler) GracefulClose(timeout time.Duration) error
- func (h *Handler) Handshake(authData []byte) error
- func (h *Handler) IsFeatureEnabled(feature uint32) bool
- func (h *Handler) ReceiveMessage(msg Message) error
- func (h *Handler) Reconnect(ctx context.Context) error
- func (h *Handler) SafeHandle(operation func() error) (err error)
- func (h *Handler) SendMessage(msg Message) error
- func (h *Handler) SetCompressor(c Compressor)
- func (h *Handler) SetDeadline(t time.Time) error
- func (h *Handler) StartHeartbeat()
- func (h *Handler) StartQueueProcessor(ctx context.Context)
- func (h *Handler) StopHeartbeat()
- type HandshakeRequest
- type HandshakeResponse
- type HeartbeatMessage
- type Message
- type MessageQueue
- func (q *MessageQueue) Clear()
- func (q *MessageQueue) Close()
- func (q *MessageQueue) Get(ctx context.Context) (*QueueItem, error)
- func (q *MessageQueue) GetWithTimeout(ctx context.Context, timeout time.Duration) (*QueueItem, error)
- func (q *MessageQueue) IsFull() bool
- func (q *MessageQueue) Len() int
- func (q *MessageQueue) Metrics() QueueMetrics
- func (q *MessageQueue) Put(ctx context.Context, msg Message, priority int) error
- func (q *MessageQueue) PutWithTimeout(ctx context.Context, msg Message, priority int, timeout time.Duration) error
- type MessageTracker
- type MessageWrapper
- type Metrics
- type Options
- type ProtocolError
- type QueueItem
- type QueueMetrics
- type QueuePolicy
- type RecoveryHandler
- type RetryConfig
Constants ¶
const ( ErrCodeInvalidLength = 1001 // 无效的消息长度 ErrCodeTimeout = 1002 // 超时错误 ErrCodeSerialize = 1003 // 序列化错误 ErrCodeDeserialize = 1004 // 反序列化错误 ErrCodeRead = 1005 // 读取错误 ErrCodeWrite = 1006 // 写入错误 ErrCodeConnection = 1007 // 连接错误 ErrCodePanic = 1008 // Panic错误 ErrCodeHeartbeat = 1009 // 心跳错误 ErrCodeReconnect = 1010 // 重连错误 ErrCodeChecksum = 1011 // 校验和错误 ErrCodeVersion = 1012 // 版本错误 ErrCodeSequence = 1013 // 序列号错误 ErrCodeMessageAge = 1014 // 消息过期 ErrCodeHandshake = 1015 // 握手错误 ErrCodeFlowControl = 1016 // 流量控制错误 ErrCodeQueueFull = 1017 // 队列已满 ErrCodeCompression = 1018 // 压缩错误 ErrCodeSize = 1019 // 消息大小错误 )
协议错误码定义（取值范围 1001–1019）
const ( // 握手状态码 HandshakeStatusOK = 0 HandshakeStatusVersionError = 1 HandshakeStatusAuthError = 2 HandshakeStatusError = 3 // 握手超时时间 HandshakeTimeout = 10 * time.Second )
const ( // 协议常量定义 MaxMessageSize = 100 * 1024 * 1024 // 最大消息大小(100MB) DefaultTimeout = 30 * time.Second // 默认超时时间 HeaderSize = 4 // 消息头大小(用于长度) MaxRetries = 3 // 最大重试次数 // 连接状态 StatusConnected = iota StatusDisconnected StatusReconnecting // 默认配置 DefaultBufferSize = 2 * 1024 * 1024 // 增大缓冲区到2MB DefaultMaxConns = 1000 DefaultQueueSize = 1000 CurrentVersion = 1 // 当前协议版本 MaxMessageAge = time.Minute // 消息最大有效期 // 添加功能位图常量 FeatureCompression = 1 << iota // 支持压缩 FeatureEncryption // 支持加密 FeatureHeartbeat // 支持心跳 FeatureReconnect // 支持重连 // 修改速率限制相关配置 RateLimitBurst = 20 * 1024 * 1024 // 突发限制提高到20MB RateLimitRate = 50 * 1024 * 1024 // 速率限制提高到50MB/s // 添加常量定义 MaxLogDataLength = 200 // 日志中显示的最大数据长度 )
Variables ¶
This section is empty.
Functions ¶
func RetryWithBackoff ¶
RetryWithBackoff 实现指数退避重试
func WithTimeout ¶
func WithTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
WithTimeout 设置超时上下文
Types ¶
type AtomicDuration ¶
type AtomicDuration struct {
// contains filtered or unexported fields
}
AtomicDuration 原子操作的 Duration 类型
type CompressedMessage ¶
type CompressedMessage struct { Type CompressionType Original uint32 Compressed []byte }
CompressedMessage 压缩消息包装
func (*CompressedMessage) Marshal ¶
func (m *CompressedMessage) Marshal() ([]byte, error)
Marshal 实现Message接口
func (*CompressedMessage) Unmarshal ¶
func (m *CompressedMessage) Unmarshal(data []byte) error
Unmarshal 实现Message接口
type CompressionType ¶
type CompressionType uint8
CompressionType 压缩类型
const ( CompressNone CompressionType = iota CompressGzip CompressSnappy CompressLZ4 )
type Compressor ¶
type Compressor interface { Compress([]byte) ([]byte, error) Decompress([]byte) ([]byte, error) Type() CompressionType }
Compressor 压缩器接口
type FlowController ¶
type FlowController struct {
// contains filtered or unexported fields
}
FlowController 流量控制器
func NewFlowController ¶
func NewFlowController(rate, window, threshold int64) *FlowController
NewFlowController 创建流量控制器
type GzipCompressor ¶
type GzipCompressor struct {
// contains filtered or unexported fields
}
GzipCompressor Gzip压缩实现
func NewGzipCompressor ¶
func NewGzipCompressor(level int) *GzipCompressor
NewGzipCompressor 创建Gzip压缩器
func (*GzipCompressor) Compress ¶
func (c *GzipCompressor) Compress(data []byte) ([]byte, error)
Compress 压缩数据
func (*GzipCompressor) Decompress ¶
func (c *GzipCompressor) Decompress(data []byte) ([]byte, error)
Decompress 解压数据
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler 处理协议通信
func (*Handler) GetQueueMetrics ¶
func (h *Handler) GetQueueMetrics() (send, receive QueueMetrics)
GetQueueMetrics 获取队列指标
func (*Handler) GracefulClose ¶
GracefulClose 优雅关闭
func (*Handler) IsFeatureEnabled ¶
IsFeatureEnabled 检查功能是否启用
func (*Handler) ReceiveMessage ¶
ReceiveMessage 接收消息
func (*Handler) SafeHandle ¶
SafeHandle 安全处理通信
func (*Handler) StartQueueProcessor ¶
StartQueueProcessor 启动队列处理器
type HandshakeRequest ¶
type HandshakeRequest struct { Version uint16 // 协议版本 Timestamp int64 // 请求时间戳 Features uint32 // 功能位图 AuthData []byte // 认证数据 }
HandshakeRequest 握手请求
func (*HandshakeRequest) Marshal ¶
func (r *HandshakeRequest) Marshal() ([]byte, error)
Marshal 实现 Message 接口
func (*HandshakeRequest) Unmarshal ¶
func (r *HandshakeRequest) Unmarshal(data []byte) error
type HandshakeResponse ¶
type HandshakeResponse struct { Status uint16 // 状态码 Version uint16 // 协议版本 Features uint32 // 支持的功能 Message string // 响应消息 }
HandshakeResponse 握手响应
func (*HandshakeResponse) Marshal ¶
func (r *HandshakeResponse) Marshal() ([]byte, error)
Marshal 实现 Message 接口
func (*HandshakeResponse) Unmarshal ¶
func (r *HandshakeResponse) Unmarshal(data []byte) error
type HeartbeatMessage ¶
type HeartbeatMessage struct {
Timestamp int64
}
HeartbeatMessage 心跳消息
func (*HeartbeatMessage) Marshal ¶
func (m *HeartbeatMessage) Marshal() ([]byte, error)
Marshal 实现Message接口
func (*HeartbeatMessage) Unmarshal ¶
func (m *HeartbeatMessage) Unmarshal(data []byte) error
Unmarshal 实现Message接口
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
MessageQueue 消息队列
func NewMessageQueue ¶
func NewMessageQueue(size int, policy QueuePolicy) *MessageQueue
NewMessageQueue 创建消息队列
func (*MessageQueue) Get ¶
func (q *MessageQueue) Get(ctx context.Context) (*QueueItem, error)
Get 获取消息
func (*MessageQueue) GetWithTimeout ¶
func (q *MessageQueue) GetWithTimeout(ctx context.Context, timeout time.Duration) (*QueueItem, error)
GetWithTimeout 带超时的消息获取
func (*MessageQueue) PutWithTimeout ¶
func (q *MessageQueue) PutWithTimeout(ctx context.Context, msg Message, priority int, timeout time.Duration) error
PutWithTimeout 带超时的消息放入
type MessageTracker ¶
type MessageTracker struct {
// contains filtered or unexported fields
}
MessageTracker 消息跟踪器，用于消息序列号的有序性检查和去重处理
func NewMessageTracker ¶
func NewMessageTracker(maxSize int) *MessageTracker
NewMessageTracker 创建消息跟踪器
func (*MessageTracker) IsDuplicate ¶
func (t *MessageTracker) IsDuplicate(seq uint64) bool
IsDuplicate 检查消息是否重复
func (*MessageTracker) IsOrdered ¶
func (t *MessageTracker) IsOrdered(seq uint64) bool
IsOrdered 检查消息是否有序
type MessageWrapper ¶
type MessageWrapper struct { Version uint16 // 协议版本 Checksum uint32 // 校验和 Timestamp int64 // 时间戳 Sequence uint64 // 序列号 Payload []byte // 消息内容 }
MessageWrapper 消息包装器
func (*MessageWrapper) Marshal ¶
func (w *MessageWrapper) Marshal() ([]byte, error)
Marshal 序列化消息包装器
func (*MessageWrapper) Unmarshal ¶
func (w *MessageWrapper) Unmarshal(data []byte) error
Unmarshal 反序列化消息包装器
type Metrics ¶
type Metrics struct { ActiveConnections atomic.Int64 MessagesSent atomic.Int64 MessagesReceived atomic.Int64 Errors atomic.Int64 Retries atomic.Int64 BufferUsage atomic.Int64 LastError error }
Metrics 统计指标
type Options ¶
type Options struct { Recovery *RecoveryHandler Reconnect bool MaxRetries int RetryDelay time.Duration HeartBeat time.Duration DialTimeout time.Duration WriteTimeout time.Duration ReadTimeout time.Duration ProcessTimeout time.Duration MaxConns int Rate int64 Window int64 Threshold int64 QueueSize int QueuePolicy QueuePolicy RetryConfig *RetryConfig }
Options 配置选项
type ProtocolError ¶
ProtocolError 定义协议错误类型
type QueueMetrics ¶
type QueueMetrics struct { Length atomic.Int64 Dropped atomic.Int64 Processed atomic.Int64 WaitTime AtomicDuration }
QueueMetrics 队列指标
type QueuePolicy ¶
type QueuePolicy int
QueuePolicy 队列策略
const ( PolicyBlock QueuePolicy = iota // 阻塞 PolicyDrop // 丢弃 PolicyOverwrite // 覆盖 )
type RecoveryHandler ¶
type RecoveryHandler struct { MaxRetries int // 最大重试次数 RetryInterval time.Duration // 重试间隔 OnPanic func(interface{}, []byte) // panic处理回调 OnDisconnect func(error) // 断开连接处理 }
RecoveryHandler 处理panic恢复
type RetryConfig ¶
RetryConfig 定义重试配置