package curing
v0.3.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: Jun 7, 2026 | License: GPL-3.0 | Imports: 21 | Imported by: 0

Documentation

Overview

Package curing loads curing workflow definitions and provides a first-match router for mapping intake events to curing workflows.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoadDir

func LoadDir(dir string) ([]model.CuringDefinition, error)

LoadDir reads all *.curing.yaml files from dir and returns validated CuringDefinitions. If dir is empty or does not exist, an empty slice is returned without error. Individual files that fail to parse or validate are collected and returned as a combined error.

Types

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router performs first-match routing from an intake event to a TanneryRoute. Route order is preserved from the configuration slice.

func NewRouter

func NewRouter(routes []model.TanneryRoute) *Router

NewRouter returns a Router backed by the provided routes.

func (*Router) Match

func (r *Router) Match(source, eventType string) (model.TanneryRoute, bool)

Match returns the first TanneryRoute whose Match.Source equals source and whose Match.EventType is either empty (wildcard) or equal to eventType. It returns the zero TanneryRoute and false when no route matches.
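The first-match rule amounts to a linear scan in configuration order. In this sketch `route` and `matchSpec` are hypothetical stand-ins for model.TanneryRoute and its Match field (only Source and EventType are modelled, plus an illustrative Name), and `firstMatch` is not the package's own function.

```go
package main

import "fmt"

// matchSpec is a hypothetical stand-in for the Match field of model.TanneryRoute.
type matchSpec struct {
	Source    string
	EventType string // empty acts as a wildcard
}

// route is a hypothetical stand-in for model.TanneryRoute.
type route struct {
	Name  string // illustrative label only
	Match matchSpec
}

// matches applies the documented predicate: exact source, and an event
// type that is either empty (wildcard) or equal.
func (rt route) matches(source, eventType string) bool {
	return rt.Match.Source == source &&
		(rt.Match.EventType == "" || rt.Match.EventType == eventType)
}

// firstMatch scans routes in configuration order and stops at the first hit.
func firstMatch(routes []route, source, eventType string) (route, bool) {
	for _, rt := range routes {
		if rt.matches(source, eventType) {
			return rt, true
		}
	}
	return route{}, false // zero value, false when nothing matches
}

func main() {
	routes := []route{
		{Name: "push-only", Match: matchSpec{Source: "github", EventType: "push"}},
		{Name: "github-any", Match: matchSpec{Source: "github"}},
	}
	rt, ok := firstMatch(routes, "github", "issues")
	fmt.Println(rt.Name, ok)
}
```

Because the scan stops at the first hit, a specific route such as push-only must precede a wildcard route for the same source, or the wildcard will shadow it.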

func (*Router) MatchAll

func (r *Router) MatchAll(source, eventType string) []model.TanneryRoute

MatchAll returns all TanneryRoutes whose Match.Source equals source and whose Match.EventType is either empty (wildcard) or equal to eventType. Order is preserved from configuration. Returns nil when no route matches.
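MatchAll applies the same predicate without stopping early, which suits fanning one event out to several curings. The types and `matchAll` below are hypothetical stand-ins; the sketch returns a nil slice when nothing matches, as the documentation states.

```go
package main

import "fmt"

// Hypothetical stand-ins for model.TanneryRoute and its Match field.
type matchSpec struct{ Source, EventType string }

type route struct {
	Name  string
	Match matchSpec
}

// matchAll keeps every route satisfying the Match predicate, in
// configuration order. out stays nil if nothing is appended.
func matchAll(routes []route, source, eventType string) []route {
	var out []route
	for _, rt := range routes {
		if rt.Match.Source == source &&
			(rt.Match.EventType == "" || rt.Match.EventType == eventType) {
			out = append(out, rt)
		}
	}
	return out
}

func main() {
	routes := []route{
		{Name: "push-only", Match: matchSpec{"github", "push"}},
		{Name: "github-any", Match: matchSpec{Source: "github"}},
		{Name: "gitlab-any", Match: matchSpec{Source: "gitlab"}},
	}
	for _, rt := range matchAll(routes, "github", "push") {
		fmt.Println(rt.Name)
	}
}
```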

func (*Router) Routes

func (r *Router) Routes() []model.TanneryRoute

Routes returns the ordered list of routes held by this Router.

type RunnerDeps

type RunnerDeps struct {
	Client        session.LLMClient
	ToolReg       *tool.Registry
	Log           *logging.Logger
	MaxToolRounds int
	Cache         *cache.FileCache
	QueueMgr      *queue.Manager
	Notifiers     map[string]notify.Notifier
	MCPReg        *mcp.Registry
	// Budget is the base token budget derived from the global config.
	// Per-agent MaxTokens overrides are applied at process time.
	Budget model.TokenBudget
	// ProgressFn, when non-nil, is forwarded to each runner.Runner so the caller
	// receives live progress events (tool calls, results, extracts) during a turn.
	ProgressFn func(runner.ProgressEvent)
	// ProgressWithAgent, when non-nil, is called for every runner progress event
	// with the curing and agent name included. Useful for callers that need to tag
	// events by agent (e.g. DevTools bus) without building a per-agent ProgressFn closure.
	ProgressWithAgent func(curingName, agentName string, ev runner.ProgressEvent)
	// DebugContextFn, when non-nil, is forwarded to each runner.Runner so the
	// caller can inspect the exact message window before each LLM completion.
	DebugContextFn func(runner.ContextSnapshot)
	// OnComplete, when non-nil, is called after a successful curing item — artifact
	// written, before hide cleanup. Callers use this to accumulate token statistics,
	// persist run records, or render pretty output. Called synchronously; must not
	// block for long (it runs inside the worker goroutine).
	// events holds all ProgressEvents fired by the runner during the run so callers
	// can replay them into a pretty printer without needing a real-time ProgressFn.
	OnComplete func(ag model.Agent, rec model.RunRecord, art model.Artifact, events []runner.ProgressEvent)
	// OnRunRecord, when non-nil, is called after every curing item run — success
	// or failure — with the resulting RunRecord and the error (nil on success).
	// Used to persist run history for curing-driven agents (which never reach
	// OnComplete on failure). Called synchronously on the worker goroutine.
	OnRunRecord func(ag model.Agent, rec model.RunRecord, runErr error)
	// EventFn, when non-nil, receives pipeline lifecycle events (dequeue, retry, dlq).
	// Called synchronously on the worker goroutine; must not block for long.
	// T4.8: by default the Worker serializes EventFn calls under a mutex so an
	// EventFn that mutates shared state without internal locking is safe.
	// Callers whose EventFn is itself thread-safe can set EventFnConcurrent: true
	// to opt out and let the worker fan out without serialization.
	EventFn func(TanneryEvent)
	// EventFnConcurrent disables the default EventFn serialization. Default false
	// (serialized).
	EventFnConcurrent bool
}

RunnerDeps groups the dependencies needed to construct a runner.Runner per call. Owned by the Supervisor and shared (by pointer) across all Workers.

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor manages a fleet of Workers across all configured curings. One Worker is created per CuringDefinition; concurrency is controlled per-queue via QueueConcurrencyConfig.Concurrency (the semaphore on each Worker).

func NewSupervisor

func NewSupervisor(
	defs []model.CuringDefinition,
	agents map[string]model.Agent,
	concMap map[string]model.QueueConcurrencyConfig,
	hideStore *hide.Store,
	artStore *artifact.Store,
	deps *RunnerDeps,
	qmgr *queue.Manager,
	router *Router,
	log *logging.Logger,
) (*Supervisor, error)

NewSupervisor constructs one Worker per CuringDefinition. concMap maps queue name to QueueConcurrencyConfig; queues with no entry in concMap default to concurrency=1, max_attempts=3. deps is shared (by pointer) across all workers.
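The fallback for queues absent from concMap amounts to a map lookup with defaults. `queueConcurrency` and `resolveQueueConfig` are hypothetical; the MaxAttempts field name is assumed from the documented max_attempts key, since only Concurrency is named in this documentation.

```go
package main

import "fmt"

// queueConcurrency is a hypothetical stand-in for model.QueueConcurrencyConfig.
// The MaxAttempts field name is an assumption.
type queueConcurrency struct {
	Concurrency int
	MaxAttempts int
}

// resolveQueueConfig applies the documented defaults for queues that have
// no entry in concMap: concurrency=1, max_attempts=3.
func resolveQueueConfig(concMap map[string]queueConcurrency, queue string) queueConcurrency {
	if cfg, ok := concMap[queue]; ok {
		return cfg // configured entries are used as-is
	}
	return queueConcurrency{Concurrency: 1, MaxAttempts: 3}
}

func main() {
	concMap := map[string]queueConcurrency{"raw-hides": {Concurrency: 4, MaxAttempts: 5}}
	fmt.Println(resolveQueueConfig(concMap, "raw-hides"))
	fmt.Println(resolveQueueConfig(concMap, "unlisted"))
}
```

The sketch returns configured entries unchanged; NewWorker separately treats a concurrency <= 0 as 1.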

func (*Supervisor) Drain

func (s *Supervisor) Drain()

Drain waits for all worker Run loops to exit AND for any in-flight item handlers spawned by those loops to finish. Call after ctx is cancelled during graceful shutdown.

Returns when both conditions are met. There is no internal timeout; callers that need a bounded shutdown should run Drain in a goroutine and select on their own deadline.

func (*Supervisor) Start

func (s *Supervisor) Start(ctx context.Context)

Start launches one goroutine per Worker. Goroutines are tracked by the internal WaitGroup and exit when ctx is cancelled.

func (*Supervisor) TotalActive added in v0.3.0

func (s *Supervisor) TotalActive() int

TotalActive returns the sum of in-flight item handlers across all workers. A non-zero value means workers are still processing items even if all queues report depth 0 (items are dequeued before processing begins).

type TanneryEvent

type TanneryEvent struct {
	// Kind is one of: "webhook", "enqueue", "dequeue", "retry", "dlq".
	// "webhook" is emitted by the HTTP webhook handler on successful enqueue.
	Kind string
	// Curing is the curing definition name (w.def.Name).
	Curing string
	// Queue is the source queue name (w.def.Queue).
	Queue string
	// DestQueue is the destination queue for "enqueue" events.
	DestQueue string
	// HideID and HideKind identify the hide being processed.
	HideID   string
	HideKind string
	// ItemID is the QueueItem.ID.
	ItemID string
	// Attempt is the 1-based attempt count; 0 for non-failure events.
	Attempt int
	// Err is the error string for failure events; empty otherwise.
	Err string
	// Source is the intake source identifier (e.g. webhook HTTP path).
	// Set for "webhook" events; empty for worker-emitted events.
	Source string
	// WebhookName is the webhook configuration name (e.g. "demo").
	// Set for "webhook" events; empty for worker-emitted events.
	WebhookName string
}

TanneryEvent describes a discrete state change in the tannery pipeline. EventFn receivers use this to drive pretty output, metrics, and alerting without coupling the worker to any display or transport layer.
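An EventFn receiver typically switches on Kind. The sketch uses a hypothetical `tanneryEvent` that copies a subset of the documented fields so it compiles without the package; code importing curing would take a TanneryEvent directly. The message formats are illustrative only.

```go
package main

import (
	"fmt"
	"strings"
)

// tanneryEvent copies a subset of the documented TanneryEvent fields.
type tanneryEvent struct {
	Kind, Curing, Queue, DestQueue, ItemID, Err, Source string
	Attempt                                             int
}

// describe renders one event as a log line, keyed on the documented Kinds.
func describe(ev tanneryEvent) string {
	switch ev.Kind {
	case "webhook":
		return fmt.Sprintf("webhook %s accepted", ev.Source)
	case "enqueue":
		return fmt.Sprintf("%s: %s -> %s", ev.Curing, ev.Queue, ev.DestQueue)
	case "dequeue":
		return fmt.Sprintf("%s: picked %s from %s", ev.Curing, ev.ItemID, ev.Queue)
	case "retry", "dlq":
		// Failure events carry Attempt and Err.
		return fmt.Sprintf("%s: %s %s (attempt %d): %s",
			ev.Curing, strings.ToUpper(ev.Kind), ev.ItemID, ev.Attempt, ev.Err)
	default:
		return "unknown event " + ev.Kind
	}
}

func main() {
	fmt.Println(describe(tanneryEvent{
		Kind: "dlq", Curing: "tan", ItemID: "q-42", Attempt: 3, Err: "agent timeout",
	}))
}
```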

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker consumes one named queue and runs the curing workflow for each item. Each Worker polls its queue at 1-second intervals. Concurrent item processing is bounded by the semaphore channel capacity (QueueConcurrencyConfig.Concurrency).

func NewWorker

func NewWorker(
	def model.CuringDefinition,
	agents map[string]model.Agent,
	concurrency int,
	pollInterval time.Duration,
	hideStore *hide.Store,
	artStore *artifact.Store,
	deps *RunnerDeps,
	qmgr *queue.Manager,
	_ *Router,
	log *logging.Logger,
) (*Worker, error)

NewWorker constructs a Worker for the given CuringDefinition. concurrency sets the semaphore size; values <= 0 default to 1. Returns an error when def.Agent does not appear in agents (fail-fast on misconfiguration rather than wasting an item on the first dequeue).

func (*Worker) ActiveCount added in v0.3.0

func (w *Worker) ActiveCount() int

ActiveCount returns the number of item-handler goroutines currently running. Used by waitForQuiescence to detect in-flight work even when queue depth is 0.

func (*Worker) ProcessItem

func (w *Worker) ProcessItem(ctx context.Context, item model.QueueItem) error

ProcessItem processes one QueueItem end-to-end, returning any error. Unlike handleItem it does not apply retry/DLQ logic — callers are responsible for re-enqueuing on failure. Intended for testing and one-shot invocations.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Run blocks, polling the curing queue at 1-second intervals. Returns when ctx is cancelled. Goroutine panics are recovered and logged. In-flight item handlers spawned from this loop are tracked on w.inflight and must be drained (Worker.WaitInflight) before the Worker is considered shut down.

func (*Worker) WaitInflight

func (w *Worker) WaitInflight()

WaitInflight blocks until every handleItem goroutine spawned by Run has finished. Call this after the Run loop has returned (i.e. after ctx is cancelled) to ensure no in-flight artifact write outlives the lock release.
