mq

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Handler

// Handler is the message handling function type: it receives a context and
// the message, and returns an error.
type Handler func(ctx context.Context, msg *Message) error

Handler 消息处理函数

type Message

// Message is the message structure. The json struct tag on each field names
// the key used for that field when the struct is encoded as JSON.
type Message struct {
	ID        string            `json:"id"`        // message ID
	Topic     string            `json:"topic"`     // topic
	Payload   []byte            `json:"payload"`   // message content
	Headers   map[string]string `json:"headers"`   // message headers
	Timestamp time.Time         `json:"timestamp"` // timestamp
	ReplyTo   string            `json:"reply_to"`  // reply topic
}

Message 消息结构

func NewMessage

func NewMessage(topic string, payload []byte) *Message

NewMessage 创建消息

func (*Message) GetHeader

func (m *Message) GetHeader(key string) string

GetHeader 获取消息头

func (*Message) SetHeader

func (m *Message) SetHeader(key, value string) *Message

SetHeader 设置消息头

type MessageQueue

// MessageQueue is the message queue interface.
type MessageQueue interface {
	// Publish publishes a message.
	Publish(ctx context.Context, topic string, msg *Message) error
	// Subscribe subscribes to a topic.
	Subscribe(topic string, handler Handler) error
	// QueueSubscribe is a queue subscription (load balancing).
	QueueSubscribe(topic, queue string, handler Handler) error
	// Request implements the request-response pattern.
	Request(ctx context.Context, topic string, msg *Message, timeout time.Duration) (*Message, error)
	// Unsubscribe cancels a subscription.
	Unsubscribe(topic string) error
	// Close closes the connection.
	Close() error
	// Ping checks the connection.
	Ping(ctx context.Context) error
}

MessageQueue 消息队列接口

type NATSClient

// NATSClient is the NATS message queue client.
type NATSClient struct {
	// contains filtered or unexported fields
}

NATSClient NATS消息队列客户端

func NewNATSClient

func NewNATSClient(opts ...NATSOption) (*NATSClient, error)

NewNATSClient 创建NATS客户端

func (*NATSClient) Close

func (c *NATSClient) Close() error

Close 关闭连接

func (*NATSClient) Conn

func (c *NATSClient) Conn() *nats.Conn

Conn 获取原始NATS连接

func (*NATSClient) Flush

func (c *NATSClient) Flush() error

Flush 刷新缓冲

func (*NATSClient) FlushTimeout

func (c *NATSClient) FlushTimeout(timeout time.Duration) error

FlushTimeout 带超时的刷新

func (*NATSClient) Ping

func (c *NATSClient) Ping(ctx context.Context) error

Ping 检查连接

func (*NATSClient) Publish

func (c *NATSClient) Publish(ctx context.Context, topic string, msg *Message) error

Publish 发布消息

func (*NATSClient) PublishRaw

func (c *NATSClient) PublishRaw(ctx context.Context, topic string, data []byte) error

PublishRaw 发布原始数据

func (*NATSClient) QueueSubscribe

func (c *NATSClient) QueueSubscribe(topic, queue string, handler Handler) error

QueueSubscribe 队列订阅

func (*NATSClient) Request

func (c *NATSClient) Request(ctx context.Context, topic string, msg *Message, timeout time.Duration) (*Message, error)

Request 请求响应模式

func (*NATSClient) Subscribe

func (c *NATSClient) Subscribe(topic string, handler Handler) error

Subscribe 订阅主题

func (*NATSClient) Unsubscribe

func (c *NATSClient) Unsubscribe(topic string) error

Unsubscribe 取消订阅

type NATSOption

// NATSOption is a NATS option: a function that receives the *NATSClient.
// NewNATSClient accepts a variadic list of these options.
type NATSOption func(*NATSClient)

NATSOption NATS选项

func WithNATSConfig

func WithNATSConfig(cfg *config.NATSConfig) NATSOption

WithNATSConfig 设置NATS配置

func WithNATSLogger

func WithNATSLogger(logger log.Logger) NATSOption

WithNATSLogger 设置日志

type Subscription

// Subscription holds subscription information.
type Subscription struct {
	// Topic is presumably the subscribed topic — confirm against usage.
	Topic   string
	// Queue is presumably the queue name passed to QueueSubscribe — TODO
	// confirm, including what value a plain Subscribe stores here.
	Queue   string
	// Handler is the message handling function for this subscription.
	Handler Handler
}

Subscription 订阅信息

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL