Documentation
¶
Index ¶
- Variables
- type Handler
- type Job
- type JobStatus
- type Queue
- type QueueEventListener
- type QueueEventProvider
- type QueueManager
- func (m *QueueManager) AddQueue(name string, queue Queue) error
- func (m *QueueManager) DefaultQueue() Queue
- func (m *QueueManager) GetDefaultQueue() (Queue, error)
- func (m *QueueManager) GetDefaultQueueName() (string, error)
- func (m *QueueManager) GetQueue(name string) (Queue, error)
- func (m *QueueManager) HasQueue(name string) bool
- func (m *QueueManager) ListQueues() []string
- func (m *QueueManager) Push(ctx context.Context, jobName string, payload map[string]interface{}) (string, error)
- func (m *QueueManager) PushWithDelay(ctx context.Context, jobName string, payload map[string]interface{}, ...) (string, error)
- func (m *QueueManager) Register(jobName string, handler Handler)
- func (m *QueueManager) RegisterWithMiddleware(jobName string, handler Handler, middlewares ...QueueMiddleware)
- func (m *QueueManager) RemoveQueue(name string) error
- func (m *QueueManager) Schedule(ctx context.Context, jobName string, payload map[string]interface{}, ...) (string, error)
- func (m *QueueManager) SetDefaultQueue(name string) error
- type QueueMiddleware
Constants ¶
This section is empty.
Variables ¶
var ( ErrQueueNotFound = errors.New("queue: 队列不存在") ErrQueueAlreadyExists = errors.New("queue: 队列已存在") ErrJobNotFound = errors.New("queue: 任务不存在") ErrInvalidJobID = errors.New("queue: 无效的任务ID") ErrQueueFull = errors.New("queue: 队列已满") ErrInvalidPayload = errors.New("queue: 无效的任务负载") )
常见错误定义
var (
ErrDefaultQueueNotSet = errors.New("未设置默认队列")
)
队列管理器特有的错误定义
Functions ¶
This section is empty.
Types ¶
type Handler ¶
Handler 表示任务处理器
func ApplyMiddleware ¶
func ApplyMiddleware(handler Handler, middlewares ...QueueMiddleware) Handler
ApplyMiddleware 应用中间件到任务处理器
type Job ¶
type Job struct {
ID string `json:"id"` // 任务唯一标识
Queue string `json:"queue"` // 所属队列
Name string `json:"name"` // 任务名称
Payload map[string]interface{} `json:"payload"` // 任务负载数据
Attempts int `json:"attempts"` // 尝试次数
MaxRetries int `json:"max_retries"` // 最大重试次数
Status JobStatus `json:"status"` // 任务状态
CreatedAt time.Time `json:"created_at"` // 创建时间
UpdatedAt time.Time `json:"updated_at"` // 更新时间
ScheduledAt *time.Time `json:"scheduled_at,omitempty"` // 计划执行时间
StartedAt *time.Time `json:"started_at,omitempty"` // 开始执行时间
FinishedAt *time.Time `json:"finished_at,omitempty"` // 完成时间
Error string `json:"error,omitempty"` // 错误信息
}
Job 表示队列中的一个任务
func (*Job) GetPayloadValue ¶
GetPayloadValue 从任务负载中获取指定键的值,返回值和是否存在
type JobStatus ¶
type JobStatus string
JobStatus 表示任务的状态
const ( JobStatusPending JobStatus = "pending" // 等待执行 JobStatusScheduled JobStatus = "scheduled" // 已计划执行 JobStatusRunning JobStatus = "running" // 执行中 JobStatusCompleted JobStatus = "completed" // 已完成 JobStatusFailed JobStatus = "failed" // 执行失败 JobStatusRetrying JobStatus = "retrying" // 等待重试 JobStatusCancelled JobStatus = "cancelled" // 已取消 )
type Queue ¶
type Queue interface {
// Push 将任务推送到队列
Push(ctx context.Context, queueName string, jobName string, payload map[string]interface{}) (string, error)
// PushWithDelay 将任务推送到队列,延迟指定时间后执行
PushWithDelay(ctx context.Context, queueName string, jobName string, payload map[string]interface{}, delay time.Duration) (string, error)
// Schedule 计划一个任务在指定时间执行
Schedule(ctx context.Context, queueName string, jobName string, payload map[string]interface{}, scheduledAt time.Time) (string, error)
// Get 获取任务信息
Get(ctx context.Context, queueName string, jobID string) (*Job, error)
// Delete 删除任务
Delete(ctx context.Context, queueName string, jobID string) error
// Clear 清空队列
Clear(ctx context.Context, queueName string) error
// Size 获取队列大小
Size(ctx context.Context, queueName string) (int, error)
// Register 注册任务处理器
Register(jobName string, handler Handler)
// ProcessNext 处理队列中的下一个任务
ProcessNext(ctx context.Context, queueName string) error
// StartWorker 启动工作进程处理任务
StartWorker(ctx context.Context, queueName string, concurrency int) error
// StopWorker 停止工作进程
StopWorker(ctx context.Context, queueName string) error
// Retry 重试失败的任务
Retry(ctx context.Context, queueName string, jobID string) error
}
Queue 表示一个任务队列的抽象接口
type QueueEventListener ¶
type QueueEventListener struct {
// contains filtered or unexported fields
}
QueueEventListener 队列事件监听器,将事件转换为队列任务
func NewQueueEventListener ¶
func NewQueueEventListener(manager *QueueManager, queueName string) *QueueEventListener
NewQueueEventListener 创建一个新的队列事件监听器
func (*QueueEventListener) Handle ¶
func (l *QueueEventListener) Handle(evt event.Event) error
Handle 处理事件,将其转换为队列任务
func (*QueueEventListener) RegisterEventJob ¶
func (l *QueueEventListener) RegisterEventJob(eventName, jobName string)
RegisterEventJob 注册事件到任务的映射
func (*QueueEventListener) ShouldHandle ¶
func (l *QueueEventListener) ShouldHandle(evt event.Event) bool
ShouldHandle 判断是否应该处理此事件
type QueueEventProvider ¶
type QueueEventProvider struct {
// contains filtered or unexported fields
}
QueueEventProvider 将队列任务状态变化转换为事件
func NewQueueEventProvider ¶
func NewQueueEventProvider(dispatcher event.Dispatcher) *QueueEventProvider
NewQueueEventProvider 创建一个新的队列事件提供者
func (*QueueEventProvider) JobStatusChanged ¶
func (p *QueueEventProvider) JobStatusChanged(job *Job, oldStatus JobStatus)
JobStatusChanged 当任务状态改变时触发事件
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager 队列管理器实现
func (*QueueManager) AddQueue ¶
func (m *QueueManager) AddQueue(name string, queue Queue) error
AddQueue 添加队列到管理器
func (*QueueManager) GetDefaultQueue ¶
func (m *QueueManager) GetDefaultQueue() (Queue, error)
GetDefaultQueue 获取默认队列
func (*QueueManager) GetDefaultQueueName ¶
func (m *QueueManager) GetDefaultQueueName() (string, error)
GetDefaultQueueName 获取默认队列名称
func (*QueueManager) GetQueue ¶
func (m *QueueManager) GetQueue(name string) (Queue, error)
GetQueue 获取指定队列
func (*QueueManager) Push ¶
func (m *QueueManager) Push(ctx context.Context, jobName string, payload map[string]interface{}) (string, error)
Push 使用默认队列推送任务
func (*QueueManager) PushWithDelay ¶
func (m *QueueManager) PushWithDelay(ctx context.Context, jobName string, payload map[string]interface{}, delay time.Duration) (string, error)
PushWithDelay 使用默认队列延迟推送任务
func (*QueueManager) Register ¶
func (m *QueueManager) Register(jobName string, handler Handler)
Register 为所有队列注册同一个处理器
func (*QueueManager) RegisterWithMiddleware ¶
func (m *QueueManager) RegisterWithMiddleware(jobName string, handler Handler, middlewares ...QueueMiddleware)
RegisterWithMiddleware 使用中间件注册任务处理器
func (*QueueManager) RemoveQueue ¶
func (m *QueueManager) RemoveQueue(name string) error
RemoveQueue 从管理器删除队列
func (*QueueManager) Schedule ¶
func (m *QueueManager) Schedule(ctx context.Context, jobName string, payload map[string]interface{}, scheduledAt time.Time) (string, error)
Schedule 使用默认队列计划任务
func (*QueueManager) SetDefaultQueue ¶
func (m *QueueManager) SetDefaultQueue(name string) error
SetDefaultQueue 设置默认队列
type QueueMiddleware ¶
QueueMiddleware 队列中间件,为任务处理添加额外功能
func RetryMiddleware ¶
func RetryMiddleware(maxRetries int) QueueMiddleware
RetryMiddleware 创建一个处理重试的中间件