tasks

package
v0.40.0
Warning

This package is not in the latest version of its module.

Published: Apr 14, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package tasks provides hierarchical task trees with progress tracking, output streaming, and both request-scoped and shared (broadcast) task systems for the github.com/marrasen/aprot framework.

Tasks let a server report structured, real-time progress to connected clients while a handler executes long-running work. A task is a tree of nodes, each with a title, status (Created → Running → Completed/Failed), optional numeric progress, typed metadata, and streamed text output.

There are two flavours of task:

  • Request-scoped tasks are tied to a single RPC call and visible only to the client that made the request.
  • Shared tasks are broadcast to every connected client and survive after the originating handler returns.

Enabling the task system

Register the task system during server setup:

tasks.Enable(registry)                   // no typed metadata
tasks.EnableWithMeta[MyMeta](registry)   // with typed metadata

This registers the CancelTask handler, push event types, and the middleware that wires task delivery into every request context.

Request-scoped tasks

Create a task inside a handler. It auto-completes when the handler returns nil, or auto-fails when the handler returns an error:

func (h *Handler) Import(ctx context.Context, req ImportReq) (*tasks.TaskRef, error) {
    ctx, task := tasks.StartTask[MyMeta](ctx, "Importing data")
    task.SetMeta(MyMeta{Filename: req.File})

    // Nest child work with SubTask:
    err := tasks.SubTask(ctx, "Validating", func(ctx context.Context) error {
        tasks.TaskProgress(ctx, 0, 100)
        // ...
        tasks.TaskProgress(ctx, 100, 100)
        return nil
    })
    return nil, err  // task auto-completes / auto-fails
}

The client receives RequestTaskTreeEvent, RequestTaskOutputEvent, and RequestTaskProgressEvent push events scoped to its request.

Shared (broadcast) tasks

Pass the Shared option to make the task visible to all clients:

ctx, task := tasks.StartTask[MyMeta](ctx, "Building index", tasks.Shared())

All clients receive TaskStateEvent and TaskUpdateEvent push events. Each client sees an IsOwner flag indicating whether it started the task.

To let a shared task outlive the client connection, use context.WithoutCancel:

ctx, task := tasks.StartTask[MyMeta](
    context.WithoutCancel(ctx), "Background job", tasks.Shared(),
)
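
The effect of context.WithoutCancel can be seen with the standard library alone (Go 1.21 or later). The sketch below does not use the tasks package at all; the ctxKey type and the detachedState helper are hypothetical names introduced only for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a hypothetical key type used only in this illustration.
type ctxKey string

// detachedState cancels a parent context and reports the resulting error
// state of the parent and of a context derived with context.WithoutCancel,
// plus a value read back through the detached context.
func detachedState() (parentErr, detachedErr error, value any) {
	parent, cancel := context.WithCancel(
		context.WithValue(context.Background(), ctxKey("user"), "alice"),
	)
	detached := context.WithoutCancel(parent)

	cancel() // stands in for the client connection going away

	return parent.Err(), detached.Err(), detached.Value(ctxKey("user"))
}

func main() {
	parentErr, detachedErr, value := detachedState()
	fmt.Println(parentErr)   // context canceled
	fmt.Println(detachedErr) // <nil>
	fmt.Println(value)       // alice
}
```

The detached context keeps the parent's values but ignores its cancellation and deadline, so work started this way needs its own stop path, for example CancelSharedTask.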

SharedSubTask bridge

SharedSubTask creates a shared task that also routes nested SubTask, Output, and TaskProgress calls through the shared delivery system. If no connection or task manager is available, it falls back to a regular SubTask:

err := tasks.SharedSubTask(ctx, "Sync accounts", func(ctx context.Context) error {
    return tasks.SubTask(ctx, "Fetching", func(ctx context.Context) error {
        tasks.Output(ctx, "fetched 42 records")
        return nil
    })
})

Progress reporting

TaskProgress sets absolute progress on the current task node. StepTaskProgress increments the current value by a delta, which is convenient inside loops:

for _, item := range items {
    process(item)
    tasks.StepTaskProgress(ctx, 1)
}

Output streaming

Output sends a text message attached to the nearest task node. OutputWriter and WriterProgress each return an io.WriteCloser that creates a child task node: OutputWriter sends each Write as output, while WriterProgress tracks bytes written as progress:

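w := tasks.OutputWriter(ctx, "Build log")
cmd.Stdout = w
err := cmd.Run() // each Write by the command is sent as output
w.Close()        // close the writer once the command has finished
if err != nil {
    return err
}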

Task and TaskSub types

StartTask returns a *Task handle for the root node. Use it to set metadata, report progress, stream output, create sub-tasks, or manually close/fail the task. Task.SubTask returns a *TaskSub handle for a child node, which can set metadata, report progress, create further sub-tasks, and be closed or failed manually.

TypeScript client

The generated TypeScript client provides hooks and helpers:

  • React: useSharedTasks, useMyTasks, useSharedTask(id), useTaskOutput(taskId), cancelSharedTask(client, taskId)
  • Vanilla: TaskStateEvent, TaskUpdateEvent push event types
  • Request-scoped: onTaskProgress and onOutput callbacks in RequestOptions

Cancellation

Clients can cancel shared tasks via the generated CancelTask handler. On the server side, use CancelSharedTask directly.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CancelSharedTask

func CancelSharedTask(ctx context.Context, taskID string) error

CancelSharedTask cancels a shared task by ID.

func Enable

func Enable(r *aprot.Registry)

Enable registers the shared task system with the registry.

func EnableWithMeta

func EnableWithMeta[M any](r *aprot.Registry)

EnableWithMeta registers the shared task system with typed metadata.

func Output

func Output(ctx context.Context, msg string)

Output sends a text output message attached to the nearest task node.

func OutputWriter

func OutputWriter(ctx context.Context, title string) io.WriteCloser

OutputWriter returns an io.WriteCloser that creates a child task node and sends each Write as output attached to that node.

func SharedSubTask

func SharedSubTask(ctx context.Context, title string, fn func(ctx context.Context) error) error

SharedSubTask bridges the request-scoped and shared task systems.

func StepTaskProgress

func StepTaskProgress(ctx context.Context, step int)

StepTaskProgress increments the current progress on the current task node by step.

func SubTask

func SubTask(ctx context.Context, title string, fn func(ctx context.Context) error) error

SubTask creates a child task under the current task node, runs fn, and marks the sub-task as completed (or failed if fn returns an error).

func TaskProgress

func TaskProgress(ctx context.Context, current, total int)

TaskProgress sets the progress (current/total) on the current task node.

func WriterProgress

func WriterProgress(ctx context.Context, title string, size int) io.WriteCloser

WriterProgress returns an io.WriteCloser that creates a child task node tracking bytes written as progress (current/total).
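
The idea of tracking bytes written as progress can be illustrated with a small standard-library sketch. This is not the package's implementation: progressWriter, its report callback, and copyWithProgress are hypothetical names, and the real writer reports to a task node rather than to a callback.

```go
package main

import (
	"fmt"
	"io"
)

// progressWriter is a hypothetical stand-in that counts bytes written and
// reports (current, total) through a callback after every Write.
type progressWriter struct {
	written int
	total   int
	report  func(current, total int)
}

func (w *progressWriter) Write(p []byte) (int, error) {
	w.written += len(p)
	w.report(w.written, w.total)
	return len(p), nil
}

// Close is a no-op in this sketch; a real implementation would presumably
// finish its task node here.
func (w *progressWriter) Close() error { return nil }

// copyWithProgress writes each chunk to the writer and returns the progress
// reports that were observed, formatted as "current/total".
func copyWithProgress(chunks []string, total int) []string {
	var reports []string
	var w io.WriteCloser = &progressWriter{
		total: total,
		report: func(current, total int) {
			reports = append(reports, fmt.Sprintf("%d/%d", current, total))
		},
	}
	defer w.Close()
	for _, c := range chunks {
		io.WriteString(w, c)
	}
	return reports
}

func main() {
	fmt.Println(copyWithProgress([]string{"hello", "abc", "zz"}, 10)) // [5/10 8/10 10/10]
}
```

Because the value is an ordinary io.WriteCloser, it can be used as the destination of io.Copy or wrapped with io.MultiWriter like any other writer.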

Types

type RequestTaskOutputEvent

type RequestTaskOutputEvent struct {
	RequestID string `json:"requestId"`
	TaskID    string `json:"taskId"`
	Output    string `json:"output"`
}

RequestTaskOutputEvent is the push event sent to the requesting client with text output for a specific task node within a request-scoped task.

type RequestTaskProgressEvent

type RequestTaskProgressEvent struct {
	RequestID string `json:"requestId"`
	TaskID    string `json:"taskId"`
	Current   int    `json:"current"`
	Total     int    `json:"total"`
}

RequestTaskProgressEvent is the push event sent to the requesting client with progress for a specific task node within a request-scoped task.
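
A consumer of this event typically turns Current and Total into a percentage. The sketch below decodes a JSON body with the documented field names into a local copy of the struct; the percent helper is hypothetical, and any envelope that aprot wraps around push events is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// requestTaskProgressEvent is a local copy of the documented fields,
// declared here only so the example is self-contained.
type requestTaskProgressEvent struct {
	RequestID string `json:"requestId"`
	TaskID    string `json:"taskId"`
	Current   int    `json:"current"`
	Total     int    `json:"total"`
}

// percent decodes one event body and returns the progress as 0 to 100.
// ok is false when the body is not valid JSON or the total is not positive,
// which avoids a division by zero for tasks without a known total.
func percent(body string) (pct int, ok bool) {
	var ev requestTaskProgressEvent
	if err := json.Unmarshal([]byte(body), &ev); err != nil || ev.Total <= 0 {
		return 0, false
	}
	return ev.Current * 100 / ev.Total, true
}

func main() {
	pct, ok := percent(`{"requestId":"r1","taskId":"t1","current":3,"total":12}`)
	fmt.Println(pct, ok) // 25 true
}
```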

type RequestTaskTreeEvent

type RequestTaskTreeEvent struct {
	RequestID string      `json:"requestId"`
	Tasks     []*TaskNode `json:"tasks"`
}

RequestTaskTreeEvent is the push event sent to the requesting client with a full task tree snapshot for a request-scoped task.

type SharedTaskState

type SharedTaskState struct {
	ID       string         `json:"id"`
	ParentID string         `json:"parentId,omitempty"`
	Title    string         `json:"title"`
	Status   TaskNodeStatus `json:"status"`
	Error    string         `json:"error,omitempty"`
	Current  int            `json:"current,omitempty"`
	Total    int            `json:"total,omitempty"`
	Meta     any            `json:"meta,omitempty"`
	Children []*TaskNode    `json:"children,omitempty"`
	IsOwner  bool           `json:"isOwner"`
}

SharedTaskState is the wire representation of a shared task.

type Task

type Task[M any] struct {
	// contains filtered or unexported fields
}

Task is a type-safe, generic wrapper around a task node. It works for both request-scoped and shared tasks.

func StartTask

func StartTask[M any](ctx context.Context, title string, opts ...TaskOption) (context.Context, *Task[M])

StartTask creates a task. By default it is request-scoped (visible only to the calling client). Pass Shared() to make it visible to all clients.

func (*Task[M]) Close

func (t *Task[M]) Close()

Close marks the task as completed.

func (*Task[M]) Context

func (t *Task[M]) Context() context.Context

Context returns the task's context (for shared tasks, the cancellable task context).

func (*Task[M]) Err

func (t *Task[M]) Err(err error)

Err fails the task with err.Error() if err is non-nil, or completes it if nil.

func (*Task[M]) Fail

func (t *Task[M]) Fail(message string)

Fail marks the task as failed with the given error message.

func (*Task[M]) ID

func (t *Task[M]) ID() string

ID returns the task's unique identifier.

func (*Task[M]) IsShared

func (t *Task[M]) IsShared() bool

IsShared returns true if this task broadcasts to all clients.

func (*Task[M]) Output

func (t *Task[M]) Output(msg string)

Output sends output text associated with this task.

func (*Task[M]) Progress

func (t *Task[M]) Progress(current, total int)

Progress updates the task's progress counters.

func (*Task[M]) SetMeta

func (t *Task[M]) SetMeta(v M)

SetMeta sets typed metadata on the task.

func (*Task[M]) SubTask

func (t *Task[M]) SubTask(title string) *TaskSub[M]

SubTask creates a child node under this task.

func (*Task[M]) WithContext

func (t *Task[M]) WithContext(ctx context.Context) context.Context

WithContext returns a new context that carries this task's delivery and node.

type TaskNode

type TaskNode struct {
	ID       string         `json:"id"`
	Title    string         `json:"title"`
	Status   TaskNodeStatus `json:"status"`
	Error    string         `json:"error,omitempty"`
	Current  int            `json:"current,omitempty"`
	Total    int            `json:"total,omitempty"`
	Meta     any            `json:"meta,omitempty"`
	Children []*TaskNode    `json:"children,omitempty"`
}

TaskNode is the JSON-serializable snapshot of a task sent to the client.
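
To make the wire shape concrete, the following sketch marshals a two-level tree using a local copy of the struct above. The taskNode and taskNodeStatus types and the snapshotJSON helper are declared only for this example; the titles and IDs are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskNodeStatus and taskNode are local copies of the documented types.
type taskNodeStatus string

type taskNode struct {
	ID       string         `json:"id"`
	Title    string         `json:"title"`
	Status   taskNodeStatus `json:"status"`
	Error    string         `json:"error,omitempty"`
	Current  int            `json:"current,omitempty"`
	Total    int            `json:"total,omitempty"`
	Meta     any            `json:"meta,omitempty"`
	Children []*taskNode    `json:"children,omitempty"`
}

// snapshotJSON builds a root node with one finished child and returns
// its compact JSON encoding.
func snapshotJSON() string {
	root := &taskNode{
		ID:     "1",
		Title:  "Importing data",
		Status: "running",
		Children: []*taskNode{
			{ID: "2", Title: "Validating", Status: "completed", Current: 100, Total: 100},
		},
	}
	b, err := json.Marshal(root)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(snapshotJSON())
	// {"id":"1","title":"Importing data","status":"running","children":[{"id":"2","title":"Validating","status":"completed","current":100,"total":100}]}
}
```

Note that the omitempty tags drop zero values, so a node with no progress yet carries neither current nor total, and a consumer should treat missing fields as zero.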

type TaskNodeStatus

type TaskNodeStatus string

TaskNodeStatus represents the current state of a task node.

const (
	TaskNodeStatusCreated   TaskNodeStatus = "created"
	TaskNodeStatusRunning   TaskNodeStatus = "running"
	TaskNodeStatusCompleted TaskNodeStatus = "completed"
	TaskNodeStatusFailed    TaskNodeStatus = "failed"
)

func TaskNodeStatusValues

func TaskNodeStatusValues() []TaskNodeStatus

TaskNodeStatusValues returns all possible TaskNodeStatus values.
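
Since the lifecycle runs Created → Running → Completed/Failed, consumers often need to know whether a node has reached a final state. A minimal sketch with local copies of the constants follows; isTerminal is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// taskNodeStatus and its constants are local copies of the documented values.
type taskNodeStatus string

const (
	statusCreated   taskNodeStatus = "created"
	statusRunning   taskNodeStatus = "running"
	statusCompleted taskNodeStatus = "completed"
	statusFailed    taskNodeStatus = "failed"
)

// isTerminal reports whether a node in this status will no longer change,
// based on the lifecycle described in the overview.
func isTerminal(s taskNodeStatus) bool {
	return s == statusCompleted || s == statusFailed
}

func main() {
	for _, s := range []taskNodeStatus{statusCreated, statusRunning, statusCompleted, statusFailed} {
		fmt.Printf("%s terminal=%v\n", s, isTerminal(s))
	}
}
```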

type TaskOption

type TaskOption func(*taskOptions)

TaskOption configures a task created by StartTask.

func Shared

func Shared() TaskOption

Shared makes the task visible to all connected clients (broadcast). Without this option, the task is only visible to the requesting client.

type TaskRef

type TaskRef struct {
	TaskID string `json:"taskId"`
}

TaskRef is the reference returned to the client from a handler when a shared task is created.

type TaskStateEvent

type TaskStateEvent struct {
	Tasks []SharedTaskState `json:"tasks"`
}

TaskStateEvent is the push event broadcast to all clients when shared tasks change.
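
A client written in Go could use the IsOwner flag to pick out the tasks it started. The sketch below decodes an event body into local copies of the relevant fields; ownedTaskIDs and the sample payload are hypothetical, and only a subset of the SharedTaskState fields is declared because encoding/json ignores the rest.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sharedTaskState holds the subset of documented fields this example needs.
type sharedTaskState struct {
	ID      string `json:"id"`
	Title   string `json:"title"`
	Status  string `json:"status"`
	IsOwner bool   `json:"isOwner"`
}

// taskStateEvent is a local copy of the documented event shape.
type taskStateEvent struct {
	Tasks []sharedTaskState `json:"tasks"`
}

// ownedTaskIDs returns the IDs of the tasks flagged as started by the
// receiving client.
func ownedTaskIDs(body string) ([]string, error) {
	var ev taskStateEvent
	if err := json.Unmarshal([]byte(body), &ev); err != nil {
		return nil, err
	}
	var ids []string
	for _, t := range ev.Tasks {
		if t.IsOwner {
			ids = append(ids, t.ID)
		}
	}
	return ids, nil
}

func main() {
	body := `{"tasks":[
		{"id":"build-1","title":"Building index","status":"running","isOwner":true},
		{"id":"sync-7","title":"Sync accounts","status":"running","isOwner":false}
	]}`
	ids, err := ownedTaskIDs(body)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(ids) // [build-1]
}
```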

type TaskSub

type TaskSub[M any] struct {
	// contains filtered or unexported fields
}

TaskSub is a type-safe, generic child node of a Task.

func (*TaskSub[M]) Close

func (s *TaskSub[M]) Close()

Close marks this sub-task as completed.

func (*TaskSub[M]) Err

func (s *TaskSub[M]) Err(err error)

Err fails the sub-task with err.Error() if err is non-nil, or completes it if nil.

func (*TaskSub[M]) Fail

func (s *TaskSub[M]) Fail(message string)

Fail marks this sub-task as failed with the given error message.

func (*TaskSub[M]) Progress

func (s *TaskSub[M]) Progress(current, total int)

Progress updates the sub-task's progress.

func (*TaskSub[M]) SetMeta

func (s *TaskSub[M]) SetMeta(v M)

SetMeta sets typed metadata on this sub-task node.

func (*TaskSub[M]) SubTask

func (s *TaskSub[M]) SubTask(title string) *TaskSub[M]

SubTask creates a child node under this sub-task.

type TaskUpdateEvent

type TaskUpdateEvent struct {
	TaskID  string  `json:"taskId"`
	Output  *string `json:"output,omitempty"`
	Current *int    `json:"current,omitempty"`
	Total   *int    `json:"total,omitempty"`
}

TaskUpdateEvent is the push event for per-node output and progress updates on shared tasks.
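
The pointer fields let a receiver tell an absent value from a zero value, for instance a progress of 0 from an update that carries only output. The sketch below decodes into a local copy of the struct; describe is a hypothetical helper, and it assumes that progress updates carry current and total together.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// taskUpdateEvent is a local copy of the documented struct.
type taskUpdateEvent struct {
	TaskID  string  `json:"taskId"`
	Output  *string `json:"output,omitempty"`
	Current *int    `json:"current,omitempty"`
	Total   *int    `json:"total,omitempty"`
}

// describe summarises which parts of an update are present. A nil pointer
// means the field was absent from the JSON body.
func describe(body string) (string, error) {
	var ev taskUpdateEvent
	if err := json.Unmarshal([]byte(body), &ev); err != nil {
		return "", err
	}
	var parts []string
	if ev.Output != nil {
		parts = append(parts, fmt.Sprintf("output=%q", *ev.Output))
	}
	if ev.Current != nil && ev.Total != nil {
		parts = append(parts, fmt.Sprintf("progress=%d/%d", *ev.Current, *ev.Total))
	}
	return ev.TaskID + ": " + strings.Join(parts, ", "), nil
}

func main() {
	for _, body := range []string{
		`{"taskId":"t1","current":0,"total":10}`,
		`{"taskId":"t1","output":"fetched 42 records"}`,
	} {
		s, err := describe(body)
		if err != nil {
			fmt.Println("decode error:", err)
			continue
		}
		fmt.Println(s)
	}
	// t1: progress=0/10
	// t1: output="fetched 42 records"
}
```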
