executor

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor is the core execution engine. It pulls work messages from NATS, runs the Pulumi executor, and publishes events back.

func New

func New(rootCtx context.Context, config ExecutorConfig) *Executor

func (*Executor) Run

func (e *Executor) Run(ctx context.Context, op *store.Operation) error

Run launches a MulVAL analysis as a background goroutine and returns immediately once the operation row is created. All results flow through PostgreSQL (via the store layer) and NATS.

op must have been created by the caller via store.CreateAnalysis — the executor does not insert the initial row itself, allowing the API handler to return the LRO synchronously before the goroutine starts.

The returned error covers only the synchronous validation phase. Subprocess failures are reported asynchronously via store.MarkFailed + NATS publish.

func (*Executor) Wait

func (e *Executor) Wait(ctx context.Context)

Wait blocks until all in-flight operations have reached a terminal state and their goroutines have returned. Intended to be called after the root context is cancelled to ensure a clean shutdown before the process exits.

type ExecutorConfig

type ExecutorConfig struct {
	Logger       *zap.Logger
	Metrics      *monitoring.Metrics
	Tracer       trace.Tracer
	PgsqlManager *pgsql.Manager
	NatsManager  *nats.Manager
}

type RunContext

type RunContext struct {
	// Abort is cancelled when the operation should stop.
	Abort context.Context

	// Store is used for all DB writes. Never cancelled.
	Store context.Context
	// contains filtered or unexported fields
}

RunContext composes all the contexts a runner goroutine needs.

Sources of cancellation:

  • ctx: the RPC request context, used only for initial setup. Once the work has begun, can only be stopped by other sources.
  • root: the application lifetime context (SIGTERM/SIGINT). When cancelled, the operation can be retried by another executor.
  • an explicit CancelOperation RPC. Abort fires with reason=API.

func NewRunContext

func NewRunContext(config RunContextConfig) *RunContext

func (*RunContext) Finish

func (rc *RunContext) Finish()

Finish cancels Abort from the operation side - call this with defer at the top of every runner goroutine so the LISTEN goroutine exits cleanly when the operation completes normally.

func (*RunContext) Reason

func (rc *RunContext) Reason() string

Reason returns the cancellation reason. Only meaningful after Abort.Done() has fired.

type RunContextConfig

type RunContextConfig struct {
	RootCtx, ReqCtx context.Context
	OpName          string
	NatsManager     *nats.Manager
	Logger          *zap.Logger
	Metrics         *monitoring.Metrics
	Tracer          trace.Tracer
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL