snailjob

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

README

snail-job-Logo

🔥🔥🔥 灵活,可靠和快速的分布式任务重试和分布式任务调度平台

✅️ 可重放,可管控、为提高分布式业务系统一致性的分布式任务重试平台
✅️ 支持秒级、可中断、可编排的高性能分布式任务调度平台

简介

SnailJob 是一个灵活、可靠且高效的分布式任务重试和任务调度平台。其核心采用分区模式实现,具备高度可伸缩性和容错性的分布式系统。拥有完善的权限管理、强大的告警监控功能和友好的界面交互。欢迎大家接入并使用。

snail-job-go

snail-job 项目的 GO 客户端。snail-job项目 java 后端

采用GO原生语言开发的SnailJob客户端具备与SnailJob的Java客户端Job模块一样的能力包括(集群、广播、静态分片、Map、MapReuce、DAG工作流、实时日志等功能)

相关链接

快速入门

开始使用

  1. 在go.mod文件中添加依赖

仓库地址: https://pkg.go.dev/github.com/open-snail/snail-job-go

require  github.com/open-snail/snail-job-go {版本号}
  1. 配置客户端参数
// 配置Options参数
exec := snailjob.NewSnailJobManager(&dto.Options{
		ServerHost:   "127.0.0.1",
		ServerPort:   "17888",
		HostIP:       "127.0.0.1",
		HostPort:     "17889",
		Namespace:    "764d604ec6fc45f68cd92514c40e9e1a",
		GroupName:    "snail_job_demo_group",
		Token:        "SJ_Wyz3dmsdbDOkDujOTSSoBjGQP1BMsVnj",
		Level:        logrus.InfoLevel,
		ReportCaller: true,
	})

    // 注册执行器
	exec.Register("testJobExecutor", func() job.IJobExecutor {
		return &Test3JobExecutor{}
	})

    // 初始化环境
	if nil == exec.Init() {
		// 启动客户端
		exec.Run()
	}

登录后台,能看到对应host-id 为 go-xxxxxx 的客户端

示例
定时任务
// TestJobExecutor 这是一个示例执行器
type TestJobExecutor struct {
   job.BaseJobExecutor
}

func (executor *Test2JobExecutor) DoJobExecute(jobArgs dto.IJobArgs) dto.ExecuteResult {
executor.RemoteLogger.Infof("TestJobExecutor 执行结束 DoJobExecute. jobId: [%d] now:[%s]", jobArgs.GetJobId(), time.Now().String())
return *dto.Success().WithMessage("hello 这是go客户端")
}

新建定时任务, 执行器类型选择【Go】,执行器名称填入【testJobExecutor】

动态分片
// TestMapJobExecutor 这是一个测试类
type TestMapJobExecutor struct {
	job.BaseMapJobExecutor
}

func (executor *TestMapJobExecutor) DoJobMapExecute(mpArgs *dto.MapArgs) dto.ExecuteResult {
	logger := executor.LocalLogger
	if mpArgs.TaskName == constant.ROOT_MAP {
		_, _ = executor.DoMap([]interface{}{1, 2, 3}, "secondTaskName")
		return *dto.Success()
	}

	logger.Infof("TestMapJobExecutor执行 DoJobMapExecute. jobId: [%d] TaskName:[%s] ", mpArgs.GetJobId(), mpArgs.TaskName)
	return *dto.Success().WithMessage("这是动态分片")
}

MapReduce
// TestMapReduceJobExecutor 这是一个测试类
type TestMapReduceJobExecutor struct {
	job.BaseMapReduceJobExecutor
}

func (executor *TestMapReduceJobExecutor) DoJobMapExecute(mpArgs *dto.MapArgs) dto.ExecuteResult {
	logger := executor.LocalLogger
	return *dto.Success().WithMessage("这是动态分片阶段")
}

// DoReduceExecute 模板类
func (executor *TestMapReduceJobExecutor) DoReduceExecute(jobArgs *dto.ReduceArgs) dto.ExecuteResult {
	logger := executor.LocalLogger
	logger.Infof("TestMapReduceJobExecutor 开始执行 DoReduceExecute.")

    return *dto.Success().WithMessage("这是Reduce阶段")
}

func (executor *TestMapReduceJobExecutor) DoMergeReduceExecute(jobArgs *dto.MergeReduceArgs) dto.ExecuteResult {
	logger := executor.LocalLogger
	logger.Info("TestMapReduceJobExecutor 开始执行 DoMergeReduceExecute.")

    return *dto.Success().WithMessage("这是merge阶段")
}

响应停止事件
type TestJobExecutor struct {
	job.BaseJobExecutor
}

// 测试超时时间
func (executor *TestJobExecutor) DoJobExecute(jobArgs dto.IJobArgs) dto.ExecuteResult {

	time.Sleep(1 * time.Second)
	interrupt := executor.Context().Value(constant.INTERRUPT_KEY)
	if interrupt != nil {
		executor.LocalLogger.Errorf("任务被中断. jobId: [%d] now:[%s]", jobArgs.GetJobId(), time.Now().String())
		return *dto.Failure().WithMessage("任务被中断")
	}
	
	return *dto.Success().WithMessage("hello 这是go客户端")
}

工作流

// TestWorkflowJobExecutor 这是一个测试类
type TestWorkflowJobExecutor struct {
	job.BaseJobExecutor
}

func (executor *TestWorkflowJobExecutor) DoJobExecute(jobArgs dto.IJobArgs) dto.ExecuteResult {
	executor.LocalLogger.Infof("TestWorkflowJobExecutor. jobId: [%d] wfContext:[%+v]",
		jobArgs.GetJobId(), jobArgs.GetWfContext("name"))
	jobArgs.AppendContext("name", "xiaowoniu")
	return *dto.Success().WithMessage("测试工作流")
}

系统截图

系统截图 系统截图
系统截图 系统截图
系统截图 系统截图
系统截图 系统截图
系统截图 系统截图
系统截图 系统截图
系统截图 系统截图

期望

欢迎提出更好的意见,帮助完善 snail-job

版权

Aizuda/SnailJob 采用APACHE LICENSE 2.0 开源协议,您在使用过程中,需要注意以下几点:

  1. 不得修改产品相关代码的源码头注释和出处;
  2. 不得应用于危害国家安全、荣誉和利益的行为,不能以任何形式用于非法目的;

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type SnailJobManager

type SnailJobManager struct {
	// contains filtered or unexported fields
}

SnailJobManager snail job 客户端启动者

func NewSnailJobManager

func NewSnailJobManager(opts *dto.Options) *SnailJobManager

func (*SnailJobManager) GetClient

func (e *SnailJobManager) GetClient() job.SnailJobClient

func (*SnailJobManager) GetLoggerFactory

func (e *SnailJobManager) GetLoggerFactory() job.LoggerFactory

func (*SnailJobManager) Init

func (e *SnailJobManager) Init() error

func (*SnailJobManager) Register

func (e *SnailJobManager) Register(name string, executor job.NewJobExecutor) *SnailJobManager

Register 注册job执行器入门

func (*SnailJobManager) Run

func (e *SnailJobManager) Run()

Directories

Path Synopsis
job

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL