Documentation
¶
Index ¶
- Variables
- func GetAttempt(ctx context.Context) int
- func GetErrorClassString(errClass ErrorClassification) string
- func GetHeaders(ctx context.Context) map[string]string
- func GetJobID(ctx context.Context) string
- func GetMaxRetry(ctx context.Context) int
- func GetNodeID(ctx context.Context) string
- func GetRetryAfter(err error) time.Duration
- func GetRunID(ctx context.Context) string
- func IsPanicError(err error) bool
- func ReportProgress(ctx context.Context, data any) error
- func ValidJobTransition(from, to JobStatus) bool
- func ValidRunTransition(from, to RunStatus) bool
- func WithAttempt(ctx context.Context, attempt int) context.Context
- func WithHeaders(ctx context.Context, headers map[string]string) context.Context
- func WithJobID(ctx context.Context, id string) context.Context
- func WithMaxRetry(ctx context.Context, max int) context.Context
- func WithNodeID(ctx context.Context, id string) context.Context
- func WithPriority(ctx context.Context, p Priority) context.Context
- func WithProgressReporter(ctx context.Context, fn ProgressReporterFunc) context.Context
- func WithRunID(ctx context.Context, id string) context.Context
- func WithTaskType(ctx context.Context, tt TaskType) context.Context
- type AggregateResult
- type AtTime
- type BatchDefinition
- type BatchFailurePolicy
- type BatchInstance
- type BatchItem
- type BatchItemResult
- type BatchItemState
- type BatchOnetimeState
- type BatchProgress
- type ConditionResult
- type Duration
- type EnqueueGroupOption
- type ErrorClassification
- type EventType
- type GroupAggregator
- type GroupAggregatorFunc
- type GroupConfig
- type GroupHandler
- type GroupMessage
- type HandlerDefinition
- type HandlerFunc
- type HandlerFuncWithResult
- func ChainMiddlewareWithResult(handler HandlerFuncWithResult, middlewares ...MiddlewareWithResultFunc) HandlerFuncWithResult
- func TypedConditionHandler[T any](fn func(ctx context.Context, payload T) (uint, error)) HandlerFuncWithResult
- func TypedHandlerWithResult[T any, R any](fn func(ctx context.Context, payload T) (R, error)) HandlerFuncWithResult
- func TypedHandlerWithUpstreamResult[U any, T any, R any](fn func(ctx context.Context, upstream U, original T) (R, error)) HandlerFuncWithResult
- func TypedSubflowHandler[T any](fn func(ctx context.Context, payload T) ([]WorkflowTask, error)) HandlerFuncWithResult
- type HookFunc
- type Hooks
- type Job
- type JobEvent
- type JobRun
- type JobStatus
- type MiddlewareFunc
- type MiddlewareWithResultFunc
- type NodeInfo
- type NonRetryableError
- type OverlapPolicy
- type PanicError
- type PauseError
- type PeriodicTaskConfig
- type PeriodicTaskConfigProvider
- type PeriodicTaskConfigProviderFunc
- type PoolStats
- type Precondition
- type Priority
- type PriorityTier
- type ProgressReporterFunc
- type RateLimitedError
- type RedisOptions
- type RedisSentinelConfig
- type RepeatError
- type RetryPolicy
- type RetryableError
- type RunStatus
- type Schedule
- type ScheduleEntry
- type ScheduleType
- type SkipError
- type SubflowResult
- type TaskType
- type UpstreamPayload
- type WorkMessage
- type WorkResult
- type WorkflowDefinition
- type WorkflowHookDef
- type WorkflowHooks
- type WorkflowInstance
- type WorkflowSignal
- type WorkflowStatus
- type WorkflowTask
- type WorkflowTaskState
- type WorkflowTaskType
Constants ¶
This section is empty.
Variables ¶
var ( ErrJobNotFound = errors.New("dureq: job not found") ErrScheduleNotFound = errors.New("dureq: schedule not found") ErrRunNotFound = errors.New("dureq: run not found") ErrDuplicateJob = errors.New("dureq: duplicate unique key") ErrHandlerNotFound = errors.New("dureq: handler not registered for task type") ErrNotLeader = errors.New("dureq: not the leader node") ErrLockFailed = errors.New("dureq: failed to acquire lock") ErrLockNotHeld = errors.New("dureq: lock not held") ErrServerStopped = errors.New("dureq: server stopped") ErrInvalidSchedule = errors.New("dureq: invalid schedule") ErrInvalidJobStatus = errors.New("dureq: invalid job status transition") ErrRedisNotConnected = errors.New("dureq: Redis not connected") ErrWorkflowNotFound = errors.New("dureq: workflow not found") ErrCyclicDependency = errors.New("dureq: workflow has cyclic dependencies") ErrBatchNotFound = errors.New("dureq: batch not found") ErrExecutionTimedOut = errors.New("dureq: execution timed out") ErrScheduleToStartTimeout = errors.New("dureq: schedule-to-start timeout exceeded") ErrNoProgressReporter = errors.New("dureq: no progress reporter in context (called outside handler?)") )
Sentinel errors.
Functions ¶
func GetAttempt ¶
GetAttempt extracts the attempt number from the context.
func GetErrorClassString ¶
func GetErrorClassString(errClass ErrorClassification) string
GetErrorClassString returns the human-readable string representation of an ErrorClassification.
func GetHeaders ¶
GetHeaders extracts the headers map from the context.
func GetMaxRetry ¶
GetMaxRetry extracts the max retry count from the context.
func GetRetryAfter ¶
GetRetryAfter extracts the RetryAfter duration from a RateLimitedError.
func IsPanicError ¶
IsPanicError checks whether err (or any wrapped error) is a PanicError.
func ReportProgress ¶
ReportProgress reports user-defined progress data for the current run. The data is stored alongside the run's heartbeat and is accessible via the monitoring API and client SDK.
func ValidJobTransition ¶ added in v0.1.10
ValidJobTransition returns true if transitioning from → to is allowed.
func ValidRunTransition ¶ added in v0.1.10
ValidRunTransition returns true if transitioning from → to is allowed.
func WithAttempt ¶
WithAttempt stores the attempt number in the context.
func WithHeaders ¶
WithHeaders stores the headers map in the context.
func WithMaxRetry ¶
WithMaxRetry stores the max retry count in the context.
func WithNodeID ¶
WithNodeID stores the node ID in the context.
func WithPriority ¶
WithPriority stores the priority in the context.
func WithProgressReporter ¶
func WithProgressReporter(ctx context.Context, fn ProgressReporterFunc) context.Context
WithProgressReporter stores the progress reporter function in the context.
Types ¶
type AggregateResult ¶
type AggregateResult struct {
Group string `json:"group"`
Count int `json:"count"`
AggregatedPayload json.RawMessage `json:"aggregated_payload"`
}
AggregateResult is the outcome of flushing a group.
type AtTime ¶
type AtTime struct {
Hour uint `json:"hour"`
Minute uint `json:"minute"`
Second uint `json:"second"`
}
AtTime represents a time of day as hour, minute, second.
type BatchDefinition ¶
type BatchDefinition struct {
Name string `json:"name"`
// OnetimeTaskType is the handler for shared preprocessing (runs once before items).
OnetimeTaskType *TaskType `json:"onetime_task_type,omitempty"`
OnetimePayload json.RawMessage `json:"onetime_payload,omitempty"`
// ItemTaskType is the handler that processes each batch item.
ItemTaskType TaskType `json:"item_task_type"`
Items []BatchItem `json:"items"`
// FailurePolicy controls partial failure behavior. Default: continue_on_error.
FailurePolicy BatchFailurePolicy `json:"failure_policy,omitempty"`
// ChunkSize controls how many items are dispatched concurrently. Default: 100.
ChunkSize int `json:"chunk_size,omitempty"`
// ItemRetryPolicy for individual item failures.
ItemRetryPolicy *RetryPolicy `json:"item_retry_policy,omitempty"`
// Execution timeout for the entire batch.
ExecutionTimeout *Duration `json:"execution_timeout,omitempty"`
// RetryPolicy for retrying the entire batch on failure.
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
// DefaultPriority for all jobs created by this batch.
DefaultPriority *Priority `json:"default_priority,omitempty"`
}
BatchDefinition describes a batch processing job to submit.
type BatchFailurePolicy ¶
type BatchFailurePolicy string
BatchFailurePolicy controls behavior when a batch item fails.
const ( // BatchFailFast stops the batch immediately on first item failure. BatchFailFast BatchFailurePolicy = "fail_fast" // BatchContinueOnError continues processing remaining items. BatchContinueOnError BatchFailurePolicy = "continue_on_error" )
type BatchInstance ¶
type BatchInstance struct {
ID string `json:"id"`
Name string `json:"name"`
Status WorkflowStatus `json:"status"`
Definition BatchDefinition `json:"definition"`
// Onetime preprocessing state.
OnetimeState *BatchOnetimeState `json:"onetime_state,omitempty"`
// Per-item tracking.
ItemStates map[string]BatchItemState `json:"item_states"`
// Progress counters.
TotalItems int `json:"total_items"`
CompletedItems int `json:"completed_items"`
FailedItems int `json:"failed_items"`
RunningItems int `json:"running_items"`
PendingItems int `json:"pending_items"`
// Chunk tracking — index into Definition.Items for the next chunk.
NextChunkIndex int `json:"next_chunk_index"`
// Timeout and retry tracking.
Deadline *time.Time `json:"deadline,omitempty"`
Attempt int `json:"attempt,omitempty"`
MaxAttempts int `json:"max_attempts,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
BatchInstance is the runtime state of a batch being processed.
type BatchItem ¶
type BatchItem struct {
ID string `json:"id"`
Payload json.RawMessage `json:"payload"`
}
BatchItem is a single item within a batch.
type BatchItemResult ¶
type BatchItemResult struct {
BatchID string `json:"batch_id"`
ItemID string `json:"item_id"`
Success bool `json:"success"`
Output json.RawMessage `json:"output,omitempty"`
Error *string `json:"error,omitempty"`
}
BatchItemResult stores the output of a single batch item execution.
type BatchItemState ¶
type BatchItemState struct {
ItemID string `json:"item_id"`
JobID string `json:"job_id,omitempty"`
Status JobStatus `json:"status"`
Error *string `json:"error,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
}
BatchItemState tracks an individual item within a batch.
type BatchOnetimeState ¶
type BatchOnetimeState struct {
JobID string `json:"job_id,omitempty"`
Status JobStatus `json:"status"`
ResultData json.RawMessage `json:"result_data,omitempty"`
Error *string `json:"error,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
}
BatchOnetimeState tracks the shared preprocessing step.
type BatchProgress ¶
type BatchProgress struct {
BatchID string `json:"batch_id"`
Done int `json:"done"`
Total int `json:"total"`
Failed int `json:"failed"`
Step string `json:"step,omitempty"`
}
BatchProgress is published as an event for progress tracking.
type ConditionResult ¶ added in v0.1.10
type ConditionResult struct {
Route uint `json:"route"`
}
ConditionResult wraps the route index chosen by a condition handler. The orchestrator reads this from the job result to determine branching.
type Duration ¶
Duration is a JSON-friendly time.Duration.
func (Duration) MarshalJSON ¶
func (*Duration) UnmarshalJSON ¶
type EnqueueGroupOption ¶
type EnqueueGroupOption struct {
Group string // the group name (required)
TaskType TaskType
Payload json.RawMessage
}
EnqueueGroupOption configures a grouped task enqueue.
type ErrorClassification ¶
type ErrorClassification int
ErrorClassification categorizes errors for retry decisions.
const ( ErrorClassRetryable ErrorClassification = iota // transient, should retry ErrorClassNonRetryable // permanent, do not retry ErrorClassRateLimited // retry after specific delay )
const ( ErrorClassSkip ErrorClassification = iota + 10 // skip execution, no state change ErrorClassRepeat // save and re-enqueue immediately ErrorClassPause // pause the job )
func ClassifyControlFlow ¶ added in v0.1.10
func ClassifyControlFlow(err error) ErrorClassification
ClassifyControlFlow returns the extended classification including control flow errors. Returns ErrorClassRetryable for standard errors.
func ClassifyError ¶
func ClassifyError(err error) ErrorClassification
ClassifyError determines the retry classification of an error.
type EventType ¶
type EventType string
EventType identifies the kind of event published on the DUREQ_EVENTS stream.
const ( EventJobEnqueued EventType = "job.enqueued" EventJobScheduled EventType = "job.scheduled" EventJobDispatched EventType = "job.dispatched" EventJobStarted EventType = "job.started" EventJobCompleted EventType = "job.completed" EventJobFailed EventType = "job.failed" EventJobRetrying EventType = "job.retrying" EventJobDead EventType = "job.dead" EventJobCancelled EventType = "job.cancelled" EventJobPaused EventType = "job.paused" EventJobResumed EventType = "job.resumed" EventNodeJoined EventType = "node.joined" EventNodeLeft EventType = "node.left" EventLeaderElected EventType = "leader.elected" EventLeaderLost EventType = "leader.lost" EventScheduleCreated EventType = "schedule.created" EventScheduleRemoved EventType = "schedule.removed" EventWorkflowStarted EventType = "workflow.started" EventWorkflowCompleted EventType = "workflow.completed" EventWorkflowFailed EventType = "workflow.failed" EventWorkflowCancelled EventType = "workflow.cancelled" EventWorkflowTaskDispatched EventType = "workflow.task.dispatched" EventWorkflowTaskCompleted EventType = "workflow.task.completed" EventWorkflowTaskFailed EventType = "workflow.task.failed" EventBatchStarted EventType = "batch.started" EventBatchCompleted EventType = "batch.completed" EventBatchFailed EventType = "batch.failed" EventBatchCancelled EventType = "batch.cancelled" EventBatchOnetimeCompleted EventType = "batch.onetime.completed" EventBatchOnetimeFailed EventType = "batch.onetime.failed" EventBatchItemCompleted EventType = "batch.item.completed" EventBatchItemFailed EventType = "batch.item.failed" EventBatchProgress EventType = "batch.progress" EventWorkflowTimedOut EventType = "workflow.timed_out" EventWorkflowRetrying EventType = "workflow.retrying" EventBatchTimedOut EventType = "batch.timed_out" EventBatchRetrying EventType = "batch.retrying" EventJobScheduleToStartTimeout EventType = "job.schedule_to_start_timeout" EventNodeCrashDetected EventType = "node.crash_detected" EventJobAutoRecovered EventType = "job.auto_recovered" EventWorkflowSignalReceived EventType = "workflow.signal.received" EventWorkflowTaskPanicked EventType = "workflow.task.panicked" EventWorkflowSuspended EventType = "workflow.suspended" EventWorkflowResumed EventType = "workflow.resumed" EventWorkflowHookDispatched EventType = "workflow.hook.dispatched" EventWorkflowHookCompleted EventType = "workflow.hook.completed" EventWorkflowHookFailed EventType = "workflow.hook.failed" )
type GroupAggregator ¶
type GroupAggregator interface {
// Aggregate combines multiple task payloads into a single payload.
// The group name and list of payloads are provided.
Aggregate(group string, payloads []json.RawMessage) (json.RawMessage, error)
}
GroupAggregator defines how a set of grouped tasks should be combined into a single aggregated task for processing.
type GroupAggregatorFunc ¶
type GroupAggregatorFunc func(group string, payloads []json.RawMessage) (json.RawMessage, error)
GroupAggregatorFunc is a convenience adapter for GroupAggregator.
func (GroupAggregatorFunc) Aggregate ¶
func (f GroupAggregatorFunc) Aggregate(group string, payloads []json.RawMessage) (json.RawMessage, error)
type GroupConfig ¶
type GroupConfig struct {
// Aggregator combines grouped tasks into one before processing.
Aggregator GroupAggregator
// GracePeriod is how long to wait for additional tasks after the
// first task arrives in a group. Default: 5s.
GracePeriod time.Duration
// MaxDelay is the maximum time a group can wait before being
// flushed, regardless of GracePeriod. Default: 30s.
MaxDelay time.Duration
// MaxSize is the maximum number of tasks in a group before
// it is flushed immediately. Default: 100.
MaxSize int
}
GroupConfig configures task aggregation for a server.
type GroupHandler ¶
GroupHandler processes an aggregated group of tasks.
type GroupMessage ¶
type GroupMessage struct {
JobID string `json:"job_id"`
TaskType TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload"`
AddedAt time.Time `json:"added_at"`
}
GroupMessage is stored in the group set pending aggregation.
type HandlerDefinition ¶
type HandlerDefinition struct {
TaskType TaskType
Handler HandlerFunc // original handler (no result)
HandlerWithResult HandlerFuncWithResult // handler that returns result data (optional)
Concurrency int // max concurrent per node (0 = pool default)
Timeout time.Duration // per-execution timeout (0 = no timeout)
RetryPolicy *RetryPolicy // default retry (overridable per-job)
Middlewares []MiddlewareFunc // per-handler middleware (applied after global)
// Version identifies the handler build for safe deployments.
// When set, the worker will skip messages dispatched with a different version
// and re-enqueue them for a matching worker to pick up.
Version string
}
HandlerDefinition describes a handler registered on the server.
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, payload json.RawMessage) error
HandlerFunc is the function signature server handlers must implement.
func ChainMiddleware ¶
func ChainMiddleware(handler HandlerFunc, middlewares ...MiddlewareFunc) HandlerFunc
ChainMiddleware applies middlewares to a HandlerFunc in order. The first middleware in the list is the outermost (executed first).
func TypedHandler ¶ added in v0.1.10
func TypedHandler[T any](fn func(ctx context.Context, payload T) error) HandlerFunc
TypedHandler wraps a type-safe handler function into a HandlerFunc. The payload is automatically unmarshalled from JSON into T.
srv.RegisterHandler(types.HandlerDefinition{
TaskType: "order.process",
Handler: types.TypedHandler(func(ctx context.Context, order OrderPayload) error {
// order is already unmarshalled
return nil
}),
})
func TypedHandlerWithUpstream ¶ added in v0.1.10
func TypedHandlerWithUpstream[U any, T any](fn func(ctx context.Context, upstream U, original T) error) HandlerFunc
TypedHandlerWithUpstream wraps a handler that receives both the upstream task's result and this task's original payload. Use with WorkflowTask.ResultFrom to enable automatic data flow between workflow tasks.
srv.RegisterHandler(types.HandlerDefinition{
TaskType: "process.order",
Handler: types.TypedHandlerWithUpstream(func(ctx context.Context, upstream ValidateResult, original OrderPayload) error {
// upstream is the result from the task specified in ResultFrom
// original is this task's own payload
return nil
}),
})
type HandlerFuncWithResult ¶
type HandlerFuncWithResult func(ctx context.Context, payload json.RawMessage) (json.RawMessage, error)
HandlerFuncWithResult is an extended handler that returns output data.
func ChainMiddlewareWithResult ¶
func ChainMiddlewareWithResult(handler HandlerFuncWithResult, middlewares ...MiddlewareWithResultFunc) HandlerFuncWithResult
ChainMiddlewareWithResult applies middlewares to a HandlerFuncWithResult.
func TypedConditionHandler ¶ added in v0.1.10
func TypedConditionHandler[T any](fn func(ctx context.Context, payload T) (uint, error)) HandlerFuncWithResult
TypedConditionHandler wraps a type-safe condition handler that returns a route index. The route index maps to ConditionRoutes in WorkflowTask to determine the next branch.
srv.RegisterHandler(types.HandlerDefinition{
TaskType: "check.threshold",
HandlerWithResult: types.TypedConditionHandler(func(ctx context.Context, p ThresholdPayload) (uint, error) {
if p.Value > 100 { return 0, nil } // route 0: high
return 1, nil // route 1: low
}),
})
func TypedHandlerWithResult ¶ added in v0.1.10
func TypedHandlerWithResult[T any, R any](fn func(ctx context.Context, payload T) (R, error)) HandlerFuncWithResult
TypedHandlerWithResult wraps a type-safe handler that returns output data. The payload is unmarshalled from JSON into T, and the output R is marshalled back.
srv.RegisterHandler(types.HandlerDefinition{
TaskType: "order.process",
HandlerWithResult: types.TypedHandlerWithResult(func(ctx context.Context, order OrderPayload) (OrderResult, error) {
return OrderResult{Status: "ok"}, nil
}),
})
func TypedHandlerWithUpstreamResult ¶ added in v0.1.10
func TypedHandlerWithUpstreamResult[U any, T any, R any](fn func(ctx context.Context, upstream U, original T) (R, error)) HandlerFuncWithResult
TypedHandlerWithUpstreamResult wraps a handler that receives upstream result + original payload and returns output data. Combines ResultFrom piping with result production.
func TypedSubflowHandler ¶ added in v0.1.10
func TypedSubflowHandler[T any](fn func(ctx context.Context, payload T) ([]WorkflowTask, error)) HandlerFuncWithResult
TypedSubflowHandler wraps a handler that dynamically generates workflow tasks at runtime. The returned tasks are injected into the workflow instance and dispatched by the orchestrator.
srv.RegisterHandler(types.HandlerDefinition{
TaskType: "generate.subtasks",
HandlerWithResult: types.TypedSubflowHandler(func(ctx context.Context, p Input) ([]types.WorkflowTask, error) {
return []types.WorkflowTask{...}, nil
}),
})
type HookFunc ¶ added in v0.1.10
HookFunc is a callback invoked when a lifecycle event occurs. The context carries the same metadata available in handler contexts (job ID, run ID, etc. via GetJobID, GetRunID, etc.).
type Hooks ¶ added in v0.1.10
type Hooks struct {
OnJobCompleted []HookFunc
OnJobFailed []HookFunc
OnJobDead []HookFunc
OnJobPaused []HookFunc
OnJobResumed []HookFunc
OnJobCancelled []HookFunc
OnWorkflowCompleted []HookFunc
OnWorkflowFailed []HookFunc
OnBatchCompleted []HookFunc
OnBatchFailed []HookFunc
}
Hooks holds registered lifecycle callbacks. All hooks are optional and invoked asynchronously (non-blocking).
type Job ¶
type Job struct {
ID string `json:"id"`
TaskType TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload"`
Schedule Schedule `json:"schedule"`
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
// State
Status JobStatus `json:"status"`
Attempt int `json:"attempt"`
ConsecutiveErrors int `json:"consecutive_errors,omitempty"`
LastError *string `json:"last_error,omitempty"`
LastRunAt *time.Time `json:"last_run_at,omitempty"`
NextRunAt *time.Time `json:"next_run_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
PausedAt *time.Time `json:"paused_at,omitempty"`
ResumeAt *time.Time `json:"resume_at,omitempty"` // auto-resume time (nil = manual only)
// Metadata
Tags []string `json:"tags,omitempty"`
Headers map[string]string `json:"headers,omitempty"` // key-value metadata
UniqueKey *string `json:"unique_key,omitempty"`
DLQAfter *int `json:"dlq_after,omitempty"` // move to DLQ after N failures
// Workflow association (set when this job is part of a workflow).
WorkflowID *string `json:"workflow_id,omitempty"`
WorkflowTask *string `json:"workflow_task,omitempty"`
// Batch association (set when this job is part of a batch).
BatchID *string `json:"batch_id,omitempty"`
BatchItem *string `json:"batch_item,omitempty"` // item ID within batch
BatchRole *string `json:"batch_role,omitempty"` // "onetime" or "item"
// Priority and queue policies.
Priority *Priority `json:"priority,omitempty"`
ScheduleToStartTimeout *Duration `json:"schedule_to_start_timeout,omitempty"`
// HeartbeatTimeout overrides the default 30s stale age for orphan detection.
// If set, a run is considered orphaned if no heartbeat is received within
// this duration.
HeartbeatTimeout *Duration `json:"heartbeat_timeout,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
Job is the persisted entity representing a registered job with its schedule.
type JobEvent ¶
type JobEvent struct {
Type EventType `json:"type"`
JobID string `json:"job_id,omitempty"`
RunID string `json:"run_id,omitempty"`
NodeID string `json:"node_id,omitempty"`
TaskType TaskType `json:"task_type,omitempty"`
Error *string `json:"error,omitempty"`
Attempt int `json:"attempt,omitempty"`
Timestamp time.Time `json:"timestamp"`
BatchProgress *BatchProgress `json:"batch_progress,omitempty"`
AffectedRunIDs []string `json:"affected_run_ids,omitempty"`
// WorkflowID is set for task-level workflow events so they can be indexed
// under the parent workflow rather than the task's job ID.
WorkflowID string `json:"workflow_id,omitempty"`
}
JobEvent is the envelope published to the DUREQ_EVENTS stream.
type JobRun ¶
type JobRun struct {
ID string `json:"id"`
JobID string `json:"job_id"`
NodeID string `json:"node_id"`
Status RunStatus `json:"status"`
Attempt int `json:"attempt"`
Error *string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
LastHeartbeatAt *time.Time `json:"last_heartbeat_at,omitempty"`
Progress json.RawMessage `json:"progress,omitempty"` // user-reported progress data
}
JobRun represents an individual execution of a job.
type JobStatus ¶
type JobStatus string
JobStatus represents the lifecycle state of a job definition.
const ( JobStatusPending JobStatus = "pending" JobStatusScheduled JobStatus = "scheduled" JobStatusRunning JobStatus = "running" JobStatusCompleted JobStatus = "completed" JobStatusFailed JobStatus = "failed" JobStatusRetrying JobStatus = "retrying" JobStatusPaused JobStatus = "paused" JobStatusDead JobStatus = "dead" JobStatusCancelled JobStatus = "cancelled" )
func (JobStatus) IsPaused ¶ added in v0.1.10
IsPaused returns true if the job is in the paused state.
func (JobStatus) IsTerminal ¶
type MiddlewareFunc ¶
type MiddlewareFunc func(HandlerFunc) HandlerFunc
MiddlewareFunc wraps a HandlerFunc, allowing pre/post processing.
type MiddlewareWithResultFunc ¶
type MiddlewareWithResultFunc func(HandlerFuncWithResult) HandlerFuncWithResult
MiddlewareWithResultFunc wraps a HandlerFuncWithResult.
func AdaptMiddleware ¶
func AdaptMiddleware(mw MiddlewareFunc) MiddlewareWithResultFunc
AdaptMiddleware converts a MiddlewareFunc for use with HandlerFuncWithResult. The adapted middleware wraps the result handler by converting it to/from a plain handler.
type NodeInfo ¶
type NodeInfo struct {
NodeID string `json:"node_id"`
Address string `json:"address,omitempty"`
TaskTypes []string `json:"task_types"`
StartedAt time.Time `json:"started_at"`
LastHeartbeat time.Time `json:"last_heartbeat"`
PoolStats *PoolStats `json:"pool_stats,omitempty"`
ActiveRunIDs []string `json:"active_run_ids,omitempty"`
// HandlerVersions maps task type → handler version for safe deployment routing.
HandlerVersions map[string]string `json:"handler_versions,omitempty"`
}
NodeInfo is registered by each node in the dureq_nodes KV bucket.
type NonRetryableError ¶
type NonRetryableError struct{ Err error }
NonRetryableError wraps an error as non-retryable.
func (*NonRetryableError) Error ¶
func (e *NonRetryableError) Error() string
func (*NonRetryableError) Unwrap ¶
func (e *NonRetryableError) Unwrap() error
type OverlapPolicy ¶
type OverlapPolicy string
OverlapPolicy controls behavior when a scheduled job fires while a previous execution is still running.
const ( // OverlapAllowAll dispatches regardless of active runs (default). OverlapAllowAll OverlapPolicy = "ALLOW_ALL" // OverlapSkip skips the dispatch if any run for this job is active. OverlapSkip OverlapPolicy = "SKIP" // OverlapBufferOne buffers at most one pending dispatch while a run is active. OverlapBufferOne OverlapPolicy = "BUFFER_ONE" // OverlapBufferAll buffers all pending dispatches while a run is active. OverlapBufferAll OverlapPolicy = "BUFFER_ALL" // OverlapReplace cancels the current active run and starts a new one. OverlapReplace OverlapPolicy = "REPLACE" )
type PanicError ¶
type PanicError struct {
Value interface{}
Stacktrace string
}
PanicError wraps a recovered panic value as an error.
func GetPanicError ¶
func GetPanicError(err error) *PanicError
GetPanicError extracts the PanicError from an error chain, if present.
func (*PanicError) Error ¶
func (e *PanicError) Error() string
type PauseError ¶ added in v0.1.10
PauseError signals that the handler wants to pause the job. The job transitions to paused status and can be resumed manually or auto-resumed after RetryAfter duration.
func (*PauseError) Error ¶ added in v0.1.10
func (e *PauseError) Error() string
type PeriodicTaskConfig ¶
type PeriodicTaskConfig struct {
// CronExpr is the cron expression (e.g., "*/5 * * * *").
CronExpr string `json:"cron_expr"`
// TaskType is the handler to invoke.
TaskType TaskType `json:"task_type"`
// Payload is the task payload.
Payload json.RawMessage `json:"payload,omitempty"`
// UniqueKey prevents duplicate schedules for the same logical task.
UniqueKey string `json:"unique_key,omitempty"`
}
PeriodicTaskConfig describes a single periodic task to be managed.
type PeriodicTaskConfigProvider ¶
type PeriodicTaskConfigProvider interface {
GetConfigs() ([]*PeriodicTaskConfig, error)
}
PeriodicTaskConfigProvider is an interface that returns the current set of periodic tasks. The PeriodicTaskManager calls GetConfigs periodically and syncs the returned tasks with the scheduler — adding new ones, removing stale ones, and updating changed ones.
type PeriodicTaskConfigProviderFunc ¶
type PeriodicTaskConfigProviderFunc func() ([]*PeriodicTaskConfig, error)
PeriodicTaskConfigProviderFunc is a convenience adapter.
func (PeriodicTaskConfigProviderFunc) GetConfigs ¶
func (f PeriodicTaskConfigProviderFunc) GetConfigs() ([]*PeriodicTaskConfig, error)
type PoolStats ¶
type PoolStats struct {
RunningWorkers int `json:"running_workers"`
IdleWorkers int `json:"idle_workers"`
MaxConcurrency int `json:"max_concurrency"`
TotalSubmitted uint64 `json:"total_submitted"`
TotalCompleted uint64 `json:"total_completed"`
TotalFailed uint64 `json:"total_failed"`
QueueLength int `json:"queue_length"`
}
PoolStats captures worker pool metrics for a node.
type Precondition ¶ added in v0.1.10
type Precondition struct {
Type string `json:"type"` // "upstream_output"
Task string `json:"task,omitempty"` // upstream task name
Path string `json:"path"` // JSONPath in upstream output
Expected string `json:"expected"` // expected value (string comparison)
}
Precondition defines a condition that must be true before a task is dispatched.
type Priority ¶
type Priority int
Priority represents the execution priority of a job (1-10).
func GetPriority ¶
GetPriority extracts the priority from the context.
type PriorityTier ¶
type PriorityTier string
PriorityTier maps a numeric priority to a Redis Stream tier.
const ( TierHigh PriorityTier = "high" TierNormal PriorityTier = "normal" TierLow PriorityTier = "low" )
func TierForPriority ¶
func TierForPriority(p Priority) PriorityTier
TierForPriority returns the Redis Stream tier for a given priority value.
type ProgressReporterFunc ¶
type ProgressReporterFunc func(ctx context.Context, data json.RawMessage) error
ProgressReporterFunc is the callback stored in context for reporting progress.
type RateLimitedError ¶
RateLimitedError wraps an error with a retry-after duration.
func (*RateLimitedError) Error ¶
func (e *RateLimitedError) Error() string
func (*RateLimitedError) Unwrap ¶
func (e *RateLimitedError) Unwrap() error
type RedisOptions ¶
type RedisOptions struct {
// URL is the Redis server URL (e.g., "redis://localhost:6379", "rediss://host:6380/0").
URL string
// Username for Redis ACL authentication (Redis 6+). Leave empty for legacy auth.
Username string
// Password for Redis authentication. Overridden if set in URL.
Password string
// DB is the Redis database number. Default: 0.
DB int
// PoolSize is the maximum number of connections in the pool. Default: 10.
PoolSize int
// TLSConfig enables TLS for the connection. Set to non-nil to enable.
TLSConfig *tls.Config
// SentinelConfig enables Redis Sentinel failover mode.
// When set, URL is ignored and Sentinel is used instead.
SentinelConfig *RedisSentinelConfig
// ClusterAddrs enables Redis Cluster mode.
// When set, URL is ignored and a cluster client is created.
ClusterAddrs []string
}
RedisOptions holds connection parameters for the Redis client.
type RedisSentinelConfig ¶
type RedisSentinelConfig struct {
// MasterName is the name of the master as configured in Sentinel.
MasterName string
// SentinelAddrs is the list of Sentinel node addresses (host:port).
SentinelAddrs []string
// SentinelPassword is the password for Sentinel nodes (if different from master password).
SentinelPassword string
}
RedisSentinelConfig holds configuration for Redis Sentinel failover.
type RepeatError ¶ added in v0.1.10
RepeatError signals that the handler wants to save current progress and immediately re-enqueue the job for another execution. This is useful for long-running tasks that want to checkpoint periodically.
func (*RepeatError) Error ¶ added in v0.1.10
func (e *RepeatError) Error() string
func (*RepeatError) Unwrap ¶ added in v0.1.10
func (e *RepeatError) Unwrap() error
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int `json:"max_attempts"`
InitialDelay time.Duration `json:"initial_delay"`
MaxDelay time.Duration `json:"max_delay"`
Multiplier float64 `json:"multiplier"`
Jitter float64 `json:"jitter"` // 0.0 - 1.0
// NonRetryableErrors is a list of error message substrings that should not be retried.
// If a task fails with an error containing any of these strings, it is immediately
// moved to dead status without retrying.
NonRetryableErrors []string `json:"non_retryable_errors,omitempty"`
// PauseAfterErrCount pauses the job instead of moving to dead when
// N consecutive errors are reached. 0 = disabled (go directly to dead).
// When set, the job transitions to "paused" instead of "dead" after
// this many consecutive failures, giving operators a chance to investigate.
PauseAfterErrCount int `json:"pause_after_err_count,omitempty"`
// PauseRetryDelay is the duration after which a paused job is automatically
// resumed. 0 = manual resume only.
PauseRetryDelay time.Duration `json:"pause_retry_delay,omitempty"`
}
RetryPolicy describes retry behavior.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy.
type RetryableError ¶
type RetryableError struct{ Err error }
RetryableError wraps an error as retryable.
func (*RetryableError) Error ¶
func (e *RetryableError) Error() string
func (*RetryableError) Unwrap ¶
func (e *RetryableError) Unwrap() error
type RunStatus ¶
type RunStatus string
RunStatus represents the state of an individual job execution.
func (RunStatus) IsTerminal ¶
type Schedule ¶
type Schedule struct {
Type ScheduleType `json:"type"`
// ONE_TIME: run once at this time
RunAt *time.Time `json:"run_at,omitempty"`
// DURATION: repeat at fixed interval
Interval *Duration `json:"interval,omitempty"`
// CRON: cron expression
CronExpr *string `json:"cron_expr,omitempty"`
// DAILY/WEEKLY/MONTHLY
RegularInterval *uint `json:"regular_interval,omitempty"` // every N days/weeks/months
AtTimes []AtTime `json:"at_times,omitempty"` // times of day
IncludedDays []int `json:"included_days,omitempty"` // weekdays (0-6) or month days (1-31)
// Common boundaries
StartsAt *time.Time `json:"starts_at,omitempty"`
EndsAt *time.Time `json:"ends_at,omitempty"`
Timezone string `json:"timezone,omitempty"`
// OverlapPolicy controls what happens when a new occurrence fires
// while a previous run is still active. Only for recurring schedules.
OverlapPolicy OverlapPolicy `json:"overlap_policy,omitempty"`
// CatchupWindow defines the maximum age of a missed firing that should
// be backfilled. Zero means no backfill (only the latest due firing).
CatchupWindow *Duration `json:"catchup_window,omitempty"`
// MaxBackfillPerTick limits how many missed firings are enqueued in a
// single scheduler tick to prevent flooding. Default: 10 if CatchupWindow is set.
MaxBackfillPerTick *int `json:"max_backfill_per_tick,omitempty"`
// Jitter adds a random offset to the computed next run time.
// This prevents thundering herd when many jobs share the same schedule.
// E.g., Jitter of 30s means the firing time is offset by rand(0, 30s).
Jitter *Duration `json:"jitter,omitempty"`
}
Schedule defines "when" a job should run.
type ScheduleEntry ¶
type ScheduleEntry struct {
JobID string `json:"job_id"`
NextRunAt time.Time `json:"next_run_at"`
Schedule Schedule `json:"schedule"`
LastProcessedAt *time.Time `json:"last_processed_at,omitempty"` // last successful processing time
}
ScheduleEntry is the KV entry the leader's scheduler scans for due jobs.
type ScheduleType ¶
type ScheduleType string
ScheduleType defines how a job is scheduled.
const ( ScheduleImmediate ScheduleType = "IMMEDIATE" ScheduleOneTime ScheduleType = "ONE_TIME" ScheduleDuration ScheduleType = "DURATION" ScheduleCron ScheduleType = "CRON" ScheduleDaily ScheduleType = "DAILY" ScheduleWeekly ScheduleType = "WEEKLY" ScheduleMonthly ScheduleType = "MONTHLY" )
type SkipError ¶ added in v0.1.10
type SkipError struct {
Reason string
}
SkipError signals that the handler wants to skip this execution without changing the job status. Useful when the handler determines that preconditions are not met and the job should simply be ignored for this run.
type SubflowResult ¶ added in v0.1.10
type SubflowResult struct {
Tasks []WorkflowTask `json:"tasks"`
}
SubflowResult wraps dynamically generated tasks from a subflow handler.
type TaskType ¶
type TaskType string
TaskType identifies a registered handler. This is the "how" — what function to call when a job of this type fires.
func GetTaskType ¶
GetTaskType extracts the task type from the context.
type UpstreamPayload ¶ added in v0.1.10
type UpstreamPayload[T any] struct { Upstream json.RawMessage `json:"upstream"` Original T `json:"original"` }
UpstreamPayload wraps an upstream task's result alongside the original task payload. Used with ResultFrom to automatically pipe upstream outputs to downstream tasks.
type WorkMessage ¶
type WorkMessage struct {
RunID string `json:"run_id"`
JobID string `json:"job_id"`
TaskType TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload"`
Attempt int `json:"attempt"`
Deadline time.Time `json:"deadline"`
Metadata map[string]string `json:"metadata,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
Priority Priority `json:"priority,omitempty"`
DispatchedAt time.Time `json:"dispatched_at,omitempty"`
// Version is the handler version this message was dispatched with.
// Workers with a mismatched version will skip and re-enqueue the message.
Version string `json:"version,omitempty"`
}
WorkMessage is the message published to the DUREQ_WORK stream.
type WorkResult ¶
type WorkResult struct {
RunID string `json:"run_id"`
JobID string `json:"job_id"`
Success bool `json:"success"`
Error *string `json:"error,omitempty"`
Output json.RawMessage `json:"output,omitempty"` // handler output data
}
WorkResult is the outcome of executing a work message.
type WorkflowDefinition ¶
type WorkflowDefinition struct {
Name string `json:"name"`
Tasks []WorkflowTask `json:"tasks"`
ExecutionTimeout *Duration `json:"execution_timeout,omitempty"`
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
DefaultPriority *Priority `json:"default_priority,omitempty"`
Hooks *WorkflowHooks `json:"hooks,omitempty"`
}
WorkflowDefinition describes a DAG of tasks to execute.
type WorkflowHookDef ¶ added in v0.1.10
type WorkflowHookDef struct {
TaskType TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload,omitempty"`
Timeout Duration `json:"timeout,omitempty"`
}
WorkflowHookDef defines a single lifecycle hook job.
type WorkflowHooks ¶ added in v0.1.10
type WorkflowHooks struct {
OnInit *WorkflowHookDef `json:"on_init,omitempty"`
OnSuccess *WorkflowHookDef `json:"on_success,omitempty"`
OnFailure *WorkflowHookDef `json:"on_failure,omitempty"`
OnExit *WorkflowHookDef `json:"on_exit,omitempty"`
}
WorkflowHooks defines lifecycle hooks that are automatically dispatched at specific workflow lifecycle events.
type WorkflowInstance ¶
type WorkflowInstance struct {
ID string `json:"id"`
WorkflowName string `json:"workflow_name"`
Status WorkflowStatus `json:"status"`
Tasks map[string]WorkflowTaskState `json:"tasks"`
Definition WorkflowDefinition `json:"definition"`
Input json.RawMessage `json:"input,omitempty"`
Deadline *time.Time `json:"deadline,omitempty"`
Attempt int `json:"attempt,omitempty"`
MaxAttempts int `json:"max_attempts,omitempty"`
// PendingDeps tracks the number of unresolved dependencies per task.
// Initialized at workflow start, decremented on dependency completion.
// A task with PendingDeps[name] == 0 and status == pending is ready.
PendingDeps map[string]int `json:"pending_deps,omitempty"`
// ConditionIterations tracks how many times each condition node has been
// evaluated, to enforce MaxIterations limits.
ConditionIterations map[string]int `json:"condition_iterations,omitempty"`
// ParentWorkflowID links this workflow to a parent workflow that spawned it
// as a child workflow task.
ParentWorkflowID *string `json:"parent_workflow_id,omitempty"`
// ParentTaskName is the task name in the parent workflow that this child represents.
ParentTaskName *string `json:"parent_task_name,omitempty"`
// HookStates tracks the state of lifecycle hooks dispatched for this workflow.
HookStates map[string]WorkflowTaskState `json:"hook_states,omitempty"`
// Output is the result of the final (leaf) task, copied here when the
// workflow completes so callers can retrieve the workflow-level result.
Output json.RawMessage `json:"output,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
WorkflowInstance is a running instance of a workflow definition.
type WorkflowSignal ¶ added in v0.1.3
type WorkflowSignal struct {
Name string `json:"name"`
Payload json.RawMessage `json:"payload,omitempty"`
SentAt time.Time `json:"sent_at"`
WorkflowID string `json:"workflow_id"`
}
WorkflowSignal is an external signal sent to a running workflow.
type WorkflowStatus ¶
type WorkflowStatus string
WorkflowStatus represents the lifecycle state of a workflow instance.
const ( WorkflowStatusPending WorkflowStatus = "pending" WorkflowStatusRunning WorkflowStatus = "running" WorkflowStatusCompleted WorkflowStatus = "completed" WorkflowStatusFailed WorkflowStatus = "failed" WorkflowStatusCancelled WorkflowStatus = "cancelled" WorkflowStatusSuspended WorkflowStatus = "suspended" )
func (WorkflowStatus) IsTerminal ¶
func (s WorkflowStatus) IsTerminal() bool
func (WorkflowStatus) String ¶
func (s WorkflowStatus) String() string
type WorkflowTask ¶
type WorkflowTask struct {
Name string `json:"name"`
TaskType TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload,omitempty"`
DependsOn []string `json:"depends_on,omitempty"`
// Priority controls dispatch order among concurrently ready tasks.
// Higher values are dispatched first. Default is 0.
Priority int `json:"priority,omitempty"`
// AllowFailure makes this task's failure non-fatal for the workflow.
// When true, the task's failure is logged but does not block downstream
// tasks or cause the workflow to fail. Downstream dependencies are
// considered "satisfied" even if this task fails.
AllowFailure bool `json:"allow_failure,omitempty"`
// ResultFrom specifies which upstream task's output should be injected
// as this task's payload when dispatching. The referenced task must be
// in DependsOn. If set, the original Payload field is ignored and
// replaced with the upstream task's result at dispatch time.
ResultFrom string `json:"result_from,omitempty"`
// Type distinguishes static tasks, condition nodes, and subflow generators.
// Empty string (default) means a regular static task.
Type WorkflowTaskType `json:"type,omitempty"`
// ConditionRoutes maps condition handler return values (0, 1, 2, ...)
// to successor task names. Only used when Type == WorkflowTaskCondition.
// The condition handler returns a uint; the orchestrator looks up the
// corresponding task name and marks only that branch as "satisfied".
ConditionRoutes map[uint]string `json:"condition_routes,omitempty"`
// MaxIterations limits how many times a condition node can be re-evaluated
// (to prevent infinite loops). Default 0 means use system default (100).
MaxIterations int `json:"max_iterations,omitempty"`
// ChildWorkflowDef, when set, makes this task spawn a child workflow
// instead of dispatching a regular job. The child runs independently
// and the parent task completes when the child workflow completes.
ChildWorkflowDef *WorkflowDefinition `json:"child_workflow_def,omitempty"`
// Preconditions define conditions that must be true before this task is dispatched.
// If any precondition fails, the task is skipped.
Preconditions []Precondition `json:"preconditions,omitempty"`
}
WorkflowTask is one step in a workflow DAG. A task is either a regular task (TaskType set), a child workflow (ChildWorkflowDef set), a condition node, or a dynamic subflow generator.
type WorkflowTaskState ¶
type WorkflowTaskState struct {
Name string `json:"name"`
JobID string `json:"job_id,omitempty"`
Status JobStatus `json:"status"`
Error *string `json:"error,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
// ChildWorkflowID is set when this task spawned a child workflow.
ChildWorkflowID string `json:"child_workflow_id,omitempty"`
// ConditionRoute is set when a condition node completes, indicating
// which route index was selected by the handler.
ConditionRoute *uint `json:"condition_route,omitempty"`
// SubflowTasks lists the task names that were dynamically injected
// by a subflow generator node.
SubflowTasks []string `json:"subflow_tasks,omitempty"`
// Skipped indicates the task was skipped due to a failed precondition.
Skipped bool `json:"skipped,omitempty"`
SkipReason string `json:"skip_reason,omitempty"`
}
WorkflowTaskState tracks the state of one task within a workflow instance.
type WorkflowTaskType ¶ added in v0.1.10
type WorkflowTaskType string
WorkflowTaskType distinguishes static tasks, condition nodes, and subflow generators.
const ( // WorkflowTaskStatic is a regular task dispatched as a job (default). WorkflowTaskStatic WorkflowTaskType = "" // WorkflowTaskCondition is a condition node whose handler returns a route index. // The orchestrator evaluates the condition and branches to the corresponding successor. WorkflowTaskCondition WorkflowTaskType = "condition" // WorkflowTaskSubflow is a dynamic subflow node whose handler returns []WorkflowTask // at runtime. The orchestrator injects these tasks into the workflow instance. WorkflowTaskSubflow WorkflowTaskType = "subflow" )