starflow

v0.0.0-...-afb83fc Latest
Published: Jun 28, 2025 | License: MIT | Imports: 22 | Imported by: 0

README

Starflow

A workflow engine for Go that enables deterministic and resumable workflow execution using Starlark scripting.

Features

Deterministic & Durable Workflows

Write workflows in Starlark (Python-like syntax) that are deterministic and can be replayed from any point with full durability guarantees. Every execution step is recorded and can be resumed exactly where it left off.
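
The record-and-replay idea behind this guarantee can be illustrated with a small stand-alone sketch. It is not starflow's implementation: the journal type and its step method are hypothetical names invented for this illustration. The sketch records the result of each nondeterministic step on the first run and returns the recorded value when the same code runs again.

```go
package main

import (
	"fmt"
	"math/rand"
)

// journal is a hypothetical append-only log of step results.
// It is an illustration only and is not part of starflow.
type journal struct {
	results []int64 // recorded results, in execution order
	cursor  int     // index of the next step to execute or replay
}

// step returns the recorded result when one exists (replay);
// otherwise it runs fn, records the result, and returns it.
func (j *journal) step(fn func() int64) int64 {
	if j.cursor < len(j.results) {
		v := j.results[j.cursor]
		j.cursor++
		return v
	}
	v := fn()
	j.results = append(j.results, v)
	j.cursor++
	return v
}

// workflow performs two nondeterministic steps through the journal.
func workflow(j *journal) int64 {
	a := j.step(func() int64 { return rand.Int63n(1000) })
	b := j.step(func() int64 { return rand.Int63n(1000) })
	return a + b
}

func main() {
	j := &journal{}
	first := workflow(j)

	// Replay: rewind the cursor and run the same code again.
	j.cursor = 0
	second := workflow(j)

	fmt.Println(first == second) // prints "true"
}
```

Because every random value is read back from the journal on the second pass, the replayed run produces the same result as the original one.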

Pluggable Storage Backends

Any store that implements the simple Store interface can be used as a backend. The interface uses append-only operations with optimistic concurrency control.

Installation

go get github.com/dynoinc/starflow

Quick Start

For a complete working example, please see the example_test.go file in this repository. It demonstrates how to:

  • Create an in-memory store
  • Register functions for use in workflows
  • Define workflow scripts using Starlark
  • Execute workflows and retrieve results

You can run the example with:

go test -run Example

License

This project is licensed under the MIT License - see the LICENSE file for details.

Security

Please see SECURITY.md for security policy and reporting guidelines.

Documentation

Overview

Package starflow provides a workflow engine for Go that enables deterministic and resumable workflow execution using Starlark scripting. Every execution step is recorded and can be resumed exactly where it left off.

Key Features

  • Deterministic & Durable Workflows: Write workflows that are deterministic and can be replayed from any point with full durability guarantees
  • Pluggable Backends: Works with any backend that implements the Store interface
  • Resumable Workflows: Workflows can yield and resume based on external signals

For more information, see https://github.com/dynoinc/starflow

Example

Example demonstrates the basic usage of the starflow package. This example shows how to create a simple workflow that echoes input using JSON.

package main

import (
	"context"
	"fmt"

	"github.com/dynoinc/starflow"
)

// StringValue represents a simple string wrapper for the example
type StringValue struct {
	Value string `json:"value"`
}

// Example demonstrates the basic usage of the starflow package.
// This example shows how to create a simple workflow that echoes input using JSON.
func main() {
	// Create an in-memory store
	store := starflow.NewInMemoryStore()

	// Create a client with JSON-based types
	client := starflow.NewClient[StringValue, StringValue](store)

	// Register a simple echo function
	echoFn := func(ctx context.Context, req StringValue) (StringValue, error) {
		return StringValue{Value: "echo: " + req.Value}, nil
	}
	starflow.RegisterFunc(client, echoFn, starflow.WithName("module.echoFn"))

	// Define a simple workflow script using JSON
	script := `
def main(ctx, input):
    # Call our registered function with JSON data
    result = module.echoFn(ctx=ctx, req={"value": input["value"]})
    
    # Return the result
    return result
`

	// Run the workflow
	output, err := client.Run(context.Background(), "run-id", []byte(script), StringValue{Value: "hello world"})
	if err != nil {
		panic(err)
	}

	fmt.Printf("Result: %s\n", output.Value)

}
Output:

Result: echo: hello world

Constants

This section is empty.

Variables

var ErrConcurrentUpdate = errors.New("concurrent update")

ErrConcurrentUpdate indicates optimistic concurrency failure. This error is returned when a concurrent update to a run is detected.
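
One way a caller might react to this error is to refresh the version and try again. The sketch below is an assumption-laden illustration, not code from starflow: eventLog and appendWithRetry are hypothetical names, and ErrConcurrentUpdate is redeclared locally so the block compiles without the module.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConcurrentUpdate mirrors the starflow variable of the same name
// so that this sketch compiles on its own.
var ErrConcurrentUpdate = errors.New("concurrent update")

// eventLog is a hypothetical single-run event log used only for illustration.
type eventLog struct{ events [][]byte }

// append adds data only if expected equals the current number of events.
func (l *eventLog) append(expected int, data []byte) (int, error) {
	if expected != len(l.events) {
		return 0, ErrConcurrentUpdate
	}
	l.events = append(l.events, data)
	return len(l.events), nil
}

// appendWithRetry starts from a possibly stale version and, on conflict,
// refreshes the version and retries up to maxAttempts times.
func appendWithRetry(l *eventLog, staleVersion int, data []byte, maxAttempts int) (int, error) {
	expected := staleVersion
	for attempt := 0; attempt < maxAttempts; attempt++ {
		v, err := l.append(expected, data)
		if err == nil {
			return v, nil
		}
		if !errors.Is(err, ErrConcurrentUpdate) {
			return 0, err // unrelated failure: do not retry
		}
		expected = len(l.events) // refresh the version before retrying
	}
	return 0, ErrConcurrentUpdate
}

func main() {
	// The log already holds one event, so version 0 is stale.
	l := &eventLog{events: [][]byte{[]byte("first")}}
	v, err := appendWithRetry(l, 0, []byte("second"), 3)
	fmt.Println(v, err) // prints "2 <nil>"
}
```

Whether a blind retry is appropriate depends on the event being appended; in an event log, a conflict may mean that another worker is already driving the run.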

Functions

func GetRunID

func GetRunID(ctx context.Context) (string, bool)

GetRunID extracts the run ID from the context.

func NewYieldError

func NewYieldError(ctx context.Context) (string, string, error)

func RegisterFunc

func RegisterFunc[Input any, Output any, Req any, Res any](
	c *Client[Input, Output],
	fn func(ctx context.Context, req Req) (Res, error),
	opts ...Option,
)

RegisterFunc registers a Go function to be callable from Starlark using generics and reflection. The function must have the signature: func(ctx context.Context, req ReqType) (ResType, error) where ReqType and ResType are JSON-serializable types.

The function will be automatically named based on its package and function name, or you can override this using the WithName option.
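
A function of the required shape might look like the following sketch. AddRequest, AddResponse, add, and callViaJSON are hypothetical names chosen for illustration. The JSON round trip in callViaJSON only demonstrates that the types are JSON-serializable; it is an assumption about how values could cross the Starlark boundary, not a description of starflow's internals.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// AddRequest and AddResponse are hypothetical JSON-serializable types.
type AddRequest struct {
	A int `json:"a"`
	B int `json:"b"`
}

type AddResponse struct {
	Sum int `json:"sum"`
}

// add has the shape func(ctx context.Context, req ReqType) (ResType, error).
func add(ctx context.Context, req AddRequest) (AddResponse, error) {
	return AddResponse{Sum: req.A + req.B}, nil
}

// callViaJSON decodes a JSON request, calls add, and encodes the response.
func callViaJSON(in string) (string, error) {
	var req AddRequest
	if err := json.Unmarshal([]byte(in), &req); err != nil {
		return "", err
	}
	res, err := add(context.Background(), req)
	if err != nil {
		return "", err
	}
	out, err := json.Marshal(res)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	out, err := callViaJSON(`{"a": 2, "b": 3}`)
	fmt.Println(out, err) // prints: {"sum":5} <nil>
}
```

With a real client, registration would follow the pattern from the package example, for instance starflow.RegisterFunc(client, add, starflow.WithName("math.add")), where "math.add" is an illustrative name in the "module.funcname" format.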

func ValidateScript

func ValidateScript(script []byte) error

ValidateScript performs validation on the Starlark script. It checks for syntax errors and ensures the script has a main function.

Types

type CallEvent

type CallEvent struct {
	// contains filtered or unexported fields
}

CallEvent is the event metadata recorded for a function call. It carries the function name and the call input.

func NewCallEvent

func NewCallEvent(functionName string, input any) CallEvent

func (CallEvent) EventType

func (c CallEvent) EventType() EventType

func (CallEvent) FunctionName

func (c CallEvent) FunctionName() string

func (CallEvent) Input

func (c CallEvent) Input() any

func (CallEvent) MarshalJSON

func (c CallEvent) MarshalJSON() ([]byte, error)

func (*CallEvent) UnmarshalJSON

func (c *CallEvent) UnmarshalJSON(data []byte) error

type Client

type Client[Input any, Output any] struct {
	// contains filtered or unexported fields
}

Client provides an interface for creating and managing workflow runs.

func NewClient

func NewClient[Input any, Output any](store Store) *Client[Input, Output]

NewClient creates a new workflow client with the specified input and output types. The client uses the provided store for persistence and workflow management.

func (*Client[Input, Output]) GetEvents

func (c *Client[Input, Output]) GetEvents(ctx context.Context, runID string) ([]*Event, error)

GetEvents retrieves the execution history of a workflow run. Returns a chronological list of events that occurred during execution.

func (*Client[Input, Output]) Run

func (c *Client[Input, Output]) Run(ctx context.Context, runID string, script []byte, input Input) (Output, error)

Run creates a new workflow run, identified by runID, with the given script and input, and returns the workflow's output.

func (*Client[Input, Output]) Signal

func (c *Client[Input, Output]) Signal(ctx context.Context, runID, cid string, output any) error

Signal resumes a yielded workflow run with the provided output. The cid parameter should match the signal ID from the yield event.

type Event

type Event struct {
	Timestamp time.Time
	Metadata  EventMetadata
}

Event represents a single event in the execution history of a run.

func (Event) MarshalJSON

func (e Event) MarshalJSON() ([]byte, error)

MarshalJSON serializes the Event to JSON

func (Event) Type

func (e Event) Type() EventType

func (*Event) UnmarshalJSON

func (e *Event) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes JSON into an Event

type EventMetadata

type EventMetadata interface {
	EventType() EventType
}

type EventType

type EventType string

EventType represents the type of an event in the execution history.

const (
	EventTypeStart  EventType = "START"
	EventTypeFinish EventType = "FINISH"

	EventTypeCall   EventType = "CALL"
	EventTypeReturn EventType = "RETURN"

	EventTypeYield  EventType = "YIELD"
	EventTypeResume EventType = "RESUME"

	EventTypeSleep   EventType = "SLEEP"
	EventTypeTimeNow EventType = "TIME_NOW"
	EventTypeRandInt EventType = "RAND_INT"
)
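
The constants pair up naturally (START/FINISH, CALL/RETURN, YIELD/RESUME), which suggests that the last event in a history says something about where a run stands. The sketch below is a guess at that reading, not behavior documented by starflow: runState is a hypothetical helper, and EventType and its constants are redeclared locally so the block compiles on its own.

```go
package main

import "fmt"

// EventType and the constants below mirror the starflow declarations.
type EventType string

const (
	EventTypeStart  EventType = "START"
	EventTypeFinish EventType = "FINISH"
	EventTypeCall   EventType = "CALL"
	EventTypeReturn EventType = "RETURN"
	EventTypeYield  EventType = "YIELD"
	EventTypeResume EventType = "RESUME"
	EventTypeSleep  EventType = "SLEEP"
)

// runState is a hypothetical helper that classifies a run by the type
// of its most recent event. The mapping is an assumption.
func runState(history []EventType) string {
	if len(history) == 0 {
		return "not started"
	}
	switch history[len(history)-1] {
	case EventTypeFinish:
		return "finished"
	case EventTypeYield:
		return "waiting for signal"
	case EventTypeSleep:
		return "sleeping"
	default:
		return "in progress"
	}
}

func main() {
	history := []EventType{EventTypeStart, EventTypeCall, EventTypeReturn, EventTypeYield}
	fmt.Println(runState(history)) // prints "waiting for signal"
}
```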

type FinishEvent

type FinishEvent struct {
	// contains filtered or unexported fields
}

FinishEvent is the event metadata recorded when a run finishes. It carries the run's output or error.

func NewFinishEvent

func NewFinishEvent(output any, err error) FinishEvent

func (FinishEvent) EventType

func (f FinishEvent) EventType() EventType

func (FinishEvent) MarshalJSON

func (f FinishEvent) MarshalJSON() ([]byte, error)

func (FinishEvent) Output

func (f FinishEvent) Output() (any, error)

func (*FinishEvent) UnmarshalJSON

func (f *FinishEvent) UnmarshalJSON(data []byte) error

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

InMemoryStore is an in-memory implementation of the Store interface. This store is suitable for testing, development, and single-instance deployments. All data is stored in memory and will be lost when the process terminates.

func NewInMemoryStore

func NewInMemoryStore() *InMemoryStore

NewInMemoryStore creates a new InMemoryStore.

func (*InMemoryStore) AppendEvent

func (s *InMemoryStore) AppendEvent(ctx context.Context, runID string, expectedVersion int, eventData []byte) (int, error)

AppendEvent appends an event to a run's history with optimistic concurrency control.

func (*InMemoryStore) GetEvents

func (s *InMemoryStore) GetEvents(ctx context.Context, runID string) ([][]byte, error)

GetEvents retrieves all events for a specific run, ordered by time.

func (*InMemoryStore) GetLastEvent

func (s *InMemoryStore) GetLastEvent(ctx context.Context, runID string) ([]byte, int, error)

GetLastEvent returns the last event for a given run.

type Option

type Option func(*registeredFn)

Option configures behaviour of a registered function.

func WithName

func WithName(name string) Option

WithName overrides the automatically derived name for the function. The name must be in the format "module.funcname".

func WithRetryPolicy

func WithRetryPolicy(b backoff.BackOff) Option

WithRetryPolicy specifies a retry policy for the function.

type RandIntEvent

type RandIntEvent struct {
	// contains filtered or unexported fields
}

RandIntEvent is the event metadata that records a generated random integer.

func NewRandIntEvent

func NewRandIntEvent(result int64) RandIntEvent

func (RandIntEvent) EventType

func (r RandIntEvent) EventType() EventType

func (RandIntEvent) MarshalJSON

func (r RandIntEvent) MarshalJSON() ([]byte, error)

func (RandIntEvent) Result

func (r RandIntEvent) Result() int64

func (*RandIntEvent) UnmarshalJSON

func (r *RandIntEvent) UnmarshalJSON(data []byte) error

type ResumeEvent

type ResumeEvent struct {
	// contains filtered or unexported fields
}

ResumeEvent is the event metadata recorded when a yielded run is resumed. It carries the signal ID and the output, which is typed as any.

func NewResumeEvent

func NewResumeEvent(signalID string, output any) ResumeEvent

func (ResumeEvent) EventType

func (r ResumeEvent) EventType() EventType

func (ResumeEvent) MarshalJSON

func (r ResumeEvent) MarshalJSON() ([]byte, error)

func (ResumeEvent) Output

func (r ResumeEvent) Output() any

func (ResumeEvent) SignalID

func (r ResumeEvent) SignalID() string

func (*ResumeEvent) UnmarshalJSON

func (r *ResumeEvent) UnmarshalJSON(data []byte) error

type ReturnEvent

type ReturnEvent struct {
	// contains filtered or unexported fields
}

ReturnEvent is the event metadata recorded when a function call returns. It carries the call's output or error.

func NewReturnEvent

func NewReturnEvent(output any, err error) ReturnEvent

func (ReturnEvent) EventType

func (r ReturnEvent) EventType() EventType

func (ReturnEvent) MarshalJSON

func (r ReturnEvent) MarshalJSON() ([]byte, error)

func (ReturnEvent) Output

func (r ReturnEvent) Output() (any, error)

func (*ReturnEvent) UnmarshalJSON

func (r *ReturnEvent) UnmarshalJSON(data []byte) error

type SleepEvent

type SleepEvent struct {
	// contains filtered or unexported fields
}

SleepEvent is the event metadata that records a sleep and its wake-up time.

func NewSleepEvent

func NewSleepEvent(wakeupAt time.Time) SleepEvent

func (SleepEvent) EventType

func (s SleepEvent) EventType() EventType

func (SleepEvent) MarshalJSON

func (s SleepEvent) MarshalJSON() ([]byte, error)

func (*SleepEvent) UnmarshalJSON

func (s *SleepEvent) UnmarshalJSON(data []byte) error

func (SleepEvent) WakeupAt

func (s SleepEvent) WakeupAt() time.Time

type StartEvent

type StartEvent struct {
	// contains filtered or unexported fields
}

StartEvent is the event metadata recorded when a run starts. It carries the script hash and the run input.

func NewStartEvent

func NewStartEvent(scriptHash string, input any) StartEvent

func (StartEvent) EventType

func (s StartEvent) EventType() EventType

func (StartEvent) Input

func (s StartEvent) Input() any

func (StartEvent) MarshalJSON

func (s StartEvent) MarshalJSON() ([]byte, error)

func (StartEvent) ScriptHash

func (s StartEvent) ScriptHash() string

func (*StartEvent) UnmarshalJSON

func (s *StartEvent) UnmarshalJSON(data []byte) error

type Store

type Store interface {
	// AppendEvent appends an event to a run's history.
	// expectedVersion should match the current number of events for the run.
	// Returns the new version (number of events) after append.
	// Returns ErrConcurrentUpdate if expectedVersion doesn't match current version.
	AppendEvent(ctx context.Context, runID string, expectedVersion int, eventData []byte) (int, error)

	// GetEvents returns the event data for a given run in the order they were recorded.
	GetEvents(ctx context.Context, runID string) ([][]byte, error)

	// GetLastEvent returns the last event data and version for a given run.
	GetLastEvent(ctx context.Context, runID string) ([]byte, int, error)
}

Store is the interface for persisting workflow data. This is a simple append-only interface with optimistic concurrency control.
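
A custom backend could be sketched as follows. mapStore is a hypothetical map-backed implementation written for this illustration; the Store interface and ErrConcurrentUpdate are redeclared locally so the block compiles without the module. The behavior of GetLastEvent for an unknown run (nil data, version 0, no error) is an assumption, since the interface comments do not specify it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// ErrConcurrentUpdate and Store mirror the starflow declarations.
var ErrConcurrentUpdate = errors.New("concurrent update")

type Store interface {
	AppendEvent(ctx context.Context, runID string, expectedVersion int, eventData []byte) (int, error)
	GetEvents(ctx context.Context, runID string) ([][]byte, error)
	GetLastEvent(ctx context.Context, runID string) ([]byte, int, error)
}

// mapStore is a hypothetical map-backed Store guarded by a mutex.
type mapStore struct {
	mu   sync.Mutex
	runs map[string][][]byte
}

func newMapStore() *mapStore {
	return &mapStore{runs: make(map[string][][]byte)}
}

func (s *mapStore) AppendEvent(ctx context.Context, runID string, expectedVersion int, eventData []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	events := s.runs[runID]
	// The version is the number of events already recorded for the run.
	if expectedVersion != len(events) {
		return 0, ErrConcurrentUpdate
	}
	// Copy the payload so later mutation by the caller cannot alter history.
	s.runs[runID] = append(events, append([]byte(nil), eventData...))
	return len(events) + 1, nil
}

func (s *mapStore) GetEvents(ctx context.Context, runID string) ([][]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Return a copy of the outer slice so callers cannot reorder history.
	return append([][]byte(nil), s.runs[runID]...), nil
}

func (s *mapStore) GetLastEvent(ctx context.Context, runID string) ([]byte, int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	events := s.runs[runID]
	if len(events) == 0 {
		return nil, 0, nil // assumption: unknown run reports version 0
	}
	return events[len(events)-1], len(events), nil
}

// Compile-time check that mapStore satisfies Store.
var _ Store = (*mapStore)(nil)

// demo appends once, then appends again with a stale version.
func demo() (version int, count int, staleErr error) {
	ctx := context.Background()
	s := newMapStore()
	version, _ = s.AppendEvent(ctx, "run-1", 0, []byte(`{"n":1}`))
	_, staleErr = s.AppendEvent(ctx, "run-1", 0, []byte(`{"n":2}`))
	events, _ := s.GetEvents(ctx, "run-1")
	return version, len(events), staleErr
}

func main() {
	v, n, err := demo()
	fmt.Println(v, n, err) // prints: 1 1 concurrent update
}
```

The second append is rejected because its expected version (0) no longer matches the number of recorded events (1), so the history keeps a single event.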

type TimeNowEvent

type TimeNowEvent struct {
	// contains filtered or unexported fields
}

TimeNowEvent is the event metadata that records an observed timestamp.

func NewTimeNowEvent

func NewTimeNowEvent(timestamp time.Time) TimeNowEvent

func (TimeNowEvent) EventType

func (t TimeNowEvent) EventType() EventType

func (TimeNowEvent) MarshalJSON

func (t TimeNowEvent) MarshalJSON() ([]byte, error)

func (TimeNowEvent) Timestamp

func (t TimeNowEvent) Timestamp() time.Time

func (*TimeNowEvent) UnmarshalJSON

func (t *TimeNowEvent) UnmarshalJSON(data []byte) error

type YieldError

type YieldError struct {
	// contains filtered or unexported fields
}

YieldError is returned when the script yields waiting for a signal.

func (*YieldError) Error

func (e *YieldError) Error() string

func (*YieldError) Is

func (y *YieldError) Is(target error) bool
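
An Is method lets callers detect an error type with errors.Is even when the error is wrapped. The sketch below shows that general Go pattern with a hypothetical stand-in type; the fields of yieldError and its matching rule (any *yieldError target matches) are assumptions and may differ from what starflow's YieldError actually does.

```go
package main

import (
	"errors"
	"fmt"
)

// yieldError is a hypothetical stand-in for starflow's YieldError.
// Its fields are invented for this illustration.
type yieldError struct {
	runID    string
	signalID string
}

func (e *yieldError) Error() string {
	return fmt.Sprintf("run %s yielded, waiting for signal %s", e.runID, e.signalID)
}

// Is reports a match for any *yieldError target, so errors.Is works
// without comparing field values. This matching rule is an assumption.
func (e *yieldError) Is(target error) bool {
	_, ok := target.(*yieldError)
	return ok
}

// errOther is an unrelated error used to show the negative case.
var errOther = errors.New("other error")

// isYield reports whether err, or any error it wraps, is a yieldError.
func isYield(err error) bool {
	return errors.Is(err, &yieldError{})
}

func main() {
	wrapped := fmt.Errorf("run failed: %w", &yieldError{runID: "run-1", signalID: "sig-1"})
	fmt.Println(isYield(wrapped))  // prints "true"
	fmt.Println(isYield(errOther)) // prints "false"
}
```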

type YieldEvent

type YieldEvent struct {
	// contains filtered or unexported fields
}

YieldEvent is the event metadata recorded when a run yields. It carries the signal ID and the run ID.

func NewYieldEvent

func NewYieldEvent(signalID, runID string) YieldEvent

func (YieldEvent) EventType

func (y YieldEvent) EventType() EventType

func (YieldEvent) MarshalJSON

func (y YieldEvent) MarshalJSON() ([]byte, error)

func (YieldEvent) RunID

func (y YieldEvent) RunID() string

func (YieldEvent) SignalID

func (y YieldEvent) SignalID() string

func (*YieldEvent) UnmarshalJSON

func (y *YieldEvent) UnmarshalJSON(data []byte) error

Directories

Path Synopsis
cmd
