Documentation ¶
Overview ¶
Package teguh is a Go client for Teguh, a durable execution engine built on PostgreSQL that combines absurd's workflow API with a zero-bloat dispatch queue inspired by PgQue/PgQ. No PostgreSQL extensions are required.
Quick start:
client, err := teguh.Connect(ctx, dsn)

// Low-level: spawn a task and claim it manually
res, err := client.SpawnTask(ctx, "jobs", "send-email", params, nil)
runs, err := client.ClaimTask(ctx, "jobs", "worker-1", 30, 1)

// High-level: register handlers and start a worker
w := client.NewWorker("jobs",
	teguh.WithConcurrency(10),
	teguh.WithClaimTimeout(30),
)
w.Handle("send-email", func(ctx context.Context, tc *teguh.TaskContext) error {
	// Use teguh.Step, tc.SleepFor, tc.AwaitEvent for durable execution.
	return nil
})
w.Start(ctx)
Index ¶
- Variables
- func Step[T any](ctx context.Context, tc *TaskContext, name string, ...) (T, error)
- type Checkpoint
- type Client
- func (c *Client) AwaitEvent(ctx context.Context, queue, taskID, runID, stepName, eventName string, ...) (shouldSuspend bool, payload json.RawMessage, err error)
- func (c *Client) CancelTask(ctx context.Context, queue, taskID string) error
- func (c *Client) ClaimTask(ctx context.Context, queue, workerID string, claimTimeoutSecs, qty int) ([]Run, error)
- func (c *Client) Close()
- func (c *Client) CompleteRun(ctx context.Context, queue, runID string, result any) error
- func (c *Client) CreateQueue(ctx context.Context, queue string) error
- func (c *Client) DropQueue(ctx context.Context, queue string) error
- func (c *Client) EmitEvent(ctx context.Context, queue, eventName string, payload any) error
- func (c *Client) ExtendClaim(ctx context.Context, queue, runID string, extendBySecs int) error
- func (c *Client) FailRun(ctx context.Context, queue, runID string, reason any, retryAt *time.Time) error
- func (c *Client) GetCheckpoints(ctx context.Context, queue, taskID, runID string) ([]Checkpoint, error)
- func (c *Client) GetTaskResult(ctx context.Context, queue, taskID string) (TaskResult, error)
- func (c *Client) NewWorker(queue string, opts ...Option) *Worker
- func (c *Client) Pool() *pgxpool.Pool
- func (c *Client) RetryTask(ctx context.Context, queue, taskID string, spawnNew bool) (SpawnResult, error)
- func (c *Client) ScheduleRun(ctx context.Context, queue, runID string, wakeAt time.Time) error
- func (c *Client) SetCheckpoint(ctx context.Context, queue, taskID, stepName string, state json.RawMessage, ...) error
- func (c *Client) SpawnTask(ctx context.Context, queue, taskName string, params any, opts *SpawnOptions) (SpawnResult, error)
- func (c *Client) Ticker(ctx context.Context) (int, error)
- type HandlerFunc
- type Option
- type Run
- type SpawnOptions
- type SpawnResult
- type TaskContext
- func (tc *TaskContext) Attempt() int
- func (tc *TaskContext) AwaitEvent(ctx context.Context, stepName, eventName string, timeout ...time.Duration) (json.RawMessage, error)
- func (tc *TaskContext) EmitEvent(ctx context.Context, eventName string, payload any) error
- func (tc *TaskContext) EventPayload() json.RawMessage
- func (tc *TaskContext) Heartbeat(ctx context.Context, extendBySecs int) error
- func (tc *TaskContext) Params() json.RawMessage
- func (tc *TaskContext) RunID() string
- func (tc *TaskContext) SleepFor(ctx context.Context, d time.Duration) error
- func (tc *TaskContext) SleepUntil(ctx context.Context, t time.Time) error
- func (tc *TaskContext) TaskID() string
- func (tc *TaskContext) WakeEvent() *string
- type TaskResult
- type Worker
Constants ¶
This section is empty.
Variables ¶
var ErrCancelled = errors.New("teguh: task cancelled")
ErrCancelled is returned by Heartbeat when the task has been cancelled by an external actor. Handlers must propagate this error up to the Worker.
var ErrSuspended = errors.New("teguh: task suspended")
ErrSuspended is returned by TaskContext methods (SleepFor, SleepUntil, AwaitEvent) when the task has been suspended. Handlers must propagate this error up to the Worker without further processing.
Functions ¶
func Step ¶
func Step[T any](ctx context.Context, tc *TaskContext, name string, fn func(context.Context) (T, error)) (T, error)
Step executes fn and durably persists the result under name. On retry, the cached result is returned immediately without calling fn again, so a step whose result has been persisted is never re-executed.
T must be JSON-marshalable. The type parameter is normally inferred from fn's return type, so it rarely needs to be spelled out:
result, err := teguh.Step(ctx, tc, "fetch-user", func(ctx context.Context) (*User, error) {
	return fetchUser(ctx, userID)
})
Important constraints:
- Each step name must be unique within a task's lifetime. A duplicate name returns the first call's cached result silently — this is usually a bug.
- Side effects inside fn must be idempotent: if SetCheckpoint fails after fn returns, fn will be called again on the next attempt.
- TaskContext is not goroutine-safe; do not share it across goroutines.
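The caching behaviour and the duplicate-name constraint above can be illustrated without a database. The following is a minimal sketch, not the package's implementation: `stepCache` and `runStep` are hypothetical names, and an in-memory map stands in for the checkpoint storage.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stepCache is a hypothetical in-memory stand-in for persisted checkpoints;
// it maps a step name to its JSON-encoded result.
type stepCache map[string]json.RawMessage

// runStep mimics the behaviour described for Step: return the cached result
// stored under name if there is one, otherwise call fn and persist its result.
func runStep[T any](cache stepCache, name string, fn func() (T, error)) (T, error) {
	var out T
	if raw, ok := cache[name]; ok {
		// Cache hit: decode the stored result and skip fn entirely.
		err := json.Unmarshal(raw, &out)
		return out, err
	}
	out, err := fn()
	if err != nil {
		return out, err // failures are not cached in this sketch
	}
	raw, err := json.Marshal(out)
	if err != nil {
		return out, err
	}
	cache[name] = raw
	return out, nil
}

func main() {
	cache := stepCache{}
	calls := 0
	fetch := func() (int, error) { calls++; return 42, nil }

	first, _ := runStep(cache, "fetch-answer", fetch)  // calls fetch
	second, _ := runStep(cache, "fetch-answer", fetch) // served from the cache
	fmt.Println(first, second, calls)
}
```

Because the second call reuses the name "fetch-answer", fetch runs only once. The same mechanism is why reusing a step name for two different operations silently returns the first result.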
Types ¶
type Checkpoint ¶
type Checkpoint struct {
	Name  string          `json:"name"`
	State json.RawMessage `json:"state"`
}
Checkpoint is a single step result returned by Client.GetCheckpoints.
type Client ¶
type Client struct {
	// contains filtered or unexported fields
}
Client is the main teguh client. It wraps a pgx connection pool and is safe for concurrent use.
func Connect ¶
Connect opens a new Client using the given PostgreSQL DSN. The caller must call Close when done.
func (*Client) AwaitEvent ¶
func (c *Client) AwaitEvent(ctx context.Context, queue, taskID, runID, stepName, eventName string, timeoutSecs *int) (shouldSuspend bool, payload json.RawMessage, err error)
AwaitEvent suspends the run until eventName is emitted, or returns immediately if the event was already emitted. A nil timeoutSecs waits forever. It returns (shouldSuspend, payload, err); when shouldSuspend is true, the worker must stop processing the run.
func (*Client) CancelTask ¶
func (c *Client) CancelTask(ctx context.Context, queue, taskID string) error
CancelTask cancels a task by ID. Running workers detect cancellation at the next checkpoint or heartbeat call.
func (*Client) ClaimTask ¶
func (c *Client) ClaimTask(ctx context.Context, queue, workerID string, claimTimeoutSecs, qty int) ([]Run, error)
ClaimTask claims up to qty pending tasks from the queue for workerID with a lease of claimTimeoutSecs seconds. Returns the claimed runs (may be empty).
func (*Client) CompleteRun ¶
func (c *Client) CompleteRun(ctx context.Context, queue, runID string, result any) error
CompleteRun marks the run as successfully completed with an optional result payload.
func (*Client) CreateQueue ¶
func (c *Client) CreateQueue(ctx context.Context, queue string) error
CreateQueue provisions all per-queue tables (t_, p_, r_, c_, e_, w_).
func (*Client) EmitEvent ¶
func (c *Client) EmitEvent(ctx context.Context, queue, eventName string, payload any) error
EmitEvent emits a named event with an optional payload. First-write-wins: subsequent calls for the same event name are silently ignored. Wakes any tasks waiting on this event and fires pg_notify. A nil payload sends SQL NULL, which the engine stores as JSON null.
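First-write-wins semantics can be sketched with an in-memory map. This is an illustration only: `eventStore` and `emit` are hypothetical names, and the boolean return value exists just to make the ignored write visible (the real EmitEvent returns only an error).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventStore is a hypothetical stand-in for the per-queue event table.
type eventStore map[string]json.RawMessage

// emit stores payload under name only if no event with that name exists yet.
// It reports whether this call was the one that stored the event.
func (s eventStore) emit(name string, payload any) (bool, error) {
	if _, exists := s[name]; exists {
		return false, nil // later writes are ignored
	}
	raw, err := json.Marshal(payload) // a nil payload marshals to JSON null
	if err != nil {
		return false, err
	}
	s[name] = raw
	return true, nil
}

func main() {
	s := eventStore{}
	stored, _ := s.emit("payment.received", map[string]string{"status": "paid"})
	ignored, _ := s.emit("payment.received", map[string]string{"status": "refunded"})
	fmt.Println(stored, ignored, string(s["payment.received"]))
}
```

The second emit leaves the first payload untouched, so waiters always observe the payload of the first emitter.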
func (*Client) ExtendClaim ¶
func (c *Client) ExtendClaim(ctx context.Context, queue, runID string, extendBySecs int) error
ExtendClaim extends the lease for an active run by extendBySecs seconds. Returns an error if the task has been cancelled (sqlstate AB001).
func (*Client) FailRun ¶
func (c *Client) FailRun(ctx context.Context, queue, runID string, reason any, retryAt *time.Time) error
FailRun marks the run as failed with a JSON reason payload. The engine will schedule a retry according to the task's retry_strategy. retryAt, when non-nil, overrides the computed retry delay.
func (*Client) GetCheckpoints ¶
func (c *Client) GetCheckpoints(ctx context.Context, queue, taskID, runID string) ([]Checkpoint, error)
GetCheckpoints loads all committed checkpoints for a task visible to a given run's attempt number.
func (*Client) GetTaskResult ¶
func (c *Client) GetTaskResult(ctx context.Context, queue, taskID string) (TaskResult, error)
GetTaskResult returns the current state and result of a task.
func (*Client) NewWorker ¶
func (c *Client) NewWorker(queue string, opts ...Option) *Worker
NewWorker creates a Worker for the given queue. Use Handle to register task handlers, then call Start to begin processing.
func (*Client) RetryTask ¶
func (c *Client) RetryTask(ctx context.Context, queue, taskID string, spawnNew bool) (SpawnResult, error)
RetryTask re-enqueues a failed or cancelled task.
func (*Client) ScheduleRun ¶
func (c *Client) ScheduleRun(ctx context.Context, queue, runID string, wakeAt time.Time) error
ScheduleRun suspends the run until wakeAt. The worker must return after calling this (or after TaskContext.SleepFor/SleepUntil returns ErrSuspended).
func (*Client) SetCheckpoint ¶
func (c *Client) SetCheckpoint(ctx context.Context, queue, taskID, stepName string, state json.RawMessage, ownerRunID string, extendClaimBy int) error
SetCheckpoint writes a named step result for a task. If extendClaimBy > 0 it also extends the lease.
func (*Client) SpawnTask ¶
func (c *Client) SpawnTask(ctx context.Context, queue, taskName string, params any, opts *SpawnOptions) (SpawnResult, error)
SpawnTask enqueues a new task and returns the spawn result. params must be JSON-marshalable; opts may be nil.
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, tc *TaskContext) error
HandlerFunc processes a single task execution. It receives a TaskContext that exposes durable execution primitives (Step, SleepFor, AwaitEvent, …).
- Return nil to mark the run as completed.
- Return ErrSuspended (propagated from tc.SleepFor / tc.AwaitEvent) to leave the run suspended; the Worker will not call complete_run or fail_run.
- Return ErrCancelled (propagated from tc.Heartbeat) when the task has been cancelled; the Worker will not call complete_run or fail_run.
- Return any other error to fail the run (with retry if configured).
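The four return-value rules above amount to a small classification of the handler's error. The sketch below shows one way a worker could apply them. The sentinel errors are local stand-ins for ErrSuspended and ErrCancelled, `outcome` is a hypothetical helper, and matching with errors.Is (which also accepts wrapped errors) is an assumption about how the Worker compares them.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for teguh.ErrSuspended and teguh.ErrCancelled so the
// sketch compiles without the package.
var (
	errSuspended = errors.New("teguh: task suspended")
	errCancelled = errors.New("teguh: task cancelled")
)

// outcome names what a worker would do with a handler's return value,
// following the four rules listed above.
func outcome(err error) string {
	switch {
	case err == nil:
		return "complete"
	case errors.Is(err, errSuspended):
		return "leave suspended"
	case errors.Is(err, errCancelled):
		return "leave cancelled"
	default:
		return "fail"
	}
}

func main() {
	wrapped := fmt.Errorf("waiting for payment: %w", errSuspended)
	fmt.Println(outcome(nil))
	fmt.Println(outcome(wrapped))
	fmt.Println(outcome(errCancelled))
	fmt.Println(outcome(errors.New("smtp timeout")))
}
```

Returning the sentinel unchanged is the safest choice in a handler, since it works whether or not the Worker unwraps errors.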
type Option ¶
type Option func(*Worker)
Option configures a Worker.
func WithBatchSize ¶
WithBatchSize sets how many tasks to claim per claim_task call. Defaults to the worker's concurrency setting.
func WithClaimTimeout ¶
WithClaimTimeout sets the lease duration in seconds. Workers must heartbeat before this deadline or the task is automatically failed and re-queued. Defaults to 30s.
func WithConcurrency ¶
WithConcurrency sets the maximum number of tasks executed concurrently by this worker process. Defaults to 10.
func WithHeartbeatInterval ¶
WithHeartbeatInterval sets how often the worker extends its lease. Must be less than WithClaimTimeout. Defaults to 10s.
func WithPollInterval ¶
WithPollInterval sets the fallback sleep between claim_task calls when the queue is empty and no NOTIFY arrives. Defaults to 30s. Workers with LISTEN/NOTIFY only hit this fallback on missed notifications.
func WithWorkerID ¶
WithWorkerID sets an explicit worker identifier (used in lease metadata and logs). Defaults to hostname:pid.
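The options above follow Go's functional-options pattern. The sketch below reproduces that pattern with the documented defaults (concurrency 10, claim timeout 30s, heartbeat 10s, batch size equal to concurrency). `config`, `newConfig`, and the lowercase option constructors are hypothetical, the heartbeat option is given a time.Duration argument for illustration, and whether the real Worker validates the heartbeat constraint at construction time is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the Worker's unexported settings.
type config struct {
	concurrency       int
	batchSize         int // 0 means "use concurrency"
	claimTimeout      time.Duration
	heartbeatInterval time.Duration
}

// option mirrors the shape of teguh.Option but targets config instead of Worker.
type option func(*config)

func withConcurrency(n int) option { return func(c *config) { c.concurrency = n } }
func withBatchSize(n int) option   { return func(c *config) { c.batchSize = n } }
func withHeartbeatInterval(d time.Duration) option {
	return func(c *config) { c.heartbeatInterval = d }
}

// newConfig applies opts over the documented defaults, then validates them.
func newConfig(opts ...option) (config, error) {
	c := config{
		concurrency:       10,
		claimTimeout:      30 * time.Second,
		heartbeatInterval: 10 * time.Second,
	}
	for _, opt := range opts {
		opt(&c)
	}
	if c.batchSize == 0 {
		c.batchSize = c.concurrency // documented default for the batch size
	}
	if c.heartbeatInterval >= c.claimTimeout {
		return c, fmt.Errorf("heartbeat interval %v must be less than claim timeout %v",
			c.heartbeatInterval, c.claimTimeout)
	}
	return c, nil
}

func main() {
	c, err := newConfig(withConcurrency(4))
	fmt.Println(c.concurrency, c.batchSize, err)

	_, err = newConfig(withHeartbeatInterval(time.Minute))
	fmt.Println(err)
}
```

Resolving the batch size after all options have run keeps the result independent of the order in which options are passed.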
type Run ¶
type Run struct {
	RunID         string          `json:"run_id"`
	TaskID        string          `json:"task_id"`
	Attempt       int             `json:"attempt"`
	TaskName      string          `json:"task_name"`
	Params        json.RawMessage `json:"params"`
	RetryStrategy json.RawMessage `json:"retry_strategy"`
	MaxAttempts   *int            `json:"max_attempts"`
	Headers       json.RawMessage `json:"headers"`
	WakeEvent     *string         `json:"wake_event"`
	EventPayload  json.RawMessage `json:"event_payload"`
}
Run is a claimed task execution returned by Client.ClaimTask.
type SpawnOptions ¶
type SpawnOptions struct {
	// Headers are arbitrary key/value pairs attached to the task (not params).
	Headers map[string]any `json:"headers,omitempty"`
	// RetryStrategy controls retry behaviour: {"kind":"exponential","base_seconds":30,"factor":2}
	RetryStrategy map[string]any `json:"retry_strategy,omitempty"`
	// MaxAttempts caps the number of execution attempts (nil = unlimited).
	MaxAttempts *int `json:"max_attempts,omitempty"`
	// Cancellation configures auto-cancel thresholds:
	//   {"max_delay": <seconds>}: cancel if not started within N seconds.
	//   {"max_duration": <seconds>}: cancel if running longer than N seconds total.
	Cancellation map[string]any `json:"cancellation,omitempty"`
	// IdempotencyKey makes the spawn idempotent; a second call returns the
	// existing task without creating a new one.
	IdempotencyKey string `json:"idempotency_key,omitempty"`
	// AvailableAt schedules delayed start (zero = immediate).
	AvailableAt time.Time `json:"available_at,omitempty"`
}
SpawnOptions holds the optional settings for a spawn_task call.
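The RetryStrategy comment shows an exponential strategy but not the formula the engine applies. The sketch below assumes the common form delay = base_seconds × factor^(attempt−1). That formula, the `retryStrategy` struct, and `retryDelay` are all assumptions made for illustration, not the engine's documented behaviour.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
	"time"
)

// retryStrategy decodes the example payload from the RetryStrategy comment.
type retryStrategy struct {
	Kind        string  `json:"kind"`
	BaseSeconds float64 `json:"base_seconds"`
	Factor      float64 `json:"factor"`
}

// retryDelay returns the assumed delay before the retry that follows the
// given failed attempt (1-based): base_seconds * factor^(attempt-1).
func retryDelay(raw []byte, attempt int) (time.Duration, error) {
	var s retryStrategy
	if err := json.Unmarshal(raw, &s); err != nil {
		return 0, err
	}
	if s.Kind != "exponential" {
		return 0, fmt.Errorf("unsupported retry kind %q", s.Kind)
	}
	secs := s.BaseSeconds * math.Pow(s.Factor, float64(attempt-1))
	return time.Duration(secs * float64(time.Second)), nil
}

func main() {
	raw := []byte(`{"kind":"exponential","base_seconds":30,"factor":2}`)
	for attempt := 1; attempt <= 3; attempt++ {
		d, err := retryDelay(raw, attempt)
		fmt.Println(attempt, d, err)
	}
}
```

Under this assumption the example strategy yields delays of 30s, 1m, and 2m after the first three failures.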
type SpawnResult ¶
type SpawnResult struct {
	TaskID string `json:"task_id"`
	// RunID is a pre-generated UUID stored as a hint; it is NOT the run ID
	// that will be active once the task is claimed. Use the run_id returned
	// by ClaimTask for all run-scoped APIs (CompleteRun, FailRun, etc.).
	RunID   string `json:"run_id"`
	Attempt int    `json:"attempt"`
	Created bool   `json:"created"` // false on idempotency hit
}
SpawnResult is returned by Client.SpawnTask.
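The interaction between IdempotencyKey and the Created field can be modelled in memory. This is a sketch under assumptions: `spawner`, `result`, and the task ID format are hypothetical, and treating an empty key as "never deduplicate" is inferred from the field's omitempty tag rather than stated by the package.

```go
package main

import "fmt"

// result keeps only the SpawnResult fields this sketch needs.
type result struct {
	TaskID  string
	Created bool
}

// spawner is a hypothetical in-memory stand-in for the engine's task table.
type spawner struct {
	byKey map[string]string // idempotency key -> task ID
	next  int
}

// spawn creates a task unless key was seen before, in which case it returns
// the existing task with Created set to false.
func (s *spawner) spawn(key string) result {
	if key != "" {
		if id, ok := s.byKey[key]; ok {
			return result{TaskID: id, Created: false}
		}
	}
	s.next++
	id := fmt.Sprintf("task-%d", s.next)
	if key != "" {
		s.byKey[key] = id
	}
	return result{TaskID: id, Created: true}
}

func main() {
	s := &spawner{byKey: map[string]string{}}
	fmt.Println(s.spawn("order-42")) // new task
	fmt.Println(s.spawn("order-42")) // same task, Created is false
}
```

Callers that need to know whether their call did the enqueueing can branch on Created instead of comparing task IDs.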
type TaskContext ¶
type TaskContext struct {
	// contains filtered or unexported fields
}
TaskContext is passed to every task handler. It provides durable execution primitives: exactly-once steps, timer-based sleep, and event coordination.
Checkpoint state is pre-loaded from the database when the run is claimed, so step cache hits require no extra round-trips.
func (*TaskContext) Attempt ¶
func (tc *TaskContext) Attempt() int
Attempt returns the current attempt number (1-based).
func (*TaskContext) AwaitEvent ¶
func (tc *TaskContext) AwaitEvent(ctx context.Context, stepName, eventName string, timeout ...time.Duration) (json.RawMessage, error)
AwaitEvent suspends the task waiting for eventName to be emitted. If the event was already emitted before this call, returns immediately with the payload (no suspension).
stepName is used to persist the received payload as a checkpoint so that on replay the handler skips the wait and receives the cached payload.
timeout optionally caps the wait; when it elapses the task resumes with a nil payload.
Returns (payload, ErrSuspended) when suspended; (payload, nil) when the event was already available or on timeout resume.
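TaskContext.AwaitEvent takes an optional time.Duration, while Client.AwaitEvent takes a *int number of seconds with nil meaning "wait forever". How the package converts between the two is not documented here. The helper below is a hypothetical sketch that rounds up to whole seconds and treats a missing or non-positive timeout as no timeout.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// timeoutSecs converts an optional timeout into the *int form taken by
// Client.AwaitEvent. The helper and its rounding rule are assumptions.
func timeoutSecs(timeout ...time.Duration) *int {
	if len(timeout) == 0 || timeout[0] <= 0 {
		return nil // no timeout: wait forever
	}
	secs := int(math.Ceil(timeout[0].Seconds()))
	return &secs
}

func main() {
	fmt.Println(timeoutSecs() == nil)
	fmt.Println(*timeoutSecs(1500 * time.Millisecond))
}
```

Rounding up avoids turning a sub-second timeout into zero seconds, which could otherwise be confused with an immediate expiry.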
func (*TaskContext) EmitEvent ¶
func (tc *TaskContext) EmitEvent(ctx context.Context, eventName string, payload any) error
EmitEvent emits eventName with payload. First-write-wins: later calls for the same event name in the same queue are ignored. Wakes any tasks suspended on this event and fires pg_notify.
func (*TaskContext) EventPayload ¶
func (tc *TaskContext) EventPayload() json.RawMessage
EventPayload returns the payload of the wake event, or nil.
func (*TaskContext) Heartbeat ¶
func (tc *TaskContext) Heartbeat(ctx context.Context, extendBySecs int) error
Heartbeat extends the run's lease by extendBySecs seconds. Returns ErrCancelled if the task has been cancelled (workers should stop immediately).
func (*TaskContext) Params ¶
func (tc *TaskContext) Params() json.RawMessage
Params returns the raw JSON params passed at spawn time.
func (*TaskContext) RunID ¶
func (tc *TaskContext) RunID() string
RunID returns the current execution's run ID.
func (*TaskContext) SleepFor ¶
func (tc *TaskContext) SleepFor(ctx context.Context, d time.Duration) error
SleepFor suspends the task for duration d. The run will be re-queued after d has elapsed (by the ticker or claim_task's inline recovery sweep).
The wake time is computed from the Go process clock (time.Now), not the PostgreSQL server clock. A small clock skew between the two hosts may cause the task to wake slightly earlier or later than intended.
Always returns ErrSuspended; the handler must return this error immediately.
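The effect of clock skew on SleepFor can be worked through numerically. The sketch assumes the wake time is compared against the PostgreSQL server clock when the run is re-queued. `effectiveWait` and its skew parameter (database clock minus application clock) are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// effectiveWait returns how long the task stays suspended as seen by the
// database, given the requested duration d and the clock skew
// (database clock minus application clock).
func effectiveWait(d, skew time.Duration) time.Duration {
	// wakeAt = appNow + d, and the database compares it with
	// dbNow = appNow + skew, so the remaining wait is d - skew.
	wait := d - skew
	if wait < 0 {
		return 0 // the wake time is already in the past for the database
	}
	return wait
}

func main() {
	fmt.Println(effectiveWait(10*time.Second, 2*time.Second))  // database clock ahead
	fmt.Println(effectiveWait(10*time.Second, -2*time.Second)) // database clock behind
}
```

With a database clock 2s ahead, a 10s sleep ends after 8s. With it 2s behind, the sleep lasts 12s.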
func (*TaskContext) SleepUntil ¶
func (tc *TaskContext) SleepUntil(ctx context.Context, t time.Time) error
SleepUntil suspends the task until t. Always returns ErrSuspended.
func (*TaskContext) WakeEvent ¶
func (tc *TaskContext) WakeEvent() *string
WakeEvent returns the event name that caused this resumption, or nil.
type TaskResult ¶
type TaskResult struct {
	TaskID           string          `json:"task_id"`
	State            string          `json:"state"`
	Attempts         int             `json:"attempts"`
	CompletedPayload json.RawMessage `json:"completed_payload"`
	CancelledAt      *time.Time      `json:"cancelled_at"`
}
TaskResult is returned by Client.GetTaskResult.
type Worker ¶
type Worker struct {
	// contains filtered or unexported fields
}
Worker polls a teguh queue and dispatches claimed tasks to registered handlers. It uses LISTEN/NOTIFY (channel: teguh_<queue>) to wake immediately when new tasks are available, falling back to a poll interval.
func (*Worker) Handle ¶
func (w *Worker) Handle(taskName string, fn HandlerFunc)
Handle registers a handler for the named task. Use "*" to register a catch-all handler for any task name without a specific handler. All Handle calls must complete before calling Start; the handler map is not goroutine-safe after Start begins dispatching.
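The catch-all rule can be pictured as a two-step map lookup. The sketch below is illustrative only: `handler` is a simplified stand-in for HandlerFunc, and `lookup` is a hypothetical helper, not the Worker's dispatch code.

```go
package main

import "fmt"

// handler is a simplified stand-in for teguh.HandlerFunc.
type handler func() string

// lookup returns the handler registered for taskName, falling back to the
// "*" catch-all when no specific handler exists.
func lookup(handlers map[string]handler, taskName string) (handler, bool) {
	if h, ok := handlers[taskName]; ok {
		return h, true
	}
	h, ok := handlers["*"]
	return h, ok
}

func main() {
	handlers := map[string]handler{
		"send-email": func() string { return "email handler" },
		"*":          func() string { return "catch-all handler" },
	}
	for _, name := range []string{"send-email", "resize-image"} {
		if h, ok := lookup(handlers, name); ok {
			fmt.Println(name, "->", h())
		}
	}
}
```

A plain map is safe here only because it is fully populated before any lookups run, which matches the requirement that all Handle calls finish before Start.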