Documentation
Overview
Package loom provides a central task mediator with lifecycle management, dependency injection, and observability hooks for long-running work.
LoomEngine dispatches Tasks to pluggable Workers, persists state in SQLite with crash recovery, emits lifecycle events via a callback EventBus, and exposes OpenTelemetry metrics and structured logging.
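Basic usage:
// Driver name "sqlite" and the _pragma DSN syntax match e.g. modernc.org/sqlite.
db, err := sql.Open("sqlite", "tasks.db?_pragma=journal_mode(WAL)")
if err != nil {
	log.Fatal(err)
}
defer db.Close() // deferred first, so it runs after engine.Close

engine, err := loom.NewEngine(db, "my-daemon",
	loom.WithLogger(myLogger),
	loom.WithMeter(myMeter),
)
if err != nil {
	log.Fatal(err)
}
defer engine.Close(context.Background()) // must run before db.Close

engine.RegisterWorker(loom.WorkerTypeCLI, myWorker)

ctx := context.Background()
id, err := engine.Submit(ctx, loom.TaskRequest{
	WorkerType: loom.WorkerTypeCLI,
	ProjectID:  "my-project",
	Prompt:     "do the thing",
})
if err != nil {
	log.Fatal(err)
}
log.Printf("submitted task %s", id)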
See README.md, CONTRACT.md, PLAYBOOK.md, and TESTING.md for details.
Index
- Variables
- func RequestIDFrom(ctx context.Context) string
- func WithRequestID(ctx context.Context, requestID string) context.Context
- type EventBus
- type EventType
- type GateDecision
- type LoomEngine
- func (l *LoomEngine) Cancel(taskID string) error
- func (l *LoomEngine) CancelAllForProject(projectID string) (int, error)
- func (l *LoomEngine) Close(ctx context.Context) error
- func (e *LoomEngine) Count(filter TaskFilter) (int, error)
- func (e *LoomEngine) CountAll() (int, error)
- func (l *LoomEngine) Events() *EventBus
- func (l *LoomEngine) Get(taskID string) (*Task, error)
- func (l *LoomEngine) List(projectID string, statuses ...TaskStatus) ([]*Task, error)
- func (l *LoomEngine) ListAll(statuses ...TaskStatus) ([]*Task, error)
- func (l *LoomEngine) RecoverCrashed() (int, error)
- func (l *LoomEngine) RegisterWorker(wt WorkerType, w Worker)
- func (l *LoomEngine) Submit(ctx context.Context, req TaskRequest) (string, error)
- type Option
- type QualityGate
- type QualityGateOption
- type RequestIDKey
- type Task
- type TaskEvent
- type TaskFilter
- type TaskRequest
- type TaskStatus
- type TaskStore
- func (s *TaskStore) Count(filter TaskFilter) (int, error)
- func (s *TaskStore) CountAll() (int, error)
- func (s *TaskStore) Create(task *Task) error
- func (s *TaskStore) Get(id string) (*Task, error)
- func (s *TaskStore) IncrementRetries(id string) error
- func (s *TaskStore) List(projectID string, statuses ...TaskStatus) ([]*Task, error)
- func (s *TaskStore) ListAll(statuses ...TaskStatus) ([]*Task, error)
- func (s *TaskStore) MarkCrashed() (int, error)
- func (s *TaskStore) SetDaemonUUID(uuid string)
- func (s *TaskStore) SetResult(id string, result string, errMsg string) error
- func (s *TaskStore) UpdateStatus(id string, from, to TaskStatus) error
- type Worker
- type WorkerResult
- type WorkerType
Variables
var ErrEngineClosed = errors.New("loom: engine closed")
ErrEngineClosed is returned by Submit when the engine has been shut down via Close. It is a sentinel error — callers can compare against it with errors.Is to distinguish graceful shutdown from other failures.
Functions
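func RequestIDFrom
func RequestIDFrom(ctx context.Context) string
RequestIDFrom extracts the request ID from ctx. It returns the empty string if no request ID was attached.
func WithRequestID
func WithRequestID(ctx context.Context, requestID string) context.Context
WithRequestID returns a derived context carrying requestID, which RequestIDFrom (and therefore Submit) reads back.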
Types
type EventBus
type EventBus struct {
// contains filtered or unexported fields
}
EventBus is a synchronous fan-out event broadcaster with callback subscribers. Subscribers are invoked in registration order, synchronously from the emitter's goroutine. Panics in a subscriber are recovered and logged; they do NOT affect other subscribers or the engine. Slow subscribers block the engine — subscribers MUST return quickly and offload heavy work to their own goroutine.
func NewEventBus
NewEventBus creates a new EventBus with an optional logger for panic recovery. If logger is nil, a NoopLogger is used.
type EventType
type EventType string
EventType identifies a task lifecycle event.
const (
	EventTaskCreated     EventType = "task.created"
	EventTaskDispatched  EventType = "task.dispatched"
	EventTaskRunning     EventType = "task.running"
	EventTaskCompleted   EventType = "task.completed"
	EventTaskFailed      EventType = "task.failed"
	EventTaskFailedCrash EventType = "task.failed_crash"
	EventTaskRetrying    EventType = "task.retrying"
	EventTaskCancelled   EventType = "task.cancelled"
)
type GateDecision
type GateDecision struct {
Accept bool `json:"accept"`
Reason string `json:"reason"` // "pass", "empty_output", "rate_limit_error", "thrashing"
Retry bool `json:"retry"` // if !Accept && Retry → status retrying, re-dispatch
}
GateDecision is the result of a quality gate evaluation.
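The comment on the Retry field implies a small decision rule. This sketch maps a GateDecision to the next task status; the retry cap (using the WithMaxRetries default of 2) and the fallback to failed are assumptions for illustration, not documented loom behaviour, and nextStatus is a hypothetical helper.

```go
package main

import "fmt"

// GateDecision mirrors loom.GateDecision for this standalone sketch.
type GateDecision struct {
	Accept bool
	Reason string
	Retry  bool
}

// nextStatus is a hypothetical mapping from a gate decision to a task status.
// Assumption: a retryable rejection is honoured only while retries < maxRetries.
func nextStatus(d GateDecision, retries, maxRetries int) string {
	switch {
	case d.Accept:
		return "completed"
	case d.Retry && retries < maxRetries:
		return "retrying" // re-dispatch, per the Retry field comment
	default:
		return "failed"
	}
}

func main() {
	const maxRetries = 2 // documented WithMaxRetries default
	fmt.Println(nextStatus(GateDecision{Accept: true, Reason: "pass"}, 0, maxRetries))          // completed
	fmt.Println(nextStatus(GateDecision{Reason: "empty_output", Retry: true}, 1, maxRetries)) // retrying
	fmt.Println(nextStatus(GateDecision{Reason: "empty_output", Retry: true}, 2, maxRetries)) // failed
	fmt.Println(nextStatus(GateDecision{Reason: "thrashing"}, 0, maxRetries))                 // failed
}
```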
type LoomEngine
type LoomEngine struct {
// contains filtered or unexported fields
}
LoomEngine is the central task mediator. All tool-handler work flows through LoomEngine, which owns task creation, dispatch, execution, persistence, and delivery.
func New
func New(store *TaskStore, opts ...Option) *LoomEngine
New creates a LoomEngine with the given store and options. Dependency fields (logger, clock, idGen, meter) are initialised to their noop/system defaults before Options are applied, so callers that omit an option get a safe default. The EventBus is created AFTER the options are applied so it receives the final (possibly injected) logger.
func NewEngine
NewEngine constructs a LoomEngine from a raw *sql.DB, creating a TaskStore internally. It is the v0.1.0-aligned constructor from spec FR-6; New(store, opts) remains for backwards compatibility with aimux call sites and will be removed during the Phase 3 atomic migration.
engineName identifies the owning daemon for per-daemon task scoping (AIMUX-10). It must not be empty; NewEngine returns an error if it is.
func (*LoomEngine) Cancel
func (l *LoomEngine) Cancel(taskID string) error
Cancel requests cancellation of a running task.
func (*LoomEngine) CancelAllForProject
func (l *LoomEngine) CancelAllForProject(projectID string) (int, error)
CancelAllForProject cancels all running tasks for the given project. Returns the number of tasks signaled for cancellation. Tasks that are not currently running (pending, completed, failed, already cancelled) are not affected. Per US9: used by engram to cancel all work for a disconnecting project.
func (*LoomEngine) Close (added in v0.1.1)
func (l *LoomEngine) Close(ctx context.Context) error
Close signals engine shutdown and waits for all in-flight dispatch goroutines to complete (or ctx to expire). Callers MUST invoke Close before closing the underlying *sql.DB to prevent write-after-close races. Close is idempotent: subsequent invocations return nil immediately.
After Close returns, Submit will reject new tasks with ErrEngineClosed. In-flight dispatch goroutines already running continue until they finish naturally. ctx is used only as a deadline on how long Close will wait for them — it does NOT cancel the tasks themselves. Use Cancel or CancelAllForProject before Close if you need to abort in-flight work.
func (*LoomEngine) Count (added in v0.2.0)
func (e *LoomEngine) Count(filter TaskFilter) (int, error)
Count returns the number of tasks matching the filter, scoped to the store's engine_name. This is an optional capability that can be used by budget-layer callers (FR-4, C2).
func (*LoomEngine) CountAll (added in v0.2.0)
func (e *LoomEngine) CountAll() (int, error)
CountAll returns the total number of tasks across all engines. This is an optional capability for cross-engine diagnostics.
func (*LoomEngine) Events
func (l *LoomEngine) Events() *EventBus
Events returns the event bus for subscribing to task lifecycle events.
func (*LoomEngine) Get
func (l *LoomEngine) Get(taskID string) (*Task, error)
Get returns the current state of the task with the given ID.
func (*LoomEngine) List
func (l *LoomEngine) List(projectID string, statuses ...TaskStatus) ([]*Task, error)
List returns tasks for a project, optionally filtered by status.
func (*LoomEngine) ListAll (added in v0.2.0)
func (l *LoomEngine) ListAll(statuses ...TaskStatus) ([]*Task, error)
ListAll returns tasks across all engines and projects, optionally filtered by status. Used for cross-daemon diagnostic views (AIMUX-10 FR-5).
func (*LoomEngine) RecoverCrashed
func (l *LoomEngine) RecoverCrashed() (int, error)
RecoverCrashed marks every dispatched or running task as failed_crash. Call it once on daemon startup, before submitting new work; any task already in flight at that point would also be marked.
func (*LoomEngine) RegisterWorker
func (l *LoomEngine) RegisterWorker(wt WorkerType, w Worker)
RegisterWorker registers a worker for a given worker type.
func (*LoomEngine) Submit
func (l *LoomEngine) Submit(ctx context.Context, req TaskRequest) (string, error)
Submit creates a persistent task and dispatches it to the worker registered for req.WorkerType. It returns the task ID immediately; execution happens in a background goroutine. The request ID is extracted from ctx via RequestIDFrom for distributed tracing. After Close has been called, Submit returns ErrEngineClosed without side effects.
type Option
type Option func(*LoomEngine)
Option configures LoomEngine.
func WithClock
WithClock injects a custom Clock into the LoomEngine. If not supplied, SystemClock (time.Now) is used. A nil argument is ignored so the safe default is never overwritten.
func WithIDGenerator
func WithIDGenerator(g deps.IDGenerator) Option
WithIDGenerator injects a custom IDGenerator into the LoomEngine. If not supplied, UUIDGenerator (uuid.NewV7) is used. A nil argument is ignored so the safe default is never overwritten.
func WithLogger
WithLogger injects a custom Logger into the LoomEngine. If not supplied, NoopLogger is used and all log output is discarded. A nil argument is ignored so the safe default is never overwritten.
func WithMaxRetries
WithMaxRetries sets the maximum retry count (default 2).
type QualityGate
type QualityGate struct {
// contains filtered or unexported fields
}
QualityGate validates worker results.
func NewQualityGate
func NewQualityGate() *QualityGate
NewQualityGate creates a quality gate with defaults (threshold=0.8, window=3).
func NewQualityGateWithOpts
func NewQualityGateWithOpts(opts ...QualityGateOption) *QualityGate
NewQualityGateWithOpts creates a quality gate with options.
func (*QualityGate) Check
func (g *QualityGate) Check(task *Task, result *WorkerResult) GateDecision
Check evaluates a worker result.
func (*QualityGate) Clear
func (g *QualityGate) Clear(taskID string)
Clear removes the history for a task, freeing memory after dispatch completes.
type QualityGateOption
type QualityGateOption func(*QualityGate)
QualityGateOption configures the quality gate.
func WithThreshold
func WithThreshold(t float64) QualityGateOption
WithThreshold sets the Jaccard similarity threshold used for thrashing detection (default 0.8).
func WithWindowSize
func WithWindowSize(n int) QualityGateOption
WithWindowSize sets the thrashing detection window (default 3, minimum 2).
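Thrashing detection with a Jaccard threshold and a window can be sketched as follows. The whitespace tokenisation, and the rule that a task is thrashing when every consecutive pair of outputs in the last window reaches the threshold, are assumptions for illustration; loom's exact comparison is not documented here, and jaccard and thrashing are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// jaccard returns |A∩B| / |A∪B| over lower-cased, whitespace-separated tokens.
func jaccard(a, b string) float64 {
	setA, setB := tokenSet(a), tokenSet(b)
	if len(setA) == 0 && len(setB) == 0 {
		return 1 // two empty outputs are identical
	}
	inter := 0
	for tok := range setA {
		if setB[tok] {
			inter++
		}
	}
	union := len(setA) + len(setB) - inter
	return float64(inter) / float64(union)
}

func tokenSet(s string) map[string]bool {
	set := make(map[string]bool)
	for _, tok := range strings.Fields(strings.ToLower(s)) {
		set[tok] = true
	}
	return set
}

// thrashing reports whether each of the last `window` outputs is a
// near-duplicate (similarity >= threshold) of the one before it.
func thrashing(history []string, threshold float64, window int) bool {
	if window < 2 || len(history) < window {
		return false
	}
	recent := history[len(history)-window:]
	for i := 1; i < len(recent); i++ {
		if jaccard(recent[i-1], recent[i]) < threshold {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(jaccard("fix the bug", "fix the test")) // 0.5
	stuck := []string{"retry later please", "retry later please", "please retry later"}
	fmt.Println(thrashing(stuck, 0.8, 3)) // true
	moving := []string{"step one", "step two", "done"}
	fmt.Println(thrashing(moving, 0.8, 3)) // false
}
```

Comparing token sets rather than raw strings means that reordered but otherwise identical output (the third entry of stuck) still counts as a repeat.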
type RequestIDKey
type RequestIDKey struct{}
RequestIDKey is the context key type used to attach request IDs to loom-managed contexts. Exported so external callers can read request IDs off contexts produced by WithRequestID without going through loom helpers.
Use it as a context key, for example ctx.Value(loom.RequestIDKey{}), but prefer the helpers WithRequestID and RequestIDFrom where possible.
type Task
type Task struct {
ID string `json:"id"`
Status TaskStatus `json:"status"`
WorkerType WorkerType `json:"worker_type"`
ProjectID string `json:"project_id"`
RequestID string `json:"request_id,omitempty"`
EngineName string `json:"engine_name,omitempty"`
Prompt string `json:"prompt"`
CWD string `json:"cwd,omitempty"`
Env map[string]string `json:"env,omitempty"`
CLI string `json:"cli,omitempty"`
Role string `json:"role,omitempty"`
Model string `json:"model,omitempty"`
Effort string `json:"effort,omitempty"`
Timeout int `json:"timeout,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
Retries int `json:"retries"`
CreatedAt time.Time `json:"created_at"`
DispatchedAt *time.Time `json:"dispatched_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
Task represents a unit of work managed by LoomEngine.
type TaskEvent
type TaskEvent struct {
Type EventType `json:"type"`
TaskID string `json:"task_id"`
ProjectID string `json:"project_id"`
RequestID string `json:"request_id,omitempty"`
Status TaskStatus `json:"status"`
Timestamp time.Time `json:"timestamp"`
}
TaskEvent carries task lifecycle data to subscribers. All fields are required — subscribers can filter on ProjectID for multi-tenant fanout and correlate with RequestID for distributed tracing.
type TaskFilter (added in v0.2.0)
type TaskFilter struct {
ProjectID string
Statuses []TaskStatus
}
TaskFilter holds optional filtering criteria for loom task queries. An empty TaskFilter (zero value) matches tasks scoped to the current engine. Use CountAll for cross-engine total count.
type TaskRequest
type TaskRequest struct {
WorkerType WorkerType
ProjectID string
RequestID string
Prompt string
CWD string
Env map[string]string
CLI string
Role string
Model string
Effort string
Timeout int
Metadata map[string]any
}
TaskRequest is the input for submitting a new task.
type TaskStatus
type TaskStatus string
TaskStatus represents the lifecycle state of a task.
const (
	TaskStatusPending     TaskStatus = "pending"
	TaskStatusDispatched  TaskStatus = "dispatched"
	TaskStatusRunning     TaskStatus = "running"
	TaskStatusCompleted   TaskStatus = "completed"
	TaskStatusFailed      TaskStatus = "failed"
	TaskStatusFailedCrash TaskStatus = "failed_crash"
	TaskStatusRetrying    TaskStatus = "retrying"
)
func (TaskStatus) CanTransitionTo
func (s TaskStatus) CanTransitionTo(target TaskStatus) bool
CanTransitionTo reports whether the task state machine permits a transition from s to target.
func (TaskStatus) IsTerminal
func (s TaskStatus) IsTerminal() bool
IsTerminal returns true if the status is a terminal state.
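CanTransitionTo and IsTerminal are commonly backed by a transition table. The table below is a hypothetical reconstruction from the statuses and events on this page (MarkCrashed moving dispatched and running tasks to failed_crash, retrying tasks being re-dispatched); loom's actual table may permit different edges.

```go
package main

import "fmt"

type TaskStatus string

const (
	Pending     TaskStatus = "pending"
	Dispatched  TaskStatus = "dispatched"
	Running     TaskStatus = "running"
	Completed   TaskStatus = "completed"
	Failed      TaskStatus = "failed"
	FailedCrash TaskStatus = "failed_crash"
	Retrying    TaskStatus = "retrying"
)

// transitions is an assumed edge set, not loom's authoritative table.
var transitions = map[TaskStatus][]TaskStatus{
	Pending:    {Dispatched},
	Dispatched: {Running, FailedCrash},
	Running:    {Completed, Failed, FailedCrash, Retrying},
	Retrying:   {Dispatched},
}

// CanTransitionTo reports whether target is a listed successor of s.
func (s TaskStatus) CanTransitionTo(target TaskStatus) bool {
	for _, t := range transitions[s] {
		if t == target {
			return true
		}
	}
	return false
}

// IsTerminal treats a status with no outgoing edges as terminal.
func (s TaskStatus) IsTerminal() bool {
	return len(transitions[s]) == 0
}

func main() {
	fmt.Println(Running.CanTransitionTo(Completed)) // true
	fmt.Println(Completed.CanTransitionTo(Running)) // false
	fmt.Println(FailedCrash.IsTerminal())           // true
	fmt.Println(Retrying.IsTerminal())              // false
}
```

Deriving IsTerminal from the table keeps the two methods from drifting apart when an edge is added.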
type TaskStore
type TaskStore struct {
// contains filtered or unexported fields
}
TaskStore persists tasks in SQLite.
func NewTaskStore
NewTaskStore initialises the tasks table and returns a TaskStore. engineName identifies the owning daemon and is used to scope task queries (MarkCrashed, List, Count). Returns an error if engineName is empty — silent fallback to empty identity is forbidden (spec C3 / FR-7).
func (*TaskStore) Count (added in v0.2.0)
func (s *TaskStore) Count(filter TaskFilter) (int, error)
Count returns the number of tasks matching the filter, scoped to the store's engine_name. Uses SQL COUNT for efficiency — avoids loading full rows.
func (*TaskStore) CountAll (added in v0.2.0)
func (s *TaskStore) CountAll() (int, error)
CountAll returns the total number of tasks across all engines. Unlike Count, it applies no engine_name filter.
func (*TaskStore) IncrementRetries
func (s *TaskStore) IncrementRetries(id string) error
IncrementRetries increments the retry count of the task with the given ID.
func (*TaskStore) List
func (s *TaskStore) List(projectID string, statuses ...TaskStatus) ([]*Task, error)
List returns tasks for a project, optionally filtered by status values.
func (*TaskStore) ListAll (added in v0.2.0)
func (s *TaskStore) ListAll(statuses ...TaskStatus) ([]*Task, error)
ListAll returns tasks across all engines and projects, optionally filtered by status. Unlike List, it applies no engine_name or project_id filter — use for cross-daemon global views (AIMUX-10 FR-5, sessions tool all=true opt-in).
func (*TaskStore) MarkCrashed
func (s *TaskStore) MarkCrashed() (int, error)
MarkCrashed sets status='failed_crash' for all dispatched or running tasks owned by the store's engine and returns the number of tasks marked.
Raw SQL is used intentionally: on daemon startup this bulk-updates every in-flight row in a single statement, which is both simpler and faster than iterating with UpdateStatus. An init() assertion in the package source checks that the state machine still permits these transitions, so the raw SQL cannot silently diverge from CanTransitionTo validation.
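The init() guard described above can be sketched as a startup check that panics if the state machine ever stops allowing the edges the bulk update relies on. The transition table, crashSources, and the markCrashedSQL statement (table tasks, columns engine_name and status) are hypothetical stand-ins; loom's actual SQL is not shown on this page.

```go
package main

import "fmt"

type TaskStatus string

const (
	Dispatched  TaskStatus = "dispatched"
	Running     TaskStatus = "running"
	Completed   TaskStatus = "completed"
	FailedCrash TaskStatus = "failed_crash"
)

// Hypothetical edge set standing in for the package's real state machine.
var transitions = map[TaskStatus]map[TaskStatus]bool{
	Dispatched: {Running: true, FailedCrash: true},
	Running:    {Completed: true, FailedCrash: true},
}

func (s TaskStatus) CanTransitionTo(t TaskStatus) bool { return transitions[s][t] }

// markCrashedSQL is an assumed shape for the bulk update: one statement,
// scoped to the owning engine, touching only in-flight rows.
const markCrashedSQL = `UPDATE tasks SET status = 'failed_crash'
WHERE engine_name = ? AND status IN ('dispatched', 'running')`

// crashSources lists the statuses the raw SQL rewrites.
var crashSources = []TaskStatus{Dispatched, Running}

// init fails fast at program start if a transition the raw SQL performs
// is no longer permitted by CanTransitionTo.
func init() {
	for _, from := range crashSources {
		if !from.CanTransitionTo(FailedCrash) {
			panic(fmt.Sprintf("state machine forbids %s -> %s", from, FailedCrash))
		}
	}
}

func main() {
	fmt.Println("transition check passed; bulk update:")
	fmt.Println(markCrashedSQL)
}
```

Failing in init turns a silent divergence between the SQL and the state machine into a crash at the first test run or deploy.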
func (*TaskStore) SetDaemonUUID (added in v0.2.0)
func (s *TaskStore) SetDaemonUUID(uuid string)
SetDaemonUUID configures the daemon-lifetime UUID to be stamped on every new task row. It is called once at startup by the main binary after generating the UUID via pkg/session.GetDaemonUUID(). Loom is a separate module and cannot import pkg/session directly, so the UUID is injected here.
func (*TaskStore) SetResult
func (s *TaskStore) SetResult(id string, result string, errMsg string) error
SetResult stores the execution result and sets completed_at. errMsg is redacted before storage: secrets such as API keys and Bearer tokens are replaced with [REDACTED]. result is stored verbatim (callers own its content).
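A redaction pass of this kind is typically a list of regular-expression rules. The two rules here (Bearer tokens and sk-style API keys) and the redact helper are illustrative assumptions; the patterns loom actually matches are not listed on this page.

```go
package main

import (
	"fmt"
	"regexp"
)

// rules are hypothetical; a real redactor would cover more secret formats.
var rules = []struct {
	re   *regexp.Regexp
	repl string
}{
	// Keep the "Bearer " prefix (captured as ${1}) and hide the token.
	{regexp.MustCompile(`(?i)(bearer\s+)[A-Za-z0-9._~+/=-]+`), "${1}[REDACTED]"},
	// Hide sk-prefixed keys with at least 16 characters after the prefix.
	{regexp.MustCompile(`\bsk-[A-Za-z0-9_-]{16,}`), "[REDACTED]"},
}

// redact applies every rule in order to an error message.
func redact(msg string) string {
	for _, r := range rules {
		msg = r.re.ReplaceAllString(msg, r.repl)
	}
	return msg
}

func main() {
	fmt.Println(redact("401: Authorization: Bearer abc.def-123"))
	fmt.Println(redact("invalid key sk-ABCDEFGHIJKLMNOPQRST"))
}
```

Only errMsg goes through such a pass; since result is stored verbatim, workers should avoid echoing secrets into their output.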
func (*TaskStore) UpdateStatus
func (s *TaskStore) UpdateStatus(id string, from, to TaskStatus) error
UpdateStatus transitions a task from `from` to `to`, enforcing state machine rules. Returns an error if the current status does not match `from` or the transition is invalid.
type Worker
type Worker interface {
Execute(ctx context.Context, task *Task) (*WorkerResult, error)
Type() WorkerType
}
Worker executes a task and returns the result.
type WorkerResult
type WorkerResult struct {
Content string `json:"content"`
Metadata map[string]any `json:"metadata,omitempty"`
DurationMS int64 `json:"duration_ms"`
}
WorkerResult holds the output from a worker execution.
type WorkerType
type WorkerType string
WorkerType identifies which worker handles a task.
const (
	WorkerTypeCLI          WorkerType = "cli"
	WorkerTypeThinker      WorkerType = "thinker"
	WorkerTypeInvestigator WorkerType = "investigator"
	WorkerTypeOrchestrator WorkerType = "orchestrator"
)
Directories
| Path | Synopsis |
|---|---|
| deps | Package deps contains injectable dependency interfaces for LoomEngine. |
| examples/custom_worker (command) | CustomWorker demonstrates how to satisfy loom.Worker from scratch using only stdlib: no SubprocessBase, no HTTPBase, just the Worker interface directly. |
| examples/hello (command) | Hello demonstrates the minimal loom.Submit + Get polling pattern. |
| examples/http (command) | HTTP demonstrates HTTPBase usage with an in-process httptest.Server. |
| examples/subprocess (command) | Subprocess demonstrates SubprocessBase composition: how to wrap an OS process as a loom Worker. |