protocol

package
v2.0.42 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2025 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrCodeInvalidLength = 1001 // 无效的消息长度
	ErrCodeTimeout       = 1002 // 超时错误
	ErrCodeSerialize     = 1003 // 序列化错误
	ErrCodeDeserialize   = 1004 // 反序列化错误
	ErrCodeRead          = 1005 // 读取错误
	ErrCodeWrite         = 1006 // 写入错误
	ErrCodeConnection    = 1007 // 连接错误
	ErrCodePanic         = 1008 // Panic错误
	ErrCodeHeartbeat     = 1009 // 心跳错误
	ErrCodeReconnect     = 1010 // 重连错误
	ErrCodeChecksum      = 1011 // 校验和错误
	ErrCodeVersion       = 1012 // 版本错误
	ErrCodeSequence      = 1013 // 序列号错误
	ErrCodeMessageAge    = 1014 // 消息过期
	ErrCodeHandshake     = 1015 // 握手错误
	ErrCodeFlowControl   = 1016 // 流量控制错误
	ErrCodeQueueFull     = 1017 // 队列已满
	ErrCodeCompression   = 1018 // 压缩错误
	ErrCodeSize          = 1019 // 消息大小错误

)

定义错误码

View Source
const (
	// 握手状态码
	HandshakeStatusOK           = 0
	HandshakeStatusVersionError = 1
	HandshakeStatusAuthError    = 2
	HandshakeStatusError        = 3

	// 握手超时时间
	HandshakeTimeout = 10 * time.Second
)
View Source
const (
	// 协议常量定义
	MaxMessageSize = 100 * 1024 * 1024 // 最大消息大小(100MB)
	DefaultTimeout = 30 * time.Second  // 默认超时时间
	HeaderSize     = 4                 // 消息头大小(用于长度)
	MaxRetries     = 3                 // 最大重试次数

	// 连接状态
	StatusConnected = iota
	StatusDisconnected
	StatusReconnecting

	// 默认配置
	DefaultBufferSize = 2 * 1024 * 1024 // 增大缓冲区到2MB
	DefaultMaxConns   = 1000
	DefaultQueueSize  = 1000

	CurrentVersion = 1           // 当前协议版本
	MaxMessageAge  = time.Minute // 消息最大有效期

	// 添加功能位图常量
	FeatureCompression = 1 << iota // 支持压缩
	FeatureEncryption              // 支持加密
	FeatureHeartbeat               // 支持心跳
	FeatureReconnect               // 支持重连

	// 修改速率限制相关配置
	RateLimitBurst = 20 * 1024 * 1024 // 突发限制提高到20MB
	RateLimitRate  = 50 * 1024 * 1024 // 速率限制提高到50MB/s

	// 添加常量定义
	MaxLogDataLength = 200 // 日志中显示的最大数据长度
)

Variables

This section is empty.

Functions

func IsRecoverable

func IsRecoverable(err error) bool

IsRecoverable 判断错误是否可恢复

func IsTemporaryError

func IsTemporaryError(err error) bool

IsTemporaryError 判断是否为临时错误

func RetryWithBackoff

func RetryWithBackoff(operation func() error) error

RetryWithBackoff 实现指数退避重试

func WithTimeout

func WithTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)

WithTimeout 设置超时上下文

Types

type AtomicDuration

type AtomicDuration struct {
	// contains filtered or unexported fields
}

AtomicDuration 原子操作的 Duration 类型

func (*AtomicDuration) Add

Add 添加 Duration

func (*AtomicDuration) Load

func (d *AtomicDuration) Load() time.Duration

Load 加载 Duration

func (*AtomicDuration) Store

func (d *AtomicDuration) Store(val time.Duration)

Store 存储 Duration

type BackPressure

type BackPressure struct {
	// contains filtered or unexported fields
}

添加背压控制

type CompressedMessage

type CompressedMessage struct {
	Type       CompressionType
	Original   uint32
	Compressed []byte
}

CompressedMessage 压缩消息包装

func (*CompressedMessage) Marshal

func (m *CompressedMessage) Marshal() ([]byte, error)

Marshal 实现Message接口

func (*CompressedMessage) Unmarshal

func (m *CompressedMessage) Unmarshal(data []byte) error

Unmarshal 实现Message接口

type CompressionType

type CompressionType uint8

CompressionType 压缩类型

const (
	CompressNone CompressionType = iota
	CompressGzip
	CompressSnappy
	CompressLZ4
)

type Compressor

type Compressor interface {
	Compress([]byte) ([]byte, error)
	Decompress([]byte) ([]byte, error)
	Type() CompressionType
}

Compressor 压缩器接口

type FlowController

type FlowController struct {
	// contains filtered or unexported fields
}

FlowController 流量控制器

func NewFlowController

func NewFlowController(rate, window, threshold int64) *FlowController

NewFlowController 创建流量控制器

func (*FlowController) Acquire

func (fc *FlowController) Acquire(size int64) error

Acquire 获取发送许可

func (*FlowController) Release

func (fc *FlowController) Release(size int64)

Release 释放资源

type GzipCompressor

type GzipCompressor struct {
	// contains filtered or unexported fields
}

GzipCompressor Gzip压缩实现

func NewGzipCompressor

func NewGzipCompressor(level int) *GzipCompressor

NewGzipCompressor 创建Gzip压缩器

func (*GzipCompressor) Compress

func (c *GzipCompressor) Compress(data []byte) ([]byte, error)

Compress 压缩数据

func (*GzipCompressor) Decompress

func (c *GzipCompressor) Decompress(data []byte) ([]byte, error)

Decompress 解压数据

func (*GzipCompressor) Type

func (c *GzipCompressor) Type() CompressionType

Type 返回压缩类型

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler 处理协议通信

func NewHandler

func NewHandler(conn net.Conn, opts *Options) *Handler

NewHandler 创建新的协议处理器

func (*Handler) Close

func (h *Handler) Close() error

Close 关闭连接

func (*Handler) GetFeatures

func (h *Handler) GetFeatures() uint32

GetFeatures 获取当前功能位图

func (*Handler) GetMetrics

func (h *Handler) GetMetrics() *Metrics

GetMetrics 获取指标统计

func (*Handler) GetQueueMetrics

func (h *Handler) GetQueueMetrics() (send, receive QueueMetrics)

GetQueueMetrics 获取队列指标

func (*Handler) GracefulClose

func (h *Handler) GracefulClose(timeout time.Duration) error

GracefulClose 优雅关闭

func (*Handler) Handshake

func (h *Handler) Handshake(authData []byte) error

Handshake 执行握手

func (*Handler) IsFeatureEnabled

func (h *Handler) IsFeatureEnabled(feature uint32) bool

IsFeatureEnabled 检查功能是否启用

func (*Handler) ReceiveMessage

func (h *Handler) ReceiveMessage(msg Message) error

ReceiveMessage 接收消息

func (*Handler) Reconnect

func (h *Handler) Reconnect(ctx context.Context) error

Reconnect 重新连接

func (*Handler) SafeHandle

func (h *Handler) SafeHandle(operation func() error) (err error)

SafeHandle 安全处理通信

func (*Handler) SendMessage

func (h *Handler) SendMessage(msg Message) error

SendMessage 发送消息

func (*Handler) SetCompressor

func (h *Handler) SetCompressor(c Compressor)

SetCompressor 设置压缩器

func (*Handler) SetDeadline

func (h *Handler) SetDeadline(t time.Time) error

SetDeadline 设置超时

func (*Handler) StartHeartbeat

func (h *Handler) StartHeartbeat()

StartHeartbeat 开始心跳

func (*Handler) StartQueueProcessor

func (h *Handler) StartQueueProcessor(ctx context.Context)

StartQueueProcessor 启动队列处理器

func (*Handler) StopHeartbeat

func (h *Handler) StopHeartbeat()

StopHeartbeat 停止心跳

type HandshakeRequest

type HandshakeRequest struct {
	Version   uint16 // 协议版本
	Timestamp int64  // 请求时间戳
	Features  uint32 // 功能位图
	AuthData  []byte // 认证数据
}

HandshakeRequest 握手请求

func (*HandshakeRequest) Marshal

func (r *HandshakeRequest) Marshal() ([]byte, error)

实现 Message 接口

func (*HandshakeRequest) Unmarshal

func (r *HandshakeRequest) Unmarshal(data []byte) error

type HandshakeResponse

type HandshakeResponse struct {
	Status   uint16 // 状态码
	Version  uint16 // 协议版本
	Features uint32 // 支持的功能
	Message  string // 响应消息
}

HandshakeResponse 握手响应

func (*HandshakeResponse) Marshal

func (r *HandshakeResponse) Marshal() ([]byte, error)

实现 Message 接口

func (*HandshakeResponse) Unmarshal

func (r *HandshakeResponse) Unmarshal(data []byte) error

type HeartbeatMessage

type HeartbeatMessage struct {
	Timestamp int64
}

HeartbeatMessage 心跳消息

func (*HeartbeatMessage) Marshal

func (m *HeartbeatMessage) Marshal() ([]byte, error)

Marshal 实现Message接口

func (*HeartbeatMessage) Unmarshal

func (m *HeartbeatMessage) Unmarshal(data []byte) error

Unmarshal 实现Message接口

type Message

type Message interface {
	Marshal() ([]byte, error) // 序列化
	Unmarshal([]byte) error   // 反序列化
}

Message 定义协议消息接口

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

MessageQueue 消息队列

func NewMessageQueue

func NewMessageQueue(size int, policy QueuePolicy) *MessageQueue

NewMessageQueue 创建消息队列

func (*MessageQueue) Clear

func (q *MessageQueue) Clear()

Clear 清空队列

func (*MessageQueue) Close

func (q *MessageQueue) Close()

Close 关闭队列

func (*MessageQueue) Get

func (q *MessageQueue) Get(ctx context.Context) (*QueueItem, error)

Get 获取消息

func (*MessageQueue) GetWithTimeout

func (q *MessageQueue) GetWithTimeout(ctx context.Context, timeout time.Duration) (*QueueItem, error)

GetWithTimeout 带超时的消息获取

func (*MessageQueue) IsFull

func (q *MessageQueue) IsFull() bool

IsFull 检查队列是否已满

func (*MessageQueue) Len

func (q *MessageQueue) Len() int

Len 获取当前队列长度

func (*MessageQueue) Metrics

func (q *MessageQueue) Metrics() QueueMetrics

Metrics 获取队列指标

func (*MessageQueue) Put

func (q *MessageQueue) Put(ctx context.Context, msg Message, priority int) error

Put 放入消息

func (*MessageQueue) PutWithTimeout

func (q *MessageQueue) PutWithTimeout(ctx context.Context, msg Message, priority int, timeout time.Duration) error

PutWithTimeout 带超时的消息放入

type MessageTracker

type MessageTracker struct {
	// contains filtered or unexported fields
}

添加消息序列号和去重处理

func NewMessageTracker

func NewMessageTracker(maxSize int) *MessageTracker

NewMessageTracker 创建消息跟踪器

func (*MessageTracker) IsDuplicate

func (t *MessageTracker) IsDuplicate(seq uint64) bool

IsDuplicate 检查消息是否重复

func (*MessageTracker) IsOrdered

func (t *MessageTracker) IsOrdered(seq uint64) bool

IsOrdered 检查消息是否有序

func (*MessageTracker) Track

func (t *MessageTracker) Track(seq uint64)

Track 跟踪消息序列号

type MessageWrapper

type MessageWrapper struct {
	Version   uint16 // 协议版本
	Checksum  uint32 // 校验和
	Timestamp int64  // 时间戳
	Sequence  uint64 // 序列号
	Payload   []byte // 消息内容
}

MessageWrapper 消息包装器

func (*MessageWrapper) Marshal

func (w *MessageWrapper) Marshal() ([]byte, error)

Marshal 序列化消息包装器

func (*MessageWrapper) Unmarshal

func (w *MessageWrapper) Unmarshal(data []byte) error

Unmarshal 反序列化消息包装器

func (*MessageWrapper) Validate

func (w *MessageWrapper) Validate() error

Validate 验证消息

type Metrics

type Metrics struct {
	ActiveConnections atomic.Int64
	MessagesSent      atomic.Int64
	MessagesReceived  atomic.Int64
	Errors            atomic.Int64
	Retries           atomic.Int64
	BufferUsage       atomic.Int64
	LastError         error
}

Metrics 统计指标

type Options

type Options struct {
	Recovery       *RecoveryHandler
	Reconnect      bool
	MaxRetries     int
	RetryDelay     time.Duration
	HeartBeat      time.Duration
	DialTimeout    time.Duration
	WriteTimeout   time.Duration
	ReadTimeout    time.Duration
	ProcessTimeout time.Duration
	MaxConns       int
	Rate           int64
	Window         int64
	Threshold      int64
	QueueSize      int
	QueuePolicy    QueuePolicy
	RetryConfig    *RetryConfig
}

Options 配置选项

type ProtocolError

type ProtocolError struct {
	Code    int
	Message string
	Err     error
}

ProtocolError 定义协议错误类型

func (*ProtocolError) Error

func (e *ProtocolError) Error() string

Error 实现error接口

type QueueItem

type QueueItem struct {
	Message  Message
	Deadline time.Time
	Priority int
	Context  context.Context
}

QueueItem 队列项

type QueueMetrics

type QueueMetrics struct {
	Length    atomic.Int64
	Dropped   atomic.Int64
	Processed atomic.Int64
	WaitTime  AtomicDuration
}

QueueMetrics 队列指标

type QueuePolicy

type QueuePolicy int

QueuePolicy 队列策略

const (
	PolicyBlock     QueuePolicy = iota // 阻塞
	PolicyDrop                         // 丢弃
	PolicyOverwrite                    // 覆盖
)

type RecoveryHandler

type RecoveryHandler struct {
	MaxRetries    int                       // 最大重试次数
	RetryInterval time.Duration             // 重试间隔
	OnPanic       func(interface{}, []byte) // panic处理回调
	OnDisconnect  func(error)               // 断开连接处理
}

RecoveryHandler 处理panic恢复

type RetryConfig

type RetryConfig struct {
	MaxRetries int
	RetryDelay time.Duration
}

RetryConfig 定义重试配置

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL