Documentation ¶
Index ¶
- Variables
- func AwaitTimeoutInsert[Type any, Status StatusType](t testing.TB, w *Workflow[Type, Status], foreignID, runID string, ...)
- func CreateDiagram[Type any, Status StatusType](a API[Type, Status], path string, d MermaidDirection) error
- func DeleteForever(ctx context.Context, workflowName string, processName string, c Consumer, ...) error
- func DeleteTopic(workflowName string) string
- func FilterConnectorEventUsing(e *ConnectorEvent, filters ...ConnectorEventFilter) bool
- func FilterUsing(e *Event, filters ...EventFilter) bool
- func MakeFilter(filters ...RecordFilter) *recordFilters
- func Marshal[T any](t *T) ([]byte, error)
- func Require[Type any, Status StatusType](t testing.TB, w *Workflow[Type, Status], foreignID string, ...)
- func RunStateChangeTopic(workflowName string) string
- func Topic(workflowName string, statusType int) string
- func TriggerCallbackOn[Type any, Status StatusType, Payload any](t testing.TB, w *Workflow[Type, Status], foreignID, runID string, ...)
- func Unmarshal[T any](b []byte, t *T) error
- func WaitFor[Type any, Status StatusType](t testing.TB, w *Workflow[Type, Status], foreignID string, ...)
- type API
- type Ack
- type AwaitOption
- type BuildOption
- func WithClock(c clock.Clock) BuildOption
- func WithCustomDelete[Type any](fn func(object *Type) error) BuildOption
- func WithDebugMode() BuildOption
- func WithDefaultOptions(opts ...Option) BuildOption
- func WithLogger(l Logger) BuildOption
- func WithOutboxErrBackoff(d time.Duration) BuildOption
- func WithOutboxLagAlert(d time.Duration) BuildOption
- func WithOutboxLookupLimit(limit int64) BuildOption
- func WithOutboxParallelCount(count int) BuildOption
- func WithOutboxPollingFrequency(d time.Duration) BuildOption
- func WithTimeoutStore(s TimeoutStore) BuildOption
- type Builder
- func (b *Builder[Type, Status]) AddCallback(from Status, fn CallbackFunc[Type, Status], allowedDestinations ...Status)
- func (b *Builder[Type, Status]) AddConnector(name string, csc ConnectorConstructor, cf ConnectorFunc[Type, Status]) *connectorUpdater[Type, Status]
- func (b *Builder[Type, Status]) AddStep(from Status, c ConsumerFunc[Type, Status], allowedDestinations ...Status) *stepUpdater[Type, Status]
- func (b *Builder[Type, Status]) AddTimeout(from Status, timer TimerFunc[Type, Status], tf TimeoutFunc[Type, Status], ...) *timeoutUpdater[Type, Status]
- func (b *Builder[Type, Status]) Build(eventStreamer EventStreamer, recordStore RecordStore, ...) *Workflow[Type, Status]
- func (b *Builder[Type, Status]) OnCancel(hook RunStateChangeHookFunc[Type, Status])
- func (b *Builder[Type, Status]) OnComplete(hook RunStateChangeHookFunc[Type, Status])
- func (b *Builder[Type, Status]) OnPause(hook RunStateChangeHookFunc[Type, Status])
- type CallbackFunc
- type ConnectorConstructor
- type ConnectorConsumer
- type ConnectorEvent
- type ConnectorEventFilter
- type ConnectorFunc
- type Consumer
- type ConsumerFunc
- type ConsumerOption
- type ConsumerOptions
- type Event
- type EventFilter
- type EventStreamer
- type FilterValue
- type Header
- type Logger
- type MermaidDirection
- type MermaidFormat
- type MermaidTransition
- type Option
- type OrderType
- type OutboxEvent
- type OutboxEventData
- type Producer
- type Record
- type RecordFilter
- type RecordStore
- type RoleScheduler
- type Run
- type RunState
- type RunStateChangeHookFunc
- type RunStateController
- type ScheduleOption
- type SkipType
- type State
- type StatusType
- type TestingRecordStore
- type TestingRunOption
- func WithCancelFn(cancel func(ctx context.Context) error) TestingRunOption
- func WithDeleteDataFn(deleteData func(ctx context.Context) error) TestingRunOption
- func WithPauseFn(pause func(ctx context.Context) error) TestingRunOption
- func WithResumeFn(resume func(ctx context.Context) error) TestingRunOption
- type TimeoutFunc
- type TimeoutRecord
- type TimeoutStore
- type TimerFunc
- type TriggerOption
- type TypedRecord
- type Workflow
- func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, ...) (*Run[Type, Status], error)
- func (w *Workflow[Type, Status]) Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error
- func (w *Workflow[Type, Status]) Name() string
- func (w *Workflow[Type, Status]) Run(ctx context.Context)
- func (w *Workflow[Type, Status]) Schedule(foreignID string, startingStatus Status, spec string, ...) error
- func (w *Workflow[Type, Status]) States() map[string]State
- func (w *Workflow[Type, Status]) Stop()
- func (w *Workflow[Type, Status]) Trigger(ctx context.Context, foreignID string, startingStatus Status, ...) (runID string, err error)
Constants ¶
This section is empty.
Variables ¶
var (
	ErrRecordNotFound       = errors.New("record not found")
	ErrTimeoutNotFound      = errors.New("timeout not found")
	ErrWorkflowInProgress   = errors.New("current workflow still in progress - retry once complete")
	ErrOutboxRecordNotFound = errors.New("outbox record not found")
	ErrInvalidTransition    = errors.New("invalid transition")
)
Functions ¶
func AwaitTimeoutInsert ¶
func CreateDiagram ¶ added in v0.2.0
func CreateDiagram[Type any, Status StatusType](a API[Type, Status], path string, d MermaidDirection) error
CreateDiagram creates a diagram in a md file for communicating a workflow's set of steps in an easy-to-understand manner.
func DeleteForever ¶ added in v0.1.2
func DeleteTopic ¶ added in v0.1.2
func FilterConnectorEventUsing ¶ added in v0.1.2
func FilterConnectorEventUsing(e *ConnectorEvent, filters ...ConnectorEventFilter) bool
func FilterUsing ¶ added in v0.1.2
func FilterUsing(e *Event, filters ...EventFilter) bool
func MakeFilter ¶ added in v0.1.2
func MakeFilter(filters ...RecordFilter) *recordFilters
func RunStateChangeTopic ¶ added in v0.2.0
func TriggerCallbackOn ¶
Types ¶
type API ¶
type API[Type any, Status StatusType] interface {
	// Name returns the name of the implemented workflow.
	Name() string

	// Trigger will kickstart a workflow for the provided foreignID starting from the provided starting status. There
	// is no limitation as to where you start the workflow from. For workflows that have data preceding the initial
	// trigger that needs to be used in the workflow, using WithInitialValue will allow you to provide pre-populated
	// fields of Type that can be accessed by the consumers.
	//
	// foreignID should not be random and should be deterministic for the thing that you are running the workflow for.
	// This especially helps when connecting other workflows as the foreignID is the only way to connect the streams. The
	// same goes for Callback as you will need the foreignID to connect the callback back to the workflow instance that
	// was run.
	Trigger(
		ctx context.Context,
		foreignID string,
		startingStatus Status,
		opts ...TriggerOption[Type, Status],
	) (runID string, err error)

	// Schedule takes a cron spec and will call Trigger at the specified intervals. Schedule is a blocking call and all
	// schedule errors will be retried indefinitely. The same options are available for Schedule as they are
	// for Trigger.
	Schedule(foreignID string, startingStatus Status, spec string, opts ...ScheduleOption[Type, Status]) error

	// Await is a blocking call that returns the typed Run when the workflow of the specified run ID reaches the
	// specified status.
	Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Run[Type, Status], error)

	// Callback can be used if Builder.AddCallback has been defined for the provided status. The data in the reader
	// will be passed to the CallbackFunc that you specify and so the serialisation and deserialisation is in the
	// hands of the user.
	Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error

	// Run must be called in order to start up all the background consumers / consumers required to run the workflow. Run
	// only needs to be called once. Any subsequent calls to run are safe and are noop.
	Run(ctx context.Context)

	// Stop tells the workflow to shut down gracefully.
	Stop()
}
type Ack ¶
type Ack func() error
Ack is used by the event streamer to update its cursor of which messages have been consumed. If Ack is not called then the event streamer, depending on implementation, will likely not keep track of which records / events have been consumed.
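A minimal sketch of the cursor behaviour described above, assuming an in-memory stream. Only the `Ack func() error` shape comes from the package; `cursorStream` and its `next` method are hypothetical names invented for illustration.

```go
package main

import "fmt"

// Ack mirrors the documented type: calling it confirms consumption.
type Ack func() error

// cursorStream is a hypothetical in-memory stream, not part of the package.
type cursorStream struct {
	events []string
	cursor int // index of the next unacknowledged event
}

// next returns the event at the cursor and an Ack that advances the cursor.
// Until the Ack is called, the same event is returned again.
func (s *cursorStream) next() (string, Ack, bool) {
	if s.cursor >= len(s.events) {
		return "", nil, false
	}
	idx := s.cursor
	ack := func() error {
		if s.cursor == idx {
			s.cursor = idx + 1
		}
		return nil
	}
	return s.events[idx], ack, true
}

func main() {
	s := &cursorStream{events: []string{"a", "b"}}
	e1, _, _ := s.next()   // delivered but never acknowledged
	e2, ack, _ := s.next() // the same event is delivered again
	_ = ack()
	e3, _, _ := s.next()
	fmt.Println(e1, e2, e3) // a a b
}
```

The point of the sketch is that delivery and acknowledgement are separate steps: a consumer that crashes before calling the Ack sees the event again.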
type AwaitOption ¶
type AwaitOption func(o *awaitOpts)
func WithAwaitPollingFrequency ¶ added in v0.1.2
func WithAwaitPollingFrequency(d time.Duration) AwaitOption
type BuildOption ¶
type BuildOption func(w *buildOptions)
func WithClock ¶
func WithClock(c clock.Clock) BuildOption
WithClock allows the configuring of workflow's use and access of time. Instead of using time.Now() and other associated functionality from the time package a clock is used instead in order to make it testable.
func WithCustomDelete ¶ added in v0.1.2
func WithCustomDelete[Type any](fn func(object *Type) error) BuildOption
WithCustomDelete allows for specifying a custom deleter function for scrubbing PII data when a workflow Run enters RunStateRequestedDataDeleted and is the function that once executed successfully allows for the RunState to move to RunStateDataDeleted.
func WithDebugMode ¶
func WithDebugMode() BuildOption
WithDebugMode enables debug mode for a workflow, which results in increased logging, such as when processes are launched or shut down and when events are skipped.
func WithDefaultOptions ¶ added in v0.1.2
func WithDefaultOptions(opts ...Option) BuildOption
WithDefaultOptions applies the provided options to the entire workflow and not just to an individual process.
func WithLogger ¶ added in v0.2.0
func WithLogger(l Logger) BuildOption
WithLogger allows for specifying a custom logger. The default is to use a wrapped version of log/slog's Logger.
func WithOutboxErrBackoff ¶ added in v0.1.2
func WithOutboxErrBackoff(d time.Duration) BuildOption
func WithOutboxLagAlert ¶ added in v0.1.2
func WithOutboxLagAlert(d time.Duration) BuildOption
func WithOutboxLookupLimit ¶ added in v0.1.2
func WithOutboxLookupLimit(limit int64) BuildOption
func WithOutboxParallelCount ¶ added in v0.1.2
func WithOutboxParallelCount(count int) BuildOption
func WithOutboxPollingFrequency ¶ added in v0.1.2
func WithOutboxPollingFrequency(d time.Duration) BuildOption
func WithTimeoutStore ¶ added in v0.1.2
func WithTimeoutStore(s TimeoutStore) BuildOption
WithTimeoutStore configures the TimeoutStore, which is required when a workflow uses timeouts. It is not required by default because timeouts are a less common requirement. When they are needed, the package abstracts away the complexity of scheduling, expiring, and executing them. Timeouts are one of the three key features of workflow, alongside sequential steps and callbacks.
type Builder ¶
type Builder[Type any, Status StatusType] struct { // contains filtered or unexported fields }
func NewBuilder ¶
func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status]
func (*Builder[Type, Status]) AddCallback ¶
func (b *Builder[Type, Status]) AddCallback(from Status, fn CallbackFunc[Type, Status], allowedDestinations ...Status)
func (*Builder[Type, Status]) AddConnector ¶
func (b *Builder[Type, Status]) AddConnector( name string, csc ConnectorConstructor, cf ConnectorFunc[Type, Status], ) *connectorUpdater[Type, Status]
func (*Builder[Type, Status]) AddStep ¶
func (b *Builder[Type, Status]) AddStep( from Status, c ConsumerFunc[Type, Status], allowedDestinations ...Status, ) *stepUpdater[Type, Status]
func (*Builder[Type, Status]) AddTimeout ¶
func (b *Builder[Type, Status]) AddTimeout( from Status, timer TimerFunc[Type, Status], tf TimeoutFunc[Type, Status], allowedDestinations ...Status, ) *timeoutUpdater[Type, Status]
func (*Builder[Type, Status]) Build ¶
func (b *Builder[Type, Status]) Build( eventStreamer EventStreamer, recordStore RecordStore, roleScheduler RoleScheduler, opts ...BuildOption, ) *Workflow[Type, Status]
func (*Builder[Type, Status]) OnCancel ¶ added in v0.2.0
func (b *Builder[Type, Status]) OnCancel(hook RunStateChangeHookFunc[Type, Status])
func (*Builder[Type, Status]) OnComplete ¶ added in v0.2.0
func (b *Builder[Type, Status]) OnComplete(hook RunStateChangeHookFunc[Type, Status])
func (*Builder[Type, Status]) OnPause ¶ added in v0.2.0
func (b *Builder[Type, Status]) OnPause(hook RunStateChangeHookFunc[Type, Status])
type CallbackFunc ¶
type ConnectorConstructor ¶ added in v0.1.2
type ConnectorConstructor interface {
Make(ctx context.Context, consumerName string) (ConnectorConsumer, error)
}
type ConnectorConsumer ¶ added in v0.1.2
type ConnectorEvent ¶ added in v0.1.2
type ConnectorEvent struct {
// ID is a unique ID for the event.
ID string
// ForeignID refers to the ID of the element that the event relates to.
ForeignID string
// Type relates to the StatusType that the associated record changed to.
Type string
// Headers stores meta-data in a simple and easily queryable way.
Headers map[string]string
// CreatedAt is the time that the event was produced and is generated by the event streamer.
CreatedAt time.Time
}
ConnectorEvent defines a schema that is in line with how workflow uses an event notification pattern. This means that events only tell us what happened and do not transmit the state change. ConnectorEvent differs slightly from Event in that all fields, except for CreatedAt, are string based, which allows it to represent relations to elements with string identifiers and string-based types.
type ConnectorEventFilter ¶ added in v0.1.2
type ConnectorEventFilter func(e *ConnectorEvent) bool
ConnectorEventFilter can be passed to the event streaming implementation so that specific consumers can filter events earlier in the process. True is returned when the event should be skipped.
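A short sketch of the "true means skip" convention, using local copies of the documented ConnectorEvent and ConnectorEventFilter types. The `skipUnlessType` filter and the `shouldSkip` helper are hypothetical, and the rule that any single filter can skip the event is an assumption, not a statement about how FilterConnectorEventUsing is implemented.

```go
package main

import (
	"fmt"
	"time"
)

// ConnectorEvent is a local copy of the struct documented above.
type ConnectorEvent struct {
	ID        string
	ForeignID string
	Type      string
	Headers   map[string]string
	CreatedAt time.Time
}

// ConnectorEventFilter returns true when the event should be skipped.
type ConnectorEventFilter func(e *ConnectorEvent) bool

// skipUnlessType is a hypothetical filter that skips every event whose
// Type differs from want.
func skipUnlessType(want string) ConnectorEventFilter {
	return func(e *ConnectorEvent) bool { return e.Type != want }
}

// shouldSkip applies an assumed combination rule: skip if any filter says so.
func shouldSkip(e *ConnectorEvent, filters ...ConnectorEventFilter) bool {
	for _, f := range filters {
		if f(e) {
			return true
		}
	}
	return false
}

func main() {
	e := &ConnectorEvent{ID: "1", ForeignID: "order-7", Type: "paid"}
	fmt.Println(shouldSkip(e, skipUnlessType("paid")))     // false
	fmt.Println(shouldSkip(e, skipUnlessType("refunded"))) // true
}
```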
type ConnectorFunc ¶
type ConnectorFunc[Type any, Status StatusType] func(ctx context.Context, w API[Type, Status], e *ConnectorEvent) error
type ConsumerFunc ¶
type ConsumerFunc[Type any, Status StatusType] func(ctx context.Context, r *Run[Type, Status]) (Status, error)
ConsumerFunc provides a record that is expected to be modified if the data needs to change. If true is returned with a nil error then the record, along with its modifications, will be stored. If false is returned with a nil error then the record will not be stored and the event will be skipped and move onto the next event. If a non-nil error is returned then the consumer will back off and try again until a nil error occurs or the retry max has been reached if a Dead Letter Queue has been configured for the workflow.
type ConsumerOption ¶
type ConsumerOption func(*ConsumerOptions)
func WithConsumerPollFrequency ¶
func WithConsumerPollFrequency(d time.Duration) ConsumerOption
type ConsumerOptions ¶
type Event ¶
type Event struct {
// ID is a unique ID for the event generated by the event streamer.
ID int64
// ForeignID refers to the ID of a record in the record store.
ForeignID string
// Type relates to the StatusType that the associated record changed to.
Type int
// Headers stores meta-data in a simple and easily queryable way.
Headers map[Header]string
// CreatedAt is the time that the event was produced and is generated by the event streamer.
CreatedAt time.Time
}
type EventFilter ¶
EventFilter can be passed to the event streaming implementation so that specific consumers can filter events earlier in the process. True is returned when the event should be skipped.
type EventStreamer ¶
type EventStreamer interface {
NewProducer(ctx context.Context, topic string) (Producer, error)
NewConsumer(ctx context.Context, topic string, name string, opts ...ConsumerOption) (Consumer, error)
}
EventStreamer implementations should all be tested with adaptertest.TestEventStreamer.
type FilterValue ¶ added in v0.1.2
type Logger ¶ added in v0.2.0
type Logger interface {
// Debug will be used by workflow for debug logs when in debug mode.
Debug(ctx context.Context, msg string, meta map[string]string)
// Error is used when writing errors to the logs.
Error(ctx context.Context, err error)
}
Logger interface allows the user of Workflow to provide a custom logger and not use the default which is provided in internal/logger. Workflow only writes two types of logs: Debug and Error. Error is only used at the highest level where an auto-retry process (consumers and pollers) errors and retries.
Error is used only when the error cannot be passed back to the caller and cannot be bubbled up any further.
Debug is used only when the Workflow is built with WithDebugMode.
type MermaidDirection ¶
type MermaidDirection string
const (
	UnknownDirection     MermaidDirection = ""
	TopToBottomDirection MermaidDirection = "TB"
	LeftToRightDirection MermaidDirection = "LR"
	RightToLeftDirection MermaidDirection = "RL"
	BottomToTopDirection MermaidDirection = "BT"
)
type MermaidFormat ¶
type MermaidFormat struct {
WorkflowName string
Direction MermaidDirection
Nodes []int
StartingPoints []int
TerminalPoints []int
Transitions []MermaidTransition
}
type MermaidTransition ¶
type Option ¶ added in v0.1.2
type Option func(so *options)
func ConsumeLag ¶ added in v0.1.2
ConsumeLag defines the age of the event that the consumer will consume. The workflow consumer will not consume events newer than the time specified and will wait to consume them.
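The lag rule above can be sketched as a small calculation, assuming the consumer compares the event's CreatedAt with the current time. `remainingLag` and `demo` are hypothetical helpers written for this example and are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// remainingLag returns how long a consumer would still wait before consuming
// an event created at createdAt. Zero means the event is already old enough.
func remainingLag(createdAt, now time.Time, lag time.Duration) time.Duration {
	if age := now.Sub(createdAt); age < lag {
		return lag - age
	}
	return 0
}

// demo evaluates a fresh event and an old event against a five second lag.
func demo() (fresh, old string) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	lag := 5 * time.Second
	fresh = remainingLag(now.Add(-2*time.Second), now, lag).String()
	old = remainingLag(now.Add(-time.Minute), now, lag).String()
	return fresh, old
}

func main() {
	fresh, old := demo()
	fmt.Println(fresh, old) // 3s 0s
}
```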
func ErrBackOff ¶ added in v0.1.2
ErrBackOff defines the time duration of the backoff of the workflow process when an error is encountered.
func LagAlert ¶ added in v0.1.2
LagAlert defines the time duration / threshold before the prometheus metric defined in /internal/metrics/metrics.go switches to true which means that the workflow consumer is struggling to consume events fast enough and might need to be converted to a parallel consumer.
func ParallelCount ¶ added in v0.1.2
ParallelCount defines the number of instances of the workflow process. The processes are sharded consistently and will be provided a name such as "consumer-1-of-5" to show the instance number and the total number of instances that the process is a part of.
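One way consistent sharding can work is sketched below. The naming format follows the "consumer-1-of-5" example above; the choice of an FNV-1a hash over the foreign ID is an assumption made for this sketch, and `shardName` and `ownsEvent` are hypothetical helpers rather than package functions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardName formats a one-based instance label such as "consumer-1-of-5".
func shardName(process string, shard, total int) string {
	return fmt.Sprintf("%s-%d-of-%d", process, shard, total)
}

// ownsEvent reports whether the one-based shard should handle foreignID.
// Hashing with FNV-1a is an assumption made for this sketch.
func ownsEvent(foreignID string, shard, total int) bool {
	h := fnv.New32a()
	h.Write([]byte(foreignID))
	return int(h.Sum32()%uint32(total))+1 == shard
}

func main() {
	const total = 5
	for shard := 1; shard <= total; shard++ {
		if ownsEvent("order-42", shard, total) {
			fmt.Println(shardName("consumer", shard, total), "handles order-42")
		}
	}
}
```

Because the hash depends only on the foreign ID, every event for the same foreign ID lands on the same instance, which keeps processing for one run in order.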
func PauseAfterErrCount ¶ added in v0.1.2
PauseAfterErrCount defines the number of times an error can occur before the record is updated to RunStatePaused. This is similar to a Dead Letter Queue: the record will no longer be processed and won't block the workflow's consumers, and it can be investigated and retried later on.
func PollingFrequency ¶ added in v0.1.2
PollingFrequency defines the interval at which the workflow process will poll for changes.
type OutboxEvent ¶ added in v0.1.2
type OutboxEvent struct {
// ID is a unique ID for this specific OutboxEvent.
ID string
// WorkflowName refers to the name of the workflow that the OutboxEventData belongs to.
WorkflowName string
// Data represents a slice of bytes the OutboxEventDataMaker constructs via serialising event data
// in an expected way for it to also be deserialized by the outbox consumer.
Data []byte
// CreatedAt is the time that this specific OutboxEvent was produced.
CreatedAt time.Time
}
type OutboxEventData ¶ added in v0.1.2
type OutboxEventData struct {
ID string
// WorkflowName refers to the name of the workflow that the OutboxEventData belongs to.
WorkflowName string
// Data represents a slice of bytes the OutboxEventDataMaker constructs via serialising event data
// in an expected way for it to also be deserialized by the outbox consumer.
Data []byte
}
func MakeOutboxEventData ¶ added in v0.2.0
func MakeOutboxEventData(record Record) (OutboxEventData, error)
MakeOutboxEventData creates an OutboxEventData that houses all the information that must be stored and be retrievable from the outbox.
type Record ¶
type Record struct {
WorkflowName string
ForeignID string
RunID string
RunState RunState
Status int
Object []byte
CreatedAt time.Time
UpdatedAt time.Time
}
Record is the cornerstone of Workflow. Record must always be wire compatible with no generics as its intended purpose is to be the stored structure of a Run.
type RecordFilter ¶ added in v0.1.2
type RecordFilter func(filters *recordFilters)
func FilterByForeignID ¶ added in v0.1.2
func FilterByForeignID(val string) RecordFilter
func FilterByRunState ¶ added in v0.1.2
func FilterByRunState(rs RunState) RecordFilter
func FilterByStatus ¶ added in v0.1.2
func FilterByStatus(status int64) RecordFilter
type RecordStore ¶
type RecordStore interface {
// Store should create or update a record depending on whether the underlying store is mutable or append only. Store
// must implement transactions and a separate outbox store to store the outbox record (that should be
// generated using MakeOutboxEventData) which can be retrieved when calling ListOutboxEvents and can be
// deleted when DeleteOutboxEvent is called.
Store(ctx context.Context, record *Record) error
Lookup(ctx context.Context, runID string) (*Record, error)
Latest(ctx context.Context, workflowName, foreignID string) (*Record, error)
// List provides a slice of Record where the total items will be equal or less than the limit depending
// on the offset provided and how many records remain after that ID.
List(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error)
// ListOutboxEvents lists all events that are yet to be published to the event streamer. A requirement for
// implementation of the RecordStore is to support a Transactional Outbox that has Event's written to it when
// Store is called.
ListOutboxEvents(ctx context.Context, workflowName string, limit int64) ([]OutboxEvent, error)
// DeleteOutboxEvent will expect an Event's ID field and will remove the event from the outbox store when the
// event has successfully been published to the event streamer.
DeleteOutboxEvent(ctx context.Context, id string) error
}
RecordStore implementations should all be tested with adaptertest.TestRecordStore. The underlying implementation of store must support transactions or the ability to commit the record and an outbox event in a single call as well as being able to obtain an ID for the record before it is created.
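The requirement that a record and its outbox event are committed together can be sketched with an in-memory store, where a single mutex-guarded critical section stands in for a database transaction. Everything here (`memStore`, `record`, `outboxEvent` and the method names) is a hypothetical, trimmed-down stand-in and does not implement the real RecordStore interface.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// record and outboxEvent are trimmed stand-ins for Record and OutboxEvent.
type record struct {
	RunID  string
	Status int
}

type outboxEvent struct {
	ID    string
	RunID string
}

// memStore keeps records and outbox events behind one mutex so that both
// writes made by store become visible together.
type memStore struct {
	mu      sync.Mutex
	records map[string]record
	outbox  []outboxEvent
	nextID  int
}

func newMemStore() *memStore {
	return &memStore{records: make(map[string]record)}
}

// store saves the record and appends its outbox event in one critical section.
func (s *memStore) store(r record) error {
	if r.RunID == "" {
		return errors.New("run ID required")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextID++
	s.records[r.RunID] = r
	s.outbox = append(s.outbox, outboxEvent{ID: fmt.Sprint(s.nextID), RunID: r.RunID})
	return nil
}

// listOutbox returns a copy of up to limit unpublished events, oldest first.
func (s *memStore) listOutbox(limit int) []outboxEvent {
	s.mu.Lock()
	defer s.mu.Unlock()
	if limit > len(s.outbox) {
		limit = len(s.outbox)
	}
	return append([]outboxEvent(nil), s.outbox[:limit]...)
}

// deleteOutbox removes an event once it has been published.
func (s *memStore) deleteOutbox(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for i, e := range s.outbox {
		if e.ID == id {
			s.outbox = append(s.outbox[:i], s.outbox[i+1:]...)
			return
		}
	}
}

func main() {
	s := newMemStore()
	_ = s.store(record{RunID: "run-1", Status: 1})
	for _, e := range s.listOutbox(10) {
		fmt.Println("publishing event", e.ID, "for", e.RunID)
		s.deleteOutbox(e.ID) // only after a successful publish
	}
	fmt.Println("pending:", len(s.listOutbox(10))) // pending: 0
}
```

A SQL-backed store would typically achieve the same effect by writing both rows inside one transaction.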
type RoleScheduler ¶
type RoleScheduler interface {
// Await must return a child context of the provided (parent) context. Await should block until the role is
// assigned to the caller. Only one caller should be able to be assigned the role at any given time. The returned
// context.CancelFunc is called after each process execution. Some process executions can be more long living and
// others not but if any process errors the context.CancelFunc will be called after the specified error backoff
// has finished.
Await(ctx context.Context, role string) (context.Context, context.CancelFunc, error)
}
RoleScheduler implementations should all be tested with adaptertest.TestRoleScheduler.
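A single-process sketch of the Await contract described above: it blocks until the role is free, returns a child of the parent context, and releases the role when the returned cancel function is called. `memRoleScheduler` is a hypothetical type for illustration; a production scheduler would normally coordinate across processes.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// memRoleScheduler is a hypothetical in-process scheduler: each role is
// guarded by a one-slot channel used as a lock.
type memRoleScheduler struct {
	mu    sync.Mutex
	roles map[string]chan struct{}
}

func newMemRoleScheduler() *memRoleScheduler {
	return &memRoleScheduler{roles: make(map[string]chan struct{})}
}

// slot returns the lock channel for role, creating it on first use.
func (s *memRoleScheduler) slot(role string) chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch, ok := s.roles[role]
	if !ok {
		ch = make(chan struct{}, 1)
		s.roles[role] = ch
	}
	return ch
}

// Await blocks until the role is free or ctx is done. The returned cancel
// function cancels the child context and releases the role exactly once.
func (s *memRoleScheduler) Await(ctx context.Context, role string) (context.Context, context.CancelFunc, error) {
	ch := s.slot(role)
	select {
	case ch <- struct{}{}: // role acquired
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	}
	child, cancel := context.WithCancel(ctx)
	var once sync.Once
	release := func() {
		once.Do(func() {
			cancel()
			<-ch
		})
	}
	return child, release, nil
}

// demo shows that a held role cannot be taken, and can be taken once released.
func demo() (heldErr, freeErr error) {
	s := newMemRoleScheduler()
	_, release, _ := s.Await(context.Background(), "consumer-1-of-1")

	busy, cancelBusy := context.WithCancel(context.Background())
	cancelBusy() // give up immediately instead of blocking forever
	_, _, heldErr = s.Await(busy, "consumer-1-of-1")

	release()
	_, release2, freeErr := s.Await(context.Background(), "consumer-1-of-1")
	if freeErr == nil {
		release2()
	}
	return heldErr, freeErr
}

func main() {
	heldErr, freeErr := demo()
	fmt.Println(heldErr) // context canceled
	fmt.Println(freeErr) // <nil>
}
```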
type Run ¶ added in v0.2.0
type Run[Type any, Status StatusType] struct { TypedRecord[Type, Status] // contains filtered or unexported fields }
Run is a representation of a workflow run. It incorporates all the fields from the Record as well as having defined types for the Status and Object fields along with access to the RunStateController which controls the state of the run aka "RunState".
func NewTestingRun ¶ added in v0.2.0
func NewTestingRun[Type any, Status StatusType]( t *testing.T, wr Record, object Type, opts ...TestingRunOption, ) Run[Type, Status]
NewTestingRun should be used when testing logic that defines a workflow.Run as a parameter. This is usually the case in unit tests and would not normally be found when doing an Acceptance test for the entire workflow.
func (*Run[Type, Status]) Cancel ¶ added in v0.2.0
Cancel is intended to be used inside a workflow process where (Status, error) are the return signature. This allows the user to simply type "return r.Cancel(ctx)" to cancel a record from inside a workflow which results in the record being permanently left alone and will not be processed.
func (*Run[Type, Status]) Pause ¶ added in v0.2.0
Pause is intended to be used inside a workflow process where (Status, error) are the return signature. This allows the user to simply type "return r.Pause(ctx)" to pause a record from inside a workflow which results in the record being temporarily left alone and will not be processed until it is resumed.
type RunState ¶ added in v0.1.2
type RunState int
func (RunState) Stopped ¶ added in v0.1.2
Stopped is the type of status that requires consumers to ignore the workflow run as it is in a stopped state. Only paused workflow runs can be resumed and must be done so via the workflow API or the Run methods. All cancelled workflow runs are cancelled permanently and cannot be undone, whereas paused workflow runs can be resumed.
type RunStateChangeHookFunc ¶ added in v0.2.0
type RunStateChangeHookFunc[Type any, Status StatusType] func(ctx context.Context, record *TypedRecord[Type, Status]) error
RunStateChangeHookFunc defines the function signature for all hooks associated to the run.
type RunStateController ¶ added in v0.1.2
type RunStateController interface {
// Pause will take the workflow run specified and move it into a temporary state where it will no longer be processed.
// A paused workflow run can be resumed by calling Resume. ErrUnableToPause is returned when a workflow is not in a
// state to be paused.
Pause(ctx context.Context) error
// Cancel can be called after Pause has been called. A paused run of the workflow can be indefinitely cancelled.
// Once cancelled, DeleteData can be called and will move the run into an indefinite state of DataDeleted.
// ErrUnableToCancel is returned when the workflow record is not in a state to be cancelled.
Cancel(ctx context.Context) error
// Resume can be called on a workflow run that has been paused. ErrUnableToResume is returned when the workflow
// run is not in a state to be resumed.
Resume(ctx context.Context) error
// DeleteData can be called after a workflow run has been completed or cancelled. DeleteData should be used to
// comply with the right to be forgotten such as complying with GDPR. ErrUnableToDelete is returned when the
// workflow run is not in a state to be deleted.
DeleteData(ctx context.Context) error
}
RunStateController allows the interaction with a specific workflow record.
func NewRunStateController ¶ added in v0.1.2
func NewRunStateController(store storeFunc, wr *Record) RunStateController
type ScheduleOption ¶ added in v0.1.2
type ScheduleOption[Type any, Status StatusType] func(o *scheduleOpts[Type, Status])
func WithScheduleFilter ¶ added in v0.1.2
func WithScheduleFilter[Type any, Status StatusType]( fn func(ctx context.Context) (bool, error), ) ScheduleOption[Type, Status]
func WithScheduleInitialValue ¶ added in v0.1.2
func WithScheduleInitialValue[Type any, Status StatusType](t *Type) ScheduleOption[Type, Status]
type TestingRecordStore ¶
type TestingRecordStore interface {
RecordStore
Snapshots(workflowName, foreignID, runID string) []*Record
}
type TestingRunOption ¶ added in v0.2.0
type TestingRunOption func(*testingRunOpts)
func WithCancelFn ¶ added in v0.2.0
func WithCancelFn(cancel func(ctx context.Context) error) TestingRunOption
func WithDeleteDataFn ¶ added in v0.2.0
func WithDeleteDataFn(deleteData func(ctx context.Context) error) TestingRunOption
func WithPauseFn ¶ added in v0.2.0
func WithPauseFn(pause func(ctx context.Context) error) TestingRunOption
func WithResumeFn ¶ added in v0.2.0
func WithResumeFn(resume func(ctx context.Context) error) TestingRunOption
type TimeoutFunc ¶
type TimeoutFunc[Type any, Status StatusType] func(ctx context.Context, r *Run[Type, Status], now time.Time) (Status, error)
TimeoutFunc runs once the timeout has expired which is set by TimerFunc. If false is returned with a nil error then the timeout is skipped and not retried at a later date. If a non-nil error is returned the TimeoutFunc will be called again until a nil error is returned. If true is returned with a nil error then the provided record and any modifications made to it will be stored and the status updated - continuing the workflow.
type TimeoutRecord ¶ added in v0.1.2
type TimeoutStore ¶
type TimeoutStore interface {
Create(ctx context.Context, workflowName, foreignID, runID string, status int, expireAt time.Time) error
Complete(ctx context.Context, id int64) error
Cancel(ctx context.Context, id int64) error
List(ctx context.Context, workflowName string) ([]TimeoutRecord, error)
ListValid(ctx context.Context, workflowName string, status int, now time.Time) ([]TimeoutRecord, error)
}
TimeoutStore implementations should all be tested with adaptertest.TestTimeoutStore.
type TimerFunc ¶
type TimerFunc[Type any, Status StatusType] func(ctx context.Context, r *Run[Type, Status], now time.Time) (time.Time, error)
TimerFunc exists to allow the specification of when the timeout should expire dynamically. If no time is set then a timeout will not be created and the event will be skipped. If the time is set then a timeout will be created and once expired TimeoutFunc will be called. Any non-nil error will be retried with backoff.
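A sketch of a dynamic timer, using simplified local stand-ins for Run and TimerFunc. Two assumptions are made here and are not confirmed by the text above: the `~int` constraint stands in for StatusType, and returning the zero time.Time is treated as "no time is set". The `order` type and `expiryTimer` function are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Run and TimerFunc are simplified local stand-ins for the documented
// generic types. The ~int constraint is an assumption for this sketch.
type Run[Type any, Status ~int] struct {
	Status Status
	Object *Type
}

type TimerFunc[Type any, Status ~int] func(ctx context.Context, r *Run[Type, Status], now time.Time) (time.Time, error)

type order struct{ ExpiresAt time.Time }

type status int

// expiryTimer schedules the timeout at the order's own expiry. It returns
// the zero time (assumed to mean "not set") when the expiry is missing or
// already in the past.
func expiryTimer(ctx context.Context, r *Run[order, status], now time.Time) (time.Time, error) {
	if r.Object == nil || !r.Object.ExpiresAt.After(now) {
		return time.Time{}, nil
	}
	return r.Object.ExpiresAt, nil
}

// Compile-time check that expiryTimer has the TimerFunc shape.
var _ TimerFunc[order, status] = expiryTimer

// demo reports whether a time was set for a future and a past expiry.
func demo() (futureSet, pastSet bool) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	future := &Run[order, status]{Object: &order{ExpiresAt: now.Add(time.Hour)}}
	past := &Run[order, status]{Object: &order{ExpiresAt: now.Add(-time.Hour)}}
	t1, _ := expiryTimer(context.Background(), future, now)
	t2, _ := expiryTimer(context.Background(), past, now)
	return !t1.IsZero(), !t2.IsZero()
}

func main() {
	fmt.Println(demo()) // true false
}
```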
func DurationTimerFunc ¶
func DurationTimerFunc[Type any, Status StatusType](duration time.Duration) TimerFunc[Type, Status]
func TimeTimerFunc ¶
func TimeTimerFunc[Type any, Status StatusType](t time.Time) TimerFunc[Type, Status]
type TriggerOption ¶
type TriggerOption[Type any, Status StatusType] func(o *triggerOpts[Type, Status])
func WithInitialValue ¶
func WithInitialValue[Type any, Status StatusType](t *Type) TriggerOption[Type, Status]
type TypedRecord ¶ added in v0.2.0
type TypedRecord[Type any, Status StatusType] struct { Record Status Status Object *Type }
TypedRecord differs from Record in that it contains a typed Object and a typed Status.
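The relationship between the two types can be sketched as a conversion from the stored bytes to the typed form. This example assumes the Object bytes are JSON and uses `~int` in place of StatusType; neither is confirmed by the text above. `toTyped`, `order` and `status` are hypothetical names, and Record is trimmed to the fields the sketch needs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Record is a trimmed local copy of the documented struct.
type Record struct {
	ForeignID string
	Status    int
	Object    []byte
}

// TypedRecord embeds Record and shadows Status and Object with typed fields.
type TypedRecord[Type any, Status ~int] struct {
	Record
	Status Status
	Object *Type
}

// toTyped decodes the stored bytes into Type. JSON encoding is an assumption.
func toTyped[Type any, Status ~int](r Record) (*TypedRecord[Type, Status], error) {
	var obj Type
	if err := json.Unmarshal(r.Object, &obj); err != nil {
		return nil, err
	}
	return &TypedRecord[Type, Status]{Record: r, Status: Status(r.Status), Object: &obj}, nil
}

type order struct{ Total int }

type status int

func main() {
	rec := Record{ForeignID: "order-7", Status: 2, Object: []byte(`{"Total":5}`)}
	tr, err := toTyped[order, status](rec)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(tr.ForeignID, tr.Status, tr.Object.Total) // order-7 2 5
}
```

The untyped values remain reachable through the embedded struct, for example `tr.Record.Status`.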
type Workflow ¶
type Workflow[Type any, Status StatusType] struct { // contains filtered or unexported fields }
func (*Workflow[Type, Status]) Schedule ¶ added in v0.1.2
func (w *Workflow[Type, Status]) Schedule( foreignID string, startingStatus Status, spec string, opts ...ScheduleOption[Type, Status], ) error
Source Files ¶
- autopause.go
- await.go
- builder.go
- callback.go
- connector.go
- consumer.go
- delete.go
- errors.go
- event.go
- eventfilter.go
- eventstream.go
- filter.go
- hook.go
- logger.go
- marshal.go
- metrics.go
- options.go
- order.go
- outbox.go
- record.go
- rolescheduler.go
- run.go
- runstate.go
- schedule.go
- state.go
- status.go
- store.go
- testing.go
- timeout.go
- topic.go
- trigger.go
- unmarshal.go
- update.go
- visualiser.go
- workflow.go
Directories ¶
| Path | Synopsis |
|---|---|
| _examples | |
| _examples/callback (module) | |
| _examples/connector (module) | |
| _examples/gettingstarted (module) | |
| _examples/schedule (module) | |
| _examples/timeout (module) | |
| _examples/webui (module) | |
| adapters | |
| adapters/jlog (module) | |
| adapters/kafkastreamer (module) | |
| adapters/reflexstreamer (module) | |
| adapters/rinkrolescheduler (module) | |
| adapters/sqlstore (module) | |
| adapters/sqltimeout (module) | |
| adapters/webui (module) | |
| examples (module) | |
| internal | |