Documentation
¶
Overview ¶
Package task provides small, standard-library flavored background task primitives.
Design highlights ¶
- Manager: holds tasks, starts schedulers, and coordinates graceful shutdown.
- Trigger task: runs on demand via Handle.Trigger/TryTrigger/TriggerAndWait.
- Every task: runs periodically, with either fixed-delay or fixed-rate scheduling.
- Overlap policy: Skip or Merge (bounded; no unbounded queue).
- Panic/error reporting: uses safego-style handlers and tags; by default reports to stderr.
Lifecycle ¶
Manager must be started explicitly:
m := task.NewManager()
h, _ := m.Add(task.Every(10*time.Second, work), task.WithName("refresh"))
_ = m.Start(ctx)
defer m.Shutdown(context.Background())
Start is not idempotent: calling Start more than once returns ErrAlreadyStarted.
Shutdown is safe to call even if Start was never called; it marks all registered tasks as stopped for observability.
Shutdown cancels the manager context and waits for all internal goroutines (schedulers + runs) to exit. During/after Shutdown:
- Add returns ErrClosed
- Trigger/TryTrigger/TriggerAndWait are no-ops (TryTrigger=false; TriggerAndWait=ErrClosed)
Triggering before Start is also a no-op (TryTrigger=false; TriggerAndWait=ErrNotRunning).
Trigger vs Every ¶
Trigger tasks run when triggered:
h, _ := m.Add(task.Trigger(func(ctx context.Context) error {
return rebuildIndex(ctx)
}), task.WithName("rebuild-index"))
h.Trigger() // fire-and-forget
By default, Trigger uses OverlapMerge (maxConcurrent defaults to 1).
Every tasks run periodically:
_, _ = m.Add(task.Every(10*time.Second, refreshCache),
task.WithName("cache-refresh"),
)
By default, Every uses OverlapSkip (maxConcurrent defaults to 1).
By default, Every does NOT run immediately on Start (first run happens after one interval). Use WithStartImmediately(true) to run immediately.
Fixed-delay vs fixed-rate ¶
EveryFixedDelay (default): next run is scheduled after a run finishes, then waits interval.
Note: fixed-delay uses the completion time of the latest run, regardless of whether the run was triggered manually (Handle.Trigger) or scheduled. In other words, manual triggers also "reset" the delay window: the next fixed-delay tick will be interval after the most recent completion. If a tick happens while a run is still in-flight and the overlap policy is OverlapSkip, that tick is dropped; the scheduler still waits for the next completion to compute the following tick.
EveryFixedRate: schedules run opportunities aligned to a base time and interval, but it never "catches up" by emitting multiple missed ticks; it only schedules the next tick.
Base time:
- If a task is added before Manager.Start, base time is the Start time.
- If a task is added after Manager.Start, base time is the Add time.
Overlap policies (Skip / Merge) ¶
Under contention (max concurrency reached), run opportunities are handled as:
- OverlapSkip: drop the opportunity (TriggerAndWait returns ErrSkipped)
- OverlapMerge: coalesce all overlapping opportunities into one pending run
This keeps behavior bounded; task does not provide unbounded queues.
Hooks ¶
Manager and task options may provide OnRunStart/OnRunFinish hooks.
Hooks are called synchronously on the task execution path. They must be fast and must not block (avoid network I/O and long computations). If you need asynchronous processing, start your own goroutine or send to a buffered channel from the hook.
TriggerAndWait ¶
TriggerAndWait is useful for admin/ops endpoints or startup "run once now":
if err := h.TriggerAndWait(ctx); err != nil {
// ErrNotRunning / ErrClosed / ErrSkipped / ErrPanicked / ctx.Err() / or the task's error
}
Observability ¶
Task status can be observed via Handle.Status() or Manager.Snapshot(), which is designed for consumption by an ops layer:
snap := m.Snapshot()
if st, ok := snap.Get("cache-refresh"); ok {
_ = st.LastError
_ = st.NextRun
}
Note on Status.LastError: LastError is updated only when a run fails (or panics), and it is not cleared on success. Treat it as "last failure" rather than "last run error"; use the timestamps/counters to interpret recency and outcome.
Names and lookup ¶
Task names are optional. If a task is named (WithName), the name is:
- normalized by strings.TrimSpace
- validated against [A-Za-z0-9._-]
- unique within the Manager
Named tasks can be looked up by name:
h, ok := m.Lookup("cache-refresh")
_ = h
Index ¶
- Variables
- type EveryMode
- type Func
- type Handle
- type Manager
- func (m *Manager) Add(t Task, opts ...Option) (Handle, error)
- func (m *Manager) Lookup(name string) (Handle, bool)
- func (m *Manager) MustAdd(t Task, opts ...Option) Handle
- func (m *Manager) Shutdown(ctx context.Context) error
- func (m *Manager) Snapshot() Snapshot
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Wait()
- type ManagerOption
- func WithManagerErrorHandler(h safego.ErrorHandler) ManagerOption
- func WithManagerOnRunFinish(fn func(info RunFinishInfo)) ManagerOption
- func WithManagerOnRunStart(fn func(info RunStartInfo)) ManagerOption
- func WithManagerPanicHandler(h safego.PanicHandler) ManagerOption
- func WithManagerReportContextCancel(report bool) ManagerOption
- type Option
- func WithErrorHandler(h safego.ErrorHandler) Option
- func WithEveryMode(mode EveryMode) Option
- func WithMaxConcurrent(n int) Option
- func WithName(name string) Option
- func WithOnRunFinish(fn func(info RunFinishInfo)) Option
- func WithOnRunStart(fn func(info RunStartInfo)) Option
- func WithOverlapPolicy(p OverlapPolicy) Option
- func WithPanicHandler(h safego.PanicHandler) Option
- func WithReportContextCancel(report bool) Option
- func WithStartImmediately(v bool) Option
- func WithTags(tags ...safego.Tag) Option
- type OverlapPolicy
- type RunFinishInfo
- type RunKind
- type RunStartInfo
- type Runner
- type Snapshot
- type State
- type Status
- type Task
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrAlreadyStarted is returned by Start when called more than once. ErrAlreadyStarted = errors.New("task: manager already started") // ErrClosed is returned when the manager is shutting down or already stopped. ErrClosed = errors.New("task: manager closed") // ErrNotRunning is returned when an operation requires a running manager. ErrNotRunning = errors.New("task: manager not running") // ErrSkipped indicates a run opportunity was dropped due to OverlapSkip. ErrSkipped = errors.New("task: trigger skipped") // ErrPanicked indicates a run panicked (panic is recovered and reported). ErrPanicked = errors.New("task: run panicked") // ErrInvalidName is returned by Add when a task name is invalid. // // Name rules: // - name is optional (empty means unnamed) // - non-empty name must match [A-Za-z0-9._-] // - name is normalized by strings.TrimSpace before validation ErrInvalidName = errors.New("task: invalid name") // ErrDuplicateName is returned by Add when a non-empty task name is already registered. // // Names are unique within a manager (after normalization). ErrDuplicateName = errors.New("task: duplicate name") )
Functions ¶
This section is empty.
Types ¶
type EveryMode ¶
type EveryMode int
EveryMode controls the scheduling semantics of an Every task.
const ( // EveryFixedDelay schedules the next run after a run finishes, then waits interval. EveryFixedDelay EveryMode = iota // EveryFixedRate schedules run opportunities aligned to a base time and interval. // It never "catches up" by emitting multiple missed ticks; it only schedules the next tick. EveryFixedRate )
type Func ¶
Func is the user-provided function executed by a task.
Returning a non-nil error is reported via the configured error handler (unless filtered), and may also be observed via TriggerAndWait.
type Handle ¶
type Handle interface {
// Name returns the configured name (may be empty).
Name() string
// Trigger requests a run opportunity. It is equivalent to calling TryTrigger and
// ignoring the return value.
Trigger()
// TryTrigger requests a run opportunity and reports whether it was accepted.
TryTrigger() bool
// TriggerAndWait requests a run opportunity and waits for its completion.
//
// Errors:
// - ErrNotRunning: Manager not started yet.
// - ErrClosed: Manager is shutting down or already stopped.
// - ErrSkipped: OverlapSkip drops this trigger due to concurrency.
// - ctx.Err(): ctx canceled/timeout while waiting.
//
// If the run panicked, TriggerAndWait returns ErrPanicked.
// If the run returned an error (and it was not filtered), that error is returned.
//
// If ctx is nil, it is treated as context.Background().
TriggerAndWait(ctx context.Context) error
// Status returns a snapshot of the task's current status.
Status() Status
}
Handle is a registered task handle.
Triggering before Start is a no-op (TryTrigger returns false). Triggering during/after Shutdown is a no-op (TryTrigger returns false).
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates tasks and their lifecycles.
It is safe for concurrent use.
The zero value is ready to use with default configuration. To apply ManagerOption (hooks/handlers), use NewManager.
Example (ShutdownWithoutStart) ¶
package main
import (
"context"
"fmt"
"github.com/evan-idocoding/zkit/rt/task"
)
func main() {
m := task.NewManager()
h := m.MustAdd(task.Trigger(func(context.Context) error { return nil }), task.WithName("job"))
_ = m.Shutdown(context.Background())
fmt.Println(h.Status().State)
}
Output: stopped
Example (Snapshot) ¶
package main
import (
"context"
"fmt"
"github.com/evan-idocoding/zkit/rt/task"
)
func main() {
m := task.NewManager()
h := m.MustAdd(task.Trigger(func(context.Context) error { return nil }), task.WithName("job"))
_ = m.Start(context.Background())
defer m.Shutdown(context.Background())
_ = h.TriggerAndWait(context.Background())
snap := m.Snapshot()
st, ok := snap.Get("job")
fmt.Println(ok, st.RunCount, st.SuccessCount, st.FailCount, st.CanceledCount)
}
Output: true 1 1 0 0
Example (TriggerAndWait) ¶
package main
import (
"context"
"fmt"
"github.com/evan-idocoding/zkit/rt/task"
)
func main() {
m := task.NewManager()
h := m.MustAdd(task.Trigger(func(context.Context) error {
fmt.Println("ran")
return nil
}), task.WithName("rebuild-index"))
_ = m.Start(context.Background())
defer m.Shutdown(context.Background())
_ = h.TriggerAndWait(context.Background())
}
Output: ran
func (*Manager) Add ¶
Add registers a task and returns its handle.
It can be called before or after Start. If called during/after Shutdown, it returns ErrClosed.
func (*Manager) Lookup ¶
Lookup finds a task handle by name.
Name is normalized by strings.TrimSpace. Empty names are not indexed and always return (nil, false). Lookup is safe for concurrent use.
func (*Manager) MustAdd ¶
MustAdd is like Add but panics on error.
It is intended for initialization-time wiring where an error indicates a programming/configuration mistake (for example, invalid name or duplicate name).
func (*Manager) Shutdown ¶
Shutdown stops scheduling new work, cancels task contexts, and waits for running tasks to finish.
Shutdown is safe to call multiple times. It is also safe to call without a prior Start; in that case, it marks tasks as stopped for observability and returns nil.
If ctx is nil, it is treated as context.Background().
type ManagerOption ¶
type ManagerOption func(*managerConfig)
func WithManagerErrorHandler ¶
func WithManagerErrorHandler(h safego.ErrorHandler) ManagerOption
WithManagerErrorHandler sets a default error handler for tasks added to this manager.
func WithManagerOnRunFinish ¶
func WithManagerOnRunFinish(fn func(info RunFinishInfo)) ManagerOption
WithManagerOnRunFinish sets a global hook for all tasks in this manager.
func WithManagerOnRunStart ¶
func WithManagerOnRunStart(fn func(info RunStartInfo)) ManagerOption
WithManagerOnRunStart sets a global hook for all tasks in this manager.
func WithManagerPanicHandler ¶
func WithManagerPanicHandler(h safego.PanicHandler) ManagerOption
WithManagerPanicHandler sets a default panic handler for tasks added to this manager.
func WithManagerReportContextCancel ¶
func WithManagerReportContextCancel(report bool) ManagerOption
WithManagerReportContextCancel sets the default reportContextCancel for tasks added to this manager.
type Option ¶
type Option func(*taskConfig)
func WithErrorHandler ¶
func WithErrorHandler(h safego.ErrorHandler) Option
WithErrorHandler sets the error handler. If not set, errors are reported to stderr by default.
func WithEveryMode ¶
WithEveryMode sets the scheduling mode for an Every task. Default is EveryFixedDelay.
func WithMaxConcurrent ¶
WithMaxConcurrent sets the max concurrent runs for a task.
If n <= 0, Add panics (configuration error).
func WithName ¶
WithName sets a human-friendly task name.
Notes:
- Name is optional (empty means unnamed).
- Name is normalized by strings.TrimSpace.
- Non-empty names must match [A-Za-z0-9._-].
- Non-empty names are unique within a Manager; Add returns ErrDuplicateName on duplicates.
func WithOnRunFinish ¶
func WithOnRunFinish(fn func(info RunFinishInfo)) Option
WithOnRunFinish sets a hook to observe run finishes. Hooks are called synchronously.
func WithOnRunStart ¶
func WithOnRunStart(fn func(info RunStartInfo)) Option
WithOnRunStart sets a hook to observe run starts. Hooks are called synchronously.
func WithOverlapPolicy ¶
func WithOverlapPolicy(p OverlapPolicy) Option
WithOverlapPolicy sets how run opportunities are handled under contention.
func WithPanicHandler ¶
func WithPanicHandler(h safego.PanicHandler) Option
WithPanicHandler sets the panic handler. If not set, panics are reported to stderr by default.
func WithReportContextCancel ¶
WithReportContextCancel controls whether context.Canceled and context.DeadlineExceeded are reported.
func WithStartImmediately ¶
WithStartImmediately controls whether an Every task runs immediately upon Start/Add. Default is false.
type OverlapPolicy ¶
type OverlapPolicy int
OverlapPolicy controls how overlapping run opportunities are handled.
const ( // OverlapSkip drops a run opportunity if max concurrency is reached. OverlapSkip OverlapPolicy = iota // OverlapMerge merges all overlapping run opportunities into a single pending run. OverlapMerge )
func (OverlapPolicy) String ¶
func (p OverlapPolicy) String() string
type RunFinishInfo ¶
type RunFinishInfo struct {
Name string
Tags []safego.Tag
Kind RunKind
ScheduledAt time.Time
StartedAt time.Time
FinishedAt time.Time
Duration time.Duration
Err string
Panicked bool
}
RunFinishInfo is passed to OnRunFinish hooks.
type RunStartInfo ¶
type RunStartInfo struct {
Name string
Tags []safego.Tag
Kind RunKind
ScheduledAt time.Time // non-zero for schedule-based runs (best-effort).
StartedAt time.Time
}
RunStartInfo is passed to OnRunStart hooks.
type Snapshot ¶
type Snapshot struct {
Tasks []Status
}
Snapshot is a point-in-time view of all tasks in a Manager.
type Status ¶
type Status struct {
Name string
Tags []safego.Tag
State State
Running int
// Pending is true when OverlapMerge has a pending run opportunity.
// Pending does not imply the task is currently running; check State/Running for that.
Pending bool
RunCount uint64
FailCount uint64
SuccessCount uint64
// CanceledCount counts context cancellation / deadline exceeded that is filtered by
// reportContextCancel=false (i.e. not reported, and not treated as success or failure).
CanceledCount uint64
LastStarted time.Time
LastFinished time.Time
LastSuccess time.Time
LastDuration time.Duration
// LastError is the most recent run error for this task *when the run failed*.
//
// Important semantics:
// - LastError is updated only when a run fails (non-nil error) or panics ("panic").
// - LastError is NOT cleared on success.
// - Therefore, LastError represents "last failure" rather than "last run error".
//
// Use LastFinished/LastSuccess and the counters (FailCount/SuccessCount/CanceledCount)
// to interpret recency and outcome.
LastError string
// NextRun is the next scheduled time (Every tasks only). Zero for Trigger tasks.
NextRun time.Time
}
Status is a task state snapshot.
type Task ¶
type Task interface {
// contains filtered or unexported methods
}
Task is a task definition that can be registered to a Manager.