Documentation
¶
Overview ¶
Package executor provides a framework for executing activities and flows with retry logic and feedback.
Index ¶
- func ExecuteActivity[T, K any](ctx context.Context, e Executor, parentCtx *Context, name string, ...) (K, error)
- func Expected(err error) error
- func IsExpected(err error) bool
- func IsReportedError(err error) bool
- func NewReportedError(err error) error
- func NoOpFeedbackHandler(_ *Feedback)
- type Activity
- type ActivityFactory
- type BubbleTeaTracker
- type Context
- func (c *Context) Child(name string) *Context
- func (c *Context) GetExecutor() Executor
- func (c *Context) IsErrorReported() bool
- func (c *Context) Name() string
- func (c *Context) SendCompleted(message string)
- func (c *Context) SendCompletedWithDetails(message string, details string)
- func (c *Context) SendFailed(err error, message string)
- func (c *Context) SendFailedWithDetails(err error, message string, details string)
- func (c *Context) SendProgress(message string)
- func (c *Context) SendProgressWithDetails(message string, details string)
- func (c *Context) SendRetry(attempt int, err error)
- func (c *Context) SendRetryWithDetails(message string, details string, attempt int, err error)
- func (c *Context) SendRunning(message string)
- func (c *Context) SendRunningWithDetails(message string, details string)
- type Execution
- type Executor
- type ExpectedError
- type Feedback
- type FeedbackHandler
- type Flow
- type Orchestrator
- type OutputSink
- type ProgressTracker
- type ReportedError
- type RetryPolicy
- type Status
- type TraceLogger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExecuteActivity ¶
func ExecuteActivity[T, K any]( ctx context.Context, e Executor, parentCtx *Context, name string, activity Activity[T, K], input T, ) (K, error)
ExecuteActivity is a generic wrapper around the executor's ExecuteActivity method. It provides compile-time type safety for activity inputs and outputs.
func IsExpected ¶
IsExpected reports whether err (or any wrapped error) is an ExpectedError.
func IsReportedError ¶
IsReportedError checks if an error is a ReportedError.
func NewReportedError ¶
NewReportedError wraps an error as a ReportedError.
func NoOpFeedbackHandler ¶
func NoOpFeedbackHandler(_ *Feedback)
NoOpFeedbackHandler is a feedback handler that does nothing.
Types ¶
type ActivityFactory ¶
ActivityFactory creates new instances of activities with specific input and output types.
type BubbleTeaTracker ¶
type BubbleTeaTracker struct {
// contains filtered or unexported fields
}
BubbleTeaTracker provides a styled TUI-like log with a single live status line.
func NewBubbleTeaTracker ¶
func NewBubbleTeaTracker(silent bool) *BubbleTeaTracker
NewBubbleTeaTracker creates a new progress tracker.
func (*BubbleTeaTracker) FeedbackHandler ¶
func (t *BubbleTeaTracker) FeedbackHandler() FeedbackHandler
FeedbackHandler returns a handler for receiving feedback.
func (*BubbleTeaTracker) GetExecution ¶
func (t *BubbleTeaTracker) GetExecution(name string) *Execution
GetExecution returns a copy of an execution (for testing).
func (*BubbleTeaTracker) GetExecutionCount ¶
func (t *BubbleTeaTracker) GetExecutionCount() int
GetExecutionCount returns the number of tracked executions (for testing).
func (*BubbleTeaTracker) Start ¶
func (t *BubbleTeaTracker) Start()
Start launches the live status writer.
type Context ¶
type Context struct {
Executor Executor
// contains filtered or unexported fields
}
Context provides feedback capabilities to activities and access to the executor.
func NewContext ¶
func NewContext(name string, feedbackFunc FeedbackHandler, executor Executor) *Context
NewContext creates a new executor context.
func (*Context) Child ¶
Child returns a new context that reports feedback under the current activity.
func (*Context) GetExecutor ¶
GetExecutor returns the executor associated with the context.
func (*Context) IsErrorReported ¶
IsErrorReported returns true if an error was reported via SendFailed.
func (*Context) SendCompleted ¶
SendCompleted sends a completed status feedback with the given message.
func (*Context) SendCompletedWithDetails ¶
SendCompletedWithDetails sends a completed status feedback with the given message and details.
func (*Context) SendFailed ¶
SendFailed sends a failed status feedback with the given error and message.
func (*Context) SendFailedWithDetails ¶
SendFailedWithDetails sends a failed status feedback with the given error, message, and details.
func (*Context) SendProgress ¶
SendProgress sends a running status feedback intended to be logged as progress.
func (*Context) SendProgressWithDetails ¶
SendProgressWithDetails sends a running status feedback intended to be logged as progress.
func (*Context) SendRetry ¶
SendRetry sends a retry status feedback with the given attempt number and error.
func (*Context) SendRetryWithDetails ¶
SendRetryWithDetails sends a retry status feedback with the given message, details, attempt number, and error.
func (*Context) SendRunning ¶
SendRunning sends a running status feedback with the given message.
func (*Context) SendRunningWithDetails ¶
SendRunningWithDetails sends a running status feedback with the given message and details.
type Execution ¶
type Execution struct {
Name string
Status Status
Message string
Result string
StartTime time.Time
EndTime *time.Time
Error error
Metadata map[string]any
ParentName string
Children []string
Level int
}
Execution represents the progress state of a single execution.
type Executor ¶
type Executor interface {
ExecuteActivity(
ctx context.Context,
parentCtx *Context,
name string,
activity Activity[any, any],
input any,
) (any, error)
ExecuteFlow(ctx context.Context, name string, flow Flow) error
WithRetryPolicy(policy *RetryPolicy) Executor
WithFeedbackHandler(handler FeedbackHandler) Executor
}
Executor defines the interface for executing flows and activities.
func NewExecutor ¶
NewExecutor creates a new activity executor with the given configuration. If concurrency <= 0, no in-flight limit is applied.
type ExpectedError ¶
type ExpectedError struct {
Err error
}
ExpectedError marks an error as non-fatal for executor status reporting and retry.
The executor will not retry ExpectedError, and will not mark the activity as failed. Callers can still decide how to handle the returned error.
This is useful for tool-like activities where "not found" or "no match" should be returned to the caller/model as normal output rather than failing the whole flow.
func (ExpectedError) Error ¶
func (e ExpectedError) Error() string
func (ExpectedError) Unwrap ¶
func (e ExpectedError) Unwrap() error
type Feedback ¶
type Feedback struct {
Timestamp time.Time `json:"timestamp"`
Error error `json:"-"`
ActivityName string `json:"activity_name"`
Status Status `json:"status"`
Message string `json:"message"`
Details string `json:"details,omitempty"`
Progress float64 `json:"progress"`
}
Feedback contains information about activity execution progress.
type FeedbackHandler ¶
type FeedbackHandler func(feedback *Feedback)
FeedbackHandler processes feedback from activities.
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
Orchestrator provides a unified way to execute flows with proper tracker and output handling.
func NewOrchestrator ¶
func NewOrchestrator(outputSink OutputSink, traceLogger TraceLogger, concurrency int) (*Orchestrator, error)
NewOrchestrator creates a new FlowRunner with the given container. traceLogger can be nil if trace logging is not needed. concurrency limits the number of in-flight activities (0 means no limit).
func (*Orchestrator) Execute ¶
func (o *Orchestrator) Execute( ctx context.Context, flowName string, flow Flow, silent bool, ) error
Execute executes a flow with proper tracker initialization and cleanup. If the flow returns a ReportedError, it suppresses the error since it was already reported via the feedback tracker.
type OutputSink ¶
type OutputSink interface {
Flush() error
}
OutputSink defines an interface for flushing output.
type ProgressTracker ¶
type ProgressTracker interface {
Start()
Stop()
FeedbackHandler() FeedbackHandler
GetExecution(name string) *Execution
GetExecutionCount() int
}
ProgressTracker is the interface for tracking execution progress.
func NewProgressTracker ¶
func NewProgressTracker(silent bool) ProgressTracker
NewProgressTracker creates a new Bubbletea-based progress tracker.
type ReportedError ¶
type ReportedError struct {
// contains filtered or unexported fields
}
ReportedError wraps an error that has already been reported via the feedback tracker. When this error is returned, callers should not report it again.
func (*ReportedError) Error ¶
func (e *ReportedError) Error() string
Error implements the error interface.
func (*ReportedError) Unwrap ¶
func (e *ReportedError) Unwrap() error
Unwrap returns the wrapped error.
type RetryPolicy ¶
type RetryPolicy struct {
// MaxAttempts is the maximum number of retry attempts.
MaxAttempts int
// InitialDelay is the initial time to wait between attempts.
InitialDelay time.Duration
// MaxDelay is the maximum time to wait between attempts.
MaxDelay time.Duration
// Multiplier is the factor by which the delay is multiplied after each attempt.
Multiplier float64
}
RetryPolicy defines the retry behavior for an operation.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy.
func NoRetryPolicy ¶
func NoRetryPolicy() RetryPolicy
NoRetryPolicy returns a policy that doesn't retry.
type Status ¶
type Status string
Status represents the current status of an activity.
const ( // StatusRunning indicates that the activity is running. StatusRunning Status = "running" // StatusProgress indicates that the activity has a progress update. StatusProgress Status = "progress" // StatusCompleted indicates that the activity has completed. StatusCompleted Status = "completed" // StatusFailed indicates that the activity has failed. StatusFailed Status = "failed" )
type TraceLogger ¶
type TraceLogger interface {
FeedbackHandler(inner FeedbackHandler) FeedbackHandler
}
TraceLogger defines the interface for trace logging feedback.