cron

package
v0.8.4
Published: Jan 3, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

README

Cron Job Extension

A production-grade cron job scheduler for Forge with distributed support, execution history, metrics, and a web UI.

Features

  • Flexible Scheduling: Use standard cron expressions with seconds precision
  • Multiple Job Types: Code-based handlers or shell commands
  • Execution History: Track all job executions with detailed status
  • Retry Logic: Automatic retries with exponential backoff
  • Concurrency Control: Configurable worker pool for job execution
  • Storage Backends: In-memory, database, or Redis
  • Distributed Mode: Leader election and distributed locking (requires consensus extension)
  • REST API: Full HTTP API for job management
  • Metrics: Prometheus metrics for observability
  • Config-Based Jobs: Define jobs in YAML/JSON files
  • Web UI: Dashboard for job management and monitoring

Quick Start

Simple Mode (Single Instance)

package main

import (
    "context"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/cron"
)

func main() {
    app := forge.New()
    
    // Register cron extension
    cronExt := cron.NewExtension(
        cron.WithMode("simple"),
        cron.WithStorage("memory"),
        cron.WithMaxConcurrentJobs(5),
    )
    app.RegisterExtension(cronExt)
    
    // Register job handlers
    app.AfterRegister(func(ctx context.Context) error {
        registry := forge.MustResolve[*cron.JobRegistry](app.Container(), "cron.registry")
        
        // Register a handler
        registry.Register("sendReport", func(ctx context.Context, job *cron.Job) error {
            // Your job logic here
            return nil
        })
        
        // Create a job programmatically
        scheduler := forge.MustResolve[cron.Scheduler](app.Container(), "cron.scheduler")
        scheduler.AddJob(&cron.Job{
            ID:          "daily-report",
            Name:        "Daily Report",
            Schedule:    "0 9 * * *", // Every day at 9 AM
            HandlerName: "sendReport",
            Enabled:     true,
        })
        
        return nil
    })
    
    app.Run(context.Background())
}

Configuration File

Create jobs.yaml:

jobs:
  - id: cleanup-temp
    name: Cleanup Temporary Files
    schedule: "0 2 * * *"  # Daily at 2 AM
    command: /usr/local/bin/cleanup.sh
    timeout: 10m
    maxRetries: 3
    enabled: true
    
  - id: backup-database
    name: Database Backup
    schedule: "0 0 * * *"  # Daily at midnight
    command: /usr/local/bin/backup-db.sh
    args:
      - --compress
      - --output=/backups
    timeout: 30m
    enabled: true

Load jobs from config:

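app.AfterRegister(func(ctx context.Context) error {
    // Resolve the registry and scheduler as in the Quick Start example
    registry := forge.MustResolve[*cron.JobRegistry](app.Container(), "cron.registry")
    scheduler := forge.MustResolve[cron.Scheduler](app.Container(), "cron.scheduler")

    // Parse the job definitions from the YAML file
    loader := cron.NewJobLoader(app.Logger(), registry)
    jobs, err := loader.LoadFromFile(ctx, "jobs.yaml")
    if err != nil {
        return err
    }

    // Hand each loaded job to the scheduler
    for _, job := range jobs {
        scheduler.AddJob(job)
    }

    return nil
})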

Configuration

extensions:
  cron:
    mode: simple                    # "simple" or "distributed"
    storage: memory                 # "memory", "database", or "redis"
    max_concurrent_jobs: 10
    default_timeout: 5m
    default_timezone: UTC
    max_retries: 3
    retry_backoff: 1s
    retry_multiplier: 2.0
    max_retry_backoff: 30s
    history_retention_days: 30
    enable_api: true
    api_prefix: /api/cron
    enable_web_ui: true
    enable_metrics: true

Cron Schedule Format

The scheduler supports standard cron expressions with optional seconds:

┌────────────── second (0-59) [optional]
│ ┌──────────── minute (0-59)
│ │ ┌────────── hour (0-23)
│ │ │ ┌──────── day of month (1-31)
│ │ │ │ ┌────── month (1-12 or JAN-DEC)
│ │ │ │ │ ┌──── day of week (0-6 or SUN-SAT)
│ │ │ │ │ │
│ │ │ │ │ │
* * * * * *

Examples:

  • 0 9 * * * - Every day at 9 AM
  • */15 * * * * - Every 15 minutes
  • 0 0 * * 0 - Every Sunday at midnight
  • 0 9 * * 1-5 - Weekdays at 9 AM
  • 30 2 1 * * - 2:30 AM on the first of every month
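
The field layout above can be sketched in code. The helper below, labelFields (a hypothetical name), only counts the fields and labels them, treating a five-field expression as one with the seconds field omitted. It does not validate ranges, names, or step syntax, and it is not the parser the extension uses.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldNames lists the six positions from the schedule diagram, in order.
var fieldNames = []string{"second", "minute", "hour", "day of month", "month", "day of week"}

// labelFields maps each field of a cron expression to its position name.
// Five fields means the optional seconds field was omitted.
func labelFields(expr string) (map[string]string, error) {
	fields := strings.Fields(expr)
	names := fieldNames
	switch len(fields) {
	case 5:
		names = fieldNames[1:] // no seconds field
	case 6:
		// all six positions present
	default:
		return nil, fmt.Errorf("expected 5 or 6 fields, got %d", len(fields))
	}
	out := make(map[string]string, len(fields))
	for i, f := range fields {
		out[names[i]] = f
	}
	return out, nil
}

func main() {
	for _, expr := range []string{"0 9 * * 1-5", "*/30 0 9 * * *", "0 9"} {
		m, err := labelFields(expr)
		fmt.Println(expr, "=>", m, err)
	}
}
```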

REST API

Job Management

  • GET /api/cron/jobs - List all jobs
  • POST /api/cron/jobs - Create a job
  • GET /api/cron/jobs/:id - Get job details
  • PUT /api/cron/jobs/:id - Update a job
  • DELETE /api/cron/jobs/:id - Delete a job
  • POST /api/cron/jobs/:id/trigger - Manually trigger a job
  • POST /api/cron/jobs/:id/enable - Enable a job
  • POST /api/cron/jobs/:id/disable - Disable a job

Execution History

  • GET /api/cron/executions - List all executions
  • GET /api/cron/jobs/:id/executions - Get job execution history
  • GET /api/cron/executions/:id - Get execution details

Statistics

  • GET /api/cron/stats - Get scheduler statistics
  • GET /api/cron/jobs/:id/stats - Get job statistics

Health

  • GET /api/cron/health - Health check
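
As an illustration of calling the trigger endpoint from Go, here is a minimal client sketch. The function names triggerURL and triggerJob are hypothetical, the response body format is not documented here and is left unread, and no authentication is sent. The stand-in test server in main exists only so the sketch runs without a Forge app; its 202 status is arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
)

// triggerURL builds the manual-trigger endpoint for a job.
// The prefix is /api/cron in the sample config but is configurable via api_prefix.
func triggerURL(baseURL, apiPrefix, jobID string) string {
	return strings.TrimRight(baseURL, "/") + "/" + strings.Trim(apiPrefix, "/") +
		"/jobs/" + url.PathEscape(jobID) + "/trigger"
}

// triggerJob POSTs to the trigger endpoint and returns the HTTP status code.
func triggerJob(ctx context.Context, client *http.Client, baseURL, apiPrefix, jobID string) (int, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, triggerURL(baseURL, apiPrefix, jobID), nil)
	if err != nil {
		return 0, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	// Stand-in server: logs the request and replies with an arbitrary status.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println(r.Method, r.URL.Path)
		w.WriteHeader(http.StatusAccepted)
	}))
	defer srv.Close()

	status, err := triggerJob(context.Background(), srv.Client(), srv.URL, "/api/cron", "daily-report")
	fmt.Println(status, err)
}
```

The job ID is path-escaped so that IDs containing a slash do not change the route.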

Distributed Mode

For multi-instance deployments with leader election:

extensions:
  cron:
    mode: distributed
    storage: redis
    redis_connection: default
    leader_election: true
    consensus_extension: consensus
    heartbeat_interval: 5s
    lock_ttl: 30s

Distributed mode ensures only one instance schedules jobs, with automatic failover.

Metrics

Prometheus metrics exposed:

  • cron_jobs_total - Total registered jobs
  • cron_executions_total - Total executions by status
  • cron_execution_duration_seconds - Execution duration histogram
  • cron_scheduler_lag_seconds - Lag between scheduled and actual time
  • cron_executor_queue_size - Current executor queue size
  • cron_leader_status - Leader status (0=follower, 1=leader)

Web UI

Access the web UI at /cron/ui (configurable) to:

  • View all scheduled jobs
  • Monitor execution history
  • Manually trigger jobs
  • Enable/disable jobs
  • View real-time statistics

Advanced Usage

Retry Configuration

Configure retries per job:

job := &cron.Job{
    ID:          "retry-example",
    Name:        "Job with Custom Retry",
    Schedule:    "*/5 * * * *",
    HandlerName: "unreliableTask",
    MaxRetries:  5,
    Timeout:     2 * time.Minute,
    Enabled:     true,
}

Job Middleware

Add middleware to job handlers:

// Logging middleware
// The callback receives only the job and its error, so no duration is logged here.
loggingMiddleware := cron.CreateLoggingMiddleware(func(ctx context.Context, job *cron.Job, err error) {
    logger.Info("Job executed",
        "job_id", job.ID,
        "error", err,
    )
})

// Panic recovery middleware
panicMiddleware := cron.CreatePanicRecoveryMiddleware(func(ctx context.Context, job *cron.Job, recovered interface{}) {
    logger.Error("Job panicked", "panic", recovered)
})

// Register with middleware
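registry.RegisterWithMiddleware("myJob", handler, loggingMiddleware, panicMiddleware)

Programmatic Job Management

// The calls below are assumed to run inside a function that returns error.

// Get extension instance
cronExt := app.Extension("cron").(*cron.Extension)

// Create job
job := &cron.Job{
    ID:       "dynamic-job",
    Name:     "Dynamically Created Job",
    Schedule: "0 * * * *",
    Command:  "/usr/local/bin/script.sh",
    Enabled:  true,
}
if err := cronExt.CreateJob(ctx, job); err != nil {
    return err
}

// Update job
update := &cron.JobUpdate{
    Enabled: forge.Ptr(false),
}
if err := cronExt.UpdateJob(ctx, "dynamic-job", update); err != nil {
    return err
}

// Delete job
if err := cronExt.DeleteJob(ctx, "dynamic-job"); err != nil {
    return err
}

// Trigger job; the returned string is the execution ID
executionID, err := cronExt.TriggerJob(ctx, "my-job")
if err != nil {
    return err
}
logger.Info("Job triggered", "execution_id", executionID)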

Security Considerations

  • Command injection: Validate all command inputs
  • API authentication: Integrate with auth extension
  • Rate limiting: Limit manual job triggers
  • Audit logging: Track all job modifications
  • Secrets: Use environment variables, never hardcode

License

See the main Forge license.

Documentation

Constants

const (
	// ErrCodeJobNotFound indicates a job with the given ID was not found
	ErrCodeJobNotFound = "CRON_JOB_NOT_FOUND"

	// ErrCodeJobAlreadyExists indicates a job with the given ID already exists
	ErrCodeJobAlreadyExists = "CRON_JOB_ALREADY_EXISTS"

	// ErrCodeInvalidSchedule indicates the cron expression is invalid
	ErrCodeInvalidSchedule = "CRON_INVALID_SCHEDULE"

	// ErrCodeInvalidConfig indicates the configuration is invalid
	ErrCodeInvalidConfig = "CRON_INVALID_CONFIG"

	// ErrCodeJobDisabled indicates an operation was attempted on a disabled job
	ErrCodeJobDisabled = "CRON_JOB_DISABLED"

	// ErrCodeHandlerNotFound indicates a handler with the given name was not registered
	ErrCodeHandlerNotFound = "CRON_HANDLER_NOT_FOUND"

	// ErrCodeExecutionNotFound indicates an execution with the given ID was not found
	ErrCodeExecutionNotFound = "CRON_EXECUTION_NOT_FOUND"

	// ErrCodeExecutionTimeout indicates a job execution exceeded its timeout
	ErrCodeExecutionTimeout = "CRON_EXECUTION_TIMEOUT"

	// ErrCodeExecutionCancelled indicates a job execution was cancelled
	ErrCodeExecutionCancelled = "CRON_EXECUTION_CANCELLED"

	// ErrCodeMaxRetriesExceeded indicates a job exceeded its maximum retry attempts
	ErrCodeMaxRetriesExceeded = "CRON_MAX_RETRIES_EXCEEDED"

	// ErrCodeSchedulerNotRunning indicates the scheduler is not running
	ErrCodeSchedulerNotRunning = "CRON_SCHEDULER_NOT_RUNNING"

	// ErrCodeNotLeader indicates the operation requires leader status in distributed mode
	ErrCodeNotLeader = "CRON_NOT_LEADER"

	// ErrCodeStorageError indicates a storage operation failed
	ErrCodeStorageError = "CRON_STORAGE_ERROR"

	// ErrCodeLockAcquisitionFailed indicates failed to acquire distributed lock
	ErrCodeLockAcquisitionFailed = "CRON_LOCK_ACQUISITION_FAILED"

	// ErrCodeJobRunning indicates the job is already running
	ErrCodeJobRunning = "CRON_JOB_RUNNING"

	// ErrCodeInvalidJobType indicates the job type (handler vs command) is invalid
	ErrCodeInvalidJobType = "CRON_INVALID_JOB_TYPE"
)

Error codes for the cron extension.

Variables

var (
	// ErrJobNotFound is returned when a job is not found
	ErrJobNotFound = errors.New("job not found")

	// ErrJobAlreadyExists is returned when trying to create a job with an existing ID
	ErrJobAlreadyExists = errors.New("job already exists")

	// ErrInvalidSchedule is returned when a cron expression is invalid
	ErrInvalidSchedule = errors.New("invalid cron schedule")

	// ErrInvalidConfig is returned when the configuration is invalid
	ErrInvalidConfig = errors.New("invalid configuration")

	// ErrJobDisabled is returned when attempting to trigger a disabled job
	ErrJobDisabled = errors.New("job is disabled")

	// ErrHandlerNotFound is returned when a handler is not registered
	ErrHandlerNotFound = errors.New("handler not found")

	// ErrExecutionNotFound is returned when an execution is not found
	ErrExecutionNotFound = errors.New("execution not found")

	// ErrExecutionTimeout is returned when a job execution times out
	ErrExecutionTimeout = errors.New("execution timeout")

	// ErrExecutionCancelled is returned when a job execution is cancelled
	ErrExecutionCancelled = errors.New("execution cancelled")

	// ErrMaxRetriesExceeded is returned when max retries are exceeded
	ErrMaxRetriesExceeded = errors.New("max retries exceeded")

	// ErrSchedulerNotRunning is returned when the scheduler is not running
	ErrSchedulerNotRunning = errors.New("scheduler not running")

	// ErrNotLeader is returned when operation requires leader in distributed mode
	ErrNotLeader = errors.New("not leader")

	// ErrStorageError is returned when a storage operation fails
	ErrStorageError = errors.New("storage error")

	// ErrLockAcquisitionFailed is returned when failed to acquire distributed lock
	ErrLockAcquisitionFailed = errors.New("lock acquisition failed")

	// ErrJobRunning is returned when trying to start an already running job
	ErrJobRunning = errors.New("job is already running")

	// ErrInvalidJobType is returned when job type is invalid
	ErrInvalidJobType = errors.New("invalid job type: must have either handler or command")

	// ErrInvalidData is returned when invalid data is received from storage
	ErrInvalidData = errors.New("invalid data received from storage")
)

Functions

func CreateLoggingMiddleware

func CreateLoggingMiddleware(log func(ctx context.Context, job *Job, err error)) func(JobHandler) JobHandler

CreateLoggingMiddleware creates middleware that logs job execution.

func CreatePanicRecoveryMiddleware

func CreatePanicRecoveryMiddleware(onPanic func(ctx context.Context, job *Job, recovered interface{})) func(JobHandler) JobHandler

CreatePanicRecoveryMiddleware creates middleware that recovers from panics.

func CreateTimeoutMiddleware

func CreateTimeoutMiddleware() func(JobHandler) JobHandler

CreateTimeoutMiddleware creates middleware that enforces a timeout.

func NewExtension

func NewExtension(opts ...ConfigOption) forge.Extension

NewExtension creates a new cron extension with functional options.

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new cron extension with a complete config.

Types

type Config

type Config struct {
	// Mode specifies the scheduler mode: "simple" or "distributed"
	Mode string `json:"mode" mapstructure:"mode" yaml:"mode"`

	// Storage backend: "memory", "database", "redis"
	Storage string `json:"storage" mapstructure:"storage" yaml:"storage"`

	// DatabaseConnection is the name of the database connection to use (when Storage="database")
	DatabaseConnection string `json:"databaseConnection,omitempty" mapstructure:"database_connection" yaml:"database_connection,omitempty"`

	// RedisConnection is the name of the Redis connection to use (when Storage="redis")
	RedisConnection string `json:"redisConnection,omitempty" mapstructure:"redis_connection" yaml:"redis_connection,omitempty"`

	// Scheduler settings
	MaxConcurrentJobs int           `json:"maxConcurrentJobs" mapstructure:"max_concurrent_jobs" yaml:"max_concurrent_jobs"`
	DefaultTimeout    time.Duration `json:"defaultTimeout" mapstructure:"default_timeout" yaml:"default_timeout"`
	DefaultTimezone   string        `json:"defaultTimezone" mapstructure:"default_timezone" yaml:"default_timezone"`

	// Retry policy
	MaxRetries      int           `json:"maxRetries" mapstructure:"max_retries" yaml:"max_retries"`
	RetryBackoff    time.Duration `json:"retryBackoff" mapstructure:"retry_backoff" yaml:"retry_backoff"`
	RetryMultiplier float64       `json:"retryMultiplier" mapstructure:"retry_multiplier" yaml:"retry_multiplier"`
	MaxRetryBackoff time.Duration `json:"maxRetryBackoff" mapstructure:"max_retry_backoff" yaml:"max_retry_backoff"`

	// History retention
	HistoryRetentionDays int `json:"historyRetentionDays" mapstructure:"history_retention_days" yaml:"history_retention_days"`
	MaxHistoryRecords    int `json:"maxHistoryRecords" mapstructure:"max_history_records" yaml:"max_history_records"`

	// Distributed mode settings
	LeaderElection     bool          `json:"leaderElection" mapstructure:"leader_election" yaml:"leader_election"`
	ConsensusExtension string        `json:"consensusExtension,omitempty" mapstructure:"consensus_extension" yaml:"consensus_extension,omitempty"`
	HeartbeatInterval  time.Duration `json:"heartbeatInterval" mapstructure:"heartbeat_interval" yaml:"heartbeat_interval"`
	LockTTL            time.Duration `json:"lockTTL" mapstructure:"lock_ttl" yaml:"lock_ttl"`

	// API settings
	EnableAPI   bool   `json:"enableApi" mapstructure:"enable_api" yaml:"enable_api"`
	APIPrefix   string `json:"apiPrefix" mapstructure:"api_prefix" yaml:"api_prefix"`
	EnableWebUI bool   `json:"enableWebUi" mapstructure:"enable_web_ui" yaml:"enable_web_ui"`

	// Monitoring
	EnableMetrics bool `json:"enableMetrics" mapstructure:"enable_metrics" yaml:"enable_metrics"`

	// Job loading
	ConfigFile string `json:"configFile,omitempty" mapstructure:"config_file" yaml:"config_file,omitempty"`

	// Graceful shutdown timeout
	ShutdownTimeout time.Duration `json:"shutdownTimeout" mapstructure:"shutdown_timeout" yaml:"shutdown_timeout"`

	// Node ID for distributed mode (auto-generated if empty)
	NodeID string `json:"nodeId,omitempty" mapstructure:"node_id" yaml:"node_id,omitempty"`

	// Config loading flags (not serialized)
	RequireConfig bool `json:"-" mapstructure:"-" yaml:"-"`
}

Config contains configuration for the cron extension.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default cron configuration.

func (*Config) GetMode added in v0.7.5

func (c *Config) GetMode() SchedulerMode

GetMode returns the scheduler mode as a typed SchedulerMode.

func (*Config) GetStorage added in v0.7.5

func (c *Config) GetStorage() StorageType

GetStorage returns the storage type as a typed StorageType.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type ConfigOption

type ConfigOption func(*Config)

ConfigOption is a functional option for Config.
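
For readers new to the functional options pattern, the sketch below shows how a ConfigOption of this shape is typically applied. It uses a trimmed local Config with two fields; newConfig is a hypothetical constructor, and the starting values are taken from the README's sample configuration rather than from DefaultConfig, whose actual values are not listed here.

```go
package main

import "fmt"

// Config is a trimmed local copy with two of the documented fields.
type Config struct {
	MaxConcurrentJobs int
	APIPrefix         string
}

// ConfigOption mutates a Config in place, matching the documented type.
type ConfigOption func(*Config)

// WithMaxConcurrentJobs sets the maximum number of concurrent jobs.
func WithMaxConcurrentJobs(n int) ConfigOption {
	return func(c *Config) { c.MaxConcurrentJobs = n }
}

// WithAPIPrefix sets the API prefix.
func WithAPIPrefix(prefix string) ConfigOption {
	return func(c *Config) { c.APIPrefix = prefix }
}

// newConfig is a hypothetical constructor: it starts from fixed values
// and applies the options in order, so later options win.
func newConfig(opts ...ConfigOption) Config {
	// Assumed starting values, copied from the README's sample configuration.
	cfg := Config{MaxConcurrentJobs: 10, APIPrefix: "/api/cron"}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(WithMaxConcurrentJobs(5), WithAPIPrefix("/internal/cron")))
}
```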

func WithAPI

func WithAPI(enable bool) ConfigOption

WithAPI enables/disables the REST API.

func WithAPIPrefix

func WithAPIPrefix(prefix string) ConfigOption

WithAPIPrefix sets the API prefix.

func WithConfig

func WithConfig(config Config) ConfigOption

WithConfig sets the complete config.

func WithConfigFile

func WithConfigFile(path string) ConfigOption

WithConfigFile sets the jobs configuration file path.

func WithConsensusExtension

func WithConsensusExtension(name string) ConfigOption

WithConsensusExtension sets the consensus extension name.

func WithDatabaseConnection

func WithDatabaseConnection(name string) ConfigOption

WithDatabaseConnection sets the database connection name.

func WithDefaultTimeout

func WithDefaultTimeout(timeout time.Duration) ConfigOption

WithDefaultTimeout sets the default job timeout.

func WithDefaultTimezone

func WithDefaultTimezone(tz string) ConfigOption

WithDefaultTimezone sets the default timezone.

func WithHeartbeatInterval

func WithHeartbeatInterval(interval time.Duration) ConfigOption

WithHeartbeatInterval sets the heartbeat interval for distributed mode.

func WithHistoryRetention

func WithHistoryRetention(days int) ConfigOption

WithHistoryRetention sets the history retention in days.

func WithLeaderElection

func WithLeaderElection(enable bool) ConfigOption

WithLeaderElection enables/disables leader election.

func WithLockTTL

func WithLockTTL(ttl time.Duration) ConfigOption

WithLockTTL sets the lock TTL for distributed locking.

func WithMaxConcurrentJobs

func WithMaxConcurrentJobs(max int) ConfigOption

WithMaxConcurrentJobs sets the maximum concurrent jobs.

func WithMaxHistoryRecords

func WithMaxHistoryRecords(max int) ConfigOption

WithMaxHistoryRecords sets the maximum history records to keep.

func WithMaxRetries

func WithMaxRetries(max int) ConfigOption

WithMaxRetries sets the maximum retry attempts.

func WithMaxRetryBackoff

func WithMaxRetryBackoff(max time.Duration) ConfigOption

WithMaxRetryBackoff sets the maximum retry backoff duration.

func WithMetrics

func WithMetrics(enable bool) ConfigOption

WithMetrics enables/disables metrics collection.

func WithMode

func WithMode(mode SchedulerMode) ConfigOption

WithMode sets the scheduler mode using a SchedulerMode constant.

func WithModeString added in v0.7.5

func WithModeString(mode string) ConfigOption

WithModeString sets the scheduler mode using a string value. Prefer WithMode with typed constants for better type safety.

func WithNodeID

func WithNodeID(id string) ConfigOption

WithNodeID sets the node ID for distributed mode.

func WithRedisConnection

func WithRedisConnection(name string) ConfigOption

WithRedisConnection sets the Redis connection name.

func WithRequireConfig

func WithRequireConfig(require bool) ConfigOption

WithRequireConfig requires config from ConfigManager.

func WithRetryBackoff

func WithRetryBackoff(backoff time.Duration) ConfigOption

WithRetryBackoff sets the retry backoff duration.

func WithRetryMultiplier

func WithRetryMultiplier(multiplier float64) ConfigOption

WithRetryMultiplier sets the retry multiplier for exponential backoff.

func WithShutdownTimeout

func WithShutdownTimeout(timeout time.Duration) ConfigOption

WithShutdownTimeout sets the graceful shutdown timeout.

func WithStorage

func WithStorage(storage StorageType) ConfigOption

WithStorage sets the storage backend using a StorageType constant.

func WithStorageString added in v0.7.5

func WithStorageString(storage string) ConfigOption

WithStorageString sets the storage backend using a string value. Prefer WithStorage with typed constants for better type safety.

func WithWebUI

func WithWebUI(enable bool) ConfigOption

WithWebUI enables/disables the web UI.

type ExecutionFilter

type ExecutionFilter struct {
	JobID     string            `json:"jobId,omitempty"`
	Status    []ExecutionStatus `json:"status,omitempty"`
	StartedAt *time.Time        `json:"startedAt,omitempty"`
	Before    *time.Time        `json:"before,omitempty"`
	After     *time.Time        `json:"after,omitempty"`
	NodeID    string            `json:"nodeId,omitempty"`
	Limit     int               `json:"limit,omitempty"`
	Offset    int               `json:"offset,omitempty"`
	OrderBy   string            `json:"orderBy,omitempty"`  // Field to order by
	OrderDir  string            `json:"orderDir,omitempty"` // "asc" or "desc"
}

ExecutionFilter is used to filter job executions when querying.
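
Because every field carries an omitempty JSON tag, only the fields that are set appear when a filter is serialized. The sketch below copies the struct locally to show that behavior; encodeFilter is a hypothetical helper, and whether the REST API accepts a filter in this JSON form is not stated in this document. The status strings match the ExecutionStatus values documented for the package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ExecutionStatus and ExecutionFilter are copied from the package docs
// so that this sketch compiles on its own.
type ExecutionStatus string

type ExecutionFilter struct {
	JobID     string            `json:"jobId,omitempty"`
	Status    []ExecutionStatus `json:"status,omitempty"`
	StartedAt *time.Time        `json:"startedAt,omitempty"`
	Before    *time.Time        `json:"before,omitempty"`
	After     *time.Time        `json:"after,omitempty"`
	NodeID    string            `json:"nodeId,omitempty"`
	Limit     int               `json:"limit,omitempty"`
	Offset    int               `json:"offset,omitempty"`
	OrderBy   string            `json:"orderBy,omitempty"`
	OrderDir  string            `json:"orderDir,omitempty"`
}

// encodeFilter serializes a filter to JSON; unset fields are omitted.
func encodeFilter(f ExecutionFilter) (string, error) {
	b, err := json.Marshal(f)
	return string(b), err
}

func main() {
	since := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	s, err := encodeFilter(ExecutionFilter{
		JobID:    "daily-report",
		Status:   []ExecutionStatus{"failed", "timeout"},
		After:    &since,
		Limit:    10,
		OrderDir: "desc",
	})
	fmt.Println(s, err)
}
```

Note that an Offset of 0 is also dropped by omitempty, which is harmless here because zero is the natural starting offset.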

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus represents the status of a job execution.

const (
	// ExecutionStatusPending indicates the execution is queued but not started
	ExecutionStatusPending ExecutionStatus = "pending"

	// ExecutionStatusRunning indicates the execution is in progress
	ExecutionStatusRunning ExecutionStatus = "running"

	// ExecutionStatusSuccess indicates the execution completed successfully
	ExecutionStatusSuccess ExecutionStatus = "success"

	// ExecutionStatusFailed indicates the execution failed
	ExecutionStatusFailed ExecutionStatus = "failed"

	// ExecutionStatusCancelled indicates the execution was cancelled
	ExecutionStatusCancelled ExecutionStatus = "cancelled"

	// ExecutionStatusTimeout indicates the execution exceeded the timeout
	ExecutionStatusTimeout ExecutionStatus = "timeout"

	// ExecutionStatusRetrying indicates the execution is being retried
	ExecutionStatusRetrying ExecutionStatus = "retrying"
)

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor handles job execution with concurrency control, timeouts, and retries.

func NewExecutor

func NewExecutor(config Config, storage Storage, registry *JobRegistry, logger forge.Logger, metrics forge.Metrics, nodeID string) *Executor

NewExecutor creates a new job executor.

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context, job *Job) (string, error)

Execute executes a job asynchronously. Returns the execution ID and any immediate errors.

func (*Executor) GetRunningCount

func (e *Executor) GetRunningCount() int

GetRunningCount returns the number of currently running jobs.

func (*Executor) GetRunningJobs

func (e *Executor) GetRunningJobs() []string

GetRunningJobs returns IDs of all currently running jobs.

func (*Executor) IsJobRunning

func (e *Executor) IsJobRunning(jobID string) bool

IsJobRunning checks if a job is currently running.

func (*Executor) Shutdown

func (e *Executor) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the executor. It waits for all running jobs to complete or until the context is cancelled.

type Extension

type Extension struct {
	*forge.BaseExtension
	// contains filtered or unexported fields
}

Extension implements forge.Extension and forge.RunnableExtension for cron functionality.

func (*Extension) CreateJob

func (e *Extension) CreateJob(ctx context.Context, job *Job) error

CreateJob creates a new job and adds it to the scheduler.

func (*Extension) DeleteJob

func (e *Extension) DeleteJob(ctx context.Context, jobID string) error

DeleteJob deletes a job.

func (*Extension) Dependencies

func (e *Extension) Dependencies() []string

Dependencies returns the names of extensions this extension depends on.

func (*Extension) GetExecutor

func (e *Extension) GetExecutor() *Executor

GetExecutor returns the executor instance.

func (*Extension) GetRegistry

func (e *Extension) GetRegistry() *JobRegistry

GetRegistry returns the job registry. This is useful for registering job handlers.

func (*Extension) GetScheduler

func (e *Extension) GetScheduler() Scheduler

GetScheduler returns the scheduler instance. This is useful for registering jobs programmatically.

func (*Extension) GetStats

func (e *Extension) GetStats(ctx context.Context) (*SchedulerStats, error)

GetStats returns scheduler statistics.

func (*Extension) GetStorage

func (e *Extension) GetStorage() Storage

GetStorage returns the storage instance.

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks if the cron extension is healthy.

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the cron extension with the app.

func (*Extension) Run

func (e *Extension) Run(ctx context.Context) error

Run implements forge.RunnableExtension. This is called after the app starts to run any long-running processes.

func (*Extension) Shutdown

func (e *Extension) Shutdown(ctx context.Context) error

Shutdown implements forge.RunnableExtension. This is called before Stop to gracefully shutdown long-running processes.

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start starts the cron extension.

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop stops the cron extension.

func (*Extension) TriggerJob

func (e *Extension) TriggerJob(ctx context.Context, jobID string) (string, error)

TriggerJob manually triggers a job execution.

func (*Extension) UpdateJob

func (e *Extension) UpdateJob(ctx context.Context, jobID string, update *JobUpdate) error

UpdateJob updates an existing job.

type HistoryTracker

type HistoryTracker struct {
	// contains filtered or unexported fields
}

HistoryTracker manages job execution history with automatic cleanup.

func NewHistoryTracker

func NewHistoryTracker(config Config, storage Storage, logger forge.Logger) *HistoryTracker

NewHistoryTracker creates a new history tracker.

func (*HistoryTracker) DeleteJobExecutions

func (h *HistoryTracker) DeleteJobExecutions(ctx context.Context, jobID string) (int64, error)

DeleteJobExecutions deletes all executions for a specific job.

func (*HistoryTracker) DeleteOldExecutions

func (h *HistoryTracker) DeleteOldExecutions(ctx context.Context, before time.Time) (int64, error)

DeleteOldExecutions manually triggers cleanup of old executions.

func (*HistoryTracker) GetAllJobStats

func (h *HistoryTracker) GetAllJobStats(ctx context.Context) (map[string]*JobStats, error)

GetAllJobStats retrieves statistics for all jobs.

func (*HistoryTracker) GetAverageDuration

func (h *HistoryTracker) GetAverageDuration(ctx context.Context, jobID string) (time.Duration, error)

GetAverageDuration calculates the average execution duration for a job.

func (*HistoryTracker) GetExecution

func (h *HistoryTracker) GetExecution(ctx context.Context, executionID string) (*JobExecution, error)

GetExecution retrieves a single execution by ID.

func (*HistoryTracker) GetExecutionCount

func (h *HistoryTracker) GetExecutionCount(ctx context.Context, filter *ExecutionFilter) (int64, error)

GetExecutionCount returns the total number of executions matching the filter.

func (*HistoryTracker) GetExecutionTrend

func (h *HistoryTracker) GetExecutionTrend(ctx context.Context, jobID string, duration time.Duration) (map[string]int, error)

GetExecutionTrend calculates execution trends over time.

func (*HistoryTracker) GetExecutions

func (h *HistoryTracker) GetExecutions(ctx context.Context, filter *ExecutionFilter) ([]*JobExecution, error)

GetExecutions retrieves execution history with filtering.

func (*HistoryTracker) GetExecutionsByDateRange

func (h *HistoryTracker) GetExecutionsByDateRange(ctx context.Context, after, before time.Time, limit int) ([]*JobExecution, error)

GetExecutionsByDateRange retrieves executions within a date range.

func (*HistoryTracker) GetExecutionsByStatus

func (h *HistoryTracker) GetExecutionsByStatus(ctx context.Context, status ExecutionStatus, limit int) ([]*JobExecution, error)

GetExecutionsByStatus retrieves executions with a specific status.

func (*HistoryTracker) GetFailedExecutions

func (h *HistoryTracker) GetFailedExecutions(ctx context.Context, limit int) ([]*JobExecution, error)

GetFailedExecutions retrieves failed executions.

func (*HistoryTracker) GetJobExecutions

func (h *HistoryTracker) GetJobExecutions(ctx context.Context, jobID string, limit int) ([]*JobExecution, error)

GetJobExecutions retrieves executions for a specific job.

func (*HistoryTracker) GetJobStats

func (h *HistoryTracker) GetJobStats(ctx context.Context, jobID string) (*JobStats, error)

GetJobStats retrieves aggregated statistics for a job. Results are cached for 1 minute to reduce storage queries.

func (*HistoryTracker) GetLastExecution

func (h *HistoryTracker) GetLastExecution(ctx context.Context, jobID string) (*JobExecution, error)

GetLastExecution retrieves the most recent execution for a job.

func (*HistoryTracker) GetLastSuccessfulExecution

func (h *HistoryTracker) GetLastSuccessfulExecution(ctx context.Context, jobID string) (*JobExecution, error)

GetLastSuccessfulExecution retrieves the most recent successful execution for a job.

func (*HistoryTracker) GetRecentExecutions

func (h *HistoryTracker) GetRecentExecutions(ctx context.Context, limit int) ([]*JobExecution, error)

GetRecentExecutions retrieves the most recent executions across all jobs.

func (*HistoryTracker) GetSuccessRate

func (h *HistoryTracker) GetSuccessRate(ctx context.Context, jobID string) (float64, error)

GetSuccessRate calculates the success rate for a job.

func (*HistoryTracker) Start

func (h *HistoryTracker) Start(ctx context.Context) error

Start starts the history tracker and its cleanup routine.

func (*HistoryTracker) Stop

func (h *HistoryTracker) Stop(ctx context.Context) error

Stop stops the history tracker.

type Job

type Job struct {
	// ID is the unique identifier for this job
	ID string `json:"id" bson:"_id,omitempty"`

	// Name is a human-readable name for the job
	Name string `json:"name" bson:"name"`

	// Schedule is the cron expression defining when the job runs
	Schedule string `json:"schedule" bson:"schedule"`

	// Handler is the function to execute (for code-based jobs)
	// This field is not persisted to storage
	Handler JobHandler `json:"-" bson:"-"`

	// HandlerName is the name of the registered handler (for code-based jobs)
	HandlerName string `json:"handlerName,omitempty" bson:"handler_name,omitempty"`

	// Command is the shell command to execute (for command-based jobs)
	Command string `json:"command,omitempty" bson:"command,omitempty"`

	// Args are arguments for the command
	Args []string `json:"args,omitempty" bson:"args,omitempty"`

	// Env are environment variables for the command
	Env []string `json:"env,omitempty" bson:"env,omitempty"`

	// WorkingDir is the working directory for command execution
	WorkingDir string `json:"workingDir,omitempty" bson:"working_dir,omitempty"`

	// Payload is arbitrary data passed to the job handler
	Payload map[string]interface{} `json:"payload,omitempty" bson:"payload,omitempty"`

	// Enabled indicates whether the job is active
	Enabled bool `json:"enabled" bson:"enabled"`

	// Timezone specifies the timezone for schedule evaluation
	Timezone *time.Location `json:"timezone,omitempty" bson:"timezone,omitempty"`

	// MaxRetries is the maximum number of retry attempts on failure
	MaxRetries int `json:"maxRetries" bson:"max_retries"`

	// Timeout is the maximum execution time for the job
	Timeout time.Duration `json:"timeout" bson:"timeout"`

	// Metadata contains arbitrary key-value metadata
	Metadata map[string]string `json:"metadata,omitempty" bson:"metadata,omitempty"`

	// Tags for categorizing jobs
	Tags []string `json:"tags,omitempty" bson:"tags,omitempty"`

	// CreatedAt is when the job was created
	CreatedAt time.Time `json:"createdAt" bson:"created_at"`

	// UpdatedAt is when the job was last updated
	UpdatedAt time.Time `json:"updatedAt" bson:"updated_at"`

	// LastExecutionAt is when the job last executed
	LastExecutionAt *time.Time `json:"lastExecutionAt,omitempty" bson:"last_execution_at,omitempty"`

	// NextExecutionAt is when the job will next execute
	NextExecutionAt *time.Time `json:"nextExecutionAt,omitempty" bson:"next_execution_at,omitempty"`
}

Job represents a scheduled job.

type JobConfig

type JobConfig struct {
	ID          string                 `json:"id" yaml:"id"`
	Name        string                 `json:"name" yaml:"name"`
	Schedule    string                 `json:"schedule" yaml:"schedule"`
	HandlerName string                 `json:"handlerName,omitempty" yaml:"handlerName,omitempty"`
	Handler     string                 `json:"handler,omitempty" yaml:"handler,omitempty"` // Alias for HandlerName
	Command     string                 `json:"command,omitempty" yaml:"command,omitempty"`
	Args        []string               `json:"args,omitempty" yaml:"args,omitempty"`
	Env         []string               `json:"env,omitempty" yaml:"env,omitempty"`
	WorkingDir  string                 `json:"workingDir,omitempty" yaml:"workingDir,omitempty"`
	Payload     map[string]interface{} `json:"payload,omitempty" yaml:"payload,omitempty"`
	Enabled     bool                   `json:"enabled" yaml:"enabled"`
	Timezone    string                 `json:"timezone,omitempty" yaml:"timezone,omitempty"`
	MaxRetries  int                    `json:"maxRetries" yaml:"maxRetries"`
	Timeout     string                 `json:"timeout" yaml:"timeout"` // Duration string
	Metadata    map[string]string      `json:"metadata,omitempty" yaml:"metadata,omitempty"`
	Tags        []string               `json:"tags,omitempty" yaml:"tags,omitempty"`
}

JobConfig represents a job configuration from YAML/JSON.

type JobExecution

type JobExecution struct {
	// ID is the unique identifier for this execution
	ID string `json:"id" bson:"_id,omitempty"`

	// JobID is the ID of the job being executed
	JobID string `json:"jobId" bson:"job_id"`

	// JobName is a denormalized copy of the job name for convenience
	JobName string `json:"jobName" bson:"job_name"`

	// Status is the current status of the execution
	Status ExecutionStatus `json:"status" bson:"status"`

	// ScheduledAt is when the job was scheduled to run
	ScheduledAt time.Time `json:"scheduledAt" bson:"scheduled_at"`

	// StartedAt is when the execution actually started
	StartedAt time.Time `json:"startedAt" bson:"started_at"`

	// CompletedAt is when the execution finished (success or failure)
	CompletedAt *time.Time `json:"completedAt,omitempty" bson:"completed_at,omitempty"`

	// Error contains the error message if the execution failed
	Error string `json:"error,omitempty" bson:"error,omitempty"`

	// Output contains stdout/stderr from command execution
	Output string `json:"output,omitempty" bson:"output,omitempty"`

	// Retries is the number of retries attempted
	Retries int `json:"retries" bson:"retries"`

	// NodeID identifies which node executed the job (for distributed mode)
	NodeID string `json:"nodeId,omitempty" bson:"node_id,omitempty"`

	// Duration is how long the execution took
	Duration time.Duration `json:"duration" bson:"duration"`

	// Metadata contains arbitrary execution metadata
	Metadata map[string]string `json:"metadata,omitempty" bson:"metadata,omitempty"`
}

JobExecution represents a single execution of a job.

type JobHandler

type JobHandler func(ctx context.Context, job *Job) error

JobHandler is the function signature for code-based jobs. The handler receives a context (for cancellation) and the job definition. Return an error if the job fails.

type JobLoader

type JobLoader struct {
	// contains filtered or unexported fields
}

JobLoader loads jobs from configuration files.

func NewJobLoader

func NewJobLoader(logger forge.Logger, registry *JobRegistry) *JobLoader

NewJobLoader creates a new job loader.

func (*JobLoader) ExportToFile

func (l *JobLoader) ExportToFile(ctx context.Context, jobs []*Job, filePath string) error

ExportToFile exports jobs to a YAML or JSON file.

func (*JobLoader) LoadFromFile

func (l *JobLoader) LoadFromFile(ctx context.Context, filePath string) ([]*Job, error)

LoadFromFile loads jobs from a YAML or JSON file.

func (*JobLoader) LoadFromJSON

func (l *JobLoader) LoadFromJSON(ctx context.Context, data []byte) ([]*Job, error)

LoadFromJSON loads jobs from JSON data.

func (*JobLoader) LoadFromYAML

func (l *JobLoader) LoadFromYAML(ctx context.Context, data []byte) ([]*Job, error)

LoadFromYAML loads jobs from YAML data.

func (*JobLoader) ValidateJobConfig

func (l *JobLoader) ValidateJobConfig(config JobConfig) error

ValidateJobConfig validates a job configuration.

type JobRegistry

type JobRegistry struct {
	// contains filtered or unexported fields
}

JobRegistry manages registration of job handlers for code-based jobs. It allows you to register named handlers that can be referenced by name when creating jobs programmatically or from configuration.

func NewJobRegistry

func NewJobRegistry() *JobRegistry

NewJobRegistry creates a new job registry.

func (*JobRegistry) Clear

func (r *JobRegistry) Clear()

Clear removes all registered handlers.

func (*JobRegistry) Count

func (r *JobRegistry) Count() int

Count returns the number of registered handlers.

func (*JobRegistry) Get

func (r *JobRegistry) Get(name string) (JobHandler, error)

Get retrieves a job handler by name.

func (*JobRegistry) Has

func (r *JobRegistry) Has(name string) bool

Has checks if a handler is registered.

func (*JobRegistry) List

func (r *JobRegistry) List() []string

List returns all registered handler names.

func (*JobRegistry) MustRegister

func (r *JobRegistry) MustRegister(name string, handler JobHandler)

MustRegister registers a job handler and panics on error. Useful for registration at startup where failure should be fatal.

func (*JobRegistry) Register

func (r *JobRegistry) Register(name string, handler JobHandler) error

Register registers a job handler with the given name. The handler can then be referenced when creating jobs.

Example:

err := registry.Register("sendEmail", func(ctx context.Context, job *Job) error {
    // Send email logic
    return nil
})
if err != nil {
    // Handle the registration failure
}

func (*JobRegistry) RegisterBatch

func (r *JobRegistry) RegisterBatch(handlers map[string]JobHandler) error

RegisterBatch registers multiple handlers at once. If any registration fails, all successful registrations are rolled back.

func (*JobRegistry) RegisterWithMiddleware

func (r *JobRegistry) RegisterWithMiddleware(name string, handler JobHandler, middleware ...func(JobHandler) JobHandler) error

RegisterWithMiddleware registers a handler with middleware applied.

func (*JobRegistry) Unregister

func (r *JobRegistry) Unregister(name string) error

Unregister removes a job handler.

func (*JobRegistry) WrapHandler

func (r *JobRegistry) WrapHandler(name string, wrapper func(JobHandler) JobHandler) error

WrapHandler wraps a handler with additional functionality (middleware pattern). This is useful for adding logging, metrics, error handling, etc.

Example:

err := registry.WrapHandler("myHandler", func(next JobHandler) JobHandler {
    return func(ctx context.Context, job *Job) error {
        start := time.Now()
        defer func() {
            logger.Info("job completed", "duration", time.Since(start))
        }()
        return next(ctx, job)
    }
})

type JobStats

type JobStats struct {
	JobID            string        `json:"jobId"`
	JobName          string        `json:"jobName"`
	TotalExecutions  int64         `json:"totalExecutions"`
	SuccessCount     int64         `json:"successCount"`
	FailureCount     int64         `json:"failureCount"`
	CancelCount      int64         `json:"cancelCount"`
	TimeoutCount     int64         `json:"timeoutCount"`
	AverageDuration  time.Duration `json:"averageDuration"`
	MinDuration      time.Duration `json:"minDuration"`
	MaxDuration      time.Duration `json:"maxDuration"`
	LastSuccess      *time.Time    `json:"lastSuccess,omitempty"`
	LastFailure      *time.Time    `json:"lastFailure,omitempty"`
	LastExecution    *time.Time    `json:"lastExecution,omitempty"`
	NextExecution    *time.Time    `json:"nextExecution,omitempty"`
	SuccessRate      float64       `json:"successRate"` // Percentage
	RecentExecutions int           `json:"recentExecutions"`
}

JobStats contains aggregated statistics for a job.
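SuccessRate is annotated as a percentage. A minimal sketch of how such a value could be derived from SuccessCount and TotalExecutions follows; the helper name and the choice to return 0 when there are no executions are assumptions, not the package's documented behavior.

```go
package main

import "fmt"

// successRate is a hypothetical helper that returns the share of successful
// executions as a percentage in [0, 100]. Returning 0 for a job that has
// never run is an assumption of this sketch.
func successRate(successCount, totalExecutions int64) float64 {
	if totalExecutions <= 0 {
		return 0
	}
	return float64(successCount) * 100 / float64(totalExecutions)
}

func main() {
	fmt.Printf("%.1f%%\n", successRate(45, 50))
}
```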

type JobUpdate

type JobUpdate struct {
	Name        *string                `json:"name,omitempty"`
	Schedule    *string                `json:"schedule,omitempty"`
	HandlerName *string                `json:"handlerName,omitempty"`
	Command     *string                `json:"command,omitempty"`
	Args        []string               `json:"args,omitempty"`
	Env         []string               `json:"env,omitempty"`
	WorkingDir  *string                `json:"workingDir,omitempty"`
	Payload     map[string]interface{} `json:"payload,omitempty"`
	Enabled     *bool                  `json:"enabled,omitempty"`
	MaxRetries  *int                   `json:"maxRetries,omitempty"`
	Timeout     *time.Duration         `json:"timeout,omitempty"`
	Metadata    map[string]string      `json:"metadata,omitempty"`
	Tags        []string               `json:"tags,omitempty"`
}

JobUpdate represents fields that can be updated on a job.

type JobsConfig

type JobsConfig struct {
	Jobs []JobConfig `json:"jobs" yaml:"jobs"`
}

JobsConfig represents a jobs configuration file.
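Based on the struct tags above, a JSON jobs file is an object with a top-level "jobs" array. The sketch below decodes such a document with encoding/json into local stand-ins that mirror a subset of the documented fields. The sample job itself is made up for illustration, and the real loader may apply validation that this example skips.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JobConfig and JobsConfig are local stand-ins that mirror a subset of the
// documented fields and their JSON tags.
type JobConfig struct {
	ID          string   `json:"id"`
	Name        string   `json:"name"`
	Schedule    string   `json:"schedule"`
	HandlerName string   `json:"handlerName,omitempty"`
	Command     string   `json:"command,omitempty"`
	Args        []string `json:"args,omitempty"`
	Enabled     bool     `json:"enabled"`
	Timeout     string   `json:"timeout"`
}

type JobsConfig struct {
	Jobs []JobConfig `json:"jobs"`
}

// sample is an illustrative document; the job it describes is invented.
const sample = `{
  "jobs": [
    {
      "id": "cleanup",
      "name": "Cleanup temp files",
      "schedule": "0 0 * * * *",
      "command": "/usr/bin/find",
      "args": ["/tmp", "-mtime", "+7", "-delete"],
      "enabled": true,
      "timeout": "5m"
    }
  ]
}`

// parseJobs decodes a jobs document into JobsConfig.
func parseJobs(data []byte) (JobsConfig, error) {
	var cfg JobsConfig
	err := json.Unmarshal(data, &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseJobs([]byte(sample))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, j := range cfg.Jobs {
		fmt.Println(j.ID, j.Schedule, j.Timeout)
	}
}
```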

type MetricsCollector

type MetricsCollector struct {
	// contains filtered or unexported fields
}

MetricsCollector collects and exposes metrics for the cron extension.

func NewMetricsCollector

func NewMetricsCollector(metrics forge.Metrics, logger forge.Logger) *MetricsCollector

NewMetricsCollector creates a new metrics collector.

func (*MetricsCollector) ExportMetrics

func (m *MetricsCollector) ExportMetrics(ctx context.Context) error

ExportMetrics exports metrics to the provided metrics system. This can be called periodically to update gauge metrics.

func (*MetricsCollector) GetAverageExecutionDuration

func (m *MetricsCollector) GetAverageExecutionDuration() time.Duration

GetAverageExecutionDuration returns the average execution duration.

func (*MetricsCollector) GetAverageSchedulerLag

func (m *MetricsCollector) GetAverageSchedulerLag() time.Duration

GetAverageSchedulerLag returns the average scheduler lag.

func (*MetricsCollector) GetExecutionsTotal

func (m *MetricsCollector) GetExecutionsTotal() map[string]int64

GetExecutionsTotal returns the total executions by status.

func (*MetricsCollector) GetJobsTotal

func (m *MetricsCollector) GetJobsTotal() int64

GetJobsTotal returns the total number of registered jobs.

func (*MetricsCollector) GetLeaderStatus

func (m *MetricsCollector) GetLeaderStatus() int64

GetLeaderStatus returns the current leader status as recorded by RecordLeaderStatus (0 = follower, 1 = leader).

func (*MetricsCollector) GetMetricsSummary

func (m *MetricsCollector) GetMetricsSummary() map[string]interface{}

GetMetricsSummary returns a summary of all metrics.

func (*MetricsCollector) GetQueueSize

func (m *MetricsCollector) GetQueueSize() int64

GetQueueSize returns the current executor queue size.

func (*MetricsCollector) RecordExecution

func (m *MetricsCollector) RecordExecution(jobID, jobName string, status ExecutionStatus, duration time.Duration)

RecordExecution records a job execution completion.

func (*MetricsCollector) RecordJobRegistered

func (m *MetricsCollector) RecordJobRegistered()

RecordJobRegistered records that a job was registered.

func (*MetricsCollector) RecordJobUnregistered

func (m *MetricsCollector) RecordJobUnregistered()

RecordJobUnregistered records that a job was unregistered.

func (*MetricsCollector) RecordLeaderStatus

func (m *MetricsCollector) RecordLeaderStatus(isLeader bool)

RecordLeaderStatus records the leader status (0 = follower, 1 = leader).

func (*MetricsCollector) RecordQueueSize

func (m *MetricsCollector) RecordQueueSize(size int64)

RecordQueueSize records the current executor queue size.

func (*MetricsCollector) RecordSchedulerLag

func (m *MetricsCollector) RecordSchedulerLag(jobID string, lag time.Duration)

RecordSchedulerLag records the lag between scheduled and actual execution time.

func (*MetricsCollector) Reset

func (m *MetricsCollector) Reset()

Reset resets all metrics.

type Scheduler

type Scheduler = core.Scheduler

Scheduler is the interface for job schedulers. This is re-exported from core to avoid import cycles. Scheduler implementations should be in the scheduler package.

type SchedulerMode added in v0.7.5

type SchedulerMode string

SchedulerMode represents the scheduler mode.

const (
	// ModeSimple is a single-node scheduler mode.
	ModeSimple SchedulerMode = "simple"

	// ModeDistributed is a multi-node scheduler mode with leader election.
	ModeDistributed SchedulerMode = "distributed"
)

func (SchedulerMode) IsValid added in v0.7.5

func (m SchedulerMode) IsValid() bool

IsValid checks if the scheduler mode is valid.

func (SchedulerMode) String added in v0.7.5

func (m SchedulerMode) String() string

String returns the string representation of the scheduler mode.

type SchedulerStats

type SchedulerStats struct {
	TotalJobs         int            `json:"totalJobs"`
	EnabledJobs       int            `json:"enabledJobs"`
	DisabledJobs      int            `json:"disabledJobs"`
	RunningExecutions int            `json:"runningExecutions"`
	QueuedExecutions  int            `json:"queuedExecutions"`
	IsLeader          bool           `json:"isLeader"`
	NodeID            string         `json:"nodeId"`
	Uptime            time.Duration  `json:"uptime"`
	JobStats          map[string]int `json:"jobStats"` // Status counts
}

SchedulerStats contains overall scheduler statistics.

type Storage

type Storage = core.Storage

Storage is the interface for job and execution persistence. This is re-exported from core to avoid import cycles. Storage implementations should be in the storage package.

type StorageType added in v0.7.5

type StorageType string

StorageType represents the storage backend type.

const (
	// StorageMemory uses in-memory storage (non-persistent, for development/testing).
	StorageMemory StorageType = "memory"

	// StorageDatabase uses database storage (requires database extension).
	StorageDatabase StorageType = "database"

	// StorageRedis uses Redis storage (required for distributed mode).
	StorageRedis StorageType = "redis"
)

func (StorageType) IsValid added in v0.7.5

func (s StorageType) IsValid() bool

IsValid checks if the storage type is valid.

func (StorageType) String added in v0.7.5

func (s StorageType) String() string

String returns the string representation of the storage type.
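The typed string constants above lend themselves to validation before the extension is configured. The sketch below re-declares the two types locally, with the documented constant values, and adds a hypothetical validateCombination helper. Its cross-check follows the comment on StorageRedis that Redis is required for distributed mode; whether the package enforces this in the same way is an assumption.

```go
package main

import "fmt"

// SchedulerMode and StorageType are local stand-ins that reuse the
// documented constant values.
type SchedulerMode string

type StorageType string

const (
	ModeSimple      SchedulerMode = "simple"
	ModeDistributed SchedulerMode = "distributed"

	StorageMemory   StorageType = "memory"
	StorageDatabase StorageType = "database"
	StorageRedis    StorageType = "redis"
)

// IsValid reports whether m is one of the known scheduler modes.
func (m SchedulerMode) IsValid() bool {
	switch m {
	case ModeSimple, ModeDistributed:
		return true
	}
	return false
}

// IsValid reports whether s is one of the known storage types.
func (s StorageType) IsValid() bool {
	switch s {
	case StorageMemory, StorageDatabase, StorageRedis:
		return true
	}
	return false
}

// validateCombination is a hypothetical check, not part of the package.
// It rejects unknown values and distributed mode without Redis storage.
func validateCombination(mode SchedulerMode, storage StorageType) error {
	if !mode.IsValid() {
		return fmt.Errorf("invalid scheduler mode %q", mode)
	}
	if !storage.IsValid() {
		return fmt.Errorf("invalid storage type %q", storage)
	}
	if mode == ModeDistributed && storage != StorageRedis {
		return fmt.Errorf("distributed mode requires %q storage, got %q", StorageRedis, storage)
	}
	return nil
}

func main() {
	fmt.Println(validateCombination(ModeSimple, StorageMemory))
	fmt.Println(validateCombination(ModeDistributed, StorageMemory))
}
```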

Directories

Path Synopsis
Package core contains shared interfaces to avoid import cycles.
Package register imports scheduler and storage implementations to register them.
