server

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2023 License: Apache-2.0 Imports: 56 Imported by: 1

Documentation

Overview

Package server provides a set of struct definitions for resource groups that other packages can import.

Index

Constants

This section is empty.

Variables

View Source
// SetUpRestHandler is a replaceable hook for wiring up the REST service.
// The default implementation ignores srv and hands back a dummyRestService
// together with an empty apiutil.APIServiceGroup.
var SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) {
	handler := dummyRestService{}
	group := apiutil.APIServiceGroup{}
	return handler, group
}

SetUpRestHandler is a hook that sets up the REST service.

Functions

func CreateServerWrapper

func CreateServerWrapper(cmd *cobra.Command, args []string)

CreateServerWrapper encapsulates the configuration/log/metrics initialization and creates the server.

func NewService

NewService creates a new resource manager service.

Types

type Config

// Config is the configuration for the resource manager.
//
// Most fields carry both a toml and a json tag using the same kebab-case key.
// Logger and LogProps have no struct tags at all.
// NOTE(review): EnableGRPCGateway carries only a json tag, unlike the
// neighbouring fields which also have a toml tag — confirm this is intentional.
type Config struct {
	BackendEndpoints    string `toml:"backend-endpoints" json:"backend-endpoints"`
	ListenAddr          string `toml:"listen-addr" json:"listen-addr"`
	AdvertiseListenAddr string `toml:"advertise-listen-addr" json:"advertise-listen-addr"`
	Name                string `toml:"name" json:"name"`
	DataDir             string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring
	EnableGRPCGateway   bool   `json:"enable-grpc-gateway"`      // TODO: use it

	Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

	// Log-related configuration.
	Log      log.Config `toml:"log" json:"log"`
	Logger   *zap.Logger
	LogProps *log.ZapProperties

	Security configutil.SecurityConfig `toml:"security" json:"security"`

	// LeaderLease defines the time within which a Resource Manager primary/leader must
	// update its TTL in etcd; otherwise etcd will expire the leader key and other servers
	// can campaign for primary/leader again. etcd only supports TTLs in seconds, so this
	// value is in seconds too.
	LeaderLease int64 `toml:"lease" json:"lease"`

	Controller ControllerConfig `toml:"controller" json:"controller"`
}

Config is the configuration for the resource manager.

func GenerateConfig

func GenerateConfig(c *Config) (*Config, error)

GenerateConfig generates a new config with the given options.

func NewConfig

func NewConfig() *Config

NewConfig creates a new config.

func (*Config) Adjust

func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error

Adjust is used to adjust the resource manager configurations.

func (*Config) GetTLSConfig

func (c *Config) GetTLSConfig() *grpcutil.TLSConfig

GetTLSConfig returns the TLS config.

func (*Config) Parse

func (c *Config) Parse(flagSet *pflag.FlagSet) error

Parse parses flag definitions from the argument list.

func (*Config) Validate

func (c *Config) Validate() error

Validate is used to validate if some configurations are right.

type ControllerConfig

// ControllerConfig is the configuration of the resource manager controller,
// which includes some options needed by the client.
type ControllerConfig struct {
	// DegradedModeWaitDuration relates to the resource control client's degraded mode
	// when the server is disconnected.
	// NOTE(review): the previous comment described an `EnableDegradedMode` switch, which
	// does not match this field; presumably this is how long the client waits before
	// entering degraded mode — TODO confirm.
	DegradedModeWaitDuration typeutil.Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

	// RequestUnit is the configuration that determines the coefficients of the RRU and WRU cost.
	// This configuration should be modified carefully.
	RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
}

ControllerConfig is the configuration of the resource manager controller, which includes some options needed by the client.

func (*ControllerConfig) Adjust

func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData)

Adjust adjusts the configuration and initializes it with the default value if necessary.

type GroupStates

// GroupStates is the token set of a resource group. Every field is an optional
// pointer and is omitted from JSON when nil.
type GroupStates struct {
	// RU is the token bucket state for RU tokens.
	RU *GroupTokenBucketState `json:"r_u,omitempty"`
	// CPU, IORead and IOWrite are the token bucket states for raw resource tokens.
	CPU     *GroupTokenBucketState `json:"cpu,omitempty"`
	IORead  *GroupTokenBucketState `json:"io_read,omitempty"`
	IOWrite *GroupTokenBucketState `json:"io_write,omitempty"`
}

GroupStates is the tokens set of a resource group.

type GroupTokenBucket

// GroupTokenBucket is a token bucket for a resource group. It pairs the
// configured limits (Settings) with the embedded running state.
type GroupTokenBucket struct {
	// Settings is the setting of TokenBucket.
	// BurstLimit (written b below; r is the fill rate) is used as follows:
	//   - If b == 0, the limiter has unlimited capacity. This is the default in the
	//     resource controller (burst with a rate within an unlimited capacity).
	//   - If b < 0, the limiter has unlimited capacity and the fill rate r is ignored;
	//     it can be seen as r == Inf (burst within an unlimited capacity).
	//   - If b > 0, the limiter has limited capacity.
	// MaxTokens limits the number of tokens that can be accumulated.
	//
	// The embedded GroupTokenBucketState carries an explicit JSON name, so under the
	// standard encoding/json rules it is encoded as a field named "state" rather than
	// having its fields flattened into this object.
	Settings              *rmpb.TokenLimitSettings `json:"settings,omitempty"`
	GroupTokenBucketState `json:"state,omitempty"`
}

GroupTokenBucket is a token bucket for a resource group. Consumption is currently not saved in `GroupTokenBucket`; it is only recorded as statistics in Prometheus.

func NewGroupTokenBucket

func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) *GroupTokenBucket

NewGroupTokenBucket returns a new GroupTokenBucket

func (*GroupTokenBucket) GetTokenBucket

func (gtb *GroupTokenBucket) GetTokenBucket() *rmpb.TokenBucket

GetTokenBucket returns the grpc protoc struct of GroupTokenBucket.

type GroupTokenBucketState

// GroupTokenBucketState is the running state of TokenBucket.
//
// With the standard encoding/json encoder, Tokens and LastUpdate are omitted
// when zero/nil (omitempty), while Initialized is always emitted.
type GroupTokenBucketState struct {
	Tokens float64 `json:"tokens,omitempty"`

	LastUpdate  *time.Time `json:"last_update,omitempty"`
	Initialized bool       `json:"initialized"`
	// contains filtered or unexported fields
}

GroupTokenBucketState is the running state of TokenBucket.

func (*GroupTokenBucketState) Clone

Clone returns the copy of GroupTokenBucketState

type Manager

// Manager is the manager of resource groups.
// It embeds sync.RWMutex, so the lock methods are promoted onto Manager itself.
type Manager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Manager is the manager of resource group.

func NewManager

func NewManager[T ResourceManagerConfigProvider](srv bs.Server) *Manager

NewManager returns a new manager based on the given server, which should implement the `ResourceManagerConfigProvider` interface.

func (*Manager) AddResourceGroup

func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error

AddResourceGroup puts a resource group.

func (*Manager) DeleteResourceGroup

func (m *Manager) DeleteResourceGroup(name string) error

DeleteResourceGroup deletes a resource group.

func (*Manager) GetBasicServer

func (m *Manager) GetBasicServer() bs.Server

GetBasicServer returns the basic server.

func (*Manager) GetMutableResourceGroup

func (m *Manager) GetMutableResourceGroup(name string) *ResourceGroup

GetMutableResourceGroup returns a mutable resource group.

func (*Manager) GetResourceGroup

func (m *Manager) GetResourceGroup(name string) *ResourceGroup

GetResourceGroup returns a copy of a resource group.

func (*Manager) GetResourceGroupList

func (m *Manager) GetResourceGroupList() []*ResourceGroup

GetResourceGroupList returns copies of resource group list.

func (*Manager) Init

func (m *Manager) Init(ctx context.Context)

Init initializes the resource group manager.

func (*Manager) ModifyResourceGroup

func (m *Manager) ModifyResourceGroup(group *rmpb.ResourceGroup) error

ModifyResourceGroup modifies an existing resource group.

type RequestUnitConfig

// RequestUnitConfig is the configuration of the request units, which determines
// the coefficients of the RRU and WRU cost.
//
// NOTE(review): CPUMsCost is serialized under the key "read-cpu-ms-cost" even though
// the field name is not read-specific — confirm whether that is intentional before
// touching it, since changing the key would break existing configuration files.
type RequestUnitConfig struct {
	// ReadBaseCost is the base cost for a read request. No matter how many bytes are
	// read/written or how much CPU time a request takes, this cost is inevitable.
	ReadBaseCost float64 `toml:"read-base-cost" json:"read-base-cost"`
	// ReadCostPerByte is the cost for each byte read. It's 1 RU = 64 KiB by default.
	ReadCostPerByte float64 `toml:"read-cost-per-byte" json:"read-cost-per-byte"`
	// WriteBaseCost is the base cost for a write request. No matter how many bytes are
	// read/written or how much CPU time a request takes, this cost is inevitable.
	WriteBaseCost float64 `toml:"write-base-cost" json:"write-base-cost"`
	// WriteCostPerByte is the cost for each byte written. It's 1 RU = 1 KiB by default.
	WriteCostPerByte float64 `toml:"write-cost-per-byte" json:"write-cost-per-byte"`
	// CPUMsCost is the cost for each millisecond of CPU time taken.
	// It's 1 RU = 3 milliseconds by default.
	CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"`
}

RequestUnitConfig is the configuration of the request units, which determines the coefficients of the RRU and WRU cost.

func (*RequestUnitConfig) Adjust

func (ruc *RequestUnitConfig) Adjust()

Adjust adjusts the configuration and initializes it with the default value if necessary.

type RequestUnitSettings

// RequestUnitSettings is the definition of the RU settings.
type RequestUnitSettings struct {
	// RU is the token bucket for request units; it is omitted from JSON when nil.
	RU *GroupTokenBucket `json:"r_u,omitempty"`
}

RequestUnitSettings is the definition of the RU settings.

func NewRequestUnitSettings

func NewRequestUnitSettings(tokenBucket *rmpb.TokenBucket) *RequestUnitSettings

NewRequestUnitSettings creates a new RequestUnitSettings with the given token bucket.

type ResourceGroup

// ResourceGroup is the definition of a resource group, for the REST API.
//
// It embeds sync.RWMutex, which promotes the lock methods onto the type. Because
// the struct contains a lock, handle it by pointer rather than copying the value.
type ResourceGroup struct {
	sync.RWMutex
	Name string         `json:"name"`
	Mode rmpb.GroupMode `json:"mode"`
	// RUSettings holds the RU settings; it is omitted from JSON when nil.
	RUSettings *RequestUnitSettings `json:"r_u_settings,omitempty"`
	Priority   uint32               `json:"priority"`
}

ResourceGroup is the definition of a resource group, for REST API.

func FromProtoResourceGroup

func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup

FromProtoResourceGroup converts a rmpb.ResourceGroup to a ResourceGroup.

func (*ResourceGroup) Copy

func (rg *ResourceGroup) Copy() *ResourceGroup

Copy copies the resource group.

func (*ResourceGroup) GetGroupStates

func (rg *ResourceGroup) GetGroupStates() *GroupStates

GetGroupStates gets the token set of ResourceGroup.

func (*ResourceGroup) IntoProtoResourceGroup

func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup

IntoProtoResourceGroup converts a ResourceGroup to a rmpb.ResourceGroup.

func (*ResourceGroup) PatchSettings

func (rg *ResourceGroup) PatchSettings(metaGroup *rmpb.ResourceGroup) error

PatchSettings patches the resource group settings. It is only used to patch the resource group when updating. Note: the tokens value is a delta to be applied, not an absolute value.

func (*ResourceGroup) RequestRU

func (rg *ResourceGroup) RequestRU(
	now time.Time,
	neededTokens float64,
	targetPeriodMs, clientUniqueID uint64,
) *rmpb.GrantedRUTokenBucket

RequestRU requests the RU of the resource group.

func (*ResourceGroup) SetStatesIntoResourceGroup

func (rg *ResourceGroup) SetStatesIntoResourceGroup(states *GroupStates)

SetStatesIntoResourceGroup updates the state of resource group.

func (*ResourceGroup) String

func (rg *ResourceGroup) String() string

type ResourceManagerConfigProvider

// ResourceManagerConfigProvider is used to get the resource manager config from the
// given `bs.server` without modifying its interface.
type ResourceManagerConfigProvider interface {
	// GetControllerConfig returns the controller config.
	GetControllerConfig() *ControllerConfig
}

ResourceManagerConfigProvider is used to get resource manager config from the given `bs.server` without modifying its interface.

type Server

// Server is the resource manager server, and it implements bs.Server.
type Server struct {
	// contains filtered or unexported fields
}

Server is the resource manager server, and it implements bs.Server.

func NewServer

func NewServer(ctx context.Context, cfg *Config) *Server

NewServer creates a new resource manager server.

func NewTestServer

func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*Server, testutil.CleanupFunc, error)

NewTestServer creates a resource manager server for testing.

func (*Server) AddServiceReadyCallback

func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context))

AddServiceReadyCallback adds callbacks for when the server becomes the leader (if there is an embedded etcd) or the primary (otherwise).

func (*Server) AddStartCallback

func (s *Server) AddStartCallback(callbacks ...func())

AddStartCallback adds a callback in the startServer phase.

func (*Server) Close

func (s *Server) Close()

Close closes the server.

func (*Server) Context

func (s *Server) Context() context.Context

Context returns the context.

func (*Server) GetAddr

func (s *Server) GetAddr() string

GetAddr returns the server address.

func (*Server) GetClient

func (s *Server) GetClient() *clientv3.Client

GetClient returns builtin etcd client.

func (*Server) GetControllerConfig

func (s *Server) GetControllerConfig() *ControllerConfig

GetControllerConfig returns the controller config.

func (*Server) GetHTTPClient

func (s *Server) GetHTTPClient() *http.Client

GetHTTPClient returns builtin http client.

func (*Server) GetLeaderListenUrls

func (s *Server) GetLeaderListenUrls() []string

GetLeaderListenUrls gets service endpoints from the leader in election group.

func (*Server) IsClosed

func (s *Server) IsClosed() bool

IsClosed checks if the server loop is closed

func (*Server) IsServing

func (s *Server) IsServing() bool

IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.

func (*Server) Name

func (s *Server) Name() string

Name returns the unique etcd name for this server in etcd cluster.

func (*Server) Run

func (s *Server) Run() (err error)

Run runs the Resource Manager server.

type Service

// Service is the gRPC service for the resource manager.
type Service struct {
	// contains filtered or unexported fields
}

Service is the gRPC service for resource manager.

func (*Service) AcquireTokenBuckets

func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBucketsServer) error

AcquireTokenBuckets implements ResourceManagerServer.AcquireTokenBuckets.

func (*Service) AddResourceGroup

AddResourceGroup implements ResourceManagerServer.AddResourceGroup.

func (*Service) DeleteResourceGroup

DeleteResourceGroup implements ResourceManagerServer.DeleteResourceGroup.

func (*Service) GetManager

func (s *Service) GetManager() *Manager

GetManager returns the resource manager.

func (*Service) GetResourceGroup

GetResourceGroup implements ResourceManagerServer.GetResourceGroup.

func (*Service) ListResourceGroups

ListResourceGroups implements ResourceManagerServer.ListResourceGroups.

func (*Service) ModifyResourceGroup

func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error)

ModifyResourceGroup implements ResourceManagerServer.ModifyResourceGroup.

func (*Service) RegisterGRPCService

func (s *Service) RegisterGRPCService(g *grpc.Server)

RegisterGRPCService registers the service to gRPC server.

func (*Service) RegisterRESTHandler

func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler)

RegisterRESTHandler registers the service to REST server.

type TokenSlot

// TokenSlot is used to split a token bucket into multiple slots to serve
// different clients within the same resource group.
type TokenSlot struct {
	// contains filtered or unexported fields
}

TokenSlot is used to split a token bucket into multiple slots to serve different clients within the same resource group.

Directories

Path Synopsis
apis
v1

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL