Documentation ¶
Overview ¶
Package tasks owns Harbor's unified task surface: the `TaskRegistry` interface every planner / runtime / steering caller uses to spawn, list, cancel, and prioritise both foreground runs and background tasks under one `TaskID` namespace.
Phase 20 ships the per-task surface (Spawn / Get / List / Cancel / Prioritize / Mark*); Phase 21 layers groups, retain-turn, and patches on top. Bundling the whole TaskService into one phase would slow the wave-end E2E and delay the per-task surface that downstream phases (steering Phase 53, planner Phase 42) want as a stable foundation. The split is recorded in `docs/decisions.md` as D-030.
Lifecycle FSM (enforced at the driver):
PENDING ──Spawn──▶ RUNNING ──MarkComplete──▶ COMPLETE
   │                  │
   │                  ├──MarkPaused──▶ PAUSED ──MarkResumed──▶ RUNNING
   │                  │
   │                  └──MarkFailed──▶ FAILED (terminal)
   │
   └──Cancel──▶ CANCELLED (terminal; valid from any non-terminal state)
Invalid transitions return `ErrInvalidTransition` (wrapped with the from/to states named in the message). Terminal-to-anything is invalid; same-state is invalid (no idempotent self-transitions on the driver — the runtime engine knows whether a transition is real before calling).
Idempotency. `Spawn` keys on `(SessionID, IdempotencyKey)`: same key → returns the existing `TaskHandle` with `Reused: true`; divergent `SpawnRequest` under the same key returns `ErrIdempotencyConflict`. Empty `IdempotencyKey` disables dedup entirely (each Spawn yields a fresh handle, no collisions).
Cancellation propagation. `Cancel` walks the children index per `Task.PropagateOnCancel`:
- `"cascade"` (default): BFS through descendants, transitioning each to `StatusCancelled`, emitting `task.cancelled` per cancel.
- `"isolate"`: only the target task transitions; children stay in their current state.
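The two propagation modes can be sketched as a breadth-first walk over a children index. This is an illustration under assumptions: `node`, the status strings, and the `cancel` helper are invented here, the target task's own `propagate` value decides the mode, and descendants that are already terminal are skipped but still traversed.

```go
package main

// node is a hypothetical, trimmed stand-in for a persisted task.
type node struct {
	status    string
	propagate string // "cascade", "isolate", or "" (treated as cascade)
}

func terminal(status string) bool {
	return status == "complete" || status == "failed" || status == "cancelled"
}

// cancel transitions the target and, in cascade mode, every non-terminal
// descendant in BFS order. It returns the IDs it transitioned, which is
// where a real driver would emit one task.cancelled event each.
func cancel(tasks map[string]*node, children map[string][]string, target string) []string {
	root, ok := tasks[target]
	if !ok || terminal(root.status) {
		return nil
	}
	root.status = "cancelled"
	cancelled := []string{target}
	if root.propagate == "isolate" {
		return cancelled
	}
	queue := append([]string(nil), children[target]...)
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		n, ok := tasks[id]
		if !ok {
			continue
		}
		if !terminal(n.status) {
			n.status = "cancelled"
			cancelled = append(cancelled, id)
		}
		queue = append(queue, children[id]...)
	}
	return cancelled
}
```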
Identity. The triple `(tenant, user, session)` is mandatory at every API boundary (D-001). Empty tenant/user/session in `SpawnRequest.Identity` (or the ctx Identity for Get/Cancel) is rejected with `ErrIdentityRequired`. RunID is task-scoped: the foreground run IS a task; the background task has its own ID.
Persistence. Each lifecycle transition writes through the configured `state.StateStore` as `StateRecord{Kind: "task.lifecycle", Bytes: marshal(task)}`. The wrapper layer is the typed adapter per D-027 — opaque bytes go to the leaf store. Caller-side audit redaction runs against `Description`, `Query`, `Result`, and `Error` BEFORE Save (per D-020).
Bus events. Each lifecycle transition emits one of the registered `task.*` event types on the configured `events.EventBus`; payloads are typed (one struct per event type) and carry the `TaskID`, prior status (where applicable), and new status. Identity is on `Event.Identity` (the existing `Quadruple` field).
SpawnTool. The `SpawnTool` surface lifts from RFC §6.8 verbatim so the FSM models `task.tool` lifecycle today. Actual tool dispatch wiring lands at Phase 26+; in Phase 20 `SpawnTool` returns a `TaskHandle` whose execution body is a no-op stub — the task persists at `StatusPending` and never auto-advances.
Index ¶
- Constants
- Variables
- func Register(name string, factory Factory)
- func RegisteredDrivers() []string
- func ValidateRequest(req SpawnRequest) error
- func ValidateToolRequest(req SpawnToolRequest) error
- func WithRegistry(ctx context.Context, r TaskRegistry) context.Context
- type Dependencies
- type Factory
- type GroupAction
- type GroupCompletion
- type GroupRequest
- type MemberOutcome
- type Patch
- type PatchAction
- type SpawnRequest
- type SpawnToolRequest
- type Task
- type TaskBackgroundAcknowledgedPayload
- type TaskCancelledPayload
- type TaskCompletedPayload
- type TaskError
- type TaskFailedPayload
- type TaskFilter
- type TaskGroup
- type TaskGroupCancelledPayload
- type TaskGroupCreatedPayload
- type TaskGroupID
- type TaskGroupResolvedPayload
- type TaskGroupSealedPayload
- type TaskGroupStatus
- type TaskHandle
- type TaskID
- type TaskKind
- type TaskPatchAppliedPayload
- type TaskPatchRejectedPayload
- type TaskPausedPayload
- type TaskPrioritisedPayload
- type TaskRegistry
- type TaskResult
- type TaskResumedPayload
- type TaskSpawnedPayload
- type TaskStartedPayload
- type TaskStatus
- type TaskSummary
Constants ¶
const (
	// EventTypeTaskSpawned — emitted by Spawn / SpawnTool when a
	// fresh task is persisted (NOT emitted on idempotency-key reuse).
	EventTypeTaskSpawned events.EventType = "task.spawned"
	// EventTypeTaskStarted — emitted by MarkRunning when a task
	// transitions from Pending or Paused → Running.
	EventTypeTaskStarted events.EventType = "task.started"
	// EventTypeTaskPaused — emitted by MarkPaused (Running → Paused).
	// No V1 production caller drives MarkPaused on the live pause path
	// (a paused run stays `running` in the task projection; pause state
	// travels on the `pause.*` events — see the Protocol pause model),
	// so the shipped runtime does not produce this type; the registry
	// driver contract and the conformance suite keep it canonical.
	EventTypeTaskPaused events.EventType = "task.paused"
	// EventTypeTaskResumed — emitted by MarkResumed (Paused → Running).
	// Distinct from task.started so subscribers can tell pause-resume
	// from initial start. Like task.paused, not produced on the V1 live
	// pause path — subscribe to `pause.resumed` for live resume signals.
	EventTypeTaskResumed events.EventType = "task.resumed"
	// EventTypeTaskCompleted — emitted by MarkComplete (Running →
	// Complete; terminal).
	EventTypeTaskCompleted events.EventType = "task.completed"
	// EventTypeTaskFailed — emitted by MarkFailed (Running → Failed;
	// terminal).
	EventTypeTaskFailed events.EventType = "task.failed"
	// EventTypeTaskCancelled — emitted by Cancel for the target task
	// AND each cascaded descendant.
	EventTypeTaskCancelled events.EventType = "task.cancelled"
	// EventTypeTaskPrioritised — emitted by Prioritize when the
	// task's priority value changes.
	EventTypeTaskPrioritised events.EventType = "task.prioritised"
	// EventTypeTaskGroupCreated — emitted by ResolveOrCreateGroup on
	// the first creation (NOT emitted on idempotent-return).
	EventTypeTaskGroupCreated events.EventType = "task.group_created"
	// EventTypeTaskGroupSealed — emitted by SealGroup /
	// ApplyGroup(ActionSeal) when the group transitions Open → Sealed.
	EventTypeTaskGroupSealed events.EventType = "task.group_sealed"
	// EventTypeTaskGroupResolved — emitted when a sealed group's last
	// non-terminal member transitions to terminal AND the group's
	// final status is `GroupCompleted`. Payload is `GroupCompletion`
	// — the SAME canonical shape `WatchGroup` delivers.
	EventTypeTaskGroupResolved events.EventType = "task.group_resolved"
	// EventTypeTaskGroupCancelled — emitted by CancelGroup or by the
	// driver's FailFast cascade when the group transitions to
	// `GroupCancelled`. Payload is `GroupCompletion` with
	// `FinalStatus = GroupCancelled` and `Reason` populated.
	EventTypeTaskGroupCancelled events.EventType = "task.group_cancelled"
	// EventTypeTaskPatchApplied — emitted by
	// ApplyPatch(action=PatchAccept) on the pending → applied
	// transition.
	EventTypeTaskPatchApplied events.EventType = "task.patch_applied"
	// EventTypeTaskPatchRejected — emitted by
	// ApplyPatch(action=PatchReject) on the pending → rejected
	// transition.
	EventTypeTaskPatchRejected events.EventType = "task.patch_rejected"
	// EventTypeTaskBackgroundAcknowledged — emitted once per task by
	// AcknowledgeBackground on the un-ack → ack transition.
	EventTypeTaskBackgroundAcknowledged events.EventType = "task.background_acknowledged"
)
Task lifecycle event types. Each is registered with the events package's exhaustive registry via init() so Publish accepts them without ErrUnknownEventType. Registration follows the Phase 08 sessions precedent — the consumer subsystem registers its own events alongside its typed payloads, keeping the events package free of task-domain knowledge.
Phase 20 ships eight types: seven lifecycle transitions plus `task.prioritised` for caller-driven priority updates. Phase 21 adds seven more group/patch types — `task.group_created`, `task.group_sealed`, `task.group_resolved`, `task.group_cancelled`, `task.patch_applied`, `task.patch_rejected`, `task.background_acknowledged`. The `task.group_resolved` payload IS `GroupCompletion` so subscribers (planner runtime, Console, durable event log, sidecar status emitters) consume one canonical shape regardless of how they're wired.
const (
	// PropagateCascade walks the children index in BFS order,
	// transitioning each descendant to StatusCancelled. The default.
	PropagateCascade = "cascade"
	// PropagateIsolate only transitions the target task; children
	// keep running.
	PropagateIsolate = "isolate"
)
PropagateOnCancel controls how `Cancel` walks descendants.
const DefaultDriver = "inprocess"
DefaultDriver is the Phase 20 production driver name. Phase 87+ post-V1 work may register additional names; Open switches on `cfg.Driver`.
const GroupKind = "task.group"
GroupKind is the StateStore Kind constant for group lifecycle records.
const LifecycleKind = "task.lifecycle"
LifecycleKind is the StateStore Kind constant for task-lifecycle records. Centralised so callers / tests / Phase 60 Protocol mappers reference one symbol.
const PatchKind = "task.patch"
PatchKind is the StateStore Kind constant for patch records. Phase 21 persists pending / applied / rejected patch state through the same typed-wrapper-over-generic adapter the per-task surface uses (D-027).
Variables ¶
var (
	// ErrGroupNotFound — Get / Seal / Cancel / Apply / WatchGroup
	// targeting a TaskGroupID that has no record (or the group is not
	// visible to the ctx identity).
	ErrGroupNotFound = errors.New("tasks: group not found")
	// ErrGroupSealed — mutation attempted on a sealed group (e.g.
	// spawning a new member into a sealed group).
	ErrGroupSealed = errors.New("tasks: group is sealed; cannot mutate membership")
	// ErrGroupNotSealed — `ApplyGroup(ActionResolve)` called on an
	// open (not-yet-sealed) group.
	ErrGroupNotSealed = errors.New("tasks: group must be sealed before resolve")
	// ErrGroupInvalidTransition — FSM transition not in the table
	// (e.g. Sealed → Open; Completed → Anything).
	ErrGroupInvalidTransition = errors.New("tasks: invalid group transition")
	// ErrPatchNotFound — `ApplyPatch` targeting an unknown patch ID.
	ErrPatchNotFound = errors.New("tasks: patch not found")
)
Sentinel errors for the group + patch surface. Callers compare via `errors.Is`.
var (
	// ErrNotFound — Get / Cancel / Prioritize / Mark* targeting a
	// TaskID that has no record (or the record is not visible to the
	// ctx identity).
	ErrNotFound = errors.New("tasks: task not found")
	// ErrInvalidTransition — Mark* called for a transition that is
	// not in the FSM table (e.g. Pending → Complete skipping Running).
	ErrInvalidTransition = errors.New("tasks: invalid lifecycle transition")
	// ErrIdempotencyConflict — Spawn called with a previously-seen
	// IdempotencyKey but a divergent SpawnRequest. Tells the caller
	// a retry policy bug exists upstream.
	ErrIdempotencyConflict = errors.New("tasks: idempotency key reused with divergent SpawnRequest")
	// ErrIdentityRequired — Spawn / Get / List / Cancel / Prioritize
	// / Mark* called with an Identity missing one of (tenant, user,
	// session). Identity is mandatory (D-001).
	ErrIdentityRequired = errors.New("tasks: identity required (tenant/user/session)")
	// ErrUnknownDriver — Open was asked for a driver name no
	// registered factory handles.
	ErrUnknownDriver = errors.New("tasks: unknown driver")
	// ErrRegistryClosed — any operation called after Close.
	ErrRegistryClosed = errors.New("tasks: registry is closed")
	// ErrInvalidRequest — SpawnRequest / SpawnToolRequest fails
	// structural validation (empty Kind, negative priority, unknown
	// PropagateOnCancel value, etc.).
	ErrInvalidRequest = errors.New("tasks: invalid request")
)
Sentinel errors. Callers compare via errors.Is.
var ErrInvalidListFilter = errors.New("tasks: invalid tasks.list wire filter")
ErrInvalidListFilter is returned by ListFilterFromWire when the wire filter names an enum value outside the canonical task-status / task-kind sets. Callers compare with errors.Is.
Functions ¶
func Register ¶
func Register(name string, factory Factory)
Register installs a driver factory under name. Drivers self-register from their package init(); cmd/harbor blank-imports the production driver to trigger registration. Per AGENTS.md §4.4.
Re-registering the same name panics — the registration model is write-once-at-init and a duplicate signals a build mis-configuration.
func RegisteredDrivers ¶
func RegisteredDrivers() []string
RegisteredDrivers returns a sorted list of driver names. Useful for boot-log output and surfacing in error messages.
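A write-once driver registry of this kind is commonly built as a mutex-guarded map. The sketch below is an illustration, not the package's implementation: `factory` is a placeholder signature, and the lowercase `register` / `registeredDrivers` names are local.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// factory is a placeholder for the package's Factory type.
type factory func() error

var (
	mu      sync.RWMutex
	drivers = map[string]factory{}
)

// register installs f under name and panics on a duplicate, since a
// second registration signals a build mis-configuration.
func register(name string, f factory) {
	mu.Lock()
	defer mu.Unlock()
	if _, dup := drivers[name]; dup {
		panic(fmt.Sprintf("tasks: driver %q registered twice", name))
	}
	drivers[name] = f
}

// registeredDrivers returns the registered names in sorted order so
// boot logs and error messages are deterministic.
func registeredDrivers() []string {
	mu.RLock()
	defer mu.RUnlock()
	names := make([]string, 0, len(drivers))
	for name := range drivers {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
```

A driver package would call the registration function from its own `init()`, and the binary would blank-import that package to trigger it.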
func ValidateRequest ¶
func ValidateRequest(req SpawnRequest) error
ValidateRequest checks structural invariants Spawn needs before touching driver storage: identity triple present, Kind known, PropagateOnCancel known (or empty for the default).
func ValidateToolRequest ¶
func ValidateToolRequest(req SpawnToolRequest) error
ValidateToolRequest mirrors ValidateRequest for SpawnToolRequest.
func WithRegistry ¶
func WithRegistry(ctx context.Context, r TaskRegistry) context.Context
WithRegistry attaches r to ctx so downstream handlers can recover it via MustFrom or From.
Types ¶
type Dependencies ¶
type Dependencies struct {
// Store is the StateStore used to persist task lifecycle records
// (D-027 typed-wrapper-over-generic). Required.
Store state.StateStore
// Bus is the EventBus where lifecycle events land. Required.
Bus events.EventBus
// Redactor is the audit redactor applied to Description / Query
// / Result / Error BEFORE Save (D-020). Required; wiring code
// passes the global redactor here.
Redactor audit.Redactor
// Cfg carries Phase 20's TasksConfig (driver name today; Phase
// 21 adds RetainTurnTimeout + ContinuationHopLimit).
Cfg config.TasksConfig
}
Dependencies bundles the wiring inputs every TaskRegistry driver needs. Per the field comments above, `Store`, `Bus`, and `Redactor` are required; `Cfg` is passed through verbatim. Wiring lives in `cmd/harbor` (or test helpers); the registry never reaches into ctx for these.
type Factory ¶
type Factory func(deps Dependencies) (TaskRegistry, error)
Factory builds a TaskRegistry from a Dependencies struct. Drivers expose one Factory each via init() → Register.
type GroupAction ¶
type GroupAction string
GroupAction is the verb a caller passes to `ApplyGroup`.
const (
	// ActionSeal seals an open group, freezing membership. Equivalent
	// to `SealGroup`. Idempotent on an already-sealed group's no-op
	// path; invalid on a terminal group (returns
	// `ErrGroupInvalidTransition`).
	ActionSeal GroupAction = "seal"
	// ActionCancel cancels a non-terminal group (propagate = true).
	// Equivalent to `CancelGroup(reason="action:cancel", propagate=true)`.
	ActionCancel GroupAction = "cancel"
	// ActionResolve marks a sealed group as `Completed` when the
	// caller knows all members are done (used by drivers that defer
	// resolution to an external signal — Phase 21's in-process driver
	// resolves automatically when all members are terminal, so this
	// action is rarely needed but exposed for symmetry with the brief
	// 05 surface).
	ActionResolve GroupAction = "resolve"
)
Group actions.
type GroupCompletion ¶
type GroupCompletion struct {
GroupID TaskGroupID
SessionID identity.Identity
OwnerTaskID TaskID
FinalStatus TaskGroupStatus // GroupCompleted | GroupCancelled
ResolvedAt time.Time
Members []MemberOutcome
Reason string // populated for GroupCancelled (cancel reason); empty otherwise
}
GroupCompletion is the typed wake-up payload delivered by `WatchGroup` (and as the `task.group_resolved` bus-event payload). Carries the group's terminal status, the resolve timestamp, the cancel reason (populated when `FinalStatus == GroupCancelled`), and a `MemberOutcome` per group member.
Discipline: `MemberOutcome.Result` is ref-shaped. Heavy results upstream MUST already have been substituted with `ArtifactRef`s (D-022, D-026); the payload is NOT byte-bound. A `TaskResult.Value` carrying inline bytes above the heavy-output threshold is a leak — the LLM-edge enforcement pass will fail loudly. The conformance suite's `WatchGroup_Push_DeliversCompletionPayload` verifies the ref-shaped contract by stuffing an ArtifactRef-shaped JSON into a member result and asserting it round-trips through the payload unchanged.
Concurrent reuse: multiple `WatchGroup` subscribers on the same group all receive the same payload (D-025). The driver fans out the payload to each subscriber's buffered-size-1 channel without blocking.
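The non-blocking fan-out described above can be sketched with one buffered channel per subscriber and a `select` with a `default` branch. The `group`, `completion`, `watch`, and `resolve` names are local inventions; the real driver's subscription bookkeeping is not shown in the source.

```go
package main

import "sync"

// completion is a trimmed, hypothetical stand-in for GroupCompletion.
type completion struct {
	GroupID     string
	FinalStatus string
}

type group struct {
	mu   sync.Mutex
	subs []chan completion
}

// watch registers a subscriber and returns its size-1 buffered channel.
func (g *group) watch() <-chan completion {
	ch := make(chan completion, 1)
	g.mu.Lock()
	g.subs = append(g.subs, ch)
	g.mu.Unlock()
	return ch
}

// resolve hands the same payload to every subscriber. The buffer lets
// the send succeed with no receiver waiting, and the default branch
// guarantees the driver never blocks on a full channel.
func (g *group) resolve(c completion) {
	g.mu.Lock()
	subs := g.subs
	g.subs = nil
	g.mu.Unlock()
	for _, ch := range subs {
		select {
		case ch <- c:
		default:
		}
	}
}
```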
type GroupRequest ¶
type GroupRequest struct {
ID TaskGroupID
SessionID identity.Identity
OwnerTaskID TaskID
RetainTurn bool
FailFast bool
Description string
}
GroupRequest is the input shape for `ResolveOrCreateGroup`. `ID` is optional — empty → the registry assigns a fresh ULID and returns the new group; non-empty + already-existing → the existing group is returned (idempotency).
Identity is mandatory.
type MemberOutcome ¶
type MemberOutcome struct {
TaskID TaskID
Status TaskStatus
Result *TaskResult
Error *TaskError
}
MemberOutcome is the per-task terminal record carried inside `GroupCompletion`. Either `Result` is populated (when `Status == StatusComplete`) or `Error` is populated (when `Status == StatusFailed`); neither is populated when `Status == StatusCancelled`.
type Patch ¶
type Patch struct {
ID string
SessionID identity.Identity
Status string // "pending" | "applied" | "rejected"
Bytes []byte
CreatedAt time.Time
UpdatedAt time.Time
}
Patch is the persisted patch record. The `Bytes` slot carries the caller-shaped patch payload; the registry does not interpret it. Identity is mandatory.
type PatchAction ¶
type PatchAction string
PatchAction is the verb a caller passes to `ApplyPatch`. Patches transition `pending → applied | rejected` through the registry. The patch payload is opaque bytes — the actual context-patch shape lives at the planner (Phase 42+); Phase 21 stores + retrieves; the planner consumes.
const (
	// PatchAccept transitions a pending patch to `applied`.
	PatchAccept PatchAction = "accept"
	// PatchReject transitions a pending patch to `rejected`.
	PatchReject PatchAction = "reject"
)
Patch actions.
type SpawnRequest ¶
type SpawnRequest struct {
Identity identity.Quadruple
Kind TaskKind
ParentTaskID *TaskID
Description string
Query string
Priority int
IdempotencyKey string
PropagateOnCancel string
NotifyOnComplete bool
GroupID TaskGroupID
// InputArtifactIDs are operator-uploaded multimodal inputs the
// task carries onto its first planner turn (Round-7 F11 / D-166).
// Persisted onto `Task.InputArtifactIDs`; consumed by the run
// loop's first-turn materializer. Empty is the text-only default.
InputArtifactIDs []string
}
SpawnRequest is the input shape for `Spawn`. Identity is mandatory. `IdempotencyKey` is namespaced by `Identity.SessionID`: same key across different sessions creates two distinct tasks.
`PropagateOnCancel` defaults to "cascade" when empty; "isolate" is opt-in for tasks that must survive a parent's cancellation.
`GroupID` (Phase 21, optional) wires the new task into an existing `TaskGroup`. The driver verifies the group is `Open` (sealed or terminal groups reject with `ErrGroupSealed`) and registers the new task as a member. Empty `GroupID` is the default — most foreground turns aren't group members.
type SpawnToolRequest ¶
type SpawnToolRequest struct {
Identity identity.Quadruple
ParentTaskID *TaskID
ToolName string
ToolArgs json.RawMessage
Description string
Priority int
IdempotencyKey string
PropagateOnCancel string
NotifyOnComplete bool
GroupID TaskGroupID
}
SpawnToolRequest is the input shape for `SpawnTool`. The shape lifts from RFC §6.8 verbatim so the FSM models tool-task lifecycle today; actual tool dispatch wiring lands at Phase 26.
Phase 20's `SpawnTool` execution body is a no-op stub: the task is persisted at `StatusPending` and never auto-advances. The runtime engine (Phase 26) drives the lifecycle once dispatch is wired.
`GroupID` (Phase 21, optional) wires the new tool task into an existing `TaskGroup`. See `SpawnRequest.GroupID` for the contract.
type Task ¶
type Task struct {
ID TaskID
Identity identity.Quadruple
Kind TaskKind
Status TaskStatus
Priority int
ParentTaskID *TaskID
Description string
Query string
Result *TaskResult
Error *TaskError
PropagateOnCancel string
NotifyOnComplete bool
IdempotencyKey string
CreatedAt int64 // unix nanoseconds; matches sessions / events convention
UpdatedAt int64 // unix nanoseconds
// ToolCount is the running count of tool dispatches the runtime
// has performed against this task. Advanced exclusively through
// `TaskRegistry.IncrementToolCount` — never set directly by callers
// (Phase 83m item 7). Projected to `prototypes.TaskRow.ToolCount`
// for the Console Tasks page.
ToolCount int
// InputArtifactIDs carry operator-uploaded multimodal inputs the
// run consumes on its first planner turn (Round-7 F11 / D-166).
// The run loop materializes these into `RunContext.InputArtifacts`
// via the per-MIME dispatcher: image bytes inline as
// `ImagePart.DataURL`; everything else stays as an `ArtifactStub`
// the LLM routes to a matching tool through the tool catalog. Empty
// is the common case — text-only turns.
InputArtifactIDs []string
}
Task is the persisted lifecycle record for one task. The Identity quadruple is captured immutably on Spawn; the runtime engine drives state transitions via the registry's Mark* methods.
Group/patch fields are reserved for Phase 21 — the surface is intentionally narrow at Phase 20 so the Phase 21 PR adds those fields against a stable shape.
type TaskBackgroundAcknowledgedPayload ¶
type TaskBackgroundAcknowledgedPayload struct {
events.SafeSealed
TaskID TaskID
}
TaskBackgroundAcknowledgedPayload reports AcknowledgeBackground for a single task (one event per task). SafePayload by construction.
type TaskCancelledPayload ¶
type TaskCancelledPayload struct {
events.SafeSealed
TaskID TaskID
Reason string
Cascaded bool
}
TaskCancelledPayload reports a Cancel transition. `Cascaded` is true when this payload landed because of a parent's cascade-cancel (so subscribers can distinguish operator cancel from cascade). `Reason` is the operator-supplied reason (a short caller-controlled string; callers MUST NOT pass tool args, raw user input, or any secret-shaped material — the same SafePayload contract sessions uses for ClosedReason).
SafePayload by construction.
type TaskCompletedPayload ¶
type TaskCompletedPayload struct {
events.SafeSealed
TaskID TaskID
}
TaskCompletedPayload reports MarkComplete. The result is on the Task record itself; this payload only signals the transition so subscribers do not see an unredacted result by accident. The caller pre-redacts `TaskResult.Value` before MarkComplete (D-020).
SafePayload by construction.
type TaskError ¶
TaskError carries the failure payload. `Code` is a caller-defined short string; `Message` is the human-readable explanation, also pre-redacted by the caller.
type TaskFailedPayload ¶
type TaskFailedPayload struct {
events.SafeSealed
TaskID TaskID
ErrorCode string
}
TaskFailedPayload reports MarkFailed. Carries the error code; the caller-controlled message is on the Task record (already redacted by the caller). SafePayload by construction.
type TaskFilter ¶
type TaskFilter struct {
Status *TaskStatus
Kind *TaskKind
ParentID *TaskID
}
TaskFilter is the read-side filter for `List`.
Nil pointer fields are wildcards: a zero-valued `TaskFilter` returns every task in the session.
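The wildcard rule can be sketched as a predicate that only compares a facet when its pointer is set. The `filter` and `summary` types use plain strings as stand-ins for `TaskStatus`, `TaskKind`, and `TaskID`, and `matches` is a hypothetical helper rather than part of the package.

```go
package main

// filter mirrors the shape of TaskFilter with string stand-ins.
type filter struct {
	Status   *string
	Kind     *string
	ParentID *string
}

// summary is a trimmed, hypothetical stand-in for TaskSummary.
type summary struct {
	ID       string
	Status   string
	Kind     string
	ParentID *string
}

// matches treats every nil filter field as a wildcard, so the zero
// filter accepts every task.
func matches(f filter, t summary) bool {
	if f.Status != nil && *f.Status != t.Status {
		return false
	}
	if f.Kind != nil && *f.Kind != t.Kind {
		return false
	}
	if f.ParentID != nil && (t.ParentID == nil || *f.ParentID != *t.ParentID) {
		return false
	}
	return true
}

// ptr is a small helper for building filters from literals.
func ptr(s string) *string { return &s }
```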
func ListFilterFromWire ¶
func ListFilterFromWire(wire *prototypes.TaskFilter) (TaskFilter, error)
ListFilterFromWire translates the Protocol-layer `types.TaskFilter` into the runtime-internal `tasks.TaskFilter` the `TaskRegistry.List` method consumes. It narrows the registry filter ONLY on the facets the single-valued registry `TaskFilter` can express (a one-element status / kind set, and the parent-task pointer); every richer facet is left to the Protocol-layer `filterMatches` pass.
The function is pure: it allocates and returns a fresh `tasks.TaskFilter`, reads only its argument, and holds no state. It is therefore trivially safe for concurrent use by N goroutines (D-025) — there is no reusable artifact to leak.
type TaskGroup ¶
type TaskGroup struct {
ID TaskGroupID
SessionID identity.Identity
OwnerTaskID TaskID
Status TaskGroupStatus
RetainTurn bool
FailFast bool
Members []TaskID
Description string
CreatedAt time.Time
UpdatedAt time.Time
ResolvedAt *time.Time
}
TaskGroup is the persisted group record. The Identity is captured from the owning session at create time; cross-session group membership is forbidden (V1; nesting / cross-session lands post-V1 if a planner concrete genuinely needs it — brief 05).
`Members` is the in-order list of member task IDs assigned during the `Open` phase via the spawn-with-GroupID seam (Phase 26+ wires `SpawnTool`'s group hookup). Phase 21 ships the group surface; the member-assignment wiring is exercised by the conformance suite directly through the new `AddMember` driver helper.
`RetainTurn` is the foreground-blocking flag — when true, the owning session blocks foreground-turn dispatch until the group reaches a terminal state.
`FailFast` cancels remaining members when the first member fails. `ResolvedAt` is non-nil only on terminal status.
type TaskGroupCancelledPayload ¶
type TaskGroupCancelledPayload struct {
events.SafeSealed
Completion GroupCompletion
}
TaskGroupCancelledPayload reports a CancelGroup transition (or a FailFast cascade-cancel). Same `GroupCompletion` shape as resolved, with `FinalStatus = GroupCancelled` and `Reason` populated.
SafePayload by construction.
type TaskGroupCreatedPayload ¶
type TaskGroupCreatedPayload struct {
events.SafeSealed
GroupID TaskGroupID
OwnerTaskID TaskID
RetainTurn bool
FailFast bool
Description string
}
TaskGroupCreatedPayload reports a successful ResolveOrCreateGroup on the first-creation path. Carries the assigned group ID, owner task, retain-turn / fail-fast flags, and the description. SafePayload by construction — the description is a caller-controlled short string with the same SafePayload contract as session.opened's `ClosedReason`.
type TaskGroupID ¶
type TaskGroupID string
TaskGroupID is the ULID-shaped identifier for a `TaskGroup`. The caller MAY pre-assign in `GroupRequest.ID` (idempotency); empty → the registry assigns a fresh ULID.
type TaskGroupResolvedPayload ¶
type TaskGroupResolvedPayload struct {
events.SafeSealed
Completion GroupCompletion
}
TaskGroupResolvedPayload reports a sealed group's terminal transition to `GroupCompleted`. The payload doubles as the `WatchGroup` wake-up payload — same canonical `GroupCompletion` shape so subscribers (Console, planner, sidecar status emitters) consume one shape regardless of how they're wired.
SafePayload by construction. `MemberOutcome.Result` is ref-shaped (D-022, D-026); heavy bytes should already be ArtifactRefs upstream.
type TaskGroupSealedPayload ¶
type TaskGroupSealedPayload struct {
events.SafeSealed
GroupID TaskGroupID
Members []TaskID
SealedAt int64 // unix nanoseconds
}
TaskGroupSealedPayload reports a SealGroup transition. SafePayload by construction.
type TaskGroupStatus ¶
type TaskGroupStatus string
TaskGroupStatus is the group lifecycle state. The FSM lives at the driver:
Open ──Seal──▶ Sealed ──(all members terminal)──▶ Completed
  │              │
  │              └──Cancel──▶ Cancelled
  └──Cancel──▶ Cancelled (valid from Open too)
`Completed` and `Cancelled` are terminal. Invalid transitions return `ErrGroupInvalidTransition`.
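A rough sketch of how a driver might derive the next group status from member states. It rests on several assumptions not confirmed by the source: the status strings, that a `FailFast` failure cancels an open group as well as a sealed one, and that a sealed group with no members counts as complete. `resolveGroup` is a hypothetical helper.

```go
package main

// resolveGroup returns the status a group should hold given its current
// status, its FailFast flag, and the statuses of its members.
func resolveGroup(status string, failFast bool, members []string) string {
	// Terminal groups never move again.
	if status == "completed" || status == "cancelled" {
		return status
	}
	allTerminal := true
	for _, m := range members {
		switch m {
		case "failed":
			if failFast {
				return "cancelled"
			}
		case "complete", "cancelled":
			// Terminal member, nothing to do.
		default:
			allTerminal = false
		}
	}
	// Only a sealed group can resolve, because an open group may still
	// gain members.
	if status == "sealed" && allTerminal {
		return "completed"
	}
	return status
}
```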
const (
	// GroupOpen is the initial state — members may be added.
	GroupOpen TaskGroupStatus = "open"
	// GroupSealed freezes membership; the group still has non-terminal
	// members. Sealed groups CANNOT accept new members
	// (`SpawnTool` / `Spawn` carrying a `GroupID` for a sealed group
	// returns `ErrGroupSealed`).
	GroupSealed TaskGroupStatus = "sealed"
	// GroupCompleted is the terminal-success state — the group was
	// sealed AND every member reached terminal state without
	// triggering a `FailFast` cancellation.
	GroupCompleted TaskGroupStatus = "completed"
	// GroupCancelled is the terminal-failure state — the group was
	// cancelled by `CancelGroup`, or `FailFast` was true and a member
	// transitioned to `StatusFailed`.
	GroupCancelled TaskGroupStatus = "cancelled"
)
Group statuses.
type TaskHandle ¶
TaskHandle is the return shape of `Spawn` / `SpawnTool`. `Reused` is true when an idempotency-key match returned an existing handle.
type TaskID ¶
type TaskID string
TaskID is the unified identifier covering both foreground runs and background tasks (brief 05 §1). ULID-shaped at construction time; the registry assigns the value, callers do not.
type TaskKind ¶
type TaskKind string
TaskKind distinguishes a foreground task (a run inside a session's primary turn) from a background task (a spawned-without-blocking task). Both share the same TaskID namespace; this field is the discriminator.
const (
	// KindForeground is the kind for a run inside a session's primary
	// turn — what the predecessor called a "trace_id" lives here under
	// the unified TaskID.
	KindForeground TaskKind = "foreground"
	// KindBackground is the kind for a task spawned without blocking
	// the parent run.
	KindBackground TaskKind = "background"
)
Task kinds.
type TaskPatchAppliedPayload ¶
type TaskPatchAppliedPayload struct {
events.SafeSealed
PatchID string
}
TaskPatchAppliedPayload reports ApplyPatch(action=PatchAccept). Carries the patch ID; the patch payload bytes are on the Patch record (already through the audit redactor; we do NOT inline the bytes here). SafePayload by construction.
type TaskPatchRejectedPayload ¶
type TaskPatchRejectedPayload struct {
events.SafeSealed
PatchID string
}
TaskPatchRejectedPayload reports ApplyPatch(action=PatchReject). SafePayload by construction.
type TaskPausedPayload ¶
type TaskPausedPayload struct {
events.SafeSealed
TaskID TaskID
}
TaskPausedPayload reports MarkPaused. SafePayload by construction.
type TaskPrioritisedPayload ¶
type TaskPrioritisedPayload struct {
events.SafeSealed
TaskID TaskID
PriorPriority int
NewPriority int
}
TaskPrioritisedPayload reports Prioritize. Carries the new and prior priority values. SafePayload by construction.
type TaskRegistry ¶
type TaskRegistry interface {
// Spawn creates a new task or returns the existing handle when an
// idempotency-key match is found. Returns `ErrIdentityRequired`
// when the request's identity triple is incomplete.
Spawn(ctx context.Context, req SpawnRequest) (TaskHandle, error)
// SpawnTool creates a task representing a tool invocation. Phase
// 20 ships the surface; tool dispatch wiring lands at Phase 26+
// — the persisted task stays at `StatusPending` until then.
SpawnTool(ctx context.Context, req SpawnToolRequest) (TaskHandle, error)
// Get loads the task with `id`. Returns `ErrNotFound` (wrapped)
// when no record exists or the task is not visible to the ctx
// identity (cross-tenant / cross-session reads are rejected).
Get(ctx context.Context, id TaskID) (*Task, error)
// List returns task summaries for the given session matching `f`.
// Empty pointer fields in `f` are wildcards.
List(ctx context.Context, sessionID identity.Identity, f TaskFilter) ([]TaskSummary, error)
// Cancel transitions the task to `StatusCancelled`. The descendant
// walk depends on the task's `PropagateOnCancel`:
//
// - "cascade" (default): BFS through children, each emitting
// `task.cancelled`.
// - "isolate": only the target transitions.
//
// Returns (true, nil) when the task transitioned; (false, nil)
// when the task was already terminal (Cancel is idempotent on
// already-terminal states). Returns `ErrNotFound` when the task
// does not exist.
Cancel(ctx context.Context, id TaskID, reason string) (bool, error)
// Prioritize updates the task's `Priority`. Phase 20 stores the
// value but does not preempt or reorder execution — scheduling is
// the runtime engine's concern. Emits `task.prioritised`.
//
// Returns (true, nil) when the priority changed; (false, nil)
// when the value matched (no-op write).
Prioritize(ctx context.Context, id TaskID, priority int) (bool, error)
// MarkRunning transitions Pending or Paused → Running. Invalid
// transitions return `ErrInvalidTransition`.
MarkRunning(ctx context.Context, id TaskID) error
// MarkPaused transitions Running → Paused.
MarkPaused(ctx context.Context, id TaskID) error
// MarkResumed transitions Paused → Running. Distinct method (vs
// MarkRunning) so the bus event can be `task.resumed` rather than
// `task.started`.
MarkResumed(ctx context.Context, id TaskID) error
// MarkComplete transitions Running → Complete. Persists `result`
// on the Task record; emits `task.completed`.
MarkComplete(ctx context.Context, id TaskID, result TaskResult) error
// MarkFailed transitions Running → Failed. Persists `err` on the
// Task record; emits `task.failed`.
MarkFailed(ctx context.Context, id TaskID, err TaskError) error
// IncrementToolCount atomically increments `Task.ToolCount` by 1
// and persists the updated record (Phase 83m item 7). NOT
// idempotent — every call increments. The new value is reflected
// on the next `Get` / `List` projection (`prototypes.TaskRow.ToolCount`).
//
// Returns `ErrNotFound` when the task does not exist or is not
// visible to the ctx identity; `ErrRegistryClosed` after Close.
// Does NOT change the task's FSM status and runs against tasks in
// any state. Terminal tasks (Complete / Failed / Cancelled) still
// accept increments (the runloop's late-arriving tool dispatches
// against a cancelled run can still be counted); the storage write
// is unconditional.
//
// The runloop calls this from its CallTool dispatch path once the
// ToolExecutor returns without error — that is the only documented
// producer in V1.
IncrementToolCount(ctx context.Context, id TaskID) error
// ResolveOrCreateGroup is the idempotent group constructor. Empty
// `GroupRequest.ID` → registry assigns a fresh ULID. Non-empty +
// already-existing → the existing group is returned unchanged.
// Identity is mandatory; cross-session reuse of an ID is rejected
// with `ErrGroupNotFound` (existence-without-access).
//
// Emits `task.group_created` on the first creation; the no-op
// idempotent return does NOT re-emit.
ResolveOrCreateGroup(ctx context.Context, req GroupRequest) (*TaskGroup, error)
// SealGroup transitions an open group to `GroupSealed`, freezing
// membership. A sealed group may still have non-terminal members;
// the driver resolves the group automatically when the last member
// transitions to terminal.
//
// Invalid transitions (already sealed; terminal) return
// `ErrGroupInvalidTransition`. Emits `task.group_sealed`.
SealGroup(ctx context.Context, id TaskGroupID) error
// CancelGroup transitions a non-terminal group to `GroupCancelled`
// and (when `propagate=true`) cancels every non-terminal member
// task. The `reason` is a short caller-controlled string (same
// `SafePayload` contract as `Cancel`'s reason).
//
// Emits `task.group_cancelled` carrying the canonical
// `GroupCompletion` payload (so `WatchGroup` subscribers receive
// the cancel-with-reason as a single typed delivery).
CancelGroup(ctx context.Context, id TaskGroupID, reason string, propagate bool) error
// ApplyGroup is the action-verb wrapper over `SealGroup` /
// `CancelGroup` / explicit resolve. Convenience for callers that
// dispatch by enum.
//
// - ActionSeal → SealGroup
// - ActionCancel → CancelGroup(reason="action:cancel", propagate=true)
// - ActionResolve → mark sealed → completed (errors with
// `ErrGroupNotSealed` on an open group)
ApplyGroup(ctx context.Context, id TaskGroupID, action GroupAction) error
// ListGroups returns the groups owned by `sessionID` matching the
// optional `status` filter (nil = wildcard). Empty list + nil
// error is the no-groups case; missing-identity returns
// `ErrIdentityRequired`.
ListGroups(ctx context.Context, sessionID identity.Identity, status *TaskGroupStatus) ([]TaskGroup, error)
// ApplyPatch transitions a pending patch through
// `pending → applied | rejected`. Returns `(true, nil)` when the
// transition occurred; `(false, nil)` when the patch was already
// in the target terminal state (idempotent). Returns
// `ErrPatchNotFound` on a missing patch ID.
//
// Patches are persisted through StateStore under `Kind =
// "task.patch"`. The patch payload is opaque bytes (D-027); the
// actual context-patch shape lives at the planner (Phase 42+).
//
// Emits `task.patch_applied` or `task.patch_rejected` on a real
// transition; no re-emit on the no-op path.
ApplyPatch(ctx context.Context, sessionID identity.Identity, patchID string, action PatchAction) (bool, error)
// AcknowledgeBackground marks completed background tasks as
// user-acknowledged. Returns the count of tasks that transitioned
// from un-acknowledged → acknowledged (idempotent on a re-ack).
//
// Emits one `task.background_acknowledged` event per task on the
// real-transition path. Unknown task IDs are silently skipped (no
// error; the count reflects only the real ack transitions).
AcknowledgeBackground(ctx context.Context, sessionID identity.Identity, ids []TaskID) (int, error)
// RegisterRetainTurnWaiter returns a channel that closes when the
// session's earliest-active retain-turn group resolves, and a
// cancel func that unsubscribes. The runtime engine consumes the
// closed channel as the "all retain-turn groups have resolved"
// signal so foreground-turn dispatch can resume.
//
// Buffered size 1 — the resolve path delivers the resolved
// group's ID without blocking even if the consumer is slow.
// Channel close is the termination signal; the optional payload
// is the ID of the group whose terminal transition triggered the
// wake.
//
// Implementations are required to close the channel exactly once.
RegisterRetainTurnWaiter(sessionID identity.Identity) (<-chan TaskGroupID, func())
// WatchGroup is the non-retain-turn dual of
// `RegisterRetainTurnWaiter`: it does NOT block any foreground
// turn — the planner is free to proceed while the group runs in
// the background. When the group reaches a terminal state, the
// runtime delivers a typed `GroupCompletion` payload on the
// returned channel and closes it.
//
// Callers typically use this as a "wake the planner" signal so
// background results integrate back into the conversation; see
// the "Wake policy modes" godoc in `groups.go` for the three
// patterns (push, poll, hybrid) the planner runtime can implement
// against this single mechanism.
//
// Returns `ErrGroupNotFound` when the group is unknown at
// registration time (e.g. resolved + GC'd). For a
// resolved-but-still-tracked group, the implementation returns a
// channel that is *already* primed with the cached
// `GroupCompletion` (so late subscribers don't deadlock).
//
// The cancel func unsubscribes; calling it after a delivery is a
// no-op. The channel is closed exactly once — either by the
// resolve path (with a delivery) or by the cancel path (without
// a delivery).
//
// Concurrent reuse: multiple subscribers on the same group all
// receive the same payload (D-025).
WatchGroup(sessionID identity.Identity, groupID TaskGroupID) (<-chan GroupCompletion, func(), error)
// Close releases registry resources. Subsequent operations return
// `ErrRegistryClosed`. Idempotent.
Close(ctx context.Context) error
}
TaskRegistry is the orchestration surface for the task subsystem.
Implementations MUST be safe for concurrent use by N goroutines against a single shared instance (D-025). Mutable state must be guarded; per-call state lives in `ctx`, never on the driver.
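The `WatchGroup` contract described in the interface comments (one delivery per subscriber, close exactly once, late subscribers primed with the cached completion) can be illustrated with a small mutex-guarded sketch. Everything here is a local stand-in: `group`, `completion`, `watch`, and `resolve` are hypothetical names, and the real driver may be structured quite differently.

```go
package main

import (
	"fmt"
	"sync"
)

// completion stands in for GroupCompletion; its fields are illustrative.
type completion struct{ GroupID, Status string }

// group tracks the watchers of one task group. Hypothetical type.
type group struct {
	mu       sync.Mutex
	done     *completion // cached once the group resolves
	watchers map[int]chan completion
	next     int
}

func newGroup() *group { return &group{watchers: map[int]chan completion{}} }

// watch returns a channel of capacity 1 plus a cancel func. A subscriber
// that arrives after resolve gets a channel already holding the cached
// completion, so it never blocks forever.
func (g *group) watch() (<-chan completion, func()) {
	g.mu.Lock()
	defer g.mu.Unlock()
	ch := make(chan completion, 1)
	if g.done != nil {
		ch <- *g.done
		close(ch)
		return ch, func() {}
	}
	id := g.next
	g.next++
	g.watchers[id] = ch
	cancel := func() {
		g.mu.Lock()
		defer g.mu.Unlock()
		// Only close if resolve has not already removed and closed it.
		if w, ok := g.watchers[id]; ok {
			delete(g.watchers, id)
			close(w)
		}
	}
	return ch, cancel
}

// resolve delivers c to every current watcher and closes each channel.
// The buffer of 1 means the send cannot block on a slow consumer.
func (g *group) resolve(c completion) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.done != nil {
		return
	}
	g.done = &c
	for id, w := range g.watchers {
		w <- c
		close(w)
		delete(g.watchers, id)
	}
}

func main() {
	g := newGroup()
	early, _ := g.watch()
	dropped, cancel := g.watch()
	cancel() // closed without a delivery

	g.resolve(completion{GroupID: "g1", Status: "completed"})
	late, lateCancel := g.watch()
	lateCancel() // no-op after a delivery

	fmt.Println(<-early)
	_, ok := <-dropped
	fmt.Println(ok)
	fmt.Println(<-late)
}
```

Removing a watcher from the map in the same critical section that closes its channel is what keeps the close to exactly one, whichever of the resolve path or the cancel path runs first.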
The Mark* methods are the lifecycle drive-points called by the runtime engine; Cancel / Prioritize are caller-initiated (planner, steering, Console).
func From ¶
func From(ctx context.Context) (TaskRegistry, bool)
From returns the TaskRegistry in ctx and a presence bool. Use when absence is recoverable.
func MustFrom ¶
func MustFrom(ctx context.Context) TaskRegistry
MustFrom returns the TaskRegistry in ctx; panics with ErrRegistryClosed (used as the sentinel for "no registry configured") when none is present. Use in handler/runtime paths where a registry is mandatory.
func Open ¶
func Open(_ context.Context, deps Dependencies) (TaskRegistry, error)
Open returns the TaskRegistry built by the factory whose name matches deps.Cfg.Driver (defaults to DefaultDriver when empty).
func OpenDriver ¶
func OpenDriver(name string, deps Dependencies) (TaskRegistry, error)
OpenDriver opens a specific driver by name; useful for tests that want to exercise the registry against a non-default driver.
type TaskResult ¶
type TaskResult struct {
Value json.RawMessage
}
TaskResult carries the successful-completion payload. `Value` is pre-redacted by the caller (D-020); the registry stores it verbatim.
Phase 106 (V1.2) pins the answer-envelope contract: when the run-loop driver (cmd/harbor/cmd_dev_runloop.go::handleSpawn) produces TaskResult from a planner.Finish, `Value` is the JSON encoding of:
{
"answer": string, // the LLM's natural-language answer
"finish_reason": string, // planner.FinishReason as string
"tool_calls_seen": int // len(traj.Steps) at finish
}
Consumers (Console Playground, CLI, third-party UIs) MAY rely on this shape. Future planners that return richer answers (markdown structure, multimodal) will EXTEND the shape with new keys, never break existing ones (forward-compatible additive evolution).
type TaskResumedPayload ¶
type TaskResumedPayload struct {
events.SafeSealed
TaskID TaskID
}
TaskResumedPayload reports MarkResumed. SafePayload by construction.
type TaskSpawnedPayload ¶
type TaskSpawnedPayload struct {
events.SafeSealed
TaskID TaskID
Kind TaskKind
ParentTaskID TaskID // empty when no parent
Priority int
IdempotencyKey string
}
TaskSpawnedPayload reports a successful Spawn / SpawnTool. Carries the assigned TaskID, the kind, and the parent (when any). Identity lives on the Event itself, intentionally not duplicated here.
SafePayload by construction: every field is internal bookkeeping (TaskID is a registry-assigned ULID; Kind is an enum; ParentTaskID is caller-controlled but bounded; IdempotencyKey is caller-controlled and may carry caller-meaningful tokens, but task identifiers are not secret-shaped, which is the same threat model as session.opened's SessionID field).
type TaskStartedPayload ¶
type TaskStartedPayload struct {
events.SafeSealed
TaskID TaskID
PriorState TaskStatus
}
TaskStartedPayload reports MarkRunning. Carries the prior status (Pending or Paused) and the TaskID. SafePayload by construction.
type TaskStatus ¶
type TaskStatus string
TaskStatus is the lifecycle state. Transitions are enforced by the driver; invalid transitions return `ErrInvalidTransition`.
const (
	// StatusPending is the initial state assigned by Spawn.
	StatusPending TaskStatus = "pending"
	// StatusRunning is the active-execution state.
	StatusRunning TaskStatus = "running"
	// StatusPaused is the pause-state for retain-turn / HITL flows.
	// Phase 21 layers retain-turn semantics on top; Phase 20 only
	// enforces the FSM transition Running → Paused → Running.
	StatusPaused TaskStatus = "paused"
	// StatusComplete is a terminal state: execution finished
	// successfully and `Task.Result` is populated.
	StatusComplete TaskStatus = "complete"
	// StatusFailed is a terminal state: execution finished
	// unsuccessfully and `Task.Error` is populated.
	StatusFailed TaskStatus = "failed"
	// StatusCancelled is a terminal state: `Cancel` was invoked
	// (possibly via cascade from a parent).
	StatusCancelled TaskStatus = "cancelled"
)
Task statuses.
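The lifecycle rules in the overview (terminal states accept no transitions, same-state transitions are invalid, `Cancel` is valid from any non-terminal state) can be expressed as a transition table. This is a sketch of the rule set, not the driver's implementation; `allowed`, `isTerminal`, and `canTransition` are hypothetical helpers, and the status type is redeclared locally so the example stands alone.

```go
package main

import "fmt"

type TaskStatus string

const (
	StatusPending   TaskStatus = "pending"
	StatusRunning   TaskStatus = "running"
	StatusPaused    TaskStatus = "paused"
	StatusComplete  TaskStatus = "complete"
	StatusFailed    TaskStatus = "failed"
	StatusCancelled TaskStatus = "cancelled"
)

// allowed lists the legal next states per source state. Terminal states
// have no entry, so every transition out of them is rejected.
var allowed = map[TaskStatus][]TaskStatus{
	StatusPending: {StatusRunning, StatusCancelled},
	StatusRunning: {StatusPaused, StatusComplete, StatusFailed, StatusCancelled},
	StatusPaused:  {StatusRunning, StatusCancelled},
}

// isTerminal reports whether s is one of the three terminal states.
func isTerminal(s TaskStatus) bool {
	switch s {
	case StatusComplete, StatusFailed, StatusCancelled:
		return true
	}
	return false
}

// canTransition reports whether from → to is a legal move. A driver
// would return ErrInvalidTransition when this is false.
func canTransition(from, to TaskStatus) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(StatusPending, StatusRunning))  // true
	fmt.Println(canTransition(StatusRunning, StatusRunning))  // false: no self-transitions
	fmt.Println(canTransition(StatusComplete, StatusRunning)) // false: terminal
	fmt.Println(isTerminal(StatusCancelled))                  // true
}
```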
type TaskSummary ¶
type TaskSummary struct {
ID TaskID
Status TaskStatus
Kind TaskKind
Priority int
UpdatedAt int64 // unix nanoseconds
}
TaskSummary is the projection returned by `List`. Compact by design — full Task records are loaded via `Get` when needed.
Directories
¶
| Path | Synopsis |
|---|---|
| conformancetest | Package conformancetest exposes the canonical correctness suite every tasks.TaskRegistry driver must pass. |
| drivers | |
| drivers/inprocess | Package inprocess is Harbor's V1 in-process TaskRegistry driver. |
| protocol | Package protocol implements the two `tasks.*` read methods the Console Tasks page (Phase 73d / D-123) consumes: |