Documentation
¶
Index ¶
- Variables
- func ConsumeQueue(flowClient client.Client, conn *rabbitmq.Conn, queue string, ...) error
- func ConsumeQueueDefault(flowClient client.Client, conn *rabbitmq.Conn, queue string, concurrency int, ...) error
- func CreateNamespace(hostPort, namespace string, retention time.Duration) error
- func PublishQueueOnce(conn *rabbitmq.Conn, job *QueueJob, queue string) error
- func WithConsumerOption(opt func(*rabbitmq.ConsumerOptions)) func(*ConsumerOptions)
- func WithPublisherOption(opt func(*rabbitmq.PublisherOptions)) func(*PublisherOptions)
- type ConsumerOptions
- type PublisherOptions
- type QueueJob
- type QueuePublisher
- type ScheduleConfig
- type ScheduleRegistry
Constants ¶
This section is empty.
Variables ¶
var DefaultTaskQueue = "default"
DefaultTaskQueue 默认任务队列名称
Functions ¶
func ConsumeQueue ¶
func ConsumeQueue(flowClient client.Client, conn *rabbitmq.Conn, queue string, optionFuncs ...func(*ConsumerOptions)) error
ConsumeQueue 消费RabbitMQ队列并执行Temporal工作流(纯净版,无默认配置)
func ConsumeQueueDefault ¶
func ConsumeQueueDefault(flowClient client.Client, conn *rabbitmq.Conn, queue string, concurrency int, optionFuncs ...func(*ConsumerOptions)) error
ConsumeQueueDefault 消费RabbitMQ队列并执行Temporal工作流(带默认配置)
func CreateNamespace ¶ added in v0.4.0
CreateNamespace 创建 Temporal 命名空间 如果 namespace 为 "default" 或空字符串,则跳过创建 retention 为工作流执行保留时间,传 0 则默认 7 天
func PublishQueueOnce ¶
PublishQueueOnce 便捷函数:创建发布器、发布任务、自动关闭
func WithConsumerOption ¶
func WithConsumerOption(opt func(*rabbitmq.ConsumerOptions)) func(*ConsumerOptions)
WithConsumerOption 添加原生 rabbitmq.ConsumerOptions 配置
func WithPublisherOption ¶
func WithPublisherOption(opt func(*rabbitmq.PublisherOptions)) func(*PublisherOptions)
WithPublisherOption 添加原生 rabbitmq.PublisherOptions 配置
Types ¶
type ConsumerOptions ¶
type ConsumerOptions struct {
// contains filtered or unexported fields
}
ConsumerOptions 消费者配置选项
type PublisherOptions ¶
type PublisherOptions struct {
// contains filtered or unexported fields
}
PublisherOptions 发布器配置选项
type QueueJob ¶
type QueueJob struct {
// Id 工作流唯一ID
Id string `json:"id"`
// ID 与日期排重, 如果设置此项,则消费进程会将当前执行时候的日期附加到工作流ID后面,格式由 IdDateFormat 指定,如 "2006-01-02"
IdDateFormat string `json:"id_date_format"`
// Name 工作流名称
Name string `json:"name"`
// Arg 工作流参数
Arg any `json:"arg"`
// TaskQueue Temporal任务队列名称
TaskQueue string `json:"task_queue"`
StartDelay time.Duration `json:"start_delay"`
WorkflowExecutionTimeout time.Duration `json:"execution_timeout"`
WorkflowIDReusePolicy enums.WorkflowIdReusePolicy `json:"reuse_policy"`
WorkflowIDConflictPolicy enums.WorkflowIdConflictPolicy `json:"conflict_policy"`
WorkflowExecutionErrorWhenAlreadyStarted bool `json:"already_started"`
RetryPolicy *temporal.RetryPolicy `json:"retry_policy"`
// 是否异步执行,异步执行不会等待工作流完成,启动工作流后队列就会继续执行下一个任务
Async bool `json:"async"`
}
QueueJob 队列工作流任务定义
func (*QueueJob) GetRetryPolicy ¶
func (j *QueueJob) GetRetryPolicy() *temporal.RetryPolicy
GetRetryPolicy 获取重试策略
type QueuePublisher ¶
type QueuePublisher struct {
// contains filtered or unexported fields
}
QueuePublisher 队列工作流发布器
func NewQueuePublisher ¶
func NewQueuePublisher(conn *rabbitmq.Conn, optionFuncs ...func(*PublisherOptions)) (*QueuePublisher, error)
NewQueuePublisher 创建队列发布器
func (*QueuePublisher) Publish ¶
func (p *QueuePublisher) Publish(job *QueueJob, queue string) error
Publish 发布工作流任务到RabbitMQ队列
func (*QueuePublisher) PublishOnce ¶
func (p *QueuePublisher) PublishOnce(job *QueueJob, queue string) error
PublishOnce 发布一次后自动关闭
type ScheduleConfig ¶
type ScheduleConfig struct {
// Name 任务名称,同时作为 Schedule ID
Name string
// Cron cron 表达式列表
Cron []string
// Workflow 工作流名称
Workflow string
// TaskQueue 任务队列名称
TaskQueue string
// Args 工作流参数
Args []any
// Overlap 重叠策略,默认 SKIP
Overlap enums.ScheduleOverlapPolicy
}
ScheduleConfig 定时任务配置
func (*ScheduleConfig) ToScheduleOptions ¶
func (c *ScheduleConfig) ToScheduleOptions() client.ScheduleOptions
ToScheduleOptions 转换为 Temporal ScheduleOptions
type ScheduleRegistry ¶
type ScheduleRegistry struct {
// contains filtered or unexported fields
}
ScheduleRegistry 定时任务注册器
func (*ScheduleRegistry) Add ¶
func (r *ScheduleRegistry) Add(config ScheduleConfig) *ScheduleRegistry
Add 添加定时任务配置
func (*ScheduleRegistry) AddMany ¶
func (r *ScheduleRegistry) AddMany(configs ...ScheduleConfig) *ScheduleRegistry
AddMany 批量添加定时任务配置