trace

package
v0.11.0
This package is not in the latest version of its module.
Published: Jan 31, 2024 License: MIT Imports: 26 Imported by: 0

Documentation

Constants

const (
	AnsiMoveCursorUp        = "\x1b[%dA"
	AnsiMoveCursorStartLine = "\x1b[0G"
	AnsiEraseToEnd          = "\x1b[0J"
)
const (
	MinFoldingDepth = 1
)

Variables

var (
	StatusRunning              string = color.BlueString("▷")
	StatusCompleted            string = color.GreenString("✓")
	StatusTerminated           string = color.RedString("x")
	StatusCanceled             string = color.YellowString("x")
	StatusFailed               string = color.RedString("!")
	StatusContinueAsNew        string = color.GreenString("»")
	StatusTimedOut             string = color.RedString("⏱")
	StatusUnspecifiedScheduled string = "•"
	StatusCancelRequested      string = color.YellowString("▷")
	StatusTimerWaiting         string = color.BlueString("⧖")
	StatusTimerFired           string = color.GreenString("⧖")
	StatusTimerCanceled        string = color.YellowString("⧖")
)
var StatusIconsLegend = []StatusIcon{
	{
		Name: "Unspecified or Scheduled", Icon: StatusUnspecifiedScheduled,
	},
	{
		Name: "Running", Icon: StatusRunning,
	},
	{
		Name: "Completed", Icon: StatusCompleted,
	},
	{
		Name: "Continue As New", Icon: StatusContinueAsNew,
	},
	{
		Name: "Failed", Icon: StatusFailed,
	},
	{
		Name: "Timed Out", Icon: StatusTimedOut,
	},
	{
		Name: "Cancel Requested", Icon: StatusCancelRequested,
	},
	{
		Name: "Canceled", Icon: StatusCanceled,
	},
	{
		Name: "Terminated", Icon: StatusTerminated,
	},
}

Functions

func CountBytesPrintWidth added in v0.10.0

func CountBytesPrintWidth(b []byte) int

CountBytesPrintWidth counts the number of printed characters a byte slice will take up.
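
The idea behind a print-width count can be sketched as follows. This is a minimal illustration, not the package's implementation: `stripANSI` and `printWidth` are hypothetical stand-ins for StripBytesAnsi and CountBytesPrintWidth, and the regular expression is an assumption that only covers CSI escape sequences.

```go
package main

import (
	"fmt"
	"regexp"
	"unicode/utf8"
)

// ansiCSI matches CSI escape sequences such as "\x1b[32m" (color) or
// "\x1b[2A" (cursor movement). Assumption: other escape families are ignored.
var ansiCSI = regexp.MustCompile(`\x1b\[[0-9;?]*[ -/]*[@-~]`)

// stripANSI is a hypothetical stand-in for StripBytesAnsi.
func stripANSI(b []byte) []byte {
	return ansiCSI.ReplaceAll(b, nil)
}

// printWidth is a hypothetical stand-in for CountBytesPrintWidth. It counts
// one column per rune, so wide (e.g. CJK) characters are undercounted.
func printWidth(b []byte) int {
	return utf8.RuneCount(stripANSI(b))
}

func main() {
	colored := []byte("\x1b[32m✓\x1b[0m done")
	// The byte length includes the escape codes; the print width does not.
	fmt.Println(len(colored), printWidth(colored))
}
```

Counting runes rather than bytes matters here because the status icons used by this package are multi-byte UTF-8 characters.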

func DiffDuration added in v0.10.0

func DiffDuration(start *time.Time, close *time.Time) string

DiffDuration returns a string representing the difference in time between start and close (or between start and now if close is nil).

func ExecutionStatus added in v0.10.0

func ExecutionStatus(exec ExecutionState) string

ExecutionStatus returns the icon (with color) for a given ExecutionState's status.

func FmtDuration added in v0.10.0

func FmtDuration(duration time.Duration) string

FmtDuration produces a string for a given duration, rounding to the most reasonable timeframe.
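
One way to read "rounding to the most reasonable timeframe" is to pick a rounding unit based on the magnitude of the duration. The sketch below assumes that reading; `fmtDuration` is a hypothetical stand-in and its thresholds are illustrative, not the rules FmtDuration actually applies.

```go
package main

import (
	"fmt"
	"time"
)

// fmtDuration is a hypothetical stand-in for FmtDuration. The thresholds are
// assumptions chosen for illustration, not the package's actual rules.
func fmtDuration(d time.Duration) string {
	switch {
	case d < time.Second:
		// Sub-second durations: millisecond precision is enough.
		return d.Round(time.Millisecond).String()
	case d < time.Minute:
		// Seconds: keep one decimal place.
		return d.Round(100 * time.Millisecond).String()
	default:
		// Minutes and above: whole seconds.
		return d.Round(time.Second).String()
	}
}

func main() {
	for _, d := range []time.Duration{
		1234567 * time.Nanosecond,
		2345 * time.Millisecond,
		90400 * time.Millisecond,
	} {
		fmt.Println(fmtDuration(d))
	}
}
```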

func GetExitCode added in v0.9.0

func GetExitCode(exec *WorkflowExecutionState) int

GetExitCode returns the exit code for a given workflow execution status.

func GetFoldStatus added in v0.9.0

func GetFoldStatus(c *cli.Context) ([]enums.WorkflowExecutionStatus, error)

func LineHeight added in v0.10.0

func LineHeight(line []byte, maxWidth int) int

LineHeight returns the number of lines a line is going to take when printed within maxWidth.

func MoveCursorUp added in v0.10.0

func MoveCursorUp(lines int) string

func NewTailBoxBoundBuffer added in v0.10.0

func NewTailBoxBoundBuffer(buf *bytes.Buffer, maxLines int, maxWidth int) (*bytes.Buffer, int)

NewTailBoxBoundBuffer trims a buffer to fit into the box defined by maxLines and maxWidth, and returns the trimmed buffer together with the number of lines that printing it will take. For no limit on lines, use maxLines = 0. NOTE: This is a best guess. It takes ANSI codes into account, but some other characters might throw it off.
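
The trimming described above can be sketched as a walk over the lines from last to first, adding up the wrapped height of each line until the box is full. This is a simplified illustration under stated assumptions: `rows` and `tailBox` are hypothetical stand-ins for LineHeight and NewTailBoxBoundBuffer, they work on plain byte slices, and they ignore ANSI codes.

```go
package main

import (
	"bytes"
	"fmt"
	"unicode/utf8"
)

// rows estimates how many terminal rows a line occupies when wrapped at
// maxWidth columns (ceiling division). Assumes one column per rune.
func rows(line []byte, maxWidth int) int {
	width := utf8.RuneCount(line)
	if maxWidth <= 0 || width == 0 {
		return 1 // an empty line still occupies one row
	}
	return (width + maxWidth - 1) / maxWidth
}

// tailBox keeps as many trailing lines of buf as fit into maxLines rows and
// returns them with the number of rows used. maxLines == 0 means no limit.
func tailBox(buf []byte, maxLines, maxWidth int) ([]byte, int) {
	trimmed := bytes.TrimRight(buf, "\n")
	if len(trimmed) == 0 {
		return nil, 0
	}
	lines := bytes.Split(trimmed, []byte("\n"))
	used, first := 0, len(lines)
	for i := len(lines) - 1; i >= 0; i-- {
		h := rows(lines[i], maxWidth)
		if maxLines > 0 && used+h > maxLines {
			break // the next line up would overflow the box
		}
		used += h
		first = i
	}
	return bytes.Join(lines[first:], []byte("\n")), used
}

func main() {
	out, n := tailBox([]byte("one\ntwo\nabcdefgh\nz\n"), 3, 4)
	fmt.Printf("%q %d\n", out, n)
}
```

Walking from the end keeps the most recent output visible, which is the behavior a tailing view needs.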

func PrintAndExit added in v0.10.0

func PrintAndExit(writer *TermWriter, tmpl *ExecutionTemplate, update *WorkflowExecutionUpdate) (int, error)

func PrintWorkflowSummary added in v0.11.0

func PrintWorkflowSummary(c *cli.Context, sdkClient sdkclient.Client, wfId, runId string) error

func PrintWorkflowTrace added in v0.10.0

func PrintWorkflowTrace(c *cli.Context, sdkClient sdkclient.Client, wid, rid string, foldStatus []enums.WorkflowExecutionStatus) (int, error)

PrintWorkflowTrace prints and updates a workflow trace, following the printWorkflowProgress pattern.

func ProgressString added in v0.10.0

func ProgressString(currentEvents int64, totalEvents int64) string

func ReverseLinesBuffer added in v0.10.0

func ReverseLinesBuffer(buf *bytes.Buffer) [][]byte

func ShouldFoldStatus added in v0.10.0

func ShouldFoldStatus(foldStatus []enums.WorkflowExecutionStatus, noFold bool) func(*WorkflowExecutionState, int) bool

ShouldFoldStatus returns a predicate that will return true when the workflow status can be folded for a given depth. NOTE: Depth starts at 0 (i.e. the root workflow will be at depth 0).
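
A predicate of this shape is a closure over the fold settings. The sketch below uses local stand-in types (`status`, `workflowState`) instead of the Temporal enums, and it assumes that folding only applies at or below MinFoldingDepth, so the root workflow at depth 0 is never folded. That rule is an assumption for illustration, not confirmed behavior.

```go
package main

import "fmt"

// minFoldingDepth mirrors the MinFoldingDepth constant documented above.
const minFoldingDepth = 1

// status and workflowState are hypothetical stand-ins for
// enums.WorkflowExecutionStatus and WorkflowExecutionState.
type status string

type workflowState struct {
	Status status
}

// shouldFold returns a predicate that reports whether a workflow at the given
// depth should be folded. Assumption: noFold disables folding entirely and
// workflows above minFoldingDepth are always shown expanded.
func shouldFold(foldStatus []status, noFold bool) func(*workflowState, int) bool {
	return func(state *workflowState, depth int) bool {
		if noFold || depth < minFoldingDepth {
			return false
		}
		for _, s := range foldStatus {
			if state.Status == s {
				return true
			}
		}
		return false
	}
}

func main() {
	fold := shouldFold([]status{"Completed", "Canceled"}, false)
	child := &workflowState{Status: "Completed"}
	fmt.Println(fold(child, 0), fold(child, 1))
}
```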

func StripBytesAnsi added in v0.10.0

func StripBytesAnsi(b []byte) []byte

StripBytesAnsi removes all ANSI codes from a byte slice.

Types

type ActivityExecutionState

type ActivityExecutionState struct {
	// ActivityId is the Activity's id, which will usually be the EventId of the Event it was scheduled with.
	ActivityId string
	// Status is the Execution's Status based on the last event that was processed.
	Status ActivityExecutionStatus
	// Type is the name/type of Activity.
	Type *common.ActivityType
	// Attempt contains the current Activity Execution's attempt.
	// Since Activities' events aren't reported until the Activity is closed, this will always be the last attempt.
	Attempt int32
	// Failure contains the last failure that the Execution has reported (if any).
	Failure *failure.Failure
	// RetryState contains the reason provided for whether the Task should or shouldn't be retried.
	RetryState enums.RetryState

	// StartTime is the time the Execution was started (based on the start Event).
	StartTime *time.Time
	// CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet.
	CloseTime *time.Time
}

ActivityExecutionState is a snapshot of the state of an Activity's Execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.

func NewActivityExecutionState

func NewActivityExecutionState() *ActivityExecutionState

func (*ActivityExecutionState) GetAttempt

func (state *ActivityExecutionState) GetAttempt() int32

func (*ActivityExecutionState) GetDuration

func (state *ActivityExecutionState) GetDuration() *time.Duration

func (*ActivityExecutionState) GetFailure

func (state *ActivityExecutionState) GetFailure() *failure.Failure

func (*ActivityExecutionState) GetName

func (state *ActivityExecutionState) GetName() string

func (*ActivityExecutionState) GetRetryState

func (state *ActivityExecutionState) GetRetryState() enums.RetryState

func (*ActivityExecutionState) GetStartTime

func (state *ActivityExecutionState) GetStartTime() *time.Time

func (*ActivityExecutionState) Update

func (state *ActivityExecutionState) Update(event *history.HistoryEvent)

Update updates the ActivityExecutionState with a HistoryEvent.

type ActivityExecutionStatus

type ActivityExecutionStatus int32

ActivityExecutionStatus is the Status of an ActivityExecution, analogous to enums.WorkflowExecutionStatus.

var (
	ACTIVITY_EXECUTION_STATUS_UNSPECIFIED      ActivityExecutionStatus = 0
	ACTIVITY_EXECUTION_STATUS_SCHEDULED        ActivityExecutionStatus = 1
	ACTIVITY_EXECUTION_STATUS_RUNNING          ActivityExecutionStatus = 2
	ACTIVITY_EXECUTION_STATUS_COMPLETED        ActivityExecutionStatus = 3
	ACTIVITY_EXECUTION_STATUS_FAILED           ActivityExecutionStatus = 4
	ACTIVITY_EXECUTION_STATUS_TIMED_OUT        ActivityExecutionStatus = 5
	ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED ActivityExecutionStatus = 6
	ACTIVITY_EXECUTION_STATUS_CANCELED         ActivityExecutionStatus = 7
)

type ExecutionState

type ExecutionState interface {
	// Update updates an ExecutionState with a new HistoryEvent.
	Update(*history.HistoryEvent)
	// GetName returns the state's name (usually for displaying to the user).
	GetName() string
	// GetAttempt returns the current attempt number of the ExecutionState.
	GetAttempt() int32
	// GetFailure returns the execution's failure (if any).
	GetFailure() *failure.Failure
	// GetRetryState returns the execution's RetryState.
	GetRetryState() enums.RetryState

	GetDuration() *time.Duration
	GetStartTime() *time.Time
}

ExecutionState provides a common interface to any execution (Workflows, Activities and Timers in this case) updated through HistoryEvents.

type ExecutionTemplate added in v0.10.0

type ExecutionTemplate struct {
	// contains filtered or unexported fields
}

ExecutionTemplate contains the necessary templates and utilities to render WorkflowExecutionState and its child states.

func NewExecutionTemplate added in v0.10.0

func NewExecutionTemplate(w io.Writer, foldStatus []enums.WorkflowExecutionStatus, noFold bool) (*ExecutionTemplate, error)

NewExecutionTemplate initializes the templates with the necessary functions.

func (*ExecutionTemplate) Execute added in v0.10.0

func (t *ExecutionTemplate) Execute(state ExecutionState, depth int) error

Execute executes the templates for a given Execution state and writes it into the ExecutionTemplate's writer.
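
Rendering a state at a given depth can be sketched with the standard text/template package and a helper function registered before parsing. Everything here is illustrative: `stateView`, the `indent` helper, and the template text are assumptions, and the real ExecutionTemplate keeps its templates in unexported fields that this page does not show.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
	"text/template"
)

// stateView is a hypothetical, flattened counterpart of StateTemplate: the
// data a template needs to print one line of the trace.
type stateView struct {
	Name  string
	Icon  string
	Depth int
}

// stateLine registers its helper functions before parsing, which
// text/template requires for any function used in the template body.
var stateLine = template.Must(template.New("state").
	Funcs(template.FuncMap{
		"indent": func(depth int) string { return strings.Repeat("  ", depth) },
	}).
	Parse("{{indent .Depth}}{{.Icon}} {{.Name}}\n"))

// render writes one state line to w.
func render(w io.Writer, v stateView) error {
	return stateLine.Execute(w, v)
}

// renderString is a convenience wrapper that renders into a string.
func renderString(v stateView) (string, error) {
	var sb strings.Builder
	err := render(&sb, v)
	return sb.String(), err
}

func main() {
	views := []stateView{
		{Name: "ParentWorkflow", Icon: "▷", Depth: 0},
		{Name: "ChildActivity", Icon: "✓", Depth: 1},
	}
	for _, v := range views {
		if err := render(os.Stdout, v); err != nil {
			fmt.Fprintln(os.Stderr, err)
		}
	}
}
```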

type Row added in v0.11.0

type Row struct {
	Key   string
	Value string
}

type StateTemplate added in v0.10.0

type StateTemplate struct {
	State ExecutionState
	Depth int
}

type StatusIcon added in v0.10.0

type StatusIcon struct {
	Name string
	Icon string
}

StatusIcon has names for each status (useful for help messages).

type TermWriter added in v0.10.0

type TermWriter struct {
	// contains filtered or unexported fields
}

func NewTermWriter added in v0.10.0

func NewTermWriter() *TermWriter

NewTermWriter returns a new TermWriter set to output to Stdout. TermWriter is a stateful writer designed to print into a terminal window by limiting the number of lines printed to what fits and clearing them on new outputs.

func (*TermWriter) Flush added in v0.10.0

func (w *TermWriter) Flush(trim bool) error

Flush writes the buffered contents to the output and resets the buffer. It should be called after the last call to Write to ensure that any data buffered in the TermWriter is written to output. Any incomplete escape sequence at the end is considered complete for formatting purposes. An error is returned if the contents of the buffer cannot be written to the underlying output stream.
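
The write-then-flush cycle can be sketched as a small writer that remembers how many lines the previous frame took and erases them before printing the next one. This is a simplified sketch, not TermWriter itself: `redrawWriter` and `newBufferWriter` are hypothetical names, the escape sequences are copied from the constants at the top of this page, and the trimming to terminal height that Flush's trim argument suggests is left out.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// Escape sequences copied from the package constants.
const (
	ansiMoveCursorUp        = "\x1b[%dA"
	ansiMoveCursorStartLine = "\x1b[0G"
	ansiEraseToEnd          = "\x1b[0J"
)

// redrawWriter buffers writes and, on Flush, replaces the previous frame.
type redrawWriter struct {
	out       io.Writer
	buf       bytes.Buffer
	lastLines int // number of lines printed by the previous Flush
}

// Write only buffers; nothing reaches the output until Flush.
func (w *redrawWriter) Write(p []byte) (int, error) { return w.buf.Write(p) }

// Flush erases the previous frame (if any), prints the buffered one, and
// records its height for the next call.
func (w *redrawWriter) Flush() error {
	var frame bytes.Buffer
	if w.lastLines > 0 {
		fmt.Fprintf(&frame, ansiMoveCursorUp, w.lastLines)
		frame.WriteString(ansiMoveCursorStartLine + ansiEraseToEnd)
	}
	frame.Write(w.buf.Bytes())
	w.lastLines = bytes.Count(w.buf.Bytes(), []byte("\n"))
	w.buf.Reset()
	_, err := w.out.Write(frame.Bytes())
	return err
}

// newBufferWriter returns a writer backed by an in-memory buffer, which makes
// the emitted bytes easy to inspect.
func newBufferWriter() (*redrawWriter, *bytes.Buffer) {
	out := &bytes.Buffer{}
	return &redrawWriter{out: out}, out
}

func main() {
	w := &redrawWriter{out: os.Stdout}
	for _, frame := range []string{"step 1\nrunning\n", "step 2\ndone\n"} {
		fmt.Fprint(w, frame)
		if err := w.Flush(); err != nil {
			fmt.Fprintln(os.Stderr, err)
		}
	}
}
```

Assembling the erase sequence and the new frame into one buffer before writing keeps the redraw to a single write call, which tends to reduce flicker.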

func (*TermWriter) GetSize added in v0.10.0

func (w *TermWriter) GetSize() (int, int)

func (*TermWriter) WithSize added in v0.10.0

func (w *TermWriter) WithSize(width, height int) *TermWriter

WithSize sets the size of TermWriter to the desired width and height.

func (*TermWriter) WithTerminalSize added in v0.10.0

func (w *TermWriter) WithTerminalSize() *TermWriter

WithTerminalSize sets the size of TermWriter to that of the terminal.

func (*TermWriter) WithWriter added in v0.10.0

func (w *TermWriter) WithWriter(out io.Writer) *TermWriter

WithWriter sets the writer for TermWriter.

func (*TermWriter) Write added in v0.10.0

func (w *TermWriter) Write(buf []byte) (n int, err error)

Write saves the contents of buf to the TermWriter's buffer. The only errors returned are ones encountered while writing to the underlying buffer. TODO: Consider merging it into Flush since we might always want to write and flush. Alternatively, we can pass the writer to the Sprint functions and write (but we might run into issues if the normal writing is interrupted and the interrupt writing starts).

func (*TermWriter) WriteString added in v0.10.0

func (w *TermWriter) WriteString(s string) (n int, err error)

WriteString writes a string into TermWriter.

type TimerExecutionState

type TimerExecutionState struct {
	TimerId string
	// Name is the name of the Timer (if any has been given to it)
	Name string
	// StartToFireTimeout is the amount of time to elapse before the timer fires.
	StartToFireTimeout *time.Duration
	// Status is the Execution's Status based on the last event that was processed.
	Status TimerExecutionStatus
	// StartTime is the time the Execution was started (based on the start Event).
	StartTime *time.Time
	// CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet.
	CloseTime *time.Time
}

TimerExecutionState contains information about a Timer as an execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.

func (*TimerExecutionState) GetAttempt

func (t *TimerExecutionState) GetAttempt() int32

func (*TimerExecutionState) GetDuration

func (t *TimerExecutionState) GetDuration() *time.Duration

func (*TimerExecutionState) GetFailure

func (t *TimerExecutionState) GetFailure() *failure.Failure

func (*TimerExecutionState) GetName

func (t *TimerExecutionState) GetName() string

func (*TimerExecutionState) GetRetryState

func (t *TimerExecutionState) GetRetryState() enums.RetryState

GetRetryState will always return RETRY_STATE_UNSPECIFIED since Timers don't retry.

func (*TimerExecutionState) GetStartTime

func (t *TimerExecutionState) GetStartTime() *time.Time

func (*TimerExecutionState) Update

func (t *TimerExecutionState) Update(event *history.HistoryEvent)

Update updates the TimerExecutionState with a HistoryEvent.

type TimerExecutionStatus

type TimerExecutionStatus int32

TimerExecutionStatus is the Status of a TimerExecution, analogous to enums.WorkflowExecutionStatus.

var (
	TIMER_STATUS_WAITING  TimerExecutionStatus = 0
	TIMER_STATUS_FIRED    TimerExecutionStatus = 1
	TIMER_STATUS_CANCELED TimerExecutionStatus = 2
)

type WorkflowExecutionState

type WorkflowExecutionState struct {
	// Execution is the workflow's execution (WorkflowId and RunId).
	Execution *common.WorkflowExecution
	// Type is the name/type of Workflow.
	Type *common.WorkflowType
	// StartTime is the time the Execution was started (based on the first Execution's Event).
	StartTime *time.Time
	// CloseTime is the time the Execution was closed (based on the first Execution's Event). Will be nil if the Execution hasn't been closed yet.
	CloseTime *time.Time
	// Status is the Execution's Status based on the last event that was processed.
	Status enums.WorkflowExecutionStatus
	// IsArchived will be true if the workflow has been archived.
	IsArchived bool

	// LastEventId is the EventId of the last processed HistoryEvent.
	LastEventId int64
	// HistoryLength is the number of HistoryEvents available in the server. It will be zero for archived workflows and positive for any other workflow execution.
	HistoryLength int64

	// ChildStates contains all the ExecutionStates contained by this WorkflowExecutionState in order of execution.
	ChildStates []ExecutionState

	// Non-successful closed states
	// Failure contains the last failure that the Execution has reported (if any).
	Failure *failure.Failure
	// Termination contains the last available termination information that the Workflow Execution has reported (if any).
	Termination *history.WorkflowExecutionTerminatedEventAttributes
	// CancelRequest contains the last request that has been made to cancel the Workflow Execution (if any).
	CancelRequest *history.WorkflowExecutionCancelRequestedEventAttributes
	// RetryState contains the reason provided for whether the Task should or shouldn't be retried.
	RetryState enums.RetryState

	// Timeout and retry policies
	// WorkflowExecutionTimeout contains the Workflow Execution's timeout if it has been set.
	WorkflowExecutionTimeout *time.Duration
	// Attempt contains the current Workflow Execution's attempt.
	Attempt int32
	// MaximumAttempts contains the maximum number of times the Workflow Execution is allowed to retry before failing.
	MaximumAttempts int32

	// ParentWorkflowExecution identifies the parent Workflow and the execution run.
	ParentWorkflowExecution *common.WorkflowExecution
	// contains filtered or unexported fields
}

WorkflowExecutionState is a snapshot of the state of a WorkflowExecution. It is updated through HistoryEvents.

func NewWorkflowExecutionState

func NewWorkflowExecutionState(wfId, runId string) *WorkflowExecutionState

func (*WorkflowExecutionState) GetAttempt

func (state *WorkflowExecutionState) GetAttempt() int32

func (*WorkflowExecutionState) GetChildWorkflowByEventId added in v0.9.0

func (state *WorkflowExecutionState) GetChildWorkflowByEventId(initiatedEventId int64) (*WorkflowExecutionState, bool)

GetChildWorkflowByEventId returns a child workflow for a given initiated event id.

func (*WorkflowExecutionState) GetDuration

func (state *WorkflowExecutionState) GetDuration() *time.Duration

func (*WorkflowExecutionState) GetFailure

func (state *WorkflowExecutionState) GetFailure() *failure.Failure

func (*WorkflowExecutionState) GetName

func (state *WorkflowExecutionState) GetName() string

func (*WorkflowExecutionState) GetNumberOfEvents

func (state *WorkflowExecutionState) GetNumberOfEvents() (int64, int64)

GetNumberOfEvents returns the number of events processed and the total number of events for a workflow execution. This method iteratively sums the LastEventId (the sequential id of the last event processed) and the HistoryLength of the workflow and all its child workflows.
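
The summation can be sketched with an explicit stack over a tree of states. The `wfState` type below is a hypothetical, reduced stand-in for WorkflowExecutionState that keeps only the two counters and the child workflows; the traversal order is an assumption and does not affect the totals.

```go
package main

import "fmt"

// wfState is a hypothetical stand-in for WorkflowExecutionState.
type wfState struct {
	LastEventId   int64
	HistoryLength int64
	Children      []*wfState
}

// numberOfEvents walks the workflow and its descendants iteratively and
// returns the processed and total event counts.
func (s *wfState) numberOfEvents() (current, total int64) {
	stack := []*wfState{s}
	for len(stack) > 0 {
		n := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if n == nil {
			continue
		}
		current += n.LastEventId
		total += n.HistoryLength
		stack = append(stack, n.Children...)
	}
	return current, total
}

func main() {
	root := &wfState{
		LastEventId: 10, HistoryLength: 20,
		Children: []*wfState{
			{LastEventId: 5, HistoryLength: 5},
			{LastEventId: 3, HistoryLength: 8, Children: []*wfState{
				{LastEventId: 1, HistoryLength: 2},
			}},
		},
	}
	current, total := root.numberOfEvents()
	fmt.Println(current, total) // prints "19 35"
}
```

A pair of counters like this is what a progress indicator such as ProgressString(currentEvents, totalEvents) would consume.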

func (*WorkflowExecutionState) GetRetryState

func (state *WorkflowExecutionState) GetRetryState() enums.RetryState

func (*WorkflowExecutionState) GetStartTime

func (state *WorkflowExecutionState) GetStartTime() *time.Time

func (*WorkflowExecutionState) IsClosed added in v0.9.0

func (state *WorkflowExecutionState) IsClosed() (bool, error)

IsClosed returns true when the Workflow Execution is closed. A Closed status means that the Workflow Execution cannot make further progress.

func (*WorkflowExecutionState) Update

func (state *WorkflowExecutionState) Update(event *history.HistoryEvent)

Update updates the WorkflowExecutionState and its child states with a HistoryEvent.

type WorkflowExecutionUpdate added in v0.9.0

type WorkflowExecutionUpdate struct {
	State *WorkflowExecutionState
}

func (*WorkflowExecutionUpdate) GetState added in v0.9.0

func (update *WorkflowExecutionUpdate) GetState() *WorkflowExecutionState

type WorkflowExecutionUpdateIterator added in v0.9.0

type WorkflowExecutionUpdateIterator interface {
	HasNext() bool
	Next() (*WorkflowExecutionUpdate, error)
}

WorkflowExecutionUpdateIterator is the interface that provides iterative updates, analogous to the HistoryEventIterator interface.

func GetWorkflowExecutionUpdates added in v0.9.0

func GetWorkflowExecutionUpdates(ctx context.Context, client sdkclient.Client, wfId, runId string, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, concurrency int) (WorkflowExecutionUpdateIterator, error)

GetWorkflowExecutionUpdates gets workflow execution updates for a particular workflow. Parameters:

- wfId: workflow ID of the workflow
- runId: can be default (empty string)
- depth: depth of child workflows to request updates for (-1 for unlimited depth)
- concurrency: concurrency of requests (non-zero positive integer)

Returns an iterator (see client.GetWorkflowHistory) that provides updated WorkflowExecutionState snapshots.

Example: to print a workflow's state whenever there are updates (PrintWorkflowState stands in for the caller's own rendering function):

iter, err := GetWorkflowExecutionUpdates(ctx, client, wfId, runId, fetchAll, foldStatus, -1, 5)
if err != nil {
	return err
}
for iter.HasNext() {
	update, err := iter.Next()
	if err != nil {
		return err
	}
	PrintWorkflowState(update.GetState())
}

type WorkflowExecutionUpdateIteratorImpl added in v0.9.0

type WorkflowExecutionUpdateIteratorImpl struct {
	// contains filtered or unexported fields
}

WorkflowExecutionUpdateIteratorImpl implements the iterator interface. Keeps information about the last processed update and receives new updates through the updateChan channel.

func (*WorkflowExecutionUpdateIteratorImpl) HasNext added in v0.9.0

func (iter *WorkflowExecutionUpdateIteratorImpl) HasNext() bool

HasNext checks if there are any more updates in the updateChan channel. Returns false if the channel has been closed.

func (*WorkflowExecutionUpdateIteratorImpl) Next added in v0.9.0

func (iter *WorkflowExecutionUpdateIteratorImpl) Next() (*WorkflowExecutionUpdate, error)

Next returns the last processed execution update. HasNext has to be called first (following the HasNext/Next pattern).
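
The HasNext/Next pattern over a channel can be sketched as follows. This is an illustration under assumptions, not the package's implementation: `snapshot` and `chanIterator` are hypothetical types, and the channel carries whole snapshots, whereas the package's updateChan appears to be a signal-only channel (NewWorkflowStateJob takes a chan struct{}).

```go
package main

import (
	"errors"
	"fmt"
)

// snapshot is a hypothetical stand-in for WorkflowExecutionUpdate.
type snapshot struct {
	LastEventId int64
}

// chanIterator receives updates from a channel and remembers the last one.
type chanIterator struct {
	updates <-chan snapshot
	last    *snapshot
}

// HasNext blocks until an update arrives and returns false once the channel
// has been closed.
func (it *chanIterator) HasNext() bool {
	s, ok := <-it.updates
	if !ok {
		it.last = nil
		return false
	}
	it.last = &s
	return true
}

// Next returns the update received by the preceding HasNext call.
func (it *chanIterator) Next() (*snapshot, error) {
	if it.last == nil {
		return nil, errors.New("no update available: call HasNext first")
	}
	return it.last, nil
}

func main() {
	updates := make(chan snapshot)
	go func() {
		defer close(updates) // closing the channel ends the iteration
		for id := int64(1); id <= 3; id++ {
			updates <- snapshot{LastEventId: id}
		}
	}()

	it := &chanIterator{updates: updates}
	for it.HasNext() {
		u, err := it.Next()
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println("last event:", u.LastEventId)
	}
}
```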

type WorkflowStateJob added in v0.9.0

type WorkflowStateJob struct {
	// contains filtered or unexported fields
}

WorkflowStateJob implements a WorkerJob to retrieve updates for a WorkflowExecutionState and its child workflows.

func NewWorkflowStateJob added in v0.9.0

func NewWorkflowStateJob(ctx context.Context, client sdkclient.Client, state *WorkflowExecutionState, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, updateChan chan struct{}) (*WorkflowStateJob, error)

NewWorkflowStateJob returns a new WorkflowStateJob. It requires an updateChan to signal when there are updates.

func (*WorkflowStateJob) GetChildJob added in v0.9.0

func (job *WorkflowStateJob) GetChildJob(event *history.HistoryEvent) (*WorkflowStateJob, error)

GetChildJob gets a new child job and appends it to the list of childJobs. These jobs don't start until the parent has caught up.

func (*WorkflowStateJob) Run added in v0.9.0

func (job *WorkflowStateJob) Run(group *pond.TaskGroupWithContext) func() error

Run starts the WorkflowStateJob, which retrieves the workflow's events and spawns new jobs for the child workflows once it's up-to-date. New jobs are submitted to the pool when the job is up-to-date to reduce the amount of unnecessary history fetches (e.g. when the child workflow is already completed).

func (*WorkflowStateJob) ShouldStart added in v0.9.0

func (job *WorkflowStateJob) ShouldStart() bool

ShouldStart will return true if the state is in a status that requires requesting its event history. This will help reduce the amount of event histories requested when they're not needed.
