executor

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package executor provides a framework for executing activities and flows with retry logic and feedback.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecuteActivity

func ExecuteActivity[T, K any](
	ctx context.Context,
	e Executor,
	parentCtx *Context,
	name string,
	activity Activity[T, K],
	input T,
) (K, error)

ExecuteActivity is a generic wrapper around the executor's ExecuteActivity method. It provides compile-time type safety for activity inputs and outputs.

func Expected

func Expected(err error) error

Expected wraps err in an ExpectedError. If err is nil, Expected returns nil.

func IsExpected

func IsExpected(err error) bool

IsExpected reports whether err (or any wrapped error) is an ExpectedError.

func IsReportedError

func IsReportedError(err error) bool

IsReportedError checks if an error is a ReportedError.

func NewReportedError

func NewReportedError(err error) error

NewReportedError wraps an error as a ReportedError.

func NoOpFeedbackHandler

func NoOpFeedbackHandler(_ *Feedback)

NoOpFeedbackHandler is a feedback handler that does nothing.

Types

type Activity

type Activity[T any, K any] func(ctx context.Context, activityCtx *Context, input T) (K, error)

Activity defines a function that can be executed by the executor.

type ActivityFactory

type ActivityFactory[T any, K any] interface {
	NewActivity() Activity[T, K]
}

ActivityFactory creates new instances of activities with specific input and output types.

type BubbleTeaTracker

type BubbleTeaTracker struct {
	// contains filtered or unexported fields
}

BubbleTeaTracker provides a styled TUI-like log with a single live status line.

func NewBubbleTeaTracker

func NewBubbleTeaTracker(silent bool) *BubbleTeaTracker

NewBubbleTeaTracker creates a new progress tracker.

func (*BubbleTeaTracker) FeedbackHandler

func (t *BubbleTeaTracker) FeedbackHandler() FeedbackHandler

FeedbackHandler returns a handler for receiving feedback.

func (*BubbleTeaTracker) GetExecution

func (t *BubbleTeaTracker) GetExecution(name string) *Execution

GetExecution returns a copy of an execution (for testing).

func (*BubbleTeaTracker) GetExecutionCount

func (t *BubbleTeaTracker) GetExecutionCount() int

GetExecutionCount returns the number of tracked executions (for testing).

func (*BubbleTeaTracker) Start

func (t *BubbleTeaTracker) Start()

Start launches the live status writer.

func (*BubbleTeaTracker) Stop

func (t *BubbleTeaTracker) Stop()

Stop stops the tracker.

type Context

type Context struct {
	Executor Executor
	// contains filtered or unexported fields
}

Context provides feedback capabilities to activities and access to the executor.

func NewContext

func NewContext(name string, feedbackFunc FeedbackHandler, executor Executor) *Context

NewContext creates a new executor context.

func (*Context) Child

func (c *Context) Child(name string) *Context

Child returns a new context that reports feedback under the current activity.

func (*Context) GetExecutor

func (c *Context) GetExecutor() Executor

GetExecutor returns the executor associated with the context.

func (*Context) IsErrorReported

func (c *Context) IsErrorReported() bool

IsErrorReported returns true if an error was reported via SendFailed.

func (*Context) Name

func (c *Context) Name() string

Name returns the name of the activity.

func (*Context) SendCompleted

func (c *Context) SendCompleted(message string)

SendCompleted sends a completed status feedback with the given message.

func (*Context) SendCompletedWithDetails

func (c *Context) SendCompletedWithDetails(message string, details string)

SendCompletedWithDetails sends a completed status feedback with the given message and details.

func (*Context) SendFailed

func (c *Context) SendFailed(err error, message string)

SendFailed sends a failed status feedback with the given error and message.

func (*Context) SendFailedWithDetails

func (c *Context) SendFailedWithDetails(err error, message string, details string)

SendFailedWithDetails sends a failed status feedback with the given error, message, and details.

func (*Context) SendProgress

func (c *Context) SendProgress(message string)

SendProgress sends a running status feedback intended to be logged as progress.

func (*Context) SendProgressWithDetails

func (c *Context) SendProgressWithDetails(message string, details string)

SendProgressWithDetails sends a running status feedback intended to be logged as progress.

func (*Context) SendRetry

func (c *Context) SendRetry(attempt int, err error)

SendRetry sends a retry status feedback with the given attempt number and error.

func (*Context) SendRetryWithDetails

func (c *Context) SendRetryWithDetails(message string, details string, attempt int, err error)

SendRetryWithDetails sends a retry status feedback with the given message, details, attempt number, and error.

func (*Context) SendRunning

func (c *Context) SendRunning(message string)

SendRunning sends a running status feedback with the given message.

func (*Context) SendRunningWithDetails

func (c *Context) SendRunningWithDetails(message string, details string)

SendRunningWithDetails sends a running status feedback with the given message and details.

type Execution

type Execution struct {
	Name       string
	Status     Status
	Message    string
	Result     string
	StartTime  time.Time
	EndTime    *time.Time
	Error      error
	Metadata   map[string]any
	ParentName string
	Children   []string
	Level      int
}

Execution represents the progress state of a single execution.

type Executor

type Executor interface {
	ExecuteActivity(
		ctx context.Context,
		parentCtx *Context,
		name string,
		activity Activity[any, any],
		input any,
	) (any, error)
	ExecuteFlow(ctx context.Context, name string, flow Flow) error
	WithRetryPolicy(policy *RetryPolicy) Executor
	WithFeedbackHandler(handler FeedbackHandler) Executor
}

Executor defines the interface for executing flows and activities.

func NewExecutor

func NewExecutor(concurrency int) Executor

NewExecutor creates a new activity executor with the given configuration. If concurrency <= 0, no in-flight limit is applied.

type ExpectedError

type ExpectedError struct {
	Err error
}

ExpectedError marks an error as non-fatal for executor status reporting and retry.

The executor will not retry ExpectedError, and will not mark the activity as failed. Callers can still decide how to handle the returned error.

This is useful for tool-like activities where "not found" or "no match" should be returned to the caller/model as normal output rather than failing the whole flow.

func (ExpectedError) Error

func (e ExpectedError) Error() string

func (ExpectedError) Unwrap

func (e ExpectedError) Unwrap() error

type Feedback

type Feedback struct {
	Timestamp    time.Time `json:"timestamp"`
	Error        error     `json:"-"`
	ActivityName string    `json:"activity_name"`
	Status       Status    `json:"status"`
	Message      string    `json:"message"`
	Details      string    `json:"details,omitempty"`
	Progress     float64   `json:"progress"`
}

Feedback contains information about activity execution progress.

func (*Feedback) String

func (f *Feedback) String() string

String returns a string representation of the feedback.

type FeedbackHandler

type FeedbackHandler func(feedback *Feedback)

FeedbackHandler processes feedback from activities.

type Flow

type Flow func(ctx context.Context, flowCtx *Context) error

Flow defines a function that can be executed by the executor.

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator provides a unified way to execute flows with proper tracker and output handling.

func NewOrchestrator

func NewOrchestrator(outputSink OutputSink, traceLogger TraceLogger, concurrency int) (*Orchestrator, error)

NewOrchestrator creates a new FlowRunner with the given container. traceLogger can be nil if trace logging is not needed. concurrency limits the number of in-flight activities (0 means no limit).

func (*Orchestrator) Execute

func (o *Orchestrator) Execute(
	ctx context.Context,
	flowName string,
	flow Flow,
	silent bool,
) error

Execute executes a flow with proper tracker initialization and cleanup. If the flow returns a ReportedError, it suppresses the error since it was already reported via the feedback tracker.

type OutputSink

type OutputSink interface {
	Flush() error
}

OutputSink defines an interface for flushing output.

type ProgressTracker

type ProgressTracker interface {
	Start()
	Stop()
	FeedbackHandler() FeedbackHandler
	GetExecution(name string) *Execution
	GetExecutionCount() int
}

ProgressTracker is the interface for tracking execution progress.

func NewProgressTracker

func NewProgressTracker(silent bool) ProgressTracker

NewProgressTracker creates a new Bubbletea-based progress tracker.

type ReportedError

type ReportedError struct {
	// contains filtered or unexported fields
}

ReportedError wraps an error that has already been reported via the feedback tracker. When this error is returned, callers should not report it again.

func (*ReportedError) Error

func (e *ReportedError) Error() string

Error implements the error interface.

func (*ReportedError) Unwrap

func (e *ReportedError) Unwrap() error

Unwrap returns the wrapped error.

type RetryPolicy

type RetryPolicy struct {
	// MaxAttempts is the maximum number of retry attempts.
	MaxAttempts int

	// InitialDelay is the initial time to wait between attempts.
	InitialDelay time.Duration

	// MaxDelay is the maximum time to wait between attempts.
	MaxDelay time.Duration

	// Multiplier is the factor by which the delay is multiplied after each attempt.
	Multiplier float64
}

RetryPolicy defines the retry behavior for an operation.

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a sensible default retry policy.

func NoRetryPolicy

func NoRetryPolicy() RetryPolicy

NoRetryPolicy returns a policy that doesn't retry.

type Status

type Status string

Status represents the current status of an activity.

const (
	// StatusRunning indicates that the activity is running.
	StatusRunning Status = "running"
	// StatusProgress indicates that the activity has a progress update.
	StatusProgress Status = "progress"
	// StatusCompleted indicates that the activity has completed.
	StatusCompleted Status = "completed"
	// StatusFailed indicates that the activity has failed.
	StatusFailed Status = "failed"
)

type TraceLogger

type TraceLogger interface {
	FeedbackHandler(inner FeedbackHandler) FeedbackHandler
}

TraceLogger defines the interface for trace logging feedback.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL