agscheduler

package module
v0.0.0-...-15d225b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2021 License: MIT Imports: 17 Imported by: 0

README

AGScheduler

Framework
  • Scheduler: 调度核心
  • Task: 任务模块
  • Store: 任务存储模块
    • Memory (Done): 存储在内存中
    • Postgrosql (Todo): 存储在PG中
    • Redis (Todo): 存储在Redis中
  • Trigger: 任务触发模块
    • Date (Done): 执行一次后删除任务,参考time.NewTimer()
    • Interval (Done): 定期执行,参考time.NewTicker()
    • Cron (Done): 根据cron指令周期性执行任务
how to use

执行date

func main() {
	store := stores.NewMemoryStore()
	scheduler := schedulers.NewScheduler(AGScheduler.WorksMap{}, store)

	dateTrigger, _ := triggers.NewDateTrigger(time.Now().Add(time.Second * 1))
	task := tasks.NewTask("task1", func(args []interface{}) {}, []interface{}{"this", "is", "task1"}, dateTrigger)
	_ = scheduler.AddTask(task)

	go func() {
		time.Sleep(time.Second * 10)
		os.Exit(0)
	}()
	scheduler.Start()
}

执行interval

func main() {
	store := stores.NewMemoryStore()
	scheduler := schedulers.NewScheduler(AGScheduler.WorksMap{}, store)

	now := time.Now()
	trigger, _ := triggers.NewIntervalTrigger(now.Add(time.Second*1), now.Add(time.Second*30), time.Second*5)
	task := tasks.NewTask("task1", func(args []interface{}) {}, []interface{}{"this", "is", "task1"}, trigger)

	_ = scheduler.AddTask(task)

	go func() {
		time.Sleep(time.Second * 60)
		os.Exit(0)
	}()
	scheduler.Start()
}

执行cron

func main() {
	store := stores.NewMemoryStore()
	scheduler := schedulers.NewScheduler(AGScheduler.WorksMap{}, store)

	cronTrigger, _ := triggers.NewCronTrigger("*/5 * * * *")

	task := tasks.NewTask("task1", taskFunc, []interface{}{"this", "is", "task1"}, cronTrigger)
	_ = scheduler.AddTask(task)

	go func() {
		time.Sleep(time.Second * 60)
		os.Exit(0)
	}()
	scheduler.Start()
}

停止和启动任务

func main() {
	now := time.Now()
	scheduler := schedulers.NewScheduler(AGScheduler.WorksMap{}, stores.NewMemoryStore())

	trigger1, _ := triggers.NewIntervalTrigger(now.Add(time.Second*1), AGScheduler.EmptyDateTime, time.Second*5)
	task1 := tasks.NewTask("task1", func(args []interface{}) {}, []interface{}{"this", "is", "task1"}, trigger1)
	_ = scheduler.AddTask(task1)

	go func() {
		time.Sleep(time.Second * 10)
		task1.Pause()
		fmt.Println("Pause", time.Now())
		time.Sleep(time.Second * 20)
		fmt.Println("Resume", time.Now())
		task1.Resume()
	}()

	scheduler.Start()
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	MaxTolerance = 1024
	InstanceLock = sync.Mutex{}
	InstanceMap  = map[string]int{}
)
View Source
var (
	AGSLog = logrus.New()
	Log    = AGSLog.WithFields(GenAGSVersion())
)
View Source
var (
	STATUS_RUNNING = STATUS("RUNNING")
	STATUS_PAUSED  = STATUS("PAUSED")
	STATUS_STOPPED = STATUS("STOPPED")
)
View Source
var (
	MinDateTime = time.Time{}
	MaxDateTime = time.Now().Add(time.Duration(math.MaxInt64))
)
View Source
var (
	Version = "0.0.1"
	Author  = "https://github.com/CzaOrz"
	GitHub  = "https://github.com/CzaOrz/agscheduler"
	GitBook = "https://github.com/CzaOrz/agscheduler/blob/main/README.md"
	Email   = "972542655@qq.com"
)
View Source
var (
	AGSContext = context.Background()
)
View Source
var JobsMapLock = sync.Mutex{}
View Source
var PG *pg.DB

Functions

func DeleteInstance

func DeleteInstance(key string)

func DeserializeTask

func DeserializeTask(job *Job) error

func DeserializeTrigger

func DeserializeTrigger(job *Job) error

func GenAGSDetails

func GenAGSDetails() logrus.Fields

func GenAGSVersion

func GenAGSVersion() logrus.Fields

func GenASGModule

func GenASGModule(module string) logrus.Fields

func IncreaseInstance

func IncreaseInstance(key string) int

func NewPG

func NewPG() *pg.DB

func ReduceInstance

func ReduceInstance(key string) int

func RegisterAllTasks

func RegisterAllTasks(tasks ...ITask)

func SerializeTask

func SerializeTask(job *Job) error

func SerializeTrigger

func SerializeTrigger(job *Job) error

Types

type AGSConfig

type AGSConfig struct {
	Log LogConfig
	PG  PGConfig
}
var Config AGSConfig

type AGScheduler

type AGScheduler struct {
	Store  IStore
	Logger *logrus.Entry
	Status STATUS
	// context.
	Context    context.Context
	WaitCancel context.CancelFunc
}

func (*AGScheduler) AddJob

func (ags *AGScheduler) AddJob(jobs ...*Job) (err error)

func (*AGScheduler) Close

func (ags *AGScheduler) Close() error

func (*AGScheduler) DelJob

func (ags *AGScheduler) DelJob(jobs ...*Job) (err error)

func (*AGScheduler) FillByDefault

func (ags *AGScheduler) FillByDefault()

func (*AGScheduler) GetAllJobs

func (ags *AGScheduler) GetAllJobs() (jobs []*Job, err error)

func (*AGScheduler) GetJobByJobName

func (ags *AGScheduler) GetJobByJobName(jobName string) (job *Job, err error)

func (*AGScheduler) Pause

func (ags *AGScheduler) Pause()

func (*AGScheduler) Start

func (ags *AGScheduler) Start()

func (*AGScheduler) UpdateJob

func (ags *AGScheduler) UpdateJob(jobs ...*Job) (err error)

func (*AGScheduler) WaitWithTime

func (ags *AGScheduler) WaitWithTime(waitTime time.Time)

func (*AGScheduler) Wake

func (ags *AGScheduler) Wake()

type CronTrigger

type CronTrigger struct {
	CronCmd      string        `json:"cron_cmd"`
	StartRunTime time.Time     `json:"start_run_time"`
	EndRunTime   time.Time     `json:"end_run_time"`
	CronIns      cron.Schedule `json:"-"`
}

func (*CronTrigger) GetNextRunTime

func (t *CronTrigger) GetNextRunTime(previous, now time.Time) time.Time

type DateTrigger

type DateTrigger struct {
	NextRunTime time.Time `json:"next_run_time"`
}

func (DateTrigger) GetNextRunTime

func (t DateTrigger) GetNextRunTime(previous, now time.Time) time.Time

if previous is empty, return NextRunTime directly

type IStore

type IStore interface {
	GetSchedulingJobs(now time.Time) ([]*Job, error)
	GetJobByName(name string) (*Job, error)
	GetAllJobs() ([]*Job, error)
	AddJob(job *Job) error
	DelJob(job *Job) error
	UpdateJob(job *Job) error
	GetNextRunTime() (time.Time, error)
}

GetSchedulingJobs: get jobs which should be scheduled now. GetJobByName: find job by name. GetNextRunTime: get the first schedule time from all jobs.

type ITask

type ITask interface {
	Run(ctx context.Context)
}

type ITrigger

type ITrigger interface {
	GetNextRunTime(previous, now time.Time) time.Time
}

GetNextRunTime: if result is MinDateTime, it mean this Job is over, should be delete.

type IntervalTrigger

type IntervalTrigger struct {
	Interval     time.Duration `json:"interval"`
	StartRunTime time.Time     `json:"start_run_time"`
	EndRunTime   time.Time     `json:"end_run_time"`
}

func (IntervalTrigger) GetNextRunTime

func (t IntervalTrigger) GetNextRunTime(previous, now time.Time) time.Time

type Job

type Job struct {
	Id             int                    `json:"id" pg:",pk"`
	Name           string                 `json:"name" pg:",use_zero,unique"`
	Task           ITask                  `json:"-" pg:"-"`
	Trigger        ITrigger               `json:"-" pg:"-"`
	Status         STATUS                 `json:"status" pg:",use_zero"`
	NotCoalesce    bool                   `json:"not_coalesce" pg:",use_zero"`
	MaxInstances   int                    `json:"max_instances" pg:",use_zero"`
	DelayGraceTime time.Duration          `json:"delay_grace_time" pg:",use_zero"`
	Scheduler      AGScheduler            `json:"-" pg:"-"`
	NextRunTime    time.Time              `json:"next_run_time" pg:",use_zero"`
	Logger         *logrus.Entry          `json:"-" pg:"-"`
	TriggerMeta    TriggerMeta            `json:"trigger_meta" pg:",use_zero"`
	TaskMeta       map[string]interface{} `json:"task_meta" pg:",use_zero"`
	// contains filtered or unexported fields
}

func (*Job) FillByDefault

func (j *Job) FillByDefault()

func (*Job) GetRunTimes

func (j *Job) GetRunTimes(now time.Time) []time.Time

func (*Job) Run

func (j *Job) Run(runTimes []time.Time)

type LogConfig

type LogConfig struct {
	Level string `default:"info"`
	Json  bool   `default:"false"`
}

type MemoryStore

type MemoryStore struct {
	Jobs    *list.List
	JobsMap map[string]*list.Element
	Logger  *logrus.Entry
}

func (*MemoryStore) AddJob

func (ms *MemoryStore) AddJob(job *Job) error

func (*MemoryStore) DelJob

func (ms *MemoryStore) DelJob(job *Job) error

func (*MemoryStore) FillByDefault

func (ms *MemoryStore) FillByDefault()

func (*MemoryStore) GetAllJobs

func (ms *MemoryStore) GetAllJobs() ([]*Job, error)

func (*MemoryStore) GetJobByName

func (ms *MemoryStore) GetJobByName(name string) (*Job, error)

func (*MemoryStore) GetNextRunTime

func (ms *MemoryStore) GetNextRunTime() (time.Time, error)

func (*MemoryStore) GetSchedulingJobs

func (ms *MemoryStore) GetSchedulingJobs(now time.Time) ([]*Job, error)

func (*MemoryStore) UpdateJob

func (ms *MemoryStore) UpdateJob(job *Job) error

type Options

type Options struct{}

type PGConfig

type PGConfig struct {
	Addr     string `default:"localhost:5432"`
	User     string `default:"postgres"`
	Password string `default:"postgres"`
	Database string `default:"postgres"`
	PoolSize int    `default:"3"`
}

type PostgresStore

type PostgresStore struct {
	Logger *logrus.Entry
	PG     *pg.DB
}

func (*PostgresStore) AddJob

func (ps *PostgresStore) AddJob(job *Job) error

func (*PostgresStore) DelJob

func (ps *PostgresStore) DelJob(job *Job) error

func (*PostgresStore) FillByDefault

func (ps *PostgresStore) FillByDefault()

func (*PostgresStore) GetAllJobs

func (ps *PostgresStore) GetAllJobs() ([]*Job, error)

func (*PostgresStore) GetJobByName

func (ps *PostgresStore) GetJobByName(name string) (*Job, error)

func (*PostgresStore) GetNextRunTime

func (ps *PostgresStore) GetNextRunTime() (time.Time, error)

func (*PostgresStore) GetSchedulingJobs

func (ps *PostgresStore) GetSchedulingJobs(now time.Time) ([]*Job, error)

func (*PostgresStore) UpdateJob

func (ps *PostgresStore) UpdateJob(job *Job) error

type STATUS

type STATUS string

func (*STATUS) IsPaused

func (s *STATUS) IsPaused() bool

func (*STATUS) IsRunning

func (s *STATUS) IsRunning() bool

func (*STATUS) IsStopped

func (s *STATUS) IsStopped() bool

func (*STATUS) SetPaused

func (s *STATUS) SetPaused()

func (*STATUS) SetRunning

func (s *STATUS) SetRunning()

func (*STATUS) SetStopped

func (s *STATUS) SetStopped()

type TriggerMeta

type TriggerMeta struct {
	Type         string        `json:"type"`
	NextRunTime  time.Time     `json:"next_run_time"`
	Interval     time.Duration `json:"interval"`
	StartRunTime time.Time     `json:"start_run_time"`
	EndRunTime   time.Time     `json:"end_run_time"`
	CronCmd      string        `json:"cron_cmd"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL