Documentation
¶
Overview ¶
Package tasks provides task handling and execution functionality
Index ¶
- Constants
- Variables
- type Executor
- type IncrementalTaskPayload
- type QueueManager
- func (q *QueueManager) Close() error
- func (q *QueueManager) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)
- func (q *QueueManager) EnqueueTransformation(payload TaskPayload, opts ...asynq.Option) error
- func (q *QueueManager) IsTaskPendingOrRunning(task TaskPayload) (bool, error)
- type ScheduledTaskPayload
- type TaskContext
- type TaskHandler
- type TaskPayload
- type TaskType
Constants ¶
const ( // DirectionForward represents forward fill processing DirectionForward = "forward" // DirectionBack represents backfill processing DirectionBack = "back" )
const (
// TypeModelTransformation is the task type for model transformations
TypeModelTransformation = "model:transformation"
)
Variables ¶
var ( // ErrModelConfigNotFound is returned when model configuration is not found ErrModelConfigNotFound = errors.New("model configuration not found") // ErrDependenciesNotSatisfied is returned when dependencies are not satisfied ErrDependenciesNotSatisfied = errors.New("dependencies not satisfied") // ErrModelIDNotFound is returned when model_id is not found in payload ErrModelIDNotFound = errors.New("model_id not found in payload") // ErrScanTypeNotFound is returned when scan_type is not found in payload ErrScanTypeNotFound = errors.New("scan_type not found in payload") // ErrUnexpectedExternalType is returned when an external task type appears in transformation payload. ErrUnexpectedExternalType = errors.New("unexpected external task type in transformation payload") )
Functions ¶
This section is empty.
Types ¶
type Executor ¶
type Executor interface {
Execute(ctx context.Context, taskCtx any) error
Validate(ctx context.Context, taskCtx any) error
UpdateBounds(ctx context.Context, modelID, scanType string) error
}
Executor defines the interface for task executors
type IncrementalTaskPayload ¶ added in v0.0.25
type IncrementalTaskPayload struct {
Type TaskType `json:"type"`
ModelID string `json:"model_id"`
Position uint64 `json:"position"`
Interval uint64 `json:"interval"`
Direction string `json:"direction"` // DirectionForward or DirectionBack
EnqueuedAt time.Time `json:"enqueued_at"`
}
IncrementalTaskPayload represents a position-based incremental task
func (IncrementalTaskPayload) GetEnqueuedAt ¶ added in v0.0.25
func (p IncrementalTaskPayload) GetEnqueuedAt() time.Time
GetEnqueuedAt returns the enqueued time
func (IncrementalTaskPayload) GetModelID ¶ added in v0.0.25
func (p IncrementalTaskPayload) GetModelID() string
GetModelID returns the model ID
func (IncrementalTaskPayload) GetType ¶ added in v0.0.25
func (p IncrementalTaskPayload) GetType() TaskType
GetType returns the task type
func (IncrementalTaskPayload) QueueName ¶ added in v0.0.25
func (p IncrementalTaskPayload) QueueName() string
QueueName returns the queue name for this task
func (IncrementalTaskPayload) UniqueID ¶ added in v0.0.25
func (p IncrementalTaskPayload) UniqueID() string
UniqueID returns a unique identifier for this task Uses model.id:direction to ensure only one task per direction can run at a time. This prevents duplicate work when intervals expand (e.g., model:100:25 -> model:100:50) and leverages the natural separation between forward fill (frontier) and backfill (historical gaps).
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager manages task queuing
func NewQueueManager ¶
func NewQueueManager(redisOpt *asynq.RedisClientOpt) *QueueManager
NewQueueManager creates a new queue manager
func (*QueueManager) EnqueueTransformation ¶
func (q *QueueManager) EnqueueTransformation(payload TaskPayload, opts ...asynq.Option) error
EnqueueTransformation enqueues a transformation task
func (*QueueManager) IsTaskPendingOrRunning ¶
func (q *QueueManager) IsTaskPendingOrRunning(task TaskPayload) (bool, error)
IsTaskPendingOrRunning checks if a task is pending or running
type ScheduledTaskPayload ¶ added in v0.0.25
type ScheduledTaskPayload struct {
Type TaskType `json:"type"`
ModelID string `json:"model_id"`
ExecutionTime time.Time `json:"execution_time"`
EnqueuedAt time.Time `json:"enqueued_at"`
}
ScheduledTaskPayload represents a scheduled cron-based task
func (ScheduledTaskPayload) GetEnqueuedAt ¶ added in v0.0.25
func (p ScheduledTaskPayload) GetEnqueuedAt() time.Time
GetEnqueuedAt returns the enqueued time
func (ScheduledTaskPayload) GetModelID ¶ added in v0.0.25
func (p ScheduledTaskPayload) GetModelID() string
GetModelID returns the model ID
func (ScheduledTaskPayload) GetType ¶ added in v0.0.25
func (p ScheduledTaskPayload) GetType() TaskType
GetType returns the task type
func (ScheduledTaskPayload) QueueName ¶ added in v0.0.25
func (p ScheduledTaskPayload) QueueName() string
QueueName returns the queue name for this task
func (ScheduledTaskPayload) UniqueID ¶ added in v0.0.25
func (p ScheduledTaskPayload) UniqueID() string
UniqueID returns a unique identifier for this task
type TaskContext ¶
type TaskContext struct {
Transformation models.Transformation
Position uint64
Interval uint64
ExecutionTime time.Time // Time the task should execute for (from payload or start time)
StartTime time.Time // When the handler started processing
WorkerID string
}
TaskContext contains all context needed for task execution
type TaskHandler ¶
type TaskHandler struct {
// contains filtered or unexported fields
}
TaskHandler handles task execution
func NewTaskHandler ¶
func NewTaskHandler( logger logrus.FieldLogger, chClient clickhouse.ClientInterface, adminService admin.Service, validator validation.Validator, modelExecutor Executor, transformations []models.Transformation, ) *TaskHandler
NewTaskHandler creates a new task handler
func (*TaskHandler) HandleExternalScan ¶ added in v0.0.18
HandleExternalScan handles external model scan tasks (both incremental and full)
func (*TaskHandler) HandleTransformation ¶
HandleTransformation handles transformation tasks
func (*TaskHandler) Routes ¶
func (h *TaskHandler) Routes() map[string]asynq.HandlerFunc
Routes returns the task handler routes for Asynq
type TaskPayload ¶
type TaskPayload interface {
GetModelID() string
GetEnqueuedAt() time.Time
GetType() TaskType
UniqueID() string
QueueName() string
}
TaskPayload is the common interface for all task payloads
type TaskType ¶ added in v0.0.25
type TaskType string
TaskType indicates whether this is a scheduled or incremental task
const ( // TaskTypeIncremental represents incremental position-based tasks TaskTypeIncremental TaskType = "incremental" // TaskTypeScheduled represents scheduled cron-based tasks TaskTypeScheduled TaskType = "scheduled" // TaskTypeExternal represents external model scan tasks TaskTypeExternal TaskType = "external" )