sharding

package
v1.14.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CleanupController

func CleanupController(testCtrl *TestShardingController)

CleanupController cleans up controller resources

func CreateTestNode

func CreateTestNode(name string, cpu string, isWarmup bool, labels map[string]string) *corev1.Node

CreateTestNode creates a test node with specified configuration

func CreateTestNodeWithPods

func CreateTestNodeWithPods(t *testing.T, controller *ShardingController, nodeName string, cpu string, isWarmup bool, pods []*corev1.Pod)

CreateTestNodeWithPods creates a test node with multiple pods

func CreateTestPod

func CreateTestPod(namespace, name, nodeName, cpu, memory string) *corev1.Pod

CreateTestPod creates a test pod with specified resource requests

func ForceSyncShards

func ForceSyncShards(t *testing.T, controller *ShardingController, timeout time.Duration)

ForceSyncShards forces a shard synchronization and waits for completion

func SetupPodsOnNode

func SetupPodsOnNode(t *testing.T, controller *ShardingController, nodeName string, pods []*corev1.Pod)

SetupPodsOnNode creates multiple pods on a specific node

func VerifyAssignment

func VerifyAssignment(t *testing.T, controller *ShardingController, schedulerName string, expectedNodes []string)

VerifyAssignment verifies assignment for a scheduler

func VerifyAssignmentUpdate

func VerifyAssignmentUpdate(t *testing.T, controller *ShardingController, schedulerName string, expectedNodesToAdd []string, expectedNodesToRemove []string)

func WaitForNodeMetricsUpdate

func WaitForNodeMetricsUpdate(t *testing.T, controller *ShardingController, nodeName string, timeout time.Duration)

WaitForNodeMetricsUpdate waits for node metrics to be updated

func WaitForQueueProcessing

func WaitForQueueProcessing(controller *ShardingController, timeout time.Duration) error

WaitForQueueProcessing waits for queue to be processed

Types

type AssignmentCache

// AssignmentCache stores the result of shard assignments with version control.
//
// Assignments is keyed by scheduler name. Version and Timestamp describe the
// cached result as a whole.
// NOTE(review): how Version is generated and what moment Timestamp records
// are not visible here — confirm against the code that fills the cache.
type AssignmentCache struct {
	Version     string
	Timestamp   time.Time
	Assignments map[string]*ShardAssignment // scheduler name -> assignment
}

AssignmentCache stores the result of shard assignments with version control

type AssignmentChangeEvent

// AssignmentChangeEvent represents a change in assignment.
//
// SchedulerName names the scheduler the event belongs to. Going by the field
// names, OldNodes/NewNodes are the node lists before and after the change and
// NodesToAdd/NodesToRemove are the difference between them.
// TODO(review): confirm that the producer always keeps the delta consistent
// with OldNodes and NewNodes; nothing in this type enforces it.
type AssignmentChangeEvent struct {
	SchedulerName string
	OldNodes      []string
	NewNodes      []string
	NodesToAdd    []string
	NodesToRemove []string
	Version       string
	Timestamp     time.Time
}

AssignmentChangeEvent represents a change in assignment

type AssignmentContext

// AssignmentContext contains context information for shard assignment.
//
// It bundles the candidate nodes, the current NodeShard objects, the scheduler
// configurations and a node-to-scheduler map.
// NOTE(review): the key of CurrentShards is not documented here (presumably
// the scheduler or shard name) — confirm against the code that builds it.
type AssignmentContext struct {
	AllNodes         []*corev1.Node
	CurrentShards    map[string]*shardv1alpha1.NodeShard
	SchedulerConfigs []SchedulerConfig
	AssignedNodes    map[string]string // node name -> scheduler name
	Timestamp        time.Time
}

AssignmentContext contains context information for shard assignment

type NodeEvent

// NodeEvent represents a node-related event.
//
// EventType and Source are plain strings. The values listed in the field
// comments are the documented ones, but the type itself does not restrict
// them.
type NodeEvent struct {
	EventType string // "pod-added", "pod-updated", "pod-deleted", "node-updated"
	NodeName  string
	Source    string // "pod-controller", "node-controller"
	Timestamp time.Time
}

NodeEvent represents a node-related event

type NodeMetrics

// NodeMetrics contains comprehensive metrics for a node.
//
// The fields fall into three groups: identity and freshness (NodeName,
// ResourceVersion, LastUpdated), resource quantities and utilization, and
// node characteristics. Utilization values are fractions in the range
// 0.0 to 1.0, as stated on the fields.
// NOTE(review): how the utilization is computed is not shown here — confirm
// before relying on it.
type NodeMetrics struct {
	NodeName        string
	ResourceVersion string
	LastUpdated     time.Time

	// Resource capacity and allocatable
	CPUCapacity       resource.Quantity
	CPUAllocatable    resource.Quantity
	MemoryCapacity    resource.Quantity
	MemoryAllocatable resource.Quantity

	// Resource utilization
	CPUUtilization    float64 // 0.0 to 1.0
	MemoryUtilization float64 // 0.0 to 1.0

	// Node characteristics
	IsWarmupNode bool
	PodCount     int
	Labels       map[string]string
	Annotations  map[string]string
}

NodeMetrics contains comprehensive metrics for a node

type NodeMetricsProvider

// NodeMetricsProvider provides access to node metrics.
//
// It offers a per-node lookup, a lookup of all nodes keyed by string, and an
// update for a single node. *ShardingController declares methods with these
// three signatures, and NewShardingManager accepts a NodeMetricsProvider.
// NOTE(review): whether GetNodeMetrics returns nil for an unknown node, and
// whether the returned values are copies, is not specified here — confirm.
type NodeMetricsProvider interface {
	GetNodeMetrics(nodeName string) *NodeMetrics
	GetAllNodeMetrics() map[string]*NodeMetrics
	UpdateNodeMetrics(nodeName string, metrics *NodeMetrics)
}

NodeMetricsProvider provides access to node metrics

type NodeResourceInfo

// NodeResourceInfo contains resource utilization information for a node.
//
// It carries the same fields as NodeMetrics except ResourceVersion and
// LastUpdated. Utilization values are fractions in the range 0.0 to 1.0, as
// stated on the fields.
type NodeResourceInfo struct {
	NodeName          string
	CPUAllocatable    resource.Quantity
	CPUCapacity       resource.Quantity
	MemoryAllocatable resource.Quantity
	MemoryCapacity    resource.Quantity
	CPUUtilization    float64 // 0.0 to 1.0
	MemoryUtilization float64 // 0.0 to 1.0
	IsWarmupNode      bool
	PodCount          int
	Labels            map[string]string
	Annotations       map[string]string
}

NodeResourceInfo contains resource utilization information for a node

type SchedulerConfig

// SchedulerConfig defines the configuration for a scheduler.
//
// It pairs a scheduler's name and type with the ShardStrategy that constrains
// its node assignment. Type is a plain string; the documented values are
// "volcano" and "agent".
type SchedulerConfig struct {
	Name          string
	Type          string // "volcano" or "agent"
	ShardStrategy ShardStrategy
}

SchedulerConfig defines the configuration for a scheduler

type SchedulerConfigSpec

// SchedulerConfigSpec is a flat description of one scheduler's configuration.
//
// Its fields correspond to SchedulerConfig with the nested ShardStrategy
// flattened into CPUUtilizationMin/Max, PreferWarmupNodes and MinNodes/MaxNodes.
// Values of this type appear in ShardingControllerOptions.SchedulerConfigs,
// TestControllerOption.SchedulerConfigs and InitializeWithConfigs.
// NOTE(review): the conversion to SchedulerConfig is not shown here — confirm
// that the fields map one-to-one.
type SchedulerConfigSpec struct {
	Name              string  // Scheduler name
	Type              string  // Workload type
	CPUUtilizationMin float64 // Minimum CPU utilization threshold
	CPUUtilizationMax float64 // Maximum CPU utilization threshold
	PreferWarmupNodes bool    // Whether to prefer warmup nodes
	MinNodes          int     // Minimum number of nodes
	MaxNodes          int     // Maximum number of nodes
}

type ShardAssignment

// ShardAssignment represents assignment for a single scheduler.
//
// NodesDesired lists node names for the scheduler named by SchedulerName.
// StrategyUsed, Version and Reason are free-form strings.
// NOTE(review): the format of these strings is not documented here — confirm
// against CalculateShardAssignments before parsing them.
type ShardAssignment struct {
	SchedulerName string
	NodesDesired  []string
	StrategyUsed  string
	Version       string
	Reason        string
}

ShardAssignment represents assignment for a single scheduler

type ShardStrategy

// ShardStrategy defines hard boundaries for node assignment.
//
// It combines an inclusive CPU utilization range, a preference flag for warmup
// nodes, and lower and upper bounds on the node count.
// NOTE(review): the meaning of a zero MinNodes or MaxNodes (unset versus a
// real limit of zero) is not documented here — confirm before relying on it.
type ShardStrategy struct {
	// CPUUtilizationRange specifies inclusive range [min, max] for CPU utilization
	CPUUtilizationRange struct {
		Min float64
		Max float64
	}

	// PreferWarmupNodes indicates preference for warmup nodes
	PreferWarmupNodes bool

	// MinNodes and MaxNodes define node count constraints
	MinNodes int
	MaxNodes int
}

ShardStrategy defines hard boundaries for node assignment

type ShardingController

// ShardingController implements the framework.Controller interface.
//
// Its exported methods also include GetNodeMetrics, GetAllNodeMetrics and
// UpdateNodeMetrics, the three signatures declared by NodeMetricsProvider.
type ShardingController struct {
	// contains filtered or unexported fields
}

ShardingController implements the framework.Controller interface

func (*ShardingController) GetAllNodeMetrics

func (sc *ShardingController) GetAllNodeMetrics() map[string]*NodeMetrics

func (*ShardingController) GetNodeMetrics

func (sc *ShardingController) GetNodeMetrics(nodeName string) *NodeMetrics

GetNodeMetrics implements the NodeMetricsProvider interface.

func (*ShardingController) Initialize

func (sc *ShardingController) Initialize(opt *framework.ControllerOption) error

Initialize initializes the controller

func (*ShardingController) InitializeWithConfigs

func (sc *ShardingController) InitializeWithConfigs(opt *framework.ControllerOption, configSpecs []SchedulerConfigSpec) error

InitializeWithConfigs initializes the controller with specific scheduler configs

func (*ShardingController) Name

func (sc *ShardingController) Name() string

Name returns the name of the controller.

func (*ShardingController) Run

func (sc *ShardingController) Run(stopCh <-chan struct{})

Run starts the controller

func (*ShardingController) UpdateNodeMetrics

func (sc *ShardingController) UpdateNodeMetrics(nodeName string, metrics *NodeMetrics)

type ShardingControllerOptions

// ShardingControllerOptions holds the options of the sharding controller.
//
// SchedulerConfigsRaw holds raw config strings, which ParseConfig parses into
// scheduler configs (presumably stored in SchedulerConfigs — confirm).
// ShardSyncPeriod is a duration and EnableNodeEventTrigger is an on/off switch.
// NOTE(review): the raw string format and the default values are not visible
// here — see AddFlags and NewShardingControllerOptions.
type ShardingControllerOptions struct {
	SchedulerConfigsRaw    []string
	SchedulerConfigs       []SchedulerConfigSpec
	ShardSyncPeriod        time.Duration
	EnableNodeEventTrigger bool
}

func NewShardingControllerOptions

func NewShardingControllerOptions() ShardingControllerOptions

func (*ShardingControllerOptions) AddFlags

func (opts *ShardingControllerOptions) AddFlags(fs *pflag.FlagSet)

AddFlags adds flags to the flag set using pflag pattern

func (*ShardingControllerOptions) ParseConfig

func (opts *ShardingControllerOptions) ParseConfig() error

ParseConfig parses the raw config strings into scheduler configs

type ShardingManager

// ShardingManager calculates shard assignments.
//
// Create one with NewShardingManager, which takes the scheduler configs and a
// NodeMetricsProvider. CalculateShardAssignments returns the assignments as a
// map of *ShardAssignment keyed by string.
type ShardingManager struct {
	// contains filtered or unexported fields
}

ShardingManager calculates shard assignments

func NewShardingManager

func NewShardingManager(schedulerConfigs []SchedulerConfig, nodeMetricsProvider NodeMetricsProvider) *ShardingManager

NewShardingManager creates a new sharding manager

func (*ShardingManager) CalculateShardAssignments

func (sm *ShardingManager) CalculateShardAssignments(
	nodes []*corev1.Node,
	currentShards []*shardv1alpha1.NodeShard,
) (map[string]*ShardAssignment, error)

CalculateShardAssignments calculates shard assignments for all schedulers

type TestControllerOption

// TestControllerOption contains options for test controller.
//
// InitialObjects is a list of runtime.Object values (presumably used to seed
// the fake clients — confirm), SchedulerConfigs lists the scheduler specs to
// use, and ShardSyncPeriod is a duration.
type TestControllerOption struct {
	InitialObjects   []runtime.Object
	SchedulerConfigs []SchedulerConfigSpec
	ShardSyncPeriod  time.Duration
	StopCh           chan struct{} // stop channel
}

TestControllerOption contains options for test controller

type TestShardingController

// TestShardingController is a test wrapper for ShardingController.
//
// It pairs the controller under test with a stop channel. CleanupController
// takes a *TestShardingController.
// NOTE(review): whether cleanup closes StopCh is not shown here — confirm
// before closing it yourself.
type TestShardingController struct {
	Controller *ShardingController
	StopCh     chan struct{}
}

TestShardingController is a test wrapper for ShardingController

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL