executor4go

package module
v0.0.0-...-d0d23b5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

README

boot4go-xxljob-executor

golang for xxl-job executor support

license

Java and go development coexist in many companies. XXL-job is used as the task scheduling engine in Java. Therefore, go executor (client) also appears, which is relatively simple to use:

Feature

1. Executor registration
2. Time consuming task cancellation
3. Task registration, like http As convenient as handler
4. Task panic processing
5. Blocking strategy processing
6. Task completion support return execution remarks
7. Task timeout cancellation (unit: seconds, 0 is unlimited)
8. Number of failed retries (in parameter param, the task will handle it by itself at present)

Example

package main

import (
	"fmt"
	executor4go "github.com/gohutool/boot4go-xxljob-executor"
	"github.com/gohutool/boot4go-xxljob-executor/examples/task"
	"log"
)

func main() {
	log4go.LoggerManager.InitWithDefaultConfig()
	logger := log4go.LoggerManager.GetLogger("gohutool.executor4go.examples")

	exec := NewExecutor(
		ServerAddr("http://192.168.56.101:18080/xxl-job-admin"),
		AccessToken(""),            //请求令牌(默认为空)
		ExecutorIp("192.168.56.1"), //可自动获取
		ExecutorPort("9999"),       //默认9999(非必填)
		RegistryKey("executor4go"), //执行器名称
		SetLogger(logger),          //自定义日志
	)
	
		exec.Init()
	//设置日志查看handler
	exec.SetLogHandler(func(req *LogReq) *LogRes {
		return &LogRes{Code: 200, Msg: "", Content: LogResContent{
			FromLineNum: req.FromLineNum,
			ToLineNum:   2,
			LogContent:  "这个是自定义日志handler",
			IsEnd:       true,
		}}
	})
	//注册任务handler
	exec.RegTask("task.simple", SimpleTask)
	exec.RegTask("task.longTime", LongTimeTask)
	exec.RegTask("task.panic", PanicTask)
	exec.RegTask("task.exception", ExceptionTask)

	logger.Info(exec.Run())
}

Output of startup

[18:40:58 CST 2022/04/10 583] [INFO][gohutool.executor4go.examples] (github.com/gohutool/boot4go-xxljob-executor.(*Executor).Init:112) executor4go-v1.0.0 192.106.56.1:9999
[18:40:58 CST 2022/04/10 583] [INFO][gohutool.executor4go.examples] (github.com/gohutool/boot4go-xxljob-executor.(*RestFulExecutor).Run:49) Starting server at 192.106.56.1:9999
[18:41:33 CST 2022/04/10 252] [EROR][gohutool.executor4go.examples] (github.com/gohutool/boot4go-xxljob-executor.(*RestFulExecutor).Registry.func1:84) 执行器注册失败1:Post "http
://192.106.56.101/xxl-job-admin/api/registry": dial tcp 192.106.56.101:80: connectex: A connection attempt failed because the connected party did not properly respond after a pe
riod of time, or established connection failed because connected host has failed to respond.

    func SimpleTask(cxt context.Context, param *RunReq) (string, error) {
        fmt.Println("test one task" + param.ExecutorHandler + " param:" +
        param.ExecutorParams + " log_id:" + Int64ToStr(param.LogID))
        return "test done", nil
    }
    
    func PanicTask(cxt context.Context, param *RunReq) (string, error) {
        panic("test one task" + param.ExecutorHandler + " param:" +
        param.ExecutorParams + " log_id:" + Int64ToStr(param.LogID))
    }
    
    func ExceptionTask(cxt context.Context, param *RunReq) (string, error) {
        return "", errors.New("test one task" + param.ExecutorHandler +
        " param:" + param.ExecutorParams + " log_id:" + Int64ToStr(param.LogID))
    }
func LongTimeTask(cxt context.Context, param *RunReq) (string, error) {
    
    ....

	for {
		select {
		case <-cxt.Done():
			fmt.Println("task" + param.ExecutorHandler + "被手动终止")
			return fmt.Sprintf("task"+param.ExecutorHandler+"被手动终止, 执行次数%d 返回值为%d", num, rtn), nil
		default:
			num++

			rtn++

			time.Sleep(time.Duration(interval) * time.Second)

			fmt.Printf("test one task"+param.ExecutorHandler+" param:"+param.ExecutorParams+" 执行次数%d 值为%d\n",
				num, rtn)

			if num > times {
				fmt.Printf("test one task"+param.ExecutorHandler+
					" param:"+param.ExecutorParams+"执行完毕, 执行次数%d 值为%d!\n", num, rtn)
				return fmt.Sprintf("test one task"+param.ExecutorHandler+
					" param:"+param.ExecutorParams+"执行完毕, 执行次数%d 值为%d!\n", num, rtn), nil
			}
		}
	}
	
	....

}

Sample project

github.com/gohutool/boot4go-xxljob-executor/examples/

xxl-job-admin configuration

Add executor

Executor management - > Add Executor. The list of Executors is as follows:

AppName		名称		注册方式	OnLine 		机器地址 		操作
executor4go	golang执行器	自动注册 		查看 ( 1 )   

View->Registered Nodes

http://192.168.56.1:9999
Add Tasks

Task management - > Add (note that the bean mode is used, and the name of jobhandler is the same as that of regtask)

1	测试简单任务	BEAN:task.simple	    * 0 * * * ?	admin	STOP	
2	测试耗时任务	BEAN:task.longTime	    * * * * * ?	admin	STOP	
3	测试异常任务	BEAN:task.panic		* * * * * ?	admin	STOP
4	测试错误任务	BEAN:task.exception	* * * * * ?	admin	STOP

Documentation

Index

Constants

View Source
const (
	EXECUTOR4G_VERSION = "executor4go-v1.0.0"
	EXECUTOR4G_MAJOR   = 1
	EXECUTOR4G_MINOR   = 0
	EXECUTOR4G_BUILD   = 0
)

Variables

View Source
var (
	DefaultExecutorPort = "9999"
	DefaultRegistryKey  = "executor4go"
)

Functions

func NewExecutor

func NewExecutor(opts ...Option) executor

NewExecutor 创建执行器

Types

type ExecuteResult

type ExecuteResult struct {
	Code int64       `json:"code"`
	Msg  interface{} `json:"msg"`
}

ExecuteResult 任务执行结果 200 表示任务执行正常,500表示失败

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func (*Executor) CancelTask

func (e *Executor) CancelTask(param *killReq)

func (*Executor) Init

func (e *Executor) Init(opts ...Option)

func (*Executor) RegTask

func (e *Executor) RegTask(pattern string, task TaskFunc)

RegTask 注册任务

func (*Executor) Registry

func (e *Executor) Registry()

Registry 注册执行器到调度中心

func (*Executor) RequestCallback

func (e *Executor) RequestCallback(task *Task, code int64, msg string)

func (*Executor) Run

func (e *Executor) Run() error

Run 运行执行器引擎 * 在执行器引擎里必须处理来至于调度中心的一下命令 run kill log beat idleBeat

func (*Executor) RunTask

func (e *Executor) RunTask(param *RunReq,
	onRunning func(task *Task, param *RunReq),
	onSuccess func(task *Task, param *RunReq)) (*Task, error)

func (*Executor) SetLogHandler

func (e *Executor) SetLogHandler(logHandler LogHandler)

func (*Executor) Stop

func (e *Executor) Stop()

Stop 停止服务

func (*Executor) UnRegistry

func (e *Executor) UnRegistry() error

UnRegistry 从调度中心在注销执行器

type LogHandler

type LogHandler func(req *LogReq) *LogRes

LogHandler jobLogger处理器

type LogReq

type LogReq struct {
	LogDateTim  int64 `json:"logDateTim"`  // 本次调度日志时间
	LogID       int64 `json:"logId"`       // 本次调度日志ID
	FromLineNum int   `json:"fromLineNum"` // 日志开始行号,滚动加载日志
}

LogReq 日志请求

type LogRes

type LogRes struct {
	Code    int64         `json:"code"`    // 200 表示正常、其他失败
	Msg     string        `json:"msg"`     // 错误提示消息
	Content LogResContent `json:"content"` // 日志响应内容
}

LogRes 日志响应

type LogResContent

type LogResContent struct {
	FromLineNum int    `json:"fromLineNum"` // 本次请求,日志开始行数
	ToLineNum   int    `json:"toLineNum"`   // 本次请求,日志结束行号
	LogContent  string `json:"logContent"`  // 本次请求日志内容
	IsEnd       bool   `json:"isEnd"`       // 日志是否全部加载完
}

LogResContent 日志响应内容

type Option

type Option func(o *Options)

func AccessToken

func AccessToken(token string) Option

AccessToken 请求令牌

func ExecutorIp

func ExecutorIp(ip string) Option

ExecutorIp 设置执行器IP

func ExecutorPort

func ExecutorPort(port string) Option

ExecutorPort 设置执行器端口

func RegistryKey

func RegistryKey(registryKey string) Option

RegistryKey 设置执行器标识

func ServerAddr

func ServerAddr(addr string) Option

ServerAddr 设置调度中心地址

func SetLogger

func SetLogger(l log4go.Logger) Option

SetLogger 设置日志处理器

type Options

type Options struct {
	ServerAddr   string        `json:"server_addr"`   //调度中心地址
	AccessToken  string        `json:"access_token"`  //请求令牌
	Timeout      time.Duration `json:"timeout"`       //接口超时时间
	ExecutorIp   string        `json:"executor_ip"`   //本地(执行器)IP(可自行获取)
	ExecutorPort string        `json:"executor_port"` //本地(执行器)端口
	RegistryKey  string        `json:"registry_key"`  //执行器名称
	LogDir       string        `json:"log_dir"`       //日志目录
	// contains filtered or unexported fields
}

type Registry

type Registry struct {
	RegistryGroup string `json:"registryGroup"`
	RegistryKey   string `json:"registryKey"`
	RegistryValue string `json:"registryValue"`
}

Registry 注册参数

type RestFulExecutor

type RestFulExecutor struct {
	Executor
	// contains filtered or unexported fields
}

func (*RestFulExecutor) Registry

func (e *RestFulExecutor) Registry()

Registry 注册执行器到调度中心

func (*RestFulExecutor) RequestCallback

func (e *RestFulExecutor) RequestCallback(task *Task, code int64, msg string)

RequestCallback 回调任务列表

func (*RestFulExecutor) Run

func (e *RestFulExecutor) Run() (err error)

func (*RestFulExecutor) Stop

func (e *RestFulExecutor) Stop()

func (*RestFulExecutor) UnRegistry

func (e *RestFulExecutor) UnRegistry() error

UnRegistry 从调度中心在注销执行器

type RunReq

type RunReq struct {
	JobID                 int64  `json:"jobId"`                 // 任务ID
	ExecutorHandler       string `json:"executorHandler"`       // 任务标识
	ExecutorParams        string `json:"executorParams"`        // 任务参数
	ExecutorBlockStrategy string `json:"executorBlockStrategy"` // 任务阻塞策略
	ExecutorTimeout       int64  `json:"executorTimeout"`       // 任务超时时间,单位秒,大于零时生效
	LogID                 int64  `json:"logId"`                 // 本次调度日志ID
	LogDateTime           int64  `json:"logDateTime"`           // 本次调度日志时间
	GlueType              string `json:"glueType"`              // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum
	GlueSource            string `json:"glueSource"`            // GLUE脚本代码
	GlueUpdatetime        int64  `json:"glueUpdatetime"`        // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新
	BroadcastIndex        int64  `json:"broadcastIndex"`        // 分片参数:当前分片
	BroadcastTotal        int64  `json:"broadcastTotal"`        // 分片参数:总分片
}

RunReq 触发任务请求参数

type Task

type Task struct {
	Id    int64
	Name  string
	Ext   context.Context
	Param *RunReq

	Cancel    context.CancelFunc
	StartTime int64
	EndTime   int64
	// contains filtered or unexported fields
}

Task 任务

func (*Task) Info

func (t *Task) Info() string

Info 任务信息

func (*Task) Run

func (t *Task) Run(callback func(code int64, msg string))

Run 运行任务

type TaskFunc

type TaskFunc func(cxt context.Context, param *RunReq) (string, error)

TaskFunc 任务执行函数

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL