teguh

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

README

teguh

Teguh is a durable execution engine built on PostgreSQL. It combines the workflow API from absurd with a zero-bloat dispatch queue inspired by PgQ, adding LISTEN/NOTIFY so workers wake immediately instead of polling on a fixed interval.

Features

  • Pull-based claim_task API, fully compatible with the absurd workflow model.
  • LISTEN/NOTIFY wakeup via pg_notify on spawn_task and emit_event, with a configurable fallback poll interval.
  • Zero dead-tuple dispatch path: p_<queue> rows are inserted on spawn and deleted on claim, never updated.
  • Active leases tracked in r_<queue> only while a run is in-flight, keeping the table small and bounded.
  • Exactly-once step checkpoints via teguh.Step[T], with in-memory cache pre-loaded at claim time.
  • Timer-based sleep and resume (SleepFor, SleepUntil) without a run row while sleeping.
  • Event coordination (AwaitEvent, EmitEvent) with first-write-wins semantics and timeout support.
  • Configurable retry strategies: fixed delay, exponential backoff, or none.
  • Task cancellation, manual retry, and result inspection.
  • Optional pg_cron integration via teguh.start() and teguh.stop(), gracefully skipped when pg_cron is absent.
  • Single-file SQL install, no external dependencies beyond PostgreSQL itself.

Requirements

  • PostgreSQL 14 or later.
  • Go 1.21 or later (for generic Step[T]).

No PostgreSQL extensions are required. The UUIDv7 generator uses gen_random_uuid() and uuid_send(), both core built-ins since PostgreSQL 13.

Managed PostgreSQL compatibility

Teguh works out of the box on every major managed provider. No extension pre-configuration is needed.

Provider Works? Notes
Google Cloud SQL No restrictions
Amazon RDS for PostgreSQL No restrictions
Amazon Aurora PostgreSQL No restrictions
Azure Database for PostgreSQL No restrictions
Supabase No restrictions
Neon No restrictions
Aiven No restrictions
Crunchy Bridge No restrictions
DigitalOcean Managed Databases No restrictions
Render No restrictions
Heroku Postgres / EDB No restrictions
Railway No official extension list, but no extension needed
pgcrypto auto-detection

portable_uuidv7() defaults to uuid_send(gen_random_uuid()) — no extension needed. If pgcrypto is already installed, teguh.sql detects it at install time and automatically upgrades the function to use gen_random_bytes(10) instead. Re-running teguh.sql after installing pgcrypto picks up the upgrade with zero per-call overhead.

Implementation paths:

uuid_send(gen_random_uuid()) (default) gen_random_bytes(10) (pgcrypto auto-upgrade)
Extension required None pgcrypto (auto-detected)
Azure extra step None Must allowlist via azure.extensions param
Random source OS CSPRNG via gen_random_uuid() OS CSPRNG directly
Bytes of entropy 80 bits (first 10 of 16) 80 bits
Performance One UUID generation + substr One syscall
Portability All providers, all PG14+ All providers except possibly Railway

Both paths provide 80 bits of OS CSPRNG entropy. The difference is negligible in practice.

Installation

Install the schema once per database:

\i sql/teguh.sql

Or apply it programmatically:

schema, _ := os.ReadFile("sql/teguh.sql")
_, err = pool.Exec(ctx, string(schema))

Quick start

import "github.com/dio/teguh"

client, err := teguh.Connect(ctx, dsn)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Create a queue.
if err := client.CreateQueue(ctx, "jobs"); err != nil {
    log.Fatal(err)
}

// Register handlers and start a worker.
w := client.NewWorker("jobs",
    teguh.WithConcurrency(10),
    teguh.WithClaimTimeout(30),
    teguh.WithPollInterval(30*time.Second),
)

w.Handle("send-email", func(ctx context.Context, tc *teguh.TaskContext) error {
    var params struct {
        To      string `json:"to"`
        Subject string `json:"subject"`
    }
    if err := json.Unmarshal(tc.Params(), &params); err != nil {
        return err
    }
    return sendEmail(ctx, params.To, params.Subject)
})

// Start blocks until ctx is cancelled.
if err := w.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
    log.Fatal(err)
}

Spawn a task from anywhere:

res, err := client.SpawnTask(ctx, "jobs", "send-email",
    map[string]any{"to": "user@example.com", "subject": "Hello"},
    nil,
)

Durable execution

Use teguh.Step[T] for exactly-once sub-steps. On retry, the cached result is returned immediately without re-executing the function.

w.Handle("order-fulfillment", func(ctx context.Context, tc *teguh.TaskContext) error {
    // Step 1: charge the card. Runs exactly once, even across retries.
    charge, err := teguh.Step(ctx, tc, "charge", func(ctx context.Context) (*ChargeResult, error) {
        return chargeCard(ctx, orderID)
    })
    if err != nil {
        return err
    }

    // Step 2: ship the order.
    _, err = teguh.Step(ctx, tc, "ship", func(ctx context.Context) (*ShipResult, error) {
        return shipOrder(ctx, charge.ID)
    })
    return err
})
Sleep and resume
w.Handle("reminder", func(ctx context.Context, tc *teguh.TaskContext) error {
    if tc.Attempt() == 1 {
        // Suspend for 24 hours. The worker re-queues the task after the delay.
        return tc.SleepFor(ctx, 24*time.Hour)
    }
    return sendReminder(ctx)
})
Event coordination
// Wait for an external event (e.g. payment confirmed).
w.Handle("wait-for-payment", func(ctx context.Context, tc *teguh.TaskContext) error {
    payload, err := tc.AwaitEvent(ctx, "payment-received", "payment:"+orderID,
        teguh.WithTimeout(10*time.Minute),
    )
    if errors.Is(err, teguh.ErrSuspended) {
        return teguh.ErrSuspended
    }
    if err != nil {
        return err
    }
    return processPayment(ctx, payload)
})

// Emit the event from another handler or service.
if err := client.EmitEvent(ctx, "jobs", "payment:"+orderID, paymentData); err != nil {
    return err
}

Low-level API

The Client exposes the full low-level API for direct use without the Worker abstraction:

// Spawn and claim manually.
res, err := client.SpawnTask(ctx, "jobs", "ping", map[string]any{"val": 42}, nil)
runs, err := client.ClaimTask(ctx, "jobs", "worker-1", 30, 1)

// Complete, fail, or sleep.
client.CompleteRun(ctx, "jobs", runs[0].RunID, result)
client.FailRun(ctx, "jobs", runs[0].RunID, reason, nil)
client.ScheduleRun(ctx, "jobs", runs[0].RunID, time.Now().Add(1*time.Minute))

// Checkpoints.
client.SetCheckpoint(ctx, "jobs", taskID, "step-name", stateJSON, runID, 0)
cps, err := client.GetCheckpoints(ctx, "jobs", taskID, runID)

// Events.
client.AwaitEvent(ctx, "jobs", taskID, runID, "step", "event-name", nil)
client.EmitEvent(ctx, "jobs", "event-name", payload)

// Cancellation and retry.
client.CancelTask(ctx, "jobs", taskID)
client.RetryTask(ctx, "jobs", taskID, false)

// Ticker: re-queues sleeping tasks whose wake time has arrived.
// In production, call teguh.start() to schedule this via pg_cron.
n, err := client.Ticker(ctx)

Worker options

Option Default Description
WithConcurrency(n) 10 Maximum in-flight runs.
WithClaimTimeout(secs) 30 Lease duration in seconds.
WithPollInterval(d) 30s Fallback poll interval when no NOTIFY arrives.
WithHeartbeatInterval(d) 10s How often to extend the lease while a run is active.
WithBatchSize(n) concurrency Maximum tasks claimed per poll cycle.
WithWorkerID(id) hostname:pid Identifier stored in the run lease.

Per-queue tables

Table Purpose
t_<queue> Canonical task record, durable across all attempts.
p_<queue> Pending dispatch queue, insert on spawn and delete on claim, zero dead tuples.
r_<queue> Active leases only, one row per in-flight run.
c_<queue> Step checkpoints, keyed by task ID and step name.
e_<queue> Events, first-write-wins.
w_<queue> Wait registrations for tasks suspended on an event.

pg_cron integration (optional)

-- Schedule the ticker every minute and a daily cleanup job.
SELECT teguh.start();

-- Unschedule both jobs.
SELECT teguh.stop();

If pg_cron is not installed, teguh.start() emits a notice and returns without error. Call teguh.ticker() manually from your own scheduler or from tests.

Development

Prerequisites
  • Go 1.21 or later.
  • No PostgreSQL installation needed — e2e tests start an embedded PostgreSQL instance automatically via embedded-postgres.
Workflow
# First-time setup: copy sql/teguh.sql into e2e/testdata.
make fetch-schema

# Unit tests (root package).
make test

# E2e tests — starts embedded PostgreSQL, installs the schema, runs all tests.
make test.e2e

# Lint all modules (golangci-lint).
make lint

# Format all Go code in-place (run before committing).
make format
E2e test architecture

E2e tests live in e2e/ and use testify suite. TestMain (in e2e_test.go) owns the embedded PostgreSQL lifecycle; TestTeguhSuite (in teguh_test.go) is the suite entry point discovered by the Go test runner. See e2e/README.md for details.

SQL style

sql/teguh.sql is hand-formatted. Automated SQL formatters (pg_format, sql-formatter) do not preserve the style, so formatting is enforced by convention rather than tooling. Please follow these rules when editing:

  • Lowercase keywordscreate or replace function, select, insert, etc.
  • 2-space indentation inside function bodies and SQL blocks.
  • No space before ()current_time(), not current_time ().
  • Multi-line function signatures with each clause on its own line:
    create or replace function teguh.my_func(p_arg text)
      returns void
      language plpgsql
    as $$
    
  • Column-aligned declarations when a block has multiple variables:
    declare
      v_millis  bigint;
      v_hex     text;
      v_b       bytea;
    
Go formatting

make format rewrites Go files in-place via gofmt and goimports. Run it before committing. make lint enforces linter rules but does not check formatting. make check-format (used in CI) fails if any file needs reformatting.

License

Apache-2.0. Copyright 2026 dio@rockybars.com.

Documentation

Overview

Package teguh is a Go client for Teguh, a durable execution engine built on PostgreSQL that combines absurd's workflow API with a zero-bloat dispatch queue inspired by PgQue/PgQ. No PostgreSQL extensions are required.

Quick start:

client, err := teguh.Connect(ctx, dsn)

// Low-level: spawn a task and claim it manually
res, err := client.SpawnTask(ctx, "jobs", "send-email", params, nil)
runs, err := client.ClaimTask(ctx, "jobs", "worker-1", 30, 1)

// High-level: register handlers and start a worker
w := client.NewWorker("jobs",
    teguh.WithConcurrency(10),
    teguh.WithClaimTimeout(30),
)
w.Handle("send-email", func(ctx context.Context, tc *teguh.TaskContext) error {
    // Use tc.Step, tc.SleepFor, tc.AwaitEvent for durable execution.
    return nil
})
w.Start(ctx)

Index

Constants

This section is empty.

Variables

View Source
var ErrCancelled = errors.New("teguh: task cancelled")

ErrCancelled is returned by Heartbeat when the task has been cancelled by an external actor. Handlers must propagate this error up to the Worker.

View Source
var ErrSuspended = errors.New("teguh: task suspended")

ErrSuspended is returned by TaskContext methods (SleepFor, SleepUntil, AwaitEvent) when the task has been suspended. Handlers must propagate this error up to the Worker without further processing.

Functions

func Step

func Step[T any](ctx context.Context, tc *TaskContext, name string, fn func(context.Context) (T, error)) (T, error)

Step executes fn and durably persists the result under name. On retry the cached result is returned immediately without calling fn again (exactly-once per step name).

T must be JSON-marshalable. Use the package-level generic Step[T] function for typed convenience:

result, err := teguh.Step(ctx, tc, "fetch-user", func(ctx context.Context) (*User, error) {
    return fetchUser(ctx, userID)
})

Important constraints:

  • Each step name must be unique within a task's lifetime. A duplicate name returns the first call's cached result silently — this is usually a bug.
  • Side effects inside fn must be idempotent: if SetCheckpoint fails after fn returns, fn will be called again on the next attempt.
  • TaskContext is not goroutine-safe; do not share it across goroutines.

Types

type Checkpoint

type Checkpoint struct {
	Name  string          `json:"name"`
	State json.RawMessage `json:"state"`
}

Checkpoint is a single step result returned by Client.GetCheckpoints.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the main teguh client. It wraps a pgx connection pool and is safe for concurrent use.

func Connect

func Connect(ctx context.Context, dsn string) (*Client, error)

Connect opens a new Client using the given PostgreSQL DSN. The caller must call Close when done.

func (*Client) AwaitEvent

func (c *Client) AwaitEvent(ctx context.Context, queue, taskID, runID, stepName, eventName string, timeoutSecs *int) (shouldSuspend bool, payload json.RawMessage, err error)

AwaitEvent suspends the run waiting for eventName, or returns immediately if the event was already emitted. timeoutSecs nil = wait forever. Returns (shouldSuspend, payload, err). When shouldSuspend is true the worker must stop processing the run.

func (*Client) CancelTask

func (c *Client) CancelTask(ctx context.Context, queue, taskID string) error

CancelTask cancels a task by ID. Running workers detect cancellation at the next checkpoint or heartbeat call.

func (*Client) ClaimTask

func (c *Client) ClaimTask(ctx context.Context, queue, workerID string, claimTimeoutSecs, qty int) ([]Run, error)

ClaimTask claims up to qty pending tasks from the queue for workerID with a lease of claimTimeoutSecs seconds. Returns the claimed runs (may be empty).

func (*Client) Close

func (c *Client) Close()

Close releases all connections in the pool.

func (*Client) CompleteRun

func (c *Client) CompleteRun(ctx context.Context, queue, runID string, result any) error

CompleteRun marks the run as successfully completed with an optional result payload.

func (*Client) CreateQueue

func (c *Client) CreateQueue(ctx context.Context, queue string) error

CreateQueue provisions all per-queue tables (t_, p_, r_, c_, e_, w_).

func (*Client) DropQueue

func (c *Client) DropQueue(ctx context.Context, queue string) error

DropQueue removes all per-queue tables and the queue registry entry.

func (*Client) EmitEvent

func (c *Client) EmitEvent(ctx context.Context, queue, eventName string, payload any) error

EmitEvent emits a named event with an optional payload. First-write-wins: subsequent calls for the same event name are silently ignored. Wakes any tasks waiting on this event and fires pg_notify. A nil payload sends SQL NULL, which the engine stores as JSON null.

func (*Client) ExtendClaim

func (c *Client) ExtendClaim(ctx context.Context, queue, runID string, extendBySecs int) error

ExtendClaim extends the lease for an active run by extendBySecs seconds. Returns an error if the task has been cancelled (sqlstate AB001).

func (*Client) FailRun

func (c *Client) FailRun(ctx context.Context, queue, runID string, reason any, retryAt *time.Time) error

FailRun marks the run as failed with a JSON reason payload. The engine will schedule a retry according to the task's retry_strategy. retryAt, when non-nil, overrides the computed retry delay.

func (*Client) GetCheckpoints

func (c *Client) GetCheckpoints(ctx context.Context, queue, taskID, runID string) ([]Checkpoint, error)

GetCheckpoints loads all committed checkpoints for a task visible to a given run's attempt number.

func (*Client) GetTaskResult

func (c *Client) GetTaskResult(ctx context.Context, queue, taskID string) (TaskResult, error)

GetTaskResult returns the current state and result of a task.

func (*Client) NewWorker

func (c *Client) NewWorker(queue string, opts ...Option) *Worker

NewWorker creates a Worker for the given queue. Use Handle to register task handlers, then call Start to begin processing.

func (*Client) Pool

func (c *Client) Pool() *pgxpool.Pool

Pool returns the underlying pgxpool for direct SQL access (e.g. DDL).

func (*Client) RetryTask

func (c *Client) RetryTask(ctx context.Context, queue, taskID string, spawnNew bool) (SpawnResult, error)

RetryTask re-enqueues a failed or cancelled task.

func (*Client) ScheduleRun

func (c *Client) ScheduleRun(ctx context.Context, queue, runID string, wakeAt time.Time) error

ScheduleRun suspends the run until wakeAt. The worker must return after calling this (or after TaskContext.SleepFor/SleepUntil returns ErrSuspended).

func (*Client) SetCheckpoint

func (c *Client) SetCheckpoint(ctx context.Context, queue, taskID, stepName string, state json.RawMessage, ownerRunID string, extendClaimBy int) error

SetCheckpoint writes a named step result for a task. If extendClaimBy > 0 it also extends the lease.

func (*Client) SpawnTask

func (c *Client) SpawnTask(ctx context.Context, queue, taskName string, params any, opts *SpawnOptions) (SpawnResult, error)

SpawnTask enqueues a new task and returns the spawn result. params must be JSON-marshalable; opts may be nil.

func (*Client) Ticker

func (c *Client) Ticker(ctx context.Context) (int, error)

Ticker re-queues sleeping tasks whose wake time has arrived and fires pg_notify. Returns the number of tasks re-queued. Safe to call manually (used in tests; in production use teguh.start() for pg_cron scheduling).

type HandlerFunc

type HandlerFunc func(ctx context.Context, tc *TaskContext) error

HandlerFunc processes a single task execution. It receives a TaskContext that exposes durable execution primitives (Step, SleepFor, AwaitEvent, …).

  • Return nil to mark the run as completed.
  • Return ErrSuspended (propagated from tc.SleepFor / tc.AwaitEvent) to leave the run suspended, the Worker will not call complete_run or fail_run.
  • Return ErrCancelled (propagated from tc.Heartbeat) when the task has been cancelled; the Worker will not call complete_run or fail_run.
  • Return any other error to fail the run (with retry if configured).

type Option

type Option func(*Worker)

Option configures a Worker.

func WithBatchSize

func WithBatchSize(n int) Option

WithBatchSize sets how many tasks to claim per claim_task call. Defaults to the worker's concurrency setting.

func WithClaimTimeout

func WithClaimTimeout(secs int) Option

WithClaimTimeout sets the lease duration in seconds. Workers must heartbeat before this deadline or the task is automatically failed and re-queued. Defaults to 30s.

func WithConcurrency

func WithConcurrency(n int) Option

WithConcurrency sets the maximum number of tasks executed concurrently by this worker process. Defaults to 10.

func WithHeartbeatInterval

func WithHeartbeatInterval(d time.Duration) Option

WithHeartbeatInterval sets how often the worker extends its lease. Must be less than WithClaimTimeout. Defaults to 10s.

func WithPollInterval

func WithPollInterval(d time.Duration) Option

WithPollInterval sets the fallback sleep between claim_task calls when the queue is empty and no NOTIFY arrives. Defaults to 30s. Workers with LISTEN/NOTIFY only hit this fallback on missed notifications.

func WithWorkerID

func WithWorkerID(id string) Option

WithWorkerID sets an explicit worker identifier (used in lease metadata and logs). Defaults to hostname:pid.

type Run

type Run struct {
	RunID         string          `json:"run_id"`
	TaskID        string          `json:"task_id"`
	Attempt       int             `json:"attempt"`
	TaskName      string          `json:"task_name"`
	Params        json.RawMessage `json:"params"`
	RetryStrategy json.RawMessage `json:"retry_strategy"`
	MaxAttempts   *int            `json:"max_attempts"`
	Headers       json.RawMessage `json:"headers"`
	WakeEvent     *string         `json:"wake_event"`
	EventPayload  json.RawMessage `json:"event_payload"`
}

Run is a claimed task execution returned by Client.ClaimTask.

type SpawnOptions

type SpawnOptions struct {
	// Headers are arbitrary key/value pairs attached to the task (not params).
	Headers map[string]any `json:"headers,omitempty"`
	// RetryStrategy controls retry behaviour: {"kind":"exponential","base_seconds":30,"factor":2}
	RetryStrategy map[string]any `json:"retry_strategy,omitempty"`
	// MaxAttempts caps the number of execution attempts (nil = unlimited).
	MaxAttempts *int `json:"max_attempts,omitempty"`
	// Cancellation configures auto-cancel thresholds:
	// {"max_delay": <seconds>}: cancel if not started within N seconds.
	// {"max_duration": <seconds>}: cancel if running longer than N seconds total.
	Cancellation map[string]any `json:"cancellation,omitempty"`
	// IdempotencyKey makes the spawn idempotent; a second call returns the
	// existing task without creating a new one.
	IdempotencyKey string `json:"idempotency_key,omitempty"`
	// AvailableAt schedules delayed start (zero = immediate).
	AvailableAt time.Time `json:"available_at,omitempty"`
}

SpawnOptions configures an optional spawn_task call.

type SpawnResult

type SpawnResult struct {
	TaskID string `json:"task_id"`
	// RunID is a pre-generated UUID stored as a hint; it is NOT the run ID
	// that will be active once the task is claimed. Use the run_id returned
	// by ClaimTask for all run-scoped APIs (CompleteRun, FailRun, etc.).
	RunID   string `json:"run_id"`
	Attempt int    `json:"attempt"`
	Created bool   `json:"created"` // false on idempotency hit
}

SpawnResult is returned by Client.SpawnTask.

type TaskContext

type TaskContext struct {
	// contains filtered or unexported fields
}

TaskContext is passed to every task handler. It provides durable execution primitives: exactly-once steps, timer-based sleep, and event coordination.

Checkpoint state is pre-loaded from the database when the run is claimed, so step cache hits require no extra round-trips.

func (*TaskContext) Attempt

func (tc *TaskContext) Attempt() int

Attempt returns the current attempt number (1-based).

func (*TaskContext) AwaitEvent

func (tc *TaskContext) AwaitEvent(ctx context.Context, stepName, eventName string, timeout ...time.Duration) (json.RawMessage, error)

AwaitEvent suspends the task waiting for eventName to be emitted. If the event was already emitted before this call, returns immediately with the payload (no suspension).

stepName is used to persist the received payload as a checkpoint so that on replay the handler skips the wait and receives the cached payload.

timeout optionally caps the wait; when it elapses the task resumes with a nil payload.

Returns (payload, ErrSuspended) when suspended; (payload, nil) when the event was already available or on timeout resume.

func (*TaskContext) EmitEvent

func (tc *TaskContext) EmitEvent(ctx context.Context, eventName string, payload any) error

EmitEvent emits eventName with payload. First-write-wins: later calls for the same event name in the same queue are ignored. Wakes any tasks suspended on this event and fires pg_notify.

func (*TaskContext) EventPayload

func (tc *TaskContext) EventPayload() json.RawMessage

EventPayload returns the payload of the wake event, or nil.

func (*TaskContext) Heartbeat

func (tc *TaskContext) Heartbeat(ctx context.Context, extendBySecs int) error

Heartbeat extends the run's lease by the worker's claim timeout. Returns ErrCancelled if the task has been cancelled (workers should stop immediately).

func (*TaskContext) Params

func (tc *TaskContext) Params() json.RawMessage

Params returns the raw JSON params passed at spawn time.

func (*TaskContext) RunID

func (tc *TaskContext) RunID() string

RunID returns the current execution's run ID.

func (*TaskContext) SleepFor

func (tc *TaskContext) SleepFor(ctx context.Context, d time.Duration) error

SleepFor suspends the task for duration d. The run will be re-queued after d has elapsed (by the ticker or claim_task's inline recovery sweep).

The wake time is computed from the Go process clock (time.Now), not the PostgreSQL server clock. A small clock skew between the two hosts may cause the task to wake slightly earlier or later than intended.

Always returns ErrSuspended; the handler must return this error immediately.

func (*TaskContext) SleepUntil

func (tc *TaskContext) SleepUntil(ctx context.Context, t time.Time) error

SleepUntil suspends the task until t. Always returns ErrSuspended.

func (*TaskContext) TaskID

func (tc *TaskContext) TaskID() string

TaskID returns the task ID.

func (*TaskContext) WakeEvent

func (tc *TaskContext) WakeEvent() *string

WakeEvent returns the event name that caused this resumption, or nil.

type TaskResult

type TaskResult struct {
	TaskID           string          `json:"task_id"`
	State            string          `json:"state"`
	Attempts         int             `json:"attempts"`
	CompletedPayload json.RawMessage `json:"completed_payload"`
	CancelledAt      *time.Time      `json:"cancelled_at"`
}

TaskResult is returned by Client.GetTaskResult.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker polls a teguh queue and dispatches claimed tasks to registered handlers. It uses LISTEN/NOTIFY (channel: teguh_<queue>) to wake immediately when new tasks are available, falling back to a poll interval.

func (*Worker) Handle

func (w *Worker) Handle(taskName string, fn HandlerFunc)

Handle registers a handler for the named task. Use "*" to register a catch-all handler for any task name without a specific handler. All Handle calls must complete before calling Start; the handler map is not goroutine-safe after Start begins dispatching.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start begins the worker loop, blocking until ctx is cancelled. It opens a dedicated LISTEN connection (separate from the pool) for immediate pg_notify wakeup. The fallback poll interval fires when no notification arrives within WithPollInterval.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL