catbird

package module
v0.0.6
Published: Feb 25, 2026 License: Apache-2.0 Imports: 18 Imported by: 4

README

Catbird

A PostgreSQL-powered message queue and task execution engine. Catbird brings reliability and simplicity to background job processing by using your database as the single source of truth—no extra services to manage, just your database coordinating everything.

Why Catbird?

  • Transactional by default: enqueue messages in the same DB transaction as your app writes; rollback means no message.
  • Exactly-once within a visibility window: safe retries after crashes, no duplicate processing.
  • Database as coordinator: horizontal workers, PostgreSQL handles distribution and state.
  • Workflows as DAGs: dependencies, branching, and data passing between steps.
  • Persistence and auditability: queues, runs, and results live in PostgreSQL.
  • Resiliency baked in: retries, backoff, optional circuit breakers.
  • Operational UX: web dashboard for runs, queues, and workers.

Quick Start

client := catbird.New(conn)
ctx := context.Background()

// Queues
err := client.CreateQueue(ctx, "my_queue")
err = client.Send(ctx, "my_queue", map[string]any{"user_id": 123}, catbird.SendOpts{
    IdempotencyKey: "user-123",
})
messages, err := client.Read(ctx, "my_queue", 10, 30*time.Second)
for _, msg := range messages {
    // Delete also reports whether the message existed; that flag is ignored here.
    _, err = client.Delete(ctx, "my_queue", msg.ID)
}

// Delayed send
client.Send(ctx, "my_queue", map[string]any{"job": "cleanup"}, catbird.SendOpts{VisibleAt: time.Now().Add(30 * time.Minute)})

// Tasks and flows
task := catbird.NewTask("send-email").
    Handler(func(ctx context.Context, input string) (string, error) {
        return "sent", nil
    })

flow := catbird.NewFlow("double-add").
    AddStep(catbird.NewStep("double").
        Handler(func(ctx context.Context, input int) (int, error) {
            return input * 2, nil
        })).
    AddStep(catbird.NewStep("add").
        DependsOn("double").
        Handler(func(ctx context.Context, input int, doubled int) (int, error) {
            return doubled + 1, nil
        }))

worker := client.NewWorker(ctx)
worker.AddTask(task)
worker.AddFlow(flow)
go worker.Start(ctx)

taskHandle, err := client.RunTask(ctx, "send-email", "hello")
var taskOut string
err = taskHandle.WaitForOutput(ctx, &taskOut)

flowHandle, err := client.RunFlow(ctx, "double-add", 10)
var flowOut int
err = flowHandle.WaitForOutput(ctx, &flowOut)

// Delayed execution
client.RunTask(ctx, "process-user", userID, catbird.RunTaskOpts{VisibleAt: time.Now().Add(5 * time.Minute)})
client.RunFlow(ctx, "order_processing", map[string]any{"order_id": 123}, catbird.RunFlowOpts{VisibleAt: time.Now().Add(30 * time.Second)})

// Ensure definitions exist before use. This is not necessary if you only
// run a worker: definitions are created for you on Start.
err = client.CreateTask(ctx, taskA, taskB)
err = client.CreateFlow(ctx, flowA, flowB)

// Direct package-level usage (no Client), for example in a transaction:
taskHandle, err = catbird.RunTask(ctx, tx, "send-email", "hello")

Deduplication Strategies

Catbird supports two deduplication strategies for tasks and flows.

ConcurrencyKey (Temporary)

Prevents overlapping runs; allows re-runs after completion or failure.

IdempotencyKey (Permanent)

Ensures exactly-once execution; blocks reuse after completion.

// ConcurrencyKey: prevent overlap
_, err := client.RunTask(ctx, "process-user", userID, catbird.RunTaskOpts{
    ConcurrencyKey: fmt.Sprintf("user-%d", userID),
})

// IdempotencyKey: exactly once
_, err = client.RunTask(ctx, "charge-payment", payment, catbird.RunTaskOpts{
    IdempotencyKey: fmt.Sprintf("payment-%s", payment.ID),
})

Comparison Table

Feature          | ConcurrencyKey                                   | IdempotencyKey
Purpose          | Prevent overlapping runs                         | Ensure exactly-once execution
Deduplicates     | queued, started                                  | queued, started, completed
After completion | Allows re-run                                    | Rejects duplicate
After failure    | Allows retry                                     | Allows retry
Use for          | Rate limiting, resource locking, scheduled tasks | Payments, orders, webhooks, audit logs

Important Notes

  • Mutually exclusive: You cannot provide both ConcurrencyKey and IdempotencyKey for the same run (returns error)
  • Return value on duplicate: RunTask()/RunFlow() return a handle to the existing run ID
  • Failure retries: Both strategies allow retries on failed runs
  • No key = no deduplication: If you don't provide either key, duplicates are allowed
  • Queue messages: Use IdempotencyKey in SendOpts for exactly-once message delivery
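The comparison table and notes above can be condensed into a single decision function. The following is a minimal sketch of those rules only, not Catbird's implementation (the real check happens inside PostgreSQL); the names keyKind and isDuplicate are hypothetical, and the status strings are taken from the table.

```go
package main

import "fmt"

// keyKind distinguishes the two deduplication strategies (hypothetical type).
type keyKind int

const (
	concurrencyKey keyKind = iota
	idempotencyKey
)

// isDuplicate reports whether a new run request is treated as a duplicate of
// an existing run that has the same key and the given status. Status names
// follow the comparison table; "failed" never blocks, so retries stay possible.
func isDuplicate(kind keyKind, existingStatus string) bool {
	switch existingStatus {
	case "queued", "started":
		return true // both strategies prevent overlap with an active run
	case "completed":
		return kind == idempotencyKey // only IdempotencyKey is permanent
	default:
		return false
	}
}

func main() {
	for _, status := range []string{"queued", "started", "completed", "failed"} {
		fmt.Printf("%-9s concurrency=%-5v idempotency=%v\n",
			status, isDuplicate(concurrencyKey, status), isDuplicate(idempotencyKey, status))
	}
}
```

The only row where the two strategies differ is "completed", which is why ConcurrencyKey suits recurring work and IdempotencyKey suits one-time side effects.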

Topic-Based Routing

err := client.CreateQueue(ctx, "user-events")
err = client.CreateQueue(ctx, "audit-log")

err = client.Bind(ctx, "user-events", "events.user.created")
err = client.Bind(ctx, "user-events", "events.?.updated")
err = client.Bind(ctx, "audit-log", "events.*")

err = client.Publish(ctx, "events.user.created", map[string]any{
    "user_id": 123,
    "email":   "user@example.com",
})
err = client.Unbind(ctx, "user-events", "events.?.updated")

Wildcard rules:

  • ? matches a single token (e.g., events.?.created matches events.user.created)
  • * matches one or more tokens at the end (e.g., events.user.* matches events.user.created.v1)
  • * must appear as .* at the end of the pattern
  • Tokens are separated by . and can contain a-z, A-Z, 0-9, _, -
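To make the wildcard rules concrete, here is a small stand-alone matcher. It is an illustrative sketch of the rules as listed, not Catbird's matching code; matchTopic is a hypothetical helper and it does not validate token characters.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern under the wildcard rules
// listed above: "?" matches one token, a trailing ".*" matches one or more.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")

	// A trailing "*" (written as ".*") matches one or more remaining tokens.
	if len(p) >= 2 && p[len(p)-1] == "*" {
		p = p[:len(p)-1]
		if len(t) <= len(p) {
			return false // the tail must contain at least one token
		}
		t = t[:len(p)]
	} else if len(p) != len(t) {
		return false
	}

	for i := range p {
		// "?" matches exactly one token; anything else must match literally.
		if p[i] != "?" && p[i] != t[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchTopic("events.?.created", "events.user.created"))
	fmt.Println(matchTopic("events.user.*", "events.user.created.v1"))
	fmt.Println(matchTopic("events.user.*", "events.user"))
	fmt.Println(matchTopic("events.?.updated", "events.user.created"))
}
```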

Task Execution

// Define task (scheduling is separate)
task := catbird.NewTask("send-email").
    Handler(func(ctx context.Context, input EmailRequest) (EmailResponse, error) {
        return EmailResponse{SentAt: time.Now()}, nil
    }, catbird.HandlerOpts{
        Concurrency:    5,
        MaxRetries:     3,
        Backoff:        catbird.NewFullJitterBackoff(500*time.Millisecond, 10*time.Second),
        CircuitBreaker: catbird.NewCircuitBreaker(5, 30*time.Second),
    })

// Create a schedule for the task (optional; can run manually via RunTask)
client.CreateTaskSchedule(ctx, "send-email", "@hourly")

// Or with static input
client.CreateTaskSchedule(ctx, "send-report", "@hourly", catbird.ScheduleOpts{
    Input: EmailRequest{To: "ops@example.com", Subject: "Hourly report"},
})

// Define a task with a condition (skipped when condition is false)
conditionalTask := catbird.NewTask("premium-processing").
    Condition("input.is_premium"). // Skipped if is_premium = false
    Handler(func(ctx context.Context, input ProcessRequest) (string, error) {
        return "processed", nil
    })

// Create worker
worker := client.NewWorker(ctx)
// Add tasks
worker.AddTask(task)
worker.AddTask(conditionalTask)
go worker.Start(ctx)

// Run the task
handle, err := client.RunTask(ctx, "send-email", EmailRequest{
    To:      "user@example.com",
    Subject: "Hello",
})

// Get result
var result EmailResponse
err = handle.WaitForOutput(ctx, &result)

Flow Execution

A flow is a directed acyclic graph (DAG) of steps that execute when their dependencies are satisfied.

Summary

  • Steps with no dependencies start immediately; independent branches run in parallel.
  • A flow has exactly one final step; the output of the final step is the output of the flow.
  • Conditions can skip steps; downstream handlers must accept Optional[T] for any conditional dependency.
  • A step with a signal waits for both its dependencies and the signal input.
  • WaitForOutput() returns the final step output once the flow completes.

Examples: Workflows

flow := catbird.NewFlow("order-processing").
    AddStep(catbird.NewStep("validate").
        Handler(func(ctx context.Context, order Order) (ValidationResult, error) {
            if order.Amount <= 0 {
                return ValidationResult{Valid: false, Reason: "Invalid amount"}, nil
            }
            return ValidationResult{Valid: true}, nil
        })).
    AddStep(catbird.NewStep("charge").
        DependsOn("validate").
        Handler(func(ctx context.Context, order Order, validated ValidationResult) (ChargeResult, error) {
            if !validated.Valid {
                return ChargeResult{}, fmt.Errorf("cannot charge invalid order")
            }
            return ChargeResult{
                TransactionID: "txn-" + order.ID,
                Amount:        order.Amount,
            }, nil
        })).
    AddStep(catbird.NewStep("check-inventory").
        DependsOn("validate").
        Handler(func(ctx context.Context, order Order, validated ValidationResult) (InventoryCheck, error) {
            return InventoryCheck{
                InStock: true,
                Qty:     order.Amount,
            }, nil
        })).
    AddStep(catbird.NewStep("ship").
        DependsOn("charge", "check-inventory").
        Handler(func(ctx context.Context, order Order, chargeResult ChargeResult, inventory InventoryCheck) (ShipmentResult, error) {
            if !inventory.InStock {
                return ShipmentResult{}, fmt.Errorf("out of stock")
            }
            return ShipmentResult{
                TrackingNumber: "TRK-" + chargeResult.TransactionID,
                EstimatedDays:  3,
            }, nil
        }))

// Create a schedule for the flow (optional; can run manually via RunFlow)
client.CreateFlowSchedule(ctx, "order-processing", "0 2 * * *") // Daily at 2 AM

// Create worker
worker := client.NewWorker(ctx)
// Add flow
worker.AddFlow(flow)
go worker.Start(ctx)

Example: Signals & Human-in-the-Loop

Signals enable workflows that wait for external input before proceeding, such as approval workflows or webhooks.

flow := catbird.NewFlow("document_approval").
    AddStep(catbird.NewStep("submit").
        Handler(func(ctx context.Context, doc Document) (string, error) {
            return doc.ID, nil
        })).
    AddStep(catbird.NewStep("approve").
        DependsOn("submit").
        Signal(true).
        Handler(func(ctx context.Context, doc Document, approval ApprovalInput, docID string) (ApprovalResult, error) {
            if !approval.Approved {
                return ApprovalResult{}, fmt.Errorf("approval denied by %s: %s", approval.ApproverID, approval.Notes)
            }
            return ApprovalResult{
                Status:     "approved",
                ApprovedBy: approval.ApproverID,
                Timestamp:  time.Now().Format(time.RFC3339),
            }, nil
        })).
    AddStep(catbird.NewStep("publish").
        DependsOn("approve").
        Handler(func(ctx context.Context, doc Document, approval ApprovalResult) (PublishResult, error) {
            return PublishResult{
                PublishedAt: time.Now().Format(time.RFC3339),
                URL:         "https://example.com/docs/" + approval.ApprovedBy,
            }, nil
        }))

A step with both dependencies and a signal waits for both conditions: all dependencies must complete and the signal must be delivered before the step executes.

Conditional Execution

Both tasks and flow steps support conditional execution via the Condition builder method. If the condition evaluates to false (or a referenced field is missing), the task or step is marked skipped and its handler does not run.

Rules at a Glance

  • Prefixes: tasks use input.*; flow steps use input.*, step_name.*, or signal.*.
  • Operators: eq, ne, gt, gte, lt, lte, in, exists, contains, plus not <expr>.
  • Optional outputs: if a step can be skipped, downstream handlers must accept Optional[T] for that dependency.
  • No AND/OR: only one expression per task/step; compute a derived field upstream if needed.

Tasks with Conditions

Tasks can use conditions to skip execution based on input fields.

type ProcessRequest struct {
    UserID      int    `json:"user_id"`
    IsPremium   bool   `json:"is_premium"`
    Amount      int    `json:"amount"`
    Environment string `json:"environment"`
}

// Only process premium users
premiumTask := catbird.NewTask("premium_processing").
    Condition("input.is_premium"). // Skipped if is_premium = false
    Handler(func(ctx context.Context, req ProcessRequest) (string, error) {
        return fmt.Sprintf("Processed premium user %d", req.UserID), nil
    })

// Run task - may be skipped based on input
client.RunTask(ctx, "premium_processing", ProcessRequest{UserID: 123, IsPremium: false})
// This task run will be skipped (is_premium = false)

Flows with Conditions

Flow steps can branch based on prior outputs. Use Optional[T] to handle skipped dependencies.

flow := catbird.NewFlow("payment_processing").
    AddStep(catbird.NewStep("validate").
        Handler(func(ctx context.Context, order Order) (ValidationResult, error) {
            return ValidationResult{Valid: order.Amount > 0}, nil
        })).
    AddStep(catbird.NewStep("charge").
        DependsOn("validate").
        Condition("validate.valid").
        Handler(func(ctx context.Context, order Order, validation ValidationResult) (ChargeResult, error) {
            return ChargeResult{TransactionID: "txn-123"}, nil
        })).
    AddStep(catbird.NewStep("finalize").
        DependsOn("charge").
        Handler(func(ctx context.Context, order Order, charge catbird.Optional[ChargeResult]) (FinalResult, error) {
            if charge.IsSet {
                return FinalResult{Status: "charged", TxnID: charge.Value.TransactionID}, nil
            }
            return FinalResult{Status: "free_order", TxnID: ""}, nil
        }))
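The "finalize" handler above relies on two fields of the optional value, IsSet and Value. The sketch below defines a local stand-in type with just those two fields to show the branching in isolation; it is an assumption about the shape of catbird.Optional, which may carry more than this, and finalStatus is a hypothetical helper.

```go
package main

import "fmt"

// Optional is a local stand-in mirroring the two fields used above
// (IsSet and Value); it is not the library's own definition.
type Optional[T any] struct {
	IsSet bool
	Value T
}

// ChargeResult is a trimmed-down copy of the example's result type.
type ChargeResult struct {
	TransactionID string
}

// finalStatus mirrors the branching in the "finalize" handler.
func finalStatus(charge Optional[ChargeResult]) string {
	if charge.IsSet {
		return "charged:" + charge.Value.TransactionID
	}
	return "free_order"
}

func main() {
	ran := Optional[ChargeResult]{IsSet: true, Value: ChargeResult{TransactionID: "txn-123"}}
	fmt.Println(finalStatus(ran))
	// The zero value models a dependency that was skipped.
	fmt.Println(finalStatus(Optional[ChargeResult]{}))
}
```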

Resiliency

Catbird includes multiple resiliency layers for runtime failures. Handler-level retries are configured with HandlerOpts (MaxRetries, Backoff), and external calls can be protected with HandlerOpts.CircuitBreaker (typically created via NewCircuitBreaker(...)) to avoid cascading outages. In worker database paths, PostgreSQL reads/writes are retried with bounded attempts and full-jitter backoff; retries stop immediately on context cancellation or deadline expiry.

Be aware of side effects

Catbird deduplication (ConcurrencyKey/IdempotencyKey) controls duplicate run creation, while handler retries can still re-attempt the same run after transient failures. For non-repeatable side effects (payments, email, webhooks), use idempotent write patterns or upstream idempotency keys so retry attempts remain safe.
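One way to keep a retried handler safe is to record an application-level key before applying the side effect, so a second attempt becomes a no-op. The sketch below uses an in-memory map purely for illustration; in practice the record would typically live in a table with a unique constraint. The ledger type and charge method are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// ledger records which keys have already been applied.
// A real system would persist this, for example in a uniquely indexed table.
type ledger struct {
	mu      sync.Mutex
	applied map[string]bool
	total   int
}

// charge applies the amount at most once per key and reports whether
// this call actually performed the side effect.
func (l *ledger) charge(key string, amount int) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.applied[key] {
		return false // a retry of an attempt that already succeeded
	}
	l.applied[key] = true
	l.total += amount
	return true
}

func main() {
	l := &ledger{applied: make(map[string]bool)}
	fmt.Println(l.charge("payment-42", 100)) // first attempt
	fmt.Println(l.charge("payment-42", 100)) // retried attempt
	fmt.Println(l.total)
}
```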

Naming Rules

  • Queue, task, flow, and step names: Lowercase letters, digits, and underscores only (a-z, 0-9, _). Max 58 characters. Step names must be unique within a flow. Reserved step names: input, signal.
  • Topics/Patterns: Letters (upper/lower), digits, dots, underscores, and hyphens (a-z, A-Z, 0-9, ., _, -, plus wildcards ?, *).
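The first naming rule can be checked client-side before calling into the database. This is a minimal sketch assuming names must be non-empty (the rules above state only the maximum length); validName and validStepName are hypothetical helpers, not part of the package.

```go
package main

import (
	"fmt"
	"regexp"
)

// nameRe encodes "lowercase letters, digits, and underscores, max 58 characters".
// The lower bound of one character is an assumption.
var nameRe = regexp.MustCompile(`^[a-z0-9_]{1,58}$`)

// validName checks a queue, task, or flow name.
func validName(name string) bool {
	return nameRe.MatchString(name)
}

// validStepName additionally rejects the reserved step names.
func validStepName(name string) bool {
	return validName(name) && name != "input" && name != "signal"
}

func main() {
	for _, n := range []string{"my_queue", "my-queue", "MyQueue", "input"} {
		fmt.Printf("%-10s name=%-5v step=%v\n", n, validName(n), validStepName(n))
	}
}
```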

Query Helpers

Use query builders when you want SQL + args directly (for pgx.Batch or custom execution):

  • SendQuery(queue, payload, opts)
  • PublishQuery(topic, payload, opts)
  • RunTaskQuery(name, input, opts)
  • RunFlowQuery(name, input, opts)

// Queue into a batch
var batch pgx.Batch
q1, args1, err := catbird.SendQuery("my-queue", map[string]any{"user_id": 123})
if err != nil {
    return err
}
batch.Queue(q1, args1...)

PostgreSQL API Reference

Catbird is built on PostgreSQL functions, so you can use the API directly from any language or tool with PostgreSQL support (psql, Python, Node.js, Ruby, etc.).

Queues

-- Create a queue
SELECT cb_create_queue(name => 'my_queue', expires_at => null, unlogged => false);

-- Send a message
SELECT cb_send(queue => 'my_queue', payload => '{"user_id": 123, "action": "process"}'::jsonb, 
               topic => null, idempotency_key => null, visible_at => null);

-- Publish to topic-bound queues
SELECT cb_publish(topic => 'events.user.created', payload => '{"user_id": 456}'::jsonb,
                   idempotency_key => 'user-456-created', visible_at => null);

-- Read messages (with 30 second visibility timeout)
SELECT * FROM cb_read(queue => 'my_queue', quantity => 10, hide_for => 30000);

-- Delete a message
SELECT cb_delete(queue => 'my_queue', id => 1);

-- Bind queue to topic pattern
SELECT cb_bind(queue_name => 'user_events', pattern => 'events.user.*');
SELECT cb_unbind(queue_name => 'user_events', pattern => 'events.user.*');

Tasks

-- Create a task definition
SELECT cb_create_task(name => 'send_email');

-- Run a task
SELECT * FROM cb_run_task(name => 'send_email', input => '{"to": "user@example.com"}'::jsonb, 
                          concurrency_key => null, idempotency_key => null, visible_at => null);

Workflows

-- Create a flow definition
SELECT cb_create_flow(name => 'order_processing', steps => '[
  {"name": "validate"},
  {"name": "charge", "depends_on": [{"name": "validate"}]},
  {"name": "ship", "depends_on": [{"name": "charge"}]}
]'::jsonb);

-- Run a flow
SELECT * FROM cb_run_flow(name => 'order_processing', input => '{"order_id": 123}'::jsonb,
                          concurrency_key => null, idempotency_key => null, visible_at => null);

Monitoring Task and Flow Runs

You can query task and flow run information directly:

-- List recent task runs (replace send_email with your task name)
SELECT id, concurrency_key, idempotency_key, status, input, output, error_message, started_at, completed_at, failed_at
FROM cb_t_send_email
ORDER BY started_at DESC
LIMIT 20;

-- Get flow run (replace order_processing with your flow name)
SELECT id, concurrency_key, idempotency_key, status, input, output, error_message, started_at, completed_at, failed_at
FROM cb_f_order_processing
WHERE id = $1;

Dashboard

The dashboard provides a web UI for monitoring queues, tasks, flows, and workers. You can run it standalone with the cb CLI or embed it as an http.Handler.

go install github.com/ugent-library/catbird/cmd/cb@latest
export CB_CONN="postgres://user:pass@localhost:5432/mydb?sslmode=disable"
cb dashboard

The dashboard is a standard http.Handler and can be embedded in any Go web application:

import (
    "log/slog"
    "net/http"
    
    "github.com/ugent-library/catbird"
    "github.com/ugent-library/catbird/dashboard"
)

func main() {
    client := catbird.New(conn)
    dash := dashboard.New(dashboard.Config{
        Client:     client,
        Log:        slog.Default(), // Optional: provide custom logger
        PathPrefix: "",              // Optional: mount at a subpath (e.g., "/admin")
    })
    http.ListenAndServe(":8080", dash.Handler())
}

Acknowledgments

SQL code is taken from or inspired by the excellent pgmq and pgflow projects.

Documentation

Overview

Package catbird provides a PostgreSQL-based message queue with task and workflow execution engine.

Scheduling for Distributed Environments

Catbird includes built-in support for scheduled task and flow execution using cron syntax. When multiple workers run concurrently (on different machines or processes), Catbird guarantees that each scheduled execution runs exactly once per cron tick, even across clock skew and timezone differences.

This is achieved through:

  • UTC-normalized cron scheduling: all workers use UTC, eliminating timezone confusion
  • Idempotency key deduplication: each cron tick generates a deterministic key (format: "schedule:{unix_nanos_utc}") that persists across completion
  • PostgreSQL as the single source of truth: the database enforces the unique constraint on idempotency keys, preventing duplicates at the atomic level

See scheduler.go for implementation details and SCHEDULING_ADVANCED.md for the roadmap toward a future DB-driven scheduling system with even stronger guarantees.

Index

Constants

const (
	StatusCreated   = "created"
	StatusStarted   = "started"
	StatusCompleted = "completed"
	StatusFailed    = "failed"
	StatusSkipped   = "skipped" // Step skipped due to condition
)

Status constants for task and flow runs

const SchemaVersion = 13

Variables

var (
	// ErrRunFailed is returned when you try to unmarshal the output of a failed task or flow run
	ErrRunFailed = fmt.Errorf("run failed")
)

Functions

func Bind

func Bind(ctx context.Context, conn Conn, queueName string, pattern string) error

Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"

func CreateFlow

func CreateFlow(ctx context.Context, conn Conn, flows ...*Flow) error

CreateFlow creates one or more flow definitions.

func CreateFlowSchedule added in v0.0.5

func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, opts ...ScheduleOpts) error

CreateFlowSchedule creates a cron-based schedule for a flow.

func CreateQueue

func CreateQueue(ctx context.Context, conn Conn, queueName string, opts ...QueueOpts) error

CreateQueue creates a queue with the given name and optional options. Use Bind() separately to create topic bindings.

func CreateTask

func CreateTask(ctx context.Context, conn Conn, tasks ...*Task) error

CreateTask creates one or more task definitions.

func CreateTaskSchedule added in v0.0.5

func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, opts ...ScheduleOpts) error

CreateTaskSchedule creates a cron-based schedule for a task.

func Delete

func Delete(ctx context.Context, conn Conn, queueName string, id int64) (bool, error)

Delete deletes a single message from the queue. Returns true if the message existed.

func DeleteMany

func DeleteMany(ctx context.Context, conn Conn, queueName string, ids []int64) error

DeleteMany deletes multiple messages from the queue.

func DeleteQueue

func DeleteQueue(ctx context.Context, conn Conn, queueName string) (bool, error)

DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.

func GC

func GC(ctx context.Context, conn Conn) error

GC runs garbage collection to clean up expired queues and stale workers. Note: Worker heartbeats automatically perform cleanup, so this is mainly useful for deployments without workers or for manual control.

func Hide

func Hide(ctx context.Context, conn Conn, queueName string, id int64, hideFor time.Duration) (bool, error)

Hide hides a single message from being read for the specified duration. Returns true if the message existed.

func HideMany

func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, hideFor time.Duration) error

HideMany hides multiple messages from being read for the specified duration.

func MigrateDownTo

func MigrateDownTo(ctx context.Context, db *sql.DB, version int) error

func MigrateUpTo

func MigrateUpTo(ctx context.Context, db *sql.DB, version int) error

func Publish added in v0.0.3

func Publish(ctx context.Context, conn Conn, topic string, payload any, opts ...PublishOpts) error

Publish sends a message to topic-subscribed queues with options. Pass no opts to use defaults.

func PublishQuery added in v0.0.4

func PublishQuery(topic string, payload any, opts ...PublishOpts) (string, []any, error)

PublishQuery builds the SQL query and args for a Publish operation. Pass no opts to use defaults.

func RunFlowQuery added in v0.0.4

func RunFlowQuery(flowName string, input any, opts ...RunFlowOpts) (string, []any, error)

RunFlowQuery builds the SQL query and args for a RunFlow operation. Pass no opts to use defaults.

func RunTaskQuery added in v0.0.4

func RunTaskQuery(taskName string, input any, opts ...RunTaskOpts) (string, []any, error)

RunTaskQuery builds the SQL query and args for a RunTask operation. Pass no opts to use defaults.

func Send

func Send(ctx context.Context, conn Conn, queueName string, payload any, opts ...SendOpts) error

Send enqueues a message to the specified queue. Pass no opts to use defaults.

func SendQuery added in v0.0.4

func SendQuery(queueName string, payload any, opts ...SendOpts) (string, []any, error)

SendQuery builds the SQL query and args for a Send operation. Pass no opts to use defaults.

func SignalFlow

func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, stepName string, input any) error

SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal, NewStepWithSignalAndDependency). Signals enable human-in-the-loop workflows where a step waits for external input before executing. Returns an error if the signal was already delivered or the step doesn't require a signal.

func Unbind

func Unbind(ctx context.Context, conn Conn, queueName string, pattern string) error

Unbind unsubscribes a queue from a topic pattern.

Types

type BackoffStrategy added in v0.0.3

type BackoffStrategy interface {
	// Validate returns an error if configuration is invalid.
	Validate() error
	// NextDelay returns a delay for a zero-based delivery count (first retry = 0).
	// Implementations should always return a positive duration.
	NextDelay(deliveryCount int) time.Duration
}

BackoffStrategy defines how retry delays are calculated based on delivery count. Implementations must be safe for concurrent use.

type CircuitBreaker added in v0.0.3

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

func NewCircuitBreaker added in v0.0.3

func NewCircuitBreaker(failureThreshold int, openTimeout time.Duration) *CircuitBreaker

func (*CircuitBreaker) Allow added in v0.0.3

func (c *CircuitBreaker) Allow(now time.Time) (bool, time.Duration)

func (*CircuitBreaker) RecordFailure added in v0.0.3

func (c *CircuitBreaker) RecordFailure(now time.Time)

func (*CircuitBreaker) RecordSuccess added in v0.0.3

func (c *CircuitBreaker) RecordSuccess()

func (*CircuitBreaker) Validate added in v0.0.3

func (c *CircuitBreaker) Validate() error

type CircuitBreakerStrategy added in v0.0.3

type CircuitBreakerStrategy interface {
	// Validate returns an error if configuration is invalid.
	Validate() error
	// Allow returns whether a call is permitted and how long to wait if not.
	Allow(now time.Time) (bool, time.Duration)
	// RecordSuccess updates breaker state after a successful call.
	RecordSuccess()
	// RecordFailure updates breaker state after a failed call.
	RecordFailure(now time.Time)
}

CircuitBreakerStrategy defines the interface for circuit breaker behavior. Implementations must be safe for concurrent use.

type Client

type Client struct {
	Conn Conn
}

Client is a facade for interacting with Catbird

func New

func New(conn Conn) *Client

New creates a new Client with the given database connection.

The connection can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.

func (*Client) Bind

func (c *Client) Bind(ctx context.Context, queueName string, pattern string) error

Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"

func (*Client) CreateFlow

func (c *Client) CreateFlow(ctx context.Context, flows ...*Flow) error

CreateFlow creates one or more flow definitions.

func (*Client) CreateFlowSchedule added in v0.0.5

func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts ...ScheduleOpts) error

CreateFlowSchedule creates a cron-based schedule for a flow. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts is optional ScheduleOpts configuring the schedule (Input field for static input).

func (*Client) CreateQueue

func (c *Client) CreateQueue(ctx context.Context, queueName string, opts ...QueueOpts) error

CreateQueue creates a queue with the given name and optional options.

func (*Client) CreateTask

func (c *Client) CreateTask(ctx context.Context, tasks ...*Task) error

CreateTask creates one or more task definitions.

func (*Client) CreateTaskSchedule added in v0.0.5

func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts ...ScheduleOpts) error

CreateTaskSchedule creates a cron-based schedule for a task. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts is optional ScheduleOpts configuring the schedule (Input field for static input).

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, queueName string, id int64) (bool, error)

Delete deletes a single message from the queue. Returns true if the message existed.

func (*Client) DeleteMany

func (c *Client) DeleteMany(ctx context.Context, queueName string, ids []int64) error

DeleteMany deletes multiple messages from the queue.

func (*Client) DeleteQueue

func (c *Client) DeleteQueue(ctx context.Context, queueName string) (bool, error)

DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.

func (*Client) GetFlow

func (c *Client) GetFlow(ctx context.Context, flowName string) (*FlowInfo, error)

GetFlow retrieves flow metadata by name.

func (*Client) GetFlowRun

func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*FlowRunInfo, error)

GetFlowRun retrieves a specific flow run result by ID.

func (*Client) GetFlowRunSteps

func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)

GetFlowRunSteps retrieves all step runs for a specific flow run.

func (*Client) GetQueue

func (c *Client) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)

GetQueue retrieves queue metadata by name.

func (*Client) GetTask

func (c *Client) GetTask(ctx context.Context, taskName string) (*TaskInfo, error)

GetTask retrieves task metadata by name.

func (*Client) GetTaskRun

func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*TaskRunInfo, error)

GetTaskRun retrieves a specific task run result by ID.

func (*Client) Hide

func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)

Hide hides a single message from being read for the specified duration. Returns true if the message existed.

func (*Client) HideMany

func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) error

HideMany hides multiple messages from being read for the specified duration.

func (*Client) ListFlowRuns

func (c *Client) ListFlowRuns(ctx context.Context, flowName string) ([]*FlowRunInfo, error)

ListFlowRuns returns recent flow runs for the specified flow.

func (*Client) ListFlowSchedules added in v0.0.5

func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)

ListFlowSchedules returns all flow schedules ordered by next_run_at.

func (*Client) ListFlows

func (c *Client) ListFlows(ctx context.Context) ([]*FlowInfo, error)

ListFlows returns all flows

func (*Client) ListQueues

func (c *Client) ListQueues(ctx context.Context) ([]*QueueInfo, error)

ListQueues returns all queues

func (*Client) ListTaskRuns

func (c *Client) ListTaskRuns(ctx context.Context, taskName string) ([]*TaskRunInfo, error)

ListTaskRuns returns recent task runs for the specified task.

func (*Client) ListTaskSchedules added in v0.0.5

func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)

ListTaskSchedules returns all task schedules ordered by next_run_at.

func (*Client) ListTasks

func (c *Client) ListTasks(ctx context.Context) ([]*TaskInfo, error)

ListTasks returns all tasks

func (*Client) ListWorkers

func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)

ListWorkers returns all registered workers.

func (*Client) NewWorker

func (c *Client) NewWorker(ctx context.Context, opts ...WorkerOpts) *Worker

NewWorker creates a new worker that processes task and flow executions. Use the builder pattern methods (AddTask, AddFlow, etc.) to configure, then call Start(ctx) to begin processing.

func (*Client) Publish added in v0.0.3

func (c *Client) Publish(ctx context.Context, topic string, payload any, opts ...PublishOpts) error

Publish sends a message to all queues subscribed to the specified topic.

func (*Client) Read

func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)

Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
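The hiding behaviour described above can be pictured with a small in-memory model. This is only a sketch: memQueue, message, and demo are hypothetical stand-ins, the real work happens in PostgreSQL, and the assumption that each read increments the delivery counter is not confirmed by this page.

```go
package main

import (
	"fmt"
	"time"
)

// message mirrors a few documented Message fields; it is a local stand-in.
type message struct {
	ID         int64
	Deliveries int
	VisibleAt  time.Time
}

// memQueue is a hypothetical in-memory queue used only for illustration.
type memQueue struct{ msgs []*message }

// read returns up to quantity visible message IDs and hides each one
// until now+hideFor, so a second reader skips them in the meantime.
func (q *memQueue) read(now time.Time, quantity int, hideFor time.Duration) []int64 {
	var ids []int64
	for _, m := range q.msgs {
		if len(ids) == quantity {
			break
		}
		if m.VisibleAt.After(now) {
			continue // still hidden by an earlier read
		}
		m.VisibleAt = now.Add(hideFor)
		m.Deliveries++ // assumption: each read counts as one delivery
		ids = append(ids, m.ID)
	}
	return ids
}

// demo reads twice back to back, then once more after the window expires,
// and reports how many messages each read returned.
func demo() (first, second, third int) {
	q := &memQueue{msgs: []*message{{ID: 1}, {ID: 2}}}
	now := time.Now()
	first = len(q.read(now, 10, 30*time.Second))
	second = len(q.read(now.Add(time.Second), 10, 30*time.Second))
	third = len(q.read(now.Add(31*time.Second), 10, 30*time.Second))
	return first, second, third
}

func main() {
	fmt.Println(demo())
}
```

In this model a message that is read but never deleted becomes readable again once hideFor has elapsed, which is why a consumer would normally delete a message after handling it.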

func (*Client) ReadPoll

func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)

ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.

func (*Client) RunFlow

func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)

RunFlow enqueues a flow execution and returns a handle for monitoring.

func (*Client) RunTask

func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)

RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.

func (*Client) Send

func (c *Client) Send(ctx context.Context, queueName string, payload any, opts ...SendOpts) error

Send enqueues a message to the specified queue.

func (*Client) SignalFlow

func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, input any) error

SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined to wait for a signal (e.g., NewStep(name).Signal(true)). Returns an error if the signal was already delivered or the step doesn't require a signal.
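The two documented error cases can be modelled in memory as follows. This is a sketch under stated assumptions: stepRun and deliverSignal are hypothetical names, and the error messages are invented for illustration rather than taken from catbird.

```go
package main

import (
	"errors"
	"fmt"
)

// stepRun is a hypothetical record of one step in a flow run.
type stepRun struct {
	waitsForSignal bool
	signaled       bool
}

// deliverSignal marks a waiting step as signaled. It rejects steps that
// do not wait for a signal and steps that were already signaled.
func deliverSignal(steps map[string]*stepRun, stepName string) error {
	s, ok := steps[stepName]
	if !ok {
		return fmt.Errorf("unknown step %q", stepName)
	}
	if !s.waitsForSignal {
		return errors.New("step does not require a signal")
	}
	if s.signaled {
		return errors.New("signal already delivered")
	}
	s.signaled = true
	return nil
}

func main() {
	steps := map[string]*stepRun{
		"approve": {waitsForSignal: true},
		"notify":  {},
	}
	fmt.Println(deliverSignal(steps, "approve"))
	fmt.Println(deliverSignal(steps, "approve"))
	fmt.Println(deliverSignal(steps, "notify"))
}
```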

func (*Client) Unbind

func (c *Client) Unbind(ctx context.Context, queueName string, pattern string) error

Unbind unsubscribes a queue from a topic pattern.

type Conn

type Conn interface {
	Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
	Query(context.Context, string, ...any) (pgx.Rows, error)
	QueryRow(context.Context, string, ...any) pgx.Row
}

Conn is an interface for database connections compatible with pgx.Conn and pgxpool.Pool.

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

func NewFlow

func NewFlow(name string) *Flow

func (*Flow) AddStep added in v0.0.3

func (f *Flow) AddStep(step *Step) *Flow

type FlowHandle added in v0.0.6

type FlowHandle struct {
	Name string
	ID   int64
	// contains filtered or unexported fields
}

FlowHandle is a handle to a flow execution.

func RunFlow

func RunFlow(ctx context.Context, conn Conn, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)

RunFlow enqueues a flow execution and returns a handle for monitoring.

func (*FlowHandle) WaitForOutput added in v0.0.6

func (h *FlowHandle) WaitForOutput(ctx context.Context, out any, opts ...WaitOpts) error

WaitForOutput blocks until the flow execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.

type FlowInfo

type FlowInfo struct {
	Name      string     `json:"name"`
	Steps     []StepInfo `json:"steps"`
	CreatedAt time.Time  `json:"created_at"`
}

func GetFlow

func GetFlow(ctx context.Context, conn Conn, flowName string) (*FlowInfo, error)

GetFlow retrieves flow metadata by name.

func ListFlows

func ListFlows(ctx context.Context, conn Conn) ([]*FlowInfo, error)

ListFlows returns all flows.

type FlowRunInfo added in v0.0.6

type FlowRunInfo struct {
	ID             int64           `json:"id"`
	ConcurrencyKey string          `json:"concurrency_key,omitempty"`
	IdempotencyKey string          `json:"idempotency_key,omitempty"`
	Status         string          `json:"status"`
	Input          json.RawMessage `json:"input,omitempty"`
	Output         json.RawMessage `json:"output,omitempty"`
	ErrorMessage   string          `json:"error_message,omitempty"`
	StartedAt      time.Time       `json:"started_at,omitzero"`
	CompletedAt    time.Time       `json:"completed_at,omitzero"`
	FailedAt       time.Time       `json:"failed_at,omitzero"`
}

FlowRunInfo represents the details of a flow execution.

func GetFlowRun

func GetFlowRun(ctx context.Context, conn Conn, flowName string, flowRunID int64) (*FlowRunInfo, error)

GetFlowRun retrieves a specific flow run result by ID.

func ListFlowRuns

func ListFlowRuns(ctx context.Context, conn Conn, flowName string) ([]*FlowRunInfo, error)

ListFlowRuns returns recent flow runs for the specified flow.

func (*FlowRunInfo) OutputAs added in v0.0.6

func (r *FlowRunInfo) OutputAs(out any) error

OutputAs unmarshals the output of a completed flow run. Returns an error if the flow run has failed or is not completed yet.
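A minimal sketch of the status check that OutputAs describes, using a local mirror of three FlowRunInfo fields. The status strings "completed" and "failed" are assumptions, as are the helper names runInfo, outputAs, and decodeSum; the actual values and error text used by catbird are not shown on this page.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// runInfo mirrors a subset of the documented FlowRunInfo fields.
type runInfo struct {
	Status       string
	Output       json.RawMessage
	ErrorMessage string
}

// outputAs decodes the output only when the run has completed.
// The status values are assumed, not taken from catbird.
func outputAs(r runInfo, out any) error {
	switch r.Status {
	case "completed":
		return json.Unmarshal(r.Output, out)
	case "failed":
		return fmt.Errorf("run failed: %s", r.ErrorMessage)
	default:
		return errors.New("run not completed yet")
	}
}

// decodeSum is a small wrapper that decodes an integer output.
func decodeSum(status, output string) (int, error) {
	var sum int
	err := outputAs(runInfo{Status: status, Output: json.RawMessage(output)}, &sum)
	return sum, err
}

func main() {
	fmt.Println(decodeSum("completed", "42"))
	fmt.Println(decodeSum("started", ""))
}
```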

type FlowScheduleInfo added in v0.0.5

type FlowScheduleInfo struct {
	FlowName       string    `json:"flow_name"`
	CronSpec       string    `json:"cron_spec"`
	NextRunAt      time.Time `json:"next_run_at"`
	LastRunAt      time.Time `json:"last_run_at,omitzero"`
	LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
	Enabled        bool      `json:"enabled"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}

FlowScheduleInfo contains metadata about a scheduled flow.

func ListFlowSchedules added in v0.0.5

func ListFlowSchedules(ctx context.Context, conn Conn) ([]*FlowScheduleInfo, error)

ListFlowSchedules returns all flow schedules ordered by next_run_at.

type FullJitterBackoff added in v0.0.3

type FullJitterBackoff struct {
	MinDelay time.Duration
	MaxDelay time.Duration
}

FullJitterBackoff implements exponential backoff with full jitter.

func NewFullJitterBackoff added in v0.0.3

func NewFullJitterBackoff(minDelay, maxDelay time.Duration) *FullJitterBackoff

NewFullJitterBackoff creates a FullJitterBackoff with the provided bounds.

func (*FullJitterBackoff) NextDelay added in v0.0.3

func (b *FullJitterBackoff) NextDelay(deliveryCount int) time.Duration

NextDelay returns the jittered delay for the given delivery count. deliveryCount is expected to be zero-based for the first retry.
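For readers unfamiliar with full jitter, one common formulation picks a uniformly random delay below an exponentially growing ceiling that is capped at the maximum. The sketch below follows that formulation; fullJitterDelay is a hypothetical helper, and whether catbird computes NextDelay exactly this way is an assumption.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// fullJitterDelay returns a random delay in [0, ceiling), where
// ceiling = min(maxDelay, minDelay * 2^deliveryCount).
// It is an illustration, not catbird's implementation.
func fullJitterDelay(minDelay, maxDelay time.Duration, deliveryCount int) time.Duration {
	if minDelay <= 0 || maxDelay < minDelay {
		return 0 // invalid bounds; a real implementation would reject these
	}
	ceiling := minDelay
	for i := 0; i < deliveryCount; i++ {
		if ceiling > maxDelay/2 {
			ceiling = maxDelay // doubling again would pass the cap
			break
		}
		ceiling *= 2
	}
	return time.Duration(rand.Int63n(int64(ceiling)))
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Println(attempt, fullJitterDelay(100*time.Millisecond, 2*time.Second, attempt))
	}
}
```

Randomizing over the whole interval, rather than adding a small offset to a fixed delay, spreads retries from many workers so they are less likely to hit the database at the same moment.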

func (*FullJitterBackoff) Validate added in v0.0.3

func (b *FullJitterBackoff) Validate() error

Validate checks the backoff configuration for consistency.

type HandlerOpts added in v0.0.3

type HandlerOpts struct {
	Concurrency    int
	BatchSize      int
	Timeout        time.Duration
	MaxRetries     int
	Backoff        BackoffStrategy
	CircuitBreaker CircuitBreakerStrategy
}

type Message

type Message struct {
	ID             int64           `json:"id"`
	IdempotencyKey string          `json:"idempotency_key,omitempty"`
	Topic          string          `json:"topic"`
	Payload        json.RawMessage `json:"payload"`
	Deliveries     int             `json:"deliveries"`
	CreatedAt      time.Time       `json:"created_at"`
	VisibleAt      time.Time       `json:"visible_at"`
}

Message represents a message in a queue.

func Read

func Read(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration) ([]Message, error)

Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.

func ReadPoll

func ReadPoll(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)

ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached. Pass optional ReadPollOpts to configure polling behavior; defaults are used when omitted.

type Optional

type Optional[T any] struct {
	IsSet bool
	Value T
}

Optional wraps a dependency output that may be absent.
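A short sketch of how a value of this shape might be consumed, for example by a step whose upstream dependency was skipped. The Optional type is redeclared locally so the example stands alone, and valueOr is a hypothetical helper, not part of catbird.

```go
package main

import "fmt"

// Optional mirrors the documented struct for illustration.
type Optional[T any] struct {
	IsSet bool
	Value T
}

// valueOr returns the wrapped value when present, otherwise the fallback.
func valueOr[T any](o Optional[T], fallback T) T {
	if o.IsSet {
		return o.Value
	}
	return fallback
}

func main() {
	skipped := Optional[int]{}                   // dependency produced no output
	ran := Optional[int]{IsSet: true, Value: 42} // dependency produced 42
	fmt.Println(valueOr(skipped, -1), valueOr(ran, -1))
}
```

Checking IsSet rather than comparing Value to its zero value keeps a legitimate zero output distinguishable from an absent one.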

type PublishOpts added in v0.0.3

type PublishOpts struct {
	IdempotencyKey string
	VisibleAt      *time.Time
}

type QueueInfo

type QueueInfo struct {
	Name      string    `json:"name"`
	Unlogged  bool      `json:"unlogged"`
	CreatedAt time.Time `json:"created_at"`
	ExpiresAt time.Time `json:"expires_at,omitzero"`
}

func GetQueue

func GetQueue(ctx context.Context, conn Conn, queueName string) (*QueueInfo, error)

GetQueue retrieves queue metadata by name.

func ListQueues

func ListQueues(ctx context.Context, conn Conn) ([]*QueueInfo, error)

ListQueues returns all queues.

type QueueOpts

type QueueOpts struct {
	ExpiresAt time.Time
	Unlogged  bool
}

type ReadPollOpts added in v0.0.6

type ReadPollOpts struct {
	PollFor      time.Duration
	PollInterval time.Duration
}

ReadPollOpts configures ReadPoll polling behavior. Zero values use defaults.
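The interaction between PollFor and PollInterval can be sketched as a generic polling loop. The default values used here (5 seconds and 100 milliseconds) are placeholders chosen for the example, since this page does not state the real defaults; pollOpts, poll, and pollDemo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollOpts mirrors the documented ReadPollOpts fields.
type pollOpts struct {
	PollFor      time.Duration
	PollInterval time.Duration
}

// poll calls try every PollInterval until it reports success, the
// PollFor budget runs out, or ctx is cancelled.
func poll(ctx context.Context, opts pollOpts, try func() bool) (bool, error) {
	if opts.PollFor <= 0 {
		opts.PollFor = 5 * time.Second // assumed default
	}
	if opts.PollInterval <= 0 {
		opts.PollInterval = 100 * time.Millisecond // assumed default
	}
	deadline := time.Now().Add(opts.PollFor)
	for {
		if try() {
			return true, nil
		}
		if time.Now().Add(opts.PollInterval).After(deadline) {
			return false, nil // the next attempt would exceed PollFor
		}
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case <-time.After(opts.PollInterval):
		}
	}
}

// pollDemo succeeds on attempt number readyOn and reports the attempts made.
func pollDemo(readyOn, pollForMs int) (bool, int) {
	attempts := 0
	ok, _ := poll(context.Background(), pollOpts{
		PollFor:      time.Duration(pollForMs) * time.Millisecond,
		PollInterval: time.Millisecond,
	}, func() bool {
		attempts++
		return attempts >= readyOn
	})
	return ok, attempts
}

func main() {
	fmt.Println(pollDemo(3, 1000))
}
```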

type RunFlowOpts

type RunFlowOpts struct {
	ConcurrencyKey string // Prevents overlapping runs; allows reruns after completion
	IdempotencyKey string // Prevents all duplicate runs; permanent across all statuses
	VisibleAt      time.Time
}
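The difference between the two keys, as described in the field comments, can be modelled in memory. This is a sketch only: run, runStore, and enqueue are hypothetical, and how catbird reports a rejected duplicate (an error, or a handle to the existing run) is not stated here, so the example simply returns the existing run with a false flag.

```go
package main

import "fmt"

// run is a hypothetical record of one flow or task run.
type run struct {
	concurrencyKey string
	idempotencyKey string
	finished       bool
}

// runStore is an in-memory stand-in for the runs table.
type runStore struct{ runs []*run }

// enqueue creates a run unless a key rule forbids it. It returns the
// existing run and false when the request is treated as a duplicate.
func (s *runStore) enqueue(concurrencyKey, idempotencyKey string) (*run, bool) {
	for _, r := range s.runs {
		// Idempotency: any earlier run with the key blocks, whatever its status.
		if idempotencyKey != "" && r.idempotencyKey == idempotencyKey {
			return r, false
		}
		// Concurrency: only a still-active run with the key blocks.
		if concurrencyKey != "" && r.concurrencyKey == concurrencyKey && !r.finished {
			return r, false
		}
	}
	r := &run{concurrencyKey: concurrencyKey, idempotencyKey: idempotencyKey}
	s.runs = append(s.runs, r)
	return r, true
}

func main() {
	s := &runStore{}
	first, _ := s.enqueue("user-1", "")
	_, created := s.enqueue("user-1", "")
	fmt.Println("overlapping run created:", created)
	first.finished = true
	_, created = s.enqueue("user-1", "")
	fmt.Println("rerun after completion created:", created)
}
```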

type RunTaskOpts added in v0.0.6

type RunTaskOpts struct {
	ConcurrencyKey string // Prevents overlapping runs; allows reruns after completion
	IdempotencyKey string // Prevents all duplicate runs; permanent across all statuses
	VisibleAt      time.Time
}

type ScheduleOpts added in v0.0.5

type ScheduleOpts struct {
	Input any
}

ScheduleOpts configures scheduled task/flow behavior.

type SendOpts

type SendOpts struct {
	Topic          string
	IdempotencyKey string
	VisibleAt      time.Time
}

type Step

type Step struct {
	// contains filtered or unexported fields
}

func NewStep added in v0.0.3

func NewStep(name string) *Step

func (*Step) Condition

func (s *Step) Condition(condition string) *Step

func (*Step) DependsOn

func (s *Step) DependsOn(deps ...string) *Step

func (*Step) Handler added in v0.0.3

func (s *Step) Handler(fn any, opts ...HandlerOpts) *Step

func (*Step) Signal added in v0.0.3

func (s *Step) Signal(signal bool) *Step

type StepDependencyInfo

type StepDependencyInfo struct {
	Name string `json:"name"`
}

type StepHandlerInfo

type StepHandlerInfo struct {
	FlowName string `json:"flow_name"`
	StepName string `json:"step_name"`
}

type StepInfo

type StepInfo struct {
	Name      string               `json:"name"`
	DependsOn []StepDependencyInfo `json:"depends_on,omitempty"`
}

type StepRunInfo

type StepRunInfo struct {
	ID           int64           `json:"id"`
	StepName     string          `json:"step_name"`
	Status       string          `json:"status"`
	Output       json.RawMessage `json:"output,omitempty"`
	ErrorMessage string          `json:"error_message,omitempty"`
	StartedAt    time.Time       `json:"started_at,omitzero"`
	CompletedAt  time.Time       `json:"completed_at,omitzero"`
	FailedAt     time.Time       `json:"failed_at,omitzero"`
	SkippedAt    time.Time       `json:"skipped_at,omitzero"`
}

StepRunInfo represents the execution state of a single step within a flow run.

func GetFlowRunSteps

func GetFlowRunSteps(ctx context.Context, conn Conn, flowName string, flowRunID int64) ([]*StepRunInfo, error)

GetFlowRunSteps retrieves all step runs for a specific flow run.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is a reflection-based task with an optional handler. Use NewTask().Handler(fn, opts) for tasks with handlers. Use NewTask() for definition-only tasks.

func NewTask

func NewTask(name string) *Task

NewTask creates a new task definition with the given name. Chain .Handler() to add a handler, otherwise returns a definition-only task.

func (*Task) Condition added in v0.0.3

func (t *Task) Condition(condition string) *Task

Condition sets the condition expression for the task.

func (*Task) Handler added in v0.0.3

func (t *Task) Handler(fn any, opts ...HandlerOpts) *Task

Handler sets the task handler function and execution options. fn must have signature (context.Context, In) (Out, error). If opts is omitted, defaults are used (concurrency: 1, batchSize: 10).
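Because fn is typed as any, the signature has to be checked at run time. The following sketch shows how such a check could be written with the reflect package. checkHandler and the two sample functions are hypothetical, and catbird's actual validation may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// checkHandler reports whether fn has the shape
// func(context.Context, In) (Out, error) for some In and Out.
func checkHandler(fn any) error {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return errors.New("handler must be a function")
	}
	if t.NumIn() != 2 || t.In(0) != ctxType {
		return errors.New("handler must take (context.Context, In)")
	}
	if t.NumOut() != 2 || t.Out(1) != errType {
		return errors.New("handler must return (Out, error)")
	}
	return nil
}

// sendEmail matches the documented handler shape.
func sendEmail(ctx context.Context, to string) (string, error) {
	return "sent to " + to, nil
}

// noContext is missing the leading context parameter.
func noContext(to string) (string, error) {
	return to, nil
}

func main() {
	fmt.Println(checkHandler(sendEmail))
	fmt.Println(checkHandler(noContext))
}
```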

func (*Task) MarshalJSON added in v0.0.3

func (t *Task) MarshalJSON() ([]byte, error)

MarshalJSON serializes the task for JSON output.

type TaskHandle added in v0.0.6

type TaskHandle struct {
	Name string
	ID   int64
	// contains filtered or unexported fields
}

TaskHandle is a handle to a task execution.

func RunTask

func RunTask(ctx context.Context, conn Conn, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)

RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.

func (*TaskHandle) WaitForOutput added in v0.0.6

func (h *TaskHandle) WaitForOutput(ctx context.Context, out any, opts ...WaitOpts) error

WaitForOutput blocks until the task execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.

type TaskHandlerInfo

type TaskHandlerInfo struct {
	TaskName string `json:"task_name"`
}

type TaskInfo

type TaskInfo struct {
	Name      string    `json:"name"`
	CreatedAt time.Time `json:"created_at"`
}

func GetTask

func GetTask(ctx context.Context, conn Conn, taskName string) (*TaskInfo, error)

GetTask retrieves task metadata by name.

func ListTasks

func ListTasks(ctx context.Context, conn Conn) ([]*TaskInfo, error)

ListTasks returns all tasks.

type TaskRunInfo added in v0.0.6

type TaskRunInfo struct {
	ID             int64           `json:"id"`
	ConcurrencyKey string          `json:"concurrency_key,omitempty"`
	IdempotencyKey string          `json:"idempotency_key,omitempty"`
	Status         string          `json:"status"`
	Input          json.RawMessage `json:"input,omitempty"`
	Output         json.RawMessage `json:"output,omitempty"`
	ErrorMessage   string          `json:"error_message,omitempty"`
	StartedAt      time.Time       `json:"started_at,omitzero"`
	CompletedAt    time.Time       `json:"completed_at,omitzero"`
	FailedAt       time.Time       `json:"failed_at,omitzero"`
	SkippedAt      time.Time       `json:"skipped_at,omitzero"`
}

TaskRunInfo represents the details of a task execution.

func GetTaskRun

func GetTaskRun(ctx context.Context, conn Conn, taskName string, taskRunID int64) (*TaskRunInfo, error)

GetTaskRun retrieves a specific task run result by ID.

func ListTaskRuns

func ListTaskRuns(ctx context.Context, conn Conn, taskName string) ([]*TaskRunInfo, error)

ListTaskRuns returns recent task runs for the specified task.

func (*TaskRunInfo) OutputAs added in v0.0.6

func (r *TaskRunInfo) OutputAs(out any) error

OutputAs unmarshals the output of a completed task run. Returns an error if the task run has failed or is not completed yet.

type TaskScheduleInfo added in v0.0.5

type TaskScheduleInfo struct {
	TaskName       string    `json:"task_name"`
	CronSpec       string    `json:"cron_spec"`
	NextRunAt      time.Time `json:"next_run_at"`
	LastRunAt      time.Time `json:"last_run_at,omitzero"`
	LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
	Enabled        bool      `json:"enabled"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}

TaskScheduleInfo contains metadata about a scheduled task.

func ListTaskSchedules added in v0.0.5

func ListTaskSchedules(ctx context.Context, conn Conn) ([]*TaskScheduleInfo, error)

ListTaskSchedules returns all task schedules ordered by next_run_at.

type WaitOpts added in v0.0.6

type WaitOpts struct {
	PollFor      time.Duration
	PollInterval time.Duration
}

WaitOpts configures WaitForOutput polling behavior. Zero values use defaults.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes tasks and flows from the queue.

func NewWorker

func NewWorker(conn Conn, opts ...WorkerOpts) *Worker

NewWorker creates a new worker with the given connection and configuration. Use builder methods (AddTask, AddFlow, etc.) to configure the worker. Call Start(ctx) to begin processing tasks and flows.

func (*Worker) AddFlow added in v0.0.3

func (w *Worker) AddFlow(f *Flow) *Worker

AddFlow registers a flow with the worker.

func (*Worker) AddTask added in v0.0.3

func (w *Worker) AddTask(t *Task) *Worker

AddTask registers a task with the worker.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start begins processing tasks and flows.

The worker will:

  • poll for new work and execute task and flow step handlers while ctx is active
  • run any configured cron-style task and flow schedules
  • send periodic heartbeats while it is running
  • register a built-in garbage collection task that runs every 5 minutes

Shutdown behaviour:

  • when ctx is cancelled the worker immediately stops reading new work and begins shutting down
  • if ShutdownTimeout is set to a value > 0, that duration is used as a grace period for in‑flight handlers after ctx is cancelled; once the grace period expires the handler context is cancelled and remaining handlers are asked to stop. The default graceful shutdown timeout is 5 seconds.
  • if ShutdownTimeout is not set or set to 0, there is no grace period: the handler context is cancelled immediately once ctx is cancelled and Start returns after all goroutines finish
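The grace-period behaviour in the bullets above can be sketched with two contexts: one that signals shutdown and a separate one handed to handlers. This sketch follows the bullets literally, treating a zero ShutdownTimeout as no grace period; runWithGrace and finishedCleanly are hypothetical names, and the real worker runs many handlers rather than one.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWithGrace runs handler until it returns. After ctx is cancelled it
// waits up to shutdownTimeout before cancelling the handler's own context.
func runWithGrace(ctx context.Context, shutdownTimeout time.Duration, handler func(context.Context)) {
	handlerCtx, cancelHandlers := context.WithCancel(context.Background())
	defer cancelHandlers()

	done := make(chan struct{})
	go func() {
		defer close(done)
		handler(handlerCtx)
	}()

	select {
	case <-done:
		return
	case <-ctx.Done(): // shutdown requested
	}
	if shutdownTimeout > 0 {
		select {
		case <-done:
			return // handler finished within the grace period
		case <-time.After(shutdownTimeout):
		}
	}
	cancelHandlers() // ask the in-flight handler to stop
	<-done
}

// finishedCleanly requests shutdown immediately and reports whether a
// handler needing workMs of time completed its work before being cancelled.
func finishedCleanly(workMs, graceMs int) bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	finished := false
	runWithGrace(ctx, time.Duration(graceMs)*time.Millisecond, func(hctx context.Context) {
		select {
		case <-time.After(time.Duration(workMs) * time.Millisecond):
			finished = true
		case <-hctx.Done():
		}
	})
	return finished
}

func main() {
	fmt.Println(finishedCleanly(10, 500), finishedCleanly(500, 0))
}
```

Keeping the handler context separate from ctx is what makes the grace period possible: cancelling ctx stops new work from being picked up without interrupting handlers that are already running.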

type WorkerInfo

type WorkerInfo struct {
	ID              string             `json:"id"`
	TaskHandlers    []*TaskHandlerInfo `json:"task_handlers"`
	StepHandlers    []*StepHandlerInfo `json:"step_handlers"`
	StartedAt       time.Time          `json:"started_at"`
	LastHeartbeatAt time.Time          `json:"last_heartbeat_at"`
}

func ListWorkers

func ListWorkers(ctx context.Context, conn Conn) ([]*WorkerInfo, error)

ListWorkers returns all registered workers.

type WorkerOpts added in v0.0.3

type WorkerOpts struct {
	Logger          *slog.Logger
	ShutdownTimeout time.Duration
}

WorkerOpts is a configuration struct for creating workers.

Directories

Path Synopsis
cmd
cb command
