durex

package module
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2026 License: MIT Imports: 16 Imported by: 0

README

Durex

Go Reference Go Report Card

Temporal for the rest of us. Durable background jobs for Go — no Redis, no Kafka, just Go.

Durex is a lightweight, embeddable task queue with persistence, automatic retries, workflows, and a built-in dashboard. Start with SQLite, scale with PostgreSQL.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Your App   │────▶│   Durex     │────▶│  SQLite or  │
│             │     │  Executor   │     │  PostgreSQL │
└─────────────┘     └──────┬──────┘     └─────────────┘
                           │
              ┌────────────┼────────────┐
              ▼            ▼            ▼
         ┌────────┐   ┌────────┐   ┌────────┐
         │Worker 1│   │Worker 2│   │Worker N│
         └────────┘   └────────┘   └────────┘
executor := durex.New(storage.NewMemory(), durex.WithDashboard(":8080"))
executor.HandleFunc("sendEmail", sendEmailHandler, durex.Retries(3))
executor.Start(ctx)

executor.Add(ctx, durex.Spec{Name: "sendEmail", Data: durex.M{"to": "user@example.com"}})

Why Durex?

Most teams face a choice: simple queues (Asynq, River) that lack workflows, or Temporal which is powerful but complex. Durex gives you 80% of Temporal's features with 20% of the complexity.

Durex Asynq River Temporal
Zero infrastructure ✅ SQLite/Postgres ❌ Redis ✅ Postgres ❌ Server cluster
Embedded dashboard ✅ Built-in ❌ Separate ❌ Separate ✅ Built-in
Workflow sequences
Saga pattern
Dead Letter Queue
Prometheus metrics
Multi-instance safe
Learning curve Low Low Low High
Time to first job 5 min 15 min 15 min 1+ hour
Choose Durex when you need:
  • Workflows without complexity — Sequences, sagas, fan-out/fan-in without learning a new paradigm
  • Zero infrastructure — No Redis/Kafka to deploy; SQLite for dev, Postgres for prod
  • Embeddable library — Ships as a Go package, not a separate service
  • Built-in observability — Dashboard, health checks, Prometheus metrics out of the box
When to consider alternatives:
  • Need Redis → Use Asynq (mature, battle-tested)
  • Already on Postgres, want simple → Use River (newer, focused)
  • Complex long-running workflows → Use Temporal (more powerful, more complex)
  • Need cron scheduling → Durex doesn't have cron expressions yet (coming soon)

Features

Core
Feature Description
Persistent Jobs Commands survive restarts — pick up where you left off
Automatic Retries Exponential backoff, jitter, configurable per command
Workflows Chain commands with Sequence, pass data between steps
Saga Pattern Compensation handlers for failed workflows (OnRecover)
Type Safety Generic typed handlers with HandleTyped[T]
Reliability
Feature Description
Multi-Instance Safe PostgreSQL row-level locking (FOR UPDATE SKIP LOCKED)
Panic Recovery Workers survive panics, commands marked as failed
Stuck Command Recovery Auto-detect and retry commands stuck in STARTED
Dead Letter Queue Inspect and replay failed commands
Execution Timeouts Cancel long-running handlers with context
Observability
Feature Description
Web Dashboard Built-in UI with retry/cancel actions
Prometheus Metrics Counters, histograms, gauges for all operations
Health Endpoint /api/health for load balancers
Tracing Trace and correlation IDs across command chains
Control
Feature Description
Rate Limiting Per-command and global concurrency limits
Deduplication Unique keys prevent duplicate jobs
Deadlines Time-bound execution with expiration handlers
Cancellation Cancel by ID or tag

Architecture

graph TB
    subgraph Your Application
        A[App Code]
    end
    
    subgraph Durex
        B[Executor]
        C[Command Registry]
        D[Worker Pool]
    end
    
    subgraph Storage Backends
        E[(PostgreSQL)]
        F[(SQLite)]
        G[(Memory)]
    end
    
    A -->|Add/HandleFunc| B
    B --> C
    B --> D
    D -->|Execute| C
    B <-->|Persist/Fetch| E
    B <-->|Persist/Fetch| F
    B <-->|Persist/Fetch| G

Installation

go get github.com/simonovic86/durex

Quick Start

package main

import (
    "context"
    "github.com/simonovic86/durex"
    "github.com/simonovic86/durex/storage"
)

func main() {
    // Create executor
    executor := durex.New(storage.NewMemory())

    // Register a command - just a function!
    executor.HandleFunc("greet", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
        name := cmd.GetString("name")
        fmt.Printf("Hello, %s!\n", name)
        return durex.Empty(), nil
    })

    // Start processing
    executor.Start(context.Background())
    defer executor.Stop()

    // Add a command
    executor.Add(ctx, durex.Spec{
        Name: "greet",
        Data: durex.M{"name": "World"},
    })
}

Three Ways to Create Commands

// Basic - just a function
executor.HandleFunc("sendEmail", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
    to := cmd.GetString("to")
    return mailer.Send(to), nil
})

// With options
executor.HandleFunc("sendEmail", sendEmailFn,
    durex.Retries(3),
    durex.OnRecover(handleFailure),
    durex.OnExpired(handleTimeout),
)
2. Typed Function (Type-Safe Data)
type EmailData struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
}

// No more GetString() - data is typed!
durex.HandleTyped(executor, "sendEmail", func(ctx context.Context, data EmailData, cmd *durex.Instance) (durex.Result, error) {
    return mailer.Send(data.To, data.Subject), nil
}, durex.WithRetries[EmailData](3))

// Add with typed data
executor.Add(ctx, durex.Typed("sendEmail", EmailData{
    To:      "user@example.com",
    Subject: "Welcome!",
}))
3. Struct (When You Need Dependencies)
type SendEmailCommand struct {
    durex.BaseCommand
    mailer *MailService
}

func (c *SendEmailCommand) Name() string { return "sendEmail" }

func (c *SendEmailCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
    return c.mailer.Send(cmd.GetString("to")), nil
}

executor.Register(&SendEmailCommand{mailer: mailerService})

Command Results

Result Description
durex.Empty() Done, no follow-up
durex.Repeat() Run again after Period
durex.Retry() Retry immediately
durex.Next(spec) Spawn one command
durex.Spawn(specs...) Spawn multiple commands
Command Lifecycle
stateDiagram-v2
    [*] --> Pending: Add()
    Pending --> Running: Worker picks up
    Running --> Completed: Success
    Running --> Failed: Error (retries exhausted)
    Running --> Pending: Retry / Repeat
    Running --> Expired: Deadline exceeded
    Expired --> [*]: OnExpired handler
    Failed --> [*]: OnRecover handler
    Completed --> [*]

Workflows (Command Chaining)

sequenceDiagram
    participant App
    participant Executor
    participant step1
    participant step2
    participant step3
    
    App->>Executor: Add(Spec with Sequence)
    Executor->>step1: Execute
    Note right of step1: Set("validated", true)
    step1-->>Executor: ContinueSequence()
    Executor->>step2: Execute (data passed)
    Note right of step2: GetBool("validated")
    step2-->>Executor: ContinueSequence()
    Executor->>step3: Execute
    step3-->>Executor: Empty()
// Register steps
executor.HandleFunc("step1", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
    cmd.Set("validated", true)  // Pass data to next step
    return cmd.ContinueSequence(nil), nil
})

executor.HandleFunc("step2", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
    validated := cmd.GetBool("validated")  // Receive data from previous step
    return cmd.ContinueSequence(nil), nil
})

// Execute workflow: step1 → step2 → step3
executor.Add(ctx, durex.Spec{
    Name:     "step1",
    Sequence: []string{"step2", "step3"},
    Data:     durex.M{"orderId": "123"},
})

Repeating Commands (Cron-like)

executor.HandleFunc("cleanup", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
    // cleanup logic...
    return durex.Repeat(), nil  // Run again after period
}, durex.Period(time.Hour))

Error Recovery (Saga Pattern)

graph TD
    A[processPayment] -->|retries exhausted| B[OnRecover Handler]
    B -->|Spawn| C[refundPayment]
    B -->|Spawn| D[releaseInventory]
    B -->|Spawn| E[notifyCustomer]
    
    style A fill:#ff6b6b,color:#fff
    style B fill:#ffd93d,color:#333
    style C fill:#6bcb77,color:#fff
    style D fill:#6bcb77,color:#fff
    style E fill:#6bcb77,color:#fff
executor.HandleFunc("processPayment", processPayment,
    durex.Retries(3),
    durex.OnRecover(func(ctx context.Context, cmd *durex.Instance, err error) (durex.Result, error) {
        // Payment failed - spawn compensation commands
        return durex.Spawn(
            durex.Spec{Name: "refundPayment", Data: cmd.Data},
            durex.Spec{Name: "releaseInventory", Data: cmd.Data},
            durex.Spec{Name: "notifyCustomer", Data: durex.M{"error": err.Error()}},
        ), nil
    }),
)

Delayed Execution

executor.Add(ctx, durex.Spec{
    Name:  "sendReminder",
    Delay: 24 * time.Hour,  // Run tomorrow
    Data:  durex.M{"userId": "123"},
})

Deadlines

executor.HandleFunc("timeoutTask", taskFn,
    durex.Deadline(5*time.Minute),
    durex.OnExpired(func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
        log.Println("Task timed out!")
        return durex.Empty(), nil
    }),
)

Execution Timeouts

Limit how long each command execution can take. Unlike deadlines (which prevent starting after a time), timeouts cancel long-running executions:

// Per-command timeout
executor.Add(ctx, durex.Spec{
    Name:    "slowTask",
    Timeout: 30 * time.Second,  // Cancel if takes longer than 30s
})

// Default timeout for all commands
executor := durex.New(store,
    durex.WithDefaultTimeout(time.Minute),
)

// Command-level override
executor.Add(ctx, durex.Spec{
    Name:    "quickTask",
    Timeout: 5 * time.Second,  // Override default
})

When a timeout occurs:

  • The context passed to your handler is cancelled
  • The command is marked as failed
  • Retries are attempted if configured
  • Your handler should check ctx.Done() for graceful cancellation

Storage Backends

// In-memory (development/testing)
store := storage.NewMemory()

// SQLite (single instance)
store, _ := storage.OpenSQLite("commands.db")
store.Migrate(ctx)

// PostgreSQL (production)
db, _ := sql.Open("postgres", "postgres://...")
store := storage.NewPostgres(db)
store.Migrate(ctx)

Configuration

executor := durex.New(store,
    durex.WithParallelism(8),              // Worker count
    durex.WithDefaultRetries(3),           // Default retries
    durex.WithDefaultTimeout(30*time.Second), // Default execution timeout
    durex.WithCleanupInterval(time.Hour),  // Auto-cleanup
    durex.WithGracefulShutdown(30*time.Second),
    durex.WithDashboard(":8080"),          // Enable web dashboard
    durex.WithDeadLetterQueue(),           // Enable DLQ for failed commands
    durex.WithMiddleware(loggingMiddleware),
    durex.WithBackoff(durex.DefaultExponentialBackoff()), // Retry backoff
    durex.WithRateLimit("sendEmail", 10),  // Max 10 concurrent emails
    durex.WithGlobalRateLimit(100),        // Max 100 total concurrent
    durex.WithStuckCommandRecovery(time.Minute, 5*time.Minute), // Recover stuck commands
)

Backoff Strategies

Control retry timing with configurable backoff:

// Exponential backoff with jitter (recommended for production)
executor := durex.New(store,
    durex.WithBackoff(durex.DefaultExponentialBackoff()),
)

// Custom exponential: 1s → 2s → 4s → 8s... (max 5 min)
executor := durex.New(store,
    durex.WithBackoff(durex.ExponentialBackoff{
        InitialDelay: time.Second,
        MaxDelay:     5 * time.Minute,
        Multiplier:   2.0,
    }),
)

// Add jitter to prevent thundering herd
executor := durex.New(store,
    durex.WithBackoff(durex.JitteredBackoff{
        Strategy:   durex.ExponentialBackoff{InitialDelay: time.Second},
        JitterRate: 0.1, // ±10% randomness
    }),
)

Available strategies:

  • NoBackoff() - Immediate retry (default)
  • ConstantBackoff{Delay: 5*time.Second} - Fixed delay
  • LinearBackoff{InitialDelay: time.Second, MaxDelay: time.Minute} - Linear increase
  • ExponentialBackoff{...} - Exponential increase
  • JitteredBackoff{...} - Wrap any strategy with randomness

Rate Limiting

Control concurrent command execution to prevent overwhelming external services:

executor := durex.New(store,
    durex.WithRateLimit("sendEmail", 10),    // Max 10 concurrent emails
    durex.WithRateLimit("apiCall", 5),       // Max 5 concurrent API calls
    durex.WithGlobalRateLimit(100),          // Max 100 total concurrent
)

Commands will wait for a slot to become available before executing.

Deduplication (Unique Keys)

Prevent duplicate commands from running simultaneously:

// Only one active "welcome email to user123" can exist
executor.Add(ctx, durex.Spec{
    Name:      "sendEmail",
    UniqueKey: "welcome-email:user123",
    Data:      durex.M{"to": "user@example.com"},
})

// Attempting to add another with same key returns ErrDuplicateCommand
_, err := executor.Add(ctx, durex.Spec{
    Name:      "sendEmail",
    UniqueKey: "welcome-email:user123",
})
// err == durex.ErrDuplicateCommand

Use unique keys for:

  • Preventing duplicate notifications
  • Ensuring idempotent operations
  • Rate limiting per-entity (e.g., one sync per user)

Tracing & Correlation

Track related commands across workflows:

// Set trace/correlation IDs on the root command
executor.Add(ctx, durex.Spec{
    Name:          "processOrder",
    TraceID:       "trace-abc123",      // From your tracing system
    CorrelationID: "order-456",         // Links all related commands
    Sequence:      []string{"chargePayment", "shipOrder", "sendConfirmation"},
})

// Access in your command handler
executor.HandleFunc("chargePayment", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
    log.Printf("[%s] Processing payment for correlation: %s", 
        cmd.TraceID, cmd.CorrelationID)
    
    // IDs automatically propagate to child commands
    return cmd.ContinueSequence(nil), nil
})

// Query all commands in a workflow
commands, _ := store.FindByCorrelationID(ctx, "order-456")

Middleware

func loggingMiddleware(ctx durex.MiddlewareContext, next func() (durex.Result, error)) (durex.Result, error) {
    start := time.Now()
    result, err := next()
    slog.Info("Command executed", "name", ctx.Command.Name, "duration", time.Since(start))
    return result, err
}

Web Dashboard

Durex includes a built-in real-time monitoring dashboard with zero external dependencies:

// Recommended: enable via option (auto-starts with executor)
executor := durex.New(store,
    durex.WithDashboard(":8080"),
)

// Or start manually
go executor.ServeDashboard(":8080")

// Or integrate with existing server
http.Handle("/durex/", http.StripPrefix("/durex", executor.DashboardHandler()))

The dashboard shows:

  • Live command counts (pending, completed, failed)
  • Rate limit utilization
  • Recent commands table with status, attempts, timing
  • Auto-refreshes every 2 seconds

Multi-Instance Deployment

For horizontal scaling, use PostgreSQL with row-level locking. Durex automatically detects LockingStorage and uses FOR UPDATE SKIP LOCKED to prevent multiple instances from claiming the same command:

// PostgreSQL storage automatically enables locking mode
db, _ := sql.Open("postgres", "postgres://...")
store := storage.NewPostgres(db)
store.Migrate(ctx)

executor := durex.New(store,
    durex.WithParallelism(8),
    durex.WithPollInterval(500*time.Millisecond),  // How often to poll for work
    durex.WithClaimBatchSize(20),                   // Commands claimed per poll
)

This enables safe deployment of multiple executor instances behind a load balancer.

Reliability Features

Panic Recovery

Durex automatically recovers from panics in command handlers. Workers continue processing other commands:

executor.HandleFunc("riskyTask", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
    panic("something went wrong")  // Worker survives this!
})

When a panic occurs:

  • The panic is logged with command details
  • The command is marked as FAILED with the panic message
  • The error handler is called (if configured)
  • Workers continue processing other commands
Stuck Command Recovery

Commands can get stuck in STARTED status if a worker crashes or the process restarts. Enable automatic recovery:

executor := durex.New(store,
    durex.WithStuckCommandRecovery(
        time.Minute,      // Check every minute
        5*time.Minute,    // Commands stuck >5 min are recovered
    ),
)

Recovered commands are reset to PENDING and re-executed.

Error Handling

Global error handler for all command failures:

executor := durex.New(store,
    durex.WithErrorHandler(func(cmd *durex.Instance, err error) {
        slog.Error("Command failed",
            "id", cmd.ID,
            "name", cmd.Name,
            "error", err,
        )
        // Send to error tracking service, etc.
    }),
)
Dead Letter Queue

Enable DLQ to preserve failed commands for inspection and replay:

executor := durex.New(store,
    durex.WithDeadLetterQueue(),
)

// Later, inspect failed commands
deadLettered, _ := executor.FindDeadLettered(ctx)

// Replay a specific command
executor.ReplayFromDLQ(ctx, "cmd_abc123")

// Purge old dead-lettered commands
purged, _ := executor.PurgeDLQ(ctx, 7*24*time.Hour) // Older than 7 days
Command Cancellation

Cancel pending commands programmatically:

// Cancel a specific command
executor.Cancel(ctx, "cmd_abc123")

// Cancel all commands with a tag (requires QueryableStorage)
cancelled, _ := executor.CancelByTag(ctx, "batch-123")
Health Endpoint

The dashboard includes a health endpoint for load balancers:

GET /api/health

{
  "status": "healthy",
  "started": true,
  "storage_ok": true,
  "worker_count": 4,
  "queue_depth": 0,
  "timestamp": "2024-01-15T10:30:00Z"
}

Status values: healthy, degraded (shutting down), unhealthy (not started or storage error).

Execution History

Every command automatically tracks its execution history. Query it for debugging and auditing:

history, _ := executor.History(ctx, "cmd_abc123")
for _, event := range history {
    fmt.Printf("%s: %s (attempt %d)\n", event.Timestamp, event.Type, event.Attempt)
}
// Output:
// 2024-01-15 10:30:00: created (attempt 0)
// 2024-01-15 10:30:01: started (attempt 1)
// 2024-01-15 10:30:02: failed (attempt 1)
// 2024-01-15 10:30:03: started (attempt 2)
// 2024-01-15 10:30:04: completed (attempt 2)

Event types: created, started, completed, failed, retrying, expired, cancelled, repeating, recovered

The dashboard also exposes history via API:

GET /api/commands/history?id=cmd_abc123
Prometheus Metrics

Durex includes built-in Prometheus metrics support:

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// Create metrics collector
metrics := durex.NewPrometheusMetrics(prometheus.DefaultRegisterer)

// Use with executor
executor := durex.New(store,
    durex.WithMetrics(metrics),
)

// Expose metrics endpoint
http.Handle("/metrics", promhttp.Handler())

Exported metrics:

Metric Type Labels Description
durex_commands_started_total Counter command Total commands started
durex_commands_completed_total Counter command Total commands completed
durex_commands_failed_total Counter command Total commands failed
durex_commands_retried_total Counter command Total command retries
durex_command_duration_seconds Histogram command Command execution duration
durex_queue_size Gauge - Current queue size

Customize namespace and buckets:

metrics := durex.NewPrometheusMetrics(
    prometheus.DefaultRegisterer,
    durex.WithPrometheusNamespace("myapp"),
    durex.WithPrometheusSubsystem("jobs"),
    durex.WithPrometheusBuckets([]float64{0.01, 0.1, 0.5, 1, 5, 10}),
)

Try It (30 seconds)

# Clone and run
git clone https://github.com/simonovic86/durex.git
cd durex/examples/basic
go run main.go

# Open http://localhost:8080 to see the dashboard

Examples

Example Description
examples/basic Simple jobs, retries, repeating tasks, dashboard
examples/workflow E-commerce order flow with sequences and saga

Documentation

Contributing

Contributions welcome! Please open an issue first to discuss what you'd like to change.

License

MIT License - see LICENSE

Documentation

Overview

Package durex provides a durable execution framework for Go.

Durex is a production-grade command pattern implementation that provides persistent task execution with retries, deadlines, and recovery mechanisms.

Basic usage:

executor := durex.New(storage,
	durex.WithParallelism(4),
	durex.WithLogger(slog.Default()),
)

executor.Register(&MyCommand{})
executor.Start(ctx)

executor.Add(ctx, durex.Spec{
	Name: "myCommand",
	Data: durex.M{"key": "value"},
})

Package durex provides a durable background job queue and workflow engine for Go.

Durex is a lightweight, embeddable task queue with persistence, automatic retries, workflow sequences, and saga pattern support. It's an alternative to Asynq, River, and Temporal for teams who want workflow capabilities without infrastructure complexity.

Use SQLite for development, PostgreSQL for production. No Redis or Kafka required.

Key Features

  • Persistent Commands: Commands survive process restarts
  • Automatic Retries: Configurable retry logic with backoff strategies
  • Deadlines: Time-bound execution with expiration handling
  • Command Chaining: Build workflows with sequences
  • Recovery: Custom error handling and compensation
  • Middleware: Extensible execution pipeline
  • Multiple Storage Backends: PostgreSQL, SQLite, Memory
  • Rate Limiting: Control concurrent execution per command type
  • Deduplication: Prevent duplicate commands with unique keys
  • Context Propagation: Trace and correlation IDs across command chains

Basic Usage

Define a command by implementing the Command interface:

type SendEmailCommand struct {
	durex.BaseCommand
	mailer *MailService
}

func (c *SendEmailCommand) Name() string {
	return "sendEmail"
}

func (c *SendEmailCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	to := cmd.GetString("to")
	subject := cmd.GetString("subject")
	body := cmd.GetString("body")

	if err := c.mailer.Send(to, subject, body); err != nil {
		return durex.Empty(), err // Will retry if retries > 0
	}

	return durex.Empty(), nil
}

Create an executor and register commands:

storage := storage.NewMemory()
executor := durex.New(storage,
	durex.WithParallelism(4),
	durex.WithDefaultRetries(3),
)

executor.Register(&SendEmailCommand{mailer: mailerService})
executor.Start(ctx)
defer executor.Stop()

Add commands for execution:

executor.Add(ctx, durex.Spec{
	Name: "sendEmail",
	Data: durex.M{
		"to":      "user@example.com",
		"subject": "Welcome!",
		"body":    "Thanks for signing up.",
	},
	Retries: 3,
})

Command Results

Commands return a Result that tells the executor what to do next:

  • durex.Empty(): Command completed, no follow-up actions
  • durex.Repeat(): Reschedule this command to run again after its Period
  • durex.Retry(): Retry immediately (uses retry counter, doesn't trigger Recover)
  • durex.Next(spec): Spawn a single follow-up command
  • durex.Spawn(specs...): Spawn multiple follow-up commands

Command Chaining

Build workflows by chaining commands:

executor.Add(ctx, durex.Spec{
	Name:     "validateOrder",
	Sequence: []string{"processPayment", "shipOrder", "sendConfirmation"},
	Data:     durex.M{"orderId": "12345"},
})

Each command can continue the sequence:

func (c *ValidateOrderCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	// Validate order...
	cmd.Set("validated", true)
	return cmd.ContinueSequence(nil), nil
}

Error Handling

Commands can implement the Recoverable interface for custom error handling:

func (c *SendEmailCommand) Recover(ctx context.Context, cmd *durex.Instance, err error) (durex.Result, error) {
	// Log the failure, notify ops, spawn compensation commands
	return durex.Next(durex.Spec{
		Name: "notifyFailure",
		Data: durex.M{"error": err.Error()},
	}), nil
}

Deadlines

Set execution deadlines:

executor.Add(ctx, durex.Spec{
	Name:     "processOrder",
	Deadline: 5 * time.Minute,
})

Commands can implement Expirable to handle deadline expiration:

func (c *ProcessOrderCommand) Expired(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	// Handle timeout - refund, notify, etc.
	return durex.Empty(), nil
}

Middleware

Add cross-cutting concerns:

executor := durex.New(storage,
	durex.WithMiddleware(
		func(ctx durex.MiddlewareContext, next func() (durex.Result, error)) (durex.Result, error) {
			start := time.Now()
			result, err := next()
			log.Printf("Command %s took %v", ctx.Command.Name, time.Since(start))
			return result, err
		},
	),
)

Storage Backends

Durex supports multiple storage backends:

// In-memory (for testing)
storage := storage.NewMemory()

// SQLite (for single-instance deployments)
storage, _ := storage.OpenSQLite("commands.db")
storage.Migrate(ctx)

// PostgreSQL (for production)
db, _ := sql.Open("postgres", "postgres://...")
storage := storage.NewPostgres(db)
storage.Migrate(ctx)

Backoff Strategies

Configure retry backoff behavior:

executor := durex.New(storage,
	durex.WithBackoff(durex.DefaultExponentialBackoff()),
)

Available strategies:

  • durex.NoBackoff(): Immediate retry (default)
  • durex.ConstantBackoff{Delay: 5 * time.Second}: Fixed delay
  • durex.LinearBackoff{InitialDelay: time.Second, MaxDelay: time.Minute}
  • durex.ExponentialBackoff{InitialDelay: time.Second, MaxDelay: 5 * time.Minute, Multiplier: 2.0}
  • durex.JitteredBackoff{Strategy: ..., JitterRate: 0.1}: Add randomness to prevent thundering herd

Deduplication

Prevent duplicate commands with unique keys:

executor.Add(ctx, durex.Spec{
	Name:      "sendEmail",
	UniqueKey: "email:user123:welcome",  // Only one active command with this key
})

If a non-terminal command with the same UniqueKey exists, Add() returns ErrDuplicateCommand.

Rate Limiting

Control concurrent command execution:

executor := durex.New(storage,
	durex.WithRateLimit("sendEmail", 10),     // Max 10 concurrent emails
	durex.WithRateLimit("apiCall", 5),        // Max 5 concurrent API calls
	durex.WithGlobalRateLimit(100),           // Max 100 total concurrent commands
)

Tracing and Correlation

Commands automatically propagate trace and correlation IDs to child commands:

executor.Add(ctx, durex.Spec{
	Name:          "workflow",
	TraceID:       "trace-123",      // Propagated to all children
	CorrelationID: "correlation-456", // Links related commands
})

Access these in your command:

func (c *MyCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	log.Printf("TraceID: %s, CorrelationID: %s", cmd.TraceID, cmd.CorrelationID)
	// ...
}

Web Dashboard

Enable the built-in monitoring dashboard:

// Simple standalone server
go executor.ServeDashboard(":8080")

// Or integrate with existing HTTP server
http.Handle("/durex/", http.StripPrefix("/durex", executor.DashboardHandler()))

Multi-Instance Deployment

For horizontal scaling with PostgreSQL, Durex automatically uses row-level locking:

db, _ := sql.Open("postgres", "postgres://...")
store := storage.NewPostgres(db)
store.Migrate(ctx)

executor := durex.New(store,
	durex.WithPollInterval(500*time.Millisecond),
	durex.WithClaimBatchSize(20),
)

Multiple executor instances can safely run concurrently - each will claim different commands.

Production Considerations

For production deployments:

  • Use PostgreSQL for durability and multi-instance support
  • Configure appropriate parallelism based on workload
  • Set up monitoring using the MetricsCollector interface
  • Implement proper error handling and alerting
  • Use deadlines to prevent runaway commands
  • Consider idempotency in command implementations

See the examples directory for complete working examples.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

// GreetCommand sends a greeting.
type GreetCommand struct{}

func (c *GreetCommand) Name() string { return "greet" }

func (c *GreetCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	name := cmd.GetString("name")
	fmt.Printf("Hello, %s!\n", name)
	return durex.Empty(), nil
}

func main() {
	// Create in-memory storage
	store := storage.NewMemory()

	// Create executor with options
	executor := durex.New(store,
		durex.WithParallelism(4),
		durex.WithDefaultRetries(3),
	)

	// Register command handlers
	executor.Register(&GreetCommand{})

	// Start processing
	ctx := context.Background()
	executor.Start(ctx)
	defer executor.Stop()

	// Add a command
	executor.Add(ctx, durex.Spec{
		Name: "greet",
		Data: durex.M{"name": "World"},
	})

	// Wait for processing
	time.Sleep(100 * time.Millisecond)
}
Output:

Hello, World!

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrExecutorStopped  = errors.New("durex: executor is stopped")
	ErrExecutorNotReady = errors.New("durex: executor is not started")
)

Common executor errors.

View Source
var (
	ErrNotFound         = errors.New("durex: command not found")
	ErrAlreadyExists    = errors.New("durex: command already exists")
	ErrStorageClosed    = errors.New("durex: storage is closed")
	ErrDuplicateCommand = errors.New("durex: command with this unique key already exists")
)

Common storage errors.

Functions

func CorrelationIDFromContext added in v0.3.0

func CorrelationIDFromContext(ctx context.Context) string

CorrelationIDFromContext returns the correlation ID from the context, or empty string if not set.

func GenerateID

func GenerateID() string

GenerateID creates a new ID using the package-level generator.

func HandleTyped added in v0.2.0

func HandleTyped[T any](e *Executor, name string, fn TypedExecuteFunc[T], opts ...TypedOption[T])

HandleTyped registers a typed command handler. The data from the command instance is automatically unmarshaled to the type T.

Example:

type OrderData struct {
    OrderID string  `json:"orderId"`
    Amount  float64 `json:"amount"`
}

durex.HandleTyped(executor, "processOrder", func(ctx context.Context, data OrderData, cmd *durex.Instance) (durex.Result, error) {
    // data is already typed - no GetString() needed!
    log.Printf("Processing order %s for $%.2f", data.OrderID, data.Amount)
    return durex.Empty(), nil
})

func SetIDGenerator

func SetIDGenerator(gen IDGenerator)

SetIDGenerator sets the package-level ID generator.

func TraceIDFromContext added in v0.3.0

func TraceIDFromContext(ctx context.Context) string

TraceIDFromContext returns the trace ID from the context, or empty string if not set.

func WithCorrelationID added in v0.3.0

func WithCorrelationID(ctx context.Context, correlationID string) context.Context

WithCorrelationID returns a new context with the given correlation ID.

func WithInstance added in v0.3.0

func WithInstance(ctx context.Context, instance *Instance) context.Context

WithInstance returns a new context with the given command instance.

func WithTraceID added in v0.3.0

func WithTraceID(ctx context.Context, traceID string) context.Context

WithTraceID returns a new context with the given trace ID.

Types

type BackoffStrategy added in v0.3.0

type BackoffStrategy interface {
	// NextDelay returns the delay before the next retry attempt.
	// attempt starts at 1 for the first retry.
	NextDelay(attempt int) time.Duration
}

BackoffStrategy calculates the delay before retrying a failed command.

func DefaultExponentialBackoff added in v0.3.0

func DefaultExponentialBackoff() BackoffStrategy

DefaultExponentialBackoff returns a sensible default exponential backoff. Starts at 1 second, doubles each time, max 5 minutes, with 10% jitter.

func NoBackoff added in v0.3.0

func NoBackoff() BackoffStrategy

NoBackoff returns a strategy with zero delay (immediate retry).

type BaseCommand

type BaseCommand struct{}

BaseCommand provides a minimal Command implementation. Embed this in your commands to satisfy the Command interface with sensible defaults, then override only what you need.

Example:

type MyCommand struct {
	durex.BaseCommand
	db *sql.DB
}

func (c *MyCommand) Name() string { return "myCommand" }

func (c *MyCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	// your logic here
	return durex.Empty(), nil
}

func (BaseCommand) Default

func (b BaseCommand) Default() Spec

Default returns an empty Spec. Override this to provide defaults.

func (BaseCommand) Execute

func (b BaseCommand) Execute(ctx context.Context, cmd *Instance) (Result, error)

Execute returns Empty. Override this in your command.

func (BaseCommand) Expired

func (b BaseCommand) Expired(ctx context.Context, cmd *Instance) (Result, error)

Expired returns Empty. Override this to handle deadline expiration.

func (BaseCommand) Name

func (b BaseCommand) Name() string

Name returns an empty string. Override this in your command.

func (BaseCommand) Recover

func (b BaseCommand) Recover(ctx context.Context, cmd *Instance, err error) (Result, error)

Recover returns Empty. Override this to handle failures.

type Command

type Command interface {
	// Name returns the unique identifier for this command type.
	// This name is used to register and resolve command handlers.
	Name() string

	// Execute runs the command logic.
	// Returns a Result indicating what should happen next.
	// If an error is returned, the command will be retried (if retries > 0)
	// or marked as failed and Recover will be called.
	Execute(ctx context.Context, cmd *Instance) (Result, error)
}

Command defines the interface that all command handlers must implement.

Commands encapsulate business logic that can be executed asynchronously, persisted, retried on failure, and recovered on system restart.

type CommandRateStats added in v0.3.0

type CommandRateStats struct {
	Limit   int
	Current int
	Waiting int
}

CommandRateStats holds per-command rate statistics.

type ConstantBackoff added in v0.3.0

type ConstantBackoff struct {
	Delay time.Duration
}

ConstantBackoff returns the same delay for every retry.

func (ConstantBackoff) NextDelay added in v0.3.0

func (b ConstantBackoff) NextDelay(attempt int) time.Duration

NextDelay implements BackoffStrategy.

type ContextExtractor added in v0.3.0

type ContextExtractor interface {
	// ExtractTraceID extracts a trace ID from the context.
	ExtractTraceID(ctx context.Context) string

	// ExtractCorrelationID extracts a correlation ID from the context.
	ExtractCorrelationID(ctx context.Context) string
}

ContextExtractor extracts trace/correlation IDs from incoming requests. Implement this interface to integrate with your tracing system.

type ContextInjector added in v0.3.0

type ContextInjector interface {
	// InjectContext injects trace and correlation IDs into the context.
	InjectContext(ctx context.Context, traceID, correlationID string) context.Context
}

ContextInjector injects trace/correlation IDs into outgoing contexts. Implement this interface to integrate with your tracing system.

type DefaultContextExtractor added in v0.3.0

type DefaultContextExtractor struct{}

DefaultContextExtractor uses durex context keys.

func (DefaultContextExtractor) ExtractCorrelationID added in v0.3.0

func (DefaultContextExtractor) ExtractCorrelationID(ctx context.Context) string

ExtractCorrelationID implements ContextExtractor.

func (DefaultContextExtractor) ExtractTraceID added in v0.3.0

func (DefaultContextExtractor) ExtractTraceID(ctx context.Context) string

ExtractTraceID implements ContextExtractor.

type DefaultContextInjector added in v0.3.0

type DefaultContextInjector struct{}

DefaultContextInjector uses durex context keys.

func (DefaultContextInjector) InjectContext added in v0.3.0

func (DefaultContextInjector) InjectContext(ctx context.Context, traceID, correlationID string) context.Context

InjectContext implements ContextInjector.

type DefaultIDGenerator

type DefaultIDGenerator struct {
	// contains filtered or unexported fields
}

DefaultIDGenerator creates IDs using timestamp + random bytes. Format: "cmd_<timestamp_hex>_<random_hex>" Example: "cmd_18d5f3a2b4c_8a7b6c5d4e3f2a1b"

func (*DefaultIDGenerator) Generate

func (g *DefaultIDGenerator) Generate() string

Generate creates a new unique ID.

type Defaulter

type Defaulter interface {
	// Default returns the default Spec for this command type.
	// These values are merged with user-provided values when creating instances.
	Default() Spec
}

Defaulter is an optional interface commands can implement to provide default specification values.

type Duration added in v0.2.0

type Duration = time.Duration

Duration is an alias for time.Duration that allows FuncOption to work without import cycles in user code.

type Event added in v0.8.0

type Event struct {
	// Type is the event type (created, started, completed, etc.).
	Type EventType `json:"type"`

	// Timestamp is when the event occurred.
	Timestamp time.Time `json:"timestamp"`

	// Attempt is the attempt number (for started/failed/completed events).
	Attempt int `json:"attempt,omitempty"`

	// Error contains the error message (for failed events).
	Error string `json:"error,omitempty"`

	// DurationMs is how long the execution took in milliseconds.
	DurationMs int64 `json:"duration_ms,omitempty"`

	// Message contains additional context about the event.
	Message string `json:"message,omitempty"`
}

Event represents a single execution event in a command's history.

type EventType added in v0.8.0

type EventType string

EventType represents the type of execution event.

const (
	EventCreated   EventType = "created"
	EventStarted   EventType = "started"
	EventCompleted EventType = "completed"
	EventFailed    EventType = "failed"
	EventRetrying  EventType = "retrying"
	EventExpired   EventType = "expired"
	EventCancelled EventType = "cancelled"
	EventRepeating EventType = "repeating"
	EventRecovered EventType = "recovered" // Moved to DLQ or recovery handler called
)

Event types for command lifecycle.

type ExecuteFunc added in v0.2.0

type ExecuteFunc func(ctx context.Context, cmd *Instance) (Result, error)

ExecuteFunc is the function signature for command execution.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor manages command execution with persistence, retries, and scheduling.

func New

func New(storage Storage, opts ...Option) *Executor

New creates a new Executor with the given storage and options.

func (*Executor) Add

func (e *Executor) Add(ctx context.Context, spec Spec) (*Instance, error)

Add queues a new command for execution.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

// GreetCommand sends a greeting.
type GreetCommand struct{}

func (c *GreetCommand) Name() string { return "greet" }

func (c *GreetCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	name := cmd.GetString("name")
	fmt.Printf("Hello, %s!\n", name)
	return durex.Empty(), nil
}

func main() {
	store := storage.NewMemory()
	executor := durex.New(store)
	executor.Register(&GreetCommand{})
	executor.Start(context.Background())
	defer executor.Stop()

	ctx := context.Background()

	// Add a simple command
	instance, _ := executor.Add(ctx, durex.Spec{
		Name: "greet",
		Data: durex.M{"name": "Alice"},
	})

	fmt.Println("Added command:", instance.Name)
	time.Sleep(100 * time.Millisecond)
}
Output:

Added command: greet
Hello, Alice!
Example (WithRetries)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

// GreetCommand sends a greeting.
type GreetCommand struct{}

func (c *GreetCommand) Name() string { return "greet" }

func (c *GreetCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	name := cmd.GetString("name")
	fmt.Printf("Hello, %s!\n", name)
	return durex.Empty(), nil
}

func main() {
	store := storage.NewMemory()
	executor := durex.New(store)
	executor.Register(&GreetCommand{})
	executor.Start(context.Background())
	defer executor.Stop()

	ctx := context.Background()

	// Add a command with retries and delay
	executor.Add(ctx, durex.Spec{
		Name:    "greet",
		Data:    durex.M{"name": "Bob"},
		Retries: 3,
		Delay:   time.Second,
	})
}
Example (WithTags)
package main

import (
	"context"
	"fmt"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

// GreetCommand sends a greeting.
type GreetCommand struct{}

func (c *GreetCommand) Name() string { return "greet" }

func (c *GreetCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	name := cmd.GetString("name")
	fmt.Printf("Hello, %s!\n", name)
	return durex.Empty(), nil
}

func main() {
	store := storage.NewMemory()
	executor := durex.New(store)
	executor.Register(&GreetCommand{})
	executor.Start(context.Background())
	defer executor.Stop()

	ctx := context.Background()

	// Add a command with tags for categorization
	executor.Add(ctx, durex.Spec{
		Name: "greet",
		Data: durex.M{"name": "Charlie"},
		Tags: []string{"priority:high", "batch:123"},
	})
}

func (*Executor) AddMany

func (e *Executor) AddMany(ctx context.Context, specs ...Spec) ([]*Instance, error)

AddMany queues multiple commands for execution.

func (*Executor) Cancel

func (e *Executor) Cancel(ctx context.Context, id string) error

Cancel cancels a pending command.

func (*Executor) CancelByTag added in v0.5.0

func (e *Executor) CancelByTag(ctx context.Context, tag string) (int, error)

CancelByTag cancels all pending commands with the given tag. Returns the number of commands cancelled. Requires QueryableStorage to support tag queries.

func (*Executor) DashboardHandler added in v0.4.0

func (e *Executor) DashboardHandler() http.Handler

DashboardHandler returns an http.Handler that serves the Durex dashboard. Mount this at your desired path to enable the web dashboard.

Example:

http.Handle("/durex/", http.StripPrefix("/durex", executor.DashboardHandler()))
http.ListenAndServe(":8080", nil)

func (*Executor) FindDeadLettered added in v0.5.0

func (e *Executor) FindDeadLettered(ctx context.Context) ([]*Instance, error)

FindDeadLettered returns all commands in the dead letter queue.

func (*Executor) Get

func (e *Executor) Get(ctx context.Context, id string) (*Instance, error)

Get retrieves a command instance by ID.

func (*Executor) HandleFunc added in v0.2.0

func (e *Executor) HandleFunc(name string, fn ExecuteFunc, opts ...FuncOption) *Executor

HandleFunc registers a function as a command handler. This is the simplest way to create a command.

Example:

executor.HandleFunc("sendEmail", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
    to := cmd.GetString("to")
    if err := mailer.Send(to); err != nil {
        return durex.Empty(), err
    }
    return durex.Empty(), nil
})

With options:

executor.HandleFunc("sendEmail", sendEmailFn,
    durex.Retries(3),
    durex.OnRecover(handleFailure),
)
Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

func main() {
	store := storage.NewMemory()
	executor := durex.New(store)

	// Register using HandleFunc for simple handlers
	executor.HandleFunc("sendEmail", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		to := cmd.GetString("to")
		subject := cmd.GetString("subject")
		fmt.Printf("Sending email to %s: %s\n", to, subject)
		return durex.Empty(), nil
	})

	executor.Start(context.Background())
	defer executor.Stop()

	executor.Add(context.Background(), durex.Spec{
		Name: "sendEmail",
		Data: durex.M{
			"to":      "user@example.com",
			"subject": "Welcome!",
		},
	})

	time.Sleep(100 * time.Millisecond)
}
Output:

Sending email to user@example.com: Welcome!

func (*Executor) History added in v0.8.0

func (e *Executor) History(ctx context.Context, id string) ([]Event, error)

History returns the execution history for a command.

func (*Executor) PurgeDLQ added in v0.5.0

func (e *Executor) PurgeDLQ(ctx context.Context, age time.Duration) (int, error)

PurgeDLQ removes dead-lettered commands older than the specified age. Returns the number of commands purged.

func (*Executor) Register

func (e *Executor) Register(cmd Command) *Executor

Register adds a command handler to the executor. Must be called before Start.

func (*Executor) ReplayFromDLQ added in v0.5.0

func (e *Executor) ReplayFromDLQ(ctx context.Context, id string) error

ReplayFromDLQ replays a dead-lettered command. The command is reset to PENDING status and will be executed again.

func (*Executor) ServeDashboard added in v0.4.0

func (e *Executor) ServeDashboard(addr string) error

ServeDashboard starts an HTTP server serving the dashboard on the given address. This is a convenience method for simple deployments. For production, use DashboardHandler() and integrate with your existing server.

Example:

go executor.ServeDashboard(":8080")

func (*Executor) Start

func (e *Executor) Start(ctx context.Context) error

Start begins processing commands. It replays pending commands from storage and starts worker goroutines.

func (*Executor) Stats

func (e *Executor) Stats(ctx context.Context) (*Stats, error)

Stats returns current executor statistics.

func (*Executor) Stop

func (e *Executor) Stop() error

Stop gracefully shuts down the executor. It waits for in-flight commands to complete up to the shutdown timeout.

type Expirable

type Expirable interface {
	// Expired is called when a command's deadline has passed before execution.
	// Use this to handle timeout scenarios gracefully.
	Expired(ctx context.Context, cmd *Instance) (Result, error)
}

Expirable is an optional interface commands can implement to handle deadline expiration.

type ExpiredFunc added in v0.2.0

type ExpiredFunc func(ctx context.Context, cmd *Instance) (Result, error)

ExpiredFunc is the function signature for deadline expiration.

type ExponentialBackoff added in v0.3.0

type ExponentialBackoff struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

ExponentialBackoff increases delay exponentially with each attempt. Delay = InitialDelay * (Multiplier ^ (attempt - 1))

func (ExponentialBackoff) NextDelay added in v0.3.0

func (b ExponentialBackoff) NextDelay(attempt int) time.Duration

NextDelay implements BackoffStrategy.

type FuncCommand added in v0.2.0

type FuncCommand struct {
	// contains filtered or unexported fields
}

FuncCommand wraps a function as a Command. Created via HandleFunc or NewFunc.

func NewFunc added in v0.2.0

func NewFunc(name string, fn ExecuteFunc, opts ...FuncOption) *FuncCommand

NewFunc creates a new FuncCommand. Use this when you need to configure the command before registering.

Example:

cmd := durex.NewFunc("sendEmail", sendEmailFn,
    durex.Retries(3),
    durex.OnRecover(handleFailure),
)
executor.Handle(cmd)

func (*FuncCommand) Default added in v0.2.0

func (f *FuncCommand) Default() Spec

Default implements Defaulter.

func (*FuncCommand) Execute added in v0.2.0

func (f *FuncCommand) Execute(ctx context.Context, cmd *Instance) (Result, error)

Execute implements Command.

func (*FuncCommand) Expired added in v0.2.0

func (f *FuncCommand) Expired(ctx context.Context, cmd *Instance) (Result, error)

Expired implements Expirable.

func (*FuncCommand) Name added in v0.2.0

func (f *FuncCommand) Name() string

Name implements Command.

func (*FuncCommand) Recover added in v0.2.0

func (f *FuncCommand) Recover(ctx context.Context, cmd *Instance, err error) (Result, error)

Recover implements Recoverable.

type FuncOption added in v0.2.0

type FuncOption func(*FuncCommand)

FuncOption configures a FuncCommand.

func Deadline added in v0.2.0

func Deadline(d Duration) FuncOption

Deadline sets the default deadline.

func OnExpired added in v0.2.0

func OnExpired(fn ExpiredFunc) FuncOption

OnExpired sets the expiration handler.

func OnRecover added in v0.2.0

func OnRecover(fn RecoverFunc) FuncOption

OnRecover sets the recovery function.

func Period added in v0.2.0

func Period(d Duration) FuncOption

Period sets the repeat period for recurring commands.

func Retries added in v0.2.0

func Retries(n int) FuncOption

Retries sets the default retry count.

func Tags added in v0.2.0

func Tags(tags ...string) FuncOption

Tags sets default tags.

type HookedStorage

type HookedStorage struct {
	Storage
	Hooks Hooks
}

HookedStorage wraps a Storage with lifecycle hooks.

func (*HookedStorage) Create

func (h *HookedStorage) Create(ctx context.Context, cmd *Instance) error

Create implements Storage.

func (*HookedStorage) Delete

func (h *HookedStorage) Delete(ctx context.Context, id string) error

Delete implements Storage.

func (*HookedStorage) Update

func (h *HookedStorage) Update(ctx context.Context, cmd *Instance) error

Update implements Storage.

type Hooks

type Hooks struct {
	// AfterCreate is called after a command is created.
	AfterCreate func(ctx context.Context, cmd *Instance)

	// AfterUpdate is called after a command is updated.
	AfterUpdate func(ctx context.Context, cmd *Instance)

	// AfterDelete is called after a command is deleted.
	AfterDelete func(ctx context.Context, id string)

	// OnError is called when a storage operation fails.
	OnError func(ctx context.Context, op string, err error)
}

Hooks allows observing storage operations. Useful for logging, metrics, and debugging.

type IDGenerator

type IDGenerator interface {
	Generate() string
}

IDGenerator generates unique command instance IDs.

type Instance

type Instance struct {
	// ID is the unique identifier for this instance.
	ID string `json:"id"`

	// Name is the command type identifier.
	Name string `json:"name"`

	// Data contains the command payload.
	Data M `json:"data,omitempty"`

	// Status is the current execution state.
	Status Status `json:"status"`

	// Retries is the remaining retry count.
	Retries int `json:"retries"`

	// Sequence is the remaining command chain.
	Sequence []string `json:"sequence,omitempty"`

	// ParentID links to the parent command that spawned this instance.
	ParentID *string `json:"parent_id,omitempty"`

	// Priority determines execution order.
	Priority int `json:"priority"`

	// Tags for categorization.
	Tags []string `json:"tags,omitempty"`

	// UniqueKey for deduplication.
	// If set, only one active command with this key can exist at a time.
	UniqueKey string `json:"unique_key,omitempty"`

	// TraceID for distributed tracing.
	// Propagated from parent commands.
	TraceID string `json:"trace_id,omitempty"`

	// CorrelationID links related commands together.
	// Propagated from parent commands.
	CorrelationID string `json:"correlation_id,omitempty"`

	// CreatedAt is when the instance was created.
	CreatedAt time.Time `json:"created_at"`

	// ReadyAt is when the instance becomes eligible for execution.
	ReadyAt time.Time `json:"ready_at"`

	// StartedAt is when execution began (nil if not started).
	StartedAt *time.Time `json:"started_at,omitempty"`

	// CompletedAt is when execution finished (nil if not completed).
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// DeadlineAt is the execution deadline (nil if no deadline).
	DeadlineAt *time.Time `json:"deadline_at,omitempty"`

	// Timeout is the maximum execution time per attempt.
	// If the handler doesn't complete within this duration, the context is cancelled.
	Timeout time.Duration `json:"timeout,omitempty"`

	// Period is the repeat interval for recurring commands.
	Period time.Duration `json:"period,omitempty"`

	// Error contains the error message if the command failed.
	Error string `json:"error,omitempty"`

	// Attempt tracks the current attempt number (starts at 1).
	Attempt int `json:"attempt"`

	// Metadata stores additional runtime information.
	Metadata M `json:"metadata,omitempty"`

	// History contains the execution history of this command.
	// Events are appended as the command progresses through its lifecycle.
	History []Event `json:"history,omitempty"`
}

Instance represents a persisted command with runtime state. Instances are created from Specs and track execution progress.

func InstanceFromContext added in v0.3.0

func InstanceFromContext(ctx context.Context) *Instance

InstanceFromContext returns the command instance from the context, or nil if not set.

func (*Instance) Age

func (i *Instance) Age() time.Duration

Age returns how long ago the command was created.

func (*Instance) Clone

func (i *Instance) Clone() *Instance

Clone creates a deep copy of the instance.

func (*Instance) ContinueSequence

func (i *Instance) ContinueSequence(additionalData M) Result

ContinueSequence creates a Result that spawns the next command in the sequence. The accumulated data is passed to the next command. Returns Empty() if the sequence is empty.

Example:

func (c *Step1Command) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	cmd.Set("step1_result", "done")
	return cmd.ContinueSequence(nil), nil
}
Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

func main() {
	store := storage.NewMemory()
	executor := durex.New(store)

	executor.HandleFunc("validate", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		fmt.Println("Validating order")
		cmd.Set("validated", true)
		return cmd.ContinueSequence(nil), nil
	})

	executor.HandleFunc("charge", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		fmt.Println("Charging payment")
		return cmd.ContinueSequence(nil), nil
	})

	executor.HandleFunc("ship", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		fmt.Println("Shipping order")
		return durex.Empty(), nil
	})

	executor.Start(context.Background())
	defer executor.Stop()

	// Create a sequence of commands
	executor.Add(context.Background(), durex.Spec{
		Name:     "validate",
		Sequence: []string{"charge", "ship"},
		Data:     durex.M{"order_id": "12345"},
	})

	time.Sleep(100 * time.Millisecond)
}
Output:

Validating order
Charging payment
Shipping order

func (*Instance) Duration

func (i *Instance) Duration() time.Duration

Duration returns how long the command took to execute. Returns 0 if the command hasn't completed.

func (*Instance) Get

func (i *Instance) Get(key string) any

Get retrieves a value from the command data with type assertion. Returns the zero value if the key doesn't exist or type doesn't match.

func (*Instance) GetBool

func (i *Instance) GetBool(key string) bool

GetBool retrieves a bool value from command data.

func (*Instance) GetInt

func (i *Instance) GetInt(key string) int

GetInt retrieves an int value from command data. Handles both int and float64 (from JSON unmarshaling).

func (*Instance) GetMap

func (i *Instance) GetMap(key string) M

GetMap retrieves a map value from command data.

func (*Instance) GetSlice

func (i *Instance) GetSlice(key string) []any

GetSlice retrieves a slice value from command data.

func (*Instance) GetString

func (i *Instance) GetString(key string) string

GetString retrieves a string value from command data.

func (*Instance) HasTag

func (i *Instance) HasTag(tag string) bool

HasTag returns true if the instance has the given tag.

func (*Instance) IsOverdue

func (i *Instance) IsOverdue() bool

IsOverdue returns true if the command has passed its deadline.

func (*Instance) MarshalJSON

func (i *Instance) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*Instance) RecordError added in v0.8.0

func (i *Instance) RecordError(eventType EventType, err error)

RecordError appends a failed event with error details to the command's history.

func (*Instance) RecordEvent added in v0.8.0

func (i *Instance) RecordEvent(eventType EventType, message string)

RecordEvent appends an event to the command's history.

func (*Instance) RecordEventWithDuration added in v0.8.0

func (i *Instance) RecordEventWithDuration(eventType EventType, duration time.Duration, message string)

RecordEventWithDuration appends an event with duration to the command's history.

func (*Instance) Set

func (i *Instance) Set(key string, value any)

Set stores a value in the command data. Note: Changes are not automatically persisted. Call executor.Update() if needed.

func (*Instance) UnmarshalJSON

func (i *Instance) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

type JitteredBackoff added in v0.3.0

type JitteredBackoff struct {
	Strategy   BackoffStrategy
	JitterRate float64 // 0.0 to 1.0, e.g., 0.1 = ±10% jitter
}

JitteredBackoff wraps another BackoffStrategy and adds random jitter. This helps prevent thundering herd problems.

func (JitteredBackoff) NextDelay added in v0.3.0

func (b JitteredBackoff) NextDelay(attempt int) time.Duration

NextDelay implements BackoffStrategy.

type LinearBackoff added in v0.3.0

type LinearBackoff struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
}

LinearBackoff increases delay linearly with each attempt. Delay = InitialDelay * attempt

func (LinearBackoff) NextDelay added in v0.3.0

func (b LinearBackoff) NextDelay(attempt int) time.Duration

NextDelay implements BackoffStrategy.

type LockingStorage added in v0.4.0

type LockingStorage interface {
	Storage

	// ClaimPending atomically finds and claims up to limit pending commands.
	// Returns commands that are ready to execute (ReadyAt <= now).
	// Uses row-level locking (FOR UPDATE SKIP LOCKED in PostgreSQL) to prevent
	// multiple executors from claiming the same command.
	// The returned commands have their status set to STARTED.
	ClaimPending(ctx context.Context, limit int) ([]*Instance, error)
}

LockingStorage extends Storage with row-level locking for safe concurrent access. Implement this interface for multi-instance deployments.

type M

type M = map[string]any

M is a convenience alias for command data maps.

type MetricsCollector

type MetricsCollector interface {
	// CommandStarted is called when a command begins execution.
	CommandStarted(name string)

	// CommandCompleted is called when a command finishes successfully.
	CommandCompleted(name string, duration time.Duration)

	// CommandFailed is called when a command fails.
	CommandFailed(name string, err error)

	// CommandRetried is called when a command is retried.
	CommandRetried(name string, attempt int)

	// QueueSize reports the current queue size.
	QueueSize(size int)
}

MetricsCollector receives execution metrics.

type Middleware

type Middleware func(ctx MiddlewareContext, next func() (Result, error)) (Result, error)

Middleware wraps command execution. Return the result of calling next() to continue the chain, or return early to short-circuit execution.

func RateLimitMiddleware added in v0.3.0

func RateLimitMiddleware(limiter *RateLimiter, timeout time.Duration) Middleware

RateLimitMiddleware creates middleware that applies rate limiting. The timeout specifies how long to wait for a slot before giving up.

func TracingMiddleware added in v0.3.0

func TracingMiddleware(injector ContextInjector) Middleware

TracingMiddleware creates middleware that propagates trace and correlation IDs. It injects the instance's trace/correlation IDs into the context before execution.

type MiddlewareContext

type MiddlewareContext struct {
	// Command is the command being executed.
	Command *Instance

	// Handler is the command handler.
	Handler Command

	// Executor is the executor running the command.
	Executor *Executor
}

MiddlewareContext provides context for middleware.

type Option

type Option func(*Executor)

Option configures an Executor.

func WithBackoff added in v0.3.0

func WithBackoff(strategy BackoffStrategy) Option

WithBackoff sets the backoff strategy for retries. Default is NoBackoff() (immediate retry). Use DefaultExponentialBackoff() for production workloads.

func WithClaimBatchSize added in v0.4.0

func WithClaimBatchSize(size int) Option

WithClaimBatchSize sets how many commands each worker claims per poll cycle when using LockingStorage. Default is 10.

func WithCleanupAge

func WithCleanupAge(d time.Duration) Option

WithCleanupAge sets how old completed commands must be before cleanup. Default is 24 hours.

func WithCleanupInterval

func WithCleanupInterval(d time.Duration) Option

WithCleanupInterval sets how often completed commands are cleaned up. Set to 0 to disable automatic cleanup. Default is 1 hour.

func WithDashboard added in v0.5.0

func WithDashboard(addr string) Option

WithDashboard enables the web dashboard on the specified address. The dashboard provides a UI for monitoring commands and executor stats.

Example:

executor := durex.New(store, durex.WithDashboard(":8080"))

The dashboard will be available at http://localhost:8080 after Start().

func WithDeadLetterQueue added in v0.5.0

func WithDeadLetterQueue() Option

WithDeadLetterQueue enables the dead letter queue. When enabled, commands that fail after exhausting all retries are moved to DEAD_LETTER status instead of FAILED. This allows for manual inspection and replay of failed commands.

Use ReplayFromDLQ() to retry a dead-lettered command, and PurgeDLQ() to remove old dead-lettered commands.

func WithDefaultRepeatInterval

func WithDefaultRepeatInterval(d time.Duration) Option

WithDefaultRepeatInterval sets the default period for repeating commands. Default is 1 minute.

func WithDefaultRetries

func WithDefaultRetries(n int) Option

WithDefaultRetries sets the default retry count for commands that don't specify their own. Default is 0 (no retries).

func WithDefaultTimeout added in v0.5.0

func WithDefaultTimeout(d time.Duration) Option

WithDefaultTimeout sets the default execution timeout for commands that don't specify their own. Default is 0 (no timeout).

The timeout limits how long each execution attempt can take. If the handler doesn't complete within this duration, the context is cancelled and the command is treated as failed (will retry if retries remain).

func WithErrorHandler

func WithErrorHandler(handler func(cmd *Instance, err error)) Option

WithErrorHandler sets a custom error handler for command failures. This is called in addition to the command's Recover method.

func WithGlobalRateLimit added in v0.3.0

func WithGlobalRateLimit(maxConcurrent int) Option

WithGlobalRateLimit sets the maximum total concurrent executions across all commands. This is useful for limiting overall system load.

Example:

executor := durex.New(storage,
	durex.WithGlobalRateLimit(100),  // max 100 total concurrent commands
)

func WithGracefulShutdown

func WithGracefulShutdown(d time.Duration) Option

WithGracefulShutdown sets the timeout for graceful shutdown. Default is 30 seconds.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets the logger for the executor. Default is slog.Default().

func WithMaxDelay

func WithMaxDelay(d time.Duration) Option

WithMaxDelay sets the maximum delay for scheduled commands. Commands scheduled further out will be re-queued periodically. Default is 24 hours.

func WithMetrics

func WithMetrics(collector MetricsCollector) Option

WithMetrics enables metrics collection. The provided MetricsCollector will receive execution metrics.

func WithMiddleware

func WithMiddleware(mw ...Middleware) Option

WithMiddleware adds middleware to the execution pipeline. Middleware is executed in the order added.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

func main() {
	store := storage.NewMemory()

	// Logging middleware
	loggingMiddleware := func(ctx durex.MiddlewareContext, next func() (durex.Result, error)) (durex.Result, error) {
		fmt.Printf("Starting: %s\n", ctx.Command.Name)
		result, err := next()
		fmt.Printf("Finished: %s\n", ctx.Command.Name)
		return result, err
	}

	executor := durex.New(store,
		durex.WithMiddleware(loggingMiddleware),
	)

	executor.HandleFunc("task", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		fmt.Println("Executing task")
		return durex.Empty(), nil
	})

	executor.Start(context.Background())
	defer executor.Stop()

	executor.Add(context.Background(), durex.Spec{Name: "task"})

	time.Sleep(100 * time.Millisecond)
}
Output:

Starting: task
Executing task
Finished: task

func WithParallelism

func WithParallelism(n int) Option

WithParallelism sets the number of concurrent command workers. Default is 4.

func WithPermanentCommands

func WithPermanentCommands(names ...string) Option

WithPermanentCommands sets commands that should always be running. These commands are started on executor Start() and restarted if they fail.

func WithPollInterval added in v0.4.0

func WithPollInterval(d time.Duration) Option

WithPollInterval sets how often workers poll for new commands when using LockingStorage (multi-instance mode). Default is 1 second.

func WithQueueSize

func WithQueueSize(size int) Option

WithQueueSize sets the internal queue buffer size. Default is 1000.

func WithRateLimit added in v0.3.0

func WithRateLimit(commandName string, maxConcurrent int) Option

WithRateLimit sets the maximum concurrent executions for a specific command type. This creates or updates the executor's rate limiter.

Example:

executor := durex.New(storage,
	durex.WithRateLimit("sendEmail", 10),    // max 10 concurrent emails
	durex.WithRateLimit("processOrder", 5),  // max 5 concurrent orders
)

func WithRateLimiter added in v0.3.0

func WithRateLimiter(limiter *RateLimiter) Option

WithRateLimiter sets a custom rate limiter for the executor. Use this when you need fine-grained control over rate limiting.

func WithStuckCommandRecovery added in v0.5.0

func WithStuckCommandRecovery(checkInterval, threshold time.Duration) Option

WithStuckCommandRecovery enables automatic recovery of stuck commands. Stuck commands are those in STARTED status for longer than threshold, which may indicate a worker crash or process restart.

checkInterval: how often to check for stuck commands threshold: how long a command must be in STARTED status to be considered stuck

By default, stuck command recovery is disabled. Recommended settings: checkInterval=1m, threshold=5m

type PrometheusMetrics added in v0.7.0

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics implements MetricsCollector using Prometheus metrics.

func NewPrometheusMetrics added in v0.7.0

func NewPrometheusMetrics(
	registry prometheus.Registerer,
	opts ...PrometheusOption,
) *PrometheusMetrics

NewPrometheusMetrics creates a new PrometheusMetrics collector. The metrics are automatically registered with the provided registry. If registry is nil, prometheus.DefaultRegisterer is used.

func (*PrometheusMetrics) Collect added in v0.7.0

func (m *PrometheusMetrics) Collect(ch chan<- prometheus.Metric)

Collect implements prometheus.Collector.

func (*PrometheusMetrics) Collectors added in v0.7.0

func (m *PrometheusMetrics) Collectors() []prometheus.Collector

Collectors returns all Prometheus collectors for manual registration. Use this if you need more control over registration.

func (*PrometheusMetrics) CommandCompleted added in v0.7.0

func (m *PrometheusMetrics) CommandCompleted(name string, duration time.Duration)

CommandCompleted implements MetricsCollector.

func (*PrometheusMetrics) CommandFailed added in v0.7.0

func (m *PrometheusMetrics) CommandFailed(name string, _ error)

CommandFailed implements MetricsCollector.

func (*PrometheusMetrics) CommandRetried added in v0.7.0

func (m *PrometheusMetrics) CommandRetried(name string, _ int)

CommandRetried implements MetricsCollector.

func (*PrometheusMetrics) CommandStarted added in v0.7.0

func (m *PrometheusMetrics) CommandStarted(name string)

CommandStarted implements MetricsCollector.

func (*PrometheusMetrics) Describe added in v0.7.0

func (m *PrometheusMetrics) Describe(ch chan<- *prometheus.Desc)

Describe implements prometheus.Collector.

func (*PrometheusMetrics) QueueSize added in v0.7.0

func (m *PrometheusMetrics) QueueSize(size int)

QueueSize implements MetricsCollector.

type PrometheusOption added in v0.7.0

type PrometheusOption func(*prometheusConfig)

PrometheusOption configures PrometheusMetrics.

func WithPrometheusBuckets added in v0.7.0

func WithPrometheusBuckets(buckets []float64) PrometheusOption

WithPrometheusBuckets sets custom histogram buckets for duration metrics.

func WithPrometheusNamespace added in v0.7.0

func WithPrometheusNamespace(namespace string) PrometheusOption

WithPrometheusNamespace sets the namespace for all metrics.

func WithPrometheusSubsystem added in v0.7.0

func WithPrometheusSubsystem(subsystem string) PrometheusOption

WithPrometheusSubsystem sets the subsystem for all metrics.

type Query

type Query struct {
	// Status filters by command status.
	Status *Status

	// Name filters by command name.
	Name *string

	// ParentID filters by parent command.
	ParentID *string

	// Tags filters by tags (commands must have all specified tags).
	Tags []string

	// CreatedAfter filters commands created after this time.
	CreatedAfter *time.Time

	// CreatedBefore filters commands created before this time.
	CreatedBefore *time.Time

	// Limit restricts the number of results.
	Limit int

	// Offset skips the first N results.
	Offset int

	// OrderBy specifies the sort field.
	OrderBy string

	// OrderDesc sorts in descending order.
	OrderDesc bool
}

Query represents a flexible query for finding commands.

type QueryableStorage

type QueryableStorage interface {
	Storage

	// Find returns commands matching the query.
	Find(ctx context.Context, query Query) ([]*Instance, error)
}

QueryableStorage extends Storage with advanced query capabilities. Implement this interface for full-featured storage backends.

type RateLimitStats added in v0.3.0

type RateLimitStats struct {
	GlobalLimit   int
	GlobalCurrent int
	Commands      map[string]CommandRateStats
}

RateLimitStats holds rate limiter statistics.

type RateLimiter added in v0.3.0

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter controls concurrent execution of commands.

func NewRateLimiter added in v0.3.0

func NewRateLimiter() *RateLimiter

NewRateLimiter creates a new rate limiter.

func (*RateLimiter) Acquire added in v0.3.0

func (r *RateLimiter) Acquire(ctx context.Context, name string) (release func(), err error)

Acquire attempts to acquire a slot for the given command. It blocks until a slot is available or the context is canceled. Returns a release function that must be called when the command completes.

func (*RateLimiter) Current added in v0.3.0

func (r *RateLimiter) Current(name string) int

Current returns the current concurrent count for a command.

func (*RateLimiter) GlobalCurrent added in v0.3.0

func (r *RateLimiter) GlobalCurrent() int

GlobalCurrent returns the total concurrent count across all commands.

func (*RateLimiter) SetGlobalLimit added in v0.3.0

func (r *RateLimiter) SetGlobalLimit(limit int)

SetGlobalLimit sets the maximum total concurrent executions across all commands. Use 0 to remove the limit.

func (*RateLimiter) SetLimit added in v0.3.0

func (r *RateLimiter) SetLimit(name string, limit int)

SetLimit sets the maximum concurrent executions for a command type. Use 0 to remove the limit.

func (*RateLimiter) Stats added in v0.3.0

func (r *RateLimiter) Stats() RateLimitStats

Stats returns rate limiter statistics.

func (*RateLimiter) TryAcquire added in v0.3.0

func (r *RateLimiter) TryAcquire(name string) (release func(), ok bool)

TryAcquire attempts to acquire a slot without blocking. Returns the release function and true if successful, nil and false otherwise.

type RecoverFunc added in v0.2.0

type RecoverFunc func(ctx context.Context, cmd *Instance, err error) (Result, error)

RecoverFunc is the function signature for error recovery.

type Recoverable

type Recoverable interface {
	// Recover is called when a command fails after exhausting all retries.
	// Use this to clean up resources, notify systems, or spawn compensating commands.
	Recover(ctx context.Context, cmd *Instance, err error) (Result, error)
}

Recoverable is an optional interface commands can implement to handle failures after all retries are exhausted.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages command handler registration and resolution.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new command registry.

func (*Registry) Clear

func (r *Registry) Clear()

Clear removes all registered handlers.

func (*Registry) Count

func (r *Registry) Count() int

Count returns the number of registered handlers.

func (*Registry) Has

func (r *Registry) Has(name string) bool

Has returns true if a handler is registered for the given name.

func (*Registry) MustRegister

func (r *Registry) MustRegister(cmd Command)

MustRegister is like Register but allows overwriting existing handlers. Useful for testing or dynamic reconfiguration.

func (*Registry) Names

func (r *Registry) Names() []string

Names returns a list of all registered command names.

func (*Registry) Register

func (r *Registry) Register(cmd Command)

Register adds a command handler to the registry. Panics if a handler with the same name is already registered.

func (*Registry) Resolve

func (r *Registry) Resolve(name string) (Command, error)

Resolve returns the command handler for the given name. Returns an error if no handler is registered.

func (*Registry) Unregister

func (r *Registry) Unregister(name string) bool

Unregister removes a command handler from the registry. Returns true if a handler was removed.

type Result

type Result struct {
	// Commands to spawn after this command completes.
	// Child commands will have their ParentID set to this command's ID.
	Commands []Spec

	// Repeat signals that this command should be rescheduled.
	// The command will run again after its Period duration.
	// Use this for recurring tasks like cleanup jobs or polling.
	Repeat bool

	// Retry signals that this command should retry immediately.
	// Only effective if the command has remaining retries.
	// Unlike returning an error, Retry doesn't trigger Recover.
	Retry bool
}

Result represents the outcome of command execution. It tells the executor what to do next.

func Empty

func Empty() Result

Empty returns a Result that halts the execution chain. Use this when the command completes successfully with no follow-up.

func Next

func Next(spec Spec) Result

Next returns a Result that spawns a single follow-up command. This is a convenience wrapper around Result{Commands: []Spec{spec}}.

Example:

func (c *Step1Command) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	// step 1 logic...
	return durex.Next(durex.Spec{
		Name: "step2Command",
		Data: cmd.Data,
	}), nil
}
Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

func main() {
	store := storage.NewMemory()
	executor := durex.New(store)

	// Step 1 spawns Step 2
	executor.HandleFunc("step1", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		fmt.Println("Step 1 complete")
		return durex.Next(durex.Spec{
			Name: "step2",
			Data: durex.M{"from": "step1"},
		}), nil
	})

	executor.HandleFunc("step2", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		fmt.Println("Step 2 complete")
		return durex.Empty(), nil
	})

	executor.Start(context.Background())
	defer executor.Stop()

	executor.Add(context.Background(), durex.Spec{Name: "step1"})

	time.Sleep(100 * time.Millisecond)
}
Output:

Step 1 complete
Step 2 complete

func Repeat

func Repeat() Result

Repeat returns a Result that reschedules the command. The command will execute again after its Period duration.

Example:

func (c *CleanupCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	// cleanup logic...
	return durex.Repeat(), nil  // run again after period
}
Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

func main() {
	store := storage.NewMemory()
	executor := durex.New(store)

	count := 0
	executor.HandleFunc("heartbeat", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		count++
		fmt.Printf("Heartbeat #%d\n", count)
		if count >= 2 {
			return durex.Empty(), nil // Stop repeating
		}
		return durex.Repeat(), nil
	})

	executor.Start(context.Background())
	defer executor.Stop()

	// Add repeating command with 50ms period
	executor.Add(context.Background(), durex.Spec{
		Name:   "heartbeat",
		Period: 50 * time.Millisecond,
	})

	time.Sleep(150 * time.Millisecond)
}
Output:

Heartbeat #1
Heartbeat #2

func Retry

func Retry() Result

Retry returns a Result that retries the command. This decrements the retry counter without triggering Recover. If no retries remain, the command completes normally.

Use Retry for expected transient failures where you want explicit control over retry behavior.

Example:

func (c *SendCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	err := c.send()
	if isTransient(err) {
		return durex.Retry(), nil  // retry without triggering Recover
	}
	if err != nil {
		return durex.Empty(), err  // triggers Recover after retries exhausted
	}
	return durex.Empty(), nil
}

func Spawn

func Spawn(specs ...Spec) Result

Spawn returns a Result that creates multiple follow-up commands.

Example:

func (c *FanOutCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
	items := cmd.Data["items"].([]string)
	specs := make([]durex.Spec, len(items))
	for i, item := range items {
		specs[i] = durex.Spec{
			Name: "processItemCommand",
			Data: durex.M{"item": item},
		}
	}
	return durex.Spawn(specs...), nil
}
Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/simonovic86/durex"
	"github.com/simonovic86/durex/storage"
)

func main() {
	store := storage.NewMemory()
	executor := durex.New(store)

	// Fan-out: process multiple items in parallel
	executor.HandleFunc("fanOut", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		items := []string{"a", "b", "c"}
		specs := make([]durex.Spec, len(items))
		for i, item := range items {
			specs[i] = durex.Spec{
				Name: "process",
				Data: durex.M{"item": item},
			}
		}
		return durex.Spawn(specs...), nil
	})

	executor.HandleFunc("process", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
		item := cmd.GetString("item")
		fmt.Printf("Processing: %s\n", item)
		return durex.Empty(), nil
	})

	executor.Start(context.Background())
	defer executor.Stop()

	executor.Add(context.Background(), durex.Spec{Name: "fanOut"})

	time.Sleep(100 * time.Millisecond)
}

type SimpleIDGenerator

type SimpleIDGenerator struct {
	// contains filtered or unexported fields
}

SimpleIDGenerator generates simple incremental IDs. Useful for testing but not recommended for production.

func NewSimpleIDGenerator

func NewSimpleIDGenerator(prefix string) *SimpleIDGenerator

NewSimpleIDGenerator creates a new SimpleIDGenerator with the given prefix.

func (*SimpleIDGenerator) Generate

func (g *SimpleIDGenerator) Generate() string

Generate creates a new incremental ID.

type Spec

type Spec struct {
	// Name is the command type identifier.
	// Must match a registered command handler's Name().
	Name string `json:"name"`

	// Data contains the command payload.
	// This data is persisted and available during execution.
	Data M `json:"data,omitempty"`

	// Delay postpones command execution by the specified duration.
	// The command will be scheduled to run after this delay.
	Delay time.Duration `json:"delay,omitempty"`

	// Period sets the interval for repeating commands.
	// When a command returns Repeat(), it will be rescheduled after this duration.
	// If not set, defaults to executor's DefaultRepeatInterval.
	Period time.Duration `json:"period,omitempty"`

	// Timeout sets the maximum execution time per attempt.
	// If the handler doesn't complete within this duration, the context is cancelled
	// and the command is treated as failed (will retry if retries remain).
	// This is different from Deadline which prevents starting after a time.
	Timeout time.Duration `json:"timeout,omitempty"`

	// Deadline sets a relative deadline from now.
	// If the command hasn't started by this time, Expired() is called instead.
	// Takes precedence over DeadlineAt if both are set.
	Deadline time.Duration `json:"deadline,omitempty"`

	// DeadlineAt sets an absolute deadline.
	// If the command hasn't started by this time, Expired() is called instead.
	DeadlineAt *time.Time `json:"deadline_at,omitempty"`

	// Retries is the number of retry attempts on failure.
	// When a command returns an error, it will be retried up to this many times.
	// After retries are exhausted, Recover() is called.
	Retries int `json:"retries,omitempty"`

	// Sequence defines a chain of commands to execute in order.
	// When the current command completes, the next command in the sequence
	// is automatically spawned with the accumulated data.
	Sequence []string `json:"sequence,omitempty"`

	// Priority sets the command's priority (higher = more important).
	// Commands with higher priority are executed first when multiple
	// commands are ready. Default is 0.
	Priority int `json:"priority,omitempty"`

	// Tags are optional labels for categorization and filtering.
	Tags []string `json:"tags,omitempty"`

	// UniqueKey prevents duplicate commands with the same key.
	// If a non-terminal command with this key already exists, Add() returns
	// ErrDuplicateCommand instead of creating a new instance.
	// Useful for ensuring idempotency (e.g., "send-email:user123").
	UniqueKey string `json:"unique_key,omitempty"`

	// TraceID is used for distributed tracing.
	// Automatically propagated to child commands.
	TraceID string `json:"trace_id,omitempty"`

	// CorrelationID links related commands together.
	// Automatically propagated to child commands.
	// If not set, defaults to the root command's ID.
	CorrelationID string `json:"correlation_id,omitempty"`
}

Spec defines the specification for creating a command instance. Use this when adding new commands to the executor.

func Typed added in v0.2.0

func Typed[T any](name string, data T) Spec

Typed creates a Spec with typed data. The data struct is automatically converted to a map.

Example:

executor.Add(ctx, durex.Typed("sendEmail", EmailData{
    To:      "user@example.com",
    Subject: "Welcome!",
}))

func (Spec) WithCorrelationID added in v0.3.0

func (s Spec) WithCorrelationID(correlationID string) Spec

WithCorrelationID returns a copy of the Spec with the given correlation ID. The correlation ID is propagated to all child commands.

func (Spec) WithData

func (s Spec) WithData(data M) Spec

WithData returns a copy of the Spec with the given data merged in.

func (Spec) WithDeadline

func (s Spec) WithDeadline(d time.Duration) Spec

WithDeadline returns a copy of the Spec with the given deadline.

func (Spec) WithDelay

func (s Spec) WithDelay(d time.Duration) Spec

WithDelay returns a copy of the Spec with the given delay.

func (Spec) WithPriority

func (s Spec) WithPriority(p int) Spec

WithPriority returns a copy of the Spec with the given priority.

func (Spec) WithRetries

func (s Spec) WithRetries(n int) Spec

WithRetries returns a copy of the Spec with the given retry count.

func (Spec) WithTags

func (s Spec) WithTags(tags ...string) Spec

WithTags returns a copy of the Spec with the given tags.

func (Spec) WithTimeout added in v0.5.0

func (s Spec) WithTimeout(d time.Duration) Spec

WithTimeout returns a copy of the Spec with the given execution timeout. The timeout limits how long each execution attempt can take.

func (Spec) WithTraceID added in v0.3.0

func (s Spec) WithTraceID(traceID string) Spec

WithTraceID returns a copy of the Spec with the given trace ID. The trace ID is propagated to all child commands.

func (Spec) WithUniqueKey added in v0.3.0

func (s Spec) WithUniqueKey(key string) Spec

WithUniqueKey returns a copy of the Spec with the given unique key. Commands with the same unique key cannot be added while one is still active.

type Stats

type Stats struct {
	Pending            int64
	Completed          int64
	Failed             int64
	DeadLetter         int64
	Repeating          int64
	QueueSize          int
	RegisteredCommands int
	WorkerCount        int
	RateLimit          *RateLimitStats
}

Stats holds executor statistics.

type Status

type Status string

Status represents the execution state of a command instance.

const (
	// StatusPending indicates the command is waiting to be executed.
	StatusPending Status = "PENDING"

	// StatusStarted indicates the command is currently executing.
	StatusStarted Status = "STARTED"

	// StatusCompleted indicates the command finished successfully.
	StatusCompleted Status = "COMPLETED"

	// StatusFailed indicates the command failed after exhausting retries.
	StatusFailed Status = "FAILED"

	// StatusExpired indicates the command's deadline passed before execution.
	StatusExpired Status = "EXPIRED"

	// StatusRepeating indicates the command is scheduled to repeat.
	StatusRepeating Status = "REPEATING"

	// StatusCancelled indicates the command was cancelled before completion.
	StatusCancelled Status = "CANCELLED"

	// StatusDeadLetter indicates the command failed permanently and was moved to DLQ.
	// Commands in DLQ can be inspected and replayed manually.
	StatusDeadLetter Status = "DEAD_LETTER"
)

func (Status) IsActive

func (s Status) IsActive() bool

IsActive returns true if the status represents an active/running state.

func (Status) IsTerminal

func (s Status) IsTerminal() bool

IsTerminal returns true if the status represents a final state.

func (Status) String

func (s Status) String() string

String returns the string representation of the status.

type Storage

type Storage interface {
	// Create persists a new command instance.
	// Returns ErrAlreadyExists if an instance with the same ID exists.
	Create(ctx context.Context, cmd *Instance) error

	// Update saves changes to an existing command instance.
	// Returns ErrNotFound if the instance doesn't exist.
	Update(ctx context.Context, cmd *Instance) error

	// Delete removes a command instance by ID.
	// Returns nil if the instance doesn't exist (idempotent).
	Delete(ctx context.Context, id string) error

	// Get retrieves a command instance by ID.
	// Returns ErrNotFound if the instance doesn't exist.
	Get(ctx context.Context, id string) (*Instance, error)

	// FindPending returns all commands that are ready for execution.
	// This includes commands with status PENDING, STARTED, or REPEATING
	// where ReadyAt <= now.
	FindPending(ctx context.Context) ([]*Instance, error)

	// FindByStatus returns commands with the given status.
	FindByStatus(ctx context.Context, status Status) ([]*Instance, error)

	// FindByParent returns all child commands of the given parent.
	FindByParent(ctx context.Context, parentID string) ([]*Instance, error)

	// FindByUniqueKey returns an active command with the given unique key.
	// Returns ErrNotFound if no active command with this key exists.
	// Only searches non-terminal statuses (PENDING, STARTED, REPEATING).
	FindByUniqueKey(ctx context.Context, key string) (*Instance, error)

	// Cleanup removes completed/failed/expired commands older than the given age.
	// Returns the number of commands deleted.
	Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)

	// Count returns the total number of commands, optionally filtered by status.
	Count(ctx context.Context, status *Status) (int64, error)

	// Close releases any resources held by the storage.
	Close() error
}

Storage defines the interface for command persistence. Implementations must be safe for concurrent use.

type Transaction

type Transaction interface {
	Storage

	// Commit commits the transaction.
	Commit() error

	// Rollback aborts the transaction.
	Rollback() error
}

Transaction represents a storage transaction.

type TransactionalStorage

type TransactionalStorage interface {
	Storage

	// Begin starts a new transaction.
	Begin(ctx context.Context) (Transaction, error)
}

TransactionalStorage extends Storage with transaction support. Implement this interface for ACID-compliant storage backends.

type TypedCommand added in v0.2.0

type TypedCommand[T any] struct {
	// contains filtered or unexported fields
}

TypedCommand wraps a typed function as a Command.

func NewTyped added in v0.2.0

func NewTyped[T any](name string, fn TypedExecuteFunc[T], opts ...TypedOption[T]) *TypedCommand[T]

NewTyped creates a new typed command.

Example:

type EmailData struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
}

cmd := durex.NewTyped("sendEmail", func(ctx context.Context, data EmailData, cmd *durex.Instance) (durex.Result, error) {
    return mailer.Send(data.To, data.Subject), nil
})

func (*TypedCommand[T]) Default added in v0.2.0

func (c *TypedCommand[T]) Default() Spec

Default implements Defaulter.

func (*TypedCommand[T]) Execute added in v0.2.0

func (c *TypedCommand[T]) Execute(ctx context.Context, cmd *Instance) (Result, error)

Execute implements Command.

func (*TypedCommand[T]) Name added in v0.2.0

func (c *TypedCommand[T]) Name() string

Name implements Command.

func (*TypedCommand[T]) Recover added in v0.2.0

func (c *TypedCommand[T]) Recover(ctx context.Context, cmd *Instance, err error) (Result, error)

Recover implements Recoverable.

type TypedExecuteFunc added in v0.2.0

type TypedExecuteFunc[T any] func(ctx context.Context, data T, cmd *Instance) (Result, error)

TypedExecuteFunc is a function that receives typed data.

type TypedOption added in v0.2.0

type TypedOption[T any] func(*TypedCommand[T])

TypedOption configures a TypedCommand.

func WithDeadline added in v0.2.0

func WithDeadline[T any](d time.Duration) TypedOption[T]

WithDeadline sets the default deadline for typed commands.

func WithPeriod added in v0.2.0

func WithPeriod[T any](d time.Duration) TypedOption[T]

WithPeriod sets the repeat period for typed commands.

func WithRecover added in v0.2.0

func WithRecover[T any](fn TypedRecoverFunc[T]) TypedOption[T]

WithRecover sets the recovery function for typed commands.

func WithRetries added in v0.2.0

func WithRetries[T any](n int) TypedOption[T]

WithRetries sets the default retry count for typed commands.

type TypedRecoverFunc added in v0.2.0

type TypedRecoverFunc[T any] func(ctx context.Context, data T, cmd *Instance, err error) (Result, error)

TypedRecoverFunc is a recovery function that receives typed data.

type ULIDGenerator

type ULIDGenerator struct{}

ULIDGenerator generates ULID-style IDs. Lexicographically sortable and URL-safe.

func (*ULIDGenerator) Generate

func (g *ULIDGenerator) Generate() string

Generate creates a new ULID.

Directories

Path Synopsis
examples
basic command
Example: Basic durex usage with functional commands
Example: Basic durex usage with functional commands
workflow command
Example: E-commerce order processing with typed commands
Example: E-commerce order processing with typed commands
Package storage provides storage implementations for durex.
Package storage provides storage implementations for durex.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL