tasker

v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 30, 2018 License: MIT Imports: 11 Imported by: 0

README

Tasker

A lightweight distributed producer/consumer task model based on beego.

Features

  • Custom task input and execution logic
  • Automatic retry after a task execution fails

Limitations

  • Depends on the beego ORM

Installation

go get github.com/NoneBorder/tasker

Usage

Register the model with the beego ORM
...
orm.RegisterModel(new(tasker.Task))
...
Create your task struct
type ExampleTask struct {
    Name string
}

func (self *ExampleTask) New() tasker.MsgQ {
    return &ExampleTask{}
}

func (self *ExampleTask) Topic() string {
    return "example_task"
}

func (self *ExampleTask) TaskSpec() string {
    // beego task spec format
    return "*/1 * * * * *"
}

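// Concurency is required by the tasker.MsgQ interface (the spelling is the
// package's own). Returning 1 here is an illustrative choice.
func (self *ExampleTask) Concurency() int {
    return 1
}

// Exec runs the task logic for one message; it needs `import "fmt"`.
func (self *ExampleTask) Exec(workerID uint64) error {
    fmt.Println(self.Name)
    return nil
}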
Publish a task
tasker.MsgQPublish(&ExampleTask{
    Name: "example test",
})
Consume tasks
  • Trigger consumption yourself
tasker.MsgQConsume(new(ExampleTask))
  • Let tasker schedule the consumer according to TaskSpec
tasker.MsgQInitTask(new(ExampleTask))

License

The MIT License (MIT)

Documentation

Overview

Package tasker is a lightweight distributed producer/consumer task model based on beego.

Task

Task is the main description of a message or job. Its state machine looks like this:

                                +----> failed
                                |
pending ----+----> running -----+----> success
            ^                   |
            |                   v
            +---------------- retry
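
The transitions in the diagram can be written down as a small lookup table. The following is only a sketch of one reading of the diagram (in particular, that a task in retry goes back to running); the `allowed` table and `CanTransition` are hypothetical names and are not part of the package. The status strings are the package's constants.

```go
package main

import "fmt"

// Status values copied from the package constants.
const (
	TaskStatPending = "pending"
	TaskStatRunning = "running"
	TaskStatRetry   = "retry"
	TaskStatFailed  = "failed"
	TaskStatSuccess = "success"
)

// allowed maps each status to the statuses that may follow it, as read
// from the diagram. It is illustrative and not taken from the package.
var allowed = map[string][]string{
	TaskStatPending: {TaskStatRunning},
	TaskStatRunning: {TaskStatFailed, TaskStatSuccess, TaskStatRetry},
	TaskStatRetry:   {TaskStatRunning},
}

// CanTransition reports whether moving from one status to another
// follows an arrow in the diagram.
func CanTransition(from, to string) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(TaskStatPending, TaskStatRunning))
	fmt.Println(CanTransition(TaskStatSuccess, TaskStatRunning))
}
```

Under this reading, failed and success have no outgoing arrows, so they are terminal.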

Index

Constants

const (
	TaskStatPending = "pending"
	TaskStatRunning = "running"
	TaskStatRetry   = "retry"
	TaskStatFailed  = "failed"
	TaskStatSuccess = "success"
)

Variables

var InstanceID uint16

InstanceID is the unique key of the tasker instance.

var IsMaster bool

IsMaster is true when this instance is the master instance.

var RegisteredTask map[string]MsgQ

RegisteredTask is a map that records all registered consumer tasks.

UniqID is used to generate worker IDs.

Functions

func Consume

func Consume(topic string, fn ConsumeFn, concurency ...int) (int, error)

Consume consumes tasks of the given topic.

func FQDN

func FQDN() string

FQDN returns the fully qualified domain name. In case of error it returns "unknown" or the hostname.

func Init

func Init(MachineID func() (uint16, error), CheckMachineID func(uint16) bool) (err error)

Init initializes the tasker instance. It:

  1. generates the InstanceID using the MachineID func, or from the instance's private IP address when MachineID is nil
  2. starts the race for master in a goroutine
  3. initializes all tasks

func InitAllTask

func InitAllTask()

InitAllTask adds every task in RegisteredTask to the beego toolbox, which starts task consumption.

func InitIDGEN

func InitIDGEN(MachineID func() (uint16, error), CheckMachineID func(uint16) bool) error

InitIDGEN initializes the ID generator. It does not need to be invoked if `Init` has been used.

func MsgQConsume

func MsgQConsume(m MsgQ) error

MsgQConsume consumes messages of the given message type.

func MsgQInitTask

func MsgQInitTask(m MsgQ)

MsgQInitTask adds the consume task to the beego toolbox tasks so that the consumer runs at an interval.

func MsgQPublish

func MsgQPublish(m MsgQ) error

MsgQPublish publishes a task.

func MsgQPublishWithRetry

func MsgQPublishWithRetry(m MsgQ, timeout time.Duration, retry int) error

MsgQPublishWithRetry publishes a task with the given timeout and retry settings.

func RegisterModel

func RegisterModel()

func RegisterTask

func RegisterTask(item MsgQ)

RegisterTask is used by consumer tasks in their init functions to register themselves with tasker.

func SetStats

func SetStats(s Stats)

SetStats specifies the real Stats implementation to use.

func Stat

func Stat(topic, status string, duration time.Duration)

Types

type ConsumeFn

type ConsumeFn func(Input string, WorkerID uint64) (err error)

ConsumeFn is the function signature of a consumer.
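
A function of this type receives the task input as a plain string together with the worker ID. The sketch below assumes the publisher stored JSON in the input; that encoding, the `payload` type, and the `handle` and `handled` names are all hypothetical. `ConsumeFn` is redeclared locally only so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ConsumeFn mirrors the type declared by the package.
type ConsumeFn func(Input string, WorkerID uint64) (err error)

// payload is a hypothetical input shape; the package only passes a string.
type payload struct {
	Name string `json:"name"`
}

// handled collects processed names so the example has an observable effect.
var handled []string

// handle is a ConsumeFn that decodes the input and rejects empty names.
var handle ConsumeFn = func(input string, workerID uint64) error {
	var p payload
	if err := json.Unmarshal([]byte(input), &p); err != nil {
		return fmt.Errorf("worker %d: decode input: %w", workerID, err)
	}
	if p.Name == "" {
		return fmt.Errorf("worker %d: empty name", workerID)
	}
	handled = append(handled, p.Name)
	return nil
}

func main() {
	fmt.Println(handle(`{"name":"example test"}`, 1))
	fmt.Println(handle("not json", 1) != nil)
	// With the real package, handle would be passed as:
	//   n, err := tasker.Consume("example_task", handle)
}
```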

type Core

type Core struct {
	Id                int
	MasterInstanceID  uint16    `orm:"column(master_instance_id)"`
	MasterFQDN        string    `orm:"column(master_fqdn)"`
	Updated           time.Time `orm:"auto_now"`
	MasterOutOfDate   int64     // ms
	InstanceHeartbeat int64     // ms
}

Core is the configuration table of the tasker package.

`MasterOutOfDate` is how long the master state remains valid: an instance can race to become master once the time elapsed since `Updated` exceeds it. `InstanceHeartbeat` is the maximum interval at which an instance should check the master, and it should be less than `MasterOutOfDate`. Both values are in milliseconds.

type MsgQ

type MsgQ interface {
	New() MsgQ
	Topic() string
	TaskSpec() string
	Concurency() int
	Exec(uint64) error
}

MsgQ is the interface for messages; every message type should implement it.

type Stats

type Stats interface {
	Stat(topic, status string, t time.Time, duration time.Duration) error
}

Stats is an interface that tasker uses to record task execution results.

type Task

type Task struct {
	Id       int
	Topic    string
	Status   string
	Timeout  int // timeout in ms
	Retry    int
	Input    string `orm:"type(text)"`
	WorkerId uint64
	Created  time.Time `orm:"auto_now_add"`
	Updated  time.Time `orm:"auto_now"`
	Log      string    `orm:"type(text)"`
}

Task is the core object of the tasker package.

func NewSimpleTask

func NewSimpleTask(topic, input string) *Task

NewSimpleTask creates a new task with default settings.

func (*Task) Publish

func (self *Task) Publish() (err error)

Publish publishes the Task to the message queue.

func (*Task) TableEngine

func (self *Task) TableEngine() string

func (*Task) TableIndex

func (self *Task) TableIndex() [][]string
