Documentation ¶
Overview ¶
Package pool: ingest_orchestrator.go provides the shared file-iteration loop used by both `ao pool ingest` and `ao flywheel close-loop`. The two paths previously inlined the same per-file pattern: read the file, parse the header, parse the blocks, ingest each block, and track processed files. This helper extracts the outer loop and leaves all parsing, scoring, and move helpers in cmd/ao, injected as callbacks. The orchestrator is purely structural and has no dependency on the candidate-building or pool-package internals.
Package pool manages knowledge candidate pools for the quality pipeline. Candidates flow through: pending → staged → promoted (or rejected).
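The candidate flow above, read together with the package's sentinel errors, can be sketched as a small transition table. This is only an illustration: the `status` type stands in for types.PoolStatus, the string values and the `stage` and `promote` helpers are hypothetical, and the three errors are redeclared locally so the example is self-contained. How the real package treats edge cases such as re-staging a promoted candidate is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of three sentinel errors, so the sketch compiles on its own.
var (
	ErrStageRejected   = errors.New("cannot stage rejected candidate")
	ErrPromoteRejected = errors.New("cannot promote rejected candidate")
	ErrNotStaged       = errors.New("candidate must be staged before promotion")
)

// status is a hypothetical stand-in for types.PoolStatus.
type status string

const (
	pending  status = "pending"
	staged   status = "staged"
	promoted status = "promoted"
	rejected status = "rejected"
)

// stage moves a candidate to staged unless it was rejected.
func stage(s status) (status, error) {
	if s == rejected {
		return s, ErrStageRejected
	}
	return staged, nil
}

// promote moves a staged candidate to promoted and refuses every other status.
func promote(s status) (status, error) {
	switch s {
	case rejected:
		return s, ErrPromoteRejected
	case staged:
		return promoted, nil
	default:
		return s, ErrNotStaged
	}
}

func main() {
	s := pending
	if _, err := promote(s); err != nil {
		fmt.Println("promote from pending:", err)
	}
	s, _ = stage(s)
	s, _ = promote(s)
	fmt.Println("final status:", s)
}
```

Callers of the real API would compare returned errors against the exported sentinels with errors.Is in the same way.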
Index ¶
- Constants
- Variables
- func CanonicalContentBody(markdown string) string
- func CollectArchivedArtifactFiles(baseDir string) ([]string, error)
- func CollectPromotedArtifactFiles(baseDir string) ([]string, error)
- func ComputeActionabilityScore(body string) float64
- func ComputeContextScore(lower string) float64
- func ComputeNoveltyScore(body string) float64
- func ComputeSpecificityScore(body, lower string) float64
- func ConfidenceToScore(s string) float64
- func ContentHash(body string) string
- func ExtractFirstHeadingText(body string) string
- func ExtractPromotedArtifactBody(markdown string) (string, bool)
- func ExtractPromotedArtifactBodyFile(path string) (string, bool)
- func IsSlugAlphanumeric(r rune) bool
- func IterateIngestFiles(files []string, opts IngestOrchestratorOpts)
- func LoadArchivedContentHashes(baseDir string) map[string]string
- func LoadKnownPromotedContentHashes(baseDir string) map[string]string
- func LoadPromotedContentHashes(baseDir string) map[string]string
- func ParseYAMLFrontmatter(raw string) map[string]string
- func Slugify(s string) string
- type ChainEvent
- type IngestFileFn
- type IngestOrchestratorOpts
- type LearningBlock
- type ListOptions
- type ListResult
- type MoveProcessedFn
- type Pool
- func (p *Pool) Add(candidate types.Candidate, scoring types.Scoring) error
- func (p *Pool) AddAt(candidate types.Candidate, scoring types.Scoring, addedAt time.Time) error
- func (p *Pool) AppendPromotedIndexEntry(contentHash, artifactPath, candidateID string) error
- func (p *Pool) Approve(candidateID, note, reviewer string) error
- func (p *Pool) BulkApprove(olderThan time.Duration, reviewer string, dryRun bool) ([]string, error)
- func (p *Pool) FindByPrefix(prefix string) ([]*PoolEntry, error)
- func (p *Pool) Get(candidateID string) (*PoolEntry, error)
- func (p *Pool) GetChain() (events []ChainEvent, err error)
- func (p *Pool) Init() error
- func (p *Pool) List(opts ListOptions) ([]PoolEntry, error)
- func (p *Pool) ListPaginated(opts ListOptions) (*ListResult, error)
- func (p *Pool) ListPendingReview() ([]PoolEntry, error)
- func (p *Pool) Promote(candidateID string) (string, error)
- func (p *Pool) PromotedIndexPath() string
- func (p *Pool) RecordSkip(candidateID, reason, reviewer string) error
- func (p *Pool) Reject(candidateID, reason, reviewer string) error
- func (p *Pool) Stage(candidateID string, minTier types.Tier) error
- type PoolEntry
- type PromotedIndexEntry
Constants ¶
const (
	// PoolDir is the base directory for pool storage.
	PoolDir = ".agents/pool"
	// PendingDir holds candidates awaiting scoring/review.
	PendingDir = "pending"
	// StagedDir holds candidates ready for promotion.
	StagedDir = "staged"
	// RejectedDir holds rejected candidates for audit.
	RejectedDir = "rejected"
	// ValidatedDir holds validated candidates (legacy name for staged).
	ValidatedDir = "validated"
	// IndexFile is the JSONL index of all pool entries.
	IndexFile = "index.jsonl"
	// ChainFile records all pool operations.
	ChainFile = "chain.jsonl"
)
const MaxReasonLength = 1000
MaxReasonLength is the maximum length for reason/note fields. Prevents excessively large review notes that could slow down operations.
const MinBulkApproveThreshold = time.Hour
MinBulkApproveThreshold is the minimum duration for bulk approval. Prevents accidental approval of very recent candidates.
const PromotedIndexFile = promotedIndexFile
PromotedIndexFile is the basename of the promoted-content-hash sidecar inside .agents/pool. Exposed so external callers (e.g. reindex) can write to the same file Promote reads.
Variables ¶
var (
	// ErrEmptyID is returned when a candidate ID is empty.
	ErrEmptyID = errors.New("candidate ID cannot be empty")
	// ErrIDTooLong is returned when a candidate ID exceeds 128 characters.
	ErrIDTooLong = errors.New("candidate ID too long (max 128 characters)")
	// ErrIDInvalidChars is returned when a candidate ID contains disallowed characters.
	ErrIDInvalidChars = errors.New("candidate ID contains invalid characters (only alphanumeric, hyphen, underscore allowed)")
	// ErrCandidateNotFound is returned when a candidate cannot be located in the pool.
	ErrCandidateNotFound = errors.New("candidate not found")
	// ErrStageRejected is returned when attempting to stage a rejected candidate.
	ErrStageRejected = errors.New("cannot stage rejected candidate")
	// ErrPromoteRejected is returned when attempting to promote a rejected candidate.
	ErrPromoteRejected = errors.New("cannot promote rejected candidate")
	// ErrNotStaged is returned when attempting to promote a candidate that is not staged.
	ErrNotStaged = errors.New("candidate must be staged before promotion")
	// ErrThresholdTooLow is returned when bulk approval threshold is below minimum.
	ErrThresholdTooLow = errors.New("threshold must be >= 1h")
	// ErrReasonTooLong is returned when reason/note exceeds MaxReasonLength.
	ErrReasonTooLong = fmt.Errorf("reason/note exceeds maximum length of %d characters", MaxReasonLength)
)
Sentinel errors for common pool operations.
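The three ID-related errors imply a validation rule: an ID must be non-empty, at most 128 characters, and limited to alphanumerics, hyphens, and underscores. A minimal sketch of such a check follows. `validateCandidateID` is a hypothetical helper, not part of the documented API, and the sketch assumes length is counted in bytes and "alphanumeric" means ASCII, neither of which the documentation states.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package's ID errors, so the sketch compiles on its own.
var (
	ErrEmptyID        = errors.New("candidate ID cannot be empty")
	ErrIDTooLong      = errors.New("candidate ID too long (max 128 characters)")
	ErrIDInvalidChars = errors.New("candidate ID contains invalid characters (only alphanumeric, hyphen, underscore allowed)")
)

// validateCandidateID is a hypothetical helper.
// Assumption: length is measured in bytes and only ASCII letters/digits count.
func validateCandidateID(id string) error {
	if id == "" {
		return ErrEmptyID
	}
	if len(id) > 128 {
		return ErrIDTooLong
	}
	for _, r := range id {
		ok := (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') ||
			(r >= '0' && r <= '9') || r == '-' || r == '_'
		if !ok {
			return ErrIDInvalidChars
		}
	}
	return nil
}

func main() {
	for _, id := range []string{"cand-001", "", "../etc/passwd"} {
		err := validateCandidateID(id)
		switch {
		case err == nil:
			fmt.Printf("%q: ok\n", id)
		case errors.Is(err, ErrEmptyID):
			fmt.Printf("%q: empty ID\n", id)
		default:
			fmt.Printf("%q: %v\n", id, err)
		}
	}
}
```

Restricting IDs to this character set also keeps path separators out of any file name derived from the ID, which is one plausible reason for the rule.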
Functions ¶
func CanonicalContentBody ¶
CanonicalContentBody returns the semantic body used for promoted-content hashing. Pending captures and older Pool.writeArtifact output can both contain nested "## What We Learned" sections, so the last matching section is treated as the highest-signal body. Documents without that section fall back to the post-frontmatter text.
func CollectArchivedArtifactFiles ¶
CollectArchivedArtifactFiles returns markdown artifacts that were archived by AgentOps corpus cleanup. These bodies are no longer active promoted artifacts, but they are known corpus bodies and must not be recreated from stale pending or pool state.
func CollectPromotedArtifactFiles ¶
CollectPromotedArtifactFiles walks .agents/learnings and .agents/patterns under baseDir and returns all markdown artifacts in deterministic order.
func ComputeActionabilityScore ¶
ComputeActionabilityScore scores how actionable a learning body is.
func ComputeContextScore ¶
ComputeContextScore scores how well the body documents its context.
func ComputeNoveltyScore ¶
ComputeNoveltyScore scores novelty based on body length heuristics.
func ComputeSpecificityScore ¶
ComputeSpecificityScore scores how specific a learning body is.
func ConfidenceToScore ¶
ConfidenceToScore maps high/medium/low confidence strings to a numeric score.
func ContentHash ¶
ContentHash computes the dedup key used by Promote for a given candidate content body. Exported so reindex/backfill paths can compute the same hash from on-disk artifact bodies without constructing a synthetic Candidate. Identical to candidateContentHash(types.Candidate{Content: body}).
func ExtractFirstHeadingText ¶
ExtractFirstHeadingText returns the first non-empty text line with leading '#' characters trimmed.
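The behavior described above can be sketched in a few lines. `firstHeadingText` is a hypothetical re-implementation written from the one-line description, not the package's code. It assumes that a line consisting only of '#' characters counts as empty and is skipped, and that surrounding whitespace is trimmed.

```go
package main

import (
	"fmt"
	"strings"
)

// firstHeadingText returns the first line that still has text after
// trimming whitespace and leading '#' characters.
// Assumption: a line made only of '#' characters is treated as empty.
func firstHeadingText(body string) string {
	for _, line := range strings.Split(body, "\n") {
		text := strings.TrimSpace(strings.TrimLeft(strings.TrimSpace(line), "#"))
		if text != "" {
			return text
		}
	}
	return ""
}

func main() {
	fmt.Println(firstHeadingText("\n## Prefer context timeouts\n\nBody text."))
}
```

Note that, as described, the first non-empty line is returned even when it is not a heading.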
func ExtractPromotedArtifactBody ¶
ExtractPromotedArtifactBody returns the canonical candidate body from a promoted artifact markdown document. Pool.writeArtifact stores candidate content under "## What We Learned"; older hand-authored artifacts fall back to the post-frontmatter document body so they still participate in live dedupe scans and reindex backfills.
func ExtractPromotedArtifactBodyFile ¶
ExtractPromotedArtifactBodyFile reads path and extracts the canonical body with ExtractPromotedArtifactBody.
func IsSlugAlphanumeric ¶
IsSlugAlphanumeric reports whether the rune is kept as-is in a slug.
func IterateIngestFiles ¶
func IterateIngestFiles(files []string, opts IngestOrchestratorOpts)
IterateIngestFiles drives the shared outer file-iteration loop. For each path it reads the file once, hands the bytes to IngestFile, and tracks processed-file state when configured. Read errors do not stop sibling files — every file is independently processed.
This function deliberately accepts no pool reference and returns no error: all behavior is observable through the callbacks and the optional MoveProcessed terminator.
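The loop described above can be sketched as follows. Everything here is a local mirror written from the documentation, not the package's code: the `ingestFileFn` signature is an assumption (the text only says it receives a file's bytes and returns true on an ingest error), and calling MoveProcessed even when the processed list is empty is also an assumption.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// ingestFileFn mirrors IngestFileFn; the exact signature is assumed.
type ingestFileFn func(path string, data []byte) (hadError bool)

// ingestOpts mirrors IngestOrchestratorOpts.
type ingestOpts struct {
	IngestFile     ingestFileFn
	TrackProcessed bool
	MoveProcessed  func(processed []string)
	OnReadError    func(path string, err error)
}

// iterateIngestFiles reads each file once, hands the bytes to IngestFile,
// and collects files that ingested without error when tracking is enabled.
func iterateIngestFiles(files []string, opts ingestOpts) {
	var processed []string
	for _, path := range files {
		data, err := os.ReadFile(path)
		if err != nil {
			if opts.OnReadError != nil {
				opts.OnReadError(path, err)
			}
			continue // a read error never stops sibling files
		}
		hadErr := opts.IngestFile(path, data)
		if opts.TrackProcessed && !hadErr {
			processed = append(processed, path)
		}
	}
	if opts.TrackProcessed && opts.MoveProcessed != nil {
		opts.MoveProcessed(processed)
	}
}

// runDemo ingests one real file and one missing file from a temp directory.
func runDemo() (int, int, []string, error) {
	dir, err := os.MkdirTemp("", "ingest-demo")
	if err != nil {
		return 0, 0, nil, err
	}
	defer os.RemoveAll(dir)

	good := filepath.Join(dir, "a.md")
	if err := os.WriteFile(good, []byte("# Learning: example\n"), 0o644); err != nil {
		return 0, 0, nil, err
	}
	missing := filepath.Join(dir, "missing.md")

	var ingested, readErrors int
	var moved []string
	iterateIngestFiles([]string{missing, good}, ingestOpts{
		IngestFile: func(path string, data []byte) bool {
			ingested++ // counters are updated via closure capture
			return false
		},
		TrackProcessed: true,
		MoveProcessed:  func(processed []string) { moved = processed },
		OnReadError:    func(path string, err error) { readErrors++ },
	})
	return ingested, readErrors, moved, nil
}

func main() {
	ingested, readErrors, moved, err := runDemo()
	if err != nil {
		fmt.Println("demo failed:", err)
		return
	}
	fmt.Printf("ingested=%d readErrors=%d moved=%d\n", ingested, readErrors, len(moved))
}
```

Because the loop returns nothing, the caller's counters live in the closures, which matches the note that all behavior is observable through the callbacks.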
func LoadArchivedContentHashes ¶
LoadArchivedContentHashes returns hashes for cleanup-archived knowledge artifacts. Use this to block recreation of seen-again bodies; do not use it for pool reindex because archived paths are not active promoted artifacts.
func LoadKnownPromotedContentHashes ¶
LoadKnownPromotedContentHashes combines active promoted artifacts and cleanup archives. Active artifacts win when the same hash exists in both.
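The "active artifacts win" rule amounts to a map merge in which active entries are written last. A minimal sketch, assuming both inputs map content hash to artifact path as the sibling loaders do; `mergeKnownHashes` and the sample paths are hypothetical.

```go
package main

import "fmt"

// mergeKnownHashes combines archived and active hash maps.
// Active entries are copied last, so they overwrite archived ones.
func mergeKnownHashes(active, archived map[string]string) map[string]string {
	known := make(map[string]string, len(active)+len(archived))
	for hash, path := range archived {
		known[hash] = path
	}
	for hash, path := range active {
		known[hash] = path
	}
	return known
}

func main() {
	active := map[string]string{"h1": ".agents/learnings/a.md"}
	archived := map[string]string{"h1": "archive/a.md", "h2": "archive/b.md"}
	known := mergeKnownHashes(active, archived)
	fmt.Println(len(known), known["h1"], known["h2"])
}
```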
func LoadPromotedContentHashes ¶
LoadPromotedContentHashes live-scans surviving promoted artifacts and returns content_hash -> artifact_path. It is the canonical fallback when promoted-index.jsonl is missing or stale.
func ParseYAMLFrontmatter ¶
ParseYAMLFrontmatter parses a raw YAML frontmatter block into a string map.
Types ¶
type ChainEvent ¶
type ChainEvent struct {
// Timestamp is when the event occurred.
Timestamp time.Time `json:"timestamp"`
// Operation is the action taken (add, stage, promote, reject).
Operation string `json:"operation"`
// CandidateID is the affected candidate.
CandidateID string `json:"candidate_id"`
// FromStatus is the previous status.
FromStatus types.PoolStatus `json:"from_status,omitempty"`
// ToStatus is the new status.
ToStatus types.PoolStatus `json:"to_status,omitempty"`
// Reason explains why the operation occurred.
Reason string `json:"reason,omitempty"`
// Reviewer is who performed the operation.
Reviewer string `json:"reviewer,omitempty"`
// ArtifactPath is the destination path for promotions.
ArtifactPath string `json:"artifact_path,omitempty"`
}
ChainEvent records a pool operation.
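To show how the struct tags shape a serialized event, here is a sketch that encodes one event as a single JSON line. It uses a local mirror of ChainEvent with plain strings in place of types.PoolStatus. The "pending" status value and the assumption that chain.jsonl holds one such object per line are inferred from the names, not confirmed by the documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// chainEvent mirrors ChainEvent, with string standing in for types.PoolStatus.
type chainEvent struct {
	Timestamp    time.Time `json:"timestamp"`
	Operation    string    `json:"operation"`
	CandidateID  string    `json:"candidate_id"`
	FromStatus   string    `json:"from_status,omitempty"`
	ToStatus     string    `json:"to_status,omitempty"`
	Reason       string    `json:"reason,omitempty"`
	Reviewer     string    `json:"reviewer,omitempty"`
	ArtifactPath string    `json:"artifact_path,omitempty"`
}

// encodeChainLine renders an event as one newline-terminated JSON object.
func encodeChainLine(ev chainEvent) (string, error) {
	b, err := json.Marshal(ev)
	if err != nil {
		return "", err
	}
	return string(b) + "\n", nil
}

// demoLine encodes a fixed "add" event; empty optional fields are omitted.
func demoLine() string {
	line, err := encodeChainLine(chainEvent{
		Timestamp:   time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
		Operation:   "add",
		CandidateID: "cand-001",
		ToStatus:    "pending",
	})
	if err != nil {
		return ""
	}
	return line
}

func main() {
	fmt.Print(demoLine())
}
```

Fields tagged omitempty, such as from_status for a newly added candidate, do not appear in the output at all.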
type IngestFileFn ¶
IngestFileFn ingests every learning block from a single file's bytes. It is implemented in cmd/ao using the existing parsePendingFileHeader, parseLearningBlocks, and ingestFileBlocks helpers. It must update any caller-owned counters via closure capture (the orchestrator does not inspect the result struct itself).
Returns true when the file produced an ingest error that should suppress downstream processed-file move behavior.
type IngestOrchestratorOpts ¶
type IngestOrchestratorOpts struct {
// IngestFile must be non-nil. Called once per input file.
IngestFile IngestFileFn
// TrackProcessed enables processed-file accumulation. When true, each
// file that ingests without errors is appended to the processed list and
// passed to MoveProcessed at the end. Set to false for callers that
// must not move files (e.g., the in-process flywheel close-loop path,
// which historically left files in pending/ for separate orchestration).
TrackProcessed bool
// MoveProcessed is invoked once with the accumulated processed-files list.
// Required when TrackProcessed is true; ignored otherwise.
MoveProcessed MoveProcessedFn
// OnReadError is called when os.ReadFile fails for a file. The orchestrator
// always skips the file in that case; the callback lets the caller record
// the error in its own result struct (e.g., bumping res.Errors and emitting
// a verbose-print warning). May be nil.
OnReadError func(path string, err error)
}
IngestOrchestratorOpts wires the cmd/ao-side helpers into the shared loop.
type LearningBlock ¶
LearningBlock is a parsed markdown learning block ready for scoring/ingest.
func ParseLearningBlocks ¶
func ParseLearningBlocks(md string) []LearningBlock
ParseLearningBlocks extracts one or more LearningBlock entries from markdown. Falls back to a single legacy frontmatter-based block when no "# Learning:" header is present.
func ParseLegacyFrontmatterLearning ¶
func ParseLegacyFrontmatterLearning(md string) (LearningBlock, bool)
ParseLegacyFrontmatterLearning parses a single legacy /learn-style markdown file (YAML frontmatter + body) into a LearningBlock.
type ListOptions ¶
type ListOptions struct {
// Tier filters by quality tier.
Tier types.Tier
// Status filters by pool status.
Status types.PoolStatus
// Offset skips the first N results (for pagination).
Offset int
// Limit caps the number of results.
Limit int
}
ListOptions configures pool listing.
type ListResult ¶
type ListResult struct {
// Entries is the page of results.
Entries []PoolEntry
// Total is the count of all matching entries before pagination.
Total int
}
ListResult contains pool entries and pagination metadata.
type MoveProcessedFn ¶
type MoveProcessedFn func(processed []string)
MoveProcessedFn moves successfully processed files to the processed directory. Implemented in cmd/ao via moveIngestedFiles. Called once at the end of the run with the full list of files that completed without errors (and only when GetDryRun() is false at the call site).
type Pool ¶
type Pool struct {
// BaseDir is the working directory.
BaseDir string
// PoolPath is the full path to .agents/pool.
PoolPath string
}
Pool manages the candidate pool.
func (*Pool) AddAt ¶
AddAt adds a new candidate to the pending pool with a caller-supplied AddedAt timestamp. This is useful when ingesting historical artifacts where "age" should reflect the original creation/modification time, not the ingestion time.
func (*Pool) AppendPromotedIndexEntry ¶
AppendPromotedIndexEntry appends a single entry to the promoted-content-hash sidecar. Mirrors the write path used by Promote so reindex/backfill writes are byte-compatible with regular promotions.
func (*Pool) BulkApprove ¶
BulkApprove approves all silver candidates older than olderThan. It returns ErrThresholdTooLow if olderThan < 1h (MinBulkApproveThreshold) to prevent accidental mass approval.
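The threshold guard can be sketched independently of pool storage. `selectForBulkApprove` is a hypothetical helper that only picks IDs by age. It ignores the tier filter and the dryRun flag, treats "older than" as strictly greater, and redeclares the constant and error locally so it compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"time"
)

// Local copies of the package's constant and sentinel error.
const MinBulkApproveThreshold = time.Hour

var ErrThresholdTooLow = errors.New("threshold must be >= 1h")

// selectForBulkApprove returns the IDs whose age exceeds olderThan,
// refusing thresholds below MinBulkApproveThreshold.
func selectForBulkApprove(ages map[string]time.Duration, olderThan time.Duration) ([]string, error) {
	if olderThan < MinBulkApproveThreshold {
		return nil, ErrThresholdTooLow
	}
	var ids []string
	for id, age := range ages {
		if age > olderThan {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids) // map iteration order is random; sort for stable output
	return ids, nil
}

// demoSelect runs the selection over a fixed sample with a threshold in minutes.
func demoSelect(thresholdMinutes int) ([]string, error) {
	ages := map[string]time.Duration{
		"cand-old": 3 * time.Hour,
		"cand-mid": 90 * time.Minute,
		"cand-new": 10 * time.Minute,
	}
	return selectForBulkApprove(ages, time.Duration(thresholdMinutes)*time.Minute)
}

func main() {
	ids, err := demoSelect(120)
	fmt.Println(ids, err)
	_, err = demoSelect(30)
	fmt.Println(err)
}
```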
func (*Pool) FindByPrefix ¶
FindByPrefix returns the candidates whose ID starts with the given prefix. Useful for shortened-ID lookups.
func (*Pool) GetChain ¶
func (p *Pool) GetChain() (events []ChainEvent, err error)
GetChain returns all chain events.
func (*Pool) List ¶
func (p *Pool) List(opts ListOptions) ([]PoolEntry, error)
List returns pool entries matching the options.
func (*Pool) ListPaginated ¶
func (p *Pool) ListPaginated(opts ListOptions) (*ListResult, error)
ListPaginated returns pool entries with pagination metadata.
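The relationship between ListOptions.Offset, ListOptions.Limit, and ListResult.Total can be illustrated with a slice-based sketch. `paginate` is hypothetical and works on plain strings. It assumes a Limit of zero or less means "no cap" and that an Offset past the end yields an empty page, neither of which the documentation specifies.

```go
package main

import "fmt"

// paginate returns one page of items plus the total count before pagination.
// Assumptions: limit <= 0 means no cap; an out-of-range offset is clamped.
func paginate(items []string, offset, limit int) ([]string, int) {
	total := len(items)
	if offset < 0 {
		offset = 0
	}
	if offset > total {
		offset = total
	}
	end := total
	if limit > 0 && limit < total-offset {
		end = offset + limit
	}
	return items[offset:end], total
}

func main() {
	items := []string{"a", "b", "c", "d", "e"}
	page, total := paginate(items, 2, 2)
	fmt.Println(page, total)
}
```

Returning the total alongside the page lets a caller compute whether more pages remain without issuing a second query.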
func (*Pool) ListPendingReview ¶
ListPendingReview returns bronze candidates awaiting human review.
func (*Pool) Promote ¶
Promote moves a staged candidate to learnings/patterns.
Content-hash dedup contract (soc-f2q4 / 4-A): if the candidate's content body has already been promoted by this pool, the existing artifact path is returned and no new file is written. This matches the harvest catalog contract where ContentHash is the only collapse key, and prevents two rewrites of the same insight from re-bloating the global hub.
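The dedup contract can be sketched with an in-memory index keyed by content hash. This is an illustration only: SHA-256 over the raw body is an assumed hash (the documentation does not name the algorithm or any normalization), and the `promoter` type, its `writes` counter, and the generated artifact path are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// contentHash is an assumed dedup key: hex-encoded SHA-256 of the raw body.
func contentHash(body string) string {
	sum := sha256.Sum256([]byte(body))
	return hex.EncodeToString(sum[:])
}

// promoter is a hypothetical in-memory stand-in for the promoted index.
type promoter struct {
	index  map[string]string // content hash -> artifact path
	writes int               // number of artifacts actually written
}

// promote returns the existing artifact path when the body was already
// promoted; otherwise it records a new artifact.
func (p *promoter) promote(candidateID, body string) string {
	h := contentHash(body)
	if path, ok := p.index[h]; ok {
		return path // duplicate content: no new file
	}
	path := ".agents/learnings/" + candidateID + ".md"
	p.index[h] = path
	p.writes++
	return path
}

func main() {
	p := &promoter{index: map[string]string{}}
	first := p.promote("cand-001", "Always set a context timeout.")
	second := p.promote("cand-002", "Always set a context timeout.")
	fmt.Println(first == second, p.writes)
}
```

Two candidates with different IDs but the same body resolve to the same artifact path, which is the collapse behavior the contract describes.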
func (*Pool) PromotedIndexPath ¶
PromotedIndexPath returns the absolute path to the promoted-content-hash sidecar for this pool.
func (*Pool) RecordSkip ¶
RecordSkip appends an auditable skip event for candidates rejected before a pool entry exists, such as pending knowledge whose body already has a promoted artifact on disk.
type PoolEntry ¶
type PoolEntry struct {
types.PoolEntry
// FilePath is where this entry is stored.
FilePath string `json:"file_path,omitempty"`
// Age is how long since the entry was added.
Age time.Duration `json:"-"`
// AgeString is the human-readable age.
AgeString string `json:"age,omitempty"`
// ApproachingAutoPromote reports whether the entry is nearing the 24h auto-promote threshold.
ApproachingAutoPromote bool `json:"approaching_auto_promote,omitempty"`
}
PoolEntry extends types.PoolEntry with operational fields.
type PromotedIndexEntry ¶
type PromotedIndexEntry = promotedIndexEntry
PromotedIndexEntry is one line of the promoted-index sidecar. Exposed so reindex can read existing entries and append new ones with the same shape Promote writes.