Documentation ¶
Overview ¶
Package catbird provides a PostgreSQL-based message queue with task and workflow execution engine.
Scheduling for Distributed Environments ¶
Catbird includes built-in support for scheduled task and flow execution using cron syntax. When multiple workers run concurrently (on different machines or processes), Catbird guarantees that each scheduled execution runs **exactly once per cron tick**, even across clock skew and timezone differences.
This is achieved through:
- UTC-normalized cron scheduling: all workers use UTC, eliminating timezone confusion
- Idempotency key deduplication: each cron tick generates a deterministic key, derived from the scheduled tick time in UTC and prefixed with "schedule:", that persists after the run completes
- PostgreSQL as the single source of truth: the database enforces the unique constraint on idempotency keys, preventing duplicates at the atomic level
See scheduler.go for implementation details and SCHEDULING_ADVANCED.md for the roadmap toward a future DB-driven scheduling system with even stronger guarantees.
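The effect of the unique constraint can be pictured with a small in-memory model. The sketch below is not Catbird code: keyStore is a hypothetical stand-in for a table with a unique idempotency key column, and the key "schedule:tick-1" is a made-up value. It only shows why several workers racing on the same tick end up creating a single run.

```go
package main

import (
	"fmt"
	"sync"
)

// keyStore is a hypothetical in-memory stand-in for a table with a unique
// constraint on the idempotency key column.
type keyStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newKeyStore() *keyStore {
	return &keyStore{seen: make(map[string]bool)}
}

// insert reports whether the key was newly recorded. A false result models a
// unique-constraint conflict: another worker already claimed this tick.
func (s *keyStore) insert(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[key] {
		return false
	}
	s.seen[key] = true
	return true
}

// raceWorkers lets n workers try to enqueue the same tick concurrently and
// returns how many of them actually created a run.
func raceWorkers(s *keyStore, key string, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	created := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.insert(key) {
				mu.Lock()
				created++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return created
}

func main() {
	store := newKeyStore()
	// "schedule:tick-1" is a made-up key used only for this demonstration.
	fmt.Println("runs created:", raceWorkers(store, "schedule:tick-1", 8))
}
```

In the real system the database plays the role of keyStore, so the same reasoning holds across processes and machines rather than goroutines.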
Index ¶
- Constants
- Variables
- func Bind(ctx context.Context, conn Conn, queue string, pattern string) error
- func CreateFlow(ctx context.Context, conn Conn, flow *Flow) error
- func CreateQueue(ctx context.Context, conn Conn, name string) error
- func CreateQueueWithOpts(ctx context.Context, conn Conn, name string, opts QueueOpts) error
- func CreateTask(ctx context.Context, conn Conn, task *Task) error
- func Delete(ctx context.Context, conn Conn, queue string, id int64) (bool, error)
- func DeleteMany(ctx context.Context, conn Conn, queue string, ids []int64) error
- func DeleteQueue(ctx context.Context, conn Conn, name string) (bool, error)
- func Dispatch(ctx context.Context, conn Conn, topic string, payload any) error
- func DispatchWithOpts(ctx context.Context, conn Conn, topic string, payload any, opts DispatchOpts) error
- func EnqueueDispatch(batch *pgx.Batch, topic string, payload any, opts DispatchOpts) error
- func EnqueueSend(batch *pgx.Batch, queue string, payload any, opts SendOpts) error
- func GC(ctx context.Context, conn Conn) error
- func Hide(ctx context.Context, conn Conn, queue string, id int64, hideFor time.Duration) (bool, error)
- func HideMany(ctx context.Context, conn Conn, queue string, ids []int64, ...) error
- func MigrateDownTo(ctx context.Context, db *sql.DB, version int) error
- func MigrateUpTo(ctx context.Context, db *sql.DB, version int) error
- func Send(ctx context.Context, conn Conn, queue string, payload any) error
- func SendWithOpts(ctx context.Context, conn Conn, queue string, payload any, opts SendOpts) error
- func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, ...) error
- func Unbind(ctx context.Context, conn Conn, queue string, pattern string) error
- type Client
- func (c *Client) Bind(ctx context.Context, queue string, pattern string) error
- func (c *Client) CreateFlow(ctx context.Context, flow *Flow) error
- func (c *Client) CreateQueue(ctx context.Context, name string) error
- func (c *Client) CreateQueueWithOpts(ctx context.Context, name string, opts QueueOpts) error
- func (c *Client) CreateTask(ctx context.Context, task *Task) error
- func (c *Client) Delete(ctx context.Context, queue string, id int64) (bool, error)
- func (c *Client) DeleteMany(ctx context.Context, queue string, ids []int64) error
- func (c *Client) DeleteQueue(ctx context.Context, name string) (bool, error)
- func (c *Client) Dispatch(ctx context.Context, topic string, payload any) error
- func (c *Client) DispatchWithOpts(ctx context.Context, topic string, payload any, opts DispatchOpts) error
- func (c *Client) GC(ctx context.Context) error
- func (c *Client) GetFlow(ctx context.Context, name string) (*FlowInfo, error)
- func (c *Client) GetFlowRun(ctx context.Context, name string, id int64) (*RunInfo, error)
- func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
- func (c *Client) GetQueue(ctx context.Context, name string) (*QueueInfo, error)
- func (c *Client) GetTask(ctx context.Context, name string) (*TaskInfo, error)
- func (c *Client) GetTaskRun(ctx context.Context, name string, id int64) (*RunInfo, error)
- func (c *Client) Hide(ctx context.Context, queue string, id int64, hideFor time.Duration) (bool, error)
- func (c *Client) HideMany(ctx context.Context, queue string, ids []int64, hideFor time.Duration) error
- func (c *Client) ListFlowRuns(ctx context.Context, name string) ([]*RunInfo, error)
- func (c *Client) ListFlows(ctx context.Context) ([]*FlowInfo, error)
- func (c *Client) ListQueues(ctx context.Context) ([]*QueueInfo, error)
- func (c *Client) ListTaskRuns(ctx context.Context, name string) ([]*RunInfo, error)
- func (c *Client) ListTasks(ctx context.Context) ([]*TaskInfo, error)
- func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
- func (c *Client) NewWorker(ctx context.Context, opts ...WorkerOpt) (*Worker, error)
- func (c *Client) Read(ctx context.Context, queue string, quantity int, hideFor time.Duration) ([]Message, error)
- func (c *Client) ReadPoll(ctx context.Context, queue string, quantity int, ...) ([]Message, error)
- func (c *Client) RunFlow(ctx context.Context, name string, input any, opts ...RunOpts) (*RunHandle, error)
- func (c *Client) RunFlowWithOpts(ctx context.Context, name string, input any, opts RunFlowOpts) (*RunHandle, error)
- func (c *Client) RunTask(ctx context.Context, name string, input any, opts ...RunOpts) (*RunHandle, error)
- func (c *Client) RunTaskWithOpts(ctx context.Context, name string, input any, opts RunOpts) (*RunHandle, error)
- func (c *Client) Send(ctx context.Context, queue string, payload any) error
- func (c *Client) SendWithOpts(ctx context.Context, queue string, payload any, opts SendOpts) error
- func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, ...) error
- func (c *Client) Unbind(ctx context.Context, queue string, pattern string) error
- type Conn
- type DispatchOpts
- type Flow
- type FlowInfo
- type FlowOpt
- func InitialStep[In, Out any](name string, fn func(context.Context, In) (Out, error), opts ...HandlerOpt) FlowOpt
- func InitialStepWithSignal[In, SigIn, Out any](name string, fn func(context.Context, In, SigIn) (Out, error), ...) FlowOpt
- func StepWithDependency[In, Dep1Out, Out any](name string, dep1 *StepDependency, ...) FlowOpt
- func StepWithDependencyAndSignal[In, SigIn, Dep1Out, Out any](name string, dep1 *StepDependency, ...) FlowOpt
- func StepWithThreeDependencies[In, Dep1Out, Dep2Out, Dep3Out, Out any](name string, dep1 *StepDependency, dep2 *StepDependency, dep3 *StepDependency, ...) FlowOpt
- func StepWithThreeDependenciesAndSignal[In, SigIn, Dep1Out, Dep2Out, Dep3Out, Out any](name string, dep1 *StepDependency, dep2 *StepDependency, dep3 *StepDependency, ...) FlowOpt
- func StepWithTwoDependencies[In, Dep1Out, Dep2Out, Out any](name string, dep1 *StepDependency, dep2 *StepDependency, ...) FlowOpt
- func StepWithTwoDependenciesAndSignal[In, SigIn, Dep1Out, Dep2Out, Out any](name string, dep1 *StepDependency, dep2 *StepDependency, ...) FlowOpt
- type HandlerOpt
- func WithBackoff(minDelay, maxDelay time.Duration) HandlerOpt
- func WithBatchSize(n int) HandlerOpt
- func WithCircuitBreaker(failureThreshold int, openTimeout time.Duration) HandlerOpt
- func WithConcurrency(n int) HandlerOpt
- func WithCondition(expr string) HandlerOpt
- func WithMaxDuration(d time.Duration) HandlerOpt
- func WithMaxRetries(n int) HandlerOpt
- type Message
- type Optional
- type QueueInfo
- type QueueOpts
- type RunFlowOpts
- type RunHandle
- func RunFlow(ctx context.Context, conn Conn, name string, input any, opts ...RunOpts) (*RunHandle, error)
- func RunFlowWithOpts(ctx context.Context, conn Conn, name string, input any, opts RunFlowOpts) (*RunHandle, error)
- func RunTask(ctx context.Context, conn Conn, name string, input any, opts ...RunOpts) (*RunHandle, error)
- func RunTaskWithOpts(ctx context.Context, conn Conn, name string, input any, opts RunOpts) (*RunHandle, error)
- type RunInfo
- func GetFlowRun(ctx context.Context, conn Conn, name string, id int64) (*RunInfo, error)
- func GetTaskRun(ctx context.Context, conn Conn, name string, id int64) (*RunInfo, error)
- func ListFlowRuns(ctx context.Context, conn Conn, name string) ([]*RunInfo, error)
- func ListTaskRuns(ctx context.Context, conn Conn, name string) ([]*RunInfo, error)
- type RunOpts
- type ScheduleOpt
- type Scheduler
- type SendOpts
- type Step
- type StepDependency
- type StepDependencyInfo
- type StepHandlerInfo
- type StepInfo
- type StepOpt
- type StepRunInfo
- type Task
- type TaskHandlerInfo
- type TaskInfo
- type Worker
- type WorkerInfo
- type WorkerOpt
- func WithFlow(f *Flow) WorkerOpt
- func WithGracefulShutdown(d time.Duration) WorkerOpt
- func WithLogger(l *slog.Logger) WorkerOpt
- func WithScheduledFlow(flowName string, schedule string, opts ...ScheduleOpt) WorkerOpt
- func WithScheduledTask(taskName string, schedule string, opts ...ScheduleOpt) WorkerOpt
- func WithTask(t *Task) WorkerOpt
Constants ¶
const (
	StatusCreated   = "created"
	StatusStarted   = "started"
	StatusCompleted = "completed"
	StatusFailed    = "failed"
	StatusSkipped   = "skipped" // Step skipped due to condition
)
Status constants for task and flow runs
const SchemaVersion = 6
Variables ¶
var (
	// ErrRunFailed is returned when you try to unmarshal the output of a failed task or flow run
	ErrRunFailed = fmt.Errorf("run failed")
)
Functions ¶
func Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"
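To make the wildcard rules concrete, here is a minimal matcher sketch. It is an illustration, not Catbird's implementation. It assumes that topics are split into tokens on ".", that "?" matches exactly one token, and that "*" is only special as the last token, where it matches one or more remaining tokens. Whether the real matcher lets "*" match zero tokens is not stated here.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern under the assumptions
// stated above: "." separates tokens, "?" matches any single token, and a
// trailing "*" matches one or more remaining tokens.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	for i, tok := range p {
		if tok == "*" && i == len(p)-1 {
			// Tail wildcard: assumed to need at least one remaining token.
			return len(t) > i
		}
		if i >= len(t) {
			return false // topic ran out of tokens before the pattern did
		}
		if tok != "?" && tok != t[i] {
			return false // literal token mismatch
		}
	}
	// Without a tail wildcard the token counts must agree exactly.
	return len(p) == len(t)
}

func main() {
	for _, c := range [][2]string{
		{"foo.bar", "foo.bar"},
		{"foo.?.bar", "foo.x.bar"},
		{"foo.bar.*", "foo.bar.a.b"},
		{"foo.bar.*", "foo.bar"},
	} {
		fmt.Printf("%-10s %-12s %v\n", c[0], c[1], matchTopic(c[0], c[1]))
	}
}
```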
func CreateFlow ¶
CreateFlow creates a new flow definition.
func CreateQueue ¶
CreateQueue creates a new queue with the given name.
func CreateQueueWithOpts ¶
CreateQueueWithOpts creates a queue with the specified options. Use Bind() separately to create topic bindings.
func CreateTask ¶
CreateTask creates a new task definition.
func DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func DispatchWithOpts ¶
func DispatchWithOpts(ctx context.Context, conn Conn, topic string, payload any, opts DispatchOpts) error
DispatchWithOpts sends a message to topic-subscribed queues with options for idempotency key and delivery time.
func EnqueueDispatch ¶
EnqueueDispatch adds a Dispatch operation to a batch for efficient bulk message dispatching.
func EnqueueSend ¶
EnqueueSend adds a Send operation to a batch for efficient bulk message sending.
func Hide ¶
func Hide(ctx context.Context, conn Conn, queue string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func HideMany ¶
func HideMany(ctx context.Context, conn Conn, queue string, ids []int64, hideFor time.Duration) error
HideMany hides multiple messages from being read for the specified duration.
func SendWithOpts ¶
SendWithOpts enqueues a message with options for topic, idempotency key, and delivery time.
func SignalFlow ¶
func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., InitialStepWithSignal, StepWithDependencyAndSignal). Signals enable human-in-the-loop workflows where a step waits for external input before executing. Returns an error if the signal was already delivered or the step doesn't require a signal.
Types ¶
type Client ¶
type Client struct {
Conn Conn
}
Client is a facade for interacting with Catbird
func New ¶
New creates a new Client with the given database connection.
The connection can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.
func (*Client) Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: ? (single token), * (multi-token tail). Examples: "foo.bar", "foo.?.bar", "foo.bar.*"
func (*Client) CreateFlow ¶
CreateFlow creates a new flow definition.
func (*Client) CreateQueue ¶
CreateQueue creates a new queue with the given name.
func (*Client) CreateQueueWithOpts ¶
CreateQueueWithOpts creates a queue with the specified options including topics, deletion time, and unlogged mode.
func (*Client) CreateTask ¶
CreateTask creates a new task definition.
func (*Client) Delete ¶
Delete deletes a single message from the queue. Returns true if the message existed.
func (*Client) DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func (*Client) DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func (*Client) DispatchWithOpts ¶
func (c *Client) DispatchWithOpts(ctx context.Context, topic string, payload any, opts DispatchOpts) error
DispatchWithOpts sends a message to topic-subscribed queues with options for idempotency key and delivery time.
func (*Client) GetFlowRun ¶
GetFlowRun retrieves a specific flow run result by ID.
func (*Client) GetFlowRunSteps ¶
func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
func (*Client) GetTaskRun ¶
GetTaskRun retrieves a specific task run result by ID.
func (*Client) Hide ¶
func (c *Client) Hide(ctx context.Context, queue string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func (*Client) HideMany ¶
func (c *Client) HideMany(ctx context.Context, queue string, ids []int64, hideFor time.Duration) error
HideMany hides multiple messages from being read for the specified duration.
func (*Client) ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func (*Client) ListQueues ¶
ListQueues returns all queues
func (*Client) ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
func (*Client) ListWorkers ¶
func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.
func (*Client) NewWorker ¶
NewWorker creates a new worker that processes task and flow executions. Configure the worker with options like WithTask, WithFlow, and WithScheduledTask.
func (*Client) Read ¶
func (c *Client) Read(ctx context.Context, queue string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func (*Client) ReadPoll ¶
func (c *Client) ReadPoll(ctx context.Context, queue string, quantity int, hideFor, pollFor, pollInterval time.Duration) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.
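The polling behaviour described above can be sketched with the standard library alone. In this sketch, pollRead and pollDemo are hypothetical names, and read stands in for a single non-blocking queue read. How the real ReadPoll treats an elapsed pollFor window (empty result versus error) is an assumption here: the sketch returns no messages and no error.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollRead calls read until it yields at least one message, the pollFor
// window elapses, or ctx is cancelled. read is a hypothetical stand-in for a
// single non-blocking queue read. pollInterval must be greater than zero.
func pollRead(ctx context.Context, pollFor, pollInterval time.Duration, read func() ([]string, error)) ([]string, error) {
	pollCtx, cancel := context.WithTimeout(ctx, pollFor)
	defer cancel()

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		msgs, err := read()
		if err != nil || len(msgs) > 0 {
			return msgs, err
		}
		select {
		case <-pollCtx.Done():
			// An elapsed poll window is not an error; a cancelled parent is.
			return nil, ctx.Err()
		case <-ticker.C:
		}
	}
}

// pollDemo runs pollRead against a fake queue that becomes non-empty on the
// readyAfter-th read, and reports the messages plus the number of reads made.
func pollDemo(readyAfter int, pollFor, pollInterval time.Duration) ([]string, int) {
	calls := 0
	read := func() ([]string, error) {
		calls++
		if calls >= readyAfter {
			return []string{"hello"}, nil
		}
		return nil, nil
	}
	msgs, _ := pollRead(context.Background(), pollFor, pollInterval, read)
	return msgs, calls
}

func main() {
	msgs, calls := pollDemo(3, time.Second, 10*time.Millisecond)
	fmt.Println(msgs, "after", calls, "reads")
}
```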
func (*Client) RunFlow ¶
func (c *Client) RunFlow(ctx context.Context, name string, input any, opts ...RunOpts) (*RunHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func (*Client) RunFlowWithOpts ¶
func (c *Client) RunFlowWithOpts(ctx context.Context, name string, input any, opts RunFlowOpts) (*RunHandle, error)
RunFlowWithOpts enqueues a flow with options for concurrency/idempotency control.
func (*Client) RunTask ¶
func (c *Client) RunTask(ctx context.Context, name string, input any, opts ...RunOpts) (*RunHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
func (*Client) RunTaskWithOpts ¶
func (c *Client) RunTaskWithOpts(ctx context.Context, name string, input any, opts RunOpts) (*RunHandle, error)
RunTaskWithOpts enqueues a task with options for concurrency/idempotency control.
func (*Client) Send ¶
Send enqueues a message to the specified queue. The payload is marshaled to JSON.
func (*Client) SendWithOpts ¶
SendWithOpts enqueues a message with options for topic, idempotency key, and delivery time.
func (*Client) SignalFlow ¶
func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., InitialStepWithSignal). Returns an error if the signal was already delivered or the step doesn't require a signal.
type Conn ¶
type Conn interface {
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
Query(context.Context, string, ...any) (pgx.Rows, error)
QueryRow(context.Context, string, ...any) pgx.Row
}
Conn is an interface for database connections compatible with *pgx.Conn, *pgxpool.Pool, and pgx.Tx.
type DispatchOpts ¶
type FlowInfo ¶
type FlowInfo struct {
Name string `json:"name"`
Steps []StepInfo `json:"steps"`
CreatedAt time.Time `json:"created_at"`
}
type FlowOpt ¶
type FlowOpt func(*Flow)
func InitialStep ¶
func InitialStep[In, Out any](name string, fn func(context.Context, In) (Out, error), opts ...HandlerOpt) FlowOpt
InitialStep creates a flow step with no dependencies. The handler receives the flow input directly and produces output. Input and output types are automatically marshaled to/from JSON.
func InitialStepWithSignal ¶
func InitialStepWithSignal[In, SigIn, Out any](name string, fn func(context.Context, In, SigIn) (Out, error), opts ...HandlerOpt) FlowOpt
InitialStepWithSignal creates a flow step with no dependencies that requires a signal. The handler receives the flow input and signal input, then produces output. The step waits for signal delivery via SignalFlow before executing.
func StepWithDependency ¶
func StepWithDependency[In, Dep1Out, Out any](name string, dep1 *StepDependency, fn func(context.Context, In, Dep1Out) (Out, error), opts ...HandlerOpt) FlowOpt
StepWithDependency creates a flow step that depends on one previous step. The handler receives the flow input and the output of the dependency step.
func StepWithDependencyAndSignal ¶
func StepWithDependencyAndSignal[In, SigIn, Dep1Out, Out any](name string, dep1 *StepDependency, fn func(context.Context, In, SigIn, Dep1Out) (Out, error), opts ...HandlerOpt) FlowOpt
StepWithDependencyAndSignal creates a flow step with one dependency that requires a signal. The handler receives the flow input, signal input, and the output of the dependency step. The step waits for both dependency completion and signal delivery before executing.
func StepWithThreeDependencies ¶
func StepWithThreeDependencies[In, Dep1Out, Dep2Out, Dep3Out, Out any](name string, dep1 *StepDependency, dep2 *StepDependency, dep3 *StepDependency, fn func(context.Context, In, Dep1Out, Dep2Out, Dep3Out) (Out, error), opts ...HandlerOpt) FlowOpt
StepWithThreeDependencies creates a flow step that depends on three previous steps. The handler receives the flow input and the outputs of all three dependency steps.
func StepWithThreeDependenciesAndSignal ¶
func StepWithThreeDependenciesAndSignal[In, SigIn, Dep1Out, Dep2Out, Dep3Out, Out any](name string, dep1 *StepDependency, dep2 *StepDependency, dep3 *StepDependency, fn func(context.Context, In, SigIn, Dep1Out, Dep2Out, Dep3Out) (Out, error), opts ...HandlerOpt) FlowOpt
StepWithThreeDependenciesAndSignal creates a flow step with three dependencies that requires a signal. The handler receives the flow input, signal input, and the outputs of all three dependency steps. The step waits for all dependencies to complete and for signal delivery before executing.
func StepWithTwoDependencies ¶
func StepWithTwoDependencies[In, Dep1Out, Dep2Out, Out any](name string, dep1 *StepDependency, dep2 *StepDependency, fn func(context.Context, In, Dep1Out, Dep2Out) (Out, error), opts ...HandlerOpt) FlowOpt
StepWithTwoDependencies creates a flow step that depends on two previous steps. The handler receives the flow input and the outputs of both dependency steps.
func StepWithTwoDependenciesAndSignal ¶
func StepWithTwoDependenciesAndSignal[In, SigIn, Dep1Out, Dep2Out, Out any](name string, dep1 *StepDependency, dep2 *StepDependency, fn func(context.Context, In, SigIn, Dep1Out, Dep2Out) (Out, error), opts ...HandlerOpt) FlowOpt
StepWithTwoDependenciesAndSignal creates a flow step with two dependencies that requires a signal. The handler receives the flow input, signal input, and the outputs of both dependency steps. The step waits for both dependencies to complete and for signal delivery before executing.
type HandlerOpt ¶
type HandlerOpt func(*handlerOpts)
HandlerOpt is an option for configuring task and flow step handlers
func WithBackoff ¶
func WithBackoff(minDelay, maxDelay time.Duration) HandlerOpt
WithBackoff sets the delay between retries, exponentially backing off from minDelay to maxDelay
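As a rough illustration of how such a delay could grow, the sketch below doubles the delay on each retry and caps it at maxDelay. The doubling factor, the 1-based attempt counter, and the absence of jitter are assumptions made for the example; the text above only says that the delay backs off exponentially between the two bounds. backoffDelay is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before retry number attempt (1-based). The
// delay starts at minDelay, doubles on every further attempt, and never
// exceeds maxDelay. Doubling without jitter is an assumption of this sketch.
func backoffDelay(minDelay, maxDelay time.Duration, attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := minDelay
	for i := 1; i < attempt; i++ {
		if d > maxDelay/2 {
			// Doubling would reach or pass the cap (and could overflow).
			return maxDelay
		}
		d *= 2
	}
	if d > maxDelay {
		return maxDelay
	}
	return d
}

func main() {
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Println(attempt, backoffDelay(100*time.Millisecond, time.Second, attempt))
	}
}
```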
func WithBatchSize ¶
func WithBatchSize(n int) HandlerOpt
WithBatchSize sets the number of messages to read per batch
func WithCircuitBreaker ¶
func WithCircuitBreaker(failureThreshold int, openTimeout time.Duration) HandlerOpt
WithCircuitBreaker configures a circuit breaker for handler execution. failureThreshold is the number of consecutive failures before opening. openTimeout controls how long the circuit stays open before trying again.
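The two parameters map onto a small state machine, sketched below with a hypothetical breaker type. This is a generic circuit breaker, not Catbird's internal one: it is not safe for concurrent use, and the rule that a single trial call is let through once openTimeout has elapsed is an assumption about the "trying again" behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errOpen = errors.New("circuit open") // returned while calls are rejected
	errBoom = errors.New("boom")         // sample handler failure for the demo
)

// breaker is a minimal circuit breaker that is not safe for concurrent use.
type breaker struct {
	failureThreshold int
	openTimeout      time.Duration
	failures         int              // consecutive failures so far
	openedAt         time.Time        // zero while the circuit is closed
	now              func() time.Time // injectable clock
}

func newBreaker(failureThreshold int, openTimeout time.Duration, now func() time.Time) *breaker {
	return &breaker{failureThreshold: failureThreshold, openTimeout: openTimeout, now: now}
}

// call runs fn unless the circuit is open and openTimeout has not yet elapsed.
func (b *breaker) call(fn func() error) error {
	if !b.openedAt.IsZero() && b.now().Sub(b.openedAt) < b.openTimeout {
		return errOpen
	}
	// Closed, or open long enough that a trial call is allowed through.
	if err := fn(); err != nil {
		b.failures++
		if b.failures >= b.failureThreshold {
			b.openedAt = b.now() // open (or re-open) the circuit
		}
		return err
	}
	b.failures = 0
	b.openedAt = time.Time{}
	return nil
}

// fakeClock lets the demo move time forward without sleeping.
type fakeClock struct{ t time.Time }

func newFakeClock() *fakeClock { return &fakeClock{t: time.Unix(0, 0)} }

func (c *fakeClock) now() time.Time { return c.t }

func (c *fakeClock) advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	clk := newFakeClock()
	b := newBreaker(2, time.Minute, clk.now)
	failing := func() error { return errBoom }
	fmt.Println(b.call(failing), b.call(failing), b.call(failing))
	clk.advance(time.Minute)
	fmt.Println(b.call(func() error { return nil }))
}
```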
func WithConcurrency ¶
func WithConcurrency(n int) HandlerOpt
WithConcurrency sets the number of concurrent handler executions
func WithCondition ¶
func WithCondition(expr string) HandlerOpt
WithCondition sets a condition that must be satisfied for the handler to execute. If the condition evaluates to false, the task/step is skipped. Condition syntax: "input.field op value" or "dep_name.field op value". Examples: WithCondition("input.priority eq high"), WithCondition("validate.score gte 80").
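To show how an expression of this shape breaks down, the sketch below parses the three parts and evaluates the two operators that appear in the examples (eq and gte). Everything in it is hypothetical: the condition type, the data layout, the rule that a missing field evaluates to false, and the treatment of any other operator as unsupported are assumptions, not Catbird's evaluator.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// condition is a parsed "source.field op value" expression.
type condition struct {
	source string // "input" or the name of a dependency step
	field  string
	op     string
	value  string
}

// parseCondition splits expr on whitespace into its three parts.
func parseCondition(expr string) (condition, error) {
	parts := strings.Fields(expr)
	if len(parts) != 3 {
		return condition{}, fmt.Errorf("want \"source.field op value\", got %q", expr)
	}
	source, field, ok := strings.Cut(parts[0], ".")
	if !ok || source == "" || field == "" {
		return condition{}, fmt.Errorf("left side %q is not source.field", parts[0])
	}
	return condition{source: source, field: field, op: parts[1], value: parts[2]}, nil
}

// eval checks the condition against decoded JSON objects keyed by source
// name. Only "eq" and "gte" are handled; a missing field yields false
// (an assumption of this sketch).
func (c condition) eval(data map[string]map[string]any) (bool, error) {
	got, ok := data[c.source][c.field]
	if !ok {
		return false, nil
	}
	switch c.op {
	case "eq":
		return fmt.Sprint(got) == c.value, nil
	case "gte":
		want, err := strconv.ParseFloat(c.value, 64)
		if err != nil {
			return false, err
		}
		num, isNum := got.(float64) // encoding/json decodes numbers as float64
		if !isNum {
			return false, fmt.Errorf("%s.%s is not a number", c.source, c.field)
		}
		return num >= want, nil
	default:
		return false, fmt.Errorf("unsupported operator %q", c.op)
	}
}

func main() {
	data := map[string]map[string]any{
		"input":    {"priority": "high"},
		"validate": {"score": 85.0},
	}
	for _, expr := range []string{"input.priority eq high", "validate.score gte 80"} {
		c, err := parseCondition(expr)
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		ok, err := c.eval(data)
		fmt.Println(expr, "=>", ok, err)
	}
}
```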
func WithMaxDuration ¶
func WithMaxDuration(d time.Duration) HandlerOpt
WithMaxDuration sets the maximum duration for handler execution
func WithMaxRetries ¶
func WithMaxRetries(n int) HandlerOpt
WithMaxRetries sets the number of retry attempts for failed handlers
type Message ¶
type Message struct {
ID int64 `json:"id"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Topic string `json:"topic"`
Payload json.RawMessage `json:"payload"`
Deliveries int `json:"deliveries"`
CreatedAt time.Time `json:"created_at"`
DeliverAt time.Time `json:"deliver_at"`
}
Message represents a message in a queue
func Read ¶
func Read(ctx context.Context, conn Conn, queue string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func ReadPoll ¶
func ReadPoll(ctx context.Context, conn Conn, queue string, quantity int, hideFor, pollFor, pollInterval time.Duration) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Unlogged bool `json:"unlogged"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at,omitzero"`
}
type RunFlowOpts ¶
type RunFlowOpts = RunOpts
type RunHandle ¶
RunHandle is a handle to a task or flow execution
func RunFlow ¶
func RunFlow(ctx context.Context, conn Conn, name string, input any, opts ...RunOpts) (*RunHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func RunFlowWithOpts ¶
func RunFlowWithOpts(ctx context.Context, conn Conn, name string, input any, opts RunFlowOpts) (*RunHandle, error)
RunFlowWithOpts enqueues a flow with options for concurrency/idempotency control.
func RunTask ¶
func RunTask(ctx context.Context, conn Conn, name string, input any, opts ...RunOpts) (*RunHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
type RunInfo ¶
type RunInfo struct {
ID int64 `json:"id"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Status string `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
RunInfo represents the details of a task or flow execution
func GetFlowRun ¶
GetFlowRun retrieves a specific flow run result by ID.
func GetTaskRun ¶
GetTaskRun retrieves a specific task run result by ID.
func ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
type ScheduleOpt ¶
type ScheduleOpt func(*scheduleEntry)
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages scheduled task and flow executions using cron syntax.
func NewScheduler ¶
NewScheduler creates a new scheduler instance.
func (*Scheduler) AddFlow ¶
func (s *Scheduler) AddFlow(flowName string, schedule string, opts ...ScheduleOpt)
AddFlow registers a scheduled flow execution using cron syntax. The WithInput option can be used to provide dynamic input at execution time. Otherwise an empty JSON object will be used as input to the flow.
All scheduled flow executions use idempotency deduplication keyed on the scheduled execution time. This means exactly one execution per cron tick will occur even when running multiple workers concurrently.
func (*Scheduler) AddTask ¶
func (s *Scheduler) AddTask(taskName string, schedule string, opts ...ScheduleOpt)
AddTask registers a scheduled task execution using cron syntax. The WithInput option can be used to provide dynamic input at execution time. Otherwise an empty JSON object will be used as input to the task.
All scheduled task executions use idempotency deduplication keyed on the scheduled execution time. This means exactly one execution per cron tick will occur even when running multiple workers concurrently.
func (*Scheduler) Start ¶
Start begins executing scheduled tasks and flows. Returns an error if any schedule fails to register. The scheduler will continue until ctx is cancelled or Stop is called.
Deduplication strategy: all scheduled runs use an IdempotencyKey derived from the scheduled execution time in UTC (format: "schedule:<unix_seconds>"). This ensures:
- Multiple workers/machines generate identical keys for the same cron tick
- One execution per scheduled tick, even after completion
- Retries allowed on failed runs (idempotency persists across completion)
- No clock skew or timezone issues (all times normalized to UTC)
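A minimal sketch of such a key derivation follows, using the "schedule:<unix_seconds>" format given above. scheduleKey and sameInstantIn are hypothetical helpers written for this illustration; the point is only that workers in different time zones compute the same key for the same tick.

```go
package main

import (
	"fmt"
	"time"
)

// scheduleKey derives an idempotency key from the scheduled tick time. Unix
// seconds do not depend on the time zone; the UTC call makes the intent
// explicit.
func scheduleKey(tick time.Time) string {
	return fmt.Sprintf("schedule:%d", tick.UTC().Unix())
}

// sameInstantIn returns 2024-01-01 00:00:00 UTC expressed in a fixed zone
// offsetHours east of UTC, as a worker in that zone would see it.
func sameInstantIn(offsetHours int) time.Time {
	utc := time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC)
	return utc.In(time.FixedZone("worker", offsetHours*3600))
}

func main() {
	for _, offset := range []int{0, 9, -5} {
		tick := sameInstantIn(offset)
		fmt.Println(tick.Format(time.RFC3339), "->", scheduleKey(tick))
	}
}
```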
type Step ¶
type Step struct {
Name string `json:"name"`
Condition string `json:"condition,omitempty"`
HasSignal bool `json:"has_signal"`
DependsOn []*StepDependency `json:"depends_on,omitempty"`
// contains filtered or unexported fields
}
type StepDependency ¶
StepDependency represents a dependency on another step's output.
func Dependency ¶
func Dependency(name string) *StepDependency
Dependency creates a step dependency reference by name. Used when defining flow steps that depend on other steps.
Example: Dependency("analyze")
func OptionalDependency ¶
func OptionalDependency(name string) *StepDependency
OptionalDependency creates a dependency that may be skipped. When used, the corresponding handler argument must be Optional[T]. Use this when depending on a step that has a condition and may be skipped.
type StepDependencyInfo ¶
type StepDependencyInfo struct {
Name string `json:"name"`
}
type StepHandlerInfo ¶
type StepInfo ¶
type StepInfo struct {
Name string `json:"name"`
DependsOn []StepDependencyInfo `json:"depends_on,omitempty"`
}
type StepRunInfo ¶
type StepRunInfo struct {
ID int64 `json:"id"`
StepName string `json:"step_name"`
Status string `json:"status"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
StepRunInfo represents the execution state of a single step within a flow run.
func GetFlowRunSteps ¶
func GetFlowRunSteps(ctx context.Context, conn Conn, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
type Task ¶
type Task struct {
Name string `json:"name"`
// contains filtered or unexported fields
}
Task represents a task definition with a generic typed handler
func NewTask ¶
func NewTask[In, Out any](name string, fn func(context.Context, In) (Out, error), opts ...HandlerOpt) *Task
NewTask creates a new task with a generic handler function. The handler receives typed input and returns typed output. Input and output types are automatically marshaled to/from JSON.
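One way a typed handler can be adapted to JSON payloads is shown below. This is a sketch of the general technique, not the package's internals: rawHandler, wrap, runJSON, and the greet types are hypothetical names introduced for the example.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// rawHandler is the untyped shape a worker could store: JSON in, JSON out.
type rawHandler func(ctx context.Context, input json.RawMessage) (json.RawMessage, error)

// wrap adapts a typed handler to rawHandler by decoding the input JSON into
// In and encoding the returned Out back to JSON.
func wrap[In, Out any](fn func(context.Context, In) (Out, error)) rawHandler {
	return func(ctx context.Context, input json.RawMessage) (json.RawMessage, error) {
		var in In
		if err := json.Unmarshal(input, &in); err != nil {
			return nil, fmt.Errorf("decode input: %w", err)
		}
		out, err := fn(ctx, in)
		if err != nil {
			return nil, err
		}
		encoded, err := json.Marshal(out)
		if err != nil {
			return nil, fmt.Errorf("encode output: %w", err)
		}
		return encoded, nil
	}
}

// greetIn and greetOut are sample payload types for the demo.
type greetIn struct {
	Name string `json:"name"`
}

type greetOut struct {
	Greeting string `json:"greeting"`
}

func greet(_ context.Context, in greetIn) (greetOut, error) {
	return greetOut{Greeting: "hello, " + in.Name}, nil
}

// runJSON feeds a JSON string through a wrapped handler.
func runJSON(h rawHandler, input string) (string, error) {
	out, err := h(context.Background(), json.RawMessage(input))
	return string(out), err
}

func main() {
	out, err := runJSON(wrap(greet), `{"name":"Ada"}`)
	fmt.Println(out, err)
}
```

The type parameters are inferred from the handler function, which is why a call such as wrap(greet) needs no explicit instantiation.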
type TaskHandlerInfo ¶
type TaskHandlerInfo struct {
TaskName string `json:"task_name"`
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes tasks and flows from the queue
func NewWorker ¶
NewWorker creates a new worker with the given options. The worker will register all tasks and flows it has been configured with. Garbage collection is automatically enabled and runs every 5 minutes to clean up expired queues and stale worker heartbeats.
func (*Worker) Start ¶
Start begins processing tasks and flows.
The worker will:
- poll for new work and execute task and flow step handlers while ctx is active
- run any configured cron-style task and flow schedules
- send periodic heartbeats while it is running
Shutdown behaviour:
- when ctx is cancelled the worker immediately stops reading new work and begins shutting down
- if WithGracefulShutdown is set to a value > 0, that duration is used as a grace period for in‑flight handlers after ctx is cancelled; once the grace period expires the handler context is cancelled and remaining handlers are asked to stop. The default graceful shutdown timeout is 5 seconds.
- if WithGracefulShutdown is not set or set to 0, there is no grace period: the handler context is cancelled immediately once ctx is cancelled and Start returns after all goroutines finish
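The shutdown sequence above can be modelled with two contexts: one that signals "stop taking new work" and a separate handler context that is cancelled only after the grace period. The sketch below uses hypothetical names (runWithGrace, sleepHandler, demoShutdown) and is not the Worker implementation. It also assumes handlers return promptly once their context is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runWithGrace starts the handlers, waits until ctx is cancelled, then gives
// in-flight handlers up to grace to finish before cancelling their context.
// It reports whether every handler finished without being cancelled.
func runWithGrace(ctx context.Context, grace time.Duration, handlers []func(context.Context)) bool {
	handlerCtx, cancelHandlers := context.WithCancel(context.Background())
	defer cancelHandlers()

	var wg sync.WaitGroup
	for _, h := range handlers {
		wg.Add(1)
		go func(h func(context.Context)) {
			defer wg.Done()
			h(handlerCtx)
		}(h)
	}
	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()

	select {
	case <-done:
		return true // everything finished before shutdown was requested
	case <-ctx.Done():
	}

	graceful := false
	if grace > 0 {
		timer := time.NewTimer(grace)
		defer timer.Stop()
		select {
		case <-done:
			graceful = true
		case <-timer.C:
		}
	}
	cancelHandlers() // no grace period, or it expired: ask handlers to stop
	<-done           // blocks until handlers observe the cancellation
	return graceful
}

// sleepHandler simulates work that takes the given time unless cancelled.
func sleepHandler(work time.Duration) func(context.Context) {
	return func(ctx context.Context) {
		select {
		case <-time.After(work):
		case <-ctx.Done():
		}
	}
}

// demoShutdown requests shutdown immediately with one handler in flight.
func demoShutdown(grace, work time.Duration) bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return runWithGrace(ctx, grace, []func(context.Context){sleepHandler(work)})
}

func main() {
	fmt.Println("finished within grace:", demoShutdown(200*time.Millisecond, 10*time.Millisecond))
	fmt.Println("finished within grace:", demoShutdown(10*time.Millisecond, 5*time.Second))
}
```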
type WorkerInfo ¶
type WorkerInfo struct {
ID string `json:"id"`
TaskHandlers []*TaskHandlerInfo `json:"task_handlers"`
StepHandlers []*StepHandlerInfo `json:"step_handlers"`
StartedAt time.Time `json:"started_at"`
LastHeartbeatAt time.Time `json:"last_heartbeat_at"`
}
func ListWorkers ¶
func ListWorkers(ctx context.Context, conn Conn) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.
type WorkerOpt ¶
type WorkerOpt func(*Worker)
WorkerOpt is an option for configuring a worker
func WithGracefulShutdown ¶
WithGracefulShutdown sets the graceful shutdown timeout for the worker. Default is 5 seconds.
func WithLogger ¶
WithLogger sets a custom logger for the worker. Default is slog.Default().
func WithScheduledFlow ¶
func WithScheduledFlow(flowName string, schedule string, opts ...ScheduleOpt) WorkerOpt
WithScheduledFlow registers a scheduled flow execution using cron syntax. The WithInput option can be used to provide dynamic input at execution time. Otherwise an empty JSON object will be used as input to the flow.
func WithScheduledTask ¶
func WithScheduledTask(taskName string, schedule string, opts ...ScheduleOpt) WorkerOpt
WithScheduledTask registers a scheduled task execution using cron syntax. The WithInput option can be used to provide dynamic input at execution time. Otherwise an empty JSON object will be used as input to the task.