queue

package
v1.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2025 License: BSD-3-Clause Imports: 16 Imported by: 0

Documentation

Overview

Package queue provides a comprehensive task queue and background job processing system with advanced scheduling capabilities.

It supports in-memory, Redis, and RabbitMQ queues with features like:

  • Priority-based job scheduling
  • Worker pool management
  • Retry mechanisms with exponential backoff
  • Middleware support for logging, metrics, and recovery
  • Batch processing capabilities
  • Graceful shutdown handling
  • Job progress tracking
  • Comprehensive error handling
  • RabbitMQ support with persistent message delivery
  • Scheduled job execution
  • Cron-based scheduling (using enhanced cron expressions with optional seconds support)
  • Interval-based scheduling (using time.Duration)
  • Task registration and management
  • Error recovery and retries
  • Context-aware execution

Queue Manager Example:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Valentin-Kaiser/go-core/queue"
)

func main() {
	// Create a new queue manager
	manager := queue.NewManager()

	// Register a job handler
	manager.RegisterHandler("email", func(ctx context.Context, job *queue.Job) error {
		fmt.Printf("Processing email job: %s\n", job.ID)
		return nil
	})

	// Start the manager
	if err := manager.Start(context.Background()); err != nil {
		panic(err)
	}
	defer manager.Stop()

	// Enqueue a job
	job := queue.NewJob("email").
		WithPayload(map[string]interface{}{
			"to":      "user@example.com",
			"subject": "Welcome!",
		}).
		Build()

	if err := manager.Enqueue(context.Background(), job); err != nil {
		panic(err)
	}

	// Wait for processing
	time.Sleep(time.Second)
}

Task Scheduler Example:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Valentin-Kaiser/go-core/queue"
)

func main() {
	// Create a new task scheduler
	scheduler := queue.NewTaskScheduler()

	// Register a cron-based task (5 fields - traditional)
	scheduler.RegisterCronTask("cleanup", "0 0 * * *", func(ctx context.Context) error {
		fmt.Println("Running daily cleanup task")
		return nil
	})

	// Register a cron-based task with seconds (6 fields)
	scheduler.RegisterCronTask("frequent", "*/30 * * * * *", func(ctx context.Context) error {
		fmt.Println("Running every 30 seconds")
		return nil
	})

	// Register a predefined expression
	scheduler.RegisterCronTask("hourly", "@hourly", func(ctx context.Context) error {
		fmt.Println("Running every hour")
		return nil
	})

	// Register a task with named months and days
	scheduler.RegisterCronTask("weekday", "0 0 9 * * MON-FRI", func(ctx context.Context) error {
		fmt.Println("Running on weekdays at 9 AM")
		return nil
	})

	// Register an interval-based task
	scheduler.RegisterIntervalTask("health-check", time.Minute*5, func(ctx context.Context) error {
		fmt.Println("Running health check")
		return nil
	})

	// Start the scheduler
	if err := scheduler.Start(context.Background()); err != nil {
		panic(err)
	}
	defer scheduler.Stop()

	// Keep the program running
	select {}
}

Index

Constants

This section is empty.

Variables

View Source
var (
	// Presets are predefined cron expressions for common schedules
	Presets = map[string]string{
		"@yearly":   "0 0 0 1 1 *",
		"@annually": "0 0 0 1 1 *",
		"@monthly":  "0 0 0 1 * *",
		"@weekly":   "0 0 0 * * 0",
		"@daily":    "0 0 0 * * *",
		"@midnight": "0 0 0 * * *",
		"@hourly":   "0 0 * * * *",
	}
	// Months is a map of month names to their numeric values
	Months = map[string]int{
		"JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6,
		"JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12,
	}
	// Days is a map of day names to their numeric values
	Days = map[string]int{
		"SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6,
	}
)
View Source
var ErrNoJobAvailable = apperror.NewError("no job available")

ErrNoJobAvailable is returned when no job is available in the queue

Functions

func IsRetryable

func IsRetryable(err error) bool

IsRetryable checks if an error is retryable

Types

type Batch

type Batch struct {
	ID          string            `json:"id"`
	Name        string            `json:"name"`
	JobIDs      []string          `json:"job_ids"`
	Status      Status            `json:"status"`
	Total       int               `json:"total"`
	Pending     int               `json:"pending"`
	Running     int               `json:"running"`
	Completed   int               `json:"completed"`
	Failed      int               `json:"failed"`
	CreatedAt   time.Time         `json:"created_at"`
	CompletedAt time.Time         `json:"completed_at,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

Batch represents a batch of jobs

type BatchManager

type BatchManager struct {
	// contains filtered or unexported fields
}

BatchManager manages job batches

func NewBatchManager

func NewBatchManager(manager *Manager) *BatchManager

NewBatchManager creates a new batch manager

func (*BatchManager) CreateBatch

func (bm *BatchManager) CreateBatch(ctx context.Context, name string, jobs []*Job) (*Batch, error)

CreateBatch creates a new batch of jobs

func (*BatchManager) DeleteBatch

func (bm *BatchManager) DeleteBatch(ctx context.Context, id string) error

DeleteBatch removes a batch

func (*BatchManager) GetBatch

func (bm *BatchManager) GetBatch(id string) (*Batch, error)

GetBatch retrieves a batch by ID

func (*BatchManager) GetBatches

func (bm *BatchManager) GetBatches() []*Batch

GetBatches returns all batches

func (*BatchManager) UpdateBatchStatus

func (bm *BatchManager) UpdateBatchStatus(ctx context.Context, batchID string) error

UpdateBatchStatus updates the status of a batch based on its jobs

type CronExpression

type CronExpression struct {
	Second    *CronField // Optional seconds field (0-59)
	Minute    CronField  // Minute field (0-59)
	Hour      CronField  // Hour field (0-23)
	Day       CronField  // Day field (1-31)
	Month     CronField  // Month field (1-12)
	DayOfWeek CronField  // Day of week field (0-6, 0 = Sunday)
}

CronExpression represents a parsed cron expression

type CronField

type CronField struct {
	Min, Max int
	Values   []int
}

CronField represents a field in a cron expression

type Job

type Job struct {
	ID          string            `json:"id"`
	Type        string            `json:"type"`
	Priority    Priority          `json:"priority"`
	Status      Status            `json:"status"`
	Attempts    int               `json:"attempts"`
	MaxAttempts int               `json:"max_attempts"`
	Progress    float64           `json:"progress"`
	CreatedAt   time.Time         `json:"created_at"`
	UpdatedAt   time.Time         `json:"updated_at"`
	CompletedAt time.Time         `json:"completed_at,omitempty"`
	ScheduleAt  time.Time         `json:"schedule_at,omitempty"`
	RetryAt     time.Time         `json:"retry_at,omitempty"`
	Timeout     time.Duration     `json:"timeout"`
	Payload     json.RawMessage   `json:"payload,omitempty"`
	Results     json.RawMessage   `json:"results,omitempty"`
	Error       string            `json:"error,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

Job represents a job to be processed

func (*Job) IsExpired

func (j *Job) IsExpired() bool

IsExpired returns true if the job has exceeded its maximum attempts

func (*Job) IsScheduled

func (j *Job) IsScheduled() bool

IsScheduled returns true if the job is scheduled for a future time

type JobBuilder

type JobBuilder struct {
	// contains filtered or unexported fields
}

JobBuilder helps create jobs with a fluent API

func NewJob

func NewJob(jobType string) *JobBuilder

NewJob creates a new job builder

func (*JobBuilder) Build

func (jb *JobBuilder) Build() *Job

Build returns the constructed job

func (*JobBuilder) WithDelay

func (jb *JobBuilder) WithDelay(delay time.Duration) *JobBuilder

WithDelay schedules the job to run after a delay

func (*JobBuilder) WithID

func (jb *JobBuilder) WithID(id string) *JobBuilder

WithID sets the job ID

func (*JobBuilder) WithJSONPayload

func (jb *JobBuilder) WithJSONPayload(jsonData []byte) *JobBuilder

WithJSONPayload sets the job payload from JSON

func (*JobBuilder) WithMaxAttempts

func (jb *JobBuilder) WithMaxAttempts(maxAttempts int) *JobBuilder

WithMaxAttempts sets the maximum number of attempts

func (*JobBuilder) WithMetadata

func (jb *JobBuilder) WithMetadata(key, value string) *JobBuilder

WithMetadata sets job metadata

func (*JobBuilder) WithPayload

func (jb *JobBuilder) WithPayload(payload map[string]interface{}) *JobBuilder

WithPayload sets the job payload

func (*JobBuilder) WithPriority

func (jb *JobBuilder) WithPriority(priority Priority) *JobBuilder

WithPriority sets the job priority

func (*JobBuilder) WithScheduleAt

func (jb *JobBuilder) WithScheduleAt(scheduleAt time.Time) *JobBuilder

WithScheduleAt schedules the job for a specific time

type JobContext

type JobContext struct {
	Job *Job
}

JobContext provides context for job execution

func NewJobContext

func NewJobContext(_ context.Context, job *Job) *JobContext

NewJobContext creates a new job context

func (*JobContext) GetMetadata

func (jc *JobContext) GetMetadata(key string) (string, bool)

GetMetadata gets a metadata value

func (*JobContext) GetPayload

func (jc *JobContext) GetPayload(key string) (interface{}, bool)

GetPayload gets a value from the job payload

func (*JobContext) GetPayloadBool

func (jc *JobContext) GetPayloadBool(key string) (bool, bool)

GetPayloadBool gets a bool value from the job payload

func (*JobContext) GetPayloadInt

func (jc *JobContext) GetPayloadInt(key string) (int, bool)

GetPayloadInt gets an int value from the job payload

func (*JobContext) GetPayloadString

func (jc *JobContext) GetPayloadString(key string) (string, bool)

GetPayloadString gets a string value from the job payload

func (*JobContext) ReportProgress

func (jc *JobContext) ReportProgress(progress float64)

ReportProgress reports job progress

type JobHandler

type JobHandler func(ctx context.Context, job *Job) error

JobHandler is a function that processes jobs

func LoggingMiddleware

func LoggingMiddleware(next JobHandler) JobHandler

LoggingMiddleware logs job execution

func MetricsMiddleware

func MetricsMiddleware(next JobHandler) JobHandler

MetricsMiddleware tracks job metrics

func MiddlewareChain

func MiddlewareChain(handler JobHandler, middlewares ...Middleware) JobHandler

MiddlewareChain applies multiple middlewares to a job handler

func RecoveryMiddleware

func RecoveryMiddleware(next JobHandler) JobHandler

RecoveryMiddleware recovers from panics in job handlers

type JobProgress

type JobProgress struct {
	JobID    string  `json:"job_id"`
	Progress float64 `json:"progress"`
	Message  string  `json:"message,omitempty"`
}

JobProgress represents job progress information

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the job queue and workers

func NewManager

func NewManager() *Manager

NewManager creates a new queue manager with default settings

func (*Manager) Enqueue

func (m *Manager) Enqueue(ctx context.Context, job *Job) error

Enqueue adds a job to the queue

func (*Manager) GetJob

func (m *Manager) GetJob(ctx context.Context, id string) (*Job, error)

GetJob retrieves a job by ID

func (*Manager) GetJobs

func (m *Manager) GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)

GetJobs retrieves jobs by status

func (*Manager) GetStats

func (m *Manager) GetStats() *Stats

GetStats returns current queue statistics

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning returns true if the manager is currently running

func (*Manager) RegisterHandler

func (m *Manager) RegisterHandler(jobType string, handler JobHandler)

RegisterHandler registers a job handler for a specific job type

func (*Manager) Schedule

func (m *Manager) Schedule(ctx context.Context, job *Job) error

Schedule adds a scheduled job to the queue

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the queue manager

func (*Manager) Stop

func (m *Manager) Stop() error

Stop stops the queue manager gracefully

func (*Manager) WithQueue

func (m *Manager) WithQueue(queue Queue) *Manager

WithQueue sets the queue implementation

func (*Manager) WithRabbitMQ

func (m *Manager) WithRabbitMQ(config RabbitMQConfig) *Manager

WithRabbitMQ sets the queue to use RabbitMQ with the given configuration

func (*Manager) WithRabbitMQFromURL

func (m *Manager) WithRabbitMQFromURL(url string) *Manager

WithRabbitMQFromURL sets the queue to use RabbitMQ with the given URL

func (*Manager) WithRetryAttempts

func (m *Manager) WithRetryAttempts(attempts int) *Manager

WithRetryAttempts sets the maximum number of retry attempts

func (*Manager) WithRetryBackoff

func (m *Manager) WithRetryBackoff(backoff float64) *Manager

WithRetryBackoff sets the retry backoff multiplier

func (*Manager) WithRetryDelay

func (m *Manager) WithRetryDelay(delay time.Duration) *Manager

WithRetryDelay sets the retry delay

func (*Manager) WithScheduleInterval

func (m *Manager) WithScheduleInterval(interval time.Duration) *Manager

WithScheduleInterval sets the interval for checking scheduled jobs

func (*Manager) WithWorkers

func (m *Manager) WithWorkers(workers int) *Manager

WithWorkers sets the number of workers

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

MemoryQueue implements an in-memory job queue with priority support

func NewMemoryQueue

func NewMemoryQueue() *MemoryQueue

NewMemoryQueue creates and initializes a new in-memory job queue with priority support.

Thread-safety: The queue is thread-safe, utilizing a sync.RWMutex for concurrent access and a channel for notifications. This ensures safe operations in multi-threaded environments.

Intended use cases: The queue is suitable for managing jobs in memory, particularly in scenarios requiring priority-based scheduling and thread-safe operations. It is ideal for applications where job persistence is not required, and all jobs can be managed in memory.

func (*MemoryQueue) Close

func (mq *MemoryQueue) Close() error

Close closes the queue

func (*MemoryQueue) DeleteJob

func (mq *MemoryQueue) DeleteJob(_ context.Context, id string) error

DeleteJob removes a job from the queue

func (*MemoryQueue) Dequeue

func (mq *MemoryQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)

Dequeue removes and returns the next job from the queue

func (*MemoryQueue) Enqueue

func (mq *MemoryQueue) Enqueue(_ context.Context, job *Job) error

Enqueue adds a job to the queue

func (*MemoryQueue) GetJob

func (mq *MemoryQueue) GetJob(_ context.Context, id string) (*Job, error)

GetJob retrieves a job by ID

func (*MemoryQueue) GetJobs

func (mq *MemoryQueue) GetJobs(_ context.Context, status Status, limit int) ([]*Job, error)

GetJobs retrieves jobs by status

func (*MemoryQueue) GetStats

func (mq *MemoryQueue) GetStats(_ context.Context) (*Stats, error)

GetStats returns queue statistics

func (*MemoryQueue) Schedule

func (mq *MemoryQueue) Schedule(_ context.Context, job *Job) error

Schedule adds a scheduled job to the queue

func (*MemoryQueue) UpdateJob

func (mq *MemoryQueue) UpdateJob(_ context.Context, job *Job) error

UpdateJob updates an existing job

type Middleware

type Middleware func(JobHandler) JobHandler

Middleware is a function that wraps a JobHandler

func TimeoutMiddleware

func TimeoutMiddleware(timeout time.Duration) Middleware

TimeoutMiddleware adds timeout to job execution

type Priority

type Priority int

Priority defines the priority levels for jobs

const (
	// PriorityLow represents the lowest priority level
	PriorityLow Priority = iota
	// PriorityNormal represents the normal priority level
	PriorityNormal
	// PriorityHigh represents the high priority level
	PriorityHigh
	// PriorityCritical represents the critical priority level
	PriorityCritical
)

func (Priority) String

func (p Priority) String() string

String returns the string representation of the priority

type Queue

type Queue interface {
	Enqueue(ctx context.Context, job *Job) error
	Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
	Schedule(ctx context.Context, job *Job) error
	UpdateJob(ctx context.Context, job *Job) error
	GetJob(ctx context.Context, id string) (*Job, error)
	GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)
	GetStats(ctx context.Context) (*Stats, error)
	DeleteJob(ctx context.Context, id string) error
	Close() error
}

Queue defines the interface for job queues

type RabbitMQ added in v1.4.3

type RabbitMQ struct {
	// contains filtered or unexported fields
}

RabbitMQ implements a RabbitMQ-backed job queue

func NewRabbitMQ added in v1.4.3

func NewRabbitMQ(config RabbitMQConfig) (*RabbitMQ, error)

NewRabbitMQ creates a new RabbitMQ-backed queue.

Configuration Parameters: - URL: The RabbitMQ server URL. This is required for establishing a connection. - QueueName: The name of the queue to use. This is required and must be unique. - ExchangeName: The name of the exchange to bind the queue to. This is required. - RoutingKey: The routing key for binding the queue to the exchange. This is required. - Durable: If true, the queue will survive a broker restart. - AutoDelete: If true, the queue will be deleted when no consumers are connected. - Exclusive: If true, the queue will be used by only one connection and deleted when the connection closes. - NoWait: If true, the queue declaration will not wait for a server response. - MaxPriority: The maximum priority level for the queue (0-255). Defaults to 10 if not specified.

Connection Setup: The function establishes a connection to the RabbitMQ server using the provided URL. It then declares a queue with the specified configuration parameters and binds it to the exchange.

Error Conditions: - Missing or empty URL, QueueName, ExchangeName, or RoutingKey will result in an error. - Connection failures (e.g., network issues, invalid URL) will return a wrapped error. - Invalid MaxPriority values (outside the range 0-255) may cause unexpected behavior.

func NewRabbitMQFromURL added in v1.4.3

func NewRabbitMQFromURL(url string) (*RabbitMQ, error)

NewRabbitMQFromURL creates a new RabbitMQ queue with a simple URL

func (*RabbitMQ) Close added in v1.4.3

func (rq *RabbitMQ) Close() error

Close closes the RabbitMQ connection

func (*RabbitMQ) DeleteJob added in v1.4.3

func (rq *RabbitMQ) DeleteJob(_ context.Context, id string) error

DeleteJob removes a job from the queue

func (*RabbitMQ) Dequeue added in v1.4.3

func (rq *RabbitMQ) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)

Dequeue retrieves a job from the queue

func (*RabbitMQ) Enqueue added in v1.4.3

func (rq *RabbitMQ) Enqueue(ctx context.Context, job *Job) error

Enqueue adds a job to the queue

func (*RabbitMQ) GetJob added in v1.4.3

func (rq *RabbitMQ) GetJob(_ context.Context, id string) (*Job, error)

GetJob retrieves a job by ID

func (*RabbitMQ) GetJobs added in v1.4.3

func (rq *RabbitMQ) GetJobs(_ context.Context, status Status, limit int) ([]*Job, error)

GetJobs retrieves jobs by status

func (*RabbitMQ) GetStats added in v1.4.3

func (rq *RabbitMQ) GetStats(_ context.Context) (*Stats, error)

GetStats returns queue statistics

func (*RabbitMQ) IsConnectionOpen added in v1.4.3

func (rq *RabbitMQ) IsConnectionOpen() bool

IsConnectionOpen checks if the RabbitMQ connection is open

func (*RabbitMQ) PurgeQueue added in v1.4.3

func (rq *RabbitMQ) PurgeQueue(_ context.Context) error

PurgeQueue removes all messages from the queue

func (*RabbitMQ) Reconnect added in v1.4.3

func (rq *RabbitMQ) Reconnect(config RabbitMQConfig) error

Reconnect attempts to reconnect to RabbitMQ

func (*RabbitMQ) Schedule added in v1.4.3

func (rq *RabbitMQ) Schedule(ctx context.Context, job *Job) error

Schedule adds a job to be processed at a specific time

func (*RabbitMQ) UpdateJob added in v1.4.3

func (rq *RabbitMQ) UpdateJob(_ context.Context, job *Job) error

UpdateJob updates a job's status

type RabbitMQConfig

type RabbitMQConfig struct {
	URL          string
	QueueName    string
	ExchangeName string
	RoutingKey   string
	Durable      bool
	AutoDelete   bool
	Exclusive    bool
	NoWait       bool
	MaxPriority  int // Maximum priority level for the queue (0-255)
}

RabbitMQConfig holds configuration for RabbitMQ queue

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue implements a Redis-backed job queue

func NewRedisQueue

func NewRedisQueue(client redis.Cmdable, keyPrefix string) *RedisQueue

NewRedisQueue creates a new Redis-backed queue.

Parameters: - client: A Redis client implementing the redis.Cmdable interface, used to interact with the Redis database. - keyPrefix: A string used as a prefix for all Redis keys created by the queue. If an empty string is provided, the default prefix "queue" is used.

Purpose: This function initializes a RedisQueue instance, which provides methods for enqueueing, dequeueing, and scheduling jobs in a Redis-backed job queue.

func (*RedisQueue) Close

func (rq *RedisQueue) Close() error

Close closes the queue

func (*RedisQueue) DeleteJob

func (rq *RedisQueue) DeleteJob(ctx context.Context, id string) error

DeleteJob removes a job from the queue

func (*RedisQueue) Dequeue

func (rq *RedisQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)

Dequeue removes and returns the next job from the queue

func (*RedisQueue) Enqueue

func (rq *RedisQueue) Enqueue(ctx context.Context, job *Job) error

Enqueue adds a job to the queue

func (*RedisQueue) GetJob

func (rq *RedisQueue) GetJob(ctx context.Context, id string) (*Job, error)

GetJob retrieves a job by ID

func (*RedisQueue) GetJobs

func (rq *RedisQueue) GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)

GetJobs retrieves jobs by status

func (*RedisQueue) GetStats

func (rq *RedisQueue) GetStats(ctx context.Context) (*Stats, error)

GetStats returns queue statistics

func (*RedisQueue) MoveScheduledToPending

func (rq *RedisQueue) MoveScheduledToPending(ctx context.Context) error

MoveScheduledToPending moves scheduled jobs that are ready to be processed

func (*RedisQueue) Schedule

func (rq *RedisQueue) Schedule(ctx context.Context, job *Job) error

Schedule adds a scheduled job to the queue

func (*RedisQueue) UpdateJob

func (rq *RedisQueue) UpdateJob(ctx context.Context, job *Job) error

UpdateJob updates an existing job

type RetryableError

type RetryableError struct {
	Err error
}

RetryableError represents an error that should trigger a retry

func NewRetryableError

func NewRetryableError(err error) *RetryableError

NewRetryableError creates a new retryable error

func (*RetryableError) Error

func (e *RetryableError) Error() string

type Stats

type Stats struct {
	JobsProcessed int64 `json:"jobs_processed"`
	JobsFailed    int64 `json:"jobs_failed"`
	JobsRetried   int64 `json:"jobs_retried"`
	QueueSize     int64 `json:"queue_size"`
	WorkersActive int64 `json:"workers_active"`
	WorkersBusy   int64 `json:"workers_busy"`
	TotalJobs     int64 `json:"total_jobs"`
	Pending       int64 `json:"pending"`
	Running       int64 `json:"running"`
	Completed     int64 `json:"completed"`
	Failed        int64 `json:"failed"`
	Retrying      int64 `json:"retrying"`
	Scheduled     int64 `json:"scheduled"`
	DeadLetter    int64 `json:"dead_letter"`
}

Stats represents queue statistics

type Status

type Status int

Status represents the status of a job

const (
	// StatusPending indicates the job is pending execution
	StatusPending Status = iota
	// StatusRunning indicates the job is currently being executed
	StatusRunning
	// StatusCompleted indicates the job has completed successfully
	StatusCompleted
	// StatusFailed indicates the job has failed
	StatusFailed
	// StatusRetrying indicates the job is scheduled for retry
	StatusRetrying
	// StatusScheduled indicates the job is scheduled for future execution
	StatusScheduled
	// StatusDeadLetter indicates the job has been moved to dead letter queue
	StatusDeadLetter
)

func (Status) String

func (s Status) String() string

String returns the string representation of the status

type Task

type Task struct {
	ID         string        `json:"id"`
	Name       string        `json:"name"`
	Type       TaskType      `json:"type"`
	CronSpec   string        `json:"cron_spec,omitempty"`
	Interval   time.Duration `json:"interval,omitempty"`
	Function   TaskFunc      `json:"-"`
	NextRun    time.Time     `json:"next_run"`
	LastRun    time.Time     `json:"last_run"`
	RunCount   int64         `json:"run_count"`
	ErrorCount int64         `json:"error_count"`
	LastError  string        `json:"last_error,omitempty"`
	IsRunning  bool          `json:"is_running"`
	MaxRetries int           `json:"max_retries"`
	RetryDelay time.Duration `json:"retry_delay"`
	Timeout    time.Duration `json:"timeout"`
	Enabled    bool          `json:"enabled"`
	CreatedAt  time.Time     `json:"created_at"`
	UpdatedAt  time.Time     `json:"updated_at"`
}

Task represents a scheduled task

type TaskFunc

type TaskFunc func(ctx context.Context) error

TaskFunc represents a task function that can be executed

type TaskOptions

type TaskOptions struct {
	MaxRetries int
	RetryDelay time.Duration
	Timeout    time.Duration
	Enabled    *bool
}

TaskOptions provides configuration options for tasks

type TaskScheduler

type TaskScheduler struct {
	// contains filtered or unexported fields
}

TaskScheduler manages background tasks

func NewTaskScheduler

func NewTaskScheduler() *TaskScheduler

NewTaskScheduler creates a new task scheduler with default settings

func (*TaskScheduler) DisableTask

func (s *TaskScheduler) DisableTask(name string) error

DisableTask disables a task

func (*TaskScheduler) EnableTask

func (s *TaskScheduler) EnableTask(name string) error

EnableTask enables a task

func (*TaskScheduler) GetTask

func (s *TaskScheduler) GetTask(name string) (*Task, error)

GetTask returns a task by name

func (*TaskScheduler) GetTasks

func (s *TaskScheduler) GetTasks() map[string]*Task

GetTasks returns all registered tasks

func (*TaskScheduler) IsRunning

func (s *TaskScheduler) IsRunning() bool

IsRunning returns true if the scheduler is running

func (*TaskScheduler) ParseCronSpec

func (s *TaskScheduler) ParseCronSpec(cronSpec string) (*CronExpression, error)

ParseCronSpec parses a cron specification

func (*TaskScheduler) RegisterCronTask

func (s *TaskScheduler) RegisterCronTask(name, cronSpec string, fn TaskFunc) error

RegisterCronTask registers a new cron-based task

func (*TaskScheduler) RegisterCronTaskWithOptions

func (s *TaskScheduler) RegisterCronTaskWithOptions(name, cronSpec string, fn TaskFunc, options TaskOptions) error

RegisterCronTaskWithOptions registers a new cron-based task with options

func (*TaskScheduler) RegisterIntervalTask

func (s *TaskScheduler) RegisterIntervalTask(name string, interval time.Duration, fn TaskFunc) error

RegisterIntervalTask registers a new interval-based task

func (*TaskScheduler) RegisterIntervalTaskWithOptions

func (s *TaskScheduler) RegisterIntervalTaskWithOptions(name string, interval time.Duration, fn TaskFunc, options TaskOptions) error

RegisterIntervalTaskWithOptions registers a new interval-based task with options

func (*TaskScheduler) RemoveTask

func (s *TaskScheduler) RemoveTask(name string) error

RemoveTask removes a task from the scheduler

func (*TaskScheduler) Start

func (s *TaskScheduler) Start(ctx context.Context) error

Start starts the task scheduler

func (*TaskScheduler) Stop

func (s *TaskScheduler) Stop()

Stop stops the task scheduler

func (*TaskScheduler) ValidateCronSpec

func (s *TaskScheduler) ValidateCronSpec(spec string) error

ValidateCronSpec validates a cron specification.

Purpose: This function checks whether the provided cron specification is valid. It supports both standard cron formats (5 or 6 fields) and predefined expressions.

Supported formats: - Standard cron expressions with 5 fields (minute, hour, day, month, day of week). - Extended cron expressions with 6 fields (second, minute, hour, day, month, day of week). - Predefined expressions such as "@yearly", "@daily", "@hourly", etc., which are mapped to standard cron strings.

Validation rules: - Ensures the cron syntax is correct. - Maps predefined expressions to their equivalent cron strings. - Delegates parsing and validation to the parseCronSpec function.

func (*TaskScheduler) WithCheckInterval

func (s *TaskScheduler) WithCheckInterval(interval time.Duration) *TaskScheduler

WithCheckInterval sets the interval for checking scheduled tasks

func (*TaskScheduler) WithDefaultRetries

func (s *TaskScheduler) WithDefaultRetries(retries int) *TaskScheduler

WithDefaultRetries sets the default number of retries for failed tasks

func (*TaskScheduler) WithDefaultTimeout

func (s *TaskScheduler) WithDefaultTimeout(timeout time.Duration) *TaskScheduler

WithDefaultTimeout sets the default timeout for task execution

func (*TaskScheduler) WithRetryDelay

func (s *TaskScheduler) WithRetryDelay(delay time.Duration) *TaskScheduler

WithRetryDelay sets the delay between retries

type TaskType

type TaskType int

TaskType represents the type of task scheduling

const (
	// TaskTypeCron represents cron-based task scheduling
	TaskTypeCron TaskType = iota
	// TaskTypeInterval represents interval-based task scheduling
	TaskTypeInterval
)

func (TaskType) String

func (t TaskType) String() string

String returns the string representation of the task type

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL