Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrNotSupported = errors.New("mq: operation not supported") ErrConnectionClosed = errors.New("mq: connection closed") ErrNoMessage = errors.New("mq: no message available") )
基础错误定义
Functions ¶
This section is empty.
Types ¶
type Connection ¶
type Connection interface {
// NewProducer 创建生产者,topic 为默认主题/队列名
NewProducer(ctx context.Context, topic string) (Producer, error)
// NewConsumer 创建消费者,可指定消费组等配置
NewConsumer(ctx context.Context, topic string, options ...ConsumerOption) (Consumer, error)
// Close 关闭连接
io.Closer
// Ping 检查连接是否存活
Ping(ctx context.Context) error
}
Connection 代表与消息中间件的连接,可创建生产者和消费者
func NewMemoryConnection ¶
func NewMemoryConnection(bufferSize int) Connection
func NewNATSConnection ¶
func NewNATSConnection(url string, opts ...nats.Option) (Connection, error)
NewNATSConnection 创建一个新的 NATS JetStream 连接
type Consumer ¶
type Consumer interface {
// Consume 开始消费消息,通过回调函数处理每条消息
// handler 返回 nil 表示处理成功,将自动 Ack;返回 error 表示失败,将 Nack
// 该方法应阻塞直到上下文取消或发生致命错误
Consume(ctx context.Context, handler func(ctx context.Context, msg *Message) error) error
// Close 停止消费并释放资源
io.Closer
}
Consumer 定义消息消费者接口
type ConsumerConfig ¶
type ConsumerOption ¶
type ConsumerOption func(*ConsumerConfig)
ConsumerOption 用于定制消费者行为
func WithAutoAck ¶
func WithAutoAck(enabled bool) ConsumerOption
func WithConsumerGroup ¶
func WithConsumerGroup(group string) ConsumerOption
func WithPollTimeout ¶
func WithPollTimeout(d time.Duration) ConsumerOption
func WithPrefetchCount ¶
func WithPrefetchCount(n int) ConsumerOption
type Message ¶
type Message struct {
// ID 消息唯一标识,可能由中间件生成
ID string
// Key 路由键或分区键
Key string
// Payload 消息体
Payload []byte
// Attributes 扩展属性,如 headers
Attributes map[string]string
// Timestamp 消息产生时间
Timestamp time.Time
// AckFunc 确认函数,调用表示消息处理成功
AckFunc func() error
// NackFunc 否定确认,可要求重试
NackFunc func(requeue bool) error
}
Message 表示一条消息,包含通用元数据和负载
Click to show internal directories.
Click to hide internal directories.