Documentation
Overview
ChatCLI - Scheduler: append-only JSONL audit log.

Every scheduler mutation (create, transition, cancel, fire) writes a line to <dir>/audit.log. Operators and compliance auditors consume this file via their existing log pipelines. The file is plain JSONL (one JSON object per line) to keep parsers simple.

Rotation: lumberjack handles size-based rolling. Default 10 MiB per file, 7 backups kept. Configurable via Config.AuditMaxSizeMB (and the AUDIT_MAX_BACKUPS / AUDIT_MAX_AGE_DAYS env vars).

ChatCLI - Scheduler: circuit breaker.

A breaker protects the scheduler from a cascading failure mode: the k8s API is down → every k8s_resource_ready evaluator times out → hundreds of jobs stack up polling, saturating the worker pool and preventing unrelated jobs from making progress.

Breaker states (classic three-state pattern):

  Closed     calls pass through. Failures are counted. When the count crosses the threshold within a sliding window, trip to Open.
  Open       calls short-circuit with ErrBreakerOpen. After the cooldown elapses, allow one probe by transitioning to HalfOpen.
  HalfOpen   exactly one probe permitted; subsequent callers short-circuit until the probe result arrives.
               probe success → Closed
               probe failure → Open (cooldown restarted)

Granularity: one breaker per (kind, key), e.g. one for the k8s_resource_ready evaluator type as a whole and another for each shell action's command class. The breakerGroup holds them.

ChatCLI - Scheduler: CLIBridge interface.

The scheduler lives inside the cli package tree but deliberately does NOT import the top-level cli.ChatCLI struct: that would create a circular dependency with command_handler.go, cli_completer.go, agent_mode.go, and everything else that will in turn want to call the scheduler.

Instead, everything the scheduler needs from the host process is abstracted behind CLIBridge. The implementation lives in cli/scheduler_bridge.go (top-level cli package) and is constructed at NewChatCLI time. Unit tests provide a mock implementation.

The bridge surface is deliberately wide: a narrow surface would force us to duplicate logic, and the scheduler is already a privileged subsystem (it can fire any slash command the user can).

ChatCLI - Scheduler: IPC client.

RemoteClient wraps a UNIX-socket connection and exposes methods that mirror the in-process Scheduler API. The chatcli CLI uses it when a daemon is detected on Start.

The connection is persistent; reads and writes are serialized. When the connection drops, every pending request gets ErrNoDaemon and the CLI falls back to in-process mode (configurable).

ChatCLI - Scheduler: autocomplete suggestions.

The top-level cli/cli_completer.go calls Suggest(...) to fetch /schedule, /wait and /jobs suggestions. Returning plain strings rather than go-prompt.Suggest structs keeps this package free of a TUI library dependency.

ChatCLI - Scheduler: configuration.

All operator-tunable knobs live here. Environment variables are resolved once at Scheduler construction (via LoadConfigFromEnv); subsequent changes require a restart. This is deliberate: a mid-flight `/reload` must not mutate the rate limiter or the data dir under a running queue.

Env vars (prefixed CHATCLI_SCHEDULER_):

  ENABLED                   bool   master kill switch (default true)
  DATA_DIR                  path   WAL + audit base dir (default ~/.chatcli/scheduler)
  MAX_JOBS                  int    max non-terminal jobs at once (default 256)
  DEFAULT_ACTION_TIMEOUT    dur    default Action timeout (default 5m)
  DEFAULT_POLL_INTERVAL     dur    default wait poll interval (default 5s)
  DEFAULT_WAIT_TIMEOUT      dur    default wait timeout (default 30m)
  DEFAULT_MAX_POLLS         int    default wait poll cap (default 0 = unlimited)
  DEFAULT_BACKOFF_INITIAL   dur    default retry initial delay (default 1s)
  DEFAULT_BACKOFF_MAX       dur    default retry delay cap (default 5m)
  DEFAULT_BACKOFF_MULT      float  default retry multiplier (default 2.0)
  DEFAULT_BACKOFF_JITTER    float  default jitter fraction (default 0.2)
  DEFAULT_MAX_RETRIES       int    default MaxRetries (default 3)
  DEFAULT_TTL               dur    how long terminal jobs linger for /jobs history (default 24h)
  HISTORY_LIMIT             int    max ExecutionResult entries per job (default 16)

  ALLOW_AGENTS              bool   may agents create jobs? (default true)
  ACTION_ALLOWLIST          csv    action types that may be scheduled
                                   (default: slash_cmd,llm_prompt,agent_task,worker_dispatch,
                                   hook,noop,webhook,shell; shell still goes through
                                   CoderMode safety)

  RATE_LIMIT_GLOBAL_RPS     float  (default 5.0)
  RATE_LIMIT_GLOBAL_BURST   int    (default 20)
  RATE_LIMIT_OWNER_RPS      float  (default 1.0)
  RATE_LIMIT_OWNER_BURST    int    (default 10)

  BREAKER_FAILURE_THRESHOLD int    (default 5)
  BREAKER_WINDOW            dur    (default 60s)
  BREAKER_COOLDOWN          dur    (default 30s)

  AUDIT_ENABLED             bool   (default true)
  AUDIT_MAX_SIZE_MB         int    (default 10)
  AUDIT_MAX_BACKUPS         int    (default 7)
  AUDIT_MAX_AGE_DAYS        int    (default 30)

  DAEMON_SOCKET             path   (default ~/.chatcli/scheduler/daemon.sock)
  DAEMON_AUTO_CONNECT       bool   (default true)

  SNAPSHOT_INTERVAL         dur    (default 5m)
  WAL_GC_INTERVAL           dur    (default 1h)

  WORKER_COUNT              int    (default 4)
  WAIT_WORKER_COUNT         int    (default 8)

ChatCLI - Scheduler: daemon mode.

Running `chatcli daemon start` detaches the scheduler from the interactive process. Any CLI that comes up on the same host detects the daemon via the UNIX socket and becomes a thin client: /schedule, /wait and /jobs all round-trip over IPC.

Lifecycle:

  start    → bind socket, start Scheduler, accept connections
  stop     → orderly Scheduler.DrainAndShutdown + close listener
  status   → print stats retrieved via IPC

The daemon keeps a PID file (daemon.pid) alongside the socket to detect stale sockets after a crash. If the PID file exists but the process is gone, the socket file is removed and start retries.

ChatCLI - Scheduler: per-job dispatch logic (wait → action → finalize).

handleJob is the heart of the scheduler. For a single fire:

  1. Load the Job and verify it is still eligible (not cancelled between pump and worker dequeue).
  2. If Wait is set, enter the wait loop: poll the Condition every PollInterval until it is satisfied, times out, or exhausts its budget. Each poll result is emitted as a metric and updates the breaker.
  3. Run the Action under a bounded timeout. Outcome classification drives the next transition (success → Completed, transient failure → retry with backoff, permanent failure → Failed).
  4. If the job has Triggers edges, spawn the children and mark them ready. Children created from a Triggers edge get a fresh ID (NewJobID) so a recurring parent can fan out a fresh DAG on every cycle.
  5. Recurring schedules (cron / interval) re-enqueue with the freshly computed next fire time unless the job itself is terminal.

ChatCLI - Scheduler / Chronos subsystem.

The scheduler is chatcli's durable, crash-consistent automation layer. It turns chatcli into a standing process that can be told "wait until X, then do Y" and keep that promise across restarts, network hiccups, and user-input lulls. The same machinery is used by the ReAct loop (agents and workers gain schedule_job / wait_until tools) and by the human via the /schedule, /wait and /jobs commands.

High-level model:

  ┌─────────────┐        ┌──────────────┐        ┌─────────────┐
  │ CLI surface │◀──────▶│ Scheduler    │◀──────▶│ WAL + snap  │
  │ /schedule…  │        │ state m/c    │        │ on disk     │
  └─────────────┘        │ priority Q   │        └─────────────┘
                         │ breakers     │
  ┌─────────────┐        │ rate limit   │        ┌─────────────┐
  │ Agent tools │◀──────▶│ dispatcher   │───────▶│ Condition   │
  │ schedule_*  │        │              │        │ evaluators  │
  └─────────────┘        │              │        └─────────────┘
                         │              │        ┌─────────────┐
                         │              │───────▶│ Action      │
                         │              │        │ executors   │
                         │              │        └─────────────┘
                         │              │
                         │              │───────▶ event bus + hooks + audit
                         └──────────────┘

Durability contract (identical to the Reflexion lesson queue):

  1. Accept Enqueue only after the WAL record is fsynced.
  2. Process, then ACK (delete or update the WAL record).
  3. On crash, replay the WAL on boot and resume.
  4. Classify a missed fire by MissPolicy (fire_once / fire_all / skip) so laptops that slept through a schedule do not thrash.

Autonomy:

  In-process: jobs run while chatcli is open. The WAL persists state; a new CLI session replays pending work.
  Daemon: `chatcli daemon start` detaches the scheduler from the interactive process. Any CLI that comes up detects the daemon via its UNIX socket (CHATCLI_SCHEDULER_DAEMON_SOCKET, default ~/.chatcli/scheduler/daemon.sock) and connects thin-client style. See daemon.go and ipc.go for the protocol.

Safety:

  Actions are gated by an allowlist (CHATCLI_SCHEDULER_ACTION_ALLOWLIST); shell actions run under CoderMode SafeMode; agent-created jobs are flagged trust=agent and cannot cancel user-owned jobs without the hook manager firing a PreJobCancel event. See the action/ subpackage.

See SCHEDULER_DESIGN.md (docs/) for the architecture rationale and tradeoff discussion. Every design decision here was made explicitly, and the alternatives are documented alongside the chosen path.

ChatCLI - Scheduler: sentinel errors.

Every surface-facing error the scheduler can return is declared here so callers (CLI handlers, agent tool adapters, daemon IPC) can match with errors.Is. Internal-only errors that never cross a package boundary stay inline where they occur.

ChatCLI - Scheduler: event payloads + publishing helpers.

The scheduler uses three notification channels, each serving a different audience:

  1. Event bus (cli/bus): live fan-out to UI subscribers (status line, Ctrl+J overlay). Non-durable; dropped on disconnect.
  2. Hook manager (cli/hooks): user-configured command/HTTP callbacks. Durable to the extent the hook command is.
  3. Audit log: append-only JSONL for forensics. Always written while AUDIT_ENABLED is true (the default).

This file defines the payload shape shared by all three. Scheduler helper methods (emit*) centralize the triple fan-out so every state change produces a consistent observability signal.

ChatCLI - Scheduler: JobID derivation.

JobIDs are derived deterministically from (name | owner | nonce) via SHA-256, truncated to 16 hex chars (64 bits). Deterministic derivation gives us two things for free:

  1. Idempotency: re-submitting the same (name, owner) with the same nonce produces the same ID. WAL Append dedupes on ID, so a nervous caller can retry without creating duplicates.
  2. Readability: 16 hex chars is enough entropy to avoid collisions in a single workspace, and operators can eyeball log lines.

For recurring jobs the nonce is the schedule's CreatedAt timestamp (rounded to microseconds); each occurrence gets its own JobID but the *definition* is stable, which is what cancel_by_name relies on.

ChatCLI - Scheduler: daemon IPC protocol.

The daemon exposes the scheduler over a UNIX socket. The protocol uses length-prefixed JSON frames, so one stream can carry both requests and server-initiated events cleanly.

Frame: [4-byte uint32 big-endian length] [JSON payload]

Every request has an ID; every response carries the same ID. Server-initiated events use ID="" and Kind="event".

Kinds:

  client → server:
    "ping"
    "enqueue"     Input: ToolInput + Owner
    "cancel"      Input: id, reason
    "pause"       Input: id, reason
    "resume"      Input: id
    "query"       Input: id
    "list"        Input: filter
    "snapshot"    Input: none
    "subscribe"   enter event-stream mode for this connection
    "stats"       Input: none; returns scheduler + daemon stats
    "bye"         orderly close

  server → client:
    "ok"          Result: ToolOutput / bytes
    "error"       Error: string
    "event"       Event: scheduler.Event (only after subscribe)

Thread safety: one goroutine per connection handles reads and writes under a single writer mutex. Readers decode frames sequentially.

ChatCLI - Scheduler: Job struct + lifecycle primitives.

A Job is the durable record the scheduler works on. It holds:

  - Identity (ID, Name, Owner, Tags)
  - Schedule (when to fire)
  - Action (what to run)
  - WaitSpec (optional gating condition)
  - Budget (timeouts, retries, poll limits)
  - DAG edges (DependsOn, Triggers)
  - State (Status + transition history)
  - Results (LastResult, History)

The struct is serialized verbatim to the WAL (one record per job), the audit log, and the IPC protocol. Schema migration is handled via the Version field: new fields use omitempty so old records remain readable.

ChatCLI - Scheduler: Prometheus metrics.

Metrics are registered against the default Prometheus registry and exposed by the /metrics endpoint in server mode. The scheduler does not start its own HTTP server; the chatcli metrics package handles that.

Naming convention follows Prometheus recommendations:

  chatcli_scheduler_<subject>_<unit>_<op>

ChatCLI - Scheduler: DSL parser for schedules and conditions.

The user-facing / agent-facing surface of the scheduler accepts short, ergonomic strings for the common cases and falls back to structured JSON/YAML specs for complex ones.

Schedule DSL:

  "in 5m" / "+5m" / "after 30s"        → ScheduleRelative
  "at 2026-04-25T14:00" / "at now"     → ScheduleAbsolute
  "every 30s" / "every 2h"             → ScheduleInterval
  "cron: 0 2 * * *" / "@daily"         → ScheduleCron
  "when-ready" / "on-condition"        → ScheduleOnCondition
  "manual" / "triggered"               → ScheduleManual

Condition DSL:

  "http://host/health==200"            → http_status, expected=200
  "http://host/health~=/ok/"           → http_status + regex
  "shell: <cmd>" / "shell[0]:cmd"      → shell_exit
  "file:/path"                         → file_exists
  "file:/path>=10"                     → file_exists with min_size
  "k8s:pod/ns/name:ready"              → k8s_resource_ready
  "docker:<container>:running"         → docker_running
  "tcp://host:port"                    → tcp_reachable
  "regex:<cmd>~=/pattern/"             → regex_match
  "llm: <prompt>"                      → llm_check
  "and(<expr1>, <expr2>)" / "or(...)"  → composite
  "not <expr>"                         → inner condition with Negate=true

Ambiguous or complex inputs must be submitted as a JSON spec via --wait-spec-file / --condition-json.

ChatCLI - Scheduler: shell policy preflight.

Every Job admitted into the scheduler is walked BEFORE being written to the WAL, and each shell command embedded in it is classified against the CoderMode policy manager (via CLIBridge.ClassifyShellCommand).

The rules:

  ShellPolicyDeny  → reject enqueue with ErrShellPolicyDeny, always. Even --i-know (DangerousConfirmed=true) cannot override a deny; the denylist is authoritative.
  ShellPolicyAsk   → reject enqueue with ErrShellPolicyAsk, UNLESS job.DangerousConfirmed is true. The user (or an agent with explicit user blessing) must pre-acknowledge that the command would normally prompt. This converts an interactive approval into a durable decision attached to the job.
  ShellPolicyAllow → admit normally.

At fire time, RunShell on the bridge re-checks the policy: a policy update between schedule and fire that moves a command from Allow to Deny makes the job fail cleanly.

This file also enumerates the shell commands in a Job. Shell commands hide in three places:

  1. Action.Payload: when Action.Type == ActionShell, the command lives under payload.command.
  2. Wait.Condition: shell-based evaluators (shell_exit, regex_match, custom, k8s_resource_ready, docker_running, llm_check with context_cmd) carry a command in Condition.Spec.
  3. Composite children: all_of/any_of recurse through Children.

Non-shell actions/conditions (webhook, http_status, file_exists, tcp_reachable) do not need policy review here; their security gates are in the network / filesystem layers.

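The three rules plus the composite recursion, sketched with a pluggable classifier. `classify` stands in for `CLIBridge.ClassifyShellCommand`, whose real signature is not shown here; `cond`, `collectShell` and `admitShell` are hypothetical, and the sketch assumes a condition's command is already available in a `Command` field rather than read from `Condition.Spec`.

```go
package main

import "errors"

type shellPolicy int

const (
	policyAllow shellPolicy = iota
	policyAsk
	policyDeny
)

// Stand-ins for ErrShellPolicyDeny / ErrShellPolicyAsk.
var (
	errShellDeny = errors.New("shell command denied by policy")
	errShellAsk  = errors.New("shell command requires approval")
)

// cond is a simplified condition tree node (hypothetical shape).
type cond struct {
	Type     string
	Command  string // empty for non-shell conditions
	Children []cond // all_of / any_of
}

// collectShell walks the tree depth-first and gathers every shell command.
func collectShell(c cond, out []string) []string {
	if c.Command != "" {
		out = append(out, c.Command)
	}
	for _, ch := range c.Children {
		out = collectShell(ch, out)
	}
	return out
}

// admitShell applies the preflight rules to every command in a job.
func admitShell(cmds []string, dangerousConfirmed bool, classify func(string) shellPolicy) error {
	for _, c := range cmds {
		switch classify(c) {
		case policyDeny:
			return errShellDeny // --i-know can never override a deny
		case policyAsk:
			if !dangerousConfirmed {
				return errShellAsk
			}
		}
	}
	return nil
}
```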
ChatCLI - Scheduler: priority queue.

The scheduling index is a min-heap keyed on NextFireAt, fronted by a map[JobID]*heapEntry for O(1) lookup on dedupe, Remove, and Requeue. Lazy deletion keeps Remove off the heap's critical path: entries are marked dead and skipped on Pop.

Invariants:

  - Each JobID appears at most once live in the heap.
  - Dead entries sort after live ones so Pop reliably returns work.
  - The queue is the *scheduling index* only; the WAL is the truth of what jobs exist. Drain (on boot) re-enqueues from the WAL.

Thread safety: a single mutex + cond var (the same pattern as lessonq, already proven in production).

ChatCLI - Scheduler: rate limiting.

Two levels:

  Global      protects the scheduler itself from enqueue storms. A token bucket on the total Enqueue rate; when exhausted, Enqueue returns ErrRateLimited with a Retry-After hint.
  Per-owner   caps the rate at which a single agent or worker can create jobs (RATE_LIMIT_OWNER_RPS / RATE_LIMIT_OWNER_BURST). Prevents a runaway ReAct loop from filling the queue.

Implemented on top of golang.org/x/time/rate, which is already a dependency (used by the LLM manager's per-provider rate limits).

ChatCLI - Scheduler: evaluator + executor interfaces and registries.

The scheduler stays extensible by deferring "how do I evaluate k8s readiness" to a ConditionEvaluator plug-in, and "how do I run a shell command" to an ActionExecutor. Both interfaces are small and synchronous; the scheduler owns concurrency, the plug-in owns semantics.

Registration happens at scheduler construction time (see builtin.go's RegisterBuiltins helper). Users who want to add a custom evaluator or executor implement the interface and register it on sched.Conditions() or sched.Actions() before Start.

ChatCLI - Scheduler: terminal render helpers for /jobs output.

These helpers are string generators; the CLI package decides where to print them. Keeping rendering in the scheduler package lets the daemon IPC client reuse the same formatting when the operator attaches remotely.

Formatting targets terminal consumption: ASCII boxes, ANSI-agnostic (no colors here; color is applied by the caller via the existing cli/colors.go palette).

ChatCLI - Scheduler: boot-time replay.

Boot order:

  1. Try the snapshot (fast path). If present and version-compatible, hydrate the in-memory map from it, then overlay any .wal records not covered by the snapshot.
  2. If no usable snapshot exists, fall back to a full WAL scan.
  3. For every live (non-terminal) job, re-enqueue it with an adjusted NextFireAt according to MissPolicy.

MissPolicy semantics on replay:

  fire_once   coalesce all missed ticks into a single fire at now.
  fire_all    schedule now; scheduler.maybeReschedule then keeps firing as the queue catches up (bounded by the natural cron tick so it cannot run away).
  skip        move NextFireAt to the next tick in the future; missed windows are treated as no-ops.

ChatCLI - Scheduler: retry / backoff math.

Isolated here so unit tests can exercise the ramp without spinning up a full Scheduler. The shape is identical to the Reflexion queue's retry policy (RetryPolicy.NextDelay in lessonq): exponential growth with a cap, plus uniform jitter.

ChatCLI - Scheduler: Schedule value type + cron parser.

A Schedule answers one question: "when should this job next be considered for execution?". The scheduler's main loop calls Schedule.Next(now, prev) to compute the next fire time.

Cron expressions:

  Five-field standard form: "minute hour day-of-month month day-of-week"

  Field          Range         Aliases
  minute         0-59
  hour           0-23
  day-of-month   1-31
  month          1-12          jan feb mar apr may jun jul aug sep oct nov dec
  day-of-week    0-6 (0=Sun)   sun mon tue wed thu fri sat

  Specials per field: "*", "N", "A,B,C", "A-B", "N/S", "A-B/S".
  Shorthand: "@hourly" "@daily" "@weekly" "@monthly" "@yearly".

The parser is intentionally minimal: it supports the subset that covers 99% of automation use cases and bails with a clear error on anything exotic. No external cron library is used, which keeps compile time and the supply chain small.

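A sketch of single-field expansion for the specials listed above, returning a bitmask (bit v is set when value v matches). It follows the common cron convention that `N/S` means "from N to the field maximum in steps of S"; `parseCronField` and `cronValue` are hypothetical names, and the day-of-month/day-of-week interplay is out of scope.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCronField expands one cron field into a bitmask over [lo, hi].
// aliases (optional) maps names such as "mon" or "jan" to numbers.
func parseCronField(field string, lo, hi int, aliases map[string]int) (uint64, error) {
	var mask uint64
	for _, part := range strings.Split(field, ",") {
		rng, stepStr, hasStep := strings.Cut(part, "/")
		step := 1
		if hasStep {
			s, err := strconv.Atoi(stepStr)
			if err != nil || s <= 0 {
				return 0, fmt.Errorf("bad step in %q", part)
			}
			step = s
		}
		var start, end int
		switch {
		case rng == "*":
			start, end = lo, hi
		case strings.Contains(rng, "-"): // "A-B" or "A-B/S"
			a, b, _ := strings.Cut(rng, "-")
			var err error
			if start, err = cronValue(a, aliases); err != nil {
				return 0, err
			}
			if end, err = cronValue(b, aliases); err != nil {
				return 0, err
			}
		default: // "N" or "N/S"
			v, err := cronValue(rng, aliases)
			if err != nil {
				return 0, err
			}
			start, end = v, v
			if hasStep {
				end = hi // "N/S" runs from N to the field maximum
			}
		}
		if start < lo || end > hi || start > end {
			return 0, fmt.Errorf("range %q outside %d-%d", part, lo, hi)
		}
		for v := start; v <= end; v += step {
			mask |= 1 << uint(v)
		}
	}
	return mask, nil
}

func cronValue(s string, aliases map[string]int) (int, error) {
	if v, ok := aliases[strings.ToLower(s)]; ok {
		return v, nil
	}
	v, err := strconv.Atoi(s)
	if err != nil {
		return 0, fmt.Errorf("bad value %q", s)
	}
	return v, nil
}
```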
ChatCLI - Scheduler: core engine.

Scheduler is the single object callers interact with. It composes the WAL, in-memory queue, rate limiter, breaker group and registries into a coherent lifecycle:

    sched := scheduler.New(cfg, bridge, logger)
    scheduler.RegisterBuiltins(sched)           // standard plug-ins
    if err := sched.Start(ctx); err != nil {    // spin up loops
        return err
    }
    queued, err := sched.Enqueue(ctx, job)      // returns the admitted *Job
    if err != nil {
        return err
    }
    // ... queued.ID identifies the job ...
    sched.DrainAndShutdown(30 * time.Second)

The worker model is two-tier:

  Schedule pump: one goroutine owns the scheduleQueue. It calls PopReady for the earliest fireAt and hands the JobID to a bounded worker pool.
  Worker pool: N goroutines (Config.WorkerCount) pick up job IDs and run handleJob, which walks the full Wait → Action → Finalize pipeline. Wait polls spawn their own ticker; the worker blocks on ctx and the poll channel.

Durability contract:

  1. Every Enqueue fsyncs the WAL before returning.
  2. Every state transition writes the updated Job to the WAL.
  3. Terminal jobs stay on disk for Config.DefaultTTL; the GC goroutine ACKs them after expiration.

ChatCLI - Scheduler: shell policy classification types.

Dedicated file so the CLIBridge interface stays readable and tests can import the ShellPolicy symbol without pulling in the full bridge surface.

The scheduler package intentionally does NOT depend on cli/coder; classification is delegated to the host process through the CLIBridge abstraction. This keeps the scheduler engine unit-testable against mock policies and keeps the import graph shallow.

ChatCLI - Scheduler: periodic state snapshot.

Rationale: the WAL alone recovers correctly, but boot time grows linearly with the total record count. A periodic snapshot freezes the live state to a single file (snapshot.json). On boot the scheduler loads the snapshot first, then overlays any .wal records newer than the snapshot's timestamp. Records covered by the snapshot can be GC'd out of the WAL directory.

The snapshot is written atomically via tmp-rename, so partial writes are never observable. Stale or corrupt snapshots are ignored in favor of a full WAL scan: the WAL is the truth, the snapshot is a cache.

ChatCLI - Scheduler: Job state machine.

The state machine serves three purposes:

  1. Enforce legal transitions, rejecting bugs that try to push a job from (for example) Completed back to Running.
  2. Centralize the side effects of a transition: metrics, audit log entry, event-bus publish, and hook fire. Every transition path produces the same observability signal with no duplication.
  3. Make concurrent mutation safe. Each Job owns a sync.Mutex that Scheduler methods lock for the duration of the transition, so no two goroutines race over the state.

Legal transitions (read top to bottom; missing edges are illegal):

  Pending    → Blocked, Waiting, Running, Paused, Cancelled, Skipped
  Blocked    → Pending, Cancelled
  Waiting    → Running, TimedOut, Cancelled, Failed
  Running    → Completed, Failed, TimedOut, Cancelled, Pending (recurring re-arm)
  Paused     → Pending, Cancelled
  (terminal) Completed / Failed / Cancelled / TimedOut / Skipped

The scheduler never resurrects terminal jobs. A recurring cron or interval schedule re-arms BEFORE the Running→Completed transition would be taken: after a successful action the dispatcher takes the Running→Pending edge, so the same Job record (and JobID) keeps cycling. This keeps Job count, WAL footprint, and queue depth bounded, a critical property when intervals are short (a health check every 30s would otherwise add 86400 / 30 = 2880 records per day). History entries carry a CycleNum so `/jobs logs` can separate cycles cleanly even though they share one Job. Terminal statuses remain a one-way door for non-recurring work.

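The transition table above, encoded as data with a single legality check. The status string values and the `transition` / `isTerminal` helpers are hypothetical; the per-Job mutex and the side-effect fan-out are omitted.

```go
package main

import "fmt"

type status string

// Hypothetical string values; the names follow the table above.
const (
	Pending   status = "pending"
	Blocked   status = "blocked"
	Waiting   status = "waiting"
	Running   status = "running"
	Paused    status = "paused"
	Completed status = "completed"
	Failed    status = "failed"
	Cancelled status = "cancelled"
	TimedOut  status = "timed_out"
	Skipped   status = "skipped"
)

// legal lists outgoing edges; statuses with no entry are terminal.
var legal = map[status][]status{
	Pending: {Blocked, Waiting, Running, Paused, Cancelled, Skipped},
	Blocked: {Pending, Cancelled},
	Waiting: {Running, TimedOut, Cancelled, Failed},
	Running: {Completed, Failed, TimedOut, Cancelled, Pending}, // Pending = recurring re-arm
	Paused:  {Pending, Cancelled},
}

func isTerminal(s status) bool { return len(legal[s]) == 0 }

// transition returns an error for any edge not in the table.
func transition(from, to status) error {
	for _, s := range legal[from] {
		if s == to {
			return nil
		}
	}
	return fmt.Errorf("illegal transition %s -> %s", from, to)
}
```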
ChatCLI - Scheduler: agent tool adapter.

The ReAct loop gains five tools that map directly onto the Scheduler API:

  schedule_job(spec)   create a job; returns the ID.
  wait_until(spec)     synchronous wait (the agent yields); returns the outcome. With async=true, returns immediately after creating the job (the caller receives a job.fired event in the next turn via AppendHistory).
  query_job(id)        return a Job clone.
  list_jobs(filter)    return summaries.
  cancel_job(id)       cancel if authorized.

The adapter is stateless: every method gets the owner context from the caller (agent mode passes its agent name, the ChatCLI session passes user). Authorization is enforced inside Scheduler itself.

The adapter returns JSON strings suitable for direct insertion into the ReAct loop's tool_result block. Callers just pass (input_json, agent_owner) and get (result_json, error).

ChatCLI - Scheduler: core value types.

Every type here is a plain value type. Behavior lives in separate files (scheduler.go, queue.go, state_machine.go, action/, condition/). Keeping types and behavior separate makes the schema trivial to JSON-encode for the WAL, the IPC protocol and the audit log.

ChatCLI - Scheduler: Write-Ahead Log.

Same durability contract as the Reflexion lesson queue:

  - Append is synchronous (fsync + atomic rename) before the job is visible to the in-memory queue.
  - The record format is a framed envelope with leading and trailing CRC32, so torn writes are detected and discarded on replay.
  - One file per record, named by JobID, so an ACK is a single unlink.

The scheduler WAL differs from lessonq in two ways:

  1. Records mutate: a job moves Pending → Waiting → Running → Completed. Each transition rewrites the .wal file via the same write-then-rename path, so readers never observe torn state.
  2. Terminal jobs are NOT immediately unlinked; they are kept for the TTL so /jobs history can list them, then GC'd. The snapshot (snapshot.go) periodically freezes the entire state into a single file so replay stays fast even with thousands of records.

File layout:

  <dir>/
    <jobid>.wal      current record per job
    <jobid>.tmp.*    in-flight Append; a leftover tmp after a crash is ignored
    snapshot.json    periodic full-state snapshot (see snapshot.go)
    snapshot.tmp     in-flight snapshot write

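A sketch of a framed envelope with leading and trailing CRC32. The exact byte layout is an assumption: `[crc32][len][payload][crc32]`, big-endian, with both checksums computed over the payload via `crc32.ChecksumIEEE`. A torn write typically shows up as a length mismatch or a missing or unequal trailing checksum.

```go
package main

import (
	"encoding/binary"
	"errors"
	"hash/crc32"
)

var errTorn = errors.New("wal: torn or corrupt record")

// encodeRecord frames payload as [crc32][len][payload][crc32].
func encodeRecord(payload []byte) []byte {
	sum := crc32.ChecksumIEEE(payload)
	buf := make([]byte, 8+len(payload)+4)
	binary.BigEndian.PutUint32(buf[0:4], sum)
	binary.BigEndian.PutUint32(buf[4:8], uint32(len(payload)))
	copy(buf[8:], payload)
	binary.BigEndian.PutUint32(buf[8+len(payload):], sum)
	return buf
}

// decodeRecord returns the payload, or errTorn if the frame is truncated,
// padded, or either checksum disagrees with the payload.
func decodeRecord(buf []byte) ([]byte, error) {
	if len(buf) < 12 {
		return nil, errTorn
	}
	lead := binary.BigEndian.Uint32(buf[0:4])
	n := binary.BigEndian.Uint32(buf[4:8])
	if uint64(len(buf)) != 12+uint64(n) {
		return nil, errTorn // truncated or trailing garbage
	}
	payload := buf[8 : 8+int(n)]
	trail := binary.BigEndian.Uint32(buf[8+int(n):])
	if lead != trail || crc32.ChecksumIEEE(payload) != lead {
		return nil, errTorn
	}
	return payload, nil
}
```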
Index
- Constants
- Variables
- func CheckDaemon(socketPath string) error
- func DefaultSocketPath(cfg Config) string
- func NewAuditFileWriter(dir string, maxSizeMB, maxBackups, maxAgeDays int, logger *zap.Logger, ...) auditWriter
- func NopAuditWriter() auditWriter
- func RenderList(summaries []JobSummary) string
- func RenderShow(j *Job) string
- func RenderTree(jobs []*Job) string
- func StatusBadge(s JobStatus) string
- func StatusLine(summaries []JobSummary) string
- type Action
- func (a Action) PayloadBool(key string, def bool) bool
- func (a Action) PayloadInt(key string, def int) int
- func (a Action) PayloadMap(key string) map[string]any
- func (a Action) PayloadString(key, def string) string
- func (a Action) PayloadStringSlice(key string) []string
- func (a Action) Validate() error
- type ActionExecutor
- type ActionRegistry
- type ActionResult
- type ActionType
- type Breaker
- type BreakerConfig
- type BreakerState
- type Budget
- type CLIBridge
- type Condition
- type ConditionEvaluator
- type ConditionRegistry
- type Config
- type Daemon
- type DaemonStats
- type EvalEnv
- type EvalOutcome
- type Event
- type EventType
- type ExecEnv
- type ExecutionResult
- type Frame
- type FrameKind
- type Job
- type JobID
- type JobStatus
- type JobSummary
- type ListFilter
- type Metrics
- type MissPolicy
- type Outcome
- type Owner
- type OwnerKind
- type PruneFilter
- type RemoteClient
- func (c *RemoteClient) Cancel(ctx context.Context, owner Owner, id JobID, reason string) error
- func (c *RemoteClient) Close() error
- func (c *RemoteClient) Enqueue(ctx context.Context, owner Owner, in ToolInput) (*ToolOutput, error)
- func (c *RemoteClient) List(ctx context.Context, filter ListFilter) ([]JobSummary, error)
- func (c *RemoteClient) Pause(ctx context.Context, owner Owner, id JobID, reason string) error
- func (c *RemoteClient) Ping(ctx context.Context) error
- func (c *RemoteClient) Query(ctx context.Context, id JobID) (*Job, error)
- func (c *RemoteClient) Resume(ctx context.Context, owner Owner, id JobID) error
- func (c *RemoteClient) Snapshot(ctx context.Context) error
- func (c *RemoteClient) Stats(ctx context.Context) (DaemonStats, error)
- func (c *RemoteClient) Subscribe(ctx context.Context) (<-chan Event, error)
- type Schedule
- type ScheduleKind
- type Scheduler
- func (s *Scheduler) Actions() *ActionRegistry
- func (s *Scheduler) Bridge() CLIBridge
- func (s *Scheduler) Cancel(id JobID, reason string, requester Owner) error
- func (s *Scheduler) Conditions() *ConditionRegistry
- func (s *Scheduler) Config() Config
- func (s *Scheduler) DrainAndShutdown(timeout time.Duration)
- func (s *Scheduler) Enqueue(ctx context.Context, job *Job) (*Job, error)
- func (s *Scheduler) FindByName(name string) (*Job, bool)
- func (s *Scheduler) IsClosed() bool
- func (s *Scheduler) List(filter ListFilter) []JobSummary
- func (s *Scheduler) Metrics() *Metrics
- func (s *Scheduler) Pause(id JobID, reason string, requester Owner) error
- func (s *Scheduler) Prune(filter PruneFilter) ([]JobID, error)
- func (s *Scheduler) Query(id JobID) (*Job, error)
- func (s *Scheduler) Resume(id JobID, requester Owner) error
- func (s *Scheduler) SetBridge(b CLIBridge)
- func (s *Scheduler) Snapshot() error
- func (s *Scheduler) Start(ctx context.Context) error
- func (s *Scheduler) Version() uint64
- type SchedulerDeps
- type ShellPolicy
- type Suggestion
- type TimeoutBehavior
- type ToolAdapter
- func (t *ToolAdapter) CancelJob(_ context.Context, owner Owner, rawIn string) (string, error)
- func (t *ToolAdapter) ListJobs(_ context.Context, owner Owner, rawIn string) (string, error)
- func (t *ToolAdapter) QueryJob(_ context.Context, _ Owner, rawIn string) (string, error)
- func (t *ToolAdapter) ScheduleJob(ctx context.Context, owner Owner, rawIn string) (string, error)
- func (t *ToolAdapter) WaitUntil(ctx context.Context, owner Owner, rawIn string) (string, error)
- type ToolInput
- type ToolOutput
- type TransitionReason
- type WaitSpec
Constants
const SchemaVersion = 1
SchemaVersion increments whenever Job gains a required field. Tests assert the WAL is forward-compatible via the /testdata fixtures.
Variables

var (
	// ErrSchedulerClosed is returned from any public method after the
	// scheduler has been stopped. Callers should treat this as terminal
	// for the current instance: a new Scheduler must be constructed to
	// accept work again.
	ErrSchedulerClosed = errors.New("scheduler: closed")

	// ErrSchedulerDraining is returned from Enqueue after
	// DrainAndShutdown has been called but before all in-flight jobs
	// have finished. New work is rejected; already-queued jobs continue
	// to run.
	ErrSchedulerDraining = errors.New("scheduler: draining")

	// ErrNotStarted is returned when a method that requires a running
	// loop is called before Start.
	ErrNotStarted = errors.New("scheduler: not started")
)
Lifecycle.
var (
	// ErrInvalidJob wraps a specific validation failure from Job.Validate.
	ErrInvalidJob = errors.New("scheduler: invalid job")

	// ErrInvalidSchedule indicates a Schedule whose fields are
	// inconsistent with its Kind (e.g. absolute schedule with zero
	// time, cron with empty expression).
	ErrInvalidSchedule = errors.New("scheduler: invalid schedule")

	// ErrInvalidCondition is returned when a Condition.Type is unknown
	// to the registry, or when the Spec is missing required fields.
	ErrInvalidCondition = errors.New("scheduler: invalid condition")

	// ErrInvalidAction mirrors ErrInvalidCondition for actions.
	ErrInvalidAction = errors.New("scheduler: invalid action")

	// ErrDuplicateName is returned when Enqueue sees a job name already
	// held by a non-terminal job. Job IDs are globally unique (derived
	// from name + owner + nonce), but users expect human-readable names
	// to be unique within their scope, so we enforce it.
	ErrDuplicateName = errors.New("scheduler: duplicate name")

	// ErrDAGCycle is returned when the caller passes DependsOn edges
	// that form a cycle. Scheduler refuses to admit such jobs.
	ErrDAGCycle = errors.New("scheduler: dag cycle")
)
Validation — raised during parse / admission.
var (
	// ErrJobNotFound is returned from Query / Cancel / Pause when the
	// given ID has no record in the active set.
	ErrJobNotFound = errors.New("scheduler: job not found")

	// ErrJobTerminal is returned when a mutating operation is attempted
	// on a job that has already reached a terminal state (completed,
	// failed, cancelled, timed out, skipped).
	ErrJobTerminal = errors.New("scheduler: job is terminal")

	// ErrWaitTimeout is returned by evaluators that exhausted their
	// MaxPolls or WaitTimeout without the condition ever being
	// satisfied.
	ErrWaitTimeout = errors.New("scheduler: wait condition timed out")

	// ErrBreakerOpen is returned when an evaluator or action is skipped
	// because its circuit breaker is open. The scheduler still marks
	// the job as failed for metric purposes; the message surfaces why.
	ErrBreakerOpen = errors.New("scheduler: circuit breaker open")

	// ErrRateLimited is returned from Enqueue when the per-owner or
	// global rate limit would be exceeded. Callers may retry after
	// the Retry-After hint surfaced on the underlying limiter error.
	ErrRateLimited = errors.New("scheduler: rate limited")

	// ErrQueueFull is returned when Enqueue cannot admit more jobs
	// and the overflow policy is Block (with timeout elapsed) or when
	// the WAL has already reached MaxJobs.
	ErrQueueFull = errors.New("scheduler: queue full")
)
Runtime — raised while executing.
var (
	// ErrNotAuthorized is returned when a caller (typically an agent)
	// tries to mutate a job owned by a different principal.
	ErrNotAuthorized = errors.New("scheduler: not authorized")
	// ErrActionDisallowed is returned when a requested Action type is
	// not in the operator-configured allowlist.
	ErrActionDisallowed = errors.New("scheduler: action type not in allowlist")
	// ErrDangerousShell is returned when a shell action's command
	// matches a pattern flagged by the shell safety filter and the
	// job was not submitted with the explicit --i-know flag.
	ErrDangerousShell = errors.New("scheduler: dangerous shell command requires explicit confirmation")
	// ErrShellPolicyDeny is returned when a shell command (in an
	// action or wait condition) is on the CoderMode denylist.
	// Unlike ErrDangerousShell, this cannot be overridden by --i-know
	// — denylist beats user confirmation. The job is rejected at
	// enqueue time and never admitted to the WAL.
	ErrShellPolicyDeny = errors.New("scheduler: shell command denied by policy")
	// ErrShellPolicyAsk is returned when a shell command would
	// normally require interactive approval under CoderMode. The
	// scheduler has no interactive channel at fire time, so the
	// command is rejected unless the job was created with --i-know
	// (which sets Job.DangerousConfirmed=true). When re-checked at
	// fire time (policy may have changed since enqueue), the same
	// error is returned if the classification drifted to Ask.
	ErrShellPolicyAsk = errors.New(`scheduler: shell command requires approval. Pre-authorize with one of: ` +
		`(slash command) /schedule ... --i-know ` +
		`(@scheduler tool) {"cmd":"schedule","args":{...,"i_know":true}} ` +
		`(persist for future jobs) /config security allow "<pattern>"`)
)
Authorization.
var (
	// ErrNoDaemon is returned by the IPC client when no daemon is
	// reachable on the configured socket path. Callers may fall back
	// to in-process scheduling or prompt the user to start the daemon.
	ErrNoDaemon = errors.New("scheduler: no daemon running")
	// ErrDaemonRunning is returned by `chatcli daemon start` when a
	// daemon is already bound to the socket — starting a second daemon
	// would split the WAL between two writers.
	ErrDaemonRunning = errors.New("scheduler: daemon already running")
	// ErrIPCProtocol covers any malformed / unrecognized frame on the
	// daemon socket.
	ErrIPCProtocol = errors.New("scheduler: ipc protocol error")
)
Daemon.
var (
	// ErrWALCorrupted is returned when a WAL record fails CRC validation
	// and is not recoverable. The record is quarantined to the DLQ.
	ErrWALCorrupted = errors.New("scheduler: wal record corrupted")
)
Persistence.
Functions ¶
func CheckDaemon ¶
CheckDaemon tries to Ping the daemon at socketPath; returns nil iff a daemon is up and responsive.
func DefaultSocketPath ¶
DefaultSocketPath returns the platform-default daemon socket path. Honors CHATCLI_SCHEDULER_DAEMON_SOCKET when set.
func NewAuditFileWriter ¶
func NewAuditFileWriter(dir string, maxSizeMB, maxBackups, maxAgeDays int, logger *zap.Logger, m *Metrics) auditWriter
NewAuditFileWriter returns a file-backed audit writer. Exposed for the daemon, which wants the path reported via IPC.
func NopAuditWriter ¶
func NopAuditWriter() auditWriter
NopAuditWriter returns the no-op audit writer.
func RenderList ¶
func RenderList(summaries []JobSummary) string
RenderList emits the /jobs list table as plain text (no color). Columns: STATUS | ID | NAME | OWNER | TYPE | NEXT FIRE | LAST
func RenderShow ¶
RenderShow emits the detailed `/jobs show <id>` view.
func RenderTree ¶
RenderTree emits the DAG view rooted at any top-level job (one whose ParentID is empty or whose parent isn't in the list).
func StatusBadge ¶
StatusBadge returns a fixed-width one-glyph label for a JobStatus. Used by the status line prefix and the tree view.
func StatusLine ¶
func StatusLine(summaries []JobSummary) string
StatusLine renders a short indicator for the prompt prefix: "[jobs: 2▶ 3⏳ 1⏱]"
Types ¶
type Action ¶
type Action struct {
Type ActionType `json:"type"`
Payload map[string]any `json:"payload,omitempty"`
}
Action describes what to execute when the schedule fires (and the wait condition, if any, is satisfied).
func ParseActionDSL ¶
ParseActionDSL interprets a short string as an Action.
"/foo bar"               → slash_cmd       command=/foo bar
"shell: <cmd>"           → shell           command=<cmd>
"agent: <task>"          → agent_task      task=<task>
"worker <name>: <task>"  → worker_dispatch
"llm: <prompt>"          → llm_prompt      prompt=<prompt>
"POST https://...| body" → webhook
"hook:<event>"           → hook            event=<event>
"noop"                   → noop
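The prefix table reads naturally as an ordered list of matches. The sketch below covers only the slash, `shell:`, `agent:`, `llm:`, `hook:`, and `noop` forms. `parseActionDSLSketch` is a hypothetical name, and the `worker` and `POST` forms are omitted. The whitespace trimming and the empty-argument error are assumptions, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

type ActionType string

type Action struct {
	Type    ActionType
	Payload map[string]any
}

// parseActionDSLSketch is a hypothetical subset of ParseActionDSL.
func parseActionDSLSketch(s string) (Action, error) {
	s = strings.TrimSpace(s)
	if s == "noop" {
		return Action{Type: "noop"}, nil
	}
	if strings.HasPrefix(s, "/") {
		return Action{Type: "slash_cmd", Payload: map[string]any{"command": s}}, nil
	}
	// "prefix: argument" forms: the prefix picks the type and payload key.
	forms := []struct {
		prefix string
		typ    ActionType
		key    string
	}{
		{"shell:", "shell", "command"},
		{"agent:", "agent_task", "task"},
		{"llm:", "llm_prompt", "prompt"},
		{"hook:", "hook", "event"},
	}
	for _, f := range forms {
		if !strings.HasPrefix(s, f.prefix) {
			continue
		}
		arg := strings.TrimSpace(strings.TrimPrefix(s, f.prefix))
		if arg == "" {
			return Action{}, fmt.Errorf("%q needs an argument", f.prefix)
		}
		return Action{Type: f.typ, Payload: map[string]any{f.key: arg}}, nil
	}
	return Action{}, errors.New("unrecognized action DSL")
}

func main() {
	a, _ := parseActionDSLSketch("shell: make test")
	fmt.Println(a.Type, a.Payload["command"]) // shell make test
}
```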
func ParseActionJSON ¶
ParseActionJSON loads an action from a JSON string.
func (Action) PayloadBool ¶
PayloadBool reads a bool field from the action payload.
func (Action) PayloadInt ¶
PayloadInt reads an int field from the action payload.
func (Action) PayloadMap ¶
PayloadMap reads a map[string]any field. Useful for webhook headers and llm_prompt tool overrides.
func (Action) PayloadString ¶
PayloadString reads a string field from the action payload with a default.
func (Action) PayloadStringSlice ¶
PayloadStringSlice reads a []string field (or a single string that splits on commas/whitespace).
type ActionExecutor ¶
type ActionExecutor interface {
Type() ActionType
ValidateSpec(payload map[string]any) error
Execute(ctx context.Context, action Action, env *ExecEnv) ActionResult
}
ActionExecutor is a plug-in that can run one action type.
type ActionRegistry ¶
type ActionRegistry struct {
// contains filtered or unexported fields
}
ActionRegistry mirrors ConditionRegistry.
func NewActionRegistry ¶
func NewActionRegistry() *ActionRegistry
NewActionRegistry builds an empty registry.
func (*ActionRegistry) Get ¶
func (r *ActionRegistry) Get(t ActionType) (ActionExecutor, bool)
Get returns the executor for a type.
func (*ActionRegistry) MustRegister ¶
func (r *ActionRegistry) MustRegister(e ActionExecutor)
MustRegister panics on duplicate.
func (*ActionRegistry) Register ¶
func (r *ActionRegistry) Register(e ActionExecutor) error
Register adds an executor.
func (*ActionRegistry) Types ¶
func (r *ActionRegistry) Types() []ActionType
Types returns the registered type names, sorted.
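The registry contract fits in a mutex-guarded map: Register rejects duplicates, MustRegister panics on them, and Types returns sorted names. This is a sketch under simplifying assumptions. `executor` is a stand-in for ActionExecutor trimmed to `Type()`, and `registry` and `stub` are hypothetical names. ConditionRegistry would follow the same shape, keyed by string.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type ActionType string

// executor is a stand-in for ActionExecutor, trimmed to Type().
type executor interface{ Type() ActionType }

type registry struct {
	mu sync.RWMutex
	m  map[ActionType]executor
}

func newRegistry() *registry { return &registry{m: map[ActionType]executor{}} }

// Register rejects a second executor for the same type.
func (r *registry) Register(e executor) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.m[e.Type()]; dup {
		return fmt.Errorf("executor %q already registered", e.Type())
	}
	r.m[e.Type()] = e
	return nil
}

// MustRegister is for init-time wiring, where a duplicate is a programming error.
func (r *registry) MustRegister(e executor) {
	if err := r.Register(e); err != nil {
		panic(err)
	}
}

// Get returns the executor for a type, or (nil, false).
func (r *registry) Get(t ActionType) (executor, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	e, ok := r.m[t]
	return e, ok
}

// Types returns the registered names in sorted order.
func (r *registry) Types() []ActionType {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]ActionType, 0, len(r.m))
	for t := range r.m {
		out = append(out, t)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

// stub is a hypothetical executor whose type is its own value.
type stub ActionType

func (s stub) Type() ActionType { return ActionType(s) }

func main() {
	r := newRegistry()
	r.MustRegister(stub("shell"))
	r.MustRegister(stub("noop"))
	fmt.Println(r.Types()) // [noop shell]
	fmt.Println(r.Register(stub("noop")))
}
```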
type ActionResult ¶
type ActionResult struct {
Output string
Tokens int
Cost float64
// Transient mirrors EvalOutcome.Transient — retryable vs permanent.
Transient bool
Err error
}
ActionResult is what an executor returns after a single invocation.
type ActionType ¶
type ActionType string
ActionType enumerates the built-in action categories.
const (
	// ActionSlashCmd — invokes a CLI slash command via the command handler.
	ActionSlashCmd ActionType = "slash_cmd"
	// ActionShell — runs a shell command under CoderMode safety.
	ActionShell ActionType = "shell"
	// ActionAgentTask — boots a ReAct loop with the given task.
	ActionAgentTask ActionType = "agent_task"
	// ActionWorkerDispatch — single-agent worker invocation.
	ActionWorkerDispatch ActionType = "worker_dispatch"
	// ActionLLMPrompt — headless LLM call (no tool loop).
	ActionLLMPrompt ActionType = "llm_prompt"
	// ActionWebhook — HTTP POST.
	ActionWebhook ActionType = "webhook"
	// ActionHook — fires a chatcli hook by name.
	ActionHook ActionType = "hook"
	// ActionNoop — do nothing (testing, placeholders).
	ActionNoop ActionType = "noop"
	// ActionAgentResume — load a parked agent snapshot and re-enter the
	// interactive ReAct loop. Fired by single-shot timer parks
	// (delay/until) and by the polling driver when the probe matches.
	ActionAgentResume ActionType = "agent_resume"
	// ActionParkPoll — drives the polling loop for @park for_url /
	// for_cmd. Re-enqueues itself at every interval until the probe
	// matches success_when (transitions to AgentResume) or the deadline
	// elapses (also transitions to AgentResume with timeout outcome).
	ActionParkPoll ActionType = "park_poll"
)
type Breaker ¶
type Breaker struct {
// contains filtered or unexported fields
}
Breaker is the per-(kind,key) circuit breaker. Safe for concurrent use.
func (*Breaker) Acquire ¶
Acquire asks the breaker whether a call may proceed. Returns:
- (true, nil) → call allowed. The caller must invoke the returned release callback afterwards.
- (false, err) → call denied. err wraps ErrBreakerOpen.
The returned release callback records success/failure outcome. On (false, err) the release callback is a no-op.
func (*Breaker) State ¶
func (b *Breaker) State() BreakerState
State reports the current breaker state for /jobs show and metrics.
type BreakerConfig ¶
type BreakerConfig struct {
// FailureThreshold is how many failures within Window trip the breaker.
FailureThreshold int
// Window is the sliding window during which failures are counted.
Window time.Duration
// Cooldown is how long to stay Open before a HalfOpen probe.
Cooldown time.Duration
// HalfOpenSuccessRequired is how many consecutive successes close
// the breaker from HalfOpen. Default 1 — one probe is enough.
HalfOpenSuccessRequired int
}
BreakerConfig tunes the trip thresholds. Zero values inherit scheduler defaults — see breakerGroup.NewBreaker.
type BreakerState ¶
type BreakerState int32
BreakerState is the exposed lifecycle state. Kept as a small int so atomic loads are cheap on the hot path.
const (
	BreakerClosed BreakerState = iota
	BreakerOpen
	BreakerHalfOpen
)
func (BreakerState) String ¶
func (s BreakerState) String() string
String makes states log-friendly.
type Budget ¶
type Budget struct {
// ActionTimeout bounds a single execution of Action.
ActionTimeout time.Duration `json:"action_timeout,omitempty"`
// MaxRetries is the total number of retries after a failure
// (so MaxRetries=3 means up to 4 executions total).
MaxRetries int `json:"max_retries,omitempty"`
// Backoff describes the retry delay ramp.
BackoffInitial time.Duration `json:"backoff_initial,omitempty"`
BackoffMax time.Duration `json:"backoff_max,omitempty"`
BackoffMult float64 `json:"backoff_mult,omitempty"`
BackoffJitter float64 `json:"backoff_jitter,omitempty"` // 0..0.5
// WaitTimeout caps the total time a wait-until loop may consume.
WaitTimeout time.Duration `json:"wait_timeout,omitempty"`
// PollInterval is the gap between wait-condition evaluations.
PollInterval time.Duration `json:"poll_interval,omitempty"`
// MaxPolls caps the number of wait evaluations. Zero = unlimited
// (bounded only by WaitTimeout).
MaxPolls int `json:"max_polls,omitempty"`
}
Budget bundles every "how long / how many times" limit the scheduler enforces on a single job. Any zero field is interpreted as "use the scheduler default from Config".
func DefaultBackoff ¶
func DefaultBackoff() Budget
DefaultBackoff returns the production default: 1s → 5min ramp, factor 2, 20% jitter, 5 retries.
type CLIBridge ¶
type CLIBridge interface {
// ExecuteSlashCommand runs a /foo command as if the user typed it.
// Returns the output captured and whether the command asked to
// exit the CLI loop. dangerousConfirmed mirrors
// Job.DangerousConfirmed so the scheduler's headless policy
// checker can admit "Ask" classifications when the job was
// pre-authorized via --i-know / i_know:true.
ExecuteSlashCommand(ctx context.Context, line string, dangerousConfirmed bool) (output string, exit bool, err error)
// RunAgentTask boots a ReAct loop with the given task + system
// prompt hint (same API as ChatCLI.agentMode.Run). Returns the
// final assistant message. dangerousConfirmed has the same
// semantics as on ExecuteSlashCommand.
RunAgentTask(ctx context.Context, task, systemHint string, dangerousConfirmed bool) (string, error)
// DispatchWorker runs a single worker (see cli/agent/workers) with
// the named agent type and task. Useful as a lightweight action
// when the user wants one-shot behavior without full ReAct state.
DispatchWorker(ctx context.Context, agentType, task string) (string, error)
// SendLLMPrompt calls the currently-configured LLM client outside
// the chat history. Used by ActionLLMPrompt and llm_check
// evaluator.
SendLLMPrompt(ctx context.Context, system, prompt string, maxTokens int) (text string, tokens int, cost float64, err error)
// FireHook dispatches a chatcli hook event synchronously and
// returns the HookResult (for scheduler use of action_type=hook).
FireHook(event hooks.HookEvent) *hooks.HookResult
// RunShell executes a shell command under CoderMode safety rules
// (same allowlist the agent mode enforces). When coderSafetyBypass
// is true and the operator has explicitly granted it, the command
// runs without the allowlist — reserved for trusted automation.
//
// dangerousConfirmed mirrors Job.DangerousConfirmed (set by
// --i-know on /schedule or i_know:true on @scheduler). When true,
// the fire-time recheck admits "Ask" classifications — the user
// already pre-authorized at enqueue. Denylist still rejects (deny
// always beats --i-know). Action executors that hold an env.Job
// (e.g. action/shell.go) MUST pass env.Job.DangerousConfirmed
// here; otherwise jobs admitted via i_know fail their re-check.
//
// Defense in depth: even though Enqueue already preflight-checked
// every shell command via ClassifyShellCommand, RunShell re-checks
// at fire time so policy changes between schedule and execution
// propagate — a command that was Allowed when enqueued but that
// now hits a new Deny rule must fail instead of running.
RunShell(ctx context.Context, cmd string, envOverrides map[string]string, coderSafetyBypass, dangerousConfirmed bool) (stdout string, stderr string, exitCode int, err error)
// ClassifyShellCommand asks the host's CoderMode policy whether a
// raw shell command would be allowed, denied, or require
// interactive approval. Called by Scheduler.Enqueue to pre-flight
// every shell command in a Job (action + wait condition + nested
// composites) before the job is even admitted — so daemon-mode
// jobs can never hit an interactive prompt downstream.
//
// The scheduler never itself prompts the user; this method is the
// one chance to classify. Returning ShellPolicyAsk means "would
// normally prompt" and causes the scheduler to reject the enqueue
// unless the job is explicitly marked DangerousConfirmed (user
// passed --i-know, or the agent passed i_know:true in its tool
// call with explicit user blessing upstream).
ClassifyShellCommand(cmd string) ShellPolicy
// KubeconfigPath returns the path the K8s evaluator should use
// (honors CHATCLI_KUBECONFIG, KUBECONFIG, or the watcher's config).
KubeconfigPath() string
// DockerSocketPath returns the docker engine socket (env
// DOCKER_HOST or unix:///var/run/docker.sock).
DockerSocketPath() string
// WorkspaceDir returns the chatcli workspace root (project dir or
// CWD), used for resolving relative paths in conditions/actions.
WorkspaceDir() string
// LLMClient returns the currently-configured LLM client for
// evaluators that want to issue their own prompts (llm_check).
LLMClient() client.LLMClient
// AppendHistory lets async actions add a message to the chat
// history (tagged ownership=scheduler so compaction recognizes it).
AppendHistory(msg models.Message)
// PublishEvent forwards a scheduler Event to the live cli.bus
// so the Ctrl+J overlay and the status line can react.
PublishEvent(evt Event)
// NotifyParkComplete is called by ActionAgentResume when a parked
// agent's wait condition is satisfied (timer elapsed, probe matched,
// or deadline reached). The bridge resolves the snapshot, restores
// the interactive agent loop, and injects parkResult as the synthetic
// tool-call result that the @park invocation would have returned.
//
// outcome is one of "elapsed", "matched", "timeout", "cancelled" —
// short tags driven by the action that fired this notify.
// detail carries the raw probe output (HTTP body, stdout) when
// relevant; passed through verbatim to the agent's system prompt.
NotifyParkComplete(ctx context.Context, token, outcome, detail string) error
// RunHTTPProbe issues a single HTTP request against url with the
// given method and headers, and returns the status code, body
// (capped to a sane size), and error. Used by ActionParkPoll. The
// bridge implementation centralizes timeout, redirect policy, TLS
// and proxy semantics so action executors stay terse.
RunHTTPProbe(ctx context.Context, url, method string, headers map[string]string, timeout time.Duration) (status int, body string, err error)
}
CLIBridge is implemented by the top-level cli.ChatCLI so the scheduler can invoke slash commands, dispatch agent tasks, query the LLM, fire hooks, and read K8s / workspace state.
func NewNoopBridge ¶
func NewNoopBridge() CLIBridge
NewNoopBridge returns a bridge whose methods all return stubs. Used in tests and by the daemon when no CLI is attached.
type Condition ¶
type Condition struct {
Type string `json:"type"`
Spec map[string]any `json:"spec,omitempty"`
Children []Condition `json:"children,omitempty"`
Negate bool `json:"negate,omitempty"`
}
Condition is the polymorphic descriptor the scheduler hands to a ConditionEvaluator at evaluation time. Type picks the evaluator; Spec carries its parameters; Children allows composites (all_of, any_of, not).
func ParseConditionDSL ¶
ParseConditionDSL interprets a short string as a Condition.
func ParseConditionJSON ¶
ParseConditionJSON loads a condition from a JSON string.
func (Condition) SpecDuration ¶
SpecDuration reads a duration field from Spec. Accepts Go duration strings ("5s", "10m") or integer seconds.
func (Condition) SpecString ¶
SpecString reads a string field from Spec with a default.
type ConditionEvaluator ¶
type ConditionEvaluator interface {
// Type is the string that appears in Condition.Type.
Type() string
// ValidateSpec is called once at job admission. Failure here blocks
// Enqueue with ErrInvalidCondition.
ValidateSpec(spec map[string]any) error
// Evaluate performs one check. ctx carries a timeout set by the
// scheduler (from Budget.PollInterval by default). Network-bound
// evaluators must honor ctx.Done().
Evaluate(ctx context.Context, cond Condition, env *EvalEnv) EvalOutcome
}
ConditionEvaluator is a plug-in that knows how to check one condition type. Methods are pure from the scheduler's standpoint — the caller will wrap Evaluate in a breaker and a timeout.
type ConditionRegistry ¶
type ConditionRegistry struct {
// contains filtered or unexported fields
}
ConditionRegistry holds the set of registered evaluators.
func NewConditionRegistry ¶
func NewConditionRegistry() *ConditionRegistry
NewConditionRegistry builds an empty registry.
func (*ConditionRegistry) Get ¶
func (r *ConditionRegistry) Get(t string) (ConditionEvaluator, bool)
Get returns the evaluator for a type, or (nil, false).
func (*ConditionRegistry) MustRegister ¶
func (r *ConditionRegistry) MustRegister(e ConditionEvaluator)
MustRegister panics on duplicate — suitable for RegisterBuiltins.
func (*ConditionRegistry) Register ¶
func (r *ConditionRegistry) Register(e ConditionEvaluator) error
Register adds an evaluator. Duplicates are rejected.
func (*ConditionRegistry) Types ¶
func (r *ConditionRegistry) Types() []string
Types returns the registered type names, sorted.
type Config ¶
type Config struct {
Enabled bool
DataDir string
MaxJobs int
DefaultActionTimeout time.Duration
DefaultPollInterval time.Duration
DefaultWaitTimeout time.Duration
DefaultMaxPolls int
DefaultBackoffInitial time.Duration
DefaultBackoffMax time.Duration
DefaultBackoffMult float64
DefaultBackoffJitter float64
DefaultMaxRetries int
DefaultTTL time.Duration
HistoryLimit int
AllowAgents bool
ActionAllowlist map[ActionType]bool
RateLimitGlobalRPS float64
RateLimitGlobalBurst int
RateLimitOwnerRPS float64
RateLimitOwnerBurst int
BreakerConfig BreakerConfig
AuditEnabled bool
AuditMaxSizeMB int
AuditMaxBackups int
AuditMaxAgeDays int
DaemonSocket string
DaemonAutoConnect bool
SnapshotInterval time.Duration
WALGCInterval time.Duration
WorkerCount int
WaitWorkerCount int
}
Config bundles every scheduler knob. Callers typically build one via LoadConfigFromEnv() + overrides; tests construct literals.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a production-safe baseline with all knobs set to the values documented at the top of the file.
func LoadConfigFromEnv ¶
func LoadConfigFromEnv() Config
LoadConfigFromEnv applies CHATCLI_SCHEDULER_* overrides on top of DefaultConfig().
type Daemon ¶
type Daemon struct {
// contains filtered or unexported fields
}
Daemon is the long-lived server binding the scheduler to a UNIX socket. Built from an existing Scheduler so tests can inject a mock.
func (*Daemon) Run ¶
Run starts the daemon and blocks until ctx is cancelled or the process receives SIGINT/SIGTERM. Intended entry point from the "chatcli daemon start" subcommand.
type DaemonStats ¶
type DaemonStats struct {
Version string `json:"version"`
Started time.Time `json:"started_at"`
Uptime string `json:"uptime"`
JobsActive int `json:"jobs_active"`
QueueDepth int `json:"queue_depth"`
WALSegments int `json:"wal_segments"`
Connections int `json:"connections"`
Breakers map[string]BreakerState `json:"breakers,omitempty"`
}
DaemonStats is returned for Kind=stats.
type EvalEnv ¶
type EvalEnv struct {
Logger *zap.Logger
// Bridge is an optional interface for plug-ins that need to reach
// into the host ChatCLI (for example to read a K8s kubeconfig path).
// See CLIBridge in tool_adapter.go for the current contract.
Bridge CLIBridge
// DangerousConfirmed mirrors Job.DangerousConfirmed. Wait
// conditions that shell out (k8s/llm/regex/shell_exit) must
// forward it to Bridge.RunShell so a job admitted with --i-know
// at enqueue passes its fire-time recheck on the wait predicate
// too — not just on the action.
DangerousConfirmed bool
}
EvalEnv bundles runtime dependencies an evaluator may need (logger, the chatcli bridge for K8s client reuse, etc.). Passed by value so the plug-in doesn't accidentally hold a pointer across turns.
type EvalOutcome ¶
type EvalOutcome struct {
Satisfied bool
// Transient means the evaluator is confident the check can be retried
// (network blip, temporary HTTP error). Persistent errors (bad URL,
// unknown k8s resource) bubble up via Err and drive the breaker.
Transient bool
// Details is shown in the /jobs show output to explain a poll result.
Details string
Err error
}
EvalOutcome is the result of a single condition evaluation.
type Event ¶
type Event struct {
Type EventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
JobID JobID `json:"job_id,omitempty"`
Name string `json:"name,omitempty"`
Owner Owner `json:"owner,omitempty"`
Status JobStatus `json:"status,omitempty"`
Outcome Outcome `json:"outcome,omitempty"`
Message string `json:"message,omitempty"`
Data map[string]any `json:"data,omitempty"`
Execution *ExecutionResult `json:"execution,omitempty"`
}
Event is the payload fan-out. JSON-serialized for IPC + audit; bus subscribers receive the struct directly.
func (Event) WithExecution ¶
func (e Event) WithExecution(r ExecutionResult) Event
WithExecution attaches an ExecutionResult (typically on job.completed or job.failed).
func (Event) WithMessage ¶
WithMessage adds a human-readable message.
type EventType ¶
type EventType string
EventType enumerates the scheduler-specific events. All values are strings so they round-trip cleanly through JSON (IPC, audit).
const (
	EventJobCreated       EventType = "job.created"
	EventJobScheduled     EventType = "job.scheduled"
	EventJobFired         EventType = "job.fired"
	EventJobWaitStarted   EventType = "job.wait_started"
	EventJobWaitTick      EventType = "job.wait_tick"
	EventJobWaitSatisfied EventType = "job.wait_satisfied"
	EventJobRunning       EventType = "job.running"
	EventJobCompleted     EventType = "job.completed"
	EventJobFailed        EventType = "job.failed"
	EventJobTimedOut      EventType = "job.timed_out"
	EventJobCancelled     EventType = "job.cancelled"
	EventJobSkipped       EventType = "job.skipped"
	EventJobRetryQueued   EventType = "job.retry_queued"
	EventJobPaused        EventType = "job.paused"
	EventJobResumed       EventType = "job.resumed"
	EventJobDependency    EventType = "job.dependency_resolved"
	EventBreakerOpened    EventType = "breaker.opened"
	EventBreakerHalfOpen  EventType = "breaker.half_open"
	EventBreakerClosed    EventType = "breaker.closed"
	EventDaemonStarted    EventType = "daemon.started"
	EventDaemonStopped    EventType = "daemon.stopped"
)
type ExecEnv ¶
type ExecEnv struct {
Logger *zap.Logger
Bridge CLIBridge
// Job is a read-only summary of the job being executed, useful for
// error messages ("action failed for job X").
Job JobSummary
// Enqueue lets an executor schedule follow-up jobs (e.g. ParkPoll
// re-enqueues itself on every tick, fans out to AgentResume on
// success/timeout). The dispatcher injects a closure bound to the
// live Scheduler so executors don't need to import it themselves.
// Nil in tests that don't exercise self-rescheduling.
Enqueue func(ctx context.Context, job *Job) (*Job, error)
}
ExecEnv bundles dependencies action executors may need. Mirrors EvalEnv but distinct because actions often need the full CLI bridge (run a slash command, invoke an agent, fire a hook).
type ExecutionResult ¶
type ExecutionResult struct {
// CycleNum is the recurring-schedule cycle this execution belongs
// to (1 = first cycle). Always 0 for one-shot jobs. Lets /jobs logs
// group attempts within a cycle and separate cycles visually.
CycleNum int `json:"cycle,omitempty"`
AttemptNum int `json:"attempt"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at"`
Duration time.Duration `json:"duration_ns"`
Outcome Outcome `json:"outcome"`
Output string `json:"output,omitempty"`
// Error holds the stringified error message; we keep it as a
// string (not an error value) because ExecutionResult travels
// through JSON (WAL, IPC, audit log).
Error string `json:"error,omitempty"`
// Tokens / Cost surface LLM-action cost to /jobs show.
Tokens int `json:"tokens,omitempty"`
Cost float64 `json:"cost_usd,omitempty"`
// ConditionSatisfied is true when a wait_until job's condition
// flipped on this poll. Left empty for non-wait executions.
ConditionSatisfied bool `json:"condition_satisfied,omitempty"`
ConditionDetails string `json:"condition_details,omitempty"`
}
ExecutionResult captures one attempt — either a wait-condition poll result (from an on_condition schedule) or an action execution. Stored in Job.History as a ring buffer.
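Bounding Job.History by HistoryLimit comes down to appending new results and evicting the oldest, which is the observable behavior of a ring buffer. The sketch below uses a trimmed `ExecutionResult`. `appendHistory` is hypothetical, and it copies the retained tail instead of indexing a circular array. Treating a non-positive limit as unbounded is a simplification made for this sketch.

```go
package main

import "fmt"

// ExecutionResult is trimmed to the one field the example needs.
type ExecutionResult struct {
	AttemptNum int
}

// appendHistory keeps at most limit entries, dropping the oldest first.
// For simplicity, a non-positive limit keeps everything.
func appendHistory(h []ExecutionResult, r ExecutionResult, limit int) []ExecutionResult {
	h = append(h, r)
	if limit > 0 && len(h) > limit {
		// Copy the tail so the old backing array can be garbage-collected.
		h = append([]ExecutionResult(nil), h[len(h)-limit:]...)
	}
	return h
}

func main() {
	var h []ExecutionResult
	for i := 1; i <= 5; i++ {
		h = appendHistory(h, ExecutionResult{AttemptNum: i}, 3)
	}
	fmt.Println(len(h), h[0].AttemptNum, h[2].AttemptNum) // 3 3 5
}
```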
func (ExecutionResult) Succeeded ¶
func (r ExecutionResult) Succeeded() bool
Succeeded reports whether the execution ended in OutcomeSuccess. Helper for hook event payloads and audit queries.
type Frame ¶
type Frame struct {
ID string `json:"id,omitempty"`
Kind FrameKind `json:"kind"`
Owner *Owner `json:"owner,omitempty"`
Input json.RawMessage `json:"input,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error string `json:"error,omitempty"`
Event *Event `json:"event,omitempty"`
TS time.Time `json:"ts,omitempty"`
}
Frame is the on-wire envelope.
type FrameKind ¶
type FrameKind string
FrameKind classifies a frame.
const (
	KindPing      FrameKind = "ping"
	KindEnqueue   FrameKind = "enqueue"
	KindCancel    FrameKind = "cancel"
	KindPause     FrameKind = "pause"
	KindResume    FrameKind = "resume"
	KindQuery     FrameKind = "query"
	KindList      FrameKind = "list"
	KindSnapshot  FrameKind = "snapshot"
	KindSubscribe FrameKind = "subscribe"
	KindStats     FrameKind = "stats"
	KindBye       FrameKind = "bye"
	KindOK        FrameKind = "ok"
	KindError     FrameKind = "error"
	KindEvent     FrameKind = "event"
)
type Job ¶
type Job struct {
// ─── Identity ──────────────────────────────────────────
ID JobID `json:"id"`
Name string `json:"name"`
Owner Owner `json:"owner"`
Tags map[string]string `json:"tags,omitempty"`
Version int `json:"version"`
// ─── Scheduling ────────────────────────────────────────
Schedule Schedule `json:"schedule"`
Action Action `json:"action"`
Wait *WaitSpec `json:"wait,omitempty"`
// ─── DAG ───────────────────────────────────────────────
DependsOn []JobID `json:"depends_on,omitempty"`
Triggers []JobID `json:"triggers,omitempty"`
// ParentID is non-empty for jobs spawned by a Triggers edge, so
// /jobs tree can render the provenance.
ParentID JobID `json:"parent_id,omitempty"`
// ─── Execution budget ──────────────────────────────────
Budget Budget `json:"budget,omitempty"`
TTL time.Duration `json:"ttl,omitempty"`
// ─── State machine ─────────────────────────────────────
Status JobStatus `json:"status"`
NextFireAt time.Time `json:"next_fire_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
FinishedAt time.Time `json:"finished_at,omitempty"`
Transitions []TransitionReason `json:"transitions,omitempty"`
PauseReason string `json:"pause_reason,omitempty"`
CancelReason string `json:"cancel_reason,omitempty"`
// ─── Execution history ─────────────────────────────────
LastResult *ExecutionResult `json:"last_result,omitempty"`
History []ExecutionResult `json:"history,omitempty"`
HistoryLimit int `json:"history_limit,omitempty"`
Attempts int `json:"attempts,omitempty"`
// CycleCount is incremented on a recurring template every time it
// spawns a child cycle. Always zero on a non-recurring or one-shot
// (cycle) Job — it identifies templates and gives /jobs show a
// cheap count of how many fires have happened so far.
CycleCount int `json:"cycle_count,omitempty"`
// ─── Flags ─────────────────────────────────────────────
// DangerousConfirmed marks a job whose action is otherwise filtered
// by the shell safety filter but the creator passed --i-know.
DangerousConfirmed bool `json:"dangerous_confirmed,omitempty"`
// Description is a free-form label shown in `/jobs show`.
Description string `json:"description,omitempty"`
// contains filtered or unexported fields
}
Job is the unit of scheduling. See the state machine in state_machine.go for legal transitions.
func NewJob ¶
NewJob builds a Job with sensible defaults. The Validate pass runs inside Scheduler.Enqueue; callers don't need to invoke it directly.
func (*Job) Clone ¶
Clone returns a deep-ish copy suitable for returning to callers via `/jobs show` / IPC without exposing the internal mutex. Maps and slices are duplicated so mutations on the copy don't leak into the scheduler's working set.
type JobID ¶
type JobID string
JobID is the globally unique identifier. Derived from sha256(name|owner|created_nonce) truncated to 16 hex chars. See DeriveJobID in id.go. Stable across retries — re-Enqueue of the same (name, owner, nonce) produces the same JobID and is an idempotent no-op.
func DeriveJobID ¶
DeriveJobID produces a deterministic ID from the given components. nonce may be any string that disambiguates re-submissions (typically the created-at timestamp as RFC3339Nano). If nonce is empty a UUID is used to guarantee uniqueness.
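The documented recipe (sha256 over name|owner|nonce, truncated to 16 hex characters) fits in a few lines. In this sketch `owner` is a plain string, since the real Owner type is not shown in this section. For an empty nonce it draws 16 random bytes from `crypto/rand` in place of the UUID the real code uses, which avoids a third-party dependency. `deriveJobID` is a hypothetical name.

```go
package main

import (
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deriveJobID mirrors the documented recipe with simplified inputs.
func deriveJobID(name, owner, nonce string) string {
	if nonce == "" {
		// Random nonce so distinct submissions never collide.
		b := make([]byte, 16)
		if _, err := rand.Read(b); err != nil {
			panic(err)
		}
		nonce = hex.EncodeToString(b)
	}
	sum := sha256.Sum256([]byte(name + "|" + owner + "|" + nonce))
	return hex.EncodeToString(sum[:8]) // 8 bytes = 16 hex chars
}

func main() {
	a := deriveJobID("nightly-backup", "user:alice", "2024-01-02T03:04:05Z")
	b := deriveJobID("nightly-backup", "user:alice", "2024-01-02T03:04:05Z")
	fmt.Println(a == b, len(a)) // true 16
}
```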
func NewJobID ¶
func NewJobID() JobID
NewJobID returns a random, high-entropy ID. Used when determinism is undesirable (e.g. spawning a child job from a Triggers edge — each child gets a fresh ID even if the parent reruns).
type JobStatus ¶
type JobStatus string
JobStatus is the externally observable lifecycle state. Transitions are enforced by stateMachine.Transition; see state_machine.go.
const (
	// StatusPending — job is registered, waiting for its NextFireAt.
	StatusPending JobStatus = "pending"
	// StatusBlocked — job is waiting for one or more DependsOn jobs.
	StatusBlocked JobStatus = "blocked"
	// StatusWaiting — poll loop active, evaluating a wait condition.
	StatusWaiting JobStatus = "waiting"
	// StatusRunning — action is executing.
	StatusRunning JobStatus = "running"
	// StatusPaused — user asked to hold; no evaluation until Resume.
	StatusPaused JobStatus = "paused"
	// StatusCompleted — action succeeded.
	StatusCompleted JobStatus = "completed"
	// StatusFailed — action (or wait) returned an error.
	StatusFailed JobStatus = "failed"
	// StatusCancelled — user or owner asked to stop mid-flight.
	StatusCancelled JobStatus = "cancelled"
	// StatusTimedOut — wait condition never satisfied within WaitTimeout
	// and the timeout behavior was "fail".
	StatusTimedOut JobStatus = "timed_out"
	// StatusSkipped — missed-fire policy decided to drop the occurrence.
	StatusSkipped JobStatus = "skipped"
)
func (JobStatus) IsActive ¶
IsActive reports whether the job currently occupies a worker slot or is otherwise doing work. Used for queue depth metrics.
func (JobStatus) IsTerminal
IsTerminal reports whether no further transitions are possible.
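Assuming the terminal set is exactly the five statuses PruneFilter lists as terminal (completed, failed, cancelled, timed_out, skipped), IsTerminal can be sketched as a simple switch. This is an illustration under that assumption, not the package source.

```go
package main

import "fmt"

type JobStatus string

const (
	StatusPending   JobStatus = "pending"
	StatusBlocked   JobStatus = "blocked"
	StatusWaiting   JobStatus = "waiting"
	StatusRunning   JobStatus = "running"
	StatusPaused    JobStatus = "paused"
	StatusCompleted JobStatus = "completed"
	StatusFailed    JobStatus = "failed"
	StatusCancelled JobStatus = "cancelled"
	StatusTimedOut  JobStatus = "timed_out"
	StatusSkipped   JobStatus = "skipped"
)

// IsTerminal (sketch): assumes the terminal set is exactly the five
// statuses PruneFilter treats as "every terminal job".
func (s JobStatus) IsTerminal() bool {
	switch s {
	case StatusCompleted, StatusFailed, StatusCancelled, StatusTimedOut, StatusSkipped:
		return true
	}
	return false
}

func main() {
	for _, s := range []JobStatus{StatusPending, StatusPaused, StatusCompleted, StatusTimedOut} {
		fmt.Println(s, s.IsTerminal())
	}
}
```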
type JobSummary
type JobSummary struct {
ID JobID `json:"id"`
Name string `json:"name"`
Owner Owner `json:"owner"`
Status JobStatus `json:"status"`
Type string `json:"type"`
NextFireAt time.Time `json:"next_fire_at,omitempty"`
LastOutcome Outcome `json:"last_outcome,omitempty"`
Attempts int `json:"attempts,omitempty"`
Description string `json:"description,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Tags []string `json:"tags,omitempty"`
// DangerousConfirmed mirrors Job.DangerousConfirmed so action
// executors (notably action/shell.go) can pass it through to
// CLIBridge.RunShell at fire time. Without this, a job that was
// admitted to the queue with --i-know / i_know:true still failed
// its fire-time policy re-check because the bridge had no way to
// see the per-job authorization.
DangerousConfirmed bool `json:"dangerous_confirmed,omitempty"`
}
JobSummary is the compact view returned by List and the status line. Stripping history and transitions keeps the JSON small for IPC.
type ListFilter
type ListFilter struct {
Owner *Owner
Statuses []JobStatus
Tag string
NameSubstr string
IncludeTerminal bool
}
ListFilter is an OR-of-conjunctions filter for List.
type Metrics
type Metrics struct {
JobsCreated *prometheus.CounterVec // label: owner_kind, action_type
JobsFired *prometheus.CounterVec // label: outcome, action_type
WaitChecks *prometheus.CounterVec // label: condition_type, satisfied
WaitDuration *prometheus.HistogramVec // label: condition_type
ActionDuration *prometheus.HistogramVec // label: action_type, outcome
QueueDepth prometheus.Gauge
ActiveJobs prometheus.Gauge
BreakerState *prometheus.GaugeVec // label: kind, key
RetryCount *prometheus.CounterVec // label: attempt (bucketed)
EnqueueErrors *prometheus.CounterVec // label: reason (rate_limited, full, invalid)
WALSegments prometheus.Gauge
AuditWrites prometheus.Counter
DaemonConnections prometheus.Gauge
}
Metrics is the scheduler's Prometheus surface. Use GetMetrics() to obtain the package singleton — the counters are process-wide so a single Scheduler per process is the normal case.
func GetMetrics
func GetMetrics() *Metrics
GetMetrics returns the package-wide singleton, constructing it on first call. Subsequent calls return the same instance so registering twice doesn't panic with Prometheus's duplicate-metric error.
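The build-once behavior can be sketched with `sync.Once`. The `metrics` type here is a hypothetical stand-in that counts registrations instead of holding real Prometheus collectors, so the block runs without external modules; the point is that the constructor body, where registration would happen, runs exactly once even under concurrent first calls.

```go
package main

import (
	"fmt"
	"sync"
)

// metrics is a stand-in for the real Metrics struct; the real constructor
// would create and register Prometheus collectors.
type metrics struct {
	registrations int
}

var (
	metricsOnce sync.Once
	metricsInst *metrics
)

// getMetrics builds the singleton on the first call only; later calls
// return the same pointer, so registration side effects run exactly once.
func getMetrics() *metrics {
	metricsOnce.Do(func() {
		metricsInst = &metrics{}
		metricsInst.registrations++ // stands in for registering collectors
	})
	return metricsInst
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); getMetrics() }()
	}
	wg.Wait()
	fmt.Println(getMetrics().registrations) // 1
}
```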
type MissPolicy
type MissPolicy string
MissPolicy decides how to handle a fire window that passed while the scheduler was not running (laptop asleep, daemon crashed).
const (
// MissFireOnce: run once for the missed window, regardless of
// how many cron ticks happened. Default.
MissFireOnce MissPolicy = "fire_once"
// MissFireAll: run once per missed tick (noisy; opt-in).
MissFireAll MissPolicy = "fire_all"
// MissSkip: drop the missed window entirely.
MissSkip MissPolicy = "skip"
)
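As an illustration, the three policies reduce to how many catch-up runs to perform for a given number of missed ticks. The helper `catchUpFires` is hypothetical, and treating an empty policy as `fire_once` is an assumption based on it being the documented default.

```go
package main

import "fmt"

type MissPolicy string

const (
	MissFireOnce MissPolicy = "fire_once"
	MissFireAll  MissPolicy = "fire_all"
	MissSkip     MissPolicy = "skip"
)

// catchUpFires is a hypothetical helper: given how many ticks were missed
// while the scheduler was down, return how many catch-up runs to perform.
func catchUpFires(p MissPolicy, missed int) int {
	if missed <= 0 {
		return 0
	}
	switch p {
	case MissFireAll:
		return missed
	case MissSkip:
		return 0
	default: // MissFireOnce, and (by assumption) the empty value
		return 1
	}
}

func main() {
	fmt.Println(catchUpFires(MissFireOnce, 5), catchUpFires(MissFireAll, 5), catchUpFires(MissSkip, 5)) // 1 5 0
}
```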
type Outcome
type Outcome string
Outcome classifies a single ExecutionResult. Distinct from JobStatus because a job may have many executions (cron, retries, interval) and each one has its own Outcome.
type Owner
type Owner struct {
Kind OwnerKind `json:"kind"`
// ID is the owner-specific identifier (session id for user,
// agent name for agent, worker call id for worker).
ID string `json:"id"`
// Tag is a free-form label for grouping in the UI. Optional.
Tag string `json:"tag,omitempty"`
}
Owner identifies the principal responsible for a job. Used for authorization (cancel / pause) and filtering (`/jobs list --owner me`).
type OwnerKind
type OwnerKind string
OwnerKind classifies who created a job.
const (
// OwnerUser: direct /schedule or /wait invocation by the human.
OwnerUser OwnerKind = "user"
// OwnerAgent: created inside the ReAct loop via a tool call.
OwnerAgent OwnerKind = "agent"
// OwnerWorker: created by a subagent (worker dispatcher).
OwnerWorker OwnerKind = "worker"
// OwnerSystem: scheduler itself (e.g. retry follow-ups).
OwnerSystem OwnerKind = "system"
// OwnerHook: triggered by a hook firing.
OwnerHook OwnerKind = "hook"
)
type PruneFilter (added in v1.109.1)
type PruneFilter struct {
Statuses []JobStatus // empty = every terminal status
Owner *Owner // nil = any owner
OlderThan time.Duration
NameSubstr string
}
PruneFilter narrows which terminal jobs Prune deletes. The zero value matches every terminal job (completed + failed + cancelled + timed_out + skipped); set fields to narrow further.
type RemoteClient
type RemoteClient struct {
// contains filtered or unexported fields
}
RemoteClient is a thin IPC client talking to a Daemon.
func Dial
func Dial(socketPath string) (*RemoteClient, error)
Dial connects to the daemon at socketPath.
func (*RemoteClient) Enqueue
func (c *RemoteClient) Enqueue(ctx context.Context, owner Owner, in ToolInput) (*ToolOutput, error)
Enqueue sends a schedule_job request.
func (*RemoteClient) List
func (c *RemoteClient) List(ctx context.Context, filter ListFilter) ([]JobSummary, error)
List fetches the summaries of jobs matching filter from the daemon.
func (*RemoteClient) Ping
func (c *RemoteClient) Ping(ctx context.Context) error
Ping confirms the daemon is responsive.
func (*RemoteClient) Snapshot
func (c *RemoteClient) Snapshot(ctx context.Context) error
Snapshot forces the daemon to write a snapshot now.
func (*RemoteClient) Stats
func (c *RemoteClient) Stats(ctx context.Context) (DaemonStats, error)
Stats returns the daemon's current runtime stats.
type Schedule
type Schedule struct {
Kind ScheduleKind `json:"kind"`
ExactTime time.Time `json:"exact_time,omitempty"`
Relative time.Duration `json:"relative,omitempty"`
Cron string `json:"cron,omitempty"`
Interval time.Duration `json:"interval,omitempty"`
MissPolicy MissPolicy `json:"miss_policy,omitempty"`
// Timezone, if set, adjusts cron evaluation. Go time.Location name
// (IANA) — "Local", "UTC", "America/Sao_Paulo", etc. Empty = Local.
Timezone string `json:"timezone,omitempty"`
// contains filtered or unexported fields
}
Schedule describes when a Job should next be considered for execution. Exactly one of ExactTime / Relative / Cron / Interval is used per Kind; the others are zero values.
func ParseScheduleDSL
ParseScheduleDSL interprets a short string as a Schedule.
func (*Schedule) IsRecurring
IsRecurring reports whether the schedule can fire more than once.
func (*Schedule) Next
Next returns the time at which this Schedule should fire strictly after `after`. createdAt is used to anchor relative/interval. When no further fires are possible (absolute schedule already passed), returns zero time.
Next is deterministic: given the same inputs, the result is the same. The scheduler main loop calls it once per transition.
type ScheduleKind
type ScheduleKind string
ScheduleKind classifies how Next is computed.
const (
// ScheduleAbsolute fires once at ExactTime and is terminal.
ScheduleAbsolute ScheduleKind = "absolute"
// ScheduleRelative fires once at created_at + Relative.
ScheduleRelative ScheduleKind = "relative"
// ScheduleCron fires on every Cron match until cancelled.
ScheduleCron ScheduleKind = "cron"
// ScheduleInterval fires every Interval from first fire.
ScheduleInterval ScheduleKind = "interval"
// ScheduleOnCondition has no time component: Next returns now and
// the wait-condition gate does the work.
ScheduleOnCondition ScheduleKind = "on_condition"
// ScheduleManual never fires on its own; it's triggered by another
// job's Triggers edge. Used for DAG leaf nodes.
ScheduleManual ScheduleKind = "manual"
)
type Scheduler
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler is the top-level engine. Safe for concurrent use.
func New
New constructs a Scheduler from cfg + bridge. Call RegisterBuiltins to install the standard evaluators/executors, then Start to run.
bridge may be nil; a noopBridge is substituted so the scheduler still boots (useful for the daemon's bootstrap phase before a CLI attaches).
func (*Scheduler) Actions
func (s *Scheduler) Actions() *ActionRegistry
Actions returns the action registry.
func (*Scheduler) Cancel
Cancel marks the job cancelled. Running jobs are notified via ctx cancellation (their action executor sees ctx.Done() fire).
Locking discipline: the job lock is held only during the mutation; emit() is invoked outside the lock because Event.WithJob reacquires the same mutex to snapshot job fields and would otherwise deadlock.
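The deadlock this locking note avoids comes from `sync.Mutex` not being reentrant. This hypothetical sketch mirrors the described order: mutate under the lock, release it, then emit. Here `snapshot` stands in for Event.WithJob re-taking the same lock; calling it before `Unlock` would block forever.

```go
package main

import (
	"fmt"
	"sync"
)

type job struct {
	mu     sync.Mutex
	status string
}

// snapshot takes the job lock itself, like Event.WithJob is described as
// doing. sync.Mutex is not reentrant, so calling this while the same
// goroutine already holds the lock would deadlock.
func (j *job) snapshot() string {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.status
}

// cancel mutates under the lock, releases it, and only then emits.
func cancel(j *job, emit func(string)) {
	j.mu.Lock()
	j.status = "cancelled"
	j.mu.Unlock()

	emit(j.snapshot()) // safe: the lock was released above
}

func main() {
	j := &job{status: "running"}
	var events []string
	cancel(j, func(s string) { events = append(events, s) })
	fmt.Println(events) // [cancelled]
}
```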
func (*Scheduler) Conditions
func (s *Scheduler) Conditions() *ConditionRegistry
Conditions returns the condition registry so callers may add extensions before Start.
func (*Scheduler) DrainAndShutdown
DrainAndShutdown stops accepting new Enqueues, waits for in-flight work (up to timeout), then closes the queue + WAL + audit. Pending jobs remain in the WAL and will be replayed on next Start.
func (*Scheduler) Enqueue
Enqueue admits a new job. On success the returned Job is a clone of the one held inside the scheduler — callers may inspect but not mutate.
func (*Scheduler) FindByName
FindByName returns the live (non-terminal) job for a name, if any.
func (*Scheduler) List
func (s *Scheduler) List(filter ListFilter) []JobSummary
List returns summaries of every known job matching the filter. Defaults: non-terminal only, sorted by NextFireAt then CreatedAt.
func (*Scheduler) Prune (added in v1.109.1)
func (s *Scheduler) Prune(filter PruneFilter) ([]JobID, error)
Prune removes terminal jobs that match the filter and returns the IDs of the removed jobs. Non-terminal jobs (pending, blocked, waiting, running, paused) are never touched; use Cancel for those.
Locking: matching and in-place deletion both happen under s.mu, in a single critical section, so the in-memory map stays consistent with the set of IDs that are later acknowledged in the WAL. The WAL acks themselves are written after the map lock is released, so slow disks don't block other operations.
func (*Scheduler) Query
Query returns a cloned snapshot of the job (including history), or ErrJobNotFound.
func (*Scheduler) SetBridge
SetBridge swaps the CLIBridge. Used by daemon mode when a CLI attaches. Safe to call while the scheduler is running.
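The nil-to-noop substitution in New and the live swap in SetBridge can be sketched together with `atomic.Pointer` (Go 1.19+). `Bridge`, `noopBridge`, `cliBridge`, and `sched` are hypothetical one-method stand-ins, not the package's CLIBridge, and the real implementation may guard the field with a mutex instead of an atomic.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Bridge is a hypothetical one-method stand-in for CLIBridge.
type Bridge interface {
	Notify(msg string) string
}

// noopBridge accepts every call and does nothing, so a scheduler can boot
// before any CLI is attached.
type noopBridge struct{}

func (noopBridge) Notify(string) string { return "dropped" }

type cliBridge struct{}

func (cliBridge) Notify(msg string) string { return "shown: " + msg }

// bridgeBox holds the interface value; the atomic swaps the box pointer.
type bridgeBox struct{ b Bridge }

type sched struct{ bridge atomic.Pointer[bridgeBox] }

// setBridge swaps the bridge atomically; nil falls back to noopBridge.
func (s *sched) setBridge(b Bridge) {
	if b == nil {
		b = noopBridge{}
	}
	s.bridge.Store(&bridgeBox{b: b})
}

func (s *sched) notify(msg string) string { return s.bridge.Load().b.Notify(msg) }

func main() {
	s := &sched{}
	s.setBridge(nil)                  // daemon bootstrap, no CLI yet
	fmt.Println(s.notify("job done")) // dropped
	s.setBridge(cliBridge{})          // a CLI attaches
	fmt.Println(s.notify("job done")) // shown: job done
}
```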
func (*Scheduler) Snapshot
Snapshot triggers an immediate on-disk snapshot. Exposed for /jobs gc and daemon IPC "snapshot" calls.
type SchedulerDeps
type SchedulerDeps struct {
EventBus *bus.MessageBus
Hooks *hooks.Manager
}
SchedulerDeps bundles the optional external dependencies Scheduler uses for observability and coordination. All fields are optional; a nil value means "skip that channel".
type ShellPolicy
type ShellPolicy int
ShellPolicy classifies a shell command's security disposition as reported by the host CoderMode policy manager.
const (
// ShellPolicyAllow: command is on the allowlist (or is a known
// read-only command like `kubectl get`, `git status`, etc.).
// Scheduler admits the job without further questions.
ShellPolicyAllow ShellPolicy = iota
// ShellPolicyAsk: command would require interactive approval in
// coder mode. The scheduler never prompts (daemon/autonomous
// context); it rejects the enqueue unless the job is marked
// DangerousConfirmed (user used --i-know, or an agent tool call
// passed i_know:true with explicit upstream blessing).
ShellPolicyAsk
// ShellPolicyDeny: command is on the denylist. Always rejected
// regardless of flags; --i-know CANNOT override a deny.
ShellPolicyDeny
)
func (ShellPolicy) IsTerminal
func (p ShellPolicy) IsTerminal() bool
IsTerminal reports whether this classification by itself is enough to decide admission (Allow or Deny). Ask is non-terminal — the caller must consult DangerousConfirmed.
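The admission rule implied by the three classifications and IsTerminal can be sketched as a small preflight function. `admit` and its error messages are hypothetical; the constant order matches the ShellPolicy declaration (Allow, Ask, Deny).

```go
package main

import (
	"errors"
	"fmt"
)

type ShellPolicy int

const (
	ShellPolicyAllow ShellPolicy = iota
	ShellPolicyAsk
	ShellPolicyDeny
)

// IsTerminal: Allow and Deny decide admission on their own; Ask does not.
func (p ShellPolicy) IsTerminal() bool { return p == ShellPolicyAllow || p == ShellPolicyDeny }

// admit is a hypothetical preflight check: Allow admits, Deny always
// rejects, and Ask admits only when the job is DangerousConfirmed.
func admit(p ShellPolicy, dangerousConfirmed bool) error {
	if p.IsTerminal() {
		if p == ShellPolicyDeny {
			return errors.New("denied by denylist; --i-know cannot override")
		}
		return nil
	}
	if !dangerousConfirmed {
		return errors.New("needs approval; resubmit with --i-know")
	}
	return nil
}

func main() {
	fmt.Println(admit(ShellPolicyAllow, false) == nil) // true
	fmt.Println(admit(ShellPolicyAsk, false) == nil)   // false
	fmt.Println(admit(ShellPolicyAsk, true) == nil)    // true
	fmt.Println(admit(ShellPolicyDeny, true) == nil)   // false
}
```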
func (ShellPolicy) String
func (p ShellPolicy) String() string
String makes the policy log-friendly.
type Suggestion
Suggestion is a (text, description) pair consumed by the top-level completer. Kept minimal on purpose so the completer can adapt to future terminal libraries.
func CommandSuggestions
func CommandSuggestions(command string) []Suggestion
CommandSuggestions returns the top-level /schedule /wait /jobs subcommands.
func JobsListFlags
func JobsListFlags() []Suggestion
JobsListFlags returns the flag suggestions for /jobs list.
type TimeoutBehavior
type TimeoutBehavior string
TimeoutBehavior decides what happens when a wait-until loop runs out of time or poll budget.
const (
// TimeoutFail: mark job StatusTimedOut. Default.
TimeoutFail TimeoutBehavior = "fail"
// TimeoutFireAnyway: run the Action even though the condition
// never turned true. Useful for "cleanup regardless" semantics.
TimeoutFireAnyway TimeoutBehavior = "fire_anyway"
// TimeoutFallback: run WaitSpec.Fallback (a different Action) and
// then fail. Used to notify when a deployment never came up.
TimeoutFallback TimeoutBehavior = "fallback"
)
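A sketch that maps each behavior to what runs once the wait budget is exhausted. The `plan` type and `onWaitExhausted` are hypothetical, and treating unknown or empty values as `fail` is an assumption based on `fail` being the documented default.

```go
package main

import "fmt"

type TimeoutBehavior string

const (
	TimeoutFail       TimeoutBehavior = "fail"
	TimeoutFireAnyway TimeoutBehavior = "fire_anyway"
	TimeoutFallback   TimeoutBehavior = "fallback"
)

// plan is a hypothetical summary of what happens after a wait runs out.
type plan struct {
	RunAction   bool // run the job's own Action
	RunFallback bool // run WaitSpec.Fallback
	Fail        bool // end in a failure state regardless of what ran
}

func onWaitExhausted(b TimeoutBehavior) plan {
	switch b {
	case TimeoutFireAnyway:
		return plan{RunAction: true}
	case TimeoutFallback:
		return plan{RunFallback: true, Fail: true}
	default: // TimeoutFail (the default): mark the job StatusTimedOut
		return plan{Fail: true}
	}
}

func main() {
	fmt.Printf("%+v\n", onWaitExhausted(TimeoutFallback)) // {RunAction:false RunFallback:true Fail:true}
}
```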
type ToolAdapter
type ToolAdapter struct {
// contains filtered or unexported fields
}
ToolAdapter exposes the scheduler methods as JSON-in/JSON-out callable tools. Used by the ReAct loop and (via IPC) by the daemon.
func NewToolAdapter
func NewToolAdapter(s *Scheduler) *ToolAdapter
NewToolAdapter builds the adapter.
func (*ToolAdapter) ScheduleJob
ScheduleJob implements the schedule_job tool.
type ToolInput
type ToolInput struct {
// schedule_job / wait_until inputs share most fields.
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
// Schedule.
When string `json:"when,omitempty"` // DSL string
Schedule *Schedule `json:"schedule,omitempty"` // explicit
// Action.
Do string `json:"do,omitempty"`
Action *Action `json:"action,omitempty"`
// Wait.
Until string `json:"until,omitempty"`
Wait *WaitSpec `json:"wait,omitempty"`
// Budget overrides.
Timeout string `json:"timeout,omitempty"`
PollInterval string `json:"poll,omitempty"`
MaxPolls int `json:"max_polls,omitempty"`
WaitTimeout string `json:"wait_timeout,omitempty"`
MaxRetries int `json:"max_retries,omitempty"`
OnTimeout string `json:"on_timeout,omitempty"`
// DAG.
DependsOn []string `json:"depends_on,omitempty"`
Triggers []string `json:"triggers,omitempty"`
// TTL.
TTL string `json:"ttl,omitempty"`
// async for wait_until.
Async bool `json:"async,omitempty"`
// IKnow pre-authorizes shell commands that would normally hit
// ShellPolicyAsk at enqueue time. When true, the resulting Job
// has DangerousConfirmed=true and passes preflight even on
// Ask classifications. Denylist rules still apply; IKnow
// cannot override Deny. Both the user (`--i-know` flag) and
// agents (`i_know` field in the @scheduler tool call) may set
// this; the authorization decision already happened at a higher
// layer (the user ran /agent to spawn the agent, or the user
// ran /schedule directly).
IKnow bool `json:"i_know,omitempty"`
// query/list/cancel inputs.
ID string `json:"id,omitempty"`
Filter ListFilter `json:"filter,omitempty"`
Reason string `json:"reason,omitempty"`
}
ToolInput is the JSON shape the ReAct loop passes in.
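A sketch of the IKnow path described in the field comment: decode an agent's tool-call JSON and carry `i_know` into a DangerousConfirmed flag. `toolInput` is a trimmed copy of three ToolInput fields with the same JSON tags; `job` and `fromToolCall` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toolInput is a trimmed copy of a few ToolInput fields, same JSON tags.
type toolInput struct {
	Name      string   `json:"name,omitempty"`
	DependsOn []string `json:"depends_on,omitempty"`
	IKnow     bool     `json:"i_know,omitempty"`
}

// job is a hypothetical stand-in for the Job the adapter would build.
type job struct {
	Name               string
	DependsOn          []string
	DangerousConfirmed bool
}

func fromToolCall(raw []byte) (job, error) {
	var in toolInput
	if err := json.Unmarshal(raw, &in); err != nil {
		return job{}, err
	}
	// Per the IKnow comment: i_know:true becomes DangerousConfirmed=true.
	return job{Name: in.Name, DependsOn: in.DependsOn, DangerousConfirmed: in.IKnow}, nil
}

func main() {
	j, err := fromToolCall([]byte(`{"name":"restart-api","depends_on":["build"],"i_know":true}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", j) // {Name:restart-api DependsOn:[build] DangerousConfirmed:true}
}
```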
type ToolOutput
type ToolOutput struct {
OK bool `json:"ok"`
JobID JobID `json:"job_id,omitempty"`
Status JobStatus `json:"status,omitempty"`
Summary *JobSummary `json:"summary,omitempty"`
Job *Job `json:"job,omitempty"`
Jobs []JobSummary `json:"jobs,omitempty"`
Outcome Outcome `json:"outcome,omitempty"`
Details string `json:"details,omitempty"`
Error string `json:"error,omitempty"`
}
ToolOutput is the normalized JSON shape returned to the ReAct loop.
type TransitionReason
TransitionReason carries human-readable context for audit/event. Separated from the status because the same status may be reached via different paths (e.g. StatusFailed after action error vs after circuit breaker rejection).
type WaitSpec
type WaitSpec struct {
Condition Condition `json:"condition"`
OnTimeout TimeoutBehavior `json:"on_timeout,omitempty"`
// Fallback is consulted when OnTimeout=TimeoutFallback.
Fallback *Action `json:"fallback,omitempty"`
}
WaitSpec optionally gates an Action behind a polled condition. When Wait is non-nil on a Job, the scheduler enters StatusWaiting at NextFireAt and stays there, evaluating WaitSpec.Condition every PollInterval, until the condition is satisfied or the budget runs out.
Source Files
- audit.go
- breaker.go
- bridge.go
- client.go
- completer.go
- config.go
- daemon.go
- dispatcher.go
- doc.go
- errors.go
- events.go
- id.go
- ipc.go
- job.go
- metrics.go
- parser.go
- policy.go
- queue.go
- rate_limit.go
- registry.go
- render.go
- replay.go
- retry.go
- schedule.go
- scheduler.go
- shell_policy.go
- snapshot.go
- state_machine.go
- tool_adapter.go
- types.go
- wal.go
Directories
| Path | Synopsis |
|---|---|
| | AgentResume fires when a parked agent's wait condition is satisfied and the interactive ReAct loop should re-enter. |
| | Package builtins wires the built-in condition evaluators and action executors into a Scheduler. |
| | Package condition: built-in evaluators registry. |