Documentation
¶
Overview ¶
Package queue provides a comprehensive task queue and background job processing system with advanced scheduling capabilities.
It supports in-memory, Redis, and RabbitMQ queues with features like:
- Priority-based job scheduling
- Worker pool management
- Retry mechanisms with exponential backoff
- Middleware support for logging, metrics, and recovery
- Batch processing capabilities
- Graceful shutdown handling
- Job progress tracking
- Comprehensive error handling
- RabbitMQ support with persistent message delivery
- Scheduled job execution
- Cron-based scheduling (using enhanced cron expressions with optional seconds support)
- Interval-based scheduling (using time.Duration)
- Task registration and management
- Error recovery and retries
- Context-aware execution
Queue Manager Example:
package main import ( "context" "fmt" "time" "github.com/Valentin-Kaiser/go-core/queue" ) func main() { // Create a new queue manager manager := queue.NewManager() // Register a job handler manager.RegisterHandler("email", func(ctx context.Context, job *queue.Job) error { fmt.Printf("Processing email job: %s\n", job.ID) return nil }) // Start the manager if err := manager.Start(context.Background()); err != nil { panic(err) } defer manager.Stop() // Enqueue a job job := queue.NewJob("email"). WithPayload(map[string]interface{}{ "to": "user@example.com", "subject": "Welcome!", }). Build() if err := manager.Enqueue(context.Background(), job); err != nil { panic(err) } // Wait for processing time.Sleep(time.Second) }
Task Scheduler Example:
package main import ( "context" "fmt" "time" "github.com/Valentin-Kaiser/go-core/queue" ) func main() { // Create a new task scheduler scheduler := queue.NewTaskScheduler() // Register a cron-based task (5 fields - traditional) scheduler.RegisterCronTask("cleanup", "0 0 * * *", func(ctx context.Context) error { fmt.Println("Running daily cleanup task") return nil }) // Register a cron-based task with seconds (6 fields) scheduler.RegisterCronTask("frequent", "*/30 * * * * *", func(ctx context.Context) error { fmt.Println("Running every 30 seconds") return nil }) // Register a predefined expression scheduler.RegisterCronTask("hourly", "@hourly", func(ctx context.Context) error { fmt.Println("Running every hour") return nil }) // Register a task with named months and days scheduler.RegisterCronTask("weekday", "0 0 9 * * MON-FRI", func(ctx context.Context) error { fmt.Println("Running on weekdays at 9 AM") return nil }) // Register an interval-based task scheduler.RegisterIntervalTask("health-check", time.Minute*5, func(ctx context.Context) error { fmt.Println("Running health check") return nil }) // Start the scheduler if err := scheduler.Start(context.Background()); err != nil { panic(err) } defer scheduler.Stop() // Keep the program running select {} }
Index ¶
- Variables
- func IsRetryable(err error) bool
- type Batch
- type BatchManager
- func (bm *BatchManager) CreateBatch(ctx context.Context, name string, jobs []*Job) (*Batch, error)
- func (bm *BatchManager) DeleteBatch(ctx context.Context, id string) error
- func (bm *BatchManager) GetBatch(id string) (*Batch, error)
- func (bm *BatchManager) GetBatches() []*Batch
- func (bm *BatchManager) UpdateBatchStatus(ctx context.Context, batchID string) error
- type CronExpression
- type CronField
- type Job
- type JobBuilder
- func (jb *JobBuilder) Build() *Job
- func (jb *JobBuilder) WithDelay(delay time.Duration) *JobBuilder
- func (jb *JobBuilder) WithID(id string) *JobBuilder
- func (jb *JobBuilder) WithJSONPayload(jsonData []byte) *JobBuilder
- func (jb *JobBuilder) WithMaxAttempts(maxAttempts int) *JobBuilder
- func (jb *JobBuilder) WithMetadata(key, value string) *JobBuilder
- func (jb *JobBuilder) WithPayload(payload map[string]interface{}) *JobBuilder
- func (jb *JobBuilder) WithPriority(priority Priority) *JobBuilder
- func (jb *JobBuilder) WithScheduleAt(scheduleAt time.Time) *JobBuilder
- type JobContext
- func (jc *JobContext) GetMetadata(key string) (string, bool)
- func (jc *JobContext) GetPayload(key string) (interface{}, bool)
- func (jc *JobContext) GetPayloadBool(key string) (bool, bool)
- func (jc *JobContext) GetPayloadInt(key string) (int, bool)
- func (jc *JobContext) GetPayloadString(key string) (string, bool)
- func (jc *JobContext) ReportProgress(progress float64)
- type JobHandler
- type JobProgress
- type Manager
- func (m *Manager) Enqueue(ctx context.Context, job *Job) error
- func (m *Manager) GetJob(ctx context.Context, id string) (*Job, error)
- func (m *Manager) GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)
- func (m *Manager) GetStats() *Stats
- func (m *Manager) IsRunning() bool
- func (m *Manager) RegisterHandler(jobType string, handler JobHandler)
- func (m *Manager) Schedule(ctx context.Context, job *Job) error
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Stop() error
- func (m *Manager) WithQueue(queue Queue) *Manager
- func (m *Manager) WithRabbitMQ(config RabbitMQConfig) *Manager
- func (m *Manager) WithRabbitMQFromURL(url string) *Manager
- func (m *Manager) WithRetryAttempts(attempts int) *Manager
- func (m *Manager) WithRetryBackoff(backoff float64) *Manager
- func (m *Manager) WithRetryDelay(delay time.Duration) *Manager
- func (m *Manager) WithScheduleInterval(interval time.Duration) *Manager
- func (m *Manager) WithWorkers(workers int) *Manager
- type MemoryQueue
- func (mq *MemoryQueue) Close() error
- func (mq *MemoryQueue) DeleteJob(_ context.Context, id string) error
- func (mq *MemoryQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
- func (mq *MemoryQueue) Enqueue(_ context.Context, job *Job) error
- func (mq *MemoryQueue) GetJob(_ context.Context, id string) (*Job, error)
- func (mq *MemoryQueue) GetJobs(_ context.Context, status Status, limit int) ([]*Job, error)
- func (mq *MemoryQueue) GetStats(_ context.Context) (*Stats, error)
- func (mq *MemoryQueue) Schedule(_ context.Context, job *Job) error
- func (mq *MemoryQueue) UpdateJob(_ context.Context, job *Job) error
- type Middleware
- type Priority
- type Queue
- type RabbitMQ
- func (rq *RabbitMQ) Close() error
- func (rq *RabbitMQ) DeleteJob(_ context.Context, id string) error
- func (rq *RabbitMQ) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
- func (rq *RabbitMQ) Enqueue(ctx context.Context, job *Job) error
- func (rq *RabbitMQ) GetJob(_ context.Context, id string) (*Job, error)
- func (rq *RabbitMQ) GetJobs(_ context.Context, status Status, limit int) ([]*Job, error)
- func (rq *RabbitMQ) GetStats(_ context.Context) (*Stats, error)
- func (rq *RabbitMQ) IsConnectionOpen() bool
- func (rq *RabbitMQ) PurgeQueue(_ context.Context) error
- func (rq *RabbitMQ) Reconnect(config RabbitMQConfig) error
- func (rq *RabbitMQ) Schedule(ctx context.Context, job *Job) error
- func (rq *RabbitMQ) UpdateJob(_ context.Context, job *Job) error
- type RabbitMQConfig
- type RedisQueue
- func (rq *RedisQueue) Close() error
- func (rq *RedisQueue) DeleteJob(ctx context.Context, id string) error
- func (rq *RedisQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
- func (rq *RedisQueue) Enqueue(ctx context.Context, job *Job) error
- func (rq *RedisQueue) GetJob(ctx context.Context, id string) (*Job, error)
- func (rq *RedisQueue) GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)
- func (rq *RedisQueue) GetStats(ctx context.Context) (*Stats, error)
- func (rq *RedisQueue) MoveScheduledToPending(ctx context.Context) error
- func (rq *RedisQueue) Schedule(ctx context.Context, job *Job) error
- func (rq *RedisQueue) UpdateJob(ctx context.Context, job *Job) error
- type RetryableError
- type Stats
- type Status
- type Task
- type TaskFunc
- type TaskOptions
- type TaskScheduler
- func (s *TaskScheduler) DisableTask(name string) error
- func (s *TaskScheduler) EnableTask(name string) error
- func (s *TaskScheduler) GetTask(name string) (*Task, error)
- func (s *TaskScheduler) GetTasks() map[string]*Task
- func (s *TaskScheduler) IsRunning() bool
- func (s *TaskScheduler) ParseCronSpec(cronSpec string) (*CronExpression, error)
- func (s *TaskScheduler) RegisterCronTask(name, cronSpec string, fn TaskFunc) error
- func (s *TaskScheduler) RegisterCronTaskWithOptions(name, cronSpec string, fn TaskFunc, options TaskOptions) error
- func (s *TaskScheduler) RegisterIntervalTask(name string, interval time.Duration, fn TaskFunc) error
- func (s *TaskScheduler) RegisterIntervalTaskWithOptions(name string, interval time.Duration, fn TaskFunc, options TaskOptions) error
- func (s *TaskScheduler) RemoveTask(name string) error
- func (s *TaskScheduler) Start(ctx context.Context) error
- func (s *TaskScheduler) Stop()
- func (s *TaskScheduler) ValidateCronSpec(spec string) error
- func (s *TaskScheduler) WithCheckInterval(interval time.Duration) *TaskScheduler
- func (s *TaskScheduler) WithDefaultRetries(retries int) *TaskScheduler
- func (s *TaskScheduler) WithDefaultTimeout(timeout time.Duration) *TaskScheduler
- func (s *TaskScheduler) WithRetryDelay(delay time.Duration) *TaskScheduler
- type TaskType
Constants ¶
This section is empty.
Variables ¶
var ( // Presets are predefined cron expressions for common schedules Presets = map[string]string{ "@yearly": "0 0 0 1 1 *", "@annually": "0 0 0 1 1 *", "@monthly": "0 0 0 1 * *", "@weekly": "0 0 0 * * 0", "@daily": "0 0 0 * * *", "@midnight": "0 0 0 * * *", "@hourly": "0 0 * * * *", } // Months is a map of month names to their numeric values Months = map[string]int{ "JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6, "JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12, } // Days is a map of day names to their numeric values Days = map[string]int{ "SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6, } )
var ErrNoJobAvailable = apperror.NewError("no job available")
ErrNoJobAvailable is returned when no job is available in the queue
Functions ¶
Types ¶
type Batch ¶
type Batch struct { ID string `json:"id"` Name string `json:"name"` JobIDs []string `json:"job_ids"` Status Status `json:"status"` Total int `json:"total"` Pending int `json:"pending"` Running int `json:"running"` Completed int `json:"completed"` Failed int `json:"failed"` CreatedAt time.Time `json:"created_at"` CompletedAt time.Time `json:"completed_at,omitempty"` Metadata map[string]string `json:"metadata,omitempty"` }
Batch represents a batch of jobs
type BatchManager ¶
type BatchManager struct {
// contains filtered or unexported fields
}
BatchManager manages job batches
func NewBatchManager ¶
func NewBatchManager(manager *Manager) *BatchManager
NewBatchManager creates a new batch manager
func (*BatchManager) CreateBatch ¶
CreateBatch creates a new batch of jobs
func (*BatchManager) DeleteBatch ¶
func (bm *BatchManager) DeleteBatch(ctx context.Context, id string) error
DeleteBatch removes a batch
func (*BatchManager) GetBatch ¶
func (bm *BatchManager) GetBatch(id string) (*Batch, error)
GetBatch retrieves a batch by ID
func (*BatchManager) GetBatches ¶
func (bm *BatchManager) GetBatches() []*Batch
GetBatches returns all batches
func (*BatchManager) UpdateBatchStatus ¶
func (bm *BatchManager) UpdateBatchStatus(ctx context.Context, batchID string) error
UpdateBatchStatus updates the status of a batch based on its jobs
type CronExpression ¶
type CronExpression struct { Second *CronField // Optional seconds field (0-59) Minute CronField // Minute field (0-59) Hour CronField // Hour field (0-23) Day CronField // Day field (1-31) Month CronField // Month field (1-12) DayOfWeek CronField // Day of week field (0-6, 0 = Sunday) }
CronExpression represents a parsed cron expression
type Job ¶
type Job struct { ID string `json:"id"` Type string `json:"type"` Priority Priority `json:"priority"` Status Status `json:"status"` Attempts int `json:"attempts"` MaxAttempts int `json:"max_attempts"` Progress float64 `json:"progress"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` CompletedAt time.Time `json:"completed_at,omitempty"` ScheduleAt time.Time `json:"schedule_at,omitempty"` RetryAt time.Time `json:"retry_at,omitempty"` Timeout time.Duration `json:"timeout"` Payload json.RawMessage `json:"payload,omitempty"` Results json.RawMessage `json:"results,omitempty"` Error string `json:"error,omitempty"` Metadata map[string]string `json:"metadata,omitempty"` }
Job represents a job to be processed
func (*Job) IsScheduled ¶
IsScheduled returns true if the job is scheduled for a future time
type JobBuilder ¶
type JobBuilder struct {
// contains filtered or unexported fields
}
JobBuilder helps create jobs with a fluent API
func (*JobBuilder) WithDelay ¶
func (jb *JobBuilder) WithDelay(delay time.Duration) *JobBuilder
WithDelay schedules the job to run after a delay
func (*JobBuilder) WithID ¶
func (jb *JobBuilder) WithID(id string) *JobBuilder
WithID sets the job ID
func (*JobBuilder) WithJSONPayload ¶
func (jb *JobBuilder) WithJSONPayload(jsonData []byte) *JobBuilder
WithJSONPayload sets the job payload from JSON
func (*JobBuilder) WithMaxAttempts ¶
func (jb *JobBuilder) WithMaxAttempts(maxAttempts int) *JobBuilder
WithMaxAttempts sets the maximum number of attempts
func (*JobBuilder) WithMetadata ¶
func (jb *JobBuilder) WithMetadata(key, value string) *JobBuilder
WithMetadata sets job metadata
func (*JobBuilder) WithPayload ¶
func (jb *JobBuilder) WithPayload(payload map[string]interface{}) *JobBuilder
WithPayload sets the job payload
func (*JobBuilder) WithPriority ¶
func (jb *JobBuilder) WithPriority(priority Priority) *JobBuilder
WithPriority sets the job priority
func (*JobBuilder) WithScheduleAt ¶
func (jb *JobBuilder) WithScheduleAt(scheduleAt time.Time) *JobBuilder
WithScheduleAt schedules the job for a specific time
type JobContext ¶
type JobContext struct {
Job *Job
}
JobContext provides context for job execution
func NewJobContext ¶
func NewJobContext(_ context.Context, job *Job) *JobContext
NewJobContext creates a new job context
func (*JobContext) GetMetadata ¶
func (jc *JobContext) GetMetadata(key string) (string, bool)
GetMetadata gets a metadata value
func (*JobContext) GetPayload ¶
func (jc *JobContext) GetPayload(key string) (interface{}, bool)
GetPayload gets a value from the job payload
func (*JobContext) GetPayloadBool ¶
func (jc *JobContext) GetPayloadBool(key string) (bool, bool)
GetPayloadBool gets a bool value from the job payload
func (*JobContext) GetPayloadInt ¶
func (jc *JobContext) GetPayloadInt(key string) (int, bool)
GetPayloadInt gets an int value from the job payload
func (*JobContext) GetPayloadString ¶
func (jc *JobContext) GetPayloadString(key string) (string, bool)
GetPayloadString gets a string value from the job payload
func (*JobContext) ReportProgress ¶
func (jc *JobContext) ReportProgress(progress float64)
ReportProgress reports job progress
type JobHandler ¶
JobHandler is a function that processes jobs
func LoggingMiddleware ¶
func LoggingMiddleware(next JobHandler) JobHandler
LoggingMiddleware logs job execution
func MetricsMiddleware ¶
func MetricsMiddleware(next JobHandler) JobHandler
MetricsMiddleware tracks job metrics
func MiddlewareChain ¶
func MiddlewareChain(handler JobHandler, middlewares ...Middleware) JobHandler
MiddlewareChain applies multiple middlewares to a job handler
func RecoveryMiddleware ¶
func RecoveryMiddleware(next JobHandler) JobHandler
RecoveryMiddleware recovers from panics in job handlers
type JobProgress ¶
type JobProgress struct { JobID string `json:"job_id"` Progress float64 `json:"progress"` Message string `json:"message,omitempty"` }
JobProgress represents job progress information
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the job queue and workers
func NewManager ¶
func NewManager() *Manager
NewManager creates a new queue manager with default settings
func (*Manager) RegisterHandler ¶
func (m *Manager) RegisterHandler(jobType string, handler JobHandler)
RegisterHandler registers a job handler for a specific job type
func (*Manager) WithRabbitMQ ¶
func (m *Manager) WithRabbitMQ(config RabbitMQConfig) *Manager
WithRabbitMQ sets the queue to use RabbitMQ with the given configuration
func (*Manager) WithRabbitMQFromURL ¶
WithRabbitMQFromURL sets the queue to use RabbitMQ with the given URL
func (*Manager) WithRetryAttempts ¶
WithRetryAttempts sets the maximum number of retry attempts
func (*Manager) WithRetryBackoff ¶
WithRetryBackoff sets the retry backoff multiplier
func (*Manager) WithRetryDelay ¶
WithRetryDelay sets the retry delay
func (*Manager) WithScheduleInterval ¶
WithScheduleInterval sets the interval for checking scheduled jobs
func (*Manager) WithWorkers ¶
WithWorkers sets the number of workers
type MemoryQueue ¶
type MemoryQueue struct {
// contains filtered or unexported fields
}
MemoryQueue implements an in-memory job queue with priority support
func NewMemoryQueue ¶
func NewMemoryQueue() *MemoryQueue
NewMemoryQueue creates and initializes a new in-memory job queue with priority support.
Concurrency safety: The queue is safe for concurrent use by multiple goroutines. It uses a sync.RWMutex to guard concurrent access and a channel for notifications, which ensures safe operation in concurrent environments.
Intended use cases: The queue is suitable for managing jobs in memory, particularly in scenarios requiring priority-based scheduling and thread-safe operations. It is ideal for applications where job persistence is not required, and all jobs can be managed in memory.
func (*MemoryQueue) DeleteJob ¶
func (mq *MemoryQueue) DeleteJob(_ context.Context, id string) error
DeleteJob removes a job from the queue
func (*MemoryQueue) Enqueue ¶
func (mq *MemoryQueue) Enqueue(_ context.Context, job *Job) error
Enqueue adds a job to the queue
func (*MemoryQueue) GetStats ¶
func (mq *MemoryQueue) GetStats(_ context.Context) (*Stats, error)
GetStats returns queue statistics
type Middleware ¶
type Middleware func(JobHandler) JobHandler
Middleware is a function that wraps a JobHandler
func TimeoutMiddleware ¶
func TimeoutMiddleware(timeout time.Duration) Middleware
TimeoutMiddleware adds timeout to job execution
type Queue ¶
type Queue interface { Enqueue(ctx context.Context, job *Job) error Dequeue(ctx context.Context, timeout time.Duration) (*Job, error) Schedule(ctx context.Context, job *Job) error UpdateJob(ctx context.Context, job *Job) error GetJob(ctx context.Context, id string) (*Job, error) GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error) GetStats(ctx context.Context) (*Stats, error) DeleteJob(ctx context.Context, id string) error Close() error }
Queue defines the interface for job queues
type RabbitMQ ¶ added in v1.4.3
type RabbitMQ struct {
// contains filtered or unexported fields
}
RabbitMQ implements a RabbitMQ-backed job queue
func NewRabbitMQ ¶ added in v1.4.3
func NewRabbitMQ(config RabbitMQConfig) (*RabbitMQ, error)
NewRabbitMQ creates a new RabbitMQ-backed queue.
Configuration Parameters:
- URL: The RabbitMQ server URL. This is required for establishing a connection.
- QueueName: The name of the queue to use. This is required and must be unique.
- ExchangeName: The name of the exchange to bind the queue to. This is required.
- RoutingKey: The routing key for binding the queue to the exchange. This is required.
- Durable: If true, the queue will survive a broker restart.
- AutoDelete: If true, the queue will be deleted when no consumers are connected.
- Exclusive: If true, the queue will be used by only one connection and deleted when the connection closes.
- NoWait: If true, the queue declaration will not wait for a server response.
- MaxPriority: The maximum priority level for the queue (0-255). Defaults to 10 if not specified.
Connection Setup: The function establishes a connection to the RabbitMQ server using the provided URL. It then declares a queue with the specified configuration parameters and binds it to the exchange.
Error Conditions:
- Missing or empty URL, QueueName, ExchangeName, or RoutingKey will result in an error.
- Connection failures (e.g., network issues, invalid URL) will return a wrapped error.
- Invalid MaxPriority values (outside the range 0-255) may cause unexpected behavior.
func NewRabbitMQFromURL ¶ added in v1.4.3
NewRabbitMQFromURL creates a new RabbitMQ queue with a simple URL
func (*RabbitMQ) IsConnectionOpen ¶ added in v1.4.3
IsConnectionOpen checks if the RabbitMQ connection is open
func (*RabbitMQ) PurgeQueue ¶ added in v1.4.3
PurgeQueue removes all messages from the queue
func (*RabbitMQ) Reconnect ¶ added in v1.4.3
func (rq *RabbitMQ) Reconnect(config RabbitMQConfig) error
Reconnect attempts to reconnect to RabbitMQ
type RabbitMQConfig ¶
type RabbitMQConfig struct { URL string QueueName string ExchangeName string RoutingKey string Durable bool AutoDelete bool Exclusive bool NoWait bool MaxPriority int // Maximum priority level for the queue (0-255) }
RabbitMQConfig holds configuration for RabbitMQ queue
type RedisQueue ¶
type RedisQueue struct {
// contains filtered or unexported fields
}
RedisQueue implements a Redis-backed job queue
func NewRedisQueue ¶
func NewRedisQueue(client redis.Cmdable, keyPrefix string) *RedisQueue
NewRedisQueue creates a new Redis-backed queue.
Parameters:
- client: A Redis client implementing the redis.Cmdable interface, used to interact with the Redis database.
- keyPrefix: A string used as a prefix for all Redis keys created by the queue. If an empty string is provided, the default prefix "queue" is used.
Purpose: This function initializes a RedisQueue instance, which provides methods for enqueueing, dequeueing, and scheduling jobs in a Redis-backed job queue.
func (*RedisQueue) DeleteJob ¶
func (rq *RedisQueue) DeleteJob(ctx context.Context, id string) error
DeleteJob removes a job from the queue
func (*RedisQueue) Enqueue ¶
func (rq *RedisQueue) Enqueue(ctx context.Context, job *Job) error
Enqueue adds a job to the queue
func (*RedisQueue) GetStats ¶
func (rq *RedisQueue) GetStats(ctx context.Context) (*Stats, error)
GetStats returns queue statistics
func (*RedisQueue) MoveScheduledToPending ¶
func (rq *RedisQueue) MoveScheduledToPending(ctx context.Context) error
MoveScheduledToPending moves scheduled jobs that are ready to be processed
type RetryableError ¶
type RetryableError struct {
Err error
}
RetryableError represents an error that should trigger a retry
func NewRetryableError ¶
func NewRetryableError(err error) *RetryableError
NewRetryableError creates a new retryable error
func (*RetryableError) Error ¶
func (e *RetryableError) Error() string
type Stats ¶
type Stats struct { JobsProcessed int64 `json:"jobs_processed"` JobsFailed int64 `json:"jobs_failed"` JobsRetried int64 `json:"jobs_retried"` QueueSize int64 `json:"queue_size"` WorkersActive int64 `json:"workers_active"` WorkersBusy int64 `json:"workers_busy"` TotalJobs int64 `json:"total_jobs"` Pending int64 `json:"pending"` Running int64 `json:"running"` Completed int64 `json:"completed"` Failed int64 `json:"failed"` Retrying int64 `json:"retrying"` Scheduled int64 `json:"scheduled"` DeadLetter int64 `json:"dead_letter"` }
Stats represents queue statistics
type Status ¶
type Status int
Status represents the status of a job
const ( // StatusPending indicates the job is pending execution StatusPending Status = iota // StatusRunning indicates the job is currently being executed StatusRunning // StatusCompleted indicates the job has completed successfully StatusCompleted // StatusFailed indicates the job has failed StatusFailed // StatusRetrying indicates the job is scheduled for retry StatusRetrying // StatusScheduled indicates the job is scheduled for future execution StatusScheduled // StatusDeadLetter indicates the job has been moved to dead letter queue StatusDeadLetter )
type Task ¶
type Task struct { ID string `json:"id"` Name string `json:"name"` Type TaskType `json:"type"` CronSpec string `json:"cron_spec,omitempty"` Interval time.Duration `json:"interval,omitempty"` Function TaskFunc `json:"-"` NextRun time.Time `json:"next_run"` LastRun time.Time `json:"last_run"` RunCount int64 `json:"run_count"` ErrorCount int64 `json:"error_count"` LastError string `json:"last_error,omitempty"` IsRunning bool `json:"is_running"` MaxRetries int `json:"max_retries"` RetryDelay time.Duration `json:"retry_delay"` Timeout time.Duration `json:"timeout"` Enabled bool `json:"enabled"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` }
Task represents a scheduled task
type TaskOptions ¶
type TaskOptions struct { MaxRetries int RetryDelay time.Duration Timeout time.Duration Enabled *bool }
TaskOptions provides configuration options for tasks
type TaskScheduler ¶
type TaskScheduler struct {
// contains filtered or unexported fields
}
TaskScheduler manages background tasks
func NewTaskScheduler ¶
func NewTaskScheduler() *TaskScheduler
NewTaskScheduler creates a new task scheduler with default settings
func (*TaskScheduler) DisableTask ¶
func (s *TaskScheduler) DisableTask(name string) error
DisableTask disables a task
func (*TaskScheduler) EnableTask ¶
func (s *TaskScheduler) EnableTask(name string) error
EnableTask enables a task
func (*TaskScheduler) GetTask ¶
func (s *TaskScheduler) GetTask(name string) (*Task, error)
GetTask returns a task by name
func (*TaskScheduler) GetTasks ¶
func (s *TaskScheduler) GetTasks() map[string]*Task
GetTasks returns all registered tasks
func (*TaskScheduler) IsRunning ¶
func (s *TaskScheduler) IsRunning() bool
IsRunning returns true if the scheduler is running
func (*TaskScheduler) ParseCronSpec ¶
func (s *TaskScheduler) ParseCronSpec(cronSpec string) (*CronExpression, error)
ParseCronSpec parses a cron specification
func (*TaskScheduler) RegisterCronTask ¶
func (s *TaskScheduler) RegisterCronTask(name, cronSpec string, fn TaskFunc) error
RegisterCronTask registers a new cron-based task
func (*TaskScheduler) RegisterCronTaskWithOptions ¶
func (s *TaskScheduler) RegisterCronTaskWithOptions(name, cronSpec string, fn TaskFunc, options TaskOptions) error
RegisterCronTaskWithOptions registers a new cron-based task with options
func (*TaskScheduler) RegisterIntervalTask ¶
func (s *TaskScheduler) RegisterIntervalTask(name string, interval time.Duration, fn TaskFunc) error
RegisterIntervalTask registers a new interval-based task
func (*TaskScheduler) RegisterIntervalTaskWithOptions ¶
func (s *TaskScheduler) RegisterIntervalTaskWithOptions(name string, interval time.Duration, fn TaskFunc, options TaskOptions) error
RegisterIntervalTaskWithOptions registers a new interval-based task with options
func (*TaskScheduler) RemoveTask ¶
func (s *TaskScheduler) RemoveTask(name string) error
RemoveTask removes a task from the scheduler
func (*TaskScheduler) Start ¶
func (s *TaskScheduler) Start(ctx context.Context) error
Start starts the task scheduler
func (*TaskScheduler) ValidateCronSpec ¶
func (s *TaskScheduler) ValidateCronSpec(spec string) error
ValidateCronSpec validates a cron specification.
Purpose: This function checks whether the provided cron specification is valid. It supports both standard cron formats (5 or 6 fields) and predefined expressions.
Supported formats:
- Standard cron expressions with 5 fields (minute, hour, day, month, day of week).
- Extended cron expressions with 6 fields (second, minute, hour, day, month, day of week).
- Predefined expressions such as "@yearly", "@daily", "@hourly", etc., which are mapped to standard cron strings.
Validation rules:
- Ensures the cron syntax is correct.
- Maps predefined expressions to their equivalent cron strings.
- Delegates parsing and validation to the parseCronSpec function.
func (*TaskScheduler) WithCheckInterval ¶
func (s *TaskScheduler) WithCheckInterval(interval time.Duration) *TaskScheduler
WithCheckInterval sets the interval for checking scheduled tasks
func (*TaskScheduler) WithDefaultRetries ¶
func (s *TaskScheduler) WithDefaultRetries(retries int) *TaskScheduler
WithDefaultRetries sets the default number of retries for failed tasks
func (*TaskScheduler) WithDefaultTimeout ¶
func (s *TaskScheduler) WithDefaultTimeout(timeout time.Duration) *TaskScheduler
WithDefaultTimeout sets the default timeout for task execution
func (*TaskScheduler) WithRetryDelay ¶
func (s *TaskScheduler) WithRetryDelay(delay time.Duration) *TaskScheduler
WithRetryDelay sets the delay between retries