core

package
v0.7.0-alpha
Published: May 10, 2026 License: MIT Imports: 29 Imported by: 7

Documentation

Overview

Package core provides the fundamental primitives for building resolute workflows.

Constants

const (
	BackendLocal = "local"
	BackendS3    = "s3"
	BackendGCS   = "gcs"
)

Backend identifiers

const (
	// TriggerWebhook indicates the flow is started via HTTP webhook.
	TriggerWebhook TriggerType = "webhook"

	// DefaultWebhookMethod is the default HTTP method for webhooks.
	DefaultWebhookMethod = "POST"

	// WebhookSignatureHeader is the header containing the HMAC signature.
	WebhookSignatureHeader = "X-Webhook-Signature"
)

Functions

func ClearMetricsExporter

func ClearMetricsExporter()

ClearMetricsExporter removes the global metrics exporter (for testing).

func ComputeWebhookSignature

func ComputeWebhookSignature(payload []byte, secret string) string

ComputeWebhookSignature computes the HMAC signature of payload keyed with secret. It is intended for tests and for external webhook senders that need to produce a matching signature.
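
The following is a minimal sketch of how a sender and receiver might sign and verify a payload. It assumes HMAC-SHA256 with hex encoding; the package documentation does not state the hash or encoding, so signPayload and verifyPayload are hypothetical stand-ins rather than the package's implementation.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// signPayload is a hypothetical stand-in: HMAC-SHA256 over the payload,
// hex-encoded. The real algorithm and encoding are assumptions here.
func signPayload(payload []byte, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(payload)
	return hex.EncodeToString(mac.Sum(nil))
}

// verifyPayload recomputes the signature and compares it in constant time.
func verifyPayload(payload []byte, secret, signature string) bool {
	expected := signPayload(payload, secret)
	return hmac.Equal([]byte(expected), []byte(signature))
}

func main() {
	body := []byte(`{"event":"push"}`)
	sig := signPayload(body, "s3cret")

	fmt.Println(verifyPayload(body, "s3cret", sig))               // true
	fmt.Println(verifyPayload([]byte("tampered"), "s3cret", sig)) // false
}
```

A sender would place the resulting string in the header named by WebhookSignatureHeader. The constant-time comparison avoids leaking how many leading characters of a guessed signature were correct.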

func CursorFor

func CursorFor(source string) *time.Time

CursorFor creates a magic marker that resolves to the persisted cursor for a source. The framework replaces this with the actual cursor value at execution time.

Example:

jira.FetchIssues(jira.Input{
    Project: "PLATFORM",
    Since:   core.CursorFor("jira"),
})

func CursorString

func CursorString(source string) string

CursorString creates a string marker for cursor reference (for string fields).

func ExampleActivity

func ExampleActivity(ctx context.Context, input struct{}) (struct{}, error)

ExampleActivity is a sample activity function signature for documentation. All activities must follow this pattern: func(context.Context, Input) (Output, error)

func Get

func Get[T any](s FlowStateReader, key string) T

Get retrieves a typed value from results. Panics if the key doesn't exist or type doesn't match.

func GetOr

func GetOr[T any](s FlowStateReader, key string, defaultVal T) T

GetOr retrieves a typed value from results with a default fallback.

func GetSafe

func GetSafe[T any](s FlowStateReader, key string) (T, error)

GetSafe retrieves a typed value from results with error handling. Returns an error if the key doesn't exist or type doesn't match.
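
The difference between the error-returning and default-returning accessors can be sketched with a local stand-in. The results type, getSafe, and getOr below are hypothetical and only mirror the documented contracts. In particular, falling back to the default on a type mismatch (not only on a missing key) is an assumption.

```go
package main

import "fmt"

// results is a hypothetical stand-in for the data a FlowStateReader exposes.
type results map[string]any

// getSafe mirrors the documented GetSafe contract: it returns an error
// when the key is missing or the stored value is not a T.
func getSafe[T any](r results, key string) (T, error) {
	var zero T
	v, ok := r[key]
	if !ok {
		return zero, fmt.Errorf("key %q not found", key)
	}
	t, ok := v.(T)
	if !ok {
		return zero, fmt.Errorf("key %q holds %T, not %T", key, v, zero)
	}
	return t, nil
}

// getOr returns def whenever getSafe would fail (assumed behavior).
func getOr[T any](r results, key string, def T) T {
	if v, err := getSafe[T](r, key); err == nil {
		return v
	}
	return def
}

func main() {
	r := results{"count": 3}

	n, err := getSafe[int](r, "count")
	fmt.Println(n, err)

	_, err = getSafe[string](r, "count")
	fmt.Println(err)

	fmt.Println(getOr(r, "missing", 10))
}
```

In workflow code the error-returning form is usually the safer choice, since a panic inside a workflow is harder to diagnose than a returned error.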

func GetWebhookHeaders

func GetWebhookHeaders(input FlowInput) http.Header

GetWebhookHeaders extracts the webhook headers from FlowInput.

func GetWebhookPayload

func GetWebhookPayload(input FlowInput) []byte

GetWebhookPayload extracts the webhook payload from FlowInput.

func Has

func Has(s FlowStateReader, key string) bool

Has returns true if the key exists in state results, even if the stored value is nil.

func InjectSignal

func InjectSignal[T any](fs *FlowState, name string, payload T)

InjectSignal appends a typed payload directly to the buffer. Used by tests to simulate signal arrival without driving the workflow signal channel.

Production code should never call this — production payloads enter the buffer via the receive pump installed by RegisterSignal[T].

func InputData

func InputData(key string) string

InputData creates a magic marker that resolves to a value from the flow's initial input. Webhook-triggered flows store payload data in FlowInput; this marker provides access from activity input structs.

Example:

bitbucket.ParseWebhook(bitbucket.ParseWebhookInput{
    RawPayload: core.InputData("webhook_payload"),
})

func IsFatalError

func IsFatalError(err error) bool

IsFatalError checks if an error has been classified as fatal.

func IsOutputRefMarker

func IsOutputRefMarker(ref DataRef) bool

IsOutputRefMarker returns true if the DataRef is a marker for output resolution.

func IsTerminalError

func IsTerminalError(err error) bool

IsTerminalError checks if an error has been classified as terminal.

func Keys

func Keys(s FlowStateReader) []string

Keys returns all keys in state results, sorted alphabetically. Returns nil when the underlying reader is not a *FlowState — exposing enumeration on the bare interface would force every adapter to implement it, and hooks rarely need to enumerate.

func LoadConfig

func LoadConfig[T any](prefix string) (T, error)

LoadConfig loads configuration from environment variables into a struct. The prefix, followed by an underscore, is prepended to every environment variable name.

Supported struct tags:

  • env:"VAR_NAME" - the environment variable suffix (read as PREFIX_VAR_NAME)
  • required:"true" - fail if the variable is not set
  • default:"value" - use this value if the variable is not set

Supported field types: string, int, int64, bool, float64, time.Duration

Example:

type Config struct {
    BaseURL string        `env:"BASE_URL" required:"true"`
    Token   string        `env:"TOKEN" required:"true"`
    Timeout time.Duration `env:"TIMEOUT" default:"30s"`
    Retries int           `env:"RETRIES" default:"3"`
}

cfg, err := LoadConfig[Config]("JIRA")
// Reads JIRA_BASE_URL, JIRA_TOKEN, JIRA_TIMEOUT, JIRA_RETRIES
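
The lookup order implied by the tags can be sketched for a single string field. The resolve helper is hypothetical, and the precedence it applies (set value, then default, then the required check) is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"os"
)

// resolve is a hypothetical helper showing one plausible lookup order for
// a single field: environment value, then default, then required check.
// getenv is injected so the logic can be exercised without touching the
// process environment.
func resolve(getenv func(string) (string, bool), prefix, suffix, def string, required bool) (string, error) {
	name := prefix + "_" + suffix
	if v, ok := getenv(name); ok {
		return v, nil
	}
	if def != "" {
		return def, nil
	}
	if required {
		return "", fmt.Errorf("%s is required but not set", name)
	}
	return "", nil
}

func main() {
	os.Setenv("JIRA_TOKEN", "abc")

	token, err := resolve(os.LookupEnv, "JIRA", "TOKEN", "", true)
	fmt.Println(token, err)

	timeout, err := resolve(os.LookupEnv, "JIRA", "TIMEOUT", "30s", false)
	fmt.Println(timeout, err)
}
```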

func MustGet

func MustGet[T any](s FlowStateReader, key string) T

MustGet retrieves a typed value from results. Panics with a descriptive error if the key doesn't exist or type doesn't match.

func MustLoadConfig

func MustLoadConfig[T any](prefix string) T

MustLoadConfig loads configuration and panics if there's an error. Use this in main() or init() where you want to fail fast.

func Output

func Output(path string) string

Output creates a magic marker that resolves to a previous node's output. The framework replaces this with the actual value at execution time.

Example:

gcp.CreateSubnet(gcp.SubnetInput{
    VPC: core.Output("vpc.name"),
})

func ParseWebhookPayload

func ParseWebhookPayload[T any](input FlowInput) (T, error)

ParseWebhookPayload parses the webhook payload into a typed struct.

func PeekSignal

func PeekSignal[T any](fs *FlowState, name string) (T, bool)

PeekSignal returns the oldest buffered payload for `name`, typed as T, without removing it. Returns (zero, false) if no payload is buffered.

Panics under the same condition as TakeSignal.

func RateLimitWaitActivity

func RateLimitWaitActivity(ctx context.Context, limiterID string) error

RateLimitWaitActivity is a local activity that waits on a rate limiter. This is used to rate limit workflow activities without violating determinism.

func RecordActivityExecution

func RecordActivityExecution(input RecordActivityExecutionInput)

RecordActivityExecution records metrics for a completed activity execution.

func RecordFlowExecution

func RecordFlowExecution(input RecordFlowExecutionInput)

RecordFlowExecution records metrics for a completed flow execution.

func RecordRateLimitWait

func RecordRateLimitWait(input RecordRateLimitWaitInput)

RecordRateLimitWait records metrics for rate limiter wait time.

func RecordStateOperation

func RecordStateOperation(input RecordStateOperationInput)

RecordStateOperation records metrics for state backend operations.

func RegisterProviderActivities

func RegisterProviderActivities(w worker.Worker, p Provider)

RegisterProviderActivities is a helper function to register activities from a provider directly with a Temporal worker without using the registry.

func ResolveCursorRef

func ResolveCursorRef(source string, state *FlowState) *time.Time

ResolveCursorRef looks up a cursor value from the flow state.

func ResolveOutputRef

func ResolveOutputRef(path string, state *FlowState) (string, bool)

ResolveOutputRef looks up a previous node's output from the flow state.

func ResolveStringRef

func ResolveStringRef(s string, state *FlowState) (string, error)

ResolveStringRef resolves a string that may contain output/input/cursor markers.

func Set

func Set[T any](s *FlowState, key string, value T)

Set stores a typed value in results. Mutating accessor — takes *FlowState, not FlowStateReader, by design (hooks observe; they do not mutate).

func SetDefaultBackend

func SetDefaultBackend(b StateBackend)

SetDefaultBackend allows overriding the default backend (for testing).

func SetMetricsExporter

func SetMetricsExporter(exporter MetricsExporter)

SetMetricsExporter configures the global metrics exporter. Call this during application initialization before starting workflows.

func SetStorage

func SetStorage(s *Storage)

SetStorage sets the global storage instance. Call this early in initialization to use a custom backend.

func SignalCount

func SignalCount(fs *FlowState, name string) int

SignalCount returns the number of buffered payloads for `name`.

func TakeAllSignals

func TakeAllSignals[T any](fs *FlowState, name string) []T

TakeAllSignals returns all buffered payloads for `name`, typed as T, and clears the buffer. Returns nil when no payloads are buffered.

Panics under the same condition as TakeSignal — caller-provided T must match the registered T for `name`.

func TakeSignal

func TakeSignal[T any](fs *FlowState, name string) (T, bool)

TakeSignal removes and returns the oldest buffered payload for `name`, typed as T. Returns (zero, false) if no payload is buffered.

Panics if the buffered payload is not assertable to T. By construction (RegisterSignal[T] is the only public injection path), this can only occur if a signal name was registered with a different T than the caller requests — i.e. programmer error, surfaced loudly per Go convention.
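
The first-in, first-out semantics shared by PeekSignal, TakeSignal, and TakeAllSignals can be sketched with a local buffer. The buffer type and its helpers are hypothetical stand-ins for the per-name buffer held by FlowState; they illustrate the documented behavior and are not the package's implementation.

```go
package main

import "fmt"

// buffer is a hypothetical stand-in for one signal name's payload queue.
type buffer struct{ items []any }

// inject appends a payload, as a receive pump or a test would.
func (b *buffer) inject(v any) { b.items = append(b.items, v) }

// peek returns the oldest payload without removing it.
// The type assertion panics on a mismatched T, as documented for TakeSignal.
func peek[T any](b *buffer) (T, bool) {
	var zero T
	if len(b.items) == 0 {
		return zero, false
	}
	return b.items[0].(T), true
}

// take removes and returns the oldest payload.
func take[T any](b *buffer) (T, bool) {
	v, ok := peek[T](b)
	if ok {
		b.items = b.items[1:]
	}
	return v, ok
}

// takeAll drains the buffer, returning nil when it is empty.
func takeAll[T any](b *buffer) []T {
	if len(b.items) == 0 {
		return nil
	}
	out := make([]T, 0, len(b.items))
	for _, it := range b.items {
		out = append(out, it.(T))
	}
	b.items = nil
	return out
}

func main() {
	b := &buffer{}
	b.inject("approve")
	b.inject("reject")
	b.inject("retry")

	first, _ := take[string](b)
	fmt.Println(first, len(b.items))
	fmt.Println(takeAll[string](b), len(b.items))
}
```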

func Validate

func Validate(v interface{}) error

Validate checks struct fields against validate tags. Supported tags: required, min=N, max=N, minlen=N, maxlen=N, oneof=a|b|c
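
A tagged input struct might look like the one below. The validateStrings function is a deliberately simplified, hypothetical stand-in that handles only required and oneof on string fields. It also assumes that multiple rules in one tag are comma-separated, which the documentation above does not state.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// CreateInput is a hypothetical activity input annotated with validate tags.
type CreateInput struct {
	Project  string `validate:"required"`
	Priority string `validate:"required,oneof=low|medium|high"`
}

// contains reports whether want appears in list.
func contains(list []string, want string) bool {
	for _, s := range list {
		if s == want {
			return true
		}
	}
	return false
}

// validateStrings is a simplified stand-in: it expects a struct value and
// checks only `required` and `oneof=...` rules on string fields.
func validateStrings(v any) error {
	rv := reflect.ValueOf(v)
	rt := rv.Type()
	for i := 0; i < rt.NumField(); i++ {
		f := rt.Field(i)
		if f.Type.Kind() != reflect.String {
			continue
		}
		val := rv.Field(i).String()
		for _, rule := range strings.Split(f.Tag.Get("validate"), ",") {
			switch {
			case rule == "required" && val == "":
				return fmt.Errorf("%s is required", f.Name)
			case strings.HasPrefix(rule, "oneof="):
				allowed := strings.Split(strings.TrimPrefix(rule, "oneof="), "|")
				if !contains(allowed, val) {
					return fmt.Errorf("%s must be one of %v", f.Name, allowed)
				}
			}
		}
	}
	return nil
}

func main() {
	fmt.Println(validateStrings(CreateInput{Project: "PLATFORM", Priority: "high"}))
	fmt.Println(validateStrings(CreateInput{Priority: "high"}))
	fmt.Println(validateStrings(CreateInput{Project: "PLATFORM", Priority: "urgent"}))
}
```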

func ValidateActivityFunc

func ValidateActivityFunc(fn ActivityFunc) error

ValidateActivityFunc checks whether a function has a valid activity signature. The only valid form is func(context.Context, I) (O, error), where I and O are the input and output types.

func WrapFlowError

func WrapFlowError(flowName, stepName, nodeName string, input interface{}, err error) error

WrapFlowError wraps an error with flow execution context. Returns nil if err is nil.

func WrapNodeError

func WrapNodeError(nodeName string, input interface{}, err error) error

WrapNodeError wraps an error with node execution context. Returns nil if err is nil.

Types

type ActivityFunc

type ActivityFunc interface{}

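ActivityFunc is the type accepted for all activity implementations. It is a named type rather than a bare `any`, which documents intent at call sites. Because its underlying type is interface{}, the compiler accepts any value; use ValidateActivityFunc to check the signature at runtime.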

type ActivityMeta

type ActivityMeta struct {
	Name        string
	Description string
	Function    ActivityFunc
}

ActivityMeta describes a single activity for registration and discovery.

type ActivityOptions

type ActivityOptions struct {
	RetryPolicy         *RetryPolicy
	StartToCloseTimeout time.Duration
	HeartbeatTimeout    time.Duration
	TaskQueue           string
}

ActivityOptions configures retry and timeout behavior for a node.

func DefaultActivityOptions

func DefaultActivityOptions() ActivityOptions

DefaultActivityOptions returns sensible defaults for activity execution.

type ActivityRegistrar

type ActivityRegistrar interface {
	RegisterActivities(w worker.Worker)
}

ActivityRegistrar is implemented by types that can register their activities.

type BaseProvider

type BaseProvider struct {
	ProviderName    string
	ProviderVersion string
	ActivityList    []ActivityMeta
	RateLimiter     *SharedRateLimiter
	// contains filtered or unexported fields
}

BaseProvider provides a default implementation of the Provider interface. Embed this in provider implementations to reduce boilerplate.

func NewProvider

func NewProvider(name, version string) *BaseProvider

NewProvider creates a new base provider with the given name and version.

func (*BaseProvider) Activities

func (p *BaseProvider) Activities() []ActivityMeta

Activities returns the list of activities.

func (*BaseProvider) AddActivity

func (p *BaseProvider) AddActivity(name string, fn ActivityFunc) *BaseProvider

AddActivity adds an activity to the provider.

func (*BaseProvider) AddActivityWithDescription

func (p *BaseProvider) AddActivityWithDescription(name, description string, fn ActivityFunc) *BaseProvider

AddActivityWithDescription adds an activity with a description.

func (*BaseProvider) GetRateLimiter

func (p *BaseProvider) GetRateLimiter() *SharedRateLimiter

GetRateLimiter returns the provider's rate limiter, if configured.

func (*BaseProvider) HealthCheck

func (p *BaseProvider) HealthCheck(ctx context.Context) error

HealthCheck verifies provider connectivity and configuration. Returns nil if healthy, or an error describing the failure.

func (*BaseProvider) Name

func (p *BaseProvider) Name() string

Name returns the provider name.

func (*BaseProvider) Version

func (p *BaseProvider) Version() string

Version returns the provider version.

func (*BaseProvider) WithHealthCheck

func (p *BaseProvider) WithHealthCheck(fn HealthCheckFunc) *BaseProvider

WithHealthCheck configures a health check function for the provider. The health check is called during worker startup to validate configuration.

Example:

provider := core.NewProvider("jira", "1.0.0").
    WithHealthCheck(func(ctx context.Context) error {
        _, err := client.GetServerInfo(ctx)
        return err
    })

func (*BaseProvider) WithRateLimit

func (p *BaseProvider) WithRateLimit(requests int, per time.Duration) *BaseProvider

WithRateLimit configures a shared rate limiter for all activities in this provider. All activities registered through this provider share the same rate limit, which helps avoid overwhelming external APIs.

Example:

jiraProvider := jira.NewProvider().WithRateLimit(100, time.Minute)

type ChildFlowConfig

type ChildFlowConfig struct {
	Flow        *Flow
	InputMapper func(*FlowState) []FlowInput
	Sequential  bool
}

ChildFlowConfig defines how child workflows are spawned from a parent flow.

type ChildFlowNode

type ChildFlowNode struct {
	// contains filtered or unexported fields
}

ChildFlowNode spawns child workflows from a parent flow.

func NewChildFlowNode

func NewChildFlowNode(name string, config ChildFlowConfig) *ChildFlowNode

NewChildFlowNode creates a child flow node with the given name and configuration.

func (*ChildFlowNode) As

func (c *ChildFlowNode) As(key string) *ChildFlowNode

As names the output of this child flow node for reference by downstream nodes.

func (*ChildFlowNode) Compensate

func (c *ChildFlowNode) Compensate(_ workflow.Context, _ *FlowState) error

Compensate is a no-op for child flow nodes.

func (*ChildFlowNode) Compensation

func (c *ChildFlowNode) Compensation() ExecutableNode

Compensation returns nil.

func (*ChildFlowNode) Execute

func (c *ChildFlowNode) Execute(ctx workflow.Context, state *FlowState) error

Execute spawns child workflows using workflow.ExecuteChildWorkflow. Parallel by default; sequential if config.Sequential is true.

func (*ChildFlowNode) HasCompensation

func (c *ChildFlowNode) HasCompensation() bool

HasCompensation returns false — child flow nodes have no compensation.

func (*ChildFlowNode) Input

func (c *ChildFlowNode) Input() interface{}

Input returns nil — child flow nodes derive input from InputMapper.

func (*ChildFlowNode) Name

func (c *ChildFlowNode) Name() string

Name returns the child flow node's identifier.

func (*ChildFlowNode) OutputKey

func (c *ChildFlowNode) OutputKey() string

OutputKey returns the key used to store child flow results in FlowState.

func (*ChildFlowNode) RateLimiterID

func (c *ChildFlowNode) RateLimiterID() string

RateLimiterID returns empty — child flows don't use rate limiting.

type ChildFlowResults

type ChildFlowResults struct {
	States []*FlowState
	Errors []error
	Count  int
}

ChildFlowResults holds the outcomes of all spawned child workflows.

type ClassifiedError

type ClassifiedError struct {
	Err  error
	Type ErrorType
}

ClassifiedError wraps an error with its classification.

func (*ClassifiedError) Error

func (e *ClassifiedError) Error() string

func (*ClassifiedError) IsRetryable

func (e *ClassifiedError) IsRetryable() bool

IsRetryable returns true if the error should be retried.

func (*ClassifiedError) Unwrap

func (e *ClassifiedError) Unwrap() error

type CompensationChain

type CompensationChain struct {
	// contains filtered or unexported fields
}

CompensationChain manages the ordered list of compensation entries. Compensations are executed in reverse order (LIFO) on failure.
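
The LIFO ordering can be sketched with a local type that records step names instead of nodes. The chain type and the step names are hypothetical; only the ordering rule comes from the description above.

```go
package main

import "fmt"

// chain is a hypothetical stand-in that records step names, not nodes.
type chain struct{ entries []string }

// add records a completed step in execution order.
func (c *chain) add(step string) { c.entries = append(c.entries, step) }

// reversed returns a new slice with the newest entry first,
// leaving the recorded order untouched.
func (c *chain) reversed() []string {
	out := make([]string, 0, len(c.entries))
	for i := len(c.entries) - 1; i >= 0; i-- {
		out = append(out, c.entries[i])
	}
	return out
}

func main() {
	c := &chain{}
	for _, step := range []string{"create-vpc", "create-subnet", "create-vm"} {
		c.add(step)
	}

	// Suppose the next step fails: undo completed steps, newest first.
	for _, step := range c.reversed() {
		fmt.Println("compensating", step)
	}
}
```

Undoing in reverse order matters when later steps depend on earlier ones: here the VM is removed before the subnet it lives in, and the subnet before its VPC.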

func NewCompensationChain

func NewCompensationChain() *CompensationChain

NewCompensationChain creates an empty compensation chain.

func (*CompensationChain) Add

func (c *CompensationChain) Add(node ExecutableNode, state *FlowState)

Add records a new compensation entry.

func (*CompensationChain) Clear

func (c *CompensationChain) Clear()

Clear removes all compensation entries (called after successful completion).

func (*CompensationChain) Entries

func (c *CompensationChain) Entries() []CompensationEntry

Entries returns all compensation entries in execution order.

func (*CompensationChain) Len

func (c *CompensationChain) Len() int

Len returns the number of compensation entries.

func (*CompensationChain) Reversed

func (c *CompensationChain) Reversed() []CompensationEntry

Reversed returns compensation entries in reverse order for rollback.

type CompensationEntry

type CompensationEntry struct {
	// contains filtered or unexported fields
}

CompensationEntry records a node execution for potential rollback. Used by the Saga pattern to track which compensations need to run on failure.

type ConditionalBuilder

type ConditionalBuilder struct {
	// contains filtered or unexported fields
}

ConditionalBuilder provides a fluent API for building conditional branches.

func (*ConditionalBuilder) Else

Else switches to building the "else" branch (when predicate is false). After calling Else, subsequent Then/ThenParallel calls add to the else branch.

func (*ConditionalBuilder) EndWhen

func (cb *ConditionalBuilder) EndWhen() *FlowBuilder

EndWhen completes the conditional block and returns to the main flow builder. Use this when you don't need an else branch.

func (*ConditionalBuilder) Otherwise

func (cb *ConditionalBuilder) Otherwise(node ExecutableNode) *FlowBuilder

Otherwise adds a single node to the "else" branch and returns to the main flow builder. This is a convenience method for simple conditionals with a single else action.

func (*ConditionalBuilder) OtherwiseParallel

func (cb *ConditionalBuilder) OtherwiseParallel(name string, nodes ...ExecutableNode) *FlowBuilder

OtherwiseParallel adds parallel nodes to the "else" branch and returns to the main flow builder.

func (*ConditionalBuilder) Then

Then adds a sequential step to the "then" branch (when predicate is true).

func (*ConditionalBuilder) ThenParallel

func (cb *ConditionalBuilder) ThenParallel(name string, nodes ...ExecutableNode) *ConditionalBuilder

ThenParallel adds a parallel step to the current branch.

type ConditionalConfig

type ConditionalConfig struct {
	// contains filtered or unexported fields
}

ConditionalConfig defines the structure of a conditional step.

type ConfigError

type ConfigError struct {
	Field   string
	EnvVar  string
	Message string
}

ConfigError represents a configuration loading error.

func (*ConfigError) Error

func (e *ConfigError) Error() string

type ConfiguredPageFetcher

type ConfiguredPageFetcher[T any, C any] func(ctx context.Context, config C, cursor string) (PageResult[T], error)

ConfiguredPageFetcher is a page fetcher that also receives custom configuration.

type ContinueAsNewPolicy

type ContinueAsNewPolicy struct {
	AfterIterations   int
	AfterHistoryBytes int64
}

ContinueAsNewPolicy controls when the loop emits ContinueAsNew.

type CostEntry

type CostEntry struct {
	NodeName  string
	Model     string
	Provider  string
	TokensIn  int
	TokensOut int
	CostUSD   float64
	Duration  time.Duration
	Metadata  map[string]string
}

CostEntry represents a cost event derived from a node output (typically an LLM call). Emitted by user-supplied AfterNode hooks; the framework itself does not produce CostEntry values.

type Cursor

type Cursor struct {
	Source    string    `json:"source"`
	Position  string    `json:"position"`
	UpdatedAt time.Time `json:"updated_at"`
}

Cursor tracks incremental processing position for a data source.

func (Cursor) Time

func (c Cursor) Time() (time.Time, error)

Time parses the cursor position as a time.Time value.

func (Cursor) TimeOr

func (c Cursor) TimeOr(def time.Time) time.Time

TimeOr parses the cursor position as time.Time, returning def on error.
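
A fallback parse of this kind can be sketched as follows. The local cursor type is a hypothetical stand-in, and the RFC 3339 layout is an assumption: the documentation does not say which time format the position uses.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is an arbitrary default used by the example.
var epoch = time.Unix(0, 0).UTC()

// cursor is a hypothetical stand-in holding only the position string.
type cursor struct{ Position string }

// timeOr parses Position as RFC 3339 (an assumed layout) and returns
// def when the position is empty or malformed.
func (c cursor) timeOr(def time.Time) time.Time {
	t, err := time.Parse(time.RFC3339, c.Position)
	if err != nil {
		return def
	}
	return t
}

func main() {
	stored := cursor{Position: "2026-05-10T12:00:00Z"}
	fresh := cursor{} // no position persisted yet

	fmt.Println(stored.timeOr(epoch))
	fmt.Println(fresh.timeOr(epoch))
}
```

The default is useful on a first run, when no cursor has been persisted and the flow should start from a known point in time.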

type CursorMarker

type CursorMarker struct {
	// contains filtered or unexported fields
}

CursorMarker is a special time value that acts as a placeholder for cursor resolution. The marker contains a unique ID that maps to the cursor source.

type CursorUpdateConfig

type CursorUpdateConfig struct {
	Source string
	Field  string
}

CursorUpdateConfig defines how a node updates a cursor after execution.

type DataRef

type DataRef struct {
	StorageKey string    `json:"storage_key"`
	Schema     string    `json:"schema"`
	Count      int       `json:"count"`
	Checksum   string    `json:"checksum,omitempty"`
	Backend    string    `json:"backend"`
	CreatedAt  time.Time `json:"created_at"`
}

DataRef is a reference to data stored in an external storage backend. It enables the Claim Check pattern for passing large data between activities without bloating Temporal's workflow history.
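
The Claim Check pattern can be sketched with an in-memory store: the payload is written to storage, and only a small reference travels between steps. The ref and store types are hypothetical stand-ins, and SHA-256 with hex encoding is an assumed checksum scheme.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
)

// ref is a hypothetical, trimmed-down analogue of DataRef.
type ref struct {
	StorageKey string
	Checksum   string
}

// store is an in-memory stand-in for an external backend such as S3 or GCS.
type store map[string][]byte

// checksum returns the hex-encoded SHA-256 digest (assumed scheme).
func checksum(data []byte) string {
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:])
}

// put writes the payload and returns only a small reference to it.
func (s store) put(key string, data []byte) ref {
	s[key] = data
	return ref{StorageKey: key, Checksum: checksum(data)}
}

// get resolves a reference and verifies the content against its checksum.
func (s store) get(r ref) ([]byte, error) {
	data, ok := s[r.StorageKey]
	if !ok {
		return nil, errors.New("storage key not found")
	}
	if checksum(data) != r.Checksum {
		return nil, errors.New("checksum mismatch")
	}
	return data, nil
}

func main() {
	s := store{}
	r := s.put("jira/issues/batch-1", []byte(`[{"key":"PLATFORM-1"}]`))

	// Only r would be recorded in workflow history; the payload stays in s.
	data, err := s.get(r)
	fmt.Println(string(data), err)
}
```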

func NewDataRef

func NewDataRef(storageKey, schema, backend string, count int) DataRef

NewDataRef creates a new DataRef with the given parameters.

func OutputRef

func OutputRef(nodeKey string) DataRef

OutputRef creates a DataRef marker that resolves to a previous node's DataRef output. The framework replaces this with the actual DataRef at execution time.

Example:

ollama.BatchEmbed(ollama.BatchEmbedInput{
    DocumentsRef: core.OutputRef("jira_docs"),
})

func WindowOutput

func WindowOutput() DataRef

WindowOutput creates a DataRef marker that resolves to the current windowed batch's output. In windowed execution, the framework stores each batch's output under the "__window__" key.

func (DataRef) IsEmpty

func (r DataRef) IsEmpty() bool

IsEmpty returns true if the DataRef has no storage key.

func (DataRef) String

func (r DataRef) String() string

String returns a human-readable representation of the DataRef.

func (DataRef) Validate

func (r DataRef) Validate() error

Validate checks that the DataRef has required fields.

func (DataRef) WithChecksum

func (r DataRef) WithChecksum(data []byte) DataRef

WithChecksum adds a checksum to the DataRef for content verification.

type ErrorClassifier

type ErrorClassifier func(error) ErrorType

ErrorClassifier determines how an error should be handled for retries.

type ErrorType

type ErrorType int

ErrorType classifies errors for retry decisions.

const (
	// ErrorTypeRetryable indicates the error is transient (network timeouts, 5xx).
	ErrorTypeRetryable ErrorType = iota
	// ErrorTypeTerminal indicates a permanent failure (4xx auth/validation errors).
	ErrorTypeTerminal
	// ErrorTypeFatal indicates an unrecoverable error that should stop the workflow.
	ErrorTypeFatal
)

func HTTPErrorClassifier

func HTTPErrorClassifier(err error) ErrorType

HTTPErrorClassifier classifies errors based on HTTP status codes. 5xx and 429 are retryable, all other 4xx codes are terminal, and anything else defaults to retryable.
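
The rule can be sketched against a bare status code. The classifyStatus function and the local errorType are hypothetical; how the real classifier extracts a status code from an error is not documented here, so the sketch takes the code directly.

```go
package main

import "fmt"

// errorType is a local stand-in with only the two outcomes this rule yields.
type errorType int

const (
	retryable errorType = iota
	terminal
)

// classifyStatus applies the documented rule to an HTTP status code.
func classifyStatus(code int) errorType {
	switch {
	case code == 429 || (code >= 500 && code <= 599):
		return retryable // rate limited or server-side failure
	case code >= 400 && code <= 499:
		return terminal // client error: retrying will not help
	default:
		return retryable // unknown or non-HTTP failure
	}
}

func main() {
	for _, code := range []int{503, 429, 404, 0} {
		fmt.Println(code, classifyStatus(code) == retryable)
	}
}
```

The 429 check must come before the general 4xx case; otherwise rate-limit responses would be classified as terminal.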

func (ErrorType) String

func (t ErrorType) String() string

type ExecutableNode

type ExecutableNode interface {
	// Name returns the node's identifier.
	Name() string

	// OutputKey returns the key used to store this node's output in FlowState.
	OutputKey() string

	// Execute runs the node within a Temporal workflow context.
	Execute(ctx workflow.Context, state *FlowState) error

	// Compensate runs the compensation logic if configured (Saga pattern).
	Compensate(ctx workflow.Context, state *FlowState) error

	// HasCompensation returns true if this node has compensation logic.
	HasCompensation() bool

	// Compensation returns the compensation node, if any.
	Compensation() ExecutableNode

	// Input returns the node's input value (used for testing).
	// Returns any because the actual type is generic.
	Input() any

	// RateLimiterID returns the ID of the rate limiter for this node, if any.
	// Returns empty string if no rate limiting is configured.
	RateLimiterID() string
}

ExecutableNode is the interface implemented by all nodes that can be executed within a flow. This allows type-erased storage of generic nodes in flow steps.

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

Flow defines an executable workflow composed of steps.

func (*Flow) Execute

func (f *Flow) Execute(ctx workflow.Context, input FlowInput) error

Execute runs the flow as a Temporal workflow.

func (*Flow) Hooks

func (f *Flow) Hooks() *FlowHooks

Hooks returns the flow's hook configuration, or nil if none set.

func (*Flow) Name

func (f *Flow) Name() string

Name returns the flow's identifier.

func (*Flow) Signals

func (f *Flow) Signals() []SignalDef

Signals returns the flow's registered signal definitions.

func (*Flow) StateConfig

func (f *Flow) StateConfig() *StateConfig

StateConfig returns the flow's state configuration, or nil for default.

func (*Flow) Steps

func (f *Flow) Steps() []Step

Steps returns the flow's execution steps.

func (*Flow) Trigger

func (f *Flow) Trigger() Trigger

Trigger returns the flow's trigger configuration.

type FlowBuilder

type FlowBuilder struct {
	// contains filtered or unexported fields
}

FlowBuilder provides a fluent API for constructing flows.

func NewFlow

func NewFlow(name string) *FlowBuilder

NewFlow creates a new flow builder with the given name.

func RegisterSignal

func RegisterSignal[T any](b *FlowBuilder, name string) *FlowBuilder

RegisterSignal[T] declares that this flow accepts a signal named `name` whose payload deserializes as T. The registered signal is wired at flow startup: a per-signal goroutine receives from the workflow signal channel directly into a value of type T (so Temporal's data converter does the typed deserialization), then injects it into FlowState's buffer for non-blocking consumption via TakeSignal[T] / TakeAllSignals[T] / PeekSignal[T].

Multiple signals with different payload types may be registered on the same flow. Calling RegisterSignal twice with the same name appends a second receive pump for that name — last write wins on conflicts; prefer to register each name once.

Pairs with the typed query helper WithQueryState[S]: both replace the old untyped Signals().Inject / Signals().Take APIs with type-parameterised access that fails at compile time on mismatch.

func ThenLoop

func ThenLoop[S any](b *FlowBuilder, name string, opts ...LoopOption[S]) *FlowBuilder

ThenLoop adds a Loop step to the builder under the given name.

func ThenParallelEach

func ThenParallelEach[I, O any](b *FlowBuilder, name string, opts ...ParallelEachOption[I, O]) *FlowBuilder

ThenParallelEach adds a ParallelEach step to the builder under the given name. It is a generic free function because Go does not allow generic methods on non-generic types; call sites read flow.ThenParallelEach(b, "name", ...).

func WithQueryState

func WithQueryState[S any](b *FlowBuilder, name, key string) *FlowBuilder

WithQueryState registers a query handler that returns the latest typed value of S persisted at the given StateKey. Pairs with Loop[S]: the Loop runner writes S to its StateKey on every transition (init, before/after each iteration, finalize), so this query always observes a current value or the zero value of S if no Loop has written yet.

Use this in place of capturing a *S in a closure mutated from AfterIteration; that pattern bypasses the read-only FlowStateReader contract and forces every consumer to maintain a parallel pointer.

func (*FlowBuilder) Build

func (b *FlowBuilder) Build() *Flow

Build validates and returns the constructed flow.

func (*FlowBuilder) Then

func (b *FlowBuilder) Then(node ExecutableNode) *FlowBuilder

Then adds a sequential step with a single node.

func (*FlowBuilder) ThenChildren

func (b *FlowBuilder) ThenChildren(name string, config ChildFlowConfig) *FlowBuilder

ThenChildren adds a step that spawns child workflows. Each child receives input from the InputMapper applied to the parent's FlowState.

func (*FlowBuilder) ThenGate

func (b *FlowBuilder) ThenGate(name string, config GateConfig) *FlowBuilder

ThenGate adds a gate step that pauses the flow until an external signal is received. The gate waits for a Temporal signal matching config.SignalName carrying a GateResult.

func (*FlowBuilder) ThenParallel

func (b *FlowBuilder) ThenParallel(name string, nodes ...ExecutableNode) *FlowBuilder

ThenParallel adds a parallel step with multiple nodes executed concurrently.

func (*FlowBuilder) TriggeredBy

func (b *FlowBuilder) TriggeredBy(t Trigger) *FlowBuilder

TriggeredBy sets the trigger that initiates this flow.

func (*FlowBuilder) When

func (b *FlowBuilder) When(pred Predicate) *ConditionalBuilder

When starts a conditional branch based on a predicate. The predicate is evaluated at runtime against the current FlowState.

Example:

flow := NewFlow("order-flow").
    TriggeredBy(Manual("test")).
    Then(fetchOrder).
    When(func(s *FlowState) bool {
        return Get[Order](s, "order").Total > 1000
    }).
    Then(requireApproval).
    Otherwise(autoApprove).
    Then(fulfillOrder).
    Build()

func (*FlowBuilder) WithHooks

func (b *FlowBuilder) WithHooks(hooks *FlowHooks) *FlowBuilder

WithHooks attaches lifecycle callbacks to the flow. Hooks fire at flow, step, and node boundaries with structured context.

func (*FlowBuilder) WithQuery

func (b *FlowBuilder) WithQuery(name string, handler any) *FlowBuilder

WithQuery registers a query handler for this flow. Query handlers are installed at workflow startup alongside signals.

func (*FlowBuilder) WithState

func (b *FlowBuilder) WithState(cfg StateConfig) *FlowBuilder

WithState overrides the default state backend (.resolute/). Use this to configure cloud storage backends (S3, GCS) for production.

type FlowError

type FlowError struct {
	FlowName string
	StepName string
	NodeName string
	Input    interface{}
	Cause    error
}

FlowError provides rich context for errors that occur during flow execution.

func (*FlowError) Error

func (e *FlowError) Error() string

func (*FlowError) Unwrap

func (e *FlowError) Unwrap() error

type FlowHooks

type FlowHooks struct {
	BeforeFlow func(HookContext, FlowStateReader)
	AfterFlow  func(HookContext, FlowStateReader)
	BeforeStep func(HookContext, FlowStateReader)
	AfterStep  func(HookContext, FlowStateReader)
	BeforeNode func(HookContext, FlowStateReader)
	AfterNode  func(HookContext, FlowStateReader)
}

FlowHooks defines observability callbacks invoked at flow, step, and node execution boundaries. All callbacks are optional — nil callbacks are safely skipped.

Hooks run inside Temporal's deterministic workflow context. They must NOT perform I/O directly. They observe; they do not mutate. The FlowStateReader argument is read-only by design — mutations belong in Step execution and Iteration callbacks.

For side effects (e.g. writing events to Postgres), enqueue activities from within the callback.

type FlowInput

type FlowInput struct {
	Data map[string][]byte
}

FlowInput contains the initial input to a flow execution.

type FlowState

type FlowState struct {
	// contains filtered or unexported fields
}

FlowState carries data through workflow execution. It holds input data, activity outputs, cursor state, and buffered signals for incremental processing.

func NewFlowState

func NewFlowState(input FlowInput) *FlowState

NewFlowState creates a new flow state with the given input.

func (*FlowState) ChildWorkflowID

func (s *FlowState) ChildWorkflowID(nodeName string) (string, bool)

ChildWorkflowID returns the child workflow ID for a node, if known.

func (*FlowState) ChildWorkflows

func (s *FlowState) ChildWorkflows() map[string]string

ChildWorkflows returns a copy of all registered child workflow IDs.

func (*FlowState) GetCursor

func (s *FlowState) GetCursor(source string) Cursor

GetCursor returns the cursor for a data source.

func (*FlowState) GetInputData

func (s *FlowState) GetInputData(key string) ([]byte, bool)

GetInputData retrieves raw input data by key. Returns the byte slice and whether the key was found.

func (*FlowState) GetResult

func (s *FlowState) GetResult(key string) any

GetResult retrieves a raw result by key. Returns any because activity result types vary; prefer typed Get[T]() for compile-time type safety.

func (*FlowState) GetWindowMeta

func (s *FlowState) GetWindowMeta() WindowMeta

GetWindowMeta returns the current window metadata. Returns a zero WindowMeta if none is set.

func (*FlowState) HasResult

func (s *FlowState) HasResult(key string) bool

HasResult returns true if the key exists in state results, even if the stored value is nil. Satisfies the FlowStateReader interface.

func (*FlowState) LoadPersisted

func (s *FlowState) LoadPersisted(ctx workflow.Context, flowName string, cfg *StateConfig) error

LoadPersisted loads persisted state (cursors) from the configured backend.

func (*FlowState) NewBatchState

func (s *FlowState) NewBatchState() *FlowState

NewBatchState creates an isolated state for a windowed batch. Inherits input and cursors from parent. Results, window meta, and signals start empty.

func (*FlowState) RegisterChildWorkflow

func (s *FlowState) RegisterChildWorkflow(nodeName, workflowID string)

RegisterChildWorkflow records a child workflow ID for a node.

func (*FlowState) SavePersisted

func (s *FlowState) SavePersisted(ctx workflow.Context, flowName string, cfg *StateConfig) error

SavePersisted saves persisted state (cursors) to the configured backend.

func (*FlowState) SetCursor

func (s *FlowState) SetCursor(source, position string)

SetCursor updates the cursor for a data source.

func (*FlowState) SetInputData

func (s *FlowState) SetInputData(key string, data []byte)

SetInputData stores raw input data by key. Nodes can call this to inject context (e.g. previous agent responses) that downstream template markers like {{input:key}} can resolve against.

func (*FlowState) SetResult

func (s *FlowState) SetResult(key string, value any)

SetResult stores a result by key. Accepts any because activity result types vary; type verification happens at retrieval via Get[T]().
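
The idea of checking types at retrieval rather than at storage can be sketched with a small generic helper. The store type and typedGet function below are hypothetical stand-ins. Unlike the documented Get[T], which panics on a missing key or type mismatch, this sketch returns an error.

```go
package main

import "fmt"

// store is a toy stand-in for the results held by FlowState.
type store map[string]any

// typedGet is a hypothetical helper showing retrieval-time type checking.
// It reports a missing key or a type mismatch as an error instead of panicking.
func typedGet[T any](s store, key string) (T, error) {
	var zero T
	raw, ok := s[key]
	if !ok {
		return zero, fmt.Errorf("key %q not found", key)
	}
	v, ok := raw.(T)
	if !ok {
		return zero, fmt.Errorf("key %q holds %T, not %T", key, raw, zero)
	}
	return v, nil
}

func main() {
	s := store{}
	s["count"] = 5 // stored as any; nothing is checked at write time

	n, err := typedGet[int](s, "count")
	fmt.Println(n, err)

	_, err = typedGet[string](s, "count")
	fmt.Println(err)
}
```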

func (*FlowState) SetWindowMeta

func (s *FlowState) SetWindowMeta(cursor string, size int)

SetWindowMeta sets the ephemeral window cursor and size for the current batch.

func (*FlowState) Snapshot

func (s *FlowState) Snapshot() *FlowState

Snapshot creates a copy of the current state for compensation.

type FlowStateReader

type FlowStateReader interface {
	// GetResult returns a raw result by key, or nil if absent. Prefer the
	// typed Get[T] / GetSafe[T] free functions for type-safe access.
	// Note: nil is also returned for keys present with a nil value; use
	// HasResult to distinguish presence from nil-value.
	GetResult(key string) any

	// HasResult returns true if the key exists in results, regardless of
	// whether its stored value is nil.
	HasResult(key string) bool

	// GetInputData returns raw input data by key.
	GetInputData(key string) ([]byte, bool)

	// GetCursor returns the cursor for a data source.
	GetCursor(source string) Cursor
}

FlowStateReader is the read-only view of FlowState exposed to observability hooks. Mutations belong in Step execution and Iteration callbacks; hooks observe but never write. *FlowState satisfies this interface.
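
The distinction between an absent key and a key stored with a nil value can be sketched as follows. The reader interface mirrors two of the documented methods. The map-backed results type and the describe helper are assumptions made for illustration.

```go
package main

import "fmt"

// reader mirrors the two documented FlowStateReader methods used below.
type reader interface {
	GetResult(key string) any
	HasResult(key string) bool
}

// results is a toy map-backed implementation for illustration only.
type results map[string]any

func (r results) GetResult(key string) any  { return r[key] }
func (r results) HasResult(key string) bool { _, ok := r[key]; return ok }

// describe classifies a key as missing, present with a nil value,
// or present with a non-nil value.
func describe(r reader, key string) string {
	switch {
	case !r.HasResult(key):
		return "missing"
	case r.GetResult(key) == nil:
		return "present (nil)"
	default:
		return "present"
	}
}

func main() {
	r := results{"issues": 5, "skipped": nil}
	for _, k := range []string{"issues", "skipped", "other"} {
		fmt.Println(k, "=>", describe(r, k))
	}
}
```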

type FlowTemplate

type FlowTemplate struct {
	// contains filtered or unexported fields
}

FlowTemplate constructs flows dynamically from runtime configuration. Unlike FlowBuilder's compile-time fluent chain, FlowTemplate accepts step definitions that can vary per execution (e.g. different iteration phases running different subsets of steps).

func NewFlowTemplate

func NewFlowTemplate(name string) *FlowTemplate

NewFlowTemplate creates a new template for dynamic flow construction.

func (*FlowTemplate) AddChildren

func (ft *FlowTemplate) AddChildren(name string, config ChildFlowConfig) *FlowTemplate

AddChildren adds a child workflow spawning step.

func (*FlowTemplate) AddConditional

func (ft *FlowTemplate) AddConditional(pred Predicate, thenSteps []ExecutableNode, elseSteps []ExecutableNode) *FlowTemplate

AddConditional adds a conditional branch evaluated at runtime.

func (*FlowTemplate) AddGate

func (ft *FlowTemplate) AddGate(name string, config GateConfig) *FlowTemplate

AddGate adds a gate step that pauses until a signal is received.

func (*FlowTemplate) AddParallel

func (ft *FlowTemplate) AddParallel(name string, nodes ...ExecutableNode) *FlowTemplate

AddParallel adds a parallel step with multiple nodes.

func (*FlowTemplate) AddStep

func (ft *FlowTemplate) AddStep(node ExecutableNode) *FlowTemplate

AddStep adds a sequential step with a single node.

func (*FlowTemplate) Build

func (ft *FlowTemplate) Build() *Flow

Build validates and returns a Flow from the template.

func (*FlowTemplate) TriggeredBy

func (ft *FlowTemplate) TriggeredBy(t Trigger) *FlowTemplate

TriggeredBy sets the trigger for the flow.

func (*FlowTemplate) WithHooks

func (ft *FlowTemplate) WithHooks(hooks *FlowHooks) *FlowTemplate

WithHooks attaches lifecycle callbacks.

func (*FlowTemplate) WithState

func (ft *FlowTemplate) WithState(cfg StateConfig) *FlowTemplate

WithState sets the state persistence configuration.

type FlowTester

type FlowTester struct {
	// contains filtered or unexported fields
}

FlowTester provides a test harness for running flows without Temporal. It allows mocking activity implementations and asserting on execution.

func NewFlowTester

func NewFlowTester() *FlowTester

NewFlowTester creates a new test harness.

func (*FlowTester) AssertCallCount

func (t *FlowTester) AssertCallCount(tb TestingT, nodeName string, expected int)

AssertCallCount asserts that a node was called exactly expected times.

func (*FlowTester) AssertCalled

func (t *FlowTester) AssertCalled(tb TestingT, nodeName string)

AssertCalled asserts that a node was called at least once.

func (*FlowTester) AssertNotCalled

func (t *FlowTester) AssertNotCalled(tb TestingT, nodeName string)

AssertNotCalled asserts that a node was not called.

func (*FlowTester) CallArgs

func (t *FlowTester) CallArgs(nodeName string) []any

CallArgs returns all arguments passed to a node across all calls.

func (*FlowTester) CallCount

func (t *FlowTester) CallCount(nodeName string) int

CallCount returns how many times a node was called.

func (*FlowTester) LastCallArg

func (t *FlowTester) LastCallArg(nodeName string) any

LastCallArg returns the argument from the last call to a node. Returns nil if the node was never called.

func (*FlowTester) Mock

func (t *FlowTester) Mock(nodeName string, fn any) *FlowTester

Mock registers a mock implementation for a node by name. The mock function should have the signature func(I) (O, error), where I and O match the node's input and output types.

Example:

tester.Mock("jira.FetchIssues", func(input jira.FetchIssuesInput) (jira.FetchIssuesOutput, error) {
    return jira.FetchIssuesOutput{Count: 5}, nil
})

func (*FlowTester) MockChildFlow

func (t *FlowTester) MockChildFlow(name string, fn func(*FlowState) (*FlowState, error)) *FlowTester

MockChildFlow registers a mock for a child flow node by name. The function receives parent state and returns child state and error.

func (*FlowTester) MockError

func (t *FlowTester) MockError(nodeName string, err error) *FlowTester

MockError registers a mock that always returns an error.

Example:

tester.MockError("jira.FetchIssues", errors.New("api error"))

func (*FlowTester) MockGate

func (t *FlowTester) MockGate(gateName string, result GateResult) *FlowTester

MockGate registers a mock resolution for a gate by name. When the tester encounters this gate, it immediately resolves with the given result.

func (*FlowTester) MockValue

func (t *FlowTester) MockValue(nodeName string, value any) *FlowTester

MockValue registers a mock that always returns a fixed value.

Example:

tester.MockValue("jira.FetchIssues", jira.FetchIssuesOutput{Count: 5})

func (*FlowTester) Reset

func (t *FlowTester) Reset()

Reset clears all call tracking (but keeps mocks).

func (*FlowTester) ResetAll

func (t *FlowTester) ResetAll()

ResetAll clears all mocks and call tracking.

func (*FlowTester) Run

func (t *FlowTester) Run(flow *Flow, input FlowInput) (*FlowState, error)

Run executes the flow synchronously with mocked activities. Returns the final FlowState and any error encountered.

func (*FlowTester) RunWithContext

func (t *FlowTester) RunWithContext(ctx context.Context, flow *Flow, input FlowInput) (*FlowState, error)

RunWithContext executes the flow with a context for cancellation support.

func (*FlowTester) WasCalled

func (t *FlowTester) WasCalled(nodeName string) bool

WasCalled returns true if the node was called at least once.

func (*FlowTester) WithRateLimiting

func (t *FlowTester) WithRateLimiting() *FlowTester

WithRateLimiting enables rate limiting during test execution. By default, rate limiting is disabled in tests for faster execution. Enable this when you want to test rate limiting behavior.

type GateConfig

type GateConfig struct {
	SignalName string
	Timeout    time.Duration
}

GateConfig defines how a gate node pauses and resumes.

type GateNode

type GateNode struct {
	// contains filtered or unexported fields
}

GateNode pauses a flow until an external signal is received. It implements ExecutableNode so it can be placed in a Step.

func NewGateNode

func NewGateNode(name string, config GateConfig) *GateNode

NewGateNode creates a gate node with the given name and configuration.

func (*GateNode) As

func (g *GateNode) As(key string) *GateNode

As names the output of this gate for reference by downstream nodes.

func (*GateNode) Compensate

func (g *GateNode) Compensate(_ workflow.Context, _ *FlowState) error

Compensate is a no-op for gates.

func (*GateNode) Compensation

func (g *GateNode) Compensation() ExecutableNode

Compensation returns nil — gates have no compensation node.

func (*GateNode) Execute

func (g *GateNode) Execute(ctx workflow.Context, state *FlowState) error

Execute waits for a Temporal signal carrying a GateResult. If a timeout is configured, returns GateTimeoutError on expiry.

func (*GateNode) HasCompensation

func (g *GateNode) HasCompensation() bool

HasCompensation returns false — gates have no compensation.

func (*GateNode) Input

func (g *GateNode) Input() interface{}

Input returns nil — gates have no input value.

func (*GateNode) Name

func (g *GateNode) Name() string

Name returns the gate's identifier.

func (*GateNode) OutputKey

func (g *GateNode) OutputKey() string

OutputKey returns the key used to store the gate result in FlowState.

func (*GateNode) RateLimiterID

func (g *GateNode) RateLimiterID() string

RateLimiterID returns empty — gates don't use rate limiting.

type GateResult

type GateResult struct {
	Approved  bool
	Decision  string
	DecidedBy string
	DecidedAt time.Time
	Reason    string
	Metadata  map[string]string
}

GateResult carries the outcome of a gate decision.

type GateTimeoutError

type GateTimeoutError struct {
	GateName string
	Timeout  time.Duration
}

GateTimeoutError indicates a gate was not resolved within the configured timeout.

func (*GateTimeoutError) Error

func (e *GateTimeoutError) Error() string
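
The wait-or-time-out behaviour described for GateNode.Execute can be approximated outside Temporal with a plain channel and timer. This is a swapped-in technique for illustration: the real gate waits on a Temporal signal, not a Go channel. The gateResult and gateTimeoutError types are local stand-ins, and treating a zero timeout as "wait indefinitely" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins mirroring a subset of the documented fields.
type gateResult struct {
	Approved  bool
	DecidedBy string
}

type gateTimeoutError struct {
	GateName string
	Timeout  time.Duration
}

// The message format is an assumption.
func (e *gateTimeoutError) Error() string {
	return fmt.Sprintf("gate %q timed out after %s", e.GateName, e.Timeout)
}

// shortTimeout keeps the timeout path of the example fast.
const shortTimeout = 10 * time.Millisecond

// waitForGate blocks until a decision arrives or the timeout elapses.
// A zero timeout means wait indefinitely (assumption).
func waitForGate(name string, signals <-chan gateResult, timeout time.Duration) (gateResult, error) {
	var expiry <-chan time.Time // a nil channel never fires
	if timeout > 0 {
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		expiry = timer.C
	}
	select {
	case res := <-signals:
		return res, nil
	case <-expiry:
		return gateResult{}, &gateTimeoutError{GateName: name, Timeout: timeout}
	}
}

func main() {
	signals := make(chan gateResult, 1)
	signals <- gateResult{Approved: true, DecidedBy: "alice"}
	res, err := waitForGate("deploy-approval", signals, time.Second)
	fmt.Println(res.Approved, err)

	_, err = waitForGate("deploy-approval", make(chan gateResult), shortTimeout)
	var te *gateTimeoutError
	fmt.Println(errors.As(err, &te))
}
```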

type HTTPStatusError

type HTTPStatusError interface {
	StatusCode() int
}

HTTPStatusError is implemented by errors that have an HTTP status code.
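
A provider error can satisfy this interface by adding a StatusCode method, after which callers can detect it through errors.As. In the sketch below, apiError and the retryable policy are hypothetical. The policy is an illustrative choice and is not a description of how HTTPErrorClassifier behaves.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// statusCoder mirrors the documented HTTPStatusError interface.
type statusCoder interface {
	StatusCode() int
}

// apiError is a hypothetical provider error carrying an HTTP status.
type apiError struct {
	Code int
	Msg  string
}

func (e *apiError) Error() string   { return fmt.Sprintf("%d: %s", e.Code, e.Msg) }
func (e *apiError) StatusCode() int { return e.Code }

// retryable is an illustrative policy, not the library's classifier:
// 429 and 5xx are retried, other statuses are terminal, and errors
// without a status code are retried.
func retryable(err error) bool {
	var sc statusCoder
	if !errors.As(err, &sc) {
		return true
	}
	code := sc.StatusCode()
	return code == http.StatusTooManyRequests || code >= 500
}

func main() {
	wrapped := fmt.Errorf("fetch issues: %w", &apiError{Code: 503, Msg: "unavailable"})
	fmt.Println(retryable(wrapped))
	fmt.Println(retryable(&apiError{Code: 404, Msg: "not found"}))
}
```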

type HealthCheckFunc

type HealthCheckFunc func(ctx context.Context) error

HealthCheckFunc is the signature for provider health check functions.

type HealthServer

type HealthServer struct {
	// contains filtered or unexported fields
}

func NewHealthServer

func NewHealthServer() *HealthServer

func (*HealthServer) Handler

func (h *HealthServer) Handler() http.Handler

func (*HealthServer) IsReady

func (h *HealthServer) IsReady() bool

func (*HealthServer) IsStarted

func (h *HealthServer) IsStarted() bool

func (*HealthServer) SetReady

func (h *HealthServer) SetReady(ready bool)

func (*HealthServer) SetStarted

func (h *HealthServer) SetStarted(started bool)

func (*HealthServer) Shutdown

func (h *HealthServer) Shutdown(ctx context.Context) error

func (*HealthServer) Start

func (h *HealthServer) Start(addr string) error

func (*HealthServer) StartAsync

func (h *HealthServer) StartAsync(addr string) error

type HealthStatus

type HealthStatus struct {
	Status    string    `json:"status"`
	Timestamp time.Time `json:"timestamp"`
}

type HookContext

type HookContext struct {
	FlowName string
	StepName string
	NodeName string
	Duration time.Duration // populated in After* callbacks only
	Error    error         // populated in After* callbacks only
}

HookContext carries structured context about the current execution point.

type ItemActivity

type ItemActivity[I, O any] interface {
	Execute(ctx workflow.Context, in I) (O, error)
}

ItemActivity is what the user provides as the per-item activity. Typed in I/O so no `any` appears at the user boundary. Production implementations invoke workflow.ExecuteActivity; tests can run synchronously.

type ItemContext

type ItemContext struct {
	StepName string
	Index    int
	Duration time.Duration // populated in AfterItem only
}

ItemContext carries structured context about a per-item execution within a ParallelEach step.

type ItemErrorMode

type ItemErrorMode int

ItemErrorMode controls how ParallelEach handles per-item failures.

const (
	// ItemErrorContinue: failed items get a zero-value O; Merge sees the
	// full ordered result list. Suitable for LLM tool dispatch where partial
	// failures are normal.
	ItemErrorContinue ItemErrorMode = iota
	// ItemErrorFailFast: first failure cancels remaining items and the
	// error propagates from runParallelEach.
	ItemErrorFailFast
)
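
The difference between the two modes can be illustrated with a sequential simulation. The runEach helper below is hypothetical and is not the framework's concurrent executor. It only shows the shape of the results: zero-value slots under the continue mode, and an early error under fail-fast.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented ItemErrorMode constants.
type itemErrorMode int

const (
	itemErrorContinue itemErrorMode = iota
	itemErrorFailFast
)

// runEach applies fn to every item in order. It is a sequential simulation
// of the documented modes, not the framework's concurrent executor.
func runEach[I, O any](items []I, mode itemErrorMode, fn func(I) (O, error)) ([]O, error) {
	out := make([]O, len(items)) // failed slots keep the zero value of O
	for i, it := range items {
		v, err := fn(it)
		if err != nil {
			if mode == itemErrorFailFast {
				return nil, fmt.Errorf("item %d: %w", i, err)
			}
			continue
		}
		out[i] = v
	}
	return out, nil
}

// errOdd and double form a toy activity that fails on odd inputs.
var errOdd = errors.New("odd input")

func double(n int) (int, error) {
	if n%2 != 0 {
		return 0, errOdd
	}
	return n * 2, nil
}

func main() {
	fmt.Println(runEach([]int{2, 3, 4}, itemErrorContinue, double))
	fmt.Println(runEach([]int{2, 3, 4}, itemErrorFailFast, double))
}
```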

type IterationEvent

type IterationEvent struct {
	Phase string // "started" | "completed"
	N     int    // 1-indexed iteration number
}

IterationEvent is the payload delivered to an IterationHook.

type LocalStorage

type LocalStorage struct {
	// contains filtered or unexported fields
}

LocalStorage implements StorageBackend using the local filesystem.

func DefaultLocalStorage

func DefaultLocalStorage() (*LocalStorage, error)

DefaultLocalStorage returns a LocalStorage that uses the .resolute/data directory.

func NewLocalStorage

func NewLocalStorage(basePath string) (*LocalStorage, error)

NewLocalStorage creates a LocalStorage with the given base path.

func (*LocalStorage) Backend

func (s *LocalStorage) Backend() string

Backend returns the backend identifier.

func (*LocalStorage) Delete

func (s *LocalStorage) Delete(ctx context.Context, ref DataRef) error

Delete removes a local file.

func (*LocalStorage) Load

func (s *LocalStorage) Load(ctx context.Context, ref DataRef) ([]byte, error)

Load retrieves data from a local file.

func (*LocalStorage) Store

func (s *LocalStorage) Store(ctx context.Context, schema string, data []byte) (DataRef, error)

Store saves data to a local file and returns a DataRef.

type LoopExitReason

type LoopExitReason string

LoopExitReason identifies why a Loop terminated.

const (
	LoopExitCondition     LoopExitReason = "condition"
	LoopExitMaxIterations LoopExitReason = "max_iterations"
	LoopExitCancelled     LoopExitReason = "cancelled"
	LoopExitContinueAsNew LoopExitReason = "continue_as_new"
)

type LoopOption

type LoopOption[S any] func(*loopConfig[S])

LoopOption configures a Loop. Apply options via the Loop or ThenLoop constructors.

func AfterIteration

func AfterIteration[S any](fn func(s S, fs FlowStateReader) S) LoopOption[S]

AfterIteration installs a callback that runs at the end of every iteration with read-only FlowStateReader access. Use it to merge Step outputs into S. Returns the next S (written back to the StateKey).

Runs in workflow code; must be deterministic.

func BeforeIteration

func BeforeIteration[S any](fn func(s S, fs *FlowState) (S, bool)) LoopOption[S]

BeforeIteration installs a callback that runs at the start of every iteration with mutating *FlowState access. Use it to drain signals, perform pre-flight checks, and update S in workflow code. Returning skipBody=true skips the Steps body for this iteration but still runs AfterIteration and the While predicate (useful for cancel signals).

Runs in workflow code; must be deterministic.

func ContinueAsNewAfter

func ContinueAsNewAfter[S any](p ContinueAsNewPolicy) LoopOption[S]

ContinueAsNewAfter installs a continue-as-new policy. Default: never. Reserved: the policy will be consumed once continue-as-new emission is implemented in a future release.

func FinalState

func FinalState[S any](inject func(*FlowState, S)) LoopOption[S]

FinalState installs an optional callback that runs once after the last iteration to write the final S anywhere the caller wants (e.g. a different FlowState key for downstream Steps).

func InitialState

func InitialState[S any](extract func(*FlowState) S) LoopOption[S]

InitialState supplies the function that produces the seed S for the loop. Called once before the first iteration; the result is written to the FlowState key configured by StateKey and threaded into the first iteration's BeforeIteration and Steps. Required.

func IterationHook

func IterationHook[S any](h func(IterationEvent)) LoopOption[S]

IterationHook installs a low-detail per-iteration callback for "iteration N started/completed" event emission. Distinct from FlowHooks (which observe Step/Node lifecycle) and from BeforeIteration/AfterIteration (which thread state). Runs in workflow code; must be deterministic.

func MaxIterations

func MaxIterations[S any](n int) LoopOption[S]

MaxIterations sets the hard ceiling on iteration count. Default 1000.

func StateKey

func StateKey[S any](key string) LoopOption[S]

StateKey sets the FlowState key where the typed state S is persisted across iterations. Required. Reads/writes happen at the start and end of each iteration; downstream code can also read it via core.Get[S].

func Steps

func Steps[S any](steps ...Step) LoopOption[S]

Steps sets the per-iteration Steps body. Required. Each iteration runs these Steps in order through the standard step executor, so observability hooks (BeforeStep / AfterStep / BeforeNode / AfterNode) fire normally.

func While

func While[S any](cond func(S) bool) LoopOption[S]

While sets the continuation predicate. Evaluated after each iteration (after AfterIteration). Must be deterministic over S.
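
Taken together, the options above suggest the following per-iteration order: BeforeIteration, the Steps body (unless skipped), AfterIteration, then the While predicate, all bounded by MaxIterations. The sketch below models that order with a hypothetical loopSpec struct and runLoop function. It is a simplification: the body operates on S directly, whereas the real Steps write to FlowState.

```go
package main

import "fmt"

type exitReason string

const (
	exitCondition     exitReason = "condition"
	exitMaxIterations exitReason = "max_iterations"
)

// loopSpec gathers the callbacks in a plain struct (hypothetical; the real
// API uses LoopOption values and threads FlowState through each call).
type loopSpec[S any] struct {
	initial func() S
	before  func(S) (S, bool) // second result: skip the body
	body    func(S) S         // stands in for the Steps body
	after   func(S) S
	while   func(S) bool
	maxIter int
}

// runLoop returns the final state, the number of iterations run,
// and why the loop stopped.
func runLoop[S any](spec loopSpec[S]) (S, int, exitReason) {
	s := spec.initial()
	for n := 1; n <= spec.maxIter; n++ {
		skip := false
		if spec.before != nil {
			s, skip = spec.before(s)
		}
		if !skip {
			s = spec.body(s)
		}
		if spec.after != nil {
			s = spec.after(s)
		}
		// The predicate is checked after the iteration, as documented for While.
		if !spec.while(s) {
			return s, n, exitCondition
		}
	}
	return s, spec.maxIter, exitMaxIterations
}

func main() {
	spec := loopSpec[int]{
		initial: func() int { return 0 },
		body:    func(s int) int { return s + 1 },
		while:   func(s int) bool { return s < 3 },
		maxIter: 1000,
	}
	final, iterations, reason := runLoop(spec)
	fmt.Println(final, iterations, reason)
}
```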

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics provides the global metrics recording interface.

type MetricsExporter

type MetricsExporter interface {
	CounterInc(name string, labels map[string]string)
	HistogramObserve(name string, value float64, labels map[string]string)
}

MetricsExporter defines the interface for exporting metrics to backends. Implementations can target Prometheus, OpenTelemetry, or other systems.
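
A custom backend only needs the two methods above. The sketch below is a hypothetical in-memory exporter that might be useful in tests. The memoryExporter type, the seriesKey helper, and the metric names are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// metricsExporter mirrors the documented MetricsExporter interface.
type metricsExporter interface {
	CounterInc(name string, labels map[string]string)
	HistogramObserve(name string, value float64, labels map[string]string)
}

// memoryExporter is a hypothetical in-memory implementation.
type memoryExporter struct {
	mu       sync.Mutex
	counters map[string]int
	observed map[string][]float64
}

func newMemoryExporter() *memoryExporter {
	return &memoryExporter{counters: map[string]int{}, observed: map[string][]float64{}}
}

// seriesKey builds a stable key such as name{a=1,b=2} by sorting label names.
func seriesKey(name string, labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+"="+labels[k])
	}
	return name + "{" + strings.Join(parts, ",") + "}"
}

func (m *memoryExporter) CounterInc(name string, labels map[string]string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.counters[seriesKey(name, labels)]++
}

func (m *memoryExporter) HistogramObserve(name string, value float64, labels map[string]string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	k := seriesKey(name, labels)
	m.observed[k] = append(m.observed[k], value)
}

// Compile-time check that memoryExporter satisfies the interface.
var _ metricsExporter = (*memoryExporter)(nil)

func main() {
	exp := newMemoryExporter()
	labels := map[string]string{"flow": "sync", "status": "ok"}
	exp.CounterInc("flow_runs_total", labels)
	exp.CounterInc("flow_runs_total", labels)
	exp.HistogramObserve("node_duration_seconds", 0.25, map[string]string{"node": "fetch"})
	fmt.Println(exp.counters)
	fmt.Println(exp.observed)
}
```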

func GetMetricsExporter

func GetMetricsExporter() MetricsExporter

GetMetricsExporter returns the currently configured metrics exporter.

type Node

type Node[I, O any] struct {
	// contains filtered or unexported fields
}

Node represents a single Temporal Activity with typed input/output. Provider functions return ready-to-use nodes with direct struct inputs.

func NewNode

func NewNode[I, O any](name string, activity func(context.Context, I) (O, error), input I) *Node[I, O]

NewNode creates a node wrapping an activity function. This is typically called by provider packages, not directly by users.

func NewNodeByName

func NewNodeByName[I, O any](name string, activityName string, input I) *Node[I, O]

NewNodeByName creates a node that executes an activity by its registered name. Use this when the activity is registered with the worker by name (e.g., factory-created closures).

func Paginate

func Paginate[T any](name string, fetcher PageFetcher[T], opts ...PaginationOption) *Node[PaginateInput, PaginateOutput[T]]

Paginate creates a Node that fetches all pages from a paginated source. The fetcher function is called repeatedly until HasMore is false or MaxPages is reached.

Example usage in a provider:

func FetchAllIssues(input FetchAllIssuesInput) *core.Node[core.PaginateInput, core.PaginateOutput[Issue]] {
    fetcher := func(ctx context.Context, cursor string) (core.PageResult[Issue], error) {
        startAt, _ := strconv.Atoi(cursor)
        result, err := client.SearchJQL(ctx, input.JQL, startAt, 100)
        if err != nil {
            return core.PageResult[Issue]{}, err
        }
        nextCursor := ""
        hasMore := startAt+len(result.Issues) < result.Total
        if hasMore {
            nextCursor = strconv.Itoa(startAt + len(result.Issues))
        }
        return core.PageResult[Issue]{Items: result.Issues, NextCursor: nextCursor, HasMore: hasMore}, nil
    }
    return core.Paginate("jira.FetchAllIssues", fetcher)
}

func PaginateWithConfig

func PaginateWithConfig[T any, C any](
	name string,
	fetcher ConfiguredPageFetcher[T, C],
	opts ...PaginationOption,
) *Node[PaginateWithInputParams[C], PaginateWithInputOutput[T, C]]

PaginateWithConfig creates a Node that fetches all pages with custom configuration. This is useful when the fetcher needs API credentials, filters, etc.

Example:

type JiraConfig struct {
    BaseURL  string
    APIToken string
    JQL      string
}

func FetchAllIssues() *core.Node[core.PaginateWithInputParams[JiraConfig], core.PaginateWithInputOutput[Issue, JiraConfig]] {
    return core.PaginateWithConfig[Issue, JiraConfig]("jira.FetchAllIssues",
        func(ctx context.Context, cfg JiraConfig, cursor string) (core.PageResult[Issue], error) {
            // ... fetch logic using cfg
        })
}

func (*Node[I, O]) As

func (n *Node[I, O]) As(outputKey string) *Node[I, O]

As names the output of this node for reference by downstream nodes.

func (*Node[I, O]) Compensate

func (n *Node[I, O]) Compensate(ctx workflow.Context, state *FlowState) error

Compensate runs the compensation activity if one is configured.

func (*Node[I, O]) Compensation

func (n *Node[I, O]) Compensation() ExecutableNode

Compensation returns the compensation node, if any.

func (*Node[I, O]) Execute

func (n *Node[I, O]) Execute(ctx workflow.Context, state *FlowState) error

Execute runs the activity within a Temporal workflow context.

func (*Node[I, O]) HasCompensation

func (n *Node[I, O]) HasCompensation() bool

HasCompensation returns true if this node has a compensation handler.

func (*Node[I, O]) Input

func (n *Node[I, O]) Input() any

Input returns the node's input value (used for testing).

func (*Node[I, O]) Name

func (n *Node[I, O]) Name() string

Name returns the node's identifier.

func (*Node[I, O]) OnError

func (n *Node[I, O]) OnError(compensation ExecutableNode) *Node[I, O]

OnError attaches a compensation node to run if subsequent steps fail (Saga pattern).

func (*Node[I, O]) OutputKey

func (n *Node[I, O]) OutputKey() string

OutputKey returns the key used to store this node's output.

func (*Node[I, O]) RateLimiterID

func (n *Node[I, O]) RateLimiterID() string

RateLimiterID returns the rate limiter ID for this node.

func (*Node[I, O]) WindowConfig

func (n *Node[I, O]) WindowConfig() Window

WindowConfig returns the window configuration. Returns a zero Window if not set.

func (*Node[I, O]) WithCursorUpdate

func (n *Node[I, O]) WithCursorUpdate(source, field string) *Node[I, O]

WithCursorUpdate configures the node to update a cursor after successful execution. The named field is extracted from the activity output and persisted as the cursor position.

func (*Node[I, O]) WithErrorClassifier

func (n *Node[I, O]) WithErrorClassifier(fn ErrorClassifier) *Node[I, O]

WithErrorClassifier sets a function to classify errors for retry decisions. Terminal errors are marked as non-retryable for Temporal.

Example:

node.WithErrorClassifier(core.HTTPErrorClassifier)

func (*Node[I, O]) WithRateLimit

func (n *Node[I, O]) WithRateLimit(requests int, per time.Duration) *Node[I, O]

WithRateLimit configures rate limiting for this node. requests is the maximum number of requests allowed per duration. The rate limiter is unique to this node instance.

Example:

node := jira.FetchIssues(config).WithRateLimit(100, time.Minute)

func (*Node[I, O]) WithRetry

func (n *Node[I, O]) WithRetry(policy RetryPolicy) *Node[I, O]

WithRetry configures the retry policy for this node.

func (*Node[I, O]) WithSharedRateLimit

func (n *Node[I, O]) WithSharedRateLimit(limiter *SharedRateLimiter) *Node[I, O]

WithSharedRateLimit configures this node to use a shared rate limiter. Multiple nodes can share the same rate limiter to coordinate request rates.

Example:

limiter := core.NewSharedRateLimiter("jira-api", 100, time.Minute)
node1 := jira.FetchIssues(config).WithSharedRateLimit(limiter)
node2 := jira.SearchJQL(config).WithSharedRateLimit(limiter)

func (*Node[I, O]) WithTimeout

func (n *Node[I, O]) WithTimeout(d time.Duration) *Node[I, O]

WithTimeout sets the start-to-close timeout for this node.

func (*Node[I, O]) WithValidation

func (n *Node[I, O]) WithValidation() *Node[I, O]

WithValidation enables input validation using struct tags before execution. Validation tags: required, min=N, max=N, minlen=N, maxlen=N, oneof=a|b|c

Example:

type Input struct {
    Name string `validate:"required"`
    Age  int    `validate:"min=0,max=150"`
}

func (*Node[I, O]) WithWindow

func (n *Node[I, O]) WithWindow(w Window) *Node[I, O]

WithWindow configures batched/windowed processing for this node. When used in a parallel step, the framework runs the downstream pipeline per batch instead of waiting for all data.

type NodeError

type NodeError struct {
	NodeName string
	Input    interface{}
	Cause    error
}

NodeError provides context for errors that occur during node execution.

func (*NodeError) Error

func (e *NodeError) Error() string

func (*NodeError) Unwrap

func (e *NodeError) Unwrap() error

type NoopExporter

type NoopExporter struct{}

NoopExporter is a metrics exporter that does nothing. Useful for testing or when metrics are disabled.

func (*NoopExporter) CounterInc

func (n *NoopExporter) CounterInc(name string, labels map[string]string)

func (*NoopExporter) HistogramObserve

func (n *NoopExporter) HistogramObserve(name string, value float64, labels map[string]string)

type PageFetcher

type PageFetcher[T any] func(ctx context.Context, cursor string) (PageResult[T], error)

PageFetcher is the function signature for fetching a single page. It receives a cursor (empty string for first page) and returns a PageResult.

type PageResult

type PageResult[T any] struct {
	// Items are the results from this page.
	Items []T
	// NextCursor is the cursor to fetch the next page. Empty string means no more pages.
	NextCursor string
	// HasMore indicates whether there are more pages to fetch.
	HasMore bool
}

PageResult represents a single page of results from a paginated API.
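
The fetch loop that Paginate describes (call the fetcher until HasMore is false or MaxPages is reached) can be sketched without Temporal. The pageResult and pageFetcher types are local stand-ins. The collectPages, sliceFetcher, and collectAll helpers are hypothetical and say nothing about how the library tracks its final cursor.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Local stand-ins mirroring the documented PageResult and PageFetcher.
type pageResult[T any] struct {
	Items      []T
	NextCursor string
	HasMore    bool
}

type pageFetcher[T any] func(ctx context.Context, cursor string) (pageResult[T], error)

// collectPages calls fetch until HasMore is false or maxPages pages have
// been read; maxPages == 0 means unlimited. It returns the items and the
// number of pages fetched.
func collectPages[T any](ctx context.Context, fetch pageFetcher[T], start string, maxPages int) ([]T, int, error) {
	var items []T
	cursor, pages := start, 0
	for maxPages == 0 || pages < maxPages {
		page, err := fetch(ctx, cursor)
		if err != nil {
			return items, pages, err
		}
		items = append(items, page.Items...)
		pages++
		if !page.HasMore {
			break
		}
		cursor = page.NextCursor
	}
	return items, pages, nil
}

// sliceFetcher pages over an in-memory slice, using the start offset as cursor.
func sliceFetcher(data []int, pageSize int) pageFetcher[int] {
	return func(_ context.Context, cursor string) (pageResult[int], error) {
		if pageSize <= 0 {
			return pageResult[int]{}, fmt.Errorf("page size must be positive, got %d", pageSize)
		}
		start := 0
		if cursor != "" {
			n, err := strconv.Atoi(cursor)
			if err != nil || n < 0 || n > len(data) {
				return pageResult[int]{}, fmt.Errorf("bad cursor %q", cursor)
			}
			start = n
		}
		end := start + pageSize
		if end > len(data) {
			end = len(data)
		}
		res := pageResult[int]{Items: data[start:end], HasMore: end < len(data)}
		if res.HasMore {
			res.NextCursor = strconv.Itoa(end)
		}
		return res, nil
	}
}

// collectAll wires the two helpers together from an empty start cursor.
func collectAll(data []int, pageSize, maxPages int) ([]int, int, error) {
	return collectPages(context.Background(), sliceFetcher(data, pageSize), "", maxPages)
}

func main() {
	fmt.Println(collectAll([]int{1, 2, 3, 4, 5}, 2, 0))
	fmt.Println(collectAll([]int{1, 2, 3, 4, 5}, 2, 2))
}
```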

type PaginateInput

type PaginateInput struct {
	// StartCursor is the initial cursor position (empty for start).
	StartCursor string
}

PaginateInput is the input for a paginated activity.

type PaginateOutput

type PaginateOutput[T any] struct {
	// Items are all collected items across all pages.
	Items []T
	// FinalCursor is the cursor after the last page fetched.
	FinalCursor string
	// PageCount is the number of pages fetched.
	PageCount int
	// TotalItems is the total count of items fetched.
	TotalItems int
}

PaginateOutput is the output of a paginated activity.

type PaginateWithInputOutput

type PaginateWithInputOutput[T any, C any] struct {
	PaginateOutput[T]
	Config C
}

PaginateWithInputOutput extends PaginateOutput with the original config.

type PaginateWithInputParams

type PaginateWithInputParams[C any] struct {
	// Config is the custom configuration passed to each page fetch.
	Config C
	// StartCursor is the initial cursor position.
	StartCursor string
}

PaginateWithInputParams is the input for a node created by PaginateWithConfig. It carries custom configuration alongside the start cursor, which is useful when the fetcher needs more than just the cursor.

type PaginationConfig

type PaginationConfig struct {
	// MaxPages limits the number of pages to fetch (0 = unlimited).
	MaxPages int
	// PageSize is a hint to the fetcher for items per page.
	PageSize int
	// CursorSource is the key for persisting cursor to FlowState between runs.
	// Empty string means no cursor persistence.
	CursorSource string
}

PaginationConfig configures pagination behavior.

type PaginationOption

type PaginationOption func(*PaginationConfig)

PaginationOption is a functional option that modifies a PaginationConfig.

func WithCursorSource

func WithCursorSource(source string) PaginationOption

WithCursorSource enables cursor persistence to FlowState. The cursor is saved after fetching and restored on next run.

func WithMaxPages

func WithMaxPages(n int) PaginationOption

WithMaxPages limits the number of pages to fetch.

func WithPageSize

func WithPageSize(n int) PaginationOption

WithPageSize sets the page size hint.
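
These constructors follow Go's functional options pattern: each returns a closure that mutates a config struct. The sketch below uses local stand-ins for the config and option types. The buildConfig helper is hypothetical, as is the assumption that options are applied in order with later ones winning.

```go
package main

import "fmt"

// Local stand-ins mirroring PaginationConfig and PaginationOption.
type paginationConfig struct {
	MaxPages     int
	PageSize     int
	CursorSource string
}

type paginationOption func(*paginationConfig)

func withMaxPages(n int) paginationOption {
	return func(c *paginationConfig) { c.MaxPages = n }
}

func withPageSize(n int) paginationOption {
	return func(c *paginationConfig) { c.PageSize = n }
}

func withCursorSource(source string) paginationOption {
	return func(c *paginationConfig) { c.CursorSource = source }
}

// buildConfig applies options in order, so later options override earlier
// ones. Starting from the zero value is an assumption about the defaults.
func buildConfig(opts ...paginationOption) paginationConfig {
	cfg := paginationConfig{} // MaxPages 0 is documented as unlimited
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(withMaxPages(10), withPageSize(100), withCursorSource("jira"))
	fmt.Printf("%+v\n", cfg)
}
```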

type ParallelEachOption

type ParallelEachOption[I, O any] func(*parallelEachConfig[I, O])

ParallelEachOption configures a ParallelEach step. Typed in I and O.

func AfterItem

func AfterItem[I, O any](fn func(ItemContext, I, O, error, FlowStateReader)) ParallelEachOption[I, O]

AfterItem installs an observability callback fired once per item, immediately after the item activity completes (whether success or error). Receives the typed item I, the typed output O (zero-value on error), the activity error (nil on success), and a read-only FlowStateReader. Runs in workflow code; must be deterministic.

func BeforeItem

func BeforeItem[I, O any](fn func(ItemContext, I, FlowStateReader)) ParallelEachOption[I, O]

BeforeItem installs an observability callback fired once per item, immediately before the item activity starts. Receives the typed item I and a read-only FlowStateReader. Runs in workflow code; must be deterministic.

func From

func From[I, O any](fn func(*FlowState) []I) ParallelEachOption[I, O]

From sets the per-item input deriver. Required.

The closure must be deterministic over *FlowState — it runs in workflow code.

func MaxConcurrency

func MaxConcurrency[I, O any](n int) ParallelEachOption[I, O]

MaxConcurrency caps simultaneous activities. Default unbounded (0).

Useful for rate-limited downstream services.

func Merge

func Merge[I, O any](fn func(*FlowState, []O)) ParallelEachOption[I, O]

Merge sets the result folder. Required. Receives *FlowState and the ordered []O (length == len([]I)). Under ItemErrorContinue, failed items produce zero-value O entries.

The closure must be deterministic over its inputs — it runs in workflow code.

func OnItemError

func OnItemError[I, O any](m ItemErrorMode) ParallelEachOption[I, O]

OnItemError controls failed-item handling. Default ItemErrorContinue.

func ParallelBody

func ParallelBody[I, O any](a ItemActivity[I, O]) ParallelEachOption[I, O]

ParallelBody sets the per-item activity. Required.

func PerItemRetry

func PerItemRetry[I, O any](p RetryPolicyOverride) ParallelEachOption[I, O]

PerItemRetry overrides the activity-default retry policy for each item.

Reserved: the override will be consumed once activity-options plumbing is implemented in a future release.

type PersistedState

type PersistedState struct {
	Cursors   map[string]Cursor `json:"cursors"`
	Metadata  map[string]string `json:"metadata,omitempty"`
	Version   int64             `json:"version"`
	UpdatedAt time.Time         `json:"updated_at"`
}

PersistedState is the data structure saved between workflow runs.

func (*PersistedState) MarshalJSON

func (p *PersistedState) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

type Predicate

type Predicate func(*FlowState) bool

Predicate evaluates flow state to determine branching.

type PrometheusExporter

type PrometheusExporter struct {
	// contains filtered or unexported fields
}

PrometheusExporter implements MetricsExporter for Prometheus. It lazily creates and caches metric collectors on first use.

func NewPrometheusExporter

func NewPrometheusExporter() *PrometheusExporter

NewPrometheusExporter creates a new Prometheus metrics exporter. The exporter registers metrics with the default Prometheus registry.

func (*PrometheusExporter) CounterInc

func (e *PrometheusExporter) CounterInc(name string, labels map[string]string)

CounterInc increments a counter metric.

func (*PrometheusExporter) HistogramObserve

func (e *PrometheusExporter) HistogramObserve(name string, value float64, labels map[string]string)

HistogramObserve records a value in a histogram metric.

type Provider

type Provider interface {
	Name() string
	Version() string
	Activities() []ActivityMeta
	HealthCheck(ctx context.Context) error
}

Provider represents a collection of related activities. Providers are the building blocks of resolute workflows, each exposing a set of activities that can be used in flow definitions.

type ProviderRegistry

type ProviderRegistry struct {
	// contains filtered or unexported fields
}

ProviderRegistry tracks available providers and their activities.

func NewProviderRegistry

func NewProviderRegistry() *ProviderRegistry

NewProviderRegistry creates a new provider registry.

func (*ProviderRegistry) Get

func (r *ProviderRegistry) Get(name string) (Provider, bool)

Get returns a provider by name.

func (*ProviderRegistry) List

func (r *ProviderRegistry) List() []Provider

List returns all registered providers.

func (*ProviderRegistry) Register

func (r *ProviderRegistry) Register(p Provider) error

Register adds a provider to the registry. Returns an error if a provider with the same name is already registered.
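
The duplicate-name rule can be pictured with a small name-keyed map. This is a minimal sketch rather than the package's implementation: the registry, provider, and namedProvider types are hypothetical stand-ins, and the mutex is an assumption about how concurrent access might be guarded.

```go
package main

import (
	"fmt"
	"sync"
)

// provider is a trimmed, hypothetical stand-in for core.Provider.
type provider interface {
	Name() string
}

type namedProvider string

func (p namedProvider) Name() string { return string(p) }

// registry sketches name-keyed registration guarded by a mutex.
type registry struct {
	mu        sync.RWMutex
	providers map[string]provider
}

func newRegistry() *registry {
	return &registry{providers: make(map[string]provider)}
}

// Register rejects a second provider with an already-registered name.
func (r *registry) Register(p provider) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.providers[p.Name()]; exists {
		return fmt.Errorf("provider %q already registered", p.Name())
	}
	r.providers[p.Name()] = p
	return nil
}

// Get returns the provider and whether it was found.
func (r *registry) Get(name string) (provider, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	p, ok := r.providers[name]
	return p, ok
}

func main() {
	r := newRegistry()
	fmt.Println(r.Register(namedProvider("jira"))) // <nil>
	fmt.Println(r.Register(namedProvider("jira"))) // provider "jira" already registered
}
```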

func (*ProviderRegistry) RegisterActivities

func (r *ProviderRegistry) RegisterActivities(w worker.Worker, providerName string) error

RegisterActivities registers all activities from a provider with a Temporal worker.

func (*ProviderRegistry) RegisterAllActivities

func (r *ProviderRegistry) RegisterAllActivities(w worker.Worker)

RegisterAllActivities registers all activities from all providers with a Temporal worker.

type QueryDef

type QueryDef struct {
	Name    string
	Handler any                  // pre-built handler (WithQuery)
	Build   func(*FlowState) any // built per execution against live FlowState (WithQueryState)
}

QueryDef defines a query handler for a flow. Either Handler is set (the handler is constructed at builder time, by WithQuery) or Build is set (the handler is constructed at execute time with access to the live FlowState, by WithQueryState). Exactly one of the two is non-nil.

type RateLimitConfig

type RateLimitConfig struct {
	Requests int
	Per      time.Duration
}

RateLimitConfig holds rate limit configuration.

type RateLimiter

type RateLimiter interface {
	Wait(ctx context.Context) error
	TryAcquire() bool
}

RateLimiter controls the rate of operations.

type RecordActivityExecutionInput

type RecordActivityExecutionInput struct {
	NodeName string
	Duration time.Duration
	Err      error
}

RecordActivityExecutionInput holds parameters for recording activity execution metrics.

type RecordFlowExecutionInput

type RecordFlowExecutionInput struct {
	FlowName string
	Status   string
	Duration time.Duration
}

RecordFlowExecutionInput holds parameters for recording flow execution metrics.

type RecordRateLimitWaitInput

type RecordRateLimitWaitInput struct {
	LimiterID string
	WaitTime  time.Duration
}

RecordRateLimitWaitInput holds parameters for recording rate limit wait metrics.

type RecordStateOperationInput

type RecordStateOperationInput struct {
	Operation string
}

RecordStateOperationInput holds parameters for recording state operation metrics.

type RegisterActivitiesFunc

type RegisterActivitiesFunc func(w worker.Worker)

RegisterActivitiesFunc is a helper type for provider packages that want to provide a simple RegisterActivities function without the full Provider interface.

type RetryPolicy

type RetryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
	MaximumAttempts    int32
}

RetryPolicy defines retry behavior for failed activities.

type RetryPolicyOverride

type RetryPolicyOverride struct {
	InitialInterval    string // duration string, parsed when wired
	BackoffCoefficient float64
	MaxAttempts        int
}

RetryPolicyOverride configures per-item retry behavior. Reserved for a future task; the current runner does not consume it.

type SharedRateLimiter

type SharedRateLimiter struct {
	// contains filtered or unexported fields
}

SharedRateLimiter represents a rate limiter that can be shared across multiple nodes.

func NewSharedRateLimiter

func NewSharedRateLimiter(name string, requests int, per time.Duration) *SharedRateLimiter

NewSharedRateLimiter creates a new shared rate limiter.

func (*SharedRateLimiter) Close

func (s *SharedRateLimiter) Close()

Close unregisters the rate limiter from the global registry.

func (*SharedRateLimiter) ID

func (s *SharedRateLimiter) ID() string

ID returns the rate limiter's unique identifier.

type SignalBuffer

type SignalBuffer struct {
	// contains filtered or unexported fields
}

SignalBuffer holds buffered signal payloads per signal name. Storage is untyped at the map level because signals with different payload types share the buffer; type safety is enforced at registration (RegisterSignal[T]) and recovered at consumption (TakeSignal[T] / TakeAllSignals[T] / PeekSignal[T]).

Constructed once per FlowState by NewFlowState. Direct construction is not part of the public API; callers reach the buffer through the typed accessors above.
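
The idea of untyped storage with types enforced at the edges can be sketched in plain Go generics. Everything below (signalBuffer, push, take) is a hypothetical illustration of the pattern, not the package's internals or the signatures of RegisterSignal and TakeSignal.

```go
package main

import "fmt"

// signalBuffer is a hypothetical stand-in: payloads are stored untyped,
// keyed by signal name, in arrival order.
type signalBuffer struct {
	queues map[string][]any
}

func newSignalBuffer() *signalBuffer {
	return &signalBuffer{queues: make(map[string][]any)}
}

// push appends a payload; the type parameter pins the payload type on entry.
func push[T any](b *signalBuffer, name string, payload T) {
	b.queues[name] = append(b.queues[name], payload)
}

// take removes and returns the oldest payload for name, recovering its
// static type with a checked assertion. ok is false if the queue is empty
// or the stored payload is not a T.
func take[T any](b *signalBuffer, name string) (T, bool) {
	var zero T
	q := b.queues[name]
	if len(q) == 0 {
		return zero, false
	}
	v, ok := q[0].(T)
	if !ok {
		return zero, false
	}
	b.queues[name] = q[1:]
	return v, true
}

type approval struct{ By string }

func main() {
	buf := newSignalBuffer()
	push(buf, "approve", approval{By: "alice"})

	a, ok := take[approval](buf, "approve")
	fmt.Println(a.By, ok) // alice true

	_, ok = take[approval](buf, "approve")
	fmt.Println(ok) // false
}
```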

type SignalDef

type SignalDef struct {
	Name string
	// contains filtered or unexported fields
}

SignalDef is the internal record of a signal registration. Construct via RegisterSignal[T] — the typed registration helper that wires the receive pump and ensures payloads enter the buffer with a known type.

type StateBackend

type StateBackend interface {
	Load(workflowID, flowName string) (*PersistedState, error)
	Save(workflowID, flowName string, state *PersistedState) error
}

StateBackend is the interface for pluggable state persistence.

type StateConfig

type StateConfig struct {
	Backend   StateBackend
	Namespace string
}

StateConfig defines state persistence behavior.

type Step

type Step struct {
	// contains filtered or unexported fields
}

Step represents one execution unit within a flow. A step can contain one node (sequential), multiple nodes (parallel), a conditional branch, a gate, or child workflows.

func AsStep

func AsStep(n ExecutableNode) Step

AsStep wraps any ExecutableNode as a single-node sequential Step suitable for use inside Loop's Steps option (or anywhere a Step is expected). The step name defaults to the node name.

func Loop

func Loop[S any](opts ...LoopOption[S]) Step

Loop returns a Step that re-executes a Steps body while While returns true, capped by MaxIterations. State is threaded between iterations as the typed value S; the framework persists S at the FlowState key declared via StateKey before/after each iteration so observability hooks and downstream readers see a consistent value.

Determinism contract: Steps may invoke activities (no determinism constraint). The While predicate, BeforeIteration, AfterIteration, InitialState, FinalState, IterationHook, and MaxIterations all run in workflow code and must be deterministic.

Example

ExampleLoop demonstrates the Steps-based Loop API: each iteration runs the noop body Step, AfterIteration mutates typed state by incrementing a counter, and While exits when the counter reaches 2.

package main

import (
	"context"
	"fmt"

	"github.com/resolute-sh/resolute/core"
)

func main() {
	type tick struct{ N int }

	noop := func(_ context.Context, _ struct{}) (struct{}, error) {
		return struct{}{}, nil
	}
	noopNode := core.NewNode("noop", noop, struct{}{})

	step := core.Loop[tick](
		core.StateKey[tick]("ticker"),
		core.InitialState[tick](func(_ *core.FlowState) tick { return tick{N: 0} }),
		core.Steps[tick](core.AsStep(noopNode)),
		core.AfterIteration[tick](func(s tick, _ core.FlowStateReader) tick {
			s.N++
			return s
		}),
		core.While[tick](func(s tick) bool { return s.N < 2 }),
		core.MaxIterations[tick](10),
	)
	_ = step

	fmt.Println("loop step constructed")
}
Output:
loop step constructed

func ParallelEach

func ParallelEach[I, O any](opts ...ParallelEachOption[I, O]) Step

ParallelEach returns a Step that fans out one activity per element of a runtime-derived list, awaits all, and folds the results back into *FlowState via the user-provided merge closure.

Concretely: From(fs) -> []I; for each I, body.Execute(ctx, I) -> O; then merge(fs, []O) writes the aggregated results back. Activities are scheduled via workflow.Go; concurrency is bounded by MaxConcurrency when set.

Determinism contract: From and Merge run in workflow code and must be deterministic functions of *FlowState. The body activity has no determinism constraint.

History event emission (ParallelEachStarted/ParallelEachCompleted markers) is planned for a future task; the current implementation only surfaces item errors via the activity error chain.

Example

ExampleParallelEach demonstrates constructing a ParallelEach step that fans out one activity per element of a runtime-derived slice and folds the ordered results back into *FlowState via Merge.

The ParallelBody option is omitted here for brevity; production callers pass a real ItemActivity[I, O] whose Execute typically calls workflow.ExecuteActivity to invoke a registered activity. From and Merge run in workflow code and must be deterministic over *FlowState.

package main

import (
	"fmt"

	"github.com/resolute-sh/resolute/core"
)

func main() {
	step := core.ParallelEach[int, int](
		core.From[int, int](func(_ *core.FlowState) []int { return []int{1, 2, 3} }),
		core.Merge[int, int](func(_ *core.FlowState, _ []int) {}),
		core.MaxConcurrency[int, int](4),
	)
	_ = step

	fmt.Println("parallel-each step constructed")
}
Output:
parallel-each step constructed

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage provides typed helpers for storing and loading data.

func GetStorage

func GetStorage() (*Storage, error)

GetStorage returns the global storage instance. Initializes with local storage on first call.

func NewStorage

func NewStorage(backend StorageBackend) *Storage

NewStorage creates a new Storage with the given backend.

func (*Storage) Delete

func (s *Storage) Delete(ctx context.Context, ref DataRef) error

Delete removes data referenced by DataRef.

func (*Storage) LoadJSON

func (s *Storage) LoadJSON(ctx context.Context, ref DataRef, dest interface{}) error

LoadJSON loads JSON data from a DataRef into the provided destination.

func (*Storage) StoreJSON

func (s *Storage) StoreJSON(ctx context.Context, schema string, v interface{}) (DataRef, error)

StoreJSON stores any JSON-serializable data and returns a DataRef.

type StorageBackend

type StorageBackend interface {
	// Store saves data and returns a DataRef pointing to it.
	Store(ctx context.Context, schema string, data []byte) (DataRef, error)

	// Load retrieves data by its DataRef.
	Load(ctx context.Context, ref DataRef) ([]byte, error)

	// Delete removes data referenced by DataRef.
	Delete(ctx context.Context, ref DataRef) error

	// Backend returns the backend identifier.
	Backend() string
}

StorageBackend defines the interface for storing and retrieving data referenced by DataRef. Implementations handle the actual persistence.

type TestingT

type TestingT interface {
	Helper()
	Errorf(format string, args ...any)
}

TestingT is a subset of testing.T used for assertions.

type TokenBucket

type TokenBucket struct {
	// contains filtered or unexported fields
}

TokenBucket implements a token bucket rate limiter. It allows bursts up to the bucket capacity while maintaining an average rate over time.

func NewTokenBucket

func NewTokenBucket(requests int, per time.Duration) *TokenBucket

NewTokenBucket creates a new token bucket rate limiter. requests is the number of allowed requests per duration. The bucket starts full, allowing an initial burst.

func (*TokenBucket) Tokens

func (tb *TokenBucket) Tokens() float64

Tokens returns the current number of available tokens.

func (*TokenBucket) TryAcquire

func (tb *TokenBucket) TryAcquire() bool

TryAcquire attempts to acquire a token without blocking. Returns true if a token was acquired, false otherwise.

func (*TokenBucket) Wait

func (tb *TokenBucket) Wait(ctx context.Context) error

Wait blocks until a token is available or the context is cancelled.

type Trigger

type Trigger interface {
	// Type returns the trigger type identifier.
	Type() TriggerType

	// Config returns the trigger-specific configuration.
	Config() TriggerConfig
}

Trigger defines how a flow is initiated.

func Manual

func Manual(id string) Trigger

Manual creates a trigger for API-initiated flow execution.

Example:

flow := core.NewFlow("my-flow").
    TriggeredBy(core.Manual("api-trigger")).
    Then(myNode).
    Build()

func Schedule

func Schedule(cron string) Trigger

Schedule creates a trigger for cron-scheduled flow execution. Uses standard cron expression format (minute hour day month weekday). Defaults to UTC. Use ScheduleWithTimezone for timezone-aware scheduling.

Example:

flow := core.NewFlow("daily-sync").
    TriggeredBy(core.Schedule("0 2 * * *")).  // Daily at 2 AM UTC
    Then(syncNode).
    Build()

func ScheduleWithTimezone

func ScheduleWithTimezone(cron, timezone string) Trigger

ScheduleWithTimezone creates a trigger for cron-scheduled flow execution with an explicit IANA timezone (e.g. "America/Los_Angeles"). The timezone maps to Temporal's ScheduleSpec.TimeZoneName.

Example:

flow := core.NewFlow("morning-sync").
    TriggeredBy(core.ScheduleWithTimezone("0 6 * * 1-5", "America/Los_Angeles")).
    Then(syncNode).
    Build()

func Signal

func Signal(name string) Trigger

Signal creates a trigger that starts the flow from a Temporal signal.

Example:

flow := core.NewFlow("event-handler").
    TriggeredBy(core.Signal("new-event")).
    Then(handleEventNode).
    Build()

type TriggerConfig

type TriggerConfig struct {
	// ID is the identifier for manual triggers (API endpoint path).
	ID string

	// CronSchedule is the cron expression for scheduled triggers.
	CronSchedule string

	// Timezone is the IANA time zone name for scheduled triggers (e.g. "America/Los_Angeles").
	// Maps to Temporal's ScheduleSpec.TimeZoneName. Empty defaults to UTC.
	Timezone string

	// SignalName is the Temporal signal name for signal triggers.
	SignalName string

	// WebhookPath is the HTTP path for webhook triggers.
	WebhookPath string

	// WebhookMethod is the HTTP method for webhook triggers (default: POST).
	WebhookMethod string

	// WebhookSecret is the HMAC secret for webhook signature verification.
	WebhookSecret string
}

TriggerConfig holds trigger-specific configuration.

type TriggerType

type TriggerType string

TriggerType identifies the type of trigger.

const (
	// TriggerManual indicates the flow is started via API call.
	TriggerManual TriggerType = "manual"

	// TriggerSchedule indicates the flow runs on a cron schedule.
	TriggerSchedule TriggerType = "schedule"

	// TriggerSignal indicates the flow starts from a Temporal signal.
	TriggerSignal TriggerType = "signal"
)

type ValidationError

type ValidationError struct {
	Field   string
	Tag     string
	Message string
}

ValidationError represents a single field validation failure.

func (ValidationError) Error

func (e ValidationError) Error() string

type ValidationErrors

type ValidationErrors []ValidationError

ValidationErrors is a collection of validation failures.

func (ValidationErrors) Error

func (e ValidationErrors) Error() string

type WebhookBuilder

type WebhookBuilder struct {
	// contains filtered or unexported fields
}

WebhookBuilder provides a fluent API for configuring webhook triggers.

func Webhook

func Webhook(path string) *WebhookBuilder

Webhook creates a trigger for HTTP webhook-initiated flow execution. The path should start with "/" and will be the endpoint that receives webhook requests.

Example:

flow := core.NewFlow("github-handler").
    TriggeredBy(core.Webhook("/hooks/github").WithMethod("POST")).
    Then(processWebhookNode).
    Build()

func (*WebhookBuilder) Build

func (b *WebhookBuilder) Build() Trigger

Build returns the configured trigger. This is called implicitly when passed to TriggeredBy.

func (*WebhookBuilder) Config

func (b *WebhookBuilder) Config() TriggerConfig

Config implements Trigger interface for WebhookBuilder.

func (*WebhookBuilder) Type

func (b *WebhookBuilder) Type() TriggerType

Type implements Trigger interface for WebhookBuilder.

func (*WebhookBuilder) WithMethod

func (b *WebhookBuilder) WithMethod(method string) *WebhookBuilder

WithMethod sets the HTTP method for the webhook (default: POST).

func (*WebhookBuilder) WithSecret

func (b *WebhookBuilder) WithSecret(secret string) *WebhookBuilder

WithSecret sets the HMAC secret for webhook signature verification. When set, incoming webhooks must include a valid X-Webhook-Signature header.
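
A sender and receiver might handle such a signature along the following lines. The hash function (SHA-256) and hex encoding are assumptions, since the documentation does not state them. Senders should prefer ComputeWebhookSignature so the format matches what the server expects. The sign and verify helpers are hypothetical.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns a hex-encoded HMAC-SHA256 of payload. The hash and encoding
// are assumptions, not the documented format.
func sign(payload []byte, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(payload)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the signature and compares it in constant time.
func verify(payload []byte, secret, header string) bool {
	expected := sign(payload, secret)
	return hmac.Equal([]byte(expected), []byte(header))
}

func main() {
	body := []byte(`{"event":"push"}`)
	sig := sign(body, "s3cret")
	fmt.Println(verify(body, "s3cret", sig))       // true
	fmt.Println(verify(body, "wrong-secret", sig)) // false
}
```

Comparing with hmac.Equal rather than == avoids leaking how many leading bytes matched through timing differences.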

type WebhookResponse

type WebhookResponse struct {
	WorkflowID string `json:"workflow_id"`
	RunID      string `json:"run_id"`
	Status     string `json:"status"`
}

WebhookResponse is returned to webhook callers.

type WebhookServer

type WebhookServer struct {
	// contains filtered or unexported fields
}

WebhookServer handles incoming webhook requests and starts Temporal workflows.

func NewWebhookServer

func NewWebhookServer(cfg WebhookServerConfig) *WebhookServer

NewWebhookServer creates a new webhook server.

func (*WebhookServer) Handler

func (s *WebhookServer) Handler() http.Handler

Handler returns the HTTP handler for use with custom servers.

func (*WebhookServer) RegisterFlow

func (s *WebhookServer) RegisterFlow(flow *Flow) error

RegisterFlow registers a flow with a webhook trigger. Returns an error if the flow doesn't have a webhook trigger or if the path is already registered.

func (*WebhookServer) Routes

func (s *WebhookServer) Routes() []string

Routes returns the registered webhook paths.

func (*WebhookServer) Shutdown

func (s *WebhookServer) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the webhook server.

func (*WebhookServer) Start

func (s *WebhookServer) Start(addr string) error

Start starts the webhook server on the given address.

func (*WebhookServer) StartAsync

func (s *WebhookServer) StartAsync(addr string) error

StartAsync starts the webhook server in the background. Returns immediately after the server starts listening.

type WebhookServerConfig

type WebhookServerConfig struct {
	Client    client.Client
	TaskQueue string
}

WebhookServerConfig holds configuration for the webhook server.

type Window

type Window struct {
	Size int // max items per batch (0 = disabled, fetch all)
}

Window configures batched/windowed processing for a node. When attached to a node in a parallel step, the framework loops: fetch batch → run downstream pipeline → persist cursors → repeat.
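
The fetch, process, persist cycle can be sketched in plain Go as below. This is an illustration of the loop shape only: fetchBatch and runWindowed are hypothetical, the cursor is modeled as an integer offset rather than the string used by WindowMeta, and persisting after processing is an assumption about ordering.

```go
package main

import "fmt"

// fetchBatch returns up to size items starting at cursor, plus the cursor
// for the next call. A size of 0 disables windowing and fetches everything.
func fetchBatch(items []string, cursor, size int) (batch []string, next int) {
	end := cursor + size
	if size <= 0 || end > len(items) {
		end = len(items)
	}
	return items[cursor:end], end
}

// runWindowed loops fetch -> process -> persist cursor until the source is
// drained, and returns the number of batches processed.
func runWindowed(items []string, size int, process func([]string), persist func(int)) int {
	cursor, batches := 0, 0
	for cursor < len(items) {
		batch, next := fetchBatch(items, cursor, size)
		process(batch) // stands in for the downstream pipeline
		persist(next)  // cursor is saved only after the batch is processed
		cursor = next
		batches++
	}
	return batches
}

func main() {
	items := []string{"a", "b", "c", "d", "e"}
	n := runWindowed(items, 2,
		func(b []string) { fmt.Println("batch:", b) },
		func(c int) { fmt.Println("cursor:", c) },
	)
	fmt.Println("batches:", n) // batches: 3
}
```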

type WindowMeta

type WindowMeta struct {
	Cursor string
	Size   int
}

WindowMeta holds ephemeral window cursor and size for the current batch iteration.

type WindowedNode

type WindowedNode interface {
	ExecutableNode
	WindowConfig() Window
}

WindowedNode extends ExecutableNode with window configuration. Nodes that implement this interface support batched/windowed processing.

type WorkerBuilder

type WorkerBuilder struct {
	// contains filtered or unexported fields
}

WorkerBuilder provides a fluent API for constructing and running a Temporal worker.

func NewWorker

func NewWorker() *WorkerBuilder

NewWorker creates a new worker builder with environment defaults loaded.

func (*WorkerBuilder) Build

func (b *WorkerBuilder) Build() error

Build creates the Temporal client and worker without starting them. This is useful for testing or custom lifecycle management. Safe to call multiple times — subsequent calls are no-ops.

func (*WorkerBuilder) Client

func (b *WorkerBuilder) Client() client.Client

Client returns the underlying Temporal client after Build() has been called. Returns nil if Build() has not been called.

func (*WorkerBuilder) HealthServer

func (b *WorkerBuilder) HealthServer() *HealthServer

HealthServer returns the health server if configured. Returns nil if health server is not enabled or Build() has not been called.

func (*WorkerBuilder) Run

func (b *WorkerBuilder) Run() error

Run builds and runs the worker, blocking until interrupted. This is the typical entry point for a worker process.

func (*WorkerBuilder) RunAsync

func (b *WorkerBuilder) RunAsync() (shutdown func(), err error)

RunAsync builds and starts the worker in the background. Returns a shutdown function that should be called to stop the worker.

func (*WorkerBuilder) WebhookServer

func (b *WorkerBuilder) WebhookServer() *WebhookServer

WebhookServer returns the webhook server if configured. Returns nil if webhook server is not enabled or Build() has not been called.

func (*WorkerBuilder) WithConfig

func (b *WorkerBuilder) WithConfig(cfg WorkerConfig) *WorkerBuilder

WithConfig sets the worker configuration. Empty fields will be populated from environment variables or defaults.

func (*WorkerBuilder) WithFlow

func (b *WorkerBuilder) WithFlow(f *Flow) *WorkerBuilder

WithFlow adds a flow to be registered with this worker. Multiple flows can be registered by calling WithFlow multiple times.

func (*WorkerBuilder) WithHealthServer

func (b *WorkerBuilder) WithHealthServer(addr string) *WorkerBuilder

WithHealthServer enables Kubernetes-compatible health endpoints on the specified address. Provides /health/live, /health/ready, and /health/startup endpoints.

Example:

err := core.NewWorker().
    WithConfig(cfg).
    WithFlow(flow).
    WithHealthServer(":8081").
    Run()

func (*WorkerBuilder) WithMetrics

func (b *WorkerBuilder) WithMetrics(exporter MetricsExporter) *WorkerBuilder

WithMetrics enables metrics collection with the provided exporter. Metrics are recorded for flow executions, activity durations, errors, and rate limiting.

Example:

err := core.NewWorker().
    WithConfig(cfg).
    WithFlow(flow).
    WithMetrics(core.NewPrometheusExporter()).
    Run()

func (*WorkerBuilder) WithProviders

func (b *WorkerBuilder) WithProviders(providers ...Provider) *WorkerBuilder

WithProviders adds providers whose activities will be registered with the worker.

func (*WorkerBuilder) WithWebhookServer

func (b *WorkerBuilder) WithWebhookServer(addr string) *WorkerBuilder

WithWebhookServer enables the webhook server on the specified address. If the flow has a webhook trigger, incoming webhooks will start workflow executions.

Example:

err := core.NewWorker().
    WithConfig(cfg).
    WithFlow(flow).
    WithWebhookServer(":8080").
    Run()

func (*WorkerBuilder) Worker

func (b *WorkerBuilder) Worker() worker.Worker

Worker returns the underlying Temporal worker after Build() has been called. Returns nil if Build() has not been called.

type WorkerConfig

type WorkerConfig struct {
	TemporalHost  string // Default: TEMPORAL_HOST env or "localhost:7233"
	TaskQueue     string // Required - no default
	Namespace     string // Default: TEMPORAL_NAMESPACE env or "default"
	MaxConcurrent int    // Default: 0 (unlimited)
}

WorkerConfig holds configuration for connecting to Temporal and running the worker.

func (*WorkerConfig) Validate

func (c *WorkerConfig) Validate() error

Validate checks that required fields are set.
