rebalance

package
v1.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var EnsureTrailingDNsAreDrainedOrBlock = polardbxv1reconcile.NewStepBinder(
	"EnsureTrailingDNsAreDrainedOrRestartRebalance",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {

		taskCm, err := rc.GetPolarDBXConfigMap(convention.ConfigMapTypeTask)
		if err != nil {
			return flow.Error(err, "Unable to get config map for task.")
		}

		contextAccess := task.NewContextAccess(taskCm, "rebalance")
		rebalanceTask := &DataRebalanceTask{}
		ok, err := contextAccess.Read(rebalanceTask)
		if err != nil {
			return flow.Error(err, "Unable to read rebalance task context.")
		}
		if !ok {
			return flow.Error(errors.New("no rebalance task context found"), "Unable to find rebalance task context.")
		}

		drained, err := IsTrailingDNsDrained(rc, rebalanceTask)
		if err != nil {
			return flow.Error(err, "Unable to determine if trailing DNs are drained.")
		}

		polardbx := rc.MustGetPolarDBX()
		cdcNodeSpec := polardbx.Status.SpecSnapshot.Topology.Nodes.CDC
		if featuregate.WaitDrainedNodeToBeOffline.Enabled() ||
			(cdcNodeSpec != nil && cdcNodeSpec.Replicas > 0) {
			offline, err := rebalanceTask.areScaleInDrainedNodesOffline(rc)
			if err != nil {
				return flow.Error(err, "Unable to determine offline status from GMS.")
			}
			if !offline {
				return flow.RetryAfter(20*time.Second, "Block until trailing DNs are marked offline.")
			}
		}

		if drained {
			return flow.Pass()
		} else {
			polardbx := rc.MustGetPolarDBX()
			polardbx.Status.StatusForPrint.RebalanceProcess = "stuck"

			return flow.Wait("Trailing DNs are not drained, must be verified manually.")
		}
	},
)
View Source
var PrepareRebalanceTaskContext = polardbxv1reconcile.NewStepBinder("PrepareRebalanceTaskContext",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {

		taskCm, err := rc.GetPolarDBXConfigMap(convention.ConfigMapTypeTask)
		if err != nil {
			return flow.Error(err, "Unable to get config map for task.")
		}

		contextAccess := task.NewContextAccess(taskCm, "rebalance")
		rebalanceTask := &DataRebalanceTask{}
		found, err := contextAccess.Read(rebalanceTask)
		if err != nil {
			return flow.Error(err, "Unable to read rebalance task context.")
		}

		if found {
			return flow.Pass()
		}

		polardbx := rc.MustGetPolarDBX()
		toReplicas := int(polardbx.Status.SpecSnapshot.Topology.Nodes.DN.Replicas)

		gmsMgr, err := rc.GetPolarDBXGMSManager()
		if err != nil {
			return flow.Error(err, "Unable to get manager of GMS.")
		}
		storageNodes, err := gmsMgr.ListStorageNodes(gms.StorageKindMaster)
		if err != nil {
			return flow.Error(err, "Unable to list storages of DNs.")
		}

		fromReplicas := len(storageNodes)

		rebalanceTask = &DataRebalanceTask{
			From: fromReplicas,
			To:   toReplicas,
		}

		err = contextAccess.Write(rebalanceTask)
		if err != nil {
			return flow.Error(err, "Unable to write rebalance task into config map.")
		}

		err = rc.Client().Update(rc.Context(), taskCm)
		if err != nil {
			return flow.Error(err, "Unable to update task config map.")
		}

		return flow.Continue("Rebalance task context prepared.")
	},
)
View Source
var ResetRebalanceTask = polardbxv1reconcile.NewStepBinder("ResetRebalanceTask",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		polardbx := rc.MustGetPolarDBX()
		polardbx.Status.StatusForPrint.RebalanceProcess = ""

		taskCm, err := rc.GetPolarDBXConfigMap(convention.ConfigMapTypeTask)
		if err != nil {
			return flow.Error(err, "Unable to get config map for task.")
		}

		contextAccess := task.NewContextAccess(taskCm, "rebalance")
		ok := contextAccess.Clear()

		if ok {
			err = rc.Client().Update(rc.Context(), taskCm)
			if err != nil {
				return flow.Error(err, "Unable to update task config map.")
			}
		}

		return flow.Pass()
	},
)
View Source
var StartRebalanceTask = polardbxv1reconcile.NewStepBinder("StartRebalanceTask",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {

		taskCm, err := rc.GetPolarDBXConfigMap(convention.ConfigMapTypeTask)
		if err != nil {
			return flow.Error(err, "Unable to get config map for task.")
		}

		contextAccess := task.NewContextAccess(taskCm, "rebalance")
		rebalanceTask := &DataRebalanceTask{}
		ok, err := contextAccess.Read(rebalanceTask)
		if err != nil {
			return flow.Error(err, "Unable to read rebalance task context.")
		}
		if !ok {
			return flow.Error(errors.New("no rebalance task context found"), "Unable to find rebalance task context.")
		}

		if rebalanceTask.Skip() || rebalanceTask.Started() {
			return flow.Pass()
		}

		planId, err := rebalanceTask.Start(rc)
		if err != nil {
			if err == group.ErrAlreadyInRebalance {
				flow.Logger().Info("Already in rebalance.")
			} else {
				return flow.Error(err, "Unable to start rebalance task.")
			}
		}

		flow.Logger().Info("Rebalance actions started.", "rebalance-plan", planId)

		rebalanceTask.PlanId = &planId

		err = contextAccess.Write(rebalanceTask)
		if err != nil {
			return flow.Error(err, "Unable to write rebalance task into config map.")
		}

		err = rc.Client().Update(rc.Context(), taskCm)
		if err != nil {
			return flow.Error(err, "Unable to update task config map.")
		}

		return flow.Pass()
	},
)

Functions

func IsTrailingDNsDrained

func IsTrailingDNsDrained(rc *polardbxv1reconcile.Context, rebalanceTask *DataRebalanceTask) (bool, error)

func WatchRebalanceTaskAntUpdateProgress

func WatchRebalanceTaskAntUpdateProgress(interval time.Duration) control.BindFunc

Types

type DataRebalanceTask

type DataRebalanceTask struct {
	From   int     `json:"from,omitempty"`
	To     int     `json:"to,omitempty"`
	PlanId *string `json:"plan_id,omitempty"`
}

func (*DataRebalanceTask) IsReady

func (*DataRebalanceTask) Progress

func (*DataRebalanceTask) Skip

func (t *DataRebalanceTask) Skip() bool

func (*DataRebalanceTask) Start

func (*DataRebalanceTask) Started

func (t *DataRebalanceTask) Started() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL