runner

package
v0.7.0
Published: May 7, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package runner implements the cloud-mode iterion runner pod. It pulls RunMessages from the NATS JetStream queue, claims a distributed lease, hydrates the workflow IR, and executes runs against the Mongo+S3 store.
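The lease step can be illustrated with a small in-memory model. This is only a sketch of the general claim-and-heartbeat idea, not the package's implementation: `leaseTable`, `claim`, `heartbeat`, and the 30-second TTL are hypothetical, and a real distributed lease would live in a shared store (the Config comments mention a NATS KV lease) rather than in a map. Time is passed in explicitly so the behaviour is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// lease records who holds a run and until when. Hypothetical type.
type lease struct {
	owner   string
	expires time.Time
}

// leaseTable is an in-memory stand-in for a distributed lease store.
// It is not safe for concurrent use; a real store provides that.
type leaseTable struct {
	ttl    time.Duration
	leases map[string]lease
}

// claim grants the lease on runID to owner if it is free, expired,
// or already held by the same owner.
func (t *leaseTable) claim(runID, owner string, now time.Time) bool {
	if l, ok := t.leases[runID]; ok && l.owner != owner && now.Before(l.expires) {
		return false // another owner holds a live lease
	}
	t.leases[runID] = lease{owner: owner, expires: now.Add(t.ttl)}
	return true
}

// heartbeat extends a live lease; it fails if owner no longer holds it.
func (t *leaseTable) heartbeat(runID, owner string, now time.Time) bool {
	l, ok := t.leases[runID]
	if !ok || l.owner != owner || !now.Before(l.expires) {
		return false
	}
	t.leases[runID] = lease{owner: owner, expires: now.Add(t.ttl)}
	return true
}

// leaseDemo walks two pods through claiming the same run.
func leaseDemo() []bool {
	t := &leaseTable{ttl: 30 * time.Second, leases: map[string]lease{}}
	start := time.Unix(0, 0)
	return []bool{
		t.claim("run-1", "pod-a", start),                         // lease is free
		t.claim("run-1", "pod-b", start.Add(10*time.Second)),     // held by pod-a
		t.heartbeat("run-1", "pod-a", start.Add(20*time.Second)), // extends to t=50s
		t.claim("run-1", "pod-b", start.Add(40*time.Second)),     // still held
		t.claim("run-1", "pod-b", start.Add(60*time.Second)),     // lease expired
	}
}

func main() {
	fmt.Println(leaseDemo())
}
```

In this model a pod that stops heartbeating loses its lease once the TTL elapses, which is what lets a sibling pod take over the run.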

One runner pod handles one in-flight run at a time (MaxAckPending=1 on the JetStream consumer). Horizontal scale comes from spawning more pods; KEDA scales on lag (see plan §F T-36, runner-keda-scaledobject.yaml).

See the cloud-ready plan, §F (T-27, T-28, T-29).

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	NATS              *natsq.Conn
	Store             store.RunStore
	RunnerID          string
	WorkDir           string        // base directory for per-run workspaces
	HeartbeatInterval time.Duration // how often to refresh the NATS KV lease
	PendingPoll       time.Duration // how often to refresh nats_pending_messages (0 = 15s)
	FetchWait         time.Duration // long-poll wait per fetch
	Logger            *iterlog.Logger
	// Metrics, when non-nil, receives counter and gauge updates from the
	// runner loop (in-flight runs, durations, heartbeat errors, NATS
	// queue depth, LLM token usage). Nil-safe: passing nil disables
	// metrics emission without changing the loop's behaviour, useful
	// for unit tests and the local-mode dev runner.
	Metrics *metrics.Registry
}

Config holds the dependencies and settings used to bootstrap a Runner.
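Two conventions in the field comments above are common Go idioms: a zero duration that stands for a default (PendingPoll, "0 = 15s") and a nil pointer that disables a feature (Metrics). The sketch below shows one usual way to implement each. The names `pendingPollOrDefault` and `registry` are hypothetical and not taken from the package, and treating negative values like zero is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// defaultPendingPoll mirrors the "0 = 15s" note on Config.PendingPoll.
const defaultPendingPoll = 15 * time.Second

// pendingPollOrDefault is a hypothetical helper: zero (and, by
// assumption, negative) values fall back to the default.
func pendingPollOrDefault(d time.Duration) time.Duration {
	if d <= 0 {
		return defaultPendingPoll
	}
	return d
}

// registry is a hypothetical stand-in for a metrics registry.
type registry struct {
	inFlight int
}

// SetInFlight is safe to call on a nil *registry: it does nothing,
// so callers need no "if metrics != nil" checks.
func (r *registry) SetInFlight(n int) {
	if r == nil {
		return
	}
	r.inFlight = n
}

func main() {
	fmt.Println(pendingPollOrDefault(0))               // default applies
	fmt.Println(pendingPollOrDefault(5 * time.Second)) // explicit value kept

	var disabled *registry
	disabled.SetInFlight(1) // no panic: metrics are simply off

	enabled := &registry{}
	enabled.SetInFlight(1)
	fmt.Println(enabled.inFlight)
}
```

Putting the nil check inside each method keeps the calling loop identical whether or not metrics are enabled, which matches the "without changing the loop's behaviour" wording in the comment.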

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner is the long-running consumer loop.

func New

func New(ctx context.Context, cfg Config) (*Runner, error)

New builds a runner from the supplied dependencies and creates the JetStream consumer. The actual loop starts via Run.

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) error

Run drains the queue until ctx is cancelled. Each iteration fetches one message, processes it synchronously, and acks it (or naks/terms it on failure). It returns ctx.Err() after a clean shutdown.
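The shape of such a loop can be sketched with the standard library alone. Everything here is illustrative: `message`, `drain`, and `errPermanent` are hypothetical names, a buffered channel stands in for the JetStream fetch, and the rule "permanent errors are termed, other errors are nak'd" is an assumption about how the two failure outcomes might be chosen.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a queue delivery.
type message struct {
	id      string
	outcome string // "ack", "nak", or "term", recorded by the loop
}

// errPermanent marks failures that should not be redelivered (assumption).
var errPermanent = errors.New("permanent failure")

// drain handles one message at a time until ctx is cancelled,
// then returns ctx.Err().
func drain(ctx context.Context, queue <-chan *message, process func(*message) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case m := <-queue:
			err := process(m) // synchronous: nothing else is fetched meanwhile
			switch {
			case err == nil:
				m.outcome = "ack"
			case errors.Is(err, errPermanent):
				m.outcome = "term" // do not redeliver
			default:
				m.outcome = "nak" // ask for redelivery
			}
		}
	}
}

// runDemo feeds three messages through drain and cancels afterwards.
func runDemo() ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	msgs := []*message{{id: "ok"}, {id: "retry"}, {id: "bad"}}
	queue := make(chan *message, len(msgs))
	for _, m := range msgs {
		queue <- m
	}

	handled := 0
	process := func(m *message) error {
		handled++
		if handled == len(msgs) {
			cancel() // simulate shutdown once the last message is in hand
		}
		switch m.id {
		case "retry":
			return errors.New("transient failure")
		case "bad":
			return errPermanent
		}
		return nil
	}

	err := drain(ctx, queue, process)
	outcomes := make([]string, 0, len(msgs))
	for _, m := range msgs {
		outcomes = append(outcomes, m.outcome)
	}
	return outcomes, err
}

func main() {
	outcomes, err := runDemo()
	fmt.Println(outcomes, err)
}
```

Because processing happens inline in the loop, at most one message is in flight per loop, which is the same property the consumer's MaxAckPending=1 setting enforces on the server side.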

func (*Runner) Shutdown

func (r *Runner) Shutdown(ctx context.Context) error

Shutdown signals the loop to stop fetching new messages and waits for the in-flight run (if any) to be cancelled, then republishes its delivery so a sibling pod can pick it up. Plan §F T-28.
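The sequence described above (stop fetching, cancel the in-flight run, wait for it, hand the delivery back) might look roughly like the following. This is a sketch under stated assumptions rather than the package's code: `worker` and its fields are hypothetical, the "run" is a goroutine that simply blocks until cancelled, and republishing is modelled by appending to a slice.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for the runner's shutdown state.
type worker struct {
	mu          sync.Mutex
	stopping    bool               // when true, the loop fetches no new messages
	cancelRun   context.CancelFunc // cancels the in-flight run; nil when idle
	runDone     chan struct{}      // closed once the in-flight run has returned
	inFlightID  string
	republished []string // deliveries handed back to the queue
}

// start launches a fake run that blocks until it is cancelled.
func (w *worker) start(id string) {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	w.mu.Lock()
	w.cancelRun, w.runDone, w.inFlightID = cancel, done, id
	w.mu.Unlock()
	go func() {
		defer close(done)
		<-ctx.Done() // a real run would do work and watch ctx
	}()
}

// shutdown stops fetching, cancels the in-flight run, waits for it to
// return (bounded by ctx), then records the delivery as republished.
func (w *worker) shutdown(ctx context.Context) error {
	w.mu.Lock()
	w.stopping = true
	cancel, done, id := w.cancelRun, w.runDone, w.inFlightID
	w.mu.Unlock()

	if cancel == nil {
		return nil // idle: nothing to cancel or republish
	}
	cancel()
	select {
	case <-done:
	case <-ctx.Done():
		return ctx.Err() // caller's deadline expired before the run stopped
	}

	w.mu.Lock()
	w.republished = append(w.republished, id)
	w.mu.Unlock()
	return nil
}

// shutdownDemo starts one run and shuts the worker down.
func shutdownDemo() ([]string, error) {
	w := &worker{}
	w.start("run-1")
	err := w.shutdown(context.Background())
	return w.republished, err
}

func main() {
	ids, err := shutdownDemo()
	fmt.Println(ids, err)
}
```

Bounding the wait with the caller's context means a run that is slow to honour cancellation cannot block shutdown indefinitely.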
