kitflow

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultTaskQueue = "default"

DefaultTaskQueue 默认任务队列名称

Functions

func ConsumeQueue

func ConsumeQueue(flowClient client.Client, conn *rabbitmq.Conn, queue string, optionFuncs ...func(*ConsumerOptions)) error

ConsumeQueue 消费RabbitMQ队列并执行Temporal工作流(纯净版,无默认配置)

func ConsumeQueueDefault

func ConsumeQueueDefault(flowClient client.Client, conn *rabbitmq.Conn, queue string, concurrency int, optionFuncs ...func(*ConsumerOptions)) error

ConsumeQueueDefault 消费RabbitMQ队列并执行Temporal工作流(带默认配置)

func CreateNamespace added in v0.4.0

func CreateNamespace(hostPort, namespace string, retention time.Duration) error

CreateNamespace 创建 Temporal 命名空间 如果 namespace 为 "default" 或空字符串,则跳过创建 retention 为工作流执行保留时间,传 0 则默认 7 天

func PublishQueueOnce

func PublishQueueOnce(conn *rabbitmq.Conn, job *QueueJob, queue string) error

PublishQueueOnce 便捷函数:创建发布器、发布任务、自动关闭

func WithConsumerOption

func WithConsumerOption(opt func(*rabbitmq.ConsumerOptions)) func(*ConsumerOptions)

WithConsumerOption 添加原生 rabbitmq.ConsumerOptions 配置

func WithPublisherOption

func WithPublisherOption(opt func(*rabbitmq.PublisherOptions)) func(*PublisherOptions)

WithPublisherOption 添加原生 rabbitmq.PublisherOptions 配置

Types

type ConsumerOptions

type ConsumerOptions struct {
	// contains filtered or unexported fields
}

ConsumerOptions 消费者配置选项

type PublisherOptions

type PublisherOptions struct {
	// contains filtered or unexported fields
}

PublisherOptions 发布器配置选项

type QueueJob

type QueueJob struct {
	// Id 工作流唯一ID
	Id string `json:"id"`
	// ID 与日期排重, 如果设置此项,则消费进程会将当前执行时候的日期附加到工作流ID后面,格式由 IdDateFormat 指定,如 "2006-01-02"
	IdDateFormat string `json:"id_date_format"`
	// Name 工作流名称
	Name string `json:"name"`
	// Arg 工作流参数
	Arg any `json:"arg"`
	// TaskQueue Temporal任务队列名称
	TaskQueue                                string                         `json:"task_queue"`
	StartDelay                               time.Duration                  `json:"start_delay"`
	WorkflowExecutionTimeout                 time.Duration                  `json:"execution_timeout"`
	WorkflowIDReusePolicy                    enums.WorkflowIdReusePolicy    `json:"reuse_policy"`
	WorkflowIDConflictPolicy                 enums.WorkflowIdConflictPolicy `json:"conflict_policy"`
	WorkflowExecutionErrorWhenAlreadyStarted bool                           `json:"already_started"`
	RetryPolicy                              *temporal.RetryPolicy          `json:"retry_policy"`
	// 是否异步执行,异步执行不会等待工作流完成,启动工作流后队列就会继续执行下一个任务
	Async bool `json:"async"`
}

QueueJob 队列工作流任务定义

func (*QueueJob) GetRetryPolicy

func (j *QueueJob) GetRetryPolicy() *temporal.RetryPolicy

GetRetryPolicy 获取重试策略

func (*QueueJob) Validate

func (j *QueueJob) Validate() error

Validate 校验任务配置

type QueuePublisher

type QueuePublisher struct {
	// contains filtered or unexported fields
}

QueuePublisher 队列工作流发布器

func NewQueuePublisher

func NewQueuePublisher(conn *rabbitmq.Conn, optionFuncs ...func(*PublisherOptions)) (*QueuePublisher, error)

NewQueuePublisher 创建队列发布器

func (*QueuePublisher) Close

func (p *QueuePublisher) Close()

Close 关闭发布器

func (*QueuePublisher) Publish

func (p *QueuePublisher) Publish(job *QueueJob, queue string) error

Publish 发布工作流任务到RabbitMQ队列

func (*QueuePublisher) PublishOnce

func (p *QueuePublisher) PublishOnce(job *QueueJob, queue string) error

PublishOnce 发布一次后自动关闭

type ScheduleConfig

type ScheduleConfig struct {
	// Name 任务名称,同时作为 Schedule ID
	Name string
	// Cron cron 表达式列表
	Cron []string
	// Workflow 工作流名称
	Workflow string
	// TaskQueue 任务队列名称
	TaskQueue string
	// Args 工作流参数
	Args []any
	// Overlap 重叠策略,默认 SKIP
	Overlap enums.ScheduleOverlapPolicy
}

ScheduleConfig 定时任务配置

func (*ScheduleConfig) ToScheduleOptions

func (c *ScheduleConfig) ToScheduleOptions() client.ScheduleOptions

ToScheduleOptions 转换为 Temporal ScheduleOptions

type ScheduleRegistry

type ScheduleRegistry struct {
	// contains filtered or unexported fields
}

ScheduleRegistry 定时任务注册器

func NewScheduleRegistry

func NewScheduleRegistry() *ScheduleRegistry

NewScheduleRegistry 创建注册器

func (*ScheduleRegistry) Add

Add 添加定时任务配置

func (*ScheduleRegistry) AddMany

func (r *ScheduleRegistry) AddMany(configs ...ScheduleConfig) *ScheduleRegistry

AddMany 批量添加定时任务配置

func (*ScheduleRegistry) Register

func (r *ScheduleRegistry) Register(cli client.Client, closeClient bool)

Register 注册所有定时任务到 Temporal closeClient: 是否在注册完成后关闭客户端

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL