taskqueue

package
v0.0.0-...-e636480 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidPayload = TaskError("invalid task payload")

ErrInvalidPayload 无效的任务载荷错误

View Source
var ErrTaskNotFound = TaskError("task not found")

ErrTaskNotFound 任务未找到错误

View Source
var ErrTaskTimeout = TaskError("task timed out")

ErrTaskTimeout 任务超时错误

Functions

func MarshalPayload

func MarshalPayload(payload interface{}) (json.RawMessage, error)

MarshalPayload 将任务载荷序列化为JSON

func RegisterQueueFactory

func RegisterQueueFactory(name string, factory Factory)

RegisterQueueFactory 注册队列工厂函数

func UnmarshalPayload

func UnmarshalPayload(data json.RawMessage, v interface{}) error

UnmarshalPayload 将JSON反序列化为任务载荷

Types

type CallbackProcessor

type CallbackProcessor struct {
	// contains filtered or unexported fields
}

CallbackProcessor 回调处理器 负责接收和处理任务回调

func GetSharedCallbackProcessor

func GetSharedCallbackProcessor(queue Queue, logger *logrus.Logger) *CallbackProcessor

GetSharedCallbackProcessor 返回一个单例的 CallbackProcessor 实例

func NewCallbackProcessor

func NewCallbackProcessor(queue Queue, logger *logrus.Logger) *CallbackProcessor

NewCallbackProcessor 创建新的回调处理器

func (*CallbackProcessor) GetRegisteredHandlerTypes

func (p *CallbackProcessor) GetRegisteredHandlerTypes() map[TaskType]bool

func (*CallbackProcessor) HandleCallback

func (p *CallbackProcessor) HandleCallback(ctx context.Context, req *CallbackRequest) (*CallbackResponse, error)

HandleCallback 处理HTTP回调请求

func (*CallbackProcessor) ProcessCallback

func (p *CallbackProcessor) ProcessCallback(ctx context.Context, callbackData []byte) error

ProcessCallback 处理回调数据

func (*CallbackProcessor) RegisterDefaultHandlers

func (p *CallbackProcessor) RegisterDefaultHandlers(queue Queue)

RegisterDefaultHandlers 注册默认的任务处理函数

func (*CallbackProcessor) RegisterHandler

func (p *CallbackProcessor) RegisterHandler(taskType TaskType, handler TaskCallbackHandler)

RegisterHandler 注册特定类型的任务处理函数

type CallbackRequest

type CallbackRequest struct {
	TaskID     string          `json:"task_id"`     // 任务ID
	DocumentID string          `json:"document_id"` // 文档ID
	Status     TaskStatus      `json:"status"`      // 任务状态
	Type       TaskType        `json:"type"`        // 任务类型
	Result     json.RawMessage `json:"result"`      // 任务结果
	Error      string          `json:"error"`       // 错误信息
	Timestamp  string          `json:"timestamp"`   // 时间戳
}

CallbackRequest HTTP回调请求结构体

type CallbackResponse

type CallbackResponse struct {
	Success   bool   `json:"success"`           // 是否成功
	Message   string `json:"message,omitempty"` // 消息
	TaskID    string `json:"task_id"`           // 任务ID
	Timestamp string `json:"timestamp"`         // 时间戳
}

CallbackResponse HTTP回调响应结构体

type ChunkInfo

type ChunkInfo struct {
	Text  string `json:"text"`  // 分块文本
	Index int    `json:"index"` // 分块索引
}

ChunkInfo 分块信息

type Config

type Config struct {
	RedisAddr     string         // Redis地址
	RedisPassword string         // Redis密码
	RedisDB       int            // Redis数据库
	Concurrency   int            // 并发处理任务数
	RetryLimit    int            // 最大重试次数
	RetryDelay    time.Duration  // 重试延迟
	Queues        map[string]int // 队列名称到优先级的映射
}

Config 队列配置

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig 返回默认配置

type DocumentParsePayload

type DocumentParsePayload struct {
	FilePath string            `json:"file_path"` // 文件存储路径
	FileName string            `json:"file_name"` // 文件名
	FileType string            `json:"file_type"` // 文件类型
	Metadata map[string]string `json:"metadata"`  // 元数据
}

DocumentParsePayload 文档解析任务载荷

type DocumentParseResult

type DocumentParseResult struct {
	Content string            `json:"content"` // 解析后的文本内容
	Title   string            `json:"title"`   // 文档标题(如果有)
	Meta    map[string]string `json:"meta"`    // 提取的元数据
	Error   string            `json:"error"`   // 错误信息(如果有)
	Pages   int               `json:"pages"`   // 文档页数(如果适用)
	Words   int               `json:"words"`   // 单词数
	Chars   int               `json:"chars"`   // 字符数
}

DocumentParseResult 文档解析任务结果

type Factory

type Factory func(cfg *Config) (Queue, error)

Factory 队列工厂函数类型 用于创建不同类型的队列实现

type Handler

type Handler interface {
	// ProcessTask 处理任务
	ProcessTask(ctx context.Context, task *Task) error

	// GetTaskTypes 返回此处理器支持的任务类型
	GetTaskTypes() []TaskType
}

Handler 任务处理器接口 负责实际执行任务的逻辑

type MockQueue

type MockQueue struct {
	mock.Mock
}

MockQueue is an autogenerated mock type for the Queue type

func NewMockQueue

func NewMockQueue(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockQueue

NewMockQueue creates a new instance of MockQueue. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockQueue) Close

func (_m *MockQueue) Close() error

Close provides a mock function with no fields

func (*MockQueue) DeleteTask

func (_m *MockQueue) DeleteTask(ctx context.Context, taskID string) error

DeleteTask provides a mock function with given fields: ctx, taskID

func (*MockQueue) EXPECT

func (_m *MockQueue) EXPECT() *MockQueue_Expecter

func (*MockQueue) Enqueue

func (_m *MockQueue) Enqueue(ctx context.Context, taskType TaskType, documentID string, payload interface{}) (string, error)

Enqueue provides a mock function with given fields: ctx, taskType, documentID, payload

func (*MockQueue) EnqueueAt

func (_m *MockQueue) EnqueueAt(ctx context.Context, taskType TaskType, documentID string, payload interface{}, processAt time.Time) (string, error)

EnqueueAt provides a mock function with given fields: ctx, taskType, documentID, payload, processAt

func (*MockQueue) EnqueueIn

func (_m *MockQueue) EnqueueIn(ctx context.Context, taskType TaskType, documentID string, payload interface{}, delay time.Duration) (string, error)

EnqueueIn provides a mock function with given fields: ctx, taskType, documentID, payload, delay

func (*MockQueue) GetTask

func (_m *MockQueue) GetTask(ctx context.Context, taskID string) (*Task, error)

GetTask provides a mock function with given fields: ctx, taskID

func (*MockQueue) GetTasksByDocument

func (_m *MockQueue) GetTasksByDocument(ctx context.Context, documentID string) ([]*Task, error)

GetTasksByDocument provides a mock function with given fields: ctx, documentID

func (*MockQueue) NotifyTaskUpdate

func (_m *MockQueue) NotifyTaskUpdate(ctx context.Context, taskID string) error

NotifyTaskUpdate provides a mock function with given fields: ctx, taskID

func (*MockQueue) UpdateTaskStatus

func (_m *MockQueue) UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus, result interface{}, errorMsg string) error

UpdateTaskStatus provides a mock function with given fields: ctx, taskID, status, result, errorMsg

func (*MockQueue) WaitForTask

func (_m *MockQueue) WaitForTask(ctx context.Context, taskID string, timeout time.Duration) (*Task, error)

WaitForTask provides a mock function with given fields: ctx, taskID, timeout

type MockQueue_Close_Call

type MockQueue_Close_Call struct {
	*mock.Call
}

MockQueue_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'

func (*MockQueue_Close_Call) Return

func (*MockQueue_Close_Call) Run

func (_c *MockQueue_Close_Call) Run(run func()) *MockQueue_Close_Call

func (*MockQueue_Close_Call) RunAndReturn

func (_c *MockQueue_Close_Call) RunAndReturn(run func() error) *MockQueue_Close_Call

type MockQueue_DeleteTask_Call

type MockQueue_DeleteTask_Call struct {
	*mock.Call
}

MockQueue_DeleteTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteTask'

func (*MockQueue_DeleteTask_Call) Return

func (*MockQueue_DeleteTask_Call) Run

func (*MockQueue_DeleteTask_Call) RunAndReturn

type MockQueue_EnqueueAt_Call

type MockQueue_EnqueueAt_Call struct {
	*mock.Call
}

MockQueue_EnqueueAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EnqueueAt'

func (*MockQueue_EnqueueAt_Call) Return

func (*MockQueue_EnqueueAt_Call) Run

func (_c *MockQueue_EnqueueAt_Call) Run(run func(ctx context.Context, taskType TaskType, documentID string, payload interface{}, processAt time.Time)) *MockQueue_EnqueueAt_Call

func (*MockQueue_EnqueueAt_Call) RunAndReturn

func (_c *MockQueue_EnqueueAt_Call) RunAndReturn(run func(context.Context, TaskType, string, interface{}, time.Time) (string, error)) *MockQueue_EnqueueAt_Call

type MockQueue_EnqueueIn_Call

type MockQueue_EnqueueIn_Call struct {
	*mock.Call
}

MockQueue_EnqueueIn_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EnqueueIn'

func (*MockQueue_EnqueueIn_Call) Return

func (*MockQueue_EnqueueIn_Call) Run

func (_c *MockQueue_EnqueueIn_Call) Run(run func(ctx context.Context, taskType TaskType, documentID string, payload interface{}, delay time.Duration)) *MockQueue_EnqueueIn_Call

func (*MockQueue_EnqueueIn_Call) RunAndReturn

func (_c *MockQueue_EnqueueIn_Call) RunAndReturn(run func(context.Context, TaskType, string, interface{}, time.Duration) (string, error)) *MockQueue_EnqueueIn_Call

type MockQueue_Enqueue_Call

type MockQueue_Enqueue_Call struct {
	*mock.Call
}

MockQueue_Enqueue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Enqueue'

func (*MockQueue_Enqueue_Call) Return

func (*MockQueue_Enqueue_Call) Run

func (_c *MockQueue_Enqueue_Call) Run(run func(ctx context.Context, taskType TaskType, documentID string, payload interface{})) *MockQueue_Enqueue_Call

func (*MockQueue_Enqueue_Call) RunAndReturn

func (_c *MockQueue_Enqueue_Call) RunAndReturn(run func(context.Context, TaskType, string, interface{}) (string, error)) *MockQueue_Enqueue_Call

type MockQueue_Expecter

type MockQueue_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockQueue_Expecter) Close

Close is a helper method to define mock.On call

func (*MockQueue_Expecter) DeleteTask

func (_e *MockQueue_Expecter) DeleteTask(ctx interface{}, taskID interface{}) *MockQueue_DeleteTask_Call

DeleteTask is a helper method to define mock.On call

  • ctx context.Context
  • taskID string

func (*MockQueue_Expecter) Enqueue

func (_e *MockQueue_Expecter) Enqueue(ctx interface{}, taskType interface{}, documentID interface{}, payload interface{}) *MockQueue_Enqueue_Call

Enqueue is a helper method to define mock.On call

  • ctx context.Context
  • taskType TaskType
  • documentID string
  • payload interface{}

func (*MockQueue_Expecter) EnqueueAt

func (_e *MockQueue_Expecter) EnqueueAt(ctx interface{}, taskType interface{}, documentID interface{}, payload interface{}, processAt interface{}) *MockQueue_EnqueueAt_Call

EnqueueAt is a helper method to define mock.On call

  • ctx context.Context
  • taskType TaskType
  • documentID string
  • payload interface{}
  • processAt time.Time

func (*MockQueue_Expecter) EnqueueIn

func (_e *MockQueue_Expecter) EnqueueIn(ctx interface{}, taskType interface{}, documentID interface{}, payload interface{}, delay interface{}) *MockQueue_EnqueueIn_Call

EnqueueIn is a helper method to define mock.On call

  • ctx context.Context
  • taskType TaskType
  • documentID string
  • payload interface{}
  • delay time.Duration

func (*MockQueue_Expecter) GetTask

func (_e *MockQueue_Expecter) GetTask(ctx interface{}, taskID interface{}) *MockQueue_GetTask_Call

GetTask is a helper method to define mock.On call

  • ctx context.Context
  • taskID string

func (*MockQueue_Expecter) GetTasksByDocument

func (_e *MockQueue_Expecter) GetTasksByDocument(ctx interface{}, documentID interface{}) *MockQueue_GetTasksByDocument_Call

GetTasksByDocument is a helper method to define mock.On call

  • ctx context.Context
  • documentID string

func (*MockQueue_Expecter) NotifyTaskUpdate

func (_e *MockQueue_Expecter) NotifyTaskUpdate(ctx interface{}, taskID interface{}) *MockQueue_NotifyTaskUpdate_Call

NotifyTaskUpdate is a helper method to define mock.On call

  • ctx context.Context
  • taskID string

func (*MockQueue_Expecter) UpdateTaskStatus

func (_e *MockQueue_Expecter) UpdateTaskStatus(ctx interface{}, taskID interface{}, status interface{}, result interface{}, errorMsg interface{}) *MockQueue_UpdateTaskStatus_Call

UpdateTaskStatus is a helper method to define mock.On call

  • ctx context.Context
  • taskID string
  • status TaskStatus
  • result interface{}
  • errorMsg string

func (*MockQueue_Expecter) WaitForTask

func (_e *MockQueue_Expecter) WaitForTask(ctx interface{}, taskID interface{}, timeout interface{}) *MockQueue_WaitForTask_Call

WaitForTask is a helper method to define mock.On call

  • ctx context.Context
  • taskID string
  • timeout time.Duration

type MockQueue_GetTask_Call

type MockQueue_GetTask_Call struct {
	*mock.Call
}

MockQueue_GetTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTask'

func (*MockQueue_GetTask_Call) Return

func (*MockQueue_GetTask_Call) Run

func (_c *MockQueue_GetTask_Call) Run(run func(ctx context.Context, taskID string)) *MockQueue_GetTask_Call

func (*MockQueue_GetTask_Call) RunAndReturn

func (_c *MockQueue_GetTask_Call) RunAndReturn(run func(context.Context, string) (*Task, error)) *MockQueue_GetTask_Call

type MockQueue_GetTasksByDocument_Call

type MockQueue_GetTasksByDocument_Call struct {
	*mock.Call
}

MockQueue_GetTasksByDocument_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTasksByDocument'

func (*MockQueue_GetTasksByDocument_Call) Return

func (*MockQueue_GetTasksByDocument_Call) Run

func (*MockQueue_GetTasksByDocument_Call) RunAndReturn

type MockQueue_NotifyTaskUpdate_Call

type MockQueue_NotifyTaskUpdate_Call struct {
	*mock.Call
}

MockQueue_NotifyTaskUpdate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyTaskUpdate'

func (*MockQueue_NotifyTaskUpdate_Call) Return

func (*MockQueue_NotifyTaskUpdate_Call) Run

func (*MockQueue_NotifyTaskUpdate_Call) RunAndReturn

type MockQueue_UpdateTaskStatus_Call

type MockQueue_UpdateTaskStatus_Call struct {
	*mock.Call
}

MockQueue_UpdateTaskStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateTaskStatus'

func (*MockQueue_UpdateTaskStatus_Call) Return

func (*MockQueue_UpdateTaskStatus_Call) Run

func (_c *MockQueue_UpdateTaskStatus_Call) Run(run func(ctx context.Context, taskID string, status TaskStatus, result interface{}, errorMsg string)) *MockQueue_UpdateTaskStatus_Call

func (*MockQueue_UpdateTaskStatus_Call) RunAndReturn

type MockQueue_WaitForTask_Call

type MockQueue_WaitForTask_Call struct {
	*mock.Call
}

MockQueue_WaitForTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WaitForTask'

func (*MockQueue_WaitForTask_Call) Return

func (*MockQueue_WaitForTask_Call) Run

func (_c *MockQueue_WaitForTask_Call) Run(run func(ctx context.Context, taskID string, timeout time.Duration)) *MockQueue_WaitForTask_Call

func (*MockQueue_WaitForTask_Call) RunAndReturn

type ProcessCompletePayload

type ProcessCompletePayload struct {
	DocumentID string            `json:"document_id"` // 文档ID
	FilePath   string            `json:"file_path"`   // 文件路径
	FileName   string            `json:"file_name"`   // 文件名
	FileType   string            `json:"file_type"`   // 文件类型
	ChunkSize  int               `json:"chunk_size"`  // 分块大小
	Overlap    int               `json:"overlap"`     // 重叠大小
	SplitType  string            `json:"split_type"`  // 分割类型
	Model      string            `json:"model"`       // 嵌入模型
	Metadata   map[string]string `json:"metadata"`    // 元数据
}

ProcessCompletePayload 完整处理流程任务载荷

type ProcessCompleteResult

type ProcessCompleteResult struct {
	DocumentID   string       `json:"document_id"`   // 文档ID
	ChunkCount   int          `json:"chunk_count"`   // 分块数量
	VectorCount  int          `json:"vector_count"`  // 向量数量
	Dimension    int          `json:"dimension"`     // 向量维度
	ParseStatus  string       `json:"parse_status"`  // 解析状态
	ChunkStatus  string       `json:"chunk_status"`  // 分块状态
	VectorStatus string       `json:"vector_status"` // 向量化状态
	Error        string       `json:"error"`         // 错误信息(如果有)
	Vectors      []VectorInfo `json:"vectors"`       // 可选,根据配置决定是否返回向量数据
}

ProcessCompleteResult 完整处理流程结果

type ProgressCallback

type ProgressCallback func(taskID string, progress float64, status string)

ProgressCallback 进度回调函数 用于报告任务处理进度

type Queue

type Queue interface {
	// Enqueue 将任务加入队列
	Enqueue(ctx context.Context, taskType TaskType, documentID string, payload interface{}) (string, error)

	// EnqueueAt 在指定时间将任务加入队列
	EnqueueAt(ctx context.Context, taskType TaskType, documentID string, payload interface{}, processAt time.Time) (string, error)

	// EnqueueIn 在指定延迟后将任务加入队列
	EnqueueIn(ctx context.Context, taskType TaskType, documentID string, payload interface{}, delay time.Duration) (string, error)

	// GetTask 获取任务信息
	GetTask(ctx context.Context, taskID string) (*Task, error)

	// GetTasksByDocument 获取文档相关的所有任务
	GetTasksByDocument(ctx context.Context, documentID string) ([]*Task, error)

	// WaitForTask 等待任务完成并返回结果
	// timeout为0表示不设置超时
	WaitForTask(ctx context.Context, taskID string, timeout time.Duration) (*Task, error)

	// DeleteTask 删除任务
	DeleteTask(ctx context.Context, taskID string) error

	// UpdateTaskStatus 更新任务状态和结果
	UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus, result interface{}, errorMsg string) error

	// NotifyTaskUpdate 通知任务状态已更新
	NotifyTaskUpdate(ctx context.Context, taskID string) error

	// Close 关闭队列连接
	Close() error
}

Queue 定义任务队列的接口 负责任务的入队、获取状态和结果等操作

func NewQueue

func NewQueue(name string, cfg *Config) (Queue, error)

NewQueue 根据名称创建队列实例

func NewRedisQueue

func NewRedisQueue(cfg *Config) (Queue, error)

NewRedisQueue 创建Redis任务队列实例

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue Redis任务队列实现

func (*RedisQueue) Close

func (q *RedisQueue) Close() error

Close 关闭队列连接

func (*RedisQueue) DeleteTask

func (q *RedisQueue) DeleteTask(ctx context.Context, taskID string) error

DeleteTask 删除任务

func (*RedisQueue) Enqueue

func (q *RedisQueue) Enqueue(ctx context.Context, taskType TaskType, documentID string, payload interface{}) (string, error)

Enqueue 将任务加入队列

func (*RedisQueue) EnqueueAt

func (q *RedisQueue) EnqueueAt(ctx context.Context, taskType TaskType, documentID string, payload interface{}, processAt time.Time) (string, error)

EnqueueAt 在指定时间将任务加入队列

func (*RedisQueue) EnqueueIn

func (q *RedisQueue) EnqueueIn(ctx context.Context, taskType TaskType, documentID string, payload interface{}, delay time.Duration) (string, error)

EnqueueIn 在指定延迟后将任务加入队列

func (*RedisQueue) GetTask

func (q *RedisQueue) GetTask(ctx context.Context, taskID string) (*Task, error)

GetTask 获取任务信息

func (*RedisQueue) GetTasksByDocument

func (q *RedisQueue) GetTasksByDocument(ctx context.Context, documentID string) ([]*Task, error)

GetTasksByDocument 获取文档相关的所有任务

func (*RedisQueue) NotifyTaskUpdate

func (q *RedisQueue) NotifyTaskUpdate(ctx context.Context, taskID string) error

NotifyTaskUpdate 通知任务状态更新

func (*RedisQueue) UpdateTaskStatus

func (q *RedisQueue) UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus, result interface{}, errMsg string) error

UpdateTaskStatus 更新任务状态

func (*RedisQueue) WaitForTask

func (q *RedisQueue) WaitForTask(ctx context.Context, taskID string, timeout time.Duration) (*Task, error)

WaitForTask 等待任务完成并返回结果

type RedisWorker

type RedisWorker struct {
	// contains filtered or unexported fields
}

RedisWorker Redis工作者实现

func (*RedisWorker) RegisterHandler

func (w *RedisWorker) RegisterHandler(taskType TaskType, handler Handler)

RegisterHandler 注册任务处理器

func (*RedisWorker) Start

func (w *RedisWorker) Start() error

Start 启动工作者

func (*RedisWorker) Stop

func (w *RedisWorker) Stop()

Stop 停止工作者

type Task

type Task struct {
	ID          string          `json:"id"`           // 任务唯一标识符
	Type        TaskType        `json:"type"`         // 任务类型
	DocumentID  string          `json:"document_id"`  // 关联的文档ID
	Status      TaskStatus      `json:"status"`       // 任务状态
	Payload     json.RawMessage `json:"payload"`      // 任务载荷数据,不同任务类型对应不同结构
	Result      json.RawMessage `json:"result"`       // 任务结果数据,不同任务类型对应不同结构
	Error       string          `json:"error"`        // 错误信息(如果处理失败)
	CreatedAt   time.Time       `json:"created_at"`   // 创建时间
	UpdatedAt   time.Time       `json:"updated_at"`   // 更新时间
	StartedAt   *time.Time      `json:"started_at"`   // 开始处理时间
	CompletedAt *time.Time      `json:"completed_at"` // 完成时间
	Attempts    int             `json:"attempts"`     // 尝试次数
	MaxRetries  int             `json:"max_retries"`  // 最大重试次数
}

Task 任务基础结构

type TaskCallback

type TaskCallback struct {
	TaskID     string          `json:"task_id"`     // 任务ID
	DocumentID string          `json:"document_id"` // 文档ID
	Status     TaskStatus      `json:"status"`      // 任务状态
	Type       TaskType        `json:"type"`        // 任务类型
	Result     json.RawMessage `json:"result"`      // 任务结果
	Error      string          `json:"error"`       // 错误信息
	Timestamp  time.Time       `json:"timestamp"`   // 回调时间戳
}

TaskCallback 任务回调信息

type TaskCallbackHandler

type TaskCallbackHandler func(ctx context.Context, task *Task, result json.RawMessage) error

TaskCallbackHandler 任务回调处理函数类型 处理特定类型任务的回调,返回处理结果

func DefaultDocumentParseHandler

func DefaultDocumentParseHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler

DefaultDocumentParseHandler 默认的文档解析回调处理函数 处理完成后创建分块任务

func DefaultProcessCompleteHandler

func DefaultProcessCompleteHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler

DefaultProcessCompleteHandler 默认的完整处理流程回调处理函数

func DefaultTextChunkHandler

func DefaultTextChunkHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler

DefaultTextChunkHandler 默认的文本分块回调处理函数 处理完成后创建向量化任务

func DefaultVectorizeHandler

func DefaultVectorizeHandler(ctx context.Context, queue Queue, logger *logrus.Logger) TaskCallbackHandler

DefaultVectorizeHandler 默认的向量化回调处理函数 向量化是任务流程的最后一步,处理完成后更新文档状态

type TaskError

type TaskError string

TaskError 任务错误类型

func (TaskError) Error

func (e TaskError) Error() string

Error 实现error接口

type TaskInfo

type TaskInfo struct {
	ID          string     `json:"id"`           // 任务唯一标识符
	Type        TaskType   `json:"type"`         // 任务类型
	DocumentID  string     `json:"document_id"`  // 关联的文档ID
	Status      TaskStatus `json:"status"`       // 任务状态
	Error       string     `json:"error"`        // 错误信息
	CreatedAt   time.Time  `json:"created_at"`   // 创建时间
	StartedAt   *time.Time `json:"started_at"`   // 开始处理时间
	CompletedAt *time.Time `json:"completed_at"` // 完成时间
	Progress    float64    `json:"progress"`     // 处理进度(0-100)
}

TaskInfo 表示任务的元信息 用于传递给客户端的简化任务信息

func NewTaskInfo

func NewTaskInfo(task *Task) *TaskInfo

NewTaskInfo 从Task创建TaskInfo

type TaskMetrics

type TaskMetrics struct {
	ProcessingTime time.Duration // 处理时间
	RetryCount     int           // 重试次数
	UsedMemory     int64         // 使用内存
}

TaskMetrics 任务执行指标

type TaskResult

type TaskResult struct {
	Status  TaskStatus  // 任务状态
	Result  interface{} // 任务结果数据
	Error   string      // 错误信息
	Metrics TaskMetrics // 任务执行指标
}

TaskResult 表示任务执行的结果 通常由Handler返回,用于更新任务状态和结果

type TaskStatus

type TaskStatus string

TaskStatus 任务状态

const (
	// StatusPending 等待处理
	StatusPending TaskStatus = "pending"
	// StatusProcessing 处理中
	StatusProcessing TaskStatus = "processing"
	// StatusCompleted 已完成
	StatusCompleted TaskStatus = "completed"
	// StatusFailed 处理失败
	StatusFailed TaskStatus = "failed"
)

type TaskType

type TaskType string

TaskType 任务类型

const (
	// TaskDocumentParse 文档解析任务
	TaskDocumentParse TaskType = "document_parse"
	// TaskTextChunk 文本分块任务
	TaskTextChunk TaskType = "text_chunk"
	// TaskVectorize 文本向量化任务
	TaskVectorize TaskType = "vectorize"
	// TaskProcessComplete 文档处理完整流程任务
	TaskProcessComplete TaskType = "process_complete"
)

type TextChunkPayload

type TextChunkPayload struct {
	DocumentID string `json:"document_id"` // 文档ID
	Content    string `json:"content"`     // 文本内容
	ChunkSize  int    `json:"chunk_size"`  // 分块大小
	Overlap    int    `json:"overlap"`     // 重叠大小
	SplitType  string `json:"split_type"`  // 分割类型: paragraph, sentence, length
}

TextChunkPayload 文本分块任务载荷

type TextChunkResult

type TextChunkResult struct {
	DocumentID string      `json:"document_id"` // 文档ID
	Chunks     []ChunkInfo `json:"chunks"`      // 分块列表
	ChunkCount int         `json:"chunk_count"` // 分块数量
	Error      string      `json:"error"`       // 错误信息(如果有)
}

TextChunkResult 文本分块任务结果

type VectorInfo

type VectorInfo struct {
	ChunkIndex int       `json:"chunk_index"` // 分块索引
	Vector     []float32 `json:"vector"`      // 向量数据
}

VectorInfo 向量信息

type VectorizePayload

type VectorizePayload struct {
	DocumentID string      `json:"document_id"` // 文档ID
	Chunks     []ChunkInfo `json:"chunks"`      // 文本分块
	Model      string      `json:"model"`       // 嵌入模型名称
}

VectorizePayload 文本向量化任务载荷

type VectorizeResult

type VectorizeResult struct {
	DocumentID  string       `json:"document_id"`  // 文档ID
	Vectors     []VectorInfo `json:"vectors"`      // 向量列表
	VectorCount int          `json:"vector_count"` // 向量数量
	Model       string       `json:"model"`        // 使用的模型
	Dimension   int          `json:"dimension"`    // 向量维度
	Error       string       `json:"error"`        // 错误信息(如果有)
}

VectorizeResult 向量化任务结果

type Worker

type Worker interface {
	// RegisterHandler 注册任务处理器
	RegisterHandler(taskType TaskType, handler Handler)

	// Start 启动工作者,开始处理任务
	Start() error

	// Stop 停止工作者
	Stop()
}

Worker 工作者接口 负责运行一组Handler来处理队列中的任务

func NewRedisWorker

func NewRedisWorker(queue *RedisQueue, cfg *Config) Worker

NewRedisWorker 创建Redis工作者

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL