Documentation
¶
Overview ¶
Package scheduler provides job scheduling and task execution capabilities for the modular framework.
This module implements a flexible job scheduler that supports both immediate and scheduled job execution, configurable worker pools, job persistence, and comprehensive job lifecycle management. It's designed for reliable background task processing in web applications and services.
Features ¶
The scheduler module provides the following capabilities:
- Immediate and scheduled job execution
- Configurable worker pools for concurrent processing
- Job persistence with multiple storage backends
- Job status tracking and lifecycle management
- Automatic job cleanup and retention policies
- Service interface for dependency injection
- Thread-safe operations for concurrent access
Service Registration ¶
The module registers a scheduler service for dependency injection:
// Get the scheduler service scheduler := app.GetService("scheduler.provider").(*SchedulerModule) // Schedule immediate job job := scheduler.ScheduleJob("process-data", processDataFunc, time.Now()) // Schedule delayed job futureTime := time.Now().Add(time.Hour) job := scheduler.ScheduleJob("cleanup", cleanupFunc, futureTime)
Usage Examples ¶
Basic job scheduling:
// Define a job function emailJob := func(ctx context.Context) error { return sendEmail("user@example.com", "Welcome!") } // Schedule immediate execution job := scheduler.ScheduleJob("send-welcome-email", emailJob, time.Now()) // Schedule for later scheduledTime := time.Now().Add(time.Minute * 30) job := scheduler.ScheduleJob("send-reminder", reminderJob, scheduledTime)
Job with custom options:
// Create scheduler with custom options customScheduler := NewScheduler( jobStore, WithWorkerCount(10), WithQueueSize(500), WithCheckInterval(time.Second * 5), )
Index ¶
- Constants
- Variables
- func NewModule() modular.Module
- type EventEmitter
- type Job
- type JobExecution
- type JobFunc
- type JobStatus
- type JobStore
- type MemoryJobStore
- func (s *MemoryJobStore) AddJob(job Job) error
- func (s *MemoryJobStore) AddJobExecution(execution JobExecution) error
- func (s *MemoryJobStore) CleanupOldExecutions(before time.Time) error
- func (s *MemoryJobStore) DeleteJob(jobID string) error
- func (s *MemoryJobStore) GetDueJobs(before time.Time) ([]Job, error)
- func (s *MemoryJobStore) GetJob(jobID string) (Job, error)
- func (s *MemoryJobStore) GetJobExecutions(jobID string) ([]JobExecution, error)
- func (s *MemoryJobStore) GetJobs() ([]Job, error)
- func (s *MemoryJobStore) GetPendingJobs() ([]Job, error)
- func (s *MemoryJobStore) LoadFromFile(filePath string) ([]Job, error)
- func (s *MemoryJobStore) SaveToFile(jobs []Job, filePath string) error
- func (s *MemoryJobStore) UpdateJob(job Job) error
- func (s *MemoryJobStore) UpdateJobExecution(execution JobExecution) error
- type PersistableJobStore
- type Scheduler
- func (s *Scheduler) CancelJob(jobID string) error
- func (s *Scheduler) GetJob(jobID string) (Job, error)
- func (s *Scheduler) GetJobHistory(jobID string) ([]JobExecution, error)
- func (s *Scheduler) ListJobs() ([]Job, error)
- func (s *Scheduler) ResumeJob(job Job) (string, error)
- func (s *Scheduler) ResumeRecurringJob(job Job) (string, error)
- func (s *Scheduler) ScheduleJob(job Job) (string, error)
- func (s *Scheduler) ScheduleRecurring(name string, cronExpr string, jobFunc JobFunc) (string, error)
- func (s *Scheduler) Start(ctx context.Context) error
- func (s *Scheduler) Stop(ctx context.Context) error
- type SchedulerConfig
- type SchedulerModule
- func (m *SchedulerModule) CancelJob(jobID string) error
- func (m *SchedulerModule) Constructor() modular.ModuleConstructor
- func (m *SchedulerModule) Dependencies() []string
- func (m *SchedulerModule) EmitEvent(ctx context.Context, event cloudevents.Event) error
- func (m *SchedulerModule) GetJob(jobID string) (Job, error)
- func (m *SchedulerModule) GetJobHistory(jobID string) ([]JobExecution, error)
- func (m *SchedulerModule) GetRegisteredEventTypes() []string
- func (m *SchedulerModule) Init(app modular.Application) error
- func (m *SchedulerModule) ListJobs() ([]Job, error)
- func (m *SchedulerModule) Name() string
- func (m *SchedulerModule) ProvidesServices() []modular.ServiceProvider
- func (m *SchedulerModule) RegisterConfig(app modular.Application) error
- func (m *SchedulerModule) RegisterObservers(subject modular.Subject) error
- func (m *SchedulerModule) RequiresServices() []modular.ServiceDependency
- func (m *SchedulerModule) ScheduleJob(job Job) (string, error)
- func (m *SchedulerModule) ScheduleRecurring(name string, cronExpr string, jobFunc JobFunc) (string, error)
- func (m *SchedulerModule) Start(ctx context.Context) error
- func (m *SchedulerModule) Stop(ctx context.Context) error
- type SchedulerOption
Constants ¶
const ( // Configuration events EventTypeConfigLoaded = "com.modular.scheduler.config.loaded" EventTypeConfigValidated = "com.modular.scheduler.config.validated" // Job lifecycle events EventTypeJobScheduled = "com.modular.scheduler.job.scheduled" EventTypeJobStarted = "com.modular.scheduler.job.started" EventTypeJobCompleted = "com.modular.scheduler.job.completed" EventTypeJobFailed = "com.modular.scheduler.job.failed" EventTypeJobCancelled = "com.modular.scheduler.job.cancelled" EventTypeJobRemoved = "com.modular.scheduler.job.removed" // Scheduler events EventTypeSchedulerStarted = "com.modular.scheduler.scheduler.started" EventTypeSchedulerStopped = "com.modular.scheduler.scheduler.stopped" EventTypeSchedulerPaused = "com.modular.scheduler.scheduler.paused" EventTypeSchedulerResumed = "com.modular.scheduler.scheduler.resumed" // Worker pool events EventTypeWorkerStarted = "com.modular.scheduler.worker.started" EventTypeWorkerStopped = "com.modular.scheduler.worker.stopped" EventTypeWorkerBusy = "com.modular.scheduler.worker.busy" EventTypeWorkerIdle = "com.modular.scheduler.worker.idle" // Module lifecycle events EventTypeModuleStarted = "com.modular.scheduler.module.started" EventTypeModuleStopped = "com.modular.scheduler.module.stopped" // Error events EventTypeError = "com.modular.scheduler.error" EventTypeWarning = "com.modular.scheduler.warning" )
Event type constants for scheduler module events. Following CloudEvents specification reverse domain notation.
const ModuleName = "scheduler"
ModuleName is the unique identifier for the scheduler module.
const ServiceName = "scheduler.provider"
ServiceName is the name of the service provided by this module. Other modules can use this name to request the scheduler service through dependency injection.
Variables ¶
var ( ErrJobAlreadyExists = errors.New("job already exists") ErrJobNotFound = errors.New("job not found") ErrNoExecutionsFound = errors.New("no executions found for job") ErrExecutionNotFound = errors.New("execution not found") )
Memory store errors
var ( ErrSchedulerShutdownTimeout = errors.New("scheduler shutdown timed out") ErrJobInvalidSchedule = errors.New("job must have either RunAt or Schedule specified") ErrRecurringJobNeedsSchedule = errors.New("recurring jobs must have a Schedule") ErrJobIDRequired = errors.New("job ID must be provided when resuming a job") ErrJobNoValidNextRunTime = errors.New("job has no valid next run time") ErrRecurringJobIDRequired = errors.New("job ID must be provided when resuming a recurring job") ErrJobMustBeRecurring = errors.New("job must be recurring and have a schedule") )
Scheduler errors
var (
ErrJobStoreNotPersistable = errors.New("job store does not implement PersistableJobStore interface")
)
Module errors
var ( // ErrNoSubjectForEventEmission is returned when trying to emit events without a subject ErrNoSubjectForEventEmission = errors.New("no subject available for event emission") )
Module-specific errors for scheduler module. These errors are defined locally to ensure proper linting compliance.
Functions ¶
Types ¶
type EventEmitter ¶ added in v0.1.2
type EventEmitter interface {
EmitEvent(ctx context.Context, event cloudevents.Event) error
}
EventEmitter interface for emitting events from the scheduler
type Job ¶
type Job struct { ID string `json:"id"` Name string `json:"name"` Schedule string `json:"schedule,omitempty"` RunAt time.Time `json:"runAt,omitempty"` IsRecurring bool `json:"isRecurring"` JobFunc JobFunc `json:"-"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` Status JobStatus `json:"status"` LastRun *time.Time `json:"lastRun,omitempty"` NextRun *time.Time `json:"nextRun,omitempty"` }
Job represents a scheduled job
type JobExecution ¶
type JobExecution struct { JobID string `json:"jobId"` StartTime time.Time `json:"startTime"` EndTime time.Time `json:"endTime,omitempty"` Status string `json:"status"` Error string `json:"error,omitempty"` }
JobExecution records details about a single execution of a job
type JobStatus ¶
type JobStatus string
JobStatus represents the status of a job
const ( // JobStatusPending indicates a job is waiting to be executed JobStatusPending JobStatus = "pending" // JobStatusRunning indicates a job is currently executing JobStatusRunning JobStatus = "running" // JobStatusCompleted indicates a job has completed successfully JobStatusCompleted JobStatus = "completed" // JobStatusFailed indicates a job has failed JobStatusFailed JobStatus = "failed" // JobStatusCancelled indicates a job has been cancelled JobStatusCancelled JobStatus = "cancelled" )
type JobStore ¶
type JobStore interface { // AddJob stores a new job AddJob(job Job) error // UpdateJob updates an existing job UpdateJob(job Job) error // GetJob retrieves a job by ID GetJob(jobID string) (Job, error) // GetJobs returns all jobs GetJobs() ([]Job, error) // GetPendingJobs returns all pending jobs GetPendingJobs() ([]Job, error) // GetDueJobs returns jobs that are due to run at or before the given time GetDueJobs(before time.Time) ([]Job, error) // DeleteJob removes a job DeleteJob(jobID string) error // AddJobExecution records a job execution AddJobExecution(execution JobExecution) error // UpdateJobExecution updates a job execution UpdateJobExecution(execution JobExecution) error // GetJobExecutions retrieves execution history for a job GetJobExecutions(jobID string) ([]JobExecution, error) // CleanupOldExecutions removes execution records older than retention period CleanupOldExecutions(before time.Time) error }
JobStore defines the interface for job storage implementations
type MemoryJobStore ¶
type MemoryJobStore struct {
// contains filtered or unexported fields
}
MemoryJobStore implements JobStore using in-memory storage
func NewMemoryJobStore ¶
func NewMemoryJobStore(retentionPeriod time.Duration) *MemoryJobStore
NewMemoryJobStore creates a new memory job store
func (*MemoryJobStore) AddJob ¶
func (s *MemoryJobStore) AddJob(job Job) error
AddJob stores a new job
func (*MemoryJobStore) AddJobExecution ¶
func (s *MemoryJobStore) AddJobExecution(execution JobExecution) error
AddJobExecution records a job execution
func (*MemoryJobStore) CleanupOldExecutions ¶
func (s *MemoryJobStore) CleanupOldExecutions(before time.Time) error
CleanupOldExecutions removes execution records older than retention period
func (*MemoryJobStore) DeleteJob ¶
func (s *MemoryJobStore) DeleteJob(jobID string) error
DeleteJob removes a job
func (*MemoryJobStore) GetDueJobs ¶
func (s *MemoryJobStore) GetDueJobs(before time.Time) ([]Job, error)
GetDueJobs returns jobs that are due to run at or before the given time
func (*MemoryJobStore) GetJob ¶
func (s *MemoryJobStore) GetJob(jobID string) (Job, error)
GetJob retrieves a job by ID
func (*MemoryJobStore) GetJobExecutions ¶
func (s *MemoryJobStore) GetJobExecutions(jobID string) ([]JobExecution, error)
GetJobExecutions retrieves execution history for a job
func (*MemoryJobStore) GetJobs ¶
func (s *MemoryJobStore) GetJobs() ([]Job, error)
GetJobs returns all jobs
func (*MemoryJobStore) GetPendingJobs ¶
func (s *MemoryJobStore) GetPendingJobs() ([]Job, error)
GetPendingJobs returns all pending jobs
func (*MemoryJobStore) LoadFromFile ¶
func (s *MemoryJobStore) LoadFromFile(filePath string) ([]Job, error)
LoadFromFile loads jobs from a JSON file
func (*MemoryJobStore) SaveToFile ¶
func (s *MemoryJobStore) SaveToFile(jobs []Job, filePath string) error
SaveToFile saves jobs to a JSON file
func (*MemoryJobStore) UpdateJob ¶
func (s *MemoryJobStore) UpdateJob(job Job) error
UpdateJob updates an existing job
func (*MemoryJobStore) UpdateJobExecution ¶
func (s *MemoryJobStore) UpdateJobExecution(execution JobExecution) error
UpdateJobExecution updates a job execution
type PersistableJobStore ¶
type PersistableJobStore interface { JobStore // LoadFromFile loads jobs from a file LoadFromFile(filePath string) ([]Job, error) // SaveToFile saves jobs to a file SaveToFile(jobs []Job, filePath string) error }
PersistableJobStore extends JobStore with persistence capabilities
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler handles scheduling and executing jobs
func NewScheduler ¶
func NewScheduler(jobStore JobStore, opts ...SchedulerOption) *Scheduler
NewScheduler creates a new scheduler
func (*Scheduler) GetJobHistory ¶
func (s *Scheduler) GetJobHistory(jobID string) ([]JobExecution, error)
GetJobHistory returns the execution history for a job
func (*Scheduler) ResumeRecurringJob ¶
ResumeRecurringJob resumes a persisted recurring job, registering it with the cron scheduler
func (*Scheduler) ScheduleJob ¶
ScheduleJob schedules a new job
func (*Scheduler) ScheduleRecurring ¶
func (s *Scheduler) ScheduleRecurring(name string, cronExpr string, jobFunc JobFunc) (string, error)
ScheduleRecurring schedules a recurring job using a cron expression
type SchedulerConfig ¶
type SchedulerConfig struct { // WorkerCount is the number of worker goroutines to run WorkerCount int `json:"workerCount" yaml:"workerCount" validate:"min=1" env:"WORKER_COUNT"` // QueueSize is the maximum number of jobs to queue QueueSize int `json:"queueSize" yaml:"queueSize" validate:"min=1" env:"QUEUE_SIZE"` // ShutdownTimeout is the time to wait for graceful shutdown ShutdownTimeout time.Duration `json:"shutdownTimeout" yaml:"shutdownTimeout" env:"SHUTDOWN_TIMEOUT"` // StorageType is the type of job storage to use (memory, file, etc.) StorageType string `json:"storageType" yaml:"storageType" validate:"oneof=memory file" env:"STORAGE_TYPE"` // CheckInterval is how often to check for scheduled jobs CheckInterval time.Duration `json:"checkInterval" yaml:"checkInterval" env:"CHECK_INTERVAL"` // RetentionDays is how many days to retain job history RetentionDays int `json:"retentionDays" yaml:"retentionDays" validate:"min=1" env:"RETENTION_DAYS"` // PersistenceFile is the file path for job persistence PersistenceFile string `json:"persistenceFile" yaml:"persistenceFile" env:"PERSISTENCE_FILE"` // EnablePersistence determines if jobs should be persisted between restarts EnablePersistence bool `json:"enablePersistence" yaml:"enablePersistence" env:"ENABLE_PERSISTENCE"` }
SchedulerConfig defines the configuration for the scheduler module
type SchedulerModule ¶
type SchedulerModule struct {
// contains filtered or unexported fields
}
SchedulerModule provides job scheduling and task execution capabilities. It manages a pool of worker goroutines that execute scheduled jobs and provides persistence and lifecycle management for jobs.
The module implements the following interfaces:
- modular.Module: Basic module lifecycle
- modular.Configurable: Configuration management
- modular.ServiceAware: Service dependency management
- modular.Startable: Startup logic
- modular.Stoppable: Shutdown logic
Job execution is thread-safe and supports concurrent job processing.
func (*SchedulerModule) CancelJob ¶
func (m *SchedulerModule) CancelJob(jobID string) error
CancelJob cancels a scheduled job
func (*SchedulerModule) Constructor ¶
func (m *SchedulerModule) Constructor() modular.ModuleConstructor
Constructor provides a dependency injection constructor for the module
func (*SchedulerModule) Dependencies ¶
func (m *SchedulerModule) Dependencies() []string
Dependencies returns the names of modules this module depends on
func (*SchedulerModule) EmitEvent ¶ added in v0.1.2
func (m *SchedulerModule) EmitEvent(ctx context.Context, event cloudevents.Event) error
EmitEvent implements the ObservableModule interface. This allows the scheduler module to emit events that other modules or observers can receive.
func (*SchedulerModule) GetJob ¶
func (m *SchedulerModule) GetJob(jobID string) (Job, error)
GetJob returns information about a scheduled job
func (*SchedulerModule) GetJobHistory ¶
func (m *SchedulerModule) GetJobHistory(jobID string) ([]JobExecution, error)
GetJobHistory returns the execution history for a job
func (*SchedulerModule) GetRegisteredEventTypes ¶ added in v0.1.2
func (m *SchedulerModule) GetRegisteredEventTypes() []string
GetRegisteredEventTypes implements the ObservableModule interface. Returns all event types that this scheduler module can emit.
func (*SchedulerModule) Init ¶
func (m *SchedulerModule) Init(app modular.Application) error
Init initializes the module
func (*SchedulerModule) ListJobs ¶
func (m *SchedulerModule) ListJobs() ([]Job, error)
ListJobs returns a list of all scheduled jobs
func (*SchedulerModule) Name ¶
func (m *SchedulerModule) Name() string
Name returns the unique identifier for this module. This name is used for service registration, dependency resolution, and configuration section identification.
func (*SchedulerModule) ProvidesServices ¶
func (m *SchedulerModule) ProvidesServices() []modular.ServiceProvider
ProvidesServices declares services provided by this module
func (*SchedulerModule) RegisterConfig ¶
func (m *SchedulerModule) RegisterConfig(app modular.Application) error
RegisterConfig registers the module's configuration structure. This method is called during application initialization to register the default configuration values for the scheduler module.
Default configuration:
- WorkerCount: 5 worker goroutines
- QueueSize: 100 job queue capacity
- ShutdownTimeout: 30s for graceful shutdown
- StorageType: "memory" storage backend
- CheckInterval: 1s for job polling
- RetentionDays: 7 days for completed job retention
func (*SchedulerModule) RegisterObservers ¶ added in v0.1.2
func (m *SchedulerModule) RegisterObservers(subject modular.Subject) error
RegisterObservers implements the ObservableModule interface. This allows the scheduler module to register as an observer for events it's interested in.
func (*SchedulerModule) RequiresServices ¶
func (m *SchedulerModule) RequiresServices() []modular.ServiceDependency
RequiresServices declares services required by this module
func (*SchedulerModule) ScheduleJob ¶
func (m *SchedulerModule) ScheduleJob(job Job) (string, error)
ScheduleJob schedules a new job
func (*SchedulerModule) ScheduleRecurring ¶
func (m *SchedulerModule) ScheduleRecurring(name string, cronExpr string, jobFunc JobFunc) (string, error)
ScheduleRecurring schedules a recurring job using a cron expression
type SchedulerOption ¶
type SchedulerOption func(*Scheduler)
SchedulerOption defines a function that can configure a scheduler
func WithCheckInterval ¶
func WithCheckInterval(interval time.Duration) SchedulerOption
WithCheckInterval sets how often to check for scheduled jobs
func WithEventEmitter ¶ added in v0.1.2
func WithEventEmitter(emitter EventEmitter) SchedulerOption
WithEventEmitter sets the event emitter
func WithQueueSize ¶
func WithQueueSize(size int) SchedulerOption
WithQueueSize sets the job queue size
func WithWorkerCount ¶
func WithWorkerCount(count int) SchedulerOption
WithWorkerCount sets the number of workers