daemon

package
v0.0.0-...-baecb57 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
// Factory admission reason codes. Each constant is an untyped string whose
// value is the snake_case spelling of the name's suffix.
// NOTE(review): presumably these are the values placed in
// FactoryAdmissionDecision.Reasons ([]string) — confirm against the evaluator.
const (
	FactoryAdmissionReasonEvidenceProviderMissing = "evidence_provider_missing"
	FactoryAdmissionReasonExpiredWorkOrder        = "expired_work_order"
	FactoryAdmissionReasonFutureGeneratedAt       = "future_generated_at"
	FactoryAdmissionReasonRepoStateUnknown        = "repo_state_unknown"
	FactoryAdmissionReasonRepoHeadUnknown         = "repo_head_unknown"
	FactoryAdmissionReasonBaseSHAMismatch         = "base_sha_mismatch"
	FactoryAdmissionReasonDirtyWorktree           = "dirty_worktree"
	FactoryAdmissionReasonTrackedAgents           = "tracked_agents"
	FactoryAdmissionReasonOpenPREvidenceUnknown   = "open_pr_evidence_unknown"
	FactoryAdmissionReasonOpenPROverlap           = "open_pr_overlap"
	FactoryAdmissionReasonMainCIUnknown           = "main_ci_unknown"
	FactoryAdmissionReasonMainCIRed               = "main_ci_red"
	FactoryAdmissionReasonRPIHandoffUnavailable   = "rpi_handoff_unavailable"
)
View Source
// Naming constants for projection snapshot files.
const (
	// ProjectionSnapshotDirName lives under .agents/daemon/.
	ProjectionSnapshotDirName = "projections"
	// ProjectionSnapshotPrefix matches files of the form snapshot-<ts>.json so
	// the timestamp encoded in the filename gives a stable chronological sort.
	ProjectionSnapshotPrefix = "snapshot-"
	// ProjectionSnapshotSuffix is the extension part of that same
	// snapshot-<ts>.json file name form.
	ProjectionSnapshotSuffix = ".json"
)
View Source
// Store layout and ledger sizing constants.
const (
	// StoreDirRel is the daemon store directory, written as a relative path.
	StoreDirRel = ".agents/daemon"

	// LedgerFileName is the file name of the JSONL ledger.
	LedgerFileName = "ledger.jsonl"

	// QuarantineDirName is the directory name used for quarantined data.
	QuarantineDirName = "quarantine"

	// LedgerSchemaVersion is the ledger schema version number.
	LedgerSchemaVersion = 1

	// DefaultLedgerMaxBytes (50 MiB) is the size at which ledger.jsonl is
	// rotated. Operators can choose a different threshold through
	// Store.WithLedgerMaxBytes.
	DefaultLedgerMaxBytes int64 = 50 << 20

	// MaxLedgerLineBytes (16 MiB) bounds a single ledger event line during
	// replay. A longer line is recorded as corrupt and skipped, so one
	// oversized record cannot panic the daemon at startup. It is paired with
	// the submission-time cap in server.go (MaxJobSubmissionBytes) and is
	// deliberately the larger of the two, because daemon-emitted events can
	// exceed the size of client-supplied job payloads.
	MaxLedgerLineBytes = 16 << 20
)
View Source
// AgentUpdateVersion is the wire version stamped onto every agent-update event
// payload; consumers pin parsing to schemas/agent-update.schema.json with it.
const AgentUpdateVersion = 1

AgentUpdateVersion is the wire-version stamped onto every agent-update event payload. Consumers (projections, transcripts, downstream judges) use it to pin parsing to the schema in schemas/agent-update.schema.json.

soc-y0ct.1 (UW7-1 of soc-bcrn): introduced the agent-update protocol surface.

View Source
// DaemonPlansProjectionSchemaVersion is the schema version for the daemon-side
// plans manifest projection (see DaemonPlansProjection.SchemaVersion).
const DaemonPlansProjectionSchemaVersion = 1

DaemonPlansProjectionSchemaVersion is the schema version for the daemon-side plans manifest projection.

View Source
const (
	// DefaultMutationTokenHeader is an HTTP header name, not a secret value.
	// NOTE(review): presumably the header AuthorizeMutation reads the mutation
	// token from when the policy does not override it — confirm.
	DefaultMutationTokenHeader = "X-AgentOps-Daemon-Token" // #nosec G101 -- HTTP header name, not a credential.
)
View Source
// DefaultTelemetryWindow is a 24-hour duration.
// NOTE(review): presumably the default look-back window for ledger telemetry
// (see FormatLedgerTelemetrySummary) — confirm.
const DefaultTelemetryWindow = 24 * time.Hour
View Source
// DreamJobSpecSchemaVersion is the schema version number for dream job specs.
// NOTE(review): presumably the value stamped into DreamRunJobSpec.SchemaVersion
// and DreamStageJobSpec.SchemaVersion — confirm.
const DreamJobSpecSchemaVersion = 1
View Source
// FactoryAdmissionJobSpecSchemaVersion is the schema version number for
// factory admission job specs.
// NOTE(review): presumably stamped into FactoryAdmissionJobSpec.SchemaVersion —
// confirm.
const FactoryAdmissionJobSpecSchemaVersion = 1
View Source
// MaxJobSubmissionBytes (1 MiB) is the largest request body accepted for
// /v1/jobs and /v1/jobs/cancel. Larger submissions are answered with 413
// rather than decoded, which keeps oversized payloads out of the append-only
// ledger. The replay-side counterpart is MaxLedgerLineBytes in store.go.
const MaxJobSubmissionBytes = 1 << 20

MaxJobSubmissionBytes caps how large a /v1/jobs (and /v1/jobs/cancel) request body may be. Submissions larger than this return 413 instead of being decoded, which prevents oversized job payloads from making it into the append-only ledger as events that would later overflow the replay reader. Paired with MaxLedgerLineBytes in store.go (replay tolerates larger lines than submission so daemon-emitted events have headroom).

View Source
// PlansProjectionJobSpecSchemaVersion is the schema version for
// PlansProjectionJobSpec round-trips; bump it when the spec shape changes.
const PlansProjectionJobSpecSchemaVersion = 1

PlansProjectionJobSpecSchemaVersion is the schema version for PlansProjectionJobSpec round-trips. Bumped when the spec shape changes.

View Source
// ProjectionSchemaVersion is the schema version number for projections.
// NOTE(review): which projection types carry this stamp is not visible here —
// confirm.
const ProjectionSchemaVersion = 1
View Source
// RPIJobSpecSchemaVersion is the schema version number for RPI job specs.
// NOTE(review): presumably checked by ValidateRPIJobSpec — confirm.
const RPIJobSpecSchemaVersion = 1
View Source
// RoutingPolicySchemaVersion is the schema version number for routing policies.
// NOTE(review): presumably checked by ValidateRoutingPolicy — confirm.
const RoutingPolicySchemaVersion = 1
View Source
// SkillInvokeJobSpecSchemaVersion is the schema version number for
// skill-invoke job specs.
const SkillInvokeJobSpecSchemaVersion = 1
View Source
// WikiJobSpecSchemaVersion is the schema version number for wiki job specs.
const WikiJobSpecSchemaVersion = 1

Variables

View Source
// Sentinel errors for the mutation-authorization surface. They are plain
// errors.New values, so callers should match them with errors.Is.
// NOTE(review): presumably returned (possibly wrapped) by AuthorizeMutation,
// ValidateLocalBindAddress and LoadMutationTokenFile — confirm.
var (
	ErrMutationDenied       = errors.New("daemon mutation denied")
	ErrUnsafeBindAddress    = errors.New("daemon bind address is not loopback")
	ErrUnsafeTokenFileMode  = errors.New("daemon mutation token file mode is too permissive")
	ErrMutationTokenMissing = errors.New("daemon mutation token is missing")
)
View Source
// Sentinel errors for the job queue. They are plain errors.New values, so
// callers should match them with errors.Is.
// NOTE(review): ErrFailpoint looks like a fault-injection hook for tests —
// confirm it is never returned in production paths.
var (
	ErrNoClaimableJobs    = errors.New("daemon queue: no claimable jobs")
	ErrJobAlreadyClaimed  = errors.New("daemon queue: job already claimed")
	ErrClaimFenceMismatch = errors.New("daemon queue: claim token or lease epoch mismatch")
	ErrLeaseExpired       = errors.New("daemon queue: lease expired")
	ErrJobNotFound        = errors.New("daemon queue: job not found")
	ErrFailpoint          = errors.New("daemon queue failpoint")
)
View Source
// Sentinel errors for the RPI runner. They are plain errors.New values, so
// callers should match them with errors.Is.
var (
	ErrRPIExecutorRequired = errors.New("daemon rpi runner: executor is required")
	ErrNoRPIJobs           = errors.New("daemon rpi runner: no claimable RPI jobs")
)

Functions

func AuthorizeMutation

func AuthorizeMutation(r *http.Request, policy MutationPolicy) error

func CanTransitionJobStatus

func CanTransitionJobStatus(from, to JobStatus) bool

func DefaultMutationPathCapabilities

func DefaultMutationPathCapabilities() map[string]MutationCapability

func FormatLedgerTelemetrySummary

func FormatLedgerTelemetrySummary(telemetry LedgerTelemetry) string

func LoadMutationTokenFile

func LoadMutationTokenFile(path string) (string, error)

func MutationRoutesForTest

func MutationRoutesForTest() []string

MutationRoutesForTest returns a copy of the registered mutation paths. Used by TestMutation_AllRegisteredRoutesEnforcePolicy and similar parity checks to assert every tracked route enforces auth.

func NewDaemonRouter

func NewDaemonRouter(store *Store, opts ServerOptions) http.Handler

func NewDbBdSource

func NewDbBdSource(dsn string) (*dbBdSource, error)

NewDbBdSource opens a connection pool for plans.projection bd queries. The caller owns Close(). dsn examples:

  • "root:@tcp(127.0.0.1:3306)/bushido" (from bushido itself)
  • "root:@tcp(100.109.17.108:3306)/bushido" (from Mac via tailnet)

func NewReadOnlyRouter

func NewReadOnlyRouter(store *Store, opts ServerOptions) http.Handler

func ParseCron

func ParseCron(expr string) (cron.Schedule, error)

ParseCron validates a cron expression using the 5-field standard with descriptors (e.g., "0 3 * * *", "@daily"). 6-field expressions with seconds are rejected per pre-mortem amendment B4 (DoS protection: prevents sub-minute schedules).

func RPIPhaseName

func RPIPhaseName(phase int) string

func ValidateDaemonPlansProjection

func ValidateDaemonPlansProjection(projection DaemonPlansProjection) error

ValidateDaemonPlansProjection returns a descriptive error if the projection shape is malformed. Used by replay paths and tests.

func ValidateDreamJobSpec

func ValidateDreamJobSpec(job JobSpec) error

func ValidateDreamMode

func ValidateDreamMode(mode DreamMode) error

func ValidateDreamStage

func ValidateDreamStage(stage DreamStage) error

func ValidateEventType

func ValidateEventType(value EventType) error

func ValidateFactoryAdmissionMode

func ValidateFactoryAdmissionMode(mode FactoryAdmissionMode) error

func ValidateFactoryCIStatus

func ValidateFactoryCIStatus(status FactoryCIStatus) error

func ValidateFactoryDigestPolicy

func ValidateFactoryDigestPolicy(policy FactoryDigestPolicy) error

func ValidateFactoryHandoffKind

func ValidateFactoryHandoffKind(kind FactoryHandoffKind) error

func ValidateFactoryLandingPolicy

func ValidateFactoryLandingPolicy(policy FactoryLandingPolicy) error

func ValidateFactoryMergeDecision

func ValidateFactoryMergeDecision(value FactoryMergeDecision) error

func ValidateFactorySlotStatus

func ValidateFactorySlotStatus(value FactorySlotStatus) error

func ValidateFactoryTargetType

func ValidateFactoryTargetType(targetType FactoryTargetType) error

func ValidateFactoryUnknownEvidencePolicy

func ValidateFactoryUnknownEvidencePolicy(policy FactoryUnknownEvidencePolicy) error

func ValidateFactoryValidationStatus

func ValidateFactoryValidationStatus(value FactoryValidationStatus) error

func ValidateFailureCode

func ValidateFailureCode(value FailureCode) error

func ValidateJobResultStatus

func ValidateJobResultStatus(value JobResultStatus) error

func ValidateJobStatus

func ValidateJobStatus(value JobStatus) error

func ValidateJobType

func ValidateJobType(value JobType) error

func ValidateLeaseState

func ValidateLeaseState(value LeaseState) error

func ValidateLedgerEvent

func ValidateLedgerEvent(event LedgerEvent) error

func ValidateLocalBindAddress

func ValidateLocalBindAddress(addr string) error

func ValidateRPIBackend

func ValidateRPIBackend(backend RPIBackend) error

func ValidateRPIJobSpec

func ValidateRPIJobSpec(job JobSpec) error

func ValidateRPIRegistryProjection

func ValidateRPIRegistryProjection(projection DaemonRPIRegistryProjection) error

func ValidateRecurringJobTemplatePayload

func ValidateRecurringJobTemplatePayload(t RecurringJobTemplate) error

ValidateRecurringJobTemplatePayload applies the same defaults the recurrence supervisor applies at fire time, then validates the materialized job payload.

func ValidateRequestID

func ValidateRequestID(value string) error

func ValidateRoutingAuthority

func ValidateRoutingAuthority(authority RoutingAuthority) error

func ValidateRoutingPolicy

func ValidateRoutingPolicy(policy RoutingPolicy) error

func WriteDaemonPlansProjection

func WriteDaemonPlansProjection(root string, projection DaemonPlansProjection) (string, error)

WriteDaemonPlansProjection writes the plans manifest snapshot atomically: the entries are serialised as JSONL into a tmp file in the destination directory under root, and that tmp file is then renamed to the final path. Concurrent writers in the same OutputDir are protected externally via the manifest.lock file lock added by atom-3 (G3); the write call here is single-writer-safe under that contract.

func WriteRPIRegistryProjection

func WriteRPIRegistryProjection(root string, projection DaemonRPIRegistryProjection, writer cliRPI.RunRegistryWriter) error

Types

type AgentUpdateCriterionVerdict

// AgentUpdateCriterionVerdict is the JSON payload for one per-criterion
// judgement; it mirrors $defs/criterion_verdict in
// schemas/agent-update.schema.json. evidence_path and notes are omitted from
// the encoding when empty; every other key is always emitted.
type AgentUpdateCriterionVerdict struct {
	CriterionID  string `json:"criterion_id"`
	// Status is one of "PASS" | "FAIL" | "SKIP".
	Status       string `json:"status"`
	EvidencePath string `json:"evidence_path,omitempty"`
	Notes        string `json:"notes,omitempty"`
	RunID        string `json:"run_id"`
	Timestamp    string `json:"timestamp"`
}

AgentUpdateCriterionVerdict payloads carry a per-criterion judgement. Status is one of "PASS" | "FAIL" | "SKIP". Mirrors $defs/criterion_verdict in schemas/agent-update.schema.json.

type AgentUpdatePhaseComplete

// AgentUpdatePhaseComplete is the JSON payload marking the terminal boundary
// of a phase; it mirrors $defs/phase_complete in
// schemas/agent-update.schema.json. duration_ms and artifacts are omitted from
// the encoding when zero/empty.
type AgentUpdatePhaseComplete struct {
	PhaseName  string            `json:"phase_name"`
	RunID      string            `json:"run_id"`
	Timestamp  string            `json:"timestamp"`
	// Status is one of "success" | "failure" | "timeout".
	Status     string            `json:"status"`
	DurationMs int64             `json:"duration_ms,omitempty"`
	Artifacts  map[string]string `json:"artifacts,omitempty"`
}

AgentUpdatePhaseComplete payloads mark the terminal boundary of a phase. Status is one of "success" | "failure" | "timeout". Mirrors $defs/phase_complete in schemas/agent-update.schema.json.

type AgentUpdatePhaseHandoff

// AgentUpdatePhaseHandoff is the JSON payload describing a transition between
// two phases of the same run; it mirrors $defs/phase_handoff in
// schemas/agent-update.schema.json. packet_path is omitted from the encoding
// when empty.
type AgentUpdatePhaseHandoff struct {
	FromPhase  string `json:"from_phase"`
	ToPhase    string `json:"to_phase"`
	RunID      string `json:"run_id"`
	Timestamp  string `json:"timestamp"`
	PacketPath string `json:"packet_path,omitempty"`
}

AgentUpdatePhaseHandoff payloads describe a transition between two phases of the same run. Mirrors $defs/phase_handoff in schemas/agent-update.schema.json.

type AgentUpdatePhaseStart

// AgentUpdatePhaseStart is the JSON payload marking the beginning of a named
// RPI phase; it mirrors $defs/phase_start in schemas/agent-update.schema.json.
// metadata is free-form (map[string]any) and omitted from the encoding when
// empty.
type AgentUpdatePhaseStart struct {
	PhaseName string         `json:"phase_name"`
	RunID     string         `json:"run_id"`
	Timestamp string         `json:"timestamp"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

AgentUpdatePhaseStart payloads mark the beginning of a named RPI phase. Mirrors $defs/phase_start in schemas/agent-update.schema.json.

type ArtifactRef

// ArtifactRef is the ledger-resident identity for an artifact stored by
// content hash. All four JSON keys are always emitted (no omitempty).
type ArtifactRef struct {
	// Path is repository-relative so replay does not need local runtime state.
	Path      string `json:"path"`
	// NOTE(review): presumably the hex-encoded SHA-256 of the content — confirm
	// the encoding against PutBytes / Validate.
	SHA256    string `json:"sha256"`
	Size      int64  `json:"size"`
	WrittenAt string `json:"written_at"`
}

ArtifactRef is the ledger-resident identity for an artifact stored by content hash. Path is repository-relative so replay can reconstruct the compatibility artifact map without consulting local runtime state.

func (ArtifactRef) Validate

func (r ArtifactRef) Validate() error

type ArtifactStoreOptions

// ArtifactStoreOptions configures NewContentAddressedArtifactStore.
type ArtifactStoreOptions struct {
	// Now is an injectable time source.
	// NOTE(review): presumably a nil Now falls back to time.Now — confirm.
	Now func() time.Time
}

type CancelJobInput

// CancelJobInput identifies a job cancellation request. It carries no JSON
// tags; the JSON-tagged counterpart in this package is CancelJobRequest.
type CancelJobInput struct {
	JobID     string
	RequestID RequestID
	Actor     string
	Reason    string
}

CancelJobInput identifies a job cancellation request.

type CancelJobOutcome

type CancelJobOutcome string

CancelJobOutcome reports how a cancellation request affected a job.

// CancelJobOutcome values. Only CancelJobOutcomeCancelled corresponds to a
// newly appended cancellation event; the three already_terminal_* values
// report that the job had already reached the named terminal state.
const (
	// CancelJobOutcomeCancelled means the queue appended a cancellation event.
	CancelJobOutcomeCancelled CancelJobOutcome = "cancelled"
	// CancelJobOutcomeAlreadyTerminalCompleted means the job was already completed.
	CancelJobOutcomeAlreadyTerminalCompleted CancelJobOutcome = "already_terminal_completed"
	// CancelJobOutcomeAlreadyTerminalFailed means the job was already failed.
	CancelJobOutcomeAlreadyTerminalFailed CancelJobOutcome = "already_terminal_failed"
	// CancelJobOutcomeAlreadyTerminalCancelled means the job was already cancelled.
	CancelJobOutcomeAlreadyTerminalCancelled CancelJobOutcome = "already_terminal_cancelled"
)

type CancelJobRequest

// CancelJobRequest is a JSON-tagged cancellation request. job_id is always
// emitted; request_id and reason are omitted from the encoding when empty.
// NOTE(review): presumably the request body of /v1/jobs/cancel — confirm.
type CancelJobRequest struct {
	RequestID string `json:"request_id,omitempty"`
	JobID     string `json:"job_id"`
	Reason    string `json:"reason,omitempty"`
}

type CancelJobResponse

// CancelJobResponse is a JSON-tagged cancellation response carrying the
// outcome, the job state, and projection status/lag. degraded_reasons is
// omitted from the encoding when empty.
// NOTE(review): presumably Cancelled is true exactly when Outcome is
// CancelJobOutcomeCancelled — confirm against the handler.
type CancelJobResponse struct {
	Cancelled        bool             `json:"cancelled"`
	Outcome          CancelJobOutcome `json:"outcome"`
	Job              QueueJobState    `json:"job"`
	ProjectionStatus ProjectionStatus `json:"projection_status"`
	ProjectionLag    ProjectionLag    `json:"projection_lag"`
	DegradedReasons  []string         `json:"degraded_reasons,omitempty"`
}

type CancelJobResult

// CancelJobResult returns the job state and cancellation outcome. Both JSON
// keys are always emitted.
type CancelJobResult struct {
	Job     QueueJobState    `json:"job"`
	Outcome CancelJobOutcome `json:"outcome"`
}

CancelJobResult returns the job state and cancellation outcome.

type Clock

// Clock abstracts time for testability, so a fake implementation can drive
// ticks deterministically in tests.
type Clock interface {
	// Now returns a time.Time.
	// NOTE(review): presumably the current instant, like time.Now — confirm.
	Now() time.Time
	// After returns a receive-only channel of time.Time for duration d.
	// NOTE(review): presumably mirrors time.After semantics — confirm.
	After(d time.Duration) <-chan time.Time
}

Clock abstracts time for testability. The recurrence supervisor uses Clock so tests can drive ticks deterministically with a fake clock.

type CompleteJobInput

// CompleteJobInput carries the fields of a job-completion request.
// NOTE(review): ClaimToken and LeaseEpoch presumably form the claim fence that
// ErrClaimFenceMismatch ("claim token or lease epoch mismatch") guards —
// confirm.
type CompleteJobInput struct {
	JobID        string
	RequestID    RequestID
	ClaimToken   string
	LeaseEpoch   int
	Actor        string
	// Artifacts and ArtifactRefs are both keyed by string; the first maps to
	// plain strings, the second to structured ArtifactRef values.
	Artifacts    map[string]string
	ArtifactRefs map[string]ArtifactRef
}

type ContentAddressedArtifactStore

// ContentAddressedArtifactStore is an opaque store constructed by
// NewContentAddressedArtifactStore; its PutBytes method turns a byte slice
// into an ArtifactRef.
type ContentAddressedArtifactStore struct {
	// contains filtered or unexported fields
}

func NewContentAddressedArtifactStore

func NewContentAddressedArtifactStore(root string, opts ArtifactStoreOptions) *ContentAddressedArtifactStore

func (*ContentAddressedArtifactStore) PutBytes

func (s *ContentAddressedArtifactStore) PutBytes(data []byte) (ArtifactRef, error)

type CorruptRecord

// CorruptRecord describes one corrupt record by line number and error text.
// quarantine_path is omitted from the encoding when empty.
// NOTE(review): presumably produced during ledger replay for lines that fail
// to parse or exceed MaxLedgerLineBytes — confirm.
type CorruptRecord struct {
	LineNumber     int    `json:"line_number"`
	Error          string `json:"error"`
	QuarantinePath string `json:"quarantine_path,omitempty"`
}

type CreateScheduleResponse

// CreateScheduleResponse is the body of a successful POST /v1/schedules; it
// carries a single "name" key.
type CreateScheduleResponse struct {
	Name string `json:"name"`
}

CreateScheduleResponse is the body of a successful POST /v1/schedules.

type CronParseError

// CronParseError preserves the operator's original input for actionable
// errors. It implements error through a pointer receiver (*CronParseError), so
// match it with errors.As using a *CronParseError target.
type CronParseError struct {
	// Original is the operator's input as given.
	Original string
	Reason   string
}

CronParseError preserves the operator's original input for actionable errors.

func (*CronParseError) Error

func (e *CronParseError) Error() string

type DaemonPlansProjection

// DaemonPlansProjection is the daemon-side wrapper around plans manifest
// state: Entries plus a LastEventID cursor and a SchemaVersion stamp.
// schema_version and entries are always emitted; the remaining keys are
// omitted from the encoding when empty.
type DaemonPlansProjection struct {
	SchemaVersion int                    `json:"schema_version"`
	ProjectID     string                 `json:"project_id,omitempty"`
	IssuePrefix   string                 `json:"issue_prefix,omitempty"`
	// Entries has no omitempty, so a nil slice encodes as JSON null.
	Entries       []PlansProjectionEntry `json:"entries"`
	LastEventID   string                 `json:"last_event_id,omitempty"`
	RebuiltAt     string                 `json:"rebuilt_at,omitempty"`
}

DaemonPlansProjection is the daemon-side wrapper around plans manifest state. Mirrors the DaemonRPIRegistryProjection shape (rpi_registry.go:9): Entries + LastEventID + a SchemaVersion stamp.

func RebuildDaemonPlansProjection

func RebuildDaemonPlansProjection(events []LedgerEvent) (DaemonPlansProjection, error)

RebuildDaemonPlansProjection folds the ledger event slice into the latest plans projection state. Currently the projection is fully rebuilt by the executor on each subscription tick; ledger events are advisory (last-event cursor + degraded-flag carry). atom-2 does NOT replay-build entries from events because plans.projection is a pull-from-bd projection, not an event-sourced one — the source of truth is bd, not the daemon ledger.

type DaemonRPIRegistryProjection

// DaemonRPIRegistryProjection pairs RPI run-registry states with a
// last-event cursor. Unlike DaemonPlansProjection it has no schema-version
// field.
type DaemonRPIRegistryProjection struct {
	// States has no omitempty, so a nil slice encodes as JSON null.
	States      []cliRPI.RunRegistryState `json:"states"`
	LastEventID string                    `json:"last_event_id,omitempty"`
}

func RebuildRPIRegistryProjection

func RebuildRPIRegistryProjection(events []LedgerEvent) (DaemonRPIRegistryProjection, error)

type DefaultRPIPromptBuilder

// DefaultRPIPromptBuilder is a stateless (empty-struct) type whose
// BuildRPIPrompt method is declared with a value receiver.
type DefaultRPIPromptBuilder struct{}

func (DefaultRPIPromptBuilder) BuildRPIPrompt

type DeleteScheduleResponse

// DeleteScheduleResponse is the body of a successful
// DELETE /v1/schedules/{name}. Both keys are always emitted.
type DeleteScheduleResponse struct {
	Name    string `json:"name"`
	Deleted bool   `json:"deleted"`
}

DeleteScheduleResponse is the body of a successful DELETE /v1/schedules/{name}.

type DreamExecutor

// DreamExecutor is an opaque executor built by NewDreamExecutor; it exposes
// JobTypes and RunJob. RunJob requires a non-nil ctx.
type DreamExecutor struct {
	// contains filtered or unexported fields
}

func NewDreamExecutor

func NewDreamExecutor(opts DreamExecutorOptions) (*DreamExecutor, error)

func (*DreamExecutor) JobTypes

func (e *DreamExecutor) JobTypes() []JobType

func (*DreamExecutor) RunJob

RunJob requires a non-nil ctx; callers passing nil will panic on first use.

type DreamExecutorOptions

// DreamExecutorOptions configures NewDreamExecutor.
// NOTE(review): which fields are required and which have defaults is not
// visible here — confirm against NewDreamExecutor.
type DreamExecutorOptions struct {
	Cwd     string
	RunLoop DreamRunLoopFunc
	// Now is an injectable time source.
	Now     func() time.Time
}

type DreamMode

// DreamMode is a string enum; ValidateDreamMode checks values of this type.
type DreamMode string
// Declared DreamMode values. Note the hyphen in "one-shot".
const (
	DreamModeDaemon  DreamMode = "daemon"
	DreamModeOneShot DreamMode = "one-shot"
)

type DreamRunJobSpec

// DreamRunJobSpec is the JSON-tagged spec for a dream run job. goal,
// max_iterations and execution_timeout are omitted from the encoding when
// empty/zero; the other keys are always emitted.
type DreamRunJobSpec struct {
	SchemaVersion    int       `json:"schema_version"`
	JobType          JobType   `json:"job_type"`
	DreamRunID       string    `json:"dream_run_id"`
	Goal             string    `json:"goal,omitempty"`
	Mode             DreamMode `json:"mode"`
	OutputDir        string    `json:"output_dir"`
	MaxIterations    int       `json:"max_iterations,omitempty"`
	// NOTE(review): ExecutionTimeout is a string, presumably in
	// time.ParseDuration syntax — confirm against Validate.
	ExecutionTimeout string    `json:"execution_timeout,omitempty"`
}

func DreamRunJobSpecFromPayload

func DreamRunJobSpecFromPayload(payload map[string]any) (DreamRunJobSpec, error)

func NewDreamRunJobSpec

func NewDreamRunJobSpec(dreamRunID, outputDir string) DreamRunJobSpec

func (DreamRunJobSpec) ToJobSpec

func (spec DreamRunJobSpec) ToJobSpec(jobID string) (JobSpec, error)

func (DreamRunJobSpec) Validate

func (spec DreamRunJobSpec) Validate() error

type DreamRunLoopOptions

// DreamRunLoopOptions carries the inputs for a dream run loop. It has no JSON
// tags.
// NOTE(review): presumably the argument passed to DreamRunLoopFunc — confirm.
type DreamRunLoopOptions struct {
	Cwd           string
	OutputDir     string
	RunID         string
	MaxIterations int
	WarnOnly      bool
	LogWriter     io.Writer
}

type DreamRunLoopResult

// DreamRunLoopResult is the JSON-tagged outcome of a dream run loop.
// iteration_count and budget_exhausted are always emitted; raw is omitted when
// it holds a nil interface value.
type DreamRunLoopResult struct {
	Raw             any  `json:"raw,omitempty"`
	IterationCount  int  `json:"iteration_count"`
	BudgetExhausted bool `json:"budget_exhausted"`
}

type DreamRunsProjection

// DreamRunsProjection wraps a list of JobProjection values under the "runs"
// key. The key has no omitempty, so a nil slice encodes as JSON null.
type DreamRunsProjection struct {
	Runs []JobProjection `json:"runs"`
}

type DreamStage

// DreamStage is a string enum; ValidateDreamStage checks values of this type.
type DreamStage string
// Declared DreamStage values.
// NOTE(review): the declaration order (ingest, reduce, measure, commit,
// report) presumably matches execution order — confirm.
const (
	DreamStageIngest  DreamStage = "ingest"
	DreamStageReduce  DreamStage = "reduce"
	DreamStageMeasure DreamStage = "measure"
	DreamStageCommit  DreamStage = "commit"
	DreamStageReport  DreamStage = "report"
)

type DreamStageEntry

// DreamStageEntry is one element of DreamStageManifest.Stages. stage and
// required are always emitted; job_id and iteration_id are omitted from the
// encoding when empty.
type DreamStageEntry struct {
	Stage       DreamStage `json:"stage"`
	JobID       string     `json:"job_id,omitempty"`
	IterationID string     `json:"iteration_id,omitempty"`
	Required    bool       `json:"required"`
}

type DreamStageJobSpec

// DreamStageJobSpec is the JSON-tagged spec for a single dream stage job.
// iteration_id, iteration, checkpoint_dir and parent_job_id are omitted from
// the encoding when empty/zero; the other keys are always emitted.
type DreamStageJobSpec struct {
	SchemaVersion int        `json:"schema_version"`
	JobType       JobType    `json:"job_type"`
	DreamRunID    string     `json:"dream_run_id"`
	IterationID   string     `json:"iteration_id,omitempty"`
	// Because of omitempty, an Iteration of 0 is indistinguishable from an
	// absent value on the wire.
	Iteration     int        `json:"iteration,omitempty"`
	Stage         DreamStage `json:"stage"`
	Mode          DreamMode  `json:"mode"`
	OutputDir     string     `json:"output_dir"`
	CheckpointDir string     `json:"checkpoint_dir,omitempty"`
	ParentJobID   string     `json:"parent_job_id,omitempty"`
}

func DreamStageJobSpecFromPayload

func DreamStageJobSpecFromPayload(payload map[string]any) (DreamStageJobSpec, error)

func NewDreamStageJobSpec

func NewDreamStageJobSpec(dreamRunID, outputDir string, stage DreamStage) DreamStageJobSpec

func (DreamStageJobSpec) ToJobSpec

func (spec DreamStageJobSpec) ToJobSpec(jobID string) (JobSpec, error)

func (DreamStageJobSpec) Validate

func (spec DreamStageJobSpec) Validate() error

type DreamStageManifest

// DreamStageManifest lists the stages of one dream run. metadata is omitted
// from the encoding when empty; the other keys are always emitted.
// DefaultDreamStageManifest builds one from a run ID and output directory.
type DreamStageManifest struct {
	SchemaVersion int               `json:"schema_version"`
	DreamRunID    string            `json:"dream_run_id"`
	Mode          DreamMode         `json:"mode"`
	OutputDir     string            `json:"output_dir"`
	Stages        []DreamStageEntry `json:"stages"`
	Metadata      map[string]string `json:"metadata,omitempty"`
}

func DefaultDreamStageManifest

func DefaultDreamStageManifest(dreamRunID, outputDir string) DreamStageManifest

func (DreamStageManifest) Validate

func (manifest DreamStageManifest) Validate() error

type EventType

// EventType is the string enum of ledger event types; ValidateEventType checks
// values of this type.
type EventType string
// Schedule lifecycle events, using the "schedule." prefix.
const (
	EventScheduleCreated EventType = "schedule.created"
	EventScheduleFired   EventType = "schedule.fired"
	EventScheduleSkipped EventType = "schedule.skipped"
	EventScheduleDeleted EventType = "schedule.deleted"
)

Schedule event types are additive event-type vocabulary (LedgerSchemaVersion stays at 1). Older daemon binaries replaying ledgers with these events must skip-and-log instead of erroring — see projections.go's reducer for the forward-compat contract (pre-mortem amendment B3).

// Core, factory and agent-update event types. Values are namespaced by a
// dotted prefix: "job.", "projection.", "factory." and "agent_update.".
const (
	// Job lifecycle and projection maintenance events.
	EventJobAccepted           EventType = "job.accepted"
	EventJobClaimed            EventType = "job.claimed"
	EventJobHeartbeat          EventType = "job.heartbeat"
	EventJobLeaseExpired       EventType = "job.lease_expired"
	EventJobCompleted          EventType = "job.completed"
	EventJobFailed             EventType = "job.failed"
	EventJobCancelled          EventType = "job.cancelled"
	EventProjectionMarkedStale EventType = "projection.marked_stale"
	EventProjectionRebuilt     EventType = "projection.rebuilt"

	// Factory events.
	EventFactoryAdmissionDecided    EventType = "factory.admission_decided"
	EventFactoryJobSubmitted        EventType = "factory.job_submitted"
	EventFactoryJobClaimed          EventType = "factory.job_claimed"
	EventFactoryJobStarted          EventType = "factory.job_started"
	EventFactoryRoutingDecided      EventType = "factory.routing_decided"
	EventFactorySlotAllocated       EventType = "factory.slot_allocated"
	EventFactoryWorktreeAllocated   EventType = "factory.worktree_allocated"
	EventFactoryValidationStarted   EventType = "factory.validation_started"
	EventFactoryValidationCompleted EventType = "factory.validation_completed"
	EventFactoryMergeDecision       EventType = "factory.merge_decision"
	EventFactoryJobTerminal         EventType = "factory.job_terminal"
	EventFactoryYieldObservation    EventType = "factory.yield_observation"

	// Agent-update events: phase-boundary messages emitted by daemon-mode RPI
	// runners (soc-y0ct). Schema: schemas/agent-update.schema.json.
	EventAgentUpdatePhaseStart       EventType = "agent_update.phase_start"
	EventAgentUpdatePhaseComplete    EventType = "agent_update.phase_complete"
	EventAgentUpdateCriterionVerdict EventType = "agent_update.criterion_verdict"
	EventAgentUpdatePhaseHandoff     EventType = "agent_update.phase_handoff"
)

type FactoryAdmissionDecision

// FactoryAdmissionDecision is the JSON-tagged record of one admission
// evaluation. child_job_id and artifact_refs are omitted from the encoding
// when empty; every other key is always emitted.
type FactoryAdmissionDecision struct {
	SchemaVersion int                     `json:"schema_version"`
	WorkOrderID   string                  `json:"work_order_id"`
	RunID         string                  `json:"run_id"`
	EvaluatedAt   string                  `json:"evaluated_at"`
	Allowed       bool                    `json:"allowed"`
	// Reasons has no omitempty, so a nil slice encodes as JSON null.
	// NOTE(review): presumably holds FactoryAdmissionReason* codes — confirm.
	Reasons       []string                `json:"reasons"`
	LandingPolicy FactoryLandingPolicy    `json:"landing_policy"`
	DigestPolicy  FactoryDigestPolicy     `json:"digest_policy"`
	ChildJobID    string                  `json:"child_job_id,omitempty"`
	// NOTE(review): here ArtifactRefs maps to plain strings, whereas
	// CompleteJobInput.ArtifactRefs maps to ArtifactRef values — confirm the
	// naming is intentional.
	ArtifactRefs  map[string]string       `json:"artifact_refs,omitempty"`
	Evidence      FactoryDecisionEvidence `json:"evidence"`
}

func (FactoryAdmissionDecision) Validate

func (decision FactoryAdmissionDecision) Validate() error

type FactoryAdmissionEvaluator

// FactoryAdmissionEvaluator bundles the two dependencies of admission
// evaluation: a time source and an evidence provider. Its EvaluateAdmission
// and EvaluateLocalPilot methods use value receivers.
type FactoryAdmissionEvaluator struct {
	// Clock is an injectable time source.
	Clock    func() time.Time
	// NOTE(review): given FactoryAdmissionReasonEvidenceProviderMissing, a nil
	// Evidence is presumably reported as a reason rather than dereferenced —
	// confirm.
	Evidence FactoryAdmissionEvidenceProvider
}

func (FactoryAdmissionEvaluator) EvaluateAdmission

func (FactoryAdmissionEvaluator) EvaluateLocalPilot

type FactoryAdmissionEvidenceProvider

// FactoryAdmissionEvidenceProvider supplies the three kinds of evidence an
// admission evaluation consumes. Every method takes a context and may fail.
type FactoryAdmissionEvidenceProvider interface {
	// RepoState reports the repository state.
	RepoState(context.Context) (FactoryRepoState, error)
	// OpenPRBlockers reports a PR blocker matrix for the given strings.
	// NOTE(review): the []string is presumably the work order's target paths —
	// confirm.
	OpenPRBlockers(context.Context, []string) (FactoryPRBlockerMatrix, error)
	// MainCIBaseline reports CI baseline evidence.
	MainCIBaseline(context.Context) (FactoryCIBaselineEvidence, error)
}

type FactoryAdmissionEvidenceProviderFactory

// FactoryAdmissionEvidenceProviderFactory builds an evidence provider for a
// given work order.
type FactoryAdmissionEvidenceProviderFactory func(FactoryWorkOrder) FactoryAdmissionEvidenceProvider

type FactoryAdmissionExecutor

// FactoryAdmissionExecutor is an opaque executor exposing JobTypes and RunJob
// on a pointer receiver.
type FactoryAdmissionExecutor struct {
	// contains filtered or unexported fields
}

func (*FactoryAdmissionExecutor) JobTypes

func (e *FactoryAdmissionExecutor) JobTypes() []JobType

func (*FactoryAdmissionExecutor) RunJob

type FactoryAdmissionExecutorOptions

// FactoryAdmissionExecutorOptions configures a FactoryAdmissionExecutor.
// NOTE(review): required fields and defaults are not visible here — confirm
// against the constructor.
type FactoryAdmissionExecutorOptions struct {
	Store                   *Store
	Root                    string
	// Clock is an injectable time source.
	Clock                   func() time.Time
	EvidenceProviderFactory FactoryAdmissionEvidenceProviderFactory
	// NOTE(review): presumably when EnableRPIHandoff is false an rpi-handoff
	// request yields FactoryAdmissionReasonRPIHandoffUnavailable — confirm.
	EnableRPIHandoff        bool
	Actor                   string
}

type FactoryAdmissionJobSpec

// FactoryAdmissionJobSpec is the JSON-tagged spec for a factory admission job.
type FactoryAdmissionJobSpec struct {
	SchemaVersion int                  `json:"schema_version"`
	JobType       JobType              `json:"job_type"`
	RunID         string               `json:"run_id"`
	Mode          FactoryAdmissionMode `json:"mode"`
	WorkOrder     FactoryWorkOrder     `json:"work_order"`
	// NOTE(review): Handoff is a struct value, and encoding/json's omitempty
	// never treats a struct as empty, so "handoff" is always emitted (as {}
	// for the zero value). If omission is intended, this needs a pointer type
	// or the omitzero option (Go 1.24+); either changes the wire format.
	Handoff       FactoryHandoff       `json:"handoff,omitempty"`
}

func FactoryAdmissionJobSpecFromPayload

func FactoryAdmissionJobSpecFromPayload(payload map[string]any) (FactoryAdmissionJobSpec, error)

func NewFactoryAdmissionJobSpec

func NewFactoryAdmissionJobSpec(runID string, workOrder FactoryWorkOrder) FactoryAdmissionJobSpec

func (FactoryAdmissionJobSpec) ToJobSpec

func (spec FactoryAdmissionJobSpec) ToJobSpec(jobID string) (JobSpec, error)

func (FactoryAdmissionJobSpec) Validate

func (spec FactoryAdmissionJobSpec) Validate() error

type FactoryAdmissionMode

// FactoryAdmissionMode is a string enum; ValidateFactoryAdmissionMode checks
// values of this type.
type FactoryAdmissionMode string
// Declared FactoryAdmissionMode values. Note that they are hyphenated, unlike
// the snake_case admission reason codes.
const (
	FactoryAdmissionModeAdmissionOnly FactoryAdmissionMode = "admission-only"
	FactoryAdmissionModeRPIHandoff    FactoryAdmissionMode = "rpi-handoff"
)

type FactoryAdmissionProjection

// FactoryAdmissionProjection is the projected view of an admission decision
// for one job. job_id, work_order_id, allowed and evidence are always emitted;
// the remaining keys are omitted from the encoding when empty.
type FactoryAdmissionProjection struct {
	JobID         string                  `json:"job_id"`
	RunID         string                  `json:"run_id,omitempty"`
	WorkOrderID   string                  `json:"work_order_id"`
	Allowed       bool                    `json:"allowed"`
	Reasons       []string                `json:"reasons,omitempty"`
	LandingPolicy FactoryLandingPolicy    `json:"landing_policy,omitempty"`
	DigestPolicy  FactoryDigestPolicy     `json:"digest_policy,omitempty"`
	ChildJobID    string                  `json:"child_job_id,omitempty"`
	// The key here is "artifacts", whereas FactoryAdmissionDecision uses
	// "artifact_refs" for its map[string]string field.
	Artifacts     map[string]string       `json:"artifacts,omitempty"`
	Evidence      FactoryDecisionEvidence `json:"evidence"`
	DecidedAt     string                  `json:"decided_at,omitempty"`
	LastEventID   string                  `json:"last_event_id,omitempty"`
}

type FactoryCIBaselineEvidence

// FactoryCIBaselineEvidence is the return value of
// FactoryAdmissionEvidenceProvider.MainCIBaseline. It has no JSON tags.
type FactoryCIBaselineEvidence struct {
	// NOTE(review): presumably Known=false maps to
	// FactoryAdmissionReasonMainCIUnknown and Baseline is then meaningless —
	// confirm.
	Known    bool
	Baseline FactoryMainCIBaseline
}

type FactoryCIStatus

// FactoryCIStatus is a string enum; ValidateFactoryCIStatus checks values of
// this type.
type FactoryCIStatus string
// Declared FactoryCIStatus values.
const (
	FactoryCIStatusGreen   FactoryCIStatus = "green"
	FactoryCIStatusRed     FactoryCIStatus = "red"
	FactoryCIStatusUnknown FactoryCIStatus = "unknown"
)

type FactoryDecisionEvidence

// FactoryDecisionEvidence summarises the evidence attached to an admission
// decision. stale is omitted from the encoding when false; the other keys are
// always emitted.
type FactoryDecisionEvidence struct {
	BaseSHA            string          `json:"base_sha"`
	OpenPRBlockerCount int             `json:"open_pr_blocker_count"`
	MainCIStatus       FactoryCIStatus `json:"main_ci_status"`
	Stale              bool            `json:"stale,omitempty"`
}

func (FactoryDecisionEvidence) Validate

func (evidence FactoryDecisionEvidence) Validate() error

type FactoryDigestPolicy

// FactoryDigestPolicy is a string enum; ValidateFactoryDigestPolicy checks
// values of this type.
type FactoryDigestPolicy string
// "required" is the only FactoryDigestPolicy value declared here.
const (
	FactoryDigestPolicyRequired FactoryDigestPolicy = "required"
)

type FactoryEventRef

// FactoryEventRef is a JSON-tagged reference to a ledger event with optional
// correlation IDs. event_id and event_type are always emitted; every other
// key is omitted from the encoding when empty.
type FactoryEventRef struct {
	EventID      string    `json:"event_id"`
	EventType    EventType `json:"event_type"`
	JobID        string    `json:"job_id,omitempty"`
	RunID        string    `json:"run_id,omitempty"`
	TaskID       string    `json:"task_id,omitempty"`
	SlotID       string    `json:"slot_id,omitempty"`
	WorktreeID   string    `json:"worktree_id,omitempty"`
	ValidationID string    `json:"validation_id,omitempty"`
	OccurredAt   string    `json:"occurred_at,omitempty"`
}

type FactoryHandoff

// FactoryHandoff describes an optional handoff target. All three keys are
// omitted from the encoding when empty, so the zero value encodes as {}.
type FactoryHandoff struct {
	// NOTE(review): an empty Kind is distinct from FactoryHandoffNone ("none");
	// confirm how Validate treats the empty string.
	Kind                FactoryHandoffKind `json:"kind,omitempty"`
	ExecutionPacketPath string             `json:"execution_packet_path,omitempty"`
	EpicID              string             `json:"epic_id,omitempty"`
}

func (FactoryHandoff) Validate

func (handoff FactoryHandoff) Validate() error

type FactoryHandoffKind

// FactoryHandoffKind is a string enum; ValidateFactoryHandoffKind checks
// values of this type.
type FactoryHandoffKind string
// Declared FactoryHandoffKind values. The RPI value is "rpi.run", not "rpi".
const (
	FactoryHandoffNone FactoryHandoffKind = "none"
	FactoryHandoffRPI  FactoryHandoffKind = "rpi.run"
)

type FactoryJobProjection

// FactoryJobProjection is the projected view of one factory job. Only job_id
// and status are always emitted; the other keys carry omitempty.
type FactoryJobProjection struct {
	JobID       string           `json:"job_id"`
	RunID       string           `json:"run_id,omitempty"`
	TaskID      string           `json:"task_id,omitempty"`
	RequestedBy string           `json:"requested_by,omitempty"`
	Objective   string           `json:"objective,omitempty"`
	LaneID      string           `json:"lane_id,omitempty"`
	Provider    string           `json:"provider,omitempty"`
	Runtime     string           `json:"runtime,omitempty"`
	Model       string           `json:"model,omitempty"`
	// NOTE(review): RoutingAuthority's definition is not visible here. If it is
	// a struct type, omitempty has no effect and "authority" is always emitted
	// — confirm.
	Authority   RoutingAuthority `json:"authority,omitempty"`
	Status      FactoryJobStatus `json:"status"`
	SubmittedAt string           `json:"submitted_at,omitempty"`
	UpdatedAt   string           `json:"updated_at,omitempty"`
	LastEventID string           `json:"last_event_id,omitempty"`
}

type FactoryJobStatus

// FactoryJobStatus is the string enum used by FactoryJobProjection.Status.
// NOTE(review): no ValidateFactoryJobStatus appears in this package's function
// list, unlike the other factory enums — confirm whether that is intentional.
type FactoryJobStatus string
// Declared FactoryJobStatus values.
const (
	FactoryJobStatusSubmitted           FactoryJobStatus = "submitted"
	FactoryJobStatusAdmitted            FactoryJobStatus = "admitted"
	FactoryJobStatusAdmissionBlocked    FactoryJobStatus = "admission_blocked"
	FactoryJobStatusClaimed             FactoryJobStatus = "claimed"
	FactoryJobStatusStarted             FactoryJobStatus = "started"
	FactoryJobStatusRouted              FactoryJobStatus = "routed"
	FactoryJobStatusAllocated           FactoryJobStatus = "allocated"
	FactoryJobStatusValidating          FactoryJobStatus = "validating"
	FactoryJobStatusValidated           FactoryJobStatus = "validated"
	FactoryJobStatusValidationFailed    FactoryJobStatus = "validation_failed"
	FactoryJobStatusAwaitingManualMerge FactoryJobStatus = "awaiting_manual_merge"
	FactoryJobStatusTerminal            FactoryJobStatus = "terminal"
	FactoryJobStatusRetainedFailed      FactoryJobStatus = "retained_failed"
)

type FactoryLandingPolicy

// FactoryLandingPolicy is a string-typed enumeration used by
// FactoryWorkOrder.LandingPolicy. Two values are declared: "off" and
// "manual_pr".
type FactoryLandingPolicy string
const (
	FactoryLandingPolicyOff      FactoryLandingPolicy = "off"
	FactoryLandingPolicyManualPR FactoryLandingPolicy = "manual_pr"
)

type FactoryLocalPilotJobSpec

// FactoryLocalPilotJobSpec is the JSON payload shape for a factory local-pilot
// job. It embeds the full FactoryWorkOrder under "work_order". The package
// exposes NewFactoryLocalPilotJobSpec, FactoryLocalPilotJobSpecFromPayload,
// ToJobSpec and Validate for building, parsing, converting and checking it.
//
// NOTE(review): Handoff is tagged omitempty. encoding/json never treats a
// struct value as empty, so if FactoryHandoff is a struct type the option has
// no effect and "handoff" is always emitted — confirm against the
// FactoryHandoff declaration.
type FactoryLocalPilotJobSpec struct {
	SchemaVersion int                  `json:"schema_version"`
	JobType       JobType              `json:"job_type"`
	RunID         string               `json:"run_id"`
	Mode          FactoryAdmissionMode `json:"mode"`
	WorkOrder     FactoryWorkOrder     `json:"work_order"`
	Handoff       FactoryHandoff       `json:"handoff,omitempty"`
	MaxCycles     int                  `json:"max_cycles,omitempty"`
}

func FactoryLocalPilotJobSpecFromPayload

func FactoryLocalPilotJobSpecFromPayload(payload map[string]any) (FactoryLocalPilotJobSpec, error)

func NewFactoryLocalPilotJobSpec

func NewFactoryLocalPilotJobSpec(runID string, workOrder FactoryWorkOrder) FactoryLocalPilotJobSpec

func (FactoryLocalPilotJobSpec) ToJobSpec

func (spec FactoryLocalPilotJobSpec) ToJobSpec(jobID string) (JobSpec, error)

func (FactoryLocalPilotJobSpec) Validate

func (spec FactoryLocalPilotJobSpec) Validate() error

type FactoryMainCIBaseline

// FactoryMainCIBaseline is the JSON record stored in
// FactoryWorkOrder.MainCIBaseline. Status and CheckedAt are always encoded;
// RunID and FailedJobs are omitted when empty. CheckedAt is a string whose
// format is not specified by this declaration.
type FactoryMainCIBaseline struct {
	Status     FactoryCIStatus `json:"status"`
	RunID      string          `json:"run_id,omitempty"`
	CheckedAt  string          `json:"checked_at"`
	FailedJobs []string        `json:"failed_jobs,omitempty"`
}

func (FactoryMainCIBaseline) Validate

func (baseline FactoryMainCIBaseline) Validate() error

type FactoryMergeDecision

// FactoryMergeDecision is a string-typed enumeration used as the Decision
// field of FactoryMergeDecisionProjection. The constants below are the
// declared values; their strings are the wire representation.
type FactoryMergeDecision string
const (
	FactoryMergeDecisionNotRequested  FactoryMergeDecision = "not_requested"
	FactoryMergeDecisionManualPending FactoryMergeDecision = "manual_pending"
	FactoryMergeDecisionManualMerged  FactoryMergeDecision = "manual_merged"
	FactoryMergeDecisionRejected      FactoryMergeDecision = "rejected"
	FactoryMergeDecisionAbandoned     FactoryMergeDecision = "abandoned"
)

type FactoryMergeDecisionProjection

// FactoryMergeDecisionProjection is the JSON record for a merge decision on a
// factory job. JobID and Decision are always encoded; all other fields are
// omitted when empty. FactoryStatusProjection carries slices of this type in
// both MergeDecisions and PendingManualMerges.
type FactoryMergeDecisionProjection struct {
	JobID         string               `json:"job_id"`
	RunID         string               `json:"run_id,omitempty"`
	SlotID        string               `json:"slot_id,omitempty"`
	Decision      FactoryMergeDecision `json:"decision"`
	Decider       string               `json:"decider,omitempty"`
	Reason        string               `json:"reason,omitempty"`
	Conflicts     []string             `json:"conflicts,omitempty"`
	ManualCommand string               `json:"manual_command,omitempty"`
	DecidedAt     string               `json:"decided_at,omitempty"`
	LastEventID   string               `json:"last_event_id,omitempty"`
}

type FactoryModelLaneProjection

// FactoryModelLaneProjection is the JSON record for one model lane, keyed by
// LaneID (the only field that is always encoded). It differs from
// FactoryQueueLaneProjection by carrying LastReason instead of QueueDepth.
type FactoryModelLaneProjection struct {
	LaneID         string           `json:"lane_id"`
	Provider       string           `json:"provider,omitempty"`
	Runtime        string           `json:"runtime,omitempty"`
	Model          string           `json:"model,omitempty"`
	Authority      RoutingAuthority `json:"authority,omitempty"`
	LastReason     string           `json:"last_reason,omitempty"`
	DisabledReason string           `json:"disabled_reason,omitempty"`
	LastEventID    string           `json:"last_event_id,omitempty"`
}

type FactoryOpenPRBlocker

// FactoryOpenPRBlocker is the JSON record for one open pull request listed in
// FactoryWorkOrder.OpenPRBlockers: its number, head ref and file list. No
// field is tagged omitempty, so a nil Files slice encodes as JSON null.
type FactoryOpenPRBlocker struct {
	PRNumber int      `json:"pr_number"`
	HeadRef  string   `json:"head_ref"`
	Files    []string `json:"files"`
}

func (FactoryOpenPRBlocker) Validate

func (blocker FactoryOpenPRBlocker) Validate() error

type FactoryPRBlockerMatrix

// FactoryPRBlockerMatrix pairs a list of open-PR blockers with a Known flag.
// It has no JSON tags. Presumably Known distinguishes "no blockers found"
// from "blocker evidence unavailable" — TODO confirm against the admission
// code that consumes it.
type FactoryPRBlockerMatrix struct {
	Known    bool
	Blockers []FactoryOpenPRBlocker
}

type FactoryPointer

// FactoryPointer is the JSON record used for the Logs, Artifacts, Transcripts
// and Diffs lists of FactoryStatusProjection. Kind is the only field that is
// always encoded; the identifying and locating fields are omitted when empty.
type FactoryPointer struct {
	JobID   string `json:"job_id,omitempty"`
	RunID   string `json:"run_id,omitempty"`
	SlotID  string `json:"slot_id,omitempty"`
	Kind    string `json:"kind"`
	Name    string `json:"name,omitempty"`
	Path    string `json:"path,omitempty"`
	Ref     string `json:"ref,omitempty"`
	EventID string `json:"event_id,omitempty"`
}

type FactoryQueueLaneProjection

// FactoryQueueLaneProjection is the JSON record for one queue lane. LaneID
// and QueueDepth are always encoded (a depth of 0 is emitted, not dropped);
// the remaining fields are omitted when empty.
type FactoryQueueLaneProjection struct {
	LaneID         string           `json:"lane_id"`
	Provider       string           `json:"provider,omitempty"`
	Runtime        string           `json:"runtime,omitempty"`
	Model          string           `json:"model,omitempty"`
	Authority      RoutingAuthority `json:"authority,omitempty"`
	QueueDepth     int              `json:"queue_depth"`
	DisabledReason string           `json:"disabled_reason,omitempty"`
	LastEventID    string           `json:"last_event_id,omitempty"`
}

type FactoryRepoState

// FactoryRepoState holds a repository head SHA, a dirty flag and a list of
// tracked agents. It has no JSON tags. Presumably it is the evidence behind
// the base_sha_mismatch, dirty_worktree and tracked_agents admission reasons
// declared in this package — TODO confirm.
type FactoryRepoState struct {
	HeadSHA       string
	Dirty         bool
	TrackedAgents []string
}

type FactoryRoutingDecisionProjection

// FactoryRoutingDecisionProjection is the JSON record for a routing decision
// on a job. JobID is the only field that is always encoded.
// FactoryStatusProjection.LastRoutingDecision points at a value of this type.
type FactoryRoutingDecisionProjection struct {
	JobID          string           `json:"job_id"`
	RunID          string           `json:"run_id,omitempty"`
	TaskID         string           `json:"task_id,omitempty"`
	LaneID         string           `json:"lane_id,omitempty"`
	Provider       string           `json:"provider,omitempty"`
	Runtime        string           `json:"runtime,omitempty"`
	Model          string           `json:"model,omitempty"`
	Authority      RoutingAuthority `json:"authority,omitempty"`
	Reason         string           `json:"reason,omitempty"`
	DisabledReason string           `json:"disabled_reason,omitempty"`
	DecidedAt      string           `json:"decided_at,omitempty"`
	LastEventID    string           `json:"last_event_id,omitempty"`
}

type FactorySlotProjection

// FactorySlotProjection is the JSON record for one factory slot. SlotID,
// JobID and Status are always encoded; every other field is omitted when
// empty. Because LeaseEpoch and MaxConcurrencySnapshot are tagged omitempty,
// a value of 0 is indistinguishable from "absent" in the encoded form.
type FactorySlotProjection struct {
	SlotID                 string            `json:"slot_id"`
	WorkerID               string            `json:"worker_id,omitempty"`
	RunID                  string            `json:"run_id,omitempty"`
	JobID                  string            `json:"job_id"`
	TaskID                 string            `json:"task_id,omitempty"`
	LaneID                 string            `json:"lane_id,omitempty"`
	Provider               string            `json:"provider,omitempty"`
	Runtime                string            `json:"runtime,omitempty"`
	Model                  string            `json:"model,omitempty"`
	Authority              RoutingAuthority  `json:"authority,omitempty"`
	Branch                 string            `json:"branch,omitempty"`
	WorktreeID             string            `json:"worktree_id,omitempty"`
	WorktreePath           string            `json:"worktree_path,omitempty"`
	ResourcePolicy         map[string]any    `json:"resource_policy,omitempty"`
	LeaseEpoch             int               `json:"lease_epoch,omitempty"`
	MaxConcurrencySnapshot int               `json:"max_concurrency_snapshot,omitempty"`
	Status                 FactorySlotStatus `json:"status"`
	AllocatedAt            string            `json:"allocated_at,omitempty"`
	UpdatedAt              string            `json:"updated_at,omitempty"`
	LastEventID            string            `json:"last_event_id,omitempty"`
}

type FactorySlotStatus

// FactorySlotStatus is a string-typed enumeration of slot states. It is the
// Status type of FactorySlotProjection, FactoryWorkerProjection and
// FactoryWorktreeProjection. The constants below are the declared values;
// their strings are the wire representation.
type FactorySlotStatus string
const (
	FactorySlotStatusIdle                FactorySlotStatus = "idle"
	FactorySlotStatusAllocated           FactorySlotStatus = "allocated"
	FactorySlotStatusRunning             FactorySlotStatus = "running"
	FactorySlotStatusBlockedValidation   FactorySlotStatus = "blocked_validation"
	FactorySlotStatusAwaitingManualMerge FactorySlotStatus = "awaiting_manual_merge"
	FactorySlotStatusTerminal            FactorySlotStatus = "terminal"
	FactorySlotStatusRetainedFailed      FactorySlotStatus = "retained_failed"
)

type FactoryStatusProjection

// FactoryStatusProjection is the aggregate JSON document that bundles the
// per-entity factory projections (admissions, jobs, workers, slots, lanes,
// validations, worktrees, merge decisions, terminal jobs, recent events and
// log/artifact/transcript/diff pointers).
//
// Every field is tagged omitempty, so a zero-value FactoryStatusProjection
// encodes as "{}". Some element types appear twice (Validations and
// BlockedValidations, Worktrees and RetainedFailedWorktrees, MergeDecisions
// and PendingManualMerges); presumably the second of each pair is a filtered
// view of the first — TODO confirm against the code that populates it.
type FactoryStatusProjection struct {
	Admissions              []FactoryAdmissionProjection      `json:"admissions,omitempty"`
	Jobs                    []FactoryJobProjection            `json:"jobs,omitempty"`
	ActiveWorkers           []FactoryWorkerProjection         `json:"active_workers,omitempty"`
	Slots                   []FactorySlotProjection           `json:"slots,omitempty"`
	QueueLanes              []FactoryQueueLaneProjection      `json:"queue_lanes,omitempty"`
	ModelLanes              []FactoryModelLaneProjection      `json:"model_lanes,omitempty"`
	Validations             []FactoryValidationProjection     `json:"validations,omitempty"`
	BlockedValidations      []FactoryValidationProjection     `json:"blocked_validations,omitempty"`
	Worktrees               []FactoryWorktreeProjection       `json:"worktrees,omitempty"`
	RetainedFailedWorktrees []FactoryWorktreeProjection       `json:"retained_failed_worktrees,omitempty"`
	MergeDecisions          []FactoryMergeDecisionProjection  `json:"merge_decisions,omitempty"`
	PendingManualMerges     []FactoryMergeDecisionProjection  `json:"pending_manual_merges,omitempty"`
	TerminalJobs            []FactoryTerminalProjection       `json:"terminal_jobs,omitempty"`
	RecentEvents            []FactoryEventRef                 `json:"recent_events,omitempty"`
	Logs                    []FactoryPointer                  `json:"logs,omitempty"`
	Artifacts               []FactoryPointer                  `json:"artifacts,omitempty"`
	Transcripts             []FactoryPointer                  `json:"transcripts,omitempty"`
	Diffs                   []FactoryPointer                  `json:"diffs,omitempty"`
	LastRoutingDecision     *FactoryRoutingDecisionProjection `json:"last_routing_decision,omitempty"`
}

type FactoryTarget

// FactoryTarget is the JSON record stored in FactoryWorkOrder.Target: a typed
// target (see FactoryTargetType), its ID and a summary. All three fields are
// always encoded.
type FactoryTarget struct {
	Type    FactoryTargetType `json:"type"`
	ID      string            `json:"id"`
	Summary string            `json:"summary"`
}

func (FactoryTarget) Validate

func (target FactoryTarget) Validate() error

type FactoryTargetType

// FactoryTargetType is a string-typed enumeration used as FactoryTarget.Type.
// Three values are declared: "goal", "bead" and "execution_packet".
type FactoryTargetType string
const (
	FactoryTargetGoal            FactoryTargetType = "goal"
	FactoryTargetBead            FactoryTargetType = "bead"
	FactoryTargetExecutionPacket FactoryTargetType = "execution_packet"
)

type FactoryTerminalProjection

// FactoryTerminalProjection is the JSON record for an entry in
// FactoryStatusProjection.TerminalJobs. JobID and Status are always encoded;
// the other fields are omitted when empty.
//
// Note that Status is the generic JobStatus type, not FactoryJobStatus.
type FactoryTerminalProjection struct {
	JobID            string                 `json:"job_id"`
	RunID            string                 `json:"run_id,omitempty"`
	SlotID           string                 `json:"slot_id,omitempty"`
	Status           JobStatus              `json:"status"`
	ArtifactRefs     map[string]ArtifactRef `json:"artifact_refs,omitempty"`
	TranscriptRef    string                 `json:"transcript_ref,omitempty"`
	RetainedWorktree bool                   `json:"retained_worktree,omitempty"`
	OccurredAt       string                 `json:"occurred_at,omitempty"`
	LastEventID      string                 `json:"last_event_id,omitempty"`
}

type FactoryUnknownEvidencePolicy

// FactoryUnknownEvidencePolicy is a string-typed enumeration used by
// FactoryWorkOrder.UnknownEvidencePolicy. Two values are declared: "block"
// and "allow_non_mutating".
type FactoryUnknownEvidencePolicy string
const (
	FactoryUnknownEvidenceBlock            FactoryUnknownEvidencePolicy = "block"
	FactoryUnknownEvidenceAllowNonMutating FactoryUnknownEvidencePolicy = "allow_non_mutating"
)

type FactoryValidationProjection

// FactoryValidationProjection is the JSON record for one validation run.
// ValidationID, JobID and Status are always encoded; the remaining fields are
// omitted when empty. It carries both a plain string Artifacts map and a
// typed ArtifactRefs map. DurationMS is tagged omitempty, so a duration of 0
// is not emitted.
type FactoryValidationProjection struct {
	ValidationID string                  `json:"validation_id"`
	JobID        string                  `json:"job_id"`
	RunID        string                  `json:"run_id,omitempty"`
	SlotID       string                  `json:"slot_id,omitempty"`
	Level        string                  `json:"level,omitempty"`
	Commands     []string                `json:"commands,omitempty"`
	Status       FactoryValidationStatus `json:"status"`
	Artifacts    map[string]string       `json:"artifacts,omitempty"`
	ArtifactRefs map[string]ArtifactRef  `json:"artifact_refs,omitempty"`
	StartedAt    string                  `json:"started_at,omitempty"`
	CompletedAt  string                  `json:"completed_at,omitempty"`
	DurationMS   int                     `json:"duration_ms,omitempty"`
	LastEventID  string                  `json:"last_event_id,omitempty"`
}

type FactoryValidationStatus

// FactoryValidationStatus is a string-typed enumeration used as
// FactoryValidationProjection.Status. The constants below are the declared
// values; their strings are the wire representation.
type FactoryValidationStatus string
const (
	FactoryValidationStatusRunning   FactoryValidationStatus = "running"
	FactoryValidationStatusPassed    FactoryValidationStatus = "passed"
	FactoryValidationStatusFailed    FactoryValidationStatus = "failed"
	FactoryValidationStatusBlocked   FactoryValidationStatus = "blocked"
	FactoryValidationStatusCancelled FactoryValidationStatus = "cancelled"
)

type FactoryWorkOrder

// FactoryWorkOrder is the JSON work-order document carried by
// FactoryLocalPilotJobSpec and LocalFactoryAdmissionEvidenceProvider. It
// records the order's identity and validity window (GeneratedAt, ExpiresAt),
// the base commit, the target, the allowed files, the validation commands,
// the landing and digest policies, and the open-PR and main-CI evidence.
//
// Only UnknownEvidencePolicy is tagged omitempty; every other field is always
// encoded, so nil slices such as AllowedFiles encode as JSON null. GeneratedAt
// and ExpiresAt are strings whose format is not specified by this
// declaration. Validate (declared on this type) checks the document.
type FactoryWorkOrder struct {
	SchemaVersion         int                          `json:"schema_version"`
	WorkOrderID           string                       `json:"work_order_id"`
	GeneratedAt           string                       `json:"generated_at"`
	ExpiresAt             string                       `json:"expires_at"`
	BaseSHA               string                       `json:"base_sha"`
	Target                FactoryTarget                `json:"target"`
	AllowedFiles          []string                     `json:"allowed_files"`
	ValidationCommands    []string                     `json:"validation_commands"`
	LandingPolicy         FactoryLandingPolicy         `json:"landing_policy"`
	DigestPolicy          FactoryDigestPolicy          `json:"digest_policy"`
	OpenPRBlockers        []FactoryOpenPRBlocker       `json:"open_pr_blockers"`
	MainCIBaseline        FactoryMainCIBaseline        `json:"main_ci_baseline"`
	UnknownEvidencePolicy FactoryUnknownEvidencePolicy `json:"unknown_evidence_policy,omitempty"`
}

func (FactoryWorkOrder) Validate

func (work FactoryWorkOrder) Validate() error

type FactoryWorkerProjection

// FactoryWorkerProjection is the JSON record for an entry in
// FactoryStatusProjection.ActiveWorkers. SlotID, JobID and Status are always
// encoded; WorkerID and the remaining fields are omitted when empty. Status
// reuses the FactorySlotStatus enumeration.
type FactoryWorkerProjection struct {
	WorkerID     string            `json:"worker_id,omitempty"`
	SlotID       string            `json:"slot_id"`
	RunID        string            `json:"run_id,omitempty"`
	JobID        string            `json:"job_id"`
	TaskID       string            `json:"task_id,omitempty"`
	LaneID       string            `json:"lane_id,omitempty"`
	Provider     string            `json:"provider,omitempty"`
	Runtime      string            `json:"runtime,omitempty"`
	Model        string            `json:"model,omitempty"`
	Authority    RoutingAuthority  `json:"authority,omitempty"`
	Status       FactorySlotStatus `json:"status"`
	WorktreePath string            `json:"worktree_path,omitempty"`
	LastEventID  string            `json:"last_event_id,omitempty"`
}

type FactoryWorktreeProjection

// FactoryWorktreeProjection is the JSON record for one worktree. WorktreeID
// and JobID are always encoded; every other field, including Status, is
// omitted when empty. Status reuses the FactorySlotStatus enumeration.
// DirtyState, RetentionPolicy and MergeDisposition are free-form strings at
// this level; their allowed values are not declared here.
type FactoryWorktreeProjection struct {
	WorktreeID       string            `json:"worktree_id"`
	RunID            string            `json:"run_id,omitempty"`
	JobID            string            `json:"job_id"`
	SlotID           string            `json:"slot_id,omitempty"`
	BaseCommit       string            `json:"base_commit,omitempty"`
	Branch           string            `json:"branch,omitempty"`
	Path             string            `json:"path,omitempty"`
	CreatedAt        string            `json:"created_at,omitempty"`
	DirtyState       string            `json:"dirty_state,omitempty"`
	RetentionPolicy  string            `json:"retention_policy,omitempty"`
	MergeDisposition string            `json:"merge_disposition,omitempty"`
	Status           FactorySlotStatus `json:"status,omitempty"`
	LastEventID      string            `json:"last_event_id,omitempty"`
}

type FailJobInput

// FailJobInput is an untagged parameter struct describing a job failure
// report. It has the same fields as HeartbeatInput (job and request identity,
// claim token, lease epoch, actor, artifacts) plus the JobFailure being
// recorded.
type FailJobInput struct {
	JobID        string
	RequestID    RequestID
	ClaimToken   string
	LeaseEpoch   int
	Actor        string
	Failure      JobFailure
	Artifacts    map[string]string
	ArtifactRefs map[string]ArtifactRef
}

type FailureCode

// FailureCode is a string-typed enumeration used as JobFailure.Code. The
// constants below are the declared values; their strings are the wire
// representation.
//
// NOTE(review): FailureSessionLost encodes as "lost", without the "session_"
// prefix used by FailureSessionPending ("session_pending"). This may be
// deliberate for compatibility; the value is part of the wire format, so
// confirm with consumers before changing it.
type FailureCode string
const (
	FailureDaemonUnavailable         FailureCode = "daemon_unavailable"
	FailureProviderUnreachable       FailureCode = "provider_unreachable"
	FailureSessionPending            FailureCode = "session_pending"
	FailureSessionLost               FailureCode = "lost"
	FailureEventStreamUnavailable    FailureCode = "event_stream_unavailable"
	FailureTerminalWithoutTranscript FailureCode = "terminal_without_transcript"
	FailureRequestRejected           FailureCode = "request_rejected"
	FailureProjectionDegraded        FailureCode = "projection_degraded"
	FailureRetryExhausted            FailureCode = "retry_exhausted"
)

type GasCityClientAdapter

// GasCityClientAdapter wraps a *gascity.Client. Its methods (CityReadiness,
// CreateSession, GetSession, SessionTranscript, StreamCityEvents,
// SubmitSession) use value receivers, so copies of the adapter share the same
// underlying Client pointer.
type GasCityClientAdapter struct {
	Client *gascity.Client
}

func (GasCityClientAdapter) CityReadiness

func (a GasCityClientAdapter) CityReadiness(ctx context.Context, cityName string) (gascity.ReadinessResponse, error)

func (GasCityClientAdapter) CreateSession

func (GasCityClientAdapter) GetSession

func (GasCityClientAdapter) SessionTranscript

func (GasCityClientAdapter) StreamCityEvents

func (GasCityClientAdapter) SubmitSession

type GasCityRPIEventStream

// GasCityRPIEventStream is a pull-style stream of gascity event frames:
// NextEvent returns one frame (or an error) per call and Close releases the
// stream. How end-of-stream is signalled is not specified by this
// declaration.
type GasCityRPIEventStream interface {
	NextEvent() (gascity.EventStreamFrame, error)
	Close() error
}

type GasCityRPIPhaseExecutor

// GasCityRPIPhaseExecutor holds the configuration used by its ExecuteRPIPhase
// method: the GasCityRPIClient to call, the city and session-agent names, and
// a per-phase timeout. How a zero PhaseTimeout is treated is not visible from
// this declaration.
type GasCityRPIPhaseExecutor struct {
	Client           GasCityRPIClient
	CityName         string
	SessionAgentName string
	PhaseTimeout     time.Duration
}

func (GasCityRPIPhaseExecutor) ExecuteRPIPhase

type HeartbeatInput

// HeartbeatInput is an untagged parameter struct describing a job heartbeat:
// job and request identity, the claim token and lease epoch of the caller,
// the acting principal, and optional artifacts. FailJobInput has the same
// fields plus a Failure.
type HeartbeatInput struct {
	JobID        string
	RequestID    RequestID
	ClaimToken   string
	LeaseEpoch   int
	Actor        string
	Artifacts    map[string]string
	ArtifactRefs map[string]ArtifactRef
}

type JobExecutionResult

// JobExecutionResult is the value returned by JobExecutor.RunJob. It carries
// two artifact maps: plain string values in Artifacts and typed ArtifactRef
// values in ArtifactRefs. It has no JSON tags.
type JobExecutionResult struct {
	Artifacts    map[string]string
	ArtifactRefs map[string]ArtifactRef
}

JobExecutionResult is the terminal output from a daemon job executor.

type JobExecutor

// JobExecutor is implemented by types that run jobs. JobTypes lists the job
// types the executor handles; RunJob runs the job described by a QueueLease
// under the given context and returns its JobExecutionResult or an error.
type JobExecutor interface {
	JobTypes() []JobType
	RunJob(context.Context, QueueLease) (JobExecutionResult, error)
}

JobExecutor runs claimed daemon jobs for one or more job types.

type JobFailure

// JobFailure is the JSON record describing why a job failed. Code is always
// encoded; Message is omitted when empty. Retryable is tagged omitempty, so
// false is never emitted and decodes back as the zero value.
type JobFailure struct {
	Code      FailureCode `json:"code"`
	Message   string      `json:"message,omitempty"`
	Retryable bool        `json:"retryable,omitempty"`
}

type JobProjection

// JobProjection is the JSON record for one daemon job. JobID, RequestID and
// Status are always encoded; every other field is omitted when empty.
// Failure is a pointer, so it is dropped when nil. The record carries both a
// single RequestID and a RequestIDs list; how the two relate is not
// specified by this declaration.
type JobProjection struct {
	JobID             string                 `json:"job_id"`
	JobType           JobType                `json:"job_type,omitempty"`
	RequestID         string                 `json:"request_id"`
	RequestIDs        []string               `json:"request_ids,omitempty"`
	Status            JobStatus              `json:"status"`
	ResultStatus      JobResultStatus        `json:"result_status,omitempty"`
	Failure           *JobFailure            `json:"failure,omitempty"`
	Artifacts         map[string]string      `json:"artifacts,omitempty"`
	ArtifactRefs      map[string]ArtifactRef `json:"artifact_refs,omitempty"`
	ProjectionTargets []ProjectionName       `json:"projection_targets,omitempty"`
	CreatedAt         string                 `json:"created_at,omitempty"`
	UpdatedAt         string                 `json:"updated_at,omitempty"`
	LastEventID       string                 `json:"last_event_id,omitempty"`
}

type JobResult

// JobResult is the JSON record of a job outcome: a JobResultStatus (always
// encoded), plus optional string artifacts and a completion timestamp
// string, both omitted when empty.
type JobResult struct {
	Status      JobResultStatus   `json:"status"`
	Artifacts   map[string]string `json:"artifacts,omitempty"`
	CompletedAt string            `json:"completed_at,omitempty"`
}

type JobResultStatus

// JobResultStatus is a string-typed enumeration used by JobResult.Status and
// JobProjection.ResultStatus. Three values are declared: "succeeded",
// "failed" and "cancelled".
type JobResultStatus string
const (
	JobResultSucceeded JobResultStatus = "succeeded"
	JobResultFailed    JobResultStatus = "failed"
	JobResultCancelled JobResultStatus = "cancelled"
)

type JobSpec

// JobSpec is the generic JSON job description: an ID, a JobType, an optional
// untyped Payload and an optional CreatedAt string. Typed specs in this
// package convert to and from it (for example
// FactoryLocalPilotJobSpec.ToJobSpec and the *FromPayload functions, which
// take the Payload map).
type JobSpec struct {
	ID        string         `json:"id"`
	Type      JobType        `json:"type"`
	Payload   map[string]any `json:"payload,omitempty"`
	CreatedAt string         `json:"created_at,omitempty"`
}

type JobStatus

// JobStatus is a string-typed enumeration of daemon job states. It is used by
// JobProjection.Status and FactoryTerminalProjection.Status and is the return
// type of ProjectJobStatus. The constants below are the declared values;
// their strings are the wire representation.
type JobStatus string
const (
	JobStatusQueued       JobStatus = "queued"
	JobStatusRunning      JobStatus = "running"
	JobStatusRetryWaiting JobStatus = "retry_waiting"
	JobStatusCompleted    JobStatus = "completed"
	JobStatusFailed       JobStatus = "failed"
	JobStatusCancelled    JobStatus = "cancelled"
	JobStatusDegraded     JobStatus = "degraded"
)

func ProjectJobStatus

func ProjectJobStatus(input JobStatusProjectionInput) JobStatus

type JobStatusProjectionInput

// JobStatusProjectionInput is the argument to ProjectJobStatus: the job's
// terminal event type, its lease state and whether the projection is stale.
// The mapping from these inputs to a JobStatus lives in ProjectJobStatus and
// is not visible from this declaration.
type JobStatusProjectionInput struct {
	TerminalEvent   EventType
	Lease           LeaseState
	ProjectionStale bool
}

type JobType

// JobType is a string-typed enumeration of the job kinds the daemon knows
// about. Values are dot-separated ("<family>.<action>"), and JobExecutor
// implementations report which ones they handle via JobTypes. The constants
// below are the declared values; their strings are the wire representation.
type JobType string
const (
	JobTypeRPIRun            JobType = "rpi.run"
	JobTypeRPIPhase          JobType = "rpi.phase"
	JobTypeDreamRun          JobType = "dream.run"
	JobTypeDreamStage        JobType = "dream.stage"
	JobTypeWikiBuild         JobType = "wiki.build"
	JobTypeWikiForge         JobType = "wiki.forge"
	JobTypeFactoryAdmission  JobType = "factory.admission"
	JobTypeFactoryLocalPilot JobType = "factory.local-pilot"
	JobTypeOpenClawSnapshot  JobType = "openclaw.snapshot"
	JobTypePlansProjection   JobType = "plans.projection"
	// JobTypeLLMWikiLoop is the Karpathy-pattern external-knowledge loop job type.
	// Operates on raw/ + wiki/ trees, distinct from internal .agents/ work.
	JobTypeLLMWikiLoop    JobType = "llmwiki.loop"
	JobTypeEvalSuite      JobType = "eval.suite"
	JobTypeEvalSkillDelta JobType = "eval.skill-delta"
	JobTypeSkillInvoke    JobType = "skill.invoke"
)

type JobTypeFailureRateSummary

// JobTypeFailureRateSummary is the JSON record for one job type's failure
// statistics inside LedgerTelemetry.FailureRates. All fields are always
// encoded. Presumably FailureRate is FailedCount / TerminalCount — TODO
// confirm against BuildLedgerTelemetry, including the TerminalCount == 0
// case.
type JobTypeFailureRateSummary struct {
	JobType       JobType `json:"job_type"`
	TerminalCount int     `json:"terminal_count"`
	FailedCount   int     `json:"failed_count"`
	FailureRate   float64 `json:"failure_rate"`
}

type LeaseState

// LeaseState is a string-typed enumeration used as
// JobStatusProjectionInput.Lease. Four values are declared: "none", "fresh",
// "expired" and "unknown".
type LeaseState string
const (
	LeaseNone    LeaseState = "none"
	LeaseFresh   LeaseState = "fresh"
	LeaseExpired LeaseState = "expired"
	LeaseUnknown LeaseState = "unknown"
)

type LedgerEvent

// LedgerEvent is the JSON form of one ledger entry. Every field except
// Payload is always encoded. RequestID and OccurredAt are plain strings here,
// whereas LedgerEventInput carries them as RequestID and time.Time;
// NewLedgerEvent converts an input into this form and NormalizeLedgerEvent
// takes and returns this type.
type LedgerEvent struct {
	SchemaVersion int            `json:"schema_version"`
	EventID       string         `json:"event_id"`
	RequestID     string         `json:"request_id"`
	JobID         string         `json:"job_id"`
	EventType     EventType      `json:"event_type"`
	OccurredAt    string         `json:"occurred_at"`
	Actor         string         `json:"actor"`
	Payload       map[string]any `json:"payload,omitempty"`
}

func NewLedgerEvent

func NewLedgerEvent(input LedgerEventInput) (LedgerEvent, error)

func NormalizeLedgerEvent

func NormalizeLedgerEvent(event LedgerEvent) (LedgerEvent, error)

type LedgerEventInput

// LedgerEventInput is the untagged, typed argument to NewLedgerEvent. Unlike
// LedgerEvent it has no SchemaVersion, holds OccurredAt as a time.Time, and
// has JobType and ProjectionTargets fields that LedgerEvent does not declare;
// presumably NewLedgerEvent folds those into the event payload — TODO
// confirm. The NewAgentUpdate*Event constructors in this package return this
// type.
type LedgerEventInput struct {
	EventID           string
	RequestID         RequestID
	JobID             string
	EventType         EventType
	OccurredAt        time.Time
	Actor             string
	JobType           JobType
	ProjectionTargets []ProjectionName
	Payload           map[string]any
}

func NewAgentUpdateCriterionVerdictEvent

func NewAgentUpdateCriterionVerdictEvent(p AgentUpdateCriterionVerdict) LedgerEventInput

NewAgentUpdateCriterionVerdictEvent builds a LedgerEventInput for an agent-update criterion_verdict payload. An empty Timestamp is defaulted to time.Now().UTC() in RFC 3339 nano format.

func NewAgentUpdatePhaseCompleteEvent

func NewAgentUpdatePhaseCompleteEvent(p AgentUpdatePhaseComplete) LedgerEventInput

NewAgentUpdatePhaseCompleteEvent builds a LedgerEventInput for an agent-update phase_complete payload. An empty Timestamp is defaulted to time.Now().UTC() in RFC 3339 nano format.

func NewAgentUpdatePhaseHandoffEvent

func NewAgentUpdatePhaseHandoffEvent(p AgentUpdatePhaseHandoff) LedgerEventInput

NewAgentUpdatePhaseHandoffEvent builds a LedgerEventInput for an agent-update phase_handoff payload. An empty Timestamp is defaulted to time.Now().UTC() in RFC 3339 nano format.

func NewAgentUpdatePhaseStartEvent

func NewAgentUpdatePhaseStartEvent(p AgentUpdatePhaseStart) LedgerEventInput

NewAgentUpdatePhaseStartEvent builds a LedgerEventInput for an agent-update phase_start payload. An empty Timestamp is defaulted to time.Now().UTC() in RFC 3339 nano format so callers can leave it blank for "now".

type LedgerHealth

// LedgerHealth is the JSON report of ledger size, snapshot and archive state,
// with any warnings listed in WarnReasons.
//
// LatestSnapshotAge is a time.Duration and therefore encodes as an integer
// number of nanoseconds, matching its "latest_snapshot_age_ns" key.
//
// NOTE(review): OldestArchiveTime is a time.Time tagged omitempty.
// encoding/json never considers a struct value empty, so the option has no
// effect and a zero time is emitted as "0001-01-01T00:00:00Z". Consumers
// should treat that value as "no archives"; on Go 1.24+ the omitzero option
// would drop it, at the cost of changing the encoded output.
type LedgerHealth struct {
	LedgerSizeBytes    int64         `json:"ledger_size_bytes"`
	LedgerMaxBytes     int64         `json:"ledger_max_bytes"`
	LedgerSizeRatio    float64       `json:"ledger_size_ratio"`
	LatestSnapshotPath string        `json:"latest_snapshot_path,omitempty"`
	LatestSnapshotAge  time.Duration `json:"latest_snapshot_age_ns"`
	HasSnapshot        bool          `json:"has_snapshot"`
	ArchiveCount       int           `json:"archive_count"`
	OldestArchiveTime  time.Time     `json:"oldest_archive_time,omitempty"`
	WarnReasons        []string      `json:"warn_reasons,omitempty"`
}

LedgerHealth summarizes durability state for the ao doctor surface (TB-Δ3 Phase 2-C). Read-only — populated by Store.LedgerHealth from on-disk facts.

type LedgerHealthThresholds

// LedgerHealthThresholds holds the three limits that produce ledger health
// warnings: a size ratio, a maximum snapshot age and an archive count.
// LedgerHealthDefaultThresholds returns the package defaults.
type LedgerHealthThresholds struct {
	// LedgerSizeWarnRatio fires WARN when ledger.jsonl size / max >= ratio.
	LedgerSizeWarnRatio float64
	// SnapshotMaxAge fires WARN when latest snapshot age >= this duration.
	// Zero disables the snapshot-age check.
	SnapshotMaxAge time.Duration
	// ArchiveCountWarn fires WARN when archive count >= this value. Zero
	// disables the check.
	ArchiveCountWarn int
}

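LedgerHealthThresholds controls the WARN bands. The zero value of the whole struct means "use LedgerHealthDefaultThresholds()"; within a non-zero struct, a zero SnapshotMaxAge or ArchiveCountWarn disables that individual check. Operators may override the thresholds per call.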

func LedgerHealthDefaultThresholds

func LedgerHealthDefaultThresholds() LedgerHealthThresholds

type LedgerTelemetry

// LedgerTelemetry is the JSON summary returned by BuildLedgerTelemetry, which
// takes a slice of ledger events, a reference time and a window duration.
// EventCount and Window are always encoded; the three breakdown slices are
// omitted when empty. Window is a string; its format is not specified by this
// declaration.
type LedgerTelemetry struct {
	EventCount             int                         `json:"event_count"`
	Window                 string                      `json:"window"`
	PhaseLatency           []PhaseLatencyHistogram     `json:"phase_latency,omitempty"`
	WorkerKindDistribution []WorkerKindDistribution    `json:"worker_kind_distribution,omitempty"`
	FailureRates           []JobTypeFailureRateSummary `json:"failure_rates,omitempty"`
}

func BuildLedgerTelemetry

func BuildLedgerTelemetry(events []LedgerEvent, now time.Time, window time.Duration) LedgerTelemetry

type ListSchedulesResponse

// ListSchedulesResponse wraps a list of RecurringJobTemplate values under the
// "schedules" key. The field has no omitempty, so a nil slice encodes as JSON
// null rather than an empty array.
type ListSchedulesResponse struct {
	Schedules []RecurringJobTemplate `json:"schedules"`
}

ListSchedulesResponse is the body of GET /v1/schedules.

type LocalFactoryAdmissionEvidenceProvider

// LocalFactoryAdmissionEvidenceProvider holds a Root path and a
// FactoryWorkOrder and exposes MainCIBaseline, OpenPRBlockers and RepoState
// methods. Presumably Root is the repository checkout inspected for
// RepoState, and the PR/CI evidence is taken from WorkOrder — TODO confirm
// against the method bodies.
type LocalFactoryAdmissionEvidenceProvider struct {
	Root      string
	WorkOrder FactoryWorkOrder
}

func (LocalFactoryAdmissionEvidenceProvider) MainCIBaseline

func (LocalFactoryAdmissionEvidenceProvider) OpenPRBlockers

func (LocalFactoryAdmissionEvidenceProvider) RepoState

type MutationCapability

// MutationCapability is a string-typed enumeration naming a permission that a
// MutationToken can hold and that MutationPolicy.PathCapabilities can require
// for a path. Four values are declared: "submit_job", "cancel_job",
// "openclaw_trigger" and "admin".
type MutationCapability string
const (
	MutationCapabilitySubmitJob       MutationCapability = "submit_job"
	MutationCapabilityCancelJob       MutationCapability = "cancel_job"
	MutationCapabilityOpenClawTrigger MutationCapability = "openclaw_trigger"
	MutationCapabilityAdmin           MutationCapability = "admin"
)

type MutationDecision

// MutationDecision is the JSON record of a mutation authorization outcome, as
// returned by EvaluateMutationPolicy, AuthorizeMutationDecision and
// MutationDecisionFromContext. Allowed is always encoded; the reason, the
// matched token's name and capabilities, the capability that was required,
// and the LocalOnly flag are omitted when empty.
type MutationDecision struct {
	Allowed            bool                 `json:"allowed"`
	Reason             string               `json:"reason,omitempty"`
	TokenName          string               `json:"token_name,omitempty"`
	Capabilities       []MutationCapability `json:"capabilities,omitempty"`
	RequiredCapability MutationCapability   `json:"required_capability,omitempty"`
	LocalOnly          bool                 `json:"local_only,omitempty"`
}

func AuthorizeMutationDecision

func AuthorizeMutationDecision(r *http.Request, policy MutationPolicy) (MutationDecision, error)

func EvaluateMutationPolicy

func EvaluateMutationPolicy(r *http.Request, policy MutationPolicy) MutationDecision

func MutationDecisionFromContext

func MutationDecisionFromContext(ctx context.Context) (MutationDecision, bool)

MutationDecisionFromContext returns the decision attached by registerMutationRoute. Handlers that need TokenName for ledger actor attribution should call this rather than re-running the policy check.

type MutationPolicy

// MutationPolicy is the untagged configuration evaluated against an
// *http.Request by EvaluateMutationPolicy and AuthorizeMutationDecision. It
// holds the accepted credentials (a single Token and/or a list of named
// Tokens), the header they are read from, the allowed paths, methods and
// origins, a per-path capability requirement, and a flag requiring a local
// remote address. DefaultMutationPolicy builds one from a token and a path
// list.
//
// How Token and Tokens combine, and how empty allow-lists are interpreted, is
// decided by the evaluation functions and is not visible from this
// declaration.
type MutationPolicy struct {
	Token              string
	Tokens             []MutationToken
	TokenHeader        string
	AllowedPaths       []string
	AllowedMethods     []string
	PathCapabilities   map[string]MutationCapability
	AllowedOrigins     []string
	RequireLocalRemote bool
}

func DefaultMutationPolicy

func DefaultMutationPolicy(token string, allowedPaths []string) MutationPolicy

type MutationPolicyProvider

// MutationPolicyProvider is a parameterless function that returns the
// MutationPolicy to apply; callers invoke it each time they need the policy.
type MutationPolicyProvider func() MutationPolicy

MutationPolicyProvider lazily resolves the policy at request time. The daemon defaults are evaluated per-request because tests inject options after the router is built.

type MutationToken

// MutationToken is one named credential with the capabilities it grants and
// an optional LocalOnly restriction. LoadMutationTokensFile reads a list of
// these from a file.
//
// Token is JSON-tagged without omitempty or "-", so marshaling a
// MutationToken writes the secret value. Avoid encoding this type into logs
// or API responses; MutationDecision exposes only TokenName for that purpose.
type MutationToken struct {
	Name         string               `json:"name"`
	Token        string               `json:"token"` // #nosec G101 -- config field name, not a hard-coded credential.
	Capabilities []MutationCapability `json:"capabilities"`
	LocalOnly    bool                 `json:"local_only,omitempty"`
}

func LoadMutationTokensFile

func LoadMutationTokensFile(path string) ([]MutationToken, error)

type OpenClawResources

// OpenClawResources groups JobProjection values into three lists under the
// "runs", "jobs" and "wiki" keys. None is tagged omitempty, so a nil list
// encodes as JSON null. Which job types land in which list is not visible
// from this declaration.
type OpenClawResources struct {
	Runs []JobProjection `json:"runs"`
	Jobs []JobProjection `json:"jobs"`
	Wiki []JobProjection `json:"wiki"`
}

type OpenClawSnapshotProjection

// OpenClawSnapshotProjection is the JSON snapshot document: a schema version,
// a snapshot ID, a generation timestamp string, the projection source, the
// grouped OpenClawResources and a ProjectionStatus. All fields are always
// encoded.
type OpenClawSnapshotProjection struct {
	SchemaVersion int               `json:"schema_version"`
	SnapshotID    string            `json:"snapshot_id"`
	GeneratedAt   string            `json:"generated_at"`
	Source        ProjectionSource  `json:"source"`
	Resources     OpenClawResources `json:"resources"`
	Status        ProjectionStatus  `json:"status"`
}

type PhaseLatencyHistogram

// PhaseLatencyHistogram is the JSON latency summary for one phase inside
// LedgerTelemetry.PhaseLatency: the phase name, the sample count, and the
// 50th and 99th percentile latencies in milliseconds ("p50_ms", "p99_ms").
// Despite the name, it carries two percentiles rather than bucket counts.
type PhaseLatencyHistogram struct {
	PhaseName string `json:"phase_name"`
	Count     int    `json:"count"`
	P50Millis int64  `json:"p50_ms"`
	P99Millis int64  `json:"p99_ms"`
}

type PlansBdSource

// PlansBdSource is the single-method interface through which
// PlansProjectionExecutor obtains epic entries; it is supplied via
// PlansProjectionExecutorOptions.BdSource.
type PlansBdSource interface {
	// QueryEpics returns the bd-side epic state for project_id (filtered to
	// rows whose ID begins with issuePrefix when non-empty). Implementations
	// must respect ctx cancellation.
	QueryEpics(ctx context.Context, projectID, issuePrefix string) ([]PlansProjectionEntry, error)
}

PlansBdSource is the input edge of PlansProjectionExecutor. Production wires this to the shared bushido Dolt server via dbBdSource (database/sql); tests inject a fakePlansBdSource for L1 BDD coverage.

type PlansProjectionEntry

// PlansProjectionEntry is the JSON record for one entry returned by
// PlansBdSource.QueryEpics. BeadsID is always encoded; the string fields are
// omitted when empty.
//
// NOTE(review): UpdatedAt is a time.Time tagged omitempty. encoding/json
// never considers a struct value empty, so the option has no effect and a
// zero time is emitted as "0001-01-01T00:00:00Z". On Go 1.24+ the omitzero
// option would drop it, at the cost of changing the encoded output.
type PlansProjectionEntry struct {
	BeadsID   string    `json:"beads_id"`
	Title     string    `json:"title,omitempty"`
	Status    string    `json:"status,omitempty"`
	Priority  string    `json:"priority,omitempty"`
	IssueType string    `json:"issue_type,omitempty"`
	UpdatedAt time.Time `json:"updated_at,omitempty"`
	Checksum  string    `json:"checksum,omitempty"`
}

PlansProjectionEntry is one entry in the daemon-rebuilt plans manifest. Source-of-truth fields come from bd via the executor's BdSource.

type PlansProjectionExecutor

// PlansProjectionExecutor has no exported fields; construct it with
// NewPlansProjectionExecutor. Its JobTypes and RunJob methods use pointer
// receivers and match the method set of the JobExecutor interface.
type PlansProjectionExecutor struct {
	// contains filtered or unexported fields
}

PlansProjectionExecutor is the JobExecutor (supervisor.go:18) for plans.projection jobs. Mirrors the RPIJobExecutor shape: thin wrapper that the supervisor invokes with a claimed job; rebuild/write/validate mechanics live in plans_projection.go.

func NewPlansProjectionExecutor

func NewPlansProjectionExecutor(opts PlansProjectionExecutorOptions) (*PlansProjectionExecutor, error)

NewPlansProjectionExecutor builds an executor from explicit dependencies. Returns an error when the store or bd source is nil — mirrors NewRPIJobExecutor's required-field contract.

func (*PlansProjectionExecutor) JobTypes

func (e *PlansProjectionExecutor) JobTypes() []JobType

JobTypes reports the daemon job types this executor handles.

func (*PlansProjectionExecutor) RunJob

RunJob executes one claimed plans.projection job. The supervisor wraps this call with claim/heartbeat/terminal-record bookkeeping. The executor:

  1. parses the spec from claim payload,
  2. queries bd for the project's epic set,
  3. builds a DaemonPlansProjection in memory,
  4. writes the manifest snapshot atomically (tmp + os.Rename), and
  5. returns artifacts mapping the snapshot path and entry count.

RunJob requires a non-nil ctx; callers passing nil will panic on first use.

type PlansProjectionExecutorOptions

// PlansProjectionExecutorOptions is the argument to
// NewPlansProjectionExecutor: the Store, the PlansBdSource to query, and a
// Now clock function. Presumably a nil Now falls back to time.Now — TODO
// confirm against the constructor.
type PlansProjectionExecutorOptions struct {
	Store    *Store
	BdSource PlansBdSource
	Now      func() time.Time
}

PlansProjectionExecutorOptions configures a PlansProjectionExecutor.

type PlansProjectionJobSpec

// PlansProjectionJobSpec is the typed payload for plans.projection jobs. It
// round-trips through a job payload map via PlansProjectionJobSpecFromPayload.
//
// SchemaVersion and JobType receive default values from
// NewPlansProjectionJobSpec; ProjectID, IssuePrefix, and OutputDir are the
// constructor's arguments. RefreshTrigger names how the daemon decided to
// (re)run the projection and is used for diagnostics and ledger correlation.
// NOTE(review): IdempotencyKey is documented as singleton-per-project, so it is
// presumably derived from ProjectID — confirm.
type PlansProjectionJobSpec struct {
	SchemaVersion  int                           `json:"schema_version"`
	JobType        JobType                       `json:"job_type"`
	ProjectID      string                        `json:"project_id"`
	IssuePrefix    string                        `json:"issue_prefix"`
	RefreshTrigger PlansProjectionRefreshTrigger `json:"refresh_trigger"`
	OutputDir      string                        `json:"output_dir"`
}

PlansProjectionJobSpec is the payload contract for plans.projection jobs. Mirrors the rpi/dream spec round-trip pattern.

func NewPlansProjectionJobSpec

func NewPlansProjectionJobSpec(projectID, issuePrefix, outputDir string) PlansProjectionJobSpec

NewPlansProjectionJobSpec builds a spec with default schema/job_type values.

func PlansProjectionJobSpecFromPayload

func PlansProjectionJobSpecFromPayload(payload map[string]any) (PlansProjectionJobSpec, error)

PlansProjectionJobSpecFromPayload parses a JobSpec.Payload back into a typed spec. Returns an error if required fields are missing or invalid.

func (PlansProjectionJobSpec) IdempotencyKey

func (spec PlansProjectionJobSpec) IdempotencyKey() string

IdempotencyKey returns the singleton-per-project key used by the queue to collapse duplicate submissions per foundation §1 idempotency rule.

func (PlansProjectionJobSpec) Validate

func (spec PlansProjectionJobSpec) Validate() error

Validate enforces the spec contract for submission and replay.

type PlansProjectionRefreshTrigger

type PlansProjectionRefreshTrigger string

PlansProjectionRefreshTrigger names how the daemon decided to (re)run the projection. Used for diagnostics and ledger correlation.

// Known PlansProjectionRefreshTrigger values. Each records how the daemon
// decided to (re)run the plans projection.
const (
	PlansProjectionTriggerManual       PlansProjectionRefreshTrigger = "manual"
	PlansProjectionTriggerInterval     PlansProjectionRefreshTrigger = "interval"
	PlansProjectionTriggerSubscription PlansProjectionRefreshTrigger = "subscription"
)

type ProjectionLag

// ProjectionLag is the projection_lag payload embedded in
// ReadOnlyReadyResponse and ReadOnlyStatusResponse.
//
// NOTE(review): field semantics are inferred from names only — EventCount and
// CorruptRecordCount look like ledger replay counters and Degraded like a
// summary flag; confirm against the code that populates them.
type ProjectionLag struct {
	LastEventID        string `json:"last_event_id,omitempty"`
	EventCount         int    `json:"event_count"`
	CorruptRecordCount int    `json:"corrupt_record_count"`
	Degraded           bool   `json:"degraded"`
}

type ProjectionManifest

// ProjectionManifest is the per-projection record stored in
// ProjectionSet.Manifests, keyed by ProjectionName.
//
// Status takes one of the ProjectionStatus values ("current", "stale",
// "degraded").
// NOTE(review): RebuiltAt is a string rather than time.Time; the timestamp
// layout is not visible here — confirm before parsing it.
type ProjectionManifest struct {
	SchemaVersion   int              `json:"schema_version"`
	Projection      ProjectionName   `json:"projection"`
	SourceLedger    string           `json:"source_ledger"`
	LastEventID     string           `json:"last_event_id,omitempty"`
	Status          ProjectionStatus `json:"status"`
	RebuiltAt       string           `json:"rebuilt_at"`
	OutputPaths     []string         `json:"output_paths,omitempty"`
	DegradedReasons []string         `json:"degraded_reasons,omitempty"`
}

type ProjectionName

type ProjectionName string
const (
	ProjectionRPIRegistry     ProjectionName = "rpi-registry"
	ProjectionDreamRuns       ProjectionName = "dream-runs"
	ProjectionWikiJobs        ProjectionName = "wiki-jobs"
	ProjectionOpenClaw        ProjectionName = "openclaw-snapshot"
	ProjectionDaemonStatus    ProjectionName = "daemon-status"
	ProjectionDaemonJobStatus ProjectionName = "daemon-job-status"
	ProjectionPlansManifest   ProjectionName = "plans-manifest"
)

type ProjectionRebuildOptions

// ProjectionRebuildOptions is the options argument of RebuildProjections.
//
// Leaving FromSnapshot nil requests a fresh fold of the supplied events;
// setting it requests a delta replay seeded from that snapshot.
// NOTE(review): RebuiltAt and SourceLedger presumably populate the fields of
// the same name on the resulting ProjectionSet — confirm.
type ProjectionRebuildOptions struct {
	RebuiltAt    time.Time
	SourceLedger string
	// FromSnapshot, when non-nil, makes RebuildProjections start from the
	// given snapshot's state and apply only events whose EventID is strictly
	// greater than FromSnapshot.LastEventID. Use this to amortize replay cost
	// across daemon restarts (Phase 2-B of TB-Δ3).
	FromSnapshot *ProjectionSet
}

type ProjectionSet

// ProjectionSet is the value returned by RebuildProjections. It can also be
// fed back in as ProjectionRebuildOptions.FromSnapshot to seed a delta replay.
//
// When RebuildProjections returns an error the set is zero-valued:
// SchemaVersion is 0 and Manifests, Jobs, and Schedules are nil. Treat
// SchemaVersion == 0 as "do not use this set" and check the error before
// touching Manifests or Jobs.
type ProjectionSet struct {
	SchemaVersion   int                                   `json:"schema_version"`
	RebuiltAt       string                                `json:"rebuilt_at"`
	SourceLedger    string                                `json:"source_ledger"`
	LastEventID     string                                `json:"last_event_id,omitempty"`
	Manifests       map[ProjectionName]ProjectionManifest `json:"manifests"`
	Jobs            []JobProjection                       `json:"jobs"`
	RPI             RPIRegistryProjection                 `json:"rpi"`
	Dream           DreamRunsProjection                   `json:"dream"`
	Wiki            WikiJobsProjection                    `json:"wiki"`
	OpenClaw        OpenClawSnapshotProjection            `json:"openclaw"`
	Plans           DaemonPlansProjection                 `json:"plans"`
	Schedules       []RecurringJobTemplate                `json:"schedules,omitempty"`
	Factory         FactoryStatusProjection               `json:"factory"`
	DegradedReasons []string                              `json:"degraded_reasons,omitempty"`
}

func RebuildProjections

func RebuildProjections(events []LedgerEvent, opts ProjectionRebuildOptions) (ProjectionSet, error)

RebuildProjections folds the given ledger events into a fresh ProjectionSet (or, if opts.FromSnapshot is set, into a delta replay seeded from that snapshot).

Callers MUST check err before using the returned set. On error, the returned set is zero-valued (SchemaVersion == 0, nil Manifests/Jobs/Schedules, empty derived RPI/Dream/Wiki/OpenClaw buckets). Treat the SchemaVersion == 0 sentinel as "do not use this set"; downstream code that indexes into Manifests or appends to Jobs without checking err first risks nil-map panics or silently emitting an empty projection. See the bug-hunt L2 test TestRebuildProjections_ErrorReturnsUnusableSet for the contract.

type ProjectionSource

type ProjectionSource struct {
	Ledger      string `json:"ledger"`
	LastEventID string `json:"last_event_id,omitempty"`
}

type ProjectionStatus

type ProjectionStatus string
const (
	ProjectionStatusCurrent  ProjectionStatus = "current"
	ProjectionStatusStale    ProjectionStatus = "stale"
	ProjectionStatusDegraded ProjectionStatus = "degraded"
)

type ProviderProjectionInput

type ProviderProjectionInput struct {
	DaemonReady        bool
	GasCityReady       bool
	WorkerSessionKnown bool
}

type ProviderStatus

type ProviderStatus string
const (
	ProviderDaemonUnavailable ProviderStatus = "daemon_unavailable"
	ProviderUnreachable       ProviderStatus = "provider_unreachable"
	ProviderSessionPending    ProviderStatus = "session_pending"
	ProviderSessionBound      ProviderStatus = "session_bound"
)

func ProjectProviderStatus

func ProjectProviderStatus(input ProviderProjectionInput) ProviderStatus

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(store *Store, opts QueueOptions) *Queue

NewQueue constructs a Queue bound to store with the given options.

HTTP handlers must call NewQueue per request rather than reusing a shared instance: the returned Queue carries an in-memory sequence counter initialized from the durable store, and opts may be request-scoped (Now, failpoints, actor). The store itself is shared and concurrency-safe; the Queue wrapper is the per-request boundary. See cli/internal/daemon/server.go handlers for the canonical pattern.

func (*Queue) CancelJob

func (q *Queue) CancelJob(input CancelJobInput, opts QueueMutationOptions) (CancelJobResult, error)

CancelJob appends a terminal cancellation event for a non-terminal job.

func (*Queue) ClaimJob

func (q *Queue) ClaimJob(jobID, actor string, opts QueueMutationOptions) (QueueLease, error)

func (*Queue) ClaimNext

func (q *Queue) ClaimNext(actor string, opts QueueMutationOptions) (QueueLease, error)

func (*Queue) ClaimNextMatching

func (q *Queue) ClaimNextMatching(actor string, match func(QueueJobState) bool, opts QueueMutationOptions) (QueueLease, error)

ClaimNextMatching claims the next claimable job accepted by match.

func (*Queue) CompleteJob

func (q *Queue) CompleteJob(input CompleteJobInput, opts QueueMutationOptions) (QueueJobState, error)

func (*Queue) FailJob

func (q *Queue) FailJob(input FailJobInput, opts QueueMutationOptions) (QueueJobState, error)

func (*Queue) Heartbeat

func (q *Queue) Heartbeat(input HeartbeatInput, opts QueueMutationOptions) (QueueJobState, error)

func (*Queue) Snapshot

func (q *Queue) Snapshot() (QueueSnapshot, error)

func (*Queue) SubmitJob

func (q *Queue) SubmitJob(input SubmitJobInput, opts QueueMutationOptions) (QueueJobState, error)

type QueueFailpoint

type QueueFailpoint string
const (
	QueueFailpointBeforeAppend         QueueFailpoint = "before_append"
	QueueFailpointAfterAppendBeforeAck QueueFailpoint = "after_append_before_ack"
)

type QueueJobState

type QueueJobState struct {
	JobID             string                 `json:"job_id"`
	JobType           JobType                `json:"job_type"`
	RequestID         string                 `json:"request_id"`
	RequestIDs        []string               `json:"request_ids,omitempty"`
	Status            JobStatus              `json:"status"`
	IdempotencyKey    string                 `json:"idempotency_key,omitempty"`
	Attempt           int                    `json:"attempt"`
	MaxAttempts       int                    `json:"max_attempts"`
	ClaimToken        string                 `json:"claim_token,omitempty"`
	LeaseEpoch        int                    `json:"lease_epoch,omitempty"`
	LeaseExpiresAt    string                 `json:"lease_expires_at,omitempty"`
	RetryExhausted    bool                   `json:"retry_exhausted,omitempty"`
	Failure           *JobFailure            `json:"failure,omitempty"`
	Artifacts         map[string]string      `json:"artifacts,omitempty"`
	ArtifactRefs      map[string]ArtifactRef `json:"artifact_refs,omitempty"`
	ProjectionTargets []ProjectionName       `json:"projection_targets,omitempty"`
	Payload           map[string]any         `json:"payload,omitempty"`
	LastEventID       string                 `json:"last_event_id,omitempty"`
	CreatedAt         string                 `json:"created_at,omitempty"`
	UpdatedAt         string                 `json:"updated_at,omitempty"`
}

type QueueLease

// QueueLease is returned by the Queue claim methods (ClaimJob, ClaimNext,
// ClaimNextMatching) and is passed on to runners as RPIRunRequest.Claim and
// SkillInvokeRequest.Claim.
//
// NOTE(review): ClaimToken, LeaseEpoch, and LeaseExpiresAt also exist on the
// embedded QueueJobState; presumably they mirror those values at claim time —
// confirm.
type QueueLease struct {
	Job            QueueJobState `json:"job"`
	ClaimToken     string        `json:"claim_token"`
	LeaseEpoch     int           `json:"lease_epoch"`
	LeaseExpiresAt string        `json:"lease_expires_at"`
}

type QueueMutationOptions

type QueueMutationOptions struct {
	Failpoint QueueFailpoint
}

type QueueOptions

// QueueOptions is the options argument of NewQueue and is also carried in
// ServerOptions.QueueOptions.
//
// Values may be request-scoped (for example Now and Actor), which is one
// reason HTTP handlers construct a Queue per request instead of sharing one.
// NOTE(review): zero-value defaults for LeaseDuration and MaxAttempts are not
// visible here — confirm against NewQueue.
type QueueOptions struct {
	LeaseDuration time.Duration
	MaxAttempts   int
	Actor         string
	Now           func() time.Time
}

type QueueSnapshot

// QueueSnapshot is the result of Queue.Snapshot and is embedded in
// ReadOnlyStatusResponse under the "queue" key.
type QueueSnapshot struct {
	Jobs        []QueueJobState `json:"jobs"`
	LastEventID string          `json:"last_event_id,omitempty"`
}

type RPIBackend

type RPIBackend string
const (
	RPIBackendGasCityAPI     RPIBackend = "gascity-api"
	RPIBackendGasCityCLI     RPIBackend = "gc-cli-fallback"
	RPIBackendForeground     RPIBackend = "foreground"
	RPIBackendDaemonDegraded RPIBackend = "daemon-degraded"
)

type RPIJobExecutor

type RPIJobExecutor struct {
	// contains filtered or unexported fields
}

RPIJobExecutor is a JobExecutor (Supervisor-compatible) for rpi.run and rpi.phase jobs. The Supervisor handles claim/heartbeat/terminal-write; this executor only runs the user-visible RPI work for an already-claimed job by delegating to an RPIPhaseExecutor (today: GasCityRPIPhaseExecutor).

soc-5of.8 (TB-Δ8) — production caller for daemon-submitted RPI jobs under the gascity executor policy. Symmetric in shape to WikiForgeExecutor + DreamExecutor.

func NewRPIJobExecutor

func NewRPIJobExecutor(opts RPIJobExecutorOptions) (*RPIJobExecutor, error)

NewRPIJobExecutor constructs an executor for rpi.run / rpi.phase jobs. The underlying RPIRunner is reused so behavior matches the operator-driven `ao rpi run` path; only the top-level claim/terminal-record loop differs (Supervisor owns that).

func (*RPIJobExecutor) JobTypes

func (e *RPIJobExecutor) JobTypes() []JobType

JobTypes reports the daemon job types this executor handles.

func (*RPIJobExecutor) RunJob

RunJob executes a claimed RPI job. The supervisor wraps this call with claim/heartbeat/terminal-record bookkeeping; we only contribute the per-job execution.

RunJob requires a non-nil ctx; callers passing nil will panic on first use.

type RPIJobExecutorOptions

// RPIJobExecutorOptions is the argument to NewRPIJobExecutor.
//
// NOTE(review): NewRPIJobExecutor is described elsewhere as having a
// "required-field contract", but which of Store, Executor, and PromptBuilder
// are required is not visible here — confirm against the constructor.
type RPIJobExecutorOptions struct {
	Store         *Store
	Executor      RPIPhaseExecutor
	PromptBuilder RPIPromptBuilder
	// RegistryWriter is optional; used to keep the rpi-registry projection
	// fresh after each phase. Pass nil to disable writes.
	RegistryWriter cliRPI.RunRegistryWriter
}

RPIJobExecutorOptions configures an RPIJobExecutor.

type RPIJobRunResult

type RPIJobRunResult struct {
	JobID     string            `json:"job_id"`
	RunID     string            `json:"run_id,omitempty"`
	JobType   JobType           `json:"job_type"`
	Status    JobStatus         `json:"status"`
	Artifacts map[string]string `json:"artifacts,omitempty"`
	Failure   *JobFailure       `json:"failure,omitempty"`
}

type RPIPhaseExecutionError

type RPIPhaseExecutionError struct {
	Code      FailureCode
	Message   string
	Retryable bool
	Cause     error
}

func (*RPIPhaseExecutionError) Error

func (e *RPIPhaseExecutionError) Error() string

func (*RPIPhaseExecutionError) Unwrap

func (e *RPIPhaseExecutionError) Unwrap() error

type RPIPhaseExecutionRequest

type RPIPhaseExecutionRequest struct {
	Root                string
	JobID               string
	RunID               string
	Goal                string
	EpicID              string
	ExecutionPacketPath string
	ParentRunJobID      string
	StartPhase          int
	MaxPhase            int
	Phase               int
	PhaseName           string
	Attempt             int
	Backend             RPIBackend
	GasCityCityName     string
	GasCitySessionAlias string
	PhaseTimeout        time.Duration
	Prompt              string
	Progress            RPIPhaseProgressFunc
}

type RPIPhaseExecutionResult

type RPIPhaseExecutionResult struct {
	Status              string            `json:"status,omitempty"`
	Artifacts           map[string]string `json:"artifacts,omitempty"`
	RequestIDs          map[string]string `json:"request_ids,omitempty"`
	GasCityCityName     string            `json:"gascity_city_name,omitempty"`
	GasCitySessionID    string            `json:"gascity_session_id,omitempty"`
	GasCitySessionAlias string            `json:"gascity_session_alias,omitempty"`
	EventCursor         string            `json:"event_cursor,omitempty"`
	EvidencePath        string            `json:"evidence_path,omitempty"`
}

type RPIPhaseExecutor

type RPIPhaseExecutor interface {
	ExecuteRPIPhase(context.Context, RPIPhaseExecutionRequest) (RPIPhaseExecutionResult, error)
}

type RPIPhaseJobSpec

type RPIPhaseJobSpec struct {
	SchemaVersion       int        `json:"schema_version"`
	JobType             JobType    `json:"job_type"`
	RunID               string     `json:"run_id"`
	Goal                string     `json:"goal"`
	EpicID              string     `json:"epic_id,omitempty"`
	ParentRunJobID      string     `json:"parent_run_job_id,omitempty"`
	ExecutionPacketPath string     `json:"execution_packet_path,omitempty"`
	Phase               int        `json:"phase"`
	PhaseName           string     `json:"phase_name"`
	Attempt             int        `json:"attempt,omitempty"`
	Backend             RPIBackend `json:"backend"`
	GasCityCityName     string     `json:"gascity_city_name,omitempty"`
	GasCitySessionAlias string     `json:"gascity_session_alias,omitempty"`
	PhaseTimeout        string     `json:"phase_timeout,omitempty"`
}

func NewRPIPhaseJobSpec

func NewRPIPhaseJobSpec(runID, goal string, phase int) RPIPhaseJobSpec

func RPIPhaseJobSpecFromPayload

func RPIPhaseJobSpecFromPayload(payload map[string]any) (RPIPhaseJobSpec, error)

func (RPIPhaseJobSpec) ToJobSpec

func (spec RPIPhaseJobSpec) ToJobSpec(jobID string) (JobSpec, error)

func (RPIPhaseJobSpec) Validate

func (spec RPIPhaseJobSpec) Validate() error

type RPIPhaseProgress

type RPIPhaseProgress struct {
	Phase     int
	Status    string
	Artifacts map[string]string
}

type RPIPhaseProgressFunc

type RPIPhaseProgressFunc func(context.Context, RPIPhaseProgress) error

type RPIPromptBuilder

type RPIPromptBuilder interface {
	BuildRPIPrompt(RPIPhaseExecutionRequest) (string, error)
}

type RPIPromptBuilderFunc

type RPIPromptBuilderFunc func(RPIPhaseExecutionRequest) (string, error)

func (RPIPromptBuilderFunc) BuildRPIPrompt

func (f RPIPromptBuilderFunc) BuildRPIPrompt(req RPIPhaseExecutionRequest) (string, error)

type RPIReconcileJob

type RPIReconcileJob struct {
	JobID          string             `json:"job_id"`
	RunID          string             `json:"run_id,omitempty"`
	Phase          int                `json:"phase,omitempty"`
	JobStatus      JobStatus          `json:"job_status"`
	Status         RPIReconcileStatus `json:"status"`
	ProviderStatus ProviderStatus     `json:"provider_status,omitempty"`
	CityName       string             `json:"city_name,omitempty"`
	SessionID      string             `json:"session_id,omitempty"`
	SessionAlias   string             `json:"session_alias,omitempty"`
	EventCursor    string             `json:"event_cursor,omitempty"`
	EvidencePath   string             `json:"evidence_path,omitempty"`
	FailureCode    FailureCode        `json:"failure_code,omitempty"`
	RepairedLedger bool               `json:"repaired_ledger,omitempty"`
	Message        string             `json:"message,omitempty"`
}

type RPIReconcileReport

type RPIReconcileReport struct {
	Jobs      []RPIReconcileJob `json:"jobs"`
	Active    int               `json:"active"`
	Completed int               `json:"completed"`
	Lost      int               `json:"lost"`
	Failed    int               `json:"failed"`
}

type RPIReconcileStatus

type RPIReconcileStatus string
const (
	RPIReconcileActive              RPIReconcileStatus = "active"
	RPIReconcileCompleted           RPIReconcileStatus = "completed"
	RPIReconcileLost                RPIReconcileStatus = "lost"
	RPIReconcileFailed              RPIReconcileStatus = "failed"
	RPIReconcileProviderUnreachable RPIReconcileStatus = "provider_unreachable"
	RPIReconcilePending             RPIReconcileStatus = "pending"
)

type RPIReconciler

type RPIReconciler struct {
	// contains filtered or unexported fields
}

func NewRPIReconciler

func NewRPIReconciler(store *Store, opts RPIReconcilerOptions) (*RPIReconciler, error)

func (*RPIReconciler) ReconcileRPIJobs

func (r *RPIReconciler) ReconcileRPIJobs(ctx context.Context) (RPIReconcileReport, error)

type RPIReconcilerOptions

type RPIReconcilerOptions struct {
	Queue          *Queue
	GasCityClient  RPIReconcileGasCityClient
	RegistryWriter cliRPI.RunRegistryWriter
	Actor          string
}

type RPIRegistryProjection

type RPIRegistryProjection struct {
	Runs []JobProjection `json:"runs"`
}

type RPIRunExecutor

type RPIRunExecutor struct {
	// contains filtered or unexported fields
}

RPIRunExecutor is a JobExecutor for rpi.run that calls the injected runner in-process. Replaces RPICLIExecutor's shell-out path on the daemon CLI-fallback wire-up. Mirrors DreamExecutor's function-pointer pattern.

soc-bcrn.3.6 (E3.W4 sub-5a): see RPIRunRequest for context. soc-y0ct.2: emits agent-update events on phase boundaries when store != nil.

func NewRPIRunExecutor

func NewRPIRunExecutor(opts RPIRunExecutorOptions) (*RPIRunExecutor, error)

NewRPIRunExecutor constructs an RPIRunExecutor. Run and Root are required; Store/Clock/Actor are optional and gate agent-update emission.

func (*RPIRunExecutor) JobTypes

func (e *RPIRunExecutor) JobTypes() []JobType

JobTypes reports the queue job types this executor handles.

func (*RPIRunExecutor) RunJob

RunJob parses the spec, validates it (only full cycles supported), and dispatches to the injected runner. Runner-emitted artifacts are merged on top of the executor-default artifacts so per-run output paths surface to the supervisor's terminal record.

RunJob expects a non-nil ctx but does not panic on nil: callers passing nil receive a Background ctx.

type RPIRunExecutorOptions

// RPIRunExecutorOptions is the argument to NewRPIRunExecutor.
//
// Run and Root are required. Store, Clock, and Actor are optional and gate
// agent-update emission: when Store is non-nil, RunJob emits agent-update
// ledger events on phase boundaries; when Store is nil, no events are emitted.
type RPIRunExecutorOptions struct {
	Run   RPIRunFunc
	Root  string
	Store *Store
	Clock func() time.Time
	Actor string
}

RPIRunExecutorOptions configures NewRPIRunExecutor.

Store/Clock/Actor are optional. When Store is non-nil, RunJob emits agent-update ledger events on phase boundaries (phase_start before the runner; phase_complete + phase_handoff after) per soc-y0ct.2. When Store is nil, no events are emitted — preserves back-compat with sub-wave 5a tests that injected a fake runner only.

type RPIRunFunc

type RPIRunFunc func(ctx context.Context, req RPIRunRequest) (RPIRunResult, error)

RPIRunFunc executes a single rpi.run claim in-process. The cmd/ao package (agentopsd.go) supplies the implementation; this lets the daemon package stay free of cmd/ao imports. Mirrors the function-pointer pattern used by DreamExecutor.

type RPIRunJobSpec

type RPIRunJobSpec struct {
	SchemaVersion       int        `json:"schema_version"`
	JobType             JobType    `json:"job_type"`
	RunID               string     `json:"run_id"`
	Goal                string     `json:"goal"`
	EpicID              string     `json:"epic_id,omitempty"`
	ExecutionPacketPath string     `json:"execution_packet_path,omitempty"`
	StartPhase          int        `json:"start_phase"`
	MaxPhase            int        `json:"max_phase"`
	Complexity          string     `json:"complexity,omitempty"`
	TestFirst           bool       `json:"test_first"`
	Backend             RPIBackend `json:"backend"`
	GasCityCityName     string     `json:"gascity_city_name,omitempty"`
	PhaseTimeout        string     `json:"phase_timeout,omitempty"`

	// Supervisor policy fields (soc-bcrn.3.8 / E3.W4 sub-5a-fix). These let
	// daemon-submitted rpi.run jobs preserve gate enforcement and landing
	// semantics that the legacy `ao rpi loop --supervisor` shell wrapper
	// provided. All are optional; zero values mean "let the supervisor pick a
	// safe default" so older payloads stay backward compatible.
	MaxCycles      int    `json:"max_cycles,omitempty"`
	GatePolicy     string `json:"gate_policy,omitempty"`    // off | best-effort | required
	LandingPolicy  string `json:"landing_policy,omitempty"` // off | commit | sync-push
	LandingBranch  string `json:"landing_branch,omitempty"`
	BDSyncPolicy   string `json:"bd_sync_policy,omitempty"` // auto | always | never
	FailurePolicy  string `json:"failure_policy,omitempty"` // stop | continue
	KillSwitchPath string `json:"kill_switch_path,omitempty"`
}

func NewRPIRunJobSpec

func NewRPIRunJobSpec(runID, goal string) RPIRunJobSpec

func RPIRunJobSpecFromPayload

func RPIRunJobSpecFromPayload(payload map[string]any) (RPIRunJobSpec, error)

func (RPIRunJobSpec) ToJobSpec

func (spec RPIRunJobSpec) ToJobSpec(jobID string) (JobSpec, error)

func (RPIRunJobSpec) Validate

func (spec RPIRunJobSpec) Validate() error

type RPIRunRequest

// RPIRunRequest is the per-job input passed to an RPIRunFunc.
//
// Spec is the parsed job spec, Claim is the original queue lease (so the
// runner can correlate IDs and heartbeat metadata), and Root is the daemon
// root cwd.
type RPIRunRequest struct {
	Spec  RPIRunJobSpec
	Claim QueueLease
	Root  string
}

RPIRunRequest is the per-job input handed to the runner function injected into RPIRunExecutor. It carries the parsed RPIRunJobSpec, the original QueueLease (so the runner can correlate IDs / heartbeat metadata), and the daemon root cwd.

soc-bcrn.3.6 (E3.W4 sub-5a): introduced as part of the in-process executor swap that retires RPICLIExecutor's shell-out path. The cmd/ao package supplies the runner so the daemon stays free of cmd/ao imports.

type RPIRunResult

// RPIRunResult is the per-job output of an RPIRunFunc. Its Artifacts are
// merged on top of the executor-default artifacts before being handed back to
// the supervisor.
type RPIRunResult struct {
	Artifacts map[string]string
}

RPIRunResult is the per-job output from the injected runner. Artifacts are merged on top of the executor-default artifacts before being handed back to the supervisor.

type RPIRunner

type RPIRunner struct {
	// contains filtered or unexported fields
}

func NewRPIRunner

func NewRPIRunner(store *Store, opts RPIRunnerOptions) (*RPIRunner, error)

func (*RPIRunner) ExecuteClaim

func (r *RPIRunner) ExecuteClaim(ctx context.Context, claim QueueLease) (map[string]string, string, error)

ExecuteClaim runs the user-visible RPI work for a job that has already been claimed by some caller (the supervisor's claim path is one such caller; the operator-driven `ao rpi run` is another). It does not handle claim, heartbeat, or terminal write — those are the caller's responsibility.

func (*RPIRunner) RunNextRPIJob

func (r *RPIRunner) RunNextRPIJob(ctx context.Context) (RPIJobRunResult, error)

func (*RPIRunner) RunRPIJob

func (r *RPIRunner) RunRPIJob(ctx context.Context, jobID string) (RPIJobRunResult, error)

type RPIRunnerOptions

type RPIRunnerOptions struct {
	Queue             *Queue
	Executor          RPIPhaseExecutor
	RegistryWriter    cliRPI.RunRegistryWriter
	Actor             string
	HeartbeatInterval time.Duration
	PromptBuilder     RPIPromptBuilder
}

type ReadOnlyEventsResponse

type ReadOnlyEventsResponse struct {
	Events      []LedgerEvent   `json:"events"`
	Corrupt     []CorruptRecord `json:"corrupt,omitempty"`
	LastEventID string          `json:"last_event_id,omitempty"`
}

type ReadOnlyHealthResponse

type ReadOnlyHealthResponse struct {
	Status   string `json:"status"`
	Daemon   string `json:"daemon"`
	ReadOnly bool   `json:"read_only"`
	Now      string `json:"now"`
}

type ReadOnlyReadyResponse

type ReadOnlyReadyResponse struct {
	Ready              bool                 `json:"ready"`
	LedgerReplayStatus SnapshotReplayStatus `json:"ledger_replay_status"`
	ProjectionStatus   ProjectionStatus     `json:"projection_status"`
	ProjectionLag      ProjectionLag        `json:"projection_lag"`
	DegradedReasons    []string             `json:"degraded_reasons,omitempty"`
}

type ReadOnlyServer

type ReadOnlyServer struct {
	// contains filtered or unexported fields
}

type ReadOnlyStatusResponse

type ReadOnlyStatusResponse struct {
	Ready         bool          `json:"ready"`
	ProjectionLag ProjectionLag `json:"projection_lag"`
	Queue         QueueSnapshot `json:"queue"`
	Projections   ProjectionSet `json:"projections"`
}

type RealClock

type RealClock struct{}

RealClock is the production Clock backed by the standard library.

func (RealClock) After

func (RealClock) After(d time.Duration) <-chan time.Time

func (RealClock) Now

func (RealClock) Now() time.Time

type RecurrenceBackpressure

// RecurrenceBackpressure controls how the recurrence supervisor handles
// in-flight schedules. It is carried on each RecurringJobTemplate.
//
// NOTE(review): field semantics are inferred from names — SkipIfRunning
// presumably suppresses a tick while a previous job from the same schedule is
// still running, and MaxQueueDepth presumably caps queued jobs; confirm
// (including what a zero MaxQueueDepth means).
type RecurrenceBackpressure struct {
	SkipIfRunning bool `json:"skip_if_running"`
	MaxQueueDepth int  `json:"max_queue_depth"`
}

RecurrenceBackpressure controls how the supervisor handles in-flight schedules.

type RecurrenceSupervisor

type RecurrenceSupervisor struct {
	// contains filtered or unexported fields
}

RecurrenceSupervisor ticks RecurringJobTemplate schedules on cron cadence and submits jobs into the daemon Queue. One Start() goroutine per supervisor.

func NewRecurrenceSupervisor

func NewRecurrenceSupervisor(store *Store, queue *Queue, clock Clock) *RecurrenceSupervisor

NewRecurrenceSupervisor builds a supervisor bound to the given store + queue. Production callers pass RealClock{}; tests pass a *FakeClock from clock_test.

func (*RecurrenceSupervisor) PollInterval

func (s *RecurrenceSupervisor) PollInterval() time.Duration

func (*RecurrenceSupervisor) Start

func (s *RecurrenceSupervisor) Start(ctx context.Context) error

Start runs the supervisor loop until ctx is cancelled. It re-reads the schedule list from store on every iteration to pick up adds/deletes.

func (*RecurrenceSupervisor) WithPollInterval

func (s *RecurrenceSupervisor) WithPollInterval(d time.Duration) *RecurrenceSupervisor

WithPollInterval overrides the cadence at which Start() re-reads the schedule list and runs tick(). Defaults to 1 minute (matches cron's 5-field minimum granularity, amendment B4).

type RecurringJobTemplate

// RecurringJobTemplate is one schedule entry; a job is materialized from it on
// each cron tick.
//
// Name identifies the entry: schedule.deleted events remove entries by name.
// NOTE(review): Cron is presumably a 5-field cron expression (the supervisor's
// poll-interval documentation mentions cron's 5-field granularity) — confirm
// which parser is used. Timeout is a time.Duration and so serializes to JSON
// as an integer nanosecond count under encoding/json defaults.
type RecurringJobTemplate struct {
	Name         string                 `json:"name"`
	Cron         string                 `json:"cron"`
	JobType      JobType                `json:"job_type"`
	Payload      json.RawMessage        `json:"payload,omitempty"`
	Timeout      time.Duration          `json:"timeout,omitempty"`
	Backpressure RecurrenceBackpressure `json:"backpressure"`
}

RecurringJobTemplate is a schedule entry that materializes a Job on each cron tick.

func ScheduleStateFromEvents

func ScheduleStateFromEvents(events []LedgerEvent) []RecurringJobTemplate

ScheduleStateFromEvents reduces a ledger event slice to the active schedule list. Insertion order is preserved (entries stay in creation order). schedule.deleted removes the entry by name. Unknown event types are ignored — schedule reduction only cares about its own vocabulary.

Exposed for tests + projection helpers in projections.go.

type ReplayResult

type ReplayResult struct {
	Events  []LedgerEvent   `json:"events"`
	Corrupt []CorruptRecord `json:"corrupt,omitempty"`
}

type RequestID

type RequestID string

type RoutingAuthority

type RoutingAuthority string
const (
	RoutingAuthorityObserve       RoutingAuthority = "OBSERVE"
	RoutingAuthorityAdvisory      RoutingAuthority = "ADVISORY"
	RoutingAuthorityDelegated     RoutingAuthority = "DELEGATED"
	RoutingAuthorityAuthoritative RoutingAuthority = "AUTHORITATIVE"
)

type RoutingLane

type RoutingLane struct {
	ID                 string                `json:"id"`
	Enabled            bool                  `json:"enabled"`
	Authority          RoutingAuthority      `json:"authority"`
	Provider           string                `json:"provider"`
	Runtime            string                `json:"runtime"`
	Model              string                `json:"model"`
	TaskClasses        []string              `json:"task_classes"`
	MaxConcurrency     int                   `json:"max_concurrency"`
	CostHintUSDPerHour float64               `json:"cost_hint_usd_per_hour,omitempty"`
	LatencyHint        string                `json:"latency_hint,omitempty"`
	QualityPrior       string                `json:"quality_prior,omitempty"`
	YieldGate          *RoutingYieldGate     `json:"yield_gate,omitempty"`
	PromotionGate      *RoutingPromotionGate `json:"promotion_gate,omitempty"`
	MergeEligibility   *RoutingMergeGate     `json:"merge_eligibility,omitempty"`
	DisabledReason     string                `json:"disabled_reason,omitempty"`
}

type RoutingMergeGate

type RoutingMergeGate struct {
	ManualMergeRequired               bool      `json:"manual_merge_required"`
	ValidationCommands                []string  `json:"validation_commands"`
	ValidationFailureTerminalEvent    EventType `json:"validation_failure_terminal_event"`
	RetainArtifactsOnFailure          bool      `json:"retain_artifacts_on_failure"`
	RetainWorktreeOnValidationFailure bool      `json:"retain_worktree_on_validation_failure"`
}

type RoutingPolicy

type RoutingPolicy struct {
	SchemaVersion        int           `json:"schema_version"`
	PolicyID             string        `json:"policy_id"`
	DefaultLane          string        `json:"default_lane"`
	MaxTotalConcurrency  int           `json:"max_total_concurrency"`
	AutoMergeEnabled     bool          `json:"auto_merge_enabled"`
	Lanes                []RoutingLane `json:"lanes"`
	ManualMergeByDefault bool          `json:"manual_merge_by_default"`
}

func DefaultFactoryRoutingPolicy

func DefaultFactoryRoutingPolicy() RoutingPolicy

func ParseRoutingPolicyJSON

func ParseRoutingPolicyJSON(data []byte) (RoutingPolicy, error)

func (RoutingPolicy) LaneByID

func (policy RoutingPolicy) LaneByID(id string) (RoutingLane, bool)

func (RoutingPolicy) SelectLane

func (policy RoutingPolicy) SelectLane(taskClass string) (RoutingLane, error)

type RoutingPromotionGate

type RoutingPromotionGate struct {
	RequiresYieldEvidence bool `json:"requires_yield_evidence"`
}

type RoutingYieldGate

type RoutingYieldGate struct {
	MinAcceptedPatchesPerHour float64 `json:"min_accepted_patches_per_hour"`
	MinSampleSize             int     `json:"min_sample_size"`
}

type ServerOptions

type ServerOptions struct {
	Now            func() time.Time
	SourceLedger   string
	QueueOptions   QueueOptions
	MutationPolicy MutationPolicy
}

type ServiceInstallPlan

// ServiceInstallPlan is the value returned by BuildServiceInstallPlan, which
// takes repoRoot, executable, address and dryRun arguments. Every field is
// always emitted in JSON except UnitPath, which is omitted when empty.
type ServiceInstallPlan struct {
	ServiceName string   `json:"service_name"`
	// NOTE(review): the set of Platform values is not visible here.
	Platform    string   `json:"platform"`
	// DryRun, Executable, RepoRoot and Address presumably mirror the
	// BuildServiceInstallPlan arguments of the same names — confirm.
	DryRun      bool     `json:"dry_run"`
	Executable  string   `json:"executable"`
	RepoRoot    string   `json:"repo_root"`
	Address     string   `json:"address"`
	// Args has no omitempty, so a nil slice encodes as JSON null.
	Args        []string `json:"args"`
	// UnitPath is dropped from the JSON output when it is the empty string.
	UnitPath    string   `json:"unit_path,omitempty"`
}

func BuildServiceInstallPlan

func BuildServiceInstallPlan(repoRoot, executable, address string, dryRun bool) ServiceInstallPlan

type SkillInvokeExecutor

type SkillInvokeExecutor struct {
	// contains filtered or unexported fields
}

func (*SkillInvokeExecutor) JobTypes

func (e *SkillInvokeExecutor) JobTypes() []JobType

func (*SkillInvokeExecutor) RunJob

type SkillInvokeExecutorOptions

// SkillInvokeExecutorOptions carries the settings for a SkillInvokeExecutor.
//
// NOTE(review): the constructor that consumes these options, and whether
// either field may be left zero, is not visible here.
type SkillInvokeExecutorOptions struct {
	// Root is a string; presumably the directory skills run against — confirm.
	Root string
	// Run is the SkillInvokeFunc supplied by the caller.
	Run  SkillInvokeFunc
}

type SkillInvokeJobSpec

// SkillInvokeJobSpec is the JSON-serializable description of a skill
// invocation job. It is produced by NewSkillInvokeJobSpec (from a skill name
// and args) and SkillInvokeJobSpecFromPayload (from a map payload), and it
// exposes Validate and ToJobSpec methods.
type SkillInvokeJobSpec struct {
	SchemaVersion int      `json:"schema_version"`
	JobType       JobType  `json:"job_type"`
	SkillName     string   `json:"skill_name"`
	// Args is omitted from the JSON output when the slice is empty or nil.
	Args          []string `json:"args,omitempty"`
}

func NewSkillInvokeJobSpec

func NewSkillInvokeJobSpec(skillName string, args []string) SkillInvokeJobSpec

func SkillInvokeJobSpecFromPayload

func SkillInvokeJobSpecFromPayload(payload map[string]any) (SkillInvokeJobSpec, error)

func (SkillInvokeJobSpec) ToJobSpec

func (spec SkillInvokeJobSpec) ToJobSpec(jobID string) (JobSpec, error)

func (SkillInvokeJobSpec) Validate

func (spec SkillInvokeJobSpec) Validate() error

type SkillInvokeRequest

// SkillInvokeRequest bundles a SkillInvokeJobSpec with a QueueLease and a
// root string. It has no JSON tags.
//
// NOTE(review): presumably the argument handed to a SkillInvokeFunc — the
// function type's signature is not visible here, confirm.
type SkillInvokeRequest struct {
	Spec  SkillInvokeJobSpec
	// Claim is the QueueLease associated with the request.
	Claim QueueLease
	Root  string
}

type SkillInvokeResult

// SkillInvokeResult carries the string-to-string Artifacts map of a skill
// invocation.
//
// NOTE(review): what the map keys and values denote (e.g. name -> path) is
// not visible here — confirm against the producer.
type SkillInvokeResult struct {
	Artifacts map[string]string
}

type SnapshotConsumerResult

// SnapshotConsumerResult is a string-typed enumeration. The four constants
// below are the values declared for it here.
//
// NOTE(review): the conditions that select each value are not visible here;
// the one-line glosses below are read off the names and string values only.
type SnapshotConsumerResult string
const (
	// "serve_snapshot": presumably the snapshot can be served as-is.
	SnapshotServe              SnapshotConsumerResult = "serve_snapshot"
	// "compatibility_error": presumably the snapshot version is unsupported.
	SnapshotCompatibilityError SnapshotConsumerResult = "compatibility_error"
	// "projection_missing": presumably no snapshot file exists.
	SnapshotProjectionMissing  SnapshotConsumerResult = "projection_missing"
	// "projection_degraded": presumably the snapshot is usable but impaired.
	SnapshotProjectionDegraded SnapshotConsumerResult = "projection_degraded"
)

type SnapshotProjectionInput

// SnapshotProjectionInput groups three facts about a projection snapshot: a
// SnapshotReplayStatus and two booleans. It has no JSON tags.
//
// NOTE(review): presumably the input from which a SnapshotConsumerResult is
// derived; the deriving function is not visible here — confirm.
type SnapshotProjectionInput struct {
	ReplayStatus     SnapshotReplayStatus
	FileExists       bool
	VersionSupported bool
}

type SnapshotReplayStatus

// SnapshotReplayStatus is a string-typed enumeration with the two values
// declared below. It is the type of SnapshotProjectionInput.ReplayStatus.
type SnapshotReplayStatus string
const (
	// "complete": presumably the ledger replay finished cleanly — confirm.
	SnapshotReplayComplete SnapshotReplayStatus = "complete"
	// "corrupt": presumably the replay hit corrupt data — confirm.
	SnapshotReplayCorrupt  SnapshotReplayStatus = "corrupt"
)

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store persists daemon ledger events to <root>/.agents/daemon/ledger.jsonl, where <root> is the path passed to NewStore. Concurrent writers are serialized through s.mu; readers (Replay*) operate on committed file contents and do not contend for the lock — O_APPEND atomicity covers per-line consistency.

func NewStore

func NewStore(root string) *Store

func (*Store) AppendLedgerEvent

func (s *Store) AppendLedgerEvent(event LedgerEvent) (LedgerEvent, error)

func (*Store) DeleteSchedule

func (s *Store) DeleteSchedule(name string) error

DeleteSchedule removes a schedule by appending a schedule.deleted event. Idempotent: deleting a non-existent schedule is a no-op (logs a warning but returns nil) so callers don't have to guard against races.

func (*Store) Dir

func (s *Store) Dir() string

func (*Store) LedgerArchivePaths

func (s *Store) LedgerArchivePaths() ([]string, error)

LedgerArchivePaths returns the rotated ledger archives in chronological order (oldest first). Each path is one of ledger.<ts>.jsonl[.gz]; the active ledger.jsonl is excluded.

func (*Store) LedgerHealth

func (s *Store) LedgerHealth(now time.Time, thresholds LedgerHealthThresholds) (LedgerHealth, error)

LedgerHealth gathers ledger-durability facts for the doctor surface. now must be supplied (caller's clock) so tests can be deterministic. thresholds uses LedgerHealthDefaultThresholds when its LedgerSizeWarnRatio is zero.

func (*Store) LedgerPath

func (s *Store) LedgerPath() string

func (*Store) ListProjectionSnapshots

func (s *Store) ListProjectionSnapshots() ([]string, error)

ListProjectionSnapshots returns every completed snapshot file under the snapshot dir, sorted chronologically (oldest first). Temp-file leftovers from a crashed write (such as snapshot-<ts>.json.tmp) are excluded; only completed snapshot-<ts>.json files are returned.

func (*Store) ListSchedules

func (s *Store) ListSchedules() ([]RecurringJobTemplate, error)

ListSchedules returns the current set of recurring schedules derived from ledger replay. A schedule is present iff its most recent schedule.created has not been followed by a matching schedule.deleted. Order matches the order schedules were created (stable, from the ledger).

func (*Store) LoadLatestProjectionSnapshot

func (s *Store) LoadLatestProjectionSnapshot() (ProjectionSet, string, error)

LoadLatestProjectionSnapshot finds the newest snapshot by filename ordering and json-decodes it. Returns (zero ProjectionSet, "", nil) when no snapshot exists — callers must distinguish empty-set from absent via the path.

func (*Store) ProjectionSnapshotDir

func (s *Store) ProjectionSnapshotDir() string

ProjectionSnapshotDir returns the on-disk location for projection snapshots. The dir is created lazily by WriteProjectionSnapshot.

func (*Store) QuarantineDir

func (s *Store) QuarantineDir() string

func (*Store) ReadLedger

func (s *Store) ReadLedger() ([]LedgerEvent, error)

func (*Store) RebuildProjections

func (s *Store) RebuildProjections(opts ProjectionRebuildOptions) (ProjectionSet, error)

RebuildProjections rebuilds the projection set by replaying the store's ledger and folding events through the package-level RebuildProjections.

Callers MUST check err before using the returned ProjectionSet. On error, the returned set is zero-valued: SchemaVersion == 0, RebuiltAt == "", Manifests == nil, and Jobs/Schedules/derived buckets are nil. Reading any map field (e.g., set.Manifests[name]) on a zero-valued set returns the zero value, but mutating those nil maps (e.g., set.Manifests[name] = ...) WILL panic. The bug-hunt audit (W-B-22 / soc-58q5.7) confirmed all in-tree callers (server.go readState, projections_test.go) check err first; this godoc is a guard for future callers.

func (*Store) RebuildRPIRegistryProjection

func (s *Store) RebuildRPIRegistryProjection() (DaemonRPIRegistryProjection, error)

func (*Store) RecordScheduleFired

func (s *Store) RecordScheduleFired(name, submissionID string, tickAt time.Time) error

RecordScheduleFired appends a schedule.fired event linking a recurrence tick to the submission_id of the job it enqueued. Used by the recurrence supervisor (soc-8inr.4).

func (*Store) RecordScheduleSkipped

func (s *Store) RecordScheduleSkipped(name, reason string, tickAt time.Time) error

RecordScheduleSkipped appends a schedule.skipped event when the supervisor elects not to enqueue a tick (e.g., backpressure: SkipIfRunning, MaxQueueDepth exceeded). reason is free-form so future backpressure strategies can record their own labels without a schema change.

func (*Store) ReplayLedger

func (s *Store) ReplayLedger() (ReplayResult, error)

func (*Store) ReplayLedgerReadOnly

func (s *Store) ReplayLedgerReadOnly() (ReplayResult, error)

func (*Store) SaveSchedule

func (s *Store) SaveSchedule(t RecurringJobTemplate) error

SaveSchedule persists a recurring job template by appending a schedule.created event to the ledger. Returns an error if a schedule with the same Name already exists (no upsert; callers must DeleteSchedule first to replace).

The full RecurringJobTemplate is serialized into the event payload under the "template" key so replay can reconstruct the schedule list.

func (*Store) WithLedgerMaxBytes

func (s *Store) WithLedgerMaxBytes(n int64) *Store

WithLedgerMaxBytes overrides the rotation threshold and returns the store for fluent configuration. Pass <=0 to disable rotation entirely.

func (*Store) WriteProjectionSnapshot

func (s *Store) WriteProjectionSnapshot(set ProjectionSet) (string, error)

WriteProjectionSnapshot writes the ProjectionSet to a timestamped file under the snapshot dir using a temp-file + atomic rename so concurrent readers never observe a partial JSON document. Returns the absolute path written.

type SubmitJobInput

// SubmitJobInput is the in-process form of a job submission. Compared with
// SubmitJobRequest it has no JSON tags, types RequestID as RequestID rather
// than string, and adds an Actor field.
//
// NOTE(review): which fields are required, and how IdempotencyKey is used
// for deduplication, is not visible here — confirm against the submit path.
type SubmitJobInput struct {
	RequestID      RequestID
	JobID          string
	JobType        JobType
	IdempotencyKey string
	// Actor is a string; presumably identifies who submitted the job.
	Actor          string
	// Payload is an untyped map; its schema presumably depends on JobType.
	Payload        map[string]any
}

type SubmitJobRequest

// SubmitJobRequest is the JSON wire form of a job submission. job_type is the
// only key without omitempty, so it is always emitted; every other field is
// dropped from the encoded output when empty.
type SubmitJobRequest struct {
	RequestID      string         `json:"request_id,omitempty"`
	JobID          string         `json:"job_id,omitempty"`
	JobType        JobType        `json:"job_type"`
	IdempotencyKey string         `json:"idempotency_key,omitempty"`
	// Payload is an untyped map; its schema presumably depends on JobType.
	Payload        map[string]any `json:"payload,omitempty"`
}

type SubmitJobResponse

// SubmitJobResponse is the JSON reply shape for a job submission. The keys
// last_event_id, degraded_reasons and idempotency_key are omitted when empty;
// all other keys are always emitted.
//
// NOTE(review): the glosses below are read off the field names and types;
// confirm against the handler that fills the response.
type SubmitJobResponse struct {
	// Presumably false when the submission was rejected.
	Accepted         bool             `json:"accepted"`
	RequestID        string           `json:"request_id"`
	JobID            string           `json:"job_id"`
	// Status is a JobStatus value for the job.
	Status           JobStatus        `json:"status"`
	LastEventID      string           `json:"last_event_id,omitempty"`
	// ProjectionStatus and ProjectionLag are typed values describing the
	// projection state at response time (presumably).
	ProjectionStatus ProjectionStatus `json:"projection_status"`
	ProjectionLag    ProjectionLag    `json:"projection_lag"`
	DegradedReasons  []string         `json:"degraded_reasons,omitempty"`
	IdempotencyKey   string           `json:"idempotency_key,omitempty"`
}

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor claims queue jobs, runs executors, and records terminal state.

func NewSupervisor

func NewSupervisor(opts SupervisorOptions) (*Supervisor, error)

NewSupervisor builds a queue supervisor from explicit executors.

func (*Supervisor) RunLoop

func (s *Supervisor) RunLoop(ctx context.Context) error

RunLoop claims and executes supported jobs until the context is cancelled.

func (*Supervisor) RunOnce

RunOnce attempts to claim and execute one supported job.

type SupervisorOptions

// SupervisorOptions configures a daemon queue supervisor and is the argument
// to NewSupervisor, which builds the supervisor from explicit executors.
//
// NOTE(review): defaults applied to zero-valued durations or an empty Actor
// are not visible here — confirm against NewSupervisor.
type SupervisorOptions struct {
	// Queue is the *Queue the supervisor works against.
	Queue             *Queue
	// Executors is the explicit list of JobExecutor implementations.
	Executors         []JobExecutor
	Actor             string
	// The three durations presumably set the claim polling cadence, the
	// lease heartbeat cadence, and the per-job time limit — confirm.
	PollInterval      time.Duration
	HeartbeatInterval time.Duration
	ExecutionTimeout  time.Duration
}

SupervisorOptions configures a daemon queue supervisor.

type SupervisorRunOnceResult

// SupervisorRunOnceResult reports one supervisor claim attempt.
type SupervisorRunOnceResult struct {
	// Claimed presumably reports whether a job was claimed on this attempt;
	// Job is then the QueueJobState of that job — confirm what Job holds when
	// Claimed is false.
	Claimed bool
	Job     QueueJobState
}

SupervisorRunOnceResult reports one supervisor claim attempt.

type WikiForgeExecutor

type WikiForgeExecutor struct {
	// contains filtered or unexported fields
}

func NewWikiForgeExecutor

func NewWikiForgeExecutor(opts WikiForgeExecutorOptions) (*WikiForgeExecutor, error)

func (*WikiForgeExecutor) JobTypes

func (e *WikiForgeExecutor) JobTypes() []JobType

func (*WikiForgeExecutor) RunJob

type WikiForgeExecutorOptions

// WikiForgeExecutorOptions is the argument to NewWikiForgeExecutor, which
// returns an error alongside the executor.
//
// NOTE(review): which fields are mandatory (and so can cause that error) is
// not visible here — confirm against the constructor.
type WikiForgeExecutorOptions struct {
	Store         *Store
	// Worker is the WikiForgeWorker implementation used by the executor.
	Worker        WikiForgeWorker
	// QuarantineDir is a string; presumably a directory path — confirm.
	QuarantineDir string
}

type WikiForgeJobRunResult

// WikiForgeJobRunResult is the value returned by
// (*WikiForgeRunner).RunWikiForgeJob. In JSON, job_id and status are always
// emitted; every other key is omitted when empty or nil.
type WikiForgeJobRunResult struct {
	JobID          string                 `json:"job_id"`
	DreamRunID     string                 `json:"dream_run_id,omitempty"`
	Status         JobStatus              `json:"status"`
	// Artifacts and ArtifactRefs are both keyed by string.
	// NOTE(review): whether they share the same key set is not visible here.
	Artifacts      map[string]string      `json:"artifacts,omitempty"`
	ArtifactRefs   map[string]ArtifactRef `json:"artifact_refs,omitempty"`
	WorkerSessions []WikiWorkerSessionRef `json:"worker_sessions,omitempty"`
	// Failure is a pointer and is omitted from JSON when nil; presumably set
	// only for unsuccessful runs — confirm.
	Failure        *JobFailure            `json:"failure,omitempty"`
}

type WikiForgeJobSpec

// WikiForgeJobSpec is the JSON-serializable description of a wiki forge job.
// It is produced by NewWikiForgeJobSpec (from a dream run ID, output dir and
// source paths) and WikiForgeJobSpecFromPayload (from a map payload), and it
// exposes Validate and ToJobSpec methods.
//
// schema_version, job_type, source_paths, output_dir, worker_kind and
// provider are always emitted; the remaining keys are omitted when empty.
type WikiForgeJobSpec struct {
	SchemaVersion int                    `json:"schema_version"`
	JobType       JobType                `json:"job_type"`
	DreamRunID    string                 `json:"dream_run_id,omitempty"`
	// SourcePaths has no omitempty, so a nil slice encodes as JSON null.
	SourcePaths   []string               `json:"source_paths"`
	OutputDir     string                 `json:"output_dir"`
	// WorkerKind and Provider use types from the agentworker package.
	WorkerKind    agentworker.WorkerKind `json:"worker_kind"`
	Provider      agentworker.Provider   `json:"provider"`
	Model         string                 `json:"model,omitempty"`
	CWD           string                 `json:"cwd,omitempty"`
	// NOTE(review): how a zero MaxAttempts is treated is not visible here.
	MaxAttempts   int                    `json:"max_attempts,omitempty"`
	QuarantineDir string                 `json:"quarantine_dir,omitempty"`
}

func NewWikiForgeJobSpec

func NewWikiForgeJobSpec(dreamRunID, outputDir string, sourcePaths []string) WikiForgeJobSpec

func WikiForgeJobSpecFromPayload

func WikiForgeJobSpecFromPayload(payload map[string]any) (WikiForgeJobSpec, error)

func (WikiForgeJobSpec) ToJobSpec

func (spec WikiForgeJobSpec) ToJobSpec(jobID string) (JobSpec, error)

func (WikiForgeJobSpec) Validate

func (spec WikiForgeJobSpec) Validate() error

type WikiForgeRunner

type WikiForgeRunner struct {
	// contains filtered or unexported fields
}

func NewWikiForgeRunner

func NewWikiForgeRunner(store *Store, opts WikiForgeRunnerOptions) (*WikiForgeRunner, error)

func (*WikiForgeRunner) RunWikiForgeJob

func (r *WikiForgeRunner) RunWikiForgeJob(ctx context.Context, jobID string) (WikiForgeJobRunResult, error)

type WikiForgeRunnerOptions

// WikiForgeRunnerOptions is the options argument to NewWikiForgeRunner, which
// also takes the *Store separately and can return an error.
//
// NOTE(review): which fields are mandatory is not visible here — confirm
// against the constructor.
type WikiForgeRunnerOptions struct {
	Queue         *Queue
	// Worker is the WikiForgeWorker implementation used by the runner.
	Worker        WikiForgeWorker
	Actor         string
	// QuarantineDir is a string; presumably a directory path — confirm.
	QuarantineDir string
}

type WikiForgeWorker

// WikiForgeWorker is the single-method interface that WikiForgeExecutorOptions
// and WikiForgeRunnerOptions accept in their Worker fields.
type WikiForgeWorker interface {
	// RunExtractionWithRetry takes a context, a wikiworker.ExtractionRequest
	// and wikiworker.RetryOptions, and returns a wikiworker.ExtractionResult
	// or an error.
	// NOTE(review): the retry semantics are defined by the implementation and
	// are not visible here.
	RunExtractionWithRetry(context.Context, wikiworker.ExtractionRequest, wikiworker.RetryOptions) (wikiworker.ExtractionResult, error)
}

type WikiJobsProjection

// WikiJobsProjection wraps a list of JobProjection values under the JSON key
// "jobs". The tag has no omitempty, so a nil slice encodes as JSON null.
//
// NOTE(review): presumably limited to wiki-related jobs; the filter that
// builds the list is not visible here — confirm.
type WikiJobsProjection struct {
	Jobs []JobProjection `json:"jobs"`
}

type WikiWorkerSessionRef

// WikiWorkerSessionRef pairs a source path with an agentworker.SessionRef and
// an agentworker.TerminalState. It is the element type of
// WikiForgeJobRunResult.WorkerSessions, and all three keys are always emitted
// in JSON.
type WikiWorkerSessionRef struct {
	// NOTE(review): presumably one of the job's source paths — confirm.
	SourcePath string                    `json:"source_path"`
	Session    agentworker.SessionRef    `json:"session"`
	Terminal   agentworker.TerminalState `json:"terminal"`
}

type WorkerKindDistribution

// WorkerKindDistribution pairs a worker-kind string with an integer count;
// both keys are always emitted in JSON.
//
// NOTE(review): what is being counted (jobs, sessions, ...) is not visible
// here — confirm against the code that builds the distribution.
type WorkerKindDistribution struct {
	WorkerKind string `json:"worker_kind"`
	Count      int    `json:"count"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL