Documentation ¶
Overview ¶
Package durex provides a durable execution framework for Go.
Durex is a production-grade command pattern implementation that provides persistent task execution with retries, deadlines, and recovery mechanisms.
Basic usage:
executor := durex.New(storage,
durex.WithParallelism(4),
durex.WithLogger(slog.Default()),
)
executor.Register(&MyCommand{})
executor.Start(ctx)
executor.Add(ctx, durex.Spec{
Name: "myCommand",
Data: durex.M{"key": "value"},
})
Package durex provides a durable background job queue and workflow engine for Go.
Durex is a lightweight, embeddable task queue with persistence, automatic retries, workflow sequences, and saga pattern support. It's an alternative to Asynq, River, and Temporal for teams who want workflow capabilities without infrastructure complexity.
Use SQLite for development, PostgreSQL for production. No Redis or Kafka required.
Key Features ¶
- Persistent Commands: Commands survive process restarts
- Automatic Retries: Configurable retry logic with backoff strategies
- Deadlines: Time-bound execution with expiration handling
- Command Chaining: Build workflows with sequences
- Recovery: Custom error handling and compensation
- Middleware: Extensible execution pipeline
- Multiple Storage Backends: PostgreSQL, SQLite, Memory
- Rate Limiting: Control concurrent execution per command type
- Deduplication: Prevent duplicate commands with unique keys
- Context Propagation: Trace and correlation IDs across command chains
Basic Usage ¶
Define a command by implementing the Command interface:
type SendEmailCommand struct {
durex.BaseCommand
mailer *MailService
}
func (c *SendEmailCommand) Name() string {
return "sendEmail"
}
func (c *SendEmailCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
to := cmd.GetString("to")
subject := cmd.GetString("subject")
body := cmd.GetString("body")
if err := c.mailer.Send(to, subject, body); err != nil {
return durex.Empty(), err // Will retry if retries > 0
}
return durex.Empty(), nil
}
Create an executor and register commands:
store := storage.NewMemory() // named store so the storage package is not shadowed
executor := durex.New(store,
durex.WithParallelism(4),
durex.WithDefaultRetries(3),
)
executor.Register(&SendEmailCommand{mailer: mailerService})
executor.Start(ctx)
defer executor.Stop()
Add commands for execution:
executor.Add(ctx, durex.Spec{
Name: "sendEmail",
Data: durex.M{
"to": "user@example.com",
"subject": "Welcome!",
"body": "Thanks for signing up.",
},
Retries: 3,
})
Command Results ¶
Commands return a Result that tells the executor what to do next:
- durex.Empty(): Command completed, no follow-up actions
- durex.Repeat(): Reschedule this command to run again after its Period
- durex.Retry(): Retry immediately (uses retry counter, doesn't trigger Recover)
- durex.Next(spec): Spawn a single follow-up command
- durex.Spawn(specs...): Spawn multiple follow-up commands
Command Chaining ¶
Build workflows by chaining commands:
executor.Add(ctx, durex.Spec{
Name: "validateOrder",
Sequence: []string{"processPayment", "shipOrder", "sendConfirmation"},
Data: durex.M{"orderId": "12345"},
})
Each command can continue the sequence:
func (c *ValidateOrderCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
// Validate order...
cmd.Set("validated", true)
return cmd.ContinueSequence(nil), nil
}
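As a rough mental model of how a Sequence could be consumed, the sketch below pops the head of the remaining chain at each step. It is a standalone illustration using only the standard library; `nextInSequence` is a hypothetical helper, and the assumption that the executor runs the first remaining name and carries the rest forward is inferred from the Sequence field description ("the remaining command chain"), not taken from the durex source.

```go
package main

import "fmt"

// nextInSequence splits a remaining chain into the next command name and the
// rest of the chain. ok is false once the chain is exhausted.
// Hypothetical helper: durex's ContinueSequence may behave differently.
func nextInSequence(seq []string) (next string, rest []string, ok bool) {
	if len(seq) == 0 {
		return "", nil, false
	}
	return seq[0], seq[1:], true
}

func main() {
	seq := []string{"processPayment", "shipOrder", "sendConfirmation"}
	for {
		name, rest, ok := nextInSequence(seq)
		if !ok {
			break
		}
		fmt.Printf("run %s, %d left\n", name, len(rest))
		seq = rest
	}
}
```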
Error Handling ¶
Commands can implement the Recoverable interface for custom error handling:
func (c *SendEmailCommand) Recover(ctx context.Context, cmd *durex.Instance, err error) (durex.Result, error) {
// Log the failure, notify ops, spawn compensation commands
return durex.Next(durex.Spec{
Name: "notifyFailure",
Data: durex.M{"error": err.Error()},
}), nil
}
Deadlines ¶
Set execution deadlines:
executor.Add(ctx, durex.Spec{
Name: "processOrder",
Deadline: 5 * time.Minute,
})
Commands can implement Expirable to handle deadline expiration:
func (c *ProcessOrderCommand) Expired(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
// Handle timeout - refund, notify, etc.
return durex.Empty(), nil
}
Middleware ¶
Add cross-cutting concerns:
executor := durex.New(storage,
durex.WithMiddleware(
func(ctx durex.MiddlewareContext, next func() (durex.Result, error)) (durex.Result, error) {
start := time.Now()
result, err := next()
log.Printf("Command %s took %v", ctx.Command.Name, time.Since(start))
return result, err
},
),
)
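To see how several middleware functions could compose around a handler, here is a minimal standalone sketch. The `result` and `middleware` types are local stand-ins for durex.Result and durex.Middleware, and the ordering (first middleware outermost) is an assumption; check the library before relying on it.

```go
package main

import "fmt"

// result and middleware are local stand-ins, not the durex types.
type result struct{}
type middleware func(name string, next func() (result, error)) (result, error)

// chain wraps handler so that mws[0] is outermost and runs first.
func chain(name string, handler func() (result, error), mws ...middleware) func() (result, error) {
	next := handler
	for i := len(mws) - 1; i >= 0; i-- {
		mw, inner := mws[i], next // fresh variables per iteration for the closure
		next = func() (result, error) { return mw(name, inner) }
	}
	return next
}

// tag returns a middleware that records when it runs relative to next.
func tag(label string, trace *[]string) middleware {
	return func(name string, next func() (result, error)) (result, error) {
		*trace = append(*trace, label+" before")
		res, err := next()
		*trace = append(*trace, label+" after")
		return res, err
	}
}

func main() {
	var trace []string
	handler := func() (result, error) {
		trace = append(trace, "handler")
		return result{}, nil
	}
	run := chain("sendEmail", handler, tag("A", &trace), tag("B", &trace))
	run()
	fmt.Println(trace) // [A before B before handler B after A after]
}
```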
Storage Backends ¶
Durex supports multiple storage backends:
// Pick one backend. Errors are ignored here for brevity; check them in real code.
// In-memory (for testing)
store := storage.NewMemory()
// SQLite (for single-instance deployments)
store, _ := storage.OpenSQLite("commands.db")
store.Migrate(ctx)
// PostgreSQL (for production)
db, _ := sql.Open("postgres", "postgres://...")
store := storage.NewPostgres(db)
store.Migrate(ctx)
Backoff Strategies ¶
Configure retry backoff behavior:
executor := durex.New(storage,
durex.WithBackoff(durex.DefaultExponentialBackoff()),
)
Available strategies:
- durex.NoBackoff(): Immediate retry (default)
- durex.ConstantBackoff{Delay: 5 * time.Second}: Fixed delay
- durex.LinearBackoff{InitialDelay: time.Second, MaxDelay: time.Minute}
- durex.ExponentialBackoff{InitialDelay: time.Second, MaxDelay: 5 * time.Minute, Multiplier: 2.0}
- durex.JitteredBackoff{Strategy: ..., JitterRate: 0.1}: Add randomness to prevent thundering herd
Deduplication ¶
Prevent duplicate commands with unique keys:
executor.Add(ctx, durex.Spec{
Name: "sendEmail",
UniqueKey: "email:user123:welcome", // Only one active command with this key
})
If a non-terminal command with the same UniqueKey exists, Add() returns ErrDuplicateCommand.
Rate Limiting ¶
Control concurrent command execution:
executor := durex.New(storage,
durex.WithRateLimit("sendEmail", 10), // Max 10 concurrent emails
durex.WithRateLimit("apiCall", 5), // Max 5 concurrent API calls
durex.WithGlobalRateLimit(100), // Max 100 total concurrent commands
)
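Note that these limits cap concurrency (how many commands run at once) rather than throughput per second. A minimal standalone sketch of that idea follows; `limiter` is a hypothetical type whose method names loosely echo the RateLimiter index entries, and its internals are an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter caps how many holders of each name may run at once.
// Hypothetical type for illustration; durex's RateLimiter may work differently.
type limiter struct {
	mu     sync.Mutex
	limits map[string]int
	active map[string]int
}

func newLimiter() *limiter {
	return &limiter{limits: map[string]int{}, active: map[string]int{}}
}

func (l *limiter) SetLimit(name string, n int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.limits[name] = n
}

// TryAcquire takes a slot for name without blocking. The returned release
// function frees the slot and is safe to call more than once.
func (l *limiter) TryAcquire(name string) (release func(), ok bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if limit, limited := l.limits[name]; limited && l.active[name] >= limit {
		return nil, false
	}
	l.active[name]++
	var once sync.Once
	return func() {
		once.Do(func() {
			l.mu.Lock()
			defer l.mu.Unlock()
			l.active[name]--
		})
	}, true
}

func main() {
	l := newLimiter()
	l.SetLimit("sendEmail", 2)

	r1, _ := l.TryAcquire("sendEmail")
	_, ok2 := l.TryAcquire("sendEmail")
	_, ok3 := l.TryAcquire("sendEmail")
	fmt.Println(ok2, ok3) // true false

	r1()
	_, ok4 := l.TryAcquire("sendEmail")
	fmt.Println(ok4) // true
}
```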
Tracing and Correlation ¶
Commands automatically propagate trace and correlation IDs to child commands:
executor.Add(ctx, durex.Spec{
Name: "workflow",
TraceID: "trace-123", // Propagated to all children
CorrelationID: "correlation-456", // Links related commands
})
Access these in your command:
func (c *MyCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
log.Printf("TraceID: %s, CorrelationID: %s", cmd.TraceID, cmd.CorrelationID)
// ...
}
Web Dashboard ¶
Enable the built-in monitoring dashboard:
// Simple standalone server
go executor.ServeDashboard(":8080")
// Or integrate with existing HTTP server
http.Handle("/durex/", http.StripPrefix("/durex", executor.DashboardHandler()))
Multi-Instance Deployment ¶
For horizontal scaling with PostgreSQL, Durex automatically uses row-level locking:
db, _ := sql.Open("postgres", "postgres://...")
store := storage.NewPostgres(db)
store.Migrate(ctx)
executor := durex.New(store,
durex.WithPollInterval(500*time.Millisecond),
durex.WithClaimBatchSize(20),
)
Multiple executor instances can safely run concurrently; each claims a different set of commands.
Production Considerations ¶
For production deployments:
- Use PostgreSQL for durability and multi-instance support
- Configure appropriate parallelism based on workload
- Set up monitoring using the MetricsCollector interface
- Implement proper error handling and alerting
- Use deadlines to prevent runaway commands
- Consider idempotency in command implementations
See the examples directory for complete working examples.
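On the idempotency point: because a command may run more than once (retries, replays after a restart), a handler's side effects should be safe to repeat. One common approach is to guard the effect with a key, as in this standalone sketch. `onceByKey` is a hypothetical helper that keeps its state in memory; a real deployment would need the record of applied keys in durable storage.

```go
package main

import (
	"fmt"
	"sync"
)

// onceByKey runs fn at most once per key, even if called repeatedly.
type onceByKey struct {
	mu   sync.Mutex
	done map[string]bool
}

func (o *onceByKey) Do(key string, fn func() error) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.done[key] {
		return nil // already applied; a retry becomes a no-op
	}
	if err := fn(); err != nil {
		return err // not marked done, so a later retry runs fn again
	}
	o.done[key] = true
	return nil
}

func main() {
	guard := &onceByKey{done: map[string]bool{}}
	charges := 0
	charge := func() error { charges++; return nil }

	guard.Do("order-12345", charge)
	guard.Do("order-12345", charge) // simulated retry
	fmt.Println(charges)            // 1
}
```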
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
// GreetCommand sends a greeting.
type GreetCommand struct{}
func (c *GreetCommand) Name() string { return "greet" }
func (c *GreetCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
name := cmd.GetString("name")
fmt.Printf("Hello, %s!\n", name)
return durex.Empty(), nil
}
func main() {
// Create in-memory storage
store := storage.NewMemory()
// Create executor with options
executor := durex.New(store,
durex.WithParallelism(4),
durex.WithDefaultRetries(3),
)
// Register command handlers
executor.Register(&GreetCommand{})
// Start processing
ctx := context.Background()
executor.Start(ctx)
defer executor.Stop()
// Add a command
executor.Add(ctx, durex.Spec{
Name: "greet",
Data: durex.M{"name": "World"},
})
// Wait for processing
time.Sleep(100 * time.Millisecond)
}
Output: Hello, World!
Index ¶
- Variables
- func CorrelationIDFromContext(ctx context.Context) string
- func GenerateID() string
- func HandleTyped[T any](e *Executor, name string, fn TypedExecuteFunc[T], opts ...TypedOption[T])
- func SetIDGenerator(gen IDGenerator)
- func TraceIDFromContext(ctx context.Context) string
- func WithCorrelationID(ctx context.Context, correlationID string) context.Context
- func WithInstance(ctx context.Context, instance *Instance) context.Context
- func WithTraceID(ctx context.Context, traceID string) context.Context
- type BackoffStrategy
- type BaseCommand
- func (b BaseCommand) Default() Spec
- func (b BaseCommand) Execute(ctx context.Context, cmd *Instance) (Result, error)
- func (b BaseCommand) Expired(ctx context.Context, cmd *Instance) (Result, error)
- func (b BaseCommand) Name() string
- func (b BaseCommand) Recover(ctx context.Context, cmd *Instance, err error) (Result, error)
- type Command
- type CommandRateStats
- type ConstantBackoff
- type ContextExtractor
- type ContextInjector
- type DefaultContextExtractor
- type DefaultContextInjector
- type DefaultIDGenerator
- type Defaulter
- type Duration
- type Event
- type EventType
- type ExecuteFunc
- type Executor
- func (e *Executor) Add(ctx context.Context, spec Spec) (*Instance, error)
- func (e *Executor) AddMany(ctx context.Context, specs ...Spec) ([]*Instance, error)
- func (e *Executor) Cancel(ctx context.Context, id string) error
- func (e *Executor) CancelByTag(ctx context.Context, tag string) (int, error)
- func (e *Executor) DashboardHandler() http.Handler
- func (e *Executor) FindDeadLettered(ctx context.Context) ([]*Instance, error)
- func (e *Executor) Get(ctx context.Context, id string) (*Instance, error)
- func (e *Executor) HandleFunc(name string, fn ExecuteFunc, opts ...FuncOption) *Executor
- func (e *Executor) History(ctx context.Context, id string) ([]Event, error)
- func (e *Executor) PurgeDLQ(ctx context.Context, age time.Duration) (int, error)
- func (e *Executor) Register(cmd Command) *Executor
- func (e *Executor) ReplayFromDLQ(ctx context.Context, id string) error
- func (e *Executor) ServeDashboard(addr string) error
- func (e *Executor) Start(ctx context.Context) error
- func (e *Executor) Stats(ctx context.Context) (*Stats, error)
- func (e *Executor) Stop() error
- type Expirable
- type ExpiredFunc
- type ExponentialBackoff
- type FuncCommand
- func (f *FuncCommand) Default() Spec
- func (f *FuncCommand) Execute(ctx context.Context, cmd *Instance) (Result, error)
- func (f *FuncCommand) Expired(ctx context.Context, cmd *Instance) (Result, error)
- func (f *FuncCommand) Name() string
- func (f *FuncCommand) Recover(ctx context.Context, cmd *Instance, err error) (Result, error)
- type FuncOption
- type HookedStorage
- type Hooks
- type IDGenerator
- type Instance
- func (i *Instance) Age() time.Duration
- func (i *Instance) Clone() *Instance
- func (i *Instance) ContinueSequence(additionalData M) Result
- func (i *Instance) Duration() time.Duration
- func (i *Instance) Get(key string) any
- func (i *Instance) GetBool(key string) bool
- func (i *Instance) GetInt(key string) int
- func (i *Instance) GetMap(key string) M
- func (i *Instance) GetSlice(key string) []any
- func (i *Instance) GetString(key string) string
- func (i *Instance) HasTag(tag string) bool
- func (i *Instance) IsOverdue() bool
- func (i *Instance) MarshalJSON() ([]byte, error)
- func (i *Instance) RecordError(eventType EventType, err error)
- func (i *Instance) RecordEvent(eventType EventType, message string)
- func (i *Instance) RecordEventWithDuration(eventType EventType, duration time.Duration, message string)
- func (i *Instance) Set(key string, value any)
- func (i *Instance) UnmarshalJSON(data []byte) error
- type JitteredBackoff
- type LinearBackoff
- type LockingStorage
- type M
- type MetricsCollector
- type Middleware
- type MiddlewareContext
- type Option
- func WithBackoff(strategy BackoffStrategy) Option
- func WithClaimBatchSize(size int) Option
- func WithCleanupAge(d time.Duration) Option
- func WithCleanupInterval(d time.Duration) Option
- func WithDashboard(addr string) Option
- func WithDeadLetterQueue() Option
- func WithDefaultRepeatInterval(d time.Duration) Option
- func WithDefaultRetries(n int) Option
- func WithDefaultTimeout(d time.Duration) Option
- func WithErrorHandler(handler func(cmd *Instance, err error)) Option
- func WithGlobalRateLimit(maxConcurrent int) Option
- func WithGracefulShutdown(d time.Duration) Option
- func WithLogger(logger *slog.Logger) Option
- func WithMaxDelay(d time.Duration) Option
- func WithMetrics(collector MetricsCollector) Option
- func WithMiddleware(mw ...Middleware) Option
- func WithParallelism(n int) Option
- func WithPermanentCommands(names ...string) Option
- func WithPollInterval(d time.Duration) Option
- func WithQueueSize(size int) Option
- func WithRateLimit(commandName string, maxConcurrent int) Option
- func WithRateLimiter(limiter *RateLimiter) Option
- func WithStuckCommandRecovery(checkInterval, threshold time.Duration) Option
- type PrometheusMetrics
- func (m *PrometheusMetrics) Collect(ch chan<- prometheus.Metric)
- func (m *PrometheusMetrics) Collectors() []prometheus.Collector
- func (m *PrometheusMetrics) CommandCompleted(name string, duration time.Duration)
- func (m *PrometheusMetrics) CommandFailed(name string, _ error)
- func (m *PrometheusMetrics) CommandRetried(name string, _ int)
- func (m *PrometheusMetrics) CommandStarted(name string)
- func (m *PrometheusMetrics) Describe(ch chan<- *prometheus.Desc)
- func (m *PrometheusMetrics) QueueSize(size int)
- type PrometheusOption
- type Query
- type QueryableStorage
- type RateLimitStats
- type RateLimiter
- func (r *RateLimiter) Acquire(ctx context.Context, name string) (release func(), err error)
- func (r *RateLimiter) Current(name string) int
- func (r *RateLimiter) GlobalCurrent() int
- func (r *RateLimiter) SetGlobalLimit(limit int)
- func (r *RateLimiter) SetLimit(name string, limit int)
- func (r *RateLimiter) Stats() RateLimitStats
- func (r *RateLimiter) TryAcquire(name string) (release func(), ok bool)
- type RecoverFunc
- type Recoverable
- type Registry
- func (r *Registry) Clear()
- func (r *Registry) Count() int
- func (r *Registry) Has(name string) bool
- func (r *Registry) MustRegister(cmd Command)
- func (r *Registry) Names() []string
- func (r *Registry) Register(cmd Command)
- func (r *Registry) Resolve(name string) (Command, error)
- func (r *Registry) Unregister(name string) bool
- type Result
- type SimpleIDGenerator
- type Spec
- func (s Spec) WithCorrelationID(correlationID string) Spec
- func (s Spec) WithData(data M) Spec
- func (s Spec) WithDeadline(d time.Duration) Spec
- func (s Spec) WithDelay(d time.Duration) Spec
- func (s Spec) WithPriority(p int) Spec
- func (s Spec) WithRetries(n int) Spec
- func (s Spec) WithTags(tags ...string) Spec
- func (s Spec) WithTimeout(d time.Duration) Spec
- func (s Spec) WithTraceID(traceID string) Spec
- func (s Spec) WithUniqueKey(key string) Spec
- type Stats
- type Status
- type Storage
- type Transaction
- type TransactionalStorage
- type TypedCommand
- type TypedExecuteFunc
- type TypedOption
- type TypedRecoverFunc
- type ULIDGenerator
Variables ¶
var (
ErrExecutorStopped = errors.New("durex: executor is stopped")
ErrExecutorNotReady = errors.New("durex: executor is not started")
)
Common executor errors.
var (
ErrNotFound = errors.New("durex: command not found")
ErrAlreadyExists = errors.New("durex: command already exists")
ErrStorageClosed = errors.New("durex: storage is closed")
ErrDuplicateCommand = errors.New("durex: command with this unique key already exists")
)
Common storage errors.
Functions ¶
func CorrelationIDFromContext ¶ added in v0.3.0
func CorrelationIDFromContext(ctx context.Context) string
CorrelationIDFromContext returns the correlation ID from the context, or empty string if not set.
func GenerateID ¶
func GenerateID() string
GenerateID creates a new ID using the package-level generator.
func HandleTyped ¶ added in v0.2.0
func HandleTyped[T any](e *Executor, name string, fn TypedExecuteFunc[T], opts ...TypedOption[T])
HandleTyped registers a typed command handler. The data from the command instance is automatically unmarshaled to the type T.
Example:
type OrderData struct {
OrderID string `json:"orderId"`
Amount float64 `json:"amount"`
}
durex.HandleTyped(executor, "processOrder", func(ctx context.Context, data OrderData, cmd *durex.Instance) (durex.Result, error) {
// data is already typed - no GetString() needed!
log.Printf("Processing order %s for $%.2f", data.OrderID, data.Amount)
return durex.Empty(), nil
})
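One plausible way to turn a loosely typed payload into T is a JSON round trip, sketched below with only the standard library. `decode` is a hypothetical helper; the document does not say how HandleTyped performs the conversion, so treat the mechanism as an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decode converts a map payload into T by marshaling it to JSON and
// unmarshaling into the target type, so struct tags decide the field mapping.
func decode[T any](data map[string]any) (T, error) {
	var out T
	raw, err := json.Marshal(data)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(raw, &out)
	return out, err
}

type OrderData struct {
	OrderID string  `json:"orderId"`
	Amount  float64 `json:"amount"`
}

func main() {
	order, err := decode[OrderData](map[string]any{"orderId": "12345", "amount": 49.5})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %.2f\n", order.OrderID, order.Amount) // 12345 49.50
}
```

A payload whose field types do not match the struct (for example a number where a string is expected) surfaces as an error from the decode step rather than as a zero value.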
func SetIDGenerator ¶
func SetIDGenerator(gen IDGenerator)
SetIDGenerator sets the package-level ID generator.
func TraceIDFromContext ¶ added in v0.3.0
func TraceIDFromContext(ctx context.Context) string
TraceIDFromContext returns the trace ID from the context, or empty string if not set.
func WithCorrelationID ¶ added in v0.3.0
func WithCorrelationID(ctx context.Context, correlationID string) context.Context
WithCorrelationID returns a new context with the given correlation ID.
func WithInstance ¶ added in v0.3.0
func WithInstance(ctx context.Context, instance *Instance) context.Context
WithInstance returns a new context with the given command instance.
Types ¶
type BackoffStrategy ¶ added in v0.3.0
type BackoffStrategy interface {
// NextDelay returns the delay before the next retry attempt.
// attempt starts at 1 for the first retry.
NextDelay(attempt int) time.Duration
}
BackoffStrategy calculates the delay before retrying a failed command.
func DefaultExponentialBackoff ¶ added in v0.3.0
func DefaultExponentialBackoff() BackoffStrategy
DefaultExponentialBackoff returns a sensible default exponential backoff. Starts at 1 second, doubles each time, max 5 minutes, with 10% jitter.
func NoBackoff ¶ added in v0.3.0
func NoBackoff() BackoffStrategy
NoBackoff returns a strategy with zero delay (immediate retry).
type BaseCommand ¶
type BaseCommand struct{}
BaseCommand provides a minimal Command implementation. Embed this in your commands to satisfy the Command interface with sensible defaults, then override only what you need.
Example:
type MyCommand struct {
durex.BaseCommand
db *sql.DB
}
func (c *MyCommand) Name() string { return "myCommand" }
func (c *MyCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
// your logic here
return durex.Empty(), nil
}
func (BaseCommand) Default ¶
func (b BaseCommand) Default() Spec
Default returns an empty Spec. Override this to provide defaults.
func (BaseCommand) Name ¶
func (b BaseCommand) Name() string
Name returns an empty string. Override this in your command.
type Command ¶
type Command interface {
// Name returns the unique identifier for this command type.
// This name is used to register and resolve command handlers.
Name() string
// Execute runs the command logic.
// Returns a Result indicating what should happen next.
// If an error is returned, the command will be retried (if retries > 0)
// or marked as failed and Recover will be called.
Execute(ctx context.Context, cmd *Instance) (Result, error)
}
Command defines the interface that all command handlers must implement.
Commands encapsulate business logic that can be executed asynchronously, persisted, retried on failure, and recovered on system restart.
type CommandRateStats ¶ added in v0.3.0
CommandRateStats holds per-command rate statistics.
type ConstantBackoff ¶ added in v0.3.0
ConstantBackoff returns the same delay for every retry.
type ContextExtractor ¶ added in v0.3.0
type ContextExtractor interface {
// ExtractTraceID extracts a trace ID from the context.
ExtractTraceID(ctx context.Context) string
// ExtractCorrelationID extracts a correlation ID from the context.
ExtractCorrelationID(ctx context.Context) string
}
ContextExtractor extracts trace/correlation IDs from incoming requests. Implement this interface to integrate with your tracing system.
type ContextInjector ¶ added in v0.3.0
type ContextInjector interface {
// InjectContext injects trace and correlation IDs into the context.
InjectContext(ctx context.Context, traceID, correlationID string) context.Context
}
ContextInjector injects trace/correlation IDs into outgoing contexts. Implement this interface to integrate with your tracing system.
type DefaultContextExtractor ¶ added in v0.3.0
type DefaultContextExtractor struct{}
DefaultContextExtractor uses durex context keys.
func (DefaultContextExtractor) ExtractCorrelationID ¶ added in v0.3.0
func (DefaultContextExtractor) ExtractCorrelationID(ctx context.Context) string
ExtractCorrelationID implements ContextExtractor.
func (DefaultContextExtractor) ExtractTraceID ¶ added in v0.3.0
func (DefaultContextExtractor) ExtractTraceID(ctx context.Context) string
ExtractTraceID implements ContextExtractor.
type DefaultContextInjector ¶ added in v0.3.0
type DefaultContextInjector struct{}
DefaultContextInjector uses durex context keys.
func (DefaultContextInjector) InjectContext ¶ added in v0.3.0
func (DefaultContextInjector) InjectContext(ctx context.Context, traceID, correlationID string) context.Context
InjectContext implements ContextInjector.
type DefaultIDGenerator ¶
type DefaultIDGenerator struct {
// contains filtered or unexported fields
}
DefaultIDGenerator creates IDs using timestamp + random bytes. Format: "cmd_<timestamp_hex>_<random_hex>" Example: "cmd_18d5f3a2b4c_8a7b6c5d4e3f2a1b"
func (*DefaultIDGenerator) Generate ¶
func (g *DefaultIDGenerator) Generate() string
Generate creates a new unique ID.
type Defaulter ¶
type Defaulter interface {
// Default returns the default Spec for this command type.
// These values are merged with user-provided values when creating instances.
Default() Spec
}
Defaulter is an optional interface commands can implement to provide default specification values.
type Duration ¶ added in v0.2.0
Duration is an alias for time.Duration that allows FuncOption to work without import cycles in user code.
type Event ¶ added in v0.8.0
type Event struct {
// Type is the event type (created, started, completed, etc.).
Type EventType `json:"type"`
// Timestamp is when the event occurred.
Timestamp time.Time `json:"timestamp"`
// Attempt is the attempt number (for started/failed/completed events).
Attempt int `json:"attempt,omitempty"`
// Error contains the error message (for failed events).
Error string `json:"error,omitempty"`
// DurationMs is how long the execution took in milliseconds.
DurationMs int64 `json:"duration_ms,omitempty"`
// Message contains additional context about the event.
Message string `json:"message,omitempty"`
}
Event represents a single execution event in a command's history.
type EventType ¶ added in v0.8.0
type EventType string
EventType represents the type of execution event.
const (
EventCreated EventType = "created"
EventStarted EventType = "started"
EventCompleted EventType = "completed"
EventFailed EventType = "failed"
EventRetrying EventType = "retrying"
EventExpired EventType = "expired"
EventCancelled EventType = "cancelled"
EventRepeating EventType = "repeating"
EventRecovered EventType = "recovered" // Moved to DLQ or recovery handler called
)
Event types for command lifecycle.
type ExecuteFunc ¶ added in v0.2.0
ExecuteFunc is the function signature for command execution.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor manages command execution with persistence, retries, and scheduling.
func (*Executor) Add ¶
func (e *Executor) Add(ctx context.Context, spec Spec) (*Instance, error)
Add queues a new command for execution.
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
// GreetCommand sends a greeting.
type GreetCommand struct{}
func (c *GreetCommand) Name() string { return "greet" }
func (c *GreetCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
name := cmd.GetString("name")
fmt.Printf("Hello, %s!\n", name)
return durex.Empty(), nil
}
func main() {
store := storage.NewMemory()
executor := durex.New(store)
executor.Register(&GreetCommand{})
executor.Start(context.Background())
defer executor.Stop()
ctx := context.Background()
// Add a simple command
instance, _ := executor.Add(ctx, durex.Spec{
Name: "greet",
Data: durex.M{"name": "Alice"},
})
fmt.Println("Added command:", instance.Name)
time.Sleep(100 * time.Millisecond)
}
Output:
Added command: greet
Hello, Alice!
Example (WithRetries) ¶
package main
import (
"context"
"fmt"
"time"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
// GreetCommand sends a greeting.
type GreetCommand struct{}
func (c *GreetCommand) Name() string { return "greet" }
func (c *GreetCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
name := cmd.GetString("name")
fmt.Printf("Hello, %s!\n", name)
return durex.Empty(), nil
}
func main() {
store := storage.NewMemory()
executor := durex.New(store)
executor.Register(&GreetCommand{})
executor.Start(context.Background())
defer executor.Stop()
ctx := context.Background()
// Add a command with retries and delay
executor.Add(ctx, durex.Spec{
Name: "greet",
Data: durex.M{"name": "Bob"},
Retries: 3,
Delay: time.Second,
})
}
Example (WithTags) ¶
package main
import (
"context"
"fmt"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
// GreetCommand sends a greeting.
type GreetCommand struct{}
func (c *GreetCommand) Name() string { return "greet" }
func (c *GreetCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
name := cmd.GetString("name")
fmt.Printf("Hello, %s!\n", name)
return durex.Empty(), nil
}
func main() {
store := storage.NewMemory()
executor := durex.New(store)
executor.Register(&GreetCommand{})
executor.Start(context.Background())
defer executor.Stop()
ctx := context.Background()
// Add a command with tags for categorization
executor.Add(ctx, durex.Spec{
Name: "greet",
Data: durex.M{"name": "Charlie"},
Tags: []string{"priority:high", "batch:123"},
})
}
func (*Executor) CancelByTag ¶ added in v0.5.0
func (e *Executor) CancelByTag(ctx context.Context, tag string) (int, error)
CancelByTag cancels all pending commands with the given tag. Returns the number of commands cancelled. Requires QueryableStorage to support tag queries.
func (*Executor) DashboardHandler ¶ added in v0.4.0
func (e *Executor) DashboardHandler() http.Handler
DashboardHandler returns an http.Handler that serves the Durex dashboard. Mount this at your desired path to enable the web dashboard.
Example:
http.Handle("/durex/", http.StripPrefix("/durex", executor.DashboardHandler()))
http.ListenAndServe(":8080", nil)
func (*Executor) FindDeadLettered ¶ added in v0.5.0
func (e *Executor) FindDeadLettered(ctx context.Context) ([]*Instance, error)
FindDeadLettered returns all commands in the dead letter queue.
func (*Executor) HandleFunc ¶ added in v0.2.0
func (e *Executor) HandleFunc(name string, fn ExecuteFunc, opts ...FuncOption) *Executor
HandleFunc registers a function as a command handler. This is the simplest way to create a command.
Example:
executor.HandleFunc("sendEmail", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
to := cmd.GetString("to")
if err := mailer.Send(to); err != nil {
return durex.Empty(), err
}
return durex.Empty(), nil
})
With options:
executor.HandleFunc("sendEmail", sendEmailFn,
durex.Retries(3),
durex.OnRecover(handleFailure),
)
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
func main() {
store := storage.NewMemory()
executor := durex.New(store)
// Register using HandleFunc for simple handlers
executor.HandleFunc("sendEmail", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
to := cmd.GetString("to")
subject := cmd.GetString("subject")
fmt.Printf("Sending email to %s: %s\n", to, subject)
return durex.Empty(), nil
})
executor.Start(context.Background())
defer executor.Stop()
executor.Add(context.Background(), durex.Spec{
Name: "sendEmail",
Data: durex.M{
"to": "user@example.com",
"subject": "Welcome!",
},
})
time.Sleep(100 * time.Millisecond)
}
Output: Sending email to user@example.com: Welcome!
func (*Executor) PurgeDLQ ¶ added in v0.5.0
func (e *Executor) PurgeDLQ(ctx context.Context, age time.Duration) (int, error)
PurgeDLQ removes dead-lettered commands older than the specified age. Returns the number of commands purged.
func (*Executor) Register ¶
func (e *Executor) Register(cmd Command) *Executor
Register adds a command handler to the executor. Must be called before Start.
func (*Executor) ReplayFromDLQ ¶ added in v0.5.0
func (e *Executor) ReplayFromDLQ(ctx context.Context, id string) error
ReplayFromDLQ replays a dead-lettered command. The command is reset to PENDING status and will be executed again.
func (*Executor) ServeDashboard ¶ added in v0.4.0
func (e *Executor) ServeDashboard(addr string) error
ServeDashboard starts an HTTP server serving the dashboard on the given address. This is a convenience method for simple deployments. For production, use DashboardHandler() and integrate with your existing server.
Example:
go executor.ServeDashboard(":8080")
func (*Executor) Start ¶
func (e *Executor) Start(ctx context.Context) error
Start begins processing commands. It replays pending commands from storage and starts worker goroutines.
type Expirable ¶
type Expirable interface {
// Expired is called when a command's deadline has passed before execution.
// Use this to handle timeout scenarios gracefully.
Expired(ctx context.Context, cmd *Instance) (Result, error)
}
Expirable is an optional interface commands can implement to handle deadline expiration.
type ExpiredFunc ¶ added in v0.2.0
ExpiredFunc is the function signature for deadline expiration.
type ExponentialBackoff ¶ added in v0.3.0
type ExponentialBackoff struct {
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
}
ExponentialBackoff increases delay exponentially with each attempt. Delay = InitialDelay * (Multiplier ^ (attempt - 1))
type FuncCommand ¶ added in v0.2.0
type FuncCommand struct {
// contains filtered or unexported fields
}
FuncCommand wraps a function as a Command. Created via HandleFunc or NewFunc.
func NewFunc ¶ added in v0.2.0
func NewFunc(name string, fn ExecuteFunc, opts ...FuncOption) *FuncCommand
NewFunc creates a new FuncCommand. Use this when you need to configure the command before registering.
Example:
cmd := durex.NewFunc("sendEmail", sendEmailFn,
durex.Retries(3),
durex.OnRecover(handleFailure),
)
executor.Register(cmd)
func (*FuncCommand) Default ¶ added in v0.2.0
func (f *FuncCommand) Default() Spec
Default implements Defaulter.
func (*FuncCommand) Name ¶ added in v0.2.0
func (f *FuncCommand) Name() string
Name implements Command.
type FuncOption ¶ added in v0.2.0
type FuncOption func(*FuncCommand)
FuncOption configures a FuncCommand.
func Deadline ¶ added in v0.2.0
func Deadline(d Duration) FuncOption
Deadline sets the default deadline.
func OnExpired ¶ added in v0.2.0
func OnExpired(fn ExpiredFunc) FuncOption
OnExpired sets the expiration handler.
func OnRecover ¶ added in v0.2.0
func OnRecover(fn RecoverFunc) FuncOption
OnRecover sets the recovery function.
func Period ¶ added in v0.2.0
func Period(d Duration) FuncOption
Period sets the repeat period for recurring commands.
type HookedStorage ¶
HookedStorage wraps a Storage with lifecycle hooks.
func (*HookedStorage) Create ¶
func (h *HookedStorage) Create(ctx context.Context, cmd *Instance) error
Create implements Storage.
type Hooks ¶
type Hooks struct {
// AfterCreate is called after a command is created.
AfterCreate func(ctx context.Context, cmd *Instance)
// AfterUpdate is called after a command is updated.
AfterUpdate func(ctx context.Context, cmd *Instance)
// AfterDelete is called after a command is deleted.
AfterDelete func(ctx context.Context, id string)
// OnError is called when a storage operation fails.
OnError func(ctx context.Context, op string, err error)
}
Hooks allows observing storage operations. Useful for logging, metrics, and debugging.
type IDGenerator ¶
type IDGenerator interface {
Generate() string
}
IDGenerator generates unique command instance IDs.
type Instance ¶
type Instance struct {
// ID is the unique identifier for this instance.
ID string `json:"id"`
// Name is the command type identifier.
Name string `json:"name"`
// Data contains the command payload.
Data M `json:"data,omitempty"`
// Status is the current execution state.
Status Status `json:"status"`
// Retries is the remaining retry count.
Retries int `json:"retries"`
// Sequence is the remaining command chain.
Sequence []string `json:"sequence,omitempty"`
// ParentID links to the parent command that spawned this instance.
ParentID *string `json:"parent_id,omitempty"`
// Priority determines execution order.
Priority int `json:"priority"`
// Tags for categorization.
Tags []string `json:"tags,omitempty"`
// UniqueKey for deduplication.
// If set, only one active command with this key can exist at a time.
UniqueKey string `json:"unique_key,omitempty"`
// TraceID for distributed tracing.
// Propagated from parent commands.
TraceID string `json:"trace_id,omitempty"`
// CorrelationID links related commands together.
// Propagated from parent commands.
CorrelationID string `json:"correlation_id,omitempty"`
// CreatedAt is when the instance was created.
CreatedAt time.Time `json:"created_at"`
// ReadyAt is when the instance becomes eligible for execution.
ReadyAt time.Time `json:"ready_at"`
// StartedAt is when execution began (nil if not started).
StartedAt *time.Time `json:"started_at,omitempty"`
// CompletedAt is when execution finished (nil if not completed).
CompletedAt *time.Time `json:"completed_at,omitempty"`
// DeadlineAt is the execution deadline (nil if no deadline).
DeadlineAt *time.Time `json:"deadline_at,omitempty"`
// Timeout is the maximum execution time per attempt.
// If the handler doesn't complete within this duration, the context is cancelled.
Timeout time.Duration `json:"timeout,omitempty"`
// Period is the repeat interval for recurring commands.
Period time.Duration `json:"period,omitempty"`
// Error contains the error message if the command failed.
Error string `json:"error,omitempty"`
// Attempt tracks the current attempt number (starts at 1).
Attempt int `json:"attempt"`
// Metadata stores additional runtime information.
Metadata M `json:"metadata,omitempty"`
// History contains the execution history of this command.
// Events are appended as the command progresses through its lifecycle.
History []Event `json:"history,omitempty"`
}
Instance represents a persisted command with runtime state. Instances are created from Specs and track execution progress.
func InstanceFromContext ¶ added in v0.3.0
InstanceFromContext returns the command instance from the context, or nil if not set.
func (*Instance) ContinueSequence ¶
ContinueSequence creates a Result that spawns the next command in the sequence. The accumulated data is passed to the next command. Returns Empty() if the sequence is empty.
Example:
func (c *Step1Command) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
cmd.Set("step1_result", "done")
return cmd.ContinueSequence(nil), nil
}
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
func main() {
store := storage.NewMemory()
executor := durex.New(store)
executor.HandleFunc("validate", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
fmt.Println("Validating order")
cmd.Set("validated", true)
return cmd.ContinueSequence(nil), nil
})
executor.HandleFunc("charge", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
fmt.Println("Charging payment")
return cmd.ContinueSequence(nil), nil
})
executor.HandleFunc("ship", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
fmt.Println("Shipping order")
return durex.Empty(), nil
})
executor.Start(context.Background())
defer executor.Stop()
// Create a sequence of commands
executor.Add(context.Background(), durex.Spec{
Name: "validate",
Sequence: []string{"charge", "ship"},
Data: durex.M{"order_id": "12345"},
})
time.Sleep(100 * time.Millisecond)
}
Output:
Validating order
Charging payment
Shipping order
func (*Instance) Duration ¶
Duration returns how long the command took to execute. Returns 0 if the command hasn't completed.
func (*Instance) Get ¶
Get retrieves a value from the command data with type assertion. Returns the zero value if the key doesn't exist or type doesn't match.
func (*Instance) GetInt ¶
GetInt retrieves an int value from command data. Handles both int and float64 (from JSON unmarshaling).
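The float64 case exists because encoding/json decodes every JSON number into float64 when the target is an interface value. The sketch below shows the effect with a hypothetical helper, getInt. It is not the durex implementation, and returning 0 on a type mismatch is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// getInt is a hypothetical helper, not the durex implementation.
// It accepts int and float64 values and returns 0 for anything else.
func getInt(data map[string]any, key string) int {
	switch v := data[key].(type) {
	case int:
		return v
	case float64:
		return int(v)
	default:
		return 0
	}
}

func main() {
	before := map[string]any{"count": 3}

	// Round-trip the payload through JSON, as a persistent store might.
	raw, err := json.Marshal(before)
	if err != nil {
		panic(err)
	}
	var after map[string]any
	if err := json.Unmarshal(raw, &after); err != nil {
		panic(err)
	}

	fmt.Printf("%T %T\n", before["count"], after["count"])       // int float64
	fmt.Println(getInt(before, "count"), getInt(after, "count")) // 3 3
}
```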
func (*Instance) MarshalJSON ¶
MarshalJSON implements json.Marshaler.
func (*Instance) RecordError ¶ added in v0.8.0
RecordError appends a failed event with error details to the command's history.
func (*Instance) RecordEvent ¶ added in v0.8.0
RecordEvent appends an event to the command's history.
func (*Instance) RecordEventWithDuration ¶ added in v0.8.0
func (i *Instance) RecordEventWithDuration(eventType EventType, duration time.Duration, message string)
RecordEventWithDuration appends an event with duration to the command's history.
func (*Instance) Set ¶
Set stores a value in the command data. Note: Changes are not automatically persisted. Call executor.Update() if needed.
func (*Instance) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler.
type JitteredBackoff ¶ added in v0.3.0
type JitteredBackoff struct {
Strategy BackoffStrategy
JitterRate float64 // 0.0 to 1.0, e.g., 0.1 = ±10% jitter
}
JitteredBackoff wraps another BackoffStrategy and adds random jitter. This helps prevent thundering herd problems.
type LinearBackoff ¶ added in v0.3.0
LinearBackoff increases delay linearly with each attempt. Delay = InitialDelay * attempt
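The arithmetic behind LinearBackoff and JitteredBackoff can be sketched with two small functions. Both linearDelay and withJitter are hypothetical names used for illustration. The BackoffStrategy interface itself is not reproduced here, and the exact jitter formula is an assumption based on the "±10%" comment on JitterRate.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// linearDelay implements Delay = InitialDelay * attempt.
func linearDelay(initial time.Duration, attempt int) time.Duration {
	return initial * time.Duration(attempt)
}

// withJitter scales d by a factor between 1-rate and 1+rate.
// rate is expected to lie in [0, 1]; rnd must return a value in [0, 1).
func withJitter(d time.Duration, rate float64, rnd func() float64) time.Duration {
	factor := 1 + rate*(2*rnd()-1)
	return time.Duration(float64(d) * factor)
}

func main() {
	for attempt := 1; attempt <= 3; attempt++ {
		base := linearDelay(100*time.Millisecond, attempt)
		// The jittered value differs from run to run.
		fmt.Println(base, withJitter(base, 0.1, rand.Float64))
	}
}
```

Passing the random source as a function keeps the jitter calculation deterministic under test.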
type LockingStorage ¶ added in v0.4.0
type LockingStorage interface {
Storage
// ClaimPending atomically finds and claims up to limit pending commands.
// Returns commands that are ready to execute (ReadyAt <= now).
// Uses row-level locking (FOR UPDATE SKIP LOCKED in PostgreSQL) to prevent
// multiple executors from claiming the same command.
// The returned commands have their status set to STARTED.
ClaimPending(ctx context.Context, limit int) ([]*Instance, error)
}
LockingStorage extends Storage with row-level locking for safe concurrent access. Implement this interface for multi-instance deployments.
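The claim semantics can be illustrated without a database. In this sketch a mutex stands in for row-level locking, and job and memoryClaimer are hypothetical, trimmed-down types rather than durex.Instance or a real backend. The method also takes the current time as a parameter, unlike ClaimPending, so the example stays deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// epoch is a fixed reference time so the example is deterministic.
var epoch = time.Unix(1_700_000_000, 0)

// job is a hypothetical stand-in for durex.Instance.
type job struct {
	ID      string
	Status  string
	ReadyAt time.Time
}

// memoryClaimer is a hypothetical in-memory store guarded by a mutex.
type memoryClaimer struct {
	mu   sync.Mutex
	jobs []*job
}

// claimPending marks up to limit PENDING jobs with ReadyAt <= now as STARTED
// and returns them. Holding the mutex makes find-and-claim a single atomic step.
func (m *memoryClaimer) claimPending(now time.Time, limit int) []*job {
	m.mu.Lock()
	defer m.mu.Unlock()
	var claimed []*job
	for _, j := range m.jobs {
		if len(claimed) >= limit {
			break
		}
		if j.Status == "PENDING" && !j.ReadyAt.After(now) {
			j.Status = "STARTED"
			claimed = append(claimed, j)
		}
	}
	return claimed
}

// newDemoClaimer returns a store with two ready jobs and one scheduled an hour out.
func newDemoClaimer(now time.Time) *memoryClaimer {
	return &memoryClaimer{jobs: []*job{
		{ID: "a", Status: "PENDING", ReadyAt: now.Add(-time.Minute)},
		{ID: "b", Status: "PENDING", ReadyAt: now},
		{ID: "c", Status: "PENDING", ReadyAt: now.Add(time.Hour)},
	}}
}

func main() {
	store := newDemoClaimer(epoch)
	fmt.Println(len(store.claimPending(epoch, 10))) // 2
	fmt.Println(len(store.claimPending(epoch, 10))) // 0
}
```

The second call returns nothing because the ready jobs are already STARTED, which is the property that keeps two executors from running the same command.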
type MetricsCollector ¶
type MetricsCollector interface {
// CommandStarted is called when a command begins execution.
CommandStarted(name string)
// CommandCompleted is called when a command finishes successfully.
CommandCompleted(name string, duration time.Duration)
// CommandFailed is called when a command fails.
CommandFailed(name string, err error)
// CommandRetried is called when a command is retried.
CommandRetried(name string, attempt int)
// QueueSize reports the current queue size.
QueueSize(size int)
}
MetricsCollector receives execution metrics.
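A collector only has to implement the five methods above. The following sketch counts events in memory, which may be enough for tests. countingCollector and its helpers are hypothetical and not part of durex.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// MetricsCollector mirrors the interface documented above.
type MetricsCollector interface {
	CommandStarted(name string)
	CommandCompleted(name string, duration time.Duration)
	CommandFailed(name string, err error)
	CommandRetried(name string, attempt int)
	QueueSize(size int)
}

// countingCollector is a hypothetical in-memory collector.
type countingCollector struct {
	mu        sync.Mutex
	counts    map[string]int // keyed by "event:name"
	queueSize int
}

func newCountingCollector() *countingCollector {
	return &countingCollector{counts: make(map[string]int)}
}

// bump increments the counter for one event of one command type.
func (c *countingCollector) bump(event, name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[event+":"+name]++
}

func (c *countingCollector) CommandStarted(name string)                    { c.bump("started", name) }
func (c *countingCollector) CommandCompleted(name string, _ time.Duration) { c.bump("completed", name) }
func (c *countingCollector) CommandFailed(name string, _ error)            { c.bump("failed", name) }
func (c *countingCollector) CommandRetried(name string, _ int)             { c.bump("retried", name) }

// QueueSize stores the most recently reported queue size.
func (c *countingCollector) QueueSize(size int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.queueSize = size
}

// count returns how often event was recorded for name.
func (c *countingCollector) count(event, name string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[event+":"+name]
}

func main() {
	c := newCountingCollector()
	var m MetricsCollector = c
	m.CommandStarted("sendEmail")
	m.CommandFailed("sendEmail", errors.New("smtp timeout"))
	m.CommandRetried("sendEmail", 2)
	m.CommandStarted("sendEmail")
	m.CommandCompleted("sendEmail", 40*time.Millisecond)
	fmt.Println(c.count("started", "sendEmail"), c.count("failed", "sendEmail")) // 2 1
}
```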
type Middleware ¶
type Middleware func(ctx MiddlewareContext, next func() (Result, error)) (Result, error)
Middleware wraps command execution. Return the result of calling next() to continue the chain, or return early to short-circuit execution.
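How a chain continues or short-circuits can be sketched with simplified types. Here middleware takes a command name instead of a MiddlewareContext, and result, chain, logTo, and block are hypothetical names. The way the executor composes its pipeline internally is not shown in this documentation, so the chain function is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical stand-in for durex.Result.
type result struct{}

// middleware is a simplified form of the Middleware type documented above.
type middleware func(name string, next func() (result, error)) (result, error)

var errBlocked = errors.New("blocked by middleware")

// chain wraps handler so that mws[0] runs outermost, i.e. in the order added.
func chain(name string, handler func() (result, error), mws ...middleware) func() (result, error) {
	next := handler
	for i := len(mws) - 1; i >= 0; i-- {
		mw, inner := mws[i], next
		next = func() (result, error) { return mw(name, inner) }
	}
	return next
}

// logTo returns a middleware that records entry and exit in trace.
func logTo(trace *[]string, label string) middleware {
	return func(name string, next func() (result, error)) (result, error) {
		*trace = append(*trace, label+" before "+name)
		res, err := next()
		*trace = append(*trace, label+" after "+name)
		return res, err
	}
}

// block short-circuits: it returns without calling next.
func block(name string, next func() (result, error)) (result, error) {
	return result{}, errBlocked
}

func main() {
	var trace []string
	handler := func() (result, error) {
		trace = append(trace, "handler")
		return result{}, nil
	}
	_, err := chain("task", handler, logTo(&trace, "outer"), logTo(&trace, "inner"))()
	fmt.Println(trace, err)
	// [outer before task inner before task handler inner after task outer after task] <nil>

	trace = nil
	_, err = chain("task", handler, block, logTo(&trace, "inner"))()
	fmt.Println(len(trace), err) // 0 blocked by middleware
}
```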
func RateLimitMiddleware ¶ added in v0.3.0
func RateLimitMiddleware(limiter *RateLimiter, timeout time.Duration) Middleware
RateLimitMiddleware creates middleware that applies rate limiting. The timeout specifies how long to wait for a slot before giving up.
func TracingMiddleware ¶ added in v0.3.0
func TracingMiddleware(injector ContextInjector) Middleware
TracingMiddleware creates middleware that propagates trace and correlation IDs. It injects the instance's trace/correlation IDs into the context before execution.
type MiddlewareContext ¶
type MiddlewareContext struct {
// Command is the command being executed.
Command *Instance
// Handler is the command handler.
Handler Command
// Executor is the executor running the command.
Executor *Executor
}
MiddlewareContext provides context for middleware.
type Option ¶
type Option func(*Executor)
Option configures an Executor.
func WithBackoff ¶ added in v0.3.0
func WithBackoff(strategy BackoffStrategy) Option
WithBackoff sets the backoff strategy for retries. Default is NoBackoff() (immediate retry). Use DefaultExponentialBackoff() for production workloads.
func WithClaimBatchSize ¶ added in v0.4.0
WithClaimBatchSize sets how many commands each worker claims per poll cycle when using LockingStorage. Default is 10.
func WithCleanupAge ¶
WithCleanupAge sets how old completed commands must be before cleanup. Default is 24 hours.
func WithCleanupInterval ¶
WithCleanupInterval sets how often completed commands are cleaned up. Set to 0 to disable automatic cleanup. Default is 1 hour.
func WithDashboard ¶ added in v0.5.0
WithDashboard enables the web dashboard on the specified address. The dashboard provides a UI for monitoring commands and executor stats.
Example:
executor := durex.New(store, durex.WithDashboard(":8080"))
The dashboard will be available at http://localhost:8080 after Start().
func WithDeadLetterQueue ¶ added in v0.5.0
func WithDeadLetterQueue() Option
WithDeadLetterQueue enables the dead letter queue. When enabled, commands that fail after exhausting all retries are moved to DEAD_LETTER status instead of FAILED. This allows for manual inspection and replay of failed commands.
Use ReplayFromDLQ() to retry a dead-lettered command, and PurgeDLQ() to remove old dead-lettered commands.
func WithDefaultRepeatInterval ¶
WithDefaultRepeatInterval sets the default period for repeating commands. Default is 1 minute.
func WithDefaultRetries ¶
WithDefaultRetries sets the default retry count for commands that don't specify their own. Default is 0 (no retries).
func WithDefaultTimeout ¶ added in v0.5.0
WithDefaultTimeout sets the default execution timeout for commands that don't specify their own. Default is 0 (no timeout).
The timeout limits how long each execution attempt can take. If the handler doesn't complete within this duration, the context is cancelled and the command is treated as failed (will retry if retries remain).
func WithErrorHandler ¶
WithErrorHandler sets a custom error handler for command failures. This is called in addition to the command's Recover method.
func WithGlobalRateLimit ¶ added in v0.3.0
WithGlobalRateLimit sets the maximum total concurrent executions across all commands. This is useful for limiting overall system load.
Example:
executor := durex.New(storage,
durex.WithGlobalRateLimit(100), // max 100 total concurrent commands
)
func WithGracefulShutdown ¶
WithGracefulShutdown sets the timeout for graceful shutdown. Default is 30 seconds.
func WithLogger ¶
WithLogger sets the logger for the executor. Default is slog.Default().
func WithMaxDelay ¶
WithMaxDelay sets the maximum delay for scheduled commands. Commands scheduled further out will be re-queued periodically. Default is 24 hours.
func WithMetrics ¶
func WithMetrics(collector MetricsCollector) Option
WithMetrics enables metrics collection. The provided MetricsCollector will receive execution metrics.
func WithMiddleware ¶
func WithMiddleware(mw ...Middleware) Option
WithMiddleware adds middleware to the execution pipeline. Middleware is executed in the order added.
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
func main() {
store := storage.NewMemory()
// Logging middleware
loggingMiddleware := func(ctx durex.MiddlewareContext, next func() (durex.Result, error)) (durex.Result, error) {
fmt.Printf("Starting: %s\n", ctx.Command.Name)
result, err := next()
fmt.Printf("Finished: %s\n", ctx.Command.Name)
return result, err
}
executor := durex.New(store,
durex.WithMiddleware(loggingMiddleware),
)
executor.HandleFunc("task", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
fmt.Println("Executing task")
return durex.Empty(), nil
})
executor.Start(context.Background())
defer executor.Stop()
executor.Add(context.Background(), durex.Spec{Name: "task"})
time.Sleep(100 * time.Millisecond)
}
Output:
Starting: task
Executing task
Finished: task
func WithParallelism ¶
WithParallelism sets the number of concurrent command workers. Default is 4.
func WithPermanentCommands ¶
WithPermanentCommands sets commands that should always be running. These commands are started on executor Start() and restarted if they fail.
func WithPollInterval ¶ added in v0.4.0
WithPollInterval sets how often workers poll for new commands when using LockingStorage (multi-instance mode). Default is 1 second.
func WithQueueSize ¶
WithQueueSize sets the internal queue buffer size. Default is 1000.
func WithRateLimit ¶ added in v0.3.0
WithRateLimit sets the maximum concurrent executions for a specific command type. This creates or updates the executor's rate limiter.
Example:
executor := durex.New(storage,
durex.WithRateLimit("sendEmail", 10), // max 10 concurrent emails
durex.WithRateLimit("processOrder", 5), // max 5 concurrent orders
)
func WithRateLimiter ¶ added in v0.3.0
func WithRateLimiter(limiter *RateLimiter) Option
WithRateLimiter sets a custom rate limiter for the executor. Use this when you need fine-grained control over rate limiting.
func WithStuckCommandRecovery ¶ added in v0.5.0
WithStuckCommandRecovery enables automatic recovery of stuck commands. Stuck commands are those in STARTED status for longer than threshold, which may indicate a worker crash or process restart.
checkInterval: how often to check for stuck commands
threshold: how long a command must be in STARTED status to be considered stuck
By default, stuck command recovery is disabled. Recommended settings: checkInterval=1m, threshold=5m
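The stuck check described above reduces to a comparison between the time spent in STARTED status and the threshold. The sketch below is an assumption about that comparison, not the executor's code. isStuck is a hypothetical helper, and it uses StartedAt as the moment the command entered STARTED status.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is a fixed reference time so the example is deterministic.
var epoch = time.Unix(1_700_000_000, 0)

// isStuck reports whether a command in STARTED status has been running for
// longer than threshold. startedAt is nil when the command has not started.
func isStuck(status string, startedAt *time.Time, now time.Time, threshold time.Duration) bool {
	if status != "STARTED" || startedAt == nil {
		return false
	}
	return now.Sub(*startedAt) > threshold
}

func main() {
	started := epoch
	threshold := 5 * time.Minute
	fmt.Println(isStuck("STARTED", &started, epoch.Add(6*time.Minute), threshold))   // true
	fmt.Println(isStuck("STARTED", &started, epoch.Add(2*time.Minute), threshold))   // false
	fmt.Println(isStuck("COMPLETED", &started, epoch.Add(6*time.Minute), threshold)) // false
}
```

A threshold shorter than the longest legitimate execution would flag healthy commands, so it should sit comfortably above any configured timeout.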
type PrometheusMetrics ¶ added in v0.7.0
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics implements MetricsCollector using Prometheus metrics.
func NewPrometheusMetrics ¶ added in v0.7.0
func NewPrometheusMetrics(registry prometheus.Registerer, opts ...PrometheusOption) *PrometheusMetrics
NewPrometheusMetrics creates a new PrometheusMetrics collector. The metrics are automatically registered with the provided registry. If registry is nil, prometheus.DefaultRegisterer is used.
func (*PrometheusMetrics) Collect ¶ added in v0.7.0
func (m *PrometheusMetrics) Collect(ch chan<- prometheus.Metric)
Collect implements prometheus.Collector.
func (*PrometheusMetrics) Collectors ¶ added in v0.7.0
func (m *PrometheusMetrics) Collectors() []prometheus.Collector
Collectors returns all Prometheus collectors for manual registration. Use this if you need more control over registration.
func (*PrometheusMetrics) CommandCompleted ¶ added in v0.7.0
func (m *PrometheusMetrics) CommandCompleted(name string, duration time.Duration)
CommandCompleted implements MetricsCollector.
func (*PrometheusMetrics) CommandFailed ¶ added in v0.7.0
func (m *PrometheusMetrics) CommandFailed(name string, _ error)
CommandFailed implements MetricsCollector.
func (*PrometheusMetrics) CommandRetried ¶ added in v0.7.0
func (m *PrometheusMetrics) CommandRetried(name string, _ int)
CommandRetried implements MetricsCollector.
func (*PrometheusMetrics) CommandStarted ¶ added in v0.7.0
func (m *PrometheusMetrics) CommandStarted(name string)
CommandStarted implements MetricsCollector.
func (*PrometheusMetrics) Describe ¶ added in v0.7.0
func (m *PrometheusMetrics) Describe(ch chan<- *prometheus.Desc)
Describe implements prometheus.Collector.
func (*PrometheusMetrics) QueueSize ¶ added in v0.7.0
func (m *PrometheusMetrics) QueueSize(size int)
QueueSize implements MetricsCollector.
type PrometheusOption ¶ added in v0.7.0
type PrometheusOption func(*prometheusConfig)
PrometheusOption configures PrometheusMetrics.
func WithPrometheusBuckets ¶ added in v0.7.0
func WithPrometheusBuckets(buckets []float64) PrometheusOption
WithPrometheusBuckets sets custom histogram buckets for duration metrics.
func WithPrometheusNamespace ¶ added in v0.7.0
func WithPrometheusNamespace(namespace string) PrometheusOption
WithPrometheusNamespace sets the namespace for all metrics.
func WithPrometheusSubsystem ¶ added in v0.7.0
func WithPrometheusSubsystem(subsystem string) PrometheusOption
WithPrometheusSubsystem sets the subsystem for all metrics.
type Query ¶
type Query struct {
// Status filters by command status.
Status *Status
// Name filters by command name.
Name *string
// ParentID filters by parent command.
ParentID *string
// Tags filters by tags (commands must have all specified tags).
Tags []string
// CreatedAfter filters commands created after this time.
CreatedAfter *time.Time
// CreatedBefore filters commands created before this time.
CreatedBefore *time.Time
// Limit restricts the number of results.
Limit int
// Offset skips the first N results.
Offset int
// OrderBy specifies the sort field.
OrderBy string
// OrderDesc sorts in descending order.
OrderDesc bool
}
Query represents a flexible query for finding commands.
type QueryableStorage ¶
type QueryableStorage interface {
Storage
// Find returns commands matching the query.
Find(ctx context.Context, query Query) ([]*Instance, error)
}
QueryableStorage extends Storage with advanced query capabilities. Implement this interface for full-featured storage backends.
type RateLimitStats ¶ added in v0.3.0
type RateLimitStats struct {
GlobalLimit int
GlobalCurrent int
Commands map[string]CommandRateStats
}
RateLimitStats holds rate limiter statistics.
type RateLimiter ¶ added in v0.3.0
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter controls concurrent execution of commands.
func NewRateLimiter ¶ added in v0.3.0
func NewRateLimiter() *RateLimiter
NewRateLimiter creates a new rate limiter.
func (*RateLimiter) Acquire ¶ added in v0.3.0
func (r *RateLimiter) Acquire(ctx context.Context, name string) (release func(), err error)
Acquire attempts to acquire a slot for the given command. It blocks until a slot is available or the context is canceled. Returns a release function that must be called when the command completes.
func (*RateLimiter) Current ¶ added in v0.3.0
func (r *RateLimiter) Current(name string) int
Current returns the current concurrent count for a command.
func (*RateLimiter) GlobalCurrent ¶ added in v0.3.0
func (r *RateLimiter) GlobalCurrent() int
GlobalCurrent returns the total concurrent count across all commands.
func (*RateLimiter) SetGlobalLimit ¶ added in v0.3.0
func (r *RateLimiter) SetGlobalLimit(limit int)
SetGlobalLimit sets the maximum total concurrent executions across all commands. Use 0 to remove the limit.
func (*RateLimiter) SetLimit ¶ added in v0.3.0
func (r *RateLimiter) SetLimit(name string, limit int)
SetLimit sets the maximum concurrent executions for a command type. Use 0 to remove the limit.
func (*RateLimiter) Stats ¶ added in v0.3.0
func (r *RateLimiter) Stats() RateLimitStats
Stats returns rate limiter statistics.
func (*RateLimiter) TryAcquire ¶ added in v0.3.0
func (r *RateLimiter) TryAcquire(name string) (release func(), ok bool)
TryAcquire attempts to acquire a slot without blocking. Returns the release function and true if successful, nil and false otherwise.
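The difference between Acquire and TryAcquire can be illustrated with a buffered channel used as a counting semaphore. This is a sketch under stated assumptions: slotLimiter is a hypothetical type that limits a single command, whereas RateLimiter tracks per-command and global limits, and its internals are not documented here.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// slotLimiter is a hypothetical limiter with a fixed number of slots.
type slotLimiter struct {
	slots chan struct{}
}

func newSlotLimiter(limit int) *slotLimiter {
	return &slotLimiter{slots: make(chan struct{}, limit)}
}

// releaser returns a function that frees one slot.
// sync.Once makes a repeated call harmless.
func (l *slotLimiter) releaser() func() {
	var once sync.Once
	return func() { once.Do(func() { <-l.slots }) }
}

// acquire blocks until a slot is free or ctx is done.
func (l *slotLimiter) acquire(ctx context.Context) (func(), error) {
	select {
	case l.slots <- struct{}{}:
		return l.releaser(), nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// tryAcquire takes a slot if one is free and never blocks.
func (l *slotLimiter) tryAcquire() (func(), bool) {
	select {
	case l.slots <- struct{}{}:
		return l.releaser(), true
	default:
		return nil, false
	}
}

func main() {
	lim := newSlotLimiter(1)

	release, ok := lim.tryAcquire()
	fmt.Println(ok) // true
	_, ok = lim.tryAcquire()
	fmt.Println(ok) // false

	// The only slot is taken, so acquire waits until the context expires.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, err := lim.acquire(ctx)
	fmt.Println(err) // context deadline exceeded

	release()
	_, ok = lim.tryAcquire()
	fmt.Println(ok) // true
}
```

Calling the release function in a defer right after a successful acquire keeps slots from leaking when a handler returns early.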
type RecoverFunc ¶ added in v0.2.0
RecoverFunc is the function signature for error recovery.
type Recoverable ¶
type Recoverable interface {
// Recover is called when a command fails after exhausting all retries.
// Use this to clean up resources, notify systems, or spawn compensating commands.
Recover(ctx context.Context, cmd *Instance, err error) (Result, error)
}
Recoverable is an optional interface commands can implement to handle failures after all retries are exhausted.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages command handler registration and resolution.
func (*Registry) MustRegister ¶
MustRegister is like Register but allows overwriting existing handlers. Useful for testing or dynamic reconfiguration.
func (*Registry) Register ¶
Register adds a command handler to the registry. Panics if a handler with the same name is already registered.
func (*Registry) Resolve ¶
Resolve returns the command handler for the given name. Returns an error if no handler is registered.
func (*Registry) Unregister ¶
Unregister removes a command handler from the registry. Returns true if a handler was removed.
type Result ¶
type Result struct {
// Commands to spawn after this command completes.
// Child commands will have their ParentID set to this command's ID.
Commands []Spec
// Repeat signals that this command should be rescheduled.
// The command will run again after its Period duration.
// Use this for recurring tasks like cleanup jobs or polling.
Repeat bool
// Retry signals that this command should retry immediately.
// Only effective if the command has remaining retries.
// Unlike returning an error, Retry doesn't trigger Recover.
Retry bool
}
Result represents the outcome of command execution. It tells the executor what to do next.
func Empty ¶
func Empty() Result
Empty returns a Result that halts the execution chain. Use this when the command completes successfully with no follow-up.
func Next ¶
Next returns a Result that spawns a single follow-up command. This is a convenience wrapper around Result{Commands: []Spec{spec}}.
Example:
func (c *Step1Command) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
// step 1 logic...
return durex.Next(durex.Spec{
Name: "step2Command",
Data: cmd.Data,
}), nil
}
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
func main() {
store := storage.NewMemory()
executor := durex.New(store)
// Step 1 spawns Step 2
executor.HandleFunc("step1", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
fmt.Println("Step 1 complete")
return durex.Next(durex.Spec{
Name: "step2",
Data: durex.M{"from": "step1"},
}), nil
})
executor.HandleFunc("step2", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
fmt.Println("Step 2 complete")
return durex.Empty(), nil
})
executor.Start(context.Background())
defer executor.Stop()
executor.Add(context.Background(), durex.Spec{Name: "step1"})
time.Sleep(100 * time.Millisecond)
}
Output:
Step 1 complete
Step 2 complete
func Repeat ¶
func Repeat() Result
Repeat returns a Result that reschedules the command. The command will execute again after its Period duration.
Example:
func (c *CleanupCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
// cleanup logic...
return durex.Repeat(), nil // run again after period
}
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
func main() {
store := storage.NewMemory()
executor := durex.New(store)
count := 0
executor.HandleFunc("heartbeat", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
count++
fmt.Printf("Heartbeat #%d\n", count)
if count >= 2 {
return durex.Empty(), nil // Stop repeating
}
return durex.Repeat(), nil
})
executor.Start(context.Background())
defer executor.Stop()
// Add repeating command with 50ms period
executor.Add(context.Background(), durex.Spec{
Name: "heartbeat",
Period: 50 * time.Millisecond,
})
time.Sleep(150 * time.Millisecond)
}
Output:
Heartbeat #1
Heartbeat #2
func Retry ¶
func Retry() Result
Retry returns a Result that retries the command. This decrements the retry counter without triggering Recover. If no retries remain, the command completes normally.
Use Retry for expected transient failures where you want explicit control over retry behavior.
Example:
func (c *SendCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
err := c.send()
if isTransient(err) {
return durex.Retry(), nil // retry without triggering Recover
}
if err != nil {
return durex.Empty(), err // triggers Recover after retries exhausted
}
return durex.Empty(), nil
}
func Spawn ¶
Spawn returns a Result that creates multiple follow-up commands.
Example:
func (c *FanOutCommand) Execute(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
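// Use a checked assertion: an unchecked one panics if "items" is missing or has
// another type (for example []any after a JSON round-trip).
items, ok := cmd.Data["items"].([]string)
if !ok {
return durex.Empty(), fmt.Errorf("items: want []string, got %T", cmd.Data["items"])
}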
specs := make([]durex.Spec, len(items))
for i, item := range items {
specs[i] = durex.Spec{
Name: "processItemCommand",
Data: durex.M{"item": item},
}
}
return durex.Spawn(specs...), nil
}
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/simonovic86/durex"
"github.com/simonovic86/durex/storage"
)
func main() {
store := storage.NewMemory()
executor := durex.New(store)
// Fan-out: process multiple items in parallel
executor.HandleFunc("fanOut", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
items := []string{"a", "b", "c"}
specs := make([]durex.Spec, len(items))
for i, item := range items {
specs[i] = durex.Spec{
Name: "process",
Data: durex.M{"item": item},
}
}
return durex.Spawn(specs...), nil
})
executor.HandleFunc("process", func(ctx context.Context, cmd *durex.Instance) (durex.Result, error) {
item := cmd.GetString("item")
fmt.Printf("Processing: %s\n", item)
return durex.Empty(), nil
})
executor.Start(context.Background())
defer executor.Stop()
executor.Add(context.Background(), durex.Spec{Name: "fanOut"})
time.Sleep(100 * time.Millisecond)
}
type SimpleIDGenerator ¶
type SimpleIDGenerator struct {
// contains filtered or unexported fields
}
SimpleIDGenerator generates simple incremental IDs. Useful for testing but not recommended for production.
func NewSimpleIDGenerator ¶
func NewSimpleIDGenerator(prefix string) *SimpleIDGenerator
NewSimpleIDGenerator creates a new SimpleIDGenerator with the given prefix.
func (*SimpleIDGenerator) Generate ¶
func (g *SimpleIDGenerator) Generate() string
Generate creates a new incremental ID.
type Spec ¶
type Spec struct {
// Name is the command type identifier.
// Must match a registered command handler's Name().
Name string `json:"name"`
// Data contains the command payload.
// This data is persisted and available during execution.
Data M `json:"data,omitempty"`
// Delay postpones command execution by the specified duration.
// The command will be scheduled to run after this delay.
Delay time.Duration `json:"delay,omitempty"`
// Period sets the interval for repeating commands.
// When a command returns Repeat(), it will be rescheduled after this duration.
// If not set, defaults to executor's DefaultRepeatInterval.
Period time.Duration `json:"period,omitempty"`
// Timeout sets the maximum execution time per attempt.
// If the handler doesn't complete within this duration, the context is cancelled
// and the command is treated as failed (will retry if retries remain).
// This is different from Deadline which prevents starting after a time.
Timeout time.Duration `json:"timeout,omitempty"`
// Deadline sets a relative deadline from now.
// If the command hasn't started by this time, Expired() is called instead.
// Takes precedence over DeadlineAt if both are set.
Deadline time.Duration `json:"deadline,omitempty"`
// DeadlineAt sets an absolute deadline.
// If the command hasn't started by this time, Expired() is called instead.
DeadlineAt *time.Time `json:"deadline_at,omitempty"`
// Retries is the number of retry attempts on failure.
// When a command returns an error, it will be retried up to this many times.
// After retries are exhausted, Recover() is called.
Retries int `json:"retries,omitempty"`
// Sequence defines a chain of commands to execute in order.
// When the current command completes, the next command in the sequence
// is automatically spawned with the accumulated data.
Sequence []string `json:"sequence,omitempty"`
// Priority sets the command's priority (higher = more important).
// Commands with higher priority are executed first when multiple
// commands are ready. Default is 0.
Priority int `json:"priority,omitempty"`
// Tags are optional labels for categorization and filtering.
Tags []string `json:"tags,omitempty"`
// UniqueKey prevents duplicate commands with the same key.
// If a non-terminal command with this key already exists, Add() returns
// ErrDuplicateCommand instead of creating a new instance.
// Useful for ensuring idempotency (e.g., "send-email:user123").
UniqueKey string `json:"unique_key,omitempty"`
// TraceID is used for distributed tracing.
// Automatically propagated to child commands.
TraceID string `json:"trace_id,omitempty"`
// CorrelationID links related commands together.
// Automatically propagated to child commands.
// If not set, defaults to the root command's ID.
CorrelationID string `json:"correlation_id,omitempty"`
}
Spec defines the specification for creating a command instance. Use this when adding new commands to the executor.
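The precedence rule between Deadline and DeadlineAt can be written as a small function. This is a sketch of the rule as documented in the field comments, not the executor's code, and resolveDeadline is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is a fixed reference time so the example is deterministic.
var epoch = time.Unix(1_700_000_000, 0).UTC()

// resolveDeadline returns the absolute deadline for a spec, or nil if none is set.
// A positive relative deadline takes precedence over the absolute one.
func resolveDeadline(now time.Time, relative time.Duration, absolute *time.Time) *time.Time {
	if relative > 0 {
		t := now.Add(relative)
		return &t
	}
	return absolute
}

func main() {
	abs := epoch.Add(time.Hour)
	fmt.Println(resolveDeadline(epoch, 0, nil) == nil)                   // true
	fmt.Println(resolveDeadline(epoch, 0, &abs).Sub(epoch))              // 1h0m0s
	fmt.Println(resolveDeadline(epoch, 10*time.Minute, &abs).Sub(epoch)) // 10m0s
}
```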
func Typed ¶ added in v0.2.0
Typed creates a Spec with typed data. The data struct is automatically converted to a map.
Example:
executor.Add(ctx, durex.Typed("sendEmail", EmailData{
To: "user@example.com",
Subject: "Welcome!",
}))
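One common way to turn a struct into a map is a JSON round-trip, which honors the struct's json tags. Whether Typed works this way is an assumption, and toMap and emailData are hypothetical names used only for this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// emailData is a hypothetical payload type.
type emailData struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// toMap converts v to a map by marshaling it to JSON and decoding it again.
// Map keys follow the json tags on the struct fields.
func toMap(v any) (map[string]any, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	m, err := toMap(emailData{To: "user@example.com", Subject: "Welcome!"})
	if err != nil {
		panic(err)
	}
	fmt.Println(m["to"], m["subject"]) // user@example.com Welcome!
}
```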
func (Spec) WithCorrelationID ¶ added in v0.3.0
WithCorrelationID returns a copy of the Spec with the given correlation ID. The correlation ID is propagated to all child commands.
func (Spec) WithDeadline ¶
WithDeadline returns a copy of the Spec with the given deadline.
func (Spec) WithPriority ¶
WithPriority returns a copy of the Spec with the given priority.
func (Spec) WithRetries ¶
WithRetries returns a copy of the Spec with the given retry count.
func (Spec) WithTimeout ¶ added in v0.5.0
WithTimeout returns a copy of the Spec with the given execution timeout. The timeout limits how long each execution attempt can take.
func (Spec) WithTraceID ¶ added in v0.3.0
WithTraceID returns a copy of the Spec with the given trace ID. The trace ID is propagated to all child commands.
func (Spec) WithUniqueKey ¶ added in v0.3.0
WithUniqueKey returns a copy of the Spec with the given unique key. Commands with the same unique key cannot be added while one is still active.
type Stats ¶
type Stats struct {
Pending int64
Completed int64
Failed int64
DeadLetter int64
Repeating int64
QueueSize int
RegisteredCommands int
WorkerCount int
RateLimit *RateLimitStats
}
Stats holds executor statistics.
type Status ¶
type Status string
Status represents the execution state of a command instance.
const (
// StatusPending indicates the command is waiting to be executed.
StatusPending Status = "PENDING"
// StatusStarted indicates the command is currently executing.
StatusStarted Status = "STARTED"
// StatusCompleted indicates the command finished successfully.
StatusCompleted Status = "COMPLETED"
// StatusFailed indicates the command failed after exhausting retries.
StatusFailed Status = "FAILED"
// StatusExpired indicates the command's deadline passed before execution.
StatusExpired Status = "EXPIRED"
// StatusRepeating indicates the command is scheduled to repeat.
StatusRepeating Status = "REPEATING"
// StatusCancelled indicates the command was cancelled before completion.
StatusCancelled Status = "CANCELLED"
// StatusDeadLetter indicates the command failed permanently and was moved to DLQ.
// Commands in DLQ can be inspected and replayed manually.
StatusDeadLetter Status = "DEAD_LETTER"
)
func (Status) IsTerminal ¶
IsTerminal returns true if the status represents a final state.
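The Storage documentation lists PENDING, STARTED, and REPEATING as the non-terminal statuses, which suggests the remaining five are terminal. The sketch below encodes that reading. It is an inference from this page rather than the library's source, and the lowercase names are hypothetical local copies.

```go
package main

import "fmt"

// status is a local copy of the Status type for this sketch.
type status string

const (
	statusPending    status = "PENDING"
	statusStarted    status = "STARTED"
	statusCompleted  status = "COMPLETED"
	statusFailed     status = "FAILED"
	statusExpired    status = "EXPIRED"
	statusRepeating  status = "REPEATING"
	statusCancelled  status = "CANCELLED"
	statusDeadLetter status = "DEAD_LETTER"
)

// isTerminal reports true for COMPLETED, FAILED, EXPIRED, CANCELLED and
// DEAD_LETTER, and false for every other value.
func isTerminal(s status) bool {
	switch s {
	case statusCompleted, statusFailed, statusExpired, statusCancelled, statusDeadLetter:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(isTerminal(statusPending), isTerminal(statusFailed)) // false true
}
```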
type Storage ¶
type Storage interface {
// Create persists a new command instance.
// Returns ErrAlreadyExists if an instance with the same ID exists.
Create(ctx context.Context, cmd *Instance) error
// Update saves changes to an existing command instance.
// Returns ErrNotFound if the instance doesn't exist.
Update(ctx context.Context, cmd *Instance) error
// Delete removes a command instance by ID.
// Returns nil if the instance doesn't exist (idempotent).
Delete(ctx context.Context, id string) error
// Get retrieves a command instance by ID.
// Returns ErrNotFound if the instance doesn't exist.
Get(ctx context.Context, id string) (*Instance, error)
// FindPending returns all commands that are ready for execution.
// This includes commands with status PENDING, STARTED, or REPEATING
// where ReadyAt <= now.
FindPending(ctx context.Context) ([]*Instance, error)
// FindByStatus returns commands with the given status.
FindByStatus(ctx context.Context, status Status) ([]*Instance, error)
// FindByParent returns all child commands of the given parent.
FindByParent(ctx context.Context, parentID string) ([]*Instance, error)
// FindByUniqueKey returns an active command with the given unique key.
// Returns ErrNotFound if no active command with this key exists.
// Only searches non-terminal statuses (PENDING, STARTED, REPEATING).
FindByUniqueKey(ctx context.Context, key string) (*Instance, error)
// Cleanup removes completed/failed/expired commands older than the given age.
// Returns the number of commands deleted.
Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
// Count returns the total number of commands, optionally filtered by status.
Count(ctx context.Context, status *Status) (int64, error)
// Close releases any resources held by the storage.
Close() error
}
Storage defines the interface for command persistence. Implementations must be safe for concurrent use.
type Transaction ¶
type Transaction interface {
Storage
// Commit commits the transaction.
Commit() error
// Rollback aborts the transaction.
Rollback() error
}
Transaction represents a storage transaction.
type TransactionalStorage ¶
type TransactionalStorage interface {
Storage
// Begin starts a new transaction.
Begin(ctx context.Context) (Transaction, error)
}
TransactionalStorage extends Storage with transaction support. Implement this interface for ACID-compliant storage backends.
type TypedCommand ¶ added in v0.2.0
type TypedCommand[T any] struct { // contains filtered or unexported fields }
TypedCommand wraps a typed function as a Command.
func NewTyped ¶ added in v0.2.0
func NewTyped[T any](name string, fn TypedExecuteFunc[T], opts ...TypedOption[T]) *TypedCommand[T]
NewTyped creates a new typed command.
Example:
type EmailData struct {
To string `json:"to"`
Subject string `json:"subject"`
}
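cmd := durex.NewTyped("sendEmail", func(ctx context.Context, data EmailData, cmd *durex.Instance) (durex.Result, error) {
	// Assumes mailer.Send returns an error, as in the Basic Usage example.
	if err := mailer.Send(data.To, data.Subject); err != nil {
		return durex.Empty(), err
	}
	return durex.Empty(), nil
})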
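To make the idea of wrapping a typed function more concrete, here is one plausible way untyped command data could be converted into a struct before the typed function runs: a JSON round trip driven by the struct tags. This is an assumption for illustration only. The decode and wrap helpers are hypothetical, and durex may convert data differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// emailData mirrors the EmailData struct from the example above.
type emailData struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// decode converts untyped data into T via a JSON round trip.
// Assumption: one plausible approach, not necessarily what durex does.
func decode[T any](data map[string]any) (T, error) {
	var out T
	raw, err := json.Marshal(data)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(raw, &out)
	return out, err
}

// wrap adapts a typed handler to one that accepts untyped data.
func wrap[T any](fn func(T) error) func(map[string]any) error {
	return func(data map[string]any) error {
		typed, err := decode[T](data)
		if err != nil {
			return fmt.Errorf("decode command data: %w", err)
		}
		return fn(typed)
	}
}

func main() {
	handler := wrap(func(d emailData) error {
		fmt.Printf("to=%s subject=%s\n", d.To, d.Subject)
		return nil
	})
	if err := handler(map[string]any{"to": "a@example.com", "subject": "hi"}); err != nil {
		fmt.Println("error:", err)
	}
}
```

With this approach a mismatched field type surfaces as a decode error before the handler is called.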
func (*TypedCommand[T]) Default ¶ added in v0.2.0
func (c *TypedCommand[T]) Default() Spec
Default implements Defaulter.
func (*TypedCommand[T]) Name ¶ added in v0.2.0
func (c *TypedCommand[T]) Name() string
Name implements Command.
type TypedExecuteFunc ¶ added in v0.2.0
type TypedExecuteFunc[T any] func(ctx context.Context, data T, cmd *Instance) (Result, error)
TypedExecuteFunc is a function that receives typed data.
type TypedOption ¶ added in v0.2.0
type TypedOption[T any] func(*TypedCommand[T])
TypedOption configures a TypedCommand.
func WithDeadline ¶ added in v0.2.0
func WithDeadline[T any](d time.Duration) TypedOption[T]
WithDeadline sets the default deadline for typed commands.
func WithPeriod ¶ added in v0.2.0
func WithPeriod[T any](d time.Duration) TypedOption[T]
WithPeriod sets the repeat period for typed commands.
func WithRecover ¶ added in v0.2.0
func WithRecover[T any](fn TypedRecoverFunc[T]) TypedOption[T]
WithRecover sets the recovery function for typed commands.
func WithRetries ¶ added in v0.2.0
func WithRetries[T any](n int) TypedOption[T]
WithRetries sets the default retry count for typed commands.
type TypedRecoverFunc ¶ added in v0.2.0
type TypedRecoverFunc[T any] func(ctx context.Context, data T, cmd *Instance, err error) (Result, error)
TypedRecoverFunc is a recovery function that receives typed data.
type ULIDGenerator ¶
type ULIDGenerator struct{}
ULIDGenerator generates ULID-style IDs that are lexicographically sortable and URL-safe.
func (*ULIDGenerator) Generate ¶
func (g *ULIDGenerator) Generate() string
Generate creates a new ULID.
Directories ¶
| Path | Synopsis |
|---|---|
| examples/basic (command) | Example: Basic durex usage with functional commands |
| examples/workflow (command) | Example: E-commerce order processing with typed commands |
| storage | Package storage provides storage implementations for durex. |