Documentation
¶
Overview ¶
Package core provides the fundamental primitives for building resolute workflows.
Index ¶
- Constants
- func ClearMetricsExporter()
- func ComputeWebhookSignature(payload []byte, secret string) string
- func CursorFor(source string) *time.Time
- func CursorString(source string) string
- func ExampleActivity(ctx context.Context, input struct{}) (struct{}, error)
- func Get[T any](s FlowStateReader, key string) T
- func GetOr[T any](s FlowStateReader, key string, defaultVal T) T
- func GetSafe[T any](s FlowStateReader, key string) (T, error)
- func GetWebhookHeaders(input FlowInput) http.Header
- func GetWebhookPayload(input FlowInput) []byte
- func Has(s FlowStateReader, key string) bool
- func InjectSignal[T any](fs *FlowState, name string, payload T)
- func InputData(key string) string
- func IsFatalError(err error) bool
- func IsOutputRefMarker(ref DataRef) bool
- func IsTerminalError(err error) bool
- func Keys(s FlowStateReader) []string
- func LoadConfig[T any](prefix string) (T, error)
- func MustGet[T any](s FlowStateReader, key string) T
- func MustLoadConfig[T any](prefix string) T
- func Output(path string) string
- func ParseWebhookPayload[T any](input FlowInput) (T, error)
- func PeekSignal[T any](fs *FlowState, name string) (T, bool)
- func RateLimitWaitActivity(ctx context.Context, limiterID string) error
- func RecordActivityExecution(input RecordActivityExecutionInput)
- func RecordFlowExecution(input RecordFlowExecutionInput)
- func RecordRateLimitWait(input RecordRateLimitWaitInput)
- func RecordStateOperation(input RecordStateOperationInput)
- func RegisterProviderActivities(w worker.Worker, p Provider)
- func ResolveCursorRef(source string, state *FlowState) *time.Time
- func ResolveOutputRef(path string, state *FlowState) (string, bool)
- func ResolveStringRef(s string, state *FlowState) (string, error)
- func Set[T any](s *FlowState, key string, value T)
- func SetDefaultBackend(b StateBackend)
- func SetMetricsExporter(exporter MetricsExporter)
- func SetStorage(s *Storage)
- func SignalCount(fs *FlowState, name string) int
- func TakeAllSignals[T any](fs *FlowState, name string) []T
- func TakeSignal[T any](fs *FlowState, name string) (T, bool)
- func Validate(v interface{}) error
- func ValidateActivityFunc(fn ActivityFunc) error
- func WrapFlowError(flowName, stepName, nodeName string, input interface{}, err error) error
- func WrapNodeError(nodeName string, input interface{}, err error) error
- type ActivityFunc
- type ActivityMeta
- type ActivityOptions
- type ActivityRegistrar
- type BaseProvider
- func (p *BaseProvider) Activities() []ActivityMeta
- func (p *BaseProvider) AddActivity(name string, fn ActivityFunc) *BaseProvider
- func (p *BaseProvider) AddActivityWithDescription(name, description string, fn ActivityFunc) *BaseProvider
- func (p *BaseProvider) GetRateLimiter() *SharedRateLimiter
- func (p *BaseProvider) HealthCheck(ctx context.Context) error
- func (p *BaseProvider) Name() string
- func (p *BaseProvider) Version() string
- func (p *BaseProvider) WithHealthCheck(fn HealthCheckFunc) *BaseProvider
- func (p *BaseProvider) WithRateLimit(requests int, per time.Duration) *BaseProvider
- type ChildFlowConfig
- type ChildFlowNode
- func (c *ChildFlowNode) As(key string) *ChildFlowNode
- func (c *ChildFlowNode) Compensate(_ workflow.Context, _ *FlowState) error
- func (c *ChildFlowNode) Compensation() ExecutableNode
- func (c *ChildFlowNode) Execute(ctx workflow.Context, state *FlowState) error
- func (c *ChildFlowNode) HasCompensation() bool
- func (c *ChildFlowNode) Input() interface{}
- func (c *ChildFlowNode) Name() string
- func (c *ChildFlowNode) OutputKey() string
- func (c *ChildFlowNode) RateLimiterID() string
- type ChildFlowResults
- type ClassifiedError
- type CompensationChain
- type CompensationEntry
- type ConditionalBuilder
- func (cb *ConditionalBuilder) Else() *ConditionalBuilder
- func (cb *ConditionalBuilder) EndWhen() *FlowBuilder
- func (cb *ConditionalBuilder) Otherwise(node ExecutableNode) *FlowBuilder
- func (cb *ConditionalBuilder) OtherwiseParallel(name string, nodes ...ExecutableNode) *FlowBuilder
- func (cb *ConditionalBuilder) Then(node ExecutableNode) *ConditionalBuilder
- func (cb *ConditionalBuilder) ThenParallel(name string, nodes ...ExecutableNode) *ConditionalBuilder
- type ConditionalConfig
- type ConfigError
- type ConfiguredPageFetcher
- type ContinueAsNewPolicy
- type CostEntry
- type Cursor
- type CursorMarker
- type CursorUpdateConfig
- type DataRef
- type ErrorClassifier
- type ErrorType
- type ExecutableNode
- type Flow
- type FlowBuilder
- func NewFlow(name string) *FlowBuilder
- func RegisterSignal[T any](b *FlowBuilder, name string) *FlowBuilder
- func ThenLoop[S any](b *FlowBuilder, name string, opts ...LoopOption[S]) *FlowBuilder
- func ThenParallelEach[I, O any](b *FlowBuilder, name string, opts ...ParallelEachOption[I, O]) *FlowBuilder
- func WithQueryState[S any](b *FlowBuilder, name, key string) *FlowBuilder
- func (b *FlowBuilder) Build() *Flow
- func (b *FlowBuilder) Then(node ExecutableNode) *FlowBuilder
- func (b *FlowBuilder) ThenChildren(name string, config ChildFlowConfig) *FlowBuilder
- func (b *FlowBuilder) ThenGate(name string, config GateConfig) *FlowBuilder
- func (b *FlowBuilder) ThenParallel(name string, nodes ...ExecutableNode) *FlowBuilder
- func (b *FlowBuilder) TriggeredBy(t Trigger) *FlowBuilder
- func (b *FlowBuilder) When(pred Predicate) *ConditionalBuilder
- func (b *FlowBuilder) WithHooks(hooks *FlowHooks) *FlowBuilder
- func (b *FlowBuilder) WithQuery(name string, handler any) *FlowBuilder
- func (b *FlowBuilder) WithState(cfg StateConfig) *FlowBuilder
- type FlowError
- type FlowHooks
- type FlowInput
- type FlowState
- func (s *FlowState) ChildWorkflowID(nodeName string) (string, bool)
- func (s *FlowState) ChildWorkflows() map[string]string
- func (s *FlowState) GetCursor(source string) Cursor
- func (s *FlowState) GetInputData(key string) ([]byte, bool)
- func (s *FlowState) GetResult(key string) any
- func (s *FlowState) GetWindowMeta() WindowMeta
- func (s *FlowState) HasResult(key string) bool
- func (s *FlowState) LoadPersisted(ctx workflow.Context, flowName string, cfg *StateConfig) error
- func (s *FlowState) NewBatchState() *FlowState
- func (s *FlowState) RegisterChildWorkflow(nodeName, workflowID string)
- func (s *FlowState) SavePersisted(ctx workflow.Context, flowName string, cfg *StateConfig) error
- func (s *FlowState) SetCursor(source, position string)
- func (s *FlowState) SetInputData(key string, data []byte)
- func (s *FlowState) SetResult(key string, value any)
- func (s *FlowState) SetWindowMeta(cursor string, size int)
- func (s *FlowState) Snapshot() *FlowState
- type FlowStateReader
- type FlowTemplate
- func (ft *FlowTemplate) AddChildren(name string, config ChildFlowConfig) *FlowTemplate
- func (ft *FlowTemplate) AddConditional(pred Predicate, thenSteps []ExecutableNode, elseSteps []ExecutableNode) *FlowTemplate
- func (ft *FlowTemplate) AddGate(name string, config GateConfig) *FlowTemplate
- func (ft *FlowTemplate) AddParallel(name string, nodes ...ExecutableNode) *FlowTemplate
- func (ft *FlowTemplate) AddStep(node ExecutableNode) *FlowTemplate
- func (ft *FlowTemplate) Build() *Flow
- func (ft *FlowTemplate) TriggeredBy(t Trigger) *FlowTemplate
- func (ft *FlowTemplate) WithHooks(hooks *FlowHooks) *FlowTemplate
- func (ft *FlowTemplate) WithState(cfg StateConfig) *FlowTemplate
- type FlowTester
- func (t *FlowTester) AssertCallCount(tb TestingT, nodeName string, expected int)
- func (t *FlowTester) AssertCalled(tb TestingT, nodeName string)
- func (t *FlowTester) AssertNotCalled(tb TestingT, nodeName string)
- func (t *FlowTester) CallArgs(nodeName string) []any
- func (t *FlowTester) CallCount(nodeName string) int
- func (t *FlowTester) LastCallArg(nodeName string) any
- func (t *FlowTester) Mock(nodeName string, fn any) *FlowTester
- func (t *FlowTester) MockChildFlow(name string, fn func(*FlowState) (*FlowState, error)) *FlowTester
- func (t *FlowTester) MockError(nodeName string, err error) *FlowTester
- func (t *FlowTester) MockGate(gateName string, result GateResult) *FlowTester
- func (t *FlowTester) MockValue(nodeName string, value any) *FlowTester
- func (t *FlowTester) Reset()
- func (t *FlowTester) ResetAll()
- func (t *FlowTester) Run(flow *Flow, input FlowInput) (*FlowState, error)
- func (t *FlowTester) RunWithContext(ctx context.Context, flow *Flow, input FlowInput) (*FlowState, error)
- func (t *FlowTester) WasCalled(nodeName string) bool
- func (t *FlowTester) WithRateLimiting() *FlowTester
- type GateConfig
- type GateNode
- func (g *GateNode) As(key string) *GateNode
- func (g *GateNode) Compensate(_ workflow.Context, _ *FlowState) error
- func (g *GateNode) Compensation() ExecutableNode
- func (g *GateNode) Execute(ctx workflow.Context, state *FlowState) error
- func (g *GateNode) HasCompensation() bool
- func (g *GateNode) Input() interface{}
- func (g *GateNode) Name() string
- func (g *GateNode) OutputKey() string
- func (g *GateNode) RateLimiterID() string
- type GateResult
- type GateTimeoutError
- type HTTPStatusError
- type HealthCheckFunc
- type HealthServer
- func (h *HealthServer) Handler() http.Handler
- func (h *HealthServer) IsReady() bool
- func (h *HealthServer) IsStarted() bool
- func (h *HealthServer) SetReady(ready bool)
- func (h *HealthServer) SetStarted(started bool)
- func (h *HealthServer) Shutdown(ctx context.Context) error
- func (h *HealthServer) Start(addr string) error
- func (h *HealthServer) StartAsync(addr string) error
- type HealthStatus
- type HookContext
- type ItemActivity
- type ItemContext
- type ItemErrorMode
- type IterationEvent
- type LocalStorage
- type LoopExitReason
- type LoopOption
- func AfterIteration[S any](fn func(s S, fs FlowStateReader) S) LoopOption[S]
- func BeforeIteration[S any](fn func(s S, fs *FlowState) (S, bool)) LoopOption[S]
- func ContinueAsNewAfter[S any](p ContinueAsNewPolicy) LoopOption[S]
- func FinalState[S any](inject func(*FlowState, S)) LoopOption[S]
- func InitialState[S any](extract func(*FlowState) S) LoopOption[S]
- func IterationHook[S any](h func(IterationEvent)) LoopOption[S]
- func MaxIterations[S any](n int) LoopOption[S]
- func StateKey[S any](key string) LoopOption[S]
- func Steps[S any](steps ...Step) LoopOption[S]
- func While[S any](cond func(S) bool) LoopOption[S]
- type Metrics
- type MetricsExporter
- type Node
- func NewNode[I, O any](name string, activity func(context.Context, I) (O, error), input I) *Node[I, O]
- func NewNodeByName[I, O any](name string, activityName string, input I) *Node[I, O]
- func Paginate[T any](name string, fetcher PageFetcher[T], opts ...PaginationOption) *Node[PaginateInput, PaginateOutput[T]]
- func PaginateWithConfig[T any, C any](name string, fetcher ConfiguredPageFetcher[T, C], opts ...PaginationOption) *Node[PaginateWithInputParams[C], PaginateWithInputOutput[T, C]]
- func (n *Node[I, O]) As(outputKey string) *Node[I, O]
- func (n *Node[I, O]) Compensate(ctx workflow.Context, state *FlowState) error
- func (n *Node[I, O]) Compensation() ExecutableNode
- func (n *Node[I, O]) Execute(ctx workflow.Context, state *FlowState) error
- func (n *Node[I, O]) HasCompensation() bool
- func (n *Node[I, O]) Input() any
- func (n *Node[I, O]) Name() string
- func (n *Node[I, O]) OnError(compensation ExecutableNode) *Node[I, O]
- func (n *Node[I, O]) OutputKey() string
- func (n *Node[I, O]) RateLimiterID() string
- func (n *Node[I, O]) WindowConfig() Window
- func (n *Node[I, O]) WithCursorUpdate(source, field string) *Node[I, O]
- func (n *Node[I, O]) WithErrorClassifier(fn ErrorClassifier) *Node[I, O]
- func (n *Node[I, O]) WithRateLimit(requests int, per time.Duration) *Node[I, O]
- func (n *Node[I, O]) WithRetry(policy RetryPolicy) *Node[I, O]
- func (n *Node[I, O]) WithSharedRateLimit(limiter *SharedRateLimiter) *Node[I, O]
- func (n *Node[I, O]) WithTimeout(d time.Duration) *Node[I, O]
- func (n *Node[I, O]) WithValidation() *Node[I, O]
- func (n *Node[I, O]) WithWindow(w Window) *Node[I, O]
- type NodeError
- type NoopExporter
- type PageFetcher
- type PageResult
- type PaginateInput
- type PaginateOutput
- type PaginateWithInputOutput
- type PaginateWithInputParams
- type PaginationConfig
- type PaginationOption
- type ParallelEachOption
- func AfterItem[I, O any](fn func(ItemContext, I, O, error, FlowStateReader)) ParallelEachOption[I, O]
- func BeforeItem[I, O any](fn func(ItemContext, I, FlowStateReader)) ParallelEachOption[I, O]
- func From[I, O any](fn func(*FlowState) []I) ParallelEachOption[I, O]
- func MaxConcurrency[I, O any](n int) ParallelEachOption[I, O]
- func Merge[I, O any](fn func(*FlowState, []O)) ParallelEachOption[I, O]
- func OnItemError[I, O any](m ItemErrorMode) ParallelEachOption[I, O]
- func ParallelBody[I, O any](a ItemActivity[I, O]) ParallelEachOption[I, O]
- func PerItemRetry[I, O any](p RetryPolicyOverride) ParallelEachOption[I, O]
- type PersistedState
- type Predicate
- type PrometheusExporter
- type Provider
- type ProviderRegistry
- func (r *ProviderRegistry) Get(name string) (Provider, bool)
- func (r *ProviderRegistry) List() []Provider
- func (r *ProviderRegistry) Register(p Provider) error
- func (r *ProviderRegistry) RegisterActivities(w worker.Worker, providerName string) error
- func (r *ProviderRegistry) RegisterAllActivities(w worker.Worker)
- type QueryDef
- type RateLimitConfig
- type RateLimiter
- type RecordActivityExecutionInput
- type RecordFlowExecutionInput
- type RecordRateLimitWaitInput
- type RecordStateOperationInput
- type RegisterActivitiesFunc
- type RetryPolicy
- type RetryPolicyOverride
- type SharedRateLimiter
- type SignalBuffer
- type SignalDef
- type StateBackend
- type StateConfig
- type Step
- type Storage
- type StorageBackend
- type TestingT
- type TokenBucket
- type Trigger
- type TriggerConfig
- type TriggerType
- type ValidationError
- type ValidationErrors
- type WebhookBuilder
- type WebhookResponse
- type WebhookServer
- func (s *WebhookServer) Handler() http.Handler
- func (s *WebhookServer) RegisterFlow(flow *Flow) error
- func (s *WebhookServer) Routes() []string
- func (s *WebhookServer) Shutdown(ctx context.Context) error
- func (s *WebhookServer) Start(addr string) error
- func (s *WebhookServer) StartAsync(addr string) error
- type WebhookServerConfig
- type Window
- type WindowMeta
- type WindowedNode
- type WorkerBuilder
- func (b *WorkerBuilder) Build() error
- func (b *WorkerBuilder) Client() client.Client
- func (b *WorkerBuilder) HealthServer() *HealthServer
- func (b *WorkerBuilder) Run() error
- func (b *WorkerBuilder) RunAsync() (shutdown func(), err error)
- func (b *WorkerBuilder) WebhookServer() *WebhookServer
- func (b *WorkerBuilder) WithConfig(cfg WorkerConfig) *WorkerBuilder
- func (b *WorkerBuilder) WithFlow(f *Flow) *WorkerBuilder
- func (b *WorkerBuilder) WithHealthServer(addr string) *WorkerBuilder
- func (b *WorkerBuilder) WithMetrics(exporter MetricsExporter) *WorkerBuilder
- func (b *WorkerBuilder) WithProviders(providers ...Provider) *WorkerBuilder
- func (b *WorkerBuilder) WithWebhookServer(addr string) *WorkerBuilder
- func (b *WorkerBuilder) Worker() worker.Worker
- type WorkerConfig
Examples ¶
Constants ¶
const (
	BackendLocal = "local"
	BackendS3    = "s3"
	BackendGCS   = "gcs"
)
Backend identifiers
const (
	// TriggerWebhook indicates the flow is started via HTTP webhook.
	TriggerWebhook TriggerType = "webhook"

	// DefaultWebhookMethod is the default HTTP method for webhooks.
	DefaultWebhookMethod = "POST"

	// WebhookSignatureHeader is the header containing the HMAC signature.
	WebhookSignatureHeader = "X-Webhook-Signature"
)
Variables ¶
This section is empty.
Functions ¶
func ClearMetricsExporter ¶
func ClearMetricsExporter()
ClearMetricsExporter removes the global metrics exporter (for testing).
func ComputeWebhookSignature ¶
ComputeWebhookSignature computes a signature for testing or external webhook senders.
func CursorFor ¶
CursorFor creates a magic marker that resolves to the persisted cursor for a source. The framework replaces this with the actual cursor value at execution time.
Example:
jira.FetchIssues(jira.Input{
	Project: "PLATFORM",
	Since:   core.CursorFor("jira"),
})
func CursorString ¶
CursorString creates a string marker for cursor reference (for string fields).
func ExampleActivity ¶
ExampleActivity is a sample activity function signature for documentation. All activities must follow this pattern: func(context.Context, Input) (Output, error)
func Get ¶
func Get[T any](s FlowStateReader, key string) T
Get retrieves a typed value from results. Panics if the key doesn't exist or type doesn't match.
func GetOr ¶
func GetOr[T any](s FlowStateReader, key string, defaultVal T) T
GetOr retrieves a typed value from results with a default fallback.
func GetSafe ¶
func GetSafe[T any](s FlowStateReader, key string) (T, error)
GetSafe retrieves a typed value from results with error handling. Returns an error if the key doesn't exist or type doesn't match.
func GetWebhookHeaders ¶
GetWebhookHeaders extracts the webhook headers from FlowInput.
func GetWebhookPayload ¶
GetWebhookPayload extracts the webhook payload from FlowInput.
func Has ¶
func Has(s FlowStateReader, key string) bool
Has returns true if the key exists in state results, even if the stored value is nil.
func InjectSignal ¶
InjectSignal appends a typed payload directly to the buffer. Used by tests to simulate signal arrival without driving the workflow signal channel.
Production code should never call this — production payloads enter the buffer via the receive pump installed by RegisterSignal[T].
func InputData ¶
InputData creates a magic marker that resolves to a value from the flow's initial input. Webhook-triggered flows store payload data in FlowInput; this marker provides access from activity input structs.
Example:
bitbucket.ParseWebhook(bitbucket.ParseWebhookInput{
	RawPayload: core.InputData("webhook_payload"),
})
func IsFatalError ¶
IsFatalError checks if an error has been classified as fatal.
func IsOutputRefMarker ¶
IsOutputRefMarker returns true if the DataRef is a marker for output resolution.
func IsTerminalError ¶
IsTerminalError checks if an error has been classified as terminal.
func Keys ¶
func Keys(s FlowStateReader) []string
Keys returns all keys in state results, sorted alphabetically. Returns nil when the underlying reader is not a *FlowState — exposing enumeration on the bare interface would force every adapter to implement it, and hooks rarely need to enumerate.
func LoadConfig ¶
LoadConfig loads configuration from environment variables into a struct. The prefix is prepended to all environment variable names with an underscore.
Supported struct tags:
- env:"VAR_NAME" - the environment variable suffix (prefix_VAR_NAME)
- required:"true" - fail if the variable is not set
- default:"value" - use this value if the variable is not set
Supported field types: string, int, int64, bool, float64, time.Duration
Example:
type Config struct {
	BaseURL string        `env:"BASE_URL" required:"true"`
	Token   string        `env:"TOKEN" required:"true"`
	Timeout time.Duration `env:"TIMEOUT" default:"30s"`
	Retries int           `env:"RETRIES" default:"3"`
}

cfg, err := LoadConfig[Config]("JIRA")
// Reads JIRA_BASE_URL, JIRA_TOKEN, JIRA_TIMEOUT, JIRA_RETRIES
func MustGet ¶
func MustGet[T any](s FlowStateReader, key string) T
MustGet retrieves a typed value from results. Panics with a descriptive error if the key doesn't exist or type doesn't match.
func MustLoadConfig ¶
MustLoadConfig loads configuration and panics if there's an error. Use this in main() or init() where you want to fail fast.
func Output ¶
Output creates a magic marker that resolves to a previous node's output. The framework replaces this with the actual value at execution time.
Example:
gcp.CreateSubnet(gcp.SubnetInput{
	VPC: core.Output("vpc.name"),
})
func ParseWebhookPayload ¶
ParseWebhookPayload parses the webhook payload into a typed struct.
func PeekSignal ¶
PeekSignal returns the oldest buffered payload for `name`, typed as T, without removing it. Returns (zero, false) if no payload is buffered.
Panics under the same condition as TakeSignal.
func RateLimitWaitActivity ¶
RateLimitWaitActivity is a local activity that waits on a rate limiter. This is used to rate limit workflow activities without violating determinism.
func RecordActivityExecution ¶
func RecordActivityExecution(input RecordActivityExecutionInput)
RecordActivityExecution records metrics for a completed activity execution.
func RecordFlowExecution ¶
func RecordFlowExecution(input RecordFlowExecutionInput)
RecordFlowExecution records metrics for a completed flow execution.
func RecordRateLimitWait ¶
func RecordRateLimitWait(input RecordRateLimitWaitInput)
RecordRateLimitWait records metrics for rate limiter wait time.
func RecordStateOperation ¶
func RecordStateOperation(input RecordStateOperationInput)
RecordStateOperation records metrics for state backend operations.
func RegisterProviderActivities ¶
RegisterProviderActivities is a helper function to register activities from a provider directly with a Temporal worker without using the registry.
func ResolveCursorRef ¶
ResolveCursorRef looks up a cursor value from the flow state.
func ResolveOutputRef ¶
ResolveOutputRef looks up a previous node's output from the flow state.
func ResolveStringRef ¶
ResolveStringRef resolves a string that may contain output/input/cursor markers.
func Set ¶
Set stores a typed value in results. Mutating accessor — takes *FlowState, not FlowStateReader, by design (hooks observe; they do not mutate).
func SetDefaultBackend ¶
func SetDefaultBackend(b StateBackend)
SetDefaultBackend allows overriding the default backend (for testing).
func SetMetricsExporter ¶
func SetMetricsExporter(exporter MetricsExporter)
SetMetricsExporter configures the global metrics exporter. Call this during application initialization before starting workflows.
func SetStorage ¶
func SetStorage(s *Storage)
SetStorage sets the global storage instance. Call this early in initialization to use a custom backend.
func SignalCount ¶
SignalCount returns the number of buffered payloads for `name`.
func TakeAllSignals ¶
TakeAllSignals returns all buffered payloads for `name`, typed as T, and clears the buffer. Returns nil when no payloads are buffered.
Panics under the same condition as TakeSignal — caller-provided T must match the registered T for `name`.
func TakeSignal ¶
TakeSignal removes and returns the oldest buffered payload for `name`, typed as T. Returns (zero, false) if no payload is buffered.
Panics if the buffered payload is not assertable to T. By construction (RegisterSignal[T] is the only public injection path), this can only occur if a signal name was registered with a different T than the caller requests — i.e. programmer error, surfaced loudly per Go convention.
func Validate ¶
func Validate(v interface{}) error
Validate checks struct fields against validate tags. Supported tags: required, min=N, max=N, minlen=N, maxlen=N, oneof=a|b|c
func ValidateActivityFunc ¶
func ValidateActivityFunc(fn ActivityFunc) error
ValidateActivityFunc checks if a function has a valid activity signature. Valid signatures are: func(context.Context, I) (O, error)
func WrapFlowError ¶
WrapFlowError wraps an error with flow execution context. Returns nil if err is nil.
func WrapNodeError ¶
WrapNodeError wraps an error with node execution context. Returns nil if err is nil.
Types ¶
type ActivityFunc ¶
type ActivityFunc interface{}
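ActivityFunc is the type used for all activity implementations. It is a named type rather than a bare `any` so that activity parameters are self-documenting; because its underlying type is interface{}, the function signature is not checked at compile time. Use ValidateActivityFunc to verify that a value has a valid activity signature.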
type ActivityMeta ¶
type ActivityMeta struct {
Name string
Description string
Function ActivityFunc
}
ActivityMeta describes a single activity for registration and discovery.
type ActivityOptions ¶
type ActivityOptions struct {
RetryPolicy *RetryPolicy
StartToCloseTimeout time.Duration
HeartbeatTimeout time.Duration
TaskQueue string
}
ActivityOptions configures retry and timeout behavior for a node.
func DefaultActivityOptions ¶
func DefaultActivityOptions() ActivityOptions
DefaultActivityOptions returns sensible defaults for activity execution.
type ActivityRegistrar ¶
ActivityRegistrar is implemented by types that can register their activities.
type BaseProvider ¶
type BaseProvider struct {
ProviderName string
ProviderVersion string
ActivityList []ActivityMeta
RateLimiter *SharedRateLimiter
// contains filtered or unexported fields
}
BaseProvider provides a default implementation of the Provider interface. Embed this in provider implementations to reduce boilerplate.
func NewProvider ¶
func NewProvider(name, version string) *BaseProvider
NewProvider creates a new base provider with the given name and version.
func (*BaseProvider) Activities ¶
func (p *BaseProvider) Activities() []ActivityMeta
Activities returns the list of activities.
func (*BaseProvider) AddActivity ¶
func (p *BaseProvider) AddActivity(name string, fn ActivityFunc) *BaseProvider
AddActivity adds an activity to the provider.
func (*BaseProvider) AddActivityWithDescription ¶
func (p *BaseProvider) AddActivityWithDescription(name, description string, fn ActivityFunc) *BaseProvider
AddActivityWithDescription adds an activity with a description.
func (*BaseProvider) GetRateLimiter ¶
func (p *BaseProvider) GetRateLimiter() *SharedRateLimiter
GetRateLimiter returns the provider's rate limiter, if configured.
func (*BaseProvider) HealthCheck ¶
func (p *BaseProvider) HealthCheck(ctx context.Context) error
HealthCheck verifies provider connectivity and configuration. Returns nil if healthy, or an error describing the failure.
func (*BaseProvider) Version ¶
func (p *BaseProvider) Version() string
Version returns the provider version.
func (*BaseProvider) WithHealthCheck ¶
func (p *BaseProvider) WithHealthCheck(fn HealthCheckFunc) *BaseProvider
WithHealthCheck configures a health check function for the provider. The health check is called during worker startup to validate configuration.
Example:
provider := core.NewProvider("jira", "1.0.0").
	WithHealthCheck(func(ctx context.Context) error {
		_, err := client.GetServerInfo(ctx)
		return err
	})
func (*BaseProvider) WithRateLimit ¶
func (p *BaseProvider) WithRateLimit(requests int, per time.Duration) *BaseProvider
WithRateLimit configures a shared rate limiter for all activities in this provider. All activities registered through this provider share the same rate limit, which helps avoid overwhelming external APIs.
Example:
jiraProvider := jira.NewProvider().WithRateLimit(100, time.Minute)
type ChildFlowConfig ¶
ChildFlowConfig defines how child workflows are spawned from a parent flow.
type ChildFlowNode ¶
type ChildFlowNode struct {
// contains filtered or unexported fields
}
ChildFlowNode spawns child workflows from a parent flow.
func NewChildFlowNode ¶
func NewChildFlowNode(name string, config ChildFlowConfig) *ChildFlowNode
NewChildFlowNode creates a child flow node with the given name and configuration.
func (*ChildFlowNode) As ¶
func (c *ChildFlowNode) As(key string) *ChildFlowNode
As names the output of this child flow node for reference by downstream nodes.
func (*ChildFlowNode) Compensate ¶
func (c *ChildFlowNode) Compensate(_ workflow.Context, _ *FlowState) error
Compensate is a no-op for child flow nodes.
func (*ChildFlowNode) Compensation ¶
func (c *ChildFlowNode) Compensation() ExecutableNode
Compensation returns nil.
func (*ChildFlowNode) Execute ¶
func (c *ChildFlowNode) Execute(ctx workflow.Context, state *FlowState) error
Execute spawns child workflows using workflow.ExecuteChildWorkflow. Children run in parallel by default, or sequentially if config.Sequential is true.
func (*ChildFlowNode) HasCompensation ¶
func (c *ChildFlowNode) HasCompensation() bool
HasCompensation returns false — child flow nodes have no compensation.
func (*ChildFlowNode) Input ¶
func (c *ChildFlowNode) Input() interface{}
Input returns nil — child flow nodes derive input from InputMapper.
func (*ChildFlowNode) Name ¶
func (c *ChildFlowNode) Name() string
Name returns the child flow node's identifier.
func (*ChildFlowNode) OutputKey ¶
func (c *ChildFlowNode) OutputKey() string
OutputKey returns the key used to store child flow results in FlowState.
func (*ChildFlowNode) RateLimiterID ¶
func (c *ChildFlowNode) RateLimiterID() string
RateLimiterID returns empty — child flows don't use rate limiting.
type ChildFlowResults ¶
ChildFlowResults holds the outcomes of all spawned child workflows.
type ClassifiedError ¶
ClassifiedError wraps an error with its classification.
func (*ClassifiedError) Error ¶
func (e *ClassifiedError) Error() string
func (*ClassifiedError) IsRetryable ¶
func (e *ClassifiedError) IsRetryable() bool
IsRetryable returns true if the error should be retried.
func (*ClassifiedError) Unwrap ¶
func (e *ClassifiedError) Unwrap() error
type CompensationChain ¶
type CompensationChain struct {
// contains filtered or unexported fields
}
CompensationChain manages the ordered list of compensation entries. Compensations are executed in reverse order (LIFO) on failure.
func NewCompensationChain ¶
func NewCompensationChain() *CompensationChain
NewCompensationChain creates an empty compensation chain.
func (*CompensationChain) Add ¶
func (c *CompensationChain) Add(node ExecutableNode, state *FlowState)
Add records a new compensation entry.
func (*CompensationChain) Clear ¶
func (c *CompensationChain) Clear()
Clear removes all compensation entries (called after successful completion).
func (*CompensationChain) Entries ¶
func (c *CompensationChain) Entries() []CompensationEntry
Entries returns all compensation entries in execution order.
func (*CompensationChain) Len ¶
func (c *CompensationChain) Len() int
Len returns the number of compensation entries.
func (*CompensationChain) Reversed ¶
func (c *CompensationChain) Reversed() []CompensationEntry
Reversed returns compensation entries in reverse order for rollback.
type CompensationEntry ¶
type CompensationEntry struct {
// contains filtered or unexported fields
}
CompensationEntry records a node execution for potential rollback. Used by the Saga pattern to track which compensations need to run on failure.
type ConditionalBuilder ¶
type ConditionalBuilder struct {
// contains filtered or unexported fields
}
ConditionalBuilder provides a fluent API for building conditional branches.
func (*ConditionalBuilder) Else ¶
func (cb *ConditionalBuilder) Else() *ConditionalBuilder
Else switches to building the "else" branch (when predicate is false). After calling Else, subsequent Then/ThenParallel calls add to the else branch.
func (*ConditionalBuilder) EndWhen ¶
func (cb *ConditionalBuilder) EndWhen() *FlowBuilder
EndWhen completes the conditional block and returns to the main flow builder. Use this when you don't need an else branch.
func (*ConditionalBuilder) Otherwise ¶
func (cb *ConditionalBuilder) Otherwise(node ExecutableNode) *FlowBuilder
Otherwise adds a single node to the "else" branch and returns to the main flow builder. This is a convenience method for simple conditionals with a single else action.
func (*ConditionalBuilder) OtherwiseParallel ¶
func (cb *ConditionalBuilder) OtherwiseParallel(name string, nodes ...ExecutableNode) *FlowBuilder
OtherwiseParallel adds parallel nodes to the "else" branch and returns to the main flow builder.
func (*ConditionalBuilder) Then ¶
func (cb *ConditionalBuilder) Then(node ExecutableNode) *ConditionalBuilder
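Then adds a sequential step to the current branch: the "then" branch (when predicate is true) by default, or the "else" branch (when predicate is false) after Else has been called.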
func (*ConditionalBuilder) ThenParallel ¶
func (cb *ConditionalBuilder) ThenParallel(name string, nodes ...ExecutableNode) *ConditionalBuilder
ThenParallel adds a parallel step to the current branch.
type ConditionalConfig ¶
type ConditionalConfig struct {
// contains filtered or unexported fields
}
ConditionalConfig defines the structure of a conditional step.
type ConfigError ¶
ConfigError represents a configuration loading error.
func (*ConfigError) Error ¶
func (e *ConfigError) Error() string
type ConfiguredPageFetcher ¶
type ConfiguredPageFetcher[T any, C any] func(ctx context.Context, config C, cursor string) (PageResult[T], error)
ConfiguredPageFetcher is a page fetcher that also receives custom configuration.
type ContinueAsNewPolicy ¶
ContinueAsNewPolicy controls when the loop emits ContinueAsNew.
type CostEntry ¶
type CostEntry struct {
NodeName string
Model string
Provider string
TokensIn int
TokensOut int
CostUSD float64
Duration time.Duration
Metadata map[string]string
}
CostEntry represents a cost event derived from a node output (typically an LLM call). Emitted by user-supplied AfterNode hooks; the framework itself does not produce CostEntry values.
type Cursor ¶
type Cursor struct {
Source string `json:"source"`
Position string `json:"position"`
UpdatedAt time.Time `json:"updated_at"`
}
Cursor tracks incremental processing position for a data source.
type CursorMarker ¶
type CursorMarker struct {
// contains filtered or unexported fields
}
CursorMarker is a special time value that acts as a placeholder for cursor resolution. The marker contains a unique ID that maps to the cursor source.
type CursorUpdateConfig ¶
CursorUpdateConfig defines how a node updates a cursor after execution.
type DataRef ¶
type DataRef struct {
StorageKey string `json:"storage_key"`
Schema string `json:"schema"`
Count int `json:"count"`
Checksum string `json:"checksum,omitempty"`
Backend string `json:"backend"`
CreatedAt time.Time `json:"created_at"`
}
DataRef is a reference to data stored in an external storage backend. It enables the Claim Check pattern for passing large data between activities without bloating Temporal's workflow history.
func NewDataRef ¶
NewDataRef creates a new DataRef with the given parameters.
func OutputRef ¶
OutputRef creates a DataRef marker that resolves to a previous node's DataRef output. The framework replaces this with the actual DataRef at execution time.
Example:
ollama.BatchEmbed(ollama.BatchEmbedInput{
DocumentsRef: core.OutputRef("jira_docs"),
})
func WindowOutput ¶
func WindowOutput() DataRef
WindowOutput creates a DataRef marker that resolves to the current windowed batch's output. In windowed execution, the framework stores each batch's output under the "__window__" key.
func (DataRef) WithChecksum ¶
WithChecksum adds a checksum to the DataRef for content verification.
type ErrorClassifier ¶
ErrorClassifier determines how an error should be handled for retries.
type ErrorType ¶
type ErrorType int
ErrorType classifies errors for retry decisions.
const (
// ErrorTypeRetryable indicates the error is transient (network timeouts, 5xx).
ErrorTypeRetryable ErrorType = iota
// ErrorTypeTerminal indicates a permanent failure (4xx auth/validation errors).
ErrorTypeTerminal
// ErrorTypeFatal indicates an unrecoverable error that should stop the workflow.
ErrorTypeFatal
)
func HTTPErrorClassifier ¶
HTTPErrorClassifier classifies errors based on HTTP status codes. 5xx and 429 are retryable, all other 4xx are terminal, and anything else defaults to retryable.
type ExecutableNode ¶
type ExecutableNode interface {
// Name returns the node's identifier.
Name() string
// OutputKey returns the key used to store this node's output in FlowState.
OutputKey() string
// Execute runs the node within a Temporal workflow context.
Execute(ctx workflow.Context, state *FlowState) error
// Compensate runs the compensation logic if configured (Saga pattern).
Compensate(ctx workflow.Context, state *FlowState) error
// HasCompensation returns true if this node has compensation logic.
HasCompensation() bool
// Compensation returns the compensation node, if any.
Compensation() ExecutableNode
// Input returns the node's input value (used for testing).
// Returns any because the actual type is generic.
Input() any
// RateLimiterID returns the ID of the rate limiter for this node, if any.
// Returns empty string if no rate limiting is configured.
RateLimiterID() string
}
ExecutableNode is the interface implemented by all nodes that can be executed within a flow. This allows type-erased storage of generic nodes in flow steps.
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
Flow defines an executable workflow composed of steps.
func (*Flow) StateConfig ¶
func (f *Flow) StateConfig() *StateConfig
StateConfig returns the flow's state configuration, or nil for default.
type FlowBuilder ¶
type FlowBuilder struct {
// contains filtered or unexported fields
}
FlowBuilder provides a fluent API for constructing flows.
func NewFlow ¶
func NewFlow(name string) *FlowBuilder
NewFlow creates a new flow builder with the given name.
func RegisterSignal ¶
func RegisterSignal[T any](b *FlowBuilder, name string) *FlowBuilder
RegisterSignal[T] declares that this flow accepts a signal named `name` whose payload deserializes as T. The registered signal is wired at flow startup: a per-signal goroutine receives from the workflow signal channel directly into a value of type T (so Temporal's data converter does the typed deserialization), then injects it into FlowState's buffer for non-blocking consumption via TakeSignal[T] / TakeAllSignals[T] / PeekSignal[T].
Multiple signals with different payload types may be registered on the same flow. Calling RegisterSignal twice with the same name appends a second receive pump for that name — last write wins on conflicts; prefer to register each name once.
Pairs with the typed query helper WithQueryState[S]: both replace the old untyped Signals().Inject / Signals().Take APIs with type-parameterised access that fails at compile time on mismatch.
func ThenLoop ¶
func ThenLoop[S any](b *FlowBuilder, name string, opts ...LoopOption[S]) *FlowBuilder
ThenLoop adds a Loop step to the builder under the given name.
func ThenParallelEach ¶
func ThenParallelEach[I, O any](b *FlowBuilder, name string, opts ...ParallelEachOption[I, O]) *FlowBuilder
ThenParallelEach adds a ParallelEach step to the builder under the given name. It is a generic free function because Go does not allow generic methods on non-generic types; call sites read core.ThenParallelEach(b, "name", ...).
func WithQueryState ¶
func WithQueryState[S any](b *FlowBuilder, name, key string) *FlowBuilder
WithQueryState registers a query handler that returns the latest typed value of S persisted at the given StateKey. Pairs with Loop[S]: the Loop runner writes S to its StateKey on every transition (init, before/after each iteration, finalize), so this query always observes a current value or the zero value of S if no Loop has written yet.
Use this in place of capturing a *S in a closure mutated from AfterIteration; that pattern bypasses the read-only FlowStateReader contract and forces every consumer to maintain a parallel pointer.
func (*FlowBuilder) Build ¶
func (b *FlowBuilder) Build() *Flow
Build validates and returns the constructed flow.
func (*FlowBuilder) Then ¶
func (b *FlowBuilder) Then(node ExecutableNode) *FlowBuilder
Then adds a sequential step with a single node.
func (*FlowBuilder) ThenChildren ¶
func (b *FlowBuilder) ThenChildren(name string, config ChildFlowConfig) *FlowBuilder
ThenChildren adds a step that spawns child workflows. Each child receives input from the InputMapper applied to the parent's FlowState.
func (*FlowBuilder) ThenGate ¶
func (b *FlowBuilder) ThenGate(name string, config GateConfig) *FlowBuilder
ThenGate adds a gate step that pauses the flow until an external signal is received. The gate waits for a Temporal signal matching config.SignalName carrying a GateResult.
func (*FlowBuilder) ThenParallel ¶
func (b *FlowBuilder) ThenParallel(name string, nodes ...ExecutableNode) *FlowBuilder
ThenParallel adds a parallel step with multiple nodes executed concurrently.
func (*FlowBuilder) TriggeredBy ¶
func (b *FlowBuilder) TriggeredBy(t Trigger) *FlowBuilder
TriggeredBy sets the trigger that initiates this flow.
func (*FlowBuilder) When ¶
func (b *FlowBuilder) When(pred Predicate) *ConditionalBuilder
When starts a conditional branch based on a predicate. The predicate is evaluated at runtime against the current FlowState.
Example:
flow := NewFlow("order-flow").
TriggeredBy(Manual("test")).
Then(fetchOrder).
When(func(s *FlowState) bool {
return Get[Order](s, "order").Total > 1000
}).
Then(requireApproval).
Otherwise(autoApprove).
Then(fulfillOrder).
Build()
func (*FlowBuilder) WithHooks ¶
func (b *FlowBuilder) WithHooks(hooks *FlowHooks) *FlowBuilder
WithHooks attaches lifecycle callbacks to the flow. Hooks fire at flow, step, and node boundaries with structured context.
func (*FlowBuilder) WithQuery ¶
func (b *FlowBuilder) WithQuery(name string, handler any) *FlowBuilder
WithQuery registers a query handler for this flow. Query handlers are installed at workflow startup alongside signals.
func (*FlowBuilder) WithState ¶
func (b *FlowBuilder) WithState(cfg StateConfig) *FlowBuilder
WithState overrides the default state backend (.resolute/). Use this to configure cloud storage backends (S3, GCS) for production.
type FlowError ¶
type FlowError struct {
FlowName string
StepName string
NodeName string
Input interface{}
Cause error
}
FlowError provides rich context for errors that occur during flow execution.
type FlowHooks ¶
type FlowHooks struct {
BeforeFlow func(HookContext, FlowStateReader)
AfterFlow func(HookContext, FlowStateReader)
BeforeStep func(HookContext, FlowStateReader)
AfterStep func(HookContext, FlowStateReader)
BeforeNode func(HookContext, FlowStateReader)
AfterNode func(HookContext, FlowStateReader)
}
FlowHooks defines observability callbacks invoked at flow, step, and node execution boundaries. All callbacks are optional — nil callbacks are safely skipped.
Hooks run inside Temporal's deterministic workflow context. They must NOT perform I/O directly. They observe; they do not mutate. The FlowStateReader argument is read-only by design — mutations belong in Step execution and Iteration callbacks.
For side effects (e.g. writing events to Postgres), enqueue activities from within the callback.
type FlowState ¶
type FlowState struct {
// contains filtered or unexported fields
}
FlowState carries data through workflow execution. It holds input data, activity outputs, cursor state, and buffered signals for incremental processing.
func NewFlowState ¶
NewFlowState creates a new flow state with the given input.
func (*FlowState) ChildWorkflowID ¶
ChildWorkflowID returns the child workflow ID for a node, if known.
func (*FlowState) ChildWorkflows ¶
ChildWorkflows returns a copy of all registered child workflow IDs.
func (*FlowState) GetInputData ¶
GetInputData retrieves raw input data by key. Returns the byte slice and whether the key was found.
func (*FlowState) GetResult ¶
GetResult retrieves a raw result by key. Returns any because activity result types vary; prefer typed Get[T]() for compile-time type safety.
func (*FlowState) GetWindowMeta ¶
func (s *FlowState) GetWindowMeta() WindowMeta
GetWindowMeta returns the current window meta. Returns zero WindowMeta if not set.
func (*FlowState) HasResult ¶
HasResult returns true if the key exists in state results, even if the stored value is nil. Satisfies the FlowStateReader interface.
func (*FlowState) LoadPersisted ¶
LoadPersisted loads persisted state (cursors) from the configured backend.
func (*FlowState) NewBatchState ¶
NewBatchState creates an isolated state for a windowed batch. Inherits input and cursors from parent. Results, window meta, and signals start empty.
func (*FlowState) RegisterChildWorkflow ¶
RegisterChildWorkflow records a child workflow ID for a node.
func (*FlowState) SavePersisted ¶
SavePersisted saves persisted state (cursors) to the configured backend.
func (*FlowState) SetInputData ¶
SetInputData stores raw input data by key. Nodes can call this to inject context (e.g. previous agent responses) that downstream template markers like {{input:key}} can resolve against.
func (*FlowState) SetResult ¶
SetResult stores a result by key. Accepts any because activity result types vary; type verification happens at retrieval via Get[T]().
func (*FlowState) SetWindowMeta ¶
SetWindowMeta sets the ephemeral window cursor and size for the current batch.
type FlowStateReader ¶
type FlowStateReader interface {
// GetResult returns a raw result by key, or nil if absent. Prefer the
// typed Get[T] / GetSafe[T] free functions for type-safe access.
// Note: nil is also returned for keys present with a nil value; use
// HasResult to distinguish presence from nil-value.
GetResult(key string) any
// HasResult returns true if the key exists in results, regardless of
// whether its stored value is nil.
HasResult(key string) bool
// GetInputData returns raw input data by key.
GetInputData(key string) ([]byte, bool)
// GetCursor returns the cursor for a data source.
GetCursor(source string) Cursor
}
FlowStateReader is the read-only view of FlowState exposed to observability hooks. Mutations belong in Step execution and Iteration callbacks; hooks observe but never write. *FlowState satisfies this interface.
type FlowTemplate ¶
type FlowTemplate struct {
// contains filtered or unexported fields
}
FlowTemplate constructs flows dynamically from runtime configuration. Unlike FlowBuilder's compile-time fluent chain, FlowTemplate accepts step definitions that can vary per execution (e.g. different iteration phases running different subsets of steps).
func NewFlowTemplate ¶
func NewFlowTemplate(name string) *FlowTemplate
NewFlowTemplate creates a new template for dynamic flow construction.
func (*FlowTemplate) AddChildren ¶
func (ft *FlowTemplate) AddChildren(name string, config ChildFlowConfig) *FlowTemplate
AddChildren adds a child workflow spawning step.
func (*FlowTemplate) AddConditional ¶
func (ft *FlowTemplate) AddConditional(pred Predicate, thenSteps []ExecutableNode, elseSteps []ExecutableNode) *FlowTemplate
AddConditional adds a conditional branch evaluated at runtime.
func (*FlowTemplate) AddGate ¶
func (ft *FlowTemplate) AddGate(name string, config GateConfig) *FlowTemplate
AddGate adds a gate step that pauses until a signal is received.
func (*FlowTemplate) AddParallel ¶
func (ft *FlowTemplate) AddParallel(name string, nodes ...ExecutableNode) *FlowTemplate
AddParallel adds a parallel step with multiple nodes.
func (*FlowTemplate) AddStep ¶
func (ft *FlowTemplate) AddStep(node ExecutableNode) *FlowTemplate
AddStep adds a sequential step with a single node.
func (*FlowTemplate) Build ¶
func (ft *FlowTemplate) Build() *Flow
Build validates and returns a Flow from the template.
func (*FlowTemplate) TriggeredBy ¶
func (ft *FlowTemplate) TriggeredBy(t Trigger) *FlowTemplate
TriggeredBy sets the trigger for the flow.
func (*FlowTemplate) WithHooks ¶
func (ft *FlowTemplate) WithHooks(hooks *FlowHooks) *FlowTemplate
WithHooks attaches lifecycle callbacks.
func (*FlowTemplate) WithState ¶
func (ft *FlowTemplate) WithState(cfg StateConfig) *FlowTemplate
WithState sets the state persistence configuration.
type FlowTester ¶
type FlowTester struct {
// contains filtered or unexported fields
}
FlowTester provides a test harness for running flows without Temporal. It allows mocking activity implementations and asserting on execution.
func (*FlowTester) AssertCallCount ¶
func (t *FlowTester) AssertCallCount(tb TestingT, nodeName string, expected int)
AssertCallCount asserts that a node was called exactly expected times.
func (*FlowTester) AssertCalled ¶
func (t *FlowTester) AssertCalled(tb TestingT, nodeName string)
AssertCalled asserts that a node was called at least once.
func (*FlowTester) AssertNotCalled ¶
func (t *FlowTester) AssertNotCalled(tb TestingT, nodeName string)
AssertNotCalled asserts that a node was not called.
func (*FlowTester) CallArgs ¶
func (t *FlowTester) CallArgs(nodeName string) []any
CallArgs returns all arguments passed to a node across all calls.
func (*FlowTester) CallCount ¶
func (t *FlowTester) CallCount(nodeName string) int
CallCount returns how many times a node was called.
func (*FlowTester) LastCallArg ¶
func (t *FlowTester) LastCallArg(nodeName string) any
LastCallArg returns the argument from the last call to a node. Returns nil if the node was never called.
func (*FlowTester) Mock ¶
func (t *FlowTester) Mock(nodeName string, fn any) *FlowTester
Mock registers a mock implementation for a node by name. The mock function should have signature: func(I) (O, error) where I and O match the node's input/output types.
Example:
tester.Mock("jira.FetchIssues", func(input jira.FetchIssuesInput) (jira.FetchIssuesOutput, error) {
return jira.FetchIssuesOutput{Count: 5}, nil
})
func (*FlowTester) MockChildFlow ¶
func (t *FlowTester) MockChildFlow(name string, fn func(*FlowState) (*FlowState, error)) *FlowTester
MockChildFlow registers a mock for a child flow node by name. The function receives parent state and returns child state and error.
func (*FlowTester) MockError ¶
func (t *FlowTester) MockError(nodeName string, err error) *FlowTester
MockError registers a mock that always returns an error.
Example:
tester.MockError("jira.FetchIssues", errors.New("api error"))
func (*FlowTester) MockGate ¶
func (t *FlowTester) MockGate(gateName string, result GateResult) *FlowTester
MockGate registers a mock resolution for a gate by name. When the tester encounters this gate, it immediately resolves with the given result.
func (*FlowTester) MockValue ¶
func (t *FlowTester) MockValue(nodeName string, value any) *FlowTester
MockValue registers a mock that always returns a fixed value.
Example:
tester.MockValue("jira.FetchIssues", jira.FetchIssuesOutput{Count: 5})
func (*FlowTester) Reset ¶
func (t *FlowTester) Reset()
Reset clears all call tracking (but keeps mocks).
func (*FlowTester) ResetAll ¶
func (t *FlowTester) ResetAll()
ResetAll clears all mocks and call tracking.
func (*FlowTester) Run ¶
func (t *FlowTester) Run(flow *Flow, input FlowInput) (*FlowState, error)
Run executes the flow synchronously with mocked activities. Returns the final FlowState and any error encountered.
func (*FlowTester) RunWithContext ¶
func (t *FlowTester) RunWithContext(ctx context.Context, flow *Flow, input FlowInput) (*FlowState, error)
RunWithContext executes the flow with a context for cancellation support.
func (*FlowTester) WasCalled ¶
func (t *FlowTester) WasCalled(nodeName string) bool
WasCalled returns true if the node was called at least once.
func (*FlowTester) WithRateLimiting ¶
func (t *FlowTester) WithRateLimiting() *FlowTester
WithRateLimiting enables rate limiting during test execution. By default, rate limiting is disabled in tests for faster execution. Enable this when you want to test rate limiting behavior.
type GateConfig ¶
GateConfig defines how a gate node pauses and resumes.
type GateNode ¶
type GateNode struct {
// contains filtered or unexported fields
}
GateNode pauses a flow until an external signal is received. It implements ExecutableNode so it can be placed in a Step.
func NewGateNode ¶
func NewGateNode(name string, config GateConfig) *GateNode
NewGateNode creates a gate node with the given name and configuration.
func (*GateNode) Compensate ¶
Compensate is a no-op for gates.
func (*GateNode) Compensation ¶
func (g *GateNode) Compensation() ExecutableNode
Compensation returns nil — gates have no compensation node.
func (*GateNode) Execute ¶
Execute waits for a Temporal signal carrying a GateResult. If a timeout is configured, returns GateTimeoutError on expiry.
func (*GateNode) HasCompensation ¶
HasCompensation returns false — gates have no compensation.
func (*GateNode) Input ¶
func (g *GateNode) Input() interface{}
Input returns nil — gates have no input value.
func (*GateNode) RateLimiterID ¶
RateLimiterID returns empty — gates don't use rate limiting.
type GateResult ¶
type GateResult struct {
Approved bool
Decision string
DecidedBy string
DecidedAt time.Time
Reason string
Metadata map[string]string
}
GateResult carries the outcome of a gate decision.
type GateTimeoutError ¶
GateTimeoutError indicates a gate was not resolved within the configured timeout.
func (*GateTimeoutError) Error ¶
func (e *GateTimeoutError) Error() string
type HTTPStatusError ¶
type HTTPStatusError interface {
StatusCode() int
}
HTTPStatusError is implemented by errors that have an HTTP status code.
type HealthCheckFunc ¶
HealthCheckFunc is the signature for provider health check functions.
type HealthServer ¶
type HealthServer struct {
// contains filtered or unexported fields
}
func NewHealthServer ¶
func NewHealthServer() *HealthServer
func (*HealthServer) Handler ¶
func (h *HealthServer) Handler() http.Handler
func (*HealthServer) IsReady ¶
func (h *HealthServer) IsReady() bool
func (*HealthServer) IsStarted ¶
func (h *HealthServer) IsStarted() bool
func (*HealthServer) SetReady ¶
func (h *HealthServer) SetReady(ready bool)
func (*HealthServer) SetStarted ¶
func (h *HealthServer) SetStarted(started bool)
func (*HealthServer) Start ¶
func (h *HealthServer) Start(addr string) error
func (*HealthServer) StartAsync ¶
func (h *HealthServer) StartAsync(addr string) error
type HealthStatus ¶
type HookContext ¶
type HookContext struct {
FlowName string
StepName string
NodeName string
Duration time.Duration // populated in After* callbacks only
Error error // populated in After* callbacks only
}
HookContext carries structured context about the current execution point.
type ItemActivity ¶
ItemActivity is what the user provides as the per-item activity. It is typed in both its input (I) and output (O), so no `any` appears at the user boundary. Production implementations invoke workflow.ExecuteActivity; tests can run synchronously.
type ItemContext ¶
type ItemContext struct {
StepName string
Index int
Duration time.Duration // populated in AfterItem only
}
ItemContext carries structured context about a per-item execution within a ParallelEach step.
type ItemErrorMode ¶
type ItemErrorMode int
ItemErrorMode controls how ParallelEach handles per-item failures.
const (
// ItemErrorContinue: failed items get a zero-value O; Merge sees the
// full ordered result list. Suitable for LLM tool dispatch where partial
// failures are normal.
ItemErrorContinue ItemErrorMode = iota
// ItemErrorFailFast: first failure cancels remaining items and the
// error propagates from runParallelEach.
ItemErrorFailFast
)
type IterationEvent ¶
type IterationEvent struct {
Phase string // "started" | "completed"
N int // 1-indexed iteration number
}
IterationEvent is the payload delivered to an IterationHook.
type LocalStorage ¶
type LocalStorage struct {
// contains filtered or unexported fields
}
LocalStorage implements StorageBackend using the local filesystem.
func DefaultLocalStorage ¶
func DefaultLocalStorage() (*LocalStorage, error)
DefaultLocalStorage returns a LocalStorage using .resolute/data directory.
func NewLocalStorage ¶
func NewLocalStorage(basePath string) (*LocalStorage, error)
NewLocalStorage creates a LocalStorage with the given base path.
func (*LocalStorage) Backend ¶
func (s *LocalStorage) Backend() string
Backend returns the backend identifier.
func (*LocalStorage) Delete ¶
func (s *LocalStorage) Delete(ctx context.Context, ref DataRef) error
Delete removes a local file.
type LoopExitReason ¶
type LoopExitReason string
LoopExitReason identifies why a Loop terminated.
const (
LoopExitCondition LoopExitReason = "condition"
LoopExitMaxIterations LoopExitReason = "max_iterations"
LoopExitCancelled LoopExitReason = "cancelled"
LoopExitContinueAsNew LoopExitReason = "continue_as_new"
)
type LoopOption ¶
type LoopOption[S any] func(*loopConfig[S])
LoopOption configures a Loop. Apply options via the Loop or ThenLoop constructors.
func AfterIteration ¶
func AfterIteration[S any](fn func(s S, fs FlowStateReader) S) LoopOption[S]
AfterIteration installs a callback that runs at the end of every iteration with read-only FlowStateReader access. Use it to merge Step outputs into S. Returns the next S (written back to the StateKey).
Runs in workflow code; must be deterministic.
func BeforeIteration ¶
func BeforeIteration[S any](fn func(s S, fs *FlowState) (S, bool)) LoopOption[S]
BeforeIteration installs a callback that runs at the start of every iteration with mutating *FlowState access. Use it to drain signals, perform pre-flight checks, and update S in workflow code. The callback returns the updated S and a skipBody flag; returning true for skipBody skips the Steps body for this iteration but still runs AfterIteration and the While predicate (useful for cancel signals).
Runs in workflow code; must be deterministic.
func ContinueAsNewAfter ¶
func ContinueAsNewAfter[S any](p ContinueAsNewPolicy) LoopOption[S]
ContinueAsNewAfter installs a continue-as-new policy. Default: never. Reserved: the policy will be consumed once ContinueAsNew (CAN) emission is implemented in a future release.
func FinalState ¶
func FinalState[S any](inject func(*FlowState, S)) LoopOption[S]
FinalState installs an optional callback that runs once after the last iteration to write the final S anywhere the caller wants (e.g. a different FlowState key for downstream Steps).
func InitialState ¶
func InitialState[S any](extract func(*FlowState) S) LoopOption[S]
InitialState supplies the function that produces the seed S for the loop. Called once before the first iteration; the result is written to the FlowState key and threaded into iteration 1's BeforeIteration / Steps. Required.
func IterationHook ¶
func IterationHook[S any](h func(IterationEvent)) LoopOption[S]
IterationHook installs a low-detail per-iteration callback for "iteration N started/completed" event emission. Distinct from FlowHooks (which observe Step/Node lifecycle) and from BeforeIteration/AfterIteration (which thread state). Runs in workflow code; must be deterministic.
func MaxIterations ¶
func MaxIterations[S any](n int) LoopOption[S]
MaxIterations sets the hard ceiling on iteration count. Default 1000.
func StateKey ¶
func StateKey[S any](key string) LoopOption[S]
StateKey sets the FlowState key where the typed state S is persisted across iterations. Required. Reads/writes happen at the start and end of each iteration; downstream code can also read it via core.Get[S].
func Steps ¶
func Steps[S any](steps ...Step) LoopOption[S]
Steps sets the per-iteration Steps body. Required. Each iteration runs these Steps in order through the standard step executor, so observability hooks (BeforeStep / AfterStep / BeforeNode / AfterNode) fire normally.
func While ¶
func While[S any](cond func(S) bool) LoopOption[S]
While sets the continuation predicate. Evaluated after each iteration (after AfterIteration). Must be deterministic over S.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics provides the global metrics recording interface.
type MetricsExporter ¶
type MetricsExporter interface {
CounterInc(name string, labels map[string]string)
HistogramObserve(name string, value float64, labels map[string]string)
}
MetricsExporter defines the interface for exporting metrics to backends. Implementations can target Prometheus, OpenTelemetry, or other systems.
func GetMetricsExporter ¶
func GetMetricsExporter() MetricsExporter
GetMetricsExporter returns the currently configured metrics exporter.
type Node ¶
type Node[I, O any] struct { // contains filtered or unexported fields }
Node represents a single Temporal Activity with typed input/output. Provider functions return ready-to-use nodes with direct struct inputs.
func NewNode ¶
func NewNode[I, O any](name string, activity func(context.Context, I) (O, error), input I) *Node[I, O]
NewNode creates a node wrapping an activity function. This is typically called by provider packages, not directly by users.
func NewNodeByName ¶
NewNodeByName creates a node that executes an activity by its registered name. Use this when the activity is registered with the worker by name (e.g., factory-created closures).
func Paginate ¶
func Paginate[T any](name string, fetcher PageFetcher[T], opts ...PaginationOption) *Node[PaginateInput, PaginateOutput[T]]
Paginate creates a Node that fetches all pages from a paginated source. The fetcher function is called repeatedly until HasMore is false or MaxPages is reached.
Example usage in a provider:
func FetchAllIssues(input FetchAllIssuesInput) *core.Node[core.PaginateInput, core.PaginateOutput[Issue]] {
fetcher := func(ctx context.Context, cursor string) (core.PageResult[Issue], error) {
startAt, _ := strconv.Atoi(cursor)
result, err := client.SearchJQL(ctx, input.JQL, startAt, 100)
if err != nil {
return core.PageResult[Issue]{}, err
}
nextCursor := ""
hasMore := startAt+len(result.Issues) < result.Total
if hasMore {
nextCursor = strconv.Itoa(startAt + len(result.Issues))
}
return core.PageResult[Issue]{Items: result.Issues, NextCursor: nextCursor, HasMore: hasMore}, nil
}
return core.Paginate("jira.FetchAllIssues", fetcher)
}
func PaginateWithConfig ¶
func PaginateWithConfig[T any, C any]( name string, fetcher ConfiguredPageFetcher[T, C], opts ...PaginationOption, ) *Node[PaginateWithInputParams[C], PaginateWithInputOutput[T, C]]
PaginateWithConfig creates a Node that fetches all pages with custom configuration. This is useful when the fetcher needs API credentials, filters, etc.
Example:
type JiraConfig struct {
BaseURL string
APIToken string
JQL string
}
func FetchAllIssues() *core.Node[core.PaginateWithInputParams[JiraConfig], core.PaginateWithInputOutput[Issue, JiraConfig]] {
return core.PaginateWithConfig[Issue, JiraConfig]("jira.FetchAllIssues",
func(ctx context.Context, cfg JiraConfig, cursor string) (core.PageResult[Issue], error) {
// ... fetch logic using cfg
})
}
func (*Node[I, O]) Compensate ¶
Compensate runs the compensation activity if one is configured.
func (*Node[I, O]) Compensation ¶
func (n *Node[I, O]) Compensation() ExecutableNode
Compensation returns the compensation node, if any.
func (*Node[I, O]) HasCompensation ¶
HasCompensation returns true if this node has a compensation handler.
func (*Node[I, O]) OnError ¶
func (n *Node[I, O]) OnError(compensation ExecutableNode) *Node[I, O]
OnError attaches a compensation node to run if subsequent steps fail (Saga pattern).
func (*Node[I, O]) RateLimiterID ¶
RateLimiterID returns the rate limiter ID for this node.
func (*Node[I, O]) WindowConfig ¶
WindowConfig returns the window configuration. Returns zero Window if not set.
func (*Node[I, O]) WithCursorUpdate ¶
WithCursorUpdate configures the node to update a cursor after successful execution. The named field is extracted from the activity output and persisted as the cursor position.
func (*Node[I, O]) WithErrorClassifier ¶
func (n *Node[I, O]) WithErrorClassifier(fn ErrorClassifier) *Node[I, O]
WithErrorClassifier sets a function to classify errors for retry decisions. Terminal errors are marked as non-retryable for Temporal.
Example:
node.WithErrorClassifier(core.HTTPErrorClassifier)
func (*Node[I, O]) WithRateLimit ¶
WithRateLimit configures rate limiting for this node. requests is the maximum number of requests allowed per duration. The rate limiter is unique to this node instance.
Example:
node := jira.FetchIssues(config).WithRateLimit(100, time.Minute)
func (*Node[I, O]) WithRetry ¶
func (n *Node[I, O]) WithRetry(policy RetryPolicy) *Node[I, O]
WithRetry configures the retry policy for this node.
func (*Node[I, O]) WithSharedRateLimit ¶
func (n *Node[I, O]) WithSharedRateLimit(limiter *SharedRateLimiter) *Node[I, O]
WithSharedRateLimit configures this node to use a shared rate limiter. Multiple nodes can share the same rate limiter to coordinate request rates.
Example:
limiter := core.NewSharedRateLimiter("jira-api", 100, time.Minute)
node1 := jira.FetchIssues(config).WithSharedRateLimit(limiter)
node2 := jira.SearchJQL(config).WithSharedRateLimit(limiter)
func (*Node[I, O]) WithTimeout ¶
WithTimeout sets the start-to-close timeout for this node.
func (*Node[I, O]) WithValidation ¶
WithValidation enables input validation using struct tags before execution. Validation tags: required, min=N, max=N, minlen=N, maxlen=N, oneof=a|b|c
Example:
type Input struct {
Name string `validate:"required"`
Age int `validate:"min=0,max=150"`
}
func (*Node[I, O]) WithWindow ¶
WithWindow configures batched/windowed processing for this node. When used in a parallel step, the framework runs the downstream pipeline per batch instead of waiting for all data.
type NoopExporter ¶
type NoopExporter struct{}
NoopExporter is a metrics exporter that does nothing. Useful for testing or when metrics are disabled.
func (*NoopExporter) CounterInc ¶
func (n *NoopExporter) CounterInc(name string, labels map[string]string)
func (*NoopExporter) HistogramObserve ¶
func (n *NoopExporter) HistogramObserve(name string, value float64, labels map[string]string)
type PageFetcher ¶
PageFetcher is the function signature for fetching a single page. It receives a cursor (empty string for first page) and returns a PageResult.
type PageResult ¶
type PageResult[T any] struct { // Items are the results from this page. Items []T // NextCursor is the cursor to fetch the next page. Empty string means no more pages. NextCursor string // HasMore indicates whether there are more pages to fetch. HasMore bool }
PageResult represents a single page of results from a paginated API.
type PaginateInput ¶
type PaginateInput struct {
// StartCursor is the initial cursor position (empty for start).
StartCursor string
}
PaginateInput is the input for a paginated activity.
type PaginateOutput ¶
type PaginateOutput[T any] struct { // Items are all collected items across all pages. Items []T // FinalCursor is the cursor after the last page fetched. FinalCursor string // PageCount is the number of pages fetched. PageCount int // TotalItems is the total count of items fetched. TotalItems int }
PaginateOutput is the output of a paginated activity.
type PaginateWithInputOutput ¶
type PaginateWithInputOutput[T any, C any] struct { PaginateOutput[T] Config C }
PaginateWithInputOutput extends PaginateOutput with the original config.
type PaginateWithInputParams ¶
type PaginateWithInputParams[C any] struct { // Config is the custom configuration passed to each page fetch. Config C // StartCursor is the initial cursor position. StartCursor string }
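PaginateWithInputParams is the input for a PaginateWithInput node, a Paginate node that also passes through custom input. Config is passed to each page fetch and StartCursor is the initial cursor position. This is useful when the fetcher needs configuration beyond just the cursor.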
type PaginationConfig ¶
type PaginationConfig struct {
// MaxPages limits the number of pages to fetch (0 = unlimited).
MaxPages int
// PageSize is a hint to the fetcher for items per page.
PageSize int
// CursorSource is the key for persisting cursor to FlowState between runs.
// Empty string means no cursor persistence.
CursorSource string
}
PaginationConfig configures pagination behavior.
type PaginationOption ¶
type PaginationOption func(*PaginationConfig)
PaginationOption configures pagination behavior.
func WithCursorSource ¶
func WithCursorSource(source string) PaginationOption
WithCursorSource enables cursor persistence to FlowState. The cursor is saved after fetching and restored on the next run.
func WithMaxPages ¶
func WithMaxPages(n int) PaginationOption
WithMaxPages limits the number of pages to fetch.
type ParallelEachOption ¶
type ParallelEachOption[I, O any] func(*parallelEachConfig[I, O])
ParallelEachOption configures a ParallelEach step. Typed in I and O.
func AfterItem ¶
func AfterItem[I, O any](fn func(ItemContext, I, O, error, FlowStateReader)) ParallelEachOption[I, O]
AfterItem installs an observability callback fired once per item, immediately after the item activity completes (whether success or error). Receives the typed item I, the typed output O (zero-value on error), the activity error (nil on success), and a read-only FlowStateReader. Runs in workflow code; must be deterministic.
func BeforeItem ¶
func BeforeItem[I, O any](fn func(ItemContext, I, FlowStateReader)) ParallelEachOption[I, O]
BeforeItem installs an observability callback fired once per item, immediately before the item activity starts. Receives the typed item I and a read-only FlowStateReader. Runs in workflow code; must be deterministic.
func From ¶
func From[I, O any](fn func(*FlowState) []I) ParallelEachOption[I, O]
From sets the per-item input deriver. Required.
The closure must be deterministic over *FlowState — it runs in workflow code.
func MaxConcurrency ¶
func MaxConcurrency[I, O any](n int) ParallelEachOption[I, O]
MaxConcurrency caps simultaneous activities. Default unbounded (0).
Useful for rate-limited downstream services.
func Merge ¶
func Merge[I, O any](fn func(*FlowState, []O)) ParallelEachOption[I, O]
Merge sets the closure that folds the per-item results back into FlowState. Required. Receives *FlowState and the ordered []O (length == len([]I)). When the item error mode is ItemErrorContinue, failed items produce zero-value O entries.
The closure must be deterministic over its inputs — it runs in workflow code.
func OnItemError ¶
func OnItemError[I, O any](m ItemErrorMode) ParallelEachOption[I, O]
OnItemError controls failed-item handling. Default ItemErrorContinue.
func ParallelBody ¶
func ParallelBody[I, O any](a ItemActivity[I, O]) ParallelEachOption[I, O]
ParallelBody sets the per-item activity. Required.
func PerItemRetry ¶
func PerItemRetry[I, O any](p RetryPolicyOverride) ParallelEachOption[I, O]
PerItemRetry overrides the activity-default retry policy for each item.
Reserved; consumed when activity-options plumbing lands in a future task.
type PersistedState ¶
type PersistedState struct {
Cursors map[string]Cursor `json:"cursors"`
Metadata map[string]string `json:"metadata,omitempty"`
Version int64 `json:"version"`
UpdatedAt time.Time `json:"updated_at"`
}
PersistedState is the data structure saved between workflow runs.
func (*PersistedState) MarshalJSON ¶
func (p *PersistedState) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler.
type PrometheusExporter ¶
type PrometheusExporter struct {
// contains filtered or unexported fields
}
PrometheusExporter implements MetricsExporter for Prometheus. It lazily creates and caches metric collectors on first use.
func NewPrometheusExporter ¶
func NewPrometheusExporter() *PrometheusExporter
NewPrometheusExporter creates a new Prometheus metrics exporter. The exporter registers metrics with the default Prometheus registry.
func (*PrometheusExporter) CounterInc ¶
func (e *PrometheusExporter) CounterInc(name string, labels map[string]string)
CounterInc increments a counter metric.
func (*PrometheusExporter) HistogramObserve ¶
func (e *PrometheusExporter) HistogramObserve(name string, value float64, labels map[string]string)
HistogramObserve records a value in a histogram metric.
type Provider ¶
type Provider interface {
Name() string
Version() string
Activities() []ActivityMeta
HealthCheck(ctx context.Context) error
}
Provider represents a collection of related activities. Providers are the building blocks of resolute workflows, each exposing a set of activities that can be used in flow definitions.
type ProviderRegistry ¶
type ProviderRegistry struct {
// contains filtered or unexported fields
}
ProviderRegistry tracks available providers and their activities.
func NewProviderRegistry ¶
func NewProviderRegistry() *ProviderRegistry
NewProviderRegistry creates a new provider registry.
func (*ProviderRegistry) Get ¶
func (r *ProviderRegistry) Get(name string) (Provider, bool)
Get returns a provider by name.
func (*ProviderRegistry) List ¶
func (r *ProviderRegistry) List() []Provider
List returns all registered providers.
func (*ProviderRegistry) Register ¶
func (r *ProviderRegistry) Register(p Provider) error
Register adds a provider to the registry. Returns an error if a provider with the same name is already registered.
func (*ProviderRegistry) RegisterActivities ¶
func (r *ProviderRegistry) RegisterActivities(w worker.Worker, providerName string) error
RegisterActivities registers all activities from a provider with a Temporal worker.
func (*ProviderRegistry) RegisterAllActivities ¶
func (r *ProviderRegistry) RegisterAllActivities(w worker.Worker)
RegisterAllActivities registers all activities from all providers with a Temporal worker.
type QueryDef ¶
type QueryDef struct {
Name string
Handler any // pre-built handler (WithQuery)
Build func(*FlowState) any // built per execution against live FlowState (WithQueryState)
}
QueryDef defines a query handler for a flow. Either Handler is set (the handler is constructed at builder time, by WithQuery) or Build is set (the handler is constructed at execute time with access to the live FlowState, by WithQueryState). Exactly one of the two is non-nil.
type RateLimitConfig ¶
RateLimitConfig holds rate limit configuration.
type RateLimiter ¶
RateLimiter controls the rate of operations.
type RecordActivityExecutionInput ¶
RecordActivityExecutionInput holds parameters for recording activity execution metrics.
type RecordFlowExecutionInput ¶
RecordFlowExecutionInput holds parameters for recording flow execution metrics.
type RecordRateLimitWaitInput ¶
RecordRateLimitWaitInput holds parameters for recording rate limit wait metrics.
type RecordStateOperationInput ¶
type RecordStateOperationInput struct {
Operation string
}
RecordStateOperationInput holds parameters for recording state operation metrics.
type RegisterActivitiesFunc ¶
RegisterActivitiesFunc is a helper type for provider packages that want to provide a simple RegisterActivities function without the full Provider interface.
type RetryPolicy ¶
type RetryPolicy struct {
InitialInterval time.Duration
BackoffCoefficient float64
MaximumInterval time.Duration
MaximumAttempts int32
}
RetryPolicy defines retry behavior for failed activities.
type RetryPolicyOverride ¶
type RetryPolicyOverride struct {
InitialInterval string // duration string, parsed when wired
BackoffCoefficient float64
MaxAttempts int
}
RetryPolicyOverride configures per-item retry behavior. Reserved for a future task; the current runner does not consume it.
type SharedRateLimiter ¶
type SharedRateLimiter struct {
// contains filtered or unexported fields
}
SharedRateLimiter represents a rate limiter that can be shared across multiple nodes.
func NewSharedRateLimiter ¶
func NewSharedRateLimiter(name string, requests int, per time.Duration) *SharedRateLimiter
NewSharedRateLimiter creates a new shared rate limiter.
func (*SharedRateLimiter) Close ¶
func (s *SharedRateLimiter) Close()
Close unregisters the rate limiter from the global registry.
func (*SharedRateLimiter) ID ¶
func (s *SharedRateLimiter) ID() string
ID returns the rate limiter's unique identifier.
type SignalBuffer ¶
type SignalBuffer struct {
// contains filtered or unexported fields
}
SignalBuffer holds buffered signal payloads per signal name. Storage is untyped at the map level because signals with different payload types share the buffer; type safety is enforced at registration (RegisterSignal[T]) and recovered at consumption (TakeSignal[T] / TakeAllSignals[T] / PeekSignal[T]).
Constructed once per FlowState by NewFlowState. Direct construction is not part of the public API; callers reach the buffer through the typed accessors above.
type SignalDef ¶
type SignalDef struct {
Name string
// contains filtered or unexported fields
}
SignalDef is the internal record of a signal registration. Construct via RegisterSignal[T] — the typed registration helper that wires the receive pump and ensures payloads enter the buffer with a known type.
type StateBackend ¶
type StateBackend interface {
Load(workflowID, flowName string) (*PersistedState, error)
Save(workflowID, flowName string, state *PersistedState) error
}
StateBackend is the interface for pluggable state persistence.
type StateConfig ¶
type StateConfig struct {
Backend StateBackend
Namespace string
}
StateConfig defines state persistence behavior.
type Step ¶
type Step struct {
// contains filtered or unexported fields
}
Step represents one execution unit within a flow. A step can contain one node (sequential), multiple nodes (parallel), a conditional branch, a gate, or child workflows.
func AsStep ¶
func AsStep(n ExecutableNode) Step
AsStep wraps any ExecutableNode as a single-node sequential Step suitable for use inside Loop's Steps option (or anywhere a Step is expected). The step name defaults to the node name.
func Loop ¶
func Loop[S any](opts ...LoopOption[S]) Step
Loop returns a Step that re-executes a Steps body while While returns true, capped by MaxIterations. State is threaded between iterations as the typed value S; the framework persists S at the FlowState key declared via StateKey before/after each iteration so observability hooks and downstream readers see a consistent value.
Determinism contract: Steps may invoke activities (no determinism constraint). The While predicate, BeforeIteration, AfterIteration, InitialState, FinalState, IterationHook, and MaxIterations all run in workflow code and must be deterministic.
Example ¶
ExampleLoop demonstrates the Steps-based Loop API: each iteration runs the noop body Step, AfterIteration mutates typed state by incrementing a counter, and the loop exits once the While predicate returns false, which happens when the counter reaches 2.
package main
import (
"context"
"fmt"
"github.com/resolute-sh/resolute/core"
)
func main() {
type tick struct{ N int }
noop := func(_ context.Context, _ struct{}) (struct{}, error) {
return struct{}{}, nil
}
noopNode := core.NewNode("noop", noop, struct{}{})
step := core.Loop[tick](
core.StateKey[tick]("ticker"),
core.InitialState[tick](func(_ *core.FlowState) tick { return tick{N: 0} }),
core.Steps[tick](core.AsStep(noopNode)),
core.AfterIteration[tick](func(s tick, _ core.FlowStateReader) tick {
s.N++
return s
}),
core.While[tick](func(s tick) bool { return s.N < 2 }),
core.MaxIterations[tick](10),
)
_ = step
fmt.Println("loop step constructed")
}
Output: loop step constructed
func ParallelEach ¶
func ParallelEach[I, O any](opts ...ParallelEachOption[I, O]) Step
ParallelEach returns a Step that fans out one activity per element of a runtime-derived list, awaits all, and folds the results back into *FlowState via the user-provided merge closure.
Concretely: From(fs) -> []I; for each I, body.Execute(ctx, I) -> O; then merge(fs, []O) writes the aggregated results back. Activities are scheduled via workflow.Go; concurrency is bounded by MaxConcurrency when set.
Determinism contract: From and Merge run in workflow code and must be deterministic functions of *FlowState. The body activity has no determinism constraint.
History event emission (ParallelEachStarted/ParallelEachCompleted markers) is planned for a future task; the current implementation only surfaces item errors via the activity error chain.
Example ¶
ExampleParallelEach demonstrates constructing a ParallelEach step that fans out one activity per element of a runtime-derived slice and folds the ordered results back into *FlowState via Merge.
The ParallelBody option is omitted here for brevity; production callers pass a real ItemActivity[I, O] whose Execute typically calls workflow.ExecuteActivity to invoke a registered activity. From and Merge run in workflow code and must be deterministic over *FlowState.
package main
import (
"fmt"
"github.com/resolute-sh/resolute/core"
)
func main() {
step := core.ParallelEach[int, int](
core.From[int, int](func(_ *core.FlowState) []int { return []int{1, 2, 3} }),
core.Merge[int, int](func(_ *core.FlowState, _ []int) {}),
core.MaxConcurrency[int, int](4),
)
_ = step
fmt.Println("parallel-each step constructed")
}
Output: parallel-each step constructed
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage provides typed helpers for storing and loading data.
func GetStorage ¶
GetStorage returns the global storage instance. Initializes with local storage on first call.
func NewStorage ¶
func NewStorage(backend StorageBackend) *Storage
NewStorage creates a new Storage with the given backend.
type StorageBackend ¶
type StorageBackend interface {
// Store saves data and returns a DataRef pointing to it.
Store(ctx context.Context, schema string, data []byte) (DataRef, error)
// Load retrieves data by its DataRef.
Load(ctx context.Context, ref DataRef) ([]byte, error)
// Delete removes data referenced by DataRef.
Delete(ctx context.Context, ref DataRef) error
// Backend returns the backend identifier.
Backend() string
}
StorageBackend defines the interface for storing and retrieving data referenced by DataRef. Implementations handle the actual persistence.
type TokenBucket ¶
type TokenBucket struct {
// contains filtered or unexported fields
}
TokenBucket implements a token bucket rate limiter. It allows bursts up to the bucket capacity while maintaining an average rate over time.
func NewTokenBucket ¶
func NewTokenBucket(requests int, per time.Duration) *TokenBucket
NewTokenBucket creates a new token bucket rate limiter. requests is the number of allowed requests per duration. The bucket starts full, allowing an initial burst.
func (*TokenBucket) Tokens ¶
func (tb *TokenBucket) Tokens() float64
Tokens returns the current number of available tokens.
func (*TokenBucket) TryAcquire ¶
func (tb *TokenBucket) TryAcquire() bool
TryAcquire attempts to acquire a token without blocking. Returns true if a token was acquired, false otherwise.
type Trigger ¶
type Trigger interface {
// Type returns the trigger type identifier.
Type() TriggerType
// Config returns the trigger-specific configuration.
Config() TriggerConfig
}
Trigger defines how a flow is initiated.
func Manual ¶
Manual creates a trigger for API-initiated flow execution.
Example:
flow := core.NewFlow("my-flow").
TriggeredBy(core.Manual("api-trigger")).
Then(myNode).
Build()
func Schedule ¶
Schedule creates a trigger for cron-scheduled flow execution. Uses standard cron expression format (minute hour day month weekday). Defaults to UTC. Use ScheduleWithTimezone for timezone-aware scheduling.
Example:
flow := core.NewFlow("daily-sync").
TriggeredBy(core.Schedule("0 2 * * *")). // Daily at 2 AM UTC
Then(syncNode).
Build()
func ScheduleWithTimezone ¶
ScheduleWithTimezone creates a trigger for cron-scheduled flow execution with an explicit IANA timezone (e.g. "America/Los_Angeles"). The timezone maps to Temporal's ScheduleSpec.TimeZoneName.
Example:
flow := core.NewFlow("morning-sync").
TriggeredBy(core.ScheduleWithTimezone("0 6 * * 1-5", "America/Los_Angeles")).
Then(syncNode).
Build()
type TriggerConfig ¶
type TriggerConfig struct {
// ID is the identifier for manual triggers (API endpoint path).
ID string
// CronSchedule is the cron expression for scheduled triggers.
CronSchedule string
// Timezone is the IANA time zone name for scheduled triggers (e.g. "America/Los_Angeles").
// Maps to Temporal's ScheduleSpec.TimeZoneName. Empty defaults to UTC.
Timezone string
// SignalName is the Temporal signal name for signal triggers.
SignalName string
// WebhookPath is the HTTP path for webhook triggers.
WebhookPath string
// WebhookMethod is the HTTP method for webhook triggers (default: POST).
WebhookMethod string
// WebhookSecret is the HMAC secret for webhook signature verification.
WebhookSecret string
}
TriggerConfig holds trigger-specific configuration.
type TriggerType ¶
type TriggerType string
TriggerType identifies the type of trigger.
const ( // TriggerManual indicates the flow is started via API call. TriggerManual TriggerType = "manual" // TriggerSchedule indicates the flow runs on a cron schedule. TriggerSchedule TriggerType = "schedule" // TriggerSignal indicates the flow starts from a Temporal signal. TriggerSignal TriggerType = "signal" )
type ValidationError ¶
ValidationError represents a single field validation failure.
func (ValidationError) Error ¶
func (e ValidationError) Error() string
type ValidationErrors ¶
type ValidationErrors []ValidationError
ValidationErrors is a collection of validation failures.
func (ValidationErrors) Error ¶
func (e ValidationErrors) Error() string
type WebhookBuilder ¶
type WebhookBuilder struct {
// contains filtered or unexported fields
}
WebhookBuilder provides a fluent API for configuring webhook triggers.
func Webhook ¶
func Webhook(path string) *WebhookBuilder
Webhook creates a trigger for HTTP webhook-initiated flow execution. The path should start with "/" and will be the endpoint that receives webhook requests.
Example:
flow := core.NewFlow("github-handler").
TriggeredBy(core.Webhook("/hooks/github").WithMethod("POST")).
Then(processWebhookNode).
Build()
func (*WebhookBuilder) Build ¶
func (b *WebhookBuilder) Build() Trigger
Build returns the configured trigger. This is called implicitly when passed to TriggeredBy.
func (*WebhookBuilder) Config ¶
func (b *WebhookBuilder) Config() TriggerConfig
Config implements Trigger interface for WebhookBuilder.
func (*WebhookBuilder) Type ¶
func (b *WebhookBuilder) Type() TriggerType
Type implements Trigger interface for WebhookBuilder.
func (*WebhookBuilder) WithMethod ¶
func (b *WebhookBuilder) WithMethod(method string) *WebhookBuilder
WithMethod sets the HTTP method for the webhook (default: POST).
func (*WebhookBuilder) WithSecret ¶
func (b *WebhookBuilder) WithSecret(secret string) *WebhookBuilder
WithSecret sets the HMAC secret for webhook signature verification. When set, incoming webhooks must include a valid X-Webhook-Signature header.
type WebhookResponse ¶
type WebhookResponse struct {
WorkflowID string `json:"workflow_id"`
RunID string `json:"run_id"`
Status string `json:"status"`
}
WebhookResponse is returned to webhook callers.
type WebhookServer ¶
type WebhookServer struct {
// contains filtered or unexported fields
}
WebhookServer handles incoming webhook requests and starts Temporal workflows.
func NewWebhookServer ¶
func NewWebhookServer(cfg WebhookServerConfig) *WebhookServer
NewWebhookServer creates a new webhook server.
func (*WebhookServer) Handler ¶
func (s *WebhookServer) Handler() http.Handler
Handler returns the HTTP handler for use with custom servers.
func (*WebhookServer) RegisterFlow ¶
func (s *WebhookServer) RegisterFlow(flow *Flow) error
RegisterFlow registers a flow with a webhook trigger. Returns an error if the flow doesn't have a webhook trigger or if the path is already registered.
func (*WebhookServer) Routes ¶
func (s *WebhookServer) Routes() []string
Routes returns the registered webhook paths.
func (*WebhookServer) Shutdown ¶
func (s *WebhookServer) Shutdown(ctx context.Context) error
Shutdown gracefully shuts down the webhook server.
func (*WebhookServer) Start ¶
func (s *WebhookServer) Start(addr string) error
Start starts the webhook server on the given address.
func (*WebhookServer) StartAsync ¶
func (s *WebhookServer) StartAsync(addr string) error
StartAsync starts the webhook server in the background. Returns immediately after the server starts listening.
type WebhookServerConfig ¶
WebhookServerConfig holds configuration for the webhook server.
type Window ¶
type Window struct {
Size int // max items per batch (0 = disabled, fetch all)
}
Window configures batched/windowed processing for a node. When attached to a node in a parallel step, the framework loops: fetch batch → run downstream pipeline → persist cursors → repeat.
type WindowMeta ¶
WindowMeta holds ephemeral window cursor and size for the current batch iteration.
type WindowedNode ¶
type WindowedNode interface {
ExecutableNode
WindowConfig() Window
}
WindowedNode extends ExecutableNode with window configuration. Nodes that implement this interface support batched/windowed processing.
type WorkerBuilder ¶
type WorkerBuilder struct {
// contains filtered or unexported fields
}
WorkerBuilder provides a fluent API for constructing and running a Temporal worker.
func NewWorker ¶
func NewWorker() *WorkerBuilder
NewWorker creates a new worker builder with environment defaults loaded.
func (*WorkerBuilder) Build ¶
func (b *WorkerBuilder) Build() error
Build creates the Temporal client and worker without starting them. This is useful for testing or custom lifecycle management. Safe to call multiple times — subsequent calls are no-ops.
func (*WorkerBuilder) Client ¶
func (b *WorkerBuilder) Client() client.Client
Client returns the underlying Temporal client after Build() has been called. Returns nil if Build() has not been called.
func (*WorkerBuilder) HealthServer ¶
func (b *WorkerBuilder) HealthServer() *HealthServer
HealthServer returns the health server if configured. Returns nil if health server is not enabled or Build() has not been called.
func (*WorkerBuilder) Run ¶
func (b *WorkerBuilder) Run() error
Run builds and runs the worker, blocking until interrupted. This is the typical entry point for a worker process.
func (*WorkerBuilder) RunAsync ¶
func (b *WorkerBuilder) RunAsync() (shutdown func(), err error)
RunAsync builds and starts the worker in the background. Returns a shutdown function that should be called to stop the worker.
func (*WorkerBuilder) WebhookServer ¶
func (b *WorkerBuilder) WebhookServer() *WebhookServer
WebhookServer returns the webhook server if configured. Returns nil if webhook server is not enabled or Build() has not been called.
func (*WorkerBuilder) WithConfig ¶
func (b *WorkerBuilder) WithConfig(cfg WorkerConfig) *WorkerBuilder
WithConfig sets the worker configuration. Empty fields will be populated from environment variables or defaults.
func (*WorkerBuilder) WithFlow ¶
func (b *WorkerBuilder) WithFlow(f *Flow) *WorkerBuilder
WithFlow adds a flow to be registered with this worker. Multiple flows can be registered by calling WithFlow multiple times.
func (*WorkerBuilder) WithHealthServer ¶
func (b *WorkerBuilder) WithHealthServer(addr string) *WorkerBuilder
WithHealthServer enables Kubernetes-compatible health endpoints on the specified address. Provides /health/live, /health/ready, and /health/startup endpoints.
Example:
err := core.NewWorker().
WithConfig(cfg).
WithFlow(flow).
WithHealthServer(":8081").
Run()
func (*WorkerBuilder) WithMetrics ¶
func (b *WorkerBuilder) WithMetrics(exporter MetricsExporter) *WorkerBuilder
WithMetrics enables metrics collection with the provided exporter. Metrics are recorded for flow executions, activity durations, errors, and rate limiting.
Example:
err := core.NewWorker().
WithConfig(cfg).
WithFlow(flow).
WithMetrics(core.NewPrometheusExporter()).
Run()
func (*WorkerBuilder) WithProviders ¶
func (b *WorkerBuilder) WithProviders(providers ...Provider) *WorkerBuilder
WithProviders adds providers whose activities will be registered with the worker.
func (*WorkerBuilder) WithWebhookServer ¶
func (b *WorkerBuilder) WithWebhookServer(addr string) *WorkerBuilder
WithWebhookServer enables the webhook server on the specified address. If a registered flow has a webhook trigger, incoming webhooks will start workflow executions.
Example:
err := core.NewWorker().
WithConfig(cfg).
WithFlow(flow).
WithWebhookServer(":8080").
Run()
func (*WorkerBuilder) Worker ¶
func (b *WorkerBuilder) Worker() worker.Worker
Worker returns the underlying Temporal worker after Build() has been called. Returns nil if Build() has not been called.
type WorkerConfig ¶
type WorkerConfig struct {
TemporalHost string // Default: TEMPORAL_HOST env or "localhost:7233"
TaskQueue string // Required - no default
Namespace string // Default: TEMPORAL_NAMESPACE env or "default"
MaxConcurrent int // Default: 0 (unlimited)
}
WorkerConfig holds configuration for connecting to Temporal and running the worker.
func (*WorkerConfig) Validate ¶
func (c *WorkerConfig) Validate() error
Validate checks that required fields are set.
Source Files
¶
- childflow.go
- compensation.go
- conditional.go
- config.go
- cursor.go
- dataref.go
- errors.go
- executable.go
- flow.go
- gate.go
- health.go
- hooks.go
- loop.go
- metrics.go
- metrics_prometheus.go
- node.go
- pagination.go
- parallel_each.go
- provider.go
- ratelimit.go
- signal_buffer.go
- signals.go
- state.go
- storage.go
- template.go
- testing.go
- trigger.go
- validation.go
- webhook.go
- worker.go