Documentation
¶
Overview ¶
Package catbird provides a PostgreSQL-based message queue with a task and workflow execution engine.
Scheduling for Distributed Environments ¶
Catbird includes built-in support for scheduled task and flow execution using cron syntax. When multiple workers run concurrently (on different machines or processes), Catbird guarantees that each scheduled execution runs **exactly once per cron tick**, even across clock skew and timezone differences.
This is achieved through:
- UTC-normalized cron scheduling: all workers use UTC, eliminating timezone confusion
- Idempotency key deduplication: each cron tick generates a deterministic key (format: "schedule:{unix_nanos_utc}") that remains in place after the run completes
- PostgreSQL as the single source of truth: the database enforces the unique constraint on idempotency keys, atomically preventing duplicates
See scheduler.go for implementation details and SCHEDULING_ADVANCED.md for the roadmap toward a future DB-driven scheduling system with even stronger guarantees.
Index ¶
- Constants
- Variables
- func Bind(ctx context.Context, conn Conn, queueName string, pattern string) error
- func CreateFlow(ctx context.Context, conn Conn, flows ...*Flow) error
- func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, ...) error
- func CreateQueue(ctx context.Context, conn Conn, queueName string, opts ...QueueOpts) error
- func CreateTask(ctx context.Context, conn Conn, tasks ...*Task) error
- func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, ...) error
- func Delete(ctx context.Context, conn Conn, queueName string, id int64) (bool, error)
- func DeleteMany(ctx context.Context, conn Conn, queueName string, ids []int64) error
- func DeleteQueue(ctx context.Context, conn Conn, queueName string) (bool, error)
- func GC(ctx context.Context, conn Conn) error
- func Hide(ctx context.Context, conn Conn, queueName string, id int64, ...) (bool, error)
- func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, ...) error
- func MigrateDownTo(ctx context.Context, db *sql.DB, version int) error
- func MigrateUpTo(ctx context.Context, db *sql.DB, version int) error
- func Publish(ctx context.Context, conn Conn, topic string, payload any, opts ...PublishOpts) error
- func PublishQuery(topic string, payload any, opts ...PublishOpts) (string, []any, error)
- func RunFlowQuery(flowName string, input any, opts ...RunFlowOpts) (string, []any, error)
- func RunTaskQuery(taskName string, input any, opts ...RunTaskOpts) (string, []any, error)
- func Send(ctx context.Context, conn Conn, queueName string, payload any, ...) error
- func SendQuery(queueName string, payload any, opts ...SendOpts) (string, []any, error)
- func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, ...) error
- func Unbind(ctx context.Context, conn Conn, queueName string, pattern string) error
- type BackoffStrategy
- type CircuitBreaker
- type CircuitBreakerStrategy
- type Client
- func (c *Client) Bind(ctx context.Context, queueName string, pattern string) error
- func (c *Client) CreateFlow(ctx context.Context, flows ...*Flow) error
- func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts ...ScheduleOpts) error
- func (c *Client) CreateQueue(ctx context.Context, queueName string, opts ...QueueOpts) error
- func (c *Client) CreateTask(ctx context.Context, tasks ...*Task) error
- func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts ...ScheduleOpts) error
- func (c *Client) Delete(ctx context.Context, queueName string, id int64) (bool, error)
- func (c *Client) DeleteMany(ctx context.Context, queueName string, ids []int64) error
- func (c *Client) DeleteQueue(ctx context.Context, queueName string) (bool, error)
- func (c *Client) GetFlow(ctx context.Context, flowName string) (*FlowInfo, error)
- func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*FlowRunInfo, error)
- func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
- func (c *Client) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)
- func (c *Client) GetTask(ctx context.Context, taskName string) (*TaskInfo, error)
- func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*TaskRunInfo, error)
- func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)
- func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) error
- func (c *Client) ListFlowRuns(ctx context.Context, flowName string) ([]*FlowRunInfo, error)
- func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)
- func (c *Client) ListFlows(ctx context.Context) ([]*FlowInfo, error)
- func (c *Client) ListQueues(ctx context.Context) ([]*QueueInfo, error)
- func (c *Client) ListTaskRuns(ctx context.Context, taskName string) ([]*TaskRunInfo, error)
- func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)
- func (c *Client) ListTasks(ctx context.Context) ([]*TaskInfo, error)
- func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
- func (c *Client) NewWorker(ctx context.Context, opts ...WorkerOpts) *Worker
- func (c *Client) Publish(ctx context.Context, topic string, payload any, opts ...PublishOpts) error
- func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
- func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor time.Duration, ...) ([]Message, error)
- func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)
- func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)
- func (c *Client) Send(ctx context.Context, queueName string, payload any, opts ...SendOpts) error
- func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, ...) error
- func (c *Client) Unbind(ctx context.Context, queueName string, pattern string) error
- type Conn
- type Flow
- type FlowHandle
- type FlowInfo
- type FlowRunInfo
- type FlowScheduleInfo
- type FullJitterBackoff
- type HandlerOpts
- type Message
- type Optional
- type PublishOpts
- type QueueInfo
- type QueueOpts
- type ReadPollOpts
- type RunFlowOpts
- type RunTaskOpts
- type ScheduleOpts
- type SendOpts
- type Step
- type StepDependencyInfo
- type StepHandlerInfo
- type StepInfo
- type StepRunInfo
- type Task
- type TaskHandle
- type TaskHandlerInfo
- type TaskInfo
- type TaskRunInfo
- type TaskScheduleInfo
- type WaitOpts
- type Worker
- type WorkerInfo
- type WorkerOpts
Constants ¶
const ( StatusCreated = "created" StatusStarted = "started" StatusCompleted = "completed" StatusFailed = "failed" StatusSkipped = "skipped" // Step skipped due to condition )
Status constants for task and flow runs.
const SchemaVersion = 13
Variables ¶
var ( // ErrRunFailed is returned when you try to unmarshal the output of a failed task or flow run ErrRunFailed = fmt.Errorf("run failed") )
Functions ¶
func Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"
func CreateFlow ¶
CreateFlow creates one or more flow definitions.
func CreateFlowSchedule ¶ added in v0.0.5
func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, opts ...ScheduleOpts) error
CreateFlowSchedule creates a cron-based schedule for a flow.
func CreateQueue ¶
CreateQueue creates a queue with the given name. Pass no opts to use defaults. Use Bind() separately to create topic bindings.
func CreateTask ¶
CreateTask creates one or more task definitions.
func CreateTaskSchedule ¶ added in v0.0.5
func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, opts ...ScheduleOpts) error
CreateTaskSchedule creates a cron-based schedule for a task.
func DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func GC ¶
GC runs garbage collection to clean up expired queues and stale workers. Note: Worker heartbeats automatically perform cleanup, so this is mainly useful for deployments without workers or for manual control.
func Hide ¶
func Hide(ctx context.Context, conn Conn, queueName string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func HideMany ¶
func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, hideFor time.Duration) error
HideMany hides multiple messages from being read for the specified duration.
func Publish ¶ added in v0.0.3
Publish sends a message to topic-subscribed queues with options. Pass no opts to use defaults.
func PublishQuery ¶ added in v0.0.4
PublishQuery builds the SQL query and args for a Publish operation. Pass no opts to use defaults.
func RunFlowQuery ¶ added in v0.0.4
RunFlowQuery builds the SQL query and args for a RunFlow operation. Pass no opts to use defaults.
func RunTaskQuery ¶ added in v0.0.4
RunTaskQuery builds the SQL query and args for a RunTask operation. Pass no opts to use defaults.
func SendQuery ¶ added in v0.0.4
SendQuery builds the SQL query and args for a Send operation. Pass no opts to use defaults.
func SignalFlow ¶
func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal, NewStepWithSignalAndDependency). Signals enable human-in-the-loop workflows where a step waits for external input before executing. Returns an error if the signal was already delivered or the step doesn't require a signal.
Types ¶
type BackoffStrategy ¶ added in v0.0.3
type BackoffStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// NextDelay returns a delay for a zero-based delivery count (first retry = 0).
// Implementations should always return a positive duration.
NextDelay(deliveryCount int) time.Duration
}
BackoffStrategy defines how retry delays are calculated based on delivery count. Implementations must be safe for concurrent use.
type CircuitBreaker ¶ added in v0.0.3
type CircuitBreaker struct {
// contains filtered or unexported fields
}
func NewCircuitBreaker ¶ added in v0.0.3
func NewCircuitBreaker(failureThreshold int, openTimeout time.Duration) *CircuitBreaker
func (*CircuitBreaker) RecordFailure ¶ added in v0.0.3
func (c *CircuitBreaker) RecordFailure(now time.Time)
func (*CircuitBreaker) RecordSuccess ¶ added in v0.0.3
func (c *CircuitBreaker) RecordSuccess()
func (*CircuitBreaker) Validate ¶ added in v0.0.3
func (c *CircuitBreaker) Validate() error
type CircuitBreakerStrategy ¶ added in v0.0.3
type CircuitBreakerStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// Allow returns whether a call is permitted and how long to wait if not.
Allow(now time.Time) (bool, time.Duration)
// RecordSuccess updates breaker state after a successful call.
RecordSuccess()
// RecordFailure updates breaker state after a failed call.
RecordFailure(now time.Time)
}
CircuitBreakerStrategy defines the interface for circuit breaker behavior. Implementations must be safe for concurrent use.
type Client ¶
type Client struct {
Conn Conn
}
Client is a facade for interacting with Catbird.
func New ¶
New creates a new Client with the given database connection.
The connection can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.
func (*Client) Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"
func (*Client) CreateFlow ¶
CreateFlow creates one or more flow definitions.
func (*Client) CreateFlowSchedule ¶ added in v0.0.5
func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts ...ScheduleOpts) error
CreateFlowSchedule creates a cron-based schedule for a flow. cronSpec should be in 5-field format (min hour day month dow) or a descriptor such as @hourly or @daily. opts is an optional ScheduleOpts value that configures the schedule; its Input field supplies a static input.
func (*Client) CreateQueue ¶
CreateQueue creates a queue with the given name. Pass no opts to use defaults.
func (*Client) CreateTask ¶
CreateTask creates one or more task definitions.
func (*Client) CreateTaskSchedule ¶ added in v0.0.5
func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts ...ScheduleOpts) error
CreateTaskSchedule creates a cron-based schedule for a task. cronSpec should be in 5-field format (min hour day month dow) or a descriptor such as @hourly or @daily. opts is an optional ScheduleOpts value that configures the schedule; its Input field supplies a static input.
func (*Client) Delete ¶
Delete deletes a single message from the queue. Returns true if the message existed.
func (*Client) DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func (*Client) DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func (*Client) GetFlowRun ¶
func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*FlowRunInfo, error)
GetFlowRun retrieves a specific flow run result by ID.
func (*Client) GetFlowRunSteps ¶
func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
func (*Client) GetTaskRun ¶
func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*TaskRunInfo, error)
GetTaskRun retrieves a specific task run result by ID.
func (*Client) Hide ¶
func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func (*Client) HideMany ¶
func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) error
HideMany hides multiple messages from being read for the specified duration.
func (*Client) ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func (*Client) ListFlowSchedules ¶ added in v0.0.5
func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)
ListFlowSchedules returns all flow schedules ordered by next_run_at.
func (*Client) ListQueues ¶
ListQueues returns all queues.
func (*Client) ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
func (*Client) ListTaskSchedules ¶ added in v0.0.5
func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)
ListTaskSchedules returns all task schedules ordered by next_run_at.
func (*Client) ListWorkers ¶
func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.
func (*Client) NewWorker ¶
func (c *Client) NewWorker(ctx context.Context, opts ...WorkerOpts) *Worker
NewWorker creates a new worker that processes task and flow executions. Use the builder pattern methods (AddTask, AddFlow, etc.) to configure, then call Start(ctx) to begin processing.
func (*Client) Publish ¶ added in v0.0.3
Publish sends a message to all queues subscribed to the specified topic.
func (*Client) Read ¶
func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func (*Client) ReadPoll ¶
func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.
func (*Client) RunFlow ¶
func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func (*Client) RunTask ¶
func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
func (*Client) SignalFlow ¶
func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal). Returns an error if the signal was already delivered or the step doesn't require a signal.
type Conn ¶
type Conn interface {
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
Query(context.Context, string, ...any) (pgx.Rows, error)
QueryRow(context.Context, string, ...any) pgx.Row
}
Conn is an interface for database connections. It is satisfied by *pgx.Conn, *pgxpool.Pool, and pgx.Tx.
type FlowHandle ¶ added in v0.0.6
FlowHandle is a handle to a flow execution.
func RunFlow ¶
func RunFlow(ctx context.Context, conn Conn, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func (*FlowHandle) WaitForOutput ¶ added in v0.0.6
WaitForOutput blocks until the flow execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.
type FlowInfo ¶
type FlowInfo struct {
Name string `json:"name"`
Steps []StepInfo `json:"steps"`
CreatedAt time.Time `json:"created_at"`
}
type FlowRunInfo ¶ added in v0.0.6
type FlowRunInfo struct {
ID int64 `json:"id"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Status string `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
}
FlowRunInfo represents the details of a flow execution.
func GetFlowRun ¶
func GetFlowRun(ctx context.Context, conn Conn, flowName string, flowRunID int64) (*FlowRunInfo, error)
GetFlowRun retrieves a specific flow run result by ID.
func ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func (*FlowRunInfo) OutputAs ¶ added in v0.0.6
func (r *FlowRunInfo) OutputAs(out any) error
OutputAs unmarshals the output of a completed flow run. Returns an error if the flow run has failed or is not completed yet.
type FlowScheduleInfo ¶ added in v0.0.5
type FlowScheduleInfo struct {
FlowName string `json:"flow_name"`
CronSpec string `json:"cron_spec"`
NextRunAt time.Time `json:"next_run_at"`
LastRunAt time.Time `json:"last_run_at,omitzero"`
LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
FlowScheduleInfo contains metadata about a scheduled flow.
func ListFlowSchedules ¶ added in v0.0.5
func ListFlowSchedules(ctx context.Context, conn Conn) ([]*FlowScheduleInfo, error)
ListFlowSchedules returns all flow schedules ordered by next_run_at.
type FullJitterBackoff ¶ added in v0.0.3
FullJitterBackoff implements exponential backoff with full jitter.
func NewFullJitterBackoff ¶ added in v0.0.3
func NewFullJitterBackoff(minDelay, maxDelay time.Duration) *FullJitterBackoff
NewFullJitterBackoff creates a FullJitterBackoff with the provided bounds.
func (*FullJitterBackoff) NextDelay ¶ added in v0.0.3
func (b *FullJitterBackoff) NextDelay(deliveryCount int) time.Duration
NextDelay returns the jittered delay for the given delivery count. deliveryCount is expected to be zero-based for the first retry.
func (*FullJitterBackoff) Validate ¶ added in v0.0.3
func (b *FullJitterBackoff) Validate() error
Validate checks the backoff configuration for consistency.
type HandlerOpts ¶ added in v0.0.3
type HandlerOpts struct {
Concurrency int
BatchSize int
Timeout time.Duration
MaxRetries int
Backoff BackoffStrategy
CircuitBreaker CircuitBreakerStrategy
}
type Message ¶
type Message struct {
ID int64 `json:"id"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Topic string `json:"topic"`
Payload json.RawMessage `json:"payload"`
Deliveries int `json:"deliveries"`
CreatedAt time.Time `json:"created_at"`
VisibleAt time.Time `json:"visible_at"`
}
Message represents a message in a queue.
func Read ¶
func Read(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func ReadPoll ¶
func ReadPoll(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached. Pass optional ReadPollOpts to configure polling behavior; defaults are used when omitted.
type PublishOpts ¶ added in v0.0.3
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Unlogged bool `json:"unlogged"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at,omitzero"`
}
type ReadPollOpts ¶ added in v0.0.6
ReadPollOpts configures ReadPoll polling behavior. Zero values use defaults.
type RunFlowOpts ¶
type RunTaskOpts ¶ added in v0.0.6
type ScheduleOpts ¶ added in v0.0.5
type ScheduleOpts struct {
Input any
}
ScheduleOpts configures scheduled task/flow behavior.
type StepDependencyInfo ¶
type StepDependencyInfo struct {
Name string `json:"name"`
}
type StepHandlerInfo ¶
type StepInfo ¶
type StepInfo struct {
Name string `json:"name"`
DependsOn []StepDependencyInfo `json:"depends_on,omitempty"`
}
type StepRunInfo ¶
type StepRunInfo struct {
ID int64 `json:"id"`
StepName string `json:"step_name"`
Status string `json:"status"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
StepRunInfo represents the execution state of a single step within a flow run.
func GetFlowRunSteps ¶
func GetFlowRunSteps(ctx context.Context, conn Conn, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is a reflection-based task with an optional handler. Use NewTask(name).Handler(fn, opts) for tasks with handlers, or NewTask(name) alone for definition-only tasks.
func NewTask ¶
NewTask creates a new task definition with the given name. Chain .Handler() to add a handler, otherwise returns a definition-only task.
func (*Task) Handler ¶ added in v0.0.3
func (t *Task) Handler(fn any, opts ...HandlerOpts) *Task
Handler sets the task handler function and execution options. fn must have signature (context.Context, In) (Out, error). If opts is omitted, defaults are used (concurrency: 1, batchSize: 10).
func (*Task) MarshalJSON ¶ added in v0.0.3
MarshalJSON serializes the task for JSON output.
type TaskHandle ¶ added in v0.0.6
TaskHandle is a handle to a task execution.
func RunTask ¶
func RunTask(ctx context.Context, conn Conn, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
func (*TaskHandle) WaitForOutput ¶ added in v0.0.6
WaitForOutput blocks until the task execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.
type TaskHandlerInfo ¶
type TaskHandlerInfo struct {
TaskName string `json:"task_name"`
}
type TaskRunInfo ¶ added in v0.0.6
type TaskRunInfo struct {
ID int64 `json:"id"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Status string `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
TaskRunInfo represents the details of a task execution.
func GetTaskRun ¶
func GetTaskRun(ctx context.Context, conn Conn, taskName string, taskRunID int64) (*TaskRunInfo, error)
GetTaskRun retrieves a specific task run result by ID.
func ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
func (*TaskRunInfo) OutputAs ¶ added in v0.0.6
func (r *TaskRunInfo) OutputAs(out any) error
OutputAs unmarshals the output of a completed task run. Returns an error if the task run has failed or is not completed yet.
type TaskScheduleInfo ¶ added in v0.0.5
type TaskScheduleInfo struct {
TaskName string `json:"task_name"`
CronSpec string `json:"cron_spec"`
NextRunAt time.Time `json:"next_run_at"`
LastRunAt time.Time `json:"last_run_at,omitzero"`
LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
TaskScheduleInfo contains metadata about a scheduled task.
func ListTaskSchedules ¶ added in v0.0.5
func ListTaskSchedules(ctx context.Context, conn Conn) ([]*TaskScheduleInfo, error)
ListTaskSchedules returns all task schedules ordered by next_run_at.
type WaitOpts ¶ added in v0.0.6
WaitOpts configures WaitForOutput polling behavior. Zero values use defaults.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes tasks and flows from the queue.
func NewWorker ¶
func NewWorker(conn Conn, opts ...WorkerOpts) *Worker
NewWorker creates a new worker with the given connection and configuration. Use builder methods (AddTask, AddFlow, etc.) to configure the worker. Call Start(ctx) to begin processing tasks and flows.
func (*Worker) Start ¶
Start begins processing tasks and flows.
The worker will:
- poll for new work and execute task and flow step handlers while ctx is active
- run any configured cron-style task and flow schedules
- send periodic heartbeats while it is running
- register a built-in garbage collection task that runs every 5 minutes
Shutdown behaviour:
- when ctx is cancelled the worker immediately stops reading new work and begins shutting down
- if ShutdownTimeout is set to a value > 0, that duration is used as a grace period for in‑flight handlers after ctx is cancelled; once the grace period expires the handler context is cancelled and remaining handlers are asked to stop. The default graceful shutdown timeout is 5 seconds.
- if ShutdownTimeout is not set or set to 0, there is no grace period: the handler context is cancelled immediately once ctx is cancelled and Start returns after all goroutines finish
type WorkerInfo ¶
type WorkerInfo struct {
ID string `json:"id"`
TaskHandlers []*TaskHandlerInfo `json:"task_handlers"`
StepHandlers []*StepHandlerInfo `json:"step_handlers"`
StartedAt time.Time `json:"started_at"`
LastHeartbeatAt time.Time `json:"last_heartbeat_at"`
}
func ListWorkers ¶
func ListWorkers(ctx context.Context, conn Conn) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.