escheduler

package module
v0.1.59 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 7, 2023 License: Apache-2.0 Imports: 23 Imported by: 0

README

escheduler

todo

  1. generate unique process id in distributed systems.

point

  1. On first start, all workers enter the barrier together, ensuring the scheduler does not perform meaningless scheduling.
  2. After the first schedule, the worker watcher waits for a while and then re-balances.
  3. Workers leave the barrier together, ensuring an external program can tell when all tasks are running (for k8s rolling updates).
  4. The priority queue lets workers start tasks according to their priority (todo).
  5. The balancer uses least-load as its assignment algorithm, ensuring no worker is overloaded.
  6. Balancer reassignment uses a sticky strategy, changing as little as possible.
  7. Check that the worker count is below the maximum before registering worker info in etcd; this check uses a mutex.

Documentation

Index

Constants

View Source
const (
	WorkerStatusNew         = "new"          // 0
	WorkerStatusRegistered  = "registered"   // 1
	WorkerStatusInBarrier   = "in_barrier"   // 2
	WorkerStatusLeftBarrier = "left_barrier" // 3
	WorkerStatusDead        = "dead"         // 4
)
View Source
const (
	ActionNew     = 1
	ActionDeleted = 2
)
View Source
const (
	ReasonFirstSchedule = "first schedule"
)
View Source
const (
	WorkerValueRunning = "running"
)

Variables

View Source
var (
	ErrWorkerNumExceedMaximum = errors.New("worker num exceed maximum")
)

Functions

func GetLocalIP

func GetLocalIP() (ipv4 string, err error)

GetLocalIP returns the local IP address (IPv4).

func GetWorkerBarrierLeftKey added in v0.1.22

func GetWorkerBarrierLeftKey(rootName string) string

GetWorkerBarrierLeftKey returns the worker-barrier-left key for rootName, e.g. /kline-pump-20220628/worker_barrier_left

func GetWorkerBarrierName added in v0.1.8

func GetWorkerBarrierName(rootName string) string

GetWorkerBarrierName /kline-pump/20220628/worker_barrier

Types

type Assigner added in v0.1.29

type Assigner struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Assigner) GetBalancer added in v0.1.29

func (a *Assigner) GetBalancer(key, group string) (balancer.Balancer, error)

func (*Assigner) GetReBalanceResult added in v0.1.29

func (a *Assigner) GetReBalanceResult(workerList []string, taskMap map[string]Task, taskPathResp []*mvccpb.KeyValue) (toDeleteWorkerTaskKey map[string]struct{}, toDeleteTaskKey []string, assignMap map[string][]Task, err error)

GetReBalanceResult parameters: workerList is the list of currently online workers (elements are worker names); taskMap is the current task collection (key is the task ID, value is the task); taskPathResp is the currently assigned state as a list of key-values, e.g. key: /Root/task/worker-0/task-abbr-1, value: the raw task data for task 1.

type Generator added in v0.1.8

type Generator func(ctx context.Context) ([]Task, error)

type Master added in v0.1.41

type Master struct {
	Node

	// balancer
	RoundRobinBalancer balancer.Balancer
	HashBalancer       balancer.Balancer
	// contains filtered or unexported fields
}

func NewMaster added in v0.1.41

func NewMaster(config MasterConfig, node Node) (*Master, error)

NewMaster creates a scheduler.

func (*Master) Campaign added in v0.1.58

func (m *Master) Campaign(ctx context.Context) (err error)

Campaign

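@Description: During the campaign there is a child context; the lifecycle it controls includes:
1. becoming the leader 2. watching for changes to the workers and notifying the channel to trigger scheduling 3. periodic scheduling 4. printing
@receiver m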
@param ctx
@return error

func (*Master) ElectionKey added in v0.1.58

func (s *Master) ElectionKey() string

func (*Master) NotifySchedule added in v0.1.41

func (m *Master) NotifySchedule(request string)

func (*Master) Start added in v0.1.41

func (m *Master) Start()

Start runs an endless loop that keeps trying to win the election. Lifecycle: 1. if the outer ctx is done, the scheduler is done; 2. if leadership changes to another node, the leader ctx is done.

func (*Master) Stop added in v0.1.41

func (m *Master) Stop()

type MasterConfig added in v0.1.41

type MasterConfig struct {
	// Interval configures interval of schedule task.
	// If Interval is <= 0, the default 60 seconds Interval will be used.
	Interval      time.Duration
	Timeout       time.Duration // The maximum time to schedule once
	Generator     Generator
	ReBalanceWait time.Duration
}

func (MasterConfig) Validation added in v0.1.41

func (sc MasterConfig) Validation() error

type Node

type Node struct {
	EtcdConfig clientv3.Config
	RootName   string
	// TTL configures the session's TTL in seconds.
	// If TTL is <= 0, the default 60 seconds TTL will be used.
	TTL int64 // worker registered in etcd

	MaxNumNodes int    // total worker num + 1 scheduler
	Name        string // if not set, default {ip}-{pid}
	// contains filtered or unexported fields
}

func (*Node) GetDefaultName added in v0.1.31

func (n *Node) GetDefaultName() (string, error)

func (*Node) Validate added in v0.1.57

func (n *Node) Validate() error

type RawData

type RawData []byte

type Task

type Task struct {
	P     float64
	Key   string  // if not empty, will use hash-rebalance
	Group string  //
	ID    string  // a short name which uniquely identify the task
	Raw   RawData // task value, []byte
}

type TaskChange

type TaskChange struct {
	Action int // 1 new 2 deleted
	Task
}

func (TaskChange) CreatedTask added in v0.1.14

func (t TaskChange) CreatedTask() (Task, bool)

func (TaskChange) DeletedTask added in v0.1.14

func (t TaskChange) DeletedTask() (string, bool)

func (TaskChange) String added in v0.1.18

func (t TaskChange) String() string

type WatchEvent added in v0.1.14

type WatchEvent interface {
	CreatedTask() (Task, bool)
	DeletedTask() (string, bool)
}

type Watcher added in v0.1.26

type Watcher struct {
	EventChan <-chan *clientv3.Event // output event channel

	IncipientKVs []*mvccpb.KeyValue // initial kv with prefix
	// contains filtered or unexported fields
}

func NewWatcher added in v0.1.26

func NewWatcher(ctx context.Context, client *clientv3.Client, pathPrefix string) (*Watcher, error)

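NewWatcher: a note on which revision to watch from. When watching a single key, use CreateRevision to start from the beginning of its history, or ModRevision to start from the latest entry (that entry is returned immediately). When watching a prefix, you must use Revision. To watch subsequent changes under the current prefix, start watching from the cluster's current Revision+1.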

type Worker

type Worker struct {
	Node
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(node Node) (*Worker, error)

NewWorker creates a worker.

func (*Worker) Add added in v0.1.57

func (w *Worker) Add(task Task)

func (*Worker) Del added in v0.1.57

func (w *Worker) Del(id string)

func (*Worker) IsAllRunning added in v0.1.57

func (w *Worker) IsAllRunning() bool

func (*Worker) SetStatus added in v0.1.57

func (w *Worker) SetStatus(status string)

func (*Worker) Start

func (w *Worker) Start()

func (*Worker) Status added in v0.1.16

func (w *Worker) Status() string

func (*Worker) Stop

func (w *Worker) Stop()

func (*Worker) Tasks added in v0.1.23

func (w *Worker) Tasks(ctx context.Context) (map[string]struct{}, error)

func (*Worker) TryLeaveBarrier added in v0.1.16

func (w *Worker) TryLeaveBarrier(d time.Duration) bool

func (*Worker) WatchTask added in v0.1.16

func (w *Worker) WatchTask() <-chan WatchEvent

type WorkerTask added in v0.1.49

type WorkerTask struct {
	Task
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL