controlapi

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2026 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrWorkerPodNotFound = errors.New("worker pod not found")

Functions

func AteletInformer

AteletInformer creates a SharedInformerFactory and SharedIndexInformer for Atelet pods.

func RunWorkflow

func RunWorkflow[Params any, Context any](ctx context.Context, params Params, wCtx Context, steps []WorkflowStep[Params, Context]) error

RunWorkflow is a synchronous executor that iterates through a sequence of generic steps. It implements the Client-Driven Forward Recovery pattern.

func StatusErrorInterceptor

func StatusErrorInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)

StatusErrorInterceptor searches the error chain for a gRPC status error. If found, it extracts the code and message to send to the client, ignoring any outer wrapping, while logging the full error chain internally.

func WorkerPodInformer

WorkerPodInformer creates a SharedInformerFactory and SharedIndexInformer for Worker pods.

Types

type ActorWorkflow

type ActorWorkflow struct {
	// contains filtered or unexported fields
}

ActorWorkflow handles the workflows for an actor's resume and suspend operations.

func NewActorWorkflow

func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister) *ActorWorkflow

NewActorWorkflow creates a new ActorWorkflow.

func (*ActorWorkflow) ResumeActor

func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) (*ateapipb.Actor, error)

ResumeActor executes the workflow to resume a suspended actor. Idempotent.

func (*ActorWorkflow) SuspendActor

func (w *ActorWorkflow) SuspendActor(ctx context.Context, id string) (*ateapipb.Actor, error)

SuspendActor executes the workflow to suspend a running actor. Idempotent.

type AssignWorkerStep

type AssignWorkerStep struct {
	// contains filtered or unexported fields
}

func (*AssignWorkerStep) Execute

func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error

func (*AssignWorkerStep) IsComplete

func (s *AssignWorkerStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)

func (*AssignWorkerStep) Name

func (s *AssignWorkerStep) Name() string

func (*AssignWorkerStep) RetryBackoff

func (s *AssignWorkerStep) RetryBackoff() *wait.Backoff

type AteletDialer

type AteletDialer struct {
	// contains filtered or unexported fields
}

AteletDialer handles gRPC connections to Atelet pods.

func NewAteletDialer

func NewAteletDialer(workerIndexer cache.Indexer, ateletIndexer cache.Indexer) *AteletDialer

NewAteletDialer creates a new AteletDialer.

func (*AteletDialer) DialForWorker

func (d *AteletDialer) DialForWorker(workerPodNamespace, workerPodName string) (*grpc.ClientConn, error)

DialForWorker returns a gRPC connection to the Atelet running on the same node as the specified worker pod. Returns ErrWorkerPodNotFound if the worker pod is not found in the informer cache.

type CallAteletRestoreStep

type CallAteletRestoreStep struct {
	// contains filtered or unexported fields
}

func (*CallAteletRestoreStep) Execute

func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error

func (*CallAteletRestoreStep) IsComplete

func (s *CallAteletRestoreStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)

func (*CallAteletRestoreStep) Name

func (s *CallAteletRestoreStep) Name() string

func (*CallAteletRestoreStep) RetryBackoff

func (s *CallAteletRestoreStep) RetryBackoff() *wait.Backoff

type CallAteletSuspendStep

type CallAteletSuspendStep struct {
	// contains filtered or unexported fields
}

func (*CallAteletSuspendStep) Execute

func (s *CallAteletSuspendStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error

func (*CallAteletSuspendStep) IsComplete

func (s *CallAteletSuspendStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)

func (*CallAteletSuspendStep) Name

func (s *CallAteletSuspendStep) Name() string

func (*CallAteletSuspendStep) RetryBackoff

func (s *CallAteletSuspendStep) RetryBackoff() *wait.Backoff

type FinalizeRunningStep

type FinalizeRunningStep struct {
	// contains filtered or unexported fields
}

func (*FinalizeRunningStep) Execute

func (s *FinalizeRunningStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error

func (*FinalizeRunningStep) IsComplete

func (s *FinalizeRunningStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)

func (*FinalizeRunningStep) Name

func (s *FinalizeRunningStep) Name() string

func (*FinalizeRunningStep) RetryBackoff

func (s *FinalizeRunningStep) RetryBackoff() *wait.Backoff

type FinalizeSuspendedStep

type FinalizeSuspendedStep struct {
	// contains filtered or unexported fields
}

func (*FinalizeSuspendedStep) Execute

func (s *FinalizeSuspendedStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error

func (*FinalizeSuspendedStep) IsComplete

func (s *FinalizeSuspendedStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)

func (*FinalizeSuspendedStep) Name

func (s *FinalizeSuspendedStep) Name() string

func (*FinalizeSuspendedStep) RetryBackoff

func (s *FinalizeSuspendedStep) RetryBackoff() *wait.Backoff

type LoadActorForResumeStep

type LoadActorForResumeStep struct {
	// contains filtered or unexported fields
}

func (*LoadActorForResumeStep) Execute

func (s *LoadActorForResumeStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error

func (*LoadActorForResumeStep) IsComplete

func (s *LoadActorForResumeStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)

func (*LoadActorForResumeStep) Name

func (s *LoadActorForResumeStep) Name() string

func (*LoadActorForResumeStep) RetryBackoff

func (s *LoadActorForResumeStep) RetryBackoff() *wait.Backoff

type LoadActorForSuspendStep

type LoadActorForSuspendStep struct {
	// contains filtered or unexported fields
}

func (*LoadActorForSuspendStep) Execute

func (s *LoadActorForSuspendStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error

func (*LoadActorForSuspendStep) IsComplete

func (s *LoadActorForSuspendStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)

func (*LoadActorForSuspendStep) Name

func (s *LoadActorForSuspendStep) Name() string

func (*LoadActorForSuspendStep) RetryBackoff

func (s *LoadActorForSuspendStep) RetryBackoff() *wait.Backoff

type MarkSuspendingStep

type MarkSuspendingStep struct {
	// contains filtered or unexported fields
}

func (*MarkSuspendingStep) Execute

func (s *MarkSuspendingStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error

func (*MarkSuspendingStep) IsComplete

func (s *MarkSuspendingStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)

func (*MarkSuspendingStep) Name

func (s *MarkSuspendingStep) Name() string

func (*MarkSuspendingStep) RetryBackoff

func (s *MarkSuspendingStep) RetryBackoff() *wait.Backoff

type ResumeInput

type ResumeInput struct {
	ActorID string
	Boot    bool
}

ResumeInput holds the immutable parameters requested by the client.

type ResumeState

type ResumeState struct {
	Actor         *ateapipb.Actor
	ActorTemplate *atev1alpha1.ActorTemplate
}

ResumeState holds the mutable state loaded and modified during execution.

type Service

type Service struct {
	ateapipb.UnimplementedControlServer
	// contains filtered or unexported fields
}

Service implements the ateapipb.Control gRPC service.

func NewService

func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer) *Service

NewService creates a service.

func (*Service) CreateActor

func (*Service) DebugClear

func (*Service) DeleteActor

func (*Service) GetActor

func (*Service) ListActors

func (*Service) ListWorkers

func (*Service) ResumeActor

func (*Service) SuspendActor

type SuspendInput

type SuspendInput struct {
	ActorID string
}

SuspendInput holds the immutable parameters requested by the client.

type SuspendState

type SuspendState struct {
	Actor         *ateapipb.Actor
	ActorTemplate *atev1alpha1.ActorTemplate
}

SuspendState holds the mutable state loaded and modified during execution.

type WorkerPoolSyncer

type WorkerPoolSyncer struct {
	// contains filtered or unexported fields
}

WorkerPoolSyncer reconciles the state of worker pods from the Kubernetes informer into the store.

func NewWorkerPoolSyncer

func NewWorkerPoolSyncer(persistence store.Interface, workerInformer cache.SharedIndexInformer) *WorkerPoolSyncer

NewWorkerPoolSyncer creates a new WorkerPoolSyncer.

func (*WorkerPoolSyncer) Start

func (s *WorkerPoolSyncer) Start(ctx context.Context)

Start starts the background reconciliation loop.

type WorkflowStep

type WorkflowStep[Params any, Context any] interface {
	// Name returns the identifier for this step (useful for logging and debugging).
	Name() string

	// IsComplete checks if this step's work has already been completed.
	// If it returns true, the engine skips Execute() and fast-forwards to the next step.
	IsComplete(ctx context.Context, params Params, wCtx Context) (bool, error)

	// Execute performs the step's business logic and persists any state changes.
	// If an error is returned, the workflow stops and relies on the client to retry.
	Execute(ctx context.Context, params Params, wCtx Context) error

	// RetryBackoff returns an optional backoff configuration for this step.
	// If non-nil, the workflow orchestrator automatically retries Execute() on persistence conflicts.
	RetryBackoff() *wait.Backoff
}

WorkflowStep represents a single, idempotent operation in a workflow graph. Params holds the immutable parameters used to start the workflow. Context is the mutable context fetched or modified during execution.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL