workflow

package module
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2023 License: BSD-3-Clause Imports: 25 Imported by: 0

README

Go Go Report Card codecov

alt text

Workflow is a Golang workflow framework that encompasses these main features:

  • Defining small units of work called "Steps"
  • Consumer management and graceful shutdown
  • Supports event streaming platforms such as Kafka and Reflex (or you can write your own implementation of the EventStreamer interface!)
  • Built in support for timeout operations (e.g. account cool down periods etc).
  • Built in support for callbacks (e.g. Call an async endpoint and trigger the callback from a webhook handler).
  • Connect two workflows together. Wait for specific events from another workflow and make it part of your workflow!
  • Super Duper testable

Example

Head on over to ./examples to get familiar with the syntax 😊

Glossary

  1. API:

    • An interface providing methods for interacting with workflows. It includes functionality for triggering, scheduling, awaiting, and stopping workflows.
  2. Await:

    • A method in the workflow API that blocks until a workflow with a specific runID reaches a specified status. It returns the record associated with that status.
  3. Builder:

    • A struct type that facilitates the construction of workflows. It provides methods for adding steps, callbacks, timeouts, and connecting workflows.
  4. Build:

    • A method in the builder that finalizes the construction of the workflow and returns the built workflow.
  5. BuildOption:

    • A functional option for configuring the build process, such as specifying a custom clock or enabling debug mode.
  6. Callback:

    • A method in the workflow API that can be used to trigger a callback function for a specified status. It passes data from a reader to the specified callback function.
  7. CallbackFunc:

    • A function type representing a callback in the workflow, triggered when transitioning from one status to another.
  8. ConnectWorkflow:

    • A method in the builder that connects a workflow to another workflow using a connector configuration.
  9. ConnectorConfig:

    • A configuration struct representing the settings for a connector, including workflow name, status, stream, filter, and consumer.
  10. Consumer:

    • A component that consumes events from an event stream. In this context, it refers to the background consumer goroutines launched by the workflow.
  11. ConsumerFunc:

    • A function type representing a step in the workflow that consumes records and transitions to a specified status.
  12. DebugMode:

    • A configuration option that, when enabled, causes the workflow to operate in debug mode, providing additional information or logging for debugging purposes.
  13. DurationTimerFunc:

    • A function that creates a timer function based on a specified duration.
  14. Endpoints:

    • Statuses in the workflow that do not have any outgoing transitions, indicating the end points of the workflow.
  15. EventStreamer:

    • An interface representing a stream for workflow events. It includes methods for producing and consuming events.
  16. Graph:

    • A representation of the workflow's structure, showing the relationships between different statuses and transitions.
  17. InternalState:

    • A map holding the state of all expected consumers and timeout goroutines using their role names as keys. It is protected by a mutex to ensure thread safety.
  18. MermaidDiagram:

    • A function generating a Mermaid diagram for the workflow structure based on the provided Workflow, path, and MermaidDirection.
  19. MermaidDirection:

    • A type representing the direction of the Mermaid diagram, such as TopToBottom, LeftToRight, RightToLeft, or BottomToTop.
  20. Not:

    • A function that negates the result of another consumer function, used as a filter for steps.
  21. Producer:

    • A component that produces events to an event stream. It is responsible for sending events to the stream.
  22. RecordStore:

    • An interface representing a store for workflow records. It includes methods for storing and retrieving records.
  23. RoleScheduler:

    • An interface representing a scheduler for roles in the workflow. It is responsible for coordinating the execution of different roles.
  24. Run:

    • A method in the workflow struct that starts background processes necessary for running the workflow, such as consumers, timeouts, and connectors.
  25. ScheduleTrigger:

    • A method in the workflow API that schedules workflow triggers at specified intervals using a cron-like specification.
  26. State:

    • An enumeration representing the state of a consumer or timeout goroutine. Possible states include StateUnknown, StateShutdown, and others.
  27. StepOption:

    • A functional option for configuring step-specific settings, such as parallel count, polling frequency, and error backoff.
  28. TimeoutFunc:

    • A function type representing a timeout action in the workflow, executed when a specified time duration has elapsed.
  29. TimeoutOption:

    • A functional option for configuring timeout-specific settings, such as polling frequency and error backoff.
  30. Topic:

    • A method that generates a topic for producing events in the event streamer based on the workflow name and status.
  31. TimerFunc:

    • A function type used to calculate the time for a timeout. It takes a context, record, and the current time as inputs.
  32. Trigger:

    • A method in the workflow API that initiates a workflow for a specified foreignID and starting status. It returns a runID and allows for additional configuration options.
  33. WithClock:

    • A build option that sets a custom clock for the workflow.
  34. WithDebugMode:

    • A build option that enables debug mode for the workflow.
  35. WithParallelCount:

    • A step option that sets the parallel count for a step, indicating how many instances of the step can run concurrently.
  36. WithStepErrBackOff:

    • A step option that sets the error backoff duration for a step, specifying the time to wait before retrying in case of an error.
  37. WithStepPollingFrequency:

    • A step option that sets the polling frequency for a step, determining how often the step should check for updates.
  38. WithTimeoutErrBackOff:

    • A timeout option that sets the error backoff duration for a timeout transition.
  39. WithTimeoutPollingFrequency:

    • A timeout option that sets the polling frequency for a timeout transition.
  40. WireFormat:

    • A format used for serializing and deserializing data for communication between workflow components. It refers to the wire format of the WireRecord.
  41. WireRecord:

    • A struct representing a record with additional metadata used for communication between workflow components. It can be marshaled to a wire format for storage and transmission.

Authors

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrWorkflowShutdown            = errors.New("workflow has been shutdown")
	ErrCursorNotFound              = errors.New("cursor not found")
	ErrRecordNotFound              = errors.New("record not found")
	ErrTimeoutNotFound             = errors.New("timeout not found")
	ErrRunIDNotFound               = errors.New("run ID not found")
	ErrWorkflowInProgress          = errors.New("current workflow still in progress - retry once complete")
	ErrWorkflowNotRunning          = errors.New("trigger failed - workflow is not running")
	ErrStatusProvidedNotConfigured = errors.New("status provided is not configured for workflow")
)

Functions

func AwaitTimeoutInsert

func AwaitTimeoutInsert[Type any, Status StatusType](t *testing.T, w *Workflow[Type, Status], foreignID, runID string, waitFor Status)

func Marshal

func Marshal[T any](t *T) ([]byte, error)

func MermaidDiagram

func MermaidDiagram[Type any, Status StatusType](w *Workflow[Type, Status], path string, d MermaidDirection) error

func ParseTopic

func ParseTopic(topic string) (workflowName string, statusType int, err error)

func Require

func Require[Type any, Status StatusType](t *testing.T, w *Workflow[Type, Status], foreignID string, waitFor Status, expected Type)

func ToProto

func ToProto(r *WireRecord) *workflowpb.Record

func Topic

func Topic(workflowName string, statusType int) string

func TriggerCallbackOn

func TriggerCallbackOn[Type any, Status StatusType, Payload any](t *testing.T, w *Workflow[Type, Status], foreignID, runID string, waitFor Status, p Payload)

func Unmarshal

func Unmarshal[T any](b []byte, t *T) error

Types

type API

type API[Type any, Status StatusType] interface {
	// Trigger will kickstart a workflow for the provided foreignID starting from the provided starting status. There
	// is no limitation as to where you start the workflow from. For workflows that have data preceding the initial
	// trigger that needs to be used in the workflow, using WithInitialValue will allow you to provide pre-populated
	// fields of Type that can be accessed by the consumers.
	//
	// foreignID should not be random and should be deterministic for the thing that you are running the workflow for.
	// This especially helps when connecting other workflows as the foreignID is the only way to connect the streams. The
	// same goes for Callback as you will need the foreignID to connect the callback back to the workflow instance that
	// was run.
	Trigger(ctx context.Context, foreignID string, startingStatus Status, opts ...TriggerOption[Type, Status]) (runID string, err error)

	// ScheduleTrigger takes a cron spec and will call Trigger at the specified intervals. ScheduleTrigger is a blocking
	// call and will return ErrWorkflowNotRunning or ErrStatusProvidedNotConfigured to indicate that it cannot begin to
	// schedule. All schedule errors will be retried indefinitely. The same options are available for ScheduleTrigger
	// as they are for Trigger.
	ScheduleTrigger(ctx context.Context, foreignID string, startingStatus Status, spec string, opts ...TriggerOption[Type, Status]) error

	// Await is a blocking call that returns the typed Record when the workflow of the specified run ID reaches the
	// specified status.
	Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Record[Type, Status], error)

	// Callback can be used if Builder.AddCallback has been defined for the provided status. The data in the reader
	// will be passed to the CallbackFunc that you specify and so the serialisation and deserialisation is in the
	// hands of the user.
	Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error

	// Run must be called in order to start up all the background consumers / consumers required to run the workflow. Run
	// only needs to be called once. Any subsequent calls to run are safe and are noop.
	Run(ctx context.Context)

	// Stop tells the workflow to shut down gracefully.
	Stop()
}

type Ack

type Ack func() error

Ack is used for the event streamer to update its cursor of what messages have been consumed. If Ack is not called then the event streamer, depending on implementation, will likely not keep track of which records / events have been consumed.

type AwaitOption

type AwaitOption func(o *awaitOpts)

func WithPollingFrequency

func WithPollingFrequency(d time.Duration) AwaitOption

type BuildOption

type BuildOption func(w *buildOptions)

func WithClock

func WithClock(c clock.Clock) BuildOption

func WithDebugMode

func WithDebugMode() BuildOption

type Builder

type Builder[Type any, Status StatusType] struct {
	// contains filtered or unexported fields
}

func NewBuilder

func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status]

func (*Builder[Type, Status]) AddCallback

func (b *Builder[Type, Status]) AddCallback(from Status, fn CallbackFunc[Type, Status], to Status)

func (*Builder[Type, Status]) AddConnector

func (b *Builder[Type, Status]) AddConnector(name string, c Consumer, cf ConnectorFunc[Type, Status], opts ...ConnectorOption)

func (*Builder[Type, Status]) AddStep

func (b *Builder[Type, Status]) AddStep(from Status, c ConsumerFunc[Type, Status], to Status, opts ...StepOption)

func (*Builder[Type, Status]) AddTimeout

func (b *Builder[Type, Status]) AddTimeout(from Status, timer TimerFunc[Type, Status], tf TimeoutFunc[Type, Status], to Status, opts ...TimeoutOption)

func (*Builder[Type, Status]) AddWorkflowConnector

func (b *Builder[Type, Status]) AddWorkflowConnector(cd WorkflowConnectionDetails, filter ConnectorFilter, from Status, consumer ConnectorConsumerFunc[Type, Status], to Status, opts ...StepOption)

func (*Builder[Type, Status]) Build

func (b *Builder[Type, Status]) Build(eventStreamer EventStreamer, recordStore RecordStore, timeoutStore TimeoutStore, roleScheduler RoleScheduler, opts ...BuildOption) *Workflow[Type, Status]

type CallbackFunc

type CallbackFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status], reader io.Reader) (bool, error)

type ConnectorConsumerFunc

type ConnectorConsumerFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status], e *Event) (bool, error)

type ConnectorFilter

type ConnectorFilter func(ctx context.Context, e *Event) (foreignID string, err error)

ConnectorFilter should return an empty string as the foreignID if the event should be filtered out / skipped, and it should be non-empty if event should be processed. The value of foreignID should match the foreignID of your workflow.

type ConnectorFunc

type ConnectorFunc[Type any, Status StatusType] func(ctx context.Context, w *Workflow[Type, Status], e *Event) error

type ConnectorOption

type ConnectorOption func(co *connectorOptions)

func WithConnectorErrBackOff

func WithConnectorErrBackOff(d time.Duration) ConnectorOption

func WithConnectorParallelCount

func WithConnectorParallelCount(instances int) ConnectorOption

type Consumer

type Consumer interface {
	Recv(ctx context.Context) (*Event, Ack, error)
	Close() error
}

type ConsumerFunc

type ConsumerFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status]) (bool, error)

ConsumerFunc provides a record that is expected to be modified if the data needs to change. If true is returned with a nil error then the record, along with its modifications, will be stored. If false is returned with a nil error then the record will not be stored and the event will be skipped and move onto the next event. If a non-nil error is returned then the consumer will back off and try again until a nil error occurs or the retry max has been reached if a Dead Letter Queue has been configured for the workflow.

func Not

func Not[Type any, Status StatusType](c ConsumerFunc[Type, Status]) ConsumerFunc[Type, Status]

type ConsumerOption

type ConsumerOption func(*ConsumerOptions)

func WithConsumerPollFrequency

func WithConsumerPollFrequency(d time.Duration) ConsumerOption

func WithEventFilter

func WithEventFilter(ef EventFilter) ConsumerOption

type ConsumerOptions

type ConsumerOptions struct {
	PollFrequency time.Duration
	EventFilter   EventFilter
}

type Cursor

type Cursor interface {
	Get(ctx context.Context, name string) (string, error)
	Set(ctx context.Context, name, value string) error
}

type Event

type Event struct {
	// ID is a unique ID for the event generated by the event streamer.
	ID int64

	// ForeignID refers to the ID of a record in the record store.
	ForeignID int64

	// Type relates to the StatusType that the associated record changed to.
	Type int

	// Headers stores meta-data in a simple and easily queryable way.
	Headers map[Header]string

	// CreatedAt is the time that the event was produced and is generated by the event streamer.
	CreatedAt time.Time
}

type EventEmitter

type EventEmitter func(id int64) error

EventEmitter is a function that gets called before committing the change to the store. The store needs to support transactions if it is implemented as an append only datastore to allow rolling back if the event fails to emit.

type EventFilter

type EventFilter func(e *Event) bool

EventFilter can be passed to the event streaming implementation to allow specific consumers to have an earlier on filtering process. True is returned when the event should be skipped.

type EventStreamer

type EventStreamer interface {
	NewProducer(topic string) Producer
	NewConsumer(topic string, name string, opts ...ConsumerOption) Consumer
}
type Header string
const (
	HeaderWorkflowName      Header = "workflow_name"
	HeaderWorkflowForeignID Header = "workflow_foreign_id"
	HeaderTopic             Header = "topic"
	HeaderRunID             Header = "run_id"
)

type MermaidDirection

type MermaidDirection string
const (
	UnknownDirection     MermaidDirection = ""
	TopToBottomDirection MermaidDirection = "TB"
	LeftToRightDirection MermaidDirection = "LR"
	RightToLeftDirection MermaidDirection = "RL"
	BottomToTopDirection MermaidDirection = "BT"
)

type MermaidFormat

type MermaidFormat struct {
	Direction      MermaidDirection
	StartingPoints []string
	TerminalPoints []string
	Transitions    []MermaidTransition
}

type MermaidTransition

type MermaidTransition struct {
	From string
	To   string
}

type Producer

type Producer interface {
	Send(ctx context.Context, recordID int64, statusType int, headers map[Header]string) error
	Close() error
}

type Record

type Record[Type any, Status StatusType] struct {
	WireRecord
	Status Status
	Object *Type
}

type RecordStore

type RecordStore interface {
	// Store should create or update a record depending on whether the underlying store is mutable or append only. Store
	// should implement transactions if it is supported especially if the Store is append-only as a new ID for the
	// record will need to be passed to the event emitter.
	Store(ctx context.Context, record *WireRecord, eventEmitter EventEmitter) error
	Lookup(ctx context.Context, id int64) (*WireRecord, error)
	Latest(ctx context.Context, workflowName, foreignID string) (*WireRecord, error)
}

type RoleScheduler

type RoleScheduler interface {
	Await(ctx context.Context, role string) (context.Context, context.CancelFunc, error)
}

type State

type State string
const (
	StateUnknown  State = ""
	StateIdle     State = "Idle"
	StateRunning  State = "Running"
	StateShutdown State = "Shutdown"
)

type StatusType

type StatusType interface {
	~int | ~int32 | ~int64

	String() string
}

type StepOption

type StepOption func(so *stepOptions)

func WithParallelCount

func WithParallelCount(instances int) StepOption

func WithStepErrBackOff

func WithStepErrBackOff(d time.Duration) StepOption

func WithStepLagAlert

func WithStepLagAlert(d time.Duration) StepOption

func WithStepPollingFrequency

func WithStepPollingFrequency(d time.Duration) StepOption

type TestingRecordStore

type TestingRecordStore interface {
	RecordStore

	Snapshots(workflowName, foreignID, runID string) []*WireRecord
	SetSnapshotOffset(workflowName, foreignID, runID string, offset int)
	SnapshotOffset(workflowName, foreignID, runID string) int
}

type Timeout

type Timeout struct {
	ID           int64
	WorkflowName string
	ForeignID    string
	RunID        string
	Status       int
	Completed    bool
	ExpireAt     time.Time
	CreatedAt    time.Time
}

type TimeoutFunc

type TimeoutFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status], now time.Time) (bool, error)

TimeoutFunc runs once the timeout has expired which is set by TimerFunc. If false is returned with a nil error then the timeout is skipped and not retried at a later date. If a non-nil error is returned the TimeoutFunc will be called again until a nil error is returned. If true is returned with a nil error then the provided record and any modifications made to it will be stored and the status updated - continuing the workflow.

type TimeoutOption

type TimeoutOption func(so *timeoutOptions)

func WithTimeoutErrBackOff

func WithTimeoutErrBackOff(d time.Duration) TimeoutOption

func WithTimeoutLagAlert

func WithTimeoutLagAlert(d time.Duration) TimeoutOption

func WithTimeoutPollingFrequency

func WithTimeoutPollingFrequency(d time.Duration) TimeoutOption

type TimeoutStore

type TimeoutStore interface {
	Create(ctx context.Context, workflowName, foreignID, runID string, status int, expireAt time.Time) error
	Complete(ctx context.Context, id int64) error
	Cancel(ctx context.Context, id int64) error
	List(ctx context.Context, workflowName string) ([]Timeout, error)
	ListValid(ctx context.Context, workflowName string, status int, now time.Time) ([]Timeout, error)
}

type TimerFunc

type TimerFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status], now time.Time) (time.Time, error)

TimerFunc exists to allow the specification of when the timeout should expire dynamically. If not time is set then a timeout will not be created and the event will be skipped. If the time is set then a timeout will be created and once expired TimeoutFunc will be called. Any non-nil error will be retried with backoff.

func DurationTimerFunc

func DurationTimerFunc[Type any, Status StatusType](duration time.Duration) TimerFunc[Type, Status]

func TimeTimerFunc

func TimeTimerFunc[Type any, Status StatusType](t time.Time) TimerFunc[Type, Status]

type TriggerOption

type TriggerOption[Type any, Status StatusType] func(o *triggerOpts[Type, Status])

func WithInitialValue

func WithInitialValue[Type any, Status StatusType](t *Type) TriggerOption[Type, Status]

type WireRecord

type WireRecord struct {
	ID           int64
	WorkflowName string
	ForeignID    string
	RunID        string
	Status       int
	IsStart      bool
	IsEnd        bool
	Object       []byte
	CreatedAt    time.Time
}

func UnmarshalRecord

func UnmarshalRecord(b []byte) (*WireRecord, error)

func (*WireRecord) ProtoMarshal

func (r *WireRecord) ProtoMarshal() ([]byte, error)

type Workflow

type Workflow[Type any, Status StatusType] struct {
	Name string
	// contains filtered or unexported fields
}

func (*Workflow[Type, Status]) Await

func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Record[Type, Status], error)

func (*Workflow[Type, Status]) Callback

func (w *Workflow[Type, Status]) Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error

func (*Workflow[Type, Status]) Run

func (w *Workflow[Type, Status]) Run(ctx context.Context)

func (*Workflow[Type, Status]) ScheduleTrigger

func (w *Workflow[Type, Status]) ScheduleTrigger(foreignID string, startingStatus Status, spec string, opts ...TriggerOption[Type, Status]) error

func (*Workflow[Type, Status]) States

func (w *Workflow[Type, Status]) States() map[string]State

func (*Workflow[Type, Status]) Stop

func (w *Workflow[Type, Status]) Stop()

Stop cancels the context provided to all the background processes that the workflow launched and waits for all of them to shut down gracefully.

func (*Workflow[Type, Status]) Trigger

func (w *Workflow[Type, Status]) Trigger(ctx context.Context, foreignID string, startingStatus Status, opts ...TriggerOption[Type, Status]) (runID string, err error)

type WorkflowConnectionDetails

type WorkflowConnectionDetails struct {
	WorkflowName string
	Status       int
	Stream       EventStreamer
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL