scheduler

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2025 License: MIT Imports: 12 Imported by: 1

README

Scheduler Module

Go Reference

The Scheduler Module provides job scheduling capabilities for Modular applications. It supports one-time and recurring jobs using cron syntax with comprehensive job history tracking.

Features

  • Schedule one-time jobs to run at a specific time
  • Schedule recurring jobs using cron expressions
  • Configurable worker pool for job execution
  • Job status tracking and history
  • Memory-based job storage with optional persistence
  • Graceful shutdown with configurable timeout

Installation

import (
    "github.com/GoCodeAlone/modular"
    "github.com/GoCodeAlone/modular/modules/scheduler"
)

// Register the scheduler module with your Modular application
app.RegisterModule(scheduler.NewModule())

Configuration

The scheduler module can be configured using the following options:

scheduler:
  workerCount: 5           # Number of worker goroutines to run jobs
  queueSize: 100           # Maximum size of the job queue
  shutdownTimeout: 30      # Time in seconds to wait for graceful shutdown
  storageType: memory      # Type of job storage (memory, file)
  checkInterval: 1         # How often to check for scheduled jobs (seconds)
  retentionDays: 7         # How many days to retain job history
  persistenceFile: "scheduler_jobs.json"  # File path for job persistence
  enablePersistence: false # Whether to persist jobs between restarts

Usage

Accessing the Scheduler Service
// In your module's Init function
func (m *MyModule) Init(app modular.Application) error {
    var schedulerService *scheduler.SchedulerModule
    err := app.GetService("scheduler.provider", &schedulerService)
    if err != nil {
        return fmt.Errorf("failed to get scheduler service: %w", err)
    }
    
    // Now you can use the scheduler service
    m.scheduler = schedulerService
    return nil
}
Using Interface-Based Service Matching
// Define the service dependency
func (m *MyModule) RequiresServices() []modular.ServiceDependency {
    return []modular.ServiceDependency{
        {
            Name:               "scheduler",
            Required:           true,
            MatchByInterface:   true,
            SatisfiesInterface: reflect.TypeOf((*scheduler.SchedulerModule)(nil)).Elem(),
        },
    }
}

// Access the service in your constructor
func (m *MyModule) Constructor() modular.ModuleConstructor {
    return func(app modular.Application, services map[string]any) (modular.Module, error) {
        schedulerService := services["scheduler"].(*scheduler.SchedulerModule)
        return &MyModule{scheduler: schedulerService}, nil
    }
}
Scheduling One-Time Jobs
// Schedule a job to run once at a specific time
jobID, err := schedulerService.ScheduleJob(scheduler.Job{
    Name:    "data-cleanup",
    RunAt:   time.Now().Add(1 * time.Hour),
    JobFunc: func(ctx context.Context) error {
        // Your job logic here
        return nil
    },
})

if err != nil {
    // Handle error
}
Scheduling Recurring Jobs
// Schedule a job to run every minute
jobID, err := schedulerService.ScheduleRecurring(
    "log-metrics",           // Job name
    "0 * * * * *",           // Cron expression (every minute)
    func(ctx context.Context) error {
        // Your job logic here
        return nil
    },
)

if err != nil {
    // Handle error
}
Managing Jobs
// Cancel a job
err := schedulerService.CancelJob(jobID)

// Get job status
job, err := schedulerService.GetJob(jobID)
if err == nil {
    fmt.Printf("Job status: %s\n", job.Status)
    if job.LastRun != nil {
        fmt.Printf("Last run: %s\n", job.LastRun.Format(time.RFC3339))
    }
    if job.NextRun != nil {
        fmt.Printf("Next run: %s\n", job.NextRun.Format(time.RFC3339))
    }
}

// List all jobs
jobs, err := schedulerService.ListJobs()
for _, job := range jobs {
    fmt.Printf("Job: %s, Status: %s\n", job.Name, job.Status)
}

// Get job execution history
history, err := schedulerService.GetJobHistory(jobID)
for _, exec := range history {
    fmt.Printf("Execution: %s, Status: %s\n", 
        exec.StartTime.Format(time.RFC3339),
        exec.Status)
}

Cron Expression Format

The scheduler uses standard cron expressions with seconds:

┌───────────── seconds (0-59)
│ ┌───────────── minute (0-59)
│ │ ┌───────────── hour (0-23)
│ │ │ ┌───────────── day of month (1-31)
│ │ │ │ ┌───────────── month (1-12)
│ │ │ │ │ ┌───────────── day of week (0-6) (Sunday to Saturday)
│ │ │ │ │ │
* * * * * *

Examples:

  • 0 0 * * * * - Every hour at 0 minutes 0 seconds
  • 0 */5 * * * * - Every 5 minutes
  • 0 0 8 * * * - Every day at 8:00 AM
  • 0 0 12 * * 1-5 - Every weekday at noon

Implementation Notes

  • The scheduler uses a worker pool model to process jobs concurrently
  • Each recurring job is registered with a cron scheduler
  • Job executions are tracked for history and reporting
  • The module supports graceful shutdown, completing in-progress jobs
  • Jobs can be persisted to disk and reloaded on application restart
Job Persistence

When the enablePersistence option is set to true, the scheduler will:

  1. Save all scheduled jobs to the configured persistenceFile when the module stops
  2. Load and reschedule jobs from this file when the module initializes

This ensures that scheduled jobs survive application restarts. Note that job functions cannot be persisted, so when loading persisted jobs:

  • For one-time jobs: Only future jobs are rescheduled
  • For recurring jobs: Jobs are rescheduled using the stored cron expression
  • Job functions need to be registered through a job registry or handler system

Example configuration with persistence enabled:

scheduler:
  enablePersistence: true
  persistenceFile: "/var/lib/myapp/scheduler_jobs.json"
  # Other configuration options...

Testing

The scheduler module includes comprehensive tests for both module integration and job scheduling logic.

Documentation

Overview

Package scheduler provides job scheduling and task execution capabilities for the modular framework.

This module implements a flexible job scheduler that supports both immediate and scheduled job execution, configurable worker pools, job persistence, and comprehensive job lifecycle management. It's designed for reliable background task processing in web applications and services.

Features

The scheduler module provides the following capabilities:

  • Immediate and scheduled job execution
  • Configurable worker pools for concurrent processing
  • Job persistence with multiple storage backends
  • Job status tracking and lifecycle management
  • Automatic job cleanup and retention policies
  • Service interface for dependency injection
  • Thread-safe operations for concurrent access

Service Registration

The module registers a scheduler service for dependency injection:

// Get the scheduler service
scheduler := app.GetService("scheduler.provider").(*SchedulerModule)

// Schedule immediate job
job := scheduler.ScheduleJob("process-data", processDataFunc, time.Now())

// Schedule delayed job
futureTime := time.Now().Add(time.Hour)
job := scheduler.ScheduleJob("cleanup", cleanupFunc, futureTime)

Usage Examples

Basic job scheduling:

// Define a job function
emailJob := func(ctx context.Context) error {
    return sendEmail("user@example.com", "Welcome!")
}

// Schedule immediate execution
job := scheduler.ScheduleJob("send-welcome-email", emailJob, time.Now())

// Schedule for later
scheduledTime := time.Now().Add(time.Minute * 30)
job := scheduler.ScheduleJob("send-reminder", reminderJob, scheduledTime)

Job with custom options:

// Create scheduler with custom options
customScheduler := NewScheduler(
    jobStore,
    WithWorkerCount(10),
    WithQueueSize(500),
    WithCheckInterval(time.Second * 5),
)

Index

Constants

View Source
const (
	// Configuration events
	EventTypeConfigLoaded    = "com.modular.scheduler.config.loaded"
	EventTypeConfigValidated = "com.modular.scheduler.config.validated"

	// Job lifecycle events
	EventTypeJobScheduled = "com.modular.scheduler.job.scheduled"
	EventTypeJobStarted   = "com.modular.scheduler.job.started"
	EventTypeJobCompleted = "com.modular.scheduler.job.completed"
	EventTypeJobFailed    = "com.modular.scheduler.job.failed"
	EventTypeJobCancelled = "com.modular.scheduler.job.cancelled"
	EventTypeJobRemoved   = "com.modular.scheduler.job.removed"

	// Scheduler events
	EventTypeSchedulerStarted = "com.modular.scheduler.scheduler.started"
	EventTypeSchedulerStopped = "com.modular.scheduler.scheduler.stopped"
	EventTypeSchedulerPaused  = "com.modular.scheduler.scheduler.paused"
	EventTypeSchedulerResumed = "com.modular.scheduler.scheduler.resumed"

	// Worker pool events
	EventTypeWorkerStarted = "com.modular.scheduler.worker.started"
	EventTypeWorkerStopped = "com.modular.scheduler.worker.stopped"
	EventTypeWorkerBusy    = "com.modular.scheduler.worker.busy"
	EventTypeWorkerIdle    = "com.modular.scheduler.worker.idle"

	// Module lifecycle events
	EventTypeModuleStarted = "com.modular.scheduler.module.started"
	EventTypeModuleStopped = "com.modular.scheduler.module.stopped"

	// Error events
	EventTypeError   = "com.modular.scheduler.error"
	EventTypeWarning = "com.modular.scheduler.warning"
)

Event type constants for scheduler module events. Following CloudEvents specification reverse domain notation.

View Source
const ModuleName = "scheduler"

ModuleName is the unique identifier for the scheduler module.

View Source
const ServiceName = "scheduler.provider"

ServiceName is the name of the service provided by this module. Other modules can use this name to request the scheduler service through dependency injection.

Variables

View Source
var (
	ErrJobAlreadyExists  = errors.New("job already exists")
	ErrJobNotFound       = errors.New("job not found")
	ErrNoExecutionsFound = errors.New("no executions found for job")
	ErrExecutionNotFound = errors.New("execution not found")
)

Memory store errors

View Source
var (
	ErrSchedulerShutdownTimeout  = errors.New("scheduler shutdown timed out")
	ErrJobInvalidSchedule        = errors.New("job must have either RunAt or Schedule specified")
	ErrRecurringJobNeedsSchedule = errors.New("recurring jobs must have a Schedule")
	ErrJobIDRequired             = errors.New("job ID must be provided when resuming a job")
	ErrJobNoValidNextRunTime     = errors.New("job has no valid next run time")
	ErrRecurringJobIDRequired    = errors.New("job ID must be provided when resuming a recurring job")
	ErrJobMustBeRecurring        = errors.New("job must be recurring and have a schedule")
)

Scheduler errors

View Source
var (
	ErrJobStoreNotPersistable = errors.New("job store does not implement PersistableJobStore interface")
)

Module errors

View Source
var (
	// ErrNoSubjectForEventEmission is returned when trying to emit events without a subject
	ErrNoSubjectForEventEmission = errors.New("no subject available for event emission")
)

Module-specific errors for scheduler module. These errors are defined locally to ensure proper linting compliance.

Functions

func NewModule

func NewModule() modular.Module

NewModule creates a new instance of the scheduler module. This is the primary constructor for the scheduler module and should be used when registering the module with the application.

Example:

app.RegisterModule(scheduler.NewModule())

Types

type EventEmitter added in v0.1.2

type EventEmitter interface {
	EmitEvent(ctx context.Context, event cloudevents.Event) error
}

EventEmitter interface for emitting events from the scheduler

type Job

type Job struct {
	ID          string     `json:"id"`
	Name        string     `json:"name"`
	Schedule    string     `json:"schedule,omitempty"`
	RunAt       time.Time  `json:"runAt,omitempty"`
	IsRecurring bool       `json:"isRecurring"`
	JobFunc     JobFunc    `json:"-"`
	CreatedAt   time.Time  `json:"createdAt"`
	UpdatedAt   time.Time  `json:"updatedAt"`
	Status      JobStatus  `json:"status"`
	LastRun     *time.Time `json:"lastRun,omitempty"`
	NextRun     *time.Time `json:"nextRun,omitempty"`
}

Job represents a scheduled job

type JobExecution

type JobExecution struct {
	JobID     string    `json:"jobId"`
	StartTime time.Time `json:"startTime"`
	EndTime   time.Time `json:"endTime,omitempty"`
	Status    string    `json:"status"`
	Error     string    `json:"error,omitempty"`
}

JobExecution records details about a single execution of a job

type JobFunc

type JobFunc func(ctx context.Context) error

JobFunc defines a function that can be executed as a job

type JobStatus

type JobStatus string

JobStatus represents the status of a job

const (
	// JobStatusPending indicates a job is waiting to be executed
	JobStatusPending JobStatus = "pending"
	// JobStatusRunning indicates a job is currently executing
	JobStatusRunning JobStatus = "running"
	// JobStatusCompleted indicates a job has completed successfully
	JobStatusCompleted JobStatus = "completed"
	// JobStatusFailed indicates a job has failed
	JobStatusFailed JobStatus = "failed"
	// JobStatusCancelled indicates a job has been cancelled
	JobStatusCancelled JobStatus = "cancelled"
)

type JobStore

type JobStore interface {
	// AddJob stores a new job
	AddJob(job Job) error

	// UpdateJob updates an existing job
	UpdateJob(job Job) error

	// GetJob retrieves a job by ID
	GetJob(jobID string) (Job, error)

	// GetJobs returns all jobs
	GetJobs() ([]Job, error)

	// GetPendingJobs returns all pending jobs
	GetPendingJobs() ([]Job, error)

	// GetDueJobs returns jobs that are due to run at or before the given time
	GetDueJobs(before time.Time) ([]Job, error)

	// DeleteJob removes a job
	DeleteJob(jobID string) error

	// AddJobExecution records a job execution
	AddJobExecution(execution JobExecution) error

	// UpdateJobExecution updates a job execution
	UpdateJobExecution(execution JobExecution) error

	// GetJobExecutions retrieves execution history for a job
	GetJobExecutions(jobID string) ([]JobExecution, error)

	// CleanupOldExecutions removes execution records older than retention period
	CleanupOldExecutions(before time.Time) error
}

JobStore defines the interface for job storage implementations

type MemoryJobStore

type MemoryJobStore struct {
	// contains filtered or unexported fields
}

MemoryJobStore implements JobStore using in-memory storage

func NewMemoryJobStore

func NewMemoryJobStore(retentionPeriod time.Duration) *MemoryJobStore

NewMemoryJobStore creates a new memory job store

func (*MemoryJobStore) AddJob

func (s *MemoryJobStore) AddJob(job Job) error

AddJob stores a new job

func (*MemoryJobStore) AddJobExecution

func (s *MemoryJobStore) AddJobExecution(execution JobExecution) error

AddJobExecution records a job execution

func (*MemoryJobStore) CleanupOldExecutions

func (s *MemoryJobStore) CleanupOldExecutions(before time.Time) error

CleanupOldExecutions removes execution records older than retention period

func (*MemoryJobStore) DeleteJob

func (s *MemoryJobStore) DeleteJob(jobID string) error

DeleteJob removes a job

func (*MemoryJobStore) GetDueJobs

func (s *MemoryJobStore) GetDueJobs(before time.Time) ([]Job, error)

GetDueJobs returns jobs that are due to run at or before the given time

func (*MemoryJobStore) GetJob

func (s *MemoryJobStore) GetJob(jobID string) (Job, error)

GetJob retrieves a job by ID

func (*MemoryJobStore) GetJobExecutions

func (s *MemoryJobStore) GetJobExecutions(jobID string) ([]JobExecution, error)

GetJobExecutions retrieves execution history for a job

func (*MemoryJobStore) GetJobs

func (s *MemoryJobStore) GetJobs() ([]Job, error)

GetJobs returns all jobs

func (*MemoryJobStore) GetPendingJobs

func (s *MemoryJobStore) GetPendingJobs() ([]Job, error)

GetPendingJobs returns all pending jobs

func (*MemoryJobStore) LoadFromFile

func (s *MemoryJobStore) LoadFromFile(filePath string) ([]Job, error)

LoadFromFile loads jobs from a JSON file

func (*MemoryJobStore) SaveToFile

func (s *MemoryJobStore) SaveToFile(jobs []Job, filePath string) error

SaveToFile saves jobs to a JSON file

func (*MemoryJobStore) UpdateJob

func (s *MemoryJobStore) UpdateJob(job Job) error

UpdateJob updates an existing job

func (*MemoryJobStore) UpdateJobExecution

func (s *MemoryJobStore) UpdateJobExecution(execution JobExecution) error

UpdateJobExecution updates a job execution

type PersistableJobStore

type PersistableJobStore interface {
	JobStore

	// LoadFromFile loads jobs from a file
	LoadFromFile(filePath string) ([]Job, error)

	// SaveToFile saves jobs to a file
	SaveToFile(jobs []Job, filePath string) error
}

PersistableJobStore extends JobStore with persistence capabilities

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler handles scheduling and executing jobs

func NewScheduler

func NewScheduler(jobStore JobStore, opts ...SchedulerOption) *Scheduler

NewScheduler creates a new scheduler

func (*Scheduler) CancelJob

func (s *Scheduler) CancelJob(jobID string) error

CancelJob cancels a scheduled job

func (*Scheduler) GetJob

func (s *Scheduler) GetJob(jobID string) (Job, error)

GetJob returns information about a scheduled job

func (*Scheduler) GetJobHistory

func (s *Scheduler) GetJobHistory(jobID string) ([]JobExecution, error)

GetJobHistory returns the execution history for a job

func (*Scheduler) ListJobs

func (s *Scheduler) ListJobs() ([]Job, error)

ListJobs returns a list of all scheduled jobs

func (*Scheduler) ResumeJob

func (s *Scheduler) ResumeJob(job Job) (string, error)

ResumeJob resumes a persisted job

func (*Scheduler) ResumeRecurringJob

func (s *Scheduler) ResumeRecurringJob(job Job) (string, error)

ResumeRecurringJob resumes a persisted recurring job, registering it with the cron scheduler

func (*Scheduler) ScheduleJob

func (s *Scheduler) ScheduleJob(job Job) (string, error)

ScheduleJob schedules a new job

func (*Scheduler) ScheduleRecurring

func (s *Scheduler) ScheduleRecurring(name string, cronExpr string, jobFunc JobFunc) (string, error)

ScheduleRecurring schedules a recurring job using a cron expression

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start starts the scheduler

func (*Scheduler) Stop

func (s *Scheduler) Stop(ctx context.Context) error

Stop stops the scheduler

type SchedulerConfig

type SchedulerConfig struct {
	// WorkerCount is the number of worker goroutines to run
	WorkerCount int `json:"workerCount" yaml:"workerCount" validate:"min=1" env:"WORKER_COUNT"`

	// QueueSize is the maximum number of jobs to queue
	QueueSize int `json:"queueSize" yaml:"queueSize" validate:"min=1" env:"QUEUE_SIZE"`

	// ShutdownTimeout is the time to wait for graceful shutdown
	ShutdownTimeout time.Duration `json:"shutdownTimeout" yaml:"shutdownTimeout" env:"SHUTDOWN_TIMEOUT"`

	// StorageType is the type of job storage to use (memory, file, etc.)
	StorageType string `json:"storageType" yaml:"storageType" validate:"oneof=memory file" env:"STORAGE_TYPE"`

	// CheckInterval is how often to check for scheduled jobs
	CheckInterval time.Duration `json:"checkInterval" yaml:"checkInterval" env:"CHECK_INTERVAL"`

	// RetentionDays is how many days to retain job history
	RetentionDays int `json:"retentionDays" yaml:"retentionDays" validate:"min=1" env:"RETENTION_DAYS"`

	// PersistenceFile is the file path for job persistence
	PersistenceFile string `json:"persistenceFile" yaml:"persistenceFile" env:"PERSISTENCE_FILE"`

	// EnablePersistence determines if jobs should be persisted between restarts
	EnablePersistence bool `json:"enablePersistence" yaml:"enablePersistence" env:"ENABLE_PERSISTENCE"`
}

SchedulerConfig defines the configuration for the scheduler module

type SchedulerModule

type SchedulerModule struct {
	// contains filtered or unexported fields
}

SchedulerModule provides job scheduling and task execution capabilities. It manages a pool of worker goroutines that execute scheduled jobs and provides persistence and lifecycle management for jobs.

The module implements the following interfaces:

  • modular.Module: Basic module lifecycle
  • modular.Configurable: Configuration management
  • modular.ServiceAware: Service dependency management
  • modular.Startable: Startup logic
  • modular.Stoppable: Shutdown logic

Job execution is thread-safe and supports concurrent job processing.

func (*SchedulerModule) CancelJob

func (m *SchedulerModule) CancelJob(jobID string) error

CancelJob cancels a scheduled job

func (*SchedulerModule) Constructor

func (m *SchedulerModule) Constructor() modular.ModuleConstructor

Constructor provides a dependency injection constructor for the module

func (*SchedulerModule) Dependencies

func (m *SchedulerModule) Dependencies() []string

Dependencies returns the names of modules this module depends on

func (*SchedulerModule) EmitEvent added in v0.1.2

func (m *SchedulerModule) EmitEvent(ctx context.Context, event cloudevents.Event) error

EmitEvent implements the ObservableModule interface. This allows the scheduler module to emit events that other modules or observers can receive.

func (*SchedulerModule) GetJob

func (m *SchedulerModule) GetJob(jobID string) (Job, error)

GetJob returns information about a scheduled job

func (*SchedulerModule) GetJobHistory

func (m *SchedulerModule) GetJobHistory(jobID string) ([]JobExecution, error)

GetJobHistory returns the execution history for a job

func (*SchedulerModule) GetRegisteredEventTypes added in v0.1.2

func (m *SchedulerModule) GetRegisteredEventTypes() []string

GetRegisteredEventTypes implements the ObservableModule interface. Returns all event types that this scheduler module can emit.

func (*SchedulerModule) Init

Init initializes the module

func (*SchedulerModule) ListJobs

func (m *SchedulerModule) ListJobs() ([]Job, error)

ListJobs returns a list of all scheduled jobs

func (*SchedulerModule) Name

func (m *SchedulerModule) Name() string

Name returns the unique identifier for this module. This name is used for service registration, dependency resolution, and configuration section identification.

func (*SchedulerModule) ProvidesServices

func (m *SchedulerModule) ProvidesServices() []modular.ServiceProvider

ProvidesServices declares services provided by this module

func (*SchedulerModule) RegisterConfig

func (m *SchedulerModule) RegisterConfig(app modular.Application) error

RegisterConfig registers the module's configuration structure. This method is called during application initialization to register the default configuration values for the scheduler module.

Default configuration:

  • WorkerCount: 5 worker goroutines
  • QueueSize: 100 job queue capacity
  • ShutdownTimeout: 30s for graceful shutdown
  • StorageType: "memory" storage backend
  • CheckInterval: 1s for job polling
  • RetentionDays: 7 days for completed job retention

func (*SchedulerModule) RegisterObservers added in v0.1.2

func (m *SchedulerModule) RegisterObservers(subject modular.Subject) error

RegisterObservers implements the ObservableModule interface. This allows the scheduler module to register as an observer for events it's interested in.

func (*SchedulerModule) RequiresServices

func (m *SchedulerModule) RequiresServices() []modular.ServiceDependency

RequiresServices declares services required by this module

func (*SchedulerModule) ScheduleJob

func (m *SchedulerModule) ScheduleJob(job Job) (string, error)

ScheduleJob schedules a new job

func (*SchedulerModule) ScheduleRecurring

func (m *SchedulerModule) ScheduleRecurring(name string, cronExpr string, jobFunc JobFunc) (string, error)

ScheduleRecurring schedules a recurring job using a cron expression

func (*SchedulerModule) Start

func (m *SchedulerModule) Start(ctx context.Context) error

Start performs startup logic for the module

func (*SchedulerModule) Stop

func (m *SchedulerModule) Stop(ctx context.Context) error

Stop performs shutdown logic for the module

type SchedulerOption

type SchedulerOption func(*Scheduler)

SchedulerOption defines a function that can configure a scheduler

func WithCheckInterval

func WithCheckInterval(interval time.Duration) SchedulerOption

WithCheckInterval sets how often to check for scheduled jobs

func WithEventEmitter added in v0.1.2

func WithEventEmitter(emitter EventEmitter) SchedulerOption

WithEventEmitter sets the event emitter

func WithLogger

func WithLogger(logger modular.Logger) SchedulerOption

WithLogger sets the logger

func WithQueueSize

func WithQueueSize(size int) SchedulerOption

WithQueueSize sets the job queue size

func WithWorkerCount

func WithWorkerCount(count int) SchedulerOption

WithWorkerCount sets the number of workers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL