romancy

Version: v0.6.1
Published: Dec 25, 2025 License: MIT Imports: 23 Imported by: 0

README

Romancy

Romancy (from "Romance" - medieval chivalric tales) - Durable execution framework for Go

Lightweight durable execution framework - no separate server required

Overview

Romancy is a lightweight durable execution framework for Go that runs as a library in your application - no separate workflow server required. It provides automatic crash recovery through deterministic replay, allowing long-running workflows to survive process restarts and failures without losing progress.

Perfect for: Order processing, distributed transactions (Saga pattern), and any workflow that must survive crashes.

Romancy is a Go port of Edda (Python), providing the same core concepts and patterns in idiomatic Go.

Key Features

  • Lightweight Library: Runs in your application process - no separate server infrastructure
  • 🔄 Durable Execution: Deterministic replay with workflow history for automatic crash recovery
  • 🎯 Workflow & Activity: Clear separation between orchestration logic and business logic
  • 🔁 Saga Pattern: Automatic compensation on failure
  • 🌐 Multi-worker Execution: Run workflows safely across multiple servers or containers
  • 📦 Transactional Outbox: Reliable event publishing with guaranteed delivery
  • ☁️ CloudEvents Support: Native support for CloudEvents protocol
  • ⏱️ Event & Timer Waiting: Free up worker resources while waiting for events or timers
  • 🌍 net/http Integration: Use with any Go HTTP router (chi, gorilla/mux, standard library)
  • 🔒 Automatic Transactions: Activities run in transactions by default with ctx.Session() access
  • 🤖 MCP Integration: Expose workflows as MCP tools for AI assistants
  • 📬 Channel Messaging: Broadcast and competing message delivery between workflows
  • 🔄 Recur Pattern: Erlang-style tail recursion for long-running workflows
  • 📡 PostgreSQL LISTEN/NOTIFY: Real-time event delivery without polling
  • 🤖 LLM Integration: Durable LLM calls via bucephalus with automatic caching for replay

Documentation

📚 Full documentation: https://i2y.github.io/romancy/

Architecture

Romancy runs as a lightweight library in your applications, with all workflow state stored in a shared database:

┌─────────────────────────────────────────────────────────────────────┐
│                          Your Go Applications                        │
├──────────────────────┬──────────────────────┬──────────────────────┤
│   order-service-1    │   order-service-2    │   order-service-3    │
│   ┌──────────────┐   │   ┌──────────────┐   │   ┌──────────────┐   │
│   │   Romancy    │   │   │   Romancy    │   │   │   Romancy    │   │
│   │   Workflow   │   │   │   Workflow   │   │   │   Workflow   │   │
│   └──────────────┘   │   └──────────────┘   │   └──────────────┘   │
└──────────┬───────────┴──────────┬───────────┴──────────┬───────────┘
           │                      │                      │
           └──────────────────────┼──────────────────────┘
                                  │
                         ┌────────▼────────┐
                         │ Shared Database │
                         │ (SQLite/PG/MySQL)│
                         └─────────────────┘

Key Points:

  • Multiple workers can run simultaneously across different pods/servers
  • Each workflow instance runs on only one worker at a time (automatic coordination)
  • WaitEvent() and Sleep() free up worker resources while waiting
  • Automatic crash recovery with stale lock cleanup and workflow auto-resume
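The coordination in the key points above can be pictured as one lease per workflow instance. The following is a minimal in-memory sketch, not Romancy's implementation: `lockTable`, `tryAcquire`, and `runDemo` are hypothetical names, and the real library keeps its locks in the shared database. The 5-minute lease matches the stale-lock timeout listed under Multi-worker Execution.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lease records which worker holds a workflow instance and since when.
type lease struct {
	workerID   string
	acquiredAt time.Time
}

// lockTable is a hypothetical in-memory stand-in for a database lock table.
type lockTable struct {
	mu     sync.Mutex
	leases map[string]lease
	maxAge time.Duration // leases at least this old are treated as stale
}

func newLockTable(maxAge time.Duration) *lockTable {
	return &lockTable{leases: make(map[string]lease), maxAge: maxAge}
}

// tryAcquire gives instanceID to workerID if the instance is unlocked,
// already held by the same worker, or held under a stale lease.
func (t *lockTable) tryAcquire(instanceID, workerID string, now time.Time) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	cur, held := t.leases[instanceID]
	if held && cur.workerID != workerID && now.Sub(cur.acquiredAt) < t.maxAge {
		return false // another live worker owns this instance
	}
	t.leases[instanceID] = lease{workerID: workerID, acquiredAt: now}
	return true
}

// runDemo shows a second worker being refused, then taking over a stale lease.
func runDemo() []bool {
	locks := newLockTable(5 * time.Minute)
	start := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	return []bool{
		locks.tryAcquire("wf-1", "worker-a", start),
		locks.tryAcquire("wf-1", "worker-b", start.Add(time.Minute)),
		locks.tryAcquire("wf-1", "worker-b", start.Add(6*time.Minute)),
	}
}

func main() {
	fmt.Println(runDemo()) // [true false true]
}
```

Passing the current time in as an argument keeps the sketch deterministic; a database-backed version would typically compare against the database clock instead.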

Quick Start

package main

import (
    "context"
    "log"
    "net/http"

    "github.com/i2y/romancy"
)

// Input/Output types
type OrderInput struct {
    OrderID string  `json:"order_id"`
    Amount  float64 `json:"amount"`
}

type OrderResult struct {
    OrderID string         `json:"order_id"`
    Payment map[string]any `json:"payment"`
}

// Define an activity
var processPayment = romancy.DefineActivity("process_payment",
    func(ctx context.Context, amount float64) (map[string]any, error) {
        log.Printf("Processing payment: $%.2f", amount)
        return map[string]any{"status": "paid", "amount": amount}, nil
    },
)

// Define a workflow
var orderWorkflow = romancy.DefineWorkflow("order_workflow",
    func(ctx *romancy.WorkflowContext, input OrderInput) (OrderResult, error) {
        // Activity results are recorded in history
        result, err := processPayment.Execute(ctx, input.Amount)
        if err != nil {
            return OrderResult{}, err
        }
        return OrderResult{OrderID: input.OrderID, Payment: result}, nil
    },
)

func main() {
    ctx := context.Background()

    // Create app with SQLite storage
    app := romancy.NewApp(
        romancy.WithDatabase("workflow.db"),
        romancy.WithWorkerID("order-service"),
    )
    defer app.Shutdown(ctx)

    // Start the application
    if err := app.Start(ctx); err != nil {
        log.Fatal(err)
    }

    // Register and start workflow
    romancy.RegisterWorkflow[OrderInput, OrderResult](app, orderWorkflow)

    instanceID, err := romancy.StartWorkflow(ctx, app, orderWorkflow, OrderInput{
        OrderID: "ORD-123",
        Amount:  99.99,
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Started workflow: %s", instanceID)

    // Serve CloudEvents endpoint
    http.Handle("/", app.Handler())
    log.Fatal(http.ListenAndServe(":8001", nil))
}

What happens on crash?

  1. Activities already executed return cached results from history
  2. Workflow resumes from the last checkpoint
  3. No manual intervention required

Installation

go get github.com/i2y/romancy

Database Support

Database   | Use Case                             | Multi-Pod Support | Production Ready
SQLite     | Development, testing, single-process | ⚠️ Limited        | ⚠️ Limited
PostgreSQL | Production, multi-process/multi-pod  | ✅ Yes            | ✅ Recommended
MySQL      | Production, multi-process/multi-pod  | ✅ Yes            | ✅ Yes (8.0+)

Important: For multi-process or multi-pod deployments (K8s, Docker Compose with multiple replicas), use PostgreSQL or MySQL.

Database Migrations

Romancy automatically applies database migrations on startup. Migration files are embedded in the binary, so no external files are needed at runtime.

// Default: auto-migration enabled
app := romancy.NewApp(
    romancy.WithDatabase("postgres://user:pass@localhost/db"),
)

// Disable auto-migration (manage with dbmate CLI instead)
app := romancy.NewApp(
    romancy.WithDatabase("postgres://user:pass@localhost/db"),
    romancy.WithAutoMigrate(false),
)

Features:

  • Automatic: Migrations run during app.Start() before any workflows execute
  • dbmate-compatible: Uses the same schema_migrations table as dbmate CLI
  • Multi-worker safe: Safe for concurrent startup across multiple pods/processes
  • Embedded: Migration files are bundled into the binary via Go embed

The database schema is managed in the durax-io/schema repository, shared between Romancy (Go) and Edda (Python) for cross-framework compatibility.
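dbmate names migration files with a timestamp version prefix and records applied versions in the schema_migrations table. As a rough illustration of how pending migrations could be selected from an embedded filesystem, the sketch below uses `testing/fstest.MapFS` in place of real embedded files. The function names and the three file names are hypothetical, and this is not the code Romancy runs at startup.

```go
package main

import (
	"fmt"
	"io/fs"
	"sort"
	"strings"
	"testing/fstest"
)

// pendingMigrations lists the .sql files in dir whose version prefix
// (the part before the first underscore) is not in applied, in version order.
func pendingMigrations(fsys fs.FS, dir string, applied map[string]bool) ([]string, error) {
	entries, err := fs.ReadDir(fsys, dir)
	if err != nil {
		return nil, err
	}
	var pending []string
	for _, e := range entries {
		name := e.Name()
		if e.IsDir() || !strings.HasSuffix(name, ".sql") {
			continue
		}
		version, _, _ := strings.Cut(name, "_")
		if !applied[version] {
			pending = append(pending, name)
		}
	}
	sort.Strings(pending) // timestamp prefixes sort chronologically
	return pending, nil
}

// demoPending runs the selection against an in-memory file tree.
// The file names below are made up for this example.
func demoPending() ([]string, error) {
	fsys := fstest.MapFS{
		"sqlite/20250101000000_init.sql":         {Data: []byte("-- migrate:up")},
		"sqlite/20250201000000_add_outbox.sql":   {Data: []byte("-- migrate:up")},
		"sqlite/20250301000000_add_channels.sql": {Data: []byte("-- migrate:up")},
	}
	applied := map[string]bool{"20250101000000": true}
	return pendingMigrations(fsys, "sqlite", applied)
}

func main() {
	pending, err := demoPending()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(pending)
}
```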

Manual migration with dbmate (optional):

If you prefer to manage migrations externally:

# Install dbmate
brew install dbmate  # macOS
# or: go install github.com/amacneil/dbmate@latest

# Run migrations (SQLite)
dbmate --url "sqlite:workflow.db" --migrations-dir schema/db/migrations/sqlite up

# Run migrations (PostgreSQL)
dbmate --url "postgres://user:pass@localhost/db?sslmode=disable" --migrations-dir schema/db/migrations/postgresql up

# Run migrations (MySQL)
dbmate --url "mysql://user:pass@localhost/db" --migrations-dir schema/db/migrations/mysql up

Core Concepts

Workflows and Activities

Activity: A unit of work that performs business logic. Activity results are recorded in history.

Workflow: Orchestration logic that coordinates activities. Workflows can be replayed from history after crashes.

// Input/Output types
type EmailInput struct {
    Email   string `json:"email"`
    Message string `json:"message"`
}

type SignupResult struct {
    Status string `json:"status"`
}

// Define an activity with DefineActivity
var sendEmail = romancy.DefineActivity("send_email",
    func(ctx context.Context, input EmailInput) (bool, error) {
        // Business logic - this will be recorded
        log.Printf("Sending email to %s", input.Email)
        return true, nil
    },
)

// Define a workflow with DefineWorkflow
var userSignup = romancy.DefineWorkflow("user_signup",
    func(ctx *romancy.WorkflowContext, email string) (SignupResult, error) {
        // Orchestration logic
        _, err := sendEmail.Execute(ctx, EmailInput{Email: email, Message: "Welcome!"})
        if err != nil {
            return SignupResult{}, err
        }
        return SignupResult{Status: "completed"}, nil
    },
)

Durable Execution

Romancy ensures workflow progress is never lost through deterministic replay:

  1. Activity results are recorded in a history table
  2. On crash recovery, workflows resume from the last checkpoint
  3. Already-executed activities return cached results from history
  4. New activities continue from where the workflow left off
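The replay steps above can be sketched with a history map keyed by a deterministic activity ID. This is a simplified illustration under stated assumptions, not Romancy's replay engine: `runner`, `execute`, and the `name:sequence` ID scheme are hypothetical.

```go
package main

import "fmt"

// history maps a deterministic activity ID to its recorded result.
type history map[string]string

// runner executes activities, consulting history before doing real work.
type runner struct {
	hist     history
	seq      int // position of the next activity call in the workflow
	executed int // how many activities actually ran in this attempt
}

// execute returns the recorded result when one exists; otherwise it runs fn
// and records the result under an ID derived from the name and call order.
func (r *runner) execute(name string, fn func() string) string {
	r.seq++
	id := fmt.Sprintf("%s:%d", name, r.seq)
	if res, ok := r.hist[id]; ok {
		return res // replay: skip the side effect
	}
	r.executed++
	res := fn()
	r.hist[id] = res
	return res
}

// workflow is deterministic orchestration: same calls in the same order.
func workflow(r *runner) string {
	a := r.execute("reserve", func() string { return "reserved" })
	b := r.execute("charge", func() string { return "charged" })
	return a + "," + b
}

// demoReplay simulates a crash after the first activity, then a resume.
func demoReplay() (result string, ranOnResume int) {
	hist := history{"reserve:1": "reserved"} // recorded before the crash
	r := &runner{hist: hist}
	result = workflow(r)
	return result, r.executed
}

func main() {
	result, ran := demoReplay()
	fmt.Println(result, ran) // reserved,charged 1
}
```

Because the ID depends on call order, the workflow function must issue the same activity calls in the same order on every run, which is why orchestration logic has to stay deterministic.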

Key guarantees:

  • Completed activities are never re-executed on replay (results cached in history)
  • Workflows can survive arbitrary crashes
  • No manual checkpoint management required

Compensation (Saga Pattern)

When a workflow fails, Romancy can execute compensation functions for already-executed activities in reverse order:

// Define activity with inline compensation
var reserveInventory = romancy.DefineActivity("reserve_inventory",
    func(ctx context.Context, itemID string) (string, error) {
        log.Printf("Reserved %s", itemID)
        return "reservation-123", nil
    },
    romancy.WithCompensation[string, string](func(ctx context.Context, itemID string) error {
        log.Printf("Cancelled reservation for %s", itemID)
        return nil
    }),
)
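To make the reverse-order behavior concrete, here is a small library-independent sketch of a saga runner. The `step` type, `runSaga`, and the step names are hypothetical and only illustrate the ordering; Romancy's own compensation is configured with WithCompensation as shown above.

```go
package main

import (
	"errors"
	"fmt"
)

// step pairs a forward action with the compensation that undoes it.
type step struct {
	name       string
	action     func() error
	compensate func()
}

// runSaga executes steps in order. If one fails, it compensates the steps
// that already succeeded, newest first, and returns the original error.
func runSaga(steps []step) error {
	var done []step
	for _, s := range steps {
		if err := s.action(); err != nil {
			for i := len(done) - 1; i >= 0; i-- {
				done[i].compensate()
			}
			return fmt.Errorf("step %q failed: %w", s.name, err)
		}
		done = append(done, s)
	}
	return nil
}

// demoSaga returns the order in which compensations ran.
func demoSaga() ([]string, error) {
	var undone []string
	undo := func(name string) func() {
		return func() { undone = append(undone, name) }
	}
	ok := func() error { return nil }
	fail := func() error { return errors.New("carrier unavailable") }
	steps := []step{
		{"reserve_inventory", ok, undo("reserve_inventory")},
		{"charge_payment", ok, undo("charge_payment")},
		{"ship_order", fail, undo("ship_order")},
	}
	err := runSaga(steps)
	return undone, err
}

func main() {
	undone, err := demoSaga()
	fmt.Println(undone, err)
}
```

The failed step itself is not compensated in this sketch, since its forward action never completed.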

Multi-worker Execution

Multiple workers can safely process workflows using database-based exclusive control:

app := romancy.NewApp(
    romancy.WithDatabase("postgres://localhost/workflows"),  // Shared database
    romancy.WithWorkerID("order-service"),
)

Features:

  • Each workflow instance runs on only one worker at a time
  • Automatic stale lock cleanup (5-minute timeout)
  • Crashed workflows automatically resume on any available worker

Transactional Outbox

Activities are automatically transactional by default, ensuring atomicity between:

  1. Activity execution (side effects)
  2. History recording (replay data)
  3. Event publishing (outbox table)

// Default: activity runs in a transaction
var createOrder = romancy.DefineActivity("create_order",
    func(ctx context.Context, orderID string) (map[string]any, error) {
        // If this activity fails, both the history record and any
        // outbox events will be rolled back together
        log.Printf("Creating order: %s", orderID)
        return map[string]any{"order_id": orderID, "status": "created"}, nil
    },
)
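The atomicity described above can be illustrated without a database: stage the outbox events while the activity runs, and write the history record and the events together only if it succeeds. The `store`, `tx`, and `runActivity` names are hypothetical and the "transaction" here is just an in-memory buffer, so treat this as a sketch of the idea rather than Romancy's storage layer.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a hypothetical in-memory stand-in for the workflow database.
type store struct {
	history []string
	outbox  []string
}

// tx stages outbox events until the activity is known to have succeeded.
type tx struct {
	events []string
}

func (t *tx) publish(event string) { t.events = append(t.events, event) }

// runActivity applies the history record and staged events together,
// or drops everything if fn returns an error.
func (s *store) runActivity(name string, fn func(t *tx) error) error {
	t := &tx{}
	if err := fn(t); err != nil {
		return err // rollback: nothing staged reaches the store
	}
	s.history = append(s.history, name)
	s.outbox = append(s.outbox, t.events...)
	return nil
}

// demoOutbox runs one successful and one failing activity.
func demoOutbox() (historyLen, outboxLen int) {
	s := &store{}
	_ = s.runActivity("create_order", func(t *tx) error {
		t.publish("order.created")
		return nil
	})
	_ = s.runActivity("charge", func(t *tx) error {
		t.publish("payment.requested")
		return errors.New("card declined")
	})
	return len(s.history), len(s.outbox)
}

func main() {
	h, o := demoOutbox()
	fmt.Println(h, o) // 1 1
}
```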

Opt-out of transactions for external API calls:

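var callExternalAPI = romancy.DefineActivity("call_api",
    func(ctx context.Context, url string) (map[string]any, error) {
        // Not transactional - suitable for external calls
        resp, err := http.Get(url)
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()
        var result map[string]any // assumes a JSON object response; needs "encoding/json"
        if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
            return nil, err
        }
        return result, nil
    },
    romancy.WithTransactional[string, map[string]any](false), // type parameters cannot be inferred from a bool
)

Event & Timer Waiting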

Workflows can wait for external events or timers without consuming worker resources:

// Event data type
type PaymentData struct {
    OrderID string `json:"order_id"`
    Status  string `json:"status"`
}

var paymentWorkflow = romancy.DefineWorkflow("payment_workflow",
    func(ctx *romancy.WorkflowContext, orderID string) (PaymentData, error) {
        // Wait for payment completion event (process-releasing)
        event, err := romancy.WaitEvent[PaymentData](ctx, "payment.completed")
        if err != nil {
            return PaymentData{}, err
        }
        return event.Data, nil
    },
)

// With timeout
var paymentWithTimeout = romancy.DefineWorkflow("payment_with_timeout",
    func(ctx *romancy.WorkflowContext, orderID string) (PaymentData, error) {
        event, err := romancy.WaitEvent[PaymentData](ctx, "payment.completed",
            romancy.WithEventTimeout(5*time.Minute))
        if err != nil {
            return PaymentData{}, err
        }
        return event.Data, nil
    },
)

Sleep() for time-based waiting:

var orderWithTimeout = romancy.DefineWorkflow("order_with_timeout",
    func(ctx *romancy.WorkflowContext, orderID string) (map[string]any, error) {
        // Sleep for 60 seconds
        if err := romancy.Sleep(ctx, 60*time.Second); err != nil {
            return nil, err
        }
        // Continue after sleep completes.
        // checkPayment is assumed to be an activity defined elsewhere.
        return checkPayment.Execute(ctx, orderID)
    },
)

Channel Messaging

Workflows can communicate with each other using durable channels with two delivery modes:

  • Broadcast: All subscribers receive every message
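  • Competing: Each message goes to exactly one subscriber (work queue pattern)

// Message payload type used by the examples below (assumed shape)
type TaskData struct {
    ID string `json:"id"`
}

var workerWorkflow = romancy.DefineWorkflow("worker",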
    func(ctx *romancy.WorkflowContext, workerID string) (map[string]any, error) {
        // Subscribe to a channel in competing mode (work queue)
        if err := romancy.Subscribe(ctx, "tasks", romancy.ModeCompeting); err != nil {
            return nil, err
        }

        // Wait for and receive a message
        msg, err := romancy.Receive[TaskData](ctx, "tasks",
            romancy.WithReceiveTimeout(60*time.Second))
        if err != nil {
            return nil, err
        }

        log.Printf("Worker %s received task: %v", workerID, msg.Data)
        return map[string]any{"task": msg.Data}, nil
    },
)

var dispatcherWorkflow = romancy.DefineWorkflow("dispatcher",
    func(ctx *romancy.WorkflowContext, _ struct{}) (map[string]any, error) {
        // Publish a task to the channel
        if err := romancy.Publish(ctx, "tasks", TaskData{ID: "task-1"}); err != nil {
            return nil, err
        }
        return map[string]any{"dispatched": true}, nil
    },
)

// Send a direct message to a specific workflow instance
romancy.SendTo(ctx, targetInstanceID, "channel", data)

// Unsubscribe from a channel
romancy.Unsubscribe(ctx, "channel")

// Publish with metadata
romancy.Publish(ctx, "channel", data, romancy.WithMetadata(map[string]any{
    "priority": "high",
}))

Recur Pattern

Long-running workflows can use Erlang-style tail recursion to prevent unbounded history growth:

// For Recur, input and output types must be the same
type CounterState struct {
    Count      int  `json:"count"`
    Completed  bool `json:"completed"`
    FinalCount int  `json:"final_count,omitempty"`
}

var counterWorkflow = romancy.DefineWorkflow("counter",
    func(ctx *romancy.WorkflowContext, input CounterState) (CounterState, error) {
        newCount := input.Count + 1

        if newCount < 1000 {
            // Continue with new input (archives history, restarts workflow)
            return romancy.Recur(ctx, CounterState{Count: newCount})
        }

        // Complete when done
        return CounterState{Completed: true, FinalCount: newCount}, nil
    },
)

Key benefits:

  • History is archived before each recur
  • continued_from field tracks workflow lineage
  • Prevents memory/storage issues with long-running workflows
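The effect of archiving on history size can be seen in a toy model. The `instance` type and its `step` method are hypothetical, and the threshold of 100 records is an arbitrary value chosen for the example; in Romancy the workflow itself decides when to call Recur.

```go
package main

import "fmt"

// instance is a hypothetical, simplified view of a workflow instance.
type instance struct {
	count   int
	history []string // grows with every recorded step
	archive []string // history moved aside by earlier recurs
	recurs  int
}

// step records one unit of work and recurs once history reaches limit.
func (in *instance) step(limit int) {
	in.count++
	in.history = append(in.history, fmt.Sprintf("increment:%d", in.count))
	if len(in.history) >= limit {
		// Recur: archive the history and carry only the state forward.
		in.archive = append(in.archive, in.history...)
		in.history = nil
		in.recurs++
	}
}

// demoRecur runs 1000 steps with a recur every 100 history records.
func demoRecur() (count, liveHistory, archived, recurs int) {
	in := &instance{}
	for i := 0; i < 1000; i++ {
		in.step(100)
	}
	return in.count, len(in.history), len(in.archive), in.recurs
}

func main() {
	fmt.Println(demoRecur()) // 1000 0 1000 10
}
```

The live history never exceeds the threshold, so replay after a crash only has to walk the records written since the last recur.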

HTTP Integration

Using with net/http

Romancy provides a Handler() method for integration with any Go HTTP router:

// Standard library
http.Handle("/", app.Handler())

// With chi router
r := chi.NewRouter()
r.Mount("/workflows", app.Handler())

// With gorilla/mux
r := mux.NewRouter()
r.PathPrefix("/workflows").Handler(app.Handler())

CloudEvents Support

Romancy accepts CloudEvents at the HTTP endpoint:

curl -X POST http://localhost:8001/ \
  -H "Content-Type: application/cloudevents+json" \
  -d '{
    "specversion": "1.0",
    "type": "order.created",
    "source": "order-service",
    "id": "event-123",
    "data": {"order_id": "ORD-123", "amount": 99.99}
  }'

HTTP Response Codes:

  • 202 Accepted: Event accepted for asynchronous processing
  • 400 Bad Request: CloudEvents parsing/validation error (non-retryable)
  • 500 Internal Server Error: Internal error (retryable)
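The mapping from outcome to status code can be sketched with the standard library alone. This is an illustrative handler, not the one returned by app.Handler(): `eventHandler`, `enqueue`, and `post` are hypothetical names, and validation here only checks the four required CloudEvents 1.0 attributes (specversion, type, source, id).

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// cloudEvent holds the required CloudEvents 1.0 attributes plus the payload.
type cloudEvent struct {
	SpecVersion string          `json:"specversion"`
	Type        string          `json:"type"`
	Source      string          `json:"source"`
	ID          string          `json:"id"`
	Data        json.RawMessage `json:"data"`
}

// eventHandler validates the event and passes it to enqueue, a hypothetical
// hook standing in for delivery to waiting workflows.
func eventHandler(enqueue func(cloudEvent) error) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var ev cloudEvent
		if err := json.NewDecoder(r.Body).Decode(&ev); err != nil {
			http.Error(w, "invalid JSON", http.StatusBadRequest)
			return
		}
		if ev.SpecVersion == "" || ev.Type == "" || ev.Source == "" || ev.ID == "" {
			http.Error(w, "missing required attribute", http.StatusBadRequest)
			return
		}
		if err := enqueue(ev); err != nil {
			http.Error(w, "internal error", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	})
}

// post sends body to the handler and returns the response status code.
func post(h http.Handler, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body))
	req.Header.Set("Content-Type", "application/cloudevents+json")
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

// demoStatuses exercises the accepted, invalid, and failing paths.
func demoStatuses() []int {
	ok := eventHandler(func(cloudEvent) error { return nil })
	failing := eventHandler(func(cloudEvent) error { return errors.New("storage unavailable") })
	valid := `{"specversion":"1.0","type":"order.created","source":"order-service","id":"event-123","data":{"order_id":"ORD-123"}}`
	return []int{
		post(ok, valid),
		post(ok, `{"type":"order.created"}`),
		post(failing, valid),
	}
}

func main() {
	fmt.Println(demoStatuses()) // [202 400 500]
}
```

Returning 400 for malformed events and 500 for internal failures lets a sender distinguish events it should drop from events it should retry.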

MCP Integration

Romancy provides Model Context Protocol (MCP) integration, allowing you to expose workflows as MCP tools for AI assistants.

Quick Start

package main

import (
	"context"
	"log"

	"github.com/i2y/romancy"
	"github.com/i2y/romancy/mcp"
)

// Define workflow input/output types with jsonschema tags
type OrderInput struct {
	OrderID    string  `json:"order_id" jsonschema:"The unique order ID"`
	CustomerID string  `json:"customer_id" jsonschema:"The customer ID"`
	Amount     float64 `json:"amount" jsonschema:"The order amount in dollars"`
}

type OrderResult struct {
	OrderID      string `json:"order_id" jsonschema:"The order ID"`
	Status       string `json:"status" jsonschema:"Final order status"`
	Confirmation string `json:"confirmation" jsonschema:"Confirmation number"`
}

// Define workflow
var orderWorkflow = romancy.DefineWorkflow("order_workflow",
	func(ctx *romancy.WorkflowContext, input OrderInput) (OrderResult, error) {
		// Workflow logic here
		return OrderResult{
			OrderID:      input.OrderID,
			Status:       "confirmed",
			Confirmation: "CONF-123",
		}, nil
	},
)

func main() {
	// Create Romancy app
	app := romancy.NewApp(romancy.WithDatabase("workflow.db"))

	// Create MCP server
	server := mcp.NewServer(app,
		mcp.WithServerName("order-service"),
		mcp.WithServerVersion("1.0.0"),
	)

	// Register workflow - auto-generates 4 MCP tools:
	// - order_workflow_start
	// - order_workflow_status
	// - order_workflow_result
	// - order_workflow_cancel
	mcp.RegisterWorkflow[OrderInput, OrderResult](server, orderWorkflow,
		mcp.WithDescription("Process customer orders"),
	)

	// Initialize and run
	ctx := context.Background()
	if err := server.Initialize(ctx); err != nil {
		log.Fatal(err)
	}
	defer server.Shutdown(ctx)

	// Run on stdio transport
	if err := server.RunStdio(ctx); err != nil {
		log.Fatal(err)
	}
}

Auto-Generated Tools

When you register a workflow, Romancy automatically generates four MCP tools:

Tool              | Description
{workflow}_start  | Start a new workflow instance with input parameters
{workflow}_status | Get the current status of a workflow instance
{workflow}_result | Get the result of a completed workflow
{workflow}_cancel | Cancel a running workflow instance

LLM Integration

Romancy provides durable LLM calls via the bucephalus library. All LLM calls are automatically cached as activities, so workflow replay returns cached results without re-invoking the LLM API.

Quick Start

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/i2y/romancy"
	"github.com/i2y/romancy/llm"

	// Import the provider you want to use
	_ "github.com/i2y/bucephalus/anthropic"
)

type SummaryInput struct {
	Text string `json:"text"`
}

type SummaryResult struct {
	Summary string `json:"summary"`
}

var summarizeWorkflow = romancy.DefineWorkflow("summarize",
	func(ctx *romancy.WorkflowContext, input SummaryInput) (SummaryResult, error) {
		// LLM call is cached for replay
		response, err := llm.Call(ctx, input.Text,
			llm.WithSystemMessage("Summarize the following text concisely."),
		)
		if err != nil {
			return SummaryResult{}, err
		}
		return SummaryResult{Summary: response.Text}, nil
	},
)

func main() {
	ctx := context.Background()

	// Create app
	app := romancy.NewApp(romancy.WithDatabase("workflow.db"))

	// Set LLM defaults for the app
	llm.SetAppDefaults(app,
		llm.WithProvider("anthropic"),
		llm.WithModel("claude-sonnet-4-5-20250929"),
		llm.WithMaxTokens(1024),
	)

	romancy.RegisterWorkflow(app, summarizeWorkflow)

	if err := app.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer app.Shutdown(ctx)

	instanceID, err := romancy.StartWorkflow(ctx, app, summarizeWorkflow, SummaryInput{
		Text: "Long article text here...",
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Started:", instanceID)
}

Features

Feature                 | Description
llm.Call()              | Ad-hoc LLM call with automatic caching
llm.CallParse[T]()      | Parse response into a struct
llm.CallMessages()      | Multi-turn conversation
llm.DefineDurableCall() | Reusable LLM definition
llm.DurableAgent[T]     | Stateful multi-turn agent
llm.SetAppDefaults()    | App-level default configuration

Supported Providers

Via bucephalus, Romancy supports:

  • Anthropic (Claude models)
  • OpenAI (GPT models)
  • Google Gemini

Development

Running Tests

go test ./...

Project Structure

romancy/
├── app.go           # Main App struct and HTTP handling
├── workflow.go      # Workflow definition and execution
├── activity.go      # Activity definition with transactions
├── context.go       # WorkflowContext for workflow execution
├── events.go        # Event and timer waiting
├── compensation.go  # Saga pattern compensation
├── hooks/           # Observability hooks
├── outbox/          # Transactional outbox
├── mcp/             # MCP (Model Context Protocol) integration
├── llm/             # LLM integration (via bucephalus)
└── internal/
    ├── storage/     # Database abstraction
    └── replay/      # Deterministic replay engine

License

This project is licensed under the MIT License - see the LICENSE file for details.

Related Projects

  • Edda: The original Python implementation

Documentation

Overview

Package romancy provides a durable execution framework for Go.

Constants

const (
	// SuspendForTimer indicates the workflow is waiting for a timer to expire.
	SuspendForTimer = replay.SuspendForTimer
	// SuspendForChannelMessage indicates the workflow is waiting for a channel message (via Receive or WaitEvent).
	SuspendForChannelMessage = replay.SuspendForChannelMessage
	// SuspendForRecur indicates the workflow is recursing with new input.
	SuspendForRecur = replay.SuspendForRecur
)

Variables

var (
	// IsSuspendSignal returns true if the error is a SuspendSignal.
	IsSuspendSignal = replay.IsSuspendSignal
	// AsSuspendSignal extracts the SuspendSignal from an error if present.
	AsSuspendSignal = replay.AsSuspendSignal
	// NewTimerSuspend creates a SuspendSignal for timer waiting.
	NewTimerSuspend = replay.NewTimerSuspend
	// NewChannelMessageSuspend creates a SuspendSignal for channel message waiting (via Receive or WaitEvent).
	NewChannelMessageSuspend = replay.NewChannelMessageSuspend
	// NewRecurSuspend creates a SuspendSignal for workflow recursion.
	NewRecurSuspend = replay.NewRecurSuspend
)

var ErrActivityIDConflict = errors.New("activity ID conflict: duplicate activity ID in workflow")

ErrActivityIDConflict indicates duplicate activity IDs in a workflow.

var ErrChannelNotSubscribed = errors.New("not subscribed to channel")

ErrChannelNotSubscribed indicates an operation on a channel without subscription.

var ErrDeterminismViolation = errors.New("determinism violation during replay")

ErrDeterminismViolation indicates non-deterministic behavior during replay.

var ErrGroupNotJoined = errors.New("not a member of group")

ErrGroupNotJoined indicates an operation on a group without membership.

var ErrInvalidWorkflowState = errors.New("invalid workflow state")

ErrInvalidWorkflowState indicates an invalid workflow state transition.

var ErrWorkflowAlreadyCancelled = errors.New("workflow already cancelled")

ErrWorkflowAlreadyCancelled indicates an operation on a cancelled workflow.

var ErrWorkflowAlreadyCompleted = errors.New("workflow already completed")

ErrWorkflowAlreadyCompleted indicates an operation on a completed workflow.

var ErrWorkflowAlreadyFailed = errors.New("workflow already failed")

ErrWorkflowAlreadyFailed indicates an operation on a failed workflow.

var ErrWorkflowNotCancellable = errors.New("workflow cannot be cancelled")

ErrWorkflowNotCancellable indicates that the workflow cannot be cancelled. This happens when the workflow is already completed, cancelled, failed, or does not exist.

Functions

func CancelWorkflow

func CancelWorkflow(ctx context.Context, app *App, instanceID, reason string) error

CancelWorkflow cancels a running workflow instance.

func ContextWithWorkflowContext

func ContextWithWorkflowContext(ctx context.Context, wfCtx *WorkflowContext) context.Context

ContextWithWorkflowContext returns a new context with the WorkflowContext embedded. This is used internally to pass WorkflowContext to activities.

func EmbeddedMigrationsFS added in v0.3.0

func EmbeddedMigrationsFS() fs.FS

EmbeddedMigrationsFS returns a filesystem rooted at schema/db/migrations for use with the migrations package.

The returned FS contains subdirectories for each database type:

  • sqlite/
  • postgresql/
  • mysql/

func GetGroupMembers

func GetGroupMembers(ctx context.Context, s storage.Storage, groupName string) ([]string, error)

GetGroupMembers returns the instance IDs of all workflows in a group. This is useful for broadcasting messages to group members.

func IsTerminalError

func IsTerminalError(err error) bool

IsTerminalError returns true if the error is or wraps a TerminalError.
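The "is or wraps" check is the standard errors.As pattern. The sketch below uses a hypothetical `terminalError` type to show the behavior; it is not Romancy's TerminalError definition, whose fields are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// terminalError is a hypothetical error type marking failures that should
// not be retried.
type terminalError struct{ cause error }

func (e *terminalError) Error() string { return "terminal: " + e.cause.Error() }
func (e *terminalError) Unwrap() error { return e.cause }

// isTerminal reports whether err is, or wraps, a *terminalError.
func isTerminal(err error) bool {
	var te *terminalError
	return errors.As(err, &te)
}

// demoTerminal checks a direct, a wrapped, and an unrelated error.
func demoTerminal() (direct, wrapped, plain bool) {
	base := &terminalError{cause: errors.New("invalid card number")}
	return isTerminal(base),
		isTerminal(fmt.Errorf("charge failed: %w", base)),
		isTerminal(errors.New("connection reset"))
}

func main() {
	fmt.Println(demoTerminal()) // true true false
}
```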

func JoinGroup

func JoinGroup(ctx *WorkflowContext, groupName string) error

JoinGroup adds the workflow to a named group. Groups are useful for scenarios where you need to send messages to a set of related workflows.

Groups persist across workflow restarts and members are automatically removed when the workflow completes.

func LeaveGroup

func LeaveGroup(ctx *WorkflowContext, groupName string) error

LeaveGroup removes the workflow from a named group.

func Publish

func Publish[T any](ctx *WorkflowContext, channelName string, data T, opts ...PublishOption) error

Publish sends a message to all subscribers of a channel. This is an activity that persists the message to storage.

The message will be delivered to:

  • Broadcast subscribers: All receive the message
  • Competing subscribers: Exactly one receives the message

func Recur

func Recur[T any](ctx *WorkflowContext, input T) (T, error)

Recur implements Erlang-style tail recursion for workflows. It archives the current workflow's history and starts a new instance with the provided input, maintaining the same instance ID.

This is useful for long-running workflows that need to periodically "reset" their history to prevent unbounded growth.

The workflow will:

  1. Archive current history to workflow_history_archive
  2. Clean up all subscriptions (events, timers, channels, groups)
  3. Mark current instance as "recurred"
  4. Create a new instance with continued_from set to current instance
  5. Start executing with the new input

Example:

// Input and output types must match, because Recur returns (T, error).
workflow := romancy.DefineWorkflow("counter", func(ctx *romancy.WorkflowContext, input CounterState) (CounterState, error) {
    // Process batch
    newCount := input.Count + 1
    if newCount < 1000 {
        // Continue with new input (tail recursion)
        return romancy.Recur(ctx, CounterState{Count: newCount})
    }
    return CounterState{FinalCount: newCount}, nil
})

func RegisterActivity

func RegisterActivity[I, O any](activity *Activity[I, O])

RegisterActivity registers an activity's compensation function globally. This is called automatically when an activity with compensation is created.

func RegisterWorkflow

func RegisterWorkflow[I, O any](app *App, workflow Workflow[I, O], opts ...WorkflowOption)

RegisterWorkflow registers a workflow with the application. The workflow can later be started by name or by type.

func SendEvent

func SendEvent[T any](ctx *WorkflowContext, eventType, source string, data T) error

SendEvent sends an event through the transactional outbox. This is a convenience wrapper for outbox.SendEventTransactional.

The event is stored in the database within the current transaction (if any) and will be delivered asynchronously by the outbox relayer.

This ensures that the event is only sent if the activity/transaction commits, which yields effectively exactly-once processing when combined with idempotent consumers.

Example:

activity := romancy.DefineActivity("process_order", func(ctx context.Context, input OrderInput) (OrderResult, error) {
    wfCtx := romancy.GetWorkflowContext(ctx)
    if wfCtx == nil {
        return OrderResult{}, fmt.Errorf("workflow context not available")
    }
    err := romancy.SendEvent(wfCtx, "order.created", "order-service", map[string]any{
        "order_id": input.OrderID,
        "amount":   input.Amount,
    })
    if err != nil {
        return OrderResult{}, err
    }
    return OrderResult{Status: "created"}, nil
})

func SendEventTransactional

func SendEventTransactional[T any](ctx *WorkflowContext, eventType, source string, data T, opts ...outbox.SendEventOption) error

SendEventTransactional is an alias for outbox.SendEventTransactional. Use this when you need more control over event options.

Example:

err := romancy.SendEventTransactional(wfCtx, "order.created", "order-service", orderData,
    outbox.WithEventID("custom-event-id"),
    outbox.WithContentType("application/json"),
)

func SendTo

func SendTo[T any](ctx *WorkflowContext, targetInstanceID, channelName string, data T, opts ...PublishOption) error

SendTo sends a direct message to a specific workflow instance. The target instance must be subscribed to the channel. Uses dynamic channel names (Edda approach): publishes to "channel:instance_id" so only the target instance receives the message.

func Sleep

func Sleep(ctx *WorkflowContext, duration time.Duration, opts ...SleepOption) error

Sleep suspends the workflow for a specified duration. The workflow will resume after the duration has elapsed.

During replay, if the timer already fired, this returns immediately. Otherwise, it returns a SuspendSignal that signals the engine to:

  1. Register a timer subscription
  2. Update workflow status to "waiting_for_timer"
  3. Release the workflow lock

When the timer expires, the workflow will be resumed.
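The suspend-and-resume flow can be modeled as an error value the engine recognizes. In this sketch `suspendSignal`, `sleep`, and `runOnce` are hypothetical simplifications: the fired-timer set stands in for recorded history, and the engine's persistence and lock release are reduced to a comment.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// suspendSignal is a hypothetical error value a workflow returns to tell
// the engine to stop here and wake it at WakeAt.
type suspendSignal struct{ WakeAt time.Time }

func (s *suspendSignal) Error() string { return "workflow suspended for timer" }

// sleep returns nil if the timer is already recorded as fired (replay path);
// otherwise it returns a suspendSignal for the engine to act on.
func sleep(fired map[string]bool, timerID string, now time.Time, d time.Duration) error {
	if fired[timerID] {
		return nil
	}
	return &suspendSignal{WakeAt: now.Add(d)}
}

// runOnce executes the workflow body once and reports the resulting status.
func runOnce(fired map[string]bool, now time.Time) string {
	err := sleep(fired, "timer-1", now, 60*time.Second)
	var sig *suspendSignal
	if errors.As(err, &sig) {
		// A real engine would persist the timer and release the lock here.
		return "waiting_for_timer"
	}
	return "completed"
}

// demoSleep runs the workflow before and after the timer fires.
func demoSleep() (first, second string) {
	now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	fired := map[string]bool{}
	first = runOnce(fired, now)
	fired["timer-1"] = true // the timer expired; the workflow is resumed
	second = runOnce(fired, now.Add(time.Minute))
	return first, second
}

func main() {
	fmt.Println(demoSleep()) // waiting_for_timer completed
}
```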

func SleepUntil

func SleepUntil(ctx *WorkflowContext, t time.Time, opts ...SleepOption) error

SleepUntil suspends the workflow until a specific time.

func StartWorkflow

func StartWorkflow[I, O any](
	ctx context.Context,
	app *App,
	workflow Workflow[I, O],
	input I,
	opts ...StartOption,
) (string, error)

StartWorkflow starts a new workflow instance. Returns the instance ID.

func Subscribe

func Subscribe(ctx *WorkflowContext, channelName string, mode ChannelMode) error

Subscribe registers the workflow to receive messages from a channel. The mode determines how messages are delivered:

  • ModeBroadcast: All subscribers receive every message
  • ModeCompeting: Each message goes to exactly one subscriber
  • ModeDirect: Receives messages sent via SendTo to this instance

Subscriptions persist across workflow restarts and are automatically cleaned up when the workflow completes.

Once a channel has subscribers, its mode is locked. Attempting to subscribe with a different mode will return a ChannelModeConflictError.
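The two main delivery modes and the mode lock can be modeled in a few lines. This in-memory sketch uses hypothetical names (`channel`, `subscribe`, `publish`, `errModeConflict`), and picking the competing receiver by round-robin is an assumption made for the example; the page does not say how Romancy chooses the receiver.

```go
package main

import (
	"errors"
	"fmt"
)

type mode int

const (
	broadcast mode = iota
	competing
)

var errModeConflict = errors.New("channel mode conflict")

// channel is a hypothetical in-memory channel with a locked delivery mode.
type channel struct {
	mode        mode
	subscribers []string
	next        int                 // round-robin cursor for competing mode
	inbox       map[string][]string // subscriber -> delivered messages
}

// subscribe adds a subscriber; the first subscriber fixes the channel mode.
func (c *channel) subscribe(id string, m mode) error {
	if len(c.subscribers) > 0 && c.mode != m {
		return errModeConflict
	}
	c.mode = m
	c.subscribers = append(c.subscribers, id)
	return nil
}

// publish delivers msg to every subscriber (broadcast) or to one (competing).
func (c *channel) publish(msg string) {
	if len(c.subscribers) == 0 {
		return
	}
	if c.inbox == nil {
		c.inbox = make(map[string][]string)
	}
	if c.mode == broadcast {
		for _, id := range c.subscribers {
			c.inbox[id] = append(c.inbox[id], msg)
		}
		return
	}
	id := c.subscribers[c.next%len(c.subscribers)]
	c.next++
	c.inbox[id] = append(c.inbox[id], msg)
}

// demoChannels publishes one message to a broadcast and a competing channel.
func demoChannels() (broadcastTotal, competingTotal int, conflict bool) {
	b, q := &channel{}, &channel{}
	for _, id := range []string{"w1", "w2"} {
		_ = b.subscribe(id, broadcast)
		_ = q.subscribe(id, competing)
	}
	conflict = errors.Is(q.subscribe("w3", broadcast), errModeConflict)
	b.publish("task-1")
	q.publish("task-1")
	for _, id := range []string{"w1", "w2"} {
		broadcastTotal += len(b.inbox[id])
		competingTotal += len(q.inbox[id])
	}
	return
}

func main() {
	fmt.Println(demoChannels()) // 2 1 true
}
```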

func Unsubscribe

func Unsubscribe(ctx *WorkflowContext, channelName string) error

Unsubscribe removes the workflow's subscription to a channel.

Types

type Activity

type Activity[I, O any] struct {
	// contains filtered or unexported fields
}

Activity represents a single unit of work within a workflow. Activities are the only way to perform I/O or side effects in workflows. I is the input type, O is the output type.

func DefineActivity

func DefineActivity[I, O any](
	name string,
	fn func(ctx context.Context, input I) (O, error),
	opts ...ActivityOption[I, O],
) *Activity[I, O]

DefineActivity creates a new activity. By default, activities are wrapped in a transaction for atomicity with history recording and outbox events.

func (*Activity[I, O]) Compensate

func (a *Activity[I, O]) Compensate(ctx context.Context, input I) error

Compensate executes the compensation function if defined.

func (*Activity[I, O]) Execute

func (a *Activity[I, O]) Execute(
	ctx *WorkflowContext,
	input I,
	opts ...ExecuteOption,
) (O, error)

Execute runs the activity within a workflow context. If no activity ID is supplied via WithActivityID, one is auto-generated.

When transactional=true (default), the activity execution, history recording, and outbox events are wrapped in a database transaction for atomicity.

func (*Activity[I, O]) HasCompensation

func (a *Activity[I, O]) HasCompensation() bool

HasCompensation returns true if the activity has a compensation function.

func (*Activity[I, O]) Name

func (a *Activity[I, O]) Name() string

Name returns the activity name.

type ActivityOption

type ActivityOption[I, O any] func(*Activity[I, O])

ActivityOption configures an activity.

func WithCompensation

func WithCompensation[I, O any](fn func(ctx context.Context, input I) error) ActivityOption[I, O]

WithCompensation sets the compensation function for the activity. This function will be called during saga rollback.

func WithRetryPolicy

func WithRetryPolicy[I, O any](policy *retry.Policy) ActivityOption[I, O]

WithRetryPolicy sets the retry policy for the activity.

func WithTimeout

func WithTimeout[I, O any](d time.Duration) ActivityOption[I, O]

WithTimeout sets the timeout for each activity execution attempt.

func WithTransactional

func WithTransactional[I, O any](transactional bool) ActivityOption[I, O]

WithTransactional sets whether the activity execution should be wrapped in a database transaction. Default is true.

When transactional=true (default):

  • Activity execution, history recording, and outbox events are atomic
  • Rollback occurs on failure
  • Use the WorkflowContext's Session() (obtained via GetWorkflowContext) for database operations within the same transaction

When transactional=false:

  • Useful for activities that call external APIs or don't need atomicity
  • History and outbox events are still recorded, but not atomically

type App

type App struct {
	// contains filtered or unexported fields
}

App is the main entry point for Romancy. It manages workflow execution, storage, and background tasks.

func NewApp

func NewApp(opts ...Option) *App

NewApp creates a new Romancy application.

func (*App) FindInstances

func (a *App) FindInstances(ctx context.Context, inputFilters map[string]any) ([]*storage.WorkflowInstance, error)

FindInstances searches for workflow instances with input filters. This is a convenience method for searching workflows by their input data.

The inputFilters parameter maps JSON paths to expected values:

  • "customer.id" -> "12345" matches input like {"customer": {"id": "12345"}}
  • "status" -> "active" matches input like {"status": "active"}

Values are compared with exact match. Supported value types:

  • string: Exact string match
  • int/float64: Numeric comparison
  • bool: Boolean comparison
  • nil: Matches null values

Example:

instances, err := app.FindInstances(ctx, map[string]any{
    "order.customer_id": "cust_123",
    "order.status": "pending",
})
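To make the matching rules concrete, here is a small in-memory model of dotted-path filters over JSON input. It is a sketch of the documented semantics only: Romancy presumably evaluates filters in its storage layer, and the lookup, normalize, and matches helpers are hypothetical names introduced for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// lookup walks a dotted path such as "customer.id" through decoded JSON objects.
func lookup(doc map[string]any, path string) (any, bool) {
	var cur any = doc
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		next, found := obj[key]
		if !found {
			return nil, false
		}
		cur = next
	}
	return cur, true
}

// normalize converts Go ints to float64, the type encoding/json uses for numbers.
func normalize(v any) any {
	switch n := v.(type) {
	case int:
		return float64(n)
	case int64:
		return float64(n)
	}
	return v
}

// matches reports whether every filter path equals the value found in the input.
func matches(input []byte, filters map[string]any) (bool, error) {
	var doc map[string]any
	if err := json.Unmarshal(input, &doc); err != nil {
		return false, err
	}
	for path, want := range filters {
		got, ok := lookup(doc, path)
		if !ok || got != normalize(want) {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	input := []byte(`{"order": {"customer_id": "cust_123", "status": "pending", "items": 2}}`)
	ok, err := matches(input, map[string]any{
		"order.customer_id": "cust_123",
		"order.items":       2,
	})
	fmt.Println(ok, err)
}
```

All filters must match (logical AND), and a path that is absent from the input never matches, not even against nil.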

func (*App) FindInstancesWithOptions

func (a *App) FindInstancesWithOptions(ctx context.Context, opts storage.ListInstancesOptions) ([]*storage.WorkflowInstance, error)

FindInstancesWithOptions searches for workflow instances with full options. Use this when you need pagination, status filters, or other advanced options in addition to input filters.

func (*App) GetInstance

func (a *App) GetInstance(ctx context.Context, instanceID string) (*storage.WorkflowInstance, error)

GetInstance retrieves a workflow instance by ID.

func (*App) Handler

func (a *App) Handler() http.Handler

Handler returns an http.Handler for CloudEvents and health endpoints. This allows integration with existing HTTP routers (gin, echo, chi, etc.).

Example with chi:

r := chi.NewRouter()
r.Mount("/romancy", app.Handler())
http.ListenAndServe(":8080", r)

Example with standard http.ServeMux:

mux := http.NewServeMux()
mux.Handle("/api/workflows/", app.Handler())
http.ListenAndServe(":8080", mux)

func (*App) ListenAndServe

func (a *App) ListenAndServe(addr string) error

ListenAndServe starts the HTTP server for CloudEvents. For integration with existing HTTP routers, use Handler() instead.

func (*App) Shutdown

func (a *App) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the application.

func (*App) Start

func (a *App) Start(ctx context.Context) error

Start initializes the application and starts background tasks.

func (*App) Storage

func (a *App) Storage() storage.Storage

Storage returns the storage instance for advanced usage.

func (*App) WorkerID

func (a *App) WorkerID() string

WorkerID returns the worker ID.

type ChannelMessageTimeoutError

type ChannelMessageTimeoutError struct {
	InstanceID  string
	ChannelName string
}

ChannelMessageTimeoutError indicates that waiting for a channel message timed out.

func (*ChannelMessageTimeoutError) Error

func (e *ChannelMessageTimeoutError) Error() string

type ChannelMode

type ChannelMode = storage.ChannelMode

ChannelMode defines the message delivery mode for a channel subscription.

const (
	// ModeBroadcast delivers messages to all subscribers.
	// Each subscriber receives every message.
	ModeBroadcast ChannelMode = storage.ChannelModeBroadcast

	// ModeCompeting delivers each message to exactly one subscriber.
	// Multiple subscribers compete for messages (work queue pattern).
	ModeCompeting ChannelMode = storage.ChannelModeCompeting

	// ModeDirect receives messages sent via SendTo to this workflow instance.
	// This is syntactic sugar that subscribes to "channel:instanceID" internally.
	// Use with SendTo for point-to-point messaging.
	//
	// Example:
	//   // Receiver workflow
	//   romancy.Subscribe(ctx, "requests", romancy.ModeDirect)
	//   msg, err := romancy.Receive[Request](ctx, "requests")
	//
	//   // Sender workflow
	//   romancy.SendTo(ctx, targetInstanceID, "requests", request)
	ModeDirect ChannelMode = storage.ChannelModeDirect
)

type ChannelModeConflictError added in v0.6.0

type ChannelModeConflictError struct {
	Channel       string
	ExistingMode  string
	RequestedMode string
}

ChannelModeConflictError indicates subscribing with a different mode than the channel's established mode.

func (*ChannelModeConflictError) Error added in v0.6.0

func (e *ChannelModeConflictError) Error() string

type CloudEvent

type CloudEvent struct {
	ID          string          `json:"id"`
	Type        string          `json:"type"`
	Source      string          `json:"source"`
	SpecVersion string          `json:"specversion"`
	Time        *time.Time      `json:"time,omitempty"`
	DataSchema  string          `json:"dataschema,omitempty"`
	Subject     string          `json:"subject,omitempty"`
	Data        json.RawMessage `json:"data,omitempty"`
	Extensions  map[string]any  `json:"-"` // CloudEvents extension attributes
}

CloudEvent represents a CloudEvents v1.0 event.

func (*CloudEvent) UnmarshalJSON

func (e *CloudEvent) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom JSON unmarshaling to capture extension attributes.

type CompensationExecutor

type CompensationExecutor func(ctx context.Context, input []byte) error

CompensationExecutor is a function that executes a compensation. It takes JSON-encoded input and returns an error.

func GetCompensationExecutor

func GetCompensationExecutor(activityName string) (CompensationExecutor, bool)

GetCompensationExecutor returns the compensation executor for an activity.

type EventTimeoutError

type EventTimeoutError struct {
	InstanceID string
	EventType  string
}

EventTimeoutError indicates that waiting for an event timed out.

func (*EventTimeoutError) Error

func (e *EventTimeoutError) Error() string

type ExecuteOption

type ExecuteOption func(*executeOptions)

ExecuteOption configures activity execution.

func WithActivityID

func WithActivityID(id string) ExecuteOption

WithActivityID specifies a manual activity ID. Required for concurrent activity execution to maintain determinism.
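Auto-generated IDs follow the documented {function_name}:{counter} format, so they depend on the order in which activities are called. A minimal sketch of why that matters is shown below; idGen and trace are hypothetical, and the per-name counter starting at 1 is an assumption, since the documentation specifies only the format.

```go
package main

import "fmt"

// idGen hands out IDs of the form name:counter, with one counter per function name.
type idGen struct{ counters map[string]int }

func newIDGen() *idGen { return &idGen{counters: map[string]int{}} }

// next increments the counter for name and returns the resulting ID.
func (g *idGen) next(name string) string {
	g.counters[name]++
	return fmt.Sprintf("%s:%d", name, g.counters[name])
}

// trace records the IDs produced by a fixed sequence of calls.
func trace(calls ...string) []string {
	g := newIDGen()
	ids := make([]string, 0, len(calls))
	for _, c := range calls {
		ids = append(ids, g.next(c))
	}
	return ids
}

func main() {
	fmt.Println(trace("reserve", "charge", "reserve")) // original run
	fmt.Println(trace("reserve", "charge", "reserve")) // replay: identical IDs
}
```

A sequential workflow produces the same call order on every replay, so the IDs line up with the recorded history. When activities run in concurrent goroutines the scheduling order can change between runs, which is why an explicit, stable ID per call is needed in that case.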

type LockAcquisitionError

type LockAcquisitionError struct {
	InstanceID string
	WorkerID   string
	Reason     string
}

LockAcquisitionError indicates failure to acquire a workflow lock.

func (*LockAcquisitionError) Error

func (e *LockAcquisitionError) Error() string

type Option

type Option func(*appConfig)

Option configures an App.

func WithAutoMigrate

func WithAutoMigrate(enabled bool) Option

WithAutoMigrate enables or disables automatic database migrations on startup.

When enabled (default), romancy will automatically apply pending dbmate-compatible migrations from the embedded schema/db/migrations/ directory during App.Start().

This is compatible with the dbmate CLI tool and uses the same schema_migrations table for tracking applied migrations.

Set to false if you prefer to manage migrations manually using dbmate CLI:

dbmate -d schema/db/migrations/sqlite up

func WithBrokerURL

func WithBrokerURL(url string) Option

WithBrokerURL sets the CloudEvents broker URL for outbox event publishing. Example: "http://broker-ingress.knative-eventing.svc.cluster.local/default/default"

func WithChannelCleanupInterval

func WithChannelCleanupInterval(d time.Duration) Option

WithChannelCleanupInterval sets the interval for cleaning up old channel messages.

func WithChannelMessageRetention

func WithChannelMessageRetention(d time.Duration) Option

WithChannelMessageRetention sets how long to keep channel messages before cleanup.

func WithDatabase

func WithDatabase(url string) Option

WithDatabase sets the database connection URL. Supported formats:

  • SQLite: "file:path/to/db.db" or "sqlite://path/to/db.db"
  • PostgreSQL: "postgres://user:pass@host:port/dbname"

func WithEventTimeoutInterval

func WithEventTimeoutInterval(d time.Duration) Option

WithEventTimeoutInterval sets the interval for checking event timeouts.

func WithHooks

func WithHooks(h hooks.WorkflowHooks) Option

WithHooks sets the workflow lifecycle hooks.

func WithLeaderHeartbeatInterval added in v0.3.0

func WithLeaderHeartbeatInterval(d time.Duration) Option

WithLeaderHeartbeatInterval sets the interval for leader election heartbeat. The leader will renew its lease at this interval. Default: 15 seconds.

func WithLeaderLeaseDuration added in v0.3.0

func WithLeaderLeaseDuration(d time.Duration) Option

WithLeaderLeaseDuration sets the duration for which a leader holds its lease. Should be at least 3x the heartbeat interval to allow for missed heartbeats. Default: 45 seconds.

func WithListenNotify

func WithListenNotify(enabled *bool) Option

WithListenNotify configures PostgreSQL LISTEN/NOTIFY usage:

  • nil (default): auto-detect based on database URL (enabled for PostgreSQL)
  • true: force enable (fails if not PostgreSQL)
  • false: force disable (use polling only)

func WithMaxConcurrentMessages

func WithMaxConcurrentMessages(n int) Option

WithMaxConcurrentMessages sets the maximum number of concurrent message handlers. Default: 10.

func WithMaxConcurrentResumptions

func WithMaxConcurrentResumptions(n int) Option

WithMaxConcurrentResumptions sets the maximum number of concurrent workflow resumptions. This limits goroutine spawning in background tasks to prevent resource exhaustion. Default: 10.

func WithMaxConcurrentTimers

func WithMaxConcurrentTimers(n int) Option

WithMaxConcurrentTimers sets the maximum number of concurrent timer handlers. Default: 10.

func WithMaxMessagesPerBatch

func WithMaxMessagesPerBatch(n int) Option

WithMaxMessagesPerBatch sets the maximum number of channel messages to process per polling cycle. Default: 100.

func WithMaxTimersPerBatch

func WithMaxTimersPerBatch(n int) Option

WithMaxTimersPerBatch sets the maximum number of timers to process per polling cycle. Default: 100.

func WithMaxWorkflowsPerBatch

func WithMaxWorkflowsPerBatch(n int) Option

WithMaxWorkflowsPerBatch sets the maximum number of workflows to process per polling cycle. Default: 100.

func WithMessageCheckInterval

func WithMessageCheckInterval(d time.Duration) Option

WithMessageCheckInterval sets the interval for checking channel message subscriptions.

func WithMigrationsFS added in v0.3.0

func WithMigrationsFS(migrationsFS fs.FS) Option

WithMigrationsFS sets a custom filesystem for database migrations.

By default, romancy uses embedded migrations from schema/db/migrations/. Use this option to provide custom migrations from a different source.

The filesystem should contain subdirectories for each database type:

  • sqlite/
  • postgresql/
  • mysql/

Each subdirectory should contain .sql files in dbmate format with -- migrate:up and -- migrate:down sections.

func WithNotifyReconnectDelay

func WithNotifyReconnectDelay(d time.Duration) Option

WithNotifyReconnectDelay sets the delay before reconnecting after a LISTEN/NOTIFY connection failure. Default: 60 seconds.

func WithOutbox

func WithOutbox(enabled bool) Option

WithOutbox enables the transactional outbox pattern.

func WithOutboxBatchSize

func WithOutboxBatchSize(size int) Option

WithOutboxBatchSize sets the batch size for outbox processing.

func WithOutboxInterval

func WithOutboxInterval(d time.Duration) Option

WithOutboxInterval sets the interval for the outbox relayer.

func WithRecurCheckInterval

func WithRecurCheckInterval(d time.Duration) Option

WithRecurCheckInterval sets the interval for checking recurred workflows.

func WithServiceName

func WithServiceName(name string) Option

WithServiceName sets the service name for identification.

func WithShutdownTimeout

func WithShutdownTimeout(d time.Duration) Option

WithShutdownTimeout sets the timeout for graceful shutdown.

func WithStaleLockInterval

func WithStaleLockInterval(d time.Duration) Option

WithStaleLockInterval sets the interval for stale lock cleanup.

func WithStaleLockTimeout

func WithStaleLockTimeout(d time.Duration) Option

WithStaleLockTimeout sets the timeout after which a lock is considered stale.

func WithTimerCheckInterval

func WithTimerCheckInterval(d time.Duration) Option

WithTimerCheckInterval sets the interval for checking expired timers.

func WithWorkerID

func WithWorkerID(id string) Option

WithWorkerID sets a custom worker ID. If not set, a UUID will be generated.

func WithWorkflowResumptionInterval

func WithWorkflowResumptionInterval(d time.Duration) Option

WithWorkflowResumptionInterval sets the interval for the workflow resumption task. This task finds workflows with status='running' that don't have an active lock and resumes them. This is essential for load balancing in multi-worker environments. Default: 1 second (same as Edda).

type PublishOption

type PublishOption func(*publishOptions)

PublishOption configures publish behavior.

func WithMetadata

func WithMetadata(metadata map[string]any) PublishOption

WithMetadata attaches metadata to the published message.

type ReceiveOption

type ReceiveOption func(*receiveOptions)

ReceiveOption configures channel receive behavior.

func WithReceiveTimeout

func WithReceiveTimeout(d time.Duration) ReceiveOption

WithReceiveTimeout sets a timeout for waiting for a message.

type ReceivedEvent

type ReceivedEvent[T any] struct {
	// CloudEvents metadata
	ID          string         `json:"id"`
	Type        string         `json:"type"`
	Source      string         `json:"source"`
	SpecVersion string         `json:"specversion"`
	Time        *time.Time     `json:"time,omitempty"`
	Extensions  map[string]any `json:"extensions,omitempty"`

	// Event data
	Data T `json:"data"`
}

ReceivedEvent represents an event received by a workflow.

func WaitEvent

func WaitEvent[T any](ctx *WorkflowContext, eventType string, opts ...WaitEventOption) (*ReceivedEvent[T], error)

WaitEvent suspends the workflow until an event of the specified type is received. The event data will be deserialized into type T.

Internally, this uses the channel messaging system. The event_type is used as the channel name, and the workflow subscribes in broadcast mode.

During replay, if the event was already received, this returns immediately. Otherwise, the workflow is suspended until an event arrives on the channel.

When the event arrives (published to the channel), the workflow will be resumed.

type ReceivedMessage

type ReceivedMessage[T any] struct {
	// Message ID
	ID int64 `json:"id"`

	// Channel name
	ChannelName string `json:"channel_name"`

	// Message data
	Data T `json:"data"`

	// Metadata (optional)
	Metadata map[string]any `json:"metadata,omitempty"`

	// Sender instance ID (if sent via SendTo)
	SenderInstanceID string `json:"sender_instance_id,omitempty"`

	// When the message was created
	CreatedAt time.Time `json:"created_at"`
}

ReceivedMessage represents a message received from a channel.

func Receive

func Receive[T any](ctx *WorkflowContext, channelName string, opts ...ReceiveOption) (*ReceivedMessage[T], error)

Receive waits for and receives a message from a channel. The workflow must be subscribed to the channel before calling Receive.

This is a blocking operation - the workflow will be suspended until a message is available or the optional timeout expires.

During replay, if the message was already received, this returns immediately.

For channels subscribed with ModeDirect, this automatically receives from the direct channel (channel:instanceID) without needing to specify it.

type RetryExhaustedError

type RetryExhaustedError struct {
	ActivityName string
	Attempts     int
	LastErr      error
}

RetryExhaustedError indicates that all retry attempts have been exhausted.

func (*RetryExhaustedError) Error

func (e *RetryExhaustedError) Error() string

func (*RetryExhaustedError) Unwrap

func (e *RetryExhaustedError) Unwrap() error

type SleepOption

type SleepOption func(*sleepOptions)

SleepOption configures sleep behavior.

func WithSleepID

func WithSleepID(id string) SleepOption

WithSleepID sets a custom timer ID for the sleep. This is useful for identifying timers in logs and for deterministic replay.

type StartOption

type StartOption func(*startOptions)

StartOption configures workflow start options.

func WithInstanceID

func WithInstanceID(id string) StartOption

WithInstanceID specifies a custom instance ID. If not provided, a UUID will be generated.

type SuspendSignal

type SuspendSignal = replay.SuspendSignal

SuspendSignal is returned when a workflow needs to suspend execution. It implements the error interface for compatibility with Go's error handling, but it is NOT an error - it's a control flow signal.
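The idea of an error value used as a control flow signal can be sketched with standard library tools. Everything below is a stand-in: the suspend type, the sleep and runStep helpers, and the "sleep:1" timer ID are hypothetical and only model the replay behavior described for Sleep.

```go
package main

import (
	"errors"
	"fmt"
)

// suspend is a stand-in for SuspendSignal: it satisfies error but means "pause", not "failed".
type suspend struct{ timerID string }

func (s *suspend) Error() string { return "suspended waiting for timer " + s.timerID }

// sleep returns nil when the timer is already recorded as fired (replay), else a suspend signal.
func sleep(fired map[string]bool, timerID string) error {
	if fired[timerID] {
		return nil
	}
	return &suspend{timerID: timerID}
}

// runStep classifies the outcome the way an engine loop might.
func runStep(fired map[string]bool) string {
	err := sleep(fired, "sleep:1")
	var s *suspend
	switch {
	case err == nil:
		return "continued"
	case errors.As(err, &s):
		return "suspended on " + s.timerID
	default:
		return "failed: " + err.Error()
	}
}

func main() {
	fmt.Println(runStep(map[string]bool{}))                // first run: suspend
	fmt.Println(runStep(map[string]bool{"sleep:1": true})) // replay after the timer fired
}
```

The practical consequence for workflow code is to return such errors to the caller rather than logging and swallowing them; if they are wrapped, wrapping with %w keeps them detectable through errors.As.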

type SuspendType

type SuspendType = replay.SuspendType

SuspendType represents the type of workflow suspension.

type TerminalError

type TerminalError struct {
	Err error
}

TerminalError wraps an error to indicate it should not be retried. When an activity returns a TerminalError, the workflow will fail without attempting any retries.

func NewTerminalError

func NewTerminalError(err error) *TerminalError

NewTerminalError creates a new TerminalError wrapping the given error.

func NewTerminalErrorf

func NewTerminalErrorf(format string, args ...any) *TerminalError

NewTerminalErrorf creates a new TerminalError with a formatted message.

func (*TerminalError) Error

func (e *TerminalError) Error() string

func (*TerminalError) Unwrap

func (e *TerminalError) Unwrap() error

type WaitEventOption

type WaitEventOption func(*waitEventOptions)

WaitEventOption configures event waiting behavior.

func WithEventTimeout

func WithEventTimeout(d time.Duration) WaitEventOption

WithEventTimeout sets a timeout for waiting for an event.

type Workflow

type Workflow[I, O any] interface {
	// Name returns the unique name of the workflow.
	Name() string

	// Execute runs the workflow logic.
	Execute(ctx *WorkflowContext, input I) (O, error)
}

Workflow defines the interface for a durable workflow. I is the input type, O is the output type.

func GetWorkflow

func GetWorkflow[I, O any](name string) Workflow[I, O]

GetWorkflow retrieves a registered workflow by name. Returns nil if not found.

type WorkflowCancelledError

type WorkflowCancelledError struct {
	InstanceID string
	Reason     string
}

WorkflowCancelledError indicates that a workflow has been cancelled.

func (*WorkflowCancelledError) Error

func (e *WorkflowCancelledError) Error() string

type WorkflowContext

type WorkflowContext struct {
	// contains filtered or unexported fields
}

WorkflowContext provides context for workflow execution. It manages activity ID generation, replay state, and history caching.

func GetWorkflowContext

func GetWorkflowContext(ctx context.Context) *WorkflowContext

GetWorkflowContext retrieves the WorkflowContext from a context.Context. This is useful in activities to access workflow-level features like Session() for database operations within the same transaction.

Returns nil if the context does not contain a WorkflowContext.

Example:

activity := romancy.DefineActivity("process_order", func(ctx context.Context, input OrderInput) (OrderResult, error) {
    wfCtx := romancy.GetWorkflowContext(ctx)
    if wfCtx == nil {
        return OrderResult{}, fmt.Errorf("workflow context not available")
    }
    session := wfCtx.Session()
    // Use session for database operations...
    return OrderResult{}, nil
})

func NewWorkflowContext

func NewWorkflowContext(
	ctx context.Context,
	instanceID, workerID, workflowName string,
	historyCache map[string]any,
	isReplaying bool,
) *WorkflowContext

NewWorkflowContext creates a new WorkflowContext.

func NewWorkflowContextFromExecution

func NewWorkflowContextFromExecution(execCtx *replay.ExecutionContext) *WorkflowContext

NewWorkflowContextFromExecution creates a WorkflowContext from a replay ExecutionContext.

func (*WorkflowContext) App added in v0.2.0

func (c *WorkflowContext) App() *App

App returns the App reference for this workflow context. Returns nil if not set.

func (*WorkflowContext) Cancel

func (c *WorkflowContext) Cancel()

Cancel cancels the workflow execution.

func (*WorkflowContext) Context

func (c *WorkflowContext) Context() context.Context

Context returns the underlying context.Context.

func (*WorkflowContext) Done

func (c *WorkflowContext) Done() <-chan struct{}

Done returns a channel that's closed when the context is cancelled.

func (*WorkflowContext) Err

func (c *WorkflowContext) Err() error

Err returns any error from the context.

func (*WorkflowContext) GenerateActivityID

func (c *WorkflowContext) GenerateActivityID(functionName string) string

GenerateActivityID generates a unique activity ID for the given function name. Format: {function_name}:{counter}. This is used for deterministic replay: the same sequence of calls will always generate the same IDs.

func (*WorkflowContext) GetCachedResult

func (c *WorkflowContext) GetCachedResult(activityID string) (any, bool)

GetCachedResult retrieves a cached result for the given activity ID. Returns the result and true if found, or nil and false if not cached.

func (*WorkflowContext) GetCachedResultRaw

func (c *WorkflowContext) GetCachedResultRaw(activityID string) ([]byte, bool)

GetCachedResultRaw retrieves a cached result with raw JSON bytes. Returns (rawJSON, true) if found, (nil, false) if not found. Use this to avoid re-serialization when the raw JSON is needed.

func (*WorkflowContext) Hooks

func (c *WorkflowContext) Hooks() hooks.WorkflowHooks

Hooks returns the workflow hooks for observability. Returns nil if hooks are not available (e.g., not using replay engine).

func (*WorkflowContext) InstanceID

func (c *WorkflowContext) InstanceID() string

InstanceID returns the workflow instance ID.

func (*WorkflowContext) IsReplaying

func (c *WorkflowContext) IsReplaying() bool

IsReplaying returns true if the workflow is being replayed.

func (*WorkflowContext) RecordActivityID

func (c *WorkflowContext) RecordActivityID(activityID string)

RecordActivityID records that an activity ID has been used. This is used for tracking the current activity during execution.

func (*WorkflowContext) RecordActivityResult

func (c *WorkflowContext) RecordActivityResult(activityID string, result any, err error) error

RecordActivityResult records an activity result to storage and caches it. This is called after activity execution when NOT replaying.

func (*WorkflowContext) Session

func (c *WorkflowContext) Session() storage.Executor

Session returns the database executor for the current context. When called within a transactional activity, this returns the transaction, allowing you to execute custom SQL queries in the same transaction as the activity execution, history recording, and outbox events.

Returns nil if storage is not available.

Example:

activity := romancy.DefineActivity("process_order", func(ctx context.Context, input OrderInput) (OrderResult, error) {
    // Get the database session (transaction if in transactional activity)
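    wfCtx := romancy.GetWorkflowContext(ctx)
    if wfCtx == nil {
        // GetWorkflowContext returns nil when ctx carries no WorkflowContext
        return OrderResult{}, fmt.Errorf("workflow context not available")
    }
    session := wfCtx.Session()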
    if session != nil {
        // Execute custom SQL in the same transaction
        _, err := session.ExecContext(ctx, "INSERT INTO orders ...", input.OrderID)
        if err != nil {
            return OrderResult{}, err
        }
    }
    return OrderResult{Status: "completed"}, nil
})

func (*WorkflowContext) SetApp added in v0.2.0

func (c *WorkflowContext) SetApp(app *App)

SetApp sets the App reference for this workflow context. This is typically called internally when creating the context.

func (*WorkflowContext) SetCachedResult

func (c *WorkflowContext) SetCachedResult(activityID string, result any)

SetCachedResult stores a result in the history cache. This is typically called after activity execution to cache results locally.

func (*WorkflowContext) Storage

func (c *WorkflowContext) Storage() storage.Storage

Storage returns the storage interface for advanced operations like compensation. Returns nil if not using the replay engine.

func (*WorkflowContext) WithContext

func (c *WorkflowContext) WithContext(ctx context.Context) *WorkflowContext

WithContext returns a copy of the WorkflowContext with a new underlying context. This is useful for passing transaction contexts to storage operations.

func (*WorkflowContext) WorkerID

func (c *WorkflowContext) WorkerID() string

WorkerID returns the worker ID that is executing this workflow.

func (*WorkflowContext) WorkflowName

func (c *WorkflowContext) WorkflowName() string

WorkflowName returns the workflow name.

type WorkflowFunc

type WorkflowFunc[I, O any] struct {
	// contains filtered or unexported fields
}

WorkflowFunc is a convenience type for workflows defined as functions.

func DefineWorkflow

func DefineWorkflow[I, O any](name string, fn func(ctx *WorkflowContext, input I) (O, error)) *WorkflowFunc[I, O]

DefineWorkflow creates a new workflow from a function.

func (*WorkflowFunc[I, O]) Execute

func (w *WorkflowFunc[I, O]) Execute(ctx *WorkflowContext, input I) (O, error)

Execute runs the workflow function.

func (*WorkflowFunc[I, O]) Name

func (w *WorkflowFunc[I, O]) Name() string

Name returns the workflow name.

type WorkflowNotFoundError

type WorkflowNotFoundError struct {
	InstanceID string
}

WorkflowNotFoundError indicates that a workflow instance was not found.

func (*WorkflowNotFoundError) Error

func (e *WorkflowNotFoundError) Error() string

type WorkflowOption

type WorkflowOption func(*workflowOptions)

WorkflowOption configures workflow registration.

func WithEventHandler

func WithEventHandler(enabled bool) WorkflowOption

WithEventHandler marks the workflow as an event handler. When true, the workflow will be automatically started when a CloudEvent with a matching type is received.

type WorkflowResult

type WorkflowResult[O any] struct {
	InstanceID string
	Status     string
	Output     O
	Error      error
}

WorkflowResult represents the result of a workflow execution.

func GetWorkflowResult

func GetWorkflowResult[O any](ctx context.Context, app *App, instanceID string) (*WorkflowResult[O], error)

GetWorkflowResult retrieves the result of a workflow instance.

Directories

Path Synopsis
cmd
crosstest command
Package main provides a CLI tool for cross-language channel testing between Romancy (Go) and Edda (Python).
romancy command
Package main provides a CLI tool for interacting with Romancy workflows.
Package compensation provides saga compensation execution.
examples
booking command
Package main demonstrates human-in-the-loop workflow patterns.
channel command
Package main demonstrates channel-based message passing between workflows.
demo command
Package main demonstrates a simple order processing workflow using Romancy.
event_handler command
Package main demonstrates event-driven workflow patterns.
llm command
Package main demonstrates LLM integration with Romancy workflows.
mcp command
Package main demonstrates MCP (Model Context Protocol) integration with Romancy.
retry command
Package main demonstrates retry policies in activities.
saga command
Package main demonstrates the Saga pattern with compensation.
simple command
Package main demonstrates a simple workflow without events.
timer command
Package main demonstrates sleep/timer functionality in workflows.
Package hooks provides lifecycle hooks for workflow observability.
otel
Package otel provides OpenTelemetry integration for Romancy workflow hooks.
internal
migrations
Package migrations provides automatic dbmate-compatible migration support.
notify
Package notify provides PostgreSQL LISTEN/NOTIFY functionality for real-time notifications.
storage
Package storage provides the storage layer for Romancy.
Package llm provides durable LLM integration for Romancy workflows.
Package mcp provides Model Context Protocol (MCP) integration for Romancy.
Package outbox provides transactional outbox pattern for reliable event delivery.
Package retry provides retry policies for activities.
