xxl

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2025 License: MIT Imports: 25 Imported by: 0

README

xxl-job-executor-go

golang基于xxl-job的bean类型的执行器

项目徽章 (如构建状态、版本、许可证等)

功能亮点

  • 执行器注册
  • 耗时任务取消
  • 任务注册,像写http.Handler一样方便
  • 任务panic处理
  • 阻塞策略处理
  • 任务完成支持返回执行备注
  • 任务超时取消 (单位:秒,0为不限制)
  • 失败重试次数(在参数param中,目前由任务自行处理)
  • 可自定义日志
  • 自定义日志查看handler
  • 支持外部路由(可与gin集成)
  • 支持自定义中间件

更新记录

安装说明

使用示例

https://yuebaiapp.feishu.cn/wiki/T6Y6wfNxfid220knD2pcFmeAnUb?fromScene=spaceOverview

贡献指南

许可证信息

联系方式/支持

致谢/鸣谢

Documentation

Index

Constants

View Source
const (
	SuccessCode = 200
	FailureCode = 500
)

响应码

Variables

View Source
var (
	DefaultExecutorPort = "9999"
	DefaultRegistryKey  = "golang-jobs"
)

Functions

func Int64ToStr

func Int64ToStr(i int64) string

Int64ToStr int64 to str

func OpenLogFile

func OpenLogFile(filePath string) (*os.File, error)

func ReadLog

func ReadLog(fileName string, startLine, lineCount int) ([]string, int, bool, error)

ReadLog 读取日志

func ReadLogBackwards

func ReadLogBackwards(fileName string, startLine, lineCount int) (string, int, bool, error)

ReadLogBackwards 倒序读取日志

func SafeGo

func SafeGo(f func())

SafeGo 安全的启动协程

Types

type AutoRotateWriter

type AutoRotateWriter struct {
	// contains filtered or unexported fields
}

func NewAutoRotateWriter

func NewAutoRotateWriter(filename string, maxSizeMB int64) (*AutoRotateWriter, error)

func (*AutoRotateWriter) Close

func (w *AutoRotateWriter) Close() error

func (*AutoRotateWriter) Write

func (w *AutoRotateWriter) Write(p []byte) (n int, err error)

type ExecuteResult

type ExecuteResult struct {
	Code int64       `json:"code"`
	Msg  interface{} `json:"msg"`
}

ExecuteResult 任务执行结果 200 表示任务执行正常,500表示失败

type Executor

type Executor interface {
	// Init 初始化
	Init(...Option)
	// LogHandler 日志查询
	LogHandler(handler LogHandler)
	// LogCleanHandler 日志清理
	LogCleanHandler(handler LogCLeanHandler)
	// Use 使用中间件
	Use(middlewares ...Middleware)
	// RegTask 注册任务
	RegTask(pattern string, task TaskFunc)
	// RunTask 运行任务
	RunTask(writer http.ResponseWriter, request *http.Request)
	// KillTask 杀死任务
	KillTask(writer http.ResponseWriter, request *http.Request)
	// TaskLog 任务日志
	TaskLog(writer http.ResponseWriter, request *http.Request)
	// Beat 心跳检测
	Beat(writer http.ResponseWriter, request *http.Request)
	// IdleBeat 忙碌检测
	IdleBeat(writer http.ResponseWriter, request *http.Request)
	// Run 运行服务
	Run() error
	// Stop 停止服务
	Stop()
	// Info 执行器信息
	Info() map[string]interface{}
}

Executor 执行器

func NewExecutor

func NewExecutor(opts ...Option) Executor

NewExecutor 创建执行器

type ExecutorLogger

type ExecutorLogger struct {
	// contains filtered or unexported fields
}

func NewExecutorLogger

func NewExecutorLogger(slug string, conf *ZapLogConf) (logger *ExecutorLogger, err error)

NewExecutorLogger 初始化全局Logger

func (*ExecutorLogger) Close

func (l *ExecutorLogger) Close()

func (*ExecutorLogger) Debug

func (l *ExecutorLogger) Debug(args ...interface{})

func (*ExecutorLogger) Debugf

func (l *ExecutorLogger) Debugf(template string, args ...interface{})

func (*ExecutorLogger) Debugw

func (l *ExecutorLogger) Debugw(msg string, keysAndValues ...interface{})

func (*ExecutorLogger) Error

func (l *ExecutorLogger) Error(args ...interface{})

func (*ExecutorLogger) Errorf

func (l *ExecutorLogger) Errorf(template string, args ...interface{})

func (*ExecutorLogger) Errorw

func (l *ExecutorLogger) Errorw(msg string, keysAndValues ...interface{})

func (*ExecutorLogger) Info

func (l *ExecutorLogger) Info(args ...interface{})

func (*ExecutorLogger) Infof

func (l *ExecutorLogger) Infof(template string, args ...interface{})

func (*ExecutorLogger) Infow

func (l *ExecutorLogger) Infow(msg string, keysAndValues ...interface{})

func (*ExecutorLogger) Warn

func (l *ExecutorLogger) Warn(args ...interface{})

func (*ExecutorLogger) Warnf

func (l *ExecutorLogger) Warnf(template string, args ...interface{})

func (*ExecutorLogger) Warnw

func (l *ExecutorLogger) Warnw(msg string, keysAndValues ...interface{})

type LogCLeanHandler

type LogCLeanHandler func(executor *executor)

func DefaultLogCLeanHandler

func DefaultLogCLeanHandler() LogCLeanHandler

type LogFunc

type LogFunc func(req LogReq, res *LogRes) []byte

LogFunc 应用日志

type LogHandler

type LogHandler func(req *LogReq) *LogRes

type LogReq

type LogReq struct {
	LogDateTim  int64 `json:"logDateTim"`  // 本次调度日志时间
	LogID       int64 `json:"logId"`       // 本次调度日志ID
	JogID       int64 `json:"jobId"`       // 本次调度日志ID
	FromLineNum int   `json:"fromLineNum"` // 日志开始行号,滚动加载日志
}

LogReq 日志请求

type LogRes

type LogRes struct {
	Code    int64         `json:"code"`    // 200 表示正常、其他失败
	Msg     string        `json:"msg"`     // 错误提示消息
	Content LogResContent `json:"content"` // 日志响应内容
}

LogRes 日志响应

type LogResContent

type LogResContent struct {
	FromLineNum int    `json:"fromLineNum"` // 本次请求,日志开始行数
	ToLineNum   int    `json:"toLineNum"`   // 本次请求,日志结束行号
	LogContent  string `json:"logContent"`  // 本次请求日志内容
	IsEnd       bool   `json:"isEnd"`       // 日志是否全部加载完
}

LogResContent 日志响应内容

type Logger

type Logger interface {
	Info(a ...interface{})
	Error(a ...interface{})
	Warn(a ...interface{})
	Debug(a ...interface{})
	Infof(template string, args ...interface{})
	Errorf(template string, args ...interface{})
	Warnf(template string, args ...interface{})
	Debugf(template string, args ...interface{})
	Infow(msg string, keysAndValues ...interface{})
	Errorw(msg string, keysAndValues ...interface{})
	Warnw(msg string, keysAndValues ...interface{})
	Debugw(msg string, keysAndValues ...interface{})
	Close()
}

Logger 系统日志

type Middleware

type Middleware func(TaskFunc) TaskFunc

Middleware 中间件构造函数

type Option

type Option func(o *Options)

func AccessToken

func AccessToken(token string) Option

AccessToken 请求令牌

func ExecutorIp

func ExecutorIp(ip string) Option

ExecutorIp 设置执行器IP

func ExecutorPathPrefix

func ExecutorPathPrefix(prefix string) Option

ExecutorPathPrefix 设置执行器路由前缀

func ExecutorPort

func ExecutorPort(port string) Option

ExecutorPort 设置执行器端口

func RegistryKey

func RegistryKey(registryKey string) Option

RegistryKey 设置执行器标识

func ServerAddr

func ServerAddr(addr string) Option

ServerAddr 设置调度中心地址

func SetGlobalLogger

func SetGlobalLogger(l Logger) Option

SetGlobalLogger 设置日志处理器

func SetLogConfig

func SetLogConfig(lc *ZapLogConf) Option

SetLogConfig 设置日志配置

func SetTaskLogger

func SetTaskLogger(l Logger) Option

SetTaskLogger 设置任务日志处理器

type Options

type Options struct {
	ServerAddr         string        `json:"server_addr"`          //调度中心地址
	AccessToken        string        `json:"access_token"`         //请求令牌
	Timeout            time.Duration `json:"timeout"`              //接口超时时间
	ExecutorIp         string        `json:"executor_ip"`          //本地(执行器)IP(可自行获取)
	ExecutorPort       string        `json:"executor_port"`        //本地(执行器)端口
	ExecutorPathPrefix string        `json:"executor_path_prefix"` //执行器路径
	RegistryKey        string        `json:"registry_key"`         //执行器名称
	LogConfig          *ZapLogConf   `json:"log_config"`           //日志配置
	// contains filtered or unexported fields
}

type Registry

type Registry struct {
	RegistryGroup string `json:"registryGroup"`
	RegistryKey   string `json:"registryKey"`
	RegistryValue string `json:"registryValue"`
}

Registry 注册参数

type RunReq

type RunReq struct {
	JobID                 int64  `json:"jobId"`                 // 任务ID
	ExecutorHandler       string `json:"executorHandler"`       // 任务标识
	ExecutorParams        string `json:"executorParams"`        // 任务参数
	ExecutorBlockStrategy string `json:"executorBlockStrategy"` // 任务阻塞策略
	ExecutorTimeout       int64  `json:"executorTimeout"`       // 任务超时时间,单位秒,大于零时生效
	LogID                 int64  `json:"logId"`                 // 本次调度日志ID
	LogDateTime           int64  `json:"logDateTime"`           // 本次调度日志时间
	GlueType              string `json:"glueType"`              // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum
	GlueSource            string `json:"glueSource"`            // GLUE脚本代码
	GlueUpdatetime        int64  `json:"glueUpdatetime"`        // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新
	BroadcastIndex        int64  `json:"broadcastIndex"`        // 分片参数:当前分片
	BroadcastTotal        int64  `json:"broadcastTotal"`        // 分片参数:总分片
}

RunReq 触发任务请求参数

type Task

type Task struct {
	Id    int64
	Name  string
	Ext   context.Context
	Param *RunReq

	Cancel    context.CancelFunc
	StartTime int64
	EndTime   int64
	//日志
	Log Logger
	// contains filtered or unexported fields
}

Task 任务

func (*Task) Info

func (t *Task) Info() *RunReq

Info 任务信息

func (*Task) Run

func (t *Task) Run(callback func(code int64, msg string))

Run 运行任务

type TaskFunc

type TaskFunc func(cxt context.Context, task *Task) string

TaskFunc 任务执行函数

type ZapLogConf

type ZapLogConf struct {
	AppendToSTDOUT  int    `mapstructure:"appendToStdout"`
	AppendToFile    int    `mapstructure:"appendToFile"`
	Level           int8   `mapstructure:"level"` // 0 debug, 1 info, 2 warn, 3 Error, 4 panic
	FilePrefix      string `mapstructure:"filePrefix"`
	DefaultFileName string `mapstructure:"defaultFileName"`
	FileMaxSize     int    `mapstructure:"fileMaxSizeMB"` // 日志文件默认大小
	MaxBackups      int    `mapstructure:"maxBackups"`    // 最多备份日志文件数
	MaxAge          int    `mapstructure:"maxAge"`        // 保留天数
	TimeKey         string `mapstructure:"timeKey"`       // 日志中时间key
	LevelKey        string `mapstructure:"levelKey"`      // 报错等级level_key
	MessageKey      string `mapstructure:"messageKey"`    // 消息key
	EncodeLevel     int    `mapstructure:"encodeLevel"`   // 等级level输出格式 1小写;2大写
	AddCaller       bool   `mapstructure:"addCaller"`     // 是否输出caller文件地址,默认不输出

}

ZapLogConf 配置文件

func DefaultZapLogConf

func DefaultZapLogConf() (zapLogConfig *ZapLogConf)

DefaultZapLogConf 默认配置

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL