Documentation ¶
Index ¶
- Constants
- Variables
- func CountBytesPrintWidth(b []byte) int
- func DiffDuration(start *time.Time, close *time.Time) string
- func ExecutionStatus(exec ExecutionState) string
- func FmtDuration(duration time.Duration) string
- func GetExitCode(exec *WorkflowExecutionState) int
- func GetFoldStatus(c *cli.Context) ([]enums.WorkflowExecutionStatus, error)
- func LineHeight(line []byte, maxWidth int) int
- func MoveCursorUp(lines int) string
- func NewTailBoxBoundBuffer(buf *bytes.Buffer, maxLines int, maxWidth int) (*bytes.Buffer, int)
- func PrintAndExit(writer *TermWriter, tmpl *ExecutionTemplate, update *WorkflowExecutionUpdate) (int, error)
- func PrintWorkflowSummary(c *cli.Context, sdkClient sdkclient.Client, wfId, runId string) error
- func PrintWorkflowTrace(c *cli.Context, sdkClient sdkclient.Client, wid, rid string, ...) (int, error)
- func ProgressString(currentEvents int64, totalEvents int64) string
- func ReverseLinesBuffer(buf *bytes.Buffer) [][]byte
- func ShouldFoldStatus(foldStatus []enums.WorkflowExecutionStatus, noFold bool) func(*WorkflowExecutionState, int) bool
- func StripBytesAnsi(b []byte) []byte
- type ActivityExecutionState
- func (state *ActivityExecutionState) GetAttempt() int32
- func (state *ActivityExecutionState) GetDuration() *time.Duration
- func (state *ActivityExecutionState) GetFailure() *failure.Failure
- func (state *ActivityExecutionState) GetName() string
- func (state *ActivityExecutionState) GetRetryState() enums.RetryState
- func (state *ActivityExecutionState) GetStartTime() *time.Time
- func (state *ActivityExecutionState) Update(event *history.HistoryEvent)
- type ActivityExecutionStatus
- type ExecutionState
- type ExecutionTemplate
- type Row
- type StateTemplate
- type StatusIcon
- type TermWriter
- func (w *TermWriter) Flush(trim bool) error
- func (w *TermWriter) GetSize() (int, int)
- func (w *TermWriter) WithSize(width, height int) *TermWriter
- func (w *TermWriter) WithTerminalSize() *TermWriter
- func (w *TermWriter) WithWriter(out io.Writer) *TermWriter
- func (w *TermWriter) Write(buf []byte) (n int, err error)
- func (w *TermWriter) WriteString(s string) (n int, err error)
- type TimerExecutionState
- func (t *TimerExecutionState) GetAttempt() int32
- func (t *TimerExecutionState) GetDuration() *time.Duration
- func (t *TimerExecutionState) GetFailure() *failure.Failure
- func (t *TimerExecutionState) GetName() string
- func (t *TimerExecutionState) GetRetryState() enums.RetryState
- func (t *TimerExecutionState) GetStartTime() *time.Time
- func (t *TimerExecutionState) Update(event *history.HistoryEvent)
- type TimerExecutionStatus
- type WorkflowExecutionState
- func (state *WorkflowExecutionState) GetAttempt() int32
- func (state *WorkflowExecutionState) GetChildWorkflowByEventId(initiatedEventId int64) (*WorkflowExecutionState, bool)
- func (state *WorkflowExecutionState) GetDuration() *time.Duration
- func (state *WorkflowExecutionState) GetFailure() *failure.Failure
- func (state *WorkflowExecutionState) GetName() string
- func (state *WorkflowExecutionState) GetNumberOfEvents() (int64, int64)
- func (state *WorkflowExecutionState) GetRetryState() enums.RetryState
- func (state *WorkflowExecutionState) GetStartTime() *time.Time
- func (state *WorkflowExecutionState) IsClosed() (bool, error)
- func (state *WorkflowExecutionState) Update(event *history.HistoryEvent)
- type WorkflowExecutionUpdate
- type WorkflowExecutionUpdateIterator
- type WorkflowExecutionUpdateIteratorImpl
- type WorkflowStateJob
Constants ¶
const ( AnsiMoveCursorUp = "\x1b[%dA" AnsiMoveCursorStartLine = "\x1b[0G" AnsiEraseToEnd = "\x1b[0J" )
const (
MinFoldingDepth = 1
)
Variables ¶
var ( StatusRunning string = color.BlueString("▷") StatusCompleted string = color.GreenString("✓") StatusTerminated string = color.RedString("x") StatusCanceled string = color.YellowString("x") StatusFailed string = color.RedString("!") StatusContinueAsNew string = color.GreenString("»") StatusTimedOut string = color.RedString("⏱") StatusUnspecifiedScheduled string = "•" StatusCancelRequested string = color.YellowString("▷") StatusTimerWaiting string = color.BlueString("⧖") StatusTimerFired string = color.GreenString("⧖") StatusTimerCanceled string = color.YellowString("⧖") )
var StatusIconsLegend = []StatusIcon{ { Name: "Unspecified or Scheduled", Icon: StatusUnspecifiedScheduled, }, { Name: "Running", Icon: StatusRunning, }, { Name: "Completed", Icon: StatusCompleted, }, { Name: "Continue As New", Icon: StatusContinueAsNew, }, { Name: "Failed", Icon: StatusFailed, }, { Name: "Timed Out", Icon: StatusTimedOut, }, { Name: "Cancel Requested", Icon: StatusCancelRequested, }, { Name: "Canceled", Icon: StatusCanceled, }, { Name: "Terminated", Icon: StatusTerminated, }, }
Functions ¶
func CountBytesPrintWidth ¶ added in v0.10.0
CountBytesPrintWidth counts the number of printed characters a byte array will take.
func DiffDuration ¶ added in v0.10.0
DiffDuration returns a string representing the difference in time between start and close (or start and now).
func ExecutionStatus ¶ added in v0.10.0
func ExecutionStatus(exec ExecutionState) string
ExecutionStatus returns the icon (with color) for a given ExecutionState's status.
func FmtDuration ¶ added in v0.10.0
FmtDuration produces a string for a given duration, rounding to the most reasonable timeframe.
func GetExitCode ¶ added in v0.9.0
func GetExitCode(exec *WorkflowExecutionState) int
GetExitCode returns the exit code for a given workflow execution status.
func GetFoldStatus ¶ added in v0.9.0
func GetFoldStatus(c *cli.Context) ([]enums.WorkflowExecutionStatus, error)
func LineHeight ¶ added in v0.10.0
LineHeight returns the number of lines a string is going to take
func MoveCursorUp ¶ added in v0.10.0
func NewTailBoxBoundBuffer ¶ added in v0.10.0
NewTailBoxBoundBuffer trims a buffer to fit into the box defined by maxLines and maxWidth. It returns the trimmed buffer and the number of lines printing the buffer will take. For no limit on lines, use maxLines = 0. NOTE: This is a best guess. It'll take ansi codes into account but some other chars might throw this off.
func PrintAndExit ¶ added in v0.10.0
func PrintAndExit(writer *TermWriter, tmpl *ExecutionTemplate, update *WorkflowExecutionUpdate) (int, error)
func PrintWorkflowSummary ¶ added in v0.11.0
func PrintWorkflowTrace ¶ added in v0.10.0
func PrintWorkflowTrace(c *cli.Context, sdkClient sdkclient.Client, wid, rid string, foldStatus []enums.WorkflowExecutionStatus) (int, error)
PrintWorkflowTrace prints and updates a workflow trace following the printWorkflowProgress pattern.
func ProgressString ¶ added in v0.10.0
func ReverseLinesBuffer ¶ added in v0.10.0
func ShouldFoldStatus ¶ added in v0.10.0
func ShouldFoldStatus(foldStatus []enums.WorkflowExecutionStatus, noFold bool) func(*WorkflowExecutionState, int) bool
ShouldFoldStatus returns a predicate that will return true when the workflow status can be folded for a given depth. NOTE: Depth starts at 0 (i.e. the root workflow will be at depth 0).
func StripBytesAnsi ¶ added in v0.10.0
StripBytesAnsi removes all ansi codes from a byte array.
Types ¶
type ActivityExecutionState ¶
type ActivityExecutionState struct { // ActivityId is the Activity's id, which will usually be the EventId of the Event it was scheduled with. ActivityId string // Status is the Execution's Status based on the last event that was processed. Status ActivityExecutionStatus // Type is the name/type of Activity. Type *common.ActivityType // Attempt contains the current Activity Execution's attempt. // Since Activities' events aren't reported until the Activity is closed, this will always be the last attempt. Attempt int32 // Failure contains the last failure that the Execution has reported (if any). Failure *failure.Failure // RetryState contains the reason provided for whether the Task should or shouldn't be retried. RetryState enums.RetryState // StartTime is the time the Execution was started (based on the start Event). StartTime *time.Time // CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet. CloseTime *time.Time }
ActivityExecutionState is a snapshot of the state of an Activity's Execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.
func NewActivityExecutionState ¶
func NewActivityExecutionState() *ActivityExecutionState
func (*ActivityExecutionState) GetAttempt ¶
func (state *ActivityExecutionState) GetAttempt() int32
func (*ActivityExecutionState) GetDuration ¶
func (state *ActivityExecutionState) GetDuration() *time.Duration
func (*ActivityExecutionState) GetFailure ¶
func (state *ActivityExecutionState) GetFailure() *failure.Failure
func (*ActivityExecutionState) GetName ¶
func (state *ActivityExecutionState) GetName() string
func (*ActivityExecutionState) GetRetryState ¶
func (state *ActivityExecutionState) GetRetryState() enums.RetryState
func (*ActivityExecutionState) GetStartTime ¶
func (state *ActivityExecutionState) GetStartTime() *time.Time
func (*ActivityExecutionState) Update ¶
func (state *ActivityExecutionState) Update(event *history.HistoryEvent)
Update updates the ActivityExecutionState with a HistoryEvent.
type ActivityExecutionStatus ¶
type ActivityExecutionStatus int32
ActivityExecutionStatus is the Status of an ActivityExecution, analogous to enums.WorkflowExecutionStatus.
var ( ACTIVITY_EXECUTION_STATUS_UNSPECIFIED ActivityExecutionStatus = 0 ACTIVITY_EXECUTION_STATUS_SCHEDULED ActivityExecutionStatus = 1 ACTIVITY_EXECUTION_STATUS_RUNNING ActivityExecutionStatus = 2 ACTIVITY_EXECUTION_STATUS_COMPLETED ActivityExecutionStatus = 3 ACTIVITY_EXECUTION_STATUS_FAILED ActivityExecutionStatus = 4 ACTIVITY_EXECUTION_STATUS_TIMED_OUT ActivityExecutionStatus = 5 ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED ActivityExecutionStatus = 6 ACTIVITY_EXECUTION_STATUS_CANCELED ActivityExecutionStatus = 7 )
type ExecutionState ¶
type ExecutionState interface { // Update updates an ExecutionState with a new HistoryEvent. Update(*history.HistoryEvent) // GetName returns the state's name (usually for displaying to the user). GetName() string // GetAttempt returns the attempts to execute the current ExecutionState. GetAttempt() int32 // GetFailure returns the execution's failure (if any). GetFailure() *failure.Failure // GetRetryState returns the execution's RetryState. GetRetryState() enums.RetryState GetDuration() *time.Duration GetStartTime() *time.Time }
ExecutionState provides a common interface to any execution (Workflows, Activities and Timers in this case) updated through HistoryEvents.
type ExecutionTemplate ¶ added in v0.10.0
type ExecutionTemplate struct {
// contains filtered or unexported fields
}
ExecutionTemplate contains the necessary templates and utilities to render WorkflowExecutionState and its child states.
func NewExecutionTemplate ¶ added in v0.10.0
func NewExecutionTemplate(w io.Writer, foldStatus []enums.WorkflowExecutionStatus, noFold bool) (*ExecutionTemplate, error)
NewExecutionTemplate initializes the templates with the necessary functions.
func (*ExecutionTemplate) Execute ¶ added in v0.10.0
func (t *ExecutionTemplate) Execute(state ExecutionState, depth int) error
Execute executes the templates for a given Execution state and writes it into the ExecutionTemplate's writer.
type StateTemplate ¶ added in v0.10.0
type StateTemplate struct { State ExecutionState Depth int }
type StatusIcon ¶ added in v0.10.0
StatusIcon has names for each status (useful for help messages).
type TermWriter ¶ added in v0.10.0
type TermWriter struct {
// contains filtered or unexported fields
}
func NewTermWriter ¶ added in v0.10.0
func NewTermWriter() *TermWriter
NewTermWriter returns a new TermWriter set to output to Stdout. TermWriter is a stateful writer designed to print into a terminal window by limiting the number of lines printed to what fits and clearing them on new outputs.
func (*TermWriter) Flush ¶ added in v0.10.0
func (w *TermWriter) Flush(trim bool) error
Flush writes to the out and resets the buffer. It should be called after the last call to Write to ensure that any data buffered in the TermWriter is written to output. Any incomplete escape sequence at the end is considered complete for formatting purposes. An error is returned if the contents of the buffer cannot be written to the underlying output stream.
func (*TermWriter) GetSize ¶ added in v0.10.0
func (w *TermWriter) GetSize() (int, int)
func (*TermWriter) WithSize ¶ added in v0.10.0
func (w *TermWriter) WithSize(width, height int) *TermWriter
WithSize sets the size of TermWriter to the desired width and height.
func (*TermWriter) WithTerminalSize ¶ added in v0.10.0
func (w *TermWriter) WithTerminalSize() *TermWriter
WithTerminalSize sets the size of TermWriter to that of the terminal.
func (*TermWriter) WithWriter ¶ added in v0.10.0
func (w *TermWriter) WithWriter(out io.Writer) *TermWriter
WithWriter sets the writer for TermWriter.
func (*TermWriter) Write ¶ added in v0.10.0
func (w *TermWriter) Write(buf []byte) (n int, err error)
Write saves the contents of buf to the writer's buffer. The only errors returned are ones encountered while writing to the underlying buffer. TODO: Consider merging it into Flush since we might always want to write and flush. Alternatively, we can pass the writer to the Sprint functions and write (but we might run into issues if the normal writing is interrupted and the interrupt writing starts).
func (*TermWriter) WriteString ¶ added in v0.10.0
func (w *TermWriter) WriteString(s string) (n int, err error)
WriteString writes a string into TermWriter.
type TimerExecutionState ¶
type TimerExecutionState struct { TimerId string // Name is the name of the Timer (if any has been given to it) Name string // StartToFireTimeout is the amount of time to elapse before the timer fires. StartToFireTimeout *time.Duration // Status is the Execution's Status based on the last event that was processed. Status TimerExecutionStatus // StartTime is the time the Execution was started (based on the start Event). StartTime *time.Time // CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet. CloseTime *time.Time }
TimerExecutionState contains information about a Timer as an execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.
func (*TimerExecutionState) GetAttempt ¶
func (t *TimerExecutionState) GetAttempt() int32
func (*TimerExecutionState) GetDuration ¶
func (t *TimerExecutionState) GetDuration() *time.Duration
func (*TimerExecutionState) GetFailure ¶
func (t *TimerExecutionState) GetFailure() *failure.Failure
func (*TimerExecutionState) GetName ¶
func (t *TimerExecutionState) GetName() string
func (*TimerExecutionState) GetRetryState ¶
func (t *TimerExecutionState) GetRetryState() enums.RetryState
GetRetryState will always return RETRY_STATE_UNSPECIFIED since Timers don't retry.
func (*TimerExecutionState) GetStartTime ¶
func (t *TimerExecutionState) GetStartTime() *time.Time
func (*TimerExecutionState) Update ¶
func (t *TimerExecutionState) Update(event *history.HistoryEvent)
Update updates the TimerExecutionState with a HistoryEvent.
type TimerExecutionStatus ¶
type TimerExecutionStatus int32
TimerExecutionStatus is the Status of a TimerExecution, analogous to enums.WorkflowExecutionStatus.
var ( TIMER_STATUS_WAITING TimerExecutionStatus = 0 TIMER_STATUS_FIRED TimerExecutionStatus = 1 TIMER_STATUS_CANCELED TimerExecutionStatus = 2 )
type WorkflowExecutionState ¶
type WorkflowExecutionState struct { // Execution is the workflow's execution (WorkflowId and RunId). Execution *common.WorkflowExecution // Type is the name/type of Workflow. Type *common.WorkflowType // StartTime is the time the Execution was started (based on the first Execution's Event). StartTime *time.Time // CloseTime is the time the Execution was closed (based on the first Execution's Event). Will be nil if the Execution hasn't been closed yet. CloseTime *time.Time // Status is the Execution's Status based on the last event that was processed. Status enums.WorkflowExecutionStatus // IsArchived will be true if the workflow has been archived. IsArchived bool // LastEventId is the EventId of the last processed HistoryEvent. LastEventId int64 // HistoryLength is the number of HistoryEvents available in the server. It will be zero for archived workflows and non-zero positive for any other workflow executions. HistoryLength int64 // ChildStates contains all the ExecutionStates contained by this WorkflowExecutionState in order of execution. ChildStates []ExecutionState // Non-successful closed states // Failure contains the last failure that the Execution has reported (if any). Failure *failure.Failure // Termination contains the last available termination information that the Workflow Execution has reported (if any). Termination *history.WorkflowExecutionTerminatedEventAttributes // CancelRequest contains the last request that has been made to cancel the Workflow Execution (if any). CancelRequest *history.WorkflowExecutionCancelRequestedEventAttributes // RetryState contains the reason provided for whether the Task should or shouldn't be retried. RetryState enums.RetryState // Timeout and retry policies // WorkflowExecutionTimeout contains the Workflow Execution's timeout if it has been set. WorkflowExecutionTimeout *time.Duration // Attempt contains the current Workflow Execution's attempt. 
Attempt int32 // MaximumAttempts contains the maximum number of times the Workflow Execution is allowed to retry before failing. MaximumAttempts int32 // ParentWorkflowExecution identifies the parent Workflow and the execution run. ParentWorkflowExecution *common.WorkflowExecution // contains filtered or unexported fields }
WorkflowExecutionState is a snapshot of the state of a WorkflowExecution. It is updated through HistoryEvents.
func NewWorkflowExecutionState ¶
func NewWorkflowExecutionState(wfId, runId string) *WorkflowExecutionState
func (*WorkflowExecutionState) GetAttempt ¶
func (state *WorkflowExecutionState) GetAttempt() int32
func (*WorkflowExecutionState) GetChildWorkflowByEventId ¶ added in v0.9.0
func (state *WorkflowExecutionState) GetChildWorkflowByEventId(initiatedEventId int64) (*WorkflowExecutionState, bool)
GetChildWorkflowByEventId returns a child workflow for a given initiated event id
func (*WorkflowExecutionState) GetDuration ¶
func (state *WorkflowExecutionState) GetDuration() *time.Duration
func (*WorkflowExecutionState) GetFailure ¶
func (state *WorkflowExecutionState) GetFailure() *failure.Failure
func (*WorkflowExecutionState) GetName ¶
func (state *WorkflowExecutionState) GetName() string
func (*WorkflowExecutionState) GetNumberOfEvents ¶
func (state *WorkflowExecutionState) GetNumberOfEvents() (int64, int64)
GetNumberOfEvents returns a count of the number of events processed and the total for a workflow execution. This method iteratively sums the LastEventId (the sequential id of the last event processed) and the HistoryLength for all child workflows
func (*WorkflowExecutionState) GetRetryState ¶
func (state *WorkflowExecutionState) GetRetryState() enums.RetryState
func (*WorkflowExecutionState) GetStartTime ¶
func (state *WorkflowExecutionState) GetStartTime() *time.Time
func (*WorkflowExecutionState) IsClosed ¶ added in v0.9.0
func (state *WorkflowExecutionState) IsClosed() (bool, error)
IsClosed returns true when the Workflow Execution is closed. A Closed status means that the Workflow Execution cannot make further progress.
func (*WorkflowExecutionState) Update ¶
func (state *WorkflowExecutionState) Update(event *history.HistoryEvent)
Update updates the WorkflowExecutionState and its child states with a HistoryEvent.
type WorkflowExecutionUpdate ¶ added in v0.9.0
type WorkflowExecutionUpdate struct {
State *WorkflowExecutionState
}
func (*WorkflowExecutionUpdate) GetState ¶ added in v0.9.0
func (update *WorkflowExecutionUpdate) GetState() *WorkflowExecutionState
type WorkflowExecutionUpdateIterator ¶ added in v0.9.0
type WorkflowExecutionUpdateIterator interface { HasNext() bool Next() (*WorkflowExecutionUpdate, error) }
WorkflowExecutionUpdateIterator is the interface that provides iterative updates, analogous to the HistoryEventIterator interface.
func GetWorkflowExecutionUpdates ¶ added in v0.9.0
func GetWorkflowExecutionUpdates(ctx context.Context, client sdkclient.Client, wfId, runId string, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, concurrency int) (WorkflowExecutionUpdateIterator, error)
GetWorkflowExecutionUpdates gets workflow execution updates for a particular workflow - workflow ID of the workflow - runID can be default (empty string) - depth of child workflows to request updates for (-1 for unlimited depth) - concurrency of requests (non-zero positive integer) Returns iterator (see client.GetWorkflowHistory) that provides updated WorkflowExecutionState snapshots. Example: To print a workflow's state whenever there are updates
iter, err := GetWorkflowExecutionUpdates(ctx, client, wfId, runId, fetchAll, foldStatus, -1, 5) if err != nil { return err } for iter.HasNext() { update, err := iter.Next() if err != nil { return err } PrintWorkflowState(update.State) }
type WorkflowExecutionUpdateIteratorImpl ¶ added in v0.9.0
type WorkflowExecutionUpdateIteratorImpl struct {
// contains filtered or unexported fields
}
WorkflowExecutionUpdateIteratorImpl implements the iterator interface. Keeps information about the last processed update and receives new updates through the updateChan channel.
func (*WorkflowExecutionUpdateIteratorImpl) HasNext ¶ added in v0.9.0
func (iter *WorkflowExecutionUpdateIteratorImpl) HasNext() bool
HasNext checks if there are any more updates in the updateChan channel. Returns false if the channel has been closed.
func (*WorkflowExecutionUpdateIteratorImpl) Next ¶ added in v0.9.0
func (iter *WorkflowExecutionUpdateIteratorImpl) Next() (*WorkflowExecutionUpdate, error)
Next returns the last processed execution update. HasNext has to be called first (following the HasNext/Next pattern).
type WorkflowStateJob ¶ added in v0.9.0
type WorkflowStateJob struct {
// contains filtered or unexported fields
}
WorkflowStateJob implements a WorkerJob to retrieve updates for a WorkflowExecutionState and its child workflows.
func NewWorkflowStateJob ¶ added in v0.9.0
func NewWorkflowStateJob(ctx context.Context, client sdkclient.Client, state *WorkflowExecutionState, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, updateChan chan struct{}) (*WorkflowStateJob, error)
NewWorkflowStateJob returns a new WorkflowStateJob. It requires an updateChan to signal when there are updates.
func (*WorkflowStateJob) GetChildJob ¶ added in v0.9.0
func (job *WorkflowStateJob) GetChildJob(event *history.HistoryEvent) (*WorkflowStateJob, error)
GetChildJob gets a new child job and appends it to the list of childJobs. These jobs don't start until the parent has caught up.
func (*WorkflowStateJob) Run ¶ added in v0.9.0
func (job *WorkflowStateJob) Run(group *pond.TaskGroupWithContext) func() error
Run starts the WorkflowStateJob, which retrieves the workflow's events and spawns new jobs for the child workflows once it's up-to-date. New jobs are submitted to the pool when the job is up-to-date to reduce the amount of unnecessary history fetches (e.g. when the child workflow is already completed).
func (*WorkflowStateJob) ShouldStart ¶ added in v0.9.0
func (job *WorkflowStateJob) ShouldStart() bool
ShouldStart will return true if the state is in a status that requires requesting its event history. This will help reduce the amount of event histories requested when they're not needed.