Documentation
¶
Overview ¶
Package framework provides a high-level abstraction layer for the Restate Go SDK, enforcing all best practices ("dos and don'ts") and eliminating boilerplate when writing Restate services.
Index ¶
- func ClearAll(ctx interface{}) error
- func ConfigureSecureServer(cfg SecurityConfig)
- func FanOutFail[T any](ctx restate.Context, operations []func() (T, error)) ([]T, error)
- func ForEach[T any](ctx restate.WorkflowContext, items []T, body func(item T, index int) error) error
- func Gather(ctx restate.Context, futures ...restate.Future) ([]any, error)
- func GetInternalSignal[T any](ctx restate.WorkflowSharedContext, name string) restate.DurablePromise[T]
- func GuardRunContext(rc restate.RunContext)
- func HandleGuardrailViolation(violation GuardrailViolation, logger *slog.Logger, ...) error
- func MapConcurrent[I, O any](ctx restate.Context, items []I, mapper func(I) (O, error)) ([]O, error)
- func NewTerminalError(err error) error
- func ParallelInvoke[T any](ctx restate.Context, clients []ServiceClient[any, T], inputs []any) ([]T, error)
- func ParseSigningKey(keyBase64 string) (ed25519.PublicKey, error)
- func ParseSigningKeys(keysBase64 []string) ([]ed25519.PublicKey, error)
- func ProcessBatch[I, O any](ctx restate.Context, items []I, processor func(I) (O, error), ...) ([]O, error)
- func RaceAwakeableWithTimeout[T any](ctx restate.WorkflowContext, awakeable restate.AwakeableFuture[T], ...) (T, bool, error)
- func RandChoice[T any](ctx restate.Context, items []T) (T, error)
- func RejectExternalSignal(ctx restate.Context, id string, reason error)
- func ResolveExternalSignal[T any](ctx restate.Context, id string, value T)
- func RunAsync[T any](ctx restate.Context, operation func(restate.RunContext) (T, error), ...) restate.Future
- func RunAsyncWithRetry[T any](ctx restate.Context, cfg RunConfig, ...) restate.Future
- func RunDo[T any](ctx restate.Context, operation func(restate.RunContext) (T, error), ...) (T, error)
- func RunDoVoid(ctx restate.Context, operation func(restate.RunContext) error, ...) error
- func RunWithRetry[T any](ctx restate.Context, cfg RunConfig, ...) (T, error)
- func SecureHandlerFunc(validator *SecurityValidator, handler http.HandlerFunc) http.HandlerFunc
- func SecureServer(validator *SecurityValidator, mux *http.ServeMux) http.Handler
- func SecurityMiddleware(validator *SecurityValidator) func(http.Handler) http.Handler
- func SetFrameworkPolicy(policy FrameworkPolicy)
- func UpdateStatus(ctx restate.WorkflowContext, statusKey string, update StatusData) error
- func ValidateIdempotencyKey(key string) error
- func ValidateServiceDefinition(svc any) error
- func ValidateServiceEndpoint(endpoint string, requireHTTPS bool) error
- func WaitForExternalSignal[T any](ctx restate.Context) restate.AwakeableFuture[T]
- func WrapTerminalError(err error, statusCode int) error
- type BatchProcessor
- type CallOption
- type CompensationStrategy
- type ControlPlaneService
- func (cp *ControlPlaneService) AddCompensationStep(name string, payload any, dedupe bool) error
- func (cp *ControlPlaneService) AwaitHumanApproval(ctx restate.Context, approvalID string, timeout time.Duration) (approved bool, err error)
- func (cp *ControlPlaneService) GenerateIdempotencyKey(ctx restate.Context, suffix string) string
- func (cp *ControlPlaneService) GenerateIdempotencyKeyDeterministic(businessKeys ...string) string
- func (cp *ControlPlaneService) Orchestrate(fn func() error) (err error)
- func (cp *ControlPlaneService) RegisterCompensation(name string, fn SagaCompensationFunc)
- type DataPlaneService
- type DeterministicHelpers
- type FanOutResult
- type FrameworkPolicy
- type GuardrailViolation
- type IdempotencyValidationMode
- type IngressCallOption
- type IngressClient
- type IngressObjectClient
- func (c IngressObjectClient[I, O]) AttachByIdempotencyKey(ctx context.Context, key string, idempotencyKey string) (O, error)
- func (c IngressObjectClient[I, O]) Call(ctx context.Context, key string, input I, opts ...IngressCallOption) (O, error)
- func (c IngressObjectClient[I, O]) Send(ctx context.Context, key string, input I, opts ...IngressCallOption) (string, error)
- type IngressServiceClient
- func (c IngressServiceClient[I, O]) AttachByIdempotencyKey(ctx context.Context, idempotencyKey string) (O, error)
- func (c IngressServiceClient[I, O]) Call(ctx context.Context, input I, opts ...IngressCallOption) (O, error)
- func (c IngressServiceClient[I, O]) Send(ctx context.Context, input I, opts ...IngressCallOption) (string, error)
- type IngressWorkflowClient
- func (c IngressWorkflowClient[I, O]) Attach(ctx context.Context, workflowID string) (O, error)
- func (c IngressWorkflowClient[I, O]) GetOutput(ctx context.Context, workflowID string, outputHandler string) (O, error)
- func (c IngressWorkflowClient[I, O]) Submit(ctx context.Context, workflowID string, input I, opts ...IngressCallOption) (string, error)
- type InstrumentedServiceClient
- type LoopBody
- type LoopCondition
- type MetricsCollector
- func (mc *MetricsCollector) DecrementActiveInvocations(serviceName string)
- func (mc *MetricsCollector) GetMetrics() map[string]interface{}
- func (mc *MetricsCollector) IncrementActiveInvocations(serviceName string)
- func (mc *MetricsCollector) RecordCompensation(stepName string, duration time.Duration, err error)
- func (mc *MetricsCollector) RecordInvocation(serviceName, handlerName string, duration time.Duration, err error)
- func (mc *MetricsCollector) RecordStateSize(objectKey string, sizeBytes int64)
- type MutableState
- type ObjectClient
- func (c ObjectClient[I, O]) Call(ctx restate.Context, key string, input I, opts ...CallOption) (O, error)
- func (c ObjectClient[I, O]) RequestFuture(ctx restate.Context, key string, input I) restate.Future
- func (c ObjectClient[I, O]) Send(ctx restate.Context, key string, input I, opts ...CallOption) restate.Invocation
- type ObservabilityHooks
- type OpenTelemetrySpan
- type PartialCompensationConfig
- type PromiseRaceResult
- type PromiseRacer
- type RaceResult
- type ReadOnlyState
- type RequestValidationResult
- type RunConfig
- type SafeStep
- type SagaCompensationFunc
- type SagaConfig
- type SagaEntry
- type SagaFramework
- func (s *SagaFramework) Add(name string, payload any, dedupe bool) error
- func (s *SagaFramework) CompensateIfNeeded(errPtr *error)
- func (s *SagaFramework) NewSafeStep(name string) *SafeStep[any]
- func (s *SagaFramework) Register(name string, fn SagaCompensationFunc)
- func (s *SagaFramework) RollbackWithStrategy(ctx restate.WorkflowContext, strategy CompensationStrategy) error
- func (s *SagaFramework) SetCompensationStrategy(strategy CompensationStrategy)
- type SecurityConfig
- type SecurityValidationMode
- type SecurityValidator
- type ServiceClient
- type ServiceType
- type State
- type StatusData
- type Time
- type TracingContext
- type Void
- type WorkflowClient
- func (c WorkflowClient[I, O]) Attach(ctx restate.Context, workflowID string, opts ...CallOption) (O, error)
- func (c WorkflowClient[I, O]) AttachFuture(ctx restate.Context, workflowID string) restate.Future
- func (c WorkflowClient[I, O]) GetOutput(ctx restate.Context, workflowID string, outputHandler string) (O, error)
- func (c WorkflowClient[I, O]) Signal(ctx restate.Context, workflowID string, signalHandler string, input I, ...) restate.Invocation
- func (c WorkflowClient[I, O]) Submit(ctx restate.Context, workflowID string, input I, opts ...CallOption) restate.Invocation
- type WorkflowConfig
- func (cfg WorkflowConfig) ApplyToWorkflow(workflow *restate.ServiceDefinition) *restate.ServiceDefinition
- func (cfg WorkflowConfig) EstimateStorageCost(workflowsPerDay int, avgStateSizeKB int) float64
- func (cfg WorkflowConfig) LogConfiguration(logger *slog.Logger, workflowName string)
- func (cfg WorkflowConfig) MonitorStateSize(ctx interface{ ... }, estimatedSizeBytes int64) error
- func (cfg WorkflowConfig) ToRestateOptions() []restate.ServiceDefinitionOption
- func (cfg WorkflowConfig) Validate(logger *slog.Logger) error
- func (cfg WorkflowConfig) WithAutoCleanup(enabled bool, gracePeriod time.Duration) WorkflowConfig
- func (cfg WorkflowConfig) WithCustomRetention(days int) WorkflowConfig
- func (cfg WorkflowConfig) WithMaxStateSize(bytes int64) WorkflowConfig
- type WorkflowLoop
- type WorkflowStatus
- type WorkflowTimer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ClearAll ¶
func ClearAll(ctx interface{}) error
ClearAll removes all state. Only permitted from exclusive contexts.
func ConfigureSecureServer ¶
func ConfigureSecureServer(cfg SecurityConfig)
ConfigureSecureServer logs the security configuration (server options are handled by the SDK directly).
func FanOutFail ¶
FanOutFail executes operations concurrently and fails fast on first error
func ForEach ¶
func ForEach[T any]( ctx restate.WorkflowContext, items []T, body func(item T, index int) error, ) error
ForEach executes body for each item in the slice (standalone generic function)
func Gather ¶
Gather waits for all futures to complete and returns their results. The results are returned as a slice of interface{} values; the caller must type-assert each one.
func GetInternalSignal ¶
func GetInternalSignal[T any](ctx restate.WorkflowSharedContext, name string) restate.DurablePromise[T]
GetInternalSignal obtains a durable promise for intra-workflow coordination.
func GuardRunContext ¶
func GuardRunContext(rc restate.RunContext)
GuardRunContext ensures restate.Run is only called from appropriate contexts.
func HandleGuardrailViolation ¶
func HandleGuardrailViolation( violation GuardrailViolation, logger *slog.Logger, policyOverride FrameworkPolicy, ) error
HandleGuardrailViolation processes a guardrail violation according to policy. Returns an error if the violation should fail execution (strict mode). Logs a warning if the violation should be noted but execution continues (warn mode). Does nothing if validation is disabled.
If policyOverride is empty, uses the global framework policy.
func MapConcurrent ¶
func MapConcurrent[I, O any]( ctx restate.Context, items []I, mapper func(I) (O, error), ) ([]O, error)
MapConcurrent applies a function to each item concurrently
func NewTerminalError ¶
NewTerminalError creates a terminal error for permanent business failures.
func ParallelInvoke ¶
func ParallelInvoke[T any]( ctx restate.Context, clients []ServiceClient[any, T], inputs []any, ) ([]T, error)
ParallelInvoke invokes multiple service calls concurrently
func ParseSigningKey ¶
ParseSigningKey parses a base64-encoded Ed25519 public key
func ParseSigningKeys ¶
ParseSigningKeys parses multiple base64-encoded Ed25519 public keys
func ProcessBatch ¶
func ProcessBatch[I, O any]( ctx restate.Context, items []I, processor func(I) (O, error), maxConcurrency int, ) ([]O, error)
ProcessBatch processes items in batches with controlled concurrency (helper function)
func RaceAwakeableWithTimeout ¶
func RaceAwakeableWithTimeout[T any]( ctx restate.WorkflowContext, awakeable restate.AwakeableFuture[T], timeout time.Duration, timeoutValue T, ) (T, bool, error)
RaceAwakeableWithTimeout races an awakeable against a timeout (standalone generic function)
func RandChoice ¶
RandChoice picks a deterministic random item from a slice
func RejectExternalSignal ¶
RejectExternalSignal fails an awakeable from an external system.
func ResolveExternalSignal ¶
ResolveExternalSignal completes an awakeable from an external callback handler.
func RunAsync ¶
func RunAsync[T any]( ctx restate.Context, operation func(restate.RunContext) (T, error), opts ...restate.RunOption, ) restate.Future
RunAsync executes a side effect asynchronously and returns a future
func RunAsyncWithRetry ¶
func RunAsyncWithRetry[T any]( ctx restate.Context, cfg RunConfig, operation func(restate.RunContext) (T, error), ) restate.Future
RunAsyncWithRetry combines RunAsync with retry logic
func RunDo ¶
func RunDo[T any]( ctx restate.Context, operation func(restate.RunContext) (T, error), opts ...restate.RunOption, ) (T, error)
RunDo executes a function within restate.Run, ensuring the function receives only RunContext to prevent accidentally capturing outer context.
Use this for side effects that need to return a value. The function you provide should ONLY use the RunContext parameter (rc), never the outer ctx.
Example:
user, err := RunDo(ctx, func(rc restate.RunContext) (User, error) {
// Do NOT use ctx here - only use rc if needed
return fetchUserFromDB(userID) // side effect
}, restate.WithName("fetch-user"))
func RunDoVoid ¶
func RunDoVoid( ctx restate.Context, operation func(restate.RunContext) error, opts ...restate.RunOption, ) error
RunDoVoid executes a void function within restate.Run.
Use this for side effects that don't return a meaningful value. The function should ONLY use the RunContext parameter, never the outer ctx.
Example:
err := RunDoVoid(ctx, func(rc restate.RunContext) error {
// Do NOT use ctx here
return sendEmail(userEmail, subject, body)
}, restate.WithName("send-email"))
func RunWithRetry ¶
func RunWithRetry[T any]( ctx restate.Context, cfg RunConfig, operation func(restate.RunContext) (T, error), ) (T, error)
RunWithRetry executes a side effect with automatic retry on transient errors
func SecureHandlerFunc ¶
func SecureHandlerFunc(validator *SecurityValidator, handler http.HandlerFunc) http.HandlerFunc
SecureHandlerFunc is a convenience wrapper for securing a single http.HandlerFunc
Usage:
http.HandleFunc("/restate", SecureHandlerFunc(validator, myHandler))
func SecureServer ¶
func SecureServer(validator *SecurityValidator, mux *http.ServeMux) http.Handler
SecureServer wraps an entire http.ServeMux with security middleware
Usage:
mux := http.NewServeMux()
mux.HandleFunc("/restate", restateHandler)
mux.HandleFunc("/health", healthHandler)
securedMux := SecureServer(validator, mux)
http.ListenAndServe(":8080", securedMux)
func SecurityMiddleware ¶
func SecurityMiddleware(validator *SecurityValidator) func(http.Handler) http.Handler
SecurityMiddleware creates an HTTP middleware that enforces SecurityConfig
This middleware validates incoming requests according to the SecurityConfig:
- Signature verification (if EnableRequestValidation is true)
- HTTPS enforcement (if RequireHTTPS is true)
- Origin validation (if AllowedOrigins is configured)
Behavior per ValidationMode:
- SecurityModeStrict: Reject invalid requests with HTTP 401/403
- SecurityModePermissive: Log warnings but allow requests through
- SecurityModeDisabled: Skip all validation
Usage:
config := DefaultSecurityConfig()
config.SigningKeys = []ed25519.PublicKey{publicKey}
validator := NewSecurityValidator(config, slog.Default())
http.Handle("/restate", SecurityMiddleware(validator)(restateHandler))
func SetFrameworkPolicy ¶
func SetFrameworkPolicy(policy FrameworkPolicy)
SetFrameworkPolicy sets the global framework policy. This should be called early in application initialization.
func UpdateStatus ¶
func UpdateStatus(ctx restate.WorkflowContext, statusKey string, update StatusData) error
UpdateStatus updates the workflow status (must be called from the exclusive run handler).
func ValidateIdempotencyKey ¶
ValidateIdempotencyKey checks if an idempotency key appears to be deterministic. Returns an error if the key contains patterns that suggest non-deterministic generation.
func ValidateServiceDefinition ¶
ValidateServiceDefinition checks that a service struct follows Restate rules.
func ValidateServiceEndpoint ¶
ValidateServiceEndpoint checks if a service endpoint meets security requirements
func WaitForExternalSignal ¶
func WaitForExternalSignal[T any](ctx restate.Context) restate.AwakeableFuture[T]
WaitForExternalSignal creates a durable awakeable for external coordination.
func WrapTerminalError ¶
WrapTerminalError wraps an error with custom terminal status code.
Types ¶
type BatchProcessor ¶
type BatchProcessor struct {
// contains filtered or unexported fields
}
BatchProcessor handles batch processing with concurrency control
func NewBatchProcessor ¶
func NewBatchProcessor(ctx restate.Context, maxConcurrency int) *BatchProcessor
NewBatchProcessor creates a batch processor with concurrency limit
type CallOption ¶
type CallOption struct {
IdempotencyKey string
Delay time.Duration
ValidationMode IdempotencyValidationMode // Controls validation behavior (warn/fail/disabled)
}
CallOption configures inter-service calls
type CompensationStrategy ¶
type CompensationStrategy int
CompensationStrategy defines how to handle partial compensation
const (
// CompensateAll runs all compensations (default, all-or-nothing)
CompensateAll CompensationStrategy = iota
// CompensateCompleted only compensates successfully completed steps
CompensateCompleted
// CompensateBestEffort tries all compensations, continues on errors
CompensateBestEffort
// CompensateUntilSuccess stops after first successful compensation
CompensateUntilSuccess
)
type ControlPlaneService ¶
type ControlPlaneService struct {
// contains filtered or unexported fields
}
ControlPlaneService provides orchestration capabilities with built-in saga management.
func NewControlPlaneService ¶
func NewControlPlaneService(ctx restate.WorkflowContext, name string, idempotencyPrefix string) *ControlPlaneService
NewControlPlaneService creates an orchestrator with automatic saga setup.
func (*ControlPlaneService) AddCompensationStep ¶
func (cp *ControlPlaneService) AddCompensationStep(name string, payload any, dedupe bool) error
AddCompensationStep persists a compensation step before executing main action.
func (*ControlPlaneService) AwaitHumanApproval ¶
func (cp *ControlPlaneService) AwaitHumanApproval( ctx restate.Context, approvalID string, timeout time.Duration, ) (approved bool, err error)
AwaitHumanApproval coordinates human-in-the-loop with durable timeout.
func (*ControlPlaneService) GenerateIdempotencyKey ¶
func (cp *ControlPlaneService) GenerateIdempotencyKey(ctx restate.Context, suffix string) string
GenerateIdempotencyKey creates a deterministic idempotency key for external calls. Uses restate.UUID to ensure deterministic generation across retries. IMPORTANT: Must be called from within a durable handler context.
func (*ControlPlaneService) GenerateIdempotencyKeyDeterministic ¶
func (cp *ControlPlaneService) GenerateIdempotencyKeyDeterministic(businessKeys ...string) string
GenerateIdempotencyKeyDeterministic creates an idempotency key using only deterministic inputs. Use this when you need a predictable key based on business data (e.g., user ID + order ID).
func (*ControlPlaneService) Orchestrate ¶
func (cp *ControlPlaneService) Orchestrate(fn func() error) (err error)
Orchestrate executes a function within a saga context with automatic compensation.
func (*ControlPlaneService) RegisterCompensation ¶
func (cp *ControlPlaneService) RegisterCompensation(name string, fn SagaCompensationFunc)
RegisterCompensation adds a compensation handler to the saga.
type DataPlaneService ¶
type DataPlaneService[I, O any] struct {
Name string
Handler func(restate.RunContext, I) (O, error)
}
DataPlaneService encapsulates pure business logic with automatic durability.
func NewStatelessService ¶
func NewStatelessService[I, O any]( name string, operation func(restate.RunContext, I) (O, error), ) *DataPlaneService[I, O]
NewStatelessService creates a durable data plane service.
type DeterministicHelpers ¶
type DeterministicHelpers struct {
// contains filtered or unexported fields
}
DeterministicHelpers provides deterministic operations
func NewDeterministicHelpers ¶
func NewDeterministicHelpers(ctx restate.Context) *DeterministicHelpers
NewDeterministicHelpers creates helpers for deterministic operations
func (*DeterministicHelpers) RandFloat ¶
func (h *DeterministicHelpers) RandFloat() float64
RandFloat generates a deterministic random float64 between 0.0 and 1.0
func (*DeterministicHelpers) RandInt ¶
func (h *DeterministicHelpers) RandInt(min, max int) int
RandInt generates a deterministic random integer
func (*DeterministicHelpers) UUID ¶
func (h *DeterministicHelpers) UUID() string
UUID generates a deterministic UUID (same on replay)
type FanOutResult ¶
FanOutResult represents the result of a fan-out operation
type FrameworkPolicy ¶
type FrameworkPolicy string
FrameworkPolicy controls the strictness of all framework runtime checks
const (
// PolicyStrict fails fast on any guardrail violation
// Recommended for: CI pipelines, production environments
PolicyStrict FrameworkPolicy = "strict"
// PolicyWarn logs warnings but allows execution to continue
// Recommended for: Local development, staging environments
PolicyWarn FrameworkPolicy = "warn"
// PolicyDisabled skips most validation checks
// NOT RECOMMENDED: Only for testing/debugging specific issues
PolicyDisabled FrameworkPolicy = "disabled"
)
func GetFrameworkPolicy ¶
func GetFrameworkPolicy() FrameworkPolicy
GetFrameworkPolicy returns the current global framework policy
type GuardrailViolation ¶
type GuardrailViolation struct {
Check string // Name of the check (e.g., "idempotency_key_validation")
Message string // Human-readable error message
Severity string // "error", "warning", "info"
}
GuardrailViolation represents a framework guardrail violation
type IdempotencyValidationMode ¶
type IdempotencyValidationMode string
IdempotencyValidationMode controls how idempotency key validation failures are handled
const (
// IdempotencyValidationWarn logs validation errors but allows the call to proceed (default, permissive)
IdempotencyValidationWarn IdempotencyValidationMode = "warn"
// IdempotencyValidationFail rejects calls with invalid idempotency keys (strict)
IdempotencyValidationFail IdempotencyValidationMode = "fail"
// IdempotencyValidationDisabled skips validation entirely
IdempotencyValidationDisabled IdempotencyValidationMode = "disabled"
)
type IngressCallOption ¶
type IngressCallOption struct {
IdempotencyKey string
Delay time.Duration
Headers map[string]string
RequestID string
ValidationMode IdempotencyValidationMode // Control validation behavior
}
IngressCallOption configures ingress client calls
type IngressClient ¶
type IngressClient struct {
// contains filtered or unexported fields
}
IngressClient wraps the Restate ingress client for external invocations
func NewIngressClient ¶
func NewIngressClient(baseURL string, authKey string) *IngressClient
NewIngressClient creates an ingress client for external (non-Restate) applications
type IngressObjectClient ¶
type IngressObjectClient[I, O any] struct {
// contains filtered or unexported fields
}
IngressObjectClient provides type-safe external calls to Virtual Objects
func IngressObject ¶
func IngressObject[I, O any](ic *IngressClient, serviceName, handlerName string) *IngressObjectClient[I, O]
IngressObject creates a type-safe ingress client for calling a Virtual Object
func (IngressObjectClient[I, O]) AttachByIdempotencyKey ¶
func (c IngressObjectClient[I, O]) AttachByIdempotencyKey( ctx context.Context, key string, idempotencyKey string, ) (O, error)
AttachByIdempotencyKey attaches to an existing object invocation using its idempotency key
func (IngressObjectClient[I, O]) Call ¶
func (c IngressObjectClient[I, O]) Call( ctx context.Context, key string, input I, opts ...IngressCallOption, ) (O, error)
Call invokes a Virtual Object handler synchronously
func (IngressObjectClient[I, O]) Send ¶
func (c IngressObjectClient[I, O]) Send( ctx context.Context, key string, input I, opts ...IngressCallOption, ) (string, error)
Send invokes a Virtual Object handler asynchronously
type IngressServiceClient ¶
type IngressServiceClient[I, O any] struct {
// contains filtered or unexported fields
}
IngressServiceClient provides type-safe external calls to stateless services
func IngressService ¶
func IngressService[I, O any](ic *IngressClient, serviceName, handlerName string) *IngressServiceClient[I, O]
IngressService creates a type-safe ingress client for calling a stateless service
func (IngressServiceClient[I, O]) AttachByIdempotencyKey ¶
func (c IngressServiceClient[I, O]) AttachByIdempotencyKey( ctx context.Context, idempotencyKey string, ) (O, error)
AttachByIdempotencyKey attaches to an existing service invocation using its idempotency key. This only works if the original invocation was started with an idempotency key.
func (IngressServiceClient[I, O]) Call ¶
func (c IngressServiceClient[I, O]) Call( ctx context.Context, input I, opts ...IngressCallOption, ) (O, error)
Call invokes a service handler synchronously and waits for the result
func (IngressServiceClient[I, O]) Send ¶
func (c IngressServiceClient[I, O]) Send( ctx context.Context, input I, opts ...IngressCallOption, ) (string, error)
Send invokes a service handler asynchronously (fire-and-forget)
type IngressWorkflowClient ¶
type IngressWorkflowClient[I, O any] struct {
// contains filtered or unexported fields
}
IngressWorkflowClient provides type-safe external calls to Workflows
func IngressWorkflow ¶
func IngressWorkflow[I, O any](ic *IngressClient, serviceName, handlerName string) *IngressWorkflowClient[I, O]
IngressWorkflow creates a type-safe ingress client for calling a Workflow
func (IngressWorkflowClient[I, O]) Attach ¶
func (c IngressWorkflowClient[I, O]) Attach( ctx context.Context, workflowID string, ) (O, error)
Attach attaches to an existing workflow and waits for result. Works by workflow ID alone (workflows use ID as the key).
func (IngressWorkflowClient[I, O]) GetOutput ¶
func (c IngressWorkflowClient[I, O]) GetOutput( ctx context.Context, workflowID string, outputHandler string, ) (O, error)
GetOutput queries workflow output via shared handler. This is useful when you want to query the workflow result without running the main handler.
func (IngressWorkflowClient[I, O]) Submit ¶
func (c IngressWorkflowClient[I, O]) Submit( ctx context.Context, workflowID string, input I, opts ...IngressCallOption, ) (string, error)
Submit starts a new workflow instance
type InstrumentedServiceClient ¶
type InstrumentedServiceClient[I, O any] struct {
Client ServiceClient[I, O]
Metrics *MetricsCollector
Tracing *TracingContext
Hooks *ObservabilityHooks
}
InstrumentedServiceClient wraps ServiceClient with observability
func NewInstrumentedClient ¶
func NewInstrumentedClient[I, O any]( client ServiceClient[I, O], metrics *MetricsCollector, tracing *TracingContext, hooks *ObservabilityHooks, ) *InstrumentedServiceClient[I, O]
NewInstrumentedClient creates an instrumented service client
func (*InstrumentedServiceClient[I, O]) Call ¶
func (isc *InstrumentedServiceClient[I, O]) Call( ctx restate.Context, input I, opts ...CallOption, ) (O, error)
Call executes with full observability
type LoopCondition ¶
LoopCondition is a function that determines whether the loop should continue.
type MetricsCollector ¶
type MetricsCollector struct {
// Counters
InvocationTotal map[string]int64
InvocationErrors map[string]int64
CompensationTotal map[string]int64
CompensationErrors map[string]int64
// Gauges
ActiveInvocations map[string]int64
StateSize map[string]int64
// Histograms (stored as buckets)
InvocationDuration map[string][]float64
CompensationDuration map[string][]float64
// contains filtered or unexported fields
}
MetricsCollector provides Prometheus-compatible metrics collection
func NewMetricsCollector ¶
func NewMetricsCollector() *MetricsCollector
NewMetricsCollector creates a new metrics collector
func (*MetricsCollector) DecrementActiveInvocations ¶
func (mc *MetricsCollector) DecrementActiveInvocations(serviceName string)
DecrementActiveInvocations decrements active invocation gauge
func (*MetricsCollector) GetMetrics ¶
func (mc *MetricsCollector) GetMetrics() map[string]interface{}
GetMetrics returns a snapshot of all metrics
func (*MetricsCollector) IncrementActiveInvocations ¶
func (mc *MetricsCollector) IncrementActiveInvocations(serviceName string)
IncrementActiveInvocations increments active invocation gauge
func (*MetricsCollector) RecordCompensation ¶
func (mc *MetricsCollector) RecordCompensation(stepName string, duration time.Duration, err error)
RecordCompensation records a saga compensation execution
func (*MetricsCollector) RecordInvocation ¶
func (mc *MetricsCollector) RecordInvocation(serviceName, handlerName string, duration time.Duration, err error)
RecordInvocation records a service invocation
func (*MetricsCollector) RecordStateSize ¶
func (mc *MetricsCollector) RecordStateSize(objectKey string, sizeBytes int64)
RecordStateSize records state size in bytes
type MutableState ¶
type MutableState[T any] struct {
// contains filtered or unexported fields
}
MutableState provides compile-time safe state access for exclusive contexts
func NewMutableObjectState ¶
func NewMutableObjectState[T any](ctx restate.ObjectContext, key string) *MutableState[T]
NewMutableObjectState creates state for Object exclusive handlers
func NewMutableWorkflowState ¶
func NewMutableWorkflowState[T any](ctx restate.WorkflowContext, key string) *MutableState[T]
NewMutableWorkflowState creates state for Workflow run handlers
func (*MutableState[T]) Set ¶
func (s *MutableState[T]) Set(value T) error
Set updates the value (guaranteed mutable by constructor)
type ObjectClient ¶
ObjectClient provides type-safe communication with Virtual Object services
func (ObjectClient[I, O]) Call ¶
func (c ObjectClient[I, O]) Call( ctx restate.Context, key string, input I, opts ...CallOption, ) (O, error)
Call invokes a Virtual Object handler with the specified key (request-response)
func (ObjectClient[I, O]) RequestFuture ¶
func (c ObjectClient[I, O]) RequestFuture( ctx restate.Context, key string, input I, ) restate.Future
RequestFuture invokes a Virtual Object handler and returns a future (for concurrent calls). The returned future can be used with restate.Wait() or restate.WaitFirst().
func (ObjectClient[I, O]) Send ¶
func (c ObjectClient[I, O]) Send( ctx restate.Context, key string, input I, opts ...CallOption, ) restate.Invocation
Send invokes a Virtual Object handler asynchronously (one-way)
type ObservabilityHooks ¶
type ObservabilityHooks struct {
// Invocation hooks
OnInvocationStart func(serviceName, handlerName string, input interface{})
OnInvocationEnd func(serviceName, handlerName string, output interface{}, err error, duration time.Duration)
// State hooks
OnStateGet func(key string, value interface{})
OnStateSet func(key string, value interface{})
OnStateClear func(key string)
// Saga hooks
OnSagaStart func(sagaName string)
OnSagaStepAdded func(sagaName, stepName string)
OnCompensationStart func(stepName string)
OnCompensationEnd func(stepName string, err error, duration time.Duration)
// Workflow hooks
OnWorkflowStart func(workflowID string, input interface{})
OnWorkflowEnd func(workflowID string, output interface{}, err error)
// Error hooks
OnError func(context string, err error)
}
ObservabilityHooks provides hooks for custom observability
func DefaultObservabilityHooks ¶
func DefaultObservabilityHooks(log *slog.Logger) *ObservabilityHooks
DefaultObservabilityHooks returns hooks that log to slog
type OpenTelemetrySpan ¶
type OpenTelemetrySpan struct {
TraceID string
SpanID string
ParentID string
Name string
StartTime time.Time
EndTime time.Time
Attributes map[string]string
Status string
Error error
}
OpenTelemetrySpan represents a trace span
type PartialCompensationConfig ¶
type PartialCompensationConfig struct {
Strategy CompensationStrategy
ContinueOnError bool
MaxRetries int
}
PartialCompensationConfig configures partial compensation behavior
type PromiseRaceResult ¶
type PromiseRaceResult[T any] struct {
Value T
TimedOut bool
PromiseWon bool
PromiseResult T
Error error
}
PromiseRaceResult indicates which future won the race (promise vs timeout)
func RacePromiseWithTimeout ¶
func RacePromiseWithTimeout[T any]( ctx restate.WorkflowContext, promiseName string, timeout time.Duration, ) (PromiseRaceResult[T], error)
RacePromiseWithTimeout races a promise against a timeout (standalone generic function)
type PromiseRacer ¶
type PromiseRacer struct {
// contains filtered or unexported fields
}
PromiseRacer provides utilities for racing promises against timeouts
func NewPromiseRacer ¶
func NewPromiseRacer(ctx restate.WorkflowContext) *PromiseRacer
NewPromiseRacer creates a promise racing utility
type RaceResult ¶
RaceResult represents the winner of a race between futures.
type ReadOnlyState ¶
type ReadOnlyState[T any] struct { // contains filtered or unexported fields }
ReadOnlyState provides read-only state access for shared contexts
func NewReadOnlyObjectState ¶
func NewReadOnlyObjectState[T any](ctx restate.ObjectSharedContext, key string) *ReadOnlyState[T]
NewReadOnlyObjectState creates read-only state for Object shared handlers
func NewReadOnlyWorkflowState ¶
func NewReadOnlyWorkflowState[T any](ctx restate.WorkflowSharedContext, key string) *ReadOnlyState[T]
NewReadOnlyWorkflowState creates read-only state for Workflow shared handlers
func (*ReadOnlyState[T]) Get ¶
func (s *ReadOnlyState[T]) Get() (T, error)
Get retrieves the value (no Set method exists!)
type RequestValidationResult ¶
type RequestValidationResult struct {
Valid bool
KeyIndex int // Index of the signing key that validated (if Valid=true)
ErrorMessage string // Human-readable error if Valid=false
RequestOrigin string // Origin header from the request
}
RequestValidationResult contains the outcome of request signature validation
type RunConfig ¶
type RunConfig struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
Name string
}
RunConfig configures retry behavior for Run blocks
func DefaultRunConfig ¶
DefaultRunConfig returns sensible defaults for Run blocks
type SafeStep ¶
type SafeStep[T any] struct { // contains filtered or unexported fields }
SafeStep enforces "register compensation BEFORE action" pattern
func (*SafeStep[T]) WithCompensation ¶
func (step *SafeStep[T]) WithCompensation(compensation SagaCompensationFunc) *SafeStep[T]
WithCompensation registers the compensation (MUST be called before Execute)
type SagaCompensationFunc ¶
type SagaCompensationFunc func(rc restate.RunContext, payload []byte) error
SagaCompensationFunc executes a compensation action inside restate.Run.
func ValidateCompensationIdempotent ¶
func ValidateCompensationIdempotent( name string, handler SagaCompensationFunc, ) SagaCompensationFunc
ValidateCompensationIdempotent is a documentation/lint helper for compensation handlers.
While the framework cannot enforce idempotency at runtime, this helper:
- Documents the idempotency contract clearly in code
- Provides a consistent API for developers
- Enables potential static analysis/linting
- Adds runtime logging to track compensation execution
Compensation handlers MUST be idempotent because:
- Restate may replay them during recovery
- Retries will execute the same compensation multiple times
- Network failures can cause duplicate attempts
Usage:
saga.Register("charge_payment", ValidateCompensationIdempotent(
"charge_payment",
func(rc restate.RunContext, payload []byte) error {
var data PaymentData
json.Unmarshal(payload, &data)
// ✅ Idempotent: refund checks if already refunded
return refundPaymentIdempotent(data.PaymentID)
},
))
Idempotency Patterns:
- Check-Then-Act: Verify side effect not already done
- Use External Idempotency Keys: Payment provider handles deduplication
- State-Based Deduplication: Store completion flag in workflow state
- Set Absolute Values: Don't increment/decrement, set exact values
Anti-Patterns to Avoid:
- ❌ Incrementing/decrementing without checks
- ❌ Creating resources without unique constraints
- ❌ Time-dependent logic (non-deterministic)
- ❌ Assuming single execution
See SAGA_GUIDE.MD for detailed patterns and examples.
type SagaConfig ¶
type SagaConfig struct {
MaxRetries int
InitialRetryDelay time.Duration
MaxRetryDelay time.Duration
FailOnCleanupError bool
DLQKey string
}
SagaConfig controls retry behavior and DLQ handling.
func DefaultSagaConfig ¶
func DefaultSagaConfig() SagaConfig
DefaultSagaConfig returns production-ready retry defaults.
type SagaEntry ¶
type SagaEntry struct {
Name string `json:"name"`
Payload []byte `json:"payload"`
StepID string `json:"step_id"`
Timestamp time.Time `json:"timestamp"`
Attempt int `json:"attempt"`
}
SagaEntry is the persisted record of a compensation step.
type SagaFramework ¶
type SagaFramework struct {
// contains filtered or unexported fields
}
SagaFramework manages durable compensation for control plane operations.
func NewSaga ¶
func NewSaga(ctx restate.WorkflowContext, name string, cfg *SagaConfig) *SagaFramework
NewSaga creates a saga bound to a workflow context.
func (*SagaFramework) Add ¶
func (s *SagaFramework) Add(name string, payload any, dedupe bool) error
Add persists a compensation step BEFORE executing the main action.
func (*SagaFramework) CompensateIfNeeded ¶
func (s *SagaFramework) CompensateIfNeeded(errPtr *error)
CompensateIfNeeded executes all compensations in reverse order if an error occurred.
func (*SagaFramework) NewSafeStep ¶
func (s *SagaFramework) NewSafeStep(name string) *SafeStep[any]
NewSafeStep creates a step that enforces compensation-before-action
func (*SagaFramework) Register ¶
func (s *SagaFramework) Register(name string, fn SagaCompensationFunc)
Register adds a compensation handler. Must be called before Add.
func (*SagaFramework) RollbackWithStrategy ¶
func (s *SagaFramework) RollbackWithStrategy( ctx restate.WorkflowContext, strategy CompensationStrategy, ) error
RollbackWithStrategy executes compensations with the specified strategy. This is a simplified version that works with the existing saga implementation.
func (*SagaFramework) SetCompensationStrategy ¶
func (s *SagaFramework) SetCompensationStrategy(strategy CompensationStrategy)
SetCompensationStrategy configures how compensations are executed
type SecurityConfig ¶
type SecurityConfig struct {
// EnableRequestValidation enables cryptographic validation of incoming requests
EnableRequestValidation bool
// SigningKeys are the Ed25519 public keys used to verify request signatures
// These should match the keys configured in your Restate server
SigningKeys []ed25519.PublicKey
// RequireHTTPS enforces that services only accept HTTPS connections
RequireHTTPS bool
// AllowedOrigins restricts which Restate instances can invoke this service
// Empty means all origins are allowed
AllowedOrigins []string
// RestrictPublicAccess marks services as private (not accessible via HTTP ingress)
RestrictPublicAccess bool
// ValidationMode determines how strict the validation is
ValidationMode SecurityValidationMode
}
SecurityConfig holds security settings for Restate services
func DefaultSecurityConfig ¶
func DefaultSecurityConfig() SecurityConfig
DefaultSecurityConfig returns production-ready security settings
func DevelopmentSecurityConfig ¶
func DevelopmentSecurityConfig() SecurityConfig
DevelopmentSecurityConfig returns permissive settings for local development
type SecurityValidationMode ¶
type SecurityValidationMode string
SecurityValidationMode controls security validation behavior
const ( // SecurityModePermissive logs warnings but allows requests SecurityModePermissive SecurityValidationMode = "permissive" // SecurityModeStrict rejects requests that fail validation SecurityModeStrict SecurityValidationMode = "strict" // SecurityModeDisabled skips validation (NOT RECOMMENDED for production) SecurityModeDisabled SecurityValidationMode = "disabled" )
type SecurityValidator ¶
type SecurityValidator struct {
// contains filtered or unexported fields
}
SecurityValidator provides request signature validation and security checks
func NewSecurityValidator ¶
func NewSecurityValidator(config SecurityConfig, logger *slog.Logger) *SecurityValidator
NewSecurityValidator creates a validator with the given configuration
func (*SecurityValidator) ValidateRequest ¶
func (sv *SecurityValidator) ValidateRequest(req *http.Request) RequestValidationResult
ValidateRequest validates an incoming HTTP request's signature and origin
type ServiceClient ¶
ServiceClient provides type-safe inter-service communication.
func (ServiceClient[I, O]) Call ¶
func (c ServiceClient[I, O]) Call( ctx restate.Context, input I, opts ...CallOption, ) (O, error)
Call executes a request-response interaction.
func (ServiceClient[I, O]) Send ¶
func (c ServiceClient[I, O]) Send( ctx restate.Context, input I, opts ...CallOption, ) restate.Invocation
Send executes a one-way fire-and-forget message. Note: Go methods cannot declare their own type parameters, so Send uses the receiver's [I, O].
type ServiceType ¶
type ServiceType string
ServiceType distinguishes between orchestration and business logic layers
const ( ServiceTypeControlPlane ServiceType = "control_plane" // Orchestration, sagas, human workflows ServiceTypeDataPlane ServiceType = "data_plane" // Business logic, external calls, state management )
type State ¶
type State[T any] struct { // contains filtered or unexported fields }
State provides type-safe access to Restate's key-value store with runtime guards
type StatusData ¶
type StatusData struct {
Phase string `json:"phase"`
Progress float64 `json:"progress"` // 0.0 to 1.0
CurrentStep string `json:"current_step"`
CompletedSteps []string `json:"completed_steps"`
Metadata map[string]interface{} `json:"metadata"`
UpdatedAt time.Time `json:"updated_at"`
IsComplete bool `json:"is_complete"`
Error string `json:"error,omitempty"`
}
StatusData represents workflow progress information
type Time ¶
type Time struct {
// contains filtered or unexported fields
}
Time captures the current time deterministically
type TracingContext ¶
type TracingContext struct {
// contains filtered or unexported fields
}
TracingContext provides OpenTelemetry-compatible tracing
func NewTracingContext ¶
func NewTracingContext(ctx restate.Context) *TracingContext
NewTracingContext creates a new tracing context
func (*TracingContext) EndSpan ¶
func (tc *TracingContext) EndSpan(span *OpenTelemetrySpan, err error)
EndSpan ends the current span
func (*TracingContext) GetSpans ¶
func (tc *TracingContext) GetSpans() []*OpenTelemetrySpan
GetSpans returns all recorded spans
func (*TracingContext) StartSpan ¶
func (tc *TracingContext) StartSpan(name string, attributes map[string]string) *OpenTelemetrySpan
StartSpan starts a new span
type WorkflowClient ¶
type WorkflowClient[I, O any] struct { ServiceName string HandlerName string // Usually "run" for the main workflow handler }
WorkflowClient provides type-safe communication with Workflow services
func (WorkflowClient[I, O]) Attach ¶
func (c WorkflowClient[I, O]) Attach( ctx restate.Context, workflowID string, opts ...CallOption, ) (O, error)
Attach attaches to an existing workflow instance (request-response)
func (WorkflowClient[I, O]) AttachFuture ¶
func (c WorkflowClient[I, O]) AttachFuture( ctx restate.Context, workflowID string, ) restate.Future
AttachFuture attaches to a workflow and returns a future. The returned future is intended for use with restate.Wait() or restate.WaitFirst().
func (WorkflowClient[I, O]) GetOutput ¶
func (c WorkflowClient[I, O]) GetOutput( ctx restate.Context, workflowID string, outputHandler string, ) (O, error)
GetOutput queries a workflow's output via shared handler
func (WorkflowClient[I, O]) Signal ¶
func (c WorkflowClient[I, O]) Signal( ctx restate.Context, workflowID string, signalHandler string, input I, opts ...CallOption, ) restate.Invocation
Signal sends a signal to a workflow's shared handler
func (WorkflowClient[I, O]) Submit ¶
func (c WorkflowClient[I, O]) Submit( ctx restate.Context, workflowID string, input I, opts ...CallOption, ) restate.Invocation
Submit starts a new workflow instance with the given ID (idempotent)
type WorkflowConfig ¶
type WorkflowConfig struct {
// StateRetentionDays configures how long workflow state is retained
// Default: 30 days (Restate default)
// Min: 1 day
// Max: 90 days (Restate cluster limit)
// Note: Actual retention depends on Restate cluster configuration
StateRetentionDays int
// EnableStatusPersistence enables durable storage of workflow status
// When true: Status survives restarts, counts against retention
// When false: Status is ephemeral, cheaper but lost on restart
// Default: true
//
// IMPORTANT: Status persistence adds to state size. For workflows with
// frequent status updates (e.g., every second), consider:
// 1. Reducing update frequency
// 2. Using external status storage
// 3. Disabling persistence for short-lived workflows
EnableStatusPersistence bool
// AutoCleanupOnCompletion automatically purges workflow state after completion
// When true: State is deleted when workflow completes successfully
// When false: State retained until retention period expires
// Default: false (preserve for audit/debugging)
//
// Set to true for:
// - High-volume workflows (millions per day)
// - Workflows with large state (>1MB)
// - When audit trail not needed
AutoCleanupOnCompletion bool
// MaxStateSizeBytes warns when workflow state approaches this limit
// Default: 1MB (conservative)
// Restate limit: Typically 10MB per workflow
//
// Large state causes:
// - Slower workflow execution
// - Higher memory usage
// - Longer retention costs
MaxStateSizeBytes int64
// CleanupGracePeriod is time to keep state after completion before cleanup
// Only applies if AutoCleanupOnCompletion is true
// Default: 24 hours
// Use cases:
// - Allow time for result queries
// - Grace period for auditing
// - Debugging window
CleanupGracePeriod time.Duration
}
WorkflowConfig defines configuration for workflow behavior and state retention
CRITICAL: Workflow state is stored in Restate and subject to retention limits. Default Restate retention is typically 24 hours to 90 days depending on deployment.
State Retention Considerations:
- Workflow state includes: execution history, promises, timers, durable state
- If retention expires, workflow becomes unrecoverable
- Long-running workflows (>90 days) need external state archival
- WorkflowStatus persistence counts against retention
See WORKFLOW_RETENTION_GUIDE.MD for comprehensive guidance.
func DefaultWorkflowConfig ¶
func DefaultWorkflowConfig() WorkflowConfig
DefaultWorkflowConfig returns recommended default configuration
Defaults are conservative and suitable for most use cases:
- 30 day retention (balance between cost and recovery window)
- Status persistence enabled (durability over cost)
- No auto-cleanup (preserve for debugging)
- 1MB max state warning (conservative limit)
- 24h cleanup grace period (reasonable audit window)
Adjust based on your requirements. See WorkflowConfig documentation.
func HighVolumeWorkflowConfig ¶
func HighVolumeWorkflowConfig() WorkflowConfig
HighVolumeWorkflowConfig returns configuration for high-volume workflows
High-volume defaults prioritize cost and performance:
- Short retention (7 days - minimal compliance)
- Status persistence disabled (reduce state size)
- Auto-cleanup enabled (reduce storage costs)
- Smaller state limit (512KB - encourage efficiency)
- Short grace period (1 hour - quick cleanup)
Use for:
- Millions of workflows per day
- Short-lived workflows (<1 hour)
- No audit requirements
- Cost-sensitive deployments
func ProductionWorkflowConfig ¶
func ProductionWorkflowConfig() WorkflowConfig
ProductionWorkflowConfig returns configuration optimized for production
Production defaults prioritize:
- Longer retention (90 days for compliance)
- Status persistence (critical for monitoring)
- Auto-cleanup disabled (audit requirements)
- Larger state limit (10MB - Restate max)
- Longer grace period (7 days for investigation)
func (WorkflowConfig) ApplyToWorkflow ¶
func (cfg WorkflowConfig) ApplyToWorkflow(workflow *restate.ServiceDefinition) *restate.ServiceDefinition
ApplyToWorkflow is a convenience method to apply config when creating a workflow
Example:
config := DefaultWorkflowConfig()
workflow := config.ApplyToWorkflow(restate.NewWorkflow("MyWorkflow"))
This is equivalent to:
workflow := restate.NewWorkflow("MyWorkflow", config.ToRestateOptions()...)
func (WorkflowConfig) EstimateStorageCost ¶
func (cfg WorkflowConfig) EstimateStorageCost(workflowsPerDay int, avgStateSizeKB int) float64
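EstimateStorageCost estimates storage usage based on workflow volume: the total state size, in GB, held over the retention period (a proxy for storage cost, not a monetary amount)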
Parameters:
- workflowsPerDay: Number of workflows started per day
- avgStateSizeKB: Average workflow state size in KB
Returns:
- Estimated total state size in GB for the retention period
Example:
config := DefaultWorkflowConfig()
storageGB := config.EstimateStorageCost(10000, 50) // 10k workflows/day, 50KB each
// Result: ~15GB for 30-day retention
Note: Actual costs depend on cloud provider pricing
func (WorkflowConfig) LogConfiguration ¶
func (cfg WorkflowConfig) LogConfiguration(logger *slog.Logger, workflowName string)
LogConfiguration logs the workflow configuration for visibility
Use at application startup to document configuration:
config := ProductionWorkflowConfig()
config.LogConfiguration(slog.Default(), "OrderWorkflow")
func (WorkflowConfig) MonitorStateSize ¶
func (cfg WorkflowConfig) MonitorStateSize(ctx interface{ Log() *slog.Logger }, estimatedSizeBytes int64) error
MonitorStateSize tracks workflow state size and logs warnings
Use this in your workflow handlers to monitor state growth:
func (w *MyWorkflow) Run(ctx restate.WorkflowContext, req Request) (Result, error) {
cfg := w.GetConfig()
// Check state size periodically
if err := cfg.MonitorStateSize(ctx, estimatedStateSize); err != nil {
ctx.Log().Warn("State size warning", "error", err)
}
// ... workflow logic ...
}
Parameters:
- ctx: Workflow context (for logging)
- estimatedSizeBytes: Current estimated state size in bytes
Returns error if state exceeds configured maximum
func (WorkflowConfig) ToRestateOptions ¶
func (cfg WorkflowConfig) ToRestateOptions() []restate.ServiceDefinitionOption
ToRestateOptions converts WorkflowConfig to Restate SDK workflow options
This helper bridges our framework configuration to the native Restate SDK. Use when defining workflows with restate.NewWorkflow().
Example:
config := ProductionWorkflowConfig()
workflow := restate.NewWorkflow("MyWorkflow", config.ToRestateOptions()...)
Note: The Restate SDK supports these workflow-specific options:
- restate.WithWorkflowRetention(duration) - sets state retention period
- restate.WithIdempotencyRetention(duration) - sets idempotency key retention
- restate.WithInactivityTimeout(duration) - auto-fails workflows after inactivity
func (WorkflowConfig) Validate ¶
func (cfg WorkflowConfig) Validate(logger *slog.Logger) error
Validate checks whether the WorkflowConfig is valid and logs warnings
Validation checks:
- Retention within Restate limits (1-90 days)
- State size within Restate limit (10MB)
- Cleanup grace period reasonable (<retention)
Warnings for common issues:
- Very short retention (<7 days) - recovery risk
- Very long retention (>60 days) - cost concern
- No auto-cleanup + short retention - state orphaning
- Large state limit (>5MB) - performance impact
func (WorkflowConfig) WithAutoCleanup ¶
func (cfg WorkflowConfig) WithAutoCleanup(enabled bool, gracePeriod time.Duration) WorkflowConfig
WithAutoCleanup creates a new config with auto-cleanup turned on or off according to enabled, using the given grace period
Example:
config := DefaultWorkflowConfig().WithAutoCleanup(true, 2*time.Hour)
func (WorkflowConfig) WithCustomRetention ¶
func (cfg WorkflowConfig) WithCustomRetention(days int) WorkflowConfig
WithCustomRetention creates a new config with custom retention period
Example:
config := DefaultWorkflowConfig().WithCustomRetention(45)
func (WorkflowConfig) WithMaxStateSize ¶
func (cfg WorkflowConfig) WithMaxStateSize(bytes int64) WorkflowConfig
WithMaxStateSize creates a new config with custom max state size
Example:
config := DefaultWorkflowConfig().WithMaxStateSize(5 * 1024 * 1024) // 5MB
type WorkflowLoop ¶
type WorkflowLoop struct {
// contains filtered or unexported fields
}
WorkflowLoop provides looping constructs with built-in safety
func NewWorkflowLoop ¶
func NewWorkflowLoop(ctx restate.WorkflowContext, maxIterations int) *WorkflowLoop
NewWorkflowLoop creates a loop controller
func (*WorkflowLoop) While ¶
func (wl *WorkflowLoop) While(condition LoopCondition, body LoopBody) error
While executes body while condition returns true
type WorkflowStatus ¶
type WorkflowStatus struct {
// contains filtered or unexported fields
}
WorkflowStatus provides utilities for exposing workflow progress via shared handlers
func NewWorkflowStatus ¶
func NewWorkflowStatus(ctx restate.WorkflowSharedContext, statusKey string) *WorkflowStatus
NewWorkflowStatus creates a status tracker for the workflow
func (*WorkflowStatus) GetStatus ¶
func (ws *WorkflowStatus) GetStatus() (StatusData, error)
GetStatus retrieves current workflow status (read-only, safe from shared context)
type WorkflowTimer ¶
type WorkflowTimer struct {
// contains filtered or unexported fields
}
WorkflowTimer provides durable timer utilities for workflows
func NewWorkflowTimer ¶
func NewWorkflowTimer(ctx restate.WorkflowContext) *WorkflowTimer
NewWorkflowTimer creates timer utilities bound to a workflow context
func (*WorkflowTimer) After ¶
func (wt *WorkflowTimer) After(duration time.Duration) restate.AfterFuture
After creates a durable timer that completes after the specified duration
func (*WorkflowTimer) Sleep ¶
func (wt *WorkflowTimer) Sleep(duration time.Duration) error
Sleep pauses workflow execution for the specified duration (durable)
func (*WorkflowTimer) SleepUntil ¶
func (wt *WorkflowTimer) SleepUntil(targetTime time.Time) error
SleepUntil pauses until a specific time (calculates duration from now)