rea

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package framework provides a high-level abstraction layer for the Restate Go SDK, enforcing all best practices ("dos and don'ts") and eliminating boilerplate when writing Restate services.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ClearAll

func ClearAll(ctx interface{}) error

ClearAll removes all state. Only permitted from exclusive contexts.

func ConfigureSecureServer

func ConfigureSecureServer(cfg SecurityConfig)

ConfigureSecureServer logs the security configuration (server options are handled by the SDK directly).

func FanOutFail

func FanOutFail[T any](
	ctx restate.Context,
	operations []func() (T, error),
) ([]T, error)

FanOutFail executes operations concurrently and fails fast on first error

func ForEach

func ForEach[T any](
	ctx restate.WorkflowContext,
	items []T,
	body func(item T, index int) error,
) error

ForEach executes body for each item in the slice (standalone generic function)

func Gather

func Gather(ctx restate.Context, futures ...restate.Future) ([]any, error)

Gather waits for all futures to complete and returns their results. It returns a slice of interface{}; the caller must type-assert each element.

func GetInternalSignal

func GetInternalSignal[T any](ctx restate.WorkflowSharedContext, name string) restate.DurablePromise[T]

GetInternalSignal obtains a durable promise for intra-workflow coordination.

func GuardRunContext

func GuardRunContext(rc restate.RunContext)

GuardRunContext ensures restate.Run is only called from appropriate contexts.

func HandleGuardrailViolation

func HandleGuardrailViolation(
	violation GuardrailViolation,
	logger *slog.Logger,
	policyOverride FrameworkPolicy,
) error

HandleGuardrailViolation processes a guardrail violation according to policy. Returns an error if the violation should fail execution (strict mode). Logs a warning if the violation should be noted but execution continues (warn mode). Does nothing if validation is disabled.

If policyOverride is empty, uses the global framework policy.

func MapConcurrent

func MapConcurrent[I, O any](
	ctx restate.Context,
	items []I,
	mapper func(I) (O, error),
) ([]O, error)

MapConcurrent applies a function to each item concurrently

func NewTerminalError

func NewTerminalError(err error) error

NewTerminalError creates a terminal error for permanent business failures.

func ParallelInvoke

func ParallelInvoke[T any](
	ctx restate.Context,
	clients []ServiceClient[any, T],
	inputs []any,
) ([]T, error)

ParallelInvoke invokes multiple service calls concurrently

func ParseSigningKey

func ParseSigningKey(keyBase64 string) (ed25519.PublicKey, error)

ParseSigningKey parses a base64-encoded Ed25519 public key

func ParseSigningKeys

func ParseSigningKeys(keysBase64 []string) ([]ed25519.PublicKey, error)

ParseSigningKeys parses multiple base64-encoded Ed25519 public keys

func ProcessBatch

func ProcessBatch[I, O any](
	ctx restate.Context,
	items []I,
	processor func(I) (O, error),
	maxConcurrency int,
) ([]O, error)

ProcessBatch processes items in batches with controlled concurrency (helper function)

func RaceAwakeableWithTimeout

func RaceAwakeableWithTimeout[T any](
	ctx restate.WorkflowContext,
	awakeable restate.AwakeableFuture[T],
	timeout time.Duration,
	timeoutValue T,
) (T, bool, error)

RaceAwakeableWithTimeout races an awakeable against a timeout (standalone generic function)

func RandChoice

func RandChoice[T any](ctx restate.Context, items []T) (T, error)

RandChoice picks a deterministic random item from a slice

func RejectExternalSignal

func RejectExternalSignal(ctx restate.Context, id string, reason error)

RejectExternalSignal fails an awakeable from an external system.

func ResolveExternalSignal

func ResolveExternalSignal[T any](ctx restate.Context, id string, value T)

ResolveExternalSignal completes an awakeable from an external callback handler.

func RunAsync

func RunAsync[T any](
	ctx restate.Context,
	operation func(restate.RunContext) (T, error),
	opts ...restate.RunOption,
) restate.Future

RunAsync executes a side effect asynchronously and returns a future

func RunAsyncWithRetry

func RunAsyncWithRetry[T any](
	ctx restate.Context,
	cfg RunConfig,
	operation func(restate.RunContext) (T, error),
) restate.Future

RunAsyncWithRetry combines RunAsync with retry logic

func RunDo

func RunDo[T any](
	ctx restate.Context,
	operation func(restate.RunContext) (T, error),
	opts ...restate.RunOption,
) (T, error)

RunDo executes a function within restate.Run, ensuring the function receives only RunContext to prevent accidentally capturing outer context.

Use this for side effects that need to return a value. The function you provide should ONLY use the RunContext parameter (rc), never the outer ctx.

Example:

user, err := RunDo(ctx, func(rc restate.RunContext) (User, error) {
    // Do NOT use ctx here - only use rc if needed
    return fetchUserFromDB(userID) // side effect
}, restate.WithName("fetch-user"))

func RunDoVoid

func RunDoVoid(
	ctx restate.Context,
	operation func(restate.RunContext) error,
	opts ...restate.RunOption,
) error

RunDoVoid executes a void function within restate.Run.

Use this for side effects that don't return a meaningful value. The function should ONLY use the RunContext parameter, never the outer ctx.

Example:

err := RunDoVoid(ctx, func(rc restate.RunContext) error {
    // Do NOT use ctx here
    return sendEmail(userEmail, subject, body)
}, restate.WithName("send-email"))

func RunWithRetry

func RunWithRetry[T any](
	ctx restate.Context,
	cfg RunConfig,
	operation func(restate.RunContext) (T, error),
) (T, error)

RunWithRetry executes a side effect with automatic retry on transient errors

func SecureHandlerFunc

func SecureHandlerFunc(validator *SecurityValidator, handler http.HandlerFunc) http.HandlerFunc

SecureHandlerFunc is a convenience wrapper for securing a single http.HandlerFunc

Usage:

http.HandleFunc("/restate", SecureHandlerFunc(validator, myHandler))

func SecureServer

func SecureServer(validator *SecurityValidator, mux *http.ServeMux) http.Handler

SecureServer wraps an entire http.ServeMux with security middleware

Usage:

mux := http.NewServeMux()
mux.HandleFunc("/restate", restateHandler)
mux.HandleFunc("/health", healthHandler)

securedMux := SecureServer(validator, mux)
http.ListenAndServe(":8080", securedMux)

func SecurityMiddleware

func SecurityMiddleware(validator *SecurityValidator) func(http.Handler) http.Handler

SecurityMiddleware creates an HTTP middleware that enforces SecurityConfig

This middleware validates incoming requests according to the SecurityConfig:

  • Signature verification (if EnableRequestValidation is true)
  • HTTPS enforcement (if RequireHTTPS is true)
  • Origin validation (if AllowedOrigins is configured)

Behavior per ValidationMode:

  • SecurityModeStrict: Reject invalid requests with HTTP 401/403
  • SecurityModePermissive: Log warnings but allow requests through
  • SecurityModeDisabled: Skip all validation

Usage:

config := DefaultSecurityConfig()
config.SigningKeys = []ed25519.PublicKey{publicKey}
validator := NewSecurityValidator(config, slog.Default())

http.Handle("/restate", SecurityMiddleware(validator)(restateHandler))

func SetFrameworkPolicy

func SetFrameworkPolicy(policy FrameworkPolicy)

SetFrameworkPolicy sets the global framework policy. This should be called early in application initialization.

func UpdateStatus

func UpdateStatus(ctx restate.WorkflowContext, statusKey string, update StatusData) error

UpdateStatus updates the workflow status (must be called from the exclusive run handler).

func ValidateIdempotencyKey

func ValidateIdempotencyKey(key string) error

ValidateIdempotencyKey checks if an idempotency key appears to be deterministic. Returns an error if the key contains patterns that suggest non-deterministic generation.

func ValidateServiceDefinition

func ValidateServiceDefinition(svc any) error

ValidateServiceDefinition checks that a service struct follows Restate rules.

func ValidateServiceEndpoint

func ValidateServiceEndpoint(endpoint string, requireHTTPS bool) error

ValidateServiceEndpoint checks if a service endpoint meets security requirements

func WaitForExternalSignal

func WaitForExternalSignal[T any](ctx restate.Context) restate.AwakeableFuture[T]

WaitForExternalSignal creates a durable awakeable for external coordination.

func WrapTerminalError

func WrapTerminalError(err error, statusCode int) error

WrapTerminalError wraps an error with custom terminal status code.

Types

type BatchProcessor

type BatchProcessor struct {
	// contains filtered or unexported fields
}

BatchProcessor handles batch processing with concurrency control

func NewBatchProcessor

func NewBatchProcessor(ctx restate.Context, maxConcurrency int) *BatchProcessor

NewBatchProcessor creates a batch processor with concurrency limit

type CallOption

type CallOption struct {
	IdempotencyKey string
	Delay          time.Duration
	ValidationMode IdempotencyValidationMode // Controls validation behavior (warn/fail/disabled)
}

CallOption configures inter-service calls

type CompensationStrategy

type CompensationStrategy int

CompensationStrategy defines how to handle partial compensation

const (
	// CompensateAll runs all compensations (default, all-or-nothing)
	CompensateAll CompensationStrategy = iota

	// CompensateCompleted only compensates successfully completed steps
	CompensateCompleted

	// CompensateBestEffort tries all compensations, continues on errors
	CompensateBestEffort

	// CompensateUntilSuccess stops after first successful compensation
	CompensateUntilSuccess
)

type ControlPlaneService

type ControlPlaneService struct {
	// contains filtered or unexported fields
}

ControlPlaneService provides orchestration capabilities with built-in saga management.

func NewControlPlaneService

func NewControlPlaneService(ctx restate.WorkflowContext, name string, idempotencyPrefix string) *ControlPlaneService

NewControlPlaneService creates an orchestrator with automatic saga setup.

func (*ControlPlaneService) AddCompensationStep

func (cp *ControlPlaneService) AddCompensationStep(name string, payload any, dedupe bool) error

AddCompensationStep persists a compensation step before executing main action.

func (*ControlPlaneService) AwaitHumanApproval

func (cp *ControlPlaneService) AwaitHumanApproval(
	ctx restate.Context,
	approvalID string,
	timeout time.Duration,
) (approved bool, err error)

AwaitHumanApproval coordinates human-in-the-loop with durable timeout.

func (*ControlPlaneService) GenerateIdempotencyKey

func (cp *ControlPlaneService) GenerateIdempotencyKey(ctx restate.Context, suffix string) string

GenerateIdempotencyKey creates a deterministic idempotency key for external calls. Uses restate.UUID to ensure deterministic generation across retries. IMPORTANT: Must be called from within a durable handler context.

func (*ControlPlaneService) GenerateIdempotencyKeyDeterministic

func (cp *ControlPlaneService) GenerateIdempotencyKeyDeterministic(businessKeys ...string) string

GenerateIdempotencyKeyDeterministic creates an idempotency key using only deterministic inputs. Use this when you need a predictable key based on business data (e.g., user ID + order ID).

func (*ControlPlaneService) Orchestrate

func (cp *ControlPlaneService) Orchestrate(fn func() error) (err error)

Orchestrate executes a function within a saga context with automatic compensation.

func (*ControlPlaneService) RegisterCompensation

func (cp *ControlPlaneService) RegisterCompensation(name string, fn SagaCompensationFunc)

RegisterCompensation adds a compensation handler to the saga.

type DataPlaneService

type DataPlaneService[I, O any] struct {
	Name    string
	Handler func(restate.RunContext, I) (O, error)
}

DataPlaneService encapsulates pure business logic with automatic durability.

func NewStatelessService

func NewStatelessService[I, O any](
	name string,
	operation func(restate.RunContext, I) (O, error),
) *DataPlaneService[I, O]

NewStatelessService creates a durable data plane service.

func (*DataPlaneService[I, O]) Execute

func (dp *DataPlaneService[I, O]) Execute(ctx restate.Context, input I) (O, error)

Execute runs the data plane operation within a durable context.

type DeterministicHelpers

type DeterministicHelpers struct {
	// contains filtered or unexported fields
}

DeterministicHelpers provides deterministic operations

func NewDeterministicHelpers

func NewDeterministicHelpers(ctx restate.Context) *DeterministicHelpers

NewDeterministicHelpers creates helpers for deterministic operations

func (*DeterministicHelpers) RandFloat

func (h *DeterministicHelpers) RandFloat() float64

RandFloat generates a deterministic random float64 between 0.0 and 1.0

func (*DeterministicHelpers) RandInt

func (h *DeterministicHelpers) RandInt(min, max int) int

RandInt generates a deterministic random integer

func (*DeterministicHelpers) UUID

func (h *DeterministicHelpers) UUID() string

UUID generates a deterministic UUID (same on replay)

type FanOutResult

type FanOutResult[T any] struct {
	Results []T
	Errors  []error
	Failed  int
	Success int
}

FanOutResult represents the result of a fan-out operation

func FanOut

func FanOut[T any](
	ctx restate.Context,
	operations []func() (T, error),
) FanOutResult[T]

FanOut executes multiple operations concurrently and collects all results. It continues execution even if some operations fail.

type FrameworkPolicy

type FrameworkPolicy string

FrameworkPolicy controls the strictness of all framework runtime checks

const (
	// PolicyStrict fails fast on any guardrail violation
	// Recommended for: CI pipelines, production environments
	PolicyStrict FrameworkPolicy = "strict"

	// PolicyWarn logs warnings but allows execution to continue
	// Recommended for: Local development, staging environments
	PolicyWarn FrameworkPolicy = "warn"

	// PolicyDisabled skips most validation checks
	// NOT RECOMMENDED: Only for testing/debugging specific issues
	PolicyDisabled FrameworkPolicy = "disabled"
)

func GetFrameworkPolicy

func GetFrameworkPolicy() FrameworkPolicy

GetFrameworkPolicy returns the current global framework policy

type GuardrailViolation

type GuardrailViolation struct {
	Check    string // Name of the check (e.g., "idempotency_key_validation")
	Message  string // Human-readable error message
	Severity string // "error", "warning", "info"
}

GuardrailViolation represents a framework guardrail violation

type IdempotencyValidationMode

type IdempotencyValidationMode string

IdempotencyValidationMode controls how idempotency key validation failures are handled

const (
	// IdempotencyValidationWarn logs validation errors but allows the call to proceed (default, permissive)
	IdempotencyValidationWarn IdempotencyValidationMode = "warn"

	// IdempotencyValidationFail rejects calls with invalid idempotency keys (strict)
	IdempotencyValidationFail IdempotencyValidationMode = "fail"

	// IdempotencyValidationDisabled skips validation entirely
	IdempotencyValidationDisabled IdempotencyValidationMode = "disabled"
)

type IngressCallOption

type IngressCallOption struct {
	IdempotencyKey string
	Delay          time.Duration
	Headers        map[string]string
	RequestID      string
	ValidationMode IdempotencyValidationMode // Control validation behavior
}

IngressCallOption configures ingress client calls

type IngressClient

type IngressClient struct {
	// contains filtered or unexported fields
}

IngressClient wraps the Restate ingress client for external invocations

func NewIngressClient

func NewIngressClient(baseURL string, authKey string) *IngressClient

NewIngressClient creates an ingress client for external (non-Restate) applications

type IngressObjectClient

type IngressObjectClient[I, O any] struct {
	// contains filtered or unexported fields
}

IngressObjectClient provides type-safe external calls to Virtual Objects

func IngressObject

func IngressObject[I, O any](ic *IngressClient, serviceName, handlerName string) *IngressObjectClient[I, O]

IngressObject creates a type-safe ingress client for calling a Virtual Object

func (IngressObjectClient[I, O]) AttachByIdempotencyKey

func (c IngressObjectClient[I, O]) AttachByIdempotencyKey(
	ctx context.Context,
	key string,
	idempotencyKey string,
) (O, error)

AttachByIdempotencyKey attaches to an existing object invocation using its idempotency key

func (IngressObjectClient[I, O]) Call

func (c IngressObjectClient[I, O]) Call(
	ctx context.Context,
	key string,
	input I,
	opts ...IngressCallOption,
) (O, error)

Call invokes a Virtual Object handler synchronously

func (IngressObjectClient[I, O]) Send

func (c IngressObjectClient[I, O]) Send(
	ctx context.Context,
	key string,
	input I,
	opts ...IngressCallOption,
) (string, error)

Send invokes a Virtual Object handler asynchronously

type IngressServiceClient

type IngressServiceClient[I, O any] struct {
	// contains filtered or unexported fields
}

IngressServiceClient provides type-safe external calls to stateless services

func IngressService

func IngressService[I, O any](ic *IngressClient, serviceName, handlerName string) *IngressServiceClient[I, O]

IngressService creates a type-safe ingress client for calling a stateless service

func (IngressServiceClient[I, O]) AttachByIdempotencyKey

func (c IngressServiceClient[I, O]) AttachByIdempotencyKey(
	ctx context.Context,
	idempotencyKey string,
) (O, error)

AttachByIdempotencyKey attaches to an existing service invocation using its idempotency key. This only works if the original invocation was started with an idempotency key.

func (IngressServiceClient[I, O]) Call

func (c IngressServiceClient[I, O]) Call(
	ctx context.Context,
	input I,
	opts ...IngressCallOption,
) (O, error)

Call invokes a service handler synchronously and waits for the result

func (IngressServiceClient[I, O]) Send

func (c IngressServiceClient[I, O]) Send(
	ctx context.Context,
	input I,
	opts ...IngressCallOption,
) (string, error)

Send invokes a service handler asynchronously (fire-and-forget)

type IngressWorkflowClient

type IngressWorkflowClient[I, O any] struct {
	// contains filtered or unexported fields
}

IngressWorkflowClient provides type-safe external calls to Workflows

func IngressWorkflow

func IngressWorkflow[I, O any](ic *IngressClient, serviceName, handlerName string) *IngressWorkflowClient[I, O]

IngressWorkflow creates a type-safe ingress client for calling a Workflow

func (IngressWorkflowClient[I, O]) Attach

func (c IngressWorkflowClient[I, O]) Attach(
	ctx context.Context,
	workflowID string,
) (O, error)

Attach attaches to an existing workflow and waits for result. Works by workflow ID alone (workflows use ID as the key).

func (IngressWorkflowClient[I, O]) GetOutput

func (c IngressWorkflowClient[I, O]) GetOutput(
	ctx context.Context,
	workflowID string,
	outputHandler string,
) (O, error)

GetOutput queries workflow output via shared handler. This is useful when you want to query the workflow result without running the main handler.

func (IngressWorkflowClient[I, O]) Submit

func (c IngressWorkflowClient[I, O]) Submit(
	ctx context.Context,
	workflowID string,
	input I,
	opts ...IngressCallOption,
) (string, error)

Submit starts a new workflow instance

type InstrumentedServiceClient

type InstrumentedServiceClient[I, O any] struct {
	Client  ServiceClient[I, O]
	Metrics *MetricsCollector
	Tracing *TracingContext
	Hooks   *ObservabilityHooks
}

InstrumentedServiceClient wraps ServiceClient with observability

func NewInstrumentedClient

func NewInstrumentedClient[I, O any](
	client ServiceClient[I, O],
	metrics *MetricsCollector,
	tracing *TracingContext,
	hooks *ObservabilityHooks,
) *InstrumentedServiceClient[I, O]

NewInstrumentedClient creates an instrumented service client

func (*InstrumentedServiceClient[I, O]) Call

func (isc *InstrumentedServiceClient[I, O]) Call(
	ctx restate.Context,
	input I,
	opts ...CallOption,
) (O, error)

Call executes with full observability

type LoopBody

type LoopBody func(iteration int) error

LoopBody is the function executed in each iteration

type LoopCondition

type LoopCondition func() (shouldContinue bool, err error)

LoopCondition is a function that determines if loop should continue

type MetricsCollector

type MetricsCollector struct {
	// Counters
	InvocationTotal    map[string]int64
	InvocationErrors   map[string]int64
	CompensationTotal  map[string]int64
	CompensationErrors map[string]int64

	// Gauges
	ActiveInvocations map[string]int64
	StateSize         map[string]int64

	// Histograms (stored as buckets)
	InvocationDuration   map[string][]float64
	CompensationDuration map[string][]float64
	// contains filtered or unexported fields
}

MetricsCollector provides Prometheus-compatible metrics collection

func NewMetricsCollector

func NewMetricsCollector() *MetricsCollector

NewMetricsCollector creates a new metrics collector

func (*MetricsCollector) DecrementActiveInvocations

func (mc *MetricsCollector) DecrementActiveInvocations(serviceName string)

DecrementActiveInvocations decrements active invocation gauge

func (*MetricsCollector) GetMetrics

func (mc *MetricsCollector) GetMetrics() map[string]interface{}

GetMetrics returns a snapshot of all metrics

func (*MetricsCollector) IncrementActiveInvocations

func (mc *MetricsCollector) IncrementActiveInvocations(serviceName string)

IncrementActiveInvocations increments active invocation gauge

func (*MetricsCollector) RecordCompensation

func (mc *MetricsCollector) RecordCompensation(stepName string, duration time.Duration, err error)

RecordCompensation records a saga compensation execution

func (*MetricsCollector) RecordInvocation

func (mc *MetricsCollector) RecordInvocation(serviceName, handlerName string, duration time.Duration, err error)

RecordInvocation records a service invocation

func (*MetricsCollector) RecordStateSize

func (mc *MetricsCollector) RecordStateSize(objectKey string, sizeBytes int64)

RecordStateSize records state size in bytes

type MutableState

type MutableState[T any] struct {
	// contains filtered or unexported fields
}

MutableState provides compile-time safe state access for exclusive contexts

func NewMutableObjectState

func NewMutableObjectState[T any](ctx restate.ObjectContext, key string) *MutableState[T]

NewMutableObjectState creates state for Object exclusive handlers

func NewMutableWorkflowState

func NewMutableWorkflowState[T any](ctx restate.WorkflowContext, key string) *MutableState[T]

NewMutableWorkflowState creates state for Workflow run handlers

func (*MutableState[T]) Clear

func (s *MutableState[T]) Clear()

Clear removes the value

func (*MutableState[T]) Get

func (s *MutableState[T]) Get() (T, error)

Get retrieves the value

func (*MutableState[T]) Set

func (s *MutableState[T]) Set(value T) error

Set updates the value (guaranteed mutable by constructor)

type ObjectClient

type ObjectClient[I, O any] struct {
	ServiceName string
	HandlerName string
}

ObjectClient provides type-safe communication with Virtual Object services

func (ObjectClient[I, O]) Call

func (c ObjectClient[I, O]) Call(
	ctx restate.Context,
	key string,
	input I,
	opts ...CallOption,
) (O, error)

Call invokes a Virtual Object handler with the specified key (request-response)

func (ObjectClient[I, O]) RequestFuture

func (c ObjectClient[I, O]) RequestFuture(
	ctx restate.Context,
	key string,
	input I,
) restate.Future

RequestFuture invokes a Virtual Object handler and returns a future (for concurrent calls). The returned future is intended for use with restate.Wait() or restate.WaitFirst().

func (ObjectClient[I, O]) Send

func (c ObjectClient[I, O]) Send(
	ctx restate.Context,
	key string,
	input I,
	opts ...CallOption,
) restate.Invocation

Send invokes a Virtual Object handler asynchronously (one-way)

type ObservabilityHooks

type ObservabilityHooks struct {
	// Invocation hooks
	OnInvocationStart func(serviceName, handlerName string, input interface{})
	OnInvocationEnd   func(serviceName, handlerName string, output interface{}, err error, duration time.Duration)

	// State hooks
	OnStateGet   func(key string, value interface{})
	OnStateSet   func(key string, value interface{})
	OnStateClear func(key string)

	// Saga hooks
	OnSagaStart         func(sagaName string)
	OnSagaStepAdded     func(sagaName, stepName string)
	OnCompensationStart func(stepName string)
	OnCompensationEnd   func(stepName string, err error, duration time.Duration)

	// Workflow hooks
	OnWorkflowStart func(workflowID string, input interface{})
	OnWorkflowEnd   func(workflowID string, output interface{}, err error)

	// Error hooks
	OnError func(context string, err error)
}

ObservabilityHooks provides hooks for custom observability

func DefaultObservabilityHooks

func DefaultObservabilityHooks(log *slog.Logger) *ObservabilityHooks

DefaultObservabilityHooks returns hooks that log to slog

type OpenTelemetrySpan

type OpenTelemetrySpan struct {
	TraceID    string
	SpanID     string
	ParentID   string
	Name       string
	StartTime  time.Time
	EndTime    time.Time
	Attributes map[string]string
	Status     string
	Error      error
}

OpenTelemetrySpan represents a trace span

type PartialCompensationConfig

type PartialCompensationConfig struct {
	Strategy        CompensationStrategy
	ContinueOnError bool
	MaxRetries      int
}

PartialCompensationConfig configures partial compensation behavior

type PromiseRaceResult

type PromiseRaceResult[T any] struct {
	Value         T
	TimedOut      bool
	PromiseWon    bool
	PromiseResult T
	Error         error
}

PromiseRaceResult indicates which future won the race (promise vs timeout)

func RacePromiseWithTimeout

func RacePromiseWithTimeout[T any](
	ctx restate.WorkflowContext,
	promiseName string,
	timeout time.Duration,
) (PromiseRaceResult[T], error)

RacePromiseWithTimeout races a promise against a timeout (standalone generic function)

type PromiseRacer

type PromiseRacer struct {
	// contains filtered or unexported fields
}

PromiseRacer provides utilities for racing promises against timeouts

func NewPromiseRacer

func NewPromiseRacer(ctx restate.WorkflowContext) *PromiseRacer

NewPromiseRacer creates a promise racing utility

type RaceResult

type RaceResult struct {
	Winner restate.Future
	Index  int
	Error  error
}

RaceResult represents the winner of a race between futures.

func Race

func Race(ctx restate.Context, futures ...restate.Future) (*RaceResult, error)

Race executes multiple futures and returns the first to complete.

type ReadOnlyState

type ReadOnlyState[T any] struct {
	// contains filtered or unexported fields
}

ReadOnlyState provides read-only state access for shared contexts

func NewReadOnlyObjectState

func NewReadOnlyObjectState[T any](ctx restate.ObjectSharedContext, key string) *ReadOnlyState[T]

NewReadOnlyObjectState creates read-only state for Object shared handlers

func NewReadOnlyWorkflowState

func NewReadOnlyWorkflowState[T any](ctx restate.WorkflowSharedContext, key string) *ReadOnlyState[T]

NewReadOnlyWorkflowState creates read-only state for Workflow shared handlers

func (*ReadOnlyState[T]) Get

func (s *ReadOnlyState[T]) Get() (T, error)

Get retrieves the value (no Set method exists!)

type RequestValidationResult

type RequestValidationResult struct {
	Valid         bool
	KeyIndex      int    // Index of the signing key that validated (if Valid=true)
	ErrorMessage  string // Human-readable error if Valid=false
	RequestOrigin string // Origin header from the request
}

RequestValidationResult contains the outcome of request signature validation

type RunConfig

type RunConfig struct {
	MaxRetries    int
	InitialDelay  time.Duration
	MaxDelay      time.Duration
	BackoffFactor float64
	Name          string
}

RunConfig configures retry behavior for Run blocks

func DefaultRunConfig

func DefaultRunConfig(name string) RunConfig

DefaultRunConfig returns sensible defaults for Run blocks

type SafeStep

type SafeStep[T any] struct {
	// contains filtered or unexported fields
}

SafeStep enforces "register compensation BEFORE action" pattern

func (*SafeStep[T]) Execute

func (step *SafeStep[T]) Execute(
	ctx restate.Context,
	action func() (T, error),
) (T, error)

Execute runs the action (compensation must be registered first)

func (*SafeStep[T]) WithCompensation

func (step *SafeStep[T]) WithCompensation(compensation SagaCompensationFunc) *SafeStep[T]

WithCompensation registers the compensation (MUST be called before Execute)

type SagaCompensationFunc

type SagaCompensationFunc func(rc restate.RunContext, payload []byte) error

SagaCompensationFunc executes a compensation action inside restate.Run.

func ValidateCompensationIdempotent

func ValidateCompensationIdempotent(
	name string,
	handler SagaCompensationFunc,
) SagaCompensationFunc

ValidateCompensationIdempotent is a documentation/lint helper for compensation handlers.

While the framework cannot enforce idempotency at runtime, this helper:

  1. Documents the idempotency contract clearly in code
  2. Provides a consistent API for developers
  3. Enables potential static analysis/linting
  4. Adds runtime logging to track compensation execution

Compensation handlers MUST be idempotent because:

  • Restate may replay them during recovery
  • Retries will execute the same compensation multiple times
  • Network failures can cause duplicate attempts

Usage:

saga.Register("charge_payment", ValidateCompensationIdempotent(
    "charge_payment",
    func(rc restate.RunContext, payload []byte) error {
        var data PaymentData
        json.Unmarshal(payload, &data)
        // ✅ Idempotent: refund checks if already refunded
        return refundPaymentIdempotent(data.PaymentID)
    },
))

Idempotency Patterns:

  1. Check-Then-Act: Verify side effect not already done
  2. Use External Idempotency Keys: Payment provider handles deduplication
  3. State-Based Deduplication: Store completion flag in workflow state
  4. Set Absolute Values: Don't increment/decrement, set exact values

Anti-Patterns to Avoid:

❌ Incrementing/decrementing without checks
❌ Creating resources without unique constraints
❌ Time-dependent logic (non-deterministic)
❌ Assuming single execution

See SAGA_GUIDE.MD for detailed patterns and examples.

type SagaConfig

type SagaConfig struct {
	MaxRetries         int
	InitialRetryDelay  time.Duration
	MaxRetryDelay      time.Duration
	FailOnCleanupError bool
	DLQKey             string
}

SagaConfig controls retry behavior and DLQ handling.

func DefaultSagaConfig

func DefaultSagaConfig() SagaConfig

DefaultSagaConfig returns production-ready retry defaults.

type SagaEntry

type SagaEntry struct {
	Name      string    `json:"name"`
	Payload   []byte    `json:"payload"`
	StepID    string    `json:"step_id"`
	Timestamp time.Time `json:"timestamp"`
	Attempt   int       `json:"attempt"`
}

SagaEntry is the persisted record of a compensation step.

type SagaFramework

type SagaFramework struct {
	// contains filtered or unexported fields
}

SagaFramework manages durable compensation for control plane operations.

func NewSaga

func NewSaga(ctx restate.WorkflowContext, name string, cfg *SagaConfig) *SagaFramework

NewSaga creates a saga bound to a workflow context.

func (*SagaFramework) Add

func (s *SagaFramework) Add(name string, payload any, dedupe bool) error

Add persists a compensation step BEFORE executing the main action.

func (*SagaFramework) CompensateIfNeeded

func (s *SagaFramework) CompensateIfNeeded(errPtr *error)

CompensateIfNeeded executes all compensations in reverse order if error occurred.

func (*SagaFramework) NewSafeStep

func (s *SagaFramework) NewSafeStep(name string) *SafeStep[any]

NewSafeStep creates a step that enforces compensation-before-action

func (*SagaFramework) Register

func (s *SagaFramework) Register(name string, fn SagaCompensationFunc)

Register adds a compensation handler. Must be called before Add.

func (*SagaFramework) RollbackWithStrategy

func (s *SagaFramework) RollbackWithStrategy(
	ctx restate.WorkflowContext,
	strategy CompensationStrategy,
) error

RollbackWithStrategy executes compensations with the specified strategy. This is a simplified version that works with the existing saga implementation.

func (*SagaFramework) SetCompensationStrategy

func (s *SagaFramework) SetCompensationStrategy(strategy CompensationStrategy)

SetCompensationStrategy configures how compensations are executed

type SecurityConfig

type SecurityConfig struct {
	// EnableRequestValidation enables cryptographic validation of incoming requests
	EnableRequestValidation bool

	// SigningKeys are the Ed25519 public keys used to verify request signatures
	// These should match the keys configured in your Restate server
	SigningKeys []ed25519.PublicKey

	// RequireHTTPS enforces that services only accept HTTPS connections
	RequireHTTPS bool

	// AllowedOrigins restricts which Restate instances can invoke this service
	// Empty means all origins are allowed
	AllowedOrigins []string

	// RestrictPublicAccess marks services as private (not accessible via HTTP ingress)
	RestrictPublicAccess bool

	// ValidationMode determines how strict the validation is
	ValidationMode SecurityValidationMode
}

SecurityConfig holds security settings for Restate services

func DefaultSecurityConfig

func DefaultSecurityConfig() SecurityConfig

DefaultSecurityConfig returns production-ready security settings

func DevelopmentSecurityConfig

func DevelopmentSecurityConfig() SecurityConfig

DevelopmentSecurityConfig returns permissive settings for local development

type SecurityValidationMode

type SecurityValidationMode string

SecurityValidationMode controls security validation behavior

const (
	// SecurityModePermissive logs warnings but allows requests
	SecurityModePermissive SecurityValidationMode = "permissive"

	// SecurityModeStrict rejects requests that fail validation
	SecurityModeStrict SecurityValidationMode = "strict"

	// SecurityModeDisabled skips validation (NOT RECOMMENDED for production)
	SecurityModeDisabled SecurityValidationMode = "disabled"
)

type SecurityValidator

type SecurityValidator struct {
	// contains filtered or unexported fields
}

SecurityValidator provides request signature validation and security checks

func NewSecurityValidator

func NewSecurityValidator(config SecurityConfig, logger *slog.Logger) *SecurityValidator

NewSecurityValidator creates a validator with the given configuration

func (*SecurityValidator) ValidateRequest

func (sv *SecurityValidator) ValidateRequest(req *http.Request) RequestValidationResult

ValidateRequest validates an incoming HTTP request's signature and origin

type ServiceClient

type ServiceClient[I, O any] struct {
	ServiceName string
	HandlerName string
}

ServiceClient provides type-safe inter-service communication.

func (ServiceClient[I, O]) Call

func (c ServiceClient[I, O]) Call(
	ctx restate.Context,
	input I,
	opts ...CallOption,
) (O, error)

Call executes a request-response interaction.

func (ServiceClient[I, O]) Send

func (c ServiceClient[I, O]) Send(
	ctx restate.Context,
	input I,
	opts ...CallOption,
) restate.Invocation

Send executes a one-way fire-and-forget message. It uses the receiver's type parameters [I, O] because Go methods cannot declare their own type parameters.

type ServiceType

type ServiceType string

ServiceType distinguishes between orchestration and business logic layers

const (
	ServiceTypeControlPlane ServiceType = "control_plane" // Orchestration, sagas, human workflows
	ServiceTypeDataPlane    ServiceType = "data_plane"    // Business logic, external calls, state management
)

type State

type State[T any] struct {
	// contains filtered or unexported fields
}

State provides type-safe access to Restate's key-value store with runtime guards

func NewState

func NewState[T any](ctx interface{}, key string) *State[T]

NewState creates a state accessor. Write operations require exclusive context.

func (*State[T]) Clear

func (s *State[T]) Clear() error

Clear removes state key. Only permitted from exclusive contexts.

func (*State[T]) Get

func (s *State[T]) Get() (T, error)

Get retrieves state value. Safe from any context type.

func (*State[T]) Set

func (s *State[T]) Set(value T) error

Set writes state value. Only permitted from exclusive contexts (enforced).

type StatusData

type StatusData struct {
	Phase          string                 `json:"phase"`
	Progress       float64                `json:"progress"` // 0.0 to 1.0
	CurrentStep    string                 `json:"current_step"`
	CompletedSteps []string               `json:"completed_steps"`
	Metadata       map[string]interface{} `json:"metadata"`
	UpdatedAt      time.Time              `json:"updated_at"`
	IsComplete     bool                   `json:"is_complete"`
	Error          string                 `json:"error,omitempty"`
}

StatusData represents workflow progress information

type Time

type Time struct {
	// contains filtered or unexported fields
}

Time captures the current time deterministically

func NewTime

func NewTime(ctx restate.Context) *Time

NewTime creates a deterministic time helper

func (*Time) Now

func (t *Time) Now() time.Time

Now returns the current time (captured once, deterministic on replay)

func (*Time) Since

func (t *Time) Since(start time.Time) time.Duration

Since returns the duration since a given time (uses deterministic Now)

func (*Time) Until

func (t *Time) Until(target time.Time) time.Duration

Until returns the duration until a given time (uses deterministic Now)

type TracingContext

type TracingContext struct {
	// contains filtered or unexported fields
}

TracingContext provides OpenTelemetry-compatible tracing

func NewTracingContext

func NewTracingContext(ctx restate.Context) *TracingContext

NewTracingContext creates a new tracing context

func (*TracingContext) EndSpan

func (tc *TracingContext) EndSpan(span *OpenTelemetrySpan, err error)

EndSpan ends the current span

func (*TracingContext) GetSpans

func (tc *TracingContext) GetSpans() []*OpenTelemetrySpan

GetSpans returns all recorded spans

func (*TracingContext) StartSpan

func (tc *TracingContext) StartSpan(name string, attributes map[string]string) *OpenTelemetrySpan

StartSpan starts a new span

type Void

type Void = restate.Void

Void is a type alias for operations that return no meaningful data

type WorkflowClient

type WorkflowClient[I, O any] struct {
	ServiceName string
	HandlerName string // Usually "run" for the main workflow handler
}

WorkflowClient provides type-safe communication with Workflow services

func (WorkflowClient[I, O]) Attach

func (c WorkflowClient[I, O]) Attach(
	ctx restate.Context,
	workflowID string,
	opts ...CallOption,
) (O, error)

Attach attaches to an existing workflow instance (request-response)

func (WorkflowClient[I, O]) AttachFuture

func (c WorkflowClient[I, O]) AttachFuture(
	ctx restate.Context,
	workflowID string,
) restate.Future

AttachFuture attaches to a workflow and returns a future for use with restate.Wait() or restate.WaitFirst().

func (WorkflowClient[I, O]) GetOutput

func (c WorkflowClient[I, O]) GetOutput(
	ctx restate.Context,
	workflowID string,
	outputHandler string,
) (O, error)

GetOutput queries a workflow's output via shared handler

func (WorkflowClient[I, O]) Signal

func (c WorkflowClient[I, O]) Signal(
	ctx restate.Context,
	workflowID string,
	signalHandler string,
	input I,
	opts ...CallOption,
) restate.Invocation

Signal sends a signal to a workflow's shared handler

func (WorkflowClient[I, O]) Submit

func (c WorkflowClient[I, O]) Submit(
	ctx restate.Context,
	workflowID string,
	input I,
	opts ...CallOption,
) restate.Invocation

Submit starts a new workflow instance with the given ID (idempotent)

type WorkflowConfig

type WorkflowConfig struct {
	// StateRetentionDays configures how long workflow state is retained
	// Default: 30 days (Restate default)
	// Min: 1 day
	// Max: 90 days (Restate cluster limit)
	// Note: Actual retention depends on Restate cluster configuration
	StateRetentionDays int

	// EnableStatusPersistence enables durable storage of workflow status
	// When true: Status survives restarts, counts against retention
	// When false: Status is ephemeral, cheaper but lost on restart
	// Default: true
	//
	// IMPORTANT: Status persistence adds to state size. For workflows with
	// frequent status updates (e.g., every second), consider:
	// 1. Reducing update frequency
	// 2. Using external status storage
	// 3. Disabling persistence for short-lived workflows
	EnableStatusPersistence bool

	// AutoCleanupOnCompletion automatically purges workflow state after completion
	// When true: State is deleted when workflow completes successfully
	// When false: State retained until retention period expires
	// Default: false (preserve for audit/debugging)
	//
	// Set to true for:
	// - High-volume workflows (millions per day)
	// - Workflows with large state (>1MB)
	// - When audit trail not needed
	AutoCleanupOnCompletion bool

	// MaxStateSizeBytes warns when workflow state approaches this limit
	// Default: 1MB (conservative)
	// Restate limit: Typically 10MB per workflow
	//
	// Large state causes:
	// - Slower workflow execution
	// - Higher memory usage
	// - Longer retention costs
	MaxStateSizeBytes int64

	// CleanupGracePeriod is time to keep state after completion before cleanup
	// Only applies if AutoCleanupOnCompletion is true
	// Default: 24 hours
	// Use cases:
	// - Allow time for result queries
	// - Grace period for auditing
	// - Debugging window
	CleanupGracePeriod time.Duration
}

WorkflowConfig defines configuration for workflow behavior and state retention

CRITICAL: Workflow state is stored in Restate and subject to retention limits. Default Restate retention is typically 24 hours to 90 days depending on deployment.

State Retention Considerations:

  • Workflow state includes: execution history, promises, timers, durable state
  • If retention expires, workflow becomes unrecoverable
  • Long-running workflows (>90 days) need external state archival
  • WorkflowStatus persistence counts against retention

See WORKFLOW_RETENTION_GUIDE.MD for comprehensive guidance.

func DefaultWorkflowConfig

func DefaultWorkflowConfig() WorkflowConfig

DefaultWorkflowConfig returns recommended default configuration

Defaults are conservative and suitable for most use cases:

  • 30 day retention (balance between cost and recovery window)
  • Status persistence enabled (durability over cost)
  • No auto-cleanup (preserve for debugging)
  • 1MB max state warning (conservative limit)
  • 24h cleanup grace period (reasonable audit window)

Adjust based on your requirements. See WorkflowConfig documentation.

func HighVolumeWorkflowConfig

func HighVolumeWorkflowConfig() WorkflowConfig

HighVolumeWorkflowConfig returns configuration for high-volume workflows

High-volume defaults prioritize cost and performance:

  • Short retention (7 days - minimal compliance)
  • Status persistence disabled (reduce state size)
  • Auto-cleanup enabled (reduce storage costs)
  • Smaller state limit (512KB - encourage efficiency)
  • Short grace period (1 hour - quick cleanup)

Use for:

  • Millions of workflows per day
  • Short-lived workflows (<1 hour)
  • No audit requirements
  • Cost-sensitive deployments

func ProductionWorkflowConfig

func ProductionWorkflowConfig() WorkflowConfig

ProductionWorkflowConfig returns configuration optimized for production

Production defaults prioritize:

  • Longer retention (90 days for compliance)
  • Status persistence (critical for monitoring)
  • Auto-cleanup disabled (audit requirements)
  • Larger state limit (10MB - Restate max)
  • Longer grace period (7 days for investigation)

func (WorkflowConfig) ApplyToWorkflow

func (cfg WorkflowConfig) ApplyToWorkflow(workflow *restate.ServiceDefinition) *restate.ServiceDefinition

ApplyToWorkflow is a convenience method to apply config when creating a workflow

Example:

config := DefaultWorkflowConfig()
workflow := config.ApplyToWorkflow(restate.NewWorkflow("MyWorkflow"))

This is equivalent to:

workflow := restate.NewWorkflow("MyWorkflow", config.ToRestateOptions()...)

func (WorkflowConfig) EstimateStorageCost

func (cfg WorkflowConfig) EstimateStorageCost(workflowsPerDay int, avgStateSizeKB int) float64

EstimateStorageCost estimates the total stored state size (in GB, not a monetary cost) over the retention period based on workflow volume

Parameters:

  • workflowsPerDay: Number of workflows started per day
  • avgStateSizeKB: Average workflow state size in KB

Returns:

  • Estimated total state size in GB for the retention period

Example:

config := DefaultWorkflowConfig()
storageGB := config.EstimateStorageCost(10000, 50) // 10k workflows/day, 50KB each
// Result: ~15GB for 30-day retention

Note: Actual costs depend on cloud provider pricing

func (WorkflowConfig) LogConfiguration

func (cfg WorkflowConfig) LogConfiguration(logger *slog.Logger, workflowName string)

LogConfiguration logs the workflow configuration for visibility

Use at application startup to document configuration:

config := ProductionWorkflowConfig()
config.LogConfiguration(slog.Default(), "OrderWorkflow")

func (WorkflowConfig) MonitorStateSize

func (cfg WorkflowConfig) MonitorStateSize(ctx interface{ Log() *slog.Logger }, estimatedSizeBytes int64) error

MonitorStateSize tracks workflow state size and logs warnings

Use this in your workflow handlers to monitor state growth:

func (w *MyWorkflow) Run(ctx restate.WorkflowContext, req Request) (Result, error) {
    cfg := w.GetConfig()

    // Check state size periodically
    if err := cfg.MonitorStateSize(ctx, estimatedStateSize); err != nil {
        ctx.Log().Warn("State size warning", "error", err)
    }

    // ... workflow logic ...
}

Parameters:

  • ctx: Workflow context (for logging)
  • estimatedSizeBytes: Current estimated state size in bytes

Returns error if state exceeds configured maximum

func (WorkflowConfig) ToRestateOptions

func (cfg WorkflowConfig) ToRestateOptions() []restate.ServiceDefinitionOption

ToRestateOptions converts WorkflowConfig to Restate SDK workflow options

This helper bridges our framework configuration to the native Restate SDK. Use when defining workflows with restate.NewWorkflow().

Example:

config := ProductionWorkflowConfig()
workflow := restate.NewWorkflow("MyWorkflow", config.ToRestateOptions()...)

Note: The Restate SDK supports these workflow-specific options:

  • restate.WithWorkflowRetention(duration) - sets state retention period
  • restate.WithIdempotencyRetention(duration) - sets idempotency key retention
  • restate.WithInactivityTimeout(duration) - auto-fails workflows after inactivity

See: https://docs.restate.dev/develop/go/workflows

func (WorkflowConfig) Validate

func (cfg WorkflowConfig) Validate(logger *slog.Logger) error

Validate checks if WorkflowConfig is valid and logs warnings

Validation checks:

  • Retention within Restate limits (1-90 days)
  • State size within Restate limit (10MB)
  • Cleanup grace period reasonable (<retention)

Warnings for common issues:

  • Very short retention (<7 days) - recovery risk
  • Very long retention (>60 days) - cost concern
  • No auto-cleanup + short retention - state orphaning
  • Large state limit (>5MB) - performance impact

func (WorkflowConfig) WithAutoCleanup

func (cfg WorkflowConfig) WithAutoCleanup(enabled bool, gracePeriod time.Duration) WorkflowConfig

WithAutoCleanup creates a new config with auto-cleanup set to the given enabled value and grace period

Example:

config := DefaultWorkflowConfig().WithAutoCleanup(true, 2*time.Hour)

func (WorkflowConfig) WithCustomRetention

func (cfg WorkflowConfig) WithCustomRetention(days int) WorkflowConfig

WithCustomRetention creates a new config with custom retention period

Example:

config := DefaultWorkflowConfig().WithCustomRetention(45)

func (WorkflowConfig) WithMaxStateSize

func (cfg WorkflowConfig) WithMaxStateSize(bytes int64) WorkflowConfig

WithMaxStateSize creates a new config with custom max state size

Example:

config := DefaultWorkflowConfig().WithMaxStateSize(5 * 1024 * 1024) // 5MB

type WorkflowLoop

type WorkflowLoop struct {
	// contains filtered or unexported fields
}

WorkflowLoop provides looping constructs with built-in safety

func NewWorkflowLoop

func NewWorkflowLoop(ctx restate.WorkflowContext, maxIterations int) *WorkflowLoop

NewWorkflowLoop creates a loop controller

func (*WorkflowLoop) Retry

func (wl *WorkflowLoop) Retry(body LoopBody, maxAttempts int, initialDelay time.Duration) error

Retry executes body with exponential backoff retry

func (*WorkflowLoop) While

func (wl *WorkflowLoop) While(condition LoopCondition, body LoopBody) error

While executes body while condition returns true

type WorkflowStatus

type WorkflowStatus struct {
	// contains filtered or unexported fields
}

WorkflowStatus provides utilities for exposing workflow progress via shared handlers

func NewWorkflowStatus

func NewWorkflowStatus(ctx restate.WorkflowSharedContext, statusKey string) *WorkflowStatus

NewWorkflowStatus creates a status tracker for the workflow

func (*WorkflowStatus) GetStatus

func (ws *WorkflowStatus) GetStatus() (StatusData, error)

GetStatus retrieves current workflow status (read-only, safe from shared context)

type WorkflowTimer

type WorkflowTimer struct {
	// contains filtered or unexported fields
}

WorkflowTimer provides durable timer utilities for workflows

func NewWorkflowTimer

func NewWorkflowTimer(ctx restate.WorkflowContext) *WorkflowTimer

NewWorkflowTimer creates timer utilities bound to a workflow context

func (*WorkflowTimer) After

func (wt *WorkflowTimer) After(duration time.Duration) restate.AfterFuture

After creates a durable timer that completes after the specified duration

func (*WorkflowTimer) Sleep

func (wt *WorkflowTimer) Sleep(duration time.Duration) error

Sleep pauses workflow execution for the specified duration (durable)

func (*WorkflowTimer) SleepUntil

func (wt *WorkflowTimer) SleepUntil(targetTime time.Time) error

SleepUntil pauses until a specific time (calculates duration from now)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL