Documentation
¶
Index ¶
- Constants
- Variables
- func AuthorizeMutation(r *http.Request, policy MutationPolicy) error
- func CanTransitionJobStatus(from, to JobStatus) bool
- func DefaultMutationPathCapabilities() map[string]MutationCapability
- func FormatLedgerTelemetrySummary(telemetry LedgerTelemetry) string
- func LoadMutationTokenFile(path string) (string, error)
- func MutationRoutesForTest() []string
- func NewDaemonRouter(store *Store, opts ServerOptions) http.Handler
- func NewDbBdSource(dsn string) (*dbBdSource, error)
- func NewReadOnlyRouter(store *Store, opts ServerOptions) http.Handler
- func ParseCron(expr string) (cron.Schedule, error)
- func RPIPhaseName(phase int) string
- func ValidateDaemonPlansProjection(projection DaemonPlansProjection) error
- func ValidateDreamJobSpec(job JobSpec) error
- func ValidateDreamMode(mode DreamMode) error
- func ValidateDreamStage(stage DreamStage) error
- func ValidateEventType(value EventType) error
- func ValidateFactoryAdmissionMode(mode FactoryAdmissionMode) error
- func ValidateFactoryCIStatus(status FactoryCIStatus) error
- func ValidateFactoryDigestPolicy(policy FactoryDigestPolicy) error
- func ValidateFactoryHandoffKind(kind FactoryHandoffKind) error
- func ValidateFactoryLandingPolicy(policy FactoryLandingPolicy) error
- func ValidateFactoryMergeDecision(value FactoryMergeDecision) error
- func ValidateFactorySlotStatus(value FactorySlotStatus) error
- func ValidateFactoryTargetType(targetType FactoryTargetType) error
- func ValidateFactoryUnknownEvidencePolicy(policy FactoryUnknownEvidencePolicy) error
- func ValidateFactoryValidationStatus(value FactoryValidationStatus) error
- func ValidateFailureCode(value FailureCode) error
- func ValidateJobResultStatus(value JobResultStatus) error
- func ValidateJobStatus(value JobStatus) error
- func ValidateJobType(value JobType) error
- func ValidateLeaseState(value LeaseState) error
- func ValidateLedgerEvent(event LedgerEvent) error
- func ValidateLocalBindAddress(addr string) error
- func ValidateRPIBackend(backend RPIBackend) error
- func ValidateRPIJobSpec(job JobSpec) error
- func ValidateRPIRegistryProjection(projection DaemonRPIRegistryProjection) error
- func ValidateRecurringJobTemplatePayload(t RecurringJobTemplate) error
- func ValidateRequestID(value string) error
- func ValidateRoutingAuthority(authority RoutingAuthority) error
- func ValidateRoutingPolicy(policy RoutingPolicy) error
- func WriteDaemonPlansProjection(root string, projection DaemonPlansProjection) (string, error)
- func WriteRPIRegistryProjection(root string, projection DaemonRPIRegistryProjection, ...) error
- type AgentUpdateCriterionVerdict
- type AgentUpdatePhaseComplete
- type AgentUpdatePhaseHandoff
- type AgentUpdatePhaseStart
- type ArtifactRef
- type ArtifactStoreOptions
- type CancelJobInput
- type CancelJobOutcome
- type CancelJobRequest
- type CancelJobResponse
- type CancelJobResult
- type Clock
- type CompleteJobInput
- type ContentAddressedArtifactStore
- type CorruptRecord
- type CreateScheduleResponse
- type CronParseError
- type DaemonPlansProjection
- type DaemonRPIRegistryProjection
- type DefaultRPIPromptBuilder
- type DeleteScheduleResponse
- type DreamExecutor
- type DreamExecutorOptions
- type DreamMode
- type DreamRunJobSpec
- type DreamRunLoopFunc
- type DreamRunLoopOptions
- type DreamRunLoopResult
- type DreamRunsProjection
- type DreamStage
- type DreamStageEntry
- type DreamStageJobSpec
- type DreamStageManifest
- type EventType
- type FactoryAdmissionDecision
- type FactoryAdmissionEvaluator
- type FactoryAdmissionEvidenceProvider
- type FactoryAdmissionEvidenceProviderFactory
- type FactoryAdmissionExecutor
- type FactoryAdmissionExecutorOptions
- type FactoryAdmissionJobSpec
- type FactoryAdmissionMode
- type FactoryAdmissionProjection
- type FactoryCIBaselineEvidence
- type FactoryCIStatus
- type FactoryDecisionEvidence
- type FactoryDigestPolicy
- type FactoryEventRef
- type FactoryHandoff
- type FactoryHandoffKind
- type FactoryJobProjection
- type FactoryJobStatus
- type FactoryLandingPolicy
- type FactoryLocalPilotJobSpec
- type FactoryMainCIBaseline
- type FactoryMergeDecision
- type FactoryMergeDecisionProjection
- type FactoryModelLaneProjection
- type FactoryOpenPRBlocker
- type FactoryPRBlockerMatrix
- type FactoryPointer
- type FactoryQueueLaneProjection
- type FactoryRepoState
- type FactoryRoutingDecisionProjection
- type FactorySlotProjection
- type FactorySlotStatus
- type FactoryStatusProjection
- type FactoryTarget
- type FactoryTargetType
- type FactoryTerminalProjection
- type FactoryUnknownEvidencePolicy
- type FactoryValidationProjection
- type FactoryValidationStatus
- type FactoryWorkOrder
- type FactoryWorkerProjection
- type FactoryWorktreeProjection
- type FailJobInput
- type FailureCode
- type GasCityClientAdapter
- func (a GasCityClientAdapter) CityReadiness(ctx context.Context, cityName string) (gascity.ReadinessResponse, error)
- func (a GasCityClientAdapter) CreateSession(ctx context.Context, cityName string, req gascity.SessionCreateRequest) (gascity.Session, gascity.ResponseMeta, error)
- func (a GasCityClientAdapter) GetSession(ctx context.Context, cityName string, id string, ...) (gascity.Session, gascity.ResponseMeta, error)
- func (a GasCityClientAdapter) SessionTranscript(ctx context.Context, cityName string, id string, ...) (gascity.TranscriptResponse, gascity.ResponseMeta, error)
- func (a GasCityClientAdapter) StreamCityEvents(ctx context.Context, cityName string, opts gascity.EventStreamOptions) (GasCityRPIEventStream, gascity.ResponseMeta, error)
- func (a GasCityClientAdapter) SubmitSession(ctx context.Context, cityName string, id string, ...) (gascity.SessionSubmitResponse, gascity.ResponseMeta, error)
- type GasCityRPIClient
- type GasCityRPIEventStream
- type GasCityRPIPhaseExecutor
- type HeartbeatInput
- type JobExecutionResult
- type JobExecutor
- type JobFailure
- type JobProjection
- type JobResult
- type JobResultStatus
- type JobSpec
- type JobStatus
- type JobStatusProjectionInput
- type JobType
- type JobTypeFailureRateSummary
- type LeaseState
- type LedgerEvent
- type LedgerEventInput
- func NewAgentUpdateCriterionVerdictEvent(p AgentUpdateCriterionVerdict) LedgerEventInput
- func NewAgentUpdatePhaseCompleteEvent(p AgentUpdatePhaseComplete) LedgerEventInput
- func NewAgentUpdatePhaseHandoffEvent(p AgentUpdatePhaseHandoff) LedgerEventInput
- func NewAgentUpdatePhaseStartEvent(p AgentUpdatePhaseStart) LedgerEventInput
- type LedgerHealth
- type LedgerHealthThresholds
- type LedgerTelemetry
- type ListSchedulesResponse
- type LocalFactoryAdmissionEvidenceProvider
- func (p LocalFactoryAdmissionEvidenceProvider) MainCIBaseline(context.Context) (FactoryCIBaselineEvidence, error)
- func (p LocalFactoryAdmissionEvidenceProvider) OpenPRBlockers(context.Context, []string) (FactoryPRBlockerMatrix, error)
- func (p LocalFactoryAdmissionEvidenceProvider) RepoState(ctx context.Context) (FactoryRepoState, error)
- type MutationCapability
- type MutationDecision
- type MutationPolicy
- type MutationPolicyProvider
- type MutationToken
- type OpenClawResources
- type OpenClawSnapshotProjection
- type PhaseLatencyHistogram
- type PlansBdSource
- type PlansProjectionEntry
- type PlansProjectionExecutor
- type PlansProjectionExecutorOptions
- type PlansProjectionJobSpec
- type PlansProjectionRefreshTrigger
- type ProjectionLag
- type ProjectionManifest
- type ProjectionName
- type ProjectionRebuildOptions
- type ProjectionSet
- type ProjectionSource
- type ProjectionStatus
- type ProviderProjectionInput
- type ProviderStatus
- type Queue
- func (q *Queue) CancelJob(input CancelJobInput, opts QueueMutationOptions) (CancelJobResult, error)
- func (q *Queue) ClaimJob(jobID, actor string, opts QueueMutationOptions) (QueueLease, error)
- func (q *Queue) ClaimNext(actor string, opts QueueMutationOptions) (QueueLease, error)
- func (q *Queue) ClaimNextMatching(actor string, match func(QueueJobState) bool, opts QueueMutationOptions) (QueueLease, error)
- func (q *Queue) CompleteJob(input CompleteJobInput, opts QueueMutationOptions) (QueueJobState, error)
- func (q *Queue) FailJob(input FailJobInput, opts QueueMutationOptions) (QueueJobState, error)
- func (q *Queue) Heartbeat(input HeartbeatInput, opts QueueMutationOptions) (QueueJobState, error)
- func (q *Queue) Snapshot() (QueueSnapshot, error)
- func (q *Queue) SubmitJob(input SubmitJobInput, opts QueueMutationOptions) (QueueJobState, error)
- type QueueFailpoint
- type QueueJobState
- type QueueLease
- type QueueMutationOptions
- type QueueOptions
- type QueueSnapshot
- type RPIBackend
- type RPIJobExecutor
- type RPIJobExecutorOptions
- type RPIJobRunResult
- type RPIPhaseExecutionError
- type RPIPhaseExecutionRequest
- type RPIPhaseExecutionResult
- type RPIPhaseExecutor
- type RPIPhaseJobSpec
- type RPIPhaseProgress
- type RPIPhaseProgressFunc
- type RPIPromptBuilder
- type RPIPromptBuilderFunc
- type RPIReconcileGasCityClient
- type RPIReconcileJob
- type RPIReconcileReport
- type RPIReconcileStatus
- type RPIReconciler
- type RPIReconcilerOptions
- type RPIRegistryProjection
- type RPIRunExecutor
- type RPIRunExecutorOptions
- type RPIRunFunc
- type RPIRunJobSpec
- type RPIRunRequest
- type RPIRunResult
- type RPIRunner
- type RPIRunnerOptions
- type ReadOnlyEventsResponse
- type ReadOnlyHealthResponse
- type ReadOnlyReadyResponse
- type ReadOnlyServer
- type ReadOnlyStatusResponse
- type RealClock
- type RecurrenceBackpressure
- type RecurrenceSupervisor
- type RecurringJobTemplate
- type ReplayResult
- type RequestID
- type RoutingAuthority
- type RoutingLane
- type RoutingMergeGate
- type RoutingPolicy
- type RoutingPromotionGate
- type RoutingYieldGate
- type ServerOptions
- type ServiceInstallPlan
- type SkillInvokeExecutor
- type SkillInvokeExecutorOptions
- type SkillInvokeFunc
- type SkillInvokeJobSpec
- type SkillInvokeRequest
- type SkillInvokeResult
- type SnapshotConsumerResult
- type SnapshotProjectionInput
- type SnapshotReplayStatus
- type Store
- func (s *Store) AppendLedgerEvent(event LedgerEvent) (LedgerEvent, error)
- func (s *Store) DeleteSchedule(name string) error
- func (s *Store) Dir() string
- func (s *Store) LedgerArchivePaths() ([]string, error)
- func (s *Store) LedgerHealth(now time.Time, thresholds LedgerHealthThresholds) (LedgerHealth, error)
- func (s *Store) LedgerPath() string
- func (s *Store) ListProjectionSnapshots() ([]string, error)
- func (s *Store) ListSchedules() ([]RecurringJobTemplate, error)
- func (s *Store) LoadLatestProjectionSnapshot() (ProjectionSet, string, error)
- func (s *Store) ProjectionSnapshotDir() string
- func (s *Store) QuarantineDir() string
- func (s *Store) ReadLedger() ([]LedgerEvent, error)
- func (s *Store) RebuildProjections(opts ProjectionRebuildOptions) (ProjectionSet, error)
- func (s *Store) RebuildRPIRegistryProjection() (DaemonRPIRegistryProjection, error)
- func (s *Store) RecordScheduleFired(name, submissionID string, tickAt time.Time) error
- func (s *Store) RecordScheduleSkipped(name, reason string, tickAt time.Time) error
- func (s *Store) ReplayLedger() (ReplayResult, error)
- func (s *Store) ReplayLedgerReadOnly() (ReplayResult, error)
- func (s *Store) SaveSchedule(t RecurringJobTemplate) error
- func (s *Store) WithLedgerMaxBytes(n int64) *Store
- func (s *Store) WriteProjectionSnapshot(set ProjectionSet) (string, error)
- type SubmitJobInput
- type SubmitJobRequest
- type SubmitJobResponse
- type Supervisor
- type SupervisorOptions
- type SupervisorRunOnceResult
- type WikiForgeExecutor
- type WikiForgeExecutorOptions
- type WikiForgeJobRunResult
- type WikiForgeJobSpec
- type WikiForgeRunner
- type WikiForgeRunnerOptions
- type WikiForgeWorker
- type WikiJobsProjection
- type WikiWorkerSessionRef
- type WorkerKindDistribution
Constants ¶
const ( FactoryAdmissionReasonEvidenceProviderMissing = "evidence_provider_missing" FactoryAdmissionReasonExpiredWorkOrder = "expired_work_order" FactoryAdmissionReasonFutureGeneratedAt = "future_generated_at" FactoryAdmissionReasonRepoStateUnknown = "repo_state_unknown" FactoryAdmissionReasonRepoHeadUnknown = "repo_head_unknown" FactoryAdmissionReasonBaseSHAMismatch = "base_sha_mismatch" FactoryAdmissionReasonDirtyWorktree = "dirty_worktree" FactoryAdmissionReasonTrackedAgents = "tracked_agents" FactoryAdmissionReasonOpenPREvidenceUnknown = "open_pr_evidence_unknown" FactoryAdmissionReasonOpenPROverlap = "open_pr_overlap" FactoryAdmissionReasonMainCIUnknown = "main_ci_unknown" FactoryAdmissionReasonMainCIRed = "main_ci_red" )
const ( // ProjectionSnapshotDirName lives under .agents/daemon/. ProjectionSnapshotDirName = "projections" // ProjectionSnapshotPrefix matches files of the form snapshot-<ts>.json so // the timestamp encoded in the filename gives a stable chronological sort. ProjectionSnapshotPrefix = "snapshot-" ProjectionSnapshotSuffix = ".json" )
const ( StoreDirRel = ".agents/daemon" LedgerFileName = "ledger.jsonl" QuarantineDirName = "quarantine" LedgerSchemaVersion = 1 // DefaultLedgerMaxBytes is the size threshold at which ledger.jsonl is // rotated. Operators may override via Store.WithLedgerMaxBytes. DefaultLedgerMaxBytes int64 = 50 * 1024 * 1024 // MaxLedgerLineBytes caps the size of a single ledger event line on // replay. Lines that exceed this are recorded as corrupt and skipped, so // a single oversized record cannot panic the daemon at startup. Paired // with the submission-time cap in server.go (MaxJobSubmissionBytes); the // replay cap is intentionally higher because daemon-emitted events can // be larger than client-supplied job payloads. MaxLedgerLineBytes = 16 * 1024 * 1024 )
const AgentUpdateVersion = 1
AgentUpdateVersion is the wire-version stamped onto every agent-update event payload. Consumers (projections, transcripts, downstream judges) use it to pin parsing to the schema in schemas/agent-update.schema.json.
soc-y0ct.1 (UW7-1 of soc-bcrn): introduced the agent-update protocol surface.
const DaemonPlansProjectionSchemaVersion = 1
DaemonPlansProjectionSchemaVersion is the schema version for the daemon-side plans manifest projection.
const (
DefaultMutationTokenHeader = "X-AgentOps-Daemon-Token" // #nosec G101 -- HTTP header name, not a credential.
)
const DefaultTelemetryWindow = 24 * time.Hour
const DreamJobSpecSchemaVersion = 1
const FactoryAdmissionJobSpecSchemaVersion = 1
const MaxJobSubmissionBytes = 1 * 1024 * 1024
MaxJobSubmissionBytes caps how large a /v1/jobs (and /v1/jobs/cancel) request body may be. Submissions larger than this return 413 instead of being decoded, which prevents oversized job payloads from making it into the append-only ledger as events that would later overflow the replay reader. Paired with MaxLedgerLineBytes in store.go (replay tolerates larger lines than submission so daemon-emitted events have headroom).
const PlansProjectionJobSpecSchemaVersion = 1
PlansProjectionJobSpecSchemaVersion is the schema version for PlansProjectionJobSpec round-trips. Bumped when the spec shape changes.
const ProjectionSchemaVersion = 1
const RPIJobSpecSchemaVersion = 1
const RoutingPolicySchemaVersion = 1
const SkillInvokeJobSpecSchemaVersion = 1
const WikiJobSpecSchemaVersion = 1
Variables ¶
var ( ErrMutationDenied = errors.New("daemon mutation denied") ErrUnsafeBindAddress = errors.New("daemon bind address is not loopback") ErrUnsafeTokenFileMode = errors.New("daemon mutation token file mode is too permissive") ErrMutationTokenMissing = errors.New("daemon mutation token is missing") )
var ( ErrNoClaimableJobs = errors.New("daemon queue: no claimable jobs") ErrJobAlreadyClaimed = errors.New("daemon queue: job already claimed") ErrClaimFenceMismatch = errors.New("daemon queue: claim token or lease epoch mismatch") ErrLeaseExpired = errors.New("daemon queue: lease expired") ErrJobNotFound = errors.New("daemon queue: job not found") ErrFailpoint = errors.New("daemon queue failpoint") )
var ( ErrRPIExecutorRequired = errors.New("daemon rpi runner: executor is required") ErrNoRPIJobs = errors.New("daemon rpi runner: no claimable RPI jobs") )
Functions ¶
func AuthorizeMutation ¶
func AuthorizeMutation(r *http.Request, policy MutationPolicy) error
func CanTransitionJobStatus ¶
func DefaultMutationPathCapabilities ¶
func DefaultMutationPathCapabilities() map[string]MutationCapability
func FormatLedgerTelemetrySummary ¶
func FormatLedgerTelemetrySummary(telemetry LedgerTelemetry) string
func LoadMutationTokenFile ¶
func MutationRoutesForTest ¶
func MutationRoutesForTest() []string
MutationRoutesForTest returns a copy of the registered mutation paths. Used by TestMutation_AllRegisteredRoutesEnforcePolicy and similar parity checks to assert every tracked route enforces auth.
func NewDaemonRouter ¶
func NewDaemonRouter(store *Store, opts ServerOptions) http.Handler
func NewDbBdSource ¶
NewDbBdSource opens a connection pool for plans.projection bd queries. The caller owns Close(). dsn examples:
- "root:@tcp(127.0.0.1:3306)/bushido" (from bushido itself)
- "root:@tcp(100.109.17.108:3306)/bushido" (from Mac via tailnet)
func NewReadOnlyRouter ¶
func NewReadOnlyRouter(store *Store, opts ServerOptions) http.Handler
func ParseCron ¶
ParseCron validates a cron expression using the 5-field standard with descriptors (e.g., "0 3 * * *", "@daily"). 6-field expressions with seconds are rejected per pre-mortem amendment B4 (DoS protection: prevents sub-minute schedules).
func RPIPhaseName ¶
func ValidateDaemonPlansProjection ¶
func ValidateDaemonPlansProjection(projection DaemonPlansProjection) error
ValidateDaemonPlansProjection returns a descriptive error if the projection shape is malformed. Used by replay paths and tests.
func ValidateDreamJobSpec ¶
func ValidateDreamMode ¶
func ValidateDreamStage ¶
func ValidateDreamStage(stage DreamStage) error
func ValidateEventType ¶
func ValidateFactoryAdmissionMode ¶
func ValidateFactoryAdmissionMode(mode FactoryAdmissionMode) error
func ValidateFactoryCIStatus ¶
func ValidateFactoryCIStatus(status FactoryCIStatus) error
func ValidateFactoryDigestPolicy ¶
func ValidateFactoryDigestPolicy(policy FactoryDigestPolicy) error
func ValidateFactoryHandoffKind ¶
func ValidateFactoryHandoffKind(kind FactoryHandoffKind) error
func ValidateFactoryLandingPolicy ¶
func ValidateFactoryLandingPolicy(policy FactoryLandingPolicy) error
func ValidateFactoryMergeDecision ¶
func ValidateFactoryMergeDecision(value FactoryMergeDecision) error
func ValidateFactorySlotStatus ¶
func ValidateFactorySlotStatus(value FactorySlotStatus) error
func ValidateFactoryTargetType ¶
func ValidateFactoryTargetType(targetType FactoryTargetType) error
func ValidateFactoryUnknownEvidencePolicy ¶
func ValidateFactoryUnknownEvidencePolicy(policy FactoryUnknownEvidencePolicy) error
func ValidateFactoryValidationStatus ¶
func ValidateFactoryValidationStatus(value FactoryValidationStatus) error
func ValidateFailureCode ¶
func ValidateFailureCode(value FailureCode) error
func ValidateJobResultStatus ¶
func ValidateJobResultStatus(value JobResultStatus) error
func ValidateJobStatus ¶
func ValidateJobType ¶
func ValidateLeaseState ¶
func ValidateLeaseState(value LeaseState) error
func ValidateLedgerEvent ¶
func ValidateLedgerEvent(event LedgerEvent) error
func ValidateRPIBackend ¶
func ValidateRPIBackend(backend RPIBackend) error
func ValidateRPIJobSpec ¶
func ValidateRPIRegistryProjection ¶
func ValidateRPIRegistryProjection(projection DaemonRPIRegistryProjection) error
func ValidateRecurringJobTemplatePayload ¶
func ValidateRecurringJobTemplatePayload(t RecurringJobTemplate) error
ValidateRecurringJobTemplatePayload applies the same defaults the recurrence supervisor applies at fire time, then validates the materialized job payload.
func ValidateRequestID ¶
func ValidateRoutingAuthority ¶
func ValidateRoutingAuthority(authority RoutingAuthority) error
func ValidateRoutingPolicy ¶
func ValidateRoutingPolicy(policy RoutingPolicy) error
func WriteDaemonPlansProjection ¶
func WriteDaemonPlansProjection(root string, projection DaemonPlansProjection) (string, error)
WriteDaemonPlansProjection writes the plans manifest snapshot atomically: the entries are serialised to a JSONL file under root, written to a tmp file in the same directory, then renamed to the final path. Concurrent writers in the same OutputDir are protected externally via the manifest.lock file lock added by atom-3 (G3); the write call here is single-writer-safe under that contract.
func WriteRPIRegistryProjection ¶
func WriteRPIRegistryProjection(root string, projection DaemonRPIRegistryProjection, writer cliRPI.RunRegistryWriter) error
Types ¶
type AgentUpdateCriterionVerdict ¶
type AgentUpdateCriterionVerdict struct {
CriterionID string `json:"criterion_id"`
Status string `json:"status"`
EvidencePath string `json:"evidence_path,omitempty"`
Notes string `json:"notes,omitempty"`
RunID string `json:"run_id"`
Timestamp string `json:"timestamp"`
}
AgentUpdateCriterionVerdict payloads carry a per-criterion judgement. Status is one of "PASS" | "FAIL" | "SKIP". Mirrors $defs/criterion_verdict in schemas/agent-update.schema.json.
type AgentUpdatePhaseComplete ¶
type AgentUpdatePhaseComplete struct {
PhaseName string `json:"phase_name"`
RunID string `json:"run_id"`
Timestamp string `json:"timestamp"`
Status string `json:"status"`
DurationMs int64 `json:"duration_ms,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
}
AgentUpdatePhaseComplete payloads mark the terminal boundary of a phase. Status is one of "success" | "failure" | "timeout". Mirrors $defs/phase_complete in schemas/agent-update.schema.json.
type AgentUpdatePhaseHandoff ¶
type AgentUpdatePhaseHandoff struct {
FromPhase string `json:"from_phase"`
ToPhase string `json:"to_phase"`
RunID string `json:"run_id"`
Timestamp string `json:"timestamp"`
PacketPath string `json:"packet_path,omitempty"`
}
AgentUpdatePhaseHandoff payloads describe a transition between two phases of the same run. Mirrors $defs/phase_handoff in schemas/agent-update.schema.json.
type AgentUpdatePhaseStart ¶
type AgentUpdatePhaseStart struct {
PhaseName string `json:"phase_name"`
RunID string `json:"run_id"`
Timestamp string `json:"timestamp"`
Metadata map[string]any `json:"metadata,omitempty"`
}
AgentUpdatePhaseStart payloads mark the beginning of a named RPI phase. Mirrors $defs/phase_start in schemas/agent-update.schema.json.
type ArtifactRef ¶
type ArtifactRef struct {
Path string `json:"path"`
SHA256 string `json:"sha256"`
Size int64 `json:"size"`
WrittenAt string `json:"written_at"`
}
ArtifactRef is the ledger-resident identity for an artifact stored by content hash. Path is repository-relative so replay can reconstruct the compatibility artifact map without consulting local runtime state.
func (ArtifactRef) Validate ¶
func (r ArtifactRef) Validate() error
type ArtifactStoreOptions ¶
type CancelJobInput ¶
CancelJobInput identifies a job cancellation request.
type CancelJobOutcome ¶
type CancelJobOutcome string
CancelJobOutcome reports how a cancellation request affected a job.
const ( // CancelJobOutcomeCancelled means the queue appended a cancellation event. CancelJobOutcomeCancelled CancelJobOutcome = "cancelled" // CancelJobOutcomeAlreadyTerminalCompleted means the job was already completed. CancelJobOutcomeAlreadyTerminalCompleted CancelJobOutcome = "already_terminal_completed" // CancelJobOutcomeAlreadyTerminalFailed means the job was already failed. CancelJobOutcomeAlreadyTerminalFailed CancelJobOutcome = "already_terminal_failed" // CancelJobOutcomeAlreadyTerminalCancelled means the job was already cancelled. CancelJobOutcomeAlreadyTerminalCancelled CancelJobOutcome = "already_terminal_cancelled" )
type CancelJobRequest ¶
type CancelJobResponse ¶
type CancelJobResponse struct {
Cancelled bool `json:"cancelled"`
Outcome CancelJobOutcome `json:"outcome"`
Job QueueJobState `json:"job"`
ProjectionStatus ProjectionStatus `json:"projection_status"`
ProjectionLag ProjectionLag `json:"projection_lag"`
DegradedReasons []string `json:"degraded_reasons,omitempty"`
}
type CancelJobResult ¶
type CancelJobResult struct {
Job QueueJobState `json:"job"`
Outcome CancelJobOutcome `json:"outcome"`
}
CancelJobResult returns the job state and cancellation outcome.
type Clock ¶
Clock abstracts time for testability. The recurrence supervisor uses Clock so tests can drive ticks deterministically with a fake clock.
type CompleteJobInput ¶
type ContentAddressedArtifactStore ¶
type ContentAddressedArtifactStore struct {
// contains filtered or unexported fields
}
func NewContentAddressedArtifactStore ¶
func NewContentAddressedArtifactStore(root string, opts ArtifactStoreOptions) *ContentAddressedArtifactStore
func (*ContentAddressedArtifactStore) PutBytes ¶
func (s *ContentAddressedArtifactStore) PutBytes(data []byte) (ArtifactRef, error)
type CorruptRecord ¶
type CreateScheduleResponse ¶
type CreateScheduleResponse struct {
Name string `json:"name"`
}
CreateScheduleResponse is the body of a successful POST /v1/schedules.
type CronParseError ¶
CronParseError preserves the operator's original input for actionable errors.
func (*CronParseError) Error ¶
func (e *CronParseError) Error() string
type DaemonPlansProjection ¶
type DaemonPlansProjection struct {
SchemaVersion int `json:"schema_version"`
ProjectID string `json:"project_id,omitempty"`
IssuePrefix string `json:"issue_prefix,omitempty"`
Entries []PlansProjectionEntry `json:"entries"`
LastEventID string `json:"last_event_id,omitempty"`
RebuiltAt string `json:"rebuilt_at,omitempty"`
}
DaemonPlansProjection is the daemon-side wrapper around plans manifest state. Mirrors the DaemonRPIRegistryProjection shape (rpi_registry.go:9): Entries + LastEventID + a SchemaVersion stamp.
func RebuildDaemonPlansProjection ¶
func RebuildDaemonPlansProjection(events []LedgerEvent) (DaemonPlansProjection, error)
RebuildDaemonPlansProjection folds the ledger event slice into the latest plans projection state. Currently the projection is fully rebuilt by the executor on each subscription tick; ledger events are advisory (last-event cursor + degraded-flag carry). atom-2 does NOT replay-build entries from events because plans.projection is a pull-from-bd projection, not an event-sourced one — the source of truth is bd, not the daemon ledger.
type DaemonRPIRegistryProjection ¶
type DaemonRPIRegistryProjection struct {
States []cliRPI.RunRegistryState `json:"states"`
LastEventID string `json:"last_event_id,omitempty"`
}
func RebuildRPIRegistryProjection ¶
func RebuildRPIRegistryProjection(events []LedgerEvent) (DaemonRPIRegistryProjection, error)
type DefaultRPIPromptBuilder ¶
type DefaultRPIPromptBuilder struct{}
func (DefaultRPIPromptBuilder) BuildRPIPrompt ¶
func (DefaultRPIPromptBuilder) BuildRPIPrompt(req RPIPhaseExecutionRequest) (string, error)
type DeleteScheduleResponse ¶
DeleteScheduleResponse is the body of a successful DELETE /v1/schedules/{name}.
type DreamExecutor ¶
type DreamExecutor struct {
// contains filtered or unexported fields
}
func NewDreamExecutor ¶
func NewDreamExecutor(opts DreamExecutorOptions) (*DreamExecutor, error)
func (*DreamExecutor) JobTypes ¶
func (e *DreamExecutor) JobTypes() []JobType
func (*DreamExecutor) RunJob ¶
func (e *DreamExecutor) RunJob(ctx context.Context, claim QueueLease) (JobExecutionResult, error)
RunJob requires a non-nil ctx; callers passing nil will panic on first use.
type DreamExecutorOptions ¶
type DreamExecutorOptions struct {
Cwd string
RunLoop DreamRunLoopFunc
Now func() time.Time
}
type DreamRunJobSpec ¶
type DreamRunJobSpec struct {
SchemaVersion int `json:"schema_version"`
JobType JobType `json:"job_type"`
DreamRunID string `json:"dream_run_id"`
Goal string `json:"goal,omitempty"`
Mode DreamMode `json:"mode"`
OutputDir string `json:"output_dir"`
MaxIterations int `json:"max_iterations,omitempty"`
ExecutionTimeout string `json:"execution_timeout,omitempty"`
}
func DreamRunJobSpecFromPayload ¶
func DreamRunJobSpecFromPayload(payload map[string]any) (DreamRunJobSpec, error)
func NewDreamRunJobSpec ¶
func NewDreamRunJobSpec(dreamRunID, outputDir string) DreamRunJobSpec
func (DreamRunJobSpec) ToJobSpec ¶
func (spec DreamRunJobSpec) ToJobSpec(jobID string) (JobSpec, error)
func (DreamRunJobSpec) Validate ¶
func (spec DreamRunJobSpec) Validate() error
type DreamRunLoopFunc ¶
type DreamRunLoopFunc func(context.Context, DreamRunLoopOptions) (DreamRunLoopResult, error)
type DreamRunLoopOptions ¶
type DreamRunLoopResult ¶
type DreamRunsProjection ¶
type DreamRunsProjection struct {
Runs []JobProjection `json:"runs"`
}
type DreamStage ¶
type DreamStage string
const ( DreamStageIngest DreamStage = "ingest" DreamStageReduce DreamStage = "reduce" DreamStageMeasure DreamStage = "measure" DreamStageCommit DreamStage = "commit" DreamStageReport DreamStage = "report" )
type DreamStageEntry ¶
type DreamStageEntry struct {
Stage DreamStage `json:"stage"`
JobID string `json:"job_id,omitempty"`
IterationID string `json:"iteration_id,omitempty"`
Required bool `json:"required"`
}
type DreamStageJobSpec ¶
type DreamStageJobSpec struct {
SchemaVersion int `json:"schema_version"`
JobType JobType `json:"job_type"`
DreamRunID string `json:"dream_run_id"`
IterationID string `json:"iteration_id,omitempty"`
Iteration int `json:"iteration,omitempty"`
Stage DreamStage `json:"stage"`
Mode DreamMode `json:"mode"`
OutputDir string `json:"output_dir"`
CheckpointDir string `json:"checkpoint_dir,omitempty"`
ParentJobID string `json:"parent_job_id,omitempty"`
}
func DreamStageJobSpecFromPayload ¶
func DreamStageJobSpecFromPayload(payload map[string]any) (DreamStageJobSpec, error)
func NewDreamStageJobSpec ¶
func NewDreamStageJobSpec(dreamRunID, outputDir string, stage DreamStage) DreamStageJobSpec
func (DreamStageJobSpec) ToJobSpec ¶
func (spec DreamStageJobSpec) ToJobSpec(jobID string) (JobSpec, error)
func (DreamStageJobSpec) Validate ¶
func (spec DreamStageJobSpec) Validate() error
type DreamStageManifest ¶
type DreamStageManifest struct {
SchemaVersion int `json:"schema_version"`
DreamRunID string `json:"dream_run_id"`
Mode DreamMode `json:"mode"`
OutputDir string `json:"output_dir"`
Stages []DreamStageEntry `json:"stages"`
Metadata map[string]string `json:"metadata,omitempty"`
}
func DefaultDreamStageManifest ¶
func DefaultDreamStageManifest(dreamRunID, outputDir string) DreamStageManifest
func (DreamStageManifest) Validate ¶
func (manifest DreamStageManifest) Validate() error
type EventType ¶
type EventType string
const ( EventScheduleCreated EventType = "schedule.created" EventScheduleFired EventType = "schedule.fired" EventScheduleSkipped EventType = "schedule.skipped" EventScheduleDeleted EventType = "schedule.deleted" )
Schedule event types are additive event-type vocabulary (LedgerSchemaVersion stays at 1). Older daemon binaries replaying ledgers with these events must skip-and-log instead of erroring — see projections.go's reducer for the forward-compat contract (pre-mortem amendment B3).
const ( EventJobAccepted EventType = "job.accepted" EventJobClaimed EventType = "job.claimed" EventJobHeartbeat EventType = "job.heartbeat" EventJobLeaseExpired EventType = "job.lease_expired" EventJobCompleted EventType = "job.completed" EventJobFailed EventType = "job.failed" EventJobCancelled EventType = "job.cancelled" EventProjectionMarkedStale EventType = "projection.marked_stale" EventProjectionRebuilt EventType = "projection.rebuilt" EventFactoryAdmissionDecided EventType = "factory.admission_decided" EventFactoryJobSubmitted EventType = "factory.job_submitted" EventFactoryJobClaimed EventType = "factory.job_claimed" EventFactoryJobStarted EventType = "factory.job_started" EventFactoryRoutingDecided EventType = "factory.routing_decided" EventFactorySlotAllocated EventType = "factory.slot_allocated" EventFactoryWorktreeAllocated EventType = "factory.worktree_allocated" EventFactoryValidationStarted EventType = "factory.validation_started" EventFactoryValidationCompleted EventType = "factory.validation_completed" EventFactoryMergeDecision EventType = "factory.merge_decision" EventFactoryJobTerminal EventType = "factory.job_terminal" EventFactoryYieldObservation EventType = "factory.yield_observation" // Agent-update events: phase-boundary messages emitted by daemon-mode RPI // runners (soc-y0ct). Schema: schemas/agent-update.schema.json. EventAgentUpdatePhaseStart EventType = "agent_update.phase_start" EventAgentUpdatePhaseComplete EventType = "agent_update.phase_complete" EventAgentUpdateCriterionVerdict EventType = "agent_update.criterion_verdict" EventAgentUpdatePhaseHandoff EventType = "agent_update.phase_handoff" )
type FactoryAdmissionDecision ¶
type FactoryAdmissionDecision struct {
SchemaVersion int `json:"schema_version"`
WorkOrderID string `json:"work_order_id"`
RunID string `json:"run_id"`
EvaluatedAt string `json:"evaluated_at"`
Allowed bool `json:"allowed"`
Reasons []string `json:"reasons"`
LandingPolicy FactoryLandingPolicy `json:"landing_policy"`
DigestPolicy FactoryDigestPolicy `json:"digest_policy"`
ChildJobID string `json:"child_job_id,omitempty"`
ArtifactRefs map[string]string `json:"artifact_refs,omitempty"`
Evidence FactoryDecisionEvidence `json:"evidence"`
}
func (FactoryAdmissionDecision) Validate ¶
func (decision FactoryAdmissionDecision) Validate() error
type FactoryAdmissionEvaluator ¶
type FactoryAdmissionEvaluator struct {
Clock func() time.Time
Evidence FactoryAdmissionEvidenceProvider
}
func (FactoryAdmissionEvaluator) EvaluateAdmission ¶
func (e FactoryAdmissionEvaluator) EvaluateAdmission(ctx context.Context, spec FactoryAdmissionJobSpec) (FactoryAdmissionDecision, error)
func (FactoryAdmissionEvaluator) EvaluateLocalPilot ¶
func (e FactoryAdmissionEvaluator) EvaluateLocalPilot(ctx context.Context, spec FactoryLocalPilotJobSpec) (FactoryAdmissionDecision, error)
type FactoryAdmissionEvidenceProvider ¶
type FactoryAdmissionEvidenceProvider interface {
RepoState(context.Context) (FactoryRepoState, error)
OpenPRBlockers(context.Context, []string) (FactoryPRBlockerMatrix, error)
MainCIBaseline(context.Context) (FactoryCIBaselineEvidence, error)
}
type FactoryAdmissionEvidenceProviderFactory ¶
type FactoryAdmissionEvidenceProviderFactory func(FactoryWorkOrder) FactoryAdmissionEvidenceProvider
type FactoryAdmissionExecutor ¶
type FactoryAdmissionExecutor struct {
// contains filtered or unexported fields
}
func NewFactoryAdmissionExecutor ¶
func NewFactoryAdmissionExecutor(opts FactoryAdmissionExecutorOptions) (*FactoryAdmissionExecutor, error)
func (*FactoryAdmissionExecutor) JobTypes ¶
func (e *FactoryAdmissionExecutor) JobTypes() []JobType
func (*FactoryAdmissionExecutor) RunJob ¶
func (e *FactoryAdmissionExecutor) RunJob(ctx context.Context, claim QueueLease) (JobExecutionResult, error)
type FactoryAdmissionJobSpec ¶
type FactoryAdmissionJobSpec struct {
SchemaVersion int `json:"schema_version"`
JobType JobType `json:"job_type"`
RunID string `json:"run_id"`
Mode FactoryAdmissionMode `json:"mode"`
WorkOrder FactoryWorkOrder `json:"work_order"`
Handoff FactoryHandoff `json:"handoff,omitempty"`
}
func FactoryAdmissionJobSpecFromPayload ¶
func FactoryAdmissionJobSpecFromPayload(payload map[string]any) (FactoryAdmissionJobSpec, error)
func NewFactoryAdmissionJobSpec ¶
func NewFactoryAdmissionJobSpec(runID string, workOrder FactoryWorkOrder) FactoryAdmissionJobSpec
func (FactoryAdmissionJobSpec) ToJobSpec ¶
func (spec FactoryAdmissionJobSpec) ToJobSpec(jobID string) (JobSpec, error)
func (FactoryAdmissionJobSpec) Validate ¶
func (spec FactoryAdmissionJobSpec) Validate() error
type FactoryAdmissionMode ¶
type FactoryAdmissionMode string
const ( FactoryAdmissionModeAdmissionOnly FactoryAdmissionMode = "admission-only" FactoryAdmissionModeRPIHandoff FactoryAdmissionMode = "rpi-handoff" )
type FactoryAdmissionProjection ¶
type FactoryAdmissionProjection struct {
JobID string `json:"job_id"`
RunID string `json:"run_id,omitempty"`
WorkOrderID string `json:"work_order_id"`
Allowed bool `json:"allowed"`
Reasons []string `json:"reasons,omitempty"`
LandingPolicy FactoryLandingPolicy `json:"landing_policy,omitempty"`
DigestPolicy FactoryDigestPolicy `json:"digest_policy,omitempty"`
ChildJobID string `json:"child_job_id,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
Evidence FactoryDecisionEvidence `json:"evidence"`
DecidedAt string `json:"decided_at,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactoryCIBaselineEvidence ¶
type FactoryCIBaselineEvidence struct {
Known bool
Baseline FactoryMainCIBaseline
}
type FactoryCIStatus ¶
type FactoryCIStatus string
const ( FactoryCIStatusGreen FactoryCIStatus = "green" FactoryCIStatusRed FactoryCIStatus = "red" FactoryCIStatusUnknown FactoryCIStatus = "unknown" )
type FactoryDecisionEvidence ¶
type FactoryDecisionEvidence struct {
BaseSHA string `json:"base_sha"`
OpenPRBlockerCount int `json:"open_pr_blocker_count"`
MainCIStatus FactoryCIStatus `json:"main_ci_status"`
Stale bool `json:"stale,omitempty"`
}
func (FactoryDecisionEvidence) Validate ¶
func (evidence FactoryDecisionEvidence) Validate() error
type FactoryDigestPolicy ¶
type FactoryDigestPolicy string
const (
FactoryDigestPolicyRequired FactoryDigestPolicy = "required"
)
type FactoryEventRef ¶
type FactoryEventRef struct {
EventID string `json:"event_id"`
EventType EventType `json:"event_type"`
JobID string `json:"job_id,omitempty"`
RunID string `json:"run_id,omitempty"`
TaskID string `json:"task_id,omitempty"`
SlotID string `json:"slot_id,omitempty"`
WorktreeID string `json:"worktree_id,omitempty"`
ValidationID string `json:"validation_id,omitempty"`
OccurredAt string `json:"occurred_at,omitempty"`
}
type FactoryHandoff ¶
type FactoryHandoff struct {
Kind FactoryHandoffKind `json:"kind,omitempty"`
ExecutionPacketPath string `json:"execution_packet_path,omitempty"`
EpicID string `json:"epic_id,omitempty"`
}
func (FactoryHandoff) Validate ¶
func (handoff FactoryHandoff) Validate() error
type FactoryHandoffKind ¶
type FactoryHandoffKind string
const ( FactoryHandoffNone FactoryHandoffKind = "none" FactoryHandoffRPI FactoryHandoffKind = "rpi.run" )
type FactoryJobProjection ¶
type FactoryJobProjection struct {
JobID string `json:"job_id"`
RunID string `json:"run_id,omitempty"`
TaskID string `json:"task_id,omitempty"`
RequestedBy string `json:"requested_by,omitempty"`
Objective string `json:"objective,omitempty"`
LaneID string `json:"lane_id,omitempty"`
Provider string `json:"provider,omitempty"`
Runtime string `json:"runtime,omitempty"`
Model string `json:"model,omitempty"`
Authority RoutingAuthority `json:"authority,omitempty"`
Status FactoryJobStatus `json:"status"`
SubmittedAt string `json:"submitted_at,omitempty"`
UpdatedAt string `json:"updated_at,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactoryJobStatus ¶
type FactoryJobStatus string
const ( FactoryJobStatusSubmitted FactoryJobStatus = "submitted" FactoryJobStatusAdmitted FactoryJobStatus = "admitted" FactoryJobStatusAdmissionBlocked FactoryJobStatus = "admission_blocked" FactoryJobStatusClaimed FactoryJobStatus = "claimed" FactoryJobStatusStarted FactoryJobStatus = "started" FactoryJobStatusRouted FactoryJobStatus = "routed" FactoryJobStatusAllocated FactoryJobStatus = "allocated" FactoryJobStatusValidating FactoryJobStatus = "validating" FactoryJobStatusValidated FactoryJobStatus = "validated" FactoryJobStatusValidationFailed FactoryJobStatus = "validation_failed" FactoryJobStatusAwaitingManualMerge FactoryJobStatus = "awaiting_manual_merge" FactoryJobStatusTerminal FactoryJobStatus = "terminal" FactoryJobStatusRetainedFailed FactoryJobStatus = "retained_failed" )
type FactoryLandingPolicy ¶
type FactoryLandingPolicy string
const ( FactoryLandingPolicyOff FactoryLandingPolicy = "off" FactoryLandingPolicyManualPR FactoryLandingPolicy = "manual_pr" )
type FactoryLocalPilotJobSpec ¶
type FactoryLocalPilotJobSpec struct {
SchemaVersion int `json:"schema_version"`
JobType JobType `json:"job_type"`
RunID string `json:"run_id"`
Mode FactoryAdmissionMode `json:"mode"`
WorkOrder FactoryWorkOrder `json:"work_order"`
Handoff FactoryHandoff `json:"handoff,omitempty"`
MaxCycles int `json:"max_cycles,omitempty"`
}
func FactoryLocalPilotJobSpecFromPayload ¶
func FactoryLocalPilotJobSpecFromPayload(payload map[string]any) (FactoryLocalPilotJobSpec, error)
func NewFactoryLocalPilotJobSpec ¶
func NewFactoryLocalPilotJobSpec(runID string, workOrder FactoryWorkOrder) FactoryLocalPilotJobSpec
func (FactoryLocalPilotJobSpec) ToJobSpec ¶
func (spec FactoryLocalPilotJobSpec) ToJobSpec(jobID string) (JobSpec, error)
func (FactoryLocalPilotJobSpec) Validate ¶
func (spec FactoryLocalPilotJobSpec) Validate() error
type FactoryMainCIBaseline ¶
type FactoryMainCIBaseline struct {
Status FactoryCIStatus `json:"status"`
RunID string `json:"run_id,omitempty"`
CheckedAt string `json:"checked_at"`
FailedJobs []string `json:"failed_jobs,omitempty"`
}
func (FactoryMainCIBaseline) Validate ¶
func (baseline FactoryMainCIBaseline) Validate() error
type FactoryMergeDecision ¶
type FactoryMergeDecision string
const ( FactoryMergeDecisionNotRequested FactoryMergeDecision = "not_requested" FactoryMergeDecisionManualPending FactoryMergeDecision = "manual_pending" FactoryMergeDecisionManualMerged FactoryMergeDecision = "manual_merged" FactoryMergeDecisionRejected FactoryMergeDecision = "rejected" FactoryMergeDecisionAbandoned FactoryMergeDecision = "abandoned" )
type FactoryMergeDecisionProjection ¶
type FactoryMergeDecisionProjection struct {
JobID string `json:"job_id"`
RunID string `json:"run_id,omitempty"`
SlotID string `json:"slot_id,omitempty"`
Decision FactoryMergeDecision `json:"decision"`
Decider string `json:"decider,omitempty"`
Reason string `json:"reason,omitempty"`
Conflicts []string `json:"conflicts,omitempty"`
ManualCommand string `json:"manual_command,omitempty"`
DecidedAt string `json:"decided_at,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactoryModelLaneProjection ¶
type FactoryModelLaneProjection struct {
LaneID string `json:"lane_id"`
Provider string `json:"provider,omitempty"`
Runtime string `json:"runtime,omitempty"`
Model string `json:"model,omitempty"`
Authority RoutingAuthority `json:"authority,omitempty"`
LastReason string `json:"last_reason,omitempty"`
DisabledReason string `json:"disabled_reason,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactoryOpenPRBlocker ¶
type FactoryOpenPRBlocker struct {
PRNumber int `json:"pr_number"`
HeadRef string `json:"head_ref"`
Files []string `json:"files"`
}
func (FactoryOpenPRBlocker) Validate ¶
func (blocker FactoryOpenPRBlocker) Validate() error
type FactoryPRBlockerMatrix ¶
type FactoryPRBlockerMatrix struct {
Known bool
Blockers []FactoryOpenPRBlocker
}
type FactoryPointer ¶
type FactoryPointer struct {
JobID string `json:"job_id,omitempty"`
RunID string `json:"run_id,omitempty"`
SlotID string `json:"slot_id,omitempty"`
Kind string `json:"kind"`
Name string `json:"name,omitempty"`
Path string `json:"path,omitempty"`
Ref string `json:"ref,omitempty"`
EventID string `json:"event_id,omitempty"`
}
type FactoryQueueLaneProjection ¶
type FactoryQueueLaneProjection struct {
LaneID string `json:"lane_id"`
Provider string `json:"provider,omitempty"`
Runtime string `json:"runtime,omitempty"`
Model string `json:"model,omitempty"`
Authority RoutingAuthority `json:"authority,omitempty"`
QueueDepth int `json:"queue_depth"`
DisabledReason string `json:"disabled_reason,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactoryRepoState ¶
type FactoryRoutingDecisionProjection ¶
type FactoryRoutingDecisionProjection struct {
JobID string `json:"job_id"`
RunID string `json:"run_id,omitempty"`
TaskID string `json:"task_id,omitempty"`
LaneID string `json:"lane_id,omitempty"`
Provider string `json:"provider,omitempty"`
Runtime string `json:"runtime,omitempty"`
Model string `json:"model,omitempty"`
Authority RoutingAuthority `json:"authority,omitempty"`
Reason string `json:"reason,omitempty"`
DisabledReason string `json:"disabled_reason,omitempty"`
DecidedAt string `json:"decided_at,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactorySlotProjection ¶
type FactorySlotProjection struct {
SlotID string `json:"slot_id"`
WorkerID string `json:"worker_id,omitempty"`
RunID string `json:"run_id,omitempty"`
JobID string `json:"job_id"`
TaskID string `json:"task_id,omitempty"`
LaneID string `json:"lane_id,omitempty"`
Provider string `json:"provider,omitempty"`
Runtime string `json:"runtime,omitempty"`
Model string `json:"model,omitempty"`
Authority RoutingAuthority `json:"authority,omitempty"`
Branch string `json:"branch,omitempty"`
WorktreeID string `json:"worktree_id,omitempty"`
WorktreePath string `json:"worktree_path,omitempty"`
ResourcePolicy map[string]any `json:"resource_policy,omitempty"`
LeaseEpoch int `json:"lease_epoch,omitempty"`
MaxConcurrencySnapshot int `json:"max_concurrency_snapshot,omitempty"`
Status FactorySlotStatus `json:"status"`
AllocatedAt string `json:"allocated_at,omitempty"`
UpdatedAt string `json:"updated_at,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactorySlotStatus ¶
type FactorySlotStatus string
const ( FactorySlotStatusIdle FactorySlotStatus = "idle" FactorySlotStatusAllocated FactorySlotStatus = "allocated" FactorySlotStatusRunning FactorySlotStatus = "running" FactorySlotStatusBlockedValidation FactorySlotStatus = "blocked_validation" FactorySlotStatusAwaitingManualMerge FactorySlotStatus = "awaiting_manual_merge" FactorySlotStatusTerminal FactorySlotStatus = "terminal" FactorySlotStatusRetainedFailed FactorySlotStatus = "retained_failed" )
type FactoryStatusProjection ¶
type FactoryStatusProjection struct {
Admissions []FactoryAdmissionProjection `json:"admissions,omitempty"`
Jobs []FactoryJobProjection `json:"jobs,omitempty"`
ActiveWorkers []FactoryWorkerProjection `json:"active_workers,omitempty"`
Slots []FactorySlotProjection `json:"slots,omitempty"`
QueueLanes []FactoryQueueLaneProjection `json:"queue_lanes,omitempty"`
ModelLanes []FactoryModelLaneProjection `json:"model_lanes,omitempty"`
Validations []FactoryValidationProjection `json:"validations,omitempty"`
BlockedValidations []FactoryValidationProjection `json:"blocked_validations,omitempty"`
Worktrees []FactoryWorktreeProjection `json:"worktrees,omitempty"`
RetainedFailedWorktrees []FactoryWorktreeProjection `json:"retained_failed_worktrees,omitempty"`
MergeDecisions []FactoryMergeDecisionProjection `json:"merge_decisions,omitempty"`
PendingManualMerges []FactoryMergeDecisionProjection `json:"pending_manual_merges,omitempty"`
TerminalJobs []FactoryTerminalProjection `json:"terminal_jobs,omitempty"`
RecentEvents []FactoryEventRef `json:"recent_events,omitempty"`
Logs []FactoryPointer `json:"logs,omitempty"`
Artifacts []FactoryPointer `json:"artifacts,omitempty"`
Transcripts []FactoryPointer `json:"transcripts,omitempty"`
Diffs []FactoryPointer `json:"diffs,omitempty"`
LastRoutingDecision *FactoryRoutingDecisionProjection `json:"last_routing_decision,omitempty"`
}
type FactoryTarget ¶
type FactoryTarget struct {
Type FactoryTargetType `json:"type"`
ID string `json:"id"`
Summary string `json:"summary"`
}
func (FactoryTarget) Validate ¶
func (target FactoryTarget) Validate() error
type FactoryTargetType ¶
type FactoryTargetType string
const ( FactoryTargetGoal FactoryTargetType = "goal" FactoryTargetBead FactoryTargetType = "bead" FactoryTargetExecutionPacket FactoryTargetType = "execution_packet" )
type FactoryTerminalProjection ¶
type FactoryTerminalProjection struct {
JobID string `json:"job_id"`
RunID string `json:"run_id,omitempty"`
SlotID string `json:"slot_id,omitempty"`
Status JobStatus `json:"status"`
ArtifactRefs map[string]ArtifactRef `json:"artifact_refs,omitempty"`
TranscriptRef string `json:"transcript_ref,omitempty"`
RetainedWorktree bool `json:"retained_worktree,omitempty"`
OccurredAt string `json:"occurred_at,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactoryUnknownEvidencePolicy ¶
type FactoryUnknownEvidencePolicy string
const ( FactoryUnknownEvidenceBlock FactoryUnknownEvidencePolicy = "block" FactoryUnknownEvidenceAllowNonMutating FactoryUnknownEvidencePolicy = "allow_non_mutating" )
type FactoryValidationProjection ¶
type FactoryValidationProjection struct {
ValidationID string `json:"validation_id"`
JobID string `json:"job_id"`
RunID string `json:"run_id,omitempty"`
SlotID string `json:"slot_id,omitempty"`
Level string `json:"level,omitempty"`
Commands []string `json:"commands,omitempty"`
Status FactoryValidationStatus `json:"status"`
Artifacts map[string]string `json:"artifacts,omitempty"`
ArtifactRefs map[string]ArtifactRef `json:"artifact_refs,omitempty"`
StartedAt string `json:"started_at,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
DurationMS int `json:"duration_ms,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactoryValidationStatus ¶
type FactoryValidationStatus string
const ( FactoryValidationStatusRunning FactoryValidationStatus = "running" FactoryValidationStatusPassed FactoryValidationStatus = "passed" FactoryValidationStatusFailed FactoryValidationStatus = "failed" FactoryValidationStatusBlocked FactoryValidationStatus = "blocked" FactoryValidationStatusCancelled FactoryValidationStatus = "cancelled" )
type FactoryWorkOrder ¶
type FactoryWorkOrder struct {
SchemaVersion int `json:"schema_version"`
WorkOrderID string `json:"work_order_id"`
GeneratedAt string `json:"generated_at"`
ExpiresAt string `json:"expires_at"`
BaseSHA string `json:"base_sha"`
Target FactoryTarget `json:"target"`
AllowedFiles []string `json:"allowed_files"`
ValidationCommands []string `json:"validation_commands"`
LandingPolicy FactoryLandingPolicy `json:"landing_policy"`
DigestPolicy FactoryDigestPolicy `json:"digest_policy"`
OpenPRBlockers []FactoryOpenPRBlocker `json:"open_pr_blockers"`
MainCIBaseline FactoryMainCIBaseline `json:"main_ci_baseline"`
UnknownEvidencePolicy FactoryUnknownEvidencePolicy `json:"unknown_evidence_policy,omitempty"`
}
func (FactoryWorkOrder) Validate ¶
func (work FactoryWorkOrder) Validate() error
type FactoryWorkerProjection ¶
type FactoryWorkerProjection struct {
WorkerID string `json:"worker_id,omitempty"`
SlotID string `json:"slot_id"`
RunID string `json:"run_id,omitempty"`
JobID string `json:"job_id"`
TaskID string `json:"task_id,omitempty"`
LaneID string `json:"lane_id,omitempty"`
Provider string `json:"provider,omitempty"`
Runtime string `json:"runtime,omitempty"`
Model string `json:"model,omitempty"`
Authority RoutingAuthority `json:"authority,omitempty"`
Status FactorySlotStatus `json:"status"`
WorktreePath string `json:"worktree_path,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FactoryWorktreeProjection ¶
type FactoryWorktreeProjection struct {
WorktreeID string `json:"worktree_id"`
RunID string `json:"run_id,omitempty"`
JobID string `json:"job_id"`
SlotID string `json:"slot_id,omitempty"`
BaseCommit string `json:"base_commit,omitempty"`
Branch string `json:"branch,omitempty"`
Path string `json:"path,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
DirtyState string `json:"dirty_state,omitempty"`
RetentionPolicy string `json:"retention_policy,omitempty"`
MergeDisposition string `json:"merge_disposition,omitempty"`
Status FactorySlotStatus `json:"status,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type FailJobInput ¶
type FailJobInput struct {
JobID string
RequestID RequestID
ClaimToken string
LeaseEpoch int
Actor string
Failure JobFailure
Artifacts map[string]string
ArtifactRefs map[string]ArtifactRef
}
type FailureCode ¶
type FailureCode string
const ( FailureProviderUnreachable FailureCode = "provider_unreachable" FailureSessionPending FailureCode = "session_pending" FailureSessionLost FailureCode = "lost" FailureTerminalWithoutTranscript FailureCode = "terminal_without_transcript" FailureRequestRejected FailureCode = "request_rejected" FailureProjectionDegraded FailureCode = "projection_degraded" FailureRetryExhausted FailureCode = "retry_exhausted" )
type GasCityClientAdapter ¶
func (GasCityClientAdapter) CityReadiness ¶
func (a GasCityClientAdapter) CityReadiness(ctx context.Context, cityName string) (gascity.ReadinessResponse, error)
func (GasCityClientAdapter) CreateSession ¶
func (a GasCityClientAdapter) CreateSession(ctx context.Context, cityName string, req gascity.SessionCreateRequest) (gascity.Session, gascity.ResponseMeta, error)
func (GasCityClientAdapter) GetSession ¶
func (a GasCityClientAdapter) GetSession(ctx context.Context, cityName string, id string, opts gascity.SessionGetOptions) (gascity.Session, gascity.ResponseMeta, error)
func (GasCityClientAdapter) SessionTranscript ¶
func (a GasCityClientAdapter) SessionTranscript(ctx context.Context, cityName string, id string, opts gascity.TranscriptOptions) (gascity.TranscriptResponse, gascity.ResponseMeta, error)
func (GasCityClientAdapter) StreamCityEvents ¶
func (a GasCityClientAdapter) StreamCityEvents(ctx context.Context, cityName string, opts gascity.EventStreamOptions) (GasCityRPIEventStream, gascity.ResponseMeta, error)
func (GasCityClientAdapter) SubmitSession ¶
func (a GasCityClientAdapter) SubmitSession(ctx context.Context, cityName string, id string, req gascity.SessionSubmitRequest) (gascity.SessionSubmitResponse, gascity.ResponseMeta, error)
type GasCityRPIClient ¶
type GasCityRPIClient interface {
CityReadiness(context.Context, string) (gascity.ReadinessResponse, error)
CreateSession(context.Context, string, gascity.SessionCreateRequest) (gascity.Session, gascity.ResponseMeta, error)
GetSession(context.Context, string, string, gascity.SessionGetOptions) (gascity.Session, gascity.ResponseMeta, error)
SubmitSession(context.Context, string, string, gascity.SessionSubmitRequest) (gascity.SessionSubmitResponse, gascity.ResponseMeta, error)
StreamCityEvents(context.Context, string, gascity.EventStreamOptions) (GasCityRPIEventStream, gascity.ResponseMeta, error)
SessionTranscript(context.Context, string, string, gascity.TranscriptOptions) (gascity.TranscriptResponse, gascity.ResponseMeta, error)
}
type GasCityRPIEventStream ¶
type GasCityRPIEventStream interface {
NextEvent() (gascity.EventStreamFrame, error)
Close() error
}
type GasCityRPIPhaseExecutor ¶
type GasCityRPIPhaseExecutor struct {
Client GasCityRPIClient
CityName string
SessionAgentName string
PhaseTimeout time.Duration
}
func (GasCityRPIPhaseExecutor) ExecuteRPIPhase ¶
func (e GasCityRPIPhaseExecutor) ExecuteRPIPhase(ctx context.Context, req RPIPhaseExecutionRequest) (RPIPhaseExecutionResult, error)
type HeartbeatInput ¶
type JobExecutionResult ¶
type JobExecutionResult struct {
Artifacts map[string]string
ArtifactRefs map[string]ArtifactRef
}
JobExecutionResult is the terminal output from a daemon job executor.
type JobExecutor ¶
type JobExecutor interface {
JobTypes() []JobType
RunJob(context.Context, QueueLease) (JobExecutionResult, error)
}
JobExecutor runs claimed daemon jobs for one or more job types.
type JobFailure ¶
type JobFailure struct {
Code FailureCode `json:"code"`
Message string `json:"message,omitempty"`
Retryable bool `json:"retryable,omitempty"`
}
type JobProjection ¶
type JobProjection struct {
JobID string `json:"job_id"`
JobType JobType `json:"job_type,omitempty"`
RequestID string `json:"request_id"`
RequestIDs []string `json:"request_ids,omitempty"`
Status JobStatus `json:"status"`
ResultStatus JobResultStatus `json:"result_status,omitempty"`
Failure *JobFailure `json:"failure,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
ArtifactRefs map[string]ArtifactRef `json:"artifact_refs,omitempty"`
ProjectionTargets []ProjectionName `json:"projection_targets,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
UpdatedAt string `json:"updated_at,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type JobResult ¶
type JobResult struct {
Status JobResultStatus `json:"status"`
Artifacts map[string]string `json:"artifacts,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
}
type JobResultStatus ¶
type JobResultStatus string
const ( JobResultSucceeded JobResultStatus = "succeeded" JobResultFailed JobResultStatus = "failed" JobResultCancelled JobResultStatus = "cancelled" )
type JobStatus ¶
type JobStatus string
func ProjectJobStatus ¶
func ProjectJobStatus(input JobStatusProjectionInput) JobStatus
type JobStatusProjectionInput ¶
type JobStatusProjectionInput struct {
TerminalEvent EventType
Lease LeaseState
ProjectionStale bool
}
type JobType ¶
type JobType string
const ( JobTypeRPIRun JobType = "rpi.run" JobTypeRPIPhase JobType = "rpi.phase" JobTypeDreamRun JobType = "dream.run" JobTypeDreamStage JobType = "dream.stage" JobTypeWikiBuild JobType = "wiki.build" JobTypeWikiForge JobType = "wiki.forge" JobTypeFactoryAdmission JobType = "factory.admission" JobTypeFactoryLocalPilot JobType = "factory.local-pilot" JobTypeOpenClawSnapshot JobType = "openclaw.snapshot" JobTypePlansProjection JobType = "plans.projection" // JobTypeLLMWikiLoop is the Karpathy-pattern external-knowledge loop job type. // Operates on raw/ + wiki/ trees, distinct from internal .agents/ work. JobTypeLLMWikiLoop JobType = "llmwiki.loop" JobTypeEvalSuite JobType = "eval.suite" JobTypeEvalSkillDelta JobType = "eval.skill-delta" JobTypeSkillInvoke JobType = "skill.invoke" )
type LeaseState ¶
type LeaseState string
const ( LeaseNone LeaseState = "none" LeaseFresh LeaseState = "fresh" LeaseExpired LeaseState = "expired" LeaseUnknown LeaseState = "unknown" )
type LedgerEvent ¶
type LedgerEvent struct {
SchemaVersion int `json:"schema_version"`
EventID string `json:"event_id"`
RequestID string `json:"request_id"`
JobID string `json:"job_id"`
EventType EventType `json:"event_type"`
OccurredAt string `json:"occurred_at"`
Actor string `json:"actor"`
Payload map[string]any `json:"payload,omitempty"`
}
func NewLedgerEvent ¶
func NewLedgerEvent(input LedgerEventInput) (LedgerEvent, error)
func NormalizeLedgerEvent ¶
func NormalizeLedgerEvent(event LedgerEvent) (LedgerEvent, error)
type LedgerEventInput ¶
type LedgerEventInput struct {
EventID string
RequestID RequestID
JobID string
EventType EventType
OccurredAt time.Time
Actor string
JobType JobType
ProjectionTargets []ProjectionName
Payload map[string]any
}
func NewAgentUpdateCriterionVerdictEvent ¶
func NewAgentUpdateCriterionVerdictEvent(p AgentUpdateCriterionVerdict) LedgerEventInput
NewAgentUpdateCriterionVerdictEvent builds a LedgerEventInput for an agent-update criterion_verdict payload. An empty Timestamp is defaulted to time.Now().UTC() in RFC 3339 nano format.
func NewAgentUpdatePhaseCompleteEvent ¶
func NewAgentUpdatePhaseCompleteEvent(p AgentUpdatePhaseComplete) LedgerEventInput
NewAgentUpdatePhaseCompleteEvent builds a LedgerEventInput for an agent-update phase_complete payload. An empty Timestamp is defaulted to time.Now().UTC() in RFC 3339 nano format.
func NewAgentUpdatePhaseHandoffEvent ¶
func NewAgentUpdatePhaseHandoffEvent(p AgentUpdatePhaseHandoff) LedgerEventInput
NewAgentUpdatePhaseHandoffEvent builds a LedgerEventInput for an agent-update phase_handoff payload. An empty Timestamp is defaulted to time.Now().UTC() in RFC 3339 nano format.
func NewAgentUpdatePhaseStartEvent ¶
func NewAgentUpdatePhaseStartEvent(p AgentUpdatePhaseStart) LedgerEventInput
NewAgentUpdatePhaseStartEvent builds a LedgerEventInput for an agent-update phase_start payload. An empty Timestamp is defaulted to time.Now().UTC() in RFC 3339 nano format so callers can leave it blank for "now".
type LedgerHealth ¶
type LedgerHealth struct {
LedgerSizeBytes int64 `json:"ledger_size_bytes"`
LedgerMaxBytes int64 `json:"ledger_max_bytes"`
LedgerSizeRatio float64 `json:"ledger_size_ratio"`
LatestSnapshotPath string `json:"latest_snapshot_path,omitempty"`
LatestSnapshotAge time.Duration `json:"latest_snapshot_age_ns"`
HasSnapshot bool `json:"has_snapshot"`
ArchiveCount int `json:"archive_count"`
OldestArchiveTime time.Time `json:"oldest_archive_time,omitempty"`
WarnReasons []string `json:"warn_reasons,omitempty"`
}
LedgerHealth summarizes durability state for the ao doctor surface (TB-Δ3 Phase 2-C). Read-only — populated by Store.LedgerHealth from on-disk facts.
type LedgerHealthThresholds ¶
type LedgerHealthThresholds struct {
// LedgerSizeWarnRatio fires WARN when ledger.jsonl size / max >= ratio.
LedgerSizeWarnRatio float64
// SnapshotMaxAge fires WARN when latest snapshot age >= this duration.
// Zero disables the snapshot-age check.
SnapshotMaxAge time.Duration
// ArchiveCountWarn fires WARN when archive count >= this value. Zero
// disables the check.
ArchiveCountWarn int
}
LedgerHealthThresholds controls the WARN bands. Zero-value means "use LedgerHealthDefaultThresholds()". Operators may override per-call.
func LedgerHealthDefaultThresholds ¶
func LedgerHealthDefaultThresholds() LedgerHealthThresholds
type LedgerTelemetry ¶
type LedgerTelemetry struct {
EventCount int `json:"event_count"`
Window string `json:"window"`
PhaseLatency []PhaseLatencyHistogram `json:"phase_latency,omitempty"`
WorkerKindDistribution []WorkerKindDistribution `json:"worker_kind_distribution,omitempty"`
FailureRates []JobTypeFailureRateSummary `json:"failure_rates,omitempty"`
}
func BuildLedgerTelemetry ¶
func BuildLedgerTelemetry(events []LedgerEvent, now time.Time, window time.Duration) LedgerTelemetry
type ListSchedulesResponse ¶
type ListSchedulesResponse struct {
Schedules []RecurringJobTemplate `json:"schedules"`
}
ListSchedulesResponse is the body of GET /v1/schedules.
type LocalFactoryAdmissionEvidenceProvider ¶
type LocalFactoryAdmissionEvidenceProvider struct {
Root string
WorkOrder FactoryWorkOrder
}
func (LocalFactoryAdmissionEvidenceProvider) MainCIBaseline ¶
func (p LocalFactoryAdmissionEvidenceProvider) MainCIBaseline(context.Context) (FactoryCIBaselineEvidence, error)
func (LocalFactoryAdmissionEvidenceProvider) OpenPRBlockers ¶
func (p LocalFactoryAdmissionEvidenceProvider) OpenPRBlockers(context.Context, []string) (FactoryPRBlockerMatrix, error)
func (LocalFactoryAdmissionEvidenceProvider) RepoState ¶
func (p LocalFactoryAdmissionEvidenceProvider) RepoState(ctx context.Context) (FactoryRepoState, error)
type MutationCapability ¶
type MutationCapability string
const ( MutationCapabilitySubmitJob MutationCapability = "submit_job" MutationCapabilityCancelJob MutationCapability = "cancel_job" MutationCapabilityOpenClawTrigger MutationCapability = "openclaw_trigger" MutationCapabilityAdmin MutationCapability = "admin" )
type MutationDecision ¶
type MutationDecision struct {
Allowed bool `json:"allowed"`
Reason string `json:"reason,omitempty"`
TokenName string `json:"token_name,omitempty"`
Capabilities []MutationCapability `json:"capabilities,omitempty"`
RequiredCapability MutationCapability `json:"required_capability,omitempty"`
LocalOnly bool `json:"local_only,omitempty"`
}
func AuthorizeMutationDecision ¶
func AuthorizeMutationDecision(r *http.Request, policy MutationPolicy) (MutationDecision, error)
func EvaluateMutationPolicy ¶
func EvaluateMutationPolicy(r *http.Request, policy MutationPolicy) MutationDecision
func MutationDecisionFromContext ¶
func MutationDecisionFromContext(ctx context.Context) (MutationDecision, bool)
MutationDecisionFromContext returns the decision attached by registerMutationRoute. Handlers that need TokenName for ledger actor attribution should call this rather than re-running the policy check.
type MutationPolicy ¶
type MutationPolicy struct {
Token string
Tokens []MutationToken
TokenHeader string
AllowedPaths []string
AllowedMethods []string
PathCapabilities map[string]MutationCapability
AllowedOrigins []string
RequireLocalRemote bool
}
func DefaultMutationPolicy ¶
func DefaultMutationPolicy(token string, allowedPaths []string) MutationPolicy
type MutationPolicyProvider ¶
type MutationPolicyProvider func() MutationPolicy
MutationPolicyProvider lazily resolves the policy at request time. The daemon defaults are evaluated per-request because tests inject options after the router is built.
type MutationToken ¶
type MutationToken struct {
Name string `json:"name"`
Token string `json:"token"` // #nosec G101 -- config field name, not a hard-coded credential.
Capabilities []MutationCapability `json:"capabilities"`
LocalOnly bool `json:"local_only,omitempty"`
}
func LoadMutationTokensFile ¶
func LoadMutationTokensFile(path string) ([]MutationToken, error)
type OpenClawResources ¶
type OpenClawResources struct {
Runs []JobProjection `json:"runs"`
Jobs []JobProjection `json:"jobs"`
Wiki []JobProjection `json:"wiki"`
}
type OpenClawSnapshotProjection ¶
type OpenClawSnapshotProjection struct {
SchemaVersion int `json:"schema_version"`
SnapshotID string `json:"snapshot_id"`
GeneratedAt string `json:"generated_at"`
Source ProjectionSource `json:"source"`
Resources OpenClawResources `json:"resources"`
Status ProjectionStatus `json:"status"`
}
type PhaseLatencyHistogram ¶
type PlansBdSource ¶
type PlansBdSource interface {
// QueryEpics returns the bd-side epic state for project_id (filtered to
// rows whose ID begins with issuePrefix when non-empty). Implementations
// must respect ctx cancellation.
QueryEpics(ctx context.Context, projectID, issuePrefix string) ([]PlansProjectionEntry, error)
}
PlansBdSource is the input edge of PlansProjectionExecutor. Production wires this to the shared bushido Dolt server via dbBdSource (database/sql); tests inject a fakePlansBdSource for L1 BDD coverage.
type PlansProjectionEntry ¶
type PlansProjectionEntry struct {
BeadsID string `json:"beads_id"`
Title string `json:"title,omitempty"`
Status string `json:"status,omitempty"`
Priority string `json:"priority,omitempty"`
IssueType string `json:"issue_type,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
Checksum string `json:"checksum,omitempty"`
}
PlansProjectionEntry is one entry in the daemon-rebuilt plans manifest. Source-of-truth fields come from bd via the executor's BdSource.
type PlansProjectionExecutor ¶
type PlansProjectionExecutor struct {
// contains filtered or unexported fields
}
PlansProjectionExecutor is the JobExecutor (supervisor.go:18) for plans.projection jobs. Mirrors the RPIJobExecutor shape: thin wrapper that the supervisor invokes with a claimed job; rebuild/write/validate mechanics live in plans_projection.go.
func NewPlansProjectionExecutor ¶
func NewPlansProjectionExecutor(opts PlansProjectionExecutorOptions) (*PlansProjectionExecutor, error)
NewPlansProjectionExecutor builds an executor from explicit dependencies. Returns an error when the store or bd source is nil — mirrors NewRPIJobExecutor's required-field contract.
func (*PlansProjectionExecutor) JobTypes ¶
func (e *PlansProjectionExecutor) JobTypes() []JobType
JobTypes reports the daemon job types this executor handles.
func (*PlansProjectionExecutor) RunJob ¶
func (e *PlansProjectionExecutor) RunJob(ctx context.Context, claim QueueLease) (JobExecutionResult, error)
RunJob executes one claimed plans.projection job. The supervisor wraps this call with claim/heartbeat/terminal-record bookkeeping. The executor:
- parses the spec from claim payload,
- queries bd for the project's epic set,
- builds a DaemonPlansProjection in memory,
- writes the manifest snapshot atomically (tmp + os.Rename), and
- returns artifacts mapping the snapshot path and entry count.
RunJob requires a non-nil ctx; callers passing nil will panic on first use.
type PlansProjectionExecutorOptions ¶
type PlansProjectionExecutorOptions struct {
Store *Store
BdSource PlansBdSource
Now func() time.Time
}
PlansProjectionExecutorOptions configures a PlansProjectionExecutor.
type PlansProjectionJobSpec ¶
type PlansProjectionJobSpec struct {
SchemaVersion int `json:"schema_version"`
JobType JobType `json:"job_type"`
ProjectID string `json:"project_id"`
IssuePrefix string `json:"issue_prefix"`
RefreshTrigger PlansProjectionRefreshTrigger `json:"refresh_trigger"`
OutputDir string `json:"output_dir"`
}
PlansProjectionJobSpec is the payload contract for plans.projection jobs. Mirrors the rpi/dream spec round-trip pattern.
func NewPlansProjectionJobSpec ¶
func NewPlansProjectionJobSpec(projectID, issuePrefix, outputDir string) PlansProjectionJobSpec
NewPlansProjectionJobSpec builds a spec with default schema/job_type values.
func PlansProjectionJobSpecFromPayload ¶
func PlansProjectionJobSpecFromPayload(payload map[string]any) (PlansProjectionJobSpec, error)
PlansProjectionJobSpecFromPayload parses a JobSpec.Payload back into a typed spec. Returns an error if required fields are missing or invalid.
func (PlansProjectionJobSpec) IdempotencyKey ¶
func (spec PlansProjectionJobSpec) IdempotencyKey() string
IdempotencyKey returns the singleton-per-project key used by the queue to collapse duplicate submissions per foundation §1 idempotency rule.
func (PlansProjectionJobSpec) Validate ¶
func (spec PlansProjectionJobSpec) Validate() error
Validate enforces the spec contract for submission and replay.
type PlansProjectionRefreshTrigger ¶
type PlansProjectionRefreshTrigger string
PlansProjectionRefreshTrigger names how the daemon decided to (re)run the projection. Used for diagnostics and ledger correlation.
const ( PlansProjectionTriggerManual PlansProjectionRefreshTrigger = "manual" PlansProjectionTriggerInterval PlansProjectionRefreshTrigger = "interval" PlansProjectionTriggerSubscription PlansProjectionRefreshTrigger = "subscription" )
type ProjectionLag ¶
type ProjectionManifest ¶
type ProjectionManifest struct {
SchemaVersion int `json:"schema_version"`
Projection ProjectionName `json:"projection"`
SourceLedger string `json:"source_ledger"`
LastEventID string `json:"last_event_id,omitempty"`
Status ProjectionStatus `json:"status"`
RebuiltAt string `json:"rebuilt_at"`
OutputPaths []string `json:"output_paths,omitempty"`
DegradedReasons []string `json:"degraded_reasons,omitempty"`
}
type ProjectionName ¶
type ProjectionName string
const ( ProjectionRPIRegistry ProjectionName = "rpi-registry" ProjectionDreamRuns ProjectionName = "dream-runs" ProjectionWikiJobs ProjectionName = "wiki-jobs" ProjectionOpenClaw ProjectionName = "openclaw-snapshot" ProjectionDaemonStatus ProjectionName = "daemon-status" ProjectionDaemonJobStatus ProjectionName = "daemon-job-status" ProjectionPlansManifest ProjectionName = "plans-manifest" )
type ProjectionRebuildOptions ¶
type ProjectionRebuildOptions struct {
RebuiltAt time.Time
SourceLedger string
// FromSnapshot, when non-nil, makes RebuildProjections start from the
// given snapshot's state and apply only events whose EventID is strictly
// greater than FromSnapshot.LastEventID. Use this to amortize replay cost
// across daemon restarts (Phase 2-B of TB-Δ3).
FromSnapshot *ProjectionSet
}
type ProjectionSet ¶
type ProjectionSet struct {
SchemaVersion int `json:"schema_version"`
RebuiltAt string `json:"rebuilt_at"`
SourceLedger string `json:"source_ledger"`
LastEventID string `json:"last_event_id,omitempty"`
Manifests map[ProjectionName]ProjectionManifest `json:"manifests"`
Jobs []JobProjection `json:"jobs"`
RPI RPIRegistryProjection `json:"rpi"`
Dream DreamRunsProjection `json:"dream"`
Wiki WikiJobsProjection `json:"wiki"`
OpenClaw OpenClawSnapshotProjection `json:"openclaw"`
Plans DaemonPlansProjection `json:"plans"`
Schedules []RecurringJobTemplate `json:"schedules,omitempty"`
Factory FactoryStatusProjection `json:"factory"`
DegradedReasons []string `json:"degraded_reasons,omitempty"`
}
func RebuildProjections ¶
func RebuildProjections(events []LedgerEvent, opts ProjectionRebuildOptions) (ProjectionSet, error)
RebuildProjections folds the given ledger events into a fresh ProjectionSet (or, if opts.FromSnapshot is set, into a delta replay seeded from that snapshot).
Callers MUST check err before using the returned set. On error, the returned set is zero-valued (SchemaVersion == 0, nil Manifests/Jobs/ Schedules, empty derived RPI/Dream/Wiki/OpenClaw buckets). Treat the SchemaVersion == 0 sentinel as "do not use this set"; downstream code that indexes into Manifests or appends to Jobs without checking err first risks nil-map panics or silently emitting an empty projection. See the bug-hunt L2 test TestRebuildProjections_ErrorReturnsUnusableSet for the contract.
type ProjectionSource ¶
type ProjectionStatus ¶
type ProjectionStatus string
const ( ProjectionStatusCurrent ProjectionStatus = "current" ProjectionStatusStale ProjectionStatus = "stale" ProjectionStatusDegraded ProjectionStatus = "degraded" )
type ProviderProjectionInput ¶
type ProviderStatus ¶
type ProviderStatus string
const ( ProviderUnreachable ProviderStatus = "provider_unreachable" ProviderSessionPending ProviderStatus = "session_pending" ProviderSessionBound ProviderStatus = "session_bound" )
func ProjectProviderStatus ¶
func ProjectProviderStatus(input ProviderProjectionInput) ProviderStatus
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func NewQueue ¶
func NewQueue(store *Store, opts QueueOptions) *Queue
NewQueue constructs a Queue bound to store with the given options.
HTTP handlers must call NewQueue per request rather than reusing a shared instance: the returned Queue carries an in-memory sequence counter initialized from the durable store, and opts may be request-scoped (Now, failpoints, actor). The store itself is shared and concurrency-safe; the Queue wrapper is the per-request boundary. See cli/internal/daemon/server.go handlers for the canonical pattern.
func (*Queue) CancelJob ¶
func (q *Queue) CancelJob(input CancelJobInput, opts QueueMutationOptions) (CancelJobResult, error)
CancelJob appends a terminal cancellation event for a non-terminal job.
func (*Queue) ClaimJob ¶
func (q *Queue) ClaimJob(jobID, actor string, opts QueueMutationOptions) (QueueLease, error)
func (*Queue) ClaimNext ¶
func (q *Queue) ClaimNext(actor string, opts QueueMutationOptions) (QueueLease, error)
func (*Queue) ClaimNextMatching ¶
func (q *Queue) ClaimNextMatching(actor string, match func(QueueJobState) bool, opts QueueMutationOptions) (QueueLease, error)
ClaimNextMatching claims the next claimable job accepted by match.
func (*Queue) CompleteJob ¶
func (q *Queue) CompleteJob(input CompleteJobInput, opts QueueMutationOptions) (QueueJobState, error)
func (*Queue) FailJob ¶
func (q *Queue) FailJob(input FailJobInput, opts QueueMutationOptions) (QueueJobState, error)
func (*Queue) Heartbeat ¶
func (q *Queue) Heartbeat(input HeartbeatInput, opts QueueMutationOptions) (QueueJobState, error)
func (*Queue) Snapshot ¶
func (q *Queue) Snapshot() (QueueSnapshot, error)
func (*Queue) SubmitJob ¶
func (q *Queue) SubmitJob(input SubmitJobInput, opts QueueMutationOptions) (QueueJobState, error)
type QueueFailpoint ¶
type QueueFailpoint string
const ( QueueFailpointBeforeAppend QueueFailpoint = "before_append" QueueFailpointAfterAppendBeforeAck QueueFailpoint = "after_append_before_ack" )
type QueueJobState ¶
type QueueJobState struct {
JobID string `json:"job_id"`
JobType JobType `json:"job_type"`
RequestID string `json:"request_id"`
RequestIDs []string `json:"request_ids,omitempty"`
Status JobStatus `json:"status"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Attempt int `json:"attempt"`
MaxAttempts int `json:"max_attempts"`
ClaimToken string `json:"claim_token,omitempty"`
LeaseEpoch int `json:"lease_epoch,omitempty"`
LeaseExpiresAt string `json:"lease_expires_at,omitempty"`
RetryExhausted bool `json:"retry_exhausted,omitempty"`
Failure *JobFailure `json:"failure,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
ArtifactRefs map[string]ArtifactRef `json:"artifact_refs,omitempty"`
ProjectionTargets []ProjectionName `json:"projection_targets,omitempty"`
Payload map[string]any `json:"payload,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
UpdatedAt string `json:"updated_at,omitempty"`
}
type QueueLease ¶
type QueueLease struct {
Job QueueJobState `json:"job"`
ClaimToken string `json:"claim_token"`
LeaseEpoch int `json:"lease_epoch"`
LeaseExpiresAt string `json:"lease_expires_at"`
}
type QueueMutationOptions ¶
type QueueMutationOptions struct {
Failpoint QueueFailpoint
}
type QueueOptions ¶
type QueueSnapshot ¶
type QueueSnapshot struct {
Jobs []QueueJobState `json:"jobs"`
LastEventID string `json:"last_event_id,omitempty"`
}
type RPIBackend ¶
type RPIBackend string
const ( RPIBackendGasCityAPI RPIBackend = "gascity-api" RPIBackendGasCityCLI RPIBackend = "gc-cli-fallback" RPIBackendForeground RPIBackend = "foreground" RPIBackendDaemonDegraded RPIBackend = "daemon-degraded" )
type RPIJobExecutor ¶
type RPIJobExecutor struct {
// contains filtered or unexported fields
}
RPIJobExecutor is a JobExecutor (Supervisor-compatible) for rpi.run and rpi.phase jobs. The Supervisor handles claim/heartbeat/terminal-write; this executor only runs the user-visible RPI work for an already-claimed job by delegating to an RPIPhaseExecutor (today: GasCityRPIPhaseExecutor).
soc-5of.8 (TB-Δ8) — production caller for daemon-submitted RPI jobs under the gascity executor policy. Symmetric in shape to WikiForgeExecutor + DreamExecutor.
func NewRPIJobExecutor ¶
func NewRPIJobExecutor(opts RPIJobExecutorOptions) (*RPIJobExecutor, error)
NewRPIJobExecutor constructs an executor for rpi.run / rpi.phase jobs. The underlying RPIRunner is reused so behavior matches the operator-driven `ao rpi run` path; only the top-level claim/terminal-record loop differs (Supervisor owns that).
func (*RPIJobExecutor) JobTypes ¶
func (e *RPIJobExecutor) JobTypes() []JobType
JobTypes reports the daemon job types this executor handles.
func (*RPIJobExecutor) RunJob ¶
func (e *RPIJobExecutor) RunJob(ctx context.Context, claim QueueLease) (JobExecutionResult, error)
RunJob executes a claimed RPI job. The supervisor wraps this call with claim/heartbeat/terminal-record bookkeeping; we only contribute the per-job execution.
RunJob requires a non-nil ctx; callers passing nil will panic on first use.
type RPIJobExecutorOptions ¶
type RPIJobExecutorOptions struct {
Store *Store
Executor RPIPhaseExecutor
PromptBuilder RPIPromptBuilder
// RegistryWriter is optional; used to keep the rpi-registry projection
// fresh after each phase. Pass nil to disable writes.
RegistryWriter cliRPI.RunRegistryWriter
}
RPIJobExecutorOptions configures an RPIJobExecutor.
type RPIJobRunResult ¶
type RPIPhaseExecutionError ¶
type RPIPhaseExecutionError struct {
Code FailureCode
Message string
Retryable bool
Cause error
}
func (*RPIPhaseExecutionError) Error ¶
func (e *RPIPhaseExecutionError) Error() string
func (*RPIPhaseExecutionError) Unwrap ¶
func (e *RPIPhaseExecutionError) Unwrap() error
type RPIPhaseExecutionRequest ¶
type RPIPhaseExecutionRequest struct {
Root string
JobID string
RunID string
Goal string
EpicID string
ExecutionPacketPath string
ParentRunJobID string
StartPhase int
MaxPhase int
Phase int
PhaseName string
Attempt int
Backend RPIBackend
GasCityCityName string
GasCitySessionAlias string
PhaseTimeout time.Duration
Prompt string
Progress RPIPhaseProgressFunc
}
type RPIPhaseExecutionResult ¶
type RPIPhaseExecutionResult struct {
Status string `json:"status,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
RequestIDs map[string]string `json:"request_ids,omitempty"`
GasCityCityName string `json:"gascity_city_name,omitempty"`
GasCitySessionID string `json:"gascity_session_id,omitempty"`
GasCitySessionAlias string `json:"gascity_session_alias,omitempty"`
EventCursor string `json:"event_cursor,omitempty"`
EvidencePath string `json:"evidence_path,omitempty"`
}
type RPIPhaseExecutor ¶
type RPIPhaseExecutor interface {
ExecuteRPIPhase(context.Context, RPIPhaseExecutionRequest) (RPIPhaseExecutionResult, error)
}
type RPIPhaseJobSpec ¶
type RPIPhaseJobSpec struct {
SchemaVersion int `json:"schema_version"`
JobType JobType `json:"job_type"`
RunID string `json:"run_id"`
Goal string `json:"goal"`
EpicID string `json:"epic_id,omitempty"`
ParentRunJobID string `json:"parent_run_job_id,omitempty"`
ExecutionPacketPath string `json:"execution_packet_path,omitempty"`
Phase int `json:"phase"`
PhaseName string `json:"phase_name"`
Attempt int `json:"attempt,omitempty"`
Backend RPIBackend `json:"backend"`
GasCityCityName string `json:"gascity_city_name,omitempty"`
GasCitySessionAlias string `json:"gascity_session_alias,omitempty"`
PhaseTimeout string `json:"phase_timeout,omitempty"`
}
func NewRPIPhaseJobSpec ¶
func NewRPIPhaseJobSpec(runID, goal string, phase int) RPIPhaseJobSpec
func RPIPhaseJobSpecFromPayload ¶
func RPIPhaseJobSpecFromPayload(payload map[string]any) (RPIPhaseJobSpec, error)
func (RPIPhaseJobSpec) ToJobSpec ¶
func (spec RPIPhaseJobSpec) ToJobSpec(jobID string) (JobSpec, error)
func (RPIPhaseJobSpec) Validate ¶
func (spec RPIPhaseJobSpec) Validate() error
type RPIPhaseProgress ¶
type RPIPhaseProgressFunc ¶
type RPIPhaseProgressFunc func(context.Context, RPIPhaseProgress) error
type RPIPromptBuilder ¶
type RPIPromptBuilder interface {
BuildRPIPrompt(RPIPhaseExecutionRequest) (string, error)
}
type RPIPromptBuilderFunc ¶
type RPIPromptBuilderFunc func(RPIPhaseExecutionRequest) (string, error)
func (RPIPromptBuilderFunc) BuildRPIPrompt ¶
func (f RPIPromptBuilderFunc) BuildRPIPrompt(req RPIPhaseExecutionRequest) (string, error)
type RPIReconcileGasCityClient ¶
type RPIReconcileGasCityClient interface {
CityReadiness(context.Context, string) (gascity.ReadinessResponse, error)
GetSession(context.Context, string, string, gascity.SessionGetOptions) (gascity.Session, gascity.ResponseMeta, error)
SessionTranscript(context.Context, string, string, gascity.TranscriptOptions) (gascity.TranscriptResponse, gascity.ResponseMeta, error)
}
type RPIReconcileJob ¶
type RPIReconcileJob struct {
JobID string `json:"job_id"`
RunID string `json:"run_id,omitempty"`
Phase int `json:"phase,omitempty"`
JobStatus JobStatus `json:"job_status"`
Status RPIReconcileStatus `json:"status"`
ProviderStatus ProviderStatus `json:"provider_status,omitempty"`
CityName string `json:"city_name,omitempty"`
SessionID string `json:"session_id,omitempty"`
SessionAlias string `json:"session_alias,omitempty"`
EventCursor string `json:"event_cursor,omitempty"`
EvidencePath string `json:"evidence_path,omitempty"`
FailureCode FailureCode `json:"failure_code,omitempty"`
RepairedLedger bool `json:"repaired_ledger,omitempty"`
Message string `json:"message,omitempty"`
}
type RPIReconcileReport ¶
type RPIReconcileReport struct {
Jobs []RPIReconcileJob `json:"jobs"`
Active int `json:"active"`
Completed int `json:"completed"`
Lost int `json:"lost"`
Failed int `json:"failed"`
}
type RPIReconcileStatus ¶
type RPIReconcileStatus string
const ( RPIReconcileActive RPIReconcileStatus = "active" RPIReconcileCompleted RPIReconcileStatus = "completed" RPIReconcileLost RPIReconcileStatus = "lost" RPIReconcileFailed RPIReconcileStatus = "failed" RPIReconcileProviderUnreachable RPIReconcileStatus = "provider_unreachable" RPIReconcilePending RPIReconcileStatus = "pending" )
type RPIReconciler ¶
type RPIReconciler struct {
// contains filtered or unexported fields
}
func NewRPIReconciler ¶
func NewRPIReconciler(store *Store, opts RPIReconcilerOptions) (*RPIReconciler, error)
func (*RPIReconciler) ReconcileRPIJobs ¶
func (r *RPIReconciler) ReconcileRPIJobs(ctx context.Context) (RPIReconcileReport, error)
type RPIReconcilerOptions ¶
type RPIReconcilerOptions struct {
Queue *Queue
GasCityClient RPIReconcileGasCityClient
RegistryWriter cliRPI.RunRegistryWriter
Actor string
}
type RPIRegistryProjection ¶
type RPIRegistryProjection struct {
Runs []JobProjection `json:"runs"`
}
type RPIRunExecutor ¶
type RPIRunExecutor struct {
// contains filtered or unexported fields
}
RPIRunExecutor is a JobExecutor for rpi.run that calls the injected runner in-process. Replaces RPICLIExecutor's shell-out path on the daemon CLI-fallback wire-up. Mirrors DreamExecutor's function-pointer pattern.
soc-bcrn.3.6 (E3.W4 sub-5a): see RPIRunRequest for context. soc-y0ct.2: emits agent-update events on phase boundaries when store != nil.
func NewRPIRunExecutor ¶
func NewRPIRunExecutor(opts RPIRunExecutorOptions) (*RPIRunExecutor, error)
NewRPIRunExecutor constructs an RPIRunExecutor. Run and Root are required; Store/Clock/Actor are optional and gate agent-update emission.
func (*RPIRunExecutor) JobTypes ¶
func (e *RPIRunExecutor) JobTypes() []JobType
JobTypes reports the queue job types this executor handles.
func (*RPIRunExecutor) RunJob ¶
func (e *RPIRunExecutor) RunJob(ctx context.Context, claim QueueLease) (JobExecutionResult, error)
RunJob parses the spec, validates it (only full cycles supported), and dispatches to the injected runner. Runner-emitted artifacts are merged on top of the executor-default artifacts so per-run output paths surface to the supervisor's terminal record.
RunJob requires a non-nil ctx; callers passing nil receive a Background ctx.
type RPIRunExecutorOptions ¶
type RPIRunExecutorOptions struct {
Run RPIRunFunc
Root string
Store *Store
Clock func() time.Time
Actor string
}
RPIRunExecutorOptions configures NewRPIRunExecutor.
Store/Clock/Actor are optional. When Store is non-nil, RunJob emits agent-update ledger events on phase boundaries (phase_start before the runner; phase_complete + phase_handoff after) per soc-y0ct.2. When Store is nil, no events are emitted — preserves back-compat with sub-wave 5a tests that injected a fake runner only.
type RPIRunFunc ¶
type RPIRunFunc func(ctx context.Context, req RPIRunRequest) (RPIRunResult, error)
RPIRunFunc executes a single rpi.run claim in-process. The cmd/ao package (agentopsd.go) supplies the implementation; this lets the daemon package stay free of cmd/ao imports. Mirrors the function-pointer pattern used by DreamExecutor.
type RPIRunJobSpec ¶
type RPIRunJobSpec struct {
SchemaVersion int `json:"schema_version"`
JobType JobType `json:"job_type"`
RunID string `json:"run_id"`
Goal string `json:"goal"`
EpicID string `json:"epic_id,omitempty"`
ExecutionPacketPath string `json:"execution_packet_path,omitempty"`
StartPhase int `json:"start_phase"`
MaxPhase int `json:"max_phase"`
Complexity string `json:"complexity,omitempty"`
TestFirst bool `json:"test_first"`
Backend RPIBackend `json:"backend"`
GasCityCityName string `json:"gascity_city_name,omitempty"`
PhaseTimeout string `json:"phase_timeout,omitempty"`
// Supervisor policy fields (soc-bcrn.3.8 / E3.W4 sub-5a-fix). These let
// daemon-submitted rpi.run jobs preserve gate enforcement and landing
// semantics that the legacy `ao rpi loop --supervisor` shell wrapper
// provided. All are optional; zero values mean "let the supervisor pick a
// safe default" so older payloads stay backward compatible.
MaxCycles int `json:"max_cycles,omitempty"`
GatePolicy string `json:"gate_policy,omitempty"` // off | best-effort | required
LandingPolicy string `json:"landing_policy,omitempty"` // off | commit | sync-push
LandingBranch string `json:"landing_branch,omitempty"`
BDSyncPolicy string `json:"bd_sync_policy,omitempty"` // auto | always | never
FailurePolicy string `json:"failure_policy,omitempty"` // stop | continue
KillSwitchPath string `json:"kill_switch_path,omitempty"`
}
func NewRPIRunJobSpec ¶
func NewRPIRunJobSpec(runID, goal string) RPIRunJobSpec
func RPIRunJobSpecFromPayload ¶
func RPIRunJobSpecFromPayload(payload map[string]any) (RPIRunJobSpec, error)
func (RPIRunJobSpec) Validate ¶
func (spec RPIRunJobSpec) Validate() error
type RPIRunRequest ¶
type RPIRunRequest struct {
Spec RPIRunJobSpec
Claim QueueLease
Root string
}
RPIRunRequest is the per-job input handed to the runner function injected into RPIRunExecutor. It carries the parsed RPIRunJobSpec, the original QueueLease (so the runner can correlate IDs / heartbeat metadata), and the daemon root cwd.
soc-bcrn.3.6 (E3.W4 sub-5a): introduced as part of the in-process executor swap that retires RPICLIExecutor's shell-out path. The cmd/ao package supplies the runner so the daemon stays free of cmd/ao imports.
type RPIRunResult ¶
RPIRunResult is the per-job output from the injected runner. Artifacts are merged on top of the executor-default artifacts before being handed back to the supervisor.
type RPIRunner ¶
type RPIRunner struct {
// contains filtered or unexported fields
}
func NewRPIRunner ¶
func NewRPIRunner(store *Store, opts RPIRunnerOptions) (*RPIRunner, error)
func (*RPIRunner) ExecuteClaim ¶
func (r *RPIRunner) ExecuteClaim(ctx context.Context, claim QueueLease) (map[string]string, string, error)
ExecuteClaim runs the user-visible RPI work for a job that has already been claimed by some caller (the supervisor's claim path is one such caller; the operator-driven `ao rpi run` is another). It does not handle claim, heartbeat, or terminal write — those are the caller's responsibility.
func (*RPIRunner) RunNextRPIJob ¶
func (r *RPIRunner) RunNextRPIJob(ctx context.Context) (RPIJobRunResult, error)
type RPIRunnerOptions ¶
type RPIRunnerOptions struct {
Queue *Queue
Executor RPIPhaseExecutor
RegistryWriter cliRPI.RunRegistryWriter
Actor string
HeartbeatInterval time.Duration
PromptBuilder RPIPromptBuilder
}
type ReadOnlyEventsResponse ¶
type ReadOnlyEventsResponse struct {
Events []LedgerEvent `json:"events"`
Corrupt []CorruptRecord `json:"corrupt,omitempty"`
LastEventID string `json:"last_event_id,omitempty"`
}
type ReadOnlyHealthResponse ¶
type ReadOnlyReadyResponse ¶
type ReadOnlyReadyResponse struct {
Ready bool `json:"ready"`
LedgerReplayStatus SnapshotReplayStatus `json:"ledger_replay_status"`
ProjectionStatus ProjectionStatus `json:"projection_status"`
ProjectionLag ProjectionLag `json:"projection_lag"`
DegradedReasons []string `json:"degraded_reasons,omitempty"`
}
type ReadOnlyServer ¶
type ReadOnlyServer struct {
// contains filtered or unexported fields
}
type ReadOnlyStatusResponse ¶
type ReadOnlyStatusResponse struct {
Ready bool `json:"ready"`
ProjectionLag ProjectionLag `json:"projection_lag"`
Queue QueueSnapshot `json:"queue"`
Projections ProjectionSet `json:"projections"`
}
type RealClock ¶
type RealClock struct{}
RealClock is the production Clock backed by the standard library.
type RecurrenceBackpressure ¶
type RecurrenceBackpressure struct {
SkipIfRunning bool `json:"skip_if_running"`
MaxQueueDepth int `json:"max_queue_depth"`
}
RecurrenceBackpressure controls how the supervisor handles in-flight schedules.
type RecurrenceSupervisor ¶
type RecurrenceSupervisor struct {
// contains filtered or unexported fields
}
RecurrenceSupervisor ticks RecurringJobTemplate schedules on cron cadence and submits jobs into the daemon Queue. One Start() goroutine per supervisor.
func NewRecurrenceSupervisor ¶
func NewRecurrenceSupervisor(store *Store, queue *Queue, clock Clock) *RecurrenceSupervisor
NewRecurrenceSupervisor builds a supervisor bound to the given store + queue. Production callers pass RealClock{}; tests pass a *FakeClock from clock_test.
func (*RecurrenceSupervisor) PollInterval ¶
func (s *RecurrenceSupervisor) PollInterval() time.Duration
func (*RecurrenceSupervisor) Start ¶
func (s *RecurrenceSupervisor) Start(ctx context.Context) error
Start runs the supervisor loop until ctx is cancelled. It re-reads the schedule list from store on every iteration to pick up adds/deletes.
func (*RecurrenceSupervisor) WithPollInterval ¶
func (s *RecurrenceSupervisor) WithPollInterval(d time.Duration) *RecurrenceSupervisor
WithPollInterval overrides the cadence at which Start() re-reads the schedule list and runs tick(). Defaults to 1 minute (matches cron's 5-field minimum granularity, amendment B4).
type RecurringJobTemplate ¶
type RecurringJobTemplate struct {
Name string `json:"name"`
Cron string `json:"cron"`
JobType JobType `json:"job_type"`
Payload json.RawMessage `json:"payload,omitempty"`
Timeout time.Duration `json:"timeout,omitempty"`
Backpressure RecurrenceBackpressure `json:"backpressure"`
}
RecurringJobTemplate is a schedule entry that materializes a Job on each cron tick.
func ScheduleStateFromEvents ¶
func ScheduleStateFromEvents(events []LedgerEvent) []RecurringJobTemplate
ScheduleStateFromEvents reduces a ledger event slice to the active schedule list. Insertion order is preserved (the slice key is creation time). schedule.deleted removes the entry by name. Unknown event types are ignored — schedule reduction only cares about its own vocabulary.
Exposed for tests + projection helpers in projections.go.
type ReplayResult ¶
type ReplayResult struct {
Events []LedgerEvent `json:"events"`
Corrupt []CorruptRecord `json:"corrupt,omitempty"`
}
type RoutingAuthority ¶
type RoutingAuthority string
const ( RoutingAuthorityObserve RoutingAuthority = "OBSERVE" RoutingAuthorityAdvisory RoutingAuthority = "ADVISORY" RoutingAuthorityDelegated RoutingAuthority = "DELEGATED" RoutingAuthorityAuthoritative RoutingAuthority = "AUTHORITATIVE" )
type RoutingLane ¶
type RoutingLane struct {
ID string `json:"id"`
Enabled bool `json:"enabled"`
Authority RoutingAuthority `json:"authority"`
Provider string `json:"provider"`
Runtime string `json:"runtime"`
Model string `json:"model"`
TaskClasses []string `json:"task_classes"`
MaxConcurrency int `json:"max_concurrency"`
CostHintUSDPerHour float64 `json:"cost_hint_usd_per_hour,omitempty"`
LatencyHint string `json:"latency_hint,omitempty"`
QualityPrior string `json:"quality_prior,omitempty"`
YieldGate *RoutingYieldGate `json:"yield_gate,omitempty"`
PromotionGate *RoutingPromotionGate `json:"promotion_gate,omitempty"`
MergeEligibility *RoutingMergeGate `json:"merge_eligibility,omitempty"`
DisabledReason string `json:"disabled_reason,omitempty"`
}
type RoutingMergeGate ¶
type RoutingMergeGate struct {
ManualMergeRequired bool `json:"manual_merge_required"`
ValidationCommands []string `json:"validation_commands"`
ValidationFailureTerminalEvent EventType `json:"validation_failure_terminal_event"`
RetainArtifactsOnFailure bool `json:"retain_artifacts_on_failure"`
RetainWorktreeOnValidationFailure bool `json:"retain_worktree_on_validation_failure"`
}
type RoutingPolicy ¶
type RoutingPolicy struct {
SchemaVersion int `json:"schema_version"`
PolicyID string `json:"policy_id"`
DefaultLane string `json:"default_lane"`
MaxTotalConcurrency int `json:"max_total_concurrency"`
AutoMergeEnabled bool `json:"auto_merge_enabled"`
Lanes []RoutingLane `json:"lanes"`
ManualMergeByDefault bool `json:"manual_merge_by_default"`
}
func DefaultFactoryRoutingPolicy ¶
func DefaultFactoryRoutingPolicy() RoutingPolicy
func ParseRoutingPolicyJSON ¶
func ParseRoutingPolicyJSON(data []byte) (RoutingPolicy, error)
func (RoutingPolicy) LaneByID ¶
func (policy RoutingPolicy) LaneByID(id string) (RoutingLane, bool)
func (RoutingPolicy) SelectLane ¶
func (policy RoutingPolicy) SelectLane(taskClass string) (RoutingLane, error)
type RoutingPromotionGate ¶
type RoutingPromotionGate struct {
RequiresYieldEvidence bool `json:"requires_yield_evidence"`
}
type RoutingYieldGate ¶
type ServerOptions ¶
type ServerOptions struct {
Now func() time.Time
SourceLedger string
QueueOptions QueueOptions
MutationPolicy MutationPolicy
}
type ServiceInstallPlan ¶
type ServiceInstallPlan struct {
ServiceName string `json:"service_name"`
Platform string `json:"platform"`
DryRun bool `json:"dry_run"`
Executable string `json:"executable"`
RepoRoot string `json:"repo_root"`
Address string `json:"address"`
Args []string `json:"args"`
UnitPath string `json:"unit_path,omitempty"`
}
func BuildServiceInstallPlan ¶
func BuildServiceInstallPlan(repoRoot, executable, address string, dryRun bool) ServiceInstallPlan
type SkillInvokeExecutor ¶
type SkillInvokeExecutor struct {
// contains filtered or unexported fields
}
func NewSkillInvokeExecutor ¶
func NewSkillInvokeExecutor(opts SkillInvokeExecutorOptions) (*SkillInvokeExecutor, error)
func (*SkillInvokeExecutor) JobTypes ¶
func (e *SkillInvokeExecutor) JobTypes() []JobType
func (*SkillInvokeExecutor) RunJob ¶
func (e *SkillInvokeExecutor) RunJob(ctx context.Context, claim QueueLease) (JobExecutionResult, error)
type SkillInvokeExecutorOptions ¶
type SkillInvokeExecutorOptions struct {
Root string
Run SkillInvokeFunc
}
type SkillInvokeFunc ¶
type SkillInvokeFunc func(context.Context, SkillInvokeRequest) (SkillInvokeResult, error)
type SkillInvokeJobSpec ¶
type SkillInvokeJobSpec struct {
SchemaVersion int `json:"schema_version"`
JobType JobType `json:"job_type"`
SkillName string `json:"skill_name"`
Args []string `json:"args,omitempty"`
}
func NewSkillInvokeJobSpec ¶
func NewSkillInvokeJobSpec(skillName string, args []string) SkillInvokeJobSpec
func SkillInvokeJobSpecFromPayload ¶
func SkillInvokeJobSpecFromPayload(payload map[string]any) (SkillInvokeJobSpec, error)
func (SkillInvokeJobSpec) ToJobSpec ¶
func (spec SkillInvokeJobSpec) ToJobSpec(jobID string) (JobSpec, error)
func (SkillInvokeJobSpec) Validate ¶
func (spec SkillInvokeJobSpec) Validate() error
type SkillInvokeRequest ¶
type SkillInvokeRequest struct {
Spec SkillInvokeJobSpec
Claim QueueLease
Root string
}
type SkillInvokeResult ¶
type SnapshotConsumerResult ¶
type SnapshotConsumerResult string
const ( SnapshotServe SnapshotConsumerResult = "serve_snapshot" SnapshotCompatibilityError SnapshotConsumerResult = "compatibility_error" SnapshotProjectionMissing SnapshotConsumerResult = "projection_missing" SnapshotProjectionDegraded SnapshotConsumerResult = "projection_degraded" )
func ProjectSnapshotResult ¶
func ProjectSnapshotResult(input SnapshotProjectionInput) SnapshotConsumerResult
type SnapshotProjectionInput ¶
type SnapshotProjectionInput struct {
ReplayStatus SnapshotReplayStatus
FileExists bool
VersionSupported bool
}
type SnapshotReplayStatus ¶
type SnapshotReplayStatus string
const ( SnapshotReplayComplete SnapshotReplayStatus = "complete" SnapshotReplayCorrupt SnapshotReplayStatus = "corrupt" )
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store persists daemon ledger events to ~/<root>/.agents/daemon/ledger.jsonl. Concurrent writers are serialized through s.mu; readers (Replay*) operate on committed file contents and do not contend for the lock — O_APPEND atomicity covers per-line consistency.
func (*Store) AppendLedgerEvent ¶
func (s *Store) AppendLedgerEvent(event LedgerEvent) (LedgerEvent, error)
func (*Store) DeleteSchedule ¶
DeleteSchedule removes a schedule by appending a schedule.deleted event. Idempotent: deleting a non-existent schedule is a no-op (logs a warning but returns nil) so callers don't have to guard against races.
func (*Store) LedgerArchivePaths ¶
LedgerArchivePaths returns the rotated ledger archives in chronological order (oldest first). Each path is one of ledger.<ts>.jsonl[.gz]; the active ledger.jsonl is excluded.
func (*Store) LedgerHealth ¶
func (s *Store) LedgerHealth(now time.Time, thresholds LedgerHealthThresholds) (LedgerHealth, error)
LedgerHealth gathers ledger-durability facts for the doctor surface. now must be supplied (caller's clock) so tests can be deterministic. thresholds uses LedgerHealthDefaultThresholds when its LedgerSizeWarnRatio is zero.
func (*Store) LedgerPath ¶
func (*Store) ListProjectionSnapshots ¶
ListProjectionSnapshots returns every snapshot file under the snapshot dir, sorted chronologically (oldest first). Both .json and .json.tmp leftovers from a crashed write are excluded; only completed snapshot-<ts>.json files are returned.
func (*Store) ListSchedules ¶
func (s *Store) ListSchedules() ([]RecurringJobTemplate, error)
ListSchedules returns the current set of recurring schedules derived from ledger replay. A schedule is present iff its most recent schedule.created has not been followed by a matching schedule.deleted. Order matches the order schedules were created (stable, from the ledger).
func (*Store) LoadLatestProjectionSnapshot ¶
func (s *Store) LoadLatestProjectionSnapshot() (ProjectionSet, string, error)
LoadLatestProjectionSnapshot finds the newest snapshot by filename ordering and json-decodes it. Returns (zero ProjectionSet, "", nil) when no snapshot exists — callers must distinguish empty-set from absent via the path.
func (*Store) ProjectionSnapshotDir ¶
ProjectionSnapshotDir returns the on-disk location for projection snapshots. The dir is created lazily by WriteProjectionSnapshot.
func (*Store) QuarantineDir ¶
func (*Store) ReadLedger ¶
func (s *Store) ReadLedger() ([]LedgerEvent, error)
func (*Store) RebuildProjections ¶
func (s *Store) RebuildProjections(opts ProjectionRebuildOptions) (ProjectionSet, error)
RebuildProjections rebuilds the projection set by replaying the store's ledger and folding events through the package-level RebuildProjections.
Callers MUST check err before using the returned ProjectionSet. On error, the returned set is zero-valued: SchemaVersion == 0, RebuiltAt == "", Manifests == nil, and Jobs/Schedules/derived buckets are nil. Reading any map field (e.g., set.Manifests[name]) on a zero-valued set returns the zero value, but mutating those nil maps (e.g., set.Manifests[name] = ...) WILL panic. The bug-hunt audit (W-B-22 / soc-58q5.7) confirmed all in-tree callers (server.go readState, projections_test.go) check err first; this godoc is a guard for future callers.
func (*Store) RebuildRPIRegistryProjection ¶
func (s *Store) RebuildRPIRegistryProjection() (DaemonRPIRegistryProjection, error)
func (*Store) RecordScheduleFired ¶
RecordScheduleFired appends a schedule.fired event linking a recurrence tick to the submission_id of the job it enqueued. Used by the recurrence supervisor (soc-8inr.4).
func (*Store) RecordScheduleSkipped ¶
RecordScheduleSkipped appends a schedule.skipped event when the supervisor elects not to enqueue a tick (e.g., backpressure: SkipIfRunning, MaxQueueDepth exceeded). reason is free-form so future backpressure strategies can record their own labels without a schema change.
func (*Store) ReplayLedger ¶
func (s *Store) ReplayLedger() (ReplayResult, error)
func (*Store) ReplayLedgerReadOnly ¶
func (s *Store) ReplayLedgerReadOnly() (ReplayResult, error)
func (*Store) SaveSchedule ¶
func (s *Store) SaveSchedule(t RecurringJobTemplate) error
SaveSchedule persists a recurring job template by appending a schedule.created event to the ledger. Returns an error if a schedule with the same Name already exists (no upsert; callers must DeleteSchedule first to replace).
The full RecurringJobTemplate is serialized into the event payload under the "template" key so replay can reconstruct the schedule list.
func (*Store) WithLedgerMaxBytes ¶
WithLedgerMaxBytes overrides the rotation threshold and returns the store for fluent configuration. Pass <=0 to disable rotation entirely.
func (*Store) WriteProjectionSnapshot ¶
func (s *Store) WriteProjectionSnapshot(set ProjectionSet) (string, error)
WriteProjectionSnapshot writes the ProjectionSet to a timestamped file under the snapshot dir using a temp-file + atomic rename so concurrent readers never observe a partial JSON document. Returns the absolute path written.
type SubmitJobInput ¶
type SubmitJobRequest ¶
type SubmitJobResponse ¶
type SubmitJobResponse struct {
Accepted bool `json:"accepted"`
RequestID string `json:"request_id"`
JobID string `json:"job_id"`
Status JobStatus `json:"status"`
LastEventID string `json:"last_event_id,omitempty"`
ProjectionStatus ProjectionStatus `json:"projection_status"`
ProjectionLag ProjectionLag `json:"projection_lag"`
DegradedReasons []string `json:"degraded_reasons,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
}
type Supervisor ¶
type Supervisor struct {
// contains filtered or unexported fields
}
Supervisor claims queue jobs, runs executors, and records terminal state.
func NewSupervisor ¶
func NewSupervisor(opts SupervisorOptions) (*Supervisor, error)
NewSupervisor builds a queue supervisor from explicit executors.
func (*Supervisor) RunLoop ¶
func (s *Supervisor) RunLoop(ctx context.Context) error
RunLoop claims and executes supported jobs until the context is cancelled.
func (*Supervisor) RunOnce ¶
func (s *Supervisor) RunOnce(ctx context.Context) (SupervisorRunOnceResult, error)
RunOnce attempts to claim and execute one supported job.
type SupervisorOptions ¶
type SupervisorOptions struct {
Queue *Queue
Executors []JobExecutor
Actor string
PollInterval time.Duration
HeartbeatInterval time.Duration
ExecutionTimeout time.Duration
}
SupervisorOptions configures a daemon queue supervisor.
type SupervisorRunOnceResult ¶
type SupervisorRunOnceResult struct {
Claimed bool
Job QueueJobState
}
SupervisorRunOnceResult reports one supervisor claim attempt.
type WikiForgeExecutor ¶
type WikiForgeExecutor struct {
// contains filtered or unexported fields
}
func NewWikiForgeExecutor ¶
func NewWikiForgeExecutor(opts WikiForgeExecutorOptions) (*WikiForgeExecutor, error)
func (*WikiForgeExecutor) JobTypes ¶
func (e *WikiForgeExecutor) JobTypes() []JobType
func (*WikiForgeExecutor) RunJob ¶
func (e *WikiForgeExecutor) RunJob(ctx context.Context, claim QueueLease) (JobExecutionResult, error)
type WikiForgeExecutorOptions ¶
type WikiForgeExecutorOptions struct {
Store *Store
Worker WikiForgeWorker
QuarantineDir string
}
type WikiForgeJobRunResult ¶
type WikiForgeJobRunResult struct {
JobID string `json:"job_id"`
DreamRunID string `json:"dream_run_id,omitempty"`
Status JobStatus `json:"status"`
Artifacts map[string]string `json:"artifacts,omitempty"`
ArtifactRefs map[string]ArtifactRef `json:"artifact_refs,omitempty"`
WorkerSessions []WikiWorkerSessionRef `json:"worker_sessions,omitempty"`
Failure *JobFailure `json:"failure,omitempty"`
}
type WikiForgeJobSpec ¶
type WikiForgeJobSpec struct {
SchemaVersion int `json:"schema_version"`
JobType JobType `json:"job_type"`
DreamRunID string `json:"dream_run_id,omitempty"`
SourcePaths []string `json:"source_paths"`
OutputDir string `json:"output_dir"`
WorkerKind agentworker.WorkerKind `json:"worker_kind"`
Provider agentworker.Provider `json:"provider"`
Model string `json:"model,omitempty"`
CWD string `json:"cwd,omitempty"`
MaxAttempts int `json:"max_attempts,omitempty"`
QuarantineDir string `json:"quarantine_dir,omitempty"`
}
func NewWikiForgeJobSpec ¶
func NewWikiForgeJobSpec(dreamRunID, outputDir string, sourcePaths []string) WikiForgeJobSpec
func WikiForgeJobSpecFromPayload ¶
func WikiForgeJobSpecFromPayload(payload map[string]any) (WikiForgeJobSpec, error)
func (WikiForgeJobSpec) ToJobSpec ¶
func (spec WikiForgeJobSpec) ToJobSpec(jobID string) (JobSpec, error)
func (WikiForgeJobSpec) Validate ¶
func (spec WikiForgeJobSpec) Validate() error
type WikiForgeRunner ¶
type WikiForgeRunner struct {
// contains filtered or unexported fields
}
func NewWikiForgeRunner ¶
func NewWikiForgeRunner(store *Store, opts WikiForgeRunnerOptions) (*WikiForgeRunner, error)
func (*WikiForgeRunner) RunWikiForgeJob ¶
func (r *WikiForgeRunner) RunWikiForgeJob(ctx context.Context, jobID string) (WikiForgeJobRunResult, error)
type WikiForgeRunnerOptions ¶
type WikiForgeRunnerOptions struct {
Queue *Queue
Worker WikiForgeWorker
Actor string
QuarantineDir string
}
type WikiForgeWorker ¶
type WikiForgeWorker interface {
RunExtractionWithRetry(context.Context, wikiworker.ExtractionRequest, wikiworker.RetryOptions) (wikiworker.ExtractionResult, error)
}
type WikiJobsProjection ¶
type WikiJobsProjection struct {
Jobs []JobProjection `json:"jobs"`
}
type WikiWorkerSessionRef ¶
type WikiWorkerSessionRef struct {
SourcePath string `json:"source_path"`
Session agentworker.SessionRef `json:"session"`
Terminal agentworker.TerminalState `json:"terminal"`
}
type WorkerKindDistribution ¶
Source Files
¶
- agent_update.go
- artifacts.go
- auth.go
- clock.go
- dream_executor.go
- dream_jobs.go
- events.go
- factory_admission.go
- factory_admission_executor.go
- factory_admission_jobs.go
- jobs.go
- ledger_health.go
- lifecycle.go
- plans_executor.go
- plans_jobs.go
- plans_projection.go
- projections.go
- reconcile.go
- recurrence.go
- recurrence_payload.go
- routing_policy.go
- rpi_executor.go
- rpi_jobs.go
- rpi_registry.go
- rpi_run.go
- rpi_runner.go
- server.go
- skill_invoke.go
- snapshot.go
- store.go
- supervisor.go
- telemetry.go
- types.go
- wiki_executor.go
- wiki_jobs.go