Documentation
¶
Overview ¶
Package mq 提供消息队列抽象层。
支持的消息队列:
- Kafka
- RabbitMQ
- Memory(内存队列,用于测试)
核心功能:
- 统一的生产者/消费者接口
- 消息发布/订阅、异步处理
- 链路追踪和指标采集
使用示例:
client := kafka.New("events", cfg)
client.Publish(ctx, "topic", data)
client.Subscribe(ctx, "topic", handler)
Index ¶
- type Base
- func (b *Base) CompareAndSwapState(old, new State) bool
- func (b *Base) IncConsumed()
- func (b *Base) IncErrors()
- func (b *Base) IncPublished()
- func (b *Base) IncRetries()
- func (b *Base) Name() string
- func (b *Base) SetState(s State)
- func (b *Base) State() State
- func (b *Base) Stats() Stats
- func (b *Base) Type() Type
- type Builder
- type Client
- type ConsumeResult
- type Consumer
- type Handler
- type HealthStatus
- type Manager
- type Message
- type Metriced
- func (m *Metriced) Close() error
- func (m *Metriced) Connect(ctx context.Context) error
- func (m *Metriced) Name() string
- func (m *Metriced) Ping(ctx context.Context) error
- func (m *Metriced) Publish(ctx context.Context, topic string, value []byte, opts ...PublishOption) (*PublishResult, error)
- func (m *Metriced) PublishAsync(ctx context.Context, topic string, value []byte, callback func(*PublishResult, error), opts ...PublishOption)
- func (m *Metriced) SetConsumerLag(topic, group string, lag float64)
- func (m *Metriced) State() State
- func (m *Metriced) Stats() Stats
- func (m *Metriced) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) error
- func (m *Metriced) Type() Type
- func (m *Metriced) Unsubscribe(topic string) error
- func (m *Metriced) Unwrap() Client
- type Metrics
- type Producer
- type PublishOption
- type PublishOptions
- type PublishResult
- type State
- type Stats
- type SubscribeOption
- type SubscribeOptions
- type Traced
- func (t *Traced) Close() error
- func (t *Traced) Connect(ctx context.Context) error
- func (t *Traced) Name() string
- func (t *Traced) Ping(ctx context.Context) error
- func (t *Traced) Publish(ctx context.Context, topic string, value []byte, opts ...PublishOption) (*PublishResult, error)
- func (t *Traced) PublishAsync(ctx context.Context, topic string, value []byte, callback func(*PublishResult, error), opts ...PublishOption)
- func (t *Traced) State() State
- func (t *Traced) Stats() Stats
- func (t *Traced) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) error
- func (t *Traced) Type() Type
- func (t *Traced) Unsubscribe(topic string) error
- func (t *Traced) Unwrap() Client
- type Type
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Base ¶
type Base struct {
// contains filtered or unexported fields
}
Base 客户端基类
func (*Base) CompareAndSwapState ¶
func (b *Base) CompareAndSwapState(old, new State) bool
func (*Base) IncConsumed ¶
func (b *Base) IncConsumed()
func (*Base) IncPublished ¶
func (b *Base) IncPublished()
func (*Base) IncRetries ¶
func (b *Base) IncRetries()
type Client ¶
type Client interface {
Producer
Consumer
// Connect 建立连接
Connect(ctx context.Context) error
// Ping 健康检查
Ping(ctx context.Context) error
// Name 客户端名称
Name() string
// Type 队列类型
Type() Type
// State 连接状态
State() State
// Stats 统计信息
Stats() Stats
}
Client MQ 客户端接口(同时支持生产和消费)
type ConsumeResult ¶
type ConsumeResult struct {
Ack func() error
Nack func() error
Retry func(delay time.Duration) error
}
ConsumeResult 消费结果
type Consumer ¶
type Consumer interface {
// Subscribe 订阅主题
Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) error
// Unsubscribe 取消订阅
Unsubscribe(topic string) error
// Close 关闭消费者
Close() error
}
Consumer 消费者接口
type HealthStatus ¶
type HealthStatus struct {
Name string `json:"name"`
Type Type `json:"type"`
State string `json:"state"`
Healthy bool `json:"healthy"`
Latency time.Duration `json:"latency"`
Error string `json:"error,omitempty"`
CheckedAt time.Time `json:"checked_at"`
}
HealthStatus 健康状态
type Manager ¶
type Manager interface {
// Register 注册客户端
Register(client Client) error
// Get 获取客户端
Get(name string) (Client, bool)
// GetByType 按类型获取
GetByType(typ Type) []Client
// ConnectAll 连接所有客户端
ConnectAll(ctx context.Context) error
// CloseAll 关闭所有客户端
CloseAll(ctx context.Context) error
// HealthCheck 健康检查
HealthCheck(ctx context.Context) []HealthStatus
// List 列出所有客户端
List() []string
}
Manager MQ 管理器接口
type Message ¶
type Message struct {
ID string `json:"id"`
Topic string `json:"topic"`
Key string `json:"key,omitempty"`
Value []byte `json:"value"`
Headers map[string]string `json:"headers,omitempty"`
Timestamp time.Time `json:"timestamp"`
// 追踪信息
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
// Raw 原始消息(用于 Ack/Nack)
Raw any `json:"-"`
}
Message 消息
type Metriced ¶
type Metriced struct {
// contains filtered or unexported fields
}
Metriced 指标装饰器
func (*Metriced) Publish ¶
func (m *Metriced) Publish(ctx context.Context, topic string, value []byte, opts ...PublishOption) (*PublishResult, error)
func (*Metriced) PublishAsync ¶
func (m *Metriced) PublishAsync(ctx context.Context, topic string, value []byte, callback func(*PublishResult, error), opts ...PublishOption)
func (*Metriced) SetConsumerLag ¶
func (m *Metriced) SetConsumerLag(topic, group string, lag float64)
SetConsumerLag 设置消费延迟
func (*Metriced) Unsubscribe ¶
func (m *Metriced) Unsubscribe(topic string) error
type Metrics ¶
type Metrics struct {
MessagesTotal observability.Counter
MessageDuration observability.Histogram
MessageSize observability.Histogram
ConsumerLag observability.Gauge
}
Metrics MQ 指标
type Producer ¶
type Producer interface {
// Publish 发布消息
Publish(ctx context.Context, topic string, value []byte, opts ...PublishOption) (*PublishResult, error)
// PublishAsync 异步发布
PublishAsync(ctx context.Context, topic string, value []byte, callback func(*PublishResult, error), opts ...PublishOption)
// Close 关闭生产者
Close() error
}
Producer 生产者接口
type PublishOptions ¶
type PublishOptions struct {
Key string
Headers map[string]string
Partition *int32
Delay time.Duration
}
PublishOptions 发布选项结构
type PublishResult ¶
PublishResult 发布结果
type Stats ¶
type Stats struct {
Published int64 `json:"published"`
Consumed int64 `json:"consumed"`
Errors int64 `json:"errors"`
Retries int64 `json:"retries"`
PendingCount int64 `json:"pending_count"`
ConsumerCount int `json:"consumer_count"`
}
Stats 统计信息
type SubscribeOptions ¶
type SubscribeOptions struct {
Group string
Concurrency int
AutoAck bool
MaxRetries int
RetryDelay time.Duration
}
SubscribeOptions 订阅选项结构
func DefaultSubscribeOptions ¶
func DefaultSubscribeOptions() SubscribeOptions
DefaultSubscribeOptions 默认订阅选项
type Traced ¶
type Traced struct {
// contains filtered or unexported fields
}
Traced 追踪装饰器
func (*Traced) Publish ¶
func (t *Traced) Publish(ctx context.Context, topic string, value []byte, opts ...PublishOption) (*PublishResult, error)
func (*Traced) PublishAsync ¶
func (t *Traced) PublishAsync(ctx context.Context, topic string, value []byte, callback func(*PublishResult, error), opts ...PublishOption)
func (*Traced) Unsubscribe ¶
func (t *Traced) Unsubscribe(topic string) error
Click to show internal directories.
Click to hide internal directories.