task

package
v1.21.23
Published: Jun 15, 2026 | License: Apache-2.0 | Imports: 31 | Imported by: 25

Documentation

Overview

Package task provides a comprehensive concurrent task execution system with progress tracking, status management, and visual rendering capabilities.

Quick Start

The task system provides a simple interface for running concurrent operations with visual feedback:

import "github.com/flanksource/clicky/task"

// Start a simple task
task := task.StartTask("Download File", func(ctx flanksourceContext.Context, t *task.Task) error {
	t.Infof("Starting download...")
	// Perform work
	time.Sleep(2 * time.Second)
	t.Infof("Download complete")
	return nil
})

// Wait for completion
result := task.WaitFor()
if result.Error != nil {
	fmt.Printf("Task failed: %v\n", result.Error)
}

Core Concepts

Task Lifecycle

Tasks progress through well-defined states with visual indicators:

- StatusPending (⏳): Task is queued but not yet started
- StatusRunning (⟳): Task is currently executing
- StatusSuccess (✓): Task completed successfully
- StatusFailed (✗): Task failed with an error
- StatusWarning (⚠): Task completed with warnings
- StatusCancelled (⊘): Task was canceled

Manager

The Manager coordinates task execution and provides visual rendering:

manager := task.NewManager(
	task.WithMaxConcurrency(5),        // Limit concurrent tasks
	task.WithVerbose(true),            // Enable verbose logging
	task.WithNoProgress(false),        // Show progress bars
)

// Tasks automatically use the global manager if none specified

Task Groups

Groups organize related tasks and provide concurrency control:

group := task.NewGroup("Database Migration",
	task.WithConcurrency(2),  // Max 2 concurrent tasks in group
)

// Add tasks to the group
task1 := group.Add("Migrate Users", func(ctx, t) error { ... })
task2 := group.Add("Migrate Orders", func(ctx, t) error { ... })

// Wait for all tasks in the group
result := group.WaitFor()

TypedTask for Type Safety

TypedTask provides compile-time type safety for task results:

// Define a typed task that returns a string
task := task.StartTask("Fetch Data", func(ctx flanksourceContext.Context, t *task.Task) (string, error) {
	// Perform work and return typed result
	return "Hello, World!", nil
})

// Get typed result
result, err := task.GetResult()
// result is of type string

// Or wait and get result in one call
wait := task.WaitFor()
if wait.Error != nil {
	// Handle error
}

TypedGroup

Groups can also be typed for consistent result handling:

group := task.NewTypedGroup[UserData]("Load Users")

user1 := group.Add("Load User 1", func(ctx, t) (UserData, error) {
	return loadUser(1)
})

user2 := group.Add("Load User 2", func(ctx, t) (UserData, error) {
	return loadUser(2)
})

// Get all results as map
results, err := group.GetResults()
if err != nil {
	return err
}

for task, userData := range results {
	fmt.Printf("Task %s loaded: %+v\n", task.Name(), userData)
}

Status and Health System

The task system includes a rich status and health reporting system:

Status Types

Tasks support multiple status paradigms:

// Standard task statuses (t is the *task.Task passed to the task function)
t.SetStatus(task.StatusSuccess)
t.SetStatus(task.StatusFailed)
t.SetStatus(task.StatusWarning)

// Test-style statuses
t.SetStatus(task.StatusPASS)
t.SetStatus(task.StatusFAIL)
t.SetStatus(task.StatusSKIP)

Health Mixin

Results can implement HealthMixin for automatic status determination:

type DatabaseResult struct {
	Connected bool
	Error     error
}

func (r DatabaseResult) Health() task.Health {
	if r.Error != nil {
		return task.HealthError
	}
	if !r.Connected {
		return task.HealthWarning
	}
	return task.HealthOK
}

// Task status will automatically reflect the health
// (t is the *task.Task passed to the task function)
t.SetResult(DatabaseResult{Connected: true})
// Task status becomes StatusSuccess automatically

Visual Styling

Status information includes visual styling with Tailwind CSS classes:

status := task.StatusSuccess
fmt.Println(status.Icon())  // ✓
fmt.Println(status.Style()) // "text-green-600"

// Apply status styling to text
text := api.Text{Content: "Operation Complete"}
styledText := status.Apply(text)

Concurrency Control

Manager-Level Concurrency

Control global task concurrency:

manager := task.NewManager(task.WithMaxConcurrency(10))
// Maximum 10 tasks running simultaneously

Group-Level Concurrency

Fine-grained control within groups:

group := task.NewGroup("API Calls", task.WithConcurrency(3))
// Maximum 3 concurrent tasks within this group

Semaphore-Based Control

Groups use semaphores for precise concurrency management:

// Group automatically handles semaphore acquisition/release
group.Add("Task 1", taskFunc1)  // Acquires semaphore
group.Add("Task 2", taskFunc2)  // Waits if limit reached
// Semaphore released when task completes
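
The acquire/release pattern described above can be sketched with a buffered channel used as a counting semaphore. This is a standard-library illustration only; runLimited is a hypothetical helper and does not show how Group is implemented.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs every job with at most limit of them in flight and
// returns the highest number of jobs observed running at the same time.
func runLimited(limit int, jobs []func()) int {
	sem := make(chan struct{}, limit) // buffered channel as counting semaphore
	var wg sync.WaitGroup
	var running, peak int64

	for _, job := range jobs {
		job := job
		wg.Add(1)
		sem <- struct{}{} // acquire: blocks while limit jobs are running
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release once the job has finished

			n := atomic.AddInt64(&running, 1)
			for { // record the peak without a lock
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			job()
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait()
	return int(atomic.LoadInt64(&peak))
}

func main() {
	jobs := make([]func(), 8)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runLimited(3, jobs))
}
```

Because a slot is acquired before the goroutine starts and released only after the job returns, the observed peak can never exceed the limit.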

Error Handling and Retry

Basic Error Handling

Tasks provide multiple ways to handle errors:

task := task.StartTask("Risky Operation", func(ctx, t) error {
	if someCondition {
		return errors.New("operation failed")
	}
	return nil
})

result := task.WaitFor()
if result.Error != nil {
	fmt.Printf("Task failed: %v\n", result.Error)
	fmt.Printf("Status: %s\n", result.Status)
}

Retry Configuration

Tasks support automatic retry with exponential backoff:

retryConfig := task.RetryConfig{
	RetryableErrors: []string{"timeout", "connection", "rate limit"},
	BaseDelay:      1 * time.Second,
	MaxDelay:       30 * time.Second,
	BackoffFactor:  2.0,
	JitterFactor:   0.1,
	MaxRetries:     3,
}

task := task.StartTaskWithOptions("Flaky Operation", taskFunc,
	task.WithRetryConfig(retryConfig),
)

Fatal Errors

For unrecoverable errors that should stop execution:

task.Fatal(errors.New("critical system failure"))
// Immediately stops execution and exits program

Logging and Progress

Built-in Logging

Tasks include comprehensive logging capabilities:

task := task.StartTask("Process Data", func(ctx, t) error {
	t.Infof("Starting data processing...")
	t.Debugf("Processing batch %d", batchNum)

	if err := processData(); err != nil {
		t.Errorf("Failed to process batch: %v", err)
		return err
	}

	t.Infof("Processing complete")
	return nil
})

Log levels:

- t.Tracef(): Detailed tracing (lowest level)
- t.Debugf(): Debug information
- t.Infof(): General information
- t.Warnf(): Warnings
- t.Errorf(): Errors
- t.Fatalf(): Fatal errors

Progress Tracking

Tasks support progress indication:

task := task.StartTask("Upload Files", func(ctx, t) error {
	files := getFilesToUpload()
	total := len(files)

	for i, file := range files {
		t.SetProgress(i, total)
		err := uploadFile(file)
		if err != nil {
			return err
		}
	}

	t.SetProgress(total, total) // 100% complete
	return nil
})

Integration Examples

CLI Tool Integration

func runCommand(args []string) error {
	manager := task.NewManager(
		task.WithVerbose(verbose),
		task.WithNoProgress(noProgress),
	)

	// Start background tasks
	task1 := task.StartTask("Validate Input", validateInput)
	task2 := task.StartTask("Load Configuration", loadConfig)

	// Wait for prerequisites
	task1.WaitFor()
	task2.WaitFor()

	// Main processing
	mainTask := task.StartTask("Process Data", processData)
	return mainTask.WaitFor().Error
}

HTTP Server Integration

func handleRequest(w http.ResponseWriter, r *http.Request) {
	requestID := generateRequestID()

	task := task.StartTask(fmt.Sprintf("Handle Request %s", requestID),
		func(ctx, t) error {
			t.Infof("Processing request from %s", r.RemoteAddr)

			// Process request
			result, err := processRequest(r)
			if err != nil {
				t.Errorf("Request failed: %v", err)
				return err
			}

			t.Infof("Request completed successfully")
			return writeResponse(w, result)
		})

	// Wait for completion
	if result := task.WaitFor(); result.Error != nil {
		http.Error(w, result.Error.Error(), 500)
	}
}

Batch Processing

func processBatch(items []Item) error {
	group := task.NewTypedGroup[ProcessedItem]("Batch Processing",
		task.WithConcurrency(5),
	)

	// Process items concurrently
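	for i, item := range items {
		item := item // capture per iteration (needed before Go 1.22)
		group.Add(fmt.Sprintf("Process Item %d", i+1),
			func(ctx flanksourceContext.Context, t *task.Task) (ProcessedItem, error) {
				return processItem(item)
			})
	}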

	// Wait for all processing to complete
	results, err := group.GetResults()
	if err != nil {
		return fmt.Errorf("batch processing failed: %w", err)
	}

	// Handle results
	for task, result := range results {
		fmt.Printf("Task %s: %+v\n", task.Name(), result)
	}

	return nil
}

Testing Integration

The task system automatically detects test environments and adjusts behavior:

func TestDataProcessing(t *testing.T) {
	// Progress bars and colors automatically disabled in tests
	// The variable is named op so it does not shadow the task package,
	// which is still needed for task.StatusSuccess below.
	op := task.StartTask("Test Operation", func(ctx flanksourceContext.Context, tk *task.Task) error {
		// Task logging available in tests
		tk.Infof("Running test operation")
		return performTestOperation()
	})

	result := op.WaitFor()
	assert.NoError(t, result.Error)
	assert.Equal(t, task.StatusSuccess, result.Status)
}

Advanced Features

Task Dependencies

Tasks can depend on other tasks:

prerequisite := task.StartTask("Load Configuration", loadConfig)

mainTask := task.StartTaskWithOptions("Main Process", mainProcess,
	task.WithDependencies(prerequisite),
)

// mainTask won't start until prerequisite completes successfully

Task Identity and Deduplication

Prevent duplicate tasks with identity tracking:

task1 := task.StartTaskWithOptions("Download File", downloadFunc,
	task.WithIdentity("download-file-xyz"),
)

// This will return the existing task instead of creating a new one
task2 := task.StartTaskWithOptions("Download File", downloadFunc,
	task.WithIdentity("download-file-xyz"),
)

// task1 and task2 reference the same underlying task
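
Deduplication by identity amounts to a lookup-or-create under a lock. The sketch below uses a hypothetical registry type to show the idea; it is not the package's internal data structure.

```go
package main

import (
	"fmt"
	"sync"
)

type job struct{ name, identity string }

// registry is a hypothetical identity index guarded by a mutex.
type registry struct {
	mu    sync.Mutex
	byKey map[string]*job
}

func newRegistry() *registry { return &registry{byKey: make(map[string]*job)} }

// startOnce returns the job already registered under identity, or registers
// and returns a new one. The bool reports whether a new job was created.
func (r *registry) startOnce(name, identity string) (*job, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if existing, ok := r.byKey[identity]; ok {
		return existing, false
	}
	j := &job{name: name, identity: identity}
	r.byKey[identity] = j
	return j, true
}

func main() {
	r := newRegistry()
	a, createdA := r.startOnce("Download File", "download-file-xyz")
	b, createdB := r.startOnce("Download File", "download-file-xyz")
	fmt.Println(a == b, createdA, createdB) // true true false
}
```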

Custom Styling and Themes

Tasks support custom visual themes:

customTheme := api.Theme{
	Success: "text-blue-600",
	Error:   "text-purple-600",
	Warning: "text-orange-500",
}

manager := task.NewManager(task.WithTheme(customTheme))

Graceful Shutdown

Handle interrupts gracefully:

manager := task.NewManager(
	task.WithGracefulTimeout(30 * time.Second),
	task.WithInterruptHandler(func() {
		fmt.Println("Shutting down gracefully...")
	}),
)

// Manager will handle SIGINT/SIGTERM and allow running tasks to complete

Terminal Safety

The task system saves and restores terminal state automatically:

- Normal exit: restored via shutdown hooks
- SIGINT: restored during graceful shutdown
- Double SIGINT: restored before forced os.Exit(1)
- Panic in main: restored if main defers shutdown.RecoverAndShutdown()
- Panic in task func: caught by worker, task marked as failed, process continues

For panic protection in main, use:

func main() {
	defer shutdown.RecoverAndShutdown()
	// ... start tasks, wait, etc.
}

RecoverAndShutdown runs all shutdown hooks, restores the terminal, then re-panics so the stack trace is preserved.
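
The recover, clean up, re-panic sequence can be sketched with the standard library alone. The hook list and the recoverAndShutdown and runGuarded functions below are hypothetical; the real shutdown.RecoverAndShutdown implementation is not shown on this page.

```go
package main

import "fmt"

// hooks is a hypothetical list of shutdown hooks, run in reverse
// registration order.
var hooks []func()

func addHook(fn func()) { hooks = append(hooks, fn) }

func runHooks() {
	for i := len(hooks) - 1; i >= 0; i-- {
		hooks[i]()
	}
	hooks = nil
}

// recoverAndShutdown is meant to be deferred. It runs the hooks on every
// exit path and, if the function was panicking, re-panics with the
// original value after cleanup.
func recoverAndShutdown() {
	r := recover()
	runHooks()
	if r != nil {
		panic(r)
	}
}

// runGuarded calls fn under recoverAndShutdown and returns the value of the
// re-raised panic, or nil if fn returned normally. It exists only so the
// example can observe the re-panic without crashing.
func runGuarded(fn func()) (panicked any) {
	defer func() { panicked = recover() }()
	func() {
		defer recoverAndShutdown()
		fn()
	}()
	return nil
}

func main() {
	restored := false
	addHook(func() { restored = true }) // e.g. restore terminal state
	v := runGuarded(func() { panic("boom") })
	fmt.Println(v, restored) // boom true
}
```

Note that recover only has an effect when called directly by the deferred function, which is why recoverAndShutdown itself is deferred rather than wrapped in another closure.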

The task package provides a complete solution for concurrent task execution with rich visual feedback, comprehensive error handling, and flexible configuration options suitable for CLI tools, servers, and batch processing applications.

Variables

var (
	// ErrBatchTimeout is returned when the entire batch exceeds its timeout
	ErrBatchTimeout = errors.New("batch execution exceeded timeout")

	// ErrItemTimeout is returned when an individual item exceeds its timeout
	ErrItemTimeout = errors.New("item execution exceeded timeout")
)
var OnBeforeGC func(groupID string, snapshots []TaskSnapshot)

OnBeforeGC, if non-nil, is called with each group's full snapshot just before GCRuns removes it from the in-memory manager. The callback receives the group's stable ID and the full snapshot slice (group + child tasks). It is called while global.mu is held, so it must not call back into the task package.

Functions

func AcquirePromptTerminal

func AcquirePromptTerminal() (func(), bool)

AcquirePromptTerminal waits for exclusive ANSI terminal ownership for prompt rendering, stopping the active task renderer first when necessary. The second return value reports whether the prompt displaced an active task renderer.

func BindManagerFlags

func BindManagerFlags(flags *flag.FlagSet, options *ManagerOptions)

BindManagerFlags adds the task manager flags to a standard library flag.FlagSet

func BindManagerPFlags

func BindManagerPFlags(flags *pflag.FlagSet, options *ManagerOptions)

BindManagerPFlags adds the task manager flags to a pflag.FlagSet (for Cobra)

func CancelAll

func CancelAll()

CancelAll cancels all running tasks and groups

func ClearTasks

func ClearTasks()

ClearTasks removes all completed tasks (and groups) from the registry, keeping only those still pending or running. Groups must be pruned too: SnapshotAll iterates global.groups, so a leftover completed group would keep surfacing its (now-removed) tasks — e.g. a stale "Running tests" group whose task IDs no longer resolve, causing StopTask lookups to fail.

func Debug

func Debug() string

Debug returns debug information about the task manager

func GCRuns

func GCRuns()

GCRuns removes finished runs older than runRetention from the global manager. A run is "finished" once it has a non-zero FinishedAt (recorded the first time it is observed terminal). Live runs are retained regardless of age. If OnBeforeGC is set, each evicted group's full snapshot is passed to it before removal.

func GatedStderr

func GatedStderr() io.Writer

GatedStderr returns a writer that wraps os.Stderr but drops writes while the interactive task renderer owns the TTY. The writer is stateless; each Write rechecks ownership, so a writer captured before the renderer started still gates correctly.

func IsForceInteractive

func IsForceInteractive() bool

IsForceInteractive reports whether the interactive renderer has been forced on via SetForceInteractive or the CLICKY_FORCE_INTERACTIVE env var.

func IsInteractiveRenderActive

func IsInteractiveRenderActive() bool

IsInteractiveRenderActive reports whether the global task manager's interactive render loop currently owns the terminal. Callers that write to os.Stderr can consult this to drop writes that would corrupt the live frame. The check is cheap (atomic load + RLock).

Returns false when the manager is non-interactive (PlainRender mode), before the renderer has acquired the TTY, or after stopRender has released it.

func IsNoRender

func IsNoRender() bool

IsNoRender reports whether task rendering is currently disabled.

func JSONHandler

func JSONHandler(taskIDs ...string) http.Handler

JSONHandler returns an http.Handler that serves the full task state as JSON. If taskIDs are provided, only groups matching those IDs are included. This is used for initial page load, reconnection after SSE drop, or polling fallback.

func RegisterHandlers

func RegisterHandlers(mux *http.ServeMux, prefix string)

RegisterHandlers wires the generic task-manager API under prefix:

GET {prefix}/tasks         run listing (RunMeta[], ?kind=&status=&label=k=v)
GET {prefix}/tasks/stream  SSE stream of TaskSnapshots (?tasks=<id>&kind=)
GET {prefix}/tasks/{id}    id-scoped snapshot (group + tasks)

The {id} route reuses Go 1.22 net/http path-value routing; the stream route is registered before the {id} route so "stream" is not treated as an id.

func RunHandler

func RunHandler() http.Handler

RunHandler serves the id-scoped snapshot (group + its tasks) for a single run. The id is read from the {id} path value of the registered route.

func RunsHandler

func RunsHandler() http.Handler

RunsHandler serves the run listing (RunMeta per group) for the generic task-manager view, filtered by the ?kind=, ?status=, and repeated ?label=k=v query params.

func RunsSSEHandler

func RunsSSEHandler(supplement func(RunFilter) []RunMeta) http.Handler

RunsSSEHandler streams the run listing (RunMeta) as SSE. Unlike SSEHandler it never sends a terminal event: a manager view stays subscribed to observe new and changing runs. supplement (may be nil) merges extra runs — e.g. archived or persisted runs the in-memory registry no longer holds; live runs win on id. It emits a single "event: runs" frame carrying the full listing, and only re-emits when the listing changes.

func SSEHandler

func SSEHandler(taskIDs ...string) http.Handler

SSEHandler returns an http.Handler that streams task events via Server-Sent Events. If taskIDs are provided, only groups matching those IDs are streamed. The handler polls for dirty tasks every 200ms and emits JSON snapshots. It sends an "event: done" when all tracked groups have completed.

func SetForceInteractive

func SetForceInteractive(force bool)

SetForceInteractive forces the interactive renderer on even when the process is running under `go test` or CI (GO_TEST, CI, GITHUB_ACTIONS, …). It also marks the manager as interactive when the underlying FD is not a TTY, so that a PTY-wrapping caller sees the full cursor/clear/redraw ANSI stream. Intended for harnesses that need to audit interactive output — normal applications should rely on the automatic TTY detection.

func SetGracefulTimeout

func SetGracefulTimeout(timeout time.Duration)

SetGracefulTimeout sets the timeout for graceful shutdown

func SetInterruptHandler

func SetInterruptHandler(fn func())

SetInterruptHandler sets a custom callback to be called on interrupt

func SetLiveRenderer

func SetLiveRenderer(r LiveRenderer)

SetLiveRenderer installs a custom renderer on the global manager, or removes it when r is nil. It is process-global like SetNoRender; install it for the duration of one command and restore (SetLiveRenderer(nil)) afterwards.

func SetMaxConcurrent

func SetMaxConcurrent(max int)

SetMaxConcurrent sets the maximum number of concurrent tasks

func SetNoColor

func SetNoColor(noColor bool)

SetNoColor enables or disables colored output

func SetNoProgress

func SetNoProgress(noProgress bool)

SetNoProgress enables or disables progress display

func SetNoRender

func SetNoRender(noRender bool)

SetNoRender enables or disables all task rendering, including final summaries.

func SetRetryConfig

func SetRetryConfig(config RetryConfig)

SetRetryConfig sets the default retry configuration for new tasks

func SetVerbose

func SetVerbose(verbose bool)

SetVerbose enables or disables verbose logging

func StartCapturingOutput

func StartCapturingOutput()

StartCapturingOutput replaces os.Stdout and os.Stderr on the global task manager with pipes that buffer everything written until StopCapturingOutput is called. The live task renderer is unaffected because it captured the original file descriptors at manager init time (see Manager.renderer in manager.go). Loggers that captured os.Stderr before this call will also keep writing to the real terminal — only bare fmt.Print / os.Stderr writes after this call get buffered.

func StopCapturingOutput

func StopCapturingOutput()

StopCapturingOutput restores the real os.Stdout and os.Stderr on the global task manager and flushes every buffered line to the restored streams in the order it was written, tagged by stream of origin. Safe to call when capture wasn't started (no-op).

func StopTask

func StopTask(id string) bool

StopTask cancels a specific pending or running task by immutable ID.

func Wait

func Wait() int

Wait waits for all tasks to complete and returns the appropriate exit code

func WaitForAllTasks

func WaitForAllTasks()

WaitForAllTasks waits for all global tasks to complete and forces a final render

func WaitSilent

func WaitSilent() int

WaitSilent waits for all tasks to complete without displaying results

Types

type Batch

type Batch[T any] struct {
	Name       string
	Items      []func(logger logger.Logger) (T, error)
	MaxWorkers int
	Results    []T
	// Timeout is the maximum duration for the entire batch to complete.
	// Zero value means no timeout (infinite wait until completion or context cancellation).
	Timeout time.Duration
	// ItemTimeout is the maximum duration for each individual item to complete.
	// Zero value means no per-item timeout.
	ItemTimeout time.Duration
}
Example (ItemTimeout)

ExampleBatch_itemTimeout demonstrates using per-item timeouts

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/flanksource/commons/logger"

	"github.com/flanksource/clicky/task"
)

func main() {
	batch := &task.Batch[int]{
		Name:        "example-item-timeout",
		ItemTimeout: 100 * time.Millisecond, // Each item must complete within 100ms
		MaxWorkers:  3,
	}

	// Add 6 items: 3 fast (50ms), 3 slow (150ms)
	for i := 0; i < 6; i++ {
		i := i
		batch.Items = append(batch.Items, func(log logger.Logger) (int, error) {
			if i%2 == 0 {
				time.Sleep(50 * time.Millisecond)
			} else {
				time.Sleep(150 * time.Millisecond)
			}
			return i, nil
		})
	}

	completed := []int{}
	itemTimeouts := 0

	for result := range batch.Run() {
		if result.Error != nil {
			if errors.Is(result.Error, task.ErrItemTimeout) {
				itemTimeouts++
			}
		} else {
			completed = append(completed, result.Value)
		}
	}

	fmt.Printf("Completed: %d, Timed out: %d\n", len(completed), itemTimeouts)
}
Output:
Completed: 3, Timed out: 3
Example (Timeout)

ExampleBatch_timeout demonstrates using a batch timeout to limit total execution time

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/flanksource/commons/logger"

	"github.com/flanksource/clicky/task"
)

func main() {
	batch := &task.Batch[string]{
		Name:       "example-batch-timeout",
		Timeout:    500 * time.Millisecond, // Batch must complete within 500ms
		MaxWorkers: 2,
	}

	// Add 10 items that each take 200ms - only ~5 will complete before timeout
	for i := 0; i < 10; i++ {
		i := i
		batch.Items = append(batch.Items, func(log logger.Logger) (string, error) {
			time.Sleep(200 * time.Millisecond)
			return fmt.Sprintf("item-%d", i), nil
		})
	}

	completed := []string{}
	timedOut := false

	for result := range batch.Run() {
		if result.Error != nil {
			if errors.Is(result.Error, task.ErrBatchTimeout) {
				timedOut = true
				fmt.Println("Batch timeout occurred")
			}
		} else {
			completed = append(completed, result.Value)
		}
	}

	fmt.Printf("Completed %d items before timeout: %v\n", len(completed), timedOut)
}
Output:
Batch timeout occurred
Completed 6 items before timeout: true
Example (ZeroTimeout)

ExampleBatch_zeroTimeout demonstrates zero-value backward compatibility

package main

import (
	"fmt"
	"time"

	"github.com/flanksource/commons/logger"

	"github.com/flanksource/clicky/task"
)

func main() {
	batch := &task.Batch[int]{
		Name:       "example-zero-timeout",
		Timeout:    0, // Zero means no timeout (backward compatible)
		MaxWorkers: 2,
	}

	// Add items that would take a while
	for i := 0; i < 5; i++ {
		i := i
		batch.Items = append(batch.Items, func(log logger.Logger) (int, error) {
			time.Sleep(50 * time.Millisecond)
			return i, nil
		})
	}

	count := 0
	for result := range batch.Run() {
		if result.Error == nil {
			count++
		}
	}

	fmt.Printf("All %d items completed (no timeout)\n", count)
}
Output:
All 5 items completed (no timeout)

func (*Batch[T]) Run

func (b *Batch[T]) Run() chan BatchResult[T]

func (*Batch[T]) WithItemTimeout

func (b *Batch[T]) WithItemTimeout(duration time.Duration) *Batch[T]

WithItemTimeout sets the maximum duration for each individual item to complete. Returns the batch for method chaining.

func (*Batch[T]) WithTimeout

func (b *Batch[T]) WithTimeout(duration time.Duration) *Batch[T]

WithTimeout sets the maximum duration for the entire batch to complete. Returns the batch for method chaining.

Example

ExampleBatch_WithTimeout demonstrates method chaining

package main

import (
	"fmt"
	"time"

	"github.com/flanksource/clicky/task"
)

func main() {
	batch := (&task.Batch[string]{
		Name:       "example-chaining",
		MaxWorkers: 3,
	}).WithTimeout(1 * time.Second).WithItemTimeout(200 * time.Millisecond)

	fmt.Printf("Batch timeout: %v, Item timeout: %v\n", batch.Timeout, batch.ItemTimeout)
}
Output:
Batch timeout: 1s, Item timeout: 200ms

type BatchResult

type BatchResult[T any] struct {
	Value    T
	Error    error
	Duration time.Duration
}

type Group

type Group struct {
	Items []Taskable // Can contain Tasks or nested Groups
	// contains filtered or unexported fields
}

Group represents a group of tasks that can be managed collectively

func (*Group) Cancel

func (g *Group) Cancel()

Cancel cancels all items in the group

func (*Group) FinishedAt

func (g *Group) FinishedAt() time.Time

FinishedAt returns the time the group first became terminal, or the zero time if it is still running/pending. It is recorded lazily by observeTerminal (called from snapshotting) so it does not depend on a WaitFor caller.

func (*Group) GetTasks

func (g *Group) GetTasks() []Taskable

func (*Group) ID

func (g *Group) ID() string

ID returns the group's stable unique id.

func (*Group) IsGroup

func (g *Group) IsGroup() bool

IsGroup returns true for Group

func (*Group) Metadata

func (g *Group) Metadata() GroupMetadata

Metadata returns a copy of the group's metadata.

func (*Group) Name

func (g *Group) Name() string

Name returns the group name

func (*Group) StartedAt

func (g *Group) StartedAt() time.Time

StartedAt returns the group's start time.

func (*Group) Status

func (g *Group) Status() Status

type GroupMetadata

type GroupMetadata struct {
	Kind   string            `json:"kind,omitempty"`
	Labels map[string]string `json:"labels,omitempty"`
	Owner  string            `json:"owner,omitempty"`
}

GroupMetadata is the queryable metadata attached to a run (task group). Kind classifies the run (e.g. "sql-fix", "test-run"), Labels carry arbitrary key/value facets for filtering, and Owner identifies who started it. All fields are optional.

type Health

type Health string
const (
	HealthOK      Health = "ok"
	HealthWarning Health = "warning"
	HealthError   Health = "error"
	HealthPending Health = "pending"
)

func (Health) Style

func (h Health) Style() string

type HealthMixin

type HealthMixin interface {
	Health() Health
}

type LiveRenderer

type LiveRenderer interface {
	// RenderLive returns the content for one live frame — a 250ms interactive
	// tick or a single non-interactive (PlainRender) flush.
	RenderLive(tasks []*Task) api.Text
	// RenderFinal returns the content drawn once after all tasks complete.
	RenderFinal(tasks []*Task) api.Text
}

LiveRenderer replaces the default task-tree rendering with caller-supplied content while keeping clicky's terminal ownership: the render loop still owns the TTY, ClearLines line accounting, and the logger serializer that prevents concurrent log lines from corrupting the in-place frame. Only the rendered content (an api.Text) is swapped — a caller that wants its own block (a status table, a custom dashboard) renders it through clicky instead of hand-rolling ANSI redraws that collide with logger output.

Install with SetLiveRenderer; remove by passing nil. Both methods receive a snapshot of the current tasks and must not mutate them.

type LogEntry

type LogEntry struct {
	Level   string `json:"level"`
	Message string `json:"message"`
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages and displays multiple tasks with progress bars

func (*Manager) PlainRender

func (tm *Manager) PlainRender()

PlainRender outputs the current task statuses in plain text without any interactive / ANSI / console features

func (*Manager) Pretty

func (tm *Manager) Pretty() api.Text

func (*Manager) Run

func (tm *Manager) Run() error

Run starts all tasks and waits for completion

func (*Manager) Start

func (tm *Manager) Start(name string, opts ...Option) *Task

Start creates and starts tracking a new task

func (*Manager) StartCapturingOutput

func (tm *Manager) StartCapturingOutput()

StartCapturingOutput redirects stdout/stderr to internal buffer

func (*Manager) StartWithResult

func (tm *Manager) StartWithResult(name string, taskFunc func(flanksourceContext.Context, *Task) (interface{}, error), opts ...Option) *Task

StartWithResult creates and starts tracking a new task with typed result handling

func (*Manager) StopCapturingOutput

func (tm *Manager) StopCapturingOutput()

StopCapturingOutput restores stdout/stderr and prints buffered output

type ManagerOptions

type ManagerOptions struct {
	NoProgress      bool          // Disable progress display
	NoRender        bool          // Disable all task rendering
	MaxConcurrent   int           // Maximum concurrent tasks (0 = unlimited)
	GracefulTimeout time.Duration // Timeout for graceful shutdown

	// Retry configuration
	MaxRetries int           // Maximum retry attempts
	RetryDelay time.Duration // Base delay between retries
}

ManagerOptions contains configuration options for TaskManager

func DefaultManagerOptions

func DefaultManagerOptions() *ManagerOptions

DefaultManagerOptions returns sensible defaults

func (*ManagerOptions) Apply

func (opts *ManagerOptions) Apply()

Apply configures a TaskManager with these options

type Option

type Option func(*Task)

Option configures task creation

func WithDependencies

func WithDependencies(deps ...*Task) Option

WithDependencies sets tasks that must complete before this task can start

func WithFunc

func WithFunc(fn func(flanksourceContext.Context, *Task) error) Option

WithFunc sets the function to run for the task

func WithIdentity

func WithIdentity(identity string) Option

WithIdentity sets a unique identifier for task deduplication

func WithModel

func WithModel(modelName string) Option

WithModel sets the model name for the task

func WithPriority

func WithPriority(priority int) Option

WithPriority sets the priority for task scheduling (lower = higher priority)
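
Since lower numbers mean higher priority, ordering by priority is an ascending sort. The sketch below uses a hypothetical queued type; keeping submission order for equal priorities (a stable sort) is an assumption, not documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

type queued struct {
	name     string
	priority int
}

// byPriority returns a copy ordered so that lower priority values come
// first; entries with equal priority keep their submission order.
func byPriority(in []queued) []queued {
	out := append([]queued(nil), in...)
	sort.SliceStable(out, func(i, j int) bool { return out[i].priority < out[j].priority })
	return out
}

func main() {
	q := []queued{{"cleanup", 10}, {"deploy", 1}, {"notify", 10}, {"build", 0}}
	for _, t := range byPriority(q) {
		fmt.Println(t.priority, t.name)
	}
	// Prints, one per line: 0 build, 1 deploy, 10 cleanup, 10 notify
}
```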

func WithPrompt

func WithPrompt(prompt string) Option

WithPrompt sets the prompt for the task

func WithRetryConfig

func WithRetryConfig(config RetryConfig) Option

WithRetryConfig sets custom retry configuration for the task

func WithTaskTimeout

func WithTaskTimeout(d time.Duration) Option

WithTaskTimeout sets an individual task timeout applied at execution time

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout sets a timeout for the task

type OutputEntry

type OutputEntry struct {
	Timestamp time.Time
	Stream    string // "stdout" or "stderr"
	Line      string
}

OutputEntry represents a captured stdout/stderr line with metadata

type RetryConfig

type RetryConfig struct {
	RetryableErrors []string // Error message patterns that should trigger retries
	BaseDelay       time.Duration
	MaxDelay        time.Duration
	BackoffFactor   float64
	JitterFactor    float64
	MaxRetries      int
}

RetryConfig holds configuration for task retry behavior

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns sensible default retry configuration

type RunFilter

type RunFilter struct {
	Kind   string
	Status string
	Labels map[string]string // every entry must match
}

RunFilter narrows the runs returned by Runs. Empty fields match everything.

func (RunFilter) Matches

func (f RunFilter) Matches(m RunMeta) bool
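
The matching rule stated for RunFilter (empty fields match everything, every label entry must match) can be expressed in a few lines. The runFilter and runMeta types below are reduced local stand-ins, and this sketch is an interpretation of the documented rule rather than the package's source.

```go
package main

import "fmt"

// Reduced local stand-ins for RunMeta and RunFilter.
type runMeta struct {
	Kind, Status string
	Labels       map[string]string
}

type runFilter struct {
	Kind, Status string
	Labels       map[string]string
}

// matches reports whether m satisfies f. Empty Kind or Status match any
// value; every label in the filter must be present on m with the same value.
func (f runFilter) matches(m runMeta) bool {
	if f.Kind != "" && f.Kind != m.Kind {
		return false
	}
	if f.Status != "" && f.Status != m.Status {
		return false
	}
	for k, want := range f.Labels {
		if got, ok := m.Labels[k]; !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	m := runMeta{Kind: "test-run", Status: "success", Labels: map[string]string{"env": "ci", "team": "core"}}

	fmt.Println(runFilter{}.matches(m))                                          // true
	fmt.Println(runFilter{Kind: "sql-fix"}.matches(m))                           // false
	fmt.Println(runFilter{Labels: map[string]string{"env": "ci"}}.matches(m))    // true
	fmt.Println(runFilter{Labels: map[string]string{"env": "prod"}}.matches(m))  // false
}
```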

type RunMeta

type RunMeta struct {
	ID         string            `json:"id"`
	Name       string            `json:"name"`
	Kind       string            `json:"kind,omitempty"`
	Labels     map[string]string `json:"labels,omitempty"`
	Owner      string            `json:"owner,omitempty"`
	Status     string            `json:"status"`
	StartedAt  string            `json:"startedAt,omitempty"`  // RFC3339
	FinishedAt string            `json:"finishedAt,omitempty"` // RFC3339
	Total      int               `json:"total"`
	Completed  int               `json:"completed"`
	Failed     int               `json:"failed"`
	Running    int               `json:"running"`
}

RunMeta is the listing summary for one run (task group): identity, metadata, status, timing, and child-task counts. It is what the generic task-manager list view renders; drill-down uses SnapshotByID.

func RunMetaFromSnapshot

func RunMetaFromSnapshot(snap TaskSnapshot) RunMeta

RunMetaFromSnapshot lifts the group-level fields of a group snapshot into a RunMeta. The snapshot's ID is the group name; GroupID carries the stable id.

func Runs

func Runs(filter RunFilter) []RunMeta

Runs returns one RunMeta per registered group, newest-first, optionally narrowed by filter. It runs GC first so stale finished runs drop out.

func RunsRaw

func RunsRaw(filter RunFilter) []RunMeta

RunsRaw is like Runs but does NOT trigger GC first. Callers that manage their own GC timing (e.g. an L2-backed wrapper that needs to snapshot before GC) use this to avoid double-GC.

type Status

type Status string

Status represents the status of a task

const (
	// StatusPending indicates the task is waiting to start
	StatusPending Status = "pending"
	// StatusRunning indicates the task is currently running
	StatusRunning Status = "running"
	// StatusSuccess indicates the task completed successfully
	StatusSuccess Status = "success"
	// StatusFailed indicates the task failed
	StatusFailed Status = "failed"
	// StatusWarning indicates the task completed with warnings
	StatusWarning Status = "warning"
	// StatusCancelled indicates the task was canceled
	StatusCancelled Status = "canceled"

	// StatusPASS indicates a test passed
	StatusPASS Status = "PASS"
	// StatusFAIL indicates a test failed
	StatusFAIL Status = "FAIL"
	// StatusERR indicates a test had an error
	StatusERR Status = "ERR"
	// StatusSKIP indicates a test was skipped
	StatusSKIP Status = "SKIP"
)

func (Status) Apply

func (s Status) Apply(t api.Text) api.Text

Apply applies the status icon and style to the given text, preserving any style classes (such as width/truncation directives) the caller has already set.

func (Status) Health

func (s Status) Health() Health

Health converts the status to a health state

func (Status) Icon

func (s Status) Icon() string

Icon returns the emoji icon representation of the status

func (Status) Pretty

func (s Status) Pretty() api.Text

Pretty returns a pretty formatted text representation of the status

func (Status) String

func (s Status) String() string

func (Status) Style

func (s Status) Style() string

Style returns the CSS style class for the status

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents a single task being tracked by the Manager

func (*Task) Cancel

func (t *Task) Cancel()

Cancel cancels the task

func (*Task) ClearLogs

func (t *Task) ClearLogs()

ClearLogs clears all buffered logs for this task

func (*Task) Context

func (t *Task) Context() context.Context

Context returns the task's context for cancellation

func (*Task) Debugf

func (t *Task) Debugf(format string, args ...interface{})

Debugf logs a debug message (only shown in verbose mode)

func (*Task) Description

func (t *Task) Description() string

Description returns the task description

func (*Task) Duration

func (t *Task) Duration() time.Duration

Duration returns the task duration

func (*Task) Error

func (t *Task) Error() error

Error returns the task's error if any

func (*Task) Errorf

func (t *Task) Errorf(format string, args ...interface{})

Errorf logs an error message

func (*Task) Failed

func (t *Task) Failed() *Task

Failed marks the task as failed

func (*Task) FailedWithError

func (t *Task) FailedWithError(err error) (*Task, error)

FailedWithError marks the task as failed with an error

func (*Task) Fatal

func (t *Task) Fatal(err error)

Fatal marks the task as failed and exits the program immediately

func (*Task) Fatalf

func (t *Task) Fatalf(format string, args ...interface{})

Fatalf logs a fatal message (implements Logger interface)

func (*Task) FlanksourceContext

func (t *Task) FlanksourceContext() flanksourceContext.Context

FlanksourceContext returns the task's flanksource context for logging

func (*Task) GetLevel

func (t *Task) GetLevel() logger.LogLevel

GetLevel returns the current log level (implements Logger interface)

func (*Task) GetResult

func (t *Task) GetResult() (interface{}, error)

GetResult returns the stored result and error

func (*Task) GetSlogLogger

func (t *Task) GetSlogLogger() *slog.Logger

GetSlogLogger returns the slog logger (implements Logger interface - unsupported)

func (*Task) GetTask

func (t *Task) GetTask() *Task

GetTask returns the task itself

func (*Task) GetTypedResult

func (t *Task) GetTypedResult(target interface{}) error

GetTypedResult retrieves the result with type assertion

func (*Task) ID

func (t *Task) ID() string

ID returns the task's immutable UUID.

func (*Task) Identity

func (t *Task) Identity() string

Identity returns the task's unique identifier for deduplication

func (*Task) Infof

func (t *Task) Infof(format string, args ...interface{})

Infof logs an info message (only shown in verbose mode)

func (*Task) IsDebugEnabled

func (t *Task) IsDebugEnabled() bool

IsDebugEnabled checks if debug level is enabled (implements Logger interface)

func (*Task) IsGroup

func (t *Task) IsGroup() bool

IsGroup returns false for Task

func (*Task) IsLevelEnabled

func (t *Task) IsLevelEnabled(level logger.LogLevel) bool

IsLevelEnabled checks if a specific level is enabled (implements Logger interface)

func (*Task) IsOk

func (t *Task) IsOk() bool

IsOk returns true if the task completed successfully

func (*Task) IsTraceEnabled

func (t *Task) IsTraceEnabled() bool

IsTraceEnabled checks if trace level is enabled (implements Logger interface)

func (*Task) Name

func (t *Task) Name() string

Name returns the task name

func (*Task) Named

func (t *Task) Named(name string) logger.Logger

Named returns a named logger (implements Logger interface - noop)

func (*Task) PopDirty

func (t *Task) PopDirty() bool

PopDirty checks and clears the dirty flag atomically

func (*Task) Pretty

func (t *Task) Pretty() api.Text

Pretty returns a formatted text representation of the task

func (*Task) Progress

func (t *Task) Progress() (value, maximum int)

Progress returns the task's current progress value and maximum. A maximum of 0 means the task has no bounded progress.
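
A caller rendering these two values has to treat a maximum of 0 as "unbounded" rather than divide by it. The sketch below shows one way to do that; percent and label are hypothetical helpers, and clamping out-of-range values is an assumption rather than documented behavior.

```go
package main

import "fmt"

// percent converts a (value, maximum) pair into a whole percentage.
// The second result is false when maximum is 0 or negative, meaning the
// task has no bounded progress and no percentage can be shown.
func percent(value, maximum int) (int, bool) {
	if maximum <= 0 {
		return 0, false
	}
	if value < 0 {
		value = 0
	}
	if value > maximum {
		value = maximum
	}
	return value * 100 / maximum, true
}

// label formats progress as "x/y (p%)" or, for unbounded tasks, as "x".
func label(value, maximum int) string {
	p, bounded := percent(value, maximum)
	if !bounded {
		return fmt.Sprintf("%d", value)
	}
	return fmt.Sprintf("%d/%d (%d%%)", value, maximum, p)
}

func main() {
	fmt.Println(label(3, 12)) // 3/12 (25%)
	fmt.Println(label(7, 0))  // 7
}
```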

func (*Task) SetDescription

func (t *Task) SetDescription(description string)

SetDescription sets the task description

func (*Task) SetLogLevel

func (t *Task) SetLogLevel(level any)

SetLogLevel sets the log level (implements Logger interface)

func (*Task) SetMinLogLevel

func (t *Task) SetMinLogLevel(level any)

SetMinLogLevel sets the minimum log level (implements Logger interface)

func (*Task) SetName

func (t *Task) SetName(name string)

SetName sets the task name

func (*Task) SetProgress

func (t *Task) SetProgress(value, maximum int)

SetProgress updates the task's progress

func (*Task) SetResult

func (t *Task) SetResult(result interface{})

SetResult stores a result in the task

func (*Task) SetStatus

func (t *Task) SetStatus(status Status)

SetStatus updates the task's status

func (*Task) StartTime

func (t *Task) StartTime() time.Time

StartTime returns when the task started execution

func (*Task) Status

func (t *Task) Status() Status

Status returns the current task status

func (*Task) Success

func (t *Task) Success() *Task

Success marks the task as successfully completed

func (*Task) Tracef

func (t *Task) Tracef(format string, args ...interface{})

Tracef logs a trace message (implements Logger interface)

func (*Task) V

func (t *Task) V(level any) logger.Verbose

V returns a verbose logger (implements Logger interface)

func (*Task) WaitFor

func (t *Task) WaitFor() *WaitResult

WaitFor waits for this specific task to complete and returns the result

func (*Task) WaitTime

func (t *Task) WaitTime() time.Duration

WaitTime returns how long the task waited before starting

func (*Task) Warnf

func (t *Task) Warnf(format string, args ...interface{})

Warnf logs a warning message

func (*Task) Warning

func (t *Task) Warning() *Task

Warning marks the task as completed with warnings

func (*Task) WithSkipReportLevel

func (t *Task) WithSkipReportLevel(i int) logger.Logger

WithSkipReportLevel returns a logger with skip report level (implements Logger interface - noop)

func (*Task) WithV

func (t *Task) WithV(level any) logger.Logger

WithV returns a logger with verbosity level (implements Logger interface)

func (*Task) WithValues

func (t *Task) WithValues(keysAndValues ...interface{}) logger.Logger

WithValues returns a logger with additional key-value pairs (implements Logger interface)

func (*Task) WithoutName

func (t *Task) WithoutName() logger.Logger

WithoutName returns a logger without name (implements Logger interface - noop)

type TaskFunc

type TaskFunc[T any] func(flanksourceContext.Context, *Task) (T, error)

TaskFunc is a generic task function that returns a typed result

type TaskGroupOption

type TaskGroupOption func(group *Group)

func WithConcurrency

func WithConcurrency(concurrency int) TaskGroupOption

func WithGroupID

func WithGroupID(id string) TaskGroupOption

WithGroupID sets a caller-supplied stable id for the group. When unset, StartGroup assigns a uuid.

func WithKind

func WithKind(kind string) TaskGroupOption

WithKind classifies the run for the task-manager listing/filtering.

func WithLabels

func WithLabels(labels map[string]string) TaskGroupOption

WithLabels attaches filterable key/value facets to the run.

func WithOwner

func WithOwner(owner string) TaskGroupOption

WithOwner records who started the run.

type TaskResult

type TaskResult[T any] struct {
	Result T
	Error  error
}

TaskResult holds a typed result and error

type TaskSnapshot

type TaskSnapshot struct {
	ID        string     `json:"id"`
	Name      string     `json:"name"`
	Type      string     `json:"type"`            // "task" or "group"
	Group     string     `json:"group,omitempty"` // parent group name
	Status    string     `json:"status"`
	Duration  string     `json:"duration,omitempty"`
	Error     string     `json:"error,omitempty"`
	Message   string     `json:"message,omitempty"`   // latest log line
	Logs      []LogEntry `json:"logs,omitempty"`      // all log entries
	Total     int        `json:"total,omitempty"`     // group: total child tasks
	Completed int        `json:"completed,omitempty"` // group: completed tasks
	Failed    int        `json:"failed,omitempty"`    // group: failed tasks
	Running   int        `json:"running,omitempty"`   // group: running tasks

	// Per-task fields (type == "task"). Description is the live stage label set
	// via Task.SetDescription; Progress/MaxValue mirror Task.SetProgress so the UI
	// can render an x/y count and percent for a running task. MaxValue 0 means the
	// task has no bounded progress.
	Description string `json:"description,omitempty"`
	Progress    int    `json:"progress,omitempty"`
	MaxValue    int    `json:"maxValue,omitempty"`

	// Registry metadata (additive). For a group these describe the run itself;
	// for a task GroupID links it to its parent run so the SSE/JSON clients can
	// key on a stable id rather than the human-facing name.
	GroupID    string            `json:"groupId,omitempty"`
	Kind       string            `json:"kind,omitempty"`
	Labels     map[string]string `json:"labels,omitempty"`
	Owner      string            `json:"owner,omitempty"`
	StartedAt  string            `json:"startedAt,omitempty"`  // RFC3339
	FinishedAt string            `json:"finishedAt,omitempty"` // RFC3339
}

TaskSnapshot is a JSON-serializable snapshot of a task or group's current state.
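
As a sketch of how a JSON client might consume a list of snapshots, the example below decodes a hand-written payload into a local mirror of a few TaskSnapshot fields and buckets the task entries by GroupID, the stable run id. The payload, the ids in it, and the helper tasksByRun are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// snapshot mirrors a subset of the documented TaskSnapshot fields.
type snapshot struct {
	ID       string `json:"id"`
	Name     string `json:"name"`
	Type     string `json:"type"` // "task" or "group"
	Group    string `json:"group,omitempty"`
	Status   string `json:"status"`
	Progress int    `json:"progress,omitempty"`
	MaxValue int    `json:"maxValue,omitempty"`
	GroupID  string `json:"groupId,omitempty"`
}

// tasksByRun collects task snapshots under their parent run's stable id.
// Group snapshots and tasks without a GroupID are skipped.
func tasksByRun(snaps []snapshot) map[string][]snapshot {
	out := make(map[string][]snapshot)
	for _, s := range snaps {
		if s.Type == "task" && s.GroupID != "" {
			out[s.GroupID] = append(out[s.GroupID], s)
		}
	}
	return out
}

func main() {
	// Hypothetical payload: one group followed by its two tasks.
	payload := `[
	  {"id":"Database Migration","name":"Database Migration","type":"group","status":"running","groupId":"run-1"},
	  {"id":"t-1","name":"Migrate Users","type":"task","group":"Database Migration","status":"success","groupId":"run-1"},
	  {"id":"t-2","name":"Migrate Orders","type":"task","group":"Database Migration","status":"running","progress":3,"maxValue":12,"groupId":"run-1"}
	]`

	var snaps []snapshot
	if err := json.Unmarshal([]byte(payload), &snaps); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for runID, tasks := range tasksByRun(snaps) {
		for _, t := range tasks {
			fmt.Printf("%s: %s [%s] %d/%d\n", runID, t.Name, t.Status, t.Progress, t.MaxValue)
		}
	}
}
```

Keying on GroupID rather than on the group name avoids collisions when two runs share a human-facing name.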

func SnapshotAll

func SnapshotAll(taskIDs ...string) []TaskSnapshot

SnapshotAll returns snapshots for all groups and their tasks. If taskIDs is non-empty, only groups whose name OR stable id matches are included (matching by id lets the registry/SSE drill into one run; matching by name preserves the legacy name-keyed callers).

func SnapshotByID

func SnapshotByID(id string) []TaskSnapshot

SnapshotByID returns the group + task snapshots for the run with the given stable id (not name). Returns nil when no such run exists.

func SnapshotGroup

func SnapshotGroup(g *Group) TaskSnapshot

SnapshotGroup creates a TaskSnapshot from a Group with aggregate child stats. The snapshot ID stays the group NAME for backward compatibility with the name-keyed Preact UI and JSONHandler; the stable id is carried separately in GroupID. Observing a terminal status here records finishedAt lazily.

func SnapshotTask

func SnapshotTask(t *Task, group *Group) TaskSnapshot

SnapshotTask creates a TaskSnapshot from a Task. group is the parent group, or nil for an ungrouped task; its name and id are recorded on the snapshot.

type Taskable

type Taskable interface {
	GetTask() *Task
}

Taskable represents objects that can return a Task

type TypedGroup

type TypedGroup[T any] struct {
	*Group
}

func StartGroup

func StartGroup[T any](name string, opts ...TaskGroupOption) TypedGroup[T]

StartGroup creates and starts tracking a new task group

func (TypedGroup[T]) Add

func (g TypedGroup[T]) Add(name string, taskFunc func(flanksourceContext.Context, *Task) (T, error), opts ...Option) TypedTask[T]

Add creates a task named name that runs taskFunc, adds it to this group, and returns it as a TypedTask

func (*TypedGroup[T]) Duration

func (g *TypedGroup[T]) Duration() time.Duration

Duration returns the total duration from first start to last completion

func (TypedGroup[T]) GetResults

func (g TypedGroup[T]) GetResults() (map[TypedTask[T]]T, error)

GetResults waits for all tasks in the group and returns typed results

func (TypedGroup[T]) IsGroup

func (g TypedGroup[T]) IsGroup() bool

IsGroup returns true for Group

func (*TypedGroup[T]) WaitFor

func (g *TypedGroup[T]) WaitFor() *WaitResult

WaitFor waits for all child items to complete and returns aggregate results. This version handles dynamically added tasks by continuously checking for new tasks.

type TypedTask

type TypedTask[T any] struct {
	*Task
}

TypedTask provides typed access to task results

func StartTask

func StartTask[T any](name string, taskFunc func(flanksourceContext.Context, *Task) (T, error), opts ...Option) TypedTask[T]

func (TypedTask[T]) GetResult

func (t TypedTask[T]) GetResult() (T, error)

GetResult retrieves the typed result from a TypedTask
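
One way a typed wrapper like this could sit on top of an untyped result is shown below. It is an illustration of the pattern only, not the package's implementation: task and typedTask are local stand-ins, and returning an error on a type mismatch is an assumption.

```go
package main

import "fmt"

// task is a stand-in for an untyped task that stores its result as
// interface{} alongside an error.
type task struct {
	result interface{}
	err    error
}

// GetResult returns the stored untyped result and error.
func (t *task) GetResult() (interface{}, error) { return t.result, t.err }

// typedTask embeds *task and shadows GetResult with a typed version.
type typedTask[T any] struct {
	*task
}

// GetResult asserts the stored result to T. It returns the task's own
// error first, and a descriptive error if the stored value is not a T.
func (t typedTask[T]) GetResult() (T, error) {
	var zero T
	raw, err := t.task.GetResult()
	if err != nil {
		return zero, err
	}
	v, ok := raw.(T)
	if !ok {
		return zero, fmt.Errorf("result is %T, not %T", raw, zero)
	}
	return v, nil
}

func main() {
	good := typedTask[int]{&task{result: 42}}
	n, err := good.GetResult()
	fmt.Println(n, err) // 42 <nil>

	bad := typedTask[string]{&task{result: 42}}
	_, err = bad.GetResult()
	fmt.Println(err) // result is int, not string
}
```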

type WaitResult

type WaitResult struct {
	Error        error
	Status       Status
	Duration     time.Duration
	TaskCount    int // Number of individual tasks (1 for Task, N for TaskGroup)
	SuccessCount int // Number of successful tasks
	FailureCount int // Number of failed tasks
	WarningCount int // Number of tasks with warnings
}

WaitResult contains unified result information
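
The counters suggest that a group's result is folded from its children. The sketch below shows one plausible fold over child statuses. The precedence used for the overall status (failed over warning over success) is an assumption, the Error and Duration fields are left out, and aggregate is a hypothetical helper.

```go
package main

import "fmt"

// status and the constants mirror a subset of the documented Status values.
type status string

const (
	statusSuccess status = "success"
	statusFailed  status = "failed"
	statusWarning status = "warning"
)

// waitResult mirrors the counting fields of the documented WaitResult.
type waitResult struct {
	Status       status
	TaskCount    int
	SuccessCount int
	FailureCount int
	WarningCount int
}

// aggregate folds child statuses into one result. Statuses other than
// success, failed, and warning count toward TaskCount only.
func aggregate(children []status) waitResult {
	r := waitResult{Status: statusSuccess, TaskCount: len(children)}
	for _, s := range children {
		switch s {
		case statusSuccess:
			r.SuccessCount++
		case statusFailed:
			r.FailureCount++
		case statusWarning:
			r.WarningCount++
		}
	}
	if r.FailureCount > 0 {
		r.Status = statusFailed
	} else if r.WarningCount > 0 {
		r.Status = statusWarning
	}
	return r
}

func main() {
	r := aggregate([]status{statusSuccess, statusWarning, statusSuccess})
	fmt.Println(r.Status, r.TaskCount, r.SuccessCount, r.FailureCount, r.WarningCount) // warning 3 2 0 1
}
```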

type Waitable

type Waitable interface {
	Name() string
	Status() Status
	WaitFor() *WaitResult
	Context() context.Context
	Cancel()
	Duration() time.Duration
	IsGroup() bool
}

Waitable represents something that can be waited on (Task or Group)
