Documentation
¶
Overview ¶
Package batches models and persists batch dispatch decisions.
A Batch records only *dispatch decisions* — which plan entry got which run_id, or why it was skipped. It deliberately does NOT duplicate run outcomes (status/exit_code/logs/artifacts): those live solely in the runs store and are joined live at read time. This single-source-of-truth split avoids double-write inconsistency between batches.json and runs.json.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrBatchNotFound = errors.New("batch not found")
ErrBatchNotFound is returned by Get / Update when the requested batch ID does not exist. Sentinel so callers can branch with errors.Is (mirrors runs.ErrRunNotFound).
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
ID string `json:"id"`
PlanHash string `json:"plan_hash"`
Staging string `json:"staging"`
CreatedBy string `json:"created_by"`
CreatedAt time.Time `json:"created_at"`
ExpectedCount int `json:"expected_count"`
Source string `json:"source"` // "plan:<path>" | "inline"
Items []*Item `json:"items"`
}
Batch is the persisted record of one batch dispatch.
type Item ¶
type Item struct {
Name string `json:"name"`
Dir string `json:"dir"`
Selector string `json:"selector"` // "machine:<name>" | "tag:<tag>"
State ItemState `json:"state"`
RunID string `json:"run_id,omitempty"`
Machine string `json:"machine,omitempty"` // assigned machine name
ErrorCode string `json:"error_code,omitempty"`
}
Item is one plan entry's dispatch decision within a batch.
type ItemState ¶
type ItemState string
ItemState is the dispatch state of a single batch item. It is distinct from a run's Status: a dispatched item's run outcome is joined from the runs store.
const ( // ItemQueued: not yet dispatched (awaiting an idle machine). Transient — // the backfill worker drives every item out of this state. ItemQueued ItemState = "queued" // ItemDispatched: a run was created for this item; RunID is set. ItemDispatched ItemState = "dispatched" // ItemSkipped: terminally not dispatched (no matching machine, or a // permanent dispatch error); ErrorCode explains why. ItemSkipped ItemState = "skipped" )
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store persists batches to a JSON file with flock-based cross-process locking.
Safe for concurrent use within a process (mu) and across processes (flock). This mirrors runs.Store exactly: every public method takes the in-proc mutex then acquires/releases the on-disk lock.
func (*Store) Create ¶
Create assigns a unique ID to b (overwriting any existing value) and appends it to the store.
func (*Store) FindByPlanHashAndStaging ¶
FindByPlanHashAndStaging returns the first batch matching (planHash, staging), used for idempotent re-dispatch: re-running the same plan against the same staging resumes the existing batch rather than creating a new one. The bool is false when no match exists.
func (*Store) Update ¶
Update persists the item transitions in b against the stored record matching b.ID. Returns ErrBatchNotFound if no such record exists.
It is NOT a blind whole-record overwrite. The foreground `batch run` resume path and the detached `_batch-worker` can both hold a snapshot of the same batch and write back concurrently; a blind overwrite would let a stale snapshot clobber another writer's progress (lost update) or resurrect an already-dispatched item. Instead we re-read the authoritative record under the lock and merge per item: an item transition is monotonic (queued → dispatched/skipped), so we adopt b's version of an item ONLY where the stored item is still queued and b has advanced it. That makes concurrent writers commutative and idempotent. Immutable batch metadata (plan_hash, staging, created_*, source) is fixed at Create and never taken from the snapshot.