scheduler

v0.0.6
Published: Mar 27, 2023 | License: MIT | Imports: 5 | Imported by: 1

README

go-scheduler

go-scheduler is a simple scheduler for goroutines. It manages the goroutines for you; you only need to set up to three optional quotas:

  • the maximum count of goroutines
  • the maximum count of processed requests per interval
  • the maximum rate (processed requests per interval / incoming requests per interval)

go-scheduler only adjusts the number of goroutines so that the quotas you have set are satisfied. The default strategy is Gradienter: when the runtime statistics do not satisfy the quotas, go-scheduler starts adjusting.
The scheduler manages the goroutines that handle the user's Request, which contains Data and a Handler; to process a request, the scheduler simply calls Request.Handler(Request.Data).
Note: the three optional quotas apply only to the goroutines that go-scheduler manages.
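
To make the call model concrete, here is a minimal sketch of what a worker goroutine does with one request. Handler and Request are redeclared locally so the snippet compiles on its own, and dispatch is a hypothetical helper, not a function of the package.

```go
package main

import "fmt"

// Handler and Request mirror the types documented by the package;
// they are local copies so this sketch is self-contained.
type Handler func(data interface{})

type Request struct {
	Data    interface{}
	Handler Handler
}

// dispatch is a hypothetical helper: it does what the README describes,
// calling Request.Handler(Request.Data) and nothing more.
func dispatch(req *Request) {
	if req == nil || req.Handler == nil {
		return
	}
	req.Handler(req.Data)
}

func main() {
	req := &Request{
		Data:    21,
		Handler: func(data interface{}) { fmt.Println(data.(int) * 2) },
	}
	dispatch(req) // prints 42
}
```

Because the handler receives Data as an interface{}, each handler is responsible for its own type assertion.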

How-to-use

go-scheduler provides a few interfaces that are easy to understand and easy to integrate. Let's look at a simple example.

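package main

import (
    "fmt"
    "sync/atomic"
    "time"

    scheduler "github.com/singchia/go-scheduler"
)

// maxValue is the largest value seen by any handler; it is only accessed atomically.
var maxValue int64

func main() {
    sch := scheduler.NewScheduler()

    // Cap the scheduler at 5000 goroutines, then start it.
    sch.SetMaxGoroutines(5000)
    sch.StartSchedule()

    var val int64
    for i := 0; i < 10*10000*10000; i++ {
        sch.PublishRequest(&scheduler.Request{Data: val, Handler: SchedulerHandler})
        val++
    }
    // Give in-flight requests some time to finish before reading the result.
    time.Sleep(time.Second * 5)
    fmt.Printf("maxValue: %d\n", atomic.LoadInt64(&maxValue))
    sch.Close()
}

// SchedulerHandler runs on the scheduler's goroutines, so it updates maxValue
// with a compare-and-swap loop rather than a plain (racy) write.
func SchedulerHandler(data interface{}) {
    val, ok := data.(int64)
    if !ok {
        return
    }
    for {
        cur := atomic.LoadInt64(&maxValue)
        if val <= cur || atomic.CompareAndSwapInt64(&maxValue, cur, val) {
            return
        }
    }
}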

This is not a good sample for a production environment, but it does illustrate how go-scheduler is used. After SetMaxGoroutines(5000), the scheduler's goroutine count will not exceed 5000. Call StartSchedule to start the scheduler, then publish each Request with PublishRequest; the scheduler handles the requests within the configured quotas.

Installation

If you don't have the Go development environment installed, visit the Getting Started document and follow the instructions. Once you're ready, execute the following command:

go get -u github.com/singchia/go-scheduler

Interfaces

Scheduler.Interval

Set this before calling StartSchedule. It should be greater than 500us; if it is not set or is less than 500us, it defaults to 200ms.

Scheduler.SetMaxGoroutines(int64)

This limits the maximum number of goroutines in go-scheduler. It can be set at any time.

Scheduler.SetMaxProcessedReqs(int64)

This limits the maximum number of processed requests per interval. It can be set at any time.

Scheduler.SetMaxRate(float64)

The rate is processed requests / incoming requests per interval. A bigger value means you want requests handled faster. It can be set at any time.
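
As a worked example of that ratio, the sketch below computes the rate for one interval. The function name rate and the choice to return 0 when nothing came in are assumptions made for illustration, not behavior confirmed by the package.

```go
package main

import "fmt"

// rate returns processed/incoming for one interval, as defined in the README.
// Returning 0 for an interval with no incoming requests is an assumption of
// this sketch; it simply avoids dividing by zero.
func rate(processed, incoming int64) float64 {
	if incoming == 0 {
		return 0
	}
	return float64(processed) / float64(incoming)
}

func main() {
	fmt.Println(rate(50, 100))  // 0.5: half of the incoming requests were handled
	fmt.Println(rate(100, 100)) // 1: the scheduler kept up with the load
}
```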

Scheduler.SetDefaultHandler(scheduler.Handler)

Use this to set a default handler, which is used when scheduler.Request.Handler is not given. It can be set at any time.
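
The fallback can be pictured as follows. This is only a sketch: the types are local copies of the documented ones and pickHandler is a hypothetical helper, so the package's actual lookup may be organized differently.

```go
package main

import "fmt"

// Local copies of the documented types, so the sketch compiles on its own.
type Handler func(data interface{})

type Request struct {
	Data    interface{}
	Handler Handler
}

// pickHandler is a hypothetical helper: it prefers the request's own handler
// and falls back to the default one when none was given.
func pickHandler(req *Request, def Handler) Handler {
	if req.Handler != nil {
		return req.Handler
	}
	return def
}

func main() {
	def := func(data interface{}) { fmt.Println("default:", data) }
	own := func(data interface{}) { fmt.Println("own:", data) }

	for _, req := range []*Request{{Data: "a"}, {Data: "b", Handler: own}} {
		if h := pickHandler(req, def); h != nil {
			h(req.Data)
		}
	}
	// prints "default: a" then "own: b"
}
```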

Scheduler.SetMonitor(scheduler.Monitor)

Use this to monitor, for each interval, the incoming requests, the processed requests, the shift (change in the number of goroutines), and the current number of goroutines. It can be set at any time.
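
A monitor is just a function with the documented Monitor signature. The sketch below declares a local copy of that type and a hypothetical formatStats helper; with the real package, a function of this shape would be passed to SetMonitor.

```go
package main

import "fmt"

// Monitor is a local copy of the documented type.
type Monitor func(incomingReqsLastInterval, processedReqsLastInterval, shift, numActives int64)

// formatStats is a hypothetical helper that renders one interval's numbers.
// The shift is printed with an explicit sign to show growth or shrinkage.
func formatStats(incoming, processed, shift, numActives int64) string {
	return fmt.Sprintf("incoming=%d processed=%d shift=%+d goroutines=%d",
		incoming, processed, shift, numActives)
}

func main() {
	var m Monitor = func(incoming, processed, shift, numActives int64) {
		fmt.Println(formatStats(incoming, processed, shift, numActives))
	}
	m(1000, 950, 12, 112) // prints "incoming=1000 processed=950 shift=+12 goroutines=112"
}
```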

Scheduler.SetStrategy(scheduler.Strategy)

scheduler.Strategy decides how to shift (update) the number of goroutines. You can replace it with your own strategy.
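
For illustration, a custom strategy could look like the sketch below. The Strategy interface is a local copy of the documented one; FixedStep and its rules are hypothetical, and the sketch assumes that a positive return value means "add goroutines" and a negative one means "remove goroutines".

```go
package main

import "fmt"

// Strategy is a local copy of the documented interface.
type Strategy interface {
	SetMaxActives(maxActives int64)
	SetMaxProcessedReqs(maxProcessedReqs int64)
	SetMaxRate(maxRate float64)
	ExpandOrShrink(incomingReqsItv int64, processedReqsItv int64, numActives int64) int64
}

// FixedStep is a hypothetical strategy that moves by at most `step` goroutines.
// It stores all three quotas but, to stay short, only enforces maxActives.
type FixedStep struct {
	step, maxActives, maxProcessedReqs int64
	maxRate                            float64
}

func (f *FixedStep) SetMaxActives(n int64)       { f.maxActives = n }
func (f *FixedStep) SetMaxProcessedReqs(n int64) { f.maxProcessedReqs = n }
func (f *FixedStep) SetMaxRate(r float64)        { f.maxRate = r }

func minInt64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

// ExpandOrShrink returns the signed change in goroutine count (assumed convention).
func (f *FixedStep) ExpandOrShrink(incoming, processed, numActives int64) int64 {
	switch {
	case numActives > f.maxActives: // over quota: drop straight back to the limit
		return f.maxActives - numActives
	case incoming == 0: // idle interval: shrink
		return -minInt64(f.step, numActives)
	case incoming > processed: // falling behind: grow, but never past the limit
		return minInt64(f.step, f.maxActives-numActives)
	default:
		return 0
	}
}

func main() {
	var s Strategy = &FixedStep{step: 10}
	s.SetMaxActives(100)
	fmt.Println(s.ExpandOrShrink(500, 300, 95)) // 5
	fmt.Println(s.ExpandOrShrink(0, 0, 4))      // -4
}
```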

Strategy

scheduler.Gradienter

By default, go-scheduler uses Gradienter as its strategy. It behaves as follows:

if incoming requests == 0 then shrink 20%
if any quotas > max quotas then shrink the count of goroutines
	if quotas == 1 then shrink directly to MaxGoroutines
	else shrink 20%  
if all quotas < max quotas then expand randFloat * incomingReqs / (incomingReqs + maxCountGoroutines) * (maxCountGoroutines - currentCountGoroutines)
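
The two numeric rules above (shrink by 20%, and the expansion formula) can be sketched as plain functions. expandStep and shrinkStep are hypothetical names, the random factor is passed in so the result is reproducible, and truncating to an integer is an assumption; the real Gradienter may round differently.

```go
package main

import "fmt"

// expandStep follows the expansion rule from the README:
// randFloat * incoming / (incoming + maxGoroutines) * (maxGoroutines - current).
// randFloat is expected to be in [0, 1).
func expandStep(randFloat float64, incoming, maxGoroutines, current int64) int64 {
	if incoming+maxGoroutines == 0 {
		return 0 // nothing to scale against
	}
	load := float64(incoming) / float64(incoming+maxGoroutines)
	headroom := float64(maxGoroutines - current)
	return int64(randFloat * load * headroom) // truncation is an assumption
}

// shrinkStep returns the 20% shrink as a negative change.
func shrinkStep(current int64) int64 {
	return -(current / 5)
}

func main() {
	// 0.5 * 1000/(1000+1000) * (1000-200) = 0.5 * 0.5 * 800 = 200
	fmt.Println(expandStep(0.5, 1000, 1000, 200)) // 200
	fmt.Println(shrinkStep(100))                  // -20
}
```

The formula grows faster when the incoming load is large relative to the goroutine limit and when there is still plenty of headroom below that limit.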

other strategies

The package also contains circularLink.go. I was trying to work out the next goroutine-count update from historical status, but have not come up with a workable idea yet. If you have one, you are welcome to contact me.

Documentation

Constants

const (
	NoNeedUpdating = iota
	NumActivesNeedsExpansion
	NumActivesNeedsShrinking
	MaxProcessedReqsNeedsExpansion
	MaxProcessedReqsNeedsShrinking
	MaxRateNeedsExpansion
	MaxRateNeedsShrinking
)
const (
	MaxDefaultGoRoutines    int64   = 500 * 10000
	MaxDefaultProcessedReqs int64   = int64(^uint(0) >> 1)
	MaxDefaultRate          float64 = 1000
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CircularList

type CircularList struct {
	// contains filtered or unexported fields
}

func NewCircularList

func NewCircularList() *CircularList

func (*CircularList) AppendNode

func (c *CircularList) AppendNode(data interface{}) *CircularNode

func (*CircularList) DeleteNode

func (c *CircularList) DeleteNode(node *CircularNode) bool

func (*CircularList) ForEach

func (c *CircularList) ForEach(f ForEachFunc) bool

func (*CircularList) GetCurNode

func (c *CircularList) GetCurNode() *CircularNode

func (*CircularList) GetCurNodeWithNoCopied

func (c *CircularList) GetCurNodeWithNoCopied() *CircularNode

func (*CircularList) RightShiftCurPointer

func (c *CircularList) RightShiftCurPointer() bool

func (*CircularList) RightShiftCurPointerAndUpdate

func (c *CircularList) RightShiftCurPointerAndUpdate(data interface{}) bool

func (*CircularList) RightShiftCurPointerToCertainNode

func (c *CircularList) RightShiftCurPointerToCertainNode(dstNode *CircularNode) bool

type CircularNode

type CircularNode struct {
	// contains filtered or unexported fields
}

type ForEachFunc

type ForEachFunc func(node *CircularNode) error

type Gradienter

type Gradienter struct {
	// contains filtered or unexported fields
}

func NewGradienter

func NewGradienter() *Gradienter

func (*Gradienter) ExpandOrShrink

func (g *Gradienter) ExpandOrShrink(ir int64, pr int64, numActives int64) (diff int64)

func (*Gradienter) SetMaxActives

func (g *Gradienter) SetMaxActives(maxActives int64)

already locked at scheduler

func (*Gradienter) SetMaxProcessedReqs

func (g *Gradienter) SetMaxProcessedReqs(maxProcessedReqs int64)

func (*Gradienter) SetMaxRate

func (g *Gradienter) SetMaxRate(maxRate float64)

type Handler

type Handler func(data interface{})

type Monitor

type Monitor func(incomingReqsLastInterval, processedReqsLastInterval, shift, numActives int64)

type Request

type Request struct {
	Data    interface{}
	Handler Handler
}

type Scheduler

type Scheduler struct {
	//the interval of goroutines number changing, min 200ms, default 1s
	//should be set before Schedule called
	Interval time.Duration
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler() *Scheduler

func (*Scheduler) Close

func (s *Scheduler) Close()

func (*Scheduler) PublishRequest

func (s *Scheduler) PublishRequest(req *Request)

func (*Scheduler) SetDefaultHandler

func (s *Scheduler) SetDefaultHandler(handler Handler)

func (*Scheduler) SetMaxGoroutines

func (s *Scheduler) SetMaxGoroutines(maxCountGoroutines int64)

Max number of active goroutines; -1 means unlimited (default -1). Can be set at any time at runtime.

func (*Scheduler) SetMaxProcessedReqs

func (s *Scheduler) SetMaxProcessedReqs(maxProcessedReqs int64)

Max number of requests processed per second; -1 means unlimited (default -1). Can be set at any time at runtime.

func (*Scheduler) SetMaxRate

func (s *Scheduler) SetMaxRate(rate float64)

Max rate of (incoming requests)/(processed requests); should be between 0 and 1 (default 1).

func (*Scheduler) SetMonitor

func (s *Scheduler) SetMonitor(monitor Monitor)

func (*Scheduler) SetStrategy

func (s *Scheduler) SetStrategy(strategy Strategy)

func (*Scheduler) StartSchedule

func (s *Scheduler) StartSchedule()

type Strategy

type Strategy interface {
	//SetMaxQuota(maxProcessedReqs int, maxRate float64, maxActives int)
	SetMaxActives(maxActives int64)
	SetMaxProcessedReqs(maxProcessedReqs int64)
	SetMaxRate(maxRate float64)

	ExpandOrShrink(incomingRegsItv int64, processedReqsItv int64, numActives int64) int64
}
