tasks

package
v1.3.1 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 11, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package tasks owns Harbor's unified task surface: the `TaskRegistry` interface every planner / runtime / steering caller uses to spawn, list, cancel, and prioritise both foreground runs and background tasks under one `TaskID` namespace.

Phase 20 ships the per-task surface (Spawn / Get / List / Cancel / Prioritize / Mark*); Phase 21 lays groups + retain-turn + patches on top. Bundling the whole TaskService into one phase would slow the wave-end E2E + delay the per-task surface that downstream phases (steering Phase 53, planner Phase 42) want as a stable foundation. The split is recorded in `docs/decisions.md` as D-030.

Lifecycle FSM (enforced at the driver):

PENDING ──MarkRunning──▶ RUNNING ──MarkComplete──▶ COMPLETE (terminal)
   │                        │
   │                        ├──MarkPaused──▶ PAUSED ──MarkResumed──▶ RUNNING
   │                        │
   │                        └──MarkFailed──▶ FAILED (terminal)
   │
   └──Cancel──▶ CANCELLED (terminal; valid from any non-terminal state)

Invalid transitions return `ErrInvalidTransition` (wrapped with the from/to states named in the message). Terminal-to-anything is invalid; same-state is invalid (no idempotent self-transitions on the driver — the runtime engine knows whether a transition is real before calling).
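
The transition rules above can be sketched as a small lookup table. This is a minimal illustration, not the driver's code: the `Status` type, its string values, and `checkTransition` are local stand-ins introduced for the example, and only the error text is copied from the package's sentinel.

```go
package main

import (
	"errors"
	"fmt"
)

// Status is a local stand-in for the package's TaskStatus; the string
// values are assumptions made for this sketch.
type Status string

const (
	Pending   Status = "pending"
	Running   Status = "running"
	Paused    Status = "paused"
	Complete  Status = "complete"
	Failed    Status = "failed"
	Cancelled Status = "cancelled"
)

var ErrInvalidTransition = errors.New("tasks: invalid lifecycle transition")

// allowed lists the legal targets for each non-terminal state.
// Terminal states have no entry, so every move out of them fails.
// No state lists itself, so same-state moves fail too.
var allowed = map[Status][]Status{
	Pending: {Running, Cancelled},
	Running: {Paused, Complete, Failed, Cancelled},
	Paused:  {Running, Cancelled},
}

// checkTransition returns nil for a legal move and a wrapped
// ErrInvalidTransition naming both states otherwise.
func checkTransition(from, to Status) error {
	for _, next := range allowed[from] {
		if next == to {
			return nil
		}
	}
	return fmt.Errorf("%w: %s -> %s", ErrInvalidTransition, from, to)
}

func main() {
	fmt.Println(checkTransition(Pending, Running))  // legal move
	fmt.Println(checkTransition(Pending, Complete)) // skips Running
	fmt.Println(checkTransition(Running, Running))  // same-state
	fmt.Println(checkTransition(Failed, Running))   // out of a terminal state
}
```

Because the error is wrapped with `%w`, a caller can still match it with `errors.Is(err, ErrInvalidTransition)` while the message names the offending states.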

Idempotency. `Spawn` keys on `(SessionID, IdempotencyKey)`: same key → returns the existing `TaskHandle` with `Reused: true`; divergent `SpawnRequest` under the same key returns `ErrIdempotencyConflict`. Empty `IdempotencyKey` disables dedup entirely (each Spawn yields a fresh handle, no collisions).
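
The dedup rule can be illustrated with an in-memory map keyed on the session and idempotency key. This is a sketch under stated assumptions: `request`, `handle`, and `registry` are trimmed hypothetical types, IDs are simple counters rather than ULIDs, and "divergent" is modelled as plain struct inequality, which the real driver may define differently.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrIdempotencyConflict = errors.New("tasks: idempotency key reused with divergent SpawnRequest")

// request and handle are trimmed, hypothetical stand-ins for
// SpawnRequest and TaskHandle.
type request struct {
	SessionID      string
	IdempotencyKey string
	Query          string
}

type handle struct {
	ID     string
	Reused bool
}

type dedupKey struct{ session, key string }

type entry struct {
	id  string
	req request
}

type registry struct {
	seen map[dedupKey]entry
	next int
}

func newRegistry() *registry {
	return &registry{seen: make(map[dedupKey]entry)}
}

// spawn dedups on (SessionID, IdempotencyKey). An empty key skips the
// lookup and the bookkeeping, so every call yields a fresh handle.
func (r *registry) spawn(req request) (handle, error) {
	k := dedupKey{req.SessionID, req.IdempotencyKey}
	if req.IdempotencyKey != "" {
		if prev, ok := r.seen[k]; ok {
			if prev.req != req { // assumption: divergence = struct inequality
				return handle{}, ErrIdempotencyConflict
			}
			return handle{ID: prev.id, Reused: true}, nil
		}
	}
	r.next++
	id := fmt.Sprintf("task-%d", r.next) // placeholder ID, not a ULID
	if req.IdempotencyKey != "" {
		r.seen[k] = entry{id: id, req: req}
	}
	return handle{ID: id}, nil
}

func main() {
	r := newRegistry()
	first, _ := r.spawn(request{"s1", "k1", "index repo"})
	again, _ := r.spawn(request{"s1", "k1", "index repo"})
	_, err := r.spawn(request{"s1", "k1", "different query"})
	fmt.Println(first.ID == again.ID, again.Reused, err)
}
```

Keying the map on the pair rather than the key alone is what lets the same `IdempotencyKey` produce distinct tasks in different sessions.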

Cancellation propagation. `Cancel` walks the children index per `Task.PropagateOnCancel`:

  • `"cascade"` (default): BFS through descendants, transitioning each to `StatusCancelled`, emitting `task.cancelled` per cancel.
  • `"isolate"`: only the target task transitions; children stay in their current state.
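
The two propagation modes can be sketched as a breadth-first walk over a children index. The `task` struct, the status strings, and `cancel` are hypothetical and in-memory only. The sketch consults only the target's `propagate` setting and keeps descending through already-terminal descendants; whether the real driver also consults each descendant's own setting is not stated here.

```go
package main

import "fmt"

// task is a hypothetical in-memory stand-in for the persisted Task.
type task struct {
	status    string
	propagate string // "cascade", "isolate", or "" for the default
	children  []string
}

func terminal(s string) bool {
	return s == "complete" || s == "failed" || s == "cancelled"
}

// cancel transitions id to "cancelled" and, unless the target is set
// to "isolate", walks its descendants breadth-first. It returns the
// IDs that transitioned, in visit order (one task.cancelled each).
func cancel(tasks map[string]*task, id string) []string {
	root, ok := tasks[id]
	if !ok || terminal(root.status) {
		return nil
	}
	root.status = "cancelled"
	cancelled := []string{id}
	if root.propagate == "isolate" {
		return cancelled
	}
	queue := append([]string(nil), root.children...)
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		child, ok := tasks[cur]
		if !ok {
			continue
		}
		if !terminal(child.status) {
			child.status = "cancelled"
			cancelled = append(cancelled, cur)
		}
		queue = append(queue, child.children...)
	}
	return cancelled
}

func main() {
	tasks := map[string]*task{
		"a": {status: "running", children: []string{"b", "c"}},
		"b": {status: "running", children: []string{"d"}},
		"c": {status: "complete"},
		"d": {status: "pending"},
	}
	fmt.Println(cancel(tasks, "a")) // [a b d]; c was already terminal
}
```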

Identity. The triple `(tenant, user, session)` is mandatory at every API boundary (D-001). Empty tenant/user/session in `SpawnRequest.Identity` (or the ctx Identity for Get/Cancel) is rejected with `ErrIdentityRequired`. RunID is task-scoped: the foreground run IS a task; the background task has its own ID.

Persistence. Each lifecycle transition writes through the configured `state.StateStore` as `StateRecord{Kind: "task.lifecycle", Bytes: marshal(task)}`. The wrapper layer is the typed adapter per D-027 — opaque bytes go to the leaf store. Caller-side audit redaction runs against `Description`, `Query`, `Result`, and `Error` BEFORE Save (per D-020).
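
The redact-then-save ordering can be shown with a toy pipeline. Everything here is an assumption made for illustration: the `record` and `task` types are trimmed local copies, JSON is used as the marshal format, and `redact` masks one hard-coded string where the real audit redactor would apply its own rules.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// record mirrors the StateRecord shape quoted above; task is a
// trimmed, hypothetical Task.
type record struct {
	Kind  string
	Bytes []byte
}

type task struct {
	ID          string
	Status      string
	Description string
	Query       string
}

// redact is a toy redactor (an assumption): it masks one known secret.
func redact(s string) string {
	return strings.ReplaceAll(s, "hunter2", "[REDACTED]")
}

// toRecord redacts the free-text fields first, then marshals the task
// into the opaque bytes handed to the leaf store.
func toRecord(t task) (record, error) {
	t.Description = redact(t.Description)
	t.Query = redact(t.Query)
	b, err := json.Marshal(t)
	if err != nil {
		return record{}, err
	}
	return record{Kind: "task.lifecycle", Bytes: b}, nil
}

func main() {
	rec, err := toRecord(task{
		ID:     "t1",
		Status: "pending",
		Query:  "password is hunter2",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(rec.Kind, string(rec.Bytes))
}
```

Redacting before marshalling means the leaf store only ever sees the masked text, which is the point of running the redactor ahead of Save.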

Bus events. Each lifecycle transition emits one of the registered `task.*` event types on the configured `events.EventBus`; payloads are typed (one struct per event type) and carry the `TaskID`, prior status (where applicable), and new status. Identity is on `Event.Identity` (the existing `Quadruple` field).

SpawnTool. The `SpawnTool` surface lifts from RFC §6.8 verbatim so the FSM models `task.tool` lifecycle today. Actual tool dispatch wiring lands at Phase 26+; in Phase 20 `SpawnTool` returns a `TaskHandle` whose execution body is a no-op stub — the task persists at `StatusPending` and never auto-advances.

Constants

const (
	// EventTypeTaskSpawned — emitted by Spawn / SpawnTool when a
	// fresh task is persisted (NOT emitted on idempotency-key reuse).
	EventTypeTaskSpawned events.EventType = "task.spawned"
	// EventTypeTaskStarted — emitted by MarkRunning when a task
	// transitions from Pending or Paused → Running.
	EventTypeTaskStarted events.EventType = "task.started"
	// EventTypeTaskPaused — emitted by MarkPaused (Running → Paused).
	// No V1 production caller drives MarkPaused on the live pause path
	// (a paused run stays `running` in the task projection; pause state
	// travels on the `pause.*` events — see the Protocol pause model),
	// so the shipped runtime does not produce this type; the registry
	// driver contract and the conformance suite keep it canonical.
	EventTypeTaskPaused events.EventType = "task.paused"
	// EventTypeTaskResumed — emitted by MarkResumed (Paused → Running).
	// Distinct from task.started so subscribers can tell pause-resume
	// from initial start. Like task.paused, not produced on the V1 live
	// pause path — subscribe to `pause.resumed` for live resume signals.
	EventTypeTaskResumed events.EventType = "task.resumed"
	// EventTypeTaskCompleted — emitted by MarkComplete (Running →
	// Complete; terminal).
	EventTypeTaskCompleted events.EventType = "task.completed"
	// EventTypeTaskFailed — emitted by MarkFailed (Running → Failed;
	// terminal).
	EventTypeTaskFailed events.EventType = "task.failed"
	// EventTypeTaskCancelled — emitted by Cancel for the target task
	// AND each cascaded descendant.
	EventTypeTaskCancelled events.EventType = "task.cancelled"
	// EventTypeTaskPrioritised — emitted by Prioritize when the
	// task's priority value changes.
	EventTypeTaskPrioritised events.EventType = "task.prioritised"

	// EventTypeTaskGroupCreated — emitted by ResolveOrCreateGroup on
	// the first creation (NOT emitted on idempotent-return).
	EventTypeTaskGroupCreated events.EventType = "task.group_created"
	// EventTypeTaskGroupSealed — emitted by SealGroup /
	// ApplyGroup(ActionSeal) when the group transitions Open → Sealed.
	EventTypeTaskGroupSealed events.EventType = "task.group_sealed"
	// EventTypeTaskGroupResolved — emitted when a sealed group's last
	// non-terminal member transitions to terminal AND the group's
	// final status is `GroupCompleted`. Payload is `GroupCompletion`
	// — the SAME canonical shape `WatchGroup` delivers.
	EventTypeTaskGroupResolved events.EventType = "task.group_resolved"
	// EventTypeTaskGroupCancelled — emitted by CancelGroup or by the
	// driver's FailFast cascade when the group transitions to
	// `GroupCancelled`. Payload is `GroupCompletion` with
	// `FinalStatus = GroupCancelled` and `Reason` populated.
	EventTypeTaskGroupCancelled events.EventType = "task.group_cancelled"
	// EventTypeTaskPatchApplied — emitted by
	// ApplyPatch(action=PatchAccept) on the pending → applied
	// transition.
	EventTypeTaskPatchApplied events.EventType = "task.patch_applied"
	// EventTypeTaskPatchRejected — emitted by
	// ApplyPatch(action=PatchReject) on the pending → rejected
	// transition.
	EventTypeTaskPatchRejected events.EventType = "task.patch_rejected"
	// EventTypeTaskBackgroundAcknowledged — emitted once per task by
	// AcknowledgeBackground on the un-ack → ack transition.
	EventTypeTaskBackgroundAcknowledged events.EventType = "task.background_acknowledged"
)

Task lifecycle event types. Each is registered with the events package's exhaustive registry via init() so Publish accepts them without ErrUnknownEventType. Registration follows the Phase 08 sessions precedent — the consumer subsystem registers its own events alongside its typed payloads, keeping the events package free of task-domain knowledge.

Phase 20 ships eight types: seven lifecycle transitions plus `task.prioritised` for caller-driven priority updates. Phase 21 adds seven more group/patch types — `task.group_created`, `task.group_sealed`, `task.group_resolved`, `task.group_cancelled`, `task.patch_applied`, `task.patch_rejected`, `task.background_acknowledged`. The `task.group_resolved` payload IS `GroupCompletion` so subscribers (planner runtime, Console, durable event log, sidecar status emitters) consume one canonical shape regardless of how they're wired.

const (
	// PropagateCascade walks the children index in BFS order,
	// transitioning each descendant to StatusCancelled. The default.
	PropagateCascade = "cascade"
	// PropagateIsolate only transitions the target task; children
	// keep running.
	PropagateIsolate = "isolate"
)

PropagateOnCancel controls how `Cancel` walks descendants.

const DefaultDriver = "inprocess"

DefaultDriver is the Phase 20 production driver name. Phase 87+ post-V1 work may register additional names; Open switches on `cfg.Driver`.

const GroupKind = "task.group"

GroupKind is the StateStore Kind constant for group lifecycle records.

const LifecycleKind = "task.lifecycle"

LifecycleKind is the StateStore Kind constant for task-lifecycle records. Centralised so callers / tests / Phase 60 Protocol mappers reference one symbol.

const PatchKind = "task.patch"

PatchKind is the StateStore Kind constant for patch records. Phase 21 persists pending / applied / rejected patch state through the same typed-wrapper-over-generic adapter the per-task surface uses (D-027).

Variables

var (
	// ErrGroupNotFound — Get / Seal / Cancel / Apply / WatchGroup
	// targeting a TaskGroupID that has no record (or the group is not
	// visible to the ctx identity).
	ErrGroupNotFound = errors.New("tasks: group not found")
	// ErrGroupSealed — mutation attempted on a sealed group (e.g.
	// spawning a new member into a sealed group).
	ErrGroupSealed = errors.New("tasks: group is sealed; cannot mutate membership")
	// ErrGroupNotSealed — `ApplyGroup(ActionResolve)` called on an
	// open (not-yet-sealed) group.
	ErrGroupNotSealed = errors.New("tasks: group must be sealed before resolve")
	// ErrGroupInvalidTransition — FSM transition not in the table
	// (e.g. Sealed → Open; Completed → Anything).
	ErrGroupInvalidTransition = errors.New("tasks: invalid group transition")
	// ErrPatchNotFound — `ApplyPatch` targeting an unknown patch ID.
	ErrPatchNotFound = errors.New("tasks: patch not found")
)

Sentinel errors for the group + patch surface. Callers compare via `errors.Is`.

var (
	// ErrNotFound — Get / Cancel / Prioritize / Mark* targeting a
	// TaskID that has no record (or the record is not visible to the
	// ctx identity).
	ErrNotFound = errors.New("tasks: task not found")
	// ErrInvalidTransition — Mark* called for a transition that is
	// not in the FSM table (e.g. Pending → Complete skipping Running).
	ErrInvalidTransition = errors.New("tasks: invalid lifecycle transition")
	// ErrIdempotencyConflict — Spawn called with a previously-seen
	// IdempotencyKey but a divergent SpawnRequest. Tells the caller
	// a retry policy bug exists upstream.
	ErrIdempotencyConflict = errors.New("tasks: idempotency key reused with divergent SpawnRequest")
	// ErrIdentityRequired — Spawn / Get / List / Cancel / Prioritize
	// / Mark* called with an Identity missing one of (tenant, user,
	// session). Identity is mandatory (D-001).
	ErrIdentityRequired = errors.New("tasks: identity required (tenant/user/session)")
	// ErrUnknownDriver — Open was asked for a driver name no
	// registered factory handles.
	ErrUnknownDriver = errors.New("tasks: unknown driver")
	// ErrRegistryClosed — any operation called after Close.
	ErrRegistryClosed = errors.New("tasks: registry is closed")
	// ErrInvalidRequest — SpawnRequest / SpawnToolRequest fails
	// structural validation (empty Kind, negative priority, unknown
	// PropagateOnCancel value, etc.).
	ErrInvalidRequest = errors.New("tasks: invalid request")
)

Sentinel errors. Callers compare via errors.Is.
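
Since the registry may wrap these sentinels with extra context, a direct `==` comparison can miss them. The sketch below shows the `errors.Is` pattern on local copies of two sentinels; `wrapped` and `classify` are hypothetical helpers, and the labels they return are illustrative only.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two sentinels from the listing above.
var (
	ErrNotFound         = errors.New("tasks: task not found")
	ErrIdentityRequired = errors.New("tasks: identity required (tenant/user/session)")
)

// wrapped adds call-site context around a sentinel, the way a
// registry method might.
func wrapped(op string, sentinel error) error {
	return fmt.Errorf("%s: %w", op, sentinel)
}

// classify maps an error to a coarse label by walking the wrap chain.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotFound):
		return "not-found"
	case errors.Is(err, ErrIdentityRequired):
		return "bad-identity"
	default:
		return "internal"
	}
}

func main() {
	err := wrapped("get task-42", ErrNotFound)
	fmt.Println(err == ErrNotFound) // false: the sentinel is wrapped
	fmt.Println(classify(err))      // not-found
}
```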

var ErrInvalidListFilter = errors.New("tasks: invalid tasks.list wire filter")

ErrInvalidListFilter is returned by ListFilterFromWire when the wire filter names an enum value outside the canonical task-status / task-kind sets. Callers compare with errors.Is.

Functions

func Register

func Register(name string, factory Factory)

Register installs a driver factory under name. Drivers self-register from their package init(); cmd/harbor blank-imports the production driver to trigger registration. Per AGENTS.md §4.4.

Re-registering the same name panics — the registration model is write-once-at-init and a duplicate signals a build mis-configuration.

func RegisteredDrivers

func RegisteredDrivers() []string

RegisteredDrivers returns a sorted list of driver names. Useful for boot-log output and surfacing in error messages.
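
The write-once registration model described for `Register` and `RegisteredDrivers` can be sketched as a mutex-guarded map. The `driverTable` type, its methods, and the `factory` signature are hypothetical; the real package keeps its table at package level and uses its own `Factory` type.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// factory is a placeholder for the package's Factory type.
type factory func() (string, error)

// driverTable is a hypothetical write-once table of driver factories.
type driverTable struct {
	mu      sync.RWMutex
	drivers map[string]factory
}

func newDriverTable() *driverTable {
	return &driverTable{drivers: make(map[string]factory)}
}

// register installs f under name and panics on a duplicate, since a
// second registration can only come from a build mistake.
func (d *driverTable) register(name string, f factory) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.drivers[name]; dup {
		panic("tasks: driver " + name + " registered twice")
	}
	d.drivers[name] = f
}

// names returns the registered driver names in sorted order, so
// boot logs and error messages are stable across runs.
func (d *driverTable) names() []string {
	d.mu.RLock()
	defer d.mu.RUnlock()
	out := make([]string, 0, len(d.drivers))
	for n := range d.drivers {
		out = append(out, n)
	}
	sort.Strings(out)
	return out
}

func main() {
	tbl := newDriverTable()
	tbl.register("inprocess", func() (string, error) { return "in-process registry", nil })
	tbl.register("custom", func() (string, error) { return "custom registry", nil })
	fmt.Println(tbl.names()) // [custom inprocess]
}
```

Sorting on read matters because Go map iteration order is randomized; without it the boot-log output would change from run to run.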

func ValidateRequest

func ValidateRequest(req SpawnRequest) error

ValidateRequest checks structural invariants Spawn needs before touching driver storage: identity triple present, Kind known, PropagateOnCancel known (or empty for the default).

func ValidateToolRequest

func ValidateToolRequest(req SpawnToolRequest) error

ValidateToolRequest mirrors ValidateRequest for SpawnToolRequest.

func WithRegistry

func WithRegistry(ctx context.Context, r TaskRegistry) context.Context

WithRegistry attaches r to ctx so downstream handlers can recover it via MustFrom or From.

Types

type Dependencies

type Dependencies struct {
	// Store is the StateStore used to persist task lifecycle records
	// (D-027 typed-wrapper-over-generic). Required.
	Store state.StateStore
	// Bus is the EventBus where lifecycle events land. Required.
	Bus events.EventBus
	// Redactor is the audit redactor applied to Description / Query
	// / Result / Error BEFORE Save (D-020). Required; wiring code
	// passes the global redactor here.
	Redactor audit.Redactor
	// Cfg carries Phase 20's TasksConfig (driver name today; Phase
	// 21 adds RetainTurnTimeout + ContinuationHopLimit).
	Cfg config.TasksConfig
}

Dependencies bundles the wiring inputs every TaskRegistry driver needs. `Store`, `Bus`, and `Redactor` are required; `Cfg` is passed through verbatim. Wiring lives in `cmd/harbor` (or test helpers); the registry never reaches into ctx for these.

type Factory

type Factory func(deps Dependencies) (TaskRegistry, error)

Factory builds a TaskRegistry from a Dependencies struct. Drivers expose one Factory each via init() → Register.

type GroupAction

type GroupAction string

GroupAction is the verb a caller passes to `ApplyGroup`.

const (
	// ActionSeal seals an open group, freezing membership. Equivalent
	// to `SealGroup`. Idempotent on an already-sealed group's no-op
	// path; invalid on a terminal group (returns
	// `ErrGroupInvalidTransition`).
	ActionSeal GroupAction = "seal"
	// ActionCancel cancels a non-terminal group (propagate = true).
	// Equivalent to `CancelGroup(reason="action:cancel", propagate=true)`.
	ActionCancel GroupAction = "cancel"
	// ActionResolve marks a sealed group as `Completed` when the
	// caller knows all members are done (used by drivers that defer
	// resolution to an external signal — Phase 21's in-process driver
	// resolves automatically when all members are terminal, so this
	// action is rarely needed but exposed for symmetry with the brief
	// 05 surface).
	ActionResolve GroupAction = "resolve"
)

Group actions.

type GroupCompletion

type GroupCompletion struct {
	GroupID     TaskGroupID
	SessionID   identity.Identity
	OwnerTaskID TaskID
	FinalStatus TaskGroupStatus // GroupCompleted | GroupCancelled
	ResolvedAt  time.Time
	Members     []MemberOutcome
	Reason      string // populated for GroupCancelled (cancel reason); empty otherwise
}

GroupCompletion is the typed wake-up payload delivered by `WatchGroup` (and as the `task.group_resolved` bus-event payload). Carries the group's terminal status, the resolve timestamp, the cancel reason (populated when `FinalStatus == GroupCancelled`), and a `MemberOutcome` per group member.

Discipline: `MemberOutcome.Result` is ref-shaped. Heavy results upstream MUST already have been substituted with `ArtifactRef`s (D-022, D-026); the payload is NOT byte-bound. A `TaskResult.Value` carrying inline bytes above the heavy-output threshold is a leak — the LLM-edge enforcement pass will fail loudly. The conformance suite's `WatchGroup_Push_DeliversCompletionPayload` verifies the ref-shaped contract by stuffing an ArtifactRef-shaped JSON into a member result and asserting it round-trips through the payload unchanged.

Concurrent reuse: multiple `WatchGroup` subscribers on the same group all receive the same payload (D-025). The driver fans out the payload to each subscriber's buffered-size-1 channel without blocking.
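
The non-blocking fan-out can be sketched with one buffered channel per subscriber and a `select` that never waits. `groupWatchers`, `watch`, and `resolve` are hypothetical names, `completion` is a trimmed `GroupCompletion`, and closing each channel after delivery is an assumption of this sketch rather than documented driver behaviour. Subscribers that arrive after `resolve` are not handled here.

```go
package main

import (
	"fmt"
	"sync"
)

// completion is a trimmed, hypothetical GroupCompletion.
type completion struct {
	GroupID     string
	FinalStatus string
}

// groupWatchers holds one buffered-size-1 channel per subscriber.
type groupWatchers struct {
	mu   sync.Mutex
	subs []chan completion
}

// watch registers a subscriber and returns its receive-only channel.
func (w *groupWatchers) watch() <-chan completion {
	ch := make(chan completion, 1)
	w.mu.Lock()
	w.subs = append(w.subs, ch)
	w.mu.Unlock()
	return ch
}

// resolve hands the same payload to every subscriber. The buffer of
// one plus the default arm keep the sender from ever blocking on a
// slow or absent reader.
func (w *groupWatchers) resolve(c completion) {
	w.mu.Lock()
	subs := w.subs
	w.subs = nil // detach so a second resolve cannot double-send
	w.mu.Unlock()
	for _, ch := range subs {
		select {
		case ch <- c:
		default: // buffer full; skip rather than block
		}
		close(ch) // assumption: the channel is single-use
	}
}

func main() {
	w := &groupWatchers{}
	a, b := w.watch(), w.watch()
	w.resolve(completion{GroupID: "g1", FinalStatus: "completed"})
	fmt.Println(<-a == <-b) // true: both subscribers see the same payload
}
```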

type GroupRequest

type GroupRequest struct {
	ID          TaskGroupID
	SessionID   identity.Identity
	OwnerTaskID TaskID
	RetainTurn  bool
	FailFast    bool
	Description string
}

GroupRequest is the input shape for `ResolveOrCreateGroup`. `ID` is optional — empty → the registry assigns a fresh ULID and returns the new group; non-empty + already-existing → the existing group is returned (idempotency).

Identity is mandatory.

type MemberOutcome

type MemberOutcome struct {
	TaskID TaskID
	Status TaskStatus
	Result *TaskResult
	Error  *TaskError
}

MemberOutcome is the per-task terminal record carried inside `GroupCompletion`. Either `Result` is populated (when `Status == StatusComplete`) or `Error` is populated (when `Status == StatusFailed`); neither is populated when `Status == StatusCancelled`.

type Patch

type Patch struct {
	ID        string
	SessionID identity.Identity
	Status    string // "pending" | "applied" | "rejected"
	Bytes     []byte
	CreatedAt time.Time
	UpdatedAt time.Time
}

Patch is the persisted patch record. The `Bytes` slot carries the caller-shaped patch payload; the registry does not interpret it. Identity is mandatory.

type PatchAction

type PatchAction string

PatchAction is the verb a caller passes to `ApplyPatch`. Patches transition `pending → applied | rejected` through the registry. The patch payload is opaque bytes — the actual context-patch shape lives at the planner (Phase 42+); Phase 21 stores + retrieves; the planner consumes.

const (
	// PatchAccept transitions a pending patch to `applied`.
	PatchAccept PatchAction = "accept"
	// PatchReject transitions a pending patch to `rejected`.
	PatchReject PatchAction = "reject"
)

Patch actions.

type SpawnRequest

type SpawnRequest struct {
	Identity          identity.Quadruple
	Kind              TaskKind
	ParentTaskID      *TaskID
	Description       string
	Query             string
	Priority          int
	IdempotencyKey    string
	PropagateOnCancel string
	NotifyOnComplete  bool
	GroupID           TaskGroupID
	// InputArtifactIDs are operator-uploaded multimodal inputs the
	// task carries onto its first planner turn (Round-7 F11 / D-166).
	// Persisted onto `Task.InputArtifactIDs`; consumed by the run
	// loop's first-turn materializer. Empty is the text-only default.
	InputArtifactIDs []string
}

SpawnRequest is the input shape for `Spawn`. Identity is mandatory. `IdempotencyKey` is namespaced by `Identity.SessionID`: same key across different sessions creates two distinct tasks.

`PropagateOnCancel` defaults to "cascade" when empty; "isolate" is opt-in for tasks that must survive a parent's cancellation.

`GroupID` (Phase 21, optional) wires the new task into an existing `TaskGroup`. The driver verifies the group is `Open` (sealed or terminal groups reject with `ErrGroupSealed`) and registers the new task as a member. Empty `GroupID` is the default — most foreground turns aren't group members.

type SpawnToolRequest

type SpawnToolRequest struct {
	Identity          identity.Quadruple
	ParentTaskID      *TaskID
	ToolName          string
	ToolArgs          json.RawMessage
	Description       string
	Priority          int
	IdempotencyKey    string
	PropagateOnCancel string
	NotifyOnComplete  bool
	GroupID           TaskGroupID
}

SpawnToolRequest is the input shape for `SpawnTool`. The shape lifts from RFC §6.8 verbatim so the FSM models tool-task lifecycle today; actual tool dispatch wiring lands at Phase 26.

Phase 20's `SpawnTool` execution body is a no-op stub: the task is persisted at `StatusPending` and never auto-advances. The runtime engine (Phase 26) drives the lifecycle once dispatch is wired.

`GroupID` (Phase 21, optional) wires the new tool task into an existing `TaskGroup`. See `SpawnRequest.GroupID` for the contract.

type Task

type Task struct {
	ID                TaskID
	Identity          identity.Quadruple
	Kind              TaskKind
	Status            TaskStatus
	Priority          int
	ParentTaskID      *TaskID
	Description       string
	Query             string
	Result            *TaskResult
	Error             *TaskError
	PropagateOnCancel string
	NotifyOnComplete  bool
	IdempotencyKey    string
	CreatedAt         int64 // unix nanoseconds; matches sessions / events convention
	UpdatedAt         int64 // unix nanoseconds
	// ToolCount is the running count of tool dispatches the runtime
	// has performed against this task. Advanced exclusively through
	// `TaskRegistry.IncrementToolCount` — never set directly by callers
	// (Phase 83m item 7). Projected to `prototypes.TaskRow.ToolCount`
	// for the Console Tasks page.
	ToolCount int
	// InputArtifactIDs carry operator-uploaded multimodal inputs the
	// run consumes on its first planner turn (Round-7 F11 / D-166).
	// The run loop materializes these into `RunContext.InputArtifacts`
	// via the per-MIME dispatcher: image bytes inline as
	// `ImagePart.DataURL`; everything else stays as an `ArtifactStub`
	// the LLM routes to a matching tool through the tool catalog. Empty
	// is the common case — text-only turns.
	InputArtifactIDs []string
}

Task is the persisted lifecycle record for one task. The Identity quadruple is captured immutably on Spawn; the runtime engine drives state transitions via the registry's Mark* methods.

Group/patch fields are reserved for Phase 21 — the surface is intentionally narrow at Phase 20 so the Phase 21 PR adds those fields against a stable shape.

type TaskBackgroundAcknowledgedPayload

type TaskBackgroundAcknowledgedPayload struct {
	events.SafeSealed
	TaskID TaskID
}

TaskBackgroundAcknowledgedPayload reports AcknowledgeBackground for a single task (one event per task). SafePayload by construction.

type TaskCancelledPayload

type TaskCancelledPayload struct {
	events.SafeSealed
	TaskID   TaskID
	Reason   string
	Cascaded bool
}

TaskCancelledPayload reports a Cancel transition. `Cascaded` is true when this payload landed because of a parent's cascade-cancel (so subscribers can distinguish operator cancel from cascade). `Reason` is the operator-supplied reason (a short caller-controlled string; callers MUST NOT pass tool args, raw user input, or any secret-shaped material — the same SafePayload contract sessions uses for ClosedReason).

SafePayload by construction.

type TaskCompletedPayload

type TaskCompletedPayload struct {
	events.SafeSealed
	TaskID TaskID
}

TaskCompletedPayload reports MarkComplete. The result is on the Task record itself; this payload only signals the transition so subscribers do not see an unredacted result by accident. The caller pre-redacts `TaskResult.Value` before MarkComplete (D-020).

SafePayload by construction.

type TaskError

type TaskError struct {
	Code    string
	Message string
}

TaskError carries the failure payload. `Code` is a caller-defined short string; `Message` is the human-readable explanation, also pre-redacted by the caller.

type TaskFailedPayload

type TaskFailedPayload struct {
	events.SafeSealed
	TaskID    TaskID
	ErrorCode string
}

TaskFailedPayload reports MarkFailed. Carries the error code; the caller-controlled message is on the Task record (already redacted by the caller). SafePayload by construction.

type TaskFilter

type TaskFilter struct {
	Status   *TaskStatus
	Kind     *TaskKind
	ParentID *TaskID
}

TaskFilter is the read-side filter for `List`.

Empty pointer fields are wildcards: a zero-valued `TaskFilter` returns every task in the session.
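
The nil-pointer-as-wildcard convention can be sketched as follows. The `filter` and `summary` types use plain strings in place of the package's typed IDs and enums, and `matches` and `ptr` are hypothetical helpers added for the example.

```go
package main

import "fmt"

// summary and filter are simplified stand-ins for TaskSummary and
// TaskFilter, with strings in place of the typed fields.
type summary struct {
	ID, Status, Kind, ParentID string
}

type filter struct {
	Status   *string
	Kind     *string
	ParentID *string
}

// matches treats a nil pointer as "any value" and a non-nil pointer
// as an exact-match constraint.
func (f filter) matches(s summary) bool {
	if f.Status != nil && *f.Status != s.Status {
		return false
	}
	if f.Kind != nil && *f.Kind != s.Kind {
		return false
	}
	if f.ParentID != nil && *f.ParentID != s.ParentID {
		return false
	}
	return true
}

// ptr is a small helper for building filters inline.
func ptr(s string) *string { return &s }

func main() {
	all := []summary{
		{ID: "t1", Status: "running", Kind: "foreground"},
		{ID: "t2", Status: "running", Kind: "background", ParentID: "t1"},
		{ID: "t3", Status: "complete", Kind: "background", ParentID: "t1"},
	}
	f := filter{Status: ptr("running"), ParentID: ptr("t1")}
	for _, s := range all {
		if f.matches(s) {
			fmt.Println(s.ID) // t2
		}
	}
}
```

Pointers distinguish "not set" from the zero value, which matters for fields such as `ParentID` where an empty string could otherwise be read as "tasks with no parent".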

func ListFilterFromWire

func ListFilterFromWire(wire *prototypes.TaskFilter) (TaskFilter, error)

ListFilterFromWire translates the Protocol-layer `types.TaskFilter` into the runtime-internal `tasks.TaskFilter` the `TaskRegistry.List` method consumes. It narrows the registry filter ONLY on the facets the single-valued registry `TaskFilter` can express (a one-element status / kind set, and the parent-task pointer); every richer facet is left to the Protocol-layer `filterMatches` pass.

The function is pure: it allocates and returns a fresh `tasks.TaskFilter`, reads only its argument, and holds no state. It is therefore trivially safe for concurrent use by N goroutines (D-025) — there is no reusable artifact to leak.
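
The narrowing rule can be sketched against a made-up wire shape. The `wireFilter` fields, the status vocabulary, and the nil-input behaviour are all assumptions; the real Protocol-layer filter is not shown in this documentation. Only the kind values and the error text come from the listings here.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrInvalidListFilter = errors.New("tasks: invalid tasks.list wire filter")

// wireFilter is a hypothetical multi-valued Protocol-layer filter.
type wireFilter struct {
	Statuses []string
	Kinds    []string
	ParentID string
}

// filter is a simplified single-valued registry filter.
type filter struct {
	Status   *string
	Kind     *string
	ParentID *string
}

// The status vocabulary is an assumption made for this sketch.
var knownStatuses = map[string]bool{
	"pending": true, "running": true, "paused": true,
	"complete": true, "failed": true, "cancelled": true,
}

var knownKinds = map[string]bool{"foreground": true, "background": true}

// fromWire validates every enum value, then narrows only on the
// facets a single-valued filter can express: a one-element set
// becomes a pointer, anything wider stays a wildcard.
func fromWire(w *wireFilter) (filter, error) {
	var f filter
	if w == nil {
		return f, nil // assumption: no wire filter means no narrowing
	}
	for _, s := range w.Statuses {
		if !knownStatuses[s] {
			return filter{}, fmt.Errorf("%w: status %q", ErrInvalidListFilter, s)
		}
	}
	for _, k := range w.Kinds {
		if !knownKinds[k] {
			return filter{}, fmt.Errorf("%w: kind %q", ErrInvalidListFilter, k)
		}
	}
	if len(w.Statuses) == 1 {
		s := w.Statuses[0]
		f.Status = &s
	}
	if len(w.Kinds) == 1 {
		k := w.Kinds[0]
		f.Kind = &k
	}
	if w.ParentID != "" {
		p := w.ParentID
		f.ParentID = &p
	}
	return f, nil
}

func main() {
	f, err := fromWire(&wireFilter{
		Statuses: []string{"running"},
		Kinds:    []string{"foreground", "background"},
	})
	fmt.Println(*f.Status, f.Kind == nil, err) // running true <nil>
}
```

Each call builds a fresh `filter` from its argument alone, which is what makes a function of this shape safe to call from many goroutines at once.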

type TaskGroup

type TaskGroup struct {
	ID          TaskGroupID
	SessionID   identity.Identity
	OwnerTaskID TaskID
	Status      TaskGroupStatus
	RetainTurn  bool
	FailFast    bool
	Members     []TaskID
	Description string
	CreatedAt   time.Time
	UpdatedAt   time.Time
	ResolvedAt  *time.Time
}

TaskGroup is the persisted group record. The Identity is captured from the owning session at create time; cross-session group membership is forbidden (V1; nesting / cross-session lands post-V1 if a planner concrete genuinely needs it — brief 05).

`Members` is the in-order list of member task IDs assigned during the `Open` phase via the spawn-with-GroupID seam (Phase 26+ wires `SpawnTool`'s group hookup). Phase 21 ships the group surface; the member-assignment wiring is exercised by the conformance suite directly through the new `AddMember` driver helper.

`RetainTurn` is the foreground-blocking flag — when true, the owning session blocks foreground-turn dispatch until the group reaches a terminal state.

`FailFast` cancels remaining members when the first member fails. `ResolvedAt` is non-nil only on terminal status.

type TaskGroupCancelledPayload

type TaskGroupCancelledPayload struct {
	events.SafeSealed
	Completion GroupCompletion
}

TaskGroupCancelledPayload reports a CancelGroup transition (or a FailFast cascade-cancel). Same `GroupCompletion` shape as resolved, with `FinalStatus = GroupCancelled` and `Reason` populated.

SafePayload by construction.

type TaskGroupCreatedPayload

type TaskGroupCreatedPayload struct {
	events.SafeSealed
	GroupID     TaskGroupID
	OwnerTaskID TaskID
	RetainTurn  bool
	FailFast    bool
	Description string
}

TaskGroupCreatedPayload reports a successful ResolveOrCreateGroup on the first-creation path. Carries the assigned group ID, owner task, retain-turn / fail-fast flags, and the description. SafePayload by construction: the description is a caller-controlled short string under the same SafePayload contract sessions uses for `ClosedReason`.

type TaskGroupID

type TaskGroupID string

TaskGroupID is the ULID-shaped identifier for a `TaskGroup`. The caller MAY pre-assign in `GroupRequest.ID` (idempotency); empty → the registry assigns a fresh ULID.

type TaskGroupResolvedPayload

type TaskGroupResolvedPayload struct {
	events.SafeSealed
	Completion GroupCompletion
}

TaskGroupResolvedPayload reports a sealed group's terminal transition to `GroupCompleted`. The payload doubles as the `WatchGroup` wake-up payload — same canonical `GroupCompletion` shape so subscribers (Console, planner, sidecar status emitters) consume one shape regardless of how they're wired.

SafePayload by construction. `MemberOutcome.Result` is ref-shaped (D-022, D-026); heavy bytes should already be ArtifactRefs upstream.

type TaskGroupSealedPayload

type TaskGroupSealedPayload struct {
	events.SafeSealed
	GroupID  TaskGroupID
	Members  []TaskID
	SealedAt int64 // unix nanoseconds
}

TaskGroupSealedPayload reports a SealGroup transition. SafePayload by construction.

type TaskGroupStatus

type TaskGroupStatus string

TaskGroupStatus is the group lifecycle state. The FSM lives at the driver:

Open ──Seal──▶ Sealed ──(all members terminal)──▶ Completed
  │                │
  │                └──Cancel──▶ Cancelled
  └──Cancel──▶ Cancelled (valid from Open too)

`Completed` and `Cancelled` are terminal. Invalid transitions return `ErrGroupInvalidTransition`.

const (
	// GroupOpen is the initial state — members may be added.
	GroupOpen TaskGroupStatus = "open"
	// GroupSealed freezes membership; the group still has non-terminal
	// members. Sealed groups CANNOT accept new members
	// (`SpawnTool` / `Spawn` carrying a `GroupID` for a sealed group
	// returns `ErrGroupSealed`).
	GroupSealed TaskGroupStatus = "sealed"
	// GroupCompleted is the terminal-success state — the group was
	// sealed AND every member reached terminal state without
	// triggering a `FailFast` cancellation.
	GroupCompleted TaskGroupStatus = "completed"
	// GroupCancelled is the terminal-failure state — the group was
	// cancelled by `CancelGroup`, or `FailFast` was true and a member
	// transitioned to `StatusFailed`.
	GroupCancelled TaskGroupStatus = "cancelled"
)

Group statuses.
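
How sealing and `FailFast` combine can be sketched as a pure function from the current group status and member statuses to the next group status. The function name, the member status strings, and two edge-case choices (a `FailFast` failure also cancels an open group; a sealed group with no members resolves at once) are assumptions, not documented driver behaviour.

```go
package main

import "fmt"

type groupStatus string

const (
	groupOpen      groupStatus = "open"
	groupSealed    groupStatus = "sealed"
	groupCompleted groupStatus = "completed"
	groupCancelled groupStatus = "cancelled"
)

// terminalTask reports whether a member status is terminal. The
// status strings are assumptions made for this sketch.
func terminalTask(s string) bool {
	return s == "complete" || s == "failed" || s == "cancelled"
}

// nextStatus derives the group status after a member update.
func nextStatus(cur groupStatus, failFast bool, members []string) groupStatus {
	if cur == groupCompleted || cur == groupCancelled {
		return cur // terminal: no further transitions
	}
	allTerminal := true
	for _, m := range members {
		if failFast && m == "failed" {
			return groupCancelled // first failure cancels the group
		}
		if !terminalTask(m) {
			allTerminal = false
		}
	}
	if cur == groupSealed && allTerminal {
		return groupCompleted
	}
	return cur // open groups never auto-complete
}

func main() {
	members := []string{"complete", "failed"}
	fmt.Println(nextStatus(groupSealed, false, members)) // completed
	fmt.Println(nextStatus(groupSealed, true, members))  // cancelled
	fmt.Println(nextStatus(groupOpen, false, members))   // open
}
```

Note that without `FailFast` a failed member still counts as terminal, so the group resolves as `completed` and the failure is visible only in the per-member outcomes.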

type TaskHandle

type TaskHandle struct {
	ID     TaskID
	Reused bool
}

TaskHandle is the return shape of `Spawn` / `SpawnTool`. `Reused` is true when an idempotency-key match returned an existing handle.

type TaskID

type TaskID string

TaskID is the unified identifier covering both foreground runs and background tasks (brief 05 §1). ULID-shaped at construction time; the registry assigns the value, callers do not.

type TaskKind

type TaskKind string

TaskKind distinguishes a foreground task (a run inside a session's primary turn) from a background task (a spawned-without-blocking task). Both share the same TaskID namespace; this field is the discriminator.

const (
	// KindForeground is the kind for a run inside a session's primary
	// turn — what the predecessor called a "trace_id" lives here under
	// the unified TaskID.
	KindForeground TaskKind = "foreground"
	// KindBackground is the kind for a task spawned without blocking
	// the parent run.
	KindBackground TaskKind = "background"
)

Task kinds.

type TaskPatchAppliedPayload

type TaskPatchAppliedPayload struct {
	events.SafeSealed
	PatchID string
}

TaskPatchAppliedPayload reports ApplyPatch(action=PatchAccept). Carries the patch ID; the patch payload bytes are on the Patch record (already through the audit redactor; we do NOT inline the bytes here). SafePayload by construction.

type TaskPatchRejectedPayload

type TaskPatchRejectedPayload struct {
	events.SafeSealed
	PatchID string
}

TaskPatchRejectedPayload reports ApplyPatch(action=PatchReject). SafePayload by construction.

type TaskPausedPayload

type TaskPausedPayload struct {
	events.SafeSealed
	TaskID TaskID
}

TaskPausedPayload reports MarkPaused. SafePayload by construction.

type TaskPrioritisedPayload

type TaskPrioritisedPayload struct {
	events.SafeSealed
	TaskID        TaskID
	PriorPriority int
	NewPriority   int
}

TaskPrioritisedPayload reports Prioritize. Carries the new and prior priority values. SafePayload by construction.

type TaskRegistry

type TaskRegistry interface {
	// Spawn creates a new task or returns the existing handle when an
	// idempotency-key match is found. Returns `ErrIdentityRequired`
	// when the request's identity triple is incomplete.
	Spawn(ctx context.Context, req SpawnRequest) (TaskHandle, error)

	// SpawnTool creates a task representing a tool invocation. Phase
	// 20 ships the surface; tool dispatch wiring lands at Phase 26+
	// — the persisted task stays at `StatusPending` until then.
	SpawnTool(ctx context.Context, req SpawnToolRequest) (TaskHandle, error)

	// Get loads the task with `id`. Returns `ErrNotFound` (wrapped)
	// when no record exists or the task is not visible to the ctx
	// identity (cross-tenant / cross-session reads are rejected).
	Get(ctx context.Context, id TaskID) (*Task, error)

	// List returns task summaries for the given session matching `f`.
	// Empty pointer fields in `f` are wildcards.
	List(ctx context.Context, sessionID identity.Identity, f TaskFilter) ([]TaskSummary, error)

	// Cancel transitions the task to `StatusCancelled`. The descendant
	// walk depends on the task's `PropagateOnCancel`:
	//
	//   - "cascade" (default): BFS through children, each emitting
	//     `task.cancelled`.
	//   - "isolate": only the target transitions.
	//
	// Returns (true, nil) when the task transitioned; (false, nil)
	// when the task was already terminal (Cancel is idempotent on
	// already-terminal states). Returns `ErrNotFound` when the task
	// does not exist.
	Cancel(ctx context.Context, id TaskID, reason string) (bool, error)

	// Prioritize updates the task's `Priority`. Phase 20 stores the
	// value but does not preempt or reorder execution — scheduling is
	// the runtime engine's concern. Emits `task.prioritised`.
	//
	// Returns (true, nil) when the priority changed; (false, nil)
	// when the value matched (no-op write).
	Prioritize(ctx context.Context, id TaskID, priority int) (bool, error)

	// MarkRunning transitions Pending or Paused → Running. Invalid
	// transitions return `ErrInvalidTransition`.
	MarkRunning(ctx context.Context, id TaskID) error

	// MarkPaused transitions Running → Paused.
	MarkPaused(ctx context.Context, id TaskID) error

	// MarkResumed transitions Paused → Running. Distinct method (vs
	// MarkRunning) so the bus event can be `task.resumed` rather than
	// `task.started`.
	MarkResumed(ctx context.Context, id TaskID) error

	// MarkComplete transitions Running → Complete. Persists `result`
	// on the Task record; emits `task.completed`.
	MarkComplete(ctx context.Context, id TaskID, result TaskResult) error

	// MarkFailed transitions Running → Failed. Persists `err` on the
	// Task record; emits `task.failed`.
	MarkFailed(ctx context.Context, id TaskID, err TaskError) error

	// IncrementToolCount atomically increments `Task.ToolCount` by 1
	// and persists the updated record (Phase 83m item 7). NOT
	// idempotent — every call increments. The new value is reflected
	// on the next `Get` / `List` projection (`prototypes.TaskRow.ToolCount`).
	//
	// Returns `ErrNotFound` when the task does not exist or is not
	// visible to the ctx identity; `ErrRegistryClosed` after Close.
	// Does NOT change the task's FSM status and runs against tasks in
	// any state. Terminal tasks (Complete / Failed / Cancelled) still
	// accept increments (the runloop's late-arriving tool dispatches
	// against a cancelled run can still be counted); the storage write
	// is unconditional.
	//
	// The runloop calls this from its CallTool dispatch path once the
	// ToolExecutor returns without error — that is the only documented
	// producer in V1.
	IncrementToolCount(ctx context.Context, id TaskID) error

	// ResolveOrCreateGroup is the idempotent group constructor. Empty
	// `GroupRequest.ID` → registry assigns a fresh ULID. Non-empty +
	// already-existing → the existing group is returned unchanged.
	// Identity is mandatory; cross-session reuse of an ID is rejected
	// with `ErrGroupNotFound` (existence-without-access).
	//
	// Emits `task.group_created` on the first creation; the no-op
	// idempotent return does NOT re-emit.
	ResolveOrCreateGroup(ctx context.Context, req GroupRequest) (*TaskGroup, error)

	// SealGroup transitions an open group to `GroupSealed`, freezing
	// membership. A sealed group may still have non-terminal members;
	// the driver resolves the group automatically when the last member
	// transitions to terminal.
	//
	// Invalid transitions (already sealed; terminal) return
	// `ErrGroupInvalidTransition`. Emits `task.group_sealed`.
	SealGroup(ctx context.Context, id TaskGroupID) error

	// CancelGroup transitions a non-terminal group to `GroupCancelled`
	// and (when `propagate=true`) cancels every non-terminal member
	// task. The `reason` is a short caller-controlled string (same
	// `SafePayload` contract as `Cancel`'s reason).
	//
	// Emits `task.group_cancelled` carrying the canonical
	// `GroupCompletion` payload (so `WatchGroup` subscribers receive
	// the cancel-with-reason as a single typed delivery).
	CancelGroup(ctx context.Context, id TaskGroupID, reason string, propagate bool) error

	// ApplyGroup is the action-verb wrapper over `SealGroup` /
	// `CancelGroup` / explicit resolve. Convenience for callers that
	// dispatch by enum.
	//
	//   - ActionSeal    → SealGroup
	//   - ActionCancel  → CancelGroup(reason="action:cancel", propagate=true)
	//   - ActionResolve → mark sealed → completed (errors with
	//                     `ErrGroupNotSealed` on an open group)
	ApplyGroup(ctx context.Context, id TaskGroupID, action GroupAction) error

	// ListGroups returns the groups owned by `sessionID` matching the
	// optional `status` filter (nil = wildcard). Empty list + nil
	// error is the no-groups case; missing-identity returns
	// `ErrIdentityRequired`.
	ListGroups(ctx context.Context, sessionID identity.Identity, status *TaskGroupStatus) ([]TaskGroup, error)

	// ApplyPatch transitions a pending patch through
	// `pending → applied | rejected`. Returns `(true, nil)` when the
	// transition occurred; `(false, nil)` when the patch was already
	// in the target terminal state (idempotent). Returns
	// `ErrPatchNotFound` on a missing patch ID.
	//
	// Patches are persisted through StateStore under `Kind =
	// "task.patch"`. The patch payload is opaque bytes (D-027); the
	// actual context-patch shape lives at the planner (Phase 42+).
	//
	// Emits `task.patch_applied` or `task.patch_rejected` on a real
	// transition; no re-emit on the no-op path.
	ApplyPatch(ctx context.Context, sessionID identity.Identity, patchID string, action PatchAction) (bool, error)

	// AcknowledgeBackground marks completed background tasks as
	// user-acknowledged. Returns the count of tasks that transitioned
	// from un-acknowledged → acknowledged (idempotent on a re-ack).
	//
	// Emits one `task.background_acknowledged` event per task on the
	// real-transition path. Unknown task IDs are silently skipped (no
	// error; the count reflects only the real ack transitions).
	AcknowledgeBackground(ctx context.Context, sessionID identity.Identity, ids []TaskID) (int, error)

	// RegisterRetainTurnWaiter returns a channel that closes when the
	// session's earliest-active retain-turn group resolves, and a
	// cancel func that unsubscribes. The runtime engine consumes the
	// closed channel as the "all retain-turn groups have resolved"
	// signal so foreground-turn dispatch can resume.
	//
	// Buffered size 1 — the resolve path delivers the resolved
	// group's ID without blocking even if the consumer is slow.
	// Channel close is the termination signal; the optional payload
	// is the ID of the group whose terminal transition triggered the
	// wake.
	//
	// Implementations are required to close the channel exactly once.
	RegisterRetainTurnWaiter(sessionID identity.Identity) (<-chan TaskGroupID, func())

	// WatchGroup is the non-retain-turn dual of
	// `RegisterRetainTurnWaiter`: it does NOT block any foreground
	// turn — the planner is free to proceed while the group runs in
	// the background. When the group reaches a terminal state, the
	// runtime delivers a typed `GroupCompletion` payload on the
	// returned channel and closes it.
	//
	// Callers typically use this as a "wake the planner" signal so
	// background results integrate back into the conversation; see
	// the "Wake policy modes" godoc in `groups.go` for the three
	// patterns (push, poll, hybrid) the planner runtime can implement
	// against this single mechanism.
	//
	// Returns `ErrGroupNotFound` when the group is unknown at
	// registration time (e.g. resolved + GC'd). For a
	// resolved-but-still-tracked group, the implementation returns a
	// channel that is *already* primed with the cached
	// `GroupCompletion` (so late subscribers don't deadlock).
	//
	// The cancel func unsubscribes; calling it after a delivery is a
	// no-op. The channel is closed exactly once — either by the
	// resolve path (with a delivery) or by the cancel path (without
	// a delivery).
	//
	// Concurrent reuse: multiple subscribers on the same group all
	// receive the same payload (D-025).
	WatchGroup(sessionID identity.Identity, groupID TaskGroupID) (<-chan GroupCompletion, func(), error)

	// Close releases registry resources. Subsequent operations return
	// `ErrRegistryClosed`. Idempotent.
	Close(ctx context.Context) error
}

TaskRegistry is the orchestration surface for the task subsystem.

Implementations MUST be safe for concurrent use by N goroutines against a single shared instance (D-025). Mutable state must be guarded; per-call state lives in `ctx`, never on the driver.

The Mark* methods are the lifecycle drive-points called by the runtime engine; Cancel / Prioritize are caller-initiated (planner, steering, Console).

func From

func From(ctx context.Context) (TaskRegistry, bool)

From returns the TaskRegistry in ctx and a presence bool. Use when absence is recoverable.

func MustFrom

func MustFrom(ctx context.Context) TaskRegistry

MustFrom returns the TaskRegistry in ctx. It panics with ErrRegistryClosed (used as the sentinel for "no registry configured") when none is present. Use in handler/runtime paths where a registry is mandatory.

func Open

Open returns the TaskRegistry built by the factory whose name matches deps.Cfg.Driver (defaults to DefaultDriver when empty).

func OpenDriver

func OpenDriver(name string, deps Dependencies) (TaskRegistry, error)

OpenDriver opens a specific driver by name; useful for tests that want to exercise the registry against a non-default driver.

type TaskResult

type TaskResult struct {
	Value json.RawMessage
}

TaskResult carries the successful-completion payload. `Value` is pre-redacted by the caller (D-020); the registry stores it verbatim.

Phase 106 (V1.2) pins the answer-envelope contract: when the run-loop driver (cmd/harbor/cmd_dev_runloop.go::handleSpawn) produces TaskResult from a planner.Finish, `Value` is the JSON encoding of:

{
  "answer":          string,  // the LLM's natural-language answer
  "finish_reason":   string,  // planner.FinishReason as string
  "tool_calls_seen": int      // len(traj.Steps) at finish
}

Consumers (Console Playground, CLI, third-party UIs) MAY rely on this shape. Future planners that return richer answers (markdown structure, multimodal) will EXTEND the shape with new keys, never break existing ones (forward-compatible additive evolution).

type TaskResumedPayload

type TaskResumedPayload struct {
	events.SafeSealed
	TaskID TaskID
}

TaskResumedPayload reports MarkResumed. SafePayload by construction.

type TaskSpawnedPayload

type TaskSpawnedPayload struct {
	events.SafeSealed
	TaskID         TaskID
	Kind           TaskKind
	ParentTaskID   TaskID // empty when no parent
	Priority       int
	IdempotencyKey string
}

TaskSpawnedPayload reports a successful Spawn / SpawnTool. Carries the assigned TaskID, the kind, and the parent (when any). Identity lives on the Event itself, intentionally not duplicated here.

SafePayload by construction: every field is internal bookkeeping (TaskID is a registry-assigned ULID; Kind is an enum; ParentTaskID is caller-controlled but bounded; IdempotencyKey is caller-controlled and may carry caller-meaningful tokens, but task identifiers are not secret-shaped, the same threat model as session.opened's SessionID field).

type TaskStartedPayload

type TaskStartedPayload struct {
	events.SafeSealed
	TaskID     TaskID
	PriorState TaskStatus
}

TaskStartedPayload reports MarkRunning. Carries the prior status (Pending or Paused) and the TaskID. SafePayload by construction.

type TaskStatus

type TaskStatus string

TaskStatus is the lifecycle state. Transitions are enforced by the driver; invalid transitions return `ErrInvalidTransition`.

const (
	// StatusPending is the initial state assigned by Spawn.
	StatusPending TaskStatus = "pending"
	// StatusRunning is the active-execution state.
	StatusRunning TaskStatus = "running"
	// StatusPaused is the pause-state for retain-turn / HITL flows.
	// Phase 21 layers retain-turn semantics on top; Phase 20 only
	// enforces the FSM transition Running → Paused → Running.
	StatusPaused TaskStatus = "paused"
	// StatusComplete is a terminal state — execution finished
	// successfully and `Task.Result` is populated.
	StatusComplete TaskStatus = "complete"
	// StatusFailed is a terminal state — execution finished
	// unsuccessfully and `Task.Error` is populated.
	StatusFailed TaskStatus = "failed"
	// StatusCancelled is a terminal state — `Cancel` was invoked
	// (possibly via cascade from a parent).
	StatusCancelled TaskStatus = "cancelled"
)

Task statuses.
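
The transitions named in the Mark* and Cancel method comments can be summarised as a lookup table. The sketch below redeclares the status constants locally so it runs on its own; `allowed`, `IsTerminal`, and `CanTransition` are hypothetical helpers, and the real driver reports violations through `ErrInvalidTransition` rather than a bool.

```go
package main

import "fmt"

// TaskStatus and its constants are redeclared locally for a standalone sketch.
type TaskStatus string

const (
	StatusPending   TaskStatus = "pending"
	StatusRunning   TaskStatus = "running"
	StatusPaused    TaskStatus = "paused"
	StatusComplete  TaskStatus = "complete"
	StatusFailed    TaskStatus = "failed"
	StatusCancelled TaskStatus = "cancelled"
)

// allowed maps each non-terminal state to the states it may move to,
// as read from the documented Mark* and Cancel comments.
var allowed = map[TaskStatus][]TaskStatus{
	StatusPending: {StatusRunning, StatusCancelled},
	StatusRunning: {StatusPaused, StatusComplete, StatusFailed, StatusCancelled},
	StatusPaused:  {StatusRunning, StatusCancelled},
}

// IsTerminal reports whether s is one of the three terminal states.
func IsTerminal(s TaskStatus) bool {
	switch s {
	case StatusComplete, StatusFailed, StatusCancelled:
		return true
	}
	return false
}

// CanTransition reports whether from -> to appears in the table.
// Same-state and terminal-to-anything moves are absent, hence invalid.
func CanTransition(from, to TaskStatus) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(StatusPending, StatusRunning))  // true
	fmt.Println(CanTransition(StatusComplete, StatusRunning)) // false
	fmt.Println(CanTransition(StatusRunning, StatusRunning))  // false
}
```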

type TaskSummary

type TaskSummary struct {
	ID        TaskID
	Status    TaskStatus
	Kind      TaskKind
	Priority  int
	UpdatedAt int64 // unix nanoseconds
}

TaskSummary is the projection returned by `List`. Compact by design — full Task records are loaded via `Get` when needed.

Directories

Path Synopsis
Package conformancetest exposes the canonical correctness suite every tasks.TaskRegistry driver must pass.
drivers
inprocess
Package inprocess is Harbor's V1 in-process TaskRegistry driver.
Package protocol implements the two `tasks.*` read methods the Console Tasks page (Phase 73d / D-123) consumes:
