xxl

package module
v0.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2021 License: MIT Imports: 15 Imported by: 0

README

xxl-job-executor-go

很多公司java与go开发共存,java中有xxl-job做为任务调度引擎,为此也出现了go执行器(客户端),使用起来比较简单:

支持

1.执行器注册
2.耗时任务取消
3.任务注册,像写http.Handler一样方便
4.任务panic处理
5.阻塞策略处理
6.任务完成支持返回执行备注
7.任务超时取消 (单位:秒,0为不限制)
8.失败重试次数(在参数param中,目前由任务自行处理)
9.可自定义日志
10.自定义日志查看handler
11.支持外部路由(可与gin集成)

Example

package main

import (
	"fmt"
	xxl "github.com/xxl-job/xxl-job-executor-go"
	"github.com/xxl-job/xxl-job-executor-go/example/task"
	"log"
)

func main() {
	exec := xxl.NewExecutor(
		xxl.ServerAddr("http://127.0.0.1/xxl-job-admin"),
		xxl.AccessToken(""),            //请求令牌(默认为空)
		xxl.ExecutorIp("127.0.0.1"),    //可自动获取
		xxl.ExecutorPort("9999"),       //默认9999(非必填)
		xxl.RegistryKey("golang-jobs"), //执行器名称
		xxl.SetLogger(&logger{}),       //自定义日志
	)
	exec.Init()
	//设置日志查看handler
	exec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {
		return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{
			FromLineNum: req.FromLineNum,
			ToLineNum:   2,
			LogContent:  "这个是自定义日志handler",
			IsEnd:       true,
		}}
	})
	//注册任务handler
	exec.RegTask("task.test", task.Test)
	exec.RegTask("task.test2", task.Test2)
	exec.RegTask("task.panic", task.Panic)
	log.Fatal(exec.Run())
}

//xxl.Logger接口实现
type logger struct{}

func (l *logger) Info(format string, a ...interface{}) {
	fmt.Println(fmt.Sprintf("自定义日志 - "+format, a...))
}

func (l *logger) Error(format string, a ...interface{}) {
	log.Println(fmt.Sprintf("自定义日志 - "+format, a...))
}

示例项目

github.com/xxl-job/xxl-job-executor-go/example/

与gin框架集成

https://github.com/gin-middleware/xxl-job-executor

xxl-job-admin配置

添加执行器

执行器管理->新增执行器,执行器列表如下:

AppName		名称		注册方式	OnLine 		机器地址 		操作
golang-jobs	golang执行器	自动注册 		查看 ( 1 )   

查看->注册节点

http://127.0.0.1:9999
添加任务

任务管理->新增(注意,使用BEAN模式,JobHandler与RegTask名称一致)

1	测试panic	BEAN:task.panic	* 0 * * * ?	admin	STOP	
2	测试耗时任务	BEAN:task.test2	* * * * * ?	admin	STOP	
3	测试golang	BEAN:task.test		* * * * * ?	admin	STOP

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultExecutorPort = "9999"
	DefaultRegistryKey  = "golang-jobs"
)

Functions

func Int64ToStr

func Int64ToStr(i int64) string

int64 to str

func StrToInt64

func StrToInt64(str string) int64

str to int64

Types

type Executor

type Executor interface {
	//初始化
	Init(...Option)
	//日志查询
	LogHandler(handler LogHandler)
	//注册任务
	RegTask(pattern string, task TaskFunc)
	//运行任务
	RunTask(writer http.ResponseWriter, request *http.Request)
	//杀死任务
	KillTask(writer http.ResponseWriter, request *http.Request)
	//任务日志
	TaskLog(writer http.ResponseWriter, request *http.Request)
	//运行服务
	Run() error
	//停止服务
	Stop()
}

执行器

func NewExecutor

func NewExecutor(opts ...Option) Executor

创建执行器

type LogFunc

type LogFunc func(req LogReq, res *LogRes) []byte

应用日志

type LogHandler

type LogHandler func(req *LogReq) *LogRes

type LogReq

type LogReq struct {
	LogDateTim  int64 `json:"logDateTim"`  // 本次调度日志时间
	LogID       int64 `json:"logId"`       // 本次调度日志ID
	FromLineNum int   `json:"fromLineNum"` // 日志开始行号,滚动加载日志
}

日志请求

type LogRes

type LogRes struct {
	Code    int64         `json:"code"`    // 200 表示正常、其他失败
	Msg     string        `json:"msg"`     // 错误提示消息
	Content LogResContent `json:"content"` // 日志响应内容
}

日志响应

type LogResContent

type LogResContent struct {
	FromLineNum int    `json:"fromLineNum"` // 本次请求,日志开始行数
	ToLineNum   int    `json:"toLineNum"`   // 本次请求,日志结束行号
	LogContent  string `json:"logContent"`  // 本次请求日志内容
	IsEnd       bool   `json:"isEnd"`       // 日志是否全部加载完
}

日志响应内容

type Logger

type Logger interface {
	Info(format string, a ...interface{})
	Error(format string, a ...interface{})
}

系统日志

type Option

type Option func(o *Options)

func AccessToken

func AccessToken(token string) Option

请求令牌

func ExecutorIp

func ExecutorIp(ip string) Option

设置执行器IP

func ExecutorPort

func ExecutorPort(port string) Option

设置执行器端口

func RegistryKey

func RegistryKey(registryKey string) Option

设置执行器标识

func ServerAddr

func ServerAddr(addr string) Option

设置调度中心地址

func SetLogger

func SetLogger(l Logger) Option

设置日志处理器

type Options

type Options struct {
	ServerAddr   string        `json:"server_addr"`   //调度中心地址
	AccessToken  string        `json:"access_token"`  //请求令牌
	Timeout      time.Duration `json:"timeout"`       //接口超时时间
	ExecutorIp   string        `json:"executor_ip"`   //本地(执行器)IP(可自行获取)
	ExecutorPort string        `json:"executor_port"` //本地(执行器)端口
	RegistryKey  string        `json:"registry_key"`  //执行器名称
	LogDir       string        `json:"log_dir"`       //日志目录
	// contains filtered or unexported fields
}

type Registry

type Registry struct {
	RegistryGroup string `json:"registryGroup"`
	RegistryKey   string `json:"registryKey"`
	RegistryValue string `json:"registryValue"`
}

注册参数

type RunReq

type RunReq struct {
	JobID                 int64  `json:"jobId"`                 // 任务ID
	ExecutorHandler       string `json:"executorHandler"`       // 任务标识
	ExecutorParams        string `json:"executorParams"`        // 任务参数
	ExecutorBlockStrategy string `json:"executorBlockStrategy"` // 任务阻塞策略
	ExecutorTimeout       int64  `json:"executorTimeout"`       // 任务超时时间,单位秒,大于零时生效
	LogID                 int64  `json:"logId"`                 // 本次调度日志ID
	LogDateTime           int64  `json:"logDateTime"`           // 本次调度日志时间
	GlueType              string `json:"glueType"`              // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum
	GlueSource            string `json:"glueSource"`            // GLUE脚本代码
	GlueUpdatetime        int64  `json:"glueUpdatetime"`        // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新
	BroadcastIndex        int64  `json:"broadcastIndex"`        // 分片参数:当前分片
	BroadcastTotal        int64  `json:"broadcastTotal"`        // 分片参数:总分片
}

触发任务请求参数

type Task

type Task struct {
	Id    int64
	Name  string
	Ext   context.Context
	Param *RunReq

	Cancel    context.CancelFunc
	StartTime int64
	EndTime   int64
	// contains filtered or unexported fields
}

任务

func (*Task) Info

func (t *Task) Info() string

任务信息

func (*Task) Run

func (t *Task) Run(callback func(code int64, msg string))

运行任务

type TaskFunc

type TaskFunc func(cxt context.Context, param *RunReq) string

任务执行函数

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL