loom

package module
v0.2.0 Latest
Published: Apr 27, 2026 License: MIT Imports: 14 Imported by: 0

README

loom

Task mediator library for Go. Submit tasks, dispatch to workers, persist state, deliver lifecycle events to subscribers, and recover from crashes.

Overview

loom is a small, dependency-light Go library that gives you a persistent task queue with pluggable workers, lifecycle events, quality validation, and crash recovery — all in fewer than 1000 lines of production code.

Key properties:

  • Submit returns immediately. Dispatch happens in a background goroutine whose lifetime is independent of the caller's context. A disconnecting HTTP client or cancelled request does NOT cancel an already-dispatched task.
  • All state is persisted. Tasks survive process restart. RecoverCrashed marks in-flight tasks as failed_crash on the next daemon boot.
  • Pluggable workers. Register any type that satisfies Worker — subprocess wrappers, HTTP clients, pure-function executors, or composed bases.
  • Quality gate. Worker results are validated before the task is marked completed. Empty output and rate-limit responses trigger automatic retry.
  • Observable. Eight OTel metric instruments plus structured log fields on every significant operation.
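
The first property, dispatch that outlives the caller's context, can be illustrated with the standard library alone. The sketch below is not loom's implementation; submitDetached and runDemo are hypothetical names, and it assumes context.WithoutCancel (Go 1.21+) is an acceptable way to detach background work from a request context.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// submitDetached starts work in a background goroutine. The goroutine's
// context keeps the caller's values but ignores the caller's cancellation.
// Hypothetical helper for illustration; not part of loom's API.
func submitDetached(ctx context.Context, work func(context.Context) string) <-chan string {
	done := make(chan string, 1)
	detached := context.WithoutCancel(ctx) // Go 1.21+
	go func() {
		done <- work(detached)
	}()
	return done
}

// runDemo cancels the caller's context right after submitting and reports
// what the background work observed.
func runDemo() string {
	ctx, cancel := context.WithCancel(context.Background())
	done := submitDetached(ctx, func(c context.Context) string {
		time.Sleep(20 * time.Millisecond) // simulate slow work
		if c.Err() != nil {
			return "cancelled"
		}
		return "finished"
	})
	cancel() // the caller (for example an HTTP client) goes away
	return <-done
}

func main() {
	fmt.Println(runDemo()) // prints "finished"
}
```

Cancelling the caller's context has no effect on the detached one, so the work runs to completion. To stop a task that is already running, use Cancel or CancelAllForProject.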

Install

go get github.com/thebtf/aimux/loom@v0.2.0

The module path is github.com/thebtf/aimux/loom. It is a standalone nested module — it does NOT pull in the full aimux server or any MCP dependencies.

Minimum Go version: 1.25.

Quick Start

The minimal pattern is: create an engine from an SQLite database, register a worker, submit a task, and poll until completion.

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "time"

    "github.com/thebtf/aimux/loom"
    _ "modernc.org/sqlite" // registers the "sqlite" database/sql driver
)

// echoWorker returns the prompt back as the result.
type echoWorker struct{}

func (echoWorker) Type() loom.WorkerType { return loom.WorkerTypeCLI }
func (echoWorker) Execute(_ context.Context, t *loom.Task) (*loom.WorkerResult, error) {
    return &loom.WorkerResult{Content: "hello: " + t.Prompt}, nil
}

func main() {
    db, err := sql.Open("sqlite", "file:hello?cache=shared&mode=memory")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // engineName ("hello") must be non-empty; it scopes tasks to this daemon.
    engine, err := loom.NewEngine(db, "hello")
    if err != nil {
        log.Fatal(err)
    }
    // Deferred calls run last-in, first-out, so the engine closes before db.
    defer engine.Close(context.Background())
    engine.RegisterWorker(loom.WorkerTypeCLI, echoWorker{})

    id, err := engine.Submit(context.Background(), loom.TaskRequest{
        WorkerType: loom.WorkerTypeCLI,
        ProjectID:  "demo",
        Prompt:     "world",
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("submitted: %s\n", id)

    // Poll until the task reaches a terminal state (completed, failed, ...).
    for i := 0; i < 20; i++ {
        task, err := engine.Get(id)
        if err != nil {
            log.Fatal(err)
        }
        if task.Status.IsTerminal() {
            fmt.Printf("status: %s\nresult: %s\n", task.Status, task.Result)
            return
        }
        time.Sleep(50 * time.Millisecond)
    }
    log.Fatal("timeout")
}

See examples/hello/main.go for the full runnable version.

Concepts

Task

A Task is the unit of work. It carries an auto-generated ID, a ProjectID for multi-tenant isolation, a RequestID for distributed tracing, a free-form Prompt string, optional Env/CWD overrides, and routing hints (CLI, Role, Model).

Tasks are created by Submit and persisted to SQLite immediately. Their status advances through a documented state machine (see CONTRACT.md).

Worker

A Worker is any type that implements two methods:

type Worker interface {
    Execute(ctx context.Context, task *Task) (*WorkerResult, error)
    Type() WorkerType
}

The library ships three composable bases in loom/workers/:

Base            Use case
SubprocessBase  Wraps an os/exec subprocess
HTTPBase        Makes HTTP calls with retry/backoff
StreamingBase   Adds line-by-line progress to any inner Worker

Engine

LoomEngine owns dispatch, persistence, event delivery, and cancellation. It is created once per process via NewEngine(db, engineName, opts...).

engine, err := loom.NewEngine(db, "my-daemon",
    loom.WithLogger(myLogger),
    loom.WithMeter(myMeter),
)
if err != nil {
    log.Fatal(err)
}
engine.RegisterWorker(loom.WorkerTypeCLI, myWorker)

QualityGate

After Execute returns, the result is evaluated before the task transitions to completed. The gate rejects empty output and rate-limit responses (both trigger retry up to maxRetries). Thrashing detection (Jaccard similarity across the last N results) prevents infinite retry loops.
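
To make the thrashing check concrete, here is a minimal sketch of Jaccard similarity over word sets. It is an illustration under stated assumptions, not the gate's actual code: the whitespace tokenization, the lowercasing, and the rule "every consecutive pair in the window meets the threshold" are all guesses, and wordSet, jaccard, and isThrashing are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// wordSet lowercases s and splits it on whitespace (assumed tokenization).
func wordSet(s string) map[string]struct{} {
	set := make(map[string]struct{})
	for _, w := range strings.Fields(strings.ToLower(s)) {
		set[w] = struct{}{}
	}
	return set
}

// jaccard returns |A ∩ B| / |A ∪ B| for the word sets of a and b.
// Two empty inputs are treated as identical.
func jaccard(a, b string) float64 {
	sa, sb := wordSet(a), wordSet(b)
	if len(sa) == 0 && len(sb) == 0 {
		return 1
	}
	inter := 0
	for w := range sa {
		if _, ok := sb[w]; ok {
			inter++
		}
	}
	union := len(sa) + len(sb) - inter
	return float64(inter) / float64(union)
}

// isThrashing reports whether every consecutive pair of results in the
// window is at least threshold-similar (assumed rule).
func isThrashing(window []string, threshold float64) bool {
	if len(window) < 2 {
		return false
	}
	for i := 1; i < len(window); i++ {
		if jaccard(window[i-1], window[i]) < threshold {
			return false
		}
	}
	return true
}

func main() {
	same := []string{"rate limit hit", "rate limit hit", "rate limit hit"}
	varied := []string{"step one done", "step two done", "all steps finished"}
	fmt.Println(isThrashing(same, 0.8), isThrashing(varied, 0.8))
}
```

With the documented defaults (threshold=0.8, window=3), three near-identical results in a row would count as thrashing under this rule, while results that keep changing would not.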

Events

Subscribe to task lifecycle events via the event bus:

unsubscribe := engine.Events().Subscribe(func(e loom.TaskEvent) {
    fmt.Printf("%s → %s\n", e.TaskID, e.Status)
})
defer unsubscribe()

Delivery is synchronous on the dispatch goroutine. Subscribers must return quickly — offload heavy work to their own goroutine.
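
One way to keep a subscriber fast is to hand events to a buffered channel and process them elsewhere. The sketch below assumes that dropping events when the buffer is full is acceptable; TaskEvent here is a local stand-in with a subset of the documented fields, and offload and demoOffload are hypothetical helpers, not part of loom.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// TaskEvent is a local stand-in for loom.TaskEvent (subset of fields).
type TaskEvent struct {
	TaskID string
	Status string
}

// offload returns a handler that never blocks the emitter: events are queued
// in a buffered channel and counted as dropped when the buffer is full.
// stop closes the queue, waits for the consumer, and returns the drop count.
// Call stop only after unsubscribing the handler.
func offload(buf int, slow func(TaskEvent)) (handler func(TaskEvent), stop func() int64) {
	ch := make(chan TaskEvent, buf)
	var dropped atomic.Int64
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for e := range ch {
			slow(e) // heavy work runs here, off the dispatch goroutine
		}
	}()
	handler = func(e TaskEvent) {
		select {
		case ch <- e:
		default:
			dropped.Add(1)
		}
	}
	stop = func() int64 {
		close(ch)
		wg.Wait()
		return dropped.Load()
	}
	return handler, stop
}

// demoOffload sends three events through a buffer of eight.
func demoOffload() (processed int, dropped int64) {
	var seen []string // written only by the consumer, read after stop returns
	handler, stop := offload(8, func(e TaskEvent) { seen = append(seen, e.TaskID) })
	for _, id := range []string{"t1", "t2", "t3"} {
		handler(TaskEvent{TaskID: id, Status: "completed"})
	}
	dropped = stop()
	return len(seen), dropped
}

func main() {
	fmt.Println(demoOffload())
}
```

The handler returned by offload has the func(TaskEvent) shape that Subscribe expects, so it could be passed to engine.Events().Subscribe directly.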

Crash Recovery

Call RecoverCrashed() once during daemon startup:

n, err := engine.RecoverCrashed()
if err != nil {
    log.Fatal(err)
}
log.Printf("marked %d crashed tasks as failed_crash", n)

This marks any tasks still in dispatched or running state as failed_crash.

Dependencies

The loom module has a strict, minimal dependency closure:

Dependency                       Purpose
stdlib                           All language primitives
github.com/google/uuid           Task ID generation (UUID v7)
go.opentelemetry.io/otel/metric  OTel metric API (API-only, no SDK)
modernc.org/sqlite               Pure-Go SQLite driver (no CGO)

No MCP SDK, no aimux server code, no external HTTP clients beyond stdlib.

  • CONTRACT.md — formal interface specification, state machine, and stability contract
  • PLAYBOOK.md — 7+ complete recipes for common patterns
  • TESTING.md — unit and integration test patterns
  • RECOVERY.md — terminal states and operator playbook
  • CHANGELOG.md — v0.1.0 release notes and full public API surface

Documentation

Overview

Package loom provides a central task mediator with lifecycle management, dependency injection, and observability hooks for long-running work.

LoomEngine dispatches Tasks to pluggable Workers, persists state in SQLite with crash recovery, emits lifecycle events via a callback EventBus, and exposes OpenTelemetry metrics and structured logging.

Basic usage:

db, _ := sql.Open("sqlite", "tasks.db?_pragma=journal_mode(WAL)")
defer db.Close()

engine, err := loom.NewEngine(db, "my-daemon",
    loom.WithLogger(myLogger),
    loom.WithMeter(myMeter),
)
if err != nil { panic(err) }
defer engine.Close(context.Background())

engine.RegisterWorker(loom.WorkerTypeCLI, myWorker)

id, err := engine.Submit(ctx, loom.TaskRequest{
    WorkerType: loom.WorkerTypeCLI,
    ProjectID:  "my-project",
    Prompt:     "do the thing",
})

See README.md, CONTRACT.md, PLAYBOOK.md, and TESTING.md for details.

Constants

This section is empty.

Variables

var ErrEngineClosed = errors.New("loom: engine closed")

ErrEngineClosed is returned by Submit when the engine has been shut down via Close. It is a sentinel error — callers can compare against it with errors.Is to distinguish graceful shutdown from other failures.
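
A short sketch of the errors.Is pattern follows. It uses a local sentinel, errEngineClosed, in place of loom.ErrEngineClosed so that it runs on its own; submit and classify are hypothetical helpers showing that the check still works after the error has been wrapped with %w.

```go
package main

import (
	"errors"
	"fmt"
)

// errEngineClosed stands in for loom.ErrEngineClosed in this sketch.
var errEngineClosed = errors.New("loom: engine closed")

// submit is a hypothetical caller-side wrapper that adds context to errors.
func submit(closed bool) error {
	if closed {
		return fmt.Errorf("submit task: %w", errEngineClosed)
	}
	return nil
}

// classify distinguishes graceful shutdown from other failures.
func classify(err error) string {
	switch {
	case err == nil:
		return "accepted"
	case errors.Is(err, errEngineClosed):
		return "shutting down"
	default:
		return "failed"
	}
}

func main() {
	fmt.Println(classify(submit(false))) // prints "accepted"
	fmt.Println(classify(submit(true)))  // prints "shutting down"
}
```

Comparing with == would miss the wrapped case; errors.Is walks the wrap chain.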

Functions

func RequestIDFrom

func RequestIDFrom(ctx context.Context) string

RequestIDFrom extracts the request ID from the context. Returns empty string if no request ID was attached.

func WithRequestID

func WithRequestID(ctx context.Context, requestID string) context.Context

WithRequestID returns a new context carrying the given request ID. Used by aimux server handlers before Submit to propagate tracing.
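
The two helpers follow the usual context-key pattern. The sketch below reimplements that pattern locally (requestIDKey, withRequestID, and requestIDFrom are stand-ins, and loom's internals may differ) to show the round trip and the empty-string fallback.

```go
package main

import (
	"context"
	"fmt"
)

// requestIDKey is an unexported-style key type; using a struct type avoids
// collisions with keys defined by other packages.
type requestIDKey struct{}

// withRequestID mirrors the documented WithRequestID behaviour.
func withRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey{}, id)
}

// requestIDFrom mirrors RequestIDFrom: empty string when nothing is attached.
func requestIDFrom(ctx context.Context) string {
	id, _ := ctx.Value(requestIDKey{}).(string)
	return id
}

// demoRequestID returns the ID read from a tagged and an untagged context.
func demoRequestID() (tagged, untagged string) {
	ctx := withRequestID(context.Background(), "req-42")
	return requestIDFrom(ctx), requestIDFrom(context.Background())
}

func main() {
	tagged, untagged := demoRequestID()
	fmt.Printf("%q %q\n", tagged, untagged) // prints "req-42" ""
}
```

With loom itself, the equivalent call order would be WithRequestID on the incoming context, then Submit with that context.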

Types

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is a synchronous fan-out event broadcaster with callback subscribers. Subscribers are invoked in registration order, synchronously from the emitter's goroutine. Panics in a subscriber are recovered and logged; they do NOT affect other subscribers or the engine. Slow subscribers block the engine — subscribers MUST return quickly and offload heavy work to their own goroutine.

func NewEventBus

func NewEventBus(logger deps.Logger) *EventBus

NewEventBus creates a new EventBus with an optional logger for panic recovery. If logger is nil, a NoopLogger is used.

func (*EventBus) Emit

func (b *EventBus) Emit(e TaskEvent)

Emit delivers the event to all subscribers synchronously in registration order. Panics in a subscriber are recovered and logged — they do NOT propagate to other subscribers or back to the caller.

func (*EventBus) Subscribe

func (b *EventBus) Subscribe(handler func(TaskEvent)) func()

Subscribe registers a callback and returns an unsubscribe function. Calling the returned unsubscribe multiple times is safe (idempotent).
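
The documented EventBus guarantees (registration order, panic isolation, idempotent unsubscribe) can be sketched in a few lines. This is an illustrative reimplementation, not the library's source; bus, subscriber, event, and demoBus are hypothetical names, and the panic is printed rather than sent to a logger.

```go
package main

import (
	"fmt"
	"slices"
	"sync"
)

type event struct{ TaskID string }

type subscriber struct {
	id int
	fn func(event)
}

type bus struct {
	mu     sync.Mutex
	nextID int
	subs   []subscriber // kept in registration order
}

// subscribe registers fn and returns an unsubscribe function. A second call
// to unsubscribe finds nothing to delete, so it is safe to repeat.
func (b *bus) subscribe(fn func(event)) func() {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.nextID
	b.nextID++
	b.subs = append(b.subs, subscriber{id: id, fn: fn})
	return func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.subs = slices.DeleteFunc(b.subs, func(s subscriber) bool { return s.id == id })
	}
}

// emit calls every subscriber in order on the caller's goroutine and
// recovers panics so one bad subscriber cannot affect the others.
func (b *bus) emit(e event) {
	b.mu.Lock()
	snapshot := slices.Clone(b.subs)
	b.mu.Unlock()
	for _, s := range snapshot {
		func() {
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("subscriber panicked:", r)
				}
			}()
			s.fn(e)
		}()
	}
}

func demoBus() []string {
	var got []string
	b := &bus{}
	unsubA := b.subscribe(func(e event) { got = append(got, "a:"+e.TaskID) })
	b.subscribe(func(event) { panic("boom") })
	b.subscribe(func(e event) { got = append(got, "c:"+e.TaskID) })
	b.emit(event{TaskID: "t1"})
	unsubA()
	unsubA() // idempotent
	b.emit(event{TaskID: "t2"})
	return got
}

func main() {
	fmt.Println(demoBus())
}
```

Taking a snapshot under the lock lets a subscriber call unsubscribe from inside its own callback without deadlocking, at the cost of one slice copy per emit.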

type EventType

type EventType string

EventType identifies a task lifecycle event.

const (
	EventTaskCreated     EventType = "task.created"
	EventTaskDispatched  EventType = "task.dispatched"
	EventTaskRunning     EventType = "task.running"
	EventTaskCompleted   EventType = "task.completed"
	EventTaskFailed      EventType = "task.failed"
	EventTaskFailedCrash EventType = "task.failed_crash"
	EventTaskRetrying    EventType = "task.retrying"
	EventTaskCancelled   EventType = "task.cancelled"
)

type GateDecision

type GateDecision struct {
	Accept bool   `json:"accept"`
	Reason string `json:"reason"` // "pass", "empty_output", "rate_limit_error", "thrashing"
	Retry  bool   `json:"retry"`  // if !Accept && Retry → status retrying, re-dispatch
}

GateDecision is the result of a quality gate evaluation.

type LoomEngine

type LoomEngine struct {
	// contains filtered or unexported fields
}

LoomEngine is the central task mediator. All tool handler work flows through LoomEngine which owns task creation, dispatch, execution, persistence, and delivery.

func New

func New(store *TaskStore, opts ...Option) *LoomEngine

New creates a LoomEngine with the given store and options. Dep fields (logger, clock, idGen, meter) are initialised to their noop/system defaults before Options are applied so callers that omit an option get a safe default. EventBus is created AFTER options so it receives the final (possibly injected) logger.

func NewEngine

func NewEngine(db *sql.DB, engineName string, opts ...Option) (*LoomEngine, error)

NewEngine constructs a LoomEngine from a raw *sql.DB. It creates a TaskStore internally and returns the engine. This is the v0.1.0-aligned constructor from spec FR-6 — New(store, opts) remains for backwards compatibility with aimux call sites and will be removed during Phase 3 atomic migration.

engineName identifies the owning daemon for per-daemon task scoping (AIMUX-10). It must not be empty; NewEngine returns an error if it is.

func (*LoomEngine) Cancel

func (l *LoomEngine) Cancel(taskID string) error

Cancel requests cancellation of a running task.

func (*LoomEngine) CancelAllForProject

func (l *LoomEngine) CancelAllForProject(projectID string) (int, error)

CancelAllForProject cancels all running tasks for the given project. Returns the number of tasks signaled for cancellation. Tasks that are not currently running (pending, completed, failed, already cancelled) are not affected. Per US9: used by engram to cancel all work for a disconnecting project.

func (*LoomEngine) Close added in v0.1.1

func (l *LoomEngine) Close(ctx context.Context) error

Close signals engine shutdown and waits for all in-flight dispatch goroutines to complete (or ctx to expire). Callers MUST invoke Close before closing the underlying *sql.DB to prevent write-after-close races. Close is idempotent: subsequent invocations return nil immediately.

After Close returns, Submit will reject new tasks with ErrEngineClosed. In-flight dispatch goroutines already running continue until they finish naturally. ctx is used only as a deadline on how long Close will wait for them — it does NOT cancel the tasks themselves. Use Cancel or CancelAllForProject before Close if you need to abort in-flight work.
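
The "wait for in-flight work or give up at the deadline" behaviour can be sketched with a WaitGroup and a select. This is a guess at the shape, not loom's code: waitOrDeadline and demoClose are hypothetical, and returning ctx.Err() on expiry is an assumption about what Close reports.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrDeadline blocks until wg is done or ctx expires, whichever is first.
// It never cancels the goroutines tracked by wg; it only stops waiting.
func waitOrDeadline(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoClose runs one simulated task of workMS milliseconds and waits for it
// with a budget of budgetMS milliseconds.
func demoClose(workMS, budgetMS int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMS) * time.Millisecond) // in-flight task
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(budgetMS)*time.Millisecond)
	defer cancel()
	return waitOrDeadline(ctx, &wg)
}

func main() {
	fmt.Println(demoClose(10, 1000)) // task finishes within the budget
	fmt.Println(demoClose(300, 10))  // budget expires first
}
```

In the second call the simulated task keeps running after the wait gives up, which matches the documented point that ctx bounds the wait and does not cancel tasks.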

func (*LoomEngine) Count added in v0.2.0

func (e *LoomEngine) Count(filter TaskFilter) (int, error)

Count returns the number of tasks matching the filter, scoped to the store's engine_name. This is an optional capability that can be used by budget-layer callers (FR-4, C2).

func (*LoomEngine) CountAll added in v0.2.0

func (e *LoomEngine) CountAll() (int, error)

CountAll returns the total number of tasks across all engines. This is an optional capability for cross-engine diagnostics.

func (*LoomEngine) Events

func (l *LoomEngine) Events() *EventBus

Events returns the event bus for subscribing to task lifecycle events.

func (*LoomEngine) Get

func (l *LoomEngine) Get(taskID string) (*Task, error)

Get returns current task state.

func (*LoomEngine) List

func (l *LoomEngine) List(projectID string, statuses ...TaskStatus) ([]*Task, error)

List returns tasks for a project, optionally filtered by status.

func (*LoomEngine) ListAll added in v0.2.0

func (l *LoomEngine) ListAll(statuses ...TaskStatus) ([]*Task, error)

ListAll returns tasks across all engines and projects, optionally filtered by status. Used for cross-daemon diagnostic views (AIMUX-10 FR-5).

func (*LoomEngine) RecoverCrashed

func (l *LoomEngine) RecoverCrashed() (int, error)

RecoverCrashed marks all dispatched/running tasks as failed_crash. Called once on daemon startup.

func (*LoomEngine) RegisterWorker

func (l *LoomEngine) RegisterWorker(wt WorkerType, w Worker)

RegisterWorker registers a worker for a given worker type.

func (*LoomEngine) Submit

func (l *LoomEngine) Submit(ctx context.Context, req TaskRequest) (string, error)

Submit creates a persistent task and dispatches it to the appropriate worker. It returns immediately with the task ID; execution happens in a background goroutine. RequestID is extracted from ctx via RequestIDFrom for distributed tracing. After Close has been called, Submit returns ErrEngineClosed without side effects.

type Option

type Option func(*LoomEngine)

Option configures LoomEngine.

func WithClock

func WithClock(c deps.Clock) Option

WithClock injects a custom Clock into the LoomEngine. If not supplied, SystemClock (time.Now) is used. A nil argument is ignored so the safe default is never overwritten.

func WithIDGenerator

func WithIDGenerator(g deps.IDGenerator) Option

WithIDGenerator injects a custom IDGenerator into the LoomEngine. If not supplied, UUIDGenerator (uuid.NewV7) is used. A nil argument is ignored so the safe default is never overwritten.

func WithLogger

func WithLogger(l deps.Logger) Option

WithLogger injects a custom Logger into the LoomEngine. If not supplied, NoopLogger is used and all log output is discarded. A nil argument is ignored so the safe default is never overwritten.

func WithMaxRetries

func WithMaxRetries(n int) Option

WithMaxRetries sets the maximum retry count (default 2).

func WithMeter

func WithMeter(m deps.Meter) Option

WithMeter injects a custom Meter into the LoomEngine for OTel instrumentation. If not supplied, NoopMeter is used and all metrics are discarded. A nil argument is ignored so the safe default is never overwritten.
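
The options above share one convention: defaults are set first, and a nil argument leaves the default in place. A minimal sketch of that functional-options pattern follows. All names are local stand-ins; in particular the logger interface and its Info method are hypothetical, since the deps.Logger method set is not shown here.

```go
package main

import "fmt"

// logger is a hypothetical stand-in for deps.Logger.
type logger interface{ Info(msg string) }

type noopLogger struct{}

func (noopLogger) Info(string) {}

type engine struct {
	logger     logger
	maxRetries int
}

type option func(*engine)

// withLogger ignores nil so the safe default is never overwritten.
func withLogger(l logger) option {
	return func(e *engine) {
		if l != nil {
			e.logger = l
		}
	}
}

func withMaxRetries(n int) option {
	return func(e *engine) { e.maxRetries = n }
}

// newEngine sets defaults first, then applies options in order.
func newEngine(opts ...option) *engine {
	e := &engine{logger: noopLogger{}, maxRetries: 2}
	for _, o := range opts {
		o(e)
	}
	return e
}

func main() {
	e := newEngine(withLogger(nil), withMaxRetries(5))
	fmt.Println(e.logger != nil, e.maxRetries) // prints "true 5"
}
```

Because defaults are applied before any option runs, the engine never holds a nil dependency, so call sites do not need nil checks before logging or recording metrics.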

type QualityGate

type QualityGate struct {
	// contains filtered or unexported fields
}

QualityGate validates worker results.

func NewQualityGate

func NewQualityGate() *QualityGate

NewQualityGate creates a quality gate with defaults (threshold=0.8, window=3).

func NewQualityGateWithOpts

func NewQualityGateWithOpts(opts ...QualityGateOption) *QualityGate

NewQualityGateWithOpts creates a quality gate with options.

func (*QualityGate) Check

func (g *QualityGate) Check(task *Task, result *WorkerResult) GateDecision

Check evaluates a worker result.

func (*QualityGate) Clear

func (g *QualityGate) Clear(taskID string)

Clear removes the history for a task, freeing memory after dispatch completes.

type QualityGateOption

type QualityGateOption func(*QualityGate)

QualityGateOption configures the quality gate.

func WithThreshold

func WithThreshold(t float64) QualityGateOption

WithThreshold sets the Jaccard similarity threshold.

func WithWindowSize

func WithWindowSize(n int) QualityGateOption

WithWindowSize sets the thrashing detection window (minimum 2).

type RequestIDKey

type RequestIDKey struct{}

RequestIDKey is the context key type used to attach request IDs to loom-managed contexts. Exported so external callers can read request IDs off contexts produced by WithRequestID without going through loom helpers.

Use as a value: ctx.Value(loom.RequestIDKey{}). Prefer the helpers WithRequestID and RequestIDFrom where possible.

type Task

type Task struct {
	ID           string            `json:"id"`
	Status       TaskStatus        `json:"status"`
	WorkerType   WorkerType        `json:"worker_type"`
	ProjectID    string            `json:"project_id"`
	RequestID    string            `json:"request_id,omitempty"`
	EngineName   string            `json:"engine_name,omitempty"`
	Prompt       string            `json:"prompt"`
	CWD          string            `json:"cwd,omitempty"`
	Env          map[string]string `json:"env,omitempty"`
	CLI          string            `json:"cli,omitempty"`
	Role         string            `json:"role,omitempty"`
	Model        string            `json:"model,omitempty"`
	Effort       string            `json:"effort,omitempty"`
	Timeout      int               `json:"timeout,omitempty"`
	Metadata     map[string]any    `json:"metadata,omitempty"`
	Result       string            `json:"result,omitempty"`
	Error        string            `json:"error,omitempty"`
	Retries      int               `json:"retries"`
	CreatedAt    time.Time         `json:"created_at"`
	DispatchedAt *time.Time        `json:"dispatched_at,omitempty"`
	CompletedAt  *time.Time        `json:"completed_at,omitempty"`
}

Task represents a unit of work managed by LoomEngine.

type TaskEvent

type TaskEvent struct {
	Type      EventType  `json:"type"`
	TaskID    string     `json:"task_id"`
	ProjectID string     `json:"project_id"`
	RequestID string     `json:"request_id,omitempty"`
	Status    TaskStatus `json:"status"`
	Timestamp time.Time  `json:"timestamp"`
}

TaskEvent carries task lifecycle data to subscribers. All fields are required — subscribers can filter on ProjectID for multi-tenant fanout and correlate with RequestID for distributed tracing.

type TaskFilter added in v0.2.0

type TaskFilter struct {
	ProjectID string
	Statuses  []TaskStatus
}

TaskFilter holds optional filtering criteria for loom task queries. An empty TaskFilter (zero value) matches tasks scoped to the current engine. Use CountAll for cross-engine total count.

type TaskRequest

type TaskRequest struct {
	WorkerType WorkerType
	ProjectID  string
	RequestID  string
	Prompt     string
	CWD        string
	Env        map[string]string
	CLI        string
	Role       string
	Model      string
	Effort     string
	Timeout    int
	Metadata   map[string]any
}

TaskRequest is the input for submitting a new task.

type TaskStatus

type TaskStatus string

TaskStatus represents the lifecycle state of a task.

const (
	TaskStatusPending     TaskStatus = "pending"
	TaskStatusDispatched  TaskStatus = "dispatched"
	TaskStatusRunning     TaskStatus = "running"
	TaskStatusCompleted   TaskStatus = "completed"
	TaskStatusFailed      TaskStatus = "failed"
	TaskStatusFailedCrash TaskStatus = "failed_crash"
	TaskStatusRetrying    TaskStatus = "retrying"
)

func (TaskStatus) CanTransitionTo

func (s TaskStatus) CanTransitionTo(target TaskStatus) bool

CanTransitionTo checks if transitioning from current status to target is valid.

func (TaskStatus) IsTerminal

func (s TaskStatus) IsTerminal() bool

IsTerminal returns true if the status is a terminal state.

type TaskStore

type TaskStore struct {
	// contains filtered or unexported fields
}

TaskStore persists tasks in SQLite.

func NewTaskStore

func NewTaskStore(db *sql.DB, engineName string) (*TaskStore, error)

NewTaskStore initialises the tasks table and returns a TaskStore. engineName identifies the owning daemon and is used to scope task queries (MarkCrashed, List, Count). Returns an error if engineName is empty — silent fallback to empty identity is forbidden (spec C3 / FR-7).

func (*TaskStore) Count added in v0.2.0

func (s *TaskStore) Count(filter TaskFilter) (int, error)

Count returns the number of tasks matching the filter, scoped to the store's engine_name. Uses SQL COUNT for efficiency — avoids loading full rows.

func (*TaskStore) CountAll added in v0.2.0

func (s *TaskStore) CountAll() (int, error)

CountAll returns the total number of tasks across all engines. Unlike Count, this applies no engine_name filter.

func (*TaskStore) Create

func (s *TaskStore) Create(task *Task) error

Create inserts a new task into the store.

func (*TaskStore) Get

func (s *TaskStore) Get(id string) (*Task, error)

Get retrieves a task by ID.

func (*TaskStore) IncrementRetries

func (s *TaskStore) IncrementRetries(id string) error

IncrementRetries bumps the retry count for a task.

func (*TaskStore) List

func (s *TaskStore) List(projectID string, statuses ...TaskStatus) ([]*Task, error)

List returns tasks for a project, optionally filtered by status values.

func (*TaskStore) ListAll added in v0.2.0

func (s *TaskStore) ListAll(statuses ...TaskStatus) ([]*Task, error)

ListAll returns tasks across all engines and projects, optionally filtered by status. Unlike List, it applies no engine_name or project_id filter — use for cross-daemon global views (AIMUX-10 FR-5, sessions tool all=true opt-in).

func (*TaskStore) MarkCrashed

func (s *TaskStore) MarkCrashed() (int, error)

MarkCrashed sets status='failed_crash' for all dispatched or running tasks. Returns the number of tasks marked.

Raw SQL is used intentionally: on daemon startup this bulk-updates every in-flight row in a single statement, which is both simpler and faster than iterating with UpdateStatus. The init() assertion above ensures the state machine continues to permit these transitions so the raw SQL can never silently diverge from CanTransitionTo validation.

func (*TaskStore) SetDaemonUUID added in v0.2.0

func (s *TaskStore) SetDaemonUUID(uuid string)

SetDaemonUUID configures the daemon-lifetime UUID to be stamped on every new task row. Called once at startup by the main binary after generating the UUID via pkg/session.GetDaemonUUID(). Loom is a separate module and cannot import pkg/session directly, so the UUID is injected here.

func (*TaskStore) SetResult

func (s *TaskStore) SetResult(id string, result string, errMsg string) error

SetResult stores the execution result and marks completed_at. errMsg is redacted before storage — secrets (API keys, Bearer tokens) are replaced with [REDACTED]. result is stored verbatim (callers own its content).
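
Redaction of this kind is commonly done with a small set of regular expressions. The sketch below shows the idea only: the two patterns (a Bearer token and an sk- prefixed key) are assumptions chosen for illustration, and secretPatterns and redact are hypothetical names, not the rules loom applies.

```go
package main

import (
	"fmt"
	"regexp"
)

// secretPatterns is an assumed, illustrative pattern set.
var secretPatterns = []*regexp.Regexp{
	regexp.MustCompile(`(?i)bearer\s+[A-Za-z0-9._~+/=-]+`),
	regexp.MustCompile(`sk-[A-Za-z0-9]{8,}`),
}

// redact replaces every match of every pattern with [REDACTED].
func redact(s string) string {
	for _, re := range secretPatterns {
		s = re.ReplaceAllString(s, "[REDACTED]")
	}
	return s
}

func main() {
	fmt.Println(redact("401: Authorization: Bearer abc.def-123 rejected"))
	fmt.Println(redact("key sk-abcdefgh12345678 invalid"))
}
```

Since only errMsg is redacted and result is stored verbatim, a worker that may echo credentials in its output would need to apply a filter like this itself before returning.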

func (*TaskStore) UpdateStatus

func (s *TaskStore) UpdateStatus(id string, from, to TaskStatus) error

UpdateStatus transitions a task from `from` to `to`, enforcing state machine rules. Returns an error if the current status does not match `from` or the transition is invalid.
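
The from/to check behaves like a compare-and-swap on the status column. Below is an in-memory sketch of that behaviour. The transition table is a hypothetical subset: only dispatched and running moving to failed_crash is stated in this documentation, the other edges are assumptions, and the authoritative state machine is in CONTRACT.md.

```go
package main

import (
	"fmt"
	"slices"
	"sync"
)

type status string

// allowed is an assumed subset of the state machine, for illustration only.
var allowed = map[status][]status{
	"pending":    {"dispatched"},
	"dispatched": {"running", "failed_crash"},
	"running":    {"completed", "failed", "failed_crash"},
}

func canTransition(from, to status) bool {
	return slices.Contains(allowed[from], to)
}

type store struct {
	mu    sync.Mutex
	tasks map[string]status
}

// updateStatus succeeds only if the task is currently in `from` and the
// from -> to edge is allowed; otherwise the stored status is unchanged.
func (s *store) updateStatus(id string, from, to status) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.tasks[id]
	switch {
	case !ok:
		return fmt.Errorf("task %s not found", id)
	case cur != from:
		return fmt.Errorf("task %s is %s, not %s", id, cur, from)
	case !canTransition(from, to):
		return fmt.Errorf("invalid transition %s -> %s", from, to)
	}
	s.tasks[id] = to
	return nil
}

func main() {
	s := &store{tasks: map[string]status{"t1": "pending"}}
	fmt.Println(s.updateStatus("t1", "pending", "dispatched")) // ok
	fmt.Println(s.updateStatus("t1", "pending", "dispatched")) // stale from
	fmt.Println(s.updateStatus("t1", "dispatched", "pending")) // invalid edge
}
```

The stale-from failure is what protects against two goroutines racing to advance the same task: only the first one sees the status it expected.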

type Worker

type Worker interface {
	Execute(ctx context.Context, task *Task) (*WorkerResult, error)
	Type() WorkerType
}

Worker executes a task and returns the result.
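
Because Worker is a two-method interface, workers compose by wrapping. The sketch below defines local stand-ins for the documented types (so it compiles without the module) and a hypothetical timed decorator that fills in DurationMS; it is an illustration of the composition style, not one of the shipped bases.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local stand-ins mirroring the documented loom types.
type WorkerType string

type Task struct{ Prompt string }

type WorkerResult struct {
	Content    string
	DurationMS int64
}

type Worker interface {
	Execute(ctx context.Context, task *Task) (*WorkerResult, error)
	Type() WorkerType
}

// echoWorker returns the prompt back as the result.
type echoWorker struct{}

func (echoWorker) Type() WorkerType { return "cli" }
func (echoWorker) Execute(_ context.Context, t *Task) (*WorkerResult, error) {
	return &WorkerResult{Content: "hello: " + t.Prompt}, nil
}

// timed is a hypothetical decorator: it wraps any Worker and records how
// long Execute took in DurationMS.
type timed struct{ inner Worker }

func (t timed) Type() WorkerType { return t.inner.Type() }
func (t timed) Execute(ctx context.Context, task *Task) (*WorkerResult, error) {
	start := time.Now()
	res, err := t.inner.Execute(ctx, task)
	if err != nil || res == nil {
		return res, err
	}
	res.DurationMS = time.Since(start).Milliseconds()
	return res, nil
}

// runTimed executes the wrapped echo worker once.
func runTimed(prompt string) (WorkerType, string, error) {
	var w Worker = timed{inner: echoWorker{}}
	res, err := w.Execute(context.Background(), &Task{Prompt: prompt})
	if err != nil {
		return "", "", err
	}
	return w.Type(), res.Content, nil
}

func main() {
	wt, content, err := runTimed("world")
	if err != nil {
		panic(err)
	}
	fmt.Println(wt, content) // prints "cli hello: world"
}
```

The decorator reports the inner worker's Type, so it can be registered under the same WorkerType as the worker it wraps.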

type WorkerResult

type WorkerResult struct {
	Content    string         `json:"content"`
	Metadata   map[string]any `json:"metadata,omitempty"`
	DurationMS int64          `json:"duration_ms"`
}

WorkerResult holds the output from a worker execution.

type WorkerType

type WorkerType string

WorkerType identifies which worker handles a task.

const (
	WorkerTypeCLI          WorkerType = "cli"
	WorkerTypeThinker      WorkerType = "thinker"
	WorkerTypeInvestigator WorkerType = "investigator"
	WorkerTypeOrchestrator WorkerType = "orchestrator"
)

Directories

Path Synopsis
Package deps contains injectable dependency interfaces for LoomEngine.
examples
custom_worker command
CustomWorker demonstrates how to satisfy loom.Worker from scratch using only stdlib — no SubprocessBase, no HTTPBase, just the Worker interface directly.
hello command
Hello demonstrates the minimal loom.Submit + Get polling pattern.
http command
HTTP demonstrates HTTPBase usage with an in-process httptest.Server.
subprocess command
Subprocess demonstrates SubprocessBase composition: how to wrap an OS process as a loom Worker.