scheduler

package
v0.0.41 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

README

Scheduler Module

A common weedbox job scheduler module built on top of github.com/Weedbox/scheduler and wired into Uber Fx. Supports two backends:

Mode Backend Deployment
gorm *gorm.DB + in-memory timer Single node only
nats NATS 2.12+ JetStream Scheduled Message Delivery Multi-instance with KV-backed persistence

The public API is identical in both modes — only scheduler.mode in config changes.

⚠️ Important: gorm mode uses an in-memory timer inside the process. Running multiple replicas against the same database will cause every job to fire on every replica. Do not deploy gorm mode with more than one instance. Use nats mode for multi-node deployments.

Installation

go get github.com/weedbox/common-modules/scheduler

Configuration

[scheduler]
mode = "gorm"      # or "nats"

# Only relevant in nats mode
[scheduler.nats]
streamName    = "SCHEDULER"
subjectPrefix = "scheduler"
consumerName  = "scheduler-worker"
jobBucket     = "SCHEDULER_JOBS"
execBucket    = "SCHEDULER_EXECUTIONS"

# Optional tuning knobs (omit to use upstream defaults)
# duplicatesWindow         = "24h"   # stream-level dedup window
# reconcilerInterval       = "30s"   # background republish reconciler
# reconcilerGracePeriod    = "30s"   # lag tolerated before a job is treated as stuck
# addJobRetryBudget        = "5s"    # AddJob/Update retry budget across raft hiccups
# startupStreamReadyTimeout = "30s"  # wait for SCHEDULER stream raft leader
# jetStreamReadyTimeout    = "30s"   # wait for JetStream metaleader
# startPhaseTimeout        = "30s"   # per-phase cap inside Start()
# loadJobsConcurrency      = 32      # worker pool size for parallel KV reads at startup
# loadJobsAsyncPublishTimeout = "30s" # cap for draining startup async publishes
# onceKey                  = "scheduler.init"  # key used on the shared nats_connector lock bucket
# [scheduler.nats.publishRetry]
# attempts        = 3
# initialBackoff  = "1s"

In gorm mode the module also needs a database.DatabaseConnector to be provided (e.g. sqlite_connector or postgres_connector). In nats mode it needs nats_connector; the scheduler reuses the connector's distributed-once primitive (NATSConnector.Once) so first-deploy provisioning serializes through the same lock substrate as every other module.

Quick Start

GORM Mode (single node)
package main

import (
    "context"
    "time"

    libsched "github.com/Weedbox/scheduler"
    "github.com/weedbox/common-modules/logger"
    "github.com/weedbox/common-modules/scheduler"
    "github.com/weedbox/common-modules/sqlite_connector"
    "go.uber.org/fx"
)

func main() {
    fx.New(
        logger.Module(),
        sqlite_connector.Module("sqlite"),
        scheduler.Module("scheduler"),

        // Set the global job handler. Users dispatch on their own, typically
        // by switching on event.ID() or a metadata field.
        fx.Invoke(func(s *scheduler.SchedulerModule) {
            s.SetHandler(func(ctx context.Context, e libsched.JobEvent) error {
                switch e.ID() {
                case "daily_cleanup":
                    return doCleanup(ctx)
                case "hourly_metrics":
                    return flushMetrics(ctx)
                }
                return nil
            })
        }),

        // Idempotent registration — safe to call on every startup.
        fx.Invoke(func(s *scheduler.SchedulerModule) error {
            sch, _ := libsched.NewCronSchedule("0 3 * * *")
            return s.EnsureJob("daily_cleanup", sch, nil)
        }),
        fx.Invoke(func(s *scheduler.SchedulerModule) error {
            sch, _ := libsched.NewIntervalSchedule(time.Hour)
            return s.EnsureJob("hourly_metrics", sch, nil)
        }),
    ).Run()
}

func doCleanup(ctx context.Context) error  { /* ... */ return nil }
func flushMetrics(ctx context.Context) error { /* ... */ return nil }
NATS / JetStream Mode
fx.New(
    logger.Module(),
    nats_connector.Module("nats"),
    scheduler.Module("scheduler"),
    // ... same SetHandler / EnsureJob invokes as above
)

Job definitions are persisted in a JetStream KV bucket and scheduled deliveries are published to a JetStream stream. On startup the scheduler reloads jobs from KV and lets the shared durable consumer fan triggers out to whichever instance is free; stale messages left by a previous run are filtered against KV state in the message handler rather than purged from the stream. This keeps single-instance restarts and multi-instance load splitting on the same code path.

Multi-instance deployments. Multiple scheduler replicas can run against the same stream concurrently. First-deploy provisioning of the stream, KV buckets, and durable consumer is serialized through a distributed-once primitive (sourced from nats_connector), and the durable consumer's work-queue semantics ensure each trigger is delivered to exactly one replica. Recurring chains survive replica crashes, leader re-elections, and rolling restarts; a background reconciler republishes any job whose next-run has slipped past its grace window.

API

// Set the single global job handler. Typically called once at startup.
// Calling SetHandler after the scheduler has started is allowed; subsequent
// events will be routed to the new handler.
func (m *SchedulerModule) SetHandler(h libsched.JobHandler)

// EnsureJob registers a job idempotently.
// - If the ID does not exist, it is added.
// - If the ID exists, its schedule is updated (no-op if unchanged).
// Safe to call from fx.Invoke — the operation is queued until the scheduler
// has started.
func (m *SchedulerModule) EnsureJob(id string, schedule libsched.Schedule, metadata map[string]string) error

// SubmitJob adds a one-shot or dynamically created job. Returns
// libsched.ErrJobAlreadyExists if the ID collides. Also supports the
// pending-queue pattern used by EnsureJob.
func (m *SchedulerModule) SubmitJob(id string, schedule libsched.Schedule, metadata map[string]string) error

// Lookup / removal / listing — valid after the module has started.
func (m *SchedulerModule) RemoveJob(id string) error
func (m *SchedulerModule) GetJob(id string) (libsched.Job, error)
func (m *SchedulerModule) ListJobs() []libsched.Job

// Escape hatch — direct access to the underlying library scheduler.
func (m *SchedulerModule) GetScheduler() libsched.Scheduler
Static vs. Dynamic Jobs

Two patterns that cover most use cases:

Static jobs — registered at startup with compile-time constant IDs. Use EnsureJob; repeat calls are harmless, so a service can always register its cron-like jobs on boot without worrying about duplicates.

const JobDailyCleanup = "daily_cleanup"

sch, _ := libsched.NewCronSchedule("0 3 * * *")
_ = sm.EnsureJob(JobDailyCleanup, sch, nil)

Dynamic jobs — created at runtime with unique IDs, e.g. reminders, delayed tasks. Use SubmitJob:

func scheduleReminder(sm *scheduler.SchedulerModule, userID string, at time.Time) error {
    sch, _ := libsched.NewOnceSchedule(at)
    return sm.SubmitJob(
        fmt.Sprintf("reminder-%s-%d", userID, at.Unix()),
        sch,
        map[string]string{"kind": "reminder", "user_id": userID},
    )
}

In the global handler you dispatch dynamic jobs by metadata or ID prefix:

sm.SetHandler(func(ctx context.Context, e libsched.JobEvent) error {
    if kind := e.Metadata()["kind"]; kind == "reminder" {
        return sendReminder(ctx, e.Metadata()["user_id"])
    }
    switch e.ID() {
    case JobDailyCleanup:
        return doCleanup(ctx)
    }
    return nil
})

The module itself is intentionally unopinionated about how you classify jobs — you choose the convention (ID prefix, metadata key, etc.).

Schedule Types

Schedule constructors live in the underlying library; import libsched "github.com/Weedbox/scheduler" and use:

  • libsched.NewIntervalSchedule(d) — every d
  • libsched.NewCronSchedule(expr) — cron expression
  • libsched.NewOnceSchedule(t) — one-shot at time t
  • libsched.NewStartAtIntervalSchedule(start, d) — begin at start, every d after

Deployment Notes

  • gorm mode is for single-node deployments (a worker, a single-process service, a CLI). Do not scale horizontally — timers fire independently in every process.
  • nats mode requires NATS Server 2.12 or later with JetStream enabled. Startup will fail with ErrNATSServerTooOld on older servers.
  • On startup in NATS mode the library reloads jobs from the KV bucket and filters stale in-flight scheduled messages against the persisted state (rather than purging the stream), so a crashed run does not duplicate triggers and a healthy peer is not disrupted.
  • nats mode supports multi-instance deployments. Replicas share the same durable consumer; each scheduled message is delivered to exactly one replica. First-deploy provisioning is serialized through nats_connector.Once. The scheduler's background reconciler republishes any recurring job whose next-run has slipped past nats.reconcilerGracePeriod to recover from publish failures or unclean shutdowns.

Documentation

Index

Constants

View Source
const (
	ModeGorm = "gorm"
	ModeNATS = "nats"

	DefaultMode          = ModeGorm
	DefaultStreamName    = "SCHEDULER"
	DefaultSubjectPrefix = "scheduler"
	DefaultConsumerName  = "scheduler-worker"
	DefaultJobBucket     = "SCHEDULER_JOBS"
	DefaultExecBucket    = "SCHEDULER_EXECUTIONS"
)

Variables

View Source
var ErrModeNotConfigured = errors.New("scheduler: required dependency not provided for selected mode")

ErrModeNotConfigured indicates required dependency for the selected mode is missing.

Functions

func Module

func Module(scope string) fx.Option

Types

type Params

type Params struct {
	fx.In

	Lifecycle fx.Lifecycle
	Logger    *zap.Logger
	NATS      *nats_connector.NATSConnector `optional:"true"`
	DB        database.DatabaseConnector    `optional:"true"`
}

type SchedulerModule

type SchedulerModule struct {
	// contains filtered or unexported fields
}

func (*SchedulerModule) EnsureJob

func (m *SchedulerModule) EnsureJob(id string, schedule libsched.Schedule, metadata map[string]string) error

EnsureJob registers a job idempotently. If the job ID already exists, its schedule is updated. If it does not exist, the job is added. Safe to call from fx.Invoke: if the scheduler has not started yet, the operation is queued and applied during OnStart.

func (*SchedulerModule) GetJob

func (m *SchedulerModule) GetJob(id string) (libsched.Job, error)

GetJob looks up a job by ID. Only valid after the scheduler has started.

func (*SchedulerModule) GetScheduler

func (m *SchedulerModule) GetScheduler() libsched.Scheduler

GetScheduler returns the underlying library Scheduler. Use only when the wrapper API is insufficient. Returns nil if called before OnStart.

func (*SchedulerModule) ListJobs

func (m *SchedulerModule) ListJobs() []libsched.Job

ListJobs returns all registered jobs. Returns nil if the scheduler has not started yet.

func (*SchedulerModule) RemoveJob

func (m *SchedulerModule) RemoveJob(id string) error

RemoveJob removes a job by ID. Only valid after the scheduler has started.

func (*SchedulerModule) SetHandler

func (m *SchedulerModule) SetHandler(h libsched.JobHandler)

SetHandler sets the global job handler. Typically called inside fx.Invoke at startup. Multiple calls overwrite the previous handler.

func (*SchedulerModule) SubmitJob

func (m *SchedulerModule) SubmitJob(id string, schedule libsched.Schedule, metadata map[string]string) error

SubmitJob adds a one-shot or dynamically created job. Returns libsched.ErrJobAlreadyExists if the ID collides. Also supports being called before OnStart via the pending queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL