mq

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package mq 提供消息队列抽象层。

支持的消息队列:

  • Kafka
  • RabbitMQ
  • Memory(内存队列,用于测试)

核心功能:

  • 统一的生产者/消费者接口
  • 消息发布/订阅、异步处理
  • 链路追踪和指标采集

使用示例:

client := kafka.New("events", cfg)
client.Publish(ctx, "topic", data)
client.Subscribe(ctx, "topic", handler)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Base

type Base struct {
	// contains filtered or unexported fields
}

Base 客户端基类

func NewBase

func NewBase(name string, typ Type) *Base

NewBase 创建基类

func (*Base) CompareAndSwapState

func (b *Base) CompareAndSwapState(old, new State) bool

func (*Base) IncConsumed

func (b *Base) IncConsumed()

func (*Base) IncErrors

func (b *Base) IncErrors()

func (*Base) IncPublished

func (b *Base) IncPublished()

func (*Base) IncRetries

func (b *Base) IncRetries()

func (*Base) Name

func (b *Base) Name() string

func (*Base) SetState

func (b *Base) SetState(s State)

func (*Base) State

func (b *Base) State() State

func (*Base) Stats

func (b *Base) Stats() Stats

func (*Base) Type

func (b *Base) Type() Type

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder MQ 客户端构建器

func NewBuilder

func NewBuilder(c Client) *Builder

NewBuilder 创建构建器

func (*Builder) Build

func (b *Builder) Build() Client

Build 构建最终客户端(装饰器顺序:Traced -> Metriced)

func (*Builder) WithMetrics

func (b *Builder) WithMetrics(m *Metrics) *Builder

WithMetrics 启用指标

func (*Builder) WithTracing

func (b *Builder) WithTracing(tracer trace.Tracer) *Builder

WithTracing 启用追踪

type Client

type Client interface {
	Producer
	Consumer
	// Connect 建立连接
	Connect(ctx context.Context) error
	// Ping 健康检查
	Ping(ctx context.Context) error
	// Name 客户端名称
	Name() string
	// Type 队列类型
	Type() Type
	// State 连接状态
	State() State
	// Stats 统计信息
	Stats() Stats
}

Client MQ 客户端接口(同时支持生产和消费)

func Unwrap

func Unwrap(c Client) Client

Unwrap 解包装饰器获取原始客户端

type ConsumeResult

type ConsumeResult struct {
	Ack   func() error
	Nack  func() error
	Retry func(delay time.Duration) error
}

ConsumeResult 消费结果

type Consumer

type Consumer interface {
	// Subscribe 订阅主题
	Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) error
	// Unsubscribe 取消订阅
	Unsubscribe(topic string) error
	// Close 关闭消费者
	Close() error
}

Consumer 消费者接口

type Handler

type Handler func(ctx context.Context, msg *Message) error

Handler 消息处理器

type HealthStatus

type HealthStatus struct {
	Name      string        `json:"name"`
	Type      Type          `json:"type"`
	State     string        `json:"state"`
	Healthy   bool          `json:"healthy"`
	Latency   time.Duration `json:"latency"`
	Error     string        `json:"error,omitempty"`
	CheckedAt time.Time     `json:"checked_at"`
}

HealthStatus 健康状态

type Manager

type Manager interface {
	// Register 注册客户端
	Register(client Client) error
	// Get 获取客户端
	Get(name string) (Client, bool)
	// GetByType 按类型获取
	GetByType(typ Type) []Client
	// ConnectAll 连接所有客户端
	ConnectAll(ctx context.Context) error
	// CloseAll 关闭所有客户端
	CloseAll(ctx context.Context) error
	// HealthCheck 健康检查
	HealthCheck(ctx context.Context) []HealthStatus
	// List 列出所有客户端
	List() []string
}

Manager MQ 管理器接口

func NewManager

func NewManager() Manager

NewManager 创建 MQ 管理器

type Message

type Message struct {
	ID        string            `json:"id"`
	Topic     string            `json:"topic"`
	Key       string            `json:"key,omitempty"`
	Value     []byte            `json:"value"`
	Headers   map[string]string `json:"headers,omitempty"`
	Timestamp time.Time         `json:"timestamp"`

	// 追踪信息
	TraceID string `json:"trace_id,omitempty"`
	SpanID  string `json:"span_id,omitempty"`

	// Raw 原始消息(用于 Ack/Nack)
	Raw any `json:"-"`
}

Message 消息

type Metriced

type Metriced struct {
	// contains filtered or unexported fields
}

Metriced 指标装饰器

func NewMetriced

func NewMetriced(c Client, m *Metrics) *Metriced

NewMetriced 创建指标装饰器

func (*Metriced) Close

func (m *Metriced) Close() error

func (*Metriced) Connect

func (m *Metriced) Connect(ctx context.Context) error

func (*Metriced) Name

func (m *Metriced) Name() string

func (*Metriced) Ping

func (m *Metriced) Ping(ctx context.Context) error

func (*Metriced) Publish

func (m *Metriced) Publish(ctx context.Context, topic string, value []byte, opts ...PublishOption) (*PublishResult, error)

func (*Metriced) PublishAsync

func (m *Metriced) PublishAsync(ctx context.Context, topic string, value []byte, callback func(*PublishResult, error), opts ...PublishOption)

func (*Metriced) SetConsumerLag

func (m *Metriced) SetConsumerLag(topic, group string, lag float64)

SetConsumerLag 设置消费延迟

func (*Metriced) State

func (m *Metriced) State() State

func (*Metriced) Stats

func (m *Metriced) Stats() Stats

func (*Metriced) Subscribe

func (m *Metriced) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) error

func (*Metriced) Type

func (m *Metriced) Type() Type

func (*Metriced) Unsubscribe

func (m *Metriced) Unsubscribe(topic string) error

func (*Metriced) Unwrap

func (m *Metriced) Unwrap() Client

type Metrics

type Metrics struct {
	MessagesTotal   observability.Counter
	MessageDuration observability.Histogram
	MessageSize     observability.Histogram
	ConsumerLag     observability.Gauge
}

Metrics MQ 指标

func NewMetrics

func NewMetrics(p observability.MetricsProvider) *Metrics

NewMetrics 创建 MQ 指标

type Producer

type Producer interface {
	// Publish 发布消息
	Publish(ctx context.Context, topic string, value []byte, opts ...PublishOption) (*PublishResult, error)
	// PublishAsync 异步发布
	PublishAsync(ctx context.Context, topic string, value []byte, callback func(*PublishResult, error), opts ...PublishOption)
	// Close 关闭生产者
	Close() error
}

Producer 生产者接口

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption 发布选项

func WithDelay

func WithDelay(d time.Duration) PublishOption

WithDelay 延迟发送

func WithHeaders

func WithHeaders(headers map[string]string) PublishOption

WithHeaders 设置消息头

func WithKey

func WithKey(key string) PublishOption

WithKey 设置消息 Key

func WithPartition

func WithPartition(p int32) PublishOption

WithPartition 指定分区

type PublishOptions

type PublishOptions struct {
	Key       string
	Headers   map[string]string
	Partition *int32
	Delay     time.Duration
}

PublishOptions 发布选项结构

type PublishResult

type PublishResult struct {
	MessageID string
	Partition int32
	Offset    int64
}

PublishResult 发布结果

type State

type State int32

State 连接状态

const (
	StateDisconnected State = iota
	StateConnecting
	StateConnected
	StateDisconnecting
)

func (State) String

func (s State) String() string

type Stats

type Stats struct {
	Published     int64 `json:"published"`
	Consumed      int64 `json:"consumed"`
	Errors        int64 `json:"errors"`
	Retries       int64 `json:"retries"`
	PendingCount  int64 `json:"pending_count"`
	ConsumerCount int   `json:"consumer_count"`
}

Stats 统计信息

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption 订阅选项

func WithAutoAck

func WithAutoAck(auto bool) SubscribeOption

WithAutoAck 自动确认

func WithConcurrency

func WithConcurrency(n int) SubscribeOption

WithConcurrency 设置并发数

func WithGroup

func WithGroup(group string) SubscribeOption

WithGroup 设置消费组

func WithMaxRetries

func WithMaxRetries(n int) SubscribeOption

WithMaxRetries 最大重试次数

func WithRetryDelay

func WithRetryDelay(d time.Duration) SubscribeOption

WithRetryDelay 重试延迟

type SubscribeOptions

type SubscribeOptions struct {
	Group       string
	Concurrency int
	AutoAck     bool
	MaxRetries  int
	RetryDelay  time.Duration
}

SubscribeOptions 订阅选项结构

func DefaultSubscribeOptions

func DefaultSubscribeOptions() SubscribeOptions

DefaultSubscribeOptions 默认订阅选项

type Traced

type Traced struct {
	// contains filtered or unexported fields
}

Traced 追踪装饰器

func NewTraced

func NewTraced(c Client, tracer trace.Tracer) *Traced

NewTraced 创建追踪装饰器

func (*Traced) Close

func (t *Traced) Close() error

func (*Traced) Connect

func (t *Traced) Connect(ctx context.Context) error

func (*Traced) Name

func (t *Traced) Name() string

func (*Traced) Ping

func (t *Traced) Ping(ctx context.Context) error

func (*Traced) Publish

func (t *Traced) Publish(ctx context.Context, topic string, value []byte, opts ...PublishOption) (*PublishResult, error)

func (*Traced) PublishAsync

func (t *Traced) PublishAsync(ctx context.Context, topic string, value []byte, callback func(*PublishResult, error), opts ...PublishOption)

func (*Traced) State

func (t *Traced) State() State

func (*Traced) Stats

func (t *Traced) Stats() Stats

func (*Traced) Subscribe

func (t *Traced) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) error

func (*Traced) Type

func (t *Traced) Type() Type

func (*Traced) Unsubscribe

func (t *Traced) Unsubscribe(topic string) error

func (*Traced) Unwrap

func (t *Traced) Unwrap() Client

Unwrap 获取底层客户端

type Type

type Type string

Type 消息队列类型

const (
	TypeKafka    Type = "kafka"
	TypeRabbitMQ Type = "rabbitmq"
	TypeRocketMQ Type = "rocketmq"
	TypePulsar   Type = "pulsar"
	TypeNSQ      Type = "nsq"
	TypeRedis    Type = "redis"
	TypeMemory   Type = "memory"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL