Documentation ¶
Overview ¶
Package tasks provides hierarchical task trees with progress tracking, output streaming, and both request-scoped and shared (broadcast) task systems for the github.com/marrasen/aprot framework.
Tasks let a server report structured, real-time progress to connected clients while a handler executes long-running work. A task is a tree of nodes, each with a title, status (Created → Running → Completed/Failed), optional numeric progress, typed metadata, and streamed text output.
There are two flavours of task:
- Request-scoped tasks are tied to a single RPC call and visible only to the client that made the request.
- Shared tasks are broadcast to every connected client and survive after the originating handler returns.
Enabling the task system ¶
Register the task system during server setup:
tasks.Enable(registry)                 // no typed metadata
tasks.EnableWithMeta[MyMeta](registry) // with typed metadata
This registers the CancelTask handler, push event types, and the middleware that wires task delivery into every request context.
Request-scoped tasks ¶
Create a task inside a handler. It auto-completes when the handler returns nil, or auto-fails when the handler returns an error:
func (h *Handler) Import(ctx context.Context, req ImportReq) (*tasks.TaskRef, error) {
	ctx, task := tasks.StartTask[MyMeta](ctx, "Importing data")
	task.SetMeta(MyMeta{Filename: req.File})
	// Nest child work with SubTask:
	err := tasks.SubTask(ctx, "Validating", func(ctx context.Context) error {
		tasks.TaskProgress(ctx, 0, 100)
		// ...
		tasks.TaskProgress(ctx, 100, 100)
		return nil
	})
	return nil, err // task auto-completes / auto-fails
}
The client receives RequestTaskTreeEvent, RequestTaskOutputEvent, and RequestTaskProgressEvent push events scoped to its request.
Shared (broadcast) tasks ¶
Pass the Shared option to make the task visible to all clients:
ctx, task := tasks.StartTask[MyMeta](ctx, "Building index", tasks.Shared())
All clients receive TaskStateEvent and TaskUpdateEvent push events. Each client sees an IsOwner flag indicating whether it started the task.
To let a shared task outlive the client connection, use context.WithoutCancel:
ctx, task := tasks.StartTask[MyMeta](
	context.WithoutCancel(ctx), "Background job", tasks.Shared(),
)
SharedSubTask bridge ¶
SharedSubTask creates a shared task that also routes nested SubTask, Output, and TaskProgress calls through the shared delivery system. If no connection or task manager is available, it falls back to a regular SubTask:
err := tasks.SharedSubTask(ctx, "Sync accounts", func(ctx context.Context) error {
	return tasks.SubTask(ctx, "Fetching", func(ctx context.Context) error {
		tasks.Output(ctx, "fetched 42 records")
		return nil
	})
})
Progress reporting ¶
TaskProgress sets absolute progress on the current task node. StepTaskProgress increments the current value by a delta, which is convenient inside loops:
for _, item := range items {
	process(item)
	tasks.StepTaskProgress(ctx, 1)
}
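The difference between absolute and incremental progress can be modeled with a small stand-in type. In this sketch progressNode, Set, and Step are hypothetical names and are not part of the tasks package. Whether the real StepTaskProgress clamps at the total is not stated here, so the sketch does not clamp.

```go
package main

import "fmt"

// progressNode is a hypothetical stand-in for a task node's progress counters.
type progressNode struct {
	current, total int
}

// Set mirrors the absolute style of TaskProgress(ctx, current, total).
func (n *progressNode) Set(current, total int) {
	n.current, n.total = current, total
}

// Step mirrors the incremental style of StepTaskProgress(ctx, step).
func (n *progressNode) Step(step int) {
	n.current += step
}

func main() {
	items := []string{"a", "b", "c"}

	var n progressNode
	n.Set(0, len(items)) // declare the total once, up front
	for range items {
		n.Step(1) // one unit of work finished per iteration
	}
	fmt.Printf("%d/%d\n", n.current, n.total) // prints 3/3
}
```

Setting the total once and stepping inside the loop keeps the loop body free of index arithmetic.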
Output streaming ¶
Output sends a text message attached to the nearest task node. OutputWriter and WriterProgress each return an io.WriteCloser backed by a new child task node: OutputWriter sends each Write as output attached to that node, while WriterProgress tracks the bytes written as progress (current/total):
w := tasks.OutputWriter(ctx, "Build log")
cmd.Stdout = w
cmd.Run()
w.Close()
Task and TaskSub types ¶
StartTask returns a *Task handle for the root node. Use it to set metadata, report progress, stream output, create sub-tasks, or manually close/fail the task. Task.SubTask returns a *TaskSub handle for child nodes with the same capabilities.
TypeScript client ¶
The generated TypeScript client provides hooks and helpers:
- React: useSharedTasks, useMyTasks, useSharedTask(id), useTaskOutput(taskId), cancelSharedTask(client, taskId)
- Vanilla: TaskStateEvent, TaskUpdateEvent push event types
- Request-scoped: onTaskProgress and onOutput callbacks in RequestOptions
Cancellation ¶
Clients can cancel shared tasks via the generated CancelTask handler. On the server side, use CancelSharedTask directly.
Index ¶
- func CancelSharedTask(ctx context.Context, taskID string) error
- func Enable(r *aprot.Registry)
- func EnableWithMeta[M any](r *aprot.Registry)
- func Output(ctx context.Context, msg string)
- func OutputWriter(ctx context.Context, title string) io.WriteCloser
- func SharedSubTask(ctx context.Context, title string, fn func(ctx context.Context) error) error
- func StepTaskProgress(ctx context.Context, step int)
- func SubTask(ctx context.Context, title string, fn func(ctx context.Context) error) error
- func TaskProgress(ctx context.Context, current, total int)
- func WriterProgress(ctx context.Context, title string, size int) io.WriteCloser
- type RequestTaskOutputEvent
- type RequestTaskProgressEvent
- type RequestTaskTreeEvent
- type SharedTaskState
- type Task
- func (t *Task[M]) Close()
- func (t *Task[M]) Context() context.Context
- func (t *Task[M]) Err(err error)
- func (t *Task[M]) Fail(message string)
- func (t *Task[M]) ID() string
- func (t *Task[M]) IsShared() bool
- func (t *Task[M]) Output(msg string)
- func (t *Task[M]) Progress(current, total int)
- func (t *Task[M]) SetMeta(v M)
- func (t *Task[M]) SubTask(title string) *TaskSub[M]
- func (t *Task[M]) WithContext(ctx context.Context) context.Context
- type TaskNode
- type TaskNodeStatus
- type TaskOption
- type TaskRef
- type TaskStateEvent
- type TaskSub
- type TaskUpdateEvent
Functions ¶
func CancelSharedTask ¶
CancelSharedTask cancels a shared task by ID.
func EnableWithMeta ¶
EnableWithMeta registers the shared task system with typed metadata.
func OutputWriter ¶
func OutputWriter(ctx context.Context, title string) io.WriteCloser
OutputWriter returns an io.WriteCloser that creates a child task node and sends each Write as output attached to that node.
func SharedSubTask ¶
SharedSubTask bridges the request-scoped and shared task systems.
func StepTaskProgress ¶
StepTaskProgress increments the current progress on the current task node by step.
func SubTask ¶
SubTask creates a child task under the current task node, runs fn, and marks the sub-task as completed (or failed if fn returns an error).
func TaskProgress ¶
TaskProgress sets the progress (current/total) on the current task node.
func WriterProgress ¶
WriterProgress returns an io.WriteCloser that creates a child task node tracking bytes written as progress (current/total).
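A byte-counting io.WriteCloser of the kind described above might look like the following minimal sketch. The progressWriter type and its report callback are hypothetical; the real WriterProgress attaches to a child task node instead of a callback, and what its Close does is an assumption here.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// progressWriter is a hypothetical io.WriteCloser that counts the bytes
// written to it and reports them as current/total through a callback.
type progressWriter struct {
	written, size int
	report        func(current, total int)
}

// Compile-time check that progressWriter satisfies io.WriteCloser.
var _ io.WriteCloser = (*progressWriter)(nil)

func (w *progressWriter) Write(p []byte) (int, error) {
	w.written += len(p)
	w.report(w.written, w.size)
	return len(p), nil
}

// Close is a no-op in this sketch; a real implementation would presumably
// mark the child task node as finished.
func (w *progressWriter) Close() error { return nil }

func main() {
	src := strings.NewReader("hello, world")
	w := &progressWriter{size: src.Len(), report: func(current, total int) {
		fmt.Printf("progress %d/%d\n", current, total)
	}}
	defer w.Close()

	if _, err := io.Copy(w, src); err != nil {
		fmt.Println("copy failed:", err)
	}
}
```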
Types ¶
type RequestTaskOutputEvent ¶
type RequestTaskOutputEvent struct {
RequestID string `json:"requestId"`
TaskID string `json:"taskId"`
Output string `json:"output"`
}
RequestTaskOutputEvent is the push event sent to the requesting client with text output for a specific task node within a request-scoped task.
type RequestTaskProgressEvent ¶
type RequestTaskProgressEvent struct {
RequestID string `json:"requestId"`
TaskID string `json:"taskId"`
Current int `json:"current"`
Total int `json:"total"`
}
RequestTaskProgressEvent is the push event sent to the requesting client with progress for a specific task node within a request-scoped task.
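As a rough illustration of consuming this event, the sketch below decodes a payload matching the struct's JSON tags and derives a percentage. How the framework frames push events on the wire is not shown here, so the payload, the decodeProgress function, and the percent helper are all assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RequestTaskProgressEvent is copied from the type definition above.
type RequestTaskProgressEvent struct {
	RequestID string `json:"requestId"`
	TaskID    string `json:"taskId"`
	Current   int    `json:"current"`
	Total     int    `json:"total"`
}

// decodeProgress parses one event payload (hypothetical helper).
func decodeProgress(payload []byte) (RequestTaskProgressEvent, error) {
	var ev RequestTaskProgressEvent
	err := json.Unmarshal(payload, &ev)
	return ev, err
}

// percent is a hypothetical helper that converts current/total into a whole
// percentage, treating a zero or negative total as unknown (0%).
func percent(ev RequestTaskProgressEvent) int {
	if ev.Total <= 0 {
		return 0
	}
	return ev.Current * 100 / ev.Total
}

func main() {
	payload := []byte(`{"requestId":"r1","taskId":"t1","current":5,"total":20}`)
	ev, err := decodeProgress(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("task %s: %d%%\n", ev.TaskID, percent(ev)) // prints "task t1: 25%"
}
```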
type RequestTaskTreeEvent ¶
type RequestTaskTreeEvent struct {
RequestID string `json:"requestId"`
Tasks []*TaskNode `json:"tasks"`
}
RequestTaskTreeEvent is the push event sent to the requesting client with a full task tree snapshot for a request-scoped task.
type SharedTaskState ¶
type SharedTaskState struct {
}
SharedTaskState is the wire representation of a shared task.
type Task ¶
type Task[M any] struct {
	// contains filtered or unexported fields
}
Task is a type-safe, generic wrapper around a task node. It works for both request-scoped and shared tasks.
func StartTask ¶
func StartTask[M any](ctx context.Context, title string, opts ...TaskOption) (context.Context, *Task[M])
StartTask creates a task. By default it is request-scoped (visible only to the calling client). Pass Shared() to make it visible to all clients.
func (*Task[M]) Context ¶
Context returns the task's context (for shared tasks, the cancellable task context).
func (*Task[M]) Err ¶
Err fails the task with err.Error() if err is non-nil, or completes it if nil.
type TaskNode ¶
type TaskNode struct {
ID string `json:"id"`
Title string `json:"title"`
Status TaskNodeStatus `json:"status"`
Error string `json:"error,omitempty"`
Current int `json:"current,omitempty"`
Total int `json:"total,omitempty"`
Meta any `json:"meta,omitempty"`
Children []*TaskNode `json:"children,omitempty"`
}
TaskNode is the JSON-serializable snapshot of a task sent to the client.
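To show what such a snapshot looks like on the wire, the sketch below copies the TaskNode definition and marshals a two-node tree with encoding/json. The IDs and titles are made-up sample values, and encode is a hypothetical helper; fields tagged omitempty disappear when they hold their zero value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type TaskNodeStatus string

const (
	TaskNodeStatusRunning   TaskNodeStatus = "running"
	TaskNodeStatusCompleted TaskNodeStatus = "completed"
)

// TaskNode is copied from the type definition above.
type TaskNode struct {
	ID       string         `json:"id"`
	Title    string         `json:"title"`
	Status   TaskNodeStatus `json:"status"`
	Error    string         `json:"error,omitempty"`
	Current  int            `json:"current,omitempty"`
	Total    int            `json:"total,omitempty"`
	Meta     any            `json:"meta,omitempty"`
	Children []*TaskNode    `json:"children,omitempty"`
}

// encode is a hypothetical helper that renders a node as compact JSON.
func encode(n *TaskNode) (string, error) {
	b, err := json.Marshal(n)
	return string(b), err
}

func main() {
	root := &TaskNode{
		ID: "1", Title: "Importing data", Status: TaskNodeStatusRunning,
		Children: []*TaskNode{
			{ID: "2", Title: "Validating", Status: TaskNodeStatusCompleted, Current: 100, Total: 100},
		},
	}
	s, err := encode(root)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```

Note that omitempty also drops a Current of 0, so a client cannot distinguish "no progress reported" from "0 of N" by the current field alone.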
type TaskNodeStatus ¶
type TaskNodeStatus string
TaskNodeStatus represents the current state of a task node.
const (
	TaskNodeStatusCreated   TaskNodeStatus = "created"
	TaskNodeStatusRunning   TaskNodeStatus = "running"
	TaskNodeStatusCompleted TaskNodeStatus = "completed"
	TaskNodeStatusFailed    TaskNodeStatus = "failed"
)
func TaskNodeStatusValues ¶
func TaskNodeStatusValues() []TaskNodeStatus
TaskNodeStatusValues returns all possible TaskNodeStatus values.
type TaskOption ¶
type TaskOption func(*taskOptions)
TaskOption configures a task created by StartTask.
func Shared ¶
func Shared() TaskOption
Shared makes the task visible to all connected clients (broadcast). Without this option, the task is only visible to the requesting client.
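TaskOption follows Go's functional options pattern. The sketch below shows how such an option could be applied; the fields of the unexported taskOptions struct are not documented, so the shared field and the applyOptions helper are assumptions made for illustration.

```go
package main

import "fmt"

// taskOptions stands in for the package's unexported options struct.
// Its real fields are not documented; shared is an assumed field.
type taskOptions struct {
	shared bool
}

// TaskOption matches the documented signature.
type TaskOption func(*taskOptions)

// Shared returns an option that marks the task as broadcast to all clients.
func Shared() TaskOption {
	return func(o *taskOptions) { o.shared = true }
}

// applyOptions is a hypothetical helper showing how a constructor such as
// StartTask could fold its variadic options into one settings value.
func applyOptions(opts ...TaskOption) taskOptions {
	var o taskOptions // zero value: request-scoped by default
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(applyOptions().shared)         // false
	fmt.Println(applyOptions(Shared()).shared) // true
}
```

The pattern keeps the StartTask signature stable: new behavior can be added as further options without changing existing call sites.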
type TaskRef ¶
type TaskRef struct {
TaskID string `json:"taskId"`
}
TaskRef is the reference returned to the client from a handler when a shared task is created.
type TaskStateEvent ¶
type TaskStateEvent struct {
Tasks []SharedTaskState `json:"tasks"`
}
TaskStateEvent is the push event broadcast to all clients when shared tasks change.
type TaskSub ¶
type TaskSub[M any] struct {
	// contains filtered or unexported fields
}
TaskSub is a type-safe, generic child node of a Task.
func (*TaskSub[M]) Err ¶
Err fails the sub-task with err.Error() if err is non-nil, or completes it if nil.