jack

package module
v0.0.0-...-99da62e Latest
Warning

This package is not in the latest version of its module.

Published: Apr 11, 2026 License: MIT Imports: 21 Imported by: 5

README

Jack

Production-grade concurrency toolkit for Go

Jack provides the missing pieces for building robust, observable concurrent systems. No magic, no reflection hacks—just solid patterns you'd otherwise write yourself.

Why This Exists

Go's concurrency primitives are excellent, but production systems need more:

  • Panic recovery that doesn't crash your entire process
  • Backpressure when queues fill up
  • Visibility into what your goroutines are actually doing
  • Graceful shutdown that finishes in-flight work
  • Health checks that degrade and accelerate automatically

Jack fills these gaps without getting in your way.

What's Inside

Pool

Fixed-size worker pool with backpressure. Tasks queue when workers are busy. Submissions fail fast when the queue is full.

pool := jack.NewPool(5, jack.PoolingWithQueueSize(100))
pool.Submit(jack.Func(func() error {
    // work
    return nil
}))
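The fail-fast behavior described above can be sketched with nothing but a buffered channel. This is a minimal illustration, not Jack's implementation: `boundedQueue`, `trySubmit`, and `errQueueFull` are hypothetical names introduced here, and no workers drain the queue in this demo.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a local stand-in for a "queue is full" sentinel error.
var errQueueFull = errors.New("task queue is full")

// boundedQueue is a hypothetical type: a buffered channel of tasks.
type boundedQueue struct {
	tasks chan func() error
}

func newBoundedQueue(size int) *boundedQueue {
	return &boundedQueue{tasks: make(chan func() error, size)}
}

// trySubmit enqueues without blocking; it fails fast when the buffer is full.
func (q *boundedQueue) trySubmit(task func() error) error {
	select {
	case q.tasks <- task:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	q := newBoundedQueue(2) // nothing drains the queue in this demo
	for i := 0; i < 3; i++ {
		err := q.trySubmit(func() error { return nil })
		fmt.Printf("submit %d: %v\n", i, err)
	}
}
```

With a buffer of 2 and no consumer, the third submission is rejected instead of blocking the caller.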

Future/Promise

Type-safe async computation with composition. Wait for results, chain transformations, recover from errors.

f := jack.Async(func() (string, error) {
    return fetchUser()
})

f.Then(ctx, func(user string) (any, error) {
    return fetchProfile(user)
}).Await()

Doctor

Health check scheduler that degrades and accelerates. Tracks consecutive failures, applies jitter, notifies observers.

doctor := jack.NewDoctor(jack.DoctorWithMaxConcurrent(10))
doctor.Add(jack.NewPatient(jack.PatientConfig{
    ID:          "database",
    Interval:    10 * time.Second,
    MaxFailures: 3,
    Check:       checkDB,
    OnStateChange: func(e jack.PatientEvent) {
        if e.State == jack.PatientFailed {
            triggerAlert(e.ID)
        }
    },
}))

Debouncer

Rate-limit rapid calls. Execute only after a quiet period or when thresholds are hit.

db := jack.NewDebouncer(
    jack.WithDebounceDelay(500*time.Millisecond),
    jack.WithDebounceMaxCalls(10),
)
db.Do(expensiveOperation)

Looper

Background task with exponential backoff and jitter. Perfect for reconciliation loops.

looper := jack.NewLooper(reconcile,
    jack.WithLooperInterval(5*time.Second),
    jack.WithLooperBackoff(true),
    jack.WithLooperMaxInterval(time.Minute),
)
looper.Start()
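As a rough illustration of exponential backoff with jitter, the interval arithmetic might look like the sketch below. The doubling factor, the 10% jitter window, and the helper names `nextInterval` and `withJitter` are assumptions made for this example; the Looper's actual policy may differ.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// nextInterval doubles the current interval and caps it at max.
func nextInterval(cur, max time.Duration) time.Duration {
	next := cur * 2
	if next > max || next <= 0 { // next <= 0 guards against overflow
		return max
	}
	return next
}

// withJitter returns d plus a random offset in [0, d/10].
func withJitter(d time.Duration) time.Duration {
	if d <= 0 {
		return d
	}
	return d + time.Duration(rand.Int63n(int64(d)/10+1))
}

func main() {
	interval := 5 * time.Second
	for i := 0; i < 5; i++ {
		fmt.Println(interval, "->", withJitter(interval))
		interval = nextInterval(interval, time.Minute)
	}
}
```

Starting from 5s with a one-minute cap, the base interval runs 5s, 10s, 20s, 40s, 1m0s; the jittered value varies per run.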

Shutdown

Graceful termination with signal handling. Register cleanup in LIFO order.

sd := jack.NewShutdown(jack.ShutdownWithTimeout(30*time.Second))
sd.Register(db.Close)
sd.Register(cache.Flush)
sd.Wait() // blocks until SIGTERM
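LIFO ordering means the last registered cleanup runs first, so a cache registered after the database is flushed before the database closes. A minimal sketch of just that ordering follows; signal handling and timeouts are omitted, and `cleanupStack` is a hypothetical type, not part of Jack.

```go
package main

import "fmt"

// cleanupStack is a hypothetical type that runs registered functions in LIFO order.
type cleanupStack struct {
	fns []func() error
}

func (s *cleanupStack) register(fn func() error) { s.fns = append(s.fns, fn) }

// run calls the functions last-registered-first. It keeps going after a
// failure and returns the first error it saw.
func (s *cleanupStack) run() error {
	var first error
	for i := len(s.fns) - 1; i >= 0; i-- {
		if err := s.fns[i](); err != nil && first == nil {
			first = err
		}
	}
	return first
}

func main() {
	var s cleanupStack
	s.register(func() error { fmt.Println("close database"); return nil })
	s.register(func() error { fmt.Println("flush cache"); return nil })
	_ = s.run() // prints "flush cache", then "close database"
}
```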

Reaper

TTL expiration with min-heap and sharding.

reaper := jack.NewReaper(5*time.Minute,
    jack.ReaperWithHandler(func(ctx context.Context, id string) {
        cleanup(id)
    }),
)
reaper.Touch("session-123")
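To show why a min-heap suits TTL expiration, here is a single-goroutine sketch built on the standard `container/heap` package. It is not the Reaper's code: `ttlSet`, `touch`, and `expired` are hypothetical names, there is no locking or sharding, and refreshed IDs are handled lazily by skipping stale heap entries.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

type entry struct {
	id      string
	expires time.Time
}

// expiryHeap is a min-heap ordered by expiry time.
type expiryHeap []entry

func (h expiryHeap) Len() int           { return len(h) }
func (h expiryHeap) Less(i, j int) bool { return h[i].expires.Before(h[j].expires) }
func (h expiryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *expiryHeap) Push(x any)        { *h = append(*h, x.(entry)) }
func (h *expiryHeap) Pop() any {
	old := *h
	e := old[len(old)-1]
	*h = old[:len(old)-1]
	return e
}

// ttlSet is a hypothetical TTL tracker (no locking, no sharding).
type ttlSet struct {
	ttl    time.Duration
	latest map[string]time.Time // most recent expiry per id
	queue  expiryHeap
}

func newTTLSet(ttl time.Duration) *ttlSet {
	return &ttlSet{ttl: ttl, latest: make(map[string]time.Time)}
}

// touch (re)starts the TTL for id; older heap entries for it become stale.
func (s *ttlSet) touch(id string, now time.Time) {
	exp := now.Add(s.ttl)
	s.latest[id] = exp
	heap.Push(&s.queue, entry{id: id, expires: exp})
}

// expired pops every id whose latest expiry is at or before now.
func (s *ttlSet) expired(now time.Time) []string {
	var out []string
	for s.queue.Len() > 0 && !s.queue[0].expires.After(now) {
		e := heap.Pop(&s.queue).(entry)
		if exp, ok := s.latest[e.id]; ok && exp.Equal(e.expires) {
			delete(s.latest, e.id)
			out = append(out, e.id)
		}
	}
	return out
}

func demo() []string {
	start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	s := newTTLSet(5 * time.Minute)
	s.touch("session-123", start)
	s.touch("session-456", start.Add(time.Minute))
	s.touch("session-123", start.Add(4*time.Minute)) // refresh before expiry
	return s.expired(start.Add(7 * time.Minute))
}

func main() {
	fmt.Println(demo()) // prints "[session-456]"
}
```

Only the earliest deadline needs checking on each pass, which is what keeps expiry cheap when most items are still alive.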

Lifetime

Scheduled callbacks with keep-alive resets.

lm := jack.NewLifetime()
lm.ScheduleTimed(ctx, "heartbeat", func(ctx context.Context, id string) {
    markDead(id)
}, 30*time.Second)
lm.ResetTimed("heartbeat") // extend on activity

Runner, Scheduler, Group

Single-worker queue, cron-style scheduling, and coordinated goroutine groups with error collection.

Safely

Context-aware mutex with panic recovery.

var mu jack.Safely
err := mu.SafeCtx(ctx, func() error {
    // critical section that respects context cancellation
    return nil
})

Observability

Every component emits events you can hook into:

obs := jack.NewObservable[jack.Event](10)
obs.Add(myObserver)

pool := jack.NewPool(5, jack.PoolingWithObservable(obs))

Doctor, Scheduler, and Looper have their own event types for metrics and alerting.

Error Handling

Panics become *jack.CaughtPanic with stack traces. No silent failures.

err := jack.Safe(func() error {
    panic("boom")
})
if cp, ok := err.(*jack.CaughtPanic); ok {
    log.Printf("panic: %v\n%s", cp.Value, cp.Stack)
}

When To Use What

Problem                                          Use
Process many independent tasks concurrently      Pool
Need result from async operation                 Future
Run periodic health checks with degradation      Doctor
Rate-limit bursty calls                          Debouncer
Background loop with backoff                     Looper
Graceful shutdown with cleanup ordering          Shutdown
Expire items after TTL                           Reaper
Schedule callbacks with keep-alive               Lifetime
Coordinate multiple goroutines, collect errors   Group
Sequential async processing                      Runner
Cron-style recurring tasks                       Scheduler
Safe locking with timeouts                       Safely

Testing

go test -v -race ./...

Race detector is your friend. Jack is race-free by design.

License

MIT

Documentation

Constants

const (
	// NoLimitCalls is the default for maxCalls, meaning no limit on the number of calls before execution.
	// Set to math.MaxInt to indicate unbounded call accumulation.
	NoLimitCalls = math.MaxInt
	// NoLimitWait is the default for maxWait, meaning no time limit before forced execution.
	// Set to time.Duration(math.MaxInt64) to disable the maximum wait enforcement.
	NoLimitWait = time.Duration(math.MaxInt64)
)

Variables

var (
	ErrFutureCanceled = fmt.Errorf("future was canceled")
	ErrFutureTimeout  = fmt.Errorf("future timed out")
)
var (
	// ErrPoolClosed indicates the worker pool has been closed.
	ErrPoolClosed = errors.New("pool has been closed")
	// ErrRunnerClosed indicates the runner has been closed.
	ErrRunnerClosed = errors.New("runner has been closed")
	// ErrSchedulerClosed indicates the scheduler has been closed.
	ErrSchedulerClosed = errors.New("scheduler has been closed")
	// ErrTaskTimeout indicates a task exceeded its execution timeout.
	ErrTaskTimeout = errors.New("task execution timed out")
	// ErrTaskPanic indicates a task panicked during execution.
	ErrTaskPanic = errors.New("task panicked during execution")
	// ErrQueueFull indicates the task queue is full and cannot accept new tasks.
	ErrQueueFull = errors.New("task queue is full")
	// ErrShutdownTimedOut indicates the pool shutdown exceeded its timeout.
	ErrShutdownTimedOut = errors.New("shutdown timed out")
)

Errors returned by the worker pool and task execution.

var (
	ErrSchedulerJobAlreadyRunning = errors.New("scheduler: job is already running")
	ErrSchedulerNotRunning        = errors.New("scheduler: job is not running or already stopped")
	ErrSchedulerPoolNil           = errors.New("scheduler: pool cannot be nil")
	ErrSchedulerNameMissing       = errors.New("scheduler: name cannot be empty")
)

Functions

func Go

func Go(f func() error) <-chan error

Go runs a function in a standalone goroutine with error and panic handling. It returns a buffered channel that receives the function’s error or a CaughtPanic. The channel is closed after the error or completion. Thread-safe via goroutine and channel operations. Example:

errCh := Go(func() error { return nil }) // Runs a function and returns error channel
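The description above (buffered channel, panic converted to an error, channel closed afterwards) can be approximated with the standard library alone. The sketch below is an assumption about the general shape, not the package's source; `goSafe` and `panicError` are hypothetical stand-ins for Go and CaughtPanic.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// panicError is a local stand-in for an error that wraps a recovered panic.
type panicError struct {
	value any
	stack []byte
}

func (p *panicError) Error() string { return fmt.Sprintf("panic: %v", p.value) }

// goSafe runs f in its own goroutine and reports its result on a buffered channel.
func goSafe(f func() error) <-chan error {
	ch := make(chan error, 1) // buffer of 1 so the goroutine never blocks on send
	go func() {
		defer close(ch) // runs last: deferred calls execute in reverse order
		defer func() {
			if r := recover(); r != nil {
				ch <- &panicError{value: r, stack: debug.Stack()}
			}
		}()
		if err := f(); err != nil {
			ch <- err
		}
	}()
	return ch
}

func main() {
	err := <-goSafe(func() error { panic("boom") })
	fmt.Println(err) // prints "panic: boom"

	err, ok := <-goSafe(func() error { return nil })
	fmt.Println(err, ok) // closed, empty channel: prints "<nil> false"
}
```

Because the channel is closed on completion, a receive after a successful run yields a nil error rather than blocking forever.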

func Logger

func Logger() *ll.Logger

Logger returns the default logger for the jack package. Thread-safe as it returns a pre-initialized logger. Example:

log := jack.Logger() // Retrieves the default logger

func Safe

func Safe(fn func() error) error

Safe executes fn with panic recovery. If fn panics, a *CaughtPanic error is returned. Otherwise, the error returned by fn is returned. If fn is nil, Safe will return a *CaughtPanic. Example:

err := Safe(func() error {
	// May panic or return an error
	return nil
})
if cp, ok := err.(*CaughtPanic); ok {
	fmt.Printf("Caught panic: %v\nStack: %s\n", cp.Value, cp.Stack)
}

func SafeCtx

func SafeCtx(ctx context.Context, fn func() error) error

SafeCtx executes fn with context support and panic recovery. The function fn will be interrupted if the context is canceled (e.g., via context.WithTimeout). If fn is nil, SafeCtx will return a *CaughtPanic. If ctx is nil, SafeCtx will panic when ctx.Err() is called. Example:

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := SafeCtx(ctx, func() error {
	// No need to check ctx
	time.Sleep(100 * time.Millisecond)
	return nil
})
// err will likely be context.DeadlineExceeded due to timeout
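One common way to get this behavior with the standard library is to run fn in a goroutine and race its result against ctx.Done(). The sketch below takes that approach; `runCtx` is a hypothetical helper, and whether SafeCtx works the same way internally is an assumption. Note that in this pattern the caller stops waiting on cancellation, but fn's goroutine keeps running until fn returns.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runCtx returns fn's result, or ctx.Err() if the context finishes first.
// fn itself is not stopped: its goroutine runs until fn returns.
func runCtx(ctx context.Context, fn func() error) error {
	done := make(chan error, 1) // buffered so the goroutine can exit after a timeout
	go func() {
		defer func() {
			if r := recover(); r != nil {
				done <- fmt.Errorf("panic: %v", r)
			}
		}()
		done <- fn()
	}()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

func demo() error {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return runCtx(ctx, func() error {
		time.Sleep(500 * time.Millisecond) // outlives the 50ms deadline
		return nil
	})
}

func main() {
	fmt.Println(demo()) // prints "context deadline exceeded"
}
```

If fn holds resources that must be released promptly, passing the context into fn and checking it there is still the more reliable design.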

func SafeCtxNoStack

func SafeCtxNoStack(ctx context.Context, fn func() error) error

SafeCtxNoStack executes fn with context support, panic recovery, but no stack trace. The function fn will be interrupted if the context is canceled (e.g., via context.WithTimeout). If fn is nil, SafeCtxNoStack will return a *CaughtPanic (without stack). If ctx is nil, SafeCtxNoStack will panic when ctx.Err() is called. Example:

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := SafeCtxNoStack(ctx, func() error {
	// No need to check ctx
	time.Sleep(100 * time.Millisecond)
	return nil
})
// err will likely be context.DeadlineExceeded due to timeout

func SafeNoStack

func SafeNoStack(fn func() error) error

SafeNoStack executes fn with panic recovery but no stack trace. If fn panics, a *CaughtPanic error (without stack) is returned. Otherwise, fn's error. If fn is nil, SafeNoStack will return a *CaughtPanic. Example:

err := SafeNoStack(func() error {
	panic("error")
	return nil
})
if cp, ok := err.(*CaughtPanic); ok {
	fmt.Printf("Caught panic: %v, Stack: %v\n", cp.Value, cp.Stack == nil)
}

func Select

func Select[T any](ctx context.Context, futures ...*Future[T]) (int, T, error)

func VitalsWithRun

func VitalsWithRun(ctx context.Context, id string, operation Func, opts ...VitalsOption) error

VitalsWithRun creates a Vitals instance and executes an operation with it in one call. It accepts functional options to configure hooks before immediately running the operation. This convenience function combines NewVitals and Execute for simplified inline usage.

func VitalsWithRunCtx

func VitalsWithRunCtx(ctx context.Context, id string, operation FuncCtx, opts ...VitalsOption) error

VitalsWithRunCtx creates a Vitals instance and executes a context-aware operation in one call. It accepts functional options to configure hooks before immediately running the operation with context. This convenience function combines NewVitals and ExecuteCtx for simplified inline usage.

func WaitAll

func WaitAll[T any](futures ...*Future[T]) ([]T, error)

func WaitAny

func WaitAny[T any](futures ...*Future[T]) (int, T, error)

Types

type Callback

type Callback func(id string)

Callback defines a callback function without error return for post-execution logic. It accepts a unique identifier string for the operation that completed. Use this type for notifications, cleanup, or metrics that do not affect flow control.

type CallbackCtx

type CallbackCtx func(ctx context.Context, id string)

CallbackCtx defines a context-aware callback function without error return. It accepts a context for cancellation and an identifier for the completed operation. Use this type when callbacks need to coordinate with context deadlines or values.

type Caller

type Caller struct {
	// contains filtered or unexported fields
}

Caller defines a group of calls that execute together

func NewCaller

func NewCaller(calls ...Func) *Caller

NewCaller creates a new Caller.

func (*Caller) Append

func (cg *Caller) Append(calls ...Func) *Caller

Append adds more calls to the group

func (*Caller) Execute

func (cg *Caller) Execute() Func

Same as Run but ignores errors.

func (*Caller) Func

func (cg *Caller) Func() Func

Func converts Caller to a Func

func (*Caller) Run

func (cg *Caller) Run() error

Run runs all calls in the group

type CaughtPanic

type CaughtPanic struct {
	Value interface{} // The value passed to panic()
	Stack []byte      // The stack trace (may be empty if not collected)
}

CaughtPanic represents a panic that was caught during execution.

Package jack provides utilities for safe, context-aware function execution with mutex protection. It includes methods to execute functions with panic recovery, context cancellation support, and mutex locking, eliminating the need for verbose boilerplate when handling timeouts or cancellations.

func (*CaughtPanic) Error

func (c *CaughtPanic) Error() string

Error implements the error interface.

func (*CaughtPanic) String

func (c *CaughtPanic) String() string

String provides a formatted string representation of the panic.

func (*CaughtPanic) Unwrap

func (c *CaughtPanic) Unwrap() error

Unwrap provides compatibility with errors.Is/As.
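An Unwrap method matters once errors get wrapped: a direct type assertion such as `err.(*CaughtPanic)` stops matching after `fmt.Errorf("...: %w", err)`, while errors.As and errors.Is keep working. The sketch below demonstrates this with a local stand-in type; `caught` and `safe` are hypothetical, and the choice to unwrap to the panic value when it is an error is an assumption for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

var errBoom = errors.New("boom")

// caught is a local stand-in for a recovered-panic error type.
type caught struct {
	Value any
}

func (c *caught) Error() string { return fmt.Sprintf("panic: %v", c.Value) }

// Unwrap exposes the panic value when it is itself an error
// (an assumption about how such a type might behave).
func (c *caught) Unwrap() error {
	err, _ := c.Value.(error)
	return err
}

// safe runs fn and converts a panic into a *caught error.
func safe(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = &caught{Value: r}
		}
	}()
	return fn()
}

func demo() (isBoom, asCaught bool) {
	err := fmt.Errorf("task failed: %w", safe(func() error { panic(errBoom) }))
	var c *caught
	return errors.Is(err, errBoom), errors.As(err, &c)
}

func main() {
	fmt.Println(demo()) // prints "true true"
}
```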

type Cycle

type Cycle func(*Scheduling)

Cycle is a functional option type for configuring a Scheduler. It uses the functional options pattern to allow flexible and extensible configuration of scheduler settings.

func SchedulingWithObservable

func SchedulingWithObservable(obs Observable[Schedule]) Cycle

SchedulingWithObservable returns a Cycle to set an observable for events.

func SchedulingWithRetry

func SchedulingWithRetry(count int, backoff time.Duration) Cycle

SchedulingWithRetry returns a Cycle to configure retry attempts on queue full errors.

type Debouncer

type Debouncer struct {
	// contains filtered or unexported fields
}

Debouncer groups calls to a function, executing only after a period of inactivity or when certain thresholds (max calls or max wait) are met. It uses a mutex for thread-safety and timers for delay and maxWait enforcement.

func NewDebouncer

func NewDebouncer(options ...DebouncerOption) *Debouncer

NewDebouncer creates a new Debouncer instance configured with the given functional options. The WithDebounceDelay option is required; other options are optional with sensible defaults. Timers are initialized but stopped until first use; limits default to no restrictions.

func (*Debouncer) Cancel

func (d *Debouncer) Cancel()

Cancel prevents a pending debounced function from executing by clearing internal state. It stops all active timers and resets call counters without invoking the pending function. Safe to call multiple times or when no function is pending; has no effect if already closed.

func (*Debouncer) Do

func (d *Debouncer) Do(fn func())

Do schedules the given function to execute after the configured delay period of inactivity. Each call resets the delay timer and updates the pending function to the latest provided. If maxCalls or maxWait thresholds are met, execution happens immediately without waiting.
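The reset-on-every-call behavior can be sketched with time.AfterFunc. This is a simplified trailing-edge debouncer written for illustration only: the `debouncer` type and its `do` and `fire` methods are hypothetical, and the maxCalls and maxWait thresholds are not modeled.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// debouncer is a hypothetical, minimal trailing-edge debouncer.
type debouncer struct {
	mu    sync.Mutex
	delay time.Duration
	timer *time.Timer
	fn    func()
}

// do remembers the latest fn and restarts the quiet-period timer.
func (d *debouncer) do(fn func()) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.fn = fn
	if d.timer != nil {
		d.timer.Stop()
	}
	d.timer = time.AfterFunc(d.delay, d.fire)
}

// fire runs the pending function once and clears it.
func (d *debouncer) fire() {
	d.mu.Lock()
	fn := d.fn
	d.fn = nil
	d.mu.Unlock()
	if fn != nil {
		fn()
	}
}

// demo makes five rapid calls and reports how many times the function ran.
func demo() int32 {
	var runs atomic.Int32
	d := &debouncer{delay: 200 * time.Millisecond}
	for i := 0; i < 5; i++ {
		d.do(func() { runs.Add(1) })
		time.Sleep(5 * time.Millisecond)
	}
	time.Sleep(600 * time.Millisecond) // wait out the quiet period
	return runs.Load()
}

func main() {
	fmt.Println("executions:", demo())
}
```

Five calls spaced 5ms apart collapse into a single execution once 200ms pass without a new call.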

func (*Debouncer) Flush

func (d *Debouncer) Flush()

Flush executes any pending debounced function immediately without waiting for timers. It stops all active timers, executes the function in a new goroutine, and resets state. If no function is currently pending, this method does nothing and returns safely.

func (*Debouncer) IsPending

func (d *Debouncer) IsPending() bool

IsPending returns true if a debounced function is currently waiting to be executed. This method checks the internal call counter while holding the mutex for thread safety. Useful for monitoring debouncer state without triggering any execution or side effects.

func (*Debouncer) Stop

func (d *Debouncer) Stop()

Stop permanently shuts down the debouncer and prevents any future function executions. It sets the closed flag, clears pending functions, and stops all active timers immediately. After calling Stop, any subsequent Do calls will be ignored and no operations will proceed.

type DebouncerOption

type DebouncerOption func(*Debouncer)

DebouncerOption is a functional option for configuring the Debouncer. These options allow customization of delay, maximum calls, and maximum wait time. Use WithDebounceDelay (required), WithDebounceMaxCalls, and WithDebounceMaxWait to set behaviors.

func WithDebounceDelay

func WithDebounceDelay(d time.Duration) DebouncerOption

WithDebounceDelay sets the delay period after the last call before execution. This is a required option; without it, the debouncer will not function correctly. The delay determines the inactivity period needed to trigger the debounced function.

func WithDebounceMaxCalls

func WithDebounceMaxCalls(count int) DebouncerOption

WithDebounceMaxCalls sets the maximum number of calls before immediate execution. By default, there is no limit on call accumulation using NoLimitCalls constant. Once this threshold is reached, the function executes regardless of remaining delay.

func WithDebounceMaxWait

func WithDebounceMaxWait(limit time.Duration) DebouncerOption

WithDebounceMaxWait sets the maximum wait time before forced function execution. This check happens on each Do call, so total wait could be up to maxWait plus delay. A separate timer ensures execution after maxWait even if no new calls occur.

func WithDebouncerPool

func WithDebouncerPool(pool *Pool) DebouncerOption

WithDebouncerPool sets a custom pool for executing debounced functions asynchronously. If not provided via this option, the default package-level pool will be used instead. This allows control over goroutine scheduling and resource management for executions.

type Doctor

type Doctor struct {
	// contains filtered or unexported fields
}

Doctor monitors patients with global and per-patient metrics.

func NewDoctor

func NewDoctor(opts ...DoctorOption) *Doctor

NewDoctor creates and initializes a new Doctor instance with the provided configuration options. It sets up the patient priority queue, starts the scheduler goroutine, and creates a default pool if needed. The returned Doctor is immediately active and ready to accept patient registrations via Add.

func (*Doctor) Add

func (d *Doctor) Add(p *Patient) error

Add registers a new patient with the Doctor, validating its configuration and scheduling its first check. If a patient with the same ID already exists, it is replaced and the old instance is marked removed. Returns an error if the patient lacks a required Check function or if registration fails.

func (*Doctor) GetState

func (d *Doctor) GetState(id string) (PatientState, bool)

GetState retrieves the current health state of a patient by ID along with an existence indicator. If the patient is not registered, it returns PatientUnknown and false for the existence flag. This method provides thread-safe read access to patient state without modifying any internal data.

func (*Doctor) Metrics

func (d *Doctor) Metrics() *DoctorMetrics

Metrics returns a reference to the Doctor's global metrics struct for monitoring operational statistics. All metric fields use atomic operations, allowing safe concurrent reads without additional locking. Use this to expose health check statistics to dashboards, alerts, or external monitoring systems.

func (*Doctor) Remove

func (d *Doctor) Remove(id string) bool

Remove unregisters a patient by ID, marking it as removed and preventing future scheduled checks. The method returns true if the patient was found and successfully removed, false otherwise. Removed patients are cleaned from the scheduler queue on the next scheduling cycle.

func (*Doctor) SetDegraded

func (d *Doctor) SetDegraded(id string, degraded bool)

SetDegraded manually overrides a patient's degradation state, forcing degraded or healthy status. If the patient ID is not found, the method logs a warning and returns without making changes. Use this for operational control, such as forcing maintenance mode or acknowledging known issues.

func (*Doctor) Stop

func (d *Doctor) Stop(id string) bool

Stop halts a specific patient by ID, marking it removed and preventing any further health checks. It returns true if the patient was found and stopped, false if the patient did not exist. This method is safe to call multiple times and ensures clean removal from the scheduler.

func (*Doctor) StopAll

func (d *Doctor) StopAll(timeout time.Duration)

StopAll gracefully shuts down the Doctor, stopping all patients and the scheduler goroutine. It uses an atomic flag to ensure idempotency and waits for in-flight operations with the given timeout. After calling StopAll, the Doctor cannot be restarted and all patient registrations are cleared.

type DoctorMetrics

type DoctorMetrics struct {
	PatientsTotal     atomic.Int64
	ChecksTotal       atomic.Uint64
	ChecksHealthy     atomic.Uint64
	ChecksDegraded    atomic.Uint64
	ChecksFailed      atomic.Uint64
	StateChanges      atomic.Uint64
	AcceleratedChecks atomic.Uint64
	ManualDegraded    atomic.Uint64
	Recoveries        atomic.Uint64
	Timeouts          atomic.Uint64
	PanicsRecovered   atomic.Uint64
	PoolSubmits       atomic.Uint64
	PoolSubmitFails   atomic.Uint64
}

DoctorMetrics tracks global operational metrics across all patients.

type DoctorOption

type DoctorOption func(*Doctor)

func DoctorWithGlobalTimeout

func DoctorWithGlobalTimeout(t time.Duration) DoctorOption

DoctorWithGlobalTimeout configures the default timeout for patient checks lacking individual settings. If the provided duration is zero or negative, the option is ignored and the default applies. This ensures checks do not hang indefinitely and helps maintain responsive health monitoring.

func DoctorWithLogger

func DoctorWithLogger(l *ll.Logger) DoctorOption

DoctorWithLogger assigns a namespaced logger instance for structured Doctor operation logging. If the provided logger is nil, the option has no effect and the default logger remains active. Use this to integrate Doctor logs with your application's logging infrastructure and levels.

func DoctorWithMaxConcurrent

func DoctorWithMaxConcurrent(n int) DoctorOption

DoctorWithMaxConcurrent sets the maximum number of concurrent health checks the Doctor can run. If the provided value is zero or negative, the option is ignored and the default applies. This limit controls resource usage and prevents system overload during intensive monitoring.

func DoctorWithObservable

func DoctorWithObservable(obs Observable[PatientEvent]) DoctorOption

DoctorWithObservable attaches an event observer to receive patient state change notifications. The observable receives PatientEvent structs whenever a patient's health status updates. Use this for external monitoring, logging, or triggering downstream actions based on health.

func DoctorWithPool

func DoctorWithPool(pool *Pool) DoctorOption

DoctorWithPool configures the Doctor to use a custom worker pool for check execution. If the provided pool is nil, the option has no effect and default pool creation applies. Use this to control concurrency limits and resource sharing across multiple Doctors.

func DoctorWithVerbose

func DoctorWithVerbose(v bool) DoctorOption

DoctorWithVerbose enables or disables detailed debug logging for Doctor operations and checks. When enabled, additional context like durations and errors are logged for troubleshooting. Use this during development or debugging, and disable in production for reduced log volume.

type Event

type Event struct {
	Type     string        // Type of event: "queued", "run", "done"
	TaskID   string        // Optional task identifier
	WorkerID string        // Identifier of the worker processing the task
	Time     time.Time     // Time of the event
	Duration time.Duration // Duration of task execution (for "done" events)
	Err      error         // Error, if any (for "done" events)
}

Event captures details of task execution for observability. Thread-safe for use in notification across goroutines.

type Func

type Func func() error

Func converts a function returning an error into a Task. Example:

pool.Submit(jack.Func(func() error { return nil })) // Submits a simple task

func (Func) Do

func (f Func) Do() error

Do executes the function to satisfy the Task interface.

type FuncCtx

type FuncCtx func(ctx context.Context) error

FuncCtx converts a context-aware function into a TaskCtx. Example:

pool.SubmitCtx(ctx, jack.FuncCtx(func(ctx context.Context) error { return nil })) // Submits a context-aware task

func (FuncCtx) Do

func (f FuncCtx) Do(ctx context.Context) error

Do executes the function with the given context to satisfy the TaskCtx interface.

type Future

type Future[T any] struct {
	// contains filtered or unexported fields
}

func Async

func Async[T any](fn func() (T, error)) *Future[T]

func AsyncWithContext

func AsyncWithContext[T any](ctx context.Context, fn func(context.Context) (T, error)) *Future[T]

func NewFuture

func NewFuture[T any](ctx context.Context, fn func(context.Context) (T, error)) *Future[T]

func (*Future[T]) Await

func (f *Future[T]) Await() (T, error)

func (*Future[T]) AwaitWithContext

func (f *Future[T]) AwaitWithContext(ctx context.Context) (T, error)

func (*Future[T]) AwaitWithTimeout

func (f *Future[T]) AwaitWithTimeout(timeout time.Duration) (T, error)

func (*Future[T]) Cancel

func (f *Future[T]) Cancel() bool

func (*Future[T]) IsCanceled

func (f *Future[T]) IsCanceled() bool

func (*Future[T]) IsDone

func (f *Future[T]) IsDone() bool

func (*Future[T]) Recover

func (f *Future[T]) Recover(ctx context.Context, fn func(error) (T, error)) *Future[T]

func (*Future[T]) Then

func (f *Future[T]) Then(ctx context.Context, fn func(T) (any, error)) *Future[any]

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group coordinates concurrent execution of functions with error and panic handling. It supports context cancellation and worker limits, collecting errors in a channel. Thread-safe via wait group, mutex, and channel operations.

func NewGroup

func NewGroup() *Group

NewGroup creates a new Group for running functions concurrently. It initializes an error channel with a buffer of 1. Thread-safe via initialization. Example:

group := NewGroup() // Creates a new group

func (*Group) Errors

func (g *Group) Errors() <-chan error

Errors returns a channel that receives the first error from Go or GoCtx goroutines. The channel has a buffer of 1 and is closed by Wait. Thread-safe via channel operations. Example:

for err := range group.Errors() { ... } // Reads errors

func (*Group) Go

func (g *Group) Go(f func() error)

Go runs a function in a new goroutine with error and panic handling. The first non-nil error or panic is sent to the Errors channel. If a context is set, it is cancelled on the first error. Thread-safe via wait group and doWork. Example:

group.Go(func() error { return nil }) // Runs a function

func (*Group) GoCtx

func (g *Group) GoCtx(f func(context.Context) error)

GoCtx runs a context-aware function in a new goroutine. It requires a context set via WithContext and passes it to the function. The first error, panic, or context cancellation is sent to the Errors channel, and the context is cancelled. Thread-safe via wait group and doWork. Example:

group.WithContext(ctx).GoCtx(func(ctx context.Context) error { return nil }) // Runs a context-aware function

func (*Group) Wait

func (g *Group) Wait()

Wait blocks until all Go and GoCtx goroutines complete. It closes the Errors channel and cancels the Group’s context, if set. Thread-safe via wait group and once. Example:

group.Wait() // Waits for all goroutines

func (*Group) WithContext

func (g *Group) WithContext(ctx context.Context) *Group

WithContext associates a context with the Group, enabling cancellation. It cancels any previous context and creates a new cancellable context. Thread-safe via context operations. Example:

group.WithContext(ctx) // Sets group context

func (*Group) WithLimit

func (g *Group) WithLimit(n int) *Group

WithLimit sets the maximum number of concurrent workers in the Group. Non-positive values remove the limit. Thread-safe via semaphore initialization. Example:

group.WithLimit(5) // Limits to 5 concurrent workers
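The combination of a wait group, a one-slot error channel, and a semaphore described for Group can be sketched as follows. `firstErrGroup` and its methods are hypothetical names; panic recovery and context cancellation are left out to keep the example short, so this is not a substitute for the real type.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// firstErrGroup is a hypothetical type that keeps only the first error.
type firstErrGroup struct {
	wg   sync.WaitGroup
	errs chan error    // buffer of 1: later errors are dropped
	sem  chan struct{} // nil means no concurrency limit
}

func newFirstErrGroup(limit int) *firstErrGroup {
	g := &firstErrGroup{errs: make(chan error, 1)}
	if limit > 0 {
		g.sem = make(chan struct{}, limit)
	}
	return g
}

func (g *firstErrGroup) run(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if g.sem != nil {
			g.sem <- struct{}{}        // acquire a slot
			defer func() { <-g.sem }() // release it
		}
		if err := f(); err != nil {
			select {
			case g.errs <- err: // first error wins
			default: // channel already holds an error
			}
		}
	}()
}

// wait blocks until all functions return, then closes the error channel.
func (g *firstErrGroup) wait() {
	g.wg.Wait()
	close(g.errs)
}

func demo() int {
	g := newFirstErrGroup(2)
	for i := 0; i < 4; i++ {
		g.run(func() error { return errors.New("failed") })
	}
	g.wait()
	n := 0
	for range g.errs { // terminates because wait closed the channel
		n++
	}
	return n
}

func main() {
	fmt.Println(demo()) // prints 1
}
```

The same caveat applies to any range over an error channel of this kind: the loop ends only after the channel is closed, so the wait call must happen before the range or in another goroutine.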

type Hook

type Hook func(id string) error

Hook defines a lifecycle hook function that returns an error. It accepts a unique identifier string for the operation being tracked. Use this type for pre-execution validation or setup logic that may fail.

type HookCtx

type HookCtx func(ctx context.Context, id string) error

HookCtx defines a context-aware lifecycle hook function that returns an error. It accepts a context for cancellation and an identifier for the tracked operation. Use this type when hooks need to respect timeouts or external cancellation signals.

type Identifiable

type Identifiable interface {
	// ID returns a unique identifier for the task.
	ID() string
}

Identifiable is an optional interface for tasks to provide a custom ID for logging, metrics, and tracing.

type Lifetime

type Lifetime struct {
	// contains filtered or unexported fields
}

func NewLifetime

func NewLifetime(opts ...LifetimeOption) *Lifetime

NewLifetime creates and initializes a new Lifetime manager with the provided configuration options. It spawns one pruning goroutine per shard to handle timer expiration with minimal lock contention. The returned Lifetime is immediately active and ready to schedule timed callbacks via ScheduleTimed.

func (*Lifetime) CancelTimed

func (lm *Lifetime) CancelTimed(id string) bool

CancelTimed removes a scheduled timer by ID, preventing its callback from executing. It returns true if the timer was found and successfully cancelled, false if none existed. This method is safe to call multiple times and has no effect if the timer already expired.

func (*Lifetime) ExecuteCtxWithLifetime

func (lm *Lifetime) ExecuteCtxWithLifetime(ctx context.Context, id string, lifetime *Vitals, operation FuncCtx) error

ExecuteCtxWithLifetime runs a context-aware operation with Vitals hooks and schedules a timed callback on success. If the operation succeeds and a timed callback is configured, it is scheduled for later execution. This method combines immediate context-aware execution with deferred lifecycle callback management.

func (*Lifetime) ExecuteWithLifetime

func (lm *Lifetime) ExecuteWithLifetime(ctx context.Context, id string, lifetime *Vitals, operation Func) error

ExecuteWithLifetime runs an operation with Vitals hooks and schedules a timed callback on success. If the operation succeeds and a timed callback is configured, it is scheduled for later execution. This method combines immediate operation execution with deferred lifecycle callback management.

func (*Lifetime) GetRemainingDuration

func (lm *Lifetime) GetRemainingDuration(id string) (time.Duration, bool)

GetRemainingDuration returns the time remaining until a scheduled timer expires for the given ID. It returns the duration and true if the timer exists and has not yet expired, otherwise zero and false. This method is useful for displaying countdowns or making scheduling decisions based on timer state.

func (*Lifetime) HasPending

func (lm *Lifetime) HasPending(id string) bool

HasPending checks whether a timer is currently scheduled for the given ID. It returns true if the ID exists in any shard's queue, false otherwise. This method provides thread-safe read access without modifying internal state.

func (*Lifetime) Metrics

func (lm *Lifetime) Metrics() *LifetimeMetrics

Metrics returns a reference to the Lifetime manager's metrics struct for monitoring operational statistics. All metric fields use atomic operations, allowing safe concurrent reads without additional locking. Use this to expose timer scheduling statistics to dashboards, alerts, or external monitoring systems.

func (*Lifetime) PendingCount

func (lm *Lifetime) PendingCount() int

PendingCount returns the total number of timers currently scheduled across all shards. It iterates through each shard while holding locks to ensure an accurate snapshot count. Use this for monitoring queue depth or implementing backpressure in high-load scenarios.

func (*Lifetime) ResetTimed

func (lm *Lifetime) ResetTimed(id string) bool

ResetTimed extends the expiration time of an existing timer by its original duration from now. It returns true if the timer was found and successfully reset, false if no timer existed for the ID. This method is useful for implementing keep-alive or activity-based timeout extension patterns.
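The phrase "by its original duration from now" means the new deadline is the reset time plus the original wait, not the old deadline plus the wait. A small worked example with fixed timestamps makes the arithmetic concrete; `timedEntry`, `schedule`, `reset`, and `remaining` are hypothetical helpers, not Lifetime's internals.

```go
package main

import (
	"fmt"
	"time"
)

// timedEntry is a hypothetical record of one scheduled callback.
type timedEntry struct {
	wait    time.Duration // original duration, reused on every reset
	expires time.Time
}

func schedule(now time.Time, wait time.Duration) *timedEntry {
	return &timedEntry{wait: wait, expires: now.Add(wait)}
}

// reset moves the deadline to now plus the original wait.
func (e *timedEntry) reset(now time.Time) { e.expires = now.Add(e.wait) }

// remaining reports the time left, or false once the deadline has passed.
func (e *timedEntry) remaining(now time.Time) (time.Duration, bool) {
	if !now.Before(e.expires) {
		return 0, false
	}
	return e.expires.Sub(now), true
}

func demo() (before, after time.Duration) {
	t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	e := schedule(t0, 30*time.Second)
	before, _ = e.remaining(t0.Add(20 * time.Second)) // 10s left
	e.reset(t0.Add(20 * time.Second))                 // deadline is now t0+50s
	after, _ = e.remaining(t0.Add(25 * time.Second))  // 25s left
	return before, after
}

func main() {
	fmt.Println(demo()) // prints "10s 25s"
}
```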

func (*Lifetime) ScheduleLifetimeTimed

func (lm *Lifetime) ScheduleLifetimeTimed(ctx context.Context, id string, lifetime *Vitals)

ScheduleLifetimeTimed schedules a timed callback using configuration from a Vitals instance. If the Vitals or its Timed callback is nil, it resets any existing timer for the ID instead. This convenience method integrates Lifetime timer management with Vitals lifecycle configuration.

func (*Lifetime) ScheduleTimed

func (lm *Lifetime) ScheduleTimed(ctx context.Context, id string, callback CallbackCtx, wait time.Duration)

ScheduleTimed registers a callback to execute after the specified wait duration for a given ID. If a timer already exists for the ID, it is replaced with the new callback and expiration time. The method uses sharding for concurrency and signals the prune loop to re-evaluate scheduling.

func (*Lifetime) Stop

func (lm *Lifetime) Stop(ids ...string)

Stop cancels timers for the specified IDs, or all timers if no IDs are provided. It groups IDs by shard for efficient batch removal and signals prune loops to re-evaluate. This method provides a convenient way to clean up multiple timers without individual CancelTimed calls.

func (*Lifetime) StopAll

func (lm *Lifetime) StopAll()

StopAll gracefully shuts down the Lifetime manager, cancelling all pending timers and goroutines. It signals the context to stop prune loops, waits for them to exit, then clears all shard state. After calling StopAll, the Lifetime instance cannot be reused and all scheduled callbacks are discarded.

type LifetimeMetrics

type LifetimeMetrics struct {
	ScheduleCalls   atomic.Uint64
	ResetCalls      atomic.Uint64
	CancelCalls     atomic.Uint64
	ExecuteCalls    atomic.Uint64
	ExpiredTotal    atomic.Uint64
	ExpiredHandled  atomic.Uint64
	ExpiredMissed   atomic.Uint64
	ExpiredErrors   atomic.Uint64
	ActiveTimers    atomic.Int64
	MaxActiveTimers atomic.Int64
	TotalTimersSeen atomic.Uint64
	AvgExpirationMs atomic.Int64
	MinExpirationMs atomic.Int64
	MaxExpirationMs atomic.Int64
	LoopIterations  atomic.Uint64
	SignalsSent     atomic.Uint64
	StopsReceived   atomic.Uint64
}

LifetimeMetrics tracks operational metrics for the lifetime manager.

type LifetimeOption

type LifetimeOption func(*Lifetime)

func LifetimeWithLogger

func LifetimeWithLogger(l *ll.Logger) LifetimeOption

LifetimeWithLogger assigns a namespaced logger instance for structured Lifetime operation logging. If the provided logger is nil, the option has no effect and the default logger remains active. Use this to integrate Lifetime logs with your application's logging infrastructure and levels.

func LifetimeWithMinTick

func LifetimeWithMinTick(tick time.Duration) LifetimeOption

LifetimeWithMinTick sets the minimum polling interval for the expiration check loop. If the provided duration is zero or negative, the option is ignored and default applies. Use this to balance responsiveness against CPU usage when processing expired timers.

func LifetimeWithShards

func LifetimeWithShards(count uint32) LifetimeOption

LifetimeWithShards configures the number of shards for concurrent timer management. The count must be a power of two for efficient bitwise modulo hashing distribution. Use higher shard counts to reduce lock contention under heavy timer scheduling load.
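
The power-of-two requirement can be illustrated with a short sketch. When the shard count is a power of two, count-1 is an all-ones bitmask, so hash & (count-1) gives the same result as hash % count without a division. The shardIndex and isPowerOfTwo helpers and the choice of FNV-1a are assumptions made for illustration; the hash function this package actually uses is not documented here.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardIndex maps an ID to a shard using a bitmask instead of the % operator.
// It assumes shardCount is a power of two, so shardCount-1 is an all-ones mask.
func shardIndex(id string, shardCount uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(id)) // Write on an FNV hash never returns an error
	return h.Sum32() & (shardCount - 1)
}

// isPowerOfTwo reports whether n has exactly one bit set.
func isPowerOfTwo(n uint32) bool {
	return n != 0 && n&(n-1) == 0
}

func main() {
	const shards = 8
	for _, id := range []string{"session-1", "session-2", "session-3"} {
		fmt.Printf("%s -> shard %d\n", id, shardIndex(id, shards))
	}
	fmt.Println(isPowerOfTwo(8), isPowerOfTwo(12))
}
```

With a count that is not a power of two, the mask would leave some shards unreachable, which is why a validity check such as isPowerOfTwo is worth running before choosing the count.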

func LifetimeWithTimerLimit

func LifetimeWithTimerLimit(limit time.Duration) LifetimeOption

LifetimeWithTimerLimit configures the maximum execution time allowed for timer callbacks. If the provided duration is zero or negative, the option is ignored and default applies. This prevents long-running callbacks from blocking the expiration processing loop.

type LoopConfig

type LoopConfig struct {
	Name        string
	Interval    time.Duration
	Jitter      float64
	Backoff     bool
	MaxInterval time.Duration
	MinInterval time.Duration
	Immediate   bool
	Context     context.Context
	Logger      *ll.Logger
}

type Looper

type Looper struct {
	// contains filtered or unexported fields
}

func NewLooper

func NewLooper(task Func, opts ...LooperOption) *Looper

NewLooper creates a new Looper with full metrics.

func (*Looper) CurrentInterval

func (l *Looper) CurrentInterval() time.Duration

func (*Looper) FailureCount

func (l *Looper) FailureCount() uint64

func (*Looper) IsRunning

func (l *Looper) IsRunning() bool

func (*Looper) Metrics

func (l *Looper) Metrics() *LooperMetrics

Metrics returns the looper's metrics.

func (*Looper) ResetInterval

func (l *Looper) ResetInterval()

func (*Looper) SetInterval

func (l *Looper) SetInterval(d time.Duration)

func (*Looper) Start

func (l *Looper) Start()

func (*Looper) Stop

func (l *Looper) Stop()

type LooperMetrics

type LooperMetrics struct {
	Executions      atomic.Uint64
	Failures        atomic.Uint64
	Successes       atomic.Uint64
	BackoffEvents   atomic.Uint64
	IntervalChanges atomic.Uint64
	PanicsRecovered atomic.Uint64
	ContextCancels  atomic.Uint64
	Timeouts        atomic.Uint64

	LastRun   atomic.Value
	LastError atomic.Value
	LastStart atomic.Int64
	LastEnd   atomic.Int64

	TotalDurationNs atomic.Int64
	MinDurationNs   atomic.Int64
	MaxDurationNs   atomic.Int64

	CurrentIntervalNs atomic.Int64
	CurrentBackoffNs  atomic.Int64
}

LooperMetrics tracks comprehensive operational metrics.

type LooperOption

type LooperOption func(*LoopConfig)

func WithLooperBackoff

func WithLooperBackoff(enabled bool) LooperOption

func WithLooperContext

func WithLooperContext(ctx context.Context) LooperOption

func WithLooperImmediate

func WithLooperImmediate(immediate bool) LooperOption

func WithLooperInterval

func WithLooperInterval(interval time.Duration) LooperOption

func WithLooperJitter

func WithLooperJitter(jitter float64) LooperOption

func WithLooperLogger

func WithLooperLogger(logger *ll.Logger) LooperOption

func WithLooperMaxInterval

func WithLooperMaxInterval(max time.Duration) LooperOption

func WithLooperMinInterval

func WithLooperMinInterval(min time.Duration) LooperOption

func WithLooperName

func WithLooperName(name string) LooperOption

type Observable

type Observable[T any] interface {
	// Add registers one or more observers to receive notifications.
	Add(observers ...Observer[T])
	// Remove unregisters one or more observers.
	Remove(observers ...Observer[T])
	// Notify sends one or more events to all registered observers.
	Notify(events ...T)
	// Shutdown stops the observable and waits for all notifications to complete.
	Shutdown()
}

Observable defines the interface for objects that can be observed by multiple Observer instances. It supports adding and removing observers, notifying them of one or more events, and shutting down cleanly. The type parameter T matches the event type for observers. Example:

obs := NewObservable[string]()
logObserver := &LogObserver{}
obs.Add(logObserver)
obs.Notify("Task completed")
obs.Shutdown()

func NewObservable

func NewObservable[T any](numNotifyWorkers ...int) Observable[T]

NewObservable creates and returns a new Observable instance. It optionally accepts the number of worker goroutines for asynchronous notifications. Workers process notifications in parallel to avoid blocking the notifier, with panic recovery for observer errors.

Parameters: numNotifyWorkers: optional number of worker goroutines (positive integer). If not provided or invalid, defaults to 5.

Returns: Observable[T]: A new Observable instance ready to accept observers and events.

Example:

// Create an observable with 2 workers
obs := NewObservable[string](2)
logObserver := &LogObserver{}
obs.Add(logObserver)
obs.Notify("Event 1") // Sends "Event 1" to logObserver asynchronously
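
The worker-based delivery described for NewObservable can be sketched in a self-contained form. This is a minimal illustration, not this package's implementation: the bus, counter, and faulty types are hypothetical, the buffer size of 64 is arbitrary, and recovered panics are simply dropped here where a real implementation would probably log them.

```go
package main

import (
	"fmt"
	"sync"
)

type observer[T any] interface{ OnNotify(value T) }

// bus is a hypothetical observable that fans events out through worker goroutines.
type bus[T any] struct {
	mu        sync.RWMutex
	observers []observer[T]
	events    chan T
	wg        sync.WaitGroup
}

func newBus[T any](workers int) *bus[T] {
	if workers <= 0 {
		workers = 5 // fall back to a default, as the documentation describes
	}
	b := &bus[T]{events: make(chan T, 64)}
	for i := 0; i < workers; i++ {
		b.wg.Add(1)
		go func() {
			defer b.wg.Done()
			for ev := range b.events { // ends when events is closed and drained
				b.dispatch(ev)
			}
		}()
	}
	return b
}

func (b *bus[T]) add(o observer[T]) {
	b.mu.Lock()
	b.observers = append(b.observers, o)
	b.mu.Unlock()
}

// dispatch delivers one event to a snapshot of the observers.
func (b *bus[T]) dispatch(ev T) {
	b.mu.RLock()
	targets := append([]observer[T](nil), b.observers...)
	b.mu.RUnlock()
	for _, o := range targets {
		func() {
			defer func() { _ = recover() }() // a panicking observer must not kill the worker
			o.OnNotify(ev)
		}()
	}
}

func (b *bus[T]) notify(ev T) { b.events <- ev }

// shutdown stops accepting events and waits for queued ones to be delivered.
func (b *bus[T]) shutdown() {
	close(b.events)
	b.wg.Wait()
}

type counter struct {
	mu sync.Mutex
	n  int
}

func (c *counter) OnNotify(_ string) { c.mu.Lock(); c.n++; c.mu.Unlock() }

type faulty struct{}

func (faulty) OnNotify(_ string) { panic("observer bug") }

// demo sends three events to one faulty and one healthy observer.
func demo() int {
	b := newBus[string](2)
	c := &counter{}
	b.add(faulty{})
	b.add(c)
	for i := 0; i < 3; i++ {
		b.notify("event")
	}
	b.shutdown()
	return c.n
}

func main() {
	fmt.Println("events delivered to healthy observer:", demo())
}
```

Wrapping each OnNotify call in its own recover scope means one misbehaving observer does not prevent the others from receiving the same event.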

type Observer

type Observer[T any] interface {
	OnNotify(value T) // Called when an event of type T is received
}

Observer defines the interface for objects that wish to receive notifications from an Observable. The type parameter T specifies the type of events or values that will be notified. Implementors must provide an OnNotify method to handle incoming events. Example:

type LogObserver struct {}
func (l *LogObserver) OnNotify(event string) {
	fmt.Printf("Received event: %s\n", event)
}

type Patient

type Patient struct {
	// contains filtered or unexported fields
}

Patient holds runtime state for one monitored entity with its own metrics.

func NewPatient

func NewPatient(cfg PatientConfig) *Patient

NewPatient creates a Patient with its own metrics instance.

func (*Patient) Metrics

func (p *Patient) Metrics() *PatientMetrics

Metrics returns the patient's individual metrics.

func (*Patient) Remove

func (p *Patient) Remove()

Remove signals that this patient has been removed from monitoring. It invokes the OnRemove callback if configured.

type PatientConfig

type PatientConfig struct {
	ID            string
	Interval      time.Duration
	Jitter        float64
	Timeout       time.Duration
	Accelerated   time.Duration
	MaxFailures   uint64
	Check         FuncCtx
	OnStart       FuncCtx
	OnComplete    FuncCtx
	OnError       FuncCtx
	OnTimeout     FuncCtx
	OnRecover     FuncCtx
	OnRemove      func()
	OnStateChange func(PatientEvent)
}

PatientConfig defines monitoring parameters for a single patient.

type PatientEvent

type PatientEvent struct {
	ID        string
	State     PatientState
	LastCheck time.Time
	Duration  time.Duration
	Error     error
	Meta      map[string]any
}

PatientEvent is emitted for observability when a patient's state changes.

type PatientMetrics

type PatientMetrics struct {
	ChecksTotal     atomic.Uint64
	ChecksHealthy   atomic.Uint64
	ChecksDegraded  atomic.Uint64
	ChecksFailed    atomic.Uint64
	StateChanges    atomic.Uint64
	Recoveries      atomic.Uint64
	Timeouts        atomic.Uint64
	PanicsRecovered atomic.Uint64
	ConsecFailures  atomic.Int64
	ConsecSuccesses atomic.Int64
	LastCheckMs     atomic.Int64
	AvgCheckMs      atomic.Int64
	LastFailureMs   atomic.Int64
	LastRecoveryMs  atomic.Int64
}

PatientMetrics tracks per-patient operational metrics.

type PatientState

type PatientState int

PatientState represents the health state of a monitored patient.

const (
	PatientUnknown PatientState = iota
	PatientHealthy
	PatientDegraded
	PatientFailed
)
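
One plausible way consecutive failures could map onto these states is sketched below. The transition rule is an assumption made for illustration: this documentation does not state the exact thresholds, and the State type and nextState function are hypothetical names.

```go
package main

import "fmt"

type State int

const (
	Unknown State = iota
	Healthy
	Degraded
	Failed
)

var stateNames = [...]string{"unknown", "healthy", "degraded", "failed"}

func (s State) String() string { return stateNames[s] }

// nextState is a hypothetical rule: no failures means healthy, reaching
// maxFailures means failed, and anything in between counts as degraded.
func nextState(consecFailures, maxFailures uint64) State {
	switch {
	case consecFailures == 0:
		return Healthy
	case consecFailures >= maxFailures:
		return Failed
	default:
		return Degraded
	}
}

func main() {
	const maxFailures = 3
	for failures := uint64(0); failures <= maxFailures; failures++ {
		fmt.Printf("%d consecutive failures -> %s\n", failures, nextState(failures, maxFailures))
	}
}
```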

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a fixed number of worker goroutines to execute tasks concurrently. It supports task submission with or without context, shutdown with timeout, and observability. The pool uses a channel for task queuing and a wait group for graceful shutdown.

func NewPool

func NewPool(numWorkers int, opts ...Pooling) *Pool

NewPool creates a new pool with the specified number of workers and optional configurations. Ensures at least one worker; initializes task channel, observable, and logger. Starts all workers immediately and returns the ready pool instance.

func (*Pool) Do

func (p *Pool) Do(fn func())

Do is shorthand for pool.Submit(Func(...)) that discards any returned error.

func (*Pool) DoCtx

func (p *Pool) DoCtx(ctx context.Context, fn func(ctx context.Context))

DoCtx is shorthand for pool.SubmitCtx(FuncCtx(...)) that discards any returned error.

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

IsClosed returns true if the pool has been closed.

func (*Pool) Logger

func (p *Pool) Logger(extLogger *ll.Logger) *Pool

Logger sets a custom logger for the pool, namespacing it as "pool". If the provided logger is nil, it retains the existing logger. Returns the pool for method chaining.

func (*Pool) QueueSize

func (p *Pool) QueueSize() int

QueueSize returns the current number of pending tasks in the queue. Useful for monitoring pool load and backpressure. Safe for concurrent use because len on a channel may be called from multiple goroutines; the value is a point-in-time snapshot.

func (*Pool) Shutdown

func (p *Pool) Shutdown(timeout time.Duration) error

Shutdown gracefully stops the pool, closing the task channel and waiting for workers with a timeout. Idempotent; logs shutdown process and returns timeout error if workers don't finish in time. Ensures no new tasks are accepted after initiation.
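
The close-then-wait-with-timeout pattern described for Shutdown can be sketched with the standard library alone. This is an illustration under stated assumptions, not the pool's actual code: waitTimeout, runAndShutdown, and errShutdownTimeout are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errShutdownTimeout = errors.New("shutdown timed out")

// waitTimeout waits for wg to finish, giving up after timeout.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return errShutdownTimeout
	}
}

// runAndShutdown starts two workers, queues one task that takes taskDelay,
// then closes the queue and waits up to timeout for the workers to drain it.
func runAndShutdown(taskDelay, timeout time.Duration) error {
	tasks := make(chan func(), 4)
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range tasks { // loop ends once tasks is closed and drained
				task()
			}
		}()
	}
	tasks <- func() { time.Sleep(taskDelay) }
	close(tasks) // no new tasks can be sent after this point
	return waitTimeout(&wg, timeout)
}

func main() {
	fmt.Println("fast task:", runAndShutdown(time.Millisecond, time.Second))
	fmt.Println("slow task:", runAndShutdown(200*time.Millisecond, 10*time.Millisecond))
}
```

Note that a timeout in this sketch only stops the waiting; the workers keep running until their current task returns.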

func (*Pool) Submit

func (p *Pool) Submit(ts ...Task) error

Submit enqueues one or more tasks to the pool for execution without context. Checks if pool is closed; returns error for nil tasks or full queue. Notifies observable of queued events and logs submission details.
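
The fail-fast behavior on a full queue can be sketched as a non-blocking channel send. This is a minimal illustration using a hypothetical trySubmit helper and errQueueFull error; the pool's real error values and internal queue type are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

var errQueueFull = errors.New("queue full")

// trySubmit attempts a non-blocking send: if the buffered channel has no
// free slot, it returns immediately instead of waiting for a worker.
func trySubmit(queue chan<- func(), task func()) error {
	select {
	case queue <- task:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	queue := make(chan func(), 2) // capacity 2 and no workers draining it
	for i := 0; i < 3; i++ {
		err := trySubmit(queue, func() {})
		fmt.Printf("submit %d: %v\n", i, err)
	}
}
```

Returning an error instead of blocking pushes the backpressure decision to the caller, who can retry, drop the task, or shed load.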

func (*Pool) SubmitCtx

func (p *Pool) SubmitCtx(ctx context.Context, ts ...TaskCtx) error

SubmitCtx enqueues one or more context-aware tasks to the pool. First checks parent context; then pool closure; handles nil tasks and context cancellation during submission. Notifies observable and logs; returns errors for closure, nil tasks, or queue issues.

func (*Pool) Workers

func (p *Pool) Workers() int

Workers returns the number of worker goroutines configured in the pool. This is fixed at creation and does not change dynamically. Helpful for querying pool capacity.

type Pooling

type Pooling func(*poolingOpt)

Pooling is a functional option type for configuring the pool during creation. It allows setting observable, queue size, ID generator, etc., in a flexible manner. Multiple options can be passed to NewPool for combined configuration.

func PoolingWithIDGenerator

func PoolingWithIDGenerator(fn func(interface{}) string) Pooling

PoolingWithIDGenerator sets a custom task ID generator function in the pool options. The function takes the task interface and returns a unique string ID. Defaults to a built-in generator if not provided.

func PoolingWithObservable

func PoolingWithObservable(obs Observable[Event]) Pooling

PoolingWithObservable sets an observable for event notifications in the pool options. The observable will receive events like queued, run, done for tasks. Useful for monitoring and logging pool activities externally.

func PoolingWithQueueSize

func PoolingWithQueueSize(size int) Pooling

PoolingWithQueueSize sets the task queue size in the pool options. If size is negative, it is ignored and defaults to twice the number of workers. A larger queue allows more pending tasks but may increase memory usage.

type Reaper

type Reaper struct {
	// contains filtered or unexported fields
}

func NewReaper

func NewReaper(ttl time.Duration, opts ...ReaperOption) *Reaper

func (*Reaper) Clear

func (r *Reaper) Clear() int

func (*Reaper) Count

func (r *Reaper) Count() int

func (*Reaper) Deadline

func (r *Reaper) Deadline() (time.Time, bool)

func (*Reaper) Metrics

func (r *Reaper) Metrics() *ReaperMetrics

func (*Reaper) Register

func (r *Reaper) Register(h ReaperHandler)

Register sets the handler for expired tasks (backward compatible).

func (*Reaper) Remove

func (r *Reaper) Remove(id string) bool

func (*Reaper) Start

func (r *Reaper) Start()

Start is a no-op for backward compatibility. Reaper starts automatically now.

func (*Reaper) Stop

func (r *Reaper) Stop()

func (*Reaper) Touch

func (r *Reaper) Touch(id string)

func (*Reaper) TouchAt

func (r *Reaper) TouchAt(id string, deadline time.Time)

type ReaperHandler

type ReaperHandler func(context.Context, string)

type ReaperMetrics

type ReaperMetrics struct {
	TouchCalls      atomic.Uint64
	TouchAtCalls    atomic.Uint64
	RemoveCalls     atomic.Uint64
	ClearCalls      atomic.Uint64
	ExpiredTotal    atomic.Uint64
	ExpiredHandled  atomic.Uint64
	ExpiredMissed   atomic.Uint64
	ExpiredErrors   atomic.Uint64
	ActiveTasks     atomic.Int64
	MaxActiveTasks  atomic.Int64
	TotalTasksSeen  atomic.Uint64
	AvgExpirationMs atomic.Int64
	MinExpirationMs atomic.Int64
	MaxExpirationMs atomic.Int64
	LoopIterations  atomic.Uint64
	SignalsSent     atomic.Uint64
	StopsReceived   atomic.Uint64
}

type ReaperOption

type ReaperOption func(*Reaper)

func ReaperWithHandler

func ReaperWithHandler(h ReaperHandler) ReaperOption

func ReaperWithLogger

func ReaperWithLogger(l *ll.Logger) ReaperOption

func ReaperWithMinTick

func ReaperWithMinTick(tick time.Duration) ReaperOption

func ReaperWithShards

func ReaperWithShards(count uint32) ReaperOption

func ReaperWithTimerLimit

func ReaperWithTimerLimit(limit time.Duration) ReaperOption

type ReaperTask

type ReaperTask struct {
	ID       string
	Deadline time.Time
	// contains filtered or unexported fields
}

type Routine

type Routine struct {
	Interval time.Duration // Interval between task executions
	MaxRuns  int           // Maximum number of runs (0 for unlimited)
	Cron     string        // Cron expression (e.g., "0 0 * * *" for daily at midnight, "@every 1m")
}

Routine configures recurring task execution using a fixed interval or a cron expression, with an optional cap on the number of runs. Thread-safe for use in scheduling configurations.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner executes tasks asynchronously using a buffered task queue. It supports observability and logging, processing tasks in a single goroutine. Thread-safe via mutex, wait group, and channel operations.

func NewRunner

func NewRunner(opts ...RunnerOption) *Runner

NewRunner creates a new Runner with the specified options. It initializes a task queue and starts a processing goroutine. Thread-safe via initialization and logger namespace. Example:

runner := NewRunner(WithRunnerQueueSize(10), WithRunnerObservable(obs)) // Creates runner with queue size 10

func (*Runner) Do

func (r *Runner) Do(t Task) error

Do submits a Task for execution in the runner’s queue. It returns an error if the task is nil or the runner is closed. Thread-safe via submit and channel operations. Example:

runner.Do(myTask) // Submits a task

func (*Runner) DoCtx

func (r *Runner) DoCtx(ctx context.Context, t TaskCtx) error

DoCtx submits a TaskCtx for execution with the given context. It returns an error if the context or task is nil, the context is done, or the runner is closed. Thread-safe via submit and channel operations. Example:

runner.DoCtx(ctx, myTaskCtx) // Submits a context-aware task

func (*Runner) QueueSize

func (r *Runner) QueueSize() int

QueueSize returns the current number of tasks in the queue. Thread-safe via channel length access. Example:

size := runner.QueueSize() // Gets current queue size

func (*Runner) Shutdown

func (r *Runner) Shutdown(timeout time.Duration) error

Shutdown closes the task queue and waits for the processing goroutine to finish. It returns an error if the shutdown times out. Thread-safe via mutex, once, and wait group. Example:

runner.Shutdown(5 * time.Second) // Shuts down runner with 5-second timeout

type RunnerOption

type RunnerOption func(*runnerOptions)

RunnerOption configures a Runner during creation.

func WithRunnerIDGenerator

func WithRunnerIDGenerator(fn func(interface{}) string) RunnerOption

WithRunnerIDGenerator sets the task ID generator function. Example:

runner := NewRunner(WithRunnerIDGenerator(customIDFunc)) // Sets custom ID generator

func WithRunnerObservable

func WithRunnerObservable(obs Observable[Event]) RunnerOption

WithRunnerObservable sets the observable for task events. Example:

runner := NewRunner(WithRunnerObservable(obs)) // Configures observable

func WithRunnerQueueSize

func WithRunnerQueueSize(size int) RunnerOption

WithRunnerQueueSize sets the task queue size. Non-positive values are ignored. Example:

runner := NewRunner(WithRunnerQueueSize(20)) // Sets queue size to 20

type Safely

type Safely struct {
	sync.Mutex
}

Safely wraps sync.Mutex with safe execution methods.

func (*Safely) Do

func (m *Safely) Do(fn func())

Do executes fn while holding the mutex lock. The function is guaranteed to run with exclusive access, ensuring thread safety. If fn is nil, Do will panic. Example:

var mu Safely
mu.Do(func() {
	// Critical section
	fmt.Println("Safe execution")
})

func (*Safely) DoCtx

func (m *Safely) DoCtx(ctx context.Context, fn func() error) error

DoCtx executes fn while holding the mutex lock, with context support. The lock is acquired, then the context is checked. If the context is done, its error is returned and fn is not executed. Otherwise, fn is executed in a goroutine, allowing it to be interrupted if the context is canceled or times out (e.g., via context.WithTimeout). If fn is nil, DoCtx will return a *CaughtPanic. If ctx is nil, DoCtx will panic when ctx.Err() is called. This method does not recover from panics in fn; use SafeCtx for panic recovery. Example:

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := mu.DoCtx(ctx, func() error {
	// Critical section, no need to check ctx
	time.Sleep(100 * time.Millisecond)
	return nil
})
// err will likely be context.DeadlineExceeded due to timeout
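
The lock-then-race behavior described for DoCtx can be sketched with the standard library. This is an illustration under stated assumptions: doCtx and demo are hypothetical names, and the sketch returns plain errors where the package may return its own types. In this sketch, when the context wins the race, fn keeps running in its goroutine after the lock has been released, so fn should not assume exclusive access past the deadline.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// doCtx runs fn under mu but returns early if ctx ends first.
func doCtx(ctx context.Context, mu *sync.Mutex, fn func() error) error {
	mu.Lock()
	defer mu.Unlock()
	if err := ctx.Err(); err != nil {
		return err // context already done: fn is never started
	}
	done := make(chan error, 1) // buffered so a late fn can still send and exit
	go func() { done <- fn() }()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err() // fn is abandoned, not stopped
	}
}

// demo reports the error from a fast call and whether a slow call hit the deadline.
func demo() (fastErr error, slowTimedOut bool) {
	var mu sync.Mutex
	fastErr = doCtx(context.Background(), &mu, func() error { return nil })

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	slowErr := doCtx(ctx, &mu, func() error {
		time.Sleep(200 * time.Millisecond)
		return nil
	})
	return fastErr, errors.Is(slowErr, context.DeadlineExceeded)
}

func main() {
	fastErr, timedOut := demo()
	fmt.Println("fast call error:", fastErr)
	fmt.Println("slow call timed out:", timedOut)
}
```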

func (*Safely) Safe

func (m *Safely) Safe(fn func() error) error

Safe executes fn while holding the mutex lock, with panic recovery. If fn panics, a *CaughtPanic error is returned. Otherwise, the error returned by fn is returned. If fn is nil, Safe will return a *CaughtPanic. Example:

var mu Safely
err := mu.Safe(func() error {
	// Critical section
	return nil
})
if err != nil {
	fmt.Println("Error:", err)
}
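
How locking and panic recovery can be combined is sketched below. The guard type and safe method are hypothetical, and the sketch returns a plain error where the package returns a *CaughtPanic. The key detail is defer ordering: the recover runs first, then the unlock, so the mutex is released even when fn panics.

```go
package main

import (
	"fmt"
	"sync"
)

type guard struct{ mu sync.Mutex }

// safe runs fn under the lock and converts a panic into an error.
func (g *guard) safe(fn func() error) (err error) {
	g.mu.Lock()
	defer g.mu.Unlock() // runs even if fn panics
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered panic: %v", r)
		}
	}()
	return fn() // calling a nil fn panics, which the deferred recover catches
}

func main() {
	var g guard
	fmt.Println(g.safe(func() error { return nil }))
	fmt.Println(g.safe(func() error { panic("boom") }))
	// The lock was released despite the panic, so this call does not deadlock.
	fmt.Println(g.safe(func() error { return nil }))
}
```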

func (*Safely) SafeCtx

func (m *Safely) SafeCtx(ctx context.Context, fn func() error) error

SafeCtx executes fn with panic recovery and context awareness while holding the mutex lock. The lock is acquired, then fn is executed via the standalone SafeCtx function. This ensures fn can be canceled by the context (e.g., via context.WithTimeout), and any panics are recovered. The lock is held until fn completes or the context is canceled. If fn is nil, SafeCtx will return a *CaughtPanic. If ctx is nil, SafeCtx will panic when ctx.Err() is called. Example:

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := mu.SafeCtx(ctx, func() error {
	// Critical section, no need to check ctx
	time.Sleep(100 * time.Millisecond)
	return nil
})
// err will likely be context.DeadlineExceeded due to timeout

type Schedule

type Schedule struct {
	Type     string    // Type of event (e.g., "task_submitted", "task_submission_failed", "stopped")
	Name     string    // Name of the scheduler emitting the event
	TaskID   string    // Unique identifier for the task
	TaskType string    // Type of the task (e.g., struct name or type description)
	Message  string    // Descriptive message providing context for the event
	Error    error     // Any error associated with the event, if applicable
	Routine  Routine   // Configuration of the scheduling routine (e.g., interval, max runs)
	Time     time.Time // Timestamp when the event occurred
	NextRun  time.Time // Scheduled time for the next task execution, if applicable
}

Schedule represents an event emitted by the scheduler for observability.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages periodic or limited task submissions to a Pool. It supports both interval-based and cron expression-based scheduling.

func NewScheduler

func NewScheduler(name string, pool *Pool, schedule Routine, opts ...Cycle) (*Scheduler, error)

NewScheduler creates a new Scheduler instance.

func (*Scheduler) Do

func (s *Scheduler) Do(ts ...Task) error

Do starts scheduling for the provided non-context-aware tasks. It initializes the scheduler to execute the given tasks according to the configured routine (e.g., interval or max runs). The method ensures thread-safety by locking the scheduler state and checks if the scheduler is already running to prevent duplicate executions. Each task is executed in its own goroutine, and the method emits a "started" event for observability.

func (*Scheduler) DoCtx

func (s *Scheduler) DoCtx(taskExecCtx context.Context, ts ...TaskCtx) error

DoCtx starts scheduling for context-aware tasks. Similar to Do, but designed for tasks that accept a context for cancellation or deadlines. It ensures thread-safety, checks for existing runs, and associates a task execution context with the tasks. Each task runs in its own goroutine, and a "started" event is emitted for observability.

func (*Scheduler) Entries

func (s *Scheduler) Entries() []cron.Entry

Entries returns the current cron entries if using cron-based scheduling.

func (*Scheduler) Name

func (s *Scheduler) Name() string

Name returns the scheduler's identifying name.

func (*Scheduler) NextRun

func (s *Scheduler) NextRun() (time.Time, bool)

NextRun returns the next scheduled run time for the first task.

func (*Scheduler) Running

func (s *Scheduler) Running() bool

Running checks if the scheduler is currently active.

func (*Scheduler) Stop

func (s *Scheduler) Stop() error

Stop terminates the scheduler without shutting down the pool.

func (*Scheduler) Terminate

func (s *Scheduler) Terminate(cancelPool bool) error

Terminate gracefully stops all running scheduler loops.

type Scheduling

type Scheduling struct {
	RetryCount   int           // Number of retry attempts for task submission on failure
	RetryBackoff time.Duration // Duration to wait between retry attempts for failed submissions
	// contains filtered or unexported fields
}

Scheduling holds configuration for retry behavior and observability.

type Shutdown

type Shutdown struct {
	// contains filtered or unexported fields
}

Shutdown manages the graceful shutdown process.

func NewShutdown

func NewShutdown(opts ...ShutdownOption) *Shutdown

NewShutdown creates a configured manager. Defaults: 30s timeout, sequential execution, standard signals. Returns a ready-to-use Shutdown instance with signal handling set up.

func (*Shutdown) Done

func (sm *Shutdown) Done() <-chan struct{}

Done returns a channel that's closed when shutdown completes. Allows integration with context or select-based waiting patterns. Compatible with context.Done() style usage.

func (*Shutdown) GetStats

func (sm *Shutdown) GetStats() *ShutdownStats

GetStats returns a deep copy of current shutdown statistics. Safe for concurrent reads; protects against mutation during access. Used by public APIs to return final results.

func (*Shutdown) IsShuttingDown

func (sm *Shutdown) IsShuttingDown() bool

IsShuttingDown reports whether shutdown has been initiated. Thread-safe via atomic boolean; useful for guarding late registrations. Returns true once the first shutdown trigger occurs.

func (*Shutdown) Register

func (sm *Shutdown) Register(fn any) error

Register adds a cleanup task. Supported types: func(), func() error, func(context.Context) error, io.Closer

Automatically wraps panics into structured errors and tracks stats.
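
Accepting several callback shapes through a single any parameter is typically done with a type switch. The sketch below shows the idea for the four supported types listed above; normalize and errDemo are hypothetical names, and the panic wrapping and statistics tracking mentioned above are left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
)

var errDemo = errors.New("flush failed")

// normalize converts each supported shape into one context-aware signature.
func normalize(fn any) (func(context.Context) error, error) {
	switch f := fn.(type) {
	case func():
		return func(context.Context) error { f(); return nil }, nil
	case func() error:
		return func(context.Context) error { return f() }, nil
	case func(context.Context) error:
		return f, nil
	case io.Closer:
		return func(context.Context) error { return f.Close() }, nil
	default:
		return nil, fmt.Errorf("unsupported cleanup type %T", fn)
	}
}

func main() {
	inputs := []any{
		func() { fmt.Println("plain func ran") },
		func() error { return errDemo },
		func(ctx context.Context) error { return ctx.Err() },
		io.NopCloser(strings.NewReader("")), // any io.Closer works
		42,                                  // unsupported
	}
	for _, in := range inputs {
		cleanup, err := normalize(in)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Printf("%T -> %v\n", in, cleanup(context.Background()))
	}
}
```

Returning an error for unknown types at registration time surfaces mistakes early, before shutdown begins.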

func (*Shutdown) RegisterCall

func (sm *Shutdown) RegisterCall(name string, fn Func) error

RegisterCall registers a simple function returning an error. Convenience wrapper around Register; supports jack.Func signature. Name is auto-generated when empty string is provided.

func (*Shutdown) RegisterCloser

func (sm *Shutdown) RegisterCloser(name string, closer io.Closer) error

RegisterCloser registers an io.Closer. Convenience wrapper that converts Close() error into shutdown error. Name reflects the concrete type when left empty.

func (*Shutdown) RegisterFunc

func (sm *Shutdown) RegisterFunc(name string, fn func()) error

RegisterFunc registers a simple void function. Convenience wrapper around Register; name is auto-generated if empty. Useful for quick registration of fire-and-forget cleanup.

func (*Shutdown) RegisterWithContext

func (sm *Shutdown) RegisterWithContext(name string, fn FuncCtx) error

RegisterWithContext registers a fully context-aware callback. Allows explicit naming and direct use of jack.FuncCtx functions. Preferred for advanced cleanup needing cancellation/timeout awareness.

func (*Shutdown) TriggerShutdown

func (sm *Shutdown) TriggerShutdown() *ShutdownStats

TriggerShutdown manually initiates the shutdown process. Useful for programmatic shutdown (e.g. health check failure). Returns final statistics once shutdown completes.

func (*Shutdown) Wait

func (sm *Shutdown) Wait() *ShutdownStats

Wait blocks until a signal is received or TriggerShutdown is called. Returns shutdown statistics once all cleanup tasks have finished. This is the primary entry point for most applications.

func (*Shutdown) WaitChan

func (sm *Shutdown) WaitChan() <-chan *ShutdownStats

WaitChan returns a channel that receives stats once shutdown is complete. Non-blocking alternative to Wait(), ideal for async integration. Channel is closed after sending the single stats value.

type ShutdownError

type ShutdownError struct {
	Name      string
	Err       error
	Timestamp time.Time
}

ShutdownError provides structured error details.

func (*ShutdownError) Error

func (e *ShutdownError) Error() string

func (*ShutdownError) Unwrap

func (e *ShutdownError) Unwrap() error

type ShutdownOption

type ShutdownOption func(*Shutdown)

ShutdownOption configures the Shutdown.

func ShutdownConcurrent

func ShutdownConcurrent() ShutdownOption

ShutdownConcurrent enables concurrent execution of cleanup functions. By default, execution is sequential (LIFO).
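
Sequential LIFO execution means the most recently registered cleanup runs first, much like deferred calls in a function. A minimal sketch, using a hypothetical runLIFO helper that is not part of this package:

```go
package main

import "fmt"

// runLIFO runs cleanups in reverse registration order and collects any errors
// without stopping at the first failure.
func runLIFO(cleanups []func() error) []error {
	var errs []error
	for i := len(cleanups) - 1; i >= 0; i-- {
		if err := cleanups[i](); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

func main() {
	var order []string
	cleanups := []func() error{
		func() error { order = append(order, "close database"); return nil },
		func() error { order = append(order, "stop HTTP server"); return nil },
	}
	errs := runLIFO(cleanups)
	fmt.Println(order, len(errs))
}
```

Reverse order suits resources that depend on earlier ones: here the server, registered last, stops before the database it uses is closed. Concurrent execution gives up that ordering in exchange for a shorter total shutdown time.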

func ShutdownWithForceQuit

func ShutdownWithForceQuit(d time.Duration) ShutdownOption

ShutdownWithForceQuit enables a force quit trigger after a specific timeout. This triggers context cancellation if the shutdown takes too long.

func ShutdownWithLogger

func ShutdownWithLogger(l *ll.Logger) ShutdownOption

ShutdownWithLogger sets a custom logger for the manager.

func ShutdownWithSignals

func ShutdownWithSignals(signals ...os.Signal) ShutdownOption

ShutdownWithSignals specifies which OS signals to capture. If not set, defaults to SIGINT, SIGTERM, and SIGQUIT.

func ShutdownWithTimeout

func ShutdownWithTimeout(d time.Duration) ShutdownOption

ShutdownWithTimeout sets the maximum time to wait for shutdown completion. Used as the context timeout for callbacks.

type ShutdownStats

type ShutdownStats struct {
	TotalEvents     int
	CompletedEvents int
	FailedEvents    int
	StartTime       time.Time
	EndTime         time.Time
	Errors          []error
}

ShutdownStats contains metrics about the shutdown execution.

type Task

type Task interface {
	// Do executes the task and returns any error.
	Do() error
}

Task represents a simple task that can be executed without a context.

func Do

func Do(fn func()) Task

Do wraps a function with no return value into a Task. Useful for fire-and-forget or simple operations that don’t produce errors.

Example:

pool.Submit(jack.Do(func() {
	fmt.Println("Hello from task")
}))
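
A wrapper like this is commonly built as a function type that implements the interface. The sketch below shows that adapter pattern with hypothetical task, funcTask, and wrap names; it is not necessarily how this package implements Do.

```go
package main

import "fmt"

type task interface{ Do() error }

// funcTask adapts a plain func() to the task interface.
type funcTask func()

// Do calls the wrapped function and always reports success.
func (f funcTask) Do() error {
	f()
	return nil
}

// wrap converts fn into a task.
func wrap(fn func()) task { return funcTask(fn) }

func main() {
	tk := wrap(func() { fmt.Println("Hello from task") })
	if err := tk.Do(); err != nil {
		fmt.Println("unexpected error:", err)
	}
}
```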

type TaskCtx

type TaskCtx interface {
	// Do executes the task with the given context and returns any error.
	Do(ctx context.Context) error
}

TaskCtx represents a context-aware task that can be executed with a context.

func DoCtx

func DoCtx(fn func(ctx context.Context)) TaskCtx

DoCtx wraps a context-aware function with no return value into a TaskCtx. Useful when you only need context propagation without returning an error.

Example:

pool.SubmitCtx(ctx, jack.DoCtx(func(ctx context.Context) {
	fmt.Println("Running with context:", ctx)
}))

type Vitals

type Vitals struct {
	Start     HookCtx
	End       CallbackCtx
	Timed     CallbackCtx
	TimedWait time.Duration
}

func NewVitals

func NewVitals(opts ...VitalsOption) *Vitals

NewVitals creates a new Vitals instance configured with the provided functional options. Each option modifies the Vitals configuration before returning the initialized instance. If no options are provided, returns a Vitals with all fields set to their zero values.

func (*Vitals) Execute

func (l *Vitals) Execute(ctx context.Context, id string, operation Func) error

Execute runs an operation with optional start and end hooks while propagating errors. It invokes the Start hook first, then the operation, and finally the End callback on success. If Start or the operation returns an error, execution stops and the error is returned immediately.
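
The ordering described above (Start, then the operation, then End only on success) can be sketched as follows. The hook and callback types mirror the shapes described for HookCtx and CallbackCtx, but their exact signatures are an assumption, and execute and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type hook func(ctx context.Context, id string) error
type callback func(ctx context.Context, id string)

// execute runs start, then op, then end; any error stops the sequence.
func execute(ctx context.Context, id string, start hook, op func() error, end callback) error {
	if start != nil {
		if err := start(ctx, id); err != nil {
			return err
		}
	}
	if err := op(); err != nil {
		return err
	}
	if end != nil {
		end(ctx, id)
	}
	return nil
}

// demo records which steps ran for a given failure scenario.
func demo(failStart, failOp bool) ([]string, error) {
	var steps []string
	start := func(ctx context.Context, id string) error {
		steps = append(steps, "start")
		if failStart {
			return errors.New("start failed")
		}
		return nil
	}
	op := func() error {
		steps = append(steps, "operation")
		if failOp {
			return errors.New("operation failed")
		}
		return nil
	}
	end := func(ctx context.Context, id string) { steps = append(steps, "end") }
	err := execute(context.Background(), "job-1", start, op, end)
	return steps, err
}

func main() {
	for _, c := range []struct{ failStart, failOp bool }{{false, false}, {true, false}, {false, true}} {
		steps, err := demo(c.failStart, c.failOp)
		fmt.Println(steps, err)
	}
}
```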

func (*Vitals) ExecuteCtx

func (l *Vitals) ExecuteCtx(ctx context.Context, id string, operation FuncCtx) error

ExecuteCtx runs a context-aware operation with optional start and end hooks. It invokes the Start hook first, then the operation with context, and finally the End callback. Errors from Start or the operation are propagated immediately, skipping subsequent steps.

type VitalsOption

type VitalsOption func(*Vitals)

func VitalsWithEnd

func VitalsWithEnd(callback CallbackCtx) VitalsOption

VitalsWithEnd configures a callback function to run after successful operation completion. The callback receives the context and operation ID but does not return an error. Use this option for post-operation cleanup, notifications, or result processing.

func VitalsWithStart

func VitalsWithStart(hook HookCtx) VitalsOption

VitalsWithStart configures a hook function to run at the beginning of a vital operation. The hook receives the context and operation ID, and can return an error to abort execution. Use this option to inject pre-operation logic like logging, metrics, or validation.

func VitalsWithTimed

func VitalsWithTimed(callback CallbackCtx, wait time.Duration) VitalsOption

VitalsWithTimed configures a timed callback with a wait duration for delayed execution. The callback receives context and ID after the specified wait period elapses. Use this option for deferred operations like timeout handling or scheduled cleanup.
