batches

package
v0.1.1
Published: Jun 1, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package batches models and persists batch dispatch decisions.

A Batch records only *dispatch decisions* — which plan entry got which run_id, or why it was skipped. It deliberately does NOT duplicate run outcomes (status/exit_code/logs/artifacts): those live solely in the runs store and are joined live at read time. This single-source-of-truth split avoids double-write inconsistency between batches.json and runs.json.
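The read-time join described above could look roughly like the sketch below. It is only an illustration: the `joinStatuses` helper, the `runStatus` map standing in for the runs store, and the placeholder strings are all assumptions, not part of this package.

```go
package main

import "fmt"

// Item is a trimmed local copy of the fields needed for the join.
type Item struct {
	Name  string
	RunID string
}

// joinStatuses (hypothetical) looks up each dispatched item's run status at
// read time. runStatus stands in for the runs store, keyed by run ID; the
// batch record itself never stores the status.
func joinStatuses(items []Item, runStatus map[string]string) map[string]string {
	out := make(map[string]string, len(items))
	for _, it := range items {
		if it.RunID == "" {
			// Queued or skipped items have no run to join against.
			out[it.Name] = "not dispatched"
			continue
		}
		status, ok := runStatus[it.RunID]
		if !ok {
			status = "unknown"
		}
		out[it.Name] = status
	}
	return out
}

func main() {
	items := []Item{{Name: "build", RunID: "run-1"}, {Name: "lint"}}
	runs := map[string]string{"run-1": "succeeded"}
	joined := joinStatuses(items, runs)
	fmt.Println(joined["build"], "/", joined["lint"])
}
```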

Index

Constants

This section is empty.

Variables

var ErrBatchNotFound = errors.New("batch not found")

ErrBatchNotFound is returned by Get and Update when the requested batch ID does not exist. It is a sentinel error, so callers can branch on it with errors.Is (mirroring runs.ErrRunNotFound).
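A minimal sketch of branching on the sentinel follows. To stay self-contained it declares a local stand-in for ErrBatchNotFound, and the `lookup` and `describe` helpers are hypothetical; real code would compare against batches.ErrBatchNotFound on the error returned by Get or Update.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrBatchNotFound is a local stand-in for batches.ErrBatchNotFound so the
// sketch compiles on its own.
var ErrBatchNotFound = errors.New("batch not found")

// lookup (hypothetical) mimics a caller-side wrapper around Store.Get that
// adds context to the sentinel with %w.
func lookup(known map[string]bool, id string) error {
	if !known[id] {
		return fmt.Errorf("lookup %q: %w", id, ErrBatchNotFound)
	}
	return nil
}

// describe branches with errors.Is, which matches the sentinel even when it
// has been wrapped.
func describe(err error) string {
	switch {
	case err == nil:
		return "found"
	case errors.Is(err, ErrBatchNotFound):
		return "missing"
	default:
		return "failed"
	}
}

func main() {
	known := map[string]bool{"b-1": true}
	fmt.Println(describe(lookup(known, "b-1"))) // found
	fmt.Println(describe(lookup(known, "b-2"))) // missing
}
```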

Functions

This section is empty.

Types

type Batch

type Batch struct {
	ID            string    `json:"id"`
	PlanHash      string    `json:"plan_hash"`
	Staging       string    `json:"staging"`
	CreatedBy     string    `json:"created_by"`
	CreatedAt     time.Time `json:"created_at"`
	ExpectedCount int       `json:"expected_count"`
	Source        string    `json:"source"` // "plan:<path>" | "inline"
	Items         []*Item   `json:"items"`
}

Batch is the persisted record of one batch dispatch.

type Item

type Item struct {
	Name      string    `json:"name"`
	Dir       string    `json:"dir"`
	Selector  string    `json:"selector"` // "machine:<name>" | "tag:<tag>"
	State     ItemState `json:"state"`
	RunID     string    `json:"run_id,omitempty"`
	Machine   string    `json:"machine,omitempty"` // assigned machine name
	ErrorCode string    `json:"error_code,omitempty"`
}

Item is one plan entry's dispatch decision within a batch.

type ItemState

type ItemState string

ItemState is the dispatch state of a single batch item. It is distinct from a run's Status: a dispatched item's run outcome is joined from the runs store.

const (
	// ItemQueued: not yet dispatched (awaiting an idle machine). Transient —
	// the backfill worker drives every item out of this state.
	ItemQueued ItemState = "queued"
	// ItemDispatched: a run was created for this item; RunID is set.
	ItemDispatched ItemState = "dispatched"
	// ItemSkipped: terminally not dispatched (no matching machine, or a
	// permanent dispatch error); ErrorCode explains why.
	ItemSkipped ItemState = "skipped"
)
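The three states form a small one-way state machine. The sketch below encodes it with two hypothetical helpers, `isSettled` and `canTransition`, that are not part of the package; the type and constants are local copies so the example compiles on its own.

```go
package main

import "fmt"

// Local copies of the package's ItemState type and constants.
type ItemState string

const (
	ItemQueued     ItemState = "queued"
	ItemDispatched ItemState = "dispatched"
	ItemSkipped    ItemState = "skipped"
)

// isSettled (hypothetical) reports whether a dispatch decision has been made
// for the item, one way or the other.
func isSettled(s ItemState) bool {
	return s == ItemDispatched || s == ItemSkipped
}

// canTransition (hypothetical) allows only queued -> dispatched and
// queued -> skipped; a settled item never changes state again.
func canTransition(from, to ItemState) bool {
	return from == ItemQueued && isSettled(to)
}

func main() {
	fmt.Println(canTransition(ItemQueued, ItemDispatched)) // true
	fmt.Println(canTransition(ItemDispatched, ItemQueued)) // false
	fmt.Println(canTransition(ItemSkipped, ItemDispatched)) // false
}
```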

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store persists batches to a JSON file with flock-based cross-process locking.

Store is safe for concurrent use within a process (mu) and across processes (flock). It mirrors runs.Store exactly: every public method takes the in-process mutex, then acquires the on-disk lock and releases it when done.
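The mutex-then-flock ordering can be sketched as follows. This is not the package's implementation: the `lockedDir` type, the `withLock` helper, and the `batches.lock` file name are assumptions, and `syscall.Flock` is available on Unix-like systems only.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"syscall"
)

// lockedDir (hypothetical) pairs an in-process mutex with an on-disk lock file.
type lockedDir struct {
	mu   sync.Mutex
	path string // lock file path; the name used below is made up
}

// withLock takes the mutex first, then an exclusive flock, runs fn, and
// releases both in reverse order via deferred calls.
func (l *lockedDir) withLock(fn func() error) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	f, err := os.OpenFile(l.path, os.O_CREATE|os.O_RDWR, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()

	if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil {
		return err
	}
	defer syscall.Flock(int(f.Fd()), syscall.LOCK_UN)

	return fn()
}

// demo increments a counter from several goroutines under the lock and
// returns the final value.
func demo() (int, error) {
	dir, err := os.MkdirTemp("", "batches-lock")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	l := &lockedDir{path: filepath.Join(dir, "batches.lock")}
	counter := 0
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = l.withLock(func() error { counter++; return nil })
		}()
	}
	wg.Wait()
	return counter, nil
}

func main() {
	n, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(n) // 8
}
```

The mutex serializes goroutines inside one process, while the flock serializes separate processes that open the same lock file.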

func NewStore

func NewStore(stateDir string) (*Store, error)

NewStore prepares the state directory and returns a Store rooted at it.

func (*Store) Create

func (s *Store) Create(b *Batch) error

Create assigns a unique ID to b (overwriting any existing value) and appends it to the store.

func (*Store) FindByPlanHashAndStaging

func (s *Store) FindByPlanHashAndStaging(planHash, staging string) (*Batch, bool)

FindByPlanHashAndStaging returns the first batch matching (planHash, staging), used for idempotent re-dispatch: re-running the same plan against the same staging resumes the existing batch rather than creating a new one. The bool is false when no match exists.
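A caller-side resume-or-create flow might look like the sketch below. It runs against a hypothetical in-memory `memStore` that copies only the two method signatures involved; the `resumeOrCreate` helper and the ID scheme are assumptions made for illustration.

```go
package main

import "fmt"

// Batch is a trimmed local copy of the fields this sketch needs.
type Batch struct {
	ID       string
	PlanHash string
	Staging  string
}

// memStore is a hypothetical in-memory stand-in for *batches.Store.
type memStore struct {
	batches []*Batch
}

func (s *memStore) FindByPlanHashAndStaging(planHash, staging string) (*Batch, bool) {
	for _, b := range s.batches {
		if b.PlanHash == planHash && b.Staging == staging {
			return b, true
		}
	}
	return nil, false
}

func (s *memStore) Create(b *Batch) error {
	b.ID = fmt.Sprintf("batch-%d", len(s.batches)+1) // made-up ID scheme
	s.batches = append(s.batches, b)
	return nil
}

// resumeOrCreate (hypothetical) returns the existing batch for the
// (planHash, staging) pair when there is one, and creates a new batch
// otherwise. The bool reports whether an existing batch was resumed.
func resumeOrCreate(s *memStore, planHash, staging string) (*Batch, bool, error) {
	if b, ok := s.FindByPlanHashAndStaging(planHash, staging); ok {
		return b, true, nil
	}
	b := &Batch{PlanHash: planHash, Staging: staging}
	if err := s.Create(b); err != nil {
		return nil, false, err
	}
	return b, false, nil
}

func main() {
	s := &memStore{}
	first, resumed, _ := resumeOrCreate(s, "abc123", "staging-a")
	fmt.Println(first.ID, resumed) // batch-1 false
	again, resumed, _ := resumeOrCreate(s, "abc123", "staging-a")
	fmt.Println(again.ID, resumed) // batch-1 true
}
```

Note that the find and the create are two separate calls here, so this sketch alone does not rule out two processes creating the same batch at once; the documentation above does not say whether the real Store guards against that.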

func (*Store) Get

func (s *Store) Get(id string) (*Batch, error)

Get returns the batch with the given ID, or ErrBatchNotFound.

func (*Store) List

func (s *Store) List() ([]*Batch, error)

List returns all batches (in insertion order).

func (*Store) Update

func (s *Store) Update(b *Batch) error

Update persists the item transitions in b against the stored record matching b.ID. Returns ErrBatchNotFound if no such record exists.

It is NOT a blind whole-record overwrite. The foreground `batch run` resume path and the detached `_batch-worker` can both hold a snapshot of the same batch and write back concurrently; a blind overwrite would let a stale snapshot clobber another writer's progress (lost update) or resurrect an already-dispatched item. Instead we re-read the authoritative record under the lock and merge per item: an item transition is monotonic (queued → dispatched/skipped), so we adopt b's version of an item ONLY where the stored item is still queued and b has advanced it. That makes concurrent writers commutative and idempotent. Immutable batch metadata (plan_hash, staging, created_*, source) is fixed at Create and never taken from the snapshot.
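The per-item merge rule can be sketched as below. This is a reading of the description above, not the package's code: `mergeItems` is a hypothetical name, matching items by Name is an assumption (the documentation does not say what key is used), and the `no_machine` error code is made up.

```go
package main

import "fmt"

// Local copies of the relevant types so the sketch compiles on its own.
type ItemState string

const (
	ItemQueued     ItemState = "queued"
	ItemDispatched ItemState = "dispatched"
	ItemSkipped    ItemState = "skipped"
)

type Item struct {
	Name      string
	State     ItemState
	RunID     string
	Machine   string
	ErrorCode string
}

// mergeItems (hypothetical) applies a writer's snapshot onto the stored
// items in place. A snapshot item is adopted only when the stored item is
// still queued and the snapshot has advanced it; everything else is ignored.
func mergeItems(stored, snapshot []*Item) {
	byName := make(map[string]*Item, len(snapshot))
	for _, it := range snapshot {
		byName[it.Name] = it // assumption: Name identifies an item
	}
	for _, cur := range stored {
		next, ok := byName[cur.Name]
		if !ok {
			continue
		}
		if cur.State == ItemQueued && next.State != ItemQueued {
			*cur = *next
		}
	}
}

func main() {
	stored := []*Item{
		{Name: "a", State: ItemDispatched, RunID: "run-1"},
		{Name: "b", State: ItemQueued},
	}
	// A stale snapshot: it still sees "a" as queued but has settled "b".
	stale := []*Item{
		{Name: "a", State: ItemQueued},
		{Name: "b", State: ItemSkipped, ErrorCode: "no_machine"},
	}
	mergeItems(stored, stale)
	fmt.Println(stored[0].State, stored[0].RunID)     // dispatched run-1
	fmt.Println(stored[1].State, stored[1].ErrorCode) // skipped no_machine
}
```

Because a settled item is never overwritten, applying the same snapshot twice, or applying two writers' snapshots in either order, leaves each item with the first transition that reached the store.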
