Documentation
¶
Index ¶
- Constants
- Variables
- func CreateDefaultISBService(jetstreamVersion string, phase numaflowv1.ISBSvcPhase, fullyReconciled bool) *numaflowv1.InterStepBufferService
- func CreateDefaultISBServiceSpec(jetstreamVersion string) numaflowv1.InterStepBufferServiceSpec
- func CreateDefaultTestMVOfPhase(phase numaflowv1.MonoVertexPhase) *numaflowv1.MonoVertex
- func CreateDefaultTestPipelineOfPhase(phase numaflowv1.PipelinePhase) *numaflowv1.Pipeline
- func CreateDeploymentInK8S(ctx context.Context, t *testing.T, k8sClientSet *k8sclientgo.Clientset, ...)
- func CreateISBServiceRollout(isbsvcSpec numaflowv1.InterStepBufferServiceSpec, ...) *apiv1.ISBServiceRollout
- func CreateISBServiceRolloutInK8S(ctx context.Context, t *testing.T, numaplaneClient client.Client, ...)
- func CreateISBSvcInK8S(ctx context.Context, t *testing.T, ...)
- func CreateMVRolloutInK8S(ctx context.Context, t *testing.T, numaplaneClient client.Client, ...)
- func CreateMonoVertexInK8S(ctx context.Context, t *testing.T, ...)
- func CreateNumaflowControllerInK8S(ctx context.Context, t *testing.T, numaplaneClient client.Client, ...)
- func CreatePipelineInK8S(ctx context.Context, t *testing.T, ...)
- func CreatePipelineRolloutInK8S(ctx context.Context, t *testing.T, numaplaneClient client.Client, ...)
- func CreateRidersForNewChild(ctx context.Context, controller RolloutController, rolloutObject RolloutObject, ...) error
- func CreateStatefulSetInK8S(ctx context.Context, t *testing.T, k8sClientSet *k8sclientgo.Clientset, ...)
- func CreateTestISBService(jetstreamVersion string, name string, phase numaflowv1.ISBSvcPhase, ...) *numaflowv1.InterStepBufferService
- func CreateTestMVRollout(mvSpec numaflowv1.MonoVertexSpec, rolloutAnnotations map[string]string, ...) *apiv1.MonoVertexRollout
- func CreateTestMonoVertexOfSpec(spec numaflowv1.MonoVertexSpec, name string, phase numaflowv1.MonoVertexPhase, ...) *numaflowv1.MonoVertex
- func CreateTestPipelineOfSpec(spec numaflowv1.PipelineSpec, name string, phase numaflowv1.PipelinePhase, ...) *numaflowv1.Pipeline
- func CreateTestPipelineRollout(pipelineSpec numaflowv1.PipelineSpec, rolloutAnnotations map[string]string, ...) *apiv1.PipelineRollout
- func CreateTestPipelineUnstructured(name string, spec string) (*unstructured.Unstructured, error)
- func CreateVertexInK8S(ctx context.Context, t *testing.T, ...)
- func FindChildrenOfUpgradeState(ctx context.Context, rolloutObject RolloutObject, ...) (unstructured.UnstructuredList, error)
- func FindMostCurrentChildOfUpgradeState(ctx context.Context, rolloutObject RolloutObject, ...) (*unstructured.Unstructured, error)
- func GarbageCollectChildren(ctx context.Context, rolloutObject RolloutObject, controller RolloutController, ...) (bool, error)
- func GetChildName(ctx context.Context, rolloutObject RolloutObject, controller RolloutController, ...) (string, error)
- func GetNumaflowControllerDefinitions(definitionsFile string) (*config.NumaflowControllerDefinitionConfig, error)
- func GetRolloutParentName(childName string) (string, error)
- func GetUpgradeState(ctx context.Context, c client.Client, childObject *unstructured.Unstructured) (*common.UpgradeState, *common.UpgradeStateReason)
- func PipelineWithDesiredPhase(spec numaflowv1.PipelineSpec, phase numaflowv1.PipelinePhase) numaflowv1.PipelineSpec
- func UpdateUpgradeState(ctx context.Context, c client.Client, upgradeState common.UpgradeState, ...) error
- func VerifyAutoHealing(ctx context.Context, gvk schema.GroupVersionKind, namespace string, ...)
- func VerifyStatusPhase(ctx context.Context, gvk schema.GroupVersionKind, namespace string, ...)
- type InProgressStrategyMgr
- type RolloutController
- type RolloutObject
- type TypedGenerationChangedPredicate
Constants ¶
const ( TestDefaultTimeout = 15 * time.Second TestDefaultDuration = 10 * time.Second TestDefaultInterval = 250 * time.Millisecond )
Variables ¶
var ( DefaultTestNamespace = "default" DefaultTestISBSvcRolloutName = "isbservicerollout-test" DefaultTestISBSvcName = DefaultTestISBSvcRolloutName + "-0" DefaultTestPipelineRolloutName = "pipelinerollout-test" DefaultTestPipelineName = DefaultTestPipelineRolloutName + "-0" DefaultTestMonoVertexRolloutName = "monovertexrollout-test" DefaultTestMonoVertexName = DefaultTestMonoVertexRolloutName + "-0" DefaultTestNumaflowControllerRolloutName = "numaflow-controller" DefaultTestNumaflowControllerName = "numaflow-controller" // TODO: change to add "-0" suffix after Progressive DefaultTestNumaflowControllerDeploymentName = "numaflow-controller" )
var ( TestRESTConfig *rest.Config TestK8sClient client.Client TestCustomMetrics *metrics.CustomMetrics )
var DefaultScaleJSONString string = `{"min":2,"max":2}`
var DefaultScaleTo int64 = 1
Functions ¶
func CreateDefaultISBService ¶ added in v0.11.0
func CreateDefaultISBService(jetstreamVersion string, phase numaflowv1.ISBSvcPhase, fullyReconciled bool) *numaflowv1.InterStepBufferService
func CreateDefaultISBServiceSpec ¶ added in v0.11.0
func CreateDefaultISBServiceSpec(jetstreamVersion string) numaflowv1.InterStepBufferServiceSpec
func CreateDefaultTestMVOfPhase ¶ added in v0.10.0
func CreateDefaultTestMVOfPhase(phase numaflowv1.MonoVertexPhase) *numaflowv1.MonoVertex
func CreateDefaultTestPipelineOfPhase ¶
func CreateDefaultTestPipelineOfPhase(phase numaflowv1.PipelinePhase) *numaflowv1.Pipeline
func CreateDeploymentInK8S ¶
func CreateDeploymentInK8S(ctx context.Context, t *testing.T, k8sClientSet *k8sclientgo.Clientset, deployment *appsv1.Deployment)
func CreateISBServiceRollout ¶ added in v0.11.0
func CreateISBServiceRollout(isbsvcSpec numaflowv1.InterStepBufferServiceSpec, optionalStatus *apiv1.ISBServiceRolloutStatus) *apiv1.ISBServiceRollout
func CreateISBServiceRolloutInK8S ¶ added in v0.11.0
func CreateISBSvcInK8S ¶
func CreateISBSvcInK8S(ctx context.Context, t *testing.T, numaflowClientSet *numaflowversioned.Clientset, isbsvc *numaflowv1.InterStepBufferService)
func CreateMVRolloutInK8S ¶ added in v0.10.0
func CreateMonoVertexInK8S ¶ added in v0.10.0
func CreateMonoVertexInK8S(ctx context.Context, t *testing.T, numaflowClientSet *numaflowversioned.Clientset, monoVertex *numaflowv1.MonoVertex)
func CreateNumaflowControllerInK8S ¶ added in v0.10.0
func CreatePipelineInK8S ¶
func CreatePipelineInK8S(ctx context.Context, t *testing.T, numaflowClientSet *numaflowversioned.Clientset, pipeline *numaflowv1.Pipeline)
func CreateRidersForNewChild ¶ added in v0.22.0
func CreateRidersForNewChild( ctx context.Context, controller RolloutController, rolloutObject RolloutObject, child *unstructured.Unstructured, c client.Client, ) error
Determine the list of Riders which are needed for the child and create them on the cluster
func CreateStatefulSetInK8S ¶
func CreateStatefulSetInK8S(ctx context.Context, t *testing.T, k8sClientSet *k8sclientgo.Clientset, statefulSet *appsv1.StatefulSet)
func CreateTestISBService ¶ added in v0.13.0
func CreateTestISBService( jetstreamVersion string, name string, phase numaflowv1.ISBSvcPhase, fullyReconciled bool, labels map[string]string, annotations map[string]string, ) *numaflowv1.InterStepBufferService
func CreateTestMVRollout ¶ added in v0.10.0
func CreateTestMVRollout(mvSpec numaflowv1.MonoVertexSpec, rolloutAnnotations map[string]string, rolloutLabels map[string]string, mvAnnotations map[string]string, mvLabels map[string]string, optionalStatus *apiv1.MonoVertexRolloutStatus) *apiv1.MonoVertexRollout
func CreateTestMonoVertexOfSpec ¶ added in v0.10.0
func CreateTestMonoVertexOfSpec( spec numaflowv1.MonoVertexSpec, name string, phase numaflowv1.MonoVertexPhase, status numaflowv1.Status, labels map[string]string, annotations map[string]string, ) *numaflowv1.MonoVertex
func CreateTestPipelineOfSpec ¶
func CreateTestPipelineOfSpec( spec numaflowv1.PipelineSpec, name string, phase numaflowv1.PipelinePhase, status numaflowv1.Status, drainedOnPause bool, labels map[string]string, annotations map[string]string, ) *numaflowv1.Pipeline
func CreateTestPipelineRollout ¶
func CreateTestPipelineRollout(pipelineSpec numaflowv1.PipelineSpec, rolloutAnnotations map[string]string, rolloutLabels map[string]string, pipelineAnnotations map[string]string, pipelineLabels map[string]string, optionalStatus *apiv1.PipelineRolloutStatus) *apiv1.PipelineRollout
func CreateTestPipelineUnstructured ¶ added in v0.20.0
func CreateTestPipelineUnstructured(name string, spec string) (*unstructured.Unstructured, error)
func CreateVertexInK8S ¶ added in v0.26.0
func CreateVertexInK8S(ctx context.Context, t *testing.T, numaflowClientSet *numaflowversioned.Clientset, vertex *numaflowv1.Vertex)
func FindChildrenOfUpgradeState ¶ added in v0.13.0
func FindChildrenOfUpgradeState(ctx context.Context, rolloutObject RolloutObject, upgradeState common.UpgradeState, upgradeStateReason *common.UpgradeStateReason, checkLive bool, c client.Client) (unstructured.UnstructuredList, error)
Find the children of a given Rollout of specified UpgradeState (plus optional UpgradeStateReason)
func FindMostCurrentChildOfUpgradeState ¶ added in v0.13.0
func FindMostCurrentChildOfUpgradeState(ctx context.Context, rolloutObject RolloutObject, upgradeState common.UpgradeState, upgradeStateReason *common.UpgradeStateReason, checkLive bool, c client.Client) (*unstructured.Unstructured, error)
find the most current child of a Rollout (of specified UpgradeState, plus optional UpgradeStateReason) typically we should only find one, but perhaps a previous reconciliation failure could cause us to find multiple if we do see older ones, recycle them
func GarbageCollectChildren ¶ added in v0.13.0
func GarbageCollectChildren( ctx context.Context, rolloutObject RolloutObject, controller RolloutController, c client.Client, ) (bool, error)
Garbage Collect all recyclable children; return true if we've deleted all that are recyclable
func GetChildName ¶ added in v0.13.0
func GetChildName(ctx context.Context, rolloutObject RolloutObject, controller RolloutController, upgradeState common.UpgradeState, upgradeStateReason *common.UpgradeStateReason, c client.Client, useExistingChild bool) (string, error)
get the name of the child whose parent is "rolloutObject" and whose upgrade state is "upgradeState" (and if upgradeStateReason is that, check that as well) if none is found, create a new one if one is found, create a new one if "useExistingChild=false", else use existing one
func GetNumaflowControllerDefinitions ¶
func GetNumaflowControllerDefinitions(definitionsFile string) (*config.NumaflowControllerDefinitionConfig, error)
func GetRolloutParentName ¶
assume child name is "<rolloutname>-<number>"
func GetUpgradeState ¶ added in v0.13.0
func GetUpgradeState(ctx context.Context, c client.Client, childObject *unstructured.Unstructured) (*common.UpgradeState, *common.UpgradeStateReason)
func PipelineWithDesiredPhase ¶
func PipelineWithDesiredPhase(spec numaflowv1.PipelineSpec, phase numaflowv1.PipelinePhase) numaflowv1.PipelineSpec
func UpdateUpgradeState ¶ added in v0.13.0
func UpdateUpgradeState(ctx context.Context, c client.Client, upgradeState common.UpgradeState, upgradeStateReason *common.UpgradeStateReason, childObject *unstructured.Unstructured) error
update the in-memory object with the new Label and patch the object in K8S
func VerifyAutoHealing ¶
func VerifyAutoHealing(ctx context.Context, gvk schema.GroupVersionKind, namespace string, resourceName string, pathToValue string, newValue any)
VerifyAutoHealing tests the auto healing feature
func VerifyStatusPhase ¶
Types ¶
type InProgressStrategyMgr ¶
type InProgressStrategyMgr struct { Store *inProgressStrategyStore // contains filtered or unexported fields }
InProgressStrategyMgr is responsible to maintain a Rollout's inProgressStrategy state is maintained both in memory as well as in the Rollout's Status in memory always gives us the latest state in case the Informer cache is out of date the Rollout's Status is useful as a backup mechanism in case Numaplane has just restarted
func NewInProgressStrategyMgr ¶
func NewInProgressStrategyMgr( getRolloutStrategy func(context.Context, client.Object) *apiv1.UpgradeStrategy, setRolloutStrategy func(context.Context, client.Object, apiv1.UpgradeStrategy)) *InProgressStrategyMgr
func (*InProgressStrategyMgr) GetStrategy ¶
func (mgr *InProgressStrategyMgr) GetStrategy(ctx context.Context, rollout client.Object) apiv1.UpgradeStrategy
func (*InProgressStrategyMgr) SetStrategy ¶
func (mgr *InProgressStrategyMgr) SetStrategy(ctx context.Context, rollout client.Object, upgradeStrategy apiv1.UpgradeStrategy)
store in both memory and the Resource itself
func (*InProgressStrategyMgr) UnsetStrategy ¶
func (mgr *InProgressStrategyMgr) UnsetStrategy(ctx context.Context, rollout client.Object)
type RolloutController ¶ added in v0.13.0
type RolloutController interface { // IncrementChildCount updates the count of children for the Resource in Kubernetes and returns the index that should be used for the next child IncrementChildCount(ctx context.Context, rolloutObject RolloutObject) (int32, error) // Recycle deletes child; returns true if it was in fact deleted Recycle(ctx context.Context, childObject *unstructured.Unstructured, c client.Client) (bool, error) // GetDesiredRiders gets the list of Riders as specified in the RolloutObject, templated for the specific child name and // based on the child definition. // Note the child name can be different from child.GetName() // The child name is what's used for templating the Rider definition, while the `child` is really only used by the PipelineRolloutReconciler // in the case of "per-vertex" Riders. In this case, it's necessary to use the existing child's name to template in order to effectively compare whether the // Rider has changed, but use the latest child definition to derive the current list of Vertices that need Riders. GetDesiredRiders(rolloutObject RolloutObject, childName string, child *unstructured.Unstructured) ([]riders.Rider, error) // GetExistingRiders gets the list of Riders that already exists, either for the Promoted child or the Upgrading child depending on the value of "upgrading" GetExistingRiders(ctx context.Context, rolloutObject RolloutObject, upgrading bool) (unstructured.UnstructuredList, error) // SetCurrentRiderList updates the list of Riders SetCurrentRiderList(ctx context.Context, rolloutObject RolloutObject, riders []riders.Rider) }
type RolloutObject ¶
type RolloutObject interface { GetRolloutGVR() metav1.GroupVersionResource GetRolloutGVK() schema.GroupVersionKind GetChildGVR() metav1.GroupVersionResource GetChildGVK() schema.GroupVersionKind GetRolloutObjectMeta() *metav1.ObjectMeta GetRolloutStatus() *apiv1.Status }
type TypedGenerationChangedPredicate ¶ added in v0.11.0
type TypedGenerationChangedPredicate[object metav1.Object] struct { // Returns true by default for all events; see overrides below. predicate.TypedFuncs[object] }
func (TypedGenerationChangedPredicate[object]) Update ¶ added in v0.11.0
func (p TypedGenerationChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool