scheduler

package module
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

README

Scheduler

A flexible and powerful job scheduler library for Go with pluggable storage backends.

Go Version License

Requirements

Component Version Notes
Go 1.24+ Required by NATS client dependency
NATS Server 2.12+ Only for NATS JetStream scheduler; must have JetStream enabled

The standard scheduler (NewScheduler) with In-Memory or GORM storage has no NATS dependency and works with Go 1.23+.

Features

  • 🚀 Simple API - Easy to use interface for scheduling jobs
  • 💾 Pluggable Storage - Support for multiple storage backends (In-Memory, GORM/SQL, NATS JetStream KV)
  • High Performance - Efficient job execution with concurrent support
  • 🔄 Flexible Scheduling - Support for interval-based, cron expressions, and one-time schedules
  • 📊 Execution History - Track job execution records with rich query capabilities
  • 🛡️ Thread-Safe - Safe for concurrent use
  • 🌐 Distributed Ready - Optional NATS JetStream backend with built-in distributed scheduling
  • 🎯 Production Ready - Comprehensive test coverage

Installation

go get github.com/Weedbox/scheduler
For GORM Storage Support
go get gorm.io/gorm
go get gorm.io/driver/sqlite  # or postgres, mysql, etc.
For NATS JetStream Support
go get github.com/nats-io/nats.go

Requires NATS Server 2.12+ with JetStream enabled.

Quick Start

Basic Usage with In-Memory Storage
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/Weedbox/scheduler"
)

func main() {
    // Create scheduler with in-memory storage
    storage := scheduler.NewMemoryStorage()
    handler := func(ctx context.Context, event scheduler.JobEvent) error {
        switch event.ID() {
        case "my-job":
            fmt.Println("Job executed at", time.Now())
        }
        return nil
    }

    sched := scheduler.NewScheduler(storage, handler, scheduler.NewBasicScheduleCodec())

    // Start the scheduler
    ctx := context.Background()
    if err := sched.Start(ctx); err != nil {
        panic(err)
    }
    if err := sched.WaitUntilRunning(ctx); err != nil {
        panic(err)
    }
    defer sched.Stop(ctx)

    // Create a schedule (runs every 5 seconds)
    schedule, err := scheduler.NewIntervalSchedule(5 * time.Second)
    if err != nil {
        panic(err)
    }

    // Add the job to scheduler
    if err := sched.AddJob("my-job", schedule, map[string]string{"type": "print"}); err != nil {
        panic(err)
    }

    // Update the schedule later without removing the job
    startAt := time.Now().Add(1 * time.Minute)
    updated, err := scheduler.NewStartAtIntervalSchedule(startAt, 5*time.Second)
    if err != nil {
        panic(err)
    }
    if err := sched.UpdateJobSchedule("my-job", updated); err != nil {
        panic(err)
    }

    // Keep running
    time.Sleep(30 * time.Second)
}
Using GORM Storage (Persistent)
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/Weedbox/scheduler"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
)

func main() {
    // Create database connection
    db, err := gorm.Open(sqlite.Open("scheduler.db"), &gorm.Config{})
    if err != nil {
        panic(err)
    }

    // Create scheduler with GORM storage
    storage := scheduler.NewGormStorage(db)
    handler := func(ctx context.Context, event scheduler.JobEvent) error {
        switch event.Metadata()["task"] {
        case "cleanup":
            fmt.Println("Running cleanup at", time.Now())
        }
        return nil
    }

    sched := scheduler.NewScheduler(storage, handler, scheduler.NewBasicScheduleCodec())

    // Start the scheduler
    ctx := context.Background()
    if err := sched.Start(ctx); err != nil {
        panic(err)
    }
    if err := sched.WaitUntilRunning(ctx); err != nil {
        panic(err)
    }
    defer sched.Stop(ctx)

    // Add your jobs...
}
Using NATS JetStream (Distributed)

The NATS JetStream scheduler replaces the polling mechanism with JetStream's native scheduled message delivery. It provides built-in distributed scheduling, automatic failover, and persistence via JetStream KV Store — no external database required.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/Weedbox/scheduler"
    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func main() {
    // Connect to NATS
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        panic(err)
    }
    defer nc.Close()

    js, err := jetstream.New(nc)
    if err != nil {
        panic(err)
    }

    // Create NATS scheduler
    handler := func(ctx context.Context, event scheduler.JobEvent) error {
        fmt.Printf("Job %s executed at %s\n", event.ID(), time.Now())
        return nil
    }

    sched := scheduler.NewNATSScheduler(js, handler)

    ctx := context.Background()
    if err := sched.Start(ctx); err != nil {
        panic(err)
    }
    defer sched.Stop(ctx)

    // Add jobs — same API as the standard scheduler
    schedule, _ := scheduler.NewIntervalSchedule(10 * time.Second)
    sched.AddJob("heartbeat", schedule, map[string]string{"type": "interval"})

    cronSchedule, _ := scheduler.NewCronSchedule("0 10 * * 5")
    sched.AddJob("weekly-report", cronSchedule, map[string]string{"type": "cron"})

    // Jobs are persisted in NATS JetStream KV and survive restarts
    select {}
}

Core Concepts

Scheduler

The Scheduler is the main component that manages job execution.

type Scheduler interface {
    Start(ctx context.Context) error
    WaitUntilRunning(ctx context.Context) error
    Stop(ctx context.Context) error
    AddJob(id string, schedule Schedule, metadata map[string]string) error
    UpdateJobSchedule(id string, schedule Schedule) error
    RemoveJob(id string) error
    GetJob(id string) (Job, error)
    ListJobs() []Job
    IsRunning() bool
}

Call WaitUntilRunning when another goroutine handles Start, or when you need to ensure initialization (storage loading, ticker setup, etc.) has completed before proceeding with dependent work.

Use UpdateJobSchedule to swap the schedule for an existing job without removing it:

newSchedule, _ := scheduler.NewStartAtIntervalSchedule(time.Now().Add(5*time.Minute), 5*time.Minute)
if err := sched.UpdateJobSchedule("my-job", newSchedule); err != nil {
    // handle update failure
}

Directly editing ScheduleType or ScheduleConfig in storage does not change the in-memory scheduler; always use UpdateJobSchedule (or remove/re-add) for runtime updates.

Job

A Job represents a scheduled task with its metadata.

type Job interface {
    ID() string
    NextRun() time.Time
    LastRun() time.Time
    IsRunning() bool
    Metadata() map[string]string
}
JobHandler

A job handler is provided when creating the scheduler and receives every execution.

type JobHandler func(ctx context.Context, event JobEvent) error

Inside the handler, use event.ID() (or event.Name()) and event.Metadata() to dispatch to the correct business logic.

The JobEvent passed to the handler also includes rich scheduling context:

  • event.Schedule() returns the schedule instance when available (e.g. *IntervalSchedule).
  • event.ScheduledAt() is the intended execution time based on the schedule.
  • event.StartedAt() is when the handler was actually invoked, and event.Delay() is the difference.
  • event.LastCompletedAt() reports when the job last finished (zero value if it has never run).
Schedule

The Schedule interface defines when a job should run.

type Schedule interface {
    Next(t time.Time) time.Time
}
ScheduleCodec

When persistence is enabled, a schedule codec serializes schedules into storage-friendly values and rehydrates them on restart.

type ScheduleCodec interface {
    Encode(schedule Schedule) (scheduleType string, scheduleConfig string, err error)
    Decode(scheduleType string, scheduleConfig string) (Schedule, error)
}

codec := scheduler.NewBasicScheduleCodec()

The basic codec supports IntervalSchedule, StartAtIntervalSchedule, and OnceSchedule. Custom codecs can be supplied for additional schedule types.

Storage

The Storage interface allows you to persist job data and execution history.

type Storage interface {
    Initialize(ctx context.Context) error
    Close(ctx context.Context) error
    SaveJob(ctx context.Context, job *JobData) error
    UpdateJob(ctx context.Context, job *JobData) error
    DeleteJob(ctx context.Context, jobID string) error
    GetJob(ctx context.Context, jobID string) (*JobData, error)
    ListJobs(ctx context.Context) ([]*JobData, error)
    ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)
    SaveExecution(ctx context.Context, record *ExecutionRecord) error
    GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)
    ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)
    DeleteExecutions(ctx context.Context, jobID string, before time.Time) error
    HealthCheck(ctx context.Context) error
}

Storage Backends

In-Memory Storage

Best for testing, development, or when persistence is not required.

storage := scheduler.NewMemoryStorage()

Pros:

  • Fast and simple
  • No external dependencies
  • Perfect for testing

Cons:

  • Data lost on restart
  • Not suitable for distributed systems
GORM Storage

Production-ready storage backed by SQL databases.

import (
    "gorm.io/driver/postgres"  // or mysql, sqlite, sqlserver
    "gorm.io/gorm"
)

db, _ := gorm.Open(postgres.Open(dsn), &gorm.Config{})
storage := scheduler.NewGormStorage(db)

Supported Databases:

  • PostgreSQL
  • MySQL
  • SQLite
  • SQL Server
  • Any GORM-supported database

Pros:

  • Persistent storage
  • ACID transactions
  • Advanced querying
  • Production-ready

Features:

  • Automatic schema migration
  • Indexed fields for performance
  • JSON metadata support
  • Health check support
NATS JetStream Storage

Implements the Storage interface using NATS JetStream KV Store. Can be used with the standard polling scheduler as a drop-in storage replacement.

import (
    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

nc, _ := nats.Connect(nats.DefaultURL)
js, _ := jetstream.New(nc)

storage := scheduler.NewNATSStorage(js,
    scheduler.WithNATSStorageJobBucket("MY_JOBS"),
    scheduler.WithNATSStorageExecBucket("MY_EXECS"),
)

// Use with the standard polling scheduler
sched := scheduler.NewScheduler(storage, handler, scheduler.NewBasicScheduleCodec())
NATS JetStream Scheduler

A dedicated scheduler implementation that uses JetStream's native scheduled message delivery (AllowMsgSchedules) instead of polling.

Requires NATS Server 2.12+ with JetStream enabled (nats-server -js). Start() will return ErrNATSServerTooOld if the connected server version is below 2.12, or if the server silently ignores AllowMsgSchedules.

nc, _ := nats.Connect(nats.DefaultURL)
js, _ := jetstream.New(nc)

sched := scheduler.NewNATSScheduler(js, handler,
    scheduler.WithNATSStreamName("SCHEDULER"),
    scheduler.WithNATSSubjectPrefix("scheduler"),
    scheduler.WithNATSConsumerName("scheduler-worker"),
    scheduler.WithNATSSchedulerJobBucket("SCHEDULER_JOBS"),
    scheduler.WithNATSSchedulerExecBucket("SCHEDULER_EXECUTIONS"),
)

Architecture:

NewNATSScheduler
├── JetStream Stream (AllowMsgSchedules)
│   └── Messages with "Nats-Scheduled-Delivery: @at <time>" header
├── JetStream Consumer (durable, work queue)
│   └── Receives triggered messages → executes JobHandler → schedules next run
└── JetStream KV Store
    ├── Job metadata (schedule, status, next/last run)
    └── Execution records (start/end time, duration, errors)

Comparison with standard scheduler:

Standard (NewScheduler) NATS JetStream (NewNATSScheduler)
Triggering Polls every second JetStream delivers at scheduled time
Persistence Pluggable Storage interface JetStream KV Store (built-in)
Distributed Requires external coordination Built-in via consumer ack mechanism
Failover Manual implementation Automatic message redelivery
Scaling Single instance Multiple workers sharing a consumer
Requirements None (or a database) NATS Server 2.12+ with JetStream

Distributed usage:

Multiple workers can share the same NATS scheduler. JetStream's consumer ensures each job trigger is delivered to exactly one worker:

# Terminal 1 — first worker
NATS_URL=nats://localhost:4222 go run .

# Terminal 2 — second worker (same consumer group, auto load-balanced)
NATS_URL=nats://localhost:4222 go run .

Configuration options:

Option Default Description
WithNATSStreamName SCHEDULER JetStream stream name
WithNATSSubjectPrefix scheduler NATS subject prefix for job messages
WithNATSConsumerName scheduler-worker Durable consumer name
WithNATSSchedulerJobBucket SCHEDULER_JOBS KV bucket for job metadata
WithNATSSchedulerExecBucket SCHEDULER_EXECUTIONS KV bucket for execution records
WithNATSSchedulerCodec BasicScheduleCodec Schedule encoder/decoder

Storage Migration

MigrateStorage copies all jobs and execution records from one storage backend to another. It works with any combination of Storage implementations.

func MigrateStorage(ctx context.Context, src Storage, dst Storage) (*MigrateResult, error)
GORM → NATS JetStream Migration
// 1. Stop the old scheduler
oldSched.Stop(ctx)

// 2. Set up source and destination
gormStorage := scheduler.NewGormStorage(db)
gormStorage.Initialize(ctx)

natsStorage := scheduler.NewNATSStorage(js)
natsStorage.Initialize(ctx)

// 3. Migrate — one line
result, err := scheduler.MigrateStorage(ctx, gormStorage, natsStorage)
fmt.Printf("Migrated %d jobs, %d executions\n", result.JobsMigrated, result.ExecutionsMigrated)

// 4. Switch to NATS scheduler — jobs resume automatically from KV
sched := scheduler.NewNATSScheduler(js, handler)
sched.Start(ctx)
Migration Result
type MigrateResult struct {
    JobsMigrated       int  // jobs successfully copied
    JobsSkipped        int  // jobs already in destination (skipped)
    ExecutionsMigrated int  // execution records copied
    ExecutionsSkipped  int  // execution records already in destination (skipped)
}

Key properties:

  • Generic — works between any Storage implementations (GORM→NATS, Memory→GORM, etc.)
  • Idempotent — safe to run multiple times; existing data is skipped
  • Non-destructive — source data is only read, never modified

See the migration example for a complete working demo.

Examples

Example 1: Interval-Based Schedule
schedule, err := scheduler.NewIntervalSchedule(10 * time.Minute)
if err != nil {
    panic(err)
}

metadata := map[string]string{"job_type": "backup"}
if err := sched.AddJob("backup-job", schedule, metadata); err != nil {
    panic(err)
}
Example 2: One-Time Schedule
runAt := time.Now().Add(1 * time.Hour)
schedule, err := scheduler.NewOnceSchedule(runAt)
if err != nil {
    panic(err)
}

if err := sched.AddJob("one-time-task", schedule, map[string]string{"task": "report"}); err != nil {
    panic(err)
}
Example 3: Cron-Based Schedule
Using Cron Expression
// Every Friday at 10:00 AM
schedule, err := scheduler.NewCronSchedule("0 10 * * 5")
if err != nil {
    panic(err)
}

if err := sched.AddJob("weekly-report", schedule, map[string]string{"task": "report"}); err != nil {
    panic(err)
}

// Other cron examples:
// "30 14 * * *"   - Every day at 2:30 PM
// "0 0 1 * *"     - First day of every month at midnight
// "*/5 * * * *"   - Every 5 minutes
// "0 9 * * 1-5"   - Monday to Friday at 9:00 AM
Using CronSpec (Structured API)
// Every Friday at 10:00 AM - using structured fields
spec := &scheduler.CronSpec{
    Minute:    "0",
    Hour:      "10",
    DayOfWeek: "5",  // Friday
}
schedule, err := scheduler.NewCronScheduleFromSpec(spec)
if err != nil {
    panic(err)
}

if err := sched.AddJob("weekly-report", schedule, map[string]string{"task": "report"}); err != nil {
    panic(err)
}

// More examples:
// Every day at 2:30 PM
spec := &scheduler.CronSpec{Minute: "30", Hour: "14"}

// Every 5 minutes
spec := &scheduler.CronSpec{Minute: "*/5"}

// Monday to Friday at 9:00 AM
spec := &scheduler.CronSpec{
    Minute:    "0",
    Hour:      "9",
    DayOfWeek: "1-5",
}
Example 4: Query Execution History
ctx := context.Background()

// Get all executions for a job
executions, err := storage.ListExecutions(ctx, "my-job", nil)

// Query with filters
filterTime := time.Now().Add(-24 * time.Hour)
completedStatus := scheduler.JobStatusCompleted

options := &scheduler.QueryOptions{
    StartTime: &filterTime,           // Only after this time
    Status:    &completedStatus,      // Only completed
    SortBy:    "start_time",          // Sort by start time
    SortDesc:  true,                  // Descending order
    Limit:     10,                    // Max 10 records
    Offset:    0,                     // Starting from first
}

executions, err := storage.ListExecutions(ctx, "my-job", options)
if err != nil {
    panic(err)
}

for _, exec := range executions {
    fmt.Printf("Execution %s: Status=%s, Duration=%s\n",
        exec.ExecutionID, exec.Status, exec.Duration)
}
Example 5: Job with Error Handling
handler := func(ctx context.Context, event scheduler.JobEvent) error {
    if event.ID() != "work-job" {
        return nil
    }

    // Do some work
    if err := doWork(); err != nil {
        // Error will be recorded in execution history
        return fmt.Errorf("work failed: %w", err)
    }
    return nil
}

storage := scheduler.NewMemoryStorage()
sched := scheduler.NewScheduler(storage, handler, scheduler.NewBasicScheduleCodec())

ctx := context.Background()
if err := sched.Start(ctx); err != nil {
    panic(err)
}
defer sched.Stop(ctx)

schedule, err := scheduler.NewIntervalSchedule(15 * time.Minute)
if err != nil {
    panic(err)
}

if err := sched.AddJob("work-job", schedule, map[string]string{"task": "worker"}); err != nil {
    panic(err)
}

// Later, check for failed executions
failedStatus := scheduler.JobStatusFailed
options := &scheduler.QueryOptions{
    Status: &failedStatus,
}

failedExecutions, _ := storage.ListExecutions(ctx, "work-job", options)
for _, exec := range failedExecutions {
    fmt.Printf("Failed: %s - Error: %s\n", exec.ExecutionID, exec.Error)
}
Example 6: Graceful Shutdown
func main() {
    storage := scheduler.NewMemoryStorage()
    runJob := func(ctx context.Context) error {
        fmt.Println("Processing job1 at", time.Now())
        return nil
    }
    handler := func(ctx context.Context, event scheduler.JobEvent) error {
        switch event.ID() {
        case "job1":
            return runJob(ctx)
        }
        return nil
    }

    sched := scheduler.NewScheduler(storage, handler, scheduler.NewBasicScheduleCodec())

    ctx := context.Background()
    if err := sched.Start(ctx); err != nil {
        panic(err)
    }

    // Add jobs...
    schedule, err := scheduler.NewIntervalSchedule(5 * time.Minute)
    if err != nil {
        panic(err)
    }

    if err := sched.AddJob("job1", schedule, map[string]string{"task": "worker"}); err != nil {
        panic(err)
    }

    // Handle shutdown signals
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

    <-sigChan
    fmt.Println("Shutting down gracefully...")

    // Stop will wait for running jobs to complete
    if err := sched.Stop(ctx); err != nil {
        fmt.Printf("Error during shutdown: %v\n", err)
    }

    fmt.Println("Shutdown complete")
}
Example 7: Multiple Database Support
// PostgreSQL
import "gorm.io/driver/postgres"

dsn := "host=localhost user=gorm password=gorm dbname=scheduler port=5432"
db, _ := gorm.Open(postgres.Open(dsn), &gorm.Config{})
storage := scheduler.NewGormStorage(db)

// MySQL
import "gorm.io/driver/mysql"

dsn := "user:pass@tcp(127.0.0.1:3306)/scheduler?charset=utf8mb4"
db, _ := gorm.Open(mysql.Open(dsn), &gorm.Config{})
storage := scheduler.NewGormStorage(db)

// SQLite
import "gorm.io/driver/sqlite"

db, _ := gorm.Open(sqlite.Open("scheduler.db"), &gorm.Config{})
storage := scheduler.NewGormStorage(db)
Example 8: Job Metadata
// Save job with metadata
job := &scheduler.JobData{
    ID:             "report-job",
    ScheduleType:   "cron",
    ScheduleConfig: "0 0 * * *",
    Status:         scheduler.JobStatusPending,
    CreatedAt:      time.Now(),
    UpdatedAt:      time.Now(),
    Metadata: map[string]string{
        "department": "sales",
        "report_type": "monthly",
        "recipients": "team@example.com",
    },
}

storage.SaveJob(ctx, job)

// Retrieve and use metadata
retrieved, _ := storage.GetJob(ctx, "report-job")
department := retrieved.Metadata["department"]
fmt.Printf("Generating %s report for %s\n",
    retrieved.Metadata["report_type"], department)
Example 9: Clean Up Old Executions
// Delete execution records older than 30 days
cutoffTime := time.Now().Add(-30 * 24 * time.Hour)

err := storage.DeleteExecutions(ctx, "my-job", cutoffTime)
if err != nil {
    panic(err)
}

fmt.Println("Old execution records cleaned up")

API Reference

Scheduler Methods
Start
Start(ctx context.Context) error

Starts the scheduler and initializes storage.

Stop
Stop(ctx context.Context) error

Gracefully stops the scheduler, waiting for running jobs to complete.

AddJob
AddJob(id string, schedule Schedule, metadata map[string]string) error

Registers a new job with the scheduler.

Errors:

  • ErrEmptyJobID: Job ID is empty
  • ErrJobAlreadyExists: Job with this ID already exists
  • ErrInvalidInterval: Schedule is nil
  • Additional errors returned by the configured ScheduleCodec or storage implementation
RemoveJob
RemoveJob(id string) error

Removes a job from the scheduler. Waits for the job to complete if running.

Errors:

  • ErrEmptyJobID: Job ID is empty
  • ErrJobNotFound: Job does not exist
GetJob
GetJob(id string) (Job, error)

Retrieves a job by its ID.

ListJobs
ListJobs() []Job

Returns all registered jobs.

IsRunning
IsRunning() bool

Returns whether the scheduler is currently running.

Storage Methods
Initialize
Initialize(ctx context.Context) error

Prepares the storage for use (creates tables, etc.).

Close
Close(ctx context.Context) error

Releases storage resources and closes connections.

SaveJob
SaveJob(ctx context.Context, job *JobData) error

Persists a new job to storage.

UpdateJob
UpdateJob(ctx context.Context, job *JobData) error

Updates an existing job in storage.

DeleteJob
DeleteJob(ctx context.Context, jobID string) error

Removes a job from storage.

ListJobsByStatus
ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)

Returns jobs filtered by status.

Statuses:

  • JobStatusPending: Job is waiting to run
  • JobStatusRunning: Job is currently executing
  • JobStatusCompleted: Job completed successfully
  • JobStatusFailed: Job execution failed
  • JobStatusDisabled: Job is disabled
SaveExecution
SaveExecution(ctx context.Context, record *ExecutionRecord) error

Saves a job execution record.

ListExecutions
ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)

Lists execution records with optional filters.

QueryOptions Fields:

  • Limit: Maximum number of records
  • Offset: Number of records to skip
  • StartTime: Filter records after this time
  • EndTime: Filter records before this time
  • Status: Filter by execution status
  • SortBy: Field to sort by ("start_time", "duration", "end_time")
  • SortDesc: Sort in descending order
HealthCheck
HealthCheck(ctx context.Context) error

Verifies storage connection is healthy.

Data Structures

JobData
type JobData struct {
    ID             string
    ScheduleType   string
    ScheduleConfig string
    Status         JobStatus
    NextRun        time.Time
    LastRun        time.Time
    CreatedAt      time.Time
    UpdatedAt      time.Time
    Metadata       map[string]string
}
ExecutionRecord
type ExecutionRecord struct {
    JobID       string
    ExecutionID string
    StartTime   time.Time
    EndTime     time.Time
    Duration    time.Duration
    Status      JobStatus
    Error       string
    Metadata    map[string]string
}
QueryOptions
type QueryOptions struct {
    Limit     int
    Offset    int
    StartTime *time.Time
    EndTime   *time.Time
    Status    *JobStatus
    SortBy    string
    SortDesc  bool
}

Database Schema (GORM)

scheduler_jobs Table
Column Type Index Description
id string PK Job identifier
schedule_type string Schedule type
schedule_config text Schedule configuration
status string Yes Current job status
next_run timestamp Yes Next execution time
last_run timestamp Last execution time
created_at timestamp Yes Creation time
updated_at timestamp Yes Last update time
metadata text JSON-encoded metadata
scheduler_executions Table
Column Type Index Description
execution_id string PK Execution identifier
job_id string Yes Associated job ID
start_time timestamp Yes Execution start time
end_time timestamp Execution end time
duration int64 Duration in nanoseconds
status string Yes Execution status
error text Error message if failed
metadata text JSON-encoded metadata

Error Handling

Common Errors
var (
    ErrSchedulerNotStarted       = errors.New("scheduler not started")
    ErrSchedulerAlreadyStarted   = errors.New("scheduler already started")
    ErrSchedulerStopped          = errors.New("scheduler stopped")
    ErrStorageNotInitialized     = errors.New("storage not initialized")
    ErrJobNotFound               = errors.New("job not found")
    ErrJobAlreadyExists          = errors.New("job already exists")
    ErrEmptyJobID                = errors.New("job ID cannot be empty")
    ErrInvalidInterval           = errors.New("invalid schedule interval")
    ErrInvalidScheduleTime       = errors.New("invalid schedule time")
    ErrInvalidScheduleConfig     = errors.New("invalid schedule configuration")
    ErrHandlerNotDefined         = errors.New("scheduler handler not defined")
    ErrInvalidJobData            = errors.New("invalid job data")
    ErrJobDataNotFound           = errors.New("job data not found")
    ErrExecutionHistoryNotFound  = errors.New("execution history not found")
    ErrNATSServerTooOld          = errors.New("NATS server does not support scheduled message delivery (requires 2.12+)")
)
Error Handling Example
metadata := map[string]string{"task": "worker"}
if err := sched.AddJob("my-job", schedule, metadata); err != nil {
    switch {
    case errors.Is(err, scheduler.ErrJobAlreadyExists):
        fmt.Println("Job already exists, skipping...")
    case errors.Is(err, scheduler.ErrEmptyJobID):
        fmt.Println("Invalid job ID")
    default:
        panic(err)
    }
}

Best Practices

1. Always Use Context
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

sched.Start(ctx)
2. Handle Shutdown Gracefully
defer func() {
    if err := sched.Stop(ctx); err != nil {
        log.Printf("Error stopping scheduler: %v", err)
    }
}()
3. Use Appropriate Storage
  • Development/Testing: Use MemoryStorage
  • Production (single instance): Use GormStorage with PostgreSQL or MySQL
  • Production (distributed): Use NewNATSScheduler with NATS JetStream
4. Monitor Job Execution
// Periodically check for failed jobs
ticker := time.NewTicker(1 * time.Hour)
go func() {
    for range ticker.C {
        checkFailedJobs(storage)
    }
}()
5. Set Reasonable Intervals
// Good: Reasonable interval
schedule, err := scheduler.NewIntervalSchedule(5 * time.Minute)
if err != nil {
    panic(err)
}

// Avoid: Too frequent, may cause performance issues
if _, err := scheduler.NewIntervalSchedule(100 * time.Millisecond); err != nil {
    panic(err)
}
6. Clean Up Old Executions
cleanupSchedule, err := scheduler.NewIntervalSchedule(24 * time.Hour)
if err != nil {
    panic(err)
}

if err := sched.AddJob("cleanup", cleanupSchedule, map[string]string{"task": "cleanup"}); err != nil {
    panic(err)
}

// Inside your handler:
// switch event.ID() {
// case "cleanup":
//     cutoff := time.Now().Add(-30 * 24 * time.Hour)
//     return storage.DeleteExecutions(ctx, "all-jobs", cutoff)
// }

Testing

Run all tests:

go test -v

Run with coverage:

go test -cover

Generate coverage report:

go test -coverprofile=coverage.out
go tool cover -html=coverage.out

Performance Considerations

  • In-Memory Storage: Fast, suitable for high-frequency jobs
  • GORM Storage: Optimized with indexes on frequently queried fields
  • NATS JetStream: Event-driven delivery eliminates polling overhead; consumer-based distribution scales horizontally
  • Concurrent Execution: Jobs run in separate goroutines
  • Graceful Shutdown: Waits for running jobs to complete

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the APL - see the LICENSE file for details.

Support

For issues, questions, or contributions, please visit the GitHub repository.

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrSchedulerNotStarted indicates the scheduler has not been started
	ErrSchedulerNotStarted = errors.New("scheduler not started")

	// ErrSchedulerAlreadyStarted indicates the scheduler is already running
	ErrSchedulerAlreadyStarted = errors.New("scheduler already started")

	// ErrSchedulerStopped indicates the scheduler has been stopped
	ErrSchedulerStopped = errors.New("scheduler stopped")

	// ErrInvalidInterval indicates an invalid schedule interval
	ErrInvalidInterval = errors.New("invalid schedule interval")

	// ErrInvalidCronExpression indicates an invalid cron expression
	ErrInvalidCronExpression = errors.New("invalid cron expression")

	// ErrInvalidScheduleTime indicates an invalid schedule time
	ErrInvalidScheduleTime = errors.New("invalid schedule time")

	// ErrInvalidScheduleConfig indicates an invalid serialized schedule configuration
	ErrInvalidScheduleConfig = errors.New("invalid schedule configuration")

	// ErrJobNotFound indicates the specified job does not exist
	ErrJobNotFound = errors.New("job not found")

	// ErrJobAlreadyExists indicates a job with the same ID already exists
	ErrJobAlreadyExists = errors.New("job already exists")

	// ErrEmptyJobID indicates the job ID cannot be empty
	ErrEmptyJobID = errors.New("job ID cannot be empty")

	// ErrHandlerNotDefined indicates the scheduler was created without a job handler
	ErrHandlerNotDefined = errors.New("scheduler handler not defined")
)

Pre-defined errors

View Source
var (
	// ErrStorageNotInitialized indicates the storage has not been initialized
	ErrStorageNotInitialized = errors.New("storage not initialized")

	// ErrStorageConnectionFailed indicates failed to connect to storage
	ErrStorageConnectionFailed = errors.New("storage connection failed")

	// ErrStorageOperationFailed indicates a storage operation failed
	ErrStorageOperationFailed = errors.New("storage operation failed")

	// ErrJobDataNotFound indicates the job data does not exist in storage
	ErrJobDataNotFound = errors.New("job data not found")

	// ErrJobDataAlreadyExists indicates the job data already exists in storage
	ErrJobDataAlreadyExists = errors.New("job data already exists")

	// ErrInvalidJobData indicates the job data is invalid or corrupted
	ErrInvalidJobData = errors.New("invalid job data")

	// ErrExecutionHistoryNotFound indicates no execution history found
	ErrExecutionHistoryNotFound = errors.New("execution history not found")
)

Pre-defined storage errors

View Source
var ErrNATSServerTooOld = errors.New("NATS server does not support scheduled message delivery (requires 2.12+)")

ErrNATSServerTooOld indicates the connected NATS server does not support scheduled message delivery (requires NATS 2.12+).

Functions

This section is empty.

Types

type BasicScheduleCodec

type BasicScheduleCodec struct{}

BasicScheduleCodec provides a schedule codec for the built-in schedules.

func NewBasicScheduleCodec

func NewBasicScheduleCodec() *BasicScheduleCodec

NewBasicScheduleCodec creates a new BasicScheduleCodec instance.

func (*BasicScheduleCodec) Decode

func (c *BasicScheduleCodec) Decode(scheduleType string, scheduleConfig string) (Schedule, error)

Decode implements ScheduleCodec.Decode.

func (*BasicScheduleCodec) Encode

func (c *BasicScheduleCodec) Encode(schedule Schedule) (string, string, error)

Encode implements ScheduleCodec.Encode.

type CronSchedule added in v0.0.2

type CronSchedule struct {
	// contains filtered or unexported fields
}

CronSchedule runs a job based on a cron expression. Supports standard cron format: minute hour day month weekday Examples:

  • "0 10 * * 5" - Every Friday at 10:00 AM
  • "30 14 * * *" - Every day at 2:30 PM
  • "0 0 1 * *" - First day of every month at midnight
  • "*/5 * * * *" - Every 5 minutes

func NewCronSchedule added in v0.0.2

func NewCronSchedule(expression string) (*CronSchedule, error)

NewCronSchedule creates a new CronSchedule from a cron expression. The expression follows standard cron format: minute hour day month weekday

func NewCronScheduleFromSpec added in v0.0.2

func NewCronScheduleFromSpec(spec *CronSpec) (*CronSchedule, error)

NewCronScheduleFromSpec creates a new CronSchedule from a CronSpec. This provides a more structured way to define cron schedules without using string expressions. Empty fields default to "*" (any value).

Examples:

  • Every Friday at 10:00 AM: NewCronScheduleFromSpec(&CronSpec{Minute: "0", Hour: "10", DayOfWeek: "5"})
  • Every day at 2:30 PM: NewCronScheduleFromSpec(&CronSpec{Minute: "30", Hour: "14"})
  • Every 5 minutes: NewCronScheduleFromSpec(&CronSpec{Minute: "*/5"})

func (*CronSchedule) Expression added in v0.0.2

func (s *CronSchedule) Expression() string

Expression returns the cron expression string.

func (*CronSchedule) Next added in v0.0.2

func (s *CronSchedule) Next(t time.Time) time.Time

Next returns the next run time based on the cron expression.

type CronSpec added in v0.0.2

type CronSpec struct {
	// Minute (0-59), default "*" (every minute)
	Minute string

	// Hour (0-23), default "*" (every hour)
	Hour string

	// DayOfMonth (1-31), default "*" (every day)
	DayOfMonth string

	// Month (1-12), default "*" (every month)
	Month string

	// DayOfWeek (0-6, Sunday=0), default "*" (every day)
	DayOfWeek string
}

CronSpec defines a cron schedule using structured fields instead of a string expression. This provides a more type-safe and intuitive way to define cron schedules.

Field values can be:

  • Specific number: e.g., "5" for day of week (Friday)
  • Wildcard: "*" for any value
  • Range: "1-5" for Monday to Friday
  • List: "1,3,5" for Monday, Wednesday, Friday
  • Step: "*/5" for every 5 units

Examples:

  • Every Friday at 10:00 AM: {Minute: "0", Hour: "10", DayOfWeek: "5"}
  • Every day at 2:30 PM: {Minute: "30", Hour: "14"}
  • First day of month at midnight: {Minute: "0", Hour: "0", DayOfMonth: "1"}
  • Every 5 minutes: {Minute: "*/5"}

type ExecutionRecord

type ExecutionRecord struct {
	// JobID is the identifier of the job
	JobID string

	// ExecutionID is the unique identifier of this execution
	ExecutionID string

	// StartTime is when the execution started
	StartTime time.Time

	// EndTime is when the execution completed
	EndTime time.Time

	// Duration is the execution duration
	Duration time.Duration

	// Status is the execution result status
	Status JobStatus

	// Error stores the error message if execution failed
	Error string

	// Metadata stores additional execution metadata
	Metadata map[string]string
}

ExecutionRecord represents a single job execution record

type ExecutionRecordModel

type ExecutionRecordModel struct {
	ExecutionID string    `gorm:"primaryKey;size:255"`
	JobID       string    `gorm:"size:255;index"`
	StartTime   time.Time `gorm:"index"`
	EndTime     time.Time
	Duration    int64  // nanoseconds
	Status      string `gorm:"size:20;index"`
	Error       string `gorm:"type:text"`
	Metadata    string `gorm:"type:text"` // JSON-encoded map[string]string
}

ExecutionRecordModel represents the database model for ExecutionRecord

func (ExecutionRecordModel) TableName

func (ExecutionRecordModel) TableName() string

TableName specifies the table name for ExecutionRecordModel

type GormStorage

type GormStorage struct {
	// contains filtered or unexported fields
}

GormStorage is a GORM-based implementation of the Storage interface

Example (Basic)

ExampleGormStorage_basic demonstrates basic usage of GORM storage

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Weedbox/scheduler"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

func main() {
	// Create a database connection (SQLite in-memory for this example)
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	// Create GORM storage
	storage := scheduler.NewGormStorage(db)

	// Initialize storage
	ctx := context.Background()
	if err := storage.Initialize(ctx); err != nil {
		panic(err)
	}

	// Save a job
	job := &scheduler.JobData{
		ID:             "daily-backup",
		ScheduleType:   "cron",
		ScheduleConfig: "0 0 * * *",
		Status:         scheduler.JobStatusPending,
		NextRun:        time.Now().Add(24 * time.Hour),
		CreatedAt:      time.Now(),
		UpdatedAt:      time.Now(),
		Metadata:       map[string]string{"type": "backup"},
	}

	if err := storage.SaveJob(ctx, job); err != nil {
		panic(err)
	}

	// Retrieve the job
	retrieved, err := storage.GetJob(ctx, "daily-backup")
	if err != nil {
		panic(err)
	}

	fmt.Printf("Job ID: %s, Type: %s\n", retrieved.ID, retrieved.Metadata["type"])

	// Clean up
	storage.Close(ctx)

}
Output:
Job ID: daily-backup, Type: backup
Example (ExecutionRecords)

ExampleGormStorage_executionRecords demonstrates saving and querying execution records

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Weedbox/scheduler"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

func main() {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	storage := scheduler.NewGormStorage(db)
	ctx := context.Background()
	if err := storage.Initialize(ctx); err != nil {
		panic(err)
	}

	// Save execution records
	now := time.Now()
	for i := 0; i < 3; i++ {
		record := &scheduler.ExecutionRecord{
			JobID:       "job1",
			ExecutionID: fmt.Sprintf("exec-%d", i),
			StartTime:   now.Add(time.Duration(i) * time.Hour),
			EndTime:     now.Add(time.Duration(i)*time.Hour + 10*time.Minute),
			Duration:    10 * time.Minute,
			Status:      scheduler.JobStatusCompleted,
		}
		if err := storage.SaveExecution(ctx, record); err != nil {
			panic(err)
		}
	}

	// Query executions with limit
	options := &scheduler.QueryOptions{
		Limit:  2,
		SortBy: "start_time",
	}
	executions, err := storage.ListExecutions(ctx, "job1", options)
	if err != nil {
		panic(err)
	}

	fmt.Printf("Found %d executions\n", len(executions))

	storage.Close(ctx)

}
Output:
Found 2 executions
Example (ListJobsByStatus)

ExampleGormStorage_listJobsByStatus demonstrates querying jobs by status

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Weedbox/scheduler"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

func main() {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	storage := scheduler.NewGormStorage(db)
	ctx := context.Background()
	if err := storage.Initialize(ctx); err != nil {
		panic(err)
	}

	// Create jobs with different statuses
	statuses := []scheduler.JobStatus{
		scheduler.JobStatusPending,
		scheduler.JobStatusRunning,
		scheduler.JobStatusPending,
	}

	for i, status := range statuses {
		job := &scheduler.JobData{
			ID:        fmt.Sprintf("job-%d", i),
			Status:    status,
			CreatedAt: time.Now(),
			UpdatedAt: time.Now(),
		}
		if err := storage.SaveJob(ctx, job); err != nil {
			panic(err)
		}
	}

	// Query pending jobs
	pendingJobs, err := storage.ListJobsByStatus(ctx, scheduler.JobStatusPending)
	if err != nil {
		panic(err)
	}

	fmt.Printf("Pending jobs: %d\n", len(pendingJobs))

	storage.Close(ctx)

}
Output:
Pending jobs: 2
Example (WithScheduler)

ExampleGormStorage_withScheduler demonstrates using GORM storage with scheduler

package main

import (
	"context"
	"fmt"

	"github.com/Weedbox/scheduler"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

func main() {
	// Create a database connection
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	// Create GORM storage and scheduler
	storage := scheduler.NewGormStorage(db)
	jobFuncs := make(map[string]func(context.Context) error)
	handler := func(ctx context.Context, event scheduler.JobEvent) error {
		if fn, ok := jobFuncs[event.ID()]; ok && fn != nil {
			return fn(ctx)
		}
		return nil
	}
	sched := scheduler.NewScheduler(storage, handler, scheduler.NewBasicScheduleCodec())

	// Start scheduler (this will initialize storage)
	ctx := context.Background()
	if err := sched.Start(ctx); err != nil {
		panic(err)
	}

	// Note: In a real application, you would add jobs using a Schedule implementation
	// This example demonstrates that the scheduler can work with GORM storage

	// Stop scheduler (this will close storage)
	if err := sched.Stop(ctx); err != nil {
		panic(err)
	}

	fmt.Println("Scheduler started and stopped successfully with GORM storage")

}
Output:
Scheduler started and stopped successfully with GORM storage

func NewGormStorage

func NewGormStorage(db *gorm.DB) *GormStorage

NewGormStorage creates a new GORM storage instance

func (*GormStorage) Close

func (s *GormStorage) Close(ctx context.Context) error

Close releases storage resources

func (*GormStorage) DeleteExecutions

func (s *GormStorage) DeleteExecutions(ctx context.Context, jobID string, before time.Time) error

DeleteExecutions removes execution records older than the specified time

func (*GormStorage) DeleteJob

func (s *GormStorage) DeleteJob(ctx context.Context, jobID string) error

DeleteJob removes job data from storage

func (*GormStorage) GetExecution

func (s *GormStorage) GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)

GetExecution retrieves a specific execution record

func (*GormStorage) GetJob

func (s *GormStorage) GetJob(ctx context.Context, jobID string) (*JobData, error)

GetJob retrieves job data by ID

func (*GormStorage) HealthCheck

func (s *GormStorage) HealthCheck(ctx context.Context) error

HealthCheck verifies the storage connection is healthy

func (*GormStorage) Initialize

func (s *GormStorage) Initialize(ctx context.Context) error

Initialize prepares the storage for use

func (*GormStorage) ListExecutions

func (s *GormStorage) ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)

ListExecutions returns execution records for a job with optional filters

func (*GormStorage) ListJobs

func (s *GormStorage) ListJobs(ctx context.Context) ([]*JobData, error)

ListJobs returns all jobs in storage

func (*GormStorage) ListJobsByStatus

func (s *GormStorage) ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)

ListJobsByStatus returns jobs filtered by status

func (*GormStorage) SaveExecution

func (s *GormStorage) SaveExecution(ctx context.Context, record *ExecutionRecord) error

SaveExecution persists an execution record

func (*GormStorage) SaveJob

func (s *GormStorage) SaveJob(ctx context.Context, job *JobData) error

SaveJob persists job data to storage

func (*GormStorage) UpdateJob

func (s *GormStorage) UpdateJob(ctx context.Context, job *JobData) error

UpdateJob updates existing job data in storage

type IntervalSchedule

type IntervalSchedule struct {
	// contains filtered or unexported fields
}

IntervalSchedule runs a job at fixed intervals.

func NewIntervalSchedule

func NewIntervalSchedule(interval time.Duration) (*IntervalSchedule, error)

NewIntervalSchedule creates a new IntervalSchedule.

func (*IntervalSchedule) Interval

func (s *IntervalSchedule) Interval() time.Duration

Interval returns the configured interval duration.

func (*IntervalSchedule) Next

func (s *IntervalSchedule) Next(t time.Time) time.Time

Next returns the next run time based on the provided time.

type Job

type Job interface {
	// ID returns the unique identifier of the job
	ID() string

	// NextRun returns the next scheduled run time
	NextRun() time.Time

	// LastRun returns the last execution time
	LastRun() time.Time

	// IsRunning returns whether the job is currently executing
	IsRunning() bool

	// Metadata returns the job metadata
	Metadata() map[string]string
}

Job represents a scheduled job configuration

type JobData

type JobData struct {
	// ID is the unique identifier of the job
	ID string

	// ScheduleType indicates the type of schedule (interval, cron, once, etc.)
	ScheduleType string

	// ScheduleConfig stores the schedule configuration as a string
	// For interval: duration string (e.g., "5m", "1h")
	// For cron: cron expression (e.g., "0 0 * * *")
	// For once: timestamp (e.g., "2025-10-02T15:04:05Z")
	ScheduleConfig string

	// Status is the current status of the job
	Status JobStatus

	// NextRun is the next scheduled execution time
	NextRun time.Time

	// LastRun is the last execution time
	LastRun time.Time

	// CreatedAt is the job creation timestamp
	CreatedAt time.Time

	// UpdatedAt is the last update timestamp
	UpdatedAt time.Time

	// Metadata stores additional job metadata as key-value pairs
	Metadata map[string]string
}

JobData represents the persistent data of a scheduled job

type JobDataModel

type JobDataModel struct {
	ID             string    `gorm:"primaryKey;size:255"`
	ScheduleType   string    `gorm:"size:50"`
	ScheduleConfig string    `gorm:"type:text"`
	Status         string    `gorm:"size:20;index"`
	NextRun        time.Time `gorm:"index"`
	LastRun        time.Time
	CreatedAt      time.Time `gorm:"index"`
	UpdatedAt      time.Time `gorm:"index"`
	Metadata       string    `gorm:"type:text"` // JSON-encoded map[string]string
}

JobDataModel represents the database model for JobData

func (JobDataModel) TableName

func (JobDataModel) TableName() string

TableName specifies the table name for JobDataModel

type JobEvent

type JobEvent struct {
	// contains filtered or unexported fields
}

JobEvent represents the context passed to a JobHandler when a job is executed.

func (JobEvent) Delay

func (e JobEvent) Delay() time.Duration

Delay returns the duration between the scheduled time and the handler invocation.

func (JobEvent) ID

func (e JobEvent) ID() string

ID returns the unique identifier of the job associated with this event.

func (JobEvent) Job

func (e JobEvent) Job() Job

Job returns the underlying Job instance for advanced inspection.

func (JobEvent) LastCompletedAt

func (e JobEvent) LastCompletedAt() time.Time

LastCompletedAt returns the completion time of the previous successful execution, or zero if none.

func (JobEvent) Metadata

func (e JobEvent) Metadata() map[string]string

Metadata returns a copy of the job metadata captured at execution time.

func (JobEvent) Name

func (e JobEvent) Name() string

Name returns the job name (alias for ID) associated with this event.

func (JobEvent) Schedule

func (e JobEvent) Schedule() Schedule

Schedule returns the schedule associated with the job when it executed.

func (JobEvent) ScheduledAt

func (e JobEvent) ScheduledAt() time.Time

ScheduledAt returns the time the job was scheduled to execute.

func (JobEvent) StartedAt

func (e JobEvent) StartedAt() time.Time

StartedAt returns the actual time the job handler was invoked.

type JobHandler

type JobHandler func(ctx context.Context, event JobEvent) error

JobHandler handles job execution events.

type JobStatus

type JobStatus string

JobStatus represents the current status of a job

const (
	// JobStatusPending indicates the job is waiting to run
	JobStatusPending JobStatus = "pending"

	// JobStatusRunning indicates the job is currently executing
	JobStatusRunning JobStatus = "running"

	// JobStatusCompleted indicates the job completed successfully
	JobStatusCompleted JobStatus = "completed"

	// JobStatusFailed indicates the job execution failed
	JobStatusFailed JobStatus = "failed"

	// JobStatusReschedulingFailed indicates the handler completed but the
	// scheduler failed to enqueue the next-tick scheduled message. Distinct
	// from JobStatusFailed so observability can tell the two apart.
	JobStatusReschedulingFailed JobStatus = "rescheduling_failed"

	// JobStatusDisabled indicates the job is disabled
	JobStatusDisabled JobStatus = "disabled"
)

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage is an in-memory implementation of the Storage interface

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

NewMemoryStorage creates a new in-memory storage instance

func (*MemoryStorage) Close

func (s *MemoryStorage) Close(ctx context.Context) error

Close releases storage resources

func (*MemoryStorage) DeleteExecutions

func (s *MemoryStorage) DeleteExecutions(ctx context.Context, jobID string, before time.Time) error

DeleteExecutions removes execution records older than the specified time

func (*MemoryStorage) DeleteJob

func (s *MemoryStorage) DeleteJob(ctx context.Context, jobID string) error

DeleteJob removes job data from storage

func (*MemoryStorage) GetExecution

func (s *MemoryStorage) GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)

GetExecution retrieves a specific execution record

func (*MemoryStorage) GetJob

func (s *MemoryStorage) GetJob(ctx context.Context, jobID string) (*JobData, error)

GetJob retrieves job data by ID

func (*MemoryStorage) HealthCheck

func (s *MemoryStorage) HealthCheck(ctx context.Context) error

HealthCheck verifies the storage connection is healthy

func (*MemoryStorage) Initialize

func (s *MemoryStorage) Initialize(ctx context.Context) error

Initialize prepares the storage for use

func (*MemoryStorage) ListExecutions

func (s *MemoryStorage) ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)

ListExecutions returns execution records for a job with optional filters

func (*MemoryStorage) ListJobs

func (s *MemoryStorage) ListJobs(ctx context.Context) ([]*JobData, error)

ListJobs returns all jobs in storage

func (*MemoryStorage) ListJobsByStatus

func (s *MemoryStorage) ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)

ListJobsByStatus returns jobs filtered by status

func (*MemoryStorage) SaveExecution

func (s *MemoryStorage) SaveExecution(ctx context.Context, record *ExecutionRecord) error

SaveExecution persists an execution record

func (*MemoryStorage) SaveJob

func (s *MemoryStorage) SaveJob(ctx context.Context, job *JobData) error

SaveJob persists job data to storage

func (*MemoryStorage) UpdateJob

func (s *MemoryStorage) UpdateJob(ctx context.Context, job *JobData) error

UpdateJob updates existing job data in storage

type MigrateResult added in v0.0.5

type MigrateResult struct {
	JobsMigrated       int
	JobsSkipped        int
	ExecutionsMigrated int
	ExecutionsSkipped  int
}

MigrateResult contains the results of a storage migration.

func MigrateStorage added in v0.0.5

func MigrateStorage(ctx context.Context, src Storage, dst Storage) (*MigrateResult, error)

MigrateStorage copies all jobs and execution records from src to dst. It is idempotent: jobs that already exist in dst are skipped. This function works with any Storage implementations (e.g. GORM→NATS, Memory→GORM, etc).

type NATSSchedulerLogger added in v0.0.8

type NATSSchedulerLogger func(msg string, keysAndValues ...any)

NATSSchedulerLogger is a structured-log callback used to surface errors from best-effort operations (KV writes, next-tick publishes, reconciler scans). Arguments after msg are key/value pairs in the slog style.

The scheduler invokes the logger from background goroutines; the callback must be safe to call concurrently.

type NATSSchedulerOption added in v0.0.5

type NATSSchedulerOption func(*natsSchedulerImpl)

NATSSchedulerOption configures a natsSchedulerImpl instance.

func WithAddJobRetryBudget added in v0.1.0

func WithAddJobRetryBudget(budget time.Duration) NATSSchedulerOption

WithAddJobRetryBudget bounds how long AddJob and UpdateJobSchedule will keep retrying their KV write and scheduled-message publish through a transient cluster hiccup (a raft leader re-election typically finishes well inside the default 5 s budget). A truly unavailable cluster will exhaust the budget and bubble the error back to the caller.

Setting budget <= 0 disables the retry entirely, restoring the previous "fail fast on the first transient error" behaviour.

func WithJetStreamReadyTimeout added in v0.1.1

func WithJetStreamReadyTimeout(timeout time.Duration) NATSSchedulerOption

WithJetStreamReadyTimeout bounds how long Start() waits for the JetStream metaleader to be reachable before issuing the first KV / stream / consumer create.

This is the fix for the multi-instance brand-new-deployment hang. nats.go's AccountInfo documents:

"For clustered topologies, AccountInfo will time out."

Every JetStream API the scheduler uses (CreateOrUpdateKeyValue, CreateOrUpdateStream, CreateOrUpdateConsumer) issues an AccountInfo internally. When several scheduler instances boot against a freshly- started 3-node NATS cluster, the metaleader may still be electing — the API request neither succeeds nor fails fast, it hangs for the caller's ctx deadline. Without this wait, Start() can hang for minutes (typical caller ctx is much longer than 30 s).

timeout <= 0 disables the wait (legacy behaviour). Default 30 s comfortably covers a metaleader election; the moment the metaleader answers, the wait returns.

func WithLoadJobsAsyncPublishTimeout added in v0.3.0

func WithLoadJobsAsyncPublishTimeout(timeout time.Duration) NATSSchedulerOption

WithLoadJobsAsyncPublishTimeout bounds how long loadJobsFromKV waits at the end of Start() for outstanding async PublishMsgAsync calls to be acked. A timeout here does not break correctness — the background reconciler will republish any persisted job whose next-tick message was lost or never acked — but it is logged so operators can spot a cluster that is slow to ack startup publishes.

timeout <= 0 disables the wait (workers exit and Start() returns as soon as the last PublishMsgAsync has been enqueued).

func WithLoadJobsConcurrency added in v0.3.0

func WithLoadJobsConcurrency(n int) NATSSchedulerOption

WithLoadJobsConcurrency sets the number of worker goroutines used by loadJobsFromKV during Start() to parallelize KV reads and async scheduled-message publishes for persisted jobs.

Each worker does a sync jobKV.Get + (optional) jobKV.Update + a fire-and- forget PublishMsgAsync for one job before pulling the next key. The JetStream client itself pipelines the asyncs behind the scenes, so the publish stage is effectively free; this option controls how many KV reads run in parallel.

Higher values cut startup time on deployments with hundreds or thousands of persisted jobs (O(N*RTT) → O(N/concurrency * RTT)). Values below 1 are treated as 1 (no parallelism). The default is 32, comfortable for most clusters; raise it only if you have measured KV.Get latency dominating Start() time.

Note on error visibility: per-message publish ack errors at startup are not logged by the scheduler (the reconciler is the safety net — any job whose scheduled message never landed will be republished within the reconciler interval). If you want per-message visibility, install jetstream.WithPublishAsyncErrHandler on the JetStream client you pass to NewNATSScheduler — that handler fires for every async publish whose ack fails or times out.

func WithNATSConsumerName added in v0.0.5

func WithNATSConsumerName(name string) NATSSchedulerOption

WithNATSConsumerName sets the durable consumer name.

func WithNATSSchedulerCodec added in v0.0.5

func WithNATSSchedulerCodec(codec ScheduleCodec) NATSSchedulerOption

WithNATSSchedulerCodec sets the schedule codec for encoding/decoding schedules.

func WithNATSSchedulerExecBucket added in v0.0.5

func WithNATSSchedulerExecBucket(name string) NATSSchedulerOption

WithNATSSchedulerExecBucket sets the KV bucket name for execution records.

func WithNATSSchedulerJobBucket added in v0.0.5

func WithNATSSchedulerJobBucket(name string) NATSSchedulerOption

WithNATSSchedulerJobBucket sets the KV bucket name for job data.

func WithNATSSchedulerLogger added in v0.0.8

func WithNATSSchedulerLogger(logger NATSSchedulerLogger) NATSSchedulerOption

WithNATSSchedulerLogger installs a logger called when the scheduler hits a non-fatal error in a best-effort path (KV puts, next-tick publishes, and the reconciler). Without this option, those errors are dropped silently.

func WithNATSStreamName added in v0.0.5

func WithNATSStreamName(name string) NATSSchedulerOption

WithNATSStreamName sets the JetStream stream name.

func WithNATSSubjectPrefix added in v0.0.5

func WithNATSSubjectPrefix(prefix string) NATSSchedulerOption

WithNATSSubjectPrefix sets the NATS subject prefix for job messages.

func WithOnReschedulingFailed added in v0.0.8

func WithOnReschedulingFailed(fn RescheduleFailureFunc) NATSSchedulerOption

WithOnReschedulingFailed installs a callback invoked when executeJob fails to enqueue the next-tick scheduled message after exhausting retries. Callers can use this to record a dead-letter, page on-call, or trigger an external reconciler. Without this option, the failure is logged (if a logger is configured) but otherwise silently absorbed.

func WithOnce added in v0.2.0

func WithOnce(fn OnceFunc) NATSSchedulerOption

WithOnce installs a caller-provided distributed-once implementation. By default the scheduler uses an internal JetStream-KV-backed Once. Pass a custom OnceFunc when you already have a distributed lock service (e.g. the one your platform's other modules use) and want the scheduler to share it so all first-deploy provisioning serializes through a single substrate.

func WithOnceKey added in v0.2.0

func WithOnceKey(key string) NATSSchedulerOption

WithOnceKey overrides the key handed to OnceFunc when serializing the scheduler's first-deploy provisioning. Defaults to "scheduler.init". Override when sharing a distributed-lock substrate with other modules and your global naming scheme expects a different value.

func WithOnceLockBucket added in v0.2.0

func WithOnceLockBucket(name string) NATSSchedulerOption

WithOnceLockBucket overrides the JetStream KV bucket name used by the built-in Once. Has no effect when WithOnce has installed a custom implementation. Defaults to SCHEDULER_LOCKS.

func WithPublishRetry added in v0.0.8

func WithPublishRetry(attempts int, initialBackoff time.Duration) NATSSchedulerOption

WithPublishRetry configures how many times executeJob retries the next-tick publishScheduledMessage call when it fails, and the backoff between attempts (doubling between each attempt). attempts < 1 is treated as 1 (no retry).

func WithReconcilerGracePeriod added in v0.0.8

func WithReconcilerGracePeriod(gracePeriod time.Duration) NATSSchedulerOption

WithReconcilerGracePeriod sets the lag tolerance the reconciler applies before treating a job as stuck. A job is republished only if its KV NextRun is older than now - gracePeriod. The default is 30 s, large enough to skip jobs that are mid-tick yet small enough to recover quickly.

func WithReconcilerInterval added in v0.0.8

func WithReconcilerInterval(interval time.Duration) NATSSchedulerOption

WithReconcilerInterval overrides how often the background reconciler scans the job KV and republishes the scheduled message for any recurring job whose NextRun is more than the supplied grace period in the past.

Combined with the stream's Duplicates window and the deterministic Nats-Msg-Id used by publishScheduledMessage, republishing is safe even when the original scheduled message is still alive in the stream: the duplicate is silently suppressed.

The reconciler is enabled by default (defaultReconcilerInterval). Passing interval <= 0 disables it.

func WithStartPhaseTimeout added in v0.1.1

func WithStartPhaseTimeout(timeout time.Duration) NATSSchedulerOption

WithStartPhaseTimeout caps each individual NATS API call inside Start (KV bucket creation, stream creation, backing-stream wait, persisted-job load, consumer creation, initial Consume subscribe). Without this cap a single hung JetStream API request consumes the caller's entire Start context — typical callers pass several minutes, so the hang surfaces 5+ minutes later with no indication of which phase blocked.

Each phase logs entry, exit, and elapsed time through the installed logger, and on failure the returned error is wrapped with the phase name.

timeout <= 0 falls back to using the caller's ctx directly (legacy behaviour, no per-phase bound).

func WithStartupStreamReadyTimeout added in v0.0.9

func WithStartupStreamReadyTimeout(timeout time.Duration) NATSSchedulerOption

WithStartupStreamReadyTimeout bounds how long Start() waits for the SCHEDULER stream and KV backing streams to have a raft leader before proceeding to load persisted jobs.

In multi-node JetStream clusters, CreateOrUpdateStream and CreateOrUpdateKeyValue return as soon as the metaleader has accepted the config — the asset's own raft group may still be electing a leader, so a publishScheduledMessage or KV Keys/Get call issued in that window returns nats: no responders available for request. Without this wait, loadJobsFromKV would silently or noisily drop the first startup publish for every persisted job until the chain naturally re-fires (it doesn't, because the chain is the publish we just lost).

timeout <= 0 disables the wait. Single-node JetStream (no Cluster info) is treated as ready immediately.

func WithStreamDuplicatesWindow added in v0.0.6

func WithStreamDuplicatesWindow(window time.Duration) NATSSchedulerOption

WithStreamDuplicatesWindow overrides the duration the stream tracks Nats-Msg-Id values for deduplication. Larger values catch republishes that arrive long after the original publish (e.g. a peer that restarts hours later), at the cost of more server-side state.

type NATSStorage added in v0.0.5

type NATSStorage struct {
	// contains filtered or unexported fields
}

NATSStorage implements the Storage interface using NATS JetStream KV Store.

func NewNATSStorage added in v0.0.5

func NewNATSStorage(js jetstream.JetStream, opts ...NATSStorageOption) *NATSStorage

NewNATSStorage creates a new NATSStorage instance.

func (*NATSStorage) Close added in v0.0.5

func (s *NATSStorage) Close(ctx context.Context) error

Close marks the storage as uninitialized.

func (*NATSStorage) DeleteExecutions added in v0.0.5

func (s *NATSStorage) DeleteExecutions(ctx context.Context, jobID string, before time.Time) error

DeleteExecutions removes execution records for a job that are older than the specified time.

func (*NATSStorage) DeleteJob added in v0.0.5

func (s *NATSStorage) DeleteJob(ctx context.Context, jobID string) error

DeleteJob removes a job from KV storage.

func (*NATSStorage) GetExecution added in v0.0.5

func (s *NATSStorage) GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)

GetExecution retrieves a specific execution record by ID.

func (*NATSStorage) GetJob added in v0.0.5

func (s *NATSStorage) GetJob(ctx context.Context, jobID string) (*JobData, error)

GetJob retrieves a job by ID from KV storage.

func (*NATSStorage) HealthCheck added in v0.0.5

func (s *NATSStorage) HealthCheck(ctx context.Context) error

HealthCheck verifies the storage is initialized and the KV bucket is accessible.

func (*NATSStorage) Initialize added in v0.0.5

func (s *NATSStorage) Initialize(ctx context.Context) error

Initialize creates the KV buckets and prepares the storage for use. The CreateOrUpdateKeyValue calls are wrapped in Once so concurrent first-deploy peers do not race the JetStream reply path (see once.go for the failure mode). After Once succeeds, each peer — leader or follower — picks up the bucket handle via a plain metadata lookup that cannot hang.

Before any JetStream call, Initialize gates on waitForJetStreamReady so a freshly-started cluster whose metaleader is still electing produces a bounded wait rather than the multi-minute hang nats.go's AccountInfo documents on clustered topologies.

func (*NATSStorage) ListExecutions added in v0.0.5

func (s *NATSStorage) ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)

ListExecutions returns execution records for a job with optional filtering, sorting, and pagination.

func (*NATSStorage) ListJobs added in v0.0.5

func (s *NATSStorage) ListJobs(ctx context.Context) ([]*JobData, error)

ListJobs returns all jobs from KV storage.

func (*NATSStorage) ListJobsByStatus added in v0.0.5

func (s *NATSStorage) ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)

ListJobsByStatus returns jobs filtered by status from KV storage.

func (*NATSStorage) SaveExecution added in v0.0.5

func (s *NATSStorage) SaveExecution(ctx context.Context, record *ExecutionRecord) error

SaveExecution persists an execution record to KV storage.

func (*NATSStorage) SaveJob added in v0.0.5

func (s *NATSStorage) SaveJob(ctx context.Context, job *JobData) error

SaveJob persists a new job to KV storage.

func (*NATSStorage) UpdateJob added in v0.0.5

func (s *NATSStorage) UpdateJob(ctx context.Context, job *JobData) error

UpdateJob updates an existing job in KV storage.

type NATSStorageOption added in v0.0.5

type NATSStorageOption func(*NATSStorage)

NATSStorageOption configures a NATSStorage instance.

func WithNATSStorageExecBucket added in v0.0.5

func WithNATSStorageExecBucket(name string) NATSStorageOption

WithNATSStorageExecBucket sets the KV bucket name for execution records.

func WithNATSStorageJetStreamReadyTimeout added in v0.2.0

func WithNATSStorageJetStreamReadyTimeout(timeout time.Duration) NATSStorageOption

WithNATSStorageJetStreamReadyTimeout bounds how long Initialize waits for the JetStream metaleader to be reachable before issuing the first KV create. Same rationale as the scheduler's WithJetStreamReadyTimeout: every CreateOrUpdateKeyValue call issues AccountInfo internally, which nats.go documents will time out on clustered topologies if the metaleader is still electing. Zero disables the wait (legacy behaviour). Defaults to 30 s.

func WithNATSStorageJobBucket added in v0.0.5

func WithNATSStorageJobBucket(name string) NATSStorageOption

WithNATSStorageJobBucket sets the KV bucket name for job data.

func WithNATSStorageOnce added in v0.2.0

func WithNATSStorageOnce(fn OnceFunc) NATSStorageOption

WithNATSStorageOnce installs a distributed-once implementation used to serialize the first-time CreateOrUpdateKeyValue calls in Initialize across processes. Storage and Scheduler use distinct keys (defaultStorageOnceKey vs. defaultSchedulerOnceKey) because their fn bodies provision different resource sets; sharing the same OnceFunc across both is fine (recommended in cluster deployments so both components serialize through the same lock substrate), but the keys must stay distinct or one side's provisioning gets silently skipped.

When unset, Initialize uses an internal JetStream-KV-backed Once with the same default lock bucket name (SCHEDULER_LOCKS) as NATSScheduler, so the two still share a single lock bucket across the cluster.

func WithNATSStorageOnceKey added in v0.2.0

func WithNATSStorageOnceKey(key string) NATSStorageOption

WithNATSStorageOnceKey overrides the key this package hands to its OnceFunc. Defaults to defaultStorageOnceKey ("scheduler.storage-init"). Override when sharing a distributed-lock substrate with other modules and your global naming scheme expects a different value. Keep the value distinct from NATSScheduler's WithOnceKey so each component's provisioning runs (see WithNATSStorageOnce).

func WithNATSStorageOnceLockBucket added in v0.2.0

func WithNATSStorageOnceLockBucket(name string) NATSStorageOption

WithNATSStorageOnceLockBucket overrides the JetStream KV bucket name used by the built-in Once. Has no effect when WithNATSStorageOnce installed a custom implementation. Defaults to SCHEDULER_LOCKS — the same default as NATSScheduler so both serialize against one bucket out of the box.

type OnceFunc added in v0.2.0

type OnceFunc func(ctx context.Context, key string, fn func(context.Context) error) error

OnceFunc serializes execution of fn across processes that share the same distributed-coordination substrate. The contract:

  • Exactly one caller (per key, per fn-success cycle) runs fn.
  • Concurrent callers block until that fn returns, then return nil.
  • Subsequent callers (after fn has succeeded) return nil without running fn.

Implementations may treat the "fn has succeeded" memory as bounded (e.g. expiring after some TTL); callers must therefore make fn idempotent so a delayed late-joiner that re-runs fn does not corrupt state. CreateOrUpdateKeyValue / CreateOrUpdateStream are idempotent in this sense, which is why Once is the right primitive for serializing scheduler bootstrap.

Callers can inject a custom OnceFunc via WithOnce to share a lock substrate with other modules; otherwise the scheduler uses its own built-in JetStream-KV-backed implementation.

type OnceSchedule

type OnceSchedule struct {
	// contains filtered or unexported fields
}

OnceSchedule runs a job only once at a specific time.

func NewOnceSchedule

func NewOnceSchedule(runTime time.Time) (*OnceSchedule, error)

NewOnceSchedule creates a new OnceSchedule.

func (*OnceSchedule) Next

func (s *OnceSchedule) Next(t time.Time) time.Time

Next returns the scheduled run time if it's still in the future, otherwise a far future time.

func (*OnceSchedule) RunAt

func (s *OnceSchedule) RunAt() time.Time

RunAt returns the configured run time.

type QueryOptions

type QueryOptions struct {
	// Limit limits the number of records to return
	Limit int

	// Offset skips the specified number of records
	Offset int

	// StartTime filters records after this time
	StartTime *time.Time

	// EndTime filters records before this time
	EndTime *time.Time

	// Status filters records by status
	Status *JobStatus

	// SortBy specifies the field to sort by (e.g., "start_time", "duration")
	SortBy string

	// SortDesc specifies descending sort order
	SortDesc bool
}

QueryOptions defines options for querying execution records

type RescheduleFailureFunc added in v0.0.8

type RescheduleFailureFunc func(jobID string, nextRun time.Time, err error)

RescheduleFailureFunc is invoked when executeJob has run the handler successfully (or with error) but failed to enqueue the next-tick scheduled message even after retries. It is the hook callers use to plug in a dead-letter or out-of-band reconciler. nextRun is the scheduled time of the publish that failed.

Invoked from a background goroutine; the callback must be safe to call concurrently and should not block for long.

type Schedule

type Schedule interface {
	// Next calculates the next run time based on the current time
	Next(t time.Time) time.Time
}

Schedule defines the scheduling strategy for a job

type ScheduleBuilder

type ScheduleBuilder interface {
	// Every creates a schedule that runs at fixed intervals
	Every(duration time.Duration) (Schedule, error)

	// Cron creates a schedule from a cron expression
	Cron(expression string) (Schedule, error)

	// At creates a schedule that runs at a specific time daily
	At(hour, minute, second int) (Schedule, error)

	// Once creates a schedule that runs only once at the specified time
	Once(t time.Time) (Schedule, error)
}

ScheduleBuilder provides methods to create common scheduling strategies

type ScheduleCodec

type ScheduleCodec interface {
	// Encode converts a schedule into a serializable representation
	Encode(schedule Schedule) (scheduleType string, scheduleConfig string, err error)

	// Decode reconstructs a schedule from its serialized representation
	Decode(scheduleType string, scheduleConfig string) (Schedule, error)
}

ScheduleCodec encodes and decodes schedules for persistence

type Scheduler

type Scheduler interface {
	// Start begins the scheduler execution
	Start(ctx context.Context) error

	// WaitUntilRunning blocks until the scheduler has fully started or the context is done
	WaitUntilRunning(ctx context.Context) error

	// Stop gracefully shuts down the scheduler
	Stop(ctx context.Context) error

	// AddJob registers a new job with the scheduler
	AddJob(id string, schedule Schedule, metadata map[string]string) error

	// UpdateJobSchedule replaces the schedule for an existing job
	UpdateJobSchedule(id string, schedule Schedule) error

	// RemoveJob removes a job from the scheduler
	RemoveJob(id string) error

	// GetJob retrieves a job by its ID
	GetJob(id string) (Job, error)

	// ListJobs returns all registered jobs
	ListJobs() []Job

	// IsRunning returns whether the scheduler is currently running
	IsRunning() bool
}

Scheduler defines the interface for job scheduling operations

func NewNATSScheduler added in v0.0.5

func NewNATSScheduler(js jetstream.JetStream, handler JobHandler, opts ...NATSSchedulerOption) Scheduler

NewNATSScheduler creates a new Scheduler backed by NATS JetStream. Requires NATS Server 2.12+ with JetStream enabled and AllowMsgSchedules support.

The scheduler uses:

  • A JetStream stream with scheduled delivery for triggering jobs at their scheduled times
  • A JetStream KV Store for persisting job metadata and execution records
  • A durable consumer for reliable message consumption with automatic failover

Example:

nc, _ := nats.Connect(nats.DefaultURL)
js, _ := jetstream.New(nc)
s := scheduler.NewNATSScheduler(js, handler)
s.Start(ctx)

func NewScheduler

func NewScheduler(storage Storage, handler JobHandler, codec ScheduleCodec) Scheduler

NewScheduler creates a new Scheduler instance

type StartAtIntervalSchedule

type StartAtIntervalSchedule struct {
	// contains filtered or unexported fields
}

StartAtIntervalSchedule runs a job at a fixed interval starting from a specific time.

func NewStartAtIntervalSchedule

func NewStartAtIntervalSchedule(startAt time.Time, interval time.Duration) (*StartAtIntervalSchedule, error)

NewStartAtIntervalSchedule creates a new StartAtIntervalSchedule.

func (*StartAtIntervalSchedule) Interval

func (s *StartAtIntervalSchedule) Interval() time.Duration

Interval returns the configured interval.

func (*StartAtIntervalSchedule) Next

Next returns the next run time based on the configured start time and interval.

func (*StartAtIntervalSchedule) StartAt

func (s *StartAtIntervalSchedule) StartAt() time.Time

StartAt returns the configured start time.

type Storage

type Storage interface {
	// Initialize prepares the storage for use
	Initialize(ctx context.Context) error

	// Close releases storage resources
	Close(ctx context.Context) error

	// SaveJob persists job data to storage
	SaveJob(ctx context.Context, job *JobData) error

	// UpdateJob updates existing job data in storage
	UpdateJob(ctx context.Context, job *JobData) error

	// DeleteJob removes job data from storage
	DeleteJob(ctx context.Context, jobID string) error

	// GetJob retrieves job data by ID
	GetJob(ctx context.Context, jobID string) (*JobData, error)

	// ListJobs returns all jobs in storage
	ListJobs(ctx context.Context) ([]*JobData, error)

	// ListJobsByStatus returns jobs filtered by status
	ListJobsByStatus(ctx context.Context, status JobStatus) ([]*JobData, error)

	// SaveExecution persists an execution record
	SaveExecution(ctx context.Context, record *ExecutionRecord) error

	// GetExecution retrieves a specific execution record
	GetExecution(ctx context.Context, executionID string) (*ExecutionRecord, error)

	// ListExecutions returns execution records for a job with optional filters
	ListExecutions(ctx context.Context, jobID string, options *QueryOptions) ([]*ExecutionRecord, error)

	// DeleteExecutions removes execution records older than the specified time
	DeleteExecutions(ctx context.Context, jobID string, before time.Time) error

	// HealthCheck verifies the storage connection is healthy
	HealthCheck(ctx context.Context) error
}

Storage defines the interface for persistent storage operations

Directories

Path Synopsis
examples
cron_example command
cron_with_gorm command
nats_jetstream command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL