Documentation
¶
Overview ¶
Package transfer 提供生产者端的客户端 SDK。
用于管理 JetStream 流和向收集器发布 BSON 数据。
基本用法:
// 创建客户端
t, _ := transfer.New(ctx, "alpha", nc)
// 注册流
t.Add(ctx, transfer.Option{
Key: "metrics",
Collection: "metrics",
})
// 发布数据
t.Send("metrics", bson.M{"cpu": 0.42})
// 移除流
t.Remove(ctx, "metrics")
Index ¶
- type Option
- type State
- type Transfer
- func (x *Transfer) Add(ctx context.Context, option Option) (err error)
- func (x *Transfer) Get(ctx context.Context, key string) (option *Option, err error)
- func (x *Transfer) Remove(ctx context.Context, key string) (err error)
- func (x *Transfer) Send(key string, data any) (err error)
- func (x *Transfer) StreamName(key string) string
- func (x *Transfer) SubName(key string) string
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option struct {
// Key 流的唯一标识符,用于生成流名称和主题
Key string `json:"key"`
// Subs 额外订阅的主题列表(可选)
// 允许流接收来自其他主题的消息
Subs []string `json:"subs"`
// Description 流的描述信息
Description string `json:"description"`
// Collection MongoDB 目标集合名称
// 为空时使用 Key 作为集合名称
Collection string `json:"collection"`
// State 收集器运行时状态(仅 Get 方法返回)
*State
}
Option 定义流配置选项。
type State ¶
type State struct {
// BufferSize 收集器缓冲区中待写入的消息数量
BufferSize int `json:"buffer_size"`
}
State 表示收集器的运行时状态。
type Transfer ¶
type Transfer struct {
// Namespace 应用命名空间,用于流和主题命名
Namespace string
// Nc NATS 连接
Nc *nats.Conn
// Js JetStream 上下文
Js jetstream.JetStream
// Kv 命名空间 KV 存储桶
Kv jetstream.KeyValue
}
Transfer 是生产者端的客户端 SDK。
提供流管理和消息发布功能:
- Add: 创建/更新流和消费者
- Send: 发布 BSON 数据到流
- Get: 查询流配置和收集器状态
- Remove: 删除流配置
func New ¶
func New(ctx context.Context, namespace string, nc *nats.Conn, opts ...jetstream.JetStreamOpt) (x *Transfer, err error)
New 创建绑定到命名空间 KV 存储桶的 Transfer 实例。
参数:
- ctx: 上下文
- namespace: 命名空间(不能包含连字符 '-')
- nc: NATS 连接
- opts: JetStream 选项(可选)
注意:命名空间必须与收集器配置的命名空间一致。 KV 存储桶必须已存在(由收集器或手动创建)。
func (*Transfer) Add ¶
Add 创建或更新流并将配置持久化到 KV。
执行流程:
- 创建/更新 JetStream 流(WorkQueue 保留策略)
- 创建/更新 durable 消费者(显式 ACK 策略)
- 将配置写入 KV
收集器通过监听 KV 变更自动订阅新流。
流配置:
- Name: {namespace}_{key}
- Subjects: [{namespace}.{key}, ...option.Subs]
- Retention: WorkQueuePolicy(消息 ACK 后删除)
消费者配置:
- Durable: "default"
- AckPolicy: AckExplicitPolicy(需要显式 ACK)
func (*Transfer) Get ¶
Get 获取流配置和收集器状态。
执行流程:
- 从 KV 读取流配置
- 向收集器发送状态查询请求
- 合并配置和状态返回
如果收集器未运行或未订阅该流,状态查询会超时。
func (*Transfer) Remove ¶
Remove 从 KV 删除流配置。
收集器监听 KV 变更,会自动:
- 停止收集器
- 删除 JetStream 流
注意:这会删除流中所有未消费的消息。
func (*Transfer) Send ¶
Send 发布 BSON 编码的数据到指定流。
数据会被序列化为 BSON 格式后发布到 {namespace}.{key} 主题。 使用异步发布,调用返回不代表消息已持久化。
如需确认发布完成,可使用:
<-t.Js.PublishAsyncComplete()
func (*Transfer) StreamName ¶
StreamName 根据 key 生成 JetStream 流名称。
格式:{namespace}_{key} 示例:alpha_metrics
Click to show internal directories.
Click to hide internal directories.