package pool

Version: v0.0.0-...-215fcef
Warning

This package is not in the latest version of its module.

Published: Jun 3, 2026 | License: Apache-2.0 | Imports: 17 | Imported by: 0

Documentation

Overview

The file ingest_orchestrator.go provides a shared file-iteration loop used by both `ao pool ingest` and `ao flywheel close-loop`. Both paths previously inlined the same per-file pattern: ReadFile, parse the header, parse the learning blocks, ingest each block, and track processed files. This helper extracts only the outer loop and keeps every parsing, scoring, and move helper in cmd/ao, supplied as injected callbacks. The orchestrator is purely structural and has no dependency on candidate building or pool-package internals.

Package pool manages knowledge candidate pools for the quality pipeline. Candidates flow through pending → staged → promoted (or rejected).

Constants

const (
	// PoolDir is the base directory for pool storage.
	PoolDir = ".agents/pool"

	// PendingDir holds candidates awaiting scoring/review.
	PendingDir = "pending"

	// StagedDir holds candidates ready for promotion.
	StagedDir = "staged"

	// RejectedDir holds rejected candidates for audit.
	RejectedDir = "rejected"

	// ValidatedDir holds validated candidates (legacy name for staged).
	ValidatedDir = "validated"

	// IndexFile is the JSONL index of all pool entries.
	IndexFile = "index.jsonl"

	// ChainFile records all pool operations.
	ChainFile = "chain.jsonl"
)

const MaxReasonLength = 1000

MaxReasonLength is the maximum length for reason/note fields. Prevents excessively large review notes that could slow down operations.

const MinBulkApproveThreshold = time.Hour

MinBulkApproveThreshold is the minimum duration for bulk approval. Prevents accidental approval of very recent candidates.

const PromotedIndexFile = promotedIndexFile

PromotedIndexFile is the basename of the promoted-content-hash sidecar inside .agents/pool. Exposed so external callers (e.g. reindex) can write to the same file Promote reads.

Variables

var (
	// ErrEmptyID is returned when a candidate ID is empty.
	ErrEmptyID = errors.New("candidate ID cannot be empty")
	// ErrIDTooLong is returned when a candidate ID exceeds 128 characters.
	ErrIDTooLong = errors.New("candidate ID too long (max 128 characters)")
	// ErrIDInvalidChars is returned when a candidate ID contains disallowed characters.
	ErrIDInvalidChars = errors.New("candidate ID contains invalid characters (only alphanumeric, hyphen, underscore allowed)")
	// ErrCandidateNotFound is returned when a candidate cannot be located in the pool.
	ErrCandidateNotFound = errors.New("candidate not found")
	// ErrStageRejected is returned when attempting to stage a rejected candidate.
	ErrStageRejected = errors.New("cannot stage rejected candidate")
	// ErrPromoteRejected is returned when attempting to promote a rejected candidate.
	ErrPromoteRejected = errors.New("cannot promote rejected candidate")
	// ErrNotStaged is returned when attempting to promote a candidate that is not staged.
	ErrNotStaged = errors.New("candidate must be staged before promotion")
	// ErrThresholdTooLow is returned when bulk approval threshold is below minimum.
	ErrThresholdTooLow = errors.New("threshold must be >= 1h")
	// ErrReasonTooLong is returned when reason/note exceeds MaxReasonLength.
	ErrReasonTooLong = fmt.Errorf("reason/note exceeds maximum length of %d characters", MaxReasonLength)
)

Sentinel errors for common pool operations.

Functions

func CanonicalContentBody

func CanonicalContentBody(markdown string) string

CanonicalContentBody returns the semantic body used for promoted-content hashing. Pending captures and older Pool.writeArtifact output can both contain nested "## What We Learned" sections, so the last matching section is treated as the highest-signal body. Documents without that section fall back to the post-frontmatter text.
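To make the "last section wins" rule concrete, here is a rough approximation. It assumes frontmatter is delimited by `---` lines and that a section ends at the next level-2 (`## `) heading; `canonicalBody` and `stripFrontmatter` are illustrative names, not package functions.

```go
package main

import (
	"fmt"
	"strings"
)

const learnedHeading = "## What We Learned"

// stripFrontmatter drops a leading YAML block delimited by "---" lines.
func stripFrontmatter(md string) string {
	lines := strings.Split(md, "\n")
	if strings.TrimSpace(lines[0]) != "---" {
		return md
	}
	for i := 1; i < len(lines); i++ {
		if strings.TrimSpace(lines[i]) == "---" {
			return strings.Join(lines[i+1:], "\n")
		}
	}
	return md // unterminated frontmatter: leave the text untouched
}

// canonicalBody approximates CanonicalContentBody: it returns the text under
// the LAST "## What We Learned" heading up to the next level-2 heading, or
// the post-frontmatter text when no such heading exists.
func canonicalBody(md string) string {
	body := stripFrontmatter(md)
	lines := strings.Split(body, "\n")
	start := -1
	for i, line := range lines {
		if strings.TrimSpace(line) == learnedHeading {
			start = i // keep overwriting so the last match wins
		}
	}
	if start < 0 {
		return strings.TrimSpace(body)
	}
	end := len(lines)
	for j := start + 1; j < len(lines); j++ {
		if strings.HasPrefix(lines[j], "## ") {
			end = j
			break
		}
	}
	return strings.TrimSpace(strings.Join(lines[start+1:end], "\n"))
}

func main() {
	doc := "---\nid: x\n---\n# Title\n\n## What We Learned\nold wrapper\n\n## What We Learned\nUse fsync before rename.\n\n## Source\nsession 42\n"
	fmt.Println(canonicalBody(doc)) // Use fsync before rename.
}
```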

func CollectArchivedArtifactFiles

func CollectArchivedArtifactFiles(baseDir string) ([]string, error)

CollectArchivedArtifactFiles returns markdown artifacts that were archived by AgentOps corpus cleanup. These bodies are no longer active promoted artifacts, but they are known corpus bodies and must not be recreated from stale pending or pool state.

func CollectPromotedArtifactFiles

func CollectPromotedArtifactFiles(baseDir string) ([]string, error)

CollectPromotedArtifactFiles walks .agents/learnings and .agents/patterns under baseDir and returns all markdown artifacts in deterministic order.

func ComputeActionabilityScore

func ComputeActionabilityScore(body string) float64

ComputeActionabilityScore scores how actionable a learning body is.

func ComputeContextScore

func ComputeContextScore(lower string) float64

ComputeContextScore scores how well the body documents its context.

func ComputeNoveltyScore

func ComputeNoveltyScore(body string) float64

ComputeNoveltyScore scores novelty based on body length heuristics.

func ComputeSpecificityScore

func ComputeSpecificityScore(body, lower string) float64

ComputeSpecificityScore scores how specific a learning body is.

func ConfidenceToScore

func ConfidenceToScore(s string) float64

ConfidenceToScore maps high/medium/low confidence strings to a numeric score.

func ContentHash

func ContentHash(body string) string

ContentHash computes the dedup key used by Promote for a given candidate content body. Exported so reindex/backfill paths can compute the same hash from on-disk artifact bodies without constructing a synthetic Candidate. Identical to candidateContentHash(types.Candidate{Content: body}).

func ExtractFirstHeadingText

func ExtractFirstHeadingText(body string) string

ExtractFirstHeadingText returns the first non-empty text line with leading '#' characters trimmed.
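The rule is short enough to restate directly. This hypothetical `firstHeadingText` also trims the whitespace left after the `#` run, which is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// firstHeadingText returns the first non-empty line with leading '#'
// characters trimmed. Trimming the following space is an assumption.
func firstHeadingText(body string) string {
	for _, line := range strings.Split(body, "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		return strings.TrimSpace(strings.TrimLeft(line, "#"))
	}
	return ""
}

func main() {
	fmt.Println(firstHeadingText("\n\n## Retry with jitter\nBody text")) // Retry with jitter
}
```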

func ExtractPromotedArtifactBody

func ExtractPromotedArtifactBody(markdown string) (string, bool)

ExtractPromotedArtifactBody returns the canonical candidate body from a promoted artifact markdown document. Pool.writeArtifact stores candidate content under "## What We Learned"; older hand-authored artifacts fall back to the post-frontmatter document body so they still participate in live dedupe scans and reindex backfills.

func ExtractPromotedArtifactBodyFile

func ExtractPromotedArtifactBodyFile(path string) (string, bool)

ExtractPromotedArtifactBodyFile reads path and extracts the canonical body with ExtractPromotedArtifactBody.

func IsSlugAlphanumeric

func IsSlugAlphanumeric(r rune) bool

IsSlugAlphanumeric reports whether the rune is kept as-is in a slug.

func IterateIngestFiles

func IterateIngestFiles(files []string, opts IngestOrchestratorOpts)

IterateIngestFiles drives the shared outer file-iteration loop. For each path it reads the file once, passes the bytes to IngestFile, and tracks processed-file state when configured. A read error skips only the affected file; every other file is still processed independently.

This function deliberately accepts no pool reference and returns no error: all behavior is observable through the callbacks and the optional MoveProcessed terminator.

func LoadArchivedContentHashes

func LoadArchivedContentHashes(baseDir string) map[string]string

LoadArchivedContentHashes returns hashes for cleanup-archived knowledge artifacts. Use this to block recreation of seen-again bodies; do not use it for pool reindex because archived paths are not active promoted artifacts.

func LoadKnownPromotedContentHashes

func LoadKnownPromotedContentHashes(baseDir string) map[string]string

LoadKnownPromotedContentHashes combines active promoted artifacts and cleanup archives. Active artifacts win when the same hash exists in both.

func LoadPromotedContentHashes

func LoadPromotedContentHashes(baseDir string) map[string]string

LoadPromotedContentHashes live-scans surviving promoted artifacts and returns content_hash -> artifact_path. It is the canonical fallback when promoted-index.jsonl is missing or stale.

func ParseYAMLFrontmatter

func ParseYAMLFrontmatter(raw string) map[string]string

ParseYAMLFrontmatter parses a raw YAML frontmatter block into a string map.

func Slugify

func Slugify(s string) string

Slugify converts a string to a dash-separated lowercase slug.
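A sketch of one plausible Slugify. Which runes IsSlugAlphanumeric keeps is not documented, so this version assumes ASCII lowercase letters and digits (after lowercasing) and collapses every other run of runes into a single dash.

```go
package main

import (
	"fmt"
	"strings"
)

// isSlugAlphanumeric is a hypothetical rule: only a-z and 0-9 survive.
func isSlugAlphanumeric(r rune) bool {
	return (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9')
}

// slugify lowercases s, collapses each run of other runes into one dash,
// and never emits leading or trailing dashes.
func slugify(s string) string {
	var b strings.Builder
	pendingDash := false
	for _, r := range strings.ToLower(s) {
		if !isSlugAlphanumeric(r) {
			pendingDash = true
			continue
		}
		if pendingDash && b.Len() > 0 {
			b.WriteByte('-')
		}
		pendingDash = false
		b.WriteRune(r)
	}
	return b.String()
}

func main() {
	fmt.Println(slugify("  Use fsync() before rename!  ")) // use-fsync-before-rename
}
```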

Types

type ChainEvent

type ChainEvent struct {
	// Timestamp is when the event occurred.
	Timestamp time.Time `json:"timestamp"`

	// Operation is the action taken (add, stage, promote, reject).
	Operation string `json:"operation"`

	// CandidateID is the affected candidate.
	CandidateID string `json:"candidate_id"`

	// FromStatus is the previous status.
	FromStatus types.PoolStatus `json:"from_status,omitempty"`

	// ToStatus is the new status.
	ToStatus types.PoolStatus `json:"to_status,omitempty"`

	// Reason explains why the operation occurred.
	Reason string `json:"reason,omitempty"`

	// Reviewer is who performed the operation.
	Reviewer string `json:"reviewer,omitempty"`

	// ArtifactPath is the destination path for promotions.
	ArtifactPath string `json:"artifact_path,omitempty"`
}

ChainEvent records a pool operation.

type IngestFileFn

type IngestFileFn func(path string, data []byte) (hadError bool)

IngestFileFn ingests every learning block from a single file's bytes. It is implemented in cmd/ao using the existing parsePendingFileHeader, parseLearningBlocks, and ingestFileBlocks helpers. It must update any caller-owned counters via closure capture (the orchestrator does not inspect the result struct itself).

It returns true when the file produced an ingest error; such a file is left out of the processed list, so MoveProcessed never moves it.

type IngestOrchestratorOpts

type IngestOrchestratorOpts struct {
	// IngestFile must be non-nil. Called once per input file.
	IngestFile IngestFileFn

	// TrackProcessed enables processed-file accumulation. When true, each
	// file that ingests without errors is appended to the processed list and
	// passed to MoveProcessed at the end. Set to false for callers that
	// must not move files (e.g., the in-process flywheel close-loop path,
	// which historically left files in pending/ for separate orchestration).
	TrackProcessed bool

	// MoveProcessed is invoked once with the accumulated processed-files list.
	// Required when TrackProcessed is true; ignored otherwise.
	MoveProcessed MoveProcessedFn

	// OnReadError is called when os.ReadFile fails for a file. The orchestrator
	// always skips the file in that case; the callback lets the caller record
	// the error in its own result struct (e.g., bumping res.Errors and emitting
	// a verbose-print warning). May be nil.
	OnReadError func(path string, err error)
}

IngestOrchestratorOpts wires the cmd/ao-side helpers into the shared loop.

type LearningBlock

type LearningBlock struct {
	Title      string
	ID         string
	Category   string
	Confidence string
	Body       string
}

LearningBlock is a parsed markdown learning block ready for scoring/ingest.

func ParseLearningBlocks

func ParseLearningBlocks(md string) []LearningBlock

ParseLearningBlocks extracts one or more LearningBlock entries from markdown. Falls back to a single legacy frontmatter-based block when no "# Learning:" header is present.

func ParseLegacyFrontmatterLearning

func ParseLegacyFrontmatterLearning(md string) (LearningBlock, bool)

ParseLegacyFrontmatterLearning parses a single legacy /learn-style markdown file (YAML frontmatter + body) into a LearningBlock.

type ListOptions

type ListOptions struct {
	// Tier filters by quality tier.
	Tier types.Tier

	// Status filters by pool status.
	Status types.PoolStatus

	// Offset skips the first N results (for pagination).
	Offset int

	// Limit caps the number of results.
	Limit int
}

ListOptions configures pool listing.

type ListResult

type ListResult struct {
	// Entries is the page of results.
	Entries []PoolEntry

	// Total is the count of all matching entries before pagination.
	Total int
}

ListResult contains pool entries and pagination metadata.

type MoveProcessedFn

type MoveProcessedFn func(processed []string)

MoveProcessedFn moves successfully processed files to the processed directory. Implemented in cmd/ao via moveIngestedFiles. Called once at the end of the run with the full list of files that completed without errors (and only when GetDryRun() is false at the call site).

type Pool

type Pool struct {
	// BaseDir is the working directory.
	BaseDir string

	// PoolPath is the full path to .agents/pool.
	PoolPath string
}

Pool manages the candidate pool.

func NewPool

func NewPool(baseDir string) *Pool

NewPool creates a new pool manager.

func (*Pool) Add

func (p *Pool) Add(candidate types.Candidate, scoring types.Scoring) error

Add adds a new candidate to the pending pool.

func (*Pool) AddAt

func (p *Pool) AddAt(candidate types.Candidate, scoring types.Scoring, addedAt time.Time) error

AddAt adds a new candidate to the pending pool with a caller-supplied AddedAt timestamp. This is useful when ingesting historical artifacts where "age" should reflect the original creation/modification time, not the ingestion time.

func (*Pool) AppendPromotedIndexEntry

func (p *Pool) AppendPromotedIndexEntry(contentHash, artifactPath, candidateID string) error

AppendPromotedIndexEntry appends a single entry to the promoted-content-hash sidecar. Mirrors the write path used by Promote so reindex/backfill writes are byte-compatible with regular promotions.

func (*Pool) Approve

func (p *Pool) Approve(candidateID, note, reviewer string) error

Approve records human approval for a bronze candidate.

func (*Pool) BulkApprove

func (p *Pool) BulkApprove(olderThan time.Duration, reviewer string, dryRun bool) ([]string, error)

BulkApprove approves all silver candidates older than olderThan. It returns ErrThresholdTooLow if olderThan is below MinBulkApproveThreshold (1h), which prevents accidental mass approval.

func (*Pool) FindByPrefix

func (p *Pool) FindByPrefix(prefix string) ([]*PoolEntry, error)

FindByPrefix returns all candidates whose ID starts with the given prefix, which is useful for resolving shortened IDs.

func (*Pool) Get

func (p *Pool) Get(candidateID string) (*PoolEntry, error)

Get retrieves a specific candidate by ID.

func (*Pool) GetChain

func (p *Pool) GetChain() (events []ChainEvent, err error)

GetChain returns all chain events.

func (*Pool) Init

func (p *Pool) Init() error

Init creates the required directory structure.

func (*Pool) List

func (p *Pool) List(opts ListOptions) ([]PoolEntry, error)

List returns pool entries matching the options.

func (*Pool) ListPaginated

func (p *Pool) ListPaginated(opts ListOptions) (*ListResult, error)

ListPaginated returns pool entries with pagination metadata.

func (*Pool) ListPendingReview

func (p *Pool) ListPendingReview() ([]PoolEntry, error)

ListPendingReview returns bronze candidates awaiting human review.

func (*Pool) Promote

func (p *Pool) Promote(candidateID string) (string, error)

Promote moves a staged candidate into .agents/learnings or .agents/patterns and returns the artifact path.

Content-hash dedup contract (soc-f2q4 / 4-A): if the candidate's content body has already been promoted by this pool, the existing artifact path is returned and no new file is written. This matches the harvest catalog contract where ContentHash is the only collapse key, and prevents two rewrites of the same insight from re-bloating the global hub.

func (*Pool) PromotedIndexPath

func (p *Pool) PromotedIndexPath() string

PromotedIndexPath returns the absolute path to the promoted-content-hash sidecar for this pool.

func (*Pool) RecordSkip

func (p *Pool) RecordSkip(candidateID, reason, reviewer string) error

RecordSkip appends an auditable skip event for candidates rejected before a pool entry exists, such as pending knowledge whose body already has a promoted artifact on disk.

func (*Pool) Reject

func (p *Pool) Reject(candidateID, reason, reviewer string) error

Reject marks a candidate as rejected.

func (*Pool) Stage

func (p *Pool) Stage(candidateID string, minTier types.Tier) error

Stage moves a candidate from pending to staged.

type PoolEntry

type PoolEntry struct {
	types.PoolEntry

	// FilePath is where this entry is stored.
	FilePath string `json:"file_path,omitempty"`

	// Age is how long since the entry was added.
	Age time.Duration `json:"-"`

	// AgeString is the human-readable age.
	AgeString string `json:"age,omitempty"`

	// ApproachingAutoPromote reports whether the entry is nearing the 24h auto-promote threshold.
	ApproachingAutoPromote bool `json:"approaching_auto_promote,omitempty"`
}

PoolEntry extends types.PoolEntry with operational fields.

type PromotedIndexEntry

type PromotedIndexEntry = promotedIndexEntry

PromotedIndexEntry is one line of the promoted-index sidecar. Exposed so reindex can read existing entries and append new ones with the same shape Promote writes.
