Documentation
¶
Index ¶
- Variables
- func AteletInformer(kc kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer)
- func RunWorkflow[Params any, Context any](ctx context.Context, params Params, wCtx Context, ...) error
- func StatusErrorInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, ...) (interface{}, error)
- func WorkerPodInformer(kc kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer)
- type ActorWorkflow
- type AssignWorkerStep
- func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error
- func (s *AssignWorkerStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)
- func (s *AssignWorkerStep) Name() string
- func (s *AssignWorkerStep) RetryBackoff() *wait.Backoff
- type AteletDialer
- type CallAteletRestoreStep
- func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error
- func (s *CallAteletRestoreStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)
- func (s *CallAteletRestoreStep) Name() string
- func (s *CallAteletRestoreStep) RetryBackoff() *wait.Backoff
- type CallAteletSuspendStep
- func (s *CallAteletSuspendStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error
- func (s *CallAteletSuspendStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)
- func (s *CallAteletSuspendStep) Name() string
- func (s *CallAteletSuspendStep) RetryBackoff() *wait.Backoff
- type FinalizeRunningStep
- func (s *FinalizeRunningStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error
- func (s *FinalizeRunningStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)
- func (s *FinalizeRunningStep) Name() string
- func (s *FinalizeRunningStep) RetryBackoff() *wait.Backoff
- type FinalizeSuspendedStep
- func (s *FinalizeSuspendedStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error
- func (s *FinalizeSuspendedStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)
- func (s *FinalizeSuspendedStep) Name() string
- func (s *FinalizeSuspendedStep) RetryBackoff() *wait.Backoff
- type LoadActorForResumeStep
- func (s *LoadActorForResumeStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error
- func (s *LoadActorForResumeStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)
- func (s *LoadActorForResumeStep) Name() string
- func (s *LoadActorForResumeStep) RetryBackoff() *wait.Backoff
- type LoadActorForSuspendStep
- func (s *LoadActorForSuspendStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error
- func (s *LoadActorForSuspendStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)
- func (s *LoadActorForSuspendStep) Name() string
- func (s *LoadActorForSuspendStep) RetryBackoff() *wait.Backoff
- type MarkSuspendingStep
- func (s *MarkSuspendingStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error
- func (s *MarkSuspendingStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)
- func (s *MarkSuspendingStep) Name() string
- func (s *MarkSuspendingStep) RetryBackoff() *wait.Backoff
- type ResumeInput
- type ResumeState
- type Service
- func (s *Service) CreateActor(ctx context.Context, req *ateapipb.CreateActorRequest) (*ateapipb.CreateActorResponse, error)
- func (s *Service) DebugClear(ctx context.Context, req *ateapipb.DebugClearRequest) (*ateapipb.DebugClearResponse, error)
- func (s *Service) DeleteActor(ctx context.Context, req *ateapipb.DeleteActorRequest) (*ateapipb.DeleteActorResponse, error)
- func (s *Service) GetActor(ctx context.Context, req *ateapipb.GetActorRequest) (*ateapipb.GetActorResponse, error)
- func (s *Service) ListActors(ctx context.Context, req *ateapipb.ListActorsRequest) (*ateapipb.ListActorsResponse, error)
- func (s *Service) ListWorkers(ctx context.Context, req *ateapipb.ListWorkersRequest) (*ateapipb.ListWorkersResponse, error)
- func (s *Service) ResumeActor(ctx context.Context, req *ateapipb.ResumeActorRequest) (*ateapipb.ResumeActorResponse, error)
- func (s *Service) SuspendActor(ctx context.Context, req *ateapipb.SuspendActorRequest) (*ateapipb.SuspendActorResponse, error)
- type SuspendInput
- type SuspendState
- type WorkerPoolSyncer
- type WorkflowStep
Constants ¶
This section is empty.
Variables ¶
var ErrWorkerPodNotFound = errors.New("worker pod not found")
Functions ¶
func AteletInformer ¶
func AteletInformer(kc kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer)
AteletInformer creates a SharedInformerFactory and SharedIndexInformer for Atelet pods.
func RunWorkflow ¶
func RunWorkflow[Params any, Context any](ctx context.Context, params Params, wCtx Context, steps []WorkflowStep[Params, Context]) error
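RunWorkflow is a synchronous executor that iterates through a sequence of generic steps. It implements the Client-Driven Forward Recovery pattern: a step whose IsComplete returns true is skipped, and if a step's Execute returns an error the workflow stops and relies on the client to retry.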
func StatusErrorInterceptor ¶
func StatusErrorInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)
StatusErrorInterceptor searches the error chain for a gRPC status error. If found, it extracts the code and message to send to the client, ignoring any outer wrapping, while logging the full error chain internally.
func WorkerPodInformer ¶
func WorkerPodInformer(kc kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer)
WorkerPodInformer creates a SharedInformerFactory and SharedIndexInformer for Worker pods.
Types ¶
type ActorWorkflow ¶
type ActorWorkflow struct {
// contains filtered or unexported fields
}
ActorWorkflow runs the workflows that resume and suspend actors.
func NewActorWorkflow ¶
func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister) *ActorWorkflow
NewActorWorkflow creates a new ActorWorkflow.
func (*ActorWorkflow) ResumeActor ¶
func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) (*ateapipb.Actor, error)
ResumeActor executes the workflow to resume a suspended actor. Idempotent.
func (*ActorWorkflow) SuspendActor ¶
SuspendActor executes the workflow to suspend a running actor. Idempotent.
type AssignWorkerStep ¶
type AssignWorkerStep struct {
// contains filtered or unexported fields
}
func (*AssignWorkerStep) Execute ¶
func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error
func (*AssignWorkerStep) IsComplete ¶
func (s *AssignWorkerStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)
func (*AssignWorkerStep) Name ¶
func (s *AssignWorkerStep) Name() string
func (*AssignWorkerStep) RetryBackoff ¶
func (s *AssignWorkerStep) RetryBackoff() *wait.Backoff
type AteletDialer ¶
type AteletDialer struct {
// contains filtered or unexported fields
}
AteletDialer handles gRPC connections to Atelet pods.
func NewAteletDialer ¶
func NewAteletDialer(workerIndexer cache.Indexer, ateletIndexer cache.Indexer) *AteletDialer
NewAteletDialer creates a new AteletDialer.
func (*AteletDialer) DialForWorker ¶
func (d *AteletDialer) DialForWorker(workerPodNamespace, workerPodName string) (*grpc.ClientConn, error)
DialForWorker returns a gRPC connection to the Atelet running on the same node as the specified worker pod. Returns ErrWorkerPodNotFound if the worker pod is not found in the informer cache.
type CallAteletRestoreStep ¶
type CallAteletRestoreStep struct {
// contains filtered or unexported fields
}
func (*CallAteletRestoreStep) Execute ¶
func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error
func (*CallAteletRestoreStep) IsComplete ¶
func (s *CallAteletRestoreStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)
func (*CallAteletRestoreStep) Name ¶
func (s *CallAteletRestoreStep) Name() string
func (*CallAteletRestoreStep) RetryBackoff ¶
func (s *CallAteletRestoreStep) RetryBackoff() *wait.Backoff
type CallAteletSuspendStep ¶
type CallAteletSuspendStep struct {
// contains filtered or unexported fields
}
func (*CallAteletSuspendStep) Execute ¶
func (s *CallAteletSuspendStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error
func (*CallAteletSuspendStep) IsComplete ¶
func (s *CallAteletSuspendStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)
func (*CallAteletSuspendStep) Name ¶
func (s *CallAteletSuspendStep) Name() string
func (*CallAteletSuspendStep) RetryBackoff ¶
func (s *CallAteletSuspendStep) RetryBackoff() *wait.Backoff
type FinalizeRunningStep ¶
type FinalizeRunningStep struct {
// contains filtered or unexported fields
}
func (*FinalizeRunningStep) Execute ¶
func (s *FinalizeRunningStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error
func (*FinalizeRunningStep) IsComplete ¶
func (s *FinalizeRunningStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)
func (*FinalizeRunningStep) Name ¶
func (s *FinalizeRunningStep) Name() string
func (*FinalizeRunningStep) RetryBackoff ¶
func (s *FinalizeRunningStep) RetryBackoff() *wait.Backoff
type FinalizeSuspendedStep ¶
type FinalizeSuspendedStep struct {
// contains filtered or unexported fields
}
func (*FinalizeSuspendedStep) Execute ¶
func (s *FinalizeSuspendedStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error
func (*FinalizeSuspendedStep) IsComplete ¶
func (s *FinalizeSuspendedStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)
func (*FinalizeSuspendedStep) Name ¶
func (s *FinalizeSuspendedStep) Name() string
func (*FinalizeSuspendedStep) RetryBackoff ¶
func (s *FinalizeSuspendedStep) RetryBackoff() *wait.Backoff
type LoadActorForResumeStep ¶
type LoadActorForResumeStep struct {
// contains filtered or unexported fields
}
func (*LoadActorForResumeStep) Execute ¶
func (s *LoadActorForResumeStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error
func (*LoadActorForResumeStep) IsComplete ¶
func (s *LoadActorForResumeStep) IsComplete(ctx context.Context, input *ResumeInput, state *ResumeState) (bool, error)
func (*LoadActorForResumeStep) Name ¶
func (s *LoadActorForResumeStep) Name() string
func (*LoadActorForResumeStep) RetryBackoff ¶
func (s *LoadActorForResumeStep) RetryBackoff() *wait.Backoff
type LoadActorForSuspendStep ¶
type LoadActorForSuspendStep struct {
// contains filtered or unexported fields
}
func (*LoadActorForSuspendStep) Execute ¶
func (s *LoadActorForSuspendStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error
func (*LoadActorForSuspendStep) IsComplete ¶
func (s *LoadActorForSuspendStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)
func (*LoadActorForSuspendStep) Name ¶
func (s *LoadActorForSuspendStep) Name() string
func (*LoadActorForSuspendStep) RetryBackoff ¶
func (s *LoadActorForSuspendStep) RetryBackoff() *wait.Backoff
type MarkSuspendingStep ¶
type MarkSuspendingStep struct {
// contains filtered or unexported fields
}
func (*MarkSuspendingStep) Execute ¶
func (s *MarkSuspendingStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error
func (*MarkSuspendingStep) IsComplete ¶
func (s *MarkSuspendingStep) IsComplete(ctx context.Context, input *SuspendInput, state *SuspendState) (bool, error)
func (*MarkSuspendingStep) Name ¶
func (s *MarkSuspendingStep) Name() string
func (*MarkSuspendingStep) RetryBackoff ¶
func (s *MarkSuspendingStep) RetryBackoff() *wait.Backoff
type ResumeInput ¶
ResumeInput holds the immutable parameters requested by the client.
type ResumeState ¶
type ResumeState struct {
Actor *ateapipb.Actor
ActorTemplate *atev1alpha1.ActorTemplate
}
ResumeState holds the mutable state loaded and modified during execution.
type Service ¶
type Service struct {
ateapipb.UnimplementedControlServer
// contains filtered or unexported fields
}
Service implements the ateapipb.Control service.
func NewService ¶
func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer) *Service
NewService creates a Service that uses the given store, ActorTemplate lister, and Atelet dialer.
func (*Service) CreateActor ¶
func (s *Service) CreateActor(ctx context.Context, req *ateapipb.CreateActorRequest) (*ateapipb.CreateActorResponse, error)
func (*Service) DebugClear ¶
func (s *Service) DebugClear(ctx context.Context, req *ateapipb.DebugClearRequest) (*ateapipb.DebugClearResponse, error)
func (*Service) DeleteActor ¶
func (s *Service) DeleteActor(ctx context.Context, req *ateapipb.DeleteActorRequest) (*ateapipb.DeleteActorResponse, error)
func (*Service) GetActor ¶
func (s *Service) GetActor(ctx context.Context, req *ateapipb.GetActorRequest) (*ateapipb.GetActorResponse, error)
func (*Service) ListActors ¶
func (s *Service) ListActors(ctx context.Context, req *ateapipb.ListActorsRequest) (*ateapipb.ListActorsResponse, error)
func (*Service) ListWorkers ¶
func (s *Service) ListWorkers(ctx context.Context, req *ateapipb.ListWorkersRequest) (*ateapipb.ListWorkersResponse, error)
func (*Service) ResumeActor ¶
func (s *Service) ResumeActor(ctx context.Context, req *ateapipb.ResumeActorRequest) (*ateapipb.ResumeActorResponse, error)
func (*Service) SuspendActor ¶
func (s *Service) SuspendActor(ctx context.Context, req *ateapipb.SuspendActorRequest) (*ateapipb.SuspendActorResponse, error)
type SuspendInput ¶
type SuspendInput struct {
ActorID string
}
SuspendInput holds the immutable parameters requested by the client.
type SuspendState ¶
type SuspendState struct {
Actor *ateapipb.Actor
ActorTemplate *atev1alpha1.ActorTemplate
}
SuspendState holds the mutable state loaded and modified during execution.
type WorkerPoolSyncer ¶
type WorkerPoolSyncer struct {
// contains filtered or unexported fields
}
WorkerPoolSyncer reconciles the state of worker pods from a Kubernetes informer into the store.
func NewWorkerPoolSyncer ¶
func NewWorkerPoolSyncer(persistence store.Interface, workerInformer cache.SharedIndexInformer) *WorkerPoolSyncer
NewWorkerPoolSyncer creates a new WorkerPoolSyncer.
func (*WorkerPoolSyncer) Start ¶
func (s *WorkerPoolSyncer) Start(ctx context.Context)
Start starts the background reconciliation loop.
type WorkflowStep ¶
type WorkflowStep[Params any, Context any] interface {
	// Name returns the identifier for this step (useful for logging and debugging).
	Name() string

	// IsComplete checks if this step's work has already been completed.
	// If it returns true, the engine skips Execute() and fast-forwards to the next step.
	IsComplete(ctx context.Context, params Params, wCtx Context) (bool, error)

	// Execute performs the step's business logic and persists any state changes.
	// If an error is returned, the workflow stops and relies on the client to retry.
	Execute(ctx context.Context, params Params, wCtx Context) error

	// RetryBackoff returns an optional backoff configuration for this step.
	// If non-nil, the workflow orchestrator automatically retries Execute() on persistence conflicts.
	RetryBackoff() *wait.Backoff
}
WorkflowStep represents a single, idempotent operation in a workflow graph. Params holds the immutable parameters used to start the workflow. Context holds the mutable state fetched or modified during execution.