controllers

package
v1.2.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2025 License: Apache-2.0 Imports: 74 Imported by: 0

Documentation

Index

Constants

View Source
// Names, field names and port numbers of the Milvus components.
const (
	// Port names and the metrics path.
	EtcdMetricPortName = "client"
	MetricPortName     = "metrics"
	MetricPath         = "/metrics"

	RestfulPortName = "restful"

	// Component names; all but MilvusName are re-exported from the v1beta1 package.
	MixCoordName      = v1beta1.MixCoordName
	RootCoordName     = v1beta1.RootCoordName
	DataCoordName     = v1beta1.DataCoordName
	QueryCoordName    = v1beta1.QueryCoordName
	IndexCoordName    = v1beta1.IndexCoordName
	DataNodeName      = v1beta1.DataNodeName
	QueryNodeName     = v1beta1.QueryNodeName
	IndexNodeName     = v1beta1.IndexNodeName
	ProxyName         = v1beta1.ProxyName
	StandaloneName    = v1beta1.StandaloneName
	StreamingNodeName = v1beta1.StreamingNodeName
	MilvusName        = "milvus"

	// Per-component field-name strings (presumably the matching Go field names
	// in the Milvus spec — TODO confirm against the users of these constants).
	MixCoordFieldName      = "MixCoord"
	RootCoordFieldName     = "RootCoord"
	DataCoordFieldName     = "DataCoord"
	QueryCoordFieldName    = "QueryCoord"
	IndexCoordFieldName    = "IndexCoord"
	DataNodeFieldName      = "DataNode"
	QueryNodeFieldName     = "QueryNode"
	IndexNodeFieldName     = "IndexNode"
	StreamingNodeFieldName = "StreamingNode"
	ProxyFieldName         = "Proxy"
	StandaloneFieldName    = "Standalone"

	// Port numbers. MultiplePorts is -1, i.e. not a usable port number
	// (presumably a marker for components exposing several ports — TODO confirm).
	MetricPort        = 9091
	MultiplePorts     = -1
	RootCoordPort     = 53100
	DataCoordPort     = 13333
	QueryCoordPort    = 19531
	IndexCoordPort    = 31000
	IndexNodePort     = 21121
	QueryNodePort     = 21123
	DataNodePort      = 21124
	StreamingNodePort = 22222
	ProxyPort         = 19530
	// TODO: use configurable port?
	// MilvusPort and StandalonePort both resolve to ProxyPort.
	MilvusPort     = ProxyPort
	StandalonePort = MilvusPort
)

Constants for component names, field names and ports.

View Source
const (
	Etcd     = "etcd"
	Minio    = "minio"
	Pulsar   = "pulsar"
	PulsarV3 = "pulsar-v3"
	Kafka    = "kafka"
)
View Source
// Volume names, configuration paths and well-known keys.
const (
	// Volume names and configuration paths; the two derived paths are built
	// from MilvusConfigRootPath.
	MilvusDataVolumeName     = "milvus-data" // for standalone persistence only
	MilvusConfigVolumeName   = "milvus-config"
	MilvusConfigRootPath     = "/milvus/configs"
	MilvusOriginalConfigPath = MilvusConfigRootPath + "/milvus.yaml"
	MilvusConfigmapMountPath = MilvusConfigRootPath + "/operator"

	// File names, credential key names and annotation keys.
	// AnnotationMilvusGeneration is re-exported from the v1beta1 package.
	UserYaml                   = "user.yaml"
	HookYaml                   = "hook.yaml"
	AccessKey                  = "accesskey"
	SecretKey                  = "secretkey"
	AnnotationCheckSum         = "checksum/config"
	AnnotationMilvusGeneration = v1beta1.AnnotationMilvusGeneration

	// Tools volume; the script and merge-tool paths are derived from ToolsMountPath.
	ToolsVolumeName = "tools"
	ToolsMountPath  = "/milvus/tools"
	RunScriptPath   = ToolsMountPath + "/run.sh"
	MergeToolPath   = ToolsMountPath + "/merge"
)
View Source
// Numeric status codes, typed as float64, numbered consecutively from 0 to 5.
//
// NOTE: "Stauts" in MilvusStautsCodeStopped and MilvusStautsCodeMaintaining is
// a misspelling of "Status". The names are exported, so they are kept as-is
// here; renaming them would break existing callers.
const (
	MilvusStatusCodePending     = float64(0)
	MilvusStatusCodeHealthy     = float64(1)
	MilvusStatusCodeUnHealthy   = float64(2)
	MilvusStatusCodeDeleting    = float64(3)
	MilvusStautsCodeStopped     = float64(4)
	MilvusStautsCodeMaintaining = float64(5)
)

MilvusStatusCode for milvusStatusCollector

View Source
// Finalizer names and annotation keys.
const (
	// Finalizer names.
	MilvusFinalizerName         = "milvus.milvus.io/finalizer"
	ForegroundDeletionFinalizer = "foregroundDeletion"
	// Annotation keys under the milvus.io/ prefix (presumably read by the
	// reconciler to pause reconciling / mark maintenance — TODO confirm).
	PauseReconcileAnnotation    = "milvus.io/pause-reconcile"
	MaintainingAnnotation       = "milvus.io/maintaining"
)
View Source
const (
	ConditionCheckingUpgrationState = "CheckingUpgrationState"
	ConditionUpgraded               = "Upgraded"
	ConditionRollbacked             = "Rollbacked"
)
View Source
const (
	LabelUpgrade  = v1beta1.MilvusIO + "upgrade"
	LabelTaskKind = v1beta1.MilvusIO + "task-kind"
	BackupMeta    = "backup-meta"
	UpdateMeta    = "update-meta"
	RollbackMeta  = "rollback-meta"
)
View Source
// Human-readable message strings for etcd, storage, message stream, secret and
// overall Milvus states (presumably used as condition messages — TODO confirm).
// These strings are emitted at runtime; do not reword them without checking
// for consumers that match on the exact text.
const (
	MessageEtcdReady         = "Etcd endpoints is healthy"
	MessageEtcdNotReady      = "All etcd endpoints are unhealthy"
	MessageStorageReady      = "Storage endpoints is healthy"
	MessageStorageNotReady   = "All Storage endpoints are unhealthy"
	MessageMsgStreamReady    = "MsgStream is ready"
	MessageMsgStreamNotReady = "MsgStream is not ready"
	MessageSecretNotExist    = "Secret not exist"
	MessageKeyNotExist       = "accesskey or secretkey not exist in secret"
	MessageDecodeErr         = "accesskey or secretkey decode error"
	MessageMilvusHealthy     = "All Milvus components are healthy"
	MessageMilvusStopped     = "All Milvus components are stopped"
	MessageMilvusStopping    = "All Milvus components are stopping"
	// SSL related messages
	MessageStorageSSLCertSecretNotExist = "SSL CA certificate secret not exist"
	MessageStorageSSLCertKeyNotExist    = "CA certificate not found in secret, expected key: ca.crt"
	MessageStorageSSLCertLoadFailed     = "Failed to load CA certificate"
)
View Source
// Label keys. AppLabel is the common "app.kubernetes.io/" prefix; the
// AppLabel* keys are built by appending a suffix to it. HelmReleaseLabel is
// the plain "release" key and does not carry the prefix.
const (
	AppLabel          = "app.kubernetes.io/"
	AppLabelInstance  = AppLabel + "instance"
	AppLabelVersion   = AppLabel + "version"
	AppLabelComponent = AppLabel + "component"
	AppLabelName      = AppLabel + "name"
	AppLabelManagedBy = AppLabel + "managed-by"
	HelmReleaseLabel  = "release"
)
View Source
const (
	CacheSizeEnvVarName = "CACHE_SIZE"
)
View Source
const ManagerName = "milvus-operator"

Variables

define MilvusComponents

View Source
// Sentinel errors. The DeployControllerBizUtil.GetDeploys documentation lists
// ErrNotFound (no deployment found) and ErrNoLastDeployment (no last
// deployment found) among its possible results.
var (
	ErrNotFound         = errors.New("not found")
	ErrNoLastDeployment = errors.New("no last deployment found")
)
View Source
var (
	DefaultConfigMapMode = corev1.ConfigMapVolumeSourceDefaultMode
	ErrRequeue           = errors.New("requeue")
)
View Source
var (
	DefaultOperatorImageInfo = ImageInfo{
		Image:           "milvusdb/milvus-operator:main-latest",
		ImagePullPolicy: corev1.PullAlways,
	}
	ToolImage = ""
)
View Source
var (
	S3ReadyCondition = v1beta1.MilvusCondition{
		Type:   v1beta1.StorageReady,
		Status: GetConditionStatus(true),
		Reason: v1beta1.ReasonS3Ready,
	}
	Debug = false
)
View Source
var (
	// CacheSizeEnvVar is an environment variable named CacheSizeEnvVarName whose
	// value is not a literal: it is sourced from the container's "limits.memory"
	// resource field with a divisor of 1Gi.
	CacheSizeEnvVar = corev1.EnvVar{
		Name: CacheSizeEnvVarName,
		ValueFrom: &corev1.EnvVarSource{
			ResourceFieldRef: &corev1.ResourceFieldSelector{
				// resource.MustParse panics on an unparsable quantity; "1Gi" is a constant literal here.
				Divisor:  resource.MustParse("1Gi"),
				Resource: "limits.memory",
			},
		},
	}
)
View Source
// CheckComponentHasTerminatingPod reports whether the given component of mc
// has at least one terminating pod.
//
// Pods are looked up in mc's namespace, selected by the labels that
// NewComponentAppLabels builds from the instance name and the component name.
// Any error from the lookup is returned unchanged together with false.
var CheckComponentHasTerminatingPod = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus, component MilvusComponent) (bool, error) {
	componentLabels := NewComponentAppLabels(mc.Name, component.Name)
	terminating, err := listTerminatingPodByOpts(ctx, cli, &client.ListOptions{
		Namespace:     mc.Namespace,
		LabelSelector: labels.SelectorFromSet(componentLabels),
	})
	if err != nil {
		return false, err
	}
	return len(terminating.Items) > 0, nil
}
View Source
// CheckMilvusStopped reports whether mc has no pods left.
//
// It returns true when listMilvusPods yields an empty list. Otherwise it logs
// the number of remaining pods, hands the list to ExecKillIfTerminating and
// returns false together with whatever error that call produced.
var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) {
	pods, err := listMilvusPods(ctx, cli, mc)
	if err != nil {
		return false, err
	}

	remaining := len(pods.Items)
	if remaining == 0 {
		return true, nil
	}

	ctrl.LoggerFrom(ctx).Info("milvus has pods not stopped", "pods count", remaining)
	return false, ExecKillIfTerminating(ctx, pods)
}
View Source
var DeploymentConditionNotSet = appsv1.DeploymentCondition{}
View Source
// Finalize cleans up the dependencies of mc that are marked for deletion.
//
// It first collects the helm releases to uninstall. etcd, the message stream
// (kafka or pulsar) and minio are each included only when they are not
// External and their InCluster.DeletionPolicy is DeletionPolicyDelete; the map
// value records whether the PVCs of that release must be deleted as well.
// For any other message stream type, the data PVC of the instance is deleted
// directly when RocksMQ persistence is enabled with PVCDeletion set.
//
// Every collected release is then uninstalled and, when requested, its PVCs
// are deleted by label. A failure for one release does not stop the loop: the
// errors are accumulated and returned joined into a single error. A failure to
// delete the data PVC, in contrast, is returned immediately, before any
// release is uninstalled.
var Finalize = func(ctx context.Context, r *MilvusReconciler, mc v1beta1.Milvus) error {
	dep := mc.Spec.Dep

	// release name -> whether the release's PVCs are deleted after uninstalling it
	deletingReleases := map[string]bool{}
	if !dep.Etcd.External && dep.Etcd.InCluster.DeletionPolicy == v1beta1.DeletionPolicyDelete {
		deletingReleases[mc.Name+"-etcd"] = dep.Etcd.InCluster.PVCDeletion
	}

	switch dep.MsgStreamType {
	case v1beta1.MsgStreamTypeKafka:
		if !dep.Kafka.External && dep.Kafka.InCluster.DeletionPolicy == v1beta1.DeletionPolicyDelete {
			deletingReleases[mc.Name+"-kafka"] = dep.Kafka.InCluster.PVCDeletion
		}
	case v1beta1.MsgStreamTypePulsar:
		if !dep.Pulsar.External && dep.Pulsar.InCluster.DeletionPolicy == v1beta1.DeletionPolicyDelete {
			deletingReleases[mc.Name+"-pulsar"] = dep.Pulsar.InCluster.PVCDeletion
		}
	default:
		// No helm release is involved here; only the instance's data PVC is removed.
		persist := dep.RocksMQ.Persistence
		if persist.Enabled && persist.PVCDeletion {
			pvc := &corev1.PersistentVolumeClaim{}
			pvc.Namespace = mc.Namespace
			pvc.Name = getPVCNameByInstName(mc.Name)
			// NOTE(review): every error is treated as fatal, including a NotFound
			// for a PVC that is already gone (e.g. when Finalize is retried after
			// a later step failed) — confirm whether NotFound should be tolerated.
			if err := r.Delete(ctx, pvc); err != nil {
				return errors.Wrap(err, "delete data pvc failed")
			}
			r.logger.Info("pvc deleted", "name", pvc.Name, "namespace", pvc.Namespace)
		}
	}

	if !dep.Storage.External && dep.Storage.InCluster.DeletionPolicy == v1beta1.DeletionPolicyDelete {
		deletingReleases[mc.Name+"-minio"] = dep.Storage.InCluster.PVCDeletion
	}

	if len(deletingReleases) == 0 {
		return nil
	}

	cfg := r.helmReconciler.NewHelmCfg(mc.Namespace)

	var errs []error
	for releaseName, deletePVC := range deletingReleases {
		if err := helm.Uninstall(cfg, releaseName); err != nil {
			errs = append(errs, err)
			continue
		}
		if !deletePVC {
			continue
		}

		// PVCs are looked up under two label keys, both valued with the release
		// name (presumably because the charts do not label PVCs uniformly —
		// TODO confirm). The second lookup is skipped if the first one fails.
		if err := r.batchDeletePVC(ctx, mc.Namespace, AppLabelInstance, releaseName); err != nil {
			errs = append(errs, errors.Wrapf(err, "delete pvc with label %s=%s failed", AppLabelInstance, releaseName))
			continue
		}
		if err := r.batchDeletePVC(ctx, mc.Namespace, HelmReleaseLabel, releaseName); err != nil {
			errs = append(errs, errors.Wrapf(err, "delete pvc with label %s=%s failed", HelmReleaseLabel, releaseName))
		}
	}

	if len(errs) > 0 {
		return errors.New(util.JoinErrors(errs))
	}
	return nil
}
View Source
// ListMilvusTerminatingPods returns the terminating pods of mc.
//
// Pods are looked up in mc's namespace, selected by the labels that
// NewAppLabels builds from the instance name. The result and error of the
// underlying lookup are returned unchanged.
var ListMilvusTerminatingPods = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (*corev1.PodList, error) {
	instanceSelector := labels.SelectorFromSet(NewAppLabels(mc.Name))
	return listTerminatingPodByOpts(ctx, cli, &client.ListOptions{
		Namespace:     mc.Namespace,
		LabelSelector: instanceSelector,
	})
}
View Source
var MergeAnnotations = MergeLabels
View Source
var PodConditionNotSet = corev1.PodCondition{}

PodConditionNotSet is used when pod condition is not found when calling GetPodConditionByType

View Source
var (
	// SetControllerReference sets owner as the controller reference of
	// controlled by delegating to ctrl.SetControllerReference.
	//
	// One failure is deliberately treated as success: an error whose dynamic
	// type equals alreadyOwnedErrorType and whose recorded owner kind is
	// milvusclusterOwnerKind. Every other error is returned wrapped with the
	// message "set controller reference".
	SetControllerReference = func(owner, controlled metav1.Object, scheme *kruntime.Scheme) error {
		err := ctrl.SetControllerReference(owner, controlled, scheme)
		if err == nil {
			return nil
		}

		// The type assertion is only evaluated after the type check has matched.
		ownedByMilvusCluster := reflect.TypeOf(err) == alreadyOwnedErrorType &&
			err.(*controllerutil.AlreadyOwnedError).Owner.Kind == milvusclusterOwnerKind
		if ownedByMilvusCluster {
			return nil
		}
		return errors.Wrap(err, "set controller reference")
	}
)

Functions

func DeploymentReady

func DeploymentReady(status appsv1.DeploymentStatus) bool

DeploymentReady deployment is ready when all components are available

func ExecKillIfTerminating

func ExecKillIfTerminating(ctx context.Context, podList *corev1.PodList) error

func GetAzureStorageAccount

func GetAzureStorageAccount(conf map[string]interface{}) string

func GetCondition

func GetCondition(getter func() v1beta1.MilvusCondition, eps []string) v1beta1.MilvusCondition

func GetConditionStatus

func GetConditionStatus(b bool) corev1.ConditionStatus

func GetConfCheckSum

func GetConfCheckSum(spec v1beta1.MilvusSpec) string

GetConfCheckSum returns the checksum of the component configuration

func GetContainerIndex

func GetContainerIndex(containers []corev1.Container, name string) int

GetContainerIndex returns index of container @name in @containers, -1 if not found

func GetContainerMessage

func GetContainerMessage(status corev1.ContainerStatus) string

func GetDefaultLivenessProbe

func GetDefaultLivenessProbe() *corev1.Probe

func GetDefaultReadinessProbe

func GetDefaultReadinessProbe() *corev1.Probe

func GetDefaultStartupProbe

func GetDefaultStartupProbe() *corev1.Probe

func GetDeploymentFalseCondition

func GetDeploymentFalseCondition(deploy appsv1.Deployment) (*appsv1.DeploymentCondition, error)

func GetDeploymentGroupId

func GetDeploymentGroupId(deploy *appsv1.Deployment) (int, error)

func GetDeploymentStrategy added in v1.2.6

func GetDeploymentStrategy(milvus *v1beta1.Milvus, component MilvusComponent) appsv1.DeploymentStrategy

func GetEndpointsHealth

func GetEndpointsHealth(ctx context.Context, authConfig EtcdAuthConfig, endpoints []string) map[string]EtcdEndPointHealth

func GetEtcdCondition

func GetEtcdCondition(ctx context.Context, authCfg EtcdAuthConfig, endpoints []string) v1beta1.MilvusCondition

func GetKafkaConfFromCR

func GetKafkaConfFromCR(mc v1beta1.Milvus) (*external.CheckKafkaConfig, error)

GetKafkaConfFromCR gets the Kafka config from the CR.

func GetMilvusConfCheckSum

func GetMilvusConfCheckSum(spec v1beta1.MilvusSpec) string

GetMilvusConfCheckSum returns the checksum of the component configuration

func GetMilvusEndpoint

func GetMilvusEndpoint(ctx context.Context, logger logr.Logger, client client.Client, info MilvusEndpointInfo) string

func GetMilvusUpdatedCondition

func GetMilvusUpdatedCondition(m *v1beta1.Milvus) v1beta1.MilvusCondition

func GetMinioBucket

func GetMinioBucket(conf map[string]interface{}) string

func GetMinioIAMEndpoint

func GetMinioIAMEndpoint(conf map[string]interface{}) string

func GetMinioSecure

func GetMinioSecure(conf map[string]interface{}) bool

func GetMinioUseIAM

func GetMinioUseIAM(conf map[string]interface{}) bool

func GetNotReadyDependencyConditions

func GetNotReadyDependencyConditions(conditions []v1beta1.MilvusCondition) map[v1beta1.MilvusConditionType]*v1beta1.MilvusCondition

func GetNotReadyPods

func GetNotReadyPods(pods []corev1.Pod) []corev1.Pod

func GetPodConditionByType

func GetPodConditionByType(conditions []corev1.PodCondition, t corev1.PodConditionType) corev1.PodCondition

GetPodConditionByType returns the condition with the provided type, or PodConditionNotSet if it is not found.

func GetPodFalseCondition

func GetPodFalseCondition(pod corev1.Pod) (*corev1.PodCondition, error)

func GetServiceInstanceName

func GetServiceInstanceName(instance string) string

GetServiceInstanceName returns the name of the component service

func GetStorageSecretRefEnv

func GetStorageSecretRefEnv(secretRef string) []corev1.EnvVar

func GetStringValueWithDefault

func GetStringValueWithDefault(conf map[string]interface{}, defaultVal string, fields ...string) string

func GetTerminatingPods

func GetTerminatingPods(pods []corev1.Pod) []corev1.Pod

func GetVolumeIndex

func GetVolumeIndex(volumes []corev1.Volume, name string) int

GetVolumeIndex returns index of volume @name in @volumes, -1 if not found

func GetVolumeMountIndex

func GetVolumeMountIndex(volumeMounts []corev1.VolumeMount, mountPath string) int

GetVolumeMountIndex returns index of volumeMount with @mountPath in @volumeMounts, -1 if not found

func InitializeMetrics

func InitializeMetrics()

InitializeMetrics for controllers

func IsDependencyReady

func IsDependencyReady(conditions []v1beta1.MilvusCondition) bool

func IsEqual

func IsEqual(obj1, obj2 interface{}) bool

IsEqual checks whether two objects are equal.

func IsMilvusConditionTrueByType

func IsMilvusConditionTrueByType(conditions []v1beta1.MilvusCondition, t v1beta1.MilvusConditionType) bool

func IsMilvusDeploymentsComplete

func IsMilvusDeploymentsComplete(m *v1beta1.Milvus) bool

func IsPulsarChartPath

func IsPulsarChartPath(chartPath string) bool

func IsSetDefaultDone

func IsSetDefaultDone(mc *v1beta1.Milvus) bool

func LoopWithInterval

func LoopWithInterval(ctx context.Context, loopFunc func() error, interval time.Duration, logger logr.Logger)

func MergeContainerPort

func MergeContainerPort(src, dst []corev1.ContainerPort) []corev1.ContainerPort

func MergeEnvVar

func MergeEnvVar(src, dst []corev1.EnvVar) []corev1.EnvVar

Merge dst env into src

func MergeLabels

func MergeLabels(allLabels ...map[string]string) map[string]string

MergeLabels merges all labels together and returns a new label.

func MergeServicePort

func MergeServicePort(src, dst []corev1.ServicePort) []corev1.ServicePort

func MergeVolumeMount

func MergeVolumeMount(src, dst []corev1.VolumeMount) []corev1.VolumeMount

func MilvusStatusToCode

func MilvusStatusToCode(status v1beta1.MilvusHealthStatus, isMaintaining bool) float64

func NamespacedName

func NamespacedName(namespace, name string) types.NamespacedName

func NewAppLabels

func NewAppLabels(instance string) map[string]string

func NewComponentAppLabels

func NewComponentAppLabels(instance, component string) map[string]string

func NewServicePodLabels

func NewServicePodLabels(instance string) map[string]string

func PodReady

func PodReady(pod corev1.Pod) bool

PodReady returns whether a pod is running and each container has passed its ready state.

func RemoveConditions

func RemoveConditions(status *v1beta1.MilvusStatus, typesToRemove []v1beta1.MilvusConditionType)

func ReplicasValue

func ReplicasValue(i *int32) int32

func SetupControllers

func SetupControllers(ctx context.Context, mgr manager.Manager, stopReconcilers []string, enableHook bool) error

func ShouldUseVirtualHost

func ShouldUseVirtualHost(conf map[string]interface{}) bool

func UpdateCondition

func UpdateCondition(status *v1beta1.MilvusStatus, c v1beta1.MilvusCondition)

func WarppedReconcileComponentFunc

func WarppedReconcileComponentFunc(
	f func(context.Context, v1beta1.Milvus, MilvusComponent) error,
	ctx context.Context, mc v1beta1.Milvus, c MilvusComponent) func() error

func WrappedFunc

func WrappedFunc(f interface{}, args ...interface{}) func() error

Types

type Args

type Args = []interface{}

Args array of args for a func

type Chart

type Chart = string

type CommonComponentReconciler

type CommonComponentReconciler struct {
	// contains filtered or unexported fields
}

func NewCommonComponentReconciler

func NewCommonComponentReconciler(r *MilvusReconciler) *CommonComponentReconciler

func (*CommonComponentReconciler) Reconcile

func (r *CommonComponentReconciler) Reconcile(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent) error

type CommonInfo

type CommonInfo struct {
	OperatorImageInfo ImageInfo
	// contains filtered or unexported fields
}

CommonInfo should be initialized before reconciling.

func (*CommonInfo) InitIfNot

func (c *CommonInfo) InitIfNot(cli client.Client)

type ComponentConditionGetter

type ComponentConditionGetter interface {
	GetMilvusInstanceCondition(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (v1beta1.MilvusCondition, error)
}

func GetComponentConditionGetter

func GetComponentConditionGetter() ComponentConditionGetter

type ComponentConditionGetterImpl

type ComponentConditionGetterImpl struct{}

func (ComponentConditionGetterImpl) GetMilvusInstanceCondition

func (c ComponentConditionGetterImpl) GetMilvusInstanceCondition(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (v1beta1.MilvusCondition, error)

type ComponentErrorDetail

type ComponentErrorDetail struct {
	ComponentName string
	NotObserved   bool
	Deployment    *appsv1.DeploymentCondition
	PodName       string
	Pod           *corev1.PodCondition
	Container     *corev1.ContainerStatus
}

ComponentErrorDetail is one sample of error detail among all error pods

func (ComponentErrorDetail) String

func (m ComponentErrorDetail) String() string

type ComponentSpec

type ComponentSpec = v1beta1.ComponentSpec

func MergeComponentSpec

func MergeComponentSpec(src, dst ComponentSpec) ComponentSpec

MergeComponentSpec merges the src ComponentSpec to dst

type DeployController

type DeployController interface {
	Reconcile(context.Context, v1beta1.Milvus, MilvusComponent) error
}

DeployController controls milvus deployments

type DeployControllerBiz

type DeployControllerBiz interface {
	// backward compatible logic
	CheckDeployMode(ctx context.Context, mc v1beta1.Milvus) (v1beta1.ComponentDeployMode, error)
	IsUpdating(ctx context.Context, mc v1beta1.Milvus) (bool, error)
	DeployModeChanger

	// 2 deployment mode logic
	IsPaused(ctx context.Context, mc v1beta1.Milvus) bool
	HandleCreate(ctx context.Context, mc v1beta1.Milvus) error
	HandleStop(ctx context.Context, mc v1beta1.Milvus) error
	HandleScaling(ctx context.Context, mc v1beta1.Milvus) error
	HandleRolling(ctx context.Context, mc v1beta1.Milvus) error
	HandleManualMode(ctx context.Context, mc v1beta1.Milvus) error
}

DeployControllerBiz are the business logics of DeployController, abstracted for unit test

type DeployControllerBizFactory

type DeployControllerBizFactory interface {
	GetBiz(component MilvusComponent) DeployControllerBiz
}

DeployControllerBizFactory is the factory of DeployControllerBiz

type DeployControllerBizFactoryImpl

type DeployControllerBizFactoryImpl struct {
	// contains filtered or unexported fields
}

DeployControllerBizFactoryImpl is the implementation of DeployControllerBizFactory

func NewDeployControllerBizFactory

func NewDeployControllerBizFactory(modeChangerFactory DeployModeChangerFactory, utilFactory DeployControllerBizUtilFactory, cli client.Client) *DeployControllerBizFactoryImpl

NewDeployControllerBizFactory creates a new DeployControllerBizFactory

func (*DeployControllerBizFactoryImpl) GetBiz

GetBiz get DeployControllerBiz for the given component

type DeployControllerBizImpl

type DeployControllerBizImpl struct {
	DeployModeChanger
	// contains filtered or unexported fields
}

DeployControllerBizImpl implements DeployControllerBiz

func NewDeployControllerBizImpl

func NewDeployControllerBizImpl(component MilvusComponent, util DeployControllerBizUtil, modeChanger DeployModeChanger, cli client.Client) *DeployControllerBizImpl

func (*DeployControllerBizImpl) CheckDeployMode

func (*DeployControllerBizImpl) HandleCreate

func (c *DeployControllerBizImpl) HandleCreate(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployControllerBizImpl) HandleManualMode

func (c *DeployControllerBizImpl) HandleManualMode(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployControllerBizImpl) HandleRolling

func (c *DeployControllerBizImpl) HandleRolling(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployControllerBizImpl) HandleScaling

func (c *DeployControllerBizImpl) HandleScaling(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployControllerBizImpl) HandleStop

func (c *DeployControllerBizImpl) HandleStop(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployControllerBizImpl) IsPaused

func (*DeployControllerBizImpl) IsUpdating

func (c *DeployControllerBizImpl) IsUpdating(ctx context.Context, mc v1beta1.Milvus) (bool, error)

type DeployControllerBizUtil

type DeployControllerBizUtil interface {
	// RenderPodTemplateWithoutGroupID returns a pod template for the component of mc,
	// given the current template (presumably nil-able when none exists yet — TODO confirm).
	RenderPodTemplateWithoutGroupID(mc v1beta1.Milvus, currentTemplate *corev1.PodTemplateSpec, component MilvusComponent, forceUpdateAll bool) *corev1.PodTemplateSpec

	// GetDeploys returns currentDeployment, lastDeployment when there is exactly one currentDeployment, one lastDeployment
	// otherwise return err. in particular:
	// - return ErrNotFound when no deployment found
	// - return ErrNoCurrentDeployment when no current deployment found
	// - return ErrNoLastDeployment when no last deployment found
	// NOTE(review): ErrNoCurrentDeployment is not among the exported error variables
	// visible alongside ErrNotFound and ErrNoLastDeployment — confirm it exists.
	GetDeploys(ctx context.Context, mc v1beta1.Milvus) (currentDeployment, lastDeployment *appsv1.Deployment, err error)
	// CreateDeploy with replica = 0, if groupId != 0, set image to dummy to avoid rolling back and forth
	CreateDeploy(ctx context.Context, mc v1beta1.Milvus, podTemplate *corev1.PodTemplateSpec, groupId int) error

	// ShouldRollback, LastRolloutFinished and IsNewRollout are the rollout-state predicates.
	ShouldRollback(ctx context.Context, currentDeploy, lastDeployment *appsv1.Deployment, podTemplate *corev1.PodTemplateSpec) bool
	LastRolloutFinished(ctx context.Context, mc v1beta1.Milvus, currentDeployment, lastDeployment *appsv1.Deployment) (bool, error)
	IsNewRollout(ctx context.Context, currentDeployment *appsv1.Deployment, podTemplate *corev1.PodTemplateSpec) bool
	// ScaleDeployments scales 2 deployments to proper replicas, it assumes currentDeployment & lastDeployment not nil
	ScaleDeployments(ctx context.Context, mc v1beta1.Milvus, currentDeployment, lastDeployment *appsv1.Deployment) error
	// PrepareNewRollout prepare a new rollout, it assumes currentDeployment not nil
	PrepareNewRollout(ctx context.Context, mc v1beta1.Milvus, currentDeployment *appsv1.Deployment, podTemplate *corev1.PodTemplateSpec) error
	// RenewDeployAnnotation check annotation, renew if necessary, returns true if annotation is updated
	RenewDeployAnnotation(ctx context.Context, mc v1beta1.Milvus, currentDeployment *appsv1.Deployment) bool

	// K8sUtil is embedded, so its methods are part of this interface as well.
	K8sUtil
}

DeployControllerBizUtil are the business logics of DeployControllerBizImpl, abstracted for unit test

type DeployControllerBizUtilFactory

type DeployControllerBizUtilFactory interface {
	GetBizUtil(component MilvusComponent) DeployControllerBizUtil
}

type DeployControllerBizUtilFactoryImpl

type DeployControllerBizUtilFactoryImpl struct {
	// contains filtered or unexported fields
}

func NewDeployControllerBizUtilFactory

func NewDeployControllerBizUtilFactory(cli client.Client, k8sUtil K8sUtil) *DeployControllerBizUtilFactoryImpl

func (*DeployControllerBizUtilFactoryImpl) GetBizUtil

type DeployControllerBizUtilImpl

type DeployControllerBizUtilImpl struct {
	K8sUtil
	// contains filtered or unexported fields
}

func NewDeployControllerBizUtil

func NewDeployControllerBizUtil(component MilvusComponent, cli client.Client, k8sUtil K8sUtil) *DeployControllerBizUtilImpl

func (*DeployControllerBizUtilImpl) CreateDeploy

func (c *DeployControllerBizUtilImpl) CreateDeploy(ctx context.Context, mc v1beta1.Milvus, podTemplate *corev1.PodTemplateSpec, groupId int) error

func (*DeployControllerBizUtilImpl) GetDeploys

func (c *DeployControllerBizUtilImpl) GetDeploys(ctx context.Context, mc v1beta1.Milvus) (currentDeployment, lastDeployment *appsv1.Deployment, err error)

func (*DeployControllerBizUtilImpl) IsNewRollout

func (c *DeployControllerBizUtilImpl) IsNewRollout(ctx context.Context, currentDeployment *appsv1.Deployment, podTemplate *corev1.PodTemplateSpec) bool

func (*DeployControllerBizUtilImpl) LastRolloutFinished

func (c *DeployControllerBizUtilImpl) LastRolloutFinished(ctx context.Context, mc v1beta1.Milvus, currentDeployment, lastDeployment *appsv1.Deployment) (bool, error)

func (*DeployControllerBizUtilImpl) PrepareNewRollout

func (c *DeployControllerBizUtilImpl) PrepareNewRollout(ctx context.Context, mc v1beta1.Milvus, currentDeployment *appsv1.Deployment, podTemplate *corev1.PodTemplateSpec) error

func (*DeployControllerBizUtilImpl) RenderPodTemplateWithoutGroupID

func (c *DeployControllerBizUtilImpl) RenderPodTemplateWithoutGroupID(mc v1beta1.Milvus, currentTemplate *corev1.PodTemplateSpec, component MilvusComponent, forceUpdateAll bool) *corev1.PodTemplateSpec

func (*DeployControllerBizUtilImpl) RenewDeployAnnotation

func (c *DeployControllerBizUtilImpl) RenewDeployAnnotation(ctx context.Context, mc v1beta1.Milvus, currentDeploy *appsv1.Deployment) bool

RenewDeployAnnotation returns true if annotation is updated

func (*DeployControllerBizUtilImpl) ScaleDeployments

func (c *DeployControllerBizUtilImpl) ScaleDeployments(ctx context.Context, mc v1beta1.Milvus, currentDeployment, lastDeployment *appsv1.Deployment) error

ScaleDeployments scales to the current deployment; it assumes that both the current and the last deployment are not nil.

func (*DeployControllerBizUtilImpl) ShouldRollback

func (c *DeployControllerBizUtilImpl) ShouldRollback(ctx context.Context, currentDeploy, lastDeploy *appsv1.Deployment, podTemplate *corev1.PodTemplateSpec) bool

ShouldRollback returns if query node should rollback, it assumes currentDeploy not nil

type DeployControllerImpl

type DeployControllerImpl struct {
	// contains filtered or unexported fields
}

DeployControllerImpl is the implementation of DeployController

func NewDeployController

func NewDeployController(
	bizFactory DeployControllerBizFactory,
	oneDeployModeController DeployController,
	rollingModeStatusUpdater RollingModeStatusUpdater) *DeployControllerImpl

NewDeployController returns a DeployController

func (*DeployControllerImpl) Reconcile

func (c *DeployControllerImpl) Reconcile(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent) error

type DeployModeChanger

type DeployModeChanger interface {
	MarkDeployModeChanging(ctx context.Context, mc v1beta1.Milvus, changing bool) error
	ChangeToTwoDeployMode(ctx context.Context, mc v1beta1.Milvus) error
}

DeployModeChanger changes deploy mode

type DeployModeChangerFactory

type DeployModeChangerFactory interface {
	GetDeployModeChanger(component MilvusComponent) DeployModeChanger
}

type DeployModeChangerFactoryImpl

type DeployModeChangerFactoryImpl struct {
	// contains filtered or unexported fields
}

func NewDeployModeChangerFactory

func NewDeployModeChangerFactory(cli client.Client, util K8sUtil) *DeployModeChangerFactoryImpl

func (*DeployModeChangerFactoryImpl) GetDeployModeChanger

func (f *DeployModeChangerFactoryImpl) GetDeployModeChanger(component MilvusComponent) DeployModeChanger

type DeployModeChangerImpl

type DeployModeChangerImpl struct {
	// contains filtered or unexported fields
}

func NewDeployModeChanger

func NewDeployModeChanger(component MilvusComponent, cli client.Client, util K8sUtil) *DeployModeChangerImpl

func (*DeployModeChangerImpl) ChangeToTwoDeployMode

func (c *DeployModeChangerImpl) ChangeToTwoDeployMode(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployModeChangerImpl) MarkCurrentDeploy

func (c *DeployModeChangerImpl) MarkCurrentDeploy(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployModeChangerImpl) MarkDeployModeChanging

func (c *DeployModeChangerImpl) MarkDeployModeChanging(ctx context.Context, mc v1beta1.Milvus, changing bool) error

func (*DeployModeChangerImpl) RecoverDeploy

func (c *DeployModeChangerImpl) RecoverDeploy(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployModeChangerImpl) RecoverReplicaSets

func (c *DeployModeChangerImpl) RecoverReplicaSets(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployModeChangerImpl) SaveDeleteOldDeploy

func (c *DeployModeChangerImpl) SaveDeleteOldDeploy(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployModeChangerImpl) SaveDeleteOldReplicaSet

func (c *DeployModeChangerImpl) SaveDeleteOldReplicaSet(ctx context.Context, mc v1beta1.Milvus) error

func (*DeployModeChangerImpl) UpdateOldPodLabels

func (c *DeployModeChangerImpl) UpdateOldPodLabels(ctx context.Context, mc v1beta1.Milvus) error

type EndpointCheckCache

type EndpointCheckCache interface {
	// TryStartProbeFor reports whether a probe for the given endpoints may start
	// (presumably false while another probe for them is in flight — TODO confirm
	// against the implementation).
	TryStartProbeFor(endpoint []string) bool
	// EndProbeFor ends the probe for the given endpoints (presumably the
	// counterpart that releases what TryStartProbeFor acquired — TODO confirm).
	EndProbeFor(endpoint []string)
	// Get returns the cached condition for the given endpoints.
	// NOTE(review): the second result is named found here, while
	// EndpointCheckCacheImpl.Get names it isUpToDate — confirm which meaning holds.
	Get(endpoint []string) (condition *v1beta1.MilvusCondition, found bool)
	// Set stores condition for the given endpoints.
	Set(endpoints []string, condition *v1beta1.MilvusCondition)
}

EndpointCheckCache coordinates endpoint check to avoid duplicated check for same endpoint

func NewEndpointCheckCacheImpl

func NewEndpointCheckCacheImpl() EndpointCheckCache

type EndpointCheckCacheImpl

type EndpointCheckCacheImpl struct {
	// contains filtered or unexported fields
}

EndpointCheckCacheImpl implements EndpointCheckCache

func (EndpointCheckCacheImpl) EndProbeFor

func (e EndpointCheckCacheImpl) EndProbeFor(endpoints []string)

func (EndpointCheckCacheImpl) Get

func (e EndpointCheckCacheImpl) Get(endpoints []string) (condition *v1beta1.MilvusCondition, isUpToDate bool)

func (EndpointCheckCacheImpl) Set

func (e EndpointCheckCacheImpl) Set(endpoints []string, condition *v1beta1.MilvusCondition)

func (EndpointCheckCacheImpl) TryStartProbeFor

func (e EndpointCheckCacheImpl) TryStartProbeFor(endpoints []string) bool

TryStartProbeFor use an atomic int32 to lock the endpoint

type EtcdAuthConfig

type EtcdAuthConfig struct {
	Enabled  bool
	Username string
	Password string
}

type EtcdClient

type EtcdClient interface {
	Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)
	AlarmList(ctx context.Context) (*clientv3.AlarmResponse, error)
	Close() error
}

EtcdClient for mock

type EtcdConditionInfo

type EtcdConditionInfo struct {
	Endpoints []string
}

type EtcdEndPointHealth

type EtcdEndPointHealth struct {
	Ep     string `json:"endpoint"`
	Health bool   `json:"health"`
	Error  string `json:"error,omitempty"`
}

type Func

type Func = interface{}

Func any callable func

type Group

type Group struct {
	// contains filtered or unexported fields
}

func NewGroup

func NewGroup(ctx context.Context) (*Group, context.Context)

NewGroup creates a new group.

func (*Group) Go

func (g *Group) Go(f func() error)

Go runs the given func.

func (*Group) Wait

func (g *Group) Wait() error

Wait blocks until all the funcs started with Go have returned. If errors occurred, they are combined with ":" and returned.

type GroupRunner

type GroupRunner interface {
	// Run runs a group of funcs by same args, if any func fail, it should return err
	Run(funcs []Func, ctx context.Context, args ...interface{}) error
	// RunWithResult runs a group of funcs by same args, returns results with data & err for each func called
	RunWithResult(funcs []Func, ctx context.Context, args ...interface{}) []Result
	// RunDiffArgs runs a func by groups of args from @argsArray multiple times, if any failed, it should return err
	RunDiffArgs(f MilvusReconcileFunc, ctx context.Context, argsArray []*v1beta1.Milvus) error
}

GroupRunner runs a group of funcs in parallel.

type HelmReconciler

type HelmReconciler interface {
	NewHelmCfg(namespace string) *action.Configuration
	Reconcile(ctx context.Context, request helm.ChartRequest) error
	GetValues(namespace, release string) (map[string]interface{}, error)
}

HelmReconciler reconciles Helm releases

type ImageInfo

type ImageInfo struct {
	Image           string
	ImagePullPolicy corev1.PullPolicy
}

ImageInfo for image pulling

type K8sClient

type K8sClient interface {
	client.Client
}

K8sClient for mock

type K8sStatusClient

type K8sStatusClient interface {
	client.SubResourceWriter
}

K8sStatusClient for mock K8sClient.Status()

type K8sUtil

type K8sUtil interface {

	// CreateObject if not exist
	CreateObject(ctx context.Context, obj client.Object) error
	OrphanDelete(ctx context.Context, obj client.Object) error
	MarkMilvusComponentGroupId(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent, groupId int) error
	UpdateAndRequeue(ctx context.Context, obj client.Object) error

	// SaveObject in controllerrevision
	SaveObject(ctx context.Context, mc v1beta1.Milvus, name string, obj runtime.Object) error
	// GetObject from controllerrevision
	GetSavedObject(ctx context.Context, key client.ObjectKey, obj interface{}) error

	// read
	GetOldDeploy(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent) (*appsv1.Deployment, error)
	ListOldReplicaSets(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent) (appsv1.ReplicaSetList, error)
	ListOldPods(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent) ([]corev1.Pod, error)
	ListDeployPods(ctx context.Context, deploy *appsv1.Deployment, component MilvusComponent) ([]corev1.Pod, error)

	// logic
	// DeploymentIsStable returns whether deployment is stable
	// if deployment is not stable, return reason string
	DeploymentIsStable(deploy *appsv1.Deployment, allPods []corev1.Pod) (isStable bool, reason string)
}

type K8sUtilImpl

type K8sUtilImpl struct {
	// contains filtered or unexported fields
}

func NewK8sUtil

func NewK8sUtil(cli client.Client) *K8sUtilImpl

func (*K8sUtilImpl) CreateObject

func (c *K8sUtilImpl) CreateObject(ctx context.Context, obj client.Object) error

func (*K8sUtilImpl) DeploymentIsStable

func (c *K8sUtilImpl) DeploymentIsStable(deploy *appsv1.Deployment, allPods []corev1.Pod) (isStable bool, reason string)

func (*K8sUtilImpl) GetOldDeploy

func (c *K8sUtilImpl) GetOldDeploy(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent) (*appsv1.Deployment, error)

func (*K8sUtilImpl) GetSavedObject

func (c *K8sUtilImpl) GetSavedObject(ctx context.Context, key client.ObjectKey, obj interface{}) error

func (*K8sUtilImpl) ListDeployPods

func (c *K8sUtilImpl) ListDeployPods(ctx context.Context, deploy *appsv1.Deployment, component MilvusComponent) ([]corev1.Pod, error)

func (*K8sUtilImpl) ListOldPods

func (c *K8sUtilImpl) ListOldPods(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent) ([]corev1.Pod, error)

func (*K8sUtilImpl) ListOldReplicaSets

func (c *K8sUtilImpl) ListOldReplicaSets(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent) (appsv1.ReplicaSetList, error)

func (*K8sUtilImpl) MarkMilvusComponentGroupId

func (c *K8sUtilImpl) MarkMilvusComponentGroupId(ctx context.Context, mc v1beta1.Milvus, component MilvusComponent, groupId int) error

func (*K8sUtilImpl) OrphanDelete

func (c *K8sUtilImpl) OrphanDelete(ctx context.Context, obj client.Object) error

func (*K8sUtilImpl) SaveObject

func (c *K8sUtilImpl) SaveObject(ctx context.Context, mc v1beta1.Milvus, name string, obj runtime.Object) error

SaveObject saves the object in a controllerrevision.

func (*K8sUtilImpl) UpdateAndRequeue

func (c *K8sUtilImpl) UpdateAndRequeue(ctx context.Context, obj client.Object) error

type LocalHelmReconciler

type LocalHelmReconciler struct {
	// contains filtered or unexported fields
}

LocalHelmReconciler implements HelmReconciler at local

func MustNewLocalHelmReconciler

func MustNewLocalHelmReconciler(helmSettings *cli.EnvSettings, logger logr.Logger, mgr manager.Manager) *LocalHelmReconciler

func (*LocalHelmReconciler) GetValues

func (l *LocalHelmReconciler) GetValues(namespace, release string) (map[string]interface{}, error)

func (LocalHelmReconciler) NewHelmCfg

func (l LocalHelmReconciler) NewHelmCfg(namespace string) *action.Configuration

func (LocalHelmReconciler) Reconcile

func (l LocalHelmReconciler) Reconcile(ctx context.Context, request helm.ChartRequest) error

Reconcile reconciles Helm releases.

type Manager

type Manager interface {
	// Add adds a runnable to the Manager
	Add(manager.Runnable) error

	// AddHealthzCheck allows you to add a HealthzCheck to the Manager
	AddHealthzCheck(name string, check healthz.Checker) error

	// AddReadyzCheck allows you to add a ReadyzCheck to the Manager
	AddReadyzCheck(name string, check healthz.Checker) error

	// AddMetricsServerExtraHandler adds an extra handler to the Metrics server
	AddMetricsServerExtraHandler(path string, handler http.Handler) error

	// Start starts the Manager and waits for it to be stopped
	Start(ctx context.Context) error

	// GetConfig returns the rest.Config used by the Manager
	GetConfig() *rest.Config

	// GetScheme returns the scheme.Scheme used by the Manager
	GetScheme() *runtime.Scheme

	// GetClient returns the client.Client used by the Manager
	GetClient() client.Client

	// GetFieldIndexer returns the client.FieldIndexer used by the Manager
	GetFieldIndexer() client.FieldIndexer

	// GetCache returns the cache.Cache used by the Manager
	GetCache() cache.Cache

	// GetEventRecorderFor returns a new record.EventRecorder for the provided name
	GetEventRecorderFor(name string) record.EventRecorder

	// GetRESTMapper returns the meta.RESTMapper used by the Manager
	GetRESTMapper() meta.RESTMapper

	// GetAPIReader returns a client.Reader that will hit the API server
	GetAPIReader() client.Reader

	// GetWebhookServer returns the webhook server used by the Manager
	GetWebhookServer() webhook.Server

	// GetLogger returns the logger used by the Manager
	GetLogger() logr.Logger

	// GetControllerOptions returns the controller options
	GetControllerOptions() config.Controller

	// Elected returns a channel that is closed when this Manager is elected leader
	Elected() <-chan struct{}

	// GetHTTPClient returns the http.Client used by the Manager
	GetHTTPClient() *http.Client
}

Manager defines the interface for a controller-runtime manager

type MilvusComponent

type MilvusComponent struct {
	Name        string
	FieldName   string
	DefaultPort int32
}

MilvusComponent contains basic info of a milvus cluster component

func GetComponentsBySpec

func GetComponentsBySpec(spec v1beta1.MilvusSpec) []MilvusComponent

GetComponentsBySpec returns the components by the spec

func GetExpectedTwoDeployComponents

func GetExpectedTwoDeployComponents(spec v1beta1.MilvusSpec) []MilvusComponent

func (MilvusComponent) GetComponentPort

func (c MilvusComponent) GetComponentPort(spec v1beta1.MilvusSpec) int32

GetComponentPort returns the port of the component

func (MilvusComponent) GetComponentSpec

func (c MilvusComponent) GetComponentSpec(spec v1beta1.MilvusSpec) v1beta1.ComponentSpec

GetComponentSpec returns the component spec

func (MilvusComponent) GetContainerName

func (c MilvusComponent) GetContainerName() string

GetContainerName returns the name of the component container

func (MilvusComponent) GetDependencies

func (c MilvusComponent) GetDependencies(spec v1beta1.MilvusSpec) []MilvusComponent

func (MilvusComponent) GetDeploymentName

func (c MilvusComponent) GetDeploymentName(instance string) string

GetDeploymentName returns the name of the component deployment

func (MilvusComponent) GetDeploymentStrategy

func (c MilvusComponent) GetDeploymentStrategy(configs map[string]interface{}) appsv1.DeploymentStrategy

func (MilvusComponent) GetInitContainers

func (c MilvusComponent) GetInitContainers(spec v1beta1.MilvusSpec) []corev1.Container

GetInitContainers returns the component init containers

func (MilvusComponent) GetLeastReplicasRegardingHPA

func (c MilvusComponent) GetLeastReplicasRegardingHPA(spec v1beta1.MilvusSpec) int32

GetLeastReplicasRegardingHPA returns the least replicas for the component regarding HPA

func (MilvusComponent) GetMilvusReplicas

func (c MilvusComponent) GetMilvusReplicas(status *v1beta1.MilvusReplicas) int

GetMilvusReplicas returns the replicas of the component from the given status; input status should not be nil.

func (MilvusComponent) GetName

func (c MilvusComponent) GetName() string

GetName returns the name of the component.

func (MilvusComponent) GetPortName

func (c MilvusComponent) GetPortName() string

GetPortName returns the port name of the component container

func (MilvusComponent) GetReplicas

func (c MilvusComponent) GetReplicas(spec v1beta1.MilvusSpec) *int32

GetReplicas returns the replicas for the component

func (MilvusComponent) GetRestfulPort

func (c MilvusComponent) GetRestfulPort(spec v1beta1.MilvusSpec) int32

GetRestfulPort returns the restful port of the component.

func (MilvusComponent) GetRunCommands

func (c MilvusComponent) GetRunCommands() []string

GetRunCommands returns the run commands of the component.

func (MilvusComponent) GetServicePorts

func (c MilvusComponent) GetServicePorts(spec v1beta1.MilvusSpec) []corev1.ServicePort

GetServicePorts returns the ports of the component service

func (MilvusComponent) GetServiceType

func (c MilvusComponent) GetServiceType(spec v1beta1.MilvusSpec) corev1.ServiceType

GetServiceType returns the type of the component service

func (MilvusComponent) GetSideCars

func (c MilvusComponent) GetSideCars(spec v1beta1.MilvusSpec) []corev1.Container

GetSideCars returns the component sidecar containers

func (MilvusComponent) IsCoord

func (c MilvusComponent) IsCoord() bool

IsCoord returns whether the component is a coord, judging by its name.

func (MilvusComponent) IsImageUpdated

func (c MilvusComponent) IsImageUpdated(m *v1beta1.Milvus) bool

IsImageUpdated returns whether the image of the component is updated

func (MilvusComponent) IsNode

func (c MilvusComponent) IsNode() bool

IsNode returns whether the component is a node, judging by its name.

func (MilvusComponent) IsService

func (c MilvusComponent) IsService() bool

func (MilvusComponent) IsStandalone

func (c MilvusComponent) IsStandalone() bool

IsStandalone returns whether the component is the standalone component.

func (MilvusComponent) SetReplicas

func (c MilvusComponent) SetReplicas(spec v1beta1.MilvusSpec, replicas *int32) error

SetReplicas sets the replicas for the component.

func (MilvusComponent) SetStatusReplicas

func (c MilvusComponent) SetStatusReplicas(status *v1beta1.MilvusReplicas, replicas int)

SetStatusReplicas sets the replica status of the component; input status should not be nil.

type MilvusEndpointInfo

type MilvusEndpointInfo struct {
	Namespace   string
	Name        string
	ServiceType corev1.ServiceType
	Port        int32
}

MilvusEndpointInfo holds the info used to calculate the endpoint.

type MilvusHealthStatusInfo

type MilvusHealthStatusInfo struct {
	LastState  v1beta1.MilvusHealthStatus
	IsStopping bool
	IsHealthy  bool
}

func (MilvusHealthStatusInfo) GetMilvusHealthStatus

func (m MilvusHealthStatusInfo) GetMilvusHealthStatus() v1beta1.MilvusHealthStatus

type MilvusPredicate

type MilvusPredicate struct {
	predicate.Funcs
}

func (*MilvusPredicate) Create

func (*MilvusPredicate) Update

type MilvusReconcileFunc

type MilvusReconcileFunc func(context.Context, *v1beta1.Milvus) error

type MilvusReconciler

type MilvusReconciler struct {
	client.Client
	Scheme *runtime.Scheme
	// contains filtered or unexported fields
}

MilvusReconciler reconciles a Milvus object

func (*MilvusReconciler) Reconcile

func (r *MilvusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

Reconcile is part of the main kubernetes reconciliation loop which aims to move the current state of the cluster closer to the desired state.

For more details, check Reconcile and its Result here: - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile

func (*MilvusReconciler) ReconcileAll

func (r *MilvusReconciler) ReconcileAll(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcileComponentDeployment

func (r *MilvusReconciler) ReconcileComponentDeployment(
	ctx context.Context, mc v1beta1.Milvus, component MilvusComponent,
) error

func (*MilvusReconciler) ReconcileComponentService

func (r *MilvusReconciler) ReconcileComponentService(
	ctx context.Context, mc v1beta1.Milvus, component MilvusComponent,
) error

func (*MilvusReconciler) ReconcileConfigMaps

func (r *MilvusReconciler) ReconcileConfigMaps(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcileDeployments

func (r *MilvusReconciler) ReconcileDeployments(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcileEtcd

func (r *MilvusReconciler) ReconcileEtcd(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcileIngress

func (r *MilvusReconciler) ReconcileIngress(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcileKafka

func (r *MilvusReconciler) ReconcileKafka(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcileLegacyValues

func (r *MilvusReconciler) ReconcileLegacyValues(ctx context.Context, old, milvus *milvusv1beta1.Milvus) error

func (*MilvusReconciler) ReconcileMilvus

func (r *MilvusReconciler) ReconcileMilvus(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcileMinio

func (r *MilvusReconciler) ReconcileMinio(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcileMsgStream

func (r *MilvusReconciler) ReconcileMsgStream(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcilePVCs

func (r *MilvusReconciler) ReconcilePVCs(ctx context.Context, mil v1beta1.Milvus) error

func (*MilvusReconciler) ReconcilePodMonitor

func (r *MilvusReconciler) ReconcilePodMonitor(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcilePulsar

func (r *MilvusReconciler) ReconcilePulsar(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) ReconcileServices

func (r *MilvusReconciler) ReconcileServices(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) RemoveOldStandlone

func (r *MilvusReconciler) RemoveOldStandlone(ctx context.Context, mc v1beta1.Milvus) error

func (*MilvusReconciler) SetDefaultStatus

func (r *MilvusReconciler) SetDefaultStatus(ctx context.Context, mc *v1beta1.Milvus) error

SetDefaultStatus updates the status if the default is not set. It returns an error if the update failed (the signature returns only an error; it has no boolean "updated" result).

func (*MilvusReconciler) SetupWithManager

func (r *MilvusReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

func (*MilvusReconciler) VerifyCR

func (r *MilvusReconciler) VerifyCR(ctx context.Context, milvus *milvusv1beta1.Milvus) error

type MilvusStatusSyncer

type MilvusStatusSyncer struct {
	client.Client

	sync.Once
	// contains filtered or unexported fields
}

func NewMilvusStatusSyncer

func NewMilvusStatusSyncer(ctx context.Context, client client.Client, logger logr.Logger) *MilvusStatusSyncer

func (*MilvusStatusSyncer) GetEtcdCondition

func (r *MilvusStatusSyncer) GetEtcdCondition(ctx context.Context, mc v1beta1.Milvus) (v1beta1.MilvusCondition, error)

func (*MilvusStatusSyncer) GetMilvusEndpoint

func (r *MilvusStatusSyncer) GetMilvusEndpoint(ctx context.Context, mc v1beta1.Milvus) string

func (*MilvusStatusSyncer) GetMinioCondition

func (r *MilvusStatusSyncer) GetMinioCondition(
	ctx context.Context, mc v1beta1.Milvus) (v1beta1.MilvusCondition, error)

TODO: rename as GetStorageCondition

func (*MilvusStatusSyncer) GetMsgStreamCondition

func (r *MilvusStatusSyncer) GetMsgStreamCondition(
	ctx context.Context, mc v1beta1.Milvus) (v1beta1.MilvusCondition, error)

func (*MilvusStatusSyncer) RunIfNot

func (r *MilvusStatusSyncer) RunIfNot()

func (*MilvusStatusSyncer) UpdateIngressStatus

func (r *MilvusStatusSyncer) UpdateIngressStatus(ctx context.Context, mc *v1beta1.Milvus) error

func (*MilvusStatusSyncer) UpdateStatusForNewGeneration

func (r *MilvusStatusSyncer) UpdateStatusForNewGeneration(ctx context.Context, mc *v1beta1.Milvus, checkDependency bool) error

UpdateStatusForNewGeneration updates the status of the Milvus CR. If checkDependency is true, it also checks the dependency conditions.

func (*MilvusStatusSyncer) UpdateStatusRoutine

func (r *MilvusStatusSyncer) UpdateStatusRoutine(ctx context.Context, mc *v1beta1.Milvus) error

type MilvusStatusSyncerInterface

type MilvusStatusSyncerInterface interface {
	RunIfNot()
	UpdateStatusForNewGeneration(ctx context.Context, mc *v1beta1.Milvus, checkDependency bool) error
}

MilvusStatusSyncerInterface abstracts MilvusStatusSyncer

type MilvusUpgradeReconciler

type MilvusUpgradeReconciler struct {
	client.Client
	Scheme *runtime.Scheme
	// contains filtered or unexported fields
}

MilvusUpgradeReconciler reconciles a MilvusUpgrade object

func NewMilvusUpgradeReconciler

func NewMilvusUpgradeReconciler(client client.Client, scheme *runtime.Scheme) *MilvusUpgradeReconciler

NewMilvusUpgradeReconciler returns a new MilvusUpgradeReconciler

func (*MilvusUpgradeReconciler) BackupMeta

func (*MilvusUpgradeReconciler) DetermineCurrentState

func (r *MilvusUpgradeReconciler) DetermineCurrentState(ctx context.Context, upgrade *v1beta1.MilvusUpgrade) (nextState v1beta1.MilvusUpgradeState)

func (*MilvusUpgradeReconciler) HandleBakupMetaFailed

func (r *MilvusUpgradeReconciler) HandleBakupMetaFailed(ctx context.Context, upgrade *v1beta1.MilvusUpgrade, milvus *v1beta1.Milvus) (v1beta1.MilvusUpgradeState, error)

func (*MilvusUpgradeReconciler) HandleUpgradeFailed

func (r *MilvusUpgradeReconciler) HandleUpgradeFailed(ctx context.Context, upgrade *v1beta1.MilvusUpgrade, milvus *v1beta1.Milvus) (v1beta1.MilvusUpgradeState, error)

func (*MilvusUpgradeReconciler) OldVersionStopping

func (r *MilvusUpgradeReconciler) OldVersionStopping(ctx context.Context, upgrade *v1beta1.MilvusUpgrade, milvus *v1beta1.Milvus) (v1beta1.MilvusUpgradeState, error)

func (*MilvusUpgradeReconciler) Reconcile

func (r *MilvusUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

Reconcile is part of the main kubernetes reconciliation loop which aims to move the current state of the cluster closer to the desired state. TODO(user): Modify the Reconcile function to compare the state specified by the MilvusUpgrade object against the actual cluster state, and then perform operations to make the cluster state reflect the state specified by the user.

For more details, check Reconcile and its Result here: - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile

func (*MilvusUpgradeReconciler) RollbackNewVersionStopping

func (r *MilvusUpgradeReconciler) RollbackNewVersionStopping(ctx context.Context, upgrade *v1beta1.MilvusUpgrade, milvus *v1beta1.Milvus) (v1beta1.MilvusUpgradeState, error)

func (*MilvusUpgradeReconciler) RollbackOldVersionStarting

func (r *MilvusUpgradeReconciler) RollbackOldVersionStarting(ctx context.Context, upgrade *v1beta1.MilvusUpgrade, milvus *v1beta1.Milvus) (v1beta1.MilvusUpgradeState, error)

func (*MilvusUpgradeReconciler) RollbackRestoringOldMeta

func (r *MilvusUpgradeReconciler) RollbackRestoringOldMeta(ctx context.Context, upgrade *v1beta1.MilvusUpgrade, milvus *v1beta1.Milvus) (v1beta1.MilvusUpgradeState, error)

func (*MilvusUpgradeReconciler) RunStateMachine

func (r *MilvusUpgradeReconciler) RunStateMachine(ctx context.Context, upgrade *v1beta1.MilvusUpgrade) error

func (*MilvusUpgradeReconciler) SetupWithManager

func (r *MilvusUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

func (*MilvusUpgradeReconciler) StartingMilvusNewVersion

func (r *MilvusUpgradeReconciler) StartingMilvusNewVersion(ctx context.Context, upgrade *v1beta1.MilvusUpgrade, milvus *v1beta1.Milvus) (v1beta1.MilvusUpgradeState, error)

func (*MilvusUpgradeReconciler) UpdatingMeta

type MilvusUpgradeReconcilerCommonFunc

type MilvusUpgradeReconcilerCommonFunc func(ctx context.Context, upgrade *v1beta1.MilvusUpgrade, milvus *v1beta1.Milvus) (nextState v1beta1.MilvusUpgradeState, err error)

type NewEtcdClientFunc

type NewEtcdClientFunc func(cfg clientv3.Config) (EtcdClient, error)

type ParallelGroupRunner

type ParallelGroupRunner struct {
}

ParallelGroupRunner is a group runner that runs funcs in parallel.

func (ParallelGroupRunner) Run

func (ParallelGroupRunner) Run(funcs []Func, ctx context.Context, args ...interface{}) error

Run runs a group of funcs with the same args in parallel; if any func fails, it returns an error.

func (ParallelGroupRunner) RunDiffArgs

func (ParallelGroupRunner) RunDiffArgs(f MilvusReconcileFunc, ctx context.Context, argsArray []*v1beta1.Milvus) error

RunDiffArgs runs f multiple times, once for each group of args in argsArray; if any call fails, it returns an error.

func (ParallelGroupRunner) RunWithResult

func (ParallelGroupRunner) RunWithResult(funcs []Func, ctx context.Context, args ...interface{}) []Result

RunWithResult runs a group of funcs by same args, returns results with data & err for each func called

type Result

type Result struct {
	Data interface{}
	Err  error
}

Result contains data & err for a func's return

type RollingModeStatusUpdater

type RollingModeStatusUpdater interface {
	Update(ctx context.Context, mc *v1beta1.Milvus) error
}

func NewRollingModeStatusUpdater

func NewRollingModeStatusUpdater(cli client.Client, bizFactory DeployControllerBizFactory) RollingModeStatusUpdater

type RollingModeStatusUpdaterImpl

type RollingModeStatusUpdaterImpl struct {
	// contains filtered or unexported fields
}

func (*RollingModeStatusUpdaterImpl) Update

type StorageConditionInfo

type StorageConditionInfo struct {
	Namespace   string
	Bucket      string
	Storage     v1beta1.MilvusStorage
	UseSSL      bool
	UseIAM      bool
	IAMEndpoint string
	// StorageAccount of azure
	StorageAccount string
	UseVirtualHost bool
}

StorageConditionInfo is info for acquiring storage condition

type Values

type Values = map[string]interface{}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL