scheduler

package
v1.115.0
Published: May 12, 2026 | License: Apache-2.0 | Imports: 37 | Imported by: 0

Documentation

Overview

ChatCLI - Scheduler: append-only JSONL audit log.

Every scheduler mutation (create, transition, cancel, fire) writes a line to <dir>/audit.log. Operators and compliance auditors consume this file via their existing log pipelines. The file is plain JSONL (one JSON object per line) to keep parsers simple.

Rotation: lumberjack handles size-based rolling. Default 10 MiB per file, 7 backups kept. Configurable via Config.AuditMaxSizeMB.

ChatCLI - Scheduler: circuit breaker.

A breaker protects the scheduler from a cascading failure mode: the k8s API is down → every k8s_resource_ready evaluator times out → hundreds of jobs stack up polling, saturating the worker pool and preventing unrelated jobs from making progress.

Breaker states (classic three-state pattern):

    Closed    calls pass through. Failures are counted. When the count
              crosses the threshold within a sliding window, trip to Open.
    Open      calls short-circuit with ErrBreakerOpen. After cooldown
              elapses, allow one probe by transitioning to HalfOpen.
    HalfOpen  exactly one probe permitted; subsequent callers
              short-circuit until the probe result arrives.
                probe success → Closed
                probe failure → Open (cooldown restarted)

Granularity: one breaker per (kind, key), e.g. one for the k8s_resource_ready evaluator type as a whole, another for each shell action's command class. The breakerGroup holds them.
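The three states described above can be sketched in a few dozen lines. This is a minimal illustration, not the package's breaker: the `breaker` type, its field names, and the `allow`/`record` methods are hypothetical, and only `ErrBreakerOpen` is taken from the documented API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrBreakerOpen mirrors the sentinel named in the package docs.
var ErrBreakerOpen = errors.New("scheduler: circuit breaker open")

type state int

const (
	closed state = iota
	open
	halfOpen
)

// breaker is a hypothetical three-state breaker; names are illustrative.
type breaker struct {
	mu        sync.Mutex
	st        state
	failures  []time.Time // failure timestamps inside the sliding window
	threshold int
	window    time.Duration
	cooldown  time.Duration
	openedAt  time.Time
}

// allow reports whether a call may proceed at time now.
func (b *breaker) allow(now time.Time) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	switch b.st {
	case open:
		if now.Sub(b.openedAt) < b.cooldown {
			return ErrBreakerOpen
		}
		b.st = halfOpen // cooldown elapsed: admit exactly one probe
		return nil
	case halfOpen:
		return ErrBreakerOpen // a probe is already in flight
	}
	return nil
}

// record feeds the outcome of an admitted call back into the breaker.
func (b *breaker) record(now time.Time, ok bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.st == halfOpen {
		if ok {
			b.st, b.failures = closed, nil
		} else {
			b.st, b.openedAt = open, now // restart the cooldown
		}
		return
	}
	if ok || b.st == open {
		return
	}
	// Drop failures that fell out of the window, then count this one.
	kept := b.failures[:0]
	for _, t := range b.failures {
		if now.Sub(t) < b.window {
			kept = append(kept, t)
		}
	}
	b.failures = append(kept, now)
	if len(b.failures) >= b.threshold {
		b.st, b.openedAt = open, now
	}
}

func main() {
	b := &breaker{threshold: 2, window: time.Minute, cooldown: 30 * time.Second}
	t0 := time.Unix(0, 0)
	b.record(t0, false)
	b.record(t0.Add(time.Second), false)           // second failure trips the breaker
	fmt.Println(b.allow(t0.Add(2 * time.Second)))  // still cooling down
	fmt.Println(b.allow(t0.Add(40 * time.Second))) // probe admitted
}
```

Passing `now` explicitly keeps the sketch deterministic under test; a production breaker would more likely read the clock itself.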

ChatCLI - Scheduler: CLIBridge interface.

The scheduler lives inside the cli package tree but deliberately does NOT import the top-level cli.ChatCLI struct. Doing so would create a circular dependency with command_handler.go, cli_completer.go, agent_mode.go, and everything else that will in turn want to call the scheduler.

Instead, everything the scheduler needs from the host process is abstracted behind CLIBridge. The implementation lives in cli/scheduler_bridge.go (top-level cli package) and is constructed at NewChatCLI time. Unit tests provide a mock implementation.

The bridge surface is deliberately wide: a narrow surface would force us to duplicate logic, and the scheduler is already a privileged subsystem (it can fire any slash command the user can).

ChatCLI - Scheduler: IPC client.

RemoteClient wraps a UNIX-socket connection and exposes methods that mirror the in-process Scheduler API. The chatcli CLI uses this when a daemon is detected on Start.

The connection is persistent; reads and writes are serialized. When the connection drops, every pending request gets ErrNoDaemon and the CLI falls back to in-process mode (configurable).

ChatCLI - Scheduler: autocomplete suggestions.

The top-level cli/cli_completer.go calls Suggest(...) to fetch /schedule, /wait and /jobs suggestions. Returning plain strings rather than go-prompt.Suggest structs keeps this package free of a TUI library dependency.

ChatCLI - Scheduler: configuration.

All operator-tunable knobs live here. Environment variables are resolved once at Scheduler construction (via LoadConfigFromEnv); subsequent changes require a restart. This is on purpose, so a mid-flight `/reload` doesn't mutate the rate-limiter or the data dir under a running queue.

Env vars (prefixed CHATCLI_SCHEDULER_):

    ENABLED                    bool   master kill switch (default true)
    DATA_DIR                   path   WAL + audit base dir (default ~/.chatcli/scheduler)
    MAX_JOBS                   int    max non-terminal jobs at once (default 256)
    DEFAULT_ACTION_TIMEOUT     dur    default Action timeout (default 5m)
    DEFAULT_POLL_INTERVAL      dur    default wait poll (default 5s)
    DEFAULT_WAIT_TIMEOUT       dur    default wait timeout (default 30m)
    DEFAULT_MAX_POLLS          int    default wait poll cap (default 0=unlim)
    DEFAULT_BACKOFF_INITIAL    dur    default retry initial (default 1s)
    DEFAULT_BACKOFF_MAX        dur    default retry cap (default 5m)
    DEFAULT_BACKOFF_MULT       float  default retry multiplier (default 2.0)
    DEFAULT_BACKOFF_JITTER     float  default jitter fraction (default 0.2)
    DEFAULT_MAX_RETRIES        int    default MaxRetries (default 3)
    DEFAULT_TTL                dur    how long terminal jobs linger for /jobs history (default 24h)
    HISTORY_LIMIT              int    max ExecutionResult per job (default 16)

    ALLOW_AGENTS               bool   may agents create jobs? (default true)
    ACTION_ALLOWLIST           csv    action types that may be scheduled
                                      (default: slash_cmd,llm_prompt,agent_task,worker_dispatch,hook,noop,webhook,shell;
                                      shell still goes through CoderMode safety)

    RATE_LIMIT_GLOBAL_RPS      float  (default 5.0)
    RATE_LIMIT_GLOBAL_BURST    int    (default 20)
    RATE_LIMIT_OWNER_RPS       float  (default 1.0)
    RATE_LIMIT_OWNER_BURST     int    (default 10)

    BREAKER_FAILURE_THRESHOLD  int    (default 5)
    BREAKER_WINDOW             dur    (default 60s)
    BREAKER_COOLDOWN           dur    (default 30s)

    AUDIT_ENABLED              bool   (default true)
    AUDIT_MAX_SIZE_MB          int    (default 10)
    AUDIT_MAX_BACKUPS          int    (default 7)
    AUDIT_MAX_AGE_DAYS         int    (default 30)

    DAEMON_SOCKET              path   (default ~/.chatcli/scheduler/daemon.sock)
    DAEMON_AUTO_CONNECT        bool   (default true)

    SNAPSHOT_INTERVAL          dur    (default 5m)
    WAL_GC_INTERVAL            dur    (default 1h)

    WORKER_COUNT               int    (default 4)
    WAIT_WORKER_COUNT          int    (default 8)
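As an illustration of resolving such knobs once at construction time, the sketch below reads four of the variables listed above with their documented defaults. `demoConfig`, `envOr`, and `loadDemoConfig` are hypothetical names, not the package's LoadConfigFromEnv, and falling back to the default on an unparsable value is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

const prefix = "CHATCLI_SCHEDULER_"

// demoConfig is a hypothetical subset of the real Config.
type demoConfig struct {
	Enabled      bool
	MaxJobs      int
	PollInterval time.Duration
	BackoffMult  float64
}

// envOr looks up prefix+key and returns def when the variable is unset
// or cannot be parsed (the fallback-on-error behavior is an assumption).
func envOr[T any](lookup func(string) (string, bool), key string, def T, parse func(string) (T, error)) T {
	raw, ok := lookup(prefix + key)
	if !ok {
		return def
	}
	v, err := parse(raw)
	if err != nil {
		return def
	}
	return v
}

// loadDemoConfig resolves every knob once; callers keep the returned value
// for the lifetime of the process, so later env changes have no effect.
func loadDemoConfig(lookup func(string) (string, bool)) demoConfig {
	parseFloat := func(s string) (float64, error) { return strconv.ParseFloat(s, 64) }
	return demoConfig{
		Enabled:      envOr(lookup, "ENABLED", true, strconv.ParseBool),
		MaxJobs:      envOr(lookup, "MAX_JOBS", 256, strconv.Atoi),
		PollInterval: envOr(lookup, "DEFAULT_POLL_INTERVAL", 5*time.Second, time.ParseDuration),
		BackoffMult:  envOr(lookup, "DEFAULT_BACKOFF_MULT", 2.0, parseFloat),
	}
}

func main() {
	cfg := loadDemoConfig(os.LookupEnv)
	fmt.Printf("%+v\n", cfg)
}
```

Injecting the lookup function instead of calling os.LookupEnv directly lets tests supply a map-backed environment.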

ChatCLI - Scheduler: daemon mode.

Running `chatcli daemon start` detaches the scheduler from the interactive process. Any CLI that comes up on the same host detects the daemon via the UNIX socket and becomes a thin client: /schedule, /wait, /jobs all round-trip over IPC.

Lifecycle:

    start  → bind socket, start Scheduler, accept connections
    stop   → orderly Scheduler.DrainAndShutdown + close listener
    status → print stats retrieved via IPC

The daemon uses a PID file (daemon.pid) alongside the socket to detect stale sockets after a crash. If the PID file exists but the process is gone, the socket file is removed and start retries.

ChatCLI - Scheduler: per-job dispatch logic (wait → action → finalize).

handleJob is the heart of the scheduler. For a single fire:

    1. Load the Job and verify it is still eligible (not cancelled between pump and worker dequeue).

    2. If Wait is set, enter the wait loop: poll Condition every PollInterval until satisfied, timed out, or budget exhausted. Each poll result is metrics-emitted and the breaker is updated.

    3. Run the Action under a bounded timeout. Outcome classification drives the next transition (success → Completed, transient failure → retry-with-backoff, permanent → Failed).

    4. If the job has Triggers edges, spawn the children and mark them ready. Children created from a Triggers edge get a fresh ID (NewJobID) so a recurring parent can fan out a fresh DAG on every cycle.

    5. Recurring schedules (cron / interval) re-enqueue with the freshly computed next time unless the job itself is terminal.

ChatCLI - Scheduler / Chronos subsystem.

The scheduler is chatcli's durable, crash-consistent automation layer. It turns chatcli into a standing process that can be told "wait until X, then do Y" and keep that promise across restarts, network hiccups, and user-input lulls. The same machinery is used by the ReAct loop (agents and workers gain schedule_job / wait_until tools) and by the human via /schedule, /wait and /jobs commands.

High-level model:

    ┌─────────────┐        ┌──────────────┐        ┌─────────────┐
    │ CLI surface │◀──────▶│  Scheduler   │◀──────▶│ WAL + snap  │
    │ /schedule…  │        │  state m/c   │        │  on disk    │
    └─────────────┘        │  priority Q  │        └─────────────┘
                           │  breakers    │
    ┌─────────────┐        │  rate limit  │        ┌─────────────┐
    │ Agent tools │◀──────▶│  dispatcher  │───────▶│ Condition   │
    │ schedule_*  │        │              │        │ evaluators  │
    └─────────────┘        │              │        └─────────────┘
                           │              │        ┌─────────────┐
                           │              │───────▶│ Action      │
                           │              │        │ executors   │
                           │              │        └─────────────┘
                           │              │
                           │              │───────▶ event bus + hooks + audit
                           └──────────────┘

Durability contract (identical to the Reflexion lesson queue):

    1. Accept Enqueue only after the WAL record is fsynced.
    2. Process, then ACK (delete or update the WAL).
    3. On crash, replay the WAL on boot and resume.
    4. A missed fire is classified by MissPolicy (fire_once / fire_all / skip) so laptops that slept through a schedule do not thrash.

Autonomy:

    In-process: jobs run while chatcli is open. WAL persists state; a new CLI session replays pending work.

    Daemon: `chatcli daemon start` detaches the scheduler from the interactive process. Any CLI that comes up detects the daemon via UNIX socket (chatcli-scheduler.sock), thin-client style. See daemon.go and ipc.go for the protocol.

Safety:

    Actions are gated by an allowlist (CHATCLI_SCHEDULER_ACTION_ALLOWLIST); shell actions run under CoderMode SafeMode; agent-created jobs are flagged trust=agent and cannot cancel user-owned jobs without the hook manager firing a PreJobCancel event. See action/ subpackage.

See SCHEDULER_DESIGN.md (docs/) for the architecture rationale and tradeoff discussion. Every design decision here was made explicitly and the options are documented alongside the chosen path.

ChatCLI - Scheduler: sentinel errors.

Every surface-facing error the scheduler can return is declared here so callers (CLI handlers, agent tool adapters, daemon IPC) can match with errors.Is. Internal-only errors that never cross a package boundary stay inline where they occur.
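Matching with errors.Is keeps working when a sentinel has been wrapped with extra context. In the sketch below, `lookup` and `isNotFound` are hypothetical stand-ins for a scheduler call and a caller-side check; only the ErrJobNotFound sentinel and its message come from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrJobNotFound is redeclared locally so the example is self-contained.
var ErrJobNotFound = errors.New("scheduler: job not found")

// lookup is a hypothetical call that wraps the sentinel with context via %w.
func lookup(id string) error {
	return fmt.Errorf("query %q: %w", id, ErrJobNotFound)
}

// isNotFound matches the sentinel anywhere in the wrap chain.
func isNotFound(err error) bool {
	return errors.Is(err, ErrJobNotFound)
}

func main() {
	err := lookup("0123456789abcdef")
	if isNotFound(err) {
		fmt.Println("no such job:", err)
	}
}
```

Comparing with `==` would fail here because the returned error is a wrapper, not the sentinel itself.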

ChatCLI - Scheduler: event payloads + publishing helpers.

The scheduler uses three notification channels, each serving a different audience:

    1. Event bus (cli/bus): live fan-out to UI subscribers (status line, Ctrl+J overlay). Non-durable; dropped on disconnect.

    2. Hook manager (cli/hooks): user-configured command/HTTP callbacks. Durable to the extent the hook command is.

    3. Audit log: append-only JSONL for forensics. Always written.

This file defines the payload shape shared by all three. Scheduler helper methods (emit*) centralize the triple-fan-out so every state change produces a consistent observability signal.

ChatCLI - Scheduler: JobID derivation.

JobIDs are derived deterministically from (name | owner | nonce) via sha256 truncated to 16 hex chars. Deterministic derivation gives us two things for free:

    1. Idempotency: re-submitting the same (name, owner) with the same nonce produces the same ID. WAL Append dedupes on ID so a nervous caller can retry without creating duplicates.

    2. Readability: 16 hex chars is enough entropy to avoid collisions in a single workspace, and operators can eyeball log lines.

For recurring jobs the nonce is the schedule's CreatedAt timestamp (rounded to microseconds); each occurrence gets its own JobID but the *definition* is stable, and cancel_by_name uses that stability.
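A derivation of this shape fits in one small function. The sketch is illustrative only: `deriveJobID` is a hypothetical name, and the exact byte layout fed to sha256 (here the three fields joined by "|") is an assumption, since the text does not spell it out.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deriveJobID hashes name|owner|nonce and keeps the first 16 hex characters
// (64 bits) of the digest. The "|" separator is an assumption of this sketch.
func deriveJobID(name, owner, nonce string) string {
	sum := sha256.Sum256([]byte(name + "|" + owner + "|" + nonce))
	return hex.EncodeToString(sum[:])[:16]
}

func main() {
	a := deriveJobID("nightly-backup", "user", "2026-04-25T14:00:00.000000Z")
	b := deriveJobID("nightly-backup", "user", "2026-04-25T14:00:00.000000Z")
	fmt.Println(a, a == b) // same inputs always yield the same ID
}
```

Because the ID depends only on its inputs, a retried submission lands on the same record instead of creating a duplicate.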

ChatCLI - Scheduler: daemon IPC protocol.

The daemon exposes the scheduler over a UNIX socket. The protocol is length-prefixed JSON frames (so a stream can carry both requests and server-initiated events cleanly).

    Frame: [4-byte uint32 big-endian length] [JSON payload]

Every request has an ID; every response carries the same ID. Server-initiated events use ID="" and Kind="event".

Kinds:

    client → server:
        "ping"
        "enqueue"          Input: ToolInput + Owner
        "cancel"           Input: id, reason
        "pause"/"resume"   Input: id
        "query"            Input: id
        "list"             Input: filter
        "snapshot"         Input: none
        "subscribe"        enter event-stream mode for this connection
        "stats"            Input: none; returns scheduler + daemon stats
        "bye"              orderly close

    server → client:
        "ok"               Result: ToolOutput / bytes
        "error"            Error: string
        "event"            Event: scheduler.Event (only after subscribe)

Thread safety: one goroutine per connection handles reads+writes under a single writer mutex. Readers decode frames sequentially.
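The frame layout above can be exercised with a short encoder and decoder. This is a sketch under assumptions: the `frame` struct carries only ID and Kind, its JSON field names are guesses, and the 1 MiB `maxFrame` cap is a defensive limit added for the example rather than a documented protocol constant.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
)

// frame is a hypothetical, trimmed-down envelope; JSON names are assumed.
type frame struct {
	ID   string `json:"id"`
	Kind string `json:"kind"`
}

// maxFrame is an assumed safety cap, not part of the documented protocol.
const maxFrame = 1 << 20

// writeFrame emits [4-byte big-endian length][JSON payload].
func writeFrame(w io.Writer, v any) error {
	body, err := json.Marshal(v)
	if err != nil {
		return err
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(body)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err = w.Write(body)
	return err
}

// readFrame reads exactly one frame and decodes its payload into v.
func readFrame(r io.Reader, v any) error {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxFrame {
		return fmt.Errorf("frame of %d bytes exceeds limit", n)
	}
	body := make([]byte, n)
	if _, err := io.ReadFull(r, body); err != nil {
		return err
	}
	return json.Unmarshal(body, v)
}

// roundTrip pushes one frame through an in-memory buffer.
func roundTrip(in frame) (frame, error) {
	var buf bytes.Buffer
	var out frame
	if err := writeFrame(&buf, in); err != nil {
		return out, err
	}
	err := readFrame(&buf, &out)
	return out, err
}

func main() {
	out, err := roundTrip(frame{ID: "1", Kind: "ping"})
	fmt.Println(out.ID, out.Kind, err)
}
```

io.ReadFull matters on a stream socket: a single Read may return fewer bytes than the header or the payload requires.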

ChatCLI - Scheduler: Job struct + lifecycle primitives.

A Job is the durable record the scheduler works on. It holds:

    - Identity (ID, Name, Owner, Tags)
    - Schedule (when to fire)
    - Action (what to run)
    - WaitSpec (optional gating condition)
    - Budget (timeouts, retries, poll limits)
    - DAG edges (DependsOn, Triggers)
    - State (Status + transitions history)
    - Results (LastResult, History)

The struct is serialized verbatim to the WAL (one record per job), the audit log, and the IPC protocol. Schema migration is handled via the Version field; new fields use omitempty so old records remain readable.

ChatCLI - Scheduler: Prometheus metrics.

Metrics are registered against the default Prometheus registry and exposed by the /metrics endpoint in server mode. The scheduler does not start its own HTTP server; the chatcli metrics package handles that.

Naming convention follows Prometheus recommendations:

    chatcli_scheduler_<subject>_<unit>_<op>

ChatCLI - Scheduler: DSL parser for schedules and conditions.

The user-facing / agent-facing surface of the scheduler accepts short, ergonomic strings for the common cases and falls back to structured JSON/YAML specs for complex ones.

Schedule DSL:

    "in 5m" / "+5m" / "after 30s"           → ScheduleRelative
    "at 2026-04-25T14:00" / "at now"        → ScheduleAbsolute
    "every 30s" / "every 2h"                → ScheduleInterval
    "cron: 0 2 * * *" / "@daily"            → ScheduleCron
    "when-ready" / "on-condition"           → ScheduleOnCondition
    "manual" / "triggered"                  → ScheduleManual

Condition DSL:

    "http://host/health==200"               → http_status expected=200
    "http://host/health~=/ok/"              → http_status + regex
    "shell: <cmd>" / "shell[0]:cmd"         → shell_exit
    "file:/path"                            → file_exists
    "file:/path>=10"                        → file_exists min_size
    "k8s:pod/ns/name:ready"                 → k8s_resource_ready
    "docker:<container>:running"            → docker_running
    "tcp://host:port"                       → tcp_reachable
    "regex:<cmd>~=/pattern/"                → regex_match
    "llm: <prompt>"                         → llm_check
    "and(<expr1>, <expr2>)" / "or(...)"     → composite
    "not <expr>"                            → apply Negate=true

Ambiguous or complex inputs must be submitted as a JSON spec via --wait-spec-file / --condition-json.

ChatCLI - Scheduler: shell policy preflight.

Every Job admitted into the scheduler is walked BEFORE being written to the WAL, and each shell command embedded in it is classified against the CoderMode policy manager (via CLIBridge.ClassifyShellCommand).

The rules:

    ShellPolicyDeny   → reject enqueue with ErrShellPolicyDeny, always. Even --i-know (DangerousConfirmed=true) cannot override a deny; the denylist is authoritative.

    ShellPolicyAsk    → reject enqueue with ErrShellPolicyAsk, UNLESS job.DangerousConfirmed is true. The user (or agent with explicit user blessing) must pre-acknowledge that the command would normally prompt. This converts an interactive approval into a durable decision attached to the job.

    ShellPolicyAllow  → admit normally.

At fire time RunShell on the bridge re-checks: a policy update between schedule and fire that moves a command from Allow→Deny makes the job fail cleanly.

This file also enumerates shell commands from a Job. Shell commands hide in three places:

    1. Action.Payload: when Action.Type == ActionShell, the command lives under payload.command.
    2. Wait.Condition: shell-based evaluators (shell_exit, regex_match, custom, k8s_resource_ready, docker_running, llm_check with context_cmd) carry a command in Condition.Spec.
    3. Composite children: all_of/any_of recurse through Children.

Non-shell actions/conditions (webhook, http_status, file_exists, tcp_reachable) do not need policy review here; their security gates are in the network / filesystem layers.

ChatCLI - Scheduler: priority queue.

The scheduling index is a min-heap keyed on NextFireAt. Fronted by a map[JobID]*heapEntry for O(1) dedupe, Remove, and Requeue. Lazy deletion keeps Remove off the heap's critical path: entries are marked dead and skipped on Pop.

Invariants:

    - Each JobID appears at most once live in the heap.
    - Dead entries sort after live ones so Pop reliably returns work.
    - The queue is the *scheduling index* only; the WAL is the truth of what jobs exist. Drain (on boot) re-Enqueue-s from the WAL.

Thread-safety: single mutex + cond var (same pattern as lessonq, already proven in production).
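The lazy-deletion idea can be sketched on top of container/heap. This is a simplified illustration, not the package's queue: `entry`, `queue`, and their methods are hypothetical names, the mutex and cond var are omitted, and dead entries are simply skipped on Pop rather than sorted after live ones.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// entry is a hypothetical heap node; dead marks a lazily deleted job.
type entry struct {
	id     string
	fireAt time.Time
	dead   bool
}

// entryHeap is a min-heap ordered by fireAt.
type entryHeap []*entry

func (h entryHeap) Len() int           { return len(h) }
func (h entryHeap) Less(i, j int) bool { return h[i].fireAt.Before(h[j].fireAt) }
func (h entryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x any)        { *h = append(*h, x.(*entry)) }
func (h *entryHeap) Pop() any {
	old := *h
	e := old[len(old)-1]
	*h = old[:len(old)-1]
	return e
}

// queue pairs the heap with an ID index so Remove never touches the heap.
type queue struct {
	h    entryHeap
	live map[string]*entry
}

func newQueue() *queue { return &queue{live: map[string]*entry{}} }

// Enqueue inserts id, tombstoning any earlier live entry for the same id.
func (q *queue) Enqueue(id string, at time.Time) {
	q.Remove(id)
	e := &entry{id: id, fireAt: at}
	q.live[id] = e
	heap.Push(&q.h, e)
}

// Remove marks the entry dead in O(1); the heap slot is reclaimed on Pop.
func (q *queue) Remove(id string) {
	if e, ok := q.live[id]; ok {
		e.dead = true
		delete(q.live, id)
	}
}

// Pop returns the earliest live id, discarding dead entries along the way.
func (q *queue) Pop() (string, bool) {
	for q.h.Len() > 0 {
		e := heap.Pop(&q.h).(*entry)
		if e.dead {
			continue
		}
		delete(q.live, e.id)
		return e.id, true
	}
	return "", false
}

func main() {
	q := newQueue()
	now := time.Now()
	q.Enqueue("a", now.Add(3*time.Second))
	q.Enqueue("b", now.Add(1*time.Second))
	q.Remove("b")
	id, ok := q.Pop()
	fmt.Println(id, ok) // "b" was tombstoned, so "a" comes out first
}
```

The trade-off is that dead entries occupy heap slots until they surface, which is usually acceptable when removals are rare relative to pops.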

ChatCLI - Scheduler: rate limiting.

Two levels:

    Global: protects the scheduler itself from enqueue storms. A token-bucket on total Enqueue rate; when exhausted, Enqueue returns ErrRateLimited with a Retry-After hint.

    Per-owner: caps how many jobs a single agent or worker can create per minute. Prevents a runaway ReAct loop from filling the queue.

Implemented on top of golang.org/x/time/rate, which is already a dependency (used by the LLM manager's per-provider rate limits).

ChatCLI - Scheduler: evaluator + executor interfaces and registries.

The scheduler stays extensible by deferring "how do I evaluate k8s readiness" to a ConditionEvaluator plug-in, and "how do I run a shell command" to an ActionExecutor. Both interfaces are small and synchronous; the scheduler owns concurrency, the plug-in owns semantics.

Registration happens at scheduler construction time (see builtin.go's RegisterBuiltins helper). Users who want to add a custom evaluator / executor implement the interface and call scheduler.ConditionRegistry().Register(...) before Start.

ChatCLI - Scheduler: terminal render helpers for /jobs output.

These helpers are string generators; the CLI package decides where to print them. Keeping render in the scheduler package lets the daemon IPC client reuse the same formatting when the operator attaches remotely.

Formatting is chosen for terminal consumption: ASCII boxes + ANSI-agnostic (no colors here; color is applied by the caller via the existing cli/colors.go palette).

ChatCLI - Scheduler: boot-time replay.

Boot order:

    1. Try the snapshot (fast path). If present and version-compatible, hydrate the in-memory map from it, then overlay any .wal records not covered by the snapshot.
    2. If no usable snapshot, fall back to a full WAL scan.
    3. For every live (non-terminal) job, re-enqueue it with an adjusted NextFireAt according to MissPolicy.

MissPolicy semantics on replay:

    fire_once  coalesce all missed ticks into a single fire at now.
    fire_all   schedule now, and scheduler.maybeReschedule will keep firing as the queue catches up (bounded by the natural cron tick so it cannot run away).
    skip       leave NextFireAt in the future; missed windows are considered a no-op.

ChatCLI - Scheduler: retry / backoff math.

Isolated here so unit tests can exercise the ramp without spinning up a full Scheduler. The shape is identical to the Reflexion queue's retry policy (RetryPolicy.NextDelay in lessonq): exponential with cap + uniform jitter.
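An exponential ramp with a cap and uniform jitter can be written as below. The sketch is not the package's RetryPolicy: `retryPolicy` and `nextDelay` are hypothetical names, and applying the jitter symmetrically around the capped delay (so a delay may land slightly above Max) is an assumption, since the text only says "uniform jitter". The field values in main are the documented defaults.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// retryPolicy is a hypothetical mirror of the documented backoff knobs.
type retryPolicy struct {
	Initial time.Duration
	Max     time.Duration
	Mult    float64
	Jitter  float64 // fraction of the delay, e.g. 0.2 for +/-20%
}

// nextDelay returns the wait before retry number attempt (0-based).
// rnd must return a uniform value in [0, 1); injecting it keeps tests stable.
func (p retryPolicy) nextDelay(attempt int, rnd func() float64) time.Duration {
	d := float64(p.Initial) * math.Pow(p.Mult, float64(attempt))
	if d > float64(p.Max) {
		d = float64(p.Max) // cap the exponential ramp
	}
	// Symmetric jitter: shift by a uniform amount in [-Jitter*d, +Jitter*d).
	d += d * p.Jitter * (2*rnd() - 1)
	if d < 0 {
		d = 0
	}
	return time.Duration(d)
}

func main() {
	p := retryPolicy{Initial: time.Second, Max: 5 * time.Minute, Mult: 2.0, Jitter: 0.2}
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Println(attempt, p.nextDelay(attempt, rand.Float64))
	}
}
```

Jitter spreads retries from jobs that failed together, so they do not all hit a recovering dependency at the same instant.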

ChatCLI - Scheduler: Schedule value type + cron parser.

A Schedule answers one question: "when should this job next be considered for execution?". The scheduler's main loop calls Schedule.Next(now, prev) to compute the next fire time.

Cron expressions:

Five-field standard form: "minute hour day-of-month month day-of-week"

    Field         Range        Aliases
    minute        0-59
    hour          0-23
    day-of-month  1-31
    month         1-12         jan feb mar apr may jun jul aug sep oct nov dec
    day-of-week   0-6 (0=Sun)  sun mon tue wed thu fri sat

Specials per field: "*", "N", "A,B,C", "A-B", "N/S", "A-B/S".
Shorthand: "@hourly" "@daily" "@weekly" "@monthly" "@yearly".

The parser is intentionally minimal: we support the subset that covers 99% of automation use cases and bail with a clear error on anything exotic. No external cron library, so we keep compile time / supply chain small.
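To make the per-field specials concrete, here is a minimal sketch of expanding one cron field into the values it matches. `parseField` is a hypothetical helper, not the package's parser; it omits the month and weekday aliases, treats "N/S" as "from N to the field maximum in steps of S", and also accepts "*/S", all of which are assumptions of this sketch.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseField expands one cron field into the sorted values it matches,
// given the field's inclusive range [lo, hi].
func parseField(field string, lo, hi int) ([]int, error) {
	seen := make([]bool, hi+1)
	for _, part := range strings.Split(field, ",") {
		rng, stepStr, hasStep := strings.Cut(part, "/")
		step := 1
		if hasStep {
			s, err := strconv.Atoi(stepStr)
			if err != nil || s <= 0 {
				return nil, fmt.Errorf("bad step in %q", part)
			}
			step = s
		}
		from, to := lo, hi // "*" covers the whole range
		if rng != "*" {
			a, b, isRange := strings.Cut(rng, "-")
			n, err := strconv.Atoi(a)
			if err != nil {
				return nil, fmt.Errorf("bad value in %q", part)
			}
			from, to = n, n
			if isRange {
				if to, err = strconv.Atoi(b); err != nil {
					return nil, fmt.Errorf("bad range in %q", part)
				}
			} else if hasStep {
				to = hi // "N/S" runs from N to the field maximum
			}
		}
		if from < lo || to > hi || from > to {
			return nil, fmt.Errorf("%q outside %d-%d", part, lo, hi)
		}
		for v := from; v <= to; v += step {
			seen[v] = true
		}
	}
	var out []int
	for v, ok := range seen {
		if ok {
			out = append(out, v)
		}
	}
	return out, nil
}

func main() {
	fmt.Println(parseField("*/15", 0, 59))   // [0 15 30 45] <nil>
	fmt.Println(parseField("1-5/2,9", 0, 23)) // [1 3 5 9] <nil>
}
```

A full schedule would run this once per field and then search forward from `now` for the first minute whose five components all match.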

ChatCLI - Scheduler: core engine.

Scheduler is the single object callers interact with. It composes the WAL + in-memory queue + rate limiter + breaker group + registries into a coherent lifecycle:

    sched := scheduler.New(cfg, bridge, logger)
    scheduler.RegisterBuiltins(sched) // standard plug-ins
    sched.Start(ctx)                  // spin up loops
    id, _ := sched.Enqueue(ctx, job)
    // ...
    sched.DrainAndShutdown(30 * time.Second)

The worker model is two-tier:

    Schedule pump: one goroutine owns the scheduleQueue. It PopReady's the earliest fireAt and hands the JobID to a bounded worker pool.

    Worker pool: N goroutines (Config.WorkerCount) pick up job IDs and run handleJob, which walks the full Wait → Action → Finalize pipeline. Wait polls spawn their own ticker; the worker blocks on ctx and the poll channel.

Durability contract:

    1. Every Enqueue fsyncs the WAL before returning.
    2. Every state transition writes the updated Job to the WAL.
    3. Terminal jobs stay on disk for Config.DefaultTTL; the GC goroutine Acks them after expiration.

ChatCLI - Scheduler: shell policy classification types.

Dedicated file so the CLIBridge interface stays readable and tests can import the ShellPolicy symbol without pulling the full bridge surface.

The scheduler package intentionally does NOT depend on cli/coder; classification is delegated to the host process through the CLIBridge abstraction. This keeps the scheduler engine unit-testable against mock policies and keeps the import graph shallow.

ChatCLI - Scheduler: periodic state snapshot.

Rationale: the WAL alone recovers correctly, but boot time grows linearly with total record count. A periodic snapshot freezes the live state to a single file (snapshot.json). On boot the scheduler loads the snapshot first, then overlays any .wal records newer than the snapshot's timestamp. Records covered by the snapshot can be GC'd out of the WAL directory.

The snapshot is written atomically via tmp-rename so partial writes are never observable. Stale / corrupt snapshots are ignored in favor of a full WAL scan: the WAL is the truth, the snapshot is cache.

ChatCLI - Scheduler: Job state machine.

The state machine serves three purposes:

    1. Enforce legal transitions, rejecting bugs that try to push a job from (for example) Completed back to Running.

    2. Centralize the side-effects of a transition: metrics, audit log entry, event-bus publish, and hook fire. Every transition path produces the same observability signal with no duplication.

    3. Make concurrent mutation safe. Each Job owns a sync.Mutex that Scheduler methods lock for the duration of the transition so no two goroutines race over the state.

Legal transitions (read top→bottom; missing edges are illegal):

    Pending    → Blocked, Waiting, Running, Paused, Cancelled, Skipped
    Blocked    → Pending, Cancelled
    Waiting    → Running, TimedOut, Cancelled, Failed
    Running    → Completed, Failed, TimedOut, Cancelled, Pending (recurring re-arm)
    Paused     → Pending, Cancelled
    (terminal)   Completed / Failed / Cancelled / TimedOut / Skipped

The scheduler never resurrects terminal jobs. A recurring cron or interval schedule re-arms BEFORE the Running→Completed transition: after a successful action the dispatcher takes the Running→Pending edge so the same Job record (and JobID) keeps cycling. This keeps Job count, WAL footprint, and queue depth bounded, a critical property when intervals are short (e.g. health-check every 30s would otherwise grow 2880 records/day). History entries carry a CycleNum so /jobs logs separates cycles cleanly even though they share one Job. Terminal statuses remain a one-way door for non-recurring work.
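The transition table above translates directly into a lookup map. In this sketch the status constants, their string values, `legal`, `errIllegal`, and `checkTransition` are all hypothetical names chosen for illustration; the real package's JobStatus values may differ. Side effects (metrics, audit, events, hooks) and per-job locking are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// JobStatus values here are illustrative; the real constants may differ.
type JobStatus string

const (
	Pending   JobStatus = "pending"
	Blocked   JobStatus = "blocked"
	Waiting   JobStatus = "waiting"
	Running   JobStatus = "running"
	Paused    JobStatus = "paused"
	Completed JobStatus = "completed"
	Failed    JobStatus = "failed"
	Cancelled JobStatus = "cancelled"
	TimedOut  JobStatus = "timed_out"
	Skipped   JobStatus = "skipped"
)

// legal lists the outgoing edges per status. Terminal statuses have no
// entry, so every transition out of them is rejected.
var legal = map[JobStatus][]JobStatus{
	Pending: {Blocked, Waiting, Running, Paused, Cancelled, Skipped},
	Blocked: {Pending, Cancelled},
	Waiting: {Running, TimedOut, Cancelled, Failed},
	Running: {Completed, Failed, TimedOut, Cancelled, Pending}, // Pending = recurring re-arm
	Paused:  {Pending, Cancelled},
}

var errIllegal = errors.New("illegal transition")

// checkTransition returns nil when from → to is an edge in the table.
func checkTransition(from, to JobStatus) error {
	for _, next := range legal[from] {
		if next == to {
			return nil
		}
	}
	return fmt.Errorf("%w: %s -> %s", errIllegal, from, to)
}

func main() {
	fmt.Println(checkTransition(Running, Pending))   // recurring re-arm is allowed
	fmt.Println(checkTransition(Completed, Running)) // terminal jobs never come back
}
```

Keeping the table as data makes it straightforward to assert in a unit test that every status appears and that terminal statuses have no outgoing edges.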

ChatCLI - Scheduler: agent tool adapter.

The ReAct loop gains five tools that map directly onto the Scheduler API:

    schedule_job(spec)  create a job; returns the ID.
    wait_until(spec)    sync wait (agent yields); returns outcome. With async=true, returns immediately after creating the job (caller receives job.fired event in next turn via AppendHistory).
    query_job(id)       return a Job clone.
    list_jobs(filter)   return summaries.
    cancel_job(id)      cancel if authorized.

The adapter is stateless: every method gets the owner context from the caller (agent mode passes its agent name, the ChatCLI session passes user). Authorization is enforced inside Scheduler itself.

The adapter returns JSON strings suitable for direct insertion into the ReAct loop's tool_result block. Callers just pass (input_json, agent_owner) and get (result_json, error).

ChatCLI - Scheduler: core value types.

Every type here is a plain value type. Behavior lives in separate files (scheduler.go, queue.go, state_machine.go, action/, condition/). Keeping types/behavior separated makes the schema trivial to JSON encode for the WAL, the IPC protocol and the audit log.

ChatCLI - Scheduler: Write-Ahead Log.

Same durability contract as the Reflexion lesson queue:

    - Append is synchronous (fsync + atomic rename) before the job is visible to the in-memory queue.
    - Record format is a framed envelope with leading+trailing CRC32 so torn writes are detected and discarded on replay.
    - One file per record named by JobID, so ACK is a single unlink.

The scheduler WAL differs from lessonq in two ways:

    1. Records mutate: a job moves Pending → Waiting → Running → Completed. Each transition rewrites the .wal file via the same write-then-rename path, so readers never observe torn state.

    2. Terminal jobs are NOT immediately unlinked; they're kept for TTL so /jobs history can list them, then GC'd. The snapshot (snapshot.go) periodically freezes the entire state into a single file so replay is fast even with thousands of records.

File layout:

    <dir>/
        <jobid>.wal     current record per job
        <jobid>.tmp.*   in-flight Append; a leftover tmp after a crash is ignored
        snapshot.json   periodic full-state snapshot (see snapshot.go)
        snapshot.tmp    in-flight snapshot write
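The two mechanisms named above, a CRC-framed record and a write-then-rename append, can be sketched as follows. The exact envelope layout is not given in the text, so the `[crc32][length][payload][crc32]` framing here is an assumption, as are the names `encodeRecord`, `decodeRecord`, `writeAtomic`, and `errCorrupt`. The sketch also skips fsyncing the parent directory, which a stricter implementation would likely add.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"os"
	"path/filepath"
)

var errCorrupt = errors.New("wal record corrupted")

// encodeRecord frames payload as [crc32][len][payload][crc32] (assumed layout).
func encodeRecord(payload []byte) []byte {
	sum := crc32.ChecksumIEEE(payload)
	buf := make([]byte, 8+len(payload)+4)
	binary.BigEndian.PutUint32(buf[0:4], sum)
	binary.BigEndian.PutUint32(buf[4:8], uint32(len(payload)))
	copy(buf[8:], payload)
	binary.BigEndian.PutUint32(buf[8+len(payload):], sum)
	return buf
}

// decodeRecord rejects truncated frames and frames whose checksums disagree.
func decodeRecord(buf []byte) ([]byte, error) {
	if len(buf) < 12 {
		return nil, errCorrupt
	}
	n := int(binary.BigEndian.Uint32(buf[4:8]))
	if len(buf) != 12+n {
		return nil, errCorrupt
	}
	payload := buf[8 : 8+n]
	sum := crc32.ChecksumIEEE(payload)
	if binary.BigEndian.Uint32(buf[0:4]) != sum || binary.BigEndian.Uint32(buf[8+n:]) != sum {
		return nil, errCorrupt
	}
	return payload, nil
}

// writeAtomic writes <jobID>.tmp.*, fsyncs it, then renames it over <jobID>.wal.
func writeAtomic(dir, jobID string, payload []byte) error {
	tmp, err := os.CreateTemp(dir, jobID+".tmp.*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // harmless once the rename has succeeded
	if _, err := tmp.Write(encodeRecord(payload)); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), filepath.Join(dir, jobID+".wal"))
}

func main() {
	dir, err := os.MkdirTemp("", "wal-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	if err := writeAtomic(dir, "0123456789abcdef", []byte(`{"status":"pending"}`)); err != nil {
		panic(err)
	}
	raw, err := os.ReadFile(filepath.Join(dir, "0123456789abcdef.wal"))
	if err != nil {
		panic(err)
	}
	payload, err := decodeRecord(raw)
	fmt.Println(string(payload), err)
}
```

Because a rename within one directory replaces the target in a single step on POSIX filesystems, a reader sees either the old record or the new one, never a partial write.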

Constants

const SchemaVersion = 1

SchemaVersion increments whenever Job gains a required field. Tests assert the WAL is forward-compatible via the /testdata fixtures.

Variables

var (
	// ErrSchedulerClosed is returned from any public method after the
	// scheduler has been stopped. Callers should treat this as terminal
	// for the current instance — a new Scheduler must be constructed to
	// accept work again.
	ErrSchedulerClosed = errors.New("scheduler: closed")

	// ErrSchedulerDraining is returned from Enqueue after Stop has been
	// called but before all in-flight jobs have finished. New work is
	// rejected; already-queued jobs continue to run.
	ErrSchedulerDraining = errors.New("scheduler: draining")

	// ErrNotStarted is returned when a method that requires a running
	// loop is called before Start.
	ErrNotStarted = errors.New("scheduler: not started")
)

Lifecycle.

var (
	// ErrInvalidJob wraps a specific validation failure from Job.Validate.
	ErrInvalidJob = errors.New("scheduler: invalid job")

	// ErrInvalidSchedule indicates a Schedule whose fields are
	// inconsistent with its Kind (e.g. absolute schedule with zero
	// time, cron with empty expression).
	ErrInvalidSchedule = errors.New("scheduler: invalid schedule")

	// ErrInvalidCondition is returned when a Condition.Type is unknown
	// to the registry, or when the Spec is missing required fields.
	ErrInvalidCondition = errors.New("scheduler: invalid condition")

	// ErrInvalidAction mirrors ErrInvalidCondition for actions.
	ErrInvalidAction = errors.New("scheduler: invalid action")

	// ErrDuplicateName is returned when Enqueue sees a job name already
	// held by a non-terminal job. Job IDs are globally unique (derived
	// from name + owner + nonce) — but users expect human-readable
	// names to be unique within their scope, so we enforce it.
	ErrDuplicateName = errors.New("scheduler: duplicate name")

	// ErrDAGCycle is returned when the caller passes DependsOn edges
	// that form a cycle. Scheduler refuses to admit such jobs.
	ErrDAGCycle = errors.New("scheduler: dag cycle")
)

Validation — raised during parse / admission.

var (
	// ErrJobNotFound is returned from Query / Cancel / Pause when the
	// given ID has no record in the active set.
	ErrJobNotFound = errors.New("scheduler: job not found")

	// ErrJobTerminal is returned when a mutating operation is attempted
	// on a job that has already reached a terminal state (completed,
	// failed, cancelled).
	ErrJobTerminal = errors.New("scheduler: job is terminal")

	// ErrWaitTimeout is returned by evaluators that exhausted their
	// MaxPolls or WaitTimeout without the condition ever being
	// satisfied.
	ErrWaitTimeout = errors.New("scheduler: wait condition timed out")

	// ErrBreakerOpen is returned when an evaluator or action is skipped
	// because its circuit breaker is open. The scheduler still marks
	// the job as failed for metric purposes; the message surfaces why.
	ErrBreakerOpen = errors.New("scheduler: circuit breaker open")

	// ErrRateLimited is returned from Enqueue when the per-owner or
	// global rate limit would be exceeded. Callers may retry after
	// the Retry-After hint surfaced on the underlying limiter error.
	ErrRateLimited = errors.New("scheduler: rate limited")

	// ErrQueueFull is returned when Enqueue cannot admit more jobs
	// and the overflow policy is Block (with timeout elapsed) or when
	// the WAL has already reached MaxJobs.
	ErrQueueFull = errors.New("scheduler: queue full")
)

Runtime — raised while executing.

var (
	// ErrNotAuthorized is returned when a caller (typically an agent)
	// tries to mutate a job owned by a different principal.
	ErrNotAuthorized = errors.New("scheduler: not authorized")

	// ErrActionDisallowed is returned when a requested Action type is
	// not in the operator-configured allowlist.
	ErrActionDisallowed = errors.New("scheduler: action type not in allowlist")

	// ErrDangerousShell is returned when a shell action's command
	// matches a pattern flagged by the shell safety filter and the
	// job was not submitted with the explicit --i-know flag.
	ErrDangerousShell = errors.New("scheduler: dangerous shell command requires explicit confirmation")

	// ErrShellPolicyDeny is returned when a shell command (in an
	// action or wait condition) is on the CoderMode denylist.
	// Unlike ErrDangerousShell, this cannot be overridden by --i-know
	// — denylist beats user confirmation. The job is rejected at
	// enqueue time and never admitted to the WAL.
	ErrShellPolicyDeny = errors.New("scheduler: shell command denied by policy")

	// ErrShellPolicyAsk is returned when a shell command would
	// normally require interactive approval under CoderMode. The
	// scheduler has no interactive channel at fire time, so the
	// command is rejected unless the job was created with --i-know
	// (which sets Job.DangerousConfirmed=true). When re-checked at
	// fire time (policy may have changed since enqueue), the same
	// error is returned if the classification drifted to Ask.
	ErrShellPolicyAsk = errors.New(`scheduler: shell command requires approval. Pre-authorize with one of: ` +
		`(slash command) /schedule ... --i-know  ` +
		`(@scheduler tool) {"cmd":"schedule","args":{...,"i_know":true}}  ` +
		`(persist for future jobs) /config security allow "<pattern>"`)
)

Authorization.

var (
	// ErrNoDaemon is returned by the IPC client when no daemon is
	// reachable on the configured socket path. Callers may fall back
	// to in-process scheduling or prompt the user to start the daemon.
	ErrNoDaemon = errors.New("scheduler: no daemon running")

	// ErrDaemonRunning is returned by `chatcli daemon start` when a
	// daemon is already bound to the socket — starting a second daemon
	// would split the WAL between two writers.
	ErrDaemonRunning = errors.New("scheduler: daemon already running")

	// ErrIPCProtocol covers any malformed / unrecognized frame on the
	// daemon socket.
	ErrIPCProtocol = errors.New("scheduler: ipc protocol error")
)

Daemon.

var (
	// ErrWALCorrupted is returned when a WAL record fails CRC validation
	// and is not recoverable. The record is quarantined to the DLQ.
	ErrWALCorrupted = errors.New("scheduler: wal record corrupted")
)

Persistence.

Functions

func CheckDaemon

func CheckDaemon(socketPath string) error

CheckDaemon tries to Ping the daemon at socketPath; returns nil iff a daemon is up and responsive.

func DefaultSocketPath

func DefaultSocketPath(cfg Config) string

DefaultSocketPath returns the platform-default daemon socket path. Honors CHATCLI_SCHEDULER_DAEMON_SOCKET when set.

func NewAuditFileWriter

func NewAuditFileWriter(dir string, maxSizeMB, maxBackups, maxAgeDays int, logger *zap.Logger, m *Metrics) auditWriter

NewAuditFileWriter returns a file-backed audit writer. Exposed for the daemon, which wants the path reported via IPC.

func NopAuditWriter

func NopAuditWriter() auditWriter

NopAuditWriter returns the no-op audit writer.

func RenderList

func RenderList(summaries []JobSummary) string

RenderList emits the /jobs list table as plain text (no color). Columns: STATUS | ID | NAME | OWNER | TYPE | NEXT FIRE | LAST

func RenderShow

func RenderShow(j *Job) string

RenderShow emits the detailed `/jobs show <id>` view.

func RenderTree

func RenderTree(jobs []*Job) string

RenderTree emits the DAG view rooted at any top-level job (one whose ParentID is empty or whose parent isn't in the list).

func StatusBadge

func StatusBadge(s JobStatus) string

StatusBadge returns a fixed-width one-glyph label for a JobStatus. Used by the status line prefix and the tree view.

func StatusLine

func StatusLine(summaries []JobSummary) string

StatusLine renders a short indicator for the prompt prefix: "[jobs: 2▶ 3⏳ 1⏱]"

Types

type Action

type Action struct {
	Type    ActionType     `json:"type"`
	Payload map[string]any `json:"payload,omitempty"`
}

Action describes what to execute when the schedule fires (and the wait condition, if any, is satisfied).

func ParseActionDSL

func ParseActionDSL(input string) (Action, error)

ParseActionDSL interprets a short string as an Action.

"/foo bar"              → slash_cmd command=/foo bar
"shell: <cmd>"          → shell command=<cmd>
"agent: <task>"         → agent_task task=<task>
"worker <name>: <task>" → worker_dispatch
"llm: <prompt>"         → llm_prompt prompt=<prompt>
"POST https://...| body" → webhook
"hook:<event>"          → hook event=<event>
"noop"                  → noop

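The mapping above can be sketched as a prefix table. This is an illustrative parser for the simple forms only (slash command, shell, agent, llm, hook, noop); it is not the package's ParseActionDSL, and the local action type and error text are assumptions. The payload keys follow the table.

```go
package main

import (
	"fmt"
	"strings"
)

// action is a local stand-in for the package's Action type.
type action struct {
	Type    string
	Payload map[string]any
}

// prefixes maps a DSL prefix to (action type, payload key), following the
// documented table. The worker and webhook forms are left out.
var prefixes = []struct{ prefix, typ, key string }{
	{"shell:", "shell", "command"},
	{"agent:", "agent_task", "task"},
	{"llm:", "llm_prompt", "prompt"},
	{"hook:", "hook", "event"},
}

// parseActionDSL turns a short string into an action, or returns an error
// when no documented form matches.
func parseActionDSL(input string) (action, error) {
	s := strings.TrimSpace(input)
	if s == "noop" {
		return action{Type: "noop"}, nil
	}
	if strings.HasPrefix(s, "/") {
		return action{Type: "slash_cmd", Payload: map[string]any{"command": s}}, nil
	}
	for _, p := range prefixes {
		if strings.HasPrefix(s, p.prefix) {
			rest := strings.TrimSpace(strings.TrimPrefix(s, p.prefix))
			return action{Type: p.typ, Payload: map[string]any{p.key: rest}}, nil
		}
	}
	return action{}, fmt.Errorf("unrecognized action %q", input)
}

func main() {
	for _, in := range []string{"/foo bar", "shell: ls -la", "hook:deploy", "noop", "???"} {
		a, err := parseActionDSL(in)
		fmt.Printf("%q type=%q payload=%v err=%v\n", in, a.Type, a.Payload, err)
	}
}
```
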
func ParseActionJSON

func ParseActionJSON(raw string) (Action, error)

ParseActionJSON loads an action from a JSON string.

func (Action) PayloadBool

func (a Action) PayloadBool(key string, def bool) bool

PayloadBool reads a bool field from the action payload.

func (Action) PayloadInt

func (a Action) PayloadInt(key string, def int) int

PayloadInt reads an int field from the action payload.

func (Action) PayloadMap

func (a Action) PayloadMap(key string) map[string]any

PayloadMap reads a map[string]any field. Useful for webhook headers and llm_prompt tool overrides.

func (Action) PayloadString

func (a Action) PayloadString(key, def string) string

PayloadString reads a string field from the action payload with a default.

func (Action) PayloadStringSlice

func (a Action) PayloadStringSlice(key string) []string

PayloadStringSlice reads a []string field (or a single string that splits on commas/whitespace).
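
A rough sketch of that lookup, assuming the payload was decoded by encoding/json (so arrays arrive as []any) and that a missing or mistyped key yields nil. The stringSlice helper is hypothetical and only illustrates the documented behavior.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// stringSlice reads key from payload as a list of strings. It accepts a
// []string, a JSON-decoded []any of strings, or a single string that is
// split on commas and whitespace.
func stringSlice(payload map[string]any, key string) []string {
	switch v := payload[key].(type) {
	case []string:
		return v
	case []any: // what encoding/json produces for a JSON array
		out := make([]string, 0, len(v))
		for _, item := range v {
			if s, ok := item.(string); ok {
				out = append(out, s)
			}
		}
		return out
	case string:
		// FieldsFunc drops empty fields, so "a, b" yields two items.
		return strings.FieldsFunc(v, func(r rune) bool {
			return r == ',' || unicode.IsSpace(r)
		})
	}
	return nil
}

func main() {
	fmt.Println(stringSlice(map[string]any{"tags": "a, b,c"}, "tags"))
	fmt.Println(stringSlice(map[string]any{"tags": []any{"x", "y"}}, "tags"))
}
```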

func (Action) Validate

func (a Action) Validate() error

Validate returns a non-nil error when the Action is malformed. Type-specific validation (e.g. URL shape for webhook) is the executor's responsibility.

type ActionExecutor

type ActionExecutor interface {
	Type() ActionType
	ValidateSpec(payload map[string]any) error
	Execute(ctx context.Context, action Action, env *ExecEnv) ActionResult
}

ActionExecutor is a plug-in that can run one action type.

type ActionRegistry

type ActionRegistry struct {
	// contains filtered or unexported fields
}

ActionRegistry mirrors ConditionRegistry.

func NewActionRegistry

func NewActionRegistry() *ActionRegistry

NewActionRegistry builds an empty registry.

func (*ActionRegistry) Get

Get returns the executor for a type.

func (*ActionRegistry) MustRegister

func (r *ActionRegistry) MustRegister(e ActionExecutor)

MustRegister panics on duplicate.

func (*ActionRegistry) Register

func (r *ActionRegistry) Register(e ActionExecutor) error

Register adds an executor.

func (*ActionRegistry) Types

func (r *ActionRegistry) Types() []ActionType

Types returns the registered type names, sorted.
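
The Register / MustRegister / Types trio follows a common registry pattern, which might look like the sketch below. The registry and executor types here are local stand-ins, and the mutex and the duplicate-rejection rule are assumptions (the latter borrowed from the ConditionRegistry description).

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// executor is a reduced stand-in for ActionExecutor: only Type matters here.
type executor interface{ Type() string }

// registry is a hypothetical keyed store of executors.
type registry struct {
	mu    sync.RWMutex
	items map[string]executor
}

func newRegistry() *registry { return &registry{items: map[string]executor{}} }

// Register adds e and rejects a second executor for the same type.
func (r *registry) Register(e executor) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.items[e.Type()]; dup {
		return fmt.Errorf("executor %q already registered", e.Type())
	}
	r.items[e.Type()] = e
	return nil
}

// MustRegister panics on duplicate, which suits startup-time wiring.
func (r *registry) MustRegister(e executor) {
	if err := r.Register(e); err != nil {
		panic(err)
	}
}

// Types returns the registered type names, sorted.
func (r *registry) Types() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.items))
	for t := range r.items {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}

// named is a trivial executor used only for the demo.
type named string

func (n named) Type() string { return string(n) }

func main() {
	r := newRegistry()
	r.MustRegister(named("webhook"))
	r.MustRegister(named("noop"))
	fmt.Println(r.Types(), r.Register(named("noop")))
}
```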

type ActionResult

type ActionResult struct {
	Output string
	Tokens int
	Cost   float64
	// Transient mirrors EvalOutcome.Transient — retryable vs permanent.
	Transient bool
	Err       error
}

ActionResult is what an executor returns after a single invocation.

type ActionType

type ActionType string

ActionType enumerates the built-in action categories.

const (
	// ActionSlashCmd — invokes a CLI slash command via the command handler.
	ActionSlashCmd ActionType = "slash_cmd"
	// ActionShell — runs a shell command under CoderMode safety.
	ActionShell ActionType = "shell"
	// ActionAgentTask — boots a ReAct loop with the given task.
	ActionAgentTask ActionType = "agent_task"
	// ActionWorkerDispatch — single-agent worker invocation.
	ActionWorkerDispatch ActionType = "worker_dispatch"
	// ActionLLMPrompt — headless LLM call (no tool loop).
	ActionLLMPrompt ActionType = "llm_prompt"
	// ActionWebhook — HTTP POST.
	ActionWebhook ActionType = "webhook"
	// ActionHook — fires a chatcli hook by name.
	ActionHook ActionType = "hook"
	// ActionNoop — do nothing (testing, placeholders).
	ActionNoop ActionType = "noop"
	// ActionAgentResume — load a parked agent snapshot and re-enter the
	// interactive ReAct loop. Fired by single-shot timer parks
	// (delay/until) and by the polling driver when the probe matches.
	ActionAgentResume ActionType = "agent_resume"
	// ActionParkPoll — drives the polling loop for @park for_url /
	// for_cmd. Re-enqueues itself at every interval until the probe
	// matches success_when (transitions to AgentResume) or the deadline
	// elapses (also transitions to AgentResume with timeout outcome).
	ActionParkPoll ActionType = "park_poll"
)

type Breaker

type Breaker struct {
	// contains filtered or unexported fields
}

Breaker is the per-(kind,key) circuit breaker. Safe for concurrent use.

func (*Breaker) Acquire

func (b *Breaker) Acquire() (release func(success bool), err error)

Acquire asks the breaker whether a call may proceed. Returns:

  • (release, nil) → call allowed. Caller must invoke release when the call finishes.
  • (release, err) → call denied. err wraps ErrBreakerOpen and release is a no-op.

The release callback records the success/failure outcome of an allowed call.

func (*Breaker) Key

func (b *Breaker) Key() string

Key returns the breaker's identifier.

func (*Breaker) State

func (b *Breaker) State() BreakerState

State reports the current breaker state for /jobs show and metrics.

type BreakerConfig

type BreakerConfig struct {
	// FailureThreshold is how many failures within Window trip the breaker.
	FailureThreshold int
	// Window is the sliding window during which failures are counted.
	Window time.Duration
	// Cooldown is how long to stay Open before a HalfOpen probe.
	Cooldown time.Duration
	// HalfOpenSuccessRequired is how many consecutive successes close
	// the breaker from HalfOpen. Default 1 — one probe is enough.
	HalfOpenSuccessRequired int
}

BreakerConfig tunes the trip thresholds. Zero values inherit scheduler defaults — see breakerGroup.NewBreaker.

type BreakerState

type BreakerState int32

BreakerState is the exposed lifecycle state. Kept as a small int so atomic loads are cheap on the hot path.

const (
	BreakerClosed BreakerState = iota
	BreakerOpen
	BreakerHalfOpen
)

func (BreakerState) String

func (s BreakerState) String() string

String makes states log-friendly.

type Budget

type Budget struct {
	// ActionTimeout bounds a single execution of Action.
	ActionTimeout time.Duration `json:"action_timeout,omitempty"`

	// MaxRetries is the total number of retries after a failure
	// (so MaxRetries=3 means up to 4 executions total).
	MaxRetries int `json:"max_retries,omitempty"`

	// Backoff describes the retry delay ramp.
	BackoffInitial time.Duration `json:"backoff_initial,omitempty"`
	BackoffMax     time.Duration `json:"backoff_max,omitempty"`
	BackoffMult    float64       `json:"backoff_mult,omitempty"`
	BackoffJitter  float64       `json:"backoff_jitter,omitempty"` // 0..0.5

	// WaitTimeout caps the total time a wait-until loop may consume.
	WaitTimeout time.Duration `json:"wait_timeout,omitempty"`

	// PollInterval is the gap between wait-condition evaluations.
	PollInterval time.Duration `json:"poll_interval,omitempty"`

	// MaxPolls caps the number of wait evaluations. Zero = unlimited
	// (bounded only by WaitTimeout).
	MaxPolls int `json:"max_polls,omitempty"`
}

Budget bundles every "how long / how many times" limit the scheduler enforces on a single job. Any zero field is interpreted as "use the scheduler default from Config".

func DefaultBackoff

func DefaultBackoff() Budget

DefaultBackoff returns the production default: 1s → 5min ramp, factor 2, 20% jitter, 5 retries.

func (Budget) Merge

func (b Budget) Merge(other Budget) Budget

Merge fills zero fields in b from other, returning the merged copy. Used to layer Job.Budget over Config defaults.

type CLIBridge

type CLIBridge interface {
	// ExecuteSlashCommand runs a /foo command as if the user typed it.
	// Returns the output captured and whether the command asked to
	// exit the CLI loop. dangerousConfirmed mirrors
	// Job.DangerousConfirmed so the scheduler's headless policy
	// checker can admit "Ask" classifications when the job was
	// pre-authorized via --i-know / i_know:true.
	ExecuteSlashCommand(ctx context.Context, line string, dangerousConfirmed bool) (output string, exit bool, err error)

	// RunAgentTask boots a ReAct loop with the given task + system
	// prompt hint (same API as ChatCLI.agentMode.Run). Returns the
	// final assistant message. dangerousConfirmed has the same
	// semantics as on ExecuteSlashCommand.
	RunAgentTask(ctx context.Context, task, systemHint string, dangerousConfirmed bool) (string, error)

	// DispatchWorker runs a single worker (see cli/agent/workers) with
	// the named agent type and task. Useful as a lightweight action
	// when the user wants one-shot behavior without full ReAct state.
	DispatchWorker(ctx context.Context, agentType, task string) (string, error)

	// SendLLMPrompt calls the currently-configured LLM client outside
	// the chat history. Used by ActionLLMPrompt and llm_check
	// evaluator.
	SendLLMPrompt(ctx context.Context, system, prompt string, maxTokens int) (text string, tokens int, cost float64, err error)

	// FireHook dispatches a chatcli hook event synchronously and
	// returns the HookResult (for scheduler use of action_type=hook).
	FireHook(event hooks.HookEvent) *hooks.HookResult

	// RunShell executes a shell command under CoderMode safety rules
	// (same allowlist the agent mode enforces). When coderSafetyBypass
	// is true and the operator has explicitly granted it, the command
	// runs without the allowlist — reserved for trusted automation.
	//
	// dangerousConfirmed mirrors Job.DangerousConfirmed (set by
	// --i-know on /schedule or i_know:true on @scheduler). When true,
	// the fire-time recheck admits "Ask" classifications — the user
	// already pre-authorized at enqueue. Denylist still rejects (deny
	// always beats --i-know). Action executors that hold an env.Job
	// (e.g. action/shell.go) MUST pass env.Job.DangerousConfirmed
	// here; otherwise jobs admitted via i_know fail their re-check.
	//
	// Defense in depth: even though Enqueue already preflight-checked
	// every shell command via ClassifyShellCommand, RunShell re-checks
	// at fire time so policy changes between schedule and execution
	// propagate — a command that was Allowed when enqueued but that
	// now hits a new Deny rule must fail instead of running.
	RunShell(ctx context.Context, cmd string, envOverrides map[string]string, coderSafetyBypass, dangerousConfirmed bool) (stdout string, stderr string, exitCode int, err error)

	// ClassifyShellCommand asks the host's CoderMode policy whether a
	// raw shell command would be allowed, denied, or require
	// interactive approval. Called by Scheduler.Enqueue to pre-flight
	// every shell command in a Job (action + wait condition + nested
	// composites) before the job is even admitted — so daemon-mode
	// jobs can never hit an interactive prompt downstream.
	//
	// The scheduler never itself prompts the user; this method is the
	// one chance to classify. Returning ShellPolicyAsk means "would
	// normally prompt" and causes the scheduler to reject the enqueue
	// unless the job is explicitly marked DangerousConfirmed (user
	// passed --i-know, or the agent passed i_know:true in its tool
	// call with explicit user blessing upstream).
	ClassifyShellCommand(cmd string) ShellPolicy

	// KubeconfigPath returns the path the K8s evaluator should use
	// (honors CHATCLI_KUBECONFIG, KUBECONFIG, or the watcher's config).
	KubeconfigPath() string

	// DockerSocketPath returns the docker engine socket (env
	// DOCKER_HOST or unix:///var/run/docker.sock).
	DockerSocketPath() string

	// WorkspaceDir returns the chatcli workspace root (project dir or
	// CWD), used for resolving relative paths in conditions/actions.
	WorkspaceDir() string

	// LLMClient returns the currently-configured LLM client for
	// evaluators that want to issue their own prompts (llm_check).
	LLMClient() client.LLMClient

	// AppendHistory lets async actions add a message to the chat
	// history (tagged ownership=scheduler so compaction recognizes it).
	AppendHistory(msg models.Message)

	// PublishEvent forwards a scheduler Event to the live cli.bus
	// so the Ctrl+J overlay and the status line can react.
	PublishEvent(evt Event)

	// NotifyParkComplete is called by ActionAgentResume when a parked
	// agent's wait condition is satisfied (timer elapsed, probe matched,
	// or deadline reached). The bridge resolves the snapshot, restores
	// the interactive agent loop, and injects parkResult as the synthetic
	// tool-call result that the @park invocation would have returned.
	//
	// outcome is one of "elapsed", "matched", "timeout", "cancelled" —
	// short tags driven by the action that fired this notify.
	// detail carries the raw probe output (HTTP body, stdout) when
	// relevant; passed through verbatim to the agent's system prompt.
	NotifyParkComplete(ctx context.Context, token, outcome, detail string) error

	// RunHTTPProbe issues a single HTTP request against url with the
	// given method and headers, and returns the status code, body
	// (capped to a sane size), and error. Used by ActionParkPoll. The
	// bridge implementation centralizes timeout, redirect policy, TLS
	// and proxy semantics so action executors stay terse.
	RunHTTPProbe(ctx context.Context, url, method string, headers map[string]string, timeout time.Duration) (status int, body string, err error)
}

CLIBridge is implemented by the top-level cli.ChatCLI so the scheduler can invoke slash commands, dispatch agent tasks, query the LLM, fire hooks, and read K8s / workspace state.

func NewNoopBridge

func NewNoopBridge() CLIBridge

NewNoopBridge returns a bridge whose methods all return stubs. Used in tests and by the daemon when no CLI is attached.

type Condition

type Condition struct {
	Type     string         `json:"type"`
	Spec     map[string]any `json:"spec,omitempty"`
	Children []Condition    `json:"children,omitempty"`
	Negate   bool           `json:"negate,omitempty"`
}

Condition is the polymorphic descriptor the scheduler hands to a ConditionEvaluator at evaluation time. Type picks the evaluator; Spec carries its parameters; Children allows composites (all_of, any_of, not).
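
One way to picture how composites and Negate combine is a small recursive evaluator. This is a sketch under assumptions: the condition type is a reduced local copy, leaf evaluation is delegated to a hypothetical callback, the leaf type names "up" and "down" are invented for the demo, and short-circuiting is a choice made here rather than documented behavior.

```go
package main

import "fmt"

// condition is a reduced local stand-in for the package's Condition type.
type condition struct {
	Type     string
	Children []condition
	Negate   bool
}

// eval walks the condition tree. leaf decides every non-composite type.
func eval(c condition, leaf func(condition) bool) bool {
	var ok bool
	switch c.Type {
	case "all_of":
		ok = true
		for _, ch := range c.Children {
			if !eval(ch, leaf) {
				ok = false
				break
			}
		}
	case "any_of":
		for _, ch := range c.Children {
			if eval(ch, leaf) {
				ok = true
				break
			}
		}
	case "not":
		// Treated as false unless there is exactly one child.
		ok = len(c.Children) == 1 && !eval(c.Children[0], leaf)
	default:
		ok = leaf(c)
	}
	if c.Negate {
		return !ok
	}
	return ok
}

func main() {
	leaf := func(c condition) bool { return c.Type == "up" }
	tree := condition{Type: "all_of", Children: []condition{
		{Type: "up"},
		{Type: "down", Negate: true},
	}}
	fmt.Println(eval(tree, leaf))
}
```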

func ParseConditionDSL

func ParseConditionDSL(input string) (Condition, error)

ParseConditionDSL interprets a short string as a Condition.

func ParseConditionJSON

func ParseConditionJSON(raw string) (Condition, error)

ParseConditionJSON loads a condition from a JSON string.

func (Condition) SpecBool

func (c Condition) SpecBool(key string, def bool) bool

SpecBool reads a bool field from Spec with a default.

func (Condition) SpecDuration

func (c Condition) SpecDuration(key string, def time.Duration) time.Duration

SpecDuration reads a duration field from Spec. Accepts Go duration strings ("5s", "10m") or integer seconds.

func (Condition) SpecInt

func (c Condition) SpecInt(key string, def int) int

SpecInt reads an int field from Spec with a default.

func (Condition) SpecString

func (c Condition) SpecString(key, def string) string

SpecString reads a string field from Spec with a default.

func (Condition) Validate

func (c Condition) Validate() error

Validate checks that the Condition has a Type and that the Spec is usable. Type-specific validation happens in the evaluator itself (evaluator.ValidateSpec). This pass only catches structural errors.

type ConditionEvaluator

type ConditionEvaluator interface {
	// Type is the string that appears in Condition.Type.
	Type() string

	// ValidateSpec is called once at job admission. Failure here blocks
	// Enqueue with ErrInvalidCondition.
	ValidateSpec(spec map[string]any) error

	// Evaluate performs one check. ctx carries a timeout set by the
	// scheduler (from Budget.PollInterval by default). Network-bound
	// evaluators must honor ctx.Done().
	Evaluate(ctx context.Context, cond Condition, env *EvalEnv) EvalOutcome
}

ConditionEvaluator is a plug-in that knows how to check one condition type. Methods are pure from the scheduler's standpoint — the caller will wrap Evaluate in a breaker and a timeout.

type ConditionRegistry

type ConditionRegistry struct {
	// contains filtered or unexported fields
}

ConditionRegistry holds the set of registered evaluators.

func NewConditionRegistry

func NewConditionRegistry() *ConditionRegistry

NewConditionRegistry builds an empty registry.

func (*ConditionRegistry) Get

Get returns the evaluator for a type, or (nil, false).

func (*ConditionRegistry) MustRegister

func (r *ConditionRegistry) MustRegister(e ConditionEvaluator)

MustRegister panics on duplicate — suitable for RegisterBuiltins.

func (*ConditionRegistry) Register

Register adds an evaluator. Duplicates are rejected.

func (*ConditionRegistry) Types

func (r *ConditionRegistry) Types() []string

Types returns the registered type names, sorted.

type Config

type Config struct {
	Enabled bool
	DataDir string

	MaxJobs int

	DefaultActionTimeout  time.Duration
	DefaultPollInterval   time.Duration
	DefaultWaitTimeout    time.Duration
	DefaultMaxPolls       int
	DefaultBackoffInitial time.Duration
	DefaultBackoffMax     time.Duration
	DefaultBackoffMult    float64
	DefaultBackoffJitter  float64
	DefaultMaxRetries     int
	DefaultTTL            time.Duration
	HistoryLimit          int

	AllowAgents     bool
	ActionAllowlist map[ActionType]bool

	RateLimitGlobalRPS   float64
	RateLimitGlobalBurst int
	RateLimitOwnerRPS    float64
	RateLimitOwnerBurst  int

	BreakerConfig BreakerConfig

	AuditEnabled    bool
	AuditMaxSizeMB  int
	AuditMaxBackups int
	AuditMaxAgeDays int

	DaemonSocket      string
	DaemonAutoConnect bool

	SnapshotInterval time.Duration
	WALGCInterval    time.Duration

	WorkerCount     int
	WaitWorkerCount int
}

Config bundles every scheduler knob. Callers typically build one via LoadConfigFromEnv() + overrides; tests construct literals.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a production-safe baseline with all knobs set to the values documented at the top of the file.

func LoadConfigFromEnv

func LoadConfigFromEnv() Config

LoadConfigFromEnv applies CHATCLI_SCHEDULER_* overrides on top of DefaultConfig().

type Daemon

type Daemon struct {
	// contains filtered or unexported fields
}

Daemon is the long-lived server binding the scheduler to a UNIX socket. Built from an existing Scheduler so tests can inject a mock.

func NewDaemon

func NewDaemon(s *Scheduler, socketPath string, logger *zap.Logger) *Daemon

NewDaemon builds a Daemon. The caller must call Start and later Stop, or use Run, which blocks until shutdown.

func (*Daemon) Run

func (d *Daemon) Run(ctx context.Context) error

Run starts the daemon and blocks until ctx is cancelled or the process receives SIGINT/SIGTERM. Intended entry point from the "chatcli daemon start" subcommand.

func (*Daemon) Start

func (d *Daemon) Start(ctx context.Context) error

Start binds the socket, writes the pid file, and starts accepting. If the socket is already held, returns ErrDaemonRunning.

func (*Daemon) Stop

func (d *Daemon) Stop(drainTimeout time.Duration) error

Stop closes the listener and shuts down the scheduler.

type DaemonStats

type DaemonStats struct {
	Version     string                  `json:"version"`
	Started     time.Time               `json:"started_at"`
	Uptime      string                  `json:"uptime"`
	JobsActive  int                     `json:"jobs_active"`
	QueueDepth  int                     `json:"queue_depth"`
	WALSegments int                     `json:"wal_segments"`
	Connections int                     `json:"connections"`
	Breakers    map[string]BreakerState `json:"breakers,omitempty"`
}

DaemonStats is returned for Kind=stats.

type EvalEnv

type EvalEnv struct {
	Logger *zap.Logger
	// Bridge is an optional interface for plug-ins that need to reach
	// into the host ChatCLI (for example to read a K8s kubeconfig path).
	// See CLIBridge in tool_adapter.go for the current contract.
	Bridge CLIBridge
	// DangerousConfirmed mirrors Job.DangerousConfirmed. Wait
	// conditions that shell out (k8s/llm/regex/shell_exit) must
	// forward it to Bridge.RunShell so a job admitted with --i-know
	// at enqueue passes its fire-time recheck on the wait predicate
	// too — not just on the action.
	DangerousConfirmed bool
}

EvalEnv bundles runtime dependencies an evaluator may need (logger, the chatcli bridge for K8s client reuse, etc.). Passed by value so the plug-in doesn't accidentally hold a pointer across turns.

type EvalOutcome

type EvalOutcome struct {
	Satisfied bool
	// Transient means the evaluator is confident the check can be retried
	// (network blip, temporary HTTP error). Persistent errors (bad URL,
	// unknown k8s resource) bubble up via Err and drive the breaker.
	Transient bool
	// Details is shown in the /jobs show output to explain a poll result.
	Details string
	Err     error
}

EvalOutcome is the result of a single condition evaluation.

type Event

type Event struct {
	Type      EventType        `json:"type"`
	Timestamp time.Time        `json:"timestamp"`
	JobID     JobID            `json:"job_id,omitempty"`
	Name      string           `json:"name,omitempty"`
	Owner     Owner            `json:"owner,omitempty"`
	Status    JobStatus        `json:"status,omitempty"`
	Outcome   Outcome          `json:"outcome,omitempty"`
	Message   string           `json:"message,omitempty"`
	Data      map[string]any   `json:"data,omitempty"`
	Execution *ExecutionResult `json:"execution,omitempty"`
}

Event is the payload fan-out. JSON-serialized for IPC + audit; bus subscribers receive the struct directly.

func NewEvent

func NewEvent(t EventType) Event

NewEvent stamps the timestamp and returns a fresh Event value.

func (Event) WithData

func (e Event) WithData(k string, v any) Event

WithData attaches arbitrary k/v payload.

func (Event) WithExecution

func (e Event) WithExecution(r ExecutionResult) Event

WithExecution attaches an ExecutionResult (typically on job.completed or job.failed).

func (Event) WithJob

func (e Event) WithJob(j *Job) Event

WithJob enriches an event with job identity fields.

func (Event) WithMessage

func (e Event) WithMessage(msg string) Event

WithMessage adds a human-readable message.

type EventType

type EventType string

EventType enumerates the scheduler-specific events. All values are strings so they round-trip cleanly through JSON (IPC, audit).

const (
	EventJobCreated       EventType = "job.created"
	EventJobScheduled     EventType = "job.scheduled"
	EventJobFired         EventType = "job.fired"
	EventJobWaitStarted   EventType = "job.wait_started"
	EventJobWaitTick      EventType = "job.wait_tick"
	EventJobWaitSatisfied EventType = "job.wait_satisfied"
	EventJobRunning       EventType = "job.running"
	EventJobCompleted     EventType = "job.completed"
	EventJobFailed        EventType = "job.failed"
	EventJobTimedOut      EventType = "job.timed_out"
	EventJobCancelled     EventType = "job.cancelled"
	EventJobSkipped       EventType = "job.skipped"
	EventJobRetryQueued   EventType = "job.retry_queued"
	EventJobPaused        EventType = "job.paused"
	EventJobResumed       EventType = "job.resumed"
	EventJobDependency    EventType = "job.dependency_resolved"
	EventBreakerOpened    EventType = "breaker.opened"
	EventBreakerHalfOpen  EventType = "breaker.half_open"
	EventBreakerClosed    EventType = "breaker.closed"
	EventDaemonStarted    EventType = "daemon.started"
	EventDaemonStopped    EventType = "daemon.stopped"
)

type ExecEnv

type ExecEnv struct {
	Logger *zap.Logger
	Bridge CLIBridge
	// Job is a read-only summary of the job being executed, useful for
	// error messages ("action failed for job X").
	Job JobSummary
	// Enqueue lets an executor schedule follow-up jobs (e.g. ParkPoll
	// re-enqueues itself on every tick, fans out to AgentResume on
	// success/timeout). The dispatcher injects a closure bound to the
	// live Scheduler so executors don't need to import it themselves.
	// Nil in tests that don't exercise self-rescheduling.
	Enqueue func(ctx context.Context, job *Job) (*Job, error)
}

ExecEnv bundles dependencies action executors may need. Mirrors EvalEnv but distinct because actions often need the full CLI bridge (run a slash command, invoke an agent, fire a hook).

type ExecutionResult

type ExecutionResult struct {
	// CycleNum is the recurring-schedule cycle this execution belongs
	// to (1 = first cycle). Always 0 for one-shot jobs. Lets /jobs logs
	// group attempts within a cycle and separate cycles visually.
	CycleNum   int           `json:"cycle,omitempty"`
	AttemptNum int           `json:"attempt"`
	StartedAt  time.Time     `json:"started_at"`
	FinishedAt time.Time     `json:"finished_at"`
	Duration   time.Duration `json:"duration_ns"`
	Outcome    Outcome       `json:"outcome"`
	Output     string        `json:"output,omitempty"`
	// Error holds the stringified error message; we keep it as a
	// string (not an error value) because ExecutionResult travels
	// through JSON (WAL, IPC, audit log).
	Error string `json:"error,omitempty"`
	// Tokens / Cost surface LLM-action cost to /jobs show.
	Tokens int     `json:"tokens,omitempty"`
	Cost   float64 `json:"cost_usd,omitempty"`
	// ConditionSatisfied is true when a wait_until job's condition
	// flipped on this poll. Left empty for non-wait executions.
	ConditionSatisfied bool   `json:"condition_satisfied,omitempty"`
	ConditionDetails   string `json:"condition_details,omitempty"`
}

ExecutionResult captures one attempt — either a wait-condition poll result (from an on_condition schedule) or an action execution. Stored in Job.History as a ring buffer.
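
A bounded history of this kind can be kept with a slice that drops its oldest entries once a limit is reached. The sketch below is one simple way to do it; the result type and appendBounded helper are hypothetical, and treating a non-positive limit as "unbounded" is an assumption.

```go
package main

import "fmt"

// result is a tiny stand-in for ExecutionResult.
type result struct{ AttemptNum int }

// appendBounded appends r and keeps only the newest limit entries.
// A limit of zero or less means unbounded in this sketch.
func appendBounded(history []result, r result, limit int) []result {
	history = append(history, r)
	if limit > 0 && len(history) > limit {
		// Shift the newest entries to the front; copy handles overlap.
		n := copy(history, history[len(history)-limit:])
		history = history[:n]
	}
	return history
}

func main() {
	var h []result
	for i := 1; i <= 5; i++ {
		h = appendBounded(h, result{AttemptNum: i}, 3)
	}
	fmt.Println(h) // attempts 3, 4 and 5 remain
}
```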

func (ExecutionResult) Succeeded

func (r ExecutionResult) Succeeded() bool

Succeeded reports whether the execution ended in OutcomeSuccess. Helper for hook event payloads and audit queries.

type Frame

type Frame struct {
	ID     string          `json:"id,omitempty"`
	Kind   FrameKind       `json:"kind"`
	Owner  *Owner          `json:"owner,omitempty"`
	Input  json.RawMessage `json:"input,omitempty"`
	Result json.RawMessage `json:"result,omitempty"`
	Error  string          `json:"error,omitempty"`
	Event  *Event          `json:"event,omitempty"`
	TS     time.Time       `json:"ts,omitempty"`
}

Frame is the on-wire envelope.
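
Since the envelope is plain JSON, a request and a reply can be sketched with encoding/json alone. The frame type below copies a subset of the documented fields; the input shape {"job_id": ...} and the error text are invented for the demo, and how frames are delimited on the socket is not shown because this page does not say.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// frame is a reduced local copy of the Frame envelope.
type frame struct {
	ID     string          `json:"id,omitempty"`
	Kind   string          `json:"kind"`
	Input  json.RawMessage `json:"input,omitempty"`
	Result json.RawMessage `json:"result,omitempty"`
	Error  string          `json:"error,omitempty"`
}

// encode serializes one frame to its JSON text.
func encode(f frame) (string, error) {
	b, err := json.Marshal(f)
	return string(b), err
}

// decode parses one JSON frame.
func decode(s string) (frame, error) {
	var f frame
	err := json.Unmarshal([]byte(s), &f)
	return f, err
}

func main() {
	// Hypothetical cancel request; the input payload shape is an assumption.
	req, _ := encode(frame{ID: "1", Kind: "cancel", Input: json.RawMessage(`{"job_id":"abc"}`)})
	fmt.Println(req)

	// Hypothetical error reply correlated by the same ID.
	resp, _ := decode(`{"id":"1","kind":"error","error":"no such job"}`)
	fmt.Println(resp.Kind, resp.Error)
}
```

Keeping Input and Result as json.RawMessage lets the envelope be decoded first and the payload decoded later, once Kind is known.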

type FrameKind

type FrameKind string

FrameKind classifies a frame.

const (
	KindPing      FrameKind = "ping"
	KindEnqueue   FrameKind = "enqueue"
	KindCancel    FrameKind = "cancel"
	KindPause     FrameKind = "pause"
	KindResume    FrameKind = "resume"
	KindQuery     FrameKind = "query"
	KindList      FrameKind = "list"
	KindSnapshot  FrameKind = "snapshot"
	KindSubscribe FrameKind = "subscribe"
	KindStats     FrameKind = "stats"
	KindBye       FrameKind = "bye"

	KindOK    FrameKind = "ok"
	KindError FrameKind = "error"
	KindEvent FrameKind = "event"
)

type Job

type Job struct {
	// ─── Identity ──────────────────────────────────────────
	ID      JobID             `json:"id"`
	Name    string            `json:"name"`
	Owner   Owner             `json:"owner"`
	Tags    map[string]string `json:"tags,omitempty"`
	Version int               `json:"version"`

	// ─── Scheduling ────────────────────────────────────────
	Schedule Schedule  `json:"schedule"`
	Action   Action    `json:"action"`
	Wait     *WaitSpec `json:"wait,omitempty"`

	// ─── DAG ───────────────────────────────────────────────
	DependsOn []JobID `json:"depends_on,omitempty"`
	Triggers  []JobID `json:"triggers,omitempty"`
	// ParentID is non-empty for jobs spawned by a Triggers edge, so
	// /jobs tree can render the provenance.
	ParentID JobID `json:"parent_id,omitempty"`

	// ─── Execution budget ──────────────────────────────────
	Budget Budget        `json:"budget,omitempty"`
	TTL    time.Duration `json:"ttl,omitempty"`

	// ─── State machine ─────────────────────────────────────
	Status       JobStatus          `json:"status"`
	NextFireAt   time.Time          `json:"next_fire_at,omitempty"`
	CreatedAt    time.Time          `json:"created_at"`
	UpdatedAt    time.Time          `json:"updated_at"`
	FinishedAt   time.Time          `json:"finished_at,omitempty"`
	Transitions  []TransitionReason `json:"transitions,omitempty"`
	PauseReason  string             `json:"pause_reason,omitempty"`
	CancelReason string             `json:"cancel_reason,omitempty"`

	// ─── Execution history ─────────────────────────────────
	LastResult   *ExecutionResult  `json:"last_result,omitempty"`
	History      []ExecutionResult `json:"history,omitempty"`
	HistoryLimit int               `json:"history_limit,omitempty"`
	Attempts     int               `json:"attempts,omitempty"`
	// CycleCount is incremented on a recurring template every time it
	// spawns a child cycle. Always zero on a non-recurring or one-shot
	// (cycle) Job — it identifies templates and gives /jobs show a
	// cheap count of how many fires have happened so far.
	CycleCount int `json:"cycle_count,omitempty"`

	// ─── Flags ─────────────────────────────────────────────
	// DangerousConfirmed marks a job whose action is otherwise filtered
	// by the shell safety filter but the creator passed --i-know.
	DangerousConfirmed bool `json:"dangerous_confirmed,omitempty"`
	// Description is a free-form label shown in `/jobs show`.
	Description string `json:"description,omitempty"`
	// contains filtered or unexported fields
}

Job is the unit of scheduling. See the state machine in state_machine.go for legal transitions.

func NewJob

func NewJob(name string, owner Owner, sched Schedule, action Action) *Job

NewJob builds a Job with sensible defaults. The Validate pass runs inside Scheduler.Enqueue; callers don't need to invoke it directly.

func (*Job) Clone

func (j *Job) Clone() *Job

Clone returns a deep-ish copy suitable for returning to callers via `/jobs show` / IPC without exposing the internal mutex. Maps and slices are duplicated so mutations on the copy don't leak into the scheduler's working set.

func (*Job) IsExpired

func (j *Job) IsExpired(now time.Time) bool

IsExpired returns true when the job's TTL has elapsed since it reached a terminal state. Scheduler.gc prunes expired records.

func (*Job) Summary

func (j *Job) Summary() JobSummary

Summary projects a Job to a JobSummary.

func (*Job) Validate

func (j *Job) Validate() error

Validate runs admission-time checks. Called from Scheduler.Enqueue.

type JobID

type JobID string

JobID is the globally unique identifier. Derived from sha256(name|owner|created_nonce) truncated to 16 hex chars. See DeriveJobID in id.go. Stable across retries: re-Enqueue with the same (name, owner, nonce) produces the same JobID and is an idempotent no-op.

func DeriveJobID

func DeriveJobID(name string, owner Owner, nonce string) JobID

DeriveJobID produces a deterministic ID from the given components. nonce may be any string that disambiguates re-submissions (typically the created-at timestamp as RFC3339Nano). If nonce is empty a UUID is used to guarantee uniqueness.
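
As a rough illustration of the derivation described above, the sketch below hashes the three components and keeps the first 16 hex characters. The name `deriveID`, the `|` separator, and the pre-stringified owner are assumptions made for illustration, not the package's actual code; the empty-nonce UUID fallback is left out.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deriveID is a hypothetical stand-in for DeriveJobID. It assumes the
// owner has already been rendered as a string (for example "kind:id")
// and that the components are joined with "|".
func deriveID(name, owner, nonce string) string {
	sum := sha256.Sum256([]byte(name + "|" + owner + "|" + nonce))
	// 16 hex characters correspond to the first 8 bytes of the digest.
	return hex.EncodeToString(sum[:])[:16]
}

func main() {
	a := deriveID("nightly-backup", "user:session-1", "2026-05-12T00:00:00Z")
	b := deriveID("nightly-backup", "user:session-1", "2026-05-12T00:00:00Z")
	c := deriveID("nightly-backup", "user:session-1", "2026-05-12T00:00:01Z")
	// Same inputs give the same ID; a different nonce gives a different one.
	fmt.Println(len(a), a == b, a == c)
}
```

The point of the sketch is only that identical inputs always map to the same ID, which is what makes a repeated submission detectable.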

func NewJobID

func NewJobID() JobID

NewJobID returns a random, high-entropy ID. Used when determinism is undesirable (e.g. spawning a child job from a Triggers edge — each child gets a fresh ID even if the parent reruns).

func (JobID) IsZero

func (j JobID) IsZero() bool

IsZero reports whether the ID is empty (used in validation paths).

func (JobID) String

func (j JobID) String() string

String returns the ID as a plain string for logging / display.

type JobStatus

type JobStatus string

JobStatus is the externally observable lifecycle state. Transitions are enforced by stateMachine.Transition; see state_machine.go.

const (
	// StatusPending — job is registered, waiting for its NextFireAt.
	StatusPending JobStatus = "pending"
	// StatusBlocked — job is waiting for one or more DependsOn jobs.
	StatusBlocked JobStatus = "blocked"
	// StatusWaiting — poll loop active, evaluating a wait condition.
	StatusWaiting JobStatus = "waiting"
	// StatusRunning — action is executing.
	StatusRunning JobStatus = "running"
	// StatusPaused — user asked to hold; no evaluation until Resume.
	StatusPaused JobStatus = "paused"
	// StatusCompleted — action succeeded.
	StatusCompleted JobStatus = "completed"
	// StatusFailed — action (or wait) returned an error.
	StatusFailed JobStatus = "failed"
	// StatusCancelled — user or owner asked to stop mid-flight.
	StatusCancelled JobStatus = "cancelled"
	// StatusTimedOut — wait condition never satisfied within WaitTimeout
	// and the timeout behavior was "fail".
	StatusTimedOut JobStatus = "timed_out"
	// StatusSkipped — missed-fire policy decided to drop the occurrence.
	StatusSkipped JobStatus = "skipped"
)

func (JobStatus) IsActive

func (s JobStatus) IsActive() bool

IsActive reports whether the job currently occupies a worker slot or is otherwise doing work. Used for queue depth metrics.

func (JobStatus) IsTerminal

func (s JobStatus) IsTerminal() bool

IsTerminal reports whether no further transitions are possible.
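
A minimal sketch of how the two predicates could partition the status constants. The terminal set follows the statuses that the PruneFilter documentation lists as terminal; the active set (waiting and running) is an assumption based on the phrase "occupies a worker slot or is otherwise doing work" and may differ from the real implementation. All names here are local to the example.

```go
package main

import "fmt"

type status string

const (
	pending   status = "pending"
	blocked   status = "blocked"
	waiting   status = "waiting"
	running   status = "running"
	paused    status = "paused"
	completed status = "completed"
	failed    status = "failed"
	cancelled status = "cancelled"
	timedOut  status = "timed_out"
	skipped   status = "skipped"
)

// isTerminal reports whether a status admits no further transitions.
func isTerminal(s status) bool {
	switch s {
	case completed, failed, cancelled, timedOut, skipped:
		return true
	}
	return false
}

// isActive is an assumed reading of "doing work": polling a wait
// condition or executing the action.
func isActive(s status) bool {
	return s == waiting || s == running
}

func main() {
	for _, s := range []status{pending, waiting, running, paused, completed} {
		fmt.Printf("%-9s terminal=%v active=%v\n", s, isTerminal(s), isActive(s))
	}
}
```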

type JobSummary

type JobSummary struct {
	ID          JobID     `json:"id"`
	Name        string    `json:"name"`
	Owner       Owner     `json:"owner"`
	Status      JobStatus `json:"status"`
	Type        string    `json:"type"`
	NextFireAt  time.Time `json:"next_fire_at,omitempty"`
	LastOutcome Outcome   `json:"last_outcome,omitempty"`
	Attempts    int       `json:"attempts,omitempty"`
	Description string    `json:"description,omitempty"`
	CreatedAt   time.Time `json:"created_at"`
	UpdatedAt   time.Time `json:"updated_at"`
	Tags        []string  `json:"tags,omitempty"`
	// DangerousConfirmed mirrors Job.DangerousConfirmed so action
	// executors (notably action/shell.go) can pass it through to
	// CLIBridge.RunShell at fire time. Without this, a job that was
	// admitted to the queue with --i-know / i_know:true still failed
	// its fire-time policy re-check because the bridge had no way to
	// see the per-job authorization.
	DangerousConfirmed bool `json:"dangerous_confirmed,omitempty"`
}

JobSummary is the compact view returned by List and the status line. Stripping history and transitions keeps the JSON small for IPC.

type ListFilter

type ListFilter struct {
	Owner           *Owner
	Statuses        []JobStatus
	Tag             string
	NameSubstr      string
	IncludeTerminal bool
}

ListFilter is an OR-of-conjunctions filter for List.

type Metrics

type Metrics struct {
	JobsCreated       *prometheus.CounterVec   // label: owner_kind, action_type
	JobsFired         *prometheus.CounterVec   // label: outcome, action_type
	WaitChecks        *prometheus.CounterVec   // label: condition_type, satisfied
	WaitDuration      *prometheus.HistogramVec // label: condition_type
	ActionDuration    *prometheus.HistogramVec // label: action_type, outcome
	QueueDepth        prometheus.Gauge
	ActiveJobs        prometheus.Gauge
	BreakerState      *prometheus.GaugeVec   // label: kind, key
	RetryCount        *prometheus.CounterVec // label: attempt (bucketed)
	EnqueueErrors     *prometheus.CounterVec // label: reason (rate_limited, full, invalid)
	WALSegments       prometheus.Gauge
	AuditWrites       prometheus.Counter
	DaemonConnections prometheus.Gauge
}

Metrics is the scheduler's Prometheus surface. Use GetMetrics() to obtain the package singleton — the counters are process-wide so a single Scheduler per process is the normal case.

func GetMetrics

func GetMetrics() *Metrics

GetMetrics returns the package-wide singleton, constructing it on first call. Subsequent calls return the same instance so registering twice doesn't panic with Prometheus's duplicate-metric error.

type MissPolicy

type MissPolicy string

MissPolicy decides how to handle a fire window that passed while the scheduler was not running (laptop asleep, daemon crashed).

const (
	// MissFireOnce — run once for the missed window, regardless of
	// how many cron ticks happened. Default.
	MissFireOnce MissPolicy = "fire_once"
	// MissFireAll — run once per missed tick (noisy; opt-in).
	MissFireAll MissPolicy = "fire_all"
	// MissSkip — drop the missed window entirely.
	MissSkip MissPolicy = "skip"
)
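
The three policies can be read as a rule for how many executions to schedule once the scheduler comes back and finds missed ticks. The sketch below encodes that reading; `firesForMissed` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// firesForMissed returns how many executions to run for a window in
// which missedTicks scheduled ticks went by while the scheduler was down.
func firesForMissed(policy string, missedTicks int) int {
	if missedTicks <= 0 {
		return 0
	}
	switch policy {
	case "fire_all":
		return missedTicks // one run per missed tick
	case "skip":
		return 0 // drop the window entirely
	default:
		return 1 // "fire_once", the documented default
	}
}

func main() {
	for _, p := range []string{"fire_once", "fire_all", "skip"} {
		fmt.Println(p, firesForMissed(p, 4))
	}
}
```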

type Outcome

type Outcome string

Outcome classifies a single ExecutionResult. Distinct from JobStatus because a job may have many executions (cron, retries, interval) and each one has its own Outcome.

const (
	OutcomeSuccess    Outcome = "success"
	OutcomeFailed     Outcome = "failed"
	OutcomeTimeout    Outcome = "timeout"
	OutcomeCancelled  Outcome = "cancelled"
	OutcomeBreakerOff Outcome = "breaker_open"
	OutcomeSkipped    Outcome = "skipped"
)

type Owner

type Owner struct {
	Kind OwnerKind `json:"kind"`
	// ID is the owner-specific identifier (session id for user,
	// agent name for agent, worker call id for worker).
	ID string `json:"id"`
	// Tag is a free-form label for grouping in the UI. Optional.
	Tag string `json:"tag,omitempty"`
}

Owner identifies the principal responsible for a job. Used for authorization (cancel / pause) and filtering (`/jobs list --owner me`).

func (Owner) Equal

func (o Owner) Equal(other Owner) bool

Equal compares two owners by Kind + ID only. Tag is ignored — two user-owned jobs from the same session are considered the same owner even if they were tagged differently.

func (Owner) String

func (o Owner) String() string

String returns "kind:id" for compact log/display lines.
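
The comparison and formatting rules above are small enough to sketch directly. The `owner` type below is a local copy for illustration, with Kind simplified to a plain string.

```go
package main

import "fmt"

type owner struct {
	Kind string
	ID   string
	Tag  string
}

// equal compares Kind and ID only; Tag is deliberately ignored.
func (o owner) equal(other owner) bool {
	return o.Kind == other.Kind && o.ID == other.ID
}

// String renders the compact "kind:id" form.
func (o owner) String() string {
	return o.Kind + ":" + o.ID
}

func main() {
	a := owner{Kind: "user", ID: "s1", Tag: "deploys"}
	b := owner{Kind: "user", ID: "s1", Tag: "backups"}
	fmt.Println(a.equal(b), a) // prints: true user:s1
}
```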

type OwnerKind

type OwnerKind string

OwnerKind classifies who created a job.

const (
	// OwnerUser — direct /schedule or /wait invocation by the human.
	OwnerUser OwnerKind = "user"
	// OwnerAgent — created inside the ReAct loop via a tool call.
	OwnerAgent OwnerKind = "agent"
	// OwnerWorker — created by a subagent (worker dispatcher).
	OwnerWorker OwnerKind = "worker"
	// OwnerSystem — scheduler itself (e.g. retry follow-ups).
	OwnerSystem OwnerKind = "system"
	// OwnerHook — triggered by a hook firing.
	OwnerHook OwnerKind = "hook"
)

type PruneFilter added in v1.109.1

type PruneFilter struct {
	Statuses   []JobStatus // empty = every terminal status
	Owner      *Owner      // nil = any owner
	OlderThan  time.Duration
	NameSubstr string
}

PruneFilter narrows which terminal jobs Prune deletes. Zero value matches every terminal job (completed + failed + cancelled + timed_out + skipped). Combine fields to narrow further.

type RemoteClient

type RemoteClient struct {
	// contains filtered or unexported fields
}

RemoteClient is a thin IPC client talking to a Daemon.

func Dial

func Dial(socketPath string) (*RemoteClient, error)

Dial connects to the daemon at socketPath.

func (*RemoteClient) Cancel

func (c *RemoteClient) Cancel(ctx context.Context, owner Owner, id JobID, reason string) error

Cancel cancels a job via IPC.

func (*RemoteClient) Close

func (c *RemoteClient) Close() error

Close terminates the connection.

func (*RemoteClient) Enqueue

func (c *RemoteClient) Enqueue(ctx context.Context, owner Owner, in ToolInput) (*ToolOutput, error)

Enqueue sends a schedule_job request.

func (*RemoteClient) List

func (c *RemoteClient) List(ctx context.Context, filter ListFilter) ([]JobSummary, error)

List fetches summaries.

func (*RemoteClient) Pause

func (c *RemoteClient) Pause(ctx context.Context, owner Owner, id JobID, reason string) error

Pause pauses a job via IPC.

func (*RemoteClient) Ping

func (c *RemoteClient) Ping(ctx context.Context) error

Ping confirms the daemon is responsive.

func (*RemoteClient) Query

func (c *RemoteClient) Query(ctx context.Context, id JobID) (*Job, error)

Query fetches a single job.

func (*RemoteClient) Resume

func (c *RemoteClient) Resume(ctx context.Context, owner Owner, id JobID) error

Resume resumes a paused job.

func (*RemoteClient) Snapshot

func (c *RemoteClient) Snapshot(ctx context.Context) error

Snapshot forces the daemon to write a snapshot now.

func (*RemoteClient) Stats

func (c *RemoteClient) Stats(ctx context.Context) (DaemonStats, error)

Stats returns the daemon's current runtime stats.

func (*RemoteClient) Subscribe

func (c *RemoteClient) Subscribe(ctx context.Context) (<-chan Event, error)

Subscribe registers the caller for server-sent events. The returned channel is closed when the connection drops.

type Schedule

type Schedule struct {
	Kind       ScheduleKind  `json:"kind"`
	ExactTime  time.Time     `json:"exact_time,omitempty"`
	Relative   time.Duration `json:"relative,omitempty"`
	Cron       string        `json:"cron,omitempty"`
	Interval   time.Duration `json:"interval,omitempty"`
	MissPolicy MissPolicy    `json:"miss_policy,omitempty"`
	// Timezone, if set, adjusts cron evaluation. Go time.Location name
	// (IANA) — "Local", "UTC", "America/Sao_Paulo", etc. Empty = Local.
	Timezone string `json:"timezone,omitempty"`
	// contains filtered or unexported fields
}

Schedule describes when a Job should next be considered for execution. Exactly one of ExactTime / Relative / Cron / Interval is used per Kind; the others are zero values.

func ParseScheduleDSL

func ParseScheduleDSL(input string) (Schedule, error)

ParseScheduleDSL interprets a short string as a Schedule.

func (*Schedule) IsRecurring

func (s *Schedule) IsRecurring() bool

IsRecurring reports whether the schedule can fire more than once.

func (*Schedule) Next

func (s *Schedule) Next(after, createdAt time.Time) time.Time

Next returns the time at which this Schedule should fire strictly after `after`. createdAt is used to anchor relative/interval. When no further fires are possible (absolute schedule already passed), returns zero time.

Next is deterministic: given the same inputs, the result is the same. The scheduler main loop calls it once per transition.
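
To make the "strictly after" contract concrete, here is a simplified sketch covering the absolute, relative, and interval kinds only. It assumes intervals are anchored on createdAt; cron, on_condition, and manual are omitted, and the type and method names are local to the example rather than the package's own.

```go
package main

import (
	"fmt"
	"time"
)

type kind string

const (
	absolute kind = "absolute"
	relative kind = "relative"
	interval kind = "interval"
)

type schedule struct {
	Kind      kind
	ExactTime time.Time
	Relative  time.Duration
	Interval  time.Duration
}

// next returns the first fire time strictly after `after`, or the zero
// time when the schedule can no longer fire.
func (s schedule) next(after, createdAt time.Time) time.Time {
	switch s.Kind {
	case absolute:
		if s.ExactTime.After(after) {
			return s.ExactTime
		}
	case relative:
		if t := createdAt.Add(s.Relative); t.After(after) {
			return t
		}
	case interval:
		if s.Interval <= 0 {
			return time.Time{}
		}
		if after.Before(createdAt) {
			return createdAt.Add(s.Interval)
		}
		// Number of whole intervals already elapsed, plus one.
		n := after.Sub(createdAt)/s.Interval + 1
		return createdAt.Add(n * s.Interval)
	}
	return time.Time{}
}

func main() {
	created := time.Date(2026, 5, 12, 9, 0, 0, 0, time.UTC)
	s := schedule{Kind: interval, Interval: 10 * time.Minute}
	fmt.Println(s.next(created.Add(25*time.Minute), created).Format(time.RFC3339))
	// prints: 2026-05-12T09:30:00Z
}
```

Because the result depends only on the arguments and the schedule's fields, calling it twice with the same inputs gives the same answer, which matches the determinism note above.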

func (*Schedule) Validate

func (s *Schedule) Validate() error

Validate returns a non-nil error when the Schedule's fields are inconsistent with its Kind. Also pre-parses cron so Next() doesn't pay the cost repeatedly.

type ScheduleKind

type ScheduleKind string

ScheduleKind classifies how Next is computed.

const (
	// ScheduleAbsolute fires once at ExactTime and is terminal.
	ScheduleAbsolute ScheduleKind = "absolute"
	// ScheduleRelative fires once at created_at + Relative.
	ScheduleRelative ScheduleKind = "relative"
	// ScheduleCron fires on every Cron match until cancelled.
	ScheduleCron ScheduleKind = "cron"
	// ScheduleInterval fires every Interval from first fire.
	ScheduleInterval ScheduleKind = "interval"
	// ScheduleOnCondition has no time component — Next returns now and
	// the wait-condition gate does the work.
	ScheduleOnCondition ScheduleKind = "on_condition"
	// ScheduleManual never fires on its own; it's triggered by another
	// job's Triggers edge. Used for DAG leaf nodes.
	ScheduleManual ScheduleKind = "manual"
)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is the top-level engine. Safe for concurrent use.

func New

func New(cfg Config, bridge CLIBridge, deps SchedulerDeps, logger *zap.Logger) (*Scheduler, error)

New constructs a Scheduler from cfg + bridge. Call RegisterBuiltins to install the standard evaluators/executors, then Start to run.

bridge may be nil; a noopBridge is substituted so the scheduler still boots (useful for the daemon's bootstrap phase before a CLI attaches).

func (*Scheduler) Actions

func (s *Scheduler) Actions() *ActionRegistry

Actions returns the action registry.

func (*Scheduler) Bridge

func (s *Scheduler) Bridge() CLIBridge

Bridge returns the wired CLIBridge (may be the noop one).

func (*Scheduler) Cancel

func (s *Scheduler) Cancel(id JobID, reason string, requester Owner) error

Cancel marks the job cancelled. Running jobs are notified via ctx cancellation (their action executor sees ctx.Done() fire).

Locking discipline: the job lock is held only during the mutation; emit() is invoked outside the lock because Event.WithJob reacquires the same mutex to snapshot job fields and would otherwise deadlock.

func (*Scheduler) Conditions

func (s *Scheduler) Conditions() *ConditionRegistry

Conditions returns the condition registry so callers may add extensions before Start.

func (*Scheduler) Config

func (s *Scheduler) Config() Config

Config returns a copy of the current runtime configuration.

func (*Scheduler) DrainAndShutdown

func (s *Scheduler) DrainAndShutdown(timeout time.Duration)

DrainAndShutdown stops accepting new Enqueues, waits for in-flight work (up to timeout), then closes the queue + WAL + audit. Pending jobs remain in the WAL and will be replayed on next Start.

func (*Scheduler) Enqueue

func (s *Scheduler) Enqueue(ctx context.Context, job *Job) (*Job, error)

Enqueue admits a new job. On success the returned Job is a clone of the one held inside the scheduler — callers may inspect but not mutate.

func (*Scheduler) FindByName

func (s *Scheduler) FindByName(name string) (*Job, bool)

FindByName returns the live (non-terminal) job for a name, if any.

func (*Scheduler) IsClosed

func (s *Scheduler) IsClosed() bool

IsClosed reports whether the scheduler has been shut down.

func (*Scheduler) List

func (s *Scheduler) List(filter ListFilter) []JobSummary

List returns summaries of every known job matching the filter. Defaults: non-terminal only, sorted by NextFireAt then CreatedAt.

func (*Scheduler) Metrics

func (s *Scheduler) Metrics() *Metrics

Metrics returns the Prometheus surface.

func (*Scheduler) Pause

func (s *Scheduler) Pause(id JobID, reason string, requester Owner) error

Pause halts a non-running job until Resume is called.

func (*Scheduler) Prune added in v1.109.1

func (s *Scheduler) Prune(filter PruneFilter) ([]JobID, error)

Prune removes terminal jobs that match the filter. Returns the list of removed job IDs. Active jobs (pending/running/waiting/paused) are never touched — use Cancel for those.

Locking: matching and in-memory deletion happen in the same critical section under s.mu, so the set of records to acknowledge stays consistent with the in-memory map. The WAL acks themselves are issued after the map lock is released, so a slow disk does not block other operations.

func (*Scheduler) Query

func (s *Scheduler) Query(id JobID) (*Job, error)

Query returns a cloned snapshot of the job (including history), or ErrJobNotFound.

func (*Scheduler) Resume

func (s *Scheduler) Resume(id JobID, requester Owner) error

Resume takes a Paused job back to Pending and re-enqueues it.

func (*Scheduler) SetBridge

func (s *Scheduler) SetBridge(b CLIBridge)

SetBridge swaps the CLIBridge. Used by daemon mode when a CLI attaches. Safe to call while the scheduler is running.

func (*Scheduler) Snapshot

func (s *Scheduler) Snapshot() error

Snapshot triggers an immediate on-disk snapshot. Exposed for /jobs gc and daemon IPC "snapshot" calls.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start spins up the schedule pump, the worker pool, the snapshot and GC goroutines. Idempotent — a second Start is a no-op.

func (*Scheduler) Version

func (s *Scheduler) Version() uint64

Version returns an opaque counter that bumps on every mutation. UI subscribers poll Version cheaply to decide whether to rerender.
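
One way a UI subscriber might use such a counter is to remember the last value it rendered and skip work when nothing changed. The sketch below shows that pattern with an atomic counter; `registry` and `view` are hypothetical stand-ins, and the real Scheduler may track its version differently.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// registry stands in for the scheduler: every mutation bumps the counter.
type registry struct {
	version atomic.Uint64
}

func (r *registry) mutate()         { r.version.Add(1) }
func (r *registry) Version() uint64 { return r.version.Load() }

// view remembers the last version it rendered.
type view struct{ seen uint64 }

// needsRender reports whether the registry changed since the last call
// and records the version it observed.
func (v *view) needsRender(r *registry) bool {
	cur := r.Version()
	if cur == v.seen {
		return false
	}
	v.seen = cur
	return true
}

func main() {
	var r registry
	var v view
	fmt.Println(v.needsRender(&r)) // false: nothing has happened yet
	r.mutate()
	fmt.Println(v.needsRender(&r)) // true: one mutation since last look
	fmt.Println(v.needsRender(&r)) // false: already rendered
}
```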

type SchedulerDeps

type SchedulerDeps struct {
	EventBus *bus.MessageBus
	Hooks    *hooks.Manager
}

SchedulerDeps bundles the optional external dependencies Scheduler uses for observability and coordination. All fields are optional; a nil value means "skip that channel".

type ShellPolicy

type ShellPolicy int

ShellPolicy classifies a shell command's security disposition as reported by the host CoderMode policy manager.

const (
	// ShellPolicyAllow — command is on the allowlist (or is a known
	// read-only command like `kubectl get`, `git status`, etc.).
	// Scheduler admits the job without further questions.
	ShellPolicyAllow ShellPolicy = iota

	// ShellPolicyAsk — command would require interactive approval in
	// coder mode. The scheduler never prompts (daemon/autonomous
	// context); it rejects the enqueue unless the job is marked
	// DangerousConfirmed (user used --i-know, or an agent tool call
	// passed i_know:true with explicit upstream blessing).
	ShellPolicyAsk

	// ShellPolicyDeny — command is on the denylist. Always rejected
	// regardless of flags — --i-know CANNOT override a deny.
	ShellPolicyDeny
)

func (ShellPolicy) IsTerminal

func (p ShellPolicy) IsTerminal() bool

IsTerminal reports whether this classification by itself is enough to decide admission (Allow or Deny). Ask is non-terminal — the caller must consult DangerousConfirmed.
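
The admission rule described by the three constants can be sketched as a small decision function. The names below are local to the example; only the rule itself (Allow admits, Deny rejects, Ask defers to DangerousConfirmed) comes from the documentation above.

```go
package main

import "fmt"

type policy int

const (
	allow policy = iota
	ask
	deny
)

// isTerminal reports whether the classification alone decides admission.
func (p policy) isTerminal() bool {
	return p == allow || p == deny
}

// admit applies the documented rule: Allow is always admitted, Deny is
// always rejected, and Ask is admitted only when the job was confirmed.
func admit(p policy, dangerousConfirmed bool) bool {
	switch p {
	case allow:
		return true
	case deny:
		return false
	default:
		return dangerousConfirmed
	}
}

func main() {
	fmt.Println(admit(allow, false)) // true
	fmt.Println(admit(ask, false))   // false
	fmt.Println(admit(ask, true))    // true
	fmt.Println(admit(deny, true))   // false: confirmation cannot override a deny
}
```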

func (ShellPolicy) String

func (p ShellPolicy) String() string

String makes the policy log-friendly.

type Suggestion

type Suggestion struct {
	Text        string
	Description string
}

Suggestion is a (text, description) pair consumed by the top-level completer. Kept minimal on purpose so the completer can adapt to future terminal libraries.

func CommandSuggestions

func CommandSuggestions(command string) []Suggestion

CommandSuggestions returns the top-level /schedule /wait /jobs subcommands.

func JobsListFlags

func JobsListFlags() []Suggestion

JobsListFlags returns the flag suggestions for /jobs list.

type TimeoutBehavior

type TimeoutBehavior string

TimeoutBehavior decides what happens when a wait-until loop runs out of time or poll budget.

const (
	// TimeoutFail — mark job StatusTimedOut. Default.
	TimeoutFail TimeoutBehavior = "fail"
	// TimeoutFireAnyway — run the Action even though the condition
	// never turned true. Useful for "cleanup regardless" semantics.
	TimeoutFireAnyway TimeoutBehavior = "fire_anyway"
	// TimeoutFallback — run WaitSpec.Fallback (a different Action) and
	// then fail. Used to notify when a deployment never came up.
	TimeoutFallback TimeoutBehavior = "fallback"
)
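
Read as a dispatch table, the three behaviors decide which action (if any) runs and whether the job ends in a failure state. The sketch below encodes only that much; `timeoutPlan` and `planOnTimeout` are hypothetical, and the exact final status after a fallback is not stated above, so the example records a generic Fail flag instead.

```go
package main

import "fmt"

// timeoutPlan describes what to do once the wait budget is exhausted.
type timeoutPlan struct {
	RunAction   bool // run the job's main Action
	RunFallback bool // run WaitSpec.Fallback
	Fail        bool // end the job in a failure state
}

func planOnTimeout(behavior string) timeoutPlan {
	switch behavior {
	case "fire_anyway":
		return timeoutPlan{RunAction: true}
	case "fallback":
		return timeoutPlan{RunFallback: true, Fail: true}
	default:
		// "fail", the documented default (the job becomes timed_out).
		return timeoutPlan{Fail: true}
	}
}

func main() {
	for _, b := range []string{"fail", "fire_anyway", "fallback"} {
		fmt.Printf("%-12s %+v\n", b, planOnTimeout(b))
	}
}
```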

type ToolAdapter

type ToolAdapter struct {
	// contains filtered or unexported fields
}

ToolAdapter exposes the scheduler methods as JSON-in/JSON-out callable tools. Used by the ReAct loop and (via IPC) by the daemon.

func NewToolAdapter

func NewToolAdapter(s *Scheduler) *ToolAdapter

NewToolAdapter builds the adapter.

func (*ToolAdapter) CancelJob

func (t *ToolAdapter) CancelJob(_ context.Context, owner Owner, rawIn string) (string, error)

CancelJob implements the cancel_job tool.

func (*ToolAdapter) ListJobs

func (t *ToolAdapter) ListJobs(_ context.Context, owner Owner, rawIn string) (string, error)

ListJobs implements the list_jobs tool.

func (*ToolAdapter) QueryJob

func (t *ToolAdapter) QueryJob(_ context.Context, _ Owner, rawIn string) (string, error)

QueryJob implements the query_job tool.

func (*ToolAdapter) ScheduleJob

func (t *ToolAdapter) ScheduleJob(ctx context.Context, owner Owner, rawIn string) (string, error)

ScheduleJob implements the schedule_job tool.

func (*ToolAdapter) WaitUntil

func (t *ToolAdapter) WaitUntil(ctx context.Context, owner Owner, rawIn string) (string, error)

WaitUntil implements the wait_until tool.

type ToolInput

type ToolInput struct {
	// schedule_job / wait_until inputs share most fields.
	Name        string            `json:"name,omitempty"`
	Description string            `json:"description,omitempty"`
	Tags        map[string]string `json:"tags,omitempty"`

	// Schedule.
	When     string    `json:"when,omitempty"`     // DSL string
	Schedule *Schedule `json:"schedule,omitempty"` // explicit
	// Action.
	Do     string  `json:"do,omitempty"`
	Action *Action `json:"action,omitempty"`
	// Wait.
	Until string    `json:"until,omitempty"`
	Wait  *WaitSpec `json:"wait,omitempty"`
	// Budget overrides.
	Timeout      string `json:"timeout,omitempty"`
	PollInterval string `json:"poll,omitempty"`
	MaxPolls     int    `json:"max_polls,omitempty"`
	WaitTimeout  string `json:"wait_timeout,omitempty"`
	MaxRetries   int    `json:"max_retries,omitempty"`
	OnTimeout    string `json:"on_timeout,omitempty"`
	// DAG.
	DependsOn []string `json:"depends_on,omitempty"`
	Triggers  []string `json:"triggers,omitempty"`
	// TTL.
	TTL string `json:"ttl,omitempty"`
	// async for wait_until.
	Async bool `json:"async,omitempty"`

	// IKnow pre-authorizes shell commands that would normally hit
	// ShellPolicyAsk at enqueue time. When true, the resulting Job
	// has DangerousConfirmed=true and passes preflight even on
	// Ask classifications. Denylist rules still block: IKnow
	// cannot override Deny. Both the user (`--i-know` flag) and
	// agents (`i_know` field in the @scheduler tool call) may set
	// this; the authorization decision already happened at a higher
	// layer (the user ran /agent to spawn the agent, or the user
	// ran /schedule directly).
	IKnow bool `json:"i_know,omitempty"`

	// query/list/cancel inputs.
	ID     string     `json:"id,omitempty"`
	Filter ListFilter `json:"filter,omitempty"`
	Reason string     `json:"reason,omitempty"`
}

ToolInput is the JSON shape the ReAct loop passes in.

type ToolOutput

type ToolOutput struct {
	OK      bool         `json:"ok"`
	JobID   JobID        `json:"job_id,omitempty"`
	Status  JobStatus    `json:"status,omitempty"`
	Summary *JobSummary  `json:"summary,omitempty"`
	Job     *Job         `json:"job,omitempty"`
	Jobs    []JobSummary `json:"jobs,omitempty"`
	Outcome Outcome      `json:"outcome,omitempty"`
	Details string       `json:"details,omitempty"`
	Error   string       `json:"error,omitempty"`
}

ToolOutput is the normalized JSON shape returned to the ReAct loop.

type TransitionReason

type TransitionReason struct {
	From    JobStatus
	To      JobStatus
	Message string
	At      time.Time
}

TransitionReason carries human-readable context for audit/event. Separated from the status because the same status may be reached via different paths (e.g. StatusFailed after action error vs after circuit breaker rejection).

type WaitSpec

type WaitSpec struct {
	Condition Condition       `json:"condition"`
	OnTimeout TimeoutBehavior `json:"on_timeout,omitempty"`
	// Fallback is consulted when OnTimeout=TimeoutFallback.
	Fallback *Action `json:"fallback,omitempty"`
}

WaitSpec optionally gates an Action behind a polled condition. When Wait is non-nil on a Job, the scheduler enters StatusWaiting at NextFireAt and stays there, evaluating WaitSpec.Condition every PollInterval, until the condition is satisfied or the budget runs out.
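
The polling behavior can be pictured as a bounded loop: evaluate the condition, stop on success, otherwise sleep one poll interval until the budget is spent. The sketch below is a simplified model under assumptions: `pollUntil` is a hypothetical helper, the budget is expressed only as a maximum number of polls, and `done` plays the role that a context's Done channel would normally play.

```go
package main

import (
	"fmt"
	"time"
)

// pollUntil evaluates cond up to maxPolls times, pausing pollInterval
// between evaluations. It returns true as soon as cond is satisfied and
// false when the budget runs out or done is closed.
func pollUntil(done <-chan struct{}, cond func() bool, pollInterval time.Duration, maxPolls int) bool {
	if pollInterval <= 0 || maxPolls <= 0 {
		return false
	}
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for polls := 1; ; polls++ {
		if cond() {
			return true
		}
		if polls >= maxPolls {
			return false // poll budget exhausted
		}
		select {
		case <-done:
			return false // cancelled while waiting
		case <-ticker.C:
		}
	}
}

func main() {
	calls := 0
	ready := func() bool { calls++; return calls == 3 }
	// A nil done channel never fires, so only the ticker drives the loop.
	fmt.Println(pollUntil(nil, ready, 10*time.Millisecond, 5), calls)
	// prints: true 3
}
```

A false result is the point at which a timeout behavior such as fail, fire_anyway, or fallback would take over.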

func (*WaitSpec) Validate

func (w *WaitSpec) Validate() error

Validate returns a non-nil error when the WaitSpec is malformed. Called during Enqueue admission.

Directories

Path Synopsis
AgentResume fires when a parked agent's wait condition is satisfied and the interactive ReAct loop should re-enter.
Package builtins wires the built-in condition evaluators and action executors into a Scheduler.
Package condition: built-in evaluators registry.
