Documentation
¶
Overview ¶
Package catbird provides a PostgreSQL-based message queue with a task and workflow execution engine.
Scheduling for Distributed Environments ¶
Catbird includes built-in support for scheduled task and flow execution using cron syntax. When multiple workers run concurrently (on different machines or processes), Catbird guarantees that each scheduled execution runs **exactly once per cron tick**, even across clock skew and timezone differences.
This is achieved through:
- UTC-normalized cron scheduling: all workers use UTC, eliminating timezone confusion
- Idempotency key deduplication: each cron tick generates a deterministic key (format: "schedule:{unix_nanos_utc}") that persists across completion
- PostgreSQL as the single source of truth: the database enforces the unique constraint on idempotency keys, so duplicates are rejected atomically
See scheduler.go for implementation details and SCHEDULING_ADVANCED.md for the roadmap toward a future DB-driven scheduling system with even stronger guarantees.
Index ¶
- Constants
- Variables
- func Bind(ctx context.Context, conn Conn, queueName string, pattern string) error
- func CreateFlow(ctx context.Context, conn Conn, flows ...*Flow) error
- func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, opts *ScheduleOpts) error
- func CreateQueue(ctx context.Context, conn Conn, queueName string, opts *QueueOpts) error
- func CreateTask(ctx context.Context, conn Conn, tasks ...*Task) error
- func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, opts *ScheduleOpts) error
- func Delete(ctx context.Context, conn Conn, queueName string, id int64) (bool, error)
- func DeleteMany(ctx context.Context, conn Conn, queueName string, ids []int64) error
- func DeleteQueue(ctx context.Context, conn Conn, queueName string) (bool, error)
- func GC(ctx context.Context, conn Conn) error
- func Hide(ctx context.Context, conn Conn, queueName string, id int64, ...) (bool, error)
- func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, ...) error
- func MigrateDownTo(ctx context.Context, db *sql.DB, version int) error
- func MigrateUpTo(ctx context.Context, db *sql.DB, version int) error
- func Publish(ctx context.Context, conn Conn, topic string, payload any, opts *PublishOpts) error
- func PublishQuery(topic string, payload any, opts *PublishOpts) (string, []any, error)
- func RunFlowQuery(flowName string, input any, opts *RunFlowOpts) (string, []any, error)
- func RunTaskQuery(taskName string, input any, opts *RunOpts) (string, []any, error)
- func Send(ctx context.Context, conn Conn, queueName string, payload any, opts *SendOpts) error
- func SendQuery(queueName string, payload any, opts *SendOpts) (string, []any, error)
- func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, ...) error
- func Unbind(ctx context.Context, conn Conn, queueName string, pattern string) error
- type BackoffStrategy
- type CircuitBreaker
- type CircuitBreakerStrategy
- type Client
- func (c *Client) Bind(ctx context.Context, queueName string, pattern string) error
- func (c *Client) CreateFlow(ctx context.Context, flows ...*Flow) error
- func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts *ScheduleOpts) error
- func (c *Client) CreateQueue(ctx context.Context, queueName string, opts *QueueOpts) error
- func (c *Client) CreateTask(ctx context.Context, tasks ...*Task) error
- func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts *ScheduleOpts) error
- func (c *Client) Delete(ctx context.Context, queueName string, id int64) (bool, error)
- func (c *Client) DeleteMany(ctx context.Context, queueName string, ids []int64) error
- func (c *Client) DeleteQueue(ctx context.Context, queueName string) (bool, error)
- func (c *Client) GetFlow(ctx context.Context, flowName string) (*FlowInfo, error)
- func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*RunInfo, error)
- func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
- func (c *Client) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)
- func (c *Client) GetTask(ctx context.Context, taskName string) (*TaskInfo, error)
- func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*RunInfo, error)
- func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)
- func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) error
- func (c *Client) ListFlowRuns(ctx context.Context, flowName string) ([]*RunInfo, error)
- func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)
- func (c *Client) ListFlows(ctx context.Context) ([]*FlowInfo, error)
- func (c *Client) ListQueues(ctx context.Context) ([]*QueueInfo, error)
- func (c *Client) ListTaskRuns(ctx context.Context, taskName string) ([]*RunInfo, error)
- func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)
- func (c *Client) ListTasks(ctx context.Context) ([]*TaskInfo, error)
- func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
- func (c *Client) NewWorker(ctx context.Context, opts *WorkerOpts) *Worker
- func (c *Client) Publish(ctx context.Context, topic string, payload any, opts *PublishOpts) error
- func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
- func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, ...) ([]Message, error)
- func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts *RunFlowOpts) (*RunHandle, error)
- func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts *RunOpts) (*RunHandle, error)
- func (c *Client) Send(ctx context.Context, queueName string, payload any, opts *SendOpts) error
- func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, ...) error
- func (c *Client) Unbind(ctx context.Context, queueName string, pattern string) error
- type Conn
- type Flow
- type FlowInfo
- type FlowScheduleInfo
- type FullJitterBackoff
- type HandlerOpts
- type Message
- type Optional
- type PublishOpts
- type QueueInfo
- type QueueOpts
- type RunFlowOpts
- type RunHandle
- type RunInfo
- func GetFlowRun(ctx context.Context, conn Conn, flowName string, flowRunID int64) (*RunInfo, error)
- func GetTaskRun(ctx context.Context, conn Conn, taskName string, taskRunID int64) (*RunInfo, error)
- func ListFlowRuns(ctx context.Context, conn Conn, flowName string) ([]*RunInfo, error)
- func ListTaskRuns(ctx context.Context, conn Conn, taskName string) ([]*RunInfo, error)
- type RunOpts
- type ScheduleOpts
- type SendOpts
- type Step
- type StepDependencyInfo
- type StepHandlerInfo
- type StepInfo
- type StepRunInfo
- type Task
- type TaskHandlerInfo
- type TaskInfo
- type TaskScheduleInfo
- type Worker
- type WorkerInfo
- type WorkerOpts
Constants ¶
const ( StatusCreated = "created" StatusStarted = "started" StatusCompleted = "completed" StatusFailed = "failed" StatusSkipped = "skipped" // Step skipped due to condition )
Status constants for task and flow runs.
const SchemaVersion = 13
Variables ¶
var ( // ErrRunFailed is returned when you try to unmarshal the output of a failed task or flow run ErrRunFailed = fmt.Errorf("run failed") )
Functions ¶
func Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"
func CreateFlow ¶
CreateFlow creates one or more flow definitions.
func CreateFlowSchedule ¶ added in v0.0.5
func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, opts *ScheduleOpts) error
CreateFlowSchedule creates a cron-based schedule for a flow.
Parameters:
- ctx: context for database operations
- conn: database connection (can be *pgxpool.Pool, *pgx.Conn, or pgx.Tx)
- flowName: name of the flow to schedule (must already exist via CreateFlow)
- cronSpec: cron schedule in 5-field format (min hour day month dow) or a descriptor. Descriptors: @hourly, @daily, @midnight, @weekly, @monthly, @yearly. Examples: "0 * * * *" (hourly), "0 0 * * *" (daily at midnight).
- opts: optional ScheduleOpts configuring the schedule (Input field for static input). If opts is nil or opts.Input is nil, an empty JSON object {} is used.
The schedule will execute the flow with the provided static input at each cron tick. Returns an error if the flow does not exist or if the cron spec is invalid.
func CreateQueue ¶
CreateQueue creates a queue with the given name and optional options. Use Bind() separately to create topic bindings.
func CreateTask ¶
CreateTask creates one or more task definitions.
func CreateTaskSchedule ¶ added in v0.0.5
func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, opts *ScheduleOpts) error
CreateTaskSchedule creates a cron-based schedule for a task.
Parameters:
- ctx: context for database operations
- conn: database connection (can be *pgxpool.Pool, *pgx.Conn, or pgx.Tx)
- taskName: name of the task to schedule (must already exist via CreateTask)
- cronSpec: cron schedule in 5-field format (min hour day month dow) or a descriptor. Descriptors: @hourly, @daily, @midnight, @weekly, @monthly, @yearly. Examples: "0 * * * *" (hourly), "0 0 * * *" (daily at midnight).
- opts: optional ScheduleOpts configuring the schedule (Input field for static input). If opts is nil or opts.Input is nil, an empty JSON object {} is used.
The schedule will execute the task with the provided static input at each cron tick. Returns an error if the task does not exist or if the cron spec is invalid.
func DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func GC ¶
GC runs garbage collection to clean up expired queues and stale workers. Note: Worker heartbeats automatically perform cleanup, so this is mainly useful for deployments without workers or for manual control.
func Hide ¶
func Hide(ctx context.Context, conn Conn, queueName string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func HideMany ¶
func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, hideFor time.Duration) error
HideMany hides multiple messages from being read for the specified duration.
func Publish ¶ added in v0.0.3
Publish sends a message to topic-subscribed queues with options. Pass nil opts to use defaults.
func PublishQuery ¶ added in v0.0.4
PublishQuery builds the SQL query and args for a Publish operation. Pass nil opts to use defaults.
func RunFlowQuery ¶ added in v0.0.4
RunFlowQuery builds the SQL query and args for a RunFlow operation. Pass nil opts to use defaults.
func RunTaskQuery ¶ added in v0.0.4
RunTaskQuery builds the SQL query and args for a RunTask operation. Pass nil opts to use defaults.
func SendQuery ¶ added in v0.0.4
SendQuery builds the SQL query and args for a Send operation. Pass nil opts to use defaults.
func SignalFlow ¶
func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal, NewStepWithSignalAndDependency). Signals enable human-in-the-loop workflows where a step waits for external input before executing. Returns an error if the signal was already delivered or the step doesn't require a signal.
Types ¶
type BackoffStrategy ¶ added in v0.0.3
type BackoffStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// NextDelay returns a delay for a zero-based delivery count (first retry = 0).
// Implementations should always return a positive duration.
NextDelay(deliveryCount int) time.Duration
}
BackoffStrategy defines how retry delays are calculated based on delivery count. Implementations must be safe for concurrent use.
type CircuitBreaker ¶ added in v0.0.3
type CircuitBreaker struct {
// contains filtered or unexported fields
}
func NewCircuitBreaker ¶ added in v0.0.3
func NewCircuitBreaker(failureThreshold int, openTimeout time.Duration) *CircuitBreaker
func (*CircuitBreaker) RecordFailure ¶ added in v0.0.3
func (c *CircuitBreaker) RecordFailure(now time.Time)
func (*CircuitBreaker) RecordSuccess ¶ added in v0.0.3
func (c *CircuitBreaker) RecordSuccess()
func (*CircuitBreaker) Validate ¶ added in v0.0.3
func (c *CircuitBreaker) Validate() error
type CircuitBreakerStrategy ¶ added in v0.0.3
type CircuitBreakerStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// Allow returns whether a call is permitted and how long to wait if not.
Allow(now time.Time) (bool, time.Duration)
// RecordSuccess updates breaker state after a successful call.
RecordSuccess()
// RecordFailure updates breaker state after a failed call.
RecordFailure(now time.Time)
}
CircuitBreakerStrategy defines the interface for circuit breaker behavior. Implementations must be safe for concurrent use.
type Client ¶
type Client struct {
Conn Conn
}
Client is a facade for interacting with Catbird.
func New ¶
New creates a new Client with the given database connection.
The connection can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.
func (*Client) Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"
func (*Client) CreateFlow ¶
CreateFlow creates one or more flow definitions.
func (*Client) CreateFlowSchedule ¶ added in v0.0.5
func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts *ScheduleOpts) error
CreateFlowSchedule creates a cron-based schedule for a flow. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts is optional ScheduleOpts configuring the schedule (Input field for static input).
func (*Client) CreateQueue ¶
CreateQueue creates a queue with the given name and optional options.
func (*Client) CreateTask ¶
CreateTask creates one or more task definitions.
func (*Client) CreateTaskSchedule ¶ added in v0.0.5
func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts *ScheduleOpts) error
CreateTaskSchedule creates a cron-based schedule for a task. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts is optional ScheduleOpts configuring the schedule (Input field for static input).
func (*Client) Delete ¶
Delete deletes a single message from the queue. Returns true if the message existed.
func (*Client) DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func (*Client) DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func (*Client) GetFlowRun ¶
func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*RunInfo, error)
GetFlowRun retrieves a specific flow run result by ID.
func (*Client) GetFlowRunSteps ¶
func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
func (*Client) GetTaskRun ¶
func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*RunInfo, error)
GetTaskRun retrieves a specific task run result by ID.
func (*Client) Hide ¶
func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func (*Client) HideMany ¶
func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) error
HideMany hides multiple messages from being read for the specified duration.
func (*Client) ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func (*Client) ListFlowSchedules ¶ added in v0.0.5
func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)
ListFlowSchedules returns all flow schedules ordered by next_run_at.
func (*Client) ListQueues ¶
ListQueues returns all queues.
func (*Client) ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
func (*Client) ListTaskSchedules ¶ added in v0.0.5
func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)
ListTaskSchedules returns all task schedules ordered by next_run_at.
func (*Client) ListWorkers ¶
func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.
func (*Client) NewWorker ¶
func (c *Client) NewWorker(ctx context.Context, opts *WorkerOpts) *Worker
NewWorker creates a new worker that processes task and flow executions. Use the builder pattern methods (AddTask, AddFlow, etc.) to configure, then call Start(ctx) to begin processing.
func (*Client) Publish ¶ added in v0.0.3
Publish sends a message to all queues subscribed to the specified topic.
func (*Client) Read ¶
func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func (*Client) ReadPoll ¶
func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor, pollFor, pollInterval time.Duration) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.
func (*Client) RunFlow ¶
func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts *RunFlowOpts) (*RunHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func (*Client) RunTask ¶
func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts *RunOpts) (*RunHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
func (*Client) SignalFlow ¶
func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal). Returns an error if the signal was already delivered or the step doesn't require a signal.
type Conn ¶
type Conn interface {
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
Query(context.Context, string, ...any) (pgx.Rows, error)
QueryRow(context.Context, string, ...any) pgx.Row
}
Conn is an interface for database connections; it is satisfied by *pgxpool.Pool, *pgx.Conn, and pgx.Tx.
type FlowInfo ¶
type FlowInfo struct {
Name string `json:"name"`
Steps []StepInfo `json:"steps"`
CreatedAt time.Time `json:"created_at"`
}
type FlowScheduleInfo ¶ added in v0.0.5
type FlowScheduleInfo struct {
FlowName string `json:"flow_name"`
CronSpec string `json:"cron_spec"`
NextRunAt time.Time `json:"next_run_at"`
LastRunAt *time.Time `json:"last_run_at,omitempty"`
LastEnqueuedAt *time.Time `json:"last_enqueued_at,omitempty"`
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
FlowScheduleInfo contains metadata about a scheduled flow.
func ListFlowSchedules ¶ added in v0.0.5
func ListFlowSchedules(ctx context.Context, conn Conn) ([]*FlowScheduleInfo, error)
ListFlowSchedules returns all flow schedules ordered by next_run_at.
type FullJitterBackoff ¶ added in v0.0.3
FullJitterBackoff implements exponential backoff with full jitter.
func NewFullJitterBackoff ¶ added in v0.0.3
func NewFullJitterBackoff(minDelay, maxDelay time.Duration) *FullJitterBackoff
NewFullJitterBackoff creates a FullJitterBackoff with the provided bounds.
func (*FullJitterBackoff) NextDelay ¶ added in v0.0.3
func (b *FullJitterBackoff) NextDelay(deliveryCount int) time.Duration
NextDelay returns the jittered delay for the given delivery count. deliveryCount is expected to be zero-based for the first retry.
func (*FullJitterBackoff) Validate ¶ added in v0.0.3
func (b *FullJitterBackoff) Validate() error
Validate checks the backoff configuration for consistency.
type HandlerOpts ¶ added in v0.0.3
type HandlerOpts struct {
Concurrency int
BatchSize int
Timeout time.Duration
MaxRetries int
Backoff BackoffStrategy
CircuitBreaker CircuitBreakerStrategy
}
type Message ¶
type Message struct {
ID int64 `json:"id"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Topic string `json:"topic"`
Payload json.RawMessage `json:"payload"`
Deliveries int `json:"deliveries"`
CreatedAt time.Time `json:"created_at"`
DeliverAt time.Time `json:"deliver_at"`
}
Message represents a message in a queue.
func Read ¶
func Read(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func ReadPoll ¶
func ReadPoll(ctx context.Context, conn Conn, queueName string, quantity int, hideFor, pollFor, pollInterval time.Duration) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.
type PublishOpts ¶ added in v0.0.3
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Unlogged bool `json:"unlogged"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at,omitzero"`
}
type RunFlowOpts ¶
type RunFlowOpts = RunOpts
type RunHandle ¶
RunHandle is a handle to a task or flow execution.
func RunFlow ¶
func RunFlow(ctx context.Context, conn Conn, flowName string, input any, opts *RunFlowOpts) (*RunHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
type RunInfo ¶
type RunInfo struct {
ID int64 `json:"id"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Status string `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
RunInfo represents the details of a task or flow execution.
func GetFlowRun ¶
GetFlowRun retrieves a specific flow run result by ID.
func GetTaskRun ¶
GetTaskRun retrieves a specific task run result by ID.
func ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
type ScheduleOpts ¶ added in v0.0.5
type ScheduleOpts struct {
// Input is the static input value for each scheduled execution.
// It will be marshaled to JSON and stored in the database.
// If nil, an empty object {} is used.
Input any
}
ScheduleOpts configures scheduled task/flow behavior.
type StepDependencyInfo ¶
type StepDependencyInfo struct {
Name string `json:"name"`
}
type StepHandlerInfo ¶
type StepInfo ¶
type StepInfo struct {
Name string `json:"name"`
DependsOn []StepDependencyInfo `json:"depends_on,omitempty"`
}
type StepRunInfo ¶
type StepRunInfo struct {
ID int64 `json:"id"`
StepName string `json:"step_name"`
Status string `json:"status"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
StepRunInfo represents the execution state of a single step within a flow run.
func GetFlowRunSteps ¶
func GetFlowRunSteps(ctx context.Context, conn Conn, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is a reflection-based task with an optional handler. Use NewTask().Handler(fn, opts) for tasks with handlers. Use NewTask() for definition-only tasks.
func NewTask ¶
NewTask creates a new task definition with the given name. Chain .Handler() to add a handler, otherwise returns a definition-only task.
func (*Task) Handler ¶ added in v0.0.3
func (t *Task) Handler(fn any, opts *HandlerOpts) *Task
Handler sets the task handler function and execution options. fn must have signature (context.Context, In) (Out, error). If opts is nil, defaults are used (concurrency: 1, batchSize: 10).
func (*Task) MarshalJSON ¶ added in v0.0.3
MarshalJSON serializes the task for JSON output.
type TaskHandlerInfo ¶
type TaskHandlerInfo struct {
TaskName string `json:"task_name"`
}
type TaskScheduleInfo ¶ added in v0.0.5
type TaskScheduleInfo struct {
TaskName string `json:"task_name"`
CronSpec string `json:"cron_spec"`
NextRunAt time.Time `json:"next_run_at"`
LastRunAt *time.Time `json:"last_run_at,omitempty"`
LastEnqueuedAt *time.Time `json:"last_enqueued_at,omitempty"`
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
TaskScheduleInfo contains metadata about a scheduled task.
func ListTaskSchedules ¶ added in v0.0.5
func ListTaskSchedules(ctx context.Context, conn Conn) ([]*TaskScheduleInfo, error)
ListTaskSchedules returns all task schedules ordered by next_run_at.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes tasks and flows from the queue.
func NewWorker ¶
func NewWorker(conn Conn, opts *WorkerOpts) *Worker
NewWorker creates a new worker with the given connection and configuration. Use builder methods (AddTask, AddFlow, etc.) to configure the worker. Call Start(ctx) to begin processing tasks and flows.
func (*Worker) Start ¶
Start begins processing tasks and flows.
The worker will:
- poll for new work and execute task and flow step handlers while ctx is active
- run any configured cron-style task and flow schedules
- send periodic heartbeats while it is running
- register a built-in garbage collection task that runs every 5 minutes
Shutdown behaviour:
- when ctx is cancelled the worker immediately stops reading new work and begins shutting down
- if ShutdownTimeout is set to a value > 0, that duration is used as a grace period for in‑flight handlers after ctx is cancelled; once the grace period expires the handler context is cancelled and remaining handlers are asked to stop. The default graceful shutdown timeout is 5 seconds.
- if ShutdownTimeout is not set or set to 0, there is no grace period: the handler context is cancelled immediately once ctx is cancelled and Start returns after all goroutines finish
type WorkerInfo ¶
type WorkerInfo struct {
ID string `json:"id"`
TaskHandlers []*TaskHandlerInfo `json:"task_handlers"`
StepHandlers []*StepHandlerInfo `json:"step_handlers"`
StartedAt time.Time `json:"started_at"`
LastHeartbeatAt time.Time `json:"last_heartbeat_at"`
}
func ListWorkers ¶
func ListWorkers(ctx context.Context, conn Conn) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.