AGScheduler

package module
v0.0.0-...-7b0deeb Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2020 License: MIT Imports: 13 Imported by: 0

README

AGScheduler

Framework
  • Scheduler: 调度核心
  • Task: 任务模块
  • Store: 任务存储模块
    • Memory (Done): 存储在内存中
    • PostgreSQL (Todo): 存储在PG中
    • Redis (Todo): 存储在Redis中
  • Trigger: 任务触发模块
    • Date (Done): 执行一次后删除任务,参考time.NewTimer()
    • Interval (Done): 定期执行,参考time.NewTicker()
    • Cron (Done): 根据cron指令周期性执行任务
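How to use

Note: the examples below use sub-package qualifiers (stores, schedulers, triggers, tasks) and argument orders that differ from the signatures listed in the Documentation section, which documents NewScheduler(store IStore) and NewTask(name, trigger, method, args...) in a single package. Adapt the calls to the documented signatures when using this version.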

执行date

func main() {
	store := stores.NewMemoryStore()
	scheduler := schedulers.NewScheduler(AGScheduler.WorksMap{}, store)

	dateTrigger, _ := triggers.NewDateTrigger(time.Now().Add(time.Second * 1))
	task := tasks.NewTask("task1", func(args []interface{}) {}, []interface{}{"this", "is", "task1"}, dateTrigger)
	_ = scheduler.AddTask(task)

	go func() {
		time.Sleep(time.Second * 10)
		os.Exit(0)
	}()
	scheduler.Start()
}

执行interval

func main() {
	store := stores.NewMemoryStore()
	scheduler := schedulers.NewScheduler(AGScheduler.WorksMap{}, store)

	now := time.Now()
	trigger, _ := triggers.NewIntervalTrigger(now.Add(time.Second*1), now.Add(time.Second*30), time.Second*5)
	task := tasks.NewTask("task1", func(args []interface{}) {}, []interface{}{"this", "is", "task1"}, trigger)

	_ = scheduler.AddTask(task)

	go func() {
		time.Sleep(time.Second * 60)
		os.Exit(0)
	}()
	scheduler.Start()
}

执行cron

func main() {
	store := stores.NewMemoryStore()
	scheduler := schedulers.NewScheduler(AGScheduler.WorksMap{}, store)

	cronTrigger, _ := triggers.NewCronTrigger("*/5 * * * *")

	task := tasks.NewTask("task1", taskFunc, []interface{}{"this", "is", "task1"}, cronTrigger)
	_ = scheduler.AddTask(task)

	go func() {
		time.Sleep(time.Second * 60)
		os.Exit(0)
	}()
	scheduler.Start()
}

暂停和恢复任务

func main() {
	now := time.Now()
	scheduler := schedulers.NewScheduler(AGScheduler.WorksMap{}, stores.NewMemoryStore())

	trigger1, _ := triggers.NewIntervalTrigger(now.Add(time.Second*1), AGScheduler.EmptyDateTime, time.Second*5)
	task1 := tasks.NewTask("task1", func(args []interface{}) {}, []interface{}{"this", "is", "task1"}, trigger1)
	_ = scheduler.AddTask(task1)

	go func() {
		time.Sleep(time.Second * 10)
		task1.Pause()
		fmt.Println("Pause", time.Now())
		time.Sleep(time.Second * 20)
		fmt.Println("Resume", time.Now())
		task1.Resume()
	}()

	scheduler.Start()
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var EmptyDateTime time.Time
View Source
var MaxDateTime = time.Now().Add(time.Duration(math.MaxInt64))
View Source
var WorksMap = map[string]WorkDetail{}

Functions

func RegisterWorksMap

func RegisterWorksMap(worksMap map[string]WorkDetail) error

Types

type Controller

type Controller struct {
	Ctx      context.Context
	Deadline context.Context
	Cancel   context.CancelFunc
}

func NewController

func NewController() *Controller

func (*Controller) Reset

func (c *Controller) Reset(deadlineTime time.Time)

type CronState

type CronState struct {
	CronCmd string `json:"cron_cmd"`
}

type CronTrigger

type CronTrigger struct {
	CronCmd   string
	StartTime time.Time
	Schedule  cron.Schedule
}

* Cron *

func NewCronTrigger

func NewCronTrigger(cronCmd string) (*CronTrigger, error)

func (*CronTrigger) GetTriggerState

func (c *CronTrigger) GetTriggerState() TriggerState

func (*CronTrigger) NextFireTime

func (c *CronTrigger) NextFireTime(previous, now time.Time) time.Time

type DateState

type DateState struct {
	RunDateTime time.Time `json:"run_date_time"`
}

type DateTrigger

type DateTrigger struct {
	RunDateTime time.Time
}

* Date *

func NewDateTrigger

func NewDateTrigger(runDateTime time.Time) (*DateTrigger, error)

func (*DateTrigger) GetTriggerState

func (d *DateTrigger) GetTriggerState() TriggerState

func (*DateTrigger) NextFireTime

func (d *DateTrigger) NextFireTime(previous, now time.Time) time.Time

type IStore

type IStore interface {
	GetDueTasks(now time.Time) []*Task
	GetTaskByName(name string) (*Task, error)
	GetAllTasks() []*Task
	AddTask(task *Task) error
	DelTask(task *Task) error
	UpdateTask(task *Task) error
	GetNextRunTime() time.Time
}

type ITrigger

type ITrigger interface {
	NextFireTime(previous, now time.Time) time.Time
	GetTriggerState() TriggerState
}

func FromTriggerState

func FromTriggerState(state TriggerState) (ITrigger, error)

type IntervalState

type IntervalState struct {
	StartRunTime time.Time     `json:"start_run_time"`
	EndRunTime   time.Time     `json:"end_run_time"`
	Interval     time.Duration `json:"interval"`
}

type IntervalTrigger

type IntervalTrigger struct {
	Interval     time.Duration
	StartRunTime time.Time
	EndRunTime   time.Time
}

* Interval *

func NewIntervalTrigger

func NewIntervalTrigger(startTime, endTime time.Time, interval time.Duration) (*IntervalTrigger, error)

func (*IntervalTrigger) GetTriggerState

func (i *IntervalTrigger) GetTriggerState() TriggerState

func (*IntervalTrigger) NextFireTime

func (i *IntervalTrigger) NextFireTime(previous, now time.Time) time.Time

type MemoryStore

type MemoryStore struct {
	Tasks    *list.List
	TasksMap map[string]*list.Element
}

* MemoryStore *

func NewMemoryStore

func NewMemoryStore() *MemoryStore

func (*MemoryStore) AddTask

func (m *MemoryStore) AddTask(task *Task) error

func (*MemoryStore) DelTask

func (m *MemoryStore) DelTask(task *Task) error

func (*MemoryStore) GetAllTasks

func (m *MemoryStore) GetAllTasks() []*Task

func (*MemoryStore) GetDueTasks

func (m *MemoryStore) GetDueTasks(now time.Time) []*Task

func (*MemoryStore) GetNextRunTime

func (m *MemoryStore) GetNextRunTime() time.Time

func (*MemoryStore) GetTaskByName

func (m *MemoryStore) GetTaskByName(name string) (*Task, error)

func (*MemoryStore) UpdateTask

func (m *MemoryStore) UpdateTask(task *Task) error

type PgStore

type PgStore struct {
	Pg     *pg.DB
	Logger *logrus.Entry
}

* PostgreSQL *

func NewPgStore

func NewPgStore(pg *pg.DB) (*PgStore, error)

func (*PgStore) AddTask

func (p *PgStore) AddTask(task *Task) error

func (*PgStore) DelTask

func (p *PgStore) DelTask(task *Task) error

func (*PgStore) GetAllTasks

func (p *PgStore) GetAllTasks() []*Task

func (*PgStore) GetDueTasks

func (p *PgStore) GetDueTasks(now time.Time) []*Task

func (*PgStore) GetNextRunTime

func (p *PgStore) GetNextRunTime() time.Time

func (*PgStore) GetTaskByName

func (p *PgStore) GetTaskByName(name string) (*Task, error)

func (*PgStore) UpdateTask

func (p *PgStore) UpdateTask(task *Task) error

type Scheduler

type Scheduler struct {
	StoresMap   map[string]IStore
	Logger      *logrus.Entry
	Controller  *Controller
	CloseCancel context.CancelFunc
}

func NewScheduler

func NewScheduler(store IStore) *Scheduler

func (*Scheduler) AddTask

func (s *Scheduler) AddTask(task *Task) error

func (*Scheduler) AddTaskFromTasksMap

func (s *Scheduler) AddTaskFromTasksMap(name, taskMapKey string, trigger ITrigger, args ...interface{}) error

func (*Scheduler) Close

func (s *Scheduler) Close()

func (*Scheduler) DelTask

func (s *Scheduler) DelTask(task *Task) error

func (*Scheduler) GetAllTasks

func (s *Scheduler) GetAllTasks() []*Task

func (*Scheduler) GetTaskByName

func (s *Scheduler) GetTaskByName(name string) (*Task, error)

func (*Scheduler) Start

func (s *Scheduler) Start()

func (*Scheduler) UpdateTask

func (s *Scheduler) UpdateTask(task *Task) error

func (*Scheduler) Wake

func (s *Scheduler) Wake()

type Task

type Task struct {
	Id              int64                     `json:"id" pg:",pk"`
	Name            string                    `json:"name" pg:",use_zero"`
	WorkKey         string                    `json:"work_key" pg:",use_zero"`
	Func            func(args ...interface{}) `json:"func" pg:"-"`
	Args            []interface{}             `json:"args" pg:",use_zero"`
	Scheduler       *Scheduler                `json:"scheduler" pg:"-"`
	Trigger         ITrigger                  `json:"trigger" pg:"-"`
	TriggerState    TriggerState              `json:"trigger_state" pg:",use_zero"`
	PreviousRunTime time.Time                 `json:"previous_run_time" pg:",use_zero"`
	NextRunTime     time.Time                 `json:"next_run_time" pg:",use_zero"`
	Logger          *logrus.Entry             `json:"logger" pg:"-"`
	Running         bool                      `json:"running" pg:",use_zero"`
	Coalesce        bool                      `json:"coalesce" pg:",use_zero"`
	Count           int64                     `json:"count" pg:",use_zero"`
}

func FromPgTask

func FromPgTask(pgTask *Task, trigger ITrigger, workDetail WorkDetail) *Task

func NewTask

func NewTask(
	name string,
	trigger ITrigger,
	method func(args ...interface{}),
	args ...interface{},
) *Task

func (*Task) Delete

func (t *Task) Delete() error

func (*Task) GetNextFireTime

func (t *Task) GetNextFireTime(now time.Time) time.Time

func (*Task) Go

func (t *Task) Go(runTime time.Time)

func (*Task) Pause

func (t *Task) Pause() error

func (*Task) Resume

func (t *Task) Resume() error

func (*Task) UpdateTrigger

func (t *Task) UpdateTrigger(trigger ITrigger) error

type TriggerState

type TriggerState struct {
	Name     string        `json:"name"`
	Cron     CronState     `json:"cron"`
	Date     DateState     `json:"date"`
	Interval IntervalState `json:"interval"`
}

type WorkDetail

type WorkDetail struct {
	Func func(args ...interface{})
	Args []interface{}
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL