htask

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2018 License: MIT Imports: 4 Imported by: 4

README

htask (min Heap TASK scheduler)

CircleCI

High Scalable In-memory task scheduler using Min Heap implemented in Golang.

htask creates only 1 (scheduler) + n (worker) goroutines, NOT creating goroutines for each task.

if workers size == 0 then scheduler create goroutine for each task when timer have expired.

github.com/kawasin73/htask/cron is wrapper of htask.Scheduler, cron implementation with human friendly interface.

Install

go get github.com/kawasin73/htask

Cron Usage

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/kawasin73/htask/cron"
)

func main() {
	var wg sync.WaitGroup
	workers := 1
	c := cron.NewCron(&wg, cron.Option{
		Workers: workers,
	})

	task := func() {
		fmt.Println("hello world")
	}

	// executed every 10:11 AM.
	c.Every(1).Day().At(10, 11).Run(task)

	// task will be executed in every 1 minute from now.
	c.Every(1).Minute().Run(task)

	tenSecondsLater := time.Now().Add(10 * time.Second)
	// executed in every 2 seconds started from 10 seconds later.
	cancel, err := c.Every(2).Second().From(tenSecondsLater).Run(task)
	if err != nil {
		// handle error
	}

	// cron can schedule one time task.
	c.Once(tenSecondsLater.Add(time.Minute)).Run(func() {
		// task can be cancelled.
		cancel()
	})

	c.ChangeWorkers(0)

	time.Sleep(3 * time.Second)

	// on shutdown all queued task will be discarded.
	c.Close()
	wg.Wait()
}

Scheduler Usage

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/kawasin73/htask"
)

func main() {
	var wg sync.WaitGroup
	workers := 1
	scheduler := htask.NewScheduler(&wg, workers)

	ctx, _ := context.WithCancel(context.Background())
	scheduler.Set(ctx.Done(), time.Now().Add(time.Second*2), func(t time.Time) {
		fmt.Println("later executed at :", t)
	})
	scheduler.Set(ctx.Done(), time.Now().Add(time.Second), func(t time.Time) {
		fmt.Println("first executed at :", t)
		// it can set to scheduler while executing task.
		scheduler.Set(ctx.Done(), time.Now().Add(time.Millisecond*500), func(t time.Time) {
			fmt.Println("second executed at :", t)
		})
	})

	scheduler.ChangeWorkers(10)

	time.Sleep(3 * time.Second)

	// on shutdown
	scheduler.Close()
	wg.Wait()
}

Interface

Scheduler Interface

  • func NewScheduler(wg *sync.WaitGroup, workers int) *Scheduler
  • func (s *Scheduler) Set(chCancel <-chan struct{}, t time.Time, task func(time.Time)) error
  • func (s *Scheduler) ChangeWorkers(workers int) error
  • func (s *Scheduler) Close() error

Notes

  • min heap have no limit size.
  • when main context is canceled, all pending tasks will be discarded.

LICENSE

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrClosed         = errors.New("scheduler is already closed")
	ErrInvalidWorkers = errors.New("workers must be more than 0")
	ErrInvalidTime    = errors.New("time is invalid zero time")
	ErrInvalidTask    = errors.New("task must not be nil")
	ErrTaskCancelled  = errors.New("task cancelled")
)

errors

View Source
var (
	ErrMax = errors.New("heap max size")
)

errors

Functions

This section is empty.

Types

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is used to schedule tasks.

func NewScheduler

func NewScheduler(wg *sync.WaitGroup, workers int) *Scheduler

NewScheduler creates Scheduler and start scheduler and workers. number of created goroutines is counted to sync.WaitGroup.

func (*Scheduler) ChangeWorkers

func (c *Scheduler) ChangeWorkers(workers int) error

ChangeWorkers will change workers size. workers must greater than 0. if new size is smaller, shut appropriate number of workers down. if new size is bigger, create appropriate number of workers.

func (*Scheduler) Close

func (c *Scheduler) Close() error

Close shutdown scheduler and workers goroutine. if Scheduler is already closed then returns ErrClosed.

func (*Scheduler) Set

func (c *Scheduler) Set(chCancel <-chan struct{}, t time.Time, task func(time.Time)) error

Set enqueue new task to scheduler heap queue. task will be cancelled by closing chCancel. chCancel == nil is acceptable.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL