Documentation ¶
Overview ¶
Package catbird provides a PostgreSQL-based message queue with task and workflow execution engine.
Scheduling for Distributed Environments ¶
Catbird includes built-in support for scheduled task and flow execution using cron syntax. When multiple workers run concurrently (on different machines or processes), Catbird guarantees that each scheduled execution runs **exactly once per cron tick**, even across clock skew and timezone differences.
This is achieved through:
- UTC-normalized cron scheduling: all workers use UTC, eliminating timezone confusion
- Idempotency key deduplication: each cron tick generates a deterministic key (format: "schedule:{unix_nanos_utc}") that persists across completion
- PostgreSQL as the single source of truth: the database enforces the unique constraint on idempotency keys, preventing duplicates at the atomic level
See scheduler.go for implementation details and SCHEDULING_ADVANCED.md for the roadmap toward a future DB-driven scheduling system with even stronger guarantees.
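The interplay of a deterministic key and a unique constraint can be illustrated without a database. The following is a minimal sketch, not Catbird's implementation: `runStore` is a hypothetical in-memory stand-in for a PostgreSQL table with a unique constraint on the idempotency key, and `tickKey` and `simulate` are illustrative names. The key follows the "schedule:{unix_nanos_utc}" format quoted above.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tickKey follows the "schedule:{unix_nanos_utc}" format quoted above.
func tickKey(tick time.Time) string {
	return fmt.Sprintf("schedule:%d", tick.UTC().UnixNano())
}

// runStore is a hypothetical in-memory stand-in for a table with a unique
// constraint on the idempotency key.
type runStore struct {
	mu   sync.Mutex
	keys map[string]bool
}

// insertOnce reports whether the caller was the first to insert key.
func (s *runStore) insertOnce(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.keys[key] {
		return false
	}
	s.keys[key] = true
	return true
}

// simulate lets n workers race to enqueue the same tick and returns how many
// inserts were accepted.
func simulate(n int, tickUnixSeconds int64) int {
	store := &runStore{keys: make(map[string]bool)}
	tick := time.Unix(tickUnixSeconds, 0)
	results := make(chan bool, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			// Each worker views the tick in a different local zone.
			local := tick.In(time.FixedZone("worker", worker*3600))
			results <- store.insertOnce(tickKey(local))
		}(i)
	}
	wg.Wait()
	close(results)
	accepted := 0
	for ok := range results {
		if ok {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(simulate(5, 1704110400)) // prints 1
}
```

Because every worker derives the same key for the same instant regardless of its local zone, only the first insert succeeds; in the real system that check is performed by the database rather than by a mutex.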
Index ¶
- Constants
- Variables
- func Bind(ctx context.Context, conn Conn, queue string, pattern string) error
- func CreateFlow(ctx context.Context, conn Conn, flows ...*Flow) error
- func CreateQueue(ctx context.Context, conn Conn, queues ...*Queue) error
- func CreateTask(ctx context.Context, conn Conn, tasks ...*Task) error
- func Delete(ctx context.Context, conn Conn, queue string, id int64) (bool, error)
- func DeleteMany(ctx context.Context, conn Conn, queue string, ids []int64) error
- func DeleteQueue(ctx context.Context, conn Conn, name string) (bool, error)
- func EnqueuePublish(batch *pgx.Batch, topic string, payload any, opts PublishOpts) error
- func EnqueueSend(batch *pgx.Batch, queue string, payload any, opts SendOpts) error
- func GC(ctx context.Context, conn Conn) error
- func Hide(ctx context.Context, conn Conn, queue string, id int64, hideFor time.Duration) (bool, error)
- func HideMany(ctx context.Context, conn Conn, queue string, ids []int64, ...) error
- func MigrateDownTo(ctx context.Context, db *sql.DB, version int) error
- func MigrateUpTo(ctx context.Context, db *sql.DB, version int) error
- func Publish(ctx context.Context, conn Conn, topic string, payload any, opts *PublishOpts) error
- func Send(ctx context.Context, conn Conn, queue string, payload any, opts *SendOpts) error
- func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, ...) error
- func Unbind(ctx context.Context, conn Conn, queue string, pattern string) error
- type BackoffStrategy
- type CircuitBreaker
- type CircuitBreakerStrategy
- type Client
- func (c *Client) Bind(ctx context.Context, queue string, pattern string) error
- func (c *Client) CreateFlow(ctx context.Context, flows ...*Flow) error
- func (c *Client) CreateQueue(ctx context.Context, queues ...*Queue) error
- func (c *Client) CreateTask(ctx context.Context, tasks ...*Task) error
- func (c *Client) Delete(ctx context.Context, queue string, id int64) (bool, error)
- func (c *Client) DeleteMany(ctx context.Context, queue string, ids []int64) error
- func (c *Client) DeleteQueue(ctx context.Context, name string) (bool, error)
- func (c *Client) GC(ctx context.Context) error
- func (c *Client) GetFlow(ctx context.Context, name string) (*FlowInfo, error)
- func (c *Client) GetFlowRun(ctx context.Context, name string, id int64) (*RunInfo, error)
- func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
- func (c *Client) GetQueue(ctx context.Context, name string) (*QueueInfo, error)
- func (c *Client) GetTask(ctx context.Context, name string) (*TaskInfo, error)
- func (c *Client) GetTaskRun(ctx context.Context, name string, id int64) (*RunInfo, error)
- func (c *Client) Hide(ctx context.Context, queue string, id int64, hideFor time.Duration) (bool, error)
- func (c *Client) HideMany(ctx context.Context, queue string, ids []int64, hideFor time.Duration) error
- func (c *Client) ListFlowRuns(ctx context.Context, name string) ([]*RunInfo, error)
- func (c *Client) ListFlows(ctx context.Context) ([]*FlowInfo, error)
- func (c *Client) ListQueues(ctx context.Context) ([]*QueueInfo, error)
- func (c *Client) ListTaskRuns(ctx context.Context, name string) ([]*RunInfo, error)
- func (c *Client) ListTasks(ctx context.Context) ([]*TaskInfo, error)
- func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
- func (c *Client) NewWorker(ctx context.Context, opts *WorkerOpts) *Worker
- func (c *Client) Publish(ctx context.Context, topic string, payload any, opts *PublishOpts) error
- func (c *Client) Read(ctx context.Context, queue string, quantity int, hideFor time.Duration) ([]Message, error)
- func (c *Client) ReadPoll(ctx context.Context, queue string, quantity int, ...) ([]Message, error)
- func (c *Client) RunFlow(ctx context.Context, name string, input any, opts *RunFlowOpts) (*RunHandle, error)
- func (c *Client) RunTask(ctx context.Context, name string, input any, opts *RunOpts) (*RunHandle, error)
- func (c *Client) Send(ctx context.Context, queue string, payload any, opts *SendOpts) error
- func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, ...) error
- func (c *Client) Unbind(ctx context.Context, queue string, pattern string) error
- type Conn
- type Flow
- type FlowInfo
- type FullJitterBackoff
- type HandlerOpts
- type Message
- type Optional
- type PublishOpts
- type Queue
- type QueueInfo
- type QueueOpts
- type RunFlowOpts
- type RunHandle
- type RunInfo
- func GetFlowRun(ctx context.Context, conn Conn, name string, id int64) (*RunInfo, error)
- func GetTaskRun(ctx context.Context, conn Conn, name string, id int64) (*RunInfo, error)
- func ListFlowRuns(ctx context.Context, conn Conn, name string) ([]*RunInfo, error)
- func ListTaskRuns(ctx context.Context, conn Conn, name string) ([]*RunInfo, error)
- type RunOpts
- type Scheduler
- func (s *Scheduler) AddFlow(flowName string, schedule string, inputFn func(context.Context) (any, error))
- func (s *Scheduler) AddTask(taskName string, schedule string, inputFn func(context.Context) (any, error))
- func (s *Scheduler) Start(ctx context.Context) error
- func (s *Scheduler) Stop(ctx context.Context)
- type SendOpts
- type Step
- type StepDependencyInfo
- type StepHandlerInfo
- type StepInfo
- type StepRunInfo
- type Task
- type TaskHandlerInfo
- type TaskInfo
- type Worker
- type WorkerInfo
- type WorkerOpts
Constants ¶
const (
	StatusCreated   = "created"
	StatusStarted   = "started"
	StatusCompleted = "completed"
	StatusFailed    = "failed"
	StatusSkipped   = "skipped" // Step skipped due to condition
)
Status constants for task and flow runs.
const SchemaVersion = 3
Variables ¶
var (
	// ErrRunFailed is returned when you try to unmarshal the output of a failed task or flow run
	ErrRunFailed = fmt.Errorf("run failed")
)
Functions ¶
func Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"
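The wildcard rules can be made concrete with a small matcher. This is only an illustrative sketch: `matchTopic` is a hypothetical helper, not part of the package, and it assumes that tokens are separated by ".", that "?" matches exactly one token, and that "*" is only meaningful as the last token, where it matches one or more remaining tokens. Whether Catbird's own matching lets "*" match an empty tail is not stated here.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern under the assumptions
// stated in the lead-in ("?" = one token, trailing "*" = one or more tokens).
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	for i, tok := range p {
		if tok == "*" && i == len(p)-1 {
			return len(t) > i // at least one token must remain for the tail
		}
		if i >= len(t) {
			return false
		}
		if tok != "?" && tok != t[i] {
			return false
		}
	}
	return len(p) == len(t)
}

func main() {
	fmt.Println(matchTopic("foo.bar", "foo.bar"))       // true
	fmt.Println(matchTopic("foo.?.bar", "foo.x.bar"))   // true
	fmt.Println(matchTopic("foo.?.bar", "foo.x.y.bar")) // false
	fmt.Println(matchTopic("foo.bar.*", "foo.bar.a.b")) // true
	fmt.Println(matchTopic("foo.bar.*", "foo.bar"))     // false under the one-or-more assumption
}
```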
func CreateFlow ¶
CreateFlow creates one or more flow definitions.
func CreateQueue ¶
CreateQueue creates one or more queue definitions. Use Bind() separately to create topic bindings.
func CreateTask ¶
CreateTask creates one or more task definitions.
func DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func EnqueuePublish ¶ added in v0.0.3
EnqueuePublish adds a Publish operation to a batch for efficient bulk message publishing.
func EnqueueSend ¶
EnqueueSend adds a Send operation to a batch for efficient bulk message sending.
func Hide ¶
func Hide(ctx context.Context, conn Conn, queue string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func HideMany ¶
func HideMany(ctx context.Context, conn Conn, queue string, ids []int64, hideFor time.Duration) error
HideMany hides multiple messages from being read for the specified duration.
func Publish ¶ added in v0.0.3
Publish sends a message to topic-subscribed queues with options. Pass nil opts to use defaults.
func Send ¶
Send enqueues a message to the specified queue. Pass nil opts to use defaults. The payload is marshaled to JSON.
func SignalFlow ¶
func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal, NewStepWithSignalAndDependency). Signals enable human-in-the-loop workflows where a step waits for external input before executing. Returns an error if the signal was already delivered or the step doesn't require a signal.
Types ¶
type BackoffStrategy ¶ added in v0.0.3
type BackoffStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// NextDelay returns a delay for a zero-based delivery count (first retry = 0).
// Implementations should always return a positive duration.
NextDelay(deliveryCount int) time.Duration
}
BackoffStrategy defines how retry delays are calculated based on delivery count. Implementations must be safe for concurrent use.
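As an illustration of what a custom implementation might look like, here is a minimal sketch of a linear strategy. `LinearBackoff` is a hypothetical type that is not part of the package, and the interface is copied locally so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// BackoffStrategy mirrors the interface shown above.
type BackoffStrategy interface {
	Validate() error
	NextDelay(deliveryCount int) time.Duration
}

// LinearBackoff is a hypothetical strategy: Step, 2*Step, 3*Step, ... capped
// at Max. It holds no mutable state, so concurrent use is safe.
type LinearBackoff struct {
	Step time.Duration
	Max  time.Duration
}

// Validate rejects configurations that would produce non-positive delays.
func (b LinearBackoff) Validate() error {
	if b.Step <= 0 {
		return errors.New("step must be positive")
	}
	if b.Max < b.Step {
		return errors.New("max must be at least step")
	}
	return nil
}

// NextDelay expects a validated configuration (Step > 0).
func (b LinearBackoff) NextDelay(deliveryCount int) time.Duration {
	if deliveryCount < 0 {
		deliveryCount = 0
	}
	// Compare before multiplying so large counts cannot overflow.
	if int64(deliveryCount) >= int64(b.Max/b.Step) {
		return b.Max
	}
	return time.Duration(deliveryCount+1) * b.Step
}

// Compile-time check that LinearBackoff satisfies the interface.
var _ BackoffStrategy = LinearBackoff{}

func main() {
	b := LinearBackoff{Step: time.Second, Max: 5 * time.Second}
	if err := b.Validate(); err != nil {
		fmt.Println("invalid:", err)
		return
	}
	for i := 0; i < 7; i++ {
		fmt.Println(i, b.NextDelay(i))
	}
}
```

A value of this kind could presumably be assigned to the Backoff field of HandlerOpts, since that field is typed as BackoffStrategy.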
type CircuitBreaker ¶ added in v0.0.3
type CircuitBreaker struct {
// contains filtered or unexported fields
}
func NewCircuitBreaker ¶ added in v0.0.3
func NewCircuitBreaker(failureThreshold int, openTimeout time.Duration) *CircuitBreaker
func (*CircuitBreaker) RecordFailure ¶ added in v0.0.3
func (c *CircuitBreaker) RecordFailure(now time.Time)
func (*CircuitBreaker) RecordSuccess ¶ added in v0.0.3
func (c *CircuitBreaker) RecordSuccess()
func (*CircuitBreaker) Validate ¶ added in v0.0.3
func (c *CircuitBreaker) Validate() error
type CircuitBreakerStrategy ¶ added in v0.0.3
type CircuitBreakerStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// Allow returns whether a call is permitted and how long to wait if not.
Allow(now time.Time) (bool, time.Duration)
// RecordSuccess updates breaker state after a successful call.
RecordSuccess()
// RecordFailure updates breaker state after a failed call.
RecordFailure(now time.Time)
}
CircuitBreakerStrategy defines the interface for circuit breaker behavior. Implementations must be safe for concurrent use.
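To show how the four methods fit together, the following is a minimal sketch of a breaker that opens after a number of consecutive failures. `simpleBreaker` and `demo` are hypothetical names; this is not the package's CircuitBreaker, whose internals are unexported, and the interface is copied locally so the example is self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// CircuitBreakerStrategy mirrors the interface shown above.
type CircuitBreakerStrategy interface {
	Validate() error
	Allow(now time.Time) (bool, time.Duration)
	RecordSuccess()
	RecordFailure(now time.Time)
}

// simpleBreaker opens after threshold consecutive failures and permits calls
// again once openFor has elapsed. A mutex makes it safe for concurrent use.
type simpleBreaker struct {
	mu        sync.Mutex
	threshold int
	openFor   time.Duration
	failures  int
	openedAt  time.Time
}

var _ CircuitBreakerStrategy = (*simpleBreaker)(nil)

func (b *simpleBreaker) Validate() error {
	if b.threshold <= 0 || b.openFor <= 0 {
		return errors.New("threshold and openFor must be positive")
	}
	return nil
}

func (b *simpleBreaker) Allow(now time.Time) (bool, time.Duration) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.failures < b.threshold {
		return true, 0
	}
	if wait := b.openedAt.Add(b.openFor).Sub(now); wait > 0 {
		return false, wait
	}
	return true, 0 // timeout elapsed: let a trial call through
}

func (b *simpleBreaker) RecordSuccess() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures = 0
}

func (b *simpleBreaker) RecordFailure(now time.Time) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures++
	if b.failures >= b.threshold {
		b.openedAt = now
	}
}

// demo walks the breaker through closed, open, and timed-out states.
func demo() (before, whileOpen, afterTimeout bool) {
	b := &simpleBreaker{threshold: 2, openFor: time.Minute}
	t0 := time.Unix(0, 0)
	before, _ = b.Allow(t0)
	b.RecordFailure(t0)
	b.RecordFailure(t0)
	whileOpen, _ = b.Allow(t0.Add(time.Second))
	afterTimeout, _ = b.Allow(t0.Add(2 * time.Minute))
	return
}

func main() {
	fmt.Println(demo()) // prints: true false true
}
```

Passing `now` explicitly, as the interface does, keeps the breaker deterministic and easy to test without sleeping.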
type Client ¶
type Client struct {
Conn Conn
}
Client is a facade for interacting with Catbird.
func New ¶
New creates a new Client with the given database connection.
The connection can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.
func (*Client) Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"
func (*Client) CreateFlow ¶
CreateFlow creates one or more flow definitions.
func (*Client) CreateQueue ¶
CreateQueue creates one or more queue definitions.
func (*Client) CreateTask ¶
CreateTask creates one or more task definitions.
func (*Client) Delete ¶
Delete deletes a single message from the queue. Returns true if the message existed.
func (*Client) DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func (*Client) DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func (*Client) GetFlowRun ¶
GetFlowRun retrieves a specific flow run result by ID.
func (*Client) GetFlowRunSteps ¶
func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
func (*Client) GetTaskRun ¶
GetTaskRun retrieves a specific task run result by ID.
func (*Client) Hide ¶
func (c *Client) Hide(ctx context.Context, queue string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func (*Client) HideMany ¶
func (c *Client) HideMany(ctx context.Context, queue string, ids []int64, hideFor time.Duration) error
HideMany hides multiple messages from being read for the specified duration.
func (*Client) ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func (*Client) ListQueues ¶
ListQueues returns all queues.
func (*Client) ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
func (*Client) ListWorkers ¶
func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.
func (*Client) NewWorker ¶
func (c *Client) NewWorker(ctx context.Context, opts *WorkerOpts) *Worker
NewWorker creates a new worker that processes task and flow executions. Use the builder pattern methods (AddTask, AddFlow, etc.) to configure, then call Start(ctx) to begin processing.
func (*Client) Publish ¶ added in v0.0.3
Publish sends a message to all queues subscribed to the specified topic.
func (*Client) Read ¶
func (c *Client) Read(ctx context.Context, queue string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func (*Client) ReadPoll ¶
func (c *Client) ReadPoll(ctx context.Context, queue string, quantity int, hideFor, pollFor, pollInterval time.Duration) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.
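The polling behaviour described above can be sketched as a plain loop. This is an assumption-laden illustration rather than the package's code: `pollUntil` and `demoPoll` are hypothetical names, the `read` callback stands in for a single non-blocking queue read, and pollInterval is assumed to be positive.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntil calls read immediately and then every pollInterval until it
// returns at least one item, pollFor elapses, or ctx is cancelled.
func pollUntil(ctx context.Context, pollFor, pollInterval time.Duration,
	read func(context.Context) ([]string, error)) ([]string, error) {

	deadline := time.NewTimer(pollFor)
	defer deadline.Stop()
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		items, err := read(ctx)
		if err != nil || len(items) > 0 {
			return items, err
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-deadline.C:
			return nil, nil // timed out with nothing available
		case <-ticker.C:
			// next attempt
		}
	}
}

// demoPoll simulates a queue that is empty for the first two reads.
func demoPoll() (calls int, received int) {
	read := func(context.Context) ([]string, error) {
		calls++
		if calls < 3 {
			return nil, nil
		}
		return []string{"hello"}, nil
	}
	items, _ := pollUntil(context.Background(), time.Second, 10*time.Millisecond, read)
	return calls, len(items)
}

func main() {
	fmt.Println(demoPoll()) // prints: 3 1
}
```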
func (*Client) RunFlow ¶
func (c *Client) RunFlow(ctx context.Context, name string, input any, opts *RunFlowOpts) (*RunHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func (*Client) RunTask ¶
func (c *Client) RunTask(ctx context.Context, name string, input any, opts *RunOpts) (*RunHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
func (*Client) Send ¶
Send enqueues a message to the specified queue. The payload is marshaled to JSON.
func (*Client) SignalFlow ¶
func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal). Returns an error if the signal was already delivered or the step doesn't require a signal.
type Conn ¶
type Conn interface {
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
Query(context.Context, string, ...any) (pgx.Rows, error)
QueryRow(context.Context, string, ...any) pgx.Row
}
Conn is an interface for database connections. It is satisfied by *pgxpool.Pool, *pgx.Conn, and pgx.Tx.
type FlowInfo ¶
type FlowInfo struct {
Name string `json:"name"`
Steps []StepInfo `json:"steps"`
CreatedAt time.Time `json:"created_at"`
}
type FullJitterBackoff ¶ added in v0.0.3
FullJitterBackoff implements exponential backoff with full jitter.
func NewFullJitterBackoff ¶ added in v0.0.3
func NewFullJitterBackoff(minDelay, maxDelay time.Duration) *FullJitterBackoff
NewFullJitterBackoff creates a FullJitterBackoff with the provided bounds.
func (*FullJitterBackoff) NextDelay ¶ added in v0.0.3
func (b *FullJitterBackoff) NextDelay(deliveryCount int) time.Duration
NextDelay returns the jittered delay for the given delivery count. deliveryCount is expected to be zero-based for the first retry.
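For readers unfamiliar with the technique, the sketch below shows the commonly described "full jitter" calculation: an exponentially growing ceiling, capped at the maximum, with the delay drawn uniformly below it. `fullJitterDelay` and `sampleRange` are hypothetical helpers; the exact formula used by FullJitterBackoff is not shown on this page, so this should be read as an approximation. It assumes 0 < minDelay <= maxDelay.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// fullJitterDelay doubles the ceiling once per delivery (starting at minDelay,
// capped at maxDelay) and returns a uniform random delay in [1ns, ceiling].
func fullJitterDelay(minDelay, maxDelay time.Duration, deliveryCount int, rnd *rand.Rand) time.Duration {
	ceiling := minDelay
	for i := 0; i < deliveryCount; i++ {
		if ceiling >= maxDelay/2 {
			ceiling = maxDelay // doubling would reach or pass the cap
			break
		}
		ceiling *= 2
	}
	// Int63n returns a value in [0, n); adding 1 keeps the delay positive.
	return time.Duration(rnd.Int63n(int64(ceiling))) + 1
}

// sampleRange draws several delays and reports the smallest and largest.
func sampleRange(minDelay, maxDelay time.Duration, deliveryCount, samples int) (lo, hi time.Duration) {
	rnd := rand.New(rand.NewSource(1))
	lo = maxDelay + 1
	for i := 0; i < samples; i++ {
		d := fullJitterDelay(minDelay, maxDelay, deliveryCount, rnd)
		if d < lo {
			lo = d
		}
		if d > hi {
			hi = d
		}
	}
	return lo, hi
}

func main() {
	for n := 0; n < 6; n++ {
		lo, hi := sampleRange(100*time.Millisecond, time.Second, n, 1000)
		fmt.Printf("delivery %d: observed %v .. %v\n", n, lo, hi)
	}
}
```

Spreading retries across the whole interval, rather than clustering them near the ceiling, is what keeps many failing consumers from retrying in lockstep.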
func (*FullJitterBackoff) Validate ¶ added in v0.0.3
func (b *FullJitterBackoff) Validate() error
Validate checks the backoff configuration for consistency.
type HandlerOpts ¶ added in v0.0.3
type HandlerOpts struct {
Concurrency int
BatchSize int
Timeout time.Duration
MaxRetries int
Backoff BackoffStrategy
CircuitBreaker CircuitBreakerStrategy
}
type Message ¶
type Message struct {
ID int64 `json:"id"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Topic string `json:"topic"`
Payload json.RawMessage `json:"payload"`
Deliveries int `json:"deliveries"`
CreatedAt time.Time `json:"created_at"`
DeliverAt time.Time `json:"deliver_at"`
}
Message represents a message in a queue.
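Because Payload is a json.RawMessage, consumers decode it into their own types. A minimal sketch follows, in which `message` copies only the fields needed here and `orderCreated`, `decodeOrder` and `decodeOrderJSON` are hypothetical names chosen for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message copies only the Message fields this sketch needs.
type message struct {
	ID      int64           `json:"id"`
	Topic   string          `json:"topic"`
	Payload json.RawMessage `json:"payload"`
}

// orderCreated is a hypothetical application payload.
type orderCreated struct {
	OrderID int    `json:"order_id"`
	Email   string `json:"email"`
}

// decodeOrder unmarshals the raw payload into a typed value.
func decodeOrder(m message) (orderCreated, error) {
	var o orderCreated
	err := json.Unmarshal(m.Payload, &o)
	return o, err
}

// decodeOrderJSON wraps raw JSON in a message and decodes it.
func decodeOrderJSON(raw string) (orderCreated, error) {
	return decodeOrder(message{ID: 1, Topic: "orders.created", Payload: json.RawMessage(raw)})
}

func main() {
	o, err := decodeOrderJSON(`{"order_id": 42, "email": "a@example.com"}`)
	fmt.Println(o.OrderID, o.Email, err) // prints: 42 a@example.com <nil>
}
```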
func Read ¶
func Read(ctx context.Context, conn Conn, queue string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func ReadPoll ¶
func ReadPoll(ctx context.Context, conn Conn, queue string, quantity int, hideFor, pollFor, pollInterval time.Duration) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.
type PublishOpts ¶ added in v0.0.3
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Unlogged bool `json:"unlogged"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at,omitzero"`
}
type RunFlowOpts ¶
type RunFlowOpts = RunOpts
type RunHandle ¶
RunHandle is a handle to a task or flow execution.
func RunFlow ¶
func RunFlow(ctx context.Context, conn Conn, name string, input any, opts *RunFlowOpts) (*RunHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
type RunInfo ¶
type RunInfo struct {
ID int64 `json:"id"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Status string `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
RunInfo represents the details of a task or flow execution.
func GetFlowRun ¶
GetFlowRun retrieves a specific flow run result by ID.
func GetTaskRun ¶
GetTaskRun retrieves a specific task run result by ID.
func ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages scheduled task and flow executions using cron syntax.
func NewScheduler ¶
NewScheduler creates a new scheduler instance.
func (*Scheduler) AddFlow ¶
func (s *Scheduler) AddFlow(flowName string, schedule string, inputFn func(context.Context) (any, error))
AddFlow registers a scheduled flow execution using cron syntax. inputFn can be nil to use an empty JSON object as input, or it can provide dynamic input at execution time.
All scheduled flow executions use idempotency deduplication keyed on the scheduled execution time. This means exactly one execution per cron tick will occur even when running multiple workers concurrently.
func (*Scheduler) AddTask ¶
func (s *Scheduler) AddTask(taskName string, schedule string, inputFn func(context.Context) (any, error))
AddTask registers a scheduled task execution using cron syntax. inputFn can be nil to use an empty JSON object as input, or it can provide dynamic input at execution time.
All scheduled task executions use idempotency deduplication keyed on the scheduled execution time. This means exactly one execution per cron tick will occur even when running multiple workers concurrently.
func (*Scheduler) Start ¶
Start begins executing scheduled tasks and flows. Returns an error if any schedule fails to register. The scheduler will continue until ctx is cancelled or Stop is called.
Deduplication strategy: All scheduled runs use IdempotencyKey derived from the scheduled execution time in UTC (format: "schedule:<unix_seconds>"). This ensures:
- Multiple workers/machines generate identical keys for the same cron tick
- One execution per scheduled tick, even after completion
- Retries allowed on failed runs (idempotency persists across completion)
- No clock skew or timezone issues (all times normalized to UTC)
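The reason clock skew does not matter is that the key comes from the scheduled time rather than from each worker's wall clock. The sketch below illustrates this for an every-minute schedule, using the "schedule:<unix_seconds>" format described above. `scheduleKey`, `minuteTick` and `keyAt` are hypothetical helpers, and truncating to the minute is a simplifying assumption; a real cron library computes the fire time from the cron expression.

```go
package main

import (
	"fmt"
	"time"
)

// scheduleKey formats the key from the scheduled time, normalized to UTC.
func scheduleKey(scheduled time.Time) string {
	return fmt.Sprintf("schedule:%d", scheduled.UTC().Unix())
}

// minuteTick assumes an every-minute schedule: the tick being served is the
// worker's clock reading truncated to the minute.
func minuteTick(workerClockUnix int64) time.Time {
	return time.Unix(workerClockUnix, 0).UTC().Truncate(time.Minute)
}

// keyAt returns the key a worker derives when its clock reads workerClockUnix.
func keyAt(workerClockUnix int64) string {
	return scheduleKey(minuteTick(workerClockUnix))
}

func main() {
	fmt.Println(keyAt(1704110403)) // schedule:1704110400
	fmt.Println(keyAt(1704110457)) // schedule:1704110400 (skewed clock, same tick)
	fmt.Println(keyAt(1704110460)) // schedule:1704110460 (next tick)
}
```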
type StepDependencyInfo ¶
type StepDependencyInfo struct {
Name string `json:"name"`
}
type StepHandlerInfo ¶
type StepInfo ¶
type StepInfo struct {
Name string `json:"name"`
DependsOn []StepDependencyInfo `json:"depends_on,omitempty"`
}
type StepRunInfo ¶
type StepRunInfo struct {
ID int64 `json:"id"`
StepName string `json:"step_name"`
Status string `json:"status"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
StepRunInfo represents the execution state of a single step within a flow run.
func GetFlowRunSteps ¶
func GetFlowRunSteps(ctx context.Context, conn Conn, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is a reflection-based task with an optional handler. Use NewTask().Handler(fn, opts) for tasks with handlers. Use NewTask() for definition-only tasks.
func NewTask ¶
NewTask creates a new task definition with the given name. Chain .Handler() to add a handler, otherwise returns a definition-only task.
func (*Task) Handler ¶ added in v0.0.3
func (t *Task) Handler(fn any, opts *HandlerOpts) *Task
Handler sets the task handler function and execution options. fn must have signature (context.Context, In) (Out, error). If opts is nil, defaults are used (concurrency: 1, batchSize: 10).
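Since fn is passed as any, the signature can only be checked at run time. The following sketch shows how such a check could be written with the reflect package; `checkHandler` and `goodHandler` are hypothetical names, and this is not necessarily how the package validates handlers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// checkHandler reports whether fn has the shape
// func(context.Context, In) (Out, error) for some types In and Out.
func checkHandler(fn any) error {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return errors.New("handler must be a function")
	}
	if t.NumIn() != 2 || t.In(0) != ctxType {
		return errors.New("handler must take (context.Context, In)")
	}
	if t.NumOut() != 2 || t.Out(1) != errType {
		return errors.New("handler must return (Out, error)")
	}
	return nil
}

// goodHandler is an example handler with In = string and Out = string.
func goodHandler(ctx context.Context, name string) (string, error) {
	return "hello " + name, nil
}

func main() {
	fmt.Println(checkHandler(goodHandler))                    // <nil>
	fmt.Println(checkHandler(func(n int) int { return n }))   // handler must take (context.Context, In)
	fmt.Println(checkHandler("not a function"))               // handler must be a function
}
```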
func (*Task) MarshalJSON ¶ added in v0.0.3
MarshalJSON serializes the task for JSON output.
type TaskHandlerInfo ¶
type TaskHandlerInfo struct {
TaskName string `json:"task_name"`
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes tasks and flows from the queue.
func NewWorker ¶
func NewWorker(conn Conn, opts *WorkerOpts) *Worker
NewWorker creates a new worker with the given connection and configuration. Use builder methods (AddTask, AddFlow, etc.) to configure the worker. Call Start(ctx) to begin processing tasks and flows.
func (*Worker) AddFlow ¶ added in v0.0.3
AddFlow registers a flow with the worker. If the flow has a Schedule set via .Schedule(), it will be executed according to the cron schedule.
func (*Worker) AddTask ¶ added in v0.0.3
AddTask registers a task with the worker. If the task has a Schedule set via .Schedule(), it will be executed according to the cron schedule.
func (*Worker) Start ¶
Start begins processing tasks and flows.
The worker will:
- poll for new work and execute task and flow step handlers while ctx is active
- run any configured cron-style task and flow schedules
- send periodic heartbeats while it is running
- register a built-in garbage collection task that runs every 5 minutes
Shutdown behaviour:
- when ctx is cancelled the worker immediately stops reading new work and begins shutting down
- if ShutdownTimeout is set to a value > 0, that duration is used as a grace period for in‑flight handlers after ctx is cancelled; once the grace period expires the handler context is cancelled and remaining handlers are asked to stop. The default graceful shutdown timeout is 5 seconds.
- if ShutdownTimeout is not set or set to 0, there is no grace period: the handler context is cancelled immediately once ctx is cancelled and Start returns after all goroutines finish
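The grace-period behaviour can be pictured as a second context that outlives the first by a fixed duration. The sketch below is an illustration under assumptions, not the worker's actual code: `handlerContext` and `demo` are hypothetical names, and the handler context is derived from context.Background(), so values stored on the parent are not carried over.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// handlerContext returns a context for in-flight handlers. With grace > 0 it
// stays alive for that long after parent is cancelled; with grace <= 0 it is
// cancelled together with parent.
func handlerContext(parent context.Context, grace time.Duration) (context.Context, context.CancelFunc) {
	if grace <= 0 {
		return context.WithCancel(parent)
	}
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-parent.Done():
			// Shutdown requested: give handlers the grace period.
			select {
			case <-time.After(grace):
				cancel()
			case <-ctx.Done():
			}
		case <-ctx.Done():
			// Cancelled directly by the caller; nothing left to watch.
		}
	}()
	return ctx, cancel
}

// demo cancels the parent and observes the handler context.
func demo() (aliveDuringGrace, cancelledAfterGrace bool) {
	parent, stop := context.WithCancel(context.Background())
	hctx, cancel := handlerContext(parent, 50*time.Millisecond)
	defer cancel()

	stop() // simulate the worker's ctx being cancelled
	aliveDuringGrace = hctx.Err() == nil

	select {
	case <-hctx.Done():
		cancelledAfterGrace = true
	case <-time.After(2 * time.Second):
	}
	return
}

func main() {
	fmt.Println(demo()) // prints: true true
}
```

Handlers that honour their context therefore get a bounded window to finish, while handlers that ignore it can still delay the return of Start.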
type WorkerInfo ¶
type WorkerInfo struct {
ID string `json:"id"`
TaskHandlers []*TaskHandlerInfo `json:"task_handlers"`
StepHandlers []*StepHandlerInfo `json:"step_handlers"`
StartedAt time.Time `json:"started_at"`
LastHeartbeatAt time.Time `json:"last_heartbeat_at"`
}
func ListWorkers ¶
func ListWorkers(ctx context.Context, conn Conn) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.