Documentation
¶
Overview ¶
Package catbird provides a PostgreSQL-based message queue with task and workflow execution engine.
Index ¶
- Constants
- Variables
- func Bind(ctx context.Context, conn Conn, queueName string, pattern string) error
- func Cancel(ctx context.Context, opts ...CancelOpts) error
- func CancelFlowRun(ctx context.Context, conn Conn, flowName string, runID int64, ...) (bool, error)
- func CancelTaskRun(ctx context.Context, conn Conn, taskName string, runID int64, ...) (bool, error)
- func CompleteEarly(ctx context.Context, output any, reason string) error
- func CreateFlow(ctx context.Context, conn Conn, flow *Flow) error
- func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, opts ...ScheduleOpt) error
- func CreateQueue(ctx context.Context, conn Conn, queueName string, opts ...QueueOpts) error
- func CreateTask(ctx context.Context, conn Conn, task *Task) error
- func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, opts ...ScheduleOpt) error
- func Delete(ctx context.Context, conn Conn, queueName string, id int64) (bool, error)
- func DeleteMany(ctx context.Context, conn Conn, queueName string, ids []int64) ([]int64, error)
- func DeleteQueue(ctx context.Context, conn Conn, queueName string) (bool, error)
- func Hide(ctx context.Context, conn Conn, queueName string, id int64, ...) (bool, error)
- func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, ...) ([]int64, error)
- func MigrateDownTo(ctx context.Context, db *sql.DB, version int) error
- func MigrateUpTo(ctx context.Context, db *sql.DB, version int) error
- func Publish(ctx context.Context, conn Conn, topic string, payload any, opts ...PublishOpts) (int, error)
- func PublishMany(ctx context.Context, conn Conn, topic string, payloads []any, ...) (int, error)
- func PublishManyQuery(topic string, payloads []any, opts ...PublishManyOpts) (string, []any, error)
- func PublishQuery(topic string, payload any, opts ...PublishOpts) (string, []any, error)
- func PurgeFlowRuns(ctx context.Context, conn Conn, flowName string, olderThan time.Duration) (int, error)
- func PurgeTaskRuns(ctx context.Context, conn Conn, taskName string, olderThan time.Duration) (int, error)
- func RunFlowQuery(flowName string, input any, opts ...RunFlowOpts) (string, []any, error)
- func RunTaskQuery(taskName string, input any, opts ...RunTaskOpts) (string, []any, error)
- func Send(ctx context.Context, conn Conn, queueName string, payload any, ...) error
- func SendMany(ctx context.Context, conn Conn, queueName string, payloads []any, ...) error
- func SendManyQuery(queueName string, payloads []any, opts ...SendManyOpts) (string, []any, error)
- func SendQuery(queueName string, payload any, opts ...SendOpts) (string, []any, error)
- func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, ...) error
- func Unbind(ctx context.Context, conn Conn, queueName string, pattern string) (bool, error)
- type BackoffStrategy
- type CancelOpts
- type CircuitBreaker
- type CircuitBreakerStrategy
- type Client
- func (c *Client) Bind(ctx context.Context, queueName string, pattern string) error
- func (c *Client) CancelFlowRun(ctx context.Context, flowName string, runID int64, opts ...CancelOpts) (bool, error)
- func (c *Client) CancelTaskRun(ctx context.Context, taskName string, runID int64, opts ...CancelOpts) (bool, error)
- func (c *Client) CreateFlow(ctx context.Context, flow *Flow) error
- func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts ...ScheduleOpt) error
- func (c *Client) CreateQueue(ctx context.Context, queueName string, opts ...QueueOpts) error
- func (c *Client) CreateTask(ctx context.Context, task *Task) error
- func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts ...ScheduleOpt) error
- func (c *Client) Delete(ctx context.Context, queueName string, id int64) (bool, error)
- func (c *Client) DeleteMany(ctx context.Context, queueName string, ids []int64) ([]int64, error)
- func (c *Client) DeleteQueue(ctx context.Context, queueName string) (bool, error)
- func (c *Client) GC(ctx context.Context) (*GCInfo, error)
- func (c *Client) GetFlow(ctx context.Context, flowName string) (*FlowInfo, error)
- func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*FlowRunInfo, error)
- func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
- func (c *Client) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)
- func (c *Client) GetTask(ctx context.Context, taskName string) (*TaskInfo, error)
- func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*TaskRunInfo, error)
- func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)
- func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) ([]int64, error)
- func (c *Client) ListFlowRuns(ctx context.Context, flowName string) ([]*FlowRunInfo, error)
- func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)
- func (c *Client) ListFlows(ctx context.Context) ([]*FlowInfo, error)
- func (c *Client) ListQueues(ctx context.Context) ([]*QueueInfo, error)
- func (c *Client) ListTaskRuns(ctx context.Context, taskName string) ([]*TaskRunInfo, error)
- func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)
- func (c *Client) ListTasks(ctx context.Context) ([]*TaskInfo, error)
- func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
- func (c *Client) NewWorker() *Worker
- func (c *Client) Publish(ctx context.Context, topic string, payload any, opts ...PublishOpts) (int, error)
- func (c *Client) PublishMany(ctx context.Context, topic string, payloads []any, opts ...PublishManyOpts) (int, error)
- func (c *Client) PurgeFlowRuns(ctx context.Context, flowName string, olderThan time.Duration) (int, error)
- func (c *Client) PurgeTaskRuns(ctx context.Context, taskName string, olderThan time.Duration) (int, error)
- func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
- func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor time.Duration, ...) ([]Message, error)
- func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)
- func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)
- func (c *Client) Send(ctx context.Context, queueName string, payload any, opts ...SendOpts) error
- func (c *Client) SendMany(ctx context.Context, queueName string, payloads []any, opts ...SendManyOpts) error
- func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, ...) error
- func (c *Client) Unbind(ctx context.Context, queueName string, pattern string) (bool, error)
- type Conn
- type Flow
- func (f *Flow) AddStep(step *Step) *Flow
- func (f *Flow) OnFail(fn any, opts ...HandlerOpt) *Flow
- func (f *Flow) Output(stepName string) *Flow
- func (f *Flow) OutputPriority(stepNames ...string) *Flow
- func (f *Flow) RetentionPeriod(d time.Duration) *Flow
- func (f *Flow) WithDescription(description string) *Flow
- type FlowFailure
- type FlowHandle
- type FlowInfo
- type FlowRunInfo
- type FlowScheduleInfo
- type FullJitterBackoff
- type GCInfo
- type HandlerOpt
- func WithBatchSize(batchSize int) HandlerOpt
- func WithCircuitBreaker(failureThreshold int, openTimeout time.Duration) HandlerOpt
- func WithConcurrency(concurrency int) HandlerOpt
- func WithFullJitterBackoff(minDelay, maxDelay time.Duration) HandlerOpt
- func WithMaxRetries(maxRetries int) HandlerOpt
- func WithTimeout(timeout time.Duration) HandlerOpt
- type Message
- type Optional
- type PublishManyOpts
- type PublishOpts
- type QueueInfo
- type QueueOpts
- type ReadPollOpts
- type RunFlowOpts
- type RunTaskOpts
- type ScheduleOpt
- type SendManyOpts
- type SendOpts
- type Step
- func (s *Step) DependsOn(deps ...string) *Step
- func (s *Step) Do(fn any, opts ...HandlerOpt) *Step
- func (s *Step) Generate(fn any) *Step
- func (s *Step) Map(fn any, opts ...HandlerOpt) *Step
- func (s *Step) MapFlowInput() *Step
- func (s *Step) MapStepOutput(stepName string) *Step
- func (s *Step) Reduce(initial any, fn any) *Step
- func (s *Step) ReduceStep(stepName string) *Step
- func (s *Step) WithCondition(condition string) *Step
- func (s *Step) WithDescription(description string) *Step
- func (s *Step) WithSignal() *Step
- type StepDependencyInfo
- type StepHandlerInfo
- type StepInfo
- type StepRunInfo
- type StepType
- type Task
- type TaskFailure
- type TaskHandle
- type TaskHandlerInfo
- type TaskInfo
- type TaskRunInfo
- type TaskScheduleInfo
- type WaitOpts
- type Worker
- type WorkerInfo
Constants ¶
const ( StatusWaitingForDependencies = "waiting_for_dependencies" StatusWaitingForSignal = "waiting_for_signal" StatusWaitingForMapTasks = "waiting_for_map_tasks" StatusQueued = "queued" StatusStarted = "started" StatusCanceling = "canceling" StatusCompleted = "completed" StatusFailed = "failed" StatusSkipped = "skipped" StatusCanceled = "canceled" )
const SchemaVersion = 13
Variables ¶
var ( // ErrRunFailed is returned when you try to unmarshal the output of a failed task or flow run ErrRunFailed = fmt.Errorf("run failed") // ErrRunCanceled is returned when you try to wait for output from a canceled task or flow run ErrRunCanceled = fmt.Errorf("run canceled") // ErrNotFound is returned when a requested run or resource cannot be found ErrNotFound = fmt.Errorf("not found") // ErrNoRunContext is returned when cancellation helpers are called outside handler run context ErrNoRunContext = fmt.Errorf("no run context") // ErrUnknownStepOutput is returned when a requested step output is not present in completed outputs. ErrUnknownStepOutput = fmt.Errorf("unknown step output") // ErrNoFailedStepInput is returned when failed step input is not available. ErrNoFailedStepInput = fmt.Errorf("failed step input not available") // ErrNoFailedStepSignal is returned when failed step signal input is not available. ErrNoFailedStepSignal = fmt.Errorf("failed step signal input not available") // ErrNoOutputCandidate is returned when a flow completes without any configured output candidate producing output. ErrNoOutputCandidate = fmt.Errorf("no output candidate produced output") )
Functions ¶
func Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail). Examples: "foo.bar", "foo.*.bar", "foo.bar.#"
func Cancel ¶ added in v0.0.8
func Cancel(ctx context.Context, opts ...CancelOpts) error
Cancel requests cancellation for the current run from inside a task or flow handler.
func CancelFlowRun ¶ added in v0.0.8
func CancelFlowRun(ctx context.Context, conn Conn, flowName string, runID int64, opts ...CancelOpts) (bool, error)
CancelFlowRun cancels a flow run. Returns true when the run exists (including idempotent no-op), false when it does not exist.
func CancelTaskRun ¶ added in v0.0.8
func CancelTaskRun(ctx context.Context, conn Conn, taskName string, runID int64, opts ...CancelOpts) (bool, error)
CancelTaskRun cancels a task run. Returns true when the run exists (including idempotent no-op), false when it does not exist.
func CompleteEarly ¶ added in v0.0.8
CompleteEarly requests early completion for the current flow run.
Return this from a flow step handler to signal that the flow should complete early with the provided output and optional reason.
func CreateFlow ¶
CreateFlow creates a flow definition.
func CreateFlowSchedule ¶ added in v0.0.5
func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, opts ...ScheduleOpt) error
CreateFlowSchedule creates a cron-based schedule for a flow.
func CreateQueue ¶
CreateQueue creates a queue with the given name and optional options. Use Bind() separately to create topic bindings.
func CreateTask ¶
CreateTask creates a task definition.
func CreateTaskSchedule ¶ added in v0.0.5
func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, opts ...ScheduleOpt) error
CreateTaskSchedule creates a cron-based schedule for a task.
func DeleteMany ¶
DeleteMany deletes multiple messages from the queue. Returns the IDs that were actually deleted.
func DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func Hide ¶
func Hide(ctx context.Context, conn Conn, queueName string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func HideMany ¶
func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, hideFor time.Duration) ([]int64, error)
HideMany hides multiple messages from being read for the specified duration. Returns the IDs that were actually hidden.
func Publish ¶ added in v0.0.3
func Publish(ctx context.Context, conn Conn, topic string, payload any, opts ...PublishOpts) (int, error)
Publish sends a message to topic-subscribed queues with options. Pass no opts to use defaults.
func PublishMany ¶ added in v0.0.8
func PublishMany(ctx context.Context, conn Conn, topic string, payloads []any, opts ...PublishManyOpts) (int, error)
PublishMany sends multiple messages to topic-subscribed queues with options. Pass no opts to use defaults.
func PublishManyQuery ¶ added in v0.0.8
PublishManyQuery builds the SQL query and args for a PublishMany operation. Pass no opts to use defaults.
func PublishQuery ¶ added in v0.0.4
PublishQuery builds the SQL query and args for a Publish operation. Pass no opts to use defaults.
func PurgeFlowRuns ¶ added in v0.0.8
func PurgeFlowRuns(ctx context.Context, conn Conn, flowName string, olderThan time.Duration) (int, error)
PurgeFlowRuns deletes terminal flow runs (completed, failed, canceled) older than the given duration. Step runs and map tasks are deleted via cascade. Useful for manual cleanup or targeted removal independent of the configured retention period.
func PurgeTaskRuns ¶ added in v0.0.8
func PurgeTaskRuns(ctx context.Context, conn Conn, taskName string, olderThan time.Duration) (int, error)
PurgeTaskRuns deletes terminal task runs (completed, failed, skipped, canceled) older than the given duration. Useful for manual cleanup or targeted removal independent of the configured retention period.
func RunFlowQuery ¶ added in v0.0.4
RunFlowQuery builds the SQL query and args for a RunFlow operation. Pass no opts to use defaults.
func RunTaskQuery ¶ added in v0.0.4
RunTaskQuery builds the SQL query and args for a RunTask operation. Pass no opts to use defaults.
func SendMany ¶ added in v0.0.8
func SendMany(ctx context.Context, conn Conn, queueName string, payloads []any, opts ...SendManyOpts) error
SendMany enqueues multiple messages to the specified queue. Pass no opts to use defaults.
func SendManyQuery ¶ added in v0.0.8
SendManyQuery builds the SQL query and args for a SendMany operation. Pass no opts to use defaults.
func SendQuery ¶ added in v0.0.4
SendQuery builds the SQL query and args for a Send operation. Pass no opts to use defaults.
func SignalFlow ¶
func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with `.WithSignal()`. Signals enable human-in-the-loop workflows where a step waits for external input before executing. Returns an error if the signal was already delivered or the step doesn't require a signal.
Types ¶
type BackoffStrategy ¶ added in v0.0.3
type BackoffStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// NextDelay returns a delay for a zero-based delivery count (first retry = 0).
// Implementations should always return a positive duration.
NextDelay(deliveryCount int) time.Duration
}
BackoffStrategy defines how retry delays are calculated based on delivery count. Implementations must be safe for concurrent use.
type CancelOpts ¶ added in v0.0.8
type CancelOpts struct {
Reason string
}
CancelOpts configures cancellation behavior and metadata.
type CircuitBreaker ¶ added in v0.0.3
type CircuitBreaker struct {
// contains filtered or unexported fields
}
func NewCircuitBreaker ¶ added in v0.0.3
func NewCircuitBreaker(failureThreshold int, openTimeout time.Duration) *CircuitBreaker
func (*CircuitBreaker) RecordFailure ¶ added in v0.0.3
func (c *CircuitBreaker) RecordFailure(now time.Time)
func (*CircuitBreaker) RecordSuccess ¶ added in v0.0.3
func (c *CircuitBreaker) RecordSuccess()
func (*CircuitBreaker) Validate ¶ added in v0.0.3
func (c *CircuitBreaker) Validate() error
type CircuitBreakerStrategy ¶ added in v0.0.3
type CircuitBreakerStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// Allow returns whether a call is permitted and how long to wait if not.
Allow(now time.Time) (bool, time.Duration)
// RecordSuccess updates breaker state after a successful call.
RecordSuccess()
// RecordFailure updates breaker state after a failed call.
RecordFailure(now time.Time)
}
CircuitBreakerStrategy defines the interface for circuit breaker behavior. Implementations must be safe for concurrent use.
type Client ¶
type Client struct {
Conn Conn
}
Client is a facade for interacting with Catbird
func New ¶
New creates a new Client with the given database connection.
The connection can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.
func (*Client) Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail). Examples: "foo.bar", "foo.*.bar", "foo.bar.#"
func (*Client) CancelFlowRun ¶ added in v0.0.8
func (c *Client) CancelFlowRun(ctx context.Context, flowName string, runID int64, opts ...CancelOpts) (bool, error)
CancelFlowRun cancels a flow run.
func (*Client) CancelTaskRun ¶ added in v0.0.8
func (c *Client) CancelTaskRun(ctx context.Context, taskName string, runID int64, opts ...CancelOpts) (bool, error)
CancelTaskRun cancels a task run.
func (*Client) CreateFlow ¶
CreateFlow creates a flow definition.
func (*Client) CreateFlowSchedule ¶ added in v0.0.5
func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts ...ScheduleOpt) error
CreateFlowSchedule creates a cron-based schedule for a flow. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts are optional ScheduleOpt values configuring the schedule.
func (*Client) CreateQueue ¶
CreateQueue creates a queue with the given name and optional options.
func (*Client) CreateTask ¶
CreateTask creates a task definition.
func (*Client) CreateTaskSchedule ¶ added in v0.0.5
func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts ...ScheduleOpt) error
CreateTaskSchedule creates a cron-based schedule for a task. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts are optional ScheduleOpt values configuring the schedule.
func (*Client) Delete ¶
Delete deletes a single message from the queue. Returns true if the message existed.
func (*Client) DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func (*Client) DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func (*Client) GetFlowRun ¶
func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*FlowRunInfo, error)
GetFlowRun retrieves a specific flow run result by ID.
func (*Client) GetFlowRunSteps ¶
func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
func (*Client) GetTaskRun ¶
func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*TaskRunInfo, error)
GetTaskRun retrieves a specific task run result by ID.
func (*Client) Hide ¶
func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func (*Client) HideMany ¶
func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) ([]int64, error)
HideMany hides multiple messages from being read for the specified duration.
func (*Client) ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func (*Client) ListFlowSchedules ¶ added in v0.0.5
func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)
ListFlowSchedules returns all flow schedules ordered by next_run_at.
func (*Client) ListQueues ¶
ListQueues returns all queues
func (*Client) ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
func (*Client) ListTaskSchedules ¶ added in v0.0.5
func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)
ListTaskSchedules returns all task schedules ordered by next_run_at.
func (*Client) ListWorkers ¶
func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.
func (*Client) NewWorker ¶
NewWorker creates a new worker that processes task and flow executions. Use the builder pattern methods (AddTask, AddFlow, etc.) to configure, then call Start(ctx) to begin processing.
func (*Client) Publish ¶ added in v0.0.3
func (c *Client) Publish(ctx context.Context, topic string, payload any, opts ...PublishOpts) (int, error)
Publish sends a message to all queues subscribed to the specified topic.
func (*Client) PublishMany ¶ added in v0.0.8
func (c *Client) PublishMany(ctx context.Context, topic string, payloads []any, opts ...PublishManyOpts) (int, error)
PublishMany sends multiple messages to all queues subscribed to the specified topic.
func (*Client) PurgeFlowRuns ¶ added in v0.0.8
func (c *Client) PurgeFlowRuns(ctx context.Context, flowName string, olderThan time.Duration) (int, error)
PurgeFlowRuns deletes terminal flow runs older than the given duration. See PurgeFlowRuns for details.
func (*Client) PurgeTaskRuns ¶ added in v0.0.8
func (c *Client) PurgeTaskRuns(ctx context.Context, taskName string, olderThan time.Duration) (int, error)
PurgeTaskRuns deletes terminal task runs older than the given duration. See PurgeTaskRuns for details.
func (*Client) Read ¶
func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func (*Client) ReadPoll ¶
func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.
func (*Client) RunFlow ¶
func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func (*Client) RunTask ¶
func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
func (*Client) SendMany ¶ added in v0.0.8
func (c *Client) SendMany(ctx context.Context, queueName string, payloads []any, opts ...SendManyOpts) error
SendMany enqueues multiple messages to the specified queue.
func (*Client) SignalFlow ¶
func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal). Returns an error if the signal was already delivered or the step doesn't require a signal.
type Conn ¶
type Conn interface {
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
Query(context.Context, string, ...any) (pgx.Rows, error)
QueryRow(context.Context, string, ...any) pgx.Row
}
Conn is an interface for database connections compatible with pgx.Conn and pgx.Pool
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
func (*Flow) OnFail ¶ added in v0.0.8
func (f *Flow) OnFail(fn any, opts ...HandlerOpt) *Flow
OnFail sets a flow failure handler and execution options. fn must have signature (context.Context, In, FlowFailure) error. If opts is omitted, defaults are used (concurrency: 4, batchSize: 64, timeout: 30s, maxRetries: 2 with full-jitter backoff 100ms-2s).
func (*Flow) OutputPriority ¶ added in v0.0.8
func (*Flow) RetentionPeriod ¶ added in v0.0.8
RetentionPeriod sets how long terminal runs (completed, failed, canceled) are retained before being deleted by the garbage collector. A zero duration disables cleanup.
func (*Flow) WithDescription ¶ added in v0.0.8
type FlowFailure ¶ added in v0.0.8
type FlowFailure struct {
FlowName string `json:"flow_name"`
FlowRunID int64 `json:"flow_run_id"`
FailedStepName string `json:"failed_step_name,omitempty"`
ErrorMessage string `json:"error_message"`
Attempts int `json:"attempts"`
OnFailAttempts int `json:"on_fail_attempts"`
StartedAt time.Time `json:"started_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
FailedStepInput json.RawMessage `json:"failed_step_input,omitempty"`
FailedStepSignalInput json.RawMessage `json:"failed_step_signal_input,omitempty"`
// contains filtered or unexported fields
}
func (FlowFailure) FailedStepInputAs ¶ added in v0.0.8
func (f FlowFailure) FailedStepInputAs(out any) error
func (FlowFailure) FailedStepSignalAs ¶ added in v0.0.8
func (f FlowFailure) FailedStepSignalAs(out any) error
func (FlowFailure) Output ¶ added in v0.0.8
func (f FlowFailure) Output(ctx context.Context, step string) (json.RawMessage, error)
type FlowHandle ¶ added in v0.0.6
FlowHandle is a handle to a flow execution.
func RunFlow ¶
func RunFlow(ctx context.Context, conn Conn, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func (*FlowHandle) WaitForOutput ¶ added in v0.0.6
WaitForOutput blocks until the flow execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.
type FlowInfo ¶
type FlowInfo struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Steps []StepInfo `json:"steps"`
OutputPriority []string `json:"output_priority,omitempty"`
RetentionPeriod time.Duration `json:"retention_period,omitzero"`
CreatedAt time.Time `json:"created_at"`
}
type FlowRunInfo ¶ added in v0.0.6
type FlowRunInfo struct {
ID int64 `json:"id"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Status string `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Headers json.RawMessage `json:"headers,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
CancelReason string `json:"cancel_reason,omitempty"`
CancelRequestedAt time.Time `json:"cancel_requested_at,omitzero"`
CanceledAt time.Time `json:"canceled_at,omitzero"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
}
FlowRunInfo represents the details of a flow execution.
func GetFlowRun ¶
func GetFlowRun(ctx context.Context, conn Conn, flowName string, flowRunID int64) (*FlowRunInfo, error)
GetFlowRun retrieves a specific flow run result by ID.
func ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func (*FlowRunInfo) IsCompleted ¶ added in v0.0.8
func (r *FlowRunInfo) IsCompleted() bool
IsCompleted reports whether the flow run completed successfully.
func (*FlowRunInfo) IsDone ¶ added in v0.0.8
func (r *FlowRunInfo) IsDone() bool
IsDone reports whether the flow run reached a terminal state.
func (*FlowRunInfo) OutputAs ¶ added in v0.0.6
func (r *FlowRunInfo) OutputAs(out any) error
OutputAs unmarshals the output of a completed flow run. Returns an error if the flow run has failed or is not completed yet.
type FlowScheduleInfo ¶ added in v0.0.5
type FlowScheduleInfo struct {
FlowName string `json:"flow_name"`
CronSpec string `json:"cron_spec"`
NextRunAt time.Time `json:"next_run_at"`
LastRunAt time.Time `json:"last_run_at,omitzero"`
LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
FlowScheduleInfo contains metadata about a scheduled flow.
func ListFlowSchedules ¶ added in v0.0.5
func ListFlowSchedules(ctx context.Context, conn Conn) ([]*FlowScheduleInfo, error)
ListFlowSchedules returns all flow schedules ordered by next_run_at.
type FullJitterBackoff ¶ added in v0.0.3
FullJitterBackoff implements exponential backoff with full jitter.
func NewFullJitterBackoff ¶ added in v0.0.3
func NewFullJitterBackoff(minDelay, maxDelay time.Duration) *FullJitterBackoff
NewFullJitterBackoff creates a FullJitterBackoff with the provided bounds.
func (*FullJitterBackoff) NextDelay ¶ added in v0.0.3
func (b *FullJitterBackoff) NextDelay(deliveryCount int) time.Duration
NextDelay returns the jittered delay for the given delivery count. deliveryCount is expected to be zero-based for the first retry.
func (*FullJitterBackoff) Validate ¶ added in v0.0.3
func (b *FullJitterBackoff) Validate() error
Validate checks the backoff configuration for consistency.
type GCInfo ¶ added in v0.0.8
type GCInfo struct {
ExpiredQueuesDeleted int `json:"expired_queues_deleted"`
StaleWorkersDeleted int `json:"stale_workers_deleted"`
TaskRunsPurged int `json:"task_runs_purged"`
FlowRunsPurged int `json:"flow_runs_purged"`
}
GCInfo is the garbage collection report returned by cb_gc().
type HandlerOpt ¶
type HandlerOpt func(*handlerOpts)
func WithBatchSize ¶
func WithBatchSize(batchSize int) HandlerOpt
WithBatchSize sets how many claims are fetched per poll.
func WithCircuitBreaker ¶
func WithCircuitBreaker(failureThreshold int, openTimeout time.Duration) HandlerOpt
WithCircuitBreaker sets optional circuit breaker strategy.
func WithConcurrency ¶
func WithConcurrency(concurrency int) HandlerOpt
WithConcurrency sets maximum concurrent handler executions.
func WithFullJitterBackoff ¶ added in v0.0.8
func WithFullJitterBackoff(minDelay, maxDelay time.Duration) HandlerOpt
WithFullJitterBackoff sets full-jitter retry backoff strategy.
func WithMaxRetries ¶
func WithMaxRetries(maxRetries int) HandlerOpt
WithMaxRetries sets retry attempts for handler failures.
func WithTimeout ¶ added in v0.0.8
func WithTimeout(timeout time.Duration) HandlerOpt
WithTimeout sets per-handler execution timeout.
type Message ¶
type Message struct {
ID int64 `json:"id"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Topic string `json:"topic"`
Payload json.RawMessage `json:"payload"`
Headers json.RawMessage `json:"headers,omitempty"`
Deliveries int `json:"deliveries"`
CreatedAt time.Time `json:"created_at"`
VisibleAt time.Time `json:"visible_at"`
}
Message represents a message in a queue
func Read ¶
func Read(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func ReadPoll ¶
func ReadPoll(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached. Pass optional ReadPollOpts to configure polling behavior; defaults are used when omitted.
type PublishManyOpts ¶ added in v0.0.8
type PublishOpts ¶ added in v0.0.3
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at,omitzero"`
}
type ReadPollOpts ¶ added in v0.0.6
ReadPollOpts configures ReadPoll polling behavior. Zero values use defaults.
type RunFlowOpts ¶
type RunTaskOpts ¶ added in v0.0.6
type ScheduleOpt ¶
type ScheduleOpt func(*scheduleOpts)
ScheduleOpt configures scheduled task/flow behavior.
func WithInput ¶
func WithInput(input any) ScheduleOpt
WithInput sets static input payload for scheduled task/flow runs.
type SendManyOpts ¶ added in v0.0.8
type Step ¶
type Step struct {
// contains filtered or unexported fields
}
func (*Step) MapFlowInput ¶ added in v0.0.8
func (*Step) MapStepOutput ¶ added in v0.0.8
func (*Step) ReduceStep ¶ added in v0.0.8
func (*Step) WithCondition ¶ added in v0.0.8
func (*Step) WithDescription ¶ added in v0.0.8
func (*Step) WithSignal ¶ added in v0.0.8
type StepDependencyInfo ¶
type StepDependencyInfo struct {
Name string `json:"name"`
}
type StepHandlerInfo ¶
type StepInfo ¶
type StepInfo struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
StepType StepType `json:"step_type,omitempty"`
MapSourceStepName string `json:"map_source_step_name,omitempty"`
ReduceSourceStepName string `json:"reduce_source_step_name,omitempty"`
Signal bool `json:"signal,omitempty"`
DependsOn []StepDependencyInfo `json:"depends_on,omitempty"`
}
type StepRunInfo ¶
type StepRunInfo struct {
ID int64 `json:"id"`
StepName string `json:"step_name"`
Status string `json:"status"`
Attempts int `json:"attempts"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
CreatedAt time.Time `json:"created_at,omitzero"`
VisibleAt time.Time `json:"visible_at,omitzero"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
CanceledAt time.Time `json:"canceled_at,omitzero"`
}
StepRunInfo represents the execution state of a single step within a flow run.
func GetFlowRunSteps ¶
func GetFlowRunSteps(ctx context.Context, conn Conn, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
func GetStep ¶ added in v0.0.8
func GetStep(ctx context.Context, stepName string) (*StepRunInfo, error)
GetStep retrieves status details for a step in the current flow run. Intended for use inside flow step handlers.
func WaitForStep ¶ added in v0.0.8
WaitForStep blocks until the given step reaches a terminal state in the current flow run. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.
func (*StepRunInfo) IsCompleted ¶ added in v0.0.8
func (r *StepRunInfo) IsCompleted() bool
IsCompleted reports whether the step run completed successfully.
func (*StepRunInfo) IsDone ¶ added in v0.0.8
func (r *StepRunInfo) IsDone() bool
IsDone reports whether the step run reached a terminal state.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is a reflection-based task with optional handler. Use NewTask().Do(fn, opts) for tasks with handlers. Use NewTask() for definition-only tasks.
func NewTask ¶
NewTask creates a new task definition with the given name. Chain .Do() to add a handler, otherwise returns a definition-only task.
func (*Task) Do ¶ added in v0.0.8
func (t *Task) Do(fn any, opts ...HandlerOpt) *Task
Do sets the task handler function and execution options. fn must have signature (context.Context, In) (Out, error). If opts is omitted, defaults are used (concurrency: 4, batchSize: 64, timeout: 30s, maxRetries: 2 with full-jitter backoff 100ms-2s).
func (*Task) OnFail ¶ added in v0.0.8
func (t *Task) OnFail(fn any, opts ...HandlerOpt) *Task
OnFail sets a task failure handler and execution options. fn must have signature (context.Context, In, TaskFailure) error. If opts is omitted, defaults are used (concurrency: 4, batchSize: 64, timeout: 30s, maxRetries: 2 with full-jitter backoff 100ms-2s).
func (*Task) RetentionPeriod ¶ added in v0.0.8
RetentionPeriod sets how long terminal runs (completed, failed, skipped, canceled) are retained before being deleted by the garbage collector. A zero duration disables cleanup.
func (*Task) WithCondition ¶ added in v0.0.8
WithCondition sets the condition expression for the task.
func (*Task) WithDescription ¶ added in v0.0.8
WithDescription sets the description for the task definition.
type TaskFailure ¶ added in v0.0.8
type TaskFailure struct {
TaskName string `json:"task_name"`
TaskRunID int64 `json:"task_run_id"`
ErrorMessage string `json:"error_message"`
Attempts int `json:"attempts"`
OnFailAttempts int `json:"on_fail_attempts"`
StartedAt time.Time `json:"started_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
}
type TaskHandle ¶ added in v0.0.6
TaskHandle is a handle to a task execution.
func RunTask ¶
func RunTask(ctx context.Context, conn Conn, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
func (*TaskHandle) WaitForOutput ¶ added in v0.0.6
WaitForOutput blocks until the task execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.
type TaskHandlerInfo ¶
type TaskHandlerInfo struct {
TaskName string `json:"task_name"`
}
type TaskInfo ¶
type TaskInfo struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
RetentionPeriod time.Duration `json:"retention_period,omitzero"`
CreatedAt time.Time `json:"created_at"`
}
type TaskRunInfo ¶ added in v0.0.6
type TaskRunInfo struct {
ID int64 `json:"id"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
Status string `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Headers json.RawMessage `json:"headers,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
CancelReason string `json:"cancel_reason,omitempty"`
CancelRequestedAt time.Time `json:"cancel_requested_at,omitzero"`
CanceledAt time.Time `json:"canceled_at,omitzero"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
TaskRunInfo represents the details of a task execution.
func GetTaskRun ¶
func GetTaskRun(ctx context.Context, conn Conn, taskName string, taskRunID int64) (*TaskRunInfo, error)
GetTaskRun retrieves a specific task run result by ID.
func ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
func (*TaskRunInfo) IsCompleted ¶ added in v0.0.8
func (r *TaskRunInfo) IsCompleted() bool
IsCompleted reports whether the task run completed successfully.
func (*TaskRunInfo) IsDone ¶ added in v0.0.8
func (r *TaskRunInfo) IsDone() bool
IsDone reports whether the task run reached a terminal state.
func (*TaskRunInfo) OutputAs ¶ added in v0.0.6
func (r *TaskRunInfo) OutputAs(out any) error
OutputAs unmarshals the output of a completed task run. Returns an error if the task run has failed or is not completed yet.
type TaskScheduleInfo ¶ added in v0.0.5
type TaskScheduleInfo struct {
TaskName string `json:"task_name"`
CronSpec string `json:"cron_spec"`
NextRunAt time.Time `json:"next_run_at"`
LastRunAt time.Time `json:"last_run_at,omitzero"`
LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
TaskScheduleInfo contains metadata about a scheduled task.
func ListTaskSchedules ¶ added in v0.0.5
func ListTaskSchedules(ctx context.Context, conn Conn) ([]*TaskScheduleInfo, error)
ListTaskSchedules returns all task schedules ordered by next_run_at.
type WaitOpts ¶ added in v0.0.6
WaitOpts configures WaitForOutput polling behavior. Zero values use defaults.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes tasks and flows from the queue
func NewWorker ¶
NewWorker creates a new worker with the given connection. Use builder methods (AddTask, AddFlow, etc.) to configure the worker. Call Start(ctx) to begin processing tasks and flows.
func (*Worker) Start ¶
Start begins processing tasks and flows.
The worker will:
- poll for new work and execute task and flow step handlers while ctx is active
- run any configured cron-style task and flow schedules
- send periodic heartbeats while it is running
- register built-in garbage collection task running every 5 minutes
Shutdown behaviour:
- when ctx is cancelled the worker immediately stops reading new work and begins shutting down
- if WithShutdownTimeout is set to a value > 0, that duration is used as a grace period for in‑flight handlers after ctx is cancelled; once the grace period expires the handler context is cancelled and remaining handlers are asked to stop. The default graceful shutdown timeout is 5 seconds.
- if WithShutdownTimeout is not set or set to 0, there is no grace period: the handler context is cancelled immediately once ctx is cancelled and Start returns after all goroutines finish
func (*Worker) WithLogger ¶ added in v0.0.10
WithLogger sets the worker logger.
type WorkerInfo ¶
type WorkerInfo struct {
ID string `json:"id"`
TaskHandlers []*TaskHandlerInfo `json:"task_handlers"`
StepHandlers []*StepHandlerInfo `json:"step_handlers"`
StartedAt time.Time `json:"started_at"`
LastHeartbeatAt time.Time `json:"last_heartbeat_at"`
}
func ListWorkers ¶
func ListWorkers(ctx context.Context, conn Conn) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.