Documentation
¶
Index ¶
- Variables
- func MarshalPayload(payload interface{}) (json.RawMessage, error)
- func RegisterQueueFactory(name string, factory Factory)
- func UnmarshalPayload(data json.RawMessage, v interface{}) error
- type CallbackProcessor
- func (p *CallbackProcessor) GetRegisteredHandlerTypes() map[TaskType]bool
- func (p *CallbackProcessor) HandleCallback(ctx context.Context, req *CallbackRequest) (*CallbackResponse, error)
- func (p *CallbackProcessor) ProcessCallback(ctx context.Context, callbackData []byte) error
- func (p *CallbackProcessor) RegisterDefaultHandlers(queue Queue)
- func (p *CallbackProcessor) RegisterHandler(taskType TaskType, handler TaskCallbackHandler)
- type CallbackRequest
- type CallbackResponse
- type ChunkInfo
- type Config
- type DocumentParsePayload
- type DocumentParseResult
- type Factory
- type Handler
- type MockQueue
- func (_m *MockQueue) Close() error
- func (_m *MockQueue) DeleteTask(ctx context.Context, taskID string) error
- func (_m *MockQueue) EXPECT() *MockQueue_Expecter
- func (_m *MockQueue) Enqueue(ctx context.Context, taskType TaskType, documentID string, payload interface{}) (string, error)
- func (_m *MockQueue) EnqueueAt(ctx context.Context, taskType TaskType, documentID string, payload interface{}, ...) (string, error)
- func (_m *MockQueue) EnqueueIn(ctx context.Context, taskType TaskType, documentID string, payload interface{}, ...) (string, error)
- func (_m *MockQueue) GetTask(ctx context.Context, taskID string) (*Task, error)
- func (_m *MockQueue) GetTasksByDocument(ctx context.Context, documentID string) ([]*Task, error)
- func (_m *MockQueue) NotifyTaskUpdate(ctx context.Context, taskID string) error
- func (_m *MockQueue) UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus, result interface{}, ...) error
- func (_m *MockQueue) WaitForTask(ctx context.Context, taskID string, timeout time.Duration) (*Task, error)
- type MockQueue_Close_Call
- type MockQueue_DeleteTask_Call
- func (_c *MockQueue_DeleteTask_Call) Return(_a0 error) *MockQueue_DeleteTask_Call
- func (_c *MockQueue_DeleteTask_Call) Run(run func(ctx context.Context, taskID string)) *MockQueue_DeleteTask_Call
- func (_c *MockQueue_DeleteTask_Call) RunAndReturn(run func(context.Context, string) error) *MockQueue_DeleteTask_Call
- type MockQueue_EnqueueAt_Call
- type MockQueue_EnqueueIn_Call
- type MockQueue_Enqueue_Call
- type MockQueue_Expecter
- func (_e *MockQueue_Expecter) Close() *MockQueue_Close_Call
- func (_e *MockQueue_Expecter) DeleteTask(ctx interface{}, taskID interface{}) *MockQueue_DeleteTask_Call
- func (_e *MockQueue_Expecter) Enqueue(ctx interface{}, taskType interface{}, documentID interface{}, ...) *MockQueue_Enqueue_Call
- func (_e *MockQueue_Expecter) EnqueueAt(ctx interface{}, taskType interface{}, documentID interface{}, ...) *MockQueue_EnqueueAt_Call
- func (_e *MockQueue_Expecter) EnqueueIn(ctx interface{}, taskType interface{}, documentID interface{}, ...) *MockQueue_EnqueueIn_Call
- func (_e *MockQueue_Expecter) GetTask(ctx interface{}, taskID interface{}) *MockQueue_GetTask_Call
- func (_e *MockQueue_Expecter) GetTasksByDocument(ctx interface{}, documentID interface{}) *MockQueue_GetTasksByDocument_Call
- func (_e *MockQueue_Expecter) NotifyTaskUpdate(ctx interface{}, taskID interface{}) *MockQueue_NotifyTaskUpdate_Call
- func (_e *MockQueue_Expecter) UpdateTaskStatus(ctx interface{}, taskID interface{}, status interface{}, result interface{}, ...) *MockQueue_UpdateTaskStatus_Call
- func (_e *MockQueue_Expecter) WaitForTask(ctx interface{}, taskID interface{}, timeout interface{}) *MockQueue_WaitForTask_Call
- type MockQueue_GetTask_Call
- func (_c *MockQueue_GetTask_Call) Return(_a0 *Task, _a1 error) *MockQueue_GetTask_Call
- func (_c *MockQueue_GetTask_Call) Run(run func(ctx context.Context, taskID string)) *MockQueue_GetTask_Call
- func (_c *MockQueue_GetTask_Call) RunAndReturn(run func(context.Context, string) (*Task, error)) *MockQueue_GetTask_Call
- type MockQueue_GetTasksByDocument_Call
- func (_c *MockQueue_GetTasksByDocument_Call) Return(_a0 []*Task, _a1 error) *MockQueue_GetTasksByDocument_Call
- func (_c *MockQueue_GetTasksByDocument_Call) Run(run func(ctx context.Context, documentID string)) *MockQueue_GetTasksByDocument_Call
- func (_c *MockQueue_GetTasksByDocument_Call) RunAndReturn(run func(context.Context, string) ([]*Task, error)) *MockQueue_GetTasksByDocument_Call
- type MockQueue_NotifyTaskUpdate_Call
- func (_c *MockQueue_NotifyTaskUpdate_Call) Return(_a0 error) *MockQueue_NotifyTaskUpdate_Call
- func (_c *MockQueue_NotifyTaskUpdate_Call) Run(run func(ctx context.Context, taskID string)) *MockQueue_NotifyTaskUpdate_Call
- func (_c *MockQueue_NotifyTaskUpdate_Call) RunAndReturn(run func(context.Context, string) error) *MockQueue_NotifyTaskUpdate_Call
- type MockQueue_UpdateTaskStatus_Call
- func (_c *MockQueue_UpdateTaskStatus_Call) Return(_a0 error) *MockQueue_UpdateTaskStatus_Call
- func (_c *MockQueue_UpdateTaskStatus_Call) Run(...) *MockQueue_UpdateTaskStatus_Call
- func (_c *MockQueue_UpdateTaskStatus_Call) RunAndReturn(run func(context.Context, string, TaskStatus, interface{}, string) error) *MockQueue_UpdateTaskStatus_Call
- type MockQueue_WaitForTask_Call
- func (_c *MockQueue_WaitForTask_Call) Return(_a0 *Task, _a1 error) *MockQueue_WaitForTask_Call
- func (_c *MockQueue_WaitForTask_Call) Run(run func(ctx context.Context, taskID string, timeout time.Duration)) *MockQueue_WaitForTask_Call
- func (_c *MockQueue_WaitForTask_Call) RunAndReturn(run func(context.Context, string, time.Duration) (*Task, error)) *MockQueue_WaitForTask_Call
- type ProcessCompletePayload
- type ProcessCompleteResult
- type ProgressCallback
- type Queue
- type RedisQueue
- func (q *RedisQueue) Close() error
- func (q *RedisQueue) DeleteTask(ctx context.Context, taskID string) error
- func (q *RedisQueue) Enqueue(ctx context.Context, taskType TaskType, documentID string, payload interface{}) (string, error)
- func (q *RedisQueue) EnqueueAt(ctx context.Context, taskType TaskType, documentID string, payload interface{}, ...) (string, error)
- func (q *RedisQueue) EnqueueIn(ctx context.Context, taskType TaskType, documentID string, payload interface{}, ...) (string, error)
- func (q *RedisQueue) GetTask(ctx context.Context, taskID string) (*Task, error)
- func (q *RedisQueue) GetTasksByDocument(ctx context.Context, documentID string) ([]*Task, error)
- func (q *RedisQueue) NotifyTaskUpdate(ctx context.Context, taskID string) error
- func (q *RedisQueue) UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus, result interface{}, ...) error
- func (q *RedisQueue) WaitForTask(ctx context.Context, taskID string, timeout time.Duration) (*Task, error)
- type RedisWorker
- type Task
- type TaskCallback
- type TaskCallbackHandler
- func DefaultDocumentParseHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler
- func DefaultProcessCompleteHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler
- func DefaultTextChunkHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler
- func DefaultVectorizeHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler
- type TaskError
- type TaskInfo
- type TaskMetrics
- type TaskResult
- type TaskStatus
- type TaskType
- type TextChunkPayload
- type TextChunkResult
- type VectorInfo
- type VectorizePayload
- type VectorizeResult
- type Worker
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidPayload = TaskError("invalid task payload")
ErrInvalidPayload 无效的任务载荷错误
var ErrTaskNotFound = TaskError("task not found")
ErrTaskNotFound 任务未找到错误
var ErrTaskTimeout = TaskError("task timed out")
ErrTaskTimeout 任务超时错误
Functions ¶
func MarshalPayload ¶
func MarshalPayload(payload interface{}) (json.RawMessage, error)
MarshalPayload 将任务载荷序列化为JSON
func RegisterQueueFactory ¶
RegisterQueueFactory 注册队列工厂函数
func UnmarshalPayload ¶
func UnmarshalPayload(data json.RawMessage, v interface{}) error
UnmarshalPayload 将JSON反序列化为任务载荷
Types ¶
type CallbackProcessor ¶
type CallbackProcessor struct {
// contains filtered or unexported fields
}
CallbackProcessor 回调处理器 负责接收和处理任务回调
func GetSharedCallbackProcessor ¶
func GetSharedCallbackProcessor(queue Queue, logger *logrus.Logger) *CallbackProcessor
GetSharedCallbackProcessor 返回一个单例的 CallbackProcessor 实例
func NewCallbackProcessor ¶
func NewCallbackProcessor(queue Queue, logger *logrus.Logger) *CallbackProcessor
NewCallbackProcessor 创建新的回调处理器
func (*CallbackProcessor) GetRegisteredHandlerTypes ¶
func (p *CallbackProcessor) GetRegisteredHandlerTypes() map[TaskType]bool
func (*CallbackProcessor) HandleCallback ¶
func (p *CallbackProcessor) HandleCallback(ctx context.Context, req *CallbackRequest) (*CallbackResponse, error)
HandleCallback 处理HTTP回调请求
func (*CallbackProcessor) ProcessCallback ¶
func (p *CallbackProcessor) ProcessCallback(ctx context.Context, callbackData []byte) error
ProcessCallback 处理回调数据
func (*CallbackProcessor) RegisterDefaultHandlers ¶
func (p *CallbackProcessor) RegisterDefaultHandlers(queue Queue)
RegisterDefaultHandlers 注册默认的任务处理函数
func (*CallbackProcessor) RegisterHandler ¶
func (p *CallbackProcessor) RegisterHandler(taskType TaskType, handler TaskCallbackHandler)
RegisterHandler 注册特定类型的任务处理函数
type CallbackRequest ¶
type CallbackRequest struct { TaskID string `json:"task_id"` // 任务ID DocumentID string `json:"document_id"` // 文档ID Status TaskStatus `json:"status"` // 任务状态 Type TaskType `json:"type"` // 任务类型 Result json.RawMessage `json:"result"` // 任务结果 Error string `json:"error"` // 错误信息 Timestamp string `json:"timestamp"` // 时间戳 }
CallbackRequest HTTP回调请求结构体
type CallbackResponse ¶
type CallbackResponse struct { Success bool `json:"success"` // 是否成功 Message string `json:"message,omitempty"` // 消息 TaskID string `json:"task_id"` // 任务ID Timestamp string `json:"timestamp"` // 时间戳 }
CallbackResponse HTTP回调响应结构体
type Config ¶
type Config struct { RedisAddr string // Redis地址 RedisPassword string // Redis密码 RedisDB int // Redis数据库 Concurrency int // 并发处理任务数 RetryLimit int // 最大重试次数 RetryDelay time.Duration // 重试延迟 Queues map[string]int // 队列名称到优先级的映射 }
Config 队列配置
type DocumentParsePayload ¶
type DocumentParsePayload struct { FilePath string `json:"file_path"` // 文件存储路径 FileName string `json:"file_name"` // 文件名 FileType string `json:"file_type"` // 文件类型 Metadata map[string]string `json:"metadata"` // 元数据 }
DocumentParsePayload 文档解析任务载荷
type DocumentParseResult ¶
type DocumentParseResult struct { Content string `json:"content"` // 解析后的文本内容 Title string `json:"title"` // 文档标题(如果有) Meta map[string]string `json:"meta"` // 提取的元数据 Error string `json:"error"` // 错误信息(如果有) Pages int `json:"pages"` // 文档页数(如果适用) Words int `json:"words"` // 单词数 Chars int `json:"chars"` // 字符数 }
DocumentParseResult 文档解析任务结果
type Handler ¶
type Handler interface { // ProcessTask 处理任务 ProcessTask(ctx context.Context, task *Task) error // GetTaskTypes 返回此处理器支持的任务类型 GetTaskTypes() []TaskType }
Handler 任务处理器接口 负责实际执行任务的逻辑
type MockQueue ¶
MockQueue is an autogenerated mock type for the Queue type
func NewMockQueue ¶
NewMockQueue creates a new instance of MockQueue. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.
func (*MockQueue) DeleteTask ¶
DeleteTask provides a mock function with given fields: ctx, taskID
func (*MockQueue) EXPECT ¶
func (_m *MockQueue) EXPECT() *MockQueue_Expecter
func (*MockQueue) Enqueue ¶
func (_m *MockQueue) Enqueue(ctx context.Context, taskType TaskType, documentID string, payload interface{}) (string, error)
Enqueue provides a mock function with given fields: ctx, taskType, documentID, payload
func (*MockQueue) EnqueueAt ¶
func (_m *MockQueue) EnqueueAt(ctx context.Context, taskType TaskType, documentID string, payload interface{}, processAt time.Time) (string, error)
EnqueueAt provides a mock function with given fields: ctx, taskType, documentID, payload, processAt
func (*MockQueue) EnqueueIn ¶
func (_m *MockQueue) EnqueueIn(ctx context.Context, taskType TaskType, documentID string, payload interface{}, delay time.Duration) (string, error)
EnqueueIn provides a mock function with given fields: ctx, taskType, documentID, payload, delay
func (*MockQueue) GetTasksByDocument ¶
GetTasksByDocument provides a mock function with given fields: ctx, documentID
func (*MockQueue) NotifyTaskUpdate ¶
NotifyTaskUpdate provides a mock function with given fields: ctx, taskID
func (*MockQueue) UpdateTaskStatus ¶
func (_m *MockQueue) UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus, result interface{}, errorMsg string) error
UpdateTaskStatus provides a mock function with given fields: ctx, taskID, status, result, errorMsg
type MockQueue_Close_Call ¶
MockQueue_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
func (*MockQueue_Close_Call) Return ¶
func (_c *MockQueue_Close_Call) Return(_a0 error) *MockQueue_Close_Call
func (*MockQueue_Close_Call) Run ¶
func (_c *MockQueue_Close_Call) Run(run func()) *MockQueue_Close_Call
func (*MockQueue_Close_Call) RunAndReturn ¶
func (_c *MockQueue_Close_Call) RunAndReturn(run func() error) *MockQueue_Close_Call
type MockQueue_DeleteTask_Call ¶
MockQueue_DeleteTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteTask'
func (*MockQueue_DeleteTask_Call) Return ¶
func (_c *MockQueue_DeleteTask_Call) Return(_a0 error) *MockQueue_DeleteTask_Call
func (*MockQueue_DeleteTask_Call) Run ¶
func (_c *MockQueue_DeleteTask_Call) Run(run func(ctx context.Context, taskID string)) *MockQueue_DeleteTask_Call
func (*MockQueue_DeleteTask_Call) RunAndReturn ¶
func (_c *MockQueue_DeleteTask_Call) RunAndReturn(run func(context.Context, string) error) *MockQueue_DeleteTask_Call
type MockQueue_EnqueueAt_Call ¶
MockQueue_EnqueueAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EnqueueAt'
func (*MockQueue_EnqueueAt_Call) Return ¶
func (_c *MockQueue_EnqueueAt_Call) Return(_a0 string, _a1 error) *MockQueue_EnqueueAt_Call
func (*MockQueue_EnqueueAt_Call) Run ¶
func (_c *MockQueue_EnqueueAt_Call) Run(run func(ctx context.Context, taskType TaskType, documentID string, payload interface{}, processAt time.Time)) *MockQueue_EnqueueAt_Call
func (*MockQueue_EnqueueAt_Call) RunAndReturn ¶
func (_c *MockQueue_EnqueueAt_Call) RunAndReturn(run func(context.Context, TaskType, string, interface{}, time.Time) (string, error)) *MockQueue_EnqueueAt_Call
type MockQueue_EnqueueIn_Call ¶
MockQueue_EnqueueIn_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EnqueueIn'
func (*MockQueue_EnqueueIn_Call) Return ¶
func (_c *MockQueue_EnqueueIn_Call) Return(_a0 string, _a1 error) *MockQueue_EnqueueIn_Call
func (*MockQueue_EnqueueIn_Call) Run ¶
func (_c *MockQueue_EnqueueIn_Call) Run(run func(ctx context.Context, taskType TaskType, documentID string, payload interface{}, delay time.Duration)) *MockQueue_EnqueueIn_Call
func (*MockQueue_EnqueueIn_Call) RunAndReturn ¶
func (_c *MockQueue_EnqueueIn_Call) RunAndReturn(run func(context.Context, TaskType, string, interface{}, time.Duration) (string, error)) *MockQueue_EnqueueIn_Call
type MockQueue_Enqueue_Call ¶
MockQueue_Enqueue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Enqueue'
func (*MockQueue_Enqueue_Call) Return ¶
func (_c *MockQueue_Enqueue_Call) Return(_a0 string, _a1 error) *MockQueue_Enqueue_Call
func (*MockQueue_Enqueue_Call) Run ¶
func (_c *MockQueue_Enqueue_Call) Run(run func(ctx context.Context, taskType TaskType, documentID string, payload interface{})) *MockQueue_Enqueue_Call
func (*MockQueue_Enqueue_Call) RunAndReturn ¶
func (_c *MockQueue_Enqueue_Call) RunAndReturn(run func(context.Context, TaskType, string, interface{}) (string, error)) *MockQueue_Enqueue_Call
type MockQueue_Expecter ¶
type MockQueue_Expecter struct {
// contains filtered or unexported fields
}
func (*MockQueue_Expecter) Close ¶
func (_e *MockQueue_Expecter) Close() *MockQueue_Close_Call
Close is a helper method to define mock.On call
func (*MockQueue_Expecter) DeleteTask ¶
func (_e *MockQueue_Expecter) DeleteTask(ctx interface{}, taskID interface{}) *MockQueue_DeleteTask_Call
DeleteTask is a helper method to define mock.On call
- ctx context.Context
- taskID string
func (*MockQueue_Expecter) Enqueue ¶
func (_e *MockQueue_Expecter) Enqueue(ctx interface{}, taskType interface{}, documentID interface{}, payload interface{}) *MockQueue_Enqueue_Call
Enqueue is a helper method to define mock.On call
- ctx context.Context
- taskType TaskType
- documentID string
- payload interface{}
func (*MockQueue_Expecter) EnqueueAt ¶
func (_e *MockQueue_Expecter) EnqueueAt(ctx interface{}, taskType interface{}, documentID interface{}, payload interface{}, processAt interface{}) *MockQueue_EnqueueAt_Call
EnqueueAt is a helper method to define mock.On call
- ctx context.Context
- taskType TaskType
- documentID string
- payload interface{}
- processAt time.Time
func (*MockQueue_Expecter) EnqueueIn ¶
func (_e *MockQueue_Expecter) EnqueueIn(ctx interface{}, taskType interface{}, documentID interface{}, payload interface{}, delay interface{}) *MockQueue_EnqueueIn_Call
EnqueueIn is a helper method to define mock.On call
- ctx context.Context
- taskType TaskType
- documentID string
- payload interface{}
- delay time.Duration
func (*MockQueue_Expecter) GetTask ¶
func (_e *MockQueue_Expecter) GetTask(ctx interface{}, taskID interface{}) *MockQueue_GetTask_Call
GetTask is a helper method to define mock.On call
- ctx context.Context
- taskID string
func (*MockQueue_Expecter) GetTasksByDocument ¶
func (_e *MockQueue_Expecter) GetTasksByDocument(ctx interface{}, documentID interface{}) *MockQueue_GetTasksByDocument_Call
GetTasksByDocument is a helper method to define mock.On call
- ctx context.Context
- documentID string
func (*MockQueue_Expecter) NotifyTaskUpdate ¶
func (_e *MockQueue_Expecter) NotifyTaskUpdate(ctx interface{}, taskID interface{}) *MockQueue_NotifyTaskUpdate_Call
NotifyTaskUpdate is a helper method to define mock.On call
- ctx context.Context
- taskID string
func (*MockQueue_Expecter) UpdateTaskStatus ¶
func (_e *MockQueue_Expecter) UpdateTaskStatus(ctx interface{}, taskID interface{}, status interface{}, result interface{}, errorMsg interface{}) *MockQueue_UpdateTaskStatus_Call
UpdateTaskStatus is a helper method to define mock.On call
- ctx context.Context
- taskID string
- status TaskStatus
- result interface{}
- errorMsg string
func (*MockQueue_Expecter) WaitForTask ¶
func (_e *MockQueue_Expecter) WaitForTask(ctx interface{}, taskID interface{}, timeout interface{}) *MockQueue_WaitForTask_Call
WaitForTask is a helper method to define mock.On call
- ctx context.Context
- taskID string
- timeout time.Duration
type MockQueue_GetTask_Call ¶
MockQueue_GetTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTask'
func (*MockQueue_GetTask_Call) Return ¶
func (_c *MockQueue_GetTask_Call) Return(_a0 *Task, _a1 error) *MockQueue_GetTask_Call
func (*MockQueue_GetTask_Call) Run ¶
func (_c *MockQueue_GetTask_Call) Run(run func(ctx context.Context, taskID string)) *MockQueue_GetTask_Call
func (*MockQueue_GetTask_Call) RunAndReturn ¶
func (_c *MockQueue_GetTask_Call) RunAndReturn(run func(context.Context, string) (*Task, error)) *MockQueue_GetTask_Call
type MockQueue_GetTasksByDocument_Call ¶
MockQueue_GetTasksByDocument_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTasksByDocument'
func (*MockQueue_GetTasksByDocument_Call) Return ¶
func (_c *MockQueue_GetTasksByDocument_Call) Return(_a0 []*Task, _a1 error) *MockQueue_GetTasksByDocument_Call
func (*MockQueue_GetTasksByDocument_Call) Run ¶
func (_c *MockQueue_GetTasksByDocument_Call) Run(run func(ctx context.Context, documentID string)) *MockQueue_GetTasksByDocument_Call
func (*MockQueue_GetTasksByDocument_Call) RunAndReturn ¶
func (_c *MockQueue_GetTasksByDocument_Call) RunAndReturn(run func(context.Context, string) ([]*Task, error)) *MockQueue_GetTasksByDocument_Call
type MockQueue_NotifyTaskUpdate_Call ¶
MockQueue_NotifyTaskUpdate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyTaskUpdate'
func (*MockQueue_NotifyTaskUpdate_Call) Return ¶
func (_c *MockQueue_NotifyTaskUpdate_Call) Return(_a0 error) *MockQueue_NotifyTaskUpdate_Call
func (*MockQueue_NotifyTaskUpdate_Call) Run ¶
func (_c *MockQueue_NotifyTaskUpdate_Call) Run(run func(ctx context.Context, taskID string)) *MockQueue_NotifyTaskUpdate_Call
func (*MockQueue_NotifyTaskUpdate_Call) RunAndReturn ¶
func (_c *MockQueue_NotifyTaskUpdate_Call) RunAndReturn(run func(context.Context, string) error) *MockQueue_NotifyTaskUpdate_Call
type MockQueue_UpdateTaskStatus_Call ¶
MockQueue_UpdateTaskStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateTaskStatus'
func (*MockQueue_UpdateTaskStatus_Call) Return ¶
func (_c *MockQueue_UpdateTaskStatus_Call) Return(_a0 error) *MockQueue_UpdateTaskStatus_Call
func (*MockQueue_UpdateTaskStatus_Call) Run ¶
func (_c *MockQueue_UpdateTaskStatus_Call) Run(run func(ctx context.Context, taskID string, status TaskStatus, result interface{}, errorMsg string)) *MockQueue_UpdateTaskStatus_Call
func (*MockQueue_UpdateTaskStatus_Call) RunAndReturn ¶
func (_c *MockQueue_UpdateTaskStatus_Call) RunAndReturn(run func(context.Context, string, TaskStatus, interface{}, string) error) *MockQueue_UpdateTaskStatus_Call
type MockQueue_WaitForTask_Call ¶
MockQueue_WaitForTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WaitForTask'
func (*MockQueue_WaitForTask_Call) Return ¶
func (_c *MockQueue_WaitForTask_Call) Return(_a0 *Task, _a1 error) *MockQueue_WaitForTask_Call
func (*MockQueue_WaitForTask_Call) Run ¶
func (_c *MockQueue_WaitForTask_Call) Run(run func(ctx context.Context, taskID string, timeout time.Duration)) *MockQueue_WaitForTask_Call
func (*MockQueue_WaitForTask_Call) RunAndReturn ¶
func (_c *MockQueue_WaitForTask_Call) RunAndReturn(run func(context.Context, string, time.Duration) (*Task, error)) *MockQueue_WaitForTask_Call
type ProcessCompletePayload ¶
type ProcessCompletePayload struct { DocumentID string `json:"document_id"` // 文档ID FilePath string `json:"file_path"` // 文件路径 FileName string `json:"file_name"` // 文件名 FileType string `json:"file_type"` // 文件类型 ChunkSize int `json:"chunk_size"` // 分块大小 Overlap int `json:"overlap"` // 重叠大小 SplitType string `json:"split_type"` // 分割类型 Model string `json:"model"` // 嵌入模型 Metadata map[string]string `json:"metadata"` // 元数据 }
ProcessCompletePayload 完整处理流程任务载荷
type ProcessCompleteResult ¶
type ProcessCompleteResult struct { DocumentID string `json:"document_id"` // 文档ID ChunkCount int `json:"chunk_count"` // 分块数量 VectorCount int `json:"vector_count"` // 向量数量 Dimension int `json:"dimension"` // 向量维度 ParseStatus string `json:"parse_status"` // 解析状态 ChunkStatus string `json:"chunk_status"` // 分块状态 VectorStatus string `json:"vector_status"` // 向量化状态 Error string `json:"error"` // 错误信息(如果有) Vectors []VectorInfo `json:"vectors"` // 可选,根据配置决定是否返回向量数据 }
ProcessCompleteResult 完整处理流程结果
type ProgressCallback ¶
ProgressCallback 进度回调函数 用于报告任务处理进度
type Queue ¶
type Queue interface { // Enqueue 将任务加入队列 Enqueue(ctx context.Context, taskType TaskType, documentID string, payload interface{}) (string, error) // EnqueueAt 在指定时间将任务加入队列 EnqueueAt(ctx context.Context, taskType TaskType, documentID string, payload interface{}, processAt time.Time) (string, error) // EnqueueIn 在指定延迟后将任务加入队列 EnqueueIn(ctx context.Context, taskType TaskType, documentID string, payload interface{}, delay time.Duration) (string, error) // GetTask 获取任务信息 GetTask(ctx context.Context, taskID string) (*Task, error) // GetTasksByDocument 获取文档相关的所有任务 GetTasksByDocument(ctx context.Context, documentID string) ([]*Task, error) // WaitForTask 等待任务完成并返回结果 // timeout为0表示不设置超时 WaitForTask(ctx context.Context, taskID string, timeout time.Duration) (*Task, error) // DeleteTask 删除任务 DeleteTask(ctx context.Context, taskID string) error // UpdateTaskStatus 更新任务状态和结果 UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus, result interface{}, errorMsg string) error // NotifyTaskUpdate 通知任务状态已更新 NotifyTaskUpdate(ctx context.Context, taskID string) error // Close 关闭队列连接 Close() error }
Queue 定义任务队列的接口 负责任务的入队、获取状态和结果等操作
type RedisQueue ¶
type RedisQueue struct {
// contains filtered or unexported fields
}
RedisQueue Redis任务队列实现
func (*RedisQueue) DeleteTask ¶
func (q *RedisQueue) DeleteTask(ctx context.Context, taskID string) error
DeleteTask 删除任务
func (*RedisQueue) Enqueue ¶
func (q *RedisQueue) Enqueue(ctx context.Context, taskType TaskType, documentID string, payload interface{}) (string, error)
Enqueue 将任务加入队列
func (*RedisQueue) EnqueueAt ¶
func (q *RedisQueue) EnqueueAt(ctx context.Context, taskType TaskType, documentID string, payload interface{}, processAt time.Time) (string, error)
EnqueueAt 在指定时间将任务加入队列
func (*RedisQueue) EnqueueIn ¶
func (q *RedisQueue) EnqueueIn(ctx context.Context, taskType TaskType, documentID string, payload interface{}, delay time.Duration) (string, error)
EnqueueIn 在指定延迟后将任务加入队列
func (*RedisQueue) GetTasksByDocument ¶
GetTasksByDocument 获取文档相关的所有任务
func (*RedisQueue) NotifyTaskUpdate ¶
func (q *RedisQueue) NotifyTaskUpdate(ctx context.Context, taskID string) error
NotifyTaskUpdate 通知任务状态更新
func (*RedisQueue) UpdateTaskStatus ¶
func (q *RedisQueue) UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus, result interface{}, errMsg string) error
UpdateTaskStatus 更新任务状态
func (*RedisQueue) WaitForTask ¶
func (q *RedisQueue) WaitForTask(ctx context.Context, taskID string, timeout time.Duration) (*Task, error)
WaitForTask 等待任务完成并返回结果
type RedisWorker ¶
type RedisWorker struct {
// contains filtered or unexported fields
}
RedisWorker Redis工作者实现
func (*RedisWorker) RegisterHandler ¶
func (w *RedisWorker) RegisterHandler(taskType TaskType, handler Handler)
RegisterHandler 注册任务处理器
type Task ¶
type Task struct { ID string `json:"id"` // 任务唯一标识符 Type TaskType `json:"type"` // 任务类型 DocumentID string `json:"document_id"` // 关联的文档ID Status TaskStatus `json:"status"` // 任务状态 Payload json.RawMessage `json:"payload"` // 任务载荷数据,不同任务类型对应不同结构 Result json.RawMessage `json:"result"` // 任务结果数据,不同任务类型对应不同结构 Error string `json:"error"` // 错误信息(如果处理失败) CreatedAt time.Time `json:"created_at"` // 创建时间 UpdatedAt time.Time `json:"updated_at"` // 更新时间 StartedAt *time.Time `json:"started_at"` // 开始处理时间 CompletedAt *time.Time `json:"completed_at"` // 完成时间 Attempts int `json:"attempts"` // 尝试次数 MaxRetries int `json:"max_retries"` // 最大重试次数 }
Task 任务基础结构
type TaskCallback ¶
type TaskCallback struct { TaskID string `json:"task_id"` // 任务ID DocumentID string `json:"document_id"` // 文档ID Status TaskStatus `json:"status"` // 任务状态 Type TaskType `json:"type"` // 任务类型 Result json.RawMessage `json:"result"` // 任务结果 Error string `json:"error"` // 错误信息 Timestamp time.Time `json:"timestamp"` // 回调时间戳 }
TaskCallback 任务回调信息
type TaskCallbackHandler ¶
TaskCallbackHandler 任务回调处理函数类型 处理特定类型任务的回调,返回处理结果
func DefaultDocumentParseHandler ¶
func DefaultDocumentParseHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler
DefaultDocumentParseHandler 默认的文档解析回调处理函数 处理完成后创建分块任务
func DefaultProcessCompleteHandler ¶
func DefaultProcessCompleteHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler
DefaultProcessCompleteHandler 默认的完整处理流程回调处理函数
func DefaultTextChunkHandler ¶
func DefaultTextChunkHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler
DefaultTextChunkHandler 默认的文本分块回调处理函数 处理完成后创建向量化任务
func DefaultVectorizeHandler ¶
func DefaultVectorizeHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler
DefaultVectorizeHandler 默认的向量化回调处理函数 向量化是任务流程的最后一步,处理完成后更新文档状态
type TaskInfo ¶
type TaskInfo struct { ID string `json:"id"` // 任务唯一标识符 Type TaskType `json:"type"` // 任务类型 DocumentID string `json:"document_id"` // 关联的文档ID Status TaskStatus `json:"status"` // 任务状态 Error string `json:"error"` // 错误信息 CreatedAt time.Time `json:"created_at"` // 创建时间 StartedAt *time.Time `json:"started_at"` // 开始处理时间 CompletedAt *time.Time `json:"completed_at"` // 完成时间 Progress float64 `json:"progress"` // 处理进度(0-100) }
TaskInfo 表示任务的元信息 用于传递给客户端的简化任务信息
type TaskMetrics ¶
type TaskMetrics struct { ProcessingTime time.Duration // 处理时间 RetryCount int // 重试次数 UsedMemory int64 // 使用内存 }
TaskMetrics 任务执行指标
type TaskResult ¶
type TaskResult struct { Status TaskStatus // 任务状态 Result interface{} // 任务结果数据 Error string // 错误信息 Metrics TaskMetrics // 任务执行指标 }
TaskResult 表示任务执行的结果 通常由Handler返回,用于更新任务状态和结果
type TaskStatus ¶
type TaskStatus string
TaskStatus 任务状态
const ( // StatusPending 等待处理 StatusPending TaskStatus = "pending" // StatusProcessing 处理中 StatusProcessing TaskStatus = "processing" // StatusCompleted 已完成 StatusCompleted TaskStatus = "completed" // StatusFailed 处理失败 StatusFailed TaskStatus = "failed" )
type TextChunkPayload ¶
type TextChunkPayload struct { DocumentID string `json:"document_id"` // 文档ID Content string `json:"content"` // 文本内容 ChunkSize int `json:"chunk_size"` // 分块大小 Overlap int `json:"overlap"` // 重叠大小 SplitType string `json:"split_type"` // 分割类型: paragraph, sentence, length }
TextChunkPayload 文本分块任务载荷
type TextChunkResult ¶
type TextChunkResult struct { DocumentID string `json:"document_id"` // 文档ID Chunks []ChunkInfo `json:"chunks"` // 分块列表 ChunkCount int `json:"chunk_count"` // 分块数量 Error string `json:"error"` // 错误信息(如果有) }
TextChunkResult 文本分块任务结果
type VectorInfo ¶
type VectorInfo struct { ChunkIndex int `json:"chunk_index"` // 分块索引 Vector []float32 `json:"vector"` // 向量数据 }
VectorInfo 向量信息
type VectorizePayload ¶
type VectorizePayload struct { DocumentID string `json:"document_id"` // 文档ID Chunks []ChunkInfo `json:"chunks"` // 文本分块 Model string `json:"model"` // 嵌入模型名称 }
VectorizePayload 文本向量化任务载荷
type VectorizeResult ¶
type VectorizeResult struct { DocumentID string `json:"document_id"` // 文档ID Vectors []VectorInfo `json:"vectors"` // 向量列表 VectorCount int `json:"vector_count"` // 向量数量 Model string `json:"model"` // 使用的模型 Dimension int `json:"dimension"` // 向量维度 Error string `json:"error"` // 错误信息(如果有) }
VectorizeResult 向量化任务结果
type Worker ¶
type Worker interface { // RegisterHandler 注册任务处理器 RegisterHandler(taskType TaskType, handler Handler) // Start 启动工作者,开始处理任务 Start() error // Stop 停止工作者 Stop() }
Worker 工作者接口 负责运行一组Handler来处理队列中的任务
func NewRedisWorker ¶
func NewRedisWorker(queue *RedisQueue, cfg *Config) Worker
NewRedisWorker 创建Redis工作者