Documentation
¶
Index ¶
- Constants
- Variables
- func CreateLoggingMiddleware(log func(ctx context.Context, job *Job, err error)) func(JobHandler) JobHandler
- func CreatePanicRecoveryMiddleware(onPanic func(ctx context.Context, job *Job, recovered interface{})) func(JobHandler) JobHandler
- func CreateTimeoutMiddleware() func(JobHandler) JobHandler
- func NewExtension(opts ...ConfigOption) forge.Extension
- func NewExtensionWithConfig(config Config) forge.Extension
- type Config
- type ConfigOption
- func WithAPI(enable bool) ConfigOption
- func WithAPIPrefix(prefix string) ConfigOption
- func WithConfig(config Config) ConfigOption
- func WithConfigFile(path string) ConfigOption
- func WithConsensusExtension(name string) ConfigOption
- func WithDatabaseConnection(name string) ConfigOption
- func WithDefaultTimeout(timeout time.Duration) ConfigOption
- func WithDefaultTimezone(tz string) ConfigOption
- func WithHeartbeatInterval(interval time.Duration) ConfigOption
- func WithHistoryRetention(days int) ConfigOption
- func WithLeaderElection(enable bool) ConfigOption
- func WithLockTTL(ttl time.Duration) ConfigOption
- func WithMaxConcurrentJobs(max int) ConfigOption
- func WithMaxHistoryRecords(max int) ConfigOption
- func WithMaxRetries(max int) ConfigOption
- func WithMaxRetryBackoff(max time.Duration) ConfigOption
- func WithMetrics(enable bool) ConfigOption
- func WithMode(mode SchedulerMode) ConfigOption
- func WithModeString(mode string) ConfigOption
- func WithNodeID(id string) ConfigOption
- func WithRedisConnection(name string) ConfigOption
- func WithRequireConfig(require bool) ConfigOption
- func WithRetryBackoff(backoff time.Duration) ConfigOption
- func WithRetryMultiplier(multiplier float64) ConfigOption
- func WithShutdownTimeout(timeout time.Duration) ConfigOption
- func WithStorage(storage StorageType) ConfigOption
- func WithStorageString(storage string) ConfigOption
- func WithWebUI(enable bool) ConfigOption
- type ExecutionFilter
- type ExecutionStatus
- type Executor
- type Extension
- func (e *Extension) CreateJob(ctx context.Context, job *Job) error
- func (e *Extension) DeleteJob(ctx context.Context, jobID string) error
- func (e *Extension) Dependencies() []string
- func (e *Extension) GetExecutor() *Executor
- func (e *Extension) GetRegistry() *JobRegistry
- func (e *Extension) GetScheduler() Scheduler
- func (e *Extension) GetStats(ctx context.Context) (*SchedulerStats, error)
- func (e *Extension) GetStorage() Storage
- func (e *Extension) Health(ctx context.Context) error
- func (e *Extension) Register(app forge.App) error
- func (e *Extension) Run(ctx context.Context) error
- func (e *Extension) Shutdown(ctx context.Context) error
- func (e *Extension) Start(ctx context.Context) error
- func (e *Extension) Stop(ctx context.Context) error
- func (e *Extension) TriggerJob(ctx context.Context, jobID string) (string, error)
- func (e *Extension) UpdateJob(ctx context.Context, jobID string, update *JobUpdate) error
- type HistoryTracker
- func (h *HistoryTracker) DeleteJobExecutions(ctx context.Context, jobID string) (int64, error)
- func (h *HistoryTracker) DeleteOldExecutions(ctx context.Context, before time.Time) (int64, error)
- func (h *HistoryTracker) GetAllJobStats(ctx context.Context) (map[string]*JobStats, error)
- func (h *HistoryTracker) GetAverageDuration(ctx context.Context, jobID string) (time.Duration, error)
- func (h *HistoryTracker) GetExecution(ctx context.Context, executionID string) (*JobExecution, error)
- func (h *HistoryTracker) GetExecutionCount(ctx context.Context, filter *ExecutionFilter) (int64, error)
- func (h *HistoryTracker) GetExecutionTrend(ctx context.Context, jobID string, duration time.Duration) (map[string]int, error)
- func (h *HistoryTracker) GetExecutions(ctx context.Context, filter *ExecutionFilter) ([]*JobExecution, error)
- func (h *HistoryTracker) GetExecutionsByDateRange(ctx context.Context, after, before time.Time, limit int) ([]*JobExecution, error)
- func (h *HistoryTracker) GetExecutionsByStatus(ctx context.Context, status ExecutionStatus, limit int) ([]*JobExecution, error)
- func (h *HistoryTracker) GetFailedExecutions(ctx context.Context, limit int) ([]*JobExecution, error)
- func (h *HistoryTracker) GetJobExecutions(ctx context.Context, jobID string, limit int) ([]*JobExecution, error)
- func (h *HistoryTracker) GetJobStats(ctx context.Context, jobID string) (*JobStats, error)
- func (h *HistoryTracker) GetLastExecution(ctx context.Context, jobID string) (*JobExecution, error)
- func (h *HistoryTracker) GetLastSuccessfulExecution(ctx context.Context, jobID string) (*JobExecution, error)
- func (h *HistoryTracker) GetRecentExecutions(ctx context.Context, limit int) ([]*JobExecution, error)
- func (h *HistoryTracker) GetSuccessRate(ctx context.Context, jobID string) (float64, error)
- func (h *HistoryTracker) Start(ctx context.Context) error
- func (h *HistoryTracker) Stop(ctx context.Context) error
- type Job
- type JobConfig
- type JobExecution
- type JobHandler
- type JobLoader
- func (l *JobLoader) ExportToFile(ctx context.Context, jobs []*Job, filePath string) error
- func (l *JobLoader) LoadFromFile(ctx context.Context, filePath string) ([]*Job, error)
- func (l *JobLoader) LoadFromJSON(ctx context.Context, data []byte) ([]*Job, error)
- func (l *JobLoader) LoadFromYAML(ctx context.Context, data []byte) ([]*Job, error)
- func (l *JobLoader) ValidateJobConfig(config JobConfig) error
- type JobRegistry
- func (r *JobRegistry) Clear()
- func (r *JobRegistry) Count() int
- func (r *JobRegistry) Get(name string) (JobHandler, error)
- func (r *JobRegistry) Has(name string) bool
- func (r *JobRegistry) List() []string
- func (r *JobRegistry) MustRegister(name string, handler JobHandler)
- func (r *JobRegistry) Register(name string, handler JobHandler) error
- func (r *JobRegistry) RegisterBatch(handlers map[string]JobHandler) error
- func (r *JobRegistry) RegisterWithMiddleware(name string, handler JobHandler, middleware ...func(JobHandler) JobHandler) error
- func (r *JobRegistry) Unregister(name string) error
- func (r *JobRegistry) WrapHandler(name string, wrapper func(JobHandler) JobHandler) error
- type JobStats
- type JobUpdate
- type JobsConfig
- type MetricsCollector
- func (m *MetricsCollector) ExportMetrics(ctx context.Context) error
- func (m *MetricsCollector) GetAverageExecutionDuration() time.Duration
- func (m *MetricsCollector) GetAverageSchedulerLag() time.Duration
- func (m *MetricsCollector) GetExecutionsTotal() map[string]int64
- func (m *MetricsCollector) GetJobsTotal() int64
- func (m *MetricsCollector) GetLeaderStatus() int64
- func (m *MetricsCollector) GetMetricsSummary() map[string]interface{}
- func (m *MetricsCollector) GetQueueSize() int64
- func (m *MetricsCollector) RecordExecution(jobID, jobName string, status ExecutionStatus, duration time.Duration)
- func (m *MetricsCollector) RecordJobRegistered()
- func (m *MetricsCollector) RecordJobUnregistered()
- func (m *MetricsCollector) RecordLeaderStatus(isLeader bool)
- func (m *MetricsCollector) RecordQueueSize(size int64)
- func (m *MetricsCollector) RecordSchedulerLag(jobID string, lag time.Duration)
- func (m *MetricsCollector) Reset()
- type Scheduler
- type SchedulerMode
- type SchedulerStats
- type Storage
- type StorageType
Constants ¶
const (
// ErrCodeJobNotFound indicates a job with the given ID was not found
ErrCodeJobNotFound = "CRON_JOB_NOT_FOUND"
// ErrCodeJobAlreadyExists indicates a job with the given ID already exists
ErrCodeJobAlreadyExists = "CRON_JOB_ALREADY_EXISTS"
// ErrCodeInvalidSchedule indicates the cron expression is invalid
ErrCodeInvalidSchedule = "CRON_INVALID_SCHEDULE"
// ErrCodeInvalidConfig indicates the configuration is invalid
ErrCodeInvalidConfig = "CRON_INVALID_CONFIG"
// ErrCodeJobDisabled indicates an operation was attempted on a disabled job
ErrCodeJobDisabled = "CRON_JOB_DISABLED"
// ErrCodeHandlerNotFound indicates a handler with the given name was not registered
ErrCodeHandlerNotFound = "CRON_HANDLER_NOT_FOUND"
// ErrCodeExecutionNotFound indicates an execution with the given ID was not found
ErrCodeExecutionNotFound = "CRON_EXECUTION_NOT_FOUND"
// ErrCodeExecutionTimeout indicates a job execution exceeded its timeout
ErrCodeExecutionTimeout = "CRON_EXECUTION_TIMEOUT"
// ErrCodeExecutionCancelled indicates a job execution was cancelled
ErrCodeExecutionCancelled = "CRON_EXECUTION_CANCELLED"
// ErrCodeMaxRetriesExceeded indicates a job exceeded its maximum retry attempts
ErrCodeMaxRetriesExceeded = "CRON_MAX_RETRIES_EXCEEDED"
// ErrCodeSchedulerNotRunning indicates the scheduler is not running
ErrCodeSchedulerNotRunning = "CRON_SCHEDULER_NOT_RUNNING"
// ErrCodeNotLeader indicates the operation requires leader status in distributed mode
ErrCodeNotLeader = "CRON_NOT_LEADER"
// ErrCodeStorageError indicates a storage operation failed
ErrCodeStorageError = "CRON_STORAGE_ERROR"
// ErrCodeLockAcquisitionFailed indicates a failure to acquire a distributed lock
ErrCodeLockAcquisitionFailed = "CRON_LOCK_ACQUISITION_FAILED"
// ErrCodeJobRunning indicates the job is already running
ErrCodeJobRunning = "CRON_JOB_RUNNING"
// ErrCodeInvalidJobType indicates the job type (handler vs command) is invalid
ErrCodeInvalidJobType = "CRON_INVALID_JOB_TYPE"
)
Error codes for the cron extension.
Variables ¶
var (
// ErrJobNotFound is returned when a job is not found
ErrJobNotFound = errors.New("job not found")
// ErrJobAlreadyExists is returned when trying to create a job with an existing ID
ErrJobAlreadyExists = errors.New("job already exists")
// ErrInvalidSchedule is returned when a cron expression is invalid
ErrInvalidSchedule = errors.New("invalid cron schedule")
// ErrInvalidConfig is returned when the configuration is invalid
ErrInvalidConfig = errors.New("invalid configuration")
// ErrJobDisabled is returned when attempting to trigger a disabled job
ErrJobDisabled = errors.New("job is disabled")
// ErrHandlerNotFound is returned when a handler is not registered
ErrHandlerNotFound = errors.New("handler not found")
// ErrExecutionNotFound is returned when an execution is not found
ErrExecutionNotFound = errors.New("execution not found")
// ErrExecutionTimeout is returned when a job execution times out
ErrExecutionTimeout = errors.New("execution timeout")
// ErrExecutionCancelled is returned when a job execution is cancelled
ErrExecutionCancelled = errors.New("execution cancelled")
// ErrMaxRetriesExceeded is returned when max retries are exceeded
ErrMaxRetriesExceeded = errors.New("max retries exceeded")
// ErrSchedulerNotRunning is returned when the scheduler is not running
ErrSchedulerNotRunning = errors.New("scheduler not running")
// ErrNotLeader is returned when an operation requires leader status in distributed mode
ErrNotLeader = errors.New("not leader")
// ErrStorageError is returned when a storage operation fails
ErrStorageError = errors.New("storage error")
// ErrLockAcquisitionFailed is returned when a distributed lock could not be acquired
ErrLockAcquisitionFailed = errors.New("lock acquisition failed")
// ErrJobRunning is returned when trying to start an already running job
ErrJobRunning = errors.New("job is already running")
// ErrInvalidJobType is returned when job type is invalid
ErrInvalidJobType = errors.New("invalid job type: must have either handler or command")
// ErrInvalidData is returned when invalid data is received from storage
ErrInvalidData = errors.New("invalid data received from storage")
)
Functions ¶
func CreateLoggingMiddleware ¶
func CreateLoggingMiddleware(log func(ctx context.Context, job *Job, err error)) func(JobHandler) JobHandler
CreateLoggingMiddleware creates middleware that logs job execution.
func CreatePanicRecoveryMiddleware ¶
func CreatePanicRecoveryMiddleware(onPanic func(ctx context.Context, job *Job, recovered interface{})) func(JobHandler) JobHandler
CreatePanicRecoveryMiddleware creates middleware that recovers from panics.
func CreateTimeoutMiddleware ¶
func CreateTimeoutMiddleware() func(JobHandler) JobHandler
CreateTimeoutMiddleware creates middleware that enforces a timeout.
func NewExtension ¶
func NewExtension(opts ...ConfigOption) forge.Extension
NewExtension creates a new cron extension with functional options.
func NewExtensionWithConfig ¶
func NewExtensionWithConfig(config Config) forge.Extension
NewExtensionWithConfig creates a new cron extension with a complete config.
Types ¶
type Config ¶
type Config struct {
// Mode specifies the scheduler mode: "simple" or "distributed"
Mode string `json:"mode" mapstructure:"mode" yaml:"mode"`
// Storage backend: "memory", "database", "redis"
Storage string `json:"storage" mapstructure:"storage" yaml:"storage"`
// DatabaseConnection is the name of the database connection to use (when Storage="database")
DatabaseConnection string `json:"databaseConnection,omitempty" mapstructure:"database_connection" yaml:"database_connection,omitempty"`
// RedisConnection is the name of the Redis connection to use (when Storage="redis")
RedisConnection string `json:"redisConnection,omitempty" mapstructure:"redis_connection" yaml:"redis_connection,omitempty"`
// Scheduler settings
MaxConcurrentJobs int `json:"maxConcurrentJobs" mapstructure:"max_concurrent_jobs" yaml:"max_concurrent_jobs"`
DefaultTimeout time.Duration `json:"defaultTimeout" mapstructure:"default_timeout" yaml:"default_timeout"`
DefaultTimezone string `json:"defaultTimezone" mapstructure:"default_timezone" yaml:"default_timezone"`
// Retry policy
MaxRetries int `json:"maxRetries" mapstructure:"max_retries" yaml:"max_retries"`
RetryBackoff time.Duration `json:"retryBackoff" mapstructure:"retry_backoff" yaml:"retry_backoff"`
RetryMultiplier float64 `json:"retryMultiplier" mapstructure:"retry_multiplier" yaml:"retry_multiplier"`
MaxRetryBackoff time.Duration `json:"maxRetryBackoff" mapstructure:"max_retry_backoff" yaml:"max_retry_backoff"`
// History retention
HistoryRetentionDays int `json:"historyRetentionDays" mapstructure:"history_retention_days" yaml:"history_retention_days"`
MaxHistoryRecords int `json:"maxHistoryRecords" mapstructure:"max_history_records" yaml:"max_history_records"`
// Distributed mode settings
LeaderElection bool `json:"leaderElection" mapstructure:"leader_election" yaml:"leader_election"`
ConsensusExtension string `json:"consensusExtension,omitempty" mapstructure:"consensus_extension" yaml:"consensus_extension,omitempty"`
HeartbeatInterval time.Duration `json:"heartbeatInterval" mapstructure:"heartbeat_interval" yaml:"heartbeat_interval"`
LockTTL time.Duration `json:"lockTTL" mapstructure:"lock_ttl" yaml:"lock_ttl"`
// API settings
EnableAPI bool `json:"enableApi" mapstructure:"enable_api" yaml:"enable_api"`
APIPrefix string `json:"apiPrefix" mapstructure:"api_prefix" yaml:"api_prefix"`
EnableWebUI bool `json:"enableWebUi" mapstructure:"enable_web_ui" yaml:"enable_web_ui"`
// Monitoring
EnableMetrics bool `json:"enableMetrics" mapstructure:"enable_metrics" yaml:"enable_metrics"`
// Job loading
ConfigFile string `json:"configFile,omitempty" mapstructure:"config_file" yaml:"config_file,omitempty"`
// Graceful shutdown timeout
ShutdownTimeout time.Duration `json:"shutdownTimeout" mapstructure:"shutdown_timeout" yaml:"shutdown_timeout"`
// Node ID for distributed mode (auto-generated if empty)
NodeID string `json:"nodeId,omitempty" mapstructure:"node_id" yaml:"node_id,omitempty"`
// Config loading flags (not serialized)
RequireConfig bool `json:"-" mapstructure:"-" yaml:"-"`
}
Config contains configuration for the cron extension.
func (*Config) GetMode ¶ added in v0.7.5
func (c *Config) GetMode() SchedulerMode
GetMode returns the scheduler mode as a typed SchedulerMode.
func (*Config) GetStorage ¶ added in v0.7.5
func (c *Config) GetStorage() StorageType
GetStorage returns the storage type as a typed StorageType.
type ConfigOption ¶
type ConfigOption func(*Config)
ConfigOption is a functional option for Config.
func WithAPIPrefix ¶
func WithAPIPrefix(prefix string) ConfigOption
WithAPIPrefix sets the API prefix.
func WithConfigFile ¶
func WithConfigFile(path string) ConfigOption
WithConfigFile sets the jobs configuration file path.
func WithConsensusExtension ¶
func WithConsensusExtension(name string) ConfigOption
WithConsensusExtension sets the consensus extension name.
func WithDatabaseConnection ¶
func WithDatabaseConnection(name string) ConfigOption
WithDatabaseConnection sets the database connection name.
func WithDefaultTimeout ¶
func WithDefaultTimeout(timeout time.Duration) ConfigOption
WithDefaultTimeout sets the default job timeout.
func WithDefaultTimezone ¶
func WithDefaultTimezone(tz string) ConfigOption
WithDefaultTimezone sets the default timezone.
func WithHeartbeatInterval ¶
func WithHeartbeatInterval(interval time.Duration) ConfigOption
WithHeartbeatInterval sets the heartbeat interval for distributed mode.
func WithHistoryRetention ¶
func WithHistoryRetention(days int) ConfigOption
WithHistoryRetention sets the history retention in days.
func WithLeaderElection ¶
func WithLeaderElection(enable bool) ConfigOption
WithLeaderElection enables/disables leader election.
func WithLockTTL ¶
func WithLockTTL(ttl time.Duration) ConfigOption
WithLockTTL sets the lock TTL for distributed locking.
func WithMaxConcurrentJobs ¶
func WithMaxConcurrentJobs(max int) ConfigOption
WithMaxConcurrentJobs sets the maximum concurrent jobs.
func WithMaxHistoryRecords ¶
func WithMaxHistoryRecords(max int) ConfigOption
WithMaxHistoryRecords sets the maximum history records to keep.
func WithMaxRetries ¶
func WithMaxRetries(max int) ConfigOption
WithMaxRetries sets the maximum retry attempts.
func WithMaxRetryBackoff ¶
func WithMaxRetryBackoff(max time.Duration) ConfigOption
WithMaxRetryBackoff sets the maximum retry backoff duration.
func WithMetrics ¶
func WithMetrics(enable bool) ConfigOption
WithMetrics enables/disables metrics collection.
func WithMode ¶
func WithMode(mode SchedulerMode) ConfigOption
WithMode sets the scheduler mode using a SchedulerMode constant.
func WithModeString ¶ added in v0.7.5
func WithModeString(mode string) ConfigOption
WithModeString sets the scheduler mode using a string value. Prefer WithMode with typed constants for better type safety.
func WithNodeID ¶
func WithNodeID(id string) ConfigOption
WithNodeID sets the node ID for distributed mode.
func WithRedisConnection ¶
func WithRedisConnection(name string) ConfigOption
WithRedisConnection sets the Redis connection name.
func WithRequireConfig ¶
func WithRequireConfig(require bool) ConfigOption
WithRequireConfig sets whether configuration from the ConfigManager is required.
func WithRetryBackoff ¶
func WithRetryBackoff(backoff time.Duration) ConfigOption
WithRetryBackoff sets the retry backoff duration.
func WithRetryMultiplier ¶
func WithRetryMultiplier(multiplier float64) ConfigOption
WithRetryMultiplier sets the retry multiplier for exponential backoff.
func WithShutdownTimeout ¶
func WithShutdownTimeout(timeout time.Duration) ConfigOption
WithShutdownTimeout sets the graceful shutdown timeout.
func WithStorage ¶
func WithStorage(storage StorageType) ConfigOption
WithStorage sets the storage backend using a StorageType constant.
func WithStorageString ¶ added in v0.7.5
func WithStorageString(storage string) ConfigOption
WithStorageString sets the storage backend using a string value. Prefer WithStorage with typed constants for better type safety.
type ExecutionFilter ¶
type ExecutionFilter struct {
JobID string `json:"jobId,omitempty"`
Status []ExecutionStatus `json:"status,omitempty"`
StartedAt *time.Time `json:"startedAt,omitempty"`
Before *time.Time `json:"before,omitempty"`
After *time.Time `json:"after,omitempty"`
NodeID string `json:"nodeId,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
OrderBy string `json:"orderBy,omitempty"` // Field to order by
OrderDir string `json:"orderDir,omitempty"` // "asc" or "desc"
}
ExecutionFilter is used to filter job executions when querying.
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus represents the status of a job execution.
const (
// ExecutionStatusPending indicates the execution is queued but not started
ExecutionStatusPending ExecutionStatus = "pending"
// ExecutionStatusRunning indicates the execution is in progress
ExecutionStatusRunning ExecutionStatus = "running"
// ExecutionStatusSuccess indicates the execution completed successfully
ExecutionStatusSuccess ExecutionStatus = "success"
// ExecutionStatusFailed indicates the execution failed
ExecutionStatusFailed ExecutionStatus = "failed"
// ExecutionStatusCancelled indicates the execution was cancelled
ExecutionStatusCancelled ExecutionStatus = "cancelled"
// ExecutionStatusTimeout indicates the execution exceeded the timeout
ExecutionStatusTimeout ExecutionStatus = "timeout"
// ExecutionStatusRetrying indicates the execution is being retried
ExecutionStatusRetrying ExecutionStatus = "retrying"
)
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor handles job execution with concurrency control, timeouts, and retries.
func NewExecutor ¶
func NewExecutor(config Config, storage Storage, registry *JobRegistry, logger forge.Logger, metrics forge.Metrics, nodeID string) *Executor
NewExecutor creates a new job executor.
func (*Executor) Execute ¶
Execute executes a job asynchronously. Returns the execution ID and any immediate errors.
func (*Executor) GetRunningCount ¶
GetRunningCount returns the number of currently running jobs.
func (*Executor) GetRunningJobs ¶
GetRunningJobs returns IDs of all currently running jobs.
func (*Executor) IsJobRunning ¶
IsJobRunning checks if a job is currently running.
type Extension ¶
type Extension struct {
*forge.BaseExtension
// contains filtered or unexported fields
}
Extension implements forge.Extension and forge.RunnableExtension for cron functionality.
func (*Extension) Dependencies ¶
Dependencies returns the names of extensions this extension depends on.
func (*Extension) GetExecutor ¶
GetExecutor returns the executor instance.
func (*Extension) GetRegistry ¶
func (e *Extension) GetRegistry() *JobRegistry
GetRegistry returns the job registry. This is useful for registering job handlers.
func (*Extension) GetScheduler ¶
GetScheduler returns the scheduler instance. This is useful for registering jobs programmatically.
func (*Extension) GetStats ¶
func (e *Extension) GetStats(ctx context.Context) (*SchedulerStats, error)
GetStats returns scheduler statistics.
func (*Extension) GetStorage ¶
GetStorage returns the storage instance.
func (*Extension) Run ¶
Run implements forge.RunnableExtension. This is called after the app starts to run any long-running processes.
func (*Extension) Shutdown ¶
Shutdown implements forge.RunnableExtension. This is called before Stop to gracefully shut down long-running processes.
func (*Extension) TriggerJob ¶
TriggerJob manually triggers a job execution.
type HistoryTracker ¶
type HistoryTracker struct {
// contains filtered or unexported fields
}
HistoryTracker manages job execution history with automatic cleanup.
func NewHistoryTracker ¶
func NewHistoryTracker(config Config, storage Storage, logger forge.Logger) *HistoryTracker
NewHistoryTracker creates a new history tracker.
func (*HistoryTracker) DeleteJobExecutions ¶
DeleteJobExecutions deletes all executions for a specific job.
func (*HistoryTracker) DeleteOldExecutions ¶
DeleteOldExecutions manually triggers cleanup of old executions.
func (*HistoryTracker) GetAllJobStats ¶
GetAllJobStats retrieves statistics for all jobs.
func (*HistoryTracker) GetAverageDuration ¶
func (h *HistoryTracker) GetAverageDuration(ctx context.Context, jobID string) (time.Duration, error)
GetAverageDuration calculates the average execution duration for a job.
func (*HistoryTracker) GetExecution ¶
func (h *HistoryTracker) GetExecution(ctx context.Context, executionID string) (*JobExecution, error)
GetExecution retrieves a single execution by ID.
func (*HistoryTracker) GetExecutionCount ¶
func (h *HistoryTracker) GetExecutionCount(ctx context.Context, filter *ExecutionFilter) (int64, error)
GetExecutionCount returns the total number of executions matching the filter.
func (*HistoryTracker) GetExecutionTrend ¶
func (h *HistoryTracker) GetExecutionTrend(ctx context.Context, jobID string, duration time.Duration) (map[string]int, error)
GetExecutionTrend calculates execution trends over time.
func (*HistoryTracker) GetExecutions ¶
func (h *HistoryTracker) GetExecutions(ctx context.Context, filter *ExecutionFilter) ([]*JobExecution, error)
GetExecutions retrieves execution history with filtering.
func (*HistoryTracker) GetExecutionsByDateRange ¶
func (h *HistoryTracker) GetExecutionsByDateRange(ctx context.Context, after, before time.Time, limit int) ([]*JobExecution, error)
GetExecutionsByDateRange retrieves executions within a date range.
func (*HistoryTracker) GetExecutionsByStatus ¶
func (h *HistoryTracker) GetExecutionsByStatus(ctx context.Context, status ExecutionStatus, limit int) ([]*JobExecution, error)
GetExecutionsByStatus retrieves executions with a specific status.
func (*HistoryTracker) GetFailedExecutions ¶
func (h *HistoryTracker) GetFailedExecutions(ctx context.Context, limit int) ([]*JobExecution, error)
GetFailedExecutions retrieves failed executions.
func (*HistoryTracker) GetJobExecutions ¶
func (h *HistoryTracker) GetJobExecutions(ctx context.Context, jobID string, limit int) ([]*JobExecution, error)
GetJobExecutions retrieves executions for a specific job.
func (*HistoryTracker) GetJobStats ¶
GetJobStats retrieves aggregated statistics for a job. Results are cached for 1 minute to reduce storage queries.
func (*HistoryTracker) GetLastExecution ¶
func (h *HistoryTracker) GetLastExecution(ctx context.Context, jobID string) (*JobExecution, error)
GetLastExecution retrieves the most recent execution for a job.
func (*HistoryTracker) GetLastSuccessfulExecution ¶
func (h *HistoryTracker) GetLastSuccessfulExecution(ctx context.Context, jobID string) (*JobExecution, error)
GetLastSuccessfulExecution retrieves the most recent successful execution for a job.
func (*HistoryTracker) GetRecentExecutions ¶
func (h *HistoryTracker) GetRecentExecutions(ctx context.Context, limit int) ([]*JobExecution, error)
GetRecentExecutions retrieves the most recent executions across all jobs.
func (*HistoryTracker) GetSuccessRate ¶
GetSuccessRate calculates the success rate for a job.
type Job ¶
type Job struct {
// ID is the unique identifier for this job
ID string `json:"id" bson:"_id,omitempty"`
// Name is a human-readable name for the job
Name string `json:"name" bson:"name"`
// Schedule is the cron expression defining when the job runs
Schedule string `json:"schedule" bson:"schedule"`
// Handler is the function to execute (for code-based jobs)
// This field is not persisted to storage
Handler JobHandler `json:"-" bson:"-"`
// HandlerName is the name of the registered handler (for code-based jobs)
HandlerName string `json:"handlerName,omitempty" bson:"handler_name,omitempty"`
// Command is the shell command to execute (for command-based jobs)
Command string `json:"command,omitempty" bson:"command,omitempty"`
// Args are arguments for the command
Args []string `json:"args,omitempty" bson:"args,omitempty"`
// Env are environment variables for the command
Env []string `json:"env,omitempty" bson:"env,omitempty"`
// WorkingDir is the working directory for command execution
WorkingDir string `json:"workingDir,omitempty" bson:"working_dir,omitempty"`
// Payload is arbitrary data passed to the job handler
Payload map[string]interface{} `json:"payload,omitempty" bson:"payload,omitempty"`
// Enabled indicates whether the job is active
Enabled bool `json:"enabled" bson:"enabled"`
// Timezone specifies the timezone for schedule evaluation
Timezone *time.Location `json:"timezone,omitempty" bson:"timezone,omitempty"`
// MaxRetries is the maximum number of retry attempts on failure
MaxRetries int `json:"maxRetries" bson:"max_retries"`
// Timeout is the maximum execution time for the job
Timeout time.Duration `json:"timeout" bson:"timeout"`
// Metadata contains arbitrary key-value metadata
Metadata map[string]string `json:"metadata,omitempty" bson:"metadata,omitempty"`
// Tags for categorizing jobs
Tags []string `json:"tags,omitempty" bson:"tags,omitempty"`
// CreatedAt is when the job was created
CreatedAt time.Time `json:"createdAt" bson:"created_at"`
// UpdatedAt is when the job was last updated
UpdatedAt time.Time `json:"updatedAt" bson:"updated_at"`
// LastExecutionAt is when the job last executed
LastExecutionAt *time.Time `json:"lastExecutionAt,omitempty" bson:"last_execution_at,omitempty"`
// NextExecutionAt is when the job will next execute
NextExecutionAt *time.Time `json:"nextExecutionAt,omitempty" bson:"next_execution_at,omitempty"`
}
Job represents a scheduled job.
type JobConfig ¶
type JobConfig struct {
ID string `json:"id" yaml:"id"`
Name string `json:"name" yaml:"name"`
Schedule string `json:"schedule" yaml:"schedule"`
HandlerName string `json:"handlerName,omitempty" yaml:"handlerName,omitempty"`
Handler string `json:"handler,omitempty" yaml:"handler,omitempty"` // Alias for HandlerName
Command string `json:"command,omitempty" yaml:"command,omitempty"`
Args []string `json:"args,omitempty" yaml:"args,omitempty"`
Env []string `json:"env,omitempty" yaml:"env,omitempty"`
WorkingDir string `json:"workingDir,omitempty" yaml:"workingDir,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty" yaml:"payload,omitempty"`
Enabled bool `json:"enabled" yaml:"enabled"`
Timezone string `json:"timezone,omitempty" yaml:"timezone,omitempty"`
MaxRetries int `json:"maxRetries" yaml:"maxRetries"`
Timeout string `json:"timeout" yaml:"timeout"` // Duration string
Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata,omitempty"`
Tags []string `json:"tags,omitempty" yaml:"tags,omitempty"`
}
JobConfig represents a job configuration from YAML/JSON.
type JobExecution ¶
type JobExecution struct {
// ID is the unique identifier for this execution
ID string `json:"id" bson:"_id,omitempty"`
// JobID is the ID of the job being executed
JobID string `json:"jobId" bson:"job_id"`
// JobName is a denormalized copy of the job name for convenience
JobName string `json:"jobName" bson:"job_name"`
// Status is the current status of the execution
Status ExecutionStatus `json:"status" bson:"status"`
// ScheduledAt is when the job was scheduled to run
ScheduledAt time.Time `json:"scheduledAt" bson:"scheduled_at"`
// StartedAt is when the execution actually started
StartedAt time.Time `json:"startedAt" bson:"started_at"`
// CompletedAt is when the execution finished (success or failure)
CompletedAt *time.Time `json:"completedAt,omitempty" bson:"completed_at,omitempty"`
// Error contains the error message if the execution failed
Error string `json:"error,omitempty" bson:"error,omitempty"`
// Output contains stdout/stderr from command execution
Output string `json:"output,omitempty" bson:"output,omitempty"`
// Retries is the number of retries attempted
Retries int `json:"retries" bson:"retries"`
// NodeID identifies which node executed the job (for distributed mode)
NodeID string `json:"nodeId,omitempty" bson:"node_id,omitempty"`
// Duration is how long the execution took
Duration time.Duration `json:"duration" bson:"duration"`
// Metadata contains arbitrary execution metadata
Metadata map[string]string `json:"metadata,omitempty" bson:"metadata,omitempty"`
}
JobExecution represents a single execution of a job.
type JobHandler ¶
JobHandler is the function signature for code-based jobs. The handler receives a context (for cancellation) and the job definition. Return an error if the job fails.
type JobLoader ¶
type JobLoader struct {
// contains filtered or unexported fields
}
JobLoader loads jobs from configuration files.
func NewJobLoader ¶
func NewJobLoader(logger forge.Logger, registry *JobRegistry) *JobLoader
NewJobLoader creates a new job loader.
func (*JobLoader) ExportToFile ¶
ExportToFile exports jobs to a YAML or JSON file.
func (*JobLoader) LoadFromFile ¶
LoadFromFile loads jobs from a YAML or JSON file.
func (*JobLoader) LoadFromJSON ¶
LoadFromJSON loads jobs from JSON data.
func (*JobLoader) LoadFromYAML ¶
LoadFromYAML loads jobs from YAML data.
func (*JobLoader) ValidateJobConfig ¶
ValidateJobConfig validates a job configuration.
type JobRegistry ¶
type JobRegistry struct {
// contains filtered or unexported fields
}
JobRegistry manages registration of job handlers for code-based jobs. It allows you to register named handlers that can be referenced by name when creating jobs programmatically or from configuration.
func (*JobRegistry) Count ¶
func (r *JobRegistry) Count() int
Count returns the number of registered handlers.
func (*JobRegistry) Get ¶
func (r *JobRegistry) Get(name string) (JobHandler, error)
Get retrieves a job handler by name.
func (*JobRegistry) Has ¶
func (r *JobRegistry) Has(name string) bool
Has checks if a handler is registered.
func (*JobRegistry) List ¶
func (r *JobRegistry) List() []string
List returns all registered handler names.
func (*JobRegistry) MustRegister ¶
func (r *JobRegistry) MustRegister(name string, handler JobHandler)
MustRegister registers a job handler and panics on error. Useful for registration at startup where failure should be fatal.
func (*JobRegistry) Register ¶
func (r *JobRegistry) Register(name string, handler JobHandler) error
Register registers a job handler with the given name. The handler can then be referenced when creating jobs.
Example:
registry.Register("sendEmail", func(ctx context.Context, job *Job) error {
// Send email logic
return nil
})
func (*JobRegistry) RegisterBatch ¶
func (r *JobRegistry) RegisterBatch(handlers map[string]JobHandler) error
RegisterBatch registers multiple handlers at once. If any registration fails, all successful registrations are rolled back.
func (*JobRegistry) RegisterWithMiddleware ¶
func (r *JobRegistry) RegisterWithMiddleware(name string, handler JobHandler, middleware ...func(JobHandler) JobHandler) error
RegisterWithMiddleware registers a handler with middleware applied.
func (*JobRegistry) Unregister ¶
func (r *JobRegistry) Unregister(name string) error
Unregister removes a job handler.
func (*JobRegistry) WrapHandler ¶
func (r *JobRegistry) WrapHandler(name string, wrapper func(JobHandler) JobHandler) error
WrapHandler wraps a handler with additional functionality (middleware pattern). This is useful for adding logging, metrics, error handling, etc.
Example:
err := registry.WrapHandler("myHandler", func(next JobHandler) JobHandler {
	return func(ctx context.Context, job *Job) error {
		start := time.Now()
		defer func() {
			logger.Info("job completed", "duration", time.Since(start))
		}()
		return next(ctx, job)
	}
})
type JobStats ¶
type JobStats struct {
JobID string `json:"jobId"`
JobName string `json:"jobName"`
TotalExecutions int64 `json:"totalExecutions"`
SuccessCount int64 `json:"successCount"`
FailureCount int64 `json:"failureCount"`
CancelCount int64 `json:"cancelCount"`
TimeoutCount int64 `json:"timeoutCount"`
AverageDuration time.Duration `json:"averageDuration"`
MinDuration time.Duration `json:"minDuration"`
MaxDuration time.Duration `json:"maxDuration"`
LastSuccess *time.Time `json:"lastSuccess,omitempty"`
LastFailure *time.Time `json:"lastFailure,omitempty"`
LastExecution *time.Time `json:"lastExecution,omitempty"`
NextExecution *time.Time `json:"nextExecution,omitempty"`
SuccessRate float64 `json:"successRate"` // Percentage
RecentExecutions int `json:"recentExecutions"`
}
JobStats contains aggregated statistics for a job.
type JobUpdate ¶
type JobUpdate struct {
Name *string `json:"name,omitempty"`
Schedule *string `json:"schedule,omitempty"`
HandlerName *string `json:"handlerName,omitempty"`
Command *string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
Env []string `json:"env,omitempty"`
WorkingDir *string `json:"workingDir,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
MaxRetries *int `json:"maxRetries,omitempty"`
Timeout *time.Duration `json:"timeout,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
Tags []string `json:"tags,omitempty"`
}
JobUpdate represents fields that can be updated on a job.
type JobsConfig ¶
type JobsConfig struct {
Jobs []JobConfig `json:"jobs" yaml:"jobs"`
}
JobsConfig represents a jobs configuration file.
type MetricsCollector ¶
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector collects and exposes metrics for the cron extension.
func NewMetricsCollector ¶
func NewMetricsCollector(metrics forge.Metrics, logger forge.Logger) *MetricsCollector
NewMetricsCollector creates a new metrics collector.
func (*MetricsCollector) ExportMetrics ¶
func (m *MetricsCollector) ExportMetrics(ctx context.Context) error
ExportMetrics exports metrics to the provided metrics system. This can be called periodically to update gauge metrics.
func (*MetricsCollector) GetAverageExecutionDuration ¶
func (m *MetricsCollector) GetAverageExecutionDuration() time.Duration
GetAverageExecutionDuration returns the average execution duration.
func (*MetricsCollector) GetAverageSchedulerLag ¶
func (m *MetricsCollector) GetAverageSchedulerLag() time.Duration
GetAverageSchedulerLag returns the average scheduler lag.
func (*MetricsCollector) GetExecutionsTotal ¶
func (m *MetricsCollector) GetExecutionsTotal() map[string]int64
GetExecutionsTotal returns the total executions by status.
func (*MetricsCollector) GetJobsTotal ¶
func (m *MetricsCollector) GetJobsTotal() int64
GetJobsTotal returns the total number of registered jobs.
func (*MetricsCollector) GetLeaderStatus ¶
func (m *MetricsCollector) GetLeaderStatus() int64
GetLeaderStatus returns the current leader status.
func (*MetricsCollector) GetMetricsSummary ¶
func (m *MetricsCollector) GetMetricsSummary() map[string]interface{}
GetMetricsSummary returns a summary of all metrics.
func (*MetricsCollector) GetQueueSize ¶
func (m *MetricsCollector) GetQueueSize() int64
GetQueueSize returns the current executor queue size.
func (*MetricsCollector) RecordExecution ¶
func (m *MetricsCollector) RecordExecution(jobID, jobName string, status ExecutionStatus, duration time.Duration)
RecordExecution records a job execution completion.
func (*MetricsCollector) RecordJobRegistered ¶
func (m *MetricsCollector) RecordJobRegistered()
RecordJobRegistered records that a job was registered.
func (*MetricsCollector) RecordJobUnregistered ¶
func (m *MetricsCollector) RecordJobUnregistered()
RecordJobUnregistered records that a job was unregistered.
func (*MetricsCollector) RecordLeaderStatus ¶
func (m *MetricsCollector) RecordLeaderStatus(isLeader bool)
RecordLeaderStatus records the leader status (0 = follower, 1 = leader).
func (*MetricsCollector) RecordQueueSize ¶
func (m *MetricsCollector) RecordQueueSize(size int64)
RecordQueueSize records the current executor queue size.
func (*MetricsCollector) RecordSchedulerLag ¶
func (m *MetricsCollector) RecordSchedulerLag(jobID string, lag time.Duration)
RecordSchedulerLag records the lag between scheduled and actual execution time.
type Scheduler ¶
Scheduler is the interface for job schedulers. This is re-exported from core to avoid import cycles. Scheduler implementations should be in the scheduler package.
type SchedulerMode ¶ added in v0.7.5
type SchedulerMode string
SchedulerMode represents the scheduler mode.
const (
	// ModeSimple is a single-node scheduler mode.
	ModeSimple SchedulerMode = "simple"
	// ModeDistributed is a multi-node scheduler mode with leader election.
	ModeDistributed SchedulerMode = "distributed"
)
func (SchedulerMode) IsValid ¶ added in v0.7.5
func (m SchedulerMode) IsValid() bool
IsValid checks if the scheduler mode is valid.
func (SchedulerMode) String ¶ added in v0.7.5
func (m SchedulerMode) String() string
String returns the string representation of the scheduler mode.
type SchedulerStats ¶
type SchedulerStats struct {
TotalJobs int `json:"totalJobs"`
EnabledJobs int `json:"enabledJobs"`
DisabledJobs int `json:"disabledJobs"`
RunningExecutions int `json:"runningExecutions"`
QueuedExecutions int `json:"queuedExecutions"`
IsLeader bool `json:"isLeader"`
NodeID string `json:"nodeId"`
Uptime time.Duration `json:"uptime"`
JobStats map[string]int `json:"jobStats"` // Status counts
}
SchedulerStats contains overall scheduler statistics.
type Storage ¶
Storage is the interface for job and execution persistence. This is re-exported from core to avoid import cycles. Storage implementations should be in the storage package.
type StorageType ¶ added in v0.7.5
type StorageType string
StorageType represents the storage backend type.
const (
	// StorageMemory uses in-memory storage (non-persistent, for development/testing).
	StorageMemory StorageType = "memory"
	// StorageDatabase uses database storage (requires database extension).
	StorageDatabase StorageType = "database"
	// StorageRedis uses Redis storage (required for distributed mode).
	StorageRedis StorageType = "redis"
)
func (StorageType) IsValid ¶ added in v0.7.5
func (s StorageType) IsValid() bool
IsValid checks if the storage type is valid.
func (StorageType) String ¶ added in v0.7.5
func (s StorageType) String() string
String returns the string representation of the storage type.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| core | Package core contains shared interfaces to avoid import cycles. |
| register | Package register imports scheduler and storage implementations to register them. |