Documentation
Overview ¶
Package starflow provides a workflow engine for Go that enables deterministic and resumable workflow execution using Starlark scripting. Every execution step is recorded and can be resumed exactly where it left off.
Key Features ¶
- Deterministic & Durable Workflows: Write workflows that are deterministic and can be replayed from any point with full durability guarantees
- Pluggable Backends: Works with any backend that implements the Store interface
- Resumable Workflows: Workflows can yield and resume based on external signals
For more information, see https://github.com/dynoinc/starflow
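To make the record-and-replay idea concrete, here is a minimal sketch that is independent of starflow's actual implementation. The `journal` type and its `randInt` method are hypothetical names introduced for illustration; they only show how recording a nondeterministic value lets a later replay observe the same value.

```go
package main

import (
	"fmt"
	"math/rand"
)

// journal is a hypothetical, simplified event log: it stores the results of
// nondeterministic steps in the order they happened.
type journal struct {
	recorded []int64 // results captured so far
	cursor   int     // index of the next recorded result to hand out
}

// randInt returns the recorded value at the current position when one exists;
// otherwise it draws a fresh value and appends it to the log.
func (j *journal) randInt(max int64) int64 {
	if j.cursor < len(j.recorded) {
		v := j.recorded[j.cursor]
		j.cursor++
		return v
	}
	v := rand.Int63n(max)
	j.recorded = append(j.recorded, v)
	j.cursor++
	return v
}

// workflow stands in for a script whose result depends on two random draws.
func workflow(j *journal) int64 {
	return j.randInt(1000)*1000 + j.randInt(1000)
}

// replay runs the workflow again from the start of the existing log.
func replay(j *journal) int64 {
	j.cursor = 0
	return workflow(j)
}

func main() {
	j := &journal{}
	first := workflow(j)
	second := replay(j)
	fmt.Println(first == second) // true: the replay reads the recorded draws
}
```

The same pattern would apply to any nondeterministic input, such as the current time, as long as every such value passes through the log.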
Example ¶
Example demonstrates the basic usage of the starflow package. This example shows how to create a simple workflow that echoes input using JSON.
package main

import (
	"context"
	"fmt"

	"github.com/dynoinc/starflow"
)

// StringValue represents a simple string wrapper for the example
type StringValue struct {
	Value string `json:"value"`
}

// Example demonstrates the basic usage of the starflow package.
// This example shows how to create a simple workflow that echoes input using JSON.
func main() {
	// Create an in-memory store
	store := starflow.NewInMemoryStore()

	// Create a client with JSON-based types
	client := starflow.NewClient[StringValue, StringValue](store)

	// Register a simple echo function
	echoFn := func(ctx context.Context, req StringValue) (StringValue, error) {
		return StringValue{Value: "echo: " + req.Value}, nil
	}
	starflow.RegisterFunc(client, echoFn, starflow.WithName("module.echoFn"))

	// Define a simple workflow script using JSON
	script := `
def main(ctx, input):
    # Call our registered function with JSON data
    result = module.echoFn(ctx=ctx, req={"value": input["value"]})

    # Return the result
    return result
`

	// Run the workflow
	output, err := client.Run(context.Background(), "run-id", []byte(script), StringValue{Value: "hello world"})
	if err != nil {
		panic(err)
	}

	fmt.Printf("Result: %s\n", output.Value)
}

Output:

Result: echo: hello world
Index ¶
- Variables
- func GetRunID(ctx context.Context) (string, bool)
- func NewYieldError(ctx context.Context) (string, string, error)
- func RegisterFunc[Input any, Output any, Req any, Res any](c *Client[Input, Output], fn func(ctx context.Context, req Req) (Res, error), ...)
- func ValidateScript(script []byte) error
- type CallEvent
- type Client
- func (c *Client[Input, Output]) GetEvents(ctx context.Context, runID string) ([]*Event, error)
- func (c *Client[Input, Output]) Run(ctx context.Context, runID string, script []byte, input Input) (Output, error)
- func (c *Client[Input, Output]) Signal(ctx context.Context, runID, cid string, output any) error
- type Event
- type EventMetadata
- type EventType
- type FinishEvent
- type InMemoryStore
- func (s *InMemoryStore) AppendEvent(ctx context.Context, runID string, expectedVersion int, eventData []byte) (int, error)
- func (s *InMemoryStore) GetEvents(ctx context.Context, runID string) ([][]byte, error)
- func (s *InMemoryStore) GetLastEvent(ctx context.Context, runID string) ([]byte, int, error)
- type Option
- type RandIntEvent
- type ResumeEvent
- type ReturnEvent
- type SleepEvent
- type StartEvent
- type Store
- type TimeNowEvent
- type YieldError
- type YieldEvent
Constants ¶
This section is empty.
Variables ¶
var ErrConcurrentUpdate = errors.New("concurrent update")
ErrConcurrentUpdate indicates optimistic concurrency failure. This error is returned when a concurrent update to a run is detected.
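A caller can test for this condition with `errors.Is`, which also matches when the sentinel has been wrapped with extra context. The sketch below is self-contained: `errConcurrentUpdate` stands in for `starflow.ErrConcurrentUpdate`, and `appendAt` is a hypothetical helper that imitates a version check, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errConcurrentUpdate stands in for starflow.ErrConcurrentUpdate so that this
// sketch compiles without the package.
var errConcurrentUpdate = errors.New("concurrent update")

// appendAt is a hypothetical stand-in for a store write. It fails with the
// sentinel, wrapped with extra context, when the caller's expected version
// does not match the current one.
func appendAt(current, expected int) (int, error) {
	if expected != current {
		return 0, fmt.Errorf("append at version %d: %w", expected, errConcurrentUpdate)
	}
	return current + 1, nil
}

// isConflict reports whether err is, or wraps, the concurrency sentinel.
func isConflict(err error) bool {
	return errors.Is(err, errConcurrentUpdate)
}

func main() {
	_, staleErr := appendAt(3, 2) // stale expected version
	fmt.Println(isConflict(staleErr)) // true

	version, okErr := appendAt(3, 3) // up-to-date expected version
	fmt.Println(version, isConflict(okErr)) // 4 false
}
```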
Functions ¶
func RegisterFunc ¶
func RegisterFunc[Input any, Output any, Req any, Res any](
	c *Client[Input, Output],
	fn func(ctx context.Context, req Req) (Res, error),
	opts ...Option,
)
RegisterFunc registers a Go function to be callable from Starlark using generics and reflection. The function must have the signature:

	func(ctx context.Context, req ReqType) (ResType, error)

where ReqType and ResType are JSON-serializable types.
By default, the function is named after its package and function name; use the WithName option to override this.
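As an illustration of the required shape, the following sketch defines a function with that signature and JSON-serializable request and response types. `GreetRequest`, `GreetResponse`, `greet`, and `demo` are hypothetical names, and the JSON decoding step is only an assumption about what "JSON-serializable" implies; how starflow converts Starlark values internally is not described here.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// GreetRequest and GreetResponse are hypothetical JSON-serializable types.
type GreetRequest struct {
	Name string `json:"name"`
}

type GreetResponse struct {
	Message string `json:"message"`
}

// greet has the shape func(ctx, req Req) (Res, error).
func greet(ctx context.Context, req GreetRequest) (GreetResponse, error) {
	if req.Name == "" {
		return GreetResponse{}, errors.New("name is required")
	}
	return GreetResponse{Message: "hello, " + req.Name}, nil
}

// demo decodes a JSON payload into the request type, calls greet, and encodes
// the response. It imitates a caller that only exchanges JSON with the function.
func demo(payload string) (string, error) {
	var req GreetRequest
	if err := json.Unmarshal([]byte(payload), &req); err != nil {
		return "", err
	}
	res, err := greet(context.Background(), req)
	if err != nil {
		return "", err
	}
	out, err := json.Marshal(res)
	return string(out), err
}

func main() {
	out, err := demo(`{"name":"starflow"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // {"message":"hello, starflow"}
}
```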
func ValidateScript ¶
ValidateScript performs validation on the Starlark script. It checks for syntax errors and ensures the script has a main function.
Types ¶
type CallEvent ¶
type CallEvent struct {
// contains filtered or unexported fields
}
CallEvent is the event metadata recorded for a function call.
func NewCallEvent ¶
func (CallEvent) FunctionName ¶
func (CallEvent) MarshalJSON ¶
func (*CallEvent) UnmarshalJSON ¶
type Client ¶
Client provides an interface for creating and managing workflow runs.
func NewClient ¶
NewClient creates a new workflow client with the specified input and output types. The client uses the provided store for persistence and workflow management.
func (*Client[Input, Output]) GetEvents ¶
GetEvents retrieves the execution history of a workflow run. Returns a chronological list of events that occurred during execution.
type Event ¶
type Event struct {
	Timestamp time.Time
	Metadata  EventMetadata
}
Event represents a single event in the execution history of a run.
func (Event) MarshalJSON ¶
MarshalJSON serializes the Event to JSON
func (*Event) UnmarshalJSON ¶
UnmarshalJSON deserializes JSON into an Event
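Custom JSON methods are a common way to handle a struct with an interface-valued field, because the decoder needs to know which concrete type to build. The sketch below shows one such approach using a type discriminator. It is not starflow's wire format: the `wireEvent` layout, the field names, and the `sleepMeta` type are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

type eventType string

const typeSleep eventType = "SLEEP"

// metadata mirrors the shape of an interface with a single EventType method.
type metadata interface {
	EventType() eventType
}

// sleepMeta is a hypothetical metadata payload.
type sleepMeta struct {
	WakeupAt time.Time `json:"wakeup_at"`
}

func (sleepMeta) EventType() eventType { return typeSleep }

type event struct {
	Timestamp time.Time
	Metadata  metadata
}

// wireEvent is the assumed JSON layout: the type tag travels next to the
// still-encoded metadata so the decoder can pick a concrete type.
type wireEvent struct {
	Timestamp time.Time       `json:"timestamp"`
	Type      eventType       `json:"type"`
	Metadata  json.RawMessage `json:"metadata"`
}

func (e event) MarshalJSON() ([]byte, error) {
	if e.Metadata == nil {
		return nil, errors.New("event has no metadata")
	}
	raw, err := json.Marshal(e.Metadata)
	if err != nil {
		return nil, err
	}
	return json.Marshal(wireEvent{e.Timestamp, e.Metadata.EventType(), raw})
}

func (e *event) UnmarshalJSON(data []byte) error {
	var w wireEvent
	if err := json.Unmarshal(data, &w); err != nil {
		return err
	}
	switch w.Type {
	case typeSleep:
		var m sleepMeta
		if err := json.Unmarshal(w.Metadata, &m); err != nil {
			return err
		}
		e.Metadata = m
	default:
		return fmt.Errorf("unknown event type %q", w.Type)
	}
	e.Timestamp = w.Timestamp
	return nil
}

// demo encodes an event, decodes it again, and reports the decoded metadata
// type and whether the timestamp survived the round trip.
func demo() (eventType, bool, error) {
	ts := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)
	data, err := json.Marshal(event{Timestamp: ts, Metadata: sleepMeta{WakeupAt: ts.Add(time.Minute)}})
	if err != nil {
		return "", false, err
	}
	var out event
	if err := json.Unmarshal(data, &out); err != nil {
		return "", false, err
	}
	return out.Metadata.EventType(), out.Timestamp.Equal(ts), nil
}

func main() {
	typ, same, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(typ, same) // SLEEP true
}
```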
type EventMetadata ¶
type EventMetadata interface {
EventType() EventType
}
type EventType ¶
type EventType string
EventType represents the type of an event in the execution history.
const (
	EventTypeStart   EventType = "START"
	EventTypeFinish  EventType = "FINISH"
	EventTypeCall    EventType = "CALL"
	EventTypeReturn  EventType = "RETURN"
	EventTypeYield   EventType = "YIELD"
	EventTypeResume  EventType = "RESUME"
	EventTypeSleep   EventType = "SLEEP"
	EventTypeTimeNow EventType = "TIME_NOW"
	EventTypeRandInt EventType = "RAND_INT"
)
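Because EventType is a plain string type, it works directly as a map key or in a switch. The sketch below tallies a history by type. It redeclares the type and constants locally so it runs without the package, and the sample history is invented for illustration rather than taken from a real run.

```go
package main

import "fmt"

// Local copies of the type and constants, so the sketch is self-contained.
type EventType string

const (
	EventTypeStart   EventType = "START"
	EventTypeFinish  EventType = "FINISH"
	EventTypeCall    EventType = "CALL"
	EventTypeReturn  EventType = "RETURN"
	EventTypeYield   EventType = "YIELD"
	EventTypeResume  EventType = "RESUME"
	EventTypeSleep   EventType = "SLEEP"
	EventTypeTimeNow EventType = "TIME_NOW"
	EventTypeRandInt EventType = "RAND_INT"
)

// countByType tallies how often each event type occurs in a history.
func countByType(history []EventType) map[EventType]int {
	counts := make(map[EventType]int)
	for _, t := range history {
		counts[t]++
	}
	return counts
}

func main() {
	// An invented history: two function calls between start and finish.
	history := []EventType{
		EventTypeStart,
		EventTypeCall, EventTypeReturn,
		EventTypeCall, EventTypeReturn,
		EventTypeFinish,
	}
	counts := countByType(history)
	fmt.Println(counts[EventTypeCall], counts[EventTypeFinish], counts[EventTypeYield]) // 2 1 0
}
```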
type FinishEvent ¶
type FinishEvent struct {
// contains filtered or unexported fields
}
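FinishEvent is the event metadata for EventTypeFinish. It carries an output value and an error, both returned by Output.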
func NewFinishEvent ¶
func NewFinishEvent(output any, err error) FinishEvent
func (FinishEvent) EventType ¶
func (f FinishEvent) EventType() EventType
func (FinishEvent) MarshalJSON ¶
func (f FinishEvent) MarshalJSON() ([]byte, error)
func (FinishEvent) Output ¶
func (f FinishEvent) Output() (any, error)
func (*FinishEvent) UnmarshalJSON ¶
func (f *FinishEvent) UnmarshalJSON(data []byte) error
type InMemoryStore ¶
type InMemoryStore struct {
// contains filtered or unexported fields
}
InMemoryStore is an in-memory implementation of the Store interface. This store is suitable for testing, development, and single-instance deployments. All data is stored in memory and will be lost when the process terminates.
func NewInMemoryStore ¶
func NewInMemoryStore() *InMemoryStore
NewInMemoryStore creates a new InMemoryStore.
func (*InMemoryStore) AppendEvent ¶
func (s *InMemoryStore) AppendEvent(ctx context.Context, runID string, expectedVersion int, eventData []byte) (int, error)
AppendEvent appends an event to a run's history with optimistic concurrency control.
func (*InMemoryStore) GetEvents ¶
GetEvents retrieves all events for a specific run, ordered by time.
func (*InMemoryStore) GetLastEvent ¶
GetLastEvent returns the last event for a given run.
type Option ¶
type Option func(*registeredFn)
Option configures behaviour of a registered function.
func WithName ¶
WithName overrides the automatically derived name for the function. The name must be in the format "module.funcname".
func WithRetryPolicy ¶
func WithRetryPolicy(b backoff.BackOff) Option
WithRetryPolicy specifies a retry policy for the function.
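Option follows the functional options pattern: each option is a function that mutates a configuration value before it is used. The sketch below reproduces the pattern in isolation. The `registeredFn` field, `newRegisteredFn`, and `validName` are assumptions made for illustration, and treating "module.funcname" as exactly one dot with non-empty text on each side is a guess at the format rule, not the package's documented check.

```go
package main

import (
	"fmt"
	"strings"
)

// registeredFn stands in for the package's unexported type; its single field
// is an assumption.
type registeredFn struct {
	name string
}

// Option has the same shape as the package's Option type.
type Option func(*registeredFn)

// WithName returns an option that replaces the derived name.
func WithName(name string) Option {
	return func(rf *registeredFn) { rf.name = name }
}

// validName reports whether name looks like "module.funcname": exactly one
// dot with non-empty text on both sides (an assumed rule).
func validName(name string) bool {
	parts := strings.Split(name, ".")
	return len(parts) == 2 && parts[0] != "" && parts[1] != ""
}

// newRegisteredFn starts from a default name, applies the options in order,
// and rejects names that do not match the assumed format.
func newRegisteredFn(defaultName string, opts ...Option) (*registeredFn, error) {
	rf := &registeredFn{name: defaultName}
	for _, opt := range opts {
		opt(rf)
	}
	if !validName(rf.name) {
		return nil, fmt.Errorf("invalid function name %q", rf.name)
	}
	return rf, nil
}

func main() {
	rf, err := newRegisteredFn("main.echoFn", WithName("module.echoFn"))
	if err != nil {
		panic(err)
	}
	fmt.Println(rf.name) // module.echoFn

	_, err = newRegisteredFn("main.echoFn", WithName("echoFn"))
	fmt.Println(err != nil) // true: the name has no module part
}
```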
type RandIntEvent ¶
type RandIntEvent struct {
// contains filtered or unexported fields
}
RandIntEvent is the event metadata for EventTypeRandInt. It carries the int64 result returned by Result.
func NewRandIntEvent ¶
func NewRandIntEvent(result int64) RandIntEvent
func (RandIntEvent) EventType ¶
func (r RandIntEvent) EventType() EventType
func (RandIntEvent) MarshalJSON ¶
func (r RandIntEvent) MarshalJSON() ([]byte, error)
func (RandIntEvent) Result ¶
func (r RandIntEvent) Result() int64
func (*RandIntEvent) UnmarshalJSON ¶
func (r *RandIntEvent) UnmarshalJSON(data []byte) error
type ResumeEvent ¶
type ResumeEvent struct {
// contains filtered or unexported fields
}
ResumeEvent is the event metadata for EventTypeResume. It carries a signal ID and an output value of type any.
func NewResumeEvent ¶
func NewResumeEvent(signalID string, output any) ResumeEvent
func (ResumeEvent) EventType ¶
func (r ResumeEvent) EventType() EventType
func (ResumeEvent) MarshalJSON ¶
func (r ResumeEvent) MarshalJSON() ([]byte, error)
func (ResumeEvent) Output ¶
func (r ResumeEvent) Output() any
func (ResumeEvent) SignalID ¶
func (r ResumeEvent) SignalID() string
func (*ResumeEvent) UnmarshalJSON ¶
func (r *ResumeEvent) UnmarshalJSON(data []byte) error
type ReturnEvent ¶
type ReturnEvent struct {
// contains filtered or unexported fields
}
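ReturnEvent is the event metadata for EventTypeReturn. It carries an output value and an error, both returned by Output.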
func NewReturnEvent ¶
func NewReturnEvent(output any, err error) ReturnEvent
func (ReturnEvent) EventType ¶
func (r ReturnEvent) EventType() EventType
func (ReturnEvent) MarshalJSON ¶
func (r ReturnEvent) MarshalJSON() ([]byte, error)
func (ReturnEvent) Output ¶
func (r ReturnEvent) Output() (any, error)
func (*ReturnEvent) UnmarshalJSON ¶
func (r *ReturnEvent) UnmarshalJSON(data []byte) error
type SleepEvent ¶
type SleepEvent struct {
// contains filtered or unexported fields
}
SleepEvent is the event metadata for EventTypeSleep. It carries the wake-up time returned by WakeupAt.
func NewSleepEvent ¶
func NewSleepEvent(wakeupAt time.Time) SleepEvent
func (SleepEvent) EventType ¶
func (s SleepEvent) EventType() EventType
func (SleepEvent) MarshalJSON ¶
func (s SleepEvent) MarshalJSON() ([]byte, error)
func (*SleepEvent) UnmarshalJSON ¶
func (s *SleepEvent) UnmarshalJSON(data []byte) error
func (SleepEvent) WakeupAt ¶
func (s SleepEvent) WakeupAt() time.Time
type StartEvent ¶
type StartEvent struct {
// contains filtered or unexported fields
}
StartEvent is the event metadata for EventTypeStart. It carries the script hash and the input.
func NewStartEvent ¶
func NewStartEvent(scriptHash string, input any) StartEvent
func (StartEvent) EventType ¶
func (s StartEvent) EventType() EventType
func (StartEvent) Input ¶
func (s StartEvent) Input() any
func (StartEvent) MarshalJSON ¶
func (s StartEvent) MarshalJSON() ([]byte, error)
func (StartEvent) ScriptHash ¶
func (s StartEvent) ScriptHash() string
func (*StartEvent) UnmarshalJSON ¶
func (s *StartEvent) UnmarshalJSON(data []byte) error
type Store ¶
type Store interface {
	// AppendEvent appends an event to a run's history.
	// expectedVersion should match the current number of events for the run.
	// Returns the new version (number of events) after append.
	// Returns ErrConcurrentUpdate if expectedVersion doesn't match current version.
	AppendEvent(ctx context.Context, runID string, expectedVersion int, eventData []byte) (int, error)

	// GetEvents returns the event data for a given run in the order they were recorded.
	GetEvents(ctx context.Context, runID string) ([][]byte, error)

	// GetLastEvent returns the last event data and version for a given run.
	GetLastEvent(ctx context.Context, runID string) ([]byte, int, error)
}
Store is the interface for persisting workflow data. This is a simple append-only interface with optimistic concurrency control.
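The version check described in the interface comments can be sketched with a small map-backed implementation. This is not the package's InMemoryStore: `mapStore` is a hypothetical type, the interface and sentinel are redeclared locally so the sketch compiles on its own, and returning a nil event with version 0 for an unknown run is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Local copies of the sentinel and interface, so the sketch is self-contained.
var ErrConcurrentUpdate = errors.New("concurrent update")

type Store interface {
	AppendEvent(ctx context.Context, runID string, expectedVersion int, eventData []byte) (int, error)
	GetEvents(ctx context.Context, runID string) ([][]byte, error)
	GetLastEvent(ctx context.Context, runID string) ([]byte, int, error)
}

// mapStore is a hypothetical append-only store keyed by run ID.
type mapStore struct {
	mu   sync.Mutex
	runs map[string][][]byte
}

var _ Store = (*mapStore)(nil) // compile-time interface check

func newMapStore() *mapStore {
	return &mapStore{runs: make(map[string][][]byte)}
}

// AppendEvent succeeds only when expectedVersion equals the current number of
// events; otherwise another writer got there first.
func (s *mapStore) AppendEvent(ctx context.Context, runID string, expectedVersion int, eventData []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	events := s.runs[runID]
	if expectedVersion != len(events) {
		return 0, ErrConcurrentUpdate
	}
	// Copy the payload so later changes by the caller do not alter history.
	s.runs[runID] = append(events, append([]byte(nil), eventData...))
	return len(events) + 1, nil
}

// GetEvents returns the events in the order they were appended.
func (s *mapStore) GetEvents(ctx context.Context, runID string) ([][]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([][]byte(nil), s.runs[runID]...), nil
}

// GetLastEvent returns the newest event and the current version.
func (s *mapStore) GetLastEvent(ctx context.Context, runID string) ([]byte, int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	events := s.runs[runID]
	if len(events) == 0 {
		return nil, 0, nil // assumed behavior for a run with no events
	}
	return events[len(events)-1], len(events), nil
}

// demo appends one event, then retries with the same, now stale, version.
func demo() (int, bool) {
	s := newMapStore()
	ctx := context.Background()
	version, _ := s.AppendEvent(ctx, "run-1", 0, []byte("start"))
	_, err := s.AppendEvent(ctx, "run-1", 0, []byte("stale"))
	return version, errors.Is(err, ErrConcurrentUpdate)
}

func main() {
	version, conflict := demo()
	fmt.Println(version, conflict) // 1 true
}
```

A writer that hits the conflict would typically re-read the current version with GetLastEvent before deciding whether to retry.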
type TimeNowEvent ¶
type TimeNowEvent struct {
// contains filtered or unexported fields
}
TimeNowEvent is the event metadata for EventTypeTimeNow. It carries the timestamp returned by Timestamp.
func NewTimeNowEvent ¶
func NewTimeNowEvent(timestamp time.Time) TimeNowEvent
func (TimeNowEvent) EventType ¶
func (t TimeNowEvent) EventType() EventType
func (TimeNowEvent) MarshalJSON ¶
func (t TimeNowEvent) MarshalJSON() ([]byte, error)
func (TimeNowEvent) Timestamp ¶
func (t TimeNowEvent) Timestamp() time.Time
func (*TimeNowEvent) UnmarshalJSON ¶
func (t *TimeNowEvent) UnmarshalJSON(data []byte) error
type YieldError ¶
type YieldError struct {
// contains filtered or unexported fields
}
YieldError is returned when the script yields waiting for a signal.
func (*YieldError) Error ¶
func (e *YieldError) Error() string
func (*YieldError) Is ¶
func (y *YieldError) Is(target error) bool
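An `Is` method lets `errors.Is` match an error type by kind rather than by pointer identity. The sketch below shows the general technique on a hypothetical `yieldError` type; its fields, its message, and the rule that any `*yieldError` target matches are assumptions, since the actual matching logic of YieldError is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// yieldError is a hypothetical stand-in; its fields are assumptions.
type yieldError struct {
	runID    string
	signalID string
}

func (e *yieldError) Error() string {
	return fmt.Sprintf("run %s yielded, waiting for signal %s", e.runID, e.signalID)
}

// Is makes errors.Is report a match for any *yieldError target, regardless of
// the field values on either side.
func (e *yieldError) Is(target error) bool {
	_, ok := target.(*yieldError)
	return ok
}

// wrapYield builds a yield error wrapped with extra context, as a caller
// higher up the stack might see it.
func wrapYield(runID, signalID string) error {
	return fmt.Errorf("running workflow: %w", &yieldError{runID: runID, signalID: signalID})
}

// yielded reports whether err is, or wraps, a yieldError.
func yielded(err error) bool {
	return errors.Is(err, &yieldError{})
}

func main() {
	fmt.Println(yielded(wrapYield("run-1", "sig-1"))) // true
	fmt.Println(yielded(errors.New("boom")))          // false
}
```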
type YieldEvent ¶
type YieldEvent struct {
// contains filtered or unexported fields
}
YieldEvent is the event metadata for EventTypeYield. It carries a signal ID and a run ID.
func NewYieldEvent ¶
func NewYieldEvent(signalID, runID string) YieldEvent
func (YieldEvent) EventType ¶
func (y YieldEvent) EventType() EventType
func (YieldEvent) MarshalJSON ¶
func (y YieldEvent) MarshalJSON() ([]byte, error)
func (YieldEvent) RunID ¶
func (y YieldEvent) RunID() string
func (YieldEvent) SignalID ¶
func (y YieldEvent) SignalID() string
func (*YieldEvent) UnmarshalJSON ¶
func (y *YieldEvent) UnmarshalJSON(data []byte) error