worker

package
v0.0.0-...-3f08e25 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2020 License: MIT Imports: 28 Imported by: 0

README

Worker

Worker相关的代码。

Documentation

Overview

计划任务的执行

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Executor

type Executor struct {
}

任务执行器

func NewExecutor

func NewExecutor() (executor *Executor)

初始化执行器

func (*Executor) ExecuteJob

func (executor *Executor) ExecuteJob(info *datamodels.JobExecuteInfo, c chan<- *datamodels.JobExecuteResult) (err error)

执行一个任务

func (*Executor) GetJobCategory

func (executor *Executor) GetJobCategory(idOrName string) (category *datamodels.Category, err error)

获取计划任务的分类信息 URL:/api/v1/category/:name Method: GET

func (*Executor) PostCategoryToMaster

func (executor *Executor) PostCategoryToMaster(category *datamodels.Category) (*datamodels.Category, error)

创建分类 URL:/api/v1/category/:name Method: GET

func (*Executor) PostJobExecuteResultToMaster

func (executor *Executor) PostJobExecuteResultToMaster(result *datamodels.JobExecuteResult) (*datamodels.JobExecuteResult, error)

Post发送任务执行信息到Master URL:/api/v1/job/execute/create Method: POST Data: jobExecute

func (*Executor) PostJobExecuteToMaster

func (executor *Executor) PostJobExecuteToMaster(jobExecute *datamodels.JobExecute) (*datamodels.JobExecute, error)

Post发送任务执行信息到Master URL:/api/v1/job/execute/create Method: POST Data: jobExecute

type JobExecuteLogFilter

type JobExecuteLogFilter struct {
	Name string `bson: "name"` // job的名字

}

type JobLock

type JobLock struct {
	ID           int       `json:"id"`        // 锁请求的序号
	Name         string    `json:"name"`      // 锁的名称
	LeaseID      int64     `json:"lease_id"`  // 锁对应的租约ID
	Password     string    `json:"password"`  // 锁的密码
	IsActive     bool      `json:"is_active"` // 锁是否有效中
	NeedKillChan chan bool // 释放本程序的通道:当timer到期了,还未发起续租,那么就需要kill,jobLock对应的任务
	// contains filtered or unexported fields
}

计划任务的锁 通过httl连接发起TryLock. 通过http发起续租 通过http发起release释放锁

func NewJobLock

func NewJobLock(name string) (jobLock *JobLock)

实例化一个JobLock

func (*JobLock) LeaseLoop

func (jobLock *JobLock) LeaseLoop()

循环发起续租

func (*JobLock) ReleaseLock

func (jobLock *JobLock) ReleaseLock() (err error)

释放锁 释放租约应该也需要传递秘钥,后续优化,这里暂时只传递锁的名字

func (*JobLock) TryLock

func (jobLock *JobLock) TryLock() (err error)

尝试上锁 通过http向master发起抢锁请求 如果抢锁成功会返回200的结果 抢锁失败就是400的错误请求 Master API: URL: /api/v1/lock/create Method: POST Data: {"name": 锁的名字, "ttl": "锁绑定的租约的time to live"}

func (*JobLock) Unlock

func (jobLock *JobLock) Unlock()

type LogHandler

type LogHandler interface {
	ConsumeLogsLoop()                                  // 消费日志循环函数
	AddLog(executeLog *datamodels.JobExecuteLog) error // 添加日志
	Stop()                                             // 日志处理器停止时的操作

}

日志处理的接口

type MongoLogHandler

type MongoLogHandler struct {
	Duration int // 刷新日志的间隔(毫秒)
	// contains filtered or unexported fields
}

日志处理器--mongo

func NewMongoLogHandler

func NewMongoLogHandler(mongoConfig *common.MongoConfig) (logHandler *MongoLogHandler, err error)

func (*MongoLogHandler) AddLog

func (logHandler *MongoLogHandler) AddLog(executeLog *datamodels.JobExecuteLog) (err error)

保存日志操作

func (*MongoLogHandler) ConsumeLogsLoop

func (logHandler *MongoLogHandler) ConsumeLogsLoop()

消费日志循环

func (*MongoLogHandler) List

func (logHandler *MongoLogHandler) List(page int, pageSize int) (logList []*datamodels.JobExecuteLog, err error)

获取日志的列表

func (*MongoLogHandler) Stop

func (logHandler *MongoLogHandler) Stop()

日志处理器停止时候的操作 停止的时候,需要把日志全部写入 当worker需要停止的时候,需要调度这些

type Register

type Register struct {
	Info *datamodels.Worker // Worker节点的信息

}

注册节点信息到master

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

任务调度器

func NewScheduler

func NewScheduler() *Scheduler

初始化调度器

func (*Scheduler) HandlerJobExecuteResult

func (scheduler *Scheduler) HandlerJobExecuteResult(result *datamodels.JobExecuteResult)

处理计划任务的结果

func (*Scheduler) PushJobEvent

func (scheduler *Scheduler) PushJobEvent(jobEvent *datamodels.JobEvent)

推送任务变化事件

func (*Scheduler) PushJobExecuteResult

func (scheduler *Scheduler) PushJobExecuteResult(result *datamodels.JobExecuteResult)

回传任务执行结果

func (*Scheduler) ScheduleLoop

func (scheduler *Scheduler) ScheduleLoop()

调度协程

func (*Scheduler) TryRunJob

func (scheduler *Scheduler) TryRunJob(jobPlan *datamodels.JobSchedulePlan) (err error)

执行计划任务

func (*Scheduler) TrySchedule

func (scheduler *Scheduler) TrySchedule() (scheduleAfter time.Duration)

计算任务调度状态 会尝试执行需要执行的计划任务,并计算jobPlan的下次执行时间 计算now与所有jobPlan中最近的下次执行的时间的间隔 当间隔大于1分钟的时候,设置其为一分钟

type Socket

type Socket struct {
	IsActive bool // 是否有效,断开的时候设置为false
	// contains filtered or unexported fields
}

func (*Socket) ReadeLoop

func (socket *Socket) ReadeLoop()

接收消息

func (*Socket) SendMessage

func (socket *Socket) SendMessage(messageType int, data []byte, needPacket bool) (err error)

发送消息 messageType: 消息类型 data []byte: 发送小消息内容 needPacket bool: 是否需要封装一下包,有时候可自行封装

func (*Socket) SendMessageEventToMaster

func (socket *Socket) SendMessageEventToMaster(category string, data string) (err error)

发送消息 messageType: 消息类型 data []byte: 发送小消息内容 needPacket bool: 是否需要封装一下包,有时候可自行封装

func (*Socket) Stop

func (socket *Socket) Stop()

socket即将关闭的相关操作

type SortLogByStartTime

type SortLogByStartTime struct {
	StartTime int `bson: "startTime"` // 根据开始时间排序
}

type WatchJobsHandler

type WatchJobsHandler struct {
	KeyDir    string     // 监听的key目录
	Scheduler *Scheduler // 调度器
}

监听etcd中jobs的变化

func (*WatchJobsHandler) HandlerGetResponse

func (watch *WatchJobsHandler) HandlerGetResponse(response *clientv3.GetResponse)

func (*WatchJobsHandler) HandlerWatchChan

func (watch *WatchJobsHandler) HandlerWatchChan(watchChan clientv3.WatchChan)

处理watch

type WatchKillHandler

type WatchKillHandler struct {
	KeyDir    string     // 监听的key目录
	Scheduler *Scheduler // 调度器
}

func (*WatchKillHandler) HandlerGetResponse

func (watch *WatchKillHandler) HandlerGetResponse(response *clientv3.GetResponse)

func (*WatchKillHandler) HandlerWatchChan

func (watch *WatchKillHandler) HandlerWatchChan(watchChan clientv3.WatchChan)

type Worker

type Worker struct {
	TimeStart  time.Time       // 启动时间
	Scheduler  *Scheduler      // 调度器
	Categories map[string]bool // 执行计划任务的类型

	IsActive bool // 是否有效
	// contains filtered or unexported fields
}

func NewWorkerApp

func NewWorkerApp() *Worker

实例化Worker

func (*Worker) Run

func (w *Worker) Run()

func (*Worker) Stop

func (w *Worker) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL