Documentation
¶
Index ¶
- Variables
- type BasicScheduleCodec
- type CronSchedule
- type CronSpec
- type ExecutionRecord
- type ExecutionRecordModel
- type GormStorage
- func (s *GormStorage) Close(ctx context.Context) error
- func (s *GormStorage) DeleteExecutions(ctx context.Context, jobID string, before time.Time) error
- func (s *GormStorage) DeleteJob(ctx context.Context, jobID string) error
- func (s *GormStorage) GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)
- func (s *GormStorage) GetJob(ctx context.Context, jobID string) (*JobData, error)
- func (s *GormStorage) HealthCheck(ctx context.Context) error
- func (s *GormStorage) Initialize(ctx context.Context) error
- func (s *GormStorage) ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)
- func (s *GormStorage) ListJobs(ctx context.Context) ([]*JobData, error)
- func (s *GormStorage) ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)
- func (s *GormStorage) SaveExecution(ctx context.Context, record *ExecutionRecord) error
- func (s *GormStorage) SaveJob(ctx context.Context, job *JobData) error
- func (s *GormStorage) UpdateJob(ctx context.Context, job *JobData) error
- type IntervalSchedule
- type Job
- type JobData
- type JobDataModel
- type JobEvent
- func (e JobEvent) Delay() time.Duration
- func (e JobEvent) ID() string
- func (e JobEvent) Job() Job
- func (e JobEvent) LastCompletedAt() time.Time
- func (e JobEvent) Metadata() map[string]string
- func (e JobEvent) Name() string
- func (e JobEvent) Schedule() Schedule
- func (e JobEvent) ScheduledAt() time.Time
- func (e JobEvent) StartedAt() time.Time
- type JobHandler
- type JobStatus
- type MemoryStorage
- func (s *MemoryStorage) Close(ctx context.Context) error
- func (s *MemoryStorage) DeleteExecutions(ctx context.Context, jobID string, before time.Time) error
- func (s *MemoryStorage) DeleteJob(ctx context.Context, jobID string) error
- func (s *MemoryStorage) GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)
- func (s *MemoryStorage) GetJob(ctx context.Context, jobID string) (*JobData, error)
- func (s *MemoryStorage) HealthCheck(ctx context.Context) error
- func (s *MemoryStorage) Initialize(ctx context.Context) error
- func (s *MemoryStorage) ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)
- func (s *MemoryStorage) ListJobs(ctx context.Context) ([]*JobData, error)
- func (s *MemoryStorage) ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)
- func (s *MemoryStorage) SaveExecution(ctx context.Context, record *ExecutionRecord) error
- func (s *MemoryStorage) SaveJob(ctx context.Context, job *JobData) error
- func (s *MemoryStorage) UpdateJob(ctx context.Context, job *JobData) error
- type MigrateResult
- type NATSSchedulerLogger
- type NATSSchedulerOption
- func WithAddJobRetryBudget(budget time.Duration) NATSSchedulerOption
- func WithJetStreamReadyTimeout(timeout time.Duration) NATSSchedulerOption
- func WithLoadJobsAsyncPublishTimeout(timeout time.Duration) NATSSchedulerOption
- func WithLoadJobsConcurrency(n int) NATSSchedulerOption
- func WithNATSConsumerName(name string) NATSSchedulerOption
- func WithNATSSchedulerCodec(codec ScheduleCodec) NATSSchedulerOption
- func WithNATSSchedulerExecBucket(name string) NATSSchedulerOption
- func WithNATSSchedulerJobBucket(name string) NATSSchedulerOption
- func WithNATSSchedulerLogger(logger NATSSchedulerLogger) NATSSchedulerOption
- func WithNATSStreamName(name string) NATSSchedulerOption
- func WithNATSSubjectPrefix(prefix string) NATSSchedulerOption
- func WithOnReschedulingFailed(fn RescheduleFailureFunc) NATSSchedulerOption
- func WithOnce(fn OnceFunc) NATSSchedulerOption
- func WithOnceKey(key string) NATSSchedulerOption
- func WithOnceLockBucket(name string) NATSSchedulerOption
- func WithPublishRetry(attempts int, initialBackoff time.Duration) NATSSchedulerOption
- func WithReconcilerGracePeriod(gracePeriod time.Duration) NATSSchedulerOption
- func WithReconcilerInterval(interval time.Duration) NATSSchedulerOption
- func WithStartPhaseTimeout(timeout time.Duration) NATSSchedulerOption
- func WithStartupStreamReadyTimeout(timeout time.Duration) NATSSchedulerOption
- func WithStreamDuplicatesWindow(window time.Duration) NATSSchedulerOption
- type NATSStorage
- func (s *NATSStorage) Close(ctx context.Context) error
- func (s *NATSStorage) DeleteExecutions(ctx context.Context, jobID string, before time.Time) error
- func (s *NATSStorage) DeleteJob(ctx context.Context, jobID string) error
- func (s *NATSStorage) GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)
- func (s *NATSStorage) GetJob(ctx context.Context, jobID string) (*JobData, error)
- func (s *NATSStorage) HealthCheck(ctx context.Context) error
- func (s *NATSStorage) Initialize(ctx context.Context) error
- func (s *NATSStorage) ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)
- func (s *NATSStorage) ListJobs(ctx context.Context) ([]*JobData, error)
- func (s *NATSStorage) ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)
- func (s *NATSStorage) SaveExecution(ctx context.Context, record *ExecutionRecord) error
- func (s *NATSStorage) SaveJob(ctx context.Context, job *JobData) error
- func (s *NATSStorage) UpdateJob(ctx context.Context, job *JobData) error
- type NATSStorageOption
- func WithNATSStorageExecBucket(name string) NATSStorageOption
- func WithNATSStorageJetStreamReadyTimeout(timeout time.Duration) NATSStorageOption
- func WithNATSStorageJobBucket(name string) NATSStorageOption
- func WithNATSStorageOnce(fn OnceFunc) NATSStorageOption
- func WithNATSStorageOnceKey(key string) NATSStorageOption
- func WithNATSStorageOnceLockBucket(name string) NATSStorageOption
- type OnceFunc
- type OnceSchedule
- type QueryOptions
- type RescheduleFailureFunc
- type Schedule
- type ScheduleBuilder
- type ScheduleCodec
- type Scheduler
- type StartAtIntervalSchedule
- type Storage
Examples ¶
Constants ¶
This section is empty.
Variables ¶
// Pre-defined scheduler errors.
var (
	// ErrSchedulerNotStarted indicates the scheduler has not been started
	ErrSchedulerNotStarted = errors.New("scheduler not started")

	// ErrSchedulerAlreadyStarted indicates the scheduler is already running
	ErrSchedulerAlreadyStarted = errors.New("scheduler already started")

	// ErrSchedulerStopped indicates the scheduler has been stopped
	ErrSchedulerStopped = errors.New("scheduler stopped")

	// ErrInvalidInterval indicates an invalid schedule interval
	ErrInvalidInterval = errors.New("invalid schedule interval")

	// ErrInvalidCronExpression indicates an invalid cron expression
	ErrInvalidCronExpression = errors.New("invalid cron expression")

	// ErrInvalidScheduleTime indicates an invalid schedule time
	ErrInvalidScheduleTime = errors.New("invalid schedule time")

	// ErrInvalidScheduleConfig indicates an invalid serialized schedule configuration
	ErrInvalidScheduleConfig = errors.New("invalid schedule configuration")

	// ErrJobNotFound indicates the specified job does not exist
	ErrJobNotFound = errors.New("job not found")

	// ErrJobAlreadyExists indicates a job with the same ID already exists
	ErrJobAlreadyExists = errors.New("job already exists")

	// ErrEmptyJobID indicates the job ID cannot be empty
	ErrEmptyJobID = errors.New("job ID cannot be empty")

	// ErrHandlerNotDefined indicates the scheduler was created without a job handler
	ErrHandlerNotDefined = errors.New("scheduler handler not defined")
)
Pre-defined errors
// Pre-defined storage errors.
var (
	// ErrStorageNotInitialized indicates the storage has not been initialized
	ErrStorageNotInitialized = errors.New("storage not initialized")

	// ErrStorageConnectionFailed indicates a failure to connect to storage
	ErrStorageConnectionFailed = errors.New("storage connection failed")

	// ErrStorageOperationFailed indicates a storage operation failed
	ErrStorageOperationFailed = errors.New("storage operation failed")

	// ErrJobDataNotFound indicates the job data does not exist in storage
	ErrJobDataNotFound = errors.New("job data not found")

	// ErrJobDataAlreadyExists indicates the job data already exists in storage
	ErrJobDataAlreadyExists = errors.New("job data already exists")

	// ErrInvalidJobData indicates the job data is invalid or corrupted
	ErrInvalidJobData = errors.New("invalid job data")

	// ErrExecutionHistoryNotFound indicates no execution history was found
	ErrExecutionHistoryNotFound = errors.New("execution history not found")
)
Pre-defined storage errors
var ErrNATSServerTooOld = errors.New("NATS server does not support scheduled message delivery (requires 2.12+)")
ErrNATSServerTooOld indicates the connected NATS server does not support scheduled message delivery (requires NATS 2.12+).
Functions ¶
This section is empty.
Types ¶
type BasicScheduleCodec ¶
type BasicScheduleCodec struct{}
BasicScheduleCodec provides a schedule codec for the built-in schedules.
func NewBasicScheduleCodec ¶
func NewBasicScheduleCodec() *BasicScheduleCodec
NewBasicScheduleCodec creates a new BasicScheduleCodec instance.
type CronSchedule ¶ added in v0.0.2
type CronSchedule struct {
// contains filtered or unexported fields
}
CronSchedule runs a job based on a cron expression. It supports the standard five-field cron format: minute hour day-of-month month weekday. Examples:
- "0 10 * * 5" - Every Friday at 10:00 AM
- "30 14 * * *" - Every day at 2:30 PM
- "0 0 1 * *" - First day of every month at midnight
- "*/5 * * * *" - Every 5 minutes
func NewCronSchedule ¶ added in v0.0.2
func NewCronSchedule(expression string) (*CronSchedule, error)
NewCronSchedule creates a new CronSchedule from a cron expression. The expression follows standard cron format: minute hour day month weekday
func NewCronScheduleFromSpec ¶ added in v0.0.2
func NewCronScheduleFromSpec(spec *CronSpec) (*CronSchedule, error)
NewCronScheduleFromSpec creates a new CronSchedule from a CronSpec. This provides a more structured way to define cron schedules without using string expressions. Empty fields default to "*" (any value).
Examples:
- Every Friday at 10:00 AM: NewCronScheduleFromSpec(&CronSpec{Minute: "0", Hour: "10", DayOfWeek: "5"})
- Every day at 2:30 PM: NewCronScheduleFromSpec(&CronSpec{Minute: "30", Hour: "14"})
- Every 5 minutes: NewCronScheduleFromSpec(&CronSpec{Minute: "*/5"})
func (*CronSchedule) Expression ¶ added in v0.0.2
func (s *CronSchedule) Expression() string
Expression returns the cron expression string.
type CronSpec ¶ added in v0.0.2
// CronSpec describes a cron schedule one field at a time. Every field is a
// string so it can carry a single number, the wildcard "*", a range ("1-5"),
// a list ("1,3,5") or a step ("*/5"). NewCronScheduleFromSpec treats an empty
// field as "*".
type CronSpec struct {
// Minute (0-59), default "*" (every minute)
Minute string
// Hour (0-23), default "*" (every hour)
Hour string
// DayOfMonth (1-31), default "*" (every day of the month)
DayOfMonth string
// Month (1-12), default "*" (every month)
Month string
// DayOfWeek (0-6, Sunday=0, so "5" is Friday), default "*" (every day of the week)
DayOfWeek string
}
CronSpec defines a cron schedule using structured fields instead of a string expression. This provides a more type-safe and intuitive way to define cron schedules.
Field values can be:
- Specific number: e.g., "5" for day of week (Friday)
- Wildcard: "*" for any value
- Range: "1-5" for Monday to Friday
- List: "1,3,5" for Monday, Wednesday, Friday
- Step: "*/5" for every 5 units
Examples:
- Every Friday at 10:00 AM: {Minute: "0", Hour: "10", DayOfWeek: "5"}
- Every day at 2:30 PM: {Minute: "30", Hour: "14"}
- First day of month at midnight: {Minute: "0", Hour: "0", DayOfMonth: "1"}
- Every 5 minutes: {Minute: "*/5"}
type ExecutionRecord ¶
// ExecutionRecord describes one run of a job. Storage implementations persist
// it through SaveExecution and return it from GetExecution / ListExecutions.
type ExecutionRecord struct {
// JobID is the identifier of the job
JobID string
// ExecutionID is the unique identifier of this execution
ExecutionID string
// StartTime is when the execution started
StartTime time.Time
// EndTime is when the execution completed
EndTime time.Time
// Duration is the execution duration
// (ExecutionRecordModel stores it as int64 nanoseconds)
Duration time.Duration
// Status is the execution result status, one of the JobStatus constants
// (for example JobStatusCompleted)
Status JobStatus
// Error stores the error message if execution failed
Error string
// Metadata stores additional execution metadata
// (ExecutionRecordModel keeps it as a JSON-encoded text column)
Metadata map[string]string
}
ExecutionRecord represents a single job execution record
type ExecutionRecordModel ¶
// ExecutionRecordModel is the GORM row type that backs ExecutionRecord.
// Its fields mirror ExecutionRecord, with non-scalar values flattened into
// column-friendly types.
type ExecutionRecordModel struct {
// ExecutionID is the primary key (up to 255 characters).
ExecutionID string `gorm:"primaryKey;size:255"`
// JobID identifies the owning job; indexed.
JobID string `gorm:"size:255;index"`
// StartTime is when the execution started; indexed.
StartTime time.Time `gorm:"index"`
// EndTime is when the execution completed.
EndTime time.Time
// Duration is the execution duration expressed as an integer.
Duration int64 // nanoseconds
// Status holds the status as a plain string (up to 20 characters); indexed.
Status string `gorm:"size:20;index"`
// Error holds the failure message, stored as text.
Error string `gorm:"type:text"`
// Metadata holds the execution metadata serialized into a text column.
Metadata string `gorm:"type:text"` // JSON-encoded map[string]string
}
ExecutionRecordModel represents the database model for ExecutionRecord
func (ExecutionRecordModel) TableName ¶
func (ExecutionRecordModel) TableName() string
TableName specifies the table name for ExecutionRecordModel
type GormStorage ¶
type GormStorage struct {
// contains filtered or unexported fields
}
GormStorage is a GORM-based implementation of the Storage interface
Example (Basic) ¶
ExampleGormStorage_basic demonstrates basic usage of GORM storage
package main
import (
"context"
"fmt"
"time"
"github.com/Weedbox/scheduler"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
// main walks through the basic GormStorage lifecycle: open a database,
// initialize the storage, save one job, read it back and close the storage.
// Every failure, including the one Close may return, aborts the example.
func main() {
	// Create a database connection (SQLite in-memory for this example)
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	// Create GORM storage
	storage := scheduler.NewGormStorage(db)

	// Initialize storage
	ctx := context.Background()
	if err := storage.Initialize(ctx); err != nil {
		panic(err)
	}

	// Save a job. A single timestamp keeps CreatedAt and UpdatedAt identical
	// and makes NextRun exactly 24 hours after creation.
	now := time.Now()
	job := &scheduler.JobData{
		ID:             "daily-backup",
		ScheduleType:   "cron",
		ScheduleConfig: "0 0 * * *",
		Status:         scheduler.JobStatusPending,
		NextRun:        now.Add(24 * time.Hour),
		CreatedAt:      now,
		UpdatedAt:      now,
		Metadata:       map[string]string{"type": "backup"},
	}
	if err := storage.SaveJob(ctx, job); err != nil {
		panic(err)
	}

	// Retrieve the job
	retrieved, err := storage.GetJob(ctx, "daily-backup")
	if err != nil {
		panic(err)
	}
	fmt.Printf("Job ID: %s, Type: %s\n", retrieved.ID, retrieved.Metadata["type"])

	// Clean up. Close returns an error, so handle it like every other step
	// instead of discarding it.
	if err := storage.Close(ctx); err != nil {
		panic(err)
	}
}
Output: Job ID: daily-backup, Type: backup
Example (ExecutionRecords) ¶
ExampleGormStorage_executionRecords demonstrates saving and querying execution records
package main
import (
"context"
"fmt"
"time"
"github.com/Weedbox/scheduler"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
// main saves three execution records for one job and then lists them with a
// limit of two, sorted by start time. Every failure, including the one Close
// may return, aborts the example.
func main() {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	storage := scheduler.NewGormStorage(db)
	ctx := context.Background()
	if err := storage.Initialize(ctx); err != nil {
		panic(err)
	}

	// Save execution records, one hour apart, each lasting ten minutes
	now := time.Now()
	for i := 0; i < 3; i++ {
		start := now.Add(time.Duration(i) * time.Hour)
		record := &scheduler.ExecutionRecord{
			JobID:       "job1",
			ExecutionID: fmt.Sprintf("exec-%d", i),
			StartTime:   start,
			EndTime:     start.Add(10 * time.Minute),
			Duration:    10 * time.Minute,
			Status:      scheduler.JobStatusCompleted,
		}
		if err := storage.SaveExecution(ctx, record); err != nil {
			panic(err)
		}
	}

	// Query executions with limit
	options := &scheduler.QueryOptions{
		Limit:  2,
		SortBy: "start_time",
	}
	executions, err := storage.ListExecutions(ctx, "job1", options)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Found %d executions\n", len(executions))

	// Close returns an error, so handle it rather than discarding it.
	if err := storage.Close(ctx); err != nil {
		panic(err)
	}
}
Output: Found 2 executions
Example (ListJobsByStatus) ¶
ExampleGormStorage_listJobsByStatus demonstrates querying jobs by status
package main
import (
"context"
"fmt"
"time"
"github.com/Weedbox/scheduler"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
// main stores three jobs (two pending, one running) and then counts the
// pending ones through ListJobsByStatus. Every failure, including the one
// Close may return, aborts the example.
func main() {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	storage := scheduler.NewGormStorage(db)
	ctx := context.Background()
	if err := storage.Initialize(ctx); err != nil {
		panic(err)
	}

	// Create jobs with different statuses
	statuses := []scheduler.JobStatus{
		scheduler.JobStatusPending,
		scheduler.JobStatusRunning,
		scheduler.JobStatusPending,
	}
	// One timestamp for the whole batch keeps CreatedAt and UpdatedAt equal.
	now := time.Now()
	for i, status := range statuses {
		job := &scheduler.JobData{
			ID:        fmt.Sprintf("job-%d", i),
			Status:    status,
			CreatedAt: now,
			UpdatedAt: now,
		}
		if err := storage.SaveJob(ctx, job); err != nil {
			panic(err)
		}
	}

	// Query pending jobs
	pendingJobs, err := storage.ListJobsByStatus(ctx, scheduler.JobStatusPending)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Pending jobs: %d\n", len(pendingJobs))

	// Close returns an error, so handle it rather than discarding it.
	if err := storage.Close(ctx); err != nil {
		panic(err)
	}
}
Output: Pending jobs: 2
Example (WithScheduler) ¶
ExampleGormStorage_withScheduler demonstrates using GORM storage with scheduler
package main
import (
"context"
"fmt"
"github.com/Weedbox/scheduler"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
// main shows a scheduler running on top of GormStorage: it builds the storage,
// wires a dispatching handler, starts the scheduler and stops it again.
func main() {
	// Open an in-memory SQLite database through GORM.
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	// Per-job callbacks keyed by job ID; intentionally left empty here.
	jobFuncs := map[string]func(context.Context) error{}

	// The handler forwards each event to the callback registered under the
	// event's job ID and does nothing when no usable callback exists.
	handler := func(ctx context.Context, event scheduler.JobEvent) error {
		fn, ok := jobFuncs[event.ID()]
		if !ok || fn == nil {
			return nil
		}
		return fn(ctx)
	}

	// Build the GORM-backed storage and hand it to the scheduler.
	storage := scheduler.NewGormStorage(db)
	codec := scheduler.NewBasicScheduleCodec()
	sched := scheduler.NewScheduler(storage, handler, codec)

	ctx := context.Background()

	// Starting the scheduler also initializes the storage.
	err = sched.Start(ctx)
	if err != nil {
		panic(err)
	}

	// A real application would now add jobs using a Schedule implementation;
	// this example only demonstrates that the scheduler runs on GORM storage.

	// Stopping the scheduler also closes the storage.
	err = sched.Stop(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Println("Scheduler started and stopped successfully with GORM storage")
}
Output: Scheduler started and stopped successfully with GORM storage
func NewGormStorage ¶
func NewGormStorage(db *gorm.DB) *GormStorage
NewGormStorage creates a new GORM storage instance
func (*GormStorage) Close ¶
func (s *GormStorage) Close(ctx context.Context) error
Close releases storage resources
func (*GormStorage) DeleteExecutions ¶
func (s *GormStorage) DeleteExecutions(ctx context.Context, jobID string, before time.Time) error
DeleteExecutions removes execution records older than the specified time
func (*GormStorage) DeleteJob ¶
func (s *GormStorage) DeleteJob(ctx context.Context, jobID string) error
DeleteJob removes job data from storage
func (*GormStorage) GetExecution ¶
func (s *GormStorage) GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)
GetExecution retrieves a specific execution record
func (*GormStorage) HealthCheck ¶
func (s *GormStorage) HealthCheck(ctx context.Context) error
HealthCheck verifies the storage connection is healthy
func (*GormStorage) Initialize ¶
func (s *GormStorage) Initialize(ctx context.Context) error
Initialize prepares the storage for use
func (*GormStorage) ListExecutions ¶
func (s *GormStorage) ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)
ListExecutions returns execution records for a job with optional filters
func (*GormStorage) ListJobs ¶
func (s *GormStorage) ListJobs(ctx context.Context) ([]*JobData, error)
ListJobs returns all jobs in storage
func (*GormStorage) ListJobsByStatus ¶
func (s *GormStorage) ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)
ListJobsByStatus returns jobs filtered by status
func (*GormStorage) SaveExecution ¶
func (s *GormStorage) SaveExecution(ctx context.Context, record *ExecutionRecord) error
SaveExecution persists an execution record
type IntervalSchedule ¶
type IntervalSchedule struct {
// contains filtered or unexported fields
}
IntervalSchedule runs a job at fixed intervals.
func NewIntervalSchedule ¶
func NewIntervalSchedule(interval time.Duration) (*IntervalSchedule, error)
NewIntervalSchedule creates a new IntervalSchedule.
func (*IntervalSchedule) Interval ¶
func (s *IntervalSchedule) Interval() time.Duration
Interval returns the configured interval duration.
type Job ¶
// Job is the read-only view of a scheduled job: its identity, its run times,
// whether it is executing right now, and its metadata.
// NOTE(review): what NextRun and LastRun return for a job that has never run
// is not visible from this declaration — confirm against the implementation.
type Job interface {
// ID returns the unique identifier of the job
ID() string
// NextRun returns the next scheduled run time
NextRun() time.Time
// LastRun returns the last execution time
LastRun() time.Time
// IsRunning returns whether the job is currently executing
IsRunning() bool
// Metadata returns the job metadata
Metadata() map[string]string
}
Job represents a scheduled job configuration
type JobData ¶
// JobData is the persistent form of a scheduled job. Storage implementations
// write it with SaveJob / UpdateJob and return it from GetJob, ListJobs and
// ListJobsByStatus.
type JobData struct {
// ID is the unique identifier of the job
ID string
// ScheduleType indicates the type of schedule (interval, cron, once, etc.)
ScheduleType string
// ScheduleConfig stores the schedule configuration as a string; its format
// depends on ScheduleType:
// For interval: duration string (e.g., "5m", "1h")
// For cron: cron expression (e.g., "0 0 * * *")
// For once: timestamp (e.g., "2025-10-02T15:04:05Z")
ScheduleConfig string
// Status is the current status of the job, one of the JobStatus constants
// (for example JobStatusPending)
Status JobStatus
// NextRun is the next scheduled execution time
NextRun time.Time
// LastRun is the last execution time
LastRun time.Time
// CreatedAt is the job creation timestamp
CreatedAt time.Time
// UpdatedAt is the last update timestamp
UpdatedAt time.Time
// Metadata stores additional job metadata as key-value pairs
// (JobDataModel keeps it as a JSON-encoded text column)
Metadata map[string]string
}
JobData represents the persistent data of a scheduled job
type JobDataModel ¶
// JobDataModel is the GORM row type that backs JobData. Its fields mirror
// JobData, with the status and metadata flattened into string columns.
type JobDataModel struct {
// ID is the primary key (up to 255 characters).
ID string `gorm:"primaryKey;size:255"`
// ScheduleType names the kind of schedule (up to 50 characters).
ScheduleType string `gorm:"size:50"`
// ScheduleConfig holds the serialized schedule configuration as text.
ScheduleConfig string `gorm:"type:text"`
// Status holds the job status as a plain string (up to 20 characters); indexed.
Status string `gorm:"size:20;index"`
// NextRun is the next scheduled execution time; indexed.
NextRun time.Time `gorm:"index"`
// LastRun is the last execution time.
LastRun time.Time
// CreatedAt is the job creation timestamp; indexed.
CreatedAt time.Time `gorm:"index"`
// UpdatedAt is the last update timestamp; indexed.
UpdatedAt time.Time `gorm:"index"`
// Metadata holds the job metadata serialized into a text column.
Metadata string `gorm:"type:text"` // JSON-encoded map[string]string
}
JobDataModel represents the database model for JobData
func (JobDataModel) TableName ¶
func (JobDataModel) TableName() string
TableName specifies the table name for JobDataModel
type JobEvent ¶
type JobEvent struct {
// contains filtered or unexported fields
}
JobEvent represents the context passed to a JobHandler when a job is executed.
func (JobEvent) Delay ¶
func (e JobEvent) Delay() time.Duration
Delay returns the duration between the scheduled time and the handler invocation.
func (JobEvent) LastCompletedAt ¶
func (e JobEvent) LastCompletedAt() time.Time
LastCompletedAt returns the completion time of the previous successful execution, or the zero time if there is none.
func (JobEvent) ScheduledAt ¶
func (e JobEvent) ScheduledAt() time.Time
ScheduledAt returns the time the job was scheduled to execute.
type JobHandler ¶
JobHandler handles job execution events.
type JobStatus ¶
type JobStatus string
JobStatus represents the current status of a job
const ( // JobStatusPending indicates the job is waiting to run JobStatusPending JobStatus = "pending" // JobStatusRunning indicates the job is currently executing JobStatusRunning JobStatus = "running" // JobStatusCompleted indicates the job completed successfully JobStatusCompleted JobStatus = "completed" // JobStatusFailed indicates the job execution failed JobStatusFailed JobStatus = "failed" // JobStatusReschedulingFailed indicates the handler completed but the // scheduler failed to enqueue the next-tick scheduled message. Distinct // from JobStatusFailed so observability can tell the two apart. JobStatusReschedulingFailed JobStatus = "rescheduling_failed" // JobStatusDisabled indicates the job is disabled JobStatusDisabled JobStatus = "disabled" )
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage is an in-memory implementation of the Storage interface
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage creates a new in-memory storage instance
func (*MemoryStorage) Close ¶
func (s *MemoryStorage) Close(ctx context.Context) error
Close releases storage resources
func (*MemoryStorage) DeleteExecutions ¶
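func (s *MemoryStorage) DeleteExecutions(ctx context.Context, jobID string, before time.Time) error
DeleteExecutions removes execution records older than the specified time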
func (*MemoryStorage) DeleteJob ¶
func (s *MemoryStorage) DeleteJob(ctx context.Context, jobID string) error
DeleteJob removes job data from storage
func (*MemoryStorage) GetExecution ¶
func (s *MemoryStorage) GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)
GetExecution retrieves a specific execution record
func (*MemoryStorage) HealthCheck ¶
func (s *MemoryStorage) HealthCheck(ctx context.Context) error
HealthCheck verifies the storage connection is healthy
func (*MemoryStorage) Initialize ¶
func (s *MemoryStorage) Initialize(ctx context.Context) error
Initialize prepares the storage for use
func (*MemoryStorage) ListExecutions ¶
func (s *MemoryStorage) ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)
ListExecutions returns execution records for a job with optional filters
func (*MemoryStorage) ListJobs ¶
func (s *MemoryStorage) ListJobs(ctx context.Context) ([]*JobData, error)
ListJobs returns all jobs in storage
func (*MemoryStorage) ListJobsByStatus ¶
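func (s *MemoryStorage) ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)
ListJobsByStatus returns jobs filtered by status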
func (*MemoryStorage) SaveExecution ¶
func (s *MemoryStorage) SaveExecution(ctx context.Context, record *ExecutionRecord) error
SaveExecution persists an execution record
type MigrateResult ¶ added in v0.0.5
// MigrateResult reports what a MigrateStorage call did, as counts of migrated
// and skipped items.
type MigrateResult struct {
// JobsMigrated is the number of jobs copied from src to dst.
JobsMigrated int
// JobsSkipped is the number of jobs left untouched; MigrateStorage skips
// jobs that already exist in dst.
JobsSkipped int
// ExecutionsMigrated is the number of execution records copied from src to dst.
ExecutionsMigrated int
// ExecutionsSkipped is the number of execution records that were not copied.
// NOTE(review): presumably records already present in dst — confirm against
// the MigrateStorage implementation.
ExecutionsSkipped int
}
MigrateResult contains the results of a storage migration.
func MigrateStorage ¶ added in v0.0.5
MigrateStorage copies all jobs and execution records from src to dst. It is idempotent: jobs that already exist in dst are skipped. This function works with any Storage implementation (e.g. GORM→NATS, Memory→GORM, etc.).
type NATSSchedulerLogger ¶ added in v0.0.8
NATSSchedulerLogger is a structured-log callback used to surface errors from best-effort operations (KV writes, next-tick publishes, reconciler scans). Arguments after msg are key/value pairs in the slog style.
The scheduler invokes the logger from background goroutines; the callback must be safe to call concurrently.
type NATSSchedulerOption ¶ added in v0.0.5
type NATSSchedulerOption func(*natsSchedulerImpl)
NATSSchedulerOption configures a natsSchedulerImpl instance.
func WithAddJobRetryBudget ¶ added in v0.1.0
func WithAddJobRetryBudget(budget time.Duration) NATSSchedulerOption
WithAddJobRetryBudget bounds how long AddJob and UpdateJobSchedule will keep retrying their KV write and scheduled-message publish through a transient cluster hiccup (a raft leader re-election typically finishes well inside the default 5 s budget). A truly unavailable cluster will exhaust the budget and bubble the error back to the caller.
Setting budget <= 0 disables the retry entirely, restoring the previous "fail fast on the first transient error" behaviour.
func WithJetStreamReadyTimeout ¶ added in v0.1.1
func WithJetStreamReadyTimeout(timeout time.Duration) NATSSchedulerOption
WithJetStreamReadyTimeout bounds how long Start() waits for the JetStream metaleader to be reachable before issuing the first KV / stream / consumer create.
This is the fix for the multi-instance brand-new-deployment hang. nats.go's AccountInfo documents:
"For clustered topologies, AccountInfo will time out."
Every JetStream API the scheduler uses (CreateOrUpdateKeyValue, CreateOrUpdateStream, CreateOrUpdateConsumer) issues an AccountInfo internally. When several scheduler instances boot against a freshly started 3-node NATS cluster, the metaleader may still be electing — the API request neither succeeds nor fails fast; it hangs until the caller's ctx deadline. Without this wait, Start() can hang for minutes (typical caller ctx is much longer than 30 s).
timeout <= 0 disables the wait (legacy behaviour). Default 30 s comfortably covers a metaleader election; the moment the metaleader answers, the wait returns.
func WithLoadJobsAsyncPublishTimeout ¶ added in v0.3.0
func WithLoadJobsAsyncPublishTimeout(timeout time.Duration) NATSSchedulerOption
WithLoadJobsAsyncPublishTimeout bounds how long loadJobsFromKV waits at the end of Start() for outstanding async PublishMsgAsync calls to be acked. A timeout here does not break correctness — the background reconciler will republish any persisted job whose next-tick message was lost or never acked — but it is logged so operators can spot a cluster that is slow to ack startup publishes.
timeout <= 0 disables the wait (workers exit and Start() returns as soon as the last PublishMsgAsync has been enqueued).
func WithLoadJobsConcurrency ¶ added in v0.3.0
func WithLoadJobsConcurrency(n int) NATSSchedulerOption
WithLoadJobsConcurrency sets the number of worker goroutines used by loadJobsFromKV during Start() to parallelize KV reads and async scheduled-message publishes for persisted jobs.
Each worker does a synchronous jobKV.Get + (optional) jobKV.Update + a fire-and-forget PublishMsgAsync for one job before pulling the next key. The JetStream client itself pipelines the async publishes behind the scenes, so the publish stage is effectively free; this option controls how many KV reads run in parallel.
Higher values cut startup time on deployments with hundreds or thousands of persisted jobs (O(N*RTT) → O(N/concurrency * RTT)). Values below 1 are treated as 1 (no parallelism). The default is 32, comfortable for most clusters; raise it only if you have measured KV.Get latency dominating Start() time.
Note on error visibility: per-message publish ack errors at startup are not logged by the scheduler (the reconciler is the safety net — any job whose scheduled message never landed will be republished within the reconciler interval). If you want per-message visibility, install jetstream.WithPublishAsyncErrHandler on the JetStream client you pass to NewNATSScheduler — that handler fires for every async publish whose ack fails or times out.
func WithNATSConsumerName ¶ added in v0.0.5
func WithNATSConsumerName(name string) NATSSchedulerOption
WithNATSConsumerName sets the durable consumer name.
func WithNATSSchedulerCodec ¶ added in v0.0.5
func WithNATSSchedulerCodec(codec ScheduleCodec) NATSSchedulerOption
WithNATSSchedulerCodec sets the schedule codec for encoding/decoding schedules.
func WithNATSSchedulerExecBucket ¶ added in v0.0.5
func WithNATSSchedulerExecBucket(name string) NATSSchedulerOption
WithNATSSchedulerExecBucket sets the KV bucket name for execution records.
func WithNATSSchedulerJobBucket ¶ added in v0.0.5
func WithNATSSchedulerJobBucket(name string) NATSSchedulerOption
WithNATSSchedulerJobBucket sets the KV bucket name for job data.
func WithNATSSchedulerLogger ¶ added in v0.0.8
func WithNATSSchedulerLogger(logger NATSSchedulerLogger) NATSSchedulerOption
WithNATSSchedulerLogger installs a logger called when the scheduler hits a non-fatal error in a best-effort path (KV puts, next-tick publishes, and the reconciler). Without this option, those errors are dropped silently.
func WithNATSStreamName ¶ added in v0.0.5
func WithNATSStreamName(name string) NATSSchedulerOption
WithNATSStreamName sets the JetStream stream name.
func WithNATSSubjectPrefix ¶ added in v0.0.5
func WithNATSSubjectPrefix(prefix string) NATSSchedulerOption
WithNATSSubjectPrefix sets the NATS subject prefix for job messages.
func WithOnReschedulingFailed ¶ added in v0.0.8
func WithOnReschedulingFailed(fn RescheduleFailureFunc) NATSSchedulerOption
WithOnReschedulingFailed installs a callback invoked when executeJob fails to enqueue the next-tick scheduled message after exhausting retries. Callers can use this to record a dead-letter, page on-call, or trigger an external reconciler. Without this option, the failure is logged (if a logger is configured) but otherwise silently absorbed.
func WithOnce ¶ added in v0.2.0
func WithOnce(fn OnceFunc) NATSSchedulerOption
WithOnce installs a caller-provided distributed-once implementation. By default the scheduler uses an internal JetStream-KV-backed Once. Pass a custom OnceFunc when you already have a distributed lock service (e.g. the one your platform's other modules use) and want the scheduler to share it so all first-deploy provisioning serializes through a single substrate.
func WithOnceKey ¶ added in v0.2.0
func WithOnceKey(key string) NATSSchedulerOption
WithOnceKey overrides the key handed to OnceFunc when serializing the scheduler's first-deploy provisioning. Defaults to "scheduler.init". Override when sharing a distributed-lock substrate with other modules and your global naming scheme expects a different value.
func WithOnceLockBucket ¶ added in v0.2.0
func WithOnceLockBucket(name string) NATSSchedulerOption
WithOnceLockBucket overrides the JetStream KV bucket name used by the built-in Once. Has no effect when WithOnce has installed a custom implementation. Defaults to SCHEDULER_LOCKS.
func WithPublishRetry ¶ added in v0.0.8
func WithPublishRetry(attempts int, initialBackoff time.Duration) NATSSchedulerOption
WithPublishRetry configures how many times executeJob attempts the next-tick publishScheduledMessage call when it fails, and the initial backoff between attempts; the backoff doubles after each failed attempt. attempts < 1 is treated as 1 (no retry).
func WithReconcilerGracePeriod ¶ added in v0.0.8
func WithReconcilerGracePeriod(gracePeriod time.Duration) NATSSchedulerOption
WithReconcilerGracePeriod sets the lag tolerance the reconciler applies before treating a job as stuck. A job is republished only if its KV NextRun is older than now - gracePeriod. The default is 30 s, large enough to skip jobs that are mid-tick yet small enough to recover quickly.
func WithReconcilerInterval ¶ added in v0.0.8
func WithReconcilerInterval(interval time.Duration) NATSSchedulerOption
WithReconcilerInterval overrides how often the background reconciler scans the job KV and republishes the scheduled message for any recurring job whose NextRun is more than the configured grace period (see WithReconcilerGracePeriod) in the past.
Combined with the stream's Duplicates window and the deterministic Nats-Msg-Id used by publishScheduledMessage, republishing is safe even when the original scheduled message is still alive in the stream: the duplicate is silently suppressed.
The reconciler is enabled by default (defaultReconcilerInterval). Passing interval <= 0 disables it.
func WithStartPhaseTimeout ¶ added in v0.1.1
func WithStartPhaseTimeout(timeout time.Duration) NATSSchedulerOption
WithStartPhaseTimeout caps each individual NATS API call inside Start (KV bucket creation, stream creation, backing-stream wait, persisted-job load, consumer creation, initial Consume subscribe). Without this cap a single hung JetStream API request consumes the caller's entire Start context — typical callers pass several minutes, so the hang surfaces 5+ minutes later with no indication of which phase blocked.
Each phase logs entry, exit, and elapsed time through the installed logger, and on failure the returned error is wrapped with the phase name.
timeout <= 0 falls back to using the caller's ctx directly (legacy behaviour, no per-phase bound).
func WithStartupStreamReadyTimeout ¶ added in v0.0.9
func WithStartupStreamReadyTimeout(timeout time.Duration) NATSSchedulerOption
WithStartupStreamReadyTimeout bounds how long Start() waits for the SCHEDULER stream and KV backing streams to have a raft leader before proceeding to load persisted jobs.
In multi-node JetStream clusters, CreateOrUpdateStream and CreateOrUpdateKeyValue return as soon as the metaleader has accepted the config — the asset's own raft group may still be electing a leader, so a publishScheduledMessage or KV Keys/Get call issued in that window fails with "nats: no responders available for request". Without this wait, loadJobsFromKV would drop (silently or noisily) the first startup publish for every persisted job, and the schedule chain would never re-fire on its own, because the lost publish is the very message that continues the chain.
timeout <= 0 disables the wait. Single-node JetStream (no Cluster info) is treated as ready immediately.
func WithStreamDuplicatesWindow ¶ added in v0.0.6
func WithStreamDuplicatesWindow(window time.Duration) NATSSchedulerOption
WithStreamDuplicatesWindow overrides the duration the stream tracks Nats-Msg-Id values for deduplication. Larger values catch republishes that arrive long after the original publish (e.g. a peer that restarts hours later), at the cost of more server-side state.
type NATSStorage ¶ added in v0.0.5
type NATSStorage struct {
// contains filtered or unexported fields
}
NATSStorage implements the Storage interface using NATS JetStream KV Store.
func NewNATSStorage ¶ added in v0.0.5
func NewNATSStorage(js jetstream.JetStream, opts ...NATSStorageOption) *NATSStorage
NewNATSStorage creates a new NATSStorage instance.
func (*NATSStorage) Close ¶ added in v0.0.5
func (s *NATSStorage) Close(ctx context.Context) error
Close marks the storage as uninitialized.
func (*NATSStorage) DeleteExecutions ¶ added in v0.0.5
func (s *NATSStorage) DeleteExecutions(ctx context.Context, jobID string, before time.Time) error
DeleteExecutions removes execution records for a job that are older than the specified time.
func (*NATSStorage) DeleteJob ¶ added in v0.0.5
func (s *NATSStorage) DeleteJob(ctx context.Context, jobID string) error
DeleteJob removes a job from KV storage.
func (*NATSStorage) GetExecution ¶ added in v0.0.5
func (s *NATSStorage) GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)
GetExecution retrieves a specific execution record by ID.
func (*NATSStorage) HealthCheck ¶ added in v0.0.5
func (s *NATSStorage) HealthCheck(ctx context.Context) error
HealthCheck verifies the storage is initialized and the KV bucket is accessible.
func (*NATSStorage) Initialize ¶ added in v0.0.5
func (s *NATSStorage) Initialize(ctx context.Context) error
Initialize creates the KV buckets and prepares the storage for use. The CreateOrUpdateKeyValue calls are wrapped in Once so concurrent first-deploy peers do not race the JetStream reply path (see once.go for the failure mode). After Once succeeds, each peer — leader or follower — picks up the bucket handle via a plain metadata lookup that cannot hang.
Before any JetStream call, Initialize gates on waitForJetStreamReady so a freshly-started cluster whose metaleader is still electing produces a bounded wait rather than the multi-minute hang nats.go's AccountInfo documents on clustered topologies.
func (*NATSStorage) ListExecutions ¶ added in v0.0.5
func (s *NATSStorage) ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)
ListExecutions returns execution records for a job with optional filtering, sorting, and pagination.
func (*NATSStorage) ListJobs ¶ added in v0.0.5
func (s *NATSStorage) ListJobs(ctx context.Context) ([]*JobData, error)
ListJobs returns all jobs from KV storage.
func (*NATSStorage) ListJobsByStatus ¶ added in v0.0.5
func (s *NATSStorage) ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)
ListJobsByStatus returns jobs filtered by status from KV storage.
func (*NATSStorage) SaveExecution ¶ added in v0.0.5
func (s *NATSStorage) SaveExecution(ctx context.Context, record *ExecutionRecord) error
SaveExecution persists an execution record to KV storage.
type NATSStorageOption ¶ added in v0.0.5
type NATSStorageOption func(*NATSStorage)
NATSStorageOption configures a NATSStorage instance.
func WithNATSStorageExecBucket ¶ added in v0.0.5
func WithNATSStorageExecBucket(name string) NATSStorageOption
WithNATSStorageExecBucket sets the KV bucket name for execution records.
func WithNATSStorageJetStreamReadyTimeout ¶ added in v0.2.0
func WithNATSStorageJetStreamReadyTimeout(timeout time.Duration) NATSStorageOption
WithNATSStorageJetStreamReadyTimeout bounds how long Initialize waits for the JetStream metaleader to be reachable before issuing the first KV create. Same rationale as the scheduler's WithJetStreamReadyTimeout: every CreateOrUpdateKeyValue call issues AccountInfo internally, which nats.go documents will time out on clustered topologies if the metaleader is still electing. Zero disables the wait (legacy behaviour). Defaults to 30 s.
func WithNATSStorageJobBucket ¶ added in v0.0.5
func WithNATSStorageJobBucket(name string) NATSStorageOption
WithNATSStorageJobBucket sets the KV bucket name for job data.
func WithNATSStorageOnce ¶ added in v0.2.0
func WithNATSStorageOnce(fn OnceFunc) NATSStorageOption
WithNATSStorageOnce installs a distributed-once implementation used to serialize the first-time CreateOrUpdateKeyValue calls in Initialize across processes. Storage and Scheduler use distinct keys (defaultStorageOnceKey vs. defaultSchedulerOnceKey) because their fn bodies provision different resource sets; sharing the same OnceFunc across both is fine (recommended in cluster deployments so both components serialize through the same lock substrate), but the keys must stay distinct or one side's provisioning gets silently skipped.
When unset, Initialize uses an internal JetStream-KV-backed Once with the same default lock bucket name (SCHEDULER_LOCKS) as NATSScheduler, so the two still share a single lock bucket across the cluster.
func WithNATSStorageOnceKey ¶ added in v0.2.0
func WithNATSStorageOnceKey(key string) NATSStorageOption
WithNATSStorageOnceKey overrides the key this package hands to its OnceFunc. Defaults to defaultStorageOnceKey ("scheduler.storage-init"). Override when sharing a distributed-lock substrate with other modules and your global naming scheme expects a different value. Keep the value distinct from NATSScheduler's WithOnceKey so each component's provisioning runs (see WithNATSStorageOnce).
func WithNATSStorageOnceLockBucket ¶ added in v0.2.0
func WithNATSStorageOnceLockBucket(name string) NATSStorageOption
WithNATSStorageOnceLockBucket overrides the JetStream KV bucket name used by the built-in Once. Has no effect when WithNATSStorageOnce installed a custom implementation. Defaults to SCHEDULER_LOCKS — the same default as NATSScheduler so both serialize against one bucket out of the box.
type OnceFunc ¶ added in v0.2.0
OnceFunc serializes execution of fn across processes that share the same distributed-coordination substrate. The contract:
- Exactly one caller (per key, per fn-success cycle) runs fn.
- Concurrent callers block until that fn returns, then return nil.
- Subsequent callers (after fn has succeeded) return nil without running fn.
Implementations may treat the "fn has succeeded" memory as bounded (e.g. expiring after some TTL); callers must therefore make fn idempotent so a delayed late-joiner that re-runs fn does not corrupt state. CreateOrUpdateKeyValue / CreateOrUpdateStream are idempotent in this sense, which is why Once is the right primitive for serializing scheduler bootstrap.
Callers can inject a custom OnceFunc via WithOnce to share a lock substrate with other modules; otherwise the scheduler uses its own built-in JetStream-KV-backed implementation.
type OnceSchedule ¶
type OnceSchedule struct {
// contains filtered or unexported fields
}
OnceSchedule runs a job only once at a specific time.
func NewOnceSchedule ¶
func NewOnceSchedule(runTime time.Time) (*OnceSchedule, error)
NewOnceSchedule creates a new OnceSchedule.
func (*OnceSchedule) Next ¶
func (s *OnceSchedule) Next(t time.Time) time.Time
Next returns the scheduled run time if it is still in the future; otherwise it returns a far-future time.
func (*OnceSchedule) RunAt ¶
func (s *OnceSchedule) RunAt() time.Time
RunAt returns the configured run time.
type QueryOptions ¶
type QueryOptions struct {
// Limit limits the number of records to return
Limit int
// Offset skips the specified number of records
Offset int
// StartTime filters records after this time
StartTime *time.Time
// EndTime filters records before this time
EndTime *time.Time
// Status filters records by status
Status *JobStatus
// SortBy specifies the field to sort by (e.g., "start_time", "duration")
SortBy string
// SortDesc specifies descending sort order
SortDesc bool
}
QueryOptions defines options for querying execution records
type RescheduleFailureFunc ¶ added in v0.0.8
RescheduleFailureFunc is invoked when executeJob has run the handler (whether or not the handler returned an error) but failed to enqueue the next-tick scheduled message even after retries. It is the hook callers use to plug in a dead-letter or out-of-band reconciler. nextRun is the scheduled time of the publish that failed.
Invoked from a background goroutine; the callback must be safe to call concurrently and should not block for long.
type Schedule ¶
type Schedule interface {
// Next calculates the next run time based on the current time
Next(t time.Time) time.Time
}
Schedule defines the scheduling strategy for a job
type ScheduleBuilder ¶
type ScheduleBuilder interface {
// Every creates a schedule that runs at fixed intervals
Every(duration time.Duration) (Schedule, error)
// Cron creates a schedule from a cron expression
Cron(expression string) (Schedule, error)
// At creates a schedule that runs at a specific time daily
At(hour, minute, second int) (Schedule, error)
// Once creates a schedule that runs only once at the specified time
Once(t time.Time) (Schedule, error)
}
ScheduleBuilder provides methods to create common scheduling strategies
type ScheduleCodec ¶
type ScheduleCodec interface {
// Encode converts a schedule into a serializable representation
Encode(schedule Schedule) (scheduleType string, scheduleConfig string, err error)
// Decode reconstructs a schedule from its serialized representation
Decode(scheduleType string, scheduleConfig string) (Schedule, error)
}
ScheduleCodec encodes and decodes schedules for persistence
type Scheduler ¶
type Scheduler interface {
// Start begins the scheduler execution
Start(ctx context.Context) error
// WaitUntilRunning blocks until the scheduler has fully started or the context is done
WaitUntilRunning(ctx context.Context) error
// Stop gracefully shuts down the scheduler
Stop(ctx context.Context) error
// AddJob registers a new job with the scheduler
AddJob(id string, schedule Schedule, metadata map[string]string) error
// UpdateJobSchedule replaces the schedule for an existing job
UpdateJobSchedule(id string, schedule Schedule) error
// RemoveJob removes a job from the scheduler
RemoveJob(id string) error
// GetJob retrieves a job by its ID
GetJob(id string) (Job, error)
// ListJobs returns all registered jobs
ListJobs() []Job
// IsRunning returns whether the scheduler is currently running
IsRunning() bool
}
Scheduler defines the interface for job scheduling operations
func NewNATSScheduler ¶ added in v0.0.5
func NewNATSScheduler(js jetstream.JetStream, handler JobHandler, opts ...NATSSchedulerOption) Scheduler
NewNATSScheduler creates a new Scheduler backed by NATS JetStream. Requires NATS Server 2.12+ with JetStream enabled and AllowMsgSchedules support.
The scheduler uses:
- A JetStream stream with scheduled delivery for triggering jobs at their scheduled times
- A JetStream KV Store for persisting job metadata and execution records
- A durable consumer for reliable message consumption with automatic failover
Example:
    nc, _ := nats.Connect(nats.DefaultURL)
    js, _ := jetstream.New(nc)
    s := scheduler.NewNATSScheduler(js, handler)
    s.Start(ctx)
func NewScheduler ¶
func NewScheduler(storage Storage, handler JobHandler, codec ScheduleCodec) Scheduler
NewScheduler creates a new Scheduler instance
type StartAtIntervalSchedule ¶
type StartAtIntervalSchedule struct {
// contains filtered or unexported fields
}
StartAtIntervalSchedule runs a job at a fixed interval starting from a specific time.
func NewStartAtIntervalSchedule ¶
func NewStartAtIntervalSchedule(startAt time.Time, interval time.Duration) (*StartAtIntervalSchedule, error)
NewStartAtIntervalSchedule creates a new StartAtIntervalSchedule.
func (*StartAtIntervalSchedule) Interval ¶
func (s *StartAtIntervalSchedule) Interval() time.Duration
Interval returns the configured interval.
func (*StartAtIntervalSchedule) Next ¶
func (s *StartAtIntervalSchedule) Next(t time.Time) time.Time
Next returns the next run time based on the configured start time and interval.
func (*StartAtIntervalSchedule) StartAt ¶
func (s *StartAtIntervalSchedule) StartAt() time.Time
StartAt returns the configured start time.
type Storage ¶
type Storage interface {
// Initialize prepares the storage for use
Initialize(ctx context.Context) error
// Close releases storage resources
Close(ctx context.Context) error
// SaveJob persists job data to storage
SaveJob(ctx context.Context, job *JobData) error
// UpdateJob updates existing job data in storage
UpdateJob(ctx context.Context, job *JobData) error
// DeleteJob removes job data from storage
DeleteJob(ctx context.Context, jobID string) error
// GetJob retrieves job data by ID
GetJob(ctx context.Context, jobID string) (*JobData, error)
// ListJobs returns all jobs in storage
ListJobs(ctx context.Context) ([]*JobData, error)
// ListJobsByStatus returns jobs filtered by status
ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)
// SaveExecution persists an execution record
SaveExecution(ctx context.Context, record *ExecutionRecord) error
// GetExecution retrieves a specific execution record
GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)
// ListExecutions returns execution records for a job with optional filters
ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)
// DeleteExecutions removes execution records older than the specified time
DeleteExecutions(ctx context.Context, jobID string, before time.Time) error
// HealthCheck verifies the storage connection is healthy
HealthCheck(ctx context.Context) error
}
Storage defines the interface for persistent storage operations
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
cron_example
command
|
|
|
cron_spec_example
command
|
|
|
cron_with_gorm
command
|
|
|
migrate_gorm_to_nats
command
|
|
|
nats_jetstream
command
|