mq

package
v0.0.0-...-e9c6a42 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotSupported     = errors.New("mq: operation not supported")
	ErrConnectionClosed = errors.New("mq: connection closed")
	ErrNoMessage        = errors.New("mq: no message available")
)

基础错误定义

Functions

This section is empty.

Types

type Connection

type Connection interface {
	// NewProducer 创建生产者,topic 为默认主题/队列名
	NewProducer(ctx context.Context, topic string) (Producer, error)
	// NewConsumer 创建消费者,可指定消费组等配置
	NewConsumer(ctx context.Context, topic string, options ...ConsumerOption) (Consumer, error)
	// Close 关闭连接
	io.Closer
	// Ping 检查连接是否存活
	Ping(ctx context.Context) error
}

Connection 代表与消息中间件的连接,可创建生产者和消费者

func NewMemoryConnection

func NewMemoryConnection(bufferSize int) Connection

func NewNATSConnection

func NewNATSConnection(url string, opts ...nats.Option) (Connection, error)

NewNATSConnection 创建一个新的 NATS JetStream 连接

type Consumer

type Consumer interface {
	// Consume 开始消费消息,通过回调函数处理每条消息
	// handler 返回 nil 表示处理成功,将自动 Ack;返回 error 表示失败,将 Nack
	// 该方法应阻塞直到上下文取消或发生致命错误
	Consume(ctx context.Context, handler func(ctx context.Context, msg *Message) error) error
	// Close 停止消费并释放资源
	io.Closer
}

Consumer 定义消息消费者接口

type ConsumerConfig

type ConsumerConfig struct {
	Group         string        // 消费组名称
	AutoAck       bool          // 是否自动确认
	PrefetchCount int           // 预取数量
	MaxRetries    int           // 最大重试次数
	PollTimeout   time.Duration // 轮询超时
}

type ConsumerOption

type ConsumerOption func(*ConsumerConfig)

ConsumerOption 用于定制消费者行为

func WithAutoAck

func WithAutoAck(enabled bool) ConsumerOption

func WithConsumerGroup

func WithConsumerGroup(group string) ConsumerOption

func WithPollTimeout

func WithPollTimeout(d time.Duration) ConsumerOption

func WithPrefetchCount

func WithPrefetchCount(n int) ConsumerOption

type Message

type Message struct {
	// ID 消息唯一标识,可能由中间件生成
	ID string
	// Key 路由键或分区键
	Key string
	// Payload 消息体
	Payload []byte
	// Attributes 扩展属性,如 headers
	Attributes map[string]string
	// Timestamp 消息产生时间
	Timestamp time.Time
	// AckFunc 确认函数,调用表示消息处理成功
	AckFunc func() error
	// NackFunc 否定确认,可要求重试
	NackFunc func(requeue bool) error
}

Message 表示一条消息,包含通用元数据和负载

type Producer

type Producer interface {
	// Send 发送单条消息,同步等待确认
	Send(ctx context.Context, msg *Message) error
	// SendBatch 批量发送,提高吞吐
	SendBatch(ctx context.Context, msgs []*Message) error
	// Close 释放生产者资源
	io.Closer
}

Producer 定义消息生产者接口

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL